Fix #1585: Output tests fail sporadically

Wait on a breakpoint before checking output.

Add temp workaround for #1574: Flask tests fail with "no such option: --wait" on windows py27

Pin Flask version to last known good one in tests/requirements.txt.

Other fixes and improvements:

Make start_method mandatory for debug.Session to avoid problems with tests forgetting to specify it.

Add debug_me.scratchpad to enable async communication between test code and debuggee.

Improve debug.Session logging.

Improve test_attach.
This commit is contained in:
Pavel Minaev 2019-07-14 00:52:08 -07:00 committed by Pavel Minaev
parent edd5753d7e
commit 4cd1d4163f
24 changed files with 285 additions and 274 deletions

View file

@ -47,3 +47,10 @@ if _code:
_code = compile(_code, "<PTVSD_DEBUG_ME>", "exec")
eval(_code, {})
# For non-blocking communication between the test and the debuggee. The debuggee
# can access this as a normal dict - scratchpad["foo"] etc. The test should assign
# to session.scratchpad[...], which will automatically perform "evaluate" requests
# as needed to assign the value.
scratchpad = {}

View file

@ -68,22 +68,20 @@ class Session(object):
_counter = itertools.count(1)
def __init__(self, start_method='launch', ptvsd_port=None, pid=None):
def __init__(self, start_method, ptvsd_port=None, pid=None):
assert start_method in self.START_METHODS
assert ptvsd_port is None or start_method.startswith('attach_socket_')
self.id = next(self._counter)
log.info('Starting debug session {0} via {1!r}', self.id, start_method)
log.info('New debug session {0}; will be started via {1!r}.', self, start_method)
self.lock = threading.RLock()
self.target = ('code', 'print("OK")')
self.target = None
self.start_method = start_method
self.start_method_args = {}
self.no_debug = False
self.ptvsd_port = ptvsd_port or PTVSD_PORT
self.multiprocess = False
self.multiprocess_port_range = None
self.debug_options = ['RedirectOutput']
self.debug_options = {'RedirectOutput'}
self.path_mappings = []
self.success_exitcodes = None
self.rules = []
@ -107,6 +105,7 @@ class Session(object):
self.server_socket = None
self.connected = threading.Event()
self.backchannel = None
self.scratchpad = ScratchPad(self)
self.capture_output = True
self.captured_output = CapturedOutput(self)
@ -169,6 +168,7 @@ class Session(object):
def close(self):
with self.lock:
if self.socket:
log.debug('Closing socket to {0}...', self)
try:
self.socket.shutdown(socket.SHUT_RDWR)
except Exception:
@ -181,6 +181,7 @@ class Session(object):
log.debug('Closed socket to {0}', self)
if self.server_socket:
log.debug('Closing server socket for {0}...', self)
try:
self.server_socket.shutdown(socket.SHUT_RDWR)
except Exception:
@ -198,6 +199,7 @@ class Session(object):
if self.process:
if self.kill_ptvsd:
log.info('Killing {0} (pid={1}) process tree...', self, self.pid)
try:
self._kill_process_tree()
except Exception:
@ -205,6 +207,7 @@ class Session(object):
log.info('Killed {0} (pid={1}) process tree', self, self.pid)
# Clean up pipes to avoid leaking OS handles.
log.debug('Closing stdio pipes of {0}...', self)
try:
self.process.stdin.close()
except Exception:
@ -217,9 +220,13 @@ class Session(object):
self.process.stderr.close()
except Exception:
pass
log.debug('Closed stdio pipes of {0}', self)
log.debug('Waiting for remaining captured output of {0}...', self)
self.captured_output.wait()
log.info('{0} closed', self)
def _get_argv_for_attach_using_import(self):
argv = [sys.executable]
return argv
@ -301,8 +308,8 @@ class Session(object):
self.env.update(kwargs.pop('env', {}))
self.start_method_args.update(kwargs.pop('args', {}))
self.debug_options |= set(kwargs.pop('debug_options', []))
self.path_mappings += kwargs.pop('path_mappings', [])
self.debug_options += kwargs.pop('debug_options', [])
self.program_args += kwargs.pop('program_args', [])
self.rules += kwargs.pop('rules', [])
@ -341,7 +348,13 @@ class Session(object):
self._setup_session(**kwargs)
start_method = self.start_method
log.info('Initializing debug session for {0}', self)
log.debug('Initializing debug session for {0}', self)
if self.ignore_unobserved:
log.info(
"Will not complain about unobserved:\n\n{0}",
"\n\n".join(repr(exp) for exp in self.ignore_unobserved)
)
dbg_argv = []
usr_argv = []
@ -384,9 +397,6 @@ class Session(object):
else:
dbg_argv += list(self.program_args)
if self.multiprocess and 'Multiprocess' not in self.debug_options:
self.debug_options += ['Multiprocess']
if self.backchannel:
self.backchannel.listen()
self.env['PTVSD_BACKCHANNEL_PORT'] = str(self.backchannel.port)
@ -432,13 +442,14 @@ class Session(object):
spawn_args = [make_filename(s) for s in spawn_args]
log.info('Spawning {0}:\n\n{1}', self, "\n".join((repr(s) for s in spawn_args)))
stdio = {}
if self.capture_output:
stdio["stdin"] = stdio["stdout"] = stdio["stderr"] = subprocess.PIPE
self.process = subprocess.Popen(
spawn_args,
env=env,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=cwd,
**stdio
)
self.pid = self.process.pid
self.psutil_process = psutil.Process(self.pid)
@ -645,7 +656,12 @@ class Session(object):
return request
def request(self, *args, **kwargs):
return self.send_request(*args, **kwargs).wait_for_response().body
freeze = kwargs.pop("freeze", True)
raise_if_failed = kwargs.pop("raise_if_failed", True)
return self.send_request(*args, **kwargs).wait_for_response(
freeze=freeze,
raise_if_failed=raise_if_failed,
).body
def handshake(self):
"""Performs the handshake that establishes the debug session ('initialized'
@ -662,7 +678,7 @@ class Session(object):
request = 'launch' if self.start_method == 'launch' else 'attach'
self.start_method_args.update({
'debugOptions': self.debug_options,
'debugOptions': list(self.debug_options),
'pathMappings': self.path_mappings,
'rules': self.rules,
})
@ -775,7 +791,10 @@ class Session(object):
return StopInfo(stopped, frames, tid, fid)
def request_continue(self):
self.send_request('continue').wait_for_response(freeze=False)
self.request('continue', freeze=False)
def request_disconnect(self):
self.request('disconnect', freeze=False)
def set_breakpoints(self, path, lines):
"""Sets breakpoints in the specified file, and returns the list of all the
@ -879,7 +898,7 @@ class Session(object):
child_port = ptvsd_subprocess.body['port']
assert child_port != 0
child_session = Session(start_method='attach_socket_cmdline', ptvsd_port=child_port)
child_session = Session('attach_socket_cmdline', ptvsd_port=child_port)
try:
child_session.ignore_unobserved = self.ignore_unobserved
child_session.debug_options = self.debug_options
@ -906,12 +925,12 @@ class Session(object):
assert self.start_method.startswith("attach_socket_")
ns = Session(start_method='attach_socket_import', ptvsd_port=self.ptvsd_port)
ns = Session('attach_socket_import', ptvsd_port=self.ptvsd_port)
try:
ns._setup_session(**kwargs)
ns.ignore_unobserved = self.ignore_unobserved
ns.debug_options = self.debug_options
ns.rules = self.rules
ns.ignore_unobserved = list(self.ignore_unobserved)
ns.debug_options = set(self.debug_options)
ns.rules = list(self.rules)
ns.pid = self.pid
ns.process = self.process
@ -1106,3 +1125,39 @@ class BackChannel(object):
pass
self._server_socket = None
log.debug('Closed server socket for {0} to {1}', self, self.session)
class ScratchPad(object):
def __init__(self, session):
self.session = session
def __getitem__(self, key):
raise NotImplementedError
def __setitem__(self, key, value):
"""Sets debug_me.scratchpad[key] = value inside the debugged process.
"""
stackTrace_responses = self.session.all_occurrences_of(
Response(Request("stackTrace"))
)
assert stackTrace_responses, (
'scratchpad requires at least one "stackTrace" request in the timeline.'
)
stack_trace = stackTrace_responses[-1].body
frame_id = stack_trace["stackFrames"][0]["id"]
log.info("{0} debug_me.scratchpad[{1!r}] = {2!r}", self.session, key, value)
expr = fmt(
"__import__('debug_me').scratchpad[{0!r}] = {1!r}",
key,
value,
)
self.session.request(
"evaluate",
{
"frameId": frame_id,
"context": "repl",
"expression": expr,
},
)

View file

@ -7,25 +7,26 @@ from __future__ import absolute_import, print_function, unicode_literals
import pytest
from tests import debug
from tests.patterns import some
@pytest.mark.parametrize("run_as", ["file", "module", "code"])
def test_args(pyfile, start_method, run_as):
@pyfile
def code_to_debug():
import debug_me # noqa
from debug_me import backchannel
import sys
backchannel.send(sys.argv)
print(sys.argv)
assert sys.argv[1] == "--arg1"
assert sys.argv[2] == "arg2"
assert sys.argv[3] == "-arg3"
args = ["--arg1", "arg2", "-arg3"]
with debug.Session() as session:
with debug.Session(start_method) as session:
backchannel = session.setup_backchannel()
session.initialize(
target=(run_as, code_to_debug), start_method=start_method, program_args=args
target=(run_as, code_to_debug),
program_args=["--arg1", "arg2", "-arg3", "--", "arg4", "-a"]
)
session.start_debugging()
argv = backchannel.receive()
assert argv == [some.str] + session.program_args
session.wait_for_exit()

View file

@ -6,55 +6,47 @@ from __future__ import absolute_import, print_function, unicode_literals
import pytest
from tests import code, debug, test_data
from tests import debug, test_data
from tests.patterns import some
from tests.timeline import Event
@pytest.mark.parametrize("wait_for_attach", ["waitOn", "waitOff"])
@pytest.mark.parametrize("is_attached", ["attachCheckOn", "attachCheckOff"])
@pytest.mark.parametrize("break_into", ["break", "pause"])
def test_attach(run_as, wait_for_attach, is_attached, break_into):
@pytest.mark.parametrize("wait_for_attach", ["wait_for_attach", ""])
@pytest.mark.parametrize("is_attached", ["is_attached", ""])
@pytest.mark.parametrize("break_into_debugger", ["break_into_debugger", ""])
def test_attach(run_as, wait_for_attach, is_attached, break_into_debugger):
attach1_py = test_data / "attach" / "attach1.py"
lines = code.get_marked_line_numbers(attach1_py)
with debug.Session() as session:
env = {
"PTVSD_TEST_HOST": "localhost",
"PTVSD_TEST_PORT": str(session.ptvsd_port),
}
if wait_for_attach == "waitOn":
env["PTVSD_WAIT_FOR_ATTACH"] = "1"
if is_attached == "attachCheckOn":
env["PTVSD_IS_ATTACHED"] = "1"
if break_into == "break":
env["PTVSD_BREAK_INTO_DBG"] = "1"
with debug.Session("custom_server") as session:
session.env.update({
"ATTACH1_TEST_PORT": str(session.ptvsd_port),
"ATTACH1_WAIT_FOR_ATTACH": "1" if wait_for_attach else "0",
"ATTACH1_IS_ATTACHED": "1" if is_attached else "0",
"ATTACH1_BREAK_INTO_DEBUGGER": "1" if break_into_debugger else "0",
})
backchannel = session.setup_backchannel()
session.initialize(
target=(run_as, attach1_py),
start_method="launch",
env=env,
)
session.initialize(target=(run_as, attach1_py))
session.start_debugging()
if wait_for_attach == "waitOn":
if wait_for_attach:
assert backchannel.receive() == "wait_for_attach"
if is_attached == "attachCheckOn":
if is_attached:
assert backchannel.receive() == "is_attached"
if break_into == "break":
assert backchannel.receive() == "break_into_debugger"
hit = session.wait_for_stop()
assert lines["bp"] == hit.frames[0]["line"]
if break_into_debugger:
assert backchannel.receive() == "break_into_debugger?"
backchannel.send("proceed")
session.wait_for_stop(expected_frames=[
some.dap.frame(attach1_py, "break_into_debugger")
])
else:
# pause test
backchannel.send("pause_test")
session.send_request("pause").wait_for_response(freeze=False)
hit = session.wait_for_stop(reason="pause")
# Note: no longer asserting line as it can even stop on different files
# (such as as backchannel.py).
# assert hit.frames[0]['line'] in [27, 28, 29]
assert backchannel.receive() == "loop?"
backchannel.send("proceed")
session.request("pause", freeze=False)
session.wait_for_stop("pause")
session.scratchpad["paused"] = True
session.request_continue()
session.wait_for_exit()
@ -63,41 +55,43 @@ def test_attach(run_as, wait_for_attach, is_attached, break_into):
@pytest.mark.parametrize(
"start_method", ["attach_socket_cmdline", "attach_socket_import"]
)
# @pytest.mark.skip(reason="Test fails often in local runs. Uncomment as needed.")
def test_reattach(pyfile, start_method, run_as):
@pyfile
def code_to_debug():
from debug_me import backchannel, ptvsd
from debug_me import ptvsd, scratchpad
import time
ptvsd.break_into_debugger()
print("first") # @first
backchannel.send("continued")
for _ in range(0, 100):
object() # @first
scratchpad["exit"] = False
while not scratchpad["exit"]:
time.sleep(0.1)
ptvsd.break_into_debugger()
print("second") # @second
object() # @second
with debug.Session() as session:
backchannel = session.setup_backchannel()
with debug.Session(start_method) as session:
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
kill_ptvsd=False,
capture_output=False,
)
session.start_debugging()
hit = session.wait_for_stop()
assert code_to_debug.lines["first"] == hit.frames[0]["line"]
session.send_request("disconnect").wait_for_response(freeze=False)
session.wait_for_stop(expected_frames=[
some.dap.frame(code_to_debug, "first"),
])
session.request_disconnect()
session.wait_for_disconnect()
assert backchannel.receive() == "continued"
with session.reattach(target=(run_as, code_to_debug)) as session2:
session2.start_debugging()
hit = session2.wait_for_stop()
assert code_to_debug.lines["second"] == hit.frames[0]["line"]
session2.send_request("disconnect").wait_for_response(freeze=False)
session2.wait_for_disconnect()
session2.wait_for_stop(expected_frames=[
some.dap.frame(code_to_debug, "second"),
])
session.scratchpad["exit"] = True
session.request_disconnect()
session.wait_for_disconnect()
@pytest.mark.parametrize("start_method", ["attach_pid"])
@ -111,22 +105,25 @@ def test_attaching_by_pid(pyfile, run_as, start_method):
def do_something(i):
time.sleep(0.1)
print(i) # @break
print(i) # @bp
for i in range(100):
do_something(i)
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
session.set_breakpoints(code_to_debug, [code_to_debug.lines["break"]])
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
session.set_breakpoints(code_to_debug, all)
session.start_debugging()
hit = session.wait_for_stop()
assert code_to_debug.lines["break"] == hit.frames[0]["line"]
# remove breakpoint and continue
session.wait_for_stop(expected_frames=[
some.dap.frame(code_to_debug, "bp"),
])
# Remove breakpoint and continue.
session.set_breakpoints(code_to_debug, [])
session.request_continue()
session.wait_for_next(
Event("output", some.dict.containing({"category": "stdout"}))
)
session.wait_for_exit()

View file

@ -20,8 +20,8 @@ def test_with_wait_for_attach(pyfile, start_method, run_as):
ptvsd.break_into_debugger()
print("break here") # @break
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
session.start_debugging()
hit = session.wait_for_stop()
assert hit.frames[0]["line"] == code_to_debug.lines["break"]
@ -43,8 +43,8 @@ def test_breakpoint_function(pyfile, start_method, run_as):
breakpoint() # noqa
print("break here") # @break
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
session.start_debugging()
hit = session.wait_for_stop()
path = hit.frames[0]["source"]["path"]

View file

@ -53,10 +53,9 @@ def test_completions_scope(pyfile, bp_label, start_method, run_as):
expected = expected_at_line[bp_label]
with debug.Session() as session:
with debug.Session(start_method) as session:
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
ignore_unobserved=[Event("stopped")],
)
@ -88,8 +87,8 @@ def test_completions_cases(pyfile, start_method, run_as):
c = 3
print([a, b, c]) # @break
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
session.set_breakpoints(code_to_debug, [code_to_debug.lines["break"]])
session.start_debugging()
hit = session.wait_for_stop()

View file

@ -22,11 +22,10 @@ def test_continue_on_disconnect_for_attach(pyfile, start_method, run_as):
backchannel.send("continued") # @bp
with debug.Session() as session:
with debug.Session(start_method) as session:
backchannel = session.setup_backchannel()
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
ignore_unobserved=[Event("exited"), Event("terminated")],
)
session.set_breakpoints(code_to_debug, [code_to_debug.lines["bp"]])
@ -51,10 +50,9 @@ def test_exit_on_disconnect_for_launch(pyfile, start_method, run_as):
with open(fp, "w") as f:
print("Should not continue after disconnect on launch", file=f)
with debug.Session() as session:
with debug.Session(start_method) as session:
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
expected_returncode=some.int,
)
session.set_breakpoints(code_to_debug, code_to_debug.lines["bp"])

View file

@ -29,22 +29,17 @@ class lines:
def _initialize_session(session, multiprocess=False):
program_args = [
"runserver",
"--",
str(django.port),
]
session.program_args = ["runserver", "--", str(django.port)]
if not multiprocess:
program_args[1:1] = [
"--noreload",
]
session.program_args[1:1] = ["--noreload"]
session.debug_options |= {"Django"}
if multiprocess:
session.debug_options |= {"Multiprocess"}
session.initialize(
target=("file", paths.app_py),
program_args=program_args,
debug_options=["Django"],
cwd=paths.django1,
multiprocess=multiprocess,
expected_returncode=some.int, # No clean way to kill Django server
)

View file

@ -20,8 +20,8 @@ def test_variables_and_evaluate(pyfile, start_method, run_as):
c = 3
print([a, b, c]) # @bp
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
session.set_breakpoints(code_to_debug, [code_to_debug.lines["bp"]])
session.start_debugging()
hit = session.wait_for_stop()
@ -112,12 +112,9 @@ def test_set_variable(pyfile, start_method, run_as):
ptvsd.break_into_debugger()
backchannel.send(a)
with debug.Session() as session:
with debug.Session(start_method) as session:
backchannel = session.setup_backchannel()
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
)
session.initialize(target=(run_as, code_to_debug))
session.start_debugging()
hit = session.wait_for_stop()
@ -182,8 +179,8 @@ def test_variable_sort(pyfile, start_method, run_as):
d = 3 # noqa
print("done") # @bp
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
session.set_breakpoints(code_to_debug, [code_to_debug.lines["bp"]])
session.start_debugging()
hit = session.wait_for_stop()
@ -289,11 +286,10 @@ def test_return_values(pyfile, start_method, run_as):
}
)
with debug.Session() as session:
with debug.Session(start_method) as session:
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
debug_options=["ShowReturnValue"],
debug_options={"ShowReturnValue"},
)
session.set_breakpoints(code_to_debug, [code_to_debug.lines["bp"]])
session.start_debugging()
@ -354,8 +350,8 @@ def test_unicode(pyfile, start_method, run_as):
ptvsd.break_into_debugger()
print("break")
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
session.start_debugging()
hit = session.wait_for_stop()
@ -385,8 +381,8 @@ def test_hex_numbers(pyfile, start_method, run_as):
d = {(1, 10, 100): (10000, 100000, 100000)}
print((a, b, c, d)) # @bp
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
session.set_breakpoints(code_to_debug, [code_to_debug.lines["bp"]])
session.start_debugging()
hit = session.wait_for_stop()

View file

@ -35,8 +35,8 @@ def test_vsc_exception_options_raise_with_except(
filters += ["raised"] if raised == "raisedOn" else []
filters += ["uncaught"] if uncaught == "uncaughtOn" else []
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
session.request("setExceptionBreakpoints", {"filters": filters})
session.start_debugging()
@ -93,10 +93,9 @@ def test_vsc_exception_options_raise_without_except(
filters = []
filters += ["raised"] if raised == "raisedOn" else []
filters += ["uncaught"] if uncaught == "uncaughtOn" else []
with debug.Session() as session:
with debug.Session(start_method) as session:
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
ignore_unobserved=[Event("stopped")],
expected_returncode=some.int,
)
@ -193,13 +192,12 @@ def test_systemexit(pyfile, start_method, run_as, raised, uncaught, zero, exit_c
if uncaught:
filters += ["uncaught"]
with debug.Session() as session:
with debug.Session(start_method) as session:
session.program_args = [repr(exit_code)]
if zero:
session.debug_options += ["BreakOnSystemExitZero"]
session.debug_options |= {"BreakOnSystemExitZero"}
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
expected_returncode=some.int,
)
session.send_request(
@ -287,10 +285,9 @@ def test_raise_exception_options(pyfile, start_method, run_as, exceptions, break
except IndexError:
pass
with debug.Session() as session:
with debug.Session(start_method) as session:
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
ignore_unobserved=[Event("stopped")],
expected_returncode=some.int,
)
@ -331,12 +328,11 @@ def test_success_exitcodes(pyfile, start_method, run_as, exit_code):
print("sys.exit(%r)" % (exit_code,))
sys.exit(exit_code)
with debug.Session() as session:
with debug.Session(start_method) as session:
session.program_args = [repr(exit_code)]
session.success_exitcodes = [3]
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
expected_returncode=exit_code,
)
session.send_request(
@ -382,10 +378,9 @@ def test_exception_stack(pyfile, start_method, run_as, max_frames):
max_expected_lines = 21
args = {"maxExceptionStackFrames": 10}
with debug.Session() as session:
with debug.Session(start_method) as session:
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
expected_returncode=some.int,
args=args,
)

View file

@ -29,34 +29,26 @@ class lines:
def _initialize_session(session, multiprocess=False):
env = {
session.env.update({
"FLASK_APP": paths.app_py,
"FLASK_ENV": "development",
"FLASK_DEBUG": "1" if multiprocess else "0",
}
})
if platform.system() != "Windows":
locale = "en_US.utf8" if platform.system() == "Linux" else "en_US.UTF-8"
env.update({"LC_ALL": locale, "LANG": locale})
session.env.update({"LC_ALL": locale, "LANG": locale})
program_args = [
"run",
"--port",
str(flask.port),
]
session.program_args = ["run", "--port", str(flask.port)]
if not multiprocess:
program_args[1:1] = [
"--no-debugger",
"--no-reload",
"--with-threads",
]
session.program_args[1:1] = ["--no-debugger", "--no-reload", "--with-threads"]
session.debug_options |= {"Jinja"}
if multiprocess:
session.debug_options |= {"Multiprocess"}
session.initialize(
target=("module", "flask"),
program_args=program_args,
debug_options=["Jinja"],
cwd=paths.flask1,
env=env,
multiprocess=multiprocess,
expected_returncode=some.int, # No clean way to kill Flask server
)

View file

@ -10,7 +10,7 @@ from tests import debug
from tests.patterns import some
@pytest.mark.parametrize("jmc", ["jmcOn", "jmcOff"])
@pytest.mark.parametrize("jmc", ["jmc", ""])
def test_justmycode_frames(pyfile, start_method, run_as, jmc):
@pyfile
def code_to_debug():
@ -18,15 +18,12 @@ def test_justmycode_frames(pyfile, start_method, run_as, jmc):
print("break here") # @bp
with debug.Session() as session:
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
debug_options=[] if jmc == "jmcOn" else ["DebugStdLib"],
)
with debug.Session(start_method) as session:
if not jmc:
session.debug_options |= {"DebugStdLib"}
session.initialize(target=(run_as, code_to_debug))
bp_line = code_to_debug.lines["bp"]
actual_bps = session.set_breakpoints(code_to_debug, [bp_line])
actual_bps = [bp["line"] for bp in actual_bps]
session.start_debugging()
@ -39,7 +36,7 @@ def test_justmycode_frames(pyfile, start_method, run_as, jmc):
}
)
if jmc == "jmcOn":
if jmc:
assert len(hit.frames) == 1
session.send_request(
"stepIn", {"threadId": hit.thread_id}

View file

@ -29,15 +29,13 @@ def test_log_cli(pyfile, tmpdir, start_method, run_as, cli):
def code_to_debug():
import debug_me # noqa
with debug.Session() as session:
with debug.Session(start_method) as session:
with check_logs(tmpdir, session):
if cli == "arg":
session.log_dir = str(tmpdir)
else:
session.env["PTVSD_LOG_DIR"] = str(tmpdir)
session.initialize(
target=(run_as, code_to_debug), start_method=start_method
)
session.initialize(target=(run_as, code_to_debug))
session.start_debugging()
session.wait_for_exit()

View file

@ -69,13 +69,10 @@ def test_multiprocessing(pyfile, start_method, run_as):
q.close()
backchannel.send("done")
with debug.Session() as parent_session:
with debug.Session(start_method) as parent_session:
parent_backchannel = parent_session.setup_backchannel()
parent_session.initialize(
multiprocess=True,
target=(run_as, code_to_debug),
start_method=start_method,
)
parent_session.debug_options |= {"Multiprocess"}
parent_session.initialize(target=(run_as, code_to_debug))
parent_session.start_debugging()
root_start_request, = parent_session.all_occurrences_of(
@ -172,14 +169,11 @@ def test_subprocess(pyfile, start_method, run_as):
)
process.wait()
with debug.Session() as parent_session:
parent_session.program_args += [child]
with debug.Session(start_method) as parent_session:
parent_backchannel = parent_session.setup_backchannel()
parent_session.initialize(
multiprocess=True,
target=(run_as, parent),
start_method=start_method,
)
parent_session.program_args += [child]
parent_session.debug_options |= {"Multiprocess"}
parent_session.initialize(target=(run_as, parent))
parent_session.start_debugging()
root_start_request, = parent_session.all_occurrences_of(
@ -247,14 +241,11 @@ def test_autokill(pyfile, start_method, run_as):
)
backchannel.receive()
with debug.Session() as parent_session:
parent_session.program_args += [child]
with debug.Session(start_method) as parent_session:
parent_backchannel = parent_session.setup_backchannel()
parent_session.initialize(
multiprocess=True,
target=(run_as, parent),
start_method=start_method,
)
parent_session.program_args += [child]
parent_session.debug_options |= {"Multiprocess"}
parent_session.initialize(target=(run_as, parent))
parent_session.start_debugging()
with parent_session.attach_to_next_subprocess() as child_session:
@ -321,11 +312,10 @@ def test_argv_quoting(pyfile, start_method, run_as):
actual_args = sys.argv[1:]
backchannel.send(actual_args)
with debug.Session() as session:
with debug.Session(start_method) as session:
backchannel = session.setup_backchannel()
session.initialize(
target=(run_as, parent),
start_method=start_method,
program_args=[child],
)

View file

@ -9,16 +9,25 @@ import pytest
from tests import debug
# When debuggee exits, there's no guarantee currently that all "output" events have
# already been sent. To ensure that they are, all tests below must set a breakpoint
# on the last line of the debuggee, and stop on it. Since debugger sends its events
# sequentially, by the time we get to "stopped", we also have all the output events.
def test_with_no_output(pyfile, start_method, run_as):
@pyfile
def code_to_debug():
import debug_me # noqa
() # @wait_for_output
# Do nothing, and check if there is any output
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
session.set_breakpoints(code_to_debug, all)
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
session.start_debugging()
session.wait_for_stop("breakpoint")
session.request_continue()
session.wait_for_exit()
assert not session.output("stdout")
@ -34,16 +43,13 @@ def test_with_tab_in_output(pyfile, start_method, run_as):
a = "\t".join(("Hello", "World"))
print(a)
# Break here so we are sure to get the output event.
a = 1 # @bp1
() # @wait_for_output
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
session.set_breakpoints(code_to_debug, [code_to_debug.lines["bp1"]])
session.set_breakpoints(code_to_debug, all)
session.start_debugging()
# Breakpoint at the end just to make sure we get all output events.
session.wait_for_stop()
session.request_continue()
session.wait_for_exit()
@ -51,7 +57,7 @@ def test_with_tab_in_output(pyfile, start_method, run_as):
assert session.output("stdout").startswith("Hello\tWorld")
@pytest.mark.parametrize("redirect", ["RedirectOutput", ""])
@pytest.mark.parametrize("redirect", ["enabled", "disabled"])
def test_redirect_output(pyfile, start_method, run_as, redirect):
@pyfile
def code_to_debug():
@ -60,23 +66,20 @@ def test_redirect_output(pyfile, start_method, run_as, redirect):
for i in [111, 222, 333, 444]:
print(i)
() # @bp1
() # @wait_for_output
with debug.Session() as session:
# By default 'RedirectOutput' is always set. So using this way
# to override the default in session.
session.debug_options = [redirect] if bool(redirect) else []
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
session.set_breakpoints(code_to_debug, [code_to_debug.lines["bp1"]])
with debug.Session(start_method) as session:
if redirect == "disabled":
session.debug_options -= {"RedirectOutput"} # enabled by default
session.initialize(target=(run_as, code_to_debug))
session.set_breakpoints(code_to_debug, all)
session.start_debugging()
# Breakpoint at the end just to make sure we get all output events.
session.wait_for_stop()
session.request_continue()
session.wait_for_exit()
if redirect:
if redirect == "enabled":
assert session.output("stdout") == "111\n222\n333\n444\n"
else:
assert not session.output("stdout")

View file

@ -7,14 +7,15 @@ from __future__ import absolute_import, print_function, unicode_literals
import pytest
import sys
from ptvsd.common import fmt
from tests import debug, test_data
from tests.patterns import some
@pytest.mark.skipif(sys.platform == "win32", reason="Linux/Mac only test.")
@pytest.mark.parametrize("invalid_os_type", [True])
@pytest.mark.parametrize("os_type", ["INVALID", ""])
def test_client_ide_from_path_mapping_linux_backend(
pyfile, tmpdir, start_method, run_as, invalid_os_type
pyfile, tmpdir, start_method, run_as, os_type
):
"""
Test simulating that the backend is on Linux and the client is on Windows
@ -31,6 +32,8 @@ def test_client_ide_from_path_mapping_linux_backend(
with debug.Session(start_method) as session:
backchannel = session.setup_backchannel()
if os_type:
session.debug_options |= {fmt("CLIENT_OS_TYPE={0}", os_type)}
session.initialize(
target=(run_as, code_to_debug),
path_mappings=[
@ -40,8 +43,6 @@ def test_client_ide_from_path_mapping_linux_backend(
}
],
)
if invalid_os_type:
session.debug_options.append("CLIENT_OS_TYPE=INVALID")
session.set_breakpoints(
"c:\\temp\\src\\" + code_to_debug.basename,
[code_to_debug.lines["bp"]],

View file

@ -17,12 +17,9 @@ def test_set_expression(pyfile, start_method, run_as):
ptvsd.break_into_debugger()
backchannel.send(a)
with debug.Session() as session:
with debug.Session(start_method) as session:
backchannel = session.setup_backchannel()
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
)
session.initialize(target=(run_as, code_to_debug))
session.start_debugging()
hit = session.wait_for_stop()

View file

@ -26,11 +26,10 @@ def test_wait_on_normal_exit_enabled(pyfile, start_method, run_as):
ptvsd.break_into_debugger()
print() # line on which it'll actually break
with debug.Session() as session:
with debug.Session(start_method) as session:
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
debug_options=["WaitOnNormalExit"],
debug_options={"WaitOnNormalExit"},
expected_returncode=some.int,
)
session.start_debugging()
@ -60,12 +59,11 @@ def test_wait_on_abnormal_exit_enabled(pyfile, start_method, run_as):
backchannel.send("done")
sys.exit(12345)
with debug.Session() as session:
with debug.Session(start_method) as session:
backchannel = session.setup_backchannel()
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
debug_options=["WaitOnAbnormalExit"],
debug_options={"WaitOnAbnormalExit"},
expected_returncode=some.int,
)
session.start_debugging()
@ -90,12 +88,11 @@ def test_exit_normally_with_wait_on_abnormal_exit_enabled(pyfile, start_method,
ptvsd.break_into_debugger()
backchannel.send("done")
with debug.Session() as session:
with debug.Session(start_method) as session:
backchannel = session.setup_backchannel()
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
debug_options=["WaitOnAbnormalExit"],
debug_options={"WaitOnAbnormalExit"},
)
session.start_debugging()

View file

@ -25,10 +25,9 @@ def test_set_next_statement(pyfile, start_method, run_as):
line_numbers = code_to_debug.lines
with debug.Session() as session:
with debug.Session(start_method) as session:
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
ignore_unobserved=[Event("continued")],
env={"PTVSD_USE_CONTINUED": "1"},
)

View file

@ -10,28 +10,25 @@ from tests import debug
from tests.patterns import some
@pytest.mark.parametrize("start_method", ["launch"])
@pytest.mark.parametrize("with_bp", ["with_breakpoint", ""])
def test_stop_on_entry(pyfile, start_method, run_as, with_bp):
@pytest.mark.parametrize("breakpoint", ["breakpoint", ""])
def test_stop_on_entry(pyfile, run_as, breakpoint):
@pyfile
def code_to_debug():
from debug_me import backchannel # @bp
backchannel.send("done")
with debug.Session() as session:
with debug.Session("launch") as session:
backchannel = session.setup_backchannel()
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
debug_options=["StopOnEntry"],
debug_options={"StopOnEntry"},
)
if bool(with_bp):
session.set_breakpoints(code_to_debug, [code_to_debug.lines["bp"]])
if breakpoint:
session.set_breakpoints(code_to_debug, all)
session.start_debugging()
if bool(with_bp):
if breakpoint:
hit = session.wait_for_stop(reason="breakpoint")
assert hit.frames[0]["line"] == 1
assert hit.frames[0]["source"]["path"] == some.path(code_to_debug)

View file

@ -37,10 +37,9 @@ def test_thread_count(pyfile, start_method, run_as, count):
print("check here") # @bp
stop = True
with debug.Session() as session:
with debug.Session(start_method) as session:
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
program_args=[str(count)],
)
session.set_breakpoints(code_to_debug, [code_to_debug.lines["bp"]])
@ -104,8 +103,8 @@ def test_debug_this_thread(pyfile, start_method, run_as):
event.wait()
with debug.Session() as session:
session.initialize(target=(run_as, code_to_debug), start_method=start_method)
with debug.Session(start_method) as session:
session.initialize(target=(run_as, code_to_debug))
session.set_breakpoints(code_to_debug, [code_to_debug.lines["bp"]])
session.start_debugging()

View file

@ -26,10 +26,9 @@ def test_stack_format(pyfile, start_method, run_as, module, line):
def do_something():
print("break here") # @bp
with debug.Session() as session:
with debug.Session(start_method) as session:
session.initialize(
target=(run_as, code_to_debug),
start_method=start_method,
ignore_unobserved=[Event("stopped")],
)
session.set_breakpoints(test_module, [test_module.lines["bp"]])
@ -76,10 +75,9 @@ def test_module_events(pyfile, start_method, run_as):
do_something()
with debug.Session() as session:
with debug.Session(start_method) as session:
session.initialize(
target=(run_as, test_code),
start_method=start_method,
ignore_unobserved=[Event("stopped")],
)
session.set_breakpoints(module2, [module2.lines["bp"]])

View file

@ -16,5 +16,8 @@ pygments
## Used in Python code that is run/debugged by the tests:
django
flask
requests
# https://github.com/microsoft/ptvsd/issues/1574
# Remove specific version once Flask 1.1.2 is released.
flask==1.0.3

View file

@ -1,32 +1,29 @@
from debug_me import backchannel, ptvsd
from debug_me import backchannel, ptvsd, scratchpad
import os
import time
ptvsd.enable_attach(("localhost", int(os.environ["ATTACH1_TEST_PORT"])))
host = os.getenv('PTVSD_TEST_HOST', 'localhost')
port = os.getenv('PTVSD_TEST_PORT', '5678')
ptvsd.enable_attach((host, port))
if os.getenv('PTVSD_WAIT_FOR_ATTACH', None) is not None:
backchannel.send('wait_for_attach')
if int(os.environ["ATTACH1_WAIT_FOR_ATTACH"]):
backchannel.send("wait_for_attach")
ptvsd.wait_for_attach()
if os.getenv('PTVSD_IS_ATTACHED', None) is not None:
backchannel.send('is_attached')
if int(os.environ["ATTACH1_IS_ATTACHED"]):
backchannel.send("is_attached")
while not ptvsd.is_attached():
print("looping until is_attached")
time.sleep(0.1)
pause_test = True
if os.getenv('PTVSD_BREAK_INTO_DBG', None) is not None:
backchannel.send('break_into_debugger')
pause_test = False
if pause_test:
backchannel.wait_for('pause_test')
for _ in range(0, 20):
time.sleep(0.1)
print('looping')
else:
if int(os.environ["ATTACH1_BREAK_INTO_DEBUGGER"]):
backchannel.send("break_into_debugger?")
assert backchannel.receive() == "proceed"
ptvsd.break_into_debugger()
print('done') # @bp
print("break") # @break_into_debugger
else:
backchannel.send("loop?")
scratchpad["paused"] = False
assert backchannel.receive() == "proceed"
while not scratchpad["paused"]:
print("looping until paused")
time.sleep(0.1)