Issue #12303: run sig*wait*() tests in a subprocesss

... instead of using fork(): sig*wait*() functions behave differently (not
correctly) after a fork, especially on FreeBSD 6.

Skip also test_sigtimedwait_poll() on FreeBSD 6 because of a kernel bug.
This commit is contained in:
Victor Stinner 2011-06-29 10:43:02 +02:00
parent 470e0dc23a
commit 9e8b82f1e1

View file

@ -520,7 +520,6 @@ class PendingSignalsTests(unittest.TestCase):
functions. functions.
""" """
def setUp(self): def setUp(self):
self.hndl_called = False
self.has_pthread_kill = hasattr(signal, 'pthread_kill') self.has_pthread_kill = hasattr(signal, 'pthread_kill')
def handler(self, signum, frame): def handler(self, signum, frame):
@ -610,17 +609,26 @@ class PendingSignalsTests(unittest.TestCase):
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()') 'need signal.pthread_sigmask()')
@unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') def wait_helper(self, test, blocked):
def wait_helper(self, test, handler, blocked): """
signum = signal.SIGALRM test: body of the "def test(signum):" function.
blocked: number of the blocked signal
"""
code = '''
import signal
import sys
# sig*wait* must be called with the signal blocked: since the current def handler(signum, frame):
# process might have several threads running, we fork() a child process 1/0
# to have a single thread.
pid = os.fork() def test(signum):
if pid == 0: %s
# child: block and wait the signal
try: blocked = %s
signum = signal.SIGALRM
# child: block and wait the signal
try:
signal.signal(signum, handler) signal.signal(signum, handler)
signal.pthread_sigmask(signal.SIG_BLOCK, [blocked]) signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
@ -633,64 +641,76 @@ class PendingSignalsTests(unittest.TestCase):
except ZeroDivisionError: except ZeroDivisionError:
print("the signal handler has been called", print("the signal handler has been called",
file=sys.stderr) file=sys.stderr)
os._exit(1) sys.exit(1)
except BaseException as err: except BaseException as err:
print("error: {}".format(err), file=sys.stderr) print("error: {}".format(err), file=sys.stderr)
sys.stderr.flush() sys.stderr.flush()
os._exit(1) sys.exit(1)
else: ''' % (test, blocked)
os._exit(0)
else: # sig*wait* must be called with the signal blocked: since the current
# parent: check that the child correcty received the signal # process might have several threads running, use a subprocess to have
self.assertEqual(os.waitpid(pid, 0), (pid, 0)) # a single thread.
assert_python_ok('-c', code)
@unittest.skipUnless(hasattr(signal, 'sigwait'), @unittest.skipUnless(hasattr(signal, 'sigwait'),
'need signal.sigwait()') 'need signal.sigwait()')
def test_sigwait(self): def test_sigwait(self):
def test(signum): test = '''
signal.alarm(1) signal.alarm(1)
self.assertEqual(signum, signal.sigwait([signum])) received = signal.sigwait([signum])
assert received == signum , 'received %s, not %s' % (received, signum)
'''
self.wait_helper(test, self.handler, signal.SIGALRM) self.wait_helper(test, signal.SIGALRM)
@unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
'need signal.sigwaitinfo()') 'need signal.sigwaitinfo()')
def test_sigwaitinfo(self): def test_sigwaitinfo(self):
def test(signum): test = '''
signal.alarm(1) signal.alarm(1)
info = signal.sigwaitinfo([signum]) info = signal.sigwaitinfo([signum])
self.assertEqual(signum, info.si_signo) assert info.si_signo == signum, "info.si_signo != %s" % signum
'''
self.wait_helper(test, self.handler, signal.SIGALRM) self.wait_helper(test, signal.SIGALRM)
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'), @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
'need signal.sigtimedwait()') 'need signal.sigtimedwait()')
def test_sigtimedwait(self): def test_sigtimedwait(self):
def test(signum): test = '''
signal.alarm(1) signal.alarm(1)
info = signal.sigtimedwait([signum], (10, 1000)) info = signal.sigtimedwait([signum], (10, 1000))
self.assertEqual(signum, info.si_signo) assert info.si_signo == signum, 'info.si_signo != %s' % signum
'''
self.wait_helper(test, self.handler, signal.SIGALRM) self.wait_helper(test, signal.SIGALRM)
# check that polling with sigtimedwait works
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'), @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
'need signal.sigtimedwait()') 'need signal.sigtimedwait()')
# issue #12303: sigtimedwait() takes 30 seconds on FreeBSD 6 (kernel bug)
@unittest.skipIf(sys.platform =='freebsd6',
'sigtimedwait() with a null timeout doens\'t work on FreeBSD 6')
def test_sigtimedwait_poll(self): def test_sigtimedwait_poll(self):
def test(signum): # check that polling with sigtimedwait works
self.kill(signum) test = '''
import os
os.kill(os.getpid(), signum)
info = signal.sigtimedwait([signum], (0, 0)) info = signal.sigtimedwait([signum], (0, 0))
self.assertEqual(signum, info.si_signo) assert info.si_signo == signum, 'info.si_signo != %s' % signum
'''
self.wait_helper(test, self.handler, signal.SIGALRM) self.wait_helper(test, signal.SIGALRM)
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'), @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
'need signal.sigtimedwait()') 'need signal.sigtimedwait()')
def test_sigtimedwait_timeout(self): def test_sigtimedwait_timeout(self):
def test(signum): test = '''
self.assertEqual(None, signal.sigtimedwait([signum], (1, 35500))) received = signal.sigtimedwait([signum], (1, 0))
assert received is None, "received=%r" % (received,)
'''
self.wait_helper(test, self.handler, signal.SIGALRM) self.wait_helper(test, signal.SIGALRM)
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'), @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
'need signal.sigtimedwait()') 'need signal.sigtimedwait()')
@ -700,25 +720,30 @@ class PendingSignalsTests(unittest.TestCase):
self.assertRaises(ValueError, signal.sigtimedwait, [signum], (0, -1)) self.assertRaises(ValueError, signal.sigtimedwait, [signum], (0, -1))
self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, 0)) self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, 0))
def alarm_handler(self, signum, frame):
self.hndl_called = True
@unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
'need signal.sigwaitinfo()') 'need signal.sigwaitinfo()')
def test_sigwaitinfo_interrupted(self): def test_sigwaitinfo_interrupted(self):
def test(signum): test = '''
import errno
hndl_called = True
def alarm_handler(signum, frame):
hndl_called = False
signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(1) signal.alarm(1)
try: try:
signal.sigwaitinfo([signal.SIGUSR1]) signal.sigwaitinfo([signal.SIGUSR1])
except OSError as e: except OSError as e:
if e.errno == errno.EINTR: if e.errno == errno.EINTR:
self.assertTrue(self.hndl_called) assert hndl_called, "SIGALRM handler not called"
else: else:
self.fail("Expected EINTR to be raised by sigwaitinfo") raise Exception("Expected EINTR to be raised by sigwaitinfo")
else: else:
self.fail("Expected EINTR to be raised by sigwaitinfo") raise Exception("Expected EINTR to be raised by sigwaitinfo")
'''
self.wait_helper(test, self.alarm_handler, signal.SIGUSR1) self.wait_helper(test, signal.SIGUSR1)
@unittest.skipUnless(hasattr(signal, 'sigwait'), @unittest.skipUnless(hasattr(signal, 'sigwait'),
'need signal.sigwait()') 'need signal.sigwait()')