mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 11:49:12 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			176 lines
		
	
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			176 lines
		
	
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from . import util as test_util
 | 
						|
 | 
						|
init = test_util.import_importlib('importlib')
 | 
						|
 | 
						|
import sys
 | 
						|
import time
 | 
						|
import unittest
 | 
						|
import weakref
 | 
						|
 | 
						|
from test import support
 | 
						|
 | 
						|
try:
 | 
						|
    import threading
 | 
						|
except ImportError:
 | 
						|
    threading = None
 | 
						|
else:
 | 
						|
    from test import lock_tests
 | 
						|
 | 
						|
if threading is not None:
 | 
						|
    class ModuleLockAsRLockTests:
 | 
						|
        locktype = classmethod(lambda cls: cls.LockType("some_lock"))
 | 
						|
 | 
						|
        # _is_owned() unsupported
 | 
						|
        test__is_owned = None
 | 
						|
        # acquire(blocking=False) unsupported
 | 
						|
        test_try_acquire = None
 | 
						|
        test_try_acquire_contended = None
 | 
						|
        # `with` unsupported
 | 
						|
        test_with = None
 | 
						|
        # acquire(timeout=...) unsupported
 | 
						|
        test_timeout = None
 | 
						|
        # _release_save() unsupported
 | 
						|
        test_release_save_unacquired = None
 | 
						|
        # lock status in repr unsupported
 | 
						|
        test_repr = None
 | 
						|
        test_locked_repr = None
 | 
						|
 | 
						|
    LOCK_TYPES = {kind: splitinit._bootstrap._ModuleLock
 | 
						|
                  for kind, splitinit in init.items()}
 | 
						|
 | 
						|
    (Frozen_ModuleLockAsRLockTests,
 | 
						|
     Source_ModuleLockAsRLockTests
 | 
						|
     ) = test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests,
 | 
						|
                             LockType=LOCK_TYPES)
 | 
						|
else:
 | 
						|
    LOCK_TYPES = {}
 | 
						|
 | 
						|
    class Frozen_ModuleLockAsRLockTests(unittest.TestCase):
 | 
						|
        pass
 | 
						|
 | 
						|
    class Source_ModuleLockAsRLockTests(unittest.TestCase):
 | 
						|
        pass
 | 
						|
 | 
						|
 | 
						|
if threading is not None:
 | 
						|
    class DeadlockAvoidanceTests:
 | 
						|
 | 
						|
        def setUp(self):
 | 
						|
            try:
 | 
						|
                self.old_switchinterval = sys.getswitchinterval()
 | 
						|
                sys.setswitchinterval(0.000001)
 | 
						|
            except AttributeError:
 | 
						|
                self.old_switchinterval = None
 | 
						|
 | 
						|
        def tearDown(self):
 | 
						|
            if self.old_switchinterval is not None:
 | 
						|
                sys.setswitchinterval(self.old_switchinterval)
 | 
						|
 | 
						|
        def run_deadlock_avoidance_test(self, create_deadlock):
 | 
						|
            NLOCKS = 10
 | 
						|
            locks = [self.LockType(str(i)) for i in range(NLOCKS)]
 | 
						|
            pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
 | 
						|
            if create_deadlock:
 | 
						|
                NTHREADS = NLOCKS
 | 
						|
            else:
 | 
						|
                NTHREADS = NLOCKS - 1
 | 
						|
            barrier = threading.Barrier(NTHREADS)
 | 
						|
            results = []
 | 
						|
 | 
						|
            def _acquire(lock):
 | 
						|
                """Try to acquire the lock. Return True on success,
 | 
						|
                False on deadlock."""
 | 
						|
                try:
 | 
						|
                    lock.acquire()
 | 
						|
                except self.DeadlockError:
 | 
						|
                    return False
 | 
						|
                else:
 | 
						|
                    return True
 | 
						|
 | 
						|
            def f():
 | 
						|
                a, b = pairs.pop()
 | 
						|
                ra = _acquire(a)
 | 
						|
                barrier.wait()
 | 
						|
                rb = _acquire(b)
 | 
						|
                results.append((ra, rb))
 | 
						|
                if rb:
 | 
						|
                    b.release()
 | 
						|
                if ra:
 | 
						|
                    a.release()
 | 
						|
            lock_tests.Bunch(f, NTHREADS).wait_for_finished()
 | 
						|
            self.assertEqual(len(results), NTHREADS)
 | 
						|
            return results
 | 
						|
 | 
						|
        def test_deadlock(self):
 | 
						|
            results = self.run_deadlock_avoidance_test(True)
 | 
						|
            # At least one of the threads detected a potential deadlock on its
 | 
						|
            # second acquire() call.  It may be several of them, because the
 | 
						|
            # deadlock avoidance mechanism is conservative.
 | 
						|
            nb_deadlocks = results.count((True, False))
 | 
						|
            self.assertGreaterEqual(nb_deadlocks, 1)
 | 
						|
            self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)
 | 
						|
 | 
						|
        def test_no_deadlock(self):
 | 
						|
            results = self.run_deadlock_avoidance_test(False)
 | 
						|
            self.assertEqual(results.count((True, False)), 0)
 | 
						|
            self.assertEqual(results.count((True, True)), len(results))
 | 
						|
 | 
						|
 | 
						|
    DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError
 | 
						|
                       for kind, splitinit in init.items()}
 | 
						|
 | 
						|
    (Frozen_DeadlockAvoidanceTests,
 | 
						|
     Source_DeadlockAvoidanceTests
 | 
						|
     ) = test_util.test_both(DeadlockAvoidanceTests,
 | 
						|
                             LockType=LOCK_TYPES,
 | 
						|
                             DeadlockError=DEADLOCK_ERRORS)
 | 
						|
else:
 | 
						|
    DEADLOCK_ERRORS = {}
 | 
						|
 | 
						|
    class Frozen_DeadlockAvoidanceTests(unittest.TestCase):
 | 
						|
        pass
 | 
						|
 | 
						|
    class Source_DeadlockAvoidanceTests(unittest.TestCase):
 | 
						|
        pass
 | 
						|
 | 
						|
 | 
						|
class LifetimeTests:
 | 
						|
 | 
						|
    @property
 | 
						|
    def bootstrap(self):
 | 
						|
        return self.init._bootstrap
 | 
						|
 | 
						|
    def test_lock_lifetime(self):
 | 
						|
        name = "xyzzy"
 | 
						|
        self.assertNotIn(name, self.bootstrap._module_locks)
 | 
						|
        lock = self.bootstrap._get_module_lock(name)
 | 
						|
        self.assertIn(name, self.bootstrap._module_locks)
 | 
						|
        wr = weakref.ref(lock)
 | 
						|
        del lock
 | 
						|
        support.gc_collect()
 | 
						|
        self.assertNotIn(name, self.bootstrap._module_locks)
 | 
						|
        self.assertIsNone(wr())
 | 
						|
 | 
						|
    def test_all_locks(self):
 | 
						|
        support.gc_collect()
 | 
						|
        self.assertEqual(0, len(self.bootstrap._module_locks),
 | 
						|
                         self.bootstrap._module_locks)
 | 
						|
 | 
						|
 | 
						|
(Frozen_LifetimeTests,
 | 
						|
 Source_LifetimeTests
 | 
						|
 ) = test_util.test_both(LifetimeTests, init=init)
 | 
						|
 | 
						|
 | 
						|
@support.reap_threads
 | 
						|
def test_main():
 | 
						|
    support.run_unittest(Frozen_ModuleLockAsRLockTests,
 | 
						|
                         Source_ModuleLockAsRLockTests,
 | 
						|
                         Frozen_DeadlockAvoidanceTests,
 | 
						|
                         Source_DeadlockAvoidanceTests,
 | 
						|
                         Frozen_LifetimeTests,
 | 
						|
                         Source_LifetimeTests)
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    test_main()
 |