mirror of
https://github.com/python/cpython.git
synced 2025-08-09 19:38:42 +00:00
473 lines
16 KiB
Python
473 lines
16 KiB
Python
import dataclasses
|
|
import doctest
|
|
import faulthandler
|
|
import functools
|
|
import gc
|
|
import importlib
|
|
import io
|
|
import os
|
|
import sys
|
|
import time
|
|
import traceback
|
|
import unittest
|
|
|
|
from test import support
|
|
from test.support import TestStats
|
|
from test.support import os_helper
|
|
from test.support import threading_helper
|
|
from test.libregrtest.cmdline import Namespace
|
|
from test.libregrtest.save_env import saved_test_environment
|
|
from test.libregrtest.utils import clear_caches, format_duration, print_warning
|
|
|
|
|
|
# Avoid enum.Enum to reduce the number of imports when tests are run
|
|
class State:
|
|
PASSED = "PASSED"
|
|
FAILED = "FAILED"
|
|
SKIPPED = "SKIPPED"
|
|
UNCAUGHT_EXC = "UNCAUGHT_EXC"
|
|
REFLEAK = "REFLEAK"
|
|
ENV_CHANGED = "ENV_CHANGED"
|
|
RESOURCE_DENIED = "RESOURCE_DENIED"
|
|
INTERRUPTED = "INTERRUPTED"
|
|
MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
|
|
DID_NOT_RUN = "DID_NOT_RUN"
|
|
TIMEOUT = "TIMEOUT"
|
|
|
|
@staticmethod
|
|
def is_failed(state):
|
|
return state in {
|
|
State.FAILED,
|
|
State.UNCAUGHT_EXC,
|
|
State.REFLEAK,
|
|
State.MULTIPROCESSING_ERROR,
|
|
State.TIMEOUT}
|
|
|
|
@staticmethod
|
|
def has_meaningful_duration(state):
|
|
# Consider that the duration is meaningless for these cases.
|
|
# For example, if a whole test file is skipped, its duration
|
|
# is unlikely to be the duration of executing its tests,
|
|
# but just the duration to execute code which skips the test.
|
|
return state not in {
|
|
State.SKIPPED,
|
|
State.RESOURCE_DENIED,
|
|
State.INTERRUPTED,
|
|
State.MULTIPROCESSING_ERROR,
|
|
State.DID_NOT_RUN}
|
|
|
|
|
|
@dataclasses.dataclass(slots=True)
|
|
class TestResult:
|
|
test_name: str
|
|
state: str | None = None
|
|
# Test duration in seconds
|
|
duration: float | None = None
|
|
xml_data: list[str] | None = None
|
|
stats: TestStats | None = None
|
|
|
|
# errors and failures copied from support.TestFailedWithDetails
|
|
errors: list[tuple[str, str]] | None = None
|
|
failures: list[tuple[str, str]] | None = None
|
|
|
|
def is_failed(self, fail_env_changed: bool) -> bool:
|
|
if self.state == State.ENV_CHANGED:
|
|
return fail_env_changed
|
|
return State.is_failed(self.state)
|
|
|
|
def _format_failed(self):
|
|
if self.errors and self.failures:
|
|
le = len(self.errors)
|
|
lf = len(self.failures)
|
|
error_s = "error" + ("s" if le > 1 else "")
|
|
failure_s = "failure" + ("s" if lf > 1 else "")
|
|
return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
|
|
|
|
if self.errors:
|
|
le = len(self.errors)
|
|
error_s = "error" + ("s" if le > 1 else "")
|
|
return f"{self.test_name} failed ({le} {error_s})"
|
|
|
|
if self.failures:
|
|
lf = len(self.failures)
|
|
failure_s = "failure" + ("s" if lf > 1 else "")
|
|
return f"{self.test_name} failed ({lf} {failure_s})"
|
|
|
|
return f"{self.test_name} failed"
|
|
|
|
def __str__(self) -> str:
|
|
match self.state:
|
|
case State.PASSED:
|
|
return f"{self.test_name} passed"
|
|
case State.FAILED:
|
|
return self._format_failed()
|
|
case State.SKIPPED:
|
|
return f"{self.test_name} skipped"
|
|
case State.UNCAUGHT_EXC:
|
|
return f"{self.test_name} failed (uncaught exception)"
|
|
case State.REFLEAK:
|
|
return f"{self.test_name} failed (reference leak)"
|
|
case State.ENV_CHANGED:
|
|
return f"{self.test_name} failed (env changed)"
|
|
case State.RESOURCE_DENIED:
|
|
return f"{self.test_name} skipped (resource denied)"
|
|
case State.INTERRUPTED:
|
|
return f"{self.test_name} interrupted"
|
|
case State.MULTIPROCESSING_ERROR:
|
|
return f"{self.test_name} process crashed"
|
|
case State.DID_NOT_RUN:
|
|
return f"{self.test_name} ran no tests"
|
|
case State.TIMEOUT:
|
|
return f"{self.test_name} timed out ({format_duration(self.duration)})"
|
|
case _:
|
|
raise ValueError("unknown result state: {state!r}")
|
|
|
|
def has_meaningful_duration(self):
|
|
return State.has_meaningful_duration(self.state)
|
|
|
|
def set_env_changed(self):
|
|
if self.state is None or self.state == State.PASSED:
|
|
self.state = State.ENV_CHANGED
|
|
|
|
|
|
# Minimum duration of a test to display its duration or to mention that
|
|
# the test is running in background
|
|
PROGRESS_MIN_TIME = 30.0 # seconds
|
|
|
|
#If these test directories are encountered recurse into them and treat each
|
|
# test_ .py or dir as a separate test module. This can increase parallelism.
|
|
# Beware this can't generally be done for any directory with sub-tests as the
|
|
# __init__.py may do things which alter what tests are to be run.
|
|
|
|
SPLITTESTDIRS = {
|
|
"test_asyncio",
|
|
}
|
|
|
|
# Storage of uncollectable objects
|
|
FOUND_GARBAGE = []
|
|
|
|
|
|
def findtestdir(path=None):
|
|
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
|
|
|
|
|
|
def findtests(*, testdir=None, exclude=(),
|
|
split_test_dirs=SPLITTESTDIRS, base_mod=""):
|
|
"""Return a list of all applicable test modules."""
|
|
testdir = findtestdir(testdir)
|
|
tests = []
|
|
for name in os.listdir(testdir):
|
|
mod, ext = os.path.splitext(name)
|
|
if (not mod.startswith("test_")) or (mod in exclude):
|
|
continue
|
|
if mod in split_test_dirs:
|
|
subdir = os.path.join(testdir, mod)
|
|
mod = f"{base_mod or 'test'}.{mod}"
|
|
tests.extend(findtests(testdir=subdir, exclude=exclude,
|
|
split_test_dirs=split_test_dirs, base_mod=mod))
|
|
elif ext in (".py", ""):
|
|
tests.append(f"{base_mod}.{mod}" if base_mod else mod)
|
|
return sorted(tests)
|
|
|
|
|
|
def split_test_packages(tests, *, testdir=None, exclude=(),
|
|
split_test_dirs=SPLITTESTDIRS):
|
|
testdir = findtestdir(testdir)
|
|
splitted = []
|
|
for name in tests:
|
|
if name in split_test_dirs:
|
|
subdir = os.path.join(testdir, name)
|
|
splitted.extend(findtests(testdir=subdir, exclude=exclude,
|
|
split_test_dirs=split_test_dirs,
|
|
base_mod=name))
|
|
else:
|
|
splitted.append(name)
|
|
return splitted
|
|
|
|
|
|
def get_abs_module(ns: Namespace, test_name: str) -> str:
|
|
if test_name.startswith('test.') or ns.testdir:
|
|
return test_name
|
|
else:
|
|
# Import it from the test package
|
|
return 'test.' + test_name
|
|
|
|
|
|
def _runtest_capture_output_timeout_junit(result: TestResult, ns: Namespace) -> None:
|
|
# Capture stdout and stderr, set faulthandler timeout,
|
|
# and create JUnit XML report.
|
|
|
|
output_on_failure = ns.verbose3
|
|
|
|
use_timeout = (
|
|
ns.timeout is not None and threading_helper.can_start_thread
|
|
)
|
|
if use_timeout:
|
|
faulthandler.dump_traceback_later(ns.timeout, exit=True)
|
|
|
|
try:
|
|
support.set_match_tests(ns.match_tests, ns.ignore_tests)
|
|
support.junit_xml_list = xml_list = [] if ns.xmlpath else None
|
|
if ns.failfast:
|
|
support.failfast = True
|
|
|
|
if output_on_failure:
|
|
support.verbose = True
|
|
|
|
stream = io.StringIO()
|
|
orig_stdout = sys.stdout
|
|
orig_stderr = sys.stderr
|
|
print_warning = support.print_warning
|
|
orig_print_warnings_stderr = print_warning.orig_stderr
|
|
|
|
output = None
|
|
try:
|
|
sys.stdout = stream
|
|
sys.stderr = stream
|
|
# print_warning() writes into the temporary stream to preserve
|
|
# messages order. If support.environment_altered becomes true,
|
|
# warnings will be written to sys.stderr below.
|
|
print_warning.orig_stderr = stream
|
|
|
|
_runtest_env_changed_exc(result, ns, display_failure=False)
|
|
# Ignore output if the test passed successfully
|
|
if result.state != State.PASSED:
|
|
output = stream.getvalue()
|
|
finally:
|
|
sys.stdout = orig_stdout
|
|
sys.stderr = orig_stderr
|
|
print_warning.orig_stderr = orig_print_warnings_stderr
|
|
|
|
if output is not None:
|
|
sys.stderr.write(output)
|
|
sys.stderr.flush()
|
|
else:
|
|
# Tell tests to be moderately quiet
|
|
support.verbose = ns.verbose
|
|
|
|
_runtest_env_changed_exc(result, ns,
|
|
display_failure=not ns.verbose)
|
|
|
|
if xml_list:
|
|
import xml.etree.ElementTree as ET
|
|
result.xml_data = [ET.tostring(x).decode('us-ascii')
|
|
for x in xml_list]
|
|
finally:
|
|
if use_timeout:
|
|
faulthandler.cancel_dump_traceback_later()
|
|
support.junit_xml_list = None
|
|
|
|
|
|
def runtest(ns: Namespace, test_name: str) -> TestResult:
|
|
"""Run a single test.
|
|
|
|
ns -- regrtest namespace of options
|
|
test_name -- the name of the test
|
|
|
|
Returns a TestResult.
|
|
|
|
If ns.xmlpath is not None, xml_data is a list containing each
|
|
generated testsuite element.
|
|
"""
|
|
start_time = time.perf_counter()
|
|
result = TestResult(test_name)
|
|
try:
|
|
_runtest_capture_output_timeout_junit(result, ns)
|
|
except:
|
|
if not ns.pgo:
|
|
msg = traceback.format_exc()
|
|
print(f"test {test_name} crashed -- {msg}",
|
|
file=sys.stderr, flush=True)
|
|
result.state = State.UNCAUGHT_EXC
|
|
result.duration = time.perf_counter() - start_time
|
|
return result
|
|
|
|
|
|
def _test_module(the_module):
|
|
loader = unittest.TestLoader()
|
|
tests = loader.loadTestsFromModule(the_module)
|
|
for error in loader.errors:
|
|
print(error, file=sys.stderr)
|
|
if loader.errors:
|
|
raise Exception("errors while loading tests")
|
|
return support.run_unittest(tests)
|
|
|
|
|
|
def save_env(ns: Namespace, test_name: str):
|
|
return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo)
|
|
|
|
|
|
def regrtest_runner(result, test_func, ns) -> None:
|
|
# Run test_func(), collect statistics, and detect reference and memory
|
|
# leaks.
|
|
|
|
if ns.huntrleaks:
|
|
from test.libregrtest.refleak import dash_R
|
|
refleak, test_result = dash_R(ns, result.test_name, test_func)
|
|
else:
|
|
test_result = test_func()
|
|
refleak = False
|
|
|
|
if refleak:
|
|
result.state = State.REFLEAK
|
|
|
|
match test_result:
|
|
case TestStats():
|
|
stats = test_result
|
|
case unittest.TestResult():
|
|
stats = TestStats.from_unittest(test_result)
|
|
case doctest.TestResults():
|
|
stats = TestStats.from_doctest(test_result)
|
|
case None:
|
|
print_warning(f"{result.test_name} test runner returned None: {test_func}")
|
|
stats = None
|
|
case _:
|
|
print_warning(f"Unknown test result type: {type(test_result)}")
|
|
stats = None
|
|
|
|
result.stats = stats
|
|
|
|
|
|
def _load_run_test(result: TestResult, ns: Namespace) -> None:
|
|
# Load the test function, run the test function.
|
|
|
|
abstest = get_abs_module(ns, result.test_name)
|
|
|
|
# remove the module from sys.module to reload it if it was already imported
|
|
try:
|
|
del sys.modules[abstest]
|
|
except KeyError:
|
|
pass
|
|
|
|
the_module = importlib.import_module(abstest)
|
|
|
|
if hasattr(the_module, "test_main"):
|
|
# https://github.com/python/cpython/issues/89392
|
|
raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest")
|
|
test_func = functools.partial(_test_module, the_module)
|
|
|
|
try:
|
|
with save_env(ns, result.test_name):
|
|
regrtest_runner(result, test_func, ns)
|
|
finally:
|
|
# First kill any dangling references to open files etc.
|
|
# This can also issue some ResourceWarnings which would otherwise get
|
|
# triggered during the following test run, and possibly produce
|
|
# failures.
|
|
support.gc_collect()
|
|
|
|
cleanup_test_droppings(result.test_name, ns.verbose)
|
|
|
|
if gc.garbage:
|
|
support.environment_altered = True
|
|
print_warning(f"{result.test_name} created {len(gc.garbage)} "
|
|
f"uncollectable object(s).")
|
|
|
|
# move the uncollectable objects somewhere,
|
|
# so we don't see them again
|
|
FOUND_GARBAGE.extend(gc.garbage)
|
|
gc.garbage.clear()
|
|
|
|
support.reap_children()
|
|
|
|
|
|
def _runtest_env_changed_exc(result: TestResult, ns: Namespace,
|
|
display_failure: bool = True) -> None:
|
|
# Detect environment changes, handle exceptions.
|
|
|
|
# Reset the environment_altered flag to detect if a test altered
|
|
# the environment
|
|
support.environment_altered = False
|
|
|
|
if ns.pgo:
|
|
display_failure = False
|
|
|
|
test_name = result.test_name
|
|
try:
|
|
clear_caches()
|
|
support.gc_collect()
|
|
|
|
with save_env(ns, test_name):
|
|
_load_run_test(result, ns)
|
|
except support.ResourceDenied as msg:
|
|
if not ns.quiet and not ns.pgo:
|
|
print(f"{test_name} skipped -- {msg}", flush=True)
|
|
result.state = State.RESOURCE_DENIED
|
|
return
|
|
except unittest.SkipTest as msg:
|
|
if not ns.quiet and not ns.pgo:
|
|
print(f"{test_name} skipped -- {msg}", flush=True)
|
|
result.state = State.SKIPPED
|
|
return
|
|
except support.TestFailedWithDetails as exc:
|
|
msg = f"test {test_name} failed"
|
|
if display_failure:
|
|
msg = f"{msg} -- {exc}"
|
|
print(msg, file=sys.stderr, flush=True)
|
|
result.state = State.FAILED
|
|
result.errors = exc.errors
|
|
result.failures = exc.failures
|
|
result.stats = exc.stats
|
|
return
|
|
except support.TestFailed as exc:
|
|
msg = f"test {test_name} failed"
|
|
if display_failure:
|
|
msg = f"{msg} -- {exc}"
|
|
print(msg, file=sys.stderr, flush=True)
|
|
result.state = State.FAILED
|
|
result.stats = exc.stats
|
|
return
|
|
except support.TestDidNotRun:
|
|
result.state = State.DID_NOT_RUN
|
|
return
|
|
except KeyboardInterrupt:
|
|
print()
|
|
result.state = State.INTERRUPTED
|
|
return
|
|
except:
|
|
if not ns.pgo:
|
|
msg = traceback.format_exc()
|
|
print(f"test {test_name} crashed -- {msg}",
|
|
file=sys.stderr, flush=True)
|
|
result.state = State.UNCAUGHT_EXC
|
|
return
|
|
|
|
if support.environment_altered:
|
|
result.set_env_changed()
|
|
# Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
|
|
if result.state is None:
|
|
result.state = State.PASSED
|
|
|
|
|
|
def cleanup_test_droppings(test_name: str, verbose: int) -> None:
|
|
# Try to clean up junk commonly left behind. While tests shouldn't leave
|
|
# any files or directories behind, when a test fails that can be tedious
|
|
# for it to arrange. The consequences can be especially nasty on Windows,
|
|
# since if a test leaves a file open, it cannot be deleted by name (while
|
|
# there's nothing we can do about that here either, we can display the
|
|
# name of the offending test, which is a real help).
|
|
for name in (os_helper.TESTFN,):
|
|
if not os.path.exists(name):
|
|
continue
|
|
|
|
if os.path.isdir(name):
|
|
import shutil
|
|
kind, nuker = "directory", shutil.rmtree
|
|
elif os.path.isfile(name):
|
|
kind, nuker = "file", os.unlink
|
|
else:
|
|
raise RuntimeError(f"os.path says {name!r} exists but is neither "
|
|
f"directory nor file")
|
|
|
|
if verbose:
|
|
print_warning(f"{test_name} left behind {kind} {name!r}")
|
|
support.environment_altered = True
|
|
|
|
try:
|
|
import stat
|
|
# fix possible permissions problems that might prevent cleanup
|
|
os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
|
|
nuker(name)
|
|
except Exception as exc:
|
|
print_warning(f"{test_name} left behind {kind} {name!r} "
|
|
f"and it couldn't be removed: {exc}")
|