mirror of
https://github.com/python/cpython.git
synced 2025-08-15 06:10:47 +00:00
Merged revisions 76137,76172 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk ........ r76137 | antoine.pitrou | 2009-11-06 23:34:35 +0100 (ven., 06 nov. 2009) | 4 lines Issue #7270: Add some dedicated unit tests for multi-thread synchronization primitives such as Lock, RLock, Condition, Event and Semaphore. ........ r76172 | antoine.pitrou | 2009-11-09 17:00:11 +0100 (lun., 09 nov. 2009) | 5 lines Issue #7282: Fix a memory leak when an RLock was used in a thread other than those started through `threading.Thread` (for example, using `thread.start_new_thread()`. ........
This commit is contained in:
parent
f4b581ed25
commit
c747d3a5d2
5 changed files with 604 additions and 30 deletions
|
@ -11,6 +11,8 @@ import time
|
|||
import unittest
|
||||
import weakref
|
||||
|
||||
from test import lock_tests
|
||||
|
||||
# A trivial mutable counter.
|
||||
class Counter(object):
|
||||
def __init__(self):
|
||||
|
@ -132,11 +134,9 @@ class ThreadTests(unittest.TestCase):
|
|||
def test_foreign_thread(self):
|
||||
# Check that a "foreign" thread can use the threading module.
|
||||
def f(mutex):
|
||||
# Acquiring an RLock forces an entry for the foreign
|
||||
# Calling current_thread() forces an entry for the foreign
|
||||
# thread to get made in the threading._active map.
|
||||
r = threading.RLock()
|
||||
r.acquire()
|
||||
r.release()
|
||||
threading.current_thread()
|
||||
mutex.release()
|
||||
|
||||
mutex = threading.Lock()
|
||||
|
@ -453,22 +453,6 @@ class ThreadingExceptionTests(unittest.TestCase):
|
|||
thread.start()
|
||||
self.assertRaises(RuntimeError, thread.start)
|
||||
|
||||
def test_releasing_unacquired_rlock(self):
|
||||
rlock = threading.RLock()
|
||||
self.assertRaises(RuntimeError, rlock.release)
|
||||
|
||||
def test_waiting_on_unacquired_condition(self):
|
||||
cond = threading.Condition()
|
||||
self.assertRaises(RuntimeError, cond.wait)
|
||||
|
||||
def test_notify_on_unacquired_condition(self):
|
||||
cond = threading.Condition()
|
||||
self.assertRaises(RuntimeError, cond.notify)
|
||||
|
||||
def test_semaphore_with_negative_value(self):
|
||||
self.assertRaises(ValueError, threading.Semaphore, value = -1)
|
||||
self.assertRaises(ValueError, threading.Semaphore, value = -sys.maxint)
|
||||
|
||||
def test_joining_current_thread(self):
|
||||
current_thread = threading.current_thread()
|
||||
self.assertRaises(RuntimeError, current_thread.join);
|
||||
|
@ -483,8 +467,34 @@ class ThreadingExceptionTests(unittest.TestCase):
|
|||
self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
|
||||
|
||||
|
||||
class LockTests(lock_tests.LockTests):
|
||||
locktype = staticmethod(threading.Lock)
|
||||
|
||||
class RLockTests(lock_tests.RLockTests):
|
||||
locktype = staticmethod(threading.RLock)
|
||||
|
||||
class EventTests(lock_tests.EventTests):
|
||||
eventtype = staticmethod(threading.Event)
|
||||
|
||||
class ConditionAsRLockTests(lock_tests.RLockTests):
|
||||
# An Condition uses an RLock by default and exports its API.
|
||||
locktype = staticmethod(threading.Condition)
|
||||
|
||||
class ConditionTests(lock_tests.ConditionTests):
|
||||
condtype = staticmethod(threading.Condition)
|
||||
|
||||
class SemaphoreTests(lock_tests.SemaphoreTests):
|
||||
semtype = staticmethod(threading.Semaphore)
|
||||
|
||||
class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
|
||||
semtype = staticmethod(threading.BoundedSemaphore)
|
||||
|
||||
|
||||
def test_main():
|
||||
test.test_support.run_unittest(ThreadTests,
|
||||
test.test_support.run_unittest(LockTests, RLockTests, EventTests,
|
||||
ConditionAsRLockTests, ConditionTests,
|
||||
SemaphoreTests, BoundedSemaphoreTests,
|
||||
ThreadTests,
|
||||
ThreadJoinOnShutdown,
|
||||
ThreadingExceptionTests,
|
||||
)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue