gh-110756: Sync regrtest with main branch (#110758) (#110781)

Copy files from main to this branch:

* Lib/test/libregrtest/*.py
* Lib/test/__init__.py
* Lib/test/__main__.py
* Lib/test/autotest.py
* Lib/test/pythoninfo.py
* Lib/test/regrtest.py
* Lib/test/test_regrtest.py

Copy also changes from:

* Lib/test/support/__init__.py
* Lib/test/support/os_helper.py
* Lib/test/support/testresult.py
* Lib/test/support/threading_helper.py
* Lib/test/test_support.py

Do not modify scripts running tests such as Makefile.pre.in,
.github/workflows/build.yml or Tools/scripts/run_tests.py: do not use
--fast-ci and --slow-ci in this change.

Changes:

* SPLITTESTDIRS: don't include test_inspect.
* Add utils.process_cpu_count() using len(os.sched_getaffinity(0)).
* test_regrtest doesn't use @support.without_optimizer which doesn't
  exist in Python 3.11.
* Add support.set_sanitizer_env_var().
* Update test_faulthandler to use support.set_sanitizer_env_var().
* @support.without_optimizer doesn't exist in 3.11.
* Add support.Py_DEBUG.
* regrtest.refleak: 3.11 doesn't have sys.getunicodeinternedsize.
This commit is contained in:
Victor Stinner 2023-10-12 23:45:36 +02:00 committed by GitHub
parent e16922f070
commit 26748ed4f6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 3711 additions and 2175 deletions

View file

@ -1,2 +1,2 @@
from test.libregrtest import main
main()
from test.libregrtest.main import main
main(_add_python_opts=True)

View file

@ -1,5 +1,5 @@
# This should be equivalent to running regrtest.py from the cmdline.
# It can be especially handy if you're in an interactive shell, e.g.,
# from test import autotest.
from test.libregrtest import main
from test.libregrtest.main import main
main()

View file

@ -1,2 +0,0 @@
from test.libregrtest.cmdline import _parse_args, RESOURCE_NAMES, ALL_RESOURCES
from test.libregrtest.main import main

View file

@ -1,8 +1,9 @@
import argparse
import os
import os.path
import shlex
import sys
from test.support import os_helper
from .utils import ALL_RESOURCES, RESOURCE_NAMES
USAGE = """\
@ -27,8 +28,10 @@ EPILOG = """\
Additional option details:
-r randomizes test execution order. You can use --randseed=int to provide an
int seed value for the randomizer; this is useful for reproducing troublesome
test orders.
int seed value for the randomizer. The randseed value will be used
to set seeds for all random usages in tests
(including randomizing the tests order if -r is set).
By default we always set random seed, but do not randomize test order.
-s On the first invocation of regrtest using -s, the first test file found
or the first test file given on the command line is run, and the name of
@ -130,25 +133,17 @@ Pattern examples:
"""
ALL_RESOURCES = ('audio', 'curses', 'largefile', 'network',
'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui', 'walltime')
# Other resources excluded from --use=all:
#
# - extralagefile (ex: test_zipfile64): really too slow to be enabled
# "by default"
# - tzdata: while needed to validate fully test_datetime, it makes
# test_datetime too slow (15-20 min on some buildbots) and so is disabled by
# default (see bpo-30822).
RESOURCE_NAMES = ALL_RESOURCES + ('extralargefile', 'tzdata')
class Namespace(argparse.Namespace):
def __init__(self, **kwargs) -> None:
self.ci = False
self.testdir = None
self.verbose = 0
self.quiet = False
self.exclude = False
self.cleanup = False
self.wait = False
self.list_cases = False
self.list_tests = False
self.single = False
self.randomize = False
self.fromfile = None
@ -157,8 +152,8 @@ class Namespace(argparse.Namespace):
self.trace = False
self.coverdir = 'coverage'
self.runleaks = False
self.huntrleaks = False
self.verbose2 = False
self.huntrleaks: tuple[int, int, str] | None = None
self.rerun = False
self.verbose3 = False
self.print_slow = False
self.random_seed = None
@ -170,6 +165,14 @@ class Namespace(argparse.Namespace):
self.ignore_tests = None
self.pgo = False
self.pgo_extended = False
self.worker_json = None
self.start = None
self.timeout = None
self.memlimit = None
self.threshold = None
self.fail_rerun = False
self.tempdir = None
self._add_python_opts = True
super().__init__(**kwargs)
@ -198,25 +201,35 @@ def _create_parser():
# We add help explicitly to control what argument group it renders under.
group.add_argument('-h', '--help', action='help',
help='show this help message and exit')
group.add_argument('--timeout', metavar='TIMEOUT', type=float,
group.add_argument('--fast-ci', action='store_true',
help='Fast Continuous Integration (CI) mode used by '
'GitHub Actions')
group.add_argument('--slow-ci', action='store_true',
help='Slow Continuous Integration (CI) mode used by '
'buildbot workers')
group.add_argument('--timeout', metavar='TIMEOUT',
help='dump the traceback and exit if a test takes '
'more than TIMEOUT seconds; disabled if TIMEOUT '
'is negative or equals to zero')
group.add_argument('--wait', action='store_true',
help='wait for user input, e.g., allow a debugger '
'to be attached')
group.add_argument('--worker-args', metavar='ARGS')
group.add_argument('-S', '--start', metavar='START',
help='the name of the test at which to start.' +
more_details)
group.add_argument('-p', '--python', metavar='PYTHON',
help='Command to run Python test subprocesses with.')
group.add_argument('--randseed', metavar='SEED',
dest='random_seed', type=int,
help='pass a global random seed')
group = parser.add_argument_group('Verbosity')
group.add_argument('-v', '--verbose', action='count',
help='run tests in verbose mode with output to stdout')
group.add_argument('-w', '--verbose2', action='store_true',
group.add_argument('-w', '--rerun', action='store_true',
help='re-run failed tests in verbose mode')
group.add_argument('--verbose2', action='store_true', dest='rerun',
help='deprecated alias to --rerun')
group.add_argument('-W', '--verbose3', action='store_true',
help='display test output on failure')
group.add_argument('-q', '--quiet', action='store_true',
@ -229,10 +242,6 @@ def _create_parser():
group = parser.add_argument_group('Selecting tests')
group.add_argument('-r', '--randomize', action='store_true',
help='randomize test execution order.' + more_details)
group.add_argument('--randseed', metavar='SEED',
dest='random_seed', type=int,
help='pass a random seed to reproduce a previous '
'random run')
group.add_argument('-f', '--fromfile', metavar='FILE',
help='read names of tests to run from a file.' +
more_details)
@ -311,6 +320,9 @@ def _create_parser():
group.add_argument('--fail-env-changed', action='store_true',
help='if a test file alters the environment, mark '
'the test as failed')
group.add_argument('--fail-rerun', action='store_true',
help='if a test failed and then passed when re-run, '
'mark the tests as failed')
group.add_argument('--junit-xml', dest='xmlpath', metavar='FILENAME',
help='writes JUnit-style XML results to the specified '
@ -319,6 +331,9 @@ def _create_parser():
help='override the working directory for the test run')
group.add_argument('--cleanup', action='store_true',
help='remove old test_python_* directories')
group.add_argument('--dont-add-python-opts', dest='_add_python_opts',
action='store_false',
help="internal option, don't use it")
return parser
@ -369,7 +384,50 @@ def _parse_args(args, **kwargs):
for arg in ns.args:
if arg.startswith('-'):
parser.error("unrecognized arguments: %s" % arg)
sys.exit(1)
if ns.timeout is not None:
# Support "--timeout=" (no value) so Makefile.pre.pre TESTTIMEOUT
# can be used by "make buildbottest" and "make test".
if ns.timeout != "":
try:
ns.timeout = float(ns.timeout)
except ValueError:
parser.error(f"invalid timeout value: {ns.timeout!r}")
else:
ns.timeout = None
# Continuous Integration (CI): common options for fast/slow CI modes
if ns.slow_ci or ns.fast_ci:
# Similar to options:
#
# -j0 --randomize --fail-env-changed --fail-rerun --rerun
# --slowest --verbose3
if ns.use_mp is None:
ns.use_mp = 0
ns.randomize = True
ns.fail_env_changed = True
ns.fail_rerun = True
if ns.python is None:
ns.rerun = True
ns.print_slow = True
ns.verbose3 = True
else:
ns._add_python_opts = False
# When both --slow-ci and --fast-ci options are present,
# --slow-ci has the priority
if ns.slow_ci:
# Similar to: -u "all" --timeout=1200
if not ns.use:
ns.use = [['all']]
if ns.timeout is None:
ns.timeout = 1200 # 20 minutes
elif ns.fast_ci:
# Similar to: -u "all,-cpu" --timeout=600
if not ns.use:
ns.use = [['all', '-cpu']]
if ns.timeout is None:
ns.timeout = 600 # 10 minutes
if ns.single and ns.fromfile:
parser.error("-s and -f don't go together!")
@ -382,7 +440,7 @@ def _parse_args(args, **kwargs):
ns.python = shlex.split(ns.python)
if ns.failfast and not (ns.verbose or ns.verbose3):
parser.error("-G/--failfast needs either -v or -W")
if ns.pgo and (ns.verbose or ns.verbose2 or ns.verbose3):
if ns.pgo and (ns.verbose or ns.rerun or ns.verbose3):
parser.error("--pgo/-v don't go together!")
if ns.pgo_extended:
ns.pgo = True # pgo_extended implies pgo
@ -396,10 +454,6 @@ def _parse_args(args, **kwargs):
if ns.timeout is not None:
if ns.timeout <= 0:
ns.timeout = None
if ns.use_mp is not None:
if ns.use_mp <= 0:
# Use all cores + extras for tests that like to sleep
ns.use_mp = 2 + (os.cpu_count() or 1)
if ns.use:
for a in ns.use:
for r in a:
@ -443,4 +497,13 @@ def _parse_args(args, **kwargs):
# --forever implies --failfast
ns.failfast = True
if ns.huntrleaks:
warmup, repetitions, _ = ns.huntrleaks
if warmup < 1 or repetitions < 1:
msg = ("Invalid values for the --huntrleaks/-R parameters. The "
"number of warmups and repetitions must be at least 1 "
"each (1:1).")
print(msg, file=sys.stderr, flush=True)
sys.exit(2)
return ns

View file

@ -0,0 +1,105 @@
import os
import sys
import unittest
from test import support
from .utils import (
StrPath, TestName, TestTuple, TestList, FilterTuple,
abs_module_name, count, printlist)
# If these test directories are encountered recurse into them and treat each
# "test_*.py" file or each sub-directory as a separate test module. This can
# increase parallelism.
#
# Beware this can't generally be done for any directory with sub-tests as the
# __init__.py may do things which alter what tests are to be run.
SPLITTESTDIRS: set[TestName] = {
"test_asyncio",
"test_concurrent_futures",
"test_future_stmt",
"test_gdb",
"test_multiprocessing_fork",
"test_multiprocessing_forkserver",
"test_multiprocessing_spawn",
}
def findtestdir(path: StrPath | None = None) -> StrPath:
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
def findtests(*, testdir: StrPath | None = None, exclude=(),
split_test_dirs: set[TestName] = SPLITTESTDIRS,
base_mod: str = "") -> TestList:
"""Return a list of all applicable test modules."""
testdir = findtestdir(testdir)
tests = []
for name in os.listdir(testdir):
mod, ext = os.path.splitext(name)
if (not mod.startswith("test_")) or (mod in exclude):
continue
if base_mod:
fullname = f"{base_mod}.{mod}"
else:
fullname = mod
if fullname in split_test_dirs:
subdir = os.path.join(testdir, mod)
if not base_mod:
fullname = f"test.{mod}"
tests.extend(findtests(testdir=subdir, exclude=exclude,
split_test_dirs=split_test_dirs,
base_mod=fullname))
elif ext in (".py", ""):
tests.append(fullname)
return sorted(tests)
def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(),
split_test_dirs=SPLITTESTDIRS):
testdir = findtestdir(testdir)
splitted = []
for name in tests:
if name in split_test_dirs:
subdir = os.path.join(testdir, name)
splitted.extend(findtests(testdir=subdir, exclude=exclude,
split_test_dirs=split_test_dirs,
base_mod=name))
else:
splitted.append(name)
return splitted
def _list_cases(suite):
for test in suite:
if isinstance(test, unittest.loader._FailedTest):
continue
if isinstance(test, unittest.TestSuite):
_list_cases(test)
elif isinstance(test, unittest.TestCase):
if support.match_test(test):
print(test.id())
def list_cases(tests: TestTuple, *,
match_tests: FilterTuple | None = None,
ignore_tests: FilterTuple | None = None,
test_dir: StrPath | None = None):
support.verbose = False
support.set_match_tests(match_tests, ignore_tests)
skipped = []
for test_name in tests:
module_name = abs_module_name(test_name, test_dir)
try:
suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
_list_cases(suite)
except unittest.SkipTest:
skipped.append(test_name)
if skipped:
sys.stdout.flush()
stderr = sys.stderr
print(file=stderr)
print(count(len(skipped), "test"), "skipped:", file=stderr)
printlist(skipped, file=stderr)

View file

@ -0,0 +1,86 @@
import os
import time
from test.support import MS_WINDOWS
from .results import TestResults
from .runtests import RunTests
from .utils import print_warning
if MS_WINDOWS:
from .win_utils import WindowsLoadTracker
class Logger:
def __init__(self, results: TestResults, quiet: bool, pgo: bool):
self.start_time = time.perf_counter()
self.test_count_text = ''
self.test_count_width = 3
self.win_load_tracker: WindowsLoadTracker | None = None
self._results: TestResults = results
self._quiet: bool = quiet
self._pgo: bool = pgo
def log(self, line: str = '') -> None:
empty = not line
# add the system load prefix: "load avg: 1.80 "
load_avg = self.get_load_avg()
if load_avg is not None:
line = f"load avg: {load_avg:.2f} {line}"
# add the timestamp prefix: "0:01:05 "
log_time = time.perf_counter() - self.start_time
mins, secs = divmod(int(log_time), 60)
hours, mins = divmod(mins, 60)
formatted_log_time = "%d:%02d:%02d" % (hours, mins, secs)
line = f"{formatted_log_time} {line}"
if empty:
line = line[:-1]
print(line, flush=True)
def get_load_avg(self) -> float | None:
if hasattr(os, 'getloadavg'):
return os.getloadavg()[0]
if self.win_load_tracker is not None:
return self.win_load_tracker.getloadavg()
return None
def display_progress(self, test_index: int, text: str) -> None:
if self._quiet:
return
results = self._results
# "[ 51/405/1] test_tcl passed"
line = f"{test_index:{self.test_count_width}}{self.test_count_text}"
fails = len(results.bad) + len(results.env_changed)
if fails and not self._pgo:
line = f"{line}/{fails}"
self.log(f"[{line}] {text}")
def set_tests(self, runtests: RunTests) -> None:
if runtests.forever:
self.test_count_text = ''
self.test_count_width = 3
else:
self.test_count_text = '/{}'.format(len(runtests.tests))
self.test_count_width = len(self.test_count_text) - 1
def start_load_tracker(self) -> None:
if not MS_WINDOWS:
return
try:
self.win_load_tracker = WindowsLoadTracker()
except PermissionError as error:
# Standard accounts may not have access to the performance
# counters.
print_warning(f'Failed to create WindowsLoadTracker: {error}')
def stop_load_tracker(self) -> None:
if self.win_load_tracker is None:
return
self.win_load_tracker.close()
self.win_load_tracker = None

File diff suppressed because it is too large Load diff

View file

@ -42,15 +42,15 @@ PGO_TESTS = [
'test_set',
'test_sqlite3',
'test_statistics',
'test_str',
'test_struct',
'test_tabnanny',
'test_time',
'test_unicode',
'test_xml_etree',
'test_xml_etree_c',
]
def setup_pgo_tests(ns):
if not ns.args and not ns.pgo_extended:
def setup_pgo_tests(cmdline_args, pgo_extended: bool):
if not cmdline_args and not pgo_extended:
# run default set of tests for PGO training
ns.args = PGO_TESTS[:]
cmdline_args[:] = PGO_TESTS[:]

View file

@ -1,10 +1,13 @@
import os
import sys
import warnings
from inspect import isabstract
from typing import Any
from test import support
from test.support import os_helper
from test.libregrtest.utils import clear_caches
from .runtests import HuntRefleak
from .utils import clear_caches
try:
from _abc import _get_dump
@ -19,7 +22,9 @@ except ImportError:
cls._abc_negative_cache, cls._abc_negative_cache_version)
def dash_R(ns, test_name, test_func):
def runtest_refleak(test_name, test_func,
hunt_refleak: HuntRefleak,
quiet: bool):
"""Run a test multiple times, looking for reference leaks.
Returns:
@ -41,6 +46,7 @@ def dash_R(ns, test_name, test_func):
fs = warnings.filters[:]
ps = copyreg.dispatch_table.copy()
pic = sys.path_importer_cache.copy()
zdc: dict[str, Any] | None
try:
import zipimport
except ImportError:
@ -62,9 +68,10 @@ def dash_R(ns, test_name, test_func):
def get_pooled_int(value):
return int_pool.setdefault(value, value)
nwarmup, ntracked, fname = ns.huntrleaks
fname = os.path.join(os_helper.SAVEDCWD, fname)
repcount = nwarmup + ntracked
warmups = hunt_refleak.warmups
runs = hunt_refleak.runs
filename = hunt_refleak.filename
repcount = warmups + runs
# Pre-allocate to ensure that the loop doesn't allocate anything new
rep_range = list(range(repcount))
@ -73,12 +80,11 @@ def dash_R(ns, test_name, test_func):
fd_deltas = [0] * repcount
getallocatedblocks = sys.getallocatedblocks
gettotalrefcount = sys.gettotalrefcount
_getquickenedcount = sys._getquickenedcount
fd_count = os_helper.fd_count
# initialize variables to make pyflakes quiet
rc_before = alloc_before = fd_before = 0
if not ns.quiet:
if not quiet:
print("beginning", repcount, "repetitions", file=sys.stderr)
print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr,
flush=True)
@ -93,12 +99,12 @@ def dash_R(ns, test_name, test_func):
dash_R_cleanup(fs, ps, pic, zdc, abcs)
support.gc_collect()
# Read memory statistics immediately after the garbage collection
alloc_after = getallocatedblocks() - _getquickenedcount()
# Read memory statistics immediately after the garbage collection.
alloc_after = getallocatedblocks()
rc_after = gettotalrefcount()
fd_after = fd_count()
if not ns.quiet:
if not quiet:
print('.', end='', file=sys.stderr, flush=True)
rc_deltas[i] = get_pooled_int(rc_after - rc_before)
@ -109,7 +115,7 @@ def dash_R(ns, test_name, test_func):
rc_before = rc_after
fd_before = fd_after
if not ns.quiet:
if not quiet:
print(file=sys.stderr)
# These checkers return False on success, True on failure
@ -138,12 +144,12 @@ def dash_R(ns, test_name, test_func):
(fd_deltas, 'file descriptors', check_fd_deltas)
]:
# ignore warmup runs
deltas = deltas[nwarmup:]
deltas = deltas[warmups:]
if checker(deltas):
msg = '%s leaked %s %s, sum=%s' % (
test_name, deltas, item_name, sum(deltas))
print(msg, file=sys.stderr, flush=True)
with open(fname, "a", encoding="utf-8") as refrep:
with open(filename, "a", encoding="utf-8") as refrep:
print(msg, file=refrep)
refrep.flush()
failed = True
@ -169,6 +175,7 @@ def dash_R_cleanup(fs, ps, pic, zdc, abcs):
zipimport._zip_directory_cache.update(zdc)
# Clear ABC registries, restoring previously saved ABC registries.
# ignore deprecation warning for collections.abc.ByteString
abs_classes = [getattr(collections.abc, a) for a in collections.abc.__all__]
abs_classes = filter(isabstract, abs_classes)
for abc in abs_classes:

View file

@ -0,0 +1,190 @@
import dataclasses
import json
from typing import Any
from test.support import TestStats
from .utils import (
StrJSON, TestName, FilterTuple,
format_duration, normalize_test_name, print_warning)
# Avoid enum.Enum to reduce the number of imports when tests are run
class State:
PASSED = "PASSED"
FAILED = "FAILED"
SKIPPED = "SKIPPED"
UNCAUGHT_EXC = "UNCAUGHT_EXC"
REFLEAK = "REFLEAK"
ENV_CHANGED = "ENV_CHANGED"
RESOURCE_DENIED = "RESOURCE_DENIED"
INTERRUPTED = "INTERRUPTED"
WORKER_FAILED = "WORKER_FAILED" # non-zero worker process exit code
WORKER_BUG = "WORKER_BUG" # exception when running a worker
DID_NOT_RUN = "DID_NOT_RUN"
TIMEOUT = "TIMEOUT"
@staticmethod
def is_failed(state):
return state in {
State.FAILED,
State.UNCAUGHT_EXC,
State.REFLEAK,
State.WORKER_FAILED,
State.WORKER_BUG,
State.TIMEOUT}
@staticmethod
def has_meaningful_duration(state):
# Consider that the duration is meaningless for these cases.
# For example, if a whole test file is skipped, its duration
# is unlikely to be the duration of executing its tests,
# but just the duration to execute code which skips the test.
return state not in {
State.SKIPPED,
State.RESOURCE_DENIED,
State.INTERRUPTED,
State.WORKER_FAILED,
State.WORKER_BUG,
State.DID_NOT_RUN}
@staticmethod
def must_stop(state):
return state in {
State.INTERRUPTED,
State.WORKER_BUG,
}
@dataclasses.dataclass(slots=True)
class TestResult:
test_name: TestName
state: str | None = None
# Test duration in seconds
duration: float | None = None
xml_data: list[str] | None = None
stats: TestStats | None = None
# errors and failures copied from support.TestFailedWithDetails
errors: list[tuple[str, str]] | None = None
failures: list[tuple[str, str]] | None = None
def is_failed(self, fail_env_changed: bool) -> bool:
if self.state == State.ENV_CHANGED:
return fail_env_changed
return State.is_failed(self.state)
def _format_failed(self):
if self.errors and self.failures:
le = len(self.errors)
lf = len(self.failures)
error_s = "error" + ("s" if le > 1 else "")
failure_s = "failure" + ("s" if lf > 1 else "")
return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
if self.errors:
le = len(self.errors)
error_s = "error" + ("s" if le > 1 else "")
return f"{self.test_name} failed ({le} {error_s})"
if self.failures:
lf = len(self.failures)
failure_s = "failure" + ("s" if lf > 1 else "")
return f"{self.test_name} failed ({lf} {failure_s})"
return f"{self.test_name} failed"
def __str__(self) -> str:
match self.state:
case State.PASSED:
return f"{self.test_name} passed"
case State.FAILED:
return self._format_failed()
case State.SKIPPED:
return f"{self.test_name} skipped"
case State.UNCAUGHT_EXC:
return f"{self.test_name} failed (uncaught exception)"
case State.REFLEAK:
return f"{self.test_name} failed (reference leak)"
case State.ENV_CHANGED:
return f"{self.test_name} failed (env changed)"
case State.RESOURCE_DENIED:
return f"{self.test_name} skipped (resource denied)"
case State.INTERRUPTED:
return f"{self.test_name} interrupted"
case State.WORKER_FAILED:
return f"{self.test_name} worker non-zero exit code"
case State.WORKER_BUG:
return f"{self.test_name} worker bug"
case State.DID_NOT_RUN:
return f"{self.test_name} ran no tests"
case State.TIMEOUT:
return f"{self.test_name} timed out ({format_duration(self.duration)})"
case _:
raise ValueError("unknown result state: {state!r}")
def has_meaningful_duration(self):
return State.has_meaningful_duration(self.state)
def set_env_changed(self):
if self.state is None or self.state == State.PASSED:
self.state = State.ENV_CHANGED
def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
if State.must_stop(self.state):
return True
if fail_fast and self.is_failed(fail_env_changed):
return True
return False
def get_rerun_match_tests(self) -> FilterTuple | None:
match_tests = []
errors = self.errors or []
failures = self.failures or []
for error_list, is_error in (
(errors, True),
(failures, False),
):
for full_name, *_ in error_list:
match_name = normalize_test_name(full_name, is_error=is_error)
if match_name is None:
# 'setUpModule (test.test_sys)': don't filter tests
return None
if not match_name:
error_type = "ERROR" if is_error else "FAIL"
print_warning(f"rerun failed to parse {error_type} test name: "
f"{full_name!r}: don't filter tests")
return None
match_tests.append(match_name)
if not match_tests:
return None
return tuple(match_tests)
def write_json_into(self, file) -> None:
json.dump(self, file, cls=_EncodeTestResult)
@staticmethod
def from_json(worker_json: StrJSON) -> 'TestResult':
return json.loads(worker_json, object_hook=_decode_test_result)
class _EncodeTestResult(json.JSONEncoder):
def default(self, o: Any) -> dict[str, Any]:
if isinstance(o, TestResult):
result = dataclasses.asdict(o)
result["__test_result__"] = o.__class__.__name__
return result
else:
return super().default(o)
def _decode_test_result(data: dict[str, Any]) -> TestResult | dict[str, Any]:
if "__test_result__" in data:
data.pop('__test_result__')
if data['stats'] is not None:
data['stats'] = TestStats(**data['stats'])
return TestResult(**data)
else:
return data

View file

@ -0,0 +1,261 @@
import sys
from test.support import TestStats
from .runtests import RunTests
from .result import State, TestResult
from .utils import (
StrPath, TestName, TestTuple, TestList, FilterDict,
printlist, count, format_duration)
# Python uses exit code 1 when an exception is not catched
# argparse.ArgumentParser.error() uses exit code 2
EXITCODE_BAD_TEST = 2
EXITCODE_ENV_CHANGED = 3
EXITCODE_NO_TESTS_RAN = 4
EXITCODE_RERUN_FAIL = 5
EXITCODE_INTERRUPTED = 130 # 128 + signal.SIGINT=2
class TestResults:
def __init__(self):
self.bad: TestList = []
self.good: TestList = []
self.rerun_bad: TestList = []
self.skipped: TestList = []
self.resource_denied: TestList = []
self.env_changed: TestList = []
self.run_no_tests: TestList = []
self.rerun: TestList = []
self.rerun_results: list[TestResult] = []
self.interrupted: bool = False
self.worker_bug: bool = False
self.test_times: list[tuple[float, TestName]] = []
self.stats = TestStats()
# used by --junit-xml
self.testsuite_xml: list[str] = []
def is_all_good(self):
return (not self.bad
and not self.skipped
and not self.interrupted
and not self.worker_bug)
def get_executed(self):
return (set(self.good) | set(self.bad) | set(self.skipped)
| set(self.resource_denied) | set(self.env_changed)
| set(self.run_no_tests))
def no_tests_run(self):
return not any((self.good, self.bad, self.skipped, self.interrupted,
self.env_changed))
def get_state(self, fail_env_changed):
state = []
if self.bad:
state.append("FAILURE")
elif fail_env_changed and self.env_changed:
state.append("ENV CHANGED")
elif self.no_tests_run():
state.append("NO TESTS RAN")
if self.interrupted:
state.append("INTERRUPTED")
if self.worker_bug:
state.append("WORKER BUG")
if not state:
state.append("SUCCESS")
return ', '.join(state)
def get_exitcode(self, fail_env_changed, fail_rerun):
exitcode = 0
if self.bad:
exitcode = EXITCODE_BAD_TEST
elif self.interrupted:
exitcode = EXITCODE_INTERRUPTED
elif fail_env_changed and self.env_changed:
exitcode = EXITCODE_ENV_CHANGED
elif self.no_tests_run():
exitcode = EXITCODE_NO_TESTS_RAN
elif fail_rerun and self.rerun:
exitcode = EXITCODE_RERUN_FAIL
elif self.worker_bug:
exitcode = EXITCODE_BAD_TEST
return exitcode
def accumulate_result(self, result: TestResult, runtests: RunTests):
test_name = result.test_name
rerun = runtests.rerun
fail_env_changed = runtests.fail_env_changed
match result.state:
case State.PASSED:
self.good.append(test_name)
case State.ENV_CHANGED:
self.env_changed.append(test_name)
self.rerun_results.append(result)
case State.SKIPPED:
self.skipped.append(test_name)
case State.RESOURCE_DENIED:
self.resource_denied.append(test_name)
case State.INTERRUPTED:
self.interrupted = True
case State.DID_NOT_RUN:
self.run_no_tests.append(test_name)
case _:
if result.is_failed(fail_env_changed):
self.bad.append(test_name)
self.rerun_results.append(result)
else:
raise ValueError(f"invalid test state: {result.state!r}")
if result.state == State.WORKER_BUG:
self.worker_bug = True
if result.has_meaningful_duration() and not rerun:
self.test_times.append((result.duration, test_name))
if result.stats is not None:
self.stats.accumulate(result.stats)
if rerun:
self.rerun.append(test_name)
xml_data = result.xml_data
if xml_data:
self.add_junit(xml_data)
def need_rerun(self):
return bool(self.rerun_results)
def prepare_rerun(self) -> tuple[TestTuple, FilterDict]:
tests: TestList = []
match_tests_dict = {}
for result in self.rerun_results:
tests.append(result.test_name)
match_tests = result.get_rerun_match_tests()
# ignore empty match list
if match_tests:
match_tests_dict[result.test_name] = match_tests
# Clear previously failed tests
self.rerun_bad.extend(self.bad)
self.bad.clear()
self.env_changed.clear()
self.rerun_results.clear()
return (tuple(tests), match_tests_dict)
def add_junit(self, xml_data: list[str]):
import xml.etree.ElementTree as ET
for e in xml_data:
try:
self.testsuite_xml.append(ET.fromstring(e))
except ET.ParseError:
print(xml_data, file=sys.__stderr__)
raise
def write_junit(self, filename: StrPath):
if not self.testsuite_xml:
# Don't create empty XML file
return
import xml.etree.ElementTree as ET
root = ET.Element("testsuites")
# Manually count the totals for the overall summary
totals = {'tests': 0, 'errors': 0, 'failures': 0}
for suite in self.testsuite_xml:
root.append(suite)
for k in totals:
try:
totals[k] += int(suite.get(k, 0))
except ValueError:
pass
for k, v in totals.items():
root.set(k, str(v))
with open(filename, 'wb') as f:
for s in ET.tostringlist(root):
f.write(s)
def display_result(self, tests: TestTuple, quiet: bool, print_slowest: bool):
if print_slowest:
self.test_times.sort(reverse=True)
print()
print("10 slowest tests:")
for test_time, test in self.test_times[:10]:
print("- %s: %s" % (test, format_duration(test_time)))
all_tests = []
omitted = set(tests) - self.get_executed()
# less important
all_tests.append((omitted, "test", "{} omitted:"))
if not quiet:
all_tests.append((self.skipped, "test", "{} skipped:"))
all_tests.append((self.resource_denied, "test", "{} skipped (resource denied):"))
all_tests.append((self.run_no_tests, "test", "{} run no tests:"))
# more important
all_tests.append((self.env_changed, "test", "{} altered the execution environment (env changed):"))
all_tests.append((self.rerun, "re-run test", "{}:"))
all_tests.append((self.bad, "test", "{} failed:"))
for tests_list, count_text, title_format in all_tests:
if tests_list:
print()
count_text = count(len(tests_list), count_text)
print(title_format.format(count_text))
printlist(tests_list)
if self.good and not quiet:
print()
text = count(len(self.good), "test")
text = f"{text} OK."
if (self.is_all_good() and len(self.good) > 1):
text = f"All {text}"
print(text)
if self.interrupted:
print()
print("Test suite interrupted by signal SIGINT.")
def display_summary(self, first_runtests: RunTests, filtered: bool):
# Total tests
stats = self.stats
text = f'run={stats.tests_run:,}'
if filtered:
text = f"{text} (filtered)"
report = [text]
if stats.failures:
report.append(f'failures={stats.failures:,}')
if stats.skipped:
report.append(f'skipped={stats.skipped:,}')
print(f"Total tests: {' '.join(report)}")
# Total test files
all_tests = [self.good, self.bad, self.rerun,
self.skipped,
self.env_changed, self.run_no_tests]
run = sum(map(len, all_tests))
text = f'run={run}'
if not first_runtests.forever:
ntest = len(first_runtests.tests)
text = f"{text}/{ntest}"
if filtered:
text = f"{text} (filtered)"
report = [text]
for name, tests in (
('failed', self.bad),
('env_changed', self.env_changed),
('skipped', self.skipped),
('resource_denied', self.resource_denied),
('rerun', self.rerun),
('run_no_tests', self.run_no_tests),
):
if tests:
report.append(f'{name}={len(tests)}')
print(f"Total test files: {' '.join(report)}")

View file

@ -0,0 +1,607 @@
import contextlib
import dataclasses
import faulthandler
import os.path
import queue
import signal
import subprocess
import sys
import tempfile
import threading
import time
import traceback
from typing import Literal, TextIO
from test import support
from test.support import os_helper, MS_WINDOWS
from .logger import Logger
from .result import TestResult, State
from .results import TestResults
from .runtests import RunTests, JsonFile, JsonFileType
from .single import PROGRESS_MIN_TIME
from .utils import (
StrPath, TestName,
format_duration, print_warning, count, plural, get_signal_name)
from .worker import create_worker_process, USE_PROCESS_GROUP
if MS_WINDOWS:
import locale
import msvcrt
# Display the running tests if nothing happened last N seconds
PROGRESS_UPDATE = 30.0 # seconds
assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME
# Kill the main process after 5 minutes. It is supposed to write an update
# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest
# buildbot workers.
MAIN_PROCESS_TIMEOUT = 5 * 60.0
assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE
# Time to wait until a worker completes: should be immediate
WAIT_COMPLETED_TIMEOUT = 30.0 # seconds
# Time to wait a killed process (in seconds)
WAIT_KILLED_TIMEOUT = 60.0
# We do not use a generator so multiple threads can call next().
class MultiprocessIterator:
"""A thread-safe iterator over tests for multiprocess mode."""
def __init__(self, tests_iter):
self.lock = threading.Lock()
self.tests_iter = tests_iter
def __iter__(self):
return self
def __next__(self):
with self.lock:
if self.tests_iter is None:
raise StopIteration
return next(self.tests_iter)
def stop(self):
with self.lock:
self.tests_iter = None
@dataclasses.dataclass(slots=True, frozen=True)
class MultiprocessResult:
result: TestResult
# bpo-45410: stderr is written into stdout to keep messages order
worker_stdout: str | None = None
err_msg: str | None = None
ExcStr = str
QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
class ExitThread(Exception):
pass
class WorkerError(Exception):
def __init__(self,
test_name: TestName,
err_msg: str | None,
stdout: str | None,
state: str):
result = TestResult(test_name, state=state)
self.mp_result = MultiprocessResult(result, stdout, err_msg)
super().__init__()
class WorkerThread(threading.Thread):
def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
super().__init__()
self.worker_id = worker_id
self.runtests = runner.runtests
self.pending = runner.pending
self.output = runner.output
self.timeout = runner.worker_timeout
self.log = runner.log
self.test_name: TestName | None = None
self.start_time: float | None = None
self._popen: subprocess.Popen[str] | None = None
self._killed = False
self._stopped = False
def __repr__(self) -> str:
info = [f'WorkerThread #{self.worker_id}']
if self.is_alive():
info.append("running")
else:
info.append('stopped')
test = self.test_name
if test:
info.append(f'test={test}')
popen = self._popen
if popen is not None:
dt = time.monotonic() - self.start_time
info.extend((f'pid={self._popen.pid}',
f'time={format_duration(dt)}'))
return '<%s>' % ' '.join(info)
def _kill(self) -> None:
popen = self._popen
if popen is None:
return
if self._killed:
return
self._killed = True
if USE_PROCESS_GROUP:
what = f"{self} process group"
else:
what = f"{self} process"
print(f"Kill {what}", file=sys.stderr, flush=True)
try:
if USE_PROCESS_GROUP:
os.killpg(popen.pid, signal.SIGKILL)
else:
popen.kill()
except ProcessLookupError:
# popen.kill(): the process completed, the WorkerThread thread
# read its exit status, but Popen.send_signal() read the returncode
# just before Popen.wait() set returncode.
pass
except OSError as exc:
print_warning(f"Failed to kill {what}: {exc!r}")
def stop(self) -> None:
# Method called from a different thread to stop this thread
self._stopped = True
self._kill()
def _run_process(self, runtests: RunTests, output_fd: int,
tmp_dir: StrPath | None = None) -> int | None:
popen = create_worker_process(runtests, output_fd, tmp_dir)
self._popen = popen
self._killed = False
try:
if self._stopped:
# If kill() has been called before self._popen is set,
# self._popen is still running. Call again kill()
# to ensure that the process is killed.
self._kill()
raise ExitThread
try:
# gh-94026: stdout+stderr are written to tempfile
retcode = popen.wait(timeout=self.timeout)
assert retcode is not None
return retcode
except subprocess.TimeoutExpired:
if self._stopped:
# kill() has been called: communicate() fails on reading
# closed stdout
raise ExitThread
# On timeout, kill the process
self._kill()
# None means TIMEOUT for the caller
retcode = None
# bpo-38207: Don't attempt to call communicate() again: on it
# can hang until all child processes using stdout
# pipes completes.
except OSError:
if self._stopped:
# kill() has been called: communicate() fails
# on reading closed stdout
raise ExitThread
raise
except:
self._kill()
raise
finally:
self._wait_completed()
self._popen = None
def create_stdout(self, stack: contextlib.ExitStack) -> TextIO:
"""Create stdout temporay file (file descriptor)."""
if MS_WINDOWS:
# gh-95027: When stdout is not a TTY, Python uses the ANSI code
# page for the sys.stdout encoding. If the main process runs in a
# terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding.
encoding = locale.getencoding()
else:
encoding = sys.stdout.encoding
# gh-94026: Write stdout+stderr to a tempfile as workaround for
# non-blocking pipes on Emscripten with NodeJS.
# gh-109425: Use "backslashreplace" error handler: log corrupted
# stdout+stderr, instead of failing with a UnicodeDecodeError and not
# logging stdout+stderr at all.
stdout_file = tempfile.TemporaryFile('w+',
encoding=encoding,
errors='backslashreplace')
stack.enter_context(stdout_file)
return stdout_file
def create_json_file(self, stack: contextlib.ExitStack) -> tuple[JsonFile, TextIO | None]:
"""Create JSON file."""
json_file_use_stdout = self.runtests.json_file_use_stdout()
if json_file_use_stdout:
json_file = JsonFile(None, JsonFileType.STDOUT)
json_tmpfile = None
else:
json_tmpfile = tempfile.TemporaryFile('w+', encoding='utf8')
stack.enter_context(json_tmpfile)
json_fd = json_tmpfile.fileno()
if MS_WINDOWS:
json_handle = msvcrt.get_osfhandle(json_fd)
json_file = JsonFile(json_handle,
JsonFileType.WINDOWS_HANDLE)
else:
json_file = JsonFile(json_fd, JsonFileType.UNIX_FD)
return (json_file, json_tmpfile)
def create_worker_runtests(self, test_name: TestName, json_file: JsonFile) -> RunTests:
"""Create the worker RunTests."""
tests = (test_name,)
if self.runtests.rerun:
match_tests = self.runtests.get_match_tests(test_name)
else:
match_tests = None
kwargs = {}
if match_tests:
kwargs['match_tests'] = match_tests
if self.runtests.output_on_failure:
kwargs['verbose'] = True
kwargs['output_on_failure'] = False
return self.runtests.copy(
tests=tests,
json_file=json_file,
**kwargs)
def run_tmp_files(self, worker_runtests: RunTests,
stdout_fd: int) -> tuple[int | None, list[StrPath]]:
# gh-93353: Check for leaked temporary files in the parent process,
# since the deletion of temporary files can happen late during
# Python finalization: too late for libregrtest.
if not support.is_wasi:
# Don't check for leaked temporary files and directories if Python is
# run on WASI. WASI don't pass environment variables like TMPDIR to
# worker processes.
tmp_dir = tempfile.mkdtemp(prefix="test_python_")
tmp_dir = os.path.abspath(tmp_dir)
try:
retcode = self._run_process(worker_runtests,
stdout_fd, tmp_dir)
finally:
tmp_files = os.listdir(tmp_dir)
os_helper.rmtree(tmp_dir)
else:
retcode = self._run_process(worker_runtests, stdout_fd)
tmp_files = []
return (retcode, tmp_files)
def read_stdout(self, stdout_file: TextIO) -> str:
stdout_file.seek(0)
try:
return stdout_file.read().strip()
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
raise WorkerError(self.test_name,
f"Cannot read process stdout: {exc}",
stdout=None,
state=State.WORKER_BUG)
def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
stdout: str) -> tuple[TestResult, str]:
try:
if json_tmpfile is not None:
json_tmpfile.seek(0)
worker_json = json_tmpfile.read()
elif json_file.file_type == JsonFileType.STDOUT:
stdout, _, worker_json = stdout.rpartition("\n")
stdout = stdout.rstrip()
else:
with json_file.open(encoding='utf8') as json_fp:
worker_json = json_fp.read()
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Failed to read worker process JSON: {exc}"
raise WorkerError(self.test_name, err_msg, stdout,
state=State.WORKER_BUG)
if not worker_json:
raise WorkerError(self.test_name, "empty JSON", stdout,
state=State.WORKER_BUG)
try:
result = TestResult.from_json(worker_json)
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Failed to parse worker process JSON: {exc}"
raise WorkerError(self.test_name, err_msg, stdout,
state=State.WORKER_BUG)
return (result, stdout)
def _runtest(self, test_name: TestName) -> MultiprocessResult:
with contextlib.ExitStack() as stack:
stdout_file = self.create_stdout(stack)
json_file, json_tmpfile = self.create_json_file(stack)
worker_runtests = self.create_worker_runtests(test_name, json_file)
retcode, tmp_files = self.run_tmp_files(worker_runtests,
stdout_file.fileno())
stdout = self.read_stdout(stdout_file)
if retcode is None:
raise WorkerError(self.test_name, stdout=stdout,
err_msg=None,
state=State.TIMEOUT)
if retcode != 0:
name = get_signal_name(retcode)
if name:
retcode = f"{retcode} ({name})"
raise WorkerError(self.test_name, f"Exit code {retcode}", stdout,
state=State.WORKER_FAILED)
result, stdout = self.read_json(json_file, json_tmpfile, stdout)
if tmp_files:
msg = (f'\n\n'
f'Warning -- {test_name} leaked temporary files '
f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}')
stdout += msg
result.set_env_changed()
return MultiprocessResult(result, stdout)
def run(self) -> None:
fail_fast = self.runtests.fail_fast
fail_env_changed = self.runtests.fail_env_changed
while not self._stopped:
try:
try:
test_name = next(self.pending)
except StopIteration:
break
self.start_time = time.monotonic()
self.test_name = test_name
try:
mp_result = self._runtest(test_name)
except WorkerError as exc:
mp_result = exc.mp_result
finally:
self.test_name = None
mp_result.result.duration = time.monotonic() - self.start_time
self.output.put((False, mp_result))
if mp_result.result.must_stop(fail_fast, fail_env_changed):
break
except ExitThread:
break
except BaseException:
self.output.put((True, traceback.format_exc()))
break
def _wait_completed(self) -> None:
popen = self._popen
try:
popen.wait(WAIT_COMPLETED_TIMEOUT)
except (subprocess.TimeoutExpired, OSError) as exc:
print_warning(f"Failed to wait for {self} completion "
f"(timeout={format_duration(WAIT_COMPLETED_TIMEOUT)}): "
f"{exc!r}")
def wait_stopped(self, start_time: float) -> None:
# bpo-38207: RunWorkers.stop_workers() called self.stop()
# which killed the process. Sometimes, killing the process from the
# main thread does not interrupt popen.communicate() in
# WorkerThread thread. This loop with a timeout is a workaround
# for that.
#
# Moreover, if this method fails to join the thread, it is likely
# that Python will hang at exit while calling threading._shutdown()
# which tries again to join the blocked thread. Regrtest.main()
# uses EXIT_TIMEOUT to workaround this second bug.
while True:
# Write a message every second
self.join(1.0)
if not self.is_alive():
break
dt = time.monotonic() - start_time
self.log(f"Waiting for {self} thread for {format_duration(dt)}")
if dt > WAIT_KILLED_TIMEOUT:
print_warning(f"Failed to join {self} in {format_duration(dt)}")
break
def get_running(workers: list[WorkerThread]) -> str | None:
running: list[str] = []
for worker in workers:
test_name = worker.test_name
if not test_name:
continue
dt = time.monotonic() - worker.start_time
if dt >= PROGRESS_MIN_TIME:
text = f'{test_name} ({format_duration(dt)})'
running.append(text)
if not running:
return None
return f"running ({len(running)}): {', '.join(running)}"
class RunWorkers:
def __init__(self, num_workers: int, runtests: RunTests,
logger: Logger, results: TestResults) -> None:
self.num_workers = num_workers
self.runtests = runtests
self.log = logger.log
self.display_progress = logger.display_progress
self.results: TestResults = results
self.output: queue.Queue[QueueOutput] = queue.Queue()
tests_iter = runtests.iter_tests()
self.pending = MultiprocessIterator(tests_iter)
self.timeout = runtests.timeout
if self.timeout is not None:
# Rely on faulthandler to kill a worker process. This timouet is
# when faulthandler fails to kill a worker process. Give a maximum
# of 5 minutes to faulthandler to kill the worker.
self.worker_timeout: float | None = min(self.timeout * 1.5, self.timeout + 5 * 60)
else:
self.worker_timeout = None
self.workers: list[WorkerThread] | None = None
jobs = self.runtests.get_jobs()
if jobs is not None:
# Don't spawn more threads than the number of jobs:
# these worker threads would never get anything to do.
self.num_workers = min(self.num_workers, jobs)
def start_workers(self) -> None:
self.workers = [WorkerThread(index, self)
for index in range(1, self.num_workers + 1)]
jobs = self.runtests.get_jobs()
if jobs is not None:
tests = count(jobs, 'test')
else:
tests = 'tests'
nworkers = len(self.workers)
processes = plural(nworkers, "process", "processes")
msg = (f"Run {tests} in parallel using "
f"{nworkers} worker {processes}")
if self.timeout:
msg += (" (timeout: %s, worker timeout: %s)"
% (format_duration(self.timeout),
format_duration(self.worker_timeout)))
self.log(msg)
for worker in self.workers:
worker.start()
def stop_workers(self) -> None:
start_time = time.monotonic()
for worker in self.workers:
worker.stop()
for worker in self.workers:
worker.wait_stopped(start_time)
def _get_result(self) -> QueueOutput | None:
pgo = self.runtests.pgo
use_faulthandler = (self.timeout is not None)
# bpo-46205: check the status of workers every iteration to avoid
# waiting forever on an empty queue.
while any(worker.is_alive() for worker in self.workers):
if use_faulthandler:
faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
exit=True)
# wait for a thread
try:
return self.output.get(timeout=PROGRESS_UPDATE)
except queue.Empty:
pass
if not pgo:
# display progress
running = get_running(self.workers)
if running:
self.log(running)
# all worker threads are done: consume pending results
try:
return self.output.get(timeout=0)
except queue.Empty:
return None
def display_result(self, mp_result: MultiprocessResult) -> None:
result = mp_result.result
pgo = self.runtests.pgo
text = str(result)
if mp_result.err_msg:
# WORKER_BUG
text += ' (%s)' % mp_result.err_msg
elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
text += ' (%s)' % format_duration(result.duration)
if not pgo:
running = get_running(self.workers)
if running:
text += f' -- {running}'
self.display_progress(self.test_index, text)
def _process_result(self, item: QueueOutput) -> TestResult:
"""Returns True if test runner must stop."""
if item[0]:
# Thread got an exception
format_exc = item[1]
print_warning(f"regrtest worker thread failed: {format_exc}")
result = TestResult("<regrtest worker>", state=State.WORKER_BUG)
self.results.accumulate_result(result, self.runtests)
return result
self.test_index += 1
mp_result = item[1]
result = mp_result.result
self.results.accumulate_result(result, self.runtests)
self.display_result(mp_result)
# Display worker stdout
if not self.runtests.output_on_failure:
show_stdout = True
else:
# --verbose3 ignores stdout on success
show_stdout = (result.state != State.PASSED)
if show_stdout:
stdout = mp_result.worker_stdout
if stdout:
print(stdout, flush=True)
return result
def run(self) -> None:
fail_fast = self.runtests.fail_fast
fail_env_changed = self.runtests.fail_env_changed
self.start_workers()
self.test_index = 0
try:
while True:
item = self._get_result()
if item is None:
break
result = self._process_result(item)
if result.must_stop(fail_fast, fail_env_changed):
break
except KeyboardInterrupt:
print()
self.results.interrupted = True
finally:
if self.timeout is not None:
faulthandler.cancel_dump_traceback_later()
# Always ensure that all worker processes are no longer
# worker when we exit this function
self.pending.stop()
self.stop_workers()

View file

@ -1,479 +0,0 @@
import dataclasses
import doctest
import faulthandler
import functools
import gc
import importlib
import io
import os
import sys
import time
import traceback
import unittest
from test import support
from test.support import TestStats
from test.support import os_helper
from test.support import threading_helper
from test.libregrtest.cmdline import Namespace
from test.libregrtest.save_env import saved_test_environment
from test.libregrtest.utils import clear_caches, format_duration, print_warning
# Avoid enum.Enum to reduce the number of imports when tests are run
class State:
PASSED = "PASSED"
FAILED = "FAILED"
SKIPPED = "SKIPPED"
UNCAUGHT_EXC = "UNCAUGHT_EXC"
REFLEAK = "REFLEAK"
ENV_CHANGED = "ENV_CHANGED"
RESOURCE_DENIED = "RESOURCE_DENIED"
INTERRUPTED = "INTERRUPTED"
MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
DID_NOT_RUN = "DID_NOT_RUN"
TIMEOUT = "TIMEOUT"
@staticmethod
def is_failed(state):
return state in {
State.FAILED,
State.UNCAUGHT_EXC,
State.REFLEAK,
State.MULTIPROCESSING_ERROR,
State.TIMEOUT}
@staticmethod
def has_meaningful_duration(state):
# Consider that the duration is meaningless for these cases.
# For example, if a whole test file is skipped, its duration
# is unlikely to be the duration of executing its tests,
# but just the duration to execute code which skips the test.
return state not in {
State.SKIPPED,
State.RESOURCE_DENIED,
State.INTERRUPTED,
State.MULTIPROCESSING_ERROR,
State.DID_NOT_RUN}
@dataclasses.dataclass(slots=True)
class TestResult:
test_name: str
state: str | None = None
# Test duration in seconds
duration: float | None = None
xml_data: list[str] | None = None
stats: TestStats | None = None
# errors and failures copied from support.TestFailedWithDetails
errors: list[tuple[str, str]] | None = None
failures: list[tuple[str, str]] | None = None
def is_failed(self, fail_env_changed: bool) -> bool:
if self.state == State.ENV_CHANGED:
return fail_env_changed
return State.is_failed(self.state)
def _format_failed(self):
if self.errors and self.failures:
le = len(self.errors)
lf = len(self.failures)
error_s = "error" + ("s" if le > 1 else "")
failure_s = "failure" + ("s" if lf > 1 else "")
return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
if self.errors:
le = len(self.errors)
error_s = "error" + ("s" if le > 1 else "")
return f"{self.test_name} failed ({le} {error_s})"
if self.failures:
lf = len(self.failures)
failure_s = "failure" + ("s" if lf > 1 else "")
return f"{self.test_name} failed ({lf} {failure_s})"
return f"{self.test_name} failed"
def __str__(self) -> str:
match self.state:
case State.PASSED:
return f"{self.test_name} passed"
case State.FAILED:
return self._format_failed()
case State.SKIPPED:
return f"{self.test_name} skipped"
case State.UNCAUGHT_EXC:
return f"{self.test_name} failed (uncaught exception)"
case State.REFLEAK:
return f"{self.test_name} failed (reference leak)"
case State.ENV_CHANGED:
return f"{self.test_name} failed (env changed)"
case State.RESOURCE_DENIED:
return f"{self.test_name} skipped (resource denied)"
case State.INTERRUPTED:
return f"{self.test_name} interrupted"
case State.MULTIPROCESSING_ERROR:
return f"{self.test_name} process crashed"
case State.DID_NOT_RUN:
return f"{self.test_name} ran no tests"
case State.TIMEOUT:
return f"{self.test_name} timed out ({format_duration(self.duration)})"
case _:
raise ValueError("unknown result state: {state!r}")
def has_meaningful_duration(self):
return State.has_meaningful_duration(self.state)
def set_env_changed(self):
if self.state is None or self.state == State.PASSED:
self.state = State.ENV_CHANGED
# Minimum duration of a test to display its duration or to mention that
# the test is running in background
PROGRESS_MIN_TIME = 30.0 # seconds
#If these test directories are encountered recurse into them and treat each
# test_ .py or dir as a separate test module. This can increase parallelism.
# Beware this can't generally be done for any directory with sub-tests as the
# __init__.py may do things which alter what tests are to be run.
SPLITTESTDIRS = {
"test_asyncio",
"test_concurrent_futures",
"test_future_stmt",
"test_gdb",
"test_multiprocessing_fork",
"test_multiprocessing_forkserver",
"test_multiprocessing_spawn",
}
# Storage of uncollectable objects
FOUND_GARBAGE = []
def findtestdir(path=None):
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
def findtests(*, testdir=None, exclude=(),
split_test_dirs=SPLITTESTDIRS, base_mod=""):
"""Return a list of all applicable test modules."""
testdir = findtestdir(testdir)
tests = []
for name in os.listdir(testdir):
mod, ext = os.path.splitext(name)
if (not mod.startswith("test_")) or (mod in exclude):
continue
if mod in split_test_dirs:
subdir = os.path.join(testdir, mod)
mod = f"{base_mod or 'test'}.{mod}"
tests.extend(findtests(testdir=subdir, exclude=exclude,
split_test_dirs=split_test_dirs, base_mod=mod))
elif ext in (".py", ""):
tests.append(f"{base_mod}.{mod}" if base_mod else mod)
return sorted(tests)
def split_test_packages(tests, *, testdir=None, exclude=(),
split_test_dirs=SPLITTESTDIRS):
testdir = findtestdir(testdir)
splitted = []
for name in tests:
if name in split_test_dirs:
subdir = os.path.join(testdir, name)
splitted.extend(findtests(testdir=subdir, exclude=exclude,
split_test_dirs=split_test_dirs,
base_mod=name))
else:
splitted.append(name)
return splitted
def get_abs_module(ns: Namespace, test_name: str) -> str:
if test_name.startswith('test.') or ns.testdir:
return test_name
else:
# Import it from the test package
return 'test.' + test_name
def _runtest_capture_output_timeout_junit(result: TestResult, ns: Namespace) -> None:
# Capture stdout and stderr, set faulthandler timeout,
# and create JUnit XML report.
output_on_failure = ns.verbose3
use_timeout = (
ns.timeout is not None and threading_helper.can_start_thread
)
if use_timeout:
faulthandler.dump_traceback_later(ns.timeout, exit=True)
try:
support.set_match_tests(ns.match_tests, ns.ignore_tests)
support.junit_xml_list = xml_list = [] if ns.xmlpath else None
if ns.failfast:
support.failfast = True
if output_on_failure:
support.verbose = True
stream = io.StringIO()
orig_stdout = sys.stdout
orig_stderr = sys.stderr
print_warning = support.print_warning
orig_print_warnings_stderr = print_warning.orig_stderr
output = None
try:
sys.stdout = stream
sys.stderr = stream
# print_warning() writes into the temporary stream to preserve
# messages order. If support.environment_altered becomes true,
# warnings will be written to sys.stderr below.
print_warning.orig_stderr = stream
_runtest_env_changed_exc(result, ns, display_failure=False)
# Ignore output if the test passed successfully
if result.state != State.PASSED:
output = stream.getvalue()
finally:
sys.stdout = orig_stdout
sys.stderr = orig_stderr
print_warning.orig_stderr = orig_print_warnings_stderr
if output is not None:
sys.stderr.write(output)
sys.stderr.flush()
else:
# Tell tests to be moderately quiet
support.verbose = ns.verbose
_runtest_env_changed_exc(result, ns,
display_failure=not ns.verbose)
if xml_list:
import xml.etree.ElementTree as ET
result.xml_data = [ET.tostring(x).decode('us-ascii')
for x in xml_list]
finally:
if use_timeout:
faulthandler.cancel_dump_traceback_later()
support.junit_xml_list = None
def runtest(ns: Namespace, test_name: str) -> TestResult:
"""Run a single test.
ns -- regrtest namespace of options
test_name -- the name of the test
Returns a TestResult.
If ns.xmlpath is not None, xml_data is a list containing each
generated testsuite element.
"""
start_time = time.perf_counter()
result = TestResult(test_name)
try:
_runtest_capture_output_timeout_junit(result, ns)
except:
if not ns.pgo:
msg = traceback.format_exc()
print(f"test {test_name} crashed -- {msg}",
file=sys.stderr, flush=True)
result.state = State.UNCAUGHT_EXC
result.duration = time.perf_counter() - start_time
return result
def _test_module(the_module):
loader = unittest.TestLoader()
tests = loader.loadTestsFromModule(the_module)
for error in loader.errors:
print(error, file=sys.stderr)
if loader.errors:
raise Exception("errors while loading tests")
return support.run_unittest(tests)
def save_env(ns: Namespace, test_name: str):
return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo)
def regrtest_runner(result, test_func, ns) -> None:
# Run test_func(), collect statistics, and detect reference and memory
# leaks.
if ns.huntrleaks:
from test.libregrtest.refleak import dash_R
refleak, test_result = dash_R(ns, result.test_name, test_func)
else:
test_result = test_func()
refleak = False
if refleak:
result.state = State.REFLEAK
match test_result:
case TestStats():
stats = test_result
case unittest.TestResult():
stats = TestStats.from_unittest(test_result)
case doctest.TestResults():
stats = TestStats.from_doctest(test_result)
case None:
print_warning(f"{result.test_name} test runner returned None: {test_func}")
stats = None
case _:
print_warning(f"Unknown test result type: {type(test_result)}")
stats = None
result.stats = stats
def _load_run_test(result: TestResult, ns: Namespace) -> None:
# Load the test function, run the test function.
abstest = get_abs_module(ns, result.test_name)
# remove the module from sys.module to reload it if it was already imported
try:
del sys.modules[abstest]
except KeyError:
pass
the_module = importlib.import_module(abstest)
if hasattr(the_module, "test_main"):
# https://github.com/python/cpython/issues/89392
raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest")
test_func = functools.partial(_test_module, the_module)
try:
with save_env(ns, result.test_name):
regrtest_runner(result, test_func, ns)
finally:
# First kill any dangling references to open files etc.
# This can also issue some ResourceWarnings which would otherwise get
# triggered during the following test run, and possibly produce
# failures.
support.gc_collect()
cleanup_test_droppings(result.test_name, ns.verbose)
if gc.garbage:
support.environment_altered = True
print_warning(f"{result.test_name} created {len(gc.garbage)} "
f"uncollectable object(s).")
# move the uncollectable objects somewhere,
# so we don't see them again
FOUND_GARBAGE.extend(gc.garbage)
gc.garbage.clear()
support.reap_children()
def _runtest_env_changed_exc(result: TestResult, ns: Namespace,
display_failure: bool = True) -> None:
# Detect environment changes, handle exceptions.
# Reset the environment_altered flag to detect if a test altered
# the environment
support.environment_altered = False
if ns.pgo:
display_failure = False
test_name = result.test_name
try:
clear_caches()
support.gc_collect()
with save_env(ns, test_name):
_load_run_test(result, ns)
except support.ResourceDenied as msg:
if not ns.quiet and not ns.pgo:
print(f"{test_name} skipped -- {msg}", flush=True)
result.state = State.RESOURCE_DENIED
return
except unittest.SkipTest as msg:
if not ns.quiet and not ns.pgo:
print(f"{test_name} skipped -- {msg}", flush=True)
result.state = State.SKIPPED
return
except support.TestFailedWithDetails as exc:
msg = f"test {test_name} failed"
if display_failure:
msg = f"{msg} -- {exc}"
print(msg, file=sys.stderr, flush=True)
result.state = State.FAILED
result.errors = exc.errors
result.failures = exc.failures
result.stats = exc.stats
return
except support.TestFailed as exc:
msg = f"test {test_name} failed"
if display_failure:
msg = f"{msg} -- {exc}"
print(msg, file=sys.stderr, flush=True)
result.state = State.FAILED
result.stats = exc.stats
return
except support.TestDidNotRun:
result.state = State.DID_NOT_RUN
return
except KeyboardInterrupt:
print()
result.state = State.INTERRUPTED
return
except:
if not ns.pgo:
msg = traceback.format_exc()
print(f"test {test_name} crashed -- {msg}",
file=sys.stderr, flush=True)
result.state = State.UNCAUGHT_EXC
return
if support.environment_altered:
result.set_env_changed()
# Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
if result.state is None:
result.state = State.PASSED
def cleanup_test_droppings(test_name: str, verbose: int) -> None:
# Try to clean up junk commonly left behind. While tests shouldn't leave
# any files or directories behind, when a test fails that can be tedious
# for it to arrange. The consequences can be especially nasty on Windows,
# since if a test leaves a file open, it cannot be deleted by name (while
# there's nothing we can do about that here either, we can display the
# name of the offending test, which is a real help).
for name in (os_helper.TESTFN,):
if not os.path.exists(name):
continue
if os.path.isdir(name):
import shutil
kind, nuker = "directory", shutil.rmtree
elif os.path.isfile(name):
kind, nuker = "file", os.unlink
else:
raise RuntimeError(f"os.path says {name!r} exists but is neither "
f"directory nor file")
if verbose:
print_warning(f"{test_name} left behind {kind} {name!r}")
support.environment_altered = True
try:
import stat
# fix possible permissions problems that might prevent cleanup
os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
nuker(name)
except Exception as exc:
print_warning(f"{test_name} left behind {kind} {name!r} "
f"and it couldn't be removed: {exc}")

View file

@ -1,564 +0,0 @@
import dataclasses
import faulthandler
import json
import os.path
import queue
import signal
import subprocess
import sys
import tempfile
import threading
import time
import traceback
from typing import NamedTuple, NoReturn, Literal, Any, TextIO
from test import support
from test.support import os_helper
from test.support import TestStats
from test.libregrtest.cmdline import Namespace
from test.libregrtest.main import Regrtest
from test.libregrtest.runtest import (
runtest, TestResult, State,
PROGRESS_MIN_TIME)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration, print_warning
if sys.platform == 'win32':
import locale
# Display the running tests if nothing happened last N seconds
PROGRESS_UPDATE = 30.0 # seconds
assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME
# Kill the main process after 5 minutes. It is supposed to write an update
# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest
# buildbot workers.
MAIN_PROCESS_TIMEOUT = 5 * 60.0
assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE
# Time to wait until a worker completes: should be immediate
JOIN_TIMEOUT = 30.0 # seconds
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
def must_stop(result: TestResult, ns: Namespace) -> bool:
if result.state == State.INTERRUPTED:
return True
if ns.failfast and result.is_failed(ns.fail_env_changed):
return True
return False
def parse_worker_args(worker_args) -> tuple[Namespace, str]:
ns_dict, test_name = json.loads(worker_args)
ns = Namespace(**ns_dict)
return (ns, test_name)
def run_test_in_subprocess(testname: str, ns: Namespace, tmp_dir: str, stdout_fh: TextIO) -> subprocess.Popen:
ns_dict = vars(ns)
worker_args = (ns_dict, testname)
worker_args = json.dumps(worker_args)
if ns.python is not None:
executable = ns.python
else:
executable = [sys.executable]
cmd = [*executable, *support.args_from_interpreter_flags(),
'-u', # Unbuffered stdout and stderr
'-m', 'test.regrtest',
'--worker-args', worker_args]
env = dict(os.environ)
if tmp_dir is not None:
env['TMPDIR'] = tmp_dir
env['TEMP'] = tmp_dir
env['TMP'] = tmp_dir
# Running the child from the same working directory as regrtest's original
# invocation ensures that TEMPDIR for the child is the same when
# sysconfig.is_python_build() is true. See issue 15300.
kw = dict(
env=env,
stdout=stdout_fh,
# bpo-45410: Write stderr into stdout to keep messages order
stderr=stdout_fh,
text=True,
close_fds=(os.name != 'nt'),
cwd=os_helper.SAVEDCWD,
)
if USE_PROCESS_GROUP:
kw['start_new_session'] = True
return subprocess.Popen(cmd, **kw)
def run_tests_worker(ns: Namespace, test_name: str) -> NoReturn:
setup_tests(ns)
result = runtest(ns, test_name)
print() # Force a newline (just in case)
# Serialize TestResult as dict in JSON
print(json.dumps(result, cls=EncodeTestResult), flush=True)
sys.exit(0)
# We do not use a generator so multiple threads can call next().
class MultiprocessIterator:
"""A thread-safe iterator over tests for multiprocess mode."""
def __init__(self, tests_iter):
self.lock = threading.Lock()
self.tests_iter = tests_iter
def __iter__(self):
return self
def __next__(self):
with self.lock:
if self.tests_iter is None:
raise StopIteration
return next(self.tests_iter)
def stop(self):
with self.lock:
self.tests_iter = None
class MultiprocessResult(NamedTuple):
result: TestResult
# bpo-45410: stderr is written into stdout to keep messages order
worker_stdout: str | None = None
err_msg: str | None = None
ExcStr = str
QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
class ExitThread(Exception):
pass
class TestWorkerProcess(threading.Thread):
def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
super().__init__()
self.worker_id = worker_id
self.pending = runner.pending
self.output = runner.output
self.ns = runner.ns
self.timeout = runner.worker_timeout
self.regrtest = runner.regrtest
self.current_test_name = None
self.start_time = None
self._popen = None
self._killed = False
self._stopped = False
def __repr__(self) -> str:
info = [f'TestWorkerProcess #{self.worker_id}']
if self.is_alive():
info.append("running")
else:
info.append('stopped')
test = self.current_test_name
if test:
info.append(f'test={test}')
popen = self._popen
if popen is not None:
dt = time.monotonic() - self.start_time
info.extend((f'pid={self._popen.pid}',
f'time={format_duration(dt)}'))
return '<%s>' % ' '.join(info)
def _kill(self) -> None:
popen = self._popen
if popen is None:
return
if self._killed:
return
self._killed = True
if USE_PROCESS_GROUP:
what = f"{self} process group"
else:
what = f"{self}"
print(f"Kill {what}", file=sys.stderr, flush=True)
try:
if USE_PROCESS_GROUP:
os.killpg(popen.pid, signal.SIGKILL)
else:
popen.kill()
except ProcessLookupError:
# popen.kill(): the process completed, the TestWorkerProcess thread
# read its exit status, but Popen.send_signal() read the returncode
# just before Popen.wait() set returncode.
pass
except OSError as exc:
print_warning(f"Failed to kill {what}: {exc!r}")
def stop(self) -> None:
# Method called from a different thread to stop this thread
self._stopped = True
self._kill()
def mp_result_error(
self,
test_result: TestResult,
stdout: str | None = None,
err_msg=None
) -> MultiprocessResult:
return MultiprocessResult(test_result, stdout, err_msg)
def _run_process(self, test_name: str, tmp_dir: str, stdout_fh: TextIO) -> int:
self.current_test_name = test_name
try:
popen = run_test_in_subprocess(test_name, self.ns, tmp_dir, stdout_fh)
self._killed = False
self._popen = popen
except:
self.current_test_name = None
raise
try:
if self._stopped:
# If kill() has been called before self._popen is set,
# self._popen is still running. Call again kill()
# to ensure that the process is killed.
self._kill()
raise ExitThread
try:
# gh-94026: stdout+stderr are written to tempfile
retcode = popen.wait(timeout=self.timeout)
assert retcode is not None
return retcode
except subprocess.TimeoutExpired:
if self._stopped:
# kill() has been called: communicate() fails on reading
# closed stdout
raise ExitThread
# On timeout, kill the process
self._kill()
# None means TIMEOUT for the caller
retcode = None
# bpo-38207: Don't attempt to call communicate() again: on it
# can hang until all child processes using stdout
# pipes completes.
except OSError:
if self._stopped:
# kill() has been called: communicate() fails
# on reading closed stdout
raise ExitThread
raise
except:
self._kill()
raise
finally:
self._wait_completed()
self._popen = None
self.current_test_name = None
def _runtest(self, test_name: str) -> MultiprocessResult:
if sys.platform == 'win32':
# gh-95027: When stdout is not a TTY, Python uses the ANSI code
# page for the sys.stdout encoding. If the main process runs in a
# terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding.
encoding = locale.getencoding()
else:
encoding = sys.stdout.encoding
# gh-94026: Write stdout+stderr to a tempfile as workaround for
# non-blocking pipes on Emscripten with NodeJS.
with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_fh:
# gh-93353: Check for leaked temporary files in the parent process,
# since the deletion of temporary files can happen late during
# Python finalization: too late for libregrtest.
if not support.is_wasi:
# Don't check for leaked temporary files and directories if Python is
# run on WASI. WASI don't pass environment variables like TMPDIR to
# worker processes.
tmp_dir = tempfile.mkdtemp(prefix="test_python_")
tmp_dir = os.path.abspath(tmp_dir)
try:
retcode = self._run_process(test_name, tmp_dir, stdout_fh)
finally:
tmp_files = os.listdir(tmp_dir)
os_helper.rmtree(tmp_dir)
else:
retcode = self._run_process(test_name, None, stdout_fh)
tmp_files = ()
stdout_fh.seek(0)
try:
stdout = stdout_fh.read().strip()
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Cannot read process stdout: {exc}"
result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
return self.mp_result_error(result, err_msg=err_msg)
if retcode is None:
result = TestResult(test_name, state=State.TIMEOUT)
return self.mp_result_error(result, stdout)
err_msg = None
if retcode != 0:
err_msg = "Exit code %s" % retcode
else:
stdout, _, worker_json = stdout.rpartition("\n")
stdout = stdout.rstrip()
if not worker_json:
err_msg = "Failed to parse worker stdout"
else:
try:
# deserialize run_tests_worker() output
result = json.loads(worker_json,
object_hook=decode_test_result)
except Exception as exc:
err_msg = "Failed to parse worker JSON: %s" % exc
if err_msg:
result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
return self.mp_result_error(result, stdout, err_msg)
if tmp_files:
msg = (f'\n\n'
f'Warning -- {test_name} leaked temporary files '
f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}')
stdout += msg
result.set_env_changed()
return MultiprocessResult(result, stdout)
def run(self) -> None:
while not self._stopped:
try:
try:
test_name = next(self.pending)
except StopIteration:
break
self.start_time = time.monotonic()
mp_result = self._runtest(test_name)
mp_result.result.duration = time.monotonic() - self.start_time
self.output.put((False, mp_result))
if must_stop(mp_result.result, self.ns):
break
except ExitThread:
break
except BaseException:
self.output.put((True, traceback.format_exc()))
break
def _wait_completed(self) -> None:
popen = self._popen
try:
popen.wait(JOIN_TIMEOUT)
except (subprocess.TimeoutExpired, OSError) as exc:
print_warning(f"Failed to wait for {self} completion "
f"(timeout={format_duration(JOIN_TIMEOUT)}): "
f"{exc!r}")
def wait_stopped(self, start_time: float) -> None:
# bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
# which killed the process. Sometimes, killing the process from the
# main thread does not interrupt popen.communicate() in
# TestWorkerProcess thread. This loop with a timeout is a workaround
# for that.
#
# Moreover, if this method fails to join the thread, it is likely
# that Python will hang at exit while calling threading._shutdown()
# which tries again to join the blocked thread. Regrtest.main()
# uses EXIT_TIMEOUT to workaround this second bug.
while True:
# Write a message every second
self.join(1.0)
if not self.is_alive():
break
dt = time.monotonic() - start_time
self.regrtest.log(f"Waiting for {self} thread "
f"for {format_duration(dt)}")
if dt > JOIN_TIMEOUT:
print_warning(f"Failed to join {self} in {format_duration(dt)}")
break
def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]:
running = []
for worker in workers:
current_test_name = worker.current_test_name
if not current_test_name:
continue
dt = time.monotonic() - worker.start_time
if dt >= PROGRESS_MIN_TIME:
text = '%s (%s)' % (current_test_name, format_duration(dt))
running.append(text)
return running
class MultiprocessTestRunner:
def __init__(self, regrtest: Regrtest) -> None:
self.regrtest = regrtest
self.log = self.regrtest.log
self.ns = regrtest.ns
self.output: queue.Queue[QueueOutput] = queue.Queue()
self.pending = MultiprocessIterator(self.regrtest.tests)
if self.ns.timeout is not None:
# Rely on faulthandler to kill a worker process. This timouet is
# when faulthandler fails to kill a worker process. Give a maximum
# of 5 minutes to faulthandler to kill the worker.
self.worker_timeout = min(self.ns.timeout * 1.5,
self.ns.timeout + 5 * 60)
else:
self.worker_timeout = None
self.workers = None
def start_workers(self) -> None:
self.workers = [TestWorkerProcess(index, self)
for index in range(1, self.ns.use_mp + 1)]
msg = f"Run tests in parallel using {len(self.workers)} child processes"
if self.ns.timeout:
msg += (" (timeout: %s, worker timeout: %s)"
% (format_duration(self.ns.timeout),
format_duration(self.worker_timeout)))
self.log(msg)
for worker in self.workers:
worker.start()
def stop_workers(self) -> None:
start_time = time.monotonic()
for worker in self.workers:
worker.stop()
for worker in self.workers:
worker.wait_stopped(start_time)
def _get_result(self) -> QueueOutput | None:
use_faulthandler = (self.ns.timeout is not None)
timeout = PROGRESS_UPDATE
# bpo-46205: check the status of workers every iteration to avoid
# waiting forever on an empty queue.
while any(worker.is_alive() for worker in self.workers):
if use_faulthandler:
faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
exit=True)
# wait for a thread
try:
return self.output.get(timeout=timeout)
except queue.Empty:
pass
# display progress
running = get_running(self.workers)
if running and not self.ns.pgo:
self.log('running: %s' % ', '.join(running))
# all worker threads are done: consume pending results
try:
return self.output.get(timeout=0)
except queue.Empty:
return None
def display_result(self, mp_result: MultiprocessResult) -> None:
result = mp_result.result
text = str(result)
if mp_result.err_msg:
# MULTIPROCESSING_ERROR
text += ' (%s)' % mp_result.err_msg
elif (result.duration >= PROGRESS_MIN_TIME and not self.ns.pgo):
text += ' (%s)' % format_duration(result.duration)
running = get_running(self.workers)
if running and not self.ns.pgo:
text += ' -- running: %s' % ', '.join(running)
self.regrtest.display_progress(self.test_index, text)
def _process_result(self, item: QueueOutput) -> bool:
"""Returns True if test runner must stop."""
if item[0]:
# Thread got an exception
format_exc = item[1]
print_warning(f"regrtest worker thread failed: {format_exc}")
result = TestResult("<regrtest worker>", state=State.MULTIPROCESSING_ERROR)
self.regrtest.accumulate_result(result)
return True
self.test_index += 1
mp_result = item[1]
self.regrtest.accumulate_result(mp_result.result)
self.display_result(mp_result)
if mp_result.worker_stdout:
print(mp_result.worker_stdout, flush=True)
if must_stop(mp_result.result, self.ns):
return True
return False
def run_tests(self) -> None:
self.start_workers()
self.test_index = 0
try:
while True:
item = self._get_result()
if item is None:
break
stop = self._process_result(item)
if stop:
break
except KeyboardInterrupt:
print()
self.regrtest.interrupted = True
finally:
if self.ns.timeout is not None:
faulthandler.cancel_dump_traceback_later()
# Always ensure that all worker processes are no longer
# worker when we exit this function
self.pending.stop()
self.stop_workers()
def run_tests_multiprocess(regrtest: Regrtest) -> None:
MultiprocessTestRunner(regrtest).run_tests()
class EncodeTestResult(json.JSONEncoder):
"""Encode a TestResult (sub)class object into a JSON dict."""
def default(self, o: Any) -> dict[str, Any]:
if isinstance(o, TestResult):
result = dataclasses.asdict(o)
result["__test_result__"] = o.__class__.__name__
return result
return super().default(o)
def decode_test_result(d: dict[str, Any]) -> TestResult | TestStats | dict[str, Any]:
"""Decode a TestResult (sub)class object from a JSON dict."""
if "__test_result__" not in d:
return d
d.pop('__test_result__')
if d['stats'] is not None:
d['stats'] = TestStats(**d['stats'])
return TestResult(**d)

View file

@ -0,0 +1,162 @@
import contextlib
import dataclasses
import json
import os
import subprocess
from typing import Any
from test import support
from .utils import (
StrPath, StrJSON, TestTuple, FilterTuple, FilterDict)
class JsonFileType:
UNIX_FD = "UNIX_FD"
WINDOWS_HANDLE = "WINDOWS_HANDLE"
STDOUT = "STDOUT"
@dataclasses.dataclass(slots=True, frozen=True)
class JsonFile:
# file type depends on file_type:
# - UNIX_FD: file descriptor (int)
# - WINDOWS_HANDLE: handle (int)
# - STDOUT: use process stdout (None)
file: int | None
file_type: str
def configure_subprocess(self, popen_kwargs: dict) -> None:
match self.file_type:
case JsonFileType.UNIX_FD:
# Unix file descriptor
popen_kwargs['pass_fds'] = [self.file]
case JsonFileType.WINDOWS_HANDLE:
# Windows handle
startupinfo = subprocess.STARTUPINFO()
startupinfo.lpAttributeList = {"handle_list": [self.file]}
popen_kwargs['startupinfo'] = startupinfo
@contextlib.contextmanager
def inherit_subprocess(self):
if self.file_type == JsonFileType.WINDOWS_HANDLE:
os.set_handle_inheritable(self.file, True)
try:
yield
finally:
os.set_handle_inheritable(self.file, False)
else:
yield
def open(self, mode='r', *, encoding):
if self.file_type == JsonFileType.STDOUT:
raise ValueError("for STDOUT file type, just use sys.stdout")
file = self.file
if self.file_type == JsonFileType.WINDOWS_HANDLE:
import msvcrt
# Create a file descriptor from the handle
file = msvcrt.open_osfhandle(file, os.O_WRONLY)
return open(file, mode, encoding=encoding)
@dataclasses.dataclass(slots=True, frozen=True)
class HuntRefleak:
warmups: int
runs: int
filename: StrPath
@dataclasses.dataclass(slots=True, frozen=True)
class RunTests:
tests: TestTuple
fail_fast: bool
fail_env_changed: bool
match_tests: FilterTuple | None
ignore_tests: FilterTuple | None
match_tests_dict: FilterDict | None
rerun: bool
forever: bool
pgo: bool
pgo_extended: bool
output_on_failure: bool
timeout: float | None
verbose: int
quiet: bool
hunt_refleak: HuntRefleak | None
test_dir: StrPath | None
use_junit: bool
memory_limit: str | None
gc_threshold: int | None
use_resources: tuple[str, ...]
python_cmd: tuple[str, ...] | None
randomize: bool
random_seed: int | None
json_file: JsonFile | None
def copy(self, **override):
state = dataclasses.asdict(self)
state.update(override)
return RunTests(**state)
def get_match_tests(self, test_name) -> FilterTuple | None:
if self.match_tests_dict is not None:
return self.match_tests_dict.get(test_name, None)
else:
return None
def get_jobs(self):
# Number of run_single_test() calls needed to run all tests.
# None means that there is not bound limit (--forever option).
if self.forever:
return None
return len(self.tests)
def iter_tests(self):
if self.forever:
while True:
yield from self.tests
else:
yield from self.tests
def as_json(self) -> StrJSON:
return json.dumps(self, cls=_EncodeRunTests)
@staticmethod
def from_json(worker_json: StrJSON) -> 'RunTests':
return json.loads(worker_json, object_hook=_decode_runtests)
def json_file_use_stdout(self) -> bool:
# Use STDOUT in two cases:
#
# - If --python command line option is used;
# - On Emscripten and WASI.
#
# On other platforms, UNIX_FD or WINDOWS_HANDLE can be used.
return (
bool(self.python_cmd)
or support.is_emscripten
or support.is_wasi
)
class _EncodeRunTests(json.JSONEncoder):
def default(self, o: Any) -> dict[str, Any]:
if isinstance(o, RunTests):
result = dataclasses.asdict(o)
result["__runtests__"] = True
return result
else:
return super().default(o)
def _decode_runtests(data: dict[str, Any]) -> RunTests | dict[str, Any]:
if "__runtests__" in data:
data.pop('__runtests__')
if data['hunt_refleak']:
data['hunt_refleak'] = HuntRefleak(**data['hunt_refleak'])
if data['json_file']:
data['json_file'] = JsonFile(**data['json_file'])
return RunTests(**data)
else:
return data

View file

@ -3,9 +3,11 @@ import locale
import os
import sys
import threading
from test import support
from test.support import os_helper
from test.libregrtest.utils import print_warning
from .utils import print_warning
class SkipTestEnvironment(Exception):
@ -34,7 +36,7 @@ class saved_test_environment:
items is also printed.
"""
def __init__(self, test_name, verbose=0, quiet=False, *, pgo=False):
def __init__(self, test_name, verbose, quiet, *, pgo):
self.test_name = test_name
self.verbose = verbose
self.quiet = quiet
@ -161,11 +163,11 @@ class saved_test_environment:
warnings.filters[:] = saved_filters[2]
def get_asyncore_socket_map(self):
asyncore = sys.modules.get('asyncore')
asyncore = sys.modules.get('test.support.asyncore')
# XXX Making a copy keeps objects alive until __exit__ gets called.
return asyncore and asyncore.socket_map.copy() or {}
def restore_asyncore_socket_map(self, saved_map):
asyncore = sys.modules.get('asyncore')
asyncore = sys.modules.get('test.support.asyncore')
if asyncore is not None:
asyncore.close_all(ignore_all=True)
asyncore.socket_map.update(saved_map)
@ -257,8 +259,10 @@ class saved_test_environment:
sysconfig._INSTALL_SCHEMES.update(saved[2])
def get_files(self):
# XXX: Maybe add an allow-list here?
return sorted(fn + ('/' if os.path.isdir(fn) else '')
for fn in os.listdir())
for fn in os.listdir()
if not fn.startswith(".hypothesis"))
def restore_files(self, saved_value):
fn = os_helper.TESTFN
if fn not in saved_value and (fn + '/') not in saved_value:

View file

@ -1,24 +1,32 @@
import atexit
import faulthandler
import gc
import os
import random
import signal
import sys
import unittest
from test import support
from test.support.os_helper import TESTFN_UNDECODABLE, FS_NONASCII
try:
import gc
except ImportError:
gc = None
from test.libregrtest.utils import (setup_unraisable_hook,
setup_threading_excepthook)
from .runtests import RunTests
from .utils import (
setup_unraisable_hook, setup_threading_excepthook, fix_umask,
adjust_rlimit_nofile)
UNICODE_GUARD_ENV = "PYTHONREGRTEST_UNICODE_GUARD"
def setup_tests(ns):
def setup_test_dir(testdir: str | None) -> None:
if testdir:
# Prepend test directory to sys.path, so runtest() will be able
# to locate tests
sys.path.insert(0, os.path.abspath(testdir))
def setup_process():
fix_umask()
try:
stderr_fd = sys.__stderr__.fileno()
except (ValueError, AttributeError):
@ -40,14 +48,9 @@ def setup_tests(ns):
for signum in signals:
faulthandler.register(signum, chain=True, file=stderr_fd)
_adjust_resource_limits()
replace_stdout()
support.record_original_stdout(sys.stdout)
adjust_rlimit_nofile()
if ns.testdir:
# Prepend test directory to sys.path, so runtest() will be able
# to locate tests
sys.path.insert(0, os.path.abspath(ns.testdir))
support.record_original_stdout(sys.stdout)
# Some times __path__ and __file__ are not absolute (e.g. while running from
# Lib/) and, if we change the CWD to run the tests in a temporary dir, some
@ -66,19 +69,6 @@ def setup_tests(ns):
if getattr(module, '__file__', None):
module.__file__ = os.path.abspath(module.__file__)
if ns.huntrleaks:
unittest.BaseTestSuite._cleanup = False
if ns.memlimit is not None:
support.set_memlimit(ns.memlimit)
if ns.threshold is not None:
gc.set_threshold(ns.threshold)
support.suppress_msvcrt_asserts(ns.verbose and ns.verbose >= 2)
support.use_resources = ns.use_resources
if hasattr(sys, 'addaudithook'):
# Add an auditing hook for all tests to ensure PySys_Audit is tested
def _test_audit_hook(name, args):
@ -88,7 +78,37 @@ def setup_tests(ns):
setup_unraisable_hook()
setup_threading_excepthook()
timeout = ns.timeout
# Ensure there's a non-ASCII character in env vars at all times to force
# tests consider this case. See BPO-44647 for details.
if TESTFN_UNDECODABLE and os.supports_bytes_environ:
os.environb.setdefault(UNICODE_GUARD_ENV.encode(), TESTFN_UNDECODABLE)
elif FS_NONASCII:
os.environ.setdefault(UNICODE_GUARD_ENV, FS_NONASCII)
def setup_tests(runtests: RunTests):
support.verbose = runtests.verbose
support.failfast = runtests.fail_fast
support.PGO = runtests.pgo
support.PGO_EXTENDED = runtests.pgo_extended
support.set_match_tests(runtests.match_tests, runtests.ignore_tests)
if runtests.use_junit:
support.junit_xml_list = []
from test.support.testresult import RegressionTestResult
RegressionTestResult.USE_XML = True
else:
support.junit_xml_list = None
if runtests.memory_limit is not None:
support.set_memlimit(runtests.memory_limit)
support.suppress_msvcrt_asserts(runtests.verbose >= 2)
support.use_resources = runtests.use_resources
timeout = runtests.timeout
if timeout is not None:
# For a slow buildbot worker, increase SHORT_TIMEOUT and LONG_TIMEOUT
support.LOOPBACK_TIMEOUT = max(support.LOOPBACK_TIMEOUT, timeout / 120)
@ -102,61 +122,10 @@ def setup_tests(ns):
support.SHORT_TIMEOUT = min(support.SHORT_TIMEOUT, timeout)
support.LONG_TIMEOUT = min(support.LONG_TIMEOUT, timeout)
if ns.xmlpath:
from test.support.testresult import RegressionTestResult
RegressionTestResult.USE_XML = True
if runtests.hunt_refleak:
unittest.BaseTestSuite._cleanup = False
# Ensure there's a non-ASCII character in env vars at all times to force
# tests consider this case. See BPO-44647 for details.
if TESTFN_UNDECODABLE and os.supports_bytes_environ:
os.environb.setdefault(UNICODE_GUARD_ENV.encode(), TESTFN_UNDECODABLE)
elif FS_NONASCII:
os.environ.setdefault(UNICODE_GUARD_ENV, FS_NONASCII)
if runtests.gc_threshold is not None:
gc.set_threshold(runtests.gc_threshold)
def replace_stdout():
"""Set stdout encoder error handler to backslashreplace (as stderr error
handler) to avoid UnicodeEncodeError when printing a traceback"""
stdout = sys.stdout
try:
fd = stdout.fileno()
except ValueError:
# On IDLE, sys.stdout has no file descriptor and is not a TextIOWrapper
# object. Leaving sys.stdout unchanged.
#
# Catch ValueError to catch io.UnsupportedOperation on TextIOBase
# and ValueError on a closed stream.
return
sys.stdout = open(fd, 'w',
encoding=stdout.encoding,
errors="backslashreplace",
closefd=False,
newline='\n')
def restore_stdout():
sys.stdout.close()
sys.stdout = stdout
atexit.register(restore_stdout)
def _adjust_resource_limits():
"""Adjust the system resource limits (ulimit) if needed."""
try:
import resource
from resource import RLIMIT_NOFILE, RLIM_INFINITY
except ImportError:
return
fd_limit, max_fds = resource.getrlimit(RLIMIT_NOFILE)
# On macOS the default fd limit is sometimes too low (256) for our
# test suite to succeed. Raise it to something more reasonable.
# 1024 is a common Linux default.
desired_fds = 1024
if fd_limit < desired_fds and fd_limit < max_fds:
new_fd_limit = min(desired_fds, max_fds)
try:
resource.setrlimit(RLIMIT_NOFILE, (new_fd_limit, max_fds))
print(f"Raised RLIMIT_NOFILE: {fd_limit} -> {new_fd_limit}")
except (ValueError, OSError) as err:
print(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to "
f"{new_fd_limit}: {err}.")
random.seed(runtests.random_seed)

View file

@ -0,0 +1,278 @@
import doctest
import faulthandler
import gc
import importlib
import io
import sys
import time
import traceback
import unittest
from test import support
from test.support import TestStats
from test.support import threading_helper
from .result import State, TestResult
from .runtests import RunTests
from .save_env import saved_test_environment
from .setup import setup_tests
from .utils import (
TestName,
clear_caches, remove_testfn, abs_module_name, print_warning)
# Minimum duration of a test to display its duration or to mention that
# the test is running in background
PROGRESS_MIN_TIME = 30.0 # seconds
def run_unittest(test_mod):
loader = unittest.TestLoader()
tests = loader.loadTestsFromModule(test_mod)
for error in loader.errors:
print(error, file=sys.stderr)
if loader.errors:
raise Exception("errors while loading tests")
return support.run_unittest(tests)
def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None:
# Run test_func(), collect statistics, and detect reference and memory
# leaks.
if runtests.hunt_refleak:
from .refleak import runtest_refleak
refleak, test_result = runtest_refleak(result.test_name, test_func,
runtests.hunt_refleak,
runtests.quiet)
else:
test_result = test_func()
refleak = False
if refleak:
result.state = State.REFLEAK
stats: TestStats | None
match test_result:
case TestStats():
stats = test_result
case unittest.TestResult():
stats = TestStats.from_unittest(test_result)
case doctest.TestResults():
stats = TestStats.from_doctest(test_result)
case None:
print_warning(f"{result.test_name} test runner returned None: {test_func}")
stats = None
case _:
print_warning(f"Unknown test result type: {type(test_result)}")
stats = None
result.stats = stats
# Storage of uncollectable GC objects (gc.garbage)
GC_GARBAGE = []
def _load_run_test(result: TestResult, runtests: RunTests) -> None:
# Load the test module and run the tests.
test_name = result.test_name
module_name = abs_module_name(test_name, runtests.test_dir)
# Remove the module from sys.module to reload it if it was already imported
sys.modules.pop(module_name, None)
test_mod = importlib.import_module(module_name)
if hasattr(test_mod, "test_main"):
# https://github.com/python/cpython/issues/89392
raise Exception(f"Module {test_name} defines test_main() which "
f"is no longer supported by regrtest")
def test_func():
return run_unittest(test_mod)
try:
regrtest_runner(result, test_func, runtests)
finally:
# First kill any dangling references to open files etc.
# This can also issue some ResourceWarnings which would otherwise get
# triggered during the following test run, and possibly produce
# failures.
support.gc_collect()
remove_testfn(test_name, runtests.verbose)
if gc.garbage:
support.environment_altered = True
print_warning(f"{test_name} created {len(gc.garbage)} "
f"uncollectable object(s)")
# move the uncollectable objects somewhere,
# so we don't see them again
GC_GARBAGE.extend(gc.garbage)
gc.garbage.clear()
support.reap_children()
def _runtest_env_changed_exc(result: TestResult, runtests: RunTests,
display_failure: bool = True) -> None:
# Handle exceptions, detect environment changes.
# Reset the environment_altered flag to detect if a test altered
# the environment
support.environment_altered = False
pgo = runtests.pgo
if pgo:
display_failure = False
quiet = runtests.quiet
test_name = result.test_name
try:
clear_caches()
support.gc_collect()
with saved_test_environment(test_name,
runtests.verbose, quiet, pgo=pgo):
_load_run_test(result, runtests)
except support.ResourceDenied as exc:
if not quiet and not pgo:
print(f"{test_name} skipped -- {exc}", flush=True)
result.state = State.RESOURCE_DENIED
return
except unittest.SkipTest as exc:
if not quiet and not pgo:
print(f"{test_name} skipped -- {exc}", flush=True)
result.state = State.SKIPPED
return
except support.TestFailedWithDetails as exc:
msg = f"test {test_name} failed"
if display_failure:
msg = f"{msg} -- {exc}"
print(msg, file=sys.stderr, flush=True)
result.state = State.FAILED
result.errors = exc.errors
result.failures = exc.failures
result.stats = exc.stats
return
except support.TestFailed as exc:
msg = f"test {test_name} failed"
if display_failure:
msg = f"{msg} -- {exc}"
print(msg, file=sys.stderr, flush=True)
result.state = State.FAILED
result.stats = exc.stats
return
except support.TestDidNotRun:
result.state = State.DID_NOT_RUN
return
except KeyboardInterrupt:
print()
result.state = State.INTERRUPTED
return
except:
if not pgo:
msg = traceback.format_exc()
print(f"test {test_name} crashed -- {msg}",
file=sys.stderr, flush=True)
result.state = State.UNCAUGHT_EXC
return
if support.environment_altered:
result.set_env_changed()
# Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
if result.state is None:
result.state = State.PASSED
def _runtest(result: TestResult, runtests: RunTests) -> None:
# Capture stdout and stderr, set faulthandler timeout,
# and create JUnit XML report.
verbose = runtests.verbose
output_on_failure = runtests.output_on_failure
timeout = runtests.timeout
use_timeout = (
timeout is not None and threading_helper.can_start_thread
)
if use_timeout:
faulthandler.dump_traceback_later(timeout, exit=True)
try:
setup_tests(runtests)
if output_on_failure:
support.verbose = True
stream = io.StringIO()
orig_stdout = sys.stdout
orig_stderr = sys.stderr
print_warning = support.print_warning
orig_print_warnings_stderr = print_warning.orig_stderr
output = None
try:
sys.stdout = stream
sys.stderr = stream
# print_warning() writes into the temporary stream to preserve
# messages order. If support.environment_altered becomes true,
# warnings will be written to sys.stderr below.
print_warning.orig_stderr = stream
_runtest_env_changed_exc(result, runtests, display_failure=False)
# Ignore output if the test passed successfully
if result.state != State.PASSED:
output = stream.getvalue()
finally:
sys.stdout = orig_stdout
sys.stderr = orig_stderr
print_warning.orig_stderr = orig_print_warnings_stderr
if output is not None:
sys.stderr.write(output)
sys.stderr.flush()
else:
# Tell tests to be moderately quiet
support.verbose = verbose
_runtest_env_changed_exc(result, runtests,
display_failure=not verbose)
xml_list = support.junit_xml_list
if xml_list:
import xml.etree.ElementTree as ET
result.xml_data = [ET.tostring(x).decode('us-ascii')
for x in xml_list]
finally:
if use_timeout:
faulthandler.cancel_dump_traceback_later()
support.junit_xml_list = None
def run_single_test(test_name: TestName, runtests: RunTests) -> TestResult:
"""Run a single test.
test_name -- the name of the test
Returns a TestResult.
If runtests.use_junit, xml_data is a list containing each generated
testsuite element.
"""
start_time = time.perf_counter()
result = TestResult(test_name)
pgo = runtests.pgo
try:
_runtest(result, runtests)
except:
if not pgo:
msg = traceback.format_exc()
print(f"test {test_name} crashed -- {msg}",
file=sys.stderr, flush=True)
result.state = State.UNCAUGHT_EXC
sys.stdout.flush()
sys.stderr.flush()
result.duration = time.perf_counter() - start_time
return result

View file

@ -1,9 +1,59 @@
import contextlib
import faulthandler
import locale
import math
import os.path
import platform
import random
import shlex
import signal
import subprocess
import sys
import sysconfig
import tempfile
import textwrap
from collections.abc import Callable
from test import support
from test.support import os_helper
from test.support import threading_helper
# All temporary files and temporary directories created by libregrtest should
# use TMP_PREFIX so cleanup_temp_dir() can remove them all.
TMP_PREFIX = 'test_python_'
WORK_DIR_PREFIX = TMP_PREFIX
WORKER_WORK_DIR_PREFIX = WORK_DIR_PREFIX + 'worker_'
# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
# Used to protect against threading._shutdown() hang.
# Must be smaller than buildbot "1200 seconds without output" limit.
EXIT_TIMEOUT = 120.0
ALL_RESOURCES = ('audio', 'curses', 'largefile', 'network',
'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui', 'walltime')
# Other resources excluded from --use=all:
#
# - extralagefile (ex: test_zipfile64): really too slow to be enabled
# "by default"
# - tzdata: while needed to validate fully test_datetime, it makes
# test_datetime too slow (15-20 min on some buildbots) and so is disabled by
# default (see bpo-30822).
RESOURCE_NAMES = ALL_RESOURCES + ('extralargefile', 'tzdata')
# Types for types hints
StrPath = str
TestName = str
StrJSON = str
TestTuple = tuple[TestName, ...]
TestList = list[TestName]
# --match and --ignore options: list of patterns
# ('*' joker character can be used)
FilterTuple = tuple[TestName, ...]
FilterDict = dict[TestName, FilterTuple]
def format_duration(seconds):
@ -31,7 +81,7 @@ def format_duration(seconds):
return ' '.join(parts)
def removepy(names):
def strip_py_suffix(names: list[str] | None) -> None:
if not names:
return
for idx, name in enumerate(names):
@ -40,11 +90,20 @@ def removepy(names):
names[idx] = basename
def plural(n, singular, plural=None):
if n == 1:
return singular
elif plural is not None:
return plural
else:
return singular + 's'
def count(n, word):
if n == 1:
return "%d %s" % (n, word)
return f"{n} {word}"
else:
return "%d %ss" % (n, word)
return f"{n} {word}s"
def printlist(x, width=70, indent=4, file=None):
@ -125,15 +184,6 @@ def clear_caches():
if stream is not None:
stream.flush()
# Clear assorted module caches.
# Don't worry about resetting the cache if the module is not loaded
try:
distutils_dir_util = sys.modules['distutils.dir_util']
except KeyError:
pass
else:
distutils_dir_util._path_created.clear()
try:
re = sys.modules['re']
except KeyError:
@ -212,6 +262,13 @@ def clear_caches():
for f in typing._cleanups:
f()
try:
fractions = sys.modules['fractions']
except KeyError:
pass
else:
fractions._hash_algorithm.cache_clear()
def get_build_info():
# Get most important configure and build options as a list of strings.
@ -292,3 +349,331 @@ def get_build_info():
build.append("dtrace")
return build
def get_temp_dir(tmp_dir: StrPath | None = None) -> StrPath:
if tmp_dir:
tmp_dir = os.path.expanduser(tmp_dir)
else:
# When tests are run from the Python build directory, it is best practice
# to keep the test files in a subfolder. This eases the cleanup of leftover
# files using the "make distclean" command.
if sysconfig.is_python_build():
if not support.is_wasi:
tmp_dir = sysconfig.get_config_var('abs_builddir')
if tmp_dir is None:
tmp_dir = sysconfig.get_config_var('abs_srcdir')
if not tmp_dir:
# gh-74470: On Windows, only srcdir is available. Using
# abs_builddir mostly matters on UNIX when building
# Python out of the source tree, especially when the
# source tree is read only.
tmp_dir = sysconfig.get_config_var('srcdir')
tmp_dir = os.path.join(tmp_dir, 'build')
else:
# WASI platform
tmp_dir = sysconfig.get_config_var('projectbase')
tmp_dir = os.path.join(tmp_dir, 'build')
# When get_temp_dir() is called in a worker process,
# get_temp_dir() path is different than in the parent process
# which is not a WASI process. So the parent does not create
# the same "tmp_dir" than the test worker process.
os.makedirs(tmp_dir, exist_ok=True)
else:
tmp_dir = tempfile.gettempdir()
return os.path.abspath(tmp_dir)
def fix_umask():
if support.is_emscripten:
# Emscripten has default umask 0o777, which breaks some tests.
# see https://github.com/emscripten-core/emscripten/issues/17269
old_mask = os.umask(0)
if old_mask == 0o777:
os.umask(0o027)
else:
os.umask(old_mask)
def get_work_dir(parent_dir: StrPath, worker: bool = False) -> StrPath:
# Define a writable temp dir that will be used as cwd while running
# the tests. The name of the dir includes the pid to allow parallel
# testing (see the -j option).
# Emscripten and WASI have stubbed getpid(), Emscripten has only
# milisecond clock resolution. Use randint() instead.
if support.is_emscripten or support.is_wasi:
nounce = random.randint(0, 1_000_000)
else:
nounce = os.getpid()
if worker:
work_dir = WORK_DIR_PREFIX + str(nounce)
else:
work_dir = WORKER_WORK_DIR_PREFIX + str(nounce)
work_dir += os_helper.FS_NONASCII
work_dir = os.path.join(parent_dir, work_dir)
return work_dir
@contextlib.contextmanager
def exit_timeout():
try:
yield
except SystemExit as exc:
# bpo-38203: Python can hang at exit in Py_Finalize(), especially
# on threading._shutdown() call: put a timeout
if threading_helper.can_start_thread:
faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
sys.exit(exc.code)
def remove_testfn(test_name: TestName, verbose: int) -> None:
# Try to clean up os_helper.TESTFN if left behind.
#
# While tests shouldn't leave any files or directories behind, when a test
# fails that can be tedious for it to arrange. The consequences can be
# especially nasty on Windows, since if a test leaves a file open, it
# cannot be deleted by name (while there's nothing we can do about that
# here either, we can display the name of the offending test, which is a
# real help).
name = os_helper.TESTFN
if not os.path.exists(name):
return
nuker: Callable[[str], None]
if os.path.isdir(name):
import shutil
kind, nuker = "directory", shutil.rmtree
elif os.path.isfile(name):
kind, nuker = "file", os.unlink
else:
raise RuntimeError(f"os.path says {name!r} exists but is neither "
f"directory nor file")
if verbose:
print_warning(f"{test_name} left behind {kind} {name!r}")
support.environment_altered = True
try:
import stat
# fix possible permissions problems that might prevent cleanup
os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
nuker(name)
except Exception as exc:
print_warning(f"{test_name} left behind {kind} {name!r} "
f"and it couldn't be removed: {exc}")
def abs_module_name(test_name: TestName, test_dir: StrPath | None) -> TestName:
if test_name.startswith('test.') or test_dir:
return test_name
else:
# Import it from the test package
return 'test.' + test_name
# gh-90681: When rerunning tests, we might need to rerun the whole
# class or module suite if some its life-cycle hooks fail.
# Test level hooks are not affected.
_TEST_LIFECYCLE_HOOKS = frozenset((
'setUpClass', 'tearDownClass',
'setUpModule', 'tearDownModule',
))
def normalize_test_name(test_full_name, *, is_error=False):
short_name = test_full_name.split(" ")[0]
if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
# if setUpModule() or tearDownModule() failed, don't filter
# tests with the test file name, don't use use filters.
return None
# This means that we have a failure in a life-cycle hook,
# we need to rerun the whole module or class suite.
# Basically the error looks like this:
# ERROR: setUpClass (test.test_reg_ex.RegTest)
# or
# ERROR: setUpModule (test.test_reg_ex)
# So, we need to parse the class / module name.
lpar = test_full_name.index('(')
rpar = test_full_name.index(')')
return test_full_name[lpar + 1: rpar].split('.')[-1]
return short_name
def adjust_rlimit_nofile():
"""
On macOS the default fd limit (RLIMIT_NOFILE) is sometimes too low (256)
for our test suite to succeed. Raise it to something more reasonable. 1024
is a common Linux default.
"""
try:
import resource
except ImportError:
return
fd_limit, max_fds = resource.getrlimit(resource.RLIMIT_NOFILE)
desired_fds = 1024
if fd_limit < desired_fds and fd_limit < max_fds:
new_fd_limit = min(desired_fds, max_fds)
try:
resource.setrlimit(resource.RLIMIT_NOFILE,
(new_fd_limit, max_fds))
print(f"Raised RLIMIT_NOFILE: {fd_limit} -> {new_fd_limit}")
except (ValueError, OSError) as err:
print_warning(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to "
f"{new_fd_limit}: {err}.")
def get_host_runner():
if (hostrunner := os.environ.get("_PYTHON_HOSTRUNNER")) is None:
hostrunner = sysconfig.get_config_var("HOSTRUNNER")
return hostrunner
def is_cross_compiled():
return ('_PYTHON_HOST_PLATFORM' in os.environ)
def format_resources(use_resources: tuple[str, ...]):
use_resources = set(use_resources)
all_resources = set(ALL_RESOURCES)
# Express resources relative to "all"
relative_all = ['all']
for name in sorted(all_resources - use_resources):
relative_all.append(f'-{name}')
for name in sorted(use_resources - all_resources):
relative_all.append(f'{name}')
all_text = ','.join(relative_all)
all_text = f"resources: {all_text}"
# List of enabled resources
text = ','.join(sorted(use_resources))
text = f"resources ({len(use_resources)}): {text}"
# Pick the shortest string (prefer relative to all if lengths are equal)
if len(all_text) <= len(text):
return all_text
else:
return text
def process_cpu_count():
if hasattr(os, 'sched_getaffinity'):
return len(os.sched_getaffinity(0))
else:
return os.cpu_count()
def display_header(use_resources: tuple[str, ...],
python_cmd: tuple[str, ...] | None):
# Print basic platform information
print("==", platform.python_implementation(), *sys.version.split())
print("==", platform.platform(aliased=True),
"%s-endian" % sys.byteorder)
print("== Python build:", ' '.join(get_build_info()))
print("== cwd:", os.getcwd())
cpu_count = os.cpu_count()
if cpu_count:
affinity = process_cpu_count()
if affinity and affinity != cpu_count:
cpu_count = f"{affinity} (process) / {cpu_count} (system)"
print("== CPU count:", cpu_count)
print("== encodings: locale=%s FS=%s"
% (locale.getencoding(), sys.getfilesystemencoding()))
if use_resources:
text = format_resources(use_resources)
print(f"== {text}")
else:
print("== resources: all test resources are disabled, "
"use -u option to unskip tests")
cross_compile = is_cross_compiled()
if cross_compile:
print("== cross compiled: Yes")
if python_cmd:
cmd = shlex.join(python_cmd)
print(f"== host python: {cmd}")
get_cmd = [*python_cmd, '-m', 'platform']
proc = subprocess.run(
get_cmd,
stdout=subprocess.PIPE,
text=True,
cwd=os_helper.SAVEDCWD)
stdout = proc.stdout.replace('\n', ' ').strip()
if stdout:
print(f"== host platform: {stdout}")
elif proc.returncode:
print(f"== host platform: <command failed with exit code {proc.returncode}>")
else:
hostrunner = get_host_runner()
if hostrunner:
print(f"== host runner: {hostrunner}")
# This makes it easier to remember what to set in your local
# environment when trying to reproduce a sanitizer failure.
asan = support.check_sanitizer(address=True)
msan = support.check_sanitizer(memory=True)
ubsan = support.check_sanitizer(ub=True)
sanitizers = []
if asan:
sanitizers.append("address")
if msan:
sanitizers.append("memory")
if ubsan:
sanitizers.append("undefined behavior")
if sanitizers:
print(f"== sanitizers: {', '.join(sanitizers)}")
for sanitizer, env_var in (
(asan, "ASAN_OPTIONS"),
(msan, "MSAN_OPTIONS"),
(ubsan, "UBSAN_OPTIONS"),
):
options= os.environ.get(env_var)
if sanitizer and options is not None:
print(f"== {env_var}={options!r}")
print(flush=True)
def cleanup_temp_dir(tmp_dir: StrPath):
import glob
path = os.path.join(glob.escape(tmp_dir), TMP_PREFIX + '*')
print("Cleanup %s directory" % tmp_dir)
for name in glob.glob(path):
if os.path.isdir(name):
print("Remove directory: %s" % name)
os_helper.rmtree(name)
else:
print("Remove file: %s" % name)
os_helper.unlink(name)
WINDOWS_STATUS = {
0xC0000005: "STATUS_ACCESS_VIOLATION",
0xC00000FD: "STATUS_STACK_OVERFLOW",
0xC000013A: "STATUS_CONTROL_C_EXIT",
}
def get_signal_name(exitcode):
if exitcode < 0:
signum = -exitcode
try:
return signal.Signals(signum).name
except ValueError:
pass
try:
return WINDOWS_STATUS[exitcode]
except KeyError:
pass
return None

View file

@ -0,0 +1,116 @@
import subprocess
import sys
import os
from typing import Any, NoReturn
from test import support
from test.support import os_helper
from .setup import setup_process, setup_test_dir
from .runtests import RunTests, JsonFile, JsonFileType
from .single import run_single_test
from .utils import (
StrPath, StrJSON, FilterTuple,
get_temp_dir, get_work_dir, exit_timeout)
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
def create_worker_process(runtests: RunTests, output_fd: int,
tmp_dir: StrPath | None = None) -> subprocess.Popen:
python_cmd = runtests.python_cmd
worker_json = runtests.as_json()
python_opts = support.args_from_interpreter_flags()
if python_cmd is not None:
executable = python_cmd
# Remove -E option, since --python=COMMAND can set PYTHON environment
# variables, such as PYTHONPATH, in the worker process.
python_opts = [opt for opt in python_opts if opt != "-E"]
else:
executable = (sys.executable,)
cmd = [*executable, *python_opts,
'-u', # Unbuffered stdout and stderr
'-m', 'test.libregrtest.worker',
worker_json]
env = dict(os.environ)
if tmp_dir is not None:
env['TMPDIR'] = tmp_dir
env['TEMP'] = tmp_dir
env['TMP'] = tmp_dir
# Running the child from the same working directory as regrtest's original
# invocation ensures that TEMPDIR for the child is the same when
# sysconfig.is_python_build() is true. See issue 15300.
#
# Emscripten and WASI Python must start in the Python source code directory
# to get 'python.js' or 'python.wasm' file. Then worker_process() changes
# to a temporary directory created to run tests.
work_dir = os_helper.SAVEDCWD
kwargs: dict[str, Any] = dict(
env=env,
stdout=output_fd,
# bpo-45410: Write stderr into stdout to keep messages order
stderr=output_fd,
text=True,
close_fds=True,
cwd=work_dir,
)
if USE_PROCESS_GROUP:
kwargs['start_new_session'] = True
# Pass json_file to the worker process
json_file = runtests.json_file
json_file.configure_subprocess(kwargs)
with json_file.inherit_subprocess():
return subprocess.Popen(cmd, **kwargs)
def worker_process(worker_json: StrJSON) -> NoReturn:
runtests = RunTests.from_json(worker_json)
test_name = runtests.tests[0]
match_tests: FilterTuple | None = runtests.match_tests
json_file: JsonFile = runtests.json_file
setup_test_dir(runtests.test_dir)
setup_process()
if runtests.rerun:
if match_tests:
matching = "matching: " + ", ".join(match_tests)
print(f"Re-running {test_name} in verbose mode ({matching})", flush=True)
else:
print(f"Re-running {test_name} in verbose mode", flush=True)
result = run_single_test(test_name, runtests)
if json_file.file_type == JsonFileType.STDOUT:
print()
result.write_json_into(sys.stdout)
else:
with json_file.open('w', encoding='utf-8') as json_fp:
result.write_json_into(json_fp)
sys.exit(0)
def main():
if len(sys.argv) != 2:
print("usage: python -m test.libregrtest.worker JSON")
sys.exit(1)
worker_json = sys.argv[1]
tmp_dir = get_temp_dir()
work_dir = get_work_dir(tmp_dir, worker=True)
with exit_timeout():
with os_helper.temp_cwd(work_dir, quiet=True):
worker_process(worker_json)
if __name__ == "__main__":
main()

View file

@ -1,18 +1,13 @@
"""
Collect various information about Python to help debugging test failures.
"""
from __future__ import print_function
import errno
import re
import sys
import traceback
import unittest
import warnings
MS_WINDOWS = (sys.platform == 'win32')
def normalize_text(text):
if text is None:
return None
@ -244,6 +239,7 @@ def collect_os(info_add):
'getresgid',
'getresuid',
'getuid',
'process_cpu_count',
'uname',
):
call_func(info_add, 'os.%s' % func, os, func)
@ -273,6 +269,7 @@ def collect_os(info_add):
"ARCHFLAGS",
"ARFLAGS",
"AUDIODEV",
"BUILDPYTHON",
"CC",
"CFLAGS",
"COLUMNS",
@ -317,7 +314,6 @@ def collect_os(info_add):
"TEMP",
"TERM",
"TILE_LIBRARY",
"TIX_LIBRARY",
"TMP",
"TMPDIR",
"TRAVIS",
@ -326,6 +322,7 @@ def collect_os(info_add):
"VIRTUAL_ENV",
"WAYLAND_DISPLAY",
"WINDIR",
"_PYTHON_HOSTRUNNER",
"_PYTHON_HOST_PLATFORM",
"_PYTHON_PROJECT_BASE",
"_PYTHON_SYSCONFIGDATA_NAME",
@ -341,7 +338,8 @@ def collect_os(info_add):
for name, value in os.environ.items():
uname = name.upper()
if (uname in ENV_VARS
# Copy PYTHON* and LC_* variables
# Copy PYTHON* variables like PYTHONPATH
# Copy LC_* variables like LC_ALL
or uname.startswith(("PYTHON", "LC_"))
# Visual Studio: VS140COMNTOOLS
or (uname.startswith("VS") and uname.endswith("COMNTOOLS"))):
@ -494,13 +492,10 @@ def collect_datetime(info_add):
def collect_sysconfig(info_add):
# On Windows, sysconfig is not reliable to get macros used
# to build Python
if MS_WINDOWS:
return
import sysconfig
info_add('sysconfig.is_python_build', sysconfig.is_python_build())
for name in (
'ABIFLAGS',
'ANDROID_API_LEVEL',
@ -509,6 +504,7 @@ def collect_sysconfig(info_add):
'CFLAGS',
'CFLAGSFORSHARED',
'CONFIG_ARGS',
'HOSTRUNNER',
'HOST_GNU_TYPE',
'MACHDEP',
'MULTIARCH',
@ -634,7 +630,7 @@ def collect_sqlite(info_add):
except ImportError:
return
attributes = ('version', 'sqlite_version')
attributes = ('sqlite_version',)
copy_attributes(info_add, sqlite3, 'sqlite3.%s', attributes)
@ -674,7 +670,29 @@ def collect_testcapi(info_add):
except ImportError:
return
call_func(info_add, 'pymem.allocator', _testcapi, 'pymem_getallocatorsname')
for name in (
'LONG_MAX', # always 32-bit on Windows, 64-bit on 64-bit Unix
'PY_SSIZE_T_MAX',
'Py_C_RECURSION_LIMIT',
'SIZEOF_TIME_T', # 32-bit or 64-bit depending on the platform
'SIZEOF_WCHAR_T', # 16-bit or 32-bit depending on the platform
):
copy_attr(info_add, f'_testcapi.{name}', _testcapi, name)
def collect_testinternalcapi(info_add):
try:
import _testinternalcapi
except ImportError:
return
call_func(info_add, 'pymem.allocator', _testinternalcapi, 'pymem_getallocatorsname')
for name in (
'SIZEOF_PYGC_HEAD',
'SIZEOF_PYOBJECT',
):
copy_attr(info_add, f'_testinternalcapi.{name}', _testinternalcapi, name)
def collect_resource(info_add):
@ -693,6 +711,7 @@ def collect_resource(info_add):
def collect_test_socket(info_add):
import unittest
try:
from test import test_socket
except (ImportError, unittest.SkipTest):
@ -704,15 +723,12 @@ def collect_test_socket(info_add):
copy_attributes(info_add, test_socket, 'test_socket.%s', attributes)
def collect_test_support(info_add):
def collect_support(info_add):
try:
from test import support
except ImportError:
return
attributes = ('IPV6_ENABLED',)
copy_attributes(info_add, support, 'test_support.%s', attributes)
attributes = (
'MS_WINDOWS',
'has_fork_support',
@ -726,17 +742,64 @@ def collect_test_support(info_add):
)
copy_attributes(info_add, support, 'support.%s', attributes)
call_func(info_add, 'test_support._is_gui_available', support, '_is_gui_available')
call_func(info_add, 'test_support.python_is_optimized', support, 'python_is_optimized')
call_func(info_add, 'support._is_gui_available', support, '_is_gui_available')
call_func(info_add, 'support.python_is_optimized', support, 'python_is_optimized')
info_add('test_support.check_sanitizer(address=True)',
info_add('support.check_sanitizer(address=True)',
support.check_sanitizer(address=True))
info_add('test_support.check_sanitizer(memory=True)',
info_add('support.check_sanitizer(memory=True)',
support.check_sanitizer(memory=True))
info_add('test_support.check_sanitizer(ub=True)',
info_add('support.check_sanitizer(ub=True)',
support.check_sanitizer(ub=True))
def collect_support_os_helper(info_add):
try:
from test.support import os_helper
except ImportError:
return
for name in (
'can_symlink',
'can_xattr',
'can_chmod',
'can_dac_override',
):
func = getattr(os_helper, name)
info_add(f'support_os_helper.{name}', func())
def collect_support_socket_helper(info_add):
try:
from test.support import socket_helper
except ImportError:
return
attributes = (
'IPV6_ENABLED',
'has_gethostname',
)
copy_attributes(info_add, socket_helper, 'support_socket_helper.%s', attributes)
for name in (
'tcp_blackhole',
):
func = getattr(socket_helper, name)
info_add(f'support_socket_helper.{name}', func())
def collect_support_threading_helper(info_add):
try:
from test.support import threading_helper
except ImportError:
return
attributes = (
'can_start_thread',
)
copy_attributes(info_add, threading_helper, 'support_threading_helper.%s', attributes)
def collect_cc(info_add):
import subprocess
import sysconfig
@ -891,6 +954,12 @@ def collect_fips(info_add):
pass
def collect_tempfile(info_add):
import tempfile
info_add('tempfile.gettempdir', tempfile.gettempdir())
def collect_libregrtest_utils(info_add):
try:
from test.libregrtest import utils
@ -933,6 +1002,8 @@ def collect_info(info):
collect_sys,
collect_sysconfig,
collect_testcapi,
collect_testinternalcapi,
collect_tempfile,
collect_time,
collect_tkinter,
collect_windows,
@ -941,7 +1012,10 @@ def collect_info(info):
# Collecting from tests should be last as they have side effects.
collect_test_socket,
collect_test_support,
collect_support,
collect_support_os_helper,
collect_support_socket_helper,
collect_support_threading_helper,
):
try:
collect_func(info_add)

View file

@ -8,7 +8,7 @@ Run this script with -h or --help for documentation.
import os
import sys
from test.libregrtest import main
from test.libregrtest.main import main
# Alias for backward compatibility (just in case)

View file

@ -431,6 +431,14 @@ def skip_if_sanitizer(reason=None, *, address=False, memory=False, ub=False):
HAVE_ASAN_FORK_BUG = check_sanitizer(address=True)
def set_sanitizer_env_var(env, option):
for name in ('ASAN_OPTIONS', 'MSAN_OPTIONS', 'UBSAN_OPTIONS'):
if name in env:
env[name] += f':{option}'
else:
env[name] = option
def system_must_validate_cert(f):
"""Skip the test on TLS certificate validation failures."""
@functools.wraps(f)
@ -892,27 +900,31 @@ _4G = 4 * _1G
MAX_Py_ssize_t = sys.maxsize
def set_memlimit(limit):
global max_memuse
global real_max_memuse
def _parse_memlimit(limit: str) -> int:
sizes = {
'k': 1024,
'm': _1M,
'g': _1G,
't': 1024*_1G,
}
m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
m = re.match(r'(\d+(?:\.\d+)?) (K|M|G|T)b?$', limit,
re.IGNORECASE | re.VERBOSE)
if m is None:
raise ValueError('Invalid memory limit %r' % (limit,))
memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
real_max_memuse = memlimit
if memlimit > MAX_Py_ssize_t:
memlimit = MAX_Py_ssize_t
raise ValueError(f'Invalid memory limit: {limit!r}')
return int(float(m.group(1)) * sizes[m.group(2).lower()])
def set_memlimit(limit: str) -> None:
global max_memuse
global real_max_memuse
memlimit = _parse_memlimit(limit)
if memlimit < _2G - 1:
raise ValueError('Memory limit %r too low to be useful' % (limit,))
raise ValueError('Memory limit {limit!r} too low to be useful')
real_max_memuse = memlimit
memlimit = min(memlimit, MAX_Py_ssize_t)
max_memuse = memlimit
class _MemoryWatchdog:
"""An object which periodically watches the process' memory consumption
and prints it out.
@ -1187,7 +1199,6 @@ def _is_full_match_test(pattern):
def set_match_tests(accept_patterns=None, ignore_patterns=None):
global _match_test_func, _accept_test_patterns, _ignore_test_patterns
if accept_patterns is None:
accept_patterns = ()
if ignore_patterns is None:
@ -2139,17 +2150,14 @@ def wait_process(pid, *, exitcode, timeout=None):
if timeout is None:
timeout = LONG_TIMEOUT
t0 = time.monotonic()
sleep = 0.001
max_sleep = 0.1
while True:
start_time = time.monotonic()
for _ in sleeping_retry(timeout, error=False):
pid2, status = os.waitpid(pid, os.WNOHANG)
if pid2 != 0:
break
# process is still running
dt = time.monotonic() - t0
if dt > timeout:
# rety: the process is still running
else:
try:
os.kill(pid, signal.SIGKILL)
os.waitpid(pid, 0)
@ -2157,13 +2165,11 @@ def wait_process(pid, *, exitcode, timeout=None):
# Ignore errors like ChildProcessError or PermissionError
pass
dt = time.monotonic() - start_time
raise AssertionError(f"process {pid} is still running "
f"after {dt:.1f} seconds")
sleep = min(sleep * 2, max_sleep)
time.sleep(sleep)
else:
# Windows implementation
# Windows implementation: don't support timeout :-(
pid2, status = os.waitpid(pid, 0)
exitcode2 = os.waitstatus_to_exitcode(status)
@ -2327,6 +2333,11 @@ def requires_venv_with_pip():
return unittest.skipUnless(ctypes, 'venv: pip requires ctypes')
# True if Python is built with the Py_DEBUG macro defined: if
# Python is built in debug mode (./configure --with-pydebug).
Py_DEBUG = hasattr(sys, 'gettotalrefcount')
def busy_retry(timeout, err_msg=None, /, *, error=True):
"""
Run the loop body until "break" stops the loop.

View file

@ -567,7 +567,7 @@ def fs_is_case_insensitive(directory):
class FakePath:
"""Simple implementing of the path protocol.
"""Simple implementation of the path protocol.
"""
def __init__(self, path):
self.path = path

View file

@ -8,6 +8,7 @@ import sys
import time
import traceback
import unittest
from test import support
class RegressionTestResult(unittest.TextTestResult):
USE_XML = False
@ -109,6 +110,8 @@ class RegressionTestResult(unittest.TextTestResult):
def addFailure(self, test, err):
self._add_result(test, True, failure=self.__makeErrorDict(*err))
super().addFailure(test, err)
if support.failfast:
self.stop()
def addSkip(self, test, reason):
self._add_result(test, skipped=reason)

View file

@ -88,19 +88,17 @@ def wait_threads_exit(timeout=None):
yield
finally:
start_time = time.monotonic()
deadline = start_time + timeout
while True:
for _ in support.sleeping_retry(timeout, error=False):
support.gc_collect()
count = _thread._count()
if count <= old_count:
break
if time.monotonic() > deadline:
else:
dt = time.monotonic() - start_time
msg = (f"wait_threads() failed to cleanup {count - old_count} "
f"threads after {dt:.1f} seconds "
f"(count: {count}, old count: {old_count})")
raise AssertionError(msg)
time.sleep(0.010)
support.gc_collect()
def join_thread(thread, timeout=None):

View file

@ -8,7 +8,6 @@ import subprocess
import sys
from test import support
from test.support import os_helper, script_helper, is_android, MS_WINDOWS
from test.support import skip_if_sanitizer
import tempfile
import unittest
from textwrap import dedent
@ -34,7 +33,7 @@ def expected_traceback(lineno1, lineno2, header, min_count=1):
return '^' + regex + '$'
def skip_segfault_on_android(test):
# Issue #32138: Raising SIGSEGV on Android may not cause a crash.
# gh-76319: Raising SIGSEGV on Android may not cause a crash.
return unittest.skipIf(is_android,
'raising SIGSEGV on Android is unreliable')(test)
@ -62,8 +61,16 @@ class FaultHandlerTests(unittest.TestCase):
pass_fds = []
if fd is not None:
pass_fds.append(fd)
env = dict(os.environ)
# Sanitizers must not handle SIGSEGV (ex: for test_enable_fd())
option = 'handle_segv=0'
support.set_sanitizer_env_var(env, option)
with support.SuppressCrashReport():
process = script_helper.spawn_python('-c', code, pass_fds=pass_fds)
process = script_helper.spawn_python('-c', code,
pass_fds=pass_fds,
env=env)
with process:
output, stderr = process.communicate()
exitcode = process.wait()
@ -302,8 +309,6 @@ class FaultHandlerTests(unittest.TestCase):
3,
'Segmentation fault')
@skip_if_sanitizer(memory=True, ub=True, reason="sanitizer "
"builds change crashing process output.")
@skip_segfault_on_android
def test_enable_file(self):
with temporary_filename() as filename:
@ -319,8 +324,6 @@ class FaultHandlerTests(unittest.TestCase):
@unittest.skipIf(sys.platform == "win32",
"subprocess doesn't support pass_fds on Windows")
@skip_if_sanitizer(memory=True, ub=True, reason="sanitizer "
"builds change crashing process output.")
@skip_segfault_on_android
def test_enable_fd(self):
with tempfile.TemporaryFile('wb+') as fp:

File diff suppressed because it is too large Load diff

View file

@ -31,7 +31,7 @@ class TestSupport(unittest.TestCase):
"test.support.warnings_helper", like=".*used in test_support.*"
)
cls._test_support_token = support.ignore_deprecations_from(
"test.test_support", like=".*You should NOT be seeing this.*"
__name__, like=".*You should NOT be seeing this.*"
)
assert len(warnings.filters) == orig_filter_len + 2
@ -775,7 +775,45 @@ class TestSupport(unittest.TestCase):
else:
self.fail("RecursionError was not raised")
#self.assertEqual(available, 2)
def test_parse_memlimit(self):
parse = support._parse_memlimit
KiB = 1024
MiB = KiB * 1024
GiB = MiB * 1024
TiB = GiB * 1024
self.assertEqual(parse('0k'), 0)
self.assertEqual(parse('3k'), 3 * KiB)
self.assertEqual(parse('2.4m'), int(2.4 * MiB))
self.assertEqual(parse('4g'), int(4 * GiB))
self.assertEqual(parse('1t'), TiB)
for limit in ('', '3', '3.5.10k', '10x'):
with self.subTest(limit=limit):
with self.assertRaises(ValueError):
parse(limit)
def test_set_memlimit(self):
_4GiB = 4 * 1024 ** 3
TiB = 1024 ** 4
old_max_memuse = support.max_memuse
old_real_max_memuse = support.real_max_memuse
try:
if sys.maxsize > 2**32:
support.set_memlimit('4g')
self.assertEqual(support.max_memuse, _4GiB)
self.assertEqual(support.real_max_memuse, _4GiB)
big = 2**100 // TiB
support.set_memlimit(f'{big}t')
self.assertEqual(support.max_memuse, sys.maxsize)
self.assertEqual(support.real_max_memuse, big * TiB)
else:
support.set_memlimit('4g')
self.assertEqual(support.max_memuse, sys.maxsize)
self.assertEqual(support.real_max_memuse, _4GiB)
finally:
support.max_memuse = old_max_memuse
support.real_max_memuse = old_real_max_memuse
def test_copy_python_src_ignore(self):
# Get source directory
@ -824,7 +862,6 @@ class TestSupport(unittest.TestCase):
# EnvironmentVarGuard
# transient_internet
# run_with_locale
# set_memlimit
# bigmemtest
# precisionbigmemtest
# bigaddrspacetest