mirror of
https://github.com/python/cpython.git
synced 2025-08-08 10:58:51 +00:00
GH-125413: Add pathlib.Path.scandir()
method (#126060)
Add `pathlib.Path.scandir()` as a trivial wrapper of `os.scandir()`. This will be used to implement several `PathBase` methods more efficiently, including methods that provide `Path.copy()`.
This commit is contained in:
parent
d0abd0b826
commit
260843df1b
6 changed files with 114 additions and 11 deletions
|
@ -1,4 +1,5 @@
|
|||
import collections
|
||||
import contextlib
|
||||
import io
|
||||
import os
|
||||
import errno
|
||||
|
@ -1424,6 +1425,24 @@ DummyPathStatResult = collections.namedtuple(
|
|||
'st_mode st_ino st_dev st_nlink st_uid st_gid st_size st_atime st_mtime st_ctime')
|
||||
|
||||
|
||||
class DummyDirEntry:
|
||||
"""
|
||||
Minimal os.DirEntry-like object. Returned from DummyPath.scandir().
|
||||
"""
|
||||
__slots__ = ('name', '_is_symlink', '_is_dir')
|
||||
|
||||
def __init__(self, name, is_symlink, is_dir):
|
||||
self.name = name
|
||||
self._is_symlink = is_symlink
|
||||
self._is_dir = is_dir
|
||||
|
||||
def is_symlink(self):
|
||||
return self._is_symlink
|
||||
|
||||
def is_dir(self, *, follow_symlinks=True):
|
||||
return self._is_dir and (follow_symlinks or not self._is_symlink)
|
||||
|
||||
|
||||
class DummyPath(PathBase):
|
||||
"""
|
||||
Simple implementation of PathBase that keeps files and directories in
|
||||
|
@ -1491,14 +1510,25 @@ class DummyPath(PathBase):
|
|||
stream = io.TextIOWrapper(stream, encoding=encoding, errors=errors, newline=newline)
|
||||
return stream
|
||||
|
||||
def iterdir(self):
|
||||
path = str(self.resolve())
|
||||
if path in self._files:
|
||||
raise NotADirectoryError(errno.ENOTDIR, "Not a directory", path)
|
||||
elif path in self._directories:
|
||||
return iter([self / name for name in self._directories[path]])
|
||||
@contextlib.contextmanager
|
||||
def scandir(self):
|
||||
path = self.resolve()
|
||||
path_str = str(path)
|
||||
if path_str in self._files:
|
||||
raise NotADirectoryError(errno.ENOTDIR, "Not a directory", path_str)
|
||||
elif path_str in self._directories:
|
||||
yield iter([path.joinpath(name)._dir_entry for name in self._directories[path_str]])
|
||||
else:
|
||||
raise FileNotFoundError(errno.ENOENT, "File not found", path)
|
||||
raise FileNotFoundError(errno.ENOENT, "File not found", path_str)
|
||||
|
||||
@property
|
||||
def _dir_entry(self):
|
||||
path_str = str(self)
|
||||
is_symlink = path_str in self._symlinks
|
||||
is_directory = (path_str in self._directories
|
||||
if not is_symlink
|
||||
else self._symlinks[path_str][1])
|
||||
return DummyDirEntry(self.name, is_symlink, is_directory)
|
||||
|
||||
def mkdir(self, mode=0o777, parents=False, exist_ok=False):
|
||||
path = str(self.parent.resolve() / self.name)
|
||||
|
@ -1602,7 +1632,7 @@ class DummyPathTest(DummyPurePathTest):
|
|||
if self.can_symlink:
|
||||
p.joinpath('linkA').symlink_to('fileA')
|
||||
p.joinpath('brokenLink').symlink_to('non-existing')
|
||||
p.joinpath('linkB').symlink_to('dirB')
|
||||
p.joinpath('linkB').symlink_to('dirB', target_is_directory=True)
|
||||
p.joinpath('dirA', 'linkC').symlink_to(parser.join('..', 'dirB'))
|
||||
p.joinpath('dirB', 'linkD').symlink_to(parser.join('..', 'dirB'))
|
||||
p.joinpath('brokenLinkLoop').symlink_to('brokenLinkLoop')
|
||||
|
@ -2187,6 +2217,23 @@ class DummyPathTest(DummyPurePathTest):
|
|||
self.assertIn(cm.exception.errno, (errno.ENOTDIR,
|
||||
errno.ENOENT, errno.EINVAL))
|
||||
|
||||
def test_scandir(self):
|
||||
p = self.cls(self.base)
|
||||
with p.scandir() as entries:
|
||||
self.assertTrue(list(entries))
|
||||
with p.scandir() as entries:
|
||||
for entry in entries:
|
||||
child = p / entry.name
|
||||
self.assertIsNotNone(entry)
|
||||
self.assertEqual(entry.name, child.name)
|
||||
self.assertEqual(entry.is_symlink(),
|
||||
child.is_symlink())
|
||||
self.assertEqual(entry.is_dir(follow_symlinks=False),
|
||||
child.is_dir(follow_symlinks=False))
|
||||
if entry.name != 'brokenLinkLoop':
|
||||
self.assertEqual(entry.is_dir(), child.is_dir())
|
||||
|
||||
|
||||
def test_glob_common(self):
|
||||
def _check(glob, expected):
|
||||
self.assertEqual(set(glob), { P(self.base, q) for q in expected })
|
||||
|
@ -3038,7 +3085,7 @@ class DummyPathWithSymlinks(DummyPath):
|
|||
def readlink(self):
|
||||
path = str(self.parent.resolve() / self.name)
|
||||
if path in self._symlinks:
|
||||
return self.with_segments(self._symlinks[path])
|
||||
return self.with_segments(self._symlinks[path][0])
|
||||
elif path in self._files or path in self._directories:
|
||||
raise OSError(errno.EINVAL, "Not a symlink", path)
|
||||
else:
|
||||
|
@ -3050,7 +3097,7 @@ class DummyPathWithSymlinks(DummyPath):
|
|||
if path in self._symlinks:
|
||||
raise FileExistsError(errno.EEXIST, "File exists", path)
|
||||
self._directories[parent].add(self.name)
|
||||
self._symlinks[path] = str(target)
|
||||
self._symlinks[path] = str(target), target_is_directory
|
||||
|
||||
|
||||
class DummyPathWithSymlinksTest(DummyPathTest):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue