From 10af45bcccf46fc141ed66d375659c3c4a905bc3 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 9 Jul 2018 17:14:57 -0600 Subject: [PATCH] A bunch of cleanup/lint fixes for the system tests. (#625) The system tests have a number of minor formatting and structural issues that would be better to clean up sooner rather than later. This PR addresses nearly all of them. --- tests/__init__.py | 3 +- tests/helpers/__init__.py | 32 -- tests/helpers/debugadapter.py | 7 +- tests/helpers/debugclient.py | 39 +- tests/helpers/debugsession.py | 13 +- tests/helpers/proc.py | 3 +- tests/helpers/resource.py | 19 + tests/helpers/socket.py | 15 + tests/helpers/workspace.py | 52 ++- tests/system_tests/__init__.py | 14 +- tests/system_tests/test_basic.py | 199 ++++---- tests/system_tests/test_breakpoints.py | 619 +++++++++++++------------ tests/system_tests/test_exceptions.py | 183 ++++---- tests/system_tests/test_main.py | 36 +- tests/system_tests/test_remote.py | 64 +-- tests/system_tests/test_restart_vsc.py | 26 +- tests/system_tests/test_schema.py | 2 +- tests/system_tests/test_variables.py | 347 +++++++------- 18 files changed, 922 insertions(+), 751 deletions(-) create mode 100644 tests/helpers/resource.py diff --git a/tests/__init__.py b/tests/__init__.py index 4322993f..b463bb81 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -11,7 +11,8 @@ import ptvsd # noqa from ptvsd._vendored import list_all as vendored -TEST_ROOT = os.path.dirname(__file__) # noqa +TEST_ROOT = os.path.abspath(os.path.dirname(__file__)) # noqa +RESOURCES_ROOT = os.path.join(TEST_ROOT, 'resources') # noqa PROJECT_ROOT = os.path.dirname(TEST_ROOT) # noqa VENDORED_ROOTS = vendored(resolve=True) # noqa diff --git a/tests/helpers/__init__.py b/tests/helpers/__init__.py index dc0843e2..530435f8 100644 --- a/tests/helpers/__init__.py +++ b/tests/helpers/__init__.py @@ -1,35 +1,3 @@ def noop(*args, **kwargs): """Do nothing.""" - - -class Closeable(object): - - def __init__(self): - self._closed = False - - def __del__(self): - if not self._closed: - self.close() - - def __enter__(self): - return self - - def __exit__(self, *args): - self.close() - - @property - def closed(self): - return self._closed - - def close(self): - if self._closed: - return - self._closed = True - - self._close() - - # implemented by subclasses - - def _close(self): - raise NotImplementedError diff --git a/tests/helpers/debugadapter.py b/tests/helpers/debugadapter.py index 551a3b61..900b2e00 100644 --- a/tests/helpers/debugadapter.py +++ b/tests/helpers/debugadapter.py @@ -4,7 +4,7 @@ import socket import time from ptvsd.socket import Address -from . import Closeable +from ptvsd._util import Closeable, ClosedError from .proc import Proc from .. import PROJECT_ROOT @@ -263,7 +263,10 @@ class DebugAdapter(Closeable): def _close(self): if self._proc is not None: - self._proc.close() + try: + self._proc.close() + except ClosedError: + pass if self.VERBOSE: lines = self.output.decode('utf-8').splitlines() print(' + ' + '\n + '.join(lines)) diff --git a/tests/helpers/debugclient.py b/tests/helpers/debugclient.py index b976a2ce..fe994575 100644 --- a/tests/helpers/debugclient.py +++ b/tests/helpers/debugclient.py @@ -3,8 +3,7 @@ from __future__ import absolute_import import warnings from ptvsd.socket import Address -from ptvsd._util import new_hidden_thread -from . import Closeable +from ptvsd._util import new_hidden_thread, Closeable, ClosedError from .debugadapter import DebugAdapter, wait_for_socket_server from .debugsession import DebugSession @@ -13,11 +12,15 @@ from .debugsession import DebugSession class _LifecycleClient(Closeable): + + SESSION = DebugSession + def __init__(self, addr=None, port=8888, breakpoints=None, - connecttimeout=1.0): + connecttimeout=1.0, + ): super(_LifecycleClient, self).__init__() self._addr = Address.from_raw(addr, defaultport=port) self._connecttimeout = connecttimeout @@ -51,7 +54,11 @@ class _LifecycleClient(Closeable): if self._session is not None: self._detach() - self._adapter.close() + + try: + self._adapter.close() + except ClosedError: + pass self._adapter = None def attach_pid(self, pid, **kwargs): @@ -98,9 +105,15 @@ class _LifecycleClient(Closeable): def _close(self): if self._session is not None: - self._session.close() + try: + self._session.close() + except ClosedError: + pass if self._adapter is not None: - self._adapter.close() + try: + self._adapter.close() + except ClosedError: + pass def _launch(self, argv, @@ -131,11 +144,17 @@ class _LifecycleClient(Closeable): if addr is None: addr = self._addr assert addr.host == 'localhost' - self._session = DebugSession.create_client(addr, **kwargs) + self._session = self.SESSION.create_client(addr, **kwargs) def _detach(self): - self._session.close() + session = self._session + if session is None: + return self._session = None + try: + session.close() + except ClosedError: + pass class DebugClient(_LifecycleClient): @@ -146,6 +165,7 @@ class DebugClient(_LifecycleClient): class EasyDebugClient(DebugClient): + def start_detached(self, argv): """Start an adapter in a background process.""" if self.closed: @@ -172,8 +192,7 @@ class EasyDebugClient(DebugClient): addr = ('localhost', self._addr.port) def run(): - self._session = DebugSession.create_server(addr, **kwargs) - + self._session = self.SESSION.create_server(addr, **kwargs) t = new_hidden_thread( target=run, name='test.client', diff --git a/tests/helpers/debugsession.py b/tests/helpers/debugsession.py index 680400db..2ead92db 100644 --- a/tests/helpers/debugsession.py +++ b/tests/helpers/debugsession.py @@ -8,8 +8,7 @@ import time import threading import warnings -from ptvsd._util import new_hidden_thread -from . import Closeable +from ptvsd._util import new_hidden_thread, Closeable, ClosedError from .message import ( raw_read_all as read_messages, raw_write_one as write_message @@ -266,7 +265,10 @@ class DebugSession(Closeable): def _close(self): if self._owned: - self._conn.close() + try: + self._conn.close() + except ClosedError: + pass if self._listenerthread != threading.current_thread(): self._listenerthread.join(timeout=1.0) if self._listenerthread.is_alive(): @@ -280,7 +282,10 @@ class DebugSession(Closeable): print(' ->', msg) self._receive_message(msg) except EOFError: - self.close() + try: + self.close() + except ClosedError: + pass def _receive_message(self, msg): for i, handler in enumerate(list(self._handlers)): diff --git a/tests/helpers/proc.py b/tests/helpers/proc.py index b135a427..f88dc5b2 100644 --- a/tests/helpers/proc.py +++ b/tests/helpers/proc.py @@ -8,8 +8,7 @@ import subprocess import sys import time -from ptvsd._util import new_hidden_thread -from . import Closeable +from ptvsd._util import new_hidden_thread, Closeable _NOT_SET = object() diff --git a/tests/helpers/resource.py b/tests/helpers/resource.py new file mode 100644 index 00000000..fa4f781f --- /dev/null +++ b/tests/helpers/resource.py @@ -0,0 +1,19 @@ +import os.path + +from tests import RESOURCES_ROOT +from .workspace import ReadonlyFSTree + + +class TestResources(ReadonlyFSTree): + + @classmethod + def from_module(cls, modname): + parts = modname.split('.') + assert parts and parts[0] == 'tests' + root = os.path.join(RESOURCES_ROOT, *parts[1:]) + return cls(root) + + def __init__(self, root): + root = os.path.abspath(root) + assert root.startswith(RESOURCES_ROOT) + super(TestResources, self).__init__(root) diff --git a/tests/helpers/socket.py b/tests/helpers/socket.py index f534e897..6e98af46 100644 --- a/tests/helpers/socket.py +++ b/tests/helpers/socket.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from collections import namedtuple import contextlib +import socket import ptvsd.socket as _ptvsd @@ -9,6 +10,20 @@ import ptvsd.socket as _ptvsd convert_eof = _ptvsd.convert_eof +def resolve_hostname(): + hostname = socket.gethostname() + try: + return socket.gethostbyname(hostname) + except socket.gaierror: + addr = ('8.8.8.8', 80) + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + sock.connect(addr) + return sock.getsockname()[0] + finally: + sock.close() + + # TODO: Add timeouts to the functions. def create_server(address): diff --git a/tests/helpers/workspace.py b/tests/helpers/workspace.py index ea00480a..16c2bb3c 100644 --- a/tests/helpers/workspace.py +++ b/tests/helpers/workspace.py @@ -24,16 +24,14 @@ def _touch(filename): pass -class Workspace(object): - """File operations relative to some root directory ("workspace").""" - - PREFIX = 'workspace-' +class FSTreeBase(object): + """File operations relative to some root directory.""" @classmethod def _new_root(cls): - return tempfile.mkdtemp(prefix=cls.PREFIX) + raise NotImplementedError - def __init__(self, root=None): + def __init__(self, root): if root is not None: self._root = root self._owned = False @@ -47,6 +45,44 @@ class Workspace(object): self._owned = True return self._root + @property + def parent(self): + parent = os.path.dirname(self.root) + return FSTreeBase(parent) + + def resolve(self, *path): + """Return the absolute path (relative to the workspace).""" + return os.path.join(self.root, *path) + + def sub(self, *path): + cls = type(self) + root = self.resolve(*path) + return cls(root) + + def env_with_py_path(self, *path): + return {'PYTHONPATH': self.root} + + +class ReadonlyFSTree(FSTreeBase): + """File operations relative to some root directory.""" + + def __init__(self, root): + assert root + super(ReadonlyFSTree, self).__init__(root) + + +class Workspace(FSTreeBase): + """File operations relative to some root directory ("workspace").""" + + PREFIX = 'workspace-' + + @classmethod + def _new_root(cls): + return tempfile.mkdtemp(prefix=cls.PREFIX) + + def __init__(self, root=None): + super(Workspace, self).__init__(root) + def cleanup(self): """Release and destroy the workspace.""" if self._owned: @@ -54,10 +90,6 @@ class Workspace(object): self._owned = False self._root = None - def resolve(self, *path): - """Return the absolute path (relative to the workspace).""" - return os.path.join(self.root, *path) - def random(self, *dirpath, **kwargs): """Return a random filename resolved to the given directory.""" dirname = self.resolve(*dirpath) diff --git a/tests/system_tests/__init__.py b/tests/system_tests/__init__.py index 19b6f75c..18b0e30e 100644 --- a/tests/system_tests/__init__.py +++ b/tests/system_tests/__init__.py @@ -13,7 +13,7 @@ from tests.helpers.message import assert_is_subset from tests.helpers.script import find_line from tests.helpers.threading import get_locked_and_waiter from tests.helpers.workspace import Workspace, PathEntry -from tests.helpers.vsc import parse_message, VSCMessages, Response, Event # noqa +from tests.helpers.vsc import parse_message, VSCMessages, Response, Event ROOT = os.path.dirname(os.path.dirname(ptvsd.__file__)) @@ -232,8 +232,7 @@ class LifecycleTestsBase(TestsBase, unittest.TestCase): os.kill(pid, signal.SIGTERM) except Exception: pass - import time - time.sleep(1) # wait for socket connections to die out. # noqa + time.sleep(1) # wait for socket connections to die out. def _wrap_and_reraise(ex, session): messages = [] @@ -313,7 +312,7 @@ class LifecycleTestsBase(TestsBase, unittest.TestCase): _handle_exception(ex, adapter, session) else: if debug_info.filename is None: - argv = ["-m", debug_info.modulename] + debug_info.argv + argv = ['-m', debug_info.modulename] + debug_info.argv else: argv = [debug_info.filename] + debug_info.argv with DebugClient( @@ -355,9 +354,10 @@ class LifecycleTestsBase(TestsBase, unittest.TestCase): return True except Exception: return False - return list( - response for response in responses if isinstance(response, Event) - and response.event == event and is_subset(response.body)) # noqa + return list(resp + for resp in responses + if (isinstance(resp, Event) and resp.event == event and + is_subset(resp.body))) def find_responses(self, responses, command, condition=lambda x: True): return list( diff --git a/tests/system_tests/test_basic.py b/tests/system_tests/test_basic.py index accc12f2..c7b86624 100644 --- a/tests/system_tests/test_basic.py +++ b/tests/system_tests/test_basic.py @@ -2,51 +2,50 @@ import os import os.path import unittest -from ptvsd.wrapper import INITIALIZE_RESPONSE # noqa from tests.helpers.debugsession import Awaitable +from tests.helpers.resource import TestResources +from . import ( + _strip_newline_output_events, lifecycle_handshake, + LifecycleTestsBase, DebugInfo, PORT, +) -from . import (_strip_newline_output_events, lifecycle_handshake, - LifecycleTestsBase, DebugInfo, ROOT, PORT) -TEST_FILES_DIR = os.path.join(ROOT, 'tests', 'resources', 'system_tests', - 'test_basic') -TEST_TERMINATION_FILES_DIR = os.path.join(ROOT, 'tests', 'resources', - 'system_tests', 'test_terminate') +TEST_FILES = TestResources.from_module(__name__) +WITH_OUTPUT = TEST_FILES.sub('test_output') +WITHOUT_OUTPUT = TEST_FILES.sub('test_without_output') +WITH_ARGS = TEST_FILES.sub('test_args') +TEST_TERMINATION_FILES = TestResources.from_module( + 'tests.system_tests.test_terminate') class BasicTests(LifecycleTestsBase): + def run_test_output(self, debug_info): - options = {"debugOptions": ["RedirectOutput"]} + options = {'debugOptions': ['RedirectOutput']} with self.start_debugging(debug_info) as dbg: - (_, _, _, _, _, _) = lifecycle_handshake( - dbg.session, debug_info.starttype, options=options) + lifecycle_handshake(dbg.session, debug_info.starttype, + options=options) received = list(_strip_newline_output_events(dbg.session.received)) - self.assert_contains( - received, - [ - self.new_event("output", category="stdout", output="yes"), - self.new_event("output", category="stderr", output="no"), - ], - ) + self.assert_contains(received, [ + self.new_event('output', category='stdout', output='yes'), + self.new_event('output', category='stderr', output='no'), + ]) def run_test_arguments(self, debug_info, expected_args): - options = {"debugOptions": ["RedirectOutput"]} + options = {'debugOptions': ['RedirectOutput']} with self.start_debugging(debug_info) as dbg: - (_, _, _, _, _, _) = lifecycle_handshake( - dbg.session, debug_info.starttype, options=options) + lifecycle_handshake(dbg.session, debug_info.starttype, + options=options) received = list(_strip_newline_output_events(dbg.session.received)) - expected_output = "{}, {}".format(len(expected_args), expected_args) - self.assert_contains( - received, - [ - self.new_event( - "output", category="stdout", output=expected_output) - ], - ) + expected_output = '{}, {}'.format(len(expected_args), expected_args) + self.assert_contains(received, [ + self.new_event( + 'output', category='stdout', output=expected_output), + ]) def run_test_termination(self, debug_info): with self.start_debugging(debug_info) as dbg: @@ -55,24 +54,23 @@ class BasicTests(LifecycleTestsBase): exited = session.get_awaiter_for_event('exited') terminated = session.get_awaiter_for_event('terminated') - (_, req_launch, _, _, _, _) = lifecycle_handshake( - dbg.session, debug_info.starttype, threads=True) + (_, req_launch, _, _, _, _ + ) = lifecycle_handshake(dbg.session, debug_info.starttype, + threads=True) Awaitable.wait_all(req_launch, - session.get_awaiter_for_event('thread')) # noqa - disconnect = session.send_request("disconnect") + session.get_awaiter_for_event('thread')) + disconnect = session.send_request('disconnect') Awaitable.wait_all(exited, terminated, disconnect) def run_test_without_output(self, debug_info): - options = {"debugOptions": ["RedirectOutput"]} + options = {'debugOptions': ['RedirectOutput']} with self.start_debugging(debug_info) as dbg: - (_, _, _, _, _, _) = lifecycle_handshake( - dbg.session, - debug_info.starttype, - options=options, - threads=True) + lifecycle_handshake(dbg.session, debug_info.starttype, + options=options, + threads=True) received = list(_strip_newline_output_events(dbg.session.received)) @@ -82,91 +80,112 @@ class BasicTests(LifecycleTestsBase): class LaunchFileTests(BasicTests): + def test_with_output(self): - filename = os.path.join(TEST_FILES_DIR, 'test_output', 'output.py') + filename = WITH_OUTPUT.resolve('output.py') cwd = os.path.dirname(filename) self.run_test_output(DebugInfo(filename=filename, cwd=cwd)) def test_arguments(self): - filename = os.path.join(TEST_FILES_DIR, 'test_args', - 'launch_with_args.py') + filename = WITH_ARGS.resolve('launch_with_args.py') cwd = os.path.dirname(filename) argv = ['arg1', 'arg2'] self.run_test_arguments( DebugInfo(filename=filename, cwd=cwd, argv=argv), - [filename] + argv) + [filename] + argv, + ) @unittest.skip('Broken') def test_termination(self): - filename = os.path.join(TEST_TERMINATION_FILES_DIR, 'simple.py') + filename = TEST_TERMINATION_FILES.resolve('simple.py') cwd = os.path.dirname(filename) - self.run_test_termination(DebugInfo(filename=filename, cwd=cwd)) + self.run_test_termination( + DebugInfo(filename=filename, cwd=cwd), + ) def test_without_output(self): - filename = os.path.join(TEST_FILES_DIR, 'test_without_output', - 'output.py') + filename = WITHOUT_OUTPUT.resolve('output.py') cwd = os.path.dirname(filename) - self.run_test_without_output(DebugInfo(filename=filename, cwd=cwd)) + self.run_test_without_output( + DebugInfo(filename=filename, cwd=cwd), + ) class LaunchModuleTests(BasicTests): + def test_with_output(self): module_name = 'mymod_launch1' - cwd = os.path.join(TEST_FILES_DIR, 'test_output') - env = {"PYTHONPATH": cwd} + cwd = WITH_OUTPUT.root + env = WITH_OUTPUT.env_with_py_path() self.run_test_output( - DebugInfo(modulename=module_name, env=env, cwd=cwd)) + DebugInfo(modulename=module_name, env=env, cwd=cwd), + ) def test_without_output(self): module_name = 'mymod_launch1' - cwd = os.path.join(TEST_FILES_DIR, 'test_without_output') - env = {"PYTHONPATH": cwd} + cwd = WITHOUT_OUTPUT.root + env = WITHOUT_OUTPUT.env_with_py_path() self.run_test_without_output( - DebugInfo(modulename=module_name, env=env, cwd=cwd)) + DebugInfo(modulename=module_name, env=env, cwd=cwd), + ) @unittest.skip('Broken') def test_termination(self): module_name = 'mymod_launch1' - cwd = TEST_TERMINATION_FILES_DIR - env = {"PYTHONPATH": cwd} + cwd = TEST_TERMINATION_FILES.root + env = TEST_TERMINATION_FILES.env_with_py_path() self.run_test_output( - DebugInfo(modulename=module_name, env=env, cwd=cwd)) - self.run_test_termination(DebugInfo(modulename=module_name, cwd=cwd)) + DebugInfo(modulename=module_name, env=env, cwd=cwd), + ) + self.run_test_termination( + DebugInfo(modulename=module_name, cwd=cwd), + ) @unittest.skip('Broken') def test_arguments(self): module_name = 'mymod_launch1' - cwd = os.path.join(TEST_FILES_DIR, 'test_args') - env = {"PYTHONPATH": cwd} + cwd = WITH_ARGS.root + env = WITH_ARGS.env_with_py_path() argv = ['arg1', 'arg2'] self.run_test_arguments( DebugInfo(modulename=module_name, env=env, cwd=cwd, argv=argv), - ['-m'] + argv) + ['-m'] + argv, + ) class ServerAttachTests(BasicTests): + def test_with_output(self): - filename = os.path.join(TEST_FILES_DIR, 'test_output', 'output.py') + filename = WITH_OUTPUT.resolve('output.py') cwd = os.path.dirname(filename) argv = ['localhost', str(PORT)] self.run_test_output( DebugInfo( - filename=filename, cwd=cwd, starttype='attach', argv=argv)) + filename=filename, + cwd=cwd, + starttype='attach', + argv=argv, + ), + ) def test_without_output(self): - filename = os.path.join(TEST_FILES_DIR, 'test_without_output', - 'output.py') + filename = WITHOUT_OUTPUT.resolve('output.py') cwd = os.path.dirname(filename) argv = ['localhost', str(PORT)] self.run_test_without_output( DebugInfo( - filename=filename, cwd=cwd, starttype='attach', argv=argv)) + filename=filename, + cwd=cwd, + starttype='attach', + argv=argv, + ), + ) class PTVSDAttachTests(BasicTests): + def test_with_output(self): - filename = os.path.join(TEST_FILES_DIR, 'test_output', - 'attach_output.py') + filename = WITH_OUTPUT.resolve('attach_output.py') cwd = os.path.dirname(filename) argv = ['localhost', str(PORT)] self.run_test_output( @@ -175,11 +194,12 @@ class PTVSDAttachTests(BasicTests): attachtype='import', cwd=cwd, starttype='attach', - argv=argv)) + argv=argv, + ), + ) def test_without_output(self): - filename = os.path.join(TEST_FILES_DIR, 'test_without_output', - 'attach_output.py') + filename = WITHOUT_OUTPUT.resolve('attach_output.py') cwd = os.path.dirname(filename) argv = ['localhost', str(PORT)] self.run_test_without_output( @@ -188,14 +208,17 @@ class PTVSDAttachTests(BasicTests): attachtype='import', cwd=cwd, starttype='attach', - argv=argv)) + argv=argv, + ), + ) -class ServerAttachModuleTests(BasicTests): # noqa +class ServerAttachModuleTests(BasicTests): + def test_with_output(self): module_name = 'mymod_launch1' - cwd = os.path.join(TEST_FILES_DIR, 'test_output') - env = {"PYTHONPATH": cwd} + cwd = WITH_OUTPUT.root + env = WITH_OUTPUT.env_with_py_path() argv = ['localhost', str(PORT)] self.run_test_output( DebugInfo( @@ -203,12 +226,14 @@ class ServerAttachModuleTests(BasicTests): # noqa env=env, cwd=cwd, argv=argv, - starttype='attach')) + starttype='attach', + ), + ) def test_without_output(self): module_name = 'mymod_launch1' - cwd = os.path.join(TEST_FILES_DIR, 'test_without_output') - env = {"PYTHONPATH": cwd} + cwd = WITHOUT_OUTPUT.root + env = WITHOUT_OUTPUT.env_with_py_path() argv = ['localhost', str(PORT)] self.run_test_without_output( DebugInfo( @@ -216,7 +241,9 @@ class ServerAttachModuleTests(BasicTests): # noqa env=env, cwd=cwd, argv=argv, - starttype='attach')) + starttype='attach', + ), + ) class PTVSDAttachModuleTests(BasicTests): @@ -224,8 +251,8 @@ class PTVSDAttachModuleTests(BasicTests): def test_with_output(self): #self.enable_verbose() module_name = 'mymod_attach1' - cwd = os.path.join(TEST_FILES_DIR, 'test_output') - env = {"PYTHONPATH": cwd} + cwd = WITH_OUTPUT.root + env = WITH_OUTPUT.env_with_py_path() argv = ['localhost', str(PORT)] self.run_test_output( DebugInfo( @@ -234,12 +261,14 @@ class PTVSDAttachModuleTests(BasicTests): cwd=cwd, argv=argv, attachtype='import', - starttype='attach')) + starttype='attach', + ), + ) def test_without_output(self): module_name = 'mymod_attach1' - cwd = os.path.join(TEST_FILES_DIR, 'test_without_output') - env = {"PYTHONPATH": cwd} + cwd = WITHOUT_OUTPUT.root + env = WITHOUT_OUTPUT.env_with_py_path() argv = ['localhost', str(PORT)] self.run_test_without_output( DebugInfo( @@ -248,4 +277,6 @@ class PTVSDAttachModuleTests(BasicTests): cwd=cwd, argv=argv, attachtype='import', - starttype='attach')) + starttype='attach', + ), + ) diff --git a/tests/system_tests/test_breakpoints.py b/tests/system_tests/test_breakpoints.py index 221a6292..20e48906 100644 --- a/tests/system_tests/test_breakpoints.py +++ b/tests/system_tests/test_breakpoints.py @@ -2,201 +2,200 @@ import os import os.path import unittest -from ptvsd.wrapper import INITIALIZE_RESPONSE # noqa +from tests.helpers.resource import TestResources +from . import ( + _strip_newline_output_events, lifecycle_handshake, + LifecycleTestsBase, DebugInfo, PORT, +) -from . import (_strip_newline_output_events, lifecycle_handshake, - LifecycleTestsBase, DebugInfo, ROOT, PORT) -TEST_FILES_DIR = os.path.join(ROOT, 'tests', 'resources', 'system_tests', - 'test_breakpoints') +TEST_FILES = TestResources.from_module(__name__) class BreakpointTests(LifecycleTestsBase): + def run_test_with_break_points(self, debug_info, bp_filename, bp_line): - options = {"debugOptions": ["RedirectOutput"]} + options = {'debugOptions': ['RedirectOutput']} breakpoints = [{ - "source": { - "path": bp_filename + 'source': { + 'path': bp_filename }, - "breakpoints": [{ - "line": bp_line + 'breakpoints': [{ + 'line': bp_line }] }] with self.start_debugging(debug_info) as dbg: session = dbg.session - with session.wait_for_event("stopped") as result: - ( - _, - req_launch_attach, - _, - reqs_bps, - _, - _, - ) = lifecycle_handshake( - session, - debug_info.starttype, - options=options, - breakpoints=breakpoints) - + with session.wait_for_event('stopped') as result: + (_, req_launch_attach, _, _, _, _, + ) = lifecycle_handshake(session, debug_info.starttype, + options=options, + breakpoints=breakpoints) req_launch_attach.wait() + event = result['msg'] + tid = event.body['threadId'] - req_bps, = reqs_bps # There should only be one. - tid = result["msg"].body["threadId"] - stacktrace = session.send_request("stackTrace", threadId=tid) - stacktrace.wait() - session.send_request("continue", threadId=tid) + req_stacktrace = session.send_request( + 'stackTrace', + threadId=tid, + ) + req_stacktrace.wait() + stacktrace = req_stacktrace.resp.body + + session.send_request( + 'continue', + threadId=tid, + ) received = list(_strip_newline_output_events(session.received)) - self.assertGreaterEqual(stacktrace.resp.body["totalFrames"], 1) - self.assert_is_subset( - stacktrace.resp.body, - { - # We get Python and PTVSD frames as well. - # "totalFrames": 2, - "stackFrames": [{ - "id": 1, - "name": "", - "source": { - "sourceReference": 0 - }, - "line": bp_line, - "column": 1, - }], - }) + self.assertGreaterEqual(stacktrace['totalFrames'], 1) + self.assert_is_subset(stacktrace, { + # We get Python and PTVSD frames as well. + # 'totalFrames': 2, + 'stackFrames': [{ + 'id': 1, + 'name': '', + 'source': { + 'sourceReference': 0 + }, + 'line': bp_line, + 'column': 1, + }], + }) - self.assert_contains( - received, - [ - self.new_event( - "stopped", - reason="breakpoint", - threadId=tid, - text=None, - description=None, - ), - self.new_event("continued", threadId=tid), - self.new_event("output", category="stdout", output="yes"), - self.new_event("output", category="stderr", output="no"), - self.new_event("exited", exitCode=0), - self.new_event("terminated"), - ], - ) + self.assert_contains(received, [ + self.new_event( + 'stopped', + reason='breakpoint', + threadId=tid, + text=None, + description=None, + ), + self.new_event('continued', threadId=tid), + self.new_event('output', category='stdout', output='yes'), + self.new_event('output', category='stderr', output='no'), + self.new_event('exited', exitCode=0), + self.new_event('terminated'), + ]) def run_test_with_break_points_across_files( self, debug_info, first_file, second_file, second_file_line, expected_modules, expected_stacktrace): breakpoints = [{ - "source": { - "path": second_file + 'source': { + 'path': second_file }, - "breakpoints": [{ - "line": second_file_line + 'breakpoints': [{ + 'line': second_file_line }] }] with self.start_debugging(debug_info) as dbg: session = dbg.session - with session.wait_for_event("stopped") as result: - ( - _, - req_launch_attach, - _, - _, - _, - _, - ) = lifecycle_handshake( - session, debug_info.starttype, breakpoints=breakpoints) - + with session.wait_for_event('stopped') as result: + (_, req_launch_attach, _, _, _, _, + ) = lifecycle_handshake(session, debug_info.starttype, + breakpoints=breakpoints) req_launch_attach.wait() + event = result['msg'] + tid = event.body['threadId'] - tid = result["msg"].body["threadId"] - stacktrace = session.send_request("stackTrace", threadId=tid) - stacktrace.wait() - session.send_request("continue", threadId=tid) + req_stacktrace = session.send_request( + 'stackTrace', + threadId=tid, + ) + req_stacktrace.wait() + stacktrace = req_stacktrace.resp.body + + session.send_request('continue', threadId=tid) received = list(_strip_newline_output_events(session.received)) for mod in expected_modules: found_mod = self.find_events(received, 'module', mod) - self.assertEqual( - len(found_mod), 1, 'Modul not found {}'.format(mod)) + self.assertEqual(len(found_mod), + 1, + 'Modul not found {}'.format(mod)) - self.assert_is_subset(stacktrace.resp, expected_stacktrace) + self.assert_is_subset(stacktrace, expected_stacktrace) def run_test_conditional_break_points(self, debug_info): breakpoints = [{ - "source": { - "path": debug_info.filename + 'source': { + 'path': debug_info.filename }, - "breakpoints": [{ - "line": 4, - "condition": "i == 2" + 'breakpoints': [{ + 'line': 4, + 'condition': 'i == 2' }], - "lines": [4] + 'lines': [4] }] with self.start_debugging(debug_info) as dbg: session = dbg.session - with session.wait_for_event("stopped") as result: - ( - _, - _, - _, - _, - _, - _, - ) = lifecycle_handshake( - session, debug_info.starttype, breakpoints=breakpoints) + with session.wait_for_event('stopped') as result: + lifecycle_handshake(session, debug_info.starttype, + breakpoints=breakpoints) + event = result['msg'] + tid = event.body['threadId'] - tid = result["msg"].body["threadId"] - stacktrace = session.send_request("stackTrace", threadId=tid) - stacktrace.wait() + req_stacktrace = session.send_request( + 'stackTrace', + threadId=tid, + ) + req_stacktrace.wait() + frames = req_stacktrace.resp.body['stackFrames'] + frame_id = frames[0]['id'] + req_scopes = session.send_request( + 'scopes', + frameId=frame_id, + ) + req_scopes.wait() + scopes = req_scopes.resp.body['scopes'] + variables_reference = scopes[0]['variablesReference'] + req_variables = session.send_request( + 'variables', + variablesReference=variables_reference, + ) + req_variables.wait() + variables = req_variables.resp.body['variables'] - frame_id = stacktrace.resp.body["stackFrames"][0]["id"] - scopes = session.send_request('scopes', frameId=frame_id) - scopes.wait() - variables_reference = scopes.resp.body["scopes"][0][ - "variablesReference"] - variables = session.send_request( - 'variables', variablesReference=variables_reference) - variables.wait() - session.send_request("continue", threadId=tid) + session.send_request('continue', threadId=tid) - self.assert_is_subset(variables.resp.body["variables"], - [{ - "name": "a", - "type": "int", - "value": "1", - "evaluateName": "a" - }, { - "name": "b", - "type": "int", - "value": "2", - "evaluateName": "b" - }, { - "name": "c", - "type": "int", - "value": "1", - "evaluateName": "c" - }, { - "name": "i", - "type": "int", - "value": "2", - "evaluateName": "i" - }]) + self.assert_is_subset(variables, [{ + 'name': 'a', + 'type': 'int', + 'value': '1', + 'evaluateName': 'a' + }, { + 'name': 'b', + 'type': 'int', + 'value': '2', + 'evaluateName': 'b' + }, { + 'name': 'c', + 'type': 'int', + 'value': '1', + 'evaluateName': 'c' + }, { + 'name': 'i', + 'type': 'int', + 'value': '2', + 'evaluateName': 'i' + }]) def run_test_hit_conditional_break_points(self, debug_info, **kwargs): breakpoints = [{ - "source": { - "path": debug_info.filename + 'source': { + 'path': debug_info.filename }, - "breakpoints": [{ - "line": 4, - "hitCondition": kwargs['hit_condition'] + 'breakpoints': [{ + 'line': 4, + 'hitCondition': kwargs['hit_condition'] }], - "lines": [4] + 'lines': [4] }] i_values = [] @@ -206,76 +205,69 @@ class BreakpointTests(LifecycleTestsBase): count = 0 while count < hits: if count == 0: - with session.wait_for_event("stopped") as result: - ( - _, - _, - _, - _, - _, - _, - ) = lifecycle_handshake( - session, debug_info.starttype, - breakpoints=breakpoints) + with session.wait_for_event('stopped') as result: + lifecycle_handshake(session, debug_info.starttype, + breakpoints=breakpoints) + event = result['msg'] + tid = event.body['threadId'] - tid = result["msg"].body["threadId"] - stacktrace = session.send_request("stackTrace", threadId=tid) - stacktrace.wait() - - frame_id = stacktrace.resp.body["stackFrames"][0]["id"] - scopes = session.send_request('scopes', frameId=frame_id) - scopes.wait() - variables_reference = scopes.resp.body["scopes"][0][ - "variablesReference"] - variables = session.send_request( - 'variables', variablesReference=variables_reference) - variables.wait() + req_stacktrace = session.send_request( + 'stackTrace', + threadId=tid, + ) + req_stacktrace.wait() + frames = req_stacktrace.resp.body['stackFrames'] + frame_id = frames[0]['id'] + req_scopes = session.send_request( + 'scopes', + frameId=frame_id, + ) + req_scopes.wait() + scopes = req_scopes.resp.body['scopes'] + variables_reference = scopes[0]['variablesReference'] + req_variables = session.send_request( + 'variables', + variablesReference=variables_reference, + ) + req_variables.wait() + variables = req_variables.resp.body['variables'] i_value = list(int(v['value']) - for v in variables.resp.body["variables"] + for v in variables if v['name'] == 'i') i_values.append(i_value[0] if len(i_value) > 0 else None) count = count + 1 if count < hits: - with session.wait_for_event("stopped") as result: - session.send_request("continue", threadId=tid) + with session.wait_for_event('stopped') as result: + session.send_request('continue', threadId=tid) else: - session.send_request("continue", threadId=tid) + session.send_request('continue', threadId=tid) self.assertEqual(i_values, kwargs['expected']) def run_test_logpoints(self, debug_info): - options = {"debugOptions": ["RedirectOutput"]} + options = {'debugOptions': ['RedirectOutput']} breakpoints = [{ - "source": { - "path": debug_info.filename + 'source': { + 'path': debug_info.filename }, - "breakpoints": [{ - "line": 4, - "logMessage": "Sum of a + i = {a + i}" + 'breakpoints': [{ + 'line': 4, + 'logMessage': 'Sum of a + i = {a + i}' }], - "lines": [4] + 'lines': [4] }] with self.start_debugging(debug_info) as dbg: session = dbg.session - ( - _, - _, - _, - _, - _, - _, - ) = lifecycle_handshake( - session, - debug_info.starttype, - options=options, - breakpoints=breakpoints) + lifecycle_handshake(session, debug_info.starttype, + options=options, + breakpoints=breakpoints) received = list(_strip_newline_output_events(session.received)) expected_events = [ self.new_event( - "output", - category="stdout", - output="Sum of a + i = {}{}".format(i + 1, os.linesep)) + 'output', + category='stdout', + output='Sum of a + i = {}{}'.format(i + 1, os.linesep)) for i in range(5) ] @@ -283,215 +275,246 @@ class BreakpointTests(LifecycleTestsBase): class LaunchFileTests(BreakpointTests): + def test_with_break_points(self): - filename = os.path.join(TEST_FILES_DIR, 'output.py') + filename = TEST_FILES.resolve('output.py') cwd = os.path.dirname(filename) self.run_test_with_break_points( - DebugInfo(filename=filename, cwd=cwd), filename, bp_line=3) + DebugInfo(filename=filename, cwd=cwd), + filename, + bp_line=3, + ) def test_with_break_points_across_files(self): - first_file = os.path.join(TEST_FILES_DIR, 'foo.py') - second_file = os.path.join(TEST_FILES_DIR, 'bar.py') + first_file = TEST_FILES.resolve('foo.py') + second_file = TEST_FILES.resolve('bar.py') cwd = os.path.dirname(first_file) expected_modules = [{ - "reason": "new", - "module": { - "path": second_file, - "name": "bar" + 'reason': 'new', + 'module': { + 'path': second_file, + 'name': 'bar' } }, { - "reason": "new", - "module": { - "path": first_file, - "name": "__main__" + 'reason': 'new', + 'module': { + 'path': first_file, + 'name': '__main__' } }] expected_stacktrace = { - "stackFrames": [{ - "name": "do_bar", - "source": { - "path": second_file, - "sourceReference": 0 + 'stackFrames': [{ + 'name': 'do_bar', + 'source': { + 'path': second_file, + 'sourceReference': 0 }, - "line": 2, - "column": 1 + 'line': 2, + 'column': 1 }, { - "name": "do_foo", - "source": { - "path": first_file, - "sourceReference": 0 + 'name': 'do_foo', + 'source': { + 'path': first_file, + 'sourceReference': 0 }, - "line": 5, - "column": 1 + 'line': 5, + 'column': 1 }, { - "id": 3, - "name": "", - "source": { - "path": first_file, - "sourceReference": 0 + 'id': 3, + 'name': '', + 'source': { + 'path': first_file, + 'sourceReference': 0 }, - "line": 8, - "column": 1 + 'line': 8, + 'column': 1 }], } self.run_test_with_break_points_across_files( - DebugInfo(filename=first_file, cwd=cwd), first_file, second_file, - 2, expected_modules, expected_stacktrace) + DebugInfo(filename=first_file, cwd=cwd), + first_file, + second_file, + 2, + expected_modules, + expected_stacktrace, + ) def test_conditional_break_points(self): - filename = os.path.join(TEST_FILES_DIR, 'loopy.py') + filename = TEST_FILES.resolve('loopy.py') cwd = os.path.dirname(filename) self.run_test_conditional_break_points( DebugInfo(filename=filename, cwd=cwd)) def test_logpoints(self): - filename = os.path.join(TEST_FILES_DIR, 'loopy.py') + filename = TEST_FILES.resolve('loopy.py') cwd = os.path.dirname(filename) - self.run_test_logpoints(DebugInfo(filename=filename, cwd=cwd)) + self.run_test_logpoints( + DebugInfo(filename=filename, cwd=cwd)) def test_hit_conditional_break_points_equal(self): - filename = os.path.join(TEST_FILES_DIR, 'loopy.py') + filename = TEST_FILES.resolve('loopy.py') cwd = os.path.dirname(filename) self.run_test_hit_conditional_break_points( DebugInfo(filename=filename, cwd=cwd), hit_condition='== 5', hits=1, - expected=[4]) + expected=[4], + ) def test_hit_conditional_break_points_equal2(self): - filename = os.path.join(TEST_FILES_DIR, 'loopy.py') + filename = TEST_FILES.resolve('loopy.py') cwd = os.path.dirname(filename) self.run_test_hit_conditional_break_points( DebugInfo(filename=filename, cwd=cwd), hit_condition='5', hits=1, - expected=[4]) + expected=[4], + ) def test_hit_conditional_break_points_greater(self): - filename = os.path.join(TEST_FILES_DIR, 'loopy.py') + filename = TEST_FILES.resolve('loopy.py') cwd = os.path.dirname(filename) self.run_test_hit_conditional_break_points( DebugInfo(filename=filename, cwd=cwd), hit_condition='> 5', hits=5, - expected=[5, 6, 7, 8, 9]) + expected=[5, 6, 7, 8, 9], + ) def test_hit_conditional_break_points_greater_or_equal(self): - filename = os.path.join(TEST_FILES_DIR, 'loopy.py') + filename = TEST_FILES.resolve('loopy.py') cwd = os.path.dirname(filename) self.run_test_hit_conditional_break_points( DebugInfo(filename=filename, cwd=cwd), hit_condition='>= 5', hits=6, - expected=[4, 5, 6, 7, 8, 9]) + expected=[4, 5, 6, 7, 8, 9], + ) def test_hit_conditional_break_points_lesser(self): - filename = os.path.join(TEST_FILES_DIR, 'loopy.py') + filename = TEST_FILES.resolve('loopy.py') cwd = os.path.dirname(filename) self.run_test_hit_conditional_break_points( DebugInfo(filename=filename, cwd=cwd), hit_condition='< 5', hits=4, - expected=[0, 1, 2, 3]) + expected=[0, 1, 2, 3], + ) def test_hit_conditional_break_points_lesser_or_equal(self): - filename = os.path.join(TEST_FILES_DIR, 'loopy.py') + filename = TEST_FILES.resolve('loopy.py') cwd = os.path.dirname(filename) self.run_test_hit_conditional_break_points( DebugInfo(filename=filename, cwd=cwd), hit_condition='<= 5', hits=5, - expected=[0, 1, 2, 3, 4]) + expected=[0, 1, 2, 3, 4], + ) def test_hit_conditional_break_points_mod(self): - filename = os.path.join(TEST_FILES_DIR, 'loopy.py') + filename = TEST_FILES.resolve('loopy.py') cwd = os.path.dirname(filename) self.run_test_hit_conditional_break_points( DebugInfo(filename=filename, cwd=cwd), hit_condition='% 4', hits=2, - expected=[3, 7]) + expected=[3, 7], + ) class LaunchModuleTests(BreakpointTests): + def test_with_break_points(self): module_name = 'mymod_launch1' - cwd = os.path.join(TEST_FILES_DIR) - env = {"PYTHONPATH": cwd} + env = TEST_FILES.env_with_py_path() + cwd = TEST_FILES.root bp_filename = os.path.join(cwd, module_name, '__init__.py') self.run_test_with_break_points( DebugInfo(modulename=module_name, env=env, cwd=cwd), bp_filename, - bp_line=3) + bp_line=3, + ) def test_with_break_points_across_files(self): module_name = 'mymod_foo' - first_file = os.path.join(TEST_FILES_DIR, module_name, '__init__.py') - second_file = os.path.join(TEST_FILES_DIR, 'mymod_bar', 'bar.py') - cwd = os.path.join(TEST_FILES_DIR) - env = {"PYTHONPATH": cwd} + first_file = TEST_FILES.resolve(module_name, '__init__.py') + second_file = TEST_FILES.resolve('mymod_bar', 'bar.py') + env = TEST_FILES.env_with_py_path() + cwd = TEST_FILES.root expected_modules = [{ - "reason": "new", - "module": { - "package": "mymod_bar", - "path": second_file, - "name": "mymod_bar.bar" + 'reason': 'new', + 'module': { + 'package': 'mymod_bar', + 'path': second_file, + 'name': 'mymod_bar.bar' } }, { - "reason": "new", - "module": { - "path": first_file, - "name": "__main__" + 'reason': 'new', + 'module': { + 'path': first_file, + 'name': '__main__' } }] expected_stacktrace = { - "stackFrames": [{ - "name": "do_bar", - "source": { - "path": second_file, - "sourceReference": 0 + 'stackFrames': [{ + 'name': 'do_bar', + 'source': { + 'path': second_file, + 'sourceReference': 0 }, - "line": 2, - "column": 1 + 'line': 2, + 'column': 1 }, { - "name": "do_foo", - "source": { - "path": first_file, - "sourceReference": 0 + 'name': 'do_foo', + 'source': { + 'path': first_file, + 'sourceReference': 0 }, - "line": 5, - "column": 1 + 'line': 5, + 'column': 1 }, { - "id": 3, - "name": "", - "source": { - "path": first_file, - "sourceReference": 0 + 'id': 3, + 'name': '', + 'source': { + 'path': first_file, + 'sourceReference': 0 }, - "line": 8, - "column": 1 + 'line': 8, + 'column': 1 }], } self.run_test_with_break_points_across_files( - DebugInfo(modulename=module_name, cwd=cwd, env=env), first_file, - second_file, 2, expected_modules, expected_stacktrace) + DebugInfo(modulename=module_name, cwd=cwd, env=env), + first_file, + second_file, + 2, + expected_modules, + expected_stacktrace, + ) class ServerAttachTests(BreakpointTests): + def test_with_break_points(self): - filename = os.path.join(TEST_FILES_DIR, 'output.py') + filename = TEST_FILES.resolve('output.py') cwd = os.path.dirname(filename) argv = ['localhost', str(PORT)] self.run_test_with_break_points( DebugInfo( - filename=filename, cwd=cwd, starttype='attach', argv=argv), + filename=filename, + cwd=cwd, + starttype='attach', + argv=argv, + ), filename, - bp_line=3) + bp_line=3, + ) class PTVSDAttachTests(BreakpointTests): + def test_with_break_points(self): - filename = os.path.join(TEST_FILES_DIR, 'attach_output.py') + filename = TEST_FILES.resolve('attach_output.py') cwd = os.path.dirname(filename) argv = ['localhost', str(PORT)] self.run_test_with_break_points( @@ -500,16 +523,19 @@ class PTVSDAttachTests(BreakpointTests): attachtype='import', cwd=cwd, starttype='attach', - argv=argv), + argv=argv, + ), filename, - bp_line=6) + bp_line=6, + ) -class ServerAttachModuleTests(BreakpointTests): # noqa +class ServerAttachModuleTests(BreakpointTests): + def test_with_break_points(self): module_name = 'mymod_launch1' - cwd = os.path.join(TEST_FILES_DIR) - env = {"PYTHONPATH": cwd} + env = TEST_FILES.env_with_py_path() + cwd = TEST_FILES.root argv = ['localhost', str(PORT)] bp_filename = os.path.join(cwd, module_name, '__init__.py') self.run_test_with_break_points( @@ -518,17 +544,20 @@ class ServerAttachModuleTests(BreakpointTests): # noqa env=env, cwd=cwd, argv=argv, - starttype='attach'), + starttype='attach', + ), bp_filename, - bp_line=3) + bp_line=3, + ) @unittest.skip('Needs fixing') -class PTVSDAttachModuleTests(BreakpointTests): # noqa +class PTVSDAttachModuleTests(BreakpointTests): + def test_with_break_points(self): module_name = 'mymod_attach1' - cwd = os.path.join(TEST_FILES_DIR) - env = {"PYTHONPATH": cwd} + env = TEST_FILES.env_with_py_path() + cwd = TEST_FILES.root argv = ['localhost', str(PORT)] bp_filename = os.path.join(cwd, module_name, '__init__.py') self.run_test_with_break_points( @@ -538,6 +567,8 @@ class PTVSDAttachModuleTests(BreakpointTests): # noqa cwd=cwd, argv=argv, attachtype='import', - starttype='attach'), + starttype='attach', + ), bp_filename, - bp_line=6) + bp_line=6, + ) diff --git a/tests/system_tests/test_exceptions.py b/tests/system_tests/test_exceptions.py index 64e10b60..94fc40aa 100644 --- a/tests/system_tests/test_exceptions.py +++ b/tests/system_tests/test_exceptions.py @@ -2,140 +2,149 @@ import os import os.path import unittest -from ptvsd.wrapper import INITIALIZE_RESPONSE # noqa from tests.helpers.debugsession import Awaitable +from tests.helpers.resource import TestResources +from . import ( + _strip_newline_output_events, lifecycle_handshake, + LifecycleTestsBase, DebugInfo, PORT, +) -from . import (_strip_newline_output_events, lifecycle_handshake, - LifecycleTestsBase, DebugInfo, ROOT, PORT) -TEST_FILES_DIR = os.path.join(ROOT, 'tests', 'resources', 'system_tests', - 'test_exceptions') +TEST_FILES = TestResources.from_module(__name__) class ExceptionTests(LifecycleTestsBase): + def run_test_not_breaking_into_handled_exceptions(self, debug_info): - excbreakpoints = [{"filters": ["uncaught"]}] - options = {"debugOptions": ["RedirectOutput"]} + excbreakpoints = [{'filters': ['uncaught']}] + options = {'debugOptions': ['RedirectOutput']} with self.start_debugging(debug_info) as dbg: - (_, req_attach, _, _, _, _) = lifecycle_handshake( - dbg.session, - debug_info.starttype, - excbreakpoints=excbreakpoints, - options=options) + (_, req_attach, _, _, _, _ + ) = lifecycle_handshake(dbg.session, debug_info.starttype, + excbreakpoints=excbreakpoints, + options=options) received = list(_strip_newline_output_events(dbg.session.received)) - self.assert_contains( - received, - [ - self.new_event("output", category="stdout", output="end"), - self.new_event("exited", exitCode=0), - self.new_event("terminated"), - ], - ) + self.assert_contains(received, [ + self.new_event('output', category='stdout', output='end'), + self.new_event('exited', exitCode=0), + self.new_event('terminated'), + ]) def run_test_breaking_into_handled_exceptions(self, debug_info): - excbreakpoints = [{"filters": ["raised", "uncaught"]}] - options = {"debugOptions": ["RedirectOutput"]} + excbreakpoints = [{'filters': ['raised', 'uncaught']}] + options = {'debugOptions': ['RedirectOutput']} with self.start_debugging(debug_info) as dbg: stopped = dbg.session.get_awaiter_for_event('stopped') - (_, req_launch_attach, _, _, _, _) = lifecycle_handshake( - dbg.session, - debug_info.starttype, - excbreakpoints=excbreakpoints, - options=options, - threads=True) + (_, req_launch_attach, _, _, _, _ + ) = lifecycle_handshake(dbg.session, debug_info.starttype, + excbreakpoints=excbreakpoints, + options=options, + threads=True) Awaitable.wait_all(req_launch_attach, stopped) - self.assertEqual(stopped.event.body["text"], "ArithmeticError") + self.assertEqual(stopped.event.body['text'], 'ArithmeticError') self.assertIn("ArithmeticError('Hello'", - stopped.event.body["description"]) + stopped.event.body['description']) - thread_id = stopped.event.body["threadId"] - ex_info = dbg.session.send_request( - 'exceptionInfo', threadId=thread_id) - ex_info.wait() + thread_id = stopped.event.body['threadId'] + req_exc_info = dbg.session.send_request( + 'exceptionInfo', + threadId=thread_id, + ) + req_exc_info.wait() + exc_info = req_exc_info.resp.body - self.assert_is_subset( - ex_info.resp.body, - { - "exceptionId": "ArithmeticError", - "breakMode": "always", - "details": { - "typeName": "ArithmeticError", - # "source": debug_info.filename - } - }) + self.assert_is_subset(exc_info, { + 'exceptionId': 'ArithmeticError', + 'breakMode': 'always', + 'details': { + 'typeName': 'ArithmeticError', + # 'source': debug_info.filename + } + }) continued = dbg.session.get_awaiter_for_event('continued') - dbg.session.send_request("continue", threadId=thread_id).wait() - + dbg.session.send_request( + 'continue', + threadId=thread_id, + ).wait() Awaitable.wait_all(continued) received = list(_strip_newline_output_events(dbg.session.received)) - self.assert_contains( - received, - [ - self.new_event("continued", threadId=thread_id), - self.new_event("output", category="stdout", output="end"), - self.new_event("exited", exitCode=0), - self.new_event("terminated"), - ], - ) + self.assert_contains(received, [ + self.new_event('continued', threadId=thread_id), + self.new_event('output', category='stdout', output='end'), + self.new_event('exited', exitCode=0), + self.new_event('terminated'), + ]) class LaunchFileTests(ExceptionTests): + def test_not_breaking_into_handled_exceptions(self): - filename = os.path.join(TEST_FILES_DIR, 'handled_exceptions_launch.py') + filename = TEST_FILES.resolve('handled_exceptions_launch.py') cwd = os.path.dirname(filename) self.run_test_not_breaking_into_handled_exceptions( DebugInfo(filename=filename, cwd=cwd)) def test_breaking_into_handled_exceptions(self): - filename = os.path.join(TEST_FILES_DIR, 'handled_exceptions_launch.py') + filename = TEST_FILES.resolve('handled_exceptions_launch.py') cwd = os.path.dirname(filename) self.run_test_breaking_into_handled_exceptions( DebugInfo(filename=filename, cwd=cwd)) class LaunchModuleExceptionLifecycleTests(ExceptionTests): + def test_breaking_into_handled_exceptions(self): module_name = 'mymod_launch1' - env = {"PYTHONPATH": TEST_FILES_DIR} - cwd = os.path.dirname(TEST_FILES_DIR) + env = TEST_FILES.env_with_py_path() + cwd = TEST_FILES.parent.root self.run_test_breaking_into_handled_exceptions( DebugInfo(modulename=module_name, env=env, cwd=cwd)) def test_not_breaking_into_handled_exceptions(self): module_name = 'mymod_launch1' - env = {"PYTHONPATH": TEST_FILES_DIR} - cwd = os.path.dirname(TEST_FILES_DIR) + env = TEST_FILES.env_with_py_path() + cwd = TEST_FILES.parent.root self.run_test_not_breaking_into_handled_exceptions( DebugInfo(modulename=module_name, env=env, cwd=cwd)) class ServerAttachExceptionLifecycleTests(ExceptionTests): + def test_breaking_into_handled_exceptions(self): - filename = os.path.join(TEST_FILES_DIR, 'handled_exceptions_launch.py') + filename = TEST_FILES.resolve('handled_exceptions_launch.py') cwd = os.path.dirname(filename) argv = ['localhost', str(PORT)] self.run_test_breaking_into_handled_exceptions( DebugInfo( - filename=filename, cwd=cwd, starttype='attach', argv=argv)) + filename=filename, + cwd=cwd, + starttype='attach', + argv=argv, + )) def test_not_breaking_into_handled_exceptions(self): - filename = os.path.join(TEST_FILES_DIR, 'handled_exceptions_launch.py') + filename = TEST_FILES.resolve('handled_exceptions_launch.py') cwd = os.path.dirname(filename) argv = ['localhost', str(PORT)] self.run_test_not_breaking_into_handled_exceptions( DebugInfo( - filename=filename, cwd=cwd, starttype='attach', argv=argv)) + filename=filename, + cwd=cwd, + starttype='attach', + argv=argv, + )) class PTVSDAttachExceptionLifecycleTests(ExceptionTests): + def test_breaking_into_handled_exceptions(self): - filename = os.path.join(TEST_FILES_DIR, 'handled_exceptions_attach.py') + filename = TEST_FILES.resolve('handled_exceptions_attach.py') cwd = os.path.dirname(filename) argv = ['localhost', str(PORT)] self.run_test_breaking_into_handled_exceptions( @@ -144,11 +153,12 @@ class PTVSDAttachExceptionLifecycleTests(ExceptionTests): attachtype='import', cwd=cwd, starttype='attach', - argv=argv)) + argv=argv, + )) @unittest.skip('Needs fixing in #609') def test_not_breaking_into_handled_exceptions(self): - filename = os.path.join(TEST_FILES_DIR, 'handled_exceptions_attach.py') + filename = TEST_FILES.resolve('handled_exceptions_attach.py') cwd = os.path.dirname(filename) argv = ['localhost', str(PORT)] self.run_test_not_breaking_into_handled_exceptions( @@ -157,14 +167,16 @@ class PTVSDAttachExceptionLifecycleTests(ExceptionTests): attachtype='import', cwd=cwd, starttype='attach', - argv=argv)) + argv=argv, + )) -class ServerAttachModuleExceptionLifecycleTests(ExceptionTests): # noqa +class ServerAttachModuleExceptionLifecycleTests(ExceptionTests): + def test_breaking_into_handled_exceptions(self): module_name = 'mymod_launch1' - env = {"PYTHONPATH": TEST_FILES_DIR} - cwd = TEST_FILES_DIR + env = TEST_FILES.env_with_py_path() + cwd = TEST_FILES.root argv = ['localhost', str(PORT)] self.run_test_breaking_into_handled_exceptions( DebugInfo( @@ -172,12 +184,13 @@ class ServerAttachModuleExceptionLifecycleTests(ExceptionTests): # noqa env=env, cwd=cwd, argv=argv, - starttype='attach')) + starttype='attach', + )) def test_not_breaking_into_handled_exceptions(self): module_name = 'mymod_launch1' - env = {"PYTHONPATH": TEST_FILES_DIR} - cwd = TEST_FILES_DIR + env = TEST_FILES.env_with_py_path() + cwd = TEST_FILES.root argv = ['localhost', str(PORT)] self.run_test_not_breaking_into_handled_exceptions( DebugInfo( @@ -185,15 +198,17 @@ class ServerAttachModuleExceptionLifecycleTests(ExceptionTests): # noqa env=env, cwd=cwd, argv=argv, - starttype='attach')) + starttype='attach', + )) @unittest.skip('Needs fixing') -class PTVSDAttachModuleExceptionLifecycleTests(ExceptionTests): # noqa +class PTVSDAttachModuleExceptionLifecycleTests(ExceptionTests): + def test_breaking_into_handled_exceptions(self): module_name = 'mymod_attach1' - env = {"PYTHONPATH": TEST_FILES_DIR} - cwd = TEST_FILES_DIR + env = TEST_FILES.env_with_py_path() + cwd = TEST_FILES.root argv = ['localhost', str(PORT)] self.run_test_breaking_into_handled_exceptions( DebugInfo( @@ -202,12 +217,13 @@ class PTVSDAttachModuleExceptionLifecycleTests(ExceptionTests): # noqa cwd=cwd, argv=argv, attachtype='import', - starttype='attach')) + starttype='attach', + )) def test_not_breaking_into_handled_exceptions(self): module_name = 'mymod_attach1' - env = {"PYTHONPATH": TEST_FILES_DIR} - cwd = TEST_FILES_DIR + env = TEST_FILES.env_with_py_path() + cwd = TEST_FILES.root argv = ['localhost', str(PORT)] self.run_test_not_breaking_into_handled_exceptions( DebugInfo( @@ -216,4 +232,5 @@ class PTVSDAttachModuleExceptionLifecycleTests(ExceptionTests): # noqa cwd=cwd, argv=argv, attachtype='import', - starttype='attach')) + starttype='attach', + )) diff --git a/tests/system_tests/test_main.py b/tests/system_tests/test_main.py index fd917704..f53e633d 100644 --- a/tests/system_tests/test_main.py +++ b/tests/system_tests/test_main.py @@ -5,7 +5,7 @@ import unittest import ptvsd from ptvsd.socket import Address -from ptvsd.wrapper import INITIALIZE_RESPONSE # noqa +from ptvsd.wrapper import INITIALIZE_RESPONSE from tests.helpers.debugadapter import DebugAdapter from tests.helpers.debugclient import EasyDebugClient as DebugClient from tests.helpers.lock import LockTimeoutError @@ -123,7 +123,7 @@ class LifecycleTests(LifecycleTestsBase): done() adapter.wait() - # Skipping the 'thread exited' and 'terminated' messages which + # Skipping the "thread exited" and "terminated" messages which # may appear randomly in the received list. received = list(_strip_newline_output_events(session.received)) self.assert_received(received[:7], [ @@ -376,16 +376,18 @@ class LifecycleTests(LifecycleTestsBase): _, _, req_threads1, ) = lifecycle_handshake(session1, 'attach', threads=True) - tid1 = result['msg'].body['threadId'] + event = result['msg'] + tid1 = event.body['threadId'] stopped_event = session1.get_awaiter_for_event('stopped') - req_bps = session1.send_request('setBreakpoints', **{ - 'source': {'path': filename}, - 'breakpoints': [ + req_bps = session1.send_request( + 'setBreakpoints', + source={'path': filename}, + breakpoints=[ {'line': bp1}, {'line': bp2}, ], - }) + ) req_bps.wait() done1() @@ -417,7 +419,8 @@ class LifecycleTests(LifecycleTestsBase): _, _, req_threads3, ) = lifecycle_handshake(session2, 'attach', threads=True) - tid2 = result['msg'].body['threadId'] + event = result['msg'] + tid2 = event.body['threadId'] done2() adapter.wait() @@ -542,7 +545,7 @@ class LifecycleTests(LifecycleTestsBase): @unittest.skip('not implemented') def test_attach_exit_during_session(self): - # TODO: Ensure we see the "terminated" and "exited" events. + # TODO: Ensure we see the 'terminated' and 'exited' events. raise NotImplementedError @unittest.skip('re-attach needs fixing') @@ -616,17 +619,17 @@ class LifecycleTests(LifecycleTestsBase): }] options = { - "pathMappings": [ + 'pathMappings': [ { - "localRoot": os.path.dirname(filename), - "remoteRoot": os.path.dirname(filename) + 'localRoot': os.path.dirname(filename), + 'remoteRoot': os.path.dirname(filename) }, # This specific mapping is for Mac. # For some reason temp paths on Mac get prefixed with # `private` when returned from ptvsd. { - "localRoot": os.path.dirname(filename), - "remoteRoot": '/private' + os.path.dirname(filename) + 'localRoot': os.path.dirname(filename), + 'remoteRoot': '/private' + os.path.dirname(filename) } ] } @@ -650,14 +653,15 @@ class LifecycleTests(LifecycleTestsBase): threads=True) # Grab the initial output. - out1 = next(adapter.output) # "waiting for attach" + out1 = next(adapter.output) # 'waiting for attach' line = adapter.output.readline() while line: out1 += line line = adapter.output.readline() done1() req_bps, = reqs_bps # There should only be one. - tid = result['msg'].body['threadId'] + event = result['msg'] + tid = event.body['threadId'] req_threads2 = session.send_request('threads') req_stacktrace1 = session.send_request( 'stackTrace', diff --git a/tests/system_tests/test_remote.py b/tests/system_tests/test_remote.py index 74b900fb..255238e0 100644 --- a/tests/system_tests/test_remote.py +++ b/tests/system_tests/test_remote.py @@ -1,36 +1,38 @@ import os import os.path -import socket -from . import (_strip_newline_output_events, lifecycle_handshake, - LifecycleTestsBase, DebugInfo, ROOT, PORT) +from tests.helpers.resource import TestResources +from tests.helpers.socket import resolve_hostname +from . import ( + _strip_newline_output_events, lifecycle_handshake, + LifecycleTestsBase, DebugInfo, PORT, +) -TEST_FILES_DIR = os.path.join(ROOT, 'tests', 'resources', 'system_tests', - 'test_basic') + +TEST_FILES = TestResources.from_module('tests.system_tests.test_basic') +WITH_OUTPUT = TEST_FILES.sub('test_output') class RemoteTests(LifecycleTestsBase): + def run_test_attach(self, debug_info): - options = {"debugOptions": ["RedirectOutput"]} + options = {'debugOptions': ['RedirectOutput']} with self.start_debugging(debug_info) as dbg: - (_, _, _, _, _, _) = lifecycle_handshake( - dbg.session, debug_info.starttype, options=options) + lifecycle_handshake(dbg.session, debug_info.starttype, + options=options) received = list(_strip_newline_output_events(dbg.session.received)) - self.assert_contains( - received, - [ - self.new_event("output", category="stdout", output="yes"), - self.new_event("output", category="stderr", output="no"), - ], - ) + self.assert_contains(received, [ + self.new_event('output', category='stdout', output='yes'), + self.new_event('output', category='stderr', output='no'), + ]) class AttachFileTests(RemoteTests): + def test_attach_localhost(self): - filename = os.path.join(TEST_FILES_DIR, 'test_output', - 'attach_output.py') + filename = WITH_OUTPUT.resolve('attach_output.py') cwd = os.path.dirname(filename) argv = ['localhost', str(PORT)] self.run_test_attach( @@ -39,11 +41,12 @@ class AttachFileTests(RemoteTests): attachtype='import', cwd=cwd, starttype='attach', - argv=argv)) + argv=argv, + ), + ) def test_attach_127001(self): - filename = os.path.join(TEST_FILES_DIR, 'test_output', - 'attach_output.py') + filename = WITH_OUTPUT.resolve('attach_output.py') cwd = os.path.dirname(filename) argv = ['127.0.0.1', str(PORT)] self.run_test_attach( @@ -52,11 +55,12 @@ class AttachFileTests(RemoteTests): attachtype='import', cwd=cwd, starttype='attach', - argv=argv)) + argv=argv, + ), + ) def test_attach_0000(self): - filename = os.path.join(TEST_FILES_DIR, 'test_output', - 'attach_output.py') + filename = WITH_OUTPUT.resolve('attach_output.py') cwd = os.path.dirname(filename) argv = ['0.0.0.0', str(PORT)] self.run_test_attach( @@ -65,14 +69,16 @@ class AttachFileTests(RemoteTests): attachtype='import', cwd=cwd, starttype='attach', - argv=argv)) + argv=argv, + ), + ) def test_attach_byip(self): - filename = os.path.join(TEST_FILES_DIR, 'test_output', - 'attach_output.py') + filename = WITH_OUTPUT.resolve('attach_output.py') cwd = os.path.dirname(filename) argv = ['0.0.0.0', str(PORT)] - ip = socket.gethostbyname(socket.gethostname()) + ip = resolve_hostname() + self.run_test_attach( DebugInfo( filename=filename, @@ -80,4 +86,6 @@ class AttachFileTests(RemoteTests): host=ip, cwd=cwd, starttype='attach', - argv=argv)) + argv=argv, + ), + ) diff --git a/tests/system_tests/test_restart_vsc.py b/tests/system_tests/test_restart_vsc.py index 2287e52d..0c55380e 100644 --- a/tests/system_tests/test_restart_vsc.py +++ b/tests/system_tests/test_restart_vsc.py @@ -2,26 +2,27 @@ import os import os.path import unittest -from ptvsd.wrapper import INITIALIZE_RESPONSE # noqa +from tests.helpers.resource import TestResources +from . import ( + _strip_newline_output_events, lifecycle_handshake, + LifecycleTestsBase, DebugInfo, +) -from . import (_strip_newline_output_events, lifecycle_handshake, - LifecycleTestsBase, DebugInfo, ROOT) -TEST_FILES_DIR = os.path.join(ROOT, 'tests', 'resources', 'system_tests', - 'test_forever') +TEST_FILES = TestResources.from_module('tests.system_tests.test_forever') @unittest.skip('Needs fixing in #530') class RestartVSCTests(LifecycleTestsBase): + def test_disconnect_without_restart(self): - filename = os.path.join(TEST_FILES_DIR, 'launch_forever.py') + filename = TEST_FILES.resolve('launch_forever.py') cwd = os.path.dirname(filename) debug_info = DebugInfo(filename=filename, cwd=cwd) with self.start_debugging(debug_info) as dbg: - (_, req_launch, _, _, _, _) = lifecycle_handshake( - dbg.session, debug_info.starttype) - + (_, req_launch, _, _, _, _ + ) = lifecycle_handshake(dbg.session, debug_info.starttype) req_launch.wait() dbg.session.send_request('disconnect', restart=False) @@ -31,14 +32,13 @@ class RestartVSCTests(LifecycleTestsBase): self.assertEqual(len(evts), 1) def test_disconnect_with_restart(self): - filename = os.path.join(TEST_FILES_DIR, 'launch_forever.py') + filename = TEST_FILES.resolve('launch_forever.py') cwd = os.path.dirname(filename) debug_info = DebugInfo(filename=filename, cwd=cwd) with self.start_debugging(debug_info) as dbg: - (_, req_launch, _, _, _, _) = lifecycle_handshake( - dbg.session, debug_info.starttype) - + (_, req_launch, _, _, _, _ + ) = lifecycle_handshake(dbg.session, debug_info.starttype) req_launch.wait() dbg.session.send_request('disconnect', restart=True) diff --git a/tests/system_tests/test_schema.py b/tests/system_tests/test_schema.py index a11d0f4a..84fd0564 100644 --- a/tests/system_tests/test_schema.py +++ b/tests/system_tests/test_schema.py @@ -124,7 +124,7 @@ class DownloadCommandTests(unittest.TestCase): metadata = '\n'.join(line for line in metadata.split('\n') if not line.startswith('downloaded: ')) - self.assertEqual(data, "") + self.assertEqual(data, '') self.assertEqual(metadata, dedent("""\ upstream: http://localhost:8000/schema.json revision: diff --git a/tests/system_tests/test_variables.py b/tests/system_tests/test_variables.py index 8dd9276b..336cc4e9 100644 --- a/tests/system_tests/test_variables.py +++ b/tests/system_tests/test_variables.py @@ -1,230 +1,250 @@ import os import os.path -from ptvsd.wrapper import INITIALIZE_RESPONSE # noqa from tests.helpers.debugsession import Awaitable +from tests.helpers.resource import TestResources +from . import ( + lifecycle_handshake, LifecycleTestsBase, DebugInfo, +) -from . import (lifecycle_handshake, LifecycleTestsBase, DebugInfo, ROOT) -TEST_FILES_DIR = os.path.join(ROOT, 'tests', 'resources', 'system_tests', - 'test_variables') +TEST_FILES = TestResources.from_module(__name__) class VariableTests(LifecycleTestsBase): + def test_variables(self): - filename = os.path.join(TEST_FILES_DIR, 'simple.py') + filename = TEST_FILES.resolve('simple.py') cwd = os.path.dirname(filename) self.run_test_variables(DebugInfo(filename=filename, cwd=cwd)) def run_test_variables(self, debug_info): bp_line = 3 breakpoints = [{ - "source": { - "path": debug_info.filename + 'source': { + 'path': debug_info.filename }, - "breakpoints": [{ - "line": bp_line + 'breakpoints': [{ + 'line': bp_line }] }] with self.start_debugging(debug_info) as dbg: session = dbg.session - with session.wait_for_event("stopped") as result: - ( - _, - req_launch_attach, - _, - reqs_bps, - _, - _, - ) = lifecycle_handshake( - session, debug_info.starttype, breakpoints=breakpoints) - + with session.wait_for_event('stopped') as result: + (_, req_launch_attach, _, _, _, _, + ) = lifecycle_handshake(session, debug_info.starttype, + breakpoints=breakpoints) req_launch_attach.wait() + event = result['msg'] + tid = event.body['threadId'] - tid = result["msg"].body["threadId"] + req_stacktrace = session.send_request( + 'stackTrace', + threadId=tid, + ) + req_stacktrace.wait() + frames = req_stacktrace.resp.body['stackFrames'] + frame_id = frames[0]['id'] + req_scopes = session.send_request( + 'scopes', + frameId=frame_id, + ) + req_scopes.wait() + scopes = req_scopes.resp.body['scopes'] + variables_reference = scopes[0]['variablesReference'] + req_variables = session.send_request( + 'variables', + variablesReference=variables_reference, + ) + req_variables.wait() + variables = req_variables.resp.body['variables'] - stacktrace = session.send_request("stackTrace", threadId=tid) - stacktrace.wait() - frame_id = stacktrace.resp.body["stackFrames"][0]["id"] - scopes = session.send_request('scopes', frameId=frame_id) - scopes.wait() - variables_reference = scopes.resp.body["scopes"][0][ - "variablesReference"] # noqa - variables = session.send_request( - 'variables', variablesReference=variables_reference) # noqa - variables.wait() - - var_b = list(b for b in variables.resp.body["variables"] - if b["name"] == "b") # noqa + var_b = list(b for b in variables if b['name'] == 'b') var_b = var_b[0] if len(var_b) == 1 else None if var_b is None: var_b_variables = None else: - var_b_ref = var_b["variablesReference"] - var_b_variables = session.send_request( - 'variables', variablesReference=var_b_ref) # noqa - var_b_variables.wait() + var_b_ref = var_b['variablesReference'] + req_variables = session.send_request( + 'variables', + variablesReference=var_b_ref, + ) + req_variables.wait() + var_b_variables = req_variables.resp.body['variables'] - var_a_evaluate = session.send_request( - 'evaluate', expression="a", frameId=frame_id) - var_b_one_evaluate = session.send_request( + req_evaluate1 = session.send_request( 'evaluate', - expression="b['one']", # noqa - frameId=frame_id) + expression='a', + frameId=frame_id, + ) + req_evaluate2 = session.send_request( + 'evaluate', + expression="b['one']", + frameId=frame_id, + ) + Awaitable.wait_all(req_evaluate1, req_evaluate2) + var_a_evaluate = req_evaluate1.resp.body + var_b_one_evaluate = req_evaluate2.resp.body - Awaitable.wait_all(var_a_evaluate, var_b_one_evaluate) - - session.send_request("continue", threadId=tid) + session.send_request('continue', threadId=tid) # Variables for a, b, __file__, __main__ - self.assertGreaterEqual(len(variables.resp.body["variables"]), 3) - expected_variables = [{ - "name": "a", - "type": "int", - "value": "1", - "evaluateName": "a" + self.assertGreaterEqual(len(variables), 3) + self.assert_is_subset(variables, [{ + 'name': 'a', + 'type': 'int', + 'value': '1', + 'evaluateName': 'a' }, { - "name": "b", - "type": "dict", - "value": "{'one': 1, 'two': 2}", - "evaluateName": "b" + 'name': 'b', + 'type': 'dict', + 'value': "{'one': 1, 'two': 2}", + 'evaluateName': 'b' }, { - "name": "__builtins__", - "type": "dict", - "evaluateName": "__builtins__" + 'name': '__builtins__', + 'type': 'dict', + 'evaluateName': '__builtins__' }, { - "name": "__doc__", - "type": "NoneType", - "value": "None", - "evaluateName": "__doc__" + 'name': '__doc__', + 'type': 'NoneType', + 'value': 'None', + 'evaluateName': '__doc__' }, { - "name": "__file__", - "type": "str", - "presentationHint": { - "attributes": ["rawString"] + 'name': '__file__', + 'type': 'str', + 'presentationHint': { + 'attributes': ['rawString'] }, - "evaluateName": "__file__" + 'evaluateName': '__file__' }, { - "name": "__loader__", - "type": "SourceFileLoader", - "evaluateName": "__loader__" + 'name': '__loader__', + 'type': 'SourceFileLoader', + 'evaluateName': '__loader__' }, { - "name": "__name__", - "type": "str", - "value": "'__main__'", - "presentationHint": { - "attributes": ["rawString"] + 'name': '__name__', + 'type': 'str', + 'value': "'__main__'", + 'presentationHint': { + 'attributes': ['rawString'] }, - "evaluateName": "__name__" + 'evaluateName': '__name__' }, { - "name": "__package__", - "type": "NoneType", - "value": "None", - "evaluateName": "__package__" + 'name': '__package__', + 'type': 'NoneType', + 'value': 'None', + 'evaluateName': '__package__' }, { - "name": "__spec__", - "type": "NoneType", - "value": "None", - "evaluateName": "__spec__" - }] - self.assert_is_subset(variables.resp.body["variables"], - expected_variables) # noqa - expected_var_a_eval = {"type": "int", "result": "1"} - var_a_evaluate.resp.body == expected_var_a_eval + 'name': '__spec__', + 'type': 'NoneType', + 'value': 'None', + 'evaluateName': '__spec__' + }]) + self.assertEqual(var_a_evaluate, { + 'type': 'int', + 'result': '1', + }) assert var_b_variables is not None - expected_var_b = { - "variables": [{ - "type": "int", - "value": "1", - "evaluateName": "b['one']" - }, { - "type": "int", - "value": "2", - "evaluateName": "b['two']" - }, { - "name": "__len__", - "type": "int", - "value": "2", - "evaluateName": "b.__len__" - }] - } - self.assert_is_subset(var_b_variables.resp.body, - expected_var_b) # noqa + self.assert_is_subset(var_b_variables, [{ + 'type': 'int', + 'value': '1', + 'evaluateName': "b['one']" + }, { + 'type': 'int', + 'value': '2', + 'evaluateName': "b['two']" + }, { + 'name': '__len__', + 'type': 'int', + 'value': '2', + 'evaluateName': 'b.__len__' + }]) - expected_var_b_eval = {"type": " int", "result": "1"} - var_b_one_evaluate.resp.body == expected_var_b_eval + self.assertEqual(var_b_one_evaluate, { + 'type': 'int', + 'result': '1', + }) def test_variable_sorting(self): - filename = os.path.join(TEST_FILES_DIR, 'for_sorting.py') + filename = TEST_FILES.resolve('for_sorting.py') cwd = os.path.dirname(filename) self.run_test_variable_sorting(DebugInfo(filename=filename, cwd=cwd)) def run_test_variable_sorting(self, debug_info): bp_line = 16 breakpoints = [{ - "source": { - "path": debug_info.filename + 'source': { + 'path': debug_info.filename }, - "breakpoints": [{ - "line": bp_line + 'breakpoints': [{ + 'line': bp_line }] }] with self.start_debugging(debug_info) as dbg: session = dbg.session - with session.wait_for_event("stopped") as result: - ( - _, - req_launch_attach, - _, - reqs_bps, - _, - _, - ) = lifecycle_handshake( - session, debug_info.starttype, breakpoints=breakpoints) - + with session.wait_for_event('stopped') as result: + (_, req_launch_attach, _, _, _, _, + ) = lifecycle_handshake(session, debug_info.starttype, + breakpoints=breakpoints) req_launch_attach.wait() - tid = result["msg"].body["threadId"] + event = result['msg'] + tid = event.body['threadId'] - stacktrace = session.send_request("stackTrace", threadId=tid) - stacktrace.wait() - frame_id = stacktrace.resp.body["stackFrames"][0]["id"] - scopes = session.send_request('scopes', frameId=frame_id) - scopes.wait() - variables_reference = scopes.resp.body["scopes"][0][ - "variablesReference"] # noqa - variables = session.send_request( - 'variables', variablesReference=variables_reference) # noqa - variables.wait() + req_stacktrace = session.send_request( + 'stackTrace', + threadId=tid, + ) + req_stacktrace.wait() + frames = req_stacktrace.resp.body['stackFrames'] + frame_id = frames[0]['id'] + req_scopes = session.send_request( + 'scopes', + frameId=frame_id, + ) + req_scopes.wait() + scopes = req_scopes.resp.body['scopes'] + variables_reference = scopes[0]['variablesReference'] + req_variables = session.send_request( + 'variables', + variablesReference=variables_reference, + ) + req_variables.wait() + variables = req_variables.resp.body['variables'] - try: - b_dict_var = list(v for v in variables.resp.body["variables"] - if v["name"] == 'b_test')[0] - b_dict_var_ref = b_dict_var["variablesReference"] - b_dict_var_items = session.send_request( - 'variables', variablesReference=b_dict_var_ref) # noqa - b_dict_var_items.wait() - except IndexError: + b_dict_vars = list(v for v in variables if v['name'] == 'b_test') + if not b_dict_vars: b_dict_var_items = None + else: + b_dict_var, = b_dict_vars + b_dict_var_ref = b_dict_var['variablesReference'] + req_variables = session.send_request( + 'variables', + variablesReference=b_dict_var_ref, + ) + req_variables.wait() + b_dict_var_items = req_variables.resp.body['variables'] - try: - c_dict_var = list(v for v in variables.resp.body["variables"] - if v["name"] == 'c_test')[0] - c_dict_var_ref = c_dict_var["variablesReference"] - c_dict_var_items = session.send_request( - 'variables', variablesReference=c_dict_var_ref) # noqa - c_dict_var_items.wait() - except IndexError: - c_dict_var_items = None + #c_dict_vars = list(v for v in variables if v['name'] == 'c_test') + #if not c_dict_vars: + # c_dict_var_items = None + #else: + # c_dict_var, = c_dict_vars + # c_dict_var_ref = c_dict_var['variablesReference'] + # req_variables = session.send_request( + # 'variables', + # variablesReference=c_dict_var_ref, + # ) + # req_variables.wait() + # c_dict_var_items = req_variables.resp.body['variables'] - session.send_request("continue", threadId=tid) + session.send_request('continue', threadId=tid) - variables_to_check = list(v["name"] - for v in variables.resp.body["variables"] - if v["name"].find('_test') > 0) + variables_to_check = list(v['name'] + for v in variables + if v['name'].find('_test') > 0) expected_var_order = [ 'a_test', 'b_test', 'c_test', '_a_test', '_b_test', '_c_test', '__a_test', '__b_test', '__c_test', '__a_test__', '__b_test__', @@ -233,16 +253,15 @@ class VariableTests(LifecycleTestsBase): self.assertEqual(expected_var_order, variables_to_check) # Dict keys are sorted normally, i.e., the '_' rules don't apply - # TODO: v["name"][1:5] is needed due to bug #45 - b_dict_var_keys = list(v["name"][1:5] - for v in b_dict_var_items.resp.body["variables"] - if not v["name"].startswith('__')) # noqa + # TODO: v['name'][1:5] is needed due to bug #45 + b_dict_var_keys = list(v['name'][1:5] + for v in b_dict_var_items + if not v['name'].startswith('__')) expected_b_dict_var_keys_order = ['abcd', 'eggs', 'spam'] self.assertEqual(b_dict_var_keys, expected_b_dict_var_keys_order) # TODO: Numeric dict keys have following issues # bug: #45 and #213 - # c_dict_var_keys = list(v["name"] - # for v in c_dict_var_items.resp.body["variables"]) # noqa + # c_dict_var_keys = list(v['name'] for v in c_dict_var_items) # expected_c_dict_var_keys_order = ['1', '2', '10', '__len__'] # self.assertEqual(c_dict_var_keys, expected_c_dict_var_keys_order)