mirror of
https://github.com/python/cpython.git
synced 2025-08-30 13:38:43 +00:00
Issue #11393: Add the new faulthandler module
This commit is contained in:
parent
d85456279f
commit
024e37adcc
19 changed files with 1907 additions and 5 deletions
469
Lib/test/test_faulthandler.py
Normal file
469
Lib/test/test_faulthandler.py
Normal file
|
@ -0,0 +1,469 @@
|
|||
from contextlib import contextmanager
|
||||
import faulthandler
|
||||
import re
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
from test import support, script_helper
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
try:
|
||||
from resource import setrlimit, RLIMIT_CORE, error as resource_error
|
||||
except ImportError:
|
||||
prepare_subprocess = None
|
||||
else:
|
||||
def prepare_subprocess():
|
||||
# don't create core file
|
||||
try:
|
||||
setrlimit(RLIMIT_CORE, (0, 0))
|
||||
except (ValueError, resource_error):
|
||||
pass
|
||||
|
||||
def expected_traceback(lineno1, lineno2, header, count=1):
|
||||
regex = header
|
||||
regex += r' File "\<string\>", line %s in func\n' % lineno1
|
||||
regex += r' File "\<string\>", line %s in \<module\>' % lineno2
|
||||
if count != 1:
|
||||
regex = (regex + '\n') * (count - 1) + regex
|
||||
return '^' + regex + '$'
|
||||
|
||||
@contextmanager
|
||||
def temporary_filename():
|
||||
filename = tempfile.mktemp()
|
||||
try:
|
||||
yield filename
|
||||
finally:
|
||||
support.unlink(filename)
|
||||
|
||||
class FaultHandlerTests(unittest.TestCase):
|
||||
def get_output(self, code, expect_success, filename=None):
|
||||
"""
|
||||
Run the specified code in Python (in a new child process) and read the
|
||||
output from the standard error or from a file (if filename is set).
|
||||
Return the output lines as a list.
|
||||
|
||||
Strip the reference count from the standard error for Python debug
|
||||
build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
|
||||
thread XXX".
|
||||
"""
|
||||
options = {}
|
||||
if prepare_subprocess:
|
||||
options['preexec_fn'] = prepare_subprocess
|
||||
process = script_helper.spawn_python('-c', code, **options)
|
||||
stdout, stderr = process.communicate()
|
||||
exitcode = process.wait()
|
||||
if expect_success:
|
||||
self.assertEqual(exitcode, 0)
|
||||
else:
|
||||
self.assertNotEqual(exitcode, 0)
|
||||
if filename:
|
||||
with open(filename, "rb") as fp:
|
||||
output = fp.read()
|
||||
else:
|
||||
output = support.strip_python_stderr(stdout)
|
||||
output = output.decode('ascii', 'backslashreplace')
|
||||
output = re.sub('Current thread 0x[0-9a-f]+',
|
||||
'Current thread XXX',
|
||||
output)
|
||||
return output.splitlines()
|
||||
|
||||
def check_fatal_error(self, code, line_number, name_regex,
|
||||
filename=None, all_threads=False):
|
||||
"""
|
||||
Check that the fault handler for fatal errors is enabled and check the
|
||||
traceback from the child process output.
|
||||
|
||||
Raise an error if the output doesn't match the expected format.
|
||||
"""
|
||||
if all_threads:
|
||||
header = 'Current thread XXX'
|
||||
else:
|
||||
header = 'Traceback (most recent call first)'
|
||||
regex = """
|
||||
^Fatal Python error: {name}
|
||||
|
||||
{header}:
|
||||
File "<string>", line {lineno} in <module>$
|
||||
""".strip()
|
||||
regex = regex.format(
|
||||
lineno=line_number,
|
||||
name=name_regex,
|
||||
header=re.escape(header))
|
||||
output = self.get_output(code, False, filename)
|
||||
output = '\n'.join(output)
|
||||
self.assertRegex(output, regex)
|
||||
|
||||
def test_read_null(self):
|
||||
self.check_fatal_error("""
|
||||
import faulthandler
|
||||
faulthandler.enable()
|
||||
faulthandler._read_null()
|
||||
""".strip(),
|
||||
3,
|
||||
'(?:Segmentation fault|Bus error)')
|
||||
|
||||
def test_sigsegv(self):
|
||||
self.check_fatal_error("""
|
||||
import faulthandler
|
||||
faulthandler.enable()
|
||||
faulthandler._sigsegv()
|
||||
""".strip(),
|
||||
3,
|
||||
'Segmentation fault')
|
||||
|
||||
@unittest.skipIf(sys.platform == 'win32',
|
||||
"SIGFPE cannot be caught on Windows")
|
||||
def test_sigfpe(self):
|
||||
self.check_fatal_error("""
|
||||
import faulthandler
|
||||
faulthandler.enable()
|
||||
faulthandler._sigfpe()
|
||||
""".strip(),
|
||||
3,
|
||||
'Floating point exception')
|
||||
|
||||
@unittest.skipIf(not hasattr(faulthandler, '_sigbus'),
|
||||
"need faulthandler._sigbus()")
|
||||
def test_sigbus(self):
|
||||
self.check_fatal_error("""
|
||||
import faulthandler
|
||||
faulthandler.enable()
|
||||
faulthandler._sigbus()
|
||||
""".strip(),
|
||||
3,
|
||||
'Bus error')
|
||||
|
||||
@unittest.skipIf(not hasattr(faulthandler, '_sigill'),
|
||||
"need faulthandler._sigill()")
|
||||
def test_sigill(self):
|
||||
self.check_fatal_error("""
|
||||
import faulthandler
|
||||
faulthandler.enable()
|
||||
faulthandler._sigill()
|
||||
""".strip(),
|
||||
3,
|
||||
'Illegal instruction')
|
||||
|
||||
def test_fatal_error(self):
|
||||
self.check_fatal_error("""
|
||||
import faulthandler
|
||||
faulthandler._fatal_error(b'xyz')
|
||||
""".strip(),
|
||||
2,
|
||||
'xyz')
|
||||
|
||||
@unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
|
||||
'need faulthandler._stack_overflow()')
|
||||
def test_stack_overflow(self):
|
||||
self.check_fatal_error("""
|
||||
import faulthandler
|
||||
faulthandler.enable()
|
||||
faulthandler._stack_overflow()
|
||||
""".strip(),
|
||||
3,
|
||||
'(?:Segmentation fault|Bus error)')
|
||||
|
||||
def test_gil_released(self):
|
||||
self.check_fatal_error("""
|
||||
import faulthandler
|
||||
faulthandler.enable()
|
||||
faulthandler._read_null(True)
|
||||
""".strip(),
|
||||
3,
|
||||
'(?:Segmentation fault|Bus error)')
|
||||
|
||||
def test_enable_file(self):
|
||||
with temporary_filename() as filename:
|
||||
self.check_fatal_error("""
|
||||
import faulthandler
|
||||
output = open({filename}, 'wb')
|
||||
faulthandler.enable(output)
|
||||
faulthandler._read_null(True)
|
||||
""".strip().format(filename=repr(filename)),
|
||||
4,
|
||||
'(?:Segmentation fault|Bus error)',
|
||||
filename=filename)
|
||||
|
||||
def test_enable_threads(self):
|
||||
self.check_fatal_error("""
|
||||
import faulthandler
|
||||
faulthandler.enable(all_threads=True)
|
||||
faulthandler._read_null(True)
|
||||
""".strip(),
|
||||
3,
|
||||
'(?:Segmentation fault|Bus error)',
|
||||
all_threads=True)
|
||||
|
||||
def test_disable(self):
|
||||
code = """
|
||||
import faulthandler
|
||||
faulthandler.enable()
|
||||
faulthandler.disable()
|
||||
faulthandler._read_null()
|
||||
""".strip()
|
||||
not_expected = 'Fatal Python error'
|
||||
stderr = self.get_output(code, False)
|
||||
stder = '\n'.join(stderr)
|
||||
self.assertTrue(not_expected not in stderr,
|
||||
"%r is present in %r" % (not_expected, stderr))
|
||||
|
||||
def test_is_enabled(self):
|
||||
was_enabled = faulthandler.is_enabled()
|
||||
try:
|
||||
faulthandler.enable()
|
||||
self.assertTrue(faulthandler.is_enabled())
|
||||
faulthandler.disable()
|
||||
self.assertFalse(faulthandler.is_enabled())
|
||||
finally:
|
||||
if was_enabled:
|
||||
faulthandler.enable()
|
||||
else:
|
||||
faulthandler.disable()
|
||||
|
||||
def check_dump_traceback(self, filename):
|
||||
"""
|
||||
Explicitly call dump_traceback() function and check its output.
|
||||
Raise an error if the output doesn't match the expected format.
|
||||
"""
|
||||
code = """
|
||||
import faulthandler
|
||||
|
||||
def funcB():
|
||||
if {has_filename}:
|
||||
with open({filename}, "wb") as fp:
|
||||
faulthandler.dump_traceback(fp)
|
||||
else:
|
||||
faulthandler.dump_traceback()
|
||||
|
||||
def funcA():
|
||||
funcB()
|
||||
|
||||
funcA()
|
||||
""".strip()
|
||||
code = code.format(
|
||||
filename=repr(filename),
|
||||
has_filename=bool(filename),
|
||||
)
|
||||
if filename:
|
||||
lineno = 6
|
||||
else:
|
||||
lineno = 8
|
||||
expected = [
|
||||
'Traceback (most recent call first):',
|
||||
' File "<string>", line %s in funcB' % lineno,
|
||||
' File "<string>", line 11 in funcA',
|
||||
' File "<string>", line 13 in <module>'
|
||||
]
|
||||
trace = self.get_output(code, True, filename)
|
||||
self.assertEqual(trace, expected)
|
||||
|
||||
def test_dump_traceback(self):
|
||||
self.check_dump_traceback(None)
|
||||
with temporary_filename() as filename:
|
||||
self.check_dump_traceback(filename)
|
||||
|
||||
def check_dump_traceback_threads(self, filename):
|
||||
"""
|
||||
Call explicitly dump_traceback(all_threads=True) and check the output.
|
||||
Raise an error if the output doesn't match the expected format.
|
||||
"""
|
||||
code = """
|
||||
import faulthandler
|
||||
from threading import Thread, Event
|
||||
import time
|
||||
|
||||
def dump():
|
||||
if {filename}:
|
||||
with open({filename}, "wb") as fp:
|
||||
faulthandler.dump_traceback(fp, all_threads=True)
|
||||
else:
|
||||
faulthandler.dump_traceback(all_threads=True)
|
||||
|
||||
class Waiter(Thread):
|
||||
# avoid blocking if the main thread raises an exception.
|
||||
daemon = True
|
||||
|
||||
def __init__(self):
|
||||
Thread.__init__(self)
|
||||
self.running = Event()
|
||||
self.stop = Event()
|
||||
|
||||
def run(self):
|
||||
self.running.set()
|
||||
self.stop.wait()
|
||||
|
||||
waiter = Waiter()
|
||||
waiter.start()
|
||||
waiter.running.wait()
|
||||
dump()
|
||||
waiter.stop.set()
|
||||
waiter.join()
|
||||
""".strip()
|
||||
code = code.format(filename=repr(filename))
|
||||
output = self.get_output(code, True, filename)
|
||||
output = '\n'.join(output)
|
||||
if filename:
|
||||
lineno = 8
|
||||
else:
|
||||
lineno = 10
|
||||
regex = """
|
||||
^Thread 0x[0-9a-f]+:
|
||||
(?: File ".*threading.py", line [0-9]+ in wait
|
||||
)? File ".*threading.py", line [0-9]+ in wait
|
||||
File "<string>", line 23 in run
|
||||
File ".*threading.py", line [0-9]+ in _bootstrap_inner
|
||||
File ".*threading.py", line [0-9]+ in _bootstrap
|
||||
|
||||
Current thread XXX:
|
||||
File "<string>", line {lineno} in dump
|
||||
File "<string>", line 28 in <module>$
|
||||
""".strip()
|
||||
regex = regex.format(lineno=lineno)
|
||||
self.assertRegex(output, regex)
|
||||
|
||||
def test_dump_traceback_threads(self):
|
||||
self.check_dump_traceback_threads(None)
|
||||
with temporary_filename() as filename:
|
||||
self.check_dump_traceback_threads(filename)
|
||||
|
||||
def _check_dump_tracebacks_later(self, repeat, cancel, filename):
|
||||
"""
|
||||
Check how many times the traceback is written in timeout x 2.5 seconds,
|
||||
or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending
|
||||
on repeat and cancel options.
|
||||
|
||||
Raise an error if the output doesn't match the expect format.
|
||||
"""
|
||||
code = """
|
||||
import faulthandler
|
||||
import time
|
||||
|
||||
def func(repeat, cancel, timeout):
|
||||
pause = timeout * 2.5
|
||||
a = time.time()
|
||||
time.sleep(pause)
|
||||
faulthandler.cancel_dump_tracebacks_later()
|
||||
b = time.time()
|
||||
# Check that sleep() was not interrupted
|
||||
assert (b -a) >= pause
|
||||
|
||||
if cancel:
|
||||
pause = timeout * 1.5
|
||||
a = time.time()
|
||||
time.sleep(pause)
|
||||
b = time.time()
|
||||
# Check that sleep() was not interrupted
|
||||
assert (b -a) >= pause
|
||||
|
||||
timeout = 0.5
|
||||
repeat = {repeat}
|
||||
cancel = {cancel}
|
||||
if {has_filename}:
|
||||
file = open({filename}, "wb")
|
||||
else:
|
||||
file = None
|
||||
faulthandler.dump_tracebacks_later(timeout,
|
||||
repeat=repeat, file=file)
|
||||
func(repeat, cancel, timeout)
|
||||
if file is not None:
|
||||
file.close()
|
||||
""".strip()
|
||||
code = code.format(
|
||||
filename=repr(filename),
|
||||
has_filename=bool(filename),
|
||||
repeat=repeat,
|
||||
cancel=cancel,
|
||||
)
|
||||
trace = self.get_output(code, True, filename)
|
||||
trace = '\n'.join(trace)
|
||||
|
||||
if repeat:
|
||||
count = 2
|
||||
else:
|
||||
count = 1
|
||||
header = 'Thread 0x[0-9a-f]+:\n'
|
||||
regex = expected_traceback(7, 30, header, count=count)
|
||||
self.assertRegex(trace, '^%s$' % regex)
|
||||
|
||||
@unittest.skipIf(not hasattr(faulthandler, 'dump_tracebacks_later'),
|
||||
'need faulthandler.dump_tracebacks_later()')
|
||||
def check_dump_tracebacks_later(self, repeat=False, cancel=False,
|
||||
file=False):
|
||||
if file:
|
||||
with temporary_filename() as filename:
|
||||
self._check_dump_tracebacks_later(repeat, cancel, filename)
|
||||
else:
|
||||
self._check_dump_tracebacks_later(repeat, cancel, None)
|
||||
|
||||
def test_dump_tracebacks_later(self):
|
||||
self.check_dump_tracebacks_later()
|
||||
|
||||
def test_dump_tracebacks_later_repeat(self):
|
||||
self.check_dump_tracebacks_later(repeat=True)
|
||||
|
||||
def test_dump_tracebacks_later_repeat_cancel(self):
|
||||
self.check_dump_tracebacks_later(repeat=True, cancel=True)
|
||||
|
||||
def test_dump_tracebacks_later_file(self):
|
||||
self.check_dump_tracebacks_later(file=True)
|
||||
|
||||
@unittest.skipIf(not hasattr(faulthandler, "register"),
|
||||
"need faulthandler.register")
|
||||
def check_register(self, filename=False, all_threads=False):
|
||||
"""
|
||||
Register a handler displaying the traceback on a user signal. Raise the
|
||||
signal and check the written traceback.
|
||||
|
||||
Raise an error if the output doesn't match the expected format.
|
||||
"""
|
||||
code = """
|
||||
import faulthandler
|
||||
import os
|
||||
import signal
|
||||
|
||||
def func(signum):
|
||||
os.kill(os.getpid(), signum)
|
||||
|
||||
signum = signal.SIGUSR1
|
||||
if {has_filename}:
|
||||
file = open({filename}, "wb")
|
||||
else:
|
||||
file = None
|
||||
faulthandler.register(signum, file=file, all_threads={all_threads})
|
||||
func(signum)
|
||||
if file is not None:
|
||||
file.close()
|
||||
""".strip()
|
||||
code = code.format(
|
||||
filename=repr(filename),
|
||||
has_filename=bool(filename),
|
||||
all_threads=all_threads,
|
||||
)
|
||||
trace = self.get_output(code, True, filename)
|
||||
trace = '\n'.join(trace)
|
||||
if all_threads:
|
||||
regex = 'Current thread XXX:\n'
|
||||
else:
|
||||
regex = 'Traceback \(most recent call first\):\n'
|
||||
regex = expected_traceback(6, 14, regex)
|
||||
self.assertTrue(re.match(regex, trace),
|
||||
"[%s] doesn't match [%s]: use_filename=%s, all_threads=%s"
|
||||
% (regex, trace, bool(filename), all_threads))
|
||||
|
||||
def test_register(self):
|
||||
self.check_register()
|
||||
|
||||
def test_register_file(self):
|
||||
with temporary_filename() as filename:
|
||||
self.check_register(filename=filename)
|
||||
|
||||
def test_register_threads(self):
|
||||
self.check_register(all_threads=True)
|
||||
|
||||
|
||||
def test_main():
|
||||
support.run_unittest(FaultHandlerTests)
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_main()
|
Loading…
Add table
Add a link
Reference in a new issue