Fix #1486: Message processing/forwarding loop

Implements the main message loops for IDE and debug server communication channels, and framework for DAP message handlers.

Lays out scaffolding for debuggee process management and connectivity.

Implements generic message propagation between IDE and debug server.

Partially implements playback of the initialization sequence when debug server is connected (without validation).

Implements `initialize`, `disconnect`, and `terminate` DAP messages.

Partially implements `attach`, `launch`, and `configurationDone` DAP messages.
This commit is contained in:
Pavel Minaev 2019-06-07 14:15:16 -07:00 committed by Pavel Minaev
parent ee7710adf6
commit deedc8f57b
7 changed files with 627 additions and 34 deletions

View file

@ -4,10 +4,64 @@
from __future__ import absolute_import, print_function, unicode_literals
import sys
# WARNING: ptvsd and submodules must not be imported on top level in this module,
# and should be imported locally inside main() instead.
def main():
raise NotImplementedError
import ptvsd
from ptvsd.common import log
from ptvsd.adapter import channels
log.to_file()
chan = channels.Channels()
chan.connect_to_ide()
chan.ide.send_event(
"output",
{
"category": "telemetry",
"output": "ptvsd.adapter",
"data": {"version": ptvsd.__version__},
},
)
chan.ide.wait()
# There might not be a server connection yet at this point.
if chan.server is not None:
chan.server.wait()
if __name__ == "__main__":
# ptvsd can also be invoked directly rather than via -m. In this case, the first
# entry on sys.path is the one added automatically by Python for the directory
# containing this file. This means that import ptvsd will not work, since we need
# the parent directory of ptvsd/ to be in sys.path, rather than ptvsd/adapter/.
#
# The other issue is that many other absolute imports will break, because they
# will be resolved relative to ptvsd/adapter/ - e.g. `import state` will then try
# to import ptvsd/adapter/state.py.
#
# To fix both, we need to replace the automatically added entry such that it points
# at parent directory of ptvsd/ instead of ptvsd/adapter, import ptvsd with that
# in sys.path, and then remove the first entry entry altogether, so that it doesn't
# affect any further imports we might do. For example, suppose the user did:
#
# python /foo/bar/ptvsd/adapter ...
#
# At the beginning of this script, sys.path will contain "/foo/bar/ptvsd/adapter"
# as the first entry. What we want is to replace it with "/foo/bar', then import
# ptvsd with that in effect, and then remove the replaced entry before any more
# code runs. The imported ptvsd module will remain in sys.modules, and thus all
# future imports of it or its submodules will resolve accordingly.
if "ptvsd" not in sys.modules:
# Do not use dirname() to walk up - this can be a relative path, e.g. ".".
sys.path[0] = sys.path[0] + "/../../"
__import__("ptvsd")
del sys.path[0]
main()

View file

@ -0,0 +1,51 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import absolute_import, print_function, unicode_literals
from ptvsd.common import messaging, singleton
class Channels(singleton.ThreadSafeSingleton):
ide = None
"""DAP channel to the IDE over stdin/stdout.
Created by main() as soon as the adapter process starts.
When the IDE disconnects, the channel remains, but is closed, and will raise
EOFError on writes.
"""
server = None
"""DAP channel to the debug server over a socket.
Created when handling the "attach" or "launch" request.
When the server disconnects, the channel remains, but is closed, and will raise
EOFError on writes.
"""
@singleton.autolocked_method
def connect_to_ide(self):
assert self.ide is None
# Import message handlers lazily to avoid circular imports.
from ptvsd.adapter import messages
ide_channel = messaging.JsonIOStream.from_stdio("IDE")
self.ide = messaging.JsonMessageChannel(
ide_channel, messages.IDEMessages(), ide_channel.name
)
self.ide.start()
@singleton.autolocked_method
def connect_to_server(self, address):
assert self.server is None
raise NotImplementedError
@singleton.autolocked_method
def accept_connection_from_server(self, address):
assert self.server is None
raise NotImplementedError

View file

@ -0,0 +1,36 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import absolute_import, print_function, unicode_literals
import atexit
terminate_at_exit = True
"""Whether the debuggee process should be terminated when the adapter process exits,
or allowed to continue running.
"""
def launch_and_connect(request):
"""Launch the process as requested by the DAP "launch" request, with the debug
server running inside the process; and connect to that server.
"""
raise NotImplementedError
def terminate(after=0):
"""Terminate the debuggee process, if it is still alive after the specified time.
"""
pass # TODO
def _atexit_handler():
if terminate_at_exit:
terminate()
atexit.register(_atexit_handler)

View file

@ -0,0 +1,265 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import absolute_import, print_function, unicode_literals
from ptvsd.common import log, messaging, singleton
from ptvsd.adapter import channels, debuggee, state
class Shared(singleton.ThreadSafeSingleton):
"""Global state shared between IDE and server handlers."""
client_id = "" # always a string to avoid None checks
class Messages(singleton.Singleton):
# Misc helpers that are identical for both IDEMessages and ServerMessages.
_channels = channels.Channels()
@property
def _ide(self):
return self._channels.ide
@property
def _server(self):
"""Raises RequestFailure if the server is not available.
To test whether it is available or not, use _channels.server instead, and
check for None.
"""
server = self._channels.server
if server is None:
messaging.raise_failure("Connection to debug server is not established yet")
return server
# Specifies the allowed adapter states for a message handler - if the corresponding
# message is received in a state that is not listed, the handler is not invoked.
# If the message is a request, a failed response is returned.
def _only_allowed_while(*states):
def decorate(handler):
def handle_if_allowed(self, message):
current_state = state.current()
if current_state in states:
return handler(self, message)
if isinstance(message, messaging.Request):
messaging.raise_failure(
"Request {0!r} is not allowed in adapter state {1!r}.",
message.command,
current_state,
)
return handle_if_allowed
return decorate
class IDEMessages(Messages):
"""Message handlers and the associated global state for the IDE channel.
"""
_only_allowed_while = Messages._only_allowed_while
# The contents of the "initialize" response that is sent from the adapter to the IDE,
# and is expected to match what the debug server sends to the adapter once connected.
_INITIALIZE_RESPONSE_BODY = {
"supportsCompletionsRequest": True,
"supportsConditionalBreakpoints": True,
"supportsConfigurationDoneRequest": True,
"supportsDebuggerProperties": True,
"supportsDelayedStackTraceLoading": True,
"supportsEvaluateForHovers": True,
"supportsExceptionInfoRequest": True,
"supportsExceptionOptions": True,
"supportsHitConditionalBreakpoints": True,
"supportsLogPoints": True,
"supportsModulesRequest": True,
"supportsSetExpression": True,
"supportsSetVariable": True,
"supportsValueFormattingOptions": True,
"supportTerminateDebuggee": True,
"supportsGotoTargetsRequest": True,
"exceptionBreakpointFilters": [
{"filter": "raised", "label": "Raised Exceptions", "default": False},
{"filter": "uncaught", "label": "Uncaught Exceptions", "default": True},
],
}
# Until "launch" or "attach", there's no _channels.server, and so we can't propagate
# messages. But they will need to be replayed once we establish connection to server,
# so store them here until then. After all messages are replayed, it is set to None.
_initial_messages = []
terminate_on_disconnect = True
# A decorator to add the message to initial_messages if needed before handling it.
# Must be applied to the handler for every message that can be received before
# connection to the debug server can be established while handling attach/launch,
# and that must be replayed to the server once it is established.
def _replay_to_server(handler):
def store_and_handle(self, message):
if self.initial_messages is not None:
self.initial_messages.append(message)
return handler(self, message)
return store_and_handle
# Generic event handler. There are no specific handlers for IDE events, because
# there are no events from the IDE in DAP - but we propagate them if we can, in
# case some events appear in future protocol versions.
@_replay_to_server
def event(self, event):
if self._server is not None:
self._server.propagate(event)
# Generic request handler, used if there's no specific handler below.
@_replay_to_server
def request(self, request):
response = self._server.message(request).wait_for_response()
return response.body
@_replay_to_server
@_only_allowed_while("starting")
def initialize_request(self, request):
with Shared() as shared:
shared.client_id = str(request.arguments.get("clientID", ""))
state.change("initializing")
return self._INITIALIZE_RESPONSE_BODY
# Handles various attributes common to both "launch" and "attach".
def _debug_config(self, request):
assert request.command in ("launch", "attach")
pass # TODO: options and debugOptions
pass # TODO: pathMappings (unless server does that entirely?)
@_replay_to_server
@_only_allowed_while("initializing")
def launch_request(self, request):
self._debug_config(request)
# TODO: nodebug
debuggee.launch_and_connect(request)
return self._configure()
@_replay_to_server
@_only_allowed_while("initializing")
def attach_request(self, request):
self.terminate_on_disconnect = False
self._debug_config(request)
# TODO: get address and port
channels.connect_to_server()
return self._configure()
# Handles the configuration request sequence for "launch" or "attach", from when
# the "initialized" event is sent, to when "configurationDone" is received; see
# https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
def _configure(self):
log.debug("Replaying previously received messages to server.")
for msg in self.initial_messages:
# TODO: validate server response to ensure it matches our own earlier.
self._server.propagate(msg)
log.debug("Finished replaying messages to server.")
self.initial_messages = None
# Let the IDE know that it can begin configuring the adapter.
state.change("configuring")
self._ide.send_event("initialized")
# Process further incoming messages, until we get "configurationDone".
while state.current() == "configuring":
yield
@_only_allowed_while("configuring")
def configurationDone_request(self, request):
state.change("running")
return self._server.propagate(request)
# Handle a "disconnect" or a "terminate" request.
def _shutdown(self, request, terminate):
if request.arguments.get("restart", False):
messaging.raise_failure("Restart is not supported")
response = self._server.propagate(request)
state.change("shutting_down")
if terminate:
debuggee.terminate()
return response
@_only_allowed_while("running")
def disconnect_request(self, request):
# We've already decided earlier based on whether it was launch or attach, but
# let the request override that.
terminate = request.arguments.get(
"terminateDebuggee", self.terminate_on_disconnect
)
return self._shutdown(request, terminate)
@_only_allowed_while("running")
def terminate_request(self, request):
return self._shutdown(request, terminate=True)
# Adapter's stdout was closed by IDE.
def disconnect(self):
try:
if state.current() == "shutting_down":
# Graceful disconnect. We have already received "disconnect" or
# "terminate", and propagated it to the server. Nothing to do.
return
# Can happen if the IDE was force-closed or crashed.
log.warn('IDE disconnected without sending "disconnect" or "terminate".')
state.change("shutting_down")
if self._server is None:
if self.terminate_on_disconnect:
# It happened before we connected to the server, so we cannot gracefully
# terminate the debuggee. Force-kill it immediately.
debuggee.terminate()
return
# Try to shut down the server gracefully, even though the adapter wasn't.
command = "terminate" if self.terminate_on_disconnect else "disconnect"
try:
self._server.send_request(command)
except Exception:
# The server might have already disconnected as well, or it might fail
# to handle the request. But we can't report failure to the IDE at this
# point, and it's already logged, so just move on.
pass
finally:
if self.terminate_on_disconnect:
# If debuggee is still there, give it some time to terminate itself,
# then force-kill. Since the IDE is gone already, and nobody is waiting
# for us to respond, there's no rush.
debuggee.terminate(after=60)
class ServerMessages(Messages):
"""Message handlers and the associated global state for the server channel.
"""
_channels = channels.Channels()
# Socket was closed by the server.
def disconnect(self):
log.info("Debug server disconnected")
# Generic request handler, used if there's no specific handler below.
def request(self, request):
return self._ide.propagate(request)
# Generic event handler, used if there's no specific handler below.
def event(self, event):
self._ide.propagate(event)

View file

@ -0,0 +1,44 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import absolute_import, print_function, unicode_literals
"""Tracks the overall state of the adapter, and enforces valid state transitions.
"""
from ptvsd.common import log, singleton
# Order defines valid transitions.
STATES = (
"starting", # before "initialize" is received
"initializing", # until "initialized" is sent
"configuring", # until "configurationDone" is received
"running", # until "disconnect" or "terminate" is received
"shutting_down", # until the adapter process exits
)
class State(singleton.ThreadSafeSingleton):
_state = STATES[0]
@property
@singleton.autolocked_method
def state(self):
return self._state
@state.setter
@singleton.autolocked_method
def state(self, new_state):
assert STATES.index(self._state) < STATES.index(new_state)
log.debug("Adapter state changed from {0!r} to {1!r}", self._state, new_state)
self._state = self.new_state
def current():
return State().state
def change(new_state):
State().state = new_state

View file

@ -2,7 +2,7 @@
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import print_function, with_statement, absolute_import
from __future__ import absolute_import, print_function, unicode_literals
import contextlib
import inspect
@ -11,7 +11,7 @@ import json
import sys
import threading
import ptvsd.common.log
from ptvsd.common import log
from ptvsd.common._util import new_hidden_thread
@ -84,6 +84,11 @@ class JsonIOStream(object):
if there are no more objects to be read.
"""
# Parse the message, and try to log any failures using as much information
# as we already have at the point of the failure. For example, if it fails
# after decoding during JSON parsing, log as a Unicode string, rather than
# a bytestring.
headers = {}
while True:
line = self._read_line()
@ -97,7 +102,7 @@ class JsonIOStream(object):
if not (0 <= length <= self.MAX_BODY_SIZE):
raise ValueError
except (KeyError, ValueError):
ptvsd.common.log.exception('{0} --> {1}', self.name, headers)
log.exception('{0} --> {1}', self.name, headers)
raise IOError('Content-Length is missing or invalid')
try:
@ -112,20 +117,19 @@ class JsonIOStream(object):
else:
raise
if isinstance(body, bytes):
try:
body = body.decode('utf-8')
except Exception:
ptvsd.common.log.exception('{0} --> {1}', self.name, body)
raise
try:
body = body.decode('utf-8')
except Exception:
log.exception('{0} --> {1}', self.name, body)
raise
try:
body = json.loads(body)
except Exception:
ptvsd.common.log.exception('{0} --> {1}', self.name, body)
log.exception('{0} --> {1}', self.name, body)
raise
ptvsd.common.log.debug('{0} --> {1!j}', self.name, body)
log.debug('{0} --> {1!j}', self.name, body)
return body
@ -138,26 +142,34 @@ class JsonIOStream(object):
try:
body = json.dumps(value, sort_keys=True)
except Exception:
ptvsd.common.log.exception('{0} <-- {1!r}', self.name, value)
log.exception('{0} <-- {1!r}', self.name, value)
if not isinstance(body, bytes):
body = body.encode('utf-8')
try:
header = 'Content-Length: %d\r\n\r\n' % len(body)
if not isinstance(header, bytes):
header = header.encode('ascii')
header = u'Content-Length: {0}\r\n\r\n'.format(len(body))
header = header.encode('ascii')
try:
self._writer.write(header)
self._writer.write(body)
except Exception:
ptvsd.common.log.exception('{0} <-- {1!j}', self.name, value)
log.exception('{0} <-- {1!j}', self.name, value)
raise
ptvsd.common.log.debug('{0} <-- {1!j}', self.name, value)
log.debug('{0} <-- {1!j}', self.name, value)
class Request(object):
class Message(object):
"""Represents an incoming or an outgoing message.
"""
def __init__(self, channel, seq):
self.channel = channel
self.seq = seq
class Request(Message):
"""Represents an incoming or an outgoing request.
Incoming requests are represented by instances of this class.
@ -167,8 +179,7 @@ class Request(object):
"""
def __init__(self, channel, seq, command, arguments):
self.channel = channel
self.seq = seq
super(Request, self).__init__(channel, seq)
self.command = command
self.arguments = arguments
self.response = None
@ -179,8 +190,8 @@ class OutgoingRequest(Request):
response to be received, and register a response callback.
"""
def __init__(self, *args):
super(OutgoingRequest, self).__init__(*args)
def __init__(self, channel, seq, command, arguments):
super(OutgoingRequest, self).__init__(channel, seq, command, arguments)
self._lock = threading.Lock()
self._got_response = threading.Event()
self._callback = lambda _: None
@ -228,13 +239,12 @@ class OutgoingRequest(Request):
callback(response)
class Response(object):
"""Represents a response to a Request.
class Response(Message):
"""Represents an incoming or an outgoing response to a Request.
"""
def __init__(self, channel, seq, request, body):
self.channel = channel
self.seq = seq
super(Response, self).__init__(channel, seq)
self.request = request
"""Request object that this is a response to.
@ -257,13 +267,12 @@ class Response(object):
return not isinstance(self.body, Exception)
class Event(object):
"""Represents a received event.
class Event(Message):
"""Represents an incoming event.
"""
def __init__(self, channel, seq, event, body):
self.channel = channel
self.seq = seq
super(Event, self).__init__(channel, seq)
self.event = event
self.body = body
@ -345,6 +354,17 @@ class JsonMessageChannel(object):
with self._send_message('event', d):
pass
def propagate(self, message):
"""Sends a new message with the same type and payload.
If it was a request, returns the new OutgoingRequest object for it.
"""
if isinstance(message, Request):
return self.send_request(message.command, message.arguments)
else:
return self.send_event(message.event, message.body)
def _send_response(self, request, body):
d = {
'request_seq': request.seq,
@ -467,7 +487,7 @@ class JsonMessageChannel(object):
try:
return self.on_message(message)
except Exception:
ptvsd.common.log.exception('Error while processing message for {0}:\n\n{1!r}', self.name, message)
log.exception('Error while processing message for {0}:\n\n{1!r}', self.name, message)
raise
def _process_incoming_messages(self):
@ -481,7 +501,7 @@ class JsonMessageChannel(object):
try:
self.on_disconnect()
except Exception:
ptvsd.common.log.exception('Error while processing disconnect for {0}', self.name)
log.exception('Error while processing disconnect for {0}', self.name)
raise
@ -494,3 +514,16 @@ class MessageHandlers(object):
def __init__(self, **kwargs):
for name, func in kwargs.items():
setattr(self, name, func)
def raise_failure(fmt, *args, **kwargs):
"""Raises RequestFailure from the point at which it is invoked with the specified
formatted message. The message is also immediately logged.
"""
msg = log.formatter.format(fmt, *args, **kwargs)
try:
raise RequestFailure(msg)
except RequestFailure:
log.exception()
raise

View file

@ -0,0 +1,110 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import absolute_import, print_function, unicode_literals
import threading
class Singleton(object):
"""A base class for a class of a singleton object.
For any derived class T, the first invocation of T() will create the instance,
and any future invocations of T() will return that instance.
Concurrent invocations of T() from different threads are safe.
"""
# All singletons share a single global construction lock, since we need to lock
# before we can construct any objects - it cannot be created per-class in __new__.
_lock = threading.RLock()
# Specific subclasses will get their own _instance set in __new__.
_instance = None
def __new__(cls):
if not cls._instance:
with cls._lock:
if not cls._instance:
cls._instance = object.__new__(cls)
return cls._instance
def __init__(self):
"""For singletons, __init__ is called on every access, not just on initial
creation. Initialization of attributes in derived classes should be done
on class level instead.
"""
pass
class ThreadSafeSingleton(Singleton):
"""A singleton that incorporates a lock for thread-safe access to its members.
The lock can be acquired using the context manager protocol, and thus idiomatic
use is in conjunction with a with-statement. For example, given derived class T::
with T() as t:
t.x = t.frob(t.y)
All access to the singleton from the outside should follow this pattern for both
attributes and method calls. Singleton members can assume that self is locked by
the caller while they're executing, but recursive locking of the same singleton
on the same thread is also permitted.
"""
# TODO: use a separate data lock for each subclass to reduce contention.
def __enter__(self):
type(self)._lock.acquire()
return self
def __exit__(self, exc_type, exc_value, exc_tb):
type(self)._lock.release()
# Prevent callers from reading or writing attributes without locking, except for
# methods specifically marked @threadsafe_method. Such methods should perform
# the necessary locking to guarantee safety for the callers.
@staticmethod
def assert_locked(self):
lock = type(self)._lock
assert lock.acquire(blocking=False), (
"ThreadSafeSingleton accessed without locking. Either use with-statement, "
"or if it is a method or property, mark it as @threadsafe_method or with "
"@autolocked_method, as appropriate."
)
lock.release()
def __getattribute__(self, name):
value = object.__getattribute__(self, name)
if not getattr(value, 'is_threadsafe_method', False):
ThreadSafeSingleton.assert_locked(self)
return value
def __setattr__(self, name, value):
ThreadSafeSingleton.assert_locked(self)
return object.__setattr__(self, name, value)
def threadsafe_method(func):
"""Marks a method of a ThreadSafeSingleton-derived class as inherently thread-safe.
A method so marked must either not use any singleton state, or lock it appropriately.
"""
func.is_threadsafe_method = True
return func
def autolocked_method(func):
"""Automatically synchronizes all calls of a method of a ThreadSafeSingleton-derived
class by locking the singleton for the duration of each call.
"""
@threadsafe_method
def lock_and_call(self, *args, **kwargs):
with self:
return func(self, *args, **kwargs)
return lock_and_call