Issue #22018: Add _testcapi.raise_signal()

- Use _testcapi.raise_signal() in test_signal
- close also os.pipe() file descriptors in some test_signal tests where they
  were not closed properly
- Remove faulthandler._sigill() and faulthandler._sigbus(): reuse
  _testcapi.raise_signal() in test_faulthandler
This commit is contained in:
Victor Stinner 2014-07-21 12:30:22 +02:00
parent 569a7fa13a
commit 56e8c29a4e
4 changed files with 85 additions and 53 deletions

View file

@ -15,6 +15,10 @@ try:
import threading
except ImportError:
threading = None
try:
import _testcapi
except ImportError:
_testcapi = None
class HandlerBCalled(Exception):
@ -250,12 +254,27 @@ class WakeupFDTests(unittest.TestCase):
fd = support.make_bad_fd()
self.assertRaises(ValueError, signal.set_wakeup_fd, fd)
def test_set_wakeup_fd_result(self):
r1, w1 = os.pipe()
os.close(r1)
self.addCleanup(os.close, w1)
r2, w2 = os.pipe()
os.close(r2)
self.addCleanup(os.close, w2)
signal.set_wakeup_fd(w1)
self.assertIs(signal.set_wakeup_fd(w2), w1)
self.assertIs(signal.set_wakeup_fd(-1), w2)
self.assertIs(signal.set_wakeup_fd(-1), -1)
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
@unittest.skipIf(_testcapi is None, 'need _testcapi')
def check_wakeup(self, test_body, *signals, ordered=True):
# use a subprocess to have only one thread
code = """if 1:
import _testcapi
import fcntl
import os
import signal
@ -294,17 +313,18 @@ class WakeupSignalTests(unittest.TestCase):
assert_python_ok('-c', code)
@unittest.skipIf(_testcapi is None, 'need _testcapi')
def test_wakeup_write_error(self):
# Issue #16105: write() errors in the C signal handler should not
# pass silently.
# Use a subprocess to have only one thread.
code = """if 1:
import _testcapi
import errno
import fcntl
import os
import signal
import sys
import time
from test.support import captured_stderr
def handler(signum, frame):
@ -319,8 +339,7 @@ class WakeupSignalTests(unittest.TestCase):
signal.set_wakeup_fd(r)
try:
with captured_stderr() as err:
signal.alarm(1)
time.sleep(5.0)
_testcapi.raise_signal(signal.SIGALRM)
except ZeroDivisionError:
# An ignored exception should have been printed out on stderr
err = err.getvalue()
@ -331,6 +350,9 @@ class WakeupSignalTests(unittest.TestCase):
raise AssertionError(err)
else:
raise AssertionError("ZeroDivisionError not raised")
os.close(r)
os.close(w)
"""
r, w = os.pipe()
try:
@ -394,9 +416,10 @@ class WakeupSignalTests(unittest.TestCase):
def test_signum(self):
self.check_wakeup("""def test():
import _testcapi
signal.signal(signal.SIGUSR1, handler)
os.kill(os.getpid(), signal.SIGUSR1)
os.kill(os.getpid(), signal.SIGALRM)
_testcapi.raise_signal(signal.SIGUSR1)
_testcapi.raise_signal(signal.SIGALRM)
""", signal.SIGUSR1, signal.SIGALRM)
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
@ -410,8 +433,8 @@ class WakeupSignalTests(unittest.TestCase):
signal.signal(signum2, handler)
signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
os.kill(os.getpid(), signum1)
os.kill(os.getpid(), signum2)
_testcapi.raise_signal(signum1)
_testcapi.raise_signal(signum2)
# Unblocking the 2 signals calls the C signal handler twice
signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
""", signal.SIGUSR1, signal.SIGUSR2, ordered=False)
@ -447,18 +470,22 @@ class SiginterruptTest(unittest.TestCase):
sys.stdout.flush()
# run the test twice
for loop in range(2):
# send a SIGALRM in a second (during the read)
signal.alarm(1)
try:
# blocking call: read from a pipe without data
os.read(r, 1)
except OSError as err:
if err.errno != errno.EINTR:
raise
else:
sys.exit(2)
sys.exit(3)
try:
for loop in range(2):
# send a SIGALRM in a second (during the read)
signal.alarm(1)
try:
# blocking call: read from a pipe without data
os.read(r, 1)
except OSError as err:
if err.errno != errno.EINTR:
raise
else:
sys.exit(2)
sys.exit(3)
finally:
os.close(r)
os.close(w)
""" % (interrupt,)
with spawn_python('-c', code) as process:
try: