Issue #14626: Large refactoring of functions / parameters in the os module.

Many functions now support "dir_fd" and "follow_symlinks" parameters;
some also support accepting an open file descriptor in place of of a path
string.  Added os.support_* collections as LBYL helpers.  Removed many
functions only previously seen in 3.3 alpha releases (often starting with
"f" or "l", or ending with "at").  Originally suggested by Serhiy Storchaka;
implemented by Larry Hastings.
This commit is contained in:
Larry Hastings 2012-06-22 16:30:09 -07:00
parent f0f4742b49
commit 9cf065cfdc
9 changed files with 3413 additions and 3041 deletions

View file

@ -345,40 +345,36 @@ class StatAttributeTests(unittest.TestCase):
return os.utime(file, ns=times)
self._test_utime_ns(utime_ns)
requires_lutimes = unittest.skipUnless(hasattr(os, 'lutimes'),
"os.lutimes required for this test.")
requires_futimes = unittest.skipUnless(hasattr(os, 'futimes'),
"os.futimes required for this test.")
requires_utime_dir_fd = unittest.skipUnless(
os.utime in os.supports_dir_fd,
"dir_fd support for utime required for this test.")
requires_utime_fd = unittest.skipUnless(
os.utime in os.supports_fd,
"fd support for utime required for this test.")
requires_utime_nofollow_symlinks = unittest.skipUnless(
os.utime in os.supports_follow_symlinks,
"follow_symlinks support for utime required for this test.")
@requires_lutimes
@requires_utime_nofollow_symlinks
def test_lutimes_ns(self):
def lutimes_ns(file, times):
return os.lutimes(file, ns=times)
return os.utime(file, ns=times, follow_symlinks=False)
self._test_utime_ns(lutimes_ns)
@requires_futimes
@requires_utime_fd
def test_futimes_ns(self):
def futimes_ns(file, times):
with open(file, "wb") as f:
os.futimes(f.fileno(), ns=times)
os.utime(f.fileno(), ns=times)
self._test_utime_ns(futimes_ns, test_dir=False)
def _utime_invalid_arguments(self, name, arg):
with self.assertRaises(RuntimeError):
with self.assertRaises(ValueError):
getattr(os, name)(arg, (5, 5), ns=(5, 5))
def test_utime_invalid_arguments(self):
self._utime_invalid_arguments('utime', self.fname)
@requires_lutimes
def test_lutimes_invalid_arguments(self):
self._utime_invalid_arguments('lutimes', self.fname)
@requires_futimes
def test_futimes_invalid_arguments(self):
with open(self.fname, "wb") as f:
self._utime_invalid_arguments('futimes', f.fileno())
@unittest.skipUnless(stat_supports_subsecond,
"os.stat() doesn't has a subsecond resolution")
@ -402,64 +398,46 @@ class StatAttributeTests(unittest.TestCase):
os.utime(filename, (atime, mtime))
self._test_utime_subsecond(set_time)
@requires_futimes
@requires_utime_fd
def test_futimes_subsecond(self):
def set_time(filename, atime, mtime):
with open(filename, "wb") as f:
os.futimes(f.fileno(), (atime, mtime))
os.utime(f.fileno(), times=(atime, mtime))
self._test_utime_subsecond(set_time)
@unittest.skipUnless(hasattr(os, 'futimens'),
"os.futimens required for this test.")
@requires_utime_fd
def test_futimens_subsecond(self):
def set_time(filename, atime, mtime):
with open(filename, "wb") as f:
asec, ansec = divmod(atime, 1.0)
asec = int(asec)
ansec = int(ansec * 1e9)
msec, mnsec = divmod(mtime, 1.0)
msec = int(msec)
mnsec = int(mnsec * 1e9)
os.futimens(f.fileno(),
(asec, ansec),
(msec, mnsec))
os.utime(f.fileno(), times=(atime, mtime))
self._test_utime_subsecond(set_time)
@unittest.skipUnless(hasattr(os, 'futimesat'),
"os.futimesat required for this test.")
@requires_utime_dir_fd
def test_futimesat_subsecond(self):
def set_time(filename, atime, mtime):
dirname = os.path.dirname(filename)
dirfd = os.open(dirname, os.O_RDONLY)
try:
os.futimesat(dirfd, os.path.basename(filename),
(atime, mtime))
os.utime(os.path.basename(filename), dir_fd=dirfd,
times=(atime, mtime))
finally:
os.close(dirfd)
self._test_utime_subsecond(set_time)
@requires_lutimes
@requires_utime_nofollow_symlinks
def test_lutimes_subsecond(self):
def set_time(filename, atime, mtime):
os.lutimes(filename, (atime, mtime))
os.utime(filename, (atime, mtime), follow_symlinks=False)
self._test_utime_subsecond(set_time)
@unittest.skipUnless(hasattr(os, 'utimensat'),
"os.utimensat required for this test.")
@requires_utime_dir_fd
def test_utimensat_subsecond(self):
def set_time(filename, atime, mtime):
dirname = os.path.dirname(filename)
dirfd = os.open(dirname, os.O_RDONLY)
try:
asec, ansec = divmod(atime, 1.0)
asec = int(asec)
ansec = int(ansec * 1e9)
msec, mnsec = divmod(mtime, 1.0)
msec = int(msec)
mnsec = int(mnsec * 1e9)
os.utimensat(dirfd, os.path.basename(filename),
(asec, ansec),
(msec, mnsec))
os.utime(os.path.basename(filename), dir_fd=dirfd,
times=(atime, mtime))
finally:
os.close(dirfd)
self._test_utime_subsecond(set_time)
@ -782,8 +760,10 @@ class FwalkTests(WalkTests):
for root, dirs, files, rootfd in os.fwalk(*args):
# check that the FD is valid
os.fstat(rootfd)
# check that flistdir() returns consistent information
self.assertEqual(set(os.flistdir(rootfd)), set(dirs) | set(files))
# redundant check
os.stat(rootfd)
# check that listdir() returns consistent information
self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
def test_fd_leak(self):
# Since we're opening a lot of FDs, we must be careful to avoid leaks:
@ -802,13 +782,10 @@ class FwalkTests(WalkTests):
# cleanup
for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
for name in files:
os.unlinkat(rootfd, name)
os.unlink(name, dir_fd=rootfd)
for name in dirs:
st = os.fstatat(rootfd, name, os.AT_SYMLINK_NOFOLLOW)
if stat.S_ISDIR(st.st_mode):
os.unlinkat(rootfd, name, os.AT_REMOVEDIR)
else:
os.unlinkat(rootfd, name)
st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
os.unlink(name, dir_fd=rootfd, rmdir=stat.S_ISDIR(st.st_mode))
os.rmdir(support.TESTFN)
@ -1262,6 +1239,13 @@ if sys.platform != 'win32':
expected = self.unicodefn
found = set(os.listdir(self.dir))
self.assertEqual(found, expected)
# test listdir without arguments
current_directory = os.getcwd()
try:
os.chdir(os.sep)
self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
finally:
os.chdir(current_directory)
def test_open(self):
for fn in self.unicodefn:
@ -1846,79 +1830,97 @@ class TestSendfile(unittest.TestCase):
raise
@support.skip_unless_xattr
def supports_extended_attributes():
if not hasattr(os, "setxattr"):
return False
try:
with open(support.TESTFN, "wb") as fp:
try:
os.setxattr(fp.fileno(), b"user.test", b"")
except OSError:
return False
finally:
support.unlink(support.TESTFN)
# Kernels < 2.6.39 don't respect setxattr flags.
kernel_version = platform.release()
m = re.match("2.6.(\d{1,2})", kernel_version)
return m is None or int(m.group(1)) >= 39
@unittest.skipUnless(supports_extended_attributes(),
"no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
def tearDown(self):
support.unlink(support.TESTFN)
def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
fn = support.TESTFN
open(fn, "wb").close()
with self.assertRaises(OSError) as cm:
getxattr(fn, s("user.test"))
getxattr(fn, s("user.test"), **kwargs)
self.assertEqual(cm.exception.errno, errno.ENODATA)
init_xattr = listxattr(fn)
self.assertIsInstance(init_xattr, list)
setxattr(fn, s("user.test"), b"")
setxattr(fn, s("user.test"), b"", **kwargs)
xattr = set(init_xattr)
xattr.add("user.test")
self.assertEqual(set(listxattr(fn)), xattr)
self.assertEqual(getxattr(fn, b"user.test"), b"")
setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE)
self.assertEqual(getxattr(fn, b"user.test"), b"hello")
self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
with self.assertRaises(OSError) as cm:
setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE)
setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
self.assertEqual(cm.exception.errno, errno.EEXIST)
with self.assertRaises(OSError) as cm:
setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE)
setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
self.assertEqual(cm.exception.errno, errno.ENODATA)
setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE)
setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
xattr.add("user.test2")
self.assertEqual(set(listxattr(fn)), xattr)
removexattr(fn, s("user.test"))
removexattr(fn, s("user.test"), **kwargs)
with self.assertRaises(OSError) as cm:
getxattr(fn, s("user.test"))
getxattr(fn, s("user.test"), **kwargs)
self.assertEqual(cm.exception.errno, errno.ENODATA)
xattr.remove("user.test")
self.assertEqual(set(listxattr(fn)), xattr)
self.assertEqual(getxattr(fn, s("user.test2")), b"foo")
setxattr(fn, s("user.test"), b"a"*1024)
self.assertEqual(getxattr(fn, s("user.test")), b"a"*1024)
removexattr(fn, s("user.test"))
self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
removexattr(fn, s("user.test"), **kwargs)
many = sorted("user.test{}".format(i) for i in range(100))
for thing in many:
setxattr(fn, thing, b"x")
setxattr(fn, thing, b"x", **kwargs)
self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
def _check_xattrs(self, *args):
def _check_xattrs(self, *args, **kwargs):
def make_bytes(s):
return bytes(s, "ascii")
self._check_xattrs_str(str, *args)
self._check_xattrs_str(str, *args, **kwargs)
support.unlink(support.TESTFN)
self._check_xattrs_str(make_bytes, *args)
self._check_xattrs_str(make_bytes, *args, **kwargs)
def test_simple(self):
self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
os.listxattr)
def test_lpath(self):
self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
os.llistxattr)
self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
os.listxattr, follow_symlinks=False)
def test_fds(self):
def getxattr(path, *args):
with open(path, "rb") as fp:
return os.fgetxattr(fp.fileno(), *args)
return os.getxattr(fp.fileno(), *args)
def setxattr(path, *args):
with open(path, "wb") as fp:
os.fsetxattr(fp.fileno(), *args)
os.setxattr(fp.fileno(), *args)
def removexattr(path, *args):
with open(path, "wb") as fp:
os.fremovexattr(fp.fileno(), *args)
os.removexattr(fp.fileno(), *args)
def listxattr(path, *args):
with open(path, "rb") as fp:
return os.flistxattr(fp.fileno(), *args)
return os.listxattr(fp.fileno(), *args)
self._check_xattrs(getxattr, setxattr, removexattr, listxattr)