mirror of
https://github.com/python/cpython.git
synced 2025-12-10 11:00:14 +00:00
Make test_zipfile pass.
The zipfile module now does all I/O in binary mode using bytes. (Maybe we should support wrapping a TextIOWrapper around it when text mode reading is requested?) Even the password is a bytes array now. Had to fix py_compile.py to use bytes while I was at it. The _struct needed a patch to support bytes, str8 and str for the 's' and 'p' formats.
This commit is contained in:
parent
94ca1c620e
commit
d6ca546091
4 changed files with 177 additions and 146 deletions
|
|
@ -72,10 +72,10 @@ else:
|
||||||
|
|
||||||
def wr_long(f, x):
|
def wr_long(f, x):
|
||||||
"""Internal; write a 32-bit int to a file in little-endian order."""
|
"""Internal; write a 32-bit int to a file in little-endian order."""
|
||||||
f.write(chr( x & 0xff))
|
f.write(bytes([x & 0xff,
|
||||||
f.write(chr((x >> 8) & 0xff))
|
(x >> 8) & 0xff,
|
||||||
f.write(chr((x >> 16) & 0xff))
|
(x >> 16) & 0xff,
|
||||||
f.write(chr((x >> 24) & 0xff))
|
(x >> 24) & 0xff]))
|
||||||
|
|
||||||
def compile(file, cfile=None, dfile=None, doraise=False):
|
def compile(file, cfile=None, dfile=None, doraise=False):
|
||||||
"""Byte-compile one Python source file to Python bytecode.
|
"""Byte-compile one Python source file to Python bytecode.
|
||||||
|
|
@ -133,7 +133,7 @@ def compile(file, cfile=None, dfile=None, doraise=False):
|
||||||
if cfile is None:
|
if cfile is None:
|
||||||
cfile = file + (__debug__ and 'c' or 'o')
|
cfile = file + (__debug__ and 'c' or 'o')
|
||||||
fc = open(cfile, 'wb')
|
fc = open(cfile, 'wb')
|
||||||
fc.write('\0\0\0\0')
|
fc.write(b'\0\0\0\0')
|
||||||
wr_long(fc, timestamp)
|
wr_long(fc, timestamp)
|
||||||
marshal.dump(codeobject, fc)
|
marshal.dump(codeobject, fc)
|
||||||
fc.flush()
|
fc.flush()
|
||||||
|
|
|
||||||
|
|
@ -3,10 +3,8 @@ try:
|
||||||
import zlib
|
import zlib
|
||||||
except ImportError:
|
except ImportError:
|
||||||
zlib = None
|
zlib = None
|
||||||
|
import zipfile, os, unittest, sys, shutil, struct, io
|
||||||
|
|
||||||
import zipfile, os, unittest, sys, shutil, struct
|
|
||||||
|
|
||||||
from StringIO import StringIO
|
|
||||||
from tempfile import TemporaryFile
|
from tempfile import TemporaryFile
|
||||||
from random import randint, random
|
from random import randint, random
|
||||||
|
|
||||||
|
|
@ -18,9 +16,10 @@ FIXEDTEST_SIZE = 10
|
||||||
|
|
||||||
class TestsWithSourceFile(unittest.TestCase):
|
class TestsWithSourceFile(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.line_gen = ("Zipfile test line %d. random float: %f" % (i, random())
|
self.line_gen = (bytes("Zipfile test line %d. random float: %f" %
|
||||||
for i in range(FIXEDTEST_SIZE))
|
(i, random()))
|
||||||
self.data = '\n'.join(self.line_gen) + '\n'
|
for i in range(FIXEDTEST_SIZE))
|
||||||
|
self.data = b'\n'.join(self.line_gen) + b'\n'
|
||||||
|
|
||||||
# Make a source file with some lines
|
# Make a source file with some lines
|
||||||
fp = open(TESTFN, "wb")
|
fp = open(TESTFN, "wb")
|
||||||
|
|
@ -45,14 +44,8 @@ class TestsWithSourceFile(unittest.TestCase):
|
||||||
self.assertEqual(zipfp.read("strfile"), self.data)
|
self.assertEqual(zipfp.read("strfile"), self.data)
|
||||||
|
|
||||||
# Print the ZIP directory
|
# Print the ZIP directory
|
||||||
fp = StringIO()
|
fp = io.StringIO()
|
||||||
stdout = sys.stdout
|
zipfp.printdir(file=fp)
|
||||||
try:
|
|
||||||
sys.stdout = fp
|
|
||||||
|
|
||||||
zipfp.printdir()
|
|
||||||
finally:
|
|
||||||
sys.stdout = stdout
|
|
||||||
|
|
||||||
directory = fp.getvalue()
|
directory = fp.getvalue()
|
||||||
lines = directory.splitlines()
|
lines = directory.splitlines()
|
||||||
|
|
@ -95,7 +88,7 @@ class TestsWithSourceFile(unittest.TestCase):
|
||||||
zipfp.close()
|
zipfp.close()
|
||||||
|
|
||||||
def testStored(self):
|
def testStored(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipTest(f, zipfile.ZIP_STORED)
|
self.zipTest(f, zipfile.ZIP_STORED)
|
||||||
|
|
||||||
def zipOpenTest(self, f, compression):
|
def zipOpenTest(self, f, compression):
|
||||||
|
|
@ -119,12 +112,12 @@ class TestsWithSourceFile(unittest.TestCase):
|
||||||
break
|
break
|
||||||
zipdata2.append(read_data)
|
zipdata2.append(read_data)
|
||||||
|
|
||||||
self.assertEqual(''.join(zipdata1), self.data)
|
self.assertEqual(b''.join(zipdata1), self.data)
|
||||||
self.assertEqual(''.join(zipdata2), self.data)
|
self.assertEqual(b''.join(zipdata2), self.data)
|
||||||
zipfp.close()
|
zipfp.close()
|
||||||
|
|
||||||
def testOpenStored(self):
|
def testOpenStored(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipOpenTest(f, zipfile.ZIP_STORED)
|
self.zipOpenTest(f, zipfile.ZIP_STORED)
|
||||||
|
|
||||||
def zipRandomOpenTest(self, f, compression):
|
def zipRandomOpenTest(self, f, compression):
|
||||||
|
|
@ -140,11 +133,11 @@ class TestsWithSourceFile(unittest.TestCase):
|
||||||
break
|
break
|
||||||
zipdata1.append(read_data)
|
zipdata1.append(read_data)
|
||||||
|
|
||||||
self.assertEqual(''.join(zipdata1), self.data)
|
self.assertEqual(b''.join(zipdata1), self.data)
|
||||||
zipfp.close()
|
zipfp.close()
|
||||||
|
|
||||||
def testRandomOpenStored(self):
|
def testRandomOpenStored(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipRandomOpenTest(f, zipfile.ZIP_STORED)
|
self.zipRandomOpenTest(f, zipfile.ZIP_STORED)
|
||||||
|
|
||||||
def zipReadlineTest(self, f, compression):
|
def zipReadlineTest(self, f, compression):
|
||||||
|
|
@ -181,40 +174,40 @@ class TestsWithSourceFile(unittest.TestCase):
|
||||||
zipfp.close()
|
zipfp.close()
|
||||||
|
|
||||||
def testReadlineStored(self):
|
def testReadlineStored(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipReadlineTest(f, zipfile.ZIP_STORED)
|
self.zipReadlineTest(f, zipfile.ZIP_STORED)
|
||||||
|
|
||||||
def testReadlinesStored(self):
|
def testReadlinesStored(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipReadlinesTest(f, zipfile.ZIP_STORED)
|
self.zipReadlinesTest(f, zipfile.ZIP_STORED)
|
||||||
|
|
||||||
def testIterlinesStored(self):
|
def testIterlinesStored(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipIterlinesTest(f, zipfile.ZIP_STORED)
|
self.zipIterlinesTest(f, zipfile.ZIP_STORED)
|
||||||
|
|
||||||
if zlib:
|
if zlib:
|
||||||
def testDeflated(self):
|
def testDeflated(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipTest(f, zipfile.ZIP_DEFLATED)
|
self.zipTest(f, zipfile.ZIP_DEFLATED)
|
||||||
|
|
||||||
def testOpenDeflated(self):
|
def testOpenDeflated(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipOpenTest(f, zipfile.ZIP_DEFLATED)
|
self.zipOpenTest(f, zipfile.ZIP_DEFLATED)
|
||||||
|
|
||||||
def testRandomOpenDeflated(self):
|
def testRandomOpenDeflated(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipRandomOpenTest(f, zipfile.ZIP_DEFLATED)
|
self.zipRandomOpenTest(f, zipfile.ZIP_DEFLATED)
|
||||||
|
|
||||||
def testReadlineDeflated(self):
|
def testReadlineDeflated(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipReadlineTest(f, zipfile.ZIP_DEFLATED)
|
self.zipReadlineTest(f, zipfile.ZIP_DEFLATED)
|
||||||
|
|
||||||
def testReadlinesDeflated(self):
|
def testReadlinesDeflated(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipReadlinesTest(f, zipfile.ZIP_DEFLATED)
|
self.zipReadlinesTest(f, zipfile.ZIP_DEFLATED)
|
||||||
|
|
||||||
def testIterlinesDeflated(self):
|
def testIterlinesDeflated(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipIterlinesTest(f, zipfile.ZIP_DEFLATED)
|
self.zipIterlinesTest(f, zipfile.ZIP_DEFLATED)
|
||||||
|
|
||||||
def testLowCompression(self):
|
def testLowCompression(self):
|
||||||
|
|
@ -227,8 +220,8 @@ class TestsWithSourceFile(unittest.TestCase):
|
||||||
# Get an open object for strfile
|
# Get an open object for strfile
|
||||||
zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED)
|
zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED)
|
||||||
openobj = zipfp.open("strfile")
|
openobj = zipfp.open("strfile")
|
||||||
self.assertEqual(openobj.read(1), '1')
|
self.assertEqual(openobj.read(1), b'1')
|
||||||
self.assertEqual(openobj.read(1), '2')
|
self.assertEqual(openobj.read(1), b'2')
|
||||||
|
|
||||||
def testAbsoluteArcnames(self):
|
def testAbsoluteArcnames(self):
|
||||||
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED)
|
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED)
|
||||||
|
|
@ -251,8 +244,9 @@ class TestZip64InSmallFiles(unittest.TestCase):
|
||||||
self._limit = zipfile.ZIP64_LIMIT
|
self._limit = zipfile.ZIP64_LIMIT
|
||||||
zipfile.ZIP64_LIMIT = 5
|
zipfile.ZIP64_LIMIT = 5
|
||||||
|
|
||||||
line_gen = ("Test of zipfile line %d." % i for i in range(0, FIXEDTEST_SIZE))
|
line_gen = (bytes("Test of zipfile line %d." % i)
|
||||||
self.data = '\n'.join(line_gen)
|
for i in range(0, FIXEDTEST_SIZE))
|
||||||
|
self.data = b'\n'.join(line_gen)
|
||||||
|
|
||||||
# Make a source file with some lines
|
# Make a source file with some lines
|
||||||
fp = open(TESTFN, "wb")
|
fp = open(TESTFN, "wb")
|
||||||
|
|
@ -272,7 +266,7 @@ class TestZip64InSmallFiles(unittest.TestCase):
|
||||||
zipfp.close()
|
zipfp.close()
|
||||||
|
|
||||||
def testLargeFileException(self):
|
def testLargeFileException(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.largeFileExceptionTest(f, zipfile.ZIP_STORED)
|
self.largeFileExceptionTest(f, zipfile.ZIP_STORED)
|
||||||
self.largeFileExceptionTest2(f, zipfile.ZIP_STORED)
|
self.largeFileExceptionTest2(f, zipfile.ZIP_STORED)
|
||||||
|
|
||||||
|
|
@ -291,14 +285,8 @@ class TestZip64InSmallFiles(unittest.TestCase):
|
||||||
self.assertEqual(zipfp.read("strfile"), self.data)
|
self.assertEqual(zipfp.read("strfile"), self.data)
|
||||||
|
|
||||||
# Print the ZIP directory
|
# Print the ZIP directory
|
||||||
fp = StringIO()
|
fp = io.StringIO()
|
||||||
stdout = sys.stdout
|
zipfp.printdir(fp)
|
||||||
try:
|
|
||||||
sys.stdout = fp
|
|
||||||
|
|
||||||
zipfp.printdir()
|
|
||||||
finally:
|
|
||||||
sys.stdout = stdout
|
|
||||||
|
|
||||||
directory = fp.getvalue()
|
directory = fp.getvalue()
|
||||||
lines = directory.splitlines()
|
lines = directory.splitlines()
|
||||||
|
|
@ -343,13 +331,13 @@ class TestZip64InSmallFiles(unittest.TestCase):
|
||||||
zipfp.close()
|
zipfp.close()
|
||||||
|
|
||||||
def testStored(self):
|
def testStored(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipTest(f, zipfile.ZIP_STORED)
|
self.zipTest(f, zipfile.ZIP_STORED)
|
||||||
|
|
||||||
|
|
||||||
if zlib:
|
if zlib:
|
||||||
def testDeflated(self):
|
def testDeflated(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipTest(f, zipfile.ZIP_DEFLATED)
|
self.zipTest(f, zipfile.ZIP_DEFLATED)
|
||||||
|
|
||||||
def testAbsoluteArcnames(self):
|
def testAbsoluteArcnames(self):
|
||||||
|
|
@ -440,7 +428,7 @@ class OtherTests(unittest.TestCase):
|
||||||
os.unlink(TESTFN)
|
os.unlink(TESTFN)
|
||||||
|
|
||||||
filename = 'testfile.txt'
|
filename = 'testfile.txt'
|
||||||
content = 'hello, world. this is some content.'
|
content = b'hello, world. this is some content.'
|
||||||
|
|
||||||
try:
|
try:
|
||||||
zf = zipfile.ZipFile(TESTFN, 'a')
|
zf = zipfile.ZipFile(TESTFN, 'a')
|
||||||
|
|
@ -483,7 +471,7 @@ class OtherTests(unittest.TestCase):
|
||||||
# This test checks that the is_zipfile function correctly identifies
|
# This test checks that the is_zipfile function correctly identifies
|
||||||
# a file that is a zip file
|
# a file that is a zip file
|
||||||
zipf = zipfile.ZipFile(TESTFN, mode="w")
|
zipf = zipfile.ZipFile(TESTFN, mode="w")
|
||||||
zipf.writestr("foo.txt", "O, for a Muse of Fire!")
|
zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
|
||||||
zipf.close()
|
zipf.close()
|
||||||
chk = zipfile.is_zipfile(TESTFN)
|
chk = zipfile.is_zipfile(TESTFN)
|
||||||
self.assert_(chk is True)
|
self.assert_(chk is True)
|
||||||
|
|
@ -504,7 +492,7 @@ class OtherTests(unittest.TestCase):
|
||||||
|
|
||||||
def testClosedZipRaisesRuntimeError(self):
|
def testClosedZipRaisesRuntimeError(self):
|
||||||
# Verify that testzip() doesn't swallow inappropriate exceptions.
|
# Verify that testzip() doesn't swallow inappropriate exceptions.
|
||||||
data = StringIO()
|
data = io.BytesIO()
|
||||||
zipf = zipfile.ZipFile(data, mode="w")
|
zipf = zipfile.ZipFile(data, mode="w")
|
||||||
zipf.writestr("foo.txt", "O, for a Muse of Fire!")
|
zipf.writestr("foo.txt", "O, for a Muse of Fire!")
|
||||||
zipf.close()
|
zipf.close()
|
||||||
|
|
@ -525,15 +513,15 @@ class DecryptionTests(unittest.TestCase):
|
||||||
# ZIP file
|
# ZIP file
|
||||||
|
|
||||||
data = (
|
data = (
|
||||||
'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
|
b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
|
||||||
'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
|
b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
|
||||||
'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
|
b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
|
||||||
'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
|
b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
|
||||||
'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
|
b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
|
||||||
'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
|
b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
|
||||||
'\x00\x00L\x00\x00\x00\x00\x00' )
|
b'\x00\x00L\x00\x00\x00\x00\x00' )
|
||||||
|
|
||||||
plain = 'zipfile.py encryption test'
|
plain = b'zipfile.py encryption test'
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
fp = open(TESTFN, "wb")
|
fp = open(TESTFN, "wb")
|
||||||
|
|
@ -551,18 +539,19 @@ class DecryptionTests(unittest.TestCase):
|
||||||
self.assertRaises(RuntimeError, self.zip.read, "test.txt")
|
self.assertRaises(RuntimeError, self.zip.read, "test.txt")
|
||||||
|
|
||||||
def testBadPassword(self):
|
def testBadPassword(self):
|
||||||
self.zip.setpassword("perl")
|
self.zip.setpassword(b"perl")
|
||||||
self.assertRaises(RuntimeError, self.zip.read, "test.txt")
|
self.assertRaises(RuntimeError, self.zip.read, "test.txt")
|
||||||
|
|
||||||
def testGoodPassword(self):
|
def testGoodPassword(self):
|
||||||
self.zip.setpassword("python")
|
self.zip.setpassword(b"python")
|
||||||
self.assertEquals(self.zip.read("test.txt"), self.plain)
|
self.assertEquals(self.zip.read("test.txt"), self.plain)
|
||||||
|
|
||||||
|
|
||||||
class TestsWithRandomBinaryFiles(unittest.TestCase):
|
class TestsWithRandomBinaryFiles(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
datacount = randint(16, 64)*1024 + randint(1, 1024)
|
datacount = randint(16, 64)*1024 + randint(1, 1024)
|
||||||
self.data = ''.join((struct.pack('<f', random()*randint(-1000, 1000)) for i in range(datacount)))
|
self.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000))
|
||||||
|
for i in range(datacount))
|
||||||
|
|
||||||
# Make a source file with some lines
|
# Make a source file with some lines
|
||||||
fp = open(TESTFN, "wb")
|
fp = open(TESTFN, "wb")
|
||||||
|
|
@ -592,7 +581,7 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
|
||||||
zipfp.close()
|
zipfp.close()
|
||||||
|
|
||||||
def testStored(self):
|
def testStored(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipTest(f, zipfile.ZIP_STORED)
|
self.zipTest(f, zipfile.ZIP_STORED)
|
||||||
|
|
||||||
def zipOpenTest(self, f, compression):
|
def zipOpenTest(self, f, compression):
|
||||||
|
|
@ -616,17 +605,17 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
|
||||||
break
|
break
|
||||||
zipdata2.append(read_data)
|
zipdata2.append(read_data)
|
||||||
|
|
||||||
testdata1 = ''.join(zipdata1)
|
testdata1 = b''.join(zipdata1)
|
||||||
self.assertEqual(len(testdata1), len(self.data))
|
self.assertEqual(len(testdata1), len(self.data))
|
||||||
self.assertEqual(testdata1, self.data)
|
self.assertEqual(testdata1, self.data)
|
||||||
|
|
||||||
testdata2 = ''.join(zipdata2)
|
testdata2 = b''.join(zipdata2)
|
||||||
self.assertEqual(len(testdata1), len(self.data))
|
self.assertEqual(len(testdata1), len(self.data))
|
||||||
self.assertEqual(testdata1, self.data)
|
self.assertEqual(testdata1, self.data)
|
||||||
zipfp.close()
|
zipfp.close()
|
||||||
|
|
||||||
def testOpenStored(self):
|
def testOpenStored(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipOpenTest(f, zipfile.ZIP_STORED)
|
self.zipOpenTest(f, zipfile.ZIP_STORED)
|
||||||
|
|
||||||
def zipRandomOpenTest(self, f, compression):
|
def zipRandomOpenTest(self, f, compression):
|
||||||
|
|
@ -642,13 +631,13 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
|
||||||
break
|
break
|
||||||
zipdata1.append(read_data)
|
zipdata1.append(read_data)
|
||||||
|
|
||||||
testdata = ''.join(zipdata1)
|
testdata = b''.join(zipdata1)
|
||||||
self.assertEqual(len(testdata), len(self.data))
|
self.assertEqual(len(testdata), len(self.data))
|
||||||
self.assertEqual(testdata, self.data)
|
self.assertEqual(testdata, self.data)
|
||||||
zipfp.close()
|
zipfp.close()
|
||||||
|
|
||||||
def testRandomOpenStored(self):
|
def testRandomOpenStored(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.zipRandomOpenTest(f, zipfile.ZIP_STORED)
|
self.zipRandomOpenTest(f, zipfile.ZIP_STORED)
|
||||||
|
|
||||||
class TestsWithMultipleOpens(unittest.TestCase):
|
class TestsWithMultipleOpens(unittest.TestCase):
|
||||||
|
|
@ -682,8 +671,8 @@ class TestsWithMultipleOpens(unittest.TestCase):
|
||||||
data2 = zopen2.read(500)
|
data2 = zopen2.read(500)
|
||||||
data1 += zopen1.read(500)
|
data1 += zopen1.read(500)
|
||||||
data2 += zopen2.read(500)
|
data2 += zopen2.read(500)
|
||||||
self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
|
self.assertEqual(data1, b'1'*FIXEDTEST_SIZE)
|
||||||
self.assertEqual(data2, '2'*FIXEDTEST_SIZE)
|
self.assertEqual(data2, b'2'*FIXEDTEST_SIZE)
|
||||||
zipf.close()
|
zipf.close()
|
||||||
|
|
||||||
def testInterleaved(self):
|
def testInterleaved(self):
|
||||||
|
|
@ -696,8 +685,8 @@ class TestsWithMultipleOpens(unittest.TestCase):
|
||||||
data2 = zopen2.read(500)
|
data2 = zopen2.read(500)
|
||||||
data1 += zopen1.read(500)
|
data1 += zopen1.read(500)
|
||||||
data2 += zopen2.read(500)
|
data2 += zopen2.read(500)
|
||||||
self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
|
self.assertEqual(data1, b'1'*FIXEDTEST_SIZE)
|
||||||
self.assertEqual(data2, '2'*FIXEDTEST_SIZE)
|
self.assertEqual(data2, b'2'*FIXEDTEST_SIZE)
|
||||||
zipf.close()
|
zipf.close()
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
|
|
@ -706,13 +695,19 @@ class TestsWithMultipleOpens(unittest.TestCase):
|
||||||
|
|
||||||
class UniversalNewlineTests(unittest.TestCase):
|
class UniversalNewlineTests(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.line_gen = ["Test of zipfile line %d." % i for i in range(FIXEDTEST_SIZE)]
|
self.line_gen = [bytes("Test of zipfile line %d." % i)
|
||||||
|
for i in range(FIXEDTEST_SIZE)]
|
||||||
self.seps = ('\r', '\r\n', '\n')
|
self.seps = ('\r', '\r\n', '\n')
|
||||||
self.arcdata, self.arcfiles = {}, {}
|
self.arcdata, self.arcfiles = {}, {}
|
||||||
for n, s in enumerate(self.seps):
|
for n, s in enumerate(self.seps):
|
||||||
self.arcdata[s] = s.join(self.line_gen) + s
|
b = s.encode("ascii")
|
||||||
|
self.arcdata[s] = b.join(self.line_gen) + b
|
||||||
self.arcfiles[s] = '%s-%d' % (TESTFN, n)
|
self.arcfiles[s] = '%s-%d' % (TESTFN, n)
|
||||||
file(self.arcfiles[s], "wb").write(self.arcdata[s])
|
f = open(self.arcfiles[s], "wb")
|
||||||
|
try:
|
||||||
|
f.write(self.arcdata[s])
|
||||||
|
finally:
|
||||||
|
f.close()
|
||||||
|
|
||||||
def makeTestArchive(self, f, compression):
|
def makeTestArchive(self, f, compression):
|
||||||
# Create the ZIP archive
|
# Create the ZIP archive
|
||||||
|
|
@ -741,7 +736,7 @@ class UniversalNewlineTests(unittest.TestCase):
|
||||||
zipopen = zipfp.open(fn, "rU")
|
zipopen = zipfp.open(fn, "rU")
|
||||||
for line in self.line_gen:
|
for line in self.line_gen:
|
||||||
linedata = zipopen.readline()
|
linedata = zipopen.readline()
|
||||||
self.assertEqual(linedata, line + '\n')
|
self.assertEqual(linedata, line + b'\n')
|
||||||
|
|
||||||
zipfp.close()
|
zipfp.close()
|
||||||
|
|
||||||
|
|
@ -753,7 +748,7 @@ class UniversalNewlineTests(unittest.TestCase):
|
||||||
for sep, fn in self.arcfiles.items():
|
for sep, fn in self.arcfiles.items():
|
||||||
ziplines = zipfp.open(fn, "rU").readlines()
|
ziplines = zipfp.open(fn, "rU").readlines()
|
||||||
for line, zipline in zip(self.line_gen, ziplines):
|
for line, zipline in zip(self.line_gen, ziplines):
|
||||||
self.assertEqual(zipline, line + '\n')
|
self.assertEqual(zipline, line + b'\n')
|
||||||
|
|
||||||
zipfp.close()
|
zipfp.close()
|
||||||
|
|
||||||
|
|
@ -764,41 +759,41 @@ class UniversalNewlineTests(unittest.TestCase):
|
||||||
zipfp = zipfile.ZipFile(f, "r")
|
zipfp = zipfile.ZipFile(f, "r")
|
||||||
for sep, fn in self.arcfiles.items():
|
for sep, fn in self.arcfiles.items():
|
||||||
for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")):
|
for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")):
|
||||||
self.assertEqual(zipline, line + '\n')
|
self.assertEqual(zipline, line + b'\n')
|
||||||
|
|
||||||
zipfp.close()
|
zipfp.close()
|
||||||
|
|
||||||
def testReadStored(self):
|
def testReadStored(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.readTest(f, zipfile.ZIP_STORED)
|
self.readTest(f, zipfile.ZIP_STORED)
|
||||||
|
|
||||||
def testReadlineStored(self):
|
def testReadlineStored(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.readlineTest(f, zipfile.ZIP_STORED)
|
self.readlineTest(f, zipfile.ZIP_STORED)
|
||||||
|
|
||||||
def testReadlinesStored(self):
|
def testReadlinesStored(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.readlinesTest(f, zipfile.ZIP_STORED)
|
self.readlinesTest(f, zipfile.ZIP_STORED)
|
||||||
|
|
||||||
def testIterlinesStored(self):
|
def testIterlinesStored(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.iterlinesTest(f, zipfile.ZIP_STORED)
|
self.iterlinesTest(f, zipfile.ZIP_STORED)
|
||||||
|
|
||||||
if zlib:
|
if zlib:
|
||||||
def testReadDeflated(self):
|
def testReadDeflated(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.readTest(f, zipfile.ZIP_DEFLATED)
|
self.readTest(f, zipfile.ZIP_DEFLATED)
|
||||||
|
|
||||||
def testReadlineDeflated(self):
|
def testReadlineDeflated(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.readlineTest(f, zipfile.ZIP_DEFLATED)
|
self.readlineTest(f, zipfile.ZIP_DEFLATED)
|
||||||
|
|
||||||
def testReadlinesDeflated(self):
|
def testReadlinesDeflated(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.readlinesTest(f, zipfile.ZIP_DEFLATED)
|
self.readlinesTest(f, zipfile.ZIP_DEFLATED)
|
||||||
|
|
||||||
def testIterlinesDeflated(self):
|
def testIterlinesDeflated(self):
|
||||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
|
||||||
self.iterlinesTest(f, zipfile.ZIP_DEFLATED)
|
self.iterlinesTest(f, zipfile.ZIP_DEFLATED)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
|
|
|
||||||
112
Lib/zipfile.py
112
Lib/zipfile.py
|
|
@ -1,5 +1,7 @@
|
||||||
"""
|
"""
|
||||||
Read and write ZIP files.
|
Read and write ZIP files.
|
||||||
|
|
||||||
|
XXX references to utf-8 need further investigation.
|
||||||
"""
|
"""
|
||||||
import struct, os, time, sys
|
import struct, os, time, sys
|
||||||
import binascii, io
|
import binascii, io
|
||||||
|
|
@ -33,15 +35,15 @@ ZIP_DEFLATED = 8
|
||||||
|
|
||||||
# Here are some struct module formats for reading headers
|
# Here are some struct module formats for reading headers
|
||||||
structEndArchive = "<4s4H2lH" # 9 items, end of archive, 22 bytes
|
structEndArchive = "<4s4H2lH" # 9 items, end of archive, 22 bytes
|
||||||
stringEndArchive = "PK\005\006" # magic number for end of archive record
|
stringEndArchive = b"PK\005\006" # magic number for end of archive record
|
||||||
structCentralDir = "<4s4B4HlLL5HLl"# 19 items, central directory, 46 bytes
|
structCentralDir = "<4s4B4HlLL5HLl"# 19 items, central directory, 46 bytes
|
||||||
stringCentralDir = "PK\001\002" # magic number for central directory
|
stringCentralDir = b"PK\001\002" # magic number for central directory
|
||||||
structFileHeader = "<4s2B4HlLL2H" # 12 items, file header record, 30 bytes
|
structFileHeader = "<4s2B4HlLL2H" # 12 items, file header record, 30 bytes
|
||||||
stringFileHeader = "PK\003\004" # magic number for file header
|
stringFileHeader = b"PK\003\004" # magic number for file header
|
||||||
structEndArchive64Locator = "<4slql" # 4 items, locate Zip64 header, 20 bytes
|
structEndArchive64Locator = "<4slql" # 4 items, locate Zip64 header, 20 bytes
|
||||||
stringEndArchive64Locator = "PK\x06\x07" # magic token for locator header
|
stringEndArchive64Locator = b"PK\x06\x07" # magic token for locator header
|
||||||
structEndArchive64 = "<4sqhhllqqqq" # 10 items, end of archive (Zip64), 56 bytes
|
structEndArchive64 = "<4sqhhllqqqq" # 10 items, end of archive (Zip64), 56 bytes
|
||||||
stringEndArchive64 = "PK\x06\x06" # magic token for Zip64 header
|
stringEndArchive64 = b"PK\x06\x06" # magic token for Zip64 header
|
||||||
|
|
||||||
|
|
||||||
# indexes of entries in the central directory structure
|
# indexes of entries in the central directory structure
|
||||||
|
|
@ -82,7 +84,7 @@ _FH_EXTRA_FIELD_LENGTH = 11
|
||||||
def is_zipfile(filename):
|
def is_zipfile(filename):
|
||||||
"""Quickly see if file is a ZIP file by checking the magic number."""
|
"""Quickly see if file is a ZIP file by checking the magic number."""
|
||||||
try:
|
try:
|
||||||
fpin = open(filename, "rb")
|
fpin = io.open(filename, "rb")
|
||||||
endrec = _EndRecData(fpin)
|
endrec = _EndRecData(fpin)
|
||||||
fpin.close()
|
fpin.close()
|
||||||
if endrec:
|
if endrec:
|
||||||
|
|
@ -206,8 +208,8 @@ class ZipInfo (object):
|
||||||
self.date_time = date_time # year, month, day, hour, min, sec
|
self.date_time = date_time # year, month, day, hour, min, sec
|
||||||
# Standard values:
|
# Standard values:
|
||||||
self.compress_type = ZIP_STORED # Type of compression for the file
|
self.compress_type = ZIP_STORED # Type of compression for the file
|
||||||
self.comment = "" # Comment for each file
|
self.comment = b"" # Comment for each file
|
||||||
self.extra = "" # ZIP extra data
|
self.extra = b"" # ZIP extra data
|
||||||
if sys.platform == 'win32':
|
if sys.platform == 'win32':
|
||||||
self.create_system = 0 # System which created ZIP archive
|
self.create_system = 0 # System which created ZIP archive
|
||||||
else:
|
else:
|
||||||
|
|
@ -257,7 +259,7 @@ class ZipInfo (object):
|
||||||
self.compress_type, dostime, dosdate, CRC,
|
self.compress_type, dostime, dosdate, CRC,
|
||||||
compress_size, file_size,
|
compress_size, file_size,
|
||||||
len(self.filename), len(extra))
|
len(self.filename), len(extra))
|
||||||
return header + self.filename + extra
|
return header + self.filename.encode("utf-8") + extra
|
||||||
|
|
||||||
def _decodeExtra(self):
|
def _decodeExtra(self):
|
||||||
# Try to decode the extra field.
|
# Try to decode the extra field.
|
||||||
|
|
@ -331,7 +333,7 @@ class _ZipDecrypter:
|
||||||
|
|
||||||
def _crc32(self, ch, crc):
|
def _crc32(self, ch, crc):
|
||||||
"""Compute the CRC32 primitive on one byte."""
|
"""Compute the CRC32 primitive on one byte."""
|
||||||
return ((crc >> 8) & 0xffffff) ^ self.crctable[(crc ^ ord(ch)) & 0xff]
|
return ((crc >> 8) & 0xffffff) ^ self.crctable[(crc ^ ch) & 0xff]
|
||||||
|
|
||||||
def __init__(self, pwd):
|
def __init__(self, pwd):
|
||||||
self.key0 = 305419896
|
self.key0 = 305419896
|
||||||
|
|
@ -344,20 +346,13 @@ class _ZipDecrypter:
|
||||||
self.key0 = self._crc32(c, self.key0)
|
self.key0 = self._crc32(c, self.key0)
|
||||||
self.key1 = (self.key1 + (self.key0 & 255)) & 4294967295
|
self.key1 = (self.key1 + (self.key0 & 255)) & 4294967295
|
||||||
self.key1 = (self.key1 * 134775813 + 1) & 4294967295
|
self.key1 = (self.key1 * 134775813 + 1) & 4294967295
|
||||||
self.key2 = self._crc32(chr((self.key1 >> 24) & 255), self.key2)
|
self.key2 = self._crc32((self.key1 >> 24) & 255, self.key2)
|
||||||
|
|
||||||
def __call__(self, c):
|
def __call__(self, c):
|
||||||
"""Decrypt a single character."""
|
"""Decrypt a single character."""
|
||||||
# XXX When this is called with a byte instead of a char, ord()
|
assert isinstance(c, int)
|
||||||
# isn't needed. Don't die in that case. In the future we should
|
|
||||||
# just leave this out, once we're always using bytes.
|
|
||||||
try:
|
|
||||||
c = ord(c)
|
|
||||||
except TypeError:
|
|
||||||
pass
|
|
||||||
k = self.key2 | 2
|
k = self.key2 | 2
|
||||||
c = c ^ (((k * (k^1)) >> 8) & 255)
|
c = c ^ (((k * (k^1)) >> 8) & 255)
|
||||||
c = chr(c)
|
|
||||||
self._UpdateKeys(c)
|
self._UpdateKeys(c)
|
||||||
return c
|
return c
|
||||||
|
|
||||||
|
|
@ -370,13 +365,13 @@ class ZipExtFile:
|
||||||
self.fileobj = fileobj
|
self.fileobj = fileobj
|
||||||
self.decrypter = decrypt
|
self.decrypter = decrypt
|
||||||
self.bytes_read = 0
|
self.bytes_read = 0
|
||||||
self.rawbuffer = ''
|
self.rawbuffer = b''
|
||||||
self.readbuffer = ''
|
self.readbuffer = b''
|
||||||
self.linebuffer = ''
|
self.linebuffer = b''
|
||||||
self.eof = False
|
self.eof = False
|
||||||
self.univ_newlines = False
|
self.univ_newlines = False
|
||||||
self.nlSeps = ("\n", )
|
self.nlSeps = (b"\n", )
|
||||||
self.lastdiscard = ''
|
self.lastdiscard = b''
|
||||||
|
|
||||||
self.compress_type = zipinfo.compress_type
|
self.compress_type = zipinfo.compress_type
|
||||||
self.compress_size = zipinfo.compress_size
|
self.compress_size = zipinfo.compress_size
|
||||||
|
|
@ -394,9 +389,9 @@ class ZipExtFile:
|
||||||
self.univ_newlines = univ_newlines
|
self.univ_newlines = univ_newlines
|
||||||
|
|
||||||
# pick line separator char(s) based on universal newlines flag
|
# pick line separator char(s) based on universal newlines flag
|
||||||
self.nlSeps = ("\n", )
|
self.nlSeps = (b"\n", )
|
||||||
if self.univ_newlines:
|
if self.univ_newlines:
|
||||||
self.nlSeps = ("\r\n", "\r", "\n")
|
self.nlSeps = (b"\r\n", b"\r", b"\n")
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
return self
|
return self
|
||||||
|
|
@ -417,7 +412,7 @@ class ZipExtFile:
|
||||||
# ugly check for cases where half of an \r\n pair was
|
# ugly check for cases where half of an \r\n pair was
|
||||||
# read on the last pass, and the \r was discarded. In this
|
# read on the last pass, and the \r was discarded. In this
|
||||||
# case we just throw away the \n at the start of the buffer.
|
# case we just throw away the \n at the start of the buffer.
|
||||||
if (self.lastdiscard, self.linebuffer[0]) == ('\r','\n'):
|
if (self.lastdiscard, self.linebuffer[0]) == (b'\r', b'\n'):
|
||||||
self.linebuffer = self.linebuffer[1:]
|
self.linebuffer = self.linebuffer[1:]
|
||||||
|
|
||||||
for sep in self.nlSeps:
|
for sep in self.nlSeps:
|
||||||
|
|
@ -435,7 +430,7 @@ class ZipExtFile:
|
||||||
if size < 0:
|
if size < 0:
|
||||||
size = sys.maxint
|
size = sys.maxint
|
||||||
elif size == 0:
|
elif size == 0:
|
||||||
return ''
|
return b''
|
||||||
|
|
||||||
# check for a newline already in buffer
|
# check for a newline already in buffer
|
||||||
nl, nllen = self._checkfornewline()
|
nl, nllen = self._checkfornewline()
|
||||||
|
|
@ -461,7 +456,7 @@ class ZipExtFile:
|
||||||
# so return current buffer
|
# so return current buffer
|
||||||
if nl < 0:
|
if nl < 0:
|
||||||
s = self.linebuffer
|
s = self.linebuffer
|
||||||
self.linebuffer = ''
|
self.linebuffer = b''
|
||||||
return s
|
return s
|
||||||
|
|
||||||
buf = self.linebuffer[:nl]
|
buf = self.linebuffer[:nl]
|
||||||
|
|
@ -470,7 +465,7 @@ class ZipExtFile:
|
||||||
|
|
||||||
# line is always returned with \n as newline char (except possibly
|
# line is always returned with \n as newline char (except possibly
|
||||||
# for a final incomplete line in the file, which is handled above).
|
# for a final incomplete line in the file, which is handled above).
|
||||||
return buf + "\n"
|
return buf + b"\n"
|
||||||
|
|
||||||
def readlines(self, sizehint = -1):
|
def readlines(self, sizehint = -1):
|
||||||
"""Return a list with all (following) lines. The sizehint parameter
|
"""Return a list with all (following) lines. The sizehint parameter
|
||||||
|
|
@ -516,18 +511,23 @@ class ZipExtFile:
|
||||||
|
|
||||||
# try to read from file (if necessary)
|
# try to read from file (if necessary)
|
||||||
if bytesToRead > 0:
|
if bytesToRead > 0:
|
||||||
bytes = self.fileobj.read(bytesToRead)
|
data = self.fileobj.read(bytesToRead)
|
||||||
self.bytes_read += len(bytes)
|
self.bytes_read += len(data)
|
||||||
self.rawbuffer += bytes
|
try:
|
||||||
|
self.rawbuffer += data
|
||||||
|
except:
|
||||||
|
print(repr(self.fileobj), repr(self.rawbuffer),
|
||||||
|
repr(data))
|
||||||
|
raise
|
||||||
|
|
||||||
# handle contents of raw buffer
|
# handle contents of raw buffer
|
||||||
if self.rawbuffer:
|
if self.rawbuffer:
|
||||||
newdata = self.rawbuffer
|
newdata = self.rawbuffer
|
||||||
self.rawbuffer = ''
|
self.rawbuffer = b''
|
||||||
|
|
||||||
# decrypt new data if we were given an object to handle that
|
# decrypt new data if we were given an object to handle that
|
||||||
if newdata and self.decrypter is not None:
|
if newdata and self.decrypter is not None:
|
||||||
newdata = ''.join(map(self.decrypter, newdata))
|
newdata = bytes(map(self.decrypter, newdata))
|
||||||
|
|
||||||
# decompress newly read data if necessary
|
# decompress newly read data if necessary
|
||||||
if newdata and self.compress_type == ZIP_DEFLATED:
|
if newdata and self.compress_type == ZIP_DEFLATED:
|
||||||
|
|
@ -546,13 +546,13 @@ class ZipExtFile:
|
||||||
|
|
||||||
# return what the user asked for
|
# return what the user asked for
|
||||||
if size is None or len(self.readbuffer) <= size:
|
if size is None or len(self.readbuffer) <= size:
|
||||||
bytes = self.readbuffer
|
data = self.readbuffer
|
||||||
self.readbuffer = ''
|
self.readbuffer = b''
|
||||||
else:
|
else:
|
||||||
bytes = self.readbuffer[:size]
|
data = self.readbuffer[:size]
|
||||||
self.readbuffer = self.readbuffer[size:]
|
self.readbuffer = self.readbuffer[size:]
|
||||||
|
|
||||||
return bytes
|
return data
|
||||||
|
|
||||||
|
|
||||||
class ZipFile:
|
class ZipFile:
|
||||||
|
|
@ -593,15 +593,16 @@ class ZipFile:
|
||||||
|
|
||||||
# Check if we were passed a file-like object
|
# Check if we were passed a file-like object
|
||||||
if isinstance(file, basestring):
|
if isinstance(file, basestring):
|
||||||
|
# No, it's a filename
|
||||||
self._filePassed = 0
|
self._filePassed = 0
|
||||||
self.filename = file
|
self.filename = file
|
||||||
modeDict = {'r' : 'rb', 'w': 'wb', 'a' : 'r+b'}
|
modeDict = {'r' : 'rb', 'w': 'wb', 'a' : 'r+b'}
|
||||||
try:
|
try:
|
||||||
self.fp = open(file, modeDict[mode])
|
self.fp = io.open(file, modeDict[mode])
|
||||||
except IOError:
|
except IOError:
|
||||||
if mode == 'a':
|
if mode == 'a':
|
||||||
mode = key = 'w'
|
mode = key = 'w'
|
||||||
self.fp = open(file, modeDict[mode])
|
self.fp = io.open(file, modeDict[mode])
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
else:
|
else:
|
||||||
|
|
@ -661,7 +662,7 @@ class ZipFile:
|
||||||
self.start_dir = offset_cd + concat
|
self.start_dir = offset_cd + concat
|
||||||
fp.seek(self.start_dir, 0)
|
fp.seek(self.start_dir, 0)
|
||||||
data = fp.read(size_cd)
|
data = fp.read(size_cd)
|
||||||
fp = io.StringIO(data)
|
fp = io.BytesIO(data)
|
||||||
total = 0
|
total = 0
|
||||||
while total < size_cd:
|
while total < size_cd:
|
||||||
centdir = fp.read(46)
|
centdir = fp.read(46)
|
||||||
|
|
@ -673,7 +674,7 @@ class ZipFile:
|
||||||
print(centdir)
|
print(centdir)
|
||||||
filename = fp.read(centdir[_CD_FILENAME_LENGTH])
|
filename = fp.read(centdir[_CD_FILENAME_LENGTH])
|
||||||
# Create ZipInfo instance to store file information
|
# Create ZipInfo instance to store file information
|
||||||
x = ZipInfo(filename)
|
x = ZipInfo(str(filename))
|
||||||
x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH])
|
x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH])
|
||||||
x.comment = fp.read(centdir[_CD_COMMENT_LENGTH])
|
x.comment = fp.read(centdir[_CD_COMMENT_LENGTH])
|
||||||
total = (total + centdir[_CD_FILENAME_LENGTH]
|
total = (total + centdir[_CD_FILENAME_LENGTH]
|
||||||
|
|
@ -708,12 +709,16 @@ class ZipFile:
|
||||||
archive."""
|
archive."""
|
||||||
return self.filelist
|
return self.filelist
|
||||||
|
|
||||||
def printdir(self):
|
def printdir(self, file=None):
|
||||||
"""Print a table of contents for the zip file."""
|
"""Print a table of contents for the zip file."""
|
||||||
print("%-46s %19s %12s" % ("File Name", "Modified ", "Size"))
|
if file is None:
|
||||||
|
file = sys.stdout
|
||||||
|
print("%-46s %19s %12s" % ("File Name", "Modified ", "Size"),
|
||||||
|
file=file)
|
||||||
for zinfo in self.filelist:
|
for zinfo in self.filelist:
|
||||||
date = "%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time
|
date = "%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time
|
||||||
print("%-46s %s %12d" % (zinfo.filename, date, zinfo.file_size))
|
print("%-46s %s %12d" % (zinfo.filename, date, zinfo.file_size),
|
||||||
|
file=file)
|
||||||
|
|
||||||
def testzip(self):
|
def testzip(self):
|
||||||
"""Read all the files and check the CRC."""
|
"""Read all the files and check the CRC."""
|
||||||
|
|
@ -730,6 +735,7 @@ class ZipFile:
|
||||||
|
|
||||||
def setpassword(self, pwd):
|
def setpassword(self, pwd):
|
||||||
"""Set default password for encrypted files."""
|
"""Set default password for encrypted files."""
|
||||||
|
assert isinstance(pwd, bytes)
|
||||||
self.pwd = pwd
|
self.pwd = pwd
|
||||||
|
|
||||||
def read(self, name, pwd=None):
|
def read(self, name, pwd=None):
|
||||||
|
|
@ -749,7 +755,7 @@ class ZipFile:
|
||||||
if self._filePassed:
|
if self._filePassed:
|
||||||
zef_file = self.fp
|
zef_file = self.fp
|
||||||
else:
|
else:
|
||||||
zef_file = open(self.filename, 'rb')
|
zef_file = io.open(self.filename, 'rb')
|
||||||
|
|
||||||
# Get info object for name
|
# Get info object for name
|
||||||
zinfo = self.getinfo(name)
|
zinfo = self.getinfo(name)
|
||||||
|
|
@ -768,9 +774,9 @@ class ZipFile:
|
||||||
if fheader[_FH_EXTRA_FIELD_LENGTH]:
|
if fheader[_FH_EXTRA_FIELD_LENGTH]:
|
||||||
zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH])
|
zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH])
|
||||||
|
|
||||||
if fname != zinfo.orig_filename:
|
if fname != zinfo.orig_filename.encode("utf-8"):
|
||||||
raise BadZipfile, \
|
raise BadZipfile, \
|
||||||
'File name in directory "%s" and header "%s" differ.' % (
|
'File name in directory %r and header %r differ.' % (
|
||||||
zinfo.orig_filename, fname)
|
zinfo.orig_filename, fname)
|
||||||
|
|
||||||
# check for encrypted flag & handle password
|
# check for encrypted flag & handle password
|
||||||
|
|
@ -790,7 +796,7 @@ class ZipFile:
|
||||||
# and is used to check the correctness of the password.
|
# and is used to check the correctness of the password.
|
||||||
bytes = zef_file.read(12)
|
bytes = zef_file.read(12)
|
||||||
h = map(zd, bytes[0:12])
|
h = map(zd, bytes[0:12])
|
||||||
if ord(h[11]) != ((zinfo.CRC>>24)&255):
|
if h[11] != ((zinfo.CRC>>24) & 255):
|
||||||
raise RuntimeError, "Bad password for file %s" % name
|
raise RuntimeError, "Bad password for file %s" % name
|
||||||
|
|
||||||
# build and return a ZipExtFile
|
# build and return a ZipExtFile
|
||||||
|
|
@ -852,7 +858,7 @@ class ZipFile:
|
||||||
|
|
||||||
self._writecheck(zinfo)
|
self._writecheck(zinfo)
|
||||||
self._didModify = True
|
self._didModify = True
|
||||||
fp = open(filename, "rb")
|
fp = io.open(filename, "rb")
|
||||||
# Must overwrite CRC and sizes with correct data later
|
# Must overwrite CRC and sizes with correct data later
|
||||||
zinfo.CRC = CRC = 0
|
zinfo.CRC = CRC = 0
|
||||||
zinfo.compress_size = compress_size = 0
|
zinfo.compress_size = compress_size = 0
|
||||||
|
|
@ -982,7 +988,7 @@ class ZipFile:
|
||||||
0, zinfo.internal_attr, zinfo.external_attr,
|
0, zinfo.internal_attr, zinfo.external_attr,
|
||||||
header_offset)
|
header_offset)
|
||||||
self.fp.write(centdir)
|
self.fp.write(centdir)
|
||||||
self.fp.write(zinfo.filename)
|
self.fp.write(zinfo.filename.encode("utf-8"))
|
||||||
self.fp.write(extra_data)
|
self.fp.write(extra_data)
|
||||||
self.fp.write(zinfo.comment)
|
self.fp.write(zinfo.comment)
|
||||||
|
|
||||||
|
|
@ -1163,7 +1169,7 @@ def main(args = None):
|
||||||
tgtdir = os.path.dirname(tgt)
|
tgtdir = os.path.dirname(tgt)
|
||||||
if not os.path.exists(tgtdir):
|
if not os.path.exists(tgtdir):
|
||||||
os.makedirs(tgtdir)
|
os.makedirs(tgtdir)
|
||||||
fp = open(tgt, 'wb')
|
fp = io.open(tgt, 'wb')
|
||||||
fp.write(zf.read(path))
|
fp.write(zf.read(path))
|
||||||
fp.close()
|
fp.close()
|
||||||
zf.close()
|
zf.close()
|
||||||
|
|
|
||||||
|
|
@ -1635,27 +1635,57 @@ s_pack_internal(PyStructObject *soself, PyObject *args, int offset, char* buf)
|
||||||
const formatdef *e = code->fmtdef;
|
const formatdef *e = code->fmtdef;
|
||||||
char *res = buf + code->offset;
|
char *res = buf + code->offset;
|
||||||
if (e->format == 's') {
|
if (e->format == 's') {
|
||||||
if (!PyString_Check(v)) {
|
int isstring;
|
||||||
|
void *p;
|
||||||
|
if (PyUnicode_Check(v)) {
|
||||||
|
v = _PyUnicode_AsDefaultEncodedString(v, NULL);
|
||||||
|
if (v == NULL)
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
isstring = PyString_Check(v);
|
||||||
|
if (!isstring && !PyBytes_Check(v)) {
|
||||||
PyErr_SetString(StructError,
|
PyErr_SetString(StructError,
|
||||||
"argument for 's' must be a string");
|
"argument for 's' must be a string");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
n = PyString_GET_SIZE(v);
|
if (isstring) {
|
||||||
|
n = PyString_GET_SIZE(v);
|
||||||
|
p = PyString_AS_STRING(v);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
n = PyBytes_GET_SIZE(v);
|
||||||
|
p = PyBytes_AS_STRING(v);
|
||||||
|
}
|
||||||
if (n > code->size)
|
if (n > code->size)
|
||||||
n = code->size;
|
n = code->size;
|
||||||
if (n > 0)
|
if (n > 0)
|
||||||
memcpy(res, PyString_AS_STRING(v), n);
|
memcpy(res, p, n);
|
||||||
} else if (e->format == 'p') {
|
} else if (e->format == 'p') {
|
||||||
if (!PyString_Check(v)) {
|
int isstring;
|
||||||
|
void *p;
|
||||||
|
if (PyUnicode_Check(v)) {
|
||||||
|
v = _PyUnicode_AsDefaultEncodedString(v, NULL);
|
||||||
|
if (v == NULL)
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
isstring = PyString_Check(v);
|
||||||
|
if (!isstring && !PyBytes_Check(v)) {
|
||||||
PyErr_SetString(StructError,
|
PyErr_SetString(StructError,
|
||||||
"argument for 'p' must be a string");
|
"argument for 'p' must be a string");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
n = PyString_GET_SIZE(v);
|
if (isstring) {
|
||||||
|
n = PyString_GET_SIZE(v);
|
||||||
|
p = PyString_AS_STRING(v);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
n = PyBytes_GET_SIZE(v);
|
||||||
|
p = PyBytes_AS_STRING(v);
|
||||||
|
}
|
||||||
if (n > (code->size - 1))
|
if (n > (code->size - 1))
|
||||||
n = code->size - 1;
|
n = code->size - 1;
|
||||||
if (n > 0)
|
if (n > 0)
|
||||||
memcpy(res + 1, PyString_AS_STRING(v), n);
|
memcpy(res + 1, p, n);
|
||||||
if (n > 255)
|
if (n > 255)
|
||||||
n = 255;
|
n = 255;
|
||||||
*res = Py_SAFE_DOWNCAST(n, Py_ssize_t, unsigned char);
|
*res = Py_SAFE_DOWNCAST(n, Py_ssize_t, unsigned char);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue