Issue 5665: add more pickling tests.

- Add tests for the module-level load() and dump() functions.
- Add tests for cPickle's internal data structures, stressing workloads
with many gets/puts.
- Add tests for the Pickler and Unpickler classes, in particular the
memo attribute.
- test_xpickle is extended to test backwards compatibility with Python
2.4, 2.5 and 2.6 by round-tripping pickled objects through a worker
process. This is guarded with a regrtest -u xpickle resource.
This commit is contained in:
Collin Winter 2009-04-09 16:46:46 +00:00
parent 5963185b23
commit f8089c7789
5 changed files with 428 additions and 21 deletions

View file

@ -1,11 +1,11 @@
import unittest
import pickle
import cPickle
import cStringIO
import pickletools
import copy_reg
from test.test_support import TestFailed, have_unicode, TESTFN, \
run_with_locale
from test.test_support import TestFailed, have_unicode, TESTFN
# Tests that try a number of pickle protocols should have a
# for proto in protocols:
@ -13,6 +13,42 @@ from test.test_support import TestFailed, have_unicode, TESTFN, \
assert pickle.HIGHEST_PROTOCOL == cPickle.HIGHEST_PROTOCOL == 2
protocols = range(pickle.HIGHEST_PROTOCOL + 1)
# Copy of test.test_support.run_with_locale. This is needed to support Python
# 2.4, which didn't include it. This is all to support test_xpickle, which
# bounces pickled objects through older Python versions to test backwards
# compatibility.
def run_with_locale(catstr, *locales):
def decorator(func):
def inner(*args, **kwds):
try:
import locale
category = getattr(locale, catstr)
orig_locale = locale.setlocale(category)
except AttributeError:
# if the test author gives us an invalid category string
raise
except:
# cannot retrieve original locale, so do nothing
locale = orig_locale = None
else:
for loc in locales:
try:
locale.setlocale(category, loc)
break
except:
pass
# now run the function, resetting the locale on exceptions
try:
return func(*args, **kwds)
finally:
if locale and orig_locale:
locale.setlocale(category, orig_locale)
inner.func_name = func.func_name
inner.__doc__ = func.__doc__
return inner
return decorator
# Return True if opcode code appears in the pickle, else False.
def opcode_in_pickle(code, pickle):
@ -409,12 +445,11 @@ class AbstractPickleTests(unittest.TestCase):
# is a mystery. cPickle also suppresses PUT for objects with a refcount
# of 1.
def dont_test_disassembly(self):
from cStringIO import StringIO
from pickletools import dis
for proto, expected in (0, DATA0_DIS), (1, DATA1_DIS):
s = self.dumps(self._testdata, proto)
filelike = StringIO()
filelike = cStringIO.StringIO()
dis(s, out=filelike)
got = filelike.getvalue()
self.assertEqual(expected, got)
@ -822,7 +857,7 @@ class AbstractPickleTests(unittest.TestCase):
self.assertEqual(x.bar, y.bar)
def test_reduce_overrides_default_reduce_ex(self):
for proto in 0, 1, 2:
for proto in protocols:
x = REX_one()
self.assertEqual(x._reduce_called, 0)
s = self.dumps(x, proto)
@ -831,7 +866,7 @@ class AbstractPickleTests(unittest.TestCase):
self.assertEqual(y._reduce_called, 0)
def test_reduce_ex_called(self):
for proto in 0, 1, 2:
for proto in protocols:
x = REX_two()
self.assertEqual(x._proto, None)
s = self.dumps(x, proto)
@ -840,7 +875,7 @@ class AbstractPickleTests(unittest.TestCase):
self.assertEqual(y._proto, None)
def test_reduce_ex_overrides_reduce(self):
for proto in 0, 1, 2:
for proto in protocols:
x = REX_three()
self.assertEqual(x._proto, None)
s = self.dumps(x, proto)
@ -849,7 +884,7 @@ class AbstractPickleTests(unittest.TestCase):
self.assertEqual(y._proto, None)
def test_reduce_ex_calls_base(self):
for proto in 0, 1, 2:
for proto in protocols:
x = REX_four()
self.assertEqual(x._proto, None)
s = self.dumps(x, proto)
@ -858,7 +893,7 @@ class AbstractPickleTests(unittest.TestCase):
self.assertEqual(y._proto, proto)
def test_reduce_calls_base(self):
for proto in 0, 1, 2:
for proto in protocols:
x = REX_five()
self.assertEqual(x._reduce_called, 0)
s = self.dumps(x, proto)
@ -879,7 +914,7 @@ class AbstractPickleTests(unittest.TestCase):
return dict, (), None, None, []
# Protocol 0 is less strict and also accept iterables.
for proto in 0, 1, 2:
for proto in protocols:
try:
self.dumps(C(), proto)
except (AttributeError, pickle.PickleError, cPickle.PickleError):
@ -889,6 +924,21 @@ class AbstractPickleTests(unittest.TestCase):
except (AttributeError, pickle.PickleError, cPickle.PickleError):
pass
def test_many_puts_and_gets(self):
# Test that internal data structures correctly deal with lots of
# puts/gets.
keys = ("aaa" + str(i) for i in xrange(100))
large_dict = dict((k, [4, 5, 6]) for k in keys)
obj = [dict(large_dict), dict(large_dict), dict(large_dict)]
for proto in protocols:
dumped = self.dumps(obj, proto)
loaded = self.loads(dumped)
self.assertEqual(loaded, obj,
"Failed protocol %d: %r != %r"
% (proto, obj, loaded))
# Test classes for reduce_ex
class REX_one(object):
@ -990,13 +1040,20 @@ class AbstractPickleModuleTests(unittest.TestCase):
finally:
os.remove(TESTFN)
def test_load_from_and_dump_to_file(self):
stream = cStringIO.StringIO()
data = [123, {}, 124]
self.module.dump(data, stream)
stream.seek(0)
unpickled = self.module.load(stream)
self.assertEqual(unpickled, data)
def test_highest_protocol(self):
# Of course this needs to be changed when HIGHEST_PROTOCOL changes.
self.assertEqual(self.module.HIGHEST_PROTOCOL, 2)
def test_callapi(self):
from cStringIO import StringIO
f = StringIO()
f = cStringIO.StringIO()
# With and without keyword arguments
self.module.dump(123, f, -1)
self.module.dump(123, file=f, protocol=-1)
@ -1039,3 +1096,116 @@ class AbstractPersistentPicklerTests(unittest.TestCase):
self.assertEqual(self.loads(self.dumps(L, 1)), L)
self.assertEqual(self.id_count, 5)
self.assertEqual(self.load_count, 5)
class AbstractPicklerUnpicklerObjectTests(unittest.TestCase):
pickler_class = None
unpickler_class = None
def setUp(self):
assert self.pickler_class
assert self.unpickler_class
def test_clear_pickler_memo(self):
# To test whether clear_memo() has any effect, we pickle an object,
# then pickle it again without clearing the memo; the two serialized
# forms should be different. If we clear_memo() and then pickle the
# object again, the third serialized form should be identical to the
# first one we obtained.
data = ["abcdefg", "abcdefg", 44]
f = cStringIO.StringIO()
pickler = self.pickler_class(f)
pickler.dump(data)
first_pickled = f.getvalue()
# Reset StringIO object.
f.seek(0)
f.truncate()
pickler.dump(data)
second_pickled = f.getvalue()
# Reset the Pickler and StringIO objects.
pickler.clear_memo()
f.seek(0)
f.truncate()
pickler.dump(data)
third_pickled = f.getvalue()
self.assertNotEqual(first_pickled, second_pickled)
self.assertEqual(first_pickled, third_pickled)
def test_priming_pickler_memo(self):
# Verify that we can set the Pickler's memo attribute.
data = ["abcdefg", "abcdefg", 44]
f = cStringIO.StringIO()
pickler = self.pickler_class(f)
pickler.dump(data)
first_pickled = f.getvalue()
f = cStringIO.StringIO()
primed = self.pickler_class(f)
primed.memo = pickler.memo
primed.dump(data)
primed_pickled = f.getvalue()
self.assertNotEqual(first_pickled, primed_pickled)
def test_priming_unpickler_memo(self):
# Verify that we can set the Unpickler's memo attribute.
data = ["abcdefg", "abcdefg", 44]
f = cStringIO.StringIO()
pickler = self.pickler_class(f)
pickler.dump(data)
first_pickled = f.getvalue()
f = cStringIO.StringIO()
primed = self.pickler_class(f)
primed.memo = pickler.memo
primed.dump(data)
primed_pickled = f.getvalue()
unpickler = self.unpickler_class(cStringIO.StringIO(first_pickled))
unpickled_data1 = unpickler.load()
self.assertEqual(unpickled_data1, data)
primed = self.unpickler_class(cStringIO.StringIO(primed_pickled))
primed.memo = unpickler.memo
unpickled_data2 = primed.load()
primed.memo.clear()
self.assertEqual(unpickled_data2, data)
self.assertTrue(unpickled_data2 is unpickled_data1)
def test_reusing_unpickler_objects(self):
data1 = ["abcdefg", "abcdefg", 44]
f = cStringIO.StringIO()
pickler = self.pickler_class(f)
pickler.dump(data1)
pickled1 = f.getvalue()
data2 = ["abcdefg", 44, 44]
f = cStringIO.StringIO()
pickler = self.pickler_class(f)
pickler.dump(data2)
pickled2 = f.getvalue()
f = cStringIO.StringIO()
f.write(pickled1)
f.seek(0)
unpickler = self.unpickler_class(f)
self.assertEqual(unpickler.load(), data1)
f.seek(0)
f.truncate()
f.write(pickled2)
f.seek(0)
self.assertEqual(unpickler.load(), data2)