mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 19:34:08 +00:00 
			
		
		
		
	Tests now call busy_retry() and sleeping_retry() with SHORT_TIMEOUT or LONG_TIMEOUT (from test.support) rather than hardcoded constants. Also add a WAIT_ACTIVE_CHILDREN_TIMEOUT constant to _test_multiprocessing.
		
			
				
	
	
		
			530 lines
		
	
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			530 lines
		
	
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""
 | 
						|
This test suite exercises some system calls subject to interruption with EINTR,
to check that it is actually handled transparently.
It is intended to be run by the main test suite within a child process, to
ensure there is no background thread running (so that signals are delivered to
the correct thread).
Signals are generated in-process using setitimer(ITIMER_REAL), which allows
sub-second periodicity (unlike signal.alarm()).
 | 
						|
"""
 | 
						|
 | 
						|
import contextlib
 | 
						|
import faulthandler
 | 
						|
import fcntl
 | 
						|
import os
 | 
						|
import platform
 | 
						|
import select
 | 
						|
import signal
 | 
						|
import socket
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import time
 | 
						|
import unittest
 | 
						|
 | 
						|
from test import support
 | 
						|
from test.support import os_helper
 | 
						|
from test.support import socket_helper
 | 
						|
 | 
						|
@contextlib.contextmanager
def kill_on_error(proc):
    """Yield *proc*, killing it if the managed block raises.

    *proc* is itself entered as a context manager for the duration of the
    block; the exception is always re-raised after the kill.
    """
    with proc:
        try:
            yield proc
        except BaseException:
            # Any exception, KeyboardInterrupt included, must not leave the
            # child running: kill it, then let the error propagate.
            proc.kill()
            raise
 | 
						|
 | 
						|
 | 
						|
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class EINTRBaseTest(unittest.TestCase):
    """Common fixture for the EINTR tests.

    setUp() arms a periodic ITIMER_REAL timer whose SIGALRM handler only
    counts deliveries; tearDown() disarms the timer and puts the previous
    handler back.
    """

    # seconds before the first signal is delivered
    signal_delay = 0.1
    # seconds between two consecutive signals
    signal_period = 0.1
    # default duration tests sleep for - should obviously have:
    # sleep_time > signal_period
    sleep_time = 0.2

    def sighandler(self, signum, frame):
        """SIGALRM handler: record one more delivery."""
        self.signals = self.signals + 1

    def setUp(self):
        """Install the counting handler, start the timer and the watchdog."""
        self.signals = 0
        previous = signal.signal(signal.SIGALRM, self.sighandler)
        self.orig_handler = previous
        signal.setitimer(
            signal.ITIMER_REAL,
            self.signal_delay,
            self.signal_period,
        )

        # faulthandler acts as a watchdog: if a test hangs, dump the
        # traceback to the real stderr and exit after 10 minutes.
        watchdog_timeout = 10 * 60
        faulthandler.dump_traceback_later(
            watchdog_timeout,
            exit=True,
            file=sys.__stderr__,
        )

    @staticmethod
    def stop_alarm():
        """Disarm the ITIMER_REAL timer."""
        signal.setitimer(signal.ITIMER_REAL, 0, 0)

    def tearDown(self):
        """Undo setUp(): stop the timer, restore the handler, cancel the watchdog."""
        self.stop_alarm()
        signal.signal(signal.SIGALRM, self.orig_handler)
        faulthandler.cancel_dump_traceback_later()

    def subprocess(self, *args, **kw):
        """Start ``sys.executable -c *args`` and return the Popen object.

        Keyword arguments are forwarded to subprocess.Popen() unchanged.
        """
        command = (sys.executable, '-c', *args)
        return subprocess.Popen(command, **kw)
 | 
						|
 | 
						|
 | 
						|
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class OSEINTRTest(EINTRBaseTest):
    """EINTR tests for the os module: wait*(), read() and write()."""

    def new_sleep_process(self):
        """Start a child process that only sleeps for ``sleep_time``."""
        return self.subprocess('import time; time.sleep(%r)' % self.sleep_time)

    def _test_wait_multiple(self, wait_func):
        """Start several sleeping children and call wait_func() once per child."""
        count = 3
        children = [self.new_sleep_process() for _ in range(count)]
        for _ in children:
            wait_func()
        # Call the Popen method to avoid a ResourceWarning
        for child in children:
            child.wait()

    def test_wait(self):
        self._test_wait_multiple(os.wait)

    @unittest.skipUnless(hasattr(os, 'wait3'), 'requires wait3()')
    def test_wait3(self):
        def wait_any():
            return os.wait3(0)

        self._test_wait_multiple(wait_any)

    def _test_wait_single(self, wait_func):
        """Start one sleeping child and call wait_func(pid) on it."""
        child = self.new_sleep_process()
        wait_func(child.pid)
        # Call the Popen method to avoid a ResourceWarning
        child.wait()

    def test_waitpid(self):
        def wait_for(pid):
            return os.waitpid(pid, 0)

        self._test_wait_single(wait_for)

    @unittest.skipUnless(hasattr(os, 'wait4'), 'requires wait4()')
    def test_wait4(self):
        def wait_for(pid):
            return os.wait4(pid, 0)

        self._test_wait_single(wait_for)

    def test_read(self):
        """os.read() on a pipe returns each payload the child writes."""
        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, read_fd)
        # write_fd is closed explicitly below, once the child has it

        # the payloads below are smaller than PIPE_BUF, hence the writes
        # will be atomic
        payloads = [b"hello", b"world", b"spam"]

        child_lines = [
            'import os, sys, time',
            '',
            'wr = int(sys.argv[1])',
            'datas = %r' % payloads,
            'sleep_time = %r' % self.sleep_time,
            '',
            'for data in datas:',
            '    # let the parent block on read()',
            '    time.sleep(sleep_time)',
            '    os.write(wr, data)',
        ]
        child_source = '\n'.join(child_lines)

        child = self.subprocess(child_source, str(write_fd),
                                pass_fds=[write_fd])
        with kill_on_error(child):
            os.close(write_fd)
            for expected in payloads:
                received = os.read(read_fd, len(expected))
                self.assertEqual(expected, received)
            self.assertEqual(child.wait(), 0)

    def test_write(self):
        """os.write() on a full pipe completes once the child drains it."""
        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, write_fd)
        # read_fd is closed explicitly below, once the child has it

        # we must write enough data for the write() to block
        payload = b"x" * support.PIPE_MAX_SIZE

        child_lines = [
            'import io, os, sys, time',
            '',
            'rd = int(sys.argv[1])',
            'sleep_time = %r' % self.sleep_time,
            'data = b"x" * %s' % support.PIPE_MAX_SIZE,
            'data_len = len(data)',
            '',
            '# let the parent block on write()',
            'time.sleep(sleep_time)',
            '',
            'read_data = io.BytesIO()',
            'while len(read_data.getvalue()) < data_len:',
            '    chunk = os.read(rd, 2 * data_len)',
            '    read_data.write(chunk)',
            '',
            'value = read_data.getvalue()',
            'if value != data:',
            '    raise Exception("read error: %s vs %s bytes"',
            '                    % (len(value), data_len))',
        ]
        child_source = '\n'.join(child_lines)

        child = self.subprocess(child_source, str(read_fd),
                                pass_fds=[read_fd])
        with kill_on_error(child):
            os.close(read_fd)
            view = memoryview(payload)
            total = len(payload)
            sent = 0
            # os.write() may write only part of the buffer: loop until done
            while sent < total:
                sent += os.write(write_fd, view[sent:])
            self.assertEqual(child.wait(), 0)
 | 
						|
 | 
						|
 | 
						|
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class SocketEINTRTest(EINTRBaseTest):
    """ EINTR tests for the socket module. """

    @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()')
    def _test_recv(self, recv_func):
        """Check that ``recv_func(sock, size)`` returns what the child sends.

        A child process sends single bytes on one end of a socket pair,
        sleeping ``sleep_time`` before each one, while the parent reads them
        back with *recv_func* on the other end.
        """
        rd, wr = socket.socketpair()
        self.addCleanup(rd.close)
        # wr closed explicitly by parent

        # single-byte payload guard us against partial recv
        datas = [b"x", b"y", b"z"]

        code = '\n'.join((
            'import os, socket, sys, time',
            '',
            'fd = int(sys.argv[1])',
            'family = %s' % int(wr.family),
            'sock_type = %s' % int(wr.type),
            'datas = %r' % datas,
            'sleep_time = %r' % self.sleep_time,
            '',
            'wr = socket.fromfd(fd, family, sock_type)',
            'os.close(fd)',
            '',
            'with wr:',
            '    for data in datas:',
            '        # let the parent block on recv()',
            '        time.sleep(sleep_time)',
            '        wr.sendall(data)',
        ))

        fd = wr.fileno()
        proc = self.subprocess(code, str(fd), pass_fds=[fd])
        with kill_on_error(proc):
            wr.close()
            for data in datas:
                self.assertEqual(data, recv_func(rd, len(data)))
            self.assertEqual(proc.wait(), 0)

    def test_recv(self):
        self._test_recv(socket.socket.recv)

    @unittest.skipUnless(hasattr(socket.socket, 'recvmsg'), 'needs recvmsg()')
    def test_recvmsg(self):
        self._test_recv(lambda sock, data: sock.recvmsg(data)[0])

    # Same guard as _test_recv(): this helper also needs socket.socketpair().
    @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()')
    def _test_send(self, send_func):
        """Check that ``send_func(sock, data)`` delivers a large payload.

        The parent sends ``support.SOCK_MAX_SIZE`` bytes (rounded down to a
        multiple of 3) with *send_func*; the child sleeps ``sleep_time``
        first, then receives everything and exits with an error if the data
        differ. *send_func* may return the number of bytes sent, or None
        (as sendall() does) meaning everything was sent.
        """
        rd, wr = socket.socketpair()
        self.addCleanup(wr.close)
        # rd closed explicitly by parent

        # we must send enough data for the send() to block
        data = b"xyz" * (support.SOCK_MAX_SIZE // 3)

        code = '\n'.join((
            'import os, socket, sys, time',
            '',
            'fd = int(sys.argv[1])',
            'family = %s' % int(rd.family),
            'sock_type = %s' % int(rd.type),
            'sleep_time = %r' % self.sleep_time,
            'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3),
            'data_len = len(data)',
            '',
            'rd = socket.fromfd(fd, family, sock_type)',
            'os.close(fd)',
            '',
            'with rd:',
            '    # let the parent block on send()',
            '    time.sleep(sleep_time)',
            '',
            '    received_data = bytearray(data_len)',
            '    n = 0',
            '    while n < data_len:',
            '        n += rd.recv_into(memoryview(received_data)[n:])',
            '',
            'if received_data != data:',
            '    raise Exception("recv error: %s vs %s bytes"',
            '                    % (len(received_data), data_len))',
        ))

        fd = rd.fileno()
        proc = self.subprocess(code, str(fd), pass_fds=[fd])
        with kill_on_error(proc):
            rd.close()
            written = 0
            while written < len(data):
                sent = send_func(wr, memoryview(data)[written:])
                # sendall() returns None
                written += len(data) if sent is None else sent
            self.assertEqual(proc.wait(), 0)

    def test_send(self):
        self._test_send(socket.socket.send)

    def test_sendall(self):
        self._test_send(socket.socket.sendall)

    @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'needs sendmsg()')
    def test_sendmsg(self):
        self._test_send(lambda sock, data: sock.sendmsg([data]))

    def test_accept(self):
        """accept() returns the connection the child opens after sleeping."""
        sock = socket.create_server((socket_helper.HOST, 0))
        self.addCleanup(sock.close)
        port = sock.getsockname()[1]

        code = '\n'.join((
            'import socket, time',
            '',
            'host = %r' % socket_helper.HOST,
            'port = %s' % port,
            'sleep_time = %r' % self.sleep_time,
            '',
            '# let parent block on accept()',
            'time.sleep(sleep_time)',
            'with socket.create_connection((host, port)):',
            '    time.sleep(sleep_time)',
        ))

        proc = self.subprocess(code)
        with kill_on_error(proc):
            client_sock, _ = sock.accept()
            client_sock.close()
            self.assertEqual(proc.wait(), 0)

    # Issue #25122: There is a race condition in the FreeBSD kernel on
    # handling signals in the FIFO device. Skip the test until the bug is
    # fixed in the kernel.
    # https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=203162
    @support.requires_freebsd_version(10, 3)
    @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()')
    def _test_open(self, do_open_close_reader, do_open_close_writer):
        """Open a FIFO for writing while a child opens it for reading.

        *do_open_close_reader* is Python source appended to the child's
        script; it can refer to the FIFO as ``path``.
        *do_open_close_writer* is called in the parent with the FIFO path.
        The test is skipped if os.mkfifo() raises PermissionError.
        """
        filename = os_helper.TESTFN

        # Use a fifo: until the child opens it for reading, the parent will
        # block when trying to open it for writing.
        os_helper.unlink(filename)
        try:
            os.mkfifo(filename)
        except PermissionError as e:
            self.skipTest('os.mkfifo(): %s' % e)
        self.addCleanup(os_helper.unlink, filename)

        code = '\n'.join((
            'import os, time',
            '',
            'path = %a' % filename,
            'sleep_time = %r' % self.sleep_time,
            '',
            '# let the parent block',
            'time.sleep(sleep_time)',
            '',
            do_open_close_reader,
        ))

        proc = self.subprocess(code)
        with kill_on_error(proc):
            do_open_close_writer(filename)
            self.assertEqual(proc.wait(), 0)

    def python_open(self, path):
        """Open *path* for writing with the builtin open(), then close it."""
        fp = open(path, 'w')
        fp.close()

    @unittest.skipIf(sys.platform == "darwin",
                     "hangs under macOS; see bpo-25234, bpo-35363")
    def test_open(self):
        self._test_open("fp = open(path, 'r')\nfp.close()",
                        self.python_open)

    def os_open(self, path):
        """Open *path* write-only with os.open(), then close it."""
        fd = os.open(path, os.O_WRONLY)
        os.close(fd)

    @unittest.skipIf(sys.platform == "darwin",
                     "hangs under macOS; see bpo-25234, bpo-35363")
    def test_os_open(self):
        self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)",
                        self.os_open)
 | 
						|
 | 
						|
 | 
						|
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class TimeEINTRTest(EINTRBaseTest):
    """EINTR tests for the time module."""

    def test_sleep(self):
        """time.sleep() lasts at least ``sleep_time`` despite the signals."""
        started = time.monotonic()
        time.sleep(self.sleep_time)
        self.stop_alarm()
        elapsed = time.monotonic() - started
        self.assertGreaterEqual(elapsed, self.sleep_time)
 | 
						|
 | 
						|
 | 
						|
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
# bpo-30320: Need pthread_sigmask() to block the signal, otherwise the test
# is vulnerable to a race condition between the child and the parent processes.
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                     'need signal.pthread_sigmask()')
class SignalEINTRTest(EINTRBaseTest):
    """ EINTR tests for the signal module. """

    def check_sigwait(self, wait_func):
        """Check that ``wait_func(signum)`` returns once SIGUSR1 is received.

        SIGUSR1 is blocked in this process, then a child process sleeps for
        ``sleep_time`` and sends it to us while *wait_func* is waiting.
        The previous handler and signal mask are restored on cleanup.
        """
        signum = signal.SIGUSR1
        pid = os.getpid()

        old_handler = signal.signal(signum, lambda *args: None)
        self.addCleanup(signal.signal, signum, old_handler)

        code = '\n'.join((
            'import os, time',
            'pid = %s' % pid,
            'signum = %s' % int(signum),
            'sleep_time = %r' % self.sleep_time,
            'time.sleep(sleep_time)',
            'os.kill(pid, signum)',
        ))

        old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        # Put back the mask we found rather than unconditionally unblocking
        # the signal, so a mask set up by the caller is left as it was.
        self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, old_mask)

        proc = self.subprocess(code)
        with kill_on_error(proc):
            wait_func(signum)

        self.assertEqual(proc.wait(), 0)

    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
                         'need signal.sigwaitinfo()')
    def test_sigwaitinfo(self):
        def wait_func(signum):
            signal.sigwaitinfo([signum])

        self.check_sigwait(wait_func)

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait(self):
        def wait_func(signum):
            # generous timeout: the child sends the signal after sleep_time
            signal.sigtimedwait([signum], 120.0)

        self.check_sigwait(wait_func)
 | 
						|
 | 
						|
 | 
						|
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class SelectEINTRTest(EINTRBaseTest):
    """EINTR tests for the select module.

    Every test waits on an empty set of descriptors with a timeout of
    ``sleep_time`` and checks that the call did not return early.
    """

    def _assert_waited(self, started):
        """Stop the timer and check at least ``sleep_time`` passed since *started*."""
        elapsed = time.monotonic() - started
        self.stop_alarm()
        self.assertGreaterEqual(elapsed, self.sleep_time)

    def test_select(self):
        started = time.monotonic()
        select.select([], [], [], self.sleep_time)
        self._assert_waited(started)

    @unittest.skipIf(sys.platform == "darwin",
                     "poll may fail on macOS; see issue #28087")
    @unittest.skipUnless(hasattr(select, 'poll'), 'need select.poll')
    def test_poll(self):
        poll_obj = select.poll()
        # poll() takes its timeout in milliseconds
        timeout_ms = self.sleep_time * 1e3

        started = time.monotonic()
        poll_obj.poll(timeout_ms)
        self._assert_waited(started)

    @unittest.skipUnless(hasattr(select, 'epoll'), 'need select.epoll')
    def test_epoll(self):
        epoll_obj = select.epoll()
        self.addCleanup(epoll_obj.close)

        started = time.monotonic()
        epoll_obj.poll(self.sleep_time)
        self._assert_waited(started)

    @unittest.skipUnless(hasattr(select, 'kqueue'), 'need select.kqueue')
    def test_kqueue(self):
        kq = select.kqueue()
        self.addCleanup(kq.close)

        started = time.monotonic()
        kq.control(None, 1, self.sleep_time)
        self._assert_waited(started)

    @unittest.skipUnless(hasattr(select, 'devpoll'), 'need select.devpoll')
    def test_devpoll(self):
        devpoll_obj = select.devpoll()
        self.addCleanup(devpoll_obj.close)
        # devpoll's poll() takes its timeout in milliseconds
        timeout_ms = self.sleep_time * 1e3

        started = time.monotonic()
        devpoll_obj.poll(timeout_ms)
        self._assert_waited(started)
 | 
						|
 | 
						|
 | 
						|
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class FNTLEINTRTest(EINTRBaseTest):
    """ EINTR tests for the fcntl module. """

    def _lock(self, lock_func, lock_name):
        """Check that a blocking ``lock_func(f, LOCK_EX)`` waits for the child.

        *lock_func* is the fcntl function called in the parent and
        *lock_name* is the name of the same function, used to build the
        child's script. The child takes an exclusive lock on TESTFN and
        holds it for ``sleep_time`` seconds.
        """
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        # %a gives a valid ASCII-only string literal even if the file name
        # contains quotes, backslashes or non-ASCII characters.
        code = '\n'.join((
            "import fcntl, time",
            "with open(%a, 'wb') as f:" % os_helper.TESTFN,
            "   fcntl.%s(f, fcntl.LOCK_EX)" % lock_name,
            "   time.sleep(%r)" % self.sleep_time))
        proc = self.subprocess(code)
        with kill_on_error(proc):
            with open(os_helper.TESTFN, 'wb') as f:
                # synchronize the subprocess
                start_time = time.monotonic()
                # Poll until a non-blocking lock attempt fails, which means
                # the child now holds the lock. With error=False the retry
                # loop is expected to end instead of raising on timeout,
                # which reaches the else clause below.
                for _ in support.sleeping_retry(support.LONG_TIMEOUT, error=False):
                    try:
                        lock_func(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                        lock_func(f, fcntl.LOCK_UN)
                    except BlockingIOError:
                        break
                else:
                    dt = time.monotonic() - start_time
                    raise Exception("failed to sync child in %.1f sec" % dt)

                # the child locked the file just a moment ago for 'sleep_time' seconds
                # that means that the lock below will block for 'sleep_time' minus some
                # potential context switch delay
                lock_func(f, fcntl.LOCK_EX)
                dt = time.monotonic() - start_time
                self.assertGreaterEqual(dt, self.sleep_time)
                self.stop_alarm()
            proc.wait()

    # Issue 35633: See https://bugs.python.org/issue35633#msg333662
    # skip test rather than accept PermissionError from all platforms
    @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
    def test_lockf(self):
        self._lock(fcntl.lockf, "lockf")

    def test_flock(self):
        self._lock(fcntl.flock, "flock")
 | 
						|
 | 
						|
 | 
						|
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
 |