GH-73991: Add pathlib.Path.rmtree() (#119060)

Add a `Path.rmtree()` method that removes an entire directory tree, like
`shutil.rmtree()`. The signature of the optional *on_error* argument
matches the `Path.walk()` argument of the same name, but differs from the
*onexc* and *onerror* arguments to `shutil.rmtree()`. Consistency within
pathlib is probably more important.

In the private pathlib ABCs, we add an implementation based on `walk()`.

Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com>
This commit is contained in:
Barney Gale 2024-07-20 21:14:13 +01:00 committed by GitHub
parent 8db5f48007
commit 094375b9b7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 448 additions and 5 deletions

View file

@ -16,6 +16,7 @@ from urllib.request import pathname2url
from test.support import import_helper
from test.support import is_emscripten, is_wasi
from test.support import infinite_recursion
from test.support import swap_attr
from test.support import os_helper
from test.support.os_helper import TESTFN, FakePath
from test.test_pathlib import test_pathlib_abc
@ -31,6 +32,10 @@ root_in_posix = False
if hasattr(os, 'geteuid'):
root_in_posix = (os.geteuid() == 0)
rmtree_use_fd_functions = (
{os.open, os.stat, os.unlink, os.rmdir} <= os.supports_dir_fd and
os.listdir in os.supports_fd and os.stat in os.supports_follow_symlinks)
#
# Tests for the pure classes.
#
@ -827,6 +832,252 @@ class PathTest(test_pathlib_abc.DummyPathTest, PurePathTest):
self.assertEqual(expected_gid, gid_2)
self.assertEqual(expected_name, link.group(follow_symlinks=False))
def test_rmtree_uses_safe_fd_version_if_available(self):
if rmtree_use_fd_functions:
d = self.cls(self.base, 'a')
d.mkdir()
try:
real_open = os.open
class Called(Exception):
pass
def _raiser(*args, **kwargs):
raise Called
os.open = _raiser
self.assertRaises(Called, d.rmtree)
finally:
os.open = real_open
@unittest.skipIf(sys.platform[:6] == 'cygwin',
"This test can't be run on Cygwin (issue #1071513).")
@os_helper.skip_if_dac_override
@os_helper.skip_unless_working_chmod
def test_rmtree_unwritable(self):
tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
child_file_path = tmp / 'a'
child_dir_path = tmp / 'b'
child_file_path.write_text("")
child_dir_path.mkdir()
old_dir_mode = tmp.stat().st_mode
old_child_file_mode = child_file_path.stat().st_mode
old_child_dir_mode = child_dir_path.stat().st_mode
# Make unwritable.
new_mode = stat.S_IREAD | stat.S_IEXEC
try:
child_file_path.chmod(new_mode)
child_dir_path.chmod(new_mode)
tmp.chmod(new_mode)
errors = []
tmp.rmtree(on_error=errors.append)
# Test whether onerror has actually been called.
print(errors)
self.assertEqual(len(errors), 3)
finally:
tmp.chmod(old_dir_mode)
child_file_path.chmod(old_child_file_mode)
child_dir_path.chmod(old_child_dir_mode)
@needs_windows
def test_rmtree_inner_junction(self):
import _winapi
tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
dir1 = tmp / 'dir1'
dir2 = dir1 / 'dir2'
dir3 = tmp / 'dir3'
for d in dir1, dir2, dir3:
d.mkdir()
file1 = tmp / 'file1'
file1.write_text('foo')
link1 = dir1 / 'link1'
_winapi.CreateJunction(str(dir2), str(link1))
link2 = dir1 / 'link2'
_winapi.CreateJunction(str(dir3), str(link2))
link3 = dir1 / 'link3'
_winapi.CreateJunction(str(file1), str(link3))
# make sure junctions are removed but not followed
dir1.rmtree()
self.assertFalse(dir1.exists())
self.assertTrue(dir3.exists())
self.assertTrue(file1.exists())
@needs_windows
def test_rmtree_outer_junction(self):
import _winapi
tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
try:
src = tmp / 'cheese'
dst = tmp / 'shop'
src.mkdir()
spam = src / 'spam'
spam.write_text('')
_winapi.CreateJunction(str(src), str(dst))
self.assertRaises(OSError, dst.rmtree)
dst.rmtree(ignore_errors=True)
finally:
tmp.rmtree(ignore_errors=True)
@needs_windows
def test_rmtree_outer_junction_on_error(self):
import _winapi
tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
dir_ = tmp / 'dir'
dir_.mkdir()
link = tmp / 'link'
_winapi.CreateJunction(str(dir_), str(link))
try:
self.assertRaises(OSError, link.rmtree)
self.assertTrue(dir_.exists())
self.assertTrue(link.exists(follow_symlinks=False))
errors = []
def on_error(error):
errors.append(error)
link.rmtree(on_error=on_error)
self.assertEqual(len(errors), 1)
self.assertIsInstance(errors[0], OSError)
self.assertEqual(errors[0].filename, str(link))
finally:
os.unlink(str(link))
@unittest.skipUnless(rmtree_use_fd_functions, "requires safe rmtree")
def test_rmtree_fails_on_close(self):
# Test that the error handler is called for failed os.close() and that
# os.close() is only called once for a file descriptor.
tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
dir1 = tmp / 'dir1'
dir1.mkdir()
dir2 = dir1 / 'dir2'
dir2.mkdir()
def close(fd):
orig_close(fd)
nonlocal close_count
close_count += 1
raise OSError
close_count = 0
with swap_attr(os, 'close', close) as orig_close:
with self.assertRaises(OSError):
dir1.rmtree()
self.assertTrue(dir2.is_dir())
self.assertEqual(close_count, 2)
close_count = 0
errors = []
with swap_attr(os, 'close', close) as orig_close:
dir1.rmtree(on_error=errors.append)
print(errors)
self.assertEqual(len(errors), 2)
self.assertEqual(errors[0].filename, str(dir2))
self.assertEqual(errors[1].filename, str(dir1))
self.assertEqual(close_count, 2)
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
@unittest.skipIf(sys.platform == "vxworks",
"fifo requires special path on VxWorks")
def test_rmtree_on_named_pipe(self):
p = self.cls(self.base, 'pipe')
os.mkfifo(p)
try:
with self.assertRaises(NotADirectoryError):
p.rmtree()
self.assertTrue(p.exists())
finally:
p.unlink()
p = self.cls(self.base, 'dir')
p.mkdir()
os.mkfifo(p / 'mypipe')
p.rmtree()
self.assertFalse(p.exists())
@unittest.skipIf(sys.platform[:6] == 'cygwin',
"This test can't be run on Cygwin (issue #1071513).")
@os_helper.skip_if_dac_override
@os_helper.skip_unless_working_chmod
def test_rmtree_deleted_race_condition(self):
# bpo-37260
#
# Test that a file or a directory deleted after it is enumerated
# by scandir() but before unlink() or rmdr() is called doesn't
# generate any errors.
def on_error(exc):
assert exc.filename
if not isinstance(exc, PermissionError):
raise
# Make the parent and the children writeable.
for p, mode in zip(paths, old_modes):
p.chmod(mode)
# Remove other dirs except one.
keep = next(p for p in dirs if str(p) != exc.filename)
for p in dirs:
if p != keep:
p.rmdir()
# Remove other files except one.
keep = next(p for p in files if str(p) != exc.filename)
for p in files:
if p != keep:
p.unlink()
tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
paths = [tmp] + [tmp / f'child{i}' for i in range(6)]
dirs = paths[1::2]
files = paths[2::2]
for path in dirs:
path.mkdir()
for path in files:
path.write_text('')
old_modes = [path.stat().st_mode for path in paths]
# Make the parent and the children non-writeable.
new_mode = stat.S_IREAD | stat.S_IEXEC
for path in reversed(paths):
path.chmod(new_mode)
try:
tmp.rmtree(on_error=on_error)
except:
# Test failed, so cleanup artifacts.
for path, mode in zip(paths, old_modes):
try:
path.chmod(mode)
except OSError:
pass
tmp.rmtree()
raise
def test_rmtree_does_not_choke_on_failing_lstat(self):
try:
orig_lstat = os.lstat
tmp = self.cls(self.base, 'rmtree')
def raiser(fn, *args, **kwargs):
if fn != str(tmp):
raise OSError()
else:
return orig_lstat(fn)
os.lstat = raiser
tmp.mkdir()
foo = tmp / 'foo'
foo.write_text('')
tmp.rmtree()
finally:
os.lstat = orig_lstat
@os_helper.skip_unless_hardlink
def test_hardlink_to(self):
P = self.cls(self.base)