SF Patch #1455676: Simplify using Queues with daemon consumer threads

Adds join() and task_done() methods to track when all enqueued tasks have
been gotten and fully processed by daemon consumer threads.
This commit is contained in:
Raymond Hettinger 2006-03-24 20:43:29 +00:00
parent 98bcb70815
commit fd3fcf0b35
4 changed files with 128 additions and 0 deletions

View file

@ -221,7 +221,37 @@ def SimpleQueueTest(q):
_doBlockingTest(q.get, (), q.put, ('empty',))
_doBlockingTest(q.get, (True, 10), q.put, ('empty',))
cum = 0
cumlock = threading.Lock()
def worker(q):
global cum
while True:
x = q.get()
cumlock.acquire()
try:
cum += x
finally:
cumlock.release()
q.task_done()
def QueueJoinTest(q):
global cum
cum = 0
for i in (0,1):
t = threading.Thread(target=worker, args=(q,))
t.setDaemon(True)
t.start()
for i in xrange(100):
q.put(i)
q.join()
verify(cum==sum(range(100)), "q.join() did not block until all tasks were done")
def test():
q = Queue.Queue()
QueueJoinTest(q)
QueueJoinTest(q)
q = Queue.Queue(QUEUE_SIZE)
# Do it a couple of times on the same queue
SimpleQueueTest(q)