debugpy/tests/helpers/session.py
Pavel Minaev 8f68b3d359
Port refactored command line parser to master (#1105)
* Fix #1090: Port refactored command line parser to master

* Fix #921: Double dash in program arguments gets caught and crashes pydevd

* Fix #1013: -h/--help arg to program instead shows help for interpreter
2019-01-16 20:26:39 -08:00

610 lines
22 KiB
Python

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import print_function, with_statement, absolute_import
import os
import psutil
import pytest
import socket
import subprocess
import sys
import threading
import time
import traceback
import ptvsd
import ptvsd.__main__
from ptvsd.messaging import JsonIOStream, JsonMessageChannel, MessageHandlers
from . import colors, debuggee, print, watchdog
from .messaging import LoggingJsonStream
from .pattern import ANY
from .printer import wait_for_output
from .timeline import Timeline, Event, Response
from collections import namedtuple
PTVSD_ENABLE_KEY = 'PTVSD_ENABLE_ATTACH'
PTVSD_HOST_KEY = 'PTVSD_TEST_HOST'
PTVSD_PORT_KEY = 'PTVSD_TEST_PORT'
class DebugSession(object):
WAIT_FOR_EXIT_TIMEOUT = 5
BACKCHANNEL_TIMEOUT = 15
StopInfo = namedtuple('StopInfo', 'thread_stopped, stacktrace, thread_id, frame_id')
def __init__(self, start_method='launch', ptvsd_port=None, pid=None):
assert start_method in ('launch', 'attach_pid', 'attach_socket_cmdline', 'attach_socket_import')
assert ptvsd_port is None or start_method.startswith('attach_socket_')
print('New debug session with method %r' % str(start_method))
self.target = ('code', 'print("OK")')
self.start_method = start_method
self.ptvsd_port = ptvsd_port or 5678
self.multiprocess = False
self.multiprocess_port_range = None
self.debug_options = ['RedirectOutput']
self.path_mappings = []
self.env = os.environ.copy()
self.env['PYTHONPATH'] = os.path.dirname(debuggee.__file__)
self.cwd = None
self.expected_returncode = 0
self.program_args = []
self.is_running = False
self.process = None
self.pid = pid
self.psutil_process = psutil.Process(self.pid) if self.pid else None
self.socket = None
self.server_socket = None
self.connected = threading.Event()
self.backchannel_socket = None
self.backchannel_port = None
self.backchannel_established = threading.Event()
self._output_capture_threads = []
self.output_data = {'OUT': [], 'ERR': []}
self.timeline = Timeline(ignore_unobserved=[
Event('output'),
Event('thread', ANY.dict_with({'reason': 'exited'}))
])
self.timeline.freeze()
self.perform_handshake = True
self.use_backchannel = False
# Expose some common members of timeline directly - these should be the ones
# that are the most straightforward to use, and are difficult to use incorrectly.
# Conversely, most tests should restrict themselves to this subset of the API,
# and avoid calling members of timeline directly unless there is a good reason.
self.new = self.timeline.new
self.observe = self.timeline.observe
self.wait_for_next = self.timeline.wait_for_next
self.proceed = self.timeline.proceed
self.expect_new = self.timeline.expect_new
self.expect_realized = self.timeline.expect_realized
self.all_occurrences_of = self.timeline.all_occurrences_of
self.observe_all = self.timeline.observe_all
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
was_final = self.timeline.is_final
self.close()
assert exc_type is not None or was_final, (
'Session timeline must be finalized before session goes out of scope at the end of the '
'with-statement. Use wait_for_exit(), wait_for_termination(), or wait_for_disconnect() '
'as appropriate.'
)
def __contains__(self, expectation):
return expectation in self.timeline
@property
def ignore_unobserved(self):
return self.timeline.ignore_unobserved
@ignore_unobserved.setter
def ignore_unobserved(self, value):
self.timeline.ignore_unobserved = value
def close(self):
if self.socket:
try:
self.socket.shutdown(socket.SHUT_RDWR)
except:
self.socket = None
if self.server_socket:
try:
self.server_socket.shutdown(socket.SHUT_RDWR)
except:
self.server_socket = None
if self.backchannel_socket:
try:
self.backchannel_socket.shutdown(socket.SHUT_RDWR)
except:
self.backchannel_socket = None
if self.process:
try:
self._kill_process_tree()
except:
traceback.print_exc()
pass
self._wait_for_remaining_output()
def _get_argv_for_attach_using_import(self):
argv = [sys.executable]
return argv
def _get_argv_for_launch(self):
argv = [sys.executable]
argv += [os.path.dirname(ptvsd.__file__)]
argv += ['--client']
argv += ['--host', 'localhost', '--port', str(self.ptvsd_port)]
return argv
def _get_argv_for_attach_using_cmdline(self):
argv = [sys.executable]
argv += [os.path.dirname(ptvsd.__file__)]
argv += ['--wait']
argv += ['--host', 'localhost', '--port', str(self.ptvsd_port)]
return argv
def _get_argv_for_attach_using_pid(self):
argv = [sys.executable]
argv += [os.path.dirname(ptvsd.__file__)]
argv += ['--host', 'localhost', '--port', str(self.ptvsd_port)]
argv += ['--pid', str(self.pid)]
return argv
def _get_target(self):
argv = []
run_as, path_or_code = self.target
if run_as == 'file':
assert os.path.isfile(path_or_code)
argv += [path_or_code]
elif run_as == 'module':
if os.path.isfile(path_or_code) or os.path.isdir(path_or_code):
self.env['PYTHONPATH'] += os.pathsep + os.path.dirname(path_or_code)
try:
module = path_or_code[len(os.path.dirname(path_or_code)) + 1:-3]
except Exception:
module = 'code_to_debug'
argv += ['-m', module]
else:
argv += ['-m', path_or_code]
elif run_as == 'code':
if os.path.isfile(path_or_code):
with open(path_or_code, 'r') as f:
code = f.read()
argv += ['-c', code]
else:
argv += ['-c', path_or_code]
else:
pytest.fail()
return argv
def _setup_session(self, **kwargs):
self.ignore_unobserved += [
Event('thread', ANY.dict_with({'reason': 'started'})),
Event('module')
] + kwargs.pop('ignore_unobserved', [])
self.env.update(kwargs.pop('env', {}))
self.path_mappings += kwargs.pop('path_mappings', [])
self.debug_options += kwargs.pop('debug_options', [])
self.program_args += kwargs.pop('program_args', [])
for k, v in kwargs.items():
setattr(self, k, v)
assert self.start_method in ('launch', 'attach_pid', 'attach_socket_cmdline', 'attach_socket_import')
assert len(self.target) == 2
assert self.target[0] in ('file', 'module', 'code')
def initialize(self, **kwargs):
"""Spawns ptvsd using the configured method, telling it to execute the
provided Python file, module, or code, and establishes a message channel
to it.
If use_backchannel is True, calls self.setup_backchannel() before returning.
If perform_handshake is True, calls self.handshake() before returning.
"""
self._setup_session(**kwargs)
print('Initializing debug session for ptvsd#%d' % self.ptvsd_port)
argv = []
if self.start_method == 'launch':
self._listen()
argv += self._get_argv_for_launch()
elif self.start_method == 'attach_socket_cmdline':
argv += self._get_argv_for_attach_using_cmdline()
elif self.start_method == 'attach_socket_import':
argv += self._get_argv_for_attach_using_import()
# TODO: Remove adding ot python path after enabling TOX
ptvsd_path = os.path.dirname(os.path.dirname(ptvsd.__main__.__file__))
self.env['PYTHONPATH'] = ptvsd_path + os.pathsep + self.env['PYTHONPATH']
self.env[PTVSD_ENABLE_KEY] = '1'
self.env[PTVSD_HOST_KEY] = 'localhost'
self.env[PTVSD_PORT_KEY] = str(self.ptvsd_port)
elif self.start_method == 'attach_pid':
argv += self._get_argv_for_attach_using_pid()
else:
pytest.fail()
argv += self._get_target()
if self.program_args:
argv += list(self.program_args)
if self.multiprocess and 'Multiprocess' not in self.debug_options:
self.debug_options += ['Multiprocess']
if self.use_backchannel:
self.setup_backchannel()
if self.backchannel_port:
self.env['PTVSD_BACKCHANNEL_PORT'] = str(self.backchannel_port)
print('ptvsd: %s' % ptvsd.__file__)
print('Start method: %s' % self.start_method)
print('Target: (%s) %s' % self.target)
print('Current directory: %s' % self.cwd)
print('PYTHONPATH: %s' % self.env['PYTHONPATH'])
print('Spawning %r' % argv)
self.process = subprocess.Popen(argv, env=self.env, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.cwd)
self.pid = self.process.pid
self.psutil_process = psutil.Process(self.pid)
self.is_running = True
watchdog.create(self.pid)
self._capture_output(self.process.stdout, 'OUT')
self._capture_output(self.process.stderr, 'ERR')
if self.start_method != 'launch':
self.connect()
self.connected.wait()
assert self.ptvsd_port
assert self.socket
print('ptvsd#%d has pid=%d' % (self.ptvsd_port, self.pid))
telemetry = self.timeline.wait_for_next(Event('output'))
assert telemetry == Event('output', {
'category': 'telemetry',
'output': 'ptvsd',
'data': {'version': ptvsd.__version__}
})
if self.perform_handshake:
return self.handshake()
def wait_for_disconnect(self, close=True):
"""Waits for the connected ptvsd process to disconnect.
"""
print(colors.LIGHT_MAGENTA + 'Waiting for ptvsd#%d to disconnect' % self.ptvsd_port + colors.RESET)
# self.channel.wait()
self.channel.close()
self.timeline.finalize()
if close:
self.timeline.close()
wait_for_output()
def wait_for_termination(self):
print(colors.LIGHT_MAGENTA + 'Waiting for ptvsd#%d to terminate' % self.ptvsd_port + colors.RESET)
# BUG: ptvsd sometimes exits without sending 'terminate' or 'exited', likely due to
# https://github.com/Microsoft/ptvsd/issues/530. So rather than wait for them, wait until
# we disconnect, then check those events for proper body only if they're actually present.
self.wait_for_disconnect(close=False)
if Event('exited') in self:
self.expect_realized(Event('exited', {'exitCode': self.expected_returncode}))
if Event('terminated') in self:
self.expect_realized(Event('exited') >> Event('terminated', {}))
self.timeline.close()
wait_for_output()
def wait_for_exit(self):
"""Waits for the spawned ptvsd process to exit. If it doesn't exit within
WAIT_FOR_EXIT_TIMEOUT seconds, forcibly kills the process. After the process
exits, validates its return code to match expected_returncode.
"""
if not self.is_running:
return
assert self.psutil_process is not None
def kill():
time.sleep(self.WAIT_FOR_EXIT_TIMEOUT)
if self.is_running:
print('ptvsd#%r (pid=%d) timed out, killing it' % (self.ptvsd_port, self.pid))
self._kill_process_tree()
kill_thread = threading.Thread(target=kill, name='ptvsd#%r watchdog (pid=%d)' % (self.ptvsd_port, self.pid))
kill_thread.daemon = True
kill_thread.start()
print(colors.LIGHT_MAGENTA + 'Waiting for ptvsd#%d (pid=%d) to terminate' % (self.ptvsd_port, self.pid) + colors.RESET)
self.psutil_process.wait()
self.is_running = False
if self.process is not None:
self.process.wait()
assert self.process.returncode == self.expected_returncode
self.wait_for_termination()
def _kill_process_tree(self):
assert self.psutil_process is not None
procs = [self.psutil_process]
try:
procs += self.psutil_process.children(recursive=True)
except:
pass
for p in procs:
try:
p.kill()
except:
pass
def _listen(self):
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.bind(('localhost', 0))
_, self.ptvsd_port = self.server_socket.getsockname()
self.server_socket.listen(0)
def accept_worker():
print('Listening for incoming connection from ptvsd#%d' % self.ptvsd_port)
self.socket, _ = self.server_socket.accept()
print('Incoming ptvsd#%d connection accepted' % self.ptvsd_port)
self._setup_channel()
accept_thread = threading.Thread(target=accept_worker, name='ptvsd#%d listener' % self.ptvsd_port)
accept_thread.daemon = True
accept_thread.start()
def connect(self):
# ptvsd will take some time to spawn and start listening on the port,
# so just hammer at it until it responds (or we time out).
while not self.socket:
try:
self._try_connect()
except socket.error:
pass
time.sleep(0.1)
def _try_connect(self):
print('Trying to connect to ptvsd#%d' % self.ptvsd_port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', self.ptvsd_port))
print('Successfully connected to ptvsd#%d' % self.ptvsd_port)
self.socket = sock
self._setup_channel()
def _setup_channel(self):
self.stream = LoggingJsonStream(JsonIOStream.from_socket(self.socket), 'ptvsd#%d' % self.ptvsd_port)
handlers = MessageHandlers(request=self._process_request, event=self._process_event)
self.channel = JsonMessageChannel(self.stream, handlers)
self.channel.start()
self.connected.set()
def send_request(self, command, arguments=None, proceed=True):
if self.timeline.is_frozen and proceed:
self.proceed()
request = self.timeline.record_request(command, arguments)
request.sent = self.channel.send_request(command, arguments)
request.sent.on_response(lambda response: self._process_response(request, response))
def causing(*expectations):
for exp in expectations:
(request >> exp).wait()
return request
request.causing = causing
return request
def handshake(self):
"""Performs the handshake that establishes the debug session ('initialized'
and 'launch' or 'attach').
After this method returns, ptvsd is not running any code yet, but it is
ready to accept any configuration requests (e.g. for initial breakpoints).
Once initial configuration is complete, start_debugging() should be called
to finalize the configuration stage, and start running code.
"""
self.send_request('initialize', {'adapterID': 'test'}).wait_for_response()
self.wait_for_next(Event('initialized', {}))
request = 'launch' if self.start_method == 'launch' else 'attach'
self.send_request(request, {
'debugOptions': self.debug_options,
'pathMappings': self.path_mappings,
}).wait_for_response()
# Issue 'threads' so that we get the 'thread' event for the main thread now,
# rather than at some random time later during the test.
(self.send_request('threads')
.causing(Event('thread'))
.wait_for_response())
def start_debugging(self, freeze=True):
"""Finalizes the configuration stage, and issues a 'configurationDone' request
to start running code under debugger.
After this method returns, ptvsd is running the code in the script file or module
that was specified via self.target.
"""
configurationDone_request = self.send_request('configurationDone')
# The relative ordering of 'process' and 'configurationDone' is not deterministic.
# (implementation varies depending on whether it's launch or attach, but in any
# case, it is an implementation detail).
start = self.wait_for_next(Event('process') & Response(configurationDone_request))
self.expect_new(Event('process', {
'name': ANY.str,
'isLocalProcess': True,
'startMethod': 'launch' if self.start_method == 'launch' else 'attach',
'systemProcessId': self.pid if self.pid is not None else ANY.int,
}))
if not freeze:
self.proceed()
return start
def _process_event(self, event):
self.timeline.record_event(event.event, event.body, block=False)
def _process_response(self, request_occ, response):
self.timeline.record_response(request_occ, response.body, block=False)
def _process_request(self, request):
assert False, 'ptvsd should not be sending requests.'
def setup_backchannel(self):
self.backchannel_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.backchannel_socket.settimeout(self.BACKCHANNEL_TIMEOUT)
self.backchannel_socket.bind(('localhost', 0))
_, self.backchannel_port = self.backchannel_socket.getsockname()
self.backchannel_socket.listen(0)
backchannel_thread = threading.Thread(target=self._backchannel_worker, name='bchan#%d listener' % self.ptvsd_port)
backchannel_thread.daemon = True
backchannel_thread.start()
def _backchannel_worker(self):
print('Listening for incoming backchannel connection for bchan#%d' % self.ptvsd_port)
sock = None
try:
sock, _ = self.backchannel_socket.accept()
except socket.timeout:
assert sock is not None, 'bchan#%r timed out!' % self.ptvsd_port
print('Incoming bchan#%d backchannel connection accepted' % self.ptvsd_port)
sock.settimeout(None)
self._backchannel_stream = LoggingJsonStream(JsonIOStream.from_socket(sock), 'bchan#%d' % self.ptvsd_port)
self.backchannel_established.set()
@property
def backchannel(self):
assert self.backchannel_port, 'backchannel() must be called after setup_backchannel()'
self.backchannel_established.wait()
return self._backchannel_stream
def read_json(self):
return self.backchannel.read_json()
def write_json(self, value):
self.timeline.unfreeze()
t = self.timeline.mark(('sending', value))
self.backchannel.write_json(value)
return t
def _capture_output(self, pipe, name):
def _output_worker():
while True:
try:
line = pipe.readline()
if not line:
break
self.output_data[name].append(line)
except Exception:
break
else:
prefix = 'ptvsd#%d %s ' % (self.ptvsd_port, name)
line = colors.LIGHT_BLUE + prefix + colors.RESET + line.decode('utf-8')
print(line, end='')
thread = threading.Thread(target=_output_worker, name='ptvsd#%r %s' % (self.ptvsd_port, name))
thread.daemon = True
thread.start()
self._output_capture_threads.append(thread)
def _wait_for_remaining_output(self):
for thread in self._output_capture_threads:
thread.join()
def set_breakpoints(self, path, lines=()):
self.send_request('setBreakpoints', arguments={
'source': {'path': path},
'breakpoints': [{'line': bp_line} for bp_line in lines],
}).wait_for_response()
def wait_for_thread_stopped(self, reason=ANY):
thread_stopped = self.wait_for_next(Event('stopped', ANY.dict_with({'reason': reason})))
tid = thread_stopped.body['threadId']
assert tid is not None
resp_stacktrace = self.send_request('stackTrace', arguments={
'threadId': tid,
}).wait_for_response()
assert resp_stacktrace.body['totalFrames'] > 0
frames = resp_stacktrace.body['stackFrames']
fid = frames[0]['id']
return self.StopInfo(thread_stopped, resp_stacktrace, tid, fid)
def connect_to_child_session(self, ptvsd_subprocess):
child_port = ptvsd_subprocess.body['port']
assert child_port != 0
child_session = DebugSession(start_method='attach_socket_cmdline', ptvsd_port=child_port)
try:
child_session.ignore_unobserved = self.ignore_unobserved
child_session.debug_options = self.debug_options
child_session.connect()
child_session.handshake()
except:
child_session.close()
else:
return child_session
def connect_to_next_child_session(self):
ptvsd_subprocess = self.wait_for_next(Event('ptvsd_subprocess'))
return self.connect_to_child_session(ptvsd_subprocess)
def get_stdout_as_string(self):
return b''.join(self.output_data['OUT'])
def get_stderr_as_string(self):
return b''.join(self.output_data['ERR'])
def connect_with_new_session(self, **kwargs):
new_session = DebugSession(start_method='attach_socket_import', ptvsd_port=self.ptvsd_port)
try:
new_session._setup_session(**kwargs)
new_session.ignore_unobserved = self.ignore_unobserved
new_session.debug_options = self.debug_options
new_session.connect()
new_session.handshake()
except:
new_session.close()
else:
return new_session