Enhance and modernize test_os

* add create_file() helper function
* create files using "x" mode instead of "w" to detect when a previous test
  forget to remove a file
* open file for writing in unbuferred mode (buffering=0)
* replace "try/finally: unlink" with self.addCleanup(support.unlink)
* register unlink cleanup function *before* creating new files
This commit is contained in:
Victor Stinner 2016-03-24 17:12:55 +01:00
parent f95a19b900
commit ae39d236b4

View file

@ -99,6 +99,11 @@ def bytes_filename_warn(expected):
yield yield
def create_file(filename, content=b'content'):
with open(filename, "xb", 0) as fp:
fp.write(content)
# Tests creating TESTFN # Tests creating TESTFN
class FileTests(unittest.TestCase): class FileTests(unittest.TestCase):
def setUp(self): def setUp(self):
@ -157,9 +162,8 @@ class FileTests(unittest.TestCase):
"needs INT_MAX < PY_SSIZE_T_MAX") "needs INT_MAX < PY_SSIZE_T_MAX")
@support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False) @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
def test_large_read(self, size): def test_large_read(self, size):
with open(support.TESTFN, "wb") as fp:
fp.write(b'test')
self.addCleanup(support.unlink, support.TESTFN) self.addCleanup(support.unlink, support.TESTFN)
create_file(support.TESTFN, b'test')
# Issue #21932: Make sure that os.read() does not raise an # Issue #21932: Make sure that os.read() does not raise an
# OverflowError for size larger than INT_MAX # OverflowError for size larger than INT_MAX
@ -216,11 +220,12 @@ class FileTests(unittest.TestCase):
def test_replace(self): def test_replace(self):
TESTFN2 = support.TESTFN + ".2" TESTFN2 = support.TESTFN + ".2"
with open(support.TESTFN, 'w') as f: self.addCleanup(support.unlink, support.TESTFN)
f.write("1") self.addCleanup(support.unlink, TESTFN2)
with open(TESTFN2, 'w') as f:
f.write("2") create_file(support.TESTFN, b"1")
self.addCleanup(os.unlink, TESTFN2) create_file(TESTFN2, b"2")
os.replace(support.TESTFN, TESTFN2) os.replace(support.TESTFN, TESTFN2)
self.assertRaises(FileNotFoundError, os.stat, support.TESTFN) self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
with open(TESTFN2, 'r') as f: with open(TESTFN2, 'r') as f:
@ -245,8 +250,7 @@ class StatAttributeTests(unittest.TestCase):
def setUp(self): def setUp(self):
self.fname = support.TESTFN self.fname = support.TESTFN
self.addCleanup(support.unlink, self.fname) self.addCleanup(support.unlink, self.fname)
with open(self.fname, 'wb') as fp: create_file(self.fname, b"ABC")
fp.write(b"ABC")
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def check_stat_attributes(self, fname): def check_stat_attributes(self, fname):
@ -455,8 +459,7 @@ class UtimeTests(unittest.TestCase):
self.addCleanup(support.rmtree, self.dirname) self.addCleanup(support.rmtree, self.dirname)
os.mkdir(self.dirname) os.mkdir(self.dirname)
with open(self.fname, 'wb') as fp: create_file(self.fname)
fp.write(b"ABC")
def restore_float_times(state): def restore_float_times(state):
with ignore_deprecation_warnings('stat_float_times'): with ignore_deprecation_warnings('stat_float_times'):
@ -555,7 +558,7 @@ class UtimeTests(unittest.TestCase):
"fd support for utime required for this test.") "fd support for utime required for this test.")
def test_utime_fd(self): def test_utime_fd(self):
def set_time(filename, ns): def set_time(filename, ns):
with open(filename, 'wb') as fp: with open(filename, 'wb', 0) as fp:
# use a file descriptor to test futimens(timespec) # use a file descriptor to test futimens(timespec)
# or futimes(timeval) # or futimes(timeval)
os.utime(fp.fileno(), ns=ns) os.utime(fp.fileno(), ns=ns)
@ -1213,8 +1216,7 @@ class RemoveDirsTests(unittest.TestCase):
os.mkdir(dira) os.mkdir(dira)
dirb = os.path.join(dira, 'dirb') dirb = os.path.join(dira, 'dirb')
os.mkdir(dirb) os.mkdir(dirb)
with open(os.path.join(dira, 'file.txt'), 'w') as f: create_file(os.path.join(dira, 'file.txt'))
f.write('text')
os.removedirs(dirb) os.removedirs(dirb)
self.assertFalse(os.path.exists(dirb)) self.assertFalse(os.path.exists(dirb))
self.assertTrue(os.path.exists(dira)) self.assertTrue(os.path.exists(dira))
@ -1225,8 +1227,7 @@ class RemoveDirsTests(unittest.TestCase):
os.mkdir(dira) os.mkdir(dira)
dirb = os.path.join(dira, 'dirb') dirb = os.path.join(dira, 'dirb')
os.mkdir(dirb) os.mkdir(dirb)
with open(os.path.join(dirb, 'file.txt'), 'w') as f: create_file(os.path.join(dirb, 'file.txt'))
f.write('text')
with self.assertRaises(OSError): with self.assertRaises(OSError):
os.removedirs(dirb) os.removedirs(dirb)
self.assertTrue(os.path.exists(dirb)) self.assertTrue(os.path.exists(dirb))
@ -1236,7 +1237,7 @@ class RemoveDirsTests(unittest.TestCase):
class DevNullTests(unittest.TestCase): class DevNullTests(unittest.TestCase):
def test_devnull(self): def test_devnull(self):
with open(os.devnull, 'wb') as f: with open(os.devnull, 'wb', 0) as f:
f.write(b'hello') f.write(b'hello')
f.close() f.close()
with open(os.devnull, 'rb') as f: with open(os.devnull, 'rb') as f:
@ -1323,9 +1324,9 @@ class URandomFDTests(unittest.TestCase):
def test_urandom_fd_reopened(self): def test_urandom_fd_reopened(self):
# Issue #21207: urandom() should detect its fd to /dev/urandom # Issue #21207: urandom() should detect its fd to /dev/urandom
# changed to something else, and reopen it. # changed to something else, and reopen it.
with open(support.TESTFN, 'wb') as f: self.addCleanup(support.unlink, support.TESTFN)
f.write(b"x" * 256) create_file(support.TESTFN, b"x" * 256)
self.addCleanup(os.unlink, support.TESTFN)
code = """if 1: code = """if 1:
import os import os
import sys import sys
@ -1464,12 +1465,10 @@ class Win32ErrorTests(unittest.TestCase):
self.assertRaises(OSError, os.chdir, support.TESTFN) self.assertRaises(OSError, os.chdir, support.TESTFN)
def test_mkdir(self): def test_mkdir(self):
f = open(support.TESTFN, "w") self.addCleanup(support.unlink, support.TESTFN)
try:
with open(support.TESTFN, "w") as f:
self.assertRaises(OSError, os.mkdir, support.TESTFN) self.assertRaises(OSError, os.mkdir, support.TESTFN)
finally:
f.close()
os.unlink(support.TESTFN)
def test_utime(self): def test_utime(self):
self.assertRaises(OSError, os.utime, support.TESTFN, None) self.assertRaises(OSError, os.utime, support.TESTFN, None)
@ -1988,15 +1987,14 @@ class Win32SymlinkTests(unittest.TestCase):
level1 = os.path.abspath(support.TESTFN) level1 = os.path.abspath(support.TESTFN)
level2 = os.path.join(level1, "level2") level2 = os.path.join(level1, "level2")
level3 = os.path.join(level2, "level3") level3 = os.path.join(level2, "level3")
try: self.addCleanup(support.rmtree, level1)
os.mkdir(level1) os.mkdir(level1)
os.mkdir(level2) os.mkdir(level2)
os.mkdir(level3) os.mkdir(level3)
file1 = os.path.abspath(os.path.join(level1, "file1")) file1 = os.path.abspath(os.path.join(level1, "file1"))
create_file(file1)
with open(file1, "w") as f:
f.write("file1")
orig_dir = os.getcwd() orig_dir = os.getcwd()
try: try:
@ -2019,11 +2017,6 @@ class Win32SymlinkTests(unittest.TestCase):
os.stat(os.path.relpath(link))) os.stat(os.path.relpath(link)))
finally: finally:
os.chdir(orig_dir) os.chdir(orig_dir)
except OSError as err:
self.fail(err)
finally:
os.remove(file1)
shutil.rmtree(level1)
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@ -2159,8 +2152,8 @@ class ProgramPriorityTests(unittest.TestCase):
try: try:
new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid()) new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
if base >= 19 and new_prio <= 19: if base >= 19 and new_prio <= 19:
raise unittest.SkipTest( raise unittest.SkipTest("unable to reliably test setpriority "
"unable to reliably test setpriority at current nice level of %s" % base) "at current nice level of %s" % base)
else: else:
self.assertEqual(new_prio, base + 1) self.assertEqual(new_prio, base + 1)
finally: finally:
@ -2270,8 +2263,7 @@ class TestSendfile(unittest.TestCase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
cls.key = support.threading_setup() cls.key = support.threading_setup()
with open(support.TESTFN, "wb") as f: create_file(support.TESTFN, cls.DATA)
f.write(cls.DATA)
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
@ -2421,10 +2413,11 @@ class TestSendfile(unittest.TestCase):
def test_trailers(self): def test_trailers(self):
TESTFN2 = support.TESTFN + "2" TESTFN2 = support.TESTFN + "2"
file_data = b"abcdef" file_data = b"abcdef"
with open(TESTFN2, 'wb') as f:
f.write(file_data) self.addCleanup(support.unlink, TESTFN2)
with open(TESTFN2, 'rb')as f: create_file(TESTFN2, file_data)
self.addCleanup(os.remove, TESTFN2)
with open(TESTFN2, 'rb') as f:
os.sendfile(self.sockno, f.fileno(), 0, len(file_data), os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
trailers=[b"1234"]) trailers=[b"1234"])
self.client.close() self.client.close()
@ -2447,8 +2440,9 @@ class TestSendfile(unittest.TestCase):
def supports_extended_attributes(): def supports_extended_attributes():
if not hasattr(os, "setxattr"): if not hasattr(os, "setxattr"):
return False return False
try: try:
with open(support.TESTFN, "wb") as fp: with open(support.TESTFN, "xb", 0) as fp:
try: try:
os.setxattr(fp.fileno(), b"user.test", b"") os.setxattr(fp.fileno(), b"user.test", b"")
except OSError: except OSError:
@ -2465,17 +2459,18 @@ def supports_extended_attributes():
@support.requires_linux_version(2, 6, 39) @support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase): class ExtendedAttributeTests(unittest.TestCase):
def tearDown(self):
support.unlink(support.TESTFN)
def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs): def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
fn = support.TESTFN fn = support.TESTFN
open(fn, "wb").close() self.addCleanup(support.unlink, fn)
create_file(fn)
with self.assertRaises(OSError) as cm: with self.assertRaises(OSError) as cm:
getxattr(fn, s("user.test"), **kwargs) getxattr(fn, s("user.test"), **kwargs)
self.assertEqual(cm.exception.errno, errno.ENODATA) self.assertEqual(cm.exception.errno, errno.ENODATA)
init_xattr = listxattr(fn) init_xattr = listxattr(fn)
self.assertIsInstance(init_xattr, list) self.assertIsInstance(init_xattr, list)
setxattr(fn, s("user.test"), b"", **kwargs) setxattr(fn, s("user.test"), b"", **kwargs)
xattr = set(init_xattr) xattr = set(init_xattr)
xattr.add("user.test") xattr.add("user.test")
@ -2483,19 +2478,24 @@ class ExtendedAttributeTests(unittest.TestCase):
self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"") self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs) setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello") self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
with self.assertRaises(OSError) as cm: with self.assertRaises(OSError) as cm:
setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs) setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
self.assertEqual(cm.exception.errno, errno.EEXIST) self.assertEqual(cm.exception.errno, errno.EEXIST)
with self.assertRaises(OSError) as cm: with self.assertRaises(OSError) as cm:
setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs) setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
self.assertEqual(cm.exception.errno, errno.ENODATA) self.assertEqual(cm.exception.errno, errno.ENODATA)
setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs) setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
xattr.add("user.test2") xattr.add("user.test2")
self.assertEqual(set(listxattr(fn)), xattr) self.assertEqual(set(listxattr(fn)), xattr)
removexattr(fn, s("user.test"), **kwargs) removexattr(fn, s("user.test"), **kwargs)
with self.assertRaises(OSError) as cm: with self.assertRaises(OSError) as cm:
getxattr(fn, s("user.test"), **kwargs) getxattr(fn, s("user.test"), **kwargs)
self.assertEqual(cm.exception.errno, errno.ENODATA) self.assertEqual(cm.exception.errno, errno.ENODATA)
xattr.remove("user.test") xattr.remove("user.test")
self.assertEqual(set(listxattr(fn)), xattr) self.assertEqual(set(listxattr(fn)), xattr)
self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo") self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
@ -2508,11 +2508,11 @@ class ExtendedAttributeTests(unittest.TestCase):
self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many)) self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
def _check_xattrs(self, *args, **kwargs): def _check_xattrs(self, *args, **kwargs):
def make_bytes(s):
return bytes(s, "ascii")
self._check_xattrs_str(str, *args, **kwargs) self._check_xattrs_str(str, *args, **kwargs)
support.unlink(support.TESTFN) support.unlink(support.TESTFN)
self._check_xattrs_str(make_bytes, *args, **kwargs)
self._check_xattrs_str(os.fsencode, *args, **kwargs)
support.unlink(support.TESTFN)
def test_simple(self): def test_simple(self):
self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
@ -2527,10 +2527,10 @@ class ExtendedAttributeTests(unittest.TestCase):
with open(path, "rb") as fp: with open(path, "rb") as fp:
return os.getxattr(fp.fileno(), *args) return os.getxattr(fp.fileno(), *args)
def setxattr(path, *args): def setxattr(path, *args):
with open(path, "wb") as fp: with open(path, "wb", 0) as fp:
os.setxattr(fp.fileno(), *args) os.setxattr(fp.fileno(), *args)
def removexattr(path, *args): def removexattr(path, *args):
with open(path, "wb") as fp: with open(path, "wb", 0) as fp:
os.removexattr(fp.fileno(), *args) os.removexattr(fp.fileno(), *args)
def listxattr(path, *args): def listxattr(path, *args):
with open(path, "rb") as fp: with open(path, "rb") as fp:
@ -2844,8 +2844,7 @@ class TestScandir(unittest.TestCase):
def create_file(self, name="file.txt"): def create_file(self, name="file.txt"):
filename = os.path.join(self.path, name) filename = os.path.join(self.path, name)
with open(filename, "wb") as fp: create_file(filename, b'python')
fp.write(b'python')
return filename return filename
def get_entries(self, names): def get_entries(self, names):