Converted to a unittest. Added checks that the bounded semaphore actually

does what it's supposed to do.
This commit is contained in:
Tim Peters 2005-01-08 06:03:17 +00:00
parent e6539c4401
commit 84d548994e

View file

@ -1,55 +1,91 @@
# Very rudimentary test of threading module # Very rudimentary test of threading module
# Create a bunch of threads, let each do some work, wait until all are done import test.test_support
from test.test_support import verbose from test.test_support import verbose
import random import random
import threading import threading
import time import time
import unittest
# This takes about n/3 seconds to run (about n/3 clumps of tasks, times # A trivial mutable counter.
# about 1 second per clump). class Counter(object):
numtasks = 10 def __init__(self):
self.value = 0
# no more than 3 of the 10 can run at once def inc(self):
sema = threading.BoundedSemaphore(value=3) self.value += 1
mutex = threading.RLock() def dec(self):
running = 0 self.value -= 1
def get(self):
return self.value
class TestThread(threading.Thread): class TestThread(threading.Thread):
def __init__(self, name, testcase, sema, mutex, nrunning):
threading.Thread.__init__(self, name=name)
self.testcase = testcase
self.sema = sema
self.mutex = mutex
self.nrunning = nrunning
def run(self): def run(self):
global running
delay = random.random() * 2 delay = random.random() * 2
if verbose: if verbose:
print 'task', self.getName(), 'will run for', delay, 'sec' print 'task', self.getName(), 'will run for', delay, 'sec'
sema.acquire()
mutex.acquire() self.sema.acquire()
running = running + 1
self.mutex.acquire()
self.nrunning.inc()
if verbose: if verbose:
print running, 'tasks are running' print self.nrunning.get(), 'tasks are running'
mutex.release() self.testcase.assert_(self.nrunning.get() <= 3)
self.mutex.release()
time.sleep(delay) time.sleep(delay)
if verbose: if verbose:
print 'task', self.getName(), 'done' print 'task', self.getName(), 'done'
mutex.acquire()
running = running - 1
if verbose:
print self.getName(), 'is finished.', running, 'tasks are running'
mutex.release()
sema.release()
threads = [] self.mutex.acquire()
def starttasks(): self.nrunning.dec()
for i in range(numtasks): self.testcase.assert_(self.nrunning.get() >= 0)
t = TestThread(name="<thread %d>"%i) if verbose:
print self.getName(), 'is finished.', self.nrunning.get(), \
'tasks are running'
self.mutex.release()
self.sema.release()
class ThreadTests(unittest.TestCase):
# Create a bunch of threads, let each do some work, wait until all are
# done.
def test_various_ops(self):
# This takes about n/3 seconds to run (about n/3 clumps of tasks,
# times about 1 second per clump).
NUMTASKS = 10
# no more than 3 of the 10 can run at once
sema = threading.BoundedSemaphore(value=3)
mutex = threading.RLock()
numrunning = Counter()
threads = []
for i in range(NUMTASKS):
t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
threads.append(t) threads.append(t)
t.start() t.start()
starttasks() if verbose:
if verbose:
print 'waiting for all tasks to complete' print 'waiting for all tasks to complete'
for t in threads: for t in threads:
t.join() t.join()
if verbose: if verbose:
print 'all tasks done' print 'all tasks done'
self.assertEqual(numrunning.get(), 0)
def test_main():
test.test_support.run_unittest(ThreadTests)
if __name__ == "__main__":
test_main()