mirror of
https://github.com/microsoft/debugpy.git
synced 2025-12-23 08:48:12 +00:00
* Provide a single notification when a breakpoint is hit. #805 (#922) * Provide a single notification when a breakpoint is hit. #805 ptvsd requires all threads to be stopped or all threads to be running (this is a limitation for vsts), so, we generate a single notification when a breakpoint is hit and have CMD_GET_THREAD_STACK wait to get the actual stack (or if the thread is not paused in a timely manner, provide the stack as it is, but in this case it may not be possible to get the locals or interact with the frame -- issued commands will timeout). * Rename CMD_SUSPEND_ON_BREAKPOINT_EXCEPTION to CMD_PYDEVD_JSON_CONFIG. * Fixes to ptvsd tests related to differences of thread events after changes to PyDBCommandThread. * Make ptvsd use CMD_THREAD_SUSPEND_SINGLE_NOTIFICATION and CMD_THREAD_RESUME_SINGLE_NOTIFICATION. * Fixing tests. * Sleep on wait_for_attach() (should be removed later) -- it seems there's still a racing condition as wait_for_attach() seems to proceed before CMD_PYDEVD_JSON_CONFIG is passed on to pydevd. * Test changes needed to integrate 805 (#969) * Integrate 805 initial * Fix send suspend event to use single suspend event command * Fix thread run event tests. * Fix event ordering * fix reattach tests * Increase timeouts for some tests. * Fix more tests * fix typo * Skip flaky/redundent re-attach tests * more cleanup * Replace completions tests with pytests * Ensure continued is sent when the thread runs. * Dont wait for continued in completions tests. * Revert "Ensure continued is sent when the thread runs." This reverts commit caef558fcf4d890d01bf3e5694b3dbc42795aaaf. * Skip broken tests. * Parametrize completion tests
149 lines
5 KiB
Python
149 lines
5 KiB
Python
import unittest
|
|
|
|
from ptvsd import attach_server
|
|
from ptvsd.socket import Address
|
|
from tests import PROJECT_ROOT
|
|
from tests.helpers.debugadapter import DebugAdapter
|
|
from tests.helpers.debugclient import EasyDebugClient as DebugClient
|
|
from tests.helpers.lock import LockTimeoutError
|
|
from tests.helpers.script import set_lock, set_release, find_line
|
|
from . import LifecycleTestsBase, PORT, lifecycle_handshake
|
|
|
|
|
|
class EnableAttachTests(LifecycleTestsBase, unittest.TestCase):
    """System tests for scripts that call ptvsd.enable_attach().

    Each test writes a small script that enables attach on
    ('localhost', PORT), runs it through DebugAdapter.start_embedded(), and
    synchronizes with it through lock files tied to ``# <name>`` markers in
    the script text.
    """

    def setUp(self):
        super(EnableAttachTests, self).setUp()
        # Shorten the module-level timeout for the duration of each test;
        # the original value is put back in tearDown().
        self._orig_wait_timeout = attach_server.WAIT_TIMEOUT
        attach_server.WAIT_TIMEOUT = 1.0

    def tearDown(self):
        try:
            super(EnableAttachTests, self).tearDown()
        finally:
            # Restore the global even if the base-class teardown raises, so
            # the patched timeout cannot leak into tests that run later.
            attach_server.WAIT_TIMEOUT = self._orig_wait_timeout

    # The script only calls enable_attach(); the test expects it to reach
    # the <ready> marker within 3 seconds and then run to completion.
    @unittest.skip('Fix test #721')
    def test_does_not_block(self):
        addr = Address('localhost', PORT)
        filename = self.write_script('spam.py', """
            import sys
            sys.path.insert(0, {!r})
            import ptvsd
            ptvsd.enable_attach({}, redirect_output=False)
            # <ready>
            """.format(PROJECT_ROOT, tuple(addr)),
        )
        lockfile = self.workspace.lockfile()
        # NOTE(review): set_release() presumably rewrites the script so that
        # it releases the lock at the <ready> marker -- confirm in
        # tests.helpers.script.
        _, wait = set_release(filename, lockfile, 'ready')

        #DebugAdapter.VERBOSE = True
        adapter = DebugAdapter.start_embedded(addr, filename)
        with adapter:
            wait(timeout=3)
            adapter.wait()

    # The script never calls wait_for_attach().  The editor attaches only
    # after the script has passed <ready>, sets a breakpoint on the <bp>
    # line, and expects a 'thread' event followed by a 'stopped' event.
    def test_never_call_wait_for_attach(self):
        addr = Address('localhost', PORT)
        filename = self.write_script('spam.py', """
            import sys
            import threading
            import time

            sys.path.insert(0, {!r})
            import ptvsd
            ptvsd.enable_attach({}, redirect_output=False)
            # <ready>
            print('== ready ==')

            # Allow tracing to be triggered.
            def wait():
                # <wait>
                pass
            t = threading.Thread(target=wait)
            t.start()
            for _ in range(100): # 10 seconds
                print('-----')
                t.join(0.1)
                if not t.is_alive():
                    break
            t.join()

            print('== starting ==')
            # <bp>
            print('== done ==')
            """.format(PROJECT_ROOT, tuple(addr)),
        )
        lockfile1 = self.workspace.lockfile('ready.lock')
        _, wait = set_release(filename, lockfile1, 'ready')
        lockfile2 = self.workspace.lockfile('wait.log')
        # set_lock() also returns the script text, which is used below to
        # locate the breakpoint line.
        done, script = set_lock(filename, lockfile2, 'wait')

        bp = find_line(script, 'bp')
        breakpoints = [{
            'source': {'path': filename},
            'breakpoints': [
                {'line': bp},
            ],
        }]

        #DebugAdapter.VERBOSE = True
        #DebugClient.SESSION.VERBOSE = True
        adapter = DebugAdapter.start_embedded(
            addr,
            filename,
            srvtimeout=None,
        )
        with adapter:
            # Wait longer than WAIT_TIMEOUT, so that debugging isn't
            # immediately enabled in the script's thread.
            wait(timeout=3.0)

            with DebugClient() as editor:
                session = editor.attach_socket(addr, adapter, timeout=1)
                # Register for 'stopped' before the handshake so the event
                # cannot be missed.
                stopped = session.get_awaiter_for_event('stopped')
                with session.wait_for_event('thread') as result:
                    lifecycle_handshake(session, 'attach',
                                        breakpoints=breakpoints,
                                        threads=True)
                event = result['msg']
                tid = event.body['threadId']

                stopped.wait(timeout=5.0)
                # NOTE(review): done() presumably lets the script's wait()
                # thread proceed past the <wait> marker -- confirm in
                # tests.helpers.script.
                done()
                session.send_request('continue', threadId=tid)

            adapter.wait()
        out = str(adapter.output)

        self.assertIn('== ready ==', out)
        self.assertIn('== starting ==', out)

    # The script calls wait_for_attach() right after enable_attach(); the
    # <ready> marker must not be reached until the attach handshake is done.
    def test_wait_for_attach(self):
        addr = Address('localhost', PORT)
        filename = self.write_script('spam.py', """
            import sys
            sys.path.insert(0, {!r})
            import ptvsd
            ptvsd.enable_attach({}, redirect_output=False)

            ptvsd.wait_for_attach()
            # <ready>
            # <wait>
            """.format(PROJECT_ROOT, tuple(addr)),
        )
        lockfile1 = self.workspace.lockfile()
        _, wait = set_release(filename, lockfile1, 'ready')
        lockfile2 = self.workspace.lockfile()
        done, _ = set_lock(filename, lockfile2, 'wait')

        adapter = DebugAdapter.start_embedded(addr, filename)
        with adapter:
            with DebugClient() as editor:
                session = editor.attach_socket(addr, adapter, timeout=1)
                # Ensure that it really does wait.
                with self.assertRaises(LockTimeoutError):
                    wait(timeout=0.5)

                # Once the handshake completes the script should reach
                # <ready> within a second.
                lifecycle_handshake(session, 'attach')
                wait(timeout=1)
                done()
                adapter.wait()
|