Expand the testing helpers.

This commit is contained in:
Eric Snow 2018-02-22 00:46:56 +00:00
parent 608ca082da
commit 29fa1e123f
3 changed files with 187 additions and 48 deletions

View file

@ -148,16 +148,9 @@ class Daemon(object):
self._handlers.append(entry)
return handler
def reset(self, force=False):
def reset(self, *initial, **kwargs):
"""Clear the recorded messages."""
if self._failures:
raise RuntimeError('have failures ({!r})'.format(self._failures))
if self._handlers:
if force:
self._handlers = []
else:
raise RuntimeError('have pending handlers')
self._received = []
self._reset(initial, **kwargs)
# internal methods
@ -242,3 +235,13 @@ class Daemon(object):
#if self._listener.is_alive():
# raise RuntimeError('timed out')
self._listener = None
def _reset(self, initial, force=False):
if self._failures:
raise RuntimeError('have failures ({!r})'.format(self._failures))
if self._handlers:
if force:
self._handlers = []
else:
raise RuntimeError('have pending handlers')
self._received = list(self._protocol.parse_each(initial))

View file

@ -4,6 +4,8 @@ import platform
from _pydevd_bundle.pydevd_comm import (
CMD_VERSION,
CMD_LIST_THREADS,
CMD_THREAD_SUSPEND,
)
from tests.helpers.pydevd import FakePyDevd
@ -49,6 +51,24 @@ class PyDevdMessages(object):
msg = (cmdid, seq, text)
return self.protocol.parse(msg)
def format_threads(self, *threads):
text = '<xml>'
for thread in threads: # (tid, tname)
text += '<thread id="{}" name="{}" />'.format(*thread)
text += '</xml>'
return text
def format_frames(self, thread, reason, *frames):
tid, _ = thread # (tid, tname)
text = '<xml>'
text += '<thread id="{}" stop_reason="{}">'.format(tid, reason)
fmt = '<frame id="{}" name="{}" file="{}" line="{}" />'
for frame in frames: # (fid, func, filename, line)
text += fmt.format(*frame)
text += '</thread>'
text += '</xml>'
return text
class VSCMessages(object):
@ -83,7 +103,7 @@ class VSCMessages(object):
def new_failure(self, req, err, seq=None, **body):
"""Return a new VSC response message."""
return self._new_response(req, err, body)
return self._new_response(req, err, body=body)
def _new_response(self, req, err=None, seq=None, body=None):
if seq is None:
@ -133,11 +153,13 @@ class VSCLifecycle(object):
def launch(self, **kwargs):
"""Initialize the debugger protocol and then launch."""
self._handshake('launch', **kwargs)
with self._fix.hidden():
self._handshake('launch', **kwargs)
def attach(self, **kwargs):
"""Initialize the debugger protocol and then attach."""
self._handshake('attach', **kwargs)
with self._fix.hidden():
self._handshake('attach', **kwargs)
def disconnect(self, **reqargs):
self._send_request('disconnect', reqargs)
@ -156,26 +178,29 @@ class VSCLifecycle(object):
start()
yield
def _handshake(self, command, config=None, reset=True, **kwargs):
def _handshake(self, command, threads=None, config=None,
default_threads=True, reset=True,
**kwargs):
initargs = dict(
kwargs.pop('initargs', None) or {},
disconnect=kwargs.pop('disconnect', True),
)
with self._fix.vsc.wait_for_event('initialized'):
with self._fix.wait_for_event('initialized'):
self._initialize(**initargs)
self._send_request(command, **kwargs)
next(self._fix.vsc_msgs.event_seq)
self._fix.send_request(command, **kwargs)
self._fix.set_threads(*threads or (),
**dict(default_threads=default_threads))
self._handle_config(**config or {})
self._send_request('configurationDone')
next(self._fix.vsc_msgs.event_seq)
with self._fix.wait_for_event('process'):
self._fix.send_request('configurationDone')
next(self._fix.debugger_msgs.request_seq) # CMD_RUN
assert self._fix.vsc.failures == [], self._fix.vsc.failures
assert self._fix.debugger.failures == [], self._fix.debugger.failures
if reset:
self._fix.vsc.reset()
self._fix.debugger.reset()
self._fix.reset()
else:
self._fix.assert_no_failures()
def _initialize(self, **reqargs):
"""
@ -184,8 +209,8 @@ class VSCLifecycle(object):
def handle_response(resp, _):
self._capabilities = resp.data['body']
version = self._fix.debugger.VERSION
self._set_debugger_response(CMD_VERSION, version)
self._send_request(
self._fix.set_debugger_response(CMD_VERSION, version)
self._fix.send_request(
'initialize',
dict(self.MIN_INITIALIZE_ARGS, **reqargs),
handle_response,
@ -193,12 +218,12 @@ class VSCLifecycle(object):
def _handle_config(self, breakpoints=None, excbreakpoints=None):
if breakpoints:
self._send_request(
self._fix.send_request(
'setBreakpoints',
self._parse_breakpoints(breakpoints),
)
if excbreakpoints:
self._send_request(
self._fix.send_request(
'setExceptionBreakpoints',
self._parse_breakpoints(excbreakpoints),
)
@ -207,14 +232,6 @@ class VSCLifecycle(object):
for bp in breakpoints or ():
raise NotImplementedError
def _send_request(self, *args, **kwargs):
self._fix.send_request(*args, **kwargs)
next(self._fix.vsc_msgs.response_seq)
def _set_debugger_response(self, *args, **kwargs):
self._fix.set_debugger_response(*args, **kwargs)
next(self._fix.debugger_msgs.request_seq)
class HighlevelFixture(object):
@ -232,6 +249,8 @@ class HighlevelFixture(object):
self.vsc_msgs = VSCMessages()
self.debugger_msgs = PyDevdMessages()
self._hidden = False
@property
def vsc(self):
try:
@ -256,6 +275,18 @@ class HighlevelFixture(object):
self._lifecycle = VSCLifecycle(self)
return self._lifecycle
@contextlib.contextmanager
def hidden(self):
vsc = self.vsc.received
debugger = self.debugger.received
self._hidden = True
try:
yield
finally:
self._hidden = False
self.vsc.reset(*vsc)
self.debugger.reset(*debugger)
def new_fake(self, debugger=None, handler=None):
"""Return a new fake VSC that may be used in tests."""
if debugger is None:
@ -264,13 +295,87 @@ class HighlevelFixture(object):
return vsc, debugger
def send_request(self, command, args=None, handle_response=None):
req = self.vsc_msgs.new_request(command, **args or {})
with self.vsc.wait_for_response(req, handler=handle_response):
kwargs = dict(args or {}, handler=handle_response)
with self._wait_for_response(command, **kwargs) as req:
self.vsc.send_request(req)
return req
@contextlib.contextmanager
def _wait_for_response(self, command, *args, **kwargs):
handler = kwargs.pop('handler', None)
req = self.vsc_msgs.new_request(command, *args, **kwargs)
with self.vsc.wait_for_response(req, handler=handler):
yield req
if self._hidden:
next(self.vsc_msgs.response_seq)
@contextlib.contextmanager
def wait_for_event(self, event, *args, **kwargs):
with self.vsc.wait_for_event(event, *args, **kwargs):
yield
if self._hidden:
next(self.vsc_msgs.event_seq)
def set_debugger_response(self, cmdid, payload):
self.debugger.add_pending_response(cmdid, payload)
if self._hidden:
next(self.debugger_msgs.request_seq)
def send_debugger_event(self, cmdid, payload):
event = self.debugger_msgs.new_event(cmdid, payload)
self.debugger.send_event(event)
def set_threads(self, *threads, **kwargs):
return self._set_threads(threads, **kwargs)
def _set_threads(self, threads, default_threads=True):
request = {t[1]: t for t in threads}
response = {t: None for t in threads}
if default_threads:
threads = self._add_default_threads(threads)
text = self.debugger_msgs.format_threads(*threads)
self.set_debugger_response(CMD_LIST_THREADS, text)
self.send_request('threads')
for tinfo in self.vsc.received[-1].data['body']['threads']:
try:
thread = request[tinfo['name']]
except KeyError:
continue
response[thread] = tinfo['id']
return response
def _add_default_threads(self, threads):
defaults = {
'MainThread',
'ptvsd.Server',
'pydevd.thread1',
'pydevd.thread2',
}
seen = set()
for thread in threads:
tid, tname = thread
seen.add(tid)
if tname in defaults:
defaults.remove(tname)
ids = (id for id in itertools.count(1) if id not in seen)
allthreads = []
for tname in defaults:
tid = next(ids)
thread = tid, tname
allthreads.append(thread)
allthreads.extend(threads)
return allthreads
def suspend(self, thread, reason, *stack):
with self.wait_for_event('stopped'):
self.send_debugger_event(
CMD_THREAD_SUSPEND,
self.debugger_msgs.format_frames(thread, reason, *stack),
)
#def set_variables(self, ...):
# ...
@contextlib.contextmanager
def disconnect_when_done(self):
@ -278,7 +383,15 @@ class HighlevelFixture(object):
yield
finally:
self.send_request('disconnect')
#self.vsc._received.pop(-1)
def assert_no_failures(self):
assert self.vsc.failures == [], self.vsc.failures
assert self.debugger.failures == [], self.debugger.failures
def reset(self):
self.assert_no_failures()
self.vsc.reset()
self.debugger.reset()
class HighlevelTest(object):
@ -324,6 +437,16 @@ class HighlevelTest(object):
expected = list(self.vsc.protocol.parse_each(expected))
self.assertEqual(received, expected)
def assert_vsc_failure(self, received, expected, req):
received = list(self.vsc.protocol.parse_each(received))
expected = list(self.vsc.protocol.parse_each(expected))
self.assertEqual(received[:-1], expected)
failure = received[-1]
expected = self.vsc.protocol.parse(
self.fix.vsc_msgs.new_failure(req, failure.data['message']))
self.assertEqual(failure, expected)
def assert_received(self, daemon, expected):
"""Ensure that the received messages match the expected ones."""
received = list(daemon.protocol.parse_each(daemon.received))

View file

@ -115,8 +115,8 @@ class InitializeTests(LifecycleTest, unittest.TestCase):
self.new_event(1, 'initialized'),
])
self.assert_received(self.debugger, [
self.debugger_msgs.new_request(CMD_VERSION,
*['1.1', OS_ID, 'ID']),
self.new_debugger_request(CMD_VERSION,
*['1.1', OS_ID, 'ID']),
])
@ -129,8 +129,8 @@ class NormalRequestTest(RunningTest):
PYDEVD_REQ = None
PYDEVD_CMD = None
def launched(self, port=8888):
return super(NormalRequestTest, self).launched(port)
def launched(self, port=8888, **kwargs):
return super(NormalRequestTest, self).launched(port, **kwargs)
def set_debugger_response(self, *args, **kwargs):
if self.PYDEVD_REQ is None:
@ -145,6 +145,7 @@ class NormalRequestTest(RunningTest):
def send_request(self, **args):
self.req = self.fix.send_request(self.COMMAND, args)
return self.req
def expected_response(self, **body):
return self.new_response(
@ -165,14 +166,10 @@ class ThreadsTests(NormalRequestTest, unittest.TestCase):
PYDEVD_REQ = CMD_LIST_THREADS
def pydevd_payload(self, *threads):
text = '<xml>'
for tid, tname in threads:
text += '<thread name="{}" id="{}" />'.format(tname, tid)
text += '</xml>'
return text
return self.debugger_msgs.format_threads(*threads)
def test_basic(self):
with self.launched():
def test_few(self):
with self.launched(default_threads=False):
self.set_debugger_response(
(10, 'spam'),
(11, 'pydevd.eggs'),
@ -195,6 +192,22 @@ class ThreadsTests(NormalRequestTest, unittest.TestCase):
self.expected_pydevd_request(),
])
def test_none(self):
with self.launched(default_threads=False):
self.set_debugger_response()
self.send_request()
received = self.vsc.received
self.assert_vsc_received(received, [
self.expected_response(
threads=[],
),
# no events
])
self.assert_received(self.debugger, [
self.expected_pydevd_request(),
])
# TODO: finish!
@unittest.skip('not finished')