Allow ptvsd to run as server and let code connect to it

Expose --connect and debugpy.connect() as a public API, and add tests for it.
This commit is contained in:
Pavel Minaev 2020-02-28 17:04:40 -08:00 committed by Pavel Minaev
parent 4f43e00a07
commit 6ad1382a8c
19 changed files with 154 additions and 65 deletions

View file

@ -30,6 +30,7 @@ class Timeline(object):
self.name = str(name if name is not None else id(self))
self.ignore_unobserved = []
self._listeners = [] # [(expectation, callable)]
self._index_iter = itertools.count(1)
self._accepting_new = threading.Event()
self._finalized = threading.Event()
@ -341,6 +342,10 @@ class Timeline(object):
self._recorded_new.notify_all()
self._record_queue.task_done()
for exp, callback in tuple(self._listeners):
if exp == occ:
callback(occ)
def mark(self, id, block=True):
occ = Occurrence("mark", id)
occ.id = id
@ -360,6 +365,12 @@ class Timeline(object):
occ = ResponseOccurrence(request_occ, message)
return self._record(occ, block)
def when(self, expectation, callback):
"""For every occurrence recorded after this call, invokes callback(occurrence)
if occurrence == expectation.
"""
self._listeners.append((expectation, callback))
def _snapshot(self):
last = self._last
occ = self._beginning