Merged revisions 80928 via svnmerge from

svn+ssh://pythondev@svn.python.org/python/branches/py3k

................
  r80928 | antoine.pitrou | 2010-05-07 19:04:02 +0200 (Fri, 07 May 2010) | 11 lines

  Merged revisions 80926 via svnmerge from
  svn+ssh://pythondev@svn.python.org/python/trunk

  ........
    r80926 | antoine.pitrou | 2010-05-07 18:50:34 +0200 (Fri, 07 May 2010) | 5 lines

    Issue #8571: Fix an internal error when compressing or decompressing a
    chunk larger than 1GB with the zlib module's compressor and decompressor
    objects.
  ........
................
Antoine Pitrou 2010-05-07 17:08:54 +00:00
parent c5852813b8
commit 4b3fe14d4b
3 changed files with 67 additions and 5 deletions
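
For context, a minimal reproduction sketch of the failure mode this commit fixes (not part of the commit itself; assumes a 64-bit build with several GB of free memory — before the fix, the compressobj/decompressobj calls below could fail with an internal zlib error once a single chunk exceeded 1GB):

    import zlib

    # A single chunk just over 1GB of highly compressible data
    # (hypothetical repro; needs several GB of free memory).
    chunk = b'x' * (1024 * 1024 * 1024 + 1)

    c = zlib.compressobj(1)        # level 1 keeps memory use down
    compressed = c.compress(chunk) + c.flush()

    d = zlib.decompressobj()       # decompressing back to a >1GB buffer
    restored = d.decompress(compressed) + d.flush()
    assert restored == chunk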

Lib/test/test_zlib.py

@@ -2,6 +2,7 @@ import unittest
 from test import support
 import binascii
 import random
+from test.support import precisionbigmemtest, _1G
 
 zlib = support.import_module('zlib')
@@ -93,8 +94,39 @@ class ExceptionTestCase(unittest.TestCase):
         self.assertRaises(ValueError, zlib.decompressobj().flush, -1)
 
 
-class CompressTestCase(unittest.TestCase):
+class BaseCompressTestCase(object):
+    def check_big_compress_buffer(self, size, compress_func):
+        _1M = 1024 * 1024
+        fmt = "%%0%dx" % (2 * _1M)
+        # Generate 10MB worth of random, and expand it by repeating it.
+        # The assumption is that zlib's memory is not big enough to exploit
+        # such spread out redundancy.
+        data = b''.join([random.getrandbits(8 * _1M).to_bytes(_1M, 'little')
+                         for i in range(10)])
+        data = data * (size // len(data) + 1)
+        try:
+            compress_func(data)
+        finally:
+            # Release memory
+            data = None
+
+    def check_big_decompress_buffer(self, size, decompress_func):
+        data = b'x' * size
+        try:
+            compressed = zlib.compress(data, 1)
+        finally:
+            # Release memory
+            data = None
+        data = decompress_func(compressed)
+        # Sanity check
+        try:
+            self.assertEqual(len(data), size)
+            self.assertEqual(len(data.strip(b'x')), 0)
+        finally:
+            data = None
+
+
+class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
     # Test compression in one go (whole message compression)
     def test_speech(self):
         x = zlib.compress(HAMLET_SCENE)
@@ -108,9 +140,19 @@ class CompressTestCase(unittest.TestCase):
         for ob in x, bytearray(x):
             self.assertEqual(zlib.decompress(ob), data)
 
+    # Memory use of the following functions takes into account overallocation
+
+    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
+    def test_big_compress_buffer(self, size):
+        compress = lambda s: zlib.compress(s, 1)
+        self.check_big_compress_buffer(size, compress)
+
+    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
+    def test_big_decompress_buffer(self, size):
+        self.check_big_decompress_buffer(size, zlib.decompress)
+
 
-class CompressObjectTestCase(unittest.TestCase):
+class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
     # Test compression object
     def test_pair(self):
         # straightforward compress/decompress objects
@@ -399,6 +441,21 @@ class CompressObjectTestCase(unittest.TestCase):
         d.flush()
         self.assertRaises(ValueError, d.copy)
 
+    # Memory use of the following functions takes into account overallocation
+
+    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
+    def test_big_compress_buffer(self, size):
+        c = zlib.compressobj(1)
+        compress = lambda s: c.compress(s) + c.flush()
+        self.check_big_compress_buffer(size, compress)
+
+    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
+    def test_big_decompress_buffer(self, size):
+        d = zlib.decompressobj()
+        decompress = lambda s: d.decompress(s) + d.flush()
+        self.check_big_decompress_buffer(size, decompress)
+
 
 def genblock(seed, length, step=1024, generator=random):
     """length-byte stream of random data from a seed (in step-byte blocks)."""
     if seed is not None:
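
A note on running the new tests (not part of the commit): tests decorated with precisionbigmemtest are skipped unless the test runner is granted enough memory via the -M/--memlimit option; with memuse=3 and size just over 1GB, roughly 4GB must be allowed. An invocation along these lines should exercise them (sketch; the exact entry point depends on the checkout):

    ./python -m test.regrtest -M 4G -v test_zlib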