[3.13] gh-128364: Fix flaky test_concurrent_futures.test_wait tests (gh-130742) (#130922)

Use events instead of relying on `time.sleep()`. The tests are also now about
four times faster.
(cherry picked from commit c4d37eefb7)
This commit is contained in:
Sam Gross 2025-03-06 13:49:03 -05:00 committed by GitHub
parent e285232c76
commit 0c088e4442
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 116 additions and 58 deletions

View file

@ -1,9 +1,9 @@
import sys import sys
import threading import threading
import time
import unittest import unittest
from concurrent import futures from concurrent import futures
from test import support from test import support
from test.support import threading_helper
from .util import ( from .util import (
CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE,
@ -16,15 +16,15 @@ from .util import (
def mul(x, y): def mul(x, y):
return x * y return x * y
def sleep_and_raise(t): def wait_and_raise(e):
time.sleep(t) e.wait()
raise Exception('this is an exception') raise Exception('this is an exception')
class WaitTests: class WaitTests:
def test_20369(self): def test_20369(self):
# See https://bugs.python.org/issue20369 # See https://bugs.python.org/issue20369
future = self.executor.submit(time.sleep, 1.5) future = self.executor.submit(mul, 1, 2)
done, not_done = futures.wait([future, future], done, not_done = futures.wait([future, future],
return_when=futures.ALL_COMPLETED) return_when=futures.ALL_COMPLETED)
self.assertEqual({future}, done) self.assertEqual({future}, done)
@ -32,66 +32,102 @@ class WaitTests:
def test_first_completed(self): def test_first_completed(self):
event = self.create_event()
future1 = self.executor.submit(mul, 21, 2) future1 = self.executor.submit(mul, 21, 2)
future2 = self.executor.submit(time.sleep, 1.5) future2 = self.executor.submit(event.wait)
done, not_done = futures.wait( try:
[CANCELLED_FUTURE, future1, future2], done, not_done = futures.wait(
return_when=futures.FIRST_COMPLETED) [CANCELLED_FUTURE, future1, future2],
return_when=futures.FIRST_COMPLETED)
self.assertEqual(set([future1]), done) self.assertEqual(set([future1]), done)
self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
finally:
event.set()
future2.result() # wait for job to finish
def test_first_completed_some_already_completed(self): def test_first_completed_some_already_completed(self):
future1 = self.executor.submit(time.sleep, 1.5) event = self.create_event()
future1 = self.executor.submit(event.wait)
finished, pending = futures.wait( try:
[CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], finished, pending = futures.wait(
return_when=futures.FIRST_COMPLETED) [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
return_when=futures.FIRST_COMPLETED)
self.assertEqual( self.assertEqual(
set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]), set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
finished) finished)
self.assertEqual(set([future1]), pending) self.assertEqual(set([future1]), pending)
finally:
event.set()
future1.result() # wait for job to finish
@support.requires_resource('walltime')
def test_first_exception(self): def test_first_exception(self):
future1 = self.executor.submit(mul, 2, 21) event1 = self.create_event()
future2 = self.executor.submit(sleep_and_raise, 1.5) event2 = self.create_event()
future3 = self.executor.submit(time.sleep, 3) try:
future1 = self.executor.submit(mul, 2, 21)
future2 = self.executor.submit(wait_and_raise, event1)
future3 = self.executor.submit(event2.wait)
finished, pending = futures.wait( # Ensure that future1 is completed before future2 finishes
[future1, future2, future3], def wait_for_future1():
return_when=futures.FIRST_EXCEPTION) future1.result()
event1.set()
self.assertEqual(set([future1, future2]), finished) t = threading.Thread(target=wait_for_future1)
self.assertEqual(set([future3]), pending) t.start()
finished, pending = futures.wait(
[future1, future2, future3],
return_when=futures.FIRST_EXCEPTION)
self.assertEqual(set([future1, future2]), finished)
self.assertEqual(set([future3]), pending)
threading_helper.join_thread(t)
finally:
event1.set()
event2.set()
future3.result() # wait for job to finish
def test_first_exception_some_already_complete(self): def test_first_exception_some_already_complete(self):
event = self.create_event()
future1 = self.executor.submit(divmod, 21, 0) future1 = self.executor.submit(divmod, 21, 0)
future2 = self.executor.submit(time.sleep, 1.5) future2 = self.executor.submit(event.wait)
finished, pending = futures.wait( try:
[SUCCESSFUL_FUTURE, finished, pending = futures.wait(
CANCELLED_FUTURE, [SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE, CANCELLED_FUTURE,
future1, future2], CANCELLED_AND_NOTIFIED_FUTURE,
return_when=futures.FIRST_EXCEPTION) future1, future2],
return_when=futures.FIRST_EXCEPTION)
self.assertEqual(set([SUCCESSFUL_FUTURE, self.assertEqual(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE,
future1]), finished) future1]), finished)
self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
finally:
event.set()
future2.result() # wait for job to finish
def test_first_exception_one_already_failed(self): def test_first_exception_one_already_failed(self):
future1 = self.executor.submit(time.sleep, 2) event = self.create_event()
future1 = self.executor.submit(event.wait)
finished, pending = futures.wait( try:
[EXCEPTION_FUTURE, future1], finished, pending = futures.wait(
return_when=futures.FIRST_EXCEPTION) [EXCEPTION_FUTURE, future1],
return_when=futures.FIRST_EXCEPTION)
self.assertEqual(set([EXCEPTION_FUTURE]), finished) self.assertEqual(set([EXCEPTION_FUTURE]), finished)
self.assertEqual(set([future1]), pending) self.assertEqual(set([future1]), pending)
finally:
event.set()
future1.result() # wait for job to finish
def test_all_completed(self): def test_all_completed(self):
future1 = self.executor.submit(divmod, 2, 0) future1 = self.executor.submit(divmod, 2, 0)
@ -114,23 +150,27 @@ class WaitTests:
def test_timeout(self): def test_timeout(self):
short_timeout = 0.050 short_timeout = 0.050
long_timeout = short_timeout * 10
future = self.executor.submit(time.sleep, long_timeout) event = self.create_event()
future = self.executor.submit(event.wait)
finished, pending = futures.wait( try:
[CANCELLED_AND_NOTIFIED_FUTURE, finished, pending = futures.wait(
EXCEPTION_FUTURE, [CANCELLED_AND_NOTIFIED_FUTURE,
SUCCESSFUL_FUTURE, EXCEPTION_FUTURE,
future], SUCCESSFUL_FUTURE,
timeout=short_timeout, future],
return_when=futures.ALL_COMPLETED) timeout=short_timeout,
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE, EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE]), SUCCESSFUL_FUTURE]),
finished) finished)
self.assertEqual(set([future]), pending) self.assertEqual(set([future]), pending)
finally:
event.set()
future.result() # wait for job to finish
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase): class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):

View file

@ -1,5 +1,6 @@
import multiprocessing import multiprocessing
import sys import sys
import threading
import time import time
import unittest import unittest
from concurrent import futures from concurrent import futures
@ -50,14 +51,19 @@ class ExecutorMixin:
max_workers=self.worker_count, max_workers=self.worker_count,
mp_context=self.get_context(), mp_context=self.get_context(),
**self.executor_kwargs) **self.executor_kwargs)
self.manager = self.get_context().Manager()
else: else:
self.executor = self.executor_type( self.executor = self.executor_type(
max_workers=self.worker_count, max_workers=self.worker_count,
**self.executor_kwargs) **self.executor_kwargs)
self.manager = None
def tearDown(self): def tearDown(self):
self.executor.shutdown(wait=True) self.executor.shutdown(wait=True)
self.executor = None self.executor = None
if self.manager is not None:
self.manager.shutdown()
self.manager = None
dt = time.monotonic() - self.t1 dt = time.monotonic() - self.t1
if support.verbose: if support.verbose:
@ -73,6 +79,9 @@ class ExecutorMixin:
class ThreadPoolMixin(ExecutorMixin): class ThreadPoolMixin(ExecutorMixin):
executor_type = futures.ThreadPoolExecutor executor_type = futures.ThreadPoolExecutor
def create_event(self):
return threading.Event()
class ProcessPoolForkMixin(ExecutorMixin): class ProcessPoolForkMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor executor_type = futures.ProcessPoolExecutor
@ -89,6 +98,9 @@ class ProcessPoolForkMixin(ExecutorMixin):
self.skipTest("TSAN doesn't support threads after fork") self.skipTest("TSAN doesn't support threads after fork")
return super().get_context() return super().get_context()
def create_event(self):
return self.manager.Event()
class ProcessPoolSpawnMixin(ExecutorMixin): class ProcessPoolSpawnMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor executor_type = futures.ProcessPoolExecutor
@ -101,6 +113,9 @@ class ProcessPoolSpawnMixin(ExecutorMixin):
self.skipTest("ProcessPoolExecutor unavailable on this system") self.skipTest("ProcessPoolExecutor unavailable on this system")
return super().get_context() return super().get_context()
def create_event(self):
return self.manager.Event()
class ProcessPoolForkserverMixin(ExecutorMixin): class ProcessPoolForkserverMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor executor_type = futures.ProcessPoolExecutor
@ -117,6 +132,9 @@ class ProcessPoolForkserverMixin(ExecutorMixin):
self.skipTest("TSAN doesn't support threads after fork") self.skipTest("TSAN doesn't support threads after fork")
return super().get_context() return super().get_context()
def create_event(self):
return self.manager.Event()
def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,), def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
executor_mixins=(ThreadPoolMixin, executor_mixins=(ThreadPoolMixin,