mirror of
https://github.com/microsoft/debugpy.git
synced 2025-12-23 08:48:12 +00:00
Some checks failed
Code scanning - action / CodeQL-Build (push) Has been cancelled
* Add support for space in the python file itself when using shell expansion. * Fix linter * Fix flakey test
1023 lines
37 KiB
Python
1023 lines
37 KiB
Python
# Copyright (c) Microsoft Corporation. All rights reserved.
|
|
# Licensed under the MIT License. See LICENSE in the project root
|
|
# for license information.
|
|
|
|
import collections
|
|
import itertools
|
|
import os
|
|
import psutil
|
|
import py
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
import debugpy.adapter
|
|
from debugpy.common import json, log, messaging, sockets, util
|
|
import tests
|
|
from tests import code, timeline, watchdog
|
|
from tests.debug import comms, config, output
|
|
from tests.patterns import some
|
|
|
|
# Directory prepended to PYTHONPATH of every spawned debuggee, so that the test
# helper modules under it are importable from debuggee code.
DEBUGGEE_PYTHONPATH = tests.root / "DEBUGGEE_PYTHONPATH"

# Result of Session.wait_for_stop(): the raw "stopped" event body, the stack
# frames from the subsequent "stackTrace" request, and the ids of the stopped
# thread and its topmost frame.
StopInfo = collections.namedtuple(
    "StopInfo", ["body", "frames", "thread_id", "frame_id"]
)
|
|
|
|
|
|
class Session(object):
    """A test debug session. Manages the lifetime of the adapter and the debuggee
    processes, captures debuggee stdio output, establishes a DAP message channel to
    the debuggee, and records all DAP messages in that channel on a Timeline object.

    Must be used in a with-statement for proper cleanup. On successful exit - if no
    exception escapes from the with-statement - the session will:

    1. Invoke wait_for_exit(), unless expected_exit_code is None.
    2. Invoke disconnect().
    3. Wait for the adapter process to exit.
    4. Finalize and closes the timeline

    If the exit is due to an exception, the session will:

    1. Invoke disconnect(force=True).
    2. Kill the debuggee and the adapter processes.

    Example::

        with debug.Session() as session:
            # Neither debuggee nor adapter are spawned yet. Initial configuration.
            session.log_dir = ...
            session.config.update({...})

            with session.launch(...):
                # Debuggee and adapter are spawned, but there is no code executing
                # in the debuggee yet.
                session.set_breakpoints(...)

            # Code is executing in the debuggee.
            session.wait_for_stop(expected_frames=[...])
            assert session.get_variable(...) == ...
            session.request_continue()

        # Session is disconnected from the debuggee, and both the debuggee and the
        # adapter processes have exited.
        assert session.exit_code == ...
    """

    tmpdir = None
    """Temporary directory in which Sessions can create the temp files they need.

    Automatically set to tmpdir for the current test by pytest_fixtures.test_wrapper().
    """

    @classmethod
    def reset_counter(cls):
        # Restarts session numbering at 1, so that session ids are
        # deterministic within a test run. Also invoked once at module load
        # (see the bottom of this file).
        cls._counter = itertools.count(1)
|
|
|
|
    def __init__(self, debug_config=None):
        """Creates a new test debug session.

        debug_config, if not None, seeds self.config; otherwise a minimal
        default configuration is used. Neither the adapter nor the debuggee
        is spawned here.
        """
        # Session.tmpdir is assigned by the pytest fixture before any Session
        # may be constructed.
        assert Session.tmpdir is not None
        watchdog.start()

        self.id = next(Session._counter)
        log.info("Starting {0}", self)

        self.client_id = "vscode"

        # Capabilities reported to the adapter in the "initialize" request.
        self.capabilities = {
            "pathFormat": "path",
            "clientID": self.client_id,
            "adapterID": "test",
            "linesStartAt1": True,
            "columnsStartAt1": True,
            "supportsVariableType": True,
            "supportsRunInTerminalRequest": True,
            "supportsArgsCanBeInterpretedByShell": True,
            "supportsStartDebuggingRequest": False,
        }

        self.debuggee = None
        """psutil.Popen instance for the debuggee process."""

        self.adapter = None
        """psutil.Popen instance for the adapter process."""

        self.expected_adapter_sockets = {
            "client": {"host": some.str, "port": some.int, "internal": False},
        }
        """The sockets which the adapter is expected to report."""

        self.adapter_endpoints = None
        """Name of the file that contains the adapter endpoints information.

        This file is generated by the adapter when it opens the listener sockets,
        and deleted by it when it exits.
        """

        self.channel = None
        """JsonMessageChannel to the adapter."""

        self.captured_output = {"stdout", "stderr"}
        """Before the debuggee is spawned, this is the set of stdio streams that
        should be captured once it is spawned.

        After it is spawned, this is a CapturedOutput object capturing those streams.
        """

        self.backchannel = None
        """The BackChannel object to talk to the debuggee.

        Must be explicitly created with open_backchannel().
        """

        self.scratchpad = comms.ScratchPad(self)
        """The ScratchPad object to talk to the debuggee."""

        self.start_command = None
        """Set to either "launch" or "attach" just before the corresponding request is sent.
        """

        self.start_request = None
        """The "launch" or "attach" request that started executing code in this session.
        """

        self.expected_exit_code = 0
        """The expected exit code for the debuggee process.

        If None, the debuggee is not expected to exit when the Session is closed.

        If not None, this is validated against both exit_code and debuggee.returncode.
        """

        self.exit_code = None
        """The actual exit code for the debuggee process, as received from DAP.
        """

        self.config = config.DebugConfig(
            debug_config
            if debug_config is not None
            else {
                "justMyCode": True,
                "name": "Test",
                "type": "python",
            }
        )
        """The debug configuration for this session."""

        self.before_connect = lambda address: None
        """Invoked right before a socket connection to the adapter is established.
        """

        self.before_request = lambda command, arguments: None
        """Invoked for every outgoing request in this session, allowing any final
        tweaks to the request before it is sent.
        """

        self.log_dir = (
            None if log.log_dir is None else py.path.local(log.log_dir) / str(self)
        )
        """The log directory for this session. Passed via DEBUGPY_LOG_DIR to all spawned
        child processes.

        If set to None, DEBUGPY_LOG_DIR is not automatically added, but tests can still
        provide it manually.
        """

        self.tmpdir = Session.tmpdir / str(self)
        self.tmpdir.ensure(dir=True)

        self.timeline = timeline.Timeline(str(self))
        # Events that are routine noise in most tests; tests that care about
        # any of these observe them explicitly.
        self.ignore_unobserved.extend(
            [
                timeline.Event("module"),
                timeline.Event("continued"),
                timeline.Event("debugpyWaitingForServer"),
                timeline.Event("debugpySockets"),
                timeline.Event("thread", some.dict.containing({"reason": "started"})),
                timeline.Event("thread", some.dict.containing({"reason": "exited"})),
                timeline.Event("output", some.dict.containing({"category": "stdout"})),
                timeline.Event("output", some.dict.containing({"category": "stderr"})),
                timeline.Event("output", some.dict.containing({"category": "console"})),
                timeline.Event(
                    "output", some.dict.containing({"category": "important"})
                ),
            ]
        )

        # Expose some common members of timeline directly - these should be the ones
        # that are the most straightforward to use, and are difficult to use incorrectly.
        # Conversely, most tests should restrict themselves to this subset of the API,
        # and avoid calling members of timeline directly unless there is a good reason.
        self.new = self.timeline.new
        self.observe = self.timeline.observe
        self.wait_for_next = self.timeline.wait_for_next
        self.proceed = self.timeline.proceed
        self.expect_new = self.timeline.expect_new
        self.expect_realized = self.timeline.expect_realized
        self.all_occurrences_of = self.timeline.all_occurrences_of
        self.observe_all = self.timeline.observe_all

        # Wrap the spawn methods in per-instance lambdas, so that a mutable
        # .env attribute can be attached to each of them for tests to tweak.
        spawn_adapter = self.spawn_adapter
        self.spawn_adapter = lambda *args, **kwargs: spawn_adapter(*args, **kwargs)
        self.spawn_adapter.env = util.Env()

        spawn_debuggee = self.spawn_debuggee
        self.spawn_debuggee = lambda *args, **kwargs: spawn_debuggee(*args, **kwargs)
        self.spawn_debuggee.env = util.Env()
|
|
|
|
    def __str__(self):
        # Session identifier used as directory name and in log messages.
        return f"Session[{self.id}]"

    @property
    def adapter_id(self):
        # Human-readable identifier for the adapter process in log messages.
        return f"Adapter[{self.id}]"

    @property
    def debuggee_id(self):
        # Human-readable identifier for the debuggee process in log messages.
        return f"Debuggee[{self.id}]"

    def __enter__(self):
        # All setup happens lazily (spawn_*/launch); entering is a no-op.
        return self
|
|
|
|
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Tears the session down.

        On clean exit: waits for the debuggee to exit (if expected), sends
        "disconnect", and closes the timeline. On exception: kills both
        processes and force-disconnects. In both cases, waits for the adapter
        process to exit and closes the backchannel.
        """
        log.info("Ending {0}.", self)

        if self.timeline.is_frozen:
            self.timeline.unfreeze()

        # Only wait for exit if there was no exception in the test - if there was one,
        # the debuggee might still be waiting for further requests.
        if exc_type is None:
            # If expected_exit_code is set to None, the debuggee is not expected to
            # exit after this Session is closed (e.g. because another Session will
            # attach to it later on).
            if self.expected_exit_code is not None:
                self.wait_for_exit()
        else:
            # Log the error, in case another one happens during shutdown.
            log.swallow_exception(exc_info=(exc_type, exc_val, exc_tb))

        if exc_type is None:
            self.disconnect()
            self.timeline.close()
        else:
            # If there was an exception, don't try to send any more messages to avoid
            # spamming log with irrelevant entries - just close the channel and kill
            # all the processes immediately. Don't close or finalize the timeline,
            # either, since it'll likely have unobserved events in it.
            if self.adapter is not None:
                log.info("Killing {0}.", self.adapter_id)
                try:
                    self.adapter.kill()
                except Exception:
                    pass
            if self.debuggee is not None:
                log.info("Killing {0}.", self.debuggee_id)
                try:
                    self.debuggee.kill()
                except Exception:
                    pass
            self.disconnect(force=True)

        # The adapter deletes the endpoints file when it closes its listener
        # sockets; poll for that with a 10-second cap.
        if self.adapter_endpoints is not None and self.expected_exit_code is not None:
            log.info("Waiting for {0} to close listener ports ...", self.adapter_id)
            timeout_start = time.time()
            while self.adapter_endpoints.check():
                if time.time() - timeout_start > 10:
                    log.warning("{0} listener ports did not close within 10 seconds", self.adapter_id)
                    break
                time.sleep(0.1)

        if self.adapter is not None:
            log.info(
                "Waiting for {0} with PID={1} to exit.",
                self.adapter_id,
                self.adapter.pid,
            )
            try:
                self.adapter.wait(timeout=10)
            except Exception:
                # Graceful wait failed (most likely a timeout) - force-kill.
                log.warning("{0} did not exit gracefully within 10 seconds, force-killing", self.adapter_id)
                try:
                    self.adapter.kill()
                    self.adapter.wait(timeout=5)
                except Exception as e:
                    log.error("Failed to force-kill {0}: {1}", self.adapter_id, e)

            try:
                watchdog.unregister_spawn(self.adapter.pid, self.adapter_id)
            except Exception as e:
                log.warning("Failed to unregister adapter spawn: {0}", e)
            self.adapter = None

        if self.backchannel is not None:
            self.backchannel.close()
            self.backchannel = None

        # Work around https://bugs.python.org/issue37380
        for popen in self.debuggee, self.adapter:
            if popen is not None and popen.returncode is None:
                popen.returncode = -1
|
|
|
|
    @property
    def ignore_unobserved(self):
        # Pass-through to the timeline's list of event patterns that may go
        # unobserved without failing the test.
        return self.timeline.ignore_unobserved

    @property
    def is_subprocess(self):
        # True if this session was created for a child process picked up via
        # wait_for_next_subprocess() (its config carries "subProcessId").
        return "subProcessId" in self.config

    def open_backchannel(self):
        # Creates and starts listening on the test<->debuggee backchannel.
        # Must be called before the debuggee is spawned, so that the port can
        # be handed to it via the environment (see _make_env()).
        assert self.backchannel is None
        self.backchannel = comms.BackChannel(self)
        self.backchannel.listen()
        return self.backchannel
|
|
|
|
    def _init_log_dir(self):
        # Creates (or recreates) the per-session log directory. Returns True
        # if directory logging is enabled for this session, False otherwise.
        if self.log_dir is None:
            return False

        log.info("Logs for {0} will be in {1}", self, json.repr(self.log_dir))
        # Best-effort removal of a stale directory from a previous run.
        try:
            self.log_dir.remove()
        except Exception:
            pass
        self.log_dir.ensure(dir=True)

        # Make subsequent calls of this method no-op for the remainder of the session.
        self._init_log_dir = lambda: True
        return True
|
|
|
|
    def _make_env(self, base_env, codecov=True):
        """Builds the environment for a spawned child process.

        Starts from a snapshot of the current environment, overlays base_env,
        and adds the variables that the debuggee-side test helpers rely on.
        If codecov is False, the codecov subprocess hook is disabled.
        """
        env = util.Env.snapshot()

        if base_env is not None:
            base_env = dict(base_env)
            # PYTHONPATH from base_env is prepended rather than replacing the
            # inherited value, so that existing entries are preserved.
            python_path = base_env.pop("PYTHONPATH", None)
            if python_path is not None:
                env.prepend_to("PYTHONPATH", python_path)
            env.update(base_env)

        env["PYTHONUNBUFFERED"] = "1"
        env["PYTHONWARNINGS"] = "error"
        env["DEBUGPY_TEST_SESSION_ID"] = str(self.id)
        env.prepend_to("PYTHONPATH", DEBUGGEE_PYTHONPATH.strpath)

        if self._init_log_dir():
            env.update(
                {
                    "DEBUGPY_LOG_DIR": self.log_dir.strpath,
                    "PYDEVD_DEBUG": "True",
                    "PYDEVD_DEBUG_FILE": (self.log_dir / "pydevd.log").strpath,
                }
            )

        if self.backchannel is not None:
            env["DEBUGPY_TEST_BACKCHANNEL_PORT"] = str(self.backchannel.port)

        if not codecov:
            # Disable codecov subprocess hook for that process.
            env.pop("COV_CORE_SOURCE", None)

        return env
|
|
|
|
    def _make_python_cmdline(self, exe, *args):
        """Builds the argv list for spawning a Python process.

        Only the exe has surrounding quotes stripped; the remaining args are
        passed through verbatim, so that tests can inspect raw quoting.
        """
        def normalize(s, strip_quotes=False):
            # Convert py.path.local to string
            if isinstance(s, py.path.local):
                s = s.strpath
            else:
                s = str(s)
            # Strip surrounding quotes if requested
            # NOTE(review): quotes are stripped only when the value contains a
            # space - presumably because only such values get quoted when a
            # shell expands them; confirm against the terminal-spawn tests.
            if strip_quotes and len(s) >= 2 and " " in s and (s[0] == s[-1] == '"' or s[0] == s[-1] == "'"):
                s = s[1:-1]
            return s

        # Strip quotes from exe
        result = [normalize(exe, strip_quotes=True)]
        for arg in args:
            # Don't strip quotes on anything except the exe
            result.append(normalize(arg, strip_quotes=False))
        return result
|
|
|
|
    def spawn_debuggee(self, args, cwd=None, exe=sys.executable, setup=None):
        """Spawns the debuggee process with captured stdio.

        The adapter endpoints file path is passed to the debuggee via
        DEBUGPY_ADAPTER_ENDPOINTS, and `setup`, if given, via
        DEBUGPY_TEST_DEBUGGEE_SETUP.
        """
        assert self.debuggee is None
        # captured_output must still be the pre-spawn set of stream names (not
        # yet a CapturedOutput object), and may only contain stdout/stderr.
        assert not len(self.captured_output - {"stdout", "stderr"})

        args = self._make_python_cmdline(exe, *args)
        cwd = cwd.strpath if isinstance(cwd, py.path.local) else cwd

        env = self._make_env(self.spawn_debuggee.env, codecov=False)
        self.adapter_endpoints = self.tmpdir / "adapter_endpoints"
        env["DEBUGPY_ADAPTER_ENDPOINTS"] = self.adapter_endpoints.strpath
        if setup is not None:
            env["DEBUGPY_TEST_DEBUGGEE_SETUP"] = setup

        log.info(
            "Spawning {0}:\n\n"
            "Current directory: {1}\n\n"
            "Command line: {2}\n\n"
            "Environment variables: {3}\n\n",
            self.debuggee_id,
            json.repr(cwd),
            json.repr(args),
            json.repr(env),
        )

        # One pipe per captured stream: the write ends become the child's
        # stdout/stderr, and the read ends are consumed by CapturedOutput.
        popen_fds = {}
        capture_fds = {}
        for stream_name in self.captured_output:
            rfd, wfd = os.pipe()
            popen_fds[stream_name] = wfd
            capture_fds[stream_name] = rfd
        self.debuggee = psutil.Popen(
            args, cwd=cwd, env=env, bufsize=0, stdin=subprocess.PIPE, **popen_fds
        )
        log.info("Spawned {0} with PID={1}", self.debuggee_id, self.debuggee.pid)
        watchdog.register_spawn(self.debuggee.pid, self.debuggee_id)

        if len(capture_fds):
            self.captured_output = output.CapturedOutput(self, **capture_fds)
        # Close the parent's copies of the write ends, so that the capture
        # side sees EOF when the child exits.
        for fd in popen_fds.values():
            os.close(fd)
|
|
|
|
def wait_for_adapter_socket(self):
|
|
log.info(
|
|
"Waiting for {0} to open the client listener socket...", self.adapter_id
|
|
)
|
|
while not self.adapter_endpoints.check():
|
|
time.sleep(0.1)
|
|
|
|
    def spawn_adapter(self, args=()):
        """Spawns the adapter process and establishes the DAP channel over its
        stdin/stdout."""
        assert self.adapter is None
        assert self.channel is None

        # The adapter package directory is runnable as a script.
        args = self._make_python_cmdline(
            sys.executable, os.path.dirname(debugpy.adapter.__file__), *args
        )
        env = self._make_env(self.spawn_adapter.env)

        log.info(
            "Spawning {0}:\n\n"
            "Command line: {1}\n\n"
            "Environment variables: {2}\n\n",
            self.adapter_id,
            json.repr(args),
            json.repr(env),
        )
        self.adapter = psutil.Popen(
            args,
            bufsize=0,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            env=env,
        )
        log.info("Spawned {0} with PID={1}", self.adapter_id, self.adapter.pid)
        watchdog.register_spawn(self.adapter.pid, self.adapter_id)

        stream = messaging.JsonIOStream.from_process(self.adapter, name=self.adapter_id)
        self._start_channel(stream)
|
|
|
|
    def expect_server_socket(self, port=some.int):
        # Registers the expectation that the adapter will report its internal
        # server (pydevd-facing) socket with the given port in "debugpySockets".
        self.expected_adapter_sockets["server"] = {
            "host": some.str,
            "port": port,
            "internal": True,
        }

    def connect_to_adapter(self, address):
        """Connects to an already-running adapter at (host, port) and starts
        the DAP channel over that socket."""
        assert self.channel is None

        self.before_connect(address)
        host, port = address
        log.info("Connecting to {0} at {1}:{2}", self.adapter_id, host, port)

        self.expected_adapter_sockets["client"]["port"] = port

        # NOTE(review): more than one ":" in the host is taken to mean an IPv6
        # address literal - confirm this matches sockets.create_client().
        ipv6 = host.count(":") > 1
        sock = sockets.create_client(ipv6)
        sock.connect(address)

        stream = messaging.JsonIOStream.from_socket(sock, name=self.adapter_id)
        self._start_channel(stream)
|
|
|
|
    def start(self):
        """Starts the session according to self.config.

        Only the attach-via-connect scenario is handled here; launch scenarios
        use request_launch() together with spawn_adapter()/spawn_debuggee().
        """
        config = self.config
        request = config.get("request", None)
        if request == "attach":
            host = config["connect"]["host"]
            port = config["connect"]["port"]
            self.connect_to_adapter((host, port))
            return self.request_attach()
        else:
            raise ValueError(
                f'Unsupported "request":{json.repr(request)} in session.config'
            )
|
|
|
|
def request(self, *args, **kwargs):
|
|
freeze = kwargs.pop("freeze", True)
|
|
raise_if_failed = kwargs.pop("raise_if_failed", True)
|
|
return (
|
|
self.send_request(*args, **kwargs)
|
|
.wait_for_response(freeze=freeze, raise_if_failed=raise_if_failed)
|
|
.body
|
|
)
|
|
|
|
    def send_request(self, command, arguments=None, proceed=True):
        """Sends a DAP request, records it in the timeline, and returns the
        recorded request occurrence (whose wait_for_response() yields the
        response)."""
        self.before_request(command, arguments)

        # A frozen timeline cannot accept new occurrences; proceed past the
        # frozen point unless the caller asked not to.
        if self.timeline.is_frozen and proceed:
            self.proceed()

        if command in ("launch", "attach"):
            self.start_command = command

        message = self.channel.send_request(command, arguments)
        request = self.timeline.record_request(message)

        if command in ("launch", "attach"):
            self.start_request = request

        # Register callback after recording the request, so that there's no race
        # between it being recorded, and the response to it being received.
        message.on_response(lambda response: self._process_response(request, response))

        return request
|
|
|
|
def _process_event(self, event):
|
|
occ = self.timeline.record_event(event, block=False)
|
|
|
|
if event.event == "exited":
|
|
self.observe(occ)
|
|
self.exit_code = event("exitCode", int)
|
|
self.exit_reason = event("reason", str, optional=True)
|
|
assert self.exit_code == self.expected_exit_code
|
|
|
|
elif event.event == "terminated":
|
|
# Server socket should be closed next.
|
|
self.expected_adapter_sockets.pop("server", None)
|
|
|
|
elif event.event == "debugpyAttach":
|
|
self.observe(occ)
|
|
pid = event("subProcessId", int)
|
|
watchdog.register_spawn(pid, f"{self.debuggee_id}-subprocess-{pid}")
|
|
|
|
elif event.event == "debugpySockets":
|
|
assert not self.is_subprocess
|
|
sockets = list(event("sockets", json.array(json.object())))
|
|
for purpose, expected_socket in self.expected_adapter_sockets.items():
|
|
if expected_socket is None:
|
|
continue
|
|
socket = None
|
|
for socket in sockets:
|
|
if socket == expected_socket:
|
|
break
|
|
assert (
|
|
socket is not None
|
|
), f"Expected {purpose} socket {expected_socket} not reported by adapter"
|
|
sockets.remove(socket)
|
|
assert not sockets, f"Unexpected sockets reported by adapter: {sockets}"
|
|
|
|
if self.start_command == "launch":
|
|
if "launcher" in self.expected_adapter_sockets:
|
|
# If adapter has just reported the launcher socket, it shouldn't be
|
|
# reported thereafter.
|
|
self.expected_adapter_sockets["launcher"] = None
|
|
elif "server" in self.expected_adapter_sockets:
|
|
# If adapter just reported the server socket, the next event should
|
|
# report the launcher socket.
|
|
self.expected_adapter_sockets["launcher"] = {
|
|
"host": some.str,
|
|
"port": some.int,
|
|
"internal": False,
|
|
}
|
|
|
|
    def run_in_terminal(self, args, cwd, env):
        """Implements the client side of "runInTerminal" by spawning the
        debuggee directly.

        When the adapter indicated that args may be interpreted by a shell
        (flag stashed by _process_request()), surrounding quotes are removed
        the way a real terminal would before spawning.
        """
        exe = args.pop(0)
        if getattr(self, "_run_in_terminal_args_can_be_interpreted_by_shell", False):
            exe = self._shell_unquote(exe)
            args = [self._shell_unquote(a) for a in args]
        self.spawn_debuggee.env.update(env)
        self.spawn_debuggee(args, cwd, exe=exe)
        return {}
|
|
|
|
@staticmethod
|
|
def _shell_unquote(s):
|
|
s = str(s)
|
|
if len(s) >= 2 and s[0] == s[-1] and s[0] in ("\"", "'"):
|
|
return s[1:-1]
|
|
return s
|
|
|
|
    @classmethod
    def _split_shell_arg_string(cls, s):
        """Split a shell argument string into args, honoring simple single/double quotes.

        This is intentionally minimal: it matches how terminals remove surrounding quotes
        before passing args to the spawned process, which our tests need to emulate.
        """
        s = str(s)
        args = []
        current = []  # characters of the argument currently being accumulated
        quote = None  # the quote character we are inside, or None

        def flush():
            # Finish the current argument if it is non-empty. Note that this
            # means an empty quoted string ("") contributes no argument at
            # all - acceptable for the test scenarios this emulates.
            if current:
                args.append("".join(current))
                current.clear()

        for ch in s:
            if quote is None:
                if ch.isspace():
                    flush()
                    continue
                if ch in ("\"", "'"):
                    quote = ch
                    continue
                current.append(ch)
            else:
                if ch == quote:
                    quote = None
                    continue
                current.append(ch)
        flush()

        # Unquote once more, to handle args that were nested inside quotes.
        return [cls._shell_unquote(a) for a in args]
|
|
|
|
    def _process_request(self, request):
        """Handles reverse requests (adapter -> client) on the DAP channel:
        "runInTerminal" and "startDebugging"."""
        self.timeline.record_request(request, block=False)
        if request.command == "runInTerminal":
            args = request("args", json.array(str, vectorize=True))
            args_can_be_interpreted_by_shell = request("argsCanBeInterpretedByShell", False)
            if len(args) > 0 and args_can_be_interpreted_by_shell:
                # The final arg is a string that contains multiple actual arguments.
                # Split it like a shell would, but keep the rest of the args (including
                # any quoting) intact so tests can inspect the raw runInTerminal argv.
                last_arg = args.pop()
                args += self._split_shell_arg_string(last_arg)
            cwd = request("cwd", ".")
            env = request("env", json.object(str))
            try:
                # Stash the flag, so that run_in_terminal() knows whether to
                # emulate shell unquoting; cleared unconditionally afterwards.
                self._run_in_terminal_args_can_be_interpreted_by_shell = (
                    args_can_be_interpreted_by_shell
                )
                return self.run_in_terminal(args, cwd, env)
            except Exception as exc:
                log.swallow_exception('"runInTerminal" failed:')
                raise request.cant_handle(str(exc))
            finally:
                self._run_in_terminal_args_can_be_interpreted_by_shell = False

        elif request.command == "startDebugging":
            pid = request("configuration", dict)("subProcessId", int)
            watchdog.register_spawn(pid, f"{self.debuggee_id}-subprocess-{pid}")
            return {}

        else:
            raise request.isnt_valid("not supported")
|
|
|
|
    def _process_response(self, request, response):
        # Records responses in the timeline. A "disconnect" response also
        # shuts down message processing, since the adapter will close the
        # connection from its end shortly after.
        self.timeline.record_response(request, response, block=False)
        if request.command == "disconnect":
            # Stop the message loop, since debugpy is going to close the connection
            # from its end shortly after sending this event, and no further messages
            # are expected.
            log.info(
                'Received "disconnect" response from {0}; stopping message processing.',
                self.adapter_id,
            )
            try:
                self.channel.close()
            except Exception:
                pass

    def _process_disconnect(self):
        # Marks the point at which the adapter disconnected, so that tests can
        # wait on it via wait_for_disconnect().
        self.timeline.mark("disconnect", block=False)
|
|
|
|
    def _start_channel(self, stream):
        """Wires up the DAP message channel over the given stream, waits for
        the initial telemetry events, and performs the "initialize"
        handshake."""
        handlers = messaging.MessageHandlers(
            request=self._process_request,
            event=self._process_event,
            disconnect=self._process_disconnect,
        )
        self.channel = messaging.JsonMessageChannel(stream, handlers)
        self.channel.start()

        # Both the legacy "ptvsd" and the "debugpy" telemetry events are
        # expected ("&" combines the two expectations, in either order).
        self.wait_for_next(
            timeline.Event(
                "output",
                {
                    "category": "telemetry",
                    "output": "ptvsd",
                    "data": {"packageVersion": some.str},
                },
            )
            & timeline.Event(
                "output",
                {
                    "category": "telemetry",
                    "output": "debugpy",
                    "data": {"packageVersion": some.str},
                },
            )
        )

        if not self.is_subprocess:
            self.wait_for_next(timeline.Event("debugpySockets"))

        self.request("initialize", self.capabilities)
|
|
|
|
def all_events(self, event, body=some.object):
|
|
return [
|
|
occ.body
|
|
for occ in self.timeline.all_occurrences_of(timeline.Event(event, body))
|
|
]
|
|
|
|
def output(self, category):
|
|
"""Returns all output of a given category as a single string, assembled from
|
|
all the "output" events received for that category so far.
|
|
"""
|
|
events = self.all_events("output", some.dict.containing({"category": category}))
|
|
return "".join(event("output", str) for event in events)
|
|
|
|
    def _request_start(self, method):
        """Sends the "launch" or "attach" request and handles the two possible
        continuations: an immediate response (noDebug or failure), or the
        "initialized" event that begins the configuration sequence."""
        self.config.normalize()
        start_request = self.send_request(method, self.config)

        # Depending on whether it's "noDebug" or not, we either get the "initialized"
        # event, or an immediate response to our request.
        self.timeline.wait_until_realized(
            timeline.Event("initialized") | timeline.Response(start_request),
            freeze=True,
        )

        if start_request.response is not None:
            # It was an immediate response - either the request failed, or there is
            # no configuration stage for this debug session.
            start_request.response.result  # raise exception if failed
            return self.wait_for_process()

        # We got "initialized" - now we need to yield to the caller, so that it can
        # configure the session before it starts running.
        return self._ConfigurationContextManager(self)
|
|
|
|
    class _ConfigurationContextManager(object):
        """Handles the start configuration sequence from "initialized" event until
        start_request receives a response.
        """

        def __init__(self, session):
            self.session = session
            # Tracks whether the manager was actually entered, so that __del__
            # can catch callers that forgot the with-statement.
            self._entered = False

        def __enter__(self):
            self._entered = True
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            # Completes the configuration sequence, then waits for the session
            # to actually start running.
            self.session.request("configurationDone")
            self.session.start_request.wait_for_response()
            self.session.wait_for_process()

        def __del__(self):
            assert self._entered, (
                "The return value of request_launch() or request_attach() must be "
                "used in a with-statement."
            )
|
|
|
|
    def request_launch(self):
        """Starts a "launch" session; returns either a configuration context
        manager or the result of wait_for_process() (see _request_start())."""
        if "PYTHONPATH" in self.config.env:
            # If specified, launcher will use it in lieu of PYTHONPATH it inherited
            # from the adapter when spawning debuggee, so we need to adjust again.
            self.config.env.prepend_to("PYTHONPATH", DEBUGGEE_PYTHONPATH.strpath)

        # Adapter is going to start listening for server and spawn the launcher at
        # this point. Server socket gets reported first.
        self.expect_server_socket()

        return self._request_start("launch")

    def request_attach(self):
        """Starts an "attach" session (see _request_start())."""
        # In attach(listen) scenario, adapter only starts listening for server
        # after receiving the "attach" request.
        listen = self.config.get("listen", None)
        if listen is not None:
            assert "server" not in self.expected_adapter_sockets
            self.expect_server_socket(listen["port"])
        return self._request_start("attach")

    def request_continue(self):
        # Resumes the debuggee. freeze=False because more events are expected
        # to follow immediately.
        self.request("continue", freeze=False)
|
|
|
|
    def set_breakpoints(self, path, lines):
        """Sets breakpoints in the specified file, and returns the list of all the
        corresponding DAP Breakpoint objects in the same order.

        If lines are specified, it should be an iterable in which every element is
        either a line number or a string. If it is a string, then it is translated
        to the corresponding line number via get_marked_line_numbers(path).

        If lines=all, breakpoints will be set on all the marked lines in the file.
        """

        # Don't fetch line markers unless needed - in some cases, the breakpoints
        # might be set in a file that does not exist on disk (e.g. remote attach).
        get_marked_line_numbers = lambda: code.get_marked_line_numbers(path)

        if lines is all:
            lines = get_marked_line_numbers().keys()

        def make_breakpoint(line):
            # Translates a marker string to its line number, records a log
            # entry, and returns the DAP SourceBreakpoint dict.
            if isinstance(line, int):
                descr = str(line)
            else:
                marker = line
                line = get_marked_line_numbers()[marker]
                descr = f"{line} (@{marker})"
            bp_log.append((line, descr))
            return {"line": line}

        bp_log = []
        breakpoints = self.request(
            "setBreakpoints",
            {
                "source": {"path": path},
                "breakpoints": [make_breakpoint(line) for line in lines],
            },
        )("breakpoints", json.array())

        # Log the breakpoints sorted by line number, for readability.
        bp_log = sorted(bp_log, key=lambda pair: pair[0])
        bp_log = ", ".join((descr for _, descr in bp_log))
        log.info("Breakpoints set in {0}: {1}", path, bp_log)

        return breakpoints
|
|
|
|
    def get_variables(self, *varnames, **kwargs):
        """Fetches the specified variables from the frame specified by frame_id, or
        from the topmost frame in the last "stackTrace" response if frame_id is not
        specified.

        If varnames is empty, then all variables in the frame are returned. The result
        is an OrderedDict, in which every entry has variable name as the key, and a
        DAP Variable object as the value. The original order of variables as reported
        by the debugger is preserved.

        If varnames is not empty, then only the specified variables are returned.
        The result is a tuple, in which every entry is a DAP Variable object; those
        entries are in the same order as varnames.
        """

        assert self.timeline.is_frozen

        frame_id = kwargs.pop("frame_id", None)
        if frame_id is None:
            # Default to the topmost frame of the most recent stack trace.
            stackTrace_responses = self.all_occurrences_of(
                timeline.Response(timeline.Request("stackTrace"))
            )
            assert stackTrace_responses, (
                "get_variables() without frame_id requires at least one response "
                'to a "stackTrace" request in the timeline.'
            )
            stack_trace = stackTrace_responses[-1]
            frame_id = stack_trace.body.get("stackFrames", json.array())[0]("id", int)

        # Variables are fetched from the first scope of the frame only.
        scopes = self.request("scopes", {"frameId": frame_id})("scopes", json.array())
        assert len(scopes) > 0

        variables = self.request(
            "variables", {"variablesReference": scopes[0]("variablesReference", int)}
        )("variables", json.array())

        variables = collections.OrderedDict(((v("name", str), v) for v in variables))
        if varnames:
            assert set(varnames) <= set(variables.keys())
            return tuple((variables[name] for name in varnames))
        else:
            return variables

    def get_variable(self, varname, frame_id=None):
        """Same as get_variables(...)[0]."""
        return self.get_variables(varname, frame_id=frame_id)[0]
|
|
|
|
def wait_for_next_event(self, event, body=some.object, freeze=True):
|
|
return self.timeline.wait_for_next(
|
|
timeline.Event(event, body), freeze=freeze
|
|
).body
|
|
|
|
def wait_for_process(self):
|
|
process = self.wait_for_next_event("process", freeze=False)
|
|
assert process == some.dict.containing(
|
|
{
|
|
"startMethod": self.start_request.command,
|
|
"name": some.str,
|
|
"isLocalProcess": True,
|
|
"systemProcessId": some.int,
|
|
}
|
|
)
|
|
|
|
    def wait_for_stop(
        self,
        reason=some.str,
        expected_frames=None,
        expected_text=None,
        expected_description=None,
    ):
        """Waits for the next "stopped" event, validates it, fetches the stack
        trace of the stopped thread, and returns a StopInfo with the event
        body, the frames, and the ids of the stopped thread and its topmost
        frame."""
        stopped = self.wait_for_next_event("stopped")

        expected_stopped = {
            "reason": reason,
            "threadId": some.int,
            "allThreadsStopped": True,
        }
        if expected_text is not None:
            expected_stopped["text"] = expected_text
        if expected_description is not None:
            expected_stopped["description"] = expected_description
        # Stops not caused by a direct user action should hint the client not
        # to steal focus.
        if stopped("reason", str) not in [
            "step",
            "exception",
            "breakpoint",
            "entry",
            "goto",
        ]:
            expected_stopped["preserveFocusHint"] = True
        assert stopped == some.dict.containing(expected_stopped)

        tid = stopped("threadId", int)
        stack_trace = self.request("stackTrace", {"threadId": tid})
        frames = stack_trace("stackFrames", json.array()) or []
        assert len(frames) == stack_trace("totalFrames", int)

        if expected_frames:
            # expected_frames only needs to match a prefix of the actual stack.
            assert len(expected_frames) <= len(frames)
            assert expected_frames == frames[0 : len(expected_frames)]

        assert len(frames) > 0

        fid = frames[0]("id", int)
        return StopInfo(stopped, frames, tid, fid)
|
|
|
|
    def wait_for_next_subprocess(self):
        """Waits for the next child-process notification - either the legacy
        "debugpyAttach" event or a "startDebugging" reverse request - and
        returns a new (not yet started) Session configured to attach to it."""
        message = self.timeline.wait_for_next(
            timeline.Event("debugpyAttach") | timeline.Request("startDebugging")
        )
        if isinstance(message, timeline.EventOccurrence):
            # "debugpyAttach" carries a complete attach configuration.
            config = message.body
            assert "request" in config
        elif isinstance(message, timeline.RequestOccurrence):
            # "startDebugging" carries a configuration without "request";
            # child sessions always attach.
            config = dict(message.body("configuration", dict))
            assert "request" not in config
            config["request"] = "attach"
        return Session(config)

    def wait_for_disconnect(self):
        # Blocks until _process_disconnect() has marked the disconnect point.
        self.timeline.wait_until_realized(timeline.Mark("disconnect"), freeze=True)
|
|
|
|
    def wait_for_exit(self):
        """Waits for the debuggee process to exit and for the "terminated"
        event, validates the exit code, and returns the exit code received
        via DAP (may be None in attach scenarios)."""
        if self.debuggee is not None:
            log.info("Waiting for {0} to exit ...", self.debuggee_id)
            try:
                self.debuggee.wait()
            except Exception:
                pass
            finally:
                watchdog.unregister_spawn(self.debuggee.pid, self.debuggee_id)

        self.wait_for_terminated()

        # FIXME: "exited" event is not properly reported in attach scenarios at the
        # moment, so the exit code is only checked if it's present.
        if self.debuggee is not None and self.exit_code is not None:
            assert self.debuggee.returncode == self.exit_code
        return self.exit_code

    def wait_for_terminated(self):
        # Blocks until the "terminated" event has been recorded.
        self.timeline.wait_until_realized(timeline.Event("terminated"))

    def captured_stdout(self, encoding=None):
        # Returns everything the debuggee wrote to stdout so far, decoded
        # with the given encoding if specified.
        assert self.debuggee is not None
        return self.captured_output.stdout(encoding)

    def captured_stderr(self, encoding=None):
        # Returns everything the debuggee wrote to stderr so far.
        assert self.debuggee is not None
        return self.captured_output.stderr(encoding)
|
|
|
|
    def disconnect(self, force=False):
        """Sends "disconnect" (unless force=True) and shuts down the channel.

        Safe to call more than once; a no-op once the channel is gone.
        """
        if self.channel is None:
            return

        try:
            if not force:
                self.request("disconnect")
                self.wait_for_terminated()
        except messaging.JsonIOError:
            # The adapter may already have closed the connection from its end.
            pass
        finally:
            try:
                self.channel.close()
            except Exception:
                pass
            self.channel.wait()
            self.channel = None
|
|
|
|
|
|
# Initialize the session id counter at import time, so that Session ids start
# at 1 even before the test harness calls reset_counter() explicitly.
Session.reset_counter()
|