debugpy/tests/watchdog/__init__.py

123 lines
3.1 KiB
Python

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
"""A watchdog process for debuggee processes spawned by tests.
Interacts with the main test runner process over stdio, and keeps track of running
debugpy processes. If the test runner process goes down, any debugpy test processes
are automatically killed.
"""
__all__ = ["start", "register_spawn", "unregister_spawn"]
import atexit
import os
import psutil
import subprocess
import sys
import threading
import time
from debugpy.common import log, messaging
from tests.watchdog import worker
WATCHDOG_TIMEOUT = 3
_name = f"watchdog-{os.getpid()}"
_stream = None
_process = None
_worker_log_filename = None
def start():
global _stream, _process, _worker_log_filename
if _stream is not None:
return
args = [sys.executable, worker.__file__, str(os.getpid())]
log.info(
"Spawning {0} for tests-{1}:\n\n{2}",
_name,
os.getpid(),
"\n".join(repr(s) for s in args),
)
_process = psutil.Popen(
args, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE
)
_stream = messaging.JsonIOStream(_process.stdout, _process.stdin, _name)
event, _worker_log_filename = _stream.read_json()
assert event == "watchdog"
atexit.register(stop)
def _dump_worker_log(command, problem, exc_info=None):
reason = f"{_name}.{command}() {problem}"
if _worker_log_filename is None:
reason += ", but there is no log."
else:
try:
with open(_worker_log_filename) as f:
worker_log = f.read()
except Exception:
reason += f", but log {_worker_log_filename} could not be retrieved."
else:
reason += f"; watchdog worker process log:\n\n{worker_log}"
if exc_info is None:
log.error("{0}", reason)
else:
log.swallow_exception("{0}", reason, exc_info=exc_info)
return reason
def _invoke(command, *args):
def timeout():
time.sleep(WATCHDOG_TIMEOUT)
if timeout.occurred is None:
reason = _dump_worker_log(command, "timed out")
timeout.occurred = reason
timeout.occurred = None
timeout_thread = threading.Thread(target=timeout)
timeout_thread.daemon = True
timeout_thread.start()
try:
try:
_stream.write_json([command] + list(args))
response = _stream.read_json()
assert response == ["ok"], f"{_name} {response!r}"
finally:
timeout.occurred = False
except Exception:
_dump_worker_log(command, "failed", sys.exc_info())
raise
else:
assert not timeout.occurred, str(timeout.occurred)
def stop():
if _stream is None:
return
try:
_invoke("stop")
_stream.close()
except Exception:
log.swallow_exception()
def register_spawn(pid, name):
if _stream is None:
start()
_invoke("register_spawn", pid, name)
def unregister_spawn(pid, name):
assert _stream is not None
_invoke("unregister_spawn", pid, name)