Fix #1551: Backchannel failures in tests

Fix various bugs around handling of disconnect in JsonIOStream and JsonMessageChannel.

Fix handling of DAP "terminated" event in debug.Session.

Add --ptvsd-logs and --pydevd-logs switches to pytest.

Improve message logging to fully capture the raw message data in the logs if deserialization fails.

Log all debuggee environment variables in debug.Session, and improve log readability.
This commit is contained in:
Pavel Minaev 2019-07-01 17:01:27 -07:00 committed by Pavel Minaev
parent 1e6d2269dd
commit 82b62b77b5
11 changed files with 266 additions and 113 deletions

1
.gitignore vendored
View file

@ -38,6 +38,7 @@ pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
tests/_logs/*.log
.tox/
.coverage
.coverage.*

View file

@ -19,6 +19,51 @@ import sys
import types
class JsonObject(object):
    """A wrapped Python object that formats itself as JSON when asked for a string
    representation via str() or format().
    """

    # Encoder class instantiated by __format__ when format_spec is not empty.
    json_encoder_type = json.JSONEncoder

    # Default encoder used by __format__ when format_spec is empty.
    json_encoder = json_encoder_type(indent=4)

    def __init__(self, value):
        self.value = value

    def __repr__(self):
        return repr(self.value)

    def __str__(self):
        return format(self)

    def __format__(self, format_spec):
        """If format_spec is empty, uses self.json_encoder to serialize self.value
        as a string. Otherwise, format_spec is treated as an argument list to be
        passed to self.json_encoder_type - which defaults to JSONEncoder - and then
        the resulting formatter is used to serialize self.value as a string.

        Example::

            fmt("{0!j} {0!j:indent=4,sort_keys=True}", x)
        """
        if not format_spec:
            encoder = self.json_encoder
        else:
            # format_spec looks like "indent=4,sort_keys=True"; splice it into a
            # constructor call expression such as:
            #
            #   json_encoder_type(indent=4,sort_keys=True)
            #
            # and eval() that with only the encoder type in scope to create the
            # encoder instance. format_spec comes from format strings written by
            # developers, not from untrusted input.
            call_expr = "".join(("json_encoder_type(", format_spec, ")"))
            encoder = eval(call_expr, {"json_encoder_type": self.json_encoder_type})
        return encoder.encode(self.value)
class Formatter(string.Formatter, types.ModuleType):
"""A custom string.Formatter with support for JSON pretty-printing.
@ -30,13 +75,6 @@ class Formatter(string.Formatter, types.ModuleType):
placeholders are supported.
"""
# Because globals() go away after the module object substitution, all modules
# that were imported globally, but that are referenced by method bodies that
# can run after substitution occurred, must be re-imported here, so that they
# can be accessed via self.
json_encoder = json.JSONEncoder(indent=4)
# Because globals() go away after the module object substitution, all method bodies
# below must access globals via self instead, or re-import modules locally.
@ -54,7 +92,7 @@ class Formatter(string.Formatter, types.ModuleType):
def convert_field(self, value, conversion):
if conversion == "j":
return self.json_encoder.encode(value)
return self.JsonObject(value)
return super(self.Formatter, self).convert_field(value, conversion)

View file

@ -11,6 +11,7 @@ responses, and events.
https://microsoft.github.io/debug-adapter-protocol/overview#base-protocol
"""
import collections
import contextlib
import inspect
import itertools
@ -55,8 +56,7 @@ class JsonIOStream(object):
def from_socket(cls, socket, name=None):
"""Creates a new instance that sends and receives messages over a socket.
"""
if socket.gettimeout() is not None:
raise ValueError("Socket must be in blocking mode")
socket.settimeout(None) # make socket blocking
if name is None:
name = repr(socket)
socket_io = socket.makefile("rwb", 0)
@ -95,10 +95,10 @@ class JsonIOStream(object):
while True:
try:
line += self._reader.readline()
except Exception:
raise EOFError
except Exception as ex:
raise EOFError(str(ex))
if not line:
raise EOFError
raise EOFError("No more data")
if line.endswith(b"\r\n"):
line = line[0:-2]
return line
@ -112,16 +112,45 @@ class JsonIOStream(object):
decoder = decoder if decoder is not None else json.JSONDecoder()
# Parse the message, and try to log any failures using as much information
# as we already have at the point of the failure. For example, if it fails
# after decoding during JSON parsing, log as a Unicode string, rather than
# a bytestring.
# If any error occurs while reading and parsing the message, log the original
# raw message data as is, so that it's possible to diagnose missing or invalid
# headers, encoding issues, JSON syntax errors etc.
def log_message_and_exception(format_string="", *args, **kwargs):
if format_string:
format_string += "\n\n"
format_string += "{name} -->\n{raw_lines}"
raw_lines = b"".join(raw_chunks).split(b"\n")
raw_lines = "\n".join(repr(line) for line in raw_lines)
return log.exception(
format_string,
*args,
name=self.name,
raw_lines=raw_lines,
**kwargs
)
raw_chunks = []
headers = {}
while True:
line = self._read_line()
try:
line = self._read_line()
except Exception:
# Only log it if we have already read some headers, and are looking
# for a blank line terminating them. If this is the very first read,
# there's no message data to log in any case, and the caller might
# be anticipating the error - e.g. EOFError on disconnect.
if headers:
raise log_message_and_exception("Error while reading message headers:")
else:
raise
raw_chunks += [line, b"\n"]
if line == b"":
break
key, _, value = line.partition(b":")
headers[key] = value
@ -130,31 +159,41 @@ class JsonIOStream(object):
if not (0 <= length <= self.MAX_BODY_SIZE):
raise ValueError
except (KeyError, ValueError):
log.exception("{0} --> {1}", self.name, headers)
raise IOError("Content-Length is missing or invalid")
try:
raise IOError("Content-Length is missing or invalid:")
except Exception:
raise log_message_and_exception()
try:
body = b""
while length > 0:
chunk = self._reader.read(length)
body += chunk
length -= len(chunk)
except Exception:
if self._is_closing:
raise EOFError
else:
raise
body_start = len(raw_chunks)
body_remaining = length
while body_remaining > 0:
try:
chunk = self._reader.read(body_remaining)
except Exception:
if self._is_closing:
raise EOFError
else:
raise log_message_and_exception(
"Couldn't read the expected {0} bytes of body:",
length,
)
raw_chunks.append(chunk)
body_remaining -= len(chunk)
assert body_remaining == 0
body = b"".join(raw_chunks[body_start:])
try:
body = body.decode("utf-8")
except Exception:
raise log.exception("{0} --> {1}", self.name, body)
raise log_message_and_exception()
try:
body = decoder.decode(body)
except Exception:
raise log.exception("{0} --> {1}", self.name, body)
raise log_message_and_exception()
# If parsed successfully, log as JSON for readability.
log.debug("{0} --> {1!j}", self.name, body)
return body
@ -192,7 +231,7 @@ class JsonIOStream(object):
return fmt("{0}({1!r})", type(self).__name__, self.name)
class MessageDict(dict):
class MessageDict(collections.OrderedDict):
"""A specialized dict that is used for JSON message payloads - Request.arguments,
Response.body, and Event.body.
@ -210,13 +249,13 @@ class MessageDict(dict):
such guarantee for outgoing messages.
"""
def __init__(self, message, items=None):
    """Creates a new MessageDict, optionally populated from items (an iterable
    of key-value pairs, or a mapping), preserving insertion order.

    message is the Message object that owns this dict, or None.
    """
    # NOTE(review): this span of the diff interleaved the old signature
    # (mapping=None) with the new one (items=None); only the new version is
    # kept here, matching the OrderedDict-based MessageDict.
    assert message is None or isinstance(message, Message)
    if items is None:
        super(MessageDict, self).__init__()
    else:
        super(MessageDict, self).__init__(items)
    self.message = message
"""The Message object that owns this dict. If None, then MessageDict behaves
@ -239,9 +278,9 @@ class MessageDict(dict):
return wrap
__getitem__ = _invalid_if_no_key(dict.__getitem__)
__delitem__ = _invalid_if_no_key(dict.__delitem__)
pop = _invalid_if_no_key(dict.pop)
__getitem__ = _invalid_if_no_key(collections.OrderedDict.__getitem__)
__delitem__ = _invalid_if_no_key(collections.OrderedDict.__delitem__)
pop = _invalid_if_no_key(collections.OrderedDict.pop)
del _invalid_if_no_key
@ -656,6 +695,31 @@ class JsonMessageChannel(object):
"""
self._worker.join()
@staticmethod
def _prettify(message_dict):
"""Reorders items in a MessageDict such that it is more readable.
"""
# https://microsoft.github.io/debug-adapter-protocol/specification
keys = (
"seq",
"type",
"request_seq",
"success",
"command",
"event",
"message",
"arguments",
"body",
"error",
)
for key in keys:
try:
value = message_dict[key]
except KeyError:
continue
del message_dict[key]
message_dict[key] = value
@contextlib.contextmanager
def _send_message(self, message):
"""Sends a new message to the other party.
@ -674,7 +738,10 @@ class JsonMessageChannel(object):
assert "seq" not in message
with self:
seq = next(self._seq_iter)
message = MessageDict(None, message)
message["seq"] = seq
self._prettify(message)
with self:
yield seq
@ -1057,6 +1124,8 @@ class JsonMessageChannel(object):
# the Message they belong to, once it is instantiated.
def object_hook(d):
d = MessageDict(None, d)
if "seq" in d:
self._prettify(d)
d.associate_with = associate_with
message_dicts.append(d)
return d
@ -1082,20 +1151,23 @@ class JsonMessageChannel(object):
try:
return self.on_message(message)
except EOFError:
raise
except Exception:
log.exception(
raise log.exception(
"Fatal error while processing message for {0}:\n\n{1!r}",
self.name,
message,
)
raise
def _process_incoming_messages(self):
try:
log.debug("Starting message loop for {0}", self.name)
while True:
try:
self._process_incoming_message()
except EOFError:
except EOFError as ex:
log.debug("Exiting message loop for {0}: {1}", self.name, str(ex))
return False
finally:
try:

View file

@ -12,13 +12,12 @@ import sys
import time
import threading
import traceback
import pytest_timeout
from ptvsd.common import log
def dump():
"""Dump the stacks of all threads except the current thread.
"""Dump stacks of all threads in this process, except for the current thread.
"""
tid = threading.current_thread().ident
@ -53,14 +52,14 @@ def dump():
def dump_after(secs):
"""Invokes dump() on a background thread after waiting for the specified time.
Can be called from debugged code before the point after which it hangs,
to determine the cause of the hang when debugging a test.
"""
def dumper():
time.sleep(secs)
pytest_timeout.dump_stacks()
try:
dump()
except:
log.exception()
thread = threading.Thread(target=dumper)
thread.daemon = True

13
tests/_logs/README Normal file
View file

@ -0,0 +1,13 @@
Pass --ptvsd-logs and/or --pydevd-logs to pytest to create log files here for a run.
For example:
tox -e py37 -- --ptvsd-logs "tests/ptvsd/server/test_run.py::test_run[launch-file]"
A separate ptvsd-{pid}.log file will be created for every ptvsd process spawned by
the tests. However, because process IDs are reused by the OS, logs may end up overwriting
earlier logs.
All pydevd logs go into the same file named pydevd.log - and subsequent tests will
overwrite earlier logs.
Thus, it is best to use this when running a single test, or a small number of tests.

View file

@ -26,6 +26,10 @@ from tests.timeline import Timeline, Event, Response
PTVSD_DIR = py.path.local(ptvsd.__file__) / ".."
PTVSD_PORT = net.get_test_server_port(5678, 5800)
# Added to the environment variables of every new debug.Session - after copying
# os.environ(), but before setting any session-specific variables.
PTVSD_ENV = {
}
# Code that is injected into the debuggee process when it does `import debug_me`,
# and start_method is attach_socket_*
@ -73,6 +77,7 @@ class Session(object):
self.log_dir = None
self.env = os.environ.copy()
self.env.update(PTVSD_ENV)
self.env['PYTHONPATH'] = str(test_data / "_PYTHONPATH")
self.env['PTVSD_SESSION_ID'] = str(self.id)
@ -109,6 +114,9 @@ class Session(object):
self.all_occurrences_of = self.timeline.all_occurrences_of
self.observe_all = self.timeline.observe_all
def __str__(self):
    """Returns the display name of this session, used in logs: "ptvsd-<id>"."""
    display_name = fmt("ptvsd-{0}", self.id)
    return display_name
def __enter__(self):
return self
@ -141,9 +149,6 @@ class Session(object):
def ignore_unobserved(self, value):
self.timeline.ignore_unobserved = value
def __str__(self):
return fmt("ptvsd-{0}", self.id)
def close(self):
if self.socket:
try:
@ -349,30 +354,33 @@ class Session(object):
self.backchannel.listen()
self.env['PTVSD_BACKCHANNEL_PORT'] = str(self.backchannel.port)
# Force env to use str everywhere - this is needed for Python 2.7 on Windows.
env = {str(k): str(v) for k, v in self.env.items()}
env_str = "\n".join((
fmt("{0}={1}", env_name, env[env_name])
for env_name in sorted(self.env.keys())
))
log.info(
'{6} will have:\n\n'
'ptvsd: {0}\n'
'port: {7}\n'
'start method: {1}\n'
'target: ({2}) {3}\n'
'current directory: {4}\n'
'PYTHONPATH: {5}',
'{0} will have:\n\n'
'ptvsd: {1}\n'
'port: {2}\n'
'start method: {3}\n'
'target: ({4}) {5}\n'
'current directory: {6}\n'
'environment variables:\n\n{7}',
self,
py.path.local(ptvsd.__file__).dirpath(),
self.ptvsd_port,
self.start_method,
self.target[0],
self.target[1],
self.cwd,
self.env['PYTHONPATH'],
self,
self.ptvsd_port,
env_str,
)
spawn_args = usr_argv if self.start_method == 'attach_pid' else dbg_argv
log.info('Spawning {0}: {1!r}', self, spawn_args)
# Force env to use str everywhere - this is needed for Python 2.7 on Windows.
env = {str(k): str(v) for k, v in self.env.items()}
log.info('Spawning {0}: {1!j}', self, spawn_args)
self.process = subprocess.Popen(
spawn_args,
env=env,
@ -615,6 +623,15 @@ class Session(object):
def _process_event(self, event):
self.timeline.record_event(event.event, event.body, block=False)
if event.event == "terminated":
# Stop the message loop, since the ptvsd is going to close the connection
# from its end shortly after sending this event, and no further messages
# are expected.
log.info(
'Received "terminated" event from {0}; stopping message processing.',
self,
)
raise EOFError(fmt("{0} terminated", self))
def _process_response(self, request_occ, response):
self.timeline.record_response(request_occ, response.body, block=False)

View file

@ -7,16 +7,19 @@ from __future__ import absolute_import, print_function, unicode_literals
import contextlib
import os
from ptvsd.common import log
@contextlib.contextmanager
def enabled(filename):
    """Context manager that enables pydevd logging to the given file while the
    with-block is active.

    Sets PYDEVD_DEBUG and PYDEVD_DEBUG_FILE in os.environ on entry, and removes
    them on exit - even if the block raises - so that pydevd logging doesn't
    leak into subsequent code.
    """
    # NOTE(review): this span of the diff interleaved the pre-fix body (bare
    # yield followed by unconditional dels) with the post-fix try/finally body;
    # only the fixed version, which guarantees cleanup on exception, is kept.
    os.environ['PYDEVD_DEBUG'] = 'True'
    os.environ['PYDEVD_DEBUG_FILE'] = filename
    log.debug("pydevd log will be at {0}", filename)
    try:
        yield
    finally:
        del os.environ['PYDEVD_DEBUG']
        del os.environ['PYDEVD_DEBUG_FILE']
def dump(why):
@ -28,10 +31,10 @@ def dump(why):
try:
f = open(pydevd_debug_file)
with f:
pydevd_log = f.read()
except Exception:
print('Test {0}, but no ptvsd log found'.format(why))
log.exception("Test {0}, but pydevd log {1} could not be retrieved.", why, pydevd_debug_file)
return
with f:
print('Test {0}; dumping pydevd log:'.format(why))
print(f.read())
log.info("Test {0}; pydevd log:\n\n{1}", why, pydevd_log)

View file

@ -12,6 +12,7 @@ import tempfile
import threading
import types
from ptvsd.common import timestamp
from tests import code, pydevd_log
__all__ = ['run_as', 'start_method', 'with_pydevd_log', 'daemon', 'pyfile']
@ -41,6 +42,13 @@ def start_method(request):
return request.param
@pytest.fixture(autouse=True)
def reset_timestamp(request):
    # Runs automatically for every test: restarts the log timestamp clock at
    # zero, so that timestamps in a test's log output are relative to the start
    # of that test rather than the whole run.
    timestamp.reset()
    print("\n") # make sure logs start on a new line
    yield
@pytest.fixture(autouse=True)
def with_pydevd_log(request, tmpdir):
"""Enables pydevd logging during the test run, and dumps the log if the test fails.

View file

@ -14,7 +14,32 @@ import sys
import sysconfig
import threading # noqa
from ptvsd.common import fmt, log, timestamp
from ptvsd.common import fmt, log
from tests import debug, pydevd_log
def pytest_addoption(parser):
    """Registers the --ptvsd-logs and --pydevd-logs command-line switches,
    which route the corresponding debugger logs to {rootdir}/tests/_logs/.
    """
    switches = [
        ("--ptvsd-logs", "Write ptvsd logs to {rootdir}/tests/_logs/"),
        ("--pydevd-logs", "Write pydevd logs to {rootdir}/tests/_logs/"),
    ]
    for switch, description in switches:
        parser.addoption(switch, action="store_true", help=description)
def pytest_configure(config):
    """Applies the --ptvsd-logs and --pydevd-logs switches by seeding the
    variables that are added to every debug.Session's environment.
    """
    log_dir = config.rootdir / "tests" / "_logs"
    options = config.option
    if options.ptvsd_logs:
        log.info("ptvsd logs will be in {0}", log_dir)
        debug.PTVSD_ENV["PTVSD_LOG_DIR"] = str(log_dir)
    if options.pydevd_logs:
        log.info("pydevd logs will be in {0}", log_dir)
        debug.PTVSD_ENV["PYDEVD_DEBUG"] = "True"
        debug.PTVSD_ENV["PYDEVD_DEBUG_FILE"] = str(log_dir / "pydevd.log")
def pytest_report_header(config):
@ -82,39 +107,8 @@ def pytest_runtest_makereport(item, call):
setattr(item, result.when + '_result', result)
@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_pyfunc_call(pyfuncitem):
# Resets the timestamp to zero for every new test.
timestamp.reset()
yield
# If a test times out and pytest tries to print the stacks of where it was hanging,
# we want to print the pydevd log as well. This is not a normal pytest hook - we
# just detour pytest_timeout.dump_stacks directly.
def print_pydevd_log(what):
    """If pydevd logging was enabled via PYDEVD_DEBUG_FILE, prints the contents
    of the pydevd log to stdout; otherwise, does nothing.

    what describes why the log is being dumped, e.g. 'timed out'.
    """
    assert what
    pydevd_debug_file = os.environ.get('PYDEVD_DEBUG_FILE')
    if not pydevd_debug_file:
        # pydevd logging is not enabled for this run - nothing to dump.
        return
    try:
        f = open(pydevd_debug_file)
    except Exception:
        # Fixed: the message previously said "ptvsd log", but this function
        # dumps the pydevd log.
        print('Test {0}, but no pydevd log found'.format(what))
        return
    with f:
        print('Test {0}; dumping pydevd log:'.format(what))
        print(f.read())
def dump_stacks_and_print_pydevd_log():
    # Detour target for pytest_timeout.dump_stacks (installed just below): on a
    # test timeout, dump the pydevd log first, then fall through to the
    # original stack dump.
    print_pydevd_log('timed out')
    dump_stacks()
dump_stacks = pytest_timeout.dump_stacks
pytest_timeout.dump_stacks = dump_stacks_and_print_pydevd_log
_dump_stacks = pytest_timeout.dump_stacks
pytest_timeout.dump_stacks = lambda: (_dump_stacks(), pydevd_log.dump("timed out"))

View file

@ -18,12 +18,14 @@ import sys
assert "debug_me" in sys.modules
import debug_me
from ptvsd.common import fmt, messaging
from ptvsd.common import fmt, log, messaging
name = fmt("backchannel-{0}", debug_me.session_id)
port = int(os.getenv('PTVSD_BACKCHANNEL_PORT', 0))
if port:
print(fmt('Connecting backchannel-{0} to port {1}...', debug_me.session_id, port))
log.info('Connecting {0} to port {1}...', name, port)
_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
@ -35,8 +37,13 @@ if port:
@atexit.register
def _atexit_handler():
print(fmt('Shutting down backchannel-{0}...', debug_me.session_id))
log.info('Shutting down {0}...', name)
try:
_socket.shutdown(socket.SHUT_RDWR)
except Exception:
pass
finally:
try:
_socket.close()
except Exception:
pass

View file

@ -24,6 +24,7 @@ import os
# Needs to be set before backchannel can set things up.
session_id = int(os.getenv('PTVSD_SESSION_ID'))
name = "ptvsd-" + str(session_id)
# For `from debug_me import ...`.
import backchannel # noqa