gh-108834: regrtest reruns failed tests in subprocesses (#108839)

When using the --rerun option, regrtest now re-runs failed tests
in verbose mode in fresh worker processes for more deterministic
behavior, so it can write its final report even if a test kills
a worker process.
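
The resulting control flow, condensed from action_run_tests() in the
diff below:

    def action_run_tests(self):
        self.run_tests()
        self.display_result()

        need_rerun = self.need_rerun
        if self.ns.rerun and need_rerun:
            # Each failed test is re-run in a fresh worker process.
            self.rerun_failed_tests(need_rerun)

        self.display_summary()
        self.finalize()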

Add --fail-rerun option to regrtest: exit with a non-zero exit code
if a test failed and then passed when re-run in verbose mode (in a
fresh process). That's now more useful since tests can pass when
re-run in a fresh worker process, whereas they fail when run
sequentially after other tests.
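
The corresponding exit-code logic, as it appears in get_exitcode()
in the diff below (the --fail-rerun case is the final elif):

    def get_exitcode(self):
        exitcode = 0
        if self.bad:
            exitcode = EXITCODE_BAD_TEST
        elif self.interrupted:
            exitcode = EXITCODE_INTERRUPTED
        elif self.ns.fail_env_changed and self.environment_changed:
            exitcode = EXITCODE_ENV_CHANGED
        elif self.no_tests_run():
            exitcode = EXITCODE_NO_TESTS_RAN
        elif self.rerun and self.ns.fail_rerun:
            exitcode = EXITCODE_BAD_TEST
        return exitcode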

Rename the --verbose2 option (-w) to --rerun. Keep --verbose2 as a
deprecated alias.
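
A minimal sketch of such a rename with a deprecated alias, assuming
plain argparse; the actual change lives in test.libregrtest.cmdline,
which is not part of this diff:

    import argparse

    parser = argparse.ArgumentParser()
    # New canonical option name.
    parser.add_argument('--rerun', action='store_true', dest='rerun',
                        help='re-run failed tests in verbose mode')
    # Deprecated alias kept for backward compatibility (sketch only).
    parser.add_argument('-w', '--verbose2', action='store_true',
                        dest='rerun', help=argparse.SUPPRESS)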

Changes:

* Fix and enhance statistics in regrtest summary. Add "(filtered)"
  when --match and/or --ignore options are used.
* Add RunTests class.
* Add TestResult.get_rerun_match_tests() method.
* Rewrite code to serialize/deserialize worker arguments as JSON
  using a new WorkerJob class (see the sketch after this list).
* Fix stats when a test is run with --forever --rerun.
* If failed test names cannot be parsed, log a warning and don't
  filter tests.
* test_regrtest.test_rerun_success() now uses a marker file, since
  the test is re-run in a separate process.
* Add tests on normalize_test_name() function.
* Add test_success() and test_skip() tests to test_regrtest.
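
A minimal sketch of the JSON round-trip for worker arguments; the
field names and helper functions here are illustrative, not the
actual WorkerJob API from test.libregrtest.runtest_mp:

    import dataclasses
    import json

    @dataclasses.dataclass
    class WorkerJob:
        # Illustrative fields; the real class carries more state.
        test_name: str
        rerun: bool = False
        match_tests: list[str] | None = None

    def worker_job_to_json(job: WorkerJob) -> str:
        # Serialize the job to pass it to a worker process.
        return json.dumps(dataclasses.asdict(job))

    def worker_job_from_json(data: str) -> WorkerJob:
        # Reconstruct the job on the worker side.
        return WorkerJob(**json.loads(data))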
Author: Victor Stinner, 2023-09-03 23:37:15 +02:00, committed by GitHub
Commit: 31c2945f14 (parent: c2ec174d24)
12 changed files with 819 additions and 478 deletions


@@ -11,11 +11,11 @@ import time
import unittest
from test.libregrtest.cmdline import _parse_args
from test.libregrtest.runtest import (
findtests, split_test_packages, runtest, get_abs_module,
PROGRESS_MIN_TIME, State)
findtests, split_test_packages, runtest, abs_module_name,
PROGRESS_MIN_TIME, State, MatchTestsDict, RunTests)
from test.libregrtest.setup import setup_tests
from test.libregrtest.pgo import setup_pgo_tests
from test.libregrtest.utils import (removepy, count, format_duration,
from test.libregrtest.utils import (strip_py_suffix, count, format_duration,
printlist, get_build_info)
from test import support
from test.support import TestStats
@@ -28,14 +28,6 @@ from test.support import threading_helper
# Must be smaller than buildbot "1200 seconds without output" limit.
EXIT_TIMEOUT = 120.0
# gh-90681: When rerunning tests, we might need to rerun the whole
# class or module suite if some of its life-cycle hooks fail.
# Test level hooks are not affected.
_TEST_LIFECYCLE_HOOKS = frozenset((
'setUpClass', 'tearDownClass',
'setUpModule', 'tearDownModule',
))
EXITCODE_BAD_TEST = 2
EXITCODE_INTERRUPTED = 130
EXITCODE_ENV_CHANGED = 3
@@ -72,19 +64,22 @@ class Regrtest:
# tests
self.tests = []
self.selected = []
self.all_runtests: list[RunTests] = []
# test results
self.good = []
self.bad = []
self.skipped = []
self.resource_denied = []
self.environment_changed = []
self.run_no_tests = []
self.need_rerun = []
self.rerun = []
self.first_result = None
self.good: list[str] = []
self.bad: list[str] = []
self.rerun_bad: list[str] = []
self.skipped: list[str] = []
self.resource_denied: list[str] = []
self.environment_changed: list[str] = []
self.run_no_tests: list[str] = []
self.rerun: list[str] = []
self.need_rerun: list[TestResult] = []
self.first_state: str | None = None
self.interrupted = False
self.stats_dict: dict[str, TestStats] = {}
self.total_stats = TestStats()
# used by --slow
self.test_times = []
@@ -94,7 +89,7 @@ class Regrtest:
# used to display the progress bar "[ 3/100]"
self.start_time = time.perf_counter()
self.test_count = ''
self.test_count_text = ''
self.test_count_width = 1
# used by --single
@@ -107,7 +102,6 @@ class Regrtest:
# misc
self.win_load_tracker = None
self.tmp_dir = None
self.worker_test_name = None
def get_executed(self):
return (set(self.good) | set(self.bad) | set(self.skipped)
@@ -115,11 +109,9 @@ class Regrtest:
| set(self.run_no_tests))
def accumulate_result(self, result, rerun=False):
fail_env_changed = self.ns.fail_env_changed
test_name = result.test_name
if result.has_meaningful_duration() and not rerun:
self.test_times.append((result.duration, test_name))
match result.state:
case State.PASSED:
self.good.append(test_name)
@@ -128,25 +120,24 @@ class Regrtest:
case State.SKIPPED:
self.skipped.append(test_name)
case State.RESOURCE_DENIED:
self.skipped.append(test_name)
self.resource_denied.append(test_name)
case State.INTERRUPTED:
self.interrupted = True
case State.DID_NOT_RUN:
self.run_no_tests.append(test_name)
case _:
if result.is_failed(self.ns.fail_env_changed):
if not rerun:
self.bad.append(test_name)
self.need_rerun.append(result)
if result.is_failed(fail_env_changed):
self.bad.append(test_name)
self.need_rerun.append(result)
else:
raise ValueError(f"invalid test state: {state!r}")
raise ValueError(f"invalid test state: {result.state!r}")
if result.has_meaningful_duration() and not rerun:
self.test_times.append((result.duration, test_name))
if result.stats is not None:
self.stats_dict[result.test_name] = result.stats
if rerun and not(result.is_failed(False) or result.state == State.INTERRUPTED):
self.bad.remove(test_name)
self.total_stats.accumulate(result.stats)
if rerun:
self.rerun.append(test_name)
xml_data = result.xml_data
if xml_data:
@@ -180,13 +171,15 @@ class Regrtest:
print(line, flush=True)
def display_progress(self, test_index, text):
if self.ns.quiet:
quiet = self.ns.quiet
pgo = self.ns.pgo
if quiet:
return
# "[ 51/405/1] test_tcl passed"
line = f"{test_index:{self.test_count_width}}{self.test_count}"
line = f"{test_index:{self.test_count_width}}{self.test_count_text}"
fails = len(self.bad) + len(self.environment_changed)
if fails and not self.ns.pgo:
if fails and not pgo:
line = f"{line}/{fails}"
self.log(f"[{line}] {text}")
@@ -196,15 +189,7 @@ class Regrtest:
if ns.xmlpath:
support.junit_xml_list = self.testsuite_xml = []
worker_args = ns.worker_args
if worker_args is not None:
from test.libregrtest.runtest_mp import parse_worker_args
ns, test_name = parse_worker_args(ns.worker_args)
ns.worker_args = worker_args
self.worker_test_name = test_name
# Strip .py extensions.
removepy(ns.args)
strip_py_suffix(ns.args)
if ns.huntrleaks:
warmup, repetitions, _ = ns.huntrleaks
@@ -221,9 +206,18 @@ class Regrtest:
self.ns = ns
def find_tests(self, tests):
ns = self.ns
single = ns.single
fromfile = ns.fromfile
pgo = ns.pgo
exclude = ns.exclude
test_dir = ns.testdir
starting_test = ns.start
randomize = ns.randomize
self.tests = tests
if self.ns.single:
if single:
self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
try:
with open(self.next_single_filename, 'r') as fp:
@@ -232,12 +226,12 @@ class Regrtest:
except OSError:
pass
if self.ns.fromfile:
if fromfile:
self.tests = []
# regex to match 'test_builtin' in line:
# '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec'
regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
with open(os.path.join(os_helper.SAVEDCWD, self.ns.fromfile)) as fp:
with open(os.path.join(os_helper.SAVEDCWD, fromfile)) as fp:
for line in fp:
line = line.split('#', 1)[0]
line = line.strip()
@@ -245,22 +239,22 @@ class Regrtest:
if match is not None:
self.tests.append(match.group())
removepy(self.tests)
strip_py_suffix(self.tests)
if self.ns.pgo:
if pgo:
# add default PGO tests if no tests are specified
setup_pgo_tests(self.ns)
setup_pgo_tests(ns)
exclude = set()
if self.ns.exclude:
for arg in self.ns.args:
exclude.add(arg)
self.ns.args = []
exclude_tests = set()
if exclude:
for arg in ns.args:
exclude_tests.add(arg)
ns.args = []
alltests = findtests(testdir=self.ns.testdir, exclude=exclude)
alltests = findtests(testdir=test_dir, exclude=exclude_tests)
if not self.ns.fromfile:
self.selected = self.tests or self.ns.args
if not fromfile:
self.selected = self.tests or ns.args
if self.selected:
self.selected = split_test_packages(self.selected)
else:
@@ -268,7 +262,7 @@ class Regrtest:
else:
self.selected = self.tests
if self.ns.single:
if single:
self.selected = self.selected[:1]
try:
pos = alltests.index(self.selected[0])
@@ -277,17 +271,17 @@ class Regrtest:
pass
# Remove all the selected tests that precede start if it's set.
if self.ns.start:
if starting_test:
try:
del self.selected[:self.selected.index(self.ns.start)]
del self.selected[:self.selected.index(starting_test)]
except ValueError:
print("Couldn't find starting test (%s), using all tests"
% self.ns.start, file=sys.stderr)
print(f"Cannot find starting test: {starting_test}")
sys.exit(1)
if self.ns.randomize:
if self.ns.random_seed is None:
self.ns.random_seed = random.randrange(10000000)
random.seed(self.ns.random_seed)
if randomize:
if ns.random_seed is None:
ns.random_seed = random.randrange(10000000)
random.seed(ns.random_seed)
random.shuffle(self.selected)
def list_tests(self):
@@ -305,25 +299,63 @@ class Regrtest:
print(test.id())
def list_cases(self):
ns = self.ns
test_dir = ns.testdir
support.verbose = False
support.set_match_tests(self.ns.match_tests, self.ns.ignore_tests)
support.set_match_tests(ns.match_tests, ns.ignore_tests)
skipped = []
for test_name in self.selected:
abstest = get_abs_module(self.ns, test_name)
module_name = abs_module_name(test_name, test_dir)
try:
suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
self._list_cases(suite)
except unittest.SkipTest:
self.skipped.append(test_name)
skipped.append(test_name)
if self.skipped:
print(file=sys.stderr)
print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr)
printlist(self.skipped, file=sys.stderr)
if skipped:
sys.stdout.flush()
stderr = sys.stderr
print(file=stderr)
print(count(len(skipped), "test"), "skipped:", file=stderr)
printlist(skipped, file=stderr)
def rerun_failed_tests(self):
self.log()
def get_rerun_match(self, rerun_list) -> MatchTestsDict:
rerun_match_tests = {}
for result in rerun_list:
match_tests = result.get_rerun_match_tests()
# ignore empty match list
if match_tests:
rerun_match_tests[result.test_name] = match_tests
return rerun_match_tests
def _rerun_failed_tests(self, need_rerun):
# Configure the runner to re-run tests
ns = self.ns
ns.verbose = True
ns.failfast = False
ns.verbose3 = False
ns.forever = False
if ns.use_mp is None:
ns.use_mp = 1
# Get tests to re-run
tests = [result.test_name for result in need_rerun]
match_tests = self.get_rerun_match(need_rerun)
self.set_tests(tests)
# Clear previously failed tests
self.rerun_bad.extend(self.bad)
self.bad.clear()
self.need_rerun.clear()
# Re-run failed tests
self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses")
runtests = RunTests(tests, match_tests=match_tests, rerun=True)
self.all_runtests.append(runtests)
self._run_tests_mp(runtests)
def rerun_failed_tests(self, need_rerun):
if self.ns.python:
# Temp patch for https://github.com/python/cpython/issues/94052
self.log(
@@ -332,45 +364,10 @@ class Regrtest:
)
return
self.ns.verbose = True
self.ns.failfast = False
self.ns.verbose3 = False
self.first_state = self.get_tests_state()
self.first_result = self.get_tests_result()
self.log("Re-running failed tests in verbose mode")
rerun_list = list(self.need_rerun)
self.need_rerun.clear()
for result in rerun_list:
test_name = result.test_name
self.rerun.append(test_name)
errors = result.errors or []
failures = result.failures or []
error_names = [
self.normalize_test_name(test_full_name, is_error=True)
for (test_full_name, *_) in errors]
failure_names = [
self.normalize_test_name(test_full_name)
for (test_full_name, *_) in failures]
self.ns.verbose = True
orig_match_tests = self.ns.match_tests
if errors or failures:
if self.ns.match_tests is None:
self.ns.match_tests = []
self.ns.match_tests.extend(error_names)
self.ns.match_tests.extend(failure_names)
matching = "matching: " + ", ".join(self.ns.match_tests)
self.log(f"Re-running {test_name} in verbose mode ({matching})")
else:
self.log(f"Re-running {test_name} in verbose mode")
result = runtest(self.ns, test_name)
self.ns.match_tests = orig_match_tests
self.accumulate_result(result, rerun=True)
if result.state == State.INTERRUPTED:
break
print()
self._rerun_failed_tests(need_rerun)
if self.bad:
print(count(len(self.bad), 'test'), "failed again:")
@@ -378,28 +375,17 @@ class Regrtest:
self.display_result()
def normalize_test_name(self, test_full_name, *, is_error=False):
short_name = test_full_name.split(" ")[0]
if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
# This means that we have a failure in a life-cycle hook,
# we need to rerun the whole module or class suite.
# Basically the error looks like this:
# ERROR: setUpClass (test.test_reg_ex.RegTest)
# or
# ERROR: setUpModule (test.test_reg_ex)
# So, we need to parse the class / module name.
lpar = test_full_name.index('(')
rpar = test_full_name.index(')')
return test_full_name[lpar + 1: rpar].split('.')[-1]
return short_name
def display_result(self):
pgo = self.ns.pgo
quiet = self.ns.quiet
print_slow = self.ns.print_slow
# If running the test suite for PGO then no one cares about results.
if self.ns.pgo:
if pgo:
return
print()
print("== Tests result: %s ==" % self.get_tests_result())
print("== Tests result: %s ==" % self.get_tests_state())
if self.interrupted:
print("Test suite interrupted by signal SIGINT.")
@@ -410,7 +396,7 @@ class Regrtest:
print(count(len(omitted), "test"), "omitted:")
printlist(omitted)
if self.good and not self.ns.quiet:
if self.good and not quiet:
print()
if (not self.bad
and not self.skipped
@@ -419,7 +405,7 @@ class Regrtest:
print("All", end=' ')
print(count(len(self.good), "test"), "OK.")
if self.ns.print_slow:
if print_slow:
self.test_times.sort(reverse=True)
print()
print("10 slowest tests:")
@@ -437,11 +423,16 @@ class Regrtest:
count(len(self.environment_changed), "test")))
printlist(self.environment_changed)
if self.skipped and not self.ns.quiet:
if self.skipped and not quiet:
print()
print(count(len(self.skipped), "test"), "skipped:")
printlist(self.skipped)
if self.resource_denied and not quiet:
print()
print(count(len(self.resource_denied), "test"), "skipped (resource denied):")
printlist(self.resource_denied)
if self.rerun:
print()
print("%s:" % count(len(self.rerun), "re-run test"))
@@ -452,40 +443,58 @@ class Regrtest:
print(count(len(self.run_no_tests), "test"), "run no tests:")
printlist(self.run_no_tests)
def run_tests_sequential(self):
if self.ns.trace:
def run_test(self, test_index, test_name, previous_test, save_modules):
text = test_name
if previous_test:
text = '%s -- %s' % (text, previous_test)
self.display_progress(test_index, text)
if self.tracer:
# If we're tracing code coverage, then we don't exit with status
# if on a false return value from main.
cmd = ('result = runtest(self.ns, test_name); '
'self.accumulate_result(result)')
ns = dict(locals())
self.tracer.runctx(cmd, globals=globals(), locals=ns)
result = ns['result']
else:
result = runtest(self.ns, test_name)
self.accumulate_result(result)
# Unload the newly imported modules (best effort finalization)
for module in sys.modules.keys():
if module not in save_modules and module.startswith("test."):
support.unload(module)
return result
def run_tests_sequentially(self, runtests):
ns = self.ns
coverage = ns.trace
fail_fast = ns.failfast
fail_env_changed = ns.fail_env_changed
timeout = ns.timeout
if coverage:
import trace
self.tracer = trace.Trace(trace=False, count=True)
save_modules = sys.modules.keys()
msg = "Run tests sequentially"
if self.ns.timeout:
msg += " (timeout: %s)" % format_duration(self.ns.timeout)
if timeout:
msg += " (timeout: %s)" % format_duration(timeout)
self.log(msg)
previous_test = None
for test_index, test_name in enumerate(self.tests, 1):
tests_iter = runtests.iter_tests()
for test_index, test_name in enumerate(tests_iter, 1):
start_time = time.perf_counter()
text = test_name
if previous_test:
text = '%s -- %s' % (text, previous_test)
self.display_progress(test_index, text)
result = self.run_test(test_index, test_name,
previous_test, save_modules)
if self.tracer:
# If we're tracing code coverage, then we don't exit with status
# if on a false return value from main.
cmd = ('result = runtest(self.ns, test_name); '
'self.accumulate_result(result)')
ns = dict(locals())
self.tracer.runctx(cmd, globals=globals(), locals=ns)
result = ns['result']
else:
result = runtest(self.ns, test_name)
self.accumulate_result(result)
if result.state == State.INTERRUPTED:
if result.must_stop(fail_fast, fail_env_changed):
break
previous_test = str(result)
@@ -496,26 +505,9 @@ class Regrtest:
# be quiet: say nothing if the test passed shortly
previous_test = None
# Unload the newly imported modules (best effort finalization)
for module in sys.modules.keys():
if module not in save_modules and module.startswith("test."):
support.unload(module)
if self.ns.failfast and result.is_failed(self.ns.fail_env_changed):
break
if previous_test:
print(previous_test)
def _test_forever(self, tests):
while True:
for test_name in tests:
yield test_name
if self.bad:
return
if self.ns.fail_env_changed and self.environment_changed:
return
def display_header(self):
# Print basic platform information
print("==", platform.python_implementation(), *sys.version.split())
@@ -560,11 +552,13 @@ class Regrtest:
return not any((self.good, self.bad, self.skipped, self.interrupted,
self.environment_changed))
def get_tests_result(self):
def get_tests_state(self):
fail_env_changed = self.ns.fail_env_changed
result = []
if self.bad:
result.append("FAILURE")
elif self.ns.fail_env_changed and self.environment_changed:
elif fail_env_changed and self.environment_changed:
result.append("ENV CHANGED")
elif self.no_tests_run():
result.append("NO TESTS RAN")
@@ -576,10 +570,40 @@ class Regrtest:
result.append("SUCCESS")
result = ', '.join(result)
if self.first_result:
result = '%s then %s' % (self.first_result, result)
if self.first_state:
result = '%s then %s' % (self.first_state, result)
return result
def _run_tests_mp(self, runtests: RunTests) -> None:
from test.libregrtest.runtest_mp import run_tests_multiprocess
# If we're on windows and this is the parent runner (not a worker),
# track the load average.
if sys.platform == 'win32':
from test.libregrtest.win_utils import WindowsLoadTracker
try:
self.win_load_tracker = WindowsLoadTracker()
except PermissionError as error:
# Standard accounts may not have access to the performance
# counters.
print(f'Failed to create WindowsLoadTracker: {error}')
try:
run_tests_multiprocess(self, runtests)
finally:
if self.win_load_tracker is not None:
self.win_load_tracker.close()
self.win_load_tracker = None
def set_tests(self, tests):
self.tests = tests
if self.ns.forever:
self.test_count_text = ''
self.test_count_width = 3
else:
self.test_count_text = '/{}'.format(len(self.tests))
self.test_count_width = len(self.test_count_text) - 1
def run_tests(self):
# For a partial run, we do not need to clutter the output.
if (self.ns.header
@@ -597,37 +621,14 @@ class Regrtest:
if self.ns.randomize:
print("Using random seed", self.ns.random_seed)
if self.ns.forever:
self.tests = self._test_forever(list(self.selected))
self.test_count = ''
self.test_count_width = 3
else:
self.tests = iter(self.selected)
self.test_count = '/{}'.format(len(self.selected))
self.test_count_width = len(self.test_count) - 1
tests = self.selected
self.set_tests(tests)
runtests = RunTests(tests, forever=self.ns.forever)
self.all_runtests.append(runtests)
if self.ns.use_mp:
from test.libregrtest.runtest_mp import run_tests_multiprocess
# If we're on windows and this is the parent runner (not a worker),
# track the load average.
if sys.platform == 'win32' and self.worker_test_name is None:
from test.libregrtest.win_utils import WindowsLoadTracker
try:
self.win_load_tracker = WindowsLoadTracker()
except PermissionError as error:
# Standard accounts may not have access to the performance
# counters.
print(f'Failed to create WindowsLoadTracker: {error}')
try:
run_tests_multiprocess(self)
finally:
if self.win_load_tracker is not None:
self.win_load_tracker.close()
self.win_load_tracker = None
self._run_tests_mp(runtests)
else:
self.run_tests_sequential()
self.run_tests_sequentially(runtests)
def finalize(self):
if self.next_single_filename:
@@ -642,23 +643,29 @@ class Regrtest:
r.write_results(show_missing=True, summary=True,
coverdir=self.ns.coverdir)
print()
self.display_summary()
if self.ns.runleaks:
os.system("leaks %d" % os.getpid())
self.save_xml_result()
def display_summary(self):
duration = time.perf_counter() - self.start_time
first_runtests = self.all_runtests[0]
# the second runtests (re-run failed tests) disables forever,
# use the first runtests
forever = first_runtests.forever
filtered = bool(self.ns.match_tests) or bool(self.ns.ignore_tests)
# Total duration
print()
print("Total duration: %s" % format_duration(duration))
# Total tests
total = TestStats()
for stats in self.stats_dict.values():
total.accumulate(stats)
stats = [f'run={total.tests_run:,}']
total = self.total_stats
text = f'run={total.tests_run:,}'
if filtered:
text = f"{text} (filtered)"
stats = [text]
if total.failures:
stats.append(f'failures={total.failures:,}')
if total.skipped:
@@ -666,23 +673,31 @@ class Regrtest:
print(f"Total tests: {' '.join(stats)}")
# Total test files
report = [f'success={len(self.good)}']
if self.bad:
report.append(f'failed={len(self.bad)}')
if self.environment_changed:
report.append(f'env_changed={len(self.environment_changed)}')
if self.skipped:
report.append(f'skipped={len(self.skipped)}')
if self.resource_denied:
report.append(f'resource_denied={len(self.resource_denied)}')
if self.rerun:
report.append(f'rerun={len(self.rerun)}')
if self.run_no_tests:
report.append(f'run_no_tests={len(self.run_no_tests)}')
all_tests = [self.good, self.bad, self.rerun,
self.skipped,
self.environment_changed, self.run_no_tests]
run = sum(map(len, all_tests))
text = f'run={run}'
if not forever:
ntest = len(first_runtests.tests)
text = f"{text}/{ntest}"
if filtered:
text = f"{text} (filtered)"
report = [text]
for name, tests in (
('failed', self.bad),
('env_changed', self.environment_changed),
('skipped', self.skipped),
('resource_denied', self.resource_denied),
('rerun', self.rerun),
('run_no_tests', self.run_no_tests),
):
if tests:
report.append(f'{name}={len(tests)}')
print(f"Total test files: {' '.join(report)}")
# Result
result = self.get_tests_result()
result = self.get_tests_state()
print(f"Result: {result}")
def save_xml_result(self):
@@ -742,6 +757,9 @@ class Regrtest:
self.tmp_dir = os.path.abspath(self.tmp_dir)
def is_worker(self):
return (self.ns.worker_args is not None)
def create_temp_dir(self):
os.makedirs(self.tmp_dir, exist_ok=True)
@@ -754,7 +772,8 @@ class Regrtest:
nounce = random.randint(0, 1_000_000)
else:
nounce = os.getpid()
if self.worker_test_name is not None:
if self.is_worker():
test_cwd = 'test_python_worker_{}'.format(nounce)
else:
test_cwd = 'test_python_{}'.format(nounce)
@@ -817,48 +836,53 @@ class Regrtest:
return None
def get_exitcode(self):
exitcode = 0
if self.bad:
exitcode = EXITCODE_BAD_TEST
elif self.interrupted:
exitcode = EXITCODE_INTERRUPTED
elif self.ns.fail_env_changed and self.environment_changed:
exitcode = EXITCODE_ENV_CHANGED
elif self.no_tests_run():
exitcode = EXITCODE_NO_TESTS_RAN
elif self.rerun and self.ns.fail_rerun:
exitcode = EXITCODE_BAD_TEST
return exitcode
def action_run_tests(self):
self.run_tests()
self.display_result()
need_rerun = self.need_rerun
if self.ns.rerun and need_rerun:
self.rerun_failed_tests(need_rerun)
self.display_summary()
self.finalize()
def _main(self, tests, kwargs):
if self.worker_test_name is not None:
if self.is_worker():
from test.libregrtest.runtest_mp import run_tests_worker
run_tests_worker(self.ns, self.worker_test_name)
run_tests_worker(self.ns.worker_args)
return
if self.ns.wait:
input("Press any key to continue...")
support.PGO = self.ns.pgo
support.PGO_EXTENDED = self.ns.pgo_extended
setup_tests(self.ns)
self.find_tests(tests)
exitcode = 0
if self.ns.list_tests:
self.list_tests()
sys.exit(0)
if self.ns.list_cases:
elif self.ns.list_cases:
self.list_cases()
sys.exit(0)
else:
self.action_run_tests()
exitcode = self.get_exitcode()
self.run_tests()
self.display_result()
if self.ns.verbose2 and self.bad:
self.rerun_failed_tests()
self.finalize()
self.save_xml_result()
if self.bad:
sys.exit(EXITCODE_BAD_TEST)
if self.interrupted:
sys.exit(EXITCODE_INTERRUPTED)
if self.ns.fail_env_changed and self.environment_changed:
sys.exit(EXITCODE_ENV_CHANGED)
if self.no_tests_run():
sys.exit(EXITCODE_NO_TESTS_RAN)
sys.exit(0)
sys.exit(exitcode)
def main(tests=None, **kwargs):