mirror of
https://github.com/python/cpython.git
synced 2025-08-03 08:34:29 +00:00
parent
77143dbaee
commit
a96fea03e8
4 changed files with 235 additions and 19 deletions
|
@ -943,6 +943,71 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
|
|||
self.assertEqual(bufio.readinto(b), 1)
|
||||
self.assertEqual(b, b"cb")
|
||||
|
||||
def test_readinto1(self):
|
||||
buffer_size = 10
|
||||
rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
|
||||
bufio = self.tp(rawio, buffer_size=buffer_size)
|
||||
b = bytearray(2)
|
||||
self.assertEqual(bufio.peek(3), b'abc')
|
||||
self.assertEqual(rawio._reads, 1)
|
||||
self.assertEqual(bufio.readinto1(b), 2)
|
||||
self.assertEqual(b, b"ab")
|
||||
self.assertEqual(rawio._reads, 1)
|
||||
self.assertEqual(bufio.readinto1(b), 1)
|
||||
self.assertEqual(b[:1], b"c")
|
||||
self.assertEqual(rawio._reads, 1)
|
||||
self.assertEqual(bufio.readinto1(b), 2)
|
||||
self.assertEqual(b, b"de")
|
||||
self.assertEqual(rawio._reads, 2)
|
||||
b = bytearray(2*buffer_size)
|
||||
self.assertEqual(bufio.peek(3), b'fgh')
|
||||
self.assertEqual(rawio._reads, 3)
|
||||
self.assertEqual(bufio.readinto1(b), 6)
|
||||
self.assertEqual(b[:6], b"fghjkl")
|
||||
self.assertEqual(rawio._reads, 4)
|
||||
|
||||
def test_readinto_array(self):
|
||||
buffer_size = 60
|
||||
data = b"a" * 26
|
||||
rawio = self.MockRawIO((data,))
|
||||
bufio = self.tp(rawio, buffer_size=buffer_size)
|
||||
|
||||
# Create an array with element size > 1 byte
|
||||
b = array.array('i', b'x' * 32)
|
||||
assert len(b) != 16
|
||||
|
||||
# Read into it. We should get as many *bytes* as we can fit into b
|
||||
# (which is more than the number of elements)
|
||||
n = bufio.readinto(b)
|
||||
self.assertGreater(n, len(b))
|
||||
|
||||
# Check that old contents of b are preserved
|
||||
bm = memoryview(b).cast('B')
|
||||
self.assertLess(n, len(bm))
|
||||
self.assertEqual(bm[:n], data[:n])
|
||||
self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
|
||||
|
||||
def test_readinto1_array(self):
|
||||
buffer_size = 60
|
||||
data = b"a" * 26
|
||||
rawio = self.MockRawIO((data,))
|
||||
bufio = self.tp(rawio, buffer_size=buffer_size)
|
||||
|
||||
# Create an array with element size > 1 byte
|
||||
b = array.array('i', b'x' * 32)
|
||||
assert len(b) != 16
|
||||
|
||||
# Read into it. We should get as many *bytes* as we can fit into b
|
||||
# (which is more than the number of elements)
|
||||
n = bufio.readinto1(b)
|
||||
self.assertGreater(n, len(b))
|
||||
|
||||
# Check that old contents of b are preserved
|
||||
bm = memoryview(b).cast('B')
|
||||
self.assertLess(n, len(bm))
|
||||
self.assertEqual(bm[:n], data[:n])
|
||||
self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
|
||||
|
||||
def test_readlines(self):
|
||||
def bufio():
|
||||
rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
|
||||
|
@ -3050,6 +3115,8 @@ class MiscIOTest(unittest.TestCase):
|
|||
self.assertRaises(ValueError, f.readall)
|
||||
if hasattr(f, "readinto"):
|
||||
self.assertRaises(ValueError, f.readinto, bytearray(1024))
|
||||
if hasattr(f, "readinto1"):
|
||||
self.assertRaises(ValueError, f.readinto1, bytearray(1024))
|
||||
self.assertRaises(ValueError, f.readline)
|
||||
self.assertRaises(ValueError, f.readlines)
|
||||
self.assertRaises(ValueError, f.seek, 0)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue