gh-71339: Use new assertion methods in the multiprocessing tests (GH-128847)

This commit is contained in:
Serhiy Storchaka 2025-01-15 01:17:11 +02:00 committed by GitHub
parent f7ceb317ae
commit b52de22ac3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -319,7 +319,7 @@ class _TestProcess(BaseTestCase):
authkey = current.authkey
self.assertTrue(current.is_alive())
self.assertTrue(not current.daemon)
self.assertFalse(current.daemon)
self.assertIsInstance(authkey, bytes)
self.assertTrue(len(authkey) > 0)
self.assertEqual(current.ident, os.getpid())
@ -463,7 +463,7 @@ class _TestProcess(BaseTestCase):
self.assertEqual(p.is_alive(), False)
self.assertEqual(p.daemon, True)
self.assertNotIn(p, self.active_children())
self.assertTrue(type(self.active_children()) is list)
self.assertIs(type(self.active_children()), list)
self.assertEqual(p.exitcode, None)
p.start()
@ -583,8 +583,8 @@ class _TestProcess(BaseTestCase):
cpus = multiprocessing.cpu_count()
except NotImplementedError:
cpus = 1
self.assertTrue(type(cpus) is int)
self.assertTrue(cpus >= 1)
self.assertIsInstance(cpus, int)
self.assertGreaterEqual(cpus, 1)
def test_active_children(self):
self.assertEqual(type(self.active_children()), list)
@ -2382,14 +2382,14 @@ class _TestValue(BaseTestCase):
self.assertEqual(lock, lock3)
arr4 = self.Value('i', 5, lock=False)
self.assertFalse(hasattr(arr4, 'get_lock'))
self.assertFalse(hasattr(arr4, 'get_obj'))
self.assertNotHasAttr(arr4, 'get_lock')
self.assertNotHasAttr(arr4, 'get_obj')
self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
arr5 = self.RawValue('i', 5)
self.assertFalse(hasattr(arr5, 'get_lock'))
self.assertFalse(hasattr(arr5, 'get_obj'))
self.assertNotHasAttr(arr5, 'get_lock')
self.assertNotHasAttr(arr5, 'get_obj')
class _TestArray(BaseTestCase):
@ -2462,14 +2462,14 @@ class _TestArray(BaseTestCase):
self.assertEqual(lock, lock3)
arr4 = self.Array('i', range(10), lock=False)
self.assertFalse(hasattr(arr4, 'get_lock'))
self.assertFalse(hasattr(arr4, 'get_obj'))
self.assertNotHasAttr(arr4, 'get_lock')
self.assertNotHasAttr(arr4, 'get_obj')
self.assertRaises(AttributeError,
self.Array, 'i', range(10), lock='notalock')
arr5 = self.RawArray('i', range(10))
self.assertFalse(hasattr(arr5, 'get_lock'))
self.assertFalse(hasattr(arr5, 'get_obj'))
self.assertNotHasAttr(arr5, 'get_lock')
self.assertNotHasAttr(arr5, 'get_obj')
#
#
@ -2657,8 +2657,8 @@ class _TestContainers(BaseTestCase):
self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
del n.job
self.assertEqual(str(n), "Namespace(name='Bob')")
self.assertTrue(hasattr(n, 'name'))
self.assertTrue(not hasattr(n, 'job'))
self.assertHasAttr(n, 'name')
self.assertNotHasAttr(n, 'job')
#
#
@ -4938,13 +4938,9 @@ class _TestImportStar(unittest.TestCase):
for name in modules:
__import__(name)
mod = sys.modules[name]
self.assertTrue(hasattr(mod, '__all__'), name)
self.assertHasAttr(mod, '__all__', name)
for attr in mod.__all__:
self.assertTrue(
hasattr(mod, attr),
'%r does not have attribute %r' % (mod, attr)
)
self.assertHasAttr(mod, attr)
#
# Quick test that logging works -- does not test logging output
@ -4957,7 +4953,7 @@ class _TestLogging(BaseTestCase):
def test_enable_logging(self):
logger = multiprocessing.get_logger()
logger.setLevel(util.SUBWARNING)
self.assertTrue(logger is not None)
self.assertIsNotNone(logger)
logger.debug('this will not be printed')
logger.info('nor will this')
logger.setLevel(LOG_LEVEL)
@ -5753,9 +5749,8 @@ class TestStartMethod(unittest.TestCase):
self.assertEqual(multiprocessing.get_start_method(), method)
ctx = multiprocessing.get_context()
self.assertEqual(ctx.get_start_method(), method)
self.assertTrue(type(ctx).__name__.lower().startswith(method))
self.assertTrue(
ctx.Process.__name__.lower().startswith(method))
self.assertStartsWith(type(ctx).__name__.lower(), method)
self.assertStartsWith(ctx.Process.__name__.lower(), method)
self.check_context(multiprocessing)
count += 1
finally:
@ -5956,9 +5951,9 @@ class TestResourceTracker(unittest.TestCase):
if should_die:
self.assertEqual(len(all_warn), 1)
the_warn = all_warn[0]
self.assertTrue(issubclass(the_warn.category, UserWarning))
self.assertTrue("resource_tracker: process died"
in str(the_warn.message))
self.assertIsSubclass(the_warn.category, UserWarning)
self.assertIn("resource_tracker: process died",
str(the_warn.message))
else:
self.assertEqual(len(all_warn), 0)
@ -6163,8 +6158,8 @@ class TestPoolNotLeakOnFailure(unittest.TestCase):
Process=FailingForkProcess))
p.close()
p.join()
self.assertFalse(
any(process.is_alive() for process in forked_processes))
for process in forked_processes:
self.assertFalse(process.is_alive(), process)
@hashlib_helper.requires_hashdigest('sha256')