mirror of
https://github.com/python/cpython.git
synced 2025-10-17 04:08:28 +00:00
gh-90385: Add pathlib.Path.walk()
method (GH-92517)
Automerge-Triggered-By: GH:brettcannon
This commit is contained in:
parent
e4d3a96a11
commit
c1e929858a
5 changed files with 338 additions and 1 deletions
|
@ -2478,6 +2478,203 @@ class _BasePathTest(object):
|
|||
def test_complex_symlinks_relative_dot_dot(self):
|
||||
self._check_complex_symlinks(os.path.join('dirA', '..'))
|
||||
|
||||
class WalkTests(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.addCleanup(os_helper.rmtree, os_helper.TESTFN)
|
||||
|
||||
# Build:
|
||||
# TESTFN/
|
||||
# TEST1/ a file kid and two directory kids
|
||||
# tmp1
|
||||
# SUB1/ a file kid and a directory kid
|
||||
# tmp2
|
||||
# SUB11/ no kids
|
||||
# SUB2/ a file kid and a dirsymlink kid
|
||||
# tmp3
|
||||
# SUB21/ not readable
|
||||
# tmp5
|
||||
# link/ a symlink to TEST2
|
||||
# broken_link
|
||||
# broken_link2
|
||||
# broken_link3
|
||||
# TEST2/
|
||||
# tmp4 a lone file
|
||||
self.walk_path = pathlib.Path(os_helper.TESTFN, "TEST1")
|
||||
self.sub1_path = self.walk_path / "SUB1"
|
||||
self.sub11_path = self.sub1_path / "SUB11"
|
||||
self.sub2_path = self.walk_path / "SUB2"
|
||||
sub21_path= self.sub2_path / "SUB21"
|
||||
tmp1_path = self.walk_path / "tmp1"
|
||||
tmp2_path = self.sub1_path / "tmp2"
|
||||
tmp3_path = self.sub2_path / "tmp3"
|
||||
tmp5_path = sub21_path / "tmp3"
|
||||
self.link_path = self.sub2_path / "link"
|
||||
t2_path = pathlib.Path(os_helper.TESTFN, "TEST2")
|
||||
tmp4_path = pathlib.Path(os_helper.TESTFN, "TEST2", "tmp4")
|
||||
broken_link_path = self.sub2_path / "broken_link"
|
||||
broken_link2_path = self.sub2_path / "broken_link2"
|
||||
broken_link3_path = self.sub2_path / "broken_link3"
|
||||
|
||||
os.makedirs(self.sub11_path)
|
||||
os.makedirs(self.sub2_path)
|
||||
os.makedirs(sub21_path)
|
||||
os.makedirs(t2_path)
|
||||
|
||||
for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
|
||||
with open(path, "x", encoding='utf-8') as f:
|
||||
f.write(f"I'm {path} and proud of it. Blame test_pathlib.\n")
|
||||
|
||||
if os_helper.can_symlink():
|
||||
os.symlink(os.path.abspath(t2_path), self.link_path)
|
||||
os.symlink('broken', broken_link_path, True)
|
||||
os.symlink(pathlib.Path('tmp3', 'broken'), broken_link2_path, True)
|
||||
os.symlink(pathlib.Path('SUB21', 'tmp5'), broken_link3_path, True)
|
||||
self.sub2_tree = (self.sub2_path, ["SUB21"],
|
||||
["broken_link", "broken_link2", "broken_link3",
|
||||
"link", "tmp3"])
|
||||
else:
|
||||
self.sub2_tree = (self.sub2_path, ["SUB21"], ["tmp3"])
|
||||
|
||||
if not is_emscripten:
|
||||
# Emscripten fails with inaccessible directories.
|
||||
os.chmod(sub21_path, 0)
|
||||
try:
|
||||
os.listdir(sub21_path)
|
||||
except PermissionError:
|
||||
self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
|
||||
else:
|
||||
os.chmod(sub21_path, stat.S_IRWXU)
|
||||
os.unlink(tmp5_path)
|
||||
os.rmdir(sub21_path)
|
||||
del self.sub2_tree[1][:1]
|
||||
|
||||
def test_walk_topdown(self):
|
||||
all = list(self.walk_path.walk())
|
||||
|
||||
self.assertEqual(len(all), 4)
|
||||
# We can't know which order SUB1 and SUB2 will appear in.
|
||||
# Not flipped: TESTFN, SUB1, SUB11, SUB2
|
||||
# flipped: TESTFN, SUB2, SUB1, SUB11
|
||||
flipped = all[0][1][0] != "SUB1"
|
||||
all[0][1].sort()
|
||||
all[3 - 2 * flipped][-1].sort()
|
||||
all[3 - 2 * flipped][1].sort()
|
||||
self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
|
||||
self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
|
||||
self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
|
||||
self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
|
||||
|
||||
def test_walk_prune(self, walk_path=None):
|
||||
if walk_path is None:
|
||||
walk_path = self.walk_path
|
||||
# Prune the search.
|
||||
all = []
|
||||
for root, dirs, files in walk_path.walk():
|
||||
all.append((root, dirs, files))
|
||||
if 'SUB1' in dirs:
|
||||
# Note that this also mutates the dirs we appended to all!
|
||||
dirs.remove('SUB1')
|
||||
|
||||
self.assertEqual(len(all), 2)
|
||||
self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
|
||||
|
||||
all[1][-1].sort()
|
||||
all[1][1].sort()
|
||||
self.assertEqual(all[1], self.sub2_tree)
|
||||
|
||||
def test_file_like_path(self):
|
||||
self.test_walk_prune(FakePath(self.walk_path).__fspath__())
|
||||
|
||||
def test_walk_bottom_up(self):
|
||||
all = list(self.walk_path.walk( top_down=False))
|
||||
|
||||
self.assertEqual(len(all), 4, all)
|
||||
# We can't know which order SUB1 and SUB2 will appear in.
|
||||
# Not flipped: SUB11, SUB1, SUB2, TESTFN
|
||||
# flipped: SUB2, SUB11, SUB1, TESTFN
|
||||
flipped = all[3][1][0] != "SUB1"
|
||||
all[3][1].sort()
|
||||
all[2 - 2 * flipped][-1].sort()
|
||||
all[2 - 2 * flipped][1].sort()
|
||||
self.assertEqual(all[3],
|
||||
(self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
|
||||
self.assertEqual(all[flipped],
|
||||
(self.sub11_path, [], []))
|
||||
self.assertEqual(all[flipped + 1],
|
||||
(self.sub1_path, ["SUB11"], ["tmp2"]))
|
||||
self.assertEqual(all[2 - 2 * flipped],
|
||||
self.sub2_tree)
|
||||
|
||||
@os_helper.skip_unless_symlink
|
||||
def test_walk_follow_symlinks(self):
|
||||
walk_it = self.walk_path.walk(follow_symlinks=True)
|
||||
for root, dirs, files in walk_it:
|
||||
if root == self.link_path:
|
||||
self.assertEqual(dirs, [])
|
||||
self.assertEqual(files, ["tmp4"])
|
||||
break
|
||||
else:
|
||||
self.fail("Didn't follow symlink with follow_symlinks=True")
|
||||
|
||||
def test_walk_symlink_location(self):
|
||||
# Tests whether symlinks end up in filenames or dirnames depending
|
||||
# on the `follow_symlinks` argument.
|
||||
walk_it = self.walk_path.walk(follow_symlinks=False)
|
||||
for root, dirs, files in walk_it:
|
||||
if root == self.sub2_path:
|
||||
self.assertIn("link", files)
|
||||
break
|
||||
else:
|
||||
self.fail("symlink not found")
|
||||
|
||||
walk_it = self.walk_path.walk(follow_symlinks=True)
|
||||
for root, dirs, files in walk_it:
|
||||
if root == self.sub2_path:
|
||||
self.assertIn("link", dirs)
|
||||
break
|
||||
|
||||
def test_walk_bad_dir(self):
|
||||
errors = []
|
||||
walk_it = self.walk_path.walk(on_error=errors.append)
|
||||
root, dirs, files = next(walk_it)
|
||||
self.assertEqual(errors, [])
|
||||
dir1 = 'SUB1'
|
||||
path1 = root / dir1
|
||||
path1new = (root / dir1).with_suffix(".new")
|
||||
path1.rename(path1new)
|
||||
try:
|
||||
roots = [r for r, _, _ in walk_it]
|
||||
self.assertTrue(errors)
|
||||
self.assertNotIn(path1, roots)
|
||||
self.assertNotIn(path1new, roots)
|
||||
for dir2 in dirs:
|
||||
if dir2 != dir1:
|
||||
self.assertIn(root / dir2, roots)
|
||||
finally:
|
||||
path1new.rename(path1)
|
||||
|
||||
def test_walk_many_open_files(self):
|
||||
depth = 30
|
||||
base = pathlib.Path(os_helper.TESTFN, 'deep')
|
||||
path = pathlib.Path(base, *(['d']*depth))
|
||||
path.mkdir(parents=True)
|
||||
|
||||
iters = [base.walk(top_down=False) for _ in range(100)]
|
||||
for i in range(depth + 1):
|
||||
expected = (path, ['d'] if i else [], [])
|
||||
for it in iters:
|
||||
self.assertEqual(next(it), expected)
|
||||
path = path.parent
|
||||
|
||||
iters = [base.walk(top_down=True) for _ in range(100)]
|
||||
path = base
|
||||
for i in range(depth + 1):
|
||||
expected = (path, ['d'] if i < depth else [], [])
|
||||
for it in iters:
|
||||
self.assertEqual(next(it), expected)
|
||||
path = path / 'd'
|
||||
|
||||
|
||||
class PathTest(_BasePathTest, unittest.TestCase):
|
||||
cls = pathlib.Path
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue