mirror of
https://github.com/python/cpython.git
synced 2025-08-28 12:45:07 +00:00
New version from Mike Verdone (sat in my inbox since 2/27).
I cleaned up whitespace but otherwise didn't change it. This will need work to reflect the tentative decision to drop nonblocking I/O support from the buffering layers.
This commit is contained in:
parent
c78855465f
commit
01a2752d19
2 changed files with 288 additions and 87 deletions
|
@ -5,10 +5,10 @@ from test import test_support
|
|||
|
||||
import io
|
||||
|
||||
|
||||
class MockReadIO(io.RawIOBase):
|
||||
def __init__(self, readStack):
|
||||
class MockIO(io.RawIOBase):
|
||||
def __init__(self, readStack=()):
|
||||
self._readStack = list(readStack)
|
||||
self._writeStack = []
|
||||
|
||||
def read(self, n=None):
|
||||
try:
|
||||
|
@ -16,27 +16,41 @@ class MockReadIO(io.RawIOBase):
|
|||
except:
|
||||
return io.EOF
|
||||
|
||||
def write(self, b):
|
||||
self._writeStack.append(b)
|
||||
return len(b)
|
||||
|
||||
def writable(self):
|
||||
return True
|
||||
|
||||
def fileno(self):
|
||||
return 42
|
||||
|
||||
def readable(self):
|
||||
return True
|
||||
|
||||
|
||||
class MockWriteIO(io.RawIOBase):
|
||||
def __init__(self):
|
||||
self._writeStack = []
|
||||
|
||||
def write(self, b):
|
||||
self._writeStack.append(b)
|
||||
return len(b)
|
||||
|
||||
def writeable(self):
|
||||
def seekable(self):
|
||||
return True
|
||||
|
||||
def fileno(self):
|
||||
def seek(self, pos, whence):
|
||||
pass
|
||||
|
||||
def tell(self):
|
||||
return 42
|
||||
|
||||
class MockNonBlockWriterIO(io.RawIOBase):
|
||||
def __init__(self, blockingScript):
|
||||
self.bs = list(blockingScript)
|
||||
self._write_stack = []
|
||||
def write(self, b):
|
||||
self._write_stack.append(b)
|
||||
n = self.bs.pop(0)
|
||||
if (n < 0):
|
||||
raise io.BlockingIO(0, "test blocking", -n)
|
||||
else:
|
||||
return n
|
||||
def writable(self):
|
||||
return True
|
||||
|
||||
class IOTest(unittest.TestCase):
|
||||
|
||||
|
@ -90,9 +104,7 @@ class IOTest(unittest.TestCase):
|
|||
f = io.BytesIO(data)
|
||||
self.read_ops(f)
|
||||
|
||||
|
||||
class BytesIOTest(unittest.TestCase):
|
||||
|
||||
def testInit(self):
|
||||
buf = b"1234567890"
|
||||
bytesIo = io.BytesIO(buf)
|
||||
|
@ -134,44 +146,51 @@ class BytesIOTest(unittest.TestCase):
|
|||
bytesIo.seek(10000)
|
||||
self.assertEquals(10000, bytesIo.tell())
|
||||
|
||||
|
||||
class BufferedReaderTest(unittest.TestCase):
|
||||
|
||||
def testRead(self):
|
||||
rawIo = MockReadIO((b"abc", b"d", b"efg"))
|
||||
rawIo = MockIO((b"abc", b"d", b"efg"))
|
||||
bufIo = io.BufferedReader(rawIo)
|
||||
|
||||
self.assertEquals(b"abcdef", bufIo.read(6))
|
||||
|
||||
def testReadNonBlocking(self):
|
||||
# Inject some None's in there to simulate EWOULDBLOCK
|
||||
rawIo = MockIO((b"abc", b"d", None, b"efg", None, None))
|
||||
bufIo = io.BufferedReader(rawIo)
|
||||
|
||||
self.assertEquals(b"abcd", bufIo.read(6))
|
||||
self.assertEquals(b"e", bufIo.read(1))
|
||||
self.assertEquals(b"fg", bufIo.read())
|
||||
self.assert_(None is bufIo.read())
|
||||
self.assertEquals(io.EOF, bufIo.read())
|
||||
|
||||
def testReadToEof(self):
|
||||
rawIo = MockReadIO((b"abc", b"d", b"efg"))
|
||||
rawIo = MockIO((b"abc", b"d", b"efg"))
|
||||
bufIo = io.BufferedReader(rawIo)
|
||||
|
||||
self.assertEquals(b"abcdefg", bufIo.read(9000))
|
||||
|
||||
def testReadNoArgs(self):
|
||||
rawIo = MockReadIO((b"abc", b"d", b"efg"))
|
||||
rawIo = MockIO((b"abc", b"d", b"efg"))
|
||||
bufIo = io.BufferedReader(rawIo)
|
||||
|
||||
self.assertEquals(b"abcdefg", bufIo.read())
|
||||
|
||||
def testFileno(self):
|
||||
rawIo = MockReadIO((b"abc", b"d", b"efg"))
|
||||
rawIo = MockIO((b"abc", b"d", b"efg"))
|
||||
bufIo = io.BufferedReader(rawIo)
|
||||
|
||||
self.assertEquals(42, bufIo.fileno())
|
||||
|
||||
def testFilenoNoFileno(self):
|
||||
# TODO will we always have fileno() function? If so, kill
|
||||
# XXX will we always have fileno() function? If so, kill
|
||||
# this test. Else, write it.
|
||||
pass
|
||||
|
||||
|
||||
class BufferedWriterTest(unittest.TestCase):
|
||||
|
||||
def testWrite(self):
|
||||
# Write to the buffered IO but don't overflow the buffer.
|
||||
writer = MockWriteIO()
|
||||
writer = MockIO()
|
||||
bufIo = io.BufferedWriter(writer, 8)
|
||||
|
||||
bufIo.write(b"abc")
|
||||
|
@ -179,7 +198,7 @@ class BufferedWriterTest(unittest.TestCase):
|
|||
self.assertFalse(writer._writeStack)
|
||||
|
||||
def testWriteOverflow(self):
|
||||
writer = MockWriteIO()
|
||||
writer = MockIO()
|
||||
bufIo = io.BufferedWriter(writer, 8)
|
||||
|
||||
bufIo.write(b"abc")
|
||||
|
@ -187,8 +206,33 @@ class BufferedWriterTest(unittest.TestCase):
|
|||
|
||||
self.assertEquals(b"abcdefghijkl", writer._writeStack[0])
|
||||
|
||||
def testWriteNonBlocking(self):
|
||||
raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
|
||||
bufIo = io.BufferedWriter(raw, 8, 16)
|
||||
|
||||
bufIo.write(b"asdf")
|
||||
bufIo.write(b"asdfa")
|
||||
self.assertEquals(b"asdfasdfa", raw._write_stack[0])
|
||||
|
||||
bufIo.write(b"asdfasdfasdf")
|
||||
self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
|
||||
bufIo.write(b"asdfasdfasdf")
|
||||
self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
|
||||
self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
|
||||
|
||||
bufIo.write(b"asdfasdfasdf")
|
||||
|
||||
# XXX I don't like this test. It relies too heavily on how the algorithm
|
||||
# actually works, which we might change. Refactor later.
|
||||
|
||||
def testFileno(self):
|
||||
rawIo = MockIO((b"abc", b"d", b"efg"))
|
||||
bufIo = io.BufferedWriter(rawIo)
|
||||
|
||||
self.assertEquals(42, bufIo.fileno())
|
||||
|
||||
def testFlush(self):
|
||||
writer = MockWriteIO()
|
||||
writer = MockIO()
|
||||
bufIo = io.BufferedWriter(writer, 8)
|
||||
|
||||
bufIo.write(b"abc")
|
||||
|
@ -196,11 +240,51 @@ class BufferedWriterTest(unittest.TestCase):
|
|||
|
||||
self.assertEquals(b"abc", writer._writeStack[0])
|
||||
|
||||
# TODO. Tests for open()
|
||||
class BufferedRWPairTest(unittest.TestCase):
|
||||
def testRWPair(self):
|
||||
r = MockIO(())
|
||||
w = MockIO()
|
||||
pair = io.BufferedRWPair(r, w)
|
||||
|
||||
# XXX need implementation
|
||||
|
||||
class BufferedRandom(unittest.TestCase):
|
||||
def testReadAndWrite(self):
|
||||
raw = MockIO((b"asdf", b"ghjk"))
|
||||
rw = io.BufferedRandom(raw, 8, 12)
|
||||
|
||||
self.assertEqual(b"as", rw.read(2))
|
||||
rw.write(b"ddd")
|
||||
rw.write(b"eee")
|
||||
self.assertFalse(raw._writeStack) # Buffer writes
|
||||
self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
|
||||
self.assertEquals(b"dddeee", raw._writeStack[0])
|
||||
|
||||
def testSeekAndTell(self):
|
||||
raw = io.BytesIO(b"asdfghjkl")
|
||||
rw = io.BufferedRandom(raw)
|
||||
|
||||
self.assertEquals(b"as", rw.read(2))
|
||||
self.assertEquals(2, rw.tell())
|
||||
rw.seek(0, 0)
|
||||
self.assertEquals(b"asdf", rw.read(4))
|
||||
|
||||
rw.write(b"asdf")
|
||||
rw.seek(0, 0)
|
||||
self.assertEquals(b"asdfasdfl", rw.read())
|
||||
self.assertEquals(9, rw.tell())
|
||||
rw.seek(-4, 2)
|
||||
self.assertEquals(5, rw.tell())
|
||||
rw.seek(2, 1)
|
||||
self.assertEquals(7, rw.tell())
|
||||
self.assertEquals(b"fl", rw.read(11))
|
||||
|
||||
# XXX Tests for open()
|
||||
|
||||
def test_main():
|
||||
test_support.run_unittest(IOTest, BytesIOTest, BufferedReaderTest,
|
||||
BufferedWriterTest)
|
||||
BufferedWriterTest, BufferedRWPairTest,
|
||||
BufferedRandom)
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_main()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue