mirror of
https://github.com/python/cpython.git
synced 2025-10-21 22:22:48 +00:00
gh-97930: Merge with importlib_resources 5.9 (GH-97929)
* Merge with importlib_resources 5.9 * Update changelog
This commit is contained in:
parent
5c9302d03a
commit
cea910ebf1
7 changed files with 102 additions and 29 deletions
|
@ -67,10 +67,14 @@ def from_package(package):
|
||||||
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def _tempfile(reader, suffix='',
|
def _tempfile(
|
||||||
# gh-93353: Keep a reference to call os.remove() in late Python
|
reader,
|
||||||
# finalization.
|
suffix='',
|
||||||
*, _os_remove=os.remove):
|
# gh-93353: Keep a reference to call os.remove() in late Python
|
||||||
|
# finalization.
|
||||||
|
*,
|
||||||
|
_os_remove=os.remove,
|
||||||
|
):
|
||||||
# Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
|
# Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
|
||||||
# blocks due to the need to close the temporary file to work on Windows
|
# blocks due to the need to close the temporary file to work on Windows
|
||||||
# properly.
|
# properly.
|
||||||
|
@ -89,13 +93,30 @@ def _tempfile(reader, suffix='',
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _temp_file(path):
|
||||||
|
return _tempfile(path.read_bytes, suffix=path.name)
|
||||||
|
|
||||||
|
|
||||||
|
def _is_present_dir(path: Traversable) -> bool:
|
||||||
|
"""
|
||||||
|
Some Traversables implement ``is_dir()`` to raise an
|
||||||
|
exception (i.e. ``FileNotFoundError``) when the
|
||||||
|
directory doesn't exist. This function wraps that call
|
||||||
|
to always return a boolean and only return True
|
||||||
|
if there's a dir and it exists.
|
||||||
|
"""
|
||||||
|
with contextlib.suppress(FileNotFoundError):
|
||||||
|
return path.is_dir()
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
@functools.singledispatch
|
@functools.singledispatch
|
||||||
def as_file(path):
|
def as_file(path):
|
||||||
"""
|
"""
|
||||||
Given a Traversable object, return that object as a
|
Given a Traversable object, return that object as a
|
||||||
path on the local file system in a context manager.
|
path on the local file system in a context manager.
|
||||||
"""
|
"""
|
||||||
return _tempfile(path.read_bytes, suffix=path.name)
|
return _temp_dir(path) if _is_present_dir(path) else _temp_file(path)
|
||||||
|
|
||||||
|
|
||||||
@as_file.register(pathlib.Path)
|
@as_file.register(pathlib.Path)
|
||||||
|
@ -105,3 +126,34 @@ def _(path):
|
||||||
Degenerate behavior for pathlib.Path objects.
|
Degenerate behavior for pathlib.Path objects.
|
||||||
"""
|
"""
|
||||||
yield path
|
yield path
|
||||||
|
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def _temp_path(dir: tempfile.TemporaryDirectory):
|
||||||
|
"""
|
||||||
|
Wrap tempfile.TemporyDirectory to return a pathlib object.
|
||||||
|
"""
|
||||||
|
with dir as result:
|
||||||
|
yield pathlib.Path(result)
|
||||||
|
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def _temp_dir(path):
|
||||||
|
"""
|
||||||
|
Given a traversable dir, recursively replicate the whole tree
|
||||||
|
to the file system in a context manager.
|
||||||
|
"""
|
||||||
|
assert path.is_dir()
|
||||||
|
with _temp_path(tempfile.TemporaryDirectory()) as temp_dir:
|
||||||
|
yield _write_contents(temp_dir, path)
|
||||||
|
|
||||||
|
|
||||||
|
def _write_contents(target, source):
|
||||||
|
child = target.joinpath(source.name)
|
||||||
|
if source.is_dir():
|
||||||
|
child.mkdir()
|
||||||
|
for item in source.iterdir():
|
||||||
|
_write_contents(child, item)
|
||||||
|
else:
|
||||||
|
child.open('wb').write(source.read_bytes())
|
||||||
|
return child
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
import abc
|
import abc
|
||||||
import io
|
import io
|
||||||
|
import itertools
|
||||||
import os
|
import os
|
||||||
|
import pathlib
|
||||||
from typing import Any, BinaryIO, Iterable, Iterator, NoReturn, Text, Optional
|
from typing import Any, BinaryIO, Iterable, Iterator, NoReturn, Text, Optional
|
||||||
from typing import runtime_checkable, Protocol
|
from typing import runtime_checkable, Protocol
|
||||||
from typing import Union
|
from typing import Union
|
||||||
|
@ -53,6 +55,10 @@ class ResourceReader(metaclass=abc.ABCMeta):
|
||||||
raise FileNotFoundError
|
raise FileNotFoundError
|
||||||
|
|
||||||
|
|
||||||
|
class TraversalError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
@runtime_checkable
|
||||||
class Traversable(Protocol):
|
class Traversable(Protocol):
|
||||||
"""
|
"""
|
||||||
|
@ -95,7 +101,6 @@ class Traversable(Protocol):
|
||||||
Return True if self is a file
|
Return True if self is a file
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def joinpath(self, *descendants: StrPath) -> "Traversable":
|
def joinpath(self, *descendants: StrPath) -> "Traversable":
|
||||||
"""
|
"""
|
||||||
Return Traversable resolved with any descendants applied.
|
Return Traversable resolved with any descendants applied.
|
||||||
|
@ -104,6 +109,22 @@ class Traversable(Protocol):
|
||||||
and each may contain multiple levels separated by
|
and each may contain multiple levels separated by
|
||||||
``posixpath.sep`` (``/``).
|
``posixpath.sep`` (``/``).
|
||||||
"""
|
"""
|
||||||
|
if not descendants:
|
||||||
|
return self
|
||||||
|
names = itertools.chain.from_iterable(
|
||||||
|
path.parts for path in map(pathlib.PurePosixPath, descendants)
|
||||||
|
)
|
||||||
|
target = next(names)
|
||||||
|
matches = (
|
||||||
|
traversable for traversable in self.iterdir() if traversable.name == target
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
match = next(matches)
|
||||||
|
except StopIteration:
|
||||||
|
raise TraversalError(
|
||||||
|
"Target not found during traversal.", target, list(names)
|
||||||
|
)
|
||||||
|
return match.joinpath(*names)
|
||||||
|
|
||||||
def __truediv__(self, child: StrPath) -> "Traversable":
|
def __truediv__(self, child: StrPath) -> "Traversable":
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -82,15 +82,13 @@ class MultiplexedPath(abc.Traversable):
|
||||||
def is_file(self):
|
def is_file(self):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def joinpath(self, child):
|
def joinpath(self, *descendants):
|
||||||
# first try to find child in current paths
|
try:
|
||||||
for file in self.iterdir():
|
return super().joinpath(*descendants)
|
||||||
if file.name == child:
|
except abc.TraversalError:
|
||||||
return file
|
# One of the paths did not resolve (a directory does not exist).
|
||||||
# if it does not exist, construct it with the first path
|
# Just return something that will not exist.
|
||||||
return self._paths[0] / child
|
return self._paths[0].joinpath(*descendants)
|
||||||
|
|
||||||
__truediv__ = joinpath
|
|
||||||
|
|
||||||
def open(self, *args, **kwargs):
|
def open(self, *args, **kwargs):
|
||||||
raise FileNotFoundError(f'{self} is not a file')
|
raise FileNotFoundError(f'{self} is not a file')
|
||||||
|
|
|
@ -99,20 +99,6 @@ class ResourceContainer(Traversable):
|
||||||
def open(self, *args, **kwargs):
|
def open(self, *args, **kwargs):
|
||||||
raise IsADirectoryError()
|
raise IsADirectoryError()
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _flatten(compound_names):
|
|
||||||
for name in compound_names:
|
|
||||||
yield from name.split('/')
|
|
||||||
|
|
||||||
def joinpath(self, *descendants):
|
|
||||||
if not descendants:
|
|
||||||
return self
|
|
||||||
names = self._flatten(descendants)
|
|
||||||
target = next(names)
|
|
||||||
return next(
|
|
||||||
traversable for traversable in self.iterdir() if traversable.name == target
|
|
||||||
).joinpath(*names)
|
|
||||||
|
|
||||||
|
|
||||||
class TraversableReader(TraversableResources, SimpleReader):
|
class TraversableReader(TraversableResources, SimpleReader):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -75,6 +75,11 @@ class MultiplexedPathTest(unittest.TestCase):
|
||||||
str(path.joinpath('imaginary'))[len(prefix) + 1 :],
|
str(path.joinpath('imaginary'))[len(prefix) + 1 :],
|
||||||
os.path.join('namespacedata01', 'imaginary'),
|
os.path.join('namespacedata01', 'imaginary'),
|
||||||
)
|
)
|
||||||
|
self.assertEqual(path.joinpath(), path)
|
||||||
|
|
||||||
|
def test_join_path_compound(self):
|
||||||
|
path = MultiplexedPath(self.folder)
|
||||||
|
assert not path.joinpath('imaginary/foo.py').exists()
|
||||||
|
|
||||||
def test_repr(self):
|
def test_repr(self):
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
|
|
|
@ -111,6 +111,14 @@ class ResourceFromZipsTest01(util.ZipSetupBase, unittest.TestCase):
|
||||||
{'__init__.py', 'binary.file'},
|
{'__init__.py', 'binary.file'},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_as_file_directory(self):
|
||||||
|
with resources.as_file(resources.files('ziptestdata')) as data:
|
||||||
|
assert data.name == 'ziptestdata'
|
||||||
|
assert data.is_dir()
|
||||||
|
assert data.joinpath('subdirectory').is_dir()
|
||||||
|
assert len(list(data.iterdir()))
|
||||||
|
assert not data.parent.exists()
|
||||||
|
|
||||||
|
|
||||||
class ResourceFromZipsTest02(util.ZipSetupBase, unittest.TestCase):
|
class ResourceFromZipsTest02(util.ZipSetupBase, unittest.TestCase):
|
||||||
ZIP_MODULE = zipdata02 # type: ignore
|
ZIP_MODULE = zipdata02 # type: ignore
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
Apply changes from importlib_resources 5.8 and 5.9: ``Traversable.joinpath``
|
||||||
|
provides a concrete implementation. ``as_file`` now supports directories of
|
||||||
|
resources.
|
Loading…
Add table
Add a link
Reference in a new issue