mirror of
https://github.com/python/cpython.git
synced 2025-07-07 19:35:27 +00:00
gh-129205: Modernize test_eintr (#129316)
* Use f-string. * Fix grammar: replace 'datas' with 'data' (and replace 'data' with 'item'). * Remove unused variables: 'pid' and 'old_mask'. * Factorize test_read() and test_readinto() code. Co-authored-by: Cody Maloney <cmaloney@users.noreply.github.com>
This commit is contained in:
parent
75b4962157
commit
8a5a18a36e
1 changed files with 48 additions and 69 deletions
|
@ -91,7 +91,7 @@ class OSEINTRTest(EINTRBaseTest):
|
|||
""" EINTR tests for the os module. """
|
||||
|
||||
def new_sleep_process(self):
|
||||
code = 'import time; time.sleep(%r)' % self.sleep_time
|
||||
code = f'import time; time.sleep({self.sleep_time!r})'
|
||||
return self.subprocess(code)
|
||||
|
||||
def _test_wait_multiple(self, wait_func):
|
||||
|
@ -123,65 +123,45 @@ class OSEINTRTest(EINTRBaseTest):
|
|||
def test_wait4(self):
|
||||
self._test_wait_single(lambda pid: os.wait4(pid, 0))
|
||||
|
||||
def test_read(self):
|
||||
def _interrupted_reads(self):
|
||||
"""Make a fd which will force block on read of expected bytes."""
|
||||
rd, wr = os.pipe()
|
||||
self.addCleanup(os.close, rd)
|
||||
# wr closed explicitly by parent
|
||||
|
||||
# the payload below are smaller than PIPE_BUF, hence the writes will be
|
||||
# atomic
|
||||
datas = [b"hello", b"world", b"spam"]
|
||||
data = [b"hello", b"world", b"spam"]
|
||||
|
||||
code = '\n'.join((
|
||||
'import os, sys, time',
|
||||
'',
|
||||
'wr = int(sys.argv[1])',
|
||||
'datas = %r' % datas,
|
||||
'sleep_time = %r' % self.sleep_time,
|
||||
f'data = {data!r}',
|
||||
f'sleep_time = {self.sleep_time!r}',
|
||||
'',
|
||||
'for data in datas:',
|
||||
'for item in data:',
|
||||
' # let the parent block on read()',
|
||||
' time.sleep(sleep_time)',
|
||||
' os.write(wr, data)',
|
||||
' os.write(wr, item)',
|
||||
))
|
||||
|
||||
proc = self.subprocess(code, str(wr), pass_fds=[wr])
|
||||
with kill_on_error(proc):
|
||||
os.close(wr)
|
||||
for data in datas:
|
||||
self.assertEqual(data, os.read(rd, len(data)))
|
||||
for datum in data:
|
||||
yield rd, datum
|
||||
self.assertEqual(proc.wait(), 0)
|
||||
|
||||
def test_read(self):
|
||||
for fd, expected in self._interrupted_reads():
|
||||
self.assertEqual(expected, os.read(fd, len(expected)))
|
||||
|
||||
def test_readinto(self):
|
||||
rd, wr = os.pipe()
|
||||
self.addCleanup(os.close, rd)
|
||||
# wr closed explicitly by parent
|
||||
|
||||
# the payload below are smaller than PIPE_BUF, hence the writes will be
|
||||
# atomic
|
||||
datas = [b"hello", b"world", b"spam"]
|
||||
|
||||
code = '\n'.join((
|
||||
'import os, sys, time',
|
||||
'',
|
||||
'wr = int(sys.argv[1])',
|
||||
'datas = %r' % datas,
|
||||
'sleep_time = %r' % self.sleep_time,
|
||||
'',
|
||||
'for data in datas:',
|
||||
' # let the parent block on read()',
|
||||
' time.sleep(sleep_time)',
|
||||
' os.write(wr, data)',
|
||||
))
|
||||
|
||||
proc = self.subprocess(code, str(wr), pass_fds=[wr])
|
||||
with kill_on_error(proc):
|
||||
os.close(wr)
|
||||
for data in datas:
|
||||
buffer = bytearray(len(data))
|
||||
self.assertEqual(os.readinto(rd, buffer), len(data))
|
||||
self.assertEqual(buffer, data)
|
||||
self.assertEqual(proc.wait(), 0)
|
||||
for fd, expected in self._interrupted_reads():
|
||||
buffer = bytearray(len(expected))
|
||||
self.assertEqual(os.readinto(fd, buffer), len(expected))
|
||||
self.assertEqual(buffer, expected)
|
||||
|
||||
def test_write(self):
|
||||
rd, wr = os.pipe()
|
||||
|
@ -195,8 +175,8 @@ class OSEINTRTest(EINTRBaseTest):
|
|||
'import io, os, sys, time',
|
||||
'',
|
||||
'rd = int(sys.argv[1])',
|
||||
'sleep_time = %r' % self.sleep_time,
|
||||
'data = b"x" * %s' % support.PIPE_MAX_SIZE,
|
||||
f'sleep_time = {self.sleep_time!r}',
|
||||
f'data = b"x" * {support.PIPE_MAX_SIZE}',
|
||||
'data_len = len(data)',
|
||||
'',
|
||||
'# let the parent block on write()',
|
||||
|
@ -209,8 +189,8 @@ class OSEINTRTest(EINTRBaseTest):
|
|||
'',
|
||||
'value = read_data.getvalue()',
|
||||
'if value != data:',
|
||||
' raise Exception("read error: %s vs %s bytes"',
|
||||
' % (len(value), data_len))',
|
||||
' raise Exception(f"read error: {len(value)}'
|
||||
' vs {data_len} bytes")',
|
||||
))
|
||||
|
||||
proc = self.subprocess(code, str(rd), pass_fds=[rd])
|
||||
|
@ -233,33 +213,33 @@ class SocketEINTRTest(EINTRBaseTest):
|
|||
# wr closed explicitly by parent
|
||||
|
||||
# single-byte payload guard us against partial recv
|
||||
datas = [b"x", b"y", b"z"]
|
||||
data = [b"x", b"y", b"z"]
|
||||
|
||||
code = '\n'.join((
|
||||
'import os, socket, sys, time',
|
||||
'',
|
||||
'fd = int(sys.argv[1])',
|
||||
'family = %s' % int(wr.family),
|
||||
'sock_type = %s' % int(wr.type),
|
||||
'datas = %r' % datas,
|
||||
'sleep_time = %r' % self.sleep_time,
|
||||
f'family = {int(wr.family)}',
|
||||
f'sock_type = {int(wr.type)}',
|
||||
f'data = {data!r}',
|
||||
f'sleep_time = {self.sleep_time!r}',
|
||||
'',
|
||||
'wr = socket.fromfd(fd, family, sock_type)',
|
||||
'os.close(fd)',
|
||||
'',
|
||||
'with wr:',
|
||||
' for data in datas:',
|
||||
' for item in data:',
|
||||
' # let the parent block on recv()',
|
||||
' time.sleep(sleep_time)',
|
||||
' wr.sendall(data)',
|
||||
' wr.sendall(item)',
|
||||
))
|
||||
|
||||
fd = wr.fileno()
|
||||
proc = self.subprocess(code, str(fd), pass_fds=[fd])
|
||||
with kill_on_error(proc):
|
||||
wr.close()
|
||||
for data in datas:
|
||||
self.assertEqual(data, recv_func(rd, len(data)))
|
||||
for item in data:
|
||||
self.assertEqual(item, recv_func(rd, len(item)))
|
||||
self.assertEqual(proc.wait(), 0)
|
||||
|
||||
def test_recv(self):
|
||||
|
@ -281,10 +261,10 @@ class SocketEINTRTest(EINTRBaseTest):
|
|||
'import os, socket, sys, time',
|
||||
'',
|
||||
'fd = int(sys.argv[1])',
|
||||
'family = %s' % int(rd.family),
|
||||
'sock_type = %s' % int(rd.type),
|
||||
'sleep_time = %r' % self.sleep_time,
|
||||
'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3),
|
||||
f'family = {int(rd.family)}',
|
||||
f'sock_type = {int(rd.type)}',
|
||||
f'sleep_time = {self.sleep_time!r}',
|
||||
f'data = b"xyz" * {support.SOCK_MAX_SIZE // 3}',
|
||||
'data_len = len(data)',
|
||||
'',
|
||||
'rd = socket.fromfd(fd, family, sock_type)',
|
||||
|
@ -300,8 +280,8 @@ class SocketEINTRTest(EINTRBaseTest):
|
|||
' n += rd.recv_into(memoryview(received_data)[n:])',
|
||||
'',
|
||||
'if received_data != data:',
|
||||
' raise Exception("recv error: %s vs %s bytes"',
|
||||
' % (len(received_data), data_len))',
|
||||
' raise Exception(f"recv error: {len(received_data)}'
|
||||
' vs {data_len} bytes")',
|
||||
))
|
||||
|
||||
fd = rd.fileno()
|
||||
|
@ -333,9 +313,9 @@ class SocketEINTRTest(EINTRBaseTest):
|
|||
code = '\n'.join((
|
||||
'import socket, time',
|
||||
'',
|
||||
'host = %r' % socket_helper.HOST,
|
||||
'port = %s' % port,
|
||||
'sleep_time = %r' % self.sleep_time,
|
||||
f'host = {socket_helper.HOST!r}',
|
||||
f'port = {port}',
|
||||
f'sleep_time = {self.sleep_time!r}',
|
||||
'',
|
||||
'# let parent block on accept()',
|
||||
'time.sleep(sleep_time)',
|
||||
|
@ -363,15 +343,15 @@ class SocketEINTRTest(EINTRBaseTest):
|
|||
os_helper.unlink(filename)
|
||||
try:
|
||||
os.mkfifo(filename)
|
||||
except PermissionError as e:
|
||||
self.skipTest('os.mkfifo(): %s' % e)
|
||||
except PermissionError as exc:
|
||||
self.skipTest(f'os.mkfifo(): {exc!r}')
|
||||
self.addCleanup(os_helper.unlink, filename)
|
||||
|
||||
code = '\n'.join((
|
||||
'import os, time',
|
||||
'',
|
||||
'path = %a' % filename,
|
||||
'sleep_time = %r' % self.sleep_time,
|
||||
f'path = {filename!a}',
|
||||
f'sleep_time = {self.sleep_time!r}',
|
||||
'',
|
||||
'# let the parent block',
|
||||
'time.sleep(sleep_time)',
|
||||
|
@ -427,21 +407,20 @@ class SignalEINTRTest(EINTRBaseTest):
|
|||
|
||||
def check_sigwait(self, wait_func):
|
||||
signum = signal.SIGUSR1
|
||||
pid = os.getpid()
|
||||
|
||||
old_handler = signal.signal(signum, lambda *args: None)
|
||||
self.addCleanup(signal.signal, signum, old_handler)
|
||||
|
||||
code = '\n'.join((
|
||||
'import os, time',
|
||||
'pid = %s' % os.getpid(),
|
||||
'signum = %s' % int(signum),
|
||||
'sleep_time = %r' % self.sleep_time,
|
||||
f'pid = {os.getpid()}',
|
||||
f'signum = {int(signum)}',
|
||||
f'sleep_time = {self.sleep_time!r}',
|
||||
'time.sleep(sleep_time)',
|
||||
'os.kill(pid, signum)',
|
||||
))
|
||||
|
||||
old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
|
||||
signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
|
||||
self.addCleanup(signal.pthread_sigmask, signal.SIG_UNBLOCK, [signum])
|
||||
|
||||
proc = self.subprocess(code)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue