mirror of
https://github.com/microsoft/debugpy.git
synced 2025-12-23 08:48:12 +00:00
Move VSCMessages and PyDevdMessages under the helpers package.
This commit is contained in:
parent
041c197269
commit
60d4148131
5 changed files with 171 additions and 161 deletions
|
|
@ -1,2 +1,3 @@
|
|||
from ._fake import FakePyDevd # noqa
|
||||
from ._messages import PyDevdMessages # noqa
|
||||
from ._pydevd import RawMessage # noqa
|
||||
|
|
|
|||
103
tests/helpers/pydevd/_messages.py
Normal file
103
tests/helpers/pydevd/_messages.py
Normal file
|
|
@ -0,0 +1,103 @@
|
|||
try:
|
||||
import urllib.parse as urllib
|
||||
except ImportError:
|
||||
import urllib
|
||||
|
||||
from _pydevd_bundle import pydevd_xml
|
||||
from _pydevd_bundle.pydevd_comm import (
|
||||
CMD_SEND_CURR_EXCEPTION_TRACE,
|
||||
)
|
||||
|
||||
from tests.helpers.protocol import MessageCounters
|
||||
from ._fake import FakePyDevd
|
||||
|
||||
|
||||
class PyDevdMessages(object):
|
||||
|
||||
protocol = FakePyDevd.PROTOCOL
|
||||
|
||||
def __init__(self,
|
||||
request_seq=1000000000, # ptvsd requests to pydevd
|
||||
response_seq=0, # PyDevd responses/events to ptvsd
|
||||
event_seq=None,
|
||||
):
|
||||
self.counters = MessageCounters(
|
||||
request_seq,
|
||||
response_seq,
|
||||
event_seq,
|
||||
)
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.counters, name)
|
||||
|
||||
def new_request(self, cmdid, *args, **kwargs):
|
||||
"""Return a new PyDevd request message."""
|
||||
seq = kwargs.pop('seq', None)
|
||||
if seq is None:
|
||||
seq = self.counters.next_request()
|
||||
return self._new_message(cmdid, seq, args, **kwargs)
|
||||
|
||||
def new_response(self, req, *args):
|
||||
"""Return a new VSC response message."""
|
||||
#seq = kwargs.pop('seq', None)
|
||||
#if seq is None:
|
||||
# seq = next(self.response_seq)
|
||||
req = self.protocol.parse(req)
|
||||
return self._new_message(req.cmdid, req.seq, args)
|
||||
|
||||
def new_event(self, cmdid, *args, **kwargs):
|
||||
"""Return a new VSC event message."""
|
||||
seq = kwargs.pop('seq', None)
|
||||
if seq is None:
|
||||
seq = self.counters.next_event()
|
||||
return self._new_message(cmdid, seq, args, **kwargs)
|
||||
|
||||
def _new_message(self, cmdid, seq, args=()):
|
||||
text = '\t'.join(args)
|
||||
msg = (cmdid, seq, text)
|
||||
return self.protocol.parse(msg)
|
||||
|
||||
def format_threads(self, *threads):
|
||||
text = '<xml>'
|
||||
for thread in threads: # (tid, tname)
|
||||
text += '<thread id="{}" name="{}" />'.format(*thread)
|
||||
text += '</xml>'
|
||||
return text
|
||||
|
||||
def format_frames(self, threadid, reason, *frames):
|
||||
text = '<xml>'
|
||||
text += '<thread id="{}" stop_reason="{}">'.format(threadid, reason)
|
||||
fmt = '<frame id="{}" name="{}" file="{}" line="{}" />'
|
||||
for frame in frames: # (fid, func, filename, line)
|
||||
text += fmt.format(*frame)
|
||||
text += '</thread>'
|
||||
text += '</xml>'
|
||||
return text
|
||||
|
||||
def format_variables(self, *variables):
|
||||
text = '<xml>'
|
||||
for name, value in variables:
|
||||
if isinstance(value, str) and value.startswith('err:'):
|
||||
value = pydevd_xml.ExceptionOnEvaluate(value[4:])
|
||||
text += pydevd_xml.var_to_xml(value, name)
|
||||
text += '</xml>'
|
||||
return urllib.quote(text)
|
||||
|
||||
def format_exception(self, threadid, exc, frame):
|
||||
frameid, _, _, _ = frame
|
||||
name = pydevd_xml.make_valid_xml_value(type(exc).__name__)
|
||||
description = pydevd_xml.make_valid_xml_value(str(exc))
|
||||
|
||||
info = '<xml>'
|
||||
info += '<thread id="{}" />'.format(threadid)
|
||||
info += '</xml>'
|
||||
return '{}\t{}\t{}\t{}'.format(
|
||||
frameid,
|
||||
name or 'exception: type unknown',
|
||||
description or 'exception: no description',
|
||||
self.format_frames(
|
||||
threadid,
|
||||
CMD_SEND_CURR_EXCEPTION_TRACE,
|
||||
frame,
|
||||
),
|
||||
)
|
||||
|
|
@ -1,2 +1,3 @@
|
|||
from ._fake import FakeVSC # noqa
|
||||
from ._messages import VSCMessages # noqa
|
||||
from ._vsc import parse_message, RawMessage, Request, Response, Event # noqa
|
||||
|
|
|
|||
64
tests/helpers/vsc/_messages.py
Normal file
64
tests/helpers/vsc/_messages.py
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
from tests.helpers.protocol import MessageCounters
|
||||
from ._fake import FakeVSC
|
||||
|
||||
|
||||
class VSCMessages(object):
|
||||
|
||||
protocol = FakeVSC.PROTOCOL
|
||||
|
||||
def __init__(self,
|
||||
request_seq=0, # VSC requests to ptvsd
|
||||
response_seq=0, # ptvsd responses/events to VSC
|
||||
event_seq=None,
|
||||
):
|
||||
self.counters = MessageCounters(
|
||||
request_seq,
|
||||
response_seq,
|
||||
event_seq,
|
||||
)
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.counters, name)
|
||||
|
||||
def new_request(self, command, seq=None, **args):
|
||||
"""Return a new VSC request message."""
|
||||
if seq is None:
|
||||
seq = self.counters.next_request()
|
||||
return {
|
||||
'type': 'request',
|
||||
'seq': seq,
|
||||
'command': command,
|
||||
'arguments': args,
|
||||
}
|
||||
|
||||
def new_response(self, req, seq=None, **body):
|
||||
"""Return a new VSC response message."""
|
||||
return self._new_response(req, None, seq, body)
|
||||
|
||||
def new_failure(self, req, err, seq=None, **body):
|
||||
"""Return a new VSC response message."""
|
||||
return self._new_response(req, err, body=body)
|
||||
|
||||
def _new_response(self, req, err=None, seq=None, body=None):
|
||||
if seq is None:
|
||||
seq = self.counters.next_response()
|
||||
return {
|
||||
'type': 'response',
|
||||
'seq': seq,
|
||||
'request_seq': req['seq'],
|
||||
'command': req['command'],
|
||||
'success': err is None,
|
||||
'message': err or '',
|
||||
'body': body,
|
||||
}
|
||||
|
||||
def new_event(self, eventname, seq=None, **body):
|
||||
"""Return a new VSC event message."""
|
||||
if seq is None:
|
||||
seq = self.counters.next_event()
|
||||
return {
|
||||
'type': 'event',
|
||||
'seq': seq,
|
||||
'event': eventname,
|
||||
'body': body,
|
||||
}
|
||||
|
|
@ -2,13 +2,8 @@ from collections import namedtuple
|
|||
import contextlib
|
||||
import platform
|
||||
import threading
|
||||
try:
|
||||
import urllib.parse as urllib
|
||||
except ImportError:
|
||||
import urllib
|
||||
import warnings
|
||||
|
||||
from _pydevd_bundle import pydevd_xml
|
||||
from _pydevd_bundle.pydevd_comm import (
|
||||
CMD_VERSION,
|
||||
CMD_LIST_THREADS,
|
||||
|
|
@ -23,9 +18,8 @@ from _pydevd_bundle.pydevd_comm import (
|
|||
)
|
||||
|
||||
from ptvsd import wrapper
|
||||
from tests.helpers.protocol import MessageCounters
|
||||
from tests.helpers.pydevd import FakePyDevd
|
||||
from tests.helpers.vsc import FakeVSC
|
||||
from tests.helpers.pydevd import FakePyDevd, PyDevdMessages
|
||||
from tests.helpers.vsc import FakeVSC, VSCMessages
|
||||
|
||||
|
||||
OS_ID = 'WINDOWS' if platform.system() == 'Windows' else 'UNIX'
|
||||
|
|
@ -127,159 +121,6 @@ class Threads(object):
|
|||
self._remove(t)
|
||||
|
||||
|
||||
class PyDevdMessages(object):
|
||||
|
||||
protocol = FakePyDevd.PROTOCOL
|
||||
|
||||
def __init__(self,
|
||||
request_seq=1000000000, # ptvsd requests to pydevd
|
||||
response_seq=0, # PyDevd responses/events to ptvsd
|
||||
event_seq=None,
|
||||
):
|
||||
self.counters = MessageCounters(
|
||||
request_seq,
|
||||
response_seq,
|
||||
event_seq,
|
||||
)
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.counters, name)
|
||||
|
||||
def new_request(self, cmdid, *args, **kwargs):
|
||||
"""Return a new PyDevd request message."""
|
||||
seq = kwargs.pop('seq', None)
|
||||
if seq is None:
|
||||
seq = self.counters.next_request()
|
||||
return self._new_message(cmdid, seq, args, **kwargs)
|
||||
|
||||
def new_response(self, req, *args):
|
||||
"""Return a new VSC response message."""
|
||||
#seq = kwargs.pop('seq', None)
|
||||
#if seq is None:
|
||||
# seq = next(self.response_seq)
|
||||
req = self.protocol.parse(req)
|
||||
return self._new_message(req.cmdid, req.seq, args)
|
||||
|
||||
def new_event(self, cmdid, *args, **kwargs):
|
||||
"""Return a new VSC event message."""
|
||||
seq = kwargs.pop('seq', None)
|
||||
if seq is None:
|
||||
seq = self.counters.next_event()
|
||||
return self._new_message(cmdid, seq, args, **kwargs)
|
||||
|
||||
def _new_message(self, cmdid, seq, args=()):
|
||||
text = '\t'.join(args)
|
||||
msg = (cmdid, seq, text)
|
||||
return self.protocol.parse(msg)
|
||||
|
||||
def format_threads(self, *threads):
|
||||
text = '<xml>'
|
||||
for thread in threads: # (tid, tname)
|
||||
text += '<thread id="{}" name="{}" />'.format(*thread)
|
||||
text += '</xml>'
|
||||
return text
|
||||
|
||||
def format_frames(self, threadid, reason, *frames):
|
||||
text = '<xml>'
|
||||
text += '<thread id="{}" stop_reason="{}">'.format(threadid, reason)
|
||||
fmt = '<frame id="{}" name="{}" file="{}" line="{}" />'
|
||||
for frame in frames: # (fid, func, filename, line)
|
||||
text += fmt.format(*frame)
|
||||
text += '</thread>'
|
||||
text += '</xml>'
|
||||
return text
|
||||
|
||||
def format_variables(self, *variables):
|
||||
text = '<xml>'
|
||||
for name, value in variables:
|
||||
if isinstance(value, str) and value.startswith('err:'):
|
||||
value = pydevd_xml.ExceptionOnEvaluate(value[4:])
|
||||
text += pydevd_xml.var_to_xml(value, name)
|
||||
text += '</xml>'
|
||||
return urllib.quote(text)
|
||||
|
||||
def format_exception(self, threadid, exc, frame):
|
||||
frameid, _, _, _ = frame
|
||||
name = pydevd_xml.make_valid_xml_value(type(exc).__name__)
|
||||
description = pydevd_xml.make_valid_xml_value(str(exc))
|
||||
|
||||
info = '<xml>'
|
||||
info += '<thread id="{}" />'.format(threadid)
|
||||
info += '</xml>'
|
||||
return '{}\t{}\t{}\t{}'.format(
|
||||
frameid,
|
||||
name or 'exception: type unknown',
|
||||
description or 'exception: no description',
|
||||
self.format_frames(
|
||||
threadid,
|
||||
CMD_SEND_CURR_EXCEPTION_TRACE,
|
||||
frame,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class VSCMessages(object):
|
||||
|
||||
protocol = FakeVSC.PROTOCOL
|
||||
|
||||
def __init__(self,
|
||||
request_seq=0, # VSC requests to ptvsd
|
||||
response_seq=0, # ptvsd responses/events to VSC
|
||||
event_seq=None,
|
||||
):
|
||||
self.counters = MessageCounters(
|
||||
request_seq,
|
||||
response_seq,
|
||||
event_seq,
|
||||
)
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.counters, name)
|
||||
|
||||
def new_request(self, command, seq=None, **args):
|
||||
"""Return a new VSC request message."""
|
||||
if seq is None:
|
||||
seq = self.counters.next_request()
|
||||
return {
|
||||
'type': 'request',
|
||||
'seq': seq,
|
||||
'command': command,
|
||||
'arguments': args,
|
||||
}
|
||||
|
||||
def new_response(self, req, seq=None, **body):
|
||||
"""Return a new VSC response message."""
|
||||
return self._new_response(req, None, seq, body)
|
||||
|
||||
def new_failure(self, req, err, seq=None, **body):
|
||||
"""Return a new VSC response message."""
|
||||
return self._new_response(req, err, body=body)
|
||||
|
||||
def _new_response(self, req, err=None, seq=None, body=None):
|
||||
if seq is None:
|
||||
seq = self.counters.next_response()
|
||||
return {
|
||||
'type': 'response',
|
||||
'seq': seq,
|
||||
'request_seq': req['seq'],
|
||||
'command': req['command'],
|
||||
'success': err is None,
|
||||
'message': err or '',
|
||||
'body': body,
|
||||
}
|
||||
|
||||
def new_event(self, eventname, seq=None, **body):
|
||||
"""Return a new VSC event message."""
|
||||
if seq is None:
|
||||
seq = self.counters.next_event()
|
||||
return {
|
||||
'type': 'event',
|
||||
'seq': seq,
|
||||
'event': eventname,
|
||||
'body': body,
|
||||
}
|
||||
|
||||
|
||||
class PyDevdLifecycle(object):
|
||||
|
||||
def __init__(self, fix):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue