Merged revisions 74638,76133 via svnmerge from

svn+ssh://pythondev@svn.python.org/python/branches/py3k

........
  r74638 | jack.diederich | 2009-09-03 16:37:58 -0400 (Thu, 03 Sep 2009) | 4 lines

  - apply issue 6582 to test all the write methods of telnetlib
  - add patch author to ACKS
  - possibly fix test timeouts on non-linux platforms
........
  r76133 | jack.diederich | 2009-11-06 12:20:42 -0500 (Fri, 06 Nov 2009) | 3 lines

  - issue #6748 intermittent test failures from sockets
  - telnetlib tests now use mock sockets for most tests
........
This commit is contained in:
Jack Diederich 2009-11-06 21:14:59 +00:00
parent a954607cd3
commit 1fa0c3f5bf
2 changed files with 157 additions and 224 deletions

View file

@ -1,41 +1,20 @@
import socket import socket
import select
import threading import threading
import telnetlib import telnetlib
import time import time
import queue import contextlib
import sys
import io
from unittest import TestCase from unittest import TestCase
from test import support from test import support
HOST = support.HOST HOST = support.HOST
EOF_sigil = object()
def server(evt, serv, dataq=None): def server(evt, serv):
""" Open a tcp server in three steps
1) set evt to true to let the parent know we are ready
2) [optional] if is not False, write the list of data from dataq.get()
to the socket.
3) set evt to true to let the parent know we're done
"""
serv.listen(5) serv.listen(5)
evt.set() evt.set()
try: try:
conn, addr = serv.accept() conn, addr = serv.accept()
if dataq:
data = b''
new_data = dataq.get(True, 0.5)
dataq.task_done()
for item in new_data:
if item == EOF_sigil:
break
if type(item) in [int, float]:
time.sleep(item)
else:
data += item
written = conn.send(data)
data = data[written:]
except socket.timeout: except socket.timeout:
pass pass
finally: finally:
@ -96,163 +75,159 @@ class GeneralTests(TestCase):
self.assertEqual(telnet.sock.gettimeout(), 30) self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close() telnet.sock.close()
def _read_setUp(self): class SocketStub(object):
self.evt = threading.Event() ''' a socket proxy that re-defines sendall() '''
self.dataq = queue.Queue() def __init__(self, reads=[]):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.reads = reads
self.sock.settimeout(3) self.writes = []
self.port = support.bind_port(self.sock) self.block = False
self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq)) def sendall(self, data):
self.thread.start() self.writes.append(data)
self.evt.wait() def recv(self, size):
self.evt.clear() out = b''
time.sleep(.1) while self.reads and len(out) < size:
out += self.reads.pop(0)
if len(out) > size:
self.reads.insert(0, out[size:])
out = out[:size]
return out
def _read_tearDown(self): class TelnetAlike(telnetlib.Telnet):
self.evt.wait() def fileno(self):
self.thread.join() raise NotImplementedError()
def close(self): pass
def sock_avail(self):
return (not self.sock.block)
def msg(self, msg, *args):
with support.captured_stdout() as out:
telnetlib.Telnet.msg(self, msg, *args)
self._messages += out.getvalue()
return
def new_select(*s_args):
block = False
for l in s_args:
for fob in l:
if isinstance(fob, TelnetAlike):
block = fob.sock.block
if block:
return [[], [], []]
else:
return s_args
@contextlib.contextmanager
def test_socket(reads):
def new_conn(*ignored):
return SocketStub(reads)
try:
old_conn = socket.create_connection
socket.create_connection = new_conn
yield None
finally:
socket.create_connection = old_conn
return
def test_telnet(reads=[], cls=TelnetAlike):
''' return a telnetlib.Telnet object that uses a SocketStub with
reads queued up to be read '''
for x in reads:
assert type(x) is bytes, x
with test_socket(reads):
telnet = cls('dummy', 0)
telnet._messages = '' # debuglevel output
return telnet
class ReadTests(TestCase): class ReadTests(TestCase):
setUp = _read_setUp def setUp(self):
tearDown = _read_tearDown self.old_select = select.select
select.select = new_select
def tearDown(self):
select.select = self.old_select
# use a similar approach to testing timeouts as test_timeout.py def test_read_until(self):
# these will never pass 100% but make the fuzz big enough that it is rare
block_long = 0.6
block_short = 0.3
def test_read_until_A(self):
""" """
read_until(expected, [timeout]) read_until(expected, timeout=None)
Read until the expected string has been seen, or a timeout is test the blocking version of read_util
hit (default is no timeout); may block.
""" """
want = [b'x' * 10, b'match', b'y' * 10, EOF_sigil] want = [b'xxxmatchyyy']
self.dataq.put(want) telnet = test_telnet(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
data = telnet.read_until(b'match') data = telnet.read_until(b'match')
self.assertEqual(data, b''.join(want[:-2])) self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, telnet.rawq, telnet.sock.reads))
def test_read_until_B(self): reads = [b'x' * 50, b'match', b'y' * 50]
# test the timeout - it does NOT raise socket.timeout expect = b''.join(reads[:-1])
want = [b'hello', self.block_long, b'not seen', EOF_sigil] telnet = test_telnet(reads)
self.dataq.put(want) data = telnet.read_until(b'match')
telnet = telnetlib.Telnet(HOST, self.port) self.assertEqual(data, expect)
self.dataq.join()
data = telnet.read_until(b'not seen', self.block_short)
self.assertEqual(data, want[0])
self.assertEqual(telnet.read_all(), b'not seen')
def test_read_all_A(self):
def test_read_all(self):
""" """
read_all() read_all()
Read all data until EOF; may block. Read all data until EOF; may block.
""" """
want = [b'x' * 500, b'y' * 500, b'z' * 500, EOF_sigil] reads = [b'x' * 500, b'y' * 500, b'z' * 500]
self.dataq.put(want) expect = b''.join(reads)
telnet = telnetlib.Telnet(HOST, self.port) telnet = test_telnet(reads)
self.dataq.join()
data = telnet.read_all() data = telnet.read_all()
self.assertEqual(data, b''.join(want[:-1])) self.assertEqual(data, expect)
return return
def _test_blocking(self, func): def test_read_some(self):
self.dataq.put([self.block_long, EOF_sigil])
self.dataq.join()
start = time.time()
data = func()
self.assertTrue(self.block_short <= time.time() - start)
def test_read_all_B(self):
self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
def test_read_all_C(self):
self.dataq.put([EOF_sigil])
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
telnet.read_all()
telnet.read_all() # shouldn't raise
def test_read_some_A(self):
""" """
read_some() read_some()
Read at least one byte or EOF; may block. Read at least one byte or EOF; may block.
""" """
# test 'at least one byte' # test 'at least one byte'
want = [b'x' * 500, EOF_sigil] telnet = test_telnet([b'x' * 500])
self.dataq.put(want) data = telnet.read_some()
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
data = telnet.read_all()
self.assertTrue(len(data) >= 1) self.assertTrue(len(data) >= 1)
def test_read_some_B(self):
# test EOF # test EOF
self.dataq.put([EOF_sigil]) telnet = test_telnet()
telnet = telnetlib.Telnet(HOST, self.port) data = telnet.read_some()
self.dataq.join() self.assertEqual(b'', data)
self.assertEqual(b'', telnet.read_some())
def test_read_some_C(self): def _read_eager(self, func_name):
self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
def _test_read_any_eager_A(self, func_name):
""" """
read_very_eager() read_*_eager()
Read all data available already queued or on the socket, Read all data available already queued or on the socket,
without blocking. without blocking.
""" """
want = [self.block_long, b'x' * 100, b'y' * 100, EOF_sigil] want = b'x' * 100
expects = want[1] + want[2] telnet = test_telnet([want])
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
func = getattr(telnet, func_name) func = getattr(telnet, func_name)
telnet.sock.block = True
self.assertEqual(b'', func())
telnet.sock.block = False
data = b'' data = b''
while True: while True:
try: try:
data += func() data += func()
self.assertTrue(expects.startswith(data))
except EOFError: except EOFError:
break break
self.assertEqual(expects, data) self.assertEqual(data, want)
def _test_read_any_eager_B(self, func_name):
# test EOF
self.dataq.put([EOF_sigil])
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
time.sleep(self.block_short)
func = getattr(telnet, func_name)
self.assertRaises(EOFError, func)
def test_read_eager(self):
# read_eager and read_very_eager make the same gaurantees # read_eager and read_very_eager make the same gaurantees
# (they behave differently but we only test the gaurantees) # (they behave differently but we only test the gaurantees)
def test_read_very_eager_A(self): self._read_eager('read_eager')
self._test_read_any_eager_A('read_very_eager') self._read_eager('read_very_eager')
def test_read_very_eager_B(self): # NB -- we need to test the IAC block which is mentioned in the
self._test_read_any_eager_B('read_very_eager') # docstring but not in the module docs
def test_read_eager_A(self):
self._test_read_any_eager_A('read_eager')
def test_read_eager_B(self):
self._test_read_any_eager_B('read_eager')
# NB -- we need to test the IAC block which is mentioned in the docstring
# but not in the module docs
def _test_read_any_lazy_B(self, func_name): def read_very_lazy(self):
self.dataq.put([EOF_sigil]) want = b'x' * 100
telnet = telnetlib.Telnet(HOST, self.port) telnet = test_telnet([want])
self.dataq.join() self.assertEqual(b'', telnet.read_very_lazy())
func = getattr(telnet, func_name) while telnet.sock.reads:
telnet.fill_rawq() telnet.fill_rawq()
self.assertRaises(EOFError, func) data = telnet.read_very_lazy()
self.assertEqual(want, data)
self.assertRaises(EOFError, telnet.read_very_lazy)
def test_read_lazy_A(self): def test_read_lazy(self):
want = [b'x' * 100, EOF_sigil] want = b'x' * 100
self.dataq.put(want) telnet = test_telnet([want])
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
time.sleep(self.block_short)
self.assertEqual(b'', telnet.read_lazy()) self.assertEqual(b'', telnet.read_lazy())
data = b'' data = b''
while True: while True:
@ -263,35 +238,8 @@ class ReadTests(TestCase):
telnet.fill_rawq() telnet.fill_rawq()
except EOFError: except EOFError:
break break
self.assertTrue(want[0].startswith(data)) self.assertTrue(want.startswith(data))
self.assertEqual(data, want[0]) self.assertEqual(data, want)
def test_read_lazy_B(self):
self._test_read_any_lazy_B('read_lazy')
def test_read_very_lazy_A(self):
want = [b'x' * 100, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
time.sleep(self.block_short)
self.assertEqual(b'', telnet.read_very_lazy())
data = b''
while True:
try:
read_data = telnet.read_very_lazy()
except EOFError:
break
data += read_data
if not read_data:
telnet.fill_rawq()
self.assertEqual(b'', telnet.cookedq)
telnet.process_rawq()
self.assertTrue(want[0].startswith(data))
self.assertEqual(data, want[0])
def test_read_very_lazy_B(self):
self._test_read_any_lazy_B('read_very_lazy')
class nego_collector(object): class nego_collector(object):
def __init__(self, sb_getter=None): def __init__(self, sb_getter=None):
@ -307,31 +255,30 @@ class nego_collector(object):
tl = telnetlib tl = telnetlib
class TelnetDebuglevel(tl.Telnet): class WriteTests(TestCase):
''' Telnet-alike that captures messages written to stdout when '''The only thing that write does is replace each tl.IAC for
debuglevel > 0 tl.IAC+tl.IAC'''
'''
_messages = '' def test_write(self):
def msg(self, msg, *args): data_sample = [b'data sample without IAC',
orig_stdout = sys.stdout b'data sample with' + tl.IAC + b' one IAC',
sys.stdout = fake_stdout = io.StringIO() b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC,
tl.Telnet.msg(self, msg, *args) tl.IAC,
self._messages += fake_stdout.getvalue() b'']
sys.stdout = orig_stdout for data in data_sample:
return telnet = test_telnet()
telnet.write(data)
written = b''.join(telnet.sock.writes)
self.assertEqual(data.replace(tl.IAC,tl.IAC+tl.IAC), written)
class OptionTests(TestCase): class OptionTests(TestCase):
setUp = _read_setUp
tearDown = _read_tearDown
# RFC 854 commands # RFC 854 commands
cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP] cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
def _test_command(self, data): def _test_command(self, data):
""" helper for testing IAC + cmd """ """ helper for testing IAC + cmd """
self.setUp() telnet = test_telnet(data)
self.dataq.put(data) data_len = len(b''.join(data))
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
nego = nego_collector() nego = nego_collector()
telnet.set_option_negotiation_callback(nego.do_nego) telnet.set_option_negotiation_callback(nego.do_nego)
txt = telnet.read_all() txt = telnet.read_all()
@ -339,24 +286,16 @@ class OptionTests(TestCase):
self.assertTrue(len(cmd) > 0) # we expect at least one command self.assertTrue(len(cmd) > 0) # we expect at least one command
self.assertTrue(cmd[:1] in self.cmds) self.assertTrue(cmd[:1] in self.cmds)
self.assertEqual(cmd[1:2], tl.NOOPT) self.assertEqual(cmd[1:2], tl.NOOPT)
self.assertEqual(len(b''.join(data[:-1])), len(txt + cmd)) self.assertEqual(data_len, len(txt + cmd))
nego.sb_getter = None # break the nego => telnet cycle nego.sb_getter = None # break the nego => telnet cycle
self.tearDown()
def test_IAC_commands(self): def test_IAC_commands(self):
# reset our setup
self.dataq.put([EOF_sigil])
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
self.tearDown()
for cmd in self.cmds: for cmd in self.cmds:
self._test_command([tl.IAC, cmd, EOF_sigil]) self._test_command([tl.IAC, cmd])
self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100, EOF_sigil]) self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100])
self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10, EOF_sigil]) self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10])
# all at once # all at once
self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil]) self._test_command([tl.IAC + cmd for (cmd) in self.cmds])
self.assertEqual(b'', telnet.read_sb_data())
def test_SB_commands(self): def test_SB_commands(self):
# RFC 855, subnegotiations portion # RFC 855, subnegotiations portion
@ -365,11 +304,8 @@ class OptionTests(TestCase):
tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE, tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE,
tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE, tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE, tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE,
EOF_sigil,
] ]
self.dataq.put(send) telnet = test_telnet(send)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
nego = nego_collector(telnet.read_sb_data) nego = nego_collector(telnet.read_sb_data)
telnet.set_option_negotiation_callback(nego.do_nego) telnet.set_option_negotiation_callback(nego.do_nego)
txt = telnet.read_all() txt = telnet.read_all()
@ -379,19 +315,7 @@ class OptionTests(TestCase):
self.assertEqual(b'', telnet.read_sb_data()) self.assertEqual(b'', telnet.read_sb_data())
nego.sb_getter = None # break the nego => telnet cycle nego.sb_getter = None # break the nego => telnet cycle
def _test_debuglevel(self, data, expected_msg): def test_debuglevel_reads(self):
""" helper for testing debuglevel messages """
self.setUp()
self.dataq.put(data)
telnet = TelnetDebuglevel(HOST, self.port)
telnet.set_debuglevel(1)
self.dataq.join()
txt = telnet.read_all()
self.assertTrue(expected_msg in telnet._messages,
msg=(telnet._messages, expected_msg))
self.tearDown()
def test_debuglevel(self):
# test all the various places that self.msg(...) is called # test all the various places that self.msg(...) is called
given_a_expect_b = [ given_a_expect_b = [
# Telnet.fill_rawq # Telnet.fill_rawq
@ -402,15 +326,23 @@ class OptionTests(TestCase):
(tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"), (tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"),
(tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"), (tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"),
(tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"), (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"),
# Telnet.write
# XXX, untested
] ]
for a, b in given_a_expect_b: for a, b in given_a_expect_b:
self._test_debuglevel([a, EOF_sigil], b) telnet = test_telnet([a])
telnet.set_debuglevel(1)
txt = telnet.read_all()
self.assertTrue(b in telnet._messages)
return return
def test_debuglevel_write(self):
telnet = test_telnet()
telnet.set_debuglevel(1)
telnet.write(b'xxx')
expected = "send b'xxx'\n"
self.assertTrue(expected in telnet._messages)
def test_main(verbose=None): def test_main(verbose=None):
support.run_unittest(GeneralTests, ReadTests, OptionTests) support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests)
if __name__ == '__main__': if __name__ == '__main__':
test_main() test_main()

View file

@ -774,6 +774,7 @@ Wojtek Walczak
Charles Waldman Charles Waldman
Richard Walker Richard Walker
Larry Wall Larry Wall
Rodrigo Steinmuller Wanderley
Greg Ward Greg Ward
Barry Warsaw Barry Warsaw
Steve Waterbury Steve Waterbury