mirror of
https://github.com/python/cpython.git
synced 2025-07-29 06:05:00 +00:00

wrapper around the system call with the same name. Also added test cases, doc changes and NEWS entry. Thanks Jason and Ralf Schmitt.
268 lines
9.1 KiB
Python
268 lines
9.1 KiB
Python
import unittest
|
|
from test import test_support
|
|
import signal
|
|
import os, sys, time, errno
|
|
|
|
class HandlerBCalled(Exception):
    """Exception raised by the SIGUSR1 test handler to show that it ran."""
|
|
|
|
class InterProcessSignalTests(unittest.TestCase):
|
|
MAX_DURATION = 20 # Entire test should last at most 20 sec.
|
|
|
|
# Set up a child to send signals to us (the parent) after waiting
|
|
# long enough to receive the alarm. It seems we miss the alarm
|
|
# for some reason. This will hopefully stop the hangs on
|
|
# Tru64/Alpha. Alas, it doesn't. Tru64 appears to miss all the
|
|
# signals at times, or seemingly random subsets of them, and
|
|
# nothing done in force_test_exit so far has actually helped.
|
|
def spawn_force_test_exit_process(self, parent_pid):
|
|
# Sigh, both imports seem necessary to avoid errors.
|
|
import os
|
|
fork_pid = os.fork()
|
|
if fork_pid:
|
|
# In parent.
|
|
return fork_pid
|
|
|
|
# In child.
|
|
import os, time
|
|
try:
|
|
# Wait 5 seconds longer than the expected alarm to give enough
|
|
# time for the normal sequence of events to occur. This is
|
|
# just a stop-gap to try to prevent the test from hanging.
|
|
time.sleep(self.MAX_DURATION + 5)
|
|
print >> sys.__stdout__, " child should not have to kill parent"
|
|
for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM":
|
|
os.kill(parent_pid, getattr(signal, signame))
|
|
print >> sys.__stdout__, " child sent", signame, "to", \
|
|
parent_pid
|
|
time.sleep(1)
|
|
finally:
|
|
os._exit(0)
|
|
|
|
def handlerA(self, *args):
|
|
self.a_called = True
|
|
if test_support.verbose:
|
|
print "handlerA invoked", args
|
|
|
|
def handlerB(self, *args):
|
|
self.b_called = True
|
|
if test_support.verbose:
|
|
print "handlerB invoked", args
|
|
raise HandlerBCalled(*args)
|
|
|
|
def test_main(self):
|
|
self.assertEquals(signal.getsignal(signal.SIGHUP), self.handlerA)
|
|
self.assertEquals(signal.getsignal(signal.SIGUSR1), self.handlerB)
|
|
self.assertEquals(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN)
|
|
self.assertEquals(signal.getsignal(signal.SIGALRM),
|
|
signal.default_int_handler)
|
|
|
|
# Launch an external script to send us signals.
|
|
# We expect the external script to:
|
|
# send HUP, which invokes handlerA to set a_called
|
|
# send USR1, which invokes handlerB to set b_called and raise
|
|
# HandlerBCalled
|
|
# send USR2, which is ignored
|
|
#
|
|
# Then we expect the alarm to go off, and its handler raises
|
|
# KeyboardInterrupt, finally getting us out of the loop.
|
|
|
|
if test_support.verbose:
|
|
verboseflag = '-x'
|
|
else:
|
|
verboseflag = '+x'
|
|
|
|
pid = self.pid
|
|
if test_support.verbose:
|
|
print "test runner's pid is", pid
|
|
|
|
# Shell script that will send us asynchronous signals
|
|
script = """
|
|
(
|
|
set %(verboseflag)s
|
|
sleep 2
|
|
kill -HUP %(pid)d
|
|
sleep 2
|
|
kill -USR1 %(pid)d
|
|
sleep 2
|
|
kill -USR2 %(pid)d
|
|
) &
|
|
""" % vars()
|
|
|
|
signal.alarm(self.MAX_DURATION)
|
|
|
|
handler_b_exception_raised = False
|
|
|
|
os.system(script)
|
|
try:
|
|
if test_support.verbose:
|
|
print "starting pause() loop..."
|
|
while 1:
|
|
try:
|
|
if test_support.verbose:
|
|
print "call pause()..."
|
|
signal.pause()
|
|
if test_support.verbose:
|
|
print "pause() returned"
|
|
except HandlerBCalled:
|
|
handler_b_exception_raised = True
|
|
if test_support.verbose:
|
|
print "HandlerBCalled exception caught"
|
|
|
|
except KeyboardInterrupt:
|
|
if test_support.verbose:
|
|
print "KeyboardInterrupt (the alarm() went off)"
|
|
|
|
self.assert_(self.a_called)
|
|
self.assert_(self.b_called)
|
|
self.assert_(handler_b_exception_raised)
|
|
|
|
def setUp(self):
|
|
# Install handlers.
|
|
self.hup = signal.signal(signal.SIGHUP, self.handlerA)
|
|
self.usr1 = signal.signal(signal.SIGUSR1, self.handlerB)
|
|
self.usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN)
|
|
self.alrm = signal.signal(signal.SIGALRM,
|
|
signal.default_int_handler)
|
|
self.a_called = False
|
|
self.b_called = False
|
|
self.pid = os.getpid()
|
|
self.fork_pid = self.spawn_force_test_exit_process(self.pid)
|
|
|
|
def tearDown(self):
|
|
# Forcibly kill the child we created to ping us if there was a
|
|
# test error.
|
|
try:
|
|
# Make sure we don't kill ourself if there was a fork
|
|
# error.
|
|
if self.fork_pid > 0:
|
|
os.kill(self.fork_pid, signal.SIGKILL)
|
|
except:
|
|
# If the child killed us, it has probably exited. Killing
|
|
# a non-existent process will raise an error which we
|
|
# don't care about.
|
|
pass
|
|
|
|
# Restore handlers.
|
|
signal.alarm(0) # cancel alarm in case we died early
|
|
signal.signal(signal.SIGHUP, self.hup)
|
|
signal.signal(signal.SIGUSR1, self.usr1)
|
|
signal.signal(signal.SIGUSR2, self.usr2)
|
|
signal.signal(signal.SIGALRM, self.alrm)
|
|
|
|
|
|
class BasicSignalTests(unittest.TestCase):
    # Argument validation of signal.getsignal() and signal.signal().

    def test_out_of_range_signal_number_raises_error(self):
        bogus_signum = 4242

        def noop_handler(*args):
            pass

        # Both the query and the install call must reject the number.
        self.assertRaises(ValueError, signal.getsignal, bogus_signum)
        self.assertRaises(ValueError, signal.signal,
                          bogus_signum, noop_handler)

    def test_setting_signal_handler_to_none_raises_error(self):
        # None is not an acceptable handler value.
        self.assertRaises(TypeError, signal.signal, signal.SIGUSR1, None)
|
|
|
|
class WakeupSignalTests(unittest.TestCase):
    # Timing checks around signal.set_wakeup_fd(): setUp() registers the
    # write end of a pipe as the wakeup fd, and each test arms a one
    # second alarm and verifies that the blocking calls finish well
    # before their full timeout.

    TIMEOUT_FULL = 10
    TIMEOUT_HALF = 5

    def setUp(self):
        import fcntl

        # Do-nothing SIGALRM handler; the previous one is kept for tearDown.
        self.alrm = signal.signal(signal.SIGALRM,
                                  lambda signum, frame: None)
        self.read, self.write = os.pipe()
        # Switch the write end to non-blocking mode before registering it.
        current_flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
        fcntl.fcntl(self.write, fcntl.F_SETFL,
                    current_flags | os.O_NONBLOCK)
        self.old_wakeup = signal.set_wakeup_fd(self.write)

    def tearDown(self):
        # Undo setUp(): previous wakeup fd, pipe ends, then SIGALRM handler.
        signal.set_wakeup_fd(self.old_wakeup)
        os.close(self.read)
        os.close(self.write)
        signal.signal(signal.SIGALRM, self.alrm)

    def test_wakeup_fd_early(self):
        import select

        signal.alarm(1)
        started = time.time()
        # We attempt to get a signal during the sleep,
        # before select is called
        time.sleep(self.TIMEOUT_FULL)
        after_sleep = time.time()
        sleep_elapsed = after_sleep - started
        self.assertTrue(sleep_elapsed < self.TIMEOUT_HALF)

        select.select([self.read], [], [], self.TIMEOUT_FULL)
        select_elapsed = time.time() - after_sleep
        self.assertTrue(select_elapsed < self.TIMEOUT_HALF)

    def test_wakeup_fd_during(self):
        import select

        signal.alarm(1)
        started = time.time()
        # We attempt to get a signal during the select call
        readers = [self.read]
        self.assertRaises(select.error, select.select,
                          readers, [], [], self.TIMEOUT_FULL)
        elapsed = time.time() - started
        self.assertTrue(elapsed < self.TIMEOUT_HALF)
|
|
|
|
class SiginterruptTest(unittest.TestCase):
    """Exercise signal.siginterrupt() with a blocking read on a pipe."""

    signum = signal.SIGUSR1

    def readpipe_interrupted(self, cb):
        """Report whether a blocking os.read() is interrupted by a signal.

        A do-nothing handler for self.signum is installed and cb() is
        called (the tests use it to adjust siginterrupt()).  A forked
        child then sends self.signum to this process after 0.2 seconds
        and exits 0.2 seconds later, which closes the last write end of
        the pipe this process is reading from.

        Returns True if the read failed with EINTR and False if it
        returned normally.  Any other OSError is propagated.
        """
        r, w = os.pipe()
        ppid = os.getpid()

        # Install the handler and apply cb() *before* forking: the child
        # inherits both, and the parent can no longer receive the signal
        # while the default (process-terminating) disposition is active.
        # cb() must come after signal.signal() so its setting is the one
        # in effect during the read.
        oldhandler = signal.signal(self.signum, lambda x, y: None)
        cb()

        pid = os.fork()
        if pid == 0:
            # child code: sleep, kill, sleep. and then exit,
            # which closes the pipe from which the parent process reads
            try:
                time.sleep(0.2)
                os.kill(ppid, self.signum)
                time.sleep(0.2)
            finally:
                os._exit(0)

        try:
            # Drop our write end so the child's exit produces EOF.
            os.close(w)

            try:
                os.read(r, 1)
                return False
            except OSError as err:
                if err.errno != errno.EINTR:
                    raise
                return True
        finally:
            # Release the read end too; nothing else will close it.
            os.close(r)
            signal.signal(self.signum, oldhandler)
            os.waitpid(pid, 0)

    def test_without_siginterrupt(self):
        # With a freshly installed handler the read is interrupted.
        i = self.readpipe_interrupted(lambda: None)
        self.assertEqual(i, True)

    def test_siginterrupt_on(self):
        # siginterrupt(signum, 1): the read is interrupted.
        i = self.readpipe_interrupted(
            lambda: signal.siginterrupt(self.signum, 1))
        self.assertEqual(i, True)

    def test_siginterrupt_off(self):
        # siginterrupt(signum, 0): the read is not interrupted.
        i = self.readpipe_interrupted(
            lambda: signal.siginterrupt(self.signum, 0))
        self.assertEqual(i, False)
|
|
|
|
def test_main():
    """Run every signal test class, or skip on unsupported platforms."""
    platform = sys.platform
    if platform.startswith(('win', 'os2')) or platform == 'riscos':
        message = "Can't test signal on %s" % platform
        raise test_support.TestSkipped(message)

    test_classes = (BasicSignalTests, InterProcessSignalTests,
                    WakeupSignalTests, SiginterruptTest)
    test_support.run_unittest(*test_classes)


if __name__ == "__main__":
    test_main()
|