Merge 3.4 (asyncio)

This commit is contained in:
Victor Stinner 2015-01-26 15:04:15 +01:00
commit d7770d9bec
3 changed files with 46 additions and 22 deletions

View file

@ -257,7 +257,7 @@ class PipeServer(object):
def _server_pipe_handle(self, first): def _server_pipe_handle(self, first):
# Return a wrapper for a new pipe handle. # Return a wrapper for a new pipe handle.
if self._address is None: if self.closed():
return None return None
flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
if first: if first:
@ -273,6 +273,9 @@ class PipeServer(object):
self._free_instances.add(pipe) self._free_instances.add(pipe)
return pipe return pipe
def closed(self):
return (self._address is None)
def close(self): def close(self):
if self._accept_pipe_future is not None: if self._accept_pipe_future is not None:
self._accept_pipe_future.cancel() self._accept_pipe_future.cancel()
@ -325,12 +328,21 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
if f: if f:
pipe = f.result() pipe = f.result()
server._free_instances.discard(pipe) server._free_instances.discard(pipe)
if server.closed():
# A client connected before the server was closed:
# drop the client (close the pipe) and exit
pipe.close()
return
protocol = protocol_factory() protocol = protocol_factory()
self._make_duplex_pipe_transport( self._make_duplex_pipe_transport(
pipe, protocol, extra={'addr': address}) pipe, protocol, extra={'addr': address})
pipe = server._get_unconnected_pipe() pipe = server._get_unconnected_pipe()
if pipe is None: if pipe is None:
return return
f = self._proactor.accept_pipe(pipe) f = self._proactor.accept_pipe(pipe)
except OSError as exc: except OSError as exc:
if pipe and pipe.fileno() != -1: if pipe and pipe.fileno() != -1:
@ -506,28 +518,25 @@ class IocpProactor:
return self._register(ov, pipe, finish_accept_pipe) return self._register(ov, pipe, finish_accept_pipe)
def _connect_pipe(self, fut, address, delay): @coroutine
def connect_pipe(self, address):
delay = CONNECT_PIPE_INIT_DELAY
while True:
# Unfortunately there is no way to do an overlapped connect to a pipe. # Unfortunately there is no way to do an overlapped connect to a pipe.
# Call CreateFile() in a loop until it doesn't fail with # Call CreateFile() in a loop until it doesn't fail with
# ERROR_PIPE_BUSY # ERROR_PIPE_BUSY
try: try:
handle = _overlapped.ConnectPipe(address) handle = _overlapped.ConnectPipe(address)
break
except OSError as exc: except OSError as exc:
if exc.winerror == _overlapped.ERROR_PIPE_BUSY: if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
# Polling: retry later raise
delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
self._loop.call_later(delay,
self._connect_pipe, fut, address, delay)
else:
fut.set_exception(exc)
else:
pipe = windows_utils.PipeHandle(handle)
fut.set_result(pipe)
def connect_pipe(self, address): # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
fut = futures.Future(loop=self._loop) delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
self._connect_pipe(fut, address, CONNECT_PIPE_INIT_DELAY) yield from tasks.sleep(delay, loop=self._loop)
return fut
return windows_utils.PipeHandle(handle)
def wait_for_handle(self, handle, timeout=None): def wait_for_handle(self, handle, timeout=None):
"""Wait for a handle. """Wait for a handle.

View file

@ -147,6 +147,8 @@ class PipeHandle:
return self._handle return self._handle
def fileno(self): def fileno(self):
if self._handle is None:
raise ValueError("I/O operatioon on closed pipe")
return self._handle return self._handle
def close(self, *, CloseHandle=_winapi.CloseHandle): def close(self, *, CloseHandle=_winapi.CloseHandle):

View file

@ -1,6 +1,7 @@
import os import os
import sys import sys
import unittest import unittest
from unittest import mock
if sys.platform != 'win32': if sys.platform != 'win32':
raise unittest.SkipTest('Windows only') raise unittest.SkipTest('Windows only')
@ -91,6 +92,18 @@ class ProactorTests(test_utils.TestCase):
return 'done' return 'done'
def test_connect_pipe_cancel(self):
exc = OSError()
exc.winerror = _overlapped.ERROR_PIPE_BUSY
with mock.patch.object(_overlapped, 'ConnectPipe', side_effect=exc) as connect:
coro = self.loop._proactor.connect_pipe('pipe_address')
task = self.loop.create_task(coro)
# check that it's possible to cancel connect_pipe()
task.cancel()
with self.assertRaises(asyncio.CancelledError):
self.loop.run_until_complete(task)
def test_wait_for_handle(self): def test_wait_for_handle(self):
event = _overlapped.CreateEvent(None, True, False, None) event = _overlapped.CreateEvent(None, True, False, None)
self.addCleanup(_winapi.CloseHandle, event) self.addCleanup(_winapi.CloseHandle, event)