mirror of
https://github.com/python/cpython.git
synced 2025-08-16 14:50:43 +00:00
bpo-37772: fix zipfile.Path.iterdir() outputs (GH-15170) (#15461)
* fix Path._add_implied_dirs to include all implied directories
* fix Path._add_implied_dirs to include all implied directories
* Optimize code by using sets instead of lists
* 📜🤖 Added by blurb_it.
* fix Path._add_implied_dirs to include all implied directories
* Optimize code by using sets instead of lists
* 📜🤖 Added by blurb_it.
* Add tests to zipfile.Path.iterdir() fix
* Update test for zipfile.Path.iterdir()
* remove whitespace from test file
* Rewrite NEWS blurb to describe the user-facing impact and avoid implementation details.
* remove redundant [] within set comprehension
* Update to use unique_everseen to maintain order and other suggestions in review
* remove whitespace and add back add_dirs in tests
* Add new standalone function parents using posixpath to get parents of a directory
* removing whitespace (sorry)
* Remove import pathlib from zipfile.py
* Rewrite _parents as a slice on a generator of the ancestry of a path.
* Remove check for '.' and '/', now that parents no longer returns those.
* Separate calculation of implied dirs from adding those
* Re-use _implied_dirs in tests for generating zipfile with dir entries.
* Replace three fixtures (abcde, abcdef, abde) with one representative example alpharep.
* Simplify implementation of _implied_dirs by collapsing the generation of parent directories for each name.
(cherry picked from commit a4e2991bdc
)
Co-authored-by: shireenrao <shireenrao@gmail.com>
This commit is contained in:
parent
ed146b52a3
commit
c410f381bf
3 changed files with 135 additions and 52 deletions
|
@ -2397,37 +2397,49 @@ class CommandLineTest(unittest.TestCase):
|
||||||
consume = tuple
|
consume = tuple
|
||||||
|
|
||||||
|
|
||||||
def add_dirs(zipfile):
|
def add_dirs(zf):
|
||||||
"""
|
"""
|
||||||
Given a writable zipfile, inject directory entries for
|
Given a writable zip file zf, inject directory entries for
|
||||||
any directories implied by the presence of children.
|
any directories implied by the presence of children.
|
||||||
"""
|
"""
|
||||||
names = zipfile.namelist()
|
for name in zipfile.Path._implied_dirs(zf.namelist()):
|
||||||
consume(
|
zf.writestr(name, b"")
|
||||||
zipfile.writestr(name + "/", b"")
|
return zf
|
||||||
for name in map(posixpath.dirname, names)
|
|
||||||
if name and name + "/" not in names
|
|
||||||
)
|
|
||||||
return zipfile
|
|
||||||
|
|
||||||
|
|
||||||
def build_abcde_files():
|
def build_alpharep_fixture():
|
||||||
"""
|
"""
|
||||||
Create a zip file with this structure:
|
Create a zip file with this structure:
|
||||||
|
|
||||||
.
|
.
|
||||||
├── a.txt
|
├── a.txt
|
||||||
└── b
|
├── b
|
||||||
├── c.txt
|
│ ├── c.txt
|
||||||
└── d
|
│ ├── d
|
||||||
└── e.txt
|
│ │ └── e.txt
|
||||||
|
│ └── f.txt
|
||||||
|
└── g
|
||||||
|
└── h
|
||||||
|
└── i.txt
|
||||||
|
|
||||||
|
This fixture has the following key characteristics:
|
||||||
|
|
||||||
|
- a file at the root (a)
|
||||||
|
- a file two levels deep (b/d/e)
|
||||||
|
- multiple files in a directory (b/c, b/f)
|
||||||
|
- a directory containing only a directory (g/h)
|
||||||
|
|
||||||
|
"alpha" because it uses alphabet
|
||||||
|
"rep" because it's a representative example
|
||||||
"""
|
"""
|
||||||
data = io.BytesIO()
|
data = io.BytesIO()
|
||||||
zf = zipfile.ZipFile(data, "w")
|
zf = zipfile.ZipFile(data, "w")
|
||||||
zf.writestr("a.txt", b"content of a")
|
zf.writestr("a.txt", b"content of a")
|
||||||
zf.writestr("b/c.txt", b"content of c")
|
zf.writestr("b/c.txt", b"content of c")
|
||||||
zf.writestr("b/d/e.txt", b"content of e")
|
zf.writestr("b/d/e.txt", b"content of e")
|
||||||
zf.filename = "abcde.zip"
|
zf.writestr("b/f.txt", b"content of f")
|
||||||
|
zf.writestr("g/h/i.txt", b"content of i")
|
||||||
|
zf.filename = "alpharep.zip"
|
||||||
return zf
|
return zf
|
||||||
|
|
||||||
|
|
||||||
|
@ -2436,60 +2448,64 @@ class TestPath(unittest.TestCase):
|
||||||
self.fixtures = contextlib.ExitStack()
|
self.fixtures = contextlib.ExitStack()
|
||||||
self.addCleanup(self.fixtures.close)
|
self.addCleanup(self.fixtures.close)
|
||||||
|
|
||||||
def zipfile_abcde(self):
|
def zipfile_alpharep(self):
|
||||||
with self.subTest():
|
with self.subTest():
|
||||||
yield build_abcde_files()
|
yield build_alpharep_fixture()
|
||||||
with self.subTest():
|
with self.subTest():
|
||||||
yield add_dirs(build_abcde_files())
|
yield add_dirs(build_alpharep_fixture())
|
||||||
|
|
||||||
def zipfile_ondisk(self):
|
def zipfile_ondisk(self):
|
||||||
tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
|
tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
|
||||||
for zipfile_abcde in self.zipfile_abcde():
|
for alpharep in self.zipfile_alpharep():
|
||||||
buffer = zipfile_abcde.fp
|
buffer = alpharep.fp
|
||||||
zipfile_abcde.close()
|
alpharep.close()
|
||||||
path = tmpdir / zipfile_abcde.filename
|
path = tmpdir / alpharep.filename
|
||||||
with path.open("wb") as strm:
|
with path.open("wb") as strm:
|
||||||
strm.write(buffer.getvalue())
|
strm.write(buffer.getvalue())
|
||||||
yield path
|
yield path
|
||||||
|
|
||||||
def test_iterdir_istype(self):
|
def test_iterdir_and_types(self):
|
||||||
for zipfile_abcde in self.zipfile_abcde():
|
for alpharep in self.zipfile_alpharep():
|
||||||
root = zipfile.Path(zipfile_abcde)
|
root = zipfile.Path(alpharep)
|
||||||
assert root.is_dir()
|
assert root.is_dir()
|
||||||
a, b = root.iterdir()
|
a, b, g = root.iterdir()
|
||||||
assert a.is_file()
|
assert a.is_file()
|
||||||
assert b.is_dir()
|
assert b.is_dir()
|
||||||
c, d = b.iterdir()
|
assert g.is_dir()
|
||||||
assert c.is_file()
|
c, f, d = b.iterdir()
|
||||||
|
assert c.is_file() and f.is_file()
|
||||||
e, = d.iterdir()
|
e, = d.iterdir()
|
||||||
assert e.is_file()
|
assert e.is_file()
|
||||||
|
h, = g.iterdir()
|
||||||
|
i, = h.iterdir()
|
||||||
|
assert i.is_file()
|
||||||
|
|
||||||
def test_open(self):
|
def test_open(self):
|
||||||
for zipfile_abcde in self.zipfile_abcde():
|
for alpharep in self.zipfile_alpharep():
|
||||||
root = zipfile.Path(zipfile_abcde)
|
root = zipfile.Path(alpharep)
|
||||||
a, b = root.iterdir()
|
a, b, g = root.iterdir()
|
||||||
with a.open() as strm:
|
with a.open() as strm:
|
||||||
data = strm.read()
|
data = strm.read()
|
||||||
assert data == b"content of a"
|
assert data == b"content of a"
|
||||||
|
|
||||||
def test_read(self):
|
def test_read(self):
|
||||||
for zipfile_abcde in self.zipfile_abcde():
|
for alpharep in self.zipfile_alpharep():
|
||||||
root = zipfile.Path(zipfile_abcde)
|
root = zipfile.Path(alpharep)
|
||||||
a, b = root.iterdir()
|
a, b, g = root.iterdir()
|
||||||
assert a.read_text() == "content of a"
|
assert a.read_text() == "content of a"
|
||||||
assert a.read_bytes() == b"content of a"
|
assert a.read_bytes() == b"content of a"
|
||||||
|
|
||||||
def test_joinpath(self):
|
def test_joinpath(self):
|
||||||
for zipfile_abcde in self.zipfile_abcde():
|
for alpharep in self.zipfile_alpharep():
|
||||||
root = zipfile.Path(zipfile_abcde)
|
root = zipfile.Path(alpharep)
|
||||||
a = root.joinpath("a")
|
a = root.joinpath("a")
|
||||||
assert a.is_file()
|
assert a.is_file()
|
||||||
e = root.joinpath("b").joinpath("d").joinpath("e.txt")
|
e = root.joinpath("b").joinpath("d").joinpath("e.txt")
|
||||||
assert e.read_text() == "content of e"
|
assert e.read_text() == "content of e"
|
||||||
|
|
||||||
def test_traverse_truediv(self):
|
def test_traverse_truediv(self):
|
||||||
for zipfile_abcde in self.zipfile_abcde():
|
for alpharep in self.zipfile_alpharep():
|
||||||
root = zipfile.Path(zipfile_abcde)
|
root = zipfile.Path(alpharep)
|
||||||
a = root / "a"
|
a = root / "a"
|
||||||
assert a.is_file()
|
assert a.is_file()
|
||||||
e = root / "b" / "d" / "e.txt"
|
e = root / "b" / "d" / "e.txt"
|
||||||
|
@ -2504,26 +2520,27 @@ class TestPath(unittest.TestCase):
|
||||||
zipfile.Path(pathlike)
|
zipfile.Path(pathlike)
|
||||||
|
|
||||||
def test_traverse_pathlike(self):
|
def test_traverse_pathlike(self):
|
||||||
for zipfile_abcde in self.zipfile_abcde():
|
for alpharep in self.zipfile_alpharep():
|
||||||
root = zipfile.Path(zipfile_abcde)
|
root = zipfile.Path(alpharep)
|
||||||
root / pathlib.Path("a")
|
root / pathlib.Path("a")
|
||||||
|
|
||||||
def test_parent(self):
|
def test_parent(self):
|
||||||
for zipfile_abcde in self.zipfile_abcde():
|
for alpharep in self.zipfile_alpharep():
|
||||||
root = zipfile.Path(zipfile_abcde)
|
root = zipfile.Path(alpharep)
|
||||||
assert (root / 'a').parent.at == ''
|
assert (root / 'a').parent.at == ''
|
||||||
assert (root / 'a' / 'b').parent.at == 'a/'
|
assert (root / 'a' / 'b').parent.at == 'a/'
|
||||||
|
|
||||||
def test_dir_parent(self):
|
def test_dir_parent(self):
|
||||||
for zipfile_abcde in self.zipfile_abcde():
|
for alpharep in self.zipfile_alpharep():
|
||||||
root = zipfile.Path(zipfile_abcde)
|
root = zipfile.Path(alpharep)
|
||||||
assert (root / 'b').parent.at == ''
|
assert (root / 'b').parent.at == ''
|
||||||
assert (root / 'b/').parent.at == ''
|
assert (root / 'b/').parent.at == ''
|
||||||
|
|
||||||
def test_missing_dir_parent(self):
|
def test_missing_dir_parent(self):
|
||||||
for zipfile_abcde in self.zipfile_abcde():
|
for alpharep in self.zipfile_alpharep():
|
||||||
root = zipfile.Path(zipfile_abcde)
|
root = zipfile.Path(alpharep)
|
||||||
assert (root / 'missing dir/').parent.at == ''
|
assert (root / 'missing dir/').parent.at == ''
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
|
@ -7,6 +7,7 @@ import binascii
|
||||||
import functools
|
import functools
|
||||||
import importlib.util
|
import importlib.util
|
||||||
import io
|
import io
|
||||||
|
import itertools
|
||||||
import os
|
import os
|
||||||
import posixpath
|
import posixpath
|
||||||
import shutil
|
import shutil
|
||||||
|
@ -2104,6 +2105,65 @@ class PyZipFile(ZipFile):
|
||||||
return (fname, archivename)
|
return (fname, archivename)
|
||||||
|
|
||||||
|
|
||||||
|
def _unique_everseen(iterable, key=None):
|
||||||
|
"List unique elements, preserving order. Remember all elements ever seen."
|
||||||
|
# unique_everseen('AAAABBBCCDAABBB') --> A B C D
|
||||||
|
# unique_everseen('ABBCcAD', str.lower) --> A B C D
|
||||||
|
seen = set()
|
||||||
|
seen_add = seen.add
|
||||||
|
if key is None:
|
||||||
|
for element in itertools.filterfalse(seen.__contains__, iterable):
|
||||||
|
seen_add(element)
|
||||||
|
yield element
|
||||||
|
else:
|
||||||
|
for element in iterable:
|
||||||
|
k = key(element)
|
||||||
|
if k not in seen:
|
||||||
|
seen_add(k)
|
||||||
|
yield element
|
||||||
|
|
||||||
|
|
||||||
|
def _parents(path):
|
||||||
|
"""
|
||||||
|
Given a path with elements separated by
|
||||||
|
posixpath.sep, generate all parents of that path.
|
||||||
|
|
||||||
|
>>> list(_parents('b/d'))
|
||||||
|
['b']
|
||||||
|
>>> list(_parents('/b/d/'))
|
||||||
|
['/b']
|
||||||
|
>>> list(_parents('b/d/f/'))
|
||||||
|
['b/d', 'b']
|
||||||
|
>>> list(_parents('b'))
|
||||||
|
[]
|
||||||
|
>>> list(_parents(''))
|
||||||
|
[]
|
||||||
|
"""
|
||||||
|
return itertools.islice(_ancestry(path), 1, None)
|
||||||
|
|
||||||
|
|
||||||
|
def _ancestry(path):
|
||||||
|
"""
|
||||||
|
Given a path with elements separated by
|
||||||
|
posixpath.sep, generate all elements of that path
|
||||||
|
|
||||||
|
>>> list(_ancestry('b/d'))
|
||||||
|
['b/d', 'b']
|
||||||
|
>>> list(_ancestry('/b/d/'))
|
||||||
|
['/b/d', '/b']
|
||||||
|
>>> list(_ancestry('b/d/f/'))
|
||||||
|
['b/d/f', 'b/d', 'b']
|
||||||
|
>>> list(_ancestry('b'))
|
||||||
|
['b']
|
||||||
|
>>> list(_ancestry(''))
|
||||||
|
[]
|
||||||
|
"""
|
||||||
|
path = path.rstrip(posixpath.sep)
|
||||||
|
while path and path != posixpath.sep:
|
||||||
|
yield path
|
||||||
|
path, tail = posixpath.split(path)
|
||||||
|
|
||||||
|
|
||||||
class Path:
|
class Path:
|
||||||
"""
|
"""
|
||||||
A pathlib-compatible interface for zip files.
|
A pathlib-compatible interface for zip files.
|
||||||
|
@ -2227,12 +2287,17 @@ class Path:
|
||||||
__truediv__ = joinpath
|
__truediv__ = joinpath
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _add_implied_dirs(names):
|
def _implied_dirs(names):
|
||||||
return names + [
|
return _unique_everseen(
|
||||||
name + "/"
|
parent + "/"
|
||||||
for name in map(posixpath.dirname, names)
|
for name in names
|
||||||
if name and name + "/" not in names
|
for parent in _parents(name)
|
||||||
]
|
if parent + "/" not in names
|
||||||
|
)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _add_implied_dirs(cls, names):
|
||||||
|
return names + list(cls._implied_dirs(names))
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def parent(self):
|
def parent(self):
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
In ``zipfile.Path``, when adding implicit dirs, ensure that ancestral directories are added and that duplicates are excluded.
|
Loading…
Add table
Add a link
Reference in a new issue