Manual py3k backport: [svn r74158] Issue #6218: Make io.BytesIO and io.StringIO picklable.

This commit is contained in:
Antoine Pitrou 2009-10-24 12:23:18 +00:00
parent fd42f30991
commit fa94e80f3b
5 changed files with 415 additions and 18 deletions

View file

@ -12,6 +12,7 @@ from test import test_support as support
import io
import _pyio as pyio
import sys
import pickle
class MemorySeekTestMixin:
@ -352,6 +353,42 @@ class MemoryTestMixin:
memio = self.ioclass()
memio.foo = 1
def test_pickling(self):
buf = self.buftype("1234567890")
memio = self.ioclass(buf)
memio.foo = 42
memio.seek(2)
class PickleTestMemIO(self.ioclass):
def __init__(me, initvalue, foo):
self.ioclass.__init__(me, initvalue)
me.foo = foo
# __getnewargs__ is undefined on purpose. This checks that PEP 307
# is used to provide pickling support.
# Pickle expects the class to be on the module level. Here we use a
# little hack to allow the PickleTestMemIO class to derive from
# self.ioclass without having to define all combinations explictly on
# the module-level.
import __main__
PickleTestMemIO.__module__ = '__main__'
__main__.PickleTestMemIO = PickleTestMemIO
submemio = PickleTestMemIO(buf, 80)
submemio.seek(2)
# We only support pickle protocol 2 and onward since we use extended
# __reduce__ API of PEP 307 to provide pickling support.
for proto in range(2, pickle.HIGHEST_PROTOCOL):
for obj in (memio, submemio):
obj2 = pickle.loads(pickle.dumps(obj, protocol=proto))
self.assertEqual(obj.getvalue(), obj2.getvalue())
self.assertEqual(obj.__class__, obj2.__class__)
self.assertEqual(obj.foo, obj2.foo)
self.assertEqual(obj.tell(), obj2.tell())
obj.close()
self.assertRaises(ValueError, pickle.dumps, obj, proto)
del __main__.PickleTestMemIO
class PyBytesIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase):
@ -431,13 +468,26 @@ class PyBytesIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase):
self.assertEqual(memio.getvalue(), buf)
class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase):
buftype = unicode
ioclass = pyio.StringIO
UnsupportedOperation = pyio.UnsupportedOperation
EOF = ""
class TextIOTestMixin:
# TextIO-specific behaviour.
def test_relative_seek(self):
memio = self.ioclass()
self.assertRaises(IOError, memio.seek, -1, 1)
self.assertRaises(IOError, memio.seek, 3, 1)
self.assertRaises(IOError, memio.seek, -3, 1)
self.assertRaises(IOError, memio.seek, -1, 2)
self.assertRaises(IOError, memio.seek, 1, 1)
self.assertRaises(IOError, memio.seek, 1, 2)
def test_textio_properties(self):
memio = self.ioclass()
# These are just dummy values but we nevertheless check them for fear
# of unexpected breakage.
self.assertTrue(memio.encoding is None)
self.assertEqual(memio.errors, "strict")
self.assertEqual(memio.line_buffering, False)
def test_newlines_property(self):
memio = self.ioclass(newline=None)
@ -519,7 +569,6 @@ class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase):
def test_newline_cr(self):
# newline="\r"
memio = self.ioclass("a\nb\r\nc\rd", newline="\r")
memio.seek(0)
self.assertEqual(memio.read(), "a\rb\r\rc\rd")
memio.seek(0)
self.assertEqual(list(memio), ["a\r", "b\r", "\r", "c\r", "d"])
@ -527,7 +576,6 @@ class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase):
def test_newline_crlf(self):
# newline="\r\n"
memio = self.ioclass("a\nb\r\nc\rd", newline="\r\n")
memio.seek(0)
self.assertEqual(memio.read(), "a\r\nb\r\r\nc\rd")
memio.seek(0)
self.assertEqual(list(memio), ["a\r\n", "b\r\r\n", "c\rd"])
@ -538,6 +586,28 @@ class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase):
self.assertEqual(memio.read(5), "a\nb\n")
class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin,
TextIOTestMixin, unittest.TestCase):
buftype = unicode
ioclass = pyio.StringIO
UnsupportedOperation = pyio.UnsupportedOperation
EOF = ""
class PyStringIOPickleTest(TextIOTestMixin, unittest.TestCase):
"""Test if pickle restores properly the internal state of StringIO.
"""
buftype = unicode
UnsupportedOperation = pyio.UnsupportedOperation
EOF = ""
class ioclass(pyio.StringIO):
def __new__(cls, *args, **kwargs):
return pickle.loads(pickle.dumps(pyio.StringIO(*args, **kwargs)))
def __init__(self, *args, **kwargs):
pass
class CBytesIOTest(PyBytesIOTest):
ioclass = io.BytesIO
UnsupportedOperation = io.UnsupportedOperation
@ -547,6 +617,33 @@ class CBytesIOTest(PyBytesIOTest):
)(PyBytesIOTest.test_bytes_array)
def test_getstate(self):
memio = self.ioclass()
state = memio.__getstate__()
self.assertEqual(len(state), 3)
bytearray(state[0]) # Check if state[0] supports the buffer interface.
self.assert_(isinstance(state[1], int))
self.assert_(isinstance(state[2], dict) or state[2] is None)
memio.close()
self.assertRaises(ValueError, memio.__getstate__)
def test_setstate(self):
# This checks whether __setstate__ does proper input validation.
memio = self.ioclass()
memio.__setstate__((b"no error", 0, None))
memio.__setstate__((bytearray(b"no error"), 0, None))
memio.__setstate__((b"no error", 0, {'spam': 3}))
self.assertRaises(ValueError, memio.__setstate__, (b"", -1, None))
self.assertRaises(TypeError, memio.__setstate__, ("unicode", 0, None))
self.assertRaises(TypeError, memio.__setstate__, (b"", 0.0, None))
self.assertRaises(TypeError, memio.__setstate__, (b"", 0, 0))
self.assertRaises(TypeError, memio.__setstate__, (b"len-test", 0))
self.assertRaises(TypeError, memio.__setstate__)
self.assertRaises(TypeError, memio.__setstate__, 0)
memio.close()
self.assertRaises(ValueError, memio.__setstate__, (b"closed", 0, None))
class CStringIOTest(PyStringIOTest):
ioclass = io.StringIO
UnsupportedOperation = io.UnsupportedOperation
@ -565,9 +662,50 @@ class CStringIOTest(PyStringIOTest):
self.assertEqual(memio.tell(), len(buf) * 2)
self.assertEqual(memio.getvalue(), buf + buf)
def test_getstate(self):
memio = self.ioclass()
state = memio.__getstate__()
self.assertEqual(len(state), 4)
self.assert_(isinstance(state[0], unicode))
self.assert_(isinstance(state[1], str))
self.assert_(isinstance(state[2], int))
self.assert_(isinstance(state[3], dict) or state[3] is None)
memio.close()
self.assertRaises(ValueError, memio.__getstate__)
def test_setstate(self):
# This checks whether __setstate__ does proper input validation.
memio = self.ioclass()
memio.__setstate__(("no error", "\n", 0, None))
memio.__setstate__(("no error", "", 0, {'spam': 3}))
self.assertRaises(ValueError, memio.__setstate__, ("", "f", 0, None))
self.assertRaises(ValueError, memio.__setstate__, ("", "", -1, None))
self.assertRaises(TypeError, memio.__setstate__, (b"", "", 0, None))
# trunk is more tolerant than py3k on the type of the newline param
#self.assertRaises(TypeError, memio.__setstate__, ("", b"", 0, None))
self.assertRaises(TypeError, memio.__setstate__, ("", "", 0.0, None))
self.assertRaises(TypeError, memio.__setstate__, ("", "", 0, 0))
self.assertRaises(TypeError, memio.__setstate__, ("len-test", 0))
self.assertRaises(TypeError, memio.__setstate__)
self.assertRaises(TypeError, memio.__setstate__, 0)
memio.close()
self.assertRaises(ValueError, memio.__setstate__, ("closed", "", 0, None))
class CStringIOPickleTest(PyStringIOPickleTest):
UnsupportedOperation = io.UnsupportedOperation
class ioclass(io.StringIO):
def __new__(cls, *args, **kwargs):
return pickle.loads(pickle.dumps(io.StringIO(*args, **kwargs),
protocol=2))
def __init__(self, *args, **kwargs):
pass
def test_main():
tests = [PyBytesIOTest, PyStringIOTest, CBytesIOTest, CStringIOTest]
tests = [PyBytesIOTest, PyStringIOTest, CBytesIOTest, CStringIOTest,
PyStringIOPickleTest, CStringIOPickleTest]
support.run_unittest(*tests)
if __name__ == '__main__':