mirror of
https://github.com/python/cpython.git
synced 2025-08-31 14:07:50 +00:00
Implement long positioning (Unix only, probably).
Etc., etc.
This commit is contained in:
parent
cce92b27d6
commit
53807dabf0
3 changed files with 210 additions and 179 deletions
|
@ -4,10 +4,10 @@ import unittest
|
|||
from itertools import chain
|
||||
from test import test_support
|
||||
|
||||
import io
|
||||
import io # The module under test
|
||||
|
||||
|
||||
class MockIO(io.RawIOBase):
|
||||
class MockRawIO(io.RawIOBase):
|
||||
|
||||
def __init__(self, read_stack=()):
|
||||
self._read_stack = list(read_stack)
|
||||
|
@ -56,13 +56,13 @@ class MockFileIO(io.BytesIO):
|
|||
|
||||
class MockNonBlockWriterIO(io.RawIOBase):
|
||||
|
||||
def __init__(self, blockingScript):
|
||||
self.bs = list(blockingScript)
|
||||
def __init__(self, blocking_script):
|
||||
self._blocking_script = list(blocking_script)
|
||||
self._write_stack = []
|
||||
|
||||
def write(self, b):
|
||||
self._write_stack.append(b[:])
|
||||
n = self.bs.pop(0)
|
||||
n = self._blocking_script.pop(0)
|
||||
if (n < 0):
|
||||
raise io.BlockingIOError(0, "test blocking", -n)
|
||||
else:
|
||||
|
@ -90,6 +90,23 @@ class IOTest(unittest.TestCase):
|
|||
f.seek(-2, 2)
|
||||
f.truncate()
|
||||
|
||||
def large_file_ops(self, f):
|
||||
assert f.readable()
|
||||
assert f.writable()
|
||||
self.assertEqual(f.seek(2**32), 2**32)
|
||||
self.assertEqual(f.tell(), 2**32)
|
||||
self.assertEqual(f.write(b"xxx"), 3)
|
||||
self.assertEqual(f.tell(), 2**32 + 3)
|
||||
self.assertEqual(f.seek(-1, 1), 2**32 + 2)
|
||||
f.truncate()
|
||||
self.assertEqual(f.tell(), 2**32 + 2)
|
||||
self.assertEqual(f.seek(0, 2), 2**32 + 2)
|
||||
f.truncate(2**32 + 1)
|
||||
self.assertEqual(f.tell(), 2**32 + 1)
|
||||
self.assertEqual(f.seek(0, 2), 2**32 + 1)
|
||||
self.assertEqual(f.seek(-1, 2), 2**32)
|
||||
self.assertEqual(f.read(2), b"x")
|
||||
|
||||
def read_ops(self, f):
|
||||
data = f.read(5)
|
||||
self.assertEqual(data, b"hello")
|
||||
|
@ -130,19 +147,9 @@ class IOTest(unittest.TestCase):
|
|||
f = io.BytesIO(data)
|
||||
self.read_ops(f)
|
||||
|
||||
def test_fileio_FileIO(self):
|
||||
import _fileio
|
||||
f = _fileio._FileIO(test_support.TESTFN, "w")
|
||||
self.assertEqual(f.readable(), False)
|
||||
self.assertEqual(f.writable(), True)
|
||||
self.assertEqual(f.seekable(), True)
|
||||
self.write_ops(f)
|
||||
f.close()
|
||||
f = _fileio._FileIO(test_support.TESTFN, "r")
|
||||
self.assertEqual(f.readable(), True)
|
||||
self.assertEqual(f.writable(), False)
|
||||
self.assertEqual(f.seekable(), True)
|
||||
self.read_ops(f)
|
||||
def test_large_file_ops(self):
|
||||
f = io.open(test_support.TESTFN, "w+b", buffering=0)
|
||||
self.large_file_ops(f)
|
||||
f.close()
|
||||
|
||||
|
||||
|
@ -205,7 +212,7 @@ class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
|
|||
class BufferedReaderTest(unittest.TestCase):
|
||||
|
||||
def testRead(self):
|
||||
rawio = MockIO((b"abc", b"d", b"efg"))
|
||||
rawio = MockRawIO((b"abc", b"d", b"efg"))
|
||||
bufio = io.BufferedReader(rawio)
|
||||
|
||||
self.assertEquals(b"abcdef", bufio.read(6))
|
||||
|
@ -231,7 +238,7 @@ class BufferedReaderTest(unittest.TestCase):
|
|||
|
||||
def testReadNonBlocking(self):
|
||||
# Inject some None's in there to simulate EWOULDBLOCK
|
||||
rawio = MockIO((b"abc", b"d", None, b"efg", None, None))
|
||||
rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
|
||||
bufio = io.BufferedReader(rawio)
|
||||
|
||||
self.assertEquals(b"abcd", bufio.read(6))
|
||||
|
@ -241,19 +248,19 @@ class BufferedReaderTest(unittest.TestCase):
|
|||
self.assertEquals(b"", bufio.read())
|
||||
|
||||
def testReadToEof(self):
|
||||
rawio = MockIO((b"abc", b"d", b"efg"))
|
||||
rawio = MockRawIO((b"abc", b"d", b"efg"))
|
||||
bufio = io.BufferedReader(rawio)
|
||||
|
||||
self.assertEquals(b"abcdefg", bufio.read(9000))
|
||||
|
||||
def testReadNoArgs(self):
|
||||
rawio = MockIO((b"abc", b"d", b"efg"))
|
||||
rawio = MockRawIO((b"abc", b"d", b"efg"))
|
||||
bufio = io.BufferedReader(rawio)
|
||||
|
||||
self.assertEquals(b"abcdefg", bufio.read())
|
||||
|
||||
def testFileno(self):
|
||||
rawio = MockIO((b"abc", b"d", b"efg"))
|
||||
rawio = MockRawIO((b"abc", b"d", b"efg"))
|
||||
bufio = io.BufferedReader(rawio)
|
||||
|
||||
self.assertEquals(42, bufio.fileno())
|
||||
|
@ -268,7 +275,7 @@ class BufferedWriterTest(unittest.TestCase):
|
|||
|
||||
def testWrite(self):
|
||||
# Write to the buffered IO but don't overflow the buffer.
|
||||
writer = MockIO()
|
||||
writer = MockRawIO()
|
||||
bufio = io.BufferedWriter(writer, 8)
|
||||
|
||||
bufio.write(b"abc")
|
||||
|
@ -276,7 +283,7 @@ class BufferedWriterTest(unittest.TestCase):
|
|||
self.assertFalse(writer._write_stack)
|
||||
|
||||
def testWriteOverflow(self):
|
||||
writer = MockIO()
|
||||
writer = MockRawIO()
|
||||
bufio = io.BufferedWriter(writer, 8)
|
||||
|
||||
bufio.write(b"abc")
|
||||
|
@ -305,13 +312,13 @@ class BufferedWriterTest(unittest.TestCase):
|
|||
# later.
|
||||
|
||||
def testFileno(self):
|
||||
rawio = MockIO((b"abc", b"d", b"efg"))
|
||||
rawio = MockRawIO((b"abc", b"d", b"efg"))
|
||||
bufio = io.BufferedWriter(rawio)
|
||||
|
||||
self.assertEquals(42, bufio.fileno())
|
||||
|
||||
def testFlush(self):
|
||||
writer = MockIO()
|
||||
writer = MockRawIO()
|
||||
bufio = io.BufferedWriter(writer, 8)
|
||||
|
||||
bufio.write(b"abc")
|
||||
|
@ -323,8 +330,8 @@ class BufferedWriterTest(unittest.TestCase):
|
|||
class BufferedRWPairTest(unittest.TestCase):
|
||||
|
||||
def testRWPair(self):
|
||||
r = MockIO(())
|
||||
w = MockIO()
|
||||
r = MockRawIO(())
|
||||
w = MockRawIO()
|
||||
pair = io.BufferedRWPair(r, w)
|
||||
|
||||
# XXX need implementation
|
||||
|
@ -333,7 +340,7 @@ class BufferedRWPairTest(unittest.TestCase):
|
|||
class BufferedRandomTest(unittest.TestCase):
|
||||
|
||||
def testReadAndWrite(self):
|
||||
raw = MockIO((b"asdf", b"ghjk"))
|
||||
raw = MockRawIO((b"asdf", b"ghjk"))
|
||||
rw = io.BufferedRandom(raw, 8, 12)
|
||||
|
||||
self.assertEqual(b"as", rw.read(2))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue