gh-91231: Add shutdown_timeout to multiprocessing BaseManager (#32112)

Add an optional keyword 'shutdown_timeout' parameter to the
multiprocessing.BaseManager constructor. Kill the process if
terminate() takes longer than the timeout.

Multiprocessing tests pass test.support.SHORT_TIMEOUT
to BaseManager.shutdown_timeout.
This commit is contained in:
Victor Stinner 2022-04-19 16:27:00 +02:00 committed by GitHub
parent 74070085da
commit 061a8bf77c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 49 additions and 20 deletions

View file

@ -1676,7 +1676,7 @@ Manager processes will be shutdown as soon as they are garbage collected or
their parent process exits. The manager classes are defined in the their parent process exits. The manager classes are defined in the
:mod:`multiprocessing.managers` module: :mod:`multiprocessing.managers` module:
.. class:: BaseManager([address[, authkey]]) .. class:: BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)
Create a BaseManager object. Create a BaseManager object.
@ -1691,6 +1691,20 @@ their parent process exits. The manager classes are defined in the
*authkey* is ``None`` then ``current_process().authkey`` is used. *authkey* is ``None`` then ``current_process().authkey`` is used.
Otherwise *authkey* is used and it must be a byte string. Otherwise *authkey* is used and it must be a byte string.
*serializer* must be ``'pickle'`` (use :mod:`pickle` serialization) or
``'xmlrpclib'`` (use :mod:`xmlrpc.client` serialization).
*ctx* is a context object, or ``None`` (use the current context). See the
:func:`get_context` function.
*shutdown_timeout* is a timeout in seconds used to wait until the process
used by the manager completes in the :meth:`shutdown` method. If the
shutdown times out, the process is terminated. If terminating the process
also times out, the process is killed.
.. versionchanged: 3.11
Added the *shutdown_timeout* parameter.
.. method:: start([initializer[, initargs]]) .. method:: start([initializer[, initargs]])
Start a subprocess to start the manager. If *initializer* is not ``None`` Start a subprocess to start the manager. If *initializer* is not ``None``

View file

@ -497,7 +497,7 @@ class BaseManager(object):
_Server = Server _Server = Server
def __init__(self, address=None, authkey=None, serializer='pickle', def __init__(self, address=None, authkey=None, serializer='pickle',
ctx=None): ctx=None, *, shutdown_timeout=1.0):
if authkey is None: if authkey is None:
authkey = process.current_process().authkey authkey = process.current_process().authkey
self._address = address # XXX not final address if eg ('', 0) self._address = address # XXX not final address if eg ('', 0)
@ -507,6 +507,7 @@ class BaseManager(object):
self._serializer = serializer self._serializer = serializer
self._Listener, self._Client = listener_client[serializer] self._Listener, self._Client = listener_client[serializer]
self._ctx = ctx or get_context() self._ctx = ctx or get_context()
self._shutdown_timeout = shutdown_timeout
def get_server(self): def get_server(self):
''' '''
@ -570,8 +571,8 @@ class BaseManager(object):
self._state.value = State.STARTED self._state.value = State.STARTED
self.shutdown = util.Finalize( self.shutdown = util.Finalize(
self, type(self)._finalize_manager, self, type(self)._finalize_manager,
args=(self._process, self._address, self._authkey, args=(self._process, self._address, self._authkey, self._state,
self._state, self._Client), self._Client, self._shutdown_timeout),
exitpriority=0 exitpriority=0
) )
@ -656,7 +657,8 @@ class BaseManager(object):
self.shutdown() self.shutdown()
@staticmethod @staticmethod
def _finalize_manager(process, address, authkey, state, _Client): def _finalize_manager(process, address, authkey, state, _Client,
shutdown_timeout):
''' '''
Shutdown the manager process; will be registered as a finalizer Shutdown the manager process; will be registered as a finalizer
''' '''
@ -671,15 +673,17 @@ class BaseManager(object):
except Exception: except Exception:
pass pass
process.join(timeout=1.0) process.join(timeout=shutdown_timeout)
if process.is_alive(): if process.is_alive():
util.info('manager still alive') util.info('manager still alive')
if hasattr(process, 'terminate'): if hasattr(process, 'terminate'):
util.info('trying to `terminate()` manager process') util.info('trying to `terminate()` manager process')
process.terminate() process.terminate()
process.join(timeout=0.1) process.join(timeout=shutdown_timeout)
if process.is_alive(): if process.is_alive():
util.info('manager still alive after terminate') util.info('manager still alive after terminate')
process.kill()
process.join()
state.value = State.SHUTDOWN state.value = State.SHUTDOWN
try: try:

View file

@ -119,6 +119,9 @@ if CHECK_TIMINGS:
else: else:
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
# BaseManager.shutdown_timeout
SHUTDOWN_TIMEOUT = support.SHORT_TIMEOUT
HAVE_GETVALUE = not getattr(_multiprocessing, HAVE_GETVALUE = not getattr(_multiprocessing,
'HAVE_BROKEN_SEM_GETVALUE', False) 'HAVE_BROKEN_SEM_GETVALUE', False)
@ -2897,7 +2900,7 @@ class _TestMyManager(BaseTestCase):
ALLOWED_TYPES = ('manager',) ALLOWED_TYPES = ('manager',)
def test_mymanager(self): def test_mymanager(self):
manager = MyManager() manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start() manager.start()
self.common(manager) self.common(manager)
manager.shutdown() manager.shutdown()
@ -2908,7 +2911,8 @@ class _TestMyManager(BaseTestCase):
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
def test_mymanager_context(self): def test_mymanager_context(self):
with MyManager() as manager: manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
with manager:
self.common(manager) self.common(manager)
# bpo-30356: BaseManager._finalize_manager() sends SIGTERM # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
# to the manager process if it takes longer than 1 second to stop, # to the manager process if it takes longer than 1 second to stop,
@ -2916,7 +2920,7 @@ class _TestMyManager(BaseTestCase):
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
def test_mymanager_context_prestarted(self): def test_mymanager_context_prestarted(self):
manager = MyManager() manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start() manager.start()
with manager: with manager:
self.common(manager) self.common(manager)
@ -2978,8 +2982,8 @@ class _TestRemoteManager(BaseTestCase):
@classmethod @classmethod
def _putter(cls, address, authkey): def _putter(cls, address, authkey):
manager = QueueManager2( manager = QueueManager2(
address=address, authkey=authkey, serializer=SERIALIZER address=address, authkey=authkey, serializer=SERIALIZER,
) shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.connect() manager.connect()
queue = manager.get_queue() queue = manager.get_queue()
# Note that xmlrpclib will deserialize object as a list not a tuple # Note that xmlrpclib will deserialize object as a list not a tuple
@ -2989,8 +2993,8 @@ class _TestRemoteManager(BaseTestCase):
authkey = os.urandom(32) authkey = os.urandom(32)
manager = QueueManager( manager = QueueManager(
address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER,
) shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start() manager.start()
self.addCleanup(manager.shutdown) self.addCleanup(manager.shutdown)
@ -2999,8 +3003,8 @@ class _TestRemoteManager(BaseTestCase):
p.start() p.start()
manager2 = QueueManager2( manager2 = QueueManager2(
address=manager.address, authkey=authkey, serializer=SERIALIZER address=manager.address, authkey=authkey, serializer=SERIALIZER,
) shutdown_timeout=SHUTDOWN_TIMEOUT)
manager2.connect() manager2.connect()
queue = manager2.get_queue() queue = manager2.get_queue()
@ -3020,7 +3024,8 @@ class _TestManagerRestart(BaseTestCase):
@classmethod @classmethod
def _putter(cls, address, authkey): def _putter(cls, address, authkey):
manager = QueueManager( manager = QueueManager(
address=address, authkey=authkey, serializer=SERIALIZER) address=address, authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.connect() manager.connect()
queue = manager.get_queue() queue = manager.get_queue()
queue.put('hello world') queue.put('hello world')
@ -3028,7 +3033,8 @@ class _TestManagerRestart(BaseTestCase):
def test_rapid_restart(self): def test_rapid_restart(self):
authkey = os.urandom(32) authkey = os.urandom(32)
manager = QueueManager( manager = QueueManager(
address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER) address=(socket_helper.HOST, 0), authkey=authkey,
serializer=SERIALIZER, shutdown_timeout=SHUTDOWN_TIMEOUT)
try: try:
srvr = manager.get_server() srvr = manager.get_server()
addr = srvr.address addr = srvr.address
@ -3048,7 +3054,8 @@ class _TestManagerRestart(BaseTestCase):
manager.shutdown() manager.shutdown()
manager = QueueManager( manager = QueueManager(
address=addr, authkey=authkey, serializer=SERIALIZER) address=addr, authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)
try: try:
manager.start() manager.start()
self.addCleanup(manager.shutdown) self.addCleanup(manager.shutdown)
@ -3059,7 +3066,8 @@ class _TestManagerRestart(BaseTestCase):
# (sporadic failure on buildbots) # (sporadic failure on buildbots)
time.sleep(1.0) time.sleep(1.0)
manager = QueueManager( manager = QueueManager(
address=addr, authkey=authkey, serializer=SERIALIZER) address=addr, authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)
if hasattr(manager, "shutdown"): if hasattr(manager, "shutdown"):
self.addCleanup(manager.shutdown) self.addCleanup(manager.shutdown)

View file

@ -0,0 +1,3 @@
Add an optional keyword *shutdown_timeout* parameter to the
:class:`multiprocessing.BaseManager` constructor. Kill the process if
terminate() takes longer than the timeout. Patch by Victor Stinner.