mirror of
https://github.com/python/cpython.git
synced 2025-09-03 15:31:08 +00:00
Use proper skips and assert* methods in test_asyncore.
This commit is contained in:
parent
f1046ca817
commit
63c4640327
1 changed files with 80 additions and 79 deletions
|
@ -118,65 +118,65 @@ class HelperFunctionTests(unittest.TestCase):
|
||||||
# http://mail.python.org/pipermail/python-list/2001-October/109973.html)
|
# http://mail.python.org/pipermail/python-list/2001-October/109973.html)
|
||||||
# These constants should be present as long as poll is available
|
# These constants should be present as long as poll is available
|
||||||
|
|
||||||
if hasattr(select, 'poll'):
|
@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
|
||||||
def test_readwrite(self):
|
def test_readwrite(self):
|
||||||
# Check that correct methods are called by readwrite()
|
# Check that correct methods are called by readwrite()
|
||||||
|
|
||||||
attributes = ('read', 'expt', 'write', 'closed', 'error_handled')
|
attributes = ('read', 'expt', 'write', 'closed', 'error_handled')
|
||||||
|
|
||||||
expected = (
|
expected = (
|
||||||
(select.POLLIN, 'read'),
|
(select.POLLIN, 'read'),
|
||||||
(select.POLLPRI, 'expt'),
|
(select.POLLPRI, 'expt'),
|
||||||
(select.POLLOUT, 'write'),
|
(select.POLLOUT, 'write'),
|
||||||
(select.POLLERR, 'closed'),
|
(select.POLLERR, 'closed'),
|
||||||
(select.POLLHUP, 'closed'),
|
(select.POLLHUP, 'closed'),
|
||||||
(select.POLLNVAL, 'closed'),
|
(select.POLLNVAL, 'closed'),
|
||||||
)
|
)
|
||||||
|
|
||||||
class testobj:
|
class testobj:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.read = False
|
self.read = False
|
||||||
self.write = False
|
self.write = False
|
||||||
self.closed = False
|
self.closed = False
|
||||||
self.expt = False
|
self.expt = False
|
||||||
self.error_handled = False
|
self.error_handled = False
|
||||||
|
|
||||||
def handle_read_event(self):
|
def handle_read_event(self):
|
||||||
self.read = True
|
self.read = True
|
||||||
|
|
||||||
def handle_write_event(self):
|
def handle_write_event(self):
|
||||||
self.write = True
|
self.write = True
|
||||||
|
|
||||||
def handle_close(self):
|
def handle_close(self):
|
||||||
self.closed = True
|
self.closed = True
|
||||||
|
|
||||||
def handle_expt_event(self):
|
def handle_expt_event(self):
|
||||||
self.expt = True
|
self.expt = True
|
||||||
|
|
||||||
def handle_error(self):
|
def handle_error(self):
|
||||||
self.error_handled = True
|
self.error_handled = True
|
||||||
|
|
||||||
for flag, expectedattr in expected:
|
for flag, expectedattr in expected:
|
||||||
tobj = testobj()
|
tobj = testobj()
|
||||||
self.assertEqual(getattr(tobj, expectedattr), False)
|
self.assertEqual(getattr(tobj, expectedattr), False)
|
||||||
asyncore.readwrite(tobj, flag)
|
asyncore.readwrite(tobj, flag)
|
||||||
|
|
||||||
# Only the attribute modified by the routine we expect to be
|
# Only the attribute modified by the routine we expect to be
|
||||||
# called should be True.
|
# called should be True.
|
||||||
for attr in attributes:
|
for attr in attributes:
|
||||||
self.assertEqual(getattr(tobj, attr), attr==expectedattr)
|
self.assertEqual(getattr(tobj, attr), attr==expectedattr)
|
||||||
|
|
||||||
# check that ExitNow exceptions in the object handler method
|
# check that ExitNow exceptions in the object handler method
|
||||||
# bubbles all the way up through asyncore readwrite call
|
# bubbles all the way up through asyncore readwrite call
|
||||||
tr1 = exitingdummy()
|
tr1 = exitingdummy()
|
||||||
self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
|
self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
|
||||||
|
|
||||||
# check that an exception other than ExitNow in the object handler
|
# check that an exception other than ExitNow in the object handler
|
||||||
# method causes the handle_error method to get called
|
# method causes the handle_error method to get called
|
||||||
tr2 = crashingdummy()
|
tr2 = crashingdummy()
|
||||||
self.assertEqual(tr2.error_handled, False)
|
self.assertEqual(tr2.error_handled, False)
|
||||||
asyncore.readwrite(tr2, flag)
|
asyncore.readwrite(tr2, flag)
|
||||||
self.assertEqual(tr2.error_handled, True)
|
self.assertEqual(tr2.error_handled, True)
|
||||||
|
|
||||||
def test_closeall(self):
|
def test_closeall(self):
|
||||||
self.closeall_check(False)
|
self.closeall_check(False)
|
||||||
|
@ -259,7 +259,7 @@ class DispatcherTests(unittest.TestCase):
|
||||||
sys.stderr = stderr
|
sys.stderr = stderr
|
||||||
|
|
||||||
lines = fp.getvalue().splitlines()
|
lines = fp.getvalue().splitlines()
|
||||||
self.assertEquals(lines, ['log: %s' % l1, 'log: %s' % l2])
|
self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])
|
||||||
|
|
||||||
def test_log_info(self):
|
def test_log_info(self):
|
||||||
d = asyncore.dispatcher()
|
d = asyncore.dispatcher()
|
||||||
|
@ -281,7 +281,7 @@ class DispatcherTests(unittest.TestCase):
|
||||||
lines = fp.getvalue().splitlines()
|
lines = fp.getvalue().splitlines()
|
||||||
expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
|
expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
|
||||||
|
|
||||||
self.assertEquals(lines, expected)
|
self.assertEqual(lines, expected)
|
||||||
|
|
||||||
def test_unhandled(self):
|
def test_unhandled(self):
|
||||||
d = asyncore.dispatcher()
|
d = asyncore.dispatcher()
|
||||||
|
@ -306,7 +306,7 @@ class DispatcherTests(unittest.TestCase):
|
||||||
'warning: unhandled write event',
|
'warning: unhandled write event',
|
||||||
'warning: unhandled connect event',
|
'warning: unhandled connect event',
|
||||||
'warning: unhandled accept event']
|
'warning: unhandled accept event']
|
||||||
self.assertEquals(lines, expected)
|
self.assertEqual(lines, expected)
|
||||||
|
|
||||||
def test_issue_8594(self):
|
def test_issue_8594(self):
|
||||||
# XXX - this test is supposed to be removed in next major Python
|
# XXX - this test is supposed to be removed in next major Python
|
||||||
|
@ -322,7 +322,7 @@ class DispatcherTests(unittest.TestCase):
|
||||||
warnings.simplefilter("always")
|
warnings.simplefilter("always")
|
||||||
family = d.family
|
family = d.family
|
||||||
self.assertEqual(family, socket.AF_INET)
|
self.assertEqual(family, socket.AF_INET)
|
||||||
self.assertTrue(len(w) == 1)
|
self.assertEqual(len(w), 1)
|
||||||
self.assertTrue(issubclass(w[0].category, DeprecationWarning))
|
self.assertTrue(issubclass(w[0].category, DeprecationWarning))
|
||||||
|
|
||||||
def test_strerror(self):
|
def test_strerror(self):
|
||||||
|
@ -331,7 +331,7 @@ class DispatcherTests(unittest.TestCase):
|
||||||
if hasattr(os, 'strerror'):
|
if hasattr(os, 'strerror'):
|
||||||
self.assertEqual(err, os.strerror(errno.EPERM))
|
self.assertEqual(err, os.strerror(errno.EPERM))
|
||||||
err = asyncore._strerror(-1)
|
err = asyncore._strerror(-1)
|
||||||
self.assertTrue("unknown error" in err.lower())
|
self.assertIn("unknown error", err.lower())
|
||||||
|
|
||||||
|
|
||||||
class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
|
class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
|
||||||
|
@ -394,38 +394,39 @@ class DispatcherWithSendTests(unittest.TestCase):
|
||||||
class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
|
class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
|
||||||
usepoll = True
|
usepoll = True
|
||||||
|
|
||||||
if hasattr(asyncore, 'file_wrapper'):
|
@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
|
||||||
class FileWrapperTest(unittest.TestCase):
|
'asyncore.file_wrapper required')
|
||||||
def setUp(self):
|
class FileWrapperTest(unittest.TestCase):
|
||||||
self.d = b"It's not dead, it's sleeping!"
|
def setUp(self):
|
||||||
open(TESTFN, 'wb').write(self.d)
|
self.d = b"It's not dead, it's sleeping!"
|
||||||
|
open(TESTFN, 'wb').write(self.d)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
unlink(TESTFN)
|
unlink(TESTFN)
|
||||||
|
|
||||||
def test_recv(self):
|
def test_recv(self):
|
||||||
fd = os.open(TESTFN, os.O_RDONLY)
|
fd = os.open(TESTFN, os.O_RDONLY)
|
||||||
w = asyncore.file_wrapper(fd)
|
w = asyncore.file_wrapper(fd)
|
||||||
os.close(fd)
|
os.close(fd)
|
||||||
|
|
||||||
self.assertNotEqual(w.fd, fd)
|
self.assertNotEqual(w.fd, fd)
|
||||||
self.assertNotEqual(w.fileno(), fd)
|
self.assertNotEqual(w.fileno(), fd)
|
||||||
self.assertEqual(w.recv(13), b"It's not dead")
|
self.assertEqual(w.recv(13), b"It's not dead")
|
||||||
self.assertEqual(w.read(6), b", it's")
|
self.assertEqual(w.read(6), b", it's")
|
||||||
w.close()
|
w.close()
|
||||||
self.assertRaises(OSError, w.read, 1)
|
self.assertRaises(OSError, w.read, 1)
|
||||||
|
|
||||||
def test_send(self):
|
def test_send(self):
|
||||||
d1 = b"Come again?"
|
d1 = b"Come again?"
|
||||||
d2 = b"I want to buy some cheese."
|
d2 = b"I want to buy some cheese."
|
||||||
fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
|
fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
|
||||||
w = asyncore.file_wrapper(fd)
|
w = asyncore.file_wrapper(fd)
|
||||||
os.close(fd)
|
os.close(fd)
|
||||||
|
|
||||||
w.write(d1)
|
w.write(d1)
|
||||||
w.send(d2)
|
w.send(d2)
|
||||||
w.close()
|
w.close()
|
||||||
self.assertEqual(open(TESTFN, 'rb').read(), self.d + d1 + d2)
|
self.assertEqual(open(TESTFN, 'rb').read(), self.d + d1 + d2)
|
||||||
|
|
||||||
|
|
||||||
class BaseTestHandler(asyncore.dispatcher):
|
class BaseTestHandler(asyncore.dispatcher):
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue