bpo-46118: Move importlib.resources to its own package. (#30176)

* bpo-46118: Move importlib.resources to its own package.

* Expand compatibility shims with documentation and explicit imports.
This commit is contained in:
Jason R. Coombs 2021-12-30 21:00:48 -05:00 committed by GitHub
parent 2cf7d02b99
commit e712a5b277
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 409 additions and 368 deletions

View file

@ -0,0 +1,36 @@
"""Read resources contained within a package."""
from ._common import (
as_file,
files,
Package,
)
from ._legacy import (
contents,
open_binary,
read_binary,
open_text,
read_text,
is_resource,
path,
Resource,
)
from .abc import ResourceReader
__all__ = [
'Package',
'Resource',
'ResourceReader',
'as_file',
'contents',
'files',
'is_resource',
'open_binary',
'open_text',
'path',
'read_binary',
'read_text',
]

View file

@ -0,0 +1,170 @@
from contextlib import suppress
from io import TextIOWrapper
from . import abc
class SpecLoaderAdapter:
"""
Adapt a package spec to adapt the underlying loader.
"""
def __init__(self, spec, adapter=lambda spec: spec.loader):
self.spec = spec
self.loader = adapter(spec)
def __getattr__(self, name):
return getattr(self.spec, name)
class TraversableResourcesLoader:
"""
Adapt a loader to provide TraversableResources.
"""
def __init__(self, spec):
self.spec = spec
def get_resource_reader(self, name):
return CompatibilityFiles(self.spec)._native()
def _io_wrapper(file, mode='r', *args, **kwargs):
if mode == 'r':
return TextIOWrapper(file, *args, **kwargs)
elif mode == 'rb':
return file
raise ValueError(
"Invalid mode value '{}', only 'r' and 'rb' are supported".format(mode)
)
class CompatibilityFiles:
"""
Adapter for an existing or non-existent resource reader
to provide a compatibility .files().
"""
class SpecPath(abc.Traversable):
"""
Path tied to a module spec.
Can be read and exposes the resource reader children.
"""
def __init__(self, spec, reader):
self._spec = spec
self._reader = reader
def iterdir(self):
if not self._reader:
return iter(())
return iter(
CompatibilityFiles.ChildPath(self._reader, path)
for path in self._reader.contents()
)
def is_file(self):
return False
is_dir = is_file
def joinpath(self, other):
if not self._reader:
return CompatibilityFiles.OrphanPath(other)
return CompatibilityFiles.ChildPath(self._reader, other)
@property
def name(self):
return self._spec.name
def open(self, mode='r', *args, **kwargs):
return _io_wrapper(self._reader.open_resource(None), mode, *args, **kwargs)
class ChildPath(abc.Traversable):
"""
Path tied to a resource reader child.
Can be read but doesn't expose any meaningful children.
"""
def __init__(self, reader, name):
self._reader = reader
self._name = name
def iterdir(self):
return iter(())
def is_file(self):
return self._reader.is_resource(self.name)
def is_dir(self):
return not self.is_file()
def joinpath(self, other):
return CompatibilityFiles.OrphanPath(self.name, other)
@property
def name(self):
return self._name
def open(self, mode='r', *args, **kwargs):
return _io_wrapper(
self._reader.open_resource(self.name), mode, *args, **kwargs
)
class OrphanPath(abc.Traversable):
"""
Orphan path, not tied to a module spec or resource reader.
Can't be read and doesn't expose any meaningful children.
"""
def __init__(self, *path_parts):
if len(path_parts) < 1:
raise ValueError('Need at least one path part to construct a path')
self._path = path_parts
def iterdir(self):
return iter(())
def is_file(self):
return False
is_dir = is_file
def joinpath(self, other):
return CompatibilityFiles.OrphanPath(*self._path, other)
@property
def name(self):
return self._path[-1]
def open(self, mode='r', *args, **kwargs):
raise FileNotFoundError("Can't open orphan path")
def __init__(self, spec):
self.spec = spec
@property
def _reader(self):
with suppress(AttributeError):
return self.spec.loader.get_resource_reader(self.spec.name)
def _native(self):
"""
Return the native reader if it supports files().
"""
reader = self._reader
return reader if hasattr(reader, 'files') else self
def __getattr__(self, attr):
return getattr(self._reader, attr)
def files(self):
return CompatibilityFiles.SpecPath(self.spec, self._reader)
def wrap_spec(package):
"""
Construct a package spec with traversable compatibility
on the spec/loader/reader.
"""
return SpecLoaderAdapter(package.__spec__, TraversableResourcesLoader)

View file

@ -0,0 +1,104 @@
import os
import pathlib
import tempfile
import functools
import contextlib
import types
import importlib
from typing import Union, Optional
from .abc import ResourceReader, Traversable
from ._adapters import wrap_spec
Package = Union[types.ModuleType, str]
def files(package):
# type: (Package) -> Traversable
"""
Get a Traversable resource from a package
"""
return from_package(get_package(package))
def get_resource_reader(package):
# type: (types.ModuleType) -> Optional[ResourceReader]
"""
Return the package's loader if it's a ResourceReader.
"""
# We can't use
# a issubclass() check here because apparently abc.'s __subclasscheck__()
# hook wants to create a weak reference to the object, but
# zipimport.zipimporter does not support weak references, resulting in a
# TypeError. That seems terrible.
spec = package.__spec__
reader = getattr(spec.loader, 'get_resource_reader', None) # type: ignore
if reader is None:
return None
return reader(spec.name) # type: ignore
def resolve(cand):
# type: (Package) -> types.ModuleType
return cand if isinstance(cand, types.ModuleType) else importlib.import_module(cand)
def get_package(package):
# type: (Package) -> types.ModuleType
"""Take a package name or module object and return the module.
Raise an exception if the resolved module is not a package.
"""
resolved = resolve(package)
if wrap_spec(resolved).submodule_search_locations is None:
raise TypeError(f'{package!r} is not a package')
return resolved
def from_package(package):
"""
Return a Traversable object for the given package.
"""
spec = wrap_spec(package)
reader = spec.loader.get_resource_reader(spec.name)
return reader.files()
@contextlib.contextmanager
def _tempfile(reader, suffix=''):
# Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
# blocks due to the need to close the temporary file to work on Windows
# properly.
fd, raw_path = tempfile.mkstemp(suffix=suffix)
try:
try:
os.write(fd, reader())
finally:
os.close(fd)
del reader
yield pathlib.Path(raw_path)
finally:
try:
os.remove(raw_path)
except FileNotFoundError:
pass
@functools.singledispatch
def as_file(path):
"""
Given a Traversable object, return that object as a
path on the local file system in a context manager.
"""
return _tempfile(path.read_bytes, suffix=path.name)
@as_file.register(pathlib.Path)
@contextlib.contextmanager
def _(path):
"""
Degenerate behavior for pathlib.Path objects.
"""
yield path

View file

@ -0,0 +1,35 @@
from itertools import filterfalse
from typing import (
Callable,
Iterable,
Iterator,
Optional,
Set,
TypeVar,
Union,
)
# Type and type variable definitions
_T = TypeVar('_T')
_U = TypeVar('_U')
def unique_everseen(
iterable: Iterable[_T], key: Optional[Callable[[_T], _U]] = None
) -> Iterator[_T]:
"List unique elements, preserving order. Remember all elements ever seen."
# unique_everseen('AAAABBBCCDAABBB') --> A B C D
# unique_everseen('ABBCcAD', str.lower) --> A B C D
seen: Set[Union[_T, _U]] = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element

View file

@ -0,0 +1,121 @@
import functools
import os
import pathlib
import types
import warnings
from typing import Union, Iterable, ContextManager, BinaryIO, TextIO, Any
from . import _common
Package = Union[types.ModuleType, str]
Resource = str
def deprecated(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
warnings.warn(
f"{func.__name__} is deprecated. Use files() instead. "
"Refer to https://importlib-resources.readthedocs.io"
"/en/latest/using.html#migrating-from-legacy for migration advice.",
DeprecationWarning,
stacklevel=2,
)
return func(*args, **kwargs)
return wrapper
def normalize_path(path):
# type: (Any) -> str
"""Normalize a path by ensuring it is a string.
If the resulting string contains path separators, an exception is raised.
"""
str_path = str(path)
parent, file_name = os.path.split(str_path)
if parent:
raise ValueError(f'{path!r} must be only a file name')
return file_name
@deprecated
def open_binary(package: Package, resource: Resource) -> BinaryIO:
"""Return a file-like object opened for binary reading of the resource."""
return (_common.files(package) / normalize_path(resource)).open('rb')
@deprecated
def read_binary(package: Package, resource: Resource) -> bytes:
"""Return the binary contents of the resource."""
return (_common.files(package) / normalize_path(resource)).read_bytes()
@deprecated
def open_text(
package: Package,
resource: Resource,
encoding: str = 'utf-8',
errors: str = 'strict',
) -> TextIO:
"""Return a file-like object opened for text reading of the resource."""
return (_common.files(package) / normalize_path(resource)).open(
'r', encoding=encoding, errors=errors
)
@deprecated
def read_text(
package: Package,
resource: Resource,
encoding: str = 'utf-8',
errors: str = 'strict',
) -> str:
"""Return the decoded string of the resource.
The decoding-related arguments have the same semantics as those of
bytes.decode().
"""
with open_text(package, resource, encoding, errors) as fp:
return fp.read()
@deprecated
def contents(package: Package) -> Iterable[str]:
"""Return an iterable of entries in `package`.
Note that not all entries are resources. Specifically, directories are
not considered resources. Use `is_resource()` on each entry returned here
to check if it is a resource or not.
"""
return [path.name for path in _common.files(package).iterdir()]
@deprecated
def is_resource(package: Package, name: str) -> bool:
"""True if `name` is a resource inside `package`.
Directories are *not* resources.
"""
resource = normalize_path(name)
return any(
traversable.name == resource and traversable.is_file()
for traversable in _common.files(package).iterdir()
)
@deprecated
def path(
package: Package,
resource: Resource,
) -> ContextManager[pathlib.Path]:
"""A context manager providing a file path object to the resource.
If the resource does not already exist on its own on the file system,
a temporary file will be created. If the file was created, the file
will be deleted upon exiting the context manager (no exception is
raised if the file was deleted prior to the context manager
exiting).
"""
return _common.as_file(_common.files(package) / normalize_path(resource))

View file

@ -0,0 +1,136 @@
import abc
from typing import BinaryIO, Iterable, Text
from typing import runtime_checkable, Protocol
class ResourceReader(metaclass=abc.ABCMeta):
"""Abstract base class for loaders to provide resource reading support."""
@abc.abstractmethod
def open_resource(self, resource: Text) -> BinaryIO:
"""Return an opened, file-like object for binary reading.
The 'resource' argument is expected to represent only a file name.
If the resource cannot be found, FileNotFoundError is raised.
"""
# This deliberately raises FileNotFoundError instead of
# NotImplementedError so that if this method is accidentally called,
# it'll still do the right thing.
raise FileNotFoundError
@abc.abstractmethod
def resource_path(self, resource: Text) -> Text:
"""Return the file system path to the specified resource.
The 'resource' argument is expected to represent only a file name.
If the resource does not exist on the file system, raise
FileNotFoundError.
"""
# This deliberately raises FileNotFoundError instead of
# NotImplementedError so that if this method is accidentally called,
# it'll still do the right thing.
raise FileNotFoundError
@abc.abstractmethod
def is_resource(self, path: Text) -> bool:
"""Return True if the named 'path' is a resource.
Files are resources, directories are not.
"""
raise FileNotFoundError
@abc.abstractmethod
def contents(self) -> Iterable[str]:
"""Return an iterable of entries in `package`."""
raise FileNotFoundError
@runtime_checkable
class Traversable(Protocol):
"""
An object with a subset of pathlib.Path methods suitable for
traversing directories and opening files.
"""
@abc.abstractmethod
def iterdir(self):
"""
Yield Traversable objects in self
"""
def read_bytes(self):
"""
Read contents of self as bytes
"""
with self.open('rb') as strm:
return strm.read()
def read_text(self, encoding=None):
"""
Read contents of self as text
"""
with self.open(encoding=encoding) as strm:
return strm.read()
@abc.abstractmethod
def is_dir(self) -> bool:
"""
Return True if self is a directory
"""
@abc.abstractmethod
def is_file(self) -> bool:
"""
Return True if self is a file
"""
@abc.abstractmethod
def joinpath(self, child):
"""
Return Traversable child in self
"""
def __truediv__(self, child):
"""
Return Traversable child in self
"""
return self.joinpath(child)
@abc.abstractmethod
def open(self, mode='r', *args, **kwargs):
"""
mode may be 'r' or 'rb' to open as text or binary. Return a handle
suitable for reading (same as pathlib.Path.open).
When opening as text, accepts encoding parameters such as those
accepted by io.TextIOWrapper.
"""
@abc.abstractproperty
def name(self) -> str:
"""
The base name of this object without any parent references.
"""
class TraversableResources(ResourceReader):
"""
The required interface for providing traversable
resources.
"""
@abc.abstractmethod
def files(self):
"""Return a Traversable object for the loaded package."""
def open_resource(self, resource):
return self.files().joinpath(resource).open('rb')
def resource_path(self, resource):
raise FileNotFoundError(resource)
def is_resource(self, path):
return self.files().joinpath(path).is_file()
def contents(self):
return (item.name for item in self.files().iterdir())

View file

@ -0,0 +1,122 @@
import collections
import operator
import pathlib
import zipfile
from . import abc
from ._itertools import unique_everseen
def remove_duplicates(items):
return iter(collections.OrderedDict.fromkeys(items))
class FileReader(abc.TraversableResources):
def __init__(self, loader):
self.path = pathlib.Path(loader.path).parent
def resource_path(self, resource):
"""
Return the file system path to prevent
`resources.path()` from creating a temporary
copy.
"""
return str(self.path.joinpath(resource))
def files(self):
return self.path
class ZipReader(abc.TraversableResources):
def __init__(self, loader, module):
_, _, name = module.rpartition('.')
self.prefix = loader.prefix.replace('\\', '/') + name + '/'
self.archive = loader.archive
def open_resource(self, resource):
try:
return super().open_resource(resource)
except KeyError as exc:
raise FileNotFoundError(exc.args[0])
def is_resource(self, path):
# workaround for `zipfile.Path.is_file` returning true
# for non-existent paths.
target = self.files().joinpath(path)
return target.is_file() and target.exists()
def files(self):
return zipfile.Path(self.archive, self.prefix)
class MultiplexedPath(abc.Traversable):
"""
Given a series of Traversable objects, implement a merged
version of the interface across all objects. Useful for
namespace packages which may be multihomed at a single
name.
"""
def __init__(self, *paths):
self._paths = list(map(pathlib.Path, remove_duplicates(paths)))
if not self._paths:
message = 'MultiplexedPath must contain at least one path'
raise FileNotFoundError(message)
if not all(path.is_dir() for path in self._paths):
raise NotADirectoryError('MultiplexedPath only supports directories')
def iterdir(self):
files = (file for path in self._paths for file in path.iterdir())
return unique_everseen(files, key=operator.attrgetter('name'))
def read_bytes(self):
raise FileNotFoundError(f'{self} is not a file')
def read_text(self, *args, **kwargs):
raise FileNotFoundError(f'{self} is not a file')
def is_dir(self):
return True
def is_file(self):
return False
def joinpath(self, child):
# first try to find child in current paths
for file in self.iterdir():
if file.name == child:
return file
# if it does not exist, construct it with the first path
return self._paths[0] / child
__truediv__ = joinpath
def open(self, *args, **kwargs):
raise FileNotFoundError(f'{self} is not a file')
@property
def name(self):
return self._paths[0].name
def __repr__(self):
paths = ', '.join(f"'{path}'" for path in self._paths)
return f'MultiplexedPath({paths})'
class NamespaceReader(abc.TraversableResources):
def __init__(self, namespace_path):
if 'NamespacePath' not in str(namespace_path):
raise ValueError('Invalid path')
self.path = MultiplexedPath(*list(namespace_path))
def resource_path(self, resource):
"""
Return the file system path to prevent
`resources.path()` from creating a temporary
copy.
"""
return str(self.path.joinpath(resource))
def files(self):
return self.path

View file

@ -0,0 +1,116 @@
"""
Interface adapters for low-level readers.
"""
import abc
import io
import itertools
from typing import BinaryIO, List
from .abc import Traversable, TraversableResources
class SimpleReader(abc.ABC):
"""
The minimum, low-level interface required from a resource
provider.
"""
@abc.abstractproperty
def package(self):
# type: () -> str
"""
The name of the package for which this reader loads resources.
"""
@abc.abstractmethod
def children(self):
# type: () -> List['SimpleReader']
"""
Obtain an iterable of SimpleReader for available
child containers (e.g. directories).
"""
@abc.abstractmethod
def resources(self):
# type: () -> List[str]
"""
Obtain available named resources for this virtual package.
"""
@abc.abstractmethod
def open_binary(self, resource):
# type: (str) -> BinaryIO
"""
Obtain a File-like for a named resource.
"""
@property
def name(self):
return self.package.split('.')[-1]
class ResourceHandle(Traversable):
"""
Handle to a named resource in a ResourceReader.
"""
def __init__(self, parent, name):
# type: (ResourceContainer, str) -> None
self.parent = parent
self.name = name # type: ignore
def is_file(self):
return True
def is_dir(self):
return False
def open(self, mode='r', *args, **kwargs):
stream = self.parent.reader.open_binary(self.name)
if 'b' not in mode:
stream = io.TextIOWrapper(*args, **kwargs)
return stream
def joinpath(self, name):
raise RuntimeError("Cannot traverse into a resource")
class ResourceContainer(Traversable):
"""
Traversable container for a package's resources via its reader.
"""
def __init__(self, reader):
# type: (SimpleReader) -> None
self.reader = reader
def is_dir(self):
return True
def is_file(self):
return False
def iterdir(self):
files = (ResourceHandle(self, name) for name in self.reader.resources)
dirs = map(ResourceContainer, self.reader.children())
return itertools.chain(files, dirs)
def open(self, *args, **kwargs):
raise IsADirectoryError()
def joinpath(self, name):
return next(
traversable for traversable in self.iterdir() if traversable.name == name
)
class TraversableReader(TraversableResources, SimpleReader):
"""
A TraversableResources based on SimpleReader. Resource providers
may derive from this class to provide the TraversableResources
interface by supplying the SimpleReader interface.
"""
def files(self):
return ResourceContainer(self)