mirror of
https://github.com/python/cpython.git
synced 2025-07-08 03:45:36 +00:00

Instead of explicitly enumerate test classes for run_unittest() use the unittest ability to discover tests. This also makes these tests discoverable and runnable with unittest. load_tests() can be used for dynamic generating tests and adding doctests. setUpModule(), tearDownModule() and addModuleCleanup() can be used for running code before and after all module tests.
149 lines
4.4 KiB
Python
149 lines
4.4 KiB
Python
from . import util as test_util
|
|
|
|
init = test_util.import_importlib('importlib')
|
|
|
|
import sys
|
|
import threading
|
|
import unittest
|
|
import weakref
|
|
|
|
from test import support
|
|
from test.support import threading_helper
|
|
from test import lock_tests
|
|
|
|
|
|
class ModuleLockAsRLockTests:
|
|
locktype = classmethod(lambda cls: cls.LockType("some_lock"))
|
|
|
|
# _is_owned() unsupported
|
|
test__is_owned = None
|
|
# acquire(blocking=False) unsupported
|
|
test_try_acquire = None
|
|
test_try_acquire_contended = None
|
|
# `with` unsupported
|
|
test_with = None
|
|
# acquire(timeout=...) unsupported
|
|
test_timeout = None
|
|
# _release_save() unsupported
|
|
test_release_save_unacquired = None
|
|
# lock status in repr unsupported
|
|
test_repr = None
|
|
test_locked_repr = None
|
|
|
|
LOCK_TYPES = {kind: splitinit._bootstrap._ModuleLock
|
|
for kind, splitinit in init.items()}
|
|
|
|
(Frozen_ModuleLockAsRLockTests,
|
|
Source_ModuleLockAsRLockTests
|
|
) = test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests,
|
|
LockType=LOCK_TYPES)
|
|
|
|
|
|
class DeadlockAvoidanceTests:
|
|
|
|
def setUp(self):
|
|
try:
|
|
self.old_switchinterval = sys.getswitchinterval()
|
|
support.setswitchinterval(0.000001)
|
|
except AttributeError:
|
|
self.old_switchinterval = None
|
|
|
|
def tearDown(self):
|
|
if self.old_switchinterval is not None:
|
|
sys.setswitchinterval(self.old_switchinterval)
|
|
|
|
def run_deadlock_avoidance_test(self, create_deadlock):
|
|
NLOCKS = 10
|
|
locks = [self.LockType(str(i)) for i in range(NLOCKS)]
|
|
pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
|
|
if create_deadlock:
|
|
NTHREADS = NLOCKS
|
|
else:
|
|
NTHREADS = NLOCKS - 1
|
|
barrier = threading.Barrier(NTHREADS)
|
|
results = []
|
|
|
|
def _acquire(lock):
|
|
"""Try to acquire the lock. Return True on success,
|
|
False on deadlock."""
|
|
try:
|
|
lock.acquire()
|
|
except self.DeadlockError:
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
def f():
|
|
a, b = pairs.pop()
|
|
ra = _acquire(a)
|
|
barrier.wait()
|
|
rb = _acquire(b)
|
|
results.append((ra, rb))
|
|
if rb:
|
|
b.release()
|
|
if ra:
|
|
a.release()
|
|
lock_tests.Bunch(f, NTHREADS).wait_for_finished()
|
|
self.assertEqual(len(results), NTHREADS)
|
|
return results
|
|
|
|
def test_deadlock(self):
|
|
results = self.run_deadlock_avoidance_test(True)
|
|
# At least one of the threads detected a potential deadlock on its
|
|
# second acquire() call. It may be several of them, because the
|
|
# deadlock avoidance mechanism is conservative.
|
|
nb_deadlocks = results.count((True, False))
|
|
self.assertGreaterEqual(nb_deadlocks, 1)
|
|
self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)
|
|
|
|
def test_no_deadlock(self):
|
|
results = self.run_deadlock_avoidance_test(False)
|
|
self.assertEqual(results.count((True, False)), 0)
|
|
self.assertEqual(results.count((True, True)), len(results))
|
|
|
|
|
|
DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError
|
|
for kind, splitinit in init.items()}
|
|
|
|
(Frozen_DeadlockAvoidanceTests,
|
|
Source_DeadlockAvoidanceTests
|
|
) = test_util.test_both(DeadlockAvoidanceTests,
|
|
LockType=LOCK_TYPES,
|
|
DeadlockError=DEADLOCK_ERRORS)
|
|
|
|
|
|
class LifetimeTests:
|
|
|
|
@property
|
|
def bootstrap(self):
|
|
return self.init._bootstrap
|
|
|
|
def test_lock_lifetime(self):
|
|
name = "xyzzy"
|
|
self.assertNotIn(name, self.bootstrap._module_locks)
|
|
lock = self.bootstrap._get_module_lock(name)
|
|
self.assertIn(name, self.bootstrap._module_locks)
|
|
wr = weakref.ref(lock)
|
|
del lock
|
|
support.gc_collect()
|
|
self.assertNotIn(name, self.bootstrap._module_locks)
|
|
self.assertIsNone(wr())
|
|
|
|
def test_all_locks(self):
|
|
support.gc_collect()
|
|
self.assertEqual(0, len(self.bootstrap._module_locks),
|
|
self.bootstrap._module_locks)
|
|
|
|
|
|
(Frozen_LifetimeTests,
|
|
Source_LifetimeTests
|
|
) = test_util.test_both(LifetimeTests, init=init)
|
|
|
|
|
|
def setUpModule():
|
|
thread_info = threading_helper.threading_setup()
|
|
unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittets.main()
|