Merged revisions 83183,83186 via svnmerge from

svn+ssh://pythondev@svn.python.org/python/branches/py3k

........
  r83183 | ezio.melotti | 2010-07-28 01:03:33 +0300 (Wed, 28 Jul 2010) | 1 line

  Use proper skips and assert* methods in test_asyncore.
........
  r83186 | ezio.melotti | 2010-07-28 01:24:13 +0300 (Wed, 28 Jul 2010) | 1 line

  With skipUnless there is no need to add test classes conditionally.
........
This commit is contained in:
Ezio Melotti 2010-07-27 23:45:05 +00:00
parent e8d2eee0fa
commit 3ff4463b35

View file

@ -117,65 +117,65 @@ class HelperFunctionTests(unittest.TestCase):
# http://mail.python.org/pipermail/python-list/2001-October/109973.html)
# These constants should be present as long as poll is available
if hasattr(select, 'poll'):
def test_readwrite(self):
# Check that correct methods are called by readwrite()
@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
def test_readwrite(self):
# Check that correct methods are called by readwrite()
attributes = ('read', 'expt', 'write', 'closed', 'error_handled')
attributes = ('read', 'expt', 'write', 'closed', 'error_handled')
expected = (
(select.POLLIN, 'read'),
(select.POLLPRI, 'expt'),
(select.POLLOUT, 'write'),
(select.POLLERR, 'closed'),
(select.POLLHUP, 'closed'),
(select.POLLNVAL, 'closed'),
)
expected = (
(select.POLLIN, 'read'),
(select.POLLPRI, 'expt'),
(select.POLLOUT, 'write'),
(select.POLLERR, 'closed'),
(select.POLLHUP, 'closed'),
(select.POLLNVAL, 'closed'),
)
class testobj:
def __init__(self):
self.read = False
self.write = False
self.closed = False
self.expt = False
self.error_handled = False
class testobj:
def __init__(self):
self.read = False
self.write = False
self.closed = False
self.expt = False
self.error_handled = False
def handle_read_event(self):
self.read = True
def handle_read_event(self):
self.read = True
def handle_write_event(self):
self.write = True
def handle_write_event(self):
self.write = True
def handle_close(self):
self.closed = True
def handle_close(self):
self.closed = True
def handle_expt_event(self):
self.expt = True
def handle_expt_event(self):
self.expt = True
def handle_error(self):
self.error_handled = True
def handle_error(self):
self.error_handled = True
for flag, expectedattr in expected:
tobj = testobj()
self.assertEqual(getattr(tobj, expectedattr), False)
asyncore.readwrite(tobj, flag)
for flag, expectedattr in expected:
tobj = testobj()
self.assertEqual(getattr(tobj, expectedattr), False)
asyncore.readwrite(tobj, flag)
# Only the attribute modified by the routine we expect to be
# called should be True.
for attr in attributes:
self.assertEqual(getattr(tobj, attr), attr==expectedattr)
# Only the attribute modified by the routine we expect to be
# called should be True.
for attr in attributes:
self.assertEqual(getattr(tobj, attr), attr==expectedattr)
# check that ExitNow exceptions in the object handler method
# bubbles all the way up through asyncore readwrite call
tr1 = exitingdummy()
self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
# check that ExitNow exceptions in the object handler method
# bubbles all the way up through asyncore readwrite call
tr1 = exitingdummy()
self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
# check that an exception other than ExitNow in the object handler
# method causes the handle_error method to get called
tr2 = crashingdummy()
self.assertEqual(tr2.error_handled, False)
asyncore.readwrite(tr2, flag)
self.assertEqual(tr2.error_handled, True)
# check that an exception other than ExitNow in the object handler
# method causes the handle_error method to get called
tr2 = crashingdummy()
self.assertEqual(tr2.error_handled, False)
asyncore.readwrite(tr2, flag)
self.assertEqual(tr2.error_handled, True)
def test_closeall(self):
self.closeall_check(False)
@ -258,7 +258,7 @@ class DispatcherTests(unittest.TestCase):
sys.stderr = stderr
lines = fp.getvalue().splitlines()
self.assertEquals(lines, ['log: %s' % l1, 'log: %s' % l2])
self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])
def test_log_info(self):
d = asyncore.dispatcher()
@ -280,7 +280,7 @@ class DispatcherTests(unittest.TestCase):
lines = fp.getvalue().splitlines()
expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
self.assertEquals(lines, expected)
self.assertEqual(lines, expected)
def test_unhandled(self):
d = asyncore.dispatcher()
@ -305,7 +305,7 @@ class DispatcherTests(unittest.TestCase):
'warning: unhandled write event',
'warning: unhandled connect event',
'warning: unhandled accept event']
self.assertEquals(lines, expected)
self.assertEqual(lines, expected)
def test_issue_8594(self):
# XXX - this test is supposed to be removed in next major Python
@ -321,7 +321,7 @@ class DispatcherTests(unittest.TestCase):
warnings.simplefilter("always")
family = d.family
self.assertEqual(family, socket.AF_INET)
self.assertTrue(len(w) == 1)
self.assertEqual(len(w), 1)
self.assertTrue(issubclass(w[0].category, DeprecationWarning))
def test_strerror(self):
@ -330,7 +330,7 @@ class DispatcherTests(unittest.TestCase):
if hasattr(os, 'strerror'):
self.assertEqual(err, os.strerror(errno.EPERM))
err = asyncore._strerror(-1)
self.assertTrue("unknown error" in err.lower())
self.assertIn("unknown error", err.lower())
class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
@ -393,38 +393,40 @@ class DispatcherWithSendTests(unittest.TestCase):
class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
usepoll = True
if hasattr(asyncore, 'file_wrapper'):
class FileWrapperTest(unittest.TestCase):
def setUp(self):
self.d = "It's not dead, it's sleeping!"
file(TESTFN, 'w').write(self.d)
@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
'asyncore.file_wrapper required')
class FileWrapperTest(unittest.TestCase):
def setUp(self):
self.d = "It's not dead, it's sleeping!"
file(TESTFN, 'w').write(self.d)
def tearDown(self):
unlink(TESTFN)
def tearDown(self):
unlink(TESTFN)
def test_recv(self):
fd = os.open(TESTFN, os.O_RDONLY)
w = asyncore.file_wrapper(fd)
os.close(fd)
def test_recv(self):
fd = os.open(TESTFN, os.O_RDONLY)
w = asyncore.file_wrapper(fd)
os.close(fd)
self.assertNotEqual(w.fd, fd)
self.assertNotEqual(w.fileno(), fd)
self.assertEqual(w.recv(13), "It's not dead")
self.assertEqual(w.read(6), ", it's")
w.close()
self.assertRaises(OSError, w.read, 1)
self.assertNotEqual(w.fd, fd)
self.assertNotEqual(w.fileno(), fd)
self.assertEqual(w.recv(13), "It's not dead")
self.assertEqual(w.read(6), ", it's")
w.close()
self.assertRaises(OSError, w.read, 1)
def test_send(self):
d1 = "Come again?"
d2 = "I want to buy some cheese."
fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
w = asyncore.file_wrapper(fd)
os.close(fd)
w.write(d1)
w.send(d2)
w.close()
self.assertEqual(file(TESTFN).read(), self.d + d1 + d2)
def test_send(self):
d1 = "Come again?"
d2 = "I want to buy some cheese."
fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
w = asyncore.file_wrapper(fd)
os.close(fd)
w.write(d1)
w.send(d2)
w.close()
self.assertEqual(file(TESTFN).read(), self.d + d1 + d2)
class BaseTestHandler(asyncore.dispatcher):
@ -691,18 +693,15 @@ class BaseTestAPI(unittest.TestCase):
class TestAPI_UseSelect(BaseTestAPI):
use_poll = False
@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
class TestAPI_UsePoll(BaseTestAPI):
use_poll = True
def test_main():
tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests,
DispatcherWithSendTests_UsePoll, TestAPI_UseSelect]
if hasattr(asyncore, 'file_wrapper'):
tests.append(FileWrapperTest)
if hasattr(select, 'poll'):
tests.append(TestAPI_UsePoll)
DispatcherWithSendTests_UsePoll, TestAPI_UseSelect,
TestAPI_UsePoll, FileWrapperTest]
run_unittest(*tests)
if __name__ == "__main__":