mirror of
https://github.com/python/cpython.git
synced 2025-07-28 21:55:21 +00:00
* Use the same code to profile for test_profile and test_cprofile.
* Convert both to unittest. * Use the same unit testing code. * Include the expected output in both test files. * Make it possible to regenerate the expected output by running the file as a script with an '-r' argument.
This commit is contained in:
parent
3c0f309fd1
commit
b70907796a
6 changed files with 390 additions and 406 deletions
|
@ -9,6 +9,10 @@ from test import test_support
|
|||
|
||||
QUEUE_SIZE = 5
|
||||
|
||||
def qfull(q):
|
||||
return q.maxsize > 0 and q.qsize() == q.maxsize
|
||||
|
||||
|
||||
# A thread to run a function that unclogs a blocked Queue.
|
||||
class _TriggerThread(threading.Thread):
|
||||
def __init__(self, fn, args):
|
||||
|
@ -86,7 +90,7 @@ class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
|
|||
self.cumlock = threading.Lock()
|
||||
|
||||
def simple_queue_test(self, q):
|
||||
if not q.empty():
|
||||
if q.qsize():
|
||||
raise RuntimeError, "Call this function with an empty queue"
|
||||
# I guess we better check things actually queue correctly a little :)
|
||||
q.put(111)
|
||||
|
@ -100,10 +104,10 @@ class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
|
|||
"Didn't seem to queue the correct data!")
|
||||
for i in range(QUEUE_SIZE-1):
|
||||
q.put(i)
|
||||
self.assert_(not q.empty(), "Queue should not be empty")
|
||||
self.assert_(not q.full(), "Queue should not be full")
|
||||
self.assert_(q.qsize(), "Queue should not be empty")
|
||||
self.assert_(not qfull(q), "Queue should not be full")
|
||||
q.put("last")
|
||||
self.assert_(q.full(), "Queue should be full")
|
||||
self.assert_(qfull(q), "Queue should be full")
|
||||
try:
|
||||
q.put("full", block=0)
|
||||
self.fail("Didn't appear to block with a full queue")
|
||||
|
@ -120,7 +124,7 @@ class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
|
|||
# Empty it
|
||||
for i in range(QUEUE_SIZE):
|
||||
q.get()
|
||||
self.assert_(q.empty(), "Queue should be empty")
|
||||
self.assert_(not q.qsize(), "Queue should be empty")
|
||||
try:
|
||||
q.get(block=0)
|
||||
self.fail("Didn't appear to block with an empty queue")
|
||||
|
@ -224,7 +228,7 @@ class FailingQueue(Queue.Queue):
|
|||
class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
|
||||
|
||||
def failing_queue_test(self, q):
|
||||
if not q.empty():
|
||||
if q.qsize():
|
||||
raise RuntimeError, "Call this function with an empty queue"
|
||||
for i in range(QUEUE_SIZE-1):
|
||||
q.put(i)
|
||||
|
@ -242,7 +246,7 @@ class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
|
|||
except FailingQueueException:
|
||||
pass
|
||||
q.put("last")
|
||||
self.assert_(q.full(), "Queue should be full")
|
||||
self.assert_(qfull(q), "Queue should be full")
|
||||
# Test a failing blocking put
|
||||
q.fail_next_put = True
|
||||
try:
|
||||
|
@ -264,17 +268,17 @@ class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
|
|||
# Check the Queue isn't damaged.
|
||||
# put failed, but get succeeded - re-add
|
||||
q.put("last")
|
||||
self.assert_(q.full(), "Queue should be full")
|
||||
self.assert_(qfull(q), "Queue should be full")
|
||||
q.get()
|
||||
self.assert_(not q.full(), "Queue should not be full")
|
||||
self.assert_(not qfull(q), "Queue should not be full")
|
||||
q.put("last")
|
||||
self.assert_(q.full(), "Queue should be full")
|
||||
self.assert_(qfull(q), "Queue should be full")
|
||||
# Test a blocking put
|
||||
self.do_blocking_test(q.put, ("full",), q.get, ())
|
||||
# Empty it
|
||||
for i in range(QUEUE_SIZE):
|
||||
q.get()
|
||||
self.assert_(q.empty(), "Queue should be empty")
|
||||
self.assert_(not q.qsize(), "Queue should be empty")
|
||||
q.put("first")
|
||||
q.fail_next_get = True
|
||||
try:
|
||||
|
@ -282,16 +286,16 @@ class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
|
|||
self.fail("The queue didn't fail when it should have")
|
||||
except FailingQueueException:
|
||||
pass
|
||||
self.assert_(not q.empty(), "Queue should not be empty")
|
||||
self.assert_(q.qsize(), "Queue should not be empty")
|
||||
q.fail_next_get = True
|
||||
try:
|
||||
q.get(timeout=0.1)
|
||||
self.fail("The queue didn't fail when it should have")
|
||||
except FailingQueueException:
|
||||
pass
|
||||
self.assert_(not q.empty(), "Queue should not be empty")
|
||||
self.assert_(q.qsize(), "Queue should not be empty")
|
||||
q.get()
|
||||
self.assert_(q.empty(), "Queue should be empty")
|
||||
self.assert_(not q.qsize(), "Queue should be empty")
|
||||
q.fail_next_get = True
|
||||
try:
|
||||
self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
|
||||
|
@ -300,9 +304,9 @@ class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
|
|||
except FailingQueueException:
|
||||
pass
|
||||
# put succeeded, but get failed.
|
||||
self.assert_(not q.empty(), "Queue should not be empty")
|
||||
self.assert_(q.qsize(), "Queue should not be empty")
|
||||
q.get()
|
||||
self.assert_(q.empty(), "Queue should be empty")
|
||||
self.assert_(not q.qsize(), "Queue should be empty")
|
||||
|
||||
def test_failing_queue(self):
|
||||
# Test to make sure a queue is functioning correctly.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue