Implement #1430: [sys.monitoring] Conditional breakpoints

Implement #1431: [sys.monitoring] Hit-conditional breakpoints
Implement #1432: [sys.monitoring] Logpoints
This commit is contained in:
Pavel Minaev 2024-01-08 21:48:29 -08:00
parent c91ae0ee78
commit 8ce0f35d75
7 changed files with 998 additions and 576 deletions

View file

@ -160,5 +160,6 @@ def hide_thread_from_debugger(thread):
DEBUGPY_TRACE_DEBUGPY is used to debug debugpy with debugpy
"""
if hide_debugpy_internals():
thread.is_debugpy_thread = True
thread.pydev_do_not_trace = True
thread.is_pydev_daemon_thread = True

View file

@ -4,6 +4,11 @@
def adapter():
"""
Returns the instance of Adapter corresponding to the debug adapter that is currently
connected to this process, or None if there is no adapter connected. Use in lieu of
Adapter.instance to avoid import cycles.
"""
from debugpy.server.adapters import Adapter
return Adapter.instance

View file

@ -9,9 +9,9 @@ from itertools import islice
from debugpy.adapter import components
from debugpy.common import json, log, messaging, sockets
from debugpy.common.messaging import Request
from debugpy.server import tracing, eval
from debugpy.server.tracing import Breakpoint, StackFrame
from debugpy.common.messaging import MessageDict, Request
from debugpy.server import eval
from debugpy.server.tracing import Breakpoint, StackFrame, Thread, Tracer
class Adapter:
@ -50,13 +50,13 @@ class Adapter:
server_access_token = None
"""Access token that the adapter must use to authenticate with this server."""
_is_initialized: bool = False
_has_started: bool = False
_client_id: str = None
_capabilities: Capabilities = None
_expectations: Expectations = None
_start_request: messaging.Request = None
_tracer: Tracer = None
def __init__(self, stream: messaging.JsonIOStream):
self._is_initialized = False
@ -65,6 +65,7 @@ class Adapter:
self._capabilities = None
self._expectations = None
self._start_request = None
self._tracer = Tracer.instance
self.channel = messaging.JsonMessageChannel(stream, self)
self.channel.start()
@ -139,6 +140,8 @@ class Adapter:
]
return {
"exceptionBreakpointFilters": exception_breakpoint_filters,
"supportsClipboardContext": True,
"supportsCompletionsRequest": True,
"supportsConditionalBreakpoints": True,
"supportsConfigurationDoneRequest": True,
@ -148,17 +151,15 @@ class Adapter:
"supportsExceptionInfoRequest": True,
"supportsExceptionOptions": True,
"supportsFunctionBreakpoints": True,
"supportsGotoTargetsRequest": True,
"supportsHitConditionalBreakpoints": True,
"supportsLogPoints": True,
"supportsModulesRequest": True,
"supportsSetExpression": True,
"supportsSetVariable": True,
"supportsValueFormattingOptions": True,
"supportsTerminateRequest": True,
"supportsGotoTargetsRequest": True,
"supportsClipboardContext": True,
"exceptionBreakpointFilters": exception_breakpoint_filters,
"supportsStepInTargetsRequest": True,
"supportsTerminateRequest": True,
"supportsValueFormattingOptions": True,
}
def _handle_start_request(self, request: Request):
@ -189,7 +190,7 @@ class Adapter:
'or an "attach" request'
)
tracing.start()
self._tracer.start()
self._has_started = True
request.respond({})
@ -233,20 +234,28 @@ class Adapter:
bps = list(request("breakpoints", json.array(json.object())))
else:
lines = request("lines", json.array(int))
bps = [{"line": line} for line in lines]
bps = [MessageDict(request, {"line": line}) for line in lines]
Breakpoint.clear([path])
bps_set = [Breakpoint.set(path, bp["line"]) for bp in bps]
bps_set = [
Breakpoint.set(
path, bp["line"],
condition=bp("condition", str, optional=True),
hit_condition=bp("hitCondition", str, optional=True),
log_message=bp("logMessage", str, optional=True),
)
for bp in bps
]
return {"breakpoints": bps_set}
def threads_request(self, request: Request):
return {"threads": tracing.Thread.enumerate()}
return {"threads": Thread.enumerate()}
def stackTrace_request(self, request: Request):
thread_id = request("threadId", int)
start_frame = request("startFrame", 0)
thread = tracing.Thread.get(thread_id)
thread = Thread.get(thread_id)
if thread is None:
raise request.isnt_valid(f'Invalid "threadId": {thread_id}')
@ -265,7 +274,7 @@ class Adapter:
thread_ids = None
else:
thread_ids = [request("threadId", int)]
tracing.pause(thread_ids)
self._tracer.pause(thread_ids)
return {}
def continue_request(self, request: Request):
@ -274,25 +283,25 @@ class Adapter:
else:
thread_ids = [request("threadId", int)]
single_thread = request("singleThread", False)
tracing.resume(thread_ids if single_thread else None)
self._tracer.resume(thread_ids if single_thread else None)
return {}
def stepIn_request(self, request: Request):
# TODO: support "singleThread" and "granularity"
thread_id = request("threadId", int)
tracing.step_in(thread_id)
self._tracer.step_in(thread_id)
return {}
def stepOut_request(self, request: Request):
# TODO: support "singleThread" and "granularity"
thread_id = request("threadId", int)
tracing.step_out(thread_id)
self._tracer.step_out(thread_id)
return {}
def next_request(self, request: Request):
# TODO: support "singleThread" and "granularity"
thread_id = request("threadId", int)
tracing.step_over(thread_id)
self._tracer.step_over(thread_id)
return {}
def scopes_request(self, request: Request):
@ -316,18 +325,18 @@ class Adapter:
return {"result": var.repr, "variablesReference": var.id}
def disconnect_request(self, request: Request):
tracing.Breakpoint.clear()
tracing.abandon_step()
tracing.resume()
Breakpoint.clear()
self._tracer.abandon_step()
self._tracer.resume()
return {}
def terminate_request(self, request: Request):
tracing.Breakpoint.clear()
tracing.abandon_step()
tracing.resume()
Breakpoint.clear()
self._tracer.abandon_step()
self._tracer.resume()
return {}
def disconnect(self):
tracing.resume()
self._tracer.resume()
self.connected_event.clear()
return {}

View file

@ -2,29 +2,28 @@
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
import debugpy
import threading
from collections.abc import Iterable
from debugpy.server.inspect import inspect
from types import FrameType
from typing import ClassVar, Dict, Literal, Self
from debugpy.server import tracing
from debugpy.server.inspect import inspect
ScopeKind = Literal["global", "nonlocal", "local"]
type ScopeKind = Literal["global", "nonlocal", "local"]
type StackFrame = "debugpy.server.tracing.StackFrame"
_lock = threading.RLock()
class VariableContainer:
frame: "tracing.StackFrame"
frame: StackFrame
id: int
_last_id: ClassVar[int] = 0
_all: ClassVar[Dict[int, "VariableContainer"]] = {}
def __init__(self, frame: "tracing.StackFrame"):
def __init__(self, frame: StackFrame):
self.frame = frame
with _lock:
VariableContainer._last_id += 1
@ -46,7 +45,7 @@ class VariableContainer:
raise NotImplementedError
@classmethod
def invalidate(self, *frames: Iterable["tracing.StackFrame"]) -> None:
def invalidate(self, *frames: Iterable[StackFrame]) -> None:
with _lock:
ids = [
id
@ -61,7 +60,7 @@ class Scope(VariableContainer):
frame: FrameType
kind: ScopeKind
def __init__(self, frame: "tracing.StackFrame", kind: ScopeKind):
def __init__(self, frame: StackFrame, kind: ScopeKind):
super().__init__(frame)
self.kind = kind
@ -92,7 +91,7 @@ class Variable(VariableContainer):
value: object
# TODO: evaluateName, memoryReference, presentationHint
def __init__(self, frame: "tracing.StackFrame", name: str, value: object):
def __init__(self, frame: StackFrame, name: str, value: object):
super().__init__(frame)
self.name = name
self.value = value

View file

@ -1,540 +0,0 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
import inspect
import sys
import threading
import traceback
from contextlib import contextmanager
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from sys import monitoring
from types import CodeType, FrameType
from typing import ClassVar, Dict, Iterable, List, Literal, Union
from debugpy.server import adapter
from debugpy.server.eval import Scope, VariableContainer
# Shared for all global state pertaining to breakpoints and stepping.
_cvar = threading.Condition()
# IDs of threads that are currently pausing or paused.
_pause_ids = set()
_steps = {}
@contextmanager
def cvar(who):
#print(end=f"ACQUIRING {who}\n")
with _cvar:
#print(end=f"ACQUIRED {who}\n")
yield
#print(end=f"RELEASING {who}\n")
#print(end=f"RELEASED {who}\n")
@dataclass
class Thread:
id: int = field(init=False)
thread: threading.Thread
def __post_init__(self):
# TODO: map 32-bit DAP thread IDs to (potentially) 64-bit Python thread IDs.
# Otherwise, large thread IDs (common on Linux) will be truncated when they are serialized as JSON.
self.id = self.thread.ident
def __getstate__(self):
return {
"id": self.id,
"name": self.thread.name,
}
@property
def is_traced(self):
return not getattr(self.thread, "pydev_do_not_trace", False)
@property
def name(self):
return self.thread.name
@classmethod
def enumerate(self) -> List["Thread"]:
return [
thread
for t in threading.enumerate()
for thread in [Thread(t)]
if thread.is_traced
]
@classmethod
def get(self, id: int) -> Union["Thread", None]:
for thread in self.enumerate():
if thread.id == id:
return thread
return None
def stack_trace(self) -> Iterable["StackFrame"]:
try:
(fobj,) = (fobj for (id, fobj) in sys._current_frames().items() if id == self.id)
except ValueError:
raise ValueError(f"Can't get frames for inactive Thread({self.id})")
for fobj, _ in traceback.walk_stack(fobj):
frame = StackFrame.from_frame_object(self, fobj)
if not frame.is_internal():
yield frame
@dataclass
class StackFrame:
thread: Thread
frame_object: FrameType
id: int = field(init=False)
_path: Path = field(init=False)
_scopes: List[Scope] = field(init=False, default=None)
_last_id: ClassVar[int] = 0
_all: ClassVar[Dict[int, "StackFrame"]] = {}
def __post_init__(self):
StackFrame._last_id += 1
self.id = StackFrame._last_id
self._path = None
self._all[self.id] = self
def __getstate__(self):
return {
"id": self.id,
"name": self.frame_object.f_code.co_name,
"source": {
# TODO: use "sourceReference" when path isn't available (e.g. decompiled code)
"path": str(self.path()),
},
"line": self.frame_object.f_lineno,
"column": 1, # TODO
# TODO: "endLine", "endColumn", "moduleId", "instructionPointerReference"
}
@property
def line(self) -> int:
return self.frame_object.f_lineno
def path(self) -> Path:
if self._path is None:
path = Path(self.frame_object.f_code.co_filename)
try:
path = path.resolve()
except (OSError, RuntimeError):
pass
# No need to sync this.
self._path = path
return self._path
def is_internal(self) -> bool:
# TODO: filter internal frames properly
parts = self.path().parts
internals = ["debugpy", "threading"]
return any(part.startswith(s) for s in internals for part in parts)
@classmethod
def get(self, id: int) -> "StackFrame":
return self._all.get(id, None)
@classmethod
def from_frame_object(self, thread: Thread, frame_object: FrameType) -> "StackFrame":
for frame in self._all.values():
if frame.thread.id == thread.id and frame.frame_object is frame_object:
return frame
return StackFrame(thread, frame_object)
def scopes(self) -> List[Scope]:
if self._scopes is None:
self._scopes = [
Scope(self.frame_object, "local"),
Scope(self.frame_object, "global"),
]
return self._scopes
@classmethod
def invalidate(self, thread_id: int):
frames = [frame for frame in self._all.values() if frame.thread.id == thread_id]
VariableContainer.invalidate(*frames)
@dataclass
class Step:
step: Literal["in", "out", "over"]
origin: FrameType = None
origin_line: int = None
@dataclass
class Breakpoint:
path: Path
line: int
is_enabled: bool = True
id: int = field(init=False)
_last_id: ClassVar[int] = 0
_all: ClassVar[Dict[int, "Breakpoint"]] = {}
_at: ClassVar[Dict[Path, Dict[int, List["Breakpoint"]]]] = defaultdict(
lambda: defaultdict(lambda: [])
)
def __post_init__(self):
Breakpoint._last_id += 1
self.id = Breakpoint._last_id
with cvar(1):
self._all[self.id] = self
self._at[self.path][self.line].append(self)
_cvar.notify_all()
def __getstate__(self):
return {
"line": self.line,
"verified": True, # TODO
}
def is_hit(self, frame: StackFrame):
with cvar(2):
# Check path last since path resolution is potentially expensive.
return (
self.is_enabled
and frame.line == self.line
and frame.path() == self.path
)
@classmethod
def at(self, path: str, line: int) -> List["Breakpoint"]:
with cvar(3):
return self._at[path][line]
@classmethod
def clear(self, paths: Iterable[str] = None):
#print("clear-bp", paths)
if paths is not None:
paths = [Path(path).resolve() for path in paths]
with cvar(4):
if paths is None:
paths = list(self._at.keys())
for path in paths:
bps_in = self._at.pop(path, {}).values()
for bps_at in bps_in:
for bp in bps_at:
del self._all[bp.id]
_cvar.notify_all()
monitoring.restart_events()
@classmethod
def set(self, path: str, line: int) -> "Breakpoint":
try:
path = Path(path).resolve()
except (OSError, RuntimeError):
pass
#print("set-bp", path, line)
bp = Breakpoint(path, line)
monitoring.restart_events()
return bp
def enable(self, is_enabled: bool):
with cvar(5):
self.is_enabled = is_enabled
_cvar.notify_all()
def start():
for thread in Thread.enumerate():
adapter().channel.send_event(
"thread",
{
"reason": "started",
"threadId": thread.id,
"name": thread.name,
},
)
monitoring.use_tool_id(monitoring.DEBUGGER_ID, "debugpy")
monitoring.set_events(
monitoring.DEBUGGER_ID,
(
monitoring.events.LINE
| monitoring.events.PY_START
| monitoring.events.PY_RETURN
| monitoring.events.PY_RESUME
| monitoring.events.PY_YIELD
| monitoring.events.PY_THROW
| monitoring.events.PY_UNWIND
| monitoring.events.RAISE
| monitoring.events.RERAISE
| monitoring.events.EXCEPTION_HANDLED
),
)
trace_funcs = {
monitoring.events.LINE: _trace_line,
monitoring.events.PY_START: _trace_py_start,
monitoring.events.PY_RESUME: _trace_py_resume,
monitoring.events.PY_RETURN: _trace_py_return,
monitoring.events.PY_YIELD: _trace_py_yield,
monitoring.events.PY_THROW: _trace_py_throw,
monitoring.events.PY_UNWIND: _trace_py_unwind,
monitoring.events.RAISE: _trace_raise,
monitoring.events.RERAISE: _trace_reraise,
monitoring.events.EXCEPTION_HANDLED: _trace_exception_handled,
}
for event, func in trace_funcs.items():
monitoring.register_callback(monitoring.DEBUGGER_ID, event, func)
def pause(thread_ids: List[int] = None):
#print(f"PAUSE {thread_ids=}")
if thread_ids is None:
thread_ids = [thread.id for thread in Thread.enumerate()]
# TODO: handle race between the above and new threads starting when doing pause-the-world.
with cvar(6):
_pause_ids.update(thread_ids)
_cvar.notify_all()
monitoring.restart_events()
def resume(thread_ids: List[int] = None):
#print(f"RESUME {thread_ids=}")
with cvar(7):
if thread_ids is None:
_pause_ids.clear()
else:
_pause_ids.difference_update(thread_ids)
_cvar.notify_all()
monitoring.restart_events()
def abandon_step(thread_ids: List[int] = None):
#print(f"ABANDON_STEP {thread_ids=}")
with cvar(8):
if thread_ids is None:
thread_ids = [thread.id for thread in Thread.enumerate()]
for thread_id in thread_ids:
_steps.pop(thread_id, None)
_cvar.notify_all()
monitoring.restart_events()
def step_in(thread_id: int):
with cvar(9):
_steps[thread_id] = Step("in")
_pause_ids.clear()
_cvar.notify_all()
monitoring.restart_events()
def step_out(thread_id: int):
with cvar(10):
_steps[thread_id] = Step("out")
_pause_ids.clear()
_cvar.notify_all()
monitoring.restart_events()
def step_over(thread_id: int):
with cvar(11):
_steps[thread_id] = Step("over")
_pause_ids.clear()
_cvar.notify_all()
monitoring.restart_events()
# On shutdown, modules go away (become None), but _trace_line is still invoked.
DISABLE = monitoring.DISABLE
def _stop(frame_obj: FrameType, reason: str, hit_breakpoints: Iterable[Breakpoint] = ()):
thread_id = threading.get_ident()
#print(f"STOP {thread_id=}, {reason=}, {hit_breakpoints=}")
with cvar(12):
if thread_id not in _pause_ids:
#print("STOP: not paused")
return
#print("SENDING...")
adapter().channel.send_event(
"stopped",
{
"reason": reason,
"threadId": threading.get_ident(),
"allThreadsStopped": False, # TODO
"hitBreakpointIds": [bp.id for bp in hit_breakpoints],
},
)
#print("SENT!")
#print(f"BLOCK {thread_id=}")
while thread_id in _pause_ids:
_cvar.wait()
#print(f"UNBLOCK {thread_id=}")
step = _steps.get(thread_id, None)
if step is not None and step.origin is None:
step.origin = frame_obj
step.origin_line = frame_obj.f_lineno
def _trace_line(code: CodeType, line_number: int):
if monitoring is None:
return DISABLE
thread = Thread(threading.current_thread())
if not thread.is_traced:
return DISABLE
stop_reason = None
with cvar(13):
if thread.id in _pause_ids:
stop_reason = "pause"
step = _steps.get(thread.id, None)
is_stepping = step is not None and step.origin is not None
if is_stepping:
# TODO: use CALL/RETURN/PY_RETURN to track these more efficiently.
frame_obj = inspect.currentframe().f_back
step_finished = False
if step.step == "in":
if frame_obj is not step.origin or line_number != step.origin_line:
step_finished = True
elif step.step == "out":
step_finished = True
while frame_obj is not None:
if frame_obj is step.origin:
step_finished = False
break
frame_obj = frame_obj.f_back
elif step.step == "over":
step_finished = True
while frame_obj is not None:
if frame_obj is step.origin and frame_obj.f_lineno == step.origin_line:
step_finished = False
break
frame_obj = frame_obj.f_back
else:
raise ValueError(f"Unknown step type: {step.step}")
if step_finished:
del _steps[thread.id]
_pause_ids.add(thread.id)
_cvar.notify_all()
stop_reason = "step"
if stop_reason is not None:
return _stop(inspect.currentframe().f_back, stop_reason)
path = Path(code.co_filename)
try:
path = path.resolve()
except (OSError, RuntimeError):
pass
# print(f"TRACE_LINE {thread_id=}, {path=}, {line_number=}")
bps = Breakpoint.at(path, line_number)
if not bps and not is_stepping:
return DISABLE
frame = StackFrame(thread, inspect.currentframe().f_back)
try:
bps_hit = [bp for bp in bps if bp.is_hit(frame)]
if bps_hit:
#print("!BREAKPOINT HIT!")
with cvar(14):
_pause_ids.add(thread.id)
_cvar.notify_all()
return _stop(frame.frame_object, "breakpoint", bps_hit)
finally:
del frame
def _trace_py_start(code: CodeType, ip: int):
if threading.current_thread() is not threading.main_thread():
return
#print(f"TRACE_PY_START {code=}, {ip=}")
def _trace_py_resume(code: CodeType, ip: int):
if threading.current_thread() is not threading.main_thread():
return
#print(f"TRACE_PY_RESUME {code=}, {ip=}")
def _trace_py_return(code: CodeType, ip: int, retval: object):
if threading.current_thread() is not threading.main_thread():
return
try:
retval = repr(retval)
except:
retval = "<unrepresentable>"
#print(f"TRACE_PY_RETURN {code=}, {ip=}, {retval=}")
def _trace_py_yield(code: CodeType, ip: int, retval: object):
if threading.current_thread() is not threading.main_thread():
return
try:
retval = repr(retval)
except:
retval = "<unrepresentable>"
#print(f"TRACE_PY_YIELD {code=}, {ip=}, {retval=}")
def _trace_py_throw(code: CodeType, ip: int, exc: BaseException):
if threading.current_thread() is not threading.main_thread():
return
try:
exc = repr(exc)
except:
exc = "<unrepresentable>"
#print(f"TRACE_PY_THROW {code=}, {ip=}, {exc=}")
def _trace_py_unwind(code: CodeType, ip: int, exc: BaseException):
if threading.current_thread() is not threading.main_thread():
return
try:
exc = repr(exc)
except:
exc = "<unrepresentable>"
#print(f"TRACE_PY_UNWIND {code=}, {ip=}, {exc=}")
def _trace_raise(code: CodeType, ip: int, exc: BaseException):
if threading.current_thread() is not threading.main_thread():
return
try:
exc = repr(exc)
except:
exc = "<unrepresentable>"
#print(f"TRACE_RAISE {code=}, {ip=}, {exc=}")
def _trace_reraise(code: CodeType, ip: int, exc: BaseException):
if threading.current_thread() is not threading.main_thread():
return
try:
exc = repr(exc)
except:
exc = "<unrepresentable>"
#print(f"TRACE_RERAISE {code=}, {ip=}, {exc=}")
def _trace_exception_handled(code: CodeType, ip: int, exc: BaseException):
if threading.current_thread() is not threading.main_thread():
return
try:
exc = repr(exc)
except:
exc = "<unrepresentable>"
#print(f"TRACE_EXCEPTION_HANDLED {code=}, {ip=}, {exc=}")

View file

@ -0,0 +1,531 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
import re
import sys
import threading
import traceback
from collections import defaultdict
from dataclasses import dataclass
from debugpy import server
from debugpy.common import log
from debugpy.server.eval import Scope, VariableContainer
from pathlib import Path
from sys import monitoring
from types import CodeType, FrameType
from typing import Callable, ClassVar, Dict, Iterable, List, Literal, Union
# Shared for all global state pertaining to breakpoints and stepping.
_cvar = threading.Condition()
class Thread:
"""
Represents a DAP Thread object. Instances must never be created directly;
use Thread.from_python_thread() instead.
"""
id: int
"""DAP ID of this thread. Distinct from thread.ident."""
python_thread: threading.Thread
"""The Python thread object this DAP Thread represents."""
is_known_to_adapter: bool
"""
Whether this thread has been reported to the adapter via the
DAP "thread" event with "reason":"started".
"""
_last_id = 0
_all: ClassVar[Dict[int, "Thread"]] = {}
def __init__(self, python_thread):
"""
Create a new Thread object for the given thread. Do not invoke directly;
use Thread.get() instead.
"""
self.python_thread = python_thread
self.is_known_to_adapter = False
with _cvar:
# Thread IDs are serialized as JSON numbers in DAP, which are handled as 64-bit
# floats by most DAP clients. However, OS thread IDs can be large 64-bit integers
# on some platforms. To avoid loss of precision, we map all thread IDs to 32-bit
# signed integers; if the original ID fits, we use it as is, otherwise we use a
# generated negative ID that is guaranteed to fit.
self.id = self.python_thread.ident
if self.id != float(self.id):
Thread._last_id -= 1
self.id = Thread._last_id
self._all[self.id] = self
log.info(
f"DAP Thread(id={self.id}) created for Python Thread(ident={self.python_thread.ident})"
)
def __getstate__(self):
return {
"id": self.id,
"name": self.name,
}
@property
def is_debugpy_thread(self):
return getattr(self.python_thread, "is_debugpy_thread", False)
@property
def is_traced(self):
return not self.is_debugpy_thread
@property
def name(self):
return self.python_thread.name
@classmethod
def from_python_thread(self, python_thread: threading.Thread = None) -> "Thread":
"""
Returns the DAP Thread object corresponding to the given Python thread, or for
the current Python thread if None, creating it and reporting it to adapter if
necessary.
"""
if python_thread is None:
python_thread = threading.current_thread()
with _cvar:
for thread in self._all.values():
if thread.python_thread is python_thread:
break
else:
thread = Thread(python_thread)
thread.make_known_to_adapter()
return thread
@classmethod
def get(self, id: int) -> Union["Thread", None]:
"""
Finds a thread by its DAP ID. Returns None if ID is unknown.
"""
with _cvar:
return self._all.get(id, None)
@classmethod
def enumerate(self) -> List["Thread"]:
"""
Returns a list of all running threads in this process.
"""
return [
thread
for python_thread in threading.enumerate()
for thread in [Thread.from_python_thread(python_thread)]
if thread.is_traced
]
def make_known_to_adapter(self):
"""
If adapter is connected to this process, reports this thread to it via DAP
"thread" event with "reason":"started" if it hasn't been reported already.
Returns True if thread is now known to the adapter, and False if there was
no adapter to report it to.
"""
with _cvar:
if not self.is_traced:
return False
if self.is_known_to_adapter:
return True
adapter = server.adapter()
if adapter is None:
return False
adapter.channel.send_event(
"thread",
{
"reason": "started",
"threadId": self.id,
"name": self.name,
},
)
self.is_known_to_adapter = True
return True
def stack_trace(self) -> Iterable["StackFrame"]:
"""
Returns an iterable of StackFrame objects for the current stack of this thread,
starting with the topmost frame.
"""
try:
(fobj,) = (
fobj for (id, fobj) in sys._current_frames().items() if id == self.id
)
except ValueError:
raise ValueError(f"Can't get frames for inactive Thread({self.id})")
for fobj, _ in traceback.walk_stack(fobj):
frame = StackFrame.from_frame_object(self, fobj)
if not frame.is_internal():
yield frame
class StackFrame:
"""
Represents a DAP StackFrame object. Instances must never be created directly;
use StackFrame.from_frame_object() instead.
"""
thread: Thread
frame_object: FrameType
id: int
_path: Path
_scopes: List[Scope]
_last_id = 0
_all: ClassVar[Dict[int, "StackFrame"]] = {}
def __init__(self, thread: Thread, frame_object: FrameType):
"""
Create a new StackFrame object for the given thread and frame object. Do not
invoke directly; use StackFrame.from_frame_object() instead.
"""
StackFrame._last_id += 1
self.id = StackFrame._last_id
self.thread = thread
self.frame_object = frame_object
self._path = None
self._scopes = None
self._all[self.id] = self
def __getstate__(self):
return {
"id": self.id,
"name": self.frame_object.f_code.co_name,
"source": {
# TODO: use "sourceReference" when path isn't available (e.g. decompiled code)
"path": str(self.path()),
},
"line": self.frame_object.f_lineno,
"column": 1, # TODO
# TODO: "endLine", "endColumn", "moduleId", "instructionPointerReference"
}
@property
def line(self) -> int:
return self.frame_object.f_lineno
def path(self) -> Path:
if self._path is None:
path = Path(self.frame_object.f_code.co_filename)
try:
path = path.resolve()
except (OSError, RuntimeError):
pass
# No need to sync this since all instances are equivalent.
self._path = path
return self._path
def is_internal(self) -> bool:
# TODO: filter internal frames properly
parts = self.path().parts
internals = ["debugpy", "threading"]
return any(part.startswith(s) for s in internals for part in parts)
@classmethod
def from_frame_object(
self, thread: Thread, frame_object: FrameType
) -> "StackFrame":
for frame in self._all.values():
if frame.thread is thread and frame.frame_object is frame_object:
return frame
return StackFrame(thread, frame_object)
@classmethod
def get(self, id: int) -> "StackFrame":
return self._all.get(id, None)
def scopes(self) -> List[Scope]:
if self._scopes is None:
self._scopes = [
Scope(self.frame_object, "local"),
Scope(self.frame_object, "global"),
]
return self._scopes
@classmethod
def invalidate(self, thread_id: int):
frames = [frame for frame in self._all.values() if frame.thread.id == thread_id]
VariableContainer.invalidate(*frames)
@dataclass
class Step:
step: Literal["in", "out", "over"]
origin: FrameType = None
origin_line: int = None
class Condition:
"""
Expression that must be true for the breakpoint to be triggered.
"""
expression: str
"""Python expression that must evaluate to True for the breakpoint to be triggered."""
_code: CodeType
def __init__(self, breakpoint: "Breakpoint", expression: str):
self.expression = expression
self._code = compile(
expression, f"breakpoint-{breakpoint.id}-condition", "eval"
)
def test(self, frame: StackFrame) -> bool:
"""
Returns True if the breakpoint should be triggered in the specified frame.
"""
try:
return bool(
eval(
self._code,
frame.frame_object.f_globals,
frame.frame_object.f_locals,
)
)
except:
log.exception(
f"Exception while evaluating breakpoint condition: {self.expression}"
)
return False
class HitCondition:
"""
Hit count expression that must be True for the breakpoint to be triggered.
Must have the format `[<operator>]<count>`, where <count> is a positive integer literal,
and <operator> is one of `==` `>` `>=` `<` `<=` `%`, defaulting to `==` if unspecified.
Examples:
5: break on the 5th hit
==5: ditto
>5: break on every hit after the 5th
>=5: break on the 5th hit and thereafter
%5: break on every 5th hit
"""
_OPERATORS = {
"==": lambda expected_count, count: count == expected_count,
">": lambda expected_count, count: count > expected_count,
">=": lambda expected_count, count: count >= expected_count,
"<": lambda expected_count, count: count < expected_count,
"<=": lambda expected_count, count: count <= expected_count,
"%": lambda expected_count, count: count % expected_count == 0,
}
hit_condition: str
_count: int
_operator: Callable[[int, int], bool]
def __init__(self, hit_condition: str):
self.hit_condition = hit_condition
m = re.match(r"([<>=]+)?(\d+)", hit_condition)
if not m:
raise ValueError(f"Invalid hit condition: {hit_condition}")
self._count = int(m.group(2))
try:
op = self._OPERATORS[m.group(1) or "=="]
except KeyError:
raise ValueError(f"Invalid hit condition operator: {op}")
self.test = lambda count: op(self._count, count)
def test(self, count: int) -> bool:
"""
Returns True if the breakpoint should be triggered on the given hit count.
"""
# __init__ replaces this method with an actual implementation from _OPERATORS
# when it parses the condition.
raise NotImplementedError
class LogMessage:
"""
A message with spliced expressions, to be logged when a breakpoint is triggered.
"""
message: str
"""The message to be logged. May contain expressions in curly braces."""
_code: CodeType
"""Compiled code object for the f-string corresponding to the message."""
def __init__(self, breakpoint: "Breakpoint", message: str):
self.message = message
f_string = "f" + repr(message)
self._code = compile(f_string, f"breakpoint-{breakpoint.id}-logMessage", "eval")
def format(self, frame: StackFrame) -> str:
"""
Formats the message using the specified frame's locals and globals.
"""
try:
return eval(
self._code, frame.frame_object.f_globals, frame.frame_object.f_locals
)
except:
log.exception(
f"Exception while formatting breakpoint log message: {self.message}"
)
return self.message
class Breakpoint:
"""
Represents a DAP Breakpoint.
"""
id: int
path: Path
line: int
is_enabled: bool
condition: Condition | None
hit_condition: HitCondition | None
log_message: LogMessage | None
hit_count: int
"""Number of times this breakpoint has been hit."""
_last_id = 0
_all: ClassVar[Dict[int, "Breakpoint"]] = {}
_at: ClassVar[Dict[Path, Dict[int, List["Breakpoint"]]]] = defaultdict(
lambda: defaultdict(lambda: [])
)
def __init__(
self, path, line, *, condition=None, hit_condition=None, log_message=None
):
with _cvar:
Breakpoint._last_id += 1
self.id = Breakpoint._last_id
self.path = path
self.line = line
self.is_enabled = True
self.condition = Condition(self, condition) if condition else None
self.hit_condition = HitCondition(hit_condition) if hit_condition else None
self.log_message = LogMessage(self, log_message) if log_message else None
self.hit_count = 0
with _cvar:
self._all[self.id] = self
self._at[self.path][self.line].append(self)
_cvar.notify_all()
def __getstate__(self):
return {
"line": self.line,
"verified": True, # TODO
}
@classmethod
def at(self, path: str, line: int) -> List["Breakpoint"]:
"""
Returns a list of all breakpoints at the specified location.
"""
with _cvar:
return self._at[path][line]
@classmethod
def clear(self, paths: Iterable[str] = None):
"""
Removes all breakpoints in the specified files, or all files if None.
"""
if paths is not None:
paths = [Path(path).resolve() for path in paths]
with _cvar:
if paths is None:
paths = list(self._at.keys())
for path in paths:
bps_in = self._at.pop(path, {}).values()
for bps_at in bps_in:
for bp in bps_at:
del self._all[bp.id]
_cvar.notify_all()
monitoring.restart_events()
@classmethod
def set(
self,
path: str,
line: int,
*,
condition=None,
hit_condition=None,
log_message=None,
) -> "Breakpoint":
"""
Creates a new breakpoint at the specified location.
"""
try:
path = Path(path).resolve()
except (OSError, RuntimeError):
pass
bp = Breakpoint(
path,
line,
condition=condition,
hit_condition=hit_condition,
log_message=log_message,
)
monitoring.restart_events()
return bp
def enable(self, is_enabled: bool):
"""
Enables or disables this breakpoint.
"""
with _cvar:
self.is_enabled = is_enabled
_cvar.notify_all()
def is_triggered(self, frame: StackFrame) -> bool | str:
"""
Determines whether this breakpoint is triggered by the current line in the
specified stack frame, and updates its hit count.
If the breakpoint is triggered, returns a truthy value; if the breakpoint has
a log message, it is formatted and returned, otherwise True is returned.
"""
with _cvar:
# Check path last since path resolution is potentially expensive.
if (
not self.is_enabled
or frame.line != self.line
or frame.path() != self.path
):
return False
# Hit count must be updated even if conditions are false and execution
# isn't stopped.
self.hit_count += 1
# Check hit_condition first since it is faster than checking condition.
if self.hit_condition is not None and not self.hit_condition.test(
self.hit_count
):
return False
if self.condition is not None and not self.condition.test(frame):
return False
# If this is a logpoint, return the formatted message instead of True.
if self.log_message is not None:
return self.log_message.format(frame)
return True
# sys.monitoring callbacks are defined in a separate submodule to enable tighter
# control over their use of global state; see comment there for details.
from .tracer import Tracer # noqa

View file

@ -0,0 +1,417 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
# Once callbacks are registered they are invoked even during finalization when the
# Python is shutting down. Thus, trace_* methods, and any other methods that they
# invoke, must not use any globals from this or other modules (including globals
# that represent imported modules or defined classes!) until it checks that they
# are present, or preload them into class or instance attributes in advance.
# To facilitate this, Tracer is defined in a separate submodule which should not
# contain ANY top-level imports other than typing nor definitions other than the
# class itself. All other imports must be done in class scope and then referred to
# from methods via self.
from types import CodeType, FrameType
from typing import Iterable
class Log:
"""
Safe logging for Tracer. Delegates to debugpy.common.log, but only when it is
safe to do so (i.e. not during finalization).
"""
from debugpy.common import log
def __init__(self):
import atexit
def nop(*args, **kwargs):
pass
@atexit.register
def disable():
self.debug = self.info = self.warning = self.error = self.exception = nop
def debug(self, *args, **kwargs):
# self.log.debug("{0}", *args, **kwargs)
# print(*args)
pass
def info(self, *args, **kwargs):
self.log.info("{0}", *args, **kwargs)
def warning(self, *args, **kwargs):
self.log.warning("{0}", *args, **kwargs)
def error(self, *args, **kwargs):
self.log.error("{0}", *args, **kwargs)
def exception(self, *args, **kwargs):
self.log.exception("{0}", *args, **kwargs)
class Tracer:
"""
Singleton that manages sys.monitoring callbacks for this process.
"""
import inspect
import threading
from debugpy import server
from debugpy.server.tracing import Breakpoint, Step, Thread, StackFrame, _cvar
from pathlib import Path
from sys import monitoring
instance: "Tracer"
log: Log
_pause_ids = set()
"""IDs of threads that are currently pausing or paused."""
_steps = {}
"""Ongoing steps, keyed by thread ID."""
def __init__(self):
self.log = Log()
@property
def adapter(self):
return self.server.adapter()
def start(self):
"""
Register sys.monitoring tracing callbacks.
"""
self.log.info("Registering sys.monitoring tracing callbacks...")
self.monitoring.use_tool_id(self.monitoring.DEBUGGER_ID, "debugpy")
self.monitoring.set_events(
self.monitoring.DEBUGGER_ID,
(
self.monitoring.events.LINE
| self.monitoring.events.PY_START
| self.monitoring.events.PY_RETURN
| self.monitoring.events.PY_RESUME
| self.monitoring.events.PY_YIELD
| self.monitoring.events.PY_THROW
| self.monitoring.events.PY_UNWIND
| self.monitoring.events.RAISE
| self.monitoring.events.RERAISE
| self.monitoring.events.EXCEPTION_HANDLED
),
)
trace_funcs = {
self.monitoring.events.LINE: self._trace_line,
self.monitoring.events.PY_START: self._trace_py_start,
self.monitoring.events.PY_RESUME: self._trace_py_resume,
self.monitoring.events.PY_RETURN: self._trace_py_return,
self.monitoring.events.PY_YIELD: self._trace_py_yield,
self.monitoring.events.PY_THROW: self._trace_py_throw,
self.monitoring.events.PY_UNWIND: self._trace_py_unwind,
self.monitoring.events.RAISE: self._trace_raise,
self.monitoring.events.RERAISE: self._trace_reraise,
self.monitoring.events.EXCEPTION_HANDLED: self._trace_exception_handled,
}
for event, func in trace_funcs.items():
self.monitoring.register_callback(self.monitoring.DEBUGGER_ID, event, func)
self.log.info("sys.monitoring tracing callbacks registered.")
def pause(self, thread_ids: Iterable[int] = None):
"""
Pause the specified threads, or all threads if thread_ids is None.
"""
if thread_ids is None:
# Pausing is async, so additional threads may be spawned even as we are
# trying to pause the ones we currently know about; iterate until all
# known threads are paused, and no new threads appear.
while True:
thread_ids = {thread.id for thread in self.Thread.enumerate()}
if self._pause_ids.keys() == thread_ids:
return
self.pause(thread_ids)
else:
self.log.info(f"Pausing threads: {thread_ids}")
with self._cvar:
self._pause_ids.update(thread_ids)
self._cvar.notify_all()
self.monitoring.restart_events()
def resume(self, thread_ids: Iterable[int] = None):
"""
Resume the specified threads, or all threads if thread_ids is None.
"""
with self._cvar:
if thread_ids is None:
self.log.info("Resuming all threads.")
self._pause_ids.clear()
else:
self.log.info(f"Resuming threads: {thread_ids}")
self._pause_ids.difference_update(thread_ids)
self._cvar.notify_all()
self.monitoring.restart_events()
def abandon_step(self, thread_ids: Iterable[int] = None):
"""
Abandon any ongoing steps that are in progress on the specified threads
(all threads if thread_ids is None).
"""
with self._cvar:
if thread_ids is None:
thread_ids = [thread.id for thread in self.Thread.enumerate()]
for thread_id in thread_ids:
step = self._steps.pop(thread_id, None)
if step is not None:
self.log.info(f"Abandoned step-{step.step} on {thread_id}.")
self._cvar.notify_all()
self.monitoring.restart_events()
def step_in(self, thread_id: int):
"""
Step into the next statement executed by the specified thread.
"""
self.log.info(f"Step in on thread {thread_id}.")
with self._cvar:
self._steps[thread_id] = self.Step("in")
self._pause_ids.clear()
self._cvar.notify_all()
self.monitoring.restart_events()
def step_out(self, thread_id: int):
"""
Step out of the current function executed by the specified thread.
"""
self.log.info(f"Step out on thread {thread_id}.")
with self._cvar:
self._steps[thread_id] = self.Step("out")
self._pause_ids.clear()
self._cvar.notify_all()
self.monitoring.restart_events()
def step_over(self, thread_id: int):
self.log.info(f"Step over on thread {thread_id}.")
"""
Step over the next statement executed by the specified thread.
"""
with self._cvar:
self._steps[thread_id] = self.Step("over")
self._pause_ids.clear()
self._cvar.notify_all()
self.monitoring.restart_events()
def _stop(
self,
frame_obj: FrameType,
reason: str,
hit_breakpoints: Iterable[Breakpoint] = (),
):
thread = self.Thread.from_python_thread()
self.log.info(f"Pausing thread {thread.id}: {reason}.")
with self._cvar:
if thread.id not in self._pause_ids:
return
self.adapter.channel.send_event(
"stopped",
{
"reason": reason,
"threadId": thread.id,
"allThreadsStopped": False, # TODO
"hitBreakpointIds": [bp.id for bp in hit_breakpoints],
},
)
self.log.info(f"Thread {thread.id} paused.")
while thread.id in self._pause_ids:
self._cvar.wait()
self.log.info(f"Thread {thread.id} unpaused.")
step = self._steps.get(thread.id, None)
if step is not None and step.origin is None:
step.origin = frame_obj
step.origin_line = frame_obj.f_lineno
def _trace_line(self, code: CodeType, line_number: int):
thread = self.Thread.from_python_thread()
if not thread.is_traced:
return self.monitoring.DISABLE
self.log.debug(f"sys.monitoring event: LINE({line_number}, {code})")
frame_obj = self.inspect.currentframe().f_back
stop_reason = None
with self._cvar:
if thread.id in self._pause_ids:
stop_reason = "pause"
step = self._steps.get(thread.id, None)
is_stepping = step is not None and step.origin is not None
if not is_stepping:
self.log.debug(f"No step in progress on thread {thread.id}.")
else:
self.log.debug(
f"Tracing step-{step.step} originating from {step.origin} on thread {thread.id}."
)
# TODO: use CALL/RETURN/PY_RETURN to track these more efficiently.
step_finished = False
if step.step == "in":
if frame_obj is not step.origin or line_number != step.origin_line:
step_finished = True
elif step.step == "out":
step_finished = True
while frame_obj is not None:
if frame_obj is step.origin:
step_finished = False
break
frame_obj = frame_obj.f_back
elif step.step == "over":
step_finished = True
while frame_obj is not None:
if (
frame_obj is step.origin
and frame_obj.f_lineno == step.origin_line
):
step_finished = False
break
frame_obj = frame_obj.f_back
else:
raise ValueError(f"Unknown step type: {step.step}")
if step_finished:
self.log.info(f"Step-{step.step} finished on thread {thread.id}.")
del self._steps[thread.id]
self._pause_ids.add(thread.id)
self._cvar.notify_all()
stop_reason = "step"
if stop_reason is not None:
# Even if this thread is pausing, any debugpy internal code on it should
# keep running until it returns to user code; otherwise, it may deadlock
# if it was holding e.g. a messaging lock.
print(frame_obj.f_globals.get("__name__"))
if not frame_obj.f_globals.get("__name__", "").startswith("debugpy"):
return self._stop(frame_obj, stop_reason)
self.log.debug(f"Resolving path {code.co_filename!r}...")
path = self.Path(code.co_filename)
try:
path = path.resolve()
except (OSError, RuntimeError):
pass
self.log.debug(f"Path {code.co_filename!r} resolved to {path}.")
bps = self.Breakpoint.at(path, line_number)
if not bps and not is_stepping:
self.log.debug(f"No breakpoints at {path}:{line_number}.")
return self.monitoring.DISABLE
self.log.debug(f"Considering breakpoints: {[bp.__getstate__() for bp in bps]}.")
frame = self.StackFrame(thread, self.inspect.currentframe().f_back)
try:
stop_bps = []
for bp in bps:
match bp.is_triggered(frame):
case str() as message:
# Triggered, has logMessage - print it but don't stop.
self.adapter.channel.send_event(
"output",
{
"category": "console",
"output": message,
"line": line_number,
"source": {"path": path},
},
)
case triggered if triggered:
# Triggered, no logMessage - stop.
stop_bps.append(bp)
case _:
continue
if stop_bps:
self.log.info(
f"Stack frame {frame} stopping at breakpoints {[bp.__getstate__() for bp in stop_bps]}."
)
with self._cvar:
self._pause_ids.add(thread.id)
self._cvar.notify_all()
return self._stop(frame.frame_object, "breakpoint", stop_bps)
finally:
del frame
def _trace_py_start(self, code: CodeType, ip: int):
thread = self.Thread.from_python_thread()
if not thread.is_traced:
return self.monitoring.DISABLE
self.log.debug(f"sys.monitoring event: PY_START({code}, {ip})")
def _trace_py_resume(self, code: CodeType, ip: int):
thread = self.Thread.from_python_thread()
if not thread.is_traced:
return self.monitoring.DISABLE
self.log.debug(f"sys.monitoring event: PY_RESUME({code}, {ip})")
def _trace_py_return(self, code: CodeType, ip: int, retval: object):
thread = self.Thread.from_python_thread()
if not thread.is_traced:
return self.monitoring.DISABLE
self.log.debug(f"sys.monitoring event: PY_RETURN({code}, {ip})")
# TODO: capture returned value to report it when client requests locals.
pass
def _trace_py_yield(self, code: CodeType, ip: int, retval: object):
thread = self.Thread.from_python_thread()
if not thread.is_traced:
return self.monitoring.DISABLE
self.log.debug(f"sys.monitoring event: PY_YIELD({code}, {ip})")
# TODO: capture yielded value to report it when client requests locals.
pass
def _trace_py_throw(self, code: CodeType, ip: int, exc: BaseException):
thread = self.Thread.from_python_thread()
if not thread.is_traced:
return
self.log.debug(
f"sys.monitoring event: PY_THROW({code}, {ip}, {type(exc).__qualname__})"
)
def _trace_py_unwind(self, code: CodeType, ip: int, exc: BaseException):
thread = self.Thread.from_python_thread()
if not thread.is_traced:
return
self.log.debug(
f"sys.monitoring event: PY_UNWIND({code}, {ip}, {type(exc).__qualname__})"
)
def _trace_raise(self, code: CodeType, ip: int, exc: BaseException):
thread = self.Thread.from_python_thread()
if not thread.is_traced:
return
self.log.debug(
f"sys.monitoring event: RAISE({code}, {ip}, {type(exc).__qualname__})"
)
def _trace_reraise(self, code: CodeType, ip: int, exc: BaseException):
thread = self.Thread.from_python_thread()
if not thread.is_traced:
return
self.log.debug(
f"sys.monitoring event: RERAISE({code}, {ip}, {type(exc).__qualname__})"
)
def _trace_exception_handled(self, code: CodeType, ip: int, exc: BaseException):
thread = self.Thread.from_python_thread()
if not thread.is_traced:
return
self.log.debug(
f"sys.monitoring event: EXCEPTION_HANDLED({code}, {ip}, {type(exc).__qualname__})"
)
Tracer.instance = Tracer()