Issue #12328: Fix multiprocessing's use of overlapped I/O on Windows.

Also, add a multiprocessing.connection.wait(rlist, timeout=None) function
for polling multiple objects at once.  Patch by sbt.

Complete changelist from sbt's patch:

* Adds a wait(rlist, timeout=None) function for polling multiple
  objects at once.  On Unix this is just a wrapper for
  select(rlist, [], [], timeout=None).

* Removes use of the SentinelReady exception and the sentinels argument
  to certain methods.  concurrent.futures.process has been changed to
  use wait() instead of SentinelReady.

* Fixes bugs concerning PipeConnection.poll() and messages of zero
  length.

* Fixes PipeListener.accept() to call ConnectNamedPipe() with
  overlapped=True.

* Fixes Queue.empty() and SimpleQueue.empty() so that they are
  threadsafe on Windows.

* Now PipeConnection.poll() and wait() will not modify the pipe except
  possibly by consuming a zero length message.  (Previously poll()
  could consume a partial message.)

* All of multiprocesing's pipe related blocking functions/methods are
  now interruptible by SIGINT on Windows.
This commit is contained in:
Antoine Pitrou 2012-03-05 19:28:37 +01:00
parent 1e88f3faa6
commit bdb1cf1ca5
7 changed files with 581 additions and 152 deletions

View file

@ -32,7 +32,7 @@
# SUCH DAMAGE.
#
__all__ = [ 'Client', 'Listener', 'Pipe' ]
__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
import io
import os
@ -58,8 +58,6 @@ except ImportError:
raise
win32 = None
_select = _eintr_retry(select.select)
#
#
#
@ -122,15 +120,6 @@ def address_type(address):
else:
raise ValueError('address type of %r unrecognized' % address)
class SentinelReady(Exception):
"""
Raised when a sentinel is ready when polling.
"""
def __init__(self, *args):
Exception.__init__(self, *args)
self.sentinels = args[0]
#
# Connection classes
#
@ -268,11 +257,11 @@ class _ConnectionBase:
(offset + size) // itemsize])
return size
def recv(self, sentinels=None):
def recv(self):
"""Receive a (picklable) object"""
self._check_closed()
self._check_readable()
buf = self._recv_bytes(sentinels=sentinels)
buf = self._recv_bytes()
return pickle.loads(buf.getbuffer())
def poll(self, timeout=0.0):
@ -290,85 +279,80 @@ if win32:
Overlapped I/O is used, so the handles must have been created
with FILE_FLAG_OVERLAPPED.
"""
_buffered = b''
_got_empty_message = False
def _close(self, _CloseHandle=win32.CloseHandle):
_CloseHandle(self._handle)
def _send_bytes(self, buf):
overlapped = win32.WriteFile(self._handle, buf, overlapped=True)
nwritten, complete = overlapped.GetOverlappedResult(True)
assert complete
ov, err = win32.WriteFile(self._handle, buf, overlapped=True)
try:
if err == win32.ERROR_IO_PENDING:
waitres = win32.WaitForMultipleObjects(
[ov.event], False, INFINITE)
assert waitres == WAIT_OBJECT_0
except:
ov.cancel()
raise
finally:
nwritten, err = ov.GetOverlappedResult(True)
assert err == 0
assert nwritten == len(buf)
def _recv_bytes(self, maxsize=None, sentinels=()):
if sentinels:
self._poll(-1.0, sentinels)
buf = io.BytesIO()
firstchunk = self._buffered
if firstchunk:
lenfirstchunk = len(firstchunk)
buf.write(firstchunk)
self._buffered = b''
def _recv_bytes(self, maxsize=None):
if self._got_empty_message:
self._got_empty_message = False
return io.BytesIO()
else:
# A reasonable size for the first chunk transfer
bufsize = 128
if maxsize is not None and maxsize < bufsize:
bufsize = maxsize
bsize = 128 if maxsize is None else min(maxsize, 128)
try:
overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True)
lenfirstchunk, complete = overlapped.GetOverlappedResult(True)
firstchunk = overlapped.getbuffer()
assert lenfirstchunk == len(firstchunk)
ov, err = win32.ReadFile(self._handle, bsize,
overlapped=True)
try:
if err == win32.ERROR_IO_PENDING:
waitres = win32.WaitForMultipleObjects(
[ov.event], False, INFINITE)
assert waitres == WAIT_OBJECT_0
except:
ov.cancel()
raise
finally:
nread, err = ov.GetOverlappedResult(True)
if err == 0:
f = io.BytesIO()
f.write(ov.getbuffer())
return f
elif err == win32.ERROR_MORE_DATA:
return self._get_more_data(ov, maxsize)
except IOError as e:
if e.winerror == win32.ERROR_BROKEN_PIPE:
raise EOFError
raise
buf.write(firstchunk)
if complete:
return buf
navail, nleft = win32.PeekNamedPipe(self._handle)
if maxsize is not None and lenfirstchunk + nleft > maxsize:
return None
if nleft > 0:
overlapped = win32.ReadFile(self._handle, nleft, overlapped=True)
res, complete = overlapped.GetOverlappedResult(True)
assert res == nleft
assert complete
buf.write(overlapped.getbuffer())
return buf
else:
raise
raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")
def _poll(self, timeout, sentinels=()):
# Fast non-blocking path
navail, nleft = win32.PeekNamedPipe(self._handle)
if navail > 0:
def _poll(self, timeout):
if (self._got_empty_message or
win32.PeekNamedPipe(self._handle)[0] != 0):
return True
elif timeout == 0.0:
return False
# Blocking: use overlapped I/O
if timeout < 0.0:
timeout = INFINITE
else:
timeout = int(timeout * 1000 + 0.5)
overlapped = win32.ReadFile(self._handle, 1, overlapped=True)
try:
handles = [overlapped.event]
handles += sentinels
res = win32.WaitForMultipleObjects(handles, False, timeout)
finally:
# Always cancel overlapped I/O in the same thread
# (because CancelIoEx() appears only in Vista)
overlapped.cancel()
if res == WAIT_TIMEOUT:
return False
idx = res - WAIT_OBJECT_0
if idx == 0:
# I/O was successful, store received data
overlapped.GetOverlappedResult(True)
self._buffered += overlapped.getbuffer()
return True
assert 0 < idx < len(handles)
raise SentinelReady([handles[idx]])
if timeout < 0:
timeout = None
return bool(wait([self], timeout))
def _get_more_data(self, ov, maxsize):
buf = ov.getbuffer()
f = io.BytesIO()
f.write(buf)
left = win32.PeekNamedPipe(self._handle)[1]
assert left > 0
if maxsize is not None and len(buf) + left > maxsize:
self._bad_message_length()
ov, err = win32.ReadFile(self._handle, left, overlapped=True)
rbytes, err = ov.GetOverlappedResult(True)
assert err == 0
assert rbytes == left
f.write(ov.getbuffer())
return f
class Connection(_ConnectionBase):
@ -397,17 +381,11 @@ class Connection(_ConnectionBase):
break
buf = buf[n:]
def _recv(self, size, sentinels=(), read=_read):
def _recv(self, size, read=_read):
buf = io.BytesIO()
handle = self._handle
if sentinels:
handles = [handle] + sentinels
remaining = size
while remaining > 0:
if sentinels:
r = _select(handles, [], [])[0]
if handle not in r:
raise SentinelReady(r)
chunk = read(handle, remaining)
n = len(chunk)
if n == 0:
@ -428,17 +406,17 @@ class Connection(_ConnectionBase):
if n > 0:
self._send(buf)
def _recv_bytes(self, maxsize=None, sentinels=()):
buf = self._recv(4, sentinels)
def _recv_bytes(self, maxsize=None):
buf = self._recv(4)
size, = struct.unpack("!i", buf.getvalue())
if maxsize is not None and size > maxsize:
return None
return self._recv(size, sentinels)
return self._recv(size)
def _poll(self, timeout):
if timeout < 0.0:
timeout = None
r = _select([self._handle], [], [], timeout)[0]
r = wait([self._handle], timeout)
return bool(r)
@ -559,7 +537,8 @@ else:
)
overlapped = win32.ConnectNamedPipe(h1, overlapped=True)
overlapped.GetOverlappedResult(True)
_, err = overlapped.GetOverlappedResult(True)
assert err == 0
c1 = PipeConnection(h1, writable=duplex)
c2 = PipeConnection(h2, readable=duplex)
@ -633,39 +612,40 @@ if sys.platform == 'win32':
'''
def __init__(self, address, backlog=None):
self._address = address
handle = win32.CreateNamedPipe(
address, win32.PIPE_ACCESS_DUPLEX |
win32.FILE_FLAG_FIRST_PIPE_INSTANCE,
win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
win32.PIPE_WAIT,
win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
win32.NMPWAIT_WAIT_FOREVER, win32.NULL
)
self._handle_queue = [handle]
self._handle_queue = [self._new_handle(first=True)]
self._last_accepted = None
sub_debug('listener created with address=%r', self._address)
self.close = Finalize(
self, PipeListener._finalize_pipe_listener,
args=(self._handle_queue, self._address), exitpriority=0
)
def accept(self):
newhandle = win32.CreateNamedPipe(
self._address, win32.PIPE_ACCESS_DUPLEX,
def _new_handle(self, first=False):
flags = win32.PIPE_ACCESS_DUPLEX | win32.FILE_FLAG_OVERLAPPED
if first:
flags |= win32.FILE_FLAG_FIRST_PIPE_INSTANCE
return win32.CreateNamedPipe(
self._address, flags,
win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
win32.PIPE_WAIT,
win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
win32.NMPWAIT_WAIT_FOREVER, win32.NULL
)
self._handle_queue.append(newhandle)
def accept(self):
self._handle_queue.append(self._new_handle())
handle = self._handle_queue.pop(0)
ov = win32.ConnectNamedPipe(handle, overlapped=True)
try:
win32.ConnectNamedPipe(handle, win32.NULL)
except WindowsError as e:
if e.winerror != win32.ERROR_PIPE_CONNECTED:
raise
res = win32.WaitForMultipleObjects([ov.event], False, INFINITE)
except:
ov.cancel()
win32.CloseHandle(handle)
raise
finally:
_, err = ov.GetOverlappedResult(True)
assert err == 0
return PipeConnection(handle)
@staticmethod
@ -684,7 +664,8 @@ if sys.platform == 'win32':
win32.WaitNamedPipe(address, 1000)
h = win32.CreateFile(
address, win32.GENERIC_READ | win32.GENERIC_WRITE,
0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
0, win32.NULL, win32.OPEN_EXISTING,
win32.FILE_FLAG_OVERLAPPED, win32.NULL
)
except WindowsError as e:
if e.winerror not in (win32.ERROR_SEM_TIMEOUT,
@ -773,6 +754,125 @@ def XmlClient(*args, **kwds):
import xmlrpc.client as xmlrpclib
return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
#
# Wait
#
if sys.platform == 'win32':
def _exhaustive_wait(handles, timeout):
# Return ALL handles which are currently signalled. (Only
# returning the first signalled might create starvation issues.)
L = list(handles)
ready = []
while L:
res = win32.WaitForMultipleObjects(L, False, timeout)
if res == WAIT_TIMEOUT:
break
elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
res -= WAIT_OBJECT_0
elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
res -= WAIT_ABANDONED_0
else:
raise RuntimeError('Should not get here')
ready.append(L[res])
L = L[res+1:]
timeout = 0
return ready
_ready_errors = {win32.ERROR_BROKEN_PIPE, win32.ERROR_NETNAME_DELETED}
def wait(object_list, timeout=None):
'''
Wait till an object in object_list is ready/readable.
Returns list of those objects in object_list which are ready/readable.
'''
if timeout is None:
timeout = INFINITE
elif timeout < 0:
timeout = 0
else:
timeout = int(timeout * 1000 + 0.5)
object_list = list(object_list)
waithandle_to_obj = {}
ov_list = []
ready_objects = set()
ready_handles = set()
try:
for o in object_list:
try:
fileno = getattr(o, 'fileno')
except AttributeError:
waithandle_to_obj[o.__index__()] = o
else:
# start an overlapped read of length zero
try:
ov, err = win32.ReadFile(fileno(), 0, True)
except OSError as e:
err = e.winerror
if err not in _ready_errors:
raise
if err == win32.ERROR_IO_PENDING:
ov_list.append(ov)
waithandle_to_obj[ov.event] = o
else:
# If o.fileno() is an overlapped pipe handle and
# err == 0 then there is a zero length message
# in the pipe, but it HAS NOT been consumed.
ready_objects.add(o)
timeout = 0
ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
finally:
# request that overlapped reads stop
for ov in ov_list:
ov.cancel()
# wait for all overlapped reads to stop
for ov in ov_list:
try:
_, err = ov.GetOverlappedResult(True)
except OSError as e:
err = e.winerror
if err not in _ready_errors:
raise
if err != win32.ERROR_OPERATION_ABORTED:
o = waithandle_to_obj[ov.event]
ready_objects.add(o)
if err == 0:
# If o.fileno() is an overlapped pipe handle then
# a zero length message HAS been consumed.
if hasattr(o, '_got_empty_message'):
o._got_empty_message = True
ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
return [o for o in object_list if o in ready_objects]
else:
def wait(object_list, timeout=None):
'''
Wait till an object in object_list is ready/readable.
Returns list of those objects in object_list which are ready/readable.
'''
if timeout is not None:
if timeout <= 0:
return select.select(object_list, [], [], 0)[0]
else:
deadline = time.time() + timeout
while True:
try:
return select.select(object_list, [], [], timeout)[0]
except OSError as e:
if e.errno != errno.EINTR:
raise
if timeout is not None:
timeout = deadline - time.time()
# Late import because of circular import
from multiprocessing.forking import duplicate, close