diff --git a/Makefile b/Makefile index 9ddc8c22..f20be6c3 100644 --- a/Makefile +++ b/Makefile @@ -9,14 +9,9 @@ help: ## Print help about available targets. depends: $(PYTHON) -m pip install --upgrade pip $(PYTHON) -m pip install setuptools - $(PYTHON) -m pip install flake8 $(PYTHON) -m pip install flake8_formatter_junit_xml $(PYTHON) -m pip install unittest-xml-reporting - $(PYTHON) -m pip install coverage - $(PYTHON) -m pip install requests - $(PYTHON) -m pip install flask - $(PYTHON) -m pip install django - $(PYTHON) -m pip install pytest + $(PYTHON) -m pip install -r test_requirements.txt .PHONY: lint lint: ## Lint the Python source code. diff --git a/ptvsd/__main__.py b/ptvsd/__main__.py index 49015ada..2053c5ef 100644 --- a/ptvsd/__main__.py +++ b/ptvsd/__main__.py @@ -6,7 +6,7 @@ import argparse import os.path import sys -from ptvsd import pydevd_hooks +from ptvsd import multiproc from ptvsd._attach import attach_main from ptvsd._local import debug_main, run_main from ptvsd.socket import Address @@ -35,7 +35,6 @@ PYDEVD_FLAGS = { '--DEBUG_RECORD_SOCKET_READS', '--cmd-line', '--module', - '--multiprocess', '--print-in-debugger-startup', '--save-signatures', '--save-threading', @@ -126,7 +125,7 @@ def _group_args(argv): # ptvsd support elif arg in ('--host', '--server-host', '--port', '--pid', '-m', '--multiprocess-port-range'): - if arg == '-m' or arg == '--pid': + if arg in ('-m', '--pid'): gottarget = True supported.append(arg) if nextarg is not None: @@ -134,6 +133,9 @@ def _group_args(argv): skip += 1 elif arg in ('--single-session', '--wait'): supported.append(arg) + elif arg == '--multiprocess': + supported.append(arg) + pydevd.append(arg) elif not arg.startswith('-'): supported.append(arg) gottarget = True @@ -193,7 +195,12 @@ def _parse_args(prog, argv): multiprocess_port_range = ns.pop('multiprocess_port_range') if multiprocess_port_range is not None: - pydevd_hooks.multiprocess_port_range = multiprocess_port_range + if not ns['multiprocess']: + parser.error('--multiprocess-port-range requires --multiprocess') + multiproc.subprocess_port_range = multiprocess_port_range + + if ns['multiprocess']: + multiproc.enable() pid = ns.pop('pid') module = ns.pop('module') diff --git a/ptvsd/_util.py b/ptvsd/_util.py index d18d9d21..412acf14 100644 --- a/ptvsd/_util.py +++ b/ptvsd/_util.py @@ -24,8 +24,8 @@ def debug(*msg, **kwargs): if tb: import traceback traceback.print_exc() - print(*msg, file=sys.stderr) - sys.stderr.flush() + print(*msg, file=sys.__stderr__) + sys.__stderr__.flush() @contextlib.contextmanager diff --git a/ptvsd/messaging.py b/ptvsd/messaging.py index ea7823ac..8f6c7b0a 100644 --- a/ptvsd/messaging.py +++ b/ptvsd/messaging.py @@ -10,7 +10,7 @@ import itertools import json import sys import threading - +from ._util import new_hidden_thread class JsonIOStream(object): """Implements a JSON value stream over two byte streams (input and output). @@ -63,7 +63,10 @@ class JsonIOStream(object): def _read_line(self): line = b'' while True: - line += self._reader.readline() + try: + line += self._reader.readline() + except Exception: + raise EOFError if not line: raise EOFError if line.endswith(b'\r\n'): @@ -79,14 +82,7 @@ class JsonIOStream(object): headers = {} while True: - try: - line = self._read_line() - except Exception: - if self._is_closing: - raise EOFError - else: - raise - + line = self._read_line() if line == b'': break key, _, value = line.partition(b':') @@ -202,7 +198,7 @@ class JsonMessageChannel(object): Debug Adapter Protocol (https://microsoft.github.io/debug-adapter-protocol/overview). """ - def __init__(self, stream, handlers=None): + def __init__(self, stream, handlers=None, name='vsc_messaging'): self.stream = stream self.send_callback = lambda channel, message: None self.receive_callback = lambda channel, message: None @@ -211,7 +207,7 @@ class JsonMessageChannel(object): self._seq_iter = itertools.count(1) self._requests = {} self._handlers = handlers - self._worker = threading.Thread(target=self._process_incoming_messages) + self._worker = new_hidden_thread(name, self._process_incoming_messages) self._worker.daemon = True def close(self): @@ -296,7 +292,10 @@ class JsonMessageChannel(object): if specific_handler is not None: handler = lambda: specific_handler(self, arguments) else: - generic_handler = getattr(self._handlers, 'request') + try: + generic_handler = getattr(self._handlers, 'request') + except AttributeError: + raise AttributeError('%r has no handler for request %r' % (self._handlers, command)) handler = lambda: generic_handler(self, command, arguments) try: response_body = handler() @@ -311,7 +310,10 @@ class JsonMessageChannel(object): if specific_handler is not None: handler = lambda: specific_handler(self, body) else: - generic_handler = getattr(self._handlers, 'event') + try: + generic_handler = getattr(self._handlers, 'event') + except AttributeError: + raise AttributeError('%r has no handler for event %r' % (self._handlers, event)) handler = lambda: generic_handler(self, event, body) handler() @@ -344,4 +346,4 @@ class MessageHandlers(object): def __init__(self, **kwargs): for name, func in kwargs.items(): - setattr(self, name, func) \ No newline at end of file + setattr(self, name, func) diff --git a/ptvsd/multiproc.py b/ptvsd/multiproc.py new file mode 100644 index 00000000..0401fc49 --- /dev/null +++ b/ptvsd/multiproc.py @@ -0,0 +1,110 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE in the project root +# for license information. + +from __future__ import print_function, with_statement, absolute_import + +import atexit +import itertools +import os +import random +import socket + +try: + import queue +except ImportError: + import Queue as queue + +import ptvsd +from .socket import create_server, create_client +from .messaging import JsonIOStream, JsonMessageChannel +from ._util import new_hidden_thread, debug + +import pydevd +from _pydev_bundle import pydev_monkey + + +# Defaults to the intersection of default ephemeral port ranges for various common systems. +subprocess_port_range = (49152, 61000) + +listener_port = None + +subprocess_queue = queue.Queue() + + +def enable(): + global listener_port, _server + _server = create_server('localhost', 0) + atexit.register(disable) + _, listener_port = _server.getsockname() + + listener_thread = new_hidden_thread('SubprocessListener', _listener) + listener_thread.start() + + +def disable(): + try: + _server.shutdown(socket.SHUT_RDWR) + except Exception: + pass + +def _listener(): + counter = itertools.count(1) + while listener_port: + (sock, _) = _server.accept() + stream = JsonIOStream.from_socket(sock) + _handle_subprocess(next(counter), stream) + + +def _handle_subprocess(n, stream): + class Handlers(object): + def ptvsd_subprocess_event(self, channel, body): + debug('ptvsd_subprocess: %r' % body) + subprocess_queue.put(body) + channel.close() + + name = 'SubprocessListener-%d' % n + channel = JsonMessageChannel(stream, Handlers(), name) + channel.start() + + +def init_subprocess(parent_port, first_port, last_port, pydevd_setup): + # Called from the code injected into subprocess, before it starts + # running user code. See pydevd_hooks.get_python_c_args. + + global listener_port, subprocess_port_range + listener_port = parent_port + subprocess_port_range = (first_port, last_port) + + pydevd.SetupHolder.setup = pydevd_setup + pydev_monkey.patch_new_process_functions() + + ports = list(range(first_port, last_port)) + random.shuffle(ports) + for port in ports: + try: + ptvsd.enable_attach(('localhost', port)) + except IOError: + pass + else: + debug('Child process %d listening on port %d' % (os.getpid(), port)) + break + else: + raise Exception('Could not find a free port in range {first_port}-{last_port}') + + enable() + + debug('Child process %d notifying parent process at port %d' % (os.getpid(), parent_port)) + conn = create_client() + conn.connect(('localhost', parent_port)) + stream = JsonIOStream.from_socket(conn) + channel = JsonMessageChannel(stream) + channel.send_event('ptvsd_subprocess', { + 'processId': os.getpid(), + 'port': port, + }) + + debug('Child process %d notified parent process; waiting for connection.' % os.getpid()) + ptvsd.wait_for_attach() + + diff --git a/ptvsd/pydevd_hooks.py b/ptvsd/pydevd_hooks.py index b83a29f0..f9559b1a 100644 --- a/ptvsd/pydevd_hooks.py +++ b/ptvsd/pydevd_hooks.py @@ -10,15 +10,12 @@ from _pydev_bundle import pydev_monkey from _pydevd_bundle import pydevd_comm import ptvsd +from ptvsd import multiproc from ptvsd.socket import Address from ptvsd.daemon import Daemon, DaemonStoppedError, DaemonClosedError from ptvsd._util import debug, new_hidden_thread -# The intersection of default ephemeral port ranges for various common systems. -multiprocess_port_range = (49152, 61000) - - def start_server(daemon, host, port, **kwargs): """Return a socket to a (new) local pydevd-handling daemon. @@ -78,48 +75,24 @@ def start_client(daemon, host, port, **kwargs): # See pydevd/_vendored/pydevd/_pydev_bundle/pydev_monkey.py def get_python_c_args(host, port, indC, args, setup): runner = ''' -import os -import random import sys - sys.path.append(r'{ptvsd_syspath}') - -import ptvsd -from ptvsd._util import DEBUG -from ptvsd import pydevd_hooks - -pydevd_hooks.multiprocess_port_range = ({first_port}, {last_port}) - -from _pydev_bundle import pydev_monkey -pydev_monkey.patch_new_process_functions() - -ports = list(range({first_port}, {last_port})) -random.shuffle(ports) -for port in ports: - try: - ptvsd.enable_attach(('localhost', port)) - except IOError: - pass - else: - if DEBUG: - print('Child process %d listening on port %d' % (os.getpid(), port)) - break -else: - raise Exception('Could not find a free port in range {first_port}-{last_port}') - -ptvsd.wait_for_attach() +from ptvsd import multiproc +multiproc.init_subprocess({parent_port}, {first_port}, {last_port}, {pydevd_setup}) {rest} ''' - first_port, last_port = multiprocess_port_range + first_port, last_port = multiproc.subprocess_port_range # __file__ will be .../ptvsd/__init__.py, and we want the ... ptvsd_syspath = os.path.join(ptvsd.__file__, '../..') return runner.format( + parent_port=multiproc.listener_port, first_port=first_port, last_port=last_port, ptvsd_syspath=ptvsd_syspath, + pydevd_setup=setup, rest=args[indC + 1]) diff --git a/ptvsd/socket.py b/ptvsd/socket.py index eb8affe0..9f535d2f 100644 --- a/ptvsd/socket.py +++ b/ptvsd/socket.py @@ -70,12 +70,14 @@ def is_socket(sock): return isinstance(sock, socket.socket) -def create_server(host, port): +def create_server(host, port, timeout=None): """Return a local server socket listening on the given port.""" if host is None: host = 'localhost' server = _new_sock() server.bind((host, port)) + if timeout is not None: + server.settimeout(timeout) server.listen(1) return server diff --git a/ptvsd/wrapper.py b/ptvsd/wrapper.py index bdb3e80d..33a1e4c2 100644 --- a/ptvsd/wrapper.py +++ b/ptvsd/wrapper.py @@ -26,6 +26,10 @@ try: from functools import reduce except Exception: pass +try: + import queue +except ImportError: + import Queue as queue import warnings from xml.sax import SAXParseException @@ -37,6 +41,7 @@ import _pydevd_bundle.pydevd_frame as pydevd_frame # noqa from _pydevd_bundle.pydevd_additional_thread_info import PyDBAdditionalThreadInfo # noqa from ptvsd import _util +from ptvsd import multiproc import ptvsd.ipcjson as ipcjson # noqa import ptvsd.futures as futures # noqa import ptvsd.untangle as untangle # noqa @@ -1270,6 +1275,24 @@ class VSCodeMessageProcessor(VSCLifecycleMsgProcessor): self.loop.stop() self.event_loop_thread.join(WAIT_FOR_THREAD_FINISH_TIMEOUT) + def start(self, threadname): + super(VSCodeMessageProcessor, self).start(threadname) + self._subprocess_notifier_thread = _util.new_hidden_thread('SubprocessNotifier', self._subprocess_notifier) + self._subprocess_notifier_thread.start() + + def close(self): + super(VSCodeMessageProcessor, self).close() + self._subprocess_notifier_thread.join() + + def _subprocess_notifier(self): + while not self.closed: + try: + subprocess_info = multiproc.subprocess_queue.get(block=False, timeout=0.1) + except queue.Empty: + pass + else: + self.send_event('ptvsd_subprocess', **subprocess_info) + # async helpers def async_method(m): diff --git a/pytest.ini b/pytest.ini index eb3d91bd..c96a6b67 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,3 +1,3 @@ [pytest] testpaths=pytests -timeout=3 +timeout=5 diff --git a/pytests/func/test_multiproc.py b/pytests/func/test_multiproc.py new file mode 100644 index 00000000..cdb35b24 --- /dev/null +++ b/pytests/func/test_multiproc.py @@ -0,0 +1,93 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE in the project root +# for license information. + +from __future__ import print_function, with_statement, absolute_import + +import platform +import pytest + +from ..helpers.pattern import ANY, Pattern +from ..helpers.session import DebugSession +from ..helpers.timeline import Event + + +@pytest.mark.timeout(20) +@pytest.mark.skipif(platform.system() != 'Windows', + reason='Debugging multiprocessing module only works on Windows') +def test_multiprocessing(debug_session, pyfile): + @pyfile + def code_to_debug(): + import multiprocessing + + def child_of_child(q): + print('entering child of child') + assert q.get() == 2 + q.put(3) + print('leaving child of child') + + def child(q): + print('entering child') + assert q.get() == 1 + p = multiprocessing.Process(target=child_of_child, args=(q,)) + p.start() + p.join() + assert q.get() == 3 + q.put(4) + print('leaving child') + + if __name__ == '__main__': + import pytests.helpers.backchannel as backchannel + multiprocessing.set_start_method('spawn') + q = multiprocessing.Queue() + p = multiprocessing.Process(target=child, args=(q,)) + p.start() + backchannel.write_json(p.pid) + q.put(1) + assert backchannel.read_json() == 'continue' + q.put(2) + p.join() + assert q.get() == 4 + q.close() + + debug_session.setup_backchannel() + debug_session.multiprocess = True + debug_session.prepare_to_run(filename=code_to_debug) + debug_session.start_debugging() + + child_pid = debug_session.backchannel.read_json() + + child_subprocess = debug_session.wait_until(Event('ptvsd_subprocess')) + assert child_subprocess.body in Pattern({ + 'processId': child_pid, + 'port': ANY.int, + }) + child_port = child_subprocess.body['port'] + + child_session = DebugSession(method='attach_socket', ptvsd_port=child_port) + child_session.connect() + child_session.handshake() + child_session.start_debugging() + + child_child_subprocess = child_session.wait_until(Event('ptvsd_subprocess')) + assert child_child_subprocess.body in Pattern({ + 'processId': ANY.int, + 'port': ANY.int, + }) + child_child_port = child_child_subprocess.body['port'] + + child_child_session = DebugSession(method='attach_socket', ptvsd_port=child_child_port) + child_child_session.connect() + child_child_session.handshake() + child_child_session.start_debugging() + child_child_session.wait_until(Event('process')) + + debug_session.backchannel.write_json('continue') + + child_child_session.send_request('disconnect') + child_child_session.wait_for_disconnect() + + child_session.send_request('disconnect') + child_session.wait_for_disconnect() + + debug_session.wait_for_exit() diff --git a/pytests/helpers/__init__.py b/pytests/helpers/__init__.py index 4ce1997f..7a29c745 100644 --- a/pytests/helpers/__init__.py +++ b/pytests/helpers/__init__.py @@ -4,7 +4,11 @@ from __future__ import print_function, with_statement, absolute_import +import os +import sys import threading +import time +import traceback print_lock = threading.Lock() @@ -15,3 +19,36 @@ def print(*args, **kwargs): """ with print_lock: real_print(*args, **kwargs) + + +def dump_stacks(): + """Dump the stacks of all threads except the current thread""" + current_ident = threading.current_thread().ident + for thread_ident, frame in sys._current_frames().items(): + if thread_ident == current_ident: + continue + for t in threading.enumerate(): + if t.ident == thread_ident: + thread_name = t.name + thread_daemon = t.daemon + break + else: + thread_name = '' + print('Stack of %s (%s) in pid %s; daemon=%s' % (thread_name, thread_ident, os.getpid(), thread_daemon)) + print(''.join(traceback.format_stack(frame))) + + +def dump_stacks_in(secs): + """Invokes dump_stacks() on a background thread after waiting. + + Can be called from debugged code before the point after which it hangs, + to determine the cause of the hang while debugging a test. + """ + + def dumper(): + time.sleep(secs) + dump_stacks() + + thread = threading.Thread(target=dumper) + thread.daemon = True + thread.start() diff --git a/pytests/helpers/backchannel.py b/pytests/helpers/backchannel.py new file mode 100644 index 00000000..d85a539e --- /dev/null +++ b/pytests/helpers/backchannel.py @@ -0,0 +1,27 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE in the project root +# for license information. + +from __future__ import print_function, with_statement, absolute_import + +"""Imported from test code that runs under ptvsd, and allows that code +to communcate back to the test. Works in conjunction with debug_session +fixture and its backchannel method.""" + +import os +import socket + +from ptvsd.messaging import JsonIOStream + +port = int(os.getenv('PTVSD_BACKCHANNEL_PORT')) +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +sock.connect(('localhost', port)) +stream = JsonIOStream.from_socket(sock) + + +def read_json(): + return stream.read_json() + + +def write_json(value): + stream.write_json(value) diff --git a/pytests/helpers/messaging.py b/pytests/helpers/messaging.py index 30e04626..41755653 100644 --- a/pytests/helpers/messaging.py +++ b/pytests/helpers/messaging.py @@ -40,20 +40,20 @@ class LoggingJsonStream(object): id_iter = itertools.count() - def __init__(self, stream): + def __init__(self, stream, id=None): self.stream = stream - self.id = next(self.id_iter) + self.id = id or next(self.id_iter) def close(self): self.stream.close() def read_json(self): value = self.stream.read_json() - print('%d --> %r' % (self.id, value)) + print('%s --> %r' % (self.id, value)) return value def write_json(self, value): - print('%d <-- %r' % (self.id, value)) + print('%s <-- %r' % (self.id, value)) self.stream.write_json(value) diff --git a/pytests/helpers/pattern.py b/pytests/helpers/pattern.py index 77c4222f..fb4f5ee2 100644 --- a/pytests/helpers/pattern.py +++ b/pytests/helpers/pattern.py @@ -5,6 +5,9 @@ from __future__ import print_function, with_statement, absolute_import from collections import defaultdict +import numbers + +from ptvsd.compat import unicode class BasePattern(object): @@ -149,6 +152,11 @@ class Success(BasePattern): return self.success != isinstance(response_body, Exception) -ANY = Any() SUCCESS = Success(True) FAILURE = Success(False) + +ANY = Any() +ANY.bool = ANY.such_that(lambda x: x is True or x is False) +ANY.str = ANY.such_that(lambda x: isinstance(x, unicode)) +ANY.num = ANY.such_that(lambda x: isinstance(x, numbers.Real)) +ANY.int = ANY.such_that(lambda x: isinstance(x, numbers.Integral)) diff --git a/pytests/helpers/session.py b/pytests/helpers/session.py index 43da18bd..1eaf4aba 100644 --- a/pytests/helpers/session.py +++ b/pytests/helpers/session.py @@ -14,7 +14,7 @@ import time import ptvsd from ptvsd.messaging import JsonIOStream, JsonMessageChannel, MessageHandlers, RequestFailure -from . import print +from . import print, watchdog from .messaging import LoggingJsonStream from .timeline import Timeline, Request, Response, Event @@ -33,11 +33,19 @@ class DebugSession(object): print('New debug session with method %r' % method) self.method = method - self.ptvsd_port = ptvsd_port + self.ptvsd_port = ptvsd_port or 5678 + self.multiprocess = False + self.multiprocess_port_range = None + + self.is_running = False self.process = None self.socket = None self.server_socket = None self.connected = threading.Event() + self.backchannel_socket = None + self.backchannel_port = None + self.backchannel_established = threading.Event() + self.debug_options = ['RedirectOutput'] self.timeline = Timeline() def stop(self): @@ -56,6 +64,11 @@ class DebugSession(object): self.server_socket.shutdown(socket.SHUT_RDWR) except: self.server_socket = None + if self.backchannel_socket: + try: + self.backchannel_socket.shutdown(socket.SHUT_RDWR) + except: + self.backchannel_socket = None def prepare_to_run(self, perform_handshake=True, filename=None, module=None): """Spawns ptvsd using the configured method, telling it to execute the @@ -69,12 +82,16 @@ class DebugSession(object): argv += ['-m', 'ptvsd'] if self.method == 'attach_socket': - if self.ptvsd_port is None: - self.ptvsd_port = 5678 - argv += ['--port', str(self.ptvsd_port)] + argv += ['--port', str(self.ptvsd_port), '--wait'] else: - port = self._listen() - argv += ['--host', 'localhost', '--port', str(port)] + self._listen() + argv += ['--host', 'localhost', '--port', str(self.ptvsd_port)] + + if self.multiprocess: + argv += ['--multiprocess'] + + if self.multiprocess_port_range: + argv += ['--multiprocess-port-range', '%d-%d' % self.multiprocess_port_range] if filename: assert not module @@ -85,27 +102,20 @@ class DebugSession(object): env = os.environ.copy() env.update({'PYTHONPATH': PTVSD_SYS_PATH}) + if self.backchannel_port: + env['PTVSD_BACKCHANNEL_PORT'] = str(self.backchannel_port) print('Spawning %r' % argv) self.process = subprocess.Popen(argv, env=env, stdin=subprocess.PIPE) - if self.ptvsd_port: - # ptvsd will take some time to spawn and start listening on the port, - # so just hammer at it until it responds (or we time out). - while not self.socket: - try: - self._connect() - except socket.error: - pass - time.sleep(0.1) - else: - self.connected.wait() - assert self.socket + self.is_running = True + watchdog.create(self.process.pid) - self.stream = LoggingJsonStream(JsonIOStream.from_socket(self.socket)) - - handlers = MessageHandlers(request=self._process_request, event=self._process_event) - self.channel = JsonMessageChannel(self.stream, handlers) - self.channel.start() + if self.method == 'attach_socket': + self.connect() + self.connected.wait() + assert self.ptvsd_port + assert self.socket + print('ptvsd@%d has pid=%d' % (self.ptvsd_port, self.process.pid)) if perform_handshake: self.handshake() @@ -116,47 +126,64 @@ class DebugSession(object): exits, validates its return code to match expected_returncode. """ - process = self.process - if not process: - return - def kill(): time.sleep(self.WAIT_FOR_EXIT_TIMEOUT) - print('ptvsd process timed out, killing it') - p = process - if p: - p.kill() + print('ptvsd process %d timed out, killing it' % self.process.pid) + if self.is_running: + self.process.kill() kill_thread = threading.Thread(target=kill) kill_thread.daemon = True kill_thread.start() - process.wait() - assert process.returncode == expected_returncode + self.process.wait() + self.is_running = False + assert self.process.returncode == expected_returncode + + def wait_for_disconnect(self): + """Waits for the connected ptvsd process to disconnect. + """ + return self.channel.wait() def _listen(self): self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind(('localhost', 0)) - _, port = self.server_socket.getsockname() + _, self.ptvsd_port = self.server_socket.getsockname() self.server_socket.listen(0) def accept_worker(): - print('Listening for incoming connection from ptvsd on port %d' % port) + print('Listening for incoming connection from ptvsd@%d' % self.ptvsd_port) self.socket, _ = self.server_socket.accept() - print('Incoming ptvsd connection accepted') - self.connected.set() + print('Incoming ptvsd@%d connection accepted' % self.ptvsd_port) + self._setup_channel() accept_thread = threading.Thread(target=accept_worker) accept_thread.daemon = True accept_thread.start() - return port + def connect(self): + # ptvsd will take some time to spawn and start listening on the port, + # so just hammer at it until it responds (or we time out). + while not self.socket: + try: + self._try_connect() + except socket.error: + pass + time.sleep(0.1) - def _connect(self): - print('Trying to connect to ptvsd on port %d' % self.ptvsd_port) + def _try_connect(self): + print('Trying to connect to ptvsd@%d' % self.ptvsd_port) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect(('localhost', self.ptvsd_port)) - print('Successfully connected to ptvsd') + print('Successfully connected to ptvsd@%d' % self.ptvsd_port) self.socket = sock + self._setup_channel() + + def _setup_channel(self): + self.stream = LoggingJsonStream(JsonIOStream.from_socket(self.socket), 'ptvsd@%d' % self.ptvsd_port) + handlers = MessageHandlers(request=self._process_request, event=self._process_event) + self.channel = JsonMessageChannel(self.stream, handlers) + self.channel.start() + self.connected.set() def send_request(self, command, arguments=None): request = self.timeline.record_request(command, arguments) @@ -177,7 +204,8 @@ class DebugSession(object): promised_occurrence[:] = (occ,) def handshake(self): - """Performs the handshake that establishes the debug session. + """Performs the handshake that establishes the debug session ('initialized' + and 'launch' or 'attach'). After this method returns, ptvsd is not running any code yet, but it is ready to accept any configuration requests (e.g. for initial breakpoints). @@ -188,18 +216,16 @@ class DebugSession(object): with self.causing(Event('initialized', {})): self.send_request('initialize', {'adapterID': 'test'}).wait_for_response() - def start_debugging(self, arguments=None, force_threads=True): - """Finalizes the configuration stage, and issues a 'launch' or an 'attach' request - to start running code under debugger, passing arguments through. + request = 'launch' if self.method == 'launch' else 'attach' + self.send_request(request, {'debugOptions': self.debug_options}).wait_for_response() + + def start_debugging(self): + """Finalizes the configuration stage, and issues a 'configurationDone' request + to start running code under debugger. After this method returns, ptvsd is running the code in the script file or module - that was specified in run(). + that was specified in prepare_to_run(). """ - - request = 'launch' if self.method == 'launch' else 'attach' - self.send_request(request, arguments).wait_for_response() - if force_threads: - self.send_request('threads').wait_for_response() self.send_request('configurationDone').wait_for_response() def _process_event(self, channel, event, body): @@ -219,4 +245,25 @@ class DebugSession(object): return self.timeline.wait_until(expectation) def history(self): - return self.timeline.history \ No newline at end of file + return self.timeline.history + + def setup_backchannel(self): + assert self.process is None, 'setup_backchannel() must be called before prepare_to_run()' + self.backchannel_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.backchannel_socket.bind(('localhost', 0)) + _, self.backchannel_port = self.backchannel_socket.getsockname() + self.backchannel_socket.listen(0) + backchannel_thread = threading.Thread(target=self._backchannel_worker) + backchannel_thread.daemon = True + backchannel_thread.start() + + def _backchannel_worker(self): + sock, _ = self.backchannel_socket.accept() + self._backchannel_stream = LoggingJsonStream(JsonIOStream.from_socket(sock), 'bchan@%d' % self.ptvsd_port) + self.backchannel_established.set() + + @property + def backchannel(self): + assert self.backchannel_port, 'backchannel() must be called after setup_backchannel()' + self.backchannel_established.wait() + return self._backchannel_stream diff --git a/pytests/helpers/timeline.py b/pytests/helpers/timeline.py index 4b6e34ae..86b88efc 100644 --- a/pytests/helpers/timeline.py +++ b/pytests/helpers/timeline.py @@ -6,6 +6,7 @@ from __future__ import print_function, with_statement, absolute_import import threading import time +import pytest_timeout # noqa import pytests.helpers.pattern as pattern diff --git a/pytests/helpers/watchdog.py b/pytests/helpers/watchdog.py new file mode 100644 index 00000000..a7c913bf --- /dev/null +++ b/pytests/helpers/watchdog.py @@ -0,0 +1,42 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE in the project root +# for license information. + +from __future__ import print_function, with_statement, absolute_import + +import multiprocessing +import os +import sys +import psutil + + +def create(pid): + proc = multiprocessing.Process(target=watch, args=(os.getpid(), pid)) + proc.daemon = True + proc.start() + + +def watch(test_pid, ptvsd_pid): + print('Watchdog created for ptvsd process %d' % ptvsd_pid) + test_process = psutil.Process(test_pid) + ptvsd_process = psutil.Process(ptvsd_pid) + + test_process.wait() + + if ptvsd_process.is_running(): + print('Child ptvsd process %d still running after test process exited! Killing it.' % ptvsd_pid) + procs = [ptvsd_process] + try: + procs += ptvsd_process.children(recursive=True) + except: + pass + for p in procs: + try: + p.kill() + except: + pass + + +if __name__ == '__main__': + _, test_pid, ptvsd_pid = sys.argv + watch(test_pid, ptvsd_pid) diff --git a/pytests/requirements.txt b/pytests/requirements.txt index 47829ae5..7aba617b 100644 --- a/pytests/requirements.txt +++ b/pytests/requirements.txt @@ -1,2 +1,3 @@ pytest>=3.8 pytest-timeout>=1.3 +psutil>=5.4 diff --git a/setup.py b/setup.py index 6025fc23..7d8ade4c 100644 --- a/setup.py +++ b/setup.py @@ -59,7 +59,7 @@ if __name__ == '__main__': url='https://aka.ms/ptvs', python_requires='>=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*', setup_requires=['pytest_runner>=4.2'], - tests_require=['pytest>=3.8', 'pytest-timeout>=1.3'], + tests_require=['pytest>=3.8', 'pytest-timeout>=1.3', 'psutil>=5.4'], classifiers=[ 'Development Status :: 5 - Production/Stable', 'Programming Language :: Python :: 2.7', diff --git a/test_requirements.txt b/test_requirements.txt index 79b5b545..9cc0279c 100644 --- a/test_requirements.txt +++ b/test_requirements.txt @@ -3,5 +3,6 @@ coverage requests flask django -pytest -pytest-timeout +pytest>=3.8 +pytest-timeout>=1.3 +psutil>=5.4