bpo-39791: Add files() to importlib.resources (GH-19722)

* bpo-39791: Update importlib.resources to support files() API (importlib_resources 1.5).

* 📜🤖 Added by blurb_it.

* Add some documentation about the new objects added.

Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
This commit is contained in:
Jason R. Coombs 2020-05-08 19:20:26 -04:00 committed by GitHub
parent d10091aa17
commit 7f7e706d78
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 295 additions and 102 deletions

View file

@ -1,14 +1,15 @@
import os
import tempfile
from . import abc as resources_abc
from . import _common
from ._common import as_file
from contextlib import contextmanager, suppress
from importlib import import_module
from importlib.abc import ResourceLoader
from io import BytesIO, TextIOWrapper
from pathlib import Path
from types import ModuleType
from typing import Iterable, Iterator, Optional, Union # noqa: F401
from typing import ContextManager, Iterable, Optional, Union
from typing import cast
from typing.io import BinaryIO, TextIO
@ -16,7 +17,9 @@ from typing.io import BinaryIO, TextIO
__all__ = [
'Package',
'Resource',
'as_file',
'contents',
'files',
'is_resource',
'open_binary',
'open_text',
@ -30,24 +33,23 @@ Package = Union[str, ModuleType]
Resource = Union[str, os.PathLike]
def _resolve(name) -> ModuleType:
"""If name is a string, resolve to a module."""
if hasattr(name, '__spec__'):
return name
return import_module(name)
def _get_package(package) -> ModuleType:
"""Take a package name or module object and return the module.
If a name, the module is imported. If the passed or imported module
If a name, the module is imported. If the resolved module
object is not a package, raise an exception.
"""
if hasattr(package, '__spec__'):
if package.__spec__.submodule_search_locations is None:
raise TypeError('{!r} is not a package'.format(
package.__spec__.name))
else:
return package
else:
module = import_module(package)
if module.__spec__.submodule_search_locations is None:
raise TypeError('{!r} is not a package'.format(package))
else:
return module
module = _resolve(package)
if module.__spec__.submodule_search_locations is None:
raise TypeError('{!r} is not a package'.format(package))
return module
def _normalize_path(path) -> str:
@ -58,8 +60,7 @@ def _normalize_path(path) -> str:
parent, file_name = os.path.split(path)
if parent:
raise ValueError('{!r} must be only a file name'.format(path))
else:
return file_name
return file_name
def _get_resource_reader(
@ -88,8 +89,8 @@ def open_binary(package: Package, resource: Resource) -> BinaryIO:
reader = _get_resource_reader(package)
if reader is not None:
return reader.open_resource(resource)
_check_location(package)
absolute_package_path = os.path.abspath(package.__spec__.origin)
absolute_package_path = os.path.abspath(
package.__spec__.origin or 'non-existent file')
package_path = os.path.dirname(absolute_package_path)
full_path = os.path.join(package_path, resource)
try:
@ -108,8 +109,7 @@ def open_binary(package: Package, resource: Resource) -> BinaryIO:
message = '{!r} resource not found in {!r}'.format(
resource, package_name)
raise FileNotFoundError(message)
else:
return BytesIO(data)
return BytesIO(data)
def open_text(package: Package,
@ -117,39 +117,12 @@ def open_text(package: Package,
encoding: str = 'utf-8',
errors: str = 'strict') -> TextIO:
"""Return a file-like object opened for text reading of the resource."""
resource = _normalize_path(resource)
package = _get_package(package)
reader = _get_resource_reader(package)
if reader is not None:
return TextIOWrapper(reader.open_resource(resource), encoding, errors)
_check_location(package)
absolute_package_path = os.path.abspath(package.__spec__.origin)
package_path = os.path.dirname(absolute_package_path)
full_path = os.path.join(package_path, resource)
try:
return open(full_path, mode='r', encoding=encoding, errors=errors)
except OSError:
# Just assume the loader is a resource loader; all the relevant
# importlib.machinery loaders are and an AttributeError for
# get_data() will make it clear what is needed from the loader.
loader = cast(ResourceLoader, package.__spec__.loader)
data = None
if hasattr(package.__spec__.loader, 'get_data'):
with suppress(OSError):
data = loader.get_data(full_path)
if data is None:
package_name = package.__spec__.name
message = '{!r} resource not found in {!r}'.format(
resource, package_name)
raise FileNotFoundError(message)
else:
return TextIOWrapper(BytesIO(data), encoding, errors)
return TextIOWrapper(
open_binary(package, resource), encoding=encoding, errors=errors)
def read_binary(package: Package, resource: Resource) -> bytes:
"""Return the binary contents of the resource."""
resource = _normalize_path(resource)
package = _get_package(package)
with open_binary(package, resource) as fp:
return fp.read()
@ -163,14 +136,20 @@ def read_text(package: Package,
The decoding-related arguments have the same semantics as those of
bytes.decode().
"""
resource = _normalize_path(resource)
package = _get_package(package)
with open_text(package, resource, encoding, errors) as fp:
return fp.read()
@contextmanager
def path(package: Package, resource: Resource) -> Iterator[Path]:
def files(package: Package) -> resources_abc.Traversable:
"""
Get a Traversable resource from a package
"""
return _common.from_package(_get_package(package))
def path(
package: Package, resource: Resource,
) -> 'ContextManager[Path]':
"""A context manager providing a file path object to the resource.
If the resource does not already exist on its own on the file system,
@ -179,39 +158,23 @@ def path(package: Package, resource: Resource) -> Iterator[Path]:
raised if the file was deleted prior to the context manager
exiting).
"""
resource = _normalize_path(resource)
package = _get_package(package)
reader = _get_resource_reader(package)
if reader is not None:
try:
yield Path(reader.resource_path(resource))
return
except FileNotFoundError:
pass
else:
_check_location(package)
# Fall-through for both the lack of resource_path() *and* if
# resource_path() raises FileNotFoundError.
package_directory = Path(package.__spec__.origin).parent
file_path = package_directory / resource
if file_path.exists():
yield file_path
else:
with open_binary(package, resource) as fp:
data = fp.read()
# Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
# blocks due to the need to close the temporary file to work on
# Windows properly.
fd, raw_path = tempfile.mkstemp()
try:
os.write(fd, data)
os.close(fd)
yield Path(raw_path)
finally:
try:
os.remove(raw_path)
except FileNotFoundError:
pass
reader = _get_resource_reader(_get_package(package))
return (
_path_from_reader(reader, resource)
if reader else
_common.as_file(files(package).joinpath(_normalize_path(resource)))
)
@contextmanager
def _path_from_reader(reader, resource):
norm_resource = _normalize_path(resource)
with suppress(FileNotFoundError):
yield Path(reader.resource_path(norm_resource))
return
opener_reader = reader.open_resource(norm_resource)
with _common._tempfile(opener_reader.read, suffix=norm_resource) as res:
yield res
def is_resource(package: Package, name: str) -> bool:
@ -224,17 +187,10 @@ def is_resource(package: Package, name: str) -> bool:
reader = _get_resource_reader(package)
if reader is not None:
return reader.is_resource(name)
try:
package_contents = set(contents(package))
except (NotADirectoryError, FileNotFoundError):
return False
package_contents = set(contents(package))
if name not in package_contents:
return False
# Just because the given file_name lives as an entry in the package's
# contents doesn't necessarily mean it's a resource. Directories are not
# resources, so let's try to find out if it's a directory or not.
path = Path(package.__spec__.origin).parent / name
return path.is_file()
return (_common.from_package(package) / name).is_file()
def contents(package: Package) -> Iterable[str]:
@ -249,10 +205,11 @@ def contents(package: Package) -> Iterable[str]:
if reader is not None:
return reader.contents()
# Is the package a namespace package? By definition, namespace packages
# cannot have resources. We could use _check_location() and catch the
# exception, but that's extra work, so just inline the check.
elif package.__spec__.origin is None or not package.__spec__.has_location:
# cannot have resources.
namespace = (
package.__spec__.origin is None or
package.__spec__.origin == 'namespace'
)
if namespace or not package.__spec__.has_location:
return ()
else:
package_directory = Path(package.__spec__.origin).parent
return os.listdir(package_directory)
return list(item.name for item in _common.from_package(package).iterdir())