gh-128364: Fix flaky test_concurrent_futures.test_wait tests (gh-130742)

Use events instead of relying on `time.sleep()`. The tests are also now about
four times faster.
This commit is contained in:
Sam Gross 2025-03-06 12:30:58 -05:00 committed by GitHub
parent 052cb717f5
commit c4d37eefb7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 119 additions and 58 deletions

View file

@ -1,9 +1,9 @@
import sys
import threading
import time
import unittest
from concurrent import futures
from test import support
from test.support import threading_helper
from .util import (
CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE,
@ -16,15 +16,15 @@ from .util import (
def mul(x, y):
return x * y
def sleep_and_raise(t):
time.sleep(t)
def wait_and_raise(e):
e.wait()
raise Exception('this is an exception')
class WaitTests:
def test_20369(self):
# See https://bugs.python.org/issue20369
future = self.executor.submit(time.sleep, 1.5)
future = self.executor.submit(mul, 1, 2)
done, not_done = futures.wait([future, future],
return_when=futures.ALL_COMPLETED)
self.assertEqual({future}, done)
@ -32,66 +32,102 @@ class WaitTests:
def test_first_completed(self):
event = self.create_event()
future1 = self.executor.submit(mul, 21, 2)
future2 = self.executor.submit(time.sleep, 1.5)
future2 = self.executor.submit(event.wait)
done, not_done = futures.wait(
[CANCELLED_FUTURE, future1, future2],
return_when=futures.FIRST_COMPLETED)
try:
done, not_done = futures.wait(
[CANCELLED_FUTURE, future1, future2],
return_when=futures.FIRST_COMPLETED)
self.assertEqual(set([future1]), done)
self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
self.assertEqual(set([future1]), done)
self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
finally:
event.set()
future2.result() # wait for job to finish
def test_first_completed_some_already_completed(self):
future1 = self.executor.submit(time.sleep, 1.5)
event = self.create_event()
future1 = self.executor.submit(event.wait)
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
return_when=futures.FIRST_COMPLETED)
try:
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
return_when=futures.FIRST_COMPLETED)
self.assertEqual(
set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
finished)
self.assertEqual(set([future1]), pending)
self.assertEqual(
set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
finished)
self.assertEqual(set([future1]), pending)
finally:
event.set()
future1.result() # wait for job to finish
@support.requires_resource('walltime')
def test_first_exception(self):
future1 = self.executor.submit(mul, 2, 21)
future2 = self.executor.submit(sleep_and_raise, 1.5)
future3 = self.executor.submit(time.sleep, 3)
event1 = self.create_event()
event2 = self.create_event()
try:
future1 = self.executor.submit(mul, 2, 21)
future2 = self.executor.submit(wait_and_raise, event1)
future3 = self.executor.submit(event2.wait)
finished, pending = futures.wait(
[future1, future2, future3],
return_when=futures.FIRST_EXCEPTION)
# Ensure that future1 is completed before future2 finishes
def wait_for_future1():
future1.result()
event1.set()
self.assertEqual(set([future1, future2]), finished)
self.assertEqual(set([future3]), pending)
t = threading.Thread(target=wait_for_future1)
t.start()
finished, pending = futures.wait(
[future1, future2, future3],
return_when=futures.FIRST_EXCEPTION)
self.assertEqual(set([future1, future2]), finished)
self.assertEqual(set([future3]), pending)
threading_helper.join_thread(t)
finally:
event1.set()
event2.set()
future3.result() # wait for job to finish
def test_first_exception_some_already_complete(self):
event = self.create_event()
future1 = self.executor.submit(divmod, 21, 0)
future2 = self.executor.submit(time.sleep, 1.5)
future2 = self.executor.submit(event.wait)
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1, future2],
return_when=futures.FIRST_EXCEPTION)
try:
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1, future2],
return_when=futures.FIRST_EXCEPTION)
self.assertEqual(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1]), finished)
self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
self.assertEqual(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1]), finished)
self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
finally:
event.set()
future2.result() # wait for job to finish
def test_first_exception_one_already_failed(self):
future1 = self.executor.submit(time.sleep, 2)
event = self.create_event()
future1 = self.executor.submit(event.wait)
finished, pending = futures.wait(
[EXCEPTION_FUTURE, future1],
return_when=futures.FIRST_EXCEPTION)
try:
finished, pending = futures.wait(
[EXCEPTION_FUTURE, future1],
return_when=futures.FIRST_EXCEPTION)
self.assertEqual(set([EXCEPTION_FUTURE]), finished)
self.assertEqual(set([future1]), pending)
self.assertEqual(set([EXCEPTION_FUTURE]), finished)
self.assertEqual(set([future1]), pending)
finally:
event.set()
future1.result() # wait for job to finish
def test_all_completed(self):
future1 = self.executor.submit(divmod, 2, 0)
@ -114,23 +150,27 @@ class WaitTests:
def test_timeout(self):
short_timeout = 0.050
long_timeout = short_timeout * 10
future = self.executor.submit(time.sleep, long_timeout)
event = self.create_event()
future = self.executor.submit(event.wait)
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future],
timeout=short_timeout,
return_when=futures.ALL_COMPLETED)
try:
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future],
timeout=short_timeout,
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE]),
finished)
self.assertEqual(set([future]), pending)
self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE]),
finished)
self.assertEqual(set([future]), pending)
finally:
event.set()
future.result() # wait for job to finish
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):

View file

@ -1,5 +1,6 @@
import multiprocessing
import sys
import threading
import time
import unittest
from concurrent import futures
@ -50,14 +51,19 @@ class ExecutorMixin:
max_workers=self.worker_count,
mp_context=self.get_context(),
**self.executor_kwargs)
self.manager = self.get_context().Manager()
else:
self.executor = self.executor_type(
max_workers=self.worker_count,
**self.executor_kwargs)
self.manager = None
def tearDown(self):
self.executor.shutdown(wait=True)
self.executor = None
if self.manager is not None:
self.manager.shutdown()
self.manager = None
dt = time.monotonic() - self.t1
if support.verbose:
@ -73,11 +79,17 @@ class ExecutorMixin:
class ThreadPoolMixin(ExecutorMixin):
executor_type = futures.ThreadPoolExecutor
def create_event(self):
return threading.Event()
@support.skip_if_sanitizer("gh-129824: data races in InterpreterPool tests", thread=True)
class InterpreterPoolMixin(ExecutorMixin):
executor_type = futures.InterpreterPoolExecutor
def create_event(self):
self.skipTest("InterpreterPoolExecutor doesn't support events")
class ProcessPoolForkMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
@ -94,6 +106,9 @@ class ProcessPoolForkMixin(ExecutorMixin):
self.skipTest("TSAN doesn't support threads after fork")
return super().get_context()
def create_event(self):
return self.manager.Event()
class ProcessPoolSpawnMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
@ -106,6 +121,9 @@ class ProcessPoolSpawnMixin(ExecutorMixin):
self.skipTest("ProcessPoolExecutor unavailable on this system")
return super().get_context()
def create_event(self):
return self.manager.Event()
class ProcessPoolForkserverMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
@ -122,6 +140,9 @@ class ProcessPoolForkserverMixin(ExecutorMixin):
self.skipTest("TSAN doesn't support threads after fork")
return super().get_context()
def create_event(self):
return self.manager.Event()
def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
executor_mixins=(ThreadPoolMixin,