test_multiprocessing: Fix dangling process/thread (#2850)

bpo-26762: Fix more dangling processes and threads in
test_multiprocessing:

* Queue: call close() followed by join_thread()
* Process: call join() or self.addCleanup(p.join)
This commit is contained in:
Victor Stinner 2017-07-25 00:33:56 +02:00 committed by GitHub
parent d439d3e291
commit d7e64d9934

View file

@ -32,11 +32,12 @@ test.support.import_module('multiprocessing.synchronize')
# without thread support. # without thread support.
import threading import threading
import multiprocessing.dummy
import multiprocessing.connection import multiprocessing.connection
import multiprocessing.managers import multiprocessing.dummy
import multiprocessing.heap import multiprocessing.heap
import multiprocessing.managers
import multiprocessing.pool import multiprocessing.pool
import multiprocessing.queues
from multiprocessing import util from multiprocessing import util
@ -64,6 +65,13 @@ except ImportError:
def latin(s): def latin(s):
return s.encode('latin') return s.encode('latin')
def close_queue(queue):
if isinstance(queue, multiprocessing.queues.Queue):
queue.close()
queue.join_thread()
# #
# Constants # Constants
# #
@ -825,6 +833,7 @@ class _TestQueue(BaseTestCase):
self.assertEqual(q.qsize(), 1) self.assertEqual(q.qsize(), 1)
q.get() q.get()
self.assertEqual(q.qsize(), 0) self.assertEqual(q.qsize(), 0)
close_queue(q)
@classmethod @classmethod
def _test_task_done(cls, q): def _test_task_done(cls, q):
@ -897,6 +906,7 @@ class _TestQueue(BaseTestCase):
q.put(True) q.put(True)
# bpo-30595: use a timeout of 1 second for slow buildbots # bpo-30595: use a timeout of 1 second for slow buildbots
self.assertTrue(q.get(timeout=1.0)) self.assertTrue(q.get(timeout=1.0))
close_queue(q)
# #
# #
@ -1020,10 +1030,12 @@ class _TestCondition(BaseTestCase):
p = self.Process(target=self.f, args=(cond, sleeping, woken)) p = self.Process(target=self.f, args=(cond, sleeping, woken))
p.daemon = True p.daemon = True
p.start() p.start()
self.addCleanup(p.join)
p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
p.daemon = True p.daemon = True
p.start() p.start()
self.addCleanup(p.join)
# wait for both children to start sleeping # wait for both children to start sleeping
sleeping.acquire() sleeping.acquire()
@ -1066,11 +1078,13 @@ class _TestCondition(BaseTestCase):
args=(cond, sleeping, woken, TIMEOUT1)) args=(cond, sleeping, woken, TIMEOUT1))
p.daemon = True p.daemon = True
p.start() p.start()
self.addCleanup(p.join)
t = threading.Thread(target=self.f, t = threading.Thread(target=self.f,
args=(cond, sleeping, woken, TIMEOUT1)) args=(cond, sleeping, woken, TIMEOUT1))
t.daemon = True t.daemon = True
t.start() t.start()
self.addCleanup(t.join)
# wait for them all to sleep # wait for them all to sleep
for i in range(6): for i in range(6):
@ -1089,10 +1103,12 @@ class _TestCondition(BaseTestCase):
p = self.Process(target=self.f, args=(cond, sleeping, woken)) p = self.Process(target=self.f, args=(cond, sleeping, woken))
p.daemon = True p.daemon = True
p.start() p.start()
self.addCleanup(p.join)
t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
t.daemon = True t.daemon = True
t.start() t.start()
self.addCleanup(t.join)
# wait for them to all sleep # wait for them to all sleep
for i in range(6): for i in range(6):
@ -1123,10 +1139,12 @@ class _TestCondition(BaseTestCase):
p = self.Process(target=self.f, args=(cond, sleeping, woken)) p = self.Process(target=self.f, args=(cond, sleeping, woken))
p.daemon = True p.daemon = True
p.start() p.start()
self.addCleanup(p.join)
t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
t.daemon = True t.daemon = True
t.start() t.start()
self.addCleanup(t.join)
# wait for them to all sleep # wait for them to all sleep
for i in range(6): for i in range(6):
@ -1309,6 +1327,7 @@ class _TestEvent(BaseTestCase):
p.daemon = True p.daemon = True
p.start() p.start()
self.assertEqual(wait(), True) self.assertEqual(wait(), True)
p.join()
# #
# Tests for Barrier - adapted from tests in test/lock_tests.py # Tests for Barrier - adapted from tests in test/lock_tests.py
@ -1654,6 +1673,7 @@ class _TestBarrier(BaseTestCase):
p = self.Process(target=self._test_thousand_f, p = self.Process(target=self._test_thousand_f,
args=(self.barrier, passes, child_conn, lock)) args=(self.barrier, passes, child_conn, lock))
p.start() p.start()
self.addCleanup(p.join)
for i in range(passes): for i in range(passes):
for j in range(self.N): for j in range(self.N):