Merge 3.4 (asyncio)

This commit is contained in:
Victor Stinner 2015-01-22 22:55:31 +01:00
commit 58c85144db
2 changed files with 55 additions and 113 deletions

View file

@ -29,6 +29,12 @@ INFINITE = 0xffffffff
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236
# Initial delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_INIT_DELAY = 0.001
# Maximum delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_MAX_DELAY = 0.100
class _OverlappedFuture(futures.Future):
"""Subclass of Future which represents an overlapped operation.
@ -495,25 +501,28 @@ class IocpProactor:
return self._register(ov, pipe, finish_accept_pipe,
register=False)
def connect_pipe(self, address):
ov = _overlapped.Overlapped(NULL)
ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address)
def finish_connect_pipe(err, handle, ov):
# err, handle were arguments passed to PostQueuedCompletionStatus()
# in a function run in a thread pool.
if err == _overlapped.ERROR_SEM_TIMEOUT:
# Connection did not succeed within time limit.
msg = _overlapped.FormatMessage(err)
raise ConnectionRefusedError(0, msg, None, err)
elif err != 0:
msg = _overlapped.FormatMessage(err)
raise OSError(0, msg, None, err)
def _connect_pipe(self, fut, address, delay):
# Unfortunately there is no way to do an overlapped connect to a pipe.
# Call CreateFile() in a loop until it doesn't fail with
# ERROR_PIPE_BUSY
try:
handle = _overlapped.ConnectPipe(address)
except OSError as exc:
if exc.winerror == _overlapped.ERROR_PIPE_BUSY:
# Polling: retry later
delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
self._loop.call_later(delay,
self._connect_pipe, fut, address, delay)
else:
return windows_utils.PipeHandle(handle)
fut.set_exception(exc)
else:
pipe = windows_utils.PipeHandle(handle)
fut.set_result(pipe)
return self._register(ov, None, finish_connect_pipe,
wait_for_post=True)
def connect_pipe(self, address):
fut = futures.Future(loop=self._loop)
self._connect_pipe(fut, address, CONNECT_PIPE_INIT_DELAY)
return fut
def wait_for_handle(self, handle, timeout=None):
"""Wait for a handle.
@ -693,12 +702,16 @@ class IocpProactor:
# queues a task to Windows' thread pool. This cannot
# be cancelled, so just forget it.
del self._cache[address]
# FIXME: Tulip issue 196: remove this case, it should not happen
elif fut.done() and not fut.cancelled():
del self._cache[address]
elif fut.cancelled():
# Nothing to do with cancelled futures
pass
elif isinstance(fut, _WaitCancelFuture):
# _WaitCancelFuture must not be cancelled
pass
elif fut.done():
# FIXME: Tulip issue 196: remove this case, it should not
# happen
del self._cache[address]
else:
try:
fut.cancel()