mirror of
https://github.com/python/cpython.git
synced 2025-08-28 04:35:02 +00:00
Issue #20699: Merge io bytes-like fixes from 3.5
This commit is contained in:
commit
c249221dfd
10 changed files with 128 additions and 67 deletions
|
@ -45,6 +45,22 @@ try:
|
|||
except ImportError:
|
||||
threading = None
|
||||
|
||||
try:
|
||||
import ctypes
|
||||
except ImportError:
|
||||
def byteslike(*pos, **kw):
|
||||
return array.array("b", bytes(*pos, **kw))
|
||||
else:
|
||||
def byteslike(*pos, **kw):
|
||||
"""Create a bytes-like object having no string or sequence methods"""
|
||||
data = bytes(*pos, **kw)
|
||||
obj = EmptyStruct()
|
||||
ctypes.resize(obj, len(data))
|
||||
memoryview(obj).cast("B")[:] = data
|
||||
return obj
|
||||
class EmptyStruct(ctypes.Structure):
|
||||
pass
|
||||
|
||||
def _default_chunk_size():
|
||||
"""Get the default TextIOWrapper chunk size"""
|
||||
with open(__file__, "r", encoding="latin-1") as f:
|
||||
|
@ -284,7 +300,9 @@ class IOTest(unittest.TestCase):
|
|||
self.assertEqual(f.tell(), 6)
|
||||
self.assertEqual(f.seek(-1, 1), 5)
|
||||
self.assertEqual(f.tell(), 5)
|
||||
self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
|
||||
buffer = bytearray(b" world\n\n\n")
|
||||
self.assertEqual(f.write(buffer), 9)
|
||||
buffer[:] = b"*" * 9 # Overwrite our copy of the data
|
||||
self.assertEqual(f.seek(0), 0)
|
||||
self.assertEqual(f.write(b"h"), 1)
|
||||
self.assertEqual(f.seek(-1, 2), 13)
|
||||
|
@ -297,20 +315,21 @@ class IOTest(unittest.TestCase):
|
|||
def read_ops(self, f, buffered=False):
|
||||
data = f.read(5)
|
||||
self.assertEqual(data, b"hello")
|
||||
data = bytearray(data)
|
||||
data = byteslike(data)
|
||||
self.assertEqual(f.readinto(data), 5)
|
||||
self.assertEqual(data, b" worl")
|
||||
self.assertEqual(bytes(data), b" worl")
|
||||
data = bytearray(5)
|
||||
self.assertEqual(f.readinto(data), 2)
|
||||
self.assertEqual(len(data), 5)
|
||||
self.assertEqual(data[:2], b"d\n")
|
||||
self.assertEqual(f.seek(0), 0)
|
||||
self.assertEqual(f.read(20), b"hello world\n")
|
||||
self.assertEqual(f.read(1), b"")
|
||||
self.assertEqual(f.readinto(bytearray(b"x")), 0)
|
||||
self.assertEqual(f.readinto(byteslike(b"x")), 0)
|
||||
self.assertEqual(f.seek(-6, 2), 6)
|
||||
self.assertEqual(f.read(5), b"world")
|
||||
self.assertEqual(f.read(0), b"")
|
||||
self.assertEqual(f.readinto(bytearray()), 0)
|
||||
self.assertEqual(f.readinto(byteslike()), 0)
|
||||
self.assertEqual(f.seek(-6, 1), 5)
|
||||
self.assertEqual(f.read(5), b" worl")
|
||||
self.assertEqual(f.tell(), 10)
|
||||
|
@ -321,6 +340,10 @@ class IOTest(unittest.TestCase):
|
|||
f.seek(6)
|
||||
self.assertEqual(f.read(), b"world\n")
|
||||
self.assertEqual(f.read(), b"")
|
||||
f.seek(0)
|
||||
data = byteslike(5)
|
||||
self.assertEqual(f.readinto1(data), 5)
|
||||
self.assertEqual(bytes(data), b"hello")
|
||||
|
||||
LARGE = 2**31
|
||||
|
||||
|
@ -641,10 +664,15 @@ class IOTest(unittest.TestCase):
|
|||
def test_array_writes(self):
|
||||
a = array.array('i', range(10))
|
||||
n = len(a.tobytes())
|
||||
with self.open(support.TESTFN, "wb", 0) as f:
|
||||
self.assertEqual(f.write(a), n)
|
||||
with self.open(support.TESTFN, "wb") as f:
|
||||
self.assertEqual(f.write(a), n)
|
||||
def check(f):
|
||||
with f:
|
||||
self.assertEqual(f.write(a), n)
|
||||
f.writelines((a,))
|
||||
check(self.BytesIO())
|
||||
check(self.FileIO(support.TESTFN, "w"))
|
||||
check(self.BufferedWriter(self.MockRawIO()))
|
||||
check(self.BufferedRandom(self.MockRawIO()))
|
||||
check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
|
||||
|
||||
def test_closefd(self):
|
||||
self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
|
||||
|
@ -803,6 +831,19 @@ class IOTest(unittest.TestCase):
|
|||
with self.assertRaises(ValueError):
|
||||
self.open(support.TESTFN, 'w', newline='invalid')
|
||||
|
||||
def test_buffered_readinto_mixin(self):
|
||||
# Test the implementation provided by BufferedIOBase
|
||||
class Stream(self.BufferedIOBase):
|
||||
def read(self, size):
|
||||
return b"12345"
|
||||
read1 = read
|
||||
stream = Stream()
|
||||
for method in ("readinto", "readinto1"):
|
||||
with self.subTest(method):
|
||||
buffer = byteslike(5)
|
||||
self.assertEqual(getattr(stream, method)(buffer), 5)
|
||||
self.assertEqual(bytes(buffer), b"12345")
|
||||
|
||||
|
||||
class CIOTest(IOTest):
|
||||
|
||||
|
@ -1394,6 +1435,11 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
|
|||
bufio = self.tp(writer, 8)
|
||||
bufio.write(b"abc")
|
||||
self.assertFalse(writer._write_stack)
|
||||
buffer = bytearray(b"def")
|
||||
bufio.write(buffer)
|
||||
buffer[:] = b"***" # Overwrite our copy of the data
|
||||
bufio.flush()
|
||||
self.assertEqual(b"".join(writer._write_stack), b"abcdef")
|
||||
|
||||
def test_write_overflow(self):
|
||||
writer = self.MockRawIO()
|
||||
|
@ -1720,11 +1766,13 @@ class BufferedRWPairTest(unittest.TestCase):
|
|||
self.assertEqual(pair.read1(3), b"abc")
|
||||
|
||||
def test_readinto(self):
|
||||
pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
|
||||
for method in ("readinto", "readinto1"):
|
||||
with self.subTest(method):
|
||||
pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
|
||||
|
||||
data = bytearray(5)
|
||||
self.assertEqual(pair.readinto(data), 5)
|
||||
self.assertEqual(data, b"abcde")
|
||||
data = byteslike(5)
|
||||
self.assertEqual(getattr(pair, method)(data), 5)
|
||||
self.assertEqual(bytes(data), b"abcde")
|
||||
|
||||
def test_write(self):
|
||||
w = self.MockRawIO()
|
||||
|
@ -1732,7 +1780,9 @@ class BufferedRWPairTest(unittest.TestCase):
|
|||
|
||||
pair.write(b"abc")
|
||||
pair.flush()
|
||||
pair.write(b"def")
|
||||
buffer = bytearray(b"def")
|
||||
pair.write(buffer)
|
||||
buffer[:] = b"***" # Overwrite our copy of the data
|
||||
pair.flush()
|
||||
self.assertEqual(w._write_stack, [b"abc", b"def"])
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue