cpython/Lib/test/test_socket.py
Guido van Rossum d59da4b432 Merged revisions 55407-55513 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/branches/p3yk

................
  r55413 | fred.drake | 2007-05-17 12:30:10 -0700 (Thu, 17 May 2007) | 1 line

  fix argument name in documentation; match the implementation
................
  r55430 | jack.diederich | 2007-05-18 06:39:59 -0700 (Fri, 18 May 2007) | 1 line

  Implements class decorators, PEP 3129.
................
  r55432 | guido.van.rossum | 2007-05-18 08:09:41 -0700 (Fri, 18 May 2007) | 2 lines

  obsubmit.
................
  r55434 | guido.van.rossum | 2007-05-18 09:39:10 -0700 (Fri, 18 May 2007) | 3 lines

  Fix bug in test_inspect.  (I presume this is how it should be fixed;
  Jack Diedrich, please verify.)
................
  r55460 | brett.cannon | 2007-05-20 00:31:57 -0700 (Sun, 20 May 2007) | 4 lines

  Remove the imageop module.  With imgfile already removed in Python 3.0 and
  rgbimg gone in Python 2.6 the unit tests themselves were made worthless.  Plus
  third-party libraries perform the same function much better.
................
  r55469 | neal.norwitz | 2007-05-20 11:28:20 -0700 (Sun, 20 May 2007) | 118 lines

  Merged revisions 55324-55467 via svnmerge from
  svn+ssh://pythondev@svn.python.org/python/trunk

  ........
    r55348 | georg.brandl | 2007-05-15 13:19:34 -0700 (Tue, 15 May 2007) | 4 lines

    HTML-escape the plain traceback in cgitb's HTML output, to prevent
    the traceback inadvertently or maliciously closing the comment and
    injecting HTML into the error page.
  ........
    r55372 | neal.norwitz | 2007-05-15 21:33:50 -0700 (Tue, 15 May 2007) | 6 lines

    Port rev 55353 from Guido:
    Add what looks like a necessary call to PyErr_NoMemory() when PyMem_MALLOC()
    fails.

    Will backport.
  ........
    r55377 | neal.norwitz | 2007-05-15 22:06:33 -0700 (Tue, 15 May 2007) | 1 line

    Mention removal of some directories for obsolete platforms
  ........
    r55380 | brett.cannon | 2007-05-15 22:50:03 -0700 (Tue, 15 May 2007) | 2 lines

    Change the maintainer of the BeOS port.
  ........
    r55383 | georg.brandl | 2007-05-16 06:44:18 -0700 (Wed, 16 May 2007) | 2 lines

    Bug #1719995: don't use deprecated method in sets example.
  ........
    r55386 | neal.norwitz | 2007-05-16 13:05:11 -0700 (Wed, 16 May 2007) | 5 lines

    Fix bug in marshal where bad data would cause a segfault due to
    lack of an infinite recursion check.

    Contributed by Damien Miller at Google.
  ........
    r55389 | brett.cannon | 2007-05-16 15:42:29 -0700 (Wed, 16 May 2007) | 6 lines

    Remove the gopherlib module.  It has been raising a DeprecationWarning since
    Python 2.5.

    Also remove gopher support from urllib/urllib2.  As both imported gopherlib the
    usage of the support would have raised a DeprecationWarning.
  ........
    r55394 | raymond.hettinger | 2007-05-16 18:08:04 -0700 (Wed, 16 May 2007) | 1 line

    calendar.py gets no benefit from xrange() instead of range()
  ........
    r55395 | brett.cannon | 2007-05-16 19:02:56 -0700 (Wed, 16 May 2007) | 3 lines

    Complete deprecation of BaseException.message.  Some subclasses were directly
    accessing the message attribute instead of using the descriptor.
  ........
    r55396 | neal.norwitz | 2007-05-16 23:11:36 -0700 (Wed, 16 May 2007) | 4 lines

    Reduce the max stack depth to see if this fixes the segfaults on
    Windows and some other boxes.  If this is successful, this rev should
    be backported.  I'm not sure how close to the limit we should push this.
  ........
    r55397 | neal.norwitz | 2007-05-16 23:23:50 -0700 (Wed, 16 May 2007) | 4 lines

    Set the depth to something very small to try to determine if the
    crashes on Windows are really due to the stack size or possibly
    some other problem.
  ........
    r55398 | neal.norwitz | 2007-05-17 00:04:46 -0700 (Thu, 17 May 2007) | 4 lines

    Last try for tweaking the max stack depth.  5000 was the original value,
    4000 didn't work either.  1000 does work on Windows.  If 2000 works,
    that will hopefully be a reasonable balance.
  ........
    r55412 | fred.drake | 2007-05-17 12:29:58 -0700 (Thu, 17 May 2007) | 1 line

    fix argument name in documentation; match the implementation
  ........
    r55427 | neal.norwitz | 2007-05-17 22:47:16 -0700 (Thu, 17 May 2007) | 1 line

    Verify neither dumps or loads overflow the stack and segfault.
  ........
    r55446 | collin.winter | 2007-05-18 16:11:24 -0700 (Fri, 18 May 2007) | 1 line

    Backport PEP 3110's new 'except' syntax to 2.6.
  ........
    r55448 | raymond.hettinger | 2007-05-18 18:11:16 -0700 (Fri, 18 May 2007) | 1 line

    Improvements to NamedTuple's implementation, tests, and documentation
  ........
    r55449 | raymond.hettinger | 2007-05-18 18:50:11 -0700 (Fri, 18 May 2007) | 1 line

    Fix beginner mistake -- don't mix spaces and tabs.
  ........
    r55450 | neal.norwitz | 2007-05-18 20:48:47 -0700 (Fri, 18 May 2007) | 1 line

    Clear data so random memory does not get freed.  Will backport.
  ........
    r55452 | neal.norwitz | 2007-05-18 21:34:55 -0700 (Fri, 18 May 2007) | 3 lines

    Whoops, need to pay attention to those test failures.
    Move the clear to *before* the first use, not after.
  ........
    r55453 | neal.norwitz | 2007-05-18 21:35:52 -0700 (Fri, 18 May 2007) | 1 line

    Give some clue as to what happened if the test fails.
  ........
    r55455 | georg.brandl | 2007-05-19 11:09:26 -0700 (Sat, 19 May 2007) | 2 lines

    Fix docstring for add_package in site.py.
  ........
    r55458 | brett.cannon | 2007-05-20 00:09:50 -0700 (Sun, 20 May 2007) | 2 lines

    Remove the rgbimg module.  It has been deprecated since Python 2.5.
  ........
    r55465 | nick.coghlan | 2007-05-20 04:12:49 -0700 (Sun, 20 May 2007) | 1 line

    Fix typo in example (should be backported, but my maintenance branch is woefully out of date)
  ........
................
  r55472 | brett.cannon | 2007-05-20 12:06:18 -0700 (Sun, 20 May 2007) | 2 lines

  Remove imageop from the Windows build process.
................
  r55486 | neal.norwitz | 2007-05-20 23:59:52 -0700 (Sun, 20 May 2007) | 1 line

  Remove callable() builtin
................
  r55506 | neal.norwitz | 2007-05-22 00:43:29 -0700 (Tue, 22 May 2007) | 78 lines

  Merged revisions 55468-55505 via svnmerge from
  svn+ssh://pythondev@svn.python.org/python/trunk

  ........
    r55468 | neal.norwitz | 2007-05-20 11:06:27 -0700 (Sun, 20 May 2007) | 1 line

    rotor is long gone.
  ........
    r55470 | neal.norwitz | 2007-05-20 11:43:00 -0700 (Sun, 20 May 2007) | 1 line

    Update directories/files at the top-level.
  ........
    r55471 | brett.cannon | 2007-05-20 12:05:06 -0700 (Sun, 20 May 2007) | 2 lines

    Try to remove rgbimg from Windows builds.
  ........
    r55474 | brett.cannon | 2007-05-20 16:17:38 -0700 (Sun, 20 May 2007) | 4 lines

    Remove the macfs module.  This led to the deprecation of macostools.touched();
    it completely relied on macfs and is a no-op on OS X according to code
    comments.
  ........
    r55476 | brett.cannon | 2007-05-20 16:56:18 -0700 (Sun, 20 May 2007) | 3 lines

    Move imgfile import to the global namespace to trigger an import error ASAP to
    prevent creation of a test file.
  ........
    r55477 | brett.cannon | 2007-05-20 16:57:38 -0700 (Sun, 20 May 2007) | 3 lines

    Cause posixfile to raise a DeprecationWarning.  Documented as deprecated since
    Ptyhon 1.5.
  ........
    r55479 | andrew.kuchling | 2007-05-20 17:03:15 -0700 (Sun, 20 May 2007) | 1 line

    Note removed modules
  ........
    r55481 | martin.v.loewis | 2007-05-20 21:35:47 -0700 (Sun, 20 May 2007) | 2 lines

    Add Alexandre Vassalotti.
  ........
    r55482 | george.yoshida | 2007-05-20 21:41:21 -0700 (Sun, 20 May 2007) | 4 lines

    fix against r55474 [Remove the macfs module]

    Remove "libmacfs.tex" from Makefile.deps and mac/mac.tex.
  ........
    r55487 | raymond.hettinger | 2007-05-21 01:13:35 -0700 (Mon, 21 May 2007) | 1 line

    Replace assertion with straight error-checking.
  ........
    r55489 | raymond.hettinger | 2007-05-21 09:40:10 -0700 (Mon, 21 May 2007) | 1 line

    Allow all alphanumeric and underscores in type and field names.
  ........
    r55490 | facundo.batista | 2007-05-21 10:32:32 -0700 (Mon, 21 May 2007) | 5 lines


    Added timeout support to HTTPSConnection, through the
    socket.create_connection function. Also added a small
    test for this, and updated NEWS file.
  ........
    r55495 | georg.brandl | 2007-05-21 13:34:16 -0700 (Mon, 21 May 2007) | 2 lines

    Patch #1686487: you can now pass any mapping after '**' in function calls.
  ........
    r55502 | neal.norwitz | 2007-05-21 23:03:36 -0700 (Mon, 21 May 2007) | 1 line

    Document new params to HTTPSConnection
  ........
    r55504 | neal.norwitz | 2007-05-22 00:16:10 -0700 (Tue, 22 May 2007) | 1 line

    Stop using METH_OLDARGS
  ........
    r55505 | neal.norwitz | 2007-05-22 00:16:44 -0700 (Tue, 22 May 2007) | 1 line

    Stop using METH_OLDARGS implicitly
  ........
................
2007-05-22 18:11:13 +00:00

1080 lines
35 KiB
Python

#!/usr/bin/env python
import unittest
from test import test_support
import socket
import select
import time
import thread, threading
import Queue
import sys
from weakref import proxy
import signal
PORT = 50007
HOST = 'localhost'
MSG = b'Michael Gilfix was here\n'
class SocketTCPTest(unittest.TestCase):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
global PORT
PORT = test_support.bind_port(self.serv, HOST, PORT)
self.serv.listen(1)
def tearDown(self):
self.serv.close()
self.serv = None
class SocketUDPTest(unittest.TestCase):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
global PORT
PORT = test_support.bind_port(self.serv, HOST, PORT)
def tearDown(self):
self.serv.close()
self.serv = None
class ThreadableTest:
"""Threadable Test class
The ThreadableTest class makes it easy to create a threaded
client/server pair from an existing unit test. To create a
new threaded class from an existing unit test, use multiple
inheritance:
class NewClass (OldClass, ThreadableTest):
pass
This class defines two new fixture functions with obvious
purposes for overriding:
clientSetUp ()
clientTearDown ()
Any new test functions within the class must then define
tests in pairs, where the test name is preceeded with a
'_' to indicate the client portion of the test. Ex:
def testFoo(self):
# Server portion
def _testFoo(self):
# Client portion
Any exceptions raised by the clients during their tests
are caught and transferred to the main thread to alert
the testing framework.
Note, the server setup function cannot call any blocking
functions that rely on the client thread during setup,
unless serverExplicitReady() is called just before
the blocking call (such as in setting up a client/server
connection and performing the accept() in setUp().
"""
def __init__(self):
# Swap the true setup function
self.__setUp = self.setUp
self.__tearDown = self.tearDown
self.setUp = self._setUp
self.tearDown = self._tearDown
def serverExplicitReady(self):
"""This method allows the server to explicitly indicate that
it wants the client thread to proceed. This is useful if the
server is about to execute a blocking routine that is
dependent upon the client thread during its setup routine."""
self.server_ready.set()
def _setUp(self):
self.server_ready = threading.Event()
self.client_ready = threading.Event()
self.done = threading.Event()
self.queue = Queue.Queue(1)
# Do some munging to start the client test.
methodname = self.id()
i = methodname.rfind('.')
methodname = methodname[i+1:]
test_method = getattr(self, '_' + methodname)
self.client_thread = thread.start_new_thread(
self.clientRun, (test_method,))
self.__setUp()
if not self.server_ready.isSet():
self.server_ready.set()
self.client_ready.wait()
def _tearDown(self):
self.__tearDown()
self.done.wait()
if not self.queue.empty():
msg = self.queue.get()
self.fail(msg)
def clientRun(self, test_func):
self.server_ready.wait()
self.client_ready.set()
self.clientSetUp()
if not hasattr(test_func, '__call__'):
raise TypeError, "test_func must be a callable function"
try:
test_func()
except Exception as strerror:
self.queue.put(strerror)
self.clientTearDown()
def clientSetUp(self):
raise NotImplementedError, "clientSetUp must be implemented."
def clientTearDown(self):
self.done.set()
thread.exit()
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
def __init__(self, methodName='runTest'):
SocketTCPTest.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def clientSetUp(self):
self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
def __init__(self, methodName='runTest'):
SocketUDPTest.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def clientSetUp(self):
self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
class SocketConnectedTest(ThreadedTCPSocketTest):
def __init__(self, methodName='runTest'):
ThreadedTCPSocketTest.__init__(self, methodName=methodName)
def setUp(self):
ThreadedTCPSocketTest.setUp(self)
# Indicate explicitly we're ready for the client thread to
# proceed and then perform the blocking call to accept
self.serverExplicitReady()
conn, addr = self.serv.accept()
self.cli_conn = conn
def tearDown(self):
self.cli_conn.close()
self.cli_conn = None
ThreadedTCPSocketTest.tearDown(self)
def clientSetUp(self):
ThreadedTCPSocketTest.clientSetUp(self)
self.cli.connect((HOST, PORT))
self.serv_conn = self.cli
def clientTearDown(self):
self.serv_conn.close()
self.serv_conn = None
ThreadedTCPSocketTest.clientTearDown(self)
class SocketPairTest(unittest.TestCase, ThreadableTest):
def __init__(self, methodName='runTest'):
unittest.TestCase.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def setUp(self):
self.serv, self.cli = socket.socketpair()
def tearDown(self):
self.serv.close()
self.serv = None
def clientSetUp(self):
pass
def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
#######################################################################
## Begin Tests
class GeneralModuleTests(unittest.TestCase):
def test_weakref(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
p = proxy(s)
self.assertEqual(p.fileno(), s.fileno())
s.close()
s = None
try:
p.fileno()
except ReferenceError:
pass
else:
self.fail('Socket proxy still exists')
def testSocketError(self):
# Testing socket module exceptions
def raise_error(*args, **kwargs):
raise socket.error
def raise_herror(*args, **kwargs):
raise socket.herror
def raise_gaierror(*args, **kwargs):
raise socket.gaierror
self.failUnlessRaises(socket.error, raise_error,
"Error raising socket exception.")
self.failUnlessRaises(socket.error, raise_herror,
"Error raising socket exception.")
self.failUnlessRaises(socket.error, raise_gaierror,
"Error raising socket exception.")
def testCrucialConstants(self):
# Testing for mission critical constants
socket.AF_INET
socket.SOCK_STREAM
socket.SOCK_DGRAM
socket.SOCK_RAW
socket.SOCK_RDM
socket.SOCK_SEQPACKET
socket.SOL_SOCKET
socket.SO_REUSEADDR
def testHostnameRes(self):
# Testing hostname resolution mechanisms
hostname = socket.gethostname()
try:
ip = socket.gethostbyname(hostname)
except socket.error:
# Probably name lookup wasn't set up right; skip this test
return
self.assert_(ip.find('.') >= 0, "Error resolving host to ip.")
try:
hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
except socket.error:
# Probably a similar problem as above; skip this test
return
all_host_names = [hostname, hname] + aliases
fqhn = socket.getfqdn(ip)
if not fqhn in all_host_names:
self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
def testRefCountGetNameInfo(self):
# Testing reference count for getnameinfo
import sys
if hasattr(sys, "getrefcount"):
try:
# On some versions, this loses a reference
orig = sys.getrefcount(__name__)
socket.getnameinfo(__name__,0)
except SystemError:
if sys.getrefcount(__name__) != orig:
self.fail("socket.getnameinfo loses a reference")
def testInterpreterCrash(self):
# Making sure getnameinfo doesn't crash the interpreter
try:
# On some versions, this crashes the interpreter.
socket.getnameinfo(('x', 0, 0, 0), 0)
except socket.error:
pass
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1<<34)
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1, 2, 3 ]
bad_values = [ -1, -2, -3, -1, -2, -3 ]
for k in good_values:
socket.ntohl(k)
socket.ntohs(k)
socket.htonl(k)
socket.htons(k)
for k in bad_values:
self.assertRaises(OverflowError, socket.ntohl, k)
self.assertRaises(OverflowError, socket.ntohs, k)
self.assertRaises(OverflowError, socket.htonl, k)
self.assertRaises(OverflowError, socket.htons, k)
def testGetServBy(self):
eq = self.assertEqual
# Find one service that exists, then check all the related interfaces.
# I've ordered this by protocols that have both a tcp and udp
# protocol, at least for modern Linuxes.
if sys.platform in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6',
'freebsd7', 'darwin'):
# avoid the 'echo' service on this platform, as there is an
# assumption breaking non-standard port/protocol entry
services = ('daytime', 'qotd', 'domain')
else:
services = ('echo', 'daytime', 'domain')
for service in services:
try:
port = socket.getservbyname(service, 'tcp')
break
except socket.error:
pass
else:
raise socket.error
# Try same call with optional protocol omitted
port2 = socket.getservbyname(service)
eq(port, port2)
# Try udp, but don't barf it it doesn't exist
try:
udpport = socket.getservbyname(service, 'udp')
except socket.error:
udpport = None
else:
eq(udpport, port)
# Now make sure the lookup by port returns the same service name
eq(socket.getservbyport(port2), service)
eq(socket.getservbyport(port, 'tcp'), service)
if udpport is not None:
eq(socket.getservbyport(udpport, 'udp'), service)
def testDefaultTimeout(self):
# Testing default timeout
# The default timeout should initially be None
self.assertEqual(socket.getdefaulttimeout(), None)
s = socket.socket()
self.assertEqual(s.gettimeout(), None)
s.close()
# Set the default timeout to 10, and see if it propagates
socket.setdefaulttimeout(10)
self.assertEqual(socket.getdefaulttimeout(), 10)
s = socket.socket()
self.assertEqual(s.gettimeout(), 10)
s.close()
# Reset the default timeout to None, and see if it propagates
socket.setdefaulttimeout(None)
self.assertEqual(socket.getdefaulttimeout(), None)
s = socket.socket()
self.assertEqual(s.gettimeout(), None)
s.close()
# Check that setting it to an invalid value raises ValueError
self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
# Check that setting it to an invalid type raises TypeError
self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
def testIPv4toString(self):
if not hasattr(socket, 'inet_pton'):
return # No inet_pton() on this platform
from socket import inet_aton as f, inet_pton, AF_INET
g = lambda a: inet_pton(AF_INET, a)
self.assertEquals(b'\x00\x00\x00\x00', f('0.0.0.0'))
self.assertEquals(b'\xff\x00\xff\x00', f('255.0.255.0'))
self.assertEquals(b'\xaa\xaa\xaa\xaa', f('170.170.170.170'))
self.assertEquals(b'\x01\x02\x03\x04', f('1.2.3.4'))
self.assertEquals(b'\xff\xff\xff\xff', f('255.255.255.255'))
self.assertEquals(b'\x00\x00\x00\x00', g('0.0.0.0'))
self.assertEquals(b'\xff\x00\xff\x00', g('255.0.255.0'))
self.assertEquals(b'\xaa\xaa\xaa\xaa', g('170.170.170.170'))
self.assertEquals(b'\xff\xff\xff\xff', g('255.255.255.255'))
def testIPv6toString(self):
if not hasattr(socket, 'inet_pton'):
return # No inet_pton() on this platform
try:
from socket import inet_pton, AF_INET6, has_ipv6
if not has_ipv6:
return
except ImportError:
return
f = lambda a: inet_pton(AF_INET6, a)
self.assertEquals('\x00' * 16, f('::'))
self.assertEquals('\x00' * 16, f('0::0'))
self.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
self.assertEquals(
'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
)
def testStringToIPv4(self):
if not hasattr(socket, 'inet_ntop'):
return # No inet_ntop() on this platform
from socket import inet_ntoa as f, inet_ntop, AF_INET
g = lambda a: inet_ntop(AF_INET, a)
self.assertEquals('1.0.1.0', f(b'\x01\x00\x01\x00'))
self.assertEquals('170.85.170.85', f(b'\xaa\x55\xaa\x55'))
self.assertEquals('255.255.255.255', f(b'\xff\xff\xff\xff'))
self.assertEquals('1.2.3.4', f(b'\x01\x02\x03\x04'))
self.assertEquals('1.0.1.0', g(b'\x01\x00\x01\x00'))
self.assertEquals('170.85.170.85', g(b'\xaa\x55\xaa\x55'))
self.assertEquals('255.255.255.255', g(b'\xff\xff\xff\xff'))
def testStringToIPv6(self):
if not hasattr(socket, 'inet_ntop'):
return # No inet_ntop() on this platform
try:
from socket import inet_ntop, AF_INET6, has_ipv6
if not has_ipv6:
return
except ImportError:
return
f = lambda a: inet_ntop(AF_INET6, a)
self.assertEquals('::', f('\x00' * 16))
self.assertEquals('::1', f('\x00' * 15 + '\x01'))
self.assertEquals(
'aef:b01:506:1001:ffff:9997:55:170',
f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
)
# XXX The following don't test module-level functionality...
def testSockName(self):
# Testing getsockname()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("0.0.0.0", PORT+1))
name = sock.getsockname()
# XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
# it reasonable to get the host's addr in addition to 0.0.0.0.
# At least for eCos. This is required for the S/390 to pass.
my_ip_addr = socket.gethostbyname(socket.gethostname())
self.assert_(name[0] in ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
self.assertEqual(name[1], PORT+1)
def testGetSockOpt(self):
# Testing getsockopt()
# We know a socket should start without reuse==0
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
self.failIf(reuse != 0, "initial mode is reuse")
def testSetSockOpt(self):
# Testing setsockopt()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
self.failIf(reuse == 0, "failed to set reuse mode")
def testSendAfterClose(self):
# testing send() after close() with timeout
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.close()
self.assertRaises(socket.error, sock.send, "spam")
def testNewAttributes(self):
# testing .family, .type and .protocol
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.assertEqual(sock.family, socket.AF_INET)
self.assertEqual(sock.type, socket.SOCK_STREAM)
self.assertEqual(sock.proto, 0)
sock.close()
class BasicTCPTest(SocketConnectedTest):
def __init__(self, methodName='runTest'):
SocketConnectedTest.__init__(self, methodName=methodName)
def testRecv(self):
# Testing large receive over TCP
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, MSG)
def _testRecv(self):
self.serv_conn.send(MSG)
def testOverFlowRecv(self):
# Testing receive in chunks over TCP
seg1 = self.cli_conn.recv(len(MSG) - 3)
seg2 = self.cli_conn.recv(1024)
msg = seg1 + seg2
self.assertEqual(msg, MSG)
def _testOverFlowRecv(self):
self.serv_conn.send(MSG)
def testRecvFrom(self):
# Testing large recvfrom() over TCP
msg, addr = self.cli_conn.recvfrom(1024)
self.assertEqual(msg, MSG)
def _testRecvFrom(self):
self.serv_conn.send(MSG)
def testOverFlowRecvFrom(self):
# Testing recvfrom() in chunks over TCP
seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
seg2, addr = self.cli_conn.recvfrom(1024)
msg = seg1 + seg2
self.assertEqual(msg, MSG)
def _testOverFlowRecvFrom(self):
self.serv_conn.send(MSG)
def testSendAll(self):
# Testing sendall() with a 2048 byte string over TCP
msg = b''
while 1:
read = self.cli_conn.recv(1024)
if not read:
break
msg += read
self.assertEqual(msg, b'f' * 2048)
def _testSendAll(self):
big_chunk = b'f' * 2048
self.serv_conn.sendall(big_chunk)
def testFromFd(self):
# Testing fromfd()
if not hasattr(socket, "fromfd"):
return # On Windows, this doesn't exist
fd = self.cli_conn.fileno()
sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
msg = sock.recv(1024)
self.assertEqual(msg, MSG)
def _testFromFd(self):
self.serv_conn.send(MSG)
def testShutdown(self):
# Testing shutdown()
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, MSG)
def _testShutdown(self):
self.serv_conn.send(MSG)
self.serv_conn.shutdown(2)
class BasicUDPTest(ThreadedUDPSocketTest):
def __init__(self, methodName='runTest'):
ThreadedUDPSocketTest.__init__(self, methodName=methodName)
def testSendtoAndRecv(self):
# Testing sendto() and Recv() over UDP
msg = self.serv.recv(len(MSG))
self.assertEqual(msg, MSG)
def _testSendtoAndRecv(self):
self.cli.sendto(MSG, 0, (HOST, PORT))
def testRecvFrom(self):
# Testing recvfrom() over UDP
msg, addr = self.serv.recvfrom(len(MSG))
self.assertEqual(msg, MSG)
def _testRecvFrom(self):
self.cli.sendto(MSG, 0, (HOST, PORT))
def testRecvFromNegative(self):
# Negative lengths passed to recvfrom should give ValueError.
self.assertRaises(ValueError, self.serv.recvfrom, -1)
def _testRecvFromNegative(self):
self.cli.sendto(MSG, 0, (HOST, PORT))
class TCPCloserTest(ThreadedTCPSocketTest):
def testClose(self):
conn, addr = self.serv.accept()
conn.close()
sd = self.cli
read, write, err = select.select([sd], [], [], 1.0)
self.assertEqual(read, [sd])
self.assertEqual(sd.recv(1), b'')
def _testClose(self):
self.cli.connect((HOST, PORT))
time.sleep(1.0)
class BasicSocketPairTest(SocketPairTest):
def __init__(self, methodName='runTest'):
SocketPairTest.__init__(self, methodName=methodName)
def testRecv(self):
msg = self.serv.recv(1024)
self.assertEqual(msg, MSG)
def _testRecv(self):
self.cli.send(MSG)
def testSend(self):
self.serv.send(MSG)
def _testSend(self):
msg = self.cli.recv(1024)
self.assertEqual(msg, MSG)
class NonBlockingTCPTests(ThreadedTCPSocketTest):
def __init__(self, methodName='runTest'):
ThreadedTCPSocketTest.__init__(self, methodName=methodName)
def testSetBlocking(self):
# Testing whether set blocking works
self.serv.setblocking(0)
start = time.time()
try:
self.serv.accept()
except socket.error:
pass
end = time.time()
self.assert_((end - start) < 1.0, "Error setting non-blocking mode.")
def _testSetBlocking(self):
pass
def testAccept(self):
# Testing non-blocking accept
self.serv.setblocking(0)
try:
conn, addr = self.serv.accept()
except socket.error:
pass
else:
self.fail("Error trying to do non-blocking accept.")
read, write, err = select.select([self.serv], [], [])
if self.serv in read:
conn, addr = self.serv.accept()
else:
self.fail("Error trying to do accept after select.")
def _testAccept(self):
time.sleep(0.1)
self.cli.connect((HOST, PORT))
def testConnect(self):
# Testing non-blocking connect
conn, addr = self.serv.accept()
def _testConnect(self):
self.cli.settimeout(10)
self.cli.connect((HOST, PORT))
def testRecv(self):
# Testing non-blocking recv
conn, addr = self.serv.accept()
conn.setblocking(0)
try:
msg = conn.recv(len(MSG))
except socket.error:
pass
else:
self.fail("Error trying to do non-blocking recv.")
read, write, err = select.select([conn], [], [])
if conn in read:
msg = conn.recv(len(MSG))
self.assertEqual(msg, MSG)
else:
self.fail("Error during select call to non-blocking socket.")
def _testRecv(self):
self.cli.connect((HOST, PORT))
time.sleep(0.1)
self.cli.send(MSG)
class FileObjectClassTestCase(SocketConnectedTest):
bufsize = -1 # Use default buffer size
def __init__(self, methodName='runTest'):
SocketConnectedTest.__init__(self, methodName=methodName)
def setUp(self):
SocketConnectedTest.setUp(self)
self.serv_file = self.cli_conn.makefile('rb', self.bufsize)
def tearDown(self):
self.serv_file.close()
self.assert_(self.serv_file.closed)
self.serv_file = None
SocketConnectedTest.tearDown(self)
def clientSetUp(self):
SocketConnectedTest.clientSetUp(self)
self.cli_file = self.serv_conn.makefile('wb')
def clientTearDown(self):
self.cli_file.close()
self.assert_(self.cli_file.closed)
self.cli_file = None
SocketConnectedTest.clientTearDown(self)
def testSmallRead(self):
# Performing small file read test
first_seg = self.serv_file.read(len(MSG)-3)
second_seg = self.serv_file.read(3)
msg = first_seg + second_seg
self.assertEqual(msg, MSG)
def _testSmallRead(self):
self.cli_file.write(MSG)
self.cli_file.flush()
def testFullRead(self):
# read until EOF
msg = self.serv_file.read()
self.assertEqual(msg, MSG)
def _testFullRead(self):
self.cli_file.write(MSG)
self.cli_file.close()
def testUnbufferedRead(self):
# Performing unbuffered file read test
buf = b''
while 1:
char = self.serv_file.read(1)
if not char:
break
buf += char
self.assertEqual(buf, MSG)
def _testUnbufferedRead(self):
self.cli_file.write(MSG)
self.cli_file.flush()
def testReadline(self):
# Performing file readline test
line = self.serv_file.readline()
self.assertEqual(line, MSG)
def _testReadline(self):
self.cli_file.write(MSG)
self.cli_file.flush()
def testClosedAttr(self):
self.assert_(not self.serv_file.closed)
def _testClosedAttr(self):
self.assert_(not self.cli_file.closed)
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
"""Repeat the tests from FileObjectClassTestCase with bufsize==0.
In this case (and in this case only), it should be possible to
create a file object, read a line from it, create another file
object, read another line from it, without loss of data in the
first file object's buffer. Note that httplib relies on this
when reading multiple requests from the same socket."""
bufsize = 0 # Use unbuffered mode
def testUnbufferedReadline(self):
# Read a line, create a new file object, read another line with it
line = self.serv_file.readline() # first line
self.assertEqual(line, b"A. " + MSG) # first line
self.serv_file = self.cli_conn.makefile('rb', 0)
line = self.serv_file.readline() # second line
self.assertEqual(line, b"B. " + MSG) # second line
def _testUnbufferedReadline(self):
self.cli_file.write(b"A. " + MSG)
self.cli_file.write(b"B. " + MSG)
self.cli_file.flush()
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
bufsize = 1 # Default-buffered for reading; line-buffered for writing
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
bufsize = 2 # Exercise the buffering code
class NetworkConnectionTest(object):
"""Prove network connection."""
def clientSetUp(self):
self.cli = socket.create_connection((HOST, PORT))
self.serv_conn = self.cli
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
"""Tests that NetworkConnection does not break existing TCP functionality.
"""
class NetworkConnectionNoServer(unittest.TestCase):
def testWithoutServer(self):
self.failUnlessRaises(socket.error, lambda: socket.create_connection((HOST, PORT)))
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
def __init__(self, methodName='runTest'):
SocketTCPTest.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def clientSetUp(self):
pass
def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
def _justAccept(self):
conn, addr = self.serv.accept()
testFamily = _justAccept
def _testFamily(self):
self.cli = socket.create_connection((HOST, PORT), timeout=30)
self.assertEqual(self.cli.family, 2)
testTimeoutDefault = _justAccept
def _testTimeoutDefault(self):
self.cli = socket.create_connection((HOST, PORT))
self.assertTrue(self.cli.gettimeout() is None)
testTimeoutValueNamed = _justAccept
def _testTimeoutValueNamed(self):
self.cli = socket.create_connection((HOST, PORT), timeout=30)
self.assertEqual(self.cli.gettimeout(), 30)
testTimeoutValueNonamed = _justAccept
def _testTimeoutValueNonamed(self):
self.cli = socket.create_connection((HOST, PORT), 30)
self.assertEqual(self.cli.gettimeout(), 30)
testTimeoutNone = _justAccept
def _testTimeoutNone(self):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
self.cli = socket.create_connection((HOST, PORT), timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(self.cli.gettimeout(), 30)
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
def __init__(self, methodName='runTest'):
SocketTCPTest.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def clientSetUp(self):
pass
def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
def testInsideTimeout(self):
conn, addr = self.serv.accept()
time.sleep(3)
conn.send(b"done!")
testOutsideTimeout = testInsideTimeout
def _testInsideTimeout(self):
self.cli = sock = socket.create_connection((HOST, PORT))
data = sock.recv(5)
self.assertEqual(data, b"done!")
def _testOutsideTimeout(self):
self.cli = sock = socket.create_connection((HOST, PORT), timeout=1)
self.failUnlessRaises(socket.timeout, lambda: sock.recv(5))
class TCPTimeoutTest(SocketTCPTest):
def testTCPTimeout(self):
def raise_timeout(*args, **kwargs):
self.serv.settimeout(1.0)
self.serv.accept()
self.failUnlessRaises(socket.timeout, raise_timeout,
"Error generating a timeout exception (TCP)")
def testTimeoutZero(self):
ok = False
try:
self.serv.settimeout(0.0)
foo = self.serv.accept()
except socket.timeout:
self.fail("caught timeout instead of error (TCP)")
except socket.error:
ok = True
except:
self.fail("caught unexpected exception (TCP)")
if not ok:
self.fail("accept() returned success when we did not expect it")
def testInterruptedTimeout(self):
# XXX I don't know how to do this test on MSWindows or any other
# plaform that doesn't support signal.alarm() or os.kill(), though
# the bug should have existed on all platforms.
if not hasattr(signal, "alarm"):
return # can only test on *nix
self.serv.settimeout(5.0) # must be longer than alarm
class Alarm(Exception):
pass
def alarm_handler(signal, frame):
raise Alarm
old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
try:
signal.alarm(2) # POSIX allows alarm to be up to 1 second early
try:
foo = self.serv.accept()
except socket.timeout:
self.fail("caught timeout instead of Alarm")
except Alarm:
pass
except:
self.fail("caught other exception instead of Alarm")
else:
self.fail("nothing caught")
signal.alarm(0) # shut off alarm
except Alarm:
self.fail("got Alarm in wrong place")
finally:
# no alarm can be pending. Safe to restore old handler.
signal.signal(signal.SIGALRM, old_alarm)
class UDPTimeoutTest(SocketTCPTest):
def testUDPTimeout(self):
def raise_timeout(*args, **kwargs):
self.serv.settimeout(1.0)
self.serv.recv(1024)
self.failUnlessRaises(socket.timeout, raise_timeout,
"Error generating a timeout exception (UDP)")
def testTimeoutZero(self):
ok = False
try:
self.serv.settimeout(0.0)
foo = self.serv.recv(1024)
except socket.timeout:
self.fail("caught timeout instead of error (UDP)")
except socket.error:
ok = True
except:
self.fail("caught unexpected exception (UDP)")
if not ok:
self.fail("recv() returned success when we did not expect it")
class TestExceptions(unittest.TestCase):
def testExceptionTree(self):
self.assert_(issubclass(socket.error, Exception))
self.assert_(issubclass(socket.herror, socket.error))
self.assert_(issubclass(socket.gaierror, socket.error))
self.assert_(issubclass(socket.timeout, socket.error))
class TestLinuxAbstractNamespace(unittest.TestCase):
UNIX_PATH_MAX = 108
def testLinuxAbstractNamespace(self):
address = "\x00python-test-hello\x00\xff"
s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s1.bind(address)
s1.listen(1)
s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s2.connect(s1.getsockname())
s1.accept()
self.assertEqual(s1.getsockname(), address)
self.assertEqual(s2.getpeername(), address)
def testMaxName(self):
address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.bind(address)
self.assertEqual(s.getsockname(), address)
def testNameOverflow(self):
address = "\x00" + "h" * self.UNIX_PATH_MAX
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.assertRaises(socket.error, s.bind, address)
class BufferIOTest(SocketConnectedTest):
"""
Test the buffer versions of socket.recv() and socket.send().
"""
def __init__(self, methodName='runTest'):
SocketConnectedTest.__init__(self, methodName=methodName)
def testRecvInto(self):
buf = b" "*1024
nbytes = self.cli_conn.recv_into(buf)
self.assertEqual(nbytes, len(MSG))
msg = buf[:len(MSG)]
self.assertEqual(msg, MSG)
def _testRecvInto(self):
buf = bytes(MSG)
self.serv_conn.send(buf)
def testRecvFromInto(self):
buf = b" "*1024
nbytes, addr = self.cli_conn.recvfrom_into(buf)
self.assertEqual(nbytes, len(MSG))
msg = buf[:len(MSG)]
self.assertEqual(msg, MSG)
def _testRecvFromInto(self):
buf = bytes(MSG)
self.serv_conn.send(buf)
def test_main():
tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
TestExceptions, BufferIOTest, BasicTCPTest2]
if sys.platform != 'mac':
tests.extend([ BasicUDPTest, UDPTimeoutTest ])
tests.extend([
NonBlockingTCPTests,
FileObjectClassTestCase,
UnbufferedFileObjectClassTestCase,
LineBufferedFileObjectClassTestCase,
SmallBufferedFileObjectClassTestCase,
NetworkConnectionNoServer,
NetworkConnectionAttributesTest,
NetworkConnectionBehaviourTest,
])
if hasattr(socket, "socketpair"):
tests.append(BasicSocketPairTest)
if sys.platform == 'linux2':
tests.append(TestLinuxAbstractNamespace)
thread_info = test_support.threading_setup()
test_support.run_unittest(*tests)
test_support.threading_cleanup(*thread_info)
if __name__ == "__main__":
test_main()