mirror of
https://github.com/python/cpython.git
synced 2025-08-30 21:48:47 +00:00
Issue #20556: Used specific assert methods in threading tests.
This commit is contained in:
parent
20be53e5b5
commit
8c0f0c5c1e
3 changed files with 22 additions and 23 deletions
|
@ -58,7 +58,7 @@ class TestThread(threading.Thread):
|
|||
self.nrunning.inc()
|
||||
if verbose:
|
||||
print(self.nrunning.get(), 'tasks are running')
|
||||
self.testcase.assertTrue(self.nrunning.get() <= 3)
|
||||
self.testcase.assertLessEqual(self.nrunning.get(), 3)
|
||||
|
||||
time.sleep(delay)
|
||||
if verbose:
|
||||
|
@ -66,7 +66,7 @@ class TestThread(threading.Thread):
|
|||
|
||||
with self.mutex:
|
||||
self.nrunning.dec()
|
||||
self.testcase.assertTrue(self.nrunning.get() >= 0)
|
||||
self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
|
||||
if verbose:
|
||||
print('%s is finished. %d tasks are running' %
|
||||
(self.name, self.nrunning.get()))
|
||||
|
@ -100,26 +100,25 @@ class ThreadTests(BaseTestCase):
|
|||
for i in range(NUMTASKS):
|
||||
t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
|
||||
threads.append(t)
|
||||
self.assertEqual(t.ident, None)
|
||||
self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t)))
|
||||
self.assertIsNone(t.ident)
|
||||
self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$')
|
||||
t.start()
|
||||
|
||||
if verbose:
|
||||
print('waiting for all tasks to complete')
|
||||
for t in threads:
|
||||
t.join()
|
||||
self.assertTrue(not t.is_alive())
|
||||
self.assertFalse(t.is_alive())
|
||||
self.assertNotEqual(t.ident, 0)
|
||||
self.assertFalse(t.ident is None)
|
||||
self.assertTrue(re.match('<TestThread\(.*, stopped -?\d+\)>',
|
||||
repr(t)))
|
||||
self.assertIsNotNone(t.ident)
|
||||
self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$')
|
||||
if verbose:
|
||||
print('all tasks done')
|
||||
self.assertEqual(numrunning.get(), 0)
|
||||
|
||||
def test_ident_of_no_threading_threads(self):
|
||||
# The ident still must work for the main thread and dummy threads.
|
||||
self.assertFalse(threading.currentThread().ident is None)
|
||||
self.assertIsNotNone(threading.currentThread().ident)
|
||||
def f():
|
||||
ident.append(threading.currentThread().ident)
|
||||
done.set()
|
||||
|
@ -127,7 +126,7 @@ class ThreadTests(BaseTestCase):
|
|||
ident = []
|
||||
_thread.start_new_thread(f, ())
|
||||
done.wait()
|
||||
self.assertFalse(ident[0] is None)
|
||||
self.assertIsNotNone(ident[0])
|
||||
# Kill the "immortal" _DummyThread
|
||||
del threading._active[ident[0]]
|
||||
|
||||
|
@ -244,7 +243,7 @@ class ThreadTests(BaseTestCase):
|
|||
self.assertTrue(ret)
|
||||
if verbose:
|
||||
print(" verifying worker hasn't exited")
|
||||
self.assertTrue(not t.finished)
|
||||
self.assertFalse(t.finished)
|
||||
if verbose:
|
||||
print(" attempting to raise asynch exception in worker")
|
||||
result = set_async_exc(ctypes.c_long(t.id), exception)
|
||||
|
@ -415,9 +414,9 @@ class ThreadTests(BaseTestCase):
|
|||
|
||||
def test_repr_daemon(self):
|
||||
t = threading.Thread()
|
||||
self.assertFalse('daemon' in repr(t))
|
||||
self.assertNotIn('daemon', repr(t))
|
||||
t.daemon = True
|
||||
self.assertTrue('daemon' in repr(t))
|
||||
self.assertIn('daemon', repr(t))
|
||||
|
||||
def test_deamon_param(self):
|
||||
t = threading.Thread()
|
||||
|
@ -569,7 +568,7 @@ class ThreadTests(BaseTestCase):
|
|||
tstate_lock.release()
|
||||
self.assertFalse(t.is_alive())
|
||||
# And verify the thread disposed of _tstate_lock.
|
||||
self.assertTrue(t._tstate_lock is None)
|
||||
self.assertIsNone(t._tstate_lock)
|
||||
|
||||
def test_repr_stopped(self):
|
||||
# Verify that "stopped" shows up in repr(Thread) appropriately.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue