bpo-39595: Improve zipfile.Path performance (#18406)

* Improve zipfile.Path performance on zipfiles with a large number of entries.

* 📜🤖 Added by blurb_it.

* Add bpo to blurb

* Sync with importlib_metadata 1.5 (6fe70ca)

* Update blurb.

* Remove compatibility code

* Add stubs module, omitted from earlier commit

Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
This commit is contained in:
Jason R. Coombs 2020-02-11 21:58:47 -05:00 committed by GitHub
parent e6be9b59a9
commit e5bd73632e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 254 additions and 68 deletions

View file

@ -16,6 +16,8 @@ import struct
import sys
import threading
import time
import contextlib
from collections import OrderedDict
try:
import zlib # We may need its compression method
@ -2159,6 +2161,79 @@ def _ancestry(path):
path, tail = posixpath.split(path)
class CompleteDirs(ZipFile):
    """
    A ZipFile subclass that ensures that implied directories
    are always included in the namelist.
    """

    @staticmethod
    def _implied_dirs(names):
        """
        Return the directories implied by *names* but missing from it.

        Each candidate is a path yielded by ``_parents`` for an entry in
        *names*, with a trailing separator appended. The result is an
        OrderedDict whose keys are those candidates, deduplicated and
        kept in first-seen order, excluding any already in *names*.
        """
        # Build the lookup set exactly once. Evaluating ``set(names)``
        # inside the generator's filter would rebuild it for every
        # candidate and make this quadratic in the number of entries.
        present = set(names)
        parents = itertools.chain.from_iterable(map(_parents, names))
        as_dirs = (p + posixpath.sep for p in parents)
        # OrderedDict.fromkeys deduplicates entries in original order.
        return OrderedDict.fromkeys(d for d in as_dirs if d not in present)

    def namelist(self):
        """
        Return the names reported by ZipFile.namelist() followed by
        any implied directories that the archive does not list.
        """
        names = super().namelist()
        return names + list(self._implied_dirs(names))

    def _name_set(self):
        """Return the complete namelist as a set for membership tests."""
        return set(self.namelist())

    def resolve_dir(self, name):
        """
        If the name represents a directory, return that name
        as a directory (with the trailing slash).

        A name that is itself present in the archive is returned
        unchanged, as is a name that matches nothing.
        """
        names = self._name_set()
        dirname = name + '/'
        dir_match = name not in names and dirname in names
        return dirname if dir_match else name

    @classmethod
    def make(cls, source):
        """
        Given a source (filename or zipfile), return an
        appropriate CompleteDirs subclass.

        * A CompleteDirs instance is returned as-is.
        * Anything that is not a ZipFile is passed to ``cls(source)``.
        * An existing ZipFile is re-wrapped: a new instance is created
          without running ``__init__`` and receives a shallow copy of
          the source's instance attributes.
        """
        if isinstance(source, CompleteDirs):
            return source

        if not isinstance(source, ZipFile):
            return cls(source)

        # Only allow a caching subclass (such as FastLookup) when the
        # supplied zipfile is read-only; otherwise use plain CompleteDirs.
        if 'r' not in source.mode:
            cls = CompleteDirs

        res = cls.__new__(cls)
        vars(res).update(vars(source))
        return res
class FastLookup(CompleteDirs):
    """
    ZipFile subclass to ensure implicit
    dirs exist and are resolved rapidly.

    The first result of namelist() and of _name_set() is stored on
    the instance and returned by every later call, so the work of
    computing them is done at most once per instance.
    """

    def namelist(self):
        """Return the complete namelist, computing it only on first use."""
        try:
            cached = self.__names
        except AttributeError:
            # Not computed yet: delegate to the parent class and keep it.
            cached = self.__names = super().namelist()
        return cached

    def _name_set(self):
        """Return the set of names, computing it only on first use."""
        try:
            cached = self.__lookup
        except AttributeError:
            # Not computed yet: delegate to the parent class and keep it.
            cached = self.__lookup = super()._name_set()
        return cached
class Path:
"""
A pathlib-compatible interface for zip files.
@ -2227,7 +2302,7 @@ class Path:
__repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"
def __init__(self, root, at=""):
self.root = root if isinstance(root, ZipFile) else ZipFile(root)
self.root = FastLookup.make(root)
self.at = at
@property
@ -2259,12 +2334,12 @@ class Path:
return not self.is_dir()
def exists(self):
return self.at in self._names()
return self.at in self.root._name_set()
def iterdir(self):
if not self.is_dir():
raise ValueError("Can't listdir a file")
subs = map(self._next, self._names())
subs = map(self._next, self.root.namelist())
return filter(self._is_child, subs)
def __str__(self):
@ -2275,25 +2350,10 @@ class Path:
def joinpath(self, add):
next = posixpath.join(self.at, add)
next_dir = posixpath.join(self.at, add, "")
names = self._names()
return self._next(next_dir if next not in names and next_dir in names else next)
return self._next(self.root.resolve_dir(next))
__truediv__ = joinpath
@staticmethod
def _implied_dirs(names):
return _unique_everseen(
parent + "/"
for name in names
for parent in _parents(name)
if parent + "/" not in names
)
@classmethod
def _add_implied_dirs(cls, names):
return names + list(cls._implied_dirs(names))
@property
def parent(self):
parent_at = posixpath.dirname(self.at.rstrip('/'))
@ -2301,9 +2361,6 @@ class Path:
parent_at += '/'
return self._next(parent_at)
def _names(self):
return self._add_implied_dirs(self.root.namelist())
def main(args=None):
import argparse
@ -2365,5 +2422,6 @@ def main(args=None):
zippath = ''
addToZip(zf, path, zippath)
if __name__ == "__main__":
main()