- Thanks to Scott David Daniels, a subtle bug in how the zlib
  extension implemented flush() was fixed.  Scott also rewrote the
  zlib test suite using the unittest module.  (SF bug #640230 and
  patch #678531.)

Backport candidate I think.
This commit is contained in:
Guido van Rossum 2003-02-03 20:45:52 +00:00
parent 94c30c0124
commit 7d9ea5013f
5 changed files with 487 additions and 181 deletions

View file

@ -1,169 +1,419 @@
import unittest
from test import test_support
import zlib
import sys
import imp
from test.test_support import TestFailed
# NOTE(review): this is the *removed* half of a diff scrape -- the old
# Python 2 script-style test (print statements, `except E, v`, backtick
# repr, long literals) with its indentation stripped.  Code is kept
# byte-for-byte; only comments are added.
try:
t = imp.find_module('test_zlib')
file = t[0]
except ImportError:
file = open(__file__)
buf = file.read() * 8
file.close()
# test the checksums (hex so the test doesn't break on 64-bit machines)
def fix(x):
return "0x%x" % (x & 0xffffffffL)
print fix(zlib.crc32('penguin')), fix(zlib.crc32('penguin', 1))
print fix(zlib.adler32('penguin')), fix(zlib.adler32('penguin', 1))
# make sure we generate some expected errors
try:
zlib.compress('ERROR', zlib.MAX_WBITS + 1)
except zlib.error, msg:
print "expecting", msg
try:
zlib.compressobj(1, 8, 0)
except ValueError, msg:
print "expecting", msg
try:
zlib.decompressobj(0)
except ValueError, msg:
print "expecting", msg
# one-shot round trip: decompress(compress(buf)) must equal buf
x = zlib.compress(buf)
y = zlib.decompress(x)
if buf != y:
print "normal compression/decompression failed"
else:
print "normal compression/decompression succeeded"
buf = buf * 16
co = zlib.compressobj(8, 8, -15)
x1 = co.compress(buf)
x2 = co.flush()
try:
co.flush()
print "Oops - second flush worked when it should not have!"
except zlib.error:
pass
x = x1 + x2
dc = zlib.decompressobj(-15)
y1 = dc.decompress(x)
y2 = dc.flush()
y = y1 + y2
if buf != y:
print "compress/decompression obj failed"
else:
print "compress/decompression obj succeeded"
co = zlib.compressobj(2, 8, -12, 9, 1)
bufs = []
for i in range(0, len(buf), 256):
bufs.append(co.compress(buf[i:i+256]))
bufs.append(co.flush())
combuf = ''.join(bufs)
decomp1 = zlib.decompress(combuf, -12, -5)
if decomp1 != buf:
print "decompress with init options failed"
else:
print "decompress with init options succeeded"
deco = zlib.decompressobj(-12)
bufs = []
for i in range(0, len(combuf), 128):
bufs.append(deco.decompress(combuf[i:i+128]))
bufs.append(deco.flush())
decomp2 = ''.join(bufs)
if decomp2 != buf:
print "decompressobj with init options failed"
else:
print "decompressobj with init options succeeded"
print "should be '':", `deco.unconsumed_tail`
# Check a decompression object with max_length specified
deco = zlib.decompressobj(-12)
cb = combuf
bufs = []
while cb:
max_length = 1 + len(cb)/10
chunk = deco.decompress(cb, max_length)
if len(chunk) > max_length:
print 'chunk too big (%d>%d)' % (len(chunk),max_length)
bufs.append(chunk)
cb = deco.unconsumed_tail
bufs.append(deco.flush())
# BUG: should be ''.join(bufs) -- joining `buf` (a str) just returns
# buf itself, so the comparison below always succeeds and this check
# verifies nothing.
decomp2 = ''.join(buf)
if decomp2 != buf:
print "max_length decompressobj failed"
else:
print "max_length decompressobj succeeded"
# Misc tests of max_length
deco = zlib.decompressobj(-12)
try:
deco.decompress("", -1)
except ValueError:
pass
else:
print "failed to raise value error on bad max_length"
print "unconsumed_tail should be '':", `deco.unconsumed_tail`
# Test flush() with the various options, using all the different levels
# in order to provide more variations.
sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
sync_opt = [getattr(zlib, opt) for opt in sync_opt if hasattr(zlib, opt)]
for sync in sync_opt:
for level in range(10):
obj = zlib.compressobj( level )
d = obj.compress( buf[:3000] )
d = d + obj.flush( sync )
d = d + obj.compress( buf[3000:] )
d = d + obj.flush()
if zlib.decompress(d) != buf:
print "Decompress failed: flush mode=%i, level=%i" % (sync,level)
del obj
# Test for the odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
import random
random.seed(1)
print 'Testing on 17K of random data'
# print test_support.TESTFN
if hasattr(zlib, 'Z_SYNC_FLUSH'):
def getbuf():
# This was in the original. Avoid non-repeatable sources.
# Left here (unused) in case something wants to be done with it.
import imp
try:
t = imp.find_module('test_zlib')
file = t[0]
except ImportError:
file = open(__file__)
buf = file.read() * 8
file.close()
return buf
# Create compressor and decompressor objects
c=zlib.compressobj(9)
d=zlib.decompressobj()
# Try 17K of data
# generate random data stream
a=""
for i in range(17*1024):
a=a+chr(random.randint(0,255))
# compress, sync-flush, and decompress
t = d.decompress( c.compress(a)+c.flush(zlib.Z_SYNC_FLUSH) )
# NOTE(review): diff-scrape residue -- lines from the removed py2
# script (its 17K sync-flush length check and the `ignore` helper with
# its big docstring) are interleaved with the new ChecksumTestCase
# methods below.  Kept byte-for-byte; untangle against the original
# commit before running.
class ChecksumTestCase(unittest.TestCase):
# checksum test cases
def test_crc32start(self):
self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
# (old script, not part of the class:)
# if decompressed data is different from the input data, choke.
if len(t) != len(a):
print len(a),len(t),len(d.unused_data)
raise TestFailed, "output of 17K doesn't match"
def test_crc32empty(self):
self.assertEqual(zlib.crc32("", 0), 0)
self.assertEqual(zlib.crc32("", 1), 1)
self.assertEqual(zlib.crc32("", 432), 432)
# (old script: `ignore` existed only to feed the compressor a big
# docstring; its body is split across the next few lines.)
def ignore():
"""An empty function with a big string.
def test_adler32start(self):
self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
Make the compression algorithm work a little harder.
"""
def test_adler32empty(self):
self.assertEqual(zlib.adler32("", 0), 0)
self.assertEqual(zlib.adler32("", 1), 1)
self.assertEqual(zlib.adler32("", 432), 432)
"""
def assertEqual32(self, seen, expected):
# 32-bit values masked -- checksums on 32- vs 64- bit machines
# This is important if bit 31 (0x08000000L) is set.
self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)
def test_penguins(self):
self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)
self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1))
class ExceptionTestCase(unittest.TestCase):
# make sure we generate some expected errors
def test_bigbits(self):
# specifying total bits too large causes an error
self.assertRaises(zlib.error,
zlib.compress, 'ERROR', zlib.MAX_WBITS + 1)
def test_badcompressobj(self):
# verify failure on building compress object with bad params
self.assertRaises(ValueError, zlib.compressobj, 1, 8, 0)
def test_baddecompressobj(self):
# verify failure on building decompress object with bad params
self.assertRaises(ValueError, zlib.decompressobj, 0)
class CompressTestCase(unittest.TestCase):
    """One-shot (whole-message) compression round trips."""

    def _check_roundtrip(self, data):
        # decompress(compress(data)) better be data.
        packed = zlib.compress(data)
        self.assertEqual(zlib.decompress(packed), data)

    def test_speech(self):
        self._check_roundtrip(hamlet_scene)

    def test_speech8(self):
        # More compression chances: repeated input.
        self._check_roundtrip(hamlet_scene * 8)

    def test_speech16(self):
        self._check_roundtrip(hamlet_scene * 16)

    def test_speech128(self):
        self._check_roundtrip(hamlet_scene * 8 * 16)

    def test_monotonic(self):
        # Higher compression levels should not expand compressed size.
        data = hamlet_scene * 8 * 16
        previous = len(zlib.compress(data, 0))
        # Level 0 stores the data with framing, so it is always bigger.
        self.failUnless(previous > len(data), "compress level 0 always expands")
        for level in range(10):
            current = len(zlib.compress(data, level))
            self.failUnless(current <= previous,
                            'compress level %d more effective than %d!' % (
                level-1, level))
            previous = current
# NOTE(review): indentation was lost in this scrape; the method bodies
# below are flat.  Code kept byte-for-byte, comments only added.
class CompressObjectTestCase(unittest.TestCase):
# Test compression object
def test_pairsmall(self):
# use compress object in straightforward manner, decompress w/ object
data = hamlet_scene
co = zlib.compressobj(8, 8, -15)
x1 = co.compress(data)
x2 = co.flush()
self.assertRaises(zlib.error, co.flush) # second flush should not work
dco = zlib.decompressobj(-15)
y1 = dco.decompress(x1 + x2)
y2 = dco.flush()
self.assertEqual(data, y1 + y2)
def test_pair(self):
# straightforward compress/decompress objects, more compression
data = hamlet_scene * 8 * 16
co = zlib.compressobj(8, 8, -15)
x1 = co.compress(data)
x2 = co.flush()
self.assertRaises(zlib.error, co.flush) # second flush should not work
dco = zlib.decompressobj(-15)
y1 = dco.decompress(x1 + x2)
y2 = dco.flush()
self.assertEqual(data, y1 + y2)
def test_compressincremental(self):
# compress object in steps, decompress object as one-shot
data = hamlet_scene * 8 * 16
co = zlib.compressobj(2, 8, -12, 9, 1)
bufs = []
for i in range(0, len(data), 256):
bufs.append(co.compress(data[i:i+256]))
bufs.append(co.flush())
# NOTE(review): combuf is assigned but unused here; the decompress
# call below joins bufs again (the same bytes).
combuf = ''.join(bufs)
# NOTE(review): decompressed with wbits -15 though compressed with
# -12 -- presumably intentional (larger window); confirm.
dco = zlib.decompressobj(-15)
y1 = dco.decompress(''.join(bufs))
y2 = dco.flush()
self.assertEqual(data, y1 + y2)
def test_decompressincremental(self):
# compress object in steps, decompress object in steps
data = hamlet_scene * 8 * 16
co = zlib.compressobj(2, 8, -12, 9, 1)
bufs = []
for i in range(0, len(data), 256):
bufs.append(co.compress(data[i:i+256]))
bufs.append(co.flush())
combuf = ''.join(bufs)
self.assertEqual(data, zlib.decompress(combuf, -12, -5))
dco = zlib.decompressobj(-12)
bufs = []
for i in range(0, len(combuf), 128):
bufs.append(dco.decompress(combuf[i:i+128]))
self.assertEqual('', dco.unconsumed_tail, ########
"(A) uct should be '': not %d long" %
len(dco.unconsumed_tail))
bufs.append(dco.flush())
self.assertEqual('', dco.unconsumed_tail, ########
"(B) uct should be '': not %d long" %
len(dco.unconsumed_tail))
self.assertEqual(data, ''.join(bufs))
# Failure means: "decompressobj with init options failed"
# NOTE(review): sizes=[128] is a mutable default argument; it is only
# iterated (never mutated) below, so it is safe, but worth knowing.
def test_decompinc(self,sizes=[128],flush=True,source=None,cx=256,dcx=64):
# compress object in steps, decompress object in steps, loop sizes
source = source or hamlet_scene
for reps in sizes:
data = source * reps
co = zlib.compressobj(2, 8, -12, 9, 1)
bufs = []
for i in range(0, len(data), cx):
bufs.append(co.compress(data[i:i+cx]))
bufs.append(co.flush())
combuf = ''.join(bufs)
self.assertEqual(data, zlib.decompress(combuf, -12, -5))
dco = zlib.decompressobj(-12)
bufs = []
for i in range(0, len(combuf), dcx):
bufs.append(dco.decompress(combuf[i:i+dcx]))
self.assertEqual('', dco.unconsumed_tail, ########
"(A) uct should be '': not %d long" %
len(dco.unconsumed_tail))
if flush:
bufs.append(dco.flush())
else:
# drain without flush(): keep pulling until decompress('')
# yields nothing more
while True:
chunk = dco.decompress('')
if chunk:
bufs.append(chunk)
else:
break
self.assertEqual('', dco.unconsumed_tail, ########
"(B) uct should be '': not %d long" %
len(dco.unconsumed_tail))
self.assertEqual(data, ''.join(bufs))
# Failure means: "decompressobj with init options failed"
def test_decompimax(self,sizes=[128],flush=True,source=None,cx=256,dcx=64):
# compress in steps, decompress in length-restricted steps, loop sizes
source = source or hamlet_scene
for reps in sizes:
# Check a decompression object with max_length specified
data = source * reps
co = zlib.compressobj(2, 8, -12, 9, 1)
bufs = []
for i in range(0, len(data), cx):
bufs.append(co.compress(data[i:i+cx]))
bufs.append(co.flush())
combuf = ''.join(bufs)
self.assertEqual(data, zlib.decompress(combuf, -12, -5),
'compressed data failure')
dco = zlib.decompressobj(-12)
bufs = []
cb = combuf
while cb:
#max_length = 1 + len(cb)/10
chunk = dco.decompress(cb, dcx)
self.failIf(len(chunk) > dcx,
'chunk too big (%d>%d)' % (len(chunk), dcx))
bufs.append(chunk)
cb = dco.unconsumed_tail
if flush:
bufs.append(dco.flush())
else:
while True:
chunk = dco.decompress('', dcx)
self.failIf(len(chunk) > dcx,
'chunk too big in tail (%d>%d)' % (len(chunk), dcx))
if chunk:
bufs.append(chunk)
else:
break
self.assertEqual(len(data), len(''.join(bufs)))
self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')
def test_decompressmaxlen(self):
# Check a decompression object with max_length specified
data = hamlet_scene * 8 * 16
co = zlib.compressobj(2, 8, -12, 9, 1)
bufs = []
for i in range(0, len(data), 256):
bufs.append(co.compress(data[i:i+256]))
bufs.append(co.flush())
combuf = ''.join(bufs)
self.assertEqual(data, zlib.decompress(combuf, -12, -5),
'compressed data failure')
dco = zlib.decompressobj(-12)
bufs = []
cb = combuf
while cb:
max_length = 1 + len(cb)/10
chunk = dco.decompress(cb, max_length)
self.failIf(len(chunk) > max_length,
'chunk too big (%d>%d)' % (len(chunk),max_length))
bufs.append(chunk)
cb = dco.unconsumed_tail
bufs.append(dco.flush())
self.assertEqual(len(data), len(''.join(bufs)))
self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')
def test_decompressmaxlenflushless(self):
# identical to test_decompressmaxlen except flush is replaced
# with an equivalent. This works and other fails on (eg) 2.2.2
data = hamlet_scene * 8 * 16
co = zlib.compressobj(2, 8, -12, 9, 1)
bufs = []
for i in range(0, len(data), 256):
bufs.append(co.compress(data[i:i+256]))
bufs.append(co.flush())
combuf = ''.join(bufs)
self.assertEqual(data, zlib.decompress(combuf, -12, -5),
'compressed data mismatch')
dco = zlib.decompressobj(-12)
bufs = []
cb = combuf
while cb:
max_length = 1 + len(cb)/10
chunk = dco.decompress(cb, max_length)
self.failIf(len(chunk) > max_length,
'chunk too big (%d>%d)' % (len(chunk),max_length))
bufs.append(chunk)
cb = dco.unconsumed_tail
#bufs.append(dco.flush())
# drain the remaining output instead of calling flush(): loop
# while the last chunk was non-empty
while len(chunk):
chunk = dco.decompress('', max_length)
self.failIf(len(chunk) > max_length,
'chunk too big (%d>%d)' % (len(chunk),max_length))
bufs.append(chunk)
self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')
def test_maxlenmisc(self):
# Misc tests of max_length
dco = zlib.decompressobj(-12)
self.assertRaises(ValueError, dco.decompress, "", -1)
self.assertEqual('', dco.unconsumed_tail)
def test_flushes(self):
# Test flush() with the various options, using all the
# different levels in order to provide more variations.
sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
sync_opt = [getattr(zlib, opt) for opt in sync_opt
if hasattr(zlib, opt)]
data = hamlet_scene * 8
for sync in sync_opt:
for level in range(10):
obj = zlib.compressobj( level )
a = obj.compress( data[:3000] )
b = obj.flush( sync )
c = obj.compress( data[3000:] )
d = obj.flush()
self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
data, ("Decompress failed: flush "
"mode=%i, level=%i") % (sync, level))
del obj
def test_odd_flush(self):
# Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
import random
if hasattr(zlib, 'Z_SYNC_FLUSH'):
# Testing on 17K of "random" data
# Create compressor and decompressor objects
co = zlib.compressobj(9)
dco = zlib.decompressobj()
# Try 17K of data
# generate random data stream
try:
# In 2.3 and later, WichmannHill is the RNG of the bug report
gen = random.WichmannHill()
except AttributeError:
try:
# 2.2 called it Random
gen = random.Random()
except AttributeError:
# others might simply have a single RNG
gen = random
gen.seed(1)
data = genblock(1, 17 * 1024, generator=gen)
# compress, sync-flush, and decompress
first = co.compress(data)
second = co.flush(zlib.Z_SYNC_FLUSH)
expanded = dco.decompress(first + second)
# if decompressed data is different from the input data, choke.
self.assertEqual(expanded, data, "17K random source doesn't match")
def test_manydecompinc(self):
# Run incremental decompress test for a large range of sizes
self.test_decompinc(sizes=[1<<n for n in range(8)],
flush=True, cx=32, dcx=4)
def test_manydecompimax(self):
# Run incremental decompress maxlen test for a large range of sizes
# avoid the flush bug
self.test_decompimax(sizes=[1<<n for n in range(8)],
flush=False, cx=32, dcx=4)
def test_manydecompimaxflush(self):
# Run incremental decompress maxlen test for a large range of sizes
# avoid the flush bug
self.test_decompimax(sizes=[1<<n for n in range(8)],
flush=True, cx=32, dcx=4)
def genblock(seed, length, step=1024, generator=random):
    """Return a length-byte string of pseudo-random data.

    The data is produced in step-byte blocks by *generator* (anything
    with seed()/randint(), default: the random module), which is seeded
    with *seed* unless seed is None (then its current state is used).
    Returns '' when length <= 0.
    """
    if seed is not None:
        generator.seed(seed)
    if length <= 0:
        # Guard: without it, length == 0 fell through to step = length
        # and range(0, 0, 0), which raises ValueError.
        return ''
    randint = generator.randint
    # For short outputs (or a degenerate step) emit everything in one block.
    if length < step or step < 2:
        step = length
    blocks = []
    for i in range(0, length, step):
        blocks.append(''.join([chr(randint(0, 255))
                               for x in range(step)]))
    # The last block may overshoot; trim to the requested length.
    return ''.join(blocks)[:length]
def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    # Seeding is optional so callers can continue an existing stream.
    if seed is not None:
        generator.seed(seed)
    candidates = source.split('\n')
    chosen = []
    for _ in range(number):
        chosen.append(generator.choice(candidates))
    return chosen
# NOTE(review): this literal was truncated by the diff view -- the
# "@ -226,3 +476,34 @@" hunk header below is scrape residue embedded
# inside the string, not scene text.  Restore the full scene from the
# original file before running.
hamlet_scene = """
LAERTES
O, fear me not.
@ -226,3 +476,34 @@ LAERTES
Farewell.
"""
def test_main():
    """Build the full suite from every TestCase here and run it."""
    cases = (ChecksumTestCase,
             ExceptionTestCase,
             CompressTestCase,
             CompressObjectTestCase)
    suite = unittest.TestSuite()
    for case in cases:
        suite.addTest(unittest.makeSuite(case))
    test_support.run_suite(suite)

if __name__ == "__main__":
    test_main()
def test(tests=''):
    """Run a subset of the suites selected by flag characters in *tests*:
    k=checksum, x=exception, c=compress, o=compress object (default)."""
    selected = tests if tests else 'o'
    flag_to_case = (('k', ChecksumTestCase),
                    ('x', ExceptionTestCase),
                    ('c', CompressTestCase),
                    ('o', CompressObjectTestCase))
    suite = unittest.TestSuite()
    for flag, case in flag_to_case:
        if flag in selected:
            suite.addTest(unittest.makeSuite(case))
    test_support.run_suite(suite)
# NOTE(review): manual-run scaffolding, deliberately disabled by the
# `if False:` guard.  It re-imports this module from a hard-coded
# developer path and runs only CompressTestCase.  Kept verbatim.
if False:
import sys
sys.path.insert(1, '/Py23Src/python/dist/src/Lib/test')
import test_zlib as tz
ts, ut = tz.test_support, tz.unittest
su = ut.TestSuite()
su.addTest(ut.makeSuite(tz.CompressTestCase))
ts.run_suite(su)