Cumulative patch to http and xmlrpc
This commit is contained in:
Kristján Valur Jónsson 2009-06-28 21:04:17 +00:00
parent 552e7a7e2f
commit e007860b8b
7 changed files with 406 additions and 70 deletions

View file

@ -273,7 +273,7 @@ ADDR = PORT = URL = None
# The evt is set twice. First when the server is ready to serve.
# Second when the server has been shutdown. The user must clear
# the event after it has been set the first time to catch the second set.
def http_server(evt, numrequests):
def http_server(evt, numrequests, requestHandler=None):
class TestInstanceClass:
def div(self, x, y):
return x // y
@ -294,7 +294,9 @@ def http_server(evt, numrequests):
s.setblocking(True)
return s, port
serv = MyXMLRPCServer(("localhost", 0),
if not requestHandler:
requestHandler = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
serv = MyXMLRPCServer(("localhost", 0), requestHandler,
logRequests=False, bind_and_activate=False)
try:
serv.socket.settimeout(3)
@ -348,34 +350,36 @@ def is_unavailable_exception(e):
return False
# NOTE: The tests in SimpleServerTestCase will ignore failures caused by
# "temporarily unavailable" exceptions raised in SimpleXMLRPCServer. This
# condition occurs infrequently on some platforms, frequently on others, and
# is apparently caused by using SimpleXMLRPCServer with a non-blocking socket.
# If the server class is updated at some point in the future to handle this
# situation more gracefully, these tests should be modified appropriately.
class SimpleServerTestCase(unittest.TestCase):
class BaseServerTestCase(unittest.TestCase):
requestHandler = None
def setUp(self):
# enable traceback reporting
SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True
self.evt = threading.Event()
# start server thread to handle requests
serv_args = (self.evt, 1)
serv_args = (self.evt, 1, self.requestHandler)
threading.Thread(target=http_server, args=serv_args).start()
# wait for the server to be ready
self.evt.wait()
self.evt.wait(10)
self.evt.clear()
def tearDown(self):
# wait on the server thread to terminate
self.evt.wait()
self.evt.wait(10)
# disable traceback reporting
SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False
# NOTE: The tests in SimpleServerTestCase will ignore failures caused by
# "temporarily unavailable" exceptions raised in SimpleXMLRPCServer. This
# condition occurs infrequently on some platforms, frequently on others, and
# is apparently caused by using SimpleXMLRPCServer with a non-blocking socket
# If the server class is updated at some point in the future to handle this
# situation more gracefully, these tests should be modified appropriately.
class SimpleServerTestCase(BaseServerTestCase):
def test_simple1(self):
try:
p = xmlrpclib.ServerProxy(URL)
@ -512,6 +516,110 @@ class SimpleServerTestCase(unittest.TestCase):
# This avoids waiting for the socket timeout.
self.test_simple1()
#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
#does indeed serve subsequent requests on the same connection
class KeepaliveServerTestCase(BaseServerTestCase):
#a request handler that supports keep-alive and logs requests into a
#class variable
class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
parentClass = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
protocol_version = 'HTTP/1.1'
myRequests = []
def handle(self):
self.myRequests.append([])
return self.parentClass.handle(self)
def handle_one_request(self):
result = self.parentClass.handle_one_request(self)
self.myRequests[-1].append(self.raw_requestline)
return result
requestHandler = RequestHandler
def setUp(self):
#clear request log
self.RequestHandler.myRequests = []
return BaseServerTestCase.setUp(self)
def test_two(self):
p = xmlrpclib.ServerProxy(URL)
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(p.pow(6,8), 6**8)
self.assertEqual(len(self.RequestHandler.myRequests), 1)
#we may or may not catch the final "append" with the empty line
self.assertTrue(len(self.RequestHandler.myRequests[-1]) >= 2)
#A test case that verifies that gzip encoding works in both directions
#(for a request and the response)
class GzipServerTestCase(BaseServerTestCase):
#a request handler that supports keep-alive and logs requests into a
#class variable
class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
parentClass = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
protocol_version = 'HTTP/1.1'
def do_POST(self):
#store content of last request in class
self.__class__.content_length = int(self.headers["content-length"])
return self.parentClass.do_POST(self)
requestHandler = RequestHandler
class Transport(xmlrpclib.Transport):
#custom transport, stores the response length for our perusal
fake_gzip = False
def parse_response(self, response):
self.response_length=int(response.getheader("content-length", 0))
return xmlrpclib.Transport.parse_response(self, response)
def send_content(self, connection, body):
if self.fake_gzip:
#add a lone gzip header to induce decode error remotely
connection.putheader("Content-Encoding", "gzip")
return xmlrpclib.Transport.send_content(self, connection, body)
def test_gzip_request(self):
t = self.Transport()
t.encode_threshold = None
p = xmlrpclib.ServerProxy(URL, transport=t)
self.assertEqual(p.pow(6,8), 6**8)
a = self.RequestHandler.content_length
t.encode_threshold = 0 #turn on request encoding
self.assertEqual(p.pow(6,8), 6**8)
b = self.RequestHandler.content_length
self.assertTrue(a>b)
def test_bad_gzip_request(self):
t = self.Transport()
t.encode_threshold = None
t.fake_gzip = True
p = xmlrpclib.ServerProxy(URL, transport=t)
cm = self.assertRaisesRegexp(xmlrpclib.ProtocolError,
re.compile(r"\b400\b"))
with cm:
p.pow(6, 8)
def test_gsip_response(self):
t = self.Transport()
p = xmlrpclib.ServerProxy(URL, transport=t)
old = self.requestHandler.encode_threshold
self.requestHandler.encode_threshold = None #no encoding
self.assertEqual(p.pow(6,8), 6**8)
a = t.response_length
self.requestHandler.encode_threshold = 0 #always encode
self.assertEqual(p.pow(6,8), 6**8)
b = t.response_length
self.requestHandler.encode_threshold = old
self.assertTrue(a>b)
#Test special attributes of the ServerProxy object
class ServerProxyTestCase(unittest.TestCase):
def test_close(self):
p = xmlrpclib.ServerProxy(URL)
self.assertEqual(p('close')(), None)
def test_transport(self):
t = xmlrpclib.Transport()
p = xmlrpclib.ServerProxy(URL, transport=t)
self.assertEqual(p('transport'), t)
# This is a contrived way to make a failure occur on the server side
# in order to test the _send_traceback_header flag on the server
class FailingMessageClass(mimetools.Message):
@ -693,6 +801,9 @@ class FakeSocket:
def makefile(self, x='r', y=-1):
raise RuntimeError
def close(self):
pass
class FakeTransport(xmlrpclib.Transport):
"""A Transport instance that records instead of sending a request.
@ -703,7 +814,7 @@ class FakeTransport(xmlrpclib.Transport):
def make_connection(self, host):
conn = xmlrpclib.Transport.make_connection(self, host)
conn._conn.sock = self.fake_socket = FakeSocket()
conn.sock = self.fake_socket = FakeSocket()
return conn
class TransportSubclassTestCase(unittest.TestCase):
@ -763,6 +874,9 @@ def test_main():
xmlrpc_tests = [XMLRPCTestCase, HelperTestCase, DateTimeTestCase,
BinaryTestCase, FaultTestCase, TransportSubclassTestCase]
xmlrpc_tests.append(SimpleServerTestCase)
xmlrpc_tests.append(KeepaliveServerTestCase)
xmlrpc_tests.append(GzipServerTestCase)
xmlrpc_tests.append(ServerProxyTestCase)
xmlrpc_tests.append(FailingServerTestCase)
xmlrpc_tests.append(CGIHandlerTestCase)