mirror of
https://github.com/python/cpython.git
synced 2025-08-04 08:59:19 +00:00
tets
This commit is contained in:
parent
316b16de13
commit
d18ccd19f0
5 changed files with 278 additions and 41 deletions
|
@ -6,6 +6,7 @@ import gc
|
|||
import pickle
|
||||
import select
|
||||
import signal
|
||||
import socket
|
||||
import struct
|
||||
import subprocess
|
||||
import traceback
|
||||
|
@ -251,21 +252,43 @@ class WindowsSignalTests(unittest.TestCase):
|
|||
class WakeupFDTests(unittest.TestCase):
|
||||
|
||||
def test_invalid_fd(self):
|
||||
fd = support.make_bad_fd()
|
||||
if sys.platform == "win32":
|
||||
sock = socket.socket()
|
||||
fd = sock.fileno()
|
||||
sock.close()
|
||||
else:
|
||||
fd = support.make_bad_fd()
|
||||
self.assertRaises((ValueError, OSError),
|
||||
signal.set_wakeup_fd, fd)
|
||||
|
||||
def test_set_wakeup_fd_result(self):
|
||||
r1, w1 = os.pipe()
|
||||
self.addCleanup(os.close, r1)
|
||||
self.addCleanup(os.close, w1)
|
||||
r2, w2 = os.pipe()
|
||||
self.addCleanup(os.close, r2)
|
||||
self.addCleanup(os.close, w2)
|
||||
@unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
|
||||
def test_only_socket(self):
|
||||
# set_wakeup_fd() expects a socket on Windows
|
||||
with open(support.TESTFN, 'wb') as fp:
|
||||
self.addCleanup(support.unlink, support.TESTFN)
|
||||
self.assertRaises(ValueError,
|
||||
signal.set_wakeup_fd, fp.fileno())
|
||||
|
||||
signal.set_wakeup_fd(w1)
|
||||
self.assertIs(signal.set_wakeup_fd(w2), w1)
|
||||
self.assertIs(signal.set_wakeup_fd(-1), w2)
|
||||
def test_set_wakeup_fd_result(self):
|
||||
if sys.platform == 'win32':
|
||||
sock1 = socket.socket()
|
||||
self.addCleanup(sock1.close)
|
||||
fd1 = sock1.fileno()
|
||||
|
||||
sock2 = socket.socket()
|
||||
self.addCleanup(sock2.close)
|
||||
fd2 = sock2.fileno()
|
||||
else:
|
||||
r1, fd1 = os.pipe()
|
||||
self.addCleanup(os.close, r1)
|
||||
self.addCleanup(os.close, fd1)
|
||||
r2, fd2 = os.pipe()
|
||||
self.addCleanup(os.close, r2)
|
||||
self.addCleanup(os.close, fd2)
|
||||
|
||||
signal.set_wakeup_fd(fd1)
|
||||
self.assertIs(signal.set_wakeup_fd(fd2), fd1)
|
||||
self.assertIs(signal.set_wakeup_fd(-1), fd2)
|
||||
self.assertIs(signal.set_wakeup_fd(-1), -1)
|
||||
|
||||
|
||||
|
@ -441,6 +464,90 @@ class WakeupSignalTests(unittest.TestCase):
|
|||
""", signal.SIGUSR1, signal.SIGUSR2, ordered=False)
|
||||
|
||||
|
||||
@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
|
||||
class WakeupSocketSignalTests(unittest.TestCase):
|
||||
|
||||
@unittest.skipIf(_testcapi is None, 'need _testcapi')
|
||||
def test_socket(self):
|
||||
# use a subprocess to have only one thread
|
||||
code = """if 1:
|
||||
import signal
|
||||
import socket
|
||||
import struct
|
||||
import _testcapi
|
||||
|
||||
signum = signal.SIGINT
|
||||
signals = (signum,)
|
||||
|
||||
def handler(signum, frame):
|
||||
pass
|
||||
|
||||
signal.signal(signum, handler)
|
||||
|
||||
read, write = socket.socketpair()
|
||||
read.setblocking(False)
|
||||
write.setblocking(False)
|
||||
signal.set_wakeup_fd(write.fileno())
|
||||
|
||||
_testcapi.raise_signal(signum)
|
||||
|
||||
data = read.recv(1)
|
||||
if not data:
|
||||
raise Exception("no signum written")
|
||||
raised = struct.unpack('B', data)
|
||||
if raised != signals:
|
||||
raise Exception("%r != %r" % (raised, signals))
|
||||
|
||||
read.close()
|
||||
write.close()
|
||||
"""
|
||||
|
||||
assert_python_ok('-c', code)
|
||||
|
||||
@unittest.skipIf(_testcapi is None, 'need _testcapi')
|
||||
def test_send_error(self):
|
||||
# Use a subprocess to have only one thread.
|
||||
if os.name == 'nt':
|
||||
action = 'send'
|
||||
else:
|
||||
action = 'write'
|
||||
code = """if 1:
|
||||
import errno
|
||||
import signal
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
import _testcapi
|
||||
from test.support import captured_stderr
|
||||
|
||||
signum = signal.SIGINT
|
||||
|
||||
def handler(signum, frame):
|
||||
pass
|
||||
|
||||
signal.signal(signum, handler)
|
||||
|
||||
read, write = socket.socketpair()
|
||||
read.setblocking(False)
|
||||
write.setblocking(False)
|
||||
|
||||
signal.set_wakeup_fd(write.fileno())
|
||||
|
||||
# Close sockets: send() will fail
|
||||
read.close()
|
||||
write.close()
|
||||
|
||||
with captured_stderr() as err:
|
||||
_testcapi.raise_signal(signum)
|
||||
|
||||
err = err.getvalue()
|
||||
if ('Exception ignored when trying to {action} to the signal wakeup fd'
|
||||
not in err):
|
||||
raise AssertionError(err)
|
||||
""".format(action=action)
|
||||
assert_python_ok('-c', code)
|
||||
|
||||
|
||||
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
|
||||
class SiginterruptTest(unittest.TestCase):
|
||||
|
||||
|
@ -990,6 +1097,7 @@ def test_main():
|
|||
try:
|
||||
support.run_unittest(GenericTests, PosixTests, InterProcessSignalTests,
|
||||
WakeupFDTests, WakeupSignalTests,
|
||||
WakeupSocketSignalTests,
|
||||
SiginterruptTest, ItimerTest, WindowsSignalTests,
|
||||
PendingSignalsTests)
|
||||
finally:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue