[3.6] bpo-26762, bpo-31019: Backport multiprocessing fixes from master to 3.6 (#2879)

* bpo-26762: Avoid daemon process in _test_multiprocessing (#2842)

test_level() of _test_multiprocessing._TestLogging now uses regular
processes rather than daemon processes to prevent zombie processes
(to not "leak" processes).
(cherry picked from commit 06634950c5)

* test_multiprocessing: Fix dangling process/thread (#2850)

bpo-26762: Fix more dangling processes and threads in
test_multiprocessing:

* Queue: call close() followed by join_thread()
* Process: call join() or self.addCleanup(p.join)
(cherry picked from commit d7e64d9934)

* test_multiprocessing detects dangling per test case (#2841)

bpo-26762: test_multiprocessing now detects dangling processes and
threads per test case classes:

* setUpClass()/tearDownClass() of mixin classes now check if
  multiprocessing.process._dangling or threading._dangling was
  modified to detect "dangling" processes and threads.
* ManagerMixin.tearDownClass() now also emits a warning if it still
  has more than one active child process after 5 seconds.
* tearDownModule() now checks for dangling processes and threads
  before sleeping for 500 ms. And it now only sleeps if there is at
  least one dangling process or thread.
(cherry picked from commit ffb49408f0)

* bpo-26762: test_multiprocessing close more queues (#2855)

* Explicitly close queues to make sure that we don't leave dangling
  threads
* test_queue_in_process(): remove unused queue
* test_access() joins also the process to fix a random warning
(cherry picked from commit b4c52966c8)

* bpo-31019: Fix multiprocessing.Process.is_alive() (#2875)

multiprocessing.Process.is_alive() now removes the process from the
_children set if the process completed.

The change prevents leaking "dangling" processes.
(cherry picked from commit 2db64823c2)
This commit is contained in:
Victor Stinner 2017-07-26 04:48:56 +02:00 committed by GitHub
parent efe9fcbd2c
commit d0adfb25c5
2 changed files with 111 additions and 24 deletions

View file

@ -132,10 +132,16 @@ class BaseProcess(object):
if self is _current_process: if self is _current_process:
return True return True
assert self._parent_pid == os.getpid(), 'can only test a child process' assert self._parent_pid == os.getpid(), 'can only test a child process'
if self._popen is None: if self._popen is None:
return False return False
self._popen.poll()
return self._popen.returncode is None returncode = self._popen.poll()
if returncode is None:
return True
else:
_children.discard(self)
return False
@property @property
def name(self): def name(self):

View file

@ -32,11 +32,12 @@ test.support.import_module('multiprocessing.synchronize')
# without thread support. # without thread support.
import threading import threading
import multiprocessing.dummy
import multiprocessing.connection import multiprocessing.connection
import multiprocessing.managers import multiprocessing.dummy
import multiprocessing.heap import multiprocessing.heap
import multiprocessing.managers
import multiprocessing.pool import multiprocessing.pool
import multiprocessing.queues
from multiprocessing import util from multiprocessing import util
@ -64,6 +65,13 @@ except ImportError:
def latin(s): def latin(s):
return s.encode('latin') return s.encode('latin')
def close_queue(queue):
if isinstance(queue, multiprocessing.queues.Queue):
queue.close()
queue.join_thread()
# #
# Constants # Constants
# #
@ -275,6 +283,7 @@ class _TestProcess(BaseTestCase):
self.assertEqual(p.exitcode, 0) self.assertEqual(p.exitcode, 0)
self.assertEqual(p.is_alive(), False) self.assertEqual(p.is_alive(), False)
self.assertNotIn(p, self.active_children()) self.assertNotIn(p, self.active_children())
close_queue(q)
@classmethod @classmethod
def _test_terminate(cls): def _test_terminate(cls):
@ -414,6 +423,7 @@ class _TestProcess(BaseTestCase):
p.join() p.join()
self.assertIs(wr(), None) self.assertIs(wr(), None)
self.assertEqual(q.get(), 5) self.assertEqual(q.get(), 5)
close_queue(q)
# #
@ -600,6 +610,7 @@ class _TestQueue(BaseTestCase):
self.assertEqual(queue_full(queue, MAXSIZE), False) self.assertEqual(queue_full(queue, MAXSIZE), False)
proc.join() proc.join()
close_queue(queue)
@classmethod @classmethod
def _test_get(cls, queue, child_can_start, parent_can_continue): def _test_get(cls, queue, child_can_start, parent_can_continue):
@ -662,6 +673,7 @@ class _TestQueue(BaseTestCase):
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
proc.join() proc.join()
close_queue(queue)
@classmethod @classmethod
def _test_fork(cls, queue): def _test_fork(cls, queue):
@ -697,6 +709,7 @@ class _TestQueue(BaseTestCase):
self.assertRaises(pyqueue.Empty, queue.get, False) self.assertRaises(pyqueue.Empty, queue.get, False)
p.join() p.join()
close_queue(queue)
def test_qsize(self): def test_qsize(self):
q = self.Queue() q = self.Queue()
@ -712,6 +725,7 @@ class _TestQueue(BaseTestCase):
self.assertEqual(q.qsize(), 1) self.assertEqual(q.qsize(), 1)
q.get() q.get()
self.assertEqual(q.qsize(), 0) self.assertEqual(q.qsize(), 0)
close_queue(q)
@classmethod @classmethod
def _test_task_done(cls, q): def _test_task_done(cls, q):
@ -739,6 +753,7 @@ class _TestQueue(BaseTestCase):
for p in workers: for p in workers:
p.join() p.join()
close_queue(queue)
def test_no_import_lock_contention(self): def test_no_import_lock_contention(self):
with test.support.temp_cwd(): with test.support.temp_cwd():
@ -769,6 +784,7 @@ class _TestQueue(BaseTestCase):
# Tolerate a delta of 30 ms because of the bad clock resolution on # Tolerate a delta of 30 ms because of the bad clock resolution on
# Windows (usually 15.6 ms) # Windows (usually 15.6 ms)
self.assertGreaterEqual(delta, 0.170) self.assertGreaterEqual(delta, 0.170)
close_queue(q)
def test_queue_feeder_donot_stop_onexc(self): def test_queue_feeder_donot_stop_onexc(self):
# bpo-30414: verify feeder handles exceptions correctly # bpo-30414: verify feeder handles exceptions correctly
@ -782,7 +798,9 @@ class _TestQueue(BaseTestCase):
q = self.Queue() q = self.Queue()
q.put(NotSerializable()) q.put(NotSerializable())
q.put(True) q.put(True)
self.assertTrue(q.get(timeout=0.1)) # bpo-30595: use a timeout of 1 second for slow buildbots
self.assertTrue(q.get(timeout=1.0))
close_queue(q)
# #
# #
@ -895,10 +913,12 @@ class _TestCondition(BaseTestCase):
p = self.Process(target=self.f, args=(cond, sleeping, woken)) p = self.Process(target=self.f, args=(cond, sleeping, woken))
p.daemon = True p.daemon = True
p.start() p.start()
self.addCleanup(p.join)
p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
p.daemon = True p.daemon = True
p.start() p.start()
self.addCleanup(p.join)
# wait for both children to start sleeping # wait for both children to start sleeping
sleeping.acquire() sleeping.acquire()
@ -941,11 +961,13 @@ class _TestCondition(BaseTestCase):
args=(cond, sleeping, woken, TIMEOUT1)) args=(cond, sleeping, woken, TIMEOUT1))
p.daemon = True p.daemon = True
p.start() p.start()
self.addCleanup(p.join)
t = threading.Thread(target=self.f, t = threading.Thread(target=self.f,
args=(cond, sleeping, woken, TIMEOUT1)) args=(cond, sleeping, woken, TIMEOUT1))
t.daemon = True t.daemon = True
t.start() t.start()
self.addCleanup(t.join)
# wait for them all to sleep # wait for them all to sleep
for i in range(6): for i in range(6):
@ -964,10 +986,12 @@ class _TestCondition(BaseTestCase):
p = self.Process(target=self.f, args=(cond, sleeping, woken)) p = self.Process(target=self.f, args=(cond, sleeping, woken))
p.daemon = True p.daemon = True
p.start() p.start()
self.addCleanup(p.join)
t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
t.daemon = True t.daemon = True
t.start() t.start()
self.addCleanup(t.join)
# wait for them to all sleep # wait for them to all sleep
for i in range(6): for i in range(6):
@ -1143,6 +1167,7 @@ class _TestEvent(BaseTestCase):
p.daemon = True p.daemon = True
p.start() p.start()
self.assertEqual(wait(), True) self.assertEqual(wait(), True)
p.join()
# #
# Tests for Barrier - adapted from tests in test/lock_tests.py # Tests for Barrier - adapted from tests in test/lock_tests.py
@ -1318,6 +1343,7 @@ class _TestBarrier(BaseTestCase):
self.run_threads(self._test_wait_return_f, (self.barrier, queue)) self.run_threads(self._test_wait_return_f, (self.barrier, queue))
results = [queue.get() for i in range(self.N)] results = [queue.get() for i in range(self.N)]
self.assertEqual(results.count(0), 1) self.assertEqual(results.count(0), 1)
close_queue(queue)
@classmethod @classmethod
def _test_action_f(cls, barrier, results): def _test_action_f(cls, barrier, results):
@ -1488,6 +1514,7 @@ class _TestBarrier(BaseTestCase):
p = self.Process(target=self._test_thousand_f, p = self.Process(target=self._test_thousand_f,
args=(self.barrier, passes, child_conn, lock)) args=(self.barrier, passes, child_conn, lock))
p.start() p.start()
self.addCleanup(p.join)
for i in range(passes): for i in range(passes):
for j in range(self.N): for j in range(self.N):
@ -2971,6 +2998,8 @@ class _TestPicklingConnections(BaseTestCase):
w.close() w.close()
self.assertEqual(conn.recv(), 'foobar'*2) self.assertEqual(conn.recv(), 'foobar'*2)
p.join()
# #
# #
# #
@ -3296,16 +3325,16 @@ class _TestLogging(BaseTestCase):
logger.setLevel(LEVEL1) logger.setLevel(LEVEL1)
p = self.Process(target=self._test_level, args=(writer,)) p = self.Process(target=self._test_level, args=(writer,))
p.daemon = True
p.start() p.start()
self.assertEqual(LEVEL1, reader.recv()) self.assertEqual(LEVEL1, reader.recv())
p.join()
logger.setLevel(logging.NOTSET) logger.setLevel(logging.NOTSET)
root_logger.setLevel(LEVEL2) root_logger.setLevel(LEVEL2)
p = self.Process(target=self._test_level, args=(writer,)) p = self.Process(target=self._test_level, args=(writer,))
p.daemon = True
p.start() p.start()
self.assertEqual(LEVEL2, reader.recv()) self.assertEqual(LEVEL2, reader.recv())
p.join()
root_logger.setLevel(root_level) root_logger.setLevel(root_level)
logger.setLevel(level=LOG_LEVEL) logger.setLevel(level=LOG_LEVEL)
@ -3459,7 +3488,7 @@ def _this_sub_process(q):
except pyqueue.Empty: except pyqueue.Empty:
pass pass
def _test_process(q): def _test_process():
queue = multiprocessing.Queue() queue = multiprocessing.Queue()
subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
subProc.daemon = True subProc.daemon = True
@ -3499,8 +3528,7 @@ class _file_like(object):
class TestStdinBadfiledescriptor(unittest.TestCase): class TestStdinBadfiledescriptor(unittest.TestCase):
def test_queue_in_process(self): def test_queue_in_process(self):
queue = multiprocessing.Queue() proc = multiprocessing.Process(target=_test_process)
proc = multiprocessing.Process(target=_test_process, args=(queue,))
proc.start() proc.start()
proc.join() proc.join()
@ -4108,7 +4136,32 @@ class TestSimpleQueue(unittest.TestCase):
# Mixins # Mixins
# #
class ProcessesMixin(object): class BaseMixin(object):
@classmethod
def setUpClass(cls):
cls.dangling = (multiprocessing.process._dangling.copy(),
threading._dangling.copy())
@classmethod
def tearDownClass(cls):
# bpo-26762: Some multiprocessing objects like Pool create reference
# cycles. Trigger a garbage collection to break these cycles.
test.support.gc_collect()
processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
if processes:
print('Warning -- Dangling processes: %s' % processes,
file=sys.stderr)
processes = None
threads = set(threading._dangling) - set(cls.dangling[1])
if threads:
print('Warning -- Dangling threads: %s' % threads,
file=sys.stderr)
threads = None
class ProcessesMixin(BaseMixin):
TYPE = 'processes' TYPE = 'processes'
Process = multiprocessing.Process Process = multiprocessing.Process
connection = multiprocessing.connection connection = multiprocessing.connection
@ -4131,7 +4184,7 @@ class ProcessesMixin(object):
RawArray = staticmethod(multiprocessing.RawArray) RawArray = staticmethod(multiprocessing.RawArray)
class ManagerMixin(object): class ManagerMixin(BaseMixin):
TYPE = 'manager' TYPE = 'manager'
Process = multiprocessing.Process Process = multiprocessing.Process
Queue = property(operator.attrgetter('manager.Queue')) Queue = property(operator.attrgetter('manager.Queue'))
@ -4155,6 +4208,7 @@ class ManagerMixin(object):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
super().setUpClass()
cls.manager = multiprocessing.Manager() cls.manager = multiprocessing.Manager()
@classmethod @classmethod
@ -4162,23 +4216,35 @@ class ManagerMixin(object):
# only the manager process should be returned by active_children() # only the manager process should be returned by active_children()
# but this can take a bit on slow machines, so wait a few seconds # but this can take a bit on slow machines, so wait a few seconds
# if there are other children too (see #17395) # if there are other children too (see #17395)
start_time = time.monotonic()
t = 0.01 t = 0.01
while len(multiprocessing.active_children()) > 1 and t < 5: while len(multiprocessing.active_children()) > 1:
time.sleep(t) time.sleep(t)
t *= 2 t *= 2
dt = time.monotonic() - start_time
if dt >= 5.0:
print("Warning -- multiprocessing.Manager still has %s active "
"children after %s seconds"
% (multiprocessing.active_children(), dt),
file=sys.stderr)
break
gc.collect() # do garbage collection gc.collect() # do garbage collection
if cls.manager._number_of_objects() != 0: if cls.manager._number_of_objects() != 0:
# This is not really an error since some tests do not # This is not really an error since some tests do not
# ensure that all processes which hold a reference to a # ensure that all processes which hold a reference to a
# managed object have been joined. # managed object have been joined.
print('Shared objects which still exist at manager shutdown:') print('Warning -- Shared objects which still exist at manager '
'shutdown:')
print(cls.manager._debug_info()) print(cls.manager._debug_info())
cls.manager.shutdown() cls.manager.shutdown()
cls.manager.join() cls.manager.join()
cls.manager = None cls.manager = None
super().tearDownClass()
class ThreadsMixin(object):
class ThreadsMixin(BaseMixin):
TYPE = 'threads' TYPE = 'threads'
Process = multiprocessing.dummy.Process Process = multiprocessing.dummy.Process
connection = multiprocessing.dummy.connection connection = multiprocessing.dummy.connection
@ -4255,18 +4321,33 @@ def install_tests_in_module_dict(remote_globs, start_method):
multiprocessing.get_logger().setLevel(LOG_LEVEL) multiprocessing.get_logger().setLevel(LOG_LEVEL)
def tearDownModule(): def tearDownModule():
need_sleep = False
# bpo-26762: Some multiprocessing objects like Pool create reference
# cycles. Trigger a garbage collection to break these cycles.
test.support.gc_collect()
multiprocessing.set_start_method(old_start_method[0], force=True) multiprocessing.set_start_method(old_start_method[0], force=True)
# pause a bit so we don't get warning about dangling threads/processes # pause a bit so we don't get warning about dangling threads/processes
processes = set(multiprocessing.process._dangling) - set(dangling[0])
if processes:
need_sleep = True
print('Warning -- Dangling processes: %s' % processes,
file=sys.stderr)
processes = None
threads = set(threading._dangling) - set(dangling[1])
if threads:
need_sleep = True
print('Warning -- Dangling threads: %s' % threads,
file=sys.stderr)
threads = None
# Sleep 500 ms to give time to child processes to complete.
if need_sleep:
time.sleep(0.5) time.sleep(0.5)
multiprocessing.process._cleanup() multiprocessing.process._cleanup()
gc.collect() test.support.gc_collect()
tmp = set(multiprocessing.process._dangling) - set(dangling[0])
if tmp:
print('Dangling processes:', tmp, file=sys.stderr)
del tmp
tmp = set(threading._dangling) - set(dangling[1])
if tmp:
print('Dangling threads:', tmp, file=sys.stderr)
remote_globs['setUpModule'] = setUpModule remote_globs['setUpModule'] = setUpModule
remote_globs['tearDownModule'] = tearDownModule remote_globs['tearDownModule'] = tearDownModule