Closed pool when parallel test runner encounters unpicklable exceptions.
Some checks are pending
Docs / spelling (push) Waiting to run
Docs / blacken-docs (push) Waiting to run
Docs / lint-docs (push) Waiting to run
Linters / zizmor (push) Waiting to run
Linters / black (push) Waiting to run
Linters / flake8 (push) Waiting to run
Linters / isort (push) Waiting to run
Tests / Windows, SQLite, Python 3.14 (push) Waiting to run
Tests / JavaScript tests (push) Waiting to run

This commit is contained in:
Jacob Walls 2025-11-26 10:00:38 -05:00
parent 93540b34d4
commit bd4a562a88
2 changed files with 29 additions and 31 deletions

View file

@ -567,7 +567,14 @@ class ParallelTestSuite(unittest.TestSuite):
"""
self.initialize_suite()
counter = multiprocessing.Value(ctypes.c_int, 0)
pool = multiprocessing.Pool(
args = [
(self.runner_class, index, subsuite, self.failfast, self.buffer)
for index, subsuite in enumerate(self.subsuites)
]
# Don't buffer in the main process to avoid error propagation issues.
result.buffer = False
with multiprocessing.Pool(
processes=self.processes,
initializer=functools.partial(_safe_init_worker, self.init_worker.__func__),
initargs=[
@ -579,38 +586,30 @@ class ParallelTestSuite(unittest.TestSuite):
self.debug_mode,
self.used_aliases,
],
)
args = [
(self.runner_class, index, subsuite, self.failfast, self.buffer)
for index, subsuite in enumerate(self.subsuites)
]
# Don't buffer in the main process to avoid error propagation issues.
result.buffer = False
) as pool:
test_results = pool.imap_unordered(self.run_subsuite.__func__, args)
test_results = pool.imap_unordered(self.run_subsuite.__func__, args)
while True:
if result.shouldStop:
pool.terminate()
break
while True:
if result.shouldStop:
pool.terminate()
break
try:
subsuite_index, events = test_results.next(timeout=0.1)
except multiprocessing.TimeoutError as err:
if counter.value < 0:
err.add_note("ERROR: _init_worker failed, see prior traceback")
try:
subsuite_index, events = test_results.next(timeout=0.1)
except multiprocessing.TimeoutError as err:
if counter.value < 0:
err.add_note("ERROR: _init_worker failed, see prior traceback")
raise
continue
except StopIteration:
pool.close()
raise
continue
except StopIteration:
pool.close()
break
break
tests = list(self.subsuites[subsuite_index])
for event in events:
self.handle_event(result, tests, event)
tests = list(self.subsuites[subsuite_index])
for event in events:
self.handle_event(result, tests, event)
pool.join()
pool.join()
return result

View file

@ -309,9 +309,8 @@ class ParallelTestSuiteTest(SimpleTestCase):
test_result.shouldStop = True
return (0, remote_result.events)
mock_pool.return_value.imap_unordered.return_value = unittest.mock.Mock(
next=fake_next
)
mock_imap = mock_pool.return_value.__enter__.return_value.imap_unordered
mock_imap.return_value = unittest.mock.Mock(next=fake_next)
pts.run(test_result)
self.assertIn("ValueError: woops", test_result.errors[0][1])