gh-128041: Add terminate_workers and kill_workers methods to ProcessPoolExecutor (GH-128043)

This adds two new methods to `multiprocessing`'s `ProcessPoolExecutor`:
- **`terminate_workers()`**: forcefully terminates worker processes using `Process.terminate()`
- **`kill_workers()`**: forcefully kills worker processes using `Process.kill()`

These methods provide users with a direct way to stop worker processes without `shutdown()` or relying on implementation details, addressing situations where immediate termination is needed.

Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com>
Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
Commit-message-mostly-authored-by: Claude Sonnet 3.7 (because why not -greg)
This commit is contained in:
Charles Machalow 2025-03-02 18:01:45 -08:00 committed by GitHub
parent 7afa476874
commit f97e4098ff
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 201 additions and 0 deletions

View file

@ -415,6 +415,30 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
require the *fork* start method for :class:`ProcessPoolExecutor` you must require the *fork* start method for :class:`ProcessPoolExecutor` you must
explicitly pass ``mp_context=multiprocessing.get_context("fork")``. explicitly pass ``mp_context=multiprocessing.get_context("fork")``.
.. method:: terminate_workers()
Attempt to terminate all living worker processes immediately by calling
:meth:`Process.terminate <multiprocessing.Process.terminate>` on each of them.
Internally, it will also call :meth:`Executor.shutdown` to ensure that all
other resources associated with the executor are freed.
After calling this method the caller should no longer submit tasks to the
executor.
.. versionadded:: next
.. method:: kill_workers()
Attempt to kill all living worker processes immediately by calling
:meth:`Process.kill <multiprocessing.Process.kill>` on each of them.
Internally, it will also call :meth:`Executor.shutdown` to ensure that all
other resources associated with the executor are freed.
After calling this method the caller should no longer submit tasks to the
executor.
.. versionadded:: next
.. _processpoolexecutor-example: .. _processpoolexecutor-example:
ProcessPoolExecutor Example ProcessPoolExecutor Example

View file

@ -444,6 +444,11 @@ contextvars
* Support context manager protocol by :class:`contextvars.Token`. * Support context manager protocol by :class:`contextvars.Token`.
(Contributed by Andrew Svetlov in :gh:`129889`.) (Contributed by Andrew Svetlov in :gh:`129889`.)
* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and
:meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as
ways to terminate or kill all living worker processes in the given pool.
(Contributed by Charles Machalow in :gh:`128043`.)
ctypes ctypes
------ ------

View file

@ -626,6 +626,14 @@ class BrokenProcessPool(_base.BrokenExecutor):
while a future was in the running state. while a future was in the running state.
""" """
_TERMINATE = "terminate"
_KILL = "kill"
_SHUTDOWN_CALLBACK_OPERATION = {
_TERMINATE,
_KILL
}
class ProcessPoolExecutor(_base.Executor): class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None, mp_context=None, def __init__(self, max_workers=None, mp_context=None,
@ -855,3 +863,66 @@ class ProcessPoolExecutor(_base.Executor):
self._executor_manager_thread_wakeup = None self._executor_manager_thread_wakeup = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__ shutdown.__doc__ = _base.Executor.shutdown.__doc__
def _force_shutdown(self, operation):
"""Attempts to terminate or kill the executor's workers based off the
given operation. Iterates through all of the current processes and
performs the relevant task if the process is still alive.
After terminating workers, the pool will be in a broken state
and no longer usable (for instance, new tasks should not be
submitted).
"""
if operation not in _SHUTDOWN_CALLBACK_OPERATION:
raise ValueError(f"Unsupported operation: {operation!r}")
processes = {}
if self._processes:
processes = self._processes.copy()
# shutdown will invalidate ._processes, so we copy it right before
# calling. If we waited here, we would deadlock if a process decides not
# to exit.
self.shutdown(wait=False, cancel_futures=True)
if not processes:
return
for proc in processes.values():
try:
if not proc.is_alive():
continue
except ValueError:
# The process is already exited/closed out.
continue
try:
if operation == _TERMINATE:
proc.terminate()
elif operation == _KILL:
proc.kill()
except ProcessLookupError:
# The process just ended before our signal
continue
def terminate_workers(self):
"""Attempts to terminate the executor's workers.
Iterates through all of the current worker processes and terminates
each one that is still alive.
After terminating workers, the pool will be in a broken state
and no longer usable (for instance, new tasks should not be
submitted).
"""
return self._force_shutdown(operation=_TERMINATE)
def kill_workers(self):
"""Attempts to kill the executor's workers.
Iterates through all of the current worker processes and kills
each one that is still alive.
After killing workers, the pool will be in a broken state
and no longer usable (for instance, new tasks should not be
submitted).
"""
return self._force_shutdown(operation=_KILL)

View file

@ -1,13 +1,17 @@
import os import os
import queue
import signal
import sys import sys
import threading import threading
import time import time
import unittest import unittest
import unittest.mock
from concurrent import futures from concurrent import futures
from concurrent.futures.process import BrokenProcessPool from concurrent.futures.process import BrokenProcessPool
from test import support from test import support
from test.support import hashlib_helper from test.support import hashlib_helper
from test.test_importlib.metadata.fixtures import parameterize
from .executor import ExecutorTest, mul from .executor import ExecutorTest, mul
from .util import ( from .util import (
@ -22,6 +26,19 @@ class EventfulGCObj():
def __del__(self): def __del__(self):
self.event.set() self.event.set()
TERMINATE_WORKERS = futures.ProcessPoolExecutor.terminate_workers.__name__
KILL_WORKERS = futures.ProcessPoolExecutor.kill_workers.__name__
FORCE_SHUTDOWN_PARAMS = [
dict(function_name=TERMINATE_WORKERS),
dict(function_name=KILL_WORKERS),
]
def _put_sleep_put(queue):
""" Used as part of test_terminate_workers """
queue.put('started')
time.sleep(2)
queue.put('finished')
class ProcessPoolExecutorTest(ExecutorTest): class ProcessPoolExecutorTest(ExecutorTest):
@ -218,6 +235,86 @@ class ProcessPoolExecutorTest(ExecutorTest):
list(executor.map(mul, [(2, 3)] * 10)) list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown() executor.shutdown()
def test_terminate_workers(self):
mock_fn = unittest.mock.Mock()
with self.executor_type(max_workers=1) as executor:
executor._force_shutdown = mock_fn
executor.terminate_workers()
mock_fn.assert_called_once_with(operation=futures.process._TERMINATE)
def test_kill_workers(self):
mock_fn = unittest.mock.Mock()
with self.executor_type(max_workers=1) as executor:
executor._force_shutdown = mock_fn
executor.kill_workers()
mock_fn.assert_called_once_with(operation=futures.process._KILL)
def test_force_shutdown_workers_invalid_op(self):
with self.executor_type(max_workers=1) as executor:
self.assertRaises(ValueError,
executor._force_shutdown,
operation='invalid operation'),
@parameterize(*FORCE_SHUTDOWN_PARAMS)
def test_force_shutdown_workers(self, function_name):
manager = self.get_context().Manager()
q = manager.Queue()
with self.executor_type(max_workers=1) as executor:
executor.submit(_put_sleep_put, q)
# We should get started, but not finished since we'll terminate the
# workers just after
self.assertEqual(q.get(timeout=5), 'started')
worker_process = list(executor._processes.values())[0]
getattr(executor, function_name)()
worker_process.join()
if function_name == TERMINATE_WORKERS or \
sys.platform == 'win32':
# On windows, kill and terminate both send SIGTERM
self.assertEqual(worker_process.exitcode, -signal.SIGTERM)
elif function_name == KILL_WORKERS:
self.assertEqual(worker_process.exitcode, -signal.SIGKILL)
else:
self.fail(f"Unknown operation: {function_name}")
self.assertRaises(queue.Empty, q.get, timeout=1)
@parameterize(*FORCE_SHUTDOWN_PARAMS)
def test_force_shutdown_workers_dead_workers(self, function_name):
with self.executor_type(max_workers=1) as executor:
future = executor.submit(os._exit, 1)
self.assertRaises(BrokenProcessPool, future.result)
# even though the pool is broken, this shouldn't raise
getattr(executor, function_name)()
@parameterize(*FORCE_SHUTDOWN_PARAMS)
def test_force_shutdown_workers_not_started_yet(self, function_name):
ctx = self.get_context()
with unittest.mock.patch.object(ctx, 'Process') as mock_process:
with self.executor_type(max_workers=1, mp_context=ctx) as executor:
# The worker has not been started yet, terminate/kill_workers
# should basically no-op
getattr(executor, function_name)()
mock_process.return_value.kill.assert_not_called()
mock_process.return_value.terminate.assert_not_called()
@parameterize(*FORCE_SHUTDOWN_PARAMS)
def test_force_shutdown_workers_stops_pool(self, function_name):
with self.executor_type(max_workers=1) as executor:
task = executor.submit(time.sleep, 0)
self.assertIsNone(task.result())
getattr(executor, function_name)()
self.assertRaises(RuntimeError, executor.submit, time.sleep, 0)
create_executor_tests(globals(), ProcessPoolExecutorTest, create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin, executor_mixins=(ProcessPoolForkMixin,

View file

@ -0,0 +1,4 @@
Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and
:meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as
ways to terminate or kill all living worker processes in the given pool.
(Contributed by Charles Machalow in :gh:`128043`.)