Issue #18702: All skipped tests now reported as skipped.

This commit is contained in:
Serhiy Storchaka 2013-11-03 21:31:18 +02:00
parent 834856aca9
commit 7908068627
21 changed files with 908 additions and 883 deletions

View file

@ -11,6 +11,7 @@ import operator
import io import io
import math import math
import struct import struct
import sys
import warnings import warnings
import array import array
@ -993,15 +994,15 @@ class BaseTest:
s = None s = None
self.assertRaises(ReferenceError, len, p) self.assertRaises(ReferenceError, len, p)
@unittest.skipUnless(hasattr(sys, 'getrefcount'),
'test needs sys.getrefcount()')
def test_bug_782369(self): def test_bug_782369(self):
import sys for i in range(10):
if hasattr(sys, "getrefcount"): b = array.array('B', range(64))
for i in range(10): rc = sys.getrefcount(10)
b = array.array('B', range(64)) for i in range(10):
rc = sys.getrefcount(10) b = array.array('B', range(64))
for i in range(10): self.assertEqual(rc, sys.getrefcount(10))
b = array.array('B', range(64))
self.assertEqual(rc, sys.getrefcount(10))
def test_subclass_with_kwargs(self): def test_subclass_with_kwargs(self):
# SF bug #1486663 -- this used to erroneously raise a TypeError # SF bug #1486663 -- this used to erroneously raise a TypeError

View file

@ -39,11 +39,10 @@ class CompileallTests(unittest.TestCase):
compare = struct.pack('<4sl', imp.get_magic(), mtime) compare = struct.pack('<4sl', imp.get_magic(), mtime)
return data, compare return data, compare
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def recreation_check(self, metadata): def recreation_check(self, metadata):
"""Check that compileall recreates bytecode when the new metadata is """Check that compileall recreates bytecode when the new metadata is
used.""" used."""
if not hasattr(os, 'stat'):
return
py_compile.compile(self.source_path) py_compile.compile(self.source_path)
self.assertEqual(*self.data()) self.assertEqual(*self.data())
with open(self.bc_path, 'rb') as file: with open(self.bc_path, 'rb') as file:

View file

@ -896,78 +896,77 @@ Stonecutters Seafood and Chop House+ Lemont+ IL+ 12/19/02+ Week Back
dialect = sniffer.sniff(self.sample9) dialect = sniffer.sniff(self.sample9)
self.assertTrue(dialect.doublequote) self.assertTrue(dialect.doublequote)
if not hasattr(sys, "gettotalrefcount"): class NUL:
if support.verbose: print("*** skipping leakage tests ***") def write(s, *args):
else: pass
class NUL: writelines = write
def write(s, *args):
pass
writelines = write
class TestLeaks(unittest.TestCase): @unittest.skipUnless(hasattr(sys, "gettotalrefcount"),
def test_create_read(self): 'requires sys.gettotalrefcount()')
delta = 0 class TestLeaks(unittest.TestCase):
lastrc = sys.gettotalrefcount() def test_create_read(self):
for i in range(20): delta = 0
gc.collect() lastrc = sys.gettotalrefcount()
self.assertEqual(gc.garbage, []) for i in range(20):
rc = sys.gettotalrefcount() gc.collect()
csv.reader(["a,b,c\r\n"]) self.assertEqual(gc.garbage, [])
csv.reader(["a,b,c\r\n"]) rc = sys.gettotalrefcount()
csv.reader(["a,b,c\r\n"]) csv.reader(["a,b,c\r\n"])
delta = rc-lastrc csv.reader(["a,b,c\r\n"])
lastrc = rc csv.reader(["a,b,c\r\n"])
# if csv.reader() leaks, last delta should be 3 or more delta = rc-lastrc
self.assertEqual(delta < 3, True) lastrc = rc
# if csv.reader() leaks, last delta should be 3 or more
self.assertEqual(delta < 3, True)
def test_create_write(self): def test_create_write(self):
delta = 0 delta = 0
lastrc = sys.gettotalrefcount() lastrc = sys.gettotalrefcount()
s = NUL() s = NUL()
for i in range(20): for i in range(20):
gc.collect() gc.collect()
self.assertEqual(gc.garbage, []) self.assertEqual(gc.garbage, [])
rc = sys.gettotalrefcount() rc = sys.gettotalrefcount()
csv.writer(s) csv.writer(s)
csv.writer(s) csv.writer(s)
csv.writer(s) csv.writer(s)
delta = rc-lastrc delta = rc-lastrc
lastrc = rc lastrc = rc
# if csv.writer() leaks, last delta should be 3 or more # if csv.writer() leaks, last delta should be 3 or more
self.assertEqual(delta < 3, True) self.assertEqual(delta < 3, True)
def test_read(self): def test_read(self):
delta = 0 delta = 0
rows = ["a,b,c\r\n"]*5 rows = ["a,b,c\r\n"]*5
lastrc = sys.gettotalrefcount() lastrc = sys.gettotalrefcount()
for i in range(20): for i in range(20):
gc.collect() gc.collect()
self.assertEqual(gc.garbage, []) self.assertEqual(gc.garbage, [])
rc = sys.gettotalrefcount() rc = sys.gettotalrefcount()
rdr = csv.reader(rows) rdr = csv.reader(rows)
for row in rdr: for row in rdr:
pass pass
delta = rc-lastrc delta = rc-lastrc
lastrc = rc lastrc = rc
# if reader leaks during read, delta should be 5 or more # if reader leaks during read, delta should be 5 or more
self.assertEqual(delta < 5, True) self.assertEqual(delta < 5, True)
def test_write(self): def test_write(self):
delta = 0 delta = 0
rows = [[1,2,3]]*5 rows = [[1,2,3]]*5
s = NUL() s = NUL()
lastrc = sys.gettotalrefcount() lastrc = sys.gettotalrefcount()
for i in range(20): for i in range(20):
gc.collect() gc.collect()
self.assertEqual(gc.garbage, []) self.assertEqual(gc.garbage, [])
rc = sys.gettotalrefcount() rc = sys.gettotalrefcount()
writer = csv.writer(s) writer = csv.writer(s)
for row in rows: for row in rows:
writer.writerow(row) writer.writerow(row)
delta = rc-lastrc delta = rc-lastrc
lastrc = rc lastrc = rc
# if writer leaks during write, last delta should be 5 or more # if writer leaks during write, last delta should be 5 or more
self.assertEqual(delta < 5, True) self.assertEqual(delta < 5, True)
class TestUnicode(unittest.TestCase): class TestUnicode(unittest.TestCase):

View file

@ -37,11 +37,9 @@ class DumbDBMTestCase(unittest.TestCase):
self.read_helper(f) self.read_helper(f)
f.close() f.close()
@unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
@unittest.skipUnless(hasattr(os, 'chmod'), 'test needs os.chmod()')
def test_dumbdbm_creation_mode(self): def test_dumbdbm_creation_mode(self):
# On platforms without chmod, don't do anything.
if not (hasattr(os, 'chmod') and hasattr(os, 'umask')):
return
try: try:
old_umask = os.umask(0o002) old_umask = os.umask(0o002)
f = dumbdbm.open(_fname, 'c', 0o637) f = dumbdbm.open(_fname, 'c', 0o637)

View file

@ -204,11 +204,10 @@ class TestReversed(unittest.TestCase, PickleTest):
self.assertRaises(TypeError, reversed) self.assertRaises(TypeError, reversed)
self.assertRaises(TypeError, reversed, [], 'extra') self.assertRaises(TypeError, reversed, [], 'extra')
@unittest.skipUnless(hasattr(sys, 'getrefcount'), 'test needs sys.getrefcount()')
def test_bug1229429(self): def test_bug1229429(self):
# this bug was never in reversed, it was in # this bug was never in reversed, it was in
# PyObject_CallMethod, and reversed_new calls that sometimes. # PyObject_CallMethod, and reversed_new calls that sometimes.
if not hasattr(sys, "getrefcount"):
return
def f(): def f():
pass pass
r = f.__reversed__ = object() r = f.__reversed__ = object()

View file

@ -16,7 +16,7 @@ try:
except ImportError: except ImportError:
ssl = None ssl = None
from unittest import TestCase from unittest import TestCase, skipUnless
from test import support from test import support
from test.support import HOST, HOSTv6 from test.support import HOST, HOSTv6
threading = support.import_module('threading') threading = support.import_module('threading')
@ -779,6 +779,7 @@ class TestFTPClass(TestCase):
self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f) self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
@skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
class TestIPv6Environment(TestCase): class TestIPv6Environment(TestCase):
def setUp(self): def setUp(self):
@ -819,6 +820,7 @@ class TestIPv6Environment(TestCase):
retr() retr()
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClassMixin(TestFTPClass): class TestTLS_FTPClassMixin(TestFTPClass):
"""Repeat TestFTPClass tests starting the TLS layer for both control """Repeat TestFTPClass tests starting the TLS layer for both control
and data connections first. and data connections first.
@ -834,6 +836,7 @@ class TestTLS_FTPClassMixin(TestFTPClass):
self.client.prot_p() self.client.prot_p()
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClass(TestCase): class TestTLS_FTPClass(TestCase):
"""Specific TLS_FTP class tests.""" """Specific TLS_FTP class tests."""
@ -1015,12 +1018,9 @@ class TestTimeouts(TestCase):
def test_main(): def test_main():
tests = [TestFTPClass, TestTimeouts] tests = [TestFTPClass, TestTimeouts,
if support.IPV6_ENABLED: TestIPv6Environment,
tests.append(TestIPv6Environment) TestTLS_FTPClassMixin, TestTLS_FTPClass]
if ssl is not None:
tests.extend([TestTLS_FTPClassMixin, TestTLS_FTPClass])
thread_info = support.threading_setup() thread_info = support.threading_setup()
try: try:

View file

@ -868,10 +868,10 @@ class TestMaildir(TestMailbox, unittest.TestCase):
for msg in self._box: for msg in self._box:
pass pass
@unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def test_file_permissions(self): def test_file_permissions(self):
# Verify that message files are created without execute permissions # Verify that message files are created without execute permissions
if not hasattr(os, "stat") or not hasattr(os, "umask"):
return
msg = mailbox.MaildirMessage(self._template % 0) msg = mailbox.MaildirMessage(self._template % 0)
orig_umask = os.umask(0) orig_umask = os.umask(0)
try: try:
@ -882,12 +882,11 @@ class TestMaildir(TestMailbox, unittest.TestCase):
mode = os.stat(path).st_mode mode = os.stat(path).st_mode
self.assertFalse(mode & 0o111) self.assertFalse(mode & 0o111)
@unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def test_folder_file_perms(self): def test_folder_file_perms(self):
# From bug #3228, we want to verify that the file created inside a Maildir # From bug #3228, we want to verify that the file created inside a Maildir
# subfolder isn't marked as executable. # subfolder isn't marked as executable.
if not hasattr(os, "stat") or not hasattr(os, "umask"):
return
orig_umask = os.umask(0) orig_umask = os.umask(0)
try: try:
subfolder = self._box.add_folder('subfolder') subfolder = self._box.add_folder('subfolder')
@ -1097,24 +1096,25 @@ class TestMbox(_TestMboxMMDF, unittest.TestCase):
_factory = lambda self, path, factory=None: mailbox.mbox(path, factory) _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
@unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def test_file_perms(self): def test_file_perms(self):
# From bug #3228, we want to verify that the mailbox file isn't executable, # From bug #3228, we want to verify that the mailbox file isn't executable,
# even if the umask is set to something that would leave executable bits set. # even if the umask is set to something that would leave executable bits set.
# We only run this test on platforms that support umask. # We only run this test on platforms that support umask.
if hasattr(os, 'umask') and hasattr(os, 'stat'): try:
try: old_umask = os.umask(0o077)
old_umask = os.umask(0o077) self._box.close()
self._box.close() os.unlink(self._path)
os.unlink(self._path) self._box = mailbox.mbox(self._path, create=True)
self._box = mailbox.mbox(self._path, create=True) self._box.add('')
self._box.add('') self._box.close()
self._box.close() finally:
finally: os.umask(old_umask)
os.umask(old_umask)
st = os.stat(self._path) st = os.stat(self._path)
perms = st.st_mode perms = st.st_mode
self.assertFalse((perms & 0o111)) # Execute bits should all be off. self.assertFalse((perms & 0o111)) # Execute bits should all be off.
def test_terminating_newline(self): def test_terminating_newline(self):
message = email.message.Message() message = email.message.Message()

View file

@ -980,38 +980,37 @@ class MathTests(unittest.TestCase):
# still fails this part of the test on some platforms. For now, we only # still fails this part of the test on some platforms. For now, we only
# *run* test_exceptions() in verbose mode, so that this isn't normally # *run* test_exceptions() in verbose mode, so that this isn't normally
# tested. # tested.
@unittest.skipUnless(verbose, 'requires verbose mode')
def test_exceptions(self):
try:
x = math.exp(-1000000000)
except:
# mathmodule.c is failing to weed out underflows from libm, or
# we've got an fp format with huge dynamic range
self.fail("underflowing exp() should not have raised "
"an exception")
if x != 0:
self.fail("underflowing exp() should have returned 0")
if verbose: # If this fails, probably using a strict IEEE-754 conforming libm, and x
def test_exceptions(self): # is +Inf afterwards. But Python wants overflows detected by default.
try: try:
x = math.exp(-1000000000) x = math.exp(1000000000)
except: except OverflowError:
# mathmodule.c is failing to weed out underflows from libm, or pass
# we've got an fp format with huge dynamic range else:
self.fail("underflowing exp() should not have raised " self.fail("overflowing exp() didn't trigger OverflowError")
"an exception")
if x != 0:
self.fail("underflowing exp() should have returned 0")
# If this fails, probably using a strict IEEE-754 conforming libm, and x # If this fails, it could be a puzzle. One odd possibility is that
# is +Inf afterwards. But Python wants overflows detected by default. # mathmodule.c's macros are getting confused while comparing
try: # Inf (HUGE_VAL) to a NaN, and artificially setting errno to ERANGE
x = math.exp(1000000000) # as a result (and so raising OverflowError instead).
except OverflowError: try:
pass x = math.sqrt(-1.0)
else: except ValueError:
self.fail("overflowing exp() didn't trigger OverflowError") pass
else:
# If this fails, it could be a puzzle. One odd possibility is that self.fail("sqrt(-1) didn't raise ValueError")
# mathmodule.c's macros are getting confused while comparing
# Inf (HUGE_VAL) to a NaN, and artificially setting errno to ERANGE
# as a result (and so raising OverflowError instead).
try:
x = math.sqrt(-1.0)
except ValueError:
pass
else:
self.fail("sqrt(-1) didn't raise ValueError")
@requires_IEEE_754 @requires_IEEE_754
def test_testfile(self): def test_testfile(self):

View file

@ -314,26 +314,25 @@ class MmapTests(unittest.TestCase):
mf.close() mf.close()
f.close() f.close()
@unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()")
def test_entire_file(self): def test_entire_file(self):
# test mapping of entire file by passing 0 for map length # test mapping of entire file by passing 0 for map length
if hasattr(os, "stat"): f = open(TESTFN, "wb+")
f = open(TESTFN, "wb+")
f.write(2**16 * b'm') # Arbitrary character f.write(2**16 * b'm') # Arbitrary character
f.close() f.close()
f = open(TESTFN, "rb+") f = open(TESTFN, "rb+")
mf = mmap.mmap(f.fileno(), 0) mf = mmap.mmap(f.fileno(), 0)
self.assertEqual(len(mf), 2**16, "Map size should equal file size.") self.assertEqual(len(mf), 2**16, "Map size should equal file size.")
self.assertEqual(mf.read(2**16), 2**16 * b"m") self.assertEqual(mf.read(2**16), 2**16 * b"m")
mf.close() mf.close()
f.close() f.close()
@unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()")
def test_length_0_offset(self): def test_length_0_offset(self):
# Issue #10916: test mapping of remainder of file by passing 0 for # Issue #10916: test mapping of remainder of file by passing 0 for
# map length with an offset doesn't cause a segfault. # map length with an offset doesn't cause a segfault.
if not hasattr(os, "stat"):
self.skipTest("needs os.stat")
# NOTE: allocation granularity is currently 65536 under Win64, # NOTE: allocation granularity is currently 65536 under Win64,
# and therefore the minimum offset alignment. # and therefore the minimum offset alignment.
with open(TESTFN, "wb") as f: with open(TESTFN, "wb") as f:
@ -343,12 +342,10 @@ class MmapTests(unittest.TestCase):
with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf: with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf:
self.assertRaises(IndexError, mf.__getitem__, 80000) self.assertRaises(IndexError, mf.__getitem__, 80000)
@unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()")
def test_length_0_large_offset(self): def test_length_0_large_offset(self):
# Issue #10959: test mapping of a file by passing 0 for # Issue #10959: test mapping of a file by passing 0 for
# map length with a large offset doesn't cause a segfault. # map length with a large offset doesn't cause a segfault.
if not hasattr(os, "stat"):
self.skipTest("needs os.stat")
with open(TESTFN, "wb") as f: with open(TESTFN, "wb") as f:
f.write(115699 * b'm') # Arbitrary character f.write(115699 * b'm') # Arbitrary character
@ -560,9 +557,8 @@ class MmapTests(unittest.TestCase):
return mmap.mmap.__new__(klass, -1, *args, **kwargs) return mmap.mmap.__new__(klass, -1, *args, **kwargs)
anon_mmap(PAGESIZE) anon_mmap(PAGESIZE)
@unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ")
def test_prot_readonly(self): def test_prot_readonly(self):
if not hasattr(mmap, 'PROT_READ'):
return
mapsize = 10 mapsize = 10
with open(TESTFN, "wb") as fp: with open(TESTFN, "wb") as fp:
fp.write(b"a"*mapsize) fp.write(b"a"*mapsize)
@ -616,67 +612,69 @@ class MmapTests(unittest.TestCase):
self.assertEqual(m.read_byte(), b) self.assertEqual(m.read_byte(), b)
m.close() m.close()
if os.name == 'nt': @unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_tagname(self): def test_tagname(self):
data1 = b"0123456789" data1 = b"0123456789"
data2 = b"abcdefghij" data2 = b"abcdefghij"
assert len(data1) == len(data2) assert len(data1) == len(data2)
# Test same tag # Test same tag
m1 = mmap.mmap(-1, len(data1), tagname="foo") m1 = mmap.mmap(-1, len(data1), tagname="foo")
m1[:] = data1 m1[:] = data1
m2 = mmap.mmap(-1, len(data2), tagname="foo") m2 = mmap.mmap(-1, len(data2), tagname="foo")
m2[:] = data2 m2[:] = data2
self.assertEqual(m1[:], data2) self.assertEqual(m1[:], data2)
self.assertEqual(m2[:], data2) self.assertEqual(m2[:], data2)
m2.close() m2.close()
m1.close() m1.close()
# Test different tag # Test different tag
m1 = mmap.mmap(-1, len(data1), tagname="foo") m1 = mmap.mmap(-1, len(data1), tagname="foo")
m1[:] = data1 m1[:] = data1
m2 = mmap.mmap(-1, len(data2), tagname="boo") m2 = mmap.mmap(-1, len(data2), tagname="boo")
m2[:] = data2 m2[:] = data2
self.assertEqual(m1[:], data1) self.assertEqual(m1[:], data1)
self.assertEqual(m2[:], data2) self.assertEqual(m2[:], data2)
m2.close() m2.close()
m1.close() m1.close()
def test_crasher_on_windows(self): @unittest.skipUnless(os.name == 'nt', 'requires Windows')
# Should not crash (Issue 1733986) def test_crasher_on_windows(self):
m = mmap.mmap(-1, 1000, tagname="foo") # Should not crash (Issue 1733986)
try: m = mmap.mmap(-1, 1000, tagname="foo")
mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size try:
except: mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size
pass except:
m.close() pass
m.close()
# Should not crash (Issue 5385) # Should not crash (Issue 5385)
with open(TESTFN, "wb") as fp: with open(TESTFN, "wb") as fp:
fp.write(b"x"*10) fp.write(b"x"*10)
f = open(TESTFN, "r+b") f = open(TESTFN, "r+b")
m = mmap.mmap(f.fileno(), 0) m = mmap.mmap(f.fileno(), 0)
f.close() f.close()
try: try:
m.resize(0) # will raise WindowsError m.resize(0) # will raise WindowsError
except: except:
pass pass
try: try:
m[:] m[:]
except: except:
pass pass
m.close() m.close()
def test_invalid_descriptor(self): @unittest.skipUnless(os.name == 'nt', 'requires Windows')
# socket file descriptors are valid, but out of range def test_invalid_descriptor(self):
# for _get_osfhandle, causing a crash when validating the # socket file descriptors are valid, but out of range
# parameters to _get_osfhandle. # for _get_osfhandle, causing a crash when validating the
s = socket.socket() # parameters to _get_osfhandle.
try: s = socket.socket()
with self.assertRaises(mmap.error): try:
m = mmap.mmap(s.fileno(), 10) with self.assertRaises(mmap.error):
finally: m = mmap.mmap(s.fileno(), 10)
s.close() finally:
s.close()
def test_context_manager(self): def test_context_manager(self):
with mmap.mmap(-1, 10) as m: with mmap.mmap(-1, 10) as m:

View file

@ -6,10 +6,12 @@ import unittest
import functools import functools
import contextlib import contextlib
from test import support from test import support
from nntplib import NNTP, GroupInfo, _have_ssl from nntplib import NNTP, GroupInfo
import nntplib import nntplib
if _have_ssl: try:
import ssl import ssl
except ImportError:
ssl = None
TIMEOUT = 30 TIMEOUT = 30
@ -199,23 +201,23 @@ class NetworkedNNTPTestsMixin:
resp, caps = self.server.capabilities() resp, caps = self.server.capabilities()
_check_caps(caps) _check_caps(caps)
if _have_ssl: @unittest.skipUnless(ssl, 'requires SSL support')
def test_starttls(self): def test_starttls(self):
file = self.server.file file = self.server.file
sock = self.server.sock sock = self.server.sock
try: try:
self.server.starttls() self.server.starttls()
except nntplib.NNTPPermanentError: except nntplib.NNTPPermanentError:
self.skipTest("STARTTLS not supported by server.") self.skipTest("STARTTLS not supported by server.")
else: else:
# Check that the socket and internal pseudo-file really were # Check that the socket and internal pseudo-file really were
# changed. # changed.
self.assertNotEqual(file, self.server.file) self.assertNotEqual(file, self.server.file)
self.assertNotEqual(sock, self.server.sock) self.assertNotEqual(sock, self.server.sock)
# Check that the new socket really is an SSL one # Check that the new socket really is an SSL one
self.assertIsInstance(self.server.sock, ssl.SSLSocket) self.assertIsInstance(self.server.sock, ssl.SSLSocket)
# Check that trying starttls when it's already active fails. # Check that trying starttls when it's already active fails.
self.assertRaises(ValueError, self.server.starttls) self.assertRaises(ValueError, self.server.starttls)
def test_zlogin(self): def test_zlogin(self):
# This test must be the penultimate because further commands will be # This test must be the penultimate because further commands will be
@ -300,25 +302,24 @@ class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
if cls.server is not None: if cls.server is not None:
cls.server.quit() cls.server.quit()
@unittest.skipUnless(ssl, 'requires SSL support')
class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
if _have_ssl: # Technical limits for this public NNTP server (see http://www.aioe.org):
class NetworkedNNTP_SSLTests(NetworkedNNTPTests): # "Only two concurrent connections per IP address are allowed and
# 400 connections per day are accepted from each IP address."
# Technical limits for this public NNTP server (see http://www.aioe.org): NNTP_HOST = 'nntp.aioe.org'
# "Only two concurrent connections per IP address are allowed and GROUP_NAME = 'comp.lang.python'
# 400 connections per day are accepted from each IP address." GROUP_PAT = 'comp.lang.*'
NNTP_HOST = 'nntp.aioe.org' NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)
GROUP_NAME = 'comp.lang.python'
GROUP_PAT = 'comp.lang.*'
NNTP_CLASS = nntplib.NNTP_SSL # Disabled as it produces too much data
test_list = None
# Disabled as it produces too much data # Disabled as the connection will already be encrypted.
test_list = None test_starttls = None
# Disabled as the connection will already be encrypted.
test_starttls = None
# #
@ -1407,12 +1408,13 @@ class MiscTests(unittest.TestCase):
gives(2000, 6, 23, "000623", "000000") gives(2000, 6, 23, "000623", "000000")
gives(2010, 6, 5, "100605", "000000") gives(2010, 6, 5, "100605", "000000")
@unittest.skipUnless(ssl, 'requires SSL support')
def test_ssl_support(self):
self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
def test_main(): def test_main():
tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, CapsAfterLoginNNTPv2Tests, tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, CapsAfterLoginNNTPv2Tests,
SendReaderNNTPv2Tests, NetworkedNNTPTests] SendReaderNNTPv2Tests, NetworkedNNTPTests, NetworkedNNTP_SSLTests]
if _have_ssl:
tests.append(NetworkedNNTP_SSLTests)
support.run_unittest(*tests) support.run_unittest(*tests)

View file

@ -178,10 +178,8 @@ class StatAttributeTests(unittest.TestCase):
os.unlink(self.fname) os.unlink(self.fname)
os.rmdir(support.TESTFN) os.rmdir(support.TESTFN)
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def check_stat_attributes(self, fname): def check_stat_attributes(self, fname):
if not hasattr(os, "stat"):
return
result = os.stat(fname) result = os.stat(fname)
# Make sure direct access works # Make sure direct access works
@ -258,10 +256,8 @@ class StatAttributeTests(unittest.TestCase):
warnings.simplefilter("ignore", DeprecationWarning) warnings.simplefilter("ignore", DeprecationWarning)
self.check_stat_attributes(fname) self.check_stat_attributes(fname)
@unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
def test_statvfs_attributes(self): def test_statvfs_attributes(self):
if not hasattr(os, "statvfs"):
return
try: try:
result = os.statvfs(self.fname) result = os.statvfs(self.fname)
except OSError as e: except OSError as e:
@ -450,10 +446,10 @@ class StatAttributeTests(unittest.TestCase):
os.close(dirfd) os.close(dirfd)
self._test_utime_subsecond(set_time) self._test_utime_subsecond(set_time)
# Restrict test to Win32, since there is no guarantee other # Restrict tests to Win32, since there is no guarantee other
# systems support centiseconds # systems support centiseconds
if sys.platform == 'win32': def get_file_system(path):
def get_file_system(path): if sys.platform == 'win32':
root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
import ctypes import ctypes
kernel32 = ctypes.windll.kernel32 kernel32 = ctypes.windll.kernel32
@ -461,38 +457,45 @@ class StatAttributeTests(unittest.TestCase):
if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)): if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
return buf.value return buf.value
if get_file_system(support.TESTFN) == "NTFS": @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
def test_1565150(self): @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
t1 = 1159195039.25 "requires NTFS")
os.utime(self.fname, (t1, t1)) def test_1565150(self):
self.assertEqual(os.stat(self.fname).st_mtime, t1) t1 = 1159195039.25
os.utime(self.fname, (t1, t1))
self.assertEqual(os.stat(self.fname).st_mtime, t1)
def test_large_time(self): @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
t1 = 5000000000 # some day in 2128 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
os.utime(self.fname, (t1, t1)) "requires NTFS")
self.assertEqual(os.stat(self.fname).st_mtime, t1) def test_large_time(self):
t1 = 5000000000 # some day in 2128
os.utime(self.fname, (t1, t1))
self.assertEqual(os.stat(self.fname).st_mtime, t1)
def test_1686475(self): @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
# Verify that an open file can be stat'ed def test_1686475(self):
try: # Verify that an open file can be stat'ed
os.stat(r"c:\pagefile.sys") try:
except WindowsError as e: os.stat(r"c:\pagefile.sys")
if e.errno == 2: # file does not exist; cannot run test except WindowsError as e:
return if e.errno == 2: # file does not exist; cannot run test
self.fail("Could not stat pagefile.sys") return
self.fail("Could not stat pagefile.sys")
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
def test_15261(self): @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
# Verify that stat'ing a closed fd does not cause crash def test_15261(self):
r, w = os.pipe() # Verify that stat'ing a closed fd does not cause crash
try: r, w = os.pipe()
os.stat(r) # should not raise error try:
finally: os.stat(r) # should not raise error
os.close(r) finally:
os.close(w) os.close(r)
with self.assertRaises(OSError) as ctx: os.close(w)
os.stat(r) with self.assertRaises(OSError) as ctx:
self.assertEqual(ctx.exception.errno, errno.EBADF) os.stat(r)
self.assertEqual(ctx.exception.errno, errno.EBADF)
from test import mapping_tests from test import mapping_tests
@ -1127,6 +1130,7 @@ class ExecTests(unittest.TestCase):
self._test_internal_execvpe(bytes) self._test_internal_execvpe(bytes)
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase): class Win32ErrorTests(unittest.TestCase):
def test_rename(self): def test_rename(self):
self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak") self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
@ -1173,63 +1177,63 @@ class TestInvalidFD(unittest.TestCase):
self.fail("%r didn't raise a OSError with a bad file descriptor" self.fail("%r didn't raise a OSError with a bad file descriptor"
% f) % f)
@unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
def test_isatty(self): def test_isatty(self):
if hasattr(os, "isatty"): self.assertEqual(os.isatty(support.make_bad_fd()), False)
self.assertEqual(os.isatty(support.make_bad_fd()), False)
@unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
def test_closerange(self): def test_closerange(self):
if hasattr(os, "closerange"): fd = support.make_bad_fd()
fd = support.make_bad_fd() # Make sure none of the descriptors we are about to close are
# Make sure none of the descriptors we are about to close are # currently valid (issue 6542).
# currently valid (issue 6542). for i in range(10):
for i in range(10): try: os.fstat(fd+i)
try: os.fstat(fd+i) except OSError:
except OSError: pass
pass else:
else: break
break if i < 2:
if i < 2: raise unittest.SkipTest(
raise unittest.SkipTest( "Unable to acquire a range of invalid file descriptors")
"Unable to acquire a range of invalid file descriptors") self.assertEqual(os.closerange(fd, fd + i-1), None)
self.assertEqual(os.closerange(fd, fd + i-1), None)
@unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
def test_dup2(self): def test_dup2(self):
if hasattr(os, "dup2"): self.check(os.dup2, 20)
self.check(os.dup2, 20)
@unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
def test_fchmod(self): def test_fchmod(self):
if hasattr(os, "fchmod"): self.check(os.fchmod, 0)
self.check(os.fchmod, 0)
@unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
def test_fchown(self): def test_fchown(self):
if hasattr(os, "fchown"): self.check(os.fchown, -1, -1)
self.check(os.fchown, -1, -1)
@unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
def test_fpathconf(self): def test_fpathconf(self):
if hasattr(os, "fpathconf"): self.check(os.pathconf, "PC_NAME_MAX")
self.check(os.pathconf, "PC_NAME_MAX") self.check(os.fpathconf, "PC_NAME_MAX")
self.check(os.fpathconf, "PC_NAME_MAX")
@unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
def test_ftruncate(self): def test_ftruncate(self):
if hasattr(os, "ftruncate"): self.check(os.truncate, 0)
self.check(os.truncate, 0) self.check(os.ftruncate, 0)
self.check(os.ftruncate, 0)
@unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
def test_lseek(self): def test_lseek(self):
if hasattr(os, "lseek"): self.check(os.lseek, 0, 0)
self.check(os.lseek, 0, 0)
@unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
def test_read(self): def test_read(self):
if hasattr(os, "read"): self.check(os.read, 1)
self.check(os.read, 1)
@unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
def test_tcsetpgrpt(self): def test_tcsetpgrpt(self):
if hasattr(os, "tcsetpgrp"): self.check(os.tcsetpgrp, 0)
self.check(os.tcsetpgrp, 0)
@unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
def test_write(self): def test_write(self):
if hasattr(os, "write"): self.check(os.write, b" ")
self.check(os.write, b" ")
class LinkTests(unittest.TestCase): class LinkTests(unittest.TestCase):
@ -1269,138 +1273,117 @@ class LinkTests(unittest.TestCase):
self.file2 = self.file1 + "2" self.file2 = self.file1 + "2"
self._test_link(self.file1, self.file2) self._test_link(self.file1, self.file2)
if sys.platform != 'win32': @unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Win32ErrorTests(unittest.TestCase): class PosixUidGidTests(unittest.TestCase):
pass @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
def test_setuid(self):
if os.getuid() != 0:
self.assertRaises(os.error, os.setuid, 0)
self.assertRaises(OverflowError, os.setuid, 1<<32)
class PosixUidGidTests(unittest.TestCase): @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
if hasattr(os, 'setuid'): def test_setgid(self):
def test_setuid(self): if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
if os.getuid() != 0: self.assertRaises(os.error, os.setgid, 0)
self.assertRaises(os.error, os.setuid, 0) self.assertRaises(OverflowError, os.setgid, 1<<32)
self.assertRaises(OverflowError, os.setuid, 1<<32)
if hasattr(os, 'setgid'): @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
def test_setgid(self): def test_seteuid(self):
if os.getuid() != 0 and not HAVE_WHEEL_GROUP: if os.getuid() != 0:
self.assertRaises(os.error, os.setgid, 0) self.assertRaises(os.error, os.seteuid, 0)
self.assertRaises(OverflowError, os.setgid, 1<<32) self.assertRaises(OverflowError, os.seteuid, 1<<32)
if hasattr(os, 'seteuid'): @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
def test_seteuid(self): def test_setegid(self):
if os.getuid() != 0: if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
self.assertRaises(os.error, os.seteuid, 0) self.assertRaises(os.error, os.setegid, 0)
self.assertRaises(OverflowError, os.seteuid, 1<<32) self.assertRaises(OverflowError, os.setegid, 1<<32)
if hasattr(os, 'setegid'): @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
def test_setegid(self): def test_setreuid(self):
if os.getuid() != 0 and not HAVE_WHEEL_GROUP: if os.getuid() != 0:
self.assertRaises(os.error, os.setegid, 0) self.assertRaises(os.error, os.setreuid, 0, 0)
self.assertRaises(OverflowError, os.setegid, 1<<32) self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
if hasattr(os, 'setreuid'): @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
def test_setreuid(self): def test_setregid(self):
if os.getuid() != 0: if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
self.assertRaises(os.error, os.setreuid, 0, 0) self.assertRaises(os.error, os.setregid, 0, 0)
self.assertRaises(OverflowError, os.setreuid, 1<<32, 0) self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
self.assertRaises(OverflowError, os.setreuid, 0, 1<<32) self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
def test_setreuid_neg1(self): @unittest.skipIf(sys.platform == "win32", "Posix specific tests")
# Needs to accept -1. We run this in a subprocess to avoid class Pep383Tests(unittest.TestCase):
# altering the test runner's process state (issue8045). def setUp(self):
subprocess.check_call([ if support.TESTFN_UNENCODABLE:
sys.executable, '-c', self.dir = support.TESTFN_UNENCODABLE
'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) elif support.TESTFN_NONASCII:
self.dir = support.TESTFN_NONASCII
else:
self.dir = support.TESTFN
self.bdir = os.fsencode(self.dir)
if hasattr(os, 'setregid'): bytesfn = []
def test_setregid(self): def add_filename(fn):
if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
self.assertRaises(os.error, os.setregid, 0, 0)
self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
def test_setregid_neg1(self):
# Needs to accept -1. We run this in a subprocess to avoid
# altering the test runner's process state (issue8045).
subprocess.check_call([
sys.executable, '-c',
'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
class Pep383Tests(unittest.TestCase):
def setUp(self):
if support.TESTFN_UNENCODABLE:
self.dir = support.TESTFN_UNENCODABLE
elif support.TESTFN_NONASCII:
self.dir = support.TESTFN_NONASCII
else:
self.dir = support.TESTFN
self.bdir = os.fsencode(self.dir)
bytesfn = []
def add_filename(fn):
try:
fn = os.fsencode(fn)
except UnicodeEncodeError:
return
bytesfn.append(fn)
add_filename(support.TESTFN_UNICODE)
if support.TESTFN_UNENCODABLE:
add_filename(support.TESTFN_UNENCODABLE)
if support.TESTFN_NONASCII:
add_filename(support.TESTFN_NONASCII)
if not bytesfn:
self.skipTest("couldn't create any non-ascii filename")
self.unicodefn = set()
os.mkdir(self.dir)
try: try:
for fn in bytesfn: fn = os.fsencode(fn)
support.create_empty_file(os.path.join(self.bdir, fn)) except UnicodeEncodeError:
fn = os.fsdecode(fn) return
if fn in self.unicodefn: bytesfn.append(fn)
raise ValueError("duplicate filename") add_filename(support.TESTFN_UNICODE)
self.unicodefn.add(fn) if support.TESTFN_UNENCODABLE:
except: add_filename(support.TESTFN_UNENCODABLE)
shutil.rmtree(self.dir) if support.TESTFN_NONASCII:
raise add_filename(support.TESTFN_NONASCII)
if not bytesfn:
self.skipTest("couldn't create any non-ascii filename")
def tearDown(self): self.unicodefn = set()
os.mkdir(self.dir)
try:
for fn in bytesfn:
support.create_empty_file(os.path.join(self.bdir, fn))
fn = os.fsdecode(fn)
if fn in self.unicodefn:
raise ValueError("duplicate filename")
self.unicodefn.add(fn)
except:
shutil.rmtree(self.dir) shutil.rmtree(self.dir)
raise
def test_listdir(self): def tearDown(self):
expected = self.unicodefn shutil.rmtree(self.dir)
found = set(os.listdir(self.dir))
self.assertEqual(found, expected)
# test listdir without arguments
current_directory = os.getcwd()
try:
os.chdir(os.sep)
self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
finally:
os.chdir(current_directory)
def test_open(self): def test_listdir(self):
for fn in self.unicodefn: expected = self.unicodefn
f = open(os.path.join(self.dir, fn), 'rb') found = set(os.listdir(self.dir))
f.close() self.assertEqual(found, expected)
# test listdir without arguments
current_directory = os.getcwd()
try:
os.chdir(os.sep)
self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
finally:
os.chdir(current_directory)
@unittest.skipUnless(hasattr(os, 'statvfs'), def test_open(self):
"need os.statvfs()") for fn in self.unicodefn:
def test_statvfs(self): f = open(os.path.join(self.dir, fn), 'rb')
# issue #9645 f.close()
for fn in self.unicodefn:
# should not fail with file not found error
fullname = os.path.join(self.dir, fn)
os.statvfs(fullname)
def test_stat(self): @unittest.skipUnless(hasattr(os, 'statvfs'),
for fn in self.unicodefn: "need os.statvfs()")
os.stat(os.path.join(self.dir, fn)) def test_statvfs(self):
else: # issue #9645
class PosixUidGidTests(unittest.TestCase): for fn in self.unicodefn:
pass # should not fail with file not found error
class Pep383Tests(unittest.TestCase): fullname = os.path.join(self.dir, fn)
pass os.statvfs(fullname)
def test_stat(self):
for fn in self.unicodefn:
os.stat(os.path.join(self.dir, fn))
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase): class Win32KillTests(unittest.TestCase):
@ -1838,6 +1821,8 @@ class TestSendfile(unittest.TestCase):
SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
not sys.platform.startswith("solaris") and \ not sys.platform.startswith("solaris") and \
not sys.platform.startswith("sunos") not sys.platform.startswith("sunos")
requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
'requires headers and trailers support')
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
@ -1956,52 +1941,54 @@ class TestSendfile(unittest.TestCase):
# --- headers / trailers tests # --- headers / trailers tests
if SUPPORT_HEADERS_TRAILERS: @requires_headers_trailers
def test_headers(self):
def test_headers(self): total_sent = 0
total_sent = 0 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
sent = os.sendfile(self.sockno, self.fileno, 0, 4096, headers=[b"x" * 512])
headers=[b"x" * 512]) total_sent += sent
offset = 4096
nbytes = 4096
while 1:
sent = self.sendfile_wrapper(self.sockno, self.fileno,
offset, nbytes)
if sent == 0:
break
total_sent += sent total_sent += sent
offset = 4096 offset += sent
nbytes = 4096
while 1:
sent = self.sendfile_wrapper(self.sockno, self.fileno,
offset, nbytes)
if sent == 0:
break
total_sent += sent
offset += sent
expected_data = b"x" * 512 + self.DATA expected_data = b"x" * 512 + self.DATA
self.assertEqual(total_sent, len(expected_data)) self.assertEqual(total_sent, len(expected_data))
self.client.close()
self.server.wait()
data = self.server.handler_instance.get_data()
self.assertEqual(hash(data), hash(expected_data))
@requires_headers_trailers
def test_trailers(self):
TESTFN2 = support.TESTFN + "2"
file_data = b"abcdef"
with open(TESTFN2, 'wb') as f:
f.write(file_data)
with open(TESTFN2, 'rb')as f:
self.addCleanup(os.remove, TESTFN2)
os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
trailers=[b"1234"])
self.client.close() self.client.close()
self.server.wait() self.server.wait()
data = self.server.handler_instance.get_data() data = self.server.handler_instance.get_data()
self.assertEqual(hash(data), hash(expected_data)) self.assertEqual(data, b"abcdef1234")
def test_trailers(self): @requires_headers_trailers
TESTFN2 = support.TESTFN + "2" @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
file_data = b"abcdef" 'test needs os.SF_NODISKIO')
with open(TESTFN2, 'wb') as f: def test_flags(self):
f.write(file_data) try:
with open(TESTFN2, 'rb')as f: os.sendfile(self.sockno, self.fileno, 0, 4096,
self.addCleanup(os.remove, TESTFN2) flags=os.SF_NODISKIO)
os.sendfile(self.sockno, f.fileno(), 0, len(file_data), except OSError as err:
trailers=[b"1234"]) if err.errno not in (errno.EBUSY, errno.EAGAIN):
self.client.close() raise
self.server.wait()
data = self.server.handler_instance.get_data()
self.assertEqual(data, b"abcdef1234")
if hasattr(os, "SF_NODISKIO"):
def test_flags(self):
try:
os.sendfile(self.sockno, self.fileno, 0, 4096,
flags=os.SF_NODISKIO)
except OSError as err:
if err.errno not in (errno.EBUSY, errno.EAGAIN):
raise
def supports_extended_attributes(): def supports_extended_attributes():

View file

@ -11,7 +11,7 @@ import os
import time import time
import errno import errno
from unittest import TestCase from unittest import TestCase, skipUnless
from test import support as test_support from test import support as test_support
threading = test_support.import_module('threading') threading = test_support.import_module('threading')
@ -288,35 +288,37 @@ if hasattr(poplib, 'POP3_SSL'):
else: else:
DummyPOP3Handler.handle_read(self) DummyPOP3Handler.handle_read(self)
requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported')
class TestPOP3_SSLClass(TestPOP3Class): @requires_ssl
# repeat previous tests by using poplib.POP3_SSL class TestPOP3_SSLClass(TestPOP3Class):
# repeat previous tests by using poplib.POP3_SSL
def setUp(self): def setUp(self):
self.server = DummyPOP3Server((HOST, PORT)) self.server = DummyPOP3Server((HOST, PORT))
self.server.handler = DummyPOP3_SSLHandler self.server.handler = DummyPOP3_SSLHandler
self.server.start() self.server.start()
self.client = poplib.POP3_SSL(self.server.host, self.server.port) self.client = poplib.POP3_SSL(self.server.host, self.server.port)
def test__all__(self): def test__all__(self):
self.assertIn('POP3_SSL', poplib.__all__) self.assertIn('POP3_SSL', poplib.__all__)
def test_context(self): def test_context(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
self.server.port, keyfile=CERTFILE, context=ctx) self.server.port, keyfile=CERTFILE, context=ctx)
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
self.server.port, certfile=CERTFILE, context=ctx) self.server.port, certfile=CERTFILE, context=ctx)
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
self.server.port, keyfile=CERTFILE, self.server.port, keyfile=CERTFILE,
certfile=CERTFILE, context=ctx) certfile=CERTFILE, context=ctx)
self.client.quit() self.client.quit()
self.client = poplib.POP3_SSL(self.server.host, self.server.port, self.client = poplib.POP3_SSL(self.server.host, self.server.port,
context=ctx) context=ctx)
self.assertIsInstance(self.client.sock, ssl.SSLSocket) self.assertIsInstance(self.client.sock, ssl.SSLSocket)
self.assertIs(self.client.sock.context, ctx) self.assertIs(self.client.sock.context, ctx)
self.assertTrue(self.client.noop().startswith(b'+OK')) self.assertTrue(self.client.noop().startswith(b'+OK'))
class TestTimeouts(TestCase): class TestTimeouts(TestCase):
@ -374,9 +376,8 @@ class TestTimeouts(TestCase):
def test_main(): def test_main():
tests = [TestPOP3Class, TestTimeouts] tests = [TestPOP3Class, TestTimeouts,
if SUPPORTS_SSL: TestPOP3_SSLClass]
tests.append(TestPOP3_SSLClass)
thread_info = test_support.threading_setup() thread_info = test_support.threading_setup()
try: try:
test_support.run_unittest(*tests) test_support.run_unittest(*tests)

View file

@ -54,47 +54,55 @@ class PosixTester(unittest.TestCase):
posix_func() posix_func()
self.assertRaises(TypeError, posix_func, 1) self.assertRaises(TypeError, posix_func, 1)
if hasattr(posix, 'getresuid'): @unittest.skipUnless(hasattr(posix, 'getresuid'),
def test_getresuid(self): 'test needs posix.getresuid()')
user_ids = posix.getresuid() def test_getresuid(self):
self.assertEqual(len(user_ids), 3) user_ids = posix.getresuid()
for val in user_ids: self.assertEqual(len(user_ids), 3)
self.assertGreaterEqual(val, 0) for val in user_ids:
self.assertGreaterEqual(val, 0)
if hasattr(posix, 'getresgid'): @unittest.skipUnless(hasattr(posix, 'getresgid'),
def test_getresgid(self): 'test needs posix.getresgid()')
group_ids = posix.getresgid() def test_getresgid(self):
self.assertEqual(len(group_ids), 3) group_ids = posix.getresgid()
for val in group_ids: self.assertEqual(len(group_ids), 3)
self.assertGreaterEqual(val, 0) for val in group_ids:
self.assertGreaterEqual(val, 0)
if hasattr(posix, 'setresuid'): @unittest.skipUnless(hasattr(posix, 'setresuid'),
def test_setresuid(self): 'test needs posix.setresuid()')
current_user_ids = posix.getresuid() def test_setresuid(self):
self.assertIsNone(posix.setresuid(*current_user_ids)) current_user_ids = posix.getresuid()
# -1 means don't change that value. self.assertIsNone(posix.setresuid(*current_user_ids))
self.assertIsNone(posix.setresuid(-1, -1, -1)) # -1 means don't change that value.
self.assertIsNone(posix.setresuid(-1, -1, -1))
def test_setresuid_exception(self): @unittest.skipUnless(hasattr(posix, 'setresuid'),
# Don't do this test if someone is silly enough to run us as root. 'test needs posix.setresuid()')
current_user_ids = posix.getresuid() def test_setresuid_exception(self):
if 0 not in current_user_ids: # Don't do this test if someone is silly enough to run us as root.
new_user_ids = (current_user_ids[0]+1, -1, -1) current_user_ids = posix.getresuid()
self.assertRaises(OSError, posix.setresuid, *new_user_ids) if 0 not in current_user_ids:
new_user_ids = (current_user_ids[0]+1, -1, -1)
self.assertRaises(OSError, posix.setresuid, *new_user_ids)
if hasattr(posix, 'setresgid'): @unittest.skipUnless(hasattr(posix, 'setresgid'),
def test_setresgid(self): 'test needs posix.setresgid()')
current_group_ids = posix.getresgid() def test_setresgid(self):
self.assertIsNone(posix.setresgid(*current_group_ids)) current_group_ids = posix.getresgid()
# -1 means don't change that value. self.assertIsNone(posix.setresgid(*current_group_ids))
self.assertIsNone(posix.setresgid(-1, -1, -1)) # -1 means don't change that value.
self.assertIsNone(posix.setresgid(-1, -1, -1))
def test_setresgid_exception(self): @unittest.skipUnless(hasattr(posix, 'setresgid'),
# Don't do this test if someone is silly enough to run us as root. 'test needs posix.setresgid()')
current_group_ids = posix.getresgid() def test_setresgid_exception(self):
if 0 not in current_group_ids: # Don't do this test if someone is silly enough to run us as root.
new_group_ids = (current_group_ids[0]+1, -1, -1) current_group_ids = posix.getresgid()
self.assertRaises(OSError, posix.setresgid, *new_group_ids) if 0 not in current_group_ids:
new_group_ids = (current_group_ids[0]+1, -1, -1)
self.assertRaises(OSError, posix.setresgid, *new_group_ids)
@unittest.skipUnless(hasattr(posix, 'initgroups'), @unittest.skipUnless(hasattr(posix, 'initgroups'),
"test needs os.initgroups()") "test needs os.initgroups()")
@ -121,29 +129,32 @@ class PosixTester(unittest.TestCase):
else: else:
self.fail("Expected OSError to be raised by initgroups") self.fail("Expected OSError to be raised by initgroups")
@unittest.skipUnless(hasattr(posix, 'statvfs'),
'test needs posix.statvfs()')
def test_statvfs(self): def test_statvfs(self):
if hasattr(posix, 'statvfs'): self.assertTrue(posix.statvfs(os.curdir))
self.assertTrue(posix.statvfs(os.curdir))
@unittest.skipUnless(hasattr(posix, 'fstatvfs'),
'test needs posix.fstatvfs()')
def test_fstatvfs(self): def test_fstatvfs(self):
if hasattr(posix, 'fstatvfs'): fp = open(support.TESTFN)
fp = open(support.TESTFN) try:
try: self.assertTrue(posix.fstatvfs(fp.fileno()))
self.assertTrue(posix.fstatvfs(fp.fileno())) self.assertTrue(posix.statvfs(fp.fileno()))
self.assertTrue(posix.statvfs(fp.fileno())) finally:
finally: fp.close()
fp.close()
@unittest.skipUnless(hasattr(posix, 'ftruncate'),
'test needs posix.ftruncate()')
def test_ftruncate(self): def test_ftruncate(self):
if hasattr(posix, 'ftruncate'): fp = open(support.TESTFN, 'w+')
fp = open(support.TESTFN, 'w+') try:
try: # we need to have some data to truncate
# we need to have some data to truncate fp.write('test')
fp.write('test') fp.flush()
fp.flush() posix.ftruncate(fp.fileno(), 0)
posix.ftruncate(fp.fileno(), 0) finally:
finally: fp.close()
fp.close()
@unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()") @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()")
def test_truncate(self): def test_truncate(self):
@ -290,30 +301,33 @@ class PosixTester(unittest.TestCase):
finally: finally:
os.close(fd) os.close(fd)
@unittest.skipUnless(hasattr(posix, 'dup'),
'test needs posix.dup()')
def test_dup(self): def test_dup(self):
if hasattr(posix, 'dup'): fp = open(support.TESTFN)
fp = open(support.TESTFN) try:
try: fd = posix.dup(fp.fileno())
fd = posix.dup(fp.fileno()) self.assertIsInstance(fd, int)
self.assertIsInstance(fd, int) os.close(fd)
os.close(fd) finally:
finally: fp.close()
fp.close()
@unittest.skipUnless(hasattr(posix, 'confstr'),
'test needs posix.confstr()')
def test_confstr(self): def test_confstr(self):
if hasattr(posix, 'confstr'): self.assertRaises(ValueError, posix.confstr, "CS_garbage")
self.assertRaises(ValueError, posix.confstr, "CS_garbage") self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True)
self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True)
@unittest.skipUnless(hasattr(posix, 'dup2'),
'test needs posix.dup2()')
def test_dup2(self): def test_dup2(self):
if hasattr(posix, 'dup2'): fp1 = open(support.TESTFN)
fp1 = open(support.TESTFN) fp2 = open(support.TESTFN)
fp2 = open(support.TESTFN) try:
try: posix.dup2(fp1.fileno(), fp2.fileno())
posix.dup2(fp1.fileno(), fp2.fileno()) finally:
finally: fp1.close()
fp1.close() fp2.close()
fp2.close()
@unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC") @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC")
@support.requires_linux_version(2, 6, 23) @support.requires_linux_version(2, 6, 23)
@ -322,65 +336,69 @@ class PosixTester(unittest.TestCase):
self.addCleanup(os.close, fd) self.addCleanup(os.close, fd)
self.assertTrue(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC) self.assertTrue(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC)
@unittest.skipUnless(hasattr(posix, 'O_EXLOCK'),
'test needs posix.O_EXLOCK')
def test_osexlock(self): def test_osexlock(self):
if hasattr(posix, "O_EXLOCK"): fd = os.open(support.TESTFN,
os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)
self.assertRaises(OSError, os.open, support.TESTFN,
os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
os.close(fd)
if hasattr(posix, "O_SHLOCK"):
fd = os.open(support.TESTFN, fd = os.open(support.TESTFN,
os.O_WRONLY|os.O_EXLOCK|os.O_CREAT) os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
self.assertRaises(OSError, os.open, support.TESTFN, self.assertRaises(OSError, os.open, support.TESTFN,
os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
os.close(fd) os.close(fd)
if hasattr(posix, "O_SHLOCK"): @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'),
fd = os.open(support.TESTFN, 'test needs posix.O_SHLOCK')
os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
self.assertRaises(OSError, os.open, support.TESTFN,
os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
os.close(fd)
def test_osshlock(self): def test_osshlock(self):
if hasattr(posix, "O_SHLOCK"): fd1 = os.open(support.TESTFN,
fd1 = os.open(support.TESTFN, os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
fd2 = os.open(support.TESTFN,
os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
os.close(fd2)
os.close(fd1)
if hasattr(posix, "O_EXLOCK"):
fd = os.open(support.TESTFN,
os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
fd2 = os.open(support.TESTFN, self.assertRaises(OSError, os.open, support.TESTFN,
os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
os.close(fd2) os.close(fd)
os.close(fd1)
if hasattr(posix, "O_EXLOCK"):
fd = os.open(support.TESTFN,
os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
self.assertRaises(OSError, os.open, support.TESTFN,
os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
os.close(fd)
@unittest.skipUnless(hasattr(posix, 'fstat'),
'test needs posix.fstat()')
def test_fstat(self): def test_fstat(self):
if hasattr(posix, 'fstat'): fp = open(support.TESTFN)
fp = open(support.TESTFN) try:
try: self.assertTrue(posix.fstat(fp.fileno()))
self.assertTrue(posix.fstat(fp.fileno())) self.assertTrue(posix.stat(fp.fileno()))
self.assertTrue(posix.stat(fp.fileno()))
self.assertRaisesRegex(TypeError, self.assertRaisesRegex(TypeError,
'should be string, bytes or integer, not', 'should be string, bytes or integer, not',
posix.stat, float(fp.fileno())) posix.stat, float(fp.fileno()))
finally: finally:
fp.close() fp.close()
@unittest.skipUnless(hasattr(posix, 'stat'),
'test needs posix.stat()')
def test_stat(self): def test_stat(self):
if hasattr(posix, 'stat'): self.assertTrue(posix.stat(support.TESTFN))
self.assertTrue(posix.stat(support.TESTFN)) self.assertTrue(posix.stat(os.fsencode(support.TESTFN)))
self.assertTrue(posix.stat(os.fsencode(support.TESTFN))) self.assertTrue(posix.stat(bytearray(os.fsencode(support.TESTFN))))
self.assertTrue(posix.stat(bytearray(os.fsencode(support.TESTFN))))
self.assertRaisesRegex(TypeError, self.assertRaisesRegex(TypeError,
'can\'t specify None for path argument', 'can\'t specify None for path argument',
posix.stat, None) posix.stat, None)
self.assertRaisesRegex(TypeError, self.assertRaisesRegex(TypeError,
'should be string, bytes or integer, not', 'should be string, bytes or integer, not',
posix.stat, list(support.TESTFN)) posix.stat, list(support.TESTFN))
self.assertRaisesRegex(TypeError, self.assertRaisesRegex(TypeError,
'should be string, bytes or integer, not', 'should be string, bytes or integer, not',
posix.stat, list(os.fsencode(support.TESTFN))) posix.stat, list(os.fsencode(support.TESTFN)))
@unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()") @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()")
def test_mkfifo(self): def test_mkfifo(self):
@ -495,10 +513,10 @@ class PosixTester(unittest.TestCase):
self._test_all_chown_common(posix.lchown, support.TESTFN, self._test_all_chown_common(posix.lchown, support.TESTFN,
getattr(posix, 'lstat', None)) getattr(posix, 'lstat', None))
@unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()')
def test_chdir(self): def test_chdir(self):
if hasattr(posix, 'chdir'): posix.chdir(os.curdir)
posix.chdir(os.curdir) self.assertRaises(OSError, posix.chdir, support.TESTFN)
self.assertRaises(OSError, posix.chdir, support.TESTFN)
def test_listdir(self): def test_listdir(self):
self.assertTrue(support.TESTFN in posix.listdir(os.curdir)) self.assertTrue(support.TESTFN in posix.listdir(os.curdir))
@ -528,25 +546,26 @@ class PosixTester(unittest.TestCase):
sorted(posix.listdir(f)) sorted(posix.listdir(f))
) )
@unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()')
def test_access(self): def test_access(self):
if hasattr(posix, 'access'): self.assertTrue(posix.access(support.TESTFN, os.R_OK))
self.assertTrue(posix.access(support.TESTFN, os.R_OK))
@unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()')
def test_umask(self): def test_umask(self):
if hasattr(posix, 'umask'): old_mask = posix.umask(0)
old_mask = posix.umask(0) self.assertIsInstance(old_mask, int)
self.assertIsInstance(old_mask, int) posix.umask(old_mask)
posix.umask(old_mask)
@unittest.skipUnless(hasattr(posix, 'strerror'),
'test needs posix.strerror()')
def test_strerror(self): def test_strerror(self):
if hasattr(posix, 'strerror'): self.assertTrue(posix.strerror(0))
self.assertTrue(posix.strerror(0))
@unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()')
def test_pipe(self): def test_pipe(self):
if hasattr(posix, 'pipe'): reader, writer = posix.pipe()
reader, writer = posix.pipe() os.close(reader)
os.close(reader) os.close(writer)
os.close(writer)
@unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
@support.requires_linux_version(2, 6, 27) @support.requires_linux_version(2, 6, 27)
@ -578,15 +597,15 @@ class PosixTester(unittest.TestCase):
self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1) self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1)
self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1) self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1)
@unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()')
def test_utime(self): def test_utime(self):
if hasattr(posix, 'utime'): now = time.time()
now = time.time() posix.utime(support.TESTFN, None)
posix.utime(support.TESTFN, None) self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None))
self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None)) self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None))
self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None)) self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now))
self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now)) posix.utime(support.TESTFN, (int(now), int(now)))
posix.utime(support.TESTFN, (int(now), int(now))) posix.utime(support.TESTFN, (now, now))
posix.utime(support.TESTFN, (now, now))
def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs): def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs):
st = os.stat(target_file) st = os.stat(target_file)
@ -663,6 +682,7 @@ class PosixTester(unittest.TestCase):
self.assertEqual(type(k), item_type) self.assertEqual(type(k), item_type)
self.assertEqual(type(v), item_type) self.assertEqual(type(v), item_type)
@unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()')
def test_getcwd_long_pathnames(self): def test_getcwd_long_pathnames(self):
if hasattr(posix, 'getcwd'): if hasattr(posix, 'getcwd'):
dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef' dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'

View file

@ -625,10 +625,10 @@ class TestSet(TestJointOps, unittest.TestCase):
myset >= myobj myset >= myobj
self.assertTrue(myobj.le_called) self.assertTrue(myobj.le_called)
# C API test only available in a debug build @unittest.skipUnless(hasattr(set, "test_c_api"),
if hasattr(set, "test_c_api"): 'C API test only available in a debug build')
def test_c_api(self): def test_c_api(self):
self.assertEqual(set().test_c_api(), True) self.assertEqual(set().test_c_api(), True)
class SetSubclass(set): class SetSubclass(set):
pass pass

View file

@ -194,37 +194,37 @@ class TestShutil(unittest.TestCase):
self.assertIn(errors[1][2][1].filename, possible_args) self.assertIn(errors[1][2][1].filename, possible_args)
# See bug #1071513 for why we don't run this on cygwin @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()')
# and bug #1076467 for why we don't run this as root. @unittest.skipIf(sys.platform[:6] == 'cygwin',
if (hasattr(os, 'chmod') and sys.platform[:6] != 'cygwin' "This test can't be run on Cygwin (issue #1071513).")
and not (hasattr(os, 'geteuid') and os.geteuid() == 0)): @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
def test_on_error(self): "This test can't be run reliably as root (issue #1076467).")
self.errorState = 0 def test_on_error(self):
os.mkdir(TESTFN) self.errorState = 0
self.addCleanup(shutil.rmtree, TESTFN) os.mkdir(TESTFN)
self.addCleanup(shutil.rmtree, TESTFN)
self.child_file_path = os.path.join(TESTFN, 'a') self.child_file_path = os.path.join(TESTFN, 'a')
self.child_dir_path = os.path.join(TESTFN, 'b') self.child_dir_path = os.path.join(TESTFN, 'b')
support.create_empty_file(self.child_file_path) support.create_empty_file(self.child_file_path)
os.mkdir(self.child_dir_path) os.mkdir(self.child_dir_path)
old_dir_mode = os.stat(TESTFN).st_mode old_dir_mode = os.stat(TESTFN).st_mode
old_child_file_mode = os.stat(self.child_file_path).st_mode old_child_file_mode = os.stat(self.child_file_path).st_mode
old_child_dir_mode = os.stat(self.child_dir_path).st_mode old_child_dir_mode = os.stat(self.child_dir_path).st_mode
# Make unwritable. # Make unwritable.
new_mode = stat.S_IREAD|stat.S_IEXEC new_mode = stat.S_IREAD|stat.S_IEXEC
os.chmod(self.child_file_path, new_mode) os.chmod(self.child_file_path, new_mode)
os.chmod(self.child_dir_path, new_mode) os.chmod(self.child_dir_path, new_mode)
os.chmod(TESTFN, new_mode) os.chmod(TESTFN, new_mode)
self.addCleanup(os.chmod, TESTFN, old_dir_mode) self.addCleanup(os.chmod, TESTFN, old_dir_mode)
self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode) self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode) self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror) shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
# Test whether onerror has actually been called. # Test whether onerror has actually been called.
self.assertEqual(self.errorState, 3, self.assertEqual(self.errorState, 3,
"Expected call to onerror function did not " "Expected call to onerror function did not happen.")
"happen.")
def check_args_to_onerror(self, func, arg, exc): def check_args_to_onerror(self, func, arg, exc):
# test_rmtree_errors deliberately runs rmtree # test_rmtree_errors deliberately runs rmtree
@ -806,38 +806,39 @@ class TestShutil(unittest.TestCase):
finally: finally:
shutil.rmtree(TESTFN, ignore_errors=True) shutil.rmtree(TESTFN, ignore_errors=True)
if hasattr(os, "mkfifo"): # Issue #3002: copyfile and copytree block indefinitely on named pipes
# Issue #3002: copyfile and copytree block indefinitely on named pipes @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
def test_copyfile_named_pipe(self): def test_copyfile_named_pipe(self):
os.mkfifo(TESTFN) os.mkfifo(TESTFN)
try: try:
self.assertRaises(shutil.SpecialFileError, self.assertRaises(shutil.SpecialFileError,
shutil.copyfile, TESTFN, TESTFN2) shutil.copyfile, TESTFN, TESTFN2)
self.assertRaises(shutil.SpecialFileError, self.assertRaises(shutil.SpecialFileError,
shutil.copyfile, __file__, TESTFN) shutil.copyfile, __file__, TESTFN)
finally: finally:
os.remove(TESTFN) os.remove(TESTFN)
@support.skip_unless_symlink @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
def test_copytree_named_pipe(self): @support.skip_unless_symlink
os.mkdir(TESTFN) def test_copytree_named_pipe(self):
os.mkdir(TESTFN)
try:
subdir = os.path.join(TESTFN, "subdir")
os.mkdir(subdir)
pipe = os.path.join(subdir, "mypipe")
os.mkfifo(pipe)
try: try:
subdir = os.path.join(TESTFN, "subdir") shutil.copytree(TESTFN, TESTFN2)
os.mkdir(subdir) except shutil.Error as e:
pipe = os.path.join(subdir, "mypipe") errors = e.args[0]
os.mkfifo(pipe) self.assertEqual(len(errors), 1)
try: src, dst, error_msg = errors[0]
shutil.copytree(TESTFN, TESTFN2) self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
except shutil.Error as e: else:
errors = e.args[0] self.fail("shutil.Error should have been raised")
self.assertEqual(len(errors), 1) finally:
src, dst, error_msg = errors[0] shutil.rmtree(TESTFN, ignore_errors=True)
self.assertEqual("`%s` is a named pipe" % pipe, error_msg) shutil.rmtree(TESTFN2, ignore_errors=True)
else:
self.fail("shutil.Error should have been raised")
finally:
shutil.rmtree(TESTFN, ignore_errors=True)
shutil.rmtree(TESTFN2, ignore_errors=True)
def test_copytree_special_func(self): def test_copytree_special_func(self):

View file

@ -772,16 +772,17 @@ class GeneralModuleTests(unittest.TestCase):
self.assertRaises(TypeError, socket.if_nametoindex, 0) self.assertRaises(TypeError, socket.if_nametoindex, 0)
self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF') self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF')
@unittest.skipUnless(hasattr(sys, 'getrefcount'),
'test needs sys.getrefcount()')
def testRefCountGetNameInfo(self): def testRefCountGetNameInfo(self):
# Testing reference count for getnameinfo # Testing reference count for getnameinfo
if hasattr(sys, "getrefcount"): try:
try: # On some versions, this loses a reference
# On some versions, this loses a reference orig = sys.getrefcount(__name__)
orig = sys.getrefcount(__name__) socket.getnameinfo(__name__,0)
socket.getnameinfo(__name__,0) except TypeError:
except TypeError: if sys.getrefcount(__name__) != orig:
if sys.getrefcount(__name__) != orig: self.fail("socket.getnameinfo loses a reference")
self.fail("socket.getnameinfo loses a reference")
def testInterpreterCrash(self): def testInterpreterCrash(self):
# Making sure getnameinfo doesn't crash the interpreter # Making sure getnameinfo doesn't crash the interpreter
@ -886,17 +887,17 @@ class GeneralModuleTests(unittest.TestCase):
# Check that setting it to an invalid type raises TypeError # Check that setting it to an invalid type raises TypeError
self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
@unittest.skipUnless(hasattr(socket, 'inet_aton'),
'test needs socket.inet_aton()')
def testIPv4_inet_aton_fourbytes(self): def testIPv4_inet_aton_fourbytes(self):
if not hasattr(socket, 'inet_aton'):
return # No inet_aton, nothing to check
# Test that issue1008086 and issue767150 are fixed. # Test that issue1008086 and issue767150 are fixed.
# It must return 4 bytes. # It must return 4 bytes.
self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0')) self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0'))
self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255')) self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255'))
@unittest.skipUnless(hasattr(socket, 'inet_pton'),
'test needs socket.inet_pton()')
def testIPv4toString(self): def testIPv4toString(self):
if not hasattr(socket, 'inet_pton'):
return # No inet_pton() on this platform
from socket import inet_aton as f, inet_pton, AF_INET from socket import inet_aton as f, inet_pton, AF_INET
g = lambda a: inet_pton(AF_INET, a) g = lambda a: inet_pton(AF_INET, a)
@ -925,9 +926,9 @@ class GeneralModuleTests(unittest.TestCase):
assertInvalid(g, '1.2.3.4.5') assertInvalid(g, '1.2.3.4.5')
assertInvalid(g, '::1') assertInvalid(g, '::1')
@unittest.skipUnless(hasattr(socket, 'inet_pton'),
'test needs socket.inet_pton()')
def testIPv6toString(self): def testIPv6toString(self):
if not hasattr(socket, 'inet_pton'):
return # No inet_pton() on this platform
try: try:
from socket import inet_pton, AF_INET6, has_ipv6 from socket import inet_pton, AF_INET6, has_ipv6
if not has_ipv6: if not has_ipv6:
@ -979,9 +980,9 @@ class GeneralModuleTests(unittest.TestCase):
assertInvalid('::1.2.3.4:0') assertInvalid('::1.2.3.4:0')
assertInvalid('0.100.200.0:3:4:5:6:7:8') assertInvalid('0.100.200.0:3:4:5:6:7:8')
@unittest.skipUnless(hasattr(socket, 'inet_ntop'),
'test needs socket.inet_ntop()')
def testStringToIPv4(self): def testStringToIPv4(self):
if not hasattr(socket, 'inet_ntop'):
return # No inet_ntop() on this platform
from socket import inet_ntoa as f, inet_ntop, AF_INET from socket import inet_ntoa as f, inet_ntop, AF_INET
g = lambda a: inet_ntop(AF_INET, a) g = lambda a: inet_ntop(AF_INET, a)
assertInvalid = lambda func,a: self.assertRaises( assertInvalid = lambda func,a: self.assertRaises(
@ -1003,9 +1004,9 @@ class GeneralModuleTests(unittest.TestCase):
assertInvalid(g, b'\x00' * 5) assertInvalid(g, b'\x00' * 5)
assertInvalid(g, b'\x00' * 16) assertInvalid(g, b'\x00' * 16)
@unittest.skipUnless(hasattr(socket, 'inet_ntop'),
'test needs socket.inet_ntop()')
def testStringToIPv6(self): def testStringToIPv6(self):
if not hasattr(socket, 'inet_ntop'):
return # No inet_ntop() on this platform
try: try:
from socket import inet_ntop, AF_INET6, has_ipv6 from socket import inet_ntop, AF_INET6, has_ipv6
if not has_ipv6: if not has_ipv6:
@ -3531,6 +3532,8 @@ class TCPCloserTest(ThreadedTCPSocketTest):
self.cli.connect((HOST, self.port)) self.cli.connect((HOST, self.port))
time.sleep(1.0) time.sleep(1.0)
@unittest.skipUnless(hasattr(socket, 'socketpair'),
'test needs socket.socketpair()')
@unittest.skipUnless(thread, 'Threading required for this test.') @unittest.skipUnless(thread, 'Threading required for this test.')
class BasicSocketPairTest(SocketPairTest): class BasicSocketPairTest(SocketPairTest):
@ -3593,26 +3596,27 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
def _testSetBlocking(self): def _testSetBlocking(self):
pass pass
if hasattr(socket, "SOCK_NONBLOCK"): @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
@support.requires_linux_version(2, 6, 28) 'test needs socket.SOCK_NONBLOCK')
def testInitNonBlocking(self): @support.requires_linux_version(2, 6, 28)
# reinit server socket def testInitNonBlocking(self):
self.serv.close() # reinit server socket
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM | self.serv.close()
socket.SOCK_NONBLOCK) self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM |
self.port = support.bind_port(self.serv) socket.SOCK_NONBLOCK)
self.serv.listen(1) self.port = support.bind_port(self.serv)
# actual testing self.serv.listen(1)
start = time.time() # actual testing
try: start = time.time()
self.serv.accept() try:
except socket.error: self.serv.accept()
pass except socket.error:
end = time.time()
self.assertTrue((end - start) < 1.0, "Error creating with non-blocking mode.")
def _testInitNonBlocking(self):
pass pass
end = time.time()
self.assertTrue((end - start) < 1.0, "Error creating with non-blocking mode.")
def _testInitNonBlocking(self):
pass
def testInheritFlags(self): def testInheritFlags(self):
# Issue #7995: when calling accept() on a listening socket with a # Issue #7995: when calling accept() on a listening socket with a
@ -4302,12 +4306,12 @@ class TCPTimeoutTest(SocketTCPTest):
if not ok: if not ok:
self.fail("accept() returned success when we did not expect it") self.fail("accept() returned success when we did not expect it")
@unittest.skipUnless(hasattr(signal, 'alarm'),
'test needs signal.alarm()')
def testInterruptedTimeout(self): def testInterruptedTimeout(self):
# XXX I don't know how to do this test on MSWindows or any other # XXX I don't know how to do this test on MSWindows or any other
# plaform that doesn't support signal.alarm() or os.kill(), though # plaform that doesn't support signal.alarm() or os.kill(), though
# the bug should have existed on all platforms. # the bug should have existed on all platforms.
if not hasattr(signal, "alarm"):
return # can only test on *nix
self.serv.settimeout(5.0) # must be longer than alarm self.serv.settimeout(5.0) # must be longer than alarm
class Alarm(Exception): class Alarm(Exception):
pass pass
@ -4367,6 +4371,7 @@ class TestExceptions(unittest.TestCase):
self.assertTrue(issubclass(socket.gaierror, socket.error)) self.assertTrue(issubclass(socket.gaierror, socket.error))
self.assertTrue(issubclass(socket.timeout, socket.error)) self.assertTrue(issubclass(socket.timeout, socket.error))
@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
class TestLinuxAbstractNamespace(unittest.TestCase): class TestLinuxAbstractNamespace(unittest.TestCase):
UNIX_PATH_MAX = 108 UNIX_PATH_MAX = 108
@ -4402,6 +4407,7 @@ class TestLinuxAbstractNamespace(unittest.TestCase):
finally: finally:
s.close() s.close()
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX')
class TestUnixDomain(unittest.TestCase): class TestUnixDomain(unittest.TestCase):
def setUp(self): def setUp(self):
@ -4551,10 +4557,10 @@ def isTipcAvailable():
for line in f: for line in f:
if line.startswith("tipc "): if line.startswith("tipc "):
return True return True
if support.verbose:
print("TIPC module is not loaded, please 'sudo modprobe tipc'")
return False return False
@unittest.skipUnless(isTipcAvailable(),
"TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCTest(unittest.TestCase): class TIPCTest(unittest.TestCase):
def testRDM(self): def testRDM(self):
srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
@ -4577,6 +4583,8 @@ class TIPCTest(unittest.TestCase):
self.assertEqual(msg, MSG) self.assertEqual(msg, MSG)
@unittest.skipUnless(isTipcAvailable(),
"TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCThreadableTest(unittest.TestCase, ThreadableTest): class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
def __init__(self, methodName = 'runTest'): def __init__(self, methodName = 'runTest'):
unittest.TestCase.__init__(self, methodName = methodName) unittest.TestCase.__init__(self, methodName = methodName)
@ -4842,15 +4850,10 @@ def test_main():
CloexecConstantTest, CloexecConstantTest,
NonblockConstantTest NonblockConstantTest
]) ])
if hasattr(socket, "socketpair"): tests.append(BasicSocketPairTest)
tests.append(BasicSocketPairTest) tests.append(TestUnixDomain)
if hasattr(socket, "AF_UNIX"): tests.append(TestLinuxAbstractNamespace)
tests.append(TestUnixDomain) tests.extend([TIPCTest, TIPCThreadableTest])
if sys.platform == 'linux':
tests.append(TestLinuxAbstractNamespace)
if isTipcAvailable():
tests.append(TIPCTest)
tests.append(TIPCThreadableTest)
tests.extend([BasicCANTest, CANTest]) tests.extend([BasicCANTest, CANTest])
tests.extend([BasicRDSTest, RDSTest]) tests.extend([BasicRDSTest, RDSTest])
tests.extend([ tests.extend([

View file

@ -27,7 +27,10 @@ TEST_STR = b"hello world\n"
HOST = test.support.HOST HOST = test.support.HOST
HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX") HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
requires_unix_sockets = unittest.skipUnless(HAVE_UNIX_SOCKETS,
'requires Unix sockets')
HAVE_FORKING = hasattr(os, "fork") and os.name != "os2" HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
requires_forking = unittest.skipUnless(HAVE_FORKING, 'requires forking')
def signal_alarm(n): def signal_alarm(n):
"""Call signal.alarm when it exists (i.e. not on Windows).""" """Call signal.alarm when it exists (i.e. not on Windows)."""
@ -189,31 +192,33 @@ class SocketServerTest(unittest.TestCase):
socketserver.StreamRequestHandler, socketserver.StreamRequestHandler,
self.stream_examine) self.stream_examine)
if HAVE_FORKING: @requires_forking
def test_ForkingTCPServer(self): def test_ForkingTCPServer(self):
with simple_subprocess(self): with simple_subprocess(self):
self.run_server(socketserver.ForkingTCPServer, self.run_server(socketserver.ForkingTCPServer,
socketserver.StreamRequestHandler,
self.stream_examine)
if HAVE_UNIX_SOCKETS:
def test_UnixStreamServer(self):
self.run_server(socketserver.UnixStreamServer,
socketserver.StreamRequestHandler, socketserver.StreamRequestHandler,
self.stream_examine) self.stream_examine)
def test_ThreadingUnixStreamServer(self): @requires_unix_sockets
self.run_server(socketserver.ThreadingUnixStreamServer, def test_UnixStreamServer(self):
self.run_server(socketserver.UnixStreamServer,
socketserver.StreamRequestHandler,
self.stream_examine)
@requires_unix_sockets
def test_ThreadingUnixStreamServer(self):
self.run_server(socketserver.ThreadingUnixStreamServer,
socketserver.StreamRequestHandler,
self.stream_examine)
@requires_unix_sockets
@requires_forking
def test_ForkingUnixStreamServer(self):
with simple_subprocess(self):
self.run_server(ForkingUnixStreamServer,
socketserver.StreamRequestHandler, socketserver.StreamRequestHandler,
self.stream_examine) self.stream_examine)
if HAVE_FORKING:
def test_ForkingUnixStreamServer(self):
with simple_subprocess(self):
self.run_server(ForkingUnixStreamServer,
socketserver.StreamRequestHandler,
self.stream_examine)
def test_UDPServer(self): def test_UDPServer(self):
self.run_server(socketserver.UDPServer, self.run_server(socketserver.UDPServer,
socketserver.DatagramRequestHandler, socketserver.DatagramRequestHandler,
@ -224,12 +229,12 @@ class SocketServerTest(unittest.TestCase):
socketserver.DatagramRequestHandler, socketserver.DatagramRequestHandler,
self.dgram_examine) self.dgram_examine)
if HAVE_FORKING: @requires_forking
def test_ForkingUDPServer(self): def test_ForkingUDPServer(self):
with simple_subprocess(self): with simple_subprocess(self):
self.run_server(socketserver.ForkingUDPServer, self.run_server(socketserver.ForkingUDPServer,
socketserver.DatagramRequestHandler, socketserver.DatagramRequestHandler,
self.dgram_examine) self.dgram_examine)
@contextlib.contextmanager @contextlib.contextmanager
def mocked_select_module(self): def mocked_select_module(self):
@ -266,22 +271,24 @@ class SocketServerTest(unittest.TestCase):
# Alas, on Linux (at least) recvfrom() doesn't return a meaningful # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
# client address so this cannot work: # client address so this cannot work:
# if HAVE_UNIX_SOCKETS: # @requires_unix_sockets
# def test_UnixDatagramServer(self): # def test_UnixDatagramServer(self):
# self.run_server(socketserver.UnixDatagramServer, # self.run_server(socketserver.UnixDatagramServer,
# socketserver.DatagramRequestHandler, # socketserver.DatagramRequestHandler,
# self.dgram_examine) # self.dgram_examine)
# #
# def test_ThreadingUnixDatagramServer(self): # @requires_unix_sockets
# self.run_server(socketserver.ThreadingUnixDatagramServer, # def test_ThreadingUnixDatagramServer(self):
# socketserver.DatagramRequestHandler, # self.run_server(socketserver.ThreadingUnixDatagramServer,
# self.dgram_examine) # socketserver.DatagramRequestHandler,
# self.dgram_examine)
# #
# if HAVE_FORKING: # @requires_unix_sockets
# def test_ForkingUnixDatagramServer(self): # @requires_forking
# self.run_server(socketserver.ForkingUnixDatagramServer, # def test_ForkingUnixDatagramServer(self):
# socketserver.DatagramRequestHandler, # self.run_server(socketserver.ForkingUnixDatagramServer,
# self.dgram_examine) # socketserver.DatagramRequestHandler,
# self.dgram_examine)
@reap_threads @reap_threads
def test_shutdown(self): def test_shutdown(self):

View file

@ -291,15 +291,16 @@ class SysModuleTest(unittest.TestCase):
def test_call_tracing(self): def test_call_tracing(self):
self.assertRaises(TypeError, sys.call_tracing, type, 2) self.assertRaises(TypeError, sys.call_tracing, type, 2)
@unittest.skipUnless(hasattr(sys, "setdlopenflags"),
'test needs sys.setdlopenflags()')
def test_dlopenflags(self): def test_dlopenflags(self):
if hasattr(sys, "setdlopenflags"): self.assertTrue(hasattr(sys, "getdlopenflags"))
self.assertTrue(hasattr(sys, "getdlopenflags")) self.assertRaises(TypeError, sys.getdlopenflags, 42)
self.assertRaises(TypeError, sys.getdlopenflags, 42) oldflags = sys.getdlopenflags()
oldflags = sys.getdlopenflags() self.assertRaises(TypeError, sys.setdlopenflags)
self.assertRaises(TypeError, sys.setdlopenflags) sys.setdlopenflags(oldflags+1)
sys.setdlopenflags(oldflags+1) self.assertEqual(sys.getdlopenflags(), oldflags+1)
self.assertEqual(sys.getdlopenflags(), oldflags+1) sys.setdlopenflags(oldflags)
sys.setdlopenflags(oldflags)
@test.support.refcount_test @test.support.refcount_test
def test_refcount(self): def test_refcount(self):

View file

@ -271,11 +271,10 @@ class WarnTests(BaseTest):
finally: finally:
warning_tests.__file__ = filename warning_tests.__file__ = filename
@unittest.skipUnless(hasattr(sys, 'argv'), 'test needs sys.argv')
def test_missing_filename_main_with_argv(self): def test_missing_filename_main_with_argv(self):
# If __file__ is not specified and the caller is __main__ and sys.argv # If __file__ is not specified and the caller is __main__ and sys.argv
# exists, then use sys.argv[0] as the file. # exists, then use sys.argv[0] as the file.
if not hasattr(sys, 'argv'):
return
filename = warning_tests.__file__ filename = warning_tests.__file__
module_name = warning_tests.__name__ module_name = warning_tests.__name__
try: try:

View file

@ -7,6 +7,13 @@ from test.support import bigmemtest, _1G, _4G
zlib = support.import_module('zlib') zlib = support.import_module('zlib')
requires_Compress_copy = unittest.skipUnless(
hasattr(zlib.compressobj(), "copy"),
'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
hasattr(zlib.decompressobj(), "copy"),
'requires Decompress.copy()')
class VersionTestCase(unittest.TestCase): class VersionTestCase(unittest.TestCase):
@ -381,39 +388,39 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
"mode=%i, level=%i") % (sync, level)) "mode=%i, level=%i") % (sync, level))
del obj del obj
@unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
'requires zlib.Z_SYNC_FLUSH')
def test_odd_flush(self): def test_odd_flush(self):
# Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1 # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
import random import random
# Testing on 17K of "random" data
if hasattr(zlib, 'Z_SYNC_FLUSH'): # Create compressor and decompressor objects
# Testing on 17K of "random" data co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
dco = zlib.decompressobj()
# Create compressor and decompressor objects # Try 17K of data
co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) # generate random data stream
dco = zlib.decompressobj() try:
# In 2.3 and later, WichmannHill is the RNG of the bug report
# Try 17K of data gen = random.WichmannHill()
# generate random data stream except AttributeError:
try: try:
# In 2.3 and later, WichmannHill is the RNG of the bug report # 2.2 called it Random
gen = random.WichmannHill() gen = random.Random()
except AttributeError: except AttributeError:
try: # others might simply have a single RNG
# 2.2 called it Random gen = random
gen = random.Random() gen.seed(1)
except AttributeError: data = genblock(1, 17 * 1024, generator=gen)
# others might simply have a single RNG
gen = random
gen.seed(1)
data = genblock(1, 17 * 1024, generator=gen)
# compress, sync-flush, and decompress # compress, sync-flush, and decompress
first = co.compress(data) first = co.compress(data)
second = co.flush(zlib.Z_SYNC_FLUSH) second = co.flush(zlib.Z_SYNC_FLUSH)
expanded = dco.decompress(first + second) expanded = dco.decompress(first + second)
# if decompressed data is different from the input data, choke. # if decompressed data is different from the input data, choke.
self.assertEqual(expanded, data, "17K random source doesn't match") self.assertEqual(expanded, data, "17K random source doesn't match")
def test_empty_flush(self): def test_empty_flush(self):
# Test that calling .flush() on unused objects works. # Test that calling .flush() on unused objects works.
@ -525,67 +532,69 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
data = zlib.compress(input2) data = zlib.compress(input2)
self.assertEqual(dco.flush(), input1[1:]) self.assertEqual(dco.flush(), input1[1:])
if hasattr(zlib.compressobj(), "copy"): @requires_Compress_copy
def test_compresscopy(self): def test_compresscopy(self):
# Test copying a compression object # Test copying a compression object
data0 = HAMLET_SCENE data0 = HAMLET_SCENE
data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii") data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
bufs0 = [] bufs0 = []
bufs0.append(c0.compress(data0)) bufs0.append(c0.compress(data0))
c1 = c0.copy() c1 = c0.copy()
bufs1 = bufs0[:] bufs1 = bufs0[:]
bufs0.append(c0.compress(data0)) bufs0.append(c0.compress(data0))
bufs0.append(c0.flush()) bufs0.append(c0.flush())
s0 = b''.join(bufs0) s0 = b''.join(bufs0)
bufs1.append(c1.compress(data1)) bufs1.append(c1.compress(data1))
bufs1.append(c1.flush()) bufs1.append(c1.flush())
s1 = b''.join(bufs1) s1 = b''.join(bufs1)
self.assertEqual(zlib.decompress(s0),data0+data0) self.assertEqual(zlib.decompress(s0),data0+data0)
self.assertEqual(zlib.decompress(s1),data0+data1) self.assertEqual(zlib.decompress(s1),data0+data1)
def test_badcompresscopy(self): @requires_Compress_copy
# Test copying a compression object in an inconsistent state def test_badcompresscopy(self):
c = zlib.compressobj() # Test copying a compression object in an inconsistent state
c.compress(HAMLET_SCENE) c = zlib.compressobj()
c.flush() c.compress(HAMLET_SCENE)
self.assertRaises(ValueError, c.copy) c.flush()
self.assertRaises(ValueError, c.copy)
if hasattr(zlib.decompressobj(), "copy"): @requires_Decompress_copy
def test_decompresscopy(self): def test_decompresscopy(self):
# Test copying a decompression object # Test copying a decompression object
data = HAMLET_SCENE data = HAMLET_SCENE
comp = zlib.compress(data) comp = zlib.compress(data)
# Test type of return value # Test type of return value
self.assertIsInstance(comp, bytes) self.assertIsInstance(comp, bytes)
d0 = zlib.decompressobj() d0 = zlib.decompressobj()
bufs0 = [] bufs0 = []
bufs0.append(d0.decompress(comp[:32])) bufs0.append(d0.decompress(comp[:32]))
d1 = d0.copy() d1 = d0.copy()
bufs1 = bufs0[:] bufs1 = bufs0[:]
bufs0.append(d0.decompress(comp[32:])) bufs0.append(d0.decompress(comp[32:]))
s0 = b''.join(bufs0) s0 = b''.join(bufs0)
bufs1.append(d1.decompress(comp[32:])) bufs1.append(d1.decompress(comp[32:]))
s1 = b''.join(bufs1) s1 = b''.join(bufs1)
self.assertEqual(s0,s1) self.assertEqual(s0,s1)
self.assertEqual(s0,data) self.assertEqual(s0,data)
def test_baddecompresscopy(self): @requires_Decompress_copy
# Test copying a compression object in an inconsistent state def test_baddecompresscopy(self):
data = zlib.compress(HAMLET_SCENE) # Test copying a compression object in an inconsistent state
d = zlib.decompressobj() data = zlib.compress(HAMLET_SCENE)
d.decompress(data) d = zlib.decompressobj()
d.flush() d.decompress(data)
self.assertRaises(ValueError, d.copy) d.flush()
self.assertRaises(ValueError, d.copy)
# Memory use of the following functions takes into account overallocation # Memory use of the following functions takes into account overallocation

View file

@ -29,6 +29,8 @@ Library
Tests Tests
----- -----
- Issue #18702: All skipped tests now reported as skipped.
- Issue #19085: Added basic tests for all tkinter widget options. - Issue #19085: Added basic tests for all tkinter widget options.