mirror of
https://github.com/python/cpython.git
synced 2025-08-30 05:35:08 +00:00
Issue #18702: All skipped tests now reported as skipped.
This commit is contained in:
parent
7a07cc90c7
commit
43767638a9
21 changed files with 967 additions and 925 deletions
|
@ -185,10 +185,8 @@ class StatAttributeTests(unittest.TestCase):
|
|||
os.unlink(self.fname)
|
||||
os.rmdir(support.TESTFN)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
|
||||
def check_stat_attributes(self, fname):
|
||||
if not hasattr(os, "stat"):
|
||||
return
|
||||
|
||||
result = os.stat(fname)
|
||||
|
||||
# Make sure direct access works
|
||||
|
@ -272,10 +270,8 @@ class StatAttributeTests(unittest.TestCase):
|
|||
unpickled = pickle.loads(p)
|
||||
self.assertEqual(result, unpickled)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
|
||||
def test_statvfs_attributes(self):
|
||||
if not hasattr(os, "statvfs"):
|
||||
return
|
||||
|
||||
try:
|
||||
result = os.statvfs(self.fname)
|
||||
except OSError as e:
|
||||
|
@ -479,10 +475,10 @@ class StatAttributeTests(unittest.TestCase):
|
|||
os.close(dirfd)
|
||||
self._test_utime_subsecond(set_time)
|
||||
|
||||
# Restrict test to Win32, since there is no guarantee other
|
||||
# Restrict tests to Win32, since there is no guarantee other
|
||||
# systems support centiseconds
|
||||
if sys.platform == 'win32':
|
||||
def get_file_system(path):
|
||||
def get_file_system(path):
|
||||
if sys.platform == 'win32':
|
||||
root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
|
||||
import ctypes
|
||||
kernel32 = ctypes.windll.kernel32
|
||||
|
@ -490,38 +486,45 @@ class StatAttributeTests(unittest.TestCase):
|
|||
if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
|
||||
return buf.value
|
||||
|
||||
if get_file_system(support.TESTFN) == "NTFS":
|
||||
def test_1565150(self):
|
||||
t1 = 1159195039.25
|
||||
os.utime(self.fname, (t1, t1))
|
||||
self.assertEqual(os.stat(self.fname).st_mtime, t1)
|
||||
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
|
||||
@unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
|
||||
"requires NTFS")
|
||||
def test_1565150(self):
|
||||
t1 = 1159195039.25
|
||||
os.utime(self.fname, (t1, t1))
|
||||
self.assertEqual(os.stat(self.fname).st_mtime, t1)
|
||||
|
||||
def test_large_time(self):
|
||||
t1 = 5000000000 # some day in 2128
|
||||
os.utime(self.fname, (t1, t1))
|
||||
self.assertEqual(os.stat(self.fname).st_mtime, t1)
|
||||
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
|
||||
@unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
|
||||
"requires NTFS")
|
||||
def test_large_time(self):
|
||||
t1 = 5000000000 # some day in 2128
|
||||
os.utime(self.fname, (t1, t1))
|
||||
self.assertEqual(os.stat(self.fname).st_mtime, t1)
|
||||
|
||||
def test_1686475(self):
|
||||
# Verify that an open file can be stat'ed
|
||||
try:
|
||||
os.stat(r"c:\pagefile.sys")
|
||||
except FileNotFoundError:
|
||||
pass # file does not exist; cannot run test
|
||||
except OSError as e:
|
||||
self.fail("Could not stat pagefile.sys")
|
||||
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
|
||||
def test_1686475(self):
|
||||
# Verify that an open file can be stat'ed
|
||||
try:
|
||||
os.stat(r"c:\pagefile.sys")
|
||||
except FileNotFoundError:
|
||||
pass # file does not exist; cannot run test
|
||||
except OSError as e:
|
||||
self.fail("Could not stat pagefile.sys")
|
||||
|
||||
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
|
||||
def test_15261(self):
|
||||
# Verify that stat'ing a closed fd does not cause crash
|
||||
r, w = os.pipe()
|
||||
try:
|
||||
os.stat(r) # should not raise error
|
||||
finally:
|
||||
os.close(r)
|
||||
os.close(w)
|
||||
with self.assertRaises(OSError) as ctx:
|
||||
os.stat(r)
|
||||
self.assertEqual(ctx.exception.errno, errno.EBADF)
|
||||
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
|
||||
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
|
||||
def test_15261(self):
|
||||
# Verify that stat'ing a closed fd does not cause crash
|
||||
r, w = os.pipe()
|
||||
try:
|
||||
os.stat(r) # should not raise error
|
||||
finally:
|
||||
os.close(r)
|
||||
os.close(w)
|
||||
with self.assertRaises(OSError) as ctx:
|
||||
os.stat(r)
|
||||
self.assertEqual(ctx.exception.errno, errno.EBADF)
|
||||
|
||||
from test import mapping_tests
|
||||
|
||||
|
@ -1167,6 +1170,7 @@ class ExecTests(unittest.TestCase):
|
|||
self._test_internal_execvpe(bytes)
|
||||
|
||||
|
||||
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
|
||||
class Win32ErrorTests(unittest.TestCase):
|
||||
def test_rename(self):
|
||||
self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
|
||||
|
@ -1213,63 +1217,63 @@ class TestInvalidFD(unittest.TestCase):
|
|||
self.fail("%r didn't raise a OSError with a bad file descriptor"
|
||||
% f)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
|
||||
def test_isatty(self):
|
||||
if hasattr(os, "isatty"):
|
||||
self.assertEqual(os.isatty(support.make_bad_fd()), False)
|
||||
self.assertEqual(os.isatty(support.make_bad_fd()), False)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
|
||||
def test_closerange(self):
|
||||
if hasattr(os, "closerange"):
|
||||
fd = support.make_bad_fd()
|
||||
# Make sure none of the descriptors we are about to close are
|
||||
# currently valid (issue 6542).
|
||||
for i in range(10):
|
||||
try: os.fstat(fd+i)
|
||||
except OSError:
|
||||
pass
|
||||
else:
|
||||
break
|
||||
if i < 2:
|
||||
raise unittest.SkipTest(
|
||||
"Unable to acquire a range of invalid file descriptors")
|
||||
self.assertEqual(os.closerange(fd, fd + i-1), None)
|
||||
fd = support.make_bad_fd()
|
||||
# Make sure none of the descriptors we are about to close are
|
||||
# currently valid (issue 6542).
|
||||
for i in range(10):
|
||||
try: os.fstat(fd+i)
|
||||
except OSError:
|
||||
pass
|
||||
else:
|
||||
break
|
||||
if i < 2:
|
||||
raise unittest.SkipTest(
|
||||
"Unable to acquire a range of invalid file descriptors")
|
||||
self.assertEqual(os.closerange(fd, fd + i-1), None)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
|
||||
def test_dup2(self):
|
||||
if hasattr(os, "dup2"):
|
||||
self.check(os.dup2, 20)
|
||||
self.check(os.dup2, 20)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
|
||||
def test_fchmod(self):
|
||||
if hasattr(os, "fchmod"):
|
||||
self.check(os.fchmod, 0)
|
||||
self.check(os.fchmod, 0)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
|
||||
def test_fchown(self):
|
||||
if hasattr(os, "fchown"):
|
||||
self.check(os.fchown, -1, -1)
|
||||
self.check(os.fchown, -1, -1)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
|
||||
def test_fpathconf(self):
|
||||
if hasattr(os, "fpathconf"):
|
||||
self.check(os.pathconf, "PC_NAME_MAX")
|
||||
self.check(os.fpathconf, "PC_NAME_MAX")
|
||||
self.check(os.pathconf, "PC_NAME_MAX")
|
||||
self.check(os.fpathconf, "PC_NAME_MAX")
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
|
||||
def test_ftruncate(self):
|
||||
if hasattr(os, "ftruncate"):
|
||||
self.check(os.truncate, 0)
|
||||
self.check(os.ftruncate, 0)
|
||||
self.check(os.truncate, 0)
|
||||
self.check(os.ftruncate, 0)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
|
||||
def test_lseek(self):
|
||||
if hasattr(os, "lseek"):
|
||||
self.check(os.lseek, 0, 0)
|
||||
self.check(os.lseek, 0, 0)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
|
||||
def test_read(self):
|
||||
if hasattr(os, "read"):
|
||||
self.check(os.read, 1)
|
||||
self.check(os.read, 1)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
|
||||
def test_tcsetpgrpt(self):
|
||||
if hasattr(os, "tcsetpgrp"):
|
||||
self.check(os.tcsetpgrp, 0)
|
||||
self.check(os.tcsetpgrp, 0)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
|
||||
def test_write(self):
|
||||
if hasattr(os, "write"):
|
||||
self.check(os.write, b" ")
|
||||
self.check(os.write, b" ")
|
||||
|
||||
|
||||
class LinkTests(unittest.TestCase):
|
||||
|
@ -1309,138 +1313,133 @@ class LinkTests(unittest.TestCase):
|
|||
self.file2 = self.file1 + "2"
|
||||
self._test_link(self.file1, self.file2)
|
||||
|
||||
if sys.platform != 'win32':
|
||||
class Win32ErrorTests(unittest.TestCase):
|
||||
pass
|
||||
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
|
||||
class PosixUidGidTests(unittest.TestCase):
|
||||
@unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
|
||||
def test_setuid(self):
|
||||
if os.getuid() != 0:
|
||||
self.assertRaises(OSError, os.setuid, 0)
|
||||
self.assertRaises(OverflowError, os.setuid, 1<<32)
|
||||
|
||||
class PosixUidGidTests(unittest.TestCase):
|
||||
if hasattr(os, 'setuid'):
|
||||
def test_setuid(self):
|
||||
if os.getuid() != 0:
|
||||
self.assertRaises(OSError, os.setuid, 0)
|
||||
self.assertRaises(OverflowError, os.setuid, 1<<32)
|
||||
@unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
|
||||
def test_setgid(self):
|
||||
if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
|
||||
self.assertRaises(OSError, os.setgid, 0)
|
||||
self.assertRaises(OverflowError, os.setgid, 1<<32)
|
||||
|
||||
if hasattr(os, 'setgid'):
|
||||
def test_setgid(self):
|
||||
if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
|
||||
self.assertRaises(OSError, os.setgid, 0)
|
||||
self.assertRaises(OverflowError, os.setgid, 1<<32)
|
||||
@unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
|
||||
def test_seteuid(self):
|
||||
if os.getuid() != 0:
|
||||
self.assertRaises(OSError, os.seteuid, 0)
|
||||
self.assertRaises(OverflowError, os.seteuid, 1<<32)
|
||||
|
||||
if hasattr(os, 'seteuid'):
|
||||
def test_seteuid(self):
|
||||
if os.getuid() != 0:
|
||||
self.assertRaises(OSError, os.seteuid, 0)
|
||||
self.assertRaises(OverflowError, os.seteuid, 1<<32)
|
||||
@unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
|
||||
def test_setegid(self):
|
||||
if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
|
||||
self.assertRaises(OSError, os.setegid, 0)
|
||||
self.assertRaises(OverflowError, os.setegid, 1<<32)
|
||||
|
||||
if hasattr(os, 'setegid'):
|
||||
def test_setegid(self):
|
||||
if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
|
||||
self.assertRaises(OSError, os.setegid, 0)
|
||||
self.assertRaises(OverflowError, os.setegid, 1<<32)
|
||||
@unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
|
||||
def test_setreuid(self):
|
||||
if os.getuid() != 0:
|
||||
self.assertRaises(OSError, os.setreuid, 0, 0)
|
||||
self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
|
||||
self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
|
||||
|
||||
if hasattr(os, 'setreuid'):
|
||||
def test_setreuid(self):
|
||||
if os.getuid() != 0:
|
||||
self.assertRaises(OSError, os.setreuid, 0, 0)
|
||||
self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
|
||||
self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
|
||||
@unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
|
||||
def test_setreuid_neg1(self):
|
||||
# Needs to accept -1. We run this in a subprocess to avoid
|
||||
# altering the test runner's process state (issue8045).
|
||||
subprocess.check_call([
|
||||
sys.executable, '-c',
|
||||
'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
|
||||
|
||||
def test_setreuid_neg1(self):
|
||||
# Needs to accept -1. We run this in a subprocess to avoid
|
||||
# altering the test runner's process state (issue8045).
|
||||
subprocess.check_call([
|
||||
sys.executable, '-c',
|
||||
'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
|
||||
@unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
|
||||
def test_setregid(self):
|
||||
if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
|
||||
self.assertRaises(OSError, os.setregid, 0, 0)
|
||||
self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
|
||||
self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
|
||||
|
||||
if hasattr(os, 'setregid'):
|
||||
def test_setregid(self):
|
||||
if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
|
||||
self.assertRaises(OSError, os.setregid, 0, 0)
|
||||
self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
|
||||
self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
|
||||
@unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
|
||||
def test_setregid_neg1(self):
|
||||
# Needs to accept -1. We run this in a subprocess to avoid
|
||||
# altering the test runner's process state (issue8045).
|
||||
subprocess.check_call([
|
||||
sys.executable, '-c',
|
||||
'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
|
||||
|
||||
def test_setregid_neg1(self):
|
||||
# Needs to accept -1. We run this in a subprocess to avoid
|
||||
# altering the test runner's process state (issue8045).
|
||||
subprocess.check_call([
|
||||
sys.executable, '-c',
|
||||
'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
|
||||
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
|
||||
class Pep383Tests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
if support.TESTFN_UNENCODABLE:
|
||||
self.dir = support.TESTFN_UNENCODABLE
|
||||
elif support.TESTFN_NONASCII:
|
||||
self.dir = support.TESTFN_NONASCII
|
||||
else:
|
||||
self.dir = support.TESTFN
|
||||
self.bdir = os.fsencode(self.dir)
|
||||
|
||||
class Pep383Tests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
if support.TESTFN_UNENCODABLE:
|
||||
self.dir = support.TESTFN_UNENCODABLE
|
||||
elif support.TESTFN_NONASCII:
|
||||
self.dir = support.TESTFN_NONASCII
|
||||
else:
|
||||
self.dir = support.TESTFN
|
||||
self.bdir = os.fsencode(self.dir)
|
||||
|
||||
bytesfn = []
|
||||
def add_filename(fn):
|
||||
try:
|
||||
fn = os.fsencode(fn)
|
||||
except UnicodeEncodeError:
|
||||
return
|
||||
bytesfn.append(fn)
|
||||
add_filename(support.TESTFN_UNICODE)
|
||||
if support.TESTFN_UNENCODABLE:
|
||||
add_filename(support.TESTFN_UNENCODABLE)
|
||||
if support.TESTFN_NONASCII:
|
||||
add_filename(support.TESTFN_NONASCII)
|
||||
if not bytesfn:
|
||||
self.skipTest("couldn't create any non-ascii filename")
|
||||
|
||||
self.unicodefn = set()
|
||||
os.mkdir(self.dir)
|
||||
bytesfn = []
|
||||
def add_filename(fn):
|
||||
try:
|
||||
for fn in bytesfn:
|
||||
support.create_empty_file(os.path.join(self.bdir, fn))
|
||||
fn = os.fsdecode(fn)
|
||||
if fn in self.unicodefn:
|
||||
raise ValueError("duplicate filename")
|
||||
self.unicodefn.add(fn)
|
||||
except:
|
||||
shutil.rmtree(self.dir)
|
||||
raise
|
||||
fn = os.fsencode(fn)
|
||||
except UnicodeEncodeError:
|
||||
return
|
||||
bytesfn.append(fn)
|
||||
add_filename(support.TESTFN_UNICODE)
|
||||
if support.TESTFN_UNENCODABLE:
|
||||
add_filename(support.TESTFN_UNENCODABLE)
|
||||
if support.TESTFN_NONASCII:
|
||||
add_filename(support.TESTFN_NONASCII)
|
||||
if not bytesfn:
|
||||
self.skipTest("couldn't create any non-ascii filename")
|
||||
|
||||
def tearDown(self):
|
||||
self.unicodefn = set()
|
||||
os.mkdir(self.dir)
|
||||
try:
|
||||
for fn in bytesfn:
|
||||
support.create_empty_file(os.path.join(self.bdir, fn))
|
||||
fn = os.fsdecode(fn)
|
||||
if fn in self.unicodefn:
|
||||
raise ValueError("duplicate filename")
|
||||
self.unicodefn.add(fn)
|
||||
except:
|
||||
shutil.rmtree(self.dir)
|
||||
raise
|
||||
|
||||
def test_listdir(self):
|
||||
expected = self.unicodefn
|
||||
found = set(os.listdir(self.dir))
|
||||
self.assertEqual(found, expected)
|
||||
# test listdir without arguments
|
||||
current_directory = os.getcwd()
|
||||
try:
|
||||
os.chdir(os.sep)
|
||||
self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
|
||||
finally:
|
||||
os.chdir(current_directory)
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.dir)
|
||||
|
||||
def test_open(self):
|
||||
for fn in self.unicodefn:
|
||||
f = open(os.path.join(self.dir, fn), 'rb')
|
||||
f.close()
|
||||
def test_listdir(self):
|
||||
expected = self.unicodefn
|
||||
found = set(os.listdir(self.dir))
|
||||
self.assertEqual(found, expected)
|
||||
# test listdir without arguments
|
||||
current_directory = os.getcwd()
|
||||
try:
|
||||
os.chdir(os.sep)
|
||||
self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
|
||||
finally:
|
||||
os.chdir(current_directory)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'statvfs'),
|
||||
"need os.statvfs()")
|
||||
def test_statvfs(self):
|
||||
# issue #9645
|
||||
for fn in self.unicodefn:
|
||||
# should not fail with file not found error
|
||||
fullname = os.path.join(self.dir, fn)
|
||||
os.statvfs(fullname)
|
||||
def test_open(self):
|
||||
for fn in self.unicodefn:
|
||||
f = open(os.path.join(self.dir, fn), 'rb')
|
||||
f.close()
|
||||
|
||||
def test_stat(self):
|
||||
for fn in self.unicodefn:
|
||||
os.stat(os.path.join(self.dir, fn))
|
||||
else:
|
||||
class PosixUidGidTests(unittest.TestCase):
|
||||
pass
|
||||
class Pep383Tests(unittest.TestCase):
|
||||
pass
|
||||
@unittest.skipUnless(hasattr(os, 'statvfs'),
|
||||
"need os.statvfs()")
|
||||
def test_statvfs(self):
|
||||
# issue #9645
|
||||
for fn in self.unicodefn:
|
||||
# should not fail with file not found error
|
||||
fullname = os.path.join(self.dir, fn)
|
||||
os.statvfs(fullname)
|
||||
|
||||
def test_stat(self):
|
||||
for fn in self.unicodefn:
|
||||
os.stat(os.path.join(self.dir, fn))
|
||||
|
||||
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
|
||||
class Win32KillTests(unittest.TestCase):
|
||||
|
@ -1924,6 +1923,8 @@ class TestSendfile(unittest.TestCase):
|
|||
SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
|
||||
not sys.platform.startswith("solaris") and \
|
||||
not sys.platform.startswith("sunos")
|
||||
requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
|
||||
'requires headers and trailers support')
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
|
@ -2042,52 +2043,54 @@ class TestSendfile(unittest.TestCase):
|
|||
|
||||
# --- headers / trailers tests
|
||||
|
||||
if SUPPORT_HEADERS_TRAILERS:
|
||||
|
||||
def test_headers(self):
|
||||
total_sent = 0
|
||||
sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
|
||||
headers=[b"x" * 512])
|
||||
@requires_headers_trailers
|
||||
def test_headers(self):
|
||||
total_sent = 0
|
||||
sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
|
||||
headers=[b"x" * 512])
|
||||
total_sent += sent
|
||||
offset = 4096
|
||||
nbytes = 4096
|
||||
while 1:
|
||||
sent = self.sendfile_wrapper(self.sockno, self.fileno,
|
||||
offset, nbytes)
|
||||
if sent == 0:
|
||||
break
|
||||
total_sent += sent
|
||||
offset = 4096
|
||||
nbytes = 4096
|
||||
while 1:
|
||||
sent = self.sendfile_wrapper(self.sockno, self.fileno,
|
||||
offset, nbytes)
|
||||
if sent == 0:
|
||||
break
|
||||
total_sent += sent
|
||||
offset += sent
|
||||
offset += sent
|
||||
|
||||
expected_data = b"x" * 512 + self.DATA
|
||||
self.assertEqual(total_sent, len(expected_data))
|
||||
expected_data = b"x" * 512 + self.DATA
|
||||
self.assertEqual(total_sent, len(expected_data))
|
||||
self.client.close()
|
||||
self.server.wait()
|
||||
data = self.server.handler_instance.get_data()
|
||||
self.assertEqual(hash(data), hash(expected_data))
|
||||
|
||||
@requires_headers_trailers
|
||||
def test_trailers(self):
|
||||
TESTFN2 = support.TESTFN + "2"
|
||||
file_data = b"abcdef"
|
||||
with open(TESTFN2, 'wb') as f:
|
||||
f.write(file_data)
|
||||
with open(TESTFN2, 'rb')as f:
|
||||
self.addCleanup(os.remove, TESTFN2)
|
||||
os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
|
||||
trailers=[b"1234"])
|
||||
self.client.close()
|
||||
self.server.wait()
|
||||
data = self.server.handler_instance.get_data()
|
||||
self.assertEqual(hash(data), hash(expected_data))
|
||||
self.assertEqual(data, b"abcdef1234")
|
||||
|
||||
def test_trailers(self):
|
||||
TESTFN2 = support.TESTFN + "2"
|
||||
file_data = b"abcdef"
|
||||
with open(TESTFN2, 'wb') as f:
|
||||
f.write(file_data)
|
||||
with open(TESTFN2, 'rb')as f:
|
||||
self.addCleanup(os.remove, TESTFN2)
|
||||
os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
|
||||
trailers=[b"1234"])
|
||||
self.client.close()
|
||||
self.server.wait()
|
||||
data = self.server.handler_instance.get_data()
|
||||
self.assertEqual(data, b"abcdef1234")
|
||||
|
||||
if hasattr(os, "SF_NODISKIO"):
|
||||
def test_flags(self):
|
||||
try:
|
||||
os.sendfile(self.sockno, self.fileno, 0, 4096,
|
||||
flags=os.SF_NODISKIO)
|
||||
except OSError as err:
|
||||
if err.errno not in (errno.EBUSY, errno.EAGAIN):
|
||||
raise
|
||||
@requires_headers_trailers
|
||||
@unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
|
||||
'test needs os.SF_NODISKIO')
|
||||
def test_flags(self):
|
||||
try:
|
||||
os.sendfile(self.sockno, self.fileno, 0, 4096,
|
||||
flags=os.SF_NODISKIO)
|
||||
except OSError as err:
|
||||
if err.errno not in (errno.EBUSY, errno.EAGAIN):
|
||||
raise
|
||||
|
||||
|
||||
def supports_extended_attributes():
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue