mirror of
https://github.com/python/cpython.git
synced 2025-08-03 16:39:00 +00:00
expose linux extended file system attributes (closes #12720)
This commit is contained in:
parent
ab3bea6815
commit
799bd80d8a
6 changed files with 649 additions and 2 deletions
|
@ -14,6 +14,8 @@ import shutil
|
|||
from test import support
|
||||
import contextlib
|
||||
import mmap
|
||||
import platform
|
||||
import re
|
||||
import uuid
|
||||
import asyncore
|
||||
import asynchat
|
||||
|
@ -1506,6 +1508,97 @@ class TestSendfile(unittest.TestCase):
|
|||
raise
|
||||
|
||||
|
||||
def supports_extended_attributes():
|
||||
if not hasattr(os, "setxattr"):
|
||||
return False
|
||||
try:
|
||||
with open(support.TESTFN, "wb") as fp:
|
||||
try:
|
||||
os.fsetxattr(fp.fileno(), b"user.test", b"")
|
||||
except OSError as e:
|
||||
if e.errno != errno.ENOTSUP:
|
||||
raise
|
||||
return False
|
||||
finally:
|
||||
support.unlink(support.TESTFN)
|
||||
# Kernels < 2.6.39 don't respect setxattr flags.
|
||||
kernel_version = platform.release()
|
||||
m = re.match("2.6.(\d{1,2})", kernel_version)
|
||||
return m is None or int(m.group(1)) >= 39
|
||||
|
||||
|
||||
@unittest.skipUnless(supports_extended_attributes(),
|
||||
"no non-broken extended attribute support")
|
||||
class ExtendedAttributeTests(unittest.TestCase):
|
||||
|
||||
def tearDown(self):
|
||||
support.unlink(support.TESTFN)
|
||||
|
||||
def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
|
||||
fn = support.TESTFN
|
||||
open(fn, "wb").close()
|
||||
with self.assertRaises(OSError) as cm:
|
||||
getxattr(fn, s("user.test"))
|
||||
self.assertEqual(cm.exception.errno, errno.ENODATA)
|
||||
self.assertEqual(listxattr(fn), [])
|
||||
setxattr(fn, s("user.test"), b"")
|
||||
self.assertEqual(listxattr(fn), ["user.test"])
|
||||
self.assertEqual(getxattr(fn, b"user.test"), b"")
|
||||
setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE)
|
||||
self.assertEqual(getxattr(fn, b"user.test"), b"hello")
|
||||
with self.assertRaises(OSError) as cm:
|
||||
setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE)
|
||||
self.assertEqual(cm.exception.errno, errno.EEXIST)
|
||||
with self.assertRaises(OSError) as cm:
|
||||
setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE)
|
||||
self.assertEqual(cm.exception.errno, errno.ENODATA)
|
||||
setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE)
|
||||
self.assertEqual(sorted(listxattr(fn)), ["user.test", "user.test2"])
|
||||
removexattr(fn, s("user.test"))
|
||||
with self.assertRaises(OSError) as cm:
|
||||
getxattr(fn, s("user.test"))
|
||||
self.assertEqual(cm.exception.errno, errno.ENODATA)
|
||||
self.assertEqual(listxattr(fn), ["user.test2"])
|
||||
self.assertEqual(getxattr(fn, s("user.test2")), b"foo")
|
||||
setxattr(fn, s("user.test"), b"a"*1024)
|
||||
self.assertEqual(getxattr(fn, s("user.test")), b"a"*1024)
|
||||
removexattr(fn, s("user.test"))
|
||||
many = sorted("user.test{}".format(i) for i in range(100))
|
||||
for thing in many:
|
||||
setxattr(fn, thing, b"x")
|
||||
self.assertEqual(sorted(listxattr(fn)), many)
|
||||
|
||||
def _check_xattrs(self, *args):
|
||||
def make_bytes(s):
|
||||
return bytes(s, "ascii")
|
||||
self._check_xattrs_str(str, *args)
|
||||
support.unlink(support.TESTFN)
|
||||
self._check_xattrs_str(make_bytes, *args)
|
||||
|
||||
def test_simple(self):
|
||||
self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
|
||||
os.listxattr)
|
||||
|
||||
def test_lpath(self):
|
||||
self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
|
||||
os.llistxattr)
|
||||
|
||||
def test_fds(self):
|
||||
def getxattr(path, *args):
|
||||
with open(path, "rb") as fp:
|
||||
return os.fgetxattr(fp.fileno(), *args)
|
||||
def setxattr(path, *args):
|
||||
with open(path, "wb") as fp:
|
||||
os.fsetxattr(fp.fileno(), *args)
|
||||
def removexattr(path, *args):
|
||||
with open(path, "wb") as fp:
|
||||
os.fremovexattr(fp.fileno(), *args)
|
||||
def listxattr(path, *args):
|
||||
with open(path, "rb") as fp:
|
||||
return os.flistxattr(fp.fileno(), *args)
|
||||
self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
|
||||
|
||||
|
||||
@support.reap_threads
|
||||
def test_main():
|
||||
support.run_unittest(
|
||||
|
@ -1529,6 +1622,7 @@ def test_main():
|
|||
LinkTests,
|
||||
TestSendfile,
|
||||
ProgramPriorityTests,
|
||||
ExtendedAttributeTests,
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue