Refactor tests.debug to accommodate ptvsd.server spawning the adapter, and remove the need for "custom_client" and "custom_server" start methods.

Fix launcher not propagating debuggee exit code.

Fix attach-by-PID without explicit --log-dir overriding PTVSD_LOG_DIR (and disabling logging).

Improve test logging, with a separate directory for every test.

Various test fixes.
This commit is contained in:
Pavel Minaev 2019-09-13 19:59:11 -07:00 committed by Pavel Minaev
parent 346ebd47cd
commit 8f358d6e0f
45 changed files with 1878 additions and 2818 deletions

View file

@ -4,18 +4,9 @@
from __future__ import absolute_import, division, print_function, unicode_literals
import py
import ptvsd
PTVSD_DIR = py.path.local(ptvsd.__file__) / ".."
PTVSD_ADAPTER_DIR = PTVSD_DIR / "adapter"
# Added to the environment variables of all adapters and servers.
PTVSD_ENV = {"PYTHONUNBUFFERED": "1"}
# Expose Session directly.
def Session(*args, **kwargs):
from tests.debug import session
return session.Session(*args, **kwargs)

View file

@ -24,7 +24,7 @@ class BackChannel(object):
self._server_socket = None
def __str__(self):
return fmt("backchannel-{0}", self.session.id)
return fmt("{0}.backchannel", self.session.debuggee_id)
def listen(self):
self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -70,14 +70,6 @@ class BackChannel(object):
self._stream.write_json(value)
return t
def expect(self, expected):
actual = self.receive()
assert expected == actual, fmt(
"Test expected {0!r} on backchannel, but got {1!r} from the debuggee",
expected,
actual,
)
def close(self):
if self._socket:
log.debug("Closing {0} socket of {1}...", self, self.session)

222
tests/debug/config.py Normal file
View file

@ -0,0 +1,222 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import absolute_import, division, print_function, unicode_literals
import collections
import os
class DebugConfig(collections.MutableMapping):
"""Debug configuration for a session. Corresponds to bodies of DAP "launch" and
"attach" requests, or launch.json in VSCode.
It is a dict-like object that only allows keys that are valid debug configuration
properties for ptvsd. When a property is queried, but it's not explicitly set in
the config, the default value (i.e. what ptvsd will assume the property is set to)
is returned.
In addition, it exposes high-level wrappers over "env" and "debugOptions".
"""
# Valid configuration properties. Keys are names, and values are defaults that
# are assumed by the adapter and/or the server if the property is not specified.
# If the property is required, or if the default is computed in such a way that
# it cannot be predicted, the value is ().
PROPERTIES = {
# Common
"breakOnSystemExitZero": False,
"debugOptions": [],
"django": False,
"jinja": False,
"justMyCode": False,
"flask": False,
"logToFile": False,
"maxExceptionStackFrames": (),
"name": (),
"noDebug": False,
"pathMappings": [],
"pyramid": False,
"pythonPath": (),
"redirectOutput": False,
"rules": [],
"showReturnValue": True,
"steppingResumesAllThreads": True,
"subProcess": False,
"successExitCodes": [0],
"type": (),
# Launch
"args": [],
"code": (),
"console": "internal",
"cwd": (),
"env": {},
"gevent": False,
"internalConsoleOptions": "neverOpen",
"module": (),
"program": (),
"stopOnEntry": False,
"sudo": False,
"waitOnNormalExit": False,
"waitOnAbnormalExit": False,
# Attach by socket
"host": (),
"port": (),
# Attach by PID
"processId": (),
}
def __init__(self, *args, **kwargs):
self._dict = dict(*args, **kwargs)
self.env = self.Env(self)
self.debug_options = self.DebugOptions(self)
def __iter__(self):
return iter(self._dict)
def __len__(self):
return len(self._dict)
def __contains__(self, key):
return key in self._dict
def __getitem__(self, key):
try:
return self._dict[key]
except KeyError:
try:
value = self.PROPERTIES[key]
except KeyError:
pass
else:
if value != ():
return value
raise
def __delitem__(self, key):
del self._dict[key]
def __setitem__(self, key, value):
assert key in self.PROPERTIES
self._dict[key] = value
def __getstate__(self):
return dict(self)
def setdefault(self, key, default=None):
if key not in self:
self[key] = default
return self[key]
def setdefaults(self, defaults):
"""Like setdefault(), but sets multiple default values at once.
"""
for k, v in defaults.items():
self.setdefault(k, v)
def normalize(self):
"""Normalizes the debug configuration by adding any derived properties, in
the manner similar to what VSCode does to launch.json before submitting it
to the adapter.
"""
if self["showReturnValue"]:
self.debug_options.add("ShowReturnValue")
if self["redirectOutput"]:
self.debug_options.add("RedirectOutput")
if not self["justMyCode"]:
self.debug_options.add("DebugStdLib")
if self["django"]:
self.debug_options.add("Django")
if self["jinja"]:
self.debug_options.add("Jinja")
if self["flask"]:
self.debug_options.add("Flask")
if self["pyramid"]:
self.debug_options.add("Pyramid")
if self["subProcess"]:
self.debug_options.add("Multiprocess")
if self["breakOnSystemExitZero"]:
self.debug_options.add("BreakOnSystemExitZero")
if self["stopOnEntry"]:
self.debug_options.add("StopOnEntry")
if self["waitOnNormalExit"]:
self.debug_options.add("WaitOnNormalExit")
if self["waitOnAbnormalExit"]:
self.debug_options.add("WaitOnAbnormalExit")
class Env(collections.MutableMapping):
"""Wraps config["env"], automatically creating and destroying it as needed.
"""
def __init__(self, config):
self.config = config
def __iter__(self):
return iter(self.config["env"])
def __len__(self):
return len(self.config["env"])
def __getitem__(self, key):
return self.config["env"][key]
def __delitem__(self, key):
env = self.config.get("env", {})
del env[key]
if not len(env):
del self.config["env"]
def __setitem__(self, key, value):
self.config.setdefault("env", {})[key] = value
def __getstate__(self):
return dict(self)
def prepend_to(self, key, entry):
"""Prepends a new entry to a PATH-style environment variable, creating
it if it doesn't exist already.
"""
try:
tail = os.path.pathsep + self[key]
except KeyError:
tail = ""
self[key] = entry + tail
class DebugOptions(collections.MutableSet):
"""Wraps config["debugOptions"], automatically creating and destroying it as
needed, and providing set operations for it.
"""
def __init__(self, config):
self.config = config
def __iter__(self):
return iter(self.config["debugOptions"])
def __len__(self):
return len(self.config["env"])
def __contains__(self, key):
return key in self.config["debugOptions"]
def add(self, key):
opts = self.config.setdefault("debugOptions", [])
if key not in opts:
opts.append(key)
def discard(self, key):
opts = self.config.get("debugOptions", [])
opts[:] = [x for x in opts if x != key]

View file

@ -9,7 +9,7 @@ import threading
from ptvsd.common import fmt, log
class CaptureOutput(object):
class CapturedOutput(object):
"""Captures stdout and stderr of the debugged process.
"""
@ -19,8 +19,14 @@ class CaptureOutput(object):
self._chunks = {}
self._worker_threads = []
assert not len(session.captured_output - {"stdout", "stderr"})
for stream_name in session.captured_output:
log.info("Capturing {0} {1}", session.debuggee_id, stream_name)
stream = getattr(session.debuggee, stream_name)
self._capture(stream, stream_name)
def __str__(self):
return fmt("CaptureOutput({0})", self.session)
return fmt("CapturedOutput({0})", self.session)
def _worker(self, pipe, name):
chunks = self._chunks[name]
@ -32,7 +38,7 @@ class CaptureOutput(object):
if not len(chunk):
break
log.info("{0} {1}> {2!r}", self.session, name, chunk)
log.info("{0} {1}:\n{2!r}", self.session.debuggee_id, name, chunk)
with self._lock:
chunks.append(chunk)
@ -47,20 +53,12 @@ class CaptureOutput(object):
thread.start()
self._worker_threads.append(thread)
def capture(self, process):
"""Start capturing stdout and stderr of the process.
"""
assert not self._worker_threads
log.info("Capturing {0} stdout and stderr", self.session)
self._capture(process.stdout, "stdout")
self._capture(process.stderr, "stderr")
def wait(self, timeout=None):
"""Wait for all remaining output to be captured.
"""
if not self._worker_threads:
return
log.debug("Waiting for remaining {0} stdout and stderr...", self.session)
log.debug("Waiting for remaining {0} output...", self.session.debuggee_id)
for t in self._worker_threads:
t.join(timeout)
self._worker_threads[:] = []
@ -70,7 +68,7 @@ class CaptureOutput(object):
result = self._chunks[which]
except KeyError:
raise AssertionError(
fmt("{0} was not captured for {1}", which, self.session)
fmt("{0} was not captured for {1}", which, self.session.debuggee_id)
)
with self._lock:

242
tests/debug/runners.py Normal file
View file

@ -0,0 +1,242 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import absolute_import, division, print_function, unicode_literals
"""Runners are recipes for executing Targets in a debug.Session.
Every function in this module that is decorated with @_runner must have at least two
positional arguments: (session, target) - and can have additional arguments. For every
such function, two artifacts are produced.
The function is exposed directly as a method on Session, with the session argument
becoming self.
The function is also exposed as a Runner object from this module. Runner objects are
callable, and invoke the wrapped function when called, but in addition, they can also
be bound to specific arguments, by using either [] or with_options(), which can be
chained arbitrarily::
# Direct invocation:
session.attach_by_socket("cli", log_dir="...")
# Indirect invocation:
run = runners.attach_by_socket
run = run["cli"]
run = run.with_options(log_dir="...")
run(session, target)
runner[x][y][z] is just a convenient shorthand for binding positional arguments, same
as runner.with_options(x, y, z).
Runners are immutable, so every use of [] or with_options() creates a new runner with
the specified arguments bound. The runner must have all its required arguments bound
before it can be invoked.
Regardless of whether the runner is invoked directly on the Session, or via a Runner
object, if the start DAP sequence involves a configuration phase (the "initialized"
event and the "configurationDone" request), the runner must be used in a with-statement.
The statements inside the with-statement are executed after receiving the "initialized"
event, and before sending the "configurationDone" request::
with run(session, target):
# DAP requests can be made to session, but target is not running yet.
session.set_breakpoints(...)
# target is running now!
If there is no configuration phase, the runner returns directly::
session.config["noDebug"] = True
run(session, target)
# target is running now!
"""
import os
import sys
import ptvsd
from ptvsd.common import compat, fmt, log
from tests import net
from tests.debug import session
def _runner(f):
assert f.__name__.startswith("launch") or f.__name__.startswith("attach")
setattr(session.Session, f.__name__, f)
class Runner(object):
request = "launch" if f.__name__.startswith("launch") else "attach"
def __init__(self, *args, **kwargs):
self._args = tuple(args)
self._kwargs = dict(kwargs)
def __getattr__(self, name):
return self._kwargs[name]
def __call__(self, session, target, *args, **kwargs):
if len(args) or len(kwargs):
return self.with_options(*args, **kwargs)(session, target)
return f(session, target, *self._args, **self._kwargs)
def __getitem__(self, arg):
return self.with_options(arg)
def with_options(self, *args, **kwargs):
new_args = self._args + args
new_kwargs = dict(self._kwargs)
new_kwargs.update(kwargs)
return Runner(*new_args, **new_kwargs)
def __repr__(self):
result = type(self).__name__
args = [str(x) for x in self._args] + [
fmt("{0}={1}", k, v) for k, v in self._kwargs.items()
]
if len(args):
result += "(" + ", ".join(args) + ")"
return result
@property
def pytest_id(self):
return repr(self)
Runner.__name__ = f.__name__
return Runner()
@_runner
def launch(session, target, console="integratedTerminal", cwd=None):
assert console in ("internalConsole", "integratedTerminal", "externalTerminal")
log.info("Launching {0} in {1} using {2!j}.", target, session, console)
target.configure(session)
config = session.config
config.setdefaults(
{
"console": "externalTerminal",
"internalConsoleOptions": "neverOpen",
"pythonPath": sys.executable,
}
)
config["console"] = console
if cwd is not None:
config["cwd"] = cwd
env = (
session.spawn_adapter.env
if config["console"] == "internalConsole"
else config.env
)
target.cli(env)
session.spawn_adapter()
return session.request_launch()
def _attach_common_config(session, target, cwd):
assert target.code is None or "debug_me" in target.code, fmt(
"{0} must import debug_me.", target.filename
)
target.configure(session)
config = session.config
if cwd is not None:
config.setdefault("pathMappings", [{"localRoot": cwd, "remoteRoot": "."}])
return config
@_runner
def attach_by_pid(session, target, cwd=None, wait=True):
log.info("Attaching {0} to {1} by PID.", session, target)
config = session.config
try:
config["processId"] = int(target)
except TypeError:
pass
if "processId" not in config:
_attach_common_config(session, target, cwd)
args = target.cli(session.spawn_debuggee.env)
if wait:
debug_me = """
import sys
while not "ptvsd" in sys.modules: pass
import ptvsd
while not ptvsd.is_attached(): pass
"""
else:
debug_me = None
session.spawn_debuggee(args, cwd=cwd, debug_me=debug_me)
config["processId"] = session.debuggee.pid
session.spawn_adapter()
return session.request_attach()
@_runner
def attach_by_socket(
session, target, method, listener="server", cwd=None, wait=True, log_dir=None
):
log.info(
"Attaching {0} to {1} by socket using {2}.", session, target, method.upper()
)
assert method in ("api", "cli")
assert listener in ("server") # TODO: ("adapter", "server")
config = _attach_common_config(session, target, cwd)
host = config["host"] = attach_by_socket.host
port = config["port"] = attach_by_socket.port
if method == "cli":
args = [os.path.dirname(ptvsd.__file__)]
if wait:
args += ["--wait"]
args += ["--host", compat.filename_str(host), "--port", str(port)]
if not config["subProcess"]:
args += ["--no-subprocesses"]
if log_dir is not None:
args += ["--log-dir", log_dir]
debug_me = None
elif method == "api":
args = []
debug_me = """
import ptvsd
ptvsd.enable_attach(({host!r}, {port!r}), {args})
if {wait!r}:
ptvsd.wait_for_attach()
"""
attach_args = "" if log_dir is None else fmt("log_dir={0!r}", log_dir)
debug_me = fmt(debug_me, host=host, port=port, wait=wait, args=attach_args)
else:
raise ValueError
args += target.cli(session.spawn_debuggee.env)
session.spawn_debuggee(args, cwd=cwd, debug_me=debug_me)
if wait:
session.wait_for_enable_attach()
session.connect_to_adapter((host, port))
return session.request_attach()
attach_by_socket.host = "127.0.0.1"
attach_by_socket.port = net.get_test_server_port(5678, 5800)
all_launch = [
launch["internalConsole"],
launch["integratedTerminal"],
launch["externalTerminal"],
]
all_attach = [attach_by_socket["api"], attach_by_socket["cli"], attach_by_pid]
all = all_launch + all_attach

View file

@ -5,57 +5,169 @@
from __future__ import absolute_import, division, print_function, unicode_literals
import collections
import contextlib
import itertools
import os
import psutil
import py
import subprocess
import sys
import time
from ptvsd.common import compat, fmt, json, log, messaging
import ptvsd.adapter
from ptvsd.common import compat, fmt, json, log, messaging, options, sockets, util
from ptvsd.common.compat import unicode
import tests
from tests import code, debug, timeline, watchdog
from tests.debug import comms, output
from tests import code, timeline, watchdog
from tests.debug import comms, config, output
from tests.patterns import some
DEBUGGEE_PYTHONPATH = tests.root / "DEBUGGEE_PYTHONPATH"
StopInfo = collections.namedtuple(
"StopInfo", ["body", "frames", "thread_id", "frame_id"]
)
class Session(object):
counter = itertools.count(1)
"""A test debug session. Manages the lifetime of the adapter and the debuggee
processes, captures debuggee stdio output, establishes a DAP message channel to
the debuggee, and records all DAP messages in that channel on a Timeline object.
_ignore_unobserved = [
timeline.Event("module"),
timeline.Event("continued"),
timeline.Event("exited"),
timeline.Event("terminated"),
timeline.Event("thread", some.dict.containing({"reason": "started"})),
timeline.Event("thread", some.dict.containing({"reason": "exited"})),
timeline.Event("output", some.dict.containing({"category": "stdout"})),
timeline.Event("output", some.dict.containing({"category": "stderr"})),
timeline.Event("output", some.dict.containing({"category": "console"})),
]
Must be used in a with-statement for proper cleanup. On successful exit - if no
exception escapes from the with-statement - the session will:
def __init__(
self, start_method, log_dir=None, client_id="vscode", backchannel=False
):
1. Invoke wait_for_exit(), unless expected_exit_code is None.
2. Invoke disconnect().
3. Wait for the adapter process to exit.
4. Finalize and closes the timeline
If the exit is due to an exception, the session will:
1. Invoke disconnect(force=True).
2. Kill the debuggee and the adapter processes.
Example::
with debug.Session() as session:
# Neither debuggee nor adapter are spawned yet. Initial configuration.
session.log_dir = ...
session.config.update({...})
with session.launch(...):
# Debuggee and adapter are spawned, but there is no code executing
# in the debuggee yet.
session.set_breakpoints(...)
# Code is executing in the debuggee.
session.wait_for_stop(expected_frames=[...])
assert session.get_variable(...) == ...
session.request_continue()
# Session is disconnected from the debuggee, and both the debuggee and the
# adapter processes have exited.
assert session.exit_code == ...
"""
tmpdir = None
"""Temporary directory in which Sessions can create the temp files they need.
Automatically set to tmpdir for the current test by pytest_fixtures.test_wrapper().
"""
_counter = itertools.count(1)
def __init__(self):
assert Session.tmpdir is not None
watchdog.start()
self.id = next(Session.counter)
self.log_dir = log_dir
self.start_method = start_method(self)
self.client_id = client_id
self.id = next(Session._counter)
log.info("Starting {0}", self)
self.client_id = "vscode"
self.debuggee = None
"""psutil.Popen instance for the debuggee process."""
self.adapter = None
"""psutil.Popen instance for the adapter process."""
self.channel = None
"""JsonMessageChannel to the adapter."""
self.captured_output = {"stdout", "stderr"}
"""Before the debuggee is spawned, this is the set of stdio streams that
should be captured once it is spawned.
After it is spawned, this is a CapturedOutput object capturing those streams.
"""
self.backchannel = None
"""The BackChannel object to talk to the debuggee.
Must be explicitly created with open_backchannel().
"""
self.scratchpad = comms.ScratchPad(self)
"""The ScratchPad object to talk to the debuggee."""
self.start_request = None
"""The "launch" or "attach" request that started executing code in this session.
"""
self.expected_exit_code = 0
"""The expected exit code for the debuggee process.
If None, the debuggee is not expected to exit when the Session is closed.
If not None, this is validated against both exit_code and debuggee.returncode.
"""
self.exit_code = None
"""The actual exit code for the debuggee process, as received from DAP.
"""
self.config = config.DebugConfig(
{
"justMyCode": True,
"name": "Test",
"redirectOutput": True,
"type": "python",
}
)
"""The debug configuration for this session."""
self.log_dir = (
None
if options.log_dir is None
else py.path.local(options.log_dir) / str(self)
)
"""The log directory for this session. Passed via PTVSD_LOG_DIR to all spawned
child processes.
If set to None, PTVSD_LOG_DIR is not automatically added, but tests can still
provide it manually.
"""
self.tmpdir = Session.tmpdir / str(self)
self.tmpdir.ensure(dir=True)
self.timeline = timeline.Timeline(str(self))
self.ignore_unobserved.extend(self._ignore_unobserved)
self.ignore_unobserved.extend(self.start_method.ignore_unobserved)
self.adapter_process = None
self.channel = None
self.backchannel = comms.BackChannel(self) if backchannel else None
self.scratchpad = comms.ScratchPad(self)
self.ignore_unobserved.extend(
[
timeline.Event("module"),
timeline.Event("continued"),
# timeline.Event("exited"),
# timeline.Event("terminated"),
timeline.Event("thread", some.dict.containing({"reason": "started"})),
timeline.Event("thread", some.dict.containing({"reason": "exited"})),
timeline.Event("output", some.dict.containing({"category": "stdout"})),
timeline.Event("output", some.dict.containing({"category": "stderr"})),
timeline.Event("output", some.dict.containing({"category": "console"})),
]
)
# Expose some common members of timeline directly - these should be the ones
# that are the most straightforward to use, and are difficult to use incorrectly.
@ -70,27 +182,40 @@ class Session(object):
self.all_occurrences_of = self.timeline.all_occurrences_of
self.observe_all = self.timeline.observe_all
spawn_adapter = self.spawn_adapter
self.spawn_adapter = lambda *args, **kwargs: spawn_adapter(*args, **kwargs)
self.spawn_adapter.env = util.Env()
spawn_debuggee = self.spawn_debuggee
self.spawn_debuggee = lambda *args, **kwargs: spawn_debuggee(*args, **kwargs)
self.spawn_debuggee.env = util.Env()
def __str__(self):
return fmt("ptvsd-{0}", self.id)
return fmt("Session-{0}", self.id)
@property
def adapter_id(self):
return fmt("adapter-{0}", self.id)
return fmt("Adapter-{0}", self.id)
@property
def debuggee_id(self):
return fmt("debuggee-{0}", self.id)
return fmt("Debuggee-{0}", self.id)
def __enter__(self):
self._start_adapter()
self._handshake()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.timeline.is_frozen:
self.timeline.unfreeze()
# Only wait for exit if there was no exception in the test - if there was one,
# the debuggee might still be waiting for further requests.
if exc_type is None:
# Only wait for debuggee if there was no exception in the test - if there
# was one, the debuggee might still be waiting for further requests.
self.start_method.wait_for_debuggee()
# If expected_exit_code is set to None, the debuggee is not expected to
# exit after this Session is closed (e.g. because another Session will
# attach to it later on).
if self.expected_exit_code is not None:
self.wait_for_exit()
else:
# Log the error, in case another one happens during shutdown.
log.exception(exc_info=(exc_type, exc_val, exc_tb))
@ -101,48 +226,170 @@ class Session(object):
else:
# If there was an exception, don't try to send any more messages to avoid
# spamming log with irrelevant entries - just close the channel and kill
# the adapter process immediately. Don't close or finalize the timeline,
# either, since it'll have unobserved events in it.
# all the processes immediately. Don't close or finalize the timeline,
# either, since it'll likely have unobserved events in it.
self.disconnect(force=True)
if self.adapter_process is not None:
if self.adapter is not None:
try:
self.adapter_process.kill()
self.adapter.kill()
except Exception:
pass
if self.debuggee is not None:
try:
self.debuggee.kill()
except Exception:
pass
if self.adapter_process is not None:
if self.adapter is not None:
log.info(
"Waiting for {0} with PID={1} to exit.",
self.adapter_id,
self.adapter_process.pid,
self.adapter.pid,
)
self.adapter_process.wait()
watchdog.unregister_spawn(self.adapter_process.pid, self.adapter_id)
self.adapter_process = None
self.adapter.wait()
watchdog.unregister_spawn(self.adapter.pid, self.adapter_id)
self.adapter = None
if self.backchannel:
if self.backchannel is not None:
self.backchannel.close()
self.backchannel = None
@property
def process(self):
return self.start_method.debuggee_process
@property
def pid(self):
return self.process.pid
@property
def ignore_unobserved(self):
return self.timeline.ignore_unobserved
@property
def expected_exit_code(self):
return self.start_method.expected_exit_code
def open_backchannel(self):
assert self.backchannel is None
self.backchannel = comms.BackChannel(self)
self.backchannel.listen()
return self.backchannel
@expected_exit_code.setter
def expected_exit_code(self, value):
self.start_method.expected_exit_code = value
def _init_log_dir(self):
if self.log_dir is None:
return False
log.info("Logs for {0} will be in {1!j}", self, self.log_dir)
try:
self.log_dir.remove()
except Exception:
pass
self.log_dir.ensure(dir=True)
# Make subsequent calls of this method no-op for the remainder of the session.
self._init_log_dir = lambda: True
return True
def _make_env(self, base_env, codecov=True):
env = util.Env.snapshot()
if base_env is not None:
base_env = dict(base_env)
python_path = base_env.pop("PYTHONPATH", None)
if python_path is not None:
env.prepend_to("PYTHONPATH", python_path)
env.update(base_env)
env["PTVSD_TEST_SESSION_ID"] = str(self.id)
env.prepend_to("PYTHONPATH", DEBUGGEE_PYTHONPATH.strpath)
if self._init_log_dir():
env.update(
{
"PTVSD_LOG_DIR": self.log_dir.strpath,
"PYDEVD_DEBUG": "True",
"PYDEVD_DEBUG_FILE": (self.log_dir / "pydevd.log").strpath,
}
)
if self.backchannel is not None:
env["PTVSD_TEST_BACKCHANNEL_PORT"] = str(self.backchannel.port)
return env
def spawn_debuggee(self, args, cwd=None, exe=sys.executable, debug_me=None):
assert self.debuggee is None
args = [exe] + [
compat.filename_str(s.strpath if isinstance(s, py.path.local) else s)
for s in args
]
env = self._make_env(self.spawn_debuggee.env, codecov=False)
env["PTVSD_LISTENER_FILE"] = self.listener_file = self.tmpdir / "listener"
if debug_me is not None:
env["PTVSD_TEST_DEBUG_ME"] = debug_me
log.info(
"Spawning {0}:\n\n"
"Current directory: {1!j}\n\n"
"Command line: {2!j}\n\n"
"Environment variables: {3!j}\n\n",
self.debuggee_id,
cwd,
args,
env,
)
self.debuggee = psutil.Popen(
args,
cwd=cwd,
env=env.for_popen(),
bufsize=0,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
log.info("Spawned {0} with PID={1}", self.debuggee_id, self.debuggee.pid)
watchdog.register_spawn(self.debuggee.pid, self.debuggee_id)
if self.captured_output:
self.captured_output = output.CapturedOutput(self)
def wait_for_enable_attach(self):
log.info(
"Waiting for debug server in {0} to open a listener socket...",
self.debuggee_id,
)
while not self.listener_file.check():
time.sleep(0.1)
def spawn_adapter(self):
assert self.adapter is None
assert self.channel is None
args = [sys.executable, os.path.dirname(ptvsd.adapter.__file__)]
env = self._make_env(self.spawn_adapter.env)
log.info(
"Spawning {0}:\n\n"
"Command line: {1!j}\n\n"
"Environment variables: {2!j}\n\n",
self.adapter_id,
args,
env,
)
self.adapter = psutil.Popen(
args,
bufsize=0,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
env=env.for_popen(),
)
log.info("Spawned {0} with PID={1}", self.adapter_id, self.adapter.pid)
watchdog.register_spawn(self.adapter.pid, self.adapter_id)
stream = messaging.JsonIOStream.from_process(self.adapter, name=self.adapter_id)
self._start_channel(stream)
def connect_to_adapter(self, address):
assert self.channel is None
host, port = address
log.info("Connecting to {0} at {1}:{2}", self.adapter_id, host, port)
sock = sockets.create_client()
sock.connect(address)
stream = messaging.JsonIOStream.from_socket(sock, name=self.adapter_id)
self._start_channel(stream)
def request(self, *args, **kwargs):
freeze = kwargs.pop("freeze", True)
@ -159,6 +406,8 @@ class Session(object):
message = self.channel.send_request(command, arguments)
request = self.timeline.record_request(message)
if command in ("launch", "attach"):
self.start_request = request
# Register callback after recording the request, so that there's no race
# between it being recorded, and the response to it being received.
@ -167,15 +416,33 @@ class Session(object):
return request
def _process_event(self, event):
if event.event == "ptvsd_subprocess":
occ = self.timeline.record_event(event, block=False)
if event.event == "exited":
self.observe(occ)
self.exit_code = event("exitCode", int)
assert self.exit_code == self.expected_exit_code
elif event.event == "ptvsd_subprocess":
self.observe(occ)
pid = event("processId", int)
watchdog.register_spawn(pid, fmt("{0}-subprocess-{1}", self, pid))
self.timeline.record_event(event, block=False)
watchdog.register_spawn(
pid, fmt("{0}-subprocess-{1}", self.debuggee_id, pid)
)
def _process_request(self, request):
self.timeline.record_request(request, block=False)
if request.command == "runInTerminal":
return self.start_method.run_in_terminal(request)
args = request("args", json.array(unicode))
cwd = request("cwd", ".")
env = request("env", json.object(unicode))
try:
exe = args.pop(0)
assert not len(self.spawn_debuggee.env)
self.spawn_debuggee.env = env
self.spawn_debuggee(args, cwd, exe=exe)
return {}
except OSError as exc:
log.exception('"runInTerminal" failed:')
raise request.cant_handle(str(exc))
else:
raise request.isnt_valid("not supported")
@ -197,28 +464,7 @@ class Session(object):
def _process_disconnect(self):
self.timeline.mark("disconnect", block=False)
def _start_adapter(self):
args = [sys.executable, debug.PTVSD_ADAPTER_DIR]
if self.log_dir is not None:
args += ["--log-dir", self.log_dir]
args = [compat.filename_str(s) for s in args]
env = os.environ.copy()
env.update(debug.PTVSD_ENV)
env = {
compat.filename_str(k): compat.filename_str(v) for k, v in env.items()
}
log.info("Spawning {0}: {1!j}", self.adapter_id, args)
self.adapter_process = psutil.Popen(
args, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, env=env
)
log.info("Spawned {0} with PID={1}", self.adapter_id, self.adapter_process.pid)
watchdog.register_spawn(self.adapter_process.pid, self.adapter_id)
stream = messaging.JsonIOStream.from_process(
self.adapter_process, name=str(self)
)
def _start_channel(self, stream):
handlers = messaging.MessageHandlers(
request=self._process_request,
event=self._process_event,
@ -227,7 +473,6 @@ class Session(object):
self.channel = messaging.JsonMessageChannel(stream, handlers)
self.channel.start()
def _handshake(self):
telemetry = self.wait_for_next_event("output")
assert telemetry == {
"category": "telemetry",
@ -235,58 +480,83 @@ class Session(object):
"data": {"version": some.str},
}
self.send_request(
self.request(
"initialize",
{
"pathFormat": "path",
"clientID": self.client_id,
# "clientName":"Visual Studio Code",
"adapterID": "test",
"linesStartAt1": True,
"columnsStartAt1": True,
"supportsVariableType": True,
"supportsRunInTerminalRequest": True,
# "supportsMemoryReferences":true,
# "supportsHandshakeRequest":true,
# "AdditionalProperties":{}
},
).wait_for_response()
def configure(self, run_as, target, env=None, **kwargs):
env = {} if env is None else dict(env)
env.update(debug.PTVSD_ENV)
pythonpath = env.get("PYTHONPATH", "")
if pythonpath:
pythonpath += os.pathsep
pythonpath += (tests.root / "DEBUGGEE_PYTHONPATH").strpath
pythonpath += os.pathsep + (debug.PTVSD_DIR / "..").strpath
env["PYTHONPATH"] = pythonpath
env["PTVSD_SESSION_ID"] = str(self.id)
if self.backchannel is not None:
self.backchannel.listen()
env["PTVSD_BACKCHANNEL_PORT"] = str(self.backchannel.port)
if self.log_dir is not None:
kwargs["logToFile"] = True
self.captured_output = output.CaptureOutput(self)
self.start_method.configure(run_as, target, env=env, **kwargs)
def start_debugging(self):
start_request = self.start_method.start_debugging()
process = self.wait_for_next_event("process", freeze=False)
assert process == some.dict.containing(
{
"startMethod": start_request.command,
"name": some.str,
"isLocalProcess": True,
"systemProcessId": some.int,
}
)
def all_events(self, event, body=some.object):
return [
occ.body
for occ in self.timeline.all_occurrences_of(timeline.Event(event, body))
]
def output(self, category):
"""Returns all output of a given category as a single string, assembled from
all the "output" events received for that category so far.
"""
events = self.all_events("output", some.dict.containing({"category": category}))
return "".join(event("output", unicode) for event in events)
def _request_start(self, method):
self.config.normalize()
start_request = self.send_request(method, self.config)
def wait_for_process_event():
process = self.wait_for_next_event("process", freeze=False)
assert process == some.dict.containing(
{
"startMethod": self.start_request.command,
"name": some.str,
"isLocalProcess": True,
"systemProcessId": some.int,
}
)
# Depending on whether it's "noDebug" or not, we either get the "initialized"
# event, or an immediate response to our request.
self.timeline.wait_until_realized(
timeline.Event("initialized") | timeline.Response(start_request),
freeze=True,
)
if start_request.response is not None:
# It was an immediate response - configuration is not possible. Just get
# the "process" event, and return to caller.
return wait_for_process_event()
# We got "initialized" - now we need to yield to the caller, so that it can
# configure the session before it starts running, and then give control back
# to us to finalize the configuration sequence. A nested context manager is
# used to ensure that all code up to this point executes eagerly.
@contextlib.contextmanager
def configure():
yield
self.request("configurationDone")
start_request.wait_for_response()
wait_for_process_event()
return configure()
def request_launch(self):
if "PYTHONPATH" in self.config.env:
# If specified, launcher will use it in lieu of PYTHONPATH it inherited
# from the adapter when spawning debuggee, so we need to adjust again.
self.config.env.prepend_to("PYTHONPATH", DEBUGGEE_PYTHONPATH.strpath)
return self._request_start("launch")
def request_attach(self):
return self._request_start("attach")
def request_continue(self):
self.request("continue", freeze=False)
@ -383,6 +653,11 @@ class Session(object):
"""
return self.get_variables(varname, frame_id=frame_id)[0]
def wait_for_next_event(self, event, body=some.object, freeze=True):
return self.timeline.wait_for_next(
timeline.Event(event, body), freeze=freeze
).body
def wait_for_stop(
self,
reason=some.str,
@ -423,29 +698,39 @@ class Session(object):
fid = frames[0]("id", int)
return StopInfo(stopped, frames, tid, fid)
def wait_for_next_event(self, event, body=some.object, freeze=True):
return self.timeline.wait_for_next(
timeline.Event(event, body), freeze=freeze
).body
def wait_for_next_subprocess(self):
raise NotImplementedError
def output(self, category):
"""Returns all output of a given category as a single string, assembled from
all the "output" events received for that category so far.
"""
events = self.all_occurrences_of(
timeline.Event("output", some.dict.containing({"category": category}))
)
return "".join(event("output", unicode) for event in events)
def wait_for_disconnect(self):
self.timeline.wait_until_realized(timeline.Mark("disconnect"), freeze=True)
def wait_for_exit(self):
if self.debuggee is not None:
try:
self.debuggee.wait()
except Exception:
pass
finally:
watchdog.unregister_spawn(self.debuggee.pid, self.debuggee_id)
self.timeline.wait_until_realized(timeline.Event("terminated"))
# FIXME: "exited" event is not properly reported in attach scenarios at the
# moment, so the exit code is only checked if it's present.
if self.start_request.command == "launch":
assert self.exit_code is not None
if self.debuggee is not None and self.exit_code is not None:
assert self.debuggee.returncode == self.exit_code
return self.exit_code
def captured_stdout(self, encoding=None):
assert self.debuggee is not None
return self.captured_output.stdout(encoding)
def captured_stderr(self, encoding=None):
assert self.debuggee is not None
return self.captured_output.stderr(encoding)
def wait_for_disconnect(self):
self.timeline.wait_for_next(timeline.Mark("disconnect"))
def disconnect(self, force=False):
if self.channel is None:
return
@ -453,6 +738,7 @@ class Session(object):
try:
if not force:
self.request("disconnect")
self.timeline.wait_until_realized(timeline.Event("terminated"))
except messaging.JsonIOError:
pass
finally:

View file

@ -1,577 +0,0 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import absolute_import, division, print_function, unicode_literals
import os
import ptvsd
import psutil
import py.path
import pytest
import subprocess
import sys
import time
from ptvsd.common import compat, fmt, json, log
from ptvsd.common.compat import unicode
from tests import net, timeline, watchdog
from tests.patterns import some
PTVSD_DIR = py.path.local(ptvsd.__file__) / ".."
PTVSD_PORT = net.get_test_server_port(5678, 5800)
# Code that is injected into the debuggee process when it does `import debug_me`,
# and start_method is attach_socket_*
PTVSD_DEBUG_ME = """
import ptvsd
ptvsd.enable_attach(("127.0.0.1", {ptvsd_port}), log_dir={log_dir!r})
ptvsd.wait_for_attach()
"""
class DebugStartBase(object):
ignore_unobserved = []
def __init__(self, session, method="base"):
self.session = session
self.method = method
self.debuggee_process = None
self.expected_exit_code = None
def start_debugging(self, **kwargs):
pass
def wait_for_debuggee(self):
# TODO: Exit should not be restricted to launch tests only
if "launch" in self.method:
exited = self.session.timeline.wait_until_realized(
timeline.Event("exited")
).body
assert exited == some.dict.containing(
{
"exitCode": some.int
if self.expected_exit_code is None
else self.expected_exit_code
}
)
self.session.timeline.wait_until_realized(timeline.Event("terminated"))
if self.debuggee_process is None:
return
try:
self.debuggee_process.wait()
except Exception:
pass
finally:
watchdog.unregister_spawn(
self.debuggee_process.pid, self.session.debuggee_id
)
def run_in_terminal(self, request, **kwargs):
raise request.isnt_valid("not supported")
def _build_common_args(
self,
args,
showReturnValue=None,
justMyCode=True,
subProcess=None,
django=None,
jinja=None,
flask=None,
pyramid=None,
logToFile=None,
redirectOutput=True,
noDebug=None,
maxExceptionStackFrames=None,
steppingResumesAllThreads=None,
rules=None,
successExitCodes=None,
breakOnSystemExitZero=None,
pathMappings=None,
):
if logToFile:
args["logToFile"] = logToFile
if "env" in args:
args["env"]["PTVSD_LOG_DIR"] = self.session.log_dir
if showReturnValue:
args["showReturnValue"] = showReturnValue
args["debugOptions"] += ["ShowReturnValue"]
if redirectOutput:
args["redirectOutput"] = redirectOutput
args["debugOptions"] += ["RedirectOutput"]
if justMyCode is False:
# default behavior is Just-my-code = true
args["justMyCode"] = justMyCode
args["debugOptions"] += ["DebugStdLib"]
if django:
args["django"] = django
args["debugOptions"] += ["Django"]
if jinja:
args["jinja"] = jinja
args["debugOptions"] += ["Jinja"]
if flask:
args["flask"] = flask
args["debugOptions"] += ["Flask"]
if pyramid:
args["pyramid"] = pyramid
args["debugOptions"] += ["Pyramid"]
# VS Code uses noDebug in both attach and launch cases. Even though
# noDebug on attach does not make any sense.
if noDebug:
args["noDebug"] = True
if subProcess:
args["subProcess"] = subProcess
args["debugOptions"] += ["Multiprocess"]
if maxExceptionStackFrames:
args["maxExceptionStackFrames"] = maxExceptionStackFrames
if steppingResumesAllThreads is not None:
args["steppingResumesAllThreads"] = steppingResumesAllThreads
if rules is not None:
args["rules"] = rules
if successExitCodes:
args["successExitCodes"] = successExitCodes
if breakOnSystemExitZero:
args["debugOptions"] += ["BreakOnSystemExitZero"]
if pathMappings is not None:
args["pathMappings"] = pathMappings
def __str__(self):
return self.method
class Launch(DebugStartBase):
def __init__(self, session):
super(Launch, self).__init__(session, "launch")
self._launch_args = None
def _build_launch_args(
self,
launch_args,
run_as,
target,
pythonPath=sys.executable,
args=(),
cwd=None,
env=None,
stopOnEntry=None,
gevent=None,
sudo=None,
waitOnNormalExit=None,
waitOnAbnormalExit=None,
console="externalTerminal",
internalConsoleOptions="neverOpen",
**kwargs
):
assert console in ("internalConsole", "integratedTerminal", "externalTerminal")
env = {} if env is None else dict(env)
debug_options = []
launch_args.update(
{
"name": "Terminal",
"type": "python",
"request": "launch",
"console": console,
"env": env,
"pythonPath": pythonPath,
"args": args,
"internalConsoleOptions": internalConsoleOptions,
"debugOptions": debug_options,
}
)
if stopOnEntry:
launch_args["stopOnEntry"] = stopOnEntry
debug_options += ["StopOnEntry"]
if gevent:
launch_args["gevent"] = gevent
env["GEVENT_SUPPORT"] = "True"
if sudo:
launch_args["sudo"] = sudo
if waitOnNormalExit:
debug_options += ["WaitOnNormalExit"]
if waitOnAbnormalExit:
debug_options += ["WaitOnAbnormalExit"]
target_str = target
if isinstance(target, py.path.local):
target_str = target.strpath
if cwd:
launch_args["cwd"] = cwd
elif os.path.isfile(target_str) or os.path.isdir(target_str):
launch_args["cwd"] = os.path.dirname(target_str)
else:
launch_args["cwd"] = os.getcwd()
if "PYTHONPATH" not in env:
env["PYTHONPATH"] = ""
if run_as == "program":
launch_args["program"] = target_str
elif run_as == "module":
if os.path.isfile(target_str) or os.path.isdir(target_str):
env["PYTHONPATH"] += os.pathsep + os.path.dirname(target_str)
try:
launch_args["module"] = target_str[
(len(os.path.dirname(target_str)) + 1) : -3
]
except Exception:
launch_args["module"] = "code_to_debug"
else:
launch_args["module"] = target_str
elif run_as == "code":
with open(target_str, "rb") as f:
launch_args["code"] = f.read().decode("utf-8")
else:
pytest.fail()
self._build_common_args(launch_args, **kwargs)
return launch_args
def configure(self, run_as, target, **kwargs):
self._launch_args = self._build_launch_args({}, run_as, target, **kwargs)
self.no_debug = self._launch_args.get("noDebug", False)
if not self.no_debug:
self._launch_request = self.session.send_request(
"launch", self._launch_args
)
self.session.wait_for_next_event("initialized")
def start_debugging(self):
if self.no_debug:
self._launch_request = self.session.send_request(
"launch", self._launch_args
)
else:
self.session.request("configurationDone")
self._launch_request.wait_for_response(freeze=False)
return self._launch_request
def run_in_terminal(self, request):
args = request("args", json.array(unicode))
cwd = request("cwd", ".")
env = os.environ.copy()
env.pop("COV_CORE_SOURCE", None) # disable codecov subprocess hook
env.update(request("env", json.object(unicode)))
if sys.version_info < (3,):
args = [compat.filename_str(s) for s in args]
env = {
compat.filename_str(k): compat.filename_str(v) for k, v in env.items()
}
log.info(
'{0} spawning {1} via "runInTerminal" request',
self.session,
self.session.debuggee_id,
)
self.debuggee_process = psutil.Popen(
args,
cwd=cwd,
env=env,
bufsize=0,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
watchdog.register_spawn(self.debuggee_process.pid, self.session.debuggee_id)
self.session.captured_output.capture(self.debuggee_process)
return {}
class AttachBase(DebugStartBase):
ignore_unobserved = DebugStartBase.ignore_unobserved + []
def __init__(self, session, method):
super(AttachBase, self).__init__(session, method)
self._attach_args = {}
def _build_attach_args(
self,
attach_args,
run_as,
target,
host="127.0.0.1",
port=PTVSD_PORT,
**kwargs
):
assert host is not None
assert port is not None
debug_options = []
attach_args.update(
{
"name": "Attach",
"type": "python",
"request": "attach",
"debugOptions": debug_options,
}
)
attach_args["host"] = host
attach_args["port"] = port
self._build_common_args(attach_args, **kwargs)
return attach_args
def configure(self, run_as, target, **kwargs):
target_str = target
if isinstance(target, py.path.local):
target_str = target.strpath
env = os.environ.copy()
env.pop("COV_CORE_SOURCE", None) # disable codecov subprocess hook
env.update(kwargs["env"])
cli_args = kwargs.get("cli_args")
if run_as == "program":
cli_args += [target_str]
elif run_as == "module":
if os.path.isfile(target_str) or os.path.isdir(target_str):
env["PYTHONPATH"] += os.pathsep + os.path.dirname(target_str)
try:
module = target_str[(len(os.path.dirname(target_str)) + 1) : -3]
except Exception:
module = "code_to_debug"
else:
module = target_str
cli_args += ["-m", module]
elif run_as == "code":
with open(target_str, "rb") as f:
cli_args += ["-c", f.read()]
else:
pytest.fail()
cli_args += kwargs.get("args")
cli_args = [compat.filename_str(s) for s in cli_args]
env = {compat.filename_str(k): compat.filename_str(v) for k, v in env.items()}
cwd = kwargs.get("cwd")
if cwd:
pass
elif os.path.isfile(target_str) or os.path.isdir(target_str):
cwd = os.path.dirname(target_str)
else:
cwd = os.getcwd()
if "pathMappings" not in self._attach_args:
self._attach_args["pathMappings"] = [{"localRoot": cwd, "remoteRoot": "."}]
env_str = "\n".join((fmt(" {0}={1}", k, env[k]) for k in sorted(env.keys())))
log.info(
"Spawning {0}: {1!j}\n\n" "with cwd:\n {2!j}\n\n" "with env:\n{3}",
self.session.debuggee_id,
cli_args,
cwd,
env_str,
)
self.debuggee_process = psutil.Popen(
cli_args,
cwd=cwd,
env=env,
bufsize=0,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
watchdog.register_spawn(self.debuggee_process.pid, self.session.debuggee_id)
self.session.captured_output.capture(self.debuggee_process)
pid = self.debuggee_process.pid
if self.method == "attach_pid":
self._attach_args["processId"] = pid
else:
log.info(
"Waiting for {0} to open listener socket...", self.session.debuggee_id
)
for i in range(0, 100):
connections = psutil.net_connections()
if any(p == pid for (_, _, _, _, _, _, p) in connections):
break
time.sleep(0.1)
else:
log.warning("Couldn't detect open listener socket; proceeding anyway.")
self._attach_request = self.session.send_request("attach", self._attach_args)
self.session.wait_for_next_event("initialized")
def start_debugging(self):
self.session.request("configurationDone")
self.no_debug = self._attach_args.get("noDebug", False)
if self.no_debug:
log.info('{0} ignoring "noDebug" in "attach"', self.session)
self._attach_request.wait_for_response()
return self._attach_request
class AttachSocketImport(AttachBase):
def __init__(self, session):
super(AttachSocketImport, self).__init__(session, "attach_socket_import")
def _check_ready_for_import(self, path_or_code):
if isinstance(path_or_code, py.path.local):
path_or_code = path_or_code.strpath
if os.path.isfile(path_or_code):
with open(path_or_code, "rb") as f:
code = f.read()
elif "\n" in path_or_code:
code = path_or_code
else:
# path_or_code is a module name
return
assert b"debug_me" in code, fmt(
"{0} is started via {1}, but it doesn't import debug_me.",
path_or_code,
self.method,
)
def configure(
self,
run_as,
target,
pythonPath=sys.executable,
args=(),
cwd=None,
env=None,
**kwargs
):
env = {} if env is None else dict(env)
self._attach_args = self._build_attach_args({}, run_as, target, **kwargs)
ptvsd_port = self._attach_args["port"]
log_dir = (
self.session.log_dir
if not self._attach_args.get("logToFile", False)
else None
)
env["PTVSD_DEBUG_ME"] = fmt(
PTVSD_DEBUG_ME, ptvsd_port=ptvsd_port, log_dir=log_dir
)
self._check_ready_for_import(target)
cli_args = [pythonPath]
super(AttachSocketImport, self).configure(
run_as, target, cwd=cwd, env=env, args=args, cli_args=cli_args, **kwargs
)
class AttachSocketCmdLine(AttachBase):
def __init__(self, session):
super(AttachSocketCmdLine, self).__init__(session, "attach_socket_cmdline")
def configure(
self,
run_as,
target,
pythonPath=sys.executable,
args=(),
cwd=None,
env=None,
**kwargs
):
env = {} if env is None else dict(env)
self._attach_args = self._build_attach_args({}, run_as, target, **kwargs)
cli_args = [pythonPath]
cli_args += [PTVSD_DIR.strpath]
cli_args += ["--wait"]
cli_args += [
"--host",
self._attach_args["host"],
"--port",
str(self._attach_args["port"]),
]
log_dir = (
self.session.log_dir if self._attach_args.get("logToFile", False) else None
)
if log_dir:
cli_args += ["--log-dir", log_dir]
if self._attach_args.get("subProcess", False):
cli_args += ["--multiprocess"]
super(AttachSocketCmdLine, self).configure(
run_as, target, cwd=cwd, env=env, args=args, cli_args=cli_args, **kwargs
)
class AttachProcessId(AttachBase):
def __init__(self, session):
super(AttachProcessId, self).__init__(session, "attach_pid")
def configure(
self,
run_as,
target,
pythonPath=sys.executable,
args=(),
cwd=None,
env=None,
**kwargs
):
env = {} if env is None else dict(env)
self._attach_args = self._build_attach_args({}, run_as, target, **kwargs)
log_dir = (
self.session.log_dir if self._attach_args.get("logToFile", False) else None
)
if log_dir:
self._attach_args["ptvsdArgs"] = ["--log-dir", log_dir]
cli_args = [pythonPath]
super(AttachProcessId, self).configure(
run_as, target, cwd=cwd, env=env, args=args, cli_args=cli_args, **kwargs
)
class CustomServer(DebugStartBase):
def __init__(self, session):
super().__init__(session, "custom_server")
class CustomClient(DebugStartBase):
def __init__(self, session):
super().__init__(session, "custom_client")
__all__ = [
Launch, # ptvsd --client ... foo.py
AttachSocketCmdLine, # ptvsd ... foo.py
AttachSocketImport, # python foo.py (foo.py must import debug_me)
AttachProcessId, # python foo.py && ptvsd ... --pid
CustomClient, # python foo.py (foo.py has to manually call ptvsd.attach)
CustomServer, # python foo.py (foo.py has to manually call ptvsd.enable_attach)
]

167
tests/debug/targets.py Normal file
View file

@ -0,0 +1,167 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import absolute_import, division, print_function, unicode_literals
import py
from ptvsd.common import fmt
from tests.patterns import some
class Target(object):
"""Describes Python code that gets run by a Runner.
"""
def __init__(self, filename, args=()):
if filename is not None and not isinstance(filename, py.path.local):
filename = py.path.local(filename)
self.filename = filename
self.args = args
if self.filename is None:
self.code = None
else:
with open(self.filename.strpath, "rb") as f:
self.code = f.read().decode("utf-8")
def configure(self, session):
"""Configures the session to execute this target.
This should only modify session.config, but gets access to the entire session
to retrieve information about it.
"""
raise NotImplementedError
def cli(self, env):
"""Provides the command line arguments, suitable for passing to python or
python -m ptvsd, to execute this target.
Returns command line arguments as a list, e.g. ["-m", "module"].
If any environment variables are needed to properly interpret the command
line - e.g. PYTHONPATH - the implementation should send them in env.
"""
raise NotImplementedError
@property
def co_filename(self):
"""co_filename of code objects created at runtime from the source that this
Target describes, assuming no path mapping.
"""
assert (
self.filename is not None
), "co_filename requires Target created from filename"
return self.filename.strpath
@property
def source(self):
"""DAP "source" JSON for this Target."""
return some.dap.source(py.path.local(self.co_filename))
@property
def lines(self):
"""Same as self.filename.lines, if it is valid - e.g. for @pyfile objects.
"""
assert (
self.filename is not None
), "lines() requires Target created from filename"
return self.filename.lines
class Program(Target):
"""A Python script, executed directly: python foo.py
"""
pytest_id = "program"
def __repr__(self):
return fmt("program {0!j}", self.filename)
def configure(self, session):
session.config["program"] = (
[self.filename] + self.args if len(self.args) else self.filename
)
def cli(self, env):
return [self.filename.strpath] + list(self.args)
class Module(Target):
"""A Python module, executed by name: python -m foo.bar
If created from a filename, the module name is the name of the file, and the
Target will automatically add a PYTHONPATH entry.
"""
pytest_id = "module"
def __init__(self, filename=None, name=None, args=()):
assert (filename is None) ^ (name is None)
super(Module, self).__init__(filename, args)
self.name = name if name is not None else self.filename.purebasename
def __repr__(self):
return fmt("module {0}", self.name)
def configure(self, session):
session.config["module"] = (
[self.name] + self.args if len(self.args) else self.name
)
def cli(self, env):
if self.filename is not None:
env.prepend_to("PYTHONPATH", self.filename.dirname)
return ["-m", self.name] + list(self.args)
class Code(Target):
"""A snippet of Python code: python -c "print('foo')"
If created from a filename, the code is the contents of the file.
"""
pytest_id = "code"
def __init__(self, filename=None, code=None, args=()):
assert (filename is None) ^ (code is None)
super(Code, self).__init__(filename, args)
if code is not None:
self.code = code
def __repr__(self):
lines = self.code.split("\n")
return fmt("code: {0!j}", lines)
def configure(self, session):
session.config["code"] = (
[self.code] + self.args if len(self.args) else self.code
)
def cli(self, env):
return ["-c", self.code] + list(self.args)
@property
def co_filename(self):
return "<string>"
@property
def source(self):
"""DAP "source" JSON for this Target."""
return some.dap.source("<string>")
all_named = [Program, Module]
"""All targets that produce uniquely named code objects at runtime, and thus can
have breakpoints set in them.
"""
all_unnamed = [Code]
"""All targets that produce unnamed code objects at runtime, and thus cannot have
breakpoints set in them.
"""
all = all_named + all_unnamed