bpo-36725: Refactor regrtest multiprocessing code (GH-12961)

Rewrite run_tests_multiprocess() function as a new MultiprocessRunner
class with multiple methods to better report errors and stop
immediately when needed.

Changes:

* Worker processes are now killed immediately if tests are
  interrupted or if a test does crash (CHILD_ERROR): worker
  processes are killed.
* Rewrite how errors in a worker thread are reported to
  the main thread. No longer ignore BaseException or parsing errors
  silently.
* Remove 'finished' variable: use worker.is_alive() instead
* Always compute omitted tests. Add Regrtest.get_executed() method.
This commit is contained in:
Victor Stinner 2019-04-26 08:40:25 +02:00 committed by GitHub
parent 87d23a041d
commit 3cde440f20
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 207 additions and 154 deletions

View file

@ -79,8 +79,8 @@ class Regrtest:
self.skipped = [] self.skipped = []
self.resource_denieds = [] self.resource_denieds = []
self.environment_changed = [] self.environment_changed = []
self.rerun = []
self.run_no_tests = [] self.run_no_tests = []
self.rerun = []
self.first_result = None self.first_result = None
self.interrupted = False self.interrupted = False
@ -105,6 +105,11 @@ class Regrtest:
# used by --junit-xml # used by --junit-xml
self.testsuite_xml = None self.testsuite_xml = None
def get_executed(self):
return (set(self.good) | set(self.bad) | set(self.skipped)
| set(self.resource_denieds) | set(self.environment_changed)
| set(self.run_no_tests))
def accumulate_result(self, result): def accumulate_result(self, result):
test_name = result.test_name test_name = result.test_name
ok = result.result ok = result.result
@ -311,8 +316,6 @@ class Regrtest:
self.bad.remove(test_name) self.bad.remove(test_name)
if ok.result == INTERRUPTED: if ok.result == INTERRUPTED:
# print a newline separate from the ^C
print()
self.interrupted = True self.interrupted = True
break break
else: else:
@ -331,11 +334,11 @@ class Regrtest:
print("== Tests result: %s ==" % self.get_tests_result()) print("== Tests result: %s ==" % self.get_tests_result())
if self.interrupted: if self.interrupted:
print()
# print a newline after ^C
print("Test suite interrupted by signal SIGINT.") print("Test suite interrupted by signal SIGINT.")
executed = set(self.good) | set(self.bad) | set(self.skipped)
omitted = set(self.selected) - executed omitted = set(self.selected) - self.get_executed()
if omitted:
print()
print(count(len(omitted), "test"), "omitted:") print(count(len(omitted), "test"), "omitted:")
printlist(omitted) printlist(omitted)

View file

@ -275,6 +275,7 @@ def _runtest_inner(ns, test_name, display_failure=True):
except support.TestDidNotRun: except support.TestDidNotRun:
return TEST_DID_NOT_RUN return TEST_DID_NOT_RUN
except KeyboardInterrupt: except KeyboardInterrupt:
print()
return INTERRUPTED return INTERRUPTED
except: except:
if not ns.pgo: if not ns.pgo:

View file

@ -3,9 +3,11 @@ import faulthandler
import json import json
import os import os
import queue import queue
import subprocess
import sys import sys
import threading import threading
import time import time
import traceback
import types import types
from test import support from test import support
@ -19,20 +21,12 @@ from test.libregrtest.utils import format_duration
# Display the running tests if nothing happened last N seconds # Display the running tests if nothing happened last N seconds
PROGRESS_UPDATE = 30.0 # seconds PROGRESS_UPDATE = 30.0 # seconds
# If interrupted, display the wait progress every N seconds
WAIT_PROGRESS = 2.0 # seconds def must_stop(result):
return result.result in (INTERRUPTED, CHILD_ERROR)
def run_test_in_subprocess(testname, ns): def run_test_in_subprocess(testname, ns):
"""Run the given test in a subprocess with --worker-args.
ns is the option Namespace parsed from command-line arguments. regrtest
is invoked in a subprocess with the --worker-args argument; when the
subprocess exits, its return code, stdout and stderr are returned as a
3-tuple.
"""
from subprocess import Popen, PIPE
ns_dict = vars(ns) ns_dict = vars(ns)
worker_args = (ns_dict, testname) worker_args = (ns_dict, testname)
worker_args = json.dumps(worker_args) worker_args = json.dumps(worker_args)
@ -47,15 +41,12 @@ def run_test_in_subprocess(testname, ns):
# Running the child from the same working directory as regrtest's original # Running the child from the same working directory as regrtest's original
# invocation ensures that TEMPDIR for the child is the same when # invocation ensures that TEMPDIR for the child is the same when
# sysconfig.is_python_build() is true. See issue 15300. # sysconfig.is_python_build() is true. See issue 15300.
popen = Popen(cmd, return subprocess.Popen(cmd,
stdout=PIPE, stderr=PIPE, stdout=subprocess.PIPE,
universal_newlines=True, stderr=subprocess.PIPE,
close_fds=(os.name != 'nt'), universal_newlines=True,
cwd=support.SAVEDCWD) close_fds=(os.name != 'nt'),
with popen: cwd=support.SAVEDCWD)
stdout, stderr = popen.communicate()
retcode = popen.wait()
return retcode, stdout, stderr
def run_tests_worker(worker_args): def run_tests_worker(worker_args):
@ -66,7 +57,6 @@ def run_tests_worker(worker_args):
result = runtest(ns, testname) result = runtest(ns, testname)
print() # Force a newline (just in case) print() # Force a newline (just in case)
print(json.dumps(result), flush=True) print(json.dumps(result), flush=True)
sys.exit(0) sys.exit(0)
@ -77,7 +67,6 @@ class MultiprocessIterator:
"""A thread-safe iterator over tests for multiprocess mode.""" """A thread-safe iterator over tests for multiprocess mode."""
def __init__(self, tests): def __init__(self, tests):
self.interrupted = False
self.lock = threading.Lock() self.lock = threading.Lock()
self.tests = tests self.tests = tests
@ -86,8 +75,6 @@ class MultiprocessIterator:
def __next__(self): def __next__(self):
with self.lock: with self.lock:
if self.interrupted:
raise StopIteration('tests interrupted')
return next(self.tests) return next(self.tests)
@ -102,143 +89,202 @@ class MultiprocessThread(threading.Thread):
self.ns = ns self.ns = ns
self.current_test_name = None self.current_test_name = None
self.start_time = None self.start_time = None
self._popen = None
def _runtest(self): def kill(self):
try: if not self.is_alive():
test_name = next(self.pending) return
except StopIteration: if self._popen is not None:
self.output.put(None) self._popen.kill()
return True
def _runtest(self, test_name):
try: try:
self.start_time = time.monotonic() self.start_time = time.monotonic()
self.current_test_name = test_name self.current_test_name = test_name
retcode, stdout, stderr = run_test_in_subprocess(test_name, self.ns) popen = run_test_in_subprocess(test_name, self.ns)
self._popen = popen
with popen:
try:
stdout, stderr = popen.communicate()
except:
popen.kill()
popen.wait()
raise
retcode = popen.wait()
finally: finally:
self.current_test_name = None self.current_test_name = None
self._popen = None
stdout = stdout.strip()
stderr = stderr.rstrip()
err_msg = None
if retcode != 0: if retcode != 0:
err_msg = "Exit code %s" % retcode
else:
stdout, _, result = stdout.rpartition("\n")
stdout = stdout.rstrip()
if not result:
err_msg = "Failed to parse worker stdout"
else:
try:
# deserialize run_tests_worker() output
result = json.loads(result)
result = TestResult(*result)
except Exception as exc:
err_msg = "Failed to parse worker JSON: %s" % exc
if err_msg is not None:
test_time = time.monotonic() - self.start_time test_time = time.monotonic() - self.start_time
result = TestResult(test_name, CHILD_ERROR, test_time, None) result = TestResult(test_name, CHILD_ERROR, test_time, None)
err_msg = "Exit code %s" % retcode
mp_result = MultiprocessResult(result, stdout.rstrip(), stderr.rstrip(), err_msg)
self.output.put(mp_result)
return False
stdout, _, result = stdout.strip().rpartition("\n") return MultiprocessResult(result, stdout, stderr, err_msg)
if not result:
self.output.put(None)
return True
# deserialize run_tests_worker() output
result = json.loads(result)
result = TestResult(*result)
mp_result = MultiprocessResult(result, stdout.rstrip(), stderr.rstrip(), None)
self.output.put(mp_result)
return False
def run(self): def run(self):
while True:
try:
try:
test_name = next(self.pending)
except StopIteration:
break
mp_result = self._runtest(test_name)
self.output.put((False, mp_result))
if must_stop(mp_result.result):
break
except BaseException:
self.output.put((True, traceback.format_exc()))
break
def get_running(workers):
running = []
for worker in workers:
current_test_name = worker.current_test_name
if not current_test_name:
continue
dt = time.monotonic() - worker.start_time
if dt >= PROGRESS_MIN_TIME:
text = '%s (%s)' % (current_test_name, format_duration(dt))
running.append(text)
return running
class MultiprocessRunner:
def __init__(self, regrtest):
self.regrtest = regrtest
self.ns = regrtest.ns
self.output = queue.Queue()
self.pending = MultiprocessIterator(self.regrtest.tests)
if self.ns.timeout is not None:
self.test_timeout = self.ns.timeout * 1.5
else:
self.test_timeout = None
self.workers = None
def start_workers(self):
self.workers = [MultiprocessThread(self.pending, self.output, self.ns)
for _ in range(self.ns.use_mp)]
print("Run tests in parallel using %s child processes"
% len(self.workers))
for worker in self.workers:
worker.start()
def wait_workers(self):
for worker in self.workers:
worker.kill()
for worker in self.workers:
worker.join()
def _get_result(self):
if not any(worker.is_alive() for worker in self.workers):
# all worker threads are done: consume pending results
try:
return self.output.get(timeout=0)
except queue.Empty:
return None
while True:
if self.test_timeout is not None:
faulthandler.dump_traceback_later(self.test_timeout, exit=True)
# wait for a thread
timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
try:
return self.output.get(timeout=timeout)
except queue.Empty:
pass
# display progress
running = get_running(self.workers)
if running and not self.ns.pgo:
print('running: %s' % ', '.join(running), flush=True)
def display_result(self, mp_result):
result = mp_result.result
text = format_test_result(result)
if mp_result.error_msg is not None:
# CHILD_ERROR
text += ' (%s)' % mp_result.error_msg
elif (result.test_time >= PROGRESS_MIN_TIME and not self.ns.pgo):
text += ' (%s)' % format_duration(result.test_time)
running = get_running(self.workers)
if running and not self.ns.pgo:
text += ' -- running: %s' % ', '.join(running)
self.regrtest.display_progress(self.test_index, text)
def _process_result(self, item):
if item[0]:
# Thread got an exception
format_exc = item[1]
print(f"regrtest worker thread failed: {format_exc}",
file=sys.stderr, flush=True)
return True
self.test_index += 1
mp_result = item[1]
self.regrtest.accumulate_result(mp_result.result)
self.display_result(mp_result)
if mp_result.stdout:
print(mp_result.stdout, flush=True)
if mp_result.stderr and not self.ns.pgo:
print(mp_result.stderr, file=sys.stderr, flush=True)
if mp_result.result.result == INTERRUPTED:
self.regrtest.interrupted = True
if must_stop(mp_result.result):
return True
return False
def run_tests(self):
self.start_workers()
self.test_index = 0
try: try:
stop = False while True:
while not stop: item = self._get_result()
stop = self._runtest() if item is None:
except BaseException: break
self.output.put(None)
raise stop = self._process_result(item)
if stop:
break
except KeyboardInterrupt:
print()
self.regrtest.interrupted = True
finally:
if self.test_timeout is not None:
faulthandler.cancel_dump_traceback_later()
self.wait_workers()
def run_tests_multiprocess(regrtest): def run_tests_multiprocess(regrtest):
output = queue.Queue() MultiprocessRunner(regrtest).run_tests()
pending = MultiprocessIterator(regrtest.tests)
test_timeout = regrtest.ns.timeout
use_timeout = (test_timeout is not None)
workers = [MultiprocessThread(pending, output, regrtest.ns)
for i in range(regrtest.ns.use_mp)]
print("Run tests in parallel using %s child processes"
% len(workers))
for worker in workers:
worker.start()
def get_running(workers):
running = []
for worker in workers:
current_test_name = worker.current_test_name
if not current_test_name:
continue
dt = time.monotonic() - worker.start_time
if dt >= PROGRESS_MIN_TIME:
text = '%s (%s)' % (current_test_name, format_duration(dt))
running.append(text)
return running
finished = 0
test_index = 1
get_timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
try:
while finished < regrtest.ns.use_mp:
if use_timeout:
faulthandler.dump_traceback_later(test_timeout, exit=True)
try:
mp_result = output.get(timeout=get_timeout)
except queue.Empty:
running = get_running(workers)
if running and not regrtest.ns.pgo:
print('running: %s' % ', '.join(running), flush=True)
continue
if mp_result is None:
finished += 1
continue
result = mp_result.result
regrtest.accumulate_result(result)
# Display progress
ok = result.result
text = format_test_result(result)
if (ok not in (CHILD_ERROR, INTERRUPTED)
and result.test_time >= PROGRESS_MIN_TIME
and not regrtest.ns.pgo):
text += ' (%s)' % format_duration(result.test_time)
elif ok == CHILD_ERROR:
text = '%s (%s)' % (text, mp_result.error_msg)
running = get_running(workers)
if running and not regrtest.ns.pgo:
text += ' -- running: %s' % ', '.join(running)
regrtest.display_progress(test_index, text)
# Copy stdout and stderr from the child process
if mp_result.stdout:
print(mp_result.stdout, flush=True)
if mp_result.stderr and not regrtest.ns.pgo:
print(mp_result.stderr, file=sys.stderr, flush=True)
if result.result == INTERRUPTED:
raise KeyboardInterrupt
test_index += 1
except KeyboardInterrupt:
regrtest.interrupted = True
pending.interrupted = True
print()
finally:
if use_timeout:
faulthandler.cancel_dump_traceback_later()
# If tests are interrupted, wait until tests complete
wait_start = time.monotonic()
while True:
running = [worker.current_test_name for worker in workers]
running = list(filter(bool, running))
if not running:
break
dt = time.monotonic() - wait_start
line = "Waiting for %s (%s tests)" % (', '.join(running), len(running))
if dt >= WAIT_PROGRESS:
line = "%s since %.0f sec" % (line, dt)
print(line, flush=True)
for worker in workers:
worker.join(WAIT_PROGRESS)

View file

@ -916,13 +916,13 @@ class ArgsTestCase(BaseTestCase):
testname) testname)
self.assertEqual(output.splitlines(), all_methods) self.assertEqual(output.splitlines(), all_methods)
@support.cpython_only
def test_crashed(self): def test_crashed(self):
# Any code which causes a crash # Any code which causes a crash
code = 'import faulthandler; faulthandler._sigsegv()' code = 'import faulthandler; faulthandler._sigsegv()'
crash_test = self.create_test(name="crash", code=code) crash_test = self.create_test(name="crash", code=code)
ok_test = self.create_test(name="ok")
tests = [crash_test, ok_test] tests = [crash_test]
output = self.run_tests("-j2", *tests, exitcode=2) output = self.run_tests("-j2", *tests, exitcode=2)
self.check_executed_tests(output, tests, failed=crash_test, self.check_executed_tests(output, tests, failed=crash_test,
randomize=True) randomize=True)

View file

@ -0,0 +1,3 @@
When using mulitprocessing mode (-jN), regrtest now better reports errors if
a worker process fails, and it exits immediately on a worker thread failure
or when interrupted.