gh-129205: Add os.readinto() API for reading data into a caller provided buffer (#129211)

Add a new OS API which will read data directly into a caller provided
writeable buffer protocol object.

Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com>
Co-authored-by: Victor Stinner <vstinner@python.org>
This commit is contained in:
Cody Maloney 2025-01-26 05:21:03 -08:00 committed by GitHub
parent 0ef8d470b7
commit 1ed4487968
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 267 additions and 1 deletions

View file

@ -230,6 +230,93 @@ class FileTests(unittest.TestCase):
self.assertEqual(type(s), bytes)
self.assertEqual(s, b"spam")
def test_readinto(self):
with open(os_helper.TESTFN, "w+b") as fobj:
fobj.write(b"spam")
fobj.flush()
fd = fobj.fileno()
os.lseek(fd, 0, 0)
# Oversized so readinto without hitting end.
buffer = bytearray(7)
s = os.readinto(fd, buffer)
self.assertEqual(type(s), int)
self.assertEqual(s, 4)
# Should overwrite the first 4 bytes of the buffer.
self.assertEqual(buffer[:4], b"spam")
# Readinto at EOF should return 0 and not touch buffer.
buffer[:] = b"notspam"
s = os.readinto(fd, buffer)
self.assertEqual(type(s), int)
self.assertEqual(s, 0)
self.assertEqual(bytes(buffer), b"notspam")
s = os.readinto(fd, buffer)
self.assertEqual(s, 0)
self.assertEqual(bytes(buffer), b"notspam")
# Readinto a 0 length bytearray when at EOF should return 0
self.assertEqual(os.readinto(fd, bytearray()), 0)
# Readinto a 0 length bytearray with data available should return 0.
os.lseek(fd, 0, 0)
self.assertEqual(os.readinto(fd, bytearray()), 0)
@unittest.skipUnless(hasattr(os, 'get_blocking'),
'needs os.get_blocking() and os.set_blocking()')
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
def test_readinto_non_blocking(self):
# Verify behavior of a readinto which would block on a non-blocking fd.
r, w = os.pipe()
try:
os.set_blocking(r, False)
with self.assertRaises(BlockingIOError):
os.readinto(r, bytearray(5))
# Pass some data through
os.write(w, b"spam")
self.assertEqual(os.readinto(r, bytearray(4)), 4)
# Still don't block or return 0.
with self.assertRaises(BlockingIOError):
os.readinto(r, bytearray(5))
# At EOF should return size 0
os.close(w)
w = None
self.assertEqual(os.readinto(r, bytearray(5)), 0)
self.assertEqual(os.readinto(r, bytearray(5)), 0) # Still EOF
finally:
os.close(r)
if w is not None:
os.close(w)
def test_readinto_badarg(self):
with open(os_helper.TESTFN, "w+b") as fobj:
fobj.write(b"spam")
fobj.flush()
fd = fobj.fileno()
os.lseek(fd, 0, 0)
for bad_arg in ("test", bytes(), 14):
with self.subTest(f"bad buffer {type(bad_arg)}"):
with self.assertRaises(TypeError):
os.readinto(fd, bad_arg)
with self.subTest("doesn't work on file objects"):
with self.assertRaises(TypeError):
os.readinto(fobj, bytearray(5))
# takes two args
with self.assertRaises(TypeError):
os.readinto(fd)
# No data should have been read with the bad arguments.
buffer = bytearray(4)
s = os.readinto(fd, buffer)
self.assertEqual(s, 4)
self.assertEqual(buffer, b"spam")
@support.cpython_only
# Skip the test on 32-bit platforms: the number of bytes must fit in a
# Py_ssize_t type
@ -249,6 +336,29 @@ class FileTests(unittest.TestCase):
# operating system is free to return less bytes than requested.
self.assertEqual(data, b'test')
@support.cpython_only
# Skip the test on 32-bit platforms: the number of bytes must fit in a
# Py_ssize_t type
@unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
"needs INT_MAX < PY_SSIZE_T_MAX")
@support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
def test_large_readinto(self, size):
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
create_file(os_helper.TESTFN, b'test')
# Issue #21932: For readinto the buffer contains the length rather than
# a length being passed explicitly to read, should still get capped to a
# valid size / not raise an OverflowError for sizes larger than INT_MAX.
buffer = bytearray(INT_MAX + 10)
with open(os_helper.TESTFN, "rb") as fp:
length = os.readinto(fp.fileno(), buffer)
# The test does not try to read more than 2 GiB at once because the
# operating system is free to return less bytes than requested.
self.assertEqual(length, 4)
self.assertEqual(buffer[:4], b'test')
def test_write(self):
# os.write() accepts bytes- and buffer-like objects but not strings
fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_WRONLY)
@ -2467,6 +2577,10 @@ class TestInvalidFD(unittest.TestCase):
def test_read(self):
self.check(os.read, 1)
@unittest.skipUnless(hasattr(os, 'readinto'), 'test needs os.readinto()')
def test_readinto(self):
self.check(os.readinto, bytearray(5))
@unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
def test_readv(self):
buf = bytearray(10)