[3.12] gh-105829: Fix concurrent.futures.ProcessPoolExecutor deadlock (GH-108513) (#109784)

This fixes issue GH-105829, https://github.com/python/cpython/issues/105829
(cherry picked from commit 405b06375a)
This commit is contained in:
elfstrom 2023-10-02 17:24:19 +02:00 committed by GitHub
parent 5e6e99646e
commit c2cadb0ec2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 87 additions and 4 deletions

View file

@ -71,6 +71,11 @@ class _ThreadWakeup:
self._reader, self._writer = mp.Pipe(duplex=False) self._reader, self._writer = mp.Pipe(duplex=False)
def close(self): def close(self):
# Please note that we do not take the shutdown lock when
# calling clear() (to avoid deadlocking) so this method can
# only be called safely from the same thread as all calls to
# clear() even if you hold the shutdown lock. Otherwise we
# might try to read from the closed pipe.
if not self._closed: if not self._closed:
self._closed = True self._closed = True
self._writer.close() self._writer.close()
@ -426,8 +431,12 @@ class _ExecutorManagerThread(threading.Thread):
elif wakeup_reader in ready: elif wakeup_reader in ready:
is_broken = False is_broken = False
with self.shutdown_lock: # No need to hold the _shutdown_lock here because:
self.thread_wakeup.clear() # 1. we're the only thread to use the wakeup reader
# 2. we're also the only thread to call thread_wakeup.close()
# 3. we want to avoid a possible deadlock when both reader and writer
# would block (gh-105829)
self.thread_wakeup.clear()
return result_item, is_broken, cause return result_item, is_broken, cause
@ -710,7 +719,10 @@ class ProcessPoolExecutor(_base.Executor):
# as it could result in a deadlock if a worker process dies with the # as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired. # _result_queue write lock still acquired.
# #
# _shutdown_lock must be locked to access _ThreadWakeup. # _shutdown_lock must be locked to access _ThreadWakeup.close() and
# .wakeup(). Care must also be taken to not call clear or close from
# more than one thread since _ThreadWakeup.clear() is not protected by
# the _shutdown_lock
self._executor_manager_thread_wakeup = _ThreadWakeup() self._executor_manager_thread_wakeup = _ThreadWakeup()
# Create communication channels for the executor # Create communication channels for the executor

View file

@ -1,10 +1,13 @@
import contextlib import contextlib
import queue
import signal
import sys import sys
import time import time
import unittest import unittest
import unittest.mock
from pickle import PicklingError from pickle import PicklingError
from concurrent import futures from concurrent import futures
from concurrent.futures.process import BrokenProcessPool from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup
from test import support from test import support
@ -239,6 +242,73 @@ class ExecutorDeadlockTest:
with self.assertRaises(BrokenProcessPool): with self.assertRaises(BrokenProcessPool):
list(executor.map(_crash_with_data, [data] * 10)) list(executor.map(_crash_with_data, [data] * 10))
def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
# Issue #105829: The _ExecutorManagerThread wakeup pipe could
# fill up and block. See: https://github.com/python/cpython/issues/105829
# Lots of cargo culting while writing this test, apologies if
# something is really stupid...
self.executor.shutdown(wait=True)
if not hasattr(signal, 'alarm'):
raise unittest.SkipTest(
"Tested platform does not support the alarm signal")
def timeout(_signum, _frame):
import faulthandler
faulthandler.dump_traceback()
raise RuntimeError("timed out while submitting jobs?")
thread_run = futures.process._ExecutorManagerThread.run
def mock_run(self):
# Delay thread startup so the wakeup pipe can fill up and block
time.sleep(3)
thread_run(self)
class MockWakeup(_ThreadWakeup):
"""Mock wakeup object to force the wakeup to block"""
def __init__(self):
super().__init__()
self._dummy_queue = queue.Queue(maxsize=1)
def wakeup(self):
self._dummy_queue.put(None, block=True)
super().wakeup()
def clear(self):
try:
while True:
self._dummy_queue.get_nowait()
except queue.Empty:
super().clear()
with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
'run', mock_run),
unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
MockWakeup)):
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
job_num = 100
job_data = range(job_num)
# Need to use sigalarm for timeout detection because
# Executor.submit is not guarded by any timeout (both
# self._work_ids.put(self._queue_count) and
# self._executor_manager_thread_wakeup.wakeup() might
# timeout, maybe more?). In this specific case it was
# the wakeup call that deadlocked on a blocking pipe.
old_handler = signal.signal(signal.SIGALRM, timeout)
try:
signal.alarm(int(self.TIMEOUT))
self.assertEqual(job_num, len(list(executor.map(int, job_data))))
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
create_executor_tests(globals(), ExecutorDeadlockTest, create_executor_tests(globals(), ExecutorDeadlockTest,
executor_mixins=(ProcessPoolForkMixin, executor_mixins=(ProcessPoolForkMixin,

View file

@ -0,0 +1 @@
Fix concurrent.futures.ProcessPoolExecutor deadlock