A bunch of cleanup/lint fixes for the system tests. (#625)

The system tests have a number of minor formatting and structural issues that would be better to clean up sooner rather than later. This PR addresses nearly all of them.
This commit is contained in:
Eric Snow 2018-07-09 17:14:57 -06:00 committed by GitHub
parent b25d654987
commit 10af45bccc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 922 additions and 751 deletions

View file

@ -11,7 +11,8 @@ import ptvsd # noqa
from ptvsd._vendored import list_all as vendored
TEST_ROOT = os.path.dirname(__file__) # noqa
TEST_ROOT = os.path.abspath(os.path.dirname(__file__)) # noqa
RESOURCES_ROOT = os.path.join(TEST_ROOT, 'resources') # noqa
PROJECT_ROOT = os.path.dirname(TEST_ROOT) # noqa
VENDORED_ROOTS = vendored(resolve=True) # noqa

View file

@ -1,35 +1,3 @@
def noop(*args, **kwargs):
"""Do nothing."""
class Closeable(object):
def __init__(self):
self._closed = False
def __del__(self):
if not self._closed:
self.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
@property
def closed(self):
return self._closed
def close(self):
if self._closed:
return
self._closed = True
self._close()
# implemented by subclasses
def _close(self):
raise NotImplementedError

View file

@ -4,7 +4,7 @@ import socket
import time
from ptvsd.socket import Address
from . import Closeable
from ptvsd._util import Closeable, ClosedError
from .proc import Proc
from .. import PROJECT_ROOT
@ -263,7 +263,10 @@ class DebugAdapter(Closeable):
def _close(self):
if self._proc is not None:
self._proc.close()
try:
self._proc.close()
except ClosedError:
pass
if self.VERBOSE:
lines = self.output.decode('utf-8').splitlines()
print(' + ' + '\n + '.join(lines))

View file

@ -3,8 +3,7 @@ from __future__ import absolute_import
import warnings
from ptvsd.socket import Address
from ptvsd._util import new_hidden_thread
from . import Closeable
from ptvsd._util import new_hidden_thread, Closeable, ClosedError
from .debugadapter import DebugAdapter, wait_for_socket_server
from .debugsession import DebugSession
@ -13,11 +12,15 @@ from .debugsession import DebugSession
class _LifecycleClient(Closeable):
SESSION = DebugSession
def __init__(self,
addr=None,
port=8888,
breakpoints=None,
connecttimeout=1.0):
connecttimeout=1.0,
):
super(_LifecycleClient, self).__init__()
self._addr = Address.from_raw(addr, defaultport=port)
self._connecttimeout = connecttimeout
@ -51,7 +54,11 @@ class _LifecycleClient(Closeable):
if self._session is not None:
self._detach()
self._adapter.close()
try:
self._adapter.close()
except ClosedError:
pass
self._adapter = None
def attach_pid(self, pid, **kwargs):
@ -98,9 +105,15 @@ class _LifecycleClient(Closeable):
def _close(self):
if self._session is not None:
self._session.close()
try:
self._session.close()
except ClosedError:
pass
if self._adapter is not None:
self._adapter.close()
try:
self._adapter.close()
except ClosedError:
pass
def _launch(self,
argv,
@ -131,11 +144,17 @@ class _LifecycleClient(Closeable):
if addr is None:
addr = self._addr
assert addr.host == 'localhost'
self._session = DebugSession.create_client(addr, **kwargs)
self._session = self.SESSION.create_client(addr, **kwargs)
def _detach(self):
self._session.close()
session = self._session
if session is None:
return
self._session = None
try:
session.close()
except ClosedError:
pass
class DebugClient(_LifecycleClient):
@ -146,6 +165,7 @@ class DebugClient(_LifecycleClient):
class EasyDebugClient(DebugClient):
def start_detached(self, argv):
"""Start an adapter in a background process."""
if self.closed:
@ -172,8 +192,7 @@ class EasyDebugClient(DebugClient):
addr = ('localhost', self._addr.port)
def run():
self._session = DebugSession.create_server(addr, **kwargs)
self._session = self.SESSION.create_server(addr, **kwargs)
t = new_hidden_thread(
target=run,
name='test.client',

View file

@ -8,8 +8,7 @@ import time
import threading
import warnings
from ptvsd._util import new_hidden_thread
from . import Closeable
from ptvsd._util import new_hidden_thread, Closeable, ClosedError
from .message import (
raw_read_all as read_messages,
raw_write_one as write_message
@ -266,7 +265,10 @@ class DebugSession(Closeable):
def _close(self):
if self._owned:
self._conn.close()
try:
self._conn.close()
except ClosedError:
pass
if self._listenerthread != threading.current_thread():
self._listenerthread.join(timeout=1.0)
if self._listenerthread.is_alive():
@ -280,7 +282,10 @@ class DebugSession(Closeable):
print(' ->', msg)
self._receive_message(msg)
except EOFError:
self.close()
try:
self.close()
except ClosedError:
pass
def _receive_message(self, msg):
for i, handler in enumerate(list(self._handlers)):

View file

@ -8,8 +8,7 @@ import subprocess
import sys
import time
from ptvsd._util import new_hidden_thread
from . import Closeable
from ptvsd._util import new_hidden_thread, Closeable
_NOT_SET = object()

19
tests/helpers/resource.py Normal file
View file

@ -0,0 +1,19 @@
import os.path
from tests import RESOURCES_ROOT
from .workspace import ReadonlyFSTree
class TestResources(ReadonlyFSTree):
@classmethod
def from_module(cls, modname):
parts = modname.split('.')
assert parts and parts[0] == 'tests'
root = os.path.join(RESOURCES_ROOT, *parts[1:])
return cls(root)
def __init__(self, root):
root = os.path.abspath(root)
assert root.startswith(RESOURCES_ROOT)
super(TestResources, self).__init__(root)

View file

@ -2,6 +2,7 @@ from __future__ import absolute_import
from collections import namedtuple
import contextlib
import socket
import ptvsd.socket as _ptvsd
@ -9,6 +10,20 @@ import ptvsd.socket as _ptvsd
convert_eof = _ptvsd.convert_eof
def resolve_hostname():
hostname = socket.gethostname()
try:
return socket.gethostbyname(hostname)
except socket.gaierror:
addr = ('8.8.8.8', 80)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
sock.connect(addr)
return sock.getsockname()[0]
finally:
sock.close()
# TODO: Add timeouts to the functions.
def create_server(address):

View file

@ -24,16 +24,14 @@ def _touch(filename):
pass
class Workspace(object):
"""File operations relative to some root directory ("workspace")."""
PREFIX = 'workspace-'
class FSTreeBase(object):
"""File operations relative to some root directory."""
@classmethod
def _new_root(cls):
return tempfile.mkdtemp(prefix=cls.PREFIX)
raise NotImplementedError
def __init__(self, root=None):
def __init__(self, root):
if root is not None:
self._root = root
self._owned = False
@ -47,6 +45,44 @@ class Workspace(object):
self._owned = True
return self._root
@property
def parent(self):
parent = os.path.dirname(self.root)
return FSTreeBase(parent)
def resolve(self, *path):
"""Return the absolute path (relative to the workspace)."""
return os.path.join(self.root, *path)
def sub(self, *path):
cls = type(self)
root = self.resolve(*path)
return cls(root)
def env_with_py_path(self, *path):
return {'PYTHONPATH': self.root}
class ReadonlyFSTree(FSTreeBase):
"""File operations relative to some root directory."""
def __init__(self, root):
assert root
super(ReadonlyFSTree, self).__init__(root)
class Workspace(FSTreeBase):
"""File operations relative to some root directory ("workspace")."""
PREFIX = 'workspace-'
@classmethod
def _new_root(cls):
return tempfile.mkdtemp(prefix=cls.PREFIX)
def __init__(self, root=None):
super(Workspace, self).__init__(root)
def cleanup(self):
"""Release and destroy the workspace."""
if self._owned:
@ -54,10 +90,6 @@ class Workspace(object):
self._owned = False
self._root = None
def resolve(self, *path):
"""Return the absolute path (relative to the workspace)."""
return os.path.join(self.root, *path)
def random(self, *dirpath, **kwargs):
"""Return a random filename resolved to the given directory."""
dirname = self.resolve(*dirpath)

View file

@ -13,7 +13,7 @@ from tests.helpers.message import assert_is_subset
from tests.helpers.script import find_line
from tests.helpers.threading import get_locked_and_waiter
from tests.helpers.workspace import Workspace, PathEntry
from tests.helpers.vsc import parse_message, VSCMessages, Response, Event # noqa
from tests.helpers.vsc import parse_message, VSCMessages, Response, Event
ROOT = os.path.dirname(os.path.dirname(ptvsd.__file__))
@ -232,8 +232,7 @@ class LifecycleTestsBase(TestsBase, unittest.TestCase):
os.kill(pid, signal.SIGTERM)
except Exception:
pass
import time
time.sleep(1) # wait for socket connections to die out. # noqa
time.sleep(1) # wait for socket connections to die out.
def _wrap_and_reraise(ex, session):
messages = []
@ -313,7 +312,7 @@ class LifecycleTestsBase(TestsBase, unittest.TestCase):
_handle_exception(ex, adapter, session)
else:
if debug_info.filename is None:
argv = ["-m", debug_info.modulename] + debug_info.argv
argv = ['-m', debug_info.modulename] + debug_info.argv
else:
argv = [debug_info.filename] + debug_info.argv
with DebugClient(
@ -355,9 +354,10 @@ class LifecycleTestsBase(TestsBase, unittest.TestCase):
return True
except Exception:
return False
return list(
response for response in responses if isinstance(response, Event)
and response.event == event and is_subset(response.body)) # noqa
return list(resp
for resp in responses
if (isinstance(resp, Event) and resp.event == event and
is_subset(resp.body)))
def find_responses(self, responses, command, condition=lambda x: True):
return list(

View file

@ -2,51 +2,50 @@ import os
import os.path
import unittest
from ptvsd.wrapper import INITIALIZE_RESPONSE # noqa
from tests.helpers.debugsession import Awaitable
from tests.helpers.resource import TestResources
from . import (
_strip_newline_output_events, lifecycle_handshake,
LifecycleTestsBase, DebugInfo, PORT,
)
from . import (_strip_newline_output_events, lifecycle_handshake,
LifecycleTestsBase, DebugInfo, ROOT, PORT)
TEST_FILES_DIR = os.path.join(ROOT, 'tests', 'resources', 'system_tests',
'test_basic')
TEST_TERMINATION_FILES_DIR = os.path.join(ROOT, 'tests', 'resources',
'system_tests', 'test_terminate')
TEST_FILES = TestResources.from_module(__name__)
WITH_OUTPUT = TEST_FILES.sub('test_output')
WITHOUT_OUTPUT = TEST_FILES.sub('test_without_output')
WITH_ARGS = TEST_FILES.sub('test_args')
TEST_TERMINATION_FILES = TestResources.from_module(
'tests.system_tests.test_terminate')
class BasicTests(LifecycleTestsBase):
def run_test_output(self, debug_info):
options = {"debugOptions": ["RedirectOutput"]}
options = {'debugOptions': ['RedirectOutput']}
with self.start_debugging(debug_info) as dbg:
(_, _, _, _, _, _) = lifecycle_handshake(
dbg.session, debug_info.starttype, options=options)
lifecycle_handshake(dbg.session, debug_info.starttype,
options=options)
received = list(_strip_newline_output_events(dbg.session.received))
self.assert_contains(
received,
[
self.new_event("output", category="stdout", output="yes"),
self.new_event("output", category="stderr", output="no"),
],
)
self.assert_contains(received, [
self.new_event('output', category='stdout', output='yes'),
self.new_event('output', category='stderr', output='no'),
])
def run_test_arguments(self, debug_info, expected_args):
options = {"debugOptions": ["RedirectOutput"]}
options = {'debugOptions': ['RedirectOutput']}
with self.start_debugging(debug_info) as dbg:
(_, _, _, _, _, _) = lifecycle_handshake(
dbg.session, debug_info.starttype, options=options)
lifecycle_handshake(dbg.session, debug_info.starttype,
options=options)
received = list(_strip_newline_output_events(dbg.session.received))
expected_output = "{}, {}".format(len(expected_args), expected_args)
self.assert_contains(
received,
[
self.new_event(
"output", category="stdout", output=expected_output)
],
)
expected_output = '{}, {}'.format(len(expected_args), expected_args)
self.assert_contains(received, [
self.new_event(
'output', category='stdout', output=expected_output),
])
def run_test_termination(self, debug_info):
with self.start_debugging(debug_info) as dbg:
@ -55,24 +54,23 @@ class BasicTests(LifecycleTestsBase):
exited = session.get_awaiter_for_event('exited')
terminated = session.get_awaiter_for_event('terminated')
(_, req_launch, _, _, _, _) = lifecycle_handshake(
dbg.session, debug_info.starttype, threads=True)
(_, req_launch, _, _, _, _
) = lifecycle_handshake(dbg.session, debug_info.starttype,
threads=True)
Awaitable.wait_all(req_launch,
session.get_awaiter_for_event('thread')) # noqa
disconnect = session.send_request("disconnect")
session.get_awaiter_for_event('thread'))
disconnect = session.send_request('disconnect')
Awaitable.wait_all(exited, terminated, disconnect)
def run_test_without_output(self, debug_info):
options = {"debugOptions": ["RedirectOutput"]}
options = {'debugOptions': ['RedirectOutput']}
with self.start_debugging(debug_info) as dbg:
(_, _, _, _, _, _) = lifecycle_handshake(
dbg.session,
debug_info.starttype,
options=options,
threads=True)
lifecycle_handshake(dbg.session, debug_info.starttype,
options=options,
threads=True)
received = list(_strip_newline_output_events(dbg.session.received))
@ -82,91 +80,112 @@ class BasicTests(LifecycleTestsBase):
class LaunchFileTests(BasicTests):
def test_with_output(self):
filename = os.path.join(TEST_FILES_DIR, 'test_output', 'output.py')
filename = WITH_OUTPUT.resolve('output.py')
cwd = os.path.dirname(filename)
self.run_test_output(DebugInfo(filename=filename, cwd=cwd))
def test_arguments(self):
filename = os.path.join(TEST_FILES_DIR, 'test_args',
'launch_with_args.py')
filename = WITH_ARGS.resolve('launch_with_args.py')
cwd = os.path.dirname(filename)
argv = ['arg1', 'arg2']
self.run_test_arguments(
DebugInfo(filename=filename, cwd=cwd, argv=argv),
[filename] + argv)
[filename] + argv,
)
@unittest.skip('Broken')
def test_termination(self):
filename = os.path.join(TEST_TERMINATION_FILES_DIR, 'simple.py')
filename = TEST_TERMINATION_FILES.resolve('simple.py')
cwd = os.path.dirname(filename)
self.run_test_termination(DebugInfo(filename=filename, cwd=cwd))
self.run_test_termination(
DebugInfo(filename=filename, cwd=cwd),
)
def test_without_output(self):
filename = os.path.join(TEST_FILES_DIR, 'test_without_output',
'output.py')
filename = WITHOUT_OUTPUT.resolve('output.py')
cwd = os.path.dirname(filename)
self.run_test_without_output(DebugInfo(filename=filename, cwd=cwd))
self.run_test_without_output(
DebugInfo(filename=filename, cwd=cwd),
)
class LaunchModuleTests(BasicTests):
def test_with_output(self):
module_name = 'mymod_launch1'
cwd = os.path.join(TEST_FILES_DIR, 'test_output')
env = {"PYTHONPATH": cwd}
cwd = WITH_OUTPUT.root
env = WITH_OUTPUT.env_with_py_path()
self.run_test_output(
DebugInfo(modulename=module_name, env=env, cwd=cwd))
DebugInfo(modulename=module_name, env=env, cwd=cwd),
)
def test_without_output(self):
module_name = 'mymod_launch1'
cwd = os.path.join(TEST_FILES_DIR, 'test_without_output')
env = {"PYTHONPATH": cwd}
cwd = WITHOUT_OUTPUT.root
env = WITHOUT_OUTPUT.env_with_py_path()
self.run_test_without_output(
DebugInfo(modulename=module_name, env=env, cwd=cwd))
DebugInfo(modulename=module_name, env=env, cwd=cwd),
)
@unittest.skip('Broken')
def test_termination(self):
module_name = 'mymod_launch1'
cwd = TEST_TERMINATION_FILES_DIR
env = {"PYTHONPATH": cwd}
cwd = TEST_TERMINATION_FILES.root
env = TEST_TERMINATION_FILES.env_with_py_path()
self.run_test_output(
DebugInfo(modulename=module_name, env=env, cwd=cwd))
self.run_test_termination(DebugInfo(modulename=module_name, cwd=cwd))
DebugInfo(modulename=module_name, env=env, cwd=cwd),
)
self.run_test_termination(
DebugInfo(modulename=module_name, cwd=cwd),
)
@unittest.skip('Broken')
def test_arguments(self):
module_name = 'mymod_launch1'
cwd = os.path.join(TEST_FILES_DIR, 'test_args')
env = {"PYTHONPATH": cwd}
cwd = WITH_ARGS.root
env = WITH_ARGS.env_with_py_path()
argv = ['arg1', 'arg2']
self.run_test_arguments(
DebugInfo(modulename=module_name, env=env, cwd=cwd, argv=argv),
['-m'] + argv)
['-m'] + argv,
)
class ServerAttachTests(BasicTests):
def test_with_output(self):
filename = os.path.join(TEST_FILES_DIR, 'test_output', 'output.py')
filename = WITH_OUTPUT.resolve('output.py')
cwd = os.path.dirname(filename)
argv = ['localhost', str(PORT)]
self.run_test_output(
DebugInfo(
filename=filename, cwd=cwd, starttype='attach', argv=argv))
filename=filename,
cwd=cwd,
starttype='attach',
argv=argv,
),
)
def test_without_output(self):
filename = os.path.join(TEST_FILES_DIR, 'test_without_output',
'output.py')
filename = WITHOUT_OUTPUT.resolve('output.py')
cwd = os.path.dirname(filename)
argv = ['localhost', str(PORT)]
self.run_test_without_output(
DebugInfo(
filename=filename, cwd=cwd, starttype='attach', argv=argv))
filename=filename,
cwd=cwd,
starttype='attach',
argv=argv,
),
)
class PTVSDAttachTests(BasicTests):
def test_with_output(self):
filename = os.path.join(TEST_FILES_DIR, 'test_output',
'attach_output.py')
filename = WITH_OUTPUT.resolve('attach_output.py')
cwd = os.path.dirname(filename)
argv = ['localhost', str(PORT)]
self.run_test_output(
@ -175,11 +194,12 @@ class PTVSDAttachTests(BasicTests):
attachtype='import',
cwd=cwd,
starttype='attach',
argv=argv))
argv=argv,
),
)
def test_without_output(self):
filename = os.path.join(TEST_FILES_DIR, 'test_without_output',
'attach_output.py')
filename = WITHOUT_OUTPUT.resolve('attach_output.py')
cwd = os.path.dirname(filename)
argv = ['localhost', str(PORT)]
self.run_test_without_output(
@ -188,14 +208,17 @@ class PTVSDAttachTests(BasicTests):
attachtype='import',
cwd=cwd,
starttype='attach',
argv=argv))
argv=argv,
),
)
class ServerAttachModuleTests(BasicTests): # noqa
class ServerAttachModuleTests(BasicTests):
def test_with_output(self):
module_name = 'mymod_launch1'
cwd = os.path.join(TEST_FILES_DIR, 'test_output')
env = {"PYTHONPATH": cwd}
cwd = WITH_OUTPUT.root
env = WITH_OUTPUT.env_with_py_path()
argv = ['localhost', str(PORT)]
self.run_test_output(
DebugInfo(
@ -203,12 +226,14 @@ class ServerAttachModuleTests(BasicTests): # noqa
env=env,
cwd=cwd,
argv=argv,
starttype='attach'))
starttype='attach',
),
)
def test_without_output(self):
module_name = 'mymod_launch1'
cwd = os.path.join(TEST_FILES_DIR, 'test_without_output')
env = {"PYTHONPATH": cwd}
cwd = WITHOUT_OUTPUT.root
env = WITHOUT_OUTPUT.env_with_py_path()
argv = ['localhost', str(PORT)]
self.run_test_without_output(
DebugInfo(
@ -216,7 +241,9 @@ class ServerAttachModuleTests(BasicTests): # noqa
env=env,
cwd=cwd,
argv=argv,
starttype='attach'))
starttype='attach',
),
)
class PTVSDAttachModuleTests(BasicTests):
@ -224,8 +251,8 @@ class PTVSDAttachModuleTests(BasicTests):
def test_with_output(self):
#self.enable_verbose()
module_name = 'mymod_attach1'
cwd = os.path.join(TEST_FILES_DIR, 'test_output')
env = {"PYTHONPATH": cwd}
cwd = WITH_OUTPUT.root
env = WITH_OUTPUT.env_with_py_path()
argv = ['localhost', str(PORT)]
self.run_test_output(
DebugInfo(
@ -234,12 +261,14 @@ class PTVSDAttachModuleTests(BasicTests):
cwd=cwd,
argv=argv,
attachtype='import',
starttype='attach'))
starttype='attach',
),
)
def test_without_output(self):
module_name = 'mymod_attach1'
cwd = os.path.join(TEST_FILES_DIR, 'test_without_output')
env = {"PYTHONPATH": cwd}
cwd = WITHOUT_OUTPUT.root
env = WITHOUT_OUTPUT.env_with_py_path()
argv = ['localhost', str(PORT)]
self.run_test_without_output(
DebugInfo(
@ -248,4 +277,6 @@ class PTVSDAttachModuleTests(BasicTests):
cwd=cwd,
argv=argv,
attachtype='import',
starttype='attach'))
starttype='attach',
),
)

View file

@ -2,201 +2,200 @@ import os
import os.path
import unittest
from ptvsd.wrapper import INITIALIZE_RESPONSE # noqa
from tests.helpers.resource import TestResources
from . import (
_strip_newline_output_events, lifecycle_handshake,
LifecycleTestsBase, DebugInfo, PORT,
)
from . import (_strip_newline_output_events, lifecycle_handshake,
LifecycleTestsBase, DebugInfo, ROOT, PORT)
TEST_FILES_DIR = os.path.join(ROOT, 'tests', 'resources', 'system_tests',
'test_breakpoints')
TEST_FILES = TestResources.from_module(__name__)
class BreakpointTests(LifecycleTestsBase):
def run_test_with_break_points(self, debug_info, bp_filename, bp_line):
options = {"debugOptions": ["RedirectOutput"]}
options = {'debugOptions': ['RedirectOutput']}
breakpoints = [{
"source": {
"path": bp_filename
'source': {
'path': bp_filename
},
"breakpoints": [{
"line": bp_line
'breakpoints': [{
'line': bp_line
}]
}]
with self.start_debugging(debug_info) as dbg:
session = dbg.session
with session.wait_for_event("stopped") as result:
(
_,
req_launch_attach,
_,
reqs_bps,
_,
_,
) = lifecycle_handshake(
session,
debug_info.starttype,
options=options,
breakpoints=breakpoints)
with session.wait_for_event('stopped') as result:
(_, req_launch_attach, _, _, _, _,
) = lifecycle_handshake(session, debug_info.starttype,
options=options,
breakpoints=breakpoints)
req_launch_attach.wait()
event = result['msg']
tid = event.body['threadId']
req_bps, = reqs_bps # There should only be one.
tid = result["msg"].body["threadId"]
stacktrace = session.send_request("stackTrace", threadId=tid)
stacktrace.wait()
session.send_request("continue", threadId=tid)
req_stacktrace = session.send_request(
'stackTrace',
threadId=tid,
)
req_stacktrace.wait()
stacktrace = req_stacktrace.resp.body
session.send_request(
'continue',
threadId=tid,
)
received = list(_strip_newline_output_events(session.received))
self.assertGreaterEqual(stacktrace.resp.body["totalFrames"], 1)
self.assert_is_subset(
stacktrace.resp.body,
{
# We get Python and PTVSD frames as well.
# "totalFrames": 2,
"stackFrames": [{
"id": 1,
"name": "<module>",
"source": {
"sourceReference": 0
},
"line": bp_line,
"column": 1,
}],
})
self.assertGreaterEqual(stacktrace['totalFrames'], 1)
self.assert_is_subset(stacktrace, {
# We get Python and PTVSD frames as well.
# 'totalFrames': 2,
'stackFrames': [{
'id': 1,
'name': '<module>',
'source': {
'sourceReference': 0
},
'line': bp_line,
'column': 1,
}],
})
self.assert_contains(
received,
[
self.new_event(
"stopped",
reason="breakpoint",
threadId=tid,
text=None,
description=None,
),
self.new_event("continued", threadId=tid),
self.new_event("output", category="stdout", output="yes"),
self.new_event("output", category="stderr", output="no"),
self.new_event("exited", exitCode=0),
self.new_event("terminated"),
],
)
self.assert_contains(received, [
self.new_event(
'stopped',
reason='breakpoint',
threadId=tid,
text=None,
description=None,
),
self.new_event('continued', threadId=tid),
self.new_event('output', category='stdout', output='yes'),
self.new_event('output', category='stderr', output='no'),
self.new_event('exited', exitCode=0),
self.new_event('terminated'),
])
def run_test_with_break_points_across_files(
self, debug_info, first_file, second_file, second_file_line,
expected_modules, expected_stacktrace):
breakpoints = [{
"source": {
"path": second_file
'source': {
'path': second_file
},
"breakpoints": [{
"line": second_file_line
'breakpoints': [{
'line': second_file_line
}]
}]
with self.start_debugging(debug_info) as dbg:
session = dbg.session
with session.wait_for_event("stopped") as result:
(
_,
req_launch_attach,
_,
_,
_,
_,
) = lifecycle_handshake(
session, debug_info.starttype, breakpoints=breakpoints)
with session.wait_for_event('stopped') as result:
(_, req_launch_attach, _, _, _, _,
) = lifecycle_handshake(session, debug_info.starttype,
breakpoints=breakpoints)
req_launch_attach.wait()
event = result['msg']
tid = event.body['threadId']
tid = result["msg"].body["threadId"]
stacktrace = session.send_request("stackTrace", threadId=tid)
stacktrace.wait()
session.send_request("continue", threadId=tid)
req_stacktrace = session.send_request(
'stackTrace',
threadId=tid,
)
req_stacktrace.wait()
stacktrace = req_stacktrace.resp.body
session.send_request('continue', threadId=tid)
received = list(_strip_newline_output_events(session.received))
for mod in expected_modules:
found_mod = self.find_events(received, 'module', mod)
self.assertEqual(
len(found_mod), 1, 'Modul not found {}'.format(mod))
self.assertEqual(len(found_mod),
1,
'Modul not found {}'.format(mod))
self.assert_is_subset(stacktrace.resp, expected_stacktrace)
self.assert_is_subset(stacktrace, expected_stacktrace)
def run_test_conditional_break_points(self, debug_info):
breakpoints = [{
"source": {
"path": debug_info.filename
'source': {
'path': debug_info.filename
},
"breakpoints": [{
"line": 4,
"condition": "i == 2"
'breakpoints': [{
'line': 4,
'condition': 'i == 2'
}],
"lines": [4]
'lines': [4]
}]
with self.start_debugging(debug_info) as dbg:
session = dbg.session
with session.wait_for_event("stopped") as result:
(
_,
_,
_,
_,
_,
_,
) = lifecycle_handshake(
session, debug_info.starttype, breakpoints=breakpoints)
with session.wait_for_event('stopped') as result:
lifecycle_handshake(session, debug_info.starttype,
breakpoints=breakpoints)
event = result['msg']
tid = event.body['threadId']
tid = result["msg"].body["threadId"]
stacktrace = session.send_request("stackTrace", threadId=tid)
stacktrace.wait()
req_stacktrace = session.send_request(
'stackTrace',
threadId=tid,
)
req_stacktrace.wait()
frames = req_stacktrace.resp.body['stackFrames']
frame_id = frames[0]['id']
req_scopes = session.send_request(
'scopes',
frameId=frame_id,
)
req_scopes.wait()
scopes = req_scopes.resp.body['scopes']
variables_reference = scopes[0]['variablesReference']
req_variables = session.send_request(
'variables',
variablesReference=variables_reference,
)
req_variables.wait()
variables = req_variables.resp.body['variables']
frame_id = stacktrace.resp.body["stackFrames"][0]["id"]
scopes = session.send_request('scopes', frameId=frame_id)
scopes.wait()
variables_reference = scopes.resp.body["scopes"][0][
"variablesReference"]
variables = session.send_request(
'variables', variablesReference=variables_reference)
variables.wait()
session.send_request("continue", threadId=tid)
session.send_request('continue', threadId=tid)
self.assert_is_subset(variables.resp.body["variables"],
[{
"name": "a",
"type": "int",
"value": "1",
"evaluateName": "a"
}, {
"name": "b",
"type": "int",
"value": "2",
"evaluateName": "b"
}, {
"name": "c",
"type": "int",
"value": "1",
"evaluateName": "c"
}, {
"name": "i",
"type": "int",
"value": "2",
"evaluateName": "i"
}])
self.assert_is_subset(variables, [{
'name': 'a',
'type': 'int',
'value': '1',
'evaluateName': 'a'
}, {
'name': 'b',
'type': 'int',
'value': '2',
'evaluateName': 'b'
}, {
'name': 'c',
'type': 'int',
'value': '1',
'evaluateName': 'c'
}, {
'name': 'i',
'type': 'int',
'value': '2',
'evaluateName': 'i'
}])
def run_test_hit_conditional_break_points(self, debug_info, **kwargs):
breakpoints = [{
"source": {
"path": debug_info.filename
'source': {
'path': debug_info.filename
},
"breakpoints": [{
"line": 4,
"hitCondition": kwargs['hit_condition']
'breakpoints': [{
'line': 4,
'hitCondition': kwargs['hit_condition']
}],
"lines": [4]
'lines': [4]
}]
i_values = []
@ -206,76 +205,69 @@ class BreakpointTests(LifecycleTestsBase):
count = 0
while count < hits:
if count == 0:
with session.wait_for_event("stopped") as result:
(
_,
_,
_,
_,
_,
_,
) = lifecycle_handshake(
session, debug_info.starttype,
breakpoints=breakpoints)
with session.wait_for_event('stopped') as result:
lifecycle_handshake(session, debug_info.starttype,
breakpoints=breakpoints)
event = result['msg']
tid = event.body['threadId']
tid = result["msg"].body["threadId"]
stacktrace = session.send_request("stackTrace", threadId=tid)
stacktrace.wait()
frame_id = stacktrace.resp.body["stackFrames"][0]["id"]
scopes = session.send_request('scopes', frameId=frame_id)
scopes.wait()
variables_reference = scopes.resp.body["scopes"][0][
"variablesReference"]
variables = session.send_request(
'variables', variablesReference=variables_reference)
variables.wait()
req_stacktrace = session.send_request(
'stackTrace',
threadId=tid,
)
req_stacktrace.wait()
frames = req_stacktrace.resp.body['stackFrames']
frame_id = frames[0]['id']
req_scopes = session.send_request(
'scopes',
frameId=frame_id,
)
req_scopes.wait()
scopes = req_scopes.resp.body['scopes']
variables_reference = scopes[0]['variablesReference']
req_variables = session.send_request(
'variables',
variablesReference=variables_reference,
)
req_variables.wait()
variables = req_variables.resp.body['variables']
i_value = list(int(v['value'])
for v in variables.resp.body["variables"]
for v in variables
if v['name'] == 'i')
i_values.append(i_value[0] if len(i_value) > 0 else None)
count = count + 1
if count < hits:
with session.wait_for_event("stopped") as result:
session.send_request("continue", threadId=tid)
with session.wait_for_event('stopped') as result:
session.send_request('continue', threadId=tid)
else:
session.send_request("continue", threadId=tid)
session.send_request('continue', threadId=tid)
self.assertEqual(i_values, kwargs['expected'])
def run_test_logpoints(self, debug_info):
options = {"debugOptions": ["RedirectOutput"]}
options = {'debugOptions': ['RedirectOutput']}
breakpoints = [{
"source": {
"path": debug_info.filename
'source': {
'path': debug_info.filename
},
"breakpoints": [{
"line": 4,
"logMessage": "Sum of a + i = {a + i}"
'breakpoints': [{
'line': 4,
'logMessage': 'Sum of a + i = {a + i}'
}],
"lines": [4]
'lines': [4]
}]
with self.start_debugging(debug_info) as dbg:
session = dbg.session
(
_,
_,
_,
_,
_,
_,
) = lifecycle_handshake(
session,
debug_info.starttype,
options=options,
breakpoints=breakpoints)
lifecycle_handshake(session, debug_info.starttype,
options=options,
breakpoints=breakpoints)
received = list(_strip_newline_output_events(session.received))
expected_events = [
self.new_event(
"output",
category="stdout",
output="Sum of a + i = {}{}".format(i + 1, os.linesep))
'output',
category='stdout',
output='Sum of a + i = {}{}'.format(i + 1, os.linesep))
for i in range(5)
]
@ -283,215 +275,246 @@ class BreakpointTests(LifecycleTestsBase):
class LaunchFileTests(BreakpointTests):
def test_with_break_points(self):
filename = os.path.join(TEST_FILES_DIR, 'output.py')
filename = TEST_FILES.resolve('output.py')
cwd = os.path.dirname(filename)
self.run_test_with_break_points(
DebugInfo(filename=filename, cwd=cwd), filename, bp_line=3)
DebugInfo(filename=filename, cwd=cwd),
filename,
bp_line=3,
)
def test_with_break_points_across_files(self):
first_file = os.path.join(TEST_FILES_DIR, 'foo.py')
second_file = os.path.join(TEST_FILES_DIR, 'bar.py')
first_file = TEST_FILES.resolve('foo.py')
second_file = TEST_FILES.resolve('bar.py')
cwd = os.path.dirname(first_file)
expected_modules = [{
"reason": "new",
"module": {
"path": second_file,
"name": "bar"
'reason': 'new',
'module': {
'path': second_file,
'name': 'bar'
}
}, {
"reason": "new",
"module": {
"path": first_file,
"name": "__main__"
'reason': 'new',
'module': {
'path': first_file,
'name': '__main__'
}
}]
expected_stacktrace = {
"stackFrames": [{
"name": "do_bar",
"source": {
"path": second_file,
"sourceReference": 0
'stackFrames': [{
'name': 'do_bar',
'source': {
'path': second_file,
'sourceReference': 0
},
"line": 2,
"column": 1
'line': 2,
'column': 1
}, {
"name": "do_foo",
"source": {
"path": first_file,
"sourceReference": 0
'name': 'do_foo',
'source': {
'path': first_file,
'sourceReference': 0
},
"line": 5,
"column": 1
'line': 5,
'column': 1
}, {
"id": 3,
"name": "<module>",
"source": {
"path": first_file,
"sourceReference": 0
'id': 3,
'name': '<module>',
'source': {
'path': first_file,
'sourceReference': 0
},
"line": 8,
"column": 1
'line': 8,
'column': 1
}],
}
self.run_test_with_break_points_across_files(
DebugInfo(filename=first_file, cwd=cwd), first_file, second_file,
2, expected_modules, expected_stacktrace)
DebugInfo(filename=first_file, cwd=cwd),
first_file,
second_file,
2,
expected_modules,
expected_stacktrace,
)
def test_conditional_break_points(self):
filename = os.path.join(TEST_FILES_DIR, 'loopy.py')
filename = TEST_FILES.resolve('loopy.py')
cwd = os.path.dirname(filename)
self.run_test_conditional_break_points(
DebugInfo(filename=filename, cwd=cwd))
def test_logpoints(self):
filename = os.path.join(TEST_FILES_DIR, 'loopy.py')
filename = TEST_FILES.resolve('loopy.py')
cwd = os.path.dirname(filename)
self.run_test_logpoints(DebugInfo(filename=filename, cwd=cwd))
self.run_test_logpoints(
DebugInfo(filename=filename, cwd=cwd))
def test_hit_conditional_break_points_equal(self):
filename = os.path.join(TEST_FILES_DIR, 'loopy.py')
filename = TEST_FILES.resolve('loopy.py')
cwd = os.path.dirname(filename)
self.run_test_hit_conditional_break_points(
DebugInfo(filename=filename, cwd=cwd),
hit_condition='== 5',
hits=1,
expected=[4])
expected=[4],
)
def test_hit_conditional_break_points_equal2(self):
filename = os.path.join(TEST_FILES_DIR, 'loopy.py')
filename = TEST_FILES.resolve('loopy.py')
cwd = os.path.dirname(filename)
self.run_test_hit_conditional_break_points(
DebugInfo(filename=filename, cwd=cwd),
hit_condition='5',
hits=1,
expected=[4])
expected=[4],
)
def test_hit_conditional_break_points_greater(self):
filename = os.path.join(TEST_FILES_DIR, 'loopy.py')
filename = TEST_FILES.resolve('loopy.py')
cwd = os.path.dirname(filename)
self.run_test_hit_conditional_break_points(
DebugInfo(filename=filename, cwd=cwd),
hit_condition='> 5',
hits=5,
expected=[5, 6, 7, 8, 9])
expected=[5, 6, 7, 8, 9],
)
def test_hit_conditional_break_points_greater_or_equal(self):
filename = os.path.join(TEST_FILES_DIR, 'loopy.py')
filename = TEST_FILES.resolve('loopy.py')
cwd = os.path.dirname(filename)
self.run_test_hit_conditional_break_points(
DebugInfo(filename=filename, cwd=cwd),
hit_condition='>= 5',
hits=6,
expected=[4, 5, 6, 7, 8, 9])
expected=[4, 5, 6, 7, 8, 9],
)
def test_hit_conditional_break_points_lesser(self):
filename = os.path.join(TEST_FILES_DIR, 'loopy.py')
filename = TEST_FILES.resolve('loopy.py')
cwd = os.path.dirname(filename)
self.run_test_hit_conditional_break_points(
DebugInfo(filename=filename, cwd=cwd),
hit_condition='< 5',
hits=4,
expected=[0, 1, 2, 3])
expected=[0, 1, 2, 3],
)
def test_hit_conditional_break_points_lesser_or_equal(self):
filename = os.path.join(TEST_FILES_DIR, 'loopy.py')
filename = TEST_FILES.resolve('loopy.py')
cwd = os.path.dirname(filename)
self.run_test_hit_conditional_break_points(
DebugInfo(filename=filename, cwd=cwd),
hit_condition='<= 5',
hits=5,
expected=[0, 1, 2, 3, 4])
expected=[0, 1, 2, 3, 4],
)
def test_hit_conditional_break_points_mod(self):
filename = os.path.join(TEST_FILES_DIR, 'loopy.py')
filename = TEST_FILES.resolve('loopy.py')
cwd = os.path.dirname(filename)
self.run_test_hit_conditional_break_points(
DebugInfo(filename=filename, cwd=cwd),
hit_condition='% 4',
hits=2,
expected=[3, 7])
expected=[3, 7],
)
class LaunchModuleTests(BreakpointTests):
def test_with_break_points(self):
module_name = 'mymod_launch1'
cwd = os.path.join(TEST_FILES_DIR)
env = {"PYTHONPATH": cwd}
env = TEST_FILES.env_with_py_path()
cwd = TEST_FILES.root
bp_filename = os.path.join(cwd, module_name, '__init__.py')
self.run_test_with_break_points(
DebugInfo(modulename=module_name, env=env, cwd=cwd),
bp_filename,
bp_line=3)
bp_line=3,
)
def test_with_break_points_across_files(self):
module_name = 'mymod_foo'
first_file = os.path.join(TEST_FILES_DIR, module_name, '__init__.py')
second_file = os.path.join(TEST_FILES_DIR, 'mymod_bar', 'bar.py')
cwd = os.path.join(TEST_FILES_DIR)
env = {"PYTHONPATH": cwd}
first_file = TEST_FILES.resolve(module_name, '__init__.py')
second_file = TEST_FILES.resolve('mymod_bar', 'bar.py')
env = TEST_FILES.env_with_py_path()
cwd = TEST_FILES.root
expected_modules = [{
"reason": "new",
"module": {
"package": "mymod_bar",
"path": second_file,
"name": "mymod_bar.bar"
'reason': 'new',
'module': {
'package': 'mymod_bar',
'path': second_file,
'name': 'mymod_bar.bar'
}
}, {
"reason": "new",
"module": {
"path": first_file,
"name": "__main__"
'reason': 'new',
'module': {
'path': first_file,
'name': '__main__'
}
}]
expected_stacktrace = {
"stackFrames": [{
"name": "do_bar",
"source": {
"path": second_file,
"sourceReference": 0
'stackFrames': [{
'name': 'do_bar',
'source': {
'path': second_file,
'sourceReference': 0
},
"line": 2,
"column": 1
'line': 2,
'column': 1
}, {
"name": "do_foo",
"source": {
"path": first_file,
"sourceReference": 0
'name': 'do_foo',
'source': {
'path': first_file,
'sourceReference': 0
},
"line": 5,
"column": 1
'line': 5,
'column': 1
}, {
"id": 3,
"name": "<module>",
"source": {
"path": first_file,
"sourceReference": 0
'id': 3,
'name': '<module>',
'source': {
'path': first_file,
'sourceReference': 0
},
"line": 8,
"column": 1
'line': 8,
'column': 1
}],
}
self.run_test_with_break_points_across_files(
DebugInfo(modulename=module_name, cwd=cwd, env=env), first_file,
second_file, 2, expected_modules, expected_stacktrace)
DebugInfo(modulename=module_name, cwd=cwd, env=env),
first_file,
second_file,
2,
expected_modules,
expected_stacktrace,
)
class ServerAttachTests(BreakpointTests):
def test_with_break_points(self):
filename = os.path.join(TEST_FILES_DIR, 'output.py')
filename = TEST_FILES.resolve('output.py')
cwd = os.path.dirname(filename)
argv = ['localhost', str(PORT)]
self.run_test_with_break_points(
DebugInfo(
filename=filename, cwd=cwd, starttype='attach', argv=argv),
filename=filename,
cwd=cwd,
starttype='attach',
argv=argv,
),
filename,
bp_line=3)
bp_line=3,
)
class PTVSDAttachTests(BreakpointTests):
def test_with_break_points(self):
filename = os.path.join(TEST_FILES_DIR, 'attach_output.py')
filename = TEST_FILES.resolve('attach_output.py')
cwd = os.path.dirname(filename)
argv = ['localhost', str(PORT)]
self.run_test_with_break_points(
@ -500,16 +523,19 @@ class PTVSDAttachTests(BreakpointTests):
attachtype='import',
cwd=cwd,
starttype='attach',
argv=argv),
argv=argv,
),
filename,
bp_line=6)
bp_line=6,
)
class ServerAttachModuleTests(BreakpointTests): # noqa
class ServerAttachModuleTests(BreakpointTests):
def test_with_break_points(self):
module_name = 'mymod_launch1'
cwd = os.path.join(TEST_FILES_DIR)
env = {"PYTHONPATH": cwd}
env = TEST_FILES.env_with_py_path()
cwd = TEST_FILES.root
argv = ['localhost', str(PORT)]
bp_filename = os.path.join(cwd, module_name, '__init__.py')
self.run_test_with_break_points(
@ -518,17 +544,20 @@ class ServerAttachModuleTests(BreakpointTests): # noqa
env=env,
cwd=cwd,
argv=argv,
starttype='attach'),
starttype='attach',
),
bp_filename,
bp_line=3)
bp_line=3,
)
@unittest.skip('Needs fixing')
class PTVSDAttachModuleTests(BreakpointTests): # noqa
class PTVSDAttachModuleTests(BreakpointTests):
def test_with_break_points(self):
module_name = 'mymod_attach1'
cwd = os.path.join(TEST_FILES_DIR)
env = {"PYTHONPATH": cwd}
env = TEST_FILES.env_with_py_path()
cwd = TEST_FILES.root
argv = ['localhost', str(PORT)]
bp_filename = os.path.join(cwd, module_name, '__init__.py')
self.run_test_with_break_points(
@ -538,6 +567,8 @@ class PTVSDAttachModuleTests(BreakpointTests): # noqa
cwd=cwd,
argv=argv,
attachtype='import',
starttype='attach'),
starttype='attach',
),
bp_filename,
bp_line=6)
bp_line=6,
)

View file

@ -2,140 +2,149 @@ import os
import os.path
import unittest
from ptvsd.wrapper import INITIALIZE_RESPONSE # noqa
from tests.helpers.debugsession import Awaitable
from tests.helpers.resource import TestResources
from . import (
_strip_newline_output_events, lifecycle_handshake,
LifecycleTestsBase, DebugInfo, PORT,
)
from . import (_strip_newline_output_events, lifecycle_handshake,
LifecycleTestsBase, DebugInfo, ROOT, PORT)
TEST_FILES_DIR = os.path.join(ROOT, 'tests', 'resources', 'system_tests',
'test_exceptions')
TEST_FILES = TestResources.from_module(__name__)
class ExceptionTests(LifecycleTestsBase):
def run_test_not_breaking_into_handled_exceptions(self, debug_info):
excbreakpoints = [{"filters": ["uncaught"]}]
options = {"debugOptions": ["RedirectOutput"]}
excbreakpoints = [{'filters': ['uncaught']}]
options = {'debugOptions': ['RedirectOutput']}
with self.start_debugging(debug_info) as dbg:
(_, req_attach, _, _, _, _) = lifecycle_handshake(
dbg.session,
debug_info.starttype,
excbreakpoints=excbreakpoints,
options=options)
(_, req_attach, _, _, _, _
) = lifecycle_handshake(dbg.session, debug_info.starttype,
excbreakpoints=excbreakpoints,
options=options)
received = list(_strip_newline_output_events(dbg.session.received))
self.assert_contains(
received,
[
self.new_event("output", category="stdout", output="end"),
self.new_event("exited", exitCode=0),
self.new_event("terminated"),
],
)
self.assert_contains(received, [
self.new_event('output', category='stdout', output='end'),
self.new_event('exited', exitCode=0),
self.new_event('terminated'),
])
def run_test_breaking_into_handled_exceptions(self, debug_info):
excbreakpoints = [{"filters": ["raised", "uncaught"]}]
options = {"debugOptions": ["RedirectOutput"]}
excbreakpoints = [{'filters': ['raised', 'uncaught']}]
options = {'debugOptions': ['RedirectOutput']}
with self.start_debugging(debug_info) as dbg:
stopped = dbg.session.get_awaiter_for_event('stopped')
(_, req_launch_attach, _, _, _, _) = lifecycle_handshake(
dbg.session,
debug_info.starttype,
excbreakpoints=excbreakpoints,
options=options,
threads=True)
(_, req_launch_attach, _, _, _, _
) = lifecycle_handshake(dbg.session, debug_info.starttype,
excbreakpoints=excbreakpoints,
options=options,
threads=True)
Awaitable.wait_all(req_launch_attach, stopped)
self.assertEqual(stopped.event.body["text"], "ArithmeticError")
self.assertEqual(stopped.event.body['text'], 'ArithmeticError')
self.assertIn("ArithmeticError('Hello'",
stopped.event.body["description"])
stopped.event.body['description'])
thread_id = stopped.event.body["threadId"]
ex_info = dbg.session.send_request(
'exceptionInfo', threadId=thread_id)
ex_info.wait()
thread_id = stopped.event.body['threadId']
req_exc_info = dbg.session.send_request(
'exceptionInfo',
threadId=thread_id,
)
req_exc_info.wait()
exc_info = req_exc_info.resp.body
self.assert_is_subset(
ex_info.resp.body,
{
"exceptionId": "ArithmeticError",
"breakMode": "always",
"details": {
"typeName": "ArithmeticError",
# "source": debug_info.filename
}
})
self.assert_is_subset(exc_info, {
'exceptionId': 'ArithmeticError',
'breakMode': 'always',
'details': {
'typeName': 'ArithmeticError',
# 'source': debug_info.filename
}
})
continued = dbg.session.get_awaiter_for_event('continued')
dbg.session.send_request("continue", threadId=thread_id).wait()
dbg.session.send_request(
'continue',
threadId=thread_id,
).wait()
Awaitable.wait_all(continued)
received = list(_strip_newline_output_events(dbg.session.received))
self.assert_contains(
received,
[
self.new_event("continued", threadId=thread_id),
self.new_event("output", category="stdout", output="end"),
self.new_event("exited", exitCode=0),
self.new_event("terminated"),
],
)
self.assert_contains(received, [
self.new_event('continued', threadId=thread_id),
self.new_event('output', category='stdout', output='end'),
self.new_event('exited', exitCode=0),
self.new_event('terminated'),
])
class LaunchFileTests(ExceptionTests):
def test_not_breaking_into_handled_exceptions(self):
filename = os.path.join(TEST_FILES_DIR, 'handled_exceptions_launch.py')
filename = TEST_FILES.resolve('handled_exceptions_launch.py')
cwd = os.path.dirname(filename)
self.run_test_not_breaking_into_handled_exceptions(
DebugInfo(filename=filename, cwd=cwd))
def test_breaking_into_handled_exceptions(self):
filename = os.path.join(TEST_FILES_DIR, 'handled_exceptions_launch.py')
filename = TEST_FILES.resolve('handled_exceptions_launch.py')
cwd = os.path.dirname(filename)
self.run_test_breaking_into_handled_exceptions(
DebugInfo(filename=filename, cwd=cwd))
class LaunchModuleExceptionLifecycleTests(ExceptionTests):
def test_breaking_into_handled_exceptions(self):
module_name = 'mymod_launch1'
env = {"PYTHONPATH": TEST_FILES_DIR}
cwd = os.path.dirname(TEST_FILES_DIR)
env = TEST_FILES.env_with_py_path()
cwd = TEST_FILES.parent.root
self.run_test_breaking_into_handled_exceptions(
DebugInfo(modulename=module_name, env=env, cwd=cwd))
def test_not_breaking_into_handled_exceptions(self):
module_name = 'mymod_launch1'
env = {"PYTHONPATH": TEST_FILES_DIR}
cwd = os.path.dirname(TEST_FILES_DIR)
env = TEST_FILES.env_with_py_path()
cwd = TEST_FILES.parent.root
self.run_test_not_breaking_into_handled_exceptions(
DebugInfo(modulename=module_name, env=env, cwd=cwd))
class ServerAttachExceptionLifecycleTests(ExceptionTests):
def test_breaking_into_handled_exceptions(self):
filename = os.path.join(TEST_FILES_DIR, 'handled_exceptions_launch.py')
filename = TEST_FILES.resolve('handled_exceptions_launch.py')
cwd = os.path.dirname(filename)
argv = ['localhost', str(PORT)]
self.run_test_breaking_into_handled_exceptions(
DebugInfo(
filename=filename, cwd=cwd, starttype='attach', argv=argv))
filename=filename,
cwd=cwd,
starttype='attach',
argv=argv,
))
def test_not_breaking_into_handled_exceptions(self):
filename = os.path.join(TEST_FILES_DIR, 'handled_exceptions_launch.py')
filename = TEST_FILES.resolve('handled_exceptions_launch.py')
cwd = os.path.dirname(filename)
argv = ['localhost', str(PORT)]
self.run_test_not_breaking_into_handled_exceptions(
DebugInfo(
filename=filename, cwd=cwd, starttype='attach', argv=argv))
filename=filename,
cwd=cwd,
starttype='attach',
argv=argv,
))
class PTVSDAttachExceptionLifecycleTests(ExceptionTests):
def test_breaking_into_handled_exceptions(self):
filename = os.path.join(TEST_FILES_DIR, 'handled_exceptions_attach.py')
filename = TEST_FILES.resolve('handled_exceptions_attach.py')
cwd = os.path.dirname(filename)
argv = ['localhost', str(PORT)]
self.run_test_breaking_into_handled_exceptions(
@ -144,11 +153,12 @@ class PTVSDAttachExceptionLifecycleTests(ExceptionTests):
attachtype='import',
cwd=cwd,
starttype='attach',
argv=argv))
argv=argv,
))
@unittest.skip('Needs fixing in #609')
def test_not_breaking_into_handled_exceptions(self):
filename = os.path.join(TEST_FILES_DIR, 'handled_exceptions_attach.py')
filename = TEST_FILES.resolve('handled_exceptions_attach.py')
cwd = os.path.dirname(filename)
argv = ['localhost', str(PORT)]
self.run_test_not_breaking_into_handled_exceptions(
@ -157,14 +167,16 @@ class PTVSDAttachExceptionLifecycleTests(ExceptionTests):
attachtype='import',
cwd=cwd,
starttype='attach',
argv=argv))
argv=argv,
))
class ServerAttachModuleExceptionLifecycleTests(ExceptionTests): # noqa
class ServerAttachModuleExceptionLifecycleTests(ExceptionTests):
def test_breaking_into_handled_exceptions(self):
module_name = 'mymod_launch1'
env = {"PYTHONPATH": TEST_FILES_DIR}
cwd = TEST_FILES_DIR
env = TEST_FILES.env_with_py_path()
cwd = TEST_FILES.root
argv = ['localhost', str(PORT)]
self.run_test_breaking_into_handled_exceptions(
DebugInfo(
@ -172,12 +184,13 @@ class ServerAttachModuleExceptionLifecycleTests(ExceptionTests): # noqa
env=env,
cwd=cwd,
argv=argv,
starttype='attach'))
starttype='attach',
))
def test_not_breaking_into_handled_exceptions(self):
module_name = 'mymod_launch1'
env = {"PYTHONPATH": TEST_FILES_DIR}
cwd = TEST_FILES_DIR
env = TEST_FILES.env_with_py_path()
cwd = TEST_FILES.root
argv = ['localhost', str(PORT)]
self.run_test_not_breaking_into_handled_exceptions(
DebugInfo(
@ -185,15 +198,17 @@ class ServerAttachModuleExceptionLifecycleTests(ExceptionTests): # noqa
env=env,
cwd=cwd,
argv=argv,
starttype='attach'))
starttype='attach',
))
@unittest.skip('Needs fixing')
class PTVSDAttachModuleExceptionLifecycleTests(ExceptionTests): # noqa
class PTVSDAttachModuleExceptionLifecycleTests(ExceptionTests):
def test_breaking_into_handled_exceptions(self):
module_name = 'mymod_attach1'
env = {"PYTHONPATH": TEST_FILES_DIR}
cwd = TEST_FILES_DIR
env = TEST_FILES.env_with_py_path()
cwd = TEST_FILES.root
argv = ['localhost', str(PORT)]
self.run_test_breaking_into_handled_exceptions(
DebugInfo(
@ -202,12 +217,13 @@ class PTVSDAttachModuleExceptionLifecycleTests(ExceptionTests): # noqa
cwd=cwd,
argv=argv,
attachtype='import',
starttype='attach'))
starttype='attach',
))
def test_not_breaking_into_handled_exceptions(self):
module_name = 'mymod_attach1'
env = {"PYTHONPATH": TEST_FILES_DIR}
cwd = TEST_FILES_DIR
env = TEST_FILES.env_with_py_path()
cwd = TEST_FILES.root
argv = ['localhost', str(PORT)]
self.run_test_not_breaking_into_handled_exceptions(
DebugInfo(
@ -216,4 +232,5 @@ class PTVSDAttachModuleExceptionLifecycleTests(ExceptionTests): # noqa
cwd=cwd,
argv=argv,
attachtype='import',
starttype='attach'))
starttype='attach',
))

View file

@ -5,7 +5,7 @@ import unittest
import ptvsd
from ptvsd.socket import Address
from ptvsd.wrapper import INITIALIZE_RESPONSE # noqa
from ptvsd.wrapper import INITIALIZE_RESPONSE
from tests.helpers.debugadapter import DebugAdapter
from tests.helpers.debugclient import EasyDebugClient as DebugClient
from tests.helpers.lock import LockTimeoutError
@ -123,7 +123,7 @@ class LifecycleTests(LifecycleTestsBase):
done()
adapter.wait()
# Skipping the 'thread exited' and 'terminated' messages which
# Skipping the "thread exited" and "terminated" messages which
# may appear randomly in the received list.
received = list(_strip_newline_output_events(session.received))
self.assert_received(received[:7], [
@ -376,16 +376,18 @@ class LifecycleTests(LifecycleTestsBase):
_, _, req_threads1,
) = lifecycle_handshake(session1, 'attach',
threads=True)
tid1 = result['msg'].body['threadId']
event = result['msg']
tid1 = event.body['threadId']
stopped_event = session1.get_awaiter_for_event('stopped')
req_bps = session1.send_request('setBreakpoints', **{
'source': {'path': filename},
'breakpoints': [
req_bps = session1.send_request(
'setBreakpoints',
source={'path': filename},
breakpoints=[
{'line': bp1},
{'line': bp2},
],
})
)
req_bps.wait()
done1()
@ -417,7 +419,8 @@ class LifecycleTests(LifecycleTestsBase):
_, _, req_threads3,
) = lifecycle_handshake(session2, 'attach',
threads=True)
tid2 = result['msg'].body['threadId']
event = result['msg']
tid2 = event.body['threadId']
done2()
adapter.wait()
@ -542,7 +545,7 @@ class LifecycleTests(LifecycleTestsBase):
@unittest.skip('not implemented')
def test_attach_exit_during_session(self):
# TODO: Ensure we see the "terminated" and "exited" events.
# TODO: Ensure we see the 'terminated' and 'exited' events.
raise NotImplementedError
@unittest.skip('re-attach needs fixing')
@ -616,17 +619,17 @@ class LifecycleTests(LifecycleTestsBase):
}]
options = {
"pathMappings": [
'pathMappings': [
{
"localRoot": os.path.dirname(filename),
"remoteRoot": os.path.dirname(filename)
'localRoot': os.path.dirname(filename),
'remoteRoot': os.path.dirname(filename)
},
# This specific mapping is for Mac.
# For some reason temp paths on Mac get prefixed with
# `private` when returned from ptvsd.
{
"localRoot": os.path.dirname(filename),
"remoteRoot": '/private' + os.path.dirname(filename)
'localRoot': os.path.dirname(filename),
'remoteRoot': '/private' + os.path.dirname(filename)
}
]
}
@ -650,14 +653,15 @@ class LifecycleTests(LifecycleTestsBase):
threads=True)
# Grab the initial output.
out1 = next(adapter.output) # "waiting for attach"
out1 = next(adapter.output) # 'waiting for attach'
line = adapter.output.readline()
while line:
out1 += line
line = adapter.output.readline()
done1()
req_bps, = reqs_bps # There should only be one.
tid = result['msg'].body['threadId']
event = result['msg']
tid = event.body['threadId']
req_threads2 = session.send_request('threads')
req_stacktrace1 = session.send_request(
'stackTrace',

View file

@ -1,36 +1,38 @@
import os
import os.path
import socket
from . import (_strip_newline_output_events, lifecycle_handshake,
LifecycleTestsBase, DebugInfo, ROOT, PORT)
from tests.helpers.resource import TestResources
from tests.helpers.socket import resolve_hostname
from . import (
_strip_newline_output_events, lifecycle_handshake,
LifecycleTestsBase, DebugInfo, PORT,
)
TEST_FILES_DIR = os.path.join(ROOT, 'tests', 'resources', 'system_tests',
'test_basic')
TEST_FILES = TestResources.from_module('tests.system_tests.test_basic')
WITH_OUTPUT = TEST_FILES.sub('test_output')
class RemoteTests(LifecycleTestsBase):
def run_test_attach(self, debug_info):
options = {"debugOptions": ["RedirectOutput"]}
options = {'debugOptions': ['RedirectOutput']}
with self.start_debugging(debug_info) as dbg:
(_, _, _, _, _, _) = lifecycle_handshake(
dbg.session, debug_info.starttype, options=options)
lifecycle_handshake(dbg.session, debug_info.starttype,
options=options)
received = list(_strip_newline_output_events(dbg.session.received))
self.assert_contains(
received,
[
self.new_event("output", category="stdout", output="yes"),
self.new_event("output", category="stderr", output="no"),
],
)
self.assert_contains(received, [
self.new_event('output', category='stdout', output='yes'),
self.new_event('output', category='stderr', output='no'),
])
class AttachFileTests(RemoteTests):
def test_attach_localhost(self):
filename = os.path.join(TEST_FILES_DIR, 'test_output',
'attach_output.py')
filename = WITH_OUTPUT.resolve('attach_output.py')
cwd = os.path.dirname(filename)
argv = ['localhost', str(PORT)]
self.run_test_attach(
@ -39,11 +41,12 @@ class AttachFileTests(RemoteTests):
attachtype='import',
cwd=cwd,
starttype='attach',
argv=argv))
argv=argv,
),
)
def test_attach_127001(self):
filename = os.path.join(TEST_FILES_DIR, 'test_output',
'attach_output.py')
filename = WITH_OUTPUT.resolve('attach_output.py')
cwd = os.path.dirname(filename)
argv = ['127.0.0.1', str(PORT)]
self.run_test_attach(
@ -52,11 +55,12 @@ class AttachFileTests(RemoteTests):
attachtype='import',
cwd=cwd,
starttype='attach',
argv=argv))
argv=argv,
),
)
def test_attach_0000(self):
filename = os.path.join(TEST_FILES_DIR, 'test_output',
'attach_output.py')
filename = WITH_OUTPUT.resolve('attach_output.py')
cwd = os.path.dirname(filename)
argv = ['0.0.0.0', str(PORT)]
self.run_test_attach(
@ -65,14 +69,16 @@ class AttachFileTests(RemoteTests):
attachtype='import',
cwd=cwd,
starttype='attach',
argv=argv))
argv=argv,
),
)
def test_attach_byip(self):
filename = os.path.join(TEST_FILES_DIR, 'test_output',
'attach_output.py')
filename = WITH_OUTPUT.resolve('attach_output.py')
cwd = os.path.dirname(filename)
argv = ['0.0.0.0', str(PORT)]
ip = socket.gethostbyname(socket.gethostname())
ip = resolve_hostname()
self.run_test_attach(
DebugInfo(
filename=filename,
@ -80,4 +86,6 @@ class AttachFileTests(RemoteTests):
host=ip,
cwd=cwd,
starttype='attach',
argv=argv))
argv=argv,
),
)

View file

@ -2,26 +2,27 @@ import os
import os.path
import unittest
from ptvsd.wrapper import INITIALIZE_RESPONSE # noqa
from tests.helpers.resource import TestResources
from . import (
_strip_newline_output_events, lifecycle_handshake,
LifecycleTestsBase, DebugInfo,
)
from . import (_strip_newline_output_events, lifecycle_handshake,
LifecycleTestsBase, DebugInfo, ROOT)
TEST_FILES_DIR = os.path.join(ROOT, 'tests', 'resources', 'system_tests',
'test_forever')
TEST_FILES = TestResources.from_module('tests.system_tests.test_forever')
@unittest.skip('Needs fixing in #530')
class RestartVSCTests(LifecycleTestsBase):
def test_disconnect_without_restart(self):
filename = os.path.join(TEST_FILES_DIR, 'launch_forever.py')
filename = TEST_FILES.resolve('launch_forever.py')
cwd = os.path.dirname(filename)
debug_info = DebugInfo(filename=filename, cwd=cwd)
with self.start_debugging(debug_info) as dbg:
(_, req_launch, _, _, _, _) = lifecycle_handshake(
dbg.session, debug_info.starttype)
(_, req_launch, _, _, _, _
) = lifecycle_handshake(dbg.session, debug_info.starttype)
req_launch.wait()
dbg.session.send_request('disconnect', restart=False)
@ -31,14 +32,13 @@ class RestartVSCTests(LifecycleTestsBase):
self.assertEqual(len(evts), 1)
def test_disconnect_with_restart(self):
filename = os.path.join(TEST_FILES_DIR, 'launch_forever.py')
filename = TEST_FILES.resolve('launch_forever.py')
cwd = os.path.dirname(filename)
debug_info = DebugInfo(filename=filename, cwd=cwd)
with self.start_debugging(debug_info) as dbg:
(_, req_launch, _, _, _, _) = lifecycle_handshake(
dbg.session, debug_info.starttype)
(_, req_launch, _, _, _, _
) = lifecycle_handshake(dbg.session, debug_info.starttype)
req_launch.wait()
dbg.session.send_request('disconnect', restart=True)

View file

@ -124,7 +124,7 @@ class DownloadCommandTests(unittest.TestCase):
metadata = '\n'.join(line
for line in metadata.split('\n')
if not line.startswith('downloaded: '))
self.assertEqual(data, "<a schema>")
self.assertEqual(data, '<a schema>')
self.assertEqual(metadata, dedent("""\
upstream: http://localhost:8000/schema.json
revision: <unknown>

View file

@ -1,230 +1,250 @@
import os
import os.path
from ptvsd.wrapper import INITIALIZE_RESPONSE # noqa
from tests.helpers.debugsession import Awaitable
from tests.helpers.resource import TestResources
from . import (
lifecycle_handshake, LifecycleTestsBase, DebugInfo,
)
from . import (lifecycle_handshake, LifecycleTestsBase, DebugInfo, ROOT)
TEST_FILES_DIR = os.path.join(ROOT, 'tests', 'resources', 'system_tests',
'test_variables')
TEST_FILES = TestResources.from_module(__name__)
class VariableTests(LifecycleTestsBase):
def test_variables(self):
filename = os.path.join(TEST_FILES_DIR, 'simple.py')
filename = TEST_FILES.resolve('simple.py')
cwd = os.path.dirname(filename)
self.run_test_variables(DebugInfo(filename=filename, cwd=cwd))
def run_test_variables(self, debug_info):
bp_line = 3
breakpoints = [{
"source": {
"path": debug_info.filename
'source': {
'path': debug_info.filename
},
"breakpoints": [{
"line": bp_line
'breakpoints': [{
'line': bp_line
}]
}]
with self.start_debugging(debug_info) as dbg:
session = dbg.session
with session.wait_for_event("stopped") as result:
(
_,
req_launch_attach,
_,
reqs_bps,
_,
_,
) = lifecycle_handshake(
session, debug_info.starttype, breakpoints=breakpoints)
with session.wait_for_event('stopped') as result:
(_, req_launch_attach, _, _, _, _,
) = lifecycle_handshake(session, debug_info.starttype,
breakpoints=breakpoints)
req_launch_attach.wait()
event = result['msg']
tid = event.body['threadId']
tid = result["msg"].body["threadId"]
req_stacktrace = session.send_request(
'stackTrace',
threadId=tid,
)
req_stacktrace.wait()
frames = req_stacktrace.resp.body['stackFrames']
frame_id = frames[0]['id']
req_scopes = session.send_request(
'scopes',
frameId=frame_id,
)
req_scopes.wait()
scopes = req_scopes.resp.body['scopes']
variables_reference = scopes[0]['variablesReference']
req_variables = session.send_request(
'variables',
variablesReference=variables_reference,
)
req_variables.wait()
variables = req_variables.resp.body['variables']
stacktrace = session.send_request("stackTrace", threadId=tid)
stacktrace.wait()
frame_id = stacktrace.resp.body["stackFrames"][0]["id"]
scopes = session.send_request('scopes', frameId=frame_id)
scopes.wait()
variables_reference = scopes.resp.body["scopes"][0][
"variablesReference"] # noqa
variables = session.send_request(
'variables', variablesReference=variables_reference) # noqa
variables.wait()
var_b = list(b for b in variables.resp.body["variables"]
if b["name"] == "b") # noqa
var_b = list(b for b in variables if b['name'] == 'b')
var_b = var_b[0] if len(var_b) == 1 else None
if var_b is None:
var_b_variables = None
else:
var_b_ref = var_b["variablesReference"]
var_b_variables = session.send_request(
'variables', variablesReference=var_b_ref) # noqa
var_b_variables.wait()
var_b_ref = var_b['variablesReference']
req_variables = session.send_request(
'variables',
variablesReference=var_b_ref,
)
req_variables.wait()
var_b_variables = req_variables.resp.body['variables']
var_a_evaluate = session.send_request(
'evaluate', expression="a", frameId=frame_id)
var_b_one_evaluate = session.send_request(
req_evaluate1 = session.send_request(
'evaluate',
expression="b['one']", # noqa
frameId=frame_id)
expression='a',
frameId=frame_id,
)
req_evaluate2 = session.send_request(
'evaluate',
expression="b['one']",
frameId=frame_id,
)
Awaitable.wait_all(req_evaluate1, req_evaluate2)
var_a_evaluate = req_evaluate1.resp.body
var_b_one_evaluate = req_evaluate2.resp.body
Awaitable.wait_all(var_a_evaluate, var_b_one_evaluate)
session.send_request("continue", threadId=tid)
session.send_request('continue', threadId=tid)
# Variables for a, b, __file__, __main__
self.assertGreaterEqual(len(variables.resp.body["variables"]), 3)
expected_variables = [{
"name": "a",
"type": "int",
"value": "1",
"evaluateName": "a"
self.assertGreaterEqual(len(variables), 3)
self.assert_is_subset(variables, [{
'name': 'a',
'type': 'int',
'value': '1',
'evaluateName': 'a'
}, {
"name": "b",
"type": "dict",
"value": "{'one': 1, 'two': 2}",
"evaluateName": "b"
'name': 'b',
'type': 'dict',
'value': "{'one': 1, 'two': 2}",
'evaluateName': 'b'
}, {
"name": "__builtins__",
"type": "dict",
"evaluateName": "__builtins__"
'name': '__builtins__',
'type': 'dict',
'evaluateName': '__builtins__'
}, {
"name": "__doc__",
"type": "NoneType",
"value": "None",
"evaluateName": "__doc__"
'name': '__doc__',
'type': 'NoneType',
'value': 'None',
'evaluateName': '__doc__'
}, {
"name": "__file__",
"type": "str",
"presentationHint": {
"attributes": ["rawString"]
'name': '__file__',
'type': 'str',
'presentationHint': {
'attributes': ['rawString']
},
"evaluateName": "__file__"
'evaluateName': '__file__'
}, {
"name": "__loader__",
"type": "SourceFileLoader",
"evaluateName": "__loader__"
'name': '__loader__',
'type': 'SourceFileLoader',
'evaluateName': '__loader__'
}, {
"name": "__name__",
"type": "str",
"value": "'__main__'",
"presentationHint": {
"attributes": ["rawString"]
'name': '__name__',
'type': 'str',
'value': "'__main__'",
'presentationHint': {
'attributes': ['rawString']
},
"evaluateName": "__name__"
'evaluateName': '__name__'
}, {
"name": "__package__",
"type": "NoneType",
"value": "None",
"evaluateName": "__package__"
'name': '__package__',
'type': 'NoneType',
'value': 'None',
'evaluateName': '__package__'
}, {
"name": "__spec__",
"type": "NoneType",
"value": "None",
"evaluateName": "__spec__"
}]
self.assert_is_subset(variables.resp.body["variables"],
expected_variables) # noqa
expected_var_a_eval = {"type": "int", "result": "1"}
var_a_evaluate.resp.body == expected_var_a_eval
'name': '__spec__',
'type': 'NoneType',
'value': 'None',
'evaluateName': '__spec__'
}])
self.assertEqual(var_a_evaluate, {
'type': 'int',
'result': '1',
})
assert var_b_variables is not None
expected_var_b = {
"variables": [{
"type": "int",
"value": "1",
"evaluateName": "b['one']"
}, {
"type": "int",
"value": "2",
"evaluateName": "b['two']"
}, {
"name": "__len__",
"type": "int",
"value": "2",
"evaluateName": "b.__len__"
}]
}
self.assert_is_subset(var_b_variables.resp.body,
expected_var_b) # noqa
self.assert_is_subset(var_b_variables, [{
'type': 'int',
'value': '1',
'evaluateName': "b['one']"
}, {
'type': 'int',
'value': '2',
'evaluateName': "b['two']"
}, {
'name': '__len__',
'type': 'int',
'value': '2',
'evaluateName': 'b.__len__'
}])
expected_var_b_eval = {"type": " int", "result": "1"}
var_b_one_evaluate.resp.body == expected_var_b_eval
self.assertEqual(var_b_one_evaluate, {
'type': 'int',
'result': '1',
})
def test_variable_sorting(self):
filename = os.path.join(TEST_FILES_DIR, 'for_sorting.py')
filename = TEST_FILES.resolve('for_sorting.py')
cwd = os.path.dirname(filename)
self.run_test_variable_sorting(DebugInfo(filename=filename, cwd=cwd))
def run_test_variable_sorting(self, debug_info):
bp_line = 16
breakpoints = [{
"source": {
"path": debug_info.filename
'source': {
'path': debug_info.filename
},
"breakpoints": [{
"line": bp_line
'breakpoints': [{
'line': bp_line
}]
}]
with self.start_debugging(debug_info) as dbg:
session = dbg.session
with session.wait_for_event("stopped") as result:
(
_,
req_launch_attach,
_,
reqs_bps,
_,
_,
) = lifecycle_handshake(
session, debug_info.starttype, breakpoints=breakpoints)
with session.wait_for_event('stopped') as result:
(_, req_launch_attach, _, _, _, _,
) = lifecycle_handshake(session, debug_info.starttype,
breakpoints=breakpoints)
req_launch_attach.wait()
tid = result["msg"].body["threadId"]
event = result['msg']
tid = event.body['threadId']
stacktrace = session.send_request("stackTrace", threadId=tid)
stacktrace.wait()
frame_id = stacktrace.resp.body["stackFrames"][0]["id"]
scopes = session.send_request('scopes', frameId=frame_id)
scopes.wait()
variables_reference = scopes.resp.body["scopes"][0][
"variablesReference"] # noqa
variables = session.send_request(
'variables', variablesReference=variables_reference) # noqa
variables.wait()
req_stacktrace = session.send_request(
'stackTrace',
threadId=tid,
)
req_stacktrace.wait()
frames = req_stacktrace.resp.body['stackFrames']
frame_id = frames[0]['id']
req_scopes = session.send_request(
'scopes',
frameId=frame_id,
)
req_scopes.wait()
scopes = req_scopes.resp.body['scopes']
variables_reference = scopes[0]['variablesReference']
req_variables = session.send_request(
'variables',
variablesReference=variables_reference,
)
req_variables.wait()
variables = req_variables.resp.body['variables']
try:
b_dict_var = list(v for v in variables.resp.body["variables"]
if v["name"] == 'b_test')[0]
b_dict_var_ref = b_dict_var["variablesReference"]
b_dict_var_items = session.send_request(
'variables', variablesReference=b_dict_var_ref) # noqa
b_dict_var_items.wait()
except IndexError:
b_dict_vars = list(v for v in variables if v['name'] == 'b_test')
if not b_dict_vars:
b_dict_var_items = None
else:
b_dict_var, = b_dict_vars
b_dict_var_ref = b_dict_var['variablesReference']
req_variables = session.send_request(
'variables',
variablesReference=b_dict_var_ref,
)
req_variables.wait()
b_dict_var_items = req_variables.resp.body['variables']
try:
c_dict_var = list(v for v in variables.resp.body["variables"]
if v["name"] == 'c_test')[0]
c_dict_var_ref = c_dict_var["variablesReference"]
c_dict_var_items = session.send_request(
'variables', variablesReference=c_dict_var_ref) # noqa
c_dict_var_items.wait()
except IndexError:
c_dict_var_items = None
#c_dict_vars = list(v for v in variables if v['name'] == 'c_test')
#if not c_dict_vars:
# c_dict_var_items = None
#else:
# c_dict_var, = c_dict_vars
# c_dict_var_ref = c_dict_var['variablesReference']
# req_variables = session.send_request(
# 'variables',
# variablesReference=c_dict_var_ref,
# )
# req_variables.wait()
# c_dict_var_items = req_variables.resp.body['variables']
session.send_request("continue", threadId=tid)
session.send_request('continue', threadId=tid)
variables_to_check = list(v["name"]
for v in variables.resp.body["variables"]
if v["name"].find('_test') > 0)
variables_to_check = list(v['name']
for v in variables
if v['name'].find('_test') > 0)
expected_var_order = [
'a_test', 'b_test', 'c_test', '_a_test', '_b_test', '_c_test',
'__a_test', '__b_test', '__c_test', '__a_test__', '__b_test__',
@ -233,16 +253,15 @@ class VariableTests(LifecycleTestsBase):
self.assertEqual(expected_var_order, variables_to_check)
# Dict keys are sorted normally, i.e., the '_' rules don't apply
# TODO: v["name"][1:5] is needed due to bug #45
b_dict_var_keys = list(v["name"][1:5]
for v in b_dict_var_items.resp.body["variables"]
if not v["name"].startswith('__')) # noqa
# TODO: v['name'][1:5] is needed due to bug #45
b_dict_var_keys = list(v['name'][1:5]
for v in b_dict_var_items
if not v['name'].startswith('__'))
expected_b_dict_var_keys_order = ['abcd', 'eggs', 'spam']
self.assertEqual(b_dict_var_keys, expected_b_dict_var_keys_order)
# TODO: Numeric dict keys have following issues
# bug: #45 and #213
# c_dict_var_keys = list(v["name"]
# for v in c_dict_var_items.resp.body["variables"]) # noqa
# c_dict_var_keys = list(v['name'] for v in c_dict_var_items)
# expected_c_dict_var_keys_order = ['1', '2', '10', '__len__']
# self.assertEqual(c_dict_var_keys, expected_c_dict_var_keys_order)