Fix #1713: Adapter: multiple concurrent sessions

This commit is contained in:
Pavel Minaev 2019-10-20 16:24:22 -07:00 committed by Pavel Minaev
parent 608803cb99
commit 095e5bcd5c
17 changed files with 833 additions and 421 deletions

13
.vscode/launch.json vendored
View file

@ -17,10 +17,10 @@
// For these, ptvsd.adapter must be started first via the above configuration.
{
//"debugServer": 8765,
"name": "Launch Python file [debugServer]",
"type": "python",
"request": "launch",
"debugServer": 8765,
//"console": "internalConsole",
"console": "integratedTerminal",
//"console": "externalTerminal",
@ -30,12 +30,21 @@
//"ptvsdArgs": ["--log-stderr"],
},
{
//"debugServer": 8765,
"name": "Attach [debugServer]",
"type": "python",
"request": "attach",
"debugServer": 8765,
"host": "localhost",
"port": 5678,
},
{
//"debugServer": 8765,
"name": "Attach Child Process [debugServer]",
"type": "python",
"request": "attach",
"host": "localhost",
"port": 5678,
"subProcessId": 00000,
},
]
}

View file

@ -19,7 +19,7 @@ __file__ = os.path.abspath(__file__)
def main(args):
from ptvsd.common import log, options as common_options
from ptvsd.adapter import session, options as adapter_options
from ptvsd.adapter import ide, server, session, options as adapter_options
if args.log_stderr:
log.stderr.levels |= set(log.LEVELS)
@ -30,30 +30,30 @@ def main(args):
log.to_file(prefix="ptvsd.adapter")
log.describe_environment("ptvsd.adapter startup environment:")
session = session.Session()
if args.for_enable_attach and args.port is None:
log.error("--for-enable-attach requires --port")
sys.exit(64)
server_host, server_port = server.listen()
ide_host, ide_port = ide.listen(port=args.port)
if args.for_enable_attach:
endpoints = {
"ide": {"host": ide_host, "port": ide_port},
"server": {"host": server_host, "port": server_port}
}
log.info("Sending endpoints to stdout: {0!r}", endpoints)
print(json.dumps(endpoints))
sys.stdout.flush()
if args.port is None:
session.connect_to_ide()
else:
if args.for_enable_attach:
# Users may want the adapter to choose the port for them, by setting port==0.
# For example, the Python Data Science extension uses this mode in enable_attach.
# Let enable_attach know the port that users should use to connect to the adapter.
with session.accept_connection_from_ide((args.host, args.port)) as (adapter_host, adapter_port):
# This mode is used only for enable_attach. Here, we always connect to
# adapter from the debug server as client. Adapter needs to start a listener
# and provide that port to debug server.
with session.accept_connection_from_server() as (server_host, server_port):
connection_details = {
"adapter": {"host": adapter_host, "port": adapter_port},
"server": {"host": server_host, "port": server_port}
}
log.info("Writing to stdout for enable_attach: {0!r}", connection_details)
print(json.dumps(connection_details))
sys.stdout.flush()
else:
with session.accept_connection_from_ide((args.host, args.port)) as (_, adapter_port):
pass
session.wait_for_completion()
ide.IDE("stdio")
server.wait_until_disconnected()
log.info("All debug servers disconnected; waiting for remaining sessions...")
session.wait_until_ended()
log.info("All debug sessions have ended; exiting.")
def _parse_argv(argv):

View file

@ -34,19 +34,34 @@ class Component(util.Observable):
to wait_for() a change caused by another component.
"""
def __init__(self, session, stream):
def __init__(self, session, stream=None, channel=None):
assert (stream is None) ^ (channel is None)
try:
lock_held = session.lock.acquire(blocking=False)
assert lock_held, "__init__ of a Component subclass must lock its Session"
finally:
session.lock.release()
super(Component, self).__init__()
self.session = session
stream.name = str(self)
self.channel = messaging.JsonMessageChannel(stream, self)
if channel is None:
stream.name = str(self)
channel = messaging.JsonMessageChannel(stream, self)
channel.start()
else:
channel.name = channel.stream.name = str(self)
channel.handlers = self
self.channel = channel
self.is_connected = True
self.observers += [lambda *_: session.notify_changed()]
self.channel.start()
# Do this last to avoid triggering useless notifications for assignments above.
self.observers += [lambda *_: self.session.notify_changed()]
def __str__(self):
return fmt("{0}-{1}", type(self).__name__, self.session.id)
return fmt("{0}[{1}]", type(self).__name__, self.session.id)
@property
def ide(self):

View file

@ -4,15 +4,17 @@
from __future__ import absolute_import, print_function, unicode_literals
import os
import platform
import sys
import ptvsd
from ptvsd.common import json, log, messaging
from ptvsd.common import json, log, messaging, sockets
from ptvsd.common.compat import unicode
from ptvsd.adapter import components
from ptvsd.adapter import components, server, session
class IDE(components.Component):
class IDE(components.Component, sockets.ClientConnection):
"""Handles the IDE side of a debug session."""
message_handler = components.Component.message_handler
@ -33,23 +35,43 @@ class IDE(components.Component):
"pathFormat": json.enum("path"), # we don't support "uri"
}
def __init__(self, session, stream):
super(IDE, self).__init__(session, stream)
def __init__(self, sock):
if sock == "stdio":
log.info("Connecting to IDE over stdio...", self)
stream = messaging.JsonIOStream.from_stdio()
# Make sure that nothing else tries to interfere with the stdio streams
# that are going to be used for DAP communication from now on.
sys.stdout = sys.stderr
sys.stdin = open(os.devnull, "r")
else:
stream = messaging.JsonIOStream.from_socket(sock)
self.client_id = None
"""ID of the connecting client. This can be 'test' while running tests."""
with session.Session() as new_session:
super(IDE, self).__init__(new_session, stream)
self._initialize_request = None
"""The "initialize" request as received from the IDE, to propagate to the
server later."""
self.client_id = None
"""ID of the connecting client. This can be 'test' while running tests."""
self._deferred_events = []
"""Deferred events from the launcher and the server that must be propagated
only if and when the "launch" or "attach" response is sent.
"""
self.has_started = False
"""Whether the "launch" or "attach" request was received from the IDE, and
fully handled.
"""
assert not session.ide
session.ide = self
self.start_request = None
"""The "launch" or "attach" request as received from the IDE.
"""
self._initialize_request = None
"""The "initialize" request as received from the IDE, to propagate to the
server later."""
self._deferred_events = []
"""Deferred events from the launcher and the server that must be propagated
only if and when the "launch" or "attach" response is sent.
"""
new_session.ide = self
new_session.register()
self.channel.send_event(
"output",
@ -135,11 +157,14 @@ class IDE(components.Component):
assert request.is_request("launch", "attach")
if self._initialize_request is None:
raise request.isnt_valid("Session is not initialized yet")
if self.launcher:
if self.launcher or self.server:
raise request.isnt_valid("Session is already started")
self.session.no_debug = request("noDebug", json.default(False))
self.session.debug_options = set(
if self.session.no_debug:
server.dont_expect_connections()
self.session.debug_options = debug_options = set(
request("debugOptions", json.array(unicode))
)
@ -149,19 +174,31 @@ class IDE(components.Component):
self.server.initialize(self._initialize_request)
self._initialize_request = None
arguments = request.arguments
if self.launcher and "RedirectOutput" in debug_options:
# The launcher is doing output redirection, so we don't need the
# server to do it, as well.
arguments = dict(arguments)
arguments["debugOptions"] = list(debug_options - {"RedirectOutput"})
# pydevd doesn't send "initialized", and responds to the start request
# immediately, without waiting for "configurationDone". If it changes
# to conform to the DAP spec, we'll need to defer waiting for response.
self.server.channel.delegate(request)
try:
self.server.channel.request(request.command, arguments)
except messaging.MessageHandlingError as exc:
exc.propagate(request)
if self.session.no_debug:
self.start_request = request
self.has_started = True
request.respond({})
self._propagate_deferred_events()
return
if {"WindowsClient", "Windows"} & self.session.debug_options:
if {"WindowsClient", "Windows"} & debug_options:
client_os_type = "WINDOWS"
elif {"UnixClient", "UNIX"} & self.session.debug_options:
elif {"UnixClient", "UNIX"} & debug_options:
client_os_type = "UNIX"
else:
client_os_type = "WINDOWS" if platform.system() == "Windows" else "UNIX"
@ -178,13 +215,15 @@ class IDE(components.Component):
# Let the IDE know that it can begin configuring the adapter.
self.channel.send_event("initialized")
self._start_request = request
self.start_request = request
return messaging.NO_RESPONSE # will respond on "configurationDone"
return handle
@_start_message_handler
def launch_request(self, request):
from ptvsd.adapter import launcher
sudo = request("sudo", json.default("Sudo" in self.session.debug_options))
if sudo:
if platform.system() == "Windows":
@ -222,58 +261,101 @@ class IDE(components.Component):
)
console_title = request("consoleTitle", json.default("Python Debug Console"))
self.session.spawn_debuggee(request, sudo, args, console, console_title)
if "RedirectOutput" in self.session.debug_options:
# The launcher is doing output redirection, so we don't need the server.
request.arguments["debugOptions"].remove("RedirectOutput")
launcher.spawn_debuggee(
self.session, request, sudo, args, console, console_title
)
@_start_message_handler
def attach_request(self, request):
if self.session.no_debug:
raise request.isnt_valid('"noDebug" is not supported for "attach"')
# There are four distinct possibilities here.
#
# If "processId" is specified, this is attach-by-PID. We need to inject the
# debug server into the designated process, and then wait until it connects
# back to us. Since the injected server can crash, there must be a timeout.
#
# If "subProcessId" is specified, this is attach to a known subprocess, likely
# in response to a "ptvsd_attach" event. If so, the debug server should be
# connected already, and thus the wait timeout is zero.
#
# If neither are specified, but "listen" is true, this is attach-by-socket
# with the server expected to connect to the adapter via ptvsd.attach(). There
# is no PID known in advance, so just wait until the first server connection
# indefinitely, with no timeout.
#
# If neither are specified, but "listen" is false, this is attach-by-socket
# in which the server has spawned the adapter via ptvsd.enable_attach(). There
# is no PID known to the IDE in advance, but the server connection should be
# there already, so the wait timeout is zero.
#
# In the last two cases, if there's more than one server connection already,
# this is a multiprocess re-attach. The IDE doesn't know the PID, so we just
# connect it to the oldest server connection that we have - in most cases, it
# will be the one for the root debuggee process, but if it has exited already,
# it will be some subprocess.
pid = request("processId", int, optional=True)
if pid == ():
# When the adapter is spawned by the debug server, it is connected to the
# latter from the get go, and "host" and "port" in the "attach" request
# are actually the host and port on which the adapter itself was listening,
# so we can ignore those.
if self.server:
return
host = request("host", "127.0.0.1")
port = request("port", int)
if request("listen", False):
with self.accept_connection_from_server((host, port)):
pass
else:
self.session.connect_to_server((host, port))
else:
if self.server:
sub_pid = request("subProcessId", int, optional=True)
if pid != ():
if sub_pid != ():
raise request.isnt_valid(
'"attach" with "processId" cannot be serviced by adapter '
"that is already associated with a debug server"
'"processId" and "subProcessId" are mutually exclusive'
)
ptvsd_args = request("ptvsdArgs", json.array(unicode))
self.session.inject_server(pid, ptvsd_args)
server.inject(pid, ptvsd_args)
timeout = 10
else:
if sub_pid == ():
pid = any
timeout = None if request("waitForAttach", False) else 0
else:
pid = sub_pid
timeout = 0
conn = server.wait_for_connection(pid, timeout)
if conn is None:
raise request.cant_handle(
(
"Timed out waiting for injected debug server to connect"
if timeout
else "There is no debug server connected to this adapter."
if pid is any
else 'No known subprocess with "subProcessId":{0}'
),
pid,
)
try:
conn.attach_to_session(self.session)
except ValueError:
request.cant_handle("Debuggee with PID={0} is already being debugged.", pid)
@message_handler
def configurationDone_request(self, request):
if self._start_request is None:
if self.start_request is None or self.has_started:
request.cant_handle(
'"configurationDone" is only allowed during handling of a "launch" '
'or an "attach" request'
)
try:
self.has_started = True
request.respond(self.server.channel.delegate(request))
except messaging.MessageHandlingError as exc:
self.start_request.cant_handle(str(exc))
finally:
self._start_request.respond({})
self._start_request = None
self.start_request.respond({})
self._propagate_deferred_events()
# Notify the IDE of any child processes of the debuggee that aren't already
# being debugged.
for conn in server.connections():
if conn.server is None and conn.ppid == self.session.pid:
# FIXME: race condition with server.Connection()
self.notify_of_subprocess(conn)
@message_handler
def pause_request(self, request):
request.arguments["threadId"] = "*"
@ -312,8 +394,37 @@ class IDE(components.Component):
@message_handler
def disconnect_request(self, request):
self.session.finalize(
'IDE requested "disconnect"',
request("terminateDebuggee", json.default(bool(self.launcher))),
)
terminate_debuggee = request("terminateDebuggee", bool, optional=True)
if terminate_debuggee == ():
terminate_debuggee = None
self.session.finalize('IDE requested "disconnect"', terminate_debuggee)
return {}
def notify_of_subprocess(self, conn):
with self.session:
if self.start_request is None:
return
if "processId" in self.start_request.arguments:
log.warning(
"Not reporting subprocess for {0}, because the parent process "
'was attached to using "processId" rather than "port".',
self.session,
)
return
log.info("Notifying {0} about {1}.", self, conn)
body = dict(self.start_request.arguments)
body["request"] = "attach"
if "host" not in body:
body["host"] = "127.0.0.1"
if "port" not in body:
_, body["port"] = self.listener.getsockname()
if "processId" in body:
del body["processId"]
body["subProcessId"] = conn.pid
self.channel.send_event("ptvsd_attach", body)
listen = IDE.listen

View file

@ -4,7 +4,13 @@
from __future__ import absolute_import, print_function, unicode_literals
from ptvsd.adapter import components
import os
import subprocess
import sys
import ptvsd.launcher
from ptvsd.common import compat, log, messaging, options as common_options
from ptvsd.adapter import components, options as adapter_options, server
class Launcher(components.Component):
@ -13,16 +19,17 @@ class Launcher(components.Component):
message_handler = components.Component.message_handler
def __init__(self, session, stream):
super(Launcher, self).__init__(session, stream)
with session:
assert not session.launcher
super(Launcher, self).__init__(session, stream)
self.pid = None
"""Process ID of the debuggee process, as reported by the launcher."""
self.pid = None
"""Process ID of the debuggee process, as reported by the launcher."""
self.exit_code = None
"""Exit code of the debuggee process."""
self.exit_code = None
"""Exit code of the debuggee process."""
assert not session.launcher
session.launcher = self
session.launcher = self
@message_handler
def process_event(self, event):
@ -47,3 +54,77 @@ class Launcher(components.Component):
def terminated_event(self, event):
self.ide.channel.send_event("exited", {"exitCode": self.exit_code})
self.channel.close()
def spawn_debuggee(session, start_request, sudo, args, console, console_title):
cmdline = ["sudo"] if sudo else []
cmdline += [sys.executable, os.path.dirname(ptvsd.launcher.__file__)]
cmdline += args
env = {str("PTVSD_SESSION_ID"): str(session.id)}
def spawn_launcher():
with session.accept_connection_from_launcher() as (_, launcher_port):
env[str("PTVSD_LAUNCHER_PORT")] = str(launcher_port)
if common_options.log_dir is not None:
env[str("PTVSD_LOG_DIR")] = compat.filename_str(common_options.log_dir)
if adapter_options.log_stderr:
env[str("PTVSD_LOG_STDERR")] = str("debug info warning error")
if console == "internalConsole":
log.info("{0} spawning launcher: {1!r}", session, cmdline)
# If we are talking to the IDE over stdio, sys.stdin and sys.stdout are
# redirected to avoid mangling the DAP message stream. Make sure the
# launcher also respects that.
subprocess.Popen(
cmdline,
env=dict(list(os.environ.items()) + list(env.items())),
stdin=sys.stdin,
stdout=sys.stdout,
stderr=sys.stderr,
)
else:
log.info('{0} spawning launcher via "runInTerminal" request.', session)
session.ide.capabilities.require("supportsRunInTerminalRequest")
kinds = {
"integratedTerminal": "integrated",
"externalTerminal": "external",
}
session.ide.channel.request(
"runInTerminal",
{
"kind": kinds[console],
"title": console_title,
"args": cmdline,
"env": env,
},
)
try:
session.launcher.channel.request(start_request.command, arguments)
except messaging.MessageHandlingError as exc:
exc.propagate(start_request)
if session.no_debug:
arguments = start_request.arguments
spawn_launcher()
else:
_, port = server.Connection.listener.getsockname()
arguments = dict(start_request.arguments)
arguments["port"] = port
spawn_launcher()
if not session.wait_for(lambda: session.pid is not None, timeout=5):
raise start_request.cant_handle(
'{0} timed out waiting for "process" event from {1}',
session,
session.launcher,
)
conn = server.wait_for_connection(session.pid, timeout=10)
if conn is None:
raise start_request.cant_handle(
"{0} timed out waiting for debuggee to spawn", session
)
conn.attach_to_session(session)

View file

@ -2,11 +2,128 @@
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import absolute_import, print_function, unicode_literals
from __future__ import absolute_import, division, print_function, unicode_literals
import os
import subprocess
import sys
import threading
import time
import ptvsd
from ptvsd.common import compat, fmt, json, log, messaging, sockets
from ptvsd.adapter import components
_lock = threading.RLock()
_connections = []
"""All servers that are connected to this adapter, in order in which they connected.
"""
_connections_changed = threading.Event()
class Connection(sockets.ClientConnection):
"""A debug server that is connected to the adapter.
Servers that are not participating in a debug session are managed directly by the
corresponding Connection instance.
Servers that are participating in a debug session are managed by that sessions's
Server component instance, but Connection object remains, and takes over again
once the session ends.
"""
def __init__(self, sock):
from ptvsd.adapter import session
self.server = None
"""The Server component, if this debug server belongs to Session.
"""
self.pid = None
stream = messaging.JsonIOStream.from_socket(sock, str(self))
self.channel = messaging.JsonMessageChannel(stream, self)
self.channel.start()
try:
info = self.channel.request("pydevdSystemInfo")
process_info = info("process", json.object())
self.pid = process_info("pid", int)
self.ppid = process_info("ppid", int, optional=True)
if self.ppid == ():
self.ppid = None
self.channel.name = stream.name = str(self)
with _lock:
if any(conn.pid == self.pid for conn in _connections):
raise KeyError(
fmt("{0} is already connected to this adapter", self)
)
_connections.append(self)
_connections_changed.set()
except Exception:
log.exception("Failed to accept incoming server connection:")
# If we couldn't retrieve all the necessary info from the debug server,
# or there's a PID clash, we don't want to track this debuggee anymore,
# but we want to continue accepting connections.
self.channel.close()
return
parent_session = session.get(self.ppid)
if parent_session is None:
log.info("No active debug session for parent process of {0}.", self)
else:
try:
parent_session.ide.notify_of_subprocess(self)
except Exception:
# This might fail if the IDE concurrently disconnects from the parent
# session. We still want to keep the connection around, in case the
# IDE reconnects later. If the parent session was "launch", it'll take
# care of closing the remaining server connections.
log.exception("Failed to notify parent session about {0}:", self)
def __str__(self):
return "Server" + fmt("[?]" if self.pid is None else "[pid={0}]", self.pid)
def request(self, request):
raise request.isnt_valid(
"Requests from the debug server to the IDE are not allowed."
)
def event(self, event):
pass
def terminated_event(self, event):
self.channel.close()
def disconnect(self):
with _lock:
# If the disconnect happened while Server was being instantiated, we need
# to tell it, so that it can clean up properly via Session.finalize(). It
# will also take care of deregistering the connection in that case.
if self.server is not None:
self.server.disconnect()
elif self in _connections:
_connections.remove(self)
_connections_changed.set()
def attach_to_session(self, session):
"""Attaches this server to the specified Session as a Server component.
Raises ValueError if the server already belongs to some session.
"""
with _lock:
if self.server is not None:
raise ValueError
log.info("Attaching {0} to {1}", self, session)
self.server = Server(session, self)
class Server(components.Component):
"""Handles the debug server side of a debug session."""
@ -45,14 +162,38 @@ class Server(components.Component):
"supportedChecksumAlgorithms": [],
}
def __init__(self, session, stream):
super(Server, self).__init__(session, stream)
def __init__(self, session, connection):
assert connection.server is None
with session:
assert not session.server
super(Server, self).__init__(session, channel=connection.channel)
self.pid = None
self.connection = connection
if self.launcher:
assert self.session.pid is not None
else:
assert self.session.pid is None
if self.session.pid is not None and self.session.pid != self.pid:
log.warning(
"Launcher reported PID={0}, but server reported PID={1}",
self.session.pid,
self.pid,
)
else:
self.session.pid = self.pid
session.server = self
@property
def pid(self):
"""Process ID of the debuggee process, as reported by the server."""
return self.connection.pid
assert not session.server
session.server = self
@property
def ppid(self):
"""Parent process ID of the debuggee process, as reported by the server."""
return self.connection.ppid
def initialize(self, request):
assert request.is_request("initialize")
@ -60,10 +201,6 @@ class Server(components.Component):
request.wait_for_response()
self.capabilities = self.Capabilities(self, request.response)
def set_debugger_property(self, arguments):
assert isinstance(arguments, dict)
self.channel.request("setDebuggerProperty", arguments=arguments)
# Generic request handler, used if there's no specific handler below.
@message_handler
def request(self, request):
@ -88,22 +225,6 @@ class Server(components.Component):
@message_handler
def process_event(self, event):
self.pid = event("systemProcessId", int)
if self.launcher:
assert self.session.pid is not None
else:
assert self.session.pid is None
if self.session.pid is not None and self.session.pid != self.pid:
event.cant_handle(
'"process" event mismatch: launcher reported "systemProcessId":{0}, '
'but server reported "systemProcessId":{1}',
self.session.pid,
self.pid,
)
else:
self.session.pid = self.pid
# If there is a launcher, it's handling the process event.
if not self.launcher:
self.ide.propagate_after_start(event)
@ -141,4 +262,117 @@ class Server(components.Component):
@message_handler
def terminated_event(self, event):
# Do not propagate this, since we'll report our own.
pass
self.channel.close()
def detach_from_session(self):
with _lock:
self.is_connected = False
self.channel.handlers = self.connection
self.channel.name = self.channel.stream.name = str(self.connection)
self.connection.server = None
def disconnect(self):
with _lock:
_connections.remove(self.connection)
_connections_changed.set()
super(Server, self).disconnect()
listen = Connection.listen
def connections():
with _lock:
return list(_connections)
def wait_for_connection(pid=any, timeout=None):
"""Waits until there is a server with the specified PID connected to this adapter,
and returns the corresponding Connection.
If there is more than one server connection already available, returns the oldest
one.
"""
def wait_for_timeout():
time.sleep(timeout)
wait_for_timeout.timed_out = True
with _lock:
_connections_changed.set()
wait_for_timeout.timed_out = timeout == 0
if timeout:
thread = threading.Thread(
target=wait_for_timeout, name="server.wait_for_connection() timeout"
)
thread.daemon = True
thread.start()
if timeout != 0:
log.info(
"Waiting for connection from debug server..."
if pid is any
else "Waiting for connection from debug server with PID={0}...",
pid,
)
while True:
with _lock:
_connections_changed.clear()
conns = (conn for conn in _connections if pid is any or conn.pid == pid)
conn = next(conns, None)
if conn is not None or wait_for_timeout.timed_out:
return conn
_connections_changed.wait()
def wait_until_disconnected():
"""Blocks until all debug servers disconnect from the adapter.
If there are no server connections, waits until at least one is established first,
before waiting for it to disconnect.
"""
while True:
_connections_changed.wait()
with _lock:
_connections_changed.clear()
if not len(_connections):
return
def dont_expect_connections():
"""Unblocks any pending wait_until_disconnected() call that is waiting on the
first server to connect.
"""
with _lock:
_connections_changed.set()
def inject(pid, ptvsd_args):
host, port = Connection.listener.getsockname()
cmdline = [
sys.executable,
compat.filename(os.path.dirname(ptvsd.__file__)),
"--client",
"--host",
host,
"--port",
str(port),
]
cmdline += ptvsd_args
cmdline += ["--pid", str(pid)]
log.info("Spawning attach-to-PID debugger injector: {0!r}", cmdline)
try:
subprocess.Popen(
cmdline,
bufsize=0,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except Exception as exc:
log.exception("Failed to inject debug server into process with PID={0}", pid)
raise messaging.MessageHandlingError(
"Failed to inject debug server into process with PID={0}: {1}", pid, exc
)

View file

@ -7,23 +7,16 @@ from __future__ import absolute_import, print_function, unicode_literals
import contextlib
import itertools
import os
import subprocess
import sys
import threading
import time
import ptvsd
import ptvsd.launcher
from ptvsd.common import (
compat,
fmt,
log,
messaging,
options as common_options,
sockets,
util,
)
from ptvsd.adapter import components, ide, launcher, options as adapter_options, server
from ptvsd.common import fmt, log, messaging, sockets, util
from ptvsd.adapter import components, launcher, server
_lock = threading.RLock()
_sessions = set()
_sessions_changed = threading.Event()
class Session(util.Observable):
@ -36,6 +29,8 @@ class Session(util.Observable):
_counter = itertools.count(1)
def __init__(self):
from ptvsd.adapter import ide
super(Session, self).__init__()
self.lock = threading.RLock()
@ -81,17 +76,25 @@ class Session(util.Observable):
"""Unlock the session."""
self.lock.release()
def wait_for_completion(self):
self.ide.channel.wait()
if self.launcher:
self.launcher.channel.wait()
if self.server:
self.server.channel.wait()
def register(self):
with _lock:
_sessions.add(self)
_sessions_changed.set()
def notify_changed(self):
with self:
self._changed_condition.notify_all()
# A session is considered ended once all components disconnect, and there
# are no further incoming messages from anything to handle.
components = self.ide, self.launcher, self.server
if all(not com or not com.is_connected for com in components):
with _lock:
if self in _sessions:
log.info("{0} has ended.", self)
_sessions.remove(self)
_sessions_changed.set()
def wait_for(self, predicate, timeout=None):
"""Waits until predicate() becomes true.
@ -130,35 +133,6 @@ class Session(util.Observable):
self._changed_condition.wait()
return True
def connect_to_ide(self):
"""Sets up a DAP message channel to the IDE over stdio.
"""
log.info("{0} connecting to IDE over stdio...", self)
stream = messaging.JsonIOStream.from_stdio()
# Make sure that nothing else tries to interfere with the stdio streams
# that are going to be used for DAP communication from now on.
sys.stdout = sys.stderr
sys.stdin = open(os.devnull, "r")
ide.IDE(self, stream)
def connect_to_server(self, address):
"""Sets up a DAP message channel to the server.
The channel is established by connecting to the TCP socket listening on the
specified address
"""
host, port = address
log.info("{0} connecting to Server on {1}:{2}...", self, host, port)
sock = sockets.create_client()
sock.connect(address)
stream = messaging.JsonIOStream.from_socket(sock)
server.Server(self, stream)
@contextlib.contextmanager
def _accept_connection_from(self, what, address, timeout=None):
"""Sets up a listening socket, accepts an incoming connection on it, sets
@ -199,110 +173,14 @@ class Session(util.Observable):
stream = messaging.JsonIOStream.from_socket(sock, what)
what(self, stream)
def accept_connection_from_ide(self, address):
return self._accept_connection_from(ide.IDE, address)
def accept_connection_from_server(self, address=("127.0.0.1", 0)):
return self._accept_connection_from(server.Server, address, timeout=10)
def _accept_connection_from_launcher(self, address=("127.0.0.1", 0)):
def accept_connection_from_launcher(self, address=("127.0.0.1", 0)):
return self._accept_connection_from(launcher.Launcher, address, timeout=10)
def spawn_debuggee(self, request, sudo, args, console, console_title):
cmdline = ["sudo"] if sudo else []
cmdline += [sys.executable, os.path.dirname(ptvsd.launcher.__file__)]
cmdline += args
env = {str("PTVSD_SESSION_ID"): str(self.id)}
def spawn_launcher():
with self._accept_connection_from_launcher() as (_, launcher_port):
env[str("PTVSD_LAUNCHER_PORT")] = str(launcher_port)
if common_options.log_dir is not None:
env[str("PTVSD_LOG_DIR")] = compat.filename_str(
common_options.log_dir
)
if adapter_options.log_stderr:
env[str("PTVSD_LOG_STDERR")] = str("debug info warning error")
if console == "internalConsole":
# If we are talking to the IDE over stdio, sys.stdin and sys.stdout are
# redirected to avoid mangling the DAP message stream. Make sure the
# launcher also respects that.
subprocess.Popen(
cmdline,
env=dict(list(os.environ.items()) + list(env.items())),
stdin=sys.stdin,
stdout=sys.stdout,
stderr=sys.stderr,
)
else:
self.ide.capabilities.require("supportsRunInTerminalRequest")
kinds = {
"integratedTerminal": "integrated",
"externalTerminal": "external",
}
self.ide.channel.request(
"runInTerminal",
{
"kind": kinds[console],
"title": console_title,
"args": cmdline,
"env": env,
},
)
self.launcher.channel.delegate(request)
if self.no_debug:
spawn_launcher()
else:
with self.accept_connection_from_server() as (_, server_port):
request.arguments["port"] = server_port
spawn_launcher()
# Don't accept connection from server until launcher sends us the
# "process" event, to avoid a race condition between the launcher
# and the server.
if not self.wait_for(lambda: self.pid is not None, timeout=5):
raise request.cant_handle(
'Session timed out waiting for "process" event from {0}',
self.launcher,
)
def inject_server(self, pid, ptvsd_args):
with self.accept_connection_from_server() as (host, port):
cmdline = [
sys.executable,
compat.filename(os.path.dirname(ptvsd.__file__)),
"--client",
"--host",
host,
"--port",
str(port),
]
cmdline += ptvsd_args
cmdline += ["--pid", str(pid)]
log.info(
"{0} spawning attach-to-PID debugger injector: {1!r}", self, cmdline
)
try:
subprocess.Popen(
cmdline,
bufsize=0,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except Exception as exc:
log.exception("{0} failed to inject debugger", self)
raise messaging.MessageHandlingError(
fmt("Failed to inject debugger: {0}", exc)
)
def finalize(self, why, terminate_debuggee=False):
def finalize(self, why, terminate_debuggee=None):
"""Finalizes the debug session.
If the server is present, sends "disconnect" request with "terminateDebuggee"
set as specified) request to it; waits for it to disconnect, allowing any
set as specified request to it; waits for it to disconnect, allowing any
remaining messages from it to be handled; and closes the server channel.
If the launcher is present, sends "terminate" request to it, regardless of the
@ -310,6 +188,9 @@ class Session(util.Observable):
from it to be handled; and closes the launcher channel.
If the IDE is present, sends "terminated" event to it.
If terminate_debuggee=None, it is treated as True if the session has a Launcher
component, and False otherwise.
"""
if self.is_finalizing:
@ -317,6 +198,9 @@ class Session(util.Observable):
self.is_finalizing = True
log.info("{0}; finalizing {1}.", why, self)
if terminate_debuggee is None:
terminate_debuggee = bool(self.launcher)
try:
self._finalize(why, terminate_debuggee)
except Exception:
@ -328,26 +212,15 @@ class Session(util.Observable):
log.info("{0} finalized.", self)
def _finalize(self, why, terminate_debuggee):
if self.server and self.server.is_connected:
try:
self.server.channel.request(
"disconnect", {"terminateDebuggee": terminate_debuggee}
)
except Exception:
pass
try:
self.server.channel.close()
except Exception:
log.exception()
# Wait until the server message queue fully drains - there won't be any
# more events after close(), but there may still be pending responses.
log.info("{0} waiting for {1} to disconnect...", self, self.server)
if not self.wait_for(lambda: not self.server.is_connected, timeout=5):
log.warning(
"{0} timed out waiting for {1} to disconnect.", self, self.server
)
if self.server:
if self.server.is_connected:
try:
self.server.channel.request(
"disconnect", {"terminateDebuggee": terminate_debuggee}
)
except Exception:
pass
self.server.detach_from_session()
if self.launcher and self.launcher.is_connected:
# If there was a server, we just disconnected from it above, which should
@ -385,3 +258,21 @@ class Session(util.Observable):
self.ide.channel.send_event("terminated")
except Exception:
pass
def get(pid):
with _lock:
return next((session for session in _sessions if session.pid == pid), None)
def wait_until_ended():
"""Blocks until all sessions have ended.
A session ends when all components that it manages disconnect from it.
"""
while True:
_sessions_changed.wait()
with _lock:
_sessions_changed.clear()
if not len(_sessions):
return

View file

@ -1166,6 +1166,7 @@ class JsonMessageChannel(object):
self.stream = stream
self.handlers = handlers
self.name = name if name is not None else stream.name
self.started = False
self._lock = threading.RLock()
self._closed = False
self._seq_iter = itertools.count(1)
@ -1209,6 +1210,10 @@ class JsonMessageChannel(object):
Incoming messages, including responses to requests, will not be processed at
all until this is invoked.
"""
assert not self.started
self.started = True
self._parser_thread = threading.Thread(
target=self._parse_incoming_messages, name=fmt("{0} message parser", self)
)
@ -1510,7 +1515,7 @@ class JsonMessageChannel(object):
if closed and handler in (Event._handle, Request._handle):
continue
with log.prefixed("[handling {0}]\n", what.describe()):
with log.prefixed("/handling {0}/\n", what.describe()):
try:
handler()
except Exception:
@ -1522,16 +1527,20 @@ class JsonMessageChannel(object):
"""Returns the handler for a message of a given type.
"""
with self:
handlers = self.handlers
for handler_name in (name + "_" + type, type):
try:
return getattr(self.handlers, handler_name)
return getattr(handlers, handler_name)
except AttributeError:
continue
raise AttributeError(
fmt(
"Channel {0} has no handler for {1} {2!r}",
compat.srcnameof(self.handlers),
"handler object {0} for channel {1} has no handler for {2} {3!r}",
compat.srcnameof(handlers),
self,
type,
name,
)

View file

@ -6,12 +6,18 @@ from __future__ import absolute_import, print_function, unicode_literals
import platform
import socket
import threading
from ptvsd.common import log
def create_server(host, port, timeout=None):
"""Return a local server socket listening on the given port."""
if host is None:
host = "127.0.0.1"
if port is None:
port = 0
try:
server = _new_sock()
server.bind((host, port))
@ -50,3 +56,44 @@ def close_socket(sock):
except Exception:
pass
sock.close()
class ClientConnection(object):
listener = None
"""After listen() is invoked, this is the socket listening for connections.
"""
@classmethod
def listen(cls, host=None, port=0, timeout=None):
"""Accepts TCP connections on the specified host and port, and creates a new
instance of this class wrapping every accepted socket.
"""
assert cls.listener is None
cls.listener = create_server(host, port, timeout)
host, port = cls.listener.getsockname()
log.info(
"Waiting for incoming {0} connections on {1}:{2}...",
cls.__name__,
host,
port,
)
def accept_worker():
while True:
sock, (other_host, other_port) = cls.listener.accept()
log.info(
"Accepted incoming {0} connection from {1}:{2}.",
cls.__name__,
other_host,
other_port,
)
cls(sock)
thread = threading.Thread(target=accept_worker)
thread.daemon = True
thread.pydev_do_not_trace = True
thread.is_pydev_daemon_thread = True
thread.start()
return host, port

View file

@ -22,6 +22,8 @@ def evaluate(code, path=__file__, mode="eval"):
class Observable(object):
"""An object with change notifications."""
observers = () # used when attributes are set before __init__ is invoked
def __init__(self):
self.observers = []

View file

@ -117,6 +117,7 @@ def enable_attach(dont_trace_start_patterns, dont_trace_end_patterns):
host=host,
port=port,
suspend=False,
patch_multiprocessing=server_opts.multiprocess,
wait_for_ready_to_run=False,
block_until_connected=True,
dont_trace_start_patterns=dont_trace_start_patterns,
@ -127,7 +128,7 @@ def enable_attach(dont_trace_start_patterns, dont_trace_end_patterns):
# Ensure that we ignore the adapter process when terminating the debugger.
pydevd.add_dont_terminate_child_pid(process.pid)
server_opts.port = connection_details["adapter"]["port"]
server_opts.port = connection_details["ide"]["port"]
listener_file = os.getenv("PTVSD_LISTENER_FILE")
if listener_file is not None:

View file

@ -207,8 +207,6 @@ def attach_by_socket(
if wait:
args += ["--wait"]
args += ["--host", compat.filename_str(host), "--port", str(port)]
if not config["subProcess"]:
args += ["--no-subprocesses"]
if log_dir is not None:
args += ["--log-dir", log_dir]
debug_me = None

View file

@ -76,7 +76,7 @@ class Session(object):
_counter = itertools.count(1)
def __init__(self):
def __init__(self, debug_config=None):
assert Session.tmpdir is not None
watchdog.start()
@ -127,7 +127,9 @@ class Session(object):
"""
self.config = config.DebugConfig(
{
debug_config
if debug_config is not None
else {
"justMyCode": True,
"name": "Test",
"redirectOutput": True,
@ -404,6 +406,19 @@ class Session(object):
stream = messaging.JsonIOStream.from_socket(sock, name=self.adapter_id)
self._start_channel(stream)
def start(self):
config = self.config
request = config.get("request", None)
if request == "attach":
host = config["host"]
port = config["port"]
self.connect_to_adapter((host, port))
return self.request_attach()
else:
raise ValueError(
fmt('Unsupported "request":{0!j} in session.config', request)
)
def request(self, *args, **kwargs):
freeze = kwargs.pop("freeze", True)
raise_if_failed = kwargs.pop("raise_if_failed", True)
@ -434,9 +449,9 @@ class Session(object):
self.observe(occ)
self.exit_code = event("exitCode", int)
assert self.exit_code == self.expected_exit_code
elif event.event == "ptvsd_subprocess":
elif event.event == "ptvsd_attach":
self.observe(occ)
pid = event("processId", int)
pid = event("subProcessId", int)
watchdog.register_spawn(
pid, fmt("{0}-subprocess-{1}", self.debuggee_id, pid)
)
@ -726,7 +741,7 @@ class Session(object):
return StopInfo(stopped, frames, tid, fid)
def wait_for_next_subprocess(self):
raise NotImplementedError
return Session(self.wait_for_next_event("ptvsd_attach"))
def wait_for_disconnect(self):
self.timeline.wait_until_realized(timeline.Mark("disconnect"), freeze=True)

View file

@ -9,7 +9,7 @@ import pytest
from ptvsd.common import compat
from tests import code, debug, log, net, test_data
from tests.debug import runners, targets
from tests.debug import targets
from tests.patterns import some
pytestmark = pytest.mark.timeout(60)
@ -29,12 +29,8 @@ class lines:
@pytest.fixture
@pytest.mark.parametrize("run", [runners.launch, runners.attach_by_socket["cli"]])
def start_flask(run):
def start(session, multiprocess=False):
if multiprocess:
pytest.skip("https://github.com/microsoft/ptvsd/issues/1706")
# No clean way to kill Flask server, expect non-zero exit code
session.expected_exit_code = some.int
@ -49,14 +45,30 @@ def start_flask(run):
locale = "en_US.utf8" if platform.system() == "Linux" else "en_US.UTF-8"
session.config.env.update({"LC_ALL": locale, "LANG": locale})
session.config.update({"jinja": True, "subProcess": bool(multiprocess)})
session.config.update({"jinja": True, "subProcess": multiprocess})
args = ["run"]
if not multiprocess:
args += ["--no-debugger", "--no-reload", "--with-threads"]
args = ["run", "--no-debugger", "--with-threads"]
if multiprocess:
args += ["--reload"]
else:
args += ["--no-reload"]
args += ["--port", str(flask_server.port)]
return run(session, targets.Module(name="flask", args=args), cwd=paths.flask1)
if multiprocess and run.request == "attach":
# For multiproc attach, we need to use a helper stub to import debug_me
# before running Flask; otherwise, we will get the connection only from
# the subprocess, not from the Flask server process.
target = targets.Code(
code=(
"import debug_me, runpy;"
"runpy.run_module('flask', run_name='__main__')"
),
args=args,
)
else:
target = targets.Module(name="flask", args=args)
return run(session, target, cwd=paths.flask1)
return start
@ -208,12 +220,8 @@ def test_flask_breakpoint_multiproc(start_flask):
with start_flask(parent_session, multiprocess=True):
parent_session.set_breakpoints(paths.app_py, [bp_line])
child_pid = parent_session.wait_for_next_subprocess()
with debug.Session() as child_session:
# TODO: this is wrong, but we don't have multiproc attach
# yet, so update this when that is done
# https://github.com/microsoft/ptvsd/issues/1776
with child_session.attach_by_pid(child_pid):
with parent_session.wait_for_next_subprocess() as child_session:
with child_session.start():
child_session.set_breakpoints(paths.app_py, [bp_line])
with flask_server:

View file

@ -15,131 +15,124 @@ from tests.patterns import some
from tests.timeline import Event, Request
pytestmark = pytest.mark.skip("https://github.com/microsoft/ptvsd/issues/1706")
# pytestmark = pytest.mark.skip("https://github.com/microsoft/ptvsd/issues/1706")
@pytest.mark.timeout(30)
@pytest.mark.skipif(
platform.system() != "Windows",
reason="Debugging multiprocessing module only works on Windows",
)
@pytest.mark.parametrize(
"start_method", [runners.launch, runners.attach_by_socket["cli"]]
"start_method",
[""]
if sys.version_info < (3,)
else ["spawn"]
if platform.system() == "Windows"
else ["spawn", "fork"],
)
def test_multiprocessing(pyfile, start_method, run_as):
def test_multiprocessing(pyfile, target, run, start_method):
@pyfile
def code_to_debug():
import multiprocessing
import platform
import sys
import debug_me # noqa
import multiprocessing
import os
import sys
def child_of_child(q):
print("entering child of child")
assert q.get() == 2
q.put(3)
print("leaving child of child")
def child(q):
print("entering child")
assert q.get() == 1
print("spawning child of child")
p = multiprocessing.Process(target=child_of_child, args=(q,))
p.start()
p.join()
assert q.get() == 3
q.put(4)
print("leaving child")
if __name__ == "__main__":
def parent(q, a):
from debug_me import backchannel
if sys.version_info >= (3, 4):
multiprocessing.set_start_method("spawn")
else:
assert platform.system() == "Windows"
print("spawning child")
q = multiprocessing.Queue()
p = multiprocessing.Process(target=child, args=(q,))
p = multiprocessing.Process(target=child, args=(q, a))
p.start()
print("child spawned")
backchannel.send(p.pid)
q.put(1)
q.put("child_pid?")
what, child_pid = a.get()
assert what == "child_pid"
backchannel.send(child_pid)
q.put("grandchild_pid?")
what, grandchild_pid = a.get()
assert what == "grandchild_pid"
backchannel.send(grandchild_pid)
assert backchannel.receive() == "continue"
q.put(2)
q.put("exit!")
p.join()
assert q.get() == 4
q.close()
backchannel.send("done")
with debug.Session(start_method) as parent_session:
parent_backchannel = parent_session.setup_backchannel()
parent_session.debug_options |= {"Multiprocess"}
parent_session.configure(run_as, code_to_debug)
parent_session.start_debugging()
def child(q, a):
print("entering child")
assert q.get() == "child_pid?"
a.put(("child_pid", os.getpid()))
root_start_request, = parent_session.all_occurrences_of(
Request("launch") | Request("attach")
)
root_process, = parent_session.all_occurrences_of(Event("process"))
root_pid = int(root_process.body["systemProcessId"])
print("spawning child of child")
p = multiprocessing.Process(target=grandchild, args=(q, a))
p.start()
p.join()
child_pid = parent_backchannel.receive()
print("leaving child")
child_subprocess = parent_session.wait_for_next(Event("ptvsd_subprocess"))
assert child_subprocess == Event(
"ptvsd_subprocess",
def grandchild(q, a):
print("entering grandchild")
assert q.get() == "grandchild_pid?"
a.put(("grandchild_pid", os.getpid()))
assert q.get() == "exit!"
print("leaving grandchild")
if __name__ == "__main__":
start_method = sys.argv[1]
if start_method != "":
multiprocessing.set_start_method(start_method)
q = multiprocessing.Queue()
a = multiprocessing.Queue()
try:
parent(q, a)
finally:
q.close()
a.close()
with debug.Session() as parent_session:
parent_backchannel = parent_session.open_backchannel()
with run(parent_session, target(code_to_debug, args=[start_method])):
pass
expected_child_config = dict(parent_session.config)
expected_child_config.update(
{
"rootProcessId": root_pid,
"parentProcessId": root_pid,
"processId": child_pid,
"request": "attach",
"subProcessId": some.int,
"host": some.str,
"port": some.int,
"rootStartRequest": {
"seq": some.int,
"type": "request",
"command": root_start_request.command,
"arguments": root_start_request.arguments,
},
},
}
)
child_config = parent_session.wait_for_next_event("ptvsd_attach")
assert child_config == expected_child_config
parent_session.proceed()
with parent_session.attach_to_subprocess(child_subprocess) as child_session:
child_session.start_debugging()
with debug.Session(child_config) as child_session:
with child_session.start():
pass
grandchild_subprocess = parent_session.wait_for_next(
Event("ptvsd_subprocess")
)
assert grandchild_subprocess == Event(
"ptvsd_subprocess",
expected_grandchild_config = dict(child_session.config)
expected_grandchild_config.update(
{
"rootProcessId": root_pid,
"parentProcessId": child_pid,
"processId": some.int,
"request": "attach",
"subProcessId": some.int,
"host": some.str,
"port": some.int,
"rootStartRequest": {
"seq": some.int,
"type": "request",
"command": root_start_request.command,
"arguments": root_start_request.arguments,
},
},
}
)
parent_session.proceed()
with parent_session.attach_to_subprocess(
grandchild_subprocess
) as grandchild_session:
grandchild_session.start_debugging()
grandchild_config = child_session.wait_for_next_event("ptvsd_attach")
assert grandchild_config == expected_grandchild_config
with debug.Session(grandchild_config) as grandchild_session:
with grandchild_session.start():
pass
parent_backchannel.send("continue")
assert parent_backchannel.receive() == "done"
@pytest.mark.timeout(30)
@pytest.mark.skipif(

View file

@ -1,3 +1,4 @@
import debug_me # noqa
from flask import Flask
from flask import render_template

View file

@ -12,11 +12,8 @@ from __future__ import absolute_import, print_function, unicode_literals
# this is done in main().
import collections
import os
import platform
import psutil
import sys
import tempfile
import time
@ -154,15 +151,15 @@ def main(tests_pid):
proc.pid,
)
if platform.system() == "Linux":
try:
# gcore will automatically add pid to the filename
core_file = os.path.join(tempfile.gettempdir(), "ptvsd_core")
gcore_cmd = fmt("gcore -o {0} {1}", core_file, proc.pid)
log.warning("WatchDog-{0}: {1}", tests_pid, gcore_cmd)
os.system(gcore_cmd)
except Exception:
log.exception()
# if platform.system() == "Linux":
# try:
# # gcore will automatically add pid to the filename
# core_file = os.path.join(tempfile.gettempdir(), "ptvsd_core")
# gcore_cmd = fmt("gcore -o {0} {1}", core_file, proc.pid)
# log.warning("WatchDog-{0}: {1}", tests_pid, gcore_cmd)
# os.system(gcore_cmd)
# except Exception:
# log.exception()
try:
proc.kill()