mirror of
https://github.com/python/cpython.git
synced 2025-08-04 08:59:19 +00:00
asyncio: sync with Tulip
* _WaitHandleFuture.cancel() now notify IocpProactor through the overlapped object that the wait was cancelled. * Optimize IocpProactor.wait_for_handle() gets the result if the wait is signaled immediatly. * Enhance representation of Future and Future subclasses - Add "created at filename:lineno" in the representation - Add Future._repr_info() method which can be more easily overriden than Future.__repr__(). It should now be more easy to enhance Future representation without having to modify each subclass. For example, _OverlappedFuture and _WaitHandleFuture get the new "created at" information. - Use reprlib to format Future result, and function arguments when formatting a callback, to limit the length of the representation. * Fix repr(_WaitHandleFuture) * _WaitHandleFuture and _OverlappedFuture: hide frames of internal calls in the source traceback. * Cleanup ProactorIocp._poll(): set the timeout to 0 after the first call to GetQueuedCompletionStatus() * test_locks: close the temporary event loop and check the condition lock * Remove workaround in test_futures, no more needed
This commit is contained in:
parent
7eca7343a0
commit
313a980904
7 changed files with 108 additions and 72 deletions
|
@ -42,16 +42,12 @@ class _OverlappedFuture(futures.Future):
|
|||
del self._source_traceback[-1]
|
||||
self._ov = ov
|
||||
|
||||
def __repr__(self):
|
||||
info = [self._state.lower()]
|
||||
def _repr_info(self):
|
||||
info = super()._repr_info()
|
||||
if self._ov is not None:
|
||||
state = 'pending' if self._ov.pending else 'completed'
|
||||
info.append('overlapped=<%s, %#x>' % (state, self._ov.address))
|
||||
if self._state == futures._FINISHED:
|
||||
info.append(self._format_result())
|
||||
if self._callbacks:
|
||||
info.append(self._format_callbacks())
|
||||
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
|
||||
info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address))
|
||||
return info
|
||||
|
||||
def _cancel_overlapped(self):
|
||||
if self._ov is None:
|
||||
|
@ -85,8 +81,14 @@ class _OverlappedFuture(futures.Future):
|
|||
class _WaitHandleFuture(futures.Future):
|
||||
"""Subclass of Future which represents a wait handle."""
|
||||
|
||||
def __init__(self, handle, wait_handle, *, loop=None):
|
||||
def __init__(self, iocp, ov, handle, wait_handle, *, loop=None):
|
||||
super().__init__(loop=loop)
|
||||
if self._source_traceback:
|
||||
del self._source_traceback[-1]
|
||||
# iocp and ov are only used by cancel() to notify IocpProactor
|
||||
# that the wait was cancelled
|
||||
self._iocp = iocp
|
||||
self._ov = ov
|
||||
self._handle = handle
|
||||
self._wait_handle = wait_handle
|
||||
|
||||
|
@ -95,19 +97,16 @@ class _WaitHandleFuture(futures.Future):
|
|||
return (_winapi.WaitForSingleObject(self._handle, 0) ==
|
||||
_winapi.WAIT_OBJECT_0)
|
||||
|
||||
def __repr__(self):
|
||||
info = [self._state.lower()]
|
||||
def _repr_info(self):
|
||||
info = super()._repr_info()
|
||||
info.insert(1, 'handle=%#x' % self._handle)
|
||||
if self._wait_handle:
|
||||
state = 'pending' if self._poll() else 'completed'
|
||||
info.append('wait_handle=<%s, %#x>' % (state, self._wait_handle))
|
||||
info.append('handle=<%#x>' % self._handle)
|
||||
if self._state == futures._FINISHED:
|
||||
info.append(self._format_result())
|
||||
if self._callbacks:
|
||||
info.append(self._format_callbacks())
|
||||
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
|
||||
state = 'signaled' if self._poll() else 'waiting'
|
||||
info.insert(1, 'wait_handle=<%s, %#x>'
|
||||
% (state, self._wait_handle))
|
||||
return info
|
||||
|
||||
def _unregister(self):
|
||||
def _unregister_wait(self):
|
||||
if self._wait_handle is None:
|
||||
return
|
||||
try:
|
||||
|
@ -117,10 +116,25 @@ class _WaitHandleFuture(futures.Future):
|
|||
raise
|
||||
# ERROR_IO_PENDING is not an error, the wait was unregistered
|
||||
self._wait_handle = None
|
||||
self._iocp = None
|
||||
self._ov = None
|
||||
|
||||
def cancel(self):
|
||||
self._unregister()
|
||||
return super().cancel()
|
||||
result = super().cancel()
|
||||
if self._ov is not None:
|
||||
# signal the cancellation to the overlapped object
|
||||
_overlapped.PostQueuedCompletionStatus(self._iocp, True,
|
||||
0, self._ov.address)
|
||||
self._unregister_wait()
|
||||
return result
|
||||
|
||||
def set_exception(self, exception):
|
||||
super().set_exception(exception)
|
||||
self._unregister_wait()
|
||||
|
||||
def set_result(self, result):
|
||||
super().set_result(result)
|
||||
self._unregister_wait()
|
||||
|
||||
|
||||
class PipeServer(object):
|
||||
|
@ -405,7 +419,9 @@ class IocpProactor:
|
|||
ov = _overlapped.Overlapped(NULL)
|
||||
wh = _overlapped.RegisterWaitWithQueue(
|
||||
handle, self._iocp, ov.address, ms)
|
||||
f = _WaitHandleFuture(handle, wh, loop=self._loop)
|
||||
f = _WaitHandleFuture(self._iocp, ov, handle, wh, loop=self._loop)
|
||||
if f._source_traceback:
|
||||
del f._source_traceback[-1]
|
||||
|
||||
def finish_wait_for_handle(trans, key, ov):
|
||||
# Note that this second wait means that we should only use
|
||||
|
@ -414,12 +430,17 @@ class IocpProactor:
|
|||
# or semaphores are not. Also note if the handle is
|
||||
# signalled and then quickly reset, then we may return
|
||||
# False even though we have not timed out.
|
||||
try:
|
||||
return f._poll()
|
||||
finally:
|
||||
f._unregister()
|
||||
return f._poll()
|
||||
|
||||
self._cache[ov.address] = (f, ov, None, finish_wait_for_handle)
|
||||
if f._poll():
|
||||
try:
|
||||
result = f._poll()
|
||||
except OSError as exc:
|
||||
f.set_exception(exc)
|
||||
else:
|
||||
f.set_result(result)
|
||||
|
||||
self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
|
||||
return f
|
||||
|
||||
def _register_with_iocp(self, obj):
|
||||
|
@ -438,6 +459,8 @@ class IocpProactor:
|
|||
# operation when it completes. The future's value is actually
|
||||
# the value returned by callback().
|
||||
f = _OverlappedFuture(ov, loop=self._loop)
|
||||
if f._source_traceback:
|
||||
del f._source_traceback[-1]
|
||||
if not ov.pending and not wait_for_post:
|
||||
# The operation has completed, so no need to postpone the
|
||||
# work. We cannot take this short cut if we need the
|
||||
|
@ -484,10 +507,13 @@ class IocpProactor:
|
|||
ms = math.ceil(timeout * 1e3)
|
||||
if ms >= INFINITE:
|
||||
raise ValueError("timeout too big")
|
||||
|
||||
while True:
|
||||
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
|
||||
if status is None:
|
||||
return
|
||||
ms = 0
|
||||
|
||||
err, transferred, key, address = status
|
||||
try:
|
||||
f, ov, obj, callback = self._cache.pop(address)
|
||||
|
@ -504,7 +530,6 @@ class IocpProactor:
|
|||
# handle which should be closed to avoid a leak.
|
||||
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
|
||||
_winapi.CloseHandle(key)
|
||||
ms = 0
|
||||
continue
|
||||
|
||||
if obj in self._stopped_serving:
|
||||
|
@ -520,7 +545,6 @@ class IocpProactor:
|
|||
else:
|
||||
f.set_result(value)
|
||||
self._results.append(f)
|
||||
ms = 0
|
||||
|
||||
def _stop_serving(self, obj):
|
||||
# obj is a socket or pipe handle. It will be closed in
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue