gh-127933: Add option to run regression tests in parallel (gh-128003)

This adds a new command line argument, `--parallel-threads`, to the
regression test runner to allow it to run individual tests in multiple
threads in parallel in order to find multithreading bugs.

Some tests pass when run with `--parallel-threads`, but there's still
more work before the entire suite passes.
This commit is contained in:
Sam Gross 2025-02-04 17:44:59 -05:00 committed by GitHub
parent 285c1c4e95
commit e5f10a7414
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 150 additions and 3 deletions

View file

@ -792,6 +792,11 @@ The :mod:`test.support` module defines the following functions:
Decorator for invoking :func:`check_impl_detail` on *guards*. If that
returns ``False``, then uses *msg* as the reason for skipping the test.
.. decorator:: thread_unsafe(reason=None)
Decorator for marking tests as thread-unsafe. This test always runs in one
thread even when invoked with ``--parallel-threads``.
.. decorator:: no_tracing

View file

@ -160,6 +160,7 @@ class Namespace(argparse.Namespace):
self.print_slow = False
self.random_seed = None
self.use_mp = None
self.parallel_threads = None
self.forever = False
self.header = False
self.failfast = False
@ -316,6 +317,10 @@ def _create_parser():
'a single process, ignore -jN option, '
'and failed tests are also rerun sequentially '
'in the same process')
group.add_argument('--parallel-threads', metavar='PARALLEL_THREADS',
type=int,
help='run copies of each test in PARALLEL_THREADS at '
'once')
group.add_argument('-T', '--coverage', action='store_true',
dest='trace',
help='turn on code coverage tracing using the trace '

View file

@ -142,6 +142,8 @@ class Regrtest:
else:
self.random_seed = ns.random_seed
self.parallel_threads = ns.parallel_threads
# tests
self.first_runtests: RunTests | None = None
@ -506,6 +508,7 @@ class Regrtest:
python_cmd=self.python_cmd,
randomize=self.randomize,
random_seed=self.random_seed,
parallel_threads=self.parallel_threads,
)
def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int:

View file

@ -0,0 +1,79 @@
"""Run a test case multiple times in parallel threads."""
import copy
import functools
import threading
import unittest
from unittest import TestCase
class ParallelTestCase(TestCase):
def __init__(self, test_case: TestCase, num_threads: int):
self.test_case = test_case
self.num_threads = num_threads
self._testMethodName = test_case._testMethodName
self._testMethodDoc = test_case._testMethodDoc
def __str__(self):
return f"{str(self.test_case)} [threads={self.num_threads}]"
def run_worker(self, test_case: TestCase, result: unittest.TestResult,
barrier: threading.Barrier):
barrier.wait()
test_case.run(result)
def run(self, result=None):
if result is None:
result = test_case.defaultTestResult()
startTestRun = getattr(result, 'startTestRun', None)
stopTestRun = getattr(result, 'stopTestRun', None)
if startTestRun is not None:
startTestRun()
else:
stopTestRun = None
# Called at the beginning of each test. See TestCase.run.
result.startTest(self)
cases = [copy.copy(self.test_case) for _ in range(self.num_threads)]
results = [unittest.TestResult() for _ in range(self.num_threads)]
barrier = threading.Barrier(self.num_threads)
threads = []
for i, (case, r) in enumerate(zip(cases, results)):
thread = threading.Thread(target=self.run_worker,
args=(case, r, barrier),
name=f"{str(self.test_case)}-{i}",
daemon=True)
threads.append(thread)
for thread in threads:
thread.start()
for threads in threads:
threads.join()
# Aggregate test results
if all(r.wasSuccessful() for r in results):
result.addSuccess(self)
# Note: We can't call result.addError, result.addFailure, etc. because
# we no longer have the original exception, just the string format.
for r in results:
if len(r.errors) > 0 or len(r.failures) > 0:
result._mirrorOutput = True
result.errors.extend(r.errors)
result.failures.extend(r.failures)
result.skipped.extend(r.skipped)
result.expectedFailures.extend(r.expectedFailures)
result.unexpectedSuccesses.extend(r.unexpectedSuccesses)
result.collectedDurations.extend(r.collectedDurations)
if any(r.shouldStop for r in results):
result.stop()
# Test has finished running
result.stopTest(self)
if stopTestRun is not None:
stopTestRun()

View file

@ -100,6 +100,7 @@ class RunTests:
python_cmd: tuple[str, ...] | None
randomize: bool
random_seed: int | str
parallel_threads: int | None
def copy(self, **override) -> 'RunTests':
state = dataclasses.asdict(self)
@ -184,6 +185,8 @@ class RunTests:
args.extend(("--python", cmd))
if self.randomize:
args.append(f"--randomize")
if self.parallel_threads:
args.append(f"--parallel-threads={self.parallel_threads}")
args.append(f"--randseed={self.random_seed}")
return args

View file

@ -17,6 +17,7 @@ from .runtests import RunTests
from .save_env import saved_test_environment
from .setup import setup_tests
from .testresult import get_test_runner
from .parallel_case import ParallelTestCase
from .utils import (
TestName,
clear_caches, remove_testfn, abs_module_name, print_warning)
@ -27,14 +28,17 @@ from .utils import (
PROGRESS_MIN_TIME = 30.0 # seconds
def run_unittest(test_mod):
def run_unittest(test_mod, runtests: RunTests):
    """Load the tests from *test_mod* and run them, honoring the
    --parallel-threads option carried by *runtests*."""
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromModule(test_mod)
    if loader.errors:
        for message in loader.errors:
            print(message, file=sys.stderr)
        raise Exception("errors while loading tests")
    _filter_suite(suite, match_test)
    num_threads = runtests.parallel_threads
    if num_threads:
        _parallelize_tests(suite, num_threads)
    return _run_suite(suite)
def _filter_suite(suite, pred):
@ -49,6 +53,28 @@ def _filter_suite(suite, pred):
newtests.append(test)
suite._tests = newtests
def _parallelize_tests(suite, parallel_threads: int):
    """Recursively wrap each thread-safe test in *suite* in a
    ParallelTestCase so it runs in *parallel_threads* threads."""
    def _marked_unsafe(case):
        method = getattr(case, case._testMethodName)
        owner = method.__self__
        # The @thread_unsafe decorator may sit on the method or its class.
        return (getattr(method, "__unittest_thread_unsafe__", False) or
                getattr(owner, "__unittest_thread_unsafe__", False))

    wrapped: list[object] = []
    for item in suite._tests:
        if isinstance(item, unittest.TestSuite):
            # Recurse into nested suites; they are rewritten in place.
            _parallelize_tests(item, parallel_threads)
            wrapped.append(item)
        elif _marked_unsafe(item):
            # Don't parallelize thread-unsafe tests
            wrapped.append(item)
        else:
            wrapped.append(ParallelTestCase(item, parallel_threads))
    suite._tests = wrapped
def _run_suite(suite):
"""Run tests from a unittest.TestSuite-derived class."""
runner = get_test_runner(sys.stdout,
@ -133,7 +159,7 @@ def _load_run_test(result: TestResult, runtests: RunTests) -> None:
raise Exception(f"Module {test_name} defines test_main() which "
f"is no longer supported by regrtest")
def test_func():
return run_unittest(test_mod)
return run_unittest(test_mod, runtests)
try:
regrtest_runner(result, test_func, runtests)

View file

@ -40,7 +40,7 @@ __all__ = [
"anticipate_failure", "load_package_tests", "detect_api_mismatch",
"check__all__", "skip_if_buggy_ucrt_strfptime",
"check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer",
"requires_limited_api", "requires_specialization",
"requires_limited_api", "requires_specialization", "thread_unsafe",
# sys
"MS_WINDOWS", "is_jython", "is_android", "is_emscripten", "is_wasi",
"is_apple_mobile", "check_impl_detail", "unix_shell", "setswitchinterval",
@ -382,6 +382,21 @@ def requires_mac_ver(*min_version):
return decorator
def thread_unsafe(reason):
    """Mark a test as not thread safe. When the test runner is run with
    --parallel-threads=N, the test will be run in a single thread."""
    # Bare usage (@thread_unsafe on a plain function): mark it directly
    # with an empty reason string.
    if isinstance(reason, types.FunctionType):
        marked = reason
        marked.__unittest_thread_unsafe__ = True
        # the reason is not currently used
        marked.__unittest_thread_unsafe__why__ = ''
        return marked

    # Parameterized usage (@thread_unsafe("why")): return a decorator
    # that records the supplied reason.
    def decorator(test_item):
        test_item.__unittest_thread_unsafe__ = True
        # the reason is not currently used
        test_item.__unittest_thread_unsafe__why__ = reason
        return test_item
    return decorator
def skip_if_buildbot(reason=None):
"""Decorator raising SkipTest if running on a buildbot."""
import getpass

View file

@ -1,6 +1,7 @@
"Test the functionality of Python classes implementing operators."
import unittest
from test import support
from test.support import cpython_only, import_helper, script_helper, skip_emscripten_stack_overflow
testmeths = [
@ -134,6 +135,7 @@ for method in testmeths:
AllTests = type("AllTests", (object,), d)
del d, statictests, method, method_template
@support.thread_unsafe("callLst is shared between threads")
class ClassTests(unittest.TestCase):
def setUp(self):
callLst[:] = []

View file

@ -1103,6 +1103,7 @@ class ClassPropertiesAndMethods(unittest.TestCase):
with self.assertRaises(TypeError):
frozenset().__class__ = MyFrozenSet
@support.thread_unsafe
def test_slots(self):
# Testing __slots__...
class C0(object):
@ -5485,6 +5486,7 @@ class PicklingTests(unittest.TestCase):
{pickle.dumps, pickle._dumps},
{pickle.loads, pickle._loads}))
@support.thread_unsafe
def test_pickle_slots(self):
# Tests pickling of classes with __slots__.
@ -5552,6 +5554,7 @@ class PicklingTests(unittest.TestCase):
y = pickle_copier.copy(x)
self._assert_is_copy(x, y)
@support.thread_unsafe
def test_reduce_copying(self):
# Tests pickling and copying new-style classes and objects.
global C1

View file

@ -666,6 +666,7 @@ class COperatorTestCase(OperatorTestCase, unittest.TestCase):
module = c_operator
@support.thread_unsafe("swaps global operator module")
class OperatorPickleTestCase:
def copy(self, obj, proto):
with support.swap_item(sys.modules, 'operator', self.module):

View file

@ -1538,6 +1538,7 @@ class TestDetectEncoding(TestCase):
self.assertEqual(encoding, 'utf-8')
self.assertEqual(consumed_lines, [b'print("#coding=fake")'])
@support.thread_unsafe
def test_open(self):
filename = os_helper.TESTFN + '.py'
self.addCleanup(os_helper.unlink, filename)

View file

@ -0,0 +1,4 @@
Add an option ``--parallel-threads=N`` to the regression test runner that
runs individual tests in multiple threads in parallel in order to find
concurrency bugs. Note that most of the test suite is not yet reviewed for
thread-safety or annotated with ``@thread_unsafe`` when necessary.