Improve multiprocess and reattach scenario handling in debug.Session.

Use unbuffered pipes for subprocesses on all Python versions.

Make test watchdog more lenient with respect to process ID reuse - warn, but don't fail.
This commit is contained in:
Pavel Minaev 2019-07-15 22:07:31 -07:00 committed by Pavel Minaev
parent 045fb03888
commit 4aec028c32
5 changed files with 98 additions and 47 deletions

View file

@ -68,14 +68,25 @@ class Session(object):
_counter = itertools.count(1)
def __init__(self, start_method, ptvsd_port=None, pid=None):
assert start_method in self.START_METHODS
assert ptvsd_port is None or start_method.startswith('attach_socket_')
def __init__(self, start_method=None, pid=None, ptvsd_port=None):
self._created = False
if pid is None:
assert start_method in self.START_METHODS
assert ptvsd_port is None or start_method.startswith('attach_socket_')
else:
assert start_method is None
assert ptvsd_port is not None
start_method = "custom_server"
watchdog.start()
self.id = next(self._counter)
log.info('New debug session {0}; will be started via {1!r}.', self, start_method)
format_string = 'New debug session {session}'
if pid is None:
format_string += '; debugged process will be started via {start_method!r}.'
else:
format_string += " for existing process with pid={pid}."
log.info(format_string, session=self, start_method=start_method, pid=pid)
self.lock = threading.RLock()
self.target = None
@ -98,11 +109,11 @@ class Session(object):
self.env['PYTHONPATH'] = (tests.root / "DEBUGGEE_PYTHONPATH").strpath
self.env['PTVSD_SESSION_ID'] = str(self.id)
self.is_running = False
self.process = None
self.process_terminated = False
self.process_exited = False
self.pid = pid
self.psutil_process = psutil.Process(self.pid) if self.pid else None
self.is_running = pid is not None
self.psutil_process = psutil.Process(self.pid) if self.is_running else None
self.kill_ptvsd_on_close = True
self.socket = None
self.server_socket = None
@ -133,10 +144,16 @@ class Session(object):
self.all_occurrences_of = self.timeline.all_occurrences_of
self.observe_all = self.timeline.observe_all
# This must always be the last attribute set, to avoid issues with __del__
# trying to cleanup a partially constructed Session.
self._created = True
def __str__(self):
return fmt("ptvsd-{0}", self.id)
def __del__(self):
if not self._created:
return
# Always kill the process tree, even if kill_ptvsd_on_close is False. Any
# test that wants to keep it alive should do so explicitly by keeping the
# Session object alive for the requisite amount of time after it is closed.
@ -148,10 +165,13 @@ class Session(object):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# If we're exiting a failed test, make sure that all output from the debuggee
# process has been received and logged, before we close the sockets and kill
# the process tree. In success case, wait_for_exit() takes care of that.
if exc_type is not None:
# Log the error, in case another one happens during shutdown.
log.exception(exc_info=(exc_type, exc_val, exc_tb))
# If we're exiting a failed test, make sure that all output from the debuggee
# process has been received and logged, before we close the sockets and kill
# the process tree. In success case, wait_for_exit() takes care of that.
# If it failed in the middle of the test, the debuggee process might still
# be alive, and waiting for the test to tell it to continue. In this case,
# it will never close its stdout/stderr, so use a reasonable timeout here.
@ -433,6 +453,7 @@ class Session(object):
spawn_args,
env=env,
cwd=cwd,
bufsize=0,
**stdio
)
self.pid = self.process.pid
@ -471,14 +492,6 @@ class Session(object):
assert self.ptvsd_port
assert self.socket
telemetry = self.wait_for_next_event('output')
assert telemetry == {
'category': 'telemetry',
'output': 'ptvsd',
'data': {'version': some.str},
#'data': {'version': ptvsd.__version__},
}
if self.perform_handshake:
return self.handshake()
@ -559,7 +572,7 @@ class Session(object):
log.info('Waiting for {0} (pid={1}) to terminate...', self, self.pid)
returncode = self.psutil_process.wait()
watchdog.unregister_spawn(self.pid, str(self))
self.process_terminated = True
self.process_exited = True
assert not timed_out, "wait_for_exit() timed out"
assert returncode == self.expected_returncode
@ -568,12 +581,11 @@ class Session(object):
self.wait_for_termination(close=True)
def kill_process_tree(self):
if self.process is None or self.process_terminated:
if self.psutil_process is None or self.process_exited:
return
log.info('Killing {0} process tree...', self)
assert self.psutil_process is not None
procs = [self.psutil_process]
try:
procs += self.psutil_process.children(recursive=True)
@ -594,13 +606,16 @@ class Session(object):
except Exception:
log.exception()
self.process_terminated = True
self.process_exited = True
log.info('Killed {0} process tree', self)
self.captured_output.wait()
self.close_stdio()
def close_stdio(self):
if self.process is None:
return
log.debug('Closing stdio pipes of {0}...', self)
try:
self.process.stdin.close()
@ -714,6 +729,14 @@ class Session(object):
to finalize the configuration stage, and start running code.
"""
telemetry = self.wait_for_next_event('output')
assert telemetry == {
'category': 'telemetry',
'output': 'ptvsd',
'data': {'version': some.str},
#'data': {'version': ptvsd.__version__},
}
self.request('initialize', {'adapterID': 'test'})
self.wait_for_next(Event('initialized'))
@ -764,11 +787,13 @@ class Session(object):
return start
def _process_event(self, event):
self.timeline.record_event(event, block=False)
if event.event == "ptvsd_subprocess":
pid = event.body["processId"]
watchdog.register_spawn(pid, fmt("{0}-subprocess-{1}", self, pid))
elif event.event == "terminated":
self.timeline.record_event(event, block=False)
if event.event == "terminated":
# Stop the message loop, since the ptvsd is going to close the connection
# from its end shortly after sending this event, and no further messages
# are expected.
@ -939,15 +964,25 @@ class Session(object):
def attach_to_subprocess(self, ptvsd_subprocess):
assert ptvsd_subprocess == Event("ptvsd_subprocess")
pid = ptvsd_subprocess.body['processId']
child_port = ptvsd_subprocess.body['port']
assert child_port != 0
assert (pid, child_port) == (some.int, some.int)
child_session = Session('attach_socket_cmdline', ptvsd_port=child_port)
log.info(
'Attaching to subprocess of {0} with pid={1} at port {2}',
self,
pid,
child_port,
)
child_session = Session(pid=pid, ptvsd_port=child_port)
try:
child_session.ignore_unobserved = self.ignore_unobserved
child_session.debug_options = self.debug_options
child_session.rules = self.rules
child_session.connect()
child_session.connected.wait()
child_session.handshake()
except Exception:
child_session.close()
@ -969,17 +1004,13 @@ class Session(object):
assert self.start_method.startswith("attach_socket_")
ns = Session('attach_socket_import', ptvsd_port=self.ptvsd_port)
ns = Session(pid=self.pid, ptvsd_port=self.ptvsd_port)
try:
ns._setup_session(**kwargs)
ns.ignore_unobserved = list(self.ignore_unobserved)
ns.debug_options = set(self.debug_options)
ns.rules = list(self.rules)
ns.pid = self.pid
ns.process = self.process
ns.psutil_process = psutil.Process(ns.pid)
ns.is_running = True
ns.connect()
ns.connected.wait()

View file

@ -131,8 +131,8 @@ def test_multiprocessing(pyfile, start_method, run_as):
parent_backchannel.send("continue")
grandchild_session.wait_for_termination()
child_session.wait_for_termination()
grandchild_session.wait_for_exit()
child_session.wait_for_exit()
assert parent_backchannel.receive() == "done"
parent_session.wait_for_exit()

View file

@ -33,7 +33,7 @@ def pytest_addoption(parser):
def pytest_configure(config):
log_dir = config.rootdir / "tests" / "_logs"
if config.option.ptvsd_logs:
if True or config.option.ptvsd_logs:
log.info("ptvsd logs will be in {0}", log_dir)
debug.PTVSD_ENV["PTVSD_LOG_DIR"] = str(log_dir)
if config.option.pydevd_logs:

View file

@ -45,7 +45,13 @@ def start():
os.getpid(),
"\n".join(repr(s) for s in args),
)
_process = psutil.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
_process = psutil.Popen(
args,
bufsize=0,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
_stream = messaging.JsonIOStream(_process.stdout, _process.stdin, _name)

View file

@ -11,14 +11,18 @@ from __future__ import absolute_import, print_function, unicode_literals
# Do not import ptvsd on top level, either - sys.path needs to be fixed first -
# this is done in main().
import collections
import os
import sys
import platform
import psutil
import sys
import tempfile
import time
ProcessInfo = collections.namedtuple("ProcessInfo", ["process", "name"])
def main(tests_pid):
# To import ptvsd, the "" entry in sys.path - which is added automatically on
# Python 2 - must be removed first; otherwise, we end up importing tests/ptvsd.
@ -27,7 +31,7 @@ def main(tests_pid):
from ptvsd.common import fmt, log, messaging
log.stderr_levels |= {"info"}
# log.stderr_levels |= {"info"}
log.timestamp_format = "06.3f"
log.filename_prefix = "watchdog"
log.to_file()
@ -37,7 +41,7 @@ def main(tests_pid):
tests_process = psutil.Process(tests_pid)
stream.write_json(["watchdog", log.filename()])
spawned_processes = {}
spawned_processes = {} # pid -> ProcessInfo
try:
stop = False
while not stop:
@ -57,20 +61,30 @@ def main(tests_pid):
pid, name = args
pid = int(pid)
log.debug(
log.info(
"watchdog-{0} registering spawned process {1} (pid={2})",
tests_pid,
name,
pid,
)
assert pid not in spawned_processes
spawned_processes[pid] = psutil.Process(pid)
try:
_, old_name = spawned_processes[pid]
except KeyError:
pass
else:
log.warning(
"watchdog-{0} already tracks a process with pid={1}: {2}",
tests_pid,
pid,
old_name,
)
spawned_processes[pid] = ProcessInfo(psutil.Process(pid), name)
elif command == "unregister_spawn":
pid, name = args
pid = int(pid)
log.debug(
log.info(
"watchdog-{0} unregistering spawned process {1} (pid={2})",
tests_pid,
name,
@ -92,8 +106,8 @@ def main(tests_pid):
sys.stdout.close()
tests_process.wait()
leftover_processes = set(spawned_processes.values())
for proc in spawned_processes.values():
leftover_processes = {proc for proc, _ in spawned_processes.values()}
for proc, _ in spawned_processes.values():
try:
leftover_processes |= proc.children(recursive=True)
except Exception:
@ -126,7 +140,7 @@ def main(tests_pid):
# gcore will automatically add pid to the filename
core_file = os.path.join(tempfile.gettempdir(), "ptvsd_core")
gcore_cmd = fmt("gcore -o {0} {1}", core_file, proc.pid)
log.warning("{0}", gcore_cmd)
log.warning("watchdog-{0}: {1}", tests_pid, gcore_cmd)
os.system(gcore_cmd)
except Exception:
log.exception()
@ -138,7 +152,7 @@ def main(tests_pid):
except Exception:
log.exception()
log.debug("watchdog-{0} exiting", tests_pid)
log.info("watchdog-{0} exiting", tests_pid)
if __name__ == "__main__":