mirror of
https://github.com/python/cpython.git
synced 2025-08-30 21:48:47 +00:00
Issue #12303: Add sigwaitinfo() and sigtimedwait() to the signal module.
This commit is contained in:
parent
bb66972c0b
commit
bc808224b6
8 changed files with 295 additions and 23 deletions
|
@ -520,6 +520,7 @@ class PendingSignalsTests(unittest.TestCase):
|
|||
functions.
|
||||
"""
|
||||
def setUp(self):
|
||||
self.hndl_called = False
|
||||
self.has_pthread_kill = hasattr(signal, 'pthread_kill')
|
||||
|
||||
def handler(self, signum, frame):
|
||||
|
@ -607,45 +608,35 @@ class PendingSignalsTests(unittest.TestCase):
|
|||
with self.assertRaises(ZeroDivisionError):
|
||||
signal.pthread_kill(current, signum)
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'sigwait'),
|
||||
'need signal.sigwait()')
|
||||
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
|
||||
'need signal.pthread_sigmask()')
|
||||
@unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
|
||||
def test_sigwait(self):
|
||||
def test(signum):
|
||||
signal.alarm(1)
|
||||
received = signal.sigwait([signum])
|
||||
if received != signum:
|
||||
print("sigwait() received %s, not %s"
|
||||
% (received, signum),
|
||||
file=sys.stderr)
|
||||
os._exit(1)
|
||||
|
||||
def wait_helper(self, test, handler, blocked=signal.SIGALRM):
|
||||
signum = signal.SIGALRM
|
||||
|
||||
# sigwait must be called with the signal blocked: since the current
|
||||
# sig*wait* must be called with the signal blocked: since the current
|
||||
# process might have several threads running, we fork() a child process
|
||||
# to have a single thread.
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
# child: block and wait the signal
|
||||
try:
|
||||
signal.signal(signum, self.handler)
|
||||
signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
|
||||
signal.signal(signum, handler)
|
||||
signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
|
||||
|
||||
# Do the tests
|
||||
test(signum)
|
||||
|
||||
# The handler must not be called on unblock
|
||||
try:
|
||||
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
|
||||
signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
|
||||
except ZeroDivisionError:
|
||||
print("the signal handler has been called",
|
||||
file=sys.stderr)
|
||||
os._exit(1)
|
||||
except BaseException as err:
|
||||
print("error: {}".format(err), file=sys.stderr)
|
||||
sys.stderr.flush()
|
||||
os._exit(1)
|
||||
else:
|
||||
os._exit(0)
|
||||
|
@ -653,6 +644,82 @@ class PendingSignalsTests(unittest.TestCase):
|
|||
# parent: check that the child correcty received the signal
|
||||
self.assertEqual(os.waitpid(pid, 0), (pid, 0))
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'sigwait'),
|
||||
'need signal.sigwait()')
|
||||
def test_sigwait(self):
|
||||
def test(signum):
|
||||
signal.alarm(1)
|
||||
self.assertEqual(signum, signal.sigwait([signum]))
|
||||
|
||||
self.wait_helper(test, self.handler)
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
|
||||
'need signal.sigwaitinfo()')
|
||||
def test_sigwaitinfo(self):
|
||||
def test(signum):
|
||||
signal.alarm(1)
|
||||
info = signal.sigwaitinfo([signum])
|
||||
self.assertEqual(signum, info.si_signo)
|
||||
|
||||
self.wait_helper(test, self.handler)
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
|
||||
'need signal.sigtimedwait()')
|
||||
def test_sigtimedwait(self):
|
||||
def test(signum):
|
||||
signal.alarm(1)
|
||||
info = signal.sigtimedwait([signum], (10, 1000))
|
||||
self.assertEqual(signum, info.si_signo)
|
||||
|
||||
self.wait_helper(test, self.handler)
|
||||
|
||||
# check that polling with sigtimedwait works
|
||||
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
|
||||
'need signal.sigtimedwait()')
|
||||
def test_sigtimedwait_poll(self):
|
||||
def test(signum):
|
||||
self.kill(signum)
|
||||
info = signal.sigtimedwait([signum], (0, 0))
|
||||
self.assertEqual(signum, info.si_signo)
|
||||
|
||||
self.wait_helper(test, self.handler)
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
|
||||
'need signal.sigtimedwait()')
|
||||
def test_sigtimedwait_timeout(self):
|
||||
def test(signum):
|
||||
self.assertEqual(None, signal.sigtimedwait([signum], (1, 35500)))
|
||||
|
||||
self.wait_helper(test, self.handler)
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
|
||||
'need signal.sigtimedwait()')
|
||||
def test_sigtimedwait_negative_timeout(self):
|
||||
signum = signal.SIGALRM
|
||||
self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, -1))
|
||||
self.assertRaises(ValueError, signal.sigtimedwait, [signum], (0, -1))
|
||||
self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, 0))
|
||||
|
||||
def alarm_handler(self, signum, frame):
|
||||
self.hndl_called = True
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
|
||||
'need signal.sigwaitinfo()')
|
||||
def test_sigwaitinfo_interrupted(self):
|
||||
def test(signum):
|
||||
signal.alarm(1)
|
||||
try:
|
||||
signal.sigwaitinfo([signal.SIGUSR1])
|
||||
except OSError as e:
|
||||
if e.errno == errno.EINTR:
|
||||
self.assertTrue(self.hndl_called)
|
||||
else:
|
||||
self.fail("Expected EINTR to be raised by sigwaitinfo")
|
||||
else:
|
||||
self.fail("Expected EINTR to be raised by sigwaitinfo")
|
||||
|
||||
self.wait_helper(test, self.alarm_handler, signal.SIGUSR1)
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'sigwait'),
|
||||
'need signal.sigwait()')
|
||||
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue