Add queue tests for empty, full, put_nowait, and get_nowait.

Closes issue 9357. Thanks to Brian Brazil for the patch.
This commit is contained in:
Brett Cannon 2010-07-23 16:56:21 +00:00
parent 6b0e0e41b8
commit 671153db25

View file

@ -90,6 +90,8 @@ class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
def simple_queue_test(self, q):
if q.qsize():
raise RuntimeError("Call this function with an empty queue")
self.assertTrue(q.empty())
self.assertFalse(q.full())
# I guess we better check things actually queue correctly a little :)
q.put(111)
q.put(333)
@ -108,6 +110,8 @@ class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
full = 3 * 2 * QUEUE_SIZE
q.put(last)
self.assertTrue(qfull(q), "Queue should be full")
self.assertFalse(q.empty())
self.assertTrue(q.full())
try:
q.put(full, block=0)
self.fail("Didn't appear to block with a full queue")
@ -193,6 +197,25 @@ class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
self.simple_queue_test(q)
self.simple_queue_test(q)
def test_negative_timeout_raises_exception(self):
q = self.type2test(QUEUE_SIZE)
with self.assertRaises(ValueError):
q.put(1, timeout=-1)
with self.assertRaises(ValueError):
q.get(1, timeout=-1)
def test_nowait(self):
q = self.type2test(QUEUE_SIZE)
for i in range(QUEUE_SIZE):
q.put_nowait(1)
with self.assertRaises(queue.Full):
q.put_nowait(1)
for i in range(QUEUE_SIZE):
q.get_nowait()
with self.assertRaises(queue.Empty):
q.get_nowait()
class QueueTest(BaseQueueTest):
type2test = queue.Queue