gh-90622: Prevent max_tasks_per_child use with a fork mp_context. (#91587)

Prevent `max_tasks_per_child` use with a "fork" mp_context to avoid deadlocks.

Also default to "spawn" when no mp_context is supplied, for safety and convenience.
Gregory P. Smith 2022-05-06 00:04:53 -07:00 committed by GitHub
parent 2b563f1ad3
commit fa4f0a134e
4 changed files with 43 additions and 11 deletions
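
For illustration only (not part of the commit), here is a minimal sketch of the resulting behaviour: with *max_tasks_per_child* set and no *mp_context*, the executor now picks the "spawn" start method, and an explicit "fork" context is rejected. It assumes a platform where the "fork" start method exists (e.g. Linux) and peeks at the private `_mp_context` attribute only to show which method was chosen.

```python
# Illustrative sketch, not part of the commit. Assumes "fork" is available
# on this platform (e.g. Linux); _mp_context is private and read here only
# to show which start method was selected.
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    # With max_tasks_per_child set and no mp_context, the executor now
    # defaults to the "spawn" start method instead of the platform default.
    executor = ProcessPoolExecutor(max_workers=1, max_tasks_per_child=2)
    print(executor._mp_context.get_start_method())  # -> "spawn"
    executor.shutdown()

    # Combining max_tasks_per_child with an explicit "fork" context is rejected.
    try:
        ProcessPoolExecutor(max_workers=1,
                            mp_context=mp.get_context("fork"),
                            max_tasks_per_child=2)
    except ValueError as exc:
        print(exc)
```

The rationale (see the NEWS entry below) is that mixing threads with fork can hang the child processes, so "spawn" is used as the safe default when no context is given.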


@@ -254,8 +254,11 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
 *max_tasks_per_child* is an optional argument that specifies the maximum
 number of tasks a single process can execute before it will exit and be
-replaced with a fresh worker process. The default *max_tasks_per_child* is
-``None`` which means worker processes will live as long as the pool.
+replaced with a fresh worker process. By default *max_tasks_per_child* is
+``None`` which means worker processes will live as long as the pool. When
+a max is specified, the "spawn" multiprocessing start method will be used by
+default in the absence of an *mp_context* parameter. This feature is
+incompatible with the "fork" start method.
 
 .. versionchanged:: 3.3
    When one of the worker processes terminates abruptly, a


@@ -617,14 +617,16 @@ class ProcessPoolExecutor(_base.Executor):
                 execute the given calls. If None or not given then as many
                 worker processes will be created as the machine has processors.
             mp_context: A multiprocessing context to launch the workers. This
-                object should provide SimpleQueue, Queue and Process.
+                object should provide SimpleQueue, Queue and Process. Useful
+                to allow specific multiprocessing start methods.
             initializer: A callable used to initialize worker processes.
             initargs: A tuple of arguments to pass to the initializer.
-            max_tasks_per_child: The maximum number of tasks a worker process can
-                complete before it will exit and be replaced with a fresh
-                worker process, to enable unused resources to be freed. The
-                default value is None, which means worker process will live
-                as long as the executor will live.
+            max_tasks_per_child: The maximum number of tasks a worker process
+                can complete before it will exit and be replaced with a fresh
+                worker process. The default of None means worker process will
+                live as long as the executor. Requires a non-'fork' mp_context
+                start method. When given, we default to using 'spawn' if no
+                mp_context is supplied.
         """
         _check_system_limits()
@@ -644,6 +646,9 @@ class ProcessPoolExecutor(_base.Executor):
         self._max_workers = max_workers
 
         if mp_context is None:
-            mp_context = mp.get_context()
+            if max_tasks_per_child is not None:
+                mp_context = mp.get_context("spawn")
+            else:
+                mp_context = mp.get_context()
         self._mp_context = mp_context
@@ -657,6 +662,11 @@ class ProcessPoolExecutor(_base.Executor):
                 raise TypeError("max_tasks_per_child must be an integer")
             elif max_tasks_per_child <= 0:
                 raise ValueError("max_tasks_per_child must be >= 1")
+            if self._mp_context.get_start_method(allow_none=False) == "fork":
+                # https://github.com/python/cpython/issues/90622
+                raise ValueError("max_tasks_per_child is incompatible with"
+                                 " the 'fork' multiprocessing start method;"
+                                 " supply a different mp_context.")
         self._max_tasks_per_child = max_tasks_per_child
 
         # Management thread
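
The new ValueError asks callers to supply a different *mp_context*. Below is a sketch of that workaround, using "forkserver" purely as an example of a non-"fork" start method (only available on platforms that support it, e.g. Linux):

```python
# Sketch of the workaround the error message points to: pass a non-"fork"
# mp_context explicitly so max_tasks_per_child remains usable.
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    ctx = mp.get_context("forkserver")
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx,
                             max_tasks_per_child=10) as executor:
        print(executor.submit(pow, 2, 10).result())  # 1024
```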


@@ -1039,10 +1039,15 @@ class ProcessPoolExecutorTest(ExecutorTest):
         executor.shutdown()
 
     def test_max_tasks_per_child(self):
+        context = self.get_context()
+        if context.get_start_method(allow_none=False) == "fork":
+            with self.assertRaises(ValueError):
+                self.executor_type(1, mp_context=context, max_tasks_per_child=3)
+            return
         # not using self.executor as we need to control construction.
         # arguably this could go in another class w/o that mixin.
         executor = self.executor_type(
-            1, mp_context=self.get_context(), max_tasks_per_child=3)
+            1, mp_context=context, max_tasks_per_child=3)
         f1 = executor.submit(os.getpid)
         original_pid = f1.result()
         # The worker pid remains the same as the worker could be reused
@@ -1061,11 +1066,20 @@ class ProcessPoolExecutorTest(ExecutorTest):
         executor.shutdown()
 
+    def test_max_tasks_per_child_defaults_to_spawn_context(self):
+        # not using self.executor as we need to control construction.
+        # arguably this could go in another class w/o that mixin.
+        executor = self.executor_type(1, max_tasks_per_child=3)
+        self.assertEqual(executor._mp_context.get_start_method(), "spawn")
+
     def test_max_tasks_early_shutdown(self):
+        context = self.get_context()
+        if context.get_start_method(allow_none=False) == "fork":
+            raise unittest.SkipTest("Incompatible with the fork start method.")
         # not using self.executor as we need to control construction.
         # arguably this could go in another class w/o that mixin.
         executor = self.executor_type(
-            3, mp_context=self.get_context(), max_tasks_per_child=1)
+            3, mp_context=context, max_tasks_per_child=1)
         futures = []
         for i in range(6):
             futures.append(executor.submit(mul, i, i))


@@ -0,0 +1,5 @@
+In ``concurrent.futures.process.ProcessPoolExecutor`` disallow the "fork"
+multiprocessing start method when the new ``max_tasks_per_child`` feature is
+used, as the mix of threads+fork can hang the child processes. Default to
+using the safe "spawn" start method in that circumstance if no
+``mp_context`` was supplied.