mirror of
https://github.com/python/cpython.git
synced 2025-09-26 18:29:57 +00:00
GH-128520: Make pathlib._abc.WritablePath
a sibling of ReadablePath
(#129014)
In the private pathlib ABCs, support write-only virtual filesystems by making `WritablePath` inherit directly from `JoinablePath`, rather than subclassing `ReadablePath`. There are two complications: - `ReadablePath.open()` applies to both reading and writing - `ReadablePath.copy` is secretly an object that supports the *read* side of copying, whereas `WritablePath.copy` is a different kind of object supporting the *write* side We untangle these as follow: - A new `pathlib._abc.magic_open()` function replaces the `open()` method, which is dropped from the ABCs but remains in `pathlib.Path`. The function works like `io.open()`, but additionally accepts objects with `__open_rb__()` or `__open_wb__()` methods as appropriate for the mode. These new dunders are made abstract methods of `ReadablePath` and `WritablePath` respectively. If the pathlib ABCs are made public, we could consider blessing an "openable" protocol and supporting it in `io.open()`, removing the need for `pathlib._abc.magic_open()`. - `ReadablePath.copy` becomes a true method, whereas `WritablePath.copy` is deleted. A new `ReadablePath._copy_reader` property provides a `CopyReader` object, and similarly `WritablePath._copy_writer` is a `CopyWriter` object. Once GH-125413 is resolved, we'll be able to move the `CopyReader` functionality into `ReadablePath.info` and eliminate `ReadablePath._copy_reader`.
This commit is contained in:
parent
3d7c0e5366
commit
01d91500ca
4 changed files with 178 additions and 115 deletions
|
@ -12,6 +12,7 @@ WritablePath.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import functools
|
import functools
|
||||||
|
import io
|
||||||
import operator
|
import operator
|
||||||
import posixpath
|
import posixpath
|
||||||
from errno import EINVAL
|
from errno import EINVAL
|
||||||
|
@ -41,6 +42,40 @@ def _explode_path(path):
|
||||||
return path, names
|
return path, names
|
||||||
|
|
||||||
|
|
||||||
|
def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
|
||||||
|
newline=None):
|
||||||
|
"""
|
||||||
|
Open the file pointed to by this path and return a file object, as
|
||||||
|
the built-in open() function does.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return io.open(path, mode, buffering, encoding, errors, newline)
|
||||||
|
except TypeError:
|
||||||
|
pass
|
||||||
|
cls = type(path)
|
||||||
|
text = 'b' not in mode
|
||||||
|
mode = ''.join(sorted(c for c in mode if c not in 'bt'))
|
||||||
|
if text:
|
||||||
|
try:
|
||||||
|
attr = getattr(cls, f'__open_{mode}__')
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
return attr(path, buffering, encoding, errors, newline)
|
||||||
|
|
||||||
|
try:
|
||||||
|
attr = getattr(cls, f'__open_{mode}b__')
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
stream = attr(path, buffering)
|
||||||
|
if text:
|
||||||
|
stream = io.TextIOWrapper(stream, encoding, errors, newline)
|
||||||
|
return stream
|
||||||
|
|
||||||
|
raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")
|
||||||
|
|
||||||
|
|
||||||
class PathGlobber(_GlobberBase):
|
class PathGlobber(_GlobberBase):
|
||||||
"""
|
"""
|
||||||
Class providing shell-style globbing for path objects.
|
Class providing shell-style globbing for path objects.
|
||||||
|
@ -58,35 +93,15 @@ class PathGlobber(_GlobberBase):
|
||||||
|
|
||||||
class CopyReader:
|
class CopyReader:
|
||||||
"""
|
"""
|
||||||
Class that implements copying between path objects. An instance of this
|
Class that implements the "read" part of copying between path objects.
|
||||||
class is available from the ReadablePath.copy property; it's made callable
|
An instance of this class is available from the ReadablePath._copy_reader
|
||||||
so that ReadablePath.copy() can be treated as a method.
|
property.
|
||||||
|
|
||||||
The target path's CopyWriter drives the process from its _create() method.
|
|
||||||
Files and directories are exchanged by calling methods on the source and
|
|
||||||
target paths, and metadata is exchanged by calling
|
|
||||||
source.copy._read_metadata() and target.copy._write_metadata().
|
|
||||||
"""
|
"""
|
||||||
__slots__ = ('_path',)
|
__slots__ = ('_path',)
|
||||||
|
|
||||||
def __init__(self, path):
|
def __init__(self, path):
|
||||||
self._path = path
|
self._path = path
|
||||||
|
|
||||||
def __call__(self, target, follow_symlinks=True, dirs_exist_ok=False,
|
|
||||||
preserve_metadata=False):
|
|
||||||
"""
|
|
||||||
Recursively copy this file or directory tree to the given destination.
|
|
||||||
"""
|
|
||||||
if not isinstance(target, ReadablePath):
|
|
||||||
target = self._path.with_segments(target)
|
|
||||||
|
|
||||||
# Delegate to the target path's CopyWriter object.
|
|
||||||
try:
|
|
||||||
create = target.copy._create
|
|
||||||
except AttributeError:
|
|
||||||
raise TypeError(f"Target is not writable: {target}") from None
|
|
||||||
return create(self._path, follow_symlinks, dirs_exist_ok, preserve_metadata)
|
|
||||||
|
|
||||||
_readable_metakeys = frozenset()
|
_readable_metakeys = frozenset()
|
||||||
|
|
||||||
def _read_metadata(self, metakeys, *, follow_symlinks=True):
|
def _read_metadata(self, metakeys, *, follow_symlinks=True):
|
||||||
|
@ -96,8 +111,16 @@ class CopyReader:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
class CopyWriter(CopyReader):
|
class CopyWriter:
|
||||||
__slots__ = ()
|
"""
|
||||||
|
Class that implements the "write" part of copying between path objects. An
|
||||||
|
instance of this class is available from the WritablePath._copy_writer
|
||||||
|
property.
|
||||||
|
"""
|
||||||
|
__slots__ = ('_path',)
|
||||||
|
|
||||||
|
def __init__(self, path):
|
||||||
|
self._path = path
|
||||||
|
|
||||||
_writable_metakeys = frozenset()
|
_writable_metakeys = frozenset()
|
||||||
|
|
||||||
|
@ -110,7 +133,7 @@ class CopyWriter(CopyReader):
|
||||||
def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
|
def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
|
||||||
self._ensure_distinct_path(source)
|
self._ensure_distinct_path(source)
|
||||||
if preserve_metadata:
|
if preserve_metadata:
|
||||||
metakeys = self._writable_metakeys & source.copy._readable_metakeys
|
metakeys = self._writable_metakeys & source._copy_reader._readable_metakeys
|
||||||
else:
|
else:
|
||||||
metakeys = None
|
metakeys = None
|
||||||
if not follow_symlinks and source.is_symlink():
|
if not follow_symlinks and source.is_symlink():
|
||||||
|
@ -128,22 +151,22 @@ class CopyWriter(CopyReader):
|
||||||
for src in children:
|
for src in children:
|
||||||
dst = self._path.joinpath(src.name)
|
dst = self._path.joinpath(src.name)
|
||||||
if not follow_symlinks and src.is_symlink():
|
if not follow_symlinks and src.is_symlink():
|
||||||
dst.copy._create_symlink(src, metakeys)
|
dst._copy_writer._create_symlink(src, metakeys)
|
||||||
elif src.is_dir():
|
elif src.is_dir():
|
||||||
dst.copy._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
|
dst._copy_writer._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
|
||||||
else:
|
else:
|
||||||
dst.copy._create_file(src, metakeys)
|
dst._copy_writer._create_file(src, metakeys)
|
||||||
if metakeys:
|
if metakeys:
|
||||||
metadata = source.copy._read_metadata(metakeys)
|
metadata = source._copy_reader._read_metadata(metakeys)
|
||||||
if metadata:
|
if metadata:
|
||||||
self._write_metadata(metadata)
|
self._write_metadata(metadata)
|
||||||
|
|
||||||
def _create_file(self, source, metakeys):
|
def _create_file(self, source, metakeys):
|
||||||
"""Copy the given file to our path."""
|
"""Copy the given file to our path."""
|
||||||
self._ensure_different_file(source)
|
self._ensure_different_file(source)
|
||||||
with source.open('rb') as source_f:
|
with magic_open(source, 'rb') as source_f:
|
||||||
try:
|
try:
|
||||||
with self._path.open('wb') as target_f:
|
with magic_open(self._path, 'wb') as target_f:
|
||||||
copyfileobj(source_f, target_f)
|
copyfileobj(source_f, target_f)
|
||||||
except IsADirectoryError as e:
|
except IsADirectoryError as e:
|
||||||
if not self._path.exists():
|
if not self._path.exists():
|
||||||
|
@ -152,7 +175,7 @@ class CopyWriter(CopyReader):
|
||||||
f'Directory does not exist: {self._path}') from e
|
f'Directory does not exist: {self._path}') from e
|
||||||
raise
|
raise
|
||||||
if metakeys:
|
if metakeys:
|
||||||
metadata = source.copy._read_metadata(metakeys)
|
metadata = source._copy_reader._read_metadata(metakeys)
|
||||||
if metadata:
|
if metadata:
|
||||||
self._write_metadata(metadata)
|
self._write_metadata(metadata)
|
||||||
|
|
||||||
|
@ -160,7 +183,7 @@ class CopyWriter(CopyReader):
|
||||||
"""Copy the given symbolic link to our path."""
|
"""Copy the given symbolic link to our path."""
|
||||||
self._path.symlink_to(source.readlink())
|
self._path.symlink_to(source.readlink())
|
||||||
if metakeys:
|
if metakeys:
|
||||||
metadata = source.copy._read_metadata(metakeys, follow_symlinks=False)
|
metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
|
||||||
if metadata:
|
if metadata:
|
||||||
self._write_metadata(metadata, follow_symlinks=False)
|
self._write_metadata(metadata, follow_symlinks=False)
|
||||||
|
|
||||||
|
@ -420,11 +443,10 @@ class ReadablePath(JoinablePath):
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
def open(self, mode='r', buffering=-1, encoding=None,
|
def __open_rb__(self, buffering=-1):
|
||||||
errors=None, newline=None):
|
|
||||||
"""
|
"""
|
||||||
Open the file pointed to by this path and return a file object, as
|
Open the file pointed to by this path for reading in binary mode and
|
||||||
the built-in open() function does.
|
return a file object, like open(mode='rb').
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@ -432,14 +454,14 @@ class ReadablePath(JoinablePath):
|
||||||
"""
|
"""
|
||||||
Open the file in bytes mode, read it, and close the file.
|
Open the file in bytes mode, read it, and close the file.
|
||||||
"""
|
"""
|
||||||
with self.open(mode='rb', buffering=0) as f:
|
with magic_open(self, mode='rb', buffering=0) as f:
|
||||||
return f.read()
|
return f.read()
|
||||||
|
|
||||||
def read_text(self, encoding=None, errors=None, newline=None):
|
def read_text(self, encoding=None, errors=None, newline=None):
|
||||||
"""
|
"""
|
||||||
Open the file in text mode, read it, and close the file.
|
Open the file in text mode, read it, and close the file.
|
||||||
"""
|
"""
|
||||||
with self.open(mode='r', encoding=encoding, errors=errors, newline=newline) as f:
|
with magic_open(self, mode='r', encoding=encoding, errors=errors, newline=newline) as f:
|
||||||
return f.read()
|
return f.read()
|
||||||
|
|
||||||
def _scandir(self):
|
def _scandir(self):
|
||||||
|
@ -532,7 +554,22 @@ class ReadablePath(JoinablePath):
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
copy = property(CopyReader, doc=CopyReader.__call__.__doc__)
|
_copy_reader = property(CopyReader)
|
||||||
|
|
||||||
|
def copy(self, target, follow_symlinks=True, dirs_exist_ok=False,
|
||||||
|
preserve_metadata=False):
|
||||||
|
"""
|
||||||
|
Recursively copy this file or directory tree to the given destination.
|
||||||
|
"""
|
||||||
|
if not hasattr(target, '_copy_writer'):
|
||||||
|
target = self.with_segments(target)
|
||||||
|
|
||||||
|
# Delegate to the target path's CopyWriter object.
|
||||||
|
try:
|
||||||
|
create = target._copy_writer._create
|
||||||
|
except AttributeError:
|
||||||
|
raise TypeError(f"Target is not writable: {target}") from None
|
||||||
|
return create(self, follow_symlinks, dirs_exist_ok, preserve_metadata)
|
||||||
|
|
||||||
def copy_into(self, target_dir, *, follow_symlinks=True,
|
def copy_into(self, target_dir, *, follow_symlinks=True,
|
||||||
dirs_exist_ok=False, preserve_metadata=False):
|
dirs_exist_ok=False, preserve_metadata=False):
|
||||||
|
@ -542,7 +579,7 @@ class ReadablePath(JoinablePath):
|
||||||
name = self.name
|
name = self.name
|
||||||
if not name:
|
if not name:
|
||||||
raise ValueError(f"{self!r} has an empty name")
|
raise ValueError(f"{self!r} has an empty name")
|
||||||
elif isinstance(target_dir, ReadablePath):
|
elif hasattr(target_dir, '_copy_writer'):
|
||||||
target = target_dir / name
|
target = target_dir / name
|
||||||
else:
|
else:
|
||||||
target = self.with_segments(target_dir, name)
|
target = self.with_segments(target_dir, name)
|
||||||
|
@ -551,7 +588,7 @@ class ReadablePath(JoinablePath):
|
||||||
preserve_metadata=preserve_metadata)
|
preserve_metadata=preserve_metadata)
|
||||||
|
|
||||||
|
|
||||||
class WritablePath(ReadablePath):
|
class WritablePath(JoinablePath):
|
||||||
__slots__ = ()
|
__slots__ = ()
|
||||||
|
|
||||||
def symlink_to(self, target, target_is_directory=False):
|
def symlink_to(self, target, target_is_directory=False):
|
||||||
|
@ -567,13 +604,20 @@ class WritablePath(ReadablePath):
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def __open_wb__(self, buffering=-1):
|
||||||
|
"""
|
||||||
|
Open the file pointed to by this path for writing in binary mode and
|
||||||
|
return a file object, like open(mode='wb').
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
def write_bytes(self, data):
|
def write_bytes(self, data):
|
||||||
"""
|
"""
|
||||||
Open the file in bytes mode, write to it, and close the file.
|
Open the file in bytes mode, write to it, and close the file.
|
||||||
"""
|
"""
|
||||||
# type-check for the buffer interface before truncating the file
|
# type-check for the buffer interface before truncating the file
|
||||||
view = memoryview(data)
|
view = memoryview(data)
|
||||||
with self.open(mode='wb') as f:
|
with magic_open(self, mode='wb') as f:
|
||||||
return f.write(view)
|
return f.write(view)
|
||||||
|
|
||||||
def write_text(self, data, encoding=None, errors=None, newline=None):
|
def write_text(self, data, encoding=None, errors=None, newline=None):
|
||||||
|
@ -583,7 +627,7 @@ class WritablePath(ReadablePath):
|
||||||
if not isinstance(data, str):
|
if not isinstance(data, str):
|
||||||
raise TypeError('data must be str, not %s' %
|
raise TypeError('data must be str, not %s' %
|
||||||
data.__class__.__name__)
|
data.__class__.__name__)
|
||||||
with self.open(mode='w', encoding=encoding, errors=errors, newline=newline) as f:
|
with magic_open(self, mode='w', encoding=encoding, errors=errors, newline=newline) as f:
|
||||||
return f.write(data)
|
return f.write(data)
|
||||||
|
|
||||||
copy = property(CopyWriter, doc=CopyWriter.__call__.__doc__)
|
_copy_writer = property(CopyWriter)
|
||||||
|
|
|
@ -20,7 +20,7 @@ except ImportError:
|
||||||
grp = None
|
grp = None
|
||||||
|
|
||||||
from pathlib._os import copyfile
|
from pathlib._os import copyfile
|
||||||
from pathlib._abc import CopyWriter, JoinablePath, WritablePath
|
from pathlib._abc import CopyReader, CopyWriter, JoinablePath, ReadablePath, WritablePath
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
|
@ -65,9 +65,10 @@ class _PathParents(Sequence):
|
||||||
return "<{}.parents>".format(type(self._path).__name__)
|
return "<{}.parents>".format(type(self._path).__name__)
|
||||||
|
|
||||||
|
|
||||||
class _LocalCopyWriter(CopyWriter):
|
class _LocalCopyReader(CopyReader):
|
||||||
"""This object implements the Path.copy callable. Don't try to construct
|
"""This object implements the "read" part of copying local paths. Don't
|
||||||
it yourself."""
|
try to construct it yourself.
|
||||||
|
"""
|
||||||
__slots__ = ()
|
__slots__ = ()
|
||||||
|
|
||||||
_readable_metakeys = {'mode', 'times_ns'}
|
_readable_metakeys = {'mode', 'times_ns'}
|
||||||
|
@ -75,7 +76,7 @@ class _LocalCopyWriter(CopyWriter):
|
||||||
_readable_metakeys.add('flags')
|
_readable_metakeys.add('flags')
|
||||||
if hasattr(os, 'listxattr'):
|
if hasattr(os, 'listxattr'):
|
||||||
_readable_metakeys.add('xattrs')
|
_readable_metakeys.add('xattrs')
|
||||||
_readable_metakeys = _writable_metakeys = frozenset(_readable_metakeys)
|
_readable_metakeys = frozenset(_readable_metakeys)
|
||||||
|
|
||||||
def _read_metadata(self, metakeys, *, follow_symlinks=True):
|
def _read_metadata(self, metakeys, *, follow_symlinks=True):
|
||||||
metadata = {}
|
metadata = {}
|
||||||
|
@ -97,6 +98,15 @@ class _LocalCopyWriter(CopyWriter):
|
||||||
raise
|
raise
|
||||||
return metadata
|
return metadata
|
||||||
|
|
||||||
|
|
||||||
|
class _LocalCopyWriter(CopyWriter):
|
||||||
|
"""This object implements the "write" part of copying local paths. Don't
|
||||||
|
try to construct it yourself.
|
||||||
|
"""
|
||||||
|
__slots__ = ()
|
||||||
|
|
||||||
|
_writable_metakeys = _LocalCopyReader._readable_metakeys
|
||||||
|
|
||||||
def _write_metadata(self, metadata, *, follow_symlinks=True):
|
def _write_metadata(self, metadata, *, follow_symlinks=True):
|
||||||
def _nop(*args, ns=None, follow_symlinks=None):
|
def _nop(*args, ns=None, follow_symlinks=None):
|
||||||
pass
|
pass
|
||||||
|
@ -171,7 +181,7 @@ class _LocalCopyWriter(CopyWriter):
|
||||||
"""Copy the given symlink to the given target."""
|
"""Copy the given symlink to the given target."""
|
||||||
self._path.symlink_to(source.readlink(), source.is_dir())
|
self._path.symlink_to(source.readlink(), source.is_dir())
|
||||||
if metakeys:
|
if metakeys:
|
||||||
metadata = source.copy._read_metadata(metakeys, follow_symlinks=False)
|
metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
|
||||||
if metadata:
|
if metadata:
|
||||||
self._write_metadata(metadata, follow_symlinks=False)
|
self._write_metadata(metadata, follow_symlinks=False)
|
||||||
|
|
||||||
|
@ -683,7 +693,7 @@ class PureWindowsPath(PurePath):
|
||||||
__slots__ = ()
|
__slots__ = ()
|
||||||
|
|
||||||
|
|
||||||
class Path(WritablePath, PurePath):
|
class Path(WritablePath, ReadablePath, PurePath):
|
||||||
"""PurePath subclass that can make system calls.
|
"""PurePath subclass that can make system calls.
|
||||||
|
|
||||||
Path represents a filesystem path but unlike PurePath, also offers
|
Path represents a filesystem path but unlike PurePath, also offers
|
||||||
|
@ -823,6 +833,13 @@ class Path(WritablePath, PurePath):
|
||||||
encoding = io.text_encoding(encoding)
|
encoding = io.text_encoding(encoding)
|
||||||
return io.open(self, mode, buffering, encoding, errors, newline)
|
return io.open(self, mode, buffering, encoding, errors, newline)
|
||||||
|
|
||||||
|
def read_bytes(self):
|
||||||
|
"""
|
||||||
|
Open the file in bytes mode, read it, and close the file.
|
||||||
|
"""
|
||||||
|
with self.open(mode='rb', buffering=0) as f:
|
||||||
|
return f.read()
|
||||||
|
|
||||||
def read_text(self, encoding=None, errors=None, newline=None):
|
def read_text(self, encoding=None, errors=None, newline=None):
|
||||||
"""
|
"""
|
||||||
Open the file in text mode, read it, and close the file.
|
Open the file in text mode, read it, and close the file.
|
||||||
|
@ -830,7 +847,17 @@ class Path(WritablePath, PurePath):
|
||||||
# Call io.text_encoding() here to ensure any warning is raised at an
|
# Call io.text_encoding() here to ensure any warning is raised at an
|
||||||
# appropriate stack level.
|
# appropriate stack level.
|
||||||
encoding = io.text_encoding(encoding)
|
encoding = io.text_encoding(encoding)
|
||||||
return super().read_text(encoding, errors, newline)
|
with self.open(mode='r', encoding=encoding, errors=errors, newline=newline) as f:
|
||||||
|
return f.read()
|
||||||
|
|
||||||
|
def write_bytes(self, data):
|
||||||
|
"""
|
||||||
|
Open the file in bytes mode, write to it, and close the file.
|
||||||
|
"""
|
||||||
|
# type-check for the buffer interface before truncating the file
|
||||||
|
view = memoryview(data)
|
||||||
|
with self.open(mode='wb') as f:
|
||||||
|
return f.write(view)
|
||||||
|
|
||||||
def write_text(self, data, encoding=None, errors=None, newline=None):
|
def write_text(self, data, encoding=None, errors=None, newline=None):
|
||||||
"""
|
"""
|
||||||
|
@ -839,7 +866,11 @@ class Path(WritablePath, PurePath):
|
||||||
# Call io.text_encoding() here to ensure any warning is raised at an
|
# Call io.text_encoding() here to ensure any warning is raised at an
|
||||||
# appropriate stack level.
|
# appropriate stack level.
|
||||||
encoding = io.text_encoding(encoding)
|
encoding = io.text_encoding(encoding)
|
||||||
return super().write_text(data, encoding, errors, newline)
|
if not isinstance(data, str):
|
||||||
|
raise TypeError('data must be str, not %s' %
|
||||||
|
data.__class__.__name__)
|
||||||
|
with self.open(mode='w', encoding=encoding, errors=errors, newline=newline) as f:
|
||||||
|
return f.write(data)
|
||||||
|
|
||||||
_remove_leading_dot = operator.itemgetter(slice(2, None))
|
_remove_leading_dot = operator.itemgetter(slice(2, None))
|
||||||
_remove_trailing_slash = operator.itemgetter(slice(-1))
|
_remove_trailing_slash = operator.itemgetter(slice(-1))
|
||||||
|
@ -1122,7 +1153,8 @@ class Path(WritablePath, PurePath):
|
||||||
os.replace(self, target)
|
os.replace(self, target)
|
||||||
return self.with_segments(target)
|
return self.with_segments(target)
|
||||||
|
|
||||||
copy = property(_LocalCopyWriter, doc=_LocalCopyWriter.__call__.__doc__)
|
_copy_reader = property(_LocalCopyReader)
|
||||||
|
_copy_writer = property(_LocalCopyWriter)
|
||||||
|
|
||||||
def move(self, target):
|
def move(self, target):
|
||||||
"""
|
"""
|
||||||
|
@ -1134,9 +1166,9 @@ class Path(WritablePath, PurePath):
|
||||||
except TypeError:
|
except TypeError:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
if not isinstance(target, WritablePath):
|
if not hasattr(target, '_copy_writer'):
|
||||||
target = self.with_segments(target_str)
|
target = self.with_segments(target_str)
|
||||||
target.copy._ensure_different_file(self)
|
target._copy_writer._ensure_different_file(self)
|
||||||
try:
|
try:
|
||||||
os.replace(self, target_str)
|
os.replace(self, target_str)
|
||||||
return target
|
return target
|
||||||
|
@ -1155,7 +1187,7 @@ class Path(WritablePath, PurePath):
|
||||||
name = self.name
|
name = self.name
|
||||||
if not name:
|
if not name:
|
||||||
raise ValueError(f"{self!r} has an empty name")
|
raise ValueError(f"{self!r} has an empty name")
|
||||||
elif isinstance(target_dir, WritablePath):
|
elif hasattr(target_dir, '_copy_writer'):
|
||||||
target = target_dir / name
|
target = target_dir / name
|
||||||
else:
|
else:
|
||||||
target = self.with_segments(target_dir, name)
|
target = self.with_segments(target_dir, name)
|
||||||
|
|
|
@ -924,7 +924,7 @@ class PurePathSubclassTest(PurePathTest):
|
||||||
# Tests for the concrete classes.
|
# Tests for the concrete classes.
|
||||||
#
|
#
|
||||||
|
|
||||||
class PathTest(test_pathlib_abc.DummyWritablePathTest, PurePathTest):
|
class PathTest(test_pathlib_abc.DummyRWPathTest, PurePathTest):
|
||||||
"""Tests for the FS-accessing functionalities of the Path classes."""
|
"""Tests for the FS-accessing functionalities of the Path classes."""
|
||||||
cls = pathlib.Path
|
cls = pathlib.Path
|
||||||
can_symlink = os_helper.can_symlink()
|
can_symlink = os_helper.can_symlink()
|
||||||
|
@ -1102,6 +1102,15 @@ class PathTest(test_pathlib_abc.DummyWritablePathTest, PurePathTest):
|
||||||
for dirpath, dirnames, filenames in p.walk():
|
for dirpath, dirnames, filenames in p.walk():
|
||||||
self.assertEqual(42, dirpath.session_id)
|
self.assertEqual(42, dirpath.session_id)
|
||||||
|
|
||||||
|
def test_open_common(self):
|
||||||
|
p = self.cls(self.base)
|
||||||
|
with (p / 'fileA').open('r') as f:
|
||||||
|
self.assertIsInstance(f, io.TextIOBase)
|
||||||
|
self.assertEqual(f.read(), "this is file A\n")
|
||||||
|
with (p / 'fileA').open('rb') as f:
|
||||||
|
self.assertIsInstance(f, io.BufferedIOBase)
|
||||||
|
self.assertEqual(f.read().strip(), b"this is file A")
|
||||||
|
|
||||||
def test_open_unbuffered(self):
|
def test_open_unbuffered(self):
|
||||||
p = self.cls(self.base)
|
p = self.cls(self.base)
|
||||||
with (p / 'fileA').open('rb', buffering=0) as f:
|
with (p / 'fileA').open('rb', buffering=0) as f:
|
||||||
|
|
|
@ -4,7 +4,7 @@ import os
|
||||||
import errno
|
import errno
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from pathlib._abc import JoinablePath, ReadablePath, WritablePath
|
from pathlib._abc import JoinablePath, ReadablePath, WritablePath, magic_open
|
||||||
from pathlib._types import Parser
|
from pathlib._types import Parser
|
||||||
import posixpath
|
import posixpath
|
||||||
|
|
||||||
|
@ -918,7 +918,7 @@ class DummyJoinablePathTest(unittest.TestCase):
|
||||||
|
|
||||||
class DummyWritablePathIO(io.BytesIO):
|
class DummyWritablePathIO(io.BytesIO):
|
||||||
"""
|
"""
|
||||||
Used by DummyWritablePath to implement `open('w')`
|
Used by DummyWritablePath to implement `__open_wb__()`
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, files, path):
|
def __init__(self, files, path):
|
||||||
|
@ -931,38 +931,16 @@ class DummyWritablePathIO(io.BytesIO):
|
||||||
super().close()
|
super().close()
|
||||||
|
|
||||||
|
|
||||||
class DummyReadablePath(ReadablePath):
|
class DummyReadablePath(ReadablePath, DummyJoinablePath):
|
||||||
"""
|
"""
|
||||||
Simple implementation of DummyReadablePath that keeps files and
|
Simple implementation of DummyReadablePath that keeps files and
|
||||||
directories in memory.
|
directories in memory.
|
||||||
"""
|
"""
|
||||||
__slots__ = ('_segments')
|
__slots__ = ()
|
||||||
|
|
||||||
_files = {}
|
_files = {}
|
||||||
_directories = {}
|
_directories = {}
|
||||||
|
|
||||||
def __init__(self, *segments):
|
|
||||||
self._segments = segments
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
if self._segments:
|
|
||||||
return self.parser.join(*self._segments)
|
|
||||||
return ''
|
|
||||||
|
|
||||||
def __eq__(self, other):
|
|
||||||
if not isinstance(other, DummyReadablePath):
|
|
||||||
return NotImplemented
|
|
||||||
return str(self) == str(other)
|
|
||||||
|
|
||||||
def __hash__(self):
|
|
||||||
return hash(str(self))
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return "{}({!r})".format(self.__class__.__name__, str(self))
|
|
||||||
|
|
||||||
def with_segments(self, *pathsegments):
|
|
||||||
return type(self)(*pathsegments)
|
|
||||||
|
|
||||||
def exists(self, *, follow_symlinks=True):
|
def exists(self, *, follow_symlinks=True):
|
||||||
return self.is_dir() or self.is_file()
|
return self.is_dir() or self.is_file()
|
||||||
|
|
||||||
|
@ -975,33 +953,13 @@ class DummyReadablePath(ReadablePath):
|
||||||
def is_symlink(self):
|
def is_symlink(self):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def open(self, mode='r', buffering=-1, encoding=None,
|
def __open_rb__(self, buffering=-1):
|
||||||
errors=None, newline=None):
|
|
||||||
if buffering != -1 and not (buffering == 0 and 'b' in mode):
|
|
||||||
raise NotImplementedError
|
|
||||||
path = str(self)
|
path = str(self)
|
||||||
if path in self._directories:
|
if path in self._directories:
|
||||||
raise IsADirectoryError(errno.EISDIR, "Is a directory", path)
|
raise IsADirectoryError(errno.EISDIR, "Is a directory", path)
|
||||||
|
elif path not in self._files:
|
||||||
text = 'b' not in mode
|
raise FileNotFoundError(errno.ENOENT, "File not found", path)
|
||||||
mode = ''.join(c for c in mode if c not in 'btU')
|
return io.BytesIO(self._files[path])
|
||||||
if mode == 'r':
|
|
||||||
if path not in self._files:
|
|
||||||
raise FileNotFoundError(errno.ENOENT, "File not found", path)
|
|
||||||
stream = io.BytesIO(self._files[path])
|
|
||||||
elif mode == 'w':
|
|
||||||
# FIXME: move to DummyWritablePath
|
|
||||||
parent, name = posixpath.split(path)
|
|
||||||
if parent not in self._directories:
|
|
||||||
raise FileNotFoundError(errno.ENOENT, "File not found", parent)
|
|
||||||
stream = DummyWritablePathIO(self._files, path)
|
|
||||||
self._files[path] = b''
|
|
||||||
self._directories[parent].add(name)
|
|
||||||
else:
|
|
||||||
raise NotImplementedError
|
|
||||||
if text:
|
|
||||||
stream = io.TextIOWrapper(stream, encoding=encoding, errors=errors, newline=newline)
|
|
||||||
return stream
|
|
||||||
|
|
||||||
def iterdir(self):
|
def iterdir(self):
|
||||||
path = str(self).rstrip('/')
|
path = str(self).rstrip('/')
|
||||||
|
@ -1013,9 +971,20 @@ class DummyReadablePath(ReadablePath):
|
||||||
raise FileNotFoundError(errno.ENOENT, "File not found", path)
|
raise FileNotFoundError(errno.ENOENT, "File not found", path)
|
||||||
|
|
||||||
|
|
||||||
class DummyWritablePath(DummyReadablePath, WritablePath):
|
class DummyWritablePath(WritablePath, DummyJoinablePath):
|
||||||
__slots__ = ()
|
__slots__ = ()
|
||||||
|
|
||||||
|
def __open_wb__(self, buffering=-1):
|
||||||
|
path = str(self)
|
||||||
|
if path in self._directories:
|
||||||
|
raise IsADirectoryError(errno.EISDIR, "Is a directory", path)
|
||||||
|
parent, name = posixpath.split(path)
|
||||||
|
if parent not in self._directories:
|
||||||
|
raise FileNotFoundError(errno.ENOENT, "File not found", parent)
|
||||||
|
self._files[path] = b''
|
||||||
|
self._directories[parent].add(name)
|
||||||
|
return DummyWritablePathIO(self._files, path)
|
||||||
|
|
||||||
def mkdir(self, mode=0o777, parents=False, exist_ok=False):
|
def mkdir(self, mode=0o777, parents=False, exist_ok=False):
|
||||||
path = str(self)
|
path = str(self)
|
||||||
parent = str(self.parent)
|
parent = str(self.parent)
|
||||||
|
@ -1121,12 +1090,12 @@ class DummyReadablePathTest(DummyJoinablePathTest):
|
||||||
self.assertIs(False, P(self.base + '\udfff').exists())
|
self.assertIs(False, P(self.base + '\udfff').exists())
|
||||||
self.assertIs(False, P(self.base + '\x00').exists())
|
self.assertIs(False, P(self.base + '\x00').exists())
|
||||||
|
|
||||||
def test_open_common(self):
|
def test_magic_open(self):
|
||||||
p = self.cls(self.base)
|
p = self.cls(self.base)
|
||||||
with (p / 'fileA').open('r') as f:
|
with magic_open(p / 'fileA', 'r') as f:
|
||||||
self.assertIsInstance(f, io.TextIOBase)
|
self.assertIsInstance(f, io.TextIOBase)
|
||||||
self.assertEqual(f.read(), "this is file A\n")
|
self.assertEqual(f.read(), "this is file A\n")
|
||||||
with (p / 'fileA').open('rb') as f:
|
with magic_open(p / 'fileA', 'rb') as f:
|
||||||
self.assertIsInstance(f, io.BufferedIOBase)
|
self.assertIsInstance(f, io.BufferedIOBase)
|
||||||
self.assertEqual(f.read().strip(), b"this is file A")
|
self.assertEqual(f.read().strip(), b"this is file A")
|
||||||
|
|
||||||
|
@ -1359,9 +1328,18 @@ class DummyReadablePathTest(DummyJoinablePathTest):
|
||||||
self.assertIs((P / 'linkA\x00').is_file(), False)
|
self.assertIs((P / 'linkA\x00').is_file(), False)
|
||||||
|
|
||||||
|
|
||||||
class DummyWritablePathTest(DummyReadablePathTest):
|
class DummyWritablePathTest(DummyJoinablePathTest):
|
||||||
cls = DummyWritablePath
|
cls = DummyWritablePath
|
||||||
|
|
||||||
|
|
||||||
|
class DummyRWPath(DummyWritablePath, DummyReadablePath):
|
||||||
|
__slots__ = ()
|
||||||
|
|
||||||
|
|
||||||
|
class DummyRWPathTest(DummyWritablePathTest, DummyReadablePathTest):
|
||||||
|
cls = DummyRWPath
|
||||||
|
can_symlink = False
|
||||||
|
|
||||||
def test_read_write_bytes(self):
|
def test_read_write_bytes(self):
|
||||||
p = self.cls(self.base)
|
p = self.cls(self.base)
|
||||||
(p / 'fileA').write_bytes(b'abcdefg')
|
(p / 'fileA').write_bytes(b'abcdefg')
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue