bpo-44771: Apply changes from importlib_resources 5.2.1 (GH-27436)

* bpo-44771: Apply changes from importlib_resources@3b24bd6307

* Add blurb

* Exclude namespacedata01 from eol conversion.
This commit is contained in:
Jason R. Coombs 2021-07-29 21:05:05 -04:00 committed by GitHub
parent 851cca8c22
commit aaa83cdfab
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 716 additions and 375 deletions

View file

@ -1,4 +1,5 @@
from contextlib import suppress
from io import TextIOWrapper
from . import abc
@ -25,32 +26,119 @@ class TraversableResourcesLoader:
self.spec = spec
def get_resource_reader(self, name):
return DegenerateFiles(self.spec)._native()
return CompatibilityFiles(self.spec)._native()
class DegenerateFiles:
def _io_wrapper(file, mode='r', *args, **kwargs):
if mode == 'r':
return TextIOWrapper(file, *args, **kwargs)
elif mode == 'rb':
return file
raise ValueError(
"Invalid mode value '{}', only 'r' and 'rb' are supported".format(mode)
)
class CompatibilityFiles:
"""
Adapter for an existing or non-existant resource reader
to provide a degenerate .files().
to provide a compability .files().
"""
class Path(abc.Traversable):
class SpecPath(abc.Traversable):
"""
Path tied to a module spec.
Can be read and exposes the resource reader children.
"""
def __init__(self, spec, reader):
self._spec = spec
self._reader = reader
def iterdir(self):
if not self._reader:
return iter(())
return iter(
CompatibilityFiles.ChildPath(self._reader, path)
for path in self._reader.contents()
)
def is_file(self):
return False
is_dir = is_file
def joinpath(self, other):
if not self._reader:
return CompatibilityFiles.OrphanPath(other)
return CompatibilityFiles.ChildPath(self._reader, other)
@property
def name(self):
return self._spec.name
def open(self, mode='r', *args, **kwargs):
return _io_wrapper(self._reader.open_resource(None), mode, *args, **kwargs)
class ChildPath(abc.Traversable):
"""
Path tied to a resource reader child.
Can be read but doesn't expose any meaningfull children.
"""
def __init__(self, reader, name):
self._reader = reader
self._name = name
def iterdir(self):
return iter(())
def is_dir(self):
return False
def is_file(self):
return self._reader.is_resource(self.name)
is_file = exists = is_dir # type: ignore
def is_dir(self):
return not self.is_file()
def joinpath(self, other):
return DegenerateFiles.Path()
return CompatibilityFiles.OrphanPath(self.name, other)
@property
def name(self):
return ''
return self._name
def open(self):
raise ValueError()
def open(self, mode='r', *args, **kwargs):
return _io_wrapper(
self._reader.open_resource(self.name), mode, *args, **kwargs
)
class OrphanPath(abc.Traversable):
"""
Orphan path, not tied to a module spec or resource reader.
Can't be read and doesn't expose any meaningful children.
"""
def __init__(self, *path_parts):
if len(path_parts) < 1:
raise ValueError('Need at least one path part to construct a path')
self._path = path_parts
def iterdir(self):
return iter(())
def is_file(self):
return False
is_dir = is_file
def joinpath(self, other):
return CompatibilityFiles.OrphanPath(*self._path, other)
@property
def name(self):
return self._path[-1]
def open(self, mode='r', *args, **kwargs):
raise FileNotFoundError("Can't open orphan path")
def __init__(self, spec):
self.spec = spec
@ -71,7 +159,7 @@ class DegenerateFiles:
return getattr(self._reader, attr)
def files(self):
return DegenerateFiles.Path()
return CompatibilityFiles.SpecPath(self.spec, self._reader)
def wrap_spec(package):

View file

@ -12,6 +12,7 @@ from .abc import ResourceReader, Traversable
from ._adapters import wrap_spec
Package = Union[types.ModuleType, str]
Resource = Union[str, os.PathLike]
def files(package):
@ -93,7 +94,7 @@ def _tempfile(reader, suffix=''):
finally:
try:
os.remove(raw_path)
except FileNotFoundError:
except (FileNotFoundError, PermissionError):
pass

View file

@ -0,0 +1,19 @@
from itertools import filterfalse
def unique_everseen(iterable, key=None):
"List unique elements, preserving order. Remember all elements ever seen."
# unique_everseen('AAAABBBCCDAABBB') --> A B C D
# unique_everseen('ABBCcAD', str.lower) --> A B C D
seen = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element

84
Lib/importlib/_legacy.py Normal file
View file

@ -0,0 +1,84 @@
import os
import pathlib
import types
from typing import Union, Iterable, ContextManager, BinaryIO, TextIO
from . import _common
Package = Union[types.ModuleType, str]
Resource = Union[str, os.PathLike]
def open_binary(package: Package, resource: Resource) -> BinaryIO:
"""Return a file-like object opened for binary reading of the resource."""
return (_common.files(package) / _common.normalize_path(resource)).open('rb')
def read_binary(package: Package, resource: Resource) -> bytes:
"""Return the binary contents of the resource."""
return (_common.files(package) / _common.normalize_path(resource)).read_bytes()
def open_text(
package: Package,
resource: Resource,
encoding: str = 'utf-8',
errors: str = 'strict',
) -> TextIO:
"""Return a file-like object opened for text reading of the resource."""
return (_common.files(package) / _common.normalize_path(resource)).open(
'r', encoding=encoding, errors=errors
)
def read_text(
package: Package,
resource: Resource,
encoding: str = 'utf-8',
errors: str = 'strict',
) -> str:
"""Return the decoded string of the resource.
The decoding-related arguments have the same semantics as those of
bytes.decode().
"""
with open_text(package, resource, encoding, errors) as fp:
return fp.read()
def contents(package: Package) -> Iterable[str]:
"""Return an iterable of entries in `package`.
Note that not all entries are resources. Specifically, directories are
not considered resources. Use `is_resource()` on each entry returned here
to check if it is a resource or not.
"""
return [path.name for path in _common.files(package).iterdir()]
def is_resource(package: Package, name: str) -> bool:
"""True if `name` is a resource inside `package`.
Directories are *not* resources.
"""
resource = _common.normalize_path(name)
return any(
traversable.name == resource and traversable.is_file()
for traversable in _common.files(package).iterdir()
)
def path(
package: Package,
resource: Resource,
) -> ContextManager[pathlib.Path]:
"""A context manager providing a file path object to the resource.
If the resource does not already exist on its own on the file system,
a temporary file will be created. If the file was created, the file
will be deleted upon exiting the context manager (no exception is
raised if the file was deleted prior to the context manager
exiting).
"""
return _common.as_file(_common.files(package) / _common.normalize_path(resource))

View file

@ -1,8 +1,12 @@
import collections
import zipfile
import operator
import pathlib
import zipfile
from . import abc
from ._itertools import unique_everseen
def remove_duplicates(items):
return iter(collections.OrderedDict.fromkeys(items))
@ -63,13 +67,8 @@ class MultiplexedPath(abc.Traversable):
raise NotADirectoryError('MultiplexedPath only supports directories')
def iterdir(self):
visited = []
for path in self._paths:
for file in path.iterdir():
if file.name in visited:
continue
visited.append(file.name)
yield file
files = (file for path in self._paths for file in path.iterdir())
return unique_everseen(files, key=operator.attrgetter('name'))
def read_bytes(self):
raise FileNotFoundError(f'{self} is not a file')

View file

@ -1,19 +1,23 @@
import os
import io
"""Read resources contained within a package."""
from . import _common
from ._common import as_file, files
from .abc import ResourceReader
from contextlib import suppress
from importlib.abc import ResourceLoader
from importlib.machinery import ModuleSpec
from io import BytesIO, TextIOWrapper
from pathlib import Path
from types import ModuleType
from typing import ContextManager, Iterable, Union
from typing import cast, BinaryIO, TextIO
from collections.abc import Sequence
from functools import singledispatch
from ._common import (
as_file,
files,
Package,
Resource,
)
from ._legacy import (
contents,
open_binary,
read_binary,
open_text,
read_text,
is_resource,
path,
)
from importlib.abc import ResourceReader
__all__ = [
@ -30,155 +34,3 @@ __all__ = [
'read_binary',
'read_text',
]
Package = Union[str, ModuleType]
Resource = Union[str, os.PathLike]
def open_binary(package: Package, resource: Resource) -> BinaryIO:
"""Return a file-like object opened for binary reading of the resource."""
resource = _common.normalize_path(resource)
package = _common.get_package(package)
reader = _common.get_resource_reader(package)
if reader is not None:
return reader.open_resource(resource)
spec = cast(ModuleSpec, package.__spec__)
# Using pathlib doesn't work well here due to the lack of 'strict'
# argument for pathlib.Path.resolve() prior to Python 3.6.
if spec.submodule_search_locations is not None:
paths = spec.submodule_search_locations
elif spec.origin is not None:
paths = [os.path.dirname(os.path.abspath(spec.origin))]
for package_path in paths:
full_path = os.path.join(package_path, resource)
try:
return open(full_path, mode='rb')
except OSError:
# Just assume the loader is a resource loader; all the relevant
# importlib.machinery loaders are and an AttributeError for
# get_data() will make it clear what is needed from the loader.
loader = cast(ResourceLoader, spec.loader)
data = None
if hasattr(spec.loader, 'get_data'):
with suppress(OSError):
data = loader.get_data(full_path)
if data is not None:
return BytesIO(data)
raise FileNotFoundError(f'{resource!r} resource not found in {spec.name!r}')
def open_text(
package: Package,
resource: Resource,
encoding: str = 'utf-8',
errors: str = 'strict',
) -> TextIO:
"""Return a file-like object opened for text reading of the resource."""
return TextIOWrapper(
open_binary(package, resource), encoding=encoding, errors=errors
)
def read_binary(package: Package, resource: Resource) -> bytes:
"""Return the binary contents of the resource."""
with open_binary(package, resource) as fp:
return fp.read()
def read_text(
package: Package,
resource: Resource,
encoding: str = 'utf-8',
errors: str = 'strict',
) -> str:
"""Return the decoded string of the resource.
The decoding-related arguments have the same semantics as those of
bytes.decode().
"""
with open_text(package, resource, encoding, errors) as fp:
return fp.read()
def path(
package: Package,
resource: Resource,
) -> 'ContextManager[Path]':
"""A context manager providing a file path object to the resource.
If the resource does not already exist on its own on the file system,
a temporary file will be created. If the file was created, the file
will be deleted upon exiting the context manager (no exception is
raised if the file was deleted prior to the context manager
exiting).
"""
reader = _common.get_resource_reader(_common.get_package(package))
return (
_path_from_reader(reader, _common.normalize_path(resource))
if reader
else _common.as_file(
_common.files(package).joinpath(_common.normalize_path(resource))
)
)
def _path_from_reader(reader, resource):
return _path_from_resource_path(reader, resource) or _path_from_open_resource(
reader, resource
)
def _path_from_resource_path(reader, resource):
with suppress(FileNotFoundError):
return Path(reader.resource_path(resource))
def _path_from_open_resource(reader, resource):
saved = io.BytesIO(reader.open_resource(resource).read())
return _common._tempfile(saved.read, suffix=resource)
def is_resource(package: Package, name: str) -> bool:
"""True if 'name' is a resource inside 'package'.
Directories are *not* resources.
"""
package = _common.get_package(package)
_common.normalize_path(name)
reader = _common.get_resource_reader(package)
if reader is not None:
return reader.is_resource(name)
package_contents = set(contents(package))
if name not in package_contents:
return False
return (_common.from_package(package) / name).is_file()
def contents(package: Package) -> Iterable[str]:
"""Return an iterable of entries in 'package'.
Note that not all entries are resources. Specifically, directories are
not considered resources. Use `is_resource()` on each entry returned here
to check if it is a resource or not.
"""
package = _common.get_package(package)
reader = _common.get_resource_reader(package)
if reader is not None:
return _ensure_sequence(reader.contents())
transversable = _common.from_package(package)
if transversable.is_dir():
return list(item.name for item in transversable.iterdir())
return []
@singledispatch
def _ensure_sequence(iterable):
return list(iterable)
@_ensure_sequence.register(Sequence)
def _(iterable):
return iterable

116
Lib/importlib/simple.py Normal file
View file

@ -0,0 +1,116 @@
"""
Interface adapters for low-level readers.
"""
import abc
import io
import itertools
from typing import BinaryIO, List
from .abc import Traversable, TraversableResources
class SimpleReader(abc.ABC):
"""
The minimum, low-level interface required from a resource
provider.
"""
@abc.abstractproperty
def package(self):
# type: () -> str
"""
The name of the package for which this reader loads resources.
"""
@abc.abstractmethod
def children(self):
# type: () -> List['SimpleReader']
"""
Obtain an iterable of SimpleReader for available
child containers (e.g. directories).
"""
@abc.abstractmethod
def resources(self):
# type: () -> List[str]
"""
Obtain available named resources for this virtual package.
"""
@abc.abstractmethod
def open_binary(self, resource):
# type: (str) -> BinaryIO
"""
Obtain a File-like for a named resource.
"""
@property
def name(self):
return self.package.split('.')[-1]
class ResourceHandle(Traversable):
"""
Handle to a named resource in a ResourceReader.
"""
def __init__(self, parent, name):
# type: (ResourceContainer, str) -> None
self.parent = parent
self.name = name # type: ignore
def is_file(self):
return True
def is_dir(self):
return False
def open(self, mode='r', *args, **kwargs):
stream = self.parent.reader.open_binary(self.name)
if 'b' not in mode:
stream = io.TextIOWrapper(*args, **kwargs)
return stream
def joinpath(self, name):
raise RuntimeError("Cannot traverse into a resource")
class ResourceContainer(Traversable):
"""
Traversable container for a package's resources via its reader.
"""
def __init__(self, reader):
# type: (SimpleReader) -> None
self.reader = reader
def is_dir(self):
return True
def is_file(self):
return False
def iterdir(self):
files = (ResourceHandle(self, name) for name in self.reader.resources)
dirs = map(ResourceContainer, self.reader.children())
return itertools.chain(files, dirs)
def open(self, *args, **kwargs):
raise IsADirectoryError()
def joinpath(self, name):
return next(
traversable for traversable in self.iterdir() if traversable.name == name
)
class TraversableReader(TraversableResources, SimpleReader):
"""
A TraversableResources based on SimpleReader. Resource providers
may derive from this class to provide the TraversableResources
interface by supplying the SimpleReader interface.
"""
def files(self):
return ResourceContainer(self)