mirror of
https://github.com/python/cpython.git
synced 2025-11-02 11:08:57 +00:00
Issue #14099: ZipFile.open() no longer reopen the underlying file. Objects
returned by ZipFile.open() can now operate independently of the ZipFile even if the ZipFile was created by passing in a file-like object as the first argument to the constructor.
This commit is contained in:
parent
d87de83582
commit
1ad088f3ea
4 changed files with 166 additions and 73 deletions
|
|
@ -1,3 +1,4 @@
|
|||
import contextlib
|
||||
import io
|
||||
import os
|
||||
import sys
|
||||
|
|
@ -25,6 +26,9 @@ SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
|
|||
('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
|
||||
('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
|
||||
|
||||
def getrandbytes(size):
|
||||
return getrandbits(8 * size).to_bytes(size, 'little')
|
||||
|
||||
def get_files(test):
|
||||
yield TESTFN2
|
||||
with TemporaryFile() as f:
|
||||
|
|
@ -289,7 +293,7 @@ class AbstractTestsWithSourceFile:
|
|||
# than requested.
|
||||
for test_size in (1, 4095, 4096, 4097, 16384):
|
||||
file_size = test_size + 1
|
||||
junk = getrandbits(8 * file_size).to_bytes(file_size, 'little')
|
||||
junk = getrandbytes(file_size)
|
||||
with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf:
|
||||
zipf.writestr('foo', junk)
|
||||
with zipf.open('foo', 'r') as fp:
|
||||
|
|
@ -1635,46 +1639,111 @@ class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
|
|||
|
||||
@requires_zlib
|
||||
class TestsWithMultipleOpens(unittest.TestCase):
|
||||
def setUp(self):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.data1 = b'111' + getrandbytes(10000)
|
||||
cls.data2 = b'222' + getrandbytes(10000)
|
||||
|
||||
def make_test_archive(self, f):
|
||||
# Create the ZIP archive
|
||||
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
|
||||
zipfp.writestr('ones', '1'*FIXEDTEST_SIZE)
|
||||
zipfp.writestr('twos', '2'*FIXEDTEST_SIZE)
|
||||
with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp:
|
||||
zipfp.writestr('ones', self.data1)
|
||||
zipfp.writestr('twos', self.data2)
|
||||
|
||||
def test_same_file(self):
|
||||
# Verify that (when the ZipFile is in control of creating file objects)
|
||||
# multiple open() calls can be made without interfering with each other.
|
||||
with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
|
||||
with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
|
||||
data1 = zopen1.read(500)
|
||||
data2 = zopen2.read(500)
|
||||
data1 += zopen1.read(500)
|
||||
data2 += zopen2.read(500)
|
||||
self.assertEqual(data1, data2)
|
||||
for f in get_files(self):
|
||||
self.make_test_archive(f)
|
||||
with zipfile.ZipFile(f, mode="r") as zipf:
|
||||
with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
|
||||
data1 = zopen1.read(500)
|
||||
data2 = zopen2.read(500)
|
||||
data1 += zopen1.read()
|
||||
data2 += zopen2.read()
|
||||
self.assertEqual(data1, data2)
|
||||
self.assertEqual(data1, self.data1)
|
||||
|
||||
def test_different_file(self):
|
||||
# Verify that (when the ZipFile is in control of creating file objects)
|
||||
# multiple open() calls can be made without interfering with each other.
|
||||
with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
|
||||
with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
|
||||
data1 = zopen1.read(500)
|
||||
data2 = zopen2.read(500)
|
||||
data1 += zopen1.read(500)
|
||||
data2 += zopen2.read(500)
|
||||
self.assertEqual(data1, b'1'*FIXEDTEST_SIZE)
|
||||
self.assertEqual(data2, b'2'*FIXEDTEST_SIZE)
|
||||
for f in get_files(self):
|
||||
self.make_test_archive(f)
|
||||
with zipfile.ZipFile(f, mode="r") as zipf:
|
||||
with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
|
||||
data1 = zopen1.read(500)
|
||||
data2 = zopen2.read(500)
|
||||
data1 += zopen1.read()
|
||||
data2 += zopen2.read()
|
||||
self.assertEqual(data1, self.data1)
|
||||
self.assertEqual(data2, self.data2)
|
||||
|
||||
def test_interleaved(self):
|
||||
# Verify that (when the ZipFile is in control of creating file objects)
|
||||
# multiple open() calls can be made without interfering with each other.
|
||||
with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
|
||||
with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
|
||||
for f in get_files(self):
|
||||
self.make_test_archive(f)
|
||||
with zipfile.ZipFile(f, mode="r") as zipf:
|
||||
with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
|
||||
data1 = zopen1.read(500)
|
||||
data2 = zopen2.read(500)
|
||||
data1 += zopen1.read()
|
||||
data2 += zopen2.read()
|
||||
self.assertEqual(data1, self.data1)
|
||||
self.assertEqual(data2, self.data2)
|
||||
|
||||
def test_read_after_close(self):
|
||||
for f in get_files(self):
|
||||
self.make_test_archive(f)
|
||||
with contextlib.ExitStack() as stack:
|
||||
with zipfile.ZipFile(f, 'r') as zipf:
|
||||
zopen1 = stack.enter_context(zipf.open('ones'))
|
||||
zopen2 = stack.enter_context(zipf.open('twos'))
|
||||
data1 = zopen1.read(500)
|
||||
data2 = zopen2.read(500)
|
||||
data1 += zopen1.read(500)
|
||||
data2 += zopen2.read(500)
|
||||
self.assertEqual(data1, b'1'*FIXEDTEST_SIZE)
|
||||
self.assertEqual(data2, b'2'*FIXEDTEST_SIZE)
|
||||
data1 += zopen1.read()
|
||||
data2 += zopen2.read()
|
||||
self.assertEqual(data1, self.data1)
|
||||
self.assertEqual(data2, self.data2)
|
||||
|
||||
def test_read_after_write(self):
|
||||
for f in get_files(self):
|
||||
with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf:
|
||||
zipf.writestr('ones', self.data1)
|
||||
zipf.writestr('twos', self.data2)
|
||||
with zipf.open('ones') as zopen1:
|
||||
data1 = zopen1.read(500)
|
||||
self.assertEqual(data1, self.data1[:500])
|
||||
with zipfile.ZipFile(f, 'r') as zipf:
|
||||
data1 = zipf.read('ones')
|
||||
data2 = zipf.read('twos')
|
||||
self.assertEqual(data1, self.data1)
|
||||
self.assertEqual(data2, self.data2)
|
||||
|
||||
def test_write_after_read(self):
|
||||
for f in get_files(self):
|
||||
with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf:
|
||||
zipf.writestr('ones', self.data1)
|
||||
with zipf.open('ones') as zopen1:
|
||||
zopen1.read(500)
|
||||
zipf.writestr('twos', self.data2)
|
||||
with zipfile.ZipFile(f, 'r') as zipf:
|
||||
data1 = zipf.read('ones')
|
||||
data2 = zipf.read('twos')
|
||||
self.assertEqual(data1, self.data1)
|
||||
self.assertEqual(data2, self.data2)
|
||||
|
||||
def test_many_opens(self):
|
||||
# Verify that read() and open() promptly close the file descriptor,
|
||||
# and don't rely on the garbage collector to free resources.
|
||||
self.make_test_archive(TESTFN2)
|
||||
with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
|
||||
for x in range(100):
|
||||
zipf.read('ones')
|
||||
with zipf.open('ones') as zopen1:
|
||||
pass
|
||||
with open(os.devnull) as f:
|
||||
self.assertLess(f.fileno(), 100)
|
||||
|
||||
def tearDown(self):
|
||||
unlink(TESTFN2)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue