Provide disconnect/reconnect and a non-blocking way of setting up the debugger. Fixes #1382, Fixes #1248. (#1414)

Fabio Zadrozny 2019-05-10 14:17:12 -03:00 committed by GitHub
parent 2653656cef
commit f39747ceba
8 changed files with 457 additions and 76 deletions

View file

@@ -25,6 +25,12 @@ class PyDevdAPI(object):
def run(self, py_db):
py_db.ready_to_run = True
def notify_configuration_done(self, py_db):
py_db.on_configuration_done()
def notify_disconnect(self, py_db):
py_db.on_disconnect()
def set_protocol(self, py_db, seq, protocol):
set_protocol(protocol.strip())
if get_protocol() in (HTTP_JSON_PROTOCOL, JSON_PROTOCOL):
@@ -105,6 +111,14 @@ class PyDevdAPI(object):
'''
py_db.set_enable_thread_notifications(enable)
def request_disconnect(self, py_db, resume_threads):
self.set_enable_thread_notifications(py_db, False)
self.remove_all_breakpoints(py_db, filename='*')
self.remove_all_exception_breakpoints(py_db)
self.notify_disconnect(py_db)
if resume_threads:
self.request_resume_thread(thread_id='*')
def request_resume_thread(self, thread_id):
threads = []
if thread_id == '*':
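For reference, a minimal sketch (not part of the diff) of how a protocol handler is expected to drive the new disconnect API; py_db is assumed to be the active PyDB instance:

api = PyDevdAPI()
# Stops thread notifications, removes all breakpoints and exception breakpoints,
# notifies the PyDB instance of the disconnect and, optionally, resumes all threads.
api.request_disconnect(py_db, resume_threads=True)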

View file

@@ -187,10 +187,12 @@ def run_as_pydevd_daemon_thread(func, *args, **kwargs):
class ReaderThread(PyDBDaemonThread):
''' reader thread reads and dispatches commands in an infinite loop '''
def __init__(self, sock):
def __init__(self, sock, terminate_on_socket_close=True):
assert sock is not None
from _pydevd_bundle.pydevd_process_net_command_json import process_net_command_json
from _pydevd_bundle.pydevd_process_net_command import process_net_command
PyDBDaemonThread.__init__(self)
self._terminate_on_socket_close = terminate_on_socket_close
self.sock = sock
self._buffer = b''
@@ -199,13 +201,17 @@ class ReaderThread(PyDBDaemonThread):
self.process_net_command_json = process_net_command_json
self.global_debugger_holder = GlobalDebuggerHolder
@overrides(PyDBDaemonThread.do_kill_pydev_thread)
def do_kill_pydev_thread(self):
PyDBDaemonThread.do_kill_pydev_thread(self)
# We must close the socket so that it doesn't stay halted there.
self.killReceived = True
try:
self.sock.shutdown(SHUT_RD) # shutdown the socket for read
except:
# just ignore that
pass
try:
self.sock.close()
except:
pass
def _read(self, size):
@@ -313,7 +319,8 @@ class ReaderThread(PyDBDaemonThread):
self.handle_except()
def handle_except(self):
self.global_debugger_holder.global_dbg.finish_debugging_session()
if self._terminate_on_socket_close:
self.global_debugger_holder.global_dbg.finish_debugging_session()
def process_command(self, cmd_id, seq, text):
self.process_net_command(self.global_debugger_holder.global_dbg, cmd_id, seq, text)
@@ -322,9 +329,10 @@ class ReaderThread(PyDBDaemonThread):
class WriterThread(PyDBDaemonThread):
''' writer thread writes out the commands in an infinite loop '''
def __init__(self, sock):
def __init__(self, sock, terminate_on_socket_close=True):
PyDBDaemonThread.__init__(self)
self.sock = sock
self._terminate_on_socket_close = terminate_on_socket_close
self.setName("pydevd.Writer")
self.cmdQueue = _queue.Queue()
if pydevd_vm_type.get_vm_type() == 'python':
@@ -350,6 +358,9 @@ class WriterThread(PyDBDaemonThread):
if self.killReceived:
try:
self.sock.shutdown(SHUT_WR)
except:
pass
try:
self.sock.close()
except:
pass
@@ -370,16 +381,29 @@ class WriterThread(PyDBDaemonThread):
break # interpreter shutdown
time.sleep(self.timeout)
except Exception:
GlobalDebuggerHolder.global_dbg.finish_debugging_session()
if DebugInfoHolder.DEBUG_TRACE_LEVEL >= 0:
pydev_log_exception()
if self._terminate_on_socket_close:
GlobalDebuggerHolder.global_dbg.finish_debugging_session()
if DebugInfoHolder.DEBUG_TRACE_LEVEL > 0:
pydev_log_exception()
def empty(self):
return self.cmdQueue.empty()
@overrides(PyDBDaemonThread.do_kill_pydev_thread)
def do_kill_pydev_thread(self):
PyDBDaemonThread.do_kill_pydev_thread(self)
# We must close the socket so that it doesn't stay halted there.
try:
self.sock.shutdown(SHUT_WR) # shutdown the socket for write
except:
pass
try:
self.sock.close()
except:
pass
def start_server(port):
''' binds to a port, waits for the debugger to connect '''
def create_server_socket(host, port):
s = socket(AF_INET, SOCK_STREAM)
s.settimeout(None)
@@ -389,20 +413,26 @@ def start_server(port):
except ImportError:
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind(('', port))
s.bind((host, port))
pydev_log.info("Bound to port :%s", port)
return s
def start_server(port):
''' binds to a port, waits for the debugger to connect '''
s = create_server_socket(host='', port=port)
try:
s.listen(1)
newSock, _addr = s.accept()
new_socket, _addr = s.accept()
pydev_log.info("Connection accepted")
# closing the server socket is not strictly necessary, but we no longer need it
s.shutdown(SHUT_RDWR)
s.close()
return newSock
return new_socket
except:
pydev_log.exception("Could not bind to port: %s\n", port)
raise
def start_client(host, port):
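The new terminate_on_socket_close flag is what allows a client to come and go without killing the debugger. A rough sketch, assuming sock is an already-accepted client socket:

reader = ReaderThread(sock, terminate_on_socket_close=False)
writer = WriterThread(sock, terminate_on_socket_close=False)
reader.start()
writer.start()
# With terminate_on_socket_close=False, a closed or broken socket no longer triggers
# finish_debugging_session(), so the debugger keeps running and can accept a new client.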

View file

@@ -201,6 +201,8 @@ class _PyDevJsonCommandProcessor(object):
:param ConfigurationDoneRequest request:
'''
self.api.run(py_db)
self.api.notify_configuration_done(py_db)
configuration_done_response = pydevd_base_schema.build_response(request)
return NetCommand(CMD_RETURN, 0, configuration_done_response, is_json=True)
@@ -421,10 +423,7 @@ class _PyDevJsonCommandProcessor(object):
'''
:param DisconnectRequest request:
'''
self.api.set_enable_thread_notifications(py_db, False)
self.api.remove_all_breakpoints(py_db, filename='*')
self.api.remove_all_exception_breakpoints(py_db)
self.api.request_resume_thread(thread_id='*')
self.api.request_disconnect(py_db, resume_threads=True)
response = pydevd_base_schema.build_response(request)
return NetCommand(CMD_RETURN, 0, response, is_json=True)

View file

@@ -55,7 +55,8 @@ from pydevd_concurrency_analyser.pydevd_thread_wrappers import wrap_threads
from pydevd_file_utils import get_abs_path_real_path_and_base_from_frame, NORM_PATHS_AND_BASE_CONTAINER, get_abs_path_real_path_and_base_from_file
from pydevd_file_utils import get_fullname, rPath, get_package_dir
import pydevd_tracing
from _pydevd_bundle.pydevd_comm import InternalThreadCommand, InternalThreadCommandForAnyThread
from _pydevd_bundle.pydevd_comm import (InternalThreadCommand, InternalThreadCommandForAnyThread,
create_server_socket)
from _pydevd_bundle.pydevd_comm import(InternalConsoleExec,
PyDBDaemonThread, _queue, ReaderThread, GetGlobalDebugger, get_global_debugger,
set_global_debugger, WriterThread,
@@ -65,6 +66,8 @@ from _pydevd_bundle.pydevd_comm import(InternalConsoleExec,
from _pydevd_bundle.pydevd_breakpoints import stop_on_unhandled_exception
from _pydevd_bundle.pydevd_collect_try_except_info import collect_try_except_info
from _pydevd_bundle.pydevd_suspended_frames import SuspendedFramesManager
from socket import SHUT_RDWR
from _pydevd_bundle.pydevd_api import PyDevdAPI
__version_info__ = (1, 3, 3)
__version_info_str__ = []
@@ -129,11 +132,9 @@ try:
except:
pass
connected = False
_debugger_setup = False
bufferStdOutToServer = False
bufferStdErrToServer = False
remote = False
forked = False
file_system_encoding = getfilesystemencoding()
@@ -220,9 +221,6 @@ class CheckOutputThread(PyDBDaemonThread):
pydev_log.debug("The following pydb threads may not have finished correctly: %s",
', '.join([t.getName() for t in pydb_daemon_threads if t is not self]))
def do_kill_pydev_thread(self):
self.killReceived = True
class AbstractSingleNotificationBehavior(object):
'''
@@ -388,6 +386,8 @@ class PyDB(object):
self.reader = None
self.writer = None
self._waiting_for_connection_thread = None
self._on_configuration_done_event = threading.Event()
self.output_checker_thread = None
self.py_db_command_thread = None
self.quitting = None
@@ -517,6 +517,21 @@ class PyDB(object):
self._exclude_by_filter_cache = {}
self._apply_filter_cache = {}
def on_configuration_done(self):
'''
Note: only called when using the DAP (Debug Adapter Protocol).
'''
self._on_configuration_done_event.set()
def on_disconnect(self):
'''
Note: only called when using the DAP (Debug Adapter Protocol).
'''
self._on_configuration_done_event.clear()
def block_until_configuration_done(self):
self._on_configuration_done_event.wait()
def add_fake_frame(self, thread_id, frame_id, frame):
self.suspended_frames_manager.add_fake_frame(thread_id, frame_id, frame)
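The gating above is a plain threading.Event; a standalone sketch of the same pattern (illustrative only, not part of the diff):

import threading

configuration_done = threading.Event()
configuration_done.set()    # what on_configuration_done() does when configurationDone arrives
configuration_done.clear()  # what on_disconnect() does, so the next wait blocks again
configuration_done.wait()   # what block_until_configuration_done() does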
@@ -877,13 +892,20 @@ class PyDB(object):
def finish_debugging_session(self):
self._finish_debugging_session = True
def initialize_network(self, sock):
def initialize_network(self, sock, terminate_on_socket_close=True):
try:
sock.settimeout(None) # infinite, no timeouts from now on - jython does not have it
except:
pass
self.writer = WriterThread(sock)
self.reader = ReaderThread(sock)
curr_reader = getattr(self, 'reader', None)
curr_writer = getattr(self, 'writer', None)
if curr_reader:
curr_reader.do_kill_pydev_thread()
if curr_writer:
curr_writer.do_kill_pydev_thread()
self.writer = WriterThread(sock, terminate_on_socket_close=terminate_on_socket_close)
self.reader = ReaderThread(sock, terminate_on_socket_close=terminate_on_socket_close)
self.writer.start()
self.reader.start()
@@ -897,6 +919,66 @@ class PyDB(object):
self.initialize_network(s)
def create_wait_for_connection_thread(self):
if self._waiting_for_connection_thread is not None:
raise AssertionError('There is already another thread waiting for a connection.')
self._waiting_for_connection_thread = self._WaitForConnectionThread(self)
self._waiting_for_connection_thread.start()
class _WaitForConnectionThread(PyDBDaemonThread):
def __init__(self, py_db):
PyDBDaemonThread.__init__(self)
self.py_db = py_db
self._server_socket = None
def run(self):
host = SetupHolder.setup['client']
port = SetupHolder.setup['port']
self._server_socket = create_server_socket(host=host, port=port)
while not self.killReceived:
try:
s = self._server_socket
if s is None:
return
s.listen(1)
new_socket, _addr = s.accept()
if self.killReceived:
pydev_log.info("Connection (from wait_for_attach) accepted but ignored as kill was already received.")
return
pydev_log.info("Connection (from wait_for_attach) accepted.")
reader = getattr(self.py_db, 'reader', None)
if reader is not None:
# This is needed if a new connection is done without the client properly
# sending a disconnect for the previous connection.
api = PyDevdAPI()
api.request_disconnect(self.py_db, resume_threads=False)
self.py_db.initialize_network(new_socket, terminate_on_socket_close=False)
except:
if DebugInfoHolder.DEBUG_TRACE_LEVEL > 0:
pydev_log.exception()
pydev_log.debug("Exiting _WaitForConnectionThread: %s\n", port)
def do_kill_pydev_thread(self):
PyDBDaemonThread.do_kill_pydev_thread(self)
s = self._server_socket
try:
s.shutdown(SHUT_RDWR)
except:
pass
try:
s.close()
except:
pass
self._server_socket = None
def get_internal_queue(self, thread_id):
""" returns internal command queue for a given thread.
if new queue is created, notify the RDB about it """
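Putting the pieces together, the non-blocking setup path in _locked_settrace (further down in this diff) roughly does the following. A sketch using the names from the diff, with SetupHolder.setup assumed to already hold 'client' and 'port':

debugger = PyDB()
# Dummy writer (NULL is pydevd's null-object) so commands can be queued before a client attaches.
debugger.writer = WriterThread(NULL, terminate_on_socket_close=False)
# Starts _WaitForConnectionThread, which listens on the configured host/port and, on each
# accept(), disconnects any previous client and calls initialize_network(new_socket, ...).
debugger.create_wait_for_connection_thread()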
@@ -1845,6 +1927,39 @@ def init_stderr_redirect(on_write=None):
sys.stderr = pydevd_io.IORedirector(original, sys._pydevd_err_buffer_, wrap_buffer) # @UndefinedVariable
def _enable_attach(address):
'''
Starts accepting connections at the given host/port. The debugger will not be initialized nor
configured; it'll only start accepting connections (and the tracing will be set up in this
thread).
Meant to be used with the DAP (Debug Adapter Protocol) with _wait_for_attach().
:param address: (host, port)
:type address: tuple(str, int)
'''
host = address[0]
port = int(address[1])
if _debugger_setup:
if port != SetupHolder.setup['port']:
raise AssertionError('Unable to listen on port: %s (already listening on port: %s)' % (port, SetupHolder.setup['port']))
settrace(host=host, port=port, suspend=False, wait_for_ready_to_run=False, block_until_connected=False)
def _wait_for_attach():
'''
Meant to be called after _enable_attach() -- the current thread will only unblock after a
connection is in place and the DAP (Debug Adapter Protocol) sends the ConfigurationDone
request.
'''
py_db = get_global_debugger()
if py_db is None:
raise AssertionError('Debugger still not created. Please use _enable_attach() before using _wait_for_attach().')
py_db.block_until_configuration_done()
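Taken together, the intended usage from user code looks roughly like this (a sketch; host/port values are examples, see _debugger_case_wait_for_attach.py further down):

import pydevd

pydevd._enable_attach(('127.0.0.1', 5678))  # start listening; does not block
# ... the program keeps running, with tracing already set up in this thread ...
pydevd._wait_for_attach()  # block until a client connects and sends configurationDone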
#=======================================================================================================================
# settrace
#=======================================================================================================================
@@ -1858,6 +1973,8 @@ def settrace(
overwrite_prev_trace=False,
patch_multiprocessing=False,
stop_at_frame=None,
block_until_connected=True,
wait_for_ready_to_run=True,
):
'''Sets the tracing function with the pydev debug function and initializes needed facilities.
@@ -1884,9 +2001,15 @@ def settrace(
@param stop_at_frame: if passed it'll stop at the given frame, otherwise it'll stop in the function which
called this method.
@param wait_for_ready_to_run: if True settrace will block until the ready_to_run flag is set to True,
otherwise, it'll set ready_to_run to True and this function won't block.
Note that if wait_for_ready_to_run == False, there are no guarantees that the debugger is synchronized
with what's configured in the client (IDE); the only guarantee is that the debugger will already be
connected when this function returns.
'''
_set_trace_lock.acquire()
try:
with _set_trace_lock:
_locked_settrace(
host,
stdoutToServer,
@@ -1896,9 +2019,9 @@ def settrace(
trace_only_current_thread,
patch_multiprocessing,
stop_at_frame,
block_until_connected,
wait_for_ready_to_run,
)
finally:
_set_trace_lock.release()
_set_trace_lock = thread.allocate_lock()
@@ -1913,6 +2036,8 @@ def _locked_settrace(
trace_only_current_thread,
patch_multiprocessing,
stop_at_frame,
block_until_connected,
wait_for_ready_to_run,
):
if patch_multiprocessing:
try:
@@ -1926,11 +2051,11 @@ def _locked_settrace(
from _pydev_bundle import pydev_localhost
host = pydev_localhost.get_localhost()
global connected
global _debugger_setup
global bufferStdOutToServer
global bufferStdErrToServer
if not connected:
if not _debugger_setup:
pydevd_vm_type.setup_type()
if SetupHolder.setup is None:
@@ -1945,10 +2070,15 @@ def _locked_settrace(
debugger = get_global_debugger()
if debugger is None:
debugger = PyDB()
debugger.connect(host, port) # Note: connect can raise error.
if block_until_connected:
debugger.connect(host, port) # Note: connect can raise error.
else:
# Create a dummy writer and wait for the real connection.
debugger.writer = WriterThread(NULL, terminate_on_socket_close=False)
debugger.create_wait_for_connection_thread()
# Mark connected only if it actually succeeded.
connected = True
_debugger_setup = True
bufferStdOutToServer = stdoutToServer
bufferStdErrToServer = stderrToServer
@@ -1963,18 +2093,18 @@ def _locked_settrace(
t = threadingCurrentThread()
additional_info = set_additional_thread_info(t)
if not wait_for_ready_to_run:
debugger.ready_to_run = True
while not debugger.ready_to_run:
time.sleep(0.1) # busy wait until we receive run command
# Set the tracing only
debugger.set_trace_for_frame_and_parents(get_frame().f_back)
CustomFramesContainer.custom_frames_lock.acquire() # @UndefinedVariable
try:
with CustomFramesContainer.custom_frames_lock: # @UndefinedVariable
for _frameId, custom_frame in dict_iter_items(CustomFramesContainer.custom_frames):
debugger.set_trace_for_frame_and_parents(custom_frame.frame)
finally:
CustomFramesContainer.custom_frames_lock.release() # @UndefinedVariable
debugger.start_auxiliary_daemon_threads()
@@ -2020,8 +2150,8 @@ def _locked_settrace(
def stoptrace():
global connected
if connected:
global _debugger_setup
if _debugger_setup:
pydevd_tracing.restore_sys_set_trace_func()
sys.settrace(None)
try:
@@ -2042,7 +2172,7 @@ def stoptrace():
kill_all_pydev_threads()
connected = False
_debugger_setup = False
class Dispatcher(object):
@@ -2121,10 +2251,8 @@ def settrace_forked():
pydevd_tracing.restore_sys_set_trace_func()
if port is not None:
global connected
connected = False
global forked
forked = True
global _debugger_setup
_debugger_setup = False
custom_frames_container_init()
@@ -2347,8 +2475,8 @@ def main():
pydev_log.exception()
sys.exit(1)
global connected
connected = True # Mark that we're connected when started from inside ide.
global _debugger_setup
_debugger_setup = True # Mark that the debugger is setup when started from the ide.
globals = debugger.run(setup['file'], None, None, is_module)

View file

@@ -369,6 +369,51 @@ def case_setup_remote():
return CaseSetup()
@pytest.fixture
def case_setup_remote_attach_to():
'''
The difference between this and case_setup_remote is that this one connects to a server
socket started by the debugger, whereas case_setup_remote creates the server socket and waits
for a connection from the debugger.
'''
runner = DebuggerRunnerRemote()
class WriterThread(debugger_unittest.AbstractWriterThread):
@overrides(debugger_unittest.AbstractWriterThread.run)
def run(self):
# I.e.: don't start socket on start(), rather, the test should call
# start_socket_client() when needed.
pass
class CaseSetup(object):
@contextmanager
def test_file(
self,
filename,
port,
**kwargs
):
def update_command_line_args(writer, args):
ret = debugger_unittest.AbstractWriterThread.update_command_line_args(writer, args)
ret.append(str(port))
return ret
WriterThread.TEST_FILE = debugger_unittest._get_debugger_test_file(filename)
WriterThread.update_command_line_args = update_command_line_args
for key, value in kwargs.items():
assert hasattr(WriterThread, key)
setattr(WriterThread, key, value)
with runner.check_case(WriterThread, wait_for_port=False) as writer:
yield writer
return CaseSetup()
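A condensed sketch of how a test consumes this fixture (mirroring test_wait_for_attach further down; port is assumed to be a free port obtained by the test):

with case_setup_remote_attach_to.test_file('_debugger_case_wait_for_attach.py', port) as writer:
    writer.start_socket_client('127.0.0.1', port)  # connect to the debugger's server socket
    json_facade = JsonFacade(writer)
    # ... set breakpoints, run, then disconnect and reconnect via start_socket_client() again ...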
@pytest.fixture
def case_setup_multiprocessing():

View file

@@ -381,14 +381,15 @@ class DebuggerRunner(object):
return args + ret
@contextmanager
def check_case(self, writer_class):
def check_case(self, writer_class, wait_for_port=True):
if callable(writer_class):
writer = writer_class()
else:
writer = writer_class
try:
writer.start()
wait_for_condition(lambda: hasattr(writer, 'port'))
if wait_for_port:
wait_for_condition(lambda: hasattr(writer, 'port'))
self.writer = writer
args = self.get_command_line()
@@ -400,7 +401,8 @@ class DebuggerRunner(object):
with self.run_process(args, writer) as dct_with_stdout_stder:
try:
wait_for_condition(lambda: writer.finished_initialization)
if wait_for_port:
wait_for_condition(lambda: writer.finished_initialization)
except TimeoutError:
sys.stderr.write('Timed out waiting for initialization\n')
sys.stderr.write('stdout:\n%s\n\nstderr:\n%s\n' % (
@@ -717,19 +719,63 @@ class AbstractWriterThread(threading.Thread):
if SHOW_WRITES_AND_READS:
print('Waiting in socket.accept()')
self.server_socket = server_socket
new_sock, addr = server_socket.accept()
new_socket, addr = server_socket.accept()
if SHOW_WRITES_AND_READS:
print('Test Writer Thread Socket:', new_sock, addr)
print('Test Writer Thread Socket:', new_socket, addr)
reader_thread = self.reader_thread = ReaderThread(new_sock)
self._set_socket(new_socket)
def _set_socket(self, new_socket):
curr_socket = getattr(self, 'sock', None)
if curr_socket:
try:
curr_socket.shutdown(socket.SHUT_WR)
except:
pass
try:
curr_socket.close()
except:
pass
reader_thread = self.reader_thread = ReaderThread(new_socket)
self.sock = new_socket
reader_thread.start()
self.sock = new_sock
# initial command is always the version
self.write_version()
self.log.append('start_socket')
self.finished_initialization = True
def start_socket_client(self, host, port):
self._sequence = -1
if SHOW_WRITES_AND_READS:
print("Connecting to %s:%s" % (host, port))
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Set TCP keepalive on an open socket.
# It activates after 1 second (TCP_KEEPIDLE) of idleness,
# then sends a keepalive ping once every 3 seconds (TCP_KEEPINTVL),
# and closes the connection after 5 failed pings (TCP_KEEPCNT), i.e., roughly 15 seconds.
try:
from socket import IPPROTO_TCP, SO_KEEPALIVE, TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT
s.setsockopt(socket.SOL_SOCKET, SO_KEEPALIVE, 1)
s.setsockopt(IPPROTO_TCP, TCP_KEEPIDLE, 1)
s.setsockopt(IPPROTO_TCP, TCP_KEEPINTVL, 3)
s.setsockopt(IPPROTO_TCP, TCP_KEEPCNT, 5)
except ImportError:
pass # May not be available everywhere.
# 10 seconds default timeout
timeout = int(os.environ.get('PYDEVD_CONNECT_TIMEOUT', 10))
s.settimeout(timeout)
s.connect((host, port))
s.settimeout(None) # no timeout after connected
if SHOW_WRITES_AND_READS:
print("Connected.")
self._set_socket(s)
return s
def next_breakpoint_id(self):
self._next_breakpoint_id += 1
return self._next_breakpoint_id

View file

@@ -0,0 +1,50 @@
if __name__ == '__main__':
import os
import sys
import time
port = int(sys.argv[1])
root_dirname = os.path.dirname(os.path.dirname(__file__))
if root_dirname not in sys.path:
sys.path.append(root_dirname)
import pydevd
try:
pydevd._wait_for_attach() # Cannot be called before _enable_attach.
except AssertionError:
pass
else:
raise AssertionError('Expected _wait_for_attach to raise exception.')
assert sys.gettrace() is None
print('enable attach to port: %s' % (port,))
pydevd._enable_attach(('127.0.0.1', port))
pydevd._enable_attach(('127.0.0.1', port)) # no-op in practice
try:
pydevd._enable_attach(('127.0.0.1', port + 15)) # different port: raise error.
except AssertionError:
pass
else:
raise AssertionError('Expected _enable_attach to raise exception (because it is already listening on another port).')
assert pydevd.get_global_debugger() is not None
assert sys.gettrace() is not None
a = 10 # Break 1
print('wait for attach')
pydevd._wait_for_attach()
print('finished wait for attach')
pydevd._wait_for_attach() # Should promptly return (already connected).
a = 20 # Break 2
pydevd._wait_for_attach() # As we disconnected on the 2nd break, this one should wait until a new configurationDone.
a = 20 # Break 3
while a == 20: # Pause 1
# The debugger should disconnect/reconnect, pause and then change 'a' to another value.
time.sleep(1 / 20.) # Pause 2
print('TEST SUCEEDED!')

View file

@@ -13,6 +13,7 @@ from _pydevd_bundle.pydevd_constants import int_types
from tests_python.debug_constants import * # noqa
import time
from os.path import normcase
from _pydev_bundle.pydev_localhost import get_socket_name
pytest_plugins = [
str('tests_python.debugger_fixtures'),
@@ -27,6 +28,13 @@ pytestmark = pytest.mark.skipif(IS_JYTHON, reason='Single notification is not OK
MAX_EXPECTED_ID = 10000
class _MessageWithMark(object):
def __init__(self, msg):
self.msg = msg
self.marked = False
class JsonFacade(object):
def __init__(self, writer):
@@ -34,29 +42,26 @@ class JsonFacade(object):
writer.reader_thread.accept_xml_messages = False
writer.write_set_protocol('http_json')
writer.write_multi_threads_single_notification(True)
self._all_json_messages_found = []
def collect_messages_until_accepted(self, expected_class, accept_message=lambda obj:True):
collected_messages = []
def accept_json_message(msg):
if msg.startswith('{'):
decoded_msg = from_json(msg)
if isinstance(decoded_msg, expected_class):
if accept_message(decoded_msg):
return True
collected_messages.append(decoded_msg)
return False
msg = self.writer.wait_for_message(accept_json_message, unquote_msg=False, expect_xml=False)
return collected_messages, from_json(msg)
def mark_messages(self, expected_class, accept_message):
ret = []
for message_with_mark in self._all_json_messages_found:
if not message_with_mark.marked:
if isinstance(message_with_mark.msg, expected_class):
if accept_message(message_with_mark.msg):
message_with_mark.marked = True
ret.append(message_with_mark.msg)
return ret
def wait_for_json_message(self, expected_class, accept_message=lambda obj:True):
def accept_json_message(msg):
if msg.startswith('{'):
decoded_msg = from_json(msg)
self._all_json_messages_found.append(_MessageWithMark(decoded_msg))
if isinstance(decoded_msg, expected_class):
if accept_message(decoded_msg):
return True
@@ -101,7 +106,10 @@ class JsonFacade(object):
assert stopped_event.body.reason == reason
json_hit = self.get_stack_as_json_hit(stopped_event.body.threadId)
if line is not None:
assert json_hit.stack_trace_response.body.stackFrames[0]['line'] == line
found_line = json_hit.stack_trace_response.body.stackFrames[0]['line']
if not isinstance(line, (tuple, list)):
line = [line]
assert found_line in line, 'Expected to break at line: %s. Found: %s' % (line, found_line)
if file is not None:
assert json_hit.stack_trace_response.body.stackFrames[0]['source']['path'].endswith(file)
if name is not None:
@@ -453,9 +461,9 @@ def test_case_started_exited_threads_protocol(case_setup):
json_facade.write_make_initial_run()
collected_messages, _stopped_event = json_facade.collect_messages_until_accepted(StoppedEvent)
started_events = [x for x in collected_messages if isinstance(x, ThreadEvent) and x.body.reason == 'started']
exited_events = [x for x in collected_messages if isinstance(x, ThreadEvent) and x.body.reason == 'exited']
_stopped_event = json_facade.wait_for_json_message(StoppedEvent)
started_events = json_facade.mark_messages(ThreadEvent, lambda x: x.body.reason == 'started')
exited_events = json_facade.mark_messages(ThreadEvent, lambda x: x.body.reason == 'exited')
assert len(started_events) == 4
assert len(exited_events) == 3 # Main is still running.
json_facade.write_continue(wait_for_response=False)
@@ -1744,6 +1752,67 @@ def test_set_debugger_property(case_setup, dbg_property):
writer.finished_ok = True
def test_wait_for_attach(case_setup_remote_attach_to):
host_port = get_socket_name(close=True)
def check_thread_events(json_facade):
json_facade.write_list_threads()
# Check that we have the started thread event (whenever we reconnect).
started_events = json_facade.mark_messages(ThreadEvent, lambda x: x.body.reason == 'started')
assert len(started_events) == 1
with case_setup_remote_attach_to.test_file('_debugger_case_wait_for_attach.py', host_port[1]) as writer:
time.sleep(.5) # Give some time for it to pass the first breakpoint and wait in 'wait_for_attach'.
writer.start_socket_client(*host_port)
json_facade = JsonFacade(writer)
check_thread_events(json_facade)
break1_line = writer.get_line_index_with_content('Break 1')
break2_line = writer.get_line_index_with_content('Break 2')
break3_line = writer.get_line_index_with_content('Break 3')
pause1_line = writer.get_line_index_with_content('Pause 1')
pause2_line = writer.get_line_index_with_content('Pause 2')
json_facade.write_set_breakpoints([break1_line, break2_line, break3_line])
json_facade.write_make_initial_run()
json_facade.wait_for_thread_stopped(line=break2_line)
# Upon disconnect, all threads should be running again.
json_facade.write_disconnect()
# Connect back (socket should remain open).
writer.start_socket_client(*host_port)
json_facade = JsonFacade(writer)
check_thread_events(json_facade)
json_facade.write_set_breakpoints([break1_line, break2_line, break3_line])
json_facade.write_make_initial_run()
json_facade.wait_for_thread_stopped(line=break3_line)
# Upon disconnect, all threads should be running again.
json_facade.write_disconnect()
# Connect back (socket should remain open).
writer.start_socket_client(*host_port)
json_facade = JsonFacade(writer)
check_thread_events(json_facade)
# Connect back without a disconnect (auto-disconnects previous and connects new client).
writer.start_socket_client(*host_port)
json_facade = JsonFacade(writer)
check_thread_events(json_facade)
json_facade.write_pause()
json_hit = json_facade.wait_for_thread_stopped(reason='pause', line=[pause1_line, pause2_line])
# Change value of 'a' for test to finish.
json_facade.write_set_variable(json_hit.frame_id, 'a', '10')
json_facade.write_disconnect()
writer.finished_ok = True
@pytest.mark.skipif(IS_JYTHON, reason='Flaky on Jython.')
def test_path_translation_and_source_reference(case_setup):