mirror of
https://github.com/python/cpython.git
synced 2025-07-28 21:55:21 +00:00

46640 Patch #1454481: Make thread stack size runtime tunable.
46647 Markup fix

The first is causing many buildbots to fail test runs, and there are
multiple causes with seemingly no immediate prospects for repairing them.
See python-dev discussion.

Note that a branch can (and should) be created for resolving these
problems, like

    svn copy svn+ssh://svn.python.org/python/trunk -r46640 svn+ssh://svn.python.org/python/branches/NEW_BRANCH

followed by merging rev 46647 to the new branch.
112 lines
3.2 KiB
Python
112 lines
3.2 KiB
Python
# Very rudimentary test of threading module
|
|
|
|
import test.test_support
|
|
from test.test_support import verbose
|
|
import random
|
|
import threading
|
|
import thread
|
|
import time
|
|
import unittest
|
|
|
|
# A trivial mutable counter.
|
|
class Counter(object):
    """A trivial mutable integer counter that starts at zero.

    The class performs no locking of its own; code that shares one
    instance between threads has to serialize access itself.
    """

    def __init__(self):
        self.value = 0

    def inc(self):
        """Add one to the counter."""
        self.value += 1

    def dec(self):
        """Subtract one from the counter."""
        self.value -= 1

    def get(self):
        """Return the current count."""
        return self.value
|
|
|
|
class TestThread(threading.Thread):
    """Worker thread that sleeps for a random time while holding a semaphore.

    Arguments:
        name     -- thread name, passed on to threading.Thread.
        testcase -- object whose assert_() method is used for the checks.
        sema     -- semaphore held for the whole duration of the task.
        mutex    -- lock guarding every access to nrunning.
        nrunning -- shared counter (inc/dec/get) of tasks currently running.
    """

    def __init__(self, name, testcase, sema, mutex, nrunning):
        threading.Thread.__init__(self, name=name)
        self.testcase = testcase
        self.sema = sema
        self.mutex = mutex
        self.nrunning = nrunning

    def run(self):
        """Take the semaphore, sleep up to two seconds, then give it back.

        The running-task counter is bumped on entry and dropped on exit,
        always under the mutex.  Every release sits in a ``finally`` so a
        failing assertion cannot leave the semaphore or the mutex held.
        """
        delay = random.random() * 2
        # The parenthesised single-argument print parses under both the
        # print-statement and the print-function grammar.
        if verbose:
            print('task %s will run for %s sec' % (self.getName(), delay))

        self.sema.acquire()
        try:
            self.mutex.acquire()
            try:
                self.nrunning.inc()
                if verbose:
                    print('%s tasks are running' % (self.nrunning.get(),))
                self.testcase.assert_(self.nrunning.get() <= 3)
            finally:
                self.mutex.release()

            time.sleep(delay)
            if verbose:
                print('task %s done' % (self.getName(),))

            self.mutex.acquire()
            try:
                self.nrunning.dec()
                self.testcase.assert_(self.nrunning.get() >= 0)
                if verbose:
                    print('%s is finished. %s tasks are running'
                          % (self.getName(), self.nrunning.get()))
            finally:
                self.mutex.release()
        finally:
            self.sema.release()
|
|
|
|
class ThreadTests(unittest.TestCase):
    """Rudimentary tests of the threading module."""

    # Create a bunch of threads, let each do some work, wait until all are
    # done.
    def test_various_ops(self):
        # This takes about n/3 seconds to run (about n/3 clumps of tasks,
        # times about 1 second per clump).
        NUMTASKS = 10

        # no more than 3 of the 10 can run at once
        sema = threading.BoundedSemaphore(value=3)
        mutex = threading.RLock()
        numrunning = Counter()

        threads = []

        for i in range(NUMTASKS):
            t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
            threads.append(t)
            t.start()

        if verbose:
            print('waiting for all tasks to complete')
        for t in threads:
            # The join is bounded (NUMTASKS seconds), so also check that
            # the thread really ended rather than the join timing out.
            t.join(NUMTASKS)
            self.assert_(not t.isAlive())
        if verbose:
            print('all tasks done')
        self.assertEqual(numrunning.get(), 0)

    def test_foreign_thread(self):
        # Check that a "foreign" thread can use the threading module.
        def f(mutex):
            # Acquiring an RLock forces an entry for the foreign
            # thread to get made in the threading._active map.
            r = threading.RLock()
            r.acquire()
            r.release()
            mutex.release()

        mutex = threading.Lock()
        mutex.acquire()
        tid = thread.start_new_thread(f, (mutex,))
        # Block until f() releases the mutex, i.e. the foreign thread has
        # finished its work.
        mutex.acquire()
        try:
            self.assert_(tid in threading._active)
            self.assert_(isinstance(threading._active[tid],
                                    threading._DummyThread))
        finally:
            # Remove the entry even when an assertion above fails, so the
            # dummy thread does not stay registered after this test.
            threading._active.pop(tid, None)
|
|
|
|
def test_main():
    """Run the ThreadTests cases via test.test_support.run_unittest."""
    test.test_support.run_unittest(ThreadTests)
|
|
|
|
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
|