cpython/Lib/test/test_socketserver.py
Neal Norwitz b15ac3169d Add new utility function, reap_children(), to test_support. This should
be called at the end of each test that spawns children (perhaps it
should be called from regrtest instead?).  This will hopefully prevent
some of the unexplained failures in the buildbots (hppa and alpha)
during tests that spawn children.  The problems were not reproducible.
There were many zombies that remained at the end of several tests.
In the worst case, this shouldn't cause any more problems,
though it may not help either.  Time will tell.
2006-06-29 04:10:08 +00:00

206 lines
6.3 KiB
Python

# Test suite for SocketServer.py
from test import test_support
from test.test_support import (verbose, verify, TESTFN, TestSkipped,
reap_children)
test_support.requires('network')
from SocketServer import *
import socket
import errno
import select
import time
import threading
import os
NREQ = 3
DELAY = 0.5
class MyMixinHandler:
def handle(self):
time.sleep(DELAY)
line = self.rfile.readline()
time.sleep(DELAY)
self.wfile.write(line)
class MyStreamHandler(MyMixinHandler, StreamRequestHandler):
pass
class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler):
pass
class MyMixinServer:
def serve_a_few(self):
for i in range(NREQ):
self.handle_request()
def handle_error(self, request, client_address):
self.close_request(request)
self.server_close()
raise
teststring = "hello world\n"
def receive(sock, n, timeout=20):
r, w, x = select.select([sock], [], [], timeout)
if sock in r:
return sock.recv(n)
else:
raise RuntimeError, "timed out on %r" % (sock,)
def testdgram(proto, addr):
s = socket.socket(proto, socket.SOCK_DGRAM)
s.sendto(teststring, addr)
buf = data = receive(s, 100)
while data and '\n' not in buf:
data = receive(s, 100)
buf += data
verify(buf == teststring)
s.close()
def teststream(proto, addr):
s = socket.socket(proto, socket.SOCK_STREAM)
s.connect(addr)
s.sendall(teststring)
buf = data = receive(s, 100)
while data and '\n' not in buf:
data = receive(s, 100)
buf += data
verify(buf == teststring)
s.close()
class ServerThread(threading.Thread):
def __init__(self, addr, svrcls, hdlrcls):
threading.Thread.__init__(self)
self.__addr = addr
self.__svrcls = svrcls
self.__hdlrcls = hdlrcls
def run(self):
class svrcls(MyMixinServer, self.__svrcls):
pass
if verbose: print "thread: creating server"
svr = svrcls(self.__addr, self.__hdlrcls)
# pull the address out of the server in case it changed
# this can happen if another process is using the port
addr = getattr(svr, 'server_address')
if addr:
self.__addr = addr
if verbose: print "thread: serving three times"
svr.serve_a_few()
if verbose: print "thread: done"
seed = 0
def pickport():
global seed
seed += 1
return 10000 + (os.getpid() % 1000)*10 + seed
host = "localhost"
testfiles = []
def pickaddr(proto):
if proto == socket.AF_INET:
return (host, pickport())
else:
fn = TESTFN + str(pickport())
if os.name == 'os2':
# AF_UNIX socket names on OS/2 require a specific prefix
# which can't include a drive letter and must also use
# backslashes as directory separators
if fn[1] == ':':
fn = fn[2:]
if fn[0] in (os.sep, os.altsep):
fn = fn[1:]
fn = os.path.join('\socket', fn)
if os.sep == '/':
fn = fn.replace(os.sep, os.altsep)
else:
fn = fn.replace(os.altsep, os.sep)
testfiles.append(fn)
return fn
def cleanup():
for fn in testfiles:
try:
os.remove(fn)
except os.error:
pass
testfiles[:] = []
def testloop(proto, servers, hdlrcls, testfunc):
for svrcls in servers:
addr = pickaddr(proto)
if verbose:
print "ADDR =", addr
print "CLASS =", svrcls
t = ServerThread(addr, svrcls, hdlrcls)
if verbose: print "server created"
t.start()
if verbose: print "server running"
for i in range(NREQ):
time.sleep(DELAY)
if verbose: print "test client", i
testfunc(proto, addr)
if verbose: print "waiting for server"
t.join()
if verbose: print "done"
class ForgivingTCPServer(TCPServer):
# prevent errors if another process is using the port we want
def server_bind(self):
host, default_port = self.server_address
# this code shamelessly stolen from test.test_support
# the ports were changed to protect the innocent
import sys
for port in [default_port, 3434, 8798, 23833]:
try:
self.server_address = host, port
TCPServer.server_bind(self)
break
except socket.error, (err, msg):
if err != errno.EADDRINUSE:
raise
print >>sys.__stderr__, \
' WARNING: failed to listen on port %d, trying another' % port
tcpservers = [ForgivingTCPServer, ThreadingTCPServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
tcpservers.append(ForkingTCPServer)
udpservers = [UDPServer, ThreadingUDPServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
udpservers.append(ForkingUDPServer)
if not hasattr(socket, 'AF_UNIX'):
streamservers = []
dgramservers = []
else:
class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass
streamservers = [UnixStreamServer, ThreadingUnixStreamServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
streamservers.append(ForkingUnixStreamServer)
class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass
dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
dgramservers.append(ForkingUnixDatagramServer)
def testall():
testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream)
testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram)
if hasattr(socket, 'AF_UNIX'):
testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream)
# Alas, on Linux (at least) recvfrom() doesn't return a meaningful
# client address so this cannot work:
##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram)
def test_main():
import imp
if imp.lock_held():
# If the import lock is held, the threads will hang.
raise TestSkipped("can't run when import lock is held")
try:
testall()
finally:
cleanup()
reap_children()
if __name__ == "__main__":
test_main()