mirror of
https://github.com/python/cpython.git
synced 2025-08-10 03:49:18 +00:00
[3.12] gh-109593: Fix reentrancy issue in multiprocessing resource_tracker (GH-109629) (#109898)
gh-109593: Fix reentrancy issue in multiprocessing resource_tracker (GH-109629)
---------
(cherry picked from commit 0eb98837b6
)
Co-authored-by: Antoine Pitrou <antoine@python.org>
Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
This commit is contained in:
parent
84f9da9ab5
commit
b723b8a13d
7 changed files with 95 additions and 2 deletions
|
@ -51,15 +51,31 @@ if os.name == 'posix':
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
|
class ReentrantCallError(RuntimeError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class ResourceTracker(object):
|
class ResourceTracker(object):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.RLock()
|
||||||
self._fd = None
|
self._fd = None
|
||||||
self._pid = None
|
self._pid = None
|
||||||
|
|
||||||
|
def _reentrant_call_error(self):
|
||||||
|
# gh-109629: this happens if an explicit call to the ResourceTracker
|
||||||
|
# gets interrupted by a garbage collection, invoking a finalizer (*)
|
||||||
|
# that itself calls back into ResourceTracker.
|
||||||
|
# (*) for example the SemLock finalizer
|
||||||
|
raise ReentrantCallError(
|
||||||
|
"Reentrant call into the multiprocessing resource tracker")
|
||||||
|
|
||||||
def _stop(self):
|
def _stop(self):
|
||||||
with self._lock:
|
with self._lock:
|
||||||
|
# This should not happen (_stop() isn't called by a finalizer)
|
||||||
|
# but we check for it anyway.
|
||||||
|
if self._lock._recursion_count() > 1:
|
||||||
|
return self._reentrant_call_error()
|
||||||
if self._fd is None:
|
if self._fd is None:
|
||||||
# not running
|
# not running
|
||||||
return
|
return
|
||||||
|
@ -81,6 +97,9 @@ class ResourceTracker(object):
|
||||||
This can be run from any process. Usually a child process will use
|
This can be run from any process. Usually a child process will use
|
||||||
the resource created by its parent.'''
|
the resource created by its parent.'''
|
||||||
with self._lock:
|
with self._lock:
|
||||||
|
if self._lock._recursion_count() > 1:
|
||||||
|
# The code below is certainly not reentrant-safe, so bail out
|
||||||
|
return self._reentrant_call_error()
|
||||||
if self._fd is not None:
|
if self._fd is not None:
|
||||||
# resource tracker was launched before, is it still running?
|
# resource tracker was launched before, is it still running?
|
||||||
if self._check_alive():
|
if self._check_alive():
|
||||||
|
@ -159,7 +178,17 @@ class ResourceTracker(object):
|
||||||
self._send('UNREGISTER', name, rtype)
|
self._send('UNREGISTER', name, rtype)
|
||||||
|
|
||||||
def _send(self, cmd, name, rtype):
|
def _send(self, cmd, name, rtype):
|
||||||
self.ensure_running()
|
try:
|
||||||
|
self.ensure_running()
|
||||||
|
except ReentrantCallError:
|
||||||
|
# The code below might or might not work, depending on whether
|
||||||
|
# the resource tracker was already running and still alive.
|
||||||
|
# Better warn the user.
|
||||||
|
# (XXX is warnings.warn itself reentrant-safe? :-)
|
||||||
|
warnings.warn(
|
||||||
|
f"ResourceTracker called reentrantly for resource cleanup, "
|
||||||
|
f"which is unsupported. "
|
||||||
|
f"The {rtype} object {name!r} might leak.")
|
||||||
msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
|
msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
|
||||||
if len(msg) > 512:
|
if len(msg) > 512:
|
||||||
# posix guarantees that writes to a pipe of less than PIPE_BUF
|
# posix guarantees that writes to a pipe of less than PIPE_BUF
|
||||||
|
@ -176,6 +205,7 @@ register = _resource_tracker.register
|
||||||
unregister = _resource_tracker.unregister
|
unregister = _resource_tracker.unregister
|
||||||
getfd = _resource_tracker.getfd
|
getfd = _resource_tracker.getfd
|
||||||
|
|
||||||
|
|
||||||
def main(fd):
|
def main(fd):
|
||||||
'''Run resource tracker.'''
|
'''Run resource tracker.'''
|
||||||
# protect the process from ^C and "killall python" etc
|
# protect the process from ^C and "killall python" etc
|
||||||
|
|
|
@ -330,6 +330,42 @@ class RLockTests(BaseLockTests):
|
||||||
lock.release()
|
lock.release()
|
||||||
self.assertRaises(RuntimeError, lock._release_save)
|
self.assertRaises(RuntimeError, lock._release_save)
|
||||||
|
|
||||||
|
def test_recursion_count(self):
|
||||||
|
lock = self.locktype()
|
||||||
|
self.assertEqual(0, lock._recursion_count())
|
||||||
|
lock.acquire()
|
||||||
|
self.assertEqual(1, lock._recursion_count())
|
||||||
|
lock.acquire()
|
||||||
|
lock.acquire()
|
||||||
|
self.assertEqual(3, lock._recursion_count())
|
||||||
|
lock.release()
|
||||||
|
self.assertEqual(2, lock._recursion_count())
|
||||||
|
lock.release()
|
||||||
|
lock.release()
|
||||||
|
self.assertEqual(0, lock._recursion_count())
|
||||||
|
|
||||||
|
phase = []
|
||||||
|
|
||||||
|
def f():
|
||||||
|
lock.acquire()
|
||||||
|
phase.append(None)
|
||||||
|
while len(phase) == 1:
|
||||||
|
_wait()
|
||||||
|
lock.release()
|
||||||
|
phase.append(None)
|
||||||
|
|
||||||
|
with threading_helper.wait_threads_exit():
|
||||||
|
start_new_thread(f, ())
|
||||||
|
while len(phase) == 0:
|
||||||
|
_wait()
|
||||||
|
self.assertEqual(len(phase), 1)
|
||||||
|
self.assertEqual(0, lock._recursion_count())
|
||||||
|
phase.append(None)
|
||||||
|
while len(phase) == 2:
|
||||||
|
_wait()
|
||||||
|
self.assertEqual(len(phase), 3)
|
||||||
|
self.assertEqual(0, lock._recursion_count())
|
||||||
|
|
||||||
def test_different_thread(self):
|
def test_different_thread(self):
|
||||||
# Cannot release from a different thread
|
# Cannot release from a different thread
|
||||||
lock = self.locktype()
|
lock = self.locktype()
|
||||||
|
|
|
@ -29,6 +29,8 @@ class ModuleLockAsRLockTests:
|
||||||
test_timeout = None
|
test_timeout = None
|
||||||
# _release_save() unsupported
|
# _release_save() unsupported
|
||||||
test_release_save_unacquired = None
|
test_release_save_unacquired = None
|
||||||
|
# _recursion_count() unsupported
|
||||||
|
test_recursion_count = None
|
||||||
# lock status in repr unsupported
|
# lock status in repr unsupported
|
||||||
test_repr = None
|
test_repr = None
|
||||||
test_locked_repr = None
|
test_locked_repr = None
|
||||||
|
|
|
@ -1751,6 +1751,9 @@ class ConditionAsRLockTests(lock_tests.RLockTests):
|
||||||
# Condition uses an RLock by default and exports its API.
|
# Condition uses an RLock by default and exports its API.
|
||||||
locktype = staticmethod(threading.Condition)
|
locktype = staticmethod(threading.Condition)
|
||||||
|
|
||||||
|
def test_recursion_count(self):
|
||||||
|
self.skipTest("Condition does not expose _recursion_count()")
|
||||||
|
|
||||||
class ConditionTests(lock_tests.ConditionTests):
|
class ConditionTests(lock_tests.ConditionTests):
|
||||||
condtype = staticmethod(threading.Condition)
|
condtype = staticmethod(threading.Condition)
|
||||||
|
|
||||||
|
|
|
@ -238,6 +238,13 @@ class _RLock:
|
||||||
def _is_owned(self):
|
def _is_owned(self):
|
||||||
return self._owner == get_ident()
|
return self._owner == get_ident()
|
||||||
|
|
||||||
|
# Internal method used for reentrancy checks
|
||||||
|
|
||||||
|
def _recursion_count(self):
|
||||||
|
if self._owner != get_ident():
|
||||||
|
return 0
|
||||||
|
return self._count
|
||||||
|
|
||||||
_PyRLock = _RLock
|
_PyRLock = _RLock
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Avoid deadlocking on a reentrant call to the multiprocessing resource tracker. Such a reentrant call, though unlikely, can happen if a GC pass invokes the finalizer for a multiprocessing object such as SemLock.
|
|
@ -486,6 +486,18 @@ PyDoc_STRVAR(rlock_release_save_doc,
|
||||||
\n\
|
\n\
|
||||||
For internal use by `threading.Condition`.");
|
For internal use by `threading.Condition`.");
|
||||||
|
|
||||||
|
static PyObject *
|
||||||
|
rlock_recursion_count(rlockobject *self, PyObject *Py_UNUSED(ignored))
|
||||||
|
{
|
||||||
|
unsigned long tid = PyThread_get_thread_ident();
|
||||||
|
return PyLong_FromUnsignedLong(
|
||||||
|
self->rlock_owner == tid ? self->rlock_count : 0UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
PyDoc_STRVAR(rlock_recursion_count_doc,
|
||||||
|
"_recursion_count() -> int\n\
|
||||||
|
\n\
|
||||||
|
For internal use by reentrancy checks.");
|
||||||
|
|
||||||
static PyObject *
|
static PyObject *
|
||||||
rlock_is_owned(rlockobject *self, PyObject *Py_UNUSED(ignored))
|
rlock_is_owned(rlockobject *self, PyObject *Py_UNUSED(ignored))
|
||||||
|
@ -561,6 +573,8 @@ static PyMethodDef rlock_methods[] = {
|
||||||
METH_VARARGS, rlock_acquire_restore_doc},
|
METH_VARARGS, rlock_acquire_restore_doc},
|
||||||
{"_release_save", (PyCFunction)rlock_release_save,
|
{"_release_save", (PyCFunction)rlock_release_save,
|
||||||
METH_NOARGS, rlock_release_save_doc},
|
METH_NOARGS, rlock_release_save_doc},
|
||||||
|
{"_recursion_count", (PyCFunction)rlock_recursion_count,
|
||||||
|
METH_NOARGS, rlock_recursion_count_doc},
|
||||||
{"__enter__", _PyCFunction_CAST(rlock_acquire),
|
{"__enter__", _PyCFunction_CAST(rlock_acquire),
|
||||||
METH_VARARGS | METH_KEYWORDS, rlock_acquire_doc},
|
METH_VARARGS | METH_KEYWORDS, rlock_acquire_doc},
|
||||||
{"__exit__", (PyCFunction)rlock_release,
|
{"__exit__", (PyCFunction)rlock_release,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue