mirror of
https://github.com/python/cpython.git
synced 2025-08-31 05:58:33 +00:00
test_multiprocessing detects dangling per test case (#2841)
bpo-26762: test_multiprocessing now detects dangling processes and threads per test case classes: * setUpClass()/tearDownClass() of mixin classes now check if multiprocessing.process._dangling or threading._dangling was modified to detect "dangling" processses and threads. * ManagerMixin.tearDownClass() now also emits a warning if it still has more than one active child process after 5 seconds. * tearDownModule() now checks for dangling processes and threads before sleep 500 ms. And it now only sleeps if there is a least one dangling process or thread.
This commit is contained in:
parent
d7e64d9934
commit
ffb49408f0
1 changed files with 67 additions and 14 deletions
|
@ -4303,7 +4303,32 @@ class TestSimpleQueue(unittest.TestCase):
|
||||||
# Mixins
|
# Mixins
|
||||||
#
|
#
|
||||||
|
|
||||||
class ProcessesMixin(object):
|
class BaseMixin(object):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
cls.dangling = (multiprocessing.process._dangling.copy(),
|
||||||
|
threading._dangling.copy())
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def tearDownClass(cls):
|
||||||
|
# bpo-26762: Some multiprocessing objects like Pool create reference
|
||||||
|
# cycles. Trigger a garbage collection to break these cycles.
|
||||||
|
test.support.gc_collect()
|
||||||
|
|
||||||
|
processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
|
||||||
|
if processes:
|
||||||
|
print('Warning -- Dangling processes: %s' % processes,
|
||||||
|
file=sys.stderr)
|
||||||
|
processes = None
|
||||||
|
|
||||||
|
threads = set(threading._dangling) - set(cls.dangling[1])
|
||||||
|
if threads:
|
||||||
|
print('Warning -- Dangling threads: %s' % threads,
|
||||||
|
file=sys.stderr)
|
||||||
|
threads = None
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessesMixin(BaseMixin):
|
||||||
TYPE = 'processes'
|
TYPE = 'processes'
|
||||||
Process = multiprocessing.Process
|
Process = multiprocessing.Process
|
||||||
connection = multiprocessing.connection
|
connection = multiprocessing.connection
|
||||||
|
@ -4326,7 +4351,7 @@ class ProcessesMixin(object):
|
||||||
RawArray = staticmethod(multiprocessing.RawArray)
|
RawArray = staticmethod(multiprocessing.RawArray)
|
||||||
|
|
||||||
|
|
||||||
class ManagerMixin(object):
|
class ManagerMixin(BaseMixin):
|
||||||
TYPE = 'manager'
|
TYPE = 'manager'
|
||||||
Process = multiprocessing.Process
|
Process = multiprocessing.Process
|
||||||
Queue = property(operator.attrgetter('manager.Queue'))
|
Queue = property(operator.attrgetter('manager.Queue'))
|
||||||
|
@ -4350,6 +4375,7 @@ class ManagerMixin(object):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def setUpClass(cls):
|
def setUpClass(cls):
|
||||||
|
super().setUpClass()
|
||||||
cls.manager = multiprocessing.Manager()
|
cls.manager = multiprocessing.Manager()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -4357,23 +4383,35 @@ class ManagerMixin(object):
|
||||||
# only the manager process should be returned by active_children()
|
# only the manager process should be returned by active_children()
|
||||||
# but this can take a bit on slow machines, so wait a few seconds
|
# but this can take a bit on slow machines, so wait a few seconds
|
||||||
# if there are other children too (see #17395)
|
# if there are other children too (see #17395)
|
||||||
|
start_time = time.monotonic()
|
||||||
t = 0.01
|
t = 0.01
|
||||||
while len(multiprocessing.active_children()) > 1 and t < 5:
|
while len(multiprocessing.active_children()) > 1:
|
||||||
time.sleep(t)
|
time.sleep(t)
|
||||||
t *= 2
|
t *= 2
|
||||||
|
dt = time.monotonic() - start_time
|
||||||
|
if dt >= 5.0:
|
||||||
|
print("Warning -- multiprocessing.Manager still has %s active "
|
||||||
|
"children after %s seconds"
|
||||||
|
% (multiprocessing.active_children(), dt),
|
||||||
|
file=sys.stderr)
|
||||||
|
break
|
||||||
|
|
||||||
gc.collect() # do garbage collection
|
gc.collect() # do garbage collection
|
||||||
if cls.manager._number_of_objects() != 0:
|
if cls.manager._number_of_objects() != 0:
|
||||||
# This is not really an error since some tests do not
|
# This is not really an error since some tests do not
|
||||||
# ensure that all processes which hold a reference to a
|
# ensure that all processes which hold a reference to a
|
||||||
# managed object have been joined.
|
# managed object have been joined.
|
||||||
print('Shared objects which still exist at manager shutdown:')
|
print('Warning -- Shared objects which still exist at manager '
|
||||||
|
'shutdown:')
|
||||||
print(cls.manager._debug_info())
|
print(cls.manager._debug_info())
|
||||||
cls.manager.shutdown()
|
cls.manager.shutdown()
|
||||||
cls.manager.join()
|
cls.manager.join()
|
||||||
cls.manager = None
|
cls.manager = None
|
||||||
|
|
||||||
|
super().tearDownClass()
|
||||||
|
|
||||||
class ThreadsMixin(object):
|
|
||||||
|
class ThreadsMixin(BaseMixin):
|
||||||
TYPE = 'threads'
|
TYPE = 'threads'
|
||||||
Process = multiprocessing.dummy.Process
|
Process = multiprocessing.dummy.Process
|
||||||
connection = multiprocessing.dummy.connection
|
connection = multiprocessing.dummy.connection
|
||||||
|
@ -4450,18 +4488,33 @@ def install_tests_in_module_dict(remote_globs, start_method):
|
||||||
multiprocessing.get_logger().setLevel(LOG_LEVEL)
|
multiprocessing.get_logger().setLevel(LOG_LEVEL)
|
||||||
|
|
||||||
def tearDownModule():
|
def tearDownModule():
|
||||||
|
need_sleep = False
|
||||||
|
|
||||||
|
# bpo-26762: Some multiprocessing objects like Pool create reference
|
||||||
|
# cycles. Trigger a garbage collection to break these cycles.
|
||||||
|
test.support.gc_collect()
|
||||||
|
|
||||||
multiprocessing.set_start_method(old_start_method[0], force=True)
|
multiprocessing.set_start_method(old_start_method[0], force=True)
|
||||||
# pause a bit so we don't get warning about dangling threads/processes
|
# pause a bit so we don't get warning about dangling threads/processes
|
||||||
time.sleep(0.5)
|
processes = set(multiprocessing.process._dangling) - set(dangling[0])
|
||||||
|
if processes:
|
||||||
|
need_sleep = True
|
||||||
|
print('Warning -- Dangling processes: %s' % processes,
|
||||||
|
file=sys.stderr)
|
||||||
|
processes = None
|
||||||
|
|
||||||
|
threads = set(threading._dangling) - set(dangling[1])
|
||||||
|
if threads:
|
||||||
|
need_sleep = True
|
||||||
|
print('Warning -- Dangling threads: %s' % threads,
|
||||||
|
file=sys.stderr)
|
||||||
|
threads = None
|
||||||
|
|
||||||
|
# Sleep 500 ms to give time to child processes to complete.
|
||||||
|
if need_sleep:
|
||||||
|
time.sleep(0.5)
|
||||||
multiprocessing.process._cleanup()
|
multiprocessing.process._cleanup()
|
||||||
gc.collect()
|
test.support.gc_collect()
|
||||||
tmp = set(multiprocessing.process._dangling) - set(dangling[0])
|
|
||||||
if tmp:
|
|
||||||
print('Dangling processes:', tmp, file=sys.stderr)
|
|
||||||
del tmp
|
|
||||||
tmp = set(threading._dangling) - set(dangling[1])
|
|
||||||
if tmp:
|
|
||||||
print('Dangling threads:', tmp, file=sys.stderr)
|
|
||||||
|
|
||||||
remote_globs['setUpModule'] = setUpModule
|
remote_globs['setUpModule'] = setUpModule
|
||||||
remote_globs['tearDownModule'] = tearDownModule
|
remote_globs['tearDownModule'] = tearDownModule
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue