mirror of
https://github.com/python/cpython.git
synced 2025-11-03 03:22:27 +00:00
GH-127807: pathlib ABCs: move private copying methods to dedicated class (#127810)
Move 9 private `PathBase` attributes and methods into a new `CopyWorker` class. Change `PathBase.copy` from a method to a `CopyWorker` instance. The methods remain private in the `CopyWorker` class. In future we might make some/all of them public so that user subclasses of `PathBase` can customize the copying process (in particular reading/writing of metadata,) but we'd need to make `PathBase` public first.
This commit is contained in:
parent
f5ba74b819
commit
8d9f52a7be
3 changed files with 261 additions and 248 deletions
|
|
@ -57,6 +57,132 @@ class PathGlobber(_GlobberBase):
|
|||
return path.with_segments(str(path) + text)
|
||||
|
||||
|
||||
class CopyWorker:
|
||||
"""
|
||||
Class that implements copying between path objects. An instance of this
|
||||
class is available from the PathBase.copy property; it's made callable so
|
||||
that PathBase.copy() can be treated as a method.
|
||||
|
||||
The target path's CopyWorker drives the process from its _create() method.
|
||||
Files and directories are exchanged by calling methods on the source and
|
||||
target paths, and metadata is exchanged by calling
|
||||
source.copy._read_metadata() and target.copy._write_metadata().
|
||||
"""
|
||||
__slots__ = ('_path',)
|
||||
|
||||
def __init__(self, path):
|
||||
self._path = path
|
||||
|
||||
def __call__(self, target, follow_symlinks=True, dirs_exist_ok=False,
|
||||
preserve_metadata=False):
|
||||
"""
|
||||
Recursively copy this file or directory tree to the given destination.
|
||||
"""
|
||||
if not isinstance(target, PathBase):
|
||||
target = self._path.with_segments(target)
|
||||
|
||||
# Delegate to the target path's CopyWorker object.
|
||||
return target.copy._create(self._path, follow_symlinks, dirs_exist_ok, preserve_metadata)
|
||||
|
||||
_readable_metakeys = frozenset()
|
||||
|
||||
def _read_metadata(self, metakeys, *, follow_symlinks=True):
|
||||
"""
|
||||
Returns path metadata as a dict with string keys.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
_writable_metakeys = frozenset()
|
||||
|
||||
def _write_metadata(self, metadata, *, follow_symlinks=True):
|
||||
"""
|
||||
Sets path metadata from the given dict with string keys.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
|
||||
self._ensure_distinct_path(source)
|
||||
if preserve_metadata:
|
||||
metakeys = self._writable_metakeys & source.copy._readable_metakeys
|
||||
else:
|
||||
metakeys = None
|
||||
if not follow_symlinks and source.is_symlink():
|
||||
self._create_symlink(source, metakeys)
|
||||
elif source.is_dir():
|
||||
self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok)
|
||||
else:
|
||||
self._create_file(source, metakeys)
|
||||
return self._path
|
||||
|
||||
def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok):
|
||||
"""Copy the given directory to our path."""
|
||||
children = list(source.iterdir())
|
||||
self._path.mkdir(exist_ok=dirs_exist_ok)
|
||||
for src in children:
|
||||
dst = self._path.joinpath(src.name)
|
||||
if not follow_symlinks and src.is_symlink():
|
||||
dst.copy._create_symlink(src, metakeys)
|
||||
elif src.is_dir():
|
||||
dst.copy._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
|
||||
else:
|
||||
dst.copy._create_file(src, metakeys)
|
||||
if metakeys:
|
||||
metadata = source.copy._read_metadata(metakeys)
|
||||
if metadata:
|
||||
self._write_metadata(metadata)
|
||||
|
||||
def _create_file(self, source, metakeys):
|
||||
"""Copy the given file to our path."""
|
||||
self._ensure_different_file(source)
|
||||
with source.open('rb') as source_f:
|
||||
try:
|
||||
with self._path.open('wb') as target_f:
|
||||
copyfileobj(source_f, target_f)
|
||||
except IsADirectoryError as e:
|
||||
if not self._path.exists():
|
||||
# Raise a less confusing exception.
|
||||
raise FileNotFoundError(
|
||||
f'Directory does not exist: {self._path}') from e
|
||||
raise
|
||||
if metakeys:
|
||||
metadata = source.copy._read_metadata(metakeys)
|
||||
if metadata:
|
||||
self._write_metadata(metadata)
|
||||
|
||||
def _create_symlink(self, source, metakeys):
|
||||
"""Copy the given symbolic link to our path."""
|
||||
self._path.symlink_to(source.readlink())
|
||||
if metakeys:
|
||||
metadata = source.copy._read_metadata(metakeys, follow_symlinks=False)
|
||||
if metadata:
|
||||
self._write_metadata(metadata, follow_symlinks=False)
|
||||
|
||||
def _ensure_different_file(self, source):
|
||||
"""
|
||||
Raise OSError(EINVAL) if both paths refer to the same file.
|
||||
"""
|
||||
pass
|
||||
|
||||
def _ensure_distinct_path(self, source):
|
||||
"""
|
||||
Raise OSError(EINVAL) if the other path is within this path.
|
||||
"""
|
||||
# Note: there is no straightforward, foolproof algorithm to determine
|
||||
# if one directory is within another (a particularly perverse example
|
||||
# would be a single network share mounted in one location via NFS, and
|
||||
# in another location via CIFS), so we simply checks whether the
|
||||
# other path is lexically equal to, or within, this path.
|
||||
if source == self._path:
|
||||
err = OSError(EINVAL, "Source and target are the same path")
|
||||
elif source in self._path.parents:
|
||||
err = OSError(EINVAL, "Source path is a parent of target path")
|
||||
else:
|
||||
return
|
||||
err.filename = str(source)
|
||||
err.filename2 = str(self._path)
|
||||
raise err
|
||||
|
||||
|
||||
class PurePathBase:
|
||||
"""Base class for pure path objects.
|
||||
|
||||
|
|
@ -374,31 +500,6 @@ class PathBase(PurePathBase):
|
|||
except (OSError, ValueError):
|
||||
return False
|
||||
|
||||
def _ensure_different_file(self, other_path):
|
||||
"""
|
||||
Raise OSError(EINVAL) if both paths refer to the same file.
|
||||
"""
|
||||
pass
|
||||
|
||||
def _ensure_distinct_path(self, other_path):
|
||||
"""
|
||||
Raise OSError(EINVAL) if the other path is within this path.
|
||||
"""
|
||||
# Note: there is no straightforward, foolproof algorithm to determine
|
||||
# if one directory is within another (a particularly perverse example
|
||||
# would be a single network share mounted in one location via NFS, and
|
||||
# in another location via CIFS), so we simply checks whether the
|
||||
# other path is lexically equal to, or within, this path.
|
||||
if self == other_path:
|
||||
err = OSError(EINVAL, "Source and target are the same path")
|
||||
elif self in other_path.parents:
|
||||
err = OSError(EINVAL, "Source path is a parent of target path")
|
||||
else:
|
||||
return
|
||||
err.filename = str(self)
|
||||
err.filename2 = str(other_path)
|
||||
raise err
|
||||
|
||||
def open(self, mode='r', buffering=-1, encoding=None,
|
||||
errors=None, newline=None):
|
||||
"""
|
||||
|
|
@ -537,88 +638,13 @@ class PathBase(PurePathBase):
|
|||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def _symlink_to_target_of(self, link):
|
||||
"""
|
||||
Make this path a symlink with the same target as the given link. This
|
||||
is used by copy().
|
||||
"""
|
||||
self.symlink_to(link.readlink())
|
||||
|
||||
def mkdir(self, mode=0o777, parents=False, exist_ok=False):
|
||||
"""
|
||||
Create a new directory at this given path.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
# Metadata keys supported by this path type.
|
||||
_readable_metadata = _writable_metadata = frozenset()
|
||||
|
||||
def _read_metadata(self, keys=None, *, follow_symlinks=True):
|
||||
"""
|
||||
Returns path metadata as a dict with string keys.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def _write_metadata(self, metadata, *, follow_symlinks=True):
|
||||
"""
|
||||
Sets path metadata from the given dict with string keys.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def _copy_metadata(self, target, *, follow_symlinks=True):
|
||||
"""
|
||||
Copies metadata (permissions, timestamps, etc) from this path to target.
|
||||
"""
|
||||
# Metadata types supported by both source and target.
|
||||
keys = self._readable_metadata & target._writable_metadata
|
||||
if keys:
|
||||
metadata = self._read_metadata(keys, follow_symlinks=follow_symlinks)
|
||||
target._write_metadata(metadata, follow_symlinks=follow_symlinks)
|
||||
|
||||
def _copy_file(self, target):
|
||||
"""
|
||||
Copy the contents of this file to the given target.
|
||||
"""
|
||||
self._ensure_different_file(target)
|
||||
with self.open('rb') as source_f:
|
||||
try:
|
||||
with target.open('wb') as target_f:
|
||||
copyfileobj(source_f, target_f)
|
||||
except IsADirectoryError as e:
|
||||
if not target.exists():
|
||||
# Raise a less confusing exception.
|
||||
raise FileNotFoundError(
|
||||
f'Directory does not exist: {target}') from e
|
||||
else:
|
||||
raise
|
||||
|
||||
def copy(self, target, *, follow_symlinks=True, dirs_exist_ok=False,
|
||||
preserve_metadata=False):
|
||||
"""
|
||||
Recursively copy this file or directory tree to the given destination.
|
||||
"""
|
||||
if not isinstance(target, PathBase):
|
||||
target = self.with_segments(target)
|
||||
self._ensure_distinct_path(target)
|
||||
stack = [(self, target)]
|
||||
while stack:
|
||||
src, dst = stack.pop()
|
||||
if not follow_symlinks and src.is_symlink():
|
||||
dst._symlink_to_target_of(src)
|
||||
if preserve_metadata:
|
||||
src._copy_metadata(dst, follow_symlinks=False)
|
||||
elif src.is_dir():
|
||||
children = src.iterdir()
|
||||
dst.mkdir(exist_ok=dirs_exist_ok)
|
||||
stack.extend((child, dst.joinpath(child.name))
|
||||
for child in children)
|
||||
if preserve_metadata:
|
||||
src._copy_metadata(dst)
|
||||
else:
|
||||
src._copy_file(dst)
|
||||
if preserve_metadata:
|
||||
src._copy_metadata(dst)
|
||||
return target
|
||||
copy = property(CopyWorker, doc=CopyWorker.__call__.__doc__)
|
||||
|
||||
def copy_into(self, target_dir, *, follow_symlinks=True,
|
||||
dirs_exist_ok=False, preserve_metadata=False):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue