[3.12] gh-108822: regrtest computes statistics (#108793) (#108833)

gh-108822: regrtest computes statistics (#108793)

test_netrc, test_pep646_syntax and test_xml_etree now return results
in the test_main() function.

Changes:

* Rewrite TestResult as a dataclass with a new State class.
* Add test.support.TestStats class and Regrtest.stats_dict attribute.
* libregrtest.runtest functions now modify a TestResult instance
  in-place.
* libregrtest summary lists the number of run tests and skipped
  tests, and denied resources.
* Add TestResult.has_meaningful_duration() method.
* Compute TestResult duration in the upper function.
* Use time.perf_counter() instead of time.monotonic().
* Regrtest: rename 'resource_denieds' attribute to 'resource_denied'.
* Rename CHILD_ERROR to MULTIPROCESSING_ERROR.
* Use match/case syntadx to have different code depending on the
  test state.

Notes on the backport: doctest.TestResults.skipped is a new feature
in Python 3.13, so don't use it in the backport.

Co-authored-by: Alex Waygood <Alex.Waygood@Gmail.com>
(cherry picked from commit d4e534cbb3)
This commit is contained in:
Victor Stinner 2023-09-04 13:45:50 +02:00 committed by GitHub
parent dcaacd9066
commit c8cf6be213
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 506 additions and 303 deletions

View file

@ -11,15 +11,14 @@ import time
import unittest
from test.libregrtest.cmdline import _parse_args
from test.libregrtest.runtest import (
findtests, split_test_packages, runtest, get_abs_module, is_failed,
PROGRESS_MIN_TIME,
Passed, Failed, EnvChanged, Skipped, ResourceDenied, Interrupted,
ChildError, DidNotRun)
findtests, split_test_packages, runtest, get_abs_module,
PROGRESS_MIN_TIME, State)
from test.libregrtest.setup import setup_tests
from test.libregrtest.pgo import setup_pgo_tests
from test.libregrtest.utils import (removepy, count, format_duration,
printlist, get_build_info)
from test import support
from test.support import TestStats
from test.support import os_helper
from test.support import threading_helper
@ -78,13 +77,14 @@ class Regrtest:
self.good = []
self.bad = []
self.skipped = []
self.resource_denieds = []
self.resource_denied = []
self.environment_changed = []
self.run_no_tests = []
self.need_rerun = []
self.rerun = []
self.first_result = None
self.interrupted = False
self.stats_dict: dict[str, TestStats] = {}
# used by --slow
self.test_times = []
@ -93,7 +93,7 @@ class Regrtest:
self.tracer = None
# used to display the progress bar "[ 3/100]"
self.start_time = time.monotonic()
self.start_time = time.perf_counter()
self.test_count = ''
self.test_count_width = 1
@ -111,36 +111,41 @@ class Regrtest:
def get_executed(self):
return (set(self.good) | set(self.bad) | set(self.skipped)
| set(self.resource_denieds) | set(self.environment_changed)
| set(self.resource_denied) | set(self.environment_changed)
| set(self.run_no_tests))
def accumulate_result(self, result, rerun=False):
test_name = result.name
test_name = result.test_name
if not isinstance(result, (ChildError, Interrupted)) and not rerun:
self.test_times.append((result.duration_sec, test_name))
if result.has_meaningful_duration() and not rerun:
self.test_times.append((result.duration, test_name))
if isinstance(result, Passed):
self.good.append(test_name)
elif isinstance(result, ResourceDenied):
self.skipped.append(test_name)
self.resource_denieds.append(test_name)
elif isinstance(result, Skipped):
self.skipped.append(test_name)
elif isinstance(result, EnvChanged):
self.environment_changed.append(test_name)
elif isinstance(result, Failed):
if not rerun:
self.bad.append(test_name)
self.need_rerun.append(result)
elif isinstance(result, DidNotRun):
self.run_no_tests.append(test_name)
elif isinstance(result, Interrupted):
self.interrupted = True
else:
raise ValueError("invalid test result: %r" % result)
match result.state:
case State.PASSED:
self.good.append(test_name)
case State.ENV_CHANGED:
self.environment_changed.append(test_name)
case State.SKIPPED:
self.skipped.append(test_name)
case State.RESOURCE_DENIED:
self.skipped.append(test_name)
self.resource_denied.append(test_name)
case State.INTERRUPTED:
self.interrupted = True
case State.DID_NOT_RUN:
self.run_no_tests.append(test_name)
case _:
if result.is_failed(self.ns.fail_env_changed):
if not rerun:
self.bad.append(test_name)
self.need_rerun.append(result)
else:
raise ValueError(f"invalid test state: {state!r}")
if rerun and not isinstance(result, (Failed, Interrupted)):
if result.stats is not None:
self.stats_dict[result.test_name] = result.stats
if rerun and not(result.is_failed(False) or result.state == State.INTERRUPTED):
self.bad.remove(test_name)
xml_data = result.xml_data
@ -162,7 +167,7 @@ class Regrtest:
line = f"load avg: {load_avg:.2f} {line}"
# add the timestamp prefix: "0:01:05 "
test_time = time.monotonic() - self.start_time
test_time = time.perf_counter() - self.start_time
mins, secs = divmod(int(test_time), 60)
hours, mins = divmod(mins, 60)
@ -337,7 +342,7 @@ class Regrtest:
rerun_list = list(self.need_rerun)
self.need_rerun.clear()
for result in rerun_list:
test_name = result.name
test_name = result.test_name
self.rerun.append(test_name)
errors = result.errors or []
@ -364,7 +369,7 @@ class Regrtest:
self.accumulate_result(result, rerun=True)
if isinstance(result, Interrupted):
if result.state == State.INTERRUPTED:
break
if self.bad:
@ -461,7 +466,7 @@ class Regrtest:
previous_test = None
for test_index, test_name in enumerate(self.tests, 1):
start_time = time.monotonic()
start_time = time.perf_counter()
text = test_name
if previous_test:
@ -480,14 +485,14 @@ class Regrtest:
result = runtest(self.ns, test_name)
self.accumulate_result(result)
if isinstance(result, Interrupted):
if result.state == State.INTERRUPTED:
break
previous_test = str(result)
test_time = time.monotonic() - start_time
test_time = time.perf_counter() - start_time
if test_time >= PROGRESS_MIN_TIME:
previous_test = "%s in %s" % (previous_test, format_duration(test_time))
elif isinstance(result, Passed):
elif result.state == State.PASSED:
# be quiet: say nothing if the test passed shortly
previous_test = None
@ -496,7 +501,7 @@ class Regrtest:
if module not in save_modules and module.startswith("test."):
support.unload(module)
if self.ns.failfast and is_failed(result, self.ns):
if self.ns.failfast and result.is_failed(self.ns.fail_env_changed):
break
if previous_test:
@ -631,13 +636,48 @@ class Regrtest:
coverdir=self.ns.coverdir)
print()
duration = time.monotonic() - self.start_time
print("Total duration: %s" % format_duration(duration))
print("Tests result: %s" % self.get_tests_result())
self.display_summary()
if self.ns.runleaks:
os.system("leaks %d" % os.getpid())
def display_summary(self):
duration = time.perf_counter() - self.start_time
# Total duration
print("Total duration: %s" % format_duration(duration))
# Total tests
total = TestStats()
for stats in self.stats_dict.values():
total.accumulate(stats)
stats = [f'run={total.tests_run:,}']
if total.failures:
stats.append(f'failures={total.failures:,}')
if total.skipped:
stats.append(f'skipped={total.skipped:,}')
print(f"Total tests: {' '.join(stats)}")
# Total test files
report = [f'success={len(self.good)}']
if self.bad:
report.append(f'failed={len(self.bad)}')
if self.environment_changed:
report.append(f'env_changed={len(self.environment_changed)}')
if self.skipped:
report.append(f'skipped={len(self.skipped)}')
if self.resource_denied:
report.append(f'resource_denied={len(self.resource_denied)}')
if self.rerun:
report.append(f'rerun={len(self.rerun)}')
if self.run_no_tests:
report.append(f'run_no_tests={len(self.run_no_tests)}')
print(f"Total test files: {' '.join(report)}")
# Result
result = self.get_tests_result()
print(f"Result: {result}")
def save_xml_result(self):
if not self.ns.xmlpath and not self.testsuite_xml:
return

View file

@ -83,11 +83,12 @@ def dash_R(ns, test_name, test_func):
print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr,
flush=True)
results = None
dash_R_cleanup(fs, ps, pic, zdc, abcs)
support.gc_collect()
for i in rep_range:
test_func()
results = test_func()
dash_R_cleanup(fs, ps, pic, zdc, abcs)
support.gc_collect()
@ -151,7 +152,7 @@ def dash_R(ns, test_name, test_func):
print(msg, file=refrep)
refrep.flush()
failed = True
return failed
return (failed, results)
def dash_R_cleanup(fs, ps, pic, zdc, abcs):

View file

@ -1,3 +1,5 @@
import dataclasses
import doctest
import faulthandler
import functools
import gc
@ -10,6 +12,7 @@ import traceback
import unittest
from test import support
from test.support import TestStats
from test.support import os_helper
from test.support import threading_helper
from test.libregrtest.cmdline import Namespace
@ -17,108 +20,114 @@ from test.libregrtest.save_env import saved_test_environment
from test.libregrtest.utils import clear_caches, format_duration, print_warning
# Avoid enum.Enum to reduce the number of imports when tests are run
class State:
PASSED = "PASSED"
FAILED = "FAILED"
SKIPPED = "SKIPPED"
UNCAUGHT_EXC = "UNCAUGHT_EXC"
REFLEAK = "REFLEAK"
ENV_CHANGED = "ENV_CHANGED"
RESOURCE_DENIED = "RESOURCE_DENIED"
INTERRUPTED = "INTERRUPTED"
MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
DID_NOT_RUN = "DID_NOT_RUN"
TIMEOUT = "TIMEOUT"
@staticmethod
def is_failed(state):
return state in {
State.FAILED,
State.UNCAUGHT_EXC,
State.REFLEAK,
State.MULTIPROCESSING_ERROR,
State.TIMEOUT}
@staticmethod
def has_meaningful_duration(state):
# Consider that the duration is meaningless for these cases.
# For example, if a whole test file is skipped, its duration
# is unlikely to be the duration of executing its tests,
# but just the duration to execute code which skips the test.
return state not in {
State.SKIPPED,
State.RESOURCE_DENIED,
State.INTERRUPTED,
State.MULTIPROCESSING_ERROR,
State.DID_NOT_RUN}
@dataclasses.dataclass(slots=True)
class TestResult:
def __init__(
self,
name: str,
duration_sec: float = 0.0,
xml_data: list[str] | None = None,
) -> None:
self.name = name
self.duration_sec = duration_sec
self.xml_data = xml_data
test_name: str
state: str | None = None
# Test duration in seconds
duration: float | None = None
xml_data: list[str] | None = None
stats: TestStats | None = None
def __str__(self) -> str:
return f"{self.name} finished"
# errors and failures copied from support.TestFailedWithDetails
errors: list[tuple[str, str]] | None = None
failures: list[tuple[str, str]] | None = None
def is_failed(self, fail_env_changed: bool) -> bool:
if self.state == State.ENV_CHANGED:
return fail_env_changed
return State.is_failed(self.state)
class Passed(TestResult):
def __str__(self) -> str:
return f"{self.name} passed"
class Failed(TestResult):
def __init__(
self,
name: str,
duration_sec: float = 0.0,
xml_data: list[str] | None = None,
errors: list[tuple[str, str]] | None = None,
failures: list[tuple[str, str]] | None = None,
) -> None:
super().__init__(name, duration_sec=duration_sec, xml_data=xml_data)
self.errors = errors
self.failures = failures
def __str__(self) -> str:
def _format_failed(self):
if self.errors and self.failures:
le = len(self.errors)
lf = len(self.failures)
error_s = "error" + ("s" if le > 1 else "")
failure_s = "failure" + ("s" if lf > 1 else "")
return f"{self.name} failed ({le} {error_s}, {lf} {failure_s})"
return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
if self.errors:
le = len(self.errors)
error_s = "error" + ("s" if le > 1 else "")
return f"{self.name} failed ({le} {error_s})"
return f"{self.test_name} failed ({le} {error_s})"
if self.failures:
lf = len(self.failures)
failure_s = "failure" + ("s" if lf > 1 else "")
return f"{self.name} failed ({lf} {failure_s})"
return f"{self.test_name} failed ({lf} {failure_s})"
return f"{self.name} failed"
return f"{self.test_name} failed"
class UncaughtException(Failed):
def __str__(self) -> str:
return f"{self.name} failed (uncaught exception)"
match self.state:
case State.PASSED:
return f"{self.test_name} passed"
case State.FAILED:
return self._format_failed()
case State.SKIPPED:
return f"{self.test_name} skipped"
case State.UNCAUGHT_EXC:
return f"{self.test_name} failed (uncaught exception)"
case State.REFLEAK:
return f"{self.test_name} failed (reference leak)"
case State.ENV_CHANGED:
return f"{self.test_name} failed (env changed)"
case State.RESOURCE_DENIED:
return f"{self.test_name} skipped (resource denied)"
case State.INTERRUPTED:
return f"{self.test_name} interrupted"
case State.MULTIPROCESSING_ERROR:
return f"{self.test_name} process crashed"
case State.DID_NOT_RUN:
return f"{self.test_name} ran no tests"
case State.TIMEOUT:
return f"{self.test_name} timed out ({format_duration(self.duration)})"
case _:
raise ValueError("unknown result state: {state!r}")
def has_meaningful_duration(self):
return State.has_meaningful_duration(self.state)
class EnvChanged(Failed):
def __str__(self) -> str:
return f"{self.name} failed (env changed)"
# Convert Passed to EnvChanged
@staticmethod
def from_passed(other):
return EnvChanged(other.name, other.duration_sec, other.xml_data)
class RefLeak(Failed):
def __str__(self) -> str:
return f"{self.name} failed (reference leak)"
class Skipped(TestResult):
def __str__(self) -> str:
return f"{self.name} skipped"
class ResourceDenied(Skipped):
def __str__(self) -> str:
return f"{self.name} skipped (resource denied)"
class Interrupted(TestResult):
def __str__(self) -> str:
return f"{self.name} interrupted"
class ChildError(Failed):
def __str__(self) -> str:
return f"{self.name} crashed"
class DidNotRun(TestResult):
def __str__(self) -> str:
return f"{self.name} ran no tests"
class Timeout(Failed):
def __str__(self) -> str:
return f"{self.name} timed out ({format_duration(self.duration_sec)})"
def set_env_changed(self):
if self.state is None or self.state == State.PASSED:
self.state = State.ENV_CHANGED
# Minimum duration of a test to display its duration or to mention that
@ -142,12 +151,6 @@ SPLITTESTDIRS = {
FOUND_GARBAGE = []
def is_failed(result: TestResult, ns: Namespace) -> bool:
if isinstance(result, EnvChanged):
return ns.fail_env_changed
return isinstance(result, Failed)
def findtestdir(path=None):
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
@ -194,9 +197,9 @@ def get_abs_module(ns: Namespace, test_name: str) -> str:
return 'test.' + test_name
def _runtest(ns: Namespace, test_name: str) -> TestResult:
# Handle faulthandler timeout, capture stdout+stderr, XML serialization
# and measure time.
def _runtest_capture_output_timeout_junit(result: TestResult, ns: Namespace) -> None:
# Capture stdout and stderr, set faulthandler timeout,
# and create JUnit XML report.
output_on_failure = ns.verbose3
@ -206,7 +209,6 @@ def _runtest(ns: Namespace, test_name: str) -> TestResult:
if use_timeout:
faulthandler.dump_traceback_later(ns.timeout, exit=True)
start_time = time.perf_counter()
try:
support.set_match_tests(ns.match_tests, ns.ignore_tests)
support.junit_xml_list = xml_list = [] if ns.xmlpath else None
@ -231,9 +233,9 @@ def _runtest(ns: Namespace, test_name: str) -> TestResult:
# warnings will be written to sys.stderr below.
print_warning.orig_stderr = stream
result = _runtest_inner(ns, test_name,
display_failure=False)
if not isinstance(result, Passed):
_runtest_env_changed_exc(result, ns, display_failure=False)
# Ignore output if the test passed successfully
if result.state != State.PASSED:
output = stream.getvalue()
finally:
sys.stdout = orig_stdout
@ -247,18 +249,13 @@ def _runtest(ns: Namespace, test_name: str) -> TestResult:
# Tell tests to be moderately quiet
support.verbose = ns.verbose
result = _runtest_inner(ns, test_name,
display_failure=not ns.verbose)
_runtest_env_changed_exc(result, ns,
display_failure=not ns.verbose)
if xml_list:
import xml.etree.ElementTree as ET
result.xml_data = [
ET.tostring(x).decode('us-ascii')
for x in xml_list
]
result.duration_sec = time.perf_counter() - start_time
return result
result.xml_data = [ET.tostring(x).decode('us-ascii')
for x in xml_list]
finally:
if use_timeout:
faulthandler.cancel_dump_traceback_later()
@ -271,19 +268,23 @@ def runtest(ns: Namespace, test_name: str) -> TestResult:
ns -- regrtest namespace of options
test_name -- the name of the test
Returns a TestResult sub-class depending on the kind of result received.
Returns a TestResult.
If ns.xmlpath is not None, xml_data is a list containing each
generated testsuite element.
"""
start_time = time.perf_counter()
result = TestResult(test_name)
try:
return _runtest(ns, test_name)
_runtest_capture_output_timeout_junit(result, ns)
except:
if not ns.pgo:
msg = traceback.format_exc()
print(f"test {test_name} crashed -- {msg}",
file=sys.stderr, flush=True)
return Failed(test_name)
result.state = State.UNCAUGHT_EXC
result.duration = time.perf_counter() - start_time
return result
def _test_module(the_module):
@ -293,18 +294,48 @@ def _test_module(the_module):
print(error, file=sys.stderr)
if loader.errors:
raise Exception("errors while loading tests")
support.run_unittest(tests)
return support.run_unittest(tests)
def save_env(ns: Namespace, test_name: str):
return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo)
def _runtest_inner2(ns: Namespace, test_name: str) -> bool:
# Load the test function, run the test function, handle huntrleaks
# to detect leaks.
def regrtest_runner(result, test_func, ns) -> None:
# Run test_func(), collect statistics, and detect reference and memory
# leaks.
abstest = get_abs_module(ns, test_name)
if ns.huntrleaks:
from test.libregrtest.refleak import dash_R
refleak, test_result = dash_R(ns, result.test_name, test_func)
else:
test_result = test_func()
refleak = False
if refleak:
result.state = State.REFLEAK
match test_result:
case TestStats():
stats = test_result
case unittest.TestResult():
stats = TestStats.from_unittest(test_result)
case doctest.TestResults():
stats = TestStats.from_doctest(test_result)
case None:
print_warning(f"{result.test_name} test runner returned None: {test_func}")
stats = None
case _:
print_warning(f"Unknown test result type: {type(test_result)}")
stats = None
result.stats = stats
def _load_run_test(result: TestResult, ns: Namespace) -> None:
# Load the test function, run the test function.
abstest = get_abs_module(ns, result.test_name)
# remove the module from sys.module to reload it if it was already imported
try:
@ -314,23 +345,15 @@ def _runtest_inner2(ns: Namespace, test_name: str) -> bool:
the_module = importlib.import_module(abstest)
if ns.huntrleaks:
from test.libregrtest.refleak import dash_R
# If the test has a test_main, that will run the appropriate
# tests. If not, use normal unittest test loading.
test_runner = getattr(the_module, "test_main", None)
if test_runner is None:
test_runner = functools.partial(_test_module, the_module)
test_func = getattr(the_module, "test_main", None)
if test_func is None:
test_func = functools.partial(_test_module, the_module)
try:
with save_env(ns, test_name):
if ns.huntrleaks:
# Return True if the test leaked references
refleak = dash_R(ns, test_name, test_runner)
else:
test_runner()
refleak = False
with save_env(ns, result.test_name):
regrtest_runner(result, test_func, ns)
finally:
# First kill any dangling references to open files etc.
# This can also issue some ResourceWarnings which would otherwise get
@ -338,11 +361,11 @@ def _runtest_inner2(ns: Namespace, test_name: str) -> bool:
# failures.
support.gc_collect()
cleanup_test_droppings(test_name, ns.verbose)
cleanup_test_droppings(result.test_name, ns.verbose)
if gc.garbage:
support.environment_altered = True
print_warning(f"{test_name} created {len(gc.garbage)} "
print_warning(f"{result.test_name} created {len(gc.garbage)} "
f"uncollectable object(s).")
# move the uncollectable objects somewhere,
@ -352,12 +375,9 @@ def _runtest_inner2(ns: Namespace, test_name: str) -> bool:
support.reap_children()
return refleak
def _runtest_inner(
ns: Namespace, test_name: str, display_failure: bool = True
) -> TestResult:
def _runtest_env_changed_exc(result: TestResult, ns: Namespace,
display_failure: bool = True) -> None:
# Detect environment changes, handle exceptions.
# Reset the environment_altered flag to detect if a test altered
@ -367,49 +387,61 @@ def _runtest_inner(
if ns.pgo:
display_failure = False
test_name = result.test_name
try:
clear_caches()
support.gc_collect()
with save_env(ns, test_name):
refleak = _runtest_inner2(ns, test_name)
_load_run_test(result, ns)
except support.ResourceDenied as msg:
if not ns.quiet and not ns.pgo:
print(f"{test_name} skipped -- {msg}", flush=True)
return ResourceDenied(test_name)
result.state = State.RESOURCE_DENIED
return
except unittest.SkipTest as msg:
if not ns.quiet and not ns.pgo:
print(f"{test_name} skipped -- {msg}", flush=True)
return Skipped(test_name)
result.state = State.SKIPPED
return
except support.TestFailedWithDetails as exc:
msg = f"test {test_name} failed"
if display_failure:
msg = f"{msg} -- {exc}"
print(msg, file=sys.stderr, flush=True)
return Failed(test_name, errors=exc.errors, failures=exc.failures)
result.state = State.FAILED
result.errors = exc.errors
result.failures = exc.failures
result.stats = exc.stats
return
except support.TestFailed as exc:
msg = f"test {test_name} failed"
if display_failure:
msg = f"{msg} -- {exc}"
print(msg, file=sys.stderr, flush=True)
return Failed(test_name)
result.state = State.FAILED
result.stats = exc.stats
return
except support.TestDidNotRun:
return DidNotRun(test_name)
result.state = State.DID_NOT_RUN
return
except KeyboardInterrupt:
print()
return Interrupted(test_name)
result.state = State.INTERRUPTED
return
except:
if not ns.pgo:
msg = traceback.format_exc()
print(f"test {test_name} crashed -- {msg}",
file=sys.stderr, flush=True)
return UncaughtException(test_name)
result.state = State.UNCAUGHT_EXC
return
if refleak:
return RefLeak(test_name)
if support.environment_altered:
return EnvChanged(test_name)
return Passed(test_name)
result.set_env_changed()
# Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
if result.state is None:
result.state = State.PASSED
def cleanup_test_droppings(test_name: str, verbose: int) -> None:

View file

@ -1,3 +1,4 @@
import dataclasses
import faulthandler
import json
import os.path
@ -13,12 +14,13 @@ from typing import NamedTuple, NoReturn, Literal, Any, TextIO
from test import support
from test.support import os_helper
from test.support import TestStats
from test.libregrtest.cmdline import Namespace
from test.libregrtest.main import Regrtest
from test.libregrtest.runtest import (
runtest, is_failed, TestResult, Interrupted, Timeout, ChildError,
PROGRESS_MIN_TIME, Passed, EnvChanged)
runtest, TestResult, State,
PROGRESS_MIN_TIME)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration, print_warning
@ -43,9 +45,9 @@ USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
def must_stop(result: TestResult, ns: Namespace) -> bool:
if isinstance(result, Interrupted):
if result.state == State.INTERRUPTED:
return True
if ns.failfast and is_failed(result, ns):
if ns.failfast and result.is_failed(ns.fail_env_changed):
return True
return False
@ -130,8 +132,8 @@ class MultiprocessIterator:
class MultiprocessResult(NamedTuple):
result: TestResult
# bpo-45410: stderr is written into stdout to keep messages order
stdout: str
error_msg: str
worker_stdout: str | None = None
err_msg: str | None = None
ExcStr = str
@ -209,15 +211,12 @@ class TestWorkerProcess(threading.Thread):
def mp_result_error(
self,
test_result: TestResult,
stdout: str = '',
stdout: str | None = None,
err_msg=None
) -> MultiprocessResult:
test_result.duration_sec = time.monotonic() - self.start_time
return MultiprocessResult(test_result, stdout, err_msg)
def _run_process(self, test_name: str, tmp_dir: str, stdout_fh: TextIO) -> int:
self.start_time = time.monotonic()
self.current_test_name = test_name
try:
popen = run_test_in_subprocess(test_name, self.ns, tmp_dir, stdout_fh)
@ -306,38 +305,41 @@ class TestWorkerProcess(threading.Thread):
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Cannot read process stdout: {exc}"
return self.mp_result_error(ChildError(test_name), '', err_msg)
result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
return self.mp_result_error(result, err_msg=err_msg)
if retcode is None:
return self.mp_result_error(Timeout(test_name), stdout)
result = TestResult(test_name, state=State.TIMEOUT)
return self.mp_result_error(result, stdout)
err_msg = None
if retcode != 0:
err_msg = "Exit code %s" % retcode
else:
stdout, _, result = stdout.rpartition("\n")
stdout, _, worker_json = stdout.rpartition("\n")
stdout = stdout.rstrip()
if not result:
if not worker_json:
err_msg = "Failed to parse worker stdout"
else:
try:
# deserialize run_tests_worker() output
result = json.loads(result, object_hook=decode_test_result)
result = json.loads(worker_json,
object_hook=decode_test_result)
except Exception as exc:
err_msg = "Failed to parse worker JSON: %s" % exc
if err_msg is not None:
return self.mp_result_error(ChildError(test_name), stdout, err_msg)
if err_msg:
result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
return self.mp_result_error(result, stdout, err_msg)
if tmp_files:
msg = (f'\n\n'
f'Warning -- {test_name} leaked temporary files '
f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}')
stdout += msg
if isinstance(result, Passed):
result = EnvChanged.from_passed(result)
result.set_env_changed()
return MultiprocessResult(result, stdout, err_msg)
return MultiprocessResult(result, stdout)
def run(self) -> None:
while not self._stopped:
@ -347,7 +349,9 @@ class TestWorkerProcess(threading.Thread):
except StopIteration:
break
self.start_time = time.monotonic()
mp_result = self._runtest(test_name)
mp_result.result.duration = time.monotonic() - self.start_time
self.output.put((False, mp_result))
if must_stop(mp_result.result, self.ns):
@ -473,11 +477,11 @@ class MultiprocessTestRunner:
result = mp_result.result
text = str(result)
if mp_result.error_msg is not None:
# CHILD_ERROR
text += ' (%s)' % mp_result.error_msg
elif (result.duration_sec >= PROGRESS_MIN_TIME and not self.ns.pgo):
text += ' (%s)' % format_duration(result.duration_sec)
if mp_result.err_msg:
# MULTIPROCESSING_ERROR
text += ' (%s)' % mp_result.err_msg
elif (result.duration >= PROGRESS_MIN_TIME and not self.ns.pgo):
text += ' (%s)' % format_duration(result.duration)
running = get_running(self.workers)
if running and not self.ns.pgo:
text += ' -- running: %s' % ', '.join(running)
@ -489,7 +493,7 @@ class MultiprocessTestRunner:
# Thread got an exception
format_exc = item[1]
print_warning(f"regrtest worker thread failed: {format_exc}")
result = ChildError("<regrtest worker>")
result = TestResult("<regrtest worker>", state=State.MULTIPROCESSING_ERROR)
self.regrtest.accumulate_result(result)
return True
@ -498,8 +502,8 @@ class MultiprocessTestRunner:
self.regrtest.accumulate_result(mp_result.result)
self.display_result(mp_result)
if mp_result.stdout:
print(mp_result.stdout, flush=True)
if mp_result.worker_stdout:
print(mp_result.worker_stdout, flush=True)
if must_stop(mp_result.result, self.ns):
return True
@ -541,32 +545,20 @@ class EncodeTestResult(json.JSONEncoder):
def default(self, o: Any) -> dict[str, Any]:
if isinstance(o, TestResult):
result = vars(o)
result = dataclasses.asdict(o)
result["__test_result__"] = o.__class__.__name__
return result
return super().default(o)
def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]:
def decode_test_result(d: dict[str, Any]) -> TestResult | TestStats | dict[str, Any]:
"""Decode a TestResult (sub)class object from a JSON dict."""
if "__test_result__" not in d:
return d
cls_name = d.pop("__test_result__")
for cls in get_all_test_result_classes():
if cls.__name__ == cls_name:
return cls(**d)
def get_all_test_result_classes() -> set[type[TestResult]]:
prev_count = 0
classes = {TestResult}
while len(classes) > prev_count:
prev_count = len(classes)
to_add = []
for cls in classes:
to_add.extend(cls.__subclasses__())
classes.update(to_add)
return classes
d.pop('__test_result__')
if d['stats'] is not None:
d['stats'] = TestStats(**d['stats'])
return TestResult(**d)

View file

@ -23,7 +23,7 @@ class SkipTestEnvironment(Exception):
class saved_test_environment:
"""Save bits of the test environment and restore them at block exit.
with saved_test_environment(testname, verbose, quiet):
with saved_test_environment(test_name, verbose, quiet):
#stuff
Unless quiet is True, a warning is printed to stderr if any of
@ -34,8 +34,8 @@ class saved_test_environment:
items is also printed.
"""
def __init__(self, testname, verbose=0, quiet=False, *, pgo=False):
self.testname = testname
def __init__(self, test_name, verbose=0, quiet=False, *, pgo=False):
self.test_name = test_name
self.verbose = verbose
self.quiet = quiet
self.pgo = pgo
@ -323,7 +323,7 @@ class saved_test_environment:
restore(original)
if not self.quiet and not self.pgo:
print_warning(
f"{name} was modified by {self.testname}\n"
f"{name} was modified by {self.test_name}\n"
f" Before: {original}\n"
f" After: {current} ")
return False

View file

@ -4,6 +4,7 @@ if __name__ != 'test.support':
raise ImportError('support must be imported from the test package')
import contextlib
import dataclasses
import functools
import getpass
import opcode
@ -118,18 +119,21 @@ class Error(Exception):
class TestFailed(Error):
"""Test failed."""
class TestFailedWithDetails(TestFailed):
"""Test failed."""
def __init__(self, msg, errors, failures):
def __init__(self, msg, *args, stats=None):
self.msg = msg
self.errors = errors
self.failures = failures
super().__init__(msg, errors, failures)
self.stats = stats
super().__init__(msg, *args)
def __str__(self):
return self.msg
class TestFailedWithDetails(TestFailed):
"""Test failed."""
def __init__(self, msg, errors, failures, stats):
self.errors = errors
self.failures = failures
super().__init__(msg, errors, failures, stats=stats)
class TestDidNotRun(Error):
"""Test did not run any subtests."""
@ -1108,6 +1112,29 @@ def _filter_suite(suite, pred):
newtests.append(test)
suite._tests = newtests
@dataclasses.dataclass(slots=True)
class TestStats:
tests_run: int = 0
failures: int = 0
skipped: int = 0
@staticmethod
def from_unittest(result):
return TestStats(result.testsRun,
len(result.failures),
len(result.skipped))
@staticmethod
def from_doctest(results):
return TestStats(results.attempted,
results.failed)
def accumulate(self, stats):
self.tests_run += stats.tests_run
self.failures += stats.failures
self.skipped += stats.skipped
def _run_suite(suite):
"""Run tests from a unittest.TestSuite-derived class."""
runner = get_test_runner(sys.stdout,
@ -1122,6 +1149,7 @@ def _run_suite(suite):
if not result.testsRun and not result.skipped and not result.errors:
raise TestDidNotRun
if not result.wasSuccessful():
stats = TestStats.from_unittest(result)
if len(result.errors) == 1 and not result.failures:
err = result.errors[0][1]
elif len(result.failures) == 1 and not result.errors:
@ -1131,7 +1159,8 @@ def _run_suite(suite):
if not verbose: err += "; run in verbose mode for details"
errors = [(str(tc), exc_str) for tc, exc_str in result.errors]
failures = [(str(tc), exc_str) for tc, exc_str in result.failures]
raise TestFailedWithDetails(err, errors, failures)
raise TestFailedWithDetails(err, errors, failures, stats=stats)
return result
# By default, don't filter tests
@ -1240,7 +1269,7 @@ def run_unittest(*classes):
else:
suite.addTest(loader.loadTestsFromTestCase(cls))
_filter_suite(suite, match_test)
_run_suite(suite)
return _run_suite(suite)
#=======================================================================
# Check for the presence of docstrings.
@ -1280,13 +1309,18 @@ def run_doctest(module, verbosity=None, optionflags=0):
else:
verbosity = None
f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)
if f:
raise TestFailed("%d of %d doctests failed" % (f, t))
results = doctest.testmod(module,
verbose=verbosity,
optionflags=optionflags)
if results.failed:
stats = TestStats.from_doctest(results)
raise TestFailed(f"{results.failed} of {results.attempted} "
f"doctests failed",
stats=stats)
if verbose:
print('doctest (%s) ... %d tests with zero failures' %
(module.__name__, t))
return f, t
(module.__name__, results.attempted))
return results
#=======================================================================

View file

@ -309,7 +309,7 @@ class NetrcTestCase(unittest.TestCase):
('anonymous', '', 'pass'))
def test_main():
run_unittest(NetrcTestCase)
return run_unittest(NetrcTestCase)
if __name__ == "__main__":
test_main()

View file

@ -320,7 +320,7 @@ __test__ = {'doctests' : doctests}
def test_main(verbose=False):
from test import support
from test import test_pep646_syntax
support.run_doctest(test_pep646_syntax, verbose)
return support.run_doctest(test_pep646_syntax, verbose)
if __name__ == "__main__":
test_main(verbose=True)

View file

@ -19,7 +19,7 @@ import textwrap
import unittest
from test import libregrtest
from test import support
from test.support import os_helper
from test.support import os_helper, TestStats
from test.libregrtest import utils, setup
if not support.has_subprocess_support:
@ -409,7 +409,9 @@ class BaseTestCase(unittest.TestCase):
self.fail("%r not found in %r" % (regex, output))
return match
def check_line(self, output, regex):
def check_line(self, output, regex, full=False):
if full:
regex += '\n'
regex = re.compile(r'^' + regex, re.MULTILINE)
self.assertRegex(output, regex)
@ -421,21 +423,27 @@ class BaseTestCase(unittest.TestCase):
def check_executed_tests(self, output, tests, skipped=(), failed=(),
env_changed=(), omitted=(),
rerun={}, no_test_ran=(),
rerun={}, run_no_tests=(),
resource_denied=(),
randomize=False, interrupted=False,
fail_env_changed=False):
fail_env_changed=False,
*, stats):
if isinstance(tests, str):
tests = [tests]
if isinstance(skipped, str):
skipped = [skipped]
if isinstance(resource_denied, str):
resource_denied = [resource_denied]
if isinstance(failed, str):
failed = [failed]
if isinstance(env_changed, str):
env_changed = [env_changed]
if isinstance(omitted, str):
omitted = [omitted]
if isinstance(no_test_ran, str):
no_test_ran = [no_test_ran]
if isinstance(run_no_tests, str):
run_no_tests = [run_no_tests]
if isinstance(stats, int):
stats = TestStats(stats)
executed = self.parse_executed_tests(output)
if randomize:
@ -479,12 +487,12 @@ class BaseTestCase(unittest.TestCase):
regex = LOG_PREFIX + f"Re-running {name} in verbose mode \\(matching: {match}\\)"
self.check_line(output, regex)
if no_test_ran:
regex = list_regex('%s test%s run no tests', no_test_ran)
if run_no_tests:
regex = list_regex('%s test%s run no tests', run_no_tests)
self.check_line(output, regex)
good = (len(tests) - len(skipped) - len(failed)
- len(omitted) - len(env_changed) - len(no_test_ran))
- len(omitted) - len(env_changed) - len(run_no_tests))
if good:
regex = r'%s test%s OK\.$' % (good, plural(good))
if not skipped and not failed and good > 1:
@ -494,6 +502,33 @@ class BaseTestCase(unittest.TestCase):
if interrupted:
self.check_line(output, 'Test suite interrupted by signal SIGINT.')
# Total tests
parts = [f'run={stats.tests_run:,}']
if stats.failures:
parts.append(f'failures={stats.failures:,}')
if stats.skipped:
parts.append(f'skipped={stats.skipped:,}')
line = fr'Total tests: {" ".join(parts)}'
self.check_line(output, line, full=True)
# Total test files
report = [f'success={good}']
if failed:
report.append(f'failed={len(failed)}')
if env_changed:
report.append(f'env_changed={len(env_changed)}')
if skipped:
report.append(f'skipped={len(skipped)}')
if resource_denied:
report.append(f'resource_denied={len(resource_denied)}')
if rerun:
report.append(f'rerun={len(rerun)}')
if run_no_tests:
report.append(f'run_no_tests={len(run_no_tests)}')
line = fr'Total test files: {" ".join(report)}'
self.check_line(output, line, full=True)
# Result
result = []
if failed:
result.append('FAILURE')
@ -508,10 +543,8 @@ class BaseTestCase(unittest.TestCase):
result.append('SUCCESS')
result = ', '.join(result)
if rerun:
self.check_line(output, 'Tests result: FAILURE')
result = 'FAILURE then %s' % result
self.check_line(output, 'Tests result: %s' % result)
self.check_line(output, f'Result: {result}', full=True)
def parse_random_seed(self, output):
match = self.regex_search(r'Using random seed ([0-9]+)', output)
@ -600,7 +633,8 @@ class ProgramsTestCase(BaseTestCase):
def check_output(self, output):
self.parse_random_seed(output)
self.check_executed_tests(output, self.tests, randomize=True)
self.check_executed_tests(output, self.tests,
randomize=True, stats=len(self.tests))
def run_tests(self, args):
output = self.run_python(args)
@ -714,7 +748,8 @@ class ArgsTestCase(BaseTestCase):
tests = [test_ok, test_failing]
output = self.run_tests(*tests, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, tests, failed=test_failing)
self.check_executed_tests(output, tests, failed=test_failing,
stats=TestStats(2, 1))
def test_resources(self):
# test -u command line option
@ -733,17 +768,21 @@ class ArgsTestCase(BaseTestCase):
# -u all: 2 resources enabled
output = self.run_tests('-u', 'all', *test_names)
self.check_executed_tests(output, test_names)
self.check_executed_tests(output, test_names, stats=2)
# -u audio: 1 resource enabled
output = self.run_tests('-uaudio', *test_names)
self.check_executed_tests(output, test_names,
skipped=tests['network'])
skipped=tests['network'],
resource_denied=tests['network'],
stats=1)
# no option: 0 resources enabled
output = self.run_tests(*test_names)
self.check_executed_tests(output, test_names,
skipped=test_names)
skipped=test_names,
resource_denied=test_names,
stats=0)
def test_random(self):
# test -r and --randseed command line option
@ -791,7 +830,8 @@ class ArgsTestCase(BaseTestCase):
previous = name
output = self.run_tests('--fromfile', filename)
self.check_executed_tests(output, tests)
stats = len(tests)
self.check_executed_tests(output, tests, stats=stats)
# test format '[2/7] test_opcodes'
with open(filename, "w") as fp:
@ -799,7 +839,7 @@ class ArgsTestCase(BaseTestCase):
print("[%s/%s] %s" % (index, len(tests), name), file=fp)
output = self.run_tests('--fromfile', filename)
self.check_executed_tests(output, tests)
self.check_executed_tests(output, tests, stats=stats)
# test format 'test_opcodes'
with open(filename, "w") as fp:
@ -807,7 +847,7 @@ class ArgsTestCase(BaseTestCase):
print(name, file=fp)
output = self.run_tests('--fromfile', filename)
self.check_executed_tests(output, tests)
self.check_executed_tests(output, tests, stats=stats)
# test format 'Lib/test/test_opcodes.py'
with open(filename, "w") as fp:
@ -815,20 +855,20 @@ class ArgsTestCase(BaseTestCase):
print('Lib/test/%s.py' % name, file=fp)
output = self.run_tests('--fromfile', filename)
self.check_executed_tests(output, tests)
self.check_executed_tests(output, tests, stats=stats)
def test_interrupted(self):
code = TEST_INTERRUPTED
test = self.create_test('sigint', code=code)
output = self.run_tests(test, exitcode=EXITCODE_INTERRUPTED)
self.check_executed_tests(output, test, omitted=test,
interrupted=True)
interrupted=True, stats=0)
def test_slowest(self):
# test --slowest
tests = [self.create_test() for index in range(3)]
output = self.run_tests("--slowest", *tests)
self.check_executed_tests(output, tests)
self.check_executed_tests(output, tests, stats=len(tests))
regex = ('10 slowest tests:\n'
'(?:- %s: .*\n){%s}'
% (self.TESTNAME_REGEX, len(tests)))
@ -847,7 +887,8 @@ class ArgsTestCase(BaseTestCase):
args = ("--slowest", test)
output = self.run_tests(*args, exitcode=EXITCODE_INTERRUPTED)
self.check_executed_tests(output, test,
omitted=test, interrupted=True)
omitted=test, interrupted=True,
stats=0)
regex = ('10 slowest tests:\n')
self.check_line(output, regex)
@ -856,7 +897,7 @@ class ArgsTestCase(BaseTestCase):
# test --coverage
test = self.create_test('coverage')
output = self.run_tests("--coverage", test)
self.check_executed_tests(output, [test])
self.check_executed_tests(output, [test], stats=1)
regex = (r'lines +cov% +module +\(path\)\n'
r'(?: *[0-9]+ *[0-9]{1,2}% *[^ ]+ +\([^)]+\)+)+')
self.check_line(output, regex)
@ -886,7 +927,8 @@ class ArgsTestCase(BaseTestCase):
""")
test = self.create_test('forever', code=code)
output = self.run_tests('--forever', test, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [test]*3, failed=test)
self.check_executed_tests(output, [test]*3, failed=test,
stats=TestStats(1, 1))
def check_leak(self, code, what):
test = self.create_test('huntrleaks', code=code)
@ -896,7 +938,7 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests('--huntrleaks', '3:3:', test,
exitcode=EXITCODE_BAD_TEST,
stderr=subprocess.STDOUT)
self.check_executed_tests(output, [test], failed=test)
self.check_executed_tests(output, [test], failed=test, stats=1)
line = 'beginning 6 repetitions\n123456\n......\n'
self.check_line(output, re.escape(line))
@ -978,7 +1020,7 @@ class ArgsTestCase(BaseTestCase):
tests = [crash_test]
output = self.run_tests("-j2", *tests, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, tests, failed=crash_test,
randomize=True)
randomize=True, stats=0)
def parse_methods(self, output):
regex = re.compile("^(test[^ ]+).*ok$", flags=re.MULTILINE)
@ -1073,13 +1115,14 @@ class ArgsTestCase(BaseTestCase):
# don't fail by default
output = self.run_tests(testname)
self.check_executed_tests(output, [testname], env_changed=testname)
self.check_executed_tests(output, [testname],
env_changed=testname, stats=1)
# fail with --fail-env-changed
output = self.run_tests("--fail-env-changed", testname,
exitcode=EXITCODE_ENV_CHANGED)
self.check_executed_tests(output, [testname], env_changed=testname,
fail_env_changed=True)
fail_env_changed=True, stats=1)
def test_rerun_fail(self):
# FAILURE then FAILURE
@ -1098,7 +1141,9 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [testname],
failed=testname, rerun={testname: "test_fail_always"})
failed=testname,
rerun={testname: "test_fail_always"},
stats=TestStats(1, 1))
def test_rerun_success(self):
# FAILURE then SUCCESS
@ -1119,7 +1164,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=0)
self.check_executed_tests(output, [testname],
rerun={testname: "test_fail_once"})
rerun={testname: "test_fail_once"},
stats=1)
def test_rerun_setup_class_hook_failure(self):
# FAILURE then FAILURE
@ -1139,7 +1185,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
rerun={testname: "ExampleTests"})
rerun={testname: "ExampleTests"},
stats=0)
def test_rerun_teardown_class_hook_failure(self):
# FAILURE then FAILURE
@ -1159,7 +1206,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
rerun={testname: "ExampleTests"})
rerun={testname: "ExampleTests"},
stats=1)
def test_rerun_setup_module_hook_failure(self):
# FAILURE then FAILURE
@ -1178,7 +1226,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
rerun={testname: testname})
rerun={testname: testname},
stats=0)
def test_rerun_teardown_module_hook_failure(self):
# FAILURE then FAILURE
@ -1197,7 +1246,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
rerun={testname: testname})
rerun={testname: testname},
stats=1)
def test_rerun_setup_hook_failure(self):
# FAILURE then FAILURE
@ -1216,7 +1266,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
rerun={testname: "test_success"})
rerun={testname: "test_success"},
stats=1)
def test_rerun_teardown_hook_failure(self):
# FAILURE then FAILURE
@ -1235,7 +1286,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
rerun={testname: "test_success"})
rerun={testname: "test_success"},
stats=1)
def test_rerun_async_setup_hook_failure(self):
# FAILURE then FAILURE
@ -1254,7 +1306,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
rerun={testname: "test_success"})
rerun={testname: "test_success"},
stats=1)
def test_rerun_async_teardown_hook_failure(self):
# FAILURE then FAILURE
@ -1273,7 +1326,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
rerun={testname: "test_success"})
rerun={testname: "test_success"},
stats=1)
def test_no_tests_ran(self):
code = textwrap.dedent("""
@ -1287,7 +1341,9 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests(testname, "-m", "nosuchtest",
exitcode=EXITCODE_NO_TESTS_RAN)
self.check_executed_tests(output, [testname], no_test_ran=testname)
self.check_executed_tests(output, [testname],
run_no_tests=testname,
stats=0)
def test_no_tests_ran_skip(self):
code = textwrap.dedent("""
@ -1300,7 +1356,8 @@ class ArgsTestCase(BaseTestCase):
testname = self.create_test(code=code)
output = self.run_tests(testname)
self.check_executed_tests(output, [testname])
self.check_executed_tests(output, [testname],
stats=TestStats(1, skipped=1))
def test_no_tests_ran_multiple_tests_nonexistent(self):
code = textwrap.dedent("""
@ -1316,7 +1373,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests(testname, testname2, "-m", "nosuchtest",
exitcode=EXITCODE_NO_TESTS_RAN)
self.check_executed_tests(output, [testname, testname2],
no_test_ran=[testname, testname2])
run_no_tests=[testname, testname2],
stats=0)
def test_no_test_ran_some_test_exist_some_not(self):
code = textwrap.dedent("""
@ -1339,7 +1397,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests(testname, testname2, "-m", "nosuchtest",
"-m", "test_other_bug", exitcode=0)
self.check_executed_tests(output, [testname, testname2],
no_test_ran=[testname])
run_no_tests=[testname],
stats=1)
@support.cpython_only
def test_uncollectable(self):
@ -1366,7 +1425,8 @@ class ArgsTestCase(BaseTestCase):
exitcode=EXITCODE_ENV_CHANGED)
self.check_executed_tests(output, [testname],
env_changed=[testname],
fail_env_changed=True)
fail_env_changed=True,
stats=1)
def test_multiprocessing_timeout(self):
code = textwrap.dedent(r"""
@ -1392,7 +1452,7 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-j2", "--timeout=1.0", testname,
exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [testname],
failed=testname)
failed=testname, stats=0)
self.assertRegex(output,
re.compile('%s timed out' % testname, re.MULTILINE))
@ -1426,7 +1486,8 @@ class ArgsTestCase(BaseTestCase):
exitcode=EXITCODE_ENV_CHANGED)
self.check_executed_tests(output, [testname],
env_changed=[testname],
fail_env_changed=True)
fail_env_changed=True,
stats=1)
self.assertIn("Warning -- Unraisable exception", output)
self.assertIn("Exception: weakref callback bug", output)
@ -1458,7 +1519,8 @@ class ArgsTestCase(BaseTestCase):
exitcode=EXITCODE_ENV_CHANGED)
self.check_executed_tests(output, [testname],
env_changed=[testname],
fail_env_changed=True)
fail_env_changed=True,
stats=1)
self.assertIn("Warning -- Uncaught thread exception", output)
self.assertIn("Exception: bug in thread", output)
@ -1499,7 +1561,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests(*cmd, exitcode=EXITCODE_ENV_CHANGED)
self.check_executed_tests(output, [testname],
env_changed=[testname],
fail_env_changed=True)
fail_env_changed=True,
stats=1)
self.assertRegex(output, regex)
def test_unicode_guard_env(self):
@ -1546,7 +1609,8 @@ class ArgsTestCase(BaseTestCase):
self.check_executed_tests(output, testnames,
env_changed=testnames,
fail_env_changed=True,
randomize=True)
randomize=True,
stats=len(testnames))
for testname in testnames:
self.assertIn(f"Warning -- {testname} leaked temporary "
f"files (1): mytmpfile",
@ -1585,7 +1649,47 @@ class ArgsTestCase(BaseTestCase):
exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [testname],
failed=[testname],
randomize=True)
randomize=True,
stats=0)
def test_doctest(self):
code = textwrap.dedent(fr'''
import doctest
import sys
from test import support
def my_function():
"""
Pass:
>>> 1 + 1
2
Failure:
>>> 2 + 3
23
>>> 1 + 1
11
Skipped test (ignored):
>>> id(1.0) # doctest: +SKIP
7948648
"""
def test_main():
testmod = sys.modules[__name__]
return support.run_doctest(testmod)
''')
testname = self.create_test(code=code)
output = self.run_tests("--fail-env-changed", "-v", "-j1", testname,
exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [testname],
failed=[testname],
randomize=True,
stats=TestStats(3, 2, 0))
class TestUtils(unittest.TestCase):

View file

@ -4250,7 +4250,7 @@ def test_main(module=None):
old_factories = None
try:
support.run_unittest(*test_classes)
return support.run_unittest(*test_classes)
finally:
from xml.etree import ElementPath
# Restore mapping and path cache