mirror of
https://github.com/python/cpython.git
synced 2025-12-04 00:30:19 +00:00
parent
fae45ed874
commit
985fc6a304
5 changed files with 400 additions and 72 deletions
|
|
@ -246,7 +246,7 @@ ADDR = PORT = URL = None
|
|||
# The evt is set twice. First when the server is ready to serve.
|
||||
# Second when the server has been shutdown. The user must clear
|
||||
# the event after it has been set the first time to catch the second set.
|
||||
def http_server(evt, numrequests):
|
||||
def http_server(evt, numrequests, requestHandler=None):
|
||||
class TestInstanceClass:
|
||||
def div(self, x, y):
|
||||
return x // y
|
||||
|
|
@ -267,7 +267,9 @@ def http_server(evt, numrequests):
|
|||
s.setblocking(True)
|
||||
return s, port
|
||||
|
||||
serv = MyXMLRPCServer(("localhost", 0),
|
||||
if not requestHandler:
|
||||
requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
|
||||
serv = MyXMLRPCServer(("localhost", 0), requestHandler,
|
||||
logRequests=False, bind_and_activate=False)
|
||||
try:
|
||||
serv.server_bind()
|
||||
|
|
@ -318,14 +320,15 @@ def is_unavailable_exception(e):
|
|||
if exc_mess and 'temporarily unavailable' in exc_mess.lower():
|
||||
return True
|
||||
|
||||
class SimpleServerTestCase(unittest.TestCase):
|
||||
class BaseServerTestCase(unittest.TestCase):
|
||||
requestHandler = None
|
||||
def setUp(self):
|
||||
# enable traceback reporting
|
||||
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
|
||||
|
||||
self.evt = threading.Event()
|
||||
# start server thread to handle requests
|
||||
serv_args = (self.evt, 1)
|
||||
serv_args = (self.evt, 1, self.requestHandler)
|
||||
threading.Thread(target=http_server, args=serv_args).start()
|
||||
|
||||
# wait for the server to be ready
|
||||
|
|
@ -343,6 +346,7 @@ class SimpleServerTestCase(unittest.TestCase):
|
|||
# disable traceback reporting
|
||||
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
|
||||
|
||||
class SimpleServerTestCase(BaseServerTestCase):
|
||||
def test_simple1(self):
|
||||
try:
|
||||
p = xmlrpclib.ServerProxy(URL)
|
||||
|
|
@ -478,6 +482,110 @@ class SimpleServerTestCase(unittest.TestCase):
|
|||
# This avoids waiting for the socket timeout.
|
||||
self.test_simple1()
|
||||
|
||||
#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
|
||||
#does indeed serve subsequent requests on the same connection
|
||||
class KeepaliveServerTestCase(BaseServerTestCase):
|
||||
#a request handler that supports keep-alive and logs requests into a
|
||||
#class variable
|
||||
class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
|
||||
parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
|
||||
protocol_version = 'HTTP/1.1'
|
||||
myRequests = []
|
||||
def handle(self):
|
||||
self.myRequests.append([])
|
||||
return self.parentClass.handle(self)
|
||||
def handle_one_request(self):
|
||||
result = self.parentClass.handle_one_request(self)
|
||||
self.myRequests[-1].append(self.raw_requestline)
|
||||
return result
|
||||
|
||||
requestHandler = RequestHandler
|
||||
def setUp(self):
|
||||
#clear request log
|
||||
self.RequestHandler.myRequests = []
|
||||
return BaseServerTestCase.setUp(self)
|
||||
|
||||
def test_two(self):
|
||||
p = xmlrpclib.ServerProxy(URL)
|
||||
self.assertEqual(p.pow(6,8), 6**8)
|
||||
self.assertEqual(p.pow(6,8), 6**8)
|
||||
self.assertEqual(len(self.RequestHandler.myRequests), 1)
|
||||
#we may or may not catch the final "append" with the empty line
|
||||
self.assertTrue(len(self.RequestHandler.myRequests[-1]) >= 2)
|
||||
|
||||
#A test case that verifies that gzip encoding works in both directions
|
||||
#(for a request and the response)
|
||||
class GzipServerTestCase(BaseServerTestCase):
|
||||
#a request handler that supports keep-alive and logs requests into a
|
||||
#class variable
|
||||
class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
|
||||
parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
|
||||
protocol_version = 'HTTP/1.1'
|
||||
|
||||
def do_POST(self):
|
||||
#store content of last request in class
|
||||
self.__class__.content_length = int(self.headers["content-length"])
|
||||
return self.parentClass.do_POST(self)
|
||||
requestHandler = RequestHandler
|
||||
|
||||
class Transport(xmlrpclib.Transport):
|
||||
#custom transport, stores the response length for our perusal
|
||||
fake_gzip = False
|
||||
def parse_response(self, response):
|
||||
self.response_length=int(response.getheader("content-length", 0))
|
||||
return xmlrpclib.Transport.parse_response(self, response)
|
||||
|
||||
def send_content(self, connection, body):
|
||||
if self.fake_gzip:
|
||||
#add a lone gzip header to induce decode error remotely
|
||||
connection.putheader("Content-Encoding", "gzip")
|
||||
return xmlrpclib.Transport.send_content(self, connection, body)
|
||||
|
||||
def test_gzip_request(self):
|
||||
t = self.Transport()
|
||||
t.encode_threshold = None
|
||||
p = xmlrpclib.ServerProxy(URL, transport=t)
|
||||
self.assertEqual(p.pow(6,8), 6**8)
|
||||
a = self.RequestHandler.content_length
|
||||
t.encode_threshold = 0 #turn on request encoding
|
||||
self.assertEqual(p.pow(6,8), 6**8)
|
||||
b = self.RequestHandler.content_length
|
||||
self.assertTrue(a>b)
|
||||
|
||||
def test_bad_gzip_request(self):
|
||||
t = self.Transport()
|
||||
t.encode_threshold = None
|
||||
t.fake_gzip = True
|
||||
p = xmlrpclib.ServerProxy(URL, transport=t)
|
||||
cm = self.assertRaisesRegexp(xmlrpclib.ProtocolError,
|
||||
re.compile(r"\b400\b"))
|
||||
with cm:
|
||||
p.pow(6, 8)
|
||||
|
||||
def test_gsip_response(self):
|
||||
t = self.Transport()
|
||||
p = xmlrpclib.ServerProxy(URL, transport=t)
|
||||
old = self.requestHandler.encode_threshold
|
||||
self.requestHandler.encode_threshold = None #no encoding
|
||||
self.assertEqual(p.pow(6,8), 6**8)
|
||||
a = t.response_length
|
||||
self.requestHandler.encode_threshold = 0 #always encode
|
||||
self.assertEqual(p.pow(6,8), 6**8)
|
||||
b = t.response_length
|
||||
self.requestHandler.encode_threshold = old
|
||||
self.assertTrue(a>b)
|
||||
|
||||
#Test special attributes of the ServerProxy object
|
||||
class ServerProxyTestCase(unittest.TestCase):
|
||||
def test_close(self):
|
||||
p = xmlrpclib.ServerProxy(URL)
|
||||
self.assertEqual(p('close')(), None)
|
||||
|
||||
def test_transport(self):
|
||||
t = xmlrpclib.Transport()
|
||||
p = xmlrpclib.ServerProxy(URL, transport=t)
|
||||
self.assertEqual(p('transport'), t)
|
||||
|
||||
# This is a contrived way to make a failure occur on the server side
|
||||
# in order to test the _send_traceback_header flag on the server
|
||||
class FailingMessageClass(http.client.HTTPMessage):
|
||||
|
|
@ -647,6 +755,9 @@ def test_main():
|
|||
xmlrpc_tests = [XMLRPCTestCase, HelperTestCase, DateTimeTestCase,
|
||||
BinaryTestCase, FaultTestCase]
|
||||
xmlrpc_tests.append(SimpleServerTestCase)
|
||||
xmlrpc_tests.append(KeepaliveServerTestCase)
|
||||
xmlrpc_tests.append(GzipServerTestCase)
|
||||
xmlrpc_tests.append(ServerProxyTestCase)
|
||||
xmlrpc_tests.append(FailingServerTestCase)
|
||||
xmlrpc_tests.append(CGIHandlerTestCase)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue