gh-111284: Make multiprocessing tests with threads faster and more reliable (GH-111285)

This commit is contained in:
Serhiy Storchaka 2023-10-30 19:18:36 +02:00 committed by GitHub
parent 55df2deb1a
commit 624ace5a2f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -2438,8 +2438,11 @@ class _TestContainers(BaseTestCase):
#
#
def sqr(x, wait=0.0):
time.sleep(wait)
def sqr(x, wait=0.0, event=None):
if event is None:
time.sleep(wait)
else:
event.wait(wait)
return x*x
def mul(x, y):
@ -2578,10 +2581,18 @@ class _TestPool(BaseTestCase):
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
def test_async_timeout(self):
res = self.pool.apply_async(sqr, (6, TIMEOUT2 + support.SHORT_TIMEOUT))
get = TimingWrapper(res.get)
self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
p = self.Pool(3)
try:
event = threading.Event() if self.TYPE == 'threads' else None
res = p.apply_async(sqr, (6, TIMEOUT2 + support.SHORT_TIMEOUT, event))
get = TimingWrapper(res.get)
self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
finally:
if event is not None:
event.set()
p.terminate()
p.join()
def test_imap(self):
it = self.pool.imap(sqr, list(range(10)))
@ -2683,10 +2694,11 @@ class _TestPool(BaseTestCase):
def test_terminate(self):
# Simulate slow tasks which take "forever" to complete
p = self.Pool(3)
args = [support.LONG_TIMEOUT for i in range(10_000)]
result = self.pool.map_async(time.sleep, args, chunksize=1)
self.pool.terminate()
self.pool.join()
result = p.map_async(time.sleep, args, chunksize=1)
p.terminate()
p.join()
def test_empty_iterable(self):
# See Issue 12157