mirror of
https://github.com/python/cpython.git
synced 2025-11-25 21:11:09 +00:00
Issue #26039: zipfile.ZipFile.open() can now be used to write data into a ZIP
file, as well as for extracting data. Patch by Thomas Kluyver.
This commit is contained in:
parent
5d1110a952
commit
18ee29d0b8
5 changed files with 295 additions and 127 deletions
|
|
@ -61,6 +61,9 @@ class AbstractTestsWithSourceFile:
|
|||
zipfp.write(TESTFN, "another.name")
|
||||
zipfp.write(TESTFN, TESTFN)
|
||||
zipfp.writestr("strfile", self.data)
|
||||
with zipfp.open('written-open-w', mode='w') as f:
|
||||
for line in self.line_gen:
|
||||
f.write(line)
|
||||
|
||||
def zip_test(self, f, compression):
|
||||
self.make_test_archive(f, compression)
|
||||
|
|
@ -76,7 +79,7 @@ class AbstractTestsWithSourceFile:
|
|||
zipfp.printdir(file=fp)
|
||||
directory = fp.getvalue()
|
||||
lines = directory.splitlines()
|
||||
self.assertEqual(len(lines), 4) # Number of files + header
|
||||
self.assertEqual(len(lines), 5) # Number of files + header
|
||||
|
||||
self.assertIn('File Name', lines[0])
|
||||
self.assertIn('Modified', lines[0])
|
||||
|
|
@ -90,23 +93,25 @@ class AbstractTestsWithSourceFile:
|
|||
|
||||
# Check the namelist
|
||||
names = zipfp.namelist()
|
||||
self.assertEqual(len(names), 3)
|
||||
self.assertEqual(len(names), 4)
|
||||
self.assertIn(TESTFN, names)
|
||||
self.assertIn("another.name", names)
|
||||
self.assertIn("strfile", names)
|
||||
self.assertIn("written-open-w", names)
|
||||
|
||||
# Check infolist
|
||||
infos = zipfp.infolist()
|
||||
names = [i.filename for i in infos]
|
||||
self.assertEqual(len(names), 3)
|
||||
self.assertEqual(len(names), 4)
|
||||
self.assertIn(TESTFN, names)
|
||||
self.assertIn("another.name", names)
|
||||
self.assertIn("strfile", names)
|
||||
self.assertIn("written-open-w", names)
|
||||
for i in infos:
|
||||
self.assertEqual(i.file_size, len(self.data))
|
||||
|
||||
# check getinfo
|
||||
for nm in (TESTFN, "another.name", "strfile"):
|
||||
for nm in (TESTFN, "another.name", "strfile", "written-open-w"):
|
||||
info = zipfp.getinfo(nm)
|
||||
self.assertEqual(info.filename, nm)
|
||||
self.assertEqual(info.file_size, len(self.data))
|
||||
|
|
@ -372,14 +377,18 @@ class StoredTestsWithSourceFile(AbstractTestsWithSourceFile,
|
|||
test_low_compression = None
|
||||
|
||||
def zip_test_writestr_permissions(self, f, compression):
|
||||
# Make sure that writestr creates files with mode 0600,
|
||||
# when it is passed a name rather than a ZipInfo instance.
|
||||
# Make sure that writestr and open(... mode='w') create files with
|
||||
# mode 0600, when they are passed a name rather than a ZipInfo
|
||||
# instance.
|
||||
|
||||
self.make_test_archive(f, compression)
|
||||
with zipfile.ZipFile(f, "r") as zipfp:
|
||||
zinfo = zipfp.getinfo('strfile')
|
||||
self.assertEqual(zinfo.external_attr, 0o600 << 16)
|
||||
|
||||
zinfo2 = zipfp.getinfo('written-open-w')
|
||||
self.assertEqual(zinfo2.external_attr, 0o600 << 16)
|
||||
|
||||
def test_writestr_permissions(self):
|
||||
for f in get_files(self):
|
||||
self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
|
||||
|
|
@ -451,6 +460,10 @@ class StoredTestsWithSourceFile(AbstractTestsWithSourceFile,
|
|||
with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
|
||||
self.assertRaises(RuntimeError, zipfp.write, TESTFN)
|
||||
|
||||
with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
|
||||
with self.assertRaises(RuntimeError):
|
||||
zipfp.open(TESTFN, mode='w')
|
||||
|
||||
def test_add_file_before_1980(self):
|
||||
# Set atime and mtime to 1970-01-01
|
||||
os.utime(TESTFN, (0, 0))
|
||||
|
|
@ -1428,6 +1441,35 @@ class OtherTests(unittest.TestCase):
|
|||
# testzip returns the name of the first corrupt file, or None
|
||||
self.assertIsNone(zipf.testzip())
|
||||
|
||||
def test_open_conflicting_handles(self):
|
||||
# It's only possible to open one writable file handle at a time
|
||||
msg1 = b"It's fun to charter an accountant!"
|
||||
msg2 = b"And sail the wide accountant sea"
|
||||
msg3 = b"To find, explore the funds offshore"
|
||||
with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf:
|
||||
with zipf.open('foo', mode='w') as w2:
|
||||
w2.write(msg1)
|
||||
with zipf.open('bar', mode='w') as w1:
|
||||
with self.assertRaises(RuntimeError):
|
||||
zipf.open('handle', mode='w')
|
||||
with self.assertRaises(RuntimeError):
|
||||
zipf.open('foo', mode='r')
|
||||
with self.assertRaises(RuntimeError):
|
||||
zipf.writestr('str', 'abcde')
|
||||
with self.assertRaises(RuntimeError):
|
||||
zipf.write(__file__, 'file')
|
||||
with self.assertRaises(RuntimeError):
|
||||
zipf.close()
|
||||
w1.write(msg2)
|
||||
with zipf.open('baz', mode='w') as w2:
|
||||
w2.write(msg3)
|
||||
|
||||
with zipfile.ZipFile(TESTFN2, 'r') as zipf:
|
||||
self.assertEqual(zipf.read('foo'), msg1)
|
||||
self.assertEqual(zipf.read('bar'), msg2)
|
||||
self.assertEqual(zipf.read('baz'), msg3)
|
||||
self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz'])
|
||||
|
||||
def tearDown(self):
|
||||
unlink(TESTFN)
|
||||
unlink(TESTFN2)
|
||||
|
|
@ -1761,6 +1803,22 @@ class UnseekableTests(unittest.TestCase):
|
|||
with zipf.open('twos') as zopen:
|
||||
self.assertEqual(zopen.read(), b'222')
|
||||
|
||||
def test_open_write(self):
|
||||
for wrapper in (lambda f: f), Tellable, Unseekable:
|
||||
with self.subTest(wrapper=wrapper):
|
||||
f = io.BytesIO()
|
||||
f.write(b'abc')
|
||||
bf = io.BufferedWriter(f)
|
||||
with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipf:
|
||||
with zipf.open('ones', 'w') as zopen:
|
||||
zopen.write(b'111')
|
||||
with zipf.open('twos', 'w') as zopen:
|
||||
zopen.write(b'222')
|
||||
self.assertEqual(f.getvalue()[:5], b'abcPK')
|
||||
with zipfile.ZipFile(f) as zipf:
|
||||
self.assertEqual(zipf.read('ones'), b'111')
|
||||
self.assertEqual(zipf.read('twos'), b'222')
|
||||
|
||||
|
||||
@requires_zlib
|
||||
class TestsWithMultipleOpens(unittest.TestCase):
|
||||
|
|
@ -1870,6 +1928,19 @@ class TestsWithMultipleOpens(unittest.TestCase):
|
|||
with open(os.devnull) as f:
|
||||
self.assertLess(f.fileno(), 100)
|
||||
|
||||
def test_write_while_reading(self):
|
||||
with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf:
|
||||
zipf.writestr('ones', self.data1)
|
||||
with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf:
|
||||
with zipf.open('ones', 'r') as r1:
|
||||
data1 = r1.read(500)
|
||||
with zipf.open('twos', 'w') as w1:
|
||||
w1.write(self.data2)
|
||||
data1 += r1.read()
|
||||
self.assertEqual(data1, self.data1)
|
||||
with zipfile.ZipFile(TESTFN2) as zipf:
|
||||
self.assertEqual(zipf.read('twos'), self.data2)
|
||||
|
||||
def tearDown(self):
|
||||
unlink(TESTFN2)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue