mirror of
https://github.com/python/cpython.git
synced 2025-08-04 00:48:58 +00:00
[3.11] gh-116401: Fix blocking os.fwalk() and shutil.rmtree() on opening a named pipe (GH-116421) (GH-116717)
(cherry picked from commit aa7bcf284f
)
This commit is contained in:
parent
5bd350709d
commit
4a73637966
6 changed files with 113 additions and 8 deletions
|
@ -1278,6 +1278,7 @@ class EnvironTests(mapping_tests.BasicTestMappingProtocol):
|
|||
|
||||
class WalkTests(unittest.TestCase):
|
||||
"""Tests for os.walk()."""
|
||||
is_fwalk = False
|
||||
|
||||
# Wrapper to hide minor differences between os.walk and os.fwalk
|
||||
# to tests both functions with the same code base
|
||||
|
@ -1312,14 +1313,14 @@ class WalkTests(unittest.TestCase):
|
|||
self.sub11_path = join(self.sub1_path, "SUB11")
|
||||
sub2_path = join(self.walk_path, "SUB2")
|
||||
sub21_path = join(sub2_path, "SUB21")
|
||||
tmp1_path = join(self.walk_path, "tmp1")
|
||||
self.tmp1_path = join(self.walk_path, "tmp1")
|
||||
tmp2_path = join(self.sub1_path, "tmp2")
|
||||
tmp3_path = join(sub2_path, "tmp3")
|
||||
tmp5_path = join(sub21_path, "tmp3")
|
||||
self.link_path = join(sub2_path, "link")
|
||||
t2_path = join(os_helper.TESTFN, "TEST2")
|
||||
tmp4_path = join(os_helper.TESTFN, "TEST2", "tmp4")
|
||||
broken_link_path = join(sub2_path, "broken_link")
|
||||
self.broken_link_path = join(sub2_path, "broken_link")
|
||||
broken_link2_path = join(sub2_path, "broken_link2")
|
||||
broken_link3_path = join(sub2_path, "broken_link3")
|
||||
|
||||
|
@ -1329,13 +1330,13 @@ class WalkTests(unittest.TestCase):
|
|||
os.makedirs(sub21_path)
|
||||
os.makedirs(t2_path)
|
||||
|
||||
for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
|
||||
for path in self.tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
|
||||
with open(path, "x", encoding='utf-8') as f:
|
||||
f.write("I'm " + path + " and proud of it. Blame test_os.\n")
|
||||
|
||||
if os_helper.can_symlink():
|
||||
os.symlink(os.path.abspath(t2_path), self.link_path)
|
||||
os.symlink('broken', broken_link_path, True)
|
||||
os.symlink('broken', self.broken_link_path, True)
|
||||
os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
|
||||
os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
|
||||
self.sub2_tree = (sub2_path, ["SUB21", "link"],
|
||||
|
@ -1431,6 +1432,11 @@ class WalkTests(unittest.TestCase):
|
|||
else:
|
||||
self.fail("Didn't follow symlink with followlinks=True")
|
||||
|
||||
walk_it = self.walk(self.broken_link_path, follow_symlinks=True)
|
||||
if self.is_fwalk:
|
||||
self.assertRaises(FileNotFoundError, next, walk_it)
|
||||
self.assertRaises(StopIteration, next, walk_it)
|
||||
|
||||
def test_walk_bad_dir(self):
|
||||
# Walk top-down.
|
||||
errors = []
|
||||
|
@ -1452,6 +1458,73 @@ class WalkTests(unittest.TestCase):
|
|||
finally:
|
||||
os.rename(path1new, path1)
|
||||
|
||||
def test_walk_bad_dir2(self):
|
||||
walk_it = self.walk('nonexisting')
|
||||
if self.is_fwalk:
|
||||
self.assertRaises(FileNotFoundError, next, walk_it)
|
||||
self.assertRaises(StopIteration, next, walk_it)
|
||||
|
||||
walk_it = self.walk('nonexisting', follow_symlinks=True)
|
||||
if self.is_fwalk:
|
||||
self.assertRaises(FileNotFoundError, next, walk_it)
|
||||
self.assertRaises(StopIteration, next, walk_it)
|
||||
|
||||
walk_it = self.walk(self.tmp1_path)
|
||||
self.assertRaises(StopIteration, next, walk_it)
|
||||
|
||||
walk_it = self.walk(self.tmp1_path, follow_symlinks=True)
|
||||
if self.is_fwalk:
|
||||
self.assertRaises(NotADirectoryError, next, walk_it)
|
||||
self.assertRaises(StopIteration, next, walk_it)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
|
||||
@unittest.skipIf(sys.platform == "vxworks",
|
||||
"fifo requires special path on VxWorks")
|
||||
def test_walk_named_pipe(self):
|
||||
path = os_helper.TESTFN + '-pipe'
|
||||
os.mkfifo(path)
|
||||
self.addCleanup(os.unlink, path)
|
||||
|
||||
walk_it = self.walk(path)
|
||||
self.assertRaises(StopIteration, next, walk_it)
|
||||
|
||||
walk_it = self.walk(path, follow_symlinks=True)
|
||||
if self.is_fwalk:
|
||||
self.assertRaises(NotADirectoryError, next, walk_it)
|
||||
self.assertRaises(StopIteration, next, walk_it)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
|
||||
@unittest.skipIf(sys.platform == "vxworks",
|
||||
"fifo requires special path on VxWorks")
|
||||
def test_walk_named_pipe2(self):
|
||||
path = os_helper.TESTFN + '-dir'
|
||||
os.mkdir(path)
|
||||
self.addCleanup(shutil.rmtree, path)
|
||||
os.mkfifo(os.path.join(path, 'mypipe'))
|
||||
|
||||
errors = []
|
||||
walk_it = self.walk(path, onerror=errors.append)
|
||||
next(walk_it)
|
||||
self.assertRaises(StopIteration, next, walk_it)
|
||||
self.assertEqual(errors, [])
|
||||
|
||||
errors = []
|
||||
walk_it = self.walk(path, onerror=errors.append)
|
||||
root, dirs, files = next(walk_it)
|
||||
self.assertEqual(root, path)
|
||||
self.assertEqual(dirs, [])
|
||||
self.assertEqual(files, ['mypipe'])
|
||||
dirs.extend(files)
|
||||
files.clear()
|
||||
if self.is_fwalk:
|
||||
self.assertRaises(NotADirectoryError, next, walk_it)
|
||||
self.assertRaises(StopIteration, next, walk_it)
|
||||
if self.is_fwalk:
|
||||
self.assertEqual(errors, [])
|
||||
else:
|
||||
self.assertEqual(len(errors), 1, errors)
|
||||
self.assertIsInstance(errors[0], NotADirectoryError)
|
||||
|
||||
def test_walk_many_open_files(self):
|
||||
depth = 30
|
||||
base = os.path.join(os_helper.TESTFN, 'deep')
|
||||
|
@ -1477,6 +1550,7 @@ class WalkTests(unittest.TestCase):
|
|||
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
|
||||
class FwalkTests(WalkTests):
|
||||
"""Tests for os.fwalk()."""
|
||||
is_fwalk = True
|
||||
|
||||
def walk(self, top, **kwargs):
|
||||
for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue