Revert "Revert "Do not wait for settrace() to finish until wait_for_attach(). (#628)" (#666)" (#670)

(helps #545)

This reverts commit 80d86b9.

Note that this still suffers from the caveat that if attach doesn't happen within a second (and wait_for_attach() isn't called) then the main thread will never have debugging enabled. One way or another that will need to be addressed.
This commit is contained in:
Eric Snow 2018-07-17 14:25:53 -06:00 committed by GitHub
parent 475a2d1513
commit c99f884ae0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 424 additions and 113 deletions

View file

@ -1,9 +1,27 @@
import pydevd
from ptvsd._util import debug, new_hidden_thread
from ptvsd.pydevd_hooks import install, start_server
from ptvsd.socket import Address
def _pydevd_settrace(redirect_output=None, _pydevd=pydevd, **kwargs):
if redirect_output is not None:
kwargs.setdefault('stdoutToServer', redirect_output)
kwargs.setdefault('stderrToServer', redirect_output)
# pydevd.settrace() only enables debugging of the current
# thread and all future threads. PyDevd is not enabled for
# existing threads (other than the current one). Consequently,
# pydevd.settrace() must be called ASAP in the current thread.
# See issue #509.
#
# This is tricky, however, because settrace() will block until
# it receives a CMD_RUN message. You can't just call it in a
# thread to avoid blocking; doing so would prevent the current
# thread from being debugged.
_pydevd.settrace(**kwargs)
# TODO: Split up enable_attach() to align with module organization.
# This should including making better use of Daemon (e,g, the
# start_server() method).
@ -13,9 +31,12 @@ from ptvsd.socket import Address
def enable_attach(address,
on_attach=(lambda: None),
redirect_output=True,
_pydevd=pydevd, _install=install,
_pydevd=pydevd,
_install=install,
_settrace=_pydevd_settrace,
**kwargs):
addr = Address.as_server(*address)
debug('installing ptvsd as server')
# pydevd.settrace() forces a "client" connection, so we trick it
# by setting start_client to start_server..
daemon = _install(
@ -26,12 +47,43 @@ def enable_attach(address,
singlesession=False,
**kwargs
)
# Only pass the port so start_server() gets triggered.
_pydevd.settrace(
host=addr.host,
stdoutToServer=redirect_output,
stderrToServer=redirect_output,
port=addr.port,
suspend=False,
)
return daemon
def start_pydevd():
debug('enabling pydevd')
# Only pass the port so start_server() gets triggered.
# As noted above, we also have to trick settrace() because it
# *always* forces a client connection.
_settrace(
stdoutToServer=redirect_output,
stderrToServer=redirect_output,
port=addr.port,
suspend=False,
_pydevd=_pydevd,
)
debug('pydevd enabled')
t = new_hidden_thread('start-pydevd', start_pydevd)
t.start()
def wait(timeout=None):
t.join(timeout)
return not t.is_alive()
def debug_current_thread(suspend=False, **kwargs):
# Make sure that pydevd has finished starting before enabling
# in the current thread.
t.join()
debug('enabling pydevd (current thread)')
_settrace(
host=None, # ignored
stdoutToServer=False, # ignored
stderrToServer=False, # ignored
port=None, # ignored
suspend=suspend,
trace_only_current_thread=True,
overwrite_prev_trace=True,
patch_multiprocessing=False,
_pydevd=_pydevd,
**kwargs
)
debug('pydevd enabled (current thread)')
return daemon, wait, debug_current_thread

View file

@ -4,18 +4,22 @@
import threading
import pydevd
# TODO: Why import run_module & run_file?
from ptvsd._local import run_module, run_file # noqa
from ptvsd._remote import enable_attach as ptvsd_enable_attach
from ptvsd._remote import (
enable_attach as ptvsd_enable_attach, _pydevd_settrace,
)
WAIT_TIMEOUT = 1.0
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 5678
_enabled = False
_attached = threading.Event()
_debug_current_thread = None
_pending_threads = set()
def wait_for_attach(timeout=None):
@ -30,6 +34,14 @@ def wait_for_attach(timeout=None):
"""
_attached.wait(timeout)
tid = threading.current_thread().ident
if tid in _pending_threads:
_pending_threads.remove(tid)
# Enable pydevd in the current thread. This is necessary because
# we started pydevd in a new thread. We must do it here because
# that previous invocation must have finished already.
_debug_current_thread()
def enable_attach(address=(DEFAULT_HOST, DEFAULT_PORT), redirect_output=True):
"""Enables a client to attach to this process remotely to debug Python code.
@ -65,11 +77,29 @@ def enable_attach(address=(DEFAULT_HOST, DEFAULT_PORT), redirect_output=True):
_enabled = True
_attached.clear()
ptvsd_enable_attach(
# Note: this only starts pydevd (e.g. sets it up) and enables
# debugging for *future* threads. It does not actually enable
# debugging in the *current* thread. That is done in
# wait_for_attach(). Thus this approach is problematic if
# wait_for_attach() is never called.
# TODO: Is there any way to ensure that debug_current_thread()
# gets called in the current thread, regardless of if
# wait_for_attach() gets called?
_, wait, debug_current_thread = ptvsd_enable_attach(
address,
on_attach=_attached.set,
redirect_output=redirect_output,
)
global _debug_current_thread
_debug_current_thread = debug_current_thread
# Give it a chance to finish starting. This helps reduce possible
# issues due to relying on wait_for_attach().
if wait(WAIT_TIMEOUT):
debug_current_thread()
else:
_pending_threads.add(threading.current_thread().ident)
# TODO: Add disable_attach()?
@ -87,7 +117,7 @@ def break_into_debugger():
return
import sys
pydevd.settrace(
_pydevd_settrace(
suspend=True,
trace_only_current_thread=True,
patch_multiprocessing=False,

View file

@ -147,6 +147,7 @@ class DaemonBase(object):
assert self._sessionlock is None
assert self.session is None
self._server = create_server(addr.host, addr.port)
debug('server socket created')
self._sessionlock = threading.Lock()
sock = self._sock

View file

@ -31,12 +31,9 @@ def start_server(daemon, host, port, **kwargs):
debug('failed:', exc, tb=True)
return None
while True:
def serve_forever():
debug('waiting on initial connection')
handle_next()
break
def serve_forever():
while True:
debug('waiting on next connection')
try:

View file

@ -416,12 +416,16 @@ class PydevdSocket(object):
def pydevd_notify(self, cmd_id, args):
# TODO: docstring
if self.pipe_w is None:
raise EOFError
seq, s = self.make_packet(cmd_id, args)
_util.log_pydevd_msg(cmd_id, seq, args, inbound=False)
os.write(self.pipe_w, s.encode('utf8'))
def pydevd_request(self, loop, cmd_id, args):
# TODO: docstring
if self.pipe_w is None:
raise EOFError
seq, s = self.make_packet(cmd_id, args)
_util.log_pydevd_msg(cmd_id, seq, args, inbound=False)
fut = loop.create_future()

View file

@ -52,6 +52,7 @@ COPIED_ENV = [
'WINDIR',
]
SERVER_READY_TIMEOUT = 3.0 # seconds
try:
ConnectionRefusedError
@ -82,7 +83,7 @@ def _copy_env(verbose=False, env=None):
return variables
def wait_for_socket_server(addr, timeout=3.0, **kwargs):
def wait_for_socket_server(addr, timeout=SERVER_READY_TIMEOUT):
start_time = time.time()
while True:
try:
@ -177,9 +178,11 @@ class DebugAdapter(Closeable):
@classmethod
def start_for_attach(cls, addr, *args, **kwargs):
srvtimeout = kwargs.pop('srvtimeout', SERVER_READY_TIMEOUT)
addr = Address.as_server(*addr)
adapter = cls._start_as(addr, *args, server=True, **kwargs)
wait_for_socket_server(addr)
if srvtimeout is not None:
wait_for_socket_server(addr, timeout=srvtimeout)
return adapter
@classmethod
@ -205,6 +208,8 @@ class DebugAdapter(Closeable):
@classmethod
def start_embedded(cls, addr, filename, argv=[], **kwargs):
# ptvsd.enable_attach() slows things down, so we must wait longer.
srvtimeout = kwargs.pop('srvtimeout', SERVER_READY_TIMEOUT + 2)
addr = Address.as_server(*addr)
with open(filename, 'r+') as scriptfile:
content = scriptfile.read()
@ -212,7 +217,8 @@ class DebugAdapter(Closeable):
assert 'ptvsd.enable_attach' in content
adapter = cls.start_wrapper_script(
filename, argv=argv, addr=addr, **kwargs)
wait_for_socket_server(addr, **kwargs)
if srvtimeout is not None:
wait_for_socket_server(addr, timeout=srvtimeout)
return adapter
@classmethod

View file

@ -179,6 +179,9 @@ class DebugSession(Closeable):
wait = args.pop('wait', False)
req = self._create_request(command, **args)
if self.VERBOSE:
msg = parse_message(req)
print(' <-', msg)
if wait:
with self.wait_for_response(req) as resp:

View file

@ -66,6 +66,13 @@ def find_line(script, label):
########################
# wait points
def _indent(script, line):
indent = ' ' * (len(line) - len(line.lstrip(' ')))
if not indent:
return script
return indent + (os.linesep + indent).join(script.splitlines())
def insert_release(script, lockfile, label=None):
"""Return (script, wait func) after adding a done script to the original.
@ -85,6 +92,7 @@ def insert_release(script, lockfile, label=None):
lines = iter(script.splitlines())
for line, matched in iter_until_label(lines, label):
if matched:
donescript = _indent(donescript, line)
leading.extend([
donescript,
line,
@ -143,6 +151,7 @@ def insert_lock(script, lockfile, label=None, timeout=5):
lines = iter(script.splitlines())
for line, matched in iter_until_label(lines, label):
if matched:
waitscript = _indent(waitscript, line)
leading.extend([
waitscript,
line,

View file

@ -2,10 +2,14 @@ import ptvsd
import sys
import time
ptvsd.enable_attach((sys.argv[1], sys.argv[2]))
ptvsd.wait_for_attach()
i = 0
while True:
time.sleep(0.1)
# <bp>
print(i)
i += 1

View file

@ -23,8 +23,8 @@ PORT = 9879
CONNECT_TIMEOUT = 5.0
DELAY_WAITING_FOR_SOCKETS = 1.0
DebugInfo = namedtuple('DebugInfo', 'host port starttype argv filename modulename env cwd attachtype') # noqa
DebugInfo.__new__.__defaults__ = ('localhost', PORT, 'launch', []) + ((None, ) * (len(DebugInfo._fields) - 4)) # noqa
DebugInfo = namedtuple('DebugInfo', 'host port starttype argv filename modulename env cwd attachtype verbose') # noqa
DebugInfo.__new__.__defaults__ = ('localhost', PORT, 'launch', [], None, None, None, None, None, False) # noqa
Debugger = namedtuple('Debugger', 'session adapter')
@ -153,20 +153,48 @@ def lifecycle_handshake(session, command='launch', options=None,
adapterID='spam',
)
req_command = session.send_request(command, **options or {})
req_command.wait()
req_threads = session.send_request('threads') if threads else None
reqs_bps, reqs_exc, req_done = _configure(
session,
breakpoints,
excbreakpoints,
)
return (req_initialize, req_command, req_done,
reqs_bps, reqs_exc, req_threads)
def _configure(session, breakpoints, excbreakpoints):
reqs_bps = []
reqs_exc = []
for req in breakpoints or ():
reqs_bps.append(
session.send_request('setBreakpoints', **req))
reqs_exc = []
for req in excbreakpoints or ():
reqs_bps.append(
reqs_exc.append(
session.send_request('setExceptionBreakpoints', **req))
# All config requests must be done before sending "configurationDone".
for req in reqs_bps + reqs_exc:
req.wait()
req_done = session.send_request('configurationDone')
return (req_initialize, req_command, req_done,
reqs_bps, reqs_exc, req_threads)
return reqs_bps, reqs_exc, req_done
def react_to_stopped(session, tid):
req_threads = session.send_request('threads')
req_threads.wait()
req_stacktrace = session.send_request(
'stackTrace',
threadId=tid,
)
req_stacktrace.wait()
return req_threads, req_stacktrace
class TestsBase(object):
@ -265,6 +293,8 @@ Original Error:
_kill_proc(adapter.pid)
_wrap_and_reraise(session, ex, exc_type, exc_value, exc_traceback)
if debug_info.verbose:
DebugAdapter.VERBOSE = True
if debug_info.attachtype == 'import' and \
debug_info.modulename is not None:
argv = debug_info.argv
@ -285,12 +315,14 @@ Original Error:
debug_info.starttype == 'attach' and \
debug_info.filename is not None:
argv = debug_info.argv
with DebugAdapter.start_embedded(
addr,
debug_info.filename,
argv=argv,
env=env,
cwd=cwd) as adapter:
adapter = DebugAdapter.start_embedded(
addr,
debug_info.filename,
argv=argv,
env=env,
cwd=cwd,
)
with adapter:
with DebugClient() as editor:
time.sleep(DELAY_WAITING_FOR_SOCKETS)
session = editor.attach_socket(addr, adapter)
@ -307,13 +339,15 @@ Original Error:
name = debug_info.modulename
kind = 'module'
argv = debug_info.argv
with DebugAdapter.start_for_attach(
addr,
name=name,
extra=argv,
kind=kind,
env=env,
cwd=cwd) as adapter:
adapter = DebugAdapter.start_for_attach(
addr,
name=name,
extra=argv,
kind=kind,
env=env,
cwd=cwd,
)
with adapter:
with DebugClient() as editor:
time.sleep(DELAY_WAITING_FOR_SOCKETS)
session = editor.attach_socket(addr, adapter)

View file

@ -0,0 +1,148 @@
import unittest
from ptvsd import attach_server
from ptvsd.socket import Address
from tests import PROJECT_ROOT
from tests.helpers.debugadapter import DebugAdapter
from tests.helpers.debugclient import EasyDebugClient as DebugClient
from tests.helpers.lock import LockTimeoutError
from tests.helpers.script import set_lock, set_release, find_line
from . import LifecycleTestsBase, PORT, lifecycle_handshake
class EnableAttachTests(LifecycleTestsBase, unittest.TestCase):
def setUp(self):
super(EnableAttachTests, self).setUp()
self._orig_wait_timeout = attach_server.WAIT_TIMEOUT
attach_server.WAIT_TIMEOUT = 1.0
def tearDown(self):
super(EnableAttachTests, self).tearDown()
attach_server.WAIT_TIMEOUT = self._orig_wait_timeout
def test_does_not_block(self):
addr = Address('localhost', PORT)
filename = self.write_script('spam.py', """
import sys
sys.path.insert(0, {!r})
import ptvsd
ptvsd.enable_attach({}, redirect_output=False)
# <ready>
""".format(PROJECT_ROOT, tuple(addr)),
)
lockfile = self.workspace.lockfile()
_, wait = set_release(filename, lockfile, 'ready')
#DebugAdapter.VERBOSE = True
adapter = DebugAdapter.start_embedded(addr, filename)
with adapter:
wait(timeout=3)
adapter.wait()
@unittest.skip('fails due to "stopped" event never happening')
def test_never_call_wait_for_attach(self):
addr = Address('localhost', PORT)
filename = self.write_script('spam.py', """
import sys
import threading
import time
sys.path.insert(0, {!r})
import ptvsd
ptvsd.enable_attach({}, redirect_output=False)
# <ready>
print('== ready ==')
# Allow tracing to be triggered.
def wait():
# <wait>
pass
t = threading.Thread(target=wait)
t.start()
for _ in range(100): # 10 seconds
print('-----')
t.join(0.1)
if not t.is_alive():
break
t.join()
print('== starting ==')
# <bp>
print('== done ==')
""".format(PROJECT_ROOT, tuple(addr)),
)
lockfile1 = self.workspace.lockfile('ready.lock')
_, wait = set_release(filename, lockfile1, 'ready')
lockfile2 = self.workspace.lockfile('wait.log')
done, script = set_lock(filename, lockfile2, 'wait')
bp = find_line(script, 'bp')
breakpoints = [{
'source': {'path': filename},
'breakpoints': [
{'line': bp},
],
}]
#DebugAdapter.VERBOSE = True
#DebugClient.SESSION.VERBOSE = True
adapter = DebugAdapter.start_embedded(
addr,
filename,
srvtimeout=None,
)
with adapter:
# Wait longer that WAIT_TIMEOUT, so that debugging isn't
# immediately enabled in the script's thread.
wait(timeout=3.0)
with DebugClient() as editor:
session = editor.attach_socket(addr, adapter, timeout=1)
with session.wait_for_event('thread') as result:
lifecycle_handshake(session, 'attach',
breakpoints=breakpoints,
threads=True)
event = result['msg']
tid = event.body['threadId']
with session.wait_for_event('stopped'):
done()
session.send_request('continue', threadId=tid)
adapter.wait()
out = str(adapter.output)
self.assertIn('== ready ==', out)
self.assertIn('== starting ==', out)
def test_wait_for_attach(self):
addr = Address('localhost', PORT)
filename = self.write_script('spam.py', """
import sys
sys.path.insert(0, {!r})
import ptvsd
ptvsd.enable_attach({}, redirect_output=False)
ptvsd.wait_for_attach()
# <ready>
# <wait>
""".format(PROJECT_ROOT, tuple(addr)),
)
lockfile1 = self.workspace.lockfile()
_, wait = set_release(filename, lockfile1, 'ready')
lockfile2 = self.workspace.lockfile()
done, _ = set_lock(filename, lockfile2, 'wait')
adapter = DebugAdapter.start_embedded(addr, filename)
with adapter:
with DebugClient() as editor:
session = editor.attach_socket(addr, adapter, timeout=1)
# Ensure that it really does wait.
with self.assertRaises(LockTimeoutError):
wait(timeout=0.5)
lifecycle_handshake(session, 'attach')
wait(timeout=1)
done()
adapter.wait()

View file

@ -15,7 +15,9 @@ from tests.helpers.debugsession import Awaitable
from . import (
_strip_newline_output_events, lifecycle_handshake, TestsBase,
LifecycleTestsBase, _strip_output_event, _strip_exit, _find_events,
PORT)
PORT, react_to_stopped,
)
ROOT = os.path.dirname(os.path.dirname(ptvsd.__file__))
@ -223,6 +225,7 @@ class LifecycleTests(LifecycleTestsBase):
sys.path.insert(0, {!r})
import ptvsd
ptvsd.enable_attach({}, redirect_output={})
ptvsd.wait_for_attach()
print('success!', end='')
@ -343,6 +346,7 @@ class LifecycleTests(LifecycleTestsBase):
addr = {}
ptvsd.enable_attach(addr)
ptvsd.wait_for_attach()
# <before>
print('==before==')
@ -573,13 +577,13 @@ class LifecycleTests(LifecycleTestsBase):
addr = {}
ptvsd.enable_attach(addr)
print('waiting for attach')
print('== waiting for attach ==')
# <waiting>
ptvsd.wait_for_attach()
# <attached>
print('attached!')
print('== attached! ==')
# <bp 2>
print('done waiting')
print('== done waiting ==')
""".format(ROOT, tuple(addr)))
lockfile1 = self.workspace.lockfile()
done1, _ = set_lock(filename, lockfile1, 'waiting')
@ -618,35 +622,35 @@ class LifecycleTests(LifecycleTestsBase):
with DebugClient() as editor:
session = editor.attach_socket(addr, adapter, timeout=1)
# TODO: There appears to be a small race that may
# cause the test to fail here.
with session.wait_for_event('stopped'):
with session.wait_for_event('thread') as result:
with session.wait_for_event('process'):
(req_init, req_attach, req_config,
reqs_bps, _, req_threads1,
) = lifecycle_handshake(session, 'attach',
breakpoints=breakpoints,
options=options,
threads=True)
# Grab the initial output.
out1 = next(adapter.output) # 'waiting for attach'
line = adapter.output.readline()
while line:
out1 += line
line = adapter.output.readline()
done1()
with session.wait_for_event('thread') as result:
with session.wait_for_event('process'):
(req_init, req_attach, req_config,
reqs_bps, _, req_threads1,
) = lifecycle_handshake(session, 'attach',
breakpoints=breakpoints,
options=options,
threads=True)
req_bps, = reqs_bps # There should only be one.
event = result['msg']
tid = event.body['threadId']
req_threads2 = session.send_request('threads')
req_stacktrace1 = session.send_request(
'stackTrace',
threadId=tid,
)
out2 = str(adapter.output)
event = result['msg']
tid = event.body['threadId']
# Grab the initial output.
out1 = next(adapter.output) # "waiting for attach"
line = adapter.output.readline()
while line:
out1 += line
line = adapter.output.readline()
with session.wait_for_event('stopped'):
# Tell the script to proceed (at "# <waiting>").
# This leads to the first breakpoint.
done1()
req_threads2, req_stacktrace1 = react_to_stopped(session, tid)
out2 = str(adapter.output) # ""
# Tell the script to proceed (at "# <bp 2>"). This
# leads to the second breakpoint. At this point
# execution is still stopped at the first breakpoint.
done2()
with session.wait_for_event('stopped'):
with session.wait_for_event('continued'):
@ -654,27 +658,25 @@ class LifecycleTests(LifecycleTestsBase):
'continue',
threadId=tid,
)
req_threads3 = session.send_request('threads')
req_stacktrace2 = session.send_request(
'stackTrace',
threadId=tid,
)
out3 = str(adapter.output)
req_continue1.wait()
req_threads3, req_stacktrace2 = react_to_stopped(session, tid)
out3 = str(adapter.output) # "attached!"
with session.wait_for_event('continued'):
req_continue2 = session.send_request(
'continue',
threadId=tid,
)
req_continue2.wait()
adapter.wait()
out4 = str(adapter.output)
out4 = str(adapter.output) # "done waiting"
# Output between enable_attach() and wait_for_attach() may
# be sent at a relatively arbitrary time (or not at all).
# So we ignore it by removing it from the message list.
received = list(_strip_output_event(session.received,
u'waiting for attach'))
u'== waiting for attach =='))
received = list(_strip_newline_output_events(received))
# There's an ordering race with continue/continued that pops
# up occasionally. We work around that by manually fixing the
@ -762,7 +764,7 @@ class LifecycleTests(LifecycleTestsBase):
self.new_event(
'output',
category='stdout',
output='attached!',
output='== attached! ==',
),
self.new_event(
'stopped',
@ -795,7 +797,7 @@ class LifecycleTests(LifecycleTestsBase):
self.new_event(
'output',
category='stdout',
output='done waiting',
output='== done waiting ==',
),
#self.new_event(
# 'thread',

View file

@ -2,10 +2,9 @@ import os
import os.path
import signal
import sys
import time
from tests.helpers.debugsession import Awaitable
from tests.helpers.resource import TestResources
from tests.helpers.script import find_line
from tests.helpers.socket import resolve_hostname
from . import (
_strip_newline_output_events, lifecycle_handshake,
@ -53,37 +52,41 @@ class RemoteTests(LifecycleTestsBase):
'pathMappings': path_mappings
}
with open(debug_info.filename) as scriptfile:
script = scriptfile.read()
bp = find_line(script, 'bp')
with self.start_debugging(debug_info) as dbg:
(_, req_attach, _, _, _, req_threads) = lifecycle_handshake(
dbg.session,
debug_info.starttype,
options=options,
threads=True)
lifecycle_handshake(dbg.session, debug_info.starttype,
options=options,
threads=True)
# wait till we enter the for loop.
time.sleep(1)
Awaitable.wait_all(req_attach, req_threads)
with dbg.session.wait_for_event('stopped') as result:
arguments = {
'source': {
'name': os.path.basename(debug_info.filename),
'path': debug_info.filename
},
'lines': [9],
'breakpoints': [{'line': 9}]
'lines': [bp],
'breakpoints': [{'line': bp}]
}
dbg.session.send_request('setBreakpoints', **arguments)
tid = result['msg'].body['threadId']
stacktrace = dbg.session.send_request('stackTrace', threadId=tid)
stacktrace.wait()
dbg.session.send_request('continue', threadId=tid).wait()
event = result['msg']
tid = event.body['threadId']
req_stacktrace = dbg.session.send_request(
'stackTrace',
threadId=tid,
)
req_stacktrace.wait()
stacktrace = req_stacktrace.resp.body
req_continue = dbg.session.send_request('continue', threadId=tid)
req_continue.wait()
# Kill remove program.
os.kill(dbg.adapter.pid, signal.SIGTERM)
self._assert_stacktrace_is_subset(stacktrace.resp.body,
expected_stacktrace)
self._assert_stacktrace_is_subset(stacktrace, expected_stacktrace)
class AttachFileTests(RemoteTests):
@ -143,7 +146,9 @@ class AttachFileTests(RemoteTests):
host=ip,
cwd=cwd,
starttype='attach',
argv=argv))
argv=argv,
),
)
def test_source_references_should_be_returned_without_path_mappings(self):
filename = WITH_TEST_FORVER.resolve('attach_forever.py')
@ -153,7 +158,7 @@ class AttachFileTests(RemoteTests):
'stackFrames': [{
'source': {
'path': filename,
'sourceReference': 1
'sourceReference': 1,
}
}],
}
@ -163,7 +168,10 @@ class AttachFileTests(RemoteTests):
attachtype='import',
cwd=cwd,
starttype='attach',
argv=argv), expected_stacktrace)
argv=argv,
),
expected_stacktrace,
)
def test_source_references_should_not_be_returned_with_path_mappings(self):
filename = WITH_TEST_FORVER.resolve('attach_forever.py')
@ -177,7 +185,7 @@ class AttachFileTests(RemoteTests):
'stackFrames': [{
'source': {
'path': filename,
'sourceReference': 0
'sourceReference': 0,
}
}],
}
@ -187,7 +195,12 @@ class AttachFileTests(RemoteTests):
attachtype='import',
cwd=cwd,
starttype='attach',
argv=argv), expected_stacktrace, path_mappings)
argv=argv,
#verbose=True,
),
expected_stacktrace,
path_mappings,
)
def test_source_references_should_be_returned_with_invalid_path_mappings(
self):
@ -202,7 +215,7 @@ class AttachFileTests(RemoteTests):
'stackFrames': [{
'source': {
'path': filename,
'sourceReference': 1
'sourceReference': 1,
}
}],
}
@ -212,7 +225,11 @@ class AttachFileTests(RemoteTests):
attachtype='import',
cwd=cwd,
starttype='attach',
argv=argv), expected_stacktrace, path_mappings)
argv=argv,
),
expected_stacktrace,
path_mappings,
)
def test_source_references_should_be_returned_with_win_client(self):
filename = WITH_TEST_FORVER.resolve('attach_forever.py')
@ -227,7 +244,7 @@ class AttachFileTests(RemoteTests):
'stackFrames': [{
'source': {
'path': client_dir + '\\' + os.path.basename(filename),
'sourceReference': 0
'sourceReference': 0,
}
}],
}
@ -237,10 +254,12 @@ class AttachFileTests(RemoteTests):
attachtype='import',
cwd=cwd,
starttype='attach',
argv=argv),
argv=argv,
),
expected_stacktrace,
path_mappings=path_mappings,
debug_options=['WindowsClient'])
debug_options=['WindowsClient'],
)
def test_source_references_should_be_returned_with_unix_client(self):
filename = WITH_TEST_FORVER.resolve('attach_forever.py')
@ -255,7 +274,7 @@ class AttachFileTests(RemoteTests):
'stackFrames': [{
'source': {
'path': client_dir + '/' + os.path.basename(filename),
'sourceReference': 0
'sourceReference': 0,
}
}],
}
@ -265,7 +284,9 @@ class AttachFileTests(RemoteTests):
attachtype='import',
cwd=cwd,
starttype='attach',
argv=argv),
argv=argv,
),
expected_stacktrace,
path_mappings=path_mappings,
debug_options=['UnixClient'])
debug_options=['UnixClient'],
)