mirror of
https://github.com/python/cpython.git
synced 2025-08-09 11:29:45 +00:00

This should make the Linux distros happy as it is now easier to leave importlib's tests out of their base Python distribution.
115 lines
3.3 KiB
Python
115 lines
3.3 KiB
Python
from importlib import _bootstrap
|
|
import time
|
|
import unittest
|
|
import weakref
|
|
|
|
from test import support
|
|
|
|
try:
|
|
import threading
|
|
except ImportError:
|
|
threading = None
|
|
else:
|
|
from test import lock_tests
|
|
|
|
|
|
# Module-level aliases for the private importlib primitives under test, so
# the test classes below do not have to spell out the _bootstrap names.
DeadlockError = _bootstrap._DeadlockError
LockType = _bootstrap._ModuleLock
|
|
|
|
|
|
if threading is None:
    # Without threads there is no lock_tests module to inherit from, so
    # provide an empty test case under the same name for test_main().
    class ModuleLockAsRLockTests(unittest.TestCase):
        pass

else:
    class ModuleLockAsRLockTests(lock_tests.RLockTests):
        """Run the generic RLock test suite against the module lock type."""

        # Factory used by the inherited tests to create a fresh lock; the
        # module lock constructor needs a name, so one is supplied here.
        locktype = staticmethod(lambda: LockType("some_lock"))

        # Inherited tests are disabled by rebinding them to None when they
        # rely on features the module lock does not provide.

        # _is_owned() unsupported
        test__is_owned = None

        # acquire(blocking=False) unsupported
        test_try_acquire = None
        test_try_acquire_contended = None

        # `with` unsupported
        test_with = None

        # acquire(timeout=...) unsupported
        test_timeout = None

        # _release_save() unsupported
        test_release_save_unacquired = None
|
|
|
|
|
|
@unittest.skipUnless(threading, "threads needed for this test")
class DeadlockAvoidanceTests(unittest.TestCase):
    """Check that module locks report circular waits instead of hanging."""

    def run_deadlock_avoidance_test(self, create_deadlock):
        """Make several threads each acquire two neighbouring locks of a ring.

        NLOCKS locks are arranged in a ring and every thread pops one
        (lock, next lock) pair.  Each thread acquires its first lock, waits
        on a barrier until all threads have done the same, and only then
        tries to acquire its second lock.

        If *create_deadlock* is true there are as many threads as locks, so
        the pending acquisitions form a complete cycle.  Otherwise one pair
        is left unused and the chain of waits is open-ended.

        Return a list holding one ``(first_acquired, second_acquired)``
        tuple of booleans per thread; False means acquire() raised
        DeadlockError.
        """
        NLOCKS = 10
        locks = [LockType(str(i)) for i in range(NLOCKS)]
        # Pair every lock with its successor, wrapping around at the end.
        pairs = [(locks[i], locks[(i + 1) % NLOCKS]) for i in range(NLOCKS)]
        NTHREADS = NLOCKS if create_deadlock else NLOCKS - 1
        barrier = threading.Barrier(NTHREADS)
        results = []

        def _acquire(lock):
            """Try to acquire the lock. Return True on success, False on deadlock."""
            try:
                lock.acquire()
            except DeadlockError:
                return False
            else:
                return True

        def f():
            a, b = pairs.pop()
            ra = _acquire(a)
            # Do not go for the second lock before every thread holds its
            # first one; this is what sets up the potential cycle.
            barrier.wait()
            rb = _acquire(b)
            results.append((ra, rb))
            # Release only what was actually acquired, in reverse order.
            if rb:
                b.release()
            if ra:
                a.release()

        lock_tests.Bunch(f, NTHREADS).wait_for_finished()
        self.assertEqual(len(results), NTHREADS)
        return results

    def test_deadlock(self):
        results = self.run_deadlock_avoidance_test(True)
        # At least one of the threads detected a potential deadlock on its
        # second acquire() call.  Several threads may report it in the same
        # run, so demanding exactly one makes the test fail spuriously.
        nb_deadlocks = results.count((True, False))
        self.assertGreaterEqual(nb_deadlocks, 1)
        # Every other thread must have acquired both of its locks.
        self.assertEqual(results.count((True, True)),
                         len(results) - nb_deadlocks)

    def test_no_deadlock(self):
        results = self.run_deadlock_avoidance_test(False)
        # With an open chain nobody may report a deadlock, and every thread
        # ends up holding both locks.
        self.assertEqual(results.count((True, False)), 0)
        self.assertEqual(results.count((True, True)), len(results))
|
|
|
|
|
|
class LifetimeTests(unittest.TestCase):
    """Check that module locks vanish from the registry once unreferenced."""

    def test_lock_lifetime(self):
        # A lock obtained through _get_module_lock() must be registered in
        # _module_locks while referenced and dropped again afterwards.
        module_name = "xyzzy"
        self.assertNotIn(module_name, _bootstrap._module_locks)

        module_lock = _bootstrap._get_module_lock(module_name)
        self.assertIn(module_name, _bootstrap._module_locks)

        # Keep only a weak reference so the lock itself can be collected.
        lock_ref = weakref.ref(module_lock)
        del module_lock
        support.gc_collect()

        self.assertNotIn(module_name, _bootstrap._module_locks)
        self.assertIsNone(lock_ref())

    def test_all_locks(self):
        # After a collection pass the registry is expected to be empty; the
        # registry itself is passed as the failure message for diagnosis.
        support.gc_collect()
        leftover = _bootstrap._module_locks
        self.assertEqual(0, len(leftover), leftover)
|
|
|
|
|
|
@support.reap_threads
def test_main():
    """Run every test case of this module via support.run_unittest()."""
    test_cases = (
        ModuleLockAsRLockTests,
        DeadlockAvoidanceTests,
        LifetimeTests,
    )
    support.run_unittest(*test_cases)


if __name__ == '__main__':
    test_main()
|