mirror of
				https://github.com/python/cpython.git
				synced 2025-10-22 06:32:43 +00:00 
			
		
		
		
	 1d032ea3d6
			
		
	
	
		1d032ea3d6
		
			
		
	
	
	
	
		
			
			* gh-109974: Fix threading lock_tests race conditions (#110057) Fix race conditions in test_threading lock tests. Wait until a condition is met rather than using time.sleep() with a hardcoded number of seconds. * Replace sleeping loops with support.sleeping_retry() which raises an exception on timeout. * Add wait_threads_blocked(nthread) which computes a sleep depending on the number of threads. Remove _wait() function. * test_set_and_clear(): use a way longer Event.wait() timeout. * BarrierTests.test_repr(): wait until the 2 threads are waiting for the barrier. Use a way longer timeout for Barrier.wait() timeout. * test_thread_leak() no longer needs to count len(threading.enumerate()): Bunch uses threading_helper.wait_threads_exit() internally which does it in wait_for_finished(). * Add BaseLockTests.wait_phase() which implements a timeout. test_reacquire() and test_recursion_count() use wait_phase(). (cherry picked from commit4e356ad183) * gh-109974: Fix more threading lock_tests race conditions (#110089) * Add context manager on Bunch class. * Bunch now catchs exceptions on executed functions and re-raise them at __exit__() as an ExceptionGroup. * Rewrite BarrierProxy.test_default_timeout(). Use a single thread. Only check that barrier.wait() blocks for at least default timeout seconds. * test_with(): inline _with() function. (cherry picked from commit743e3572ee)
		
			
				
	
	
		
			1206 lines
		
	
	
	
		
			36 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1206 lines
		
	
	
	
		
			36 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """
 | |
| Various tests for synchronization primitives.
 | |
| """
 | |
| 
 | |
| import gc
 | |
| import sys
 | |
| import time
 | |
| from _thread import start_new_thread, TIMEOUT_MAX
 | |
| import threading
 | |
| import unittest
 | |
| import weakref
 | |
| 
 | |
| from test import support
 | |
| from test.support import threading_helper
 | |
| 
 | |
| 
 | |
| requires_fork = unittest.skipUnless(support.has_fork_support,
 | |
|                                     "platform doesn't support fork "
 | |
|                                      "(no _at_fork_reinit method)")
 | |
| 
 | |
| 
 | |
| def wait_threads_blocked(nthread):
 | |
|     # Arbitrary sleep to wait until N threads are blocked,
 | |
|     # like waiting for a lock.
 | |
|     time.sleep(0.010 * nthread)
 | |
| 
 | |
| 
 | |
| class Bunch(object):
 | |
|     """
 | |
|     A bunch of threads.
 | |
|     """
 | |
|     def __init__(self, func, nthread, wait_before_exit=False):
 | |
|         """
 | |
|         Construct a bunch of `nthread` threads running the same function `func`.
 | |
|         If `wait_before_exit` is True, the threads won't terminate until
 | |
|         do_finish() is called.
 | |
|         """
 | |
|         self.func = func
 | |
|         self.nthread = nthread
 | |
|         self.started = []
 | |
|         self.finished = []
 | |
|         self.exceptions = []
 | |
|         self._can_exit = not wait_before_exit
 | |
|         self._wait_thread = None
 | |
| 
 | |
|     def task(self):
 | |
|         tid = threading.get_ident()
 | |
|         self.started.append(tid)
 | |
|         try:
 | |
|             self.func()
 | |
|         except BaseException as exc:
 | |
|             self.exceptions.append(exc)
 | |
|         finally:
 | |
|             self.finished.append(tid)
 | |
|             for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
 | |
|                 if self._can_exit:
 | |
|                     break
 | |
| 
 | |
|     def __enter__(self):
 | |
|         self._wait_thread = threading_helper.wait_threads_exit(support.SHORT_TIMEOUT)
 | |
|         self._wait_thread.__enter__()
 | |
| 
 | |
|         try:
 | |
|             for _ in range(self.nthread):
 | |
|                 start_new_thread(self.task, ())
 | |
|         except:
 | |
|             self._can_exit = True
 | |
|             raise
 | |
| 
 | |
|         for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
 | |
|             if len(self.started) >= self.nthread:
 | |
|                 break
 | |
| 
 | |
|         return self
 | |
| 
 | |
|     def __exit__(self, exc_type, exc_value, traceback):
 | |
|         for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
 | |
|             if len(self.finished) >= self.nthread:
 | |
|                 break
 | |
| 
 | |
|         # Wait until threads completely exit according to _thread._count()
 | |
|         self._wait_thread.__exit__(None, None, None)
 | |
| 
 | |
|         # Break reference cycle
 | |
|         exceptions = self.exceptions
 | |
|         self.exceptions = None
 | |
|         if exceptions:
 | |
|             raise ExceptionGroup(f"{self.func} threads raised exceptions",
 | |
|                                  exceptions)
 | |
| 
 | |
|     def do_finish(self):
 | |
|         self._can_exit = True
 | |
| 
 | |
| 
 | |
| class BaseTestCase(unittest.TestCase):
 | |
|     def setUp(self):
 | |
|         self._threads = threading_helper.threading_setup()
 | |
| 
 | |
|     def tearDown(self):
 | |
|         threading_helper.threading_cleanup(*self._threads)
 | |
|         support.reap_children()
 | |
| 
 | |
|     def assertTimeout(self, actual, expected):
 | |
|         # The waiting and/or time.monotonic() can be imprecise, which
 | |
|         # is why comparing to the expected value would sometimes fail
 | |
|         # (especially under Windows).
 | |
|         self.assertGreaterEqual(actual, expected * 0.6)
 | |
|         # Test nothing insane happened
 | |
|         self.assertLess(actual, expected * 10.0)
 | |
| 
 | |
| 
 | |
| class BaseLockTests(BaseTestCase):
 | |
|     """
 | |
|     Tests for both recursive and non-recursive locks.
 | |
|     """
 | |
| 
 | |
|     def wait_phase(self, phase, expected):
 | |
|         for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
 | |
|             if len(phase) >= expected:
 | |
|                 break
 | |
|         self.assertEqual(len(phase), expected)
 | |
| 
 | |
|     def test_constructor(self):
 | |
|         lock = self.locktype()
 | |
|         del lock
 | |
| 
 | |
|     def test_repr(self):
 | |
|         lock = self.locktype()
 | |
|         self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
 | |
|         del lock
 | |
| 
 | |
|     def test_locked_repr(self):
 | |
|         lock = self.locktype()
 | |
|         lock.acquire()
 | |
|         self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
 | |
|         del lock
 | |
| 
 | |
|     def test_acquire_destroy(self):
 | |
|         lock = self.locktype()
 | |
|         lock.acquire()
 | |
|         del lock
 | |
| 
 | |
|     def test_acquire_release(self):
 | |
|         lock = self.locktype()
 | |
|         lock.acquire()
 | |
|         lock.release()
 | |
|         del lock
 | |
| 
 | |
|     def test_try_acquire(self):
 | |
|         lock = self.locktype()
 | |
|         self.assertTrue(lock.acquire(False))
 | |
|         lock.release()
 | |
| 
 | |
|     def test_try_acquire_contended(self):
 | |
|         lock = self.locktype()
 | |
|         lock.acquire()
 | |
|         result = []
 | |
|         def f():
 | |
|             result.append(lock.acquire(False))
 | |
|         with Bunch(f, 1):
 | |
|             pass
 | |
|         self.assertFalse(result[0])
 | |
|         lock.release()
 | |
| 
 | |
|     def test_acquire_contended(self):
 | |
|         lock = self.locktype()
 | |
|         lock.acquire()
 | |
|         def f():
 | |
|             lock.acquire()
 | |
|             lock.release()
 | |
| 
 | |
|         N = 5
 | |
|         with Bunch(f, N) as bunch:
 | |
|             # Threads block on lock.acquire()
 | |
|             wait_threads_blocked(N)
 | |
|             self.assertEqual(len(bunch.finished), 0)
 | |
| 
 | |
|             # Threads unblocked
 | |
|             lock.release()
 | |
| 
 | |
|         self.assertEqual(len(bunch.finished), N)
 | |
| 
 | |
|     def test_with(self):
 | |
|         lock = self.locktype()
 | |
|         def f():
 | |
|             lock.acquire()
 | |
|             lock.release()
 | |
| 
 | |
|         def with_lock(err=None):
 | |
|             with lock:
 | |
|                 if err is not None:
 | |
|                     raise err
 | |
| 
 | |
|         # Acquire the lock, do nothing, with releases the lock
 | |
|         with lock:
 | |
|             pass
 | |
| 
 | |
|         # Check that the lock is unacquired
 | |
|         with Bunch(f, 1):
 | |
|             pass
 | |
| 
 | |
|         # Acquire the lock, raise an exception, with releases the lock
 | |
|         with self.assertRaises(TypeError):
 | |
|             with lock:
 | |
|                 raise TypeError
 | |
| 
 | |
|         # Check that the lock is unacquired even if after an exception
 | |
|         # was raised in the previous "with lock:" block
 | |
|         with Bunch(f, 1):
 | |
|             pass
 | |
| 
 | |
|     def test_thread_leak(self):
 | |
|         # The lock shouldn't leak a Thread instance when used from a foreign
 | |
|         # (non-threading) thread.
 | |
|         lock = self.locktype()
 | |
|         def f():
 | |
|             lock.acquire()
 | |
|             lock.release()
 | |
| 
 | |
|         # We run many threads in the hope that existing threads ids won't
 | |
|         # be recycled.
 | |
|         with Bunch(f, 15):
 | |
|             pass
 | |
| 
 | |
|     def test_timeout(self):
 | |
|         lock = self.locktype()
 | |
|         # Can't set timeout if not blocking
 | |
|         self.assertRaises(ValueError, lock.acquire, False, 1)
 | |
|         # Invalid timeout values
 | |
|         self.assertRaises(ValueError, lock.acquire, timeout=-100)
 | |
|         self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
 | |
|         self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
 | |
|         # TIMEOUT_MAX is ok
 | |
|         lock.acquire(timeout=TIMEOUT_MAX)
 | |
|         lock.release()
 | |
|         t1 = time.monotonic()
 | |
|         self.assertTrue(lock.acquire(timeout=5))
 | |
|         t2 = time.monotonic()
 | |
|         # Just a sanity test that it didn't actually wait for the timeout.
 | |
|         self.assertLess(t2 - t1, 5)
 | |
|         results = []
 | |
|         def f():
 | |
|             t1 = time.monotonic()
 | |
|             results.append(lock.acquire(timeout=0.5))
 | |
|             t2 = time.monotonic()
 | |
|             results.append(t2 - t1)
 | |
|         with Bunch(f, 1):
 | |
|             pass
 | |
|         self.assertFalse(results[0])
 | |
|         self.assertTimeout(results[1], 0.5)
 | |
| 
 | |
|     def test_weakref_exists(self):
 | |
|         lock = self.locktype()
 | |
|         ref = weakref.ref(lock)
 | |
|         self.assertIsNotNone(ref())
 | |
| 
 | |
|     def test_weakref_deleted(self):
 | |
|         lock = self.locktype()
 | |
|         ref = weakref.ref(lock)
 | |
|         del lock
 | |
|         gc.collect()  # For PyPy or other GCs.
 | |
|         self.assertIsNone(ref())
 | |
| 
 | |
| 
 | |
| class LockTests(BaseLockTests):
 | |
|     """
 | |
|     Tests for non-recursive, weak locks
 | |
|     (which can be acquired and released from different threads).
 | |
|     """
 | |
|     def test_reacquire(self):
 | |
|         # Lock needs to be released before re-acquiring.
 | |
|         lock = self.locktype()
 | |
|         phase = []
 | |
| 
 | |
|         def f():
 | |
|             lock.acquire()
 | |
|             phase.append(None)
 | |
|             lock.acquire()
 | |
|             phase.append(None)
 | |
| 
 | |
|         with threading_helper.wait_threads_exit():
 | |
|             # Thread blocked on lock.acquire()
 | |
|             start_new_thread(f, ())
 | |
|             self.wait_phase(phase, 1)
 | |
| 
 | |
|             # Thread unblocked
 | |
|             lock.release()
 | |
|             self.wait_phase(phase, 2)
 | |
| 
 | |
|     def test_different_thread(self):
 | |
|         # Lock can be released from a different thread.
 | |
|         lock = self.locktype()
 | |
|         lock.acquire()
 | |
|         def f():
 | |
|             lock.release()
 | |
|         with Bunch(f, 1):
 | |
|             pass
 | |
|         lock.acquire()
 | |
|         lock.release()
 | |
| 
 | |
|     def test_state_after_timeout(self):
 | |
|         # Issue #11618: check that lock is in a proper state after a
 | |
|         # (non-zero) timeout.
 | |
|         lock = self.locktype()
 | |
|         lock.acquire()
 | |
|         self.assertFalse(lock.acquire(timeout=0.01))
 | |
|         lock.release()
 | |
|         self.assertFalse(lock.locked())
 | |
|         self.assertTrue(lock.acquire(blocking=False))
 | |
| 
 | |
|     @requires_fork
 | |
|     def test_at_fork_reinit(self):
 | |
|         def use_lock(lock):
 | |
|             # make sure that the lock still works normally
 | |
|             # after _at_fork_reinit()
 | |
|             lock.acquire()
 | |
|             lock.release()
 | |
| 
 | |
|         # unlocked
 | |
|         lock = self.locktype()
 | |
|         lock._at_fork_reinit()
 | |
|         use_lock(lock)
 | |
| 
 | |
|         # locked: _at_fork_reinit() resets the lock to the unlocked state
 | |
|         lock2 = self.locktype()
 | |
|         lock2.acquire()
 | |
|         lock2._at_fork_reinit()
 | |
|         use_lock(lock2)
 | |
| 
 | |
| 
 | |
| class RLockTests(BaseLockTests):
 | |
|     """
 | |
|     Tests for recursive locks.
 | |
|     """
 | |
|     def test_reacquire(self):
 | |
|         lock = self.locktype()
 | |
|         lock.acquire()
 | |
|         lock.acquire()
 | |
|         lock.release()
 | |
|         lock.acquire()
 | |
|         lock.release()
 | |
|         lock.release()
 | |
| 
 | |
|     def test_release_unacquired(self):
 | |
|         # Cannot release an unacquired lock
 | |
|         lock = self.locktype()
 | |
|         self.assertRaises(RuntimeError, lock.release)
 | |
|         lock.acquire()
 | |
|         lock.acquire()
 | |
|         lock.release()
 | |
|         lock.acquire()
 | |
|         lock.release()
 | |
|         lock.release()
 | |
|         self.assertRaises(RuntimeError, lock.release)
 | |
| 
 | |
|     def test_release_save_unacquired(self):
 | |
|         # Cannot _release_save an unacquired lock
 | |
|         lock = self.locktype()
 | |
|         self.assertRaises(RuntimeError, lock._release_save)
 | |
|         lock.acquire()
 | |
|         lock.acquire()
 | |
|         lock.release()
 | |
|         lock.acquire()
 | |
|         lock.release()
 | |
|         lock.release()
 | |
|         self.assertRaises(RuntimeError, lock._release_save)
 | |
| 
 | |
|     def test_recursion_count(self):
 | |
|         lock = self.locktype()
 | |
|         self.assertEqual(0, lock._recursion_count())
 | |
|         lock.acquire()
 | |
|         self.assertEqual(1, lock._recursion_count())
 | |
|         lock.acquire()
 | |
|         lock.acquire()
 | |
|         self.assertEqual(3, lock._recursion_count())
 | |
|         lock.release()
 | |
|         self.assertEqual(2, lock._recursion_count())
 | |
|         lock.release()
 | |
|         lock.release()
 | |
|         self.assertEqual(0, lock._recursion_count())
 | |
| 
 | |
|         phase = []
 | |
| 
 | |
|         def f():
 | |
|             lock.acquire()
 | |
|             phase.append(None)
 | |
| 
 | |
|             self.wait_phase(phase, 2)
 | |
|             lock.release()
 | |
|             phase.append(None)
 | |
| 
 | |
|         with threading_helper.wait_threads_exit():
 | |
|             # Thread blocked on lock.acquire()
 | |
|             start_new_thread(f, ())
 | |
|             self.wait_phase(phase, 1)
 | |
|             self.assertEqual(0, lock._recursion_count())
 | |
| 
 | |
|             # Thread unblocked
 | |
|             phase.append(None)
 | |
|             self.wait_phase(phase, 3)
 | |
|             self.assertEqual(0, lock._recursion_count())
 | |
| 
 | |
|     def test_different_thread(self):
 | |
|         # Cannot release from a different thread
 | |
|         lock = self.locktype()
 | |
|         def f():
 | |
|             lock.acquire()
 | |
| 
 | |
|         with Bunch(f, 1, True) as bunch:
 | |
|             try:
 | |
|                 self.assertRaises(RuntimeError, lock.release)
 | |
|             finally:
 | |
|                 bunch.do_finish()
 | |
| 
 | |
|     def test__is_owned(self):
 | |
|         lock = self.locktype()
 | |
|         self.assertFalse(lock._is_owned())
 | |
|         lock.acquire()
 | |
|         self.assertTrue(lock._is_owned())
 | |
|         lock.acquire()
 | |
|         self.assertTrue(lock._is_owned())
 | |
|         result = []
 | |
|         def f():
 | |
|             result.append(lock._is_owned())
 | |
|         with Bunch(f, 1):
 | |
|             pass
 | |
|         self.assertFalse(result[0])
 | |
|         lock.release()
 | |
|         self.assertTrue(lock._is_owned())
 | |
|         lock.release()
 | |
|         self.assertFalse(lock._is_owned())
 | |
| 
 | |
| 
 | |
| class EventTests(BaseTestCase):
 | |
|     """
 | |
|     Tests for Event objects.
 | |
|     """
 | |
| 
 | |
|     def test_is_set(self):
 | |
|         evt = self.eventtype()
 | |
|         self.assertFalse(evt.is_set())
 | |
|         evt.set()
 | |
|         self.assertTrue(evt.is_set())
 | |
|         evt.set()
 | |
|         self.assertTrue(evt.is_set())
 | |
|         evt.clear()
 | |
|         self.assertFalse(evt.is_set())
 | |
|         evt.clear()
 | |
|         self.assertFalse(evt.is_set())
 | |
| 
 | |
|     def _check_notify(self, evt):
 | |
|         # All threads get notified
 | |
|         N = 5
 | |
|         results1 = []
 | |
|         results2 = []
 | |
|         def f():
 | |
|             results1.append(evt.wait())
 | |
|             results2.append(evt.wait())
 | |
| 
 | |
|         with Bunch(f, N):
 | |
|             # Threads blocked on first evt.wait()
 | |
|             wait_threads_blocked(N)
 | |
|             self.assertEqual(len(results1), 0)
 | |
| 
 | |
|             # Threads unblocked
 | |
|             evt.set()
 | |
| 
 | |
|         self.assertEqual(results1, [True] * N)
 | |
|         self.assertEqual(results2, [True] * N)
 | |
| 
 | |
|     def test_notify(self):
 | |
|         evt = self.eventtype()
 | |
|         self._check_notify(evt)
 | |
|         # Another time, after an explicit clear()
 | |
|         evt.set()
 | |
|         evt.clear()
 | |
|         self._check_notify(evt)
 | |
| 
 | |
|     def test_timeout(self):
 | |
|         evt = self.eventtype()
 | |
|         results1 = []
 | |
|         results2 = []
 | |
|         N = 5
 | |
|         def f():
 | |
|             results1.append(evt.wait(0.0))
 | |
|             t1 = time.monotonic()
 | |
|             r = evt.wait(0.5)
 | |
|             t2 = time.monotonic()
 | |
|             results2.append((r, t2 - t1))
 | |
| 
 | |
|         with Bunch(f, N):
 | |
|             pass
 | |
| 
 | |
|         self.assertEqual(results1, [False] * N)
 | |
|         for r, dt in results2:
 | |
|             self.assertFalse(r)
 | |
|             self.assertTimeout(dt, 0.5)
 | |
| 
 | |
|         # The event is set
 | |
|         results1 = []
 | |
|         results2 = []
 | |
|         evt.set()
 | |
|         with Bunch(f, N):
 | |
|             pass
 | |
| 
 | |
|         self.assertEqual(results1, [True] * N)
 | |
|         for r, dt in results2:
 | |
|             self.assertTrue(r)
 | |
| 
 | |
|     def test_set_and_clear(self):
 | |
|         # gh-57711: check that wait() returns true even when the event is
 | |
|         # cleared before the waiting thread is woken up.
 | |
|         event = self.eventtype()
 | |
|         results = []
 | |
|         def f():
 | |
|             results.append(event.wait(support.LONG_TIMEOUT))
 | |
| 
 | |
|         N = 5
 | |
|         with Bunch(f, N):
 | |
|             # Threads blocked on event.wait()
 | |
|             wait_threads_blocked(N)
 | |
| 
 | |
|             # Threads unblocked
 | |
|             event.set()
 | |
|             event.clear()
 | |
| 
 | |
|         self.assertEqual(results, [True] * N)
 | |
| 
 | |
|     @requires_fork
 | |
|     def test_at_fork_reinit(self):
 | |
|         # ensure that condition is still using a Lock after reset
 | |
|         evt = self.eventtype()
 | |
|         with evt._cond:
 | |
|             self.assertFalse(evt._cond.acquire(False))
 | |
|         evt._at_fork_reinit()
 | |
|         with evt._cond:
 | |
|             self.assertFalse(evt._cond.acquire(False))
 | |
| 
 | |
|     def test_repr(self):
 | |
|         evt = self.eventtype()
 | |
|         self.assertRegex(repr(evt), r"<\w+\.Event at .*: unset>")
 | |
|         evt.set()
 | |
|         self.assertRegex(repr(evt), r"<\w+\.Event at .*: set>")
 | |
| 
 | |
| 
 | |
| class ConditionTests(BaseTestCase):
 | |
|     """
 | |
|     Tests for condition variables.
 | |
|     """
 | |
| 
 | |
|     def test_acquire(self):
 | |
|         cond = self.condtype()
 | |
|         # Be default we have an RLock: the condition can be acquired multiple
 | |
|         # times.
 | |
|         cond.acquire()
 | |
|         cond.acquire()
 | |
|         cond.release()
 | |
|         cond.release()
 | |
|         lock = threading.Lock()
 | |
|         cond = self.condtype(lock)
 | |
|         cond.acquire()
 | |
|         self.assertFalse(lock.acquire(False))
 | |
|         cond.release()
 | |
|         self.assertTrue(lock.acquire(False))
 | |
|         self.assertFalse(cond.acquire(False))
 | |
|         lock.release()
 | |
|         with cond:
 | |
|             self.assertFalse(lock.acquire(False))
 | |
| 
 | |
|     def test_unacquired_wait(self):
 | |
|         cond = self.condtype()
 | |
|         self.assertRaises(RuntimeError, cond.wait)
 | |
| 
 | |
|     def test_unacquired_notify(self):
 | |
|         cond = self.condtype()
 | |
|         self.assertRaises(RuntimeError, cond.notify)
 | |
| 
 | |
|     def _check_notify(self, cond):
 | |
|         # Note that this test is sensitive to timing.  If the worker threads
 | |
|         # don't execute in a timely fashion, the main thread may think they
 | |
|         # are further along then they are.  The main thread therefore issues
 | |
|         # wait_threads_blocked() statements to try to make sure that it doesn't
 | |
|         # race ahead of the workers.
 | |
|         # Secondly, this test assumes that condition variables are not subject
 | |
|         # to spurious wakeups.  The absence of spurious wakeups is an implementation
 | |
|         # detail of Condition Variables in current CPython, but in general, not
 | |
|         # a guaranteed property of condition variables as a programming
 | |
|         # construct.  In particular, it is possible that this can no longer
 | |
|         # be conveniently guaranteed should their implementation ever change.
 | |
|         ready = []
 | |
|         results1 = []
 | |
|         results2 = []
 | |
|         phase_num = 0
 | |
|         def f():
 | |
|             cond.acquire()
 | |
|             ready.append(phase_num)
 | |
|             result = cond.wait()
 | |
| 
 | |
|             cond.release()
 | |
|             results1.append((result, phase_num))
 | |
| 
 | |
|             cond.acquire()
 | |
|             ready.append(phase_num)
 | |
| 
 | |
|             result = cond.wait()
 | |
|             cond.release()
 | |
|             results2.append((result, phase_num))
 | |
| 
 | |
|         N = 5
 | |
|         with Bunch(f, N):
 | |
|             # first wait, to ensure all workers settle into cond.wait() before
 | |
|             # we continue. See issues #8799 and #30727.
 | |
|             for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
 | |
|                 if len(ready) >= N:
 | |
|                     break
 | |
| 
 | |
|             ready.clear()
 | |
|             self.assertEqual(results1, [])
 | |
| 
 | |
|             # Notify 3 threads at first
 | |
|             count1 = 3
 | |
|             cond.acquire()
 | |
|             cond.notify(count1)
 | |
|             wait_threads_blocked(count1)
 | |
| 
 | |
|             # Phase 1
 | |
|             phase_num = 1
 | |
|             cond.release()
 | |
|             for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
 | |
|                 if len(results1) >= count1:
 | |
|                     break
 | |
| 
 | |
|             self.assertEqual(results1, [(True, 1)] * count1)
 | |
|             self.assertEqual(results2, [])
 | |
| 
 | |
|             # Wait until awaken workers are blocked on cond.wait()
 | |
|             for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
 | |
|                 if len(ready) >= count1 :
 | |
|                     break
 | |
| 
 | |
|             # Notify 5 threads: they might be in their first or second wait
 | |
|             cond.acquire()
 | |
|             cond.notify(5)
 | |
|             wait_threads_blocked(N)
 | |
| 
 | |
|             # Phase 2
 | |
|             phase_num = 2
 | |
|             cond.release()
 | |
|             for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
 | |
|                 if len(results1) + len(results2) >= (N + count1):
 | |
|                     break
 | |
| 
 | |
|             count2 = N - count1
 | |
|             self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2)
 | |
|             self.assertEqual(results2, [(True, 2)] * count1)
 | |
| 
 | |
|             # Make sure all workers settle into cond.wait()
 | |
|             for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
 | |
|                 if len(ready) >= N:
 | |
|                     break
 | |
| 
 | |
|             # Notify all threads: they are all in their second wait
 | |
|             cond.acquire()
 | |
|             cond.notify_all()
 | |
|             wait_threads_blocked(N)
 | |
| 
 | |
|             # Phase 3
 | |
|             phase_num = 3
 | |
|             cond.release()
 | |
|             for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
 | |
|                 if len(results2) >= N:
 | |
|                     break
 | |
|             self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2)
 | |
|             self.assertEqual(results2, [(True, 2)] * count1 + [(True, 3)] * count2)
 | |
| 
 | |
|     def test_notify(self):
 | |
|         cond = self.condtype()
 | |
|         self._check_notify(cond)
 | |
|         # A second time, to check internal state is still ok.
 | |
|         self._check_notify(cond)
 | |
| 
 | |
|     def test_timeout(self):
 | |
|         cond = self.condtype()
 | |
|         timeout = 0.5
 | |
|         results = []
 | |
|         def f():
 | |
|             cond.acquire()
 | |
|             t1 = time.monotonic()
 | |
|             result = cond.wait(timeout)
 | |
|             t2 = time.monotonic()
 | |
|             cond.release()
 | |
|             results.append((t2 - t1, result))
 | |
| 
 | |
|         N = 5
 | |
|         with Bunch(f, N):
 | |
|             pass
 | |
|         self.assertEqual(len(results), N)
 | |
| 
 | |
|         for dt, result in results:
 | |
|             self.assertTimeout(dt, timeout)
 | |
|             # Note that conceptually (that"s the condition variable protocol)
 | |
|             # a wait() may succeed even if no one notifies us and before any
 | |
|             # timeout occurs.  Spurious wakeups can occur.
 | |
|             # This makes it hard to verify the result value.
 | |
|             # In practice, this implementation has no spurious wakeups.
 | |
|             self.assertFalse(result)
 | |
| 
 | |
|     def test_waitfor(self):
 | |
|         cond = self.condtype()
 | |
|         state = 0
 | |
|         def f():
 | |
|             with cond:
 | |
|                 result = cond.wait_for(lambda: state == 4)
 | |
|                 self.assertTrue(result)
 | |
|                 self.assertEqual(state, 4)
 | |
| 
 | |
|         with Bunch(f, 1):
 | |
|             for i in range(4):
 | |
|                 time.sleep(0.010)
 | |
|                 with cond:
 | |
|                     state += 1
 | |
|                     cond.notify()
 | |
| 
 | |
|     def test_waitfor_timeout(self):
 | |
|         cond = self.condtype()
 | |
|         state = 0
 | |
|         success = []
 | |
|         def f():
 | |
|             with cond:
 | |
|                 dt = time.monotonic()
 | |
|                 result = cond.wait_for(lambda : state==4, timeout=0.1)
 | |
|                 dt = time.monotonic() - dt
 | |
|                 self.assertFalse(result)
 | |
|                 self.assertTimeout(dt, 0.1)
 | |
|                 success.append(None)
 | |
| 
 | |
|         with Bunch(f, 1):
 | |
|             # Only increment 3 times, so state == 4 is never reached.
 | |
|             for i in range(3):
 | |
|                 time.sleep(0.010)
 | |
|                 with cond:
 | |
|                     state += 1
 | |
|                     cond.notify()
 | |
| 
 | |
|         self.assertEqual(len(success), 1)
 | |
| 
 | |
| 
 | |
| class BaseSemaphoreTests(BaseTestCase):
 | |
|     """
 | |
|     Common tests for {bounded, unbounded} semaphore objects.
 | |
|     """
 | |
| 
 | |
|     def test_constructor(self):
 | |
|         self.assertRaises(ValueError, self.semtype, value = -1)
 | |
|         self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
 | |
| 
 | |
|     def test_acquire(self):
 | |
|         sem = self.semtype(1)
 | |
|         sem.acquire()
 | |
|         sem.release()
 | |
|         sem = self.semtype(2)
 | |
|         sem.acquire()
 | |
|         sem.acquire()
 | |
|         sem.release()
 | |
|         sem.release()
 | |
| 
 | |
|     def test_acquire_destroy(self):
 | |
|         sem = self.semtype()
 | |
|         sem.acquire()
 | |
|         del sem
 | |
| 
 | |
|     def test_acquire_contended(self):
 | |
|         sem_value = 7
 | |
|         sem = self.semtype(sem_value)
 | |
|         sem.acquire()
 | |
| 
 | |
|         sem_results = []
 | |
|         results1 = []
 | |
|         results2 = []
 | |
|         phase_num = 0
 | |
| 
 | |
|         def func():
 | |
|             sem_results.append(sem.acquire())
 | |
|             results1.append(phase_num)
 | |
| 
 | |
|             sem_results.append(sem.acquire())
 | |
|             results2.append(phase_num)
 | |
| 
 | |
|         def wait_count(count):
 | |
|             for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
 | |
|                 if len(results1) + len(results2) >= count:
 | |
|                     break
 | |
| 
 | |
|         N = 10
 | |
|         with Bunch(func, N):
 | |
|             # Phase 0
 | |
|             count1 = sem_value - 1
 | |
|             wait_count(count1)
 | |
|             self.assertEqual(results1 + results2, [0] * count1)
 | |
| 
 | |
|             # Phase 1
 | |
|             phase_num = 1
 | |
|             for i in range(sem_value):
 | |
|                 sem.release()
 | |
|             count2 = sem_value
 | |
|             wait_count(count1 + count2)
 | |
|             self.assertEqual(sorted(results1 + results2),
 | |
|                              [0] * count1 + [1] * count2)
 | |
| 
 | |
|             # Phase 2
 | |
|             phase_num = 2
 | |
|             count3 = (sem_value - 1)
 | |
|             for i in range(count3):
 | |
|                 sem.release()
 | |
|             wait_count(count1 + count2 + count3)
 | |
|             self.assertEqual(sorted(results1 + results2),
 | |
|                              [0] * count1 + [1] * count2 + [2] * count3)
 | |
|             # The semaphore is still locked
 | |
|             self.assertFalse(sem.acquire(False))
 | |
| 
 | |
|             # Final release, to let the last thread finish
 | |
|             count4 = 1
 | |
|             sem.release()
 | |
| 
 | |
|         self.assertEqual(sem_results,
 | |
|                          [True] * (count1 + count2 + count3 + count4))
 | |
| 
 | |
|     def test_multirelease(self):
 | |
|         sem_value = 7
 | |
|         sem = self.semtype(sem_value)
 | |
|         sem.acquire()
 | |
| 
 | |
|         results1 = []
 | |
|         results2 = []
 | |
|         phase_num = 0
 | |
|         def func():
 | |
|             sem.acquire()
 | |
|             results1.append(phase_num)
 | |
| 
 | |
|             sem.acquire()
 | |
|             results2.append(phase_num)
 | |
| 
 | |
|         def wait_count(count):
 | |
|             for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
 | |
|                 if len(results1) + len(results2) >= count:
 | |
|                     break
 | |
| 
 | |
|         with Bunch(func, 10):
 | |
|             # Phase 0
 | |
|             count1 = sem_value - 1
 | |
|             wait_count(count1)
 | |
|             self.assertEqual(results1 + results2, [0] * count1)
 | |
| 
 | |
|             # Phase 1
 | |
|             phase_num = 1
 | |
|             count2 = sem_value
 | |
|             sem.release(count2)
 | |
|             wait_count(count1 + count2)
 | |
|             self.assertEqual(sorted(results1 + results2),
 | |
|                              [0] * count1 + [1] * count2)
 | |
| 
 | |
|             # Phase 2
 | |
|             phase_num = 2
 | |
|             count3 = sem_value - 1
 | |
|             sem.release(count3)
 | |
|             wait_count(count1 + count2 + count3)
 | |
|             self.assertEqual(sorted(results1 + results2),
 | |
|                              [0] * count1 + [1] * count2 + [2] * count3)
 | |
|             # The semaphore is still locked
 | |
|             self.assertFalse(sem.acquire(False))
 | |
| 
 | |
|             # Final release, to let the last thread finish
 | |
|             sem.release()
 | |
| 
 | |
|     def test_try_acquire(self):
 | |
|         sem = self.semtype(2)
 | |
|         self.assertTrue(sem.acquire(False))
 | |
|         self.assertTrue(sem.acquire(False))
 | |
|         self.assertFalse(sem.acquire(False))
 | |
|         sem.release()
 | |
|         self.assertTrue(sem.acquire(False))
 | |
| 
 | |
|     def test_try_acquire_contended(self):
 | |
|         sem = self.semtype(4)
 | |
|         sem.acquire()
 | |
|         results = []
 | |
|         def f():
 | |
|             results.append(sem.acquire(False))
 | |
|             results.append(sem.acquire(False))
 | |
|         with Bunch(f, 5):
 | |
|             pass
 | |
|         # There can be a thread switch between acquiring the semaphore and
 | |
|         # appending the result, therefore results will not necessarily be
 | |
|         # ordered.
 | |
|         self.assertEqual(sorted(results), [False] * 7 + [True] *  3 )
 | |
| 
 | |
|     def test_acquire_timeout(self):
 | |
|         sem = self.semtype(2)
 | |
|         self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
 | |
|         self.assertTrue(sem.acquire(timeout=0.005))
 | |
|         self.assertTrue(sem.acquire(timeout=0.005))
 | |
|         self.assertFalse(sem.acquire(timeout=0.005))
 | |
|         sem.release()
 | |
|         self.assertTrue(sem.acquire(timeout=0.005))
 | |
|         t = time.monotonic()
 | |
|         self.assertFalse(sem.acquire(timeout=0.5))
 | |
|         dt = time.monotonic() - t
 | |
|         self.assertTimeout(dt, 0.5)
 | |
| 
 | |
|     def test_default_value(self):
 | |
|         # The default initial value is 1.
 | |
|         sem = self.semtype()
 | |
|         sem.acquire()
 | |
|         def f():
 | |
|             sem.acquire()
 | |
|             sem.release()
 | |
| 
 | |
|         with Bunch(f, 1) as bunch:
 | |
|             # Thread blocked on sem.acquire()
 | |
|             wait_threads_blocked(1)
 | |
|             self.assertFalse(bunch.finished)
 | |
| 
 | |
|             # Thread unblocked
 | |
|             sem.release()
 | |
| 
 | |
|     def test_with(self):
 | |
|         sem = self.semtype(2)
 | |
|         def _with(err=None):
 | |
|             with sem:
 | |
|                 self.assertTrue(sem.acquire(False))
 | |
|                 sem.release()
 | |
|                 with sem:
 | |
|                     self.assertFalse(sem.acquire(False))
 | |
|                     if err:
 | |
|                         raise err
 | |
|         _with()
 | |
|         self.assertTrue(sem.acquire(False))
 | |
|         sem.release()
 | |
|         self.assertRaises(TypeError, _with, TypeError)
 | |
|         self.assertTrue(sem.acquire(False))
 | |
|         sem.release()
 | |
| 
 | |
| class SemaphoreTests(BaseSemaphoreTests):
 | |
|     """
 | |
|     Tests for unbounded semaphores.
 | |
|     """
 | |
| 
 | |
|     def test_release_unacquired(self):
 | |
|         # Unbounded releases are allowed and increment the semaphore's value
 | |
|         sem = self.semtype(1)
 | |
|         sem.release()
 | |
|         sem.acquire()
 | |
|         sem.acquire()
 | |
|         sem.release()
 | |
| 
 | |
|     def test_repr(self):
 | |
|         sem = self.semtype(3)
 | |
|         self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=3>")
 | |
|         sem.acquire()
 | |
|         self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=2>")
 | |
|         sem.release()
 | |
|         sem.release()
 | |
|         self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=4>")
 | |
| 
 | |
| 
 | |
| class BoundedSemaphoreTests(BaseSemaphoreTests):
 | |
|     """
 | |
|     Tests for bounded semaphores.
 | |
|     """
 | |
| 
 | |
|     def test_release_unacquired(self):
 | |
|         # Cannot go past the initial value
 | |
|         sem = self.semtype()
 | |
|         self.assertRaises(ValueError, sem.release)
 | |
|         sem.acquire()
 | |
|         sem.release()
 | |
|         self.assertRaises(ValueError, sem.release)
 | |
| 
 | |
|     def test_repr(self):
 | |
|         sem = self.semtype(3)
 | |
|         self.assertRegex(repr(sem), r"<\w+\.BoundedSemaphore at .*: value=3/3>")
 | |
|         sem.acquire()
 | |
|         self.assertRegex(repr(sem), r"<\w+\.BoundedSemaphore at .*: value=2/3>")
 | |
| 
 | |
| 
 | |
| class BarrierTests(BaseTestCase):
 | |
|     """
 | |
|     Tests for Barrier objects.
 | |
|     """
 | |
|     N = 5
 | |
|     defaultTimeout = 2.0
 | |
| 
 | |
|     def setUp(self):
 | |
|         self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
 | |
| 
 | |
|     def tearDown(self):
 | |
|         self.barrier.abort()
 | |
| 
 | |
|     def run_threads(self, f):
 | |
|         with Bunch(f, self.N):
 | |
|             pass
 | |
| 
 | |
|     def multipass(self, results, n):
 | |
|         m = self.barrier.parties
 | |
|         self.assertEqual(m, self.N)
 | |
|         for i in range(n):
 | |
|             results[0].append(True)
 | |
|             self.assertEqual(len(results[1]), i * m)
 | |
|             self.barrier.wait()
 | |
|             results[1].append(True)
 | |
|             self.assertEqual(len(results[0]), (i + 1) * m)
 | |
|             self.barrier.wait()
 | |
|         self.assertEqual(self.barrier.n_waiting, 0)
 | |
|         self.assertFalse(self.barrier.broken)
 | |
| 
 | |
|     def test_barrier(self, passes=1):
 | |
|         """
 | |
|         Test that a barrier is passed in lockstep
 | |
|         """
 | |
|         results = [[],[]]
 | |
|         def f():
 | |
|             self.multipass(results, passes)
 | |
|         self.run_threads(f)
 | |
| 
 | |
|     def test_barrier_10(self):
 | |
|         """
 | |
|         Test that a barrier works for 10 consecutive runs
 | |
|         """
 | |
|         return self.test_barrier(10)
 | |
| 
 | |
|     def test_wait_return(self):
 | |
|         """
 | |
|         test the return value from barrier.wait
 | |
|         """
 | |
|         results = []
 | |
|         def f():
 | |
|             r = self.barrier.wait()
 | |
|             results.append(r)
 | |
| 
 | |
|         self.run_threads(f)
 | |
|         self.assertEqual(sum(results), sum(range(self.N)))
 | |
| 
 | |
|     def test_action(self):
 | |
|         """
 | |
|         Test the 'action' callback
 | |
|         """
 | |
|         results = []
 | |
|         def action():
 | |
|             results.append(True)
 | |
|         barrier = self.barriertype(self.N, action)
 | |
|         def f():
 | |
|             barrier.wait()
 | |
|             self.assertEqual(len(results), 1)
 | |
| 
 | |
|         self.run_threads(f)
 | |
| 
 | |
|     def test_abort(self):
 | |
|         """
 | |
|         Test that an abort will put the barrier in a broken state
 | |
|         """
 | |
|         results1 = []
 | |
|         results2 = []
 | |
|         def f():
 | |
|             try:
 | |
|                 i = self.barrier.wait()
 | |
|                 if i == self.N//2:
 | |
|                     raise RuntimeError
 | |
|                 self.barrier.wait()
 | |
|                 results1.append(True)
 | |
|             except threading.BrokenBarrierError:
 | |
|                 results2.append(True)
 | |
|             except RuntimeError:
 | |
|                 self.barrier.abort()
 | |
|                 pass
 | |
| 
 | |
|         self.run_threads(f)
 | |
|         self.assertEqual(len(results1), 0)
 | |
|         self.assertEqual(len(results2), self.N-1)
 | |
|         self.assertTrue(self.barrier.broken)
 | |
| 
 | |
|     def test_reset(self):
 | |
|         """
 | |
|         Test that a 'reset' on a barrier frees the waiting threads
 | |
|         """
 | |
|         results1 = []
 | |
|         results2 = []
 | |
|         results3 = []
 | |
|         def f():
 | |
|             i = self.barrier.wait()
 | |
|             if i == self.N//2:
 | |
|                 # Wait until the other threads are all in the barrier.
 | |
|                 for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
 | |
|                     if self.barrier.n_waiting >= (self.N - 1):
 | |
|                         break
 | |
|                 self.barrier.reset()
 | |
|             else:
 | |
|                 try:
 | |
|                     self.barrier.wait()
 | |
|                     results1.append(True)
 | |
|                 except threading.BrokenBarrierError:
 | |
|                     results2.append(True)
 | |
|             # Now, pass the barrier again
 | |
|             self.barrier.wait()
 | |
|             results3.append(True)
 | |
| 
 | |
|         self.run_threads(f)
 | |
|         self.assertEqual(len(results1), 0)
 | |
|         self.assertEqual(len(results2), self.N-1)
 | |
|         self.assertEqual(len(results3), self.N)
 | |
| 
 | |
| 
 | |
|     def test_abort_and_reset(self):
 | |
|         """
 | |
|         Test that a barrier can be reset after being broken.
 | |
|         """
 | |
|         results1 = []
 | |
|         results2 = []
 | |
|         results3 = []
 | |
|         barrier2 = self.barriertype(self.N)
 | |
|         def f():
 | |
|             try:
 | |
|                 i = self.barrier.wait()
 | |
|                 if i == self.N//2:
 | |
|                     raise RuntimeError
 | |
|                 self.barrier.wait()
 | |
|                 results1.append(True)
 | |
|             except threading.BrokenBarrierError:
 | |
|                 results2.append(True)
 | |
|             except RuntimeError:
 | |
|                 self.barrier.abort()
 | |
|                 pass
 | |
|             # Synchronize and reset the barrier.  Must synchronize first so
 | |
|             # that everyone has left it when we reset, and after so that no
 | |
|             # one enters it before the reset.
 | |
|             if barrier2.wait() == self.N//2:
 | |
|                 self.barrier.reset()
 | |
|             barrier2.wait()
 | |
|             self.barrier.wait()
 | |
|             results3.append(True)
 | |
| 
 | |
|         self.run_threads(f)
 | |
|         self.assertEqual(len(results1), 0)
 | |
|         self.assertEqual(len(results2), self.N-1)
 | |
|         self.assertEqual(len(results3), self.N)
 | |
| 
 | |
|     def test_timeout(self):
 | |
|         """
 | |
|         Test wait(timeout)
 | |
|         """
 | |
|         def f():
 | |
|             i = self.barrier.wait()
 | |
|             if i == self.N // 2:
 | |
|                 # One thread is late!
 | |
|                 time.sleep(self.defaultTimeout / 2)
 | |
|             # Default timeout is 2.0, so this is shorter.
 | |
|             self.assertRaises(threading.BrokenBarrierError,
 | |
|                               self.barrier.wait, self.defaultTimeout / 4)
 | |
|         self.run_threads(f)
 | |
| 
 | |
|     def test_default_timeout(self):
 | |
|         """
 | |
|         Test the barrier's default timeout
 | |
|         """
 | |
|         timeout = 0.100
 | |
|         barrier = self.barriertype(2, timeout=timeout)
 | |
|         def f():
 | |
|             self.assertRaises(threading.BrokenBarrierError,
 | |
|                               barrier.wait)
 | |
| 
 | |
|         start_time = time.monotonic()
 | |
|         with Bunch(f, 1):
 | |
|             pass
 | |
|         dt = time.monotonic() - start_time
 | |
|         self.assertGreaterEqual(dt, timeout)
 | |
| 
 | |
|     def test_single_thread(self):
 | |
|         b = self.barriertype(1)
 | |
|         b.wait()
 | |
|         b.wait()
 | |
| 
 | |
|     def test_repr(self):
 | |
|         barrier = self.barriertype(3)
 | |
|         timeout = support.LONG_TIMEOUT
 | |
|         self.assertRegex(repr(barrier), r"<\w+\.Barrier at .*: waiters=0/3>")
 | |
|         def f():
 | |
|             barrier.wait(timeout)
 | |
| 
 | |
|         N = 2
 | |
|         with Bunch(f, N):
 | |
|             # Threads blocked on barrier.wait()
 | |
|             for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
 | |
|                 if barrier.n_waiting >= N:
 | |
|                     break
 | |
|             self.assertRegex(repr(barrier),
 | |
|                              r"<\w+\.Barrier at .*: waiters=2/3>")
 | |
| 
 | |
|             # Threads unblocked
 | |
|             barrier.wait(timeout)
 | |
| 
 | |
|         self.assertRegex(repr(barrier),
 | |
|                          r"<\w+\.Barrier at .*: waiters=0/3>")
 | |
| 
 | |
|         # Abort the barrier
 | |
|         barrier.abort()
 | |
|         self.assertRegex(repr(barrier),
 | |
|                          r"<\w+\.Barrier at .*: broken>")
 |