bpo-31699 Deadlocks in concurrent.futures.ProcessPoolExecutor with pickling error (#3895)

Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor` when task arguments or results cause pickling or unpickling errors.
This should make sure that calls to the :class:`ProcessPoolExecutor` API always eventually return.
This commit is contained in:
Thomas Moreau 2018-01-05 11:15:54 +01:00 committed by Antoine Pitrou
parent 65f2a6dcc2
commit 94459fd7dc
5 changed files with 387 additions and 57 deletions

View file

@ -8,10 +8,10 @@ The follow diagram and text describe the data-flow through the system:
|======================= In-process =====================|== Out-of-process ==|
+----------+ +----------+ +--------+ +-----------+ +---------+
| | => | Work Ids | => | | => | Call Q | => | |
| | +----------+ | | +-----------+ | |
| | | ... | | | | ... | | |
| | | 6 | | | | 5, call() | | |
| | => | Work Ids | | | | Call Q | | Process |
| | +----------+ | | +-----------+ | Pool |
| | | ... | | | | ... | +---------+
| | | 6 | => | | => | 5, call() | => | |
| | | 7 | | | | ... | | |
| Process | | ... | | Local | +-----------+ | Process |
| Pool | +----------+ | Worker | | #1..n |
@ -52,6 +52,7 @@ import queue
from queue import Full
import multiprocessing as mp
from multiprocessing.connection import wait
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
@ -72,16 +73,31 @@ import traceback
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
_threads_queues = weakref.WeakKeyDictionary()
_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False
class _ThreadWakeup:
__slot__ = ["_state"]
def __init__(self):
self._reader, self._writer = mp.Pipe(duplex=False)
def wakeup(self):
self._writer.send_bytes(b"")
def clear(self):
while self._reader.poll():
self._reader.recv_bytes()
def _python_exit():
global _global_shutdown
_global_shutdown = True
items = list(_threads_queues.items())
for t, q in items:
q.put(None)
for t, q in items:
items = list(_threads_wakeups.items())
for _, thread_wakeup in items:
thread_wakeup.wakeup()
for t, _ in items:
t.join()
# Controls how many more calls than processes will be queued in the call queue.
@ -90,6 +106,7 @@ def _python_exit():
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
# Hack to embed stringification of remote traceback in local traceback
class _RemoteTraceback(Exception):
@ -132,6 +149,25 @@ class _CallItem(object):
self.kwargs = kwargs
class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
def __init__(self, max_size=0, *, ctx, pending_work_items):
self.pending_work_items = pending_work_items
super().__init__(max_size, ctx=ctx)
def _on_queue_feeder_error(self, e, obj):
if isinstance(obj, _CallItem):
tb = traceback.format_exception(type(e), e, e.__traceback__)
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
# work_item can be None if another process terminated. In this case,
# the queue_manager_thread fails all work_items with BrokenProcessPool
if work_item is not None:
work_item.future.set_exception(e)
else:
super()._on_queue_feeder_error(e, obj)
def _get_chunks(*iterables, chunksize):
""" Iterates over zip()ed iterables in chunks. """
it = zip(*iterables)
@ -152,6 +188,17 @@ def _process_chunk(fn, chunk):
"""
return [fn(*args) for args in chunk]
def _sendback_result(result_queue, work_id, result=None, exception=None):
"""Safely send back the given result or exception"""
try:
result_queue.put(_ResultItem(work_id, result=result,
exception=exception))
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
result_queue.put(_ResultItem(work_id, exception=exc))
def _process_worker(call_queue, result_queue, initializer, initargs):
"""Evaluates calls from call_queue and places the results in result_queue.
@ -183,10 +230,9 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
result_queue.put(_ResultItem(call_item.work_id, exception=exc))
_sendback_result(result_queue, call_item.work_id, exception=exc)
else:
result_queue.put(_ResultItem(call_item.work_id,
result=r))
_sendback_result(result_queue, call_item.work_id, result=r)
# Liberate the resource as soon as possible, to avoid holding onto
# open files or shared memory that is not needed anymore
@ -230,12 +276,14 @@ def _add_call_item_to_queue(pending_work_items,
del pending_work_items[work_id]
continue
def _queue_management_worker(executor_reference,
processes,
pending_work_items,
work_ids_queue,
call_queue,
result_queue):
result_queue,
thread_wakeup):
"""Manages the communication between this process and the worker processes.
This function is run in a local thread.
@ -253,6 +301,9 @@ def _queue_management_worker(executor_reference,
derived from _WorkItems for processing by the process workers.
result_queue: A ctx.SimpleQueue of _ResultItems generated by the
process workers.
thread_wakeup: A _ThreadWakeup to allow waking up the
queue_manager_thread from the main Thread and avoid deadlocks
caused by permanently locked queues.
"""
executor = None
@ -261,10 +312,21 @@ def _queue_management_worker(executor_reference,
or executor._shutdown_thread)
def shutdown_worker():
# This is an upper bound
nb_children_alive = sum(p.is_alive() for p in processes.values())
for i in range(0, nb_children_alive):
call_queue.put_nowait(None)
# This is an upper bound on the number of children alive.
n_children_alive = sum(p.is_alive() for p in processes.values())
n_children_to_stop = n_children_alive
n_sentinels_sent = 0
# Send the right number of sentinels, to make sure all children are
# properly terminated.
while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
for i in range(n_children_to_stop - n_sentinels_sent):
try:
call_queue.put_nowait(None)
n_sentinels_sent += 1
except Full:
break
n_children_alive = sum(p.is_alive() for p in processes.values())
# Release the queue's resources as soon as possible.
call_queue.close()
# If .join() is not called on the created processes then
@ -272,19 +334,37 @@ def _queue_management_worker(executor_reference,
for p in processes.values():
p.join()
reader = result_queue._reader
result_reader = result_queue._reader
wakeup_reader = thread_wakeup._reader
readers = [result_reader, wakeup_reader]
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
sentinels = [p.sentinel for p in processes.values()]
assert sentinels
ready = wait([reader] + sentinels)
if reader in ready:
result_item = reader.recv()
else:
# Wait for a result to be ready in the result_queue while checking
# that all worker processes are still running, or for a wake up
# signal send. The wake up signals come either from new tasks being
# submitted, from the executor being shutdown/gc-ed, or from the
# shutdown of the python interpreter.
worker_sentinels = [p.sentinel for p in processes.values()]
ready = wait(readers + worker_sentinels)
cause = None
is_broken = True
if result_reader in ready:
try:
result_item = result_reader.recv()
is_broken = False
except BaseException as e:
cause = traceback.format_exception(type(e), e, e.__traceback__)
elif wakeup_reader in ready:
is_broken = False
result_item = None
thread_wakeup.clear()
if is_broken:
# Mark the process pool broken so that submits fail right now.
executor = executor_reference()
if executor is not None:
@ -293,14 +373,15 @@ def _queue_management_worker(executor_reference,
'usable anymore')
executor._shutdown_thread = True
executor = None
bpe = BrokenProcessPool("A process in the process pool was "
"terminated abruptly while the future was "
"running or pending.")
if cause is not None:
bpe.__cause__ = _RemoteTraceback(
f"\n'''\n{''.join(cause)}'''")
# All futures in flight must be marked failed
for work_id, work_item in pending_work_items.items():
work_item.future.set_exception(
BrokenProcessPool(
"A process in the process pool was "
"terminated abruptly while the future was "
"running or pending."
))
work_item.future.set_exception(bpe)
# Delete references to object. See issue16284
del work_item
pending_work_items.clear()
@ -329,6 +410,9 @@ def _queue_management_worker(executor_reference,
work_item.future.set_result(result_item.result)
# Delete references to object. See issue16284
del work_item
# Delete reference to result_item
del result_item
# Check whether we should start shutting down.
executor = executor_reference()
# No more work items can be added if:
@ -348,8 +432,11 @@ def _queue_management_worker(executor_reference,
pass
executor = None
_system_limits_checked = False
_system_limited = None
def _check_system_limits():
global _system_limits_checked, _system_limited
if _system_limits_checked:
@ -369,7 +456,8 @@ def _check_system_limits():
# minimum number of semaphores available
# according to POSIX
return
_system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
_system_limited = ("system provides too few semaphores (%d"
" available, 256 necessary)" % nsems_max)
raise NotImplementedError(_system_limited)
@ -415,6 +503,7 @@ class ProcessPoolExecutor(_base.Executor):
raise ValueError("max_workers must be greater than 0")
self._max_workers = max_workers
if mp_context is None:
mp_context = mp.get_context()
self._mp_context = mp_context
@ -424,18 +513,9 @@ class ProcessPoolExecutor(_base.Executor):
self._initializer = initializer
self._initargs = initargs
# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
# because futures in the call queue cannot be cancelled.
queue_size = self._max_workers + EXTRA_QUEUED_CALLS
self._call_queue = mp_context.Queue(queue_size)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
self._call_queue._ignore_epipe = True
self._result_queue = mp_context.SimpleQueue()
self._work_ids = queue.Queue()
# Management thread
self._queue_management_thread = None
# Map of pids to processes
self._processes = {}
@ -446,12 +526,39 @@ class ProcessPoolExecutor(_base.Executor):
self._queue_count = 0
self._pending_work_items = {}
# Create communication channels for the executor
# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
# because futures in the call queue cannot be cancelled.
queue_size = self._max_workers + EXTRA_QUEUED_CALLS
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
pending_work_items=self._pending_work_items)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
self._call_queue._ignore_epipe = True
self._result_queue = mp_context.SimpleQueue()
self._work_ids = queue.Queue()
# _ThreadWakeup is a communication channel used to interrupt the wait
# of the main loop of queue_manager_thread from another thread (e.g.
# when calling executor.submit or executor.shutdown). We do not use the
# _result_queue to send the wakeup signal to the queue_manager_thread
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
self._queue_management_thread_wakeup = _ThreadWakeup()
def _start_queue_management_thread(self):
# When the executor gets lost, the weakref callback will wake up
# the queue management thread.
def weakref_cb(_, q=self._result_queue):
q.put(None)
if self._queue_management_thread is None:
# When the executor gets garbarge collected, the weakref callback
# will wake up the queue management thread so that it can terminate
# if there is no pending work item.
def weakref_cb(_,
thread_wakeup=self._queue_management_thread_wakeup):
mp.util.debug('Executor collected: triggering callback for'
' QueueManager wakeup')
thread_wakeup.wakeup()
# Start the processes so that their sentinels are known.
self._adjust_process_count()
self._queue_management_thread = threading.Thread(
@ -461,10 +568,13 @@ class ProcessPoolExecutor(_base.Executor):
self._pending_work_items,
self._work_ids,
self._call_queue,
self._result_queue))
self._result_queue,
self._queue_management_thread_wakeup),
name="QueueManagerThread")
self._queue_management_thread.daemon = True
self._queue_management_thread.start()
_threads_queues[self._queue_management_thread] = self._result_queue
_threads_wakeups[self._queue_management_thread] = \
self._queue_management_thread_wakeup
def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers):
@ -491,7 +601,7 @@ class ProcessPoolExecutor(_base.Executor):
self._work_ids.put(self._queue_count)
self._queue_count += 1
# Wake up queue management thread
self._result_queue.put(None)
self._queue_management_thread_wakeup.wakeup()
self._start_queue_management_thread()
return f
@ -531,7 +641,7 @@ class ProcessPoolExecutor(_base.Executor):
self._shutdown_thread = True
if self._queue_management_thread:
# Wake up queue management thread
self._result_queue.put(None)
self._queue_management_thread_wakeup.wakeup()
if wait:
self._queue_management_thread.join()
# To reduce the risk of opening too many files, remove references to