GH-130614: pathlib ABCs: revise test suite for readable paths (#131018)

Test `pathlib.types._ReadablePath` in a dedicated test module. These tests
cover `ReadableZipPath`, `ReadableLocalPath` and `Path`, where the former
two classes are implementations of `_ReadablePath` for use in tests.
This commit is contained in:
Barney Gale 2025-03-11 20:54:22 +00:00 committed by GitHub
parent 24070492cf
commit ad90c5fabc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 759 additions and 268 deletions

View file

@ -0,0 +1,145 @@
"""
Implementation of ReadablePath for local paths, for use in pathlib tests.
LocalPathGround is also defined here. It helps establish the "ground truth"
about local paths in tests.
"""
import os
import pathlib.types
from test.support import os_helper
from test.test_pathlib.support.lexical_path import LexicalPath
class LocalPathGround:
can_symlink = os_helper.can_symlink()
def __init__(self, path_cls):
self.path_cls = path_cls
def setup(self, local_suffix=""):
root = self.path_cls(os_helper.TESTFN + local_suffix)
os.mkdir(root)
return root
def teardown(self, root):
os_helper.rmtree(root)
def create_file(self, p, data=b''):
with open(p, 'wb') as f:
f.write(data)
def create_dir(self, p):
os.mkdir(p)
def create_symlink(self, p, target):
os.symlink(target, p)
def create_hierarchy(self, p):
os.mkdir(os.path.join(p, 'dirA'))
os.mkdir(os.path.join(p, 'dirB'))
os.mkdir(os.path.join(p, 'dirC'))
os.mkdir(os.path.join(p, 'dirC', 'dirD'))
with open(os.path.join(p, 'fileA'), 'wb') as f:
f.write(b"this is file A\n")
with open(os.path.join(p, 'dirB', 'fileB'), 'wb') as f:
f.write(b"this is file B\n")
with open(os.path.join(p, 'dirC', 'fileC'), 'wb') as f:
f.write(b"this is file C\n")
with open(os.path.join(p, 'dirC', 'novel.txt'), 'wb') as f:
f.write(b"this is a novel\n")
with open(os.path.join(p, 'dirC', 'dirD', 'fileD'), 'wb') as f:
f.write(b"this is file D\n")
if self.can_symlink:
# Relative symlinks.
os.symlink('fileA', os.path.join(p, 'linkA'))
os.symlink('non-existing', os.path.join(p, 'brokenLink'))
os.symlink('dirB',
os.path.join(p, 'linkB'),
target_is_directory=True)
os.symlink(os.path.join('..', 'dirB'),
os.path.join(p, 'dirA', 'linkC'),
target_is_directory=True)
# Broken symlink (pointing to itself).
os.symlink('brokenLinkLoop', os.path.join(p, 'brokenLinkLoop'))
isdir = staticmethod(os.path.isdir)
isfile = staticmethod(os.path.isfile)
islink = staticmethod(os.path.islink)
readlink = staticmethod(os.readlink)
def readtext(self, p):
with open(p, 'r') as f:
return f.read()
def readbytes(self, p):
with open(p, 'rb') as f:
return f.read()
class LocalPathInfo(pathlib.types.PathInfo):
"""
Simple implementation of PathInfo for a local path
"""
__slots__ = ('_path', '_exists', '_is_dir', '_is_file', '_is_symlink')
def __init__(self, path):
self._path = str(path)
self._exists = None
self._is_dir = None
self._is_file = None
self._is_symlink = None
def exists(self, *, follow_symlinks=True):
"""Whether this path exists."""
if not follow_symlinks and self.is_symlink():
return True
if self._exists is None:
self._exists = os.path.exists(self._path)
return self._exists
def is_dir(self, *, follow_symlinks=True):
"""Whether this path is a directory."""
if not follow_symlinks and self.is_symlink():
return False
if self._is_dir is None:
self._is_dir = os.path.isdir(self._path)
return self._is_dir
def is_file(self, *, follow_symlinks=True):
"""Whether this path is a regular file."""
if not follow_symlinks and self.is_symlink():
return False
if self._is_file is None:
self._is_file = os.path.isfile(self._path)
return self._is_file
def is_symlink(self):
"""Whether this path is a symbolic link."""
if self._is_symlink is None:
self._is_symlink = os.path.islink(self._path)
return self._is_symlink
class ReadableLocalPath(pathlib.types._ReadablePath, LexicalPath):
"""
Simple implementation of a ReadablePath class for local filesystem paths.
"""
__slots__ = ('info',)
def __init__(self, *pathsegments):
super().__init__(*pathsegments)
self.info = LocalPathInfo(self)
def __fspath__(self):
return str(self)
def __open_rb__(self, buffering=-1):
return open(self, 'rb')
def iterdir(self):
return (self / name for name in os.listdir(self))
def readlink(self):
return self.with_segments(os.readlink(self))

View file

@ -0,0 +1,278 @@
"""
Implementation of ReadablePath for zip file members, for use in pathlib tests.
ZipPathGround is also defined here. It helps establish the "ground truth"
about zip file members in tests.
"""
import errno
import io
import pathlib.types
import posixpath
import stat
import zipfile
from stat import S_IFMT, S_ISDIR, S_ISREG, S_ISLNK
class ZipPathGround:
can_symlink = True
def __init__(self, path_cls):
self.path_cls = path_cls
def setup(self, local_suffix=""):
return self.path_cls(zip_file=zipfile.ZipFile(io.BytesIO(), "w"))
def teardown(self, root):
root.zip_file.close()
def create_file(self, path, data=b''):
path.zip_file.writestr(str(path), data)
def create_dir(self, path):
path.zip_file.mkdir(str(path))
def create_symlink(self, path, target):
zip_info = zipfile.ZipInfo(str(path))
zip_info.external_attr = stat.S_IFLNK << 16
path.zip_file.writestr(zip_info, target.encode())
def create_hierarchy(self, p):
# Add regular files
self.create_file(p.joinpath('fileA'), b'this is file A\n')
self.create_file(p.joinpath('dirB/fileB'), b'this is file B\n')
self.create_file(p.joinpath('dirC/fileC'), b'this is file C\n')
self.create_file(p.joinpath('dirC/dirD/fileD'), b'this is file D\n')
self.create_file(p.joinpath('dirC/novel.txt'), b'this is a novel\n')
# Add symlinks
self.create_symlink(p.joinpath('linkA'), 'fileA')
self.create_symlink(p.joinpath('linkB'), 'dirB')
self.create_symlink(p.joinpath('dirA/linkC'), '../dirB')
self.create_symlink(p.joinpath('brokenLink'), 'non-existing')
self.create_symlink(p.joinpath('brokenLinkLoop'), 'brokenLinkLoop')
def readtext(self, p):
with p.zip_file.open(str(p), 'r') as f:
f = io.TextIOWrapper(f)
return f.read()
def readbytes(self, p):
with p.zip_file.open(str(p), 'r') as f:
return f.read()
readlink = readtext
def isdir(self, p):
path_str = str(p) + "/"
return path_str in p.zip_file.NameToInfo
def isfile(self, p):
info = p.zip_file.NameToInfo.get(str(p))
if info is None:
return False
return not stat.S_ISLNK(info.external_attr >> 16)
def islink(self, p):
info = p.zip_file.NameToInfo.get(str(p))
if info is None:
return False
return stat.S_ISLNK(info.external_attr >> 16)
class MissingZipPathInfo:
"""
PathInfo implementation that is used when a zip file member is missing.
"""
__slots__ = ()
def exists(self, follow_symlinks=True):
return False
def is_dir(self, follow_symlinks=True):
return False
def is_file(self, follow_symlinks=True):
return False
def is_symlink(self):
return False
def resolve(self):
return self
missing_zip_path_info = MissingZipPathInfo()
class ZipPathInfo:
"""
PathInfo implementation for an existing zip file member.
"""
__slots__ = ('zip_file', 'zip_info', 'parent', 'children')
def __init__(self, zip_file, parent=None):
self.zip_file = zip_file
self.zip_info = None
self.parent = parent or self
self.children = {}
def exists(self, follow_symlinks=True):
if follow_symlinks and self.is_symlink():
return self.resolve().exists()
return True
def is_dir(self, follow_symlinks=True):
if follow_symlinks and self.is_symlink():
return self.resolve().is_dir()
elif self.zip_info is None:
return True
elif fmt := S_IFMT(self.zip_info.external_attr >> 16):
return S_ISDIR(fmt)
else:
return self.zip_info.filename.endswith('/')
def is_file(self, follow_symlinks=True):
if follow_symlinks and self.is_symlink():
return self.resolve().is_file()
elif self.zip_info is None:
return False
elif fmt := S_IFMT(self.zip_info.external_attr >> 16):
return S_ISREG(fmt)
else:
return not self.zip_info.filename.endswith('/')
def is_symlink(self):
if self.zip_info is None:
return False
elif fmt := S_IFMT(self.zip_info.external_attr >> 16):
return S_ISLNK(fmt)
else:
return False
def resolve(self, path=None, create=False, follow_symlinks=True):
"""
Traverse zip hierarchy (parents, children and symlinks) starting
from this PathInfo. This is called from three places:
- When a zip file member is added to ZipFile.filelist, this method
populates the ZipPathInfo tree (using create=True).
- When ReadableZipPath.info is accessed, this method is finds a
ZipPathInfo entry for the path without resolving any final symlink
(using follow_symlinks=False)
- When ZipPathInfo methods are called with follow_symlinks=True, this
method resolves any symlink in the final path position.
"""
link_count = 0
stack = path.split('/')[::-1] if path else []
info = self
while True:
if info.is_symlink() and (follow_symlinks or stack):
link_count += 1
if link_count >= 40:
return missing_zip_path_info # Symlink loop!
path = info.zip_file.read(info.zip_info).decode()
stack += path.split('/')[::-1] if path else []
info = info.parent
if stack:
name = stack.pop()
else:
return info
if name == '..':
info = info.parent
elif name and name != '.':
if name not in info.children:
if create:
info.children[name] = ZipPathInfo(info.zip_file, info)
else:
return missing_zip_path_info # No such child!
info = info.children[name]
class ZipFileList:
"""
`list`-like object that we inject as `ZipFile.filelist`. We maintain a
tree of `ZipPathInfo` objects representing the zip file members.
"""
__slots__ = ('tree', '_items')
def __init__(self, zip_file):
self.tree = ZipPathInfo(zip_file)
self._items = []
for item in zip_file.filelist:
self.append(item)
def __len__(self):
return len(self._items)
def __iter__(self):
return iter(self._items)
def append(self, item):
self._items.append(item)
self.tree.resolve(item.filename, create=True).zip_info = item
class ReadableZipPath(pathlib.types._ReadablePath):
"""
Simple implementation of a ReadablePath class for .zip files.
"""
__slots__ = ('_segments', 'zip_file')
parser = posixpath
def __init__(self, *pathsegments, zip_file):
self._segments = pathsegments
self.zip_file = zip_file
if not isinstance(zip_file.filelist, ZipFileList):
zip_file.filelist = ZipFileList(zip_file)
def __hash__(self):
return hash((str(self), self.zip_file))
def __eq__(self, other):
if not isinstance(other, ReadableZipPath):
return NotImplemented
return str(self) == str(other) and self.zip_file is other.zip_file
def __str__(self):
if not self._segments:
return ''
return self.parser.join(*self._segments)
def __repr__(self):
return f'{type(self).__name__}({str(self)!r}, zip_file={self.zip_file!r})'
def with_segments(self, *pathsegments):
return type(self)(*pathsegments, zip_file=self.zip_file)
@property
def info(self):
tree = self.zip_file.filelist.tree
return tree.resolve(str(self), follow_symlinks=False)
def __open_rb__(self, buffering=-1):
info = self.info.resolve()
if not info.exists():
raise FileNotFoundError(errno.ENOENT, "File not found", self)
elif info.is_dir():
raise IsADirectoryError(errno.EISDIR, "Is a directory", self)
return self.zip_file.open(info.zip_info, 'r')
def iterdir(self):
info = self.info.resolve()
if not info.exists():
raise FileNotFoundError(errno.ENOENT, "File not found", self)
elif not info.is_dir():
raise NotADirectoryError(errno.ENOTDIR, "Not a directory", self)
return (self / name for name in info.children)
def readlink(self):
info = self.info
if not info.exists():
raise FileNotFoundError(errno.ENOENT, "File not found", self)
elif not info.is_symlink():
raise OSError(errno.EINVAL, "Not a symlink", self)
return self.with_segments(self.zip_file.read(info.zip_info).decode())

View file

@ -2429,6 +2429,33 @@ class PathTest(test_pathlib_abc.RWPathTest, PurePathTest):
with self.assertRaises(pathlib.UnsupportedOperation):
q.symlink_to(p)
def test_info_exists_caching(self):
p = self.cls(self.base)
q = p / 'myfile'
self.assertFalse(q.info.exists())
self.assertFalse(q.info.exists(follow_symlinks=False))
q.write_text('hullo')
self.assertFalse(q.info.exists())
self.assertFalse(q.info.exists(follow_symlinks=False))
def test_info_is_dir_caching(self):
p = self.cls(self.base)
q = p / 'mydir'
self.assertFalse(q.info.is_dir())
self.assertFalse(q.info.is_dir(follow_symlinks=False))
q.mkdir()
self.assertFalse(q.info.is_dir())
self.assertFalse(q.info.is_dir(follow_symlinks=False))
def test_info_is_file_caching(self):
p = self.cls(self.base)
q = p / 'myfile'
self.assertFalse(q.info.is_file())
self.assertFalse(q.info.is_file(follow_symlinks=False))
q.write_text('hullo')
self.assertFalse(q.info.is_file())
self.assertFalse(q.info.is_file(follow_symlinks=False))
@needs_symlinks
def test_info_is_symlink_caching(self):
p = self.cls(self.base)

View file

@ -314,76 +314,6 @@ class ReadablePathTest(JoinablePathTest):
normcase = self.parser.normcase
self.assertEqual(normcase(path_a), normcase(path_b))
def test_is_readable(self):
p = self.cls(self.base)
self.assertIsInstance(p, _ReadablePath)
def test_magic_open(self):
p = self.cls(self.base)
with magic_open(p / 'fileA', 'r') as f:
self.assertIsInstance(f, io.TextIOBase)
self.assertEqual(f.read(), "this is file A\n")
with magic_open(p / 'fileA', 'rb') as f:
self.assertIsInstance(f, io.BufferedIOBase)
self.assertEqual(f.read().strip(), b"this is file A")
def test_iterdir(self):
P = self.cls
p = P(self.base)
it = p.iterdir()
paths = set(it)
expected = ['dirA', 'dirB', 'dirC', 'dirE', 'fileA']
if self.can_symlink:
expected += ['linkA', 'linkB', 'brokenLink', 'brokenLinkLoop']
self.assertEqual(paths, { P(self.base, q) for q in expected })
def test_iterdir_nodir(self):
# __iter__ on something that is not a directory.
p = self.cls(self.base, 'fileA')
with self.assertRaises(OSError) as cm:
p.iterdir()
# ENOENT or EINVAL under Windows, ENOTDIR otherwise
# (see issue #12802).
self.assertIn(cm.exception.errno, (errno.ENOTDIR,
errno.ENOENT, errno.EINVAL))
def test_iterdir_info(self):
p = self.cls(self.base)
for child in p.iterdir():
self.assertIsInstance(child.info, PathInfo)
self.assertTrue(child.info.exists(follow_symlinks=False))
def test_glob_common(self):
def _check(glob, expected):
self.assertEqual(set(glob), { P(self.base, q) for q in expected })
P = self.cls
p = P(self.base)
it = p.glob("fileA")
self.assertIsInstance(it, collections.abc.Iterator)
_check(it, ["fileA"])
_check(p.glob("fileB"), [])
_check(p.glob("dir*/file*"), ["dirB/fileB", "dirC/fileC"])
if not self.can_symlink:
_check(p.glob("*A"), ['dirA', 'fileA'])
else:
_check(p.glob("*A"), ['dirA', 'fileA', 'linkA'])
if not self.can_symlink:
_check(p.glob("*B/*"), ['dirB/fileB'])
else:
_check(p.glob("*B/*"), ['dirB/fileB', 'dirB/linkD',
'linkB/fileB', 'linkB/linkD'])
if not self.can_symlink:
_check(p.glob("*/fileB"), ['dirB/fileB'])
else:
_check(p.glob("*/fileB"), ['dirB/fileB', 'linkB/fileB'])
if self.can_symlink:
_check(p.glob("brokenLink"), ['brokenLink'])
if not self.can_symlink:
_check(p.glob("*/"), ["dirA/", "dirB/", "dirC/", "dirE/"])
else:
_check(p.glob("*/"), ["dirA/", "dirB/", "dirC/", "dirE/", "linkB/"])
@needs_posix
def test_glob_posix(self):
P = self.cls
@ -402,123 +332,6 @@ class ReadablePathTest(JoinablePathTest):
self.assertEqual(set(p.glob("*a\\")), { P(self.base, "dirA/") })
self.assertEqual(set(p.glob("F*a")), { P(self.base, "fileA") })
def test_glob_empty_pattern(self):
P = self.cls
p = P(self.base)
self.assertEqual(list(p.glob("")), [p.joinpath("")])
def test_info_exists(self):
p = self.cls(self.base)
self.assertTrue(p.info.exists())
self.assertTrue((p / 'dirA').info.exists())
self.assertTrue((p / 'dirA').info.exists(follow_symlinks=False))
self.assertTrue((p / 'fileA').info.exists())
self.assertTrue((p / 'fileA').info.exists(follow_symlinks=False))
self.assertFalse((p / 'non-existing').info.exists())
self.assertFalse((p / 'non-existing').info.exists(follow_symlinks=False))
if self.can_symlink:
self.assertTrue((p / 'linkA').info.exists())
self.assertTrue((p / 'linkA').info.exists(follow_symlinks=False))
self.assertTrue((p / 'linkB').info.exists())
self.assertTrue((p / 'linkB').info.exists(follow_symlinks=True))
self.assertFalse((p / 'brokenLink').info.exists())
self.assertTrue((p / 'brokenLink').info.exists(follow_symlinks=False))
self.assertFalse((p / 'brokenLinkLoop').info.exists())
self.assertTrue((p / 'brokenLinkLoop').info.exists(follow_symlinks=False))
self.assertFalse((p / 'fileA\udfff').info.exists())
self.assertFalse((p / 'fileA\udfff').info.exists(follow_symlinks=False))
self.assertFalse((p / 'fileA\x00').info.exists())
self.assertFalse((p / 'fileA\x00').info.exists(follow_symlinks=False))
def test_info_exists_caching(self):
p = self.cls(self.base)
q = p / 'myfile'
self.assertFalse(q.info.exists())
self.assertFalse(q.info.exists(follow_symlinks=False))
if isinstance(self.cls, _WritablePath):
q.write_text('hullo')
self.assertFalse(q.info.exists())
self.assertFalse(q.info.exists(follow_symlinks=False))
def test_info_is_dir(self):
p = self.cls(self.base)
self.assertTrue((p / 'dirA').info.is_dir())
self.assertTrue((p / 'dirA').info.is_dir(follow_symlinks=False))
self.assertFalse((p / 'fileA').info.is_dir())
self.assertFalse((p / 'fileA').info.is_dir(follow_symlinks=False))
self.assertFalse((p / 'non-existing').info.is_dir())
self.assertFalse((p / 'non-existing').info.is_dir(follow_symlinks=False))
if self.can_symlink:
self.assertFalse((p / 'linkA').info.is_dir())
self.assertFalse((p / 'linkA').info.is_dir(follow_symlinks=False))
self.assertTrue((p / 'linkB').info.is_dir())
self.assertFalse((p / 'linkB').info.is_dir(follow_symlinks=False))
self.assertFalse((p / 'brokenLink').info.is_dir())
self.assertFalse((p / 'brokenLink').info.is_dir(follow_symlinks=False))
self.assertFalse((p / 'brokenLinkLoop').info.is_dir())
self.assertFalse((p / 'brokenLinkLoop').info.is_dir(follow_symlinks=False))
self.assertFalse((p / 'dirA\udfff').info.is_dir())
self.assertFalse((p / 'dirA\udfff').info.is_dir(follow_symlinks=False))
self.assertFalse((p / 'dirA\x00').info.is_dir())
self.assertFalse((p / 'dirA\x00').info.is_dir(follow_symlinks=False))
def test_info_is_dir_caching(self):
p = self.cls(self.base)
q = p / 'mydir'
self.assertFalse(q.info.is_dir())
self.assertFalse(q.info.is_dir(follow_symlinks=False))
if isinstance(self.cls, _WritablePath):
q.mkdir()
self.assertFalse(q.info.is_dir())
self.assertFalse(q.info.is_dir(follow_symlinks=False))
def test_info_is_file(self):
p = self.cls(self.base)
self.assertTrue((p / 'fileA').info.is_file())
self.assertTrue((p / 'fileA').info.is_file(follow_symlinks=False))
self.assertFalse((p / 'dirA').info.is_file())
self.assertFalse((p / 'dirA').info.is_file(follow_symlinks=False))
self.assertFalse((p / 'non-existing').info.is_file())
self.assertFalse((p / 'non-existing').info.is_file(follow_symlinks=False))
if self.can_symlink:
self.assertTrue((p / 'linkA').info.is_file())
self.assertFalse((p / 'linkA').info.is_file(follow_symlinks=False))
self.assertFalse((p / 'linkB').info.is_file())
self.assertFalse((p / 'linkB').info.is_file(follow_symlinks=False))
self.assertFalse((p / 'brokenLink').info.is_file())
self.assertFalse((p / 'brokenLink').info.is_file(follow_symlinks=False))
self.assertFalse((p / 'brokenLinkLoop').info.is_file())
self.assertFalse((p / 'brokenLinkLoop').info.is_file(follow_symlinks=False))
self.assertFalse((p / 'fileA\udfff').info.is_file())
self.assertFalse((p / 'fileA\udfff').info.is_file(follow_symlinks=False))
self.assertFalse((p / 'fileA\x00').info.is_file())
self.assertFalse((p / 'fileA\x00').info.is_file(follow_symlinks=False))
def test_info_is_file_caching(self):
p = self.cls(self.base)
q = p / 'myfile'
self.assertFalse(q.info.is_file())
self.assertFalse(q.info.is_file(follow_symlinks=False))
if isinstance(self.cls, _WritablePath):
q.write_text('hullo')
self.assertFalse(q.info.is_file())
self.assertFalse(q.info.is_file(follow_symlinks=False))
def test_info_is_symlink(self):
p = self.cls(self.base)
self.assertFalse((p / 'fileA').info.is_symlink())
self.assertFalse((p / 'dirA').info.is_symlink())
self.assertFalse((p / 'non-existing').info.is_symlink())
if self.can_symlink:
self.assertTrue((p / 'linkA').info.is_symlink())
self.assertTrue((p / 'linkB').info.is_symlink())
self.assertTrue((p / 'brokenLink').info.is_symlink())
self.assertFalse((p / 'linkA\udfff').info.is_symlink())
self.assertFalse((p / 'linkA\x00').info.is_symlink())
self.assertTrue((p / 'brokenLinkLoop').info.is_symlink())
self.assertFalse((p / 'fileA\udfff').info.is_symlink())
self.assertFalse((p / 'fileA\x00').info.is_symlink())
class WritablePathTest(JoinablePathTest):
cls = DummyWritablePath
@ -553,21 +366,6 @@ class RWPathTest(WritablePathTest, ReadablePathTest):
self.assertRaises(TypeError, (p / 'fileA').write_text, b'somebytes')
self.assertEqual((p / 'fileA').read_text(encoding='latin-1'), 'äbcdefg')
def test_read_text_with_newlines(self):
p = self.cls(self.base)
# Check that `\n` character change nothing
(p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
self.assertEqual((p / 'fileA').read_text(newline='\n'),
'abcde\r\nfghlk\n\rmnopq')
# Check that `\r` character replaces `\n`
(p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
self.assertEqual((p / 'fileA').read_text(newline='\r'),
'abcde\r\nfghlk\n\rmnopq')
# Check that `\r\n` character replaces `\n`
(p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
self.assertEqual((p / 'fileA').read_text(newline='\r\n'),
'abcde\r\nfghlk\n\rmnopq')
def test_write_text_with_newlines(self):
p = self.cls(self.base)
# Check that `\n` character change nothing
@ -763,72 +561,6 @@ class ReadablePathWalkTest(unittest.TestCase):
cls._files.clear()
cls._directories.clear()
def test_walk_topdown(self):
walker = self.walk_path.walk()
entry = next(walker)
entry[1].sort() # Ensure we visit SUB1 before SUB2
self.assertEqual(entry, (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
entry = next(walker)
self.assertEqual(entry, (self.sub1_path, ["SUB11"], ["tmp2"]))
entry = next(walker)
self.assertEqual(entry, (self.sub11_path, [], []))
entry = next(walker)
entry[1].sort()
entry[2].sort()
self.assertEqual(entry, self.sub2_tree)
with self.assertRaises(StopIteration):
next(walker)
def test_walk_prune(self):
# Prune the search.
all = []
for root, dirs, files in self.walk_path.walk():
all.append((root, dirs, files))
if 'SUB1' in dirs:
# Note that this also mutates the dirs we appended to all!
dirs.remove('SUB1')
self.assertEqual(len(all), 2)
self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
all[1][-1].sort()
all[1][1].sort()
self.assertEqual(all[1], self.sub2_tree)
def test_walk_bottom_up(self):
seen_testfn = seen_sub1 = seen_sub11 = seen_sub2 = False
for path, dirnames, filenames in self.walk_path.walk(top_down=False):
if path == self.walk_path:
self.assertFalse(seen_testfn)
self.assertTrue(seen_sub1)
self.assertTrue(seen_sub2)
self.assertEqual(sorted(dirnames), ["SUB1", "SUB2"])
self.assertEqual(filenames, ["tmp1"])
seen_testfn = True
elif path == self.sub1_path:
self.assertFalse(seen_testfn)
self.assertFalse(seen_sub1)
self.assertTrue(seen_sub11)
self.assertEqual(dirnames, ["SUB11"])
self.assertEqual(filenames, ["tmp2"])
seen_sub1 = True
elif path == self.sub11_path:
self.assertFalse(seen_sub1)
self.assertFalse(seen_sub11)
self.assertEqual(dirnames, [])
self.assertEqual(filenames, [])
seen_sub11 = True
elif path == self.sub2_path:
self.assertFalse(seen_testfn)
self.assertFalse(seen_sub2)
self.assertEqual(sorted(dirnames), sorted(self.sub2_tree[1]))
self.assertEqual(sorted(filenames), sorted(self.sub2_tree[2]))
seen_sub2 = True
else:
raise AssertionError(f"Unexpected path: {path}")
self.assertTrue(seen_testfn)
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1,309 @@
"""
Tests for pathlib.types._ReadablePath
"""
import collections.abc
import io
import unittest
from pathlib import Path
from pathlib.types import PathInfo, _ReadablePath
from pathlib._os import magic_open
from test.test_pathlib.support.local_path import ReadableLocalPath, LocalPathGround
from test.test_pathlib.support.zip_path import ReadableZipPath, ZipPathGround
class ReadTestBase:
def setUp(self):
self.root = self.ground.setup()
self.ground.create_hierarchy(self.root)
def tearDown(self):
self.ground.teardown(self.root)
def test_is_readable(self):
self.assertIsInstance(self.root, _ReadablePath)
def test_open_r(self):
p = self.root / 'fileA'
with magic_open(p, 'r') as f:
self.assertIsInstance(f, io.TextIOBase)
self.assertEqual(f.read(), 'this is file A\n')
def test_open_rb(self):
p = self.root / 'fileA'
with magic_open(p, 'rb') as f:
self.assertEqual(f.read(), b'this is file A\n')
def test_read_bytes(self):
p = self.root / 'fileA'
self.assertEqual(p.read_bytes(), b'this is file A\n')
def test_read_text(self):
p = self.root / 'fileA'
self.assertEqual(p.read_text(), 'this is file A\n')
q = self.root / 'abc'
self.ground.create_file(q, b'\xe4bcdefg')
self.assertEqual(q.read_text(encoding='latin-1'), 'äbcdefg')
self.assertEqual(q.read_text(encoding='utf-8', errors='ignore'), 'bcdefg')
def test_read_text_with_newlines(self):
p = self.root / 'abc'
self.ground.create_file(p, b'abcde\r\nfghlk\n\rmnopq')
# Check that `\n` character change nothing
self.assertEqual(p.read_text(newline='\n'), 'abcde\r\nfghlk\n\rmnopq')
# Check that `\r` character replaces `\n`
self.assertEqual(p.read_text(newline='\r'), 'abcde\r\nfghlk\n\rmnopq')
# Check that `\r\n` character replaces `\n`
self.assertEqual(p.read_text(newline='\r\n'), 'abcde\r\nfghlk\n\rmnopq')
def test_iterdir(self):
expected = ['dirA', 'dirB', 'dirC', 'fileA']
if self.ground.can_symlink:
expected += ['linkA', 'linkB', 'brokenLink', 'brokenLinkLoop']
expected = {self.root.joinpath(name) for name in expected}
actual = set(self.root.iterdir())
self.assertEqual(actual, expected)
def test_iterdir_nodir(self):
p = self.root / 'fileA'
self.assertRaises(OSError, p.iterdir)
def test_iterdir_info(self):
for child in self.root.iterdir():
self.assertIsInstance(child.info, PathInfo)
self.assertTrue(child.info.exists(follow_symlinks=False))
def test_glob(self):
if not self.ground.can_symlink:
self.skipTest("requires symlinks")
p = self.root
sep = self.root.parser.sep
altsep = self.root.parser.altsep
def check(pattern, expected):
if altsep:
expected = {name.replace(altsep, sep) for name in expected}
expected = {p.joinpath(name) for name in expected}
actual = set(p.glob(pattern, recurse_symlinks=True))
self.assertEqual(actual, expected)
it = p.glob("fileA")
self.assertIsInstance(it, collections.abc.Iterator)
self.assertEqual(list(it), [p.joinpath("fileA")])
check("*A", ["dirA", "fileA", "linkA"])
check("*A", ['dirA', 'fileA', 'linkA'])
check("*B/*", ["dirB/fileB", "linkB/fileB"])
check("*B/*", ['dirB/fileB', 'linkB/fileB'])
check("brokenLink", ['brokenLink'])
check("brokenLinkLoop", ['brokenLinkLoop'])
check("**/", ["", "dirA/", "dirA/linkC/", "dirB/", "dirC/", "dirC/dirD/", "linkB/"])
check("**/*/", ["dirA/", "dirA/linkC/", "dirB/", "dirC/", "dirC/dirD/", "linkB/"])
check("*/", ["dirA/", "dirB/", "dirC/", "linkB/"])
check("*/dirD/**/", ["dirC/dirD/"])
check("*/dirD/**", ["dirC/dirD/", "dirC/dirD/fileD"])
check("dir*/**", ["dirA/", "dirA/linkC", "dirA/linkC/fileB", "dirB/", "dirB/fileB", "dirC/",
"dirC/fileC", "dirC/dirD", "dirC/dirD/fileD", "dirC/novel.txt"])
check("dir*/**/", ["dirA/", "dirA/linkC/", "dirB/", "dirC/", "dirC/dirD/"])
check("dir*/**/..", ["dirA/..", "dirA/linkC/..", "dirB/..", "dirC/..", "dirC/dirD/.."])
check("dir*/*/**", ["dirA/linkC/", "dirA/linkC/fileB", "dirC/dirD/", "dirC/dirD/fileD"])
check("dir*/*/**/", ["dirA/linkC/", "dirC/dirD/"])
check("dir*/*/**/..", ["dirA/linkC/..", "dirC/dirD/.."])
check("dir*/*/..", ["dirC/dirD/..", "dirA/linkC/.."])
check("dir*/*/../dirD/**/", ["dirC/dirD/../dirD/"])
check("dir*/**/fileC", ["dirC/fileC"])
check("dir*/file*", ["dirB/fileB", "dirC/fileC"])
check("**/*/fileA", [])
check("fileB", [])
check("**/*/fileB", ["dirB/fileB", "dirA/linkC/fileB", "linkB/fileB"])
check("**/fileB", ["dirB/fileB", "dirA/linkC/fileB", "linkB/fileB"])
check("*/fileB", ["dirB/fileB", "linkB/fileB"])
check("*/fileB", ['dirB/fileB', 'linkB/fileB'])
check("**/file*",
["fileA", "dirA/linkC/fileB", "dirB/fileB", "dirC/fileC", "dirC/dirD/fileD",
"linkB/fileB"])
def test_walk_top_down(self):
it = self.root.walk()
path, dirnames, filenames = next(it)
dirnames.sort()
filenames.sort()
self.assertEqual(path, self.root)
self.assertEqual(dirnames, ['dirA', 'dirB', 'dirC'])
self.assertEqual(filenames, ['brokenLink', 'brokenLinkLoop', 'fileA', 'linkA', 'linkB']
if self.ground.can_symlink else ['fileA'])
path, dirnames, filenames = next(it)
self.assertEqual(path, self.root / 'dirA')
self.assertEqual(dirnames, [])
self.assertEqual(filenames, ['linkC'] if self.ground.can_symlink else [])
path, dirnames, filenames = next(it)
self.assertEqual(path, self.root / 'dirB')
self.assertEqual(dirnames, [])
self.assertEqual(filenames, ['fileB'])
path, dirnames, filenames = next(it)
filenames.sort()
self.assertEqual(path, self.root / 'dirC')
self.assertEqual(dirnames, ['dirD'])
self.assertEqual(filenames, ['fileC', 'novel.txt'])
path, dirnames, filenames = next(it)
self.assertEqual(path, self.root / 'dirC' / 'dirD')
self.assertEqual(dirnames, [])
self.assertEqual(filenames, ['fileD'])
self.assertRaises(StopIteration, next, it)
def test_walk_prune(self):
expected = {self.root, self.root / 'dirA', self.root / 'dirC', self.root / 'dirC' / 'dirD'}
actual = set()
for path, dirnames, filenames in self.root.walk():
actual.add(path)
if path == self.root:
dirnames.remove('dirB')
self.assertEqual(actual, expected)
def test_walk_bottom_up(self):
seen_root = seen_dira = seen_dirb = seen_dirc = seen_dird = False
for path, dirnames, filenames in self.root.walk(top_down=False):
if path == self.root:
self.assertFalse(seen_root)
self.assertTrue(seen_dira)
self.assertTrue(seen_dirb)
self.assertTrue(seen_dirc)
self.assertEqual(sorted(dirnames), ['dirA', 'dirB', 'dirC'])
self.assertEqual(sorted(filenames),
['brokenLink', 'brokenLinkLoop', 'fileA', 'linkA', 'linkB']
if self.ground.can_symlink else ['fileA'])
seen_root = True
elif path == self.root / 'dirA':
self.assertFalse(seen_root)
self.assertFalse(seen_dira)
self.assertEqual(dirnames, [])
self.assertEqual(filenames, ['linkC'] if self.ground.can_symlink else [])
seen_dira = True
elif path == self.root / 'dirB':
self.assertFalse(seen_root)
self.assertFalse(seen_dirb)
self.assertEqual(dirnames, [])
self.assertEqual(filenames, ['fileB'])
seen_dirb = True
elif path == self.root / 'dirC':
self.assertFalse(seen_root)
self.assertFalse(seen_dirc)
self.assertTrue(seen_dird)
self.assertEqual(dirnames, ['dirD'])
self.assertEqual(sorted(filenames), ['fileC', 'novel.txt'])
seen_dirc = True
elif path == self.root / 'dirC' / 'dirD':
self.assertFalse(seen_root)
self.assertFalse(seen_dirc)
self.assertFalse(seen_dird)
self.assertEqual(dirnames, [])
self.assertEqual(filenames, ['fileD'])
seen_dird = True
else:
raise AssertionError(f"Unexpected path: {path}")
self.assertTrue(seen_root)
def test_info_exists(self):
p = self.root
self.assertTrue(p.info.exists())
self.assertTrue((p / 'dirA').info.exists())
self.assertTrue((p / 'dirA').info.exists(follow_symlinks=False))
self.assertTrue((p / 'fileA').info.exists())
self.assertTrue((p / 'fileA').info.exists(follow_symlinks=False))
self.assertFalse((p / 'non-existing').info.exists())
self.assertFalse((p / 'non-existing').info.exists(follow_symlinks=False))
if self.ground.can_symlink:
self.assertTrue((p / 'linkA').info.exists())
self.assertTrue((p / 'linkA').info.exists(follow_symlinks=False))
self.assertTrue((p / 'linkB').info.exists())
self.assertTrue((p / 'linkB').info.exists(follow_symlinks=True))
self.assertFalse((p / 'brokenLink').info.exists())
self.assertTrue((p / 'brokenLink').info.exists(follow_symlinks=False))
self.assertFalse((p / 'brokenLinkLoop').info.exists())
self.assertTrue((p / 'brokenLinkLoop').info.exists(follow_symlinks=False))
self.assertFalse((p / 'fileA\udfff').info.exists())
self.assertFalse((p / 'fileA\udfff').info.exists(follow_symlinks=False))
self.assertFalse((p / 'fileA\x00').info.exists())
self.assertFalse((p / 'fileA\x00').info.exists(follow_symlinks=False))
def test_info_is_dir(self):
p = self.root
self.assertTrue((p / 'dirA').info.is_dir())
self.assertTrue((p / 'dirA').info.is_dir(follow_symlinks=False))
self.assertFalse((p / 'fileA').info.is_dir())
self.assertFalse((p / 'fileA').info.is_dir(follow_symlinks=False))
self.assertFalse((p / 'non-existing').info.is_dir())
self.assertFalse((p / 'non-existing').info.is_dir(follow_symlinks=False))
if self.ground.can_symlink:
self.assertFalse((p / 'linkA').info.is_dir())
self.assertFalse((p / 'linkA').info.is_dir(follow_symlinks=False))
self.assertTrue((p / 'linkB').info.is_dir())
self.assertFalse((p / 'linkB').info.is_dir(follow_symlinks=False))
self.assertFalse((p / 'brokenLink').info.is_dir())
self.assertFalse((p / 'brokenLink').info.is_dir(follow_symlinks=False))
self.assertFalse((p / 'brokenLinkLoop').info.is_dir())
self.assertFalse((p / 'brokenLinkLoop').info.is_dir(follow_symlinks=False))
self.assertFalse((p / 'dirA\udfff').info.is_dir())
self.assertFalse((p / 'dirA\udfff').info.is_dir(follow_symlinks=False))
self.assertFalse((p / 'dirA\x00').info.is_dir())
self.assertFalse((p / 'dirA\x00').info.is_dir(follow_symlinks=False))
def test_info_is_file(self):
p = self.root
self.assertTrue((p / 'fileA').info.is_file())
self.assertTrue((p / 'fileA').info.is_file(follow_symlinks=False))
self.assertFalse((p / 'dirA').info.is_file())
self.assertFalse((p / 'dirA').info.is_file(follow_symlinks=False))
self.assertFalse((p / 'non-existing').info.is_file())
self.assertFalse((p / 'non-existing').info.is_file(follow_symlinks=False))
if self.ground.can_symlink:
self.assertTrue((p / 'linkA').info.is_file())
self.assertFalse((p / 'linkA').info.is_file(follow_symlinks=False))
self.assertFalse((p / 'linkB').info.is_file())
self.assertFalse((p / 'linkB').info.is_file(follow_symlinks=False))
self.assertFalse((p / 'brokenLink').info.is_file())
self.assertFalse((p / 'brokenLink').info.is_file(follow_symlinks=False))
self.assertFalse((p / 'brokenLinkLoop').info.is_file())
self.assertFalse((p / 'brokenLinkLoop').info.is_file(follow_symlinks=False))
self.assertFalse((p / 'fileA\udfff').info.is_file())
self.assertFalse((p / 'fileA\udfff').info.is_file(follow_symlinks=False))
self.assertFalse((p / 'fileA\x00').info.is_file())
self.assertFalse((p / 'fileA\x00').info.is_file(follow_symlinks=False))
def test_info_is_symlink(self):
p = self.root
self.assertFalse((p / 'fileA').info.is_symlink())
self.assertFalse((p / 'dirA').info.is_symlink())
self.assertFalse((p / 'non-existing').info.is_symlink())
if self.ground.can_symlink:
self.assertTrue((p / 'linkA').info.is_symlink())
self.assertTrue((p / 'linkB').info.is_symlink())
self.assertTrue((p / 'brokenLink').info.is_symlink())
self.assertFalse((p / 'linkA\udfff').info.is_symlink())
self.assertFalse((p / 'linkA\x00').info.is_symlink())
self.assertTrue((p / 'brokenLinkLoop').info.is_symlink())
self.assertFalse((p / 'fileA\udfff').info.is_symlink())
self.assertFalse((p / 'fileA\x00').info.is_symlink())
class ZipPathReadTest(ReadTestBase, unittest.TestCase):
ground = ZipPathGround(ReadableZipPath)
class LocalPathReadTest(ReadTestBase, unittest.TestCase):
ground = LocalPathGround(ReadableLocalPath)
class PathReadTest(ReadTestBase, unittest.TestCase):
ground = LocalPathGround(Path)
if __name__ == "__main__":
unittest.main()