mirror of
https://github.com/python/cpython.git
synced 2025-08-31 14:07:50 +00:00
Added a working Text I/O layer, by Mark Russell.
This is essentially a checkpoint.
This commit is contained in:
parent
0e074483e7
commit
78892e4613
2 changed files with 366 additions and 47 deletions
|
@ -2,7 +2,7 @@
|
|||
|
||||
import unittest
|
||||
from test import test_support
|
||||
|
||||
from itertools import chain
|
||||
import io
|
||||
|
||||
|
||||
|
@ -16,7 +16,7 @@ class MockIO(io.RawIOBase):
|
|||
try:
|
||||
return self._readStack.pop(0)
|
||||
except:
|
||||
return io.EOF
|
||||
return b""
|
||||
|
||||
def write(self, b):
|
||||
self._writeStack.append(b)
|
||||
|
@ -41,6 +41,18 @@ class MockIO(io.RawIOBase):
|
|||
return 42
|
||||
|
||||
|
||||
class MockFileIO(io.BytesIO):
|
||||
|
||||
def __init__(self, data):
|
||||
self.read_history = []
|
||||
io.BytesIO.__init__(self, data)
|
||||
|
||||
def read(self, n=None):
|
||||
res = io.BytesIO.read(self, n)
|
||||
self.read_history.append(None if res is None else len(res))
|
||||
return res
|
||||
|
||||
|
||||
class MockNonBlockWriterIO(io.RawIOBase):
|
||||
|
||||
def __init__(self, blockingScript):
|
||||
|
@ -147,31 +159,31 @@ class IOTest(unittest.TestCase):
|
|||
f.close()
|
||||
|
||||
|
||||
class BytesIOTest(unittest.TestCase):
|
||||
class MemorySeekTest(unittest.TestCase):
|
||||
|
||||
def testInit(self):
|
||||
buf = b"1234567890"
|
||||
bytesIo = io.BytesIO(buf)
|
||||
buf = self.buftype("1234567890")
|
||||
bytesIo = self.ioclass(buf)
|
||||
|
||||
def testRead(self):
|
||||
buf = b"1234567890"
|
||||
bytesIo = io.BytesIO(buf)
|
||||
buf = self.buftype("1234567890")
|
||||
bytesIo = self.ioclass(buf)
|
||||
|
||||
self.assertEquals(buf[:1], bytesIo.read(1))
|
||||
self.assertEquals(buf[1:5], bytesIo.read(4))
|
||||
self.assertEquals(buf[5:], bytesIo.read(900))
|
||||
self.assertEquals(io.EOF, bytesIo.read())
|
||||
self.assertEquals(self.EOF, bytesIo.read())
|
||||
|
||||
def testReadNoArgs(self):
|
||||
buf = b"1234567890"
|
||||
bytesIo = io.BytesIO(buf)
|
||||
buf = self.buftype("1234567890")
|
||||
bytesIo = self.ioclass(buf)
|
||||
|
||||
self.assertEquals(buf, bytesIo.read())
|
||||
self.assertEquals(io.EOF, bytesIo.read())
|
||||
self.assertEquals(self.EOF, bytesIo.read())
|
||||
|
||||
def testSeek(self):
|
||||
buf = b"1234567890"
|
||||
bytesIo = io.BytesIO(buf)
|
||||
buf = self.buftype("1234567890")
|
||||
bytesIo = self.ioclass(buf)
|
||||
|
||||
bytesIo.read(5)
|
||||
bytesIo.seek(0)
|
||||
|
@ -181,8 +193,8 @@ class BytesIOTest(unittest.TestCase):
|
|||
self.assertEquals(buf[3:], bytesIo.read())
|
||||
|
||||
def testTell(self):
|
||||
buf = b"1234567890"
|
||||
bytesIo = io.BytesIO(buf)
|
||||
buf = self.buftype("1234567890")
|
||||
bytesIo = self.ioclass(buf)
|
||||
|
||||
self.assertEquals(0, bytesIo.tell())
|
||||
bytesIo.seek(5)
|
||||
|
@ -191,6 +203,18 @@ class BytesIOTest(unittest.TestCase):
|
|||
self.assertEquals(10000, bytesIo.tell())
|
||||
|
||||
|
||||
class BytesIOTest(MemorySeekTest):
|
||||
buftype = bytes
|
||||
ioclass = io.BytesIO
|
||||
EOF = b""
|
||||
|
||||
|
||||
class StringIOTest(MemorySeekTest):
|
||||
buftype = str
|
||||
ioclass = io.StringIO
|
||||
EOF = ""
|
||||
|
||||
|
||||
class BufferedReaderTest(unittest.TestCase):
|
||||
|
||||
def testRead(self):
|
||||
|
@ -199,6 +223,25 @@ class BufferedReaderTest(unittest.TestCase):
|
|||
|
||||
self.assertEquals(b"abcdef", bufIo.read(6))
|
||||
|
||||
def testBuffering(self):
|
||||
data = b"abcdefghi"
|
||||
dlen = len(data)
|
||||
|
||||
tests = [
|
||||
[ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
|
||||
[ 100, [ 3, 3, 3], [ dlen ] ],
|
||||
[ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
|
||||
]
|
||||
|
||||
for bufsize, buf_read_sizes, raw_read_sizes in tests:
|
||||
rawIo = MockFileIO(data)
|
||||
bufIo = io.BufferedReader(rawIo, buffer_size=bufsize)
|
||||
pos = 0
|
||||
for nbytes in buf_read_sizes:
|
||||
self.assertEquals(bufIo.read(nbytes), data[pos:pos+nbytes])
|
||||
pos += nbytes
|
||||
self.assertEquals(rawIo.read_history, raw_read_sizes)
|
||||
|
||||
def testReadNonBlocking(self):
|
||||
# Inject some None's in there to simulate EWOULDBLOCK
|
||||
rawIo = MockIO((b"abc", b"d", None, b"efg", None, None))
|
||||
|
@ -208,7 +251,7 @@ class BufferedReaderTest(unittest.TestCase):
|
|||
self.assertEquals(b"e", bufIo.read(1))
|
||||
self.assertEquals(b"fg", bufIo.read())
|
||||
self.assert_(None is bufIo.read())
|
||||
self.assertEquals(io.EOF, bufIo.read())
|
||||
self.assertEquals(b"", bufIo.read())
|
||||
|
||||
def testReadToEof(self):
|
||||
rawIo = MockIO((b"abc", b"d", b"efg"))
|
||||
|
@ -270,8 +313,9 @@ class BufferedWriterTest(unittest.TestCase):
|
|||
|
||||
bufIo.write(b"asdfasdfasdf")
|
||||
|
||||
# XXX I don't like this test. It relies too heavily on how the algorithm
|
||||
# actually works, which we might change. Refactor later.
|
||||
# XXX I don't like this test. It relies too heavily on how the
|
||||
# algorithm actually works, which we might change. Refactor
|
||||
# later.
|
||||
|
||||
def testFileno(self):
|
||||
rawIo = MockIO((b"abc", b"d", b"efg"))
|
||||
|
@ -299,7 +343,7 @@ class BufferedRWPairTest(unittest.TestCase):
|
|||
# XXX need implementation
|
||||
|
||||
|
||||
class BufferedRandom(unittest.TestCase):
|
||||
class BufferedRandomTest(unittest.TestCase):
|
||||
|
||||
def testReadAndWrite(self):
|
||||
raw = MockIO((b"asdf", b"ghjk"))
|
||||
|
@ -331,12 +375,56 @@ class BufferedRandom(unittest.TestCase):
|
|||
self.assertEquals(7, rw.tell())
|
||||
self.assertEquals(b"fl", rw.read(11))
|
||||
|
||||
|
||||
class TextIOWrapperTest(unittest.TestCase):
|
||||
def testNewlines(self):
|
||||
input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
|
||||
|
||||
tests = [
|
||||
[ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
|
||||
[ '\n', input_lines ],
|
||||
[ '\r\n', input_lines ],
|
||||
]
|
||||
|
||||
encodings = ('utf-8', 'bz2')
|
||||
|
||||
# Try a range of pad sizes to test the case where \r is the last
|
||||
# character in TextIOWrapper._pending_line.
|
||||
for encoding in encodings:
|
||||
for do_reads in (False, True):
|
||||
for padlen in chain(range(10), range(50, 60)):
|
||||
pad = '.' * padlen
|
||||
data_lines = [ pad + line for line in input_lines ]
|
||||
# XXX: str.encode() should return bytes
|
||||
data = bytes(''.join(data_lines).encode(encoding))
|
||||
|
||||
for newline, exp_line_ends in tests:
|
||||
exp_lines = [ pad + line for line in exp_line_ends ]
|
||||
bufIo = io.BufferedReader(io.BytesIO(data))
|
||||
textIo = io.TextIOWrapper(bufIo, newline=newline,
|
||||
encoding=encoding)
|
||||
if do_reads:
|
||||
got_lines = []
|
||||
while True:
|
||||
c2 = textIo.read(2)
|
||||
if c2 == '':
|
||||
break
|
||||
self.assertEquals(len(c2), 2)
|
||||
got_lines.append(c2 + textIo.readline())
|
||||
else:
|
||||
got_lines = list(textIo)
|
||||
|
||||
for got_line, exp_line in zip(got_lines, exp_lines):
|
||||
self.assertEquals(got_line, exp_line)
|
||||
self.assertEquals(len(got_lines), len(exp_lines))
|
||||
|
||||
# XXX Tests for open()
|
||||
|
||||
def test_main():
|
||||
test_support.run_unittest(IOTest, BytesIOTest, BufferedReaderTest,
|
||||
test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
|
||||
BufferedReaderTest,
|
||||
BufferedWriterTest, BufferedRWPairTest,
|
||||
BufferedRandom)
|
||||
BufferedRandomTest, TextIOWrapperTest)
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_main()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue