mirror of
https://github.com/python/cpython.git
synced 2025-07-23 03:05:38 +00:00
bpo-36867: Make semaphore_tracker track other system resources (GH-13222)
The multiprocessing.resource_tracker replaces the multiprocessing.semaphore_tracker module. Other than semaphores, resource_tracker also tracks shared_memory segments. Patch by Pierre Glaser.
This commit is contained in:
parent
d0d64ad1f5
commit
f22cc69b01
9 changed files with 209 additions and 124 deletions
|
@ -11,7 +11,7 @@ import warnings
|
|||
from . import connection
|
||||
from . import process
|
||||
from .context import reduction
|
||||
from . import semaphore_tracker
|
||||
from . import resource_tracker
|
||||
from . import spawn
|
||||
from . import util
|
||||
|
||||
|
@ -69,7 +69,7 @@ class ForkServer(object):
|
|||
parent_r, child_w = os.pipe()
|
||||
child_r, parent_w = os.pipe()
|
||||
allfds = [child_r, child_w, self._forkserver_alive_fd,
|
||||
semaphore_tracker.getfd()]
|
||||
resource_tracker.getfd()]
|
||||
allfds += fds
|
||||
try:
|
||||
reduction.sendfds(client, allfds)
|
||||
|
@ -90,7 +90,7 @@ class ForkServer(object):
|
|||
ensure_running() will do nothing.
|
||||
'''
|
||||
with self._lock:
|
||||
semaphore_tracker.ensure_running()
|
||||
resource_tracker.ensure_running()
|
||||
if self._forkserver_pid is not None:
|
||||
# forkserver was launched before, is it still running?
|
||||
pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
|
||||
|
@ -290,7 +290,7 @@ def _serve_one(child_r, fds, unused_fds, handlers):
|
|||
os.close(fd)
|
||||
|
||||
(_forkserver._forkserver_alive_fd,
|
||||
semaphore_tracker._semaphore_tracker._fd,
|
||||
resource_tracker._resource_tracker._fd,
|
||||
*_forkserver._inherited_fds) = fds
|
||||
|
||||
# Run process object received over pipe
|
||||
|
|
|
@ -36,8 +36,8 @@ class Popen(popen_fork.Popen):
|
|||
return fd
|
||||
|
||||
def _launch(self, process_obj):
|
||||
from . import semaphore_tracker
|
||||
tracker_fd = semaphore_tracker.getfd()
|
||||
from . import resource_tracker
|
||||
tracker_fd = resource_tracker.getfd()
|
||||
self._fds.append(tracker_fd)
|
||||
prep_data = spawn.get_preparation_data(process_obj._name)
|
||||
fp = io.BytesIO()
|
||||
|
|
|
@ -1,15 +1,19 @@
|
|||
###############################################################################
|
||||
# Server process to keep track of unlinked resources (like shared memory
|
||||
# segments, semaphores etc.) and clean them.
|
||||
#
|
||||
# On Unix we run a server process which keeps track of unlinked
|
||||
# semaphores. The server ignores SIGINT and SIGTERM and reads from a
|
||||
# resources. The server ignores SIGINT and SIGTERM and reads from a
|
||||
# pipe. Every other process of the program has a copy of the writable
|
||||
# end of the pipe, so we get EOF when all other processes have exited.
|
||||
# Then the server process unlinks any remaining semaphore names.
|
||||
#
|
||||
# This is important because the system only supports a limited number
|
||||
# of named semaphores, and they will not be automatically removed till
|
||||
# the next reboot. Without this semaphore tracker process, "killall
|
||||
# python" would probably leave unlinked semaphores.
|
||||
# Then the server process unlinks any remaining resource names.
|
||||
#
|
||||
# This is important because there may be system limits for such resources: for
|
||||
# instance, the system only supports a limited number of named semaphores, and
|
||||
# shared-memory segments live in the RAM. If a python process leaks such a
|
||||
# resource, this resource will not be removed till the next reboot. Without
|
||||
# this resource tracker process, "killall python" would probably leave unlinked
|
||||
# resources.
|
||||
|
||||
import os
|
||||
import signal
|
||||
|
@ -17,6 +21,7 @@ import sys
|
|||
import threading
|
||||
import warnings
|
||||
import _multiprocessing
|
||||
import _posixshmem
|
||||
|
||||
from . import spawn
|
||||
from . import util
|
||||
|
@ -26,8 +31,14 @@ __all__ = ['ensure_running', 'register', 'unregister']
|
|||
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
|
||||
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
|
||||
|
||||
_CLEANUP_FUNCS = {
|
||||
'noop': lambda: None,
|
||||
'semaphore': _multiprocessing.sem_unlink,
|
||||
'shared_memory': _posixshmem.shm_unlink
|
||||
}
|
||||
|
||||
class SemaphoreTracker(object):
|
||||
|
||||
class ResourceTracker(object):
|
||||
|
||||
def __init__(self):
|
||||
self._lock = threading.Lock()
|
||||
|
@ -39,13 +50,13 @@ class SemaphoreTracker(object):
|
|||
return self._fd
|
||||
|
||||
def ensure_running(self):
|
||||
'''Make sure that semaphore tracker process is running.
|
||||
'''Make sure that resource tracker process is running.
|
||||
|
||||
This can be run from any process. Usually a child process will use
|
||||
the semaphore created by its parent.'''
|
||||
the resource created by its parent.'''
|
||||
with self._lock:
|
||||
if self._fd is not None:
|
||||
# semaphore tracker was launched before, is it still running?
|
||||
# resource tracker was launched before, is it still running?
|
||||
if self._check_alive():
|
||||
# => still alive
|
||||
return
|
||||
|
@ -55,24 +66,24 @@ class SemaphoreTracker(object):
|
|||
# Clean-up to avoid dangling processes.
|
||||
try:
|
||||
# _pid can be None if this process is a child from another
|
||||
# python process, which has started the semaphore_tracker.
|
||||
# python process, which has started the resource_tracker.
|
||||
if self._pid is not None:
|
||||
os.waitpid(self._pid, 0)
|
||||
except ChildProcessError:
|
||||
# The semaphore_tracker has already been terminated.
|
||||
# The resource_tracker has already been terminated.
|
||||
pass
|
||||
self._fd = None
|
||||
self._pid = None
|
||||
|
||||
warnings.warn('semaphore_tracker: process died unexpectedly, '
|
||||
'relaunching. Some semaphores might leak.')
|
||||
warnings.warn('resource_tracker: process died unexpectedly, '
|
||||
'relaunching. Some resources might leak.')
|
||||
|
||||
fds_to_pass = []
|
||||
try:
|
||||
fds_to_pass.append(sys.stderr.fileno())
|
||||
except Exception:
|
||||
pass
|
||||
cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)'
|
||||
cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
|
||||
r, w = os.pipe()
|
||||
try:
|
||||
fds_to_pass.append(r)
|
||||
|
@ -107,23 +118,23 @@ class SemaphoreTracker(object):
|
|||
try:
|
||||
# We cannot use send here as it calls ensure_running, creating
|
||||
# a cycle.
|
||||
os.write(self._fd, b'PROBE:0\n')
|
||||
os.write(self._fd, b'PROBE:0:noop\n')
|
||||
except OSError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def register(self, name):
|
||||
'''Register name of semaphore with semaphore tracker.'''
|
||||
self._send('REGISTER', name)
|
||||
def register(self, name, rtype):
|
||||
'''Register name of resource with resource tracker.'''
|
||||
self._send('REGISTER', name, rtype)
|
||||
|
||||
def unregister(self, name):
|
||||
'''Unregister name of semaphore with semaphore tracker.'''
|
||||
self._send('UNREGISTER', name)
|
||||
def unregister(self, name, rtype):
|
||||
'''Unregister name of resource with resource tracker.'''
|
||||
self._send('UNREGISTER', name, rtype)
|
||||
|
||||
def _send(self, cmd, name):
|
||||
def _send(self, cmd, name, rtype):
|
||||
self.ensure_running()
|
||||
msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
|
||||
msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
|
||||
if len(name) > 512:
|
||||
# posix guarantees that writes to a pipe of less than PIPE_BUF
|
||||
# bytes are atomic, and that PIPE_BUF >= 512
|
||||
|
@ -133,14 +144,14 @@ class SemaphoreTracker(object):
|
|||
nbytes, len(msg))
|
||||
|
||||
|
||||
_semaphore_tracker = SemaphoreTracker()
|
||||
ensure_running = _semaphore_tracker.ensure_running
|
||||
register = _semaphore_tracker.register
|
||||
unregister = _semaphore_tracker.unregister
|
||||
getfd = _semaphore_tracker.getfd
|
||||
_resource_tracker = ResourceTracker()
|
||||
ensure_running = _resource_tracker.ensure_running
|
||||
register = _resource_tracker.register
|
||||
unregister = _resource_tracker.unregister
|
||||
getfd = _resource_tracker.getfd
|
||||
|
||||
def main(fd):
|
||||
'''Run semaphore tracker.'''
|
||||
'''Run resource tracker.'''
|
||||
# protect the process from ^C and "killall python" etc
|
||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
||||
|
@ -153,18 +164,24 @@ def main(fd):
|
|||
except Exception:
|
||||
pass
|
||||
|
||||
cache = set()
|
||||
cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
|
||||
try:
|
||||
# keep track of registered/unregistered semaphores
|
||||
# keep track of registered/unregistered resources
|
||||
with open(fd, 'rb') as f:
|
||||
for line in f:
|
||||
try:
|
||||
cmd, name = line.strip().split(b':')
|
||||
if cmd == b'REGISTER':
|
||||
cache.add(name)
|
||||
elif cmd == b'UNREGISTER':
|
||||
cache.remove(name)
|
||||
elif cmd == b'PROBE':
|
||||
cmd, name, rtype = line.strip().decode('ascii').split(':')
|
||||
cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
|
||||
if cleanup_func is None:
|
||||
raise ValueError(
|
||||
f'Cannot register {name} for automatic cleanup: '
|
||||
f'unknown resource type {rtype}')
|
||||
|
||||
if cmd == 'REGISTER':
|
||||
cache[rtype].add(name)
|
||||
elif cmd == 'UNREGISTER':
|
||||
cache[rtype].remove(name)
|
||||
elif cmd == 'PROBE':
|
||||
pass
|
||||
else:
|
||||
raise RuntimeError('unrecognized command %r' % cmd)
|
||||
|
@ -174,23 +191,23 @@ def main(fd):
|
|||
except:
|
||||
pass
|
||||
finally:
|
||||
# all processes have terminated; cleanup any remaining semaphores
|
||||
if cache:
|
||||
try:
|
||||
warnings.warn('semaphore_tracker: There appear to be %d '
|
||||
'leaked semaphores to clean up at shutdown' %
|
||||
len(cache))
|
||||
except Exception:
|
||||
pass
|
||||
for name in cache:
|
||||
# For some reason the process which created and registered this
|
||||
# semaphore has failed to unregister it. Presumably it has died.
|
||||
# We therefore unlink it.
|
||||
try:
|
||||
name = name.decode('ascii')
|
||||
# all processes have terminated; cleanup any remaining resources
|
||||
for rtype, rtype_cache in cache.items():
|
||||
if rtype_cache:
|
||||
try:
|
||||
_multiprocessing.sem_unlink(name)
|
||||
except Exception as e:
|
||||
warnings.warn('semaphore_tracker: %r: %s' % (name, e))
|
||||
finally:
|
||||
pass
|
||||
warnings.warn('resource_tracker: There appear to be %d '
|
||||
'leaked %s objects to clean up at shutdown' %
|
||||
(len(rtype_cache), rtype))
|
||||
except Exception:
|
||||
pass
|
||||
for name in rtype_cache:
|
||||
# For some reason the process which created and registered this
|
||||
# resource has failed to unregister it. Presumably it has
|
||||
# died. We therefore unlink it.
|
||||
try:
|
||||
try:
|
||||
_CLEANUP_FUNCS[rtype](name)
|
||||
except Exception as e:
|
||||
warnings.warn('resource_tracker: %r: %s' % (name, e))
|
||||
finally:
|
||||
pass
|
|
@ -113,6 +113,9 @@ class SharedMemory:
|
|||
self.unlink()
|
||||
raise
|
||||
|
||||
from .resource_tracker import register
|
||||
register(self._name, "shared_memory")
|
||||
|
||||
else:
|
||||
|
||||
# Windows Named Shared Memory
|
||||
|
@ -231,7 +234,9 @@ class SharedMemory:
|
|||
called once (and only once) across all processes which have access
|
||||
to the shared memory block."""
|
||||
if _USE_POSIX and self._name:
|
||||
from .resource_tracker import unregister
|
||||
_posixshmem.shm_unlink(self._name)
|
||||
unregister(self._name, "shared_memory")
|
||||
|
||||
|
||||
_encoding = "utf8"
|
||||
|
|
|
@ -111,8 +111,8 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
|
|||
_winapi.CloseHandle(source_process)
|
||||
fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
|
||||
else:
|
||||
from . import semaphore_tracker
|
||||
semaphore_tracker._semaphore_tracker._fd = tracker_fd
|
||||
from . import resource_tracker
|
||||
resource_tracker._resource_tracker._fd = tracker_fd
|
||||
fd = pipe_handle
|
||||
exitcode = _main(fd)
|
||||
sys.exit(exitcode)
|
||||
|
|
|
@ -76,16 +76,16 @@ class SemLock(object):
|
|||
# We only get here if we are on Unix with forking
|
||||
# disabled. When the object is garbage collected or the
|
||||
# process shuts down we unlink the semaphore name
|
||||
from .semaphore_tracker import register
|
||||
register(self._semlock.name)
|
||||
from .resource_tracker import register
|
||||
register(self._semlock.name, "semaphore")
|
||||
util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
|
||||
exitpriority=0)
|
||||
|
||||
@staticmethod
|
||||
def _cleanup(name):
|
||||
from .semaphore_tracker import unregister
|
||||
from .resource_tracker import unregister
|
||||
sem_unlink(name)
|
||||
unregister(name)
|
||||
unregister(name, "semaphore")
|
||||
|
||||
def _make_methods(self):
|
||||
self.acquire = self._semlock.acquire
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue