Fix #865: Debugging through poetry drops subprocess

Handle "exited" { "pydevdReason": "processReplaced" } appropriately.

Add test for os.exec() in-place process replacement.
This commit is contained in:
Pavel Minaev 2022-06-02 13:55:59 -07:00 committed by Pavel Minaev
parent 6d049b73ab
commit 0a9b01b008
9 changed files with 155 additions and 28 deletions

View file

@ -12,3 +12,9 @@ exclude = '''
| ^/src/debugpy/_version.py
)
'''
[tool.pyright]
pythonVersion = "3.7"
include = ["src"]
extraPaths = ["src/debugpy/_vendored/pydevd"]
ignore = ["src/debugpy/_vendored/pydevd", "src/debugpy/_version.py"]

View file

@ -2,6 +2,8 @@
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import annotations
import atexit
import os
import sys
@ -17,6 +19,10 @@ class Client(components.Component):
message_handler = components.Component.message_handler
known_subprocesses: set[servers.Connection]
"""Server connections to subprocesses that this client has been made aware of.
"""
class Capabilities(components.Capabilities):
PROPERTIES = {
"supportsVariableType": False,
@ -70,10 +76,7 @@ class Client(components.Component):
only if and when the "launch" or "attach" response is sent.
"""
self._known_subprocesses = set()
"""servers.Connection instances for subprocesses that this client has been
made aware of.
"""
self.known_subprocesses = set()
session.client = self
session.register()
@ -630,8 +633,9 @@ class Client(components.Component):
return {}
def notify_of_subprocess(self, conn):
log.info("{1} is a subprocess of {0}.", self, conn)
with self.session:
if self.start_request is None or conn in self._known_subprocesses:
if self.start_request is None or conn in self.known_subprocesses:
return
if "processId" in self.start_request.arguments:
log.warning(
@ -643,7 +647,8 @@ class Client(components.Component):
log.info("Notifying {0} about {1}.", self, conn)
body = dict(self.start_request.arguments)
self._known_subprocesses.add(conn)
self.known_subprocesses.add(conn)
self.session.notify_changed()
for key in "processId", "listen", "preLaunchTask", "postDebugTask":
body.pop(key, None)

View file

@ -2,6 +2,8 @@
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import annotations
import os
import subprocess
import sys
@ -37,14 +39,31 @@ class Connection(object):
once the session ends.
"""
disconnected: bool
process_replaced: bool
"""Whether this is a connection to a process that is being replaced in situ
by another process, e.g. via exec().
"""
server: Server | None
"""The Server component, if this debug server belongs to Session.
"""
pid: int | None
ppid: int | None
channel: messaging.JsonMessageChannel
def __init__(self, sock):
from debugpy.adapter import sessions
self.disconnected = False
self.process_replaced = False
self.server = None
"""The Server component, if this debug server belongs to Session.
"""
self.pid = None
@ -109,7 +128,13 @@ if 'debugpy' not in sys.modules:
if self.disconnected:
return
if any(conn.pid == self.pid for conn in _connections):
# An existing connection with the same PID and process_replaced == True
# corresponds to the process that replaced itself with this one, so it's
# not an error.
if any(
conn.pid == self.pid and not conn.process_replaced
for conn in _connections
):
raise KeyError(f"{self} is already connected to this adapter")
is_first_server = len(_connections) == 0
@ -130,9 +155,17 @@ if 'debugpy' not in sys.modules:
return
parent_session = sessions.get(self.ppid)
if parent_session is None:
parent_session = sessions.get(self.pid)
if parent_session is None:
log.info("No active debug session for parent process of {0}.", self)
else:
if self.pid == parent_session.pid:
parent_server = parent_session.server
if not (parent_server and parent_server.connection.process_replaced):
log.error("{0} is not expecting replacement.", parent_session)
self.channel.close()
return
try:
parent_session.client.notify_of_subprocess(self)
return
@ -218,6 +251,8 @@ class Server(components.Component):
message_handler = components.Component.message_handler
connection: Connection
class Capabilities(components.Capabilities):
PROPERTIES = {
"supportsCompletionsRequest": False,
@ -340,10 +375,17 @@ class Server(components.Component):
self.client.propagate_after_start(event)
@message_handler
def exited_event(self, event):
# If there is a launcher, it's handling the exit code.
if not self.launcher:
self.client.propagate_after_start(event)
def exited_event(self, event: messaging.Event):
if event("pydevdReason", str, optional=True) == "processReplaced":
# The parent process used some API like exec() that replaced it with another
# process in situ. The connection will shut down immediately afterwards, but
# we need to keep the corresponding session alive long enough to report the
# subprocess to it.
self.connection.process_replaced = True
else:
# If there is a launcher, it's handling the exit code.
if not self.launcher:
self.client.propagate_after_start(event)
@message_handler
def terminated_event(self, event):
@ -358,6 +400,27 @@ class Server(components.Component):
self.connection.server = None
def disconnect(self):
if self.connection.process_replaced:
# Wait for the replacement server to connect to the adapter, and to report
# itself to the client for this session if there is one.
log.info("{0} is waiting for replacement subprocess.", self)
session = self.session
if not session.client or not session.client.is_connected:
wait_for_connection(
session, lambda conn: conn.pid == self.pid, timeout=30
)
else:
self.wait_for(
lambda: (
not session.client
or not session.client.is_connected
or any(
conn.pid == self.pid
for conn in session.client.known_subprocesses
)
),
timeout=30,
)
with _lock:
_connections.remove(self.connection)
_connections_changed.set()
@ -383,8 +446,8 @@ def connections():
def wait_for_connection(session, predicate, timeout=None):
"""Waits until there is a server with the specified PID connected to this adapter,
and returns the corresponding Connection.
"""Waits until there is a server matching the specified predicate connected to
this adapter, and returns the corresponding Connection.
If there is more than one server connection already available, returns the oldest
one.

View file

@ -192,8 +192,9 @@ class Session(util.Observable):
if self.launcher and self.launcher.is_connected:
# If there was a server, we just disconnected from it above, which should
# cause the debuggee process to exit - so let's wait for that first.
if self.server:
# cause the debuggee process to exit, unless it is being replaced in situ -
# so let's wait for that first.
if self.server and not self.server.connection.process_replaced:
log.info('{0} waiting for "exited" event...', self)
if not self.wait_for(
lambda: self.launcher.exit_code is not None,
@ -203,12 +204,16 @@ class Session(util.Observable):
# Terminate the debuggee process if it's still alive for any reason -
# whether it's because there was no server to handle graceful shutdown,
# or because the server couldn't handle it for some reason.
self.launcher.terminate_debuggee()
# or because the server couldn't handle it for some reason - unless the
# process is being replaced in situ.
if not (self.server and self.server.connection.process_replaced):
self.launcher.terminate_debuggee()
# Wait until the launcher message queue fully drains. There is no timeout
# here, because the final "terminated" event will only come after reading
# user input in wait-on-exit scenarios.
# user input in wait-on-exit scenarios. In addition, if the process was
# replaced in situ, the launcher might still have more output to capture
# from its replacement.
log.info("{0} waiting for {1} to disconnect...", self, self.launcher)
self.wait_for(lambda: not self.launcher.is_connected)
@ -229,6 +234,7 @@ class Session(util.Observable):
if (
self.client.start_request is not None
and self.client.start_request.command == "launch"
and not (self.server and self.server.connection.process_replaced)
):
servers.stop_serving()
log.info(

View file

@ -9,6 +9,8 @@ responses, and events.
https://microsoft.github.io/debug-adapter-protocol/overview#base-protocol
"""
from __future__ import annotations
import collections
import contextlib
import functools
@ -460,7 +462,7 @@ class Message(object):
raise NotImplementedError
@property
def payload(self):
def payload(self) -> MessageDict:
"""Payload of the message - self.body or self.arguments, depending on the
message type.
"""

View file

@ -3,6 +3,7 @@
# for license information.
from __future__ import annotations
import functools
import typing
@ -17,10 +18,7 @@ from debugpy import _version
# than 72 characters per line! - and must be readable when retrieved via help().
# Type aliases and protocols must be guarded to avoid runtime errors due to unsupported
# syntax in Python <3.9; since they aren't annotations, they're eagerly evaluated!
if typing.TYPE_CHECKING:
Endpoint = tuple[str, int]
Endpoint = typing.Tuple[str, int]
def _api(cancelable=False):
@ -57,7 +55,7 @@ def log_to(__path: str) -> None:
@_api()
def configure(__properties: dict[str] = None, **kwargs) -> None:
def configure(__properties: dict[str, typing.Any] | None = None, **kwargs) -> None:
"""Sets debug configuration properties that cannot be set in the
"attach" request, because they must be applied as early as possible
in the process being debugged.
@ -113,7 +111,7 @@ def listen(__endpoint: Endpoint | int) -> Endpoint:
@_api()
def connect(__endpoint: Endpoint | int, *, access_token: str = None) -> Endpoint:
def connect(__endpoint: Endpoint | int, *, access_token: str | None = None) -> Endpoint:
"""Tells an existing debug adapter instance that is listening on the
specified address to debug this process.

View file

@ -474,6 +474,7 @@ class Session(object):
if event.event == "exited":
self.observe(occ)
self.exit_code = event("exitCode", int)
self.exit_reason = event("reason", str, optional=True)
assert self.exit_code == self.expected_exit_code
elif event.event == "debugpyAttach":
self.observe(occ)

View file

@ -537,3 +537,49 @@ def test_breakaway_job(pyfile, target, run):
log.info("Waiting for child process...")
child_process.wait()
@pytest.mark.parametrize("run", runners.all_launch)
@pytest.mark.skipif(
sys.platform == "win32", reason="os.exec() is specific to POSIX"
)
def test_subprocess_replace(pyfile, target, run):
@pyfile
def child():
import os
import sys
assert "debugpy" in sys.modules
from debuggee import backchannel
backchannel.send(os.getpid())
@pyfile
def parent():
import debuggee
import os
import sys
debuggee.setup()
print(f"execl({sys.executable!r}, {sys.argv[1]!r})")
os.execl(sys.executable, sys.executable, sys.argv[1])
with debug.Session() as parent_session:
backchannel = parent_session.open_backchannel()
with run(parent_session, target(parent, args=[child])):
pass
expected_child_config = expected_subprocess_config(parent_session)
child_config = parent_session.wait_for_next_event("debugpyAttach")
child_config.pop("isOutputRedirected", None)
assert child_config == expected_child_config
parent_session.proceed()
with debug.Session(child_config) as child_session:
with child_session.start():
pass
child_pid = backchannel.receive()
assert child_pid == child_config["subProcessId"]
assert str(child_pid) in child_config["name"]

View file

@ -295,7 +295,7 @@ class Timeline(object):
# Otherwise, break it down expectation by expectation.
message += ":"
for exp, reason in reasons.items():
message += "\n\n where {exp!r}\n == {reason!r}"
message += f"\n\n where {exp!r}\n == {reason!r}"
else:
message += "."