gh-109162: libregrtest: move code around (#109253)

* Move Regrtest.display_header() to utils.py.
* Move cleanup_temp_dir() to utils.py.
* Move list_cases() to findtests.py.
This commit is contained in:
Victor Stinner 2023-09-11 10:52:03 +02:00 committed by GitHub
parent a9b1f84790
commit c439f6a72d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 105 additions and 93 deletions

View file

@ -1,6 +1,12 @@
import os import os
import sys
import unittest
from .utils import StrPath, TestName, TestList from test import support
from .utils import (
StrPath, TestName, TestTuple, TestList, FilterTuple,
abs_module_name, count, printlist)
# If these test directories are encountered recurse into them and treat each # If these test directories are encountered recurse into them and treat each
@ -56,3 +62,37 @@ def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(),
else: else:
splitted.append(name) splitted.append(name)
return splitted return splitted
def _list_cases(suite):
for test in suite:
if isinstance(test, unittest.loader._FailedTest):
continue
if isinstance(test, unittest.TestSuite):
_list_cases(test)
elif isinstance(test, unittest.TestCase):
if support.match_test(test):
print(test.id())
def list_cases(tests: TestTuple, *,
match_tests: FilterTuple | None = None,
ignore_tests: FilterTuple | None = None,
test_dir: StrPath | None = None):
support.verbose = False
support.set_match_tests(match_tests, ignore_tests)
skipped = []
for test_name in tests:
module_name = abs_module_name(test_name, test_dir)
try:
suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
_list_cases(suite)
except unittest.SkipTest:
skipped.append(test_name)
if skipped:
sys.stdout.flush()
stderr = sys.stderr
print(file=stderr)
print(count(len(skipped), "test"), "skipped:", file=stderr)
printlist(skipped, file=stderr)

View file

@ -1,17 +1,14 @@
import locale
import os import os
import platform
import random import random
import re import re
import sys import sys
import time import time
import unittest
from test import support from test import support
from test.support import os_helper from test.support import os_helper
from .cmdline import _parse_args, Namespace from .cmdline import _parse_args, Namespace
from .findtests import findtests, split_test_packages from .findtests import findtests, split_test_packages, list_cases
from .logger import Logger from .logger import Logger
from .result import State from .result import State
from .runtests import RunTests, HuntRefleak from .runtests import RunTests, HuntRefleak
@ -22,8 +19,8 @@ from .results import TestResults
from .utils import ( from .utils import (
StrPath, StrJSON, TestName, TestList, TestTuple, FilterTuple, StrPath, StrJSON, TestName, TestList, TestTuple, FilterTuple,
strip_py_suffix, count, format_duration, strip_py_suffix, count, format_duration,
printlist, get_build_info, get_temp_dir, get_work_dir, exit_timeout, printlist, get_temp_dir, get_work_dir, exit_timeout,
abs_module_name) display_header, cleanup_temp_dir)
class Regrtest: class Regrtest:
@ -214,36 +211,6 @@ class Regrtest:
for name in tests: for name in tests:
print(name) print(name)
def _list_cases(self, suite):
for test in suite:
if isinstance(test, unittest.loader._FailedTest):
continue
if isinstance(test, unittest.TestSuite):
self._list_cases(test)
elif isinstance(test, unittest.TestCase):
if support.match_test(test):
print(test.id())
def list_cases(self, tests: TestTuple):
support.verbose = False
support.set_match_tests(self.match_tests, self.ignore_tests)
skipped = []
for test_name in tests:
module_name = abs_module_name(test_name, self.test_dir)
try:
suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
self._list_cases(suite)
except unittest.SkipTest:
skipped.append(test_name)
if skipped:
sys.stdout.flush()
stderr = sys.stderr
print(file=stderr)
print(count(len(skipped), "test"), "skipped:", file=stderr)
printlist(skipped, file=stderr)
def _rerun_failed_tests(self, runtests: RunTests): def _rerun_failed_tests(self, runtests: RunTests):
# Configure the runner to re-run tests # Configure the runner to re-run tests
if self.num_workers == 0: if self.num_workers == 0:
@ -363,45 +330,6 @@ class Regrtest:
return tracer return tracer
@staticmethod
def display_header():
# Print basic platform information
print("==", platform.python_implementation(), *sys.version.split())
print("==", platform.platform(aliased=True),
"%s-endian" % sys.byteorder)
print("== Python build:", ' '.join(get_build_info()))
print("== cwd:", os.getcwd())
cpu_count = os.cpu_count()
if cpu_count:
print("== CPU count:", cpu_count)
print("== encodings: locale=%s, FS=%s"
% (locale.getencoding(), sys.getfilesystemencoding()))
# This makes it easier to remember what to set in your local
# environment when trying to reproduce a sanitizer failure.
asan = support.check_sanitizer(address=True)
msan = support.check_sanitizer(memory=True)
ubsan = support.check_sanitizer(ub=True)
sanitizers = []
if asan:
sanitizers.append("address")
if msan:
sanitizers.append("memory")
if ubsan:
sanitizers.append("undefined behavior")
if not sanitizers:
return
print(f"== sanitizers: {', '.join(sanitizers)}")
for sanitizer, env_var in (
(asan, "ASAN_OPTIONS"),
(msan, "MSAN_OPTIONS"),
(ubsan, "UBSAN_OPTIONS"),
):
options= os.environ.get(env_var)
if sanitizer and options is not None:
print(f"== {env_var}={options!r}")
def get_state(self): def get_state(self):
state = self.results.get_state(self.fail_env_changed) state = self.results.get_state(self.fail_env_changed)
if self.first_state: if self.first_state:
@ -445,20 +373,6 @@ class Regrtest:
state = self.get_state() state = self.get_state()
print(f"Result: {state}") print(f"Result: {state}")
@staticmethod
def cleanup_temp_dir(tmp_dir: StrPath):
import glob
path = os.path.join(glob.escape(tmp_dir), 'test_python_*')
print("Cleanup %s directory" % tmp_dir)
for name in glob.glob(path):
if os.path.isdir(name):
print("Remove directory: %s" % name)
os_helper.rmtree(name)
else:
print("Remove file: %s" % name)
os_helper.unlink(name)
def create_run_tests(self, tests: TestTuple): def create_run_tests(self, tests: TestTuple):
return RunTests( return RunTests(
tests, tests,
@ -496,7 +410,7 @@ class Regrtest:
if (self.want_header if (self.want_header
or not(self.pgo or self.quiet or self.single_test_run or not(self.pgo or self.quiet or self.single_test_run
or tests or self.cmdline_args)): or tests or self.cmdline_args)):
self.display_header() display_header()
if self.randomize: if self.randomize:
print("Using random seed", self.random_seed) print("Using random seed", self.random_seed)
@ -554,7 +468,7 @@ class Regrtest:
self.tmp_dir = get_temp_dir(self.tmp_dir) self.tmp_dir = get_temp_dir(self.tmp_dir)
if self.want_cleanup: if self.want_cleanup:
self.cleanup_temp_dir(self.tmp_dir) cleanup_temp_dir(self.tmp_dir)
sys.exit(0) sys.exit(0)
if self.want_wait: if self.want_wait:
@ -567,7 +481,10 @@ class Regrtest:
if self.want_list_tests: if self.want_list_tests:
self.list_tests(selected) self.list_tests(selected)
elif self.want_list_cases: elif self.want_list_cases:
self.list_cases(selected) list_cases(selected,
match_tests=self.match_tests,
ignore_tests=self.ignore_tests,
test_dir=self.test_dir)
else: else:
exitcode = self.run_tests(selected, tests) exitcode = self.run_tests(selected, tests)

View file

@ -1,8 +1,10 @@
import atexit import atexit
import contextlib import contextlib
import faulthandler import faulthandler
import locale
import math import math
import os.path import os.path
import platform
import random import random
import sys import sys
import sysconfig import sysconfig
@ -524,3 +526,56 @@ def adjust_rlimit_nofile():
except (ValueError, OSError) as err: except (ValueError, OSError) as err:
print_warning(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to " print_warning(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to "
f"{new_fd_limit}: {err}.") f"{new_fd_limit}: {err}.")
def display_header():
# Print basic platform information
print("==", platform.python_implementation(), *sys.version.split())
print("==", platform.platform(aliased=True),
"%s-endian" % sys.byteorder)
print("== Python build:", ' '.join(get_build_info()))
print("== cwd:", os.getcwd())
cpu_count = os.cpu_count()
if cpu_count:
print("== CPU count:", cpu_count)
print("== encodings: locale=%s, FS=%s"
% (locale.getencoding(), sys.getfilesystemencoding()))
# This makes it easier to remember what to set in your local
# environment when trying to reproduce a sanitizer failure.
asan = support.check_sanitizer(address=True)
msan = support.check_sanitizer(memory=True)
ubsan = support.check_sanitizer(ub=True)
sanitizers = []
if asan:
sanitizers.append("address")
if msan:
sanitizers.append("memory")
if ubsan:
sanitizers.append("undefined behavior")
if not sanitizers:
return
print(f"== sanitizers: {', '.join(sanitizers)}")
for sanitizer, env_var in (
(asan, "ASAN_OPTIONS"),
(msan, "MSAN_OPTIONS"),
(ubsan, "UBSAN_OPTIONS"),
):
options= os.environ.get(env_var)
if sanitizer and options is not None:
print(f"== {env_var}={options!r}")
def cleanup_temp_dir(tmp_dir: StrPath):
import glob
path = os.path.join(glob.escape(tmp_dir), 'test_python_*')
print("Cleanup %s directory" % tmp_dir)
for name in glob.glob(path):
if os.path.isdir(name):
print("Remove directory: %s" % name)
os_helper.rmtree(name)
else:
print("Remove file: %s" % name)
os_helper.unlink(name)