mirror of
https://github.com/python/cpython.git
synced 2025-08-31 14:07:50 +00:00
Issue #22524: New os.scandir() function, part of the PEP 471: "os.scandir()
function -- a better and faster directory iterator". Patch written by Ben Hoyt.
This commit is contained in:
parent
adb351fcea
commit
6036e4431d
5 changed files with 1222 additions and 20 deletions
|
@ -2698,5 +2698,229 @@ class ExportsTests(unittest.TestCase):
|
|||
self.assertIn('walk', os.__all__)
|
||||
|
||||
|
||||
class TestScandir(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.path = os.path.realpath(support.TESTFN)
|
||||
self.addCleanup(support.rmtree, self.path)
|
||||
os.mkdir(self.path)
|
||||
|
||||
def create_file(self, name="file.txt"):
|
||||
filename = os.path.join(self.path, name)
|
||||
with open(filename, "wb") as fp:
|
||||
fp.write(b'python')
|
||||
return filename
|
||||
|
||||
def get_entries(self, names):
|
||||
entries = dict((entry.name, entry)
|
||||
for entry in os.scandir(self.path))
|
||||
self.assertEqual(sorted(entries.keys()), names)
|
||||
return entries
|
||||
|
||||
def assert_stat_equal(self, stat1, stat2, skip_fields):
|
||||
if skip_fields:
|
||||
for attr in dir(stat1):
|
||||
if not attr.startswith("st_"):
|
||||
continue
|
||||
if attr in ("st_dev", "st_ino", "st_nlink"):
|
||||
continue
|
||||
self.assertEqual(getattr(stat1, attr),
|
||||
getattr(stat2, attr),
|
||||
(stat1, stat2, attr))
|
||||
else:
|
||||
self.assertEqual(stat1, stat2)
|
||||
|
||||
def check_entry(self, entry, name, is_dir, is_file, is_symlink):
|
||||
self.assertEqual(entry.name, name)
|
||||
self.assertEqual(entry.path, os.path.join(self.path, name))
|
||||
self.assertEqual(entry.inode(),
|
||||
os.stat(entry.path, follow_symlinks=False).st_ino)
|
||||
|
||||
entry_stat = os.stat(entry.path)
|
||||
self.assertEqual(entry.is_dir(),
|
||||
stat.S_ISDIR(entry_stat.st_mode))
|
||||
self.assertEqual(entry.is_file(),
|
||||
stat.S_ISREG(entry_stat.st_mode))
|
||||
self.assertEqual(entry.is_symlink(),
|
||||
os.path.islink(entry.path))
|
||||
|
||||
entry_lstat = os.stat(entry.path, follow_symlinks=False)
|
||||
self.assertEqual(entry.is_dir(follow_symlinks=False),
|
||||
stat.S_ISDIR(entry_lstat.st_mode))
|
||||
self.assertEqual(entry.is_file(follow_symlinks=False),
|
||||
stat.S_ISREG(entry_lstat.st_mode))
|
||||
|
||||
self.assert_stat_equal(entry.stat(),
|
||||
entry_stat,
|
||||
os.name == 'nt' and not is_symlink)
|
||||
self.assert_stat_equal(entry.stat(follow_symlinks=False),
|
||||
entry_lstat,
|
||||
os.name == 'nt')
|
||||
|
||||
def test_attributes(self):
|
||||
link = hasattr(os, 'link')
|
||||
symlink = support.can_symlink()
|
||||
|
||||
dirname = os.path.join(self.path, "dir")
|
||||
os.mkdir(dirname)
|
||||
filename = self.create_file("file.txt")
|
||||
if link:
|
||||
os.link(filename, os.path.join(self.path, "link_file.txt"))
|
||||
if symlink:
|
||||
os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
|
||||
target_is_directory=True)
|
||||
os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
|
||||
|
||||
names = ['dir', 'file.txt']
|
||||
if link:
|
||||
names.append('link_file.txt')
|
||||
if symlink:
|
||||
names.extend(('symlink_dir', 'symlink_file.txt'))
|
||||
entries = self.get_entries(names)
|
||||
|
||||
entry = entries['dir']
|
||||
self.check_entry(entry, 'dir', True, False, False)
|
||||
|
||||
entry = entries['file.txt']
|
||||
self.check_entry(entry, 'file.txt', False, True, False)
|
||||
|
||||
if link:
|
||||
entry = entries['link_file.txt']
|
||||
self.check_entry(entry, 'link_file.txt', False, True, False)
|
||||
|
||||
if symlink:
|
||||
entry = entries['symlink_dir']
|
||||
self.check_entry(entry, 'symlink_dir', True, False, True)
|
||||
|
||||
entry = entries['symlink_file.txt']
|
||||
self.check_entry(entry, 'symlink_file.txt', False, True, True)
|
||||
|
||||
def get_entry(self, name):
|
||||
entries = list(os.scandir(self.path))
|
||||
self.assertEqual(len(entries), 1)
|
||||
|
||||
entry = entries[0]
|
||||
self.assertEqual(entry.name, name)
|
||||
return entry
|
||||
|
||||
def create_file_entry(self):
|
||||
filename = self.create_file()
|
||||
return self.get_entry(os.path.basename(filename))
|
||||
|
||||
def test_current_directory(self):
|
||||
filename = self.create_file()
|
||||
old_dir = os.getcwd()
|
||||
try:
|
||||
os.chdir(self.path)
|
||||
|
||||
# call scandir() without parameter: it must list the content
|
||||
# of the current directory
|
||||
entries = dict((entry.name, entry) for entry in os.scandir())
|
||||
self.assertEqual(sorted(entries.keys()),
|
||||
[os.path.basename(filename)])
|
||||
finally:
|
||||
os.chdir(old_dir)
|
||||
|
||||
def test_repr(self):
|
||||
entry = self.create_file_entry()
|
||||
self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
|
||||
|
||||
def test_removed_dir(self):
|
||||
path = os.path.join(self.path, 'dir')
|
||||
|
||||
os.mkdir(path)
|
||||
entry = self.get_entry('dir')
|
||||
os.rmdir(path)
|
||||
|
||||
# On POSIX, is_dir() result depends if scandir() filled d_type or not
|
||||
if os.name == 'nt':
|
||||
self.assertTrue(entry.is_dir())
|
||||
self.assertFalse(entry.is_file())
|
||||
self.assertFalse(entry.is_symlink())
|
||||
if os.name == 'nt':
|
||||
self.assertRaises(FileNotFoundError, entry.inode)
|
||||
# don't fail
|
||||
entry.stat()
|
||||
entry.stat(follow_symlinks=False)
|
||||
else:
|
||||
self.assertGreater(entry.inode(), 0)
|
||||
self.assertRaises(FileNotFoundError, entry.stat)
|
||||
self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
|
||||
|
||||
def test_removed_file(self):
|
||||
entry = self.create_file_entry()
|
||||
os.unlink(entry.path)
|
||||
|
||||
self.assertFalse(entry.is_dir())
|
||||
# On POSIX, is_dir() result depends if scandir() filled d_type or not
|
||||
if os.name == 'nt':
|
||||
self.assertTrue(entry.is_file())
|
||||
self.assertFalse(entry.is_symlink())
|
||||
if os.name == 'nt':
|
||||
self.assertRaises(FileNotFoundError, entry.inode)
|
||||
# don't fail
|
||||
entry.stat()
|
||||
entry.stat(follow_symlinks=False)
|
||||
else:
|
||||
self.assertGreater(entry.inode(), 0)
|
||||
self.assertRaises(FileNotFoundError, entry.stat)
|
||||
self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
|
||||
|
||||
def test_broken_symlink(self):
|
||||
if not support.can_symlink():
|
||||
return self.skipTest('cannot create symbolic link')
|
||||
|
||||
filename = self.create_file("file.txt")
|
||||
os.symlink(filename,
|
||||
os.path.join(self.path, "symlink.txt"))
|
||||
entries = self.get_entries(['file.txt', 'symlink.txt'])
|
||||
entry = entries['symlink.txt']
|
||||
os.unlink(filename)
|
||||
|
||||
self.assertGreater(entry.inode(), 0)
|
||||
self.assertFalse(entry.is_dir())
|
||||
self.assertFalse(entry.is_file()) # broken symlink returns False
|
||||
self.assertFalse(entry.is_dir(follow_symlinks=False))
|
||||
self.assertFalse(entry.is_file(follow_symlinks=False))
|
||||
self.assertTrue(entry.is_symlink())
|
||||
self.assertRaises(FileNotFoundError, entry.stat)
|
||||
# don't fail
|
||||
entry.stat(follow_symlinks=False)
|
||||
|
||||
def test_bytes(self):
|
||||
if os.name == "nt":
|
||||
# On Windows, os.scandir(bytes) must raise an exception
|
||||
self.assertRaises(TypeError, os.scandir, b'.')
|
||||
return
|
||||
|
||||
self.create_file("file.txt")
|
||||
|
||||
path_bytes = os.fsencode(self.path)
|
||||
entries = list(os.scandir(path_bytes))
|
||||
self.assertEqual(len(entries), 1, entries)
|
||||
entry = entries[0]
|
||||
|
||||
self.assertEqual(entry.name, b'file.txt')
|
||||
self.assertEqual(entry.path,
|
||||
os.fsencode(os.path.join(self.path, 'file.txt')))
|
||||
|
||||
def test_empty_path(self):
|
||||
self.assertRaises(FileNotFoundError, os.scandir, '')
|
||||
|
||||
def test_consume_iterator_twice(self):
|
||||
self.create_file("file.txt")
|
||||
iterator = os.scandir(self.path)
|
||||
|
||||
entries = list(iterator)
|
||||
self.assertEqual(len(entries), 1, entries)
|
||||
|
||||
# check than consuming the iterator twice doesn't raise exception
|
||||
entries2 = list(iterator)
|
||||
self.assertEqual(len(entries2), 0, entries2)
|
||||
|
||||
def test_bad_path_type(self):
|
||||
for obj in [1234, 1.234, {}, []]:
|
||||
self.assertRaises(TypeError, os.scandir, obj)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue