Rewrite eintr_tester.py to avoid os.fork()

eintr_tester.py calls signal.setitimer() to send signals to the current process
every 100 ms. The test sometimes hangs on FreeBSD. Maybe there is a race
condition in the child process after fork(). It's unsafe to run arbitrary code
after fork().

This change replace os.fork() with a regular call to subprocess.Popen(). This
change avoids the risk of having a child process which continue to execute
eintr_tester.py instead of exiting. It also ensures that the child process
doesn't inherit unexpected file descriptors by mistake.

Since I'm unable to reproduce the issue on FreeBSD, I will have to watch
FreeBSD buildbots to check if the issue is fixed or not.

Remove previous attempt to debug: remove call to
faulthandler.dump_traceback_later().
This commit is contained in:
Victor Stinner 2015-09-03 01:38:44 +02:00
parent 5a682c9ca5
commit be923ac948

View file

@ -8,12 +8,13 @@ Signals are generated in-process using setitimer(ITIMER_REAL), which allows
sub-second periodicity (contrarily to signal()). sub-second periodicity (contrarily to signal()).
""" """
import faulthandler
import io import io
import os import os
import select import select
import signal import signal
import socket import socket
import subprocess
import sys
import time import time
import unittest import unittest
@ -29,7 +30,7 @@ class EINTRBaseTest(unittest.TestCase):
# signal delivery periodicity # signal delivery periodicity
signal_period = 0.1 signal_period = 0.1
# default sleep time for tests - should obviously have: # default sleep time for tests - should obviously have:
# sleep_time > signal_period # sleep_time > signal_period
sleep_time = 0.2 sleep_time = 0.2
@classmethod @classmethod
@ -37,17 +38,10 @@ class EINTRBaseTest(unittest.TestCase):
cls.orig_handler = signal.signal(signal.SIGALRM, lambda *args: None) cls.orig_handler = signal.signal(signal.SIGALRM, lambda *args: None)
signal.setitimer(signal.ITIMER_REAL, cls.signal_delay, signal.setitimer(signal.ITIMER_REAL, cls.signal_delay,
cls.signal_period) cls.signal_period)
if hasattr(faulthandler, 'dump_traceback_later'):
# Most tests take less than 30 seconds, so 15 minutes should be
# enough. dump_traceback_later() is implemented with a thread, but
# pthread_sigmask() is used to mask all signaled on this thread.
faulthandler.dump_traceback_later(15 * 60, exit=True)
@classmethod @classmethod
def stop_alarm(cls): def stop_alarm(cls):
signal.setitimer(signal.ITIMER_REAL, 0, 0) signal.setitimer(signal.ITIMER_REAL, 0, 0)
if hasattr(faulthandler, 'cancel_dump_traceback_later'):
faulthandler.cancel_dump_traceback_later()
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
@ -59,18 +53,22 @@ class EINTRBaseTest(unittest.TestCase):
# default sleep time # default sleep time
time.sleep(cls.sleep_time) time.sleep(cls.sleep_time)
def subprocess(self, *args, **kw):
cmd_args = (sys.executable, '-c') + args
return subprocess.Popen(cmd_args, **kw)
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class OSEINTRTest(EINTRBaseTest): class OSEINTRTest(EINTRBaseTest):
""" EINTR tests for the os module. """ """ EINTR tests for the os module. """
def new_sleep_process(self):
code = 'import time; time.sleep(%r)' % self.sleep_time
return self.subprocess(code)
def _test_wait_multiple(self, wait_func): def _test_wait_multiple(self, wait_func):
num = 3 num = 3
for _ in range(num): processes = [self.new_sleep_process() for _ in range(num)]
pid = os.fork()
if pid == 0:
self._sleep()
os._exit(0)
for _ in range(num): for _ in range(num):
wait_func() wait_func()
@ -82,12 +80,8 @@ class OSEINTRTest(EINTRBaseTest):
self._test_wait_multiple(lambda: os.wait3(0)) self._test_wait_multiple(lambda: os.wait3(0))
def _test_wait_single(self, wait_func): def _test_wait_single(self, wait_func):
pid = os.fork() proc = self.new_sleep_process()
if pid == 0: wait_func(proc.pid)
self._sleep()
os._exit(0)
else:
wait_func(pid)
def test_waitpid(self): def test_waitpid(self):
self._test_wait_single(lambda pid: os.waitpid(pid, 0)) self._test_wait_single(lambda pid: os.waitpid(pid, 0))
@ -105,19 +99,24 @@ class OSEINTRTest(EINTRBaseTest):
# atomic # atomic
datas = [b"hello", b"world", b"spam"] datas = [b"hello", b"world", b"spam"]
pid = os.fork() code = '\n'.join((
if pid == 0: 'import os, sys, time',
os.close(rd) '',
for data in datas: 'wr = int(sys.argv[1])',
# let the parent block on read() 'datas = %r' % datas,
self._sleep() 'sleep_time = %r' % self.sleep_time,
os.write(wr, data) '',
os._exit(0) 'for data in datas:',
else: ' # let the parent block on read()',
self.addCleanup(os.waitpid, pid, 0) ' time.sleep(sleep_time)',
' os.write(wr, data)',
))
with self.subprocess(code, str(wr), pass_fds=[wr]) as proc:
os.close(wr) os.close(wr)
for data in datas: for data in datas:
self.assertEqual(data, os.read(rd, len(data))) self.assertEqual(data, os.read(rd, len(data)))
self.assertEqual(proc.wait(), 0)
def test_write(self): def test_write(self):
rd, wr = os.pipe() rd, wr = os.pipe()
@ -127,23 +126,34 @@ class OSEINTRTest(EINTRBaseTest):
# we must write enough data for the write() to block # we must write enough data for the write() to block
data = b"xyz" * support.PIPE_MAX_SIZE data = b"xyz" * support.PIPE_MAX_SIZE
pid = os.fork() code = '\n'.join((
if pid == 0: 'import io, os, sys, time',
os.close(wr) '',
read_data = io.BytesIO() 'rd = int(sys.argv[1])',
# let the parent block on write() 'sleep_time = %r' % self.sleep_time,
self._sleep() 'data = b"xyz" * %s' % support.PIPE_MAX_SIZE,
while len(read_data.getvalue()) < len(data): 'data_len = len(data)',
chunk = os.read(rd, 2 * len(data)) '',
read_data.write(chunk) '# let the parent block on write()',
self.assertEqual(read_data.getvalue(), data) 'time.sleep(sleep_time)',
os._exit(0) '',
else: 'read_data = io.BytesIO()',
'while len(read_data.getvalue()) < data_len:',
' chunk = os.read(rd, 2 * data_len)',
' read_data.write(chunk)',
'',
'value = read_data.getvalue()',
'if value != data:',
' raise Exception("read error: %s vs %s bytes"',
' % (len(value), data_len))',
))
with self.subprocess(code, str(rd), pass_fds=[rd]) as proc:
os.close(rd) os.close(rd)
written = 0 written = 0
while written < len(data): while written < len(data):
written += os.write(wr, memoryview(data)[written:]) written += os.write(wr, memoryview(data)[written:])
self.assertEqual(0, os.waitpid(pid, 0)[1]) self.assertEqual(proc.wait(), 0)
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
@ -159,19 +169,32 @@ class SocketEINTRTest(EINTRBaseTest):
# single-byte payload guard us against partial recv # single-byte payload guard us against partial recv
datas = [b"x", b"y", b"z"] datas = [b"x", b"y", b"z"]
pid = os.fork() code = '\n'.join((
if pid == 0: 'import os, socket, sys, time',
rd.close() '',
for data in datas: 'fd = int(sys.argv[1])',
# let the parent block on recv() 'family = %s' % int(wr.family),
self._sleep() 'sock_type = %s' % int(wr.type),
wr.sendall(data) 'datas = %r' % datas,
os._exit(0) 'sleep_time = %r' % self.sleep_time,
else: '',
self.addCleanup(os.waitpid, pid, 0) 'wr = socket.fromfd(fd, family, sock_type)',
'os.close(fd)',
'',
'with wr:',
' for data in datas:',
' # let the parent block on recv()',
' time.sleep(sleep_time)',
' wr.sendall(data)',
))
fd = wr.fileno()
proc = self.subprocess(code, str(fd), pass_fds=[fd])
with proc:
wr.close() wr.close()
for data in datas: for data in datas:
self.assertEqual(data, recv_func(rd, len(data))) self.assertEqual(data, recv_func(rd, len(data)))
self.assertEqual(proc.wait(), 0)
def test_recv(self): def test_recv(self):
self._test_recv(socket.socket.recv) self._test_recv(socket.socket.recv)
@ -188,25 +211,43 @@ class SocketEINTRTest(EINTRBaseTest):
# we must send enough data for the send() to block # we must send enough data for the send() to block
data = b"xyz" * (support.SOCK_MAX_SIZE // 3) data = b"xyz" * (support.SOCK_MAX_SIZE // 3)
pid = os.fork() code = '\n'.join((
if pid == 0: 'import os, socket, sys, time',
wr.close() '',
# let the parent block on send() 'fd = int(sys.argv[1])',
self._sleep() 'family = %s' % int(rd.family),
received_data = bytearray(len(data)) 'sock_type = %s' % int(rd.type),
n = 0 'sleep_time = %r' % self.sleep_time,
while n < len(data): 'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3),
n += rd.recv_into(memoryview(received_data)[n:]) 'data_len = len(data)',
self.assertEqual(received_data, data) '',
os._exit(0) 'rd = socket.fromfd(fd, family, sock_type)',
else: 'os.close(fd)',
'',
'with rd:',
' # let the parent block on send()',
' time.sleep(sleep_time)',
'',
' received_data = bytearray(data_len)',
' n = 0',
' while n < data_len:',
' n += rd.recv_into(memoryview(received_data)[n:])',
'',
'if received_data != data:',
' raise Exception("recv error: %s vs %s bytes"',
' % (len(received_data), data_len))',
))
fd = rd.fileno()
proc = self.subprocess(code, str(fd), pass_fds=[fd])
with proc:
rd.close() rd.close()
written = 0 written = 0
while written < len(data): while written < len(data):
sent = send_func(wr, memoryview(data)[written:]) sent = send_func(wr, memoryview(data)[written:])
# sendall() returns None # sendall() returns None
written += len(data) if sent is None else sent written += len(data) if sent is None else sent
self.assertEqual(0, os.waitpid(pid, 0)[1]) self.assertEqual(proc.wait(), 0)
def test_send(self): def test_send(self):
self._test_send(socket.socket.send) self._test_send(socket.socket.send)
@ -223,45 +264,60 @@ class SocketEINTRTest(EINTRBaseTest):
self.addCleanup(sock.close) self.addCleanup(sock.close)
sock.bind((support.HOST, 0)) sock.bind((support.HOST, 0))
_, port = sock.getsockname() port = sock.getsockname()[1]
sock.listen() sock.listen()
pid = os.fork() code = '\n'.join((
if pid == 0: 'import socket, time',
# let parent block on accept() '',
self._sleep() 'host = %r' % support.HOST,
with socket.create_connection((support.HOST, port)): 'port = %s' % port,
self._sleep() 'sleep_time = %r' % self.sleep_time,
os._exit(0) '',
else: '# let parent block on accept()',
self.addCleanup(os.waitpid, pid, 0) 'time.sleep(sleep_time)',
'with socket.create_connection((host, port)):',
' time.sleep(sleep_time)',
))
with self.subprocess(code) as proc:
client_sock, _ = sock.accept() client_sock, _ = sock.accept()
client_sock.close() client_sock.close()
self.assertEqual(proc.wait(), 0)
@unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()') @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()')
def _test_open(self, do_open_close_reader, do_open_close_writer): def _test_open(self, do_open_close_reader, do_open_close_writer):
filename = support.TESTFN
# Use a fifo: until the child opens it for reading, the parent will # Use a fifo: until the child opens it for reading, the parent will
# block when trying to open it for writing. # block when trying to open it for writing.
support.unlink(support.TESTFN) support.unlink(filename)
os.mkfifo(support.TESTFN) os.mkfifo(filename)
self.addCleanup(support.unlink, support.TESTFN) self.addCleanup(support.unlink, filename)
pid = os.fork() code = '\n'.join((
if pid == 0: 'import os, time',
# let the parent block '',
self._sleep() 'path = %a' % filename,
do_open_close_reader(support.TESTFN) 'sleep_time = %r' % self.sleep_time,
os._exit(0) '',
else: '# let the parent block',
self.addCleanup(os.waitpid, pid, 0) 'time.sleep(sleep_time)',
do_open_close_writer(support.TESTFN) '',
do_open_close_reader,
))
with self.subprocess(code) as proc:
do_open_close_writer(filename)
self.assertEqual(proc.wait(), 0)
def test_open(self): def test_open(self):
self._test_open(lambda path: open(path, 'r').close(), self._test_open("open(path, 'r').close()",
lambda path: open(path, 'w').close()) lambda path: open(path, 'w').close())
def test_os_open(self): def test_os_open(self):
self._test_open(lambda path: os.close(os.open(path, os.O_RDONLY)), self._test_open("os.close(os.open(path, os.O_RDONLY))",
lambda path: os.close(os.open(path, os.O_WRONLY))) lambda path: os.close(os.open(path, os.O_WRONLY)))
@ -298,20 +354,21 @@ class SignalEINTRTest(EINTRBaseTest):
old_handler = signal.signal(signum, lambda *args: None) old_handler = signal.signal(signum, lambda *args: None)
self.addCleanup(signal.signal, signum, old_handler) self.addCleanup(signal.signal, signum, old_handler)
code = '\n'.join((
'import os, time',
'pid = %s' % os.getpid(),
'signum = %s' % int(signum),
'sleep_time = %r' % self.sleep_time,
'time.sleep(sleep_time)',
'os.kill(pid, signum)',
))
t0 = time.monotonic() t0 = time.monotonic()
child_pid = os.fork() with self.subprocess(code) as proc:
if child_pid == 0:
# child
try:
self._sleep()
os.kill(pid, signum)
finally:
os._exit(0)
else:
# parent # parent
signal.sigwaitinfo([signum]) signal.sigwaitinfo([signum])
dt = time.monotonic() - t0 dt = time.monotonic() - t0
os.waitpid(child_pid, 0) self.assertEqual(proc.wait(), 0)
self.assertGreaterEqual(dt, self.sleep_time) self.assertGreaterEqual(dt, self.sleep_time)