Add test.support.busy_retry() (#93770)

Add busy_retry() and sleeping_retry() functions to test.support.
Victor Stinner 2022-06-15 11:42:10 +02:00 committed by GitHub
parent 4e9fa71d7e
commit 7e9eaad864
12 changed files with 186 additions and 99 deletions
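
The new helpers replace hand-rolled polling loops repeated throughout the test suite. As a rough sketch of the typical conversion (not part of the diff; ``check()`` is a placeholder condition):

    import time
    from test import support

    def check():
        # Placeholder: a real test would poll a socket, a file, a child
        # process, etc.
        return True

    # Before: a manual deadline loop, duplicated in many tests.
    deadline = time.monotonic() + support.SHORT_TIMEOUT
    while not check():
        if time.monotonic() >= deadline:
            raise AssertionError("timeout")
        time.sleep(0.1)

    # After: the generator tracks the deadline, sleeps with exponential
    # backoff, and raises AssertionError itself on timeout.
    for _ in support.sleeping_retry(support.SHORT_TIMEOUT, "timeout"):
        if check():
            break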


@ -413,6 +413,51 @@ The :mod:`test.support` module defines the following constants:
The :mod:`test.support` module defines the following functions:
.. function:: busy_retry(timeout, err_msg=None, /, *, error=True)
Run the loop body until ``break`` stops the loop.
After *timeout* seconds, raise an :exc:`AssertionError` if *error* is true,
or just stop the loop if *error* is false.
Example::
for _ in support.busy_retry(support.SHORT_TIMEOUT):
if check():
break
Example of error=False usage::
for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
if check():
break
else:
raise RuntimeError('my custom error')
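
For illustration only (not part of the documented examples; ``pidfile`` is a hypothetical path), the *err_msg* argument is appended to the :exc:`AssertionError` message raised on timeout::

    import os.path
    from test import support

    pidfile = "/tmp/daemon.pid"   # hypothetical, for this sketch only
    # busy_retry() does not sleep between iterations; use sleeping_retry()
    # to back off instead of spinning.
    for _ in support.busy_retry(support.SHORT_TIMEOUT, "pidfile not created"):
        if os.path.exists(pidfile):
            break
    # On timeout this raises, roughly:
    #   AssertionError: timeout (... seconds): pidfile not created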
.. function:: sleeping_retry(timeout, err_msg=None, /, *, init_delay=0.010, max_delay=1.0, error=True)
Wait strategy that applies exponential backoff.
Run the loop body until ``break`` stops the loop. Sleep at each loop
iteration, but not at the first iteration. The sleep delay is doubled at
each iteration (up to *max_delay* seconds).
See :func:`busy_retry` documentation for the parameters usage.
Example raising an exception after SHORT_TIMEOUT seconds::
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
if check():
break
Example of error=False usage::
for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
if check():
break
else:
raise RuntimeError('my custom error')
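
As an illustration of the backoff described above (this block is not part of the documented API), the defaults ``init_delay=0.010`` and ``max_delay=1.0`` yield sleeps of 0.01, 0.02, 0.04, ... seconds, capped at one second::

    # Reproduce the delay schedule that sleeping_retry() uses by default.
    # The generator yields once before the first sleep, then sleeps these
    # amounts between iterations.
    init_delay, max_delay = 0.010, 1.0
    delay = init_delay
    schedule = []
    for _ in range(10):
        schedule.append(delay)
        delay = min(delay * 2, max_delay)
    # schedule -> 0.01, 0.02, 0.04, 0.08, 0.16, 0.32, 0.64, 1.0, 1.0, 1.0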
.. function:: is_resource_enabled(resource)
Return ``True`` if *resource* is enabled and available. The list of


@ -4313,18 +4313,13 @@ class _TestSharedMemory(BaseTestCase):
p.terminate()
p.wait()
deadline = time.monotonic() + support.LONG_TIMEOUT
t = 0.1
while time.monotonic() < deadline:
time.sleep(t)
t = min(t*2, 5)
err_msg = ("A SharedMemory segment was leaked after "
"a process was abruptly terminated")
for _ in support.sleeping_retry(support.LONG_TIMEOUT, err_msg):
try:
smm = shared_memory.SharedMemory(name, create=False)
except FileNotFoundError:
break
else:
raise AssertionError("A SharedMemory segment was leaked after"
" a process was abruptly terminated.")
if os.name == 'posix':
# Without this line it was raising warnings like:
@ -5334,9 +5329,10 @@ class TestResourceTracker(unittest.TestCase):
p.terminate()
p.wait()
deadline = time.monotonic() + support.LONG_TIMEOUT
while time.monotonic() < deadline:
time.sleep(.5)
err_msg = (f"A {rtype} resource was leaked after a process was "
f"abruptly terminated")
for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
err_msg):
try:
_resource_unlink(name2, rtype)
except OSError as e:
@ -5344,10 +5340,7 @@ class TestResourceTracker(unittest.TestCase):
# EINVAL
self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
break
else:
raise AssertionError(
f"A {rtype} resource was leaked after a process was "
f"abruptly terminated.")
err = p.stderr.read().decode('utf-8')
p.stderr.close()
expected = ('resource_tracker: There appear to be 2 leaked {} '
@ -5575,18 +5568,17 @@ class TestSyncManagerTypes(unittest.TestCase):
# but this can take a bit on slow machines, so wait a few seconds
# if there are other children too (see #17395).
join_process(self.proc)
start_time = time.monotonic()
t = 0.01
while len(multiprocessing.active_children()) > 1:
time.sleep(t)
t *= 2
dt = time.monotonic() - start_time
if dt >= 5.0:
test.support.environment_altered = True
support.print_warning(f"multiprocessing.Manager still has "
f"{multiprocessing.active_children()} "
f"active children after {dt} seconds")
for _ in support.sleeping_retry(5.0, error=False):
if len(multiprocessing.active_children()) <= 1:
break
else:
dt = time.monotonic() - start_time
support.environment_altered = True
support.print_warning(f"multiprocessing.Manager still has "
f"{multiprocessing.active_children()} "
f"active children after {dt:.1f} seconds")
def run_worker(self, worker, obj):
self.proc = multiprocessing.Process(target=worker, args=(obj, ))
@ -5884,17 +5876,15 @@ class ManagerMixin(BaseMixin):
# but this can take a bit on slow machines, so wait a few seconds
# if there are other children too (see #17395)
start_time = time.monotonic()
t = 0.01
while len(multiprocessing.active_children()) > 1:
time.sleep(t)
t *= 2
dt = time.monotonic() - start_time
if dt >= 5.0:
test.support.environment_altered = True
support.print_warning(f"multiprocessing.Manager still has "
f"{multiprocessing.active_children()} "
f"active children after {dt} seconds")
for _ in support.sleeping_retry(5.0, error=False):
if len(multiprocessing.active_children()) <= 1:
break
else:
dt = time.monotonic() - start_time
support.environment_altered = True
support.print_warning(f"multiprocessing.Manager still has "
f"{multiprocessing.active_children()} "
f"active children after {dt:.1f} seconds")
gc.collect() # do garbage collection
if cls.manager._number_of_objects() != 0:


@ -54,10 +54,8 @@ class ForkWait(unittest.TestCase):
self.threads.append(thread)
# busy-loop to wait for threads
deadline = time.monotonic() + support.SHORT_TIMEOUT
while len(self.alive) < NUM_THREADS:
time.sleep(0.1)
if deadline < time.monotonic():
for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
if len(self.alive) >= NUM_THREADS:
break
a = sorted(self.alive.keys())


@ -2250,3 +2250,79 @@ def late_deletion(obj):
pass
atfork_func.reference = ref_cycle
os.register_at_fork(before=atfork_func)
def busy_retry(timeout, err_msg=None, /, *, error=True):
"""
Run the loop body until "break" stops the loop.
After *timeout* seconds, raise an AssertionError if *error* is true,
or just stop if *error* is false.
Example:
for _ in support.busy_retry(support.SHORT_TIMEOUT):
if check():
break
Example of error=False usage:
for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
if check():
break
else:
raise RuntimeError('my custom error')
"""
if timeout <= 0:
raise ValueError("timeout must be greater than zero")
start_time = time.monotonic()
deadline = start_time + timeout
while True:
yield
if time.monotonic() >= deadline:
break
if error:
dt = time.monotonic() - start_time
msg = f"timeout ({dt:.1f} seconds)"
if err_msg:
msg = f"{msg}: {err_msg}"
raise AssertionError(msg)
def sleeping_retry(timeout, err_msg=None, /,
*, init_delay=0.010, max_delay=1.0, error=True):
"""
Wait strategy that applies exponential backoff.
Run the loop body until "break" stops the loop. Sleep at each loop
iteration, but not at the first iteration. The sleep delay is doubled at
each iteration (up to *max_delay* seconds).
See busy_retry() documentation for the parameters usage.
Example raising an exception after SHORT_TIMEOUT seconds:
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
if check():
break
Example of error=False usage:
for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
if check():
break
else:
raise RuntimeError('my custom error')
"""
delay = init_delay
for _ in busy_retry(timeout, err_msg, error=error):
yield
time.sleep(delay)
delay = min(delay * 2, max_delay)
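
A small usage sketch under assumed names (``wait_for_file()`` and *path* are hypothetical); note that sleeping_retry() delegates deadline tracking and the final AssertionError to busy_retry() and only adds the sleep between iterations:

    import os.path
    from test import support

    def wait_for_file(path):
        """Poll until *path* exists, failing after SHORT_TIMEOUT seconds."""
        # Sleeps 10 ms, 20 ms, ... (capped at 1 second) between checks.
        for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
                                        f"{path} was not created"):
            if os.path.exists(path):
                break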


@ -45,12 +45,11 @@ def _wait_for_interp_to_run(interp, timeout=None):
# run subinterpreter earlier than the main thread in multiprocess.
if timeout is None:
timeout = support.SHORT_TIMEOUT
start_time = time.monotonic()
deadline = start_time + timeout
while not interpreters.is_running(interp):
if time.monotonic() > deadline:
raise RuntimeError('interp is not running')
time.sleep(0.010)
for _ in support.sleeping_retry(timeout, error=False):
if interpreters.is_running(interp):
break
else:
raise RuntimeError('interp is not running')
@contextlib.contextmanager


@ -256,12 +256,12 @@ class FailingInitializerMixin(ExecutorMixin):
else:
with self.assertRaises(BrokenExecutor):
future.result()
# At some point, the executor should break
t1 = time.monotonic()
while not self.executor._broken:
if time.monotonic() - t1 > 5:
self.fail("executor not broken after 5 s.")
time.sleep(0.01)
for _ in support.sleeping_retry(5, "executor not broken"):
if self.executor._broken:
break
# ... and from this point submit() is guaranteed to fail
with self.assertRaises(BrokenExecutor):
self.executor.submit(get_init_status)


@ -40,6 +40,7 @@ test_source = """\
import sys
import time
from multiprocessing import Pool, set_start_method
from test import support
# We use this __main__ defined function in the map call below in order to
# check that multiprocessing is correctly running the unguarded
@ -59,13 +60,11 @@ if __name__ == '__main__':
results = []
with Pool(5) as pool:
pool.map_async(f, [1, 2, 3], callback=results.extend)
start_time = time.monotonic()
while not results:
time.sleep(0.05)
# up to 1 min to report the results
dt = time.monotonic() - start_time
if dt > 60.0:
raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
# up to 1 min to report the results
for _ in support.sleeping_retry(60, "Timed out waiting for results"):
if results:
break
results.sort()
print(start_method, "->", results)
@ -86,19 +85,17 @@ if __name__ != "__main__":
import sys
import time
from multiprocessing import Pool, set_start_method
from test import support
start_method = sys.argv[1]
set_start_method(start_method)
results = []
with Pool(5) as pool:
pool.map_async(int, [1, 4, 9], callback=results.extend)
start_time = time.monotonic()
while not results:
time.sleep(0.05)
# up to 1 min to report the results
dt = time.monotonic() - start_time
if dt > 60.0:
raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
# up to 1 min to report the results
for _ in support.sleeping_retry(60, "Timed out waiting for results"):
if results:
break
results.sort()
print(start_method, "->", results)


@ -812,13 +812,14 @@ class ItimerTest(unittest.TestCase):
signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
signal.setitimer(self.itimer, 0.3, 0.2)
start_time = time.monotonic()
while time.monotonic() - start_time < 60.0:
for _ in support.busy_retry(60.0, error=False):
# use up some virtual time by doing real work
_ = pow(12345, 67890, 10000019)
if signal.getitimer(self.itimer) == (0.0, 0.0):
break # sig_vtalrm handler stopped this itimer
else: # Issue 8424
# sig_vtalrm handler stopped this itimer
break
else:
# bpo-8424
self.skipTest("timeout: likely cause: machine too slow or load too "
"high")
@ -832,13 +833,14 @@ class ItimerTest(unittest.TestCase):
signal.signal(signal.SIGPROF, self.sig_prof)
signal.setitimer(self.itimer, 0.2, 0.2)
start_time = time.monotonic()
while time.monotonic() - start_time < 60.0:
for _ in support.busy_retry(60.0, error=False):
# do some work
_ = pow(12345, 67890, 10000019)
if signal.getitimer(self.itimer) == (0.0, 0.0):
break # sig_prof handler stopped this itimer
else: # Issue 8424
# sig_prof handler stopped this itimer
break
else:
# bpo-8424
self.skipTest("timeout: likely cause: machine too slow or load too "
"high")
@ -1307,8 +1309,6 @@ class StressTest(unittest.TestCase):
self.setsig(signal.SIGALRM, handler) # for ITIMER_REAL
expected_sigs = 0
deadline = time.monotonic() + support.SHORT_TIMEOUT
while expected_sigs < N:
# Hopefully the SIGALRM will be received somewhere during
# initial processing of SIGUSR1.
@ -1317,8 +1317,9 @@ class StressTest(unittest.TestCase):
expected_sigs += 2
# Wait for handlers to run to avoid signal coalescing
while len(sigs) < expected_sigs and time.monotonic() < deadline:
time.sleep(1e-5)
for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
if len(sigs) >= expected_sigs:
break
# All ITIMER_REAL signals should have been delivered to the
# Python handler


@ -2262,11 +2262,8 @@ class SimpleBackgroundTests(unittest.TestCase):
# A simple IO loop. Call func(*args) depending on the error we get
# (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
timeout = kwargs.get('timeout', support.SHORT_TIMEOUT)
deadline = time.monotonic() + timeout
count = 0
while True:
if time.monotonic() > deadline:
self.fail("timeout")
for _ in support.busy_retry(timeout):
errno = None
count += 1
try:


@ -9,7 +9,6 @@ import subprocess
import sys
import tempfile
import textwrap
import time
import unittest
import warnings
@ -461,18 +460,12 @@ class TestSupport(unittest.TestCase):
# child process: do nothing, just exit
os._exit(0)
t0 = time.monotonic()
deadline = time.monotonic() + support.SHORT_TIMEOUT
was_altered = support.environment_altered
try:
support.environment_altered = False
stderr = io.StringIO()
while True:
if time.monotonic() > deadline:
self.fail("timeout")
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
with support.swap_attr(support.print_warning, 'orig_stderr', stderr):
support.reap_children()
@ -481,9 +474,6 @@ class TestSupport(unittest.TestCase):
if support.environment_altered:
break
# loop until the child process completed
time.sleep(0.100)
msg = "Warning -- reap_children() reaped child process %s" % pid
self.assertIn(msg, stderr.getvalue())
self.assertTrue(support.environment_altered)


@ -4,7 +4,6 @@
import os
import subprocess
import sys
import time
import unittest
from test.fork_wait import ForkWait
from test import support
@ -20,14 +19,12 @@ class Wait3Test(ForkWait):
# This many iterations can be required, since some previously run
# tests (e.g. test_ctypes) could have spawned a lot of children
# very quickly.
deadline = time.monotonic() + support.SHORT_TIMEOUT
while time.monotonic() <= deadline:
for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
# wait3() shouldn't hang, but some of the buildbots seem to hang
# in the forking tests. This is an attempt to fix the problem.
spid, status, rusage = os.wait3(os.WNOHANG)
if spid == cpid:
break
time.sleep(0.1)
self.assertEqual(spid, cpid)
self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)


@ -2,7 +2,6 @@
"""
import os
import time
import sys
import unittest
from test.fork_wait import ForkWait
@ -22,14 +21,12 @@ class Wait4Test(ForkWait):
# Issue #11185: wait4 is broken on AIX and will always return 0
# with WNOHANG.
option = 0
deadline = time.monotonic() + support.SHORT_TIMEOUT
while time.monotonic() <= deadline:
for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
# wait4() shouldn't hang, but some of the buildbots seem to hang
# in the forking tests. This is an attempt to fix the problem.
spid, status, rusage = os.wait4(cpid, option)
if spid == cpid:
break
time.sleep(0.1)
self.assertEqual(spid, cpid)
self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
self.assertTrue(rusage)