mirror of
https://github.com/python/cpython.git
synced 2025-08-31 14:07:50 +00:00
bpo-25007: Add copy protocol support to zlib compressors and decompressors (GH-7940)
This commit is contained in:
parent
fbd7172325
commit
d2cbfffc84
5 changed files with 183 additions and 25 deletions
|
@ -1,6 +1,7 @@
|
|||
import unittest
|
||||
from test import support
|
||||
import binascii
|
||||
import copy
|
||||
import pickle
|
||||
import random
|
||||
import sys
|
||||
|
@ -626,23 +627,24 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
|
|||
# Test copying a compression object
|
||||
data0 = HAMLET_SCENE
|
||||
data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
|
||||
c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
|
||||
bufs0 = []
|
||||
bufs0.append(c0.compress(data0))
|
||||
for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
|
||||
c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
|
||||
bufs0 = []
|
||||
bufs0.append(c0.compress(data0))
|
||||
|
||||
c1 = c0.copy()
|
||||
bufs1 = bufs0[:]
|
||||
c1 = func(c0)
|
||||
bufs1 = bufs0[:]
|
||||
|
||||
bufs0.append(c0.compress(data0))
|
||||
bufs0.append(c0.flush())
|
||||
s0 = b''.join(bufs0)
|
||||
bufs0.append(c0.compress(data0))
|
||||
bufs0.append(c0.flush())
|
||||
s0 = b''.join(bufs0)
|
||||
|
||||
bufs1.append(c1.compress(data1))
|
||||
bufs1.append(c1.flush())
|
||||
s1 = b''.join(bufs1)
|
||||
bufs1.append(c1.compress(data1))
|
||||
bufs1.append(c1.flush())
|
||||
s1 = b''.join(bufs1)
|
||||
|
||||
self.assertEqual(zlib.decompress(s0),data0+data0)
|
||||
self.assertEqual(zlib.decompress(s1),data0+data1)
|
||||
self.assertEqual(zlib.decompress(s0),data0+data0)
|
||||
self.assertEqual(zlib.decompress(s1),data0+data1)
|
||||
|
||||
@requires_Compress_copy
|
||||
def test_badcompresscopy(self):
|
||||
|
@ -651,6 +653,8 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
|
|||
c.compress(HAMLET_SCENE)
|
||||
c.flush()
|
||||
self.assertRaises(ValueError, c.copy)
|
||||
self.assertRaises(ValueError, copy.copy, c)
|
||||
self.assertRaises(ValueError, copy.deepcopy, c)
|
||||
|
||||
@requires_Decompress_copy
|
||||
def test_decompresscopy(self):
|
||||
|
@ -660,21 +664,22 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
|
|||
# Test type of return value
|
||||
self.assertIsInstance(comp, bytes)
|
||||
|
||||
d0 = zlib.decompressobj()
|
||||
bufs0 = []
|
||||
bufs0.append(d0.decompress(comp[:32]))
|
||||
for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
|
||||
d0 = zlib.decompressobj()
|
||||
bufs0 = []
|
||||
bufs0.append(d0.decompress(comp[:32]))
|
||||
|
||||
d1 = d0.copy()
|
||||
bufs1 = bufs0[:]
|
||||
d1 = func(d0)
|
||||
bufs1 = bufs0[:]
|
||||
|
||||
bufs0.append(d0.decompress(comp[32:]))
|
||||
s0 = b''.join(bufs0)
|
||||
bufs0.append(d0.decompress(comp[32:]))
|
||||
s0 = b''.join(bufs0)
|
||||
|
||||
bufs1.append(d1.decompress(comp[32:]))
|
||||
s1 = b''.join(bufs1)
|
||||
bufs1.append(d1.decompress(comp[32:]))
|
||||
s1 = b''.join(bufs1)
|
||||
|
||||
self.assertEqual(s0,s1)
|
||||
self.assertEqual(s0,data)
|
||||
self.assertEqual(s0,s1)
|
||||
self.assertEqual(s0,data)
|
||||
|
||||
@requires_Decompress_copy
|
||||
def test_baddecompresscopy(self):
|
||||
|
@ -684,6 +689,8 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
|
|||
d.decompress(data)
|
||||
d.flush()
|
||||
self.assertRaises(ValueError, d.copy)
|
||||
self.assertRaises(ValueError, copy.copy, d)
|
||||
self.assertRaises(ValueError, copy.deepcopy, d)
|
||||
|
||||
def test_compresspickle(self):
|
||||
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue