Fix remaining tests to reflect the debug adapter refactoring changes.

Fix Flask and Django multiprocess tests.

Fix test logs not being captured by pytest.

Fix "import debug_me" check improperly applied in tests where it is unnecessary.

Fix some clarifying patterns not respecting the underlying pattern.

Add pattern helpers for strings: starting_with, ending_with, containing.

Move DAP test helpers to a separate module, and add a helper for frames.

Add support for line markers when setting breakpoints and matching frames.

Assorted test fixes around handling of Unicode and paths.
This commit is contained in:
Pavel Minaev 2019-07-09 17:49:12 -07:00 committed by Pavel Minaev
parent 746bda561e
commit c03206972d
40 changed files with 1549 additions and 1344 deletions

View file

@ -26,4 +26,4 @@ install:
- pip install -U pip setuptools tox tox-travis
script:
- tox
- tox -- -n4 -v

View file

@ -2,4 +2,4 @@
testpaths=tests
timeout=30
timeout_method=thread
addopts=-n6
addopts=-n8

View file

@ -71,7 +71,7 @@ def force_bytes(s, encoding, errors="strict"):
return s
def force_str(s, encoding, errors="strict"):
def force_str(s, encoding="ascii", errors="strict"):
"""Converts s to str (which is bytes on Python 2, and unicode on Python 3), using
the provided encoding if necessary. If s is already str, it is returned as is.
@ -137,6 +137,12 @@ def nameof(obj, quote=False):
return force_unicode(name, "utf-8", "replace")
def unicode_repr(obj):
"""Like repr(), but guarantees that the result is Unicode even on Python 2.
"""
return force_unicode(repr(obj), "ascii")
def srcnameof(obj):
"""Returns the most descriptive name of a Python module, class, or function,
including source information (filename and linenumber), if available.

View file

@ -21,6 +21,8 @@ LEVELS = ("debug", "info", "warning", "error")
"""Logging levels, lowest to highest importance.
"""
stderr = sys.__stderr__
stderr_levels = {"warning", "error"}
"""What should be logged to stderr.
"""
@ -44,6 +46,17 @@ _lock = threading.Lock()
_tls = threading.local()
# Used to inject a newline into stderr if logging there, to clean up the output
# when it's intermixed with regular prints from other sources.
def newline(level="info"):
with _lock:
if level in stderr_levels:
try:
stderr.write("\n")
except Exception:
pass
def write(level, text):
assert level in LEVELS
@ -62,7 +75,7 @@ def write(level, text):
with _lock:
if level in stderr_levels:
try:
sys.__stderr__.write(output)
stderr.write(output)
except Exception:
pass

View file

@ -70,7 +70,13 @@ class JsonIOStream(object):
socket.settimeout(None) # make socket blocking
if name is None:
name = repr(socket)
# TODO: investigate switching to buffered sockets; readline() on unbuffered
# sockets is very slow! Although the implementation of readline() itself is
# native code, it calls read(1) in a loop - and that then ultimately calls
# SocketIO.readinto(), which is implemented in Python.
socket_io = socket.makefile("rwb", 0)
return cls(socket_io, socket_io, name)
def __init__(self, reader, writer, name=None):
@ -668,7 +674,7 @@ class JsonMessageChannel(object):
self.stream = stream
self.handlers = handlers
self.name = name if name is not None else stream.name
self._lock = threading.Lock()
self._lock = threading.RLock()
self._stop = threading.Event()
self._seq_iter = itertools.count(1)
self._requests = {}

View file

@ -32,7 +32,7 @@ import _pydevd_bundle.pydevd_comm_constants as pydevd_comm_constants # noqa
import _pydevd_bundle.pydevd_extension_api as pydevd_extapi # noqa
import _pydevd_bundle.pydevd_extension_utils as pydevd_extutil # noqa
import _pydevd_bundle.pydevd_frame as pydevd_frame # noqa
from pydevd_file_utils import get_abs_path_real_path_and_base_from_file # noqa
import pydevd_file_utils
from _pydevd_bundle.pydevd_dont_trace_files import PYDEV_FILE # noqa
import ptvsd
@ -59,8 +59,7 @@ def path_to_unicode(s):
return s if isinstance(s, unicode) else s.decode(sys.getfilesystemencoding())
PTVSD_DIR_PATH = os.path.dirname(os.path.abspath(get_abs_path_real_path_and_base_from_file(__file__)[0])) + os.path.sep
NORM_PTVSD_DIR_PATH = os.path.normcase(PTVSD_DIR_PATH)
PTVSD_DIR_PATH = pydevd_file_utils.normcase(os.path.dirname(ptvsd.__file__) + os.path.sep)
class UnsupportedPyDevdCommandError(Exception):

View file

@ -40,5 +40,10 @@ name = "ptvsd-" + str(session_id)
# to DebugSession - the debuggee simply needs to execute it as is.
_code = os.getenv("PTVSD_DEBUG_ME")
if _code:
# Remove it, so that subprocesses don't try to manually configure ptvsd on the
# same port. In multiprocess scenarios, subprocesses are supposed to load ptvsd
# via code that is automatically injected into the subprocess by its parent.
del os.environ["PTVSD_DEBUG_ME"]
_code = compile(_code, "<PTVSD_DEBUG_ME>", "exec")
eval(_code, {})

View file

@ -18,45 +18,6 @@ import debug_me
from ptvsd.common import fmt, log, messaging
name = fmt("backchannel-{0}", debug_me.session_id)
port = os.getenv("PTVSD_BACKCHANNEL_PORT")
if port is not None:
port = int(port)
# Remove it, so that child processes don't try to use the same backchannel.
del os.environ["PTVSD_BACKCHANNEL_PORT"]
if port:
log.info("Connecting {0} to port {1}...", name, port)
_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
_socket.connect(("localhost", port))
_stream = messaging.JsonIOStream.from_socket(_socket, name="backchannel")
@atexit.register
def _atexit_handler():
log.info("Shutting down {0}...", name)
try:
_socket.shutdown(socket.SHUT_RDWR)
except Exception:
pass
finally:
try:
_socket.close()
except Exception:
pass
else:
class _stream:
def _error(*_):
raise AssertionError("Backchannel is not set up for this process")
read_json = write_json = _error
def send(value):
_stream.write_json(value)
@ -72,3 +33,52 @@ def wait_for(expected):
expected,
actual,
)
def close():
global _socket, _stream
if _socket is None:
return
log.info("Shutting down {0}...", name)
try:
_socket.shutdown(socket.SHUT_RDWR)
except Exception:
pass
finally:
_socket = None
try:
_stream.close()
except Exception:
pass
finally:
_stream = None
class _stream:
def _error(*_):
raise AssertionError("Backchannel is not set up for this process")
read_json = write_json = _error
close = lambda: None
name = fmt("backchannel-{0}", debug_me.session_id)
port = os.getenv("PTVSD_BACKCHANNEL_PORT")
if port is not None:
port = int(port)
log.info("Connecting {0} to port {1}...", name, port)
# Remove it, so that subprocesses don't try to use the same backchannel.
del os.environ["PTVSD_BACKCHANNEL_PORT"]
_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
_socket.connect(("localhost", port))
except Exception:
_socket.close()
raise
else:
_stream = messaging.JsonIOStream.from_socket(_socket, name="backchannel") # noqa
atexit.register(close)

View file

@ -11,6 +11,7 @@ import json
import pkgutil
import pytest
import py.path
import sys
# Do not import anything from ptvsd until assert rewriting is enabled below!
@ -55,6 +56,7 @@ from ptvsd.common import fmt, log, messaging
# Enable full logging to stderr, and make timestamps shorter to match maximum test
# run time better.
log.stderr = sys.stderr # use pytest-captured stderr rather than __stderr__
log.stderr_levels = set(log.LEVELS)
log.timestamp_format = "06.3f"

View file

@ -12,6 +12,8 @@ import re
from ptvsd.common import compat
_marked_line_numbers_cache = {}
def get_marked_line_numbers(path):
"""Given a path to a Python source file, extracts line numbers for all lines
@ -29,6 +31,11 @@ def get_marked_line_numbers(path):
if isinstance(path, py.path.local):
path = path.strpath
try:
return _marked_line_numbers_cache[path]
except KeyError:
pass
# Read as bytes, to avoid decoding errors on Python 3.
with open(path, "rb") as f:
lines = {}
@ -37,4 +44,6 @@ def get_marked_line_numbers(path):
if match:
marker = compat.force_unicode(match.group(1), "ascii")
lines[marker] = i + 1
return lines
_marked_line_numbers_cache[path] = lines
return lines

View file

@ -4,7 +4,7 @@
from __future__ import absolute_import, print_function, unicode_literals
from collections import namedtuple
import collections
import itertools
import os
import platform
@ -20,9 +20,9 @@ import time
import ptvsd
from ptvsd.common import compat, fmt, log, messaging
import tests
from tests import net
from tests import code, net
from tests.patterns import some
from tests.timeline import Timeline, Event, Response
from tests.timeline import Timeline, Event, Request, Response
PTVSD_DIR = py.path.local(ptvsd.__file__) / ".."
PTVSD_PORT = net.get_test_server_port(5678, 5800)
@ -41,7 +41,7 @@ ptvsd.wait_for_attach()
"""
StopInfo = namedtuple('StopInfo', [
StopInfo = collections.namedtuple('StopInfo', [
'body',
'frames',
'thread_id',
@ -51,11 +51,25 @@ StopInfo = namedtuple('StopInfo', [
class Session(object):
WAIT_FOR_EXIT_TIMEOUT = 10
"""Timeout used by wait_for_exit() before it kills the ptvsd process.
"""
START_METHODS = {
'launch', # ptvsd --client ... foo.py
'attach_socket_cmdline', # ptvsd ... foo.py
'attach_socket_import', # python foo.py (foo.py must import debug_me)
'attach_pid', # python foo.py && ptvsd ... --pid
'custom_client', # python foo.py (foo.py has to manually call ptvsd.attach)
'custom_server', # python foo.py (foo.py has to manually call ptvsd.enable_attach)
}
DEBUG_ME_START_METHODS = {"attach_socket_import"}
"""Start methods that require import debug_me."""
_counter = itertools.count(1)
def __init__(self, start_method='launch', ptvsd_port=None, pid=None):
assert start_method in ('launch', 'attach_pid', 'attach_socket_cmdline', 'attach_socket_import', 'custom_client')
assert start_method in self.START_METHODS
assert ptvsd_port is None or start_method.startswith('attach_socket_')
self.id = next(self._counter)
@ -77,6 +91,7 @@ class Session(object):
self.expected_returncode = 0
self.program_args = []
self.log_dir = None
self._before_connect = lambda: None
self.env = os.environ.copy()
self.env.update(PTVSD_ENV)
@ -230,6 +245,9 @@ class Session(object):
# argv += ['--pid', '<pid>'] # pid value to be appended later
return argv
def _get_argv_for_custom_server(self):
return [sys.executable]
def _get_argv_for_custom_client(self):
return [sys.executable]
@ -237,11 +255,13 @@ class Session(object):
assert os.path.isfile(filename)
with open(filename, "rb") as f:
code = f.read()
if self.start_method != "custom_client":
assert b"debug_me" in code, (
"Python source code that is run via tests.debug.Session must "
"import debug_me"
if self.start_method in self.DEBUG_ME_START_METHODS:
assert b"debug_me" in code, fmt(
"{0} is started via {1}, but it doesn't import debug_me.",
filename,
self.start_method,
)
return code
def _get_target(self):
@ -289,7 +309,7 @@ class Session(object):
for k, v in kwargs.items():
setattr(self, k, v)
assert self.start_method in ('launch', 'attach_pid', 'attach_socket_cmdline', 'attach_socket_import', 'custom_client')
assert self.start_method in self.START_METHODS
assert len(self.target) == 2
assert self.target[0] in ('file', 'module', 'code')
@ -304,13 +324,11 @@ class Session(object):
self.backchannel = BackChannel(self)
return self.backchannel
def before_connect(self):
"""Invoked by initialize() before connecting to the debuggee, or before waiting
for an incoming connection, but after all the session parameters (port number etc)
are determined."""
# The default implementation does nothing - this is a hook for the tests to override.
pass
def before_connect(self, func):
"""Registers a function to be invoked by initialize() before connecting to
the debuggee, or before waiting for an incoming connection, but after all
the session parameters (port number etc) are determined."""
self._before_connect = func
def initialize(self, **kwargs):
"""Spawns ptvsd using the configured method, telling it to execute the
@ -319,26 +337,32 @@ class Session(object):
If perform_handshake is True, calls self.handshake() before returning.
"""
self._setup_session(**kwargs)
start_method = self.start_method
log.info('Initializing debug session for {0}', self)
dbg_argv = []
usr_argv = []
if self.start_method == 'launch':
if start_method == 'launch':
self._listen()
dbg_argv += self._get_argv_for_launch()
elif self.start_method == 'attach_socket_cmdline':
elif start_method == 'attach_socket_cmdline':
dbg_argv += self._get_argv_for_attach_using_cmdline()
elif self.start_method == 'attach_socket_import':
elif start_method == 'attach_socket_import':
dbg_argv += self._get_argv_for_attach_using_import()
# TODO: Remove adding to python path after enabling Tox
self.env['PYTHONPATH'] = (PTVSD_DIR / "..").strpath + os.pathsep + self.env['PYTHONPATH']
self.env['PTVSD_DEBUG_ME'] = fmt(PTVSD_DEBUG_ME, ptvsd_port=self.ptvsd_port)
elif self.start_method == 'attach_pid':
elif start_method == 'attach_pid':
self._listen()
dbg_argv += self._get_argv_for_attach_using_pid()
elif self.start_method == 'custom_client':
elif start_method == 'custom_client':
self._listen()
dbg_argv += self._get_argv_for_custom_client()
elif start_method == 'custom_server':
dbg_argv += self._get_argv_for_custom_server()
else:
pytest.fail()
@ -348,14 +372,14 @@ class Session(object):
if self.no_debug:
dbg_argv += ['--nodebug']
if self.start_method == 'attach_pid':
if start_method == 'attach_pid':
usr_argv += [sys.executable]
usr_argv += self._get_target()
else:
dbg_argv += self._get_target()
if self.program_args:
if self.start_method == 'attach_pid':
if start_method == 'attach_pid':
usr_argv += list(self.program_args)
else:
dbg_argv += list(self.program_args)
@ -371,7 +395,7 @@ class Session(object):
# Assume that values are filenames - it's usually either that, or numbers.
make_filename = compat.filename_bytes if sys.version_info < (3,) else compat.filename
env = {
compat.force_str(k, "ascii"): make_filename(v)
compat.force_str(k): make_filename(v)
for k, v in self.env.items()
}
@ -395,14 +419,14 @@ class Session(object):
self,
py.path.local(ptvsd.__file__).dirpath(),
self.ptvsd_port,
self.start_method,
start_method,
self.target[0],
self.target[1],
self.cwd,
env_str,
)
spawn_args = usr_argv if self.start_method == 'attach_pid' else dbg_argv
spawn_args = usr_argv if start_method == 'attach_pid' else dbg_argv
# Normalize args to either bytes or unicode, depending on Python version.
spawn_args = [make_filename(s) for s in spawn_args]
@ -424,18 +448,19 @@ class Session(object):
if self.capture_output:
self.captured_output.capture(self.process)
if self.start_method == 'attach_pid':
if start_method == 'attach_pid':
# This is a temp process spawned to inject debugger into the debuggee.
dbg_argv += ['--pid', str(self.pid)]
log.info('Spawning {0} attach helper: {1!r}', self, dbg_argv)
attach_helper = subprocess.Popen(dbg_argv)
log.info('Spawned {0} attach helper with pid={1}', self, attach_helper.pid)
self.before_connect()
self._before_connect()
if self.start_method not in ('launch', 'attach_pid', 'custom_client'):
if start_method.startswith("attach_socket_") or start_method == "custom_server":
self.connect()
self.connected.wait()
assert self.ptvsd_port
assert self.socket
log.info('Spawned {0} with pid={1}', self, self.pid)
@ -695,18 +720,26 @@ class Session(object):
def _process_request(self, request):
assert False, 'ptvsd should not be sending requests.'
def request_continue(self):
self.send_request('continue').wait_for_response(freeze=False)
def set_breakpoints(self, path, lines=()):
return self.request('setBreakpoints', arguments={
'source': {'path': path},
'breakpoints': [{'line': bp_line} for bp_line in lines],
}).get('breakpoints', {})
def wait_for_next_event(self, event, body=some.object):
return self.timeline.wait_for_next(Event(event, body)).body
def output(self, category):
"""Returns all output of a given category as a single string, assembled from
all the "output" events received for that category so far.
"""
events = self.all_occurrences_of(
Event("output", some.dict.containing({"category": category}))
)
return "".join(event.body["output"] for event in events)
def captured_stdout(self, encoding=None):
return self.captured_output.stdout(encoding)
def captured_stderr(self, encoding=None):
return self.captured_output.stderr(encoding)
# Helpers for specific DAP patterns.
def wait_for_stop(self, reason=some.str, expected_frames=None, expected_text=None, expected_description=None):
stopped_event = self.wait_for_next(Event('stopped', some.dict.containing({'reason': reason})))
stopped = stopped_event.body
@ -737,7 +770,108 @@ class Session(object):
return StopInfo(stopped, frames, tid, fid)
def connect_to_child_session(self, ptvsd_subprocess):
def request_continue(self):
self.send_request('continue').wait_for_response(freeze=False)
def set_breakpoints(self, path, lines):
"""Sets breakpoints in the specified file, and returns the list of all the
corresponding DAP Breakpoint objects in the same order.
If lines are specified, it should be an iterable in which every element is
either a line number or a string. If it is a string, then it is translated
to the corresponding line number via get_marked_line_numbers(path).
If lines=all, breakpoints will be set on all the marked lines in the file.
"""
# Don't fetch line markers unless needed - in some cases, the breakpoints
# might be set in a file that does not exist on disk (e.g. remote attach).
def get_marked_line_numbers():
try:
return get_marked_line_numbers.cached
except AttributeError:
get_marked_line_numbers.cached = code.get_marked_line_numbers(path)
return get_marked_line_numbers()
if lines is all:
lines = get_marked_line_numbers().keys()
def make_breakpoint(line):
if isinstance(line, int):
descr = str(line)
else:
marker = line
line = get_marked_line_numbers()[marker]
descr = fmt("{0} (@{1})", line, marker)
bp_log.append((line, descr))
return {'line': line}
bp_log = []
breakpoints = self.request(
'setBreakpoints',
{
'source': {'path': path},
'breakpoints': [make_breakpoint(line) for line in lines],
},
).get('breakpoints', [])
bp_log = sorted(bp_log, key=lambda pair: pair[0])
bp_log = ", ".join((descr for _, descr in bp_log))
log.info("Breakpoints set in {0}: {1}", path, bp_log)
return breakpoints
def get_variables(self, *varnames, **kwargs):
"""Fetches the specified variables from the frame specified by frame_id, or
from the topmost frame in the last "stackTrace" response if frame_id is not
specified.
If varnames is empty, then all variables in the frame are returned. The result
is an OrderedDict, in which every entry has variable name as the key, and a
DAP Variable object as the value. The original order of variables as reported
by the debugger is preserved.
If varnames is not empty, then only the specified variables are returned.
The result is a tuple, in which every entry is a DAP Variable object; those
entries are in the same order as varnames.
"""
assert self.timeline.is_frozen
frame_id = kwargs.pop("frame_id", None)
if frame_id is None:
stackTrace_responses = self.all_occurrences_of(
Response(Request("stackTrace"))
)
assert stackTrace_responses, (
'get_variables() without frame_id requires at least one response '
'to a "stackTrace" request in the timeline.'
)
stack_trace = stackTrace_responses[-1].body
frame_id = stack_trace["stackFrames"][0]["id"]
scopes = self.request("scopes", {"frameId": frame_id})["scopes"]
assert len(scopes) > 0
variables = self.request(
"variables", {"variablesReference": scopes[0]["variablesReference"]}
)["variables"]
variables = collections.OrderedDict(((v["name"], v) for v in variables))
if varnames:
assert set(varnames) <= set(variables.keys())
return tuple((variables[name] for name in varnames))
else:
return variables
def get_variable(self, varname, frame_id=None):
"""Same as get_variables(...)[0].
"""
return self.get_variables(varname, frame_id=frame_id)[0]
def attach_to_subprocess(self, ptvsd_subprocess):
assert ptvsd_subprocess == Event("ptvsd_subprocess")
child_port = ptvsd_subprocess.body['port']
assert child_port != 0
@ -754,11 +888,20 @@ class Session(object):
else:
return child_session
def connect_to_next_child_session(self):
def attach_to_next_subprocess(self):
ptvsd_subprocess = self.wait_for_next(Event('ptvsd_subprocess'))
return self.connect_to_child_session(ptvsd_subprocess)
return self.attach_to_subprocess(ptvsd_subprocess)
def reattach(self, **kwargs):
"""Creates and initializes a new Session that tries to attach to the same
process.
Upon return, handshake() has been performed, but the caller is responsible
for invoking start_debugging().
"""
assert self.start_method.startswith("attach_socket_")
def connect_with_new_session(self, **kwargs):
ns = Session(start_method='attach_socket_import', ptvsd_port=self.ptvsd_port)
try:
ns._setup_session(**kwargs)
@ -776,24 +919,10 @@ class Session(object):
ns.handshake()
except Exception:
ns.close()
raise
else:
return ns
def output(self, category):
"""Returns all output of a given category as a single string, assembled from
all the "output" events received for that category so far.
"""
events = self.all_occurrences_of(
Event("output", some.dict.containing({"category": category}))
)
return "".join(event.body["output"] for event in events)
def captured_stdout(self, encoding=None):
return self.captured_output.stdout(encoding)
def captured_stderr(self, encoding=None):
return self.captured_output.stderr(encoding)
class CapturedOutput(object):
"""Captured stdout and stderr of the debugged process.

View file

@ -31,13 +31,13 @@ def get_test_server_port(start, stop):
"""
try:
worker_id = compat.force_ascii(os.environ['PYTEST_XDIST_WORKER'])
worker_id = compat.force_ascii(os.environ["PYTEST_XDIST_WORKER"])
except KeyError:
n = 0
else:
assert worker_id == some.str.matching(br"gw(\d+)"), (
"Unrecognized PYTEST_XDIST_WORKER format"
)
assert worker_id == some.bytes.matching(
br"gw(\d+)"
), "Unrecognized PYTEST_XDIST_WORKER format"
n = int(worker_id[2:])
port = start + n
@ -60,13 +60,15 @@ def wait_until_port_is_listening(port, interval=1, max_attempts=1000):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
for i in compat.xrange(0, max_attempts):
for i in compat.xrange(1, max_attempts + 1):
try:
log.info("Trying to connect to port {0} (attempt {1})", port, i)
log.info("Probing localhost:{0} (attempt {1})...", port, i)
sock.connect(("localhost", port))
return
except socket.error:
time.sleep(interval)
else:
log.info("localhost:{0} is listening - server is up!", port)
return
finally:
sock.close()
@ -86,25 +88,64 @@ class WebRequest(object):
def __init__(self, method, url, *args, **kwargs):
"""Invokes requests.method(url, *args, **kwargs) on a background thread,
and immediately returns.
If method() raises an exception, it is logged, unless log_errors=False.
"""
self.method = method
self.url = url
self.log_errors = kwargs.pop("log_errors", True)
self.request = None
"""The underlying Request object. Not set until wait_for_response() returns.
"""The underlying requests.Request object.
Not set until wait_for_response() returns.
"""
method = getattr(requests, method)
self._worker_thread = threading.Thread(
target=lambda: self._worker(method, url, *args, **kwargs),
name=fmt("WebRequest({0!r})", url)
)
self.exception = None
"""Exception that occurred while performing the request, if any.
def _worker(self, method, url, *args, **kwargs):
self.request = method(url, *args, **kwargs)
Not set until wait_for_response() returns.
"""
log.info("{0}", self)
func = getattr(requests, method)
self._worker_thread = threading.Thread(
target=lambda: self._worker(func, *args, **kwargs),
name=fmt("WebRequest({0})", self),
)
self._worker_thread.daemon = True
self._worker_thread.start()
def __str__(self):
return fmt("HTTP {0} {1}", self.method.upper(), self.url)
def _worker(self, func, *args, **kwargs):
try:
self.request = func(self.url, *args, **kwargs)
except Exception as exc:
if self.log_errors:
log.exception("{0} failed:", self)
self.exception = exc
else:
log.info(
"{0} --> {1} {2}",
self,
self.request.status_code,
self.request.reason
)
def wait_for_response(self, timeout=None):
"""Blocks until the request completes, and returns self.request.
"""
self._worker_thread.join(timeout)
if self._worker_thread.is_alive():
log.info("Waiting for response to {0} ...", self)
self._worker_thread.join(timeout)
if self.exception is not None:
raise self.exception
return self.request
def response_text(self):
@ -124,13 +165,16 @@ class WebServer(object):
def __enter__(self):
"""Blocks until the server starts listening on self.port.
"""
wait_until_port_is_listening(self.port)
log.info("Web server expected on {0}", self.url)
wait_until_port_is_listening(self.port, interval=3)
return self
def __exit__(self, exc_type, exc_value, exc_tb):
"""Sends an HTTP /exit POST request to the server.
"""Sends an HTTP /exit GET request to the server.
The server is expected to terminate its process while handling that request.
"""
self.post("exit").wait_for_response()
self.get("/exit", log_errors=False)
def get(self, path, *args, **kwargs):
return WebRequest.get(self.url + path, *args, **kwargs)

View file

@ -7,320 +7,8 @@ from __future__ import absolute_import, print_function, unicode_literals
"""Do not import this package directly - import tests.patterns.some instead.
"""
# The actual patterns are defined here, so that tests.patterns.some can redefine
# builtin names like str, int etc without affecting the implementations in this
# file - some.* then provides shorthand aliases.
# Wire up some.dap to be an alias for dap, to allow writing some.dap.id etc.
from tests.patterns import some
from tests.patterns import dap
import itertools
import py.path
import re
import sys
from ptvsd.common import compat, fmt
from ptvsd.common.compat import unicode, xrange
import pydevd_file_utils
class Some(object):
"""A pattern that can be tested against a value with == to see if it matches.
"""
def __repr__(self):
try:
return self.name
except AttributeError:
raise NotImplementedError
def __eq__(self, value):
raise NotImplementedError
def __ne__(self, other):
return not (self == other)
def __invert__(self):
"""The inverse pattern - matches everything that this one doesn't.
"""
return Not(self)
def __or__(self, pattern):
"""Union pattern - matches if either of the two patterns match.
"""
return Either(self, pattern)
def such_that(self, condition):
"""Same pattern, but it only matches if condition() is true.
"""
return SuchThat(self, condition)
def in_range(self, start, stop):
"""Same pattern, but it only matches if the start <= value < stop.
"""
return InRange(self, start, stop)
class Not(Some):
"""Matches the inverse of the pattern.
"""
def __init__(self, pattern):
self.pattern = pattern
def __repr__(self):
return fmt("~{0!r}", self.pattern)
def __eq__(self, value):
return value != self.pattern
class Either(Some):
"""Matches either of the patterns.
"""
def __init__(self, *patterns):
assert len(patterns) > 0
self.patterns = tuple(patterns)
def __repr__(self):
try:
return self.name
except AttributeError:
return fmt("({0})", " | ".join(repr(pat) for pat in self.patterns))
def __eq__(self, value):
return any(pattern == value for pattern in self.patterns)
def __or__(self, pattern):
return Either(*(self.patterns + (pattern,)))
class SuchThat(Some):
"""Matches only if condition is true.
"""
def __init__(self, pattern, condition):
self.pattern = pattern
self.condition = condition
def __repr__(self):
try:
return self.name
except AttributeError:
return fmt("({0!r} if {1})", self.pattern, compat.nameof(self.condition))
def __eq__(self, value):
return self.condition(value) and value == self.pattern
class InRange(Some):
"""Matches only if the value is within the specified range.
"""
def __init__(self, pattern, start, stop):
self.pattern = pattern
self.start = start
self.stop = stop
def __repr__(self):
try:
return self.name
except AttributeError:
return fmt("({0!r} <= {1!r} < {2!r})", self.start, self.pattern, self.stop)
def __eq__(self, value):
return self.start <= value < self.stop and value == self.pattern
class Object(Some):
"""Matches anything.
"""
name = "<?>"
def __eq__(self, value):
return True
def equal_to(self, obj):
return EqualTo(obj)
def same_as(self, obj):
return SameAs(obj)
class Thing(Some):
"""Matches anything that is not None.
"""
name = "<>"
def __eq__(self, value):
return value is not None
class InstanceOf(Some):
"""Matches any object that is an instance of the specified type.
"""
def __init__(self, classinfo, name=None):
if isinstance(classinfo, type):
classinfo = (classinfo,)
assert (
len(classinfo) > 0 and
all((isinstance(cls, type) for cls in classinfo))
), "classinfo must be a type or a tuple of types"
self.name = name
self.classinfo = classinfo
def __repr__(self):
if self.name:
name = self.name
else:
name = " | ".join(cls.__name__ for cls in self.classinfo)
return fmt("<{0}>", name)
def __eq__(self, value):
return isinstance(value, self.classinfo)
class EqualTo(Some):
"""Matches any object that is equal to the specified object.
"""
def __init__(self, obj):
self.obj = obj
def __repr__(self):
return repr(self.obj)
def __eq__(self, value):
return self.obj == value
class SameAs(Some):
"""Matches one specific object only (i.e. makes '==' behave like 'is').
"""
def __init__(self, obj):
self.obj = obj
def __repr__(self):
return fmt("is {0!r}", self.obj)
def __eq__(self, value):
return self.obj is value
class Matching(Some):
"""Matches any string that matches the specified regular expression.
"""
def __init__(self, regex):
assert isinstance(regex, bytes) or isinstance(regex, unicode)
self.regex = regex
def __repr__(self):
s = repr(self.regex)
if s[0] in "bu":
return s[0] + "/" + s[2:-1] + "/"
else:
return "/" + s[1:-1] + "/"
def __eq__(self, other):
regex = self.regex
if isinstance(regex, bytes) and not isinstance(other, bytes):
return NotImplemented
if isinstance(regex, unicode) and not isinstance(other, unicode):
return NotImplemented
return re.match(regex, other) is not None
class Path(Some):
"""Matches any string that matches the specified path.
Uses os.path.normcase() to normalize both strings before comparison.
If one string is unicode, but the other one is not, both strings are normalized
to unicode using sys.getfilesystemencoding().
"""
def __init__(self, path):
if isinstance(path, py.path.local):
path = path.strpath
if isinstance(path, bytes):
path = path.encode(sys.getfilesystemencoding())
assert isinstance(path, unicode)
self.path = path
def __repr__(self):
return fmt("some.path({0!r})", self.path)
def __eq__(self, other):
if isinstance(other, py.path.local):
other = other.strpath
if isinstance(other, unicode):
pass
elif isinstance(other, bytes):
other = other.encode(sys.getfilesystemencoding())
else:
return NotImplemented
left = pydevd_file_utils.get_path_with_real_case(self.path)
right = pydevd_file_utils.get_path_with_real_case(other)
return left == right
class ListContaining(Some):
"""Matches any list that contains the specified subsequence of elements.
"""
def __init__(self, *items):
self.items = tuple(items)
def __repr__(self):
if not self.items:
return "[...]"
s = repr(list(self.items))
return fmt("[..., {0}, ...]", s[1:-1])
def __eq__(self, other):
if not isinstance(other, list):
return NotImplemented
items = self.items
if not items:
return True # every list contains an empty sequence
if len(items) == 1:
return self.items[0] in other
# Zip the other list with itself, shifting by one every time, to produce
# tuples of equal length with items - i.e. all potential subsequences. So,
# given other=[1, 2, 3, 4, 5] and items=(2, 3, 4), we want to get a list
# like [(1, 2, 3), (2, 3, 4), (3, 4, 5)] - and then search for items in it.
iters = [itertools.islice(other, i, None) for i in xrange(0, len(items))]
subseqs = compat.izip(*iters)
return any(subseq == items for subseq in subseqs)
class DictContaining(Some):
"""Matches any dict that contains the specified key-value pairs::
d1 = {'a': 1, 'b': 2, 'c': 3}
d2 = {'a': 1, 'b': 2}
assert d1 == some.dict.containing(d2)
assert d2 != some.dict.containing(d1)
"""
def __init__(self, items):
self.items = dict(items)
def __repr__(self):
return repr(self.items)[:-1] + ', ...}'
def __eq__(self, other):
if not isinstance(other, dict):
return NotImplemented
any = Object()
d = {key: any for key in other}
d.update(self.items)
return d == other
some.dap = dap

378
tests/patterns/_impl.py Normal file
View file

@ -0,0 +1,378 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import absolute_import, print_function, unicode_literals
# The actual patterns are defined here, so that tests.patterns.some can redefine
# builtin names like str, int etc without affecting the implementations in this
# file - some.* then provides shorthand aliases.
import itertools
import py.path
import re
import sys
from ptvsd.common import compat, fmt
from ptvsd.common.compat import unicode, xrange
import pydevd_file_utils
class Some(object):
"""A pattern that can be tested against a value with == to see if it matches.
"""
def matches(self, value):
raise NotImplementedError
def __repr__(self):
try:
return self.name
except AttributeError:
raise NotImplementedError
def __eq__(self, value):
return self.matches(value)
def __ne__(self, value):
return not self.matches(value)
def __invert__(self):
"""The inverse pattern - matches everything that this one doesn't.
"""
return Not(self)
def __or__(self, pattern):
"""Union pattern - matches if either of the two patterns match.
"""
return Either(self, pattern)
def such_that(self, condition):
"""Same pattern, but it only matches if condition() is true.
"""
return SuchThat(self, condition)
def in_range(self, start, stop):
"""Same pattern, but it only matches if the start <= value < stop.
"""
return InRange(self, start, stop)
def equal_to(self, obj):
return EqualTo(self, obj)
def not_equal_to(self, obj):
return NotEqualTo(self, obj)
def same_as(self, obj):
return SameAs(self, obj)
def matching(self, regex, flags=0):
"""Same pattern, but it only matches if re.match(regex, flags) produces
a match that corresponds to the entire string.
"""
return Matching(self, regex, flags)
class Not(Some):
"""Matches the inverse of the pattern.
"""
def __init__(self, pattern):
self.pattern = pattern
def __repr__(self):
return fmt("~{0!r}", self.pattern)
def matches(self, value):
return value != self.pattern
class Either(Some):
"""Matches either of the patterns.
"""
def __init__(self, *patterns):
assert len(patterns) > 0
self.patterns = tuple(patterns)
def __repr__(self):
try:
return self.name
except AttributeError:
return fmt("({0})", " | ".join(repr(pat) for pat in self.patterns))
def matches(self, value):
return any(pattern == value for pattern in self.patterns)
def __or__(self, pattern):
return Either(*(self.patterns + (pattern,)))
class Object(Some):
"""Matches anything.
"""
name = "<?>"
def matches(self, value):
return True
class Thing(Some):
"""Matches anything that is not None.
"""
name = "<>"
def matches(self, value):
return value is not None
class InstanceOf(Some):
"""Matches any object that is an instance of the specified type.
"""
def __init__(self, classinfo, name=None):
if isinstance(classinfo, type):
classinfo = (classinfo,)
assert (
len(classinfo) > 0 and
all((isinstance(cls, type) for cls in classinfo))
), "classinfo must be a type or a tuple of types"
self.name = name
self.classinfo = classinfo
def __repr__(self):
if self.name:
name = self.name
else:
name = " | ".join(cls.__name__ for cls in self.classinfo)
return fmt("<{0}>", name)
def matches(self, value):
return isinstance(value, self.classinfo)
class Path(Some):
"""Matches any string that matches the specified path.
Uses os.path.normcase() to normalize both strings before comparison.
If one string is unicode, but the other one is not, both strings are normalized
to unicode using sys.getfilesystemencoding().
"""
def __init__(self, path):
if isinstance(path, py.path.local):
path = path.strpath
if isinstance(path, bytes):
path = path.encode(sys.getfilesystemencoding())
assert isinstance(path, unicode)
self.path = path
def __repr__(self):
return fmt("some.path({0!r})", self.path)
def matches(self, other):
if isinstance(other, py.path.local):
other = other.strpath
if isinstance(other, unicode):
pass
elif isinstance(other, bytes):
other = other.encode(sys.getfilesystemencoding())
else:
return NotImplemented
left = pydevd_file_utils.get_path_with_real_case(self.path)
right = pydevd_file_utils.get_path_with_real_case(other)
return left == right
class ListContaining(Some):
"""Matches any list that contains the specified subsequence of elements.
"""
def __init__(self, *items):
self.items = tuple(items)
def __repr__(self):
if not self.items:
return "[...]"
s = repr(list(self.items))
return fmt("[..., {0}, ...]", s[1:-1])
def matches(self, other):
if not isinstance(other, list):
return NotImplemented
items = self.items
if not items:
return True # every list contains an empty sequence
if len(items) == 1:
return self.items[0] in other
# Zip the other list with itself, shifting by one every time, to produce
# tuples of equal length with items - i.e. all potential subsequences. So,
# given other=[1, 2, 3, 4, 5] and items=(2, 3, 4), we want to get a list
# like [(1, 2, 3), (2, 3, 4), (3, 4, 5)] - and then search for items in it.
iters = [itertools.islice(other, i, None) for i in xrange(0, len(items))]
subseqs = compat.izip(*iters)
return any(subseq == items for subseq in subseqs)
class DictContaining(Some):
"""Matches any dict that contains the specified key-value pairs::
d1 = {'a': 1, 'b': 2, 'c': 3}
d2 = {'a': 1, 'b': 2}
assert d1 == some.dict.containing(d2)
assert d2 != some.dict.containing(d1)
"""
def __init__(self, items):
self.items = dict(items)
def __repr__(self):
return repr(self.items)[:-1] + ', ...}'
def matches(self, other):
if not isinstance(other, dict):
return NotImplemented
any = Object()
d = {key: any for key in other}
d.update(self.items)
return d == other
class Also(Some):
"""Base class for patterns that narrow down another pattern.
"""
def __init__(self, pattern):
self.pattern = pattern
def matches(self, value):
return self.pattern == value and self._also(value)
def _also(self, value):
raise NotImplementedError
class SuchThat(Also):
"""Matches only if condition is true.
"""
def __init__(self, pattern, condition):
super(SuchThat, self).__init__(pattern)
self.condition = condition
def __repr__(self):
try:
return self.name
except AttributeError:
return fmt("({0!r} if {1})", self.pattern, compat.nameof(self.condition))
def _also(self, value):
return self.condition(value)
class InRange(Also):
"""Matches only if the value is within the specified range.
"""
def __init__(self, pattern, start, stop):
super(InRange, self).__init__(pattern)
self.start = start
self.stop = stop
def __repr__(self):
try:
return self.name
except AttributeError:
return fmt("({0!r} <= {1!r} < {2!r})", self.start, self.pattern, self.stop)
def _also(self, value):
return self.start <= value < self.stop
class EqualTo(Also):
"""Matches any object that is equal to the specified object.
"""
def __init__(self, pattern, obj):
super(EqualTo, self).__init__(pattern)
self.obj = obj
def __repr__(self):
return repr(self.obj)
def _also(self, value):
return self.obj == value
class NotEqualTo(Also):
"""Matches any object that is not equal to the specified object.
"""
def __init__(self, pattern, obj):
super(NotEqualTo, self).__init__(pattern)
self.obj = obj
def __repr__(self):
return repr(self.obj)
def _also(self, value):
return self.obj != value
class SameAs(Also):
"""Matches one specific object only (i.e. makes '==' behave like 'is').
"""
def __init__(self, pattern, obj):
super(SameAs, self).__init__(pattern)
self.obj = obj
def __repr__(self):
return fmt("is {0!r}", self.obj)
def _also(self, value):
return self.obj is value
class Matching(Also):
"""Matches any string that matches the specified regular expression.
"""
def __init__(self, pattern, regex, flags=0):
assert isinstance(regex, bytes) or isinstance(regex, unicode)
super(Matching, self).__init__(pattern)
self.regex = regex
self.flags = flags
def __repr__(self):
s = repr(self.regex)
if s[0] in "bu":
return s[0] + "/" + s[2:-1] + "/"
else:
return "/" + s[1:-1] + "/"
def _also(self, value):
regex = self.regex
# re.match() always starts matching at the beginning, but does not require
# a complete match of the string - append "$" to ensure the latter.
if isinstance(regex, bytes):
if not isinstance(value, bytes):
return NotImplemented
regex += b"$"
elif isinstance(regex, unicode):
if not isinstance(value, unicode):
return NotImplemented
regex += "$"
else:
raise AssertionError()
return re.match(regex, value, self.flags) is not None

59
tests/patterns/dap.py Normal file
View file

@ -0,0 +1,59 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import absolute_import, print_function, unicode_literals
"""Patterns that are specific to the Debug Adapter Protocol.
"""
import py.path
from ptvsd.common.compat import unicode
from tests import code
from tests.patterns import some, _impl
id = some.int.in_range(0, 10000)
"""Matches a DAP "id", assuming some reasonable range for an implementation that
generates those ids sequentially.
"""
def source(path, **kwargs):
"""Matches DAP Source objects.
"""
if isinstance(path, py.path.local):
path = some.path(path)
d = {"path": path}
d.update(kwargs)
return some.dict.containing(d)
def frame(source, line, **kwargs):
"""Matches DAP Frame objects.
If source is py.path.local, it's automatically wrapped with some.dap.source().
If line is unicode, it is treated as a line marker, and translated to a line
number via get_marked_line_numbers(source["path"]) if possible.
"""
if isinstance(source, py.path.local):
source = some.dap.source(source)
if isinstance(line, unicode):
if isinstance(source, dict):
path = source["path"]
elif isinstance(source, _impl.DictContaining):
path = source.items["path"]
else:
path = None
assert isinstance(path, _impl.Path), (
"source must be some.dap.source() to use line markers in some.dap.frame()"
)
line = code.get_marked_line_numbers(path.path)[line]
d = {"id": some.dap.id, "source": source, "line": line, "column": 1}
d.update(kwargs)
return some.dict.containing(d)

View file

@ -33,14 +33,20 @@ Usage::
assert Exception() == some.error
assert object() == some.object.same_as(object())
assert b"abc" == some.bytes
assert u"abc" == some.str
if sys.version_info < (3,):
assert b"abc" == some.str
else:
assert b"abc" != some.str
assert "abbbc" == some.str.matching(r".(b+).")
assert "abbbc" != some.str.matching(r"bbb")
assert "abbc" == some.str.starting_with("ab")
assert "abbc" == some.str.ending_with("bc")
assert "abbc" == some.str.containing("bb")
assert "abbc" == some.str.matching(r".(b+).")
assert "abbc" != some.str.matching(r"ab")
assert "abbc" != some.str.matching(r"bc")
if platform.system() == "Windows":
assert "\\Foo\\Bar" == some.path("/foo/bar")
@ -65,7 +71,7 @@ Usage::
__all__ = [
"bool",
"bytes",
"dap_id",
"dap",
"dict",
"error",
"instanceof",
@ -73,23 +79,23 @@ __all__ = [
"list",
"number",
"path",
"source",
"str",
"thing",
"tuple",
]
import numbers
import re
import sys
from ptvsd.common.compat import builtins
from tests import patterns as some
from tests.patterns import _impl
object = some.Object()
thing = some.Thing()
instanceof = some.InstanceOf
path = some.Path
object = _impl.Object()
thing = _impl.Thing()
instanceof = _impl.InstanceOf
path = _impl.Path
bool = instanceof(builtins.bool)
@ -100,7 +106,9 @@ error = instanceof(Exception)
bytes = instanceof(builtins.bytes)
bytes.matching = some.Matching
bytes.starting_with = lambda prefix: bytes.matching(re.escape(prefix) + b".*", re.DOTALL)
bytes.ending_with = lambda suffix: bytes.matching(b".*" + re.escape(suffix), re.DOTALL)
bytes.containing = lambda sub: bytes.matching(b".*" + re.escape(sub) + b".*", re.DOTALL)
"""In Python 2, matches both str and unicode. In Python 3, only matches str.
@ -109,24 +117,19 @@ if sys.version_info < (3,):
str = instanceof((builtins.str, builtins.unicode), "str")
else:
str = instanceof(builtins.str)
str.matching = some.Matching
str.starting_with = lambda prefix: str.matching(re.escape(prefix) + ".*", re.DOTALL)
str.ending_with = lambda suffix: str.matching(".*" + re.escape(suffix), re.DOTALL)
str.containing = lambda sub: str.matching(".*" + re.escape(sub) + ".*", re.DOTALL)
list = instanceof(builtins.list)
list.containing = some.ListContaining
list.containing = _impl.ListContaining
dict = instanceof(builtins.dict)
dict.containing = some.DictContaining
dict.containing = _impl.DictContaining
dap_id = int.in_range(0, 10000)
"""Matches a DAP "id", assuming some reasonable range for an implementation that
generates those ids sequentially.
"""
def source(path):
"""Matches "source": {"path": ...} values in DAP.
"""
return dict.containing({"path": path})
# Set in __init__.py to avoid circular dependency.
dap = None

View file

@ -7,6 +7,7 @@ from __future__ import absolute_import, print_function, unicode_literals
"""Tests for JSON message streams and channels.
"""
import collections
import json
import io
import pytest
@ -66,7 +67,7 @@ class TestJsonIOStream(object):
def setup_class(cls):
for seq in range(0, 3):
message_body = cls.MESSAGE_BODY_TEMPLATE % seq
message = json.loads(message_body)
message = json.loads(message_body, object_pairs_hook=collections.OrderedDict)
message_body = message_body.encode("utf-8")
cls.MESSAGES.append(message)
message_header = "Content-Length: %d\r\n\r\n" % len(message_body)
@ -592,7 +593,7 @@ class TestJsonMessageChannel(object):
input_exhausted.wait()
def missing_property(name):
return some.str.matching("Invalid message:.*" + re.escape(name))
return some.str.matching("Invalid message:.*" + re.escape(name) + ".*")
assert output == [
{

View file

@ -34,7 +34,6 @@ def test_attach(run_as, wait_for_attach, is_attached, break_into):
target=(run_as, attach1_py),
start_method="launch",
env=env,
use_backchannel=True,
)
session.start_debugging()
@ -83,7 +82,6 @@ def test_reattach(pyfile, start_method, run_as):
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
use_backchannel=True,
kill_ptvsd=False,
capture_output=False,
)
@ -94,8 +92,7 @@ def test_reattach(pyfile, start_method, run_as):
session.wait_for_disconnect()
assert backchannel.receive() == "continued"
# re-attach
with session.connect_with_new_session(target=(run_as, code_to_debug)) as session2:
with session.reattach(target=(run_as, code_to_debug)) as session2:
session2.start_debugging()
hit = session2.wait_for_stop()
assert code_to_debug.lines["second"] == hit.frames[0]["line"]

View file

@ -11,25 +11,26 @@ import re
import sys
from ptvsd.common import fmt
from tests import code, debug, test_data
from tests import debug, test_data
from tests.patterns import some
BP_TEST_ROOT = test_data / "bp"
bp_root = test_data / "bp"
def test_path_with_ampersand(start_method, run_as):
test_py = BP_TEST_ROOT / "a&b" / "test.py"
lines = code.get_marked_line_numbers(test_py)
test_py = bp_root / "a&b" / "test.py"
with debug.Session(start_method) as session:
session.initialize(target=(run_as, test_py))
session.set_breakpoints(test_py, [lines["two"]])
session.set_breakpoints(test_py, ["two"])
session.start_debugging()
session.wait_for_stop(
"breakpoint",
expected_frames=[some.dict.containing({"source": some.source(test_py)})],
expected_frames=[
some.dap.frame(test_py, line="two"),
],
)
session.request_continue()
@ -44,20 +45,17 @@ def test_path_with_ampersand(start_method, run_as):
reason="https://github.com/Microsoft/ptvsd/issues/1124#issuecomment-459506802",
)
def test_path_with_unicode(start_method, run_as):
test_py = BP_TEST_ROOT / "ನನ್ನ_ಸ್ಕ್ರಿಪ್ಟ್.py"
lines = code.get_marked_line_numbers(test_py)
test_py = bp_root / "ನನ್ನ_ಸ್ಕ್ರಿಪ್ಟ್.py"
with debug.Session() as session:
session.initialize(target=(run_as, test_py), start_method=start_method)
session.set_breakpoints(test_py, [lines["bp"]])
with debug.Session(start_method) as session:
session.initialize(target=(run_as, test_py))
session.set_breakpoints(test_py, ["bp"])
session.start_debugging()
session.wait_for_stop("breakpoint", expected_frames=[
some.dict.containing({
"source": some.source(test_py),
"name": "ಏನಾದರೂ_ಮಾಡು",
}),
])
session.wait_for_stop(
"breakpoint",
expected_frames=[some.dap.frame(test_py, name="ಏನಾದರೂ_ಮಾಡು", line="bp")],
)
session.request_continue()
session.wait_for_exit()
@ -66,14 +64,14 @@ def test_path_with_unicode(start_method, run_as):
@pytest.mark.parametrize(
"condition_kind",
[
("condition",),
("hitCondition",),
("hitCondition", "eq"),
("hitCondition", "gt"),
("hitCondition", "ge"),
("hitCondition", "lt"),
("hitCondition", "le"),
("hitCondition", "mod"),
"condition",
"hitCondition",
"hitCondition-eq",
"hitCondition-gt",
"hitCondition-ge",
"hitCondition-lt",
"hitCondition-le",
"hitCondition-mod",
],
)
def test_conditional_breakpoint(pyfile, start_method, run_as, condition_kind):
@ -84,51 +82,41 @@ def test_conditional_breakpoint(pyfile, start_method, run_as, condition_kind):
for i in range(0, 10):
print(i) # @bp
condition_property = condition_kind[0]
condition_property = condition_kind.partition("-")[0]
condition, value, hits = {
("condition",): ("i==5", "5", 1),
("hitCondition",): ("5", "4", 1),
("hitCondition", "eq"): ("==5", "4", 1),
("hitCondition", "gt"): (">5", "5", 5),
("hitCondition", "ge"): (">=5", "4", 6),
("hitCondition", "lt"): ("<5", "0", 4),
("hitCondition", "le"): ("<=5", "0", 5),
("hitCondition", "mod"): ("%3", "2", 3),
"condition": ("i==5", "5", 1),
"hitCondition": ("5", "4", 1),
"hitCondition-eq": ("==5", "4", 1),
"hitCondition-gt": (">5", "5", 5),
"hitCondition-ge": (">=5", "4", 6),
"hitCondition-lt": ("<5", "0", 4),
"hitCondition-le": ("<=5", "0", 5),
"hitCondition-mod": ("%3", "2", 3),
}[condition_kind]
lines = code_to_debug.lines
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
session.request(
"setBreakpoints",
arguments={
{
"source": {"path": code_to_debug},
"breakpoints": [{"line": lines["bp"], condition_property: condition}],
"breakpoints": [
{"line": code_to_debug.lines["bp"], condition_property: condition}
],
},
)
session.start_debugging()
frame_id = session.wait_for_stop(expected_frames=[
some.dict.containing({"line": lines["bp"]})
]).frame_id
session.wait_for_stop(
expected_frames=[some.dap.frame(code_to_debug, line="bp")]
)
scopes = session.request(
"scopes", arguments={"frameId": frame_id}
)["scopes"]
session.get_variables()
assert len(scopes) > 0
variables = session.request(
"variables",
arguments={"variablesReference": scopes[0]["variablesReference"]},
)["variables"]
variables = [v for v in variables if v["name"] == "i"]
assert variables == [
some.dict.containing(
{"name": "i", "type": "int", "value": value, "evaluateName": "i"}
)
]
var_i = session.get_variable("i")
assert var_i == some.dict.containing(
{"name": "i", "type": "int", "value": value, "evaluateName": "i"}
)
session.request_continue()
for i in range(1, hits):
@ -153,27 +141,17 @@ def test_crossfile_breakpoint(pyfile, start_method, run_as):
script1.do_something() # @bp
print("Done")
with debug.Session() as session:
session.initialize(target=(run_as, script2), start_method=start_method)
session.set_breakpoints(script1, lines=[script1.lines["bp"]])
session.set_breakpoints(script2, lines=[script2.lines["bp"]])
with debug.Session(start_method) as session:
session.initialize(target=(run_as, script2))
session.set_breakpoints(script1, all)
session.set_breakpoints(script2, all)
session.start_debugging()
session.wait_for_stop(expected_frames=[
some.dict.containing({
"source": some.source(script2),
"line": script2.lines["bp"],
})
])
session.wait_for_stop(expected_frames=[some.dap.frame(script2, line="bp")])
session.request_continue()
session.wait_for_stop(expected_frames=[
some.dict.containing({
"source": some.source(script1),
"line": script1.lines["bp"],
})
])
session.wait_for_stop(expected_frames=[some.dap.frame(script1, line="bp")])
session.request_continue()
session.wait_for_exit()
@ -198,17 +176,17 @@ def test_error_in_condition(pyfile, start_method, run_as, error_name):
"ZeroDivisionError": ("1 / 0", True),
}[error_name]
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
session.send_request(
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
session.request(
"setBreakpoints",
arguments={
{
"source": {"path": code_to_debug},
"breakpoints": [
{"line": code_to_debug.lines["bp"], "condition": condition}
],
},
).wait_for_response()
)
session.start_debugging()
session.wait_for_exit()
@ -231,8 +209,8 @@ def test_log_point(pyfile, start_method, run_as, condition):
print(i * 10) # @bp
lines = code_to_debug.lines
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
bp = {"line": lines["bp"], "logMessage": "{i}"}
if condition:
@ -240,37 +218,22 @@ def test_log_point(pyfile, start_method, run_as, condition):
session.request(
"setBreakpoints",
arguments={
"source": {"path": code_to_debug},
"breakpoints": [bp],
},
arguments={"source": {"path": code_to_debug}, "breakpoints": [bp]},
)
session.start_debugging()
if condition:
frame_id = session.wait_for_stop(expected_frames=[
some.dict.containing({
"line": lines["bp"]
})
]).frame_id
session.wait_for_stop(
"breakpoint",
expected_frames=[
some.dap.frame(code_to_debug, line="bp")
],
)
scopes = session.request(
"scopes", arguments={"frameId": frame_id}
)["scopes"]
assert len(scopes) > 0
variables = session.request(
"variables",
arguments={"variablesReference": scopes[0]["variablesReference"]},
)["variables"]
variables = [v for v in variables if v["name"] == "i"]
assert variables == [
some.dict.containing(
{"name": "i", "type": "int", "value": "5", "evaluateName": "i"}
)
]
var_i = session.get_variable("i")
assert var_i == some.dict.containing(
{"name": "i", "evaluateName": "i", "type": "int", "value": "5"},
)
session.request_continue()
@ -278,25 +241,30 @@ def test_log_point(pyfile, start_method, run_as, condition):
assert not session.captured_stderr()
expected_stdout = "".join((
fmt(r"{0}\r?\n{1}\r?\n", re.escape(str(i)), re.escape(str(i * 10)))
for i in range(0, 10)
))
expected_stdout = "".join(
(
fmt(r"{0}\r?\n{1}\r?\n", re.escape(str(i)), re.escape(str(i * 10)))
for i in range(0, 10)
)
)
assert session.output("stdout") == some.str.matching(expected_stdout)
def test_package_launch():
cwd = test_data / "testpkgs"
test_py = cwd / "pkg1" / "__main__.py"
lines = code.get_marked_line_numbers(test_py)
with debug.Session() as session:
session.initialize(target=("module", "pkg1"), start_method="launch", cwd=cwd)
session.set_breakpoints(test_py, [lines["two"]])
with debug.Session("launch") as session:
session.initialize(target=("module", "pkg1"), cwd=cwd)
session.set_breakpoints(test_py, ["two"])
session.start_debugging()
hit = session.wait_for_stop()
assert lines["two"] == hit.frames[0]["line"]
session.wait_for_stop(
"breakpoint",
expected_frames=[
some.dap.frame(test_py, line="two"),
],
)
session.request_continue()
session.wait_for_exit()
@ -310,19 +278,19 @@ def test_add_and_remove_breakpoint(pyfile, start_method, run_as):
for i in range(0, 10):
print(i) # @bp
lines = code_to_debug.lines
with debug.Session() as session:
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
)
session.set_breakpoints(code_to_debug, [lines["bp"]])
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
session.set_breakpoints(code_to_debug, ["bp"])
session.start_debugging()
hit = session.wait_for_stop()
assert lines["bp"] == hit.frames[0]["line"]
session.wait_for_stop(
"breakpoint",
expected_frames=[
some.dap.frame(code_to_debug, line="bp"),
],
)
# remove breakpoints in file
# Remove breakpoints in file.
session.set_breakpoints(code_to_debug, [])
session.request_continue()
session.wait_for_exit()
@ -354,48 +322,37 @@ def test_invalid_breakpoints(pyfile, start_method, run_as):
4, 5, 6)
# fmt: on
lines = code_to_debug.lines
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
requested_bps = [
lines["bp1-requested"],
lines["bp2-requested"],
lines["bp3-requested"],
]
bp_markers = ["bp1-requested", "bp2-requested", "bp3-requested"]
if sys.version_info < (3,):
requested_bps += [
lines["bp4-requested-1"],
lines["bp4-requested-2"],
]
bp_markers += ["bp4-requested-1", "bp4-requested-2"]
actual_bps = session.set_breakpoints(code_to_debug, requested_bps)
actual_bps = [bp["line"] for bp in actual_bps]
bps = session.set_breakpoints(code_to_debug, bp_markers)
actual_lines = [bp["line"] for bp in bps]
expected_bps = [
lines["bp1-expected"],
lines["bp2-expected"],
lines["bp3-expected"],
]
expected_markers = ["bp1-expected", "bp2-expected", "bp3-expected"]
if sys.version_info < (3,):
expected_bps += [lines["bp4-expected"], lines["bp4-expected"]]
expected_markers += ["bp4-expected", "bp4-expected"]
expected_lines = [code_to_debug.lines[marker] for marker in expected_markers]
assert expected_bps == actual_bps
assert actual_lines == expected_lines
# Now let's make sure that we hit all of the expected breakpoints,
# and stop where we expect them to be.
session.start_debugging()
# If there's multiple breakpoints on the same line, we only stop once,
# so remove duplicates first.
expected_bps = sorted(set(expected_bps))
expected_lines = sorted(set(expected_lines))
while expected_bps:
expected_line = expected_bps.pop(0)
session.wait_for_stop(expected_frames=[
some.dict.containing({"line": expected_line})
])
while expected_lines:
expected_line = expected_lines.pop(0)
session.wait_for_stop(
"breakpoint",
expected_frames=[some.dap.frame(code_to_debug, line=expected_line)],
)
session.request_continue()
session.wait_for_exit()
@ -414,11 +371,9 @@ def test_deep_stacks(pyfile, start_method, run_as):
deep_stack(100)
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
actual_bps = session.set_breakpoints(code_to_debug, [code_to_debug.lines["bp"]])
actual_bps = [bp["line"] for bp in actual_bps]
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
session.set_breakpoints(code_to_debug, all)
session.start_debugging()
stop = session.wait_for_stop()

View file

@ -28,7 +28,6 @@ def test_continue_on_disconnect_for_attach(pyfile, start_method, run_as):
target=(run_as, code_to_debug),
start_method=start_method,
ignore_unobserved=[Event("exited"), Event("terminated")],
use_backchannel=True,
)
session.set_breakpoints(code_to_debug, [code_to_debug.lines["bp"]])
session.start_debugging()
@ -56,7 +55,6 @@ def test_exit_on_disconnect_for_launch(pyfile, start_method, run_as):
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
use_backchannel=True,
expected_returncode=some.int,
)
session.set_breakpoints(code_to_debug, code_to_debug.lines["bp"])

View file

@ -4,81 +4,89 @@
from __future__ import absolute_import, print_function, unicode_literals
import platform
import pytest
from tests import code, debug, net, test_data
import sys
from ptvsd.common import compat
from tests import code, debug, log, net, test_data
from tests.patterns import some
from tests.timeline import Event
from tests.net import find_http_url
pytestmark = pytest.mark.timeout(60)
django = net.WebServer(net.get_test_server_port(8000, 8100))
DJANGO1_ROOT = test_data / "django1"
DJANGO1_MANAGE = DJANGO1_ROOT / "app.py"
DJANGO1_TEMPLATE = DJANGO1_ROOT / "templates" / "hello.html"
DJANGO1_BAD_TEMPLATE = DJANGO1_ROOT / "templates" / "bad.html"
DJANGO_PORT = net.get_test_server_port(8000, 8100)
django = net.WebServer(DJANGO_PORT)
app_py_lines = code.get_marked_line_numbers(DJANGO1_MANAGE)
class paths:
django1 = test_data / "django1"
app_py = django1 / "app.py"
hello_html = django1 / "templates" / "hello.html"
bad_html = django1 / "templates" / "bad.html"
class lines:
app_py = code.get_marked_line_numbers(paths.app_py)
def _initialize_session(session, multiprocess=False):
program_args = [
"runserver",
"--",
str(django.port),
]
if not multiprocess:
program_args[1:1] = [
"--noreload",
]
session.initialize(
target=("file", paths.app_py),
program_args=program_args,
debug_options=["Django"],
cwd=paths.django1,
multiprocess=multiprocess,
expected_returncode=some.int, # No clean way to kill Django server
)
@pytest.mark.parametrize("bp_target", ["code", "template"])
@pytest.mark.parametrize("start_method", ["launch", "attach_socket_cmdline"])
@pytest.mark.timeout(60)
@pytest.mark.parametrize("bp_target", ["code", "template"])
def test_django_breakpoint_no_multiproc(start_method, bp_target):
bp_file, bp_line, bp_name = {
"code": (DJANGO1_MANAGE, app_py_lines["bphome"], "home"),
"template": (DJANGO1_TEMPLATE, 8, "Django Template"),
"code": (paths.app_py, lines.app_py["bphome"], "home"),
"template": (paths.hello_html, 8, "Django Template"),
}[bp_target]
bp_var_content = compat.force_str("Django-Django-Test")
with debug.Session() as session:
session.initialize(
start_method=start_method,
target=("file", DJANGO1_MANAGE),
program_args=["runserver", "--noreload", "--", str(DJANGO_PORT)],
debug_options=["Django"],
cwd=DJANGO1_ROOT,
expected_returncode=some.int, # No clean way to kill Django server
)
bp_var_content = "Django-Django-Test"
with debug.Session(start_method) as session:
_initialize_session(session)
session.set_breakpoints(bp_file, [bp_line])
session.start_debugging()
with django:
home_request = django.get("home")
stop = session.wait_for_stop(
home_request = django.get("/home")
session.wait_for_stop(
"breakpoint",
[
{
"id": some.dap_id,
"name": bp_name,
"source": {
"sourceReference": some.str,
"path": some.path(bp_file),
},
"line": bp_line,
"column": 1,
}
expected_frames=[
some.dap.frame(
some.dap.source(bp_file),
line=bp_line,
name=bp_name,
),
],
)
scopes = session.request("scopes", arguments={"frameId": stop.frame_id})
assert len(scopes) > 0
variables = session.request(
"variables",
arguments={"variablesReference": scopes[0]["variablesReference"]},
)
variables = [v for v in variables["variables"] if v["name"] == "content"]
assert variables == [
var_content = session.get_variable("content")
assert var_content == some.dict.containing(
{
"name": "content",
"type": "str",
"value": repr(bp_var_content),
"value": compat.unicode_repr(bp_var_content),
"presentationHint": {"attributes": ["rawString"]},
"evaluateName": "content",
"variablesReference": 0,
}
]
)
session.request_continue()
assert bp_var_content in home_request.response_text()
@ -87,64 +95,38 @@ def test_django_breakpoint_no_multiproc(start_method, bp_target):
@pytest.mark.parametrize("start_method", ["launch", "attach_socket_cmdline"])
@pytest.mark.timeout(60)
def test_django_template_exception_no_multiproc(start_method):
with debug.Session() as session:
session.initialize(
start_method=start_method,
target=("file", DJANGO1_MANAGE),
program_args=["runserver", "--noreload", "--nothreading", str(DJANGO_PORT)],
debug_options=["Django"],
cwd=DJANGO1_ROOT,
expected_returncode=some.int, # No clean way to kill Django server
)
session.send_request(
"setExceptionBreakpoints", arguments={"filters": ["raised", "uncaught"]}
).wait_for_response()
with debug.Session(start_method) as session:
_initialize_session(session)
session.request("setExceptionBreakpoints", {"filters": ["raised", "uncaught"]})
session.start_debugging()
with django:
web_request = django.get("badtemplate")
hit = session.wait_for_stop(reason="exception")
assert hit.frames[0] == some.dict.containing(
{
"id": some.dap_id,
"name": "Django TemplateSyntaxError",
"source": some.dict.containing(
{
"sourceReference": some.dap_id,
"path": some.path(DJANGO1_BAD_TEMPLATE),
}
),
"line": 8,
"column": 1,
}
with django:
django.get("/badtemplate", log_errors=False)
stop = session.wait_for_stop(
"exception",
expected_frames=[
some.dap.frame(
some.dap.source(paths.bad_html),
line=8,
name="Django TemplateSyntaxError",
)
],
)
# Will stop once in the plugin
resp_exception_info = session.send_request(
"exceptionInfo", arguments={"threadId": hit.thread_id}
).wait_for_response()
exception = resp_exception_info.body
assert exception == some.dict.containing(
exception_info = session.request(
"exceptionInfo", {"threadId": stop.thread_id}
)
assert exception_info == some.dict.containing(
{
"exceptionId": some.str.such_that(
lambda s: s.endswith("TemplateSyntaxError")
),
"exceptionId": some.str.ending_with("TemplateSyntaxError"),
"breakMode": "always",
"description": some.str.such_that(
lambda s: s.find("doesnotexist") > -1
),
"details": some.dict_with(
"description": some.str.containing("doesnotexist"),
"details": some.dict.containing(
{
"message": some.str.such_that(
lambda s: s.endswith("doesnotexist") > -1
),
"typeName": some.str.such_that(
lambda s: s.endswith("TemplateSyntaxError")
),
"message": some.str.containing("doesnotexist"),
"typeName": some.str.ending_with("TemplateSyntaxError"),
}
),
}
@ -152,195 +134,107 @@ def test_django_template_exception_no_multiproc(start_method):
session.request_continue()
# And a second time when the exception reaches the user code.
hit = session.wait_for_stop(reason="exception")
log.info("Exception will be reported again in {0}", paths.app_py)
session.wait_for_stop("exception")
session.request_continue()
# ignore response for exception tests
web_request.wait_for_response()
session.wait_for_exit()
@pytest.mark.parametrize("ex_type", ["handled", "unhandled"])
@pytest.mark.parametrize("start_method", ["launch", "attach_socket_cmdline"])
@pytest.mark.timeout(60)
def test_django_exception_no_multiproc(ex_type, start_method):
ex_line = {"handled": 50, "unhandled": 64}[ex_type]
with debug.Session() as session:
session.initialize(
start_method=start_method,
target=("file", DJANGO1_MANAGE),
program_args=["runserver", "--noreload", "--nothreading", str(DJANGO_PORT)],
debug_options=["Django"],
cwd=DJANGO1_ROOT,
expected_returncode=some.int, # No clean way to kill Django server
)
session.send_request(
"setExceptionBreakpoints", arguments={"filters": ["raised", "uncaught"]}
).wait_for_response()
@pytest.mark.parametrize("exc_type", ["handled", "unhandled"])
def test_django_exception_no_multiproc(start_method, exc_type):
exc_line = lines.app_py["exc_" + exc_type]
with debug.Session(start_method) as session:
_initialize_session(session)
session.request("setExceptionBreakpoints", {"filters": ["raised", "uncaught"]})
session.start_debugging()
with django:
web_request = django.get(ex_type)
django.get("/" + exc_type)
stopped = session.wait_for_stop(
"exception",
expected_frames=[
some.dap.frame(
some.dap.source(paths.app_py),
line=exc_line,
name="bad_route_" + exc_type,
)
],
).body
thread_stopped = session.wait_for_next(
Event("stopped", some.dict.containing({"reason": "exception"}))
)
assert thread_stopped == Event(
"stopped",
some.dict.containing(
{
"reason": "exception",
"text": some.str.such_that(
lambda s: s.endswith("ArithmeticError")
),
"description": "Hello",
}
),
assert stopped == some.dict.containing(
{
"reason": "exception",
"text": some.str.ending_with("ArithmeticError"),
"description": "Hello",
}
)
tid = thread_stopped.body["threadId"]
resp_exception_info = session.send_request(
"exceptionInfo", arguments={"threadId": tid}
).wait_for_response()
exception = resp_exception_info.body
assert exception == {
"exceptionId": some.str.such_that(
lambda s: s.endswith("ArithmeticError")
),
exception_info = session.request(
"exceptionInfo", {"threadId": stopped["threadId"]}
)
assert exception_info == {
"exceptionId": some.str.ending_with("ArithmeticError"),
"breakMode": "always",
"description": "Hello",
"details": {
"message": "Hello",
"typeName": some.str.such_that(
lambda s: s.endswith("ArithmeticError")
),
"source": some.path(DJANGO1_MANAGE),
"stackTrace": some.str.such_that(lambda s: True),
"typeName": some.str.ending_with("ArithmeticError"),
"source": some.path(paths.app_py),
"stackTrace": some.str,
},
}
resp_stacktrace = session.send_request(
"stackTrace", arguments={"threadId": tid}
).wait_for_response()
assert resp_stacktrace.body["totalFrames"] > 1
frames = resp_stacktrace.body["stackFrames"]
assert frames[0] == {
"id": some.dap_id,
"name": "bad_route_" + ex_type,
"source": {
"sourceReference": some.dap_id,
"path": some.path(DJANGO1_MANAGE),
},
"line": ex_line,
"column": 1,
}
session.request_continue()
# ignore response for exception tests
web_request.wait_for_response()
session.wait_for_exit()
@pytest.mark.skip()
@pytest.mark.timeout(120)
@pytest.mark.parametrize("start_method", ["launch"])
@pytest.mark.skipif(
sys.version_info < (3, 0) and platform.system() != "Windows",
reason="https://github.com/microsoft/ptvsd/issues/935",
)
def test_django_breakpoint_multiproc(start_method):
with debug.Session() as parent_session:
parent_session.initialize(
start_method=start_method,
target=("file", DJANGO1_MANAGE),
multiprocess=True,
program_args=["runserver"],
debug_options=["Django"],
cwd=DJANGO1_ROOT,
ignore_unobserved=[Event("stopped")],
expected_returncode=some.int, # No clean way to kill Django server
)
bp_line = lines.app_py["bphome"]
bp_var_content = compat.force_str("Django-Django-Test")
bp_line = app_py_lines["bphome"]
bp_var_content = "Django-Django-Test"
parent_session.set_breakpoints(DJANGO1_MANAGE, [bp_line])
with debug.Session(start_method) as parent_session:
_initialize_session(parent_session, multiprocess=True)
parent_session.set_breakpoints(paths.app_py, [bp_line])
parent_session.start_debugging()
with parent_session.connect_to_next_child_session() as child_session:
child_session.send_request(
"setBreakpoints",
arguments={
"source": {"path": DJANGO1_MANAGE},
"breakpoints": [{"line": bp_line}],
},
).wait_for_response()
with parent_session.attach_to_next_subprocess() as child_session:
child_session.set_breakpoints(paths.app_py, [bp_line])
child_session.start_debugging()
# wait for Django server to start
while True:
child_session.proceed()
o = child_session.wait_for_next(Event("output"))
if find_http_url(o.body["output"]) is not None:
break
with django:
web_request = django.get("home")
thread_stopped = child_session.wait_for_next(
Event("stopped", some.dict.containing({"reason": "breakpoint"}))
home_request = django.get("/home")
child_session.wait_for_stop(
"breakpoint",
expected_frames=[
some.dap.frame(
some.dap.source(paths.app_py), line=bp_line, name="home"
)
],
)
assert thread_stopped.body["threadId"] is not None
tid = thread_stopped.body["threadId"]
resp_stacktrace = child_session.send_request(
"stackTrace", arguments={"threadId": tid}
).wait_for_response()
assert resp_stacktrace.body["totalFrames"] > 0
frames = resp_stacktrace.body["stackFrames"]
assert frames[0] == {
"id": some.dap_id,
"name": "home",
"source": {
"sourceReference": some.dap_id,
"path": some.path(DJANGO1_MANAGE),
},
"line": bp_line,
"column": 1,
}
fid = frames[0]["id"]
resp_scopes = child_session.send_request(
"scopes", arguments={"frameId": fid}
).wait_for_response()
scopes = resp_scopes.body["scopes"]
assert len(scopes) > 0
resp_variables = child_session.send_request(
"variables",
arguments={"variablesReference": scopes[0]["variablesReference"]},
).wait_for_response()
variables = list(
v
for v in resp_variables.body["variables"]
if v["name"] == "content"
)
assert variables == [
var_content = child_session.get_variable("content")
assert var_content == some.dict.containing(
{
"name": "content",
"type": "str",
"value": repr(bp_var_content),
"value": compat.unicode_repr(bp_var_content),
"presentationHint": {"attributes": ["rawString"]},
"evaluateName": "content",
}
]
)
child_session.request_continue()
web_content = web_request.wait_for_response()
assert web_content.find(bp_var_content) != -1
assert bp_var_content in home_request.response_text()
child_session.wait_for_termination()
parent_session.wait_for_exit()

View file

@ -54,14 +54,14 @@ def test_variables_and_evaluate(pyfile, start_method, run_as):
assert b_variables[0] == {
"type": "int",
"value": "1",
"name": some.str.matching(r".*one.*"),
"name": some.str.containing("one"),
"evaluateName": "b['one']",
"variablesReference": 0,
}
assert b_variables[1] == {
"type": "int",
"value": "2",
"name": some.str.matching(r".*two.*"),
"name": some.str.containing("two"),
"evaluateName": "b['two']",
"variablesReference": 0,
}
@ -117,7 +117,6 @@ def test_set_variable(pyfile, start_method, run_as):
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
use_backchannel=True,
)
session.start_debugging()
hit = session.wait_for_stop()
@ -427,7 +426,7 @@ def test_hex_numbers(pyfile, start_method, run_as):
"value": "[0x1, 0xa, 0x64]",
"type": "list",
"evaluateName": "b",
"variablesReference": some.dap_id,
"variablesReference": some.dap.id,
}
)
@ -477,7 +476,7 @@ def test_hex_numbers(pyfile, start_method, run_as):
"value": "{0xa: 0xa, 0x64: 0x64, 0x3e8: 0x3e8}",
"type": "dict",
"evaluateName": "c",
"variablesReference": some.dap_id,
"variablesReference": some.dap.id,
}
)
@ -527,7 +526,7 @@ def test_hex_numbers(pyfile, start_method, run_as):
"value": "{(0x1, 0xa, 0x64): (0x2710, 0x186a0, 0x186a0)}",
"type": "dict",
"evaluateName": "d",
"variablesReference": some.dap_id,
"variablesReference": some.dap.id,
}
)
resp_variables = session.send_request(
@ -544,7 +543,7 @@ def test_hex_numbers(pyfile, start_method, run_as):
"value": "(0x2710, 0x186a0, 0x186a0)",
"type": "tuple",
"evaluateName": "d[(1, 10, 100)]",
"variablesReference": some.dap_id,
"variablesReference": some.dap.id,
},
{
"name": "__len__",

View file

@ -10,9 +10,7 @@ from tests import debug
from tests.patterns import some
from tests.timeline import Event
pytestmark = pytest.mark.skip("Exception tests are broken")
str_matching_ArithmeticError = some.str.matching(r"($|.*\.)ArithmeticError")
str_matching_ArithmeticError = some.str.matching(r"(.+\.)?ArithmeticError")
@pytest.mark.parametrize("raised", ["raisedOn", "raisedOff"])
@ -36,11 +34,10 @@ def test_vsc_exception_options_raise_with_except(
filters = []
filters += ["raised"] if raised == "raisedOn" else []
filters += ["uncaught"] if uncaught == "uncaughtOn" else []
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
session.send_request(
"setExceptionBreakpoints", {"filters": filters}
).wait_for_response()
session.request("setExceptionBreakpoints", {"filters": filters})
session.start_debugging()
expected = some.dict.containing(
@ -60,9 +57,9 @@ def test_vsc_exception_options_raise_with_except(
if raised == "raisedOn":
hit = session.wait_for_stop(
reason="exception",
text=str_matching_ArithmeticError,
description="bad code",
"exception",
expected_text=str_matching_ArithmeticError,
expected_description="bad code",
)
assert ex_line == hit.frames[0]["line"]
@ -124,7 +121,7 @@ def test_vsc_exception_options_raise_without_except(
)
if raised == "raisedOn":
hit = session.wait_for_stop(reason="exception")
hit = session.wait_for_stop("exception")
assert ex_line == hit.frames[0]["line"]
resp_exc_info = session.send_request(
@ -138,11 +135,11 @@ def test_vsc_exception_options_raise_without_except(
# This behavior can be changed by updating 'notify_on_handled_exceptions'
# setting we send to pydevd to notify only once. In our test code, we have
# two frames, hence two stops.
session.wait_for_stop(reason="exception")
session.wait_for_stop("exception")
session.request_continue()
if uncaught == "uncaughtOn":
hit = session.wait_for_stop(reason="exception")
hit = session.wait_for_stop("exception")
assert ex_line == hit.frames[0]["line"]
resp_exc_info = session.send_request(
@ -213,11 +210,11 @@ def test_systemexit(pyfile, start_method, run_as, raised, uncaught, zero, exit_c
# When breaking on raised exceptions, we'll stop on both lines,
# unless it's SystemExit(0) and we asked to ignore that.
if raised and (zero or exit_code != 0):
hit = session.wait_for_stop(reason="exception")
hit = session.wait_for_stop("exception")
assert hit.frames[0]["line"] == line_numbers["handled"]
session.request_continue()
hit = session.wait_for_stop(reason="exception")
hit = session.wait_for_stop("exception")
assert hit.frames[0]["line"] == line_numbers["unhandled"]
session.request_continue()
@ -228,7 +225,7 @@ def test_systemexit(pyfile, start_method, run_as, raised, uncaught, zero, exit_c
# for it unwinding the stack without finding a handler. The block above
# takes care of the first stop, so here we just take care of the second.
if uncaught and (zero or exit_code != 0):
hit = session.wait_for_stop(reason="exception")
hit = session.wait_for_stop("exception")
assert hit.frames[0]["line"] == line_numbers["unhandled"]
session.request_continue()
@ -315,7 +312,7 @@ def test_raise_exception_options(pyfile, start_method, run_as, exceptions, break
session.start_debugging()
for expected_exception in expect_exceptions:
hit = session.wait_for_stop(reason="exception")
hit = session.wait_for_stop("exception")
assert hit.frames[0]["source"]["path"].endswith("code_to_debug.py")
assert hit.frames[0]["line"] == code_to_debug.lines[expected_exception]
session.request_continue()
@ -348,7 +345,7 @@ def test_success_exitcodes(pyfile, start_method, run_as, exit_code):
session.start_debugging()
if exit_code == 0:
session.wait_for_stop(reason="exception")
session.wait_for_stop("exception")
session.request_continue()
session.wait_for_exit()
@ -397,7 +394,7 @@ def test_exception_stack(pyfile, start_method, run_as, max_frames):
).wait_for_response()
session.start_debugging()
hit = session.wait_for_stop(reason="exception")
hit = session.wait_for_stop("exception")
assert hit.frames[0]["line"] == code_to_debug.lines["unhandled"]
resp_exc_info = session.send_request(
@ -406,12 +403,12 @@ def test_exception_stack(pyfile, start_method, run_as, max_frames):
expected = some.dict.containing(
{
"exceptionId": some.str.matching(ArithmeticError.__name__),
"exceptionId": str_matching_ArithmeticError,
"description": "bad code",
"breakMode": "unhandled",
"details": some.dict.containing(
{
"typeName": some.str.matching(ArithmeticError.__name__),
"typeName": str_matching_ArithmeticError,
"message": "bad code",
"source": some.path(code_to_debug),
}

View file

@ -4,20 +4,18 @@
from __future__ import absolute_import, print_function, unicode_literals
import os.path
import pytest
from tests import debug, log, test_data
from tests import code, debug, log, test_data
from tests.patterns import some
@pytest.mark.parametrize("scenario", ["exclude_by_name", "exclude_by_dir"])
@pytest.mark.parametrize("exception_type", ["RuntimeError", "SysExit"])
@pytest.mark.parametrize("exc_type", ["RuntimeError", "SystemExit"])
def test_exceptions_and_exclude_rules(
pyfile, start_method, run_as, scenario, exception_type
pyfile, start_method, run_as, scenario, exc_type
):
if exception_type == "RuntimeError":
if exc_type == "RuntimeError":
@pyfile
def code_to_debug():
@ -25,7 +23,7 @@ def test_exceptions_and_exclude_rules(
raise RuntimeError("unhandled error") # @raise_line
elif exception_type == "SysExit":
elif exc_type == "SystemExit":
@pyfile
def code_to_debug():
@ -35,7 +33,7 @@ def test_exceptions_and_exclude_rules(
sys.exit(1) # @raise_line
else:
raise AssertionError("Unexpected exception_type: %s" % (exception_type,))
pytest.fail(exc_type)
if scenario == "exclude_by_name":
rules = [{"path": "**/" + code_to_debug.basename, "include": False}]
@ -45,18 +43,16 @@ def test_exceptions_and_exclude_rules(
pytest.fail(scenario)
log.info("Rules: {0!j}", rules)
with debug.Session() as session:
with debug.Session(start_method) as session:
session.initialize(
target=(run_as, code_to_debug), start_method=start_method, rules=rules
target=(run_as, code_to_debug),
rules=rules,
# https://github.com/Microsoft/ptvsd/issues/1278:
expected_returncode=some.int,
)
session.request(
"setExceptionBreakpoints", {"filters": ["raised", "uncaught"]}
)
# TODO: The process returncode doesn't match the one returned from the DAP.
# See: https://github.com/Microsoft/ptvsd/issues/1278
session.expected_returncode = some.int
filters = ["raised", "uncaught"]
session.send_request(
"setExceptionBreakpoints", {"filters": filters}
).wait_for_response()
session.start_debugging()
# No exceptions should be seen.
@ -70,20 +66,20 @@ def test_exceptions_and_partial_exclude_rules(pyfile, start_method, run_as, scen
from debug_me import backchannel
import sys
json = backchannel.receive()
call_me_back_dir = json["call_me_back_dir"]
sys.path.append(call_me_back_dir)
call_me_back_dir = backchannel.receive()
sys.path.insert(0, call_me_back_dir)
import call_me_back
def call_func():
raise RuntimeError("unhandled error") # @raise_line
raise RuntimeError("unhandled error") # @raise
call_me_back.call_me_back(call_func) # @call_me_back_line
call_me_back.call_me_back(call_func) # @call_me_back
print("done")
line_numbers = code_to_debug.lines
call_me_back_dir = test_data / "call_me_back"
call_me_back_py = call_me_back_dir / "call_me_back.py"
call_me_back_py.lines = code.get_marked_line_numbers(call_me_back_py)
if scenario == "exclude_code_to_debug":
rules = [{"path": "**/" + code_to_debug.basename, "include": False}]
@ -93,104 +89,113 @@ def test_exceptions_and_partial_exclude_rules(pyfile, start_method, run_as, scen
pytest.fail(scenario)
log.info("Rules: {0!j}", rules)
with debug.Session() as session:
with debug.Session(start_method) as session:
backchannel = session.setup_backchannel()
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
rules=rules,
# https://github.com/Microsoft/ptvsd/issues/1278:
expected_returncode=some.int,
)
session.request(
"setExceptionBreakpoints", {"filters": ["raised", "uncaught"]}
)
# TODO: The process returncode doesn't match the one returned from the DAP.
# See: https://github.com/Microsoft/ptvsd/issues/1278
session.expected_returncode = some.int
filters = ["raised", "uncaught"]
session.send_request(
"setExceptionBreakpoints", {"filters": filters}
).wait_for_response()
session.start_debugging()
backchannel.send({"call_me_back_dir": call_me_back_dir})
backchannel.send(call_me_back_dir)
if scenario == "exclude_code_to_debug":
# Stop at handled
hit = session.wait_for_stop(reason="exception")
# We don't stop at the raise line but rather at the callback module which is
# not excluded.
assert len(hit.frames) == 1
assert hit.frames[0] == some.dict.containing(
{
"line": 2,
"source": some.dict.containing(
{
"path": some.path(
os.path.join(call_me_back_dir, "call_me_back.py")
)
}
),
}
)
# assert hit.frames[1] == some.dict.containing({ -- filtered out
# 'line': line_numbers['call_me_back_line'],
# 'source': some.dict.containing({
# 'path': some.path(code_to_debug)
# })
# })
# 'continue' should terminate the debuggee
session.request_continue()
# Stop at handled exception, with code_to_debug.py excluded.
#
# Since the module raising the exception is excluded, it must not stop at
# @raise, but rather at @callback (i.e. the closest non-excluded frame).
# Note: does not stop at unhandled exception because raise was in excluded file.
stop = session.wait_for_stop(
"exception",
expected_frames=[
some.dap.frame(
some.dap.source(call_me_back_py),
line=call_me_back_py.lines["callback"],
),
],
)
assert stop.frames != some.list.containing([
some.dap.frame(some.dap.source(code_to_debug), line=some.int),
])
# As exception unwinds the stack, we shouldn't stop at @call_me_back,
# since that line is in the excluded file. Furthermore, although the
# exception is unhandled, we shouldn't get a stop for that, either,
# because the exception is last seen in an excluded file.
session.request_continue()
elif scenario == "exclude_callback_dir":
# Stop at handled raise_line
hit = session.wait_for_stop(reason="exception")
assert [
(frame["name"], os.path.basename(frame["source"]["path"]))
for frame in hit.frames
] == [
("call_func", "code_to_debug.py"),
# ('call_me_back', 'call_me_back.py'), -- filtered out
("<module>", "code_to_debug.py"),
]
assert hit.frames[0] == some.dict.containing(
{
"line": line_numbers["raise_line"],
"source": some.dict.containing({"path": some.path(code_to_debug)}),
}
)
session.send_request("continue").wait_for_response()
# Stop at handled exception, with call_me_back.py excluded.
#
# Since the module raising the exception is not excluded, it must stop at
# @raise.
# Stop at handled call_me_back_line
hit = session.wait_for_stop(reason="exception")
assert [
(frame["name"], os.path.basename(frame["source"]["path"]))
for frame in hit.frames
] == [("<module>", "code_to_debug.py")]
assert hit.frames[0] == some.dict.containing(
{
"line": line_numbers["call_me_back_line"],
"source": some.dict.containing({"path": some.path(code_to_debug)}),
}
stop = session.wait_for_stop(
"exception",
expected_frames=[
some.dap.frame(
some.dap.source(code_to_debug),
name="call_func",
line=code_to_debug.lines["raise"],
),
some.dap.frame(
some.dap.source(code_to_debug),
name="<module>",
line=code_to_debug.lines["call_me_back"],
),
],
)
session.send_request("continue").wait_for_response()
assert stop.frames != some.list.containing([
some.dap.frame(some.dap.source(call_me_back_py), line=some.int),
])
# Stop at unhandled
hit = session.wait_for_stop(reason="exception")
assert [
(frame["name"], os.path.basename(frame["source"]["path"]))
for frame in hit.frames
] == [
("call_func", "code_to_debug.py"),
# ('call_me_back', 'call_me_back.py'), -- filtered out
("<module>", "code_to_debug.py"),
]
assert hit.frames[0] == some.dict.containing(
{
"line": line_numbers["raise_line"],
"source": some.dict.containing({"path": some.path(code_to_debug)}),
}
)
session.request_continue()
# As exception unwinds the stack, it must not stop at @callback, since that
# line is in the excluded file. However, it must stop at @call_me_back.
stop = session.wait_for_stop(
"exception",
expected_frames=[
some.dap.frame(
some.dap.source(code_to_debug),
name="<module>",
line=code_to_debug.lines["call_me_back"],
),
],
)
assert stop.frames != some.list.containing([
some.dap.frame(some.dap.source(call_me_back_py), line=some.int),
])
session.request_continue()
# Now the exception is unhandled, and should be reported as such.
stop = session.wait_for_stop(
"exception",
expected_frames=[
some.dap.frame(
some.dap.source(code_to_debug),
name="call_func",
line=code_to_debug.lines["raise"],
),
some.dap.frame(
some.dap.source(code_to_debug),
name="<module>",
line=code_to_debug.lines["call_me_back"],
),
],
)
assert stop.frames != some.list.containing([
some.dap.frame(some.dap.source(call_me_back_py), line=some.int),
])
# Let the process crash due to unhandled exception.
session.request_continue()
else:
pytest.fail(scenario)

View file

@ -8,86 +8,88 @@ import platform
import pytest
import sys
from tests import code, debug, net, test_data
from ptvsd.common import compat
from tests import code, debug, log, net, test_data
from tests.patterns import some
from tests.timeline import Event
pytestmark = pytest.mark.timeout(60)
flask = net.WebServer(net.get_test_server_port(7000, 7100))
FLASK1_ROOT = test_data / "flask1"
FLASK1_APP = FLASK1_ROOT / "app.py"
FLASK1_TEMPLATE = FLASK1_ROOT / "templates" / "hello.html"
FLASK1_BAD_TEMPLATE = FLASK1_ROOT / "templates" / "bad.html"
FLASK_PORT = net.get_test_server_port(7000, 7100)
flask_server = net.WebServer(FLASK_PORT)
app_py_lines = code.get_marked_line_numbers(FLASK1_APP)
class paths:
flask1 = test_data / "flask1"
app_py = flask1 / "app.py"
hello_html = flask1 / "templates" / "hello.html"
bad_html = flask1 / "templates" / "bad.html"
def _initialize_flask_session_no_multiproc(session, start_method):
env = {"FLASK_APP": "app.py", "FLASK_ENV": "development", "FLASK_DEBUG": "0"}
class lines:
app_py = code.get_marked_line_numbers(paths.app_py)
def _initialize_session(session, multiprocess=False):
env = {
"FLASK_APP": paths.app_py,
"FLASK_ENV": "development",
"FLASK_DEBUG": "1" if multiprocess else "0",
}
if platform.system() != "Windows":
locale = "en_US.utf8" if platform.system() == "Linux" else "en_US.UTF-8"
env.update({"LC_ALL": locale, "LANG": locale})
session.initialize(
start_method=start_method,
target=("module", "flask"),
program_args=[
"run",
program_args = [
"run",
"--port",
str(flask.port),
]
if not multiprocess:
program_args[1:1] = [
"--no-debugger",
"--no-reload",
"--with-threads",
"--port",
str(FLASK_PORT),
],
ignore_unobserved=[Event("stopped")],
]
session.initialize(
target=("module", "flask"),
program_args=program_args,
debug_options=["Jinja"],
cwd=FLASK1_ROOT,
cwd=paths.flask1,
env=env,
multiprocess=multiprocess,
expected_returncode=some.int, # No clean way to kill Flask server
)
@pytest.mark.parametrize("bp_target", ["code", "template"])
@pytest.mark.parametrize("start_method", ["launch", "attach_socket_cmdline"])
@pytest.mark.timeout(60)
def test_flask_breakpoint_no_multiproc(bp_target, start_method):
@pytest.mark.parametrize("bp_target", ["code", "template"])
def test_flask_breakpoint_no_multiproc(start_method, bp_target):
bp_file, bp_line, bp_name = {
"code": (FLASK1_APP, app_py_lines["bphome"], "home"),
"template": (FLASK1_TEMPLATE, 8, "template"),
"code": (paths.app_py, lines.app_py["bphome"], "home"),
"template": (paths.hello_html, 8, "template"),
}[bp_target]
bp_var_content = compat.force_str("Flask-Jinja-Test")
with debug.Session() as session:
_initialize_flask_session_no_multiproc(session, start_method)
bp_var_content = "Flask-Jinja-Test"
with debug.Session(start_method) as session:
_initialize_session(session)
session.set_breakpoints(bp_file, [bp_line])
session.start_debugging()
with flask_server:
home_request = flask_server.get("/")
with flask:
home_request = flask.get("/")
session.wait_for_stop(
"breakpoint",
expected_frames=[
some.dap.frame(
some.dap.source(bp_file),
name=bp_name,
line=bp_line,
),
],
)
hit = session.wait_for_stop(reason="breakpoint")
assert hit.frames[0] == {
"id": some.dap_id,
"name": bp_name,
"source": {"sourceReference": some.dap_id, "path": some.path(bp_file)},
"line": bp_line,
"column": 1,
}
resp_scopes = session.send_request(
"scopes", arguments={"frameId": hit.frame_id}
).wait_for_response()
scopes = resp_scopes.body["scopes"]
assert len(scopes) > 0
resp_variables = session.send_request(
"variables",
arguments={"variablesReference": scopes[0]["variablesReference"]},
).wait_for_response()
variables = [v for v in resp_variables.body["variables"] if v["name"] == "content"]
assert variables == [
var_content = session.get_variable("content")
assert var_content == some.dict.containing(
{
"name": "content",
"type": "str",
@ -96,7 +98,7 @@ def test_flask_breakpoint_no_multiproc(bp_target, start_method):
"evaluateName": "content",
"variablesReference": 0,
}
]
)
session.request_continue()
assert bp_var_content in home_request.response_text()
@ -105,60 +107,37 @@ def test_flask_breakpoint_no_multiproc(bp_target, start_method):
@pytest.mark.parametrize("start_method", ["launch", "attach_socket_cmdline"])
@pytest.mark.timeout(60)
def test_flask_template_exception_no_multiproc(start_method):
with debug.Session() as session:
_initialize_flask_session_no_multiproc(session, start_method)
session.send_request(
"setExceptionBreakpoints", arguments={"filters": ["raised", "uncaught"]}
).wait_for_response()
with debug.Session(start_method) as session:
_initialize_session(session)
session.request("setExceptionBreakpoints", {"filters": ["raised", "uncaught"]})
session.start_debugging()
# wait for Flask web server to start
with flask_server:
web_request = flask_server.get("badtemplate")
hit = session.wait_for_stop()
assert hit.frames[0] == some.dict.containing(
{
"id": some.dap_id,
"name": "template"
if sys.version_info[0] >= 3
else "Jinja2 TemplateSyntaxError",
"source": some.dict.containing(
{
"sourceReference": some.dap_id,
"path": some.path(FLASK1_BAD_TEMPLATE),
}
with flask:
flask.get("/badtemplate")
stop = session.wait_for_stop(
"exception",
expected_frames=[
some.dap.frame(
some.dap.source(paths.bad_html),
name=some.str, # varies depending on Jinja version
line=8,
),
"line": 8,
"column": 1,
}
],
)
resp_exception_info = session.send_request(
"exceptionInfo", arguments={"threadId": hit.thread_id}
).wait_for_response()
exception = resp_exception_info.body
assert exception == some.dict.containing(
exception_info = session.request(
"exceptionInfo", {"threadId": stop.thread_id}
)
assert exception_info == some.dict.containing(
{
"exceptionId": some.str.such_that(
lambda s: s.endswith("TemplateSyntaxError")
),
"exceptionId": some.str.ending_with("TemplateSyntaxError"),
"breakMode": "always",
"description": some.str.such_that(
lambda s: s.find("doesnotexist") > -1
),
"description": some.str.containing("doesnotexist"),
"details": some.dict.containing(
{
"message": some.str.such_that(
lambda s: s.find("doesnotexist") > -1
),
"typeName": some.str.such_that(
lambda s: s.endswith("TemplateSyntaxError")
),
"message": some.str.containing("doesnotexist"),
"typeName": some.str.ending_with("TemplateSyntaxError"),
}
),
}
@ -166,150 +145,107 @@ def test_flask_template_exception_no_multiproc(start_method):
session.request_continue()
# ignore response for exception tests
web_request.wait_for_response()
log.info("Exception will be reported again in {0}", paths.app_py)
session.wait_for_stop("exception")
session.request_continue()
# In Python 2, Flask reports this exception one more time, and it is
# reported for both frames again.
if sys.version_info < (3,):
log.info("Exception gets double-reported in Python 2.")
session.wait_for_stop("exception")
session.request_continue()
session.wait_for_stop("exception")
session.request_continue()
session.wait_for_exit()
@pytest.mark.parametrize("ex_type", ["handled", "unhandled"])
@pytest.mark.parametrize("start_method", ["launch", "attach_socket_cmdline"])
@pytest.mark.timeout(60)
def test_flask_exception_no_multiproc(ex_type, start_method):
ex_line = {"handled": 21, "unhandled": 33}[ex_type]
with debug.Session() as session:
_initialize_flask_session_no_multiproc(session, start_method)
session.send_request(
"setExceptionBreakpoints", arguments={"filters": ["raised", "uncaught"]}
).wait_for_response()
@pytest.mark.parametrize("exc_type", ["handled", "unhandled"])
def test_flask_exception_no_multiproc(start_method, exc_type):
exc_line = lines.app_py["exc_" + exc_type]
with debug.Session(start_method) as session:
_initialize_session(session)
session.request("setExceptionBreakpoints", {"filters": ["raised", "uncaught"]})
session.start_debugging()
with flask_server:
web_request = flask_server.get(ex_type)
with flask:
flask.get("/" + exc_type)
stopped = session.wait_for_stop(
"exception",
expected_frames=[
some.dap.frame(
some.dap.source(paths.app_py),
line=exc_line,
name="bad_route_" + exc_type,
)
],
).body
thread_stopped = session.wait_for_next(
Event("stopped", some.dict.containing({"reason": "exception"}))
)
assert thread_stopped == Event(
"stopped",
some.dict.containing(
{
"reason": "exception",
"text": some.str.such_that(lambda s: s.endswith("ArithmeticError")),
"description": "Hello",
}
),
assert stopped == some.dict.containing(
{
"reason": "exception",
"text": some.str.ending_with("ArithmeticError"),
"description": "Hello",
}
)
tid = thread_stopped.body["threadId"]
resp_exception_info = session.send_request(
"exceptionInfo", arguments={"threadId": tid}
).wait_for_response()
exception = resp_exception_info.body
assert exception == {
"exceptionId": some.str.such_that(lambda s: s.endswith("ArithmeticError")),
exception_info = session.request(
"exceptionInfo", {"threadId": stopped["threadId"]}
)
assert exception_info == {
"exceptionId": some.str.ending_with("ArithmeticError"),
"breakMode": "always",
"description": "Hello",
"details": {
"message": "Hello",
"typeName": some.str.such_that(lambda s: s.endswith("ArithmeticError")),
"source": some.path(FLASK1_APP),
"stackTrace": some.str.such_that(lambda s: True),
"typeName": some.str.ending_with("ArithmeticError"),
"source": some.path(paths.app_py),
"stackTrace": some.str,
},
}
resp_stacktrace = session.send_request(
"stackTrace", arguments={"threadId": tid}
).wait_for_response()
assert resp_stacktrace.body["totalFrames"] > 0
frames = resp_stacktrace.body["stackFrames"]
assert frames[0] == {
"id": some.dap_id,
"name": "bad_route_" + ex_type,
"source": {"sourceReference": some.dap_id, "path": some.path(FLASK1_APP)},
"line": ex_line,
"column": 1,
}
session.request_continue()
# ignore response for exception tests
web_request.wait_for_response()
session.wait_for_exit()
@pytest.mark.timeout(120)
@pytest.mark.parametrize("start_method", ["launch"])
@pytest.mark.skipif(
(sys.version_info < (3, 0)) and (platform.system() != "Windows"), reason="Bug #935"
sys.version_info < (3, 0) and platform.system() != "Windows",
reason="https://github.com/microsoft/ptvsd/issues/935",
)
def test_flask_breakpoint_multiproc(start_method):
env = {"FLASK_APP": "app", "FLASK_ENV": "development", "FLASK_DEBUG": "1"}
if platform.system() != "Windows":
locale = "en_US.utf8" if platform.system() == "Linux" else "en_US.UTF-8"
env.update({"LC_ALL": locale, "LANG": locale})
bp_line = lines.app_py["bphome"]
bp_var_content = compat.force_str("Flask-Jinja-Test")
with debug.Session() as parent_session:
parent_session.initialize(
start_method=start_method,
target=("module", "flask"),
multiprocess=True,
program_args=["run", "--port", str(FLASK_PORT)],
ignore_unobserved=[Event("stopped")],
debug_options=["Jinja"],
cwd=FLASK1_ROOT,
env=env,
expected_returncode=some.int, # No clean way to kill Flask server
)
bp_line = app_py_lines["bphome"]
bp_var_content = "Flask-Jinja-Test"
parent_session.set_breakpoints(FLASK1_APP, [bp_line])
with debug.Session(start_method) as parent_session:
_initialize_session(parent_session, multiprocess=True)
parent_session.set_breakpoints(paths.app_py, [bp_line])
parent_session.start_debugging()
with parent_session.connect_to_next_child_session() as child_session:
child_session.send_request(
"setBreakpoints",
arguments={
"source": {"path": FLASK1_APP},
"breakpoints": [{"line": bp_line}],
},
).wait_for_response()
with parent_session.attach_to_next_subprocess() as child_session:
child_session.set_breakpoints(paths.app_py, [bp_line])
child_session.start_debugging()
with flask_server:
web_request = flask_server.get("/")
with flask:
home_request = flask.get("/")
child_session.wait_for_stop(
"breakpoint",
expected_frames=[
some.dap.frame(
some.dap.source(paths.app_py),
line=bp_line,
name="home",
),
],
)
hit = child_session.wait_for_stop(reason="breakpoint")
assert hit.frames[0] == {
"id": some.dap_id,
"name": "home",
"source": {
"sourceReference": some.dap_id,
"path": some.path(FLASK1_APP),
},
"line": bp_line,
"column": 1,
}
resp_scopes = child_session.send_request(
"scopes", arguments={"frameId": hit.frames_id}
).wait_for_response()
scopes = resp_scopes.body["scopes"]
assert len(scopes) > 0
resp_variables = child_session.send_request(
"variables",
arguments={"variablesReference": scopes[0]["variablesReference"]},
).wait_for_response()
variables = [
v for v in resp_variables.body["variables"] if v["name"] == "content"
]
assert variables == [
var_content = child_session.get_variable("content")
assert var_content == some.dict.containing(
{
"name": "content",
"type": "str",
@ -318,10 +254,10 @@ def test_flask_breakpoint_multiproc(start_method):
"evaluateName": "content",
"variablesReference": 0,
}
]
)
child_session.request_continue()
assert bp_var_content in web_request.response_text()
assert bp_var_content in home_request.response_text()
child_session.wait_for_termination()
parent_session.wait_for_exit()

View file

@ -7,6 +7,7 @@ from __future__ import absolute_import, print_function, unicode_literals
import contextlib
import pytest
from ptvsd.common import compat
from tests import debug
@ -44,16 +45,20 @@ def test_log_cli(pyfile, tmpdir, start_method, run_as, cli):
def test_log_api(pyfile, tmpdir, run_as):
@pyfile
def code_to_debug():
# import sys
import debug_me # noqa
from debug_me import backchannel, ptvsd
port, log_dir = backchannel.receive()
ptvsd.enable_attach(("localhost", port), log_dir=log_dir)
ptvsd.wait_for_attach()
# import_and_enable_debugger(log_dir=str(sys.argv[1]))
log_dir = compat.filename(tmpdir)
with debug.Session("custom_server") as session:
backchannel = session.setup_backchannel()
@session.before_connect
def before_connect():
backchannel.send([session.ptvsd_port, log_dir])
with debug.Session() as session:
with check_logs(tmpdir, session):
session.program_args += [str(tmpdir)]
session.initialize(
target=(run_as, code_to_debug), start_method="attach_socket_import"
)
session.initialize(target=(run_as, code_to_debug))
session.start_debugging()
session.wait_for_exit()

View file

@ -75,7 +75,6 @@ def test_multiprocessing(pyfile, start_method, run_as):
multiprocess=True,
target=(run_as, code_to_debug),
start_method=start_method,
use_backchannel=True,
)
parent_session.start_debugging()
@ -105,7 +104,7 @@ def test_multiprocessing(pyfile, start_method, run_as):
)
parent_session.proceed()
with parent_session.connect_to_child_session(child_subprocess) as child_session:
with parent_session.attach_to_subprocess(child_subprocess) as child_session:
child_session.start_debugging()
grandchild_subprocess = parent_session.wait_for_next(
@ -128,7 +127,7 @@ def test_multiprocessing(pyfile, start_method, run_as):
)
parent_session.proceed()
with parent_session.connect_to_child_session(
with parent_session.attach_to_subprocess(
grandchild_subprocess
) as grandchild_session:
grandchild_session.start_debugging()
@ -180,7 +179,6 @@ def test_subprocess(pyfile, start_method, run_as):
multiprocess=True,
target=(run_as, parent),
start_method=start_method,
use_backchannel=True,
)
parent_session.start_debugging()
@ -208,7 +206,7 @@ def test_subprocess(pyfile, start_method, run_as):
)
parent_session.proceed()
with parent_session.connect_to_child_session(child_subprocess) as child_session:
with parent_session.attach_to_subprocess(child_subprocess) as child_session:
child_session.start_debugging()
child_argv = parent_backchannel.receive()
@ -256,17 +254,19 @@ def test_autokill(pyfile, start_method, run_as):
multiprocess=True,
target=(run_as, parent),
start_method=start_method,
use_backchannel=True,
)
parent_session.start_debugging()
with parent_session.connect_to_next_child_session() as child_session:
with parent_session.attach_to_next_subprocess() as child_session:
child_session.start_debugging()
if parent_session.start_method == "launch":
# In launch scenario, terminate the parent process by disconnecting from it.
parent_session.expected_returncode = some.int
parent_session.send_request("disconnect")
try:
parent_session.request("disconnect")
except EOFError:
pass
parent_session.wait_for_disconnect()
else:
# In attach scenario, just let the parent process run to completion.
@ -327,7 +327,6 @@ def test_argv_quoting(pyfile, start_method, run_as):
target=(run_as, parent),
start_method=start_method,
program_args=[child],
use_backchannel=True,
)
session.start_debugging()

View file

@ -60,7 +60,7 @@ def test_redirect_output(pyfile, start_method, run_as, redirect):
for i in [111, 222, 333, 444]:
print(i)
print() # @bp1
() # @bp1
with debug.Session() as session:
# By default 'RedirectOutput' is always set. So using this way
@ -77,6 +77,6 @@ def test_redirect_output(pyfile, start_method, run_as, redirect):
session.wait_for_exit()
if redirect:
assert session.output("stdout") == "111\n222\n333\n444\n\n"
assert session.output("stdout") == "111\n222\n333\n444\n"
else:
assert not session.output("stdout")

View file

@ -4,11 +4,8 @@
from __future__ import absolute_import, print_function, unicode_literals
import os
import pytest
import shutil
import sys
import traceback
from tests import debug, test_data
from tests.patterns import some
@ -29,36 +26,39 @@ def test_client_ide_from_path_mapping_linux_backend(
from debug_me import backchannel
import pydevd_file_utils
backchannel.send({"ide_os": pydevd_file_utils._ide_os})
print("done") # @break_here
backchannel.send(pydevd_file_utils._ide_os)
print("done") # @bp
with debug.Session() as session:
with debug.Session(start_method) as session:
backchannel = session.setup_backchannel()
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
use_backchannel=True,
path_mappings=[
{
"localRoot": "C:\\TEMP\\src",
"remoteRoot": os.path.dirname(code_to_debug),
"remoteRoot": code_to_debug.dirname,
}
],
)
if invalid_os_type:
session.debug_options.append("CLIENT_OS_TYPE=INVALID")
session.set_breakpoints(
"c:\\temp\\src\\" + os.path.basename(code_to_debug),
[code_to_debug.lines["break_here"]],
"c:\\temp\\src\\" + code_to_debug.basename,
[code_to_debug.lines["bp"]],
)
session.start_debugging()
hit = session.wait_for_stop("breakpoint")
assert hit.frames[0]["source"]["path"] == "C:\\TEMP\\src\\" + os.path.basename(
code_to_debug
)
json_read = backchannel.receive()
assert json_read == {"ide_os": "WINDOWS"}
assert backchannel.receive() == "WINDOWS"
session.wait_for_stop(
"breakpoint",
expected_frames=[
some.dap.frame(
some.dap.source("C:\\TEMP\src\\" + code_to_debug.basename),
line=code_to_debug.lines["bp"],
),
],
)
session.request_continue()
session.wait_for_exit()
@ -73,33 +73,37 @@ def test_with_dot_remote_root(pyfile, tmpdir, start_method, run_as):
backchannel.send(os.path.abspath(__file__))
print("done") # @bp
path_local = tmpdir.mkdir("local").join("code_to_debug.py").strpath
path_remote = tmpdir.mkdir("remote").join("code_to_debug.py").strpath
path_local = tmpdir.mkdir("local") / "code_to_debug.py"
path_remote = tmpdir.mkdir("remote") / "code_to_debug.py"
dir_local = os.path.dirname(path_local)
dir_remote = os.path.dirname(path_remote)
dir_local = path_local.dirname
dir_remote = path_remote.dirname
shutil.copyfile(code_to_debug, path_local)
shutil.copyfile(code_to_debug, path_remote)
code_to_debug.copy(path_local)
code_to_debug.copy(path_remote)
with debug.Session() as session:
with debug.Session(start_method) as session:
backchannel = session.setup_backchannel()
session.initialize(
target=(run_as, path_remote),
start_method=start_method,
use_backchannel=True,
path_mappings=[{"localRoot": dir_local, "remoteRoot": "."}],
cwd=dir_remote,
path_mappings=[{"localRoot": dir_local, "remoteRoot": "."}],
)
session.set_breakpoints(path_remote, [code_to_debug["bp"]])
session.set_breakpoints(path_local, all)
session.start_debugging()
hit = session.wait_for_stop("breakpoint")
print("Local Path: " + path_local)
print("Frames: " + str(hit.frames))
assert hit.frames[0]["source"]["path"] == some.path(path_local)
remote_code_path = backchannel.receive()
assert path_remote == some.path(remote_code_path)
actual_path_remote = backchannel.receive()
assert some.path(actual_path_remote) == path_remote
session.wait_for_stop(
"breakpoint",
expected_frames=[
some.dap.frame(
some.dap.source(path_local),
line="bp",
),
],
)
session.request_continue()
session.wait_for_exit()
@ -112,67 +116,79 @@ def test_with_path_mappings(pyfile, tmpdir, start_method, run_as):
import os
import sys
json = backchannel.receive()
call_me_back_dir = json["call_me_back_dir"]
sys.path.append(call_me_back_dir)
backchannel.send(os.path.abspath(__file__))
call_me_back_dir = backchannel.receive()
sys.path.insert(0, call_me_back_dir)
import call_me_back
def call_func():
print("break here") # @bp
backchannel.send(os.path.abspath(__file__))
call_me_back.call_me_back(call_func)
call_me_back.call_me_back(call_func) # @call_me_back
print("done")
path_local = tmpdir.mkdir("local").join("code_to_debug.py").strpath
path_remote = tmpdir.mkdir("remote").join("code_to_debug.py").strpath
dir_local = tmpdir.mkdir("local")
dir_remote = tmpdir.mkdir("remote")
dir_local = os.path.dirname(path_local)
dir_remote = os.path.dirname(path_remote)
path_local = dir_local / "code_to_debug.py"
path_remote = dir_remote / "code_to_debug.py"
shutil.copyfile(code_to_debug, path_local)
shutil.copyfile(code_to_debug, path_remote)
code_to_debug.copy(path_local)
code_to_debug.copy(path_remote)
call_me_back_dir = test_data / "call_me_back"
call_me_back_py = call_me_back_dir / "call_me_back.py"
with debug.Session() as session:
with debug.Session(start_method) as session:
backchannel = session.setup_backchannel()
session.initialize(
target=(run_as, path_remote),
start_method=start_method,
use_backchannel=True,
path_mappings=[{"localRoot": dir_local, "remoteRoot": dir_remote}],
)
session.set_breakpoints(path_remote, [code_to_debug.lines["bp"]])
session.set_breakpoints(path_local, ["bp"])
session.start_debugging()
backchannel.send({"call_me_back_dir": call_me_back_dir})
hit = session.wait_for_stop("breakpoint")
assert hit.frames[0]["source"]["path"] == some.path(path_local)
source_reference = hit.frames[0]["source"]["sourceReference"]
assert source_reference == 0 # Mapped files should be found locally.
actual_path_remote = backchannel.receive()
assert some.path(actual_path_remote) == path_remote
backchannel.send(call_me_back_dir)
assert hit.frames[1]["source"]["path"].endswith("call_me_back.py")
source_reference = hit.frames[1]["source"]["sourceReference"]
assert source_reference > 0 # Unmapped file should have a source reference.
stop = session.wait_for_stop(
"breakpoint",
expected_frames=[
some.dap.frame(
# Mapped files should not have a sourceReference, so that the IDE
# doesn't try to fetch them instead of opening the local file.
some.dap.source(path_local, sourceReference=0),
line="bp",
),
some.dap.frame(
# Unmapped files should have a sourceReference, since there's no
# local file for the IDE to open.
some.dap.source(call_me_back_py, sourceReference=some.int.not_equal_to(0)),
line="callback",
resp_source = session.send_request(
"source", arguments={"sourceReference": 0}
).wait_for_response(raise_if_failed=False)
assert not resp_source.success
text = "".join(
traceback.format_exception_only(type(resp_source.body), resp_source.body)
),
some.dap.frame(
# Mapped files should not have a sourceReference, so that the IDE
# doesn't try to fetch them instead of opening the local file.
some.dap.source(path_local, sourceReference=0),
line="call_me_back",
),
],
)
assert "Source unavailable" in text
resp_source = session.send_request(
"source", arguments={"sourceReference": source_reference}
).wait_for_response()
assert "def call_me_back(callback):" in (resp_source.body["content"])
srcref = stop.frames[1]["source"]["sourceReference"]
remote_code_path = backchannel.receive()
assert path_remote == some.path(remote_code_path)
try:
session.request("source", {"sourceReference": 0})
except Exception as ex:
assert "Source unavailable" in str(ex)
else:
pytest.fail("sourceReference=0 should not be valid")
source = session.request("source", {"sourceReference": srcref})
assert "def call_me_back(callback):" in source["content"]
session.request_continue()
session.wait_for_exit()

View file

@ -37,7 +37,7 @@ def test_run(pyfile, start_method, run_as):
expected_name = (
"-c"
if run_as == "code"
else some.str.matching(re.escape(code_to_debug.strpath) + r"(c|o)?$")
else some.str.matching(re.escape(code_to_debug.strpath) + r"(c|o)?")
)
assert process_event == Event(
"process", some.dict.containing({"name": expected_name})
@ -47,7 +47,7 @@ def test_run(pyfile, start_method, run_as):
expected_ptvsd_path = path.abspath(ptvsd.__file__)
backchannel.expect(some.str.matching(
re.escape(expected_ptvsd_path) + r"(c|o)?$"
re.escape(expected_ptvsd_path) + r"(c|o)?"
))
session.wait_for_exit()
@ -80,9 +80,7 @@ def test_nodebug(pyfile, run_as):
backchannel = session.setup_backchannel()
session.initialize(target=(run_as, code_to_debug))
breakpoints = session.set_breakpoints(
code_to_debug, [code_to_debug.lines["bp1"], code_to_debug.lines["bp2"]]
)
breakpoints = session.set_breakpoints(code_to_debug, all)
assert breakpoints == [{"verified": False}, {"verified": False}]
session.start_debugging()
@ -113,16 +111,17 @@ def test_run_vs(pyfile, run_as):
import ptvsd.debugger
args = tuple(backchannel.receive())
print("debug{0!r}".format(args))
ptvsd.debugger.debug(*args)
filename = "code_to_debug" if run_as == "module" else code_to_debug
with debug.Session("custom_client") as session:
backchannel = session.setup_backchannel()
session.before_connect = lambda: backchannel.send(
[filename, session.ptvsd_port, None, None, run_as]
)
@session.before_connect
def before_connect():
backchannel.send(
[filename, session.ptvsd_port, None, None, run_as]
)
session.initialize(target=("file", ptvsd_launcher))
session.start_debugging()

View file

@ -22,7 +22,6 @@ def test_set_expression(pyfile, start_method, run_as):
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
use_backchannel=True,
)
session.start_debugging()
hit = session.wait_for_stop()

View file

@ -17,30 +17,27 @@ from tests.patterns import some
sys.version_info < (3, 0) and platform.system() == "Windows",
reason="On Windows + Python 2, unable to send key strokes to test.",
)
@pytest.mark.skip("https://github.com/microsoft/ptvsd/issues/1571")
def test_wait_on_normal_exit_enabled(pyfile, start_method, run_as):
@pyfile
def code_to_debug():
from debug_me import backchannel, ptvsd
from debug_me import ptvsd
ptvsd.break_into_debugger()
backchannel.send("done")
print() # line on which it'll actually break
with debug.Session() as session:
backchannel = session.setup_backchannel()
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
debug_options=["WaitOnNormalExit"],
use_backchannel=True,
expected_returncode=some.int,
)
session.start_debugging()
session.wait_for_stop()
session.request_continue()
session.expected_returncode = some.int
assert backchannel.receive() == "done"
session.process.stdin.write(b" \r\n")
session.wait_for_exit()
@ -52,6 +49,7 @@ def test_wait_on_normal_exit_enabled(pyfile, start_method, run_as):
sys.version_info < (3, 0) and platform.system() == "Windows",
reason="On Windows + Python 2, unable to send key strokes to test.",
)
@pytest.mark.skip("https://github.com/microsoft/ptvsd/issues/1571")
def test_wait_on_abnormal_exit_enabled(pyfile, start_method, run_as):
@pyfile
def code_to_debug():
@ -68,14 +66,13 @@ def test_wait_on_abnormal_exit_enabled(pyfile, start_method, run_as):
target=(run_as, code_to_debug),
start_method=start_method,
debug_options=["WaitOnAbnormalExit"],
use_backchannel=True,
expected_returncode=some.int,
)
session.start_debugging()
session.wait_for_stop()
session.request_continue()
session.expected_returncode = some.int
assert backchannel.receive() == "done"
session.process.stdin.write(b" \r\n")
@ -99,7 +96,6 @@ def test_exit_normally_with_wait_on_abnormal_exit_enabled(pyfile, start_method,
target=(run_as, code_to_debug),
start_method=start_method,
debug_options=["WaitOnAbnormalExit"],
use_backchannel=True,
)
session.start_debugging()
@ -107,7 +103,5 @@ def test_exit_normally_with_wait_on_abnormal_exit_enabled(pyfile, start_method,
session.request_continue()
session.wait_for_termination()
assert backchannel.receive() == "done"
session.wait_for_exit()

View file

@ -25,7 +25,6 @@ def test_stop_on_entry(pyfile, start_method, run_as, with_bp):
target=(run_as, code_to_debug),
start_method=start_method,
debug_options=["StopOnEntry"],
use_backchannel=True,
)
if bool(with_bp):
session.set_breakpoints(code_to_debug, [code_to_debug.lines["bp"]])

View file

@ -1,2 +1,2 @@
def call_me_back(callback):
callback()
callback() # @callback

View file

@ -47,7 +47,7 @@ def home(request):
def bad_route_handled(request):
try:
raise ArithmeticError('Hello')
raise ArithmeticError('Hello') # @exc_handled
except Exception:
pass
title = 'hello'
@ -61,7 +61,7 @@ def bad_route_handled(request):
def bad_route_unhandled(request):
raise ArithmeticError('Hello')
raise ArithmeticError('Hello') # @exc_unhandled
title = 'hello'
content = 'Django-Django-Test'
template = loader.get_template('hello.html')

View file

@ -19,7 +19,7 @@ def home():
@app.route("/handled")
def bad_route_handled():
try:
raise ArithmeticError('Hello')
raise ArithmeticError('Hello') # @exc_handled
except Exception:
pass
return render_template(
@ -31,7 +31,7 @@ def bad_route_handled():
@app.route("/unhandled")
def bad_route_unhandled():
raise ArithmeticError('Hello')
raise ArithmeticError('Hello') # @exc_unhandled
return render_template(
"hello.html",
title='Hello',

View file

@ -39,6 +39,9 @@ def test_value(x):
log_repr(some.object.equal_to(x))
assert x == some.object.equal_to(x)
log_repr(some.object.not_equal_to(x))
assert x != some.object.not_equal_to(x)
log_repr(some.object.same_as(x))
assert x == some.object.same_as(x)
@ -68,6 +71,11 @@ def test_equal():
assert NAN != some.object.equal_to(NAN)
def test_not_equal():
assert 123.0 != some.object.not_equal_to(123)
assert NAN == some.object.not_equal_to(NAN)
def test_same():
assert 123.0 != some.object.same_as(123)
assert NAN == some.object.same_as(NAN)
@ -131,6 +139,58 @@ def test_matching():
assert pattern != b"abbbc"
def test_starting_with():
pattern = some.str.starting_with("aa")
log_repr(pattern)
assert pattern == "aabbbb"
assert pattern != "bbbbaa"
assert pattern != "bbaabb"
assert pattern != "ababab"
pattern = some.bytes.starting_with(b"aa")
log_repr(pattern)
assert pattern == b"aabbbb"
assert pattern != b"bbbbaa"
assert pattern != b"bbaabb"
assert pattern != b"ababab"
def test_ending_with():
pattern = some.str.ending_with("aa")
log_repr(pattern)
assert pattern == "bbbbaa"
assert pattern == "bb\nbb\naa"
assert pattern != "aabbbb"
assert pattern != "bbaabb"
assert pattern != "ababab"
pattern = some.bytes.ending_with(b"aa")
log_repr(pattern)
assert pattern == b"bbbbaa"
assert pattern == b"bb\nbb\naa"
assert pattern != b"aabbbb"
assert pattern != b"bbaabb"
assert pattern != b"ababab"
def test_containing():
pattern = some.str.containing("aa")
log_repr(pattern)
assert pattern == "aabbbb"
assert pattern == "bbbbaa"
assert pattern == "bbaabb"
assert pattern == "bb\naa\nbb"
assert pattern != "ababab"
pattern = some.bytes.containing(b"aa")
log_repr(pattern)
assert pattern == b"aabbbb"
assert pattern == b"bbbbaa"
assert pattern == b"bbaabb"
assert pattern == b"bb\naa\nbb"
assert pattern != b"ababab"
def test_list():
assert [1, 2, 3] == [1, some.thing, 3]
assert [1, 2, 3, 4] != [1, some.thing, 4]

View file

@ -8,6 +8,7 @@ import pytest
import threading
import time
from ptvsd.common import log
from tests.patterns import some
from tests.timeline import Timeline, Mark, Event, Request, Response
@ -31,6 +32,7 @@ def make_timeline(request):
return timeline, initial_history
yield factory
log.newline()
try:
failed = request.node.call_result.failed

View file

@ -6,7 +6,6 @@ from __future__ import absolute_import, print_function, unicode_literals
import contextlib
import itertools
import pytest
import threading
from ptvsd.common import fmt, log, timestamp
@ -232,7 +231,7 @@ class Timeline(object):
log.info('No matching {0!r}', expectation)
occurrences = list(first.and_following())
log.info("Occurrences considered: {0!r}", occurrences)
pytest.fail("Expectation not matched")
raise AssertionError("Expectation not matched")
occs = tuple(reasons.values())
assert occs

View file

@ -5,4 +5,4 @@ envlist = py{27,34,35,36,37}
deps = -rtests/requirements.txt
passenv = PTVSD_LOG_DIR
commands =
pytest {posargs:-n32}
pytest {posargs:-n8}