Fixes Issue #14635: telnetlib will use poll() rather than select() when possible

to avoid failing due to the select() file descriptor limit.

Contributed by Akintayo Holder and under the Google contributor agreement.
This commit is contained in:
Gregory P. Smith 2012-07-15 22:16:06 -07:00
parent 7d8a2e41a0
commit e0c22206e4
4 changed files with 224 additions and 1 deletions

View file

@ -135,6 +135,28 @@ class ReadTests(TestCase):
self.assertEqual(data, want[0])
self.assertEqual(telnet.read_all(), 'not seen')
def test_read_until_with_poll(self):
"""Use select.poll() to implement telnet.read_until()."""
want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
if not telnet._has_poll:
raise unittest.SkipTest('select.poll() is required')
telnet._has_poll = True
self.dataq.join()
data = telnet.read_until('match')
self.assertEqual(data, ''.join(want[:-2]))
def test_read_until_with_select(self):
"""Use select.select() to implement telnet.read_until()."""
want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
telnet._has_poll = False
self.dataq.join()
data = telnet.read_until('match')
self.assertEqual(data, ''.join(want[:-2]))
def test_read_all_A(self):
"""
read_all()
@ -357,8 +379,75 @@ class OptionTests(TestCase):
self.assertEqual('', telnet.read_sb_data())
nego.sb_getter = None # break the nego => telnet cycle
class ExpectTests(TestCase):
def setUp(self):
self.evt = threading.Event()
self.dataq = Queue.Queue()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(10)
self.port = test_support.bind_port(self.sock)
self.thread = threading.Thread(target=server, args=(self.evt,self.sock,
self.dataq))
self.thread.start()
self.evt.wait()
def tearDown(self):
self.thread.join()
# use a similar approach to testing timeouts as test_timeout.py
# these will never pass 100% but make the fuzz big enough that it is rare
block_long = 0.6
block_short = 0.3
def test_expect_A(self):
"""
expect(expected, [timeout])
Read until the expected string has been seen, or a timeout is
hit (default is no timeout); may block.
"""
want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
(_,_,data) = telnet.expect(['match'])
self.assertEqual(data, ''.join(want[:-2]))
def test_expect_B(self):
# test the timeout - it does NOT raise socket.timeout
want = ['hello', self.block_long, 'not seen', EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
(_,_,data) = telnet.expect(['not seen'], self.block_short)
self.assertEqual(data, want[0])
self.assertEqual(telnet.read_all(), 'not seen')
def test_expect_with_poll(self):
"""Use select.poll() to implement telnet.expect()."""
want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
if not telnet._has_poll:
raise unittest.SkipTest('select.poll() is required')
telnet._has_poll = True
self.dataq.join()
(_,_,data) = telnet.expect(['match'])
self.assertEqual(data, ''.join(want[:-2]))
def test_expect_with_select(self):
"""Use select.select() to implement telnet.expect()."""
want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
telnet._has_poll = False
self.dataq.join()
(_,_,data) = telnet.expect(['match'])
self.assertEqual(data, ''.join(want[:-2]))
def test_main(verbose=None):
test_support.run_unittest(GeneralTests, ReadTests, OptionTests)
test_support.run_unittest(GeneralTests, ReadTests, OptionTests,
ExpectTests)
if __name__ == '__main__':
test_main()