mirror of
https://github.com/python/cpython.git
synced 2025-09-26 18:29:57 +00:00
[3.12] gh-107219: Fix concurrent.futures terminate_broken() (GH-109244) (#109254)
gh-107219: Fix concurrent.futures terminate_broken() (GH-109244)
Fix a race condition in concurrent.futures. When a process in the
process pool was terminated abruptly (while the future was running or
pending), close the connection write end. If the call queue is
blocked on sending bytes to a worker process, closing the connection
write end interrupts the send, so the queue can be closed.
Changes:
* _ExecutorManagerThread.terminate_broken() now closes
call_queue._writer.
* multiprocessing PipeConnection.close() now interrupts
WaitForMultipleObjects() in _send_bytes() by cancelling the
overlapped operation.
(cherry picked from commit a9b1f84790
)
Co-authored-by: Victor Stinner <vstinner@python.org>
This commit is contained in:
parent
3e1c9e8264
commit
1d8c18c39d
3 changed files with 27 additions and 0 deletions
|
@ -503,6 +503,10 @@ class _ExecutorManagerThread(threading.Thread):
|
||||||
# https://github.com/python/cpython/issues/94777
|
# https://github.com/python/cpython/issues/94777
|
||||||
self.call_queue._reader.close()
|
self.call_queue._reader.close()
|
||||||
|
|
||||||
|
# gh-107219: Close the connection writer which can unblock
|
||||||
|
# Queue._feed() if it was stuck in send_bytes().
|
||||||
|
self.call_queue._writer.close()
|
||||||
|
|
||||||
# clean up resources
|
# clean up resources
|
||||||
self.join_executor_internals()
|
self.join_executor_internals()
|
||||||
|
|
||||||
|
|
|
@ -9,6 +9,7 @@
|
||||||
|
|
||||||
__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
|
__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
|
||||||
|
|
||||||
|
import errno
|
||||||
import io
|
import io
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
@ -41,6 +42,7 @@ except ImportError:
|
||||||
BUFSIZE = 8192
|
BUFSIZE = 8192
|
||||||
# A very generous timeout when it comes to local connections...
|
# A very generous timeout when it comes to local connections...
|
||||||
CONNECTION_TIMEOUT = 20.
|
CONNECTION_TIMEOUT = 20.
|
||||||
|
WSA_OPERATION_ABORTED = 995
|
||||||
|
|
||||||
_mmap_counter = itertools.count()
|
_mmap_counter = itertools.count()
|
||||||
|
|
||||||
|
@ -271,12 +273,22 @@ if _winapi:
|
||||||
with FILE_FLAG_OVERLAPPED.
|
with FILE_FLAG_OVERLAPPED.
|
||||||
"""
|
"""
|
||||||
_got_empty_message = False
|
_got_empty_message = False
|
||||||
|
_send_ov = None
|
||||||
|
|
||||||
def _close(self, _CloseHandle=_winapi.CloseHandle):
|
def _close(self, _CloseHandle=_winapi.CloseHandle):
|
||||||
|
ov = self._send_ov
|
||||||
|
if ov is not None:
|
||||||
|
# Interrupt WaitForMultipleObjects() in _send_bytes()
|
||||||
|
ov.cancel()
|
||||||
_CloseHandle(self._handle)
|
_CloseHandle(self._handle)
|
||||||
|
|
||||||
def _send_bytes(self, buf):
|
def _send_bytes(self, buf):
|
||||||
|
if self._send_ov is not None:
|
||||||
|
# A connection should only be used by a single thread
|
||||||
|
raise ValueError("concurrent send_bytes() calls "
|
||||||
|
"are not supported")
|
||||||
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
|
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
|
||||||
|
self._send_ov = ov
|
||||||
try:
|
try:
|
||||||
if err == _winapi.ERROR_IO_PENDING:
|
if err == _winapi.ERROR_IO_PENDING:
|
||||||
waitres = _winapi.WaitForMultipleObjects(
|
waitres = _winapi.WaitForMultipleObjects(
|
||||||
|
@ -286,7 +298,13 @@ if _winapi:
|
||||||
ov.cancel()
|
ov.cancel()
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
|
self._send_ov = None
|
||||||
nwritten, err = ov.GetOverlappedResult(True)
|
nwritten, err = ov.GetOverlappedResult(True)
|
||||||
|
if err == WSA_OPERATION_ABORTED:
|
||||||
|
# close() was called by another thread while
|
||||||
|
# WaitForMultipleObjects() was waiting for the overlapped
|
||||||
|
# operation.
|
||||||
|
raise OSError(errno.EPIPE, "handle is closed")
|
||||||
assert err == 0
|
assert err == 0
|
||||||
assert nwritten == len(buf)
|
assert nwritten == len(buf)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
Fix a race condition in ``concurrent.futures``. When a process in the
|
||||||
|
process pool was terminated abruptly (while the future was running or
|
||||||
|
pending), close the connection write end. If the call queue is blocked on
|
||||||
|
sending bytes to a worker process, closing the connection write end interrupts
|
||||||
|
the send, so the queue can be closed. Patch by Victor Stinner.
|
Loading…
Add table
Add a link
Reference in a new issue