New test suite for the socket module by Michael Gilfix.

Changed test_timeout.py to conform to the guidelines in Lib/test/README.
This commit is contained in:
Guido van Rossum 2002-06-12 19:18:08 +00:00
parent e3fdc975c0
commit 24e4af8c72
3 changed files with 481 additions and 175 deletions

View file

@ -1,3 +0,0 @@
test_socket
socket.error
23

View file

@ -1,36 +1,171 @@
# Not tested: #!/usr/bin/env python
# socket.fromfd()
# sktobj.getsockopt()
# sktobj.recvfrom()
# sktobj.sendto()
# sktobj.setblocking()
# sktobj.setsockopt()
# sktobj.shutdown()
import unittest
import test_support
from test_support import verbose, TestFailed
import socket import socket
import os import select
import time import time
import thread, threading
import Queue
def missing_ok(str): PORT = 50007
HOST = 'localhost'
MSG = 'Michael Gilfix was here\n'
class SocketTCPTest(unittest.TestCase):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.serv.bind((HOST, PORT))
self.serv.listen(1)
def tearDown(self):
self.serv.close()
self.serv = None
class SocketUDPTest(unittest.TestCase):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.serv.bind((HOST, PORT))
def tearDown(self):
self.serv.close()
self.serv = None
class ThreadableTest:
def __init__(self):
# Swap the true setup function
self.__setUp = self.setUp
self.__tearDown = self.tearDown
self.setUp = self._setUp
self.tearDown = self._tearDown
def _setUp(self):
self.ready = threading.Event()
self.done = threading.Event()
self.queue = Queue.Queue(1)
# Do some munging to start the client test.
test_method = getattr(self, ''.join(('_', self._TestCase__testMethodName)))
self.client_thread = thread.start_new_thread(self.clientRun, (test_method, ))
self.__setUp()
self.ready.wait()
def _tearDown(self):
self.__tearDown()
self.done.wait()
if not self.queue.empty():
msg = self.queue.get()
self.fail(msg)
def clientRun(self, test_func):
self.ready.set()
self.clientSetUp()
if not callable(test_func):
raise TypeError, "test_func must be a callable function"
try: try:
getattr(socket, str) test_func()
except AttributeError: except Exception, strerror:
pass self.queue.put(strerror)
self.clientTearDown()
try: raise socket.error def clientSetUp(self):
except socket.error: print "socket.error" raise NotImplementedError, "clientSetUp must be implemented."
socket.AF_INET def clientTearDown(self):
self.done.set()
thread.exit()
socket.SOCK_STREAM class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
socket.SOCK_DGRAM
socket.SOCK_RAW
socket.SOCK_RDM
socket.SOCK_SEQPACKET
for optional in ("AF_UNIX", def __init__(self, methodName='runTest'):
SocketTCPTest.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def clientSetUp(self):
self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
def __init__(self, methodName='runTest'):
SocketUDPTest.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def clientSetUp(self):
self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
class SocketConnectedTest(ThreadedTCPSocketTest):
def __init__(self, methodName='runTest'):
ThreadedTCPSocketTest.__init__(self, methodName=methodName)
def setUp(self):
ThreadedTCPSocketTest.setUp(self)
conn, addr = self.serv.accept()
self.cli_conn = conn
def tearDown(self):
self.cli_conn.close()
self.cli_conn = None
ThreadedTCPSocketTest.tearDown(self)
def clientSetUp(self):
ThreadedTCPSocketTest.clientSetUp(self)
self.cli.connect((HOST, PORT))
self.serv_conn = self.cli
def clientTearDown(self):
self.serv_conn.close()
self.serv_conn = None
ThreadedTCPSocketTest.clientTearDown(self)
#######################################################################
## Begin Tests
class GeneralModuleTests(unittest.TestCase):
def testSocketError(self):
"""Testing that socket module exceptions."""
def raise_error(*args, **kwargs):
raise socket.error
def raise_herror(*args, **kwargs):
raise socket.herror
def raise_gaierror(*args, **kwargs):
raise socket.gaierror
self.failUnlessRaises(socket.error, raise_error,
"Error raising socket exception.")
self.failUnlessRaises(socket.error, raise_herror,
"Error raising socket exception.")
self.failUnlessRaises(socket.error, raise_gaierror,
"Error raising socket exception.")
def testCrucialConstants(self):
"""Testing for mission critical constants."""
socket.AF_INET
socket.SOCK_STREAM
socket.SOCK_DGRAM
socket.SOCK_RAW
socket.SOCK_RDM
socket.SOCK_SEQPACKET
socket.SOL_SOCKET
socket.SO_REUSEADDR
def testNonCrucialConstants(self):
"""Testing for existance of non-crucial constants."""
for const in (
"AF_UNIX",
"SO_DEBUG", "SO_ACCEPTCONN", "SO_REUSEADDR", "SO_KEEPALIVE", "SO_DEBUG", "SO_ACCEPTCONN", "SO_REUSEADDR", "SO_KEEPALIVE",
"SO_DONTROUTE", "SO_BROADCAST", "SO_USELOOPBACK", "SO_LINGER", "SO_DONTROUTE", "SO_BROADCAST", "SO_USELOOPBACK", "SO_LINGER",
@ -63,146 +198,318 @@ for optional in ("AF_UNIX",
"IP_MULTICAST_LOOP", "IP_ADD_MEMBERSHIP", "IP_MULTICAST_LOOP", "IP_ADD_MEMBERSHIP",
"IP_DROP_MEMBERSHIP", "IP_DROP_MEMBERSHIP",
): ):
missing_ok(optional)
socktype = socket.SocketType
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
all_host_names = [hname] + aliases
if verbose:
print hostname
print ip
print hname, aliases, ipaddrs
print all_host_names
for name in all_host_names:
if name.find('.'):
break
else:
print 'FQDN not found'
if hasattr(socket, 'getservbyname'):
print socket.getservbyname('telnet', 'tcp')
try: try:
socket.getservbyname('telnet', 'udp') getattr(socket, const)
except socket.error: except AttributeError:
pass pass
import sys def testHostnameRes(self):
if not sys.platform.startswith('java'): """Testing hostname resolution mechanisms."""
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
self.assert_(ip.find('.') >= 0, "Error resolving host to ip.")
hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
all_host_names = [hname] + aliases
fqhn = socket.getfqdn()
if not fqhn in all_host_names:
self.fail("Error testing host resolution mechanisms.")
def testJavaRef(self):
"""Testing reference count for getnameinfo."""
import sys
if not sys.platform.startswith('java'):
try: try:
# On some versions, this loses a reference # On some versions, this loses a reference
orig = sys.getrefcount(__name__) orig = sys.getrefcount(__name__)
socket.getnameinfo(__name__,0) socket.getnameinfo(__name__,0)
except SystemError: except SystemError:
if sys.getrefcount(__name__) <> orig: if sys.getrefcount(__name__) <> orig:
raise TestFailed,"socket.getnameinfo loses a reference" self.fail("socket.getnameinfo loses a reference")
try: def testInterpreterCrash(self):
"""Making sure getnameinfo doesn't crash the interpreter."""
try:
# On some versions, this crashes the interpreter. # On some versions, this crashes the interpreter.
socket.getnameinfo(('x', 0, 0, 0), 0) socket.getnameinfo(('x', 0, 0, 0), 0)
except socket.error: except socket.error:
pass pass
canfork = hasattr(os, 'fork') def testGetServByName(self):
try: """Testing getservbyname."""
PORT = 50007 if hasattr(socket, 'getservbyname'):
msg = 'socket test\n' socket.getservbyname('telnet', 'tcp')
if not canfork or os.fork(): try:
# parent is server socket.getservbyname('telnet', 'udp')
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) except socket.error:
s.bind(("127.0.0.1", PORT)) pass
s.listen(1)
if verbose:
print 'parent accepting'
if canfork:
conn, addr = s.accept()
if verbose:
print 'connected by', addr
# couple of interesting tests while we've got a live socket
f = conn.fileno()
if verbose:
print 'fileno:', f
p = conn.getpeername()
if verbose:
print 'peer:', p
n = conn.getsockname()
if verbose:
print 'sockname:', n
f = conn.makefile()
if verbose:
print 'file obj:', f
data = conn.recv(1024)
if verbose:
print 'received:', data
conn.sendall(data)
# Perform a few tests on the windows file object def testSockName(self):
if verbose: """Testing getsockname()."""
print "Staring _fileobject tests..." sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
f = socket._fileobject (conn, 'rb', 8192) name = sock.getsockname()
first_seg = f.read(7)
second_seg = f.read(5)
if not first_seg == 'socket ' or not second_seg == 'test\n':
print "Error performing read with the python _fileobject class"
os._exit (1)
elif verbose:
print "_fileobject buffered read works"
f.write (data)
f.flush ()
def testGetSockOpt(self):
"""Testing getsockopt()."""
# We know a socket should start without reuse==0
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
self.assert_(reuse == 0, "Error performing getsockopt.")
def testSetSockOpt(self):
"""Testing setsockopt()."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
self.assert_(reuse == 1, "Error performing setsockopt.")
class BasicTCPTest(SocketConnectedTest):
def __init__(self, methodName='runTest'):
SocketConnectedTest.__init__(self, methodName=methodName)
def testRecv(self):
"""Testing large receive over TCP."""
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, MSG, "Error performing recv.")
def _testRecv(self):
self.serv_conn.send(MSG)
def testOverFlowRecv(self):
"""Testing receive in chunks over TCP."""
seg1 = self.cli_conn.recv(len(MSG) - 3)
seg2 = self.cli_conn.recv(1024)
msg = ''.join ((seg1, seg2))
self.assertEqual(msg, MSG, "Error performing recv in chunks.")
def _testOverFlowRecv(self):
self.serv_conn.send(MSG)
def testRecvFrom(self):
"""Testing large recvfrom() over TCP."""
msg, addr = self.cli_conn.recvfrom(1024)
hostname, port = addr
self.assertEqual (hostname, socket.gethostbyname('localhost'),
"Wrong address from recvfrom.")
self.assertEqual(msg, MSG, "Error performing recvfrom.")
def _testRecvFrom(self):
self.serv_conn.send(MSG)
def testOverFlowRecvFrom(self):
"""Testing recvfrom() in chunks over TCP."""
seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
seg2, addr = self.cli_conn.recvfrom(1024)
msg = ''.join((seg1, seg2))
hostname, port = addr
self.assertEqual(hostname, socket.gethostbyname('localhost'),
"Wrong address from recvfrom.")
self.assertEqual(msg, MSG, "Error performing recvfrom in chunks.")
def _testOverFlowRecvFrom(self):
self.serv_conn.send(MSG)
def testSendAll(self):
"""Testing sendall() with a 2048 byte string over TCP."""
while 1:
read = self.cli_conn.recv(1024)
if not read:
break
self.assert_(len(read) == 1024, "Error performing sendall.")
read = filter(lambda x: x == 'f', read)
self.assert_(len(read) == 1024, "Error performing sendall.")
def _testSendAll(self):
big_chunk = ''.join([ 'f' ] * 2048)
self.serv_conn.sendall(big_chunk)
def testFromFd(self):
"""Testing fromfd()."""
fd = self.cli_conn.fileno()
sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
msg = sock.recv(1024)
self.assertEqual(msg, MSG, "Error creating socket using fromfd.")
def _testFromFd(self):
self.serv_conn.send(MSG)
def testShutdown(self):
"""Testing shutdown()."""
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, MSG, "Error testing shutdown.")
def _testShutdown(self):
self.serv_conn.send(MSG)
self.serv_conn.shutdown(2)
class BasicUDPTest(ThreadedUDPSocketTest):
def __init__(self, methodName='runTest'):
ThreadedUDPSocketTest.__init__(self, methodName=methodName)
def testSendtoAndRecv(self):
"""Testing sendto() and Recv() over UDP."""
msg = self.serv.recv(len(MSG))
self.assertEqual(msg, MSG, "Error performing sendto")
def _testSendtoAndRecv(self):
self.cli.sendto(MSG, 0, (HOST, PORT))
def testRecvfrom(self):
"""Testing recfrom() over UDP."""
msg, addr = self.serv.recvfrom(len(MSG))
hostname, port = addr
self.assertEqual(hostname, socket.gethostbyname('localhost'),
"Wrong address from recvfrom.")
self.assertEqual(msg, MSG, "Error performing recvfrom in chunks.")
def _testRecvfrom(self):
self.cli.sendto(MSG, 0, (HOST, PORT))
class NonBlockingTCPTests(ThreadedTCPSocketTest):
def __init__(self, methodName='runTest'):
ThreadedTCPSocketTest.__init__(self, methodName=methodName)
def testSetBlocking(self):
"""Testing whether set blocking works."""
self.serv.setblocking(0)
start = time.time()
try:
self.serv.accept()
except socket.error:
pass
end = time.time()
self.assert_((end - start) < 1.0, "Error setting non-blocking mode.")
def _testSetBlocking(self):
pass
def testAccept(self):
"""Testing non-blocking accept."""
self.serv.setblocking(0)
try:
conn, addr = self.serv.accept()
except socket.error:
pass
else:
self.fail("Error trying to do non-blocking accept.")
read, write, err = select.select([self.serv], [], [])
if self.serv in read:
conn, addr = self.serv.accept()
else:
self.fail("Error trying to do accept after select.")
def _testAccept(self):
time.sleep(1)
self.cli.connect((HOST, PORT))
def testConnect(self):
"""Testing non-blocking connect."""
time.sleep(1)
conn, addr = self.serv.accept()
def _testConnect(self):
self.cli.setblocking(0)
try:
self.cli.connect((HOST, PORT))
except socket.error:
pass
else:
self.fail("Error trying to do non-blocking connect.")
read, write, err = select.select([self.cli], [], [])
if self.cli in read:
self.cli.connect((HOST, PORT))
else:
self.fail("Error trying to do connect after select.")
def testRecv(self):
"""Testing non-blocking recv."""
conn, addr = self.serv.accept()
conn.setblocking(0)
try:
msg = conn.recv(len(MSG))
except socket.error:
pass
else:
self.fail("Error trying to do non-blocking recv.")
read, write, err = select.select([conn], [], [])
if conn in read:
msg = conn.recv(len(MSG))
self.assertEqual(msg, MSG, "Error performing non-blocking recv.")
else:
self.fail("Error during select call to non-blocking socket.")
def _testRecv(self):
self.cli.connect((HOST, PORT))
time.sleep(1)
self.cli.send(MSG)
class FileObjectClassTestCase(SocketConnectedTest):
def __init__(self, methodName='runTest'):
SocketConnectedTest.__init__(self, methodName=methodName)
def setUp(self):
SocketConnectedTest.setUp(self)
self.serv_file = socket._fileobject(self.cli_conn, 'rb', 8192)
def tearDown(self):
self.serv_file.close()
self.serv_file = None
SocketConnectedTest.tearDown(self)
def clientSetUp(self):
SocketConnectedTest.clientSetUp(self)
self.cli_file = socket._fileobject(self.serv_conn, 'rb', 8192)
def clientTearDown(self):
self.cli_file.close()
self.cli_file = None
SocketConnectedTest.clientTearDown(self)
def testSmallRead(self):
"""Performing small file read test."""
first_seg = self.serv_file.read(len(MSG)-3)
second_seg = self.serv_file.read(3)
msg = ''.join((first_seg, second_seg))
self.assertEqual(msg, MSG, "Error performing small read.")
def _testSmallRead(self):
self.cli_file.write(MSG)
self.cli_file.flush()
def testUnbufferedRead(self):
"""Performing unbuffered file read test."""
buf = '' buf = ''
while 1: while 1:
char = f.read(1) char = self.serv_file.read(1)
if not char: self.failIf(not char, "Error performing unbuffered read.")
print "Error performing unbuffered read with the python ", \
"_fileobject class"
os._exit (1)
buf += char buf += char
if buf == msg: if buf == MSG:
if verbose:
print "__fileobject unbuffered read works"
break break
if verbose:
# If we got this far, write() must work as well
print "__fileobject write works"
f.write(buf)
f.flush()
line = f.readline() def _testUnbufferedRead(self):
if not line == msg: self.cli_file.write(MSG)
print "Error perferming readline with the python _fileobject class" self.cli_file.flush()
os._exit (1)
f.write(line)
f.flush()
if verbose:
print "__fileobject readline works"
conn.close() def testReadline(self):
else: """Performing file readline test."""
try: line = self.serv_file.readline()
# child is client self.assertEqual(line, MSG, "Error performing readline.")
time.sleep(5)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if verbose:
print 'child connecting'
s.connect(("127.0.0.1", PORT))
iteration = 0 def _testReadline(self):
while 1: self.cli_file.write(MSG)
s.send(msg) self.cli_file.flush()
data = s.recv(12)
if not data: def test_main():
break suite = unittest.TestSuite()
if msg != data: suite.addTest(unittest.makeSuite(GeneralModuleTests))
print "parent/client mismatch. Failed in %s iteration. Received: [%s]" \ suite.addTest(unittest.makeSuite(BasicTCPTest))
%(iteration, data) suite.addTest(unittest.makeSuite(BasicUDPTest))
time.sleep (1) suite.addTest(unittest.makeSuite(NonBlockingTCPTests))
iteration += 1 suite.addTest(unittest.makeSuite(FileObjectClassTestCase))
s.close() test_support.run_suite(suite)
finally:
os._exit(1) if __name__ == "__main__":
except socket.error, msg: test_main()
raise TestFailed, msg

View file

@ -1,11 +1,12 @@
#!/home/bernie/src/python23/dist/src/python #!/home/bernie/src/python23/dist/src/python
import unittest import unittest
import test_support
import time import time
import socket import socket
class creationTestCase(unittest.TestCase): class CreationTestCase(unittest.TestCase):
"""Test Case for socket.gettimeout() and socket.settimeout()""" """Test Case for socket.gettimeout() and socket.settimeout()"""
def setUp(self): def setUp(self):
self.__s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.__s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -39,7 +40,7 @@ class creationTestCase(unittest.TestCase):
"return type of gettimeout() is not FloatType") "return type of gettimeout() is not FloatType")
class timeoutTestCase(unittest.TestCase): class TimeoutTestCase(unittest.TestCase):
"""Test Case for socket.socket() timeout functions""" """Test Case for socket.socket() timeout functions"""
def setUp(self): def setUp(self):
self.__s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.__s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -127,10 +128,11 @@ class timeoutTestCase(unittest.TestCase):
pass pass
def suite(): def test_main():
suite = unittest.TestSuite() suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(CreationTestCase))
return suite suite.addTest(unittest.makeSuite(TimeoutTestCase))
test_support.run_suite(suite)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() test_main()