mirror of
https://github.com/python/cpython.git
synced 2025-11-24 12:20:42 +00:00
gh-131788: make resource_tracker re-entrant safe (GH-131787)
* make resource_tracker re-entrant safe * Update Lib/multiprocessing/resource_tracker.py * trim trailing whitespace * use f-string and args = [x, *y, z] * raise self._reentrant_call_error --------- Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com> Co-authored-by: Gregory P. Smith <greg@krypto.org>
This commit is contained in:
parent
a10152f8fd
commit
f24a012350
2 changed files with 95 additions and 71 deletions
|
|
@ -20,6 +20,7 @@ import signal
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
import warnings
|
import warnings
|
||||||
|
from collections import deque
|
||||||
|
|
||||||
from . import spawn
|
from . import spawn
|
||||||
from . import util
|
from . import util
|
||||||
|
|
@ -62,6 +63,7 @@ class ResourceTracker(object):
|
||||||
self._fd = None
|
self._fd = None
|
||||||
self._pid = None
|
self._pid = None
|
||||||
self._exitcode = None
|
self._exitcode = None
|
||||||
|
self._reentrant_messages = deque()
|
||||||
|
|
||||||
def _reentrant_call_error(self):
|
def _reentrant_call_error(self):
|
||||||
# gh-109629: this happens if an explicit call to the ResourceTracker
|
# gh-109629: this happens if an explicit call to the ResourceTracker
|
||||||
|
|
@ -98,7 +100,7 @@ class ResourceTracker(object):
|
||||||
# This shouldn't happen (it might when called by a finalizer)
|
# This shouldn't happen (it might when called by a finalizer)
|
||||||
# so we check for it anyway.
|
# so we check for it anyway.
|
||||||
if self._lock._recursion_count() > 1:
|
if self._lock._recursion_count() > 1:
|
||||||
return self._reentrant_call_error()
|
raise self._reentrant_call_error()
|
||||||
if self._fd is None:
|
if self._fd is None:
|
||||||
# not running
|
# not running
|
||||||
return
|
return
|
||||||
|
|
@ -128,69 +130,99 @@ class ResourceTracker(object):
|
||||||
|
|
||||||
This can be run from any process. Usually a child process will use
|
This can be run from any process. Usually a child process will use
|
||||||
the resource created by its parent.'''
|
the resource created by its parent.'''
|
||||||
|
return self._ensure_running_and_write()
|
||||||
|
|
||||||
|
def _teardown_dead_process(self):
|
||||||
|
os.close(self._fd)
|
||||||
|
|
||||||
|
# Clean-up to avoid dangling processes.
|
||||||
|
try:
|
||||||
|
# _pid can be None if this process is a child from another
|
||||||
|
# python process, which has started the resource_tracker.
|
||||||
|
if self._pid is not None:
|
||||||
|
os.waitpid(self._pid, 0)
|
||||||
|
except ChildProcessError:
|
||||||
|
# The resource_tracker has already been terminated.
|
||||||
|
pass
|
||||||
|
self._fd = None
|
||||||
|
self._pid = None
|
||||||
|
self._exitcode = None
|
||||||
|
|
||||||
|
warnings.warn('resource_tracker: process died unexpectedly, '
|
||||||
|
'relaunching. Some resources might leak.')
|
||||||
|
|
||||||
|
def _launch(self):
|
||||||
|
fds_to_pass = []
|
||||||
|
try:
|
||||||
|
fds_to_pass.append(sys.stderr.fileno())
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
r, w = os.pipe()
|
||||||
|
try:
|
||||||
|
fds_to_pass.append(r)
|
||||||
|
# process will out live us, so no need to wait on pid
|
||||||
|
exe = spawn.get_executable()
|
||||||
|
args = [
|
||||||
|
exe,
|
||||||
|
*util._args_from_interpreter_flags(),
|
||||||
|
'-c',
|
||||||
|
f'from multiprocessing.resource_tracker import main;main({r})',
|
||||||
|
]
|
||||||
|
# bpo-33613: Register a signal mask that will block the signals.
|
||||||
|
# This signal mask will be inherited by the child that is going
|
||||||
|
# to be spawned and will protect the child from a race condition
|
||||||
|
# that can make the child die before it registers signal handlers
|
||||||
|
# for SIGINT and SIGTERM. The mask is unregistered after spawning
|
||||||
|
# the child.
|
||||||
|
prev_sigmask = None
|
||||||
|
try:
|
||||||
|
if _HAVE_SIGMASK:
|
||||||
|
prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
|
||||||
|
pid = util.spawnv_passfds(exe, args, fds_to_pass)
|
||||||
|
finally:
|
||||||
|
if prev_sigmask is not None:
|
||||||
|
signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
|
||||||
|
except:
|
||||||
|
os.close(w)
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
self._fd = w
|
||||||
|
self._pid = pid
|
||||||
|
finally:
|
||||||
|
os.close(r)
|
||||||
|
|
||||||
|
def _ensure_running_and_write(self, msg=None):
|
||||||
with self._lock:
|
with self._lock:
|
||||||
if self._lock._recursion_count() > 1:
|
if self._lock._recursion_count() > 1:
|
||||||
# The code below is certainly not reentrant-safe, so bail out
|
# The code below is certainly not reentrant-safe, so bail out
|
||||||
return self._reentrant_call_error()
|
if msg is None:
|
||||||
|
raise self._reentrant_call_error()
|
||||||
|
return self._reentrant_messages.append(msg)
|
||||||
|
|
||||||
if self._fd is not None:
|
if self._fd is not None:
|
||||||
# resource tracker was launched before, is it still running?
|
# resource tracker was launched before, is it still running?
|
||||||
if self._check_alive():
|
if msg is None:
|
||||||
# => still alive
|
to_send = b'PROBE:0:noop\n'
|
||||||
return
|
else:
|
||||||
# => dead, launch it again
|
to_send = msg
|
||||||
os.close(self._fd)
|
|
||||||
|
|
||||||
# Clean-up to avoid dangling processes.
|
|
||||||
try:
|
try:
|
||||||
# _pid can be None if this process is a child from another
|
self._write(to_send)
|
||||||
# python process, which has started the resource_tracker.
|
except OSError:
|
||||||
if self._pid is not None:
|
self._teardown_dead_process()
|
||||||
os.waitpid(self._pid, 0)
|
self._launch()
|
||||||
except ChildProcessError:
|
|
||||||
# The resource_tracker has already been terminated.
|
|
||||||
pass
|
|
||||||
self._fd = None
|
|
||||||
self._pid = None
|
|
||||||
self._exitcode = None
|
|
||||||
|
|
||||||
warnings.warn('resource_tracker: process died unexpectedly, '
|
msg = None # message was sent in probe
|
||||||
'relaunching. Some resources might leak.')
|
|
||||||
|
|
||||||
fds_to_pass = []
|
|
||||||
try:
|
|
||||||
fds_to_pass.append(sys.stderr.fileno())
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
|
|
||||||
r, w = os.pipe()
|
|
||||||
try:
|
|
||||||
fds_to_pass.append(r)
|
|
||||||
# process will out live us, so no need to wait on pid
|
|
||||||
exe = spawn.get_executable()
|
|
||||||
args = [exe] + util._args_from_interpreter_flags()
|
|
||||||
args += ['-c', cmd % r]
|
|
||||||
# bpo-33613: Register a signal mask that will block the signals.
|
|
||||||
# This signal mask will be inherited by the child that is going
|
|
||||||
# to be spawned and will protect the child from a race condition
|
|
||||||
# that can make the child die before it registers signal handlers
|
|
||||||
# for SIGINT and SIGTERM. The mask is unregistered after spawning
|
|
||||||
# the child.
|
|
||||||
prev_sigmask = None
|
|
||||||
try:
|
|
||||||
if _HAVE_SIGMASK:
|
|
||||||
prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
|
|
||||||
pid = util.spawnv_passfds(exe, args, fds_to_pass)
|
|
||||||
finally:
|
|
||||||
if prev_sigmask is not None:
|
|
||||||
signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
|
|
||||||
except:
|
|
||||||
os.close(w)
|
|
||||||
raise
|
|
||||||
else:
|
else:
|
||||||
self._fd = w
|
self._launch()
|
||||||
self._pid = pid
|
|
||||||
finally:
|
while True:
|
||||||
os.close(r)
|
try:
|
||||||
|
reentrant_msg = self._reentrant_messages.popleft()
|
||||||
|
except IndexError:
|
||||||
|
break
|
||||||
|
self._write(reentrant_msg)
|
||||||
|
if msg is not None:
|
||||||
|
self._write(msg)
|
||||||
|
|
||||||
def _check_alive(self):
|
def _check_alive(self):
|
||||||
'''Check that the pipe has not been closed by sending a probe.'''
|
'''Check that the pipe has not been closed by sending a probe.'''
|
||||||
|
|
@ -211,27 +243,18 @@ class ResourceTracker(object):
|
||||||
'''Unregister name of resource with resource tracker.'''
|
'''Unregister name of resource with resource tracker.'''
|
||||||
self._send('UNREGISTER', name, rtype)
|
self._send('UNREGISTER', name, rtype)
|
||||||
|
|
||||||
|
def _write(self, msg):
|
||||||
|
nbytes = os.write(self._fd, msg)
|
||||||
|
assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"
|
||||||
|
|
||||||
def _send(self, cmd, name, rtype):
|
def _send(self, cmd, name, rtype):
|
||||||
try:
|
msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
|
||||||
self.ensure_running()
|
|
||||||
except ReentrantCallError:
|
|
||||||
# The code below might or might not work, depending on whether
|
|
||||||
# the resource tracker was already running and still alive.
|
|
||||||
# Better warn the user.
|
|
||||||
# (XXX is warnings.warn itself reentrant-safe? :-)
|
|
||||||
warnings.warn(
|
|
||||||
f"ResourceTracker called reentrantly for resource cleanup, "
|
|
||||||
f"which is unsupported. "
|
|
||||||
f"The {rtype} object {name!r} might leak.")
|
|
||||||
msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
|
|
||||||
if len(msg) > 512:
|
if len(msg) > 512:
|
||||||
# posix guarantees that writes to a pipe of less than PIPE_BUF
|
# posix guarantees that writes to a pipe of less than PIPE_BUF
|
||||||
# bytes are atomic, and that PIPE_BUF >= 512
|
# bytes are atomic, and that PIPE_BUF >= 512
|
||||||
raise ValueError('msg too long')
|
raise ValueError('msg too long')
|
||||||
nbytes = os.write(self._fd, msg)
|
|
||||||
assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
|
|
||||||
nbytes, len(msg))
|
|
||||||
|
|
||||||
|
self._ensure_running_and_write(msg)
|
||||||
|
|
||||||
_resource_tracker = ResourceTracker()
|
_resource_tracker = ResourceTracker()
|
||||||
ensure_running = _resource_tracker.ensure_running
|
ensure_running = _resource_tracker.ensure_running
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
Make ``ResourceTracker.send`` from :mod:`multiprocessing` re-entrant safe
|
||||||
Loading…
Add table
Add a link
Reference in a new issue