GH-125413: Add private pathlib.Path method to write metadata (#130238)

Replace `WritablePath._copy_writer` with a new `_write_info()` method. This
method allows the target of a `copy()` to preserve metadata.

Replace `pathlib._os.CopyWriter` and `LocalCopyWriter` classes with new
`copy_file()` and `copy_info()` functions. The `copy_file()` function uses
`source_path.info` wherever possible to save on `stat()`s.
This commit is contained in:
Barney Gale 2025-02-26 21:07:27 +00:00 committed by GitHub
parent 5ba69e747f
commit b251d409f9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 122 additions and 174 deletions

View file

@ -102,16 +102,16 @@ else:
if _winapi and hasattr(_winapi, 'CopyFile2'):
def copyfile(source, target):
def _copyfile2(source, target):
"""
Copy from one file to another using CopyFile2 (Windows only).
"""
_winapi.CopyFile2(source, target, 0)
else:
copyfile = None
_copyfile2 = None
def copyfileobj(source_f, target_f):
def _copyfileobj(source_f, target_f):
"""
Copy data from file-like object source_f to file-like object target_f.
"""
@ -200,70 +200,6 @@ def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")
class CopyWriter:
"""
Class that implements the "write" part of copying between path objects. An
instance of this class is available from the WritablePath._copy_writer
property.
"""
__slots__ = ('_path',)
def __init__(self, path):
self._path = path
def _copy_metadata(self, source, follow_symlinks=True):
"""Copy metadata from the given path to our path."""
pass
def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
ensure_distinct_paths(source, self._path)
if not follow_symlinks and source.is_symlink():
self._create_symlink(source, preserve_metadata)
elif source.is_dir():
self._create_dir(source, follow_symlinks, dirs_exist_ok, preserve_metadata)
else:
self._create_file(source, preserve_metadata)
return self._path
def _create_dir(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
"""Copy the given directory to our path."""
children = list(source.iterdir())
self._path.mkdir(exist_ok=dirs_exist_ok)
for src in children:
dst = self._path.joinpath(src.name)
if not follow_symlinks and src.is_symlink():
dst._copy_writer._create_symlink(src, preserve_metadata)
elif src.is_dir():
dst._copy_writer._create_dir(src, follow_symlinks, dirs_exist_ok, preserve_metadata)
else:
dst._copy_writer._create_file(src, preserve_metadata)
if preserve_metadata:
self._copy_metadata(source)
def _create_file(self, source, preserve_metadata):
"""Copy the given file to our path."""
ensure_different_files(source, self._path)
with magic_open(source, 'rb') as source_f:
try:
with magic_open(self._path, 'wb') as target_f:
copyfileobj(source_f, target_f)
except IsADirectoryError as e:
if not self._path.exists():
# Raise a less confusing exception.
raise FileNotFoundError(
f'Directory does not exist: {self._path}') from e
raise
if preserve_metadata:
self._copy_metadata(source)
def _create_symlink(self, source, preserve_metadata):
"""Copy the given symbolic link to our path."""
self._path.symlink_to(source.readlink())
if preserve_metadata:
self._copy_metadata(source, follow_symlinks=False)
def ensure_distinct_paths(source, target):
"""
Raise OSError(EINVAL) if the other path is within this path.
@ -284,94 +220,6 @@ def ensure_distinct_paths(source, target):
raise err
class LocalCopyWriter(CopyWriter):
"""This object implements the "write" part of copying local paths. Don't
try to construct it yourself.
"""
__slots__ = ()
def _copy_metadata(self, source, follow_symlinks=True):
"""Copy metadata from the given path to our path."""
target = self._path
info = source.info
copy_times_ns = (
hasattr(info, '_access_time_ns') and
hasattr(info, '_mod_time_ns') and
(follow_symlinks or os.utime in os.supports_follow_symlinks))
if copy_times_ns:
t0 = info._access_time_ns(follow_symlinks=follow_symlinks)
t1 = info._mod_time_ns(follow_symlinks=follow_symlinks)
os.utime(target, ns=(t0, t1), follow_symlinks=follow_symlinks)
# We must copy extended attributes before the file is (potentially)
# chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
copy_xattrs = (
hasattr(info, '_xattrs') and
hasattr(os, 'setxattr') and
(follow_symlinks or os.setxattr in os.supports_follow_symlinks))
if copy_xattrs:
xattrs = info._xattrs(follow_symlinks=follow_symlinks)
for attr, value in xattrs:
try:
os.setxattr(target, attr, value, follow_symlinks=follow_symlinks)
except OSError as e:
if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
raise
copy_posix_permissions = (
hasattr(info, '_posix_permissions') and
(follow_symlinks or os.chmod in os.supports_follow_symlinks))
if copy_posix_permissions:
posix_permissions = info._posix_permissions(follow_symlinks=follow_symlinks)
try:
os.chmod(target, posix_permissions, follow_symlinks=follow_symlinks)
except NotImplementedError:
# if we got a NotImplementedError, it's because
# * follow_symlinks=False,
# * lchown() is unavailable, and
# * either
# * fchownat() is unavailable or
# * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
# (it returned ENOSUP.)
# therefore we're out of options--we simply cannot chown the
# symlink. give up, suppress the error.
# (which is what shutil always did in this circumstance.)
pass
copy_bsd_flags = (
hasattr(info, '_bsd_flags') and
hasattr(os, 'chflags') and
(follow_symlinks or os.chflags in os.supports_follow_symlinks))
if copy_bsd_flags:
bsd_flags = info._bsd_flags(follow_symlinks=follow_symlinks)
try:
os.chflags(target, bsd_flags, follow_symlinks=follow_symlinks)
except OSError as why:
if why.errno not in (EOPNOTSUPP, ENOTSUP):
raise
if copyfile:
# Use fast OS routine for local file copying where available.
def _create_file(self, source, preserve_metadata):
"""Copy the given file to the given target."""
try:
source = os.fspath(source)
except TypeError:
super()._create_file(source, preserve_metadata)
else:
copyfile(source, os.fspath(self._path))
if os.name == 'nt':
# Windows: symlink target might not exist yet if we're copying several
# files, so ensure we pass is_dir to os.symlink().
def _create_symlink(self, source, preserve_metadata):
"""Copy the given symlink to the given target."""
self._path.symlink_to(source.readlink(), source.is_dir())
if preserve_metadata:
self._copy_metadata(source, follow_symlinks=False)
def ensure_different_files(source, target):
"""
Raise OSError(EINVAL) if both paths refer to the same file.
@ -394,6 +242,102 @@ def ensure_different_files(source, target):
raise err
def copy_file(source, target, follow_symlinks=True, dirs_exist_ok=False,
preserve_metadata=False):
"""
Recursively copy the given source ReadablePath to the given target WritablePath.
"""
info = source.info
if not follow_symlinks and info.is_symlink():
target.symlink_to(source.readlink(), info.is_dir())
if preserve_metadata:
target._write_info(info, follow_symlinks=False)
elif info.is_dir():
children = source.iterdir()
target.mkdir(exist_ok=dirs_exist_ok)
for src in children:
dst = target.joinpath(src.name)
copy_file(src, dst, follow_symlinks, dirs_exist_ok, preserve_metadata)
if preserve_metadata:
target._write_info(info)
else:
if _copyfile2:
# Use fast OS routine for local file copying where available.
try:
source_p = os.fspath(source)
target_p = os.fspath(target)
except TypeError:
pass
else:
_copyfile2(source_p, target_p)
return
ensure_different_files(source, target)
with magic_open(source, 'rb') as source_f:
with magic_open(target, 'wb') as target_f:
_copyfileobj(source_f, target_f)
if preserve_metadata:
target._write_info(info)
def copy_info(info, target, follow_symlinks=True):
"""Copy metadata from the given PathInfo to the given local path."""
copy_times_ns = (
hasattr(info, '_access_time_ns') and
hasattr(info, '_mod_time_ns') and
(follow_symlinks or os.utime in os.supports_follow_symlinks))
if copy_times_ns:
t0 = info._access_time_ns(follow_symlinks=follow_symlinks)
t1 = info._mod_time_ns(follow_symlinks=follow_symlinks)
os.utime(target, ns=(t0, t1), follow_symlinks=follow_symlinks)
# We must copy extended attributes before the file is (potentially)
# chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
copy_xattrs = (
hasattr(info, '_xattrs') and
hasattr(os, 'setxattr') and
(follow_symlinks or os.setxattr in os.supports_follow_symlinks))
if copy_xattrs:
xattrs = info._xattrs(follow_symlinks=follow_symlinks)
for attr, value in xattrs:
try:
os.setxattr(target, attr, value, follow_symlinks=follow_symlinks)
except OSError as e:
if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
raise
copy_posix_permissions = (
hasattr(info, '_posix_permissions') and
(follow_symlinks or os.chmod in os.supports_follow_symlinks))
if copy_posix_permissions:
posix_permissions = info._posix_permissions(follow_symlinks=follow_symlinks)
try:
os.chmod(target, posix_permissions, follow_symlinks=follow_symlinks)
except NotImplementedError:
# if we got a NotImplementedError, it's because
# * follow_symlinks=False,
# * lchown() is unavailable, and
# * either
# * fchownat() is unavailable or
# * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
# (it returned ENOSUP.)
# therefore we're out of options--we simply cannot chown the
# symlink. give up, suppress the error.
# (which is what shutil always did in this circumstance.)
pass
copy_bsd_flags = (
hasattr(info, '_bsd_flags') and
hasattr(os, 'chflags') and
(follow_symlinks or os.chflags in os.supports_follow_symlinks))
if copy_bsd_flags:
bsd_flags = info._bsd_flags(follow_symlinks=follow_symlinks)
try:
os.chflags(target, bsd_flags, follow_symlinks=follow_symlinks)
except OSError as why:
if why.errno not in (EOPNOTSUPP, ENOTSUP):
raise
class _PathInfoBase:
__slots__ = ('_path', '_stat_result', '_lstat_result')