merge forward from the python 2.x branch

This commit is contained in:
Jean-Paul Calderone 2010-06-19 19:54:48 +00:00
parent 2aa3af4a16
commit 867c435460
7 changed files with 474 additions and 7 deletions

View file

@ -462,9 +462,210 @@ class ItimerTest(unittest.TestCase):
# and the handler should have been called
self.assertEqual(self.hndl_called, True)
class SomeException(Exception):
"""
A unique exception class to be raised by a signal handler to verify that the
signal handler was invoked.
"""
def raiser(*args):
"""A signal handler which raises SomeException."""
raise SomeException()
class SigprocmaskTests(unittest.TestCase):
"""Tests for sigprocmask."""
def _handle_sigusr1(self):
old_handler = signal.signal(signal.SIGUSR1, raiser)
self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
def test_signature(self):
"""When invoked with other than two arguments, sigprocmask raises
TypeError.
"""
self.assertRaises(TypeError, signal.sigprocmask)
self.assertRaises(TypeError, signal.sigprocmask, 1)
self.assertRaises(TypeError, signal.sigprocmask, 1, 2, 3)
def test_invalid_how(self):
"""If a value other than SIG_BLOCK, SIG_UNBLOCK, or SIG_SETMASK is
passed for the how argument to sigprocmask, ValueError is raised.
"""
message = "value specified for how \(1700\) invalid"
with self.assertRaisesRegexp(ValueError, message):
signal.sigprocmask(1700, [])
def test_invalid_signal_iterable(self):
"""If iterating over the value passed for the signals parameter to
sigprocmask raises an exception, sigprocmask raises that exception.
"""
class BrokenIter(object):
def __iter__(self):
raise RuntimeError("my __iter__ is broken")
with self.assertRaisesRegexp(RuntimeError, "my __iter__ is broken"):
signal.sigprocmask(signal.SIG_BLOCK, BrokenIter())
def test_invalid_signal(self):
"""If an object in the iterable passed for the signals parameter to
sigprocmask isn't an integer, TypeError is raised."""
with self.assertRaisesRegexp(TypeError, "an integer is required"):
signal.sigprocmask(signal.SIG_BLOCK, [object()])
def test_return_previous_mask(self):
"""sigprocmask returns a list of the signals previously masked.
"""
previous = signal.sigprocmask(signal.SIG_BLOCK, [1, 3, 5])
result = signal.sigprocmask(signal.SIG_BLOCK, previous)
self.assertEquals(result, [1, 3, 5])
def test_block(self):
"""When invoked with SIG_BLOCK, sigprocmask blocks the signals in the
sigmask list.
"""
self._handle_sigusr1()
previous = signal.sigprocmask(signal.SIG_BLOCK, [signal.SIGUSR1])
os.kill(os.getpid(), signal.SIGUSR1)
with self.assertRaises(SomeException):
# Expect to receive SIGUSR1 after unblocking it.
signal.sigprocmask(signal.SIG_SETMASK, previous)
def test_unblock(self):
"""When invoked with SIG_UNBLOCK, sigprocmask unblocks the signals in
the sigmask list.
"""
self._handle_sigusr1()
previous = signal.sigprocmask(signal.SIG_BLOCK, [signal.SIGUSR1])
self.addCleanup(signal.sigprocmask, signal.SIG_SETMASK, previous)
signal.sigprocmask(signal.SIG_UNBLOCK, [signal.SIGUSR1])
with self.assertRaises(SomeException):
os.kill(os.getpid(), signal.SIGUSR1)
def test_long_signals(self):
"""sigprocmask accepts signal numbers as instances of long."""
previous = signal.sigprocmask(
signal.SIG_SETMASK, [long(signal.SIGUSR1), long(signal.SIGUSR2)])
masked = signal.sigprocmask(signal.SIG_SETMASK, previous)
self.assertEquals(masked, [signal.SIGUSR1, signal.SIGUSR2])
class SignalfdTests(unittest.TestCase):
"""
Tests for signal.signalfd.
"""
def test_signature(self):
"""When invoked with fewer than two arguments or more than three,
signalfd raises TypeError.
"""
self.assertRaises(TypeError, signal.signalfd)
self.assertRaises(TypeError, signal.signalfd, 1)
self.assertRaises(TypeError, signal.signalfd, 1, 2, 3, 4)
def test_create_signalfd(self):
"""When invoked with a file descriptor of -1, signalfd allocates a new
file descriptor for signal information delivery and returns it.
"""
fd = signal.signalfd(-1, [])
self.assertTrue(isinstance(fd, int))
os.close(fd)
def test_non_iterable_signals(self):
"""If an object which is not iterable is passed for the sigmask list
argument to signalfd, the exception raised by trying to iterate over
that object is raised.
"""
self.assertRaises(TypeError, signal.signalfd, -1, object())
def test_non_integer_signals(self):
"""If any non-integer values are included in the sigmask list argument
to signalfd, the exception raised by the attempt to convert them to an
integer is raised.
"""
self.assertRaises(TypeError, signal.signalfd, -1, [object()])
def test_out_of_range_signal(self):
"""If a signal number that is out of the valid range is included in the
sigmask list argument to signalfd, ValueError is raised.
"""
message = "signal number -2 out of range"
with self.assertRaisesRegexp(ValueError, message):
signal.signalfd(-1, [-2])
def test_handle_signals(self):
"""After signalfd is called, if a signal is received which was in the
sigmask list passed to that call, information about the signal can be
read from the fd returned by that call.
"""
fd = signal.signalfd(-1, [signal.SIGUSR2])
self.addCleanup(os.close, fd)
previous = signal.sigprocmask(signal.SIG_BLOCK, [signal.SIGUSR2])
self.addCleanup(signal.sigprocmask, signal.SIG_SETMASK, previous)
os.kill(os.getpid(), signal.SIGUSR2)
bytes = os.read(fd, 128)
self.assertTrue(bytes)
def test_close_on_exec(self):
"""If the bit mask passed as the 3rd argument to signalfd includes
SFD_CLOEXEC, the returned file descriptor has FD_CLOEXEC set on it.
"""
import fcntl
fd = signal.signalfd(-1, [], signal.SFD_CLOEXEC)
self.addCleanup(os.close, fd)
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
self.assertTrue(flags & fcntl.FD_CLOEXEC)
def test_nonblocking(self):
"""If the bit mask passed as the 3rd argument to signalfd includes
SFD_NOBLOCK, the file description referenced by the returned file
descriptor has O_NONBLOCK set on it.
"""
import fcntl
fd = signal.signalfd(-1, [], signal.SFD_NONBLOCK)
self.addCleanup(os.close, fd)
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
self.assertTrue(flags & os.O_NONBLOCK)
def test_default_flags(self):
"""If an empty bit mask is passed as the 3rd argument to signalfd,
neither FD_CLOEXEC nor O_NONBLOCK is set on the resulting file
descriptor/description.
"""
import fcntl
fd = signal.signalfd(-1, [])
self.addCleanup(os.close, fd)
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
self.assertFalse(flags & fcntl.FD_CLOEXEC)
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
self.assertFalse(flags & os.O_NONBLOCK)
def test_main():
support.run_unittest(BasicSignalTests, InterProcessSignalTests,
WakeupSignalTests, SiginterruptTest, ItimerTest)
support.run_unittest(
BasicSignalTests, InterProcessSignalTests,
WakeupSignalTests, SiginterruptTest, ItimerTest, SignalfdTests,
SigprocmaskTests)
if __name__ == "__main__":