[3.12] gh-109974: Fix threading lock_tests race conditions (#110057) (#110346)

* gh-109974: Fix threading lock_tests race conditions (#110057)

Fix race conditions in test_threading lock tests. Wait until a
condition is met rather than using time.sleep() with a hardcoded
number of seconds.

* Replace sleeping loops with support.sleeping_retry() which raises
  an exception on timeout.
* Add wait_threads_blocked(nthread) which computes a sleep depending
  on the number of threads. Remove _wait() function.
* test_set_and_clear(): use a way longer Event.wait() timeout.
* BarrierTests.test_repr(): wait until the 2 threads are waiting for
  the barrier. Use a way longer timeout for Barrier.wait() timeout.
* test_thread_leak() no longer needs to count
  len(threading.enumerate()): Bunch uses
  threading_helper.wait_threads_exit() internally which does it in
  wait_for_finished().
* Add BaseLockTests.wait_phase() which implements a timeout.
  test_reacquire() and test_recursion_count() use wait_phase().

(cherry picked from commit 4e356ad183)

* gh-109974: Fix more threading lock_tests race conditions (#110089)

* Add context manager on Bunch class.
* Bunch now catchs exceptions on executed functions and re-raise them
  at __exit__() as an ExceptionGroup.
* Rewrite BarrierProxy.test_default_timeout(). Use a single thread.
  Only check that barrier.wait() blocks for at least default timeout
  seconds.
* test_with(): inline _with() function.

(cherry picked from commit 743e3572ee)
This commit is contained in:
Victor Stinner 2023-10-04 13:26:45 +02:00 committed by GitHub
parent f53871e1e8
commit 1d032ea3d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 378 additions and 249 deletions

View file

@ -19,54 +19,74 @@ requires_fork = unittest.skipUnless(support.has_fork_support,
"(no _at_fork_reinit method)") "(no _at_fork_reinit method)")
def _wait(): def wait_threads_blocked(nthread):
# A crude wait/yield function not relying on synchronization primitives. # Arbitrary sleep to wait until N threads are blocked,
time.sleep(0.01) # like waiting for a lock.
time.sleep(0.010 * nthread)
class Bunch(object): class Bunch(object):
""" """
A bunch of threads. A bunch of threads.
""" """
def __init__(self, f, n, wait_before_exit=False): def __init__(self, func, nthread, wait_before_exit=False):
""" """
Construct a bunch of `n` threads running the same function `f`. Construct a bunch of `nthread` threads running the same function `func`.
If `wait_before_exit` is True, the threads won't terminate until If `wait_before_exit` is True, the threads won't terminate until
do_finish() is called. do_finish() is called.
""" """
self.f = f self.func = func
self.n = n self.nthread = nthread
self.started = [] self.started = []
self.finished = [] self.finished = []
self.exceptions = []
self._can_exit = not wait_before_exit self._can_exit = not wait_before_exit
self.wait_thread = threading_helper.wait_threads_exit() self._wait_thread = None
self.wait_thread.__enter__()
def task(): def task(self):
tid = threading.get_ident() tid = threading.get_ident()
self.started.append(tid) self.started.append(tid)
try: try:
f() self.func()
finally: except BaseException as exc:
self.finished.append(tid) self.exceptions.append(exc)
while not self._can_exit: finally:
_wait() self.finished.append(tid)
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
if self._can_exit:
break
def __enter__(self):
self._wait_thread = threading_helper.wait_threads_exit(support.SHORT_TIMEOUT)
self._wait_thread.__enter__()
try: try:
for i in range(n): for _ in range(self.nthread):
start_new_thread(task, ()) start_new_thread(self.task, ())
except: except:
self._can_exit = True self._can_exit = True
raise raise
def wait_for_started(self): for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
while len(self.started) < self.n: if len(self.started) >= self.nthread:
_wait() break
def wait_for_finished(self): return self
while len(self.finished) < self.n:
_wait() def __exit__(self, exc_type, exc_value, traceback):
# Wait for threads exit for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
self.wait_thread.__exit__(None, None, None) if len(self.finished) >= self.nthread:
break
# Wait until threads completely exit according to _thread._count()
self._wait_thread.__exit__(None, None, None)
# Break reference cycle
exceptions = self.exceptions
self.exceptions = None
if exceptions:
raise ExceptionGroup(f"{self.func} threads raised exceptions",
exceptions)
def do_finish(self): def do_finish(self):
self._can_exit = True self._can_exit = True
@ -94,6 +114,12 @@ class BaseLockTests(BaseTestCase):
Tests for both recursive and non-recursive locks. Tests for both recursive and non-recursive locks.
""" """
def wait_phase(self, phase, expected):
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
if len(phase) >= expected:
break
self.assertEqual(len(phase), expected)
def test_constructor(self): def test_constructor(self):
lock = self.locktype() lock = self.locktype()
del lock del lock
@ -131,41 +157,57 @@ class BaseLockTests(BaseTestCase):
result = [] result = []
def f(): def f():
result.append(lock.acquire(False)) result.append(lock.acquire(False))
Bunch(f, 1).wait_for_finished() with Bunch(f, 1):
pass
self.assertFalse(result[0]) self.assertFalse(result[0])
lock.release() lock.release()
def test_acquire_contended(self): def test_acquire_contended(self):
lock = self.locktype() lock = self.locktype()
lock.acquire() lock.acquire()
N = 5
def f(): def f():
lock.acquire() lock.acquire()
lock.release() lock.release()
b = Bunch(f, N) N = 5
b.wait_for_started() with Bunch(f, N) as bunch:
_wait() # Threads block on lock.acquire()
self.assertEqual(len(b.finished), 0) wait_threads_blocked(N)
lock.release() self.assertEqual(len(bunch.finished), 0)
b.wait_for_finished()
self.assertEqual(len(b.finished), N) # Threads unblocked
lock.release()
self.assertEqual(len(bunch.finished), N)
def test_with(self): def test_with(self):
lock = self.locktype() lock = self.locktype()
def f(): def f():
lock.acquire() lock.acquire()
lock.release() lock.release()
def _with(err=None):
def with_lock(err=None):
with lock: with lock:
if err is not None: if err is not None:
raise err raise err
_with()
# Check the lock is unacquired # Acquire the lock, do nothing, with releases the lock
Bunch(f, 1).wait_for_finished() with lock:
self.assertRaises(TypeError, _with, TypeError) pass
# Check the lock is unacquired
Bunch(f, 1).wait_for_finished() # Check that the lock is unacquired
with Bunch(f, 1):
pass
# Acquire the lock, raise an exception, with releases the lock
with self.assertRaises(TypeError):
with lock:
raise TypeError
# Check that the lock is unacquired even if after an exception
# was raised in the previous "with lock:" block
with Bunch(f, 1):
pass
def test_thread_leak(self): def test_thread_leak(self):
# The lock shouldn't leak a Thread instance when used from a foreign # The lock shouldn't leak a Thread instance when used from a foreign
@ -174,17 +216,11 @@ class BaseLockTests(BaseTestCase):
def f(): def f():
lock.acquire() lock.acquire()
lock.release() lock.release()
n = len(threading.enumerate())
# We run many threads in the hope that existing threads ids won't # We run many threads in the hope that existing threads ids won't
# be recycled. # be recycled.
Bunch(f, 15).wait_for_finished() with Bunch(f, 15):
if len(threading.enumerate()) != n: pass
# There is a small window during which a Thread instance's
# target function has finished running, but the Thread is still
# alive and registered. Avoid spurious failures by waiting a
# bit more (seen on a buildbot).
time.sleep(0.4)
self.assertEqual(n, len(threading.enumerate()))
def test_timeout(self): def test_timeout(self):
lock = self.locktype() lock = self.locktype()
@ -208,7 +244,8 @@ class BaseLockTests(BaseTestCase):
results.append(lock.acquire(timeout=0.5)) results.append(lock.acquire(timeout=0.5))
t2 = time.monotonic() t2 = time.monotonic()
results.append(t2 - t1) results.append(t2 - t1)
Bunch(f, 1).wait_for_finished() with Bunch(f, 1):
pass
self.assertFalse(results[0]) self.assertFalse(results[0])
self.assertTimeout(results[1], 0.5) self.assertTimeout(results[1], 0.5)
@ -242,15 +279,13 @@ class LockTests(BaseLockTests):
phase.append(None) phase.append(None)
with threading_helper.wait_threads_exit(): with threading_helper.wait_threads_exit():
# Thread blocked on lock.acquire()
start_new_thread(f, ()) start_new_thread(f, ())
while len(phase) == 0: self.wait_phase(phase, 1)
_wait()
_wait() # Thread unblocked
self.assertEqual(len(phase), 1)
lock.release() lock.release()
while len(phase) == 1: self.wait_phase(phase, 2)
_wait()
self.assertEqual(len(phase), 2)
def test_different_thread(self): def test_different_thread(self):
# Lock can be released from a different thread. # Lock can be released from a different thread.
@ -258,8 +293,8 @@ class LockTests(BaseLockTests):
lock.acquire() lock.acquire()
def f(): def f():
lock.release() lock.release()
b = Bunch(f, 1) with Bunch(f, 1):
b.wait_for_finished() pass
lock.acquire() lock.acquire()
lock.release() lock.release()
@ -349,21 +384,20 @@ class RLockTests(BaseLockTests):
def f(): def f():
lock.acquire() lock.acquire()
phase.append(None) phase.append(None)
while len(phase) == 1:
_wait() self.wait_phase(phase, 2)
lock.release() lock.release()
phase.append(None) phase.append(None)
with threading_helper.wait_threads_exit(): with threading_helper.wait_threads_exit():
# Thread blocked on lock.acquire()
start_new_thread(f, ()) start_new_thread(f, ())
while len(phase) == 0: self.wait_phase(phase, 1)
_wait()
self.assertEqual(len(phase), 1)
self.assertEqual(0, lock._recursion_count()) self.assertEqual(0, lock._recursion_count())
# Thread unblocked
phase.append(None) phase.append(None)
while len(phase) == 2: self.wait_phase(phase, 3)
_wait()
self.assertEqual(len(phase), 3)
self.assertEqual(0, lock._recursion_count()) self.assertEqual(0, lock._recursion_count())
def test_different_thread(self): def test_different_thread(self):
@ -371,12 +405,12 @@ class RLockTests(BaseLockTests):
lock = self.locktype() lock = self.locktype()
def f(): def f():
lock.acquire() lock.acquire()
b = Bunch(f, 1, True)
try: with Bunch(f, 1, True) as bunch:
self.assertRaises(RuntimeError, lock.release) try:
finally: self.assertRaises(RuntimeError, lock.release)
b.do_finish() finally:
b.wait_for_finished() bunch.do_finish()
def test__is_owned(self): def test__is_owned(self):
lock = self.locktype() lock = self.locktype()
@ -388,7 +422,8 @@ class RLockTests(BaseLockTests):
result = [] result = []
def f(): def f():
result.append(lock._is_owned()) result.append(lock._is_owned())
Bunch(f, 1).wait_for_finished() with Bunch(f, 1):
pass
self.assertFalse(result[0]) self.assertFalse(result[0])
lock.release() lock.release()
self.assertTrue(lock._is_owned()) self.assertTrue(lock._is_owned())
@ -421,12 +456,15 @@ class EventTests(BaseTestCase):
def f(): def f():
results1.append(evt.wait()) results1.append(evt.wait())
results2.append(evt.wait()) results2.append(evt.wait())
b = Bunch(f, N)
b.wait_for_started() with Bunch(f, N):
_wait() # Threads blocked on first evt.wait()
self.assertEqual(len(results1), 0) wait_threads_blocked(N)
evt.set() self.assertEqual(len(results1), 0)
b.wait_for_finished()
# Threads unblocked
evt.set()
self.assertEqual(results1, [True] * N) self.assertEqual(results1, [True] * N)
self.assertEqual(results2, [True] * N) self.assertEqual(results2, [True] * N)
@ -449,35 +487,43 @@ class EventTests(BaseTestCase):
r = evt.wait(0.5) r = evt.wait(0.5)
t2 = time.monotonic() t2 = time.monotonic()
results2.append((r, t2 - t1)) results2.append((r, t2 - t1))
Bunch(f, N).wait_for_finished()
with Bunch(f, N):
pass
self.assertEqual(results1, [False] * N) self.assertEqual(results1, [False] * N)
for r, dt in results2: for r, dt in results2:
self.assertFalse(r) self.assertFalse(r)
self.assertTimeout(dt, 0.5) self.assertTimeout(dt, 0.5)
# The event is set # The event is set
results1 = [] results1 = []
results2 = [] results2 = []
evt.set() evt.set()
Bunch(f, N).wait_for_finished() with Bunch(f, N):
pass
self.assertEqual(results1, [True] * N) self.assertEqual(results1, [True] * N)
for r, dt in results2: for r, dt in results2:
self.assertTrue(r) self.assertTrue(r)
def test_set_and_clear(self): def test_set_and_clear(self):
# Issue #13502: check that wait() returns true even when the event is # gh-57711: check that wait() returns true even when the event is
# cleared before the waiting thread is woken up. # cleared before the waiting thread is woken up.
evt = self.eventtype() event = self.eventtype()
results = [] results = []
timeout = 0.250
N = 5
def f(): def f():
results.append(evt.wait(timeout * 4)) results.append(event.wait(support.LONG_TIMEOUT))
b = Bunch(f, N)
b.wait_for_started() N = 5
time.sleep(timeout) with Bunch(f, N):
evt.set() # Threads blocked on event.wait()
evt.clear() wait_threads_blocked(N)
b.wait_for_finished()
# Threads unblocked
event.set()
event.clear()
self.assertEqual(results, [True] * N) self.assertEqual(results, [True] * N)
@requires_fork @requires_fork
@ -533,15 +579,14 @@ class ConditionTests(BaseTestCase):
# Note that this test is sensitive to timing. If the worker threads # Note that this test is sensitive to timing. If the worker threads
# don't execute in a timely fashion, the main thread may think they # don't execute in a timely fashion, the main thread may think they
# are further along then they are. The main thread therefore issues # are further along then they are. The main thread therefore issues
# _wait() statements to try to make sure that it doesn't race ahead # wait_threads_blocked() statements to try to make sure that it doesn't
# of the workers. # race ahead of the workers.
# Secondly, this test assumes that condition variables are not subject # Secondly, this test assumes that condition variables are not subject
# to spurious wakeups. The absence of spurious wakeups is an implementation # to spurious wakeups. The absence of spurious wakeups is an implementation
# detail of Condition Variables in current CPython, but in general, not # detail of Condition Variables in current CPython, but in general, not
# a guaranteed property of condition variables as a programming # a guaranteed property of condition variables as a programming
# construct. In particular, it is possible that this can no longer # construct. In particular, it is possible that this can no longer
# be conveniently guaranteed should their implementation ever change. # be conveniently guaranteed should their implementation ever change.
N = 5
ready = [] ready = []
results1 = [] results1 = []
results2 = [] results2 = []
@ -550,58 +595,83 @@ class ConditionTests(BaseTestCase):
cond.acquire() cond.acquire()
ready.append(phase_num) ready.append(phase_num)
result = cond.wait() result = cond.wait()
cond.release() cond.release()
results1.append((result, phase_num)) results1.append((result, phase_num))
cond.acquire() cond.acquire()
ready.append(phase_num) ready.append(phase_num)
result = cond.wait() result = cond.wait()
cond.release() cond.release()
results2.append((result, phase_num)) results2.append((result, phase_num))
b = Bunch(f, N)
b.wait_for_started() N = 5
# first wait, to ensure all workers settle into cond.wait() before with Bunch(f, N):
# we continue. See issues #8799 and #30727. # first wait, to ensure all workers settle into cond.wait() before
while len(ready) < 5: # we continue. See issues #8799 and #30727.
_wait() for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
ready.clear() if len(ready) >= N:
self.assertEqual(results1, []) break
# Notify 3 threads at first
cond.acquire() ready.clear()
cond.notify(3) self.assertEqual(results1, [])
_wait()
phase_num = 1 # Notify 3 threads at first
cond.release() count1 = 3
while len(results1) < 3: cond.acquire()
_wait() cond.notify(count1)
self.assertEqual(results1, [(True, 1)] * 3) wait_threads_blocked(count1)
self.assertEqual(results2, [])
# make sure all awaken workers settle into cond.wait() # Phase 1
while len(ready) < 3: phase_num = 1
_wait() cond.release()
# Notify 5 threads: they might be in their first or second wait for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
cond.acquire() if len(results1) >= count1:
cond.notify(5) break
_wait()
phase_num = 2 self.assertEqual(results1, [(True, 1)] * count1)
cond.release() self.assertEqual(results2, [])
while len(results1) + len(results2) < 8:
_wait() # Wait until awaken workers are blocked on cond.wait()
self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2) for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
self.assertEqual(results2, [(True, 2)] * 3) if len(ready) >= count1 :
# make sure all workers settle into cond.wait() break
while len(ready) < 5:
_wait() # Notify 5 threads: they might be in their first or second wait
# Notify all threads: they are all in their second wait cond.acquire()
cond.acquire() cond.notify(5)
cond.notify_all() wait_threads_blocked(N)
_wait()
phase_num = 3 # Phase 2
cond.release() phase_num = 2
while len(results2) < 5: cond.release()
_wait() for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2) if len(results1) + len(results2) >= (N + count1):
self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2) break
b.wait_for_finished()
count2 = N - count1
self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2)
self.assertEqual(results2, [(True, 2)] * count1)
# Make sure all workers settle into cond.wait()
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
if len(ready) >= N:
break
# Notify all threads: they are all in their second wait
cond.acquire()
cond.notify_all()
wait_threads_blocked(N)
# Phase 3
phase_num = 3
cond.release()
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
if len(results2) >= N:
break
self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2)
self.assertEqual(results2, [(True, 2)] * count1 + [(True, 3)] * count2)
def test_notify(self): def test_notify(self):
cond = self.condtype() cond = self.condtype()
@ -611,19 +681,23 @@ class ConditionTests(BaseTestCase):
def test_timeout(self): def test_timeout(self):
cond = self.condtype() cond = self.condtype()
timeout = 0.5
results = [] results = []
N = 5
def f(): def f():
cond.acquire() cond.acquire()
t1 = time.monotonic() t1 = time.monotonic()
result = cond.wait(0.5) result = cond.wait(timeout)
t2 = time.monotonic() t2 = time.monotonic()
cond.release() cond.release()
results.append((t2 - t1, result)) results.append((t2 - t1, result))
Bunch(f, N).wait_for_finished()
N = 5
with Bunch(f, N):
pass
self.assertEqual(len(results), N) self.assertEqual(len(results), N)
for dt, result in results: for dt, result in results:
self.assertTimeout(dt, 0.5) self.assertTimeout(dt, timeout)
# Note that conceptually (that"s the condition variable protocol) # Note that conceptually (that"s the condition variable protocol)
# a wait() may succeed even if no one notifies us and before any # a wait() may succeed even if no one notifies us and before any
# timeout occurs. Spurious wakeups can occur. # timeout occurs. Spurious wakeups can occur.
@ -636,17 +710,16 @@ class ConditionTests(BaseTestCase):
state = 0 state = 0
def f(): def f():
with cond: with cond:
result = cond.wait_for(lambda : state==4) result = cond.wait_for(lambda: state == 4)
self.assertTrue(result) self.assertTrue(result)
self.assertEqual(state, 4) self.assertEqual(state, 4)
b = Bunch(f, 1)
b.wait_for_started() with Bunch(f, 1):
for i in range(4): for i in range(4):
time.sleep(0.01) time.sleep(0.010)
with cond: with cond:
state += 1 state += 1
cond.notify() cond.notify()
b.wait_for_finished()
def test_waitfor_timeout(self): def test_waitfor_timeout(self):
cond = self.condtype() cond = self.condtype()
@ -660,15 +733,15 @@ class ConditionTests(BaseTestCase):
self.assertFalse(result) self.assertFalse(result)
self.assertTimeout(dt, 0.1) self.assertTimeout(dt, 0.1)
success.append(None) success.append(None)
b = Bunch(f, 1)
b.wait_for_started() with Bunch(f, 1):
# Only increment 3 times, so state == 4 is never reached. # Only increment 3 times, so state == 4 is never reached.
for i in range(3): for i in range(3):
time.sleep(0.01) time.sleep(0.010)
with cond: with cond:
state += 1 state += 1
cond.notify() cond.notify()
b.wait_for_finished()
self.assertEqual(len(success), 1) self.assertEqual(len(success), 1)
@ -697,73 +770,107 @@ class BaseSemaphoreTests(BaseTestCase):
del sem del sem
def test_acquire_contended(self): def test_acquire_contended(self):
sem = self.semtype(7) sem_value = 7
sem = self.semtype(sem_value)
sem.acquire() sem.acquire()
N = 10
sem_results = [] sem_results = []
results1 = [] results1 = []
results2 = [] results2 = []
phase_num = 0 phase_num = 0
def f():
def func():
sem_results.append(sem.acquire()) sem_results.append(sem.acquire())
results1.append(phase_num) results1.append(phase_num)
sem_results.append(sem.acquire()) sem_results.append(sem.acquire())
results2.append(phase_num) results2.append(phase_num)
b = Bunch(f, 10)
b.wait_for_started() def wait_count(count):
while len(results1) + len(results2) < 6: for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
_wait() if len(results1) + len(results2) >= count:
self.assertEqual(results1 + results2, [0] * 6) break
phase_num = 1
for i in range(7): N = 10
with Bunch(func, N):
# Phase 0
count1 = sem_value - 1
wait_count(count1)
self.assertEqual(results1 + results2, [0] * count1)
# Phase 1
phase_num = 1
for i in range(sem_value):
sem.release()
count2 = sem_value
wait_count(count1 + count2)
self.assertEqual(sorted(results1 + results2),
[0] * count1 + [1] * count2)
# Phase 2
phase_num = 2
count3 = (sem_value - 1)
for i in range(count3):
sem.release()
wait_count(count1 + count2 + count3)
self.assertEqual(sorted(results1 + results2),
[0] * count1 + [1] * count2 + [2] * count3)
# The semaphore is still locked
self.assertFalse(sem.acquire(False))
# Final release, to let the last thread finish
count4 = 1
sem.release() sem.release()
while len(results1) + len(results2) < 13:
_wait() self.assertEqual(sem_results,
self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7) [True] * (count1 + count2 + count3 + count4))
phase_num = 2
for i in range(6):
sem.release()
while len(results1) + len(results2) < 19:
_wait()
self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
# The semaphore is still locked
self.assertFalse(sem.acquire(False))
# Final release, to let the last thread finish
sem.release()
b.wait_for_finished()
self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1))
def test_multirelease(self): def test_multirelease(self):
sem = self.semtype(7) sem_value = 7
sem = self.semtype(sem_value)
sem.acquire() sem.acquire()
results1 = [] results1 = []
results2 = [] results2 = []
phase_num = 0 phase_num = 0
def f(): def func():
sem.acquire() sem.acquire()
results1.append(phase_num) results1.append(phase_num)
sem.acquire() sem.acquire()
results2.append(phase_num) results2.append(phase_num)
b = Bunch(f, 10)
b.wait_for_started() def wait_count(count):
while len(results1) + len(results2) < 6: for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
_wait() if len(results1) + len(results2) >= count:
self.assertEqual(results1 + results2, [0] * 6) break
phase_num = 1
sem.release(7) with Bunch(func, 10):
while len(results1) + len(results2) < 13: # Phase 0
_wait() count1 = sem_value - 1
self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7) wait_count(count1)
phase_num = 2 self.assertEqual(results1 + results2, [0] * count1)
sem.release(6)
while len(results1) + len(results2) < 19: # Phase 1
_wait() phase_num = 1
self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6) count2 = sem_value
# The semaphore is still locked sem.release(count2)
self.assertFalse(sem.acquire(False)) wait_count(count1 + count2)
# Final release, to let the last thread finish self.assertEqual(sorted(results1 + results2),
sem.release() [0] * count1 + [1] * count2)
b.wait_for_finished()
# Phase 2
phase_num = 2
count3 = sem_value - 1
sem.release(count3)
wait_count(count1 + count2 + count3)
self.assertEqual(sorted(results1 + results2),
[0] * count1 + [1] * count2 + [2] * count3)
# The semaphore is still locked
self.assertFalse(sem.acquire(False))
# Final release, to let the last thread finish
sem.release()
def test_try_acquire(self): def test_try_acquire(self):
sem = self.semtype(2) sem = self.semtype(2)
@ -780,7 +887,8 @@ class BaseSemaphoreTests(BaseTestCase):
def f(): def f():
results.append(sem.acquire(False)) results.append(sem.acquire(False))
results.append(sem.acquire(False)) results.append(sem.acquire(False))
Bunch(f, 5).wait_for_finished() with Bunch(f, 5):
pass
# There can be a thread switch between acquiring the semaphore and # There can be a thread switch between acquiring the semaphore and
# appending the result, therefore results will not necessarily be # appending the result, therefore results will not necessarily be
# ordered. # ordered.
@ -806,12 +914,14 @@ class BaseSemaphoreTests(BaseTestCase):
def f(): def f():
sem.acquire() sem.acquire()
sem.release() sem.release()
b = Bunch(f, 1)
b.wait_for_started() with Bunch(f, 1) as bunch:
_wait() # Thread blocked on sem.acquire()
self.assertFalse(b.finished) wait_threads_blocked(1)
sem.release() self.assertFalse(bunch.finished)
b.wait_for_finished()
# Thread unblocked
sem.release()
def test_with(self): def test_with(self):
sem = self.semtype(2) sem = self.semtype(2)
@ -882,13 +992,13 @@ class BarrierTests(BaseTestCase):
def setUp(self): def setUp(self):
self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout) self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
def tearDown(self): def tearDown(self):
self.barrier.abort() self.barrier.abort()
def run_threads(self, f): def run_threads(self, f):
b = Bunch(f, self.N-1) with Bunch(f, self.N):
f() pass
b.wait_for_finished()
def multipass(self, results, n): def multipass(self, results, n):
m = self.barrier.parties m = self.barrier.parties
@ -979,8 +1089,9 @@ class BarrierTests(BaseTestCase):
i = self.barrier.wait() i = self.barrier.wait()
if i == self.N//2: if i == self.N//2:
# Wait until the other threads are all in the barrier. # Wait until the other threads are all in the barrier.
while self.barrier.n_waiting < self.N-1: for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
time.sleep(0.001) if self.barrier.n_waiting >= (self.N - 1):
break
self.barrier.reset() self.barrier.reset()
else: else:
try: try:
@ -1040,25 +1151,27 @@ class BarrierTests(BaseTestCase):
i = self.barrier.wait() i = self.barrier.wait()
if i == self.N // 2: if i == self.N // 2:
# One thread is late! # One thread is late!
time.sleep(1.0) time.sleep(self.defaultTimeout / 2)
# Default timeout is 2.0, so this is shorter. # Default timeout is 2.0, so this is shorter.
self.assertRaises(threading.BrokenBarrierError, self.assertRaises(threading.BrokenBarrierError,
self.barrier.wait, 0.5) self.barrier.wait, self.defaultTimeout / 4)
self.run_threads(f) self.run_threads(f)
def test_default_timeout(self): def test_default_timeout(self):
""" """
Test the barrier's default timeout Test the barrier's default timeout
""" """
# create a barrier with a low default timeout timeout = 0.100
barrier = self.barriertype(self.N, timeout=0.3) barrier = self.barriertype(2, timeout=timeout)
def f(): def f():
i = barrier.wait() self.assertRaises(threading.BrokenBarrierError,
if i == self.N // 2: barrier.wait)
# One thread is later than the default timeout of 0.3s.
time.sleep(1.0) start_time = time.monotonic()
self.assertRaises(threading.BrokenBarrierError, barrier.wait) with Bunch(f, 1):
self.run_threads(f) pass
dt = time.monotonic() - start_time
self.assertGreaterEqual(dt, timeout)
def test_single_thread(self): def test_single_thread(self):
b = self.barriertype(1) b = self.barriertype(1)
@ -1066,16 +1179,28 @@ class BarrierTests(BaseTestCase):
b.wait() b.wait()
def test_repr(self): def test_repr(self):
b = self.barriertype(3) barrier = self.barriertype(3)
self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>") timeout = support.LONG_TIMEOUT
self.assertRegex(repr(barrier), r"<\w+\.Barrier at .*: waiters=0/3>")
def f(): def f():
b.wait(3) barrier.wait(timeout)
bunch = Bunch(f, 2)
bunch.wait_for_started() N = 2
time.sleep(0.2) with Bunch(f, N):
self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=2/3>") # Threads blocked on barrier.wait()
b.wait(3) for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
bunch.wait_for_finished() if barrier.n_waiting >= N:
self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>") break
b.abort() self.assertRegex(repr(barrier),
self.assertRegex(repr(b), r"<\w+\.Barrier at .*: broken>") r"<\w+\.Barrier at .*: waiters=2/3>")
# Threads unblocked
barrier.wait(timeout)
self.assertRegex(repr(barrier),
r"<\w+\.Barrier at .*: waiters=0/3>")
# Abort the barrier
barrier.abort()
self.assertRegex(repr(barrier),
r"<\w+\.Barrier at .*: broken>")

View file

@ -93,7 +93,8 @@ class DeadlockAvoidanceTests:
b.release() b.release()
if ra: if ra:
a.release() a.release()
lock_tests.Bunch(f, NTHREADS).wait_for_finished() with lock_tests.Bunch(f, NTHREADS):
pass
self.assertEqual(len(results), NTHREADS) self.assertEqual(len(results), NTHREADS)
return results return results

View file

@ -0,0 +1,3 @@
Fix race conditions in test_threading lock tests. Wait until a condition is met
rather than using :func:`time.sleep` with a hardcoded number of seconds. Patch
by Victor Stinner.