bpo-44733: Add max_tasks_per_child to ProcessPoolExecutor (GH-27373)

Co-authored-by: Antoine Pitrou <antoine@python.org>
Authored by Logan Jones on 2021-11-20 15:19:41 -05:00; committed by GitHub
parent 123a3527dd
commit fdc0e09c33
4 changed files with 98 additions and 16 deletions

Doc/library/concurrent.futures.rst

@@ -231,7 +231,7 @@ that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.
 Calling :class:`Executor` or :class:`Future` methods from a callable submitted
 to a :class:`ProcessPoolExecutor` will result in deadlock.
 
-.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
+.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)
 
    An :class:`Executor` subclass that executes calls asynchronously using a pool
    of at most *max_workers* processes. If *max_workers* is ``None`` or not
@@ -252,6 +252,11 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
    pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
    as well as any attempt to submit more jobs to the pool.
 
+   *max_tasks_per_child* is an optional argument that specifies the maximum
+   number of tasks a single process can execute before it will exit and be
+   replaced with a fresh worker process. The default *max_tasks_per_child* is
+   ``None`` which means worker processes will live as long as the pool.
+
    .. versionchanged:: 3.3
       When one of the worker processes terminates abruptly, a
      :exc:`BrokenProcessPool` error is now raised. Previously, behaviour
@@ -264,6 +269,10 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
       Added the *initializer* and *initargs* arguments.
 
+   .. versionchanged:: 3.11
+      The *max_tasks_per_child* argument was added to allow users to
+      control the lifetime of workers in the pool.
+
 .. _processpoolexecutor-example:
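For readers skimming the diff, here is the new keyword in action (a sketch of my own that mirrors the tests further down; it is not part of the commit):

    import os
    from concurrent.futures import ProcessPoolExecutor

    if __name__ == '__main__':
        # With a cap of 2, the single worker is retired after its second
        # task and a fresh process handles the remaining submissions.
        with ProcessPoolExecutor(max_workers=1, max_tasks_per_child=2) as pool:
            pids = [pool.submit(os.getpid).result() for _ in range(4)]
        print(pids)  # pids[0] == pids[1]; pids[2] and pids[3] use a new pid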

Lib/concurrent/futures/process.py

@@ -141,10 +141,11 @@ class _WorkItem(object):
         self.kwargs = kwargs
 
 class _ResultItem(object):
-    def __init__(self, work_id, exception=None, result=None):
+    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
         self.work_id = work_id
         self.exception = exception
         self.result = result
+        self.exit_pid = exit_pid
 
 class _CallItem(object):
     def __init__(self, work_id, fn, args, kwargs):
@@ -201,17 +202,19 @@ def _process_chunk(fn, chunk):
     return [fn(*args) for args in chunk]
 
-def _sendback_result(result_queue, work_id, result=None, exception=None):
+def _sendback_result(result_queue, work_id, result=None, exception=None,
+                     exit_pid=None):
     """Safely send back the given result or exception"""
     try:
         result_queue.put(_ResultItem(work_id, result=result,
-                                     exception=exception))
+                                     exception=exception, exit_pid=exit_pid))
     except BaseException as e:
         exc = _ExceptionWithTraceback(e, e.__traceback__)
-        result_queue.put(_ResultItem(work_id, exception=exc))
+        result_queue.put(_ResultItem(work_id, exception=exc,
+                                     exit_pid=exit_pid))
 
-def _process_worker(call_queue, result_queue, initializer, initargs):
+def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
     """Evaluates calls from call_queue and places the results in result_queue.
 
     This worker is run in a separate process.
@@ -232,25 +235,38 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
             # The parent will notice that the process stopped and
             # mark the pool broken
             return
+    num_tasks = 0
+    exit_pid = None
     while True:
         call_item = call_queue.get(block=True)
         if call_item is None:
             # Wake up queue management thread
             result_queue.put(os.getpid())
             return
+
+        if max_tasks is not None:
+            num_tasks += 1
+            if num_tasks >= max_tasks:
+                exit_pid = os.getpid()
+
         try:
             r = call_item.fn(*call_item.args, **call_item.kwargs)
         except BaseException as e:
             exc = _ExceptionWithTraceback(e, e.__traceback__)
-            _sendback_result(result_queue, call_item.work_id, exception=exc)
+            _sendback_result(result_queue, call_item.work_id, exception=exc,
+                             exit_pid=exit_pid)
         else:
-            _sendback_result(result_queue, call_item.work_id, result=r)
+            _sendback_result(result_queue, call_item.work_id, result=r,
+                             exit_pid=exit_pid)
             del r
 
         # Liberate the resource as soon as possible, to avoid holding onto
         # open files or shared memory that is not needed anymore
         del call_item
 
+        if exit_pid is not None:
+            return
+
 
 class _ExecutorManagerThread(threading.Thread):
     """Manages the communication between this process and the worker processes.
@@ -301,6 +317,10 @@ class _ExecutorManagerThread(threading.Thread):
         # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
         self.work_ids_queue = executor._work_ids
 
+        # Maximum number of tasks a worker process can execute before
+        # exiting safely
+        self.max_tasks_per_child = executor._max_tasks_per_child
+
         # A dict mapping work ids to _WorkItems e.g.
         #     {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
         self.pending_work_items = executor._pending_work_items
@@ -320,13 +340,21 @@ class _ExecutorManagerThread(threading.Thread):
             return
         if result_item is not None:
             self.process_result_item(result_item)
+
+            process_exited = result_item.exit_pid is not None
+            if process_exited:
+                p = self.processes.pop(result_item.exit_pid)
+                p.join()
+
             # Delete reference to result_item to avoid keeping references
             # while waiting on new results.
             del result_item
 
-            # attempt to increment idle process count
-            executor = self.executor_reference()
-            if executor is not None:
-                executor._idle_worker_semaphore.release()
+            if executor := self.executor_reference():
+                if process_exited:
+                    with self.shutdown_lock:
+                        executor._adjust_process_count()
+                else:
+                    executor._idle_worker_semaphore.release()
             del executor
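When a result arrives carrying an exit_pid, the manager joins the retired worker and, if the executor is still alive, tops the pool back up under shutdown_lock instead of releasing the idle semaphore. The net effect is that recycling never grows the pool. A quick check of my own (not part of the commit; it peeks at the private _processes dict purely for illustration):

    import os
    from concurrent.futures import ProcessPoolExecutor

    if __name__ == '__main__':
        with ProcessPoolExecutor(max_workers=2, max_tasks_per_child=1) as pool:
            for _ in range(5):
                pool.submit(os.getpid).result()
                # Each retired worker is joined and replaced, so the pool
                # never holds more than max_workers live processes.
                assert len(pool._processes) <= 2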
@@ -578,7 +606,7 @@ class BrokenProcessPool(_base.BrokenExecutor):
 
 class ProcessPoolExecutor(_base.Executor):
     def __init__(self, max_workers=None, mp_context=None,
-                 initializer=None, initargs=()):
+                 initializer=None, initargs=(), *, max_tasks_per_child=None):
         """Initializes a new ProcessPoolExecutor instance.
 
         Args:
@@ -589,6 +617,11 @@ class ProcessPoolExecutor(_base.Executor):
                 object should provide SimpleQueue, Queue and Process.
             initializer: A callable used to initialize worker processes.
             initargs: A tuple of arguments to pass to the initializer.
+            max_tasks_per_child: The maximum number of tasks a worker process
+                can complete before it will exit and be replaced with a fresh
+                worker process, to enable unused resources to be freed. The
+                default value is None, which means worker processes will live
+                as long as the executor.
         """
         _check_system_limits()
@@ -616,6 +649,13 @@ class ProcessPoolExecutor(_base.Executor):
         self._initializer = initializer
         self._initargs = initargs
 
+        if max_tasks_per_child is not None:
+            if not isinstance(max_tasks_per_child, int):
+                raise TypeError("max_tasks_per_child must be an integer")
+            elif max_tasks_per_child <= 0:
+                raise ValueError("max_tasks_per_child must be >= 1")
+        self._max_tasks_per_child = max_tasks_per_child
+
         # Management thread
         self._executor_manager_thread = None
@@ -678,7 +718,8 @@ class ProcessPoolExecutor(_base.Executor):
                 args=(self._call_queue,
                       self._result_queue,
                       self._initializer,
-                      self._initargs))
+                      self._initargs,
+                      self._max_tasks_per_child))
             p.start()
             self._processes[p.pid] = p
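Because workers are only spawned on first submit, the validation added above fires eagerly at construction time; for example (my own snippet, not part of the commit):

    from concurrent.futures import ProcessPoolExecutor

    try:
        ProcessPoolExecutor(max_tasks_per_child=0)
    except ValueError as exc:
        print(exc)   # max_tasks_per_child must be >= 1

    try:
        ProcessPoolExecutor(max_tasks_per_child='3')
    except TypeError as exc:
        print(exc)   # max_tasks_per_child must be an integer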

Lib/test/test_concurrent_futures.py

@@ -49,7 +49,6 @@ SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
 
 INITIALIZER_STATUS = 'uninitialized'
 
-
 def mul(x, y):
     return x * y
@@ -1038,6 +1037,36 @@ class ProcessPoolExecutorTest(ExecutorTest):
         self.assertLessEqual(len(executor._processes), 2)
         executor.shutdown()
 
+    def test_max_tasks_per_child(self):
+        executor = self.executor_type(1, max_tasks_per_child=3)
+        f1 = executor.submit(os.getpid)
+        original_pid = f1.result()
+        # The worker pid remains the same as the worker could be reused
+        f2 = executor.submit(os.getpid)
+        self.assertEqual(f2.result(), original_pid)
+        self.assertEqual(len(executor._processes), 1)
+        f3 = executor.submit(os.getpid)
+        self.assertEqual(f3.result(), original_pid)
+
+        # A new worker is spawned, with a statistically different pid,
+        # while the previous was reaped.
+        f4 = executor.submit(os.getpid)
+        new_pid = f4.result()
+        self.assertNotEqual(original_pid, new_pid)
+        self.assertEqual(len(executor._processes), 1)
+
+        executor.shutdown()
+
+    def test_max_tasks_early_shutdown(self):
+        executor = self.executor_type(3, max_tasks_per_child=1)
+        futures = []
+        for i in range(6):
+            futures.append(executor.submit(mul, i, i))
+        executor.shutdown()
+        for i, future in enumerate(futures):
+            self.assertEqual(future.result(), mul(i, i))
+
+
 create_executor_tests(ProcessPoolExecutorTest,
                       executor_mixins=(ProcessPoolForkMixin,
                                        ProcessPoolForkserverMixin,
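test_max_tasks_early_shutdown leans on shutdown() waiting for queued work by default; the same guarantee holds with the context-manager form (again my own sketch, not part of the commit):

    from concurrent.futures import ProcessPoolExecutor

    def mul(x, y):
        return x * y

    if __name__ == '__main__':
        with ProcessPoolExecutor(max_workers=3, max_tasks_per_child=1) as pool:
            futures = [pool.submit(mul, i, i) for i in range(6)]
        # Exiting the with-block calls shutdown(wait=True): all six futures
        # complete even though every worker exits after a single task.
        assert [f.result() for f in futures] == [i * i for i in range(6)]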

Misc/NEWS.d entry (new file)

@@ -0,0 +1,3 @@
+Add ``max_tasks_per_child`` to :class:`concurrent.futures.ProcessPoolExecutor`.
+This allows users to specify the maximum number of tasks a single process
+should execute before the process needs to be restarted.