Merge 3.4 (asyncio)

This commit is contained in:
Victor Stinner 2015-01-21 23:40:19 +01:00
commit 91d11bbb2d
7 changed files with 173 additions and 50 deletions

View file

@ -636,7 +636,7 @@ class BaseEventLoop(events.AbstractEventLoop):
try: try:
yield from waiter yield from waiter
except Exception as exc: except Exception:
transport.close() transport.close()
raise raise

View file

@ -463,9 +463,15 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
if f is not None: if f is not None:
f.result() # may raise f.result() # may raise
f = self._proactor.recv(self._ssock, 4096) f = self._proactor.recv(self._ssock, 4096)
except: except futures.CancelledError:
self.close() # _close_self_pipe() has been called, stop waiting for data
raise return
except Exception as exc:
self.call_exception_handler({
'message': 'Error on reading from the event loop self pipe',
'exception': exc,
'loop': self,
})
else: else:
self._self_reading_future = f self._self_reading_future = f
f.add_done_callback(self._loop_self_reading) f.add_done_callback(self._loop_self_reading)

View file

@ -10,7 +10,6 @@ import collections
import errno import errno
import functools import functools
import socket import socket
import sys
try: try:
import ssl import ssl
except ImportError: # pragma: no cover except ImportError: # pragma: no cover

View file

@ -78,20 +78,23 @@ class _OverlappedFuture(futures.Future):
self._ov = None self._ov = None
class _WaitHandleFuture(futures.Future): class _BaseWaitHandleFuture(futures.Future):
"""Subclass of Future which represents a wait handle.""" """Subclass of Future which represents a wait handle."""
def __init__(self, iocp, ov, handle, wait_handle, *, loop=None): def __init__(self, ov, handle, wait_handle, *, loop=None):
super().__init__(loop=loop) super().__init__(loop=loop)
if self._source_traceback: if self._source_traceback:
del self._source_traceback[-1] del self._source_traceback[-1]
# iocp and ov are only used by cancel() to notify IocpProactor # Keep a reference to the Overlapped object to keep it alive until the
# that the wait was cancelled # wait is unregistered
self._iocp = iocp
self._ov = ov self._ov = ov
self._handle = handle self._handle = handle
self._wait_handle = wait_handle self._wait_handle = wait_handle
# Should we call UnregisterWaitEx() if the wait completes
# or is cancelled?
self._registered = True
def _poll(self): def _poll(self):
# non-blocking wait: use a timeout of 0 millisecond # non-blocking wait: use a timeout of 0 millisecond
return (_winapi.WaitForSingleObject(self._handle, 0) == return (_winapi.WaitForSingleObject(self._handle, 0) ==
@ -99,21 +102,32 @@ class _WaitHandleFuture(futures.Future):
def _repr_info(self): def _repr_info(self):
info = super()._repr_info() info = super()._repr_info()
info.insert(1, 'handle=%#x' % self._handle) info.append('handle=%#x' % self._handle)
if self._wait_handle: if self._handle is not None:
state = 'signaled' if self._poll() else 'waiting' state = 'signaled' if self._poll() else 'waiting'
info.insert(1, 'wait_handle=<%s, %#x>' info.append(state)
% (state, self._wait_handle)) if self._wait_handle is not None:
info.append('wait_handle=%#x' % self._wait_handle)
return info return info
def _unregister_wait_cb(self, fut):
# The wait was unregistered: it's not safe to destroy the Overlapped
# object
self._ov = None
def _unregister_wait(self): def _unregister_wait(self):
if self._wait_handle is None: if not self._registered:
return return
self._registered = False
try: try:
_overlapped.UnregisterWait(self._wait_handle) _overlapped.UnregisterWait(self._wait_handle)
except OSError as exc: except OSError as exc:
self._wait_handle = None
if exc.winerror == _overlapped.ERROR_IO_PENDING:
# ERROR_IO_PENDING is not an error, the wait was unregistered # ERROR_IO_PENDING is not an error, the wait was unregistered
if exc.winerror != _overlapped.ERROR_IO_PENDING: self._unregister_wait_cb(None)
elif exc.winerror != _overlapped.ERROR_IO_PENDING:
context = { context = {
'message': 'Failed to unregister the wait handle', 'message': 'Failed to unregister the wait handle',
'exception': exc, 'exception': exc,
@ -122,26 +136,91 @@ class _WaitHandleFuture(futures.Future):
if self._source_traceback: if self._source_traceback:
context['source_traceback'] = self._source_traceback context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context) self._loop.call_exception_handler(context)
else:
self._wait_handle = None self._wait_handle = None
self._iocp = None self._unregister_wait_cb(None)
self._ov = None
def cancel(self): def cancel(self):
result = super().cancel()
if self._ov is not None:
# signal the cancellation to the overlapped object
_overlapped.PostQueuedCompletionStatus(self._iocp, True,
0, self._ov.address)
self._unregister_wait() self._unregister_wait()
return result return super().cancel()
def set_exception(self, exception): def set_exception(self, exception):
super().set_exception(exception)
self._unregister_wait() self._unregister_wait()
super().set_exception(exception)
def set_result(self, result): def set_result(self, result):
super().set_result(result)
self._unregister_wait() self._unregister_wait()
super().set_result(result)
class _WaitCancelFuture(_BaseWaitHandleFuture):
"""Subclass of Future which represents a wait for the cancellation of a
_WaitHandleFuture using an event.
"""
def __init__(self, ov, event, wait_handle, *, loop=None):
super().__init__(ov, event, wait_handle, loop=loop)
self._done_callback = None
def _schedule_callbacks(self):
super(_WaitCancelFuture, self)._schedule_callbacks()
if self._done_callback is not None:
self._done_callback(self)
class _WaitHandleFuture(_BaseWaitHandleFuture):
def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
super().__init__(ov, handle, wait_handle, loop=loop)
self._proactor = proactor
self._unregister_proactor = True
self._event = _overlapped.CreateEvent(None, True, False, None)
self._event_fut = None
def _unregister_wait_cb(self, fut):
if self._event is not None:
_winapi.CloseHandle(self._event)
self._event = None
self._event_fut = None
# If the wait was cancelled, the wait may never be signalled, so
# it's required to unregister it. Otherwise, IocpProactor.close() will
# wait forever for an event which will never come.
#
# If the IocpProactor already received the event, it's safe to call
# _unregister() because we kept a reference to the Overlapped object
# which is used as an unique key.
self._proactor._unregister(self._ov)
self._proactor = None
super()._unregister_wait_cb(fut)
def _unregister_wait(self):
if not self._registered:
return
self._registered = False
try:
_overlapped.UnregisterWaitEx(self._wait_handle, self._event)
except OSError as exc:
self._wait_handle = None
if exc.winerror == _overlapped.ERROR_IO_PENDING:
# ERROR_IO_PENDING is not an error, the wait was unregistered
self._unregister_wait_cb(None)
elif exc.winerror != _overlapped.ERROR_IO_PENDING:
context = {
'message': 'Failed to unregister the wait handle',
'exception': exc,
'future': self,
}
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
else:
self._wait_handle = None
self._event_fut = self._proactor._wait_cancel(
self._event,
self._unregister_wait_cb)
class PipeServer(object): class PipeServer(object):
@ -291,6 +370,7 @@ class IocpProactor:
_overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency) _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
self._cache = {} self._cache = {}
self._registered = weakref.WeakSet() self._registered = weakref.WeakSet()
self._unregistered = []
self._stopped_serving = weakref.WeakSet() self._stopped_serving = weakref.WeakSet()
def __repr__(self): def __repr__(self):
@ -438,6 +518,16 @@ class IocpProactor:
Return a Future object. The result of the future is True if the wait Return a Future object. The result of the future is True if the wait
completed, or False if the wait did not complete (on timeout). completed, or False if the wait did not complete (on timeout).
""" """
return self._wait_for_handle(handle, timeout, False)
def _wait_cancel(self, event, done_callback):
fut = self._wait_for_handle(event, None, True)
# add_done_callback() cannot be used because the wait may only complete
# in IocpProactor.close(), while the event loop is not running.
fut._done_callback = done_callback
return fut
def _wait_for_handle(self, handle, timeout, _is_cancel):
if timeout is None: if timeout is None:
ms = _winapi.INFINITE ms = _winapi.INFINITE
else: else:
@ -447,9 +537,13 @@ class IocpProactor:
# We only create ov so we can use ov.address as a key for the cache. # We only create ov so we can use ov.address as a key for the cache.
ov = _overlapped.Overlapped(NULL) ov = _overlapped.Overlapped(NULL)
wh = _overlapped.RegisterWaitWithQueue( wait_handle = _overlapped.RegisterWaitWithQueue(
handle, self._iocp, ov.address, ms) handle, self._iocp, ov.address, ms)
f = _WaitHandleFuture(self._iocp, ov, handle, wh, loop=self._loop) if _is_cancel:
f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
else:
f = _WaitHandleFuture(ov, handle, wait_handle, self,
loop=self._loop)
if f._source_traceback: if f._source_traceback:
del f._source_traceback[-1] del f._source_traceback[-1]
@ -462,14 +556,6 @@ class IocpProactor:
# False even though we have not timed out. # False even though we have not timed out.
return f._poll() return f._poll()
if f._poll():
try:
result = f._poll()
except OSError as exc:
f.set_exception(exc)
else:
f.set_result(result)
self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle) self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
return f return f
@ -521,6 +607,15 @@ class IocpProactor:
self._cache[ov.address] = (f, ov, obj, callback) self._cache[ov.address] = (f, ov, obj, callback)
return f return f
def _unregister(self, ov):
"""Unregister an overlapped object.
Call this method when its future has been cancelled. The event can
already be signalled (pending in the proactor event queue). It is also
safe if the event is never signalled (because it was cancelled).
"""
self._unregistered.append(ov)
def _get_accept_socket(self, family): def _get_accept_socket(self, family):
s = socket.socket(family) s = socket.socket(family)
s.settimeout(0) s.settimeout(0)
@ -541,7 +636,7 @@ class IocpProactor:
while True: while True:
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
if status is None: if status is None:
return break
ms = 0 ms = 0
err, transferred, key, address = status err, transferred, key, address = status
@ -576,6 +671,11 @@ class IocpProactor:
f.set_result(value) f.set_result(value)
self._results.append(f) self._results.append(f)
# Remove unregisted futures
for ov in self._unregistered:
self._cache.pop(ov.address, None)
self._unregistered.clear()
def _stop_serving(self, obj): def _stop_serving(self, obj):
# obj is a socket or pipe handle. It will be closed in # obj is a socket or pipe handle. It will be closed in
# BaseProactorEventLoop._stop_serving() which will make any # BaseProactorEventLoop._stop_serving() which will make any

View file

@ -523,9 +523,10 @@ class BaseProactorEventLoopTests(test_utils.TestCase):
def test_loop_self_reading_exception(self): def test_loop_self_reading_exception(self):
self.loop.close = mock.Mock() self.loop.close = mock.Mock()
self.loop.call_exception_handler = mock.Mock()
self.proactor.recv.side_effect = OSError() self.proactor.recv.side_effect = OSError()
self.assertRaises(OSError, self.loop._loop_self_reading) self.loop._loop_self_reading()
self.assertTrue(self.loop.close.called) self.assertTrue(self.loop.call_exception_handler.called)
def test_write_to_self(self): def test_write_to_self(self):
self.loop._write_to_self() self.loop._write_to_self()

View file

@ -415,10 +415,6 @@ class StreamReaderTests(test_utils.TestCase):
def set_err(): def set_err():
stream.set_exception(ValueError()) stream.set_exception(ValueError())
@asyncio.coroutine
def readline():
yield from stream.readline()
t1 = asyncio.Task(stream.readline(), loop=self.loop) t1 = asyncio.Task(stream.readline(), loop=self.loop)
t2 = asyncio.Task(set_err(), loop=self.loop) t2 = asyncio.Task(set_err(), loop=self.loop)
@ -429,11 +425,7 @@ class StreamReaderTests(test_utils.TestCase):
def test_exception_cancel(self): def test_exception_cancel(self):
stream = asyncio.StreamReader(loop=self.loop) stream = asyncio.StreamReader(loop=self.loop)
@asyncio.coroutine t = asyncio.Task(stream.readline(), loop=self.loop)
def read_a_line():
yield from stream.readline()
t = asyncio.Task(read_a_line(), loop=self.loop)
test_utils.run_briefly(self.loop) test_utils.run_briefly(self.loop)
t.cancel() t.cancel()
test_utils.run_briefly(self.loop) test_utils.run_briefly(self.loop)

View file

@ -309,6 +309,29 @@ overlapped_UnregisterWait(PyObject *self, PyObject *args)
Py_RETURN_NONE; Py_RETURN_NONE;
} }
PyDoc_STRVAR(
UnregisterWaitEx_doc,
"UnregisterWaitEx(WaitHandle, Event) -> None\n\n"
"Unregister wait handle.\n");
static PyObject *
overlapped_UnregisterWaitEx(PyObject *self, PyObject *args)
{
HANDLE WaitHandle, Event;
BOOL ret;
if (!PyArg_ParseTuple(args, F_HANDLE F_HANDLE, &WaitHandle, &Event))
return NULL;
Py_BEGIN_ALLOW_THREADS
ret = UnregisterWaitEx(WaitHandle, Event);
Py_END_ALLOW_THREADS
if (!ret)
return SetFromWindowsErr(0);
Py_RETURN_NONE;
}
/* /*
* Event functions -- currently only used by tests * Event functions -- currently only used by tests
*/ */
@ -1319,6 +1342,8 @@ static PyMethodDef overlapped_functions[] = {
METH_VARARGS, RegisterWaitWithQueue_doc}, METH_VARARGS, RegisterWaitWithQueue_doc},
{"UnregisterWait", overlapped_UnregisterWait, {"UnregisterWait", overlapped_UnregisterWait,
METH_VARARGS, UnregisterWait_doc}, METH_VARARGS, UnregisterWait_doc},
{"UnregisterWaitEx", overlapped_UnregisterWaitEx,
METH_VARARGS, UnregisterWaitEx_doc},
{"CreateEvent", overlapped_CreateEvent, {"CreateEvent", overlapped_CreateEvent,
METH_VARARGS, CreateEvent_doc}, METH_VARARGS, CreateEvent_doc},
{"SetEvent", overlapped_SetEvent, {"SetEvent", overlapped_SetEvent,