Merge pull request #215 from ericsnowcurrently/tests-using-pydevd

Add some initial tests that each use a live pydevd.
This commit is contained in:
Eric Snow 2018-03-19 16:56:32 -07:00 committed by GitHub
commit b076906e02
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 1083 additions and 322 deletions

View file

@ -4,6 +4,10 @@
import sys
# import the wrapper first, so that it gets a chance
# to detour pydevd socket functionality.
import ptvsd.wrapper
__author__ = "Microsoft Corporation <ptvshelp@microsoft.com>"
__version__ = "4.0.0a2"
@ -13,22 +17,39 @@ DONT_DEBUG = []
def debug(filename, port_num, debug_id, debug_options, run_as):
# TODO: docstring
# import the wrapper first, so that it gets a chance
# to detour pydevd socket functionality.
import ptvsd.wrapper
import pydevd
args = [
'--port', str(port_num),
'--client', '127.0.0.1',
]
address = (None, port_num)
if run_as == 'module':
args.append('--module')
args.extend(('--file', filename + ":"))
_run_module(address, filename)
else:
args.extend(('--file', filename))
sys.argv[1:0] = args
_run_file(address, filename)
def _run_module(address, modname):
filename = modname + ':'
argv = _run_argv(address, filename)
argv.append('--module')
_run(argv)
def _run_file(address, filename):
argv = _run_argv(address, filename)
_run(argv)
def _run_argv(address, filename):
host, port = address
if host is None:
host = '127.0.0.1'
return [
'--port', str(port),
'--client', host,
'--file', filename,
]
def _run(argv):
import pydevd
sys.argv[1:0] = argv
try:
pydevd.main()
except SystemExit as ex:

View file

@ -99,6 +99,9 @@ class SocketIO(object):
self.__socket.send(content)
except BrokenPipeError:
pass
except OSError as exc:
if exc.errno not in (errno.ENOTCONN, errno.EBADF):
raise
def _buffered_read_line_as_ascii(self):
"""Return the next line from the buffer as a string.
@ -239,7 +242,7 @@ class IpcChannel(object):
self.__exit = False
self.__lock = thread.allocate_lock()
self.__message = []
self.__exit_on_unknown_command = True
self._exit_on_unknown_command = True
def close(self):
# TODO: docstring
@ -352,7 +355,9 @@ class IpcChannel(object):
def on_invalid_request(self, request, args):
# TODO: docstring
self.send_response(request, success=False, message='Unknown command')
if self.__exit_on_unknown_command:
if self._exit_on_unknown_command:
# TODO: Shouldn't we let VSC decide how to handle this
# instead of exiting?
self.__exit = True
def _receive_message(self, message):

View file

@ -247,6 +247,18 @@ class PydevdSocket(object):
def shutdown(self, mode):
"""Called when pydevd has stopped."""
def getpeername(self):
"""Return the remote address to which the socket is connected."""
if self._vscprocessor is None:
raise NotImplementedError
return self._vscprocessor.socket.getpeername()
def getsockname(self):
"""Return the sockets own address."""
if self._vscprocessor is None:
raise NotImplementedError
return self._vscprocessor.socket.getsockname()
def recv(self, count):
"""Return the requested number of bytes.
@ -501,12 +513,26 @@ class VSCodeMessageProcessor(ipcjson.SocketIO, ipcjson.IpcChannel):
self.disconnect_request_event = threading.Event()
pydevd._vscprocessor = self
self._closed = False
self._exited = False
self.path_casing = PathUnNormcase()
self.event_loop_thread = threading.Thread(target=self.loop.run_forever,
name='ptvsd.EventLoop')
self.event_loop_thread.daemon = True
self.event_loop_thread.start()
def _handle_exit(self):
if self._exited:
return
self._exited = True
self.send_event('exited', exitCode=ptvsd_sys_exit_code)
self.send_event('terminated')
self.disconnect_request_event.wait(WAIT_FOR_DISCONNECT_REQUEST_TIMEOUT)
if self.disconnect_request is not None:
self.send_response(self.disconnect_request)
self.disconnect_request = None
def close(self):
"""Stop the message processor and release its resources."""
if self._closed:
@ -518,14 +544,7 @@ class VSCodeMessageProcessor(ipcjson.SocketIO, ipcjson.IpcChannel):
pydevd.shutdown(socket.SHUT_RDWR)
pydevd.close()
global ptvsd_sys_exit_code
self.send_event('exited', exitCode=ptvsd_sys_exit_code)
self.send_event('terminated')
self.disconnect_request_event.wait(WAIT_FOR_DISCONNECT_REQUEST_TIMEOUT)
if self.disconnect_request is not None:
self.send_response(self.disconnect_request)
self.disconnect_request = None
self._handle_exit()
self.set_exit()
self.loop.stop()
@ -1293,7 +1312,7 @@ def _new_sock():
return sock
def _start(client, server, killonclose=True):
def _start(client, server, killonclose=True, addhandlers=True):
name = 'ptvsd.Client' if server is None else 'ptvsd.Server'
pydevd = PydevdSocket(lambda *args: proc.on_pydevd_event(*args))
@ -1305,24 +1324,35 @@ def _start(client, server, killonclose=True):
server_thread.daemon = True
server_thread.start()
return pydevd, proc, server_thread
if addhandlers:
_add_atexit_handler(proc, server_thread)
_set_signal_handlers(proc)
return pydevd
def _add_atexit_handler(proc, server_thread):
def handler(proc, server_thread):
proc.close()
if server_thread.is_alive():
server_thread.join(WAIT_FOR_THREAD_FINISH_TIMEOUT)
atexit.register(handler)
def _set_signal_handlers(proc):
if platform.system() == 'Windows':
return None
def handler(signum, frame):
proc.close()
sys.exit(0)
signal.signal(signal.SIGHUP, handler)
########################
# pydevd hooks
def exit_handler(proc, server_thread):
proc.close()
if server_thread.is_alive():
server_thread.join(WAIT_FOR_THREAD_FINISH_TIMEOUT)
def signal_handler(signum, frame, proc):
proc.close()
sys.exit(0)
def start_server(port):
def start_server(port, addhandlers=True):
"""Return a socket to a (new) local pydevd-handling daemon.
The daemon supports the pydevd client wire protocol, sending
@ -1332,17 +1362,11 @@ def start_server(port):
"""
server = _create_server(port)
client, _ = server.accept()
pydevd, proc, server_thread = _start(client, server)
atexit.register(lambda: exit_handler(proc, server_thread))
if platform.system() != 'Windows':
signal.signal(
signal.SIGHUP,
(lambda signum, frame: signal_handler(signum, frame, proc)),
)
pydevd = _start(client, server)
return pydevd
def start_client(host, port):
def start_client(host, port, addhandlers=True):
"""Return a socket to an existing "remote" pydevd-handling daemon.
The daemon supports the pydevd client wire protocol, sending
@ -1352,13 +1376,7 @@ def start_client(host, port):
"""
client = _create_client()
client.connect((host, port))
pydevd, proc, server_thread = _start(client, None)
atexit.register(lambda: exit_handler(proc, server_thread))
if platform.system() != 'Windows':
signal.signal(
signal.SIGHUP,
(lambda signum, frame: signal_handler(signum, frame, proc)),
)
pydevd = _start(client, None)
return pydevd

View file

@ -77,11 +77,11 @@ class MessageCounters(namedtuple('MessageCounters',
return next(self.event)
class Started(object):
"""A simple wrapper around a started message protocol daemon."""
class DaemonStarted(object):
"""A simple wrapper around a started protocol daemon."""
def __init__(self, fake, address, starting=None):
self.fake = fake
def __init__(self, daemon, address, starting=None):
self.daemon = daemon
self.address = address
self._starting = starting
@ -101,19 +101,77 @@ class Started(object):
raise RuntimeError('timed out')
self._starting = None
def send_message(self, msg):
self.wait_until_connected()
return self.fake.send_message(msg)
def close(self):
self.wait_until_connected()
self.fake.close()
self.daemon.close()
class Daemon(object):
STARTED = DaemonStarted
def __init__(self, bind):
self._bind = bind
self._closed = False
# These are set when we start.
self._address = None
self._sock = None
def start(self, address):
"""Start the fake daemon.
This calls the earlier provided bind() function.
A listener loop is started in another thread to handle incoming
messages from the socket.
"""
self._address = address
addr, starting = self._start(address)
return self.STARTED(self, addr, starting)
def close(self):
"""Clean up the daemon's resources (e.g. sockets, files, listener)."""
if self._closed:
return
self._closed = True
self._close()
# internal methods
def _start(self, address):
connect, addr = self._bind(address)
def run():
self._sock = connect()
self._handle_connected()
t = threading.Thread(target=run)
t.start()
return addr, t
def _handle_connected(self):
pass
def _close(self):
if self._sock is not None:
socket.close(self._sock)
self._sock = None
class MessageDaemonStarted(DaemonStarted):
"""A simple wrapper around a started message protocol daemon."""
def send_message(self, msg):
self.wait_until_connected()
return self.daemon.send_message(msg)
class MessageDaemon(Daemon):
"""A testing double for a protocol daemon."""
STARTED = Started
STARTED = MessageDaemonStarted
@classmethod
def validate_message(cls, msg):
@ -121,10 +179,10 @@ class Daemon(object):
# By default check nothing.
def __init__(self, bind, protocol, handler):
self._bind = bind
super(MessageDaemon, self).__init__(bind)
self._protocol = protocol
self._closed = False
self._received = []
self._failures = []
@ -132,8 +190,6 @@ class Daemon(object):
self._default_handler = handler
# These are set when we start.
self._address = None
self._sock = None
self._listener = None
@property
@ -152,18 +208,6 @@ class Daemon(object):
"""All send/recv failures thus far."""
return list(self._failures)
def start(self, address):
"""Start the fake daemon.
This calls the earlier provided bind() function.
A listener loop is started in another thread to handle incoming
messages from the socket.
"""
self._address = address
addr, starting = self._start(address)
return self.STARTED(self, addr, starting)
def send_message(self, msg):
"""Serialize msg to the line format and send it to the socket."""
if self._closed:
@ -171,14 +215,6 @@ class Daemon(object):
self._validate_message(msg)
self._send_message(msg)
def close(self):
"""Clean up the daemon's resources (e.g. sockets, files, listener)."""
if self._closed:
return
self._closed = True
self._close()
def add_handler(self, handler, handlername=None, oneoff=True):
"""Add the given handler to the list of possible handlers."""
entry = (
@ -195,17 +231,10 @@ class Daemon(object):
# internal methods
def _start(self, address):
connect, addr = self._bind(address)
def run():
self._sock = connect()
# TODO: make it a daemon thread?
self._listener = threading.Thread(target=self._listen)
self._listener.start()
t = threading.Thread(target=run)
t.start()
return addr, t
def _handle_connected(self):
# TODO: make it a daemon thread?
self._listener = threading.Thread(target=self._listen)
self._listener.start()
def _listen(self):
try:
@ -267,9 +296,7 @@ class Daemon(object):
raw = raw[sent:]
def _close(self):
if self._sock is not None:
socket.close(self._sock)
self._sock = None
super(MessageDaemon, self)._close()
if self._listener is not None:
self._listener.join(timeout=1)
# TODO: the listener isn't stopping!

View file

@ -19,12 +19,15 @@ def _bind(address):
def connect(_connect=connect):
client, server = _connect()
pydevd, _, _ = _ptvsd._start(client, server, killonclose=False)
pydevd = _ptvsd._start(client, server,
killonclose=False,
addhandlers=False)
pydevd._vscprocessor._exit_on_unknown_command = False
return socket.Connection(pydevd, server)
return connect, remote
class Started(protocol.Started):
class Started(protocol.MessageDaemonStarted):
def send_response(self, msg):
self.wait_until_connected()
@ -35,7 +38,7 @@ class Started(protocol.Started):
return self.fake.send_event(msg)
class FakePyDevd(protocol.Daemon):
class FakePyDevd(protocol.MessageDaemon):
"""A testing double for PyDevd.
Note that you have the option to provide a handler function. This

View file

@ -0,0 +1,108 @@
import os
import os.path
import sys
import threading
import _pydevd_bundle.pydevd_comm as pydevd_comm
from ptvsd import wrapper, debugger
from tests.helpers import protocol, socket
class Binder(object):
def __init__(self, filename, module):
self.filename = filename
self.module = module
self.address = None
self._waiter = None
self._connect = None
self._thread = None
self.client = None
self.server = None
self.fakesock = None
self.proc = None
def bind(self, address):
if self._connect is not None:
raise RuntimeError('already bound')
self.address = address
self._connect, remote = socket.bind(address)
self._waiter = threading.Lock()
self._waiter.acquire()
def connect():
self._thread = threading.Thread(target=self.run_pydevd)
self._thread.start()
if self._waiter.acquire(timeout=1):
self._waiter.release()
else:
raise RuntimeError('timed out')
return socket.Connection(self.client, self.server)
#return socket.Connection(self.fakesock, self.server)
return connect, remote
def new_pydevd_sock(self, *args):
if self.client is not None:
raise RuntimeError('already connected')
self.client, self.server = self._connect()
self.fakesock = wrapper._start(self.client, self.server,
killonclose=False,
addhandlers=False)
self.proc = self.fakesock._vscprocessor
self._waiter.release()
return self.fakesock
def run_pydevd(self):
pydevd_comm.start_server = self.new_pydevd_sock
pydevd_comm.start_client = self.new_pydevd_sock
# Force a fresh pydevd.
sys.modules.pop('pydevd', None)
try:
if self.module is None:
debugger._run_file(self.address, self.filename)
else:
debugger._run_module(self.address, self.module)
except SystemExit as exc:
wrapper.ptvsd_sys_exit_code = int(exc.code)
raise
wrapper.ptvsd_sys_exit_code = 0
self.proc.close()
def wait_until_done(self):
if self._thread is None:
return
self._thread.join()
class LivePyDevd(protocol.Daemon):
@classmethod
def parse_source(cls, source):
kind, sep, name = source.partition(':')
if kind == 'file':
return name, None, False
elif kind == 'module':
parts = (name).split('.')
filename = os.path.join(*parts) + '.py'
return filename, name, False
else:
# TODO: Write source code to temp module?
raise NotImplementedError
def __init__(self, source):
filename, module, owned = self.parse_source(source)
self._filename = filename
self._owned = owned
self.binder = Binder(filename, module)
super(LivePyDevd, self).__init__(self.binder.bind)
def _close(self):
super(LivePyDevd, self)._close()
# TODO: Close pydevd somehow?
if self._owned:
os.unlink(self._filename)

View file

@ -91,7 +91,7 @@ class Connection(namedtuple('Connection', 'client server')):
try:
self.client.shutdown(*args, **kwargs)
except OSError as exc:
if exc.errno != errno.ENOTCONN:
if exc.errno not in (errno.ENOTCONN, errno.EBADF):
raise
def close(self):

View file

@ -1,5 +1,6 @@
import contextlib
import threading
import warnings
from tests.helpers import protocol, socket
from ._vsc import encode_message, iter_messages, parse_message
@ -21,14 +22,14 @@ def _bind(address):
return connect, remote
class Started(protocol.Started):
class Started(protocol.MessageDaemonStarted):
def send_request(self, msg):
self.wait_until_connected()
return self.fake.send_request(msg)
class FakeVSC(protocol.Daemon):
class FakeVSC(protocol.MessageDaemon):
"""A testing double for a VSC debugger protocol client.
This class facilitates sending VSC debugger protocol messages over
@ -148,15 +149,19 @@ class FakeVSC(protocol.Daemon):
lock.acquire()
def handle_message(msg, send_message):
if match(msg):
lock.release()
if handler is not None:
handler(msg, send_message)
else:
if not match(msg):
return False
lock.release()
if handler is not None:
handler(msg, send_message)
return True
self.add_handler(handle_message, handlername)
yield req
lock.acquire(timeout=timeout) # Wait for the message to match.
lock.release()
# Wait for the message to match.
if lock.acquire(timeout=timeout):
lock.release()
else:
msg = 'timed out after {} seconds waiting for message ({})'
warnings.warn(msg.format(timeout, handlername))

105
tests/helpers/workspace.py Normal file
View file

@ -0,0 +1,105 @@
from textwrap import dedent
import os
import os.path
import shutil
import sys
import tempfile
class Workspace(object):
PREFIX = 'workspace-'
@classmethod
def _new_root(cls):
return tempfile.mkdtemp(prefix=cls.PREFIX)
def __init__(self, root=None):
if root is not None:
self._root = root
self._owned = False
@property
def root(self):
try:
return self._root
except AttributeError:
self._root = self._new_root()
self._owned = True
return self._root
def cleanup(self):
if self._owned:
shutil.rmtree(self._root)
self._owned = False
self._root = None
def resolve(self, *path):
return os.path.join(self.root, *path)
def write(self, *path, **kwargs):
return self._write(path, **kwargs)
def _write(self, path, content='', fixup=True):
if fixup:
content = dedent(content)
filename = self.resolve(*path)
with open(filename, 'w') as outfile:
outfile.write(content)
return filename
class PathEntry(Workspace):
def __init__(self, root=None):
super(PathEntry, self).__init__(root)
self._syspath = None
def cleanup(self):
self.uninstall()
super(PathEntry, self).cleanup()
def install(self):
if self._syspath is not None:
return
if sys.path[0] in ('', '.'):
self._syspath = 1
else:
self._syspath = 0
sys.path.insert(self._syspath, self.root)
def uninstall(self):
if self._syspath is None:
return
del sys.path[self._syspath]
self._syspath = None
def resolve_module(self, name):
parts = (name + '.py').split('.')
return self.resolve(*parts)
def write_module(self, name, content=''):
parent, sep, name = name.rpartition('.')
filename = name + '.py'
if sep:
dirname = self._ensure_package(parent)
filename = os.path.join(dirname, filename)
return self.write(filename, content=content)
def _ensure_package(self, name, root=None):
parent, sep, name = name.rpartition('.')
if sep:
dirname = self._ensure_package(parent, root)
else:
if root is None:
root = self.root
dirname = root
dirname = os.path.join(dirname, name)
initpy = os.path.join(dirname, '__init__.py')
if not os.path.exists(initpy):
os.mkdirs(dirname)
with open(initpy, 'w'):
pass
return dirname

View file

@ -1,3 +1,4 @@
from collections import namedtuple
import contextlib
import itertools
import platform
@ -20,6 +21,7 @@ from _pydevd_bundle.pydevd_comm import (
CMD_GET_VARIABLE,
)
from ptvsd import wrapper
from tests.helpers.protocol import MessageCounters
from tests.helpers.pydevd import FakePyDevd
from tests.helpers.vsc import FakeVSC
@ -28,6 +30,34 @@ from tests.helpers.vsc import FakeVSC
OS_ID = 'WINDOWS' if platform.system() == 'Windows' else 'UNIX'
class Thread(namedtuple('Thread', 'id name')):
"""Information about a thread."""
PREFIX = 'Thread-'
@classmethod
def from_raw(cls, raw):
"""Return a Thread corresponding to the given value."""
if isinstance(raw, cls):
return raw
elif isinstance(raw, str):
return cls(None, raw)
elif isinstance(raw, int):
return cls(raw)
else:
return cls(*raw)
def __new__(cls, id, name=None):
id = int(id) if id or id == 0 else None
name = str(name) if name else cls.PREFIX + str(id)
self = super(Thread, cls).__new__(cls, id, name)
return self
def __init__(self, *args, **kwargs):
if self.id is None:
raise TypeError('missing id')
class PyDevdMessages(object):
protocol = FakePyDevd.PROTOCOL
@ -181,6 +211,25 @@ class VSCMessages(object):
}
class PyDevdLifecycle(object):
def __init__(self, fix):
self._fix = fix
@contextlib.contextmanager
def _wait_for_initialized(self):
with self._fix.expect_command(CMD_REDIRECT_OUTPUT):
with self._fix.expect_command(CMD_RUN):
yield
def _initialize(self):
version = self._fix.fake.VERSION
self._fix.set_response(CMD_VERSION, version)
def notify_main_thread(self):
self._fix.notify_main_thread()
class VSCLifecycle(object):
PORT = 8888
@ -189,27 +238,29 @@ class VSCLifecycle(object):
'adapterID': '<an adapter ID>',
}
def __init__(self, fix):
def __init__(self, fix, pydevd=None, hidden=None):
self._fix = fix
self._pydevd = pydevd
self._hidden = hidden or fix.hidden
def launched(self, port=None, **kwargs):
def launched(self, port=None, hidedisconnect=False, **kwargs):
def start():
self.launch(**kwargs)
return self._started(start, port)
return self._started(start, port, hidedisconnect=hidedisconnect)
def attached(self, port=None, **kwargs):
def attached(self, port=None, hidedisconnect=False, **kwargs):
def start():
self.attach(**kwargs)
return self._started(start, port)
return self._started(start, port, hidedisconnect=hidedisconnect)
def launch(self, **kwargs):
"""Initialize the debugger protocol and then launch."""
with self._fix.hidden():
with self._hidden():
self._handshake('launch', **kwargs)
def attach(self, **kwargs):
"""Initialize the debugger protocol and then attach."""
with self._fix.hidden():
with self._hidden():
self._handshake('attach', **kwargs)
def disconnect(self, **reqargs):
@ -220,12 +271,12 @@ class VSCLifecycle(object):
# internal methods
@contextlib.contextmanager
def _started(self, start, port):
def _started(self, start, port, hidedisconnect=False):
if port is None:
port = self.PORT
addr = (None, port)
with self._fix.vsc.start(addr):
with self._fix.disconnect_when_done():
with self._fix.fake.start(addr):
with self._fix.disconnect_when_done(hide=hidedisconnect):
start()
yield
@ -245,32 +296,36 @@ class VSCLifecycle(object):
**dict(default_threads=default_threads))
self._handle_config(**config or {})
with self._fix.expect_debugger_command(CMD_REDIRECT_OUTPUT):
with self._fix.expect_debugger_command(CMD_RUN):
self._fix.send_request('configurationDone')
with self._wait_for_debugger_init():
self._fix.send_request('configurationDone')
if process:
main = (1, 'MainThead')
with self._fix.wait_for_event('process'):
with self._fix.wait_for_event('thread'):
self._fix.send_event(
CMD_THREAD_CREATE,
self._fix.debugger_msgs.format_threads(main),
)
if self._pydevd:
self._pydevd.notify_main_thread()
if reset:
self._fix.reset()
else:
self._fix.assert_no_failures()
@contextlib.contextmanager
def _wait_for_debugger_init(self):
if self._pydevd:
with self._pydevd._wait_for_initialized():
yield
else:
yield
def _initialize(self, **reqargs):
"""
See https://code.visualstudio.com/docs/extensionAPI/api-debugging#_the-vs-code-debug-protocol-in-a-nutshell
""" # noqa
def handle_response(resp, _):
self._capabilities = resp.body
version = self._fix.debugger.VERSION
self._fix.set_debugger_response(CMD_VERSION, version)
if self._pydevd:
self._pydevd._initialize()
self._fix.send_request(
'initialize',
dict(self.MIN_INITIALIZE_ARGS, **reqargs),
@ -294,48 +349,24 @@ class VSCLifecycle(object):
raise NotImplementedError
class HighlevelFixture(object):
class FixtureBase(object):
"""Base class for protocol daemon test fixtures."""
DAEMON = FakeVSC
DEBUGGER = FakePyDevd
def __init__(self, new_daemon=None, new_debugger=None):
if new_daemon is None:
new_daemon = self.DAEMON
if new_debugger is None:
new_debugger = self.DEBUGGER
self._new_daemon = new_daemon
self._new_debugger = new_debugger
self.vsc_msgs = VSCMessages()
self.debugger_msgs = PyDevdMessages()
def __init__(self, new_fake, new_msgs):
if not callable(new_fake):
raise ValueError('bad new_fake {!r}'.format(new_fake))
self._new_fake = new_fake
self.msgs = new_msgs()
self._hidden = False
self._default_threads = None
@property
def vsc(self):
def fake(self):
try:
return self._vsc
return self._fake
except AttributeError:
self._vsc, self._debugger = self.new_fake()
return self._vsc
@property
def debugger(self):
try:
return self._debugger
except AttributeError:
self._vsc, self._debugger = self.new_fake()
return self._debugger
@property
def lifecycle(self):
try:
return self._lifecycle
except AttributeError:
self._lifecycle = VSCLifecycle(self)
return self._lifecycle
self._fake = self.new_fake()
return self._fake
@property
def ishidden(self):
@ -343,113 +374,74 @@ class HighlevelFixture(object):
@contextlib.contextmanager
def hidden(self):
vsc = self.vsc.received
debugger = self.debugger.received
received = self.fake.received
orig = self._hidden
self._hidden = True
try:
yield
finally:
self._hidden = orig
self.vsc.reset(*vsc)
self.debugger.reset(*debugger)
self.fake.reset(*received)
def new_fake(self, debugger=None, handler=None):
"""Return a new fake VSC that may be used in tests."""
if debugger is None:
debugger = self._new_debugger()
vsc = self._new_daemon(debugger.start, handler)
return vsc, debugger
def set_fake(self, fake):
if hasattr(self, '_fake'):
raise AttributeError('fake already set')
self._fake = fake
def send_request(self, command, args=None, handle_response=None):
kwargs = dict(args or {}, handler=handle_response)
with self._wait_for_response(command, **kwargs) as req:
self.vsc.send_request(req)
return req
def new_fake(self, handler=None, **kwargs):
"""Return a new fake that may be used in tests."""
return self._new_fake(handler=handler, **kwargs)
def assert_no_failures(self):
assert self.fake.failures == [], self.fake.failures
def reset(self):
self.assert_no_failures()
self.fake.reset()
class PyDevdFixture(FixtureBase):
"""A test fixture for the PyDevd protocol."""
FAKE = FakePyDevd
MSGS = PyDevdMessages
def __init__(self, new_fake=None):
if new_fake is None:
new_fake = self.FAKE
super(PyDevdFixture, self).__init__(new_fake, self.MSGS)
self._default_threads = None
def notify_main_thread(self):
main = (1, 'MainThead')
self.send_event(
CMD_THREAD_CREATE,
self.msgs.format_threads(main),
)
@contextlib.contextmanager
def _wait_for_response(self, command, *args, **kwargs):
handler = kwargs.pop('handler', None)
req = self.vsc_msgs.new_request(command, *args, **kwargs)
with self.vsc.wait_for_response(req, handler=handler):
yield req
if self._hidden:
self.vsc_msgs.next_response()
@contextlib.contextmanager
def wait_for_event(self, event, *args, **kwargs):
with self.vsc.wait_for_event(event, *args, **kwargs):
yield
if self._hidden:
self.vsc_msgs.next_event()
@contextlib.contextmanager
def _wait_for_events(self, events):
if not events:
yield
return
with self._wait_for_events(events[1:]):
with self.wait_for_event(events[0]):
yield
@contextlib.contextmanager
def expect_debugger_command(self, cmdid):
def expect_command(self, cmdid):
yield
if self._hidden:
self.debugger_msgs.next_request()
self.msgs.next_request()
def set_debugger_response(self, cmdid, payload, **kwargs):
self.debugger.add_pending_response(cmdid, payload, **kwargs)
def set_response(self, cmdid, payload, **kwargs):
self.fake.add_pending_response(cmdid, payload, **kwargs)
if self._hidden:
self.debugger_msgs.next_request()
self.msgs.next_request()
def send_debugger_event(self, cmdid, payload):
event = self.debugger_msgs.new_event(cmdid, payload)
self.debugger.send_event(event)
def send_event(self, cmdid, payload):
event = self.msgs.new_event(cmdid, payload)
self.fake.send_event(event)
def send_event(self, cmdid, text, event=None, handler=None):
if event is not None:
with self.wait_for_event(event, handler=handler):
self.send_debugger_event(cmdid, text)
else:
self.send_debugger_event(cmdid, text)
return None
def set_threads(self, _thread, *threads, **kwargs):
threads = (_thread,) + threads
return self._set_threads(threads, **kwargs)
def set_thread(self, thread):
threads = (thread,)
return self._set_threads(threads)[thread]
def _set_threads(self, threads, default_threads=True):
request = {t[1]: t for t in threads}
response = {t: None for t in threads}
def set_threads_response(self, threads, default_threads=True):
threads = [Thread.from_raw(t) for t in threads]
if default_threads:
threads = self._add_default_threads(threads)
active = [name
for _, name in threads
if not name.startswith(('ptvsd.', 'pydevd.'))]
text = self.debugger_msgs.format_threads(*threads)
self.set_debugger_response(CMD_RETURN, text, reqid=CMD_LIST_THREADS)
with self._wait_for_events(['thread' for _ in active]):
self.send_request('threads')
for msg in reversed(self.vsc.received):
if msg.type == 'response':
if msg.command == 'threads':
break
else:
assert False, 'we waited for the response in send_request()'
for tinfo in msg.body['threads']:
try:
thread = request[tinfo['name']]
except KeyError:
continue
response[thread] = tinfo['id']
return response
text = self.msgs.format_threads(*threads)
self.set_response(CMD_RETURN, text, reqid=CMD_LIST_THREADS)
return threads
def _add_default_threads(self, threads):
if self._default_threads is not None:
@ -470,90 +462,356 @@ class HighlevelFixture(object):
allthreads = []
for tname in defaults:
tid = next(ids)
thread = tid, tname
thread = Thread(tid, tname)
allthreads.append(thread)
self._default_threads = list(allthreads)
allthreads.extend(threads)
return allthreads
def send_suspend_event(self, thread, reason, *stack):
thread = Thread.from_raw(thread)
self._suspend(thread, reason, stack)
def send_pause_event(self, thread, *stack):
thread = Thread.from_raw(thread)
reason = CMD_THREAD_SUSPEND
self._suspend(thread, reason, stack)
def _suspend(self, thread, reason, stack):
self.send_event(
CMD_THREAD_SUSPEND,
self.msgs.format_frames(thread.id, reason, *stack),
)
def send_caught_exception_events(self, thread, exc, *stack):
thread = Thread.from_raw(thread)
reason = CMD_STEP_CAUGHT_EXCEPTION
self._exception(thread, exc, reason, stack)
def _exception(self, thread, exc, reason, stack):
self.send_event(
CMD_SEND_CURR_EXCEPTION_TRACE,
self.msgs.format_exception(thread.id, exc, *stack),
)
self.send_suspend_event(thread, reason, *stack)
#self.set_exception_var_response(exc)
def set_exception_var_response(self, exc):
self.set_response(
CMD_GET_VARIABLE,
self.msgs.format_variables(
('???', '???'),
('???', exc),
),
)
class VSCFixture(FixtureBase):
"""A test fixture for the DAP."""
FAKE = FakeVSC
MSGS = VSCMessages
LIFECYCLE = VSCLifecycle
START_ADAPTER = None
def __init__(self, new_fake=None, start_adapter=None):
if new_fake is None:
new_fake = self.FAKE
if start_adapter is None:
start_adapter = self.START_ADAPTER
elif not callable(start_adapter):
raise ValueError('bad start_adapter {!r}'.format(start_adapter))
def new_fake(start_adapter=start_adapter, handler=None,
_new_fake=new_fake):
return _new_fake(start_adapter, handler=handler)
super(VSCFixture, self).__init__(new_fake, self.MSGS)
@property
def vsc(self):
return self.fake
@property
def vsc_msgs(self):
return self.msgs
@property
def lifecycle(self):
try:
return self._lifecycle
except AttributeError:
self._lifecycle = self.LIFECYCLE(self)
return self._lifecycle
def send_request(self, command, args=None, handle_response=None):
kwargs = dict(args or {}, handler=handle_response)
with self._wait_for_response(command, **kwargs) as req:
self.fake.send_request(req)
return req
@contextlib.contextmanager
def _wait_for_response(self, command, *args, **kwargs):
handler = kwargs.pop('handler', None)
req = self.msgs.new_request(command, *args, **kwargs)
with self.fake.wait_for_response(req, handler=handler):
yield req
if self._hidden:
self.msgs.next_response()
@contextlib.contextmanager
def wait_for_event(self, event, *args, **kwargs):
with self.fake.wait_for_event(event, *args, **kwargs):
yield
if self._hidden:
self.msgs.next_event()
@contextlib.contextmanager
def _wait_for_events(self, events):
if not events:
yield
return
with self._wait_for_events(events[1:]):
with self.wait_for_event(events[0]):
yield
@contextlib.contextmanager
def disconnect_when_done(self, hide=True):
try:
yield
finally:
if hide:
with self.hidden():
self._disconnect()
else:
self._disconnect()
def _disconnect(self):
with self.exits_after(exitcode=0):
self.send_request('disconnect')
@contextlib.contextmanager
def exits_after(self, exitcode):
wrapper.ptvsd_sys_exit_code = exitcode
with self._wait_for_events(['exited', 'terminated']):
yield
class HighlevelFixture(object):
DAEMON = FakeVSC
DEBUGGER = FakePyDevd
def __init__(self, vsc=None, pydevd=None):
if vsc is None:
self._new_vsc = self.DAEMON
vsc = VSCFixture(new_fake=self._new_fake_vsc)
elif callable(vsc):
self._new_vsc = vsc
vsc = VSCFixture(new_fake=self._new_fake_vsc)
else:
self._new_vsc = None
self._vsc = vsc
if pydevd is None:
pydevd = PyDevdFixture(self.DEBUGGER)
elif callable(pydevd):
pydevd = PyDevdFixture(pydevd)
self._pydevd = pydevd
def highlevel_lifecycle(fix, _cls=vsc.LIFECYCLE):
pydevd = PyDevdLifecycle(self._pydevd)
return _cls(fix, pydevd, self.hidden)
vsc.LIFECYCLE = highlevel_lifecycle
def _new_fake_vsc(self, start_adapter=None, handler=None):
if start_adapter is None:
try:
self._default_fake_vsc
except AttributeError:
pass
else:
raise RuntimeError('default fake VSC already created')
start_adapter = self.debugger.start
return self._new_vsc(start_adapter, handler)
@property
def vsc(self):
return self._vsc.fake
@property
def vsc_msgs(self):
return self._vsc.msgs
@property
def debugger(self):
return self._pydevd.fake
@property
def debugger_msgs(self):
return self._pydevd.msgs
@property
def lifecycle(self):
return self._vsc.lifecycle
@property
def ishidden(self):
return self._vsc.ishidden and self._pydevd.ishidden
@contextlib.contextmanager
def hidden(self):
with self._vsc.hidden():
with self._pydevd.hidden():
yield
def new_fake(self, debugger=None, handler=None):
"""Return a new fake VSC that may be used in tests."""
if debugger is None:
debugger = self._pydevd.new_fake()
vsc = self._vsc.new_fake(debugger.start, handler)
return vsc, debugger
def assert_no_failures(self):
self._vsc.assert_no_failures()
self._pydevd.assert_no_failures()
def reset(self):
self._vsc.reset()
self._debugger.reset()
# wrappers
def send_request(self, command, args=None, handle_response=None):
return self._vsc.send_request(command, args, handle_response)
@contextlib.contextmanager
def wait_for_event(self, event, *args, **kwargs):
with self._vsc.wait_for_event(event, *args, **kwargs):
yield
@contextlib.contextmanager
def expect_debugger_command(self, cmdid):
with self._pydevd.expected_command(cmdid):
yield
def set_debugger_response(self, cmdid, payload, **kwargs):
self._pydevd.set_response(cmdid, payload, **kwargs)
def send_debugger_event(self, cmdid, payload):
self._pydevd.send_event(cmdid, payload)
@contextlib.contextmanager
def disconnect_when_done(self):
with self._vsc.disconnect_when_done():
yield
# combinations
def send_event(self, cmdid, text, event=None, handler=None):
if event is not None:
with self.wait_for_event(event, handler=handler):
self.send_debugger_event(cmdid, text)
else:
self.send_debugger_event(cmdid, text)
return None
def set_threads(self, _thread, *threads, **kwargs):
first = Thread.from_raw(_thread)
threads = [first] + [Thread.from_raw(t) for t in threads]
return self._set_threads(threads, **kwargs)
def set_thread(self, thread):
thread = Thread.from_raw(thread)
threads = (thread,)
return self._set_threads(threads)[thread]
def _set_threads(self, threads, default_threads=True):
# Set up and send messages.
allthreads = self._pydevd.set_threads_response(
threads,
default_threads=default_threads,
)
ignored = ('ptvsd.', 'pydevd.')
supported = [t for t in allthreads if not t.name.startswith(ignored)]
with self._vsc._wait_for_events(['thread' for _ in supported]):
self.send_request('threads')
# Extract thread info from the response.
request = {t.name: t for t in threads}
response = {t: None for t in threads}
for msg in reversed(self.vsc.received):
if msg.type == 'response':
if msg.command == 'threads':
break
else:
assert False, 'we waited for the response in send_request()'
for tinfo in msg.body['threads']:
try:
thread = request[tinfo['name']]
except KeyError:
continue
response[thread] = tinfo['id']
return response
def suspend(self, thread, reason, *stack):
ptid, _ = thread
with self.wait_for_event('stopped'):
if isinstance(reason, Exception):
exc = reason
reason = CMD_STEP_CAUGHT_EXCEPTION
self.set_debugger_response(
CMD_GET_VARIABLE,
self.debugger_msgs.format_variables(
('???', '???'),
('???', exc),
),
)
self.send_debugger_event(
CMD_THREAD_SUSPEND,
self.debugger_msgs.format_frames(ptid, reason, *stack),
)
self._pydevd.send_caught_exception_events(thread, exc, *stack)
self._pydevd.set_exception_var_response(exc)
else:
self._pydevd.send_suspend_event(thread, reason, *stack)
def pause(self, thread, *stack):
thread = Thread.from_raw(thread)
tid = self.set_thread(thread)
self.suspend(thread, CMD_THREAD_SUSPEND, *stack)
self._pydevd.send_pause_event(thread, *stack)
if self._vsc._hidden:
self._vsc.msgs.next_event()
self.send_request('stackTrace', {'threadId': tid})
self.send_request('scopes', {'frameId': 1})
return tid
def error(self, thread, exc, frame):
thread = Thread.from_raw(thread)
tid = self.set_thread(thread)
self.send_debugger_event(
CMD_SEND_CURR_EXCEPTION_TRACE,
self.debugger_msgs.format_exception(thread[0], exc, frame),
)
self.suspend(thread, exc, frame)
return tid
#def set_variables(self, ...):
# ...
@contextlib.contextmanager
def disconnect_when_done(self):
try:
yield
finally:
self.send_request('disconnect')
class VSCTest(object):
"""The base mixin class for high-level VSC-only ptvsd tests."""
def assert_no_failures(self):
assert self.vsc.failures == [], self.vsc.failures
assert self.debugger.failures == [], self.debugger.failures
FIXTURE = VSCFixture
def reset(self):
self.assert_no_failures()
self.vsc.reset()
self.debugger.reset()
_ready = False
_fix = None # overridden in setUp()
class HighlevelTest(object):
"""The base mixin class for high-level ptvsd tests."""
FIXTURE = HighlevelFixture
fix = None # overridden in setUp()
@classmethod
def _new_daemon(cls, *args, **kwargs):
return cls.FIXTURE.FAKE(*args, **kwargs)
def setUp(self):
super(HighlevelTest, self).setUp()
def new_daemon(*args, **kwargs):
vsc = self.FIXTURE.DAEMON(*args, **kwargs)
self.addCleanup(vsc.close)
return vsc
self.fix = self.FIXTURE(new_daemon)
super(VSCTest, self).setUp()
self._ready = True
self.maxDiff = None
def __getattr__(self, name):
if not self._ready:
raise AttributeError
return getattr(self.fix, name)
@property
def pydevd(self):
return self.debugger
def fix(self):
if self._fix is None:
def new_daemon(*args, **kwargs):
vsc = self._new_daemon(*args, **kwargs)
self.addCleanup(vsc.close)
return vsc
self._fix = self._new_fixture(new_daemon)
return self._fix
@property
def new_response(self):
@ -567,10 +825,8 @@ class HighlevelTest(object):
def new_event(self):
return self.fix.vsc_msgs.new_event
def new_fake(self, debugger=None, handler=None):
"""Return a new fake VSC that may be used in tests."""
vsc, debugger = self.fix.new_fake(debugger, handler)
return vsc, debugger
def _new_fixture(self, new_daemon):
return self.FIXTURE(new_daemon)
def assert_vsc_received(self, received, expected):
received = list(self.vsc.protocol.parse_each(received))
@ -594,6 +850,25 @@ class HighlevelTest(object):
self.assertEqual(received, expected)
class HighlevelTest(VSCTest):
"""The base mixin class for high-level ptvsd tests."""
FIXTURE = HighlevelFixture
@classmethod
def _new_daemon(cls, *args, **kwargs):
return cls.FIXTURE.DAEMON(*args, **kwargs)
@property
def pydevd(self):
return self.debugger
def new_fake(self, debugger=None, handler=None):
"""Return a new fake VSC that may be used in tests."""
vsc, debugger = self.fix.new_fake(debugger, handler)
return vsc, debugger
class RunningTest(HighlevelTest):
"""The mixin class for high-level tests for post-start operations."""

View file

@ -0,0 +1,188 @@
import contextlib
import unittest
from tests.helpers.pydevd._live import LivePyDevd
from tests.helpers.workspace import PathEntry
from . import VSCFixture, VSCTest
class Fixture(VSCFixture):
def __init__(self, source, new_fake=None):
self._pydevd = LivePyDevd(source)
super(Fixture, self).__init__(
new_fake=new_fake,
start_adapter=self._pydevd.start,
)
@property
def binder(self):
return self._pydevd.binder
def install_sig_handler(self):
self._pydevd._ptvsd.install_sig_handler()
class TestBase(VSCTest):
FIXTURE = Fixture
FILENAME = None
SOURCE = ''
def setUp(self):
super(TestBase, self).setUp()
self._workspace = PathEntry()
self._filename = None
if self.FILENAME is not None:
self.set_source_file(self.FILENAME, self.SOURCE)
def tearDown(self):
super(TestBase, self).tearDown()
self._workspace.cleanup()
@property
def workspace(self):
return self._workspace
def _new_fixture(self, new_daemon):
self.assertIsNotNone(self._filename)
return self.FIXTURE(self._filename, new_daemon)
def set_source_file(self, filename, content=None):
self.assertIsNone(self._fix)
if content is not None:
filename = self.workspace.write(filename, content=content)
self.workspace.install()
self._filename = 'file:' + filename
def set_module(self, name, content=None):
self.assertIsNone(self._fix)
if content is not None:
self.write_module(name, content)
self.workspace.install()
self._filename = 'module:' + name
##################################
# lifecycle tests
class LifecycleTests(TestBase, unittest.TestCase):
FILENAME = 'spam.py'
SOURCE = ''
@contextlib.contextmanager
def running(self):
addr = (None, 8888)
with self.fake.start(addr):
#with self.fix.install_sig_handler():
yield
def test_launch(self):
addr = (None, 8888)
with self.fake.start(addr):
with self.vsc.wait_for_event('initialized'):
# initialize
req_initialize = self.send_request('initialize', {
'adapterID': 'spam',
})
# attach
req_attach = self.send_request('attach')
# configuration
req_config = self.send_request('configurationDone')
# Normal ops would go here.
# end
with self._wait_for_events(['exited', 'terminated']):
pass
self.fix.binder.wait_until_done()
received = self.vsc.received
self.assert_vsc_received(received, [
self.new_response(req_initialize, **dict(
supportsExceptionInfoRequest=True,
supportsConfigurationDoneRequest=True,
supportsConditionalBreakpoints=True,
supportsSetVariable=True,
supportsExceptionOptions=True,
exceptionBreakpointFilters=[
{
'filter': 'raised',
'label': 'Raised Exceptions',
'default': 'false'
},
{
'filter': 'uncaught',
'label': 'Uncaught Exceptions',
'default': 'true'
},
],
supportsEvaluateForHovers=True,
)),
self.new_event('initialized'),
self.new_response(req_attach),
self.new_response(req_config),
self.new_event('exited', exitCode=0),
self.new_event('terminated'),
])
##################################
# "normal operation" tests
class VSCFlowTest(TestBase):
@contextlib.contextmanager
def launched(self, port=8888, **kwargs):
kwargs.setdefault('process', False)
with self.lifecycle.launched(port=port, hidedisconnect=True, **kwargs):
#with self.fix.install_sig_handler():
yield
class BreakpointTests(VSCFlowTest, unittest.TestCase):
FILENAME = 'spam.py'
SOURCE = """
from __future__ import print_function
#class Counter(object):
# def __init__(self, start=0):
# self._next = start
# def __repr__(self):
# return '{}(start={})'.format(type(self).__name__, self._next)
# def __int__(self):
# return self._next - 1
# __index__ = __int__
# def __iter__(self):
# return self
# def __next__(self):
# value = self._next
# self._next += 1
# def peek(self):
# return self._next
# def inc(self, diff=1):
# self._next += diff
def inc(value, count=1):
return value + count
x = 1
x = inc(x)
y = inc(x, 2)
z = inc(3)
print(x, y, z)
"""
def test_no_breakpoints(self):
with self.launched():
# All the script to run to completion.
received = self.vsc.received
self.assert_received(self.vsc, [])
self.assert_vsc_received(received, [])

View file

@ -1875,7 +1875,7 @@ class ThreadEventTest(PyDevdEventTest):
return self._tid
class ThreadCreateTests(ThreadEventTest, unittest.TestCase):
class ThreadCreateEventTests(ThreadEventTest, unittest.TestCase):
CMD = CMD_THREAD_CREATE
EVENT = 'thread'
@ -1904,6 +1904,7 @@ class ThreadCreateTests(ThreadEventTest, unittest.TestCase):
])
self.assert_received(self.debugger, [])
@unittest.skip('currently not supported')
def test_attached(self):
with self.attached(8888, process=False):
with self.wait_for_event('process'):
@ -1963,6 +1964,7 @@ class ThreadCreateTests(ThreadEventTest, unittest.TestCase):
self.assert_received(self.debugger, [])
def test_pydevd_name(self):
self.EVENT = None
with self.launched():
self.send_event(10, 'pydevd.spam')
received = self.vsc.received
@ -1971,6 +1973,7 @@ class ThreadCreateTests(ThreadEventTest, unittest.TestCase):
self.assert_received(self.debugger, [])
def test_ptvsd_name(self):
self.EVENT = None
with self.launched():
self.send_event(10, 'ptvsd.spam')
received = self.vsc.received
@ -2004,6 +2007,7 @@ class ThreadKillEventTests(ThreadEventTest, unittest.TestCase):
self.assert_received(self.debugger, [])
def test_unknown(self):
self.EVENT = None
with self.launched():
self.send_event(10)
received = self.vsc.received
@ -2012,6 +2016,7 @@ class ThreadKillEventTests(ThreadEventTest, unittest.TestCase):
self.assert_received(self.debugger, [])
def test_pydevd_name(self):
self.EVENT = None
thread = (10, 'pydevd.spam')
with self.launched():
with self.hidden():
@ -2023,6 +2028,7 @@ class ThreadKillEventTests(ThreadEventTest, unittest.TestCase):
self.assert_received(self.debugger, [])
def test_ptvsd_name(self):
self.EVENT = None
thread = (10, 'ptvsd.spam')
with self.launched():
with self.hidden():
@ -2049,7 +2055,7 @@ class ThreadSuspendEventTests(ThreadEventTest, unittest.TestCase):
return self.debugger_msgs.format_frames(threadid, reason, *frames)
def test_step_into(self):
thread = (10, '')
thread = (10, 'x')
with self.launched():
with self.hidden():
self.set_thread(thread)
@ -2067,7 +2073,7 @@ class ThreadSuspendEventTests(ThreadEventTest, unittest.TestCase):
self.assert_received(self.debugger, [])
def test_step_over(self):
thread = (10, '')
thread = (10, 'x')
with self.launched():
with self.hidden():
self.set_thread(thread)
@ -2085,7 +2091,7 @@ class ThreadSuspendEventTests(ThreadEventTest, unittest.TestCase):
self.assert_received(self.debugger, [])
def test_step_return(self):
thread = (10, '')
thread = (10, 'x')
with self.launched():
with self.hidden():
self.set_thread(thread)
@ -2104,7 +2110,7 @@ class ThreadSuspendEventTests(ThreadEventTest, unittest.TestCase):
def test_caught_exception(self):
exc = RuntimeError('something went wrong')
thread = (10, '')
thread = (10, 'x')
with self.launched():
with self.hidden():
self.set_thread(thread)
@ -2137,7 +2143,7 @@ class ThreadSuspendEventTests(ThreadEventTest, unittest.TestCase):
def test_exception_break(self):
exc = RuntimeError('something went wrong')
thread = (10, '')
thread = (10, 'x')
with self.launched():
with self.hidden():
self.set_thread(thread)
@ -2169,7 +2175,7 @@ class ThreadSuspendEventTests(ThreadEventTest, unittest.TestCase):
])
def test_suspend(self):
thread = (10, '')
thread = (10, 'x')
with self.launched():
with self.hidden():
self.set_thread(thread)
@ -2187,7 +2193,7 @@ class ThreadSuspendEventTests(ThreadEventTest, unittest.TestCase):
self.assert_received(self.debugger, [])
def test_unknown_reason(self):
thread = (10, '')
thread = (10, 'x')
with self.launched():
with self.hidden():
self.set_thread(thread)
@ -2208,11 +2214,11 @@ class ThreadSuspendEventTests(ThreadEventTest, unittest.TestCase):
# TODO: verify behavior
@unittest.skip('poorly specified')
def test_no_reason(self):
thread = (10, '')
thread = (10, 'x')
with self.launched():
with self.hidden():
self.set_thread(thread)
tid = self.send_event(10, '')
tid = self.send_event(10, 'x')
received = self.vsc.received
self.assert_vsc_received(received, [
@ -2228,7 +2234,7 @@ class ThreadSuspendEventTests(ThreadEventTest, unittest.TestCase):
# TODO: verify behavior
@unittest.skip('poorly specified')
def test_str_reason(self):
thread = (10, '')
thread = (10, 'x')
with self.launched():
with self.hidden():
self.set_thread(thread)
@ -2256,7 +2262,7 @@ class ThreadRunEventTests(ThreadEventTest, unittest.TestCase):
return '{}\t{}'.format(threadid, reason)
def test_basic(self):
thread = (10, '')
thread = (10, 'x')
with self.launched():
with self.hidden():
self.pause(thread, *[