mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 11:49:12 +00:00 
			
		
		
		
	to stdout. Repaired by not printing at all except in verbose mode. Made the test about 6x faster -- envelope analysis showed it took time proportional to the square of the # of tasks. Now it's linear.
		
			
				
	
	
		
			55 lines
		
	
	
	
		
			1.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			55 lines
		
	
	
	
		
			1.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Very rudimentary test of threading module
 | 
						|
 | 
						|
# Create a bunch of threads, let each do some work, wait until all are done
 | 
						|
 | 
						|
from test_support import verbose
 | 
						|
import random
 | 
						|
import threading
 | 
						|
import time
 | 
						|
 | 
						|
# Number of worker threads to launch.  Expected wall-clock time is roughly
# numtasks/3 seconds: the semaphore below admits 3 workers at a time, so
# the work proceeds in about numtasks/3 clumps of about 1 second each.
numtasks = 10

# Shared counter of workers currently holding a semaphore slot; only
# modified while `mutex` is held.
running = 0

# Protects `running` (and the verbose output that reports it).
mutex = threading.RLock()

# Concurrency cap: no more than 3 of the 10 workers can run at once.
sema = threading.BoundedSemaphore(3)
 | 
						|
 | 
						|
class TestThread(threading.Thread):
 | 
						|
    def run(self):
 | 
						|
        global running
 | 
						|
        delay = random.random() * 2
 | 
						|
        if verbose:
 | 
						|
            print 'task', self.getName(), 'will run for', delay, 'sec'
 | 
						|
        sema.acquire()
 | 
						|
        mutex.acquire()
 | 
						|
        running = running + 1
 | 
						|
        if verbose:
 | 
						|
            print running, 'tasks are running'
 | 
						|
        mutex.release()
 | 
						|
        time.sleep(delay)
 | 
						|
        if verbose:
 | 
						|
            print 'task', self.getName(), 'done'
 | 
						|
        mutex.acquire()
 | 
						|
        running = running - 1
 | 
						|
        if verbose:
 | 
						|
            print self.getName(), 'is finished.', running, 'tasks are running'
 | 
						|
        mutex.release()
 | 
						|
        sema.release()
 | 
						|
 | 
						|
# Every worker that has been created, in creation order; the main script
# joins on these once they have all been started.
threads = []

def starttasks():
    """Create and start `numtasks` TestThread workers.

    Each worker is named "<thread N>" after its index and is appended to
    the module-level `threads` list before it is started.
    """
    for index in range(numtasks):
        label = "<thread %d>" % index
        worker = TestThread(name=label)
        threads.append(worker)
        worker.start()
 | 
						|
 | 
						|
starttasks()
 | 
						|
 | 
						|
if verbose:
 | 
						|
    print 'waiting for all tasks to complete'
 | 
						|
for t in threads:
 | 
						|
    t.join()
 | 
						|
if verbose:
 | 
						|
    print 'all tasks done'
 |