Issue #12213: Fix a buffering bug with interleaved reads and writes that

could appear on io.BufferedRandom streams.
This commit is contained in:
Antoine Pitrou 2011-08-20 15:40:58 +02:00
parent acacbaa788
commit 808cec536a
3 changed files with 105 additions and 62 deletions

View file

@ -1367,15 +1367,18 @@ class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
rw.seek(0, 0)
self.assertEqual(b"asdf", rw.read(4))
rw.write(b"asdf")
rw.write(b"123f")
rw.seek(0, 0)
self.assertEqual(b"asdfasdfl", rw.read())
self.assertEqual(b"asdf123fl", rw.read())
self.assertEqual(9, rw.tell())
rw.seek(-4, 2)
self.assertEqual(5, rw.tell())
rw.seek(2, 1)
self.assertEqual(7, rw.tell())
self.assertEqual(b"fl", rw.read(11))
rw.flush()
self.assertEqual(b"asdf123fl", raw.getvalue())
self.assertRaises(TypeError, rw.seek, 0.0)
def check_flush_and_read(self, read_func):
@ -1520,6 +1523,44 @@ class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
BufferedReaderTest.test_misbehaved_io(self)
BufferedWriterTest.test_misbehaved_io(self)
def test_interleaved_read_write(self):
# Test for issue #12213
with self.BytesIO(b'abcdefgh') as raw:
with self.tp(raw, 100) as f:
f.write(b"1")
self.assertEqual(f.read(1), b'b')
f.write(b'2')
self.assertEqual(f.read1(1), b'd')
f.write(b'3')
buf = bytearray(1)
f.readinto(buf)
self.assertEqual(buf, b'f')
f.write(b'4')
self.assertEqual(f.peek(1), b'h')
f.flush()
self.assertEqual(raw.getvalue(), b'1b2d3f4h')
with self.BytesIO(b'abc') as raw:
with self.tp(raw, 100) as f:
self.assertEqual(f.read(1), b'a')
f.write(b"2")
self.assertEqual(f.read(1), b'c')
f.flush()
self.assertEqual(raw.getvalue(), b'a2c')
def test_interleaved_readline_write(self):
with self.BytesIO(b'ab\ncdef\ng\n') as raw:
with self.tp(raw) as f:
f.write(b'1')
self.assertEqual(f.readline(), b'b\n')
f.write(b'2')
self.assertEqual(f.readline(), b'def\n')
f.write(b'3')
self.assertEqual(f.readline(), b'\n')
f.flush()
self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
tp = io.BufferedRandom