debugpy/ptvsd/futures.py
2018-01-10 19:57:30 +00:00

163 lines
4.2 KiB
Python

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import print_function, with_statement, absolute_import
import sys
import threading
import traceback
from ptvsd.reraise import reraise
__author__ = "Microsoft Corporation <ptvshelp@microsoft.com>"
__version__ = "4.0.0a1"
class Future(object):
# XXX docstring
def __init__(self, loop):
self._lock = threading.Lock()
self._loop = loop
self._done = False
self._observed = False
self._done_callbacks = []
self._exc_info = None
def __del__(self):
with self._lock:
if self._done and self._exc_info and not self._observed:
print('Unobserved exception in a Future:', file=sys.__stderr__)
exctype, exc, tb = self._exc_info
traceback.print_exception(exctype, exc, tb,
file=sys.__stderr__)
def result(self):
# XXX docstring
with self._lock:
self._observed = True
if self._exc_info:
reraise(self._exc_info)
return self._result
def exc_info(self):
# XXX docstring
with self._lock:
self._observed = True
return self._exc_info
def set_result(self, result):
# XXX docstring
with self._lock:
self._result = result
self._exc_info = None
self._done = True
callbacks = list(self._done_callbacks)
def invoke_callbacks():
for cb in callbacks:
cb(self)
self._loop.call_soon(invoke_callbacks)
def set_exc_info(self, exc_info):
# XXX docstring
with self._lock:
self._exc_info = exc_info
self._done = True
callbacks = list(self._done_callbacks)
def invoke_callbacks():
for cb in callbacks:
cb(self)
self._loop.call_soon(invoke_callbacks)
def add_done_callback(self, callback):
# XXX docstring
with self._lock:
done = self._done
self._done_callbacks.append(callback)
if done:
callback(self)
def remove_done_callback(self, callback):
# XXX docstring
with self._lock:
self._done_callbacks.remove(callback)
class EventLoop(object):
# XXX docstring
def __init__(self):
self._queue = []
self._lock = threading.Lock()
self._event = threading.Event()
self._event.set()
def create_future(self):
return Future(self)
def run_forever(self):
while True:
self._event.wait()
with self._lock:
queue = self._queue
self._queue = []
self._event.clear()
for (f, args) in queue:
f(*args)
def call_soon(self, f, *args):
with self._lock:
self._queue.append((f, args))
self._event.set()
def call_soon_threadsafe(self, f, *args):
return self.call_soon(f, *args)
class Result(object):
# XXX docstring
__slots__ = ['value']
def __init__(self, value):
self.value = value
def async(f):
# XXX docstring
def g(self, loop, *args, **kwargs):
it = f(self, *args, **kwargs)
result = Future(loop)
if it is None:
result.set_result(None)
return result
def callback(fut):
try:
if fut is None:
x = next(it)
else:
exc_info = fut.exc_info()
if exc_info:
x = it.throw(exc_info[0], exc_info[1], exc_info[2])
else:
x = it.send(fut.result())
except StopIteration:
result.set_result(None)
except BaseException as ex:
result.set_exc_info(sys.exc_info())
else:
if isinstance(x, Result):
result.set_result(x.value)
else:
x.add_done_callback(callback)
callback(None)
return result
return g