mirror of
https://github.com/python/cpython.git
synced 2025-07-24 11:44:31 +00:00
Issue #17778: Fix test discovery for test_multiprocessing. (Patch by
Zachary Ware.)
This commit is contained in:
parent
265fba40c8
commit
d15642e428
2 changed files with 107 additions and 122 deletions
|
@ -19,6 +19,7 @@ import socket
|
|||
import random
|
||||
import logging
|
||||
import struct
|
||||
import operator
|
||||
import test.support
|
||||
import test.script_helper
|
||||
|
||||
|
@ -1624,6 +1625,18 @@ def mul(x, y):
|
|||
|
||||
class _TestPool(BaseTestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
cls.pool = cls.Pool(4)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls.pool.terminate()
|
||||
cls.pool.join()
|
||||
cls.pool = None
|
||||
super().tearDownClass()
|
||||
|
||||
def test_apply(self):
|
||||
papply = self.pool.apply
|
||||
self.assertEqual(papply(sqr, (5,)), sqr(5))
|
||||
|
@ -1715,15 +1728,6 @@ class _TestPool(BaseTestCase):
|
|||
p.join()
|
||||
|
||||
def test_terminate(self):
|
||||
if self.TYPE == 'manager':
|
||||
# On Unix a forked process increfs each shared object to
|
||||
# which its parent process held a reference. If the
|
||||
# forked process gets terminated then there is likely to
|
||||
# be a reference leak. So to prevent
|
||||
# _TestZZZNumberOfObjects from failing we skip this test
|
||||
# when using a manager.
|
||||
return
|
||||
|
||||
result = self.pool.map_async(
|
||||
time.sleep, [0.1 for i in range(10000)], chunksize=1
|
||||
)
|
||||
|
@ -1751,7 +1755,6 @@ class _TestPool(BaseTestCase):
|
|||
with multiprocessing.Pool(2) as p:
|
||||
r = p.map_async(sqr, L)
|
||||
self.assertEqual(r.get(), expected)
|
||||
print(p._state)
|
||||
self.assertRaises(ValueError, p.map_async, sqr, L)
|
||||
|
||||
def raising():
|
||||
|
@ -1845,35 +1848,6 @@ class _TestPoolWorkerLifetime(BaseTestCase):
|
|||
for (j, res) in enumerate(results):
|
||||
self.assertEqual(res.get(), sqr(j))
|
||||
|
||||
|
||||
#
|
||||
# Test that manager has expected number of shared objects left
|
||||
#
|
||||
|
||||
class _TestZZZNumberOfObjects(BaseTestCase):
|
||||
# Because test cases are sorted alphabetically, this one will get
|
||||
# run after all the other tests for the manager. It tests that
|
||||
# there have been no "reference leaks" for the manager's shared
|
||||
# objects. Note the comment in _TestPool.test_terminate().
|
||||
|
||||
# If some other test using ManagerMixin.manager fails, then the
|
||||
# raised exception may keep alive a frame which holds a reference
|
||||
# to a managed object. This will cause test_number_of_objects to
|
||||
# also fail.
|
||||
ALLOWED_TYPES = ('manager',)
|
||||
|
||||
def test_number_of_objects(self):
|
||||
EXPECTED_NUMBER = 1 # the pool object is still alive
|
||||
multiprocessing.active_children() # discard dead process objs
|
||||
gc.collect() # do garbage collection
|
||||
refs = self.manager._number_of_objects()
|
||||
debug_info = self.manager._debug_info()
|
||||
if refs != EXPECTED_NUMBER:
|
||||
print(self.manager._debug_info())
|
||||
print(debug_info)
|
||||
|
||||
self.assertEqual(refs, EXPECTED_NUMBER)
|
||||
|
||||
#
|
||||
# Test of creating a customized manager class
|
||||
#
|
||||
|
@ -2051,7 +2025,7 @@ class _TestManagerRestart(BaseTestCase):
|
|||
address=addr, authkey=authkey, serializer=SERIALIZER)
|
||||
try:
|
||||
manager.start()
|
||||
except IOError as e:
|
||||
except OSError as e:
|
||||
if e.errno != errno.EADDRINUSE:
|
||||
raise
|
||||
# Retry after some time, in case the old socket was lingering
|
||||
|
@ -2165,9 +2139,9 @@ class _TestConnection(BaseTestCase):
|
|||
self.assertEqual(reader.writable, False)
|
||||
self.assertEqual(writer.readable, False)
|
||||
self.assertEqual(writer.writable, True)
|
||||
self.assertRaises(IOError, reader.send, 2)
|
||||
self.assertRaises(IOError, writer.recv)
|
||||
self.assertRaises(IOError, writer.poll)
|
||||
self.assertRaises(OSError, reader.send, 2)
|
||||
self.assertRaises(OSError, writer.recv)
|
||||
self.assertRaises(OSError, writer.poll)
|
||||
|
||||
def test_spawn_close(self):
|
||||
# We test that a pipe connection can be closed by parent
|
||||
|
@ -2329,8 +2303,8 @@ class _TestConnection(BaseTestCase):
|
|||
if self.TYPE == 'processes':
|
||||
self.assertTrue(a.closed)
|
||||
self.assertTrue(b.closed)
|
||||
self.assertRaises(IOError, a.recv)
|
||||
self.assertRaises(IOError, b.recv)
|
||||
self.assertRaises(OSError, a.recv)
|
||||
self.assertRaises(OSError, b.recv)
|
||||
|
||||
class _TestListener(BaseTestCase):
|
||||
|
||||
|
@ -2351,7 +2325,7 @@ class _TestListener(BaseTestCase):
|
|||
self.assertEqual(d.recv(), 1729)
|
||||
|
||||
if self.TYPE == 'processes':
|
||||
self.assertRaises(IOError, l.accept)
|
||||
self.assertRaises(OSError, l.accept)
|
||||
|
||||
class _TestListenerClient(BaseTestCase):
|
||||
|
||||
|
@ -2401,7 +2375,7 @@ class _TestListenerClient(BaseTestCase):
|
|||
c.close()
|
||||
l.close()
|
||||
|
||||
class _TestPoll(unittest.TestCase):
|
||||
class _TestPoll(BaseTestCase):
|
||||
|
||||
ALLOWED_TYPES = ('processes', 'threads')
|
||||
|
||||
|
@ -2942,27 +2916,18 @@ class TestInvalidHandle(unittest.TestCase):
|
|||
def test_invalid_handles(self):
|
||||
conn = multiprocessing.connection.Connection(44977608)
|
||||
try:
|
||||
self.assertRaises((ValueError, IOError), conn.poll)
|
||||
self.assertRaises((ValueError, OSError), conn.poll)
|
||||
finally:
|
||||
# Hack private attribute _handle to avoid printing an error
|
||||
# in conn.__del__
|
||||
conn._handle = None
|
||||
self.assertRaises((ValueError, IOError),
|
||||
self.assertRaises((ValueError, OSError),
|
||||
multiprocessing.connection.Connection, -1)
|
||||
|
||||
#
|
||||
# Functions used to create test cases from the base ones in this module
|
||||
#
|
||||
|
||||
def get_attributes(Source, names):
|
||||
d = {}
|
||||
for name in names:
|
||||
obj = getattr(Source, name)
|
||||
if type(obj) == type(get_attributes):
|
||||
obj = staticmethod(obj)
|
||||
d[name] = obj
|
||||
return d
|
||||
|
||||
def create_test_cases(Mixin, type):
|
||||
result = {}
|
||||
glob = globals()
|
||||
|
@ -2975,10 +2940,10 @@ def create_test_cases(Mixin, type):
|
|||
assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
|
||||
if type in base.ALLOWED_TYPES:
|
||||
newname = 'With' + Type + name[1:]
|
||||
class Temp(base, unittest.TestCase, Mixin):
|
||||
class Temp(base, Mixin, unittest.TestCase):
|
||||
pass
|
||||
result[newname] = Temp
|
||||
Temp.__name__ = newname
|
||||
Temp.__name__ = Temp.__qualname__ = newname
|
||||
Temp.__module__ = Mixin.__module__
|
||||
return result
|
||||
|
||||
|
@ -2989,12 +2954,24 @@ def create_test_cases(Mixin, type):
|
|||
class ProcessesMixin(object):
|
||||
TYPE = 'processes'
|
||||
Process = multiprocessing.Process
|
||||
locals().update(get_attributes(multiprocessing, (
|
||||
'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
|
||||
'Condition', 'Event', 'Barrier', 'Value', 'Array', 'RawValue',
|
||||
'RawArray', 'current_process', 'active_children', 'Pipe',
|
||||
'connection', 'JoinableQueue', 'Pool'
|
||||
)))
|
||||
connection = multiprocessing.connection
|
||||
current_process = staticmethod(multiprocessing.current_process)
|
||||
active_children = staticmethod(multiprocessing.active_children)
|
||||
Pool = staticmethod(multiprocessing.Pool)
|
||||
Pipe = staticmethod(multiprocessing.Pipe)
|
||||
Queue = staticmethod(multiprocessing.Queue)
|
||||
JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
|
||||
Lock = staticmethod(multiprocessing.Lock)
|
||||
RLock = staticmethod(multiprocessing.RLock)
|
||||
Semaphore = staticmethod(multiprocessing.Semaphore)
|
||||
BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
|
||||
Condition = staticmethod(multiprocessing.Condition)
|
||||
Event = staticmethod(multiprocessing.Event)
|
||||
Barrier = staticmethod(multiprocessing.Barrier)
|
||||
Value = staticmethod(multiprocessing.Value)
|
||||
Array = staticmethod(multiprocessing.Array)
|
||||
RawValue = staticmethod(multiprocessing.RawValue)
|
||||
RawArray = staticmethod(multiprocessing.RawArray)
|
||||
|
||||
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
|
||||
globals().update(testcases_processes)
|
||||
|
@ -3003,12 +2980,48 @@ globals().update(testcases_processes)
|
|||
class ManagerMixin(object):
|
||||
TYPE = 'manager'
|
||||
Process = multiprocessing.Process
|
||||
manager = object.__new__(multiprocessing.managers.SyncManager)
|
||||
locals().update(get_attributes(manager, (
|
||||
'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
|
||||
'Condition', 'Event', 'Barrier', 'Value', 'Array', 'list', 'dict',
|
||||
'Namespace', 'JoinableQueue', 'Pool'
|
||||
)))
|
||||
Queue = property(operator.attrgetter('manager.Queue'))
|
||||
JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
|
||||
Lock = property(operator.attrgetter('manager.Lock'))
|
||||
RLock = property(operator.attrgetter('manager.RLock'))
|
||||
Semaphore = property(operator.attrgetter('manager.Semaphore'))
|
||||
BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
|
||||
Condition = property(operator.attrgetter('manager.Condition'))
|
||||
Event = property(operator.attrgetter('manager.Event'))
|
||||
Barrier = property(operator.attrgetter('manager.Barrier'))
|
||||
Value = property(operator.attrgetter('manager.Value'))
|
||||
Array = property(operator.attrgetter('manager.Array'))
|
||||
list = property(operator.attrgetter('manager.list'))
|
||||
dict = property(operator.attrgetter('manager.dict'))
|
||||
Namespace = property(operator.attrgetter('manager.Namespace'))
|
||||
|
||||
@classmethod
|
||||
def Pool(cls, *args, **kwds):
|
||||
return cls.manager.Pool(*args, **kwds)
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.manager = multiprocessing.Manager()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
# only the manager process should be returned by active_children()
|
||||
# but this can take a bit on slow machines, so wait a few seconds
|
||||
# if there are other children too (see #17395)
|
||||
t = 0.01
|
||||
while len(multiprocessing.active_children()) > 1 and t < 5:
|
||||
time.sleep(t)
|
||||
t *= 2
|
||||
gc.collect() # do garbage collection
|
||||
if cls.manager._number_of_objects() != 0:
|
||||
# This is not really an error since some tests do not
|
||||
# ensure that all processes which hold a reference to a
|
||||
# managed object have been joined.
|
||||
print('Shared objects which still exist at manager shutdown:')
|
||||
print(cls.manager._debug_info())
|
||||
cls.manager.shutdown()
|
||||
cls.manager.join()
|
||||
cls.manager = None
|
||||
|
||||
testcases_manager = create_test_cases(ManagerMixin, type='manager')
|
||||
globals().update(testcases_manager)
|
||||
|
@ -3017,16 +3030,27 @@ globals().update(testcases_manager)
|
|||
class ThreadsMixin(object):
|
||||
TYPE = 'threads'
|
||||
Process = multiprocessing.dummy.Process
|
||||
locals().update(get_attributes(multiprocessing.dummy, (
|
||||
'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
|
||||
'Condition', 'Event', 'Barrier', 'Value', 'Array', 'current_process',
|
||||
'active_children', 'Pipe', 'connection', 'dict', 'list',
|
||||
'Namespace', 'JoinableQueue', 'Pool'
|
||||
)))
|
||||
connection = multiprocessing.dummy.connection
|
||||
current_process = staticmethod(multiprocessing.dummy.current_process)
|
||||
active_children = staticmethod(multiprocessing.dummy.active_children)
|
||||
Pool = staticmethod(multiprocessing.Pool)
|
||||
Pipe = staticmethod(multiprocessing.dummy.Pipe)
|
||||
Queue = staticmethod(multiprocessing.dummy.Queue)
|
||||
JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
|
||||
Lock = staticmethod(multiprocessing.dummy.Lock)
|
||||
RLock = staticmethod(multiprocessing.dummy.RLock)
|
||||
Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
|
||||
BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
|
||||
Condition = staticmethod(multiprocessing.dummy.Condition)
|
||||
Event = staticmethod(multiprocessing.dummy.Event)
|
||||
Barrier = staticmethod(multiprocessing.dummy.Barrier)
|
||||
Value = staticmethod(multiprocessing.dummy.Value)
|
||||
Array = staticmethod(multiprocessing.dummy.Array)
|
||||
|
||||
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
|
||||
globals().update(testcases_threads)
|
||||
|
||||
|
||||
class OtherTest(unittest.TestCase):
|
||||
# TODO: add more tests for deliver/answer challenge.
|
||||
def test_deliver_challenge_auth_failure(self):
|
||||
|
@ -3532,16 +3556,7 @@ class TestIgnoreEINTR(unittest.TestCase):
|
|||
#
|
||||
#
|
||||
|
||||
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
|
||||
TestStdinBadfiledescriptor, TestWait, TestInvalidFamily,
|
||||
TestFlags, TestTimeouts, TestNoForkBomb,
|
||||
TestForkAwareThreadLock, TestIgnoreEINTR]
|
||||
|
||||
#
|
||||
#
|
||||
#
|
||||
|
||||
def test_main(run=None):
|
||||
def setUpModule():
|
||||
if sys.platform.startswith("linux"):
|
||||
try:
|
||||
lock = multiprocessing.RLock()
|
||||
|
@ -3550,43 +3565,10 @@ def test_main(run=None):
|
|||
|
||||
check_enough_semaphores()
|
||||
|
||||
if run is None:
|
||||
from test.support import run_unittest as run
|
||||
|
||||
util.get_temp_dir() # creates temp directory for use by all processes
|
||||
|
||||
multiprocessing.get_logger().setLevel(LOG_LEVEL)
|
||||
|
||||
ProcessesMixin.pool = multiprocessing.Pool(4)
|
||||
ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
|
||||
ManagerMixin.manager.__init__()
|
||||
ManagerMixin.manager.start()
|
||||
ManagerMixin.pool = ManagerMixin.manager.Pool(4)
|
||||
|
||||
testcases = (
|
||||
sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
|
||||
sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
|
||||
sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
|
||||
testcases_other
|
||||
)
|
||||
|
||||
loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
|
||||
suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
|
||||
try:
|
||||
run(suite)
|
||||
finally:
|
||||
ThreadsMixin.pool.terminate()
|
||||
ProcessesMixin.pool.terminate()
|
||||
ManagerMixin.pool.terminate()
|
||||
ManagerMixin.pool.join()
|
||||
ManagerMixin.manager.shutdown()
|
||||
ManagerMixin.manager.join()
|
||||
ThreadsMixin.pool.join()
|
||||
ProcessesMixin.pool.join()
|
||||
del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
|
||||
|
||||
def main():
|
||||
test_main(unittest.TextTestRunner(verbosity=2).run)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
unittest.main()
|
||||
|
|
|
@ -49,6 +49,9 @@ Core and Builtins
|
|||
Library
|
||||
-------
|
||||
|
||||
- Issue #17778: Fix test discovery for test_multiprocessing. (Patch by
|
||||
Zachary Ware.)
|
||||
|
||||
- Issue #18431: The new email header parser now decodes RFC2047 encoded words
|
||||
in structured headers.
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue