GH-122890: Fix low-level error handling in pathlib.Path.copy() (#122897)

Give unique names to our low-level FD copying functions, and try each one
in turn. Handle errors appropriately for each implementation:

- `fcntl.FICLONE`: suppress `EBADF`, `EOPNOTSUPP`, `ETXTBSY`, `EXDEV`
- `posix._fcopyfile`: suppress `EBADF`, `ENOTSUP`
- `os.copy_file_range`: suppress `ETXTBSY`, `EXDEV`
- `os.sendfile`: suppress `ENOTSOCK`
This commit is contained in:
Barney Gale 2024-08-24 15:11:39 +01:00 committed by GitHub
parent 127660bcdb
commit c4ee4e756a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 90 additions and 16 deletions

View file

@ -1,3 +1,4 @@
import contextlib
import io
import os
import sys
@ -22,10 +23,18 @@ from test.support.os_helper import TESTFN, FakePath
from test.test_pathlib import test_pathlib_abc
from test.test_pathlib.test_pathlib_abc import needs_posix, needs_windows, needs_symlinks
try:
import fcntl
except ImportError:
fcntl = None
try:
import grp, pwd
except ImportError:
grp = pwd = None
try:
import posix
except ImportError:
posix = None
root_in_posix = False
@ -707,6 +716,45 @@ class PathTest(test_pathlib_abc.DummyPathTest, PurePathTest):
if hasattr(source_st, 'st_flags'):
self.assertEqual(source_st.st_flags, target_st.st_flags)
def test_copy_error_handling(self):
def make_raiser(err):
def raiser(*args, **kwargs):
raise OSError(err, os.strerror(err))
return raiser
base = self.cls(self.base)
source = base / 'fileA'
target = base / 'copyA'
# Raise non-fatal OSError from all available fast copy functions.
with contextlib.ExitStack() as ctx:
if fcntl and hasattr(fcntl, 'FICLONE'):
ctx.enter_context(mock.patch('fcntl.ioctl', make_raiser(errno.EXDEV)))
if posix and hasattr(posix, '_fcopyfile'):
ctx.enter_context(mock.patch('posix._fcopyfile', make_raiser(errno.ENOTSUP)))
if hasattr(os, 'copy_file_range'):
ctx.enter_context(mock.patch('os.copy_file_range', make_raiser(errno.EXDEV)))
if hasattr(os, 'sendfile'):
ctx.enter_context(mock.patch('os.sendfile', make_raiser(errno.ENOTSOCK)))
source.copy(target)
self.assertTrue(target.exists())
self.assertEqual(source.read_text(), target.read_text())
# Raise fatal OSError from first available fast copy function.
if fcntl and hasattr(fcntl, 'FICLONE'):
patchpoint = 'fcntl.ioctl'
elif posix and hasattr(posix, '_fcopyfile'):
patchpoint = 'posix._fcopyfile'
elif hasattr(os, 'copy_file_range'):
patchpoint = 'os.copy_file_range'
elif hasattr(os, 'sendfile'):
patchpoint = 'os.sendfile'
else:
return
with mock.patch(patchpoint, make_raiser(errno.ENOENT)):
self.assertRaises(FileNotFoundError, source.copy, target)
@unittest.skipIf(sys.platform == "win32" or sys.platform == "wasi", "directories are always readable on Windows and WASI")
@unittest.skipIf(root_in_posix, "test fails with root privilege")
def test_copy_dir_no_read_permission(self):