Issue #8407: Make signal.sigwait() tests more reliable

Block the signal before calling sigwait(). Use os.fork() to ensure that we have
only one thread.

Initial patch written by Charles-François Natali.
This commit is contained in:
Victor Stinner 2011-06-10 12:48:13 +02:00
parent b0ae53d8a0
commit af4946020e

View file

@ -598,34 +598,73 @@ class PendingSignalsTests(unittest.TestCase):
with self.assertRaises(ZeroDivisionError): with self.assertRaises(ZeroDivisionError):
signal.pthread_kill(current, signum) signal.pthread_kill(current, signum)
def check_sigwait(self, test, signum):
# sigwait must be called with the signal blocked: since the current
# process might have several threads running, we fork() a child process
# to have a single thread.
pid = os.fork()
if pid == 0:
# child: block and wait the signal
try:
signal.signal(signum, self.handler)
signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
# Do the tests
test(signum)
# The handler must not be called on unblock
try:
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
except ZeroDivisionError:
print("the signal handler has been called",
file=sys.stderr)
os._exit(1)
os._exit(0)
finally:
os._exit(1)
else:
# parent: let the child some time to wait, send him the signal, and
# check it correcty received it
self.assertEqual(os.waitpid(pid, 0), (pid, 0))
@unittest.skipUnless(hasattr(signal, 'sigwait'), @unittest.skipUnless(hasattr(signal, 'sigwait'),
'need signal.sigwait()') 'need signal.sigwait()')
@unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
def test_sigwait(self): def test_sigwait(self):
old_handler = signal.signal(signal.SIGALRM, self.handler) def test(signum):
self.addCleanup(signal.signal, signal.SIGALRM, old_handler) signal.alarm(1)
received = signal.sigwait([signum])
if received != signum:
print("sigwait() received %s, not %s"
% (received, signum),
file=sys.stderr)
os._exit(1)
signal.alarm(1) self.check_sigwait(test, signal.SIGALRM)
self.assertEqual(signal.sigwait([signal.SIGALRM]), signal.SIGALRM)
@unittest.skipUnless(hasattr(signal, 'sigwait'), @unittest.skipUnless(hasattr(signal, 'sigwait'),
'need signal.sigwait()') 'need signal.sigwait()')
@unittest.skipIf(threading is None, "test needs threading module") @unittest.skipIf(threading is None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
def test_sigwait_thread(self): def test_sigwait_thread(self):
signum = signal.SIGUSR1 def kill_later(signum):
old_handler = signal.signal(signum, self.handler) # wait until the main thread is waiting in sigwait()
self.addCleanup(signal.signal, signum, old_handler)
def kill_later():
time.sleep(1) time.sleep(1)
os.kill(os.getpid(), signum) os.kill(os.getpid(), signum)
killer = threading.Thread(target=kill_later) def test(signum):
killer.start() killer = threading.Thread(target=kill_later, args=(signum,))
try: killer.start()
self.assertEqual(signal.sigwait([signum]), signum) received = signal.sigwait([signum])
finally: if received != signum:
print("sigwait() received %s, not %s" % (received, signum),
file=sys.stderr)
os._exit(1)
killer.join() killer.join()
self.check_sigwait(test, signal.SIGUSR1)
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()') 'need signal.pthread_sigmask()')
def test_pthread_sigmask_arguments(self): def test_pthread_sigmask_arguments(self):