cpython/Lib/test/test_queue.py
Christian Heimes 679db4aa99 Merged revisions 59985-60000,60002,60005-60007,60009-60042 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk

........
  r59987 | raymond.hettinger | 2008-01-15 21:52:42 +0100 (Tue, 15 Jan 2008) | 1 line

  Refactor if/elif chain for clarity and speed.  Remove dependency on subclasses having to implement _empty and _full.
........
  r59988 | raymond.hettinger | 2008-01-15 22:22:47 +0100 (Tue, 15 Jan 2008) | 1 line

  Fix-up half-written paragraph in the docs
........
  r59989 | amaury.forgeotdarc | 2008-01-15 22:25:11 +0100 (Tue, 15 Jan 2008) | 3 lines

  test_doctest fails since r59984.
  Not sure if these are the correct values, but save_stdout has to be set before its usage...
........
  r59992 | andrew.kuchling | 2008-01-16 01:32:03 +0100 (Wed, 16 Jan 2008) | 1 line

  Docstring typos
........
  r59993 | andrew.kuchling | 2008-01-16 04:17:25 +0100 (Wed, 16 Jan 2008) | 1 line

  Add PEP 3141 section
........
  r59998 | andrew.kuchling | 2008-01-16 14:01:51 +0100 (Wed, 16 Jan 2008) | 1 line

  Markup fix
........
  r59999 | georg.brandl | 2008-01-16 17:56:29 +0100 (Wed, 16 Jan 2008) | 2 lines

  Fix MSDN library URL. (#1854)
........
  r60006 | georg.brandl | 2008-01-16 21:27:56 +0100 (Wed, 16 Jan 2008) | 3 lines

  Add Python-specific content to Doc dir. Update configuration file
  to work with the newest Sphinx.
........
  r60007 | georg.brandl | 2008-01-16 21:29:00 +0100 (Wed, 16 Jan 2008) | 2 lines

  Doc build should work with 2.4 now.
........
  r60009 | raymond.hettinger | 2008-01-17 00:38:16 +0100 (Thu, 17 Jan 2008) | 1 line

  Minor wordsmithing.
........
  r60010 | raymond.hettinger | 2008-01-17 00:40:45 +0100 (Thu, 17 Jan 2008) | 1 line

  Add queues will alternative fetch orders (priority based and stack based).
........
  r60011 | raymond.hettinger | 2008-01-17 00:49:35 +0100 (Thu, 17 Jan 2008) | 1 line

  Add news entry.
........
  r60013 | raymond.hettinger | 2008-01-17 04:02:14 +0100 (Thu, 17 Jan 2008) | 1 line

  Make starmap() match its pure python definition and accept any itertable input (not just tuples).
........
  r60015 | gregory.p.smith | 2008-01-17 08:43:20 +0100 (Thu, 17 Jan 2008) | 3 lines

  Comply with RFC 3207.
  Fixes issue 829951 - http://bugs.python.org/issue829951
........
  r60018 | gregory.p.smith | 2008-01-17 09:03:17 +0100 (Thu, 17 Jan 2008) | 2 lines

  entry for r60015
........
  r60019 | raymond.hettinger | 2008-01-17 09:07:05 +0100 (Thu, 17 Jan 2008) | 1 line

  Note versionadded.
........
  r60020 | gregory.p.smith | 2008-01-17 09:35:49 +0100 (Thu, 17 Jan 2008) | 8 lines

  Fixes (accepts patch) issue1339 - http://bugs.python.org/issue1339
  - Factor out the duplication of EHLO/HELO in login() and sendmail() to
    a new function, ehlo_or_helo_if_needed().
  - Use ehlo_or_helo_if_needed() in starttls()
  - Check for the starttls exception in starttls() in the same way as
    login() checks for the auth extension.
  Contributed by Bill Fenner.
........
  r60021 | andrew.kuchling | 2008-01-17 13:00:15 +0100 (Thu, 17 Jan 2008) | 1 line

  Revise 3141 section a bit; add some Windows items
........
  r60022 | brett.cannon | 2008-01-17 19:45:10 +0100 (Thu, 17 Jan 2008) | 2 lines

  Fix a function pointer declaration to silence the compiler.
........
  r60024 | raymond.hettinger | 2008-01-17 20:31:38 +0100 (Thu, 17 Jan 2008) | 1 line

  Issue #1861:  Add read-only attribute listing upcoming events in the order they will be run.
........
  r60025 | andrew.kuchling | 2008-01-17 20:49:24 +0100 (Thu, 17 Jan 2008) | 1 line

  Correction from Jordan Lewis: halfdelay() uses tenths of a second, not milliseconds
........
  r60026 | raymond.hettinger | 2008-01-17 23:27:49 +0100 (Thu, 17 Jan 2008) | 1 line

  Add advice on choosing between scheduler and threading.Timer().
........
  r60028 | christian.heimes | 2008-01-18 00:01:44 +0100 (Fri, 18 Jan 2008) | 2 lines

  Updated new property syntax. An elaborate example for subclassing and the getter was missing.
  Added comment about VS 2008 and PGO builds.
........
  r60029 | raymond.hettinger | 2008-01-18 00:32:01 +0100 (Fri, 18 Jan 2008) | 1 line

  Fix-up Timer() example.
........
  r60030 | raymond.hettinger | 2008-01-18 00:56:56 +0100 (Fri, 18 Jan 2008) | 1 line

  Fix markup
........
  r60031 | raymond.hettinger | 2008-01-18 01:10:42 +0100 (Fri, 18 Jan 2008) | 1 line

  clearcache() needs to remove the dict as well as clear it.
........
  r60033 | andrew.kuchling | 2008-01-18 03:26:16 +0100 (Fri, 18 Jan 2008) | 1 line

  Bump verson
........
  r60034 | andrew.kuchling | 2008-01-18 03:42:52 +0100 (Fri, 18 Jan 2008) | 1 line

  Typo fix
........
  r60035 | christian.heimes | 2008-01-18 08:30:20 +0100 (Fri, 18 Jan 2008) | 3 lines

  Coverity issue CID #197
  var_decl: Declared variable "stm" without initializer
  ninit_use_in_call: Using uninitialized value "stm" (field "stm".tm_zone uninitialized) in call to function "mktime"
........
  r60036 | christian.heimes | 2008-01-18 08:45:30 +0100 (Fri, 18 Jan 2008) | 11 lines

  Coverity issue CID #167
  Event alloc_fn: Called allocation function "metacompile" [model]
  Event var_assign: Assigned variable "gr" to storage returned from "metacompile"
  		gr = metacompile(n);
  Event pass_arg: Variable "gr" not freed or pointed-to in function "maketables" [model]
  		g = maketables(gr);
    		translatelabels(g);
    		addfirstsets(g);
  Event leaked_storage: Returned without freeing storage "gr"
  		return g;
........
  r60038 | christian.heimes | 2008-01-18 09:04:57 +0100 (Fri, 18 Jan 2008) | 3 lines

  Coverity issue CID #182
  size_error: Allocating 1 bytes to pointer "children", which needs at least 4 bytes
........
  r60041 | christian.heimes | 2008-01-18 09:47:59 +0100 (Fri, 18 Jan 2008) | 4 lines

  Coverity issue CID #169
  local_ptr_assign_local: Assigning address of stack variable "namebuf" to pointer "filename"
  out_of_scope: Variable "namebuf" goes out of scope
  use_invalid: Used "filename" pointing to out-of-scope variable "namebuf"
........
  r60042 | christian.heimes | 2008-01-18 09:53:45 +0100 (Fri, 18 Jan 2008) | 2 lines

  Coverity CID #168
  leaked_storage: Returned without freeing storage "fp"
........
2008-01-18 09:56:22 +00:00

292 lines
9.7 KiB
Python

# Some simple Queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
import Queue
import sys
import threading
import time
from test.test_support import verify, TestFailed, verbose
QUEUE_SIZE = 5
def qfull(q):
return q.maxsize > 0 and q.qsize() == q.maxsize
# A thread to run a function that unclogs a blocked Queue.
class _TriggerThread(threading.Thread):
def __init__(self, fn, args):
self.fn = fn
self.args = args
self.startedEvent = threading.Event()
threading.Thread.__init__(self)
def run(self):
# The sleep isn't necessary, but is intended to give the blocking
# function in the main thread a chance at actually blocking before
# we unclog it. But if the sleep is longer than the timeout-based
# tests wait in their blocking functions, those tests will fail.
# So we give them much longer timeout values compared to the
# sleep here (I aimed at 10 seconds for blocking functions --
# they should never actually wait that long - they should make
# progress as soon as we call self.fn()).
time.sleep(0.1)
self.startedEvent.set()
self.fn(*self.args)
# Execute a function that blocks, and in a separate thread, a function that
# triggers the release. Returns the result of the blocking function.
# Caution: block_func must guarantee to block until trigger_func is
# called, and trigger_func must guarantee to change queue state so that
# block_func can make enough progress to return. In particular, a
# block_func that just raises an exception regardless of whether trigger_func
# is called will lead to timing-dependent sporadic failures, and one of
# those went rarely seen but undiagnosed for years. Now block_func
# must be unexceptional. If block_func is supposed to raise an exception,
# call _doExceptionalBlockingTest() instead.
def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
t = _TriggerThread(trigger_func, trigger_args)
t.start()
result = block_func(*block_args)
# If block_func returned before our thread made the call, we failed!
if not t.startedEvent.isSet():
raise TestFailed("blocking function '%r' appeared not to block" %
block_func)
t.join(10) # make sure the thread terminates
if t.isAlive():
raise TestFailed("trigger function '%r' appeared to not return" %
trigger_func)
return result
# Call this instead if block_func is supposed to raise an exception.
def _doExceptionalBlockingTest(block_func, block_args, trigger_func,
trigger_args, expected_exception_class):
t = _TriggerThread(trigger_func, trigger_args)
t.start()
try:
try:
block_func(*block_args)
except expected_exception_class:
raise
else:
raise TestFailed("expected exception of kind %r" %
expected_exception_class)
finally:
t.join(10) # make sure the thread terminates
if t.isAlive():
raise TestFailed("trigger function '%r' appeared to not return" %
trigger_func)
if not t.startedEvent.isSet():
raise TestFailed("trigger thread ended but event never set")
# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
pass
class FailingQueue(Queue.Queue):
def __init__(self, *args):
self.fail_next_put = False
self.fail_next_get = False
Queue.Queue.__init__(self, *args)
def _put(self, item):
if self.fail_next_put:
self.fail_next_put = False
raise FailingQueueException("You Lose")
return Queue.Queue._put(self, item)
def _get(self):
if self.fail_next_get:
self.fail_next_get = False
raise FailingQueueException("You Lose")
return Queue.Queue._get(self)
def FailingQueueTest(q):
if q.qsize():
raise RuntimeError("Call this function with an empty queue")
for i in range(QUEUE_SIZE-1):
q.put(i)
# Test a failing non-blocking put.
q.fail_next_put = True
try:
q.put("oops", block=0)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
q.fail_next_put = True
try:
q.put("oops", timeout=0.1)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
q.put("last")
verify(qfull(q), "Queue should be full")
# Test a failing blocking put
q.fail_next_put = True
try:
_doBlockingTest(q.put, ("full",), q.get, ())
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
# Check the Queue isn't damaged.
# put failed, but get succeeded - re-add
q.put("last")
# Test a failing timeout put
q.fail_next_put = True
try:
_doExceptionalBlockingTest(q.put, ("full", True, 10), q.get, (),
FailingQueueException)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
# Check the Queue isn't damaged.
# put failed, but get succeeded - re-add
q.put("last")
verify(qfull(q), "Queue should be full")
q.get()
verify(not qfull(q), "Queue should not be full")
q.put("last")
verify(qfull(q), "Queue should be full")
# Test a blocking put
_doBlockingTest( q.put, ("full",), q.get, ())
# Empty it
for i in range(QUEUE_SIZE):
q.get()
verify(not q.qsize(), "Queue should be empty")
q.put("first")
q.fail_next_get = True
try:
q.get()
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
verify(q.qsize(), "Queue should not be empty")
q.fail_next_get = True
try:
q.get(timeout=0.1)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
verify(q.qsize(), "Queue should not be empty")
q.get()
verify(not q.qsize(), "Queue should be empty")
q.fail_next_get = True
try:
_doExceptionalBlockingTest(q.get, (), q.put, ('empty',),
FailingQueueException)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
# put succeeded, but get failed.
verify(q.qsize(), "Queue should not be empty")
q.get()
verify(not q.qsize(), "Queue should be empty")
def SimpleQueueTest(q):
if q.qsize():
raise RuntimeError("Call this function with an empty queue")
# I guess we better check things actually queue correctly a little :)
q.put(111)
q.put(333)
q.put(222)
target_order = dict(Queue = [111, 333, 222],
LifoQueue = [222, 333, 111],
PriorityQueue = [111, 222, 333])
actual_order = [q.get(), q.get(), q.get()]
verify(actual_order == target_order[q.__class__.__name__],
"Didn't seem to queue the correct data!")
for i in range(QUEUE_SIZE-1):
q.put(i)
verify(q.qsize(), "Queue should not be empty")
verify(not qfull(q), "Queue should not be full")
last = 2*QUEUE_SIZE
full = 3*2*QUEUE_SIZE
q.put(last)
verify(qfull(q), "Queue should be full")
try:
q.put(full, block=0)
raise TestFailed("Didn't appear to block with a full queue")
except Queue.Full:
pass
try:
q.put(full, timeout=0.01)
raise TestFailed("Didn't appear to time-out with a full queue")
except Queue.Full:
pass
# Test a blocking put
_doBlockingTest(q.put, (full,), q.get, ())
_doBlockingTest(q.put, (full, True, 10), q.get, ())
# Empty it
for i in range(QUEUE_SIZE):
q.get()
verify(not q.qsize(), "Queue should be empty")
try:
q.get(block=0)
raise TestFailed("Didn't appear to block with an empty queue")
except Queue.Empty:
pass
try:
q.get(timeout=0.01)
raise TestFailed("Didn't appear to time-out with an empty queue")
except Queue.Empty:
pass
# Test a blocking get
_doBlockingTest(q.get, (), q.put, ('empty',))
_doBlockingTest(q.get, (True, 10), q.put, ('empty',))
cum = 0
cumlock = threading.Lock()
def worker(q):
global cum
while True:
x = q.get()
if x is None:
q.task_done()
return
cumlock.acquire()
try:
cum += x
finally:
cumlock.release()
q.task_done()
def QueueJoinTest(q):
global cum
cum = 0
for i in (0,1):
threading.Thread(target=worker, args=(q,)).start()
for i in range(100):
q.put(i)
q.join()
verify(cum==sum(range(100)), "q.join() did not block until all tasks were done")
q.put(None) # instruct the threads to close
q.join() # verify that you can join twice
def QueueTaskDoneTest(q):
try:
q.task_done()
except ValueError:
pass
else:
raise TestFailed("Did not detect task count going negative")
def test():
for Q in Queue.Queue, Queue.LifoQueue, Queue.PriorityQueue:
q = Q()
QueueTaskDoneTest(q)
QueueJoinTest(q)
QueueJoinTest(q)
QueueTaskDoneTest(q)
q = Q(QUEUE_SIZE)
# Do it a couple of times on the same queue
SimpleQueueTest(q)
SimpleQueueTest(q)
if verbose:
print("Simple Queue tests seemed to work for", Q.__name__)
q = FailingQueue(QUEUE_SIZE)
FailingQueueTest(q)
FailingQueueTest(q)
if verbose:
print("Failing Queue tests seemed to work")
test()