mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 11:49:12 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			595 lines
		
	
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			595 lines
		
	
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from contextlib import contextmanager
 | 
						|
import datetime
 | 
						|
import faulthandler
 | 
						|
import os
 | 
						|
import re
 | 
						|
import signal
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
from test import support, script_helper
 | 
						|
from test.script_helper import assert_python_ok
 | 
						|
import tempfile
 | 
						|
import unittest
 | 
						|
 | 
						|
# Record whether the threading module can be imported; the thread-related
# tests below are skipped when it cannot.
try:
    import threading
except ImportError:
    HAVE_THREADS = False
else:
    HAVE_THREADS = True

# Delay, in seconds, handed to faulthandler.dump_traceback_later() by the
# child scripts generated in the tests below.
TIMEOUT = 0.5
 | 
						|
 | 
						|
try:
    from resource import setrlimit, RLIMIT_CORE, error as resource_error
except ImportError:
    # The resource module is unavailable: there is nothing to run in the
    # child process, and get_output() checks for this None value.
    prepare_subprocess = None
else:
    def prepare_subprocess():
        """Set the core file size limit to zero (best effort).

        The tests crash their child process on purpose, so this is used to
        avoid leaving a core file behind for each crash.
        """
        no_core_file = (0, 0)
        try:
            setrlimit(RLIMIT_CORE, no_core_file)
        except (ValueError, resource_error):
            # The limit could not be changed; run the test anyway.
            pass
 | 
						|
 | 
						|
def expected_traceback(lineno1, lineno2, header, min_count=1):
    """Build the regex matching the traceback(s) dumped by a child script.

    One traceback is *header* followed by a frame in ``func`` at line
    *lineno1* and a frame in ``<module>`` at line *lineno2*.

    With *min_count* greater than 1, the regex is anchored at the start only
    and requires *min_count* consecutive tracebacks separated by a newline.
    Otherwise it matches exactly one traceback, anchored at both ends.
    """
    func_frame = '  File "<string>", line %s in func\n' % lineno1
    module_frame = '  File "<string>", line %s in <module>' % lineno2
    single = header + func_frame + module_frame
    if min_count <= 1:
        return '^' + single + '$'
    # No trailing '$': more tracebacks than min_count are accepted.
    return '^' + '\n'.join([single] * min_count)
 | 
						|
 | 
						|
@contextmanager
def temporary_filename():
    """Yield the name of a temporary file and remove the file on exit.

    The file is created empty (and closed) before its name is yielded, so the
    caller or a child process can reopen it by name. It is removed when the
    ``with`` block exits, whether or not an exception was raised.
    """
    # mkstemp() creates the file atomically. The deprecated mktemp() only
    # returns a name, which leaves a window for another process to create a
    # file (or symlink) with that name before it is used.
    fd, filename = tempfile.mkstemp()
    # Only the name is needed: the file is reopened by name later on.
    os.close(fd)
    try:
        yield filename
    finally:
        support.unlink(filename)
 | 
						|
 | 
						|
class FaultHandlerTests(unittest.TestCase):
    """Tests of the faulthandler module.

    Most tests generate a small script, run it in a child Python process and
    compare what the child wrote (to its standard streams or to a file) with
    the expected traceback. The line numbers hard-coded in the expectations
    refer to lines of the generated scripts, so the script templates must not
    be reflowed.
    """

    def get_output(self, code, filename=None):
        """
        Run the specified code in Python (in a new child process) and read the
        output from the standard error or from a file (if filename is set).
        Return a (lines, exitcode) tuple: the output as a list of lines and
        the exit code of the child process.

        Strip the reference count from the standard error for Python debug
        build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
        thread XXX".
        """
        options = {}
        if prepare_subprocess:
            options['preexec_fn'] = prepare_subprocess
        process = script_helper.spawn_python('-c', code, **options)
        # Only the first stream is parsed below.
        # NOTE(review): presumably spawn_python() redirects the child's
        # stderr into stdout -- confirm against script_helper.
        stdout, _ = process.communicate()
        exitcode = process.wait()
        output = support.strip_python_stderr(stdout)
        output = output.decode('ascii', 'backslashreplace')
        if filename:
            # The traceback was requested in a file: nothing must have been
            # written to the standard streams.
            self.assertEqual(output, '')
            with open(filename, "rb") as fp:
                output = fp.read()
            output = output.decode('ascii', 'backslashreplace')
        # Thread identifiers change on each run: replace them by a constant.
        output = re.sub('Current thread 0x[0-9a-f]+',
                        'Current thread XXX',
                        output)
        return output.splitlines(), exitcode

    def check_fatal_error(self, code, line_number, name_regex,
                          filename=None, all_threads=True, other_regex=None):
        """
        Check that the fault handler for fatal errors is enabled and check the
        traceback from the child process output.

        code is the script run in the child process, line_number the line of
        the script expected in the traceback and name_regex a regex matching
        the error name written after "Fatal Python error: ". If filename is
        set, the traceback is read from this file. all_threads selects the
        expected traceback header. other_regex, if set, is accepted as an
        alternative to the expected traceback.

        Raise an error if the output doesn't match the expected format.
        """
        if all_threads:
            header = 'Current thread XXX'
        else:
            header = 'Traceback (most recent call first)'
        regex = """
^Fatal Python error: {name}

{header}:
  File "<string>", line {lineno} in <module>
""".strip()
        regex = regex.format(
            lineno=line_number,
            name=name_regex,
            header=re.escape(header))
        if other_regex:
            regex += '|' + other_regex
        with support.suppress_crash_popup():
            output, exitcode = self.get_output(code, filename)
        output = '\n'.join(output)
        self.assertRegex(output, regex)
        # The child process is expected to die from the fatal error.
        self.assertNotEqual(exitcode, 0)

    def test_read_null(self):
        # The crash is expected on line 3 of the script.
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._read_null()
""".strip(),
            3,
            # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
            '(?:Segmentation fault|Bus error|Illegal instruction)')

    def test_sigsegv(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._sigsegv()
""".strip(),
            3,
            'Segmentation fault')

    def test_sigabrt(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._sigabrt()
""".strip(),
            3,
            'Aborted')

    @unittest.skipIf(sys.platform == 'win32',
                     "SIGFPE cannot be caught on Windows")
    def test_sigfpe(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._sigfpe()
""".strip(),
            3,
            'Floating point exception')

    @unittest.skipIf(not hasattr(faulthandler, '_sigbus'),
                     "need faulthandler._sigbus()")
    def test_sigbus(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._sigbus()
""".strip(),
            3,
            'Bus error')

    @unittest.skipIf(not hasattr(faulthandler, '_sigill'),
                     "need faulthandler._sigill()")
    def test_sigill(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._sigill()
""".strip(),
            3,
            'Illegal instruction')

    def test_fatal_error(self):
        # faulthandler.enable() is not called by this script: the traceback
        # is still expected, with the message given to _fatal_error().
        self.check_fatal_error("""
import faulthandler
faulthandler._fatal_error(b'xyz')
""".strip(),
            2,
            'xyz')

    @unittest.skipIf(sys.platform.startswith('openbsd') and HAVE_THREADS,
                     "Issue #12868: sigaltstack() doesn't work on "
                     "OpenBSD if Python is compiled with pthread")
    @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
                     'need faulthandler._stack_overflow()')
    def test_stack_overflow(self):
        # The "unable to raise a stack overflow" message is accepted instead
        # of the traceback.
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._stack_overflow()
""".strip(),
            3,
            '(?:Segmentation fault|Bus error)',
            other_regex='unable to raise a stack overflow')

    def test_gil_released(self):
        # NOTE(review): presumably the True argument asks _read_null() to
        # release the GIL before crashing (going by the test name) -- confirm.
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._read_null(True)
""".strip(),
            3,
            '(?:Segmentation fault|Bus error|Illegal instruction)')

    def test_enable_file(self):
        # Same as test_read_null(), but the traceback is written to a file.
        with temporary_filename() as filename:
            self.check_fatal_error("""
import faulthandler
output = open({filename}, 'wb')
faulthandler.enable(output)
faulthandler._read_null()
""".strip().format(filename=repr(filename)),
                4,
                '(?:Segmentation fault|Bus error|Illegal instruction)',
                filename=filename)

    def test_enable_single_thread(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable(all_threads=False)
faulthandler._read_null()
""".strip(),
            3,
            '(?:Segmentation fault|Bus error|Illegal instruction)',
            all_threads=False)

    def test_disable(self):
        code = """
import faulthandler
faulthandler.enable()
faulthandler.disable()
faulthandler._read_null()
""".strip()
        not_expected = 'Fatal Python error'
        with support.suppress_crash_popup():
            output, exitcode = self.get_output(code)
        # Join the lines so that the check below is a substring search on the
        # whole output, not a membership test on the list of lines.
        stderr = '\n'.join(output)
        self.assertNotIn(not_expected, stderr)
        # The child process still crashes, only the traceback is not written.
        self.assertNotEqual(exitcode, 0)

    def test_is_enabled(self):
        orig_stderr = sys.stderr
        try:
            # regrtest may replace sys.stderr by io.StringIO object, but
            # faulthandler.enable() requires that sys.stderr has a fileno()
            # method
            sys.stderr = sys.__stderr__

            was_enabled = faulthandler.is_enabled()
            try:
                faulthandler.enable()
                self.assertTrue(faulthandler.is_enabled())
                faulthandler.disable()
                self.assertFalse(faulthandler.is_enabled())
            finally:
                # Restore the state found when the test started.
                if was_enabled:
                    faulthandler.enable()
                else:
                    faulthandler.disable()
        finally:
            sys.stderr = orig_stderr

    def test_disabled_by_default(self):
        # By default, the module should be disabled
        code = "import faulthandler; print(faulthandler.is_enabled())"
        rc, stdout, stderr = assert_python_ok("-c", code)
        stdout = (stdout + stderr).strip()
        self.assertEqual(stdout, b"False")

    def test_sys_xoptions(self):
        # Test python -X faulthandler
        code = "import faulthandler; print(faulthandler.is_enabled())"
        rc, stdout, stderr = assert_python_ok("-X", "faulthandler", "-c", code)
        stdout = (stdout + stderr).strip()
        self.assertEqual(stdout, b"True")

    def check_dump_traceback(self, filename):
        """
        Explicitly call dump_traceback() function and check its output.
        The traceback is written to filename if it is set, to the default
        output of dump_traceback() otherwise.

        Raise an error if the output doesn't match the expected format.
        """
        code = """
import faulthandler

def funcB():
    if {has_filename}:
        with open({filename}, "wb") as fp:
            faulthandler.dump_traceback(fp, all_threads=False)
    else:
        faulthandler.dump_traceback(all_threads=False)

def funcA():
    funcB()

funcA()
""".strip()
        code = code.format(
            filename=repr(filename),
            has_filename=bool(filename),
        )
        # Line of the dump_traceback() call executed by funcB().
        if filename:
            lineno = 6
        else:
            lineno = 8
        expected = [
            'Traceback (most recent call first):',
            '  File "<string>", line %s in funcB' % lineno,
            '  File "<string>", line 11 in funcA',
            '  File "<string>", line 13 in <module>'
        ]
        trace, exitcode = self.get_output(code, filename)
        self.assertEqual(trace, expected)
        self.assertEqual(exitcode, 0)

    def test_dump_traceback(self):
        self.check_dump_traceback(None)

    def test_dump_traceback_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback(filename)

    def test_truncate(self):
        # A function name longer than maxlen characters is expected to be cut
        # to maxlen characters followed by '...' in the traceback.
        maxlen = 500
        func_name = 'x' * (maxlen + 50)
        truncated = 'x' * maxlen + '...'
        code = """
import faulthandler

def {func_name}():
    faulthandler.dump_traceback(all_threads=False)

{func_name}()
""".strip()
        code = code.format(
            func_name=func_name,
        )
        expected = [
            'Traceback (most recent call first):',
            '  File "<string>", line 4 in %s' % truncated,
            '  File "<string>", line 6 in <module>'
        ]
        trace, exitcode = self.get_output(code)
        self.assertEqual(trace, expected)
        self.assertEqual(exitcode, 0)

    @unittest.skipIf(not HAVE_THREADS, 'need threads')
    def check_dump_traceback_threads(self, filename):
        """
        Call explicitly dump_traceback(all_threads=True) and check the output.
        The traceback is written to filename if it is set, to the default
        output of dump_traceback() otherwise.

        Raise an error if the output doesn't match the expected format.
        """
        code = """
import faulthandler
from threading import Thread, Event
import time

def dump():
    if {filename}:
        with open({filename}, "wb") as fp:
            faulthandler.dump_traceback(fp, all_threads=True)
    else:
        faulthandler.dump_traceback(all_threads=True)

class Waiter(Thread):
    # avoid blocking if the main thread raises an exception.
    daemon = True

    def __init__(self):
        Thread.__init__(self)
        self.running = Event()
        self.stop = Event()

    def run(self):
        self.running.set()
        self.stop.wait()

waiter = Waiter()
waiter.start()
waiter.running.wait()
dump()
waiter.stop.set()
waiter.join()
""".strip()
        # repr(None) is falsy and the repr of a file name is truthy once
        # evaluated by the script: "if {filename}:" selects the right branch.
        code = code.format(filename=repr(filename))
        output, exitcode = self.get_output(code, filename)
        output = '\n'.join(output)
        # Line of the dump_traceback() call executed by dump().
        if filename:
            lineno = 8
        else:
            lineno = 10
        # The braces of the {1,3} quantifier are doubled because the regex
        # goes through str.format() below.
        regex = """
^Thread 0x[0-9a-f]+:
(?:  File ".*threading.py", line [0-9]+ in [_a-z]+
){{1,3}}  File "<string>", line 23 in run
  File ".*threading.py", line [0-9]+ in _bootstrap_inner
  File ".*threading.py", line [0-9]+ in _bootstrap

Current thread XXX:
  File "<string>", line {lineno} in dump
  File "<string>", line 28 in <module>$
""".strip()
        regex = regex.format(lineno=lineno)
        self.assertRegex(output, regex)
        self.assertEqual(exitcode, 0)

    def test_dump_traceback_threads(self):
        self.check_dump_traceback_threads(None)

    def test_dump_traceback_threads_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback_threads(filename)

    def _check_dump_traceback_later(self, repeat, cancel, filename, loops):
        """
        Run a script calling dump_traceback_later() and then sleeping for
        timeout x 5 seconds, loops times, and check how many times the
        traceback is written: nothing if cancel is true, else at least loops
        times, or loops x 2 times if repeat is true.

        The traceback is written to filename if it is set, to the default
        output of dump_traceback_later() otherwise.

        Raise an error if the output doesn't match the expected format.
        """
        timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
        code = """
import faulthandler
import time

def func(timeout, repeat, cancel, file, loops):
    for loop in range(loops):
        faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
        if cancel:
            faulthandler.cancel_dump_traceback_later()
        time.sleep(timeout * 5)
        faulthandler.cancel_dump_traceback_later()

timeout = {timeout}
repeat = {repeat}
cancel = {cancel}
loops = {loops}
if {has_filename}:
    file = open({filename}, "wb")
else:
    file = None
func(timeout, repeat, cancel, file, loops)
if file is not None:
    file.close()
""".strip()
        code = code.format(
            timeout=TIMEOUT,
            repeat=repeat,
            cancel=cancel,
            loops=loops,
            has_filename=bool(filename),
            filename=repr(filename),
        )
        trace, exitcode = self.get_output(code, filename)
        trace = '\n'.join(trace)

        if not cancel:
            count = loops
            if repeat:
                count *= 2
            header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+:\n' % timeout_str
            # Line 9 is the time.sleep() call, line 20 the func() call.
            regex = expected_traceback(9, 20, header, min_count=count)
            self.assertRegex(trace, regex)
        else:
            self.assertEqual(trace, '')
        self.assertEqual(exitcode, 0)

    @unittest.skipIf(not hasattr(faulthandler, 'dump_traceback_later'),
                     'need faulthandler.dump_traceback_later()')
    def check_dump_traceback_later(self, repeat=False, cancel=False,
                                    file=False, twice=False):
        """
        Run _check_dump_traceback_later() with one loop, or two loops if twice
        is true, writing the traceback to a temporary file if file is true.
        """
        loops = 2 if twice else 1
        if file:
            with temporary_filename() as filename:
                self._check_dump_traceback_later(repeat, cancel,
                                                  filename, loops)
        else:
            self._check_dump_traceback_later(repeat, cancel, None, loops)

    def test_dump_traceback_later(self):
        self.check_dump_traceback_later()

    def test_dump_traceback_later_repeat(self):
        self.check_dump_traceback_later(repeat=True)

    def test_dump_traceback_later_cancel(self):
        self.check_dump_traceback_later(cancel=True)

    def test_dump_traceback_later_file(self):
        self.check_dump_traceback_later(file=True)

    def test_dump_traceback_later_twice(self):
        self.check_dump_traceback_later(twice=True)

    @unittest.skipIf(not hasattr(faulthandler, "register"),
                     "need faulthandler.register")
    def check_register(self, filename=False, all_threads=False,
                       unregister=False, chain=False):
        """
        Register a handler displaying the traceback on a user signal. Raise the
        signal and check the written traceback.

        If filename is set, the traceback is written to this file. If
        unregister is True, the handler is unregistered before the signal is
        raised: no traceback is expected and the child process must fail.

        If chain is True, check that the previous signal handler is called.

        Raise an error if the output doesn't match the expected format.
        """
        signum = signal.SIGUSR1
        code = """
import faulthandler
import os
import signal
import sys

def func(signum):
    os.kill(os.getpid(), signum)

def handler(signum, frame):
    handler.called = True
handler.called = False

exitcode = 0
signum = {signum}
unregister = {unregister}
chain = {chain}

if {has_filename}:
    file = open({filename}, "wb")
else:
    file = None
if chain:
    signal.signal(signum, handler)
faulthandler.register(signum, file=file,
                      all_threads={all_threads}, chain={chain})
if unregister:
    faulthandler.unregister(signum)
func(signum)
if chain and not handler.called:
    if file is not None:
        output = file
    else:
        output = sys.stderr
    print("Error: signal handler not called!", file=output)
    exitcode = 1
if file is not None:
    file.close()
sys.exit(exitcode)
""".strip()
        code = code.format(
            filename=repr(filename),
            has_filename=bool(filename),
            all_threads=all_threads,
            signum=signum,
            unregister=unregister,
            chain=chain,
        )
        trace, exitcode = self.get_output(code, filename)
        trace = '\n'.join(trace)
        if unregister:
            self.assertEqual(trace, '')
            self.assertNotEqual(exitcode, 0)
        else:
            # Raw strings: the header is a regex, "\(" must reach the re
            # module as written and "\n" is understood by re as a newline.
            if all_threads:
                header = r'Current thread XXX:\n'
            else:
                header = r'Traceback \(most recent call first\):\n'
            # Line 7 is the os.kill() call, line 28 the func() call.
            regex = expected_traceback(7, 28, header)
            self.assertRegex(trace, regex)
            self.assertEqual(exitcode, 0)

    def test_register(self):
        self.check_register()

    def test_unregister(self):
        self.check_register(unregister=True)

    def test_register_file(self):
        with temporary_filename() as filename:
            self.check_register(filename=filename)

    def test_register_threads(self):
        self.check_register(all_threads=True)

    def test_register_chain(self):
        self.check_register(chain=True)
 | 
						|
 | 
						|
 | 
						|
def test_main():
    """Run the tests of FaultHandlerTests with support.run_unittest()."""
    test_classes = (FaultHandlerTests,)
    support.run_unittest(*test_classes)


if __name__ == "__main__":
    # Executed as a script: run the tests directly.
    test_main()
 |