mirror of
https://github.com/python/cpython.git
synced 2025-09-26 10:19:53 +00:00
gh-104090: Add exit code to multiprocessing ResourceTracker (GH-115410)
This builds on https://github.com/python/cpython/pull/106807, which adds a return code to ResourceTracker, to make future debugging easier. Testing this “in situ” proved difficult, since the global ResourceTracker is involved in test infrastructure. So, the tests here create a new instance and feed it fake data. --------- Co-authored-by: Yonatan Bitton <yonatan.bitton@perception-point.io> Co-authored-by: Yonatan Bitton <bityob@gmail.com> Co-authored-by: Antoine Pitrou <antoine@python.org>
This commit is contained in:
parent
b052fa381f
commit
4a9e6497c2
4 changed files with 94 additions and 7 deletions
|
@ -29,8 +29,12 @@ __all__ = ['ensure_running', 'register', 'unregister']
|
||||||
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
|
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
|
||||||
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
|
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
|
||||||
|
|
||||||
|
def cleanup_noop(name):
|
||||||
|
raise RuntimeError('noop should never be registered or cleaned up')
|
||||||
|
|
||||||
_CLEANUP_FUNCS = {
|
_CLEANUP_FUNCS = {
|
||||||
'noop': lambda: None,
|
'noop': cleanup_noop,
|
||||||
|
'dummy': lambda name: None, # Dummy resource used in tests
|
||||||
}
|
}
|
||||||
|
|
||||||
if os.name == 'posix':
|
if os.name == 'posix':
|
||||||
|
@ -61,6 +65,7 @@ class ResourceTracker(object):
|
||||||
self._lock = threading.RLock()
|
self._lock = threading.RLock()
|
||||||
self._fd = None
|
self._fd = None
|
||||||
self._pid = None
|
self._pid = None
|
||||||
|
self._exitcode = None
|
||||||
|
|
||||||
def _reentrant_call_error(self):
|
def _reentrant_call_error(self):
|
||||||
# gh-109629: this happens if an explicit call to the ResourceTracker
|
# gh-109629: this happens if an explicit call to the ResourceTracker
|
||||||
|
@ -84,9 +89,16 @@ class ResourceTracker(object):
|
||||||
os.close(self._fd)
|
os.close(self._fd)
|
||||||
self._fd = None
|
self._fd = None
|
||||||
|
|
||||||
os.waitpid(self._pid, 0)
|
_, status = os.waitpid(self._pid, 0)
|
||||||
|
|
||||||
self._pid = None
|
self._pid = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._exitcode = os.waitstatus_to_exitcode(status)
|
||||||
|
except ValueError:
|
||||||
|
# os.waitstatus_to_exitcode may raise an exception for invalid values
|
||||||
|
self._exitcode = None
|
||||||
|
|
||||||
def getfd(self):
|
def getfd(self):
|
||||||
self.ensure_running()
|
self.ensure_running()
|
||||||
return self._fd
|
return self._fd
|
||||||
|
@ -119,6 +131,7 @@ class ResourceTracker(object):
|
||||||
pass
|
pass
|
||||||
self._fd = None
|
self._fd = None
|
||||||
self._pid = None
|
self._pid = None
|
||||||
|
self._exitcode = None
|
||||||
|
|
||||||
warnings.warn('resource_tracker: process died unexpectedly, '
|
warnings.warn('resource_tracker: process died unexpectedly, '
|
||||||
'relaunching. Some resources might leak.')
|
'relaunching. Some resources might leak.')
|
||||||
|
@ -221,6 +234,8 @@ def main(fd):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
|
cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
|
||||||
|
exit_code = 0
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# keep track of registered/unregistered resources
|
# keep track of registered/unregistered resources
|
||||||
with open(fd, 'rb') as f:
|
with open(fd, 'rb') as f:
|
||||||
|
@ -242,6 +257,7 @@ def main(fd):
|
||||||
else:
|
else:
|
||||||
raise RuntimeError('unrecognized command %r' % cmd)
|
raise RuntimeError('unrecognized command %r' % cmd)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
exit_code = 3
|
||||||
try:
|
try:
|
||||||
sys.excepthook(*sys.exc_info())
|
sys.excepthook(*sys.exc_info())
|
||||||
except:
|
except:
|
||||||
|
@ -251,10 +267,17 @@ def main(fd):
|
||||||
for rtype, rtype_cache in cache.items():
|
for rtype, rtype_cache in cache.items():
|
||||||
if rtype_cache:
|
if rtype_cache:
|
||||||
try:
|
try:
|
||||||
warnings.warn(
|
exit_code = 1
|
||||||
f'resource_tracker: There appear to be {len(rtype_cache)} '
|
if rtype == 'dummy':
|
||||||
f'leaked {rtype} objects to clean up at shutdown: {rtype_cache}'
|
# The test 'dummy' resource is expected to leak.
|
||||||
)
|
# We skip the warning (and *only* the warning) for it.
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
warnings.warn(
|
||||||
|
f'resource_tracker: There appear to be '
|
||||||
|
f'{len(rtype_cache)} leaked {rtype} objects to '
|
||||||
|
f'clean up at shutdown: {rtype_cache}'
|
||||||
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
for name in rtype_cache:
|
for name in rtype_cache:
|
||||||
|
@ -265,6 +288,9 @@ def main(fd):
|
||||||
try:
|
try:
|
||||||
_CLEANUP_FUNCS[rtype](name)
|
_CLEANUP_FUNCS[rtype](name)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
exit_code = 2
|
||||||
warnings.warn('resource_tracker: %r: %s' % (name, e))
|
warnings.warn('resource_tracker: %r: %s' % (name, e))
|
||||||
finally:
|
finally:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
sys.exit(exit_code)
|
||||||
|
|
|
@ -5609,8 +5609,9 @@ class TestResourceTracker(unittest.TestCase):
|
||||||
'''
|
'''
|
||||||
for rtype in resource_tracker._CLEANUP_FUNCS:
|
for rtype in resource_tracker._CLEANUP_FUNCS:
|
||||||
with self.subTest(rtype=rtype):
|
with self.subTest(rtype=rtype):
|
||||||
if rtype == "noop":
|
if rtype in ("noop", "dummy"):
|
||||||
# Artefact resource type used by the resource_tracker
|
# Artefact resource type used by the resource_tracker
|
||||||
|
# or tests
|
||||||
continue
|
continue
|
||||||
r, w = os.pipe()
|
r, w = os.pipe()
|
||||||
p = subprocess.Popen([sys.executable,
|
p = subprocess.Popen([sys.executable,
|
||||||
|
@ -5730,6 +5731,38 @@ class TestResourceTracker(unittest.TestCase):
|
||||||
with self.assertRaises(ValueError):
|
with self.assertRaises(ValueError):
|
||||||
resource_tracker.register(too_long_name_resource, rtype)
|
resource_tracker.register(too_long_name_resource, rtype)
|
||||||
|
|
||||||
|
def _test_resource_tracker_leak_resources(self, cleanup):
|
||||||
|
# We use a separate instance for testing, since the main global
|
||||||
|
# _resource_tracker may be used to watch test infrastructure.
|
||||||
|
from multiprocessing.resource_tracker import ResourceTracker
|
||||||
|
tracker = ResourceTracker()
|
||||||
|
tracker.ensure_running()
|
||||||
|
self.assertTrue(tracker._check_alive())
|
||||||
|
|
||||||
|
self.assertIsNone(tracker._exitcode)
|
||||||
|
tracker.register('somename', 'dummy')
|
||||||
|
if cleanup:
|
||||||
|
tracker.unregister('somename', 'dummy')
|
||||||
|
expected_exit_code = 0
|
||||||
|
else:
|
||||||
|
expected_exit_code = 1
|
||||||
|
|
||||||
|
self.assertTrue(tracker._check_alive())
|
||||||
|
self.assertIsNone(tracker._exitcode)
|
||||||
|
tracker._stop()
|
||||||
|
self.assertEqual(tracker._exitcode, expected_exit_code)
|
||||||
|
|
||||||
|
def test_resource_tracker_exit_code(self):
|
||||||
|
"""
|
||||||
|
Test the exit code of the resource tracker.
|
||||||
|
|
||||||
|
If no leaked resources were found, exit code should be 0, otherwise 1
|
||||||
|
"""
|
||||||
|
for cleanup in [True, False]:
|
||||||
|
with self.subTest(cleanup=cleanup):
|
||||||
|
self._test_resource_tracker_leak_resources(
|
||||||
|
cleanup=cleanup,
|
||||||
|
)
|
||||||
|
|
||||||
class TestSimpleQueue(unittest.TestCase):
|
class TestSimpleQueue(unittest.TestCase):
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ import logging
|
||||||
import queue
|
import queue
|
||||||
import time
|
import time
|
||||||
import unittest
|
import unittest
|
||||||
|
import sys
|
||||||
from concurrent.futures._base import BrokenExecutor
|
from concurrent.futures._base import BrokenExecutor
|
||||||
from logging.handlers import QueueHandler
|
from logging.handlers import QueueHandler
|
||||||
|
|
||||||
|
@ -109,6 +110,31 @@ create_executor_tests(globals(), InitializerMixin)
|
||||||
create_executor_tests(globals(), FailingInitializerMixin)
|
create_executor_tests(globals(), FailingInitializerMixin)
|
||||||
|
|
||||||
|
|
||||||
|
@unittest.skipIf(sys.platform == "win32", "Resource Tracker doesn't run on Windows")
|
||||||
|
class FailingInitializerResourcesTest(unittest.TestCase):
|
||||||
|
"""
|
||||||
|
Source: https://github.com/python/cpython/issues/104090
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _test(self, test_class):
|
||||||
|
runner = unittest.TextTestRunner()
|
||||||
|
runner.run(test_class('test_initializer'))
|
||||||
|
|
||||||
|
# GH-104090:
|
||||||
|
# Stop the resource tracker manually now, so we can verify there are no leaked resources by checking
|
||||||
|
# the process exit code
|
||||||
|
from multiprocessing.resource_tracker import _resource_tracker
|
||||||
|
_resource_tracker._stop()
|
||||||
|
|
||||||
|
self.assertEqual(_resource_tracker._exitcode, 0)
|
||||||
|
|
||||||
|
def test_spawn(self):
|
||||||
|
self._test(ProcessPoolSpawnFailingInitializerTest)
|
||||||
|
|
||||||
|
def test_forkserver(self):
|
||||||
|
self._test(ProcessPoolForkserverFailingInitializerTest)
|
||||||
|
|
||||||
|
|
||||||
def setUpModule():
|
def setUpModule():
|
||||||
setup_module()
|
setup_module()
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
The multiprocessing resource tracker now exits with a non-zero status code if a resource
|
||||||
|
leak was detected. It still exits with status code 0 otherwise.
|
Loading…
Add table
Add a link
Reference in a new issue