mirror of
https://github.com/python/cpython.git
synced 2025-08-31 05:58:33 +00:00
Checkpoint.
Some cleanup of test_io.py and io.py. Added seeking to buffered reader and writer, but no tests yet.
This commit is contained in:
parent
186685905c
commit
76c5d4d72d
2 changed files with 111 additions and 77 deletions
|
@ -1,25 +1,26 @@
|
|||
"""Unit tests for io.py."""
|
||||
|
||||
import unittest
|
||||
from test import test_support
|
||||
from itertools import chain
|
||||
from test import test_support
|
||||
|
||||
import io
|
||||
|
||||
|
||||
class MockIO(io.RawIOBase):
|
||||
|
||||
def __init__(self, readStack=()):
|
||||
self._readStack = list(readStack)
|
||||
self._writeStack = []
|
||||
def __init__(self, read_stack=()):
|
||||
self._read_stack = list(read_stack)
|
||||
self._write_stack = []
|
||||
|
||||
def read(self, n=None):
|
||||
try:
|
||||
return self._readStack.pop(0)
|
||||
return self._read_stack.pop(0)
|
||||
except:
|
||||
return b""
|
||||
|
||||
def write(self, b):
|
||||
self._writeStack.append(b)
|
||||
self._write_stack.append(b[:])
|
||||
return len(b)
|
||||
|
||||
def writable(self):
|
||||
|
@ -60,7 +61,7 @@ class MockNonBlockWriterIO(io.RawIOBase):
|
|||
self._write_stack = []
|
||||
|
||||
def write(self, b):
|
||||
self._write_stack.append(b)
|
||||
self._write_stack.append(b[:])
|
||||
n = self.bs.pop(0)
|
||||
if (n < 0):
|
||||
raise io.BlockingIO(0, "test blocking", -n)
|
||||
|
@ -159,7 +160,7 @@ class IOTest(unittest.TestCase):
|
|||
f.close()
|
||||
|
||||
|
||||
class MemorySeekTest(unittest.TestCase):
|
||||
class MemorySeekTestMixin:
|
||||
|
||||
def testInit(self):
|
||||
buf = self.buftype("1234567890")
|
||||
|
@ -203,13 +204,13 @@ class MemorySeekTest(unittest.TestCase):
|
|||
self.assertEquals(10000, bytesIo.tell())
|
||||
|
||||
|
||||
class BytesIOTest(MemorySeekTest):
|
||||
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
|
||||
buftype = bytes
|
||||
ioclass = io.BytesIO
|
||||
EOF = b""
|
||||
|
||||
|
||||
class StringIOTest(MemorySeekTest):
|
||||
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
|
||||
buftype = str
|
||||
ioclass = io.StringIO
|
||||
EOF = ""
|
||||
|
@ -218,10 +219,10 @@ class StringIOTest(MemorySeekTest):
|
|||
class BufferedReaderTest(unittest.TestCase):
|
||||
|
||||
def testRead(self):
|
||||
rawIo = MockIO((b"abc", b"d", b"efg"))
|
||||
bufIo = io.BufferedReader(rawIo)
|
||||
rawio = MockIO((b"abc", b"d", b"efg"))
|
||||
bufio = io.BufferedReader(rawio)
|
||||
|
||||
self.assertEquals(b"abcdef", bufIo.read(6))
|
||||
self.assertEquals(b"abcdef", bufio.read(6))
|
||||
|
||||
def testBuffering(self):
|
||||
data = b"abcdefghi"
|
||||
|
@ -234,42 +235,42 @@ class BufferedReaderTest(unittest.TestCase):
|
|||
]
|
||||
|
||||
for bufsize, buf_read_sizes, raw_read_sizes in tests:
|
||||
rawIo = MockFileIO(data)
|
||||
bufIo = io.BufferedReader(rawIo, buffer_size=bufsize)
|
||||
rawio = MockFileIO(data)
|
||||
bufio = io.BufferedReader(rawio, buffer_size=bufsize)
|
||||
pos = 0
|
||||
for nbytes in buf_read_sizes:
|
||||
self.assertEquals(bufIo.read(nbytes), data[pos:pos+nbytes])
|
||||
self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
|
||||
pos += nbytes
|
||||
self.assertEquals(rawIo.read_history, raw_read_sizes)
|
||||
self.assertEquals(rawio.read_history, raw_read_sizes)
|
||||
|
||||
def testReadNonBlocking(self):
|
||||
# Inject some None's in there to simulate EWOULDBLOCK
|
||||
rawIo = MockIO((b"abc", b"d", None, b"efg", None, None))
|
||||
bufIo = io.BufferedReader(rawIo)
|
||||
rawio = MockIO((b"abc", b"d", None, b"efg", None, None))
|
||||
bufio = io.BufferedReader(rawio)
|
||||
|
||||
self.assertEquals(b"abcd", bufIo.read(6))
|
||||
self.assertEquals(b"e", bufIo.read(1))
|
||||
self.assertEquals(b"fg", bufIo.read())
|
||||
self.assert_(None is bufIo.read())
|
||||
self.assertEquals(b"", bufIo.read())
|
||||
self.assertEquals(b"abcd", bufio.read(6))
|
||||
self.assertEquals(b"e", bufio.read(1))
|
||||
self.assertEquals(b"fg", bufio.read())
|
||||
self.assert_(None is bufio.read())
|
||||
self.assertEquals(b"", bufio.read())
|
||||
|
||||
def testReadToEof(self):
|
||||
rawIo = MockIO((b"abc", b"d", b"efg"))
|
||||
bufIo = io.BufferedReader(rawIo)
|
||||
rawio = MockIO((b"abc", b"d", b"efg"))
|
||||
bufio = io.BufferedReader(rawio)
|
||||
|
||||
self.assertEquals(b"abcdefg", bufIo.read(9000))
|
||||
self.assertEquals(b"abcdefg", bufio.read(9000))
|
||||
|
||||
def testReadNoArgs(self):
|
||||
rawIo = MockIO((b"abc", b"d", b"efg"))
|
||||
bufIo = io.BufferedReader(rawIo)
|
||||
rawio = MockIO((b"abc", b"d", b"efg"))
|
||||
bufio = io.BufferedReader(rawio)
|
||||
|
||||
self.assertEquals(b"abcdefg", bufIo.read())
|
||||
self.assertEquals(b"abcdefg", bufio.read())
|
||||
|
||||
def testFileno(self):
|
||||
rawIo = MockIO((b"abc", b"d", b"efg"))
|
||||
bufIo = io.BufferedReader(rawIo)
|
||||
rawio = MockIO((b"abc", b"d", b"efg"))
|
||||
bufio = io.BufferedReader(rawio)
|
||||
|
||||
self.assertEquals(42, bufIo.fileno())
|
||||
self.assertEquals(42, bufio.fileno())
|
||||
|
||||
def testFilenoNoFileno(self):
|
||||
# XXX will we always have fileno() function? If so, kill
|
||||
|
@ -282,55 +283,55 @@ class BufferedWriterTest(unittest.TestCase):
|
|||
def testWrite(self):
|
||||
# Write to the buffered IO but don't overflow the buffer.
|
||||
writer = MockIO()
|
||||
bufIo = io.BufferedWriter(writer, 8)
|
||||
bufio = io.BufferedWriter(writer, 8)
|
||||
|
||||
bufIo.write(b"abc")
|
||||
bufio.write(b"abc")
|
||||
|
||||
self.assertFalse(writer._writeStack)
|
||||
self.assertFalse(writer._write_stack)
|
||||
|
||||
def testWriteOverflow(self):
|
||||
writer = MockIO()
|
||||
bufIo = io.BufferedWriter(writer, 8)
|
||||
bufio = io.BufferedWriter(writer, 8)
|
||||
|
||||
bufIo.write(b"abc")
|
||||
bufIo.write(b"defghijkl")
|
||||
bufio.write(b"abc")
|
||||
bufio.write(b"defghijkl")
|
||||
|
||||
self.assertEquals(b"abcdefghijkl", writer._writeStack[0])
|
||||
self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
|
||||
|
||||
def testWriteNonBlocking(self):
|
||||
raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
|
||||
bufIo = io.BufferedWriter(raw, 8, 16)
|
||||
bufio = io.BufferedWriter(raw, 8, 16)
|
||||
|
||||
bufIo.write(b"asdf")
|
||||
bufIo.write(b"asdfa")
|
||||
bufio.write(b"asdf")
|
||||
bufio.write(b"asdfa")
|
||||
self.assertEquals(b"asdfasdfa", raw._write_stack[0])
|
||||
|
||||
bufIo.write(b"asdfasdfasdf")
|
||||
bufio.write(b"asdfasdfasdf")
|
||||
self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
|
||||
bufIo.write(b"asdfasdfasdf")
|
||||
bufio.write(b"asdfasdfasdf")
|
||||
self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
|
||||
self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
|
||||
|
||||
bufIo.write(b"asdfasdfasdf")
|
||||
bufio.write(b"asdfasdfasdf")
|
||||
|
||||
# XXX I don't like this test. It relies too heavily on how the
|
||||
# algorithm actually works, which we might change. Refactor
|
||||
# later.
|
||||
|
||||
def testFileno(self):
|
||||
rawIo = MockIO((b"abc", b"d", b"efg"))
|
||||
bufIo = io.BufferedWriter(rawIo)
|
||||
rawio = MockIO((b"abc", b"d", b"efg"))
|
||||
bufio = io.BufferedWriter(rawio)
|
||||
|
||||
self.assertEquals(42, bufIo.fileno())
|
||||
self.assertEquals(42, bufio.fileno())
|
||||
|
||||
def testFlush(self):
|
||||
writer = MockIO()
|
||||
bufIo = io.BufferedWriter(writer, 8)
|
||||
bufio = io.BufferedWriter(writer, 8)
|
||||
|
||||
bufIo.write(b"abc")
|
||||
bufIo.flush()
|
||||
bufio.write(b"abc")
|
||||
bufio.flush()
|
||||
|
||||
self.assertEquals(b"abc", writer._writeStack[0])
|
||||
self.assertEquals(b"abc", writer._write_stack[0])
|
||||
|
||||
|
||||
class BufferedRWPairTest(unittest.TestCase):
|
||||
|
@ -352,9 +353,9 @@ class BufferedRandomTest(unittest.TestCase):
|
|||
self.assertEqual(b"as", rw.read(2))
|
||||
rw.write(b"ddd")
|
||||
rw.write(b"eee")
|
||||
self.assertFalse(raw._writeStack) # Buffer writes
|
||||
self.assertFalse(raw._write_stack) # Buffer writes
|
||||
self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
|
||||
self.assertEquals(b"dddeee", raw._writeStack[0])
|
||||
self.assertEquals(b"dddeee", raw._write_stack[0])
|
||||
|
||||
def testSeekAndTell(self):
|
||||
raw = io.BytesIO(b"asdfghjkl")
|
||||
|
@ -400,19 +401,19 @@ class TextIOWrapperTest(unittest.TestCase):
|
|||
|
||||
for newline, exp_line_ends in tests:
|
||||
exp_lines = [ pad + line for line in exp_line_ends ]
|
||||
bufIo = io.BufferedReader(io.BytesIO(data))
|
||||
textIo = io.TextIOWrapper(bufIo, newline=newline,
|
||||
bufio = io.BufferedReader(io.BytesIO(data))
|
||||
textio = io.TextIOWrapper(bufio, newline=newline,
|
||||
encoding=encoding)
|
||||
if do_reads:
|
||||
got_lines = []
|
||||
while True:
|
||||
c2 = textIo.read(2)
|
||||
c2 = textio.read(2)
|
||||
if c2 == '':
|
||||
break
|
||||
self.assertEquals(len(c2), 2)
|
||||
got_lines.append(c2 + textIo.readline())
|
||||
got_lines.append(c2 + textio.readline())
|
||||
else:
|
||||
got_lines = list(textIo)
|
||||
got_lines = list(textio)
|
||||
|
||||
for got_line, exp_line in zip(got_lines, exp_lines):
|
||||
self.assertEquals(got_line, exp_line)
|
||||
|
@ -427,4 +428,4 @@ def test_main():
|
|||
BufferedRandomTest, TextIOWrapperTest)
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_main()
|
||||
unittest.main()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue