mirror of
https://github.com/python/cpython.git
synced 2025-09-26 18:29:57 +00:00
bpo-4833: Add ZipFile.mkdir (GH-32160)
This commit is contained in:
parent
9e88b572fb
commit
050a8f94c6
4 changed files with 101 additions and 17 deletions
|
@ -478,6 +478,17 @@ ZipFile Objects
|
||||||
a closed ZipFile will raise a :exc:`ValueError`. Previously,
|
a closed ZipFile will raise a :exc:`ValueError`. Previously,
|
||||||
a :exc:`RuntimeError` was raised.
|
a :exc:`RuntimeError` was raised.
|
||||||
|
|
||||||
|
.. method:: ZipFile.mkdir(zinfo_or_directory, mode=511)
|
||||||
|
|
||||||
|
Create a directory inside the archive. If *zinfo_or_directory* is a string,
|
||||||
|
a directory is created inside the archive with the mode that is specified in
|
||||||
|
the *mode* argument. If, however, *zinfo_or_directory* is
|
||||||
|
a :class:`ZipInfo` instance then the *mode* argument is ignored.
|
||||||
|
|
||||||
|
The archive must be opened with mode ``'w'``, ``'x'`` or ``'a'``.
|
||||||
|
|
||||||
|
.. versionadded:: 3.11
|
||||||
|
|
||||||
|
|
||||||
The following data attributes are also available:
|
The following data attributes are also available:
|
||||||
|
|
||||||
|
|
|
@ -2637,6 +2637,59 @@ class TestWithDirectory(unittest.TestCase):
|
||||||
self.assertTrue(os.path.isdir(os.path.join(target, "x")))
|
self.assertTrue(os.path.isdir(os.path.join(target, "x")))
|
||||||
self.assertEqual(os.listdir(target), ["x"])
|
self.assertEqual(os.listdir(target), ["x"])
|
||||||
|
|
||||||
|
def test_mkdir(self):
|
||||||
|
with zipfile.ZipFile(TESTFN, "w") as zf:
|
||||||
|
zf.mkdir("directory")
|
||||||
|
zinfo = zf.filelist[0]
|
||||||
|
self.assertEqual(zinfo.filename, "directory/")
|
||||||
|
self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10)
|
||||||
|
|
||||||
|
zf.mkdir("directory2/")
|
||||||
|
zinfo = zf.filelist[1]
|
||||||
|
self.assertEqual(zinfo.filename, "directory2/")
|
||||||
|
self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10)
|
||||||
|
|
||||||
|
zf.mkdir("directory3", mode=0o777)
|
||||||
|
zinfo = zf.filelist[2]
|
||||||
|
self.assertEqual(zinfo.filename, "directory3/")
|
||||||
|
self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10)
|
||||||
|
|
||||||
|
old_zinfo = zipfile.ZipInfo("directory4/")
|
||||||
|
old_zinfo.external_attr = (0o40777 << 16) | 0x10
|
||||||
|
old_zinfo.CRC = 0
|
||||||
|
old_zinfo.file_size = 0
|
||||||
|
old_zinfo.compress_size = 0
|
||||||
|
zf.mkdir(old_zinfo)
|
||||||
|
new_zinfo = zf.filelist[3]
|
||||||
|
self.assertEqual(old_zinfo.filename, "directory4/")
|
||||||
|
self.assertEqual(old_zinfo.external_attr, new_zinfo.external_attr)
|
||||||
|
|
||||||
|
target = os.path.join(TESTFN2, "target")
|
||||||
|
os.mkdir(target)
|
||||||
|
zf.extractall(target)
|
||||||
|
self.assertEqual(set(os.listdir(target)), {"directory", "directory2", "directory3", "directory4"})
|
||||||
|
|
||||||
|
def test_create_directory_with_write(self):
|
||||||
|
with zipfile.ZipFile(TESTFN, "w") as zf:
|
||||||
|
zf.writestr(zipfile.ZipInfo('directory/'), '')
|
||||||
|
|
||||||
|
zinfo = zf.filelist[0]
|
||||||
|
self.assertEqual(zinfo.filename, "directory/")
|
||||||
|
|
||||||
|
directory = os.path.join(TESTFN2, "directory2")
|
||||||
|
os.mkdir(directory)
|
||||||
|
mode = os.stat(directory).st_mode
|
||||||
|
zf.write(directory, arcname="directory2/")
|
||||||
|
zinfo = zf.filelist[1]
|
||||||
|
self.assertEqual(zinfo.filename, "directory2/")
|
||||||
|
self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
|
||||||
|
|
||||||
|
target = os.path.join(TESTFN2, "target")
|
||||||
|
os.mkdir(target)
|
||||||
|
zf.extractall(target)
|
||||||
|
|
||||||
|
self.assertEqual(set(os.listdir(target)), {"directory", "directory2"})
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
rmtree(TESTFN2)
|
rmtree(TESTFN2)
|
||||||
if os.path.exists(TESTFN):
|
if os.path.exists(TESTFN):
|
||||||
|
|
|
@ -1772,6 +1772,7 @@ class ZipFile:
|
||||||
if zinfo.is_dir():
|
if zinfo.is_dir():
|
||||||
zinfo.compress_size = 0
|
zinfo.compress_size = 0
|
||||||
zinfo.CRC = 0
|
zinfo.CRC = 0
|
||||||
|
self.mkdir(zinfo)
|
||||||
else:
|
else:
|
||||||
if compress_type is not None:
|
if compress_type is not None:
|
||||||
zinfo.compress_type = compress_type
|
zinfo.compress_type = compress_type
|
||||||
|
@ -1783,23 +1784,6 @@ class ZipFile:
|
||||||
else:
|
else:
|
||||||
zinfo._compresslevel = self.compresslevel
|
zinfo._compresslevel = self.compresslevel
|
||||||
|
|
||||||
if zinfo.is_dir():
|
|
||||||
with self._lock:
|
|
||||||
if self._seekable:
|
|
||||||
self.fp.seek(self.start_dir)
|
|
||||||
zinfo.header_offset = self.fp.tell() # Start of header bytes
|
|
||||||
if zinfo.compress_type == ZIP_LZMA:
|
|
||||||
# Compressed data includes an end-of-stream (EOS) marker
|
|
||||||
zinfo.flag_bits |= _MASK_COMPRESS_OPTION_1
|
|
||||||
|
|
||||||
self._writecheck(zinfo)
|
|
||||||
self._didModify = True
|
|
||||||
|
|
||||||
self.filelist.append(zinfo)
|
|
||||||
self.NameToInfo[zinfo.filename] = zinfo
|
|
||||||
self.fp.write(zinfo.FileHeader(False))
|
|
||||||
self.start_dir = self.fp.tell()
|
|
||||||
else:
|
|
||||||
with open(filename, "rb") as src, self.open(zinfo, 'w') as dest:
|
with open(filename, "rb") as src, self.open(zinfo, 'w') as dest:
|
||||||
shutil.copyfileobj(src, dest, 1024*8)
|
shutil.copyfileobj(src, dest, 1024*8)
|
||||||
|
|
||||||
|
@ -1844,6 +1828,41 @@ class ZipFile:
|
||||||
with self.open(zinfo, mode='w') as dest:
|
with self.open(zinfo, mode='w') as dest:
|
||||||
dest.write(data)
|
dest.write(data)
|
||||||
|
|
||||||
|
def mkdir(self, zinfo_or_directory_name, mode=511):
|
||||||
|
"""Creates a directory inside the zip archive."""
|
||||||
|
if isinstance(zinfo_or_directory_name, ZipInfo):
|
||||||
|
zinfo = zinfo_or_directory_name
|
||||||
|
if not zinfo.is_dir():
|
||||||
|
raise ValueError("The given ZipInfo does not describe a directory")
|
||||||
|
elif isinstance(zinfo_or_directory_name, str):
|
||||||
|
directory_name = zinfo_or_directory_name
|
||||||
|
if not directory_name.endswith("/"):
|
||||||
|
directory_name += "/"
|
||||||
|
zinfo = ZipInfo(directory_name)
|
||||||
|
zinfo.compress_size = 0
|
||||||
|
zinfo.CRC = 0
|
||||||
|
zinfo.external_attr = ((0o40000 | mode) & 0xFFFF) << 16
|
||||||
|
zinfo.file_size = 0
|
||||||
|
zinfo.external_attr |= 0x10
|
||||||
|
else:
|
||||||
|
raise TypeError("Expected type str or ZipInfo")
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
if self._seekable:
|
||||||
|
self.fp.seek(self.start_dir)
|
||||||
|
zinfo.header_offset = self.fp.tell() # Start of header bytes
|
||||||
|
if zinfo.compress_type == ZIP_LZMA:
|
||||||
|
# Compressed data includes an end-of-stream (EOS) marker
|
||||||
|
zinfo.flag_bits |= _MASK_COMPRESS_OPTION_1
|
||||||
|
|
||||||
|
self._writecheck(zinfo)
|
||||||
|
self._didModify = True
|
||||||
|
|
||||||
|
self.filelist.append(zinfo)
|
||||||
|
self.NameToInfo[zinfo.filename] = zinfo
|
||||||
|
self.fp.write(zinfo.FileHeader(False))
|
||||||
|
self.start_dir = self.fp.tell()
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
"""Call the "close()" method in case the user forgot."""
|
"""Call the "close()" method in case the user forgot."""
|
||||||
self.close()
|
self.close()
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Add :meth:`ZipFile.mkdir`
|
Loading…
Add table
Add a link
Reference in a new issue