mirror of
https://github.com/python/cpython.git
synced 2025-07-19 09:15:34 +00:00

Passing the port as a string value works fine in regular mode, but if you turned debug on it would throw an error trying to print the port number, which is surprising and confusing.
359 lines
11 KiB
Python
359 lines
11 KiB
Python
import socket
|
|
import select
|
|
import telnetlib
|
|
import time
|
|
import contextlib
|
|
|
|
from unittest import TestCase
|
|
from test import support
|
|
threading = support.import_module('threading')
|
|
|
|
HOST = support.HOST
|
|
|
|
def server(evt, serv):
|
|
serv.listen(5)
|
|
evt.set()
|
|
try:
|
|
conn, addr = serv.accept()
|
|
except socket.timeout:
|
|
pass
|
|
finally:
|
|
serv.close()
|
|
conn.close()
|
|
evt.set()
|
|
|
|
class GeneralTests(TestCase):
|
|
|
|
def setUp(self):
|
|
self.evt = threading.Event()
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.sock.settimeout(3)
|
|
self.port = support.bind_port(self.sock)
|
|
self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
|
|
self.thread.start()
|
|
self.evt.wait()
|
|
self.evt.clear()
|
|
time.sleep(.1)
|
|
|
|
def tearDown(self):
|
|
self.evt.wait()
|
|
self.thread.join()
|
|
|
|
def testBasic(self):
|
|
# connects
|
|
telnet = telnetlib.Telnet(HOST, self.port)
|
|
telnet.sock.close()
|
|
|
|
def testTimeoutDefault(self):
|
|
self.assertTrue(socket.getdefaulttimeout() is None)
|
|
socket.setdefaulttimeout(30)
|
|
try:
|
|
telnet = telnetlib.Telnet("localhost", self.port)
|
|
finally:
|
|
socket.setdefaulttimeout(None)
|
|
self.assertEqual(telnet.sock.gettimeout(), 30)
|
|
telnet.sock.close()
|
|
|
|
def testTimeoutNone(self):
|
|
# None, having other default
|
|
self.assertTrue(socket.getdefaulttimeout() is None)
|
|
socket.setdefaulttimeout(30)
|
|
try:
|
|
telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
|
|
finally:
|
|
socket.setdefaulttimeout(None)
|
|
self.assertTrue(telnet.sock.gettimeout() is None)
|
|
telnet.sock.close()
|
|
|
|
def testTimeoutValue(self):
|
|
telnet = telnetlib.Telnet("localhost", self.port, timeout=30)
|
|
self.assertEqual(telnet.sock.gettimeout(), 30)
|
|
telnet.sock.close()
|
|
|
|
def testTimeoutOpen(self):
|
|
telnet = telnetlib.Telnet()
|
|
telnet.open("localhost", self.port, timeout=30)
|
|
self.assertEqual(telnet.sock.gettimeout(), 30)
|
|
telnet.sock.close()
|
|
|
|
class SocketStub(object):
|
|
''' a socket proxy that re-defines sendall() '''
|
|
def __init__(self, reads=[]):
|
|
self.reads = reads
|
|
self.writes = []
|
|
self.block = False
|
|
def sendall(self, data):
|
|
self.writes.append(data)
|
|
def recv(self, size):
|
|
out = b''
|
|
while self.reads and len(out) < size:
|
|
out += self.reads.pop(0)
|
|
if len(out) > size:
|
|
self.reads.insert(0, out[size:])
|
|
out = out[:size]
|
|
return out
|
|
|
|
class TelnetAlike(telnetlib.Telnet):
|
|
def fileno(self):
|
|
raise NotImplementedError()
|
|
def close(self): pass
|
|
def sock_avail(self):
|
|
return (not self.sock.block)
|
|
def msg(self, msg, *args):
|
|
with support.captured_stdout() as out:
|
|
telnetlib.Telnet.msg(self, msg, *args)
|
|
self._messages += out.getvalue()
|
|
return
|
|
|
|
def new_select(*s_args):
|
|
block = False
|
|
for l in s_args:
|
|
for fob in l:
|
|
if isinstance(fob, TelnetAlike):
|
|
block = fob.sock.block
|
|
if block:
|
|
return [[], [], []]
|
|
else:
|
|
return s_args
|
|
|
|
@contextlib.contextmanager
|
|
def test_socket(reads):
|
|
def new_conn(*ignored):
|
|
return SocketStub(reads)
|
|
try:
|
|
old_conn = socket.create_connection
|
|
socket.create_connection = new_conn
|
|
yield None
|
|
finally:
|
|
socket.create_connection = old_conn
|
|
return
|
|
|
|
def test_telnet(reads=[], cls=TelnetAlike):
|
|
''' return a telnetlib.Telnet object that uses a SocketStub with
|
|
reads queued up to be read '''
|
|
for x in reads:
|
|
assert type(x) is bytes, x
|
|
with test_socket(reads):
|
|
telnet = cls('dummy', 0)
|
|
telnet._messages = '' # debuglevel output
|
|
return telnet
|
|
|
|
class ReadTests(TestCase):
|
|
def setUp(self):
|
|
self.old_select = select.select
|
|
select.select = new_select
|
|
def tearDown(self):
|
|
select.select = self.old_select
|
|
|
|
def test_read_until(self):
|
|
"""
|
|
read_until(expected, timeout=None)
|
|
test the blocking version of read_util
|
|
"""
|
|
want = [b'xxxmatchyyy']
|
|
telnet = test_telnet(want)
|
|
data = telnet.read_until(b'match')
|
|
self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, telnet.rawq, telnet.sock.reads))
|
|
|
|
reads = [b'x' * 50, b'match', b'y' * 50]
|
|
expect = b''.join(reads[:-1])
|
|
telnet = test_telnet(reads)
|
|
data = telnet.read_until(b'match')
|
|
self.assertEqual(data, expect)
|
|
|
|
|
|
def test_read_all(self):
|
|
"""
|
|
read_all()
|
|
Read all data until EOF; may block.
|
|
"""
|
|
reads = [b'x' * 500, b'y' * 500, b'z' * 500]
|
|
expect = b''.join(reads)
|
|
telnet = test_telnet(reads)
|
|
data = telnet.read_all()
|
|
self.assertEqual(data, expect)
|
|
return
|
|
|
|
def test_read_some(self):
|
|
"""
|
|
read_some()
|
|
Read at least one byte or EOF; may block.
|
|
"""
|
|
# test 'at least one byte'
|
|
telnet = test_telnet([b'x' * 500])
|
|
data = telnet.read_some()
|
|
self.assertTrue(len(data) >= 1)
|
|
# test EOF
|
|
telnet = test_telnet()
|
|
data = telnet.read_some()
|
|
self.assertEqual(b'', data)
|
|
|
|
def _read_eager(self, func_name):
|
|
"""
|
|
read_*_eager()
|
|
Read all data available already queued or on the socket,
|
|
without blocking.
|
|
"""
|
|
want = b'x' * 100
|
|
telnet = test_telnet([want])
|
|
func = getattr(telnet, func_name)
|
|
telnet.sock.block = True
|
|
self.assertEqual(b'', func())
|
|
telnet.sock.block = False
|
|
data = b''
|
|
while True:
|
|
try:
|
|
data += func()
|
|
except EOFError:
|
|
break
|
|
self.assertEqual(data, want)
|
|
|
|
def test_read_eager(self):
|
|
# read_eager and read_very_eager make the same gaurantees
|
|
# (they behave differently but we only test the gaurantees)
|
|
self._read_eager('read_eager')
|
|
self._read_eager('read_very_eager')
|
|
# NB -- we need to test the IAC block which is mentioned in the
|
|
# docstring but not in the module docs
|
|
|
|
def read_very_lazy(self):
|
|
want = b'x' * 100
|
|
telnet = test_telnet([want])
|
|
self.assertEqual(b'', telnet.read_very_lazy())
|
|
while telnet.sock.reads:
|
|
telnet.fill_rawq()
|
|
data = telnet.read_very_lazy()
|
|
self.assertEqual(want, data)
|
|
self.assertRaises(EOFError, telnet.read_very_lazy)
|
|
|
|
def test_read_lazy(self):
|
|
want = b'x' * 100
|
|
telnet = test_telnet([want])
|
|
self.assertEqual(b'', telnet.read_lazy())
|
|
data = b''
|
|
while True:
|
|
try:
|
|
read_data = telnet.read_lazy()
|
|
data += read_data
|
|
if not read_data:
|
|
telnet.fill_rawq()
|
|
except EOFError:
|
|
break
|
|
self.assertTrue(want.startswith(data))
|
|
self.assertEqual(data, want)
|
|
|
|
class nego_collector(object):
|
|
def __init__(self, sb_getter=None):
|
|
self.seen = b''
|
|
self.sb_getter = sb_getter
|
|
self.sb_seen = b''
|
|
|
|
def do_nego(self, sock, cmd, opt):
|
|
self.seen += cmd + opt
|
|
if cmd == tl.SE and self.sb_getter:
|
|
sb_data = self.sb_getter()
|
|
self.sb_seen += sb_data
|
|
|
|
tl = telnetlib
|
|
|
|
class WriteTests(TestCase):
|
|
'''The only thing that write does is replace each tl.IAC for
|
|
tl.IAC+tl.IAC'''
|
|
|
|
def test_write(self):
|
|
data_sample = [b'data sample without IAC',
|
|
b'data sample with' + tl.IAC + b' one IAC',
|
|
b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC,
|
|
tl.IAC,
|
|
b'']
|
|
for data in data_sample:
|
|
telnet = test_telnet()
|
|
telnet.write(data)
|
|
written = b''.join(telnet.sock.writes)
|
|
self.assertEqual(data.replace(tl.IAC,tl.IAC+tl.IAC), written)
|
|
|
|
class OptionTests(TestCase):
|
|
# RFC 854 commands
|
|
cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
|
|
|
|
def _test_command(self, data):
|
|
""" helper for testing IAC + cmd """
|
|
telnet = test_telnet(data)
|
|
data_len = len(b''.join(data))
|
|
nego = nego_collector()
|
|
telnet.set_option_negotiation_callback(nego.do_nego)
|
|
txt = telnet.read_all()
|
|
cmd = nego.seen
|
|
self.assertTrue(len(cmd) > 0) # we expect at least one command
|
|
self.assertIn(cmd[:1], self.cmds)
|
|
self.assertEqual(cmd[1:2], tl.NOOPT)
|
|
self.assertEqual(data_len, len(txt + cmd))
|
|
nego.sb_getter = None # break the nego => telnet cycle
|
|
|
|
def test_IAC_commands(self):
|
|
for cmd in self.cmds:
|
|
self._test_command([tl.IAC, cmd])
|
|
self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100])
|
|
self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10])
|
|
# all at once
|
|
self._test_command([tl.IAC + cmd for (cmd) in self.cmds])
|
|
|
|
def test_SB_commands(self):
|
|
# RFC 855, subnegotiations portion
|
|
send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
|
|
tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
|
|
tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE,
|
|
tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
|
|
tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE,
|
|
]
|
|
telnet = test_telnet(send)
|
|
nego = nego_collector(telnet.read_sb_data)
|
|
telnet.set_option_negotiation_callback(nego.do_nego)
|
|
txt = telnet.read_all()
|
|
self.assertEqual(txt, b'')
|
|
want_sb_data = tl.IAC + tl.IAC + b'aabb' + tl.IAC + b'cc' + tl.IAC + b'dd'
|
|
self.assertEqual(nego.sb_seen, want_sb_data)
|
|
self.assertEqual(b'', telnet.read_sb_data())
|
|
nego.sb_getter = None # break the nego => telnet cycle
|
|
|
|
def test_debuglevel_reads(self):
|
|
# test all the various places that self.msg(...) is called
|
|
given_a_expect_b = [
|
|
# Telnet.fill_rawq
|
|
(b'a', ": recv b''\n"),
|
|
# Telnet.process_rawq
|
|
(tl.IAC + bytes([88]), ": IAC 88 not recognized\n"),
|
|
(tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"),
|
|
(tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"),
|
|
(tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"),
|
|
(tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"),
|
|
]
|
|
for a, b in given_a_expect_b:
|
|
telnet = test_telnet([a])
|
|
telnet.set_debuglevel(1)
|
|
txt = telnet.read_all()
|
|
self.assertIn(b, telnet._messages)
|
|
return
|
|
|
|
def test_debuglevel_write(self):
|
|
telnet = test_telnet()
|
|
telnet.set_debuglevel(1)
|
|
telnet.write(b'xxx')
|
|
expected = "send b'xxx'\n"
|
|
self.assertIn(expected, telnet._messages)
|
|
|
|
def test_debug_accepts_str_port(self):
|
|
# Issue 10695
|
|
with test_socket([]):
|
|
telnet = TelnetAlike('dummy', '0')
|
|
telnet._messages = ''
|
|
telnet.set_debuglevel(1)
|
|
telnet.msg('test')
|
|
self.assertRegex(telnet._messages, r'0.*test')
|
|
|
|
|
|
def test_main(verbose=None):
|
|
support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests)
|
|
|
|
if __name__ == '__main__':
|
|
test_main()
|