mirror of
https://github.com/python/cpython.git
synced 2025-08-28 12:45:07 +00:00
gh-102828: add onexc arg to shutil.rmtree. Deprecate onerror. (#102829)
This commit is contained in:
parent
4d1f033986
commit
d51a6dc28e
5 changed files with 256 additions and 56 deletions
|
@ -195,7 +195,7 @@ class TestRmTree(BaseTest, unittest.TestCase):
|
|||
shutil.rmtree(victim)
|
||||
|
||||
@os_helper.skip_unless_symlink
|
||||
def test_rmtree_fails_on_symlink(self):
|
||||
def test_rmtree_fails_on_symlink_onerror(self):
|
||||
tmp = self.mkdtemp()
|
||||
dir_ = os.path.join(tmp, 'dir')
|
||||
os.mkdir(dir_)
|
||||
|
@ -213,6 +213,25 @@ class TestRmTree(BaseTest, unittest.TestCase):
|
|||
self.assertEqual(errors[0][1], link)
|
||||
self.assertIsInstance(errors[0][2][1], OSError)
|
||||
|
||||
@os_helper.skip_unless_symlink
|
||||
def test_rmtree_fails_on_symlink_onexc(self):
|
||||
tmp = self.mkdtemp()
|
||||
dir_ = os.path.join(tmp, 'dir')
|
||||
os.mkdir(dir_)
|
||||
link = os.path.join(tmp, 'link')
|
||||
os.symlink(dir_, link)
|
||||
self.assertRaises(OSError, shutil.rmtree, link)
|
||||
self.assertTrue(os.path.exists(dir_))
|
||||
self.assertTrue(os.path.lexists(link))
|
||||
errors = []
|
||||
def onexc(*args):
|
||||
errors.append(args)
|
||||
shutil.rmtree(link, onexc=onexc)
|
||||
self.assertEqual(len(errors), 1)
|
||||
self.assertIs(errors[0][0], os.path.islink)
|
||||
self.assertEqual(errors[0][1], link)
|
||||
self.assertIsInstance(errors[0][2], OSError)
|
||||
|
||||
@os_helper.skip_unless_symlink
|
||||
def test_rmtree_works_on_symlinks(self):
|
||||
tmp = self.mkdtemp()
|
||||
|
@ -236,7 +255,7 @@ class TestRmTree(BaseTest, unittest.TestCase):
|
|||
self.assertTrue(os.path.exists(file1))
|
||||
|
||||
@unittest.skipUnless(_winapi, 'only relevant on Windows')
|
||||
def test_rmtree_fails_on_junctions(self):
|
||||
def test_rmtree_fails_on_junctions_onerror(self):
|
||||
tmp = self.mkdtemp()
|
||||
dir_ = os.path.join(tmp, 'dir')
|
||||
os.mkdir(dir_)
|
||||
|
@ -255,6 +274,26 @@ class TestRmTree(BaseTest, unittest.TestCase):
|
|||
self.assertEqual(errors[0][1], link)
|
||||
self.assertIsInstance(errors[0][2][1], OSError)
|
||||
|
||||
@unittest.skipUnless(_winapi, 'only relevant on Windows')
|
||||
def test_rmtree_fails_on_junctions_onexc(self):
|
||||
tmp = self.mkdtemp()
|
||||
dir_ = os.path.join(tmp, 'dir')
|
||||
os.mkdir(dir_)
|
||||
link = os.path.join(tmp, 'link')
|
||||
_winapi.CreateJunction(dir_, link)
|
||||
self.addCleanup(os_helper.unlink, link)
|
||||
self.assertRaises(OSError, shutil.rmtree, link)
|
||||
self.assertTrue(os.path.exists(dir_))
|
||||
self.assertTrue(os.path.lexists(link))
|
||||
errors = []
|
||||
def onexc(*args):
|
||||
errors.append(args)
|
||||
shutil.rmtree(link, onexc=onexc)
|
||||
self.assertEqual(len(errors), 1)
|
||||
self.assertIs(errors[0][0], os.path.islink)
|
||||
self.assertEqual(errors[0][1], link)
|
||||
self.assertIsInstance(errors[0][2], OSError)
|
||||
|
||||
@unittest.skipUnless(_winapi, 'only relevant on Windows')
|
||||
def test_rmtree_works_on_junctions(self):
|
||||
tmp = self.mkdtemp()
|
||||
|
@ -277,7 +316,7 @@ class TestRmTree(BaseTest, unittest.TestCase):
|
|||
self.assertTrue(os.path.exists(dir3))
|
||||
self.assertTrue(os.path.exists(file1))
|
||||
|
||||
def test_rmtree_errors(self):
|
||||
def test_rmtree_errors_onerror(self):
|
||||
# filename is guaranteed not to exist
|
||||
filename = tempfile.mktemp(dir=self.mkdtemp())
|
||||
self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
|
||||
|
@ -309,6 +348,37 @@ class TestRmTree(BaseTest, unittest.TestCase):
|
|||
self.assertIsInstance(errors[1][2][1], NotADirectoryError)
|
||||
self.assertEqual(errors[1][2][1].filename, filename)
|
||||
|
||||
def test_rmtree_errors_onexc(self):
|
||||
# filename is guaranteed not to exist
|
||||
filename = tempfile.mktemp(dir=self.mkdtemp())
|
||||
self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
|
||||
# test that ignore_errors option is honored
|
||||
shutil.rmtree(filename, ignore_errors=True)
|
||||
|
||||
# existing file
|
||||
tmpdir = self.mkdtemp()
|
||||
write_file((tmpdir, "tstfile"), "")
|
||||
filename = os.path.join(tmpdir, "tstfile")
|
||||
with self.assertRaises(NotADirectoryError) as cm:
|
||||
shutil.rmtree(filename)
|
||||
self.assertEqual(cm.exception.filename, filename)
|
||||
self.assertTrue(os.path.exists(filename))
|
||||
# test that ignore_errors option is honored
|
||||
shutil.rmtree(filename, ignore_errors=True)
|
||||
self.assertTrue(os.path.exists(filename))
|
||||
errors = []
|
||||
def onexc(*args):
|
||||
errors.append(args)
|
||||
shutil.rmtree(filename, onexc=onexc)
|
||||
self.assertEqual(len(errors), 2)
|
||||
self.assertIs(errors[0][0], os.scandir)
|
||||
self.assertEqual(errors[0][1], filename)
|
||||
self.assertIsInstance(errors[0][2], NotADirectoryError)
|
||||
self.assertEqual(errors[0][2].filename, filename)
|
||||
self.assertIs(errors[1][0], os.rmdir)
|
||||
self.assertEqual(errors[1][1], filename)
|
||||
self.assertIsInstance(errors[1][2], NotADirectoryError)
|
||||
self.assertEqual(errors[1][2].filename, filename)
|
||||
|
||||
@unittest.skipIf(sys.platform[:6] == 'cygwin',
|
||||
"This test can't be run on Cygwin (issue #1071513).")
|
||||
|
@ -368,6 +438,100 @@ class TestRmTree(BaseTest, unittest.TestCase):
|
|||
self.assertTrue(issubclass(exc[0], OSError))
|
||||
self.errorState = 3
|
||||
|
||||
@unittest.skipIf(sys.platform[:6] == 'cygwin',
|
||||
"This test can't be run on Cygwin (issue #1071513).")
|
||||
@os_helper.skip_if_dac_override
|
||||
@os_helper.skip_unless_working_chmod
|
||||
def test_on_exc(self):
|
||||
self.errorState = 0
|
||||
os.mkdir(TESTFN)
|
||||
self.addCleanup(shutil.rmtree, TESTFN)
|
||||
|
||||
self.child_file_path = os.path.join(TESTFN, 'a')
|
||||
self.child_dir_path = os.path.join(TESTFN, 'b')
|
||||
os_helper.create_empty_file(self.child_file_path)
|
||||
os.mkdir(self.child_dir_path)
|
||||
old_dir_mode = os.stat(TESTFN).st_mode
|
||||
old_child_file_mode = os.stat(self.child_file_path).st_mode
|
||||
old_child_dir_mode = os.stat(self.child_dir_path).st_mode
|
||||
# Make unwritable.
|
||||
new_mode = stat.S_IREAD|stat.S_IEXEC
|
||||
os.chmod(self.child_file_path, new_mode)
|
||||
os.chmod(self.child_dir_path, new_mode)
|
||||
os.chmod(TESTFN, new_mode)
|
||||
|
||||
self.addCleanup(os.chmod, TESTFN, old_dir_mode)
|
||||
self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
|
||||
self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
|
||||
|
||||
shutil.rmtree(TESTFN, onexc=self.check_args_to_onexc)
|
||||
# Test whether onexc has actually been called.
|
||||
self.assertEqual(self.errorState, 3,
|
||||
"Expected call to onexc function did not happen.")
|
||||
|
||||
def check_args_to_onexc(self, func, arg, exc):
|
||||
# test_rmtree_errors deliberately runs rmtree
|
||||
# on a directory that is chmod 500, which will fail.
|
||||
# This function is run when shutil.rmtree fails.
|
||||
# 99.9% of the time it initially fails to remove
|
||||
# a file in the directory, so the first time through
|
||||
# func is os.remove.
|
||||
# However, some Linux machines running ZFS on
|
||||
# FUSE experienced a failure earlier in the process
|
||||
# at os.listdir. The first failure may legally
|
||||
# be either.
|
||||
if self.errorState < 2:
|
||||
if func is os.unlink:
|
||||
self.assertEqual(arg, self.child_file_path)
|
||||
elif func is os.rmdir:
|
||||
self.assertEqual(arg, self.child_dir_path)
|
||||
else:
|
||||
self.assertIs(func, os.listdir)
|
||||
self.assertIn(arg, [TESTFN, self.child_dir_path])
|
||||
self.assertTrue(isinstance(exc, OSError))
|
||||
self.errorState += 1
|
||||
else:
|
||||
self.assertEqual(func, os.rmdir)
|
||||
self.assertEqual(arg, TESTFN)
|
||||
self.assertTrue(isinstance(exc, OSError))
|
||||
self.errorState = 3
|
||||
|
||||
def test_both_onerror_and_onexc(self):
|
||||
onerror_called = False
|
||||
onexc_called = False
|
||||
|
||||
def onerror(*args):
|
||||
nonlocal onerror_called
|
||||
onerror_called = True
|
||||
|
||||
def onexc(*args):
|
||||
nonlocal onexc_called
|
||||
onexc_called = True
|
||||
|
||||
os.mkdir(TESTFN)
|
||||
self.addCleanup(shutil.rmtree, TESTFN)
|
||||
|
||||
self.child_file_path = os.path.join(TESTFN, 'a')
|
||||
self.child_dir_path = os.path.join(TESTFN, 'b')
|
||||
os_helper.create_empty_file(self.child_file_path)
|
||||
os.mkdir(self.child_dir_path)
|
||||
old_dir_mode = os.stat(TESTFN).st_mode
|
||||
old_child_file_mode = os.stat(self.child_file_path).st_mode
|
||||
old_child_dir_mode = os.stat(self.child_dir_path).st_mode
|
||||
# Make unwritable.
|
||||
new_mode = stat.S_IREAD|stat.S_IEXEC
|
||||
os.chmod(self.child_file_path, new_mode)
|
||||
os.chmod(self.child_dir_path, new_mode)
|
||||
os.chmod(TESTFN, new_mode)
|
||||
|
||||
self.addCleanup(os.chmod, TESTFN, old_dir_mode)
|
||||
self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
|
||||
self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
|
||||
|
||||
shutil.rmtree(TESTFN, onerror=onerror, onexc=onexc)
|
||||
self.assertTrue(onexc_called)
|
||||
self.assertFalse(onerror_called)
|
||||
|
||||
def test_rmtree_does_not_choke_on_failing_lstat(self):
|
||||
try:
|
||||
orig_lstat = os.lstat
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue