bpo-42129: Add support for resources in namespaces (GH-24670)

* Unify behavior in ResourceReaderDefaultsTests and align with the behavior found in importlib_resources.
* Equip NamespaceLoader with a NamespaceReader.
* Apply changes from importlib_resources 5.0.4
This commit is contained in:
Jason R. Coombs 2021-03-04 13:43:00 -05:00 committed by GitHub
parent fbf75b9997
commit 6714825414
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 1315 additions and 916 deletions

View file

@ -0,0 +1,82 @@
from contextlib import suppress
from . import abc
class SpecLoaderAdapter:
"""
Adapt a package spec to adapt the underlying loader.
"""
def __init__(self, spec, adapter=lambda spec: spec.loader):
self.spec = spec
self.loader = adapter(spec)
def __getattr__(self, name):
return getattr(self.spec, name)
class TraversableResourcesLoader:
"""
Adapt a loader to provide TraversableResources.
"""
def __init__(self, spec):
self.spec = spec
def get_resource_reader(self, name):
return DegenerateFiles(self.spec)._native()
class DegenerateFiles:
"""
Adapter for an existing or non-existant resource reader
to provide a degenerate .files().
"""
class Path(abc.Traversable):
def iterdir(self):
return iter(())
def is_dir(self):
return False
is_file = exists = is_dir # type: ignore
def joinpath(self, other):
return DegenerateFiles.Path()
def name(self):
return ''
def open(self):
raise ValueError()
def __init__(self, spec):
self.spec = spec
@property
def _reader(self):
with suppress(AttributeError):
return self.spec.loader.get_resource_reader(self.spec.name)
def _native(self):
"""
Return the native reader if it supports files().
"""
reader = self._reader
return reader if hasattr(reader, 'files') else self
def __getattr__(self, attr):
return getattr(self._reader, attr)
def files(self):
return DegenerateFiles.Path()
def wrap_spec(package):
"""
Construct a package spec with traversable compatibility
on the spec/loader/reader.
"""
return SpecLoaderAdapter(package.__spec__, TraversableResourcesLoader)

View file

@ -1259,6 +1259,10 @@ class _NamespaceLoader:
# Warning implemented in _load_module_shim().
return _bootstrap._load_module_shim(self, fullname)
def get_resource_reader(self, module):
from importlib.readers import NamespaceReader
return NamespaceReader(self._path)
# Finders #####################################################################

View file

@ -9,6 +9,8 @@ import importlib
from typing import Union, Any, Optional
from .abc import ResourceReader
from ._adapters import wrap_spec
Package = Union[types.ModuleType, str]
@ -43,18 +45,15 @@ def get_resource_reader(package):
# zipimport.zipimporter does not support weak references, resulting in a
# TypeError. That seems terrible.
spec = package.__spec__
reader = getattr(spec.loader, 'get_resource_reader', None)
reader = getattr(spec.loader, 'get_resource_reader', None) # type: ignore
if reader is None:
return None
return reader(spec.name)
return reader(spec.name) # type: ignore
def resolve(cand):
# type: (Package) -> types.ModuleType
return (
cand if isinstance(cand, types.ModuleType)
else importlib.import_module(cand)
)
return cand if isinstance(cand, types.ModuleType) else importlib.import_module(cand)
def get_package(package):
@ -64,7 +63,7 @@ def get_package(package):
Raise an exception if the resolved module is not a package.
"""
resolved = resolve(package)
if resolved.__spec__.submodule_search_locations is None:
if wrap_spec(resolved).submodule_search_locations is None:
raise TypeError('{!r} is not a package'.format(package))
return resolved
@ -74,7 +73,7 @@ def from_package(package):
Return a Traversable object for the given package.
"""
spec = package.__spec__
spec = wrap_spec(package)
reader = spec.loader.get_resource_reader(spec.name)
return reader.files()

View file

@ -14,6 +14,7 @@ except ImportError:
from ._abc import Loader
import abc
import warnings
from typing import BinaryIO, Iterable, Text
from typing import Protocol, runtime_checkable
@ -297,49 +298,45 @@ _register(SourceLoader, machinery.SourceFileLoader)
class ResourceReader(metaclass=abc.ABCMeta):
"""Abstract base class to provide resource-reading support.
Loaders that support resource reading are expected to implement
the ``get_resource_reader(fullname)`` method and have it either return None
or an object compatible with this ABC.
"""
"""Abstract base class for loaders to provide resource reading support."""
@abc.abstractmethod
def open_resource(self, resource):
def open_resource(self, resource: Text) -> BinaryIO:
"""Return an opened, file-like object for binary reading.
The 'resource' argument is expected to represent only a file name
and thus not contain any subdirectory components.
The 'resource' argument is expected to represent only a file name.
If the resource cannot be found, FileNotFoundError is raised.
"""
# This deliberately raises FileNotFoundError instead of
# NotImplementedError so that if this method is accidentally called,
# it'll still do the right thing.
raise FileNotFoundError
@abc.abstractmethod
def resource_path(self, resource):
def resource_path(self, resource: Text) -> Text:
"""Return the file system path to the specified resource.
The 'resource' argument is expected to represent only a file name
and thus not contain any subdirectory components.
The 'resource' argument is expected to represent only a file name.
If the resource does not exist on the file system, raise
FileNotFoundError.
"""
# This deliberately raises FileNotFoundError instead of
# NotImplementedError so that if this method is accidentally called,
# it'll still do the right thing.
raise FileNotFoundError
@abc.abstractmethod
def is_resource(self, name):
"""Return True if the named 'name' is consider a resource."""
def is_resource(self, path: Text) -> bool:
"""Return True if the named 'path' is a resource.
Files are resources, directories are not.
"""
raise FileNotFoundError
@abc.abstractmethod
def contents(self):
"""Return an iterable of strings over the contents of the package."""
return []
_register(ResourceReader, machinery.SourceFileLoader)
def contents(self) -> Iterable[str]:
"""Return an iterable of entries in `package`."""
raise FileNotFoundError
@runtime_checkable
@ -355,26 +352,28 @@ class Traversable(Protocol):
Yield Traversable objects in self
"""
@abc.abstractmethod
def read_bytes(self):
"""
Read contents of self as bytes
"""
with self.open('rb') as strm:
return strm.read()
@abc.abstractmethod
def read_text(self, encoding=None):
"""
Read contents of self as bytes
Read contents of self as text
"""
with self.open(encoding=encoding) as strm:
return strm.read()
@abc.abstractmethod
def is_dir(self):
def is_dir(self) -> bool:
"""
Return True if self is a dir
"""
@abc.abstractmethod
def is_file(self):
def is_file(self) -> bool:
"""
Return True if self is a file
"""
@ -385,11 +384,11 @@ class Traversable(Protocol):
Return Traversable child in self
"""
@abc.abstractmethod
def __truediv__(self, child):
"""
Return Traversable child in self
"""
return self.joinpath(child)
@abc.abstractmethod
def open(self, mode='r', *args, **kwargs):
@ -402,14 +401,18 @@ class Traversable(Protocol):
"""
@abc.abstractproperty
def name(self):
# type: () -> str
def name(self) -> str:
"""
The base name of this object without any parent references.
"""
class TraversableResources(ResourceReader):
"""
The required interface for providing traversable
resources.
"""
@abc.abstractmethod
def files(self):
"""Return a Traversable object for the loaded package."""

View file

@ -1,8 +1,13 @@
import collections
import zipfile
import pathlib
from . import abc
def remove_duplicates(items):
return iter(collections.OrderedDict.fromkeys(items))
class FileReader(abc.TraversableResources):
def __init__(self, loader):
self.path = pathlib.Path(loader.path).parent
@ -39,3 +44,80 @@ class ZipReader(abc.TraversableResources):
def files(self):
return zipfile.Path(self.archive, self.prefix)
class MultiplexedPath(abc.Traversable):
"""
Given a series of Traversable objects, implement a merged
version of the interface across all objects. Useful for
namespace packages which may be multihomed at a single
name.
"""
def __init__(self, *paths):
self._paths = list(map(pathlib.Path, remove_duplicates(paths)))
if not self._paths:
message = 'MultiplexedPath must contain at least one path'
raise FileNotFoundError(message)
if not all(path.is_dir() for path in self._paths):
raise NotADirectoryError('MultiplexedPath only supports directories')
def iterdir(self):
visited = []
for path in self._paths:
for file in path.iterdir():
if file.name in visited:
continue
visited.append(file.name)
yield file
def read_bytes(self):
raise FileNotFoundError(f'{self} is not a file')
def read_text(self, *args, **kwargs):
raise FileNotFoundError(f'{self} is not a file')
def is_dir(self):
return True
def is_file(self):
return False
def joinpath(self, child):
# first try to find child in current paths
for file in self.iterdir():
if file.name == child:
return file
# if it does not exist, construct it with the first path
return self._paths[0] / child
__truediv__ = joinpath
def open(self, *args, **kwargs):
raise FileNotFoundError('{} is not a file'.format(self))
def name(self):
return self._paths[0].name
def __repr__(self):
return 'MultiplexedPath({})'.format(
', '.join("'{}'".format(path) for path in self._paths)
)
class NamespaceReader(abc.TraversableResources):
def __init__(self, namespace_path):
if 'NamespacePath' not in str(namespace_path):
raise ValueError('Invalid path')
self.path = MultiplexedPath(*list(namespace_path))
def resource_path(self, resource):
"""
Return the file system path to prevent
`resources.path()` from creating a temporary
copy.
"""
return str(self.path.joinpath(resource))
def files(self):
return self.path

View file

@ -3,8 +3,10 @@ import io
from . import _common
from ._common import as_file, files
from .abc import ResourceReader
from contextlib import suppress
from importlib.abc import ResourceLoader
from importlib.machinery import ModuleSpec
from io import BytesIO, TextIOWrapper
from pathlib import Path
from types import ModuleType
@ -18,6 +20,7 @@ from functools import singledispatch
__all__ = [
'Package',
'Resource',
'ResourceReader',
'as_file',
'contents',
'files',
@ -41,9 +44,15 @@ def open_binary(package: Package, resource: Resource) -> BinaryIO:
reader = _common.get_resource_reader(package)
if reader is not None:
return reader.open_resource(resource)
absolute_package_path = os.path.abspath(
package.__spec__.origin or 'non-existent file')
package_path = os.path.dirname(absolute_package_path)
spec = cast(ModuleSpec, package.__spec__)
# Using pathlib doesn't work well here due to the lack of 'strict'
# argument for pathlib.Path.resolve() prior to Python 3.6.
if spec.submodule_search_locations is not None:
paths = spec.submodule_search_locations
elif spec.origin is not None:
paths = [os.path.dirname(os.path.abspath(spec.origin))]
for package_path in paths:
full_path = os.path.join(package_path, resource)
try:
return open(full_path, mode='rb')
@ -51,26 +60,29 @@ def open_binary(package: Package, resource: Resource) -> BinaryIO:
# Just assume the loader is a resource loader; all the relevant
# importlib.machinery loaders are and an AttributeError for
# get_data() will make it clear what is needed from the loader.
loader = cast(ResourceLoader, package.__spec__.loader)
loader = cast(ResourceLoader, spec.loader)
data = None
if hasattr(package.__spec__.loader, 'get_data'):
if hasattr(spec.loader, 'get_data'):
with suppress(OSError):
data = loader.get_data(full_path)
if data is None:
package_name = package.__spec__.name
message = '{!r} resource not found in {!r}'.format(
resource, package_name)
raise FileNotFoundError(message)
if data is not None:
return BytesIO(data)
raise FileNotFoundError(
'{!r} resource not found in {!r}'.format(resource, spec.name)
)
def open_text(package: Package,
def open_text(
package: Package,
resource: Resource,
encoding: str = 'utf-8',
errors: str = 'strict') -> TextIO:
errors: str = 'strict',
) -> TextIO:
"""Return a file-like object opened for text reading of the resource."""
return TextIOWrapper(
open_binary(package, resource), encoding=encoding, errors=errors)
open_binary(package, resource), encoding=encoding, errors=errors
)
def read_binary(package: Package, resource: Resource) -> bytes:
@ -79,10 +91,12 @@ def read_binary(package: Package, resource: Resource) -> bytes:
return fp.read()
def read_text(package: Package,
def read_text(
package: Package,
resource: Resource,
encoding: str = 'utf-8',
errors: str = 'strict') -> str:
errors: str = 'strict',
) -> str:
"""Return the decoded string of the resource.
The decoding-related arguments have the same semantics as those of
@ -93,7 +107,8 @@ def read_text(package: Package,
def path(
package: Package, resource: Resource,
package: Package,
resource: Resource,
) -> 'ContextManager[Path]':
"""A context manager providing a file path object to the resource.
@ -106,15 +121,17 @@ def path(
reader = _common.get_resource_reader(_common.get_package(package))
return (
_path_from_reader(reader, _common.normalize_path(resource))
if reader else
_common.as_file(
_common.files(package).joinpath(_common.normalize_path(resource)))
if reader
else _common.as_file(
_common.files(package).joinpath(_common.normalize_path(resource))
)
)
def _path_from_reader(reader, resource):
return _path_from_resource_path(reader, resource) or \
_path_from_open_resource(reader, resource)
return _path_from_resource_path(reader, resource) or _path_from_open_resource(
reader, resource
)
def _path_from_resource_path(reader, resource):
@ -154,15 +171,10 @@ def contents(package: Package) -> Iterable[str]:
reader = _common.get_resource_reader(package)
if reader is not None:
return _ensure_sequence(reader.contents())
# Is the package a namespace package? By definition, namespace packages
# cannot have resources.
namespace = (
package.__spec__.origin is None or
package.__spec__.origin == 'namespace'
)
if namespace or not package.__spec__.has_location:
return ()
return list(item.name for item in _common.from_package(package).iterdir())
transversable = _common.from_package(package)
if transversable.is_dir():
return list(item.name for item in transversable.iterdir())
return []
@singledispatch

Binary file not shown.

Binary file not shown.

View file

@ -0,0 +1 @@
Hello, UTF-8 world!

View file

@ -338,7 +338,9 @@ class ResourceReaderDefaultsTests(ABCTestHarness):
self.ins.is_resource('dummy_file')
def test_contents(self):
self.assertEqual([], list(self.ins.contents()))
with self.assertRaises(FileNotFoundError):
self.ins.contents()
(Frozen_RRDefaultTests,
Source_RRDefaultsTests

View file

@ -29,34 +29,32 @@ class OpenTests:
self.assertEqual(result, 'Hello, UTF-8 world!\n')
def test_open_text_given_encoding(self):
with resources.open_text(
self.data, 'utf-16.file', 'utf-16', 'strict') as fp:
with resources.open_text(self.data, 'utf-16.file', 'utf-16', 'strict') as fp:
result = fp.read()
self.assertEqual(result, 'Hello, UTF-16 world!\n')
def test_open_text_with_errors(self):
# Raises UnicodeError without the 'errors' argument.
with resources.open_text(
self.data, 'utf-16.file', 'utf-8', 'strict') as fp:
with resources.open_text(self.data, 'utf-16.file', 'utf-8', 'strict') as fp:
self.assertRaises(UnicodeError, fp.read)
with resources.open_text(
self.data, 'utf-16.file', 'utf-8', 'ignore') as fp:
with resources.open_text(self.data, 'utf-16.file', 'utf-8', 'ignore') as fp:
result = fp.read()
self.assertEqual(
result,
'H\x00e\x00l\x00l\x00o\x00,\x00 '
'\x00U\x00T\x00F\x00-\x001\x006\x00 '
'\x00w\x00o\x00r\x00l\x00d\x00!\x00\n\x00')
'\x00w\x00o\x00r\x00l\x00d\x00!\x00\n\x00',
)
def test_open_binary_FileNotFoundError(self):
self.assertRaises(
FileNotFoundError,
resources.open_binary, self.data, 'does-not-exist')
FileNotFoundError, resources.open_binary, self.data, 'does-not-exist'
)
def test_open_text_FileNotFoundError(self):
self.assertRaises(
FileNotFoundError,
resources.open_text, self.data, 'does-not-exist')
FileNotFoundError, resources.open_text, self.data, 'does-not-exist'
)
class OpenDiskTests(OpenTests, unittest.TestCase):
@ -64,6 +62,13 @@ class OpenDiskTests(OpenTests, unittest.TestCase):
self.data = data01
class OpenDiskNamespaceTests(OpenTests, unittest.TestCase):
def setUp(self):
from . import namespacedata01
self.data = namespacedata01
class OpenZipTests(OpenTests, util.ZipSetup, unittest.TestCase):
pass

View file

@ -1,3 +1,4 @@
import io
import unittest
from importlib import resources
@ -37,6 +38,17 @@ class PathDiskTests(PathTests, unittest.TestCase):
assert 'data' in str(path)
class PathMemoryTests(PathTests, unittest.TestCase):
def setUp(self):
file = io.BytesIO(b'Hello, UTF-8 world!\n')
self.addCleanup(file.close)
self.data = util.create_package(
file=file, path=FileNotFoundError("package exists only in memory")
)
self.data.__spec__.origin = None
self.data.__spec__.has_location = False
class PathZipTests(PathTests, util.ZipSetup, unittest.TestCase):
def test_remove_in_context_manager(self):
# It is not an error if the file that was temporarily stashed on the

View file

@ -25,20 +25,19 @@ class ReadTests:
self.assertEqual(result, 'Hello, UTF-8 world!\n')
def test_read_text_given_encoding(self):
result = resources.read_text(
self.data, 'utf-16.file', encoding='utf-16')
result = resources.read_text(self.data, 'utf-16.file', encoding='utf-16')
self.assertEqual(result, 'Hello, UTF-16 world!\n')
def test_read_text_with_errors(self):
# Raises UnicodeError without the 'errors' argument.
self.assertRaises(
UnicodeError, resources.read_text, self.data, 'utf-16.file')
self.assertRaises(UnicodeError, resources.read_text, self.data, 'utf-16.file')
result = resources.read_text(self.data, 'utf-16.file', errors='ignore')
self.assertEqual(
result,
'H\x00e\x00l\x00l\x00o\x00,\x00 '
'\x00U\x00T\x00F\x00-\x001\x006\x00 '
'\x00w\x00o\x00r\x00l\x00d\x00!\x00\n\x00')
'\x00w\x00o\x00r\x00l\x00d\x00!\x00\n\x00',
)
class ReadDiskTests(ReadTests, unittest.TestCase):
@ -48,13 +47,11 @@ class ReadDiskTests(ReadTests, unittest.TestCase):
class ReadZipTests(ReadTests, util.ZipSetup, unittest.TestCase):
def test_read_submodule_resource(self):
submodule = import_module('ziptestdata.subdirectory')
result = resources.read_binary(
submodule, 'binary.file')
result = resources.read_binary(submodule, 'binary.file')
self.assertEqual(result, b'\0\1\2\3')
def test_read_submodule_resource_by_name(self):
result = resources.read_binary(
'ziptestdata.subdirectory', 'binary.file')
result = resources.read_binary('ziptestdata.subdirectory', 'binary.file')
self.assertEqual(result, b'\0\1\2\3')

View file

@ -0,0 +1,123 @@
import os.path
import sys
import pathlib
import unittest
from importlib import import_module
from importlib.readers import MultiplexedPath, NamespaceReader
class MultiplexedPathTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
path = pathlib.Path(__file__).parent / 'namespacedata01'
cls.folder = str(path)
def test_init_no_paths(self):
with self.assertRaises(FileNotFoundError):
MultiplexedPath()
def test_init_file(self):
with self.assertRaises(NotADirectoryError):
MultiplexedPath(os.path.join(self.folder, 'binary.file'))
def test_iterdir(self):
contents = {path.name for path in MultiplexedPath(self.folder).iterdir()}
try:
contents.remove('__pycache__')
except (KeyError, ValueError):
pass
self.assertEqual(contents, {'binary.file', 'utf-16.file', 'utf-8.file'})
def test_iterdir_duplicate(self):
data01 = os.path.abspath(os.path.join(__file__, '..', 'data01'))
contents = {
path.name for path in MultiplexedPath(self.folder, data01).iterdir()
}
for remove in ('__pycache__', '__init__.pyc'):
try:
contents.remove(remove)
except (KeyError, ValueError):
pass
self.assertEqual(
contents,
{'__init__.py', 'binary.file', 'subdirectory', 'utf-16.file', 'utf-8.file'},
)
def test_is_dir(self):
self.assertEqual(MultiplexedPath(self.folder).is_dir(), True)
def test_is_file(self):
self.assertEqual(MultiplexedPath(self.folder).is_file(), False)
def test_open_file(self):
path = MultiplexedPath(self.folder)
with self.assertRaises(FileNotFoundError):
path.read_bytes()
with self.assertRaises(FileNotFoundError):
path.read_text()
with self.assertRaises(FileNotFoundError):
path.open()
def test_join_path(self):
print('test_join_path')
prefix = os.path.abspath(os.path.join(__file__, '..'))
data01 = os.path.join(prefix, 'data01')
path = MultiplexedPath(self.folder, data01)
self.assertEqual(
str(path.joinpath('binary.file'))[len(prefix) + 1 :],
os.path.join('namespacedata01', 'binary.file'),
)
self.assertEqual(
str(path.joinpath('subdirectory'))[len(prefix) + 1 :],
os.path.join('data01', 'subdirectory'),
)
self.assertEqual(
str(path.joinpath('imaginary'))[len(prefix) + 1 :],
os.path.join('namespacedata01', 'imaginary'),
)
def test_repr(self):
self.assertEqual(
repr(MultiplexedPath(self.folder)),
"MultiplexedPath('{}')".format(self.folder),
)
class NamespaceReaderTest(unittest.TestCase):
site_dir = str(pathlib.Path(__file__).parent)
@classmethod
def setUpClass(cls):
sys.path.append(cls.site_dir)
@classmethod
def tearDownClass(cls):
sys.path.remove(cls.site_dir)
def test_init_error(self):
with self.assertRaises(ValueError):
NamespaceReader(['path1', 'path2'])
def test_resource_path(self):
namespacedata01 = import_module('namespacedata01')
reader = NamespaceReader(namespacedata01.__spec__.submodule_search_locations)
root = os.path.abspath(os.path.join(__file__, '..', 'namespacedata01'))
self.assertEqual(
reader.resource_path('binary.file'), os.path.join(root, 'binary.file')
)
self.assertEqual(
reader.resource_path('imaginary'), os.path.join(root, 'imaginary')
)
def test_files(self):
namespacedata01 = import_module('namespacedata01')
reader = NamespaceReader(namespacedata01.__spec__.submodule_search_locations)
root = os.path.abspath(os.path.join(__file__, '..', 'namespacedata01'))
self.assertIsInstance(reader.files(), MultiplexedPath)
self.assertEqual(repr(reader.files()), "MultiplexedPath('{}')".format(root))
if __name__ == '__main__':
unittest.main()

View file

@ -27,20 +27,21 @@ class ResourceTests:
def test_contents(self):
contents = set(resources.contents(self.data))
# There may be cruft in the directory listing of the data directory.
# Under Python 3 we could have a __pycache__ directory, and under
# Python 2 we could have .pyc files. These are both artifacts of the
# test suite importing these modules and writing these caches. They
# aren't germane to this test, so just filter them out.
# It could have a __pycache__ directory,
# an artifact of the
# test suite importing these modules, which
# are not germane to this test, so just filter them out.
contents.discard('__pycache__')
contents.discard('__init__.pyc')
contents.discard('__init__.pyo')
self.assertEqual(contents, {
self.assertEqual(
contents,
{
'__init__.py',
'subdirectory',
'utf-8.file',
'binary.file',
'utf-16.file',
})
},
)
class ResourceDiskTests(ResourceTests, unittest.TestCase):
@ -55,27 +56,26 @@ class ResourceZipTests(ResourceTests, util.ZipSetup, unittest.TestCase):
class ResourceLoaderTests(unittest.TestCase):
def test_resource_contents(self):
package = util.create_package(
file=data01, path=data01.__file__, contents=['A', 'B', 'C'])
self.assertEqual(
set(resources.contents(package)),
{'A', 'B', 'C'})
file=data01, path=data01.__file__, contents=['A', 'B', 'C']
)
self.assertEqual(set(resources.contents(package)), {'A', 'B', 'C'})
def test_resource_is_resource(self):
package = util.create_package(
file=data01, path=data01.__file__,
contents=['A', 'B', 'C', 'D/E', 'D/F'])
file=data01, path=data01.__file__, contents=['A', 'B', 'C', 'D/E', 'D/F']
)
self.assertTrue(resources.is_resource(package, 'B'))
def test_resource_directory_is_not_resource(self):
package = util.create_package(
file=data01, path=data01.__file__,
contents=['A', 'B', 'C', 'D/E', 'D/F'])
file=data01, path=data01.__file__, contents=['A', 'B', 'C', 'D/E', 'D/F']
)
self.assertFalse(resources.is_resource(package, 'D'))
def test_resource_missing_is_not_resource(self):
package = util.create_package(
file=data01, path=data01.__file__,
contents=['A', 'B', 'C', 'D/E', 'D/F'])
file=data01, path=data01.__file__, contents=['A', 'B', 'C', 'D/E', 'D/F']
)
self.assertFalse(resources.is_resource(package, 'Z'))
@ -86,90 +86,63 @@ class ResourceCornerCaseTests(unittest.TestCase):
# 2. Are not on the file system
# 3. Are not in a zip file
module = util.create_package(
file=data01, path=data01.__file__, contents=['A', 'B', 'C'])
file=data01, path=data01.__file__, contents=['A', 'B', 'C']
)
# Give the module a dummy loader.
module.__loader__ = object()
# Give the module a dummy origin.
module.__file__ = '/path/which/shall/not/be/named'
if sys.version_info >= (3,):
module.__spec__.loader = module.__loader__
module.__spec__.origin = module.__file__
self.assertFalse(resources.is_resource(module, 'A'))
class ResourceFromZipsTest(util.ZipSetupBase, unittest.TestCase):
ZIP_MODULE = zipdata02 # type: ignore
def test_unrelated_contents(self):
# https://gitlab.com/python-devs/importlib_resources/issues/44
#
# Here we have a zip file with two unrelated subpackages. The bug
# reports that getting the contents of a resource returns unrelated
# files.
self.assertEqual(
set(resources.contents('ziptestdata.one')),
{'__init__.py', 'resource1.txt'})
self.assertEqual(
set(resources.contents('ziptestdata.two')),
{'__init__.py', 'resource2.txt'})
class SubdirectoryResourceFromZipsTest(util.ZipSetupBase, unittest.TestCase):
class ResourceFromZipsTest01(util.ZipSetupBase, unittest.TestCase):
ZIP_MODULE = zipdata01 # type: ignore
def test_is_submodule_resource(self):
submodule = import_module('ziptestdata.subdirectory')
self.assertTrue(
resources.is_resource(submodule, 'binary.file'))
self.assertTrue(resources.is_resource(submodule, 'binary.file'))
def test_read_submodule_resource_by_name(self):
self.assertTrue(
resources.is_resource('ziptestdata.subdirectory', 'binary.file'))
resources.is_resource('ziptestdata.subdirectory', 'binary.file')
)
def test_submodule_contents(self):
submodule = import_module('ziptestdata.subdirectory')
self.assertEqual(
set(resources.contents(submodule)),
{'__init__.py', 'binary.file'})
set(resources.contents(submodule)), {'__init__.py', 'binary.file'}
)
def test_submodule_contents_by_name(self):
self.assertEqual(
set(resources.contents('ziptestdata.subdirectory')),
{'__init__.py', 'binary.file'})
{'__init__.py', 'binary.file'},
)
class NamespaceTest(unittest.TestCase):
def test_namespaces_cannot_have_resources(self):
contents = resources.contents('test.test_importlib.data03.namespace')
self.assertFalse(list(contents))
# Even though there is a file in the namespace directory, it is not
# considered a resource, since namespace packages can't have them.
self.assertFalse(resources.is_resource(
'test.test_importlib.data03.namespace',
'resource1.txt'))
# We should get an exception if we try to read it or open it.
self.assertRaises(
FileNotFoundError,
resources.open_text,
'test.test_importlib.data03.namespace', 'resource1.txt')
self.assertRaises(
FileNotFoundError,
resources.open_binary,
'test.test_importlib.data03.namespace', 'resource1.txt')
self.assertRaises(
FileNotFoundError,
resources.read_text,
'test.test_importlib.data03.namespace', 'resource1.txt')
self.assertRaises(
FileNotFoundError,
resources.read_binary,
'test.test_importlib.data03.namespace', 'resource1.txt')
class ResourceFromZipsTest02(util.ZipSetupBase, unittest.TestCase):
ZIP_MODULE = zipdata02 # type: ignore
def test_unrelated_contents(self):
"""
Test thata zip with two unrelated subpackages return
distinct resources. Ref python/importlib_resources#44.
"""
self.assertEqual(
set(resources.contents('ziptestdata.one')), {'__init__.py', 'resource1.txt'}
)
self.assertEqual(
set(resources.contents('ziptestdata.two')), {'__init__.py', 'resource2.txt'}
)
class DeletingZipsTest(unittest.TestCase):
"""Having accessed resources in a zip file should not keep an open
reference to the zip.
"""
ZIP_MODULE = zipdata01
def setUp(self):
@ -241,5 +214,41 @@ class DeletingZipsTest(unittest.TestCase):
del c
class ResourceFromNamespaceTest01(unittest.TestCase):
site_dir = str(pathlib.Path(__file__).parent)
@classmethod
def setUpClass(cls):
sys.path.append(cls.site_dir)
@classmethod
def tearDownClass(cls):
sys.path.remove(cls.site_dir)
def test_is_submodule_resource(self):
self.assertTrue(
resources.is_resource(import_module('namespacedata01'), 'binary.file')
)
def test_read_submodule_resource_by_name(self):
self.assertTrue(resources.is_resource('namespacedata01', 'binary.file'))
def test_submodule_contents(self):
contents = set(resources.contents(import_module('namespacedata01')))
try:
contents.remove('__pycache__')
except KeyError:
pass
self.assertEqual(contents, {'binary.file', 'utf-8.file', 'utf-16.file'})
def test_submodule_contents_by_name(self):
contents = set(resources.contents('namespacedata01'))
try:
contents.remove('__pycache__')
except KeyError:
pass
self.assertEqual(contents, {'binary.file', 'utf-8.file', 'utf-16.file'})
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,53 @@
"""
Generate the zip test data files.
Run to build the tests/zipdataNN/ziptestdata.zip files from
files in tests/dataNN.
Replaces the file with the working copy, but does commit anything
to the source repo.
"""
import contextlib
import os
import pathlib
import zipfile
def main():
"""
>>> from unittest import mock
>>> monkeypatch = getfixture('monkeypatch')
>>> monkeypatch.setattr(zipfile, 'ZipFile', mock.MagicMock())
>>> print(); main() # print workaround for bpo-32509
<BLANKLINE>
...data01... -> ziptestdata/...
...
...data02... -> ziptestdata/...
...
"""
suffixes = '01', '02'
tuple(map(generate, suffixes))
def generate(suffix):
root = pathlib.Path(__file__).parent.relative_to(os.getcwd())
zfpath = root / f'zipdata{suffix}/ziptestdata.zip'
with zipfile.ZipFile(zfpath, 'w') as zf:
for src, rel in walk(root / f'data{suffix}'):
dst = 'ziptestdata' / pathlib.PurePosixPath(rel.as_posix())
print(src, '->', dst)
zf.write(src, dst)
def walk(datapath):
for dirpath, dirnames, filenames in os.walk(datapath):
with contextlib.suppress(KeyError):
dirnames.remove('__pycache__')
for filename in filenames:
res = pathlib.Path(dirpath) / filename
rel = res.relative_to(datapath)
yield res, rel
__name__ == '__main__' and main()

View file

@ -0,0 +1,3 @@
``importlib.resources`` now honors namespace packages, merging resources
from each location in the namespace as introduced in
``importlib_resources`` 3.2 and including incidental changes through 5.0.3.

File diff suppressed because it is too large Load diff