mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 11:49:12 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			224 lines
		
	
	
	
		
			7.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			224 lines
		
	
	
	
		
			7.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import pickle
 | 
						|
import io
 | 
						|
import collections
 | 
						|
import struct
 | 
						|
import sys
 | 
						|
 | 
						|
import unittest
 | 
						|
from test import support
 | 
						|
 | 
						|
from test.pickletester import AbstractPickleTests
 | 
						|
from test.pickletester import AbstractPickleModuleTests
 | 
						|
from test.pickletester import AbstractPersistentPicklerTests
 | 
						|
from test.pickletester import AbstractPicklerUnpicklerObjectTests
 | 
						|
from test.pickletester import AbstractDispatchTableTests
 | 
						|
from test.pickletester import BigmemPickleTests
 | 
						|
 | 
						|
# Probe for the optional _pickle extension module and record the outcome in
# has_c_implementation; the C-specific test cases further down are only
# defined when the import succeeds.
try:
    import _pickle
except ImportError:
    has_c_implementation = False
else:
    has_c_implementation = True
 | 
						|
 | 
						|
 | 
						|
class PickleTests(AbstractPickleModuleTests):
    """Run the inherited AbstractPickleModuleTests without any customisation."""
 | 
						|
 | 
						|
 | 
						|
class PyPicklerTests(AbstractPickleTests):
    """Run AbstractPickleTests through pickle._Pickler and pickle._Unpickler.

    Subclasses may override the ``pickler`` / ``unpickler`` attributes to
    exercise other implementations with the same ``dumps`` / ``loads`` helpers.
    """

    pickler = pickle._Pickler
    unpickler = pickle._Unpickler

    def dumps(self, arg, proto=None):
        """Pickle *arg* with ``self.pickler`` and return the resulting bytes.

        *proto* is forwarded as the pickler's protocol argument.
        """
        stream = io.BytesIO()
        self.pickler(stream, proto).dump(arg)
        # getvalue() yields the whole buffer regardless of stream position.
        return stream.getvalue()

    def loads(self, buf, **kwds):
        """Unpickle the bytes *buf* with ``self.unpickler`` and return the object.

        Extra keyword arguments are passed to the unpickler constructor.
        """
        return self.unpickler(io.BytesIO(buf), **kwds).load()
 | 
						|
 | 
						|
 | 
						|
class InMemoryPickleTests(AbstractPickleTests, BigmemPickleTests):
    """Run the abstract and bigmem tests via pickle.dumps() / pickle.loads()."""

    pickler = pickle._Pickler
    unpickler = pickle._Unpickler

    def dumps(self, arg, protocol=None):
        """Return ``pickle.dumps(arg, protocol)``."""
        pickled = pickle.dumps(arg, protocol)
        return pickled

    def loads(self, buf, **kwds):
        """Return ``pickle.loads(buf, **kwds)``."""
        restored = pickle.loads(buf, **kwds)
        return restored
 | 
						|
 | 
						|
 | 
						|
class PyPersPicklerTests(AbstractPersistentPicklerTests):
    """Run the persistent-id tests through pickle._Pickler / pickle._Unpickler.

    ``dumps`` and ``loads`` build throwaway subclasses of ``self.pickler`` and
    ``self.unpickler`` whose persistence hooks delegate to this test case.
    """

    pickler = pickle._Pickler
    unpickler = pickle._Unpickler

    def dumps(self, arg, proto=None):
        """Pickle *arg*, routing persistent_id() calls to this test case."""
        testcase = self

        class PersPickler(self.pickler):
            def persistent_id(pickler_self, obj):
                # Delegate to the test case, not to the pickler instance.
                return testcase.persistent_id(obj)

        stream = io.BytesIO()
        PersPickler(stream, proto).dump(arg)
        return stream.getvalue()

    def loads(self, buf, **kwds):
        """Unpickle *buf*, routing persistent_load() calls to this test case."""
        testcase = self

        class PersUnpickler(self.unpickler):
            def persistent_load(unpickler_self, obj):
                # Delegate to the test case, not to the unpickler instance.
                return testcase.persistent_load(obj)

        return PersUnpickler(io.BytesIO(buf), **kwds).load()
 | 
						|
 | 
						|
 | 
						|
class PyPicklerUnpicklerObjectTests(AbstractPicklerUnpicklerObjectTests):
    """Run the pickler/unpickler object tests on pickle._Pickler / _Unpickler."""

    unpickler_class = pickle._Unpickler
    pickler_class = pickle._Pickler
 | 
						|
 | 
						|
 | 
						|
class PyDispatchTableTests(AbstractDispatchTableTests):
    """Run the dispatch-table tests on pickle._Pickler with a copied table."""

    pickler_class = pickle._Pickler

    def get_dispatch_table(self):
        """Return a fresh copy of ``pickle.dispatch_table``."""
        table = pickle.dispatch_table.copy()
        return table
 | 
						|
 | 
						|
 | 
						|
class PyChainDispatchTableTests(AbstractDispatchTableTests):
    """Run the dispatch-table tests on pickle._Pickler with a ChainMap table."""

    pickler_class = pickle._Pickler

    def get_dispatch_table(self):
        """Return a ChainMap of a new empty dict over ``pickle.dispatch_table``."""
        overrides = {}
        return collections.ChainMap(overrides, pickle.dispatch_table)
 | 
						|
 | 
						|
 | 
						|
if has_c_implementation:
 | 
						|
    class CPicklerTests(PyPicklerTests):
        """Reuse PyPicklerTests with _pickle.Pickler and _pickle.Unpickler."""

        unpickler = _pickle.Unpickler
        pickler = _pickle.Pickler
 | 
						|
 | 
						|
    class CPersPicklerTests(PyPersPicklerTests):
        """Reuse PyPersPicklerTests with _pickle.Pickler and _pickle.Unpickler."""

        unpickler = _pickle.Unpickler
        pickler = _pickle.Pickler
 | 
						|
 | 
						|
    class CDumpPickle_LoadPickle(PyPicklerTests):
        """Cross-check: dump with _pickle.Pickler, load with pickle._Unpickler."""

        unpickler = pickle._Unpickler
        pickler = _pickle.Pickler
 | 
						|
 | 
						|
    class DumpPickle_CLoadPickle(PyPicklerTests):
        """Cross-check: dump with pickle._Pickler, load with _pickle.Unpickler."""

        unpickler = _pickle.Unpickler
        pickler = pickle._Pickler
 | 
						|
 | 
						|
    class CPicklerUnpicklerObjectTests(AbstractPicklerUnpicklerObjectTests):
        """Run the pickler/unpickler object tests on the _pickle classes."""

        pickler_class = _pickle.Pickler
        unpickler_class = _pickle.Unpickler

        def test_issue18339(self):
            # Assigning to Unpickler.memo: a non-mapping value must raise
            # TypeError, a negative key must raise ValueError, and a valid
            # mapping must be accepted.
            target = self.unpickler_class(io.BytesIO())
            with self.assertRaises(TypeError):
                target.memo = object
            # used to cause a segfault
            with self.assertRaises(ValueError):
                target.memo = {-1: None}
            target.memo = {1: None}
 | 
						|
 | 
						|
    class CDispatchTableTests(AbstractDispatchTableTests):
        """Run the dispatch-table tests on pickle.Pickler with a copied table."""

        # NOTE(review): this uses pickle.Pickler rather than _pickle.Pickler;
        # presumably the two are the same class when _pickle is importable --
        # confirm against the pickle module.
        pickler_class = pickle.Pickler

        def get_dispatch_table(self):
            """Return a fresh copy of ``pickle.dispatch_table``."""
            table = pickle.dispatch_table.copy()
            return table
 | 
						|
 | 
						|
    class CChainDispatchTableTests(AbstractDispatchTableTests):
        """Run the dispatch-table tests on pickle.Pickler with a ChainMap table."""

        # NOTE(review): this uses pickle.Pickler rather than _pickle.Pickler;
        # presumably the two are the same class when _pickle is importable --
        # confirm against the pickle module.
        pickler_class = pickle.Pickler

        def get_dispatch_table(self):
            """Return a ChainMap of a new empty dict over ``pickle.dispatch_table``."""
            overrides = {}
            return collections.ChainMap(overrides, pickle.dispatch_table)
 | 
						|
 | 
						|
    @support.cpython_only
    class SizeofTests(unittest.TestCase):
        """Compare sizes reported for _pickle objects with computed sizes.

        Expected sizes are built from struct format strings plus the entry
        counts hard-coded in each test.
        """

        check_sizeof = support.check_sizeof

        def test_pickler(self):
            # No docstring on purpose: unittest would display it instead of
            # the test name.
            header = support.calcobjsize('5P2n3i2n3iP')
            pickler = _pickle.Pickler(io.BytesIO())
            self.assertEqual(object.__sizeof__(pickler), header)

            memo_table = struct.calcsize('3nP0n')
            memo_entry = struct.calcsize('Pn0P')

            # Freshly created pickler.
            fresh_extra = (
                memo_table + 8 * memo_entry +  # Minimal memo table size.
                sys.getsizeof(b'x' * 4096))    # Minimal write buffer size.
            self.check_sizeof(pickler, header + fresh_extra)

            for code in range(6):
                pickler.dump(chr(code))

            used_extra = (
                memo_table + 32 * memo_entry +  # Size of memo table required
                                                # to save references to 6
                                                # objects.
                0)  # Write buffer is cleared after every dump().
            self.check_sizeof(pickler, header + used_extra)

        def test_unpickler(self):
            # No docstring on purpose: unittest would display it instead of
            # the test name.
            header = support.calcobjsize('2Pn2P 2P2n2i5P 2P3n6P2n2i')
            make_unpickler = _pickle.Unpickler
            memo_slot = struct.calcsize('P')  # Size of memo table entry.
            mark_slot = struct.calcsize('n')  # Size of mark table entry.
            check = self.check_sizeof

            for encoding in ('ASCII', 'UTF-16', 'latin-1'):
                for errors in ('strict', 'replace'):
                    fresh = make_unpickler(io.BytesIO(),
                                           encoding=encoding, errors=errors)
                    self.assertEqual(object.__sizeof__(fresh), header)
                    # Each name is counted with one extra byte (presumably
                    # its terminator -- TODO confirm).
                    names = len(encoding) + 1 + len(errors) + 1
                    check(fresh, header +
                                 32 * memo_slot +  # Minimal memo table size.
                                 names)

            # Baseline for an unpickler created with ASCII / strict.
            std_total = header + len('ASCII') + 1 + len('strict') + 1

            def check_unpickler(data, memo_size, marks_size):
                # Pickle *data*, load it, then verify the unpickler's size for
                # the given memo and mark table entry counts.
                payload = pickle.dumps(data)
                loaded = make_unpickler(io.BytesIO(payload),
                                        encoding='ASCII', errors='strict')
                loaded.load()
                check(loaded, std_total + memo_size * memo_slot +
                              marks_size * mark_slot)

            def recurse(deep):
                # Wrap 0 in *deep* levels of two-element lists whose items
                # are the same object.
                nested = 0
                for _ in range(deep):
                    nested = [nested, nested]
                return nested

            check_unpickler(0, 32, 0)
            # 20 is minimal non-empty mark stack size.
            check_unpickler([0] * 100, 32, 20)
            # 128 is memo table size required to save references to 100 objects.
            check_unpickler([chr(i) for i in range(100)], 128, 20)

            check_unpickler(recurse(0), 32, 0)
            check_unpickler(recurse(1), 32, 20)
            check_unpickler(recurse(20), 32, 58)
            check_unpickler(recurse(50), 64, 58)
            check_unpickler(recurse(100), 128, 134)

            # Protocol 0 pickle of a one-character string: 2 + 1 extra bytes
            # are expected on top of the minimal memo table.
            text_unpickler = make_unpickler(io.BytesIO(pickle.dumps('a', 0)),
                                            encoding='ASCII', errors='strict')
            text_unpickler.load()
            check(text_unpickler, std_total + 32 * memo_slot + 2 + 1)
 | 
						|
 | 
						|
 | 
						|
def test_main():
    """Run every test case defined in this module, then pickle's doctests.

    Test cases that only use ``pickle._Pickler``, ``pickle._Unpickler`` or the
    module-level ``pickle.dumps`` / ``pickle.loads`` are always scheduled.
    Test cases that reference ``_pickle`` are added only when
    ``has_c_implementation`` is true, since those classes are not defined
    otherwise.
    """
    # PyPicklerUnpicklerObjectTests and InMemoryPickleTests are defined
    # unconditionally and never touch _pickle, so they must not be hidden
    # behind the has_c_implementation guard.
    tests = [PickleTests, PyPicklerTests, PyPersPicklerTests,
             PyPicklerUnpicklerObjectTests,
             PyDispatchTableTests, PyChainDispatchTableTests,
             InMemoryPickleTests]
    if has_c_implementation:
        tests.extend([CPicklerTests, CPersPicklerTests,
                      CDumpPickle_LoadPickle, DumpPickle_CLoadPickle,
                      CPicklerUnpicklerObjectTests,
                      CDispatchTableTests, CChainDispatchTableTests,
                      SizeofTests])
    support.run_unittest(*tests)
    support.run_doctest(pickle)
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    test_main()
 |