Fix race conditions.

This commit is contained in:
Pavel Minaev 2024-02-25 21:22:50 -08:00
parent 256967225e
commit 79d86b9510
4 changed files with 478 additions and 344 deletions

View file

@ -2,6 +2,18 @@
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
import itertools
# Unique IDs for DAP objects such as threads, variables, breakpoints etc. These are
# negative to allow for pre-existing OS-assigned IDs (which are positive) to be used
# where available, e.g. for threads.
_dap_ids = itertools.count(-1, -1)
def new_dap_id():
"""Returns the next unique ID."""
return next(_dap_ids)
def adapter():
"""

View file

@ -10,8 +10,17 @@ from itertools import islice
from debugpy.adapter import components
from debugpy.common import json, log, messaging, sockets
from debugpy.common.messaging import MessageDict, Request
from debugpy.server import eval
from debugpy.server.tracing import Breakpoint, StackFrame, Thread, Tracer
from debugpy.server import eval, new_dap_id
from debugpy.server.tracing import (
Breakpoint,
Condition,
HitCondition,
LogMessage,
Source,
StackFrame,
Thread,
Tracer,
)
class Adapter:
@ -125,18 +134,20 @@ class Adapter:
"default": False,
"description": "Break whenever any exception is raised.",
},
{
"filter": "uncaught",
"label": "Uncaught Exceptions",
"default": True,
"description": "Break when the process is exiting due to unhandled exception.",
},
{
"filter": "userUnhandled",
"label": "User Uncaught Exceptions",
"default": False,
"description": "Break when exception escapes into library code.",
},
# TODO: https://github.com/microsoft/debugpy/issues/1453
# {
# "filter": "uncaught",
# "label": "Uncaught Exceptions",
# "default": True,
# "description": "Break when the process is exiting due to unhandled exception.",
# },
# TODO: https://github.com/microsoft/debugpy/issues/1454
# {
# "filter": "userUnhandled",
# "label": "User Uncaught Exceptions",
# "default": False,
# "description": "Break when exception escapes into library code.",
# },
]
return {
@ -219,8 +230,7 @@ class Adapter:
def setBreakpoints_request(self, request: Request):
# TODO: implement source.reference for setting breakpoints in sources for
# which source code was decompiled or retrieved via inspect.getsource.
source = request("source", json.object())
path = source("path", str)
source = Source(request("source", json.object())("path", str))
# TODO: implement column support.
# Use dis.get_instruction() to iterate over instructions and corresponding
@ -236,15 +246,66 @@ class Adapter:
lines = request("lines", json.array(int))
bps = [MessageDict(request, {"line": line}) for line in lines]
Breakpoint.clear([path])
Breakpoint.clear([source])
# Do the first pass validating conditions and log messages for syntax errors; if
# any breakpoint fails validation, we want to respond with an error right away
# so that user gets immediate feedback, but this also means that we shouldn't
# actually set any breakpoints until we've validated all of them.
bps_info = []
for bp in bps:
id = new_dap_id()
line = bp("line", int)
# A missing condition or log message can be represented as the corresponding
# property missing, or as the property being present but set to empty string.
condition = bp("condition", str, optional=True)
if condition:
try:
condition = Condition(id, condition)
except SyntaxError as exc:
raise request.isnt_valid(
f"Syntax error in condition ({condition}): {exc}"
)
else:
condition = None
hit_condition = bp("hitCondition", str, optional=True)
if hit_condition:
try:
hit_condition = HitCondition(id, hit_condition)
except SyntaxError as exc:
raise request.isnt_valid(
f"Syntax error in hit condition ({hit_condition}): {exc}"
)
else:
hit_condition = None
log_message = bp("logMessage", str, optional=True)
if log_message:
try:
log_message = LogMessage(id, log_message)
except SyntaxError as exc:
raise request.isnt_valid(
f"Syntax error in log message f{log_message!r}: {exc}"
)
else:
log_message = None
bps_info.append((id, source, line, condition, hit_condition, log_message))
# Now that we know all breakpoints are syntactically valid, we can set them.
bps_set = [
Breakpoint.set(
path, bp["line"],
condition=bp("condition", str, optional=True),
hit_condition=bp("hitCondition", str, optional=True),
log_message=bp("logMessage", str, optional=True),
Breakpoint(
id,
source,
line,
condition=condition,
hit_condition=hit_condition,
log_message=log_message,
)
for bp in bps
for id, source, line, condition, hit_condition, log_message in bps_info
]
return {"breakpoints": bps_set}
@ -269,39 +330,44 @@ class Adapter:
finally:
del frames
# For "pause" and "continue" requests, DAP requires a thread ID to be specified,
# but does not require the adapter to only pause/unpause the specified thread.
# Visual Studio debug adapter host does not support the ability to pause/unpause
# only the specified thread, and requires the adapter to always pause/unpause all
# threads. For "continue" requests, there is a capability flag that the client can
# use to indicate support for per-thread continuation, but there's no such flag
# for per-thread pausing. Furethermore, the semantics of unpausing a specific
# thread after all threads have been paused is unclear in the event the unpaused
# thread then spawns additional threads. Therefore, we always ignore the "threadId"
# property and just pause/unpause everything.
def pause_request(self, request: Request):
if request.arguments.get("threadId", None) == "*":
thread_ids = None
else:
thread_ids = [request("threadId", int)]
self._tracer.pause(thread_ids)
try:
self._tracer.pause()
except ValueError:
raise request.cant_handle("No threads to pause")
return {}
def continue_request(self, request: Request):
if request.arguments.get("threadId", None) == "*":
thread_ids = None
else:
thread_ids = [request("threadId", int)]
single_thread = request("singleThread", False)
self._tracer.resume(thread_ids if single_thread else None)
self._tracer.resume()
return {}
def stepIn_request(self, request: Request):
# TODO: support "singleThread" and "granularity"
thread_id = request("threadId", int)
self._tracer.step_in(thread_id)
thread = Thread.get(request("threadId", int))
self._tracer.step_in(thread)
return {}
def stepOut_request(self, request: Request):
# TODO: support "singleThread" and "granularity"
thread_id = request("threadId", int)
self._tracer.step_out(thread_id)
thread = Thread.get(request("threadId", int))
self._tracer.step_out(thread)
return {}
def next_request(self, request: Request):
# TODO: support "singleThread" and "granularity"
thread_id = request("threadId", int)
self._tracer.step_over(thread_id)
thread = Thread.get(request("threadId", int))
self._tracer.step_over(thread)
return {}
def scopes_request(self, request: Request):

View file

@ -3,13 +3,13 @@
# for license information.
import re
import sys
import threading
import traceback
from collections import defaultdict
from dataclasses import dataclass
from debugpy import server
from debugpy.common import log
from debugpy.server import new_dap_id
from debugpy.server.eval import Scope, VariableContainer
from pathlib import Path
from sys import monitoring
@ -20,6 +20,47 @@ from typing import Callable, ClassVar, Dict, Iterable, List, Literal, Union
_cvar = threading.Condition()
class Source:
"""
Represents a DAP Source object.
"""
path: str
"""
Path to the source file; immutable. Note that this needs not be an actual valid
path on the filesystem; values such as <string> or <stdin> are also allowed.
"""
# TODO: support "sourceReference" for cases where path isn't available (e.g. decompiled code)
def __init__(self, path: str):
# If it is a valid file path, we want to resolve and normalize it, so that it
# can be unambiguously compared to code object paths later.
try:
path = str(Path(path).resolve())
except (OSError, RuntimeError):
# Something like <string> or <stdin>
pass
self.path = path
def __getstate__(self) -> dict:
return {"path": self.path}
def __repr__(self) -> str:
return f"Source({self.path!r})"
def __str__(self) -> str:
return self.path
def __eq__(self, other) -> bool:
if not isinstance(other, Source):
return False
return self.path == other.path
def __hash__(self) -> int:
return hash(self.path)
class Thread:
"""
Represents a DAP Thread object. Instances must never be created directly;
@ -32,6 +73,12 @@ class Thread:
python_thread: threading.Thread
"""The Python thread object this DAP Thread represents."""
python_frame: FrameType | None
"""
The Python frame object corresponding to the topmost stack frame on this thread
if it is suspended, or None if it is running.
"""
is_known_to_adapter: bool
"""
Whether this thread has been reported to the adapter via the
@ -44,42 +91,46 @@ class Thread:
can exclude a specific thread from tracing.
"""
_last_id = 0
_all: ClassVar[Dict[int, "Thread"]] = {}
def __init__(self, python_thread):
def __init__(self, python_thread: threading.Thread):
"""
Create a new Thread object for the given thread. Do not invoke directly;
use Thread.get() instead.
"""
self.python_thread = python_thread
self.current_frame = None
self.is_known_to_adapter = False
self.is_traced = True
with _cvar:
# Thread IDs are serialized as JSON numbers in DAP, which are handled as 64-bit
# floats by most DAP clients. However, OS thread IDs can be large 64-bit integers
# on some platforms. To avoid loss of precision, we map all thread IDs to 32-bit
# signed integers; if the original ID fits, we use it as is, otherwise we use a
# generated negative ID that is guaranteed to fit.
self.id = self.python_thread.ident
if self.id != float(self.id):
Thread._last_id -= 1
self.id = Thread._last_id
self._all[self.id] = self
# Thread IDs are serialized as JSON numbers in DAP, which are handled as 64-bit
# floats by most DAP clients. However, OS thread IDs can be large 64-bit integers
# on some platforms. To avoid loss of precision, we map all thread IDs to 32-bit
# signed integers; if the original ID fits, we use it as is, otherwise we use a
# generated negative ID that is guaranteed to fit.
self.id = self.python_thread.ident
assert self.id is not None
if self.id < 0 or self.id != float(self.id):
self.id = new_dap_id()
self._all[self.id] = self
log.info(
f"DAP Thread(id={self.id}) created for Python Thread(ident={self.python_thread.ident})"
f"DAP {self} created for Python Thread(ident={self.python_thread.ident})"
)
def __getstate__(self):
def __repr__(self) -> str:
return f"Thread({self.id})"
def __getstate__(self) -> dict:
return {
"id": self.id,
"name": self.name,
}
@property
def name(self):
def name(self) -> str:
return self.python_thread.name
@classmethod
@ -89,18 +140,19 @@ class Thread:
the current Python thread if None, creating it and reporting it to adapter if
necessary. If the current thread is internal debugpy thread, returns None.
"""
if python_thread is None:
python_thread = threading.current_thread()
if python_thread.ident is None:
return None
if getattr(python_thread, "is_debugpy_thread", False):
return None
with _cvar:
for thread in self._all.values():
if thread.python_thread is python_thread:
if thread.python_thread.ident == python_thread.ident:
break
else:
thread = Thread(python_thread)
thread.make_known_to_adapter()
thread.make_known_to_adapter()
return thread
@classmethod
@ -112,16 +164,11 @@ class Thread:
return self._all.get(id, None)
@classmethod
def enumerate(self) -> List["Thread"]:
def enumerate(self) -> list["Thread"]:
"""
Returns a list of all running threads in this process.
Returns all running threads in this process.
"""
return [
thread
for python_thread in threading.enumerate()
for thread in [Thread.from_python_thread(python_thread)]
if thread is not None and thread.is_traced
]
return [thread for thread in self._all.values() if thread.is_traced]
def make_known_to_adapter(self):
"""
@ -155,15 +202,16 @@ class Thread:
starting with the topmost frame.
"""
try:
(fobj,) = (
fobj for (id, fobj) in sys._current_frames().items() if id == self.id
)
with _cvar:
python_frame = self.python_frame
except ValueError:
raise ValueError(f"Can't get frames for inactive Thread({self.id})")
for fobj, _ in traceback.walk_stack(fobj):
frame = StackFrame.from_frame_object(self, fobj)
for python_frame, _ in traceback.walk_stack(python_frame):
frame = StackFrame.from_frame_object(self, python_frame)
log.info("{0}", f"{self}: {frame}")
if not frame.is_internal():
yield frame
log.info("{0}", f"{self}: End stack trace.")
class StackFrame:
@ -176,10 +224,9 @@ class StackFrame:
frame_object: FrameType
id: int
_path: Path
_source: Source | None
_scopes: List[Scope]
_last_id = 0
_all: ClassVar[Dict[int, "StackFrame"]] = {}
def __init__(self, thread: Thread, frame_object: FrameType):
@ -187,45 +234,44 @@ class StackFrame:
Create a new StackFrame object for the given thread and frame object. Do not
invoke directly; use StackFrame.from_frame_object() instead.
"""
StackFrame._last_id += 1
self.id = StackFrame._last_id
self.id = new_dap_id()
self.thread = thread
self.frame_object = frame_object
self._path = None
self._source = None
self._scopes = None
self._all[self.id] = self
def __getstate__(self):
def __getstate__(self) -> dict:
return {
"id": self.id,
"name": self.frame_object.f_code.co_name,
"source": {
# TODO: use "sourceReference" when path isn't available (e.g. decompiled code)
"path": str(self.path()),
},
"source": self.source(),
"line": self.frame_object.f_lineno,
"column": 1, # TODO
# TODO: "endLine", "endColumn", "moduleId", "instructionPointerReference"
}
def __repr__(self) -> str:
result = f"StackFrame({self.id}, {self.frame_object}"
if self.is_internal():
result += ", internal=True"
result += ")"
return result
@property
def line(self) -> int:
return self.frame_object.f_lineno
def path(self) -> Path:
if self._path is None:
path = Path(self.frame_object.f_code.co_filename)
try:
path = path.resolve()
except (OSError, RuntimeError):
pass
# No need to sync this since all instances are equivalent.
self._path = path
return self._path
def source(self) -> Source:
if self._source is None:
# No need to sync this since all instances created from the same path
# are equivalent for all purposes.
self._source = Source(self.frame_object.f_code.co_filename)
return self._source
def is_internal(self) -> bool:
# TODO: filter internal frames properly
parts = self.path().parts
parts = Path(self.source().path).parts
internals = ["debugpy", "threading"]
return any(part.startswith(s) for s in internals for part in parts)
@ -262,38 +308,68 @@ class Step:
origin: FrameType = None
origin_line: int = None
def __repr__(self):
return f"Step({self.step})"
def is_complete(self, python_frame: FrameType) -> bool:
is_complete = False
if self.step == "in":
is_complete = (
python_frame is not self.origin
or python_frame.f_lineno != self.origin_line
)
elif self.step == "over":
is_complete = True
for python_frame, _ in traceback.walk_stack(python_frame):
if (
python_frame is self.origin
and python_frame.f_lineno == self.origin_line
):
is_complete = False
break
return is_complete
elif self.step == "out":
while python_frame is not None:
if python_frame is self.origin:
is_complete = False
break
else:
raise ValueError(f"Unknown step type: {self.step}")
return is_complete
class Condition:
"""
Expression that must be true for the breakpoint to be triggered.
"""
id: int
"""Used to identify the condition in stack traces. Should match breakpoint ID."""
expression: str
"""Python expression that must evaluate to True for the breakpoint to be triggered."""
_code: CodeType
def __init__(self, breakpoint: "Breakpoint", expression: str):
def __init__(self, id: int, expression: str):
self.id = id
self.expression = expression
self._code = compile(
expression, f"breakpoint-{breakpoint.id}-condition", "eval"
)
self._code = compile(expression, f"breakpoint-{id}-condition", "eval")
def test(self, frame: StackFrame) -> bool:
"""
Returns True if the breakpoint should be triggered in the specified frame.
"""
try:
return bool(
eval(
self._code,
frame.frame_object.f_globals,
frame.frame_object.f_locals,
)
result = eval(
self._code,
frame.frame_object.f_globals,
frame.frame_object.f_locals,
)
except:
log.exception(
f"Exception while evaluating breakpoint condition: {self.expression}"
return bool(result)
except BaseException as exc:
log.error(
f"Exception while evaluating breakpoint condition ({self.expression}): {exc}"
)
return False
@ -322,20 +398,26 @@ class HitCondition:
"%": lambda expected_count, count: count % expected_count == 0,
}
id: int
"""Used to identify the condition in stack traces. Should match breakpoint ID."""
hit_condition: str
"""Hit count expression."""
_count: int
_operator: Callable[[int, int], bool]
def __init__(self, hit_condition: str):
def __init__(self, id: int, hit_condition: str):
self.id = id
self.hit_condition = hit_condition
m = re.match(r"([<>=]+)?(\d+)", hit_condition)
m = re.match(r"^\D*(\d+)$", hit_condition)
if not m:
raise ValueError(f"Invalid hit condition: {hit_condition}")
raise SyntaxError(f"Invalid hit condition: {hit_condition}")
self._count = int(m.group(2))
try:
op = self._OPERATORS[m.group(1) or "=="]
except KeyError:
raise ValueError(f"Invalid hit condition operator: {op}")
raise SyntaxError(f"Invalid hit condition operator: {op}")
self.test = lambda count: op(self._count, count)
def test(self, count: int) -> bool:
@ -352,16 +434,20 @@ class LogMessage:
A message with spliced expressions, to be logged when a breakpoint is triggered.
"""
id: int
"""Used to identify the condition in stack traces. Should match breakpoint ID."""
message: str
"""The message to be logged. May contain expressions in curly braces."""
_code: CodeType
"""Compiled code object for the f-string corresponding to the message."""
def __init__(self, breakpoint: "Breakpoint", message: str):
def __init__(self, id: int, message: str):
self.id = id
self.message = message
f_string = "f" + repr(message)
self._code = compile(f_string, f"breakpoint-{breakpoint.id}-logMessage", "eval")
self._code = compile(f_string, f"breakpoint-{id}-logMessage", "eval")
def format(self, frame: StackFrame) -> str:
"""
@ -371,9 +457,9 @@ class LogMessage:
return eval(
self._code, frame.frame_object.f_globals, frame.frame_object.f_locals
)
except:
except BaseException as exc:
log.exception(
f"Exception while formatting breakpoint log message: {self.message}"
f"Exception while formatting breakpoint log message f{self.message!r}: {exc}"
)
return self.message
@ -384,7 +470,7 @@ class Breakpoint:
"""
id: int
path: Path
source: Source
line: int
is_enabled: bool
@ -397,93 +483,69 @@ class Breakpoint:
hit_count: int
"""Number of times this breakpoint has been hit."""
_last_id = 0
_all: ClassVar[Dict[int, "Breakpoint"]] = {}
_at: ClassVar[Dict[Path, Dict[int, List["Breakpoint"]]]] = defaultdict(
_at: ClassVar[Dict[Source, Dict[int, List["Breakpoint"]]]] = defaultdict(
lambda: defaultdict(lambda: [])
)
# ID must be explicitly specified so that conditions and log message can
# use the same ID - this makes for better call stacks and error messages.
def __init__(
self, path, line, *, condition=None, hit_condition=None, log_message=None
self,
id: int,
source: Source,
line: int,
*,
condition: Condition | None = None,
hit_condition: HitCondition | None = None,
log_message: LogMessage | None = None,
):
with _cvar:
Breakpoint._last_id += 1
self.id = Breakpoint._last_id
self.path = path
self.id = id
self.source = source
self.line = line
self.is_enabled = True
self.condition = Condition(self, condition) if condition else None
self.hit_condition = HitCondition(hit_condition) if hit_condition else None
self.log_message = LogMessage(self, log_message) if log_message else None
self.condition = condition
self.hit_condition = hit_condition
self.log_message = log_message
self.hit_count = 0
with _cvar:
self._all[self.id] = self
self._at[self.path][self.line].append(self)
self._at[self.source][self.line].append(self)
_cvar.notify_all()
monitoring.restart_events()
def __getstate__(self):
def __getstate__(self) -> dict:
return {
"line": self.line,
"verified": True, # TODO
}
@classmethod
def at(self, path: str, line: int) -> List["Breakpoint"]:
def at(self, source: Source, line: int) -> List["Breakpoint"]:
"""
Returns a list of all breakpoints at the specified location.
"""
with _cvar:
return self._at[path][line]
return self._at[source][line]
@classmethod
def clear(self, paths: Iterable[str] = None):
def clear(self, sources: Iterable[Source] = None):
"""
Removes all breakpoints in the specified files, or all files if None.
"""
if paths is not None:
paths = [Path(path).resolve() for path in paths]
with _cvar:
if paths is None:
paths = list(self._at.keys())
for path in paths:
bps_in = self._at.pop(path, {}).values()
if sources is None:
sources = list(self._at.keys())
for source in sources:
bps_in = self._at.pop(source, {}).values()
for bps_at in bps_in:
for bp in bps_at:
del self._all[bp.id]
_cvar.notify_all()
monitoring.restart_events()
@classmethod
def set(
self,
path: str,
line: int,
*,
condition=None,
hit_condition=None,
log_message=None,
) -> "Breakpoint":
"""
Creates a new breakpoint at the specified location.
"""
try:
path = Path(path).resolve()
except (OSError, RuntimeError):
pass
bp = Breakpoint(
path,
line,
condition=condition,
hit_condition=hit_condition,
log_message=log_message,
)
monitoring.restart_events()
return bp
def enable(self, is_enabled: bool):
"""
Enables or disables this breakpoint.
@ -501,11 +563,11 @@ class Breakpoint:
a log message, it is formatted and returned, otherwise True is returned.
"""
with _cvar:
# Check path last since path resolution is potentially expensive.
# Check source last since path resolution is potentially expensive.
if (
not self.is_enabled
or frame.line != self.line
or frame.path() != self.path
or frame.source() != self.source
):
return False
@ -520,7 +582,7 @@ class Breakpoint:
return False
if self.condition is not None and not self.condition.test(frame):
return False
# If this is a logpoint, return the formatted message instead of True.
if self.log_message is not None:
return self.log_message.format(frame)

View file

@ -35,6 +35,7 @@ class Log:
self.debug = self.info = self.warning = self.error = self.exception = nop
def debug(self, *args, **kwargs):
# TODO: improve logging performance enough to enable this.
# self.log.debug("{0}", *args, **kwargs)
# print(*args)
pass
@ -60,19 +61,29 @@ class Tracer:
import inspect
import threading
from debugpy import server
from debugpy.server.tracing import Breakpoint, Step, Thread, StackFrame, _cvar
from pathlib import Path
from debugpy.server.tracing import (
Breakpoint,
Source,
Step,
Thread,
StackFrame,
_cvar,
)
from sys import monitoring
instance: "Tracer"
log: Log
_pause_ids = set()
"""IDs of threads that are currently pausing or paused."""
_stopped_by: Thread | None = None
"""
If not None, indicates the thread on which the event that caused the debuggee
to enter suspended state has occurred. When any other thread observes a non-None
value of this attribute, it must immediately suspend and wait until it is cleared.
"""
_steps = {}
"""Ongoing steps, keyed by thread ID."""
_steps: dict[Thread, Step] = {}
"""Ongoing steps, keyed by thread."""
def __init__(self):
self.log = Log()
@ -121,118 +132,130 @@ class Tracer:
self.log.info("sys.monitoring tracing callbacks registered.")
def pause(self, thread_ids: Iterable[int] = None):
def pause(self):
"""
Pause the specified threads, or all threads if thread_ids is None.
"""
if thread_ids is None:
# Pausing is async, so additional threads may be spawned even as we are
# trying to pause the ones we currently know about; iterate until all
# known threads are paused, and no new threads appear.
while True:
thread_ids = {thread.id for thread in self.Thread.enumerate()}
if self._pause_ids.keys() == thread_ids:
return
self.pause(thread_ids)
else:
self.log.info(f"Pausing threads: {thread_ids}")
with self._cvar:
self._pause_ids.update(thread_ids)
self._cvar.notify_all()
self.monitoring.restart_events()
def resume(self, thread_ids: Iterable[int] = None):
"""
Resume the specified threads, or all threads if thread_ids is None.
Pause all threads.
"""
self.log.info("Pausing all threads.")
with self._cvar:
if thread_ids is None:
self.log.info("Resuming all threads.")
self._pause_ids.clear()
else:
self.log.info(f"Resuming threads: {thread_ids}")
self._pause_ids.difference_update(thread_ids)
self._cvar.notify_all()
self.monitoring.restart_events()
# Although "pause" is a user-induced scenariop that is not specifically
# associated with any thread, we still need to pick some thread that
# will nominally own it to report the event on. If there is a designated
# main thread in the process, use that, otherwise pick one at random.
python_thread = self.threading.main_thread()
if python_thread is None:
python_thread = next(iter(self.threading.enumerate()), None)
if python_thread is None:
raise ValueError("No threads to pause.")
thread = self.Thread.from_python_thread(python_thread)
self.begin_stop(thread, "pause")
def abandon_step(self, thread_ids: Iterable[int] = None):
def resume(self):
"""
Resume all threads.
"""
self.log.info("Resuming all threads.")
self.end_stop()
def abandon_step(self, threads: Iterable[int] = None):
"""
Abandon any ongoing steps that are in progress on the specified threads
(all threads if thread_ids is None).
(all threads if argument is None).
"""
with self._cvar:
if thread_ids is None:
thread_ids = [thread.id for thread in self.Thread.enumerate()]
for thread_id in thread_ids:
step = self._steps.pop(thread_id, None)
if step is not None:
self.log.info(f"Abandoned step-{step.step} on {thread_id}.")
if threads is None:
step = self._steps.clear()
while self._steps:
thread, step = self._steps.popitem()
self.log.info(f"Abandoned {step} on {thread}.")
else:
for thread in threads:
step = self._steps.pop(thread, None)
if step is not None:
self.log.info(f"Abandoned {step} on {thread}.")
self._cvar.notify_all()
self.monitoring.restart_events()
def step_in(self, thread_id: int):
def step_in(self, thread: Thread):
"""
Step into the next statement executed by the specified thread.
"""
self.log.info(f"Step in on thread {thread_id}.")
self.log.info(f"Step in on {thread}.")
with self._cvar:
self._steps[thread_id] = self.Step("in")
self._pause_ids.clear()
self._cvar.notify_all()
self._steps[thread] = self.Step("in")
self.end_stop()
self.monitoring.restart_events()
def step_out(self, thread_id: int):
def step_out(self, thread: Thread):
"""
Step out of the current function executed by the specified thread.
"""
self.log.info(f"Step out on thread {thread_id}.")
self.log.info(f"Step out on {thread}.")
with self._cvar:
self._steps[thread_id] = self.Step("out")
self._pause_ids.clear()
self._cvar.notify_all()
self._steps[thread] = self.Step("out")
self.end_stop()
self.monitoring.restart_events()
def step_over(self, thread_id: int):
self.log.info(f"Step over on thread {thread_id}.")
def step_over(self, thread: Thread):
self.log.info(f"Step over on {thread}.")
"""
Step over the next statement executed by the specified thread.
"""
with self._cvar:
self._steps[thread_id] = self.Step("over")
self._pause_ids.clear()
self._cvar.notify_all()
self._steps[thread] = self.Step("over")
self.end_stop()
self.monitoring.restart_events()
def _stop(
self,
frame_obj: FrameType,
reason: str,
hit_breakpoints: Iterable[Breakpoint] = (),
def begin_stop(
self, thread: Thread, reason: str, hit_breakpoints: Iterable[Breakpoint] = ()
):
thread = self.Thread.from_python_thread()
self.log.info(f"Pausing thread {thread.id}: {reason}.")
"""
Report the stop to the adapter and tell all threads to suspend themselves.
"""
with self._cvar:
if thread.id not in self._pause_ids:
self._stopped_by = thread
self._cvar.notify_all()
self.monitoring.restart_events()
self.adapter.channel.send_event(
"stopped",
{
"reason": reason,
"threadId": thread.id,
"allThreadsStopped": True,
"hitBreakpointIds": [bp.id for bp in hit_breakpoints],
},
)
def end_stop(self):
"""
Tell all threads to resume themselves.
"""
with self._cvar:
self._stopped_by = None
self._cvar.notify_all()
def suspend_this_thread(self, frame_obj: FrameType):
"""
Suspends execution of this thread until the current stop ends.
"""
thread = self.Thread.from_python_thread()
with self._cvar:
if self._stopped_by is None:
return
self.adapter.channel.send_event(
"stopped",
{
"reason": reason,
"threadId": thread.id,
"allThreadsStopped": False, # TODO
"hitBreakpointIds": [bp.id for bp in hit_breakpoints],
},
)
self.log.info(f"Thread {thread.id} paused.")
while thread.id in self._pause_ids:
self.log.info(f"{thread} suspended.")
thread.python_frame = frame_obj
while self._stopped_by is not None:
self._cvar.wait()
self.log.info(f"Thread {thread.id} unpaused.")
thread.python_frame = None
self.log.info(f"{thread} resumed.")
step = self._steps.get(thread.id, None)
step = self._steps.get(thread, None)
if step is not None and step.origin is None:
# This step has just begun - update the Step object with information
# about current frame that will be used to track step completion.
step.origin = frame_obj
step.origin_line = frame_obj.f_lineno
@ -242,108 +265,79 @@ class Tracer:
return self.monitoring.DISABLE
self.log.debug(f"sys.monitoring event: LINE({line_number}, {code})")
frame_obj = self.inspect.currentframe().f_back
stop_reason = None
with self._cvar:
if thread.id in self._pause_ids:
stop_reason = "pause"
step = self._steps.get(thread.id, None)
is_stepping = step is not None and step.origin is not None
if not is_stepping:
self.log.debug(f"No step in progress on thread {thread.id}.")
else:
self.log.debug(
f"Tracing step-{step.step} originating from {step.origin} on thread {thread.id}."
)
# TODO: use CALL/RETURN/PY_RETURN to track these more efficiently.
step_finished = False
if step.step == "in":
if frame_obj is not step.origin or line_number != step.origin_line:
step_finished = True
elif step.step == "out":
step_finished = True
while frame_obj is not None:
if frame_obj is step.origin:
step_finished = False
break
frame_obj = frame_obj.f_back
elif step.step == "over":
step_finished = True
while frame_obj is not None:
if (
frame_obj is step.origin
and frame_obj.f_lineno == step.origin_line
):
step_finished = False
break
frame_obj = frame_obj.f_back
# These two local variables hold direct or indirect references to frame
# objects on the stack of the current thread, and thus must be cleaned up
# on exit to avoid expensive GC cycles.
python_frame = self.inspect.currentframe().f_back
frame = None
try:
with self._cvar:
step = self._steps.get(thread, None)
is_stepping = step is not None and step.origin is not None
if not is_stepping:
self.log.debug(f"No step in progress on {thread}.")
else:
raise ValueError(f"Unknown step type: {step.step}")
self.log.debug(
f"Tracing {step} originating from {step.origin} on {thread}."
)
if step.is_complete(python_frame):
self.log.info(f"{step} finished on thread {thread}.")
del self._steps[thread]
self.begin_stop(thread, "step")
if step_finished:
self.log.info(f"Step-{step.step} finished on thread {thread.id}.")
del self._steps[thread.id]
self._pause_ids.add(thread.id)
self._cvar.notify_all()
stop_reason = "step"
if self._stopped_by is not None:
# Even if this thread is pausing, any debugpy internal code on it should
# keep running until it returns to user code; otherwise, it may deadlock
# if it was holding e.g. a messaging lock.
if not python_frame.f_globals.get("__name__", "").startswith(
"debugpy"
):
self.suspend_this_thread(python_frame)
return
if stop_reason is not None:
# Even if this thread is pausing, any debugpy internal code on it should
# keep running until it returns to user code; otherwise, it may deadlock
# if it was holding e.g. a messaging lock.
print(frame_obj.f_globals.get("__name__"))
if not frame_obj.f_globals.get("__name__", "").startswith("debugpy"):
return self._stop(frame_obj, stop_reason)
self.log.debug(f"Resolving path {code.co_filename!r}...")
source = self.Source(code.co_filename)
self.log.debug(f"Path {code.co_filename!r} resolved to {source}.")
self.log.debug(f"Resolving path {code.co_filename!r}...")
path = self.Path(code.co_filename)
try:
path = path.resolve()
except (OSError, RuntimeError):
pass
self.log.debug(f"Path {code.co_filename!r} resolved to {path}.")
bps = self.Breakpoint.at(path, line_number)
if not bps and not is_stepping:
self.log.debug(f"No breakpoints at {path}:{line_number}.")
return self.monitoring.DISABLE
self.log.debug(f"Considering breakpoints: {[bp.__getstate__() for bp in bps]}.")
frame = self.StackFrame(thread, self.inspect.currentframe().f_back)
try:
stop_bps = []
for bp in bps:
match bp.is_triggered(frame):
case str() as message:
# Triggered, has logMessage - print it but don't stop.
self.adapter.channel.send_event(
"output",
{
"category": "console",
"output": message,
"line": line_number,
"source": {"path": path},
},
)
case triggered if triggered:
# Triggered, no logMessage - stop.
stop_bps.append(bp)
case _:
continue
if stop_bps:
self.log.info(
f"Stack frame {frame} stopping at breakpoints {[bp.__getstate__() for bp in stop_bps]}."
bps = self.Breakpoint.at(source, line_number)
if not bps and not is_stepping:
self.log.debug(f"No breakpoints at {source}:{line_number}.")
return self.monitoring.DISABLE
self.log.debug(
f"Considering breakpoints: {[bp.__getstate__() for bp in bps]}."
)
with self._cvar:
self._pause_ids.add(thread.id)
self._cvar.notify_all()
return self._stop(frame.frame_object, "breakpoint", stop_bps)
frame = self.StackFrame(thread, self.inspect.currentframe().f_back)
stop_bps = []
for bp in bps:
match bp.is_triggered(frame):
case str() as message:
# Triggered, has logMessage - print it but don't stop.
self.adapter.channel.send_event(
"output",
{
"category": "console",
"output": message,
"line": line_number,
"source": source,
},
)
case triggered if triggered:
# Triggered, no logMessage - stop.
stop_bps.append(bp)
case _:
continue
if stop_bps:
self.log.info(
f"Stack frame {frame} stopping at breakpoints {[bp.__getstate__() for bp in stop_bps]}."
)
self.begin_stop(thread, "breakpoint", stop_bps)
self.suspend_this_thread(frame.frame_object)
finally:
del frame
del python_frame
def _trace_py_start(self, code: CodeType, ip: int):
thread = self.Thread.from_python_thread()