mirror of
https://github.com/python/cpython.git
synced 2025-07-29 06:05:00 +00:00
Issue #7610: Reworked implementation of the internal
:class:`zipfile.ZipExtFile` class used to represent files stored inside an archive. The new implementation is significantly faster and can be wrapped in a :class:`io.BufferedReader` object for more speedups. It also solves an issue where interleaved calls to `read()` and `readline()` give wrong results. Patch by Nir Aides.
This commit is contained in:
parent
7b4e02c862
commit
94c33ebfa8
3 changed files with 227 additions and 171 deletions
|
@ -172,6 +172,45 @@ class TestsWithSourceFile(unittest.TestCase):
|
|||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
||||
self.zip_random_open_test(f, zipfile.ZIP_STORED)
|
||||
|
||||
def test_univeral_readaheads(self):
|
||||
f = StringIO()
|
||||
|
||||
data = 'a\r\n' * 16 * 1024
|
||||
zipfp = zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED)
|
||||
zipfp.writestr(TESTFN, data)
|
||||
zipfp.close()
|
||||
|
||||
data2 = ''
|
||||
zipfp = zipfile.ZipFile(f, 'r')
|
||||
zipopen = zipfp.open(TESTFN, 'rU')
|
||||
for line in zipopen:
|
||||
data2 += line
|
||||
zipfp.close()
|
||||
|
||||
self.assertEqual(data, data2.replace('\n', '\r\n'))
|
||||
|
||||
def zip_readline_read_test(self, f, compression):
|
||||
self.make_test_archive(f, compression)
|
||||
|
||||
# Read the ZIP archive
|
||||
zipfp = zipfile.ZipFile(f, "r")
|
||||
zipopen = zipfp.open(TESTFN)
|
||||
|
||||
data = ''
|
||||
while True:
|
||||
read = zipopen.readline()
|
||||
if not read:
|
||||
break
|
||||
data += read
|
||||
|
||||
read = zipopen.read(100)
|
||||
if not read:
|
||||
break
|
||||
data += read
|
||||
|
||||
self.assertEqual(data, self.data)
|
||||
zipfp.close()
|
||||
|
||||
def zip_readline_test(self, f, compression):
|
||||
self.make_test_archive(f, compression)
|
||||
|
||||
|
@ -199,6 +238,11 @@ class TestsWithSourceFile(unittest.TestCase):
|
|||
for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)):
|
||||
self.assertEqual(zipline, line + '\n')
|
||||
|
||||
def test_readline_read_stored(self):
|
||||
# Issue #7610: calls to readline() interleaved with calls to read().
|
||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
||||
self.zip_readline_read_test(f, zipfile.ZIP_STORED)
|
||||
|
||||
def test_readline_stored(self):
|
||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
||||
self.zip_readline_test(f, zipfile.ZIP_STORED)
|
||||
|
@ -226,6 +270,12 @@ class TestsWithSourceFile(unittest.TestCase):
|
|||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
||||
self.zip_random_open_test(f, zipfile.ZIP_DEFLATED)
|
||||
|
||||
@skipUnless(zlib, "requires zlib")
|
||||
def test_readline_read_deflated(self):
|
||||
# Issue #7610: calls to readline() interleaved with calls to read().
|
||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
||||
self.zip_readline_read_test(f, zipfile.ZIP_DEFLATED)
|
||||
|
||||
@skipUnless(zlib, "requires zlib")
|
||||
def test_readline_deflated(self):
|
||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
||||
|
@ -1058,6 +1108,29 @@ class UniversalNewlineTests(unittest.TestCase):
|
|||
zipdata = zipfp.open(fn, "rU").read()
|
||||
self.assertEqual(self.arcdata[sep], zipdata)
|
||||
|
||||
def readline_read_test(self, f, compression):
|
||||
self.make_test_archive(f, compression)
|
||||
|
||||
# Read the ZIP archive
|
||||
zipfp = zipfile.ZipFile(f, "r")
|
||||
for sep, fn in self.arcfiles.items():
|
||||
zipopen = zipfp.open(fn, "rU")
|
||||
data = ''
|
||||
while True:
|
||||
read = zipopen.readline()
|
||||
if not read:
|
||||
break
|
||||
data += read
|
||||
|
||||
read = zipopen.read(5)
|
||||
if not read:
|
||||
break
|
||||
data += read
|
||||
|
||||
self.assertEqual(data, self.arcdata['\n'])
|
||||
|
||||
zipfp.close()
|
||||
|
||||
def readline_test(self, f, compression):
|
||||
self.make_test_archive(f, compression)
|
||||
|
||||
|
@ -1092,6 +1165,11 @@ class UniversalNewlineTests(unittest.TestCase):
|
|||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
||||
self.read_test(f, zipfile.ZIP_STORED)
|
||||
|
||||
def test_readline_read_stored(self):
|
||||
# Issue #7610: calls to readline() interleaved with calls to read().
|
||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
||||
self.readline_read_test(f, zipfile.ZIP_STORED)
|
||||
|
||||
def test_readline_stored(self):
|
||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
||||
self.readline_test(f, zipfile.ZIP_STORED)
|
||||
|
@ -1109,6 +1187,12 @@ class UniversalNewlineTests(unittest.TestCase):
|
|||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
||||
self.read_test(f, zipfile.ZIP_DEFLATED)
|
||||
|
||||
@skipUnless(zlib, "requires zlib")
|
||||
def test_readline_read_deflated(self):
|
||||
# Issue #7610: calls to readline() interleaved with calls to read().
|
||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
||||
self.readline_read_test(f, zipfile.ZIP_DEFLATED)
|
||||
|
||||
@skipUnless(zlib, "requires zlib")
|
||||
def test_readline_deflated(self):
|
||||
for f in (TESTFN2, TemporaryFile(), StringIO()):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue