mirror of
				https://github.com/python/cpython.git
				synced 2025-10-24 23:46:23 +00:00 
			
		
		
		
	 16994912c9
			
		
	
	
		16994912c9
		
			
		
	
	
	
	
		
			
			* Move socket related functions from test.support to socket_helper. * Import socket, nntplib and urllib.error lazily in transient_internet(). * Remove importing multiprocess.
		
			
				
	
	
		
			209 lines
		
	
	
	
		
			7.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			209 lines
		
	
	
	
		
			7.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| import sys
 | |
| import ssl
 | |
| import pprint
 | |
| import threading
 | |
| import urllib.parse
 | |
| # Rename HTTPServer to _HTTPServer so as to avoid confusion with HTTPSServer.
 | |
| from http.server import (HTTPServer as _HTTPServer,
 | |
|     SimpleHTTPRequestHandler, BaseHTTPRequestHandler)
 | |
| 
 | |
| from test import support
 | |
| from test.support import socket_helper
 | |
| 
 | |
# Directory containing this module; CERTFILE below is resolved against it.
here = os.path.dirname(__file__)

# Default host for the servers defined in this module, as provided by
# socket_helper.
HOST = socket_helper.HOST
# Default certificate file, expected to sit next to this module.
CERTFILE = os.path.join(here, "keycert.pem")
 | |
| 
 | |
| # This one's based on HTTPServer, which is based on socketserver
 | |
| 
 | |
class HTTPSServer(_HTTPServer):
    """HTTP server that wraps every accepted connection with TLS.

    The wrapping is done in get_request() using the context object
    supplied at construction time.
    """

    def __init__(self, server_address, handler_class, context):
        """Create the server and remember the wrapping context.

        server_address and handler_class are forwarded unchanged to the
        base HTTP server; context must provide wrap_socket().
        """
        super().__init__(server_address, handler_class)
        self.context = context

    def __str__(self):
        """Return '<ClassName server_name:server_port>'."""
        details = (type(self).__name__, self.server_name, self.server_port)
        return '<%s %s:%s>' % details

    def get_request(self):
        """Accept one connection and return (wrapped_socket, address).

        An OSError raised while accepting or wrapping is written to
        stderr when support.verbose is set, and is then re-raised.
        """
        # override this to wrap socket with SSL
        try:
            plain_sock, peer_addr = self.socket.accept()
            tls_sock = self.context.wrap_socket(plain_sock, server_side=True)
        except OSError as exc:
            # socket errors are silenced by the caller, print them here
            if support.verbose:
                sys.stderr.write("Got an error:\n%s\n" % exc)
            raise
        else:
            return tls_sock, peer_addr
 | |
| 
 | |
class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
    """Serve files below a fixed root directory.

    translate_path() is overridden to resolve request paths against the
    ``root`` class attribute instead of os.curdir, since the test could
    be run from anywhere.
    """

    server_version = "TestHTTPS/1.0"
    # Directory that request paths are resolved against.
    root = here
    # Avoid hanging when a request gets interrupted by the client
    timeout = support.LOOPBACK_TIMEOUT

    def translate_path(self, path):
        """Translate a /-separated PATH to the local filename syntax.

        The query string is dropped, the path is percent-decoded and
        normalized, and each remaining component is joined onto
        ``self.root``.

        Components that mean special things to the local file system
        (drive prefixes, embedded directory parts, and the current/parent
        directory names) are ignored, so a request such as
        ``../../name`` cannot step out of ``self.root`` through
        parent-directory references.

        Returns the resulting local path as a string.
        """
        # abandon query parameters
        path = urllib.parse.urlparse(path)[2]
        path = os.path.normpath(urllib.parse.unquote(path))
        result = self.root
        for word in filter(None, path.split('/')):
            # Keep only the final name of the component: strip any drive
            # prefix and any leading directory part.
            word = os.path.splitdrive(word)[1]
            word = os.path.split(word)[1]
            if word in (os.curdir, os.pardir):
                # normpath() leaves leading '..' in place for paths that
                # do not start with '/'; joining them would escape root.
                continue
            result = os.path.join(result, word)
        return result

    def log_message(self, format, *args):
        """Write a log entry to stdout, but only when support.verbose is set.

        The entry shows the server address and port, the cipher reported
        by the request socket, the timestamp, and ``format % args``.
        """
        # we override this to suppress logging unless "verbose"
        if support.verbose:
            sys.stdout.write(" server (%s:%d %s):\n   [%s] %s\n" %
                             (self.server.server_address,
                              self.server.server_port,
                              self.request.cipher(),
                              self.log_date_time_string(),
                              format%args))
 | |
| 
 | |
| 
 | |
class StatsRequestHandler(BaseHTTPRequestHandler):
    """Example HTTP request handler which returns SSL statistics on GET
    requests.

    The response body is a pretty-printed dict with the context's
    session statistics plus the connection's cipher and compression.
    """

    server_version = "StatsHTTPS/1.0"

    def do_GET(self, send_body=True):
        """Serve a GET request.

        Headers are always sent; the body is written only when
        send_body is true (do_HEAD passes False).
        """
        # The socket behind the request's read stream carries the
        # context and the per-connection details.
        tls_sock = self.rfile.raw._sock
        ssl_context = tls_sock.context
        report = {
            'session_cache': ssl_context.session_stats(),
            'cipher': tls_sock.cipher(),
            'compression': tls_sock.compression(),
        }
        payload = pprint.pformat(report).encode('utf-8')
        self.send_response(200)
        self.send_header("Content-type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        if not send_body:
            return
        self.wfile.write(payload)

    def do_HEAD(self):
        """Serve a HEAD request."""
        self.do_GET(send_body=False)

    def log_request(self, format, *args):
        """Forward to the base class logger only when support.verbose is set."""
        if not support.verbose:
            return
        super().log_request(format, *args)
 | |
| 
 | |
| 
 | |
class HTTPSServerThread(threading.Thread):
    """Daemon thread that runs an HTTPSServer on a system-chosen port.

    The port actually bound is exposed as ``self.port``.
    """

    def __init__(self, context, host=HOST, handler_class=None):
        """Bind the server on (host, 0) and prepare the thread.

        handler_class falls back to RootedHTTPRequestHandler when it is
        not given (or is otherwise false).
        """
        self.flag = None
        if handler_class:
            request_handler = handler_class
        else:
            request_handler = RootedHTTPRequestHandler
        self.server = HTTPSServer((host, 0), request_handler, context)
        self.port = self.server.server_port
        super().__init__()
        self.daemon = True

    def __str__(self):
        """Return '<ClassName server>'."""
        return "<%s %s>" % (type(self).__name__, self.server)

    def start(self, flag=None):
        """Start the thread; flag, if given, is set once run() begins."""
        self.flag = flag
        super().start()

    def run(self):
        """Signal the flag, then serve until shutdown and close the server."""
        if self.flag:
            self.flag.set()
        try:
            self.server.serve_forever(0.05)
        finally:
            # Release the listening socket however the loop ended.
            self.server.server_close()

    def stop(self):
        """Ask the serving loop to shut down."""
        self.server.shutdown()
 | |
| 
 | |
| 
 | |
def make_https_server(case, *, context=None, certfile=CERTFILE,
                      host=HOST, handler_class=None):
    """Start an HTTPSServerThread and register its shutdown on *case*.

    case must provide addCleanup(); the registered cleanup stops the
    server and joins the thread.  When context is None a default
    server-side context is created.  certfile is loaded into the context
    in either case.

    Returns the started HTTPSServerThread.
    """
    if context is None:
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    # We assume the certfile contains both private key and certificate
    context.load_cert_chain(certfile)

    server_thread = HTTPSServerThread(context, host, handler_class)
    started = threading.Event()
    server_thread.start(started)
    # Block until the thread's run() has begun.
    started.wait()

    def _announce(text):
        # Progress messages are only shown in verbose mode.
        if support.verbose:
            sys.stdout.write(text)

    def _shutdown():
        _announce('stopping HTTPS server\n')
        server_thread.stop()
        _announce('joining HTTPS thread\n')
        server_thread.join()

    case.addCleanup(_shutdown)
    return server_thread
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Stand-alone mode: serve over HTTPS on the requested port until
    # interrupted.
    import argparse

    cli = argparse.ArgumentParser(
        description='Run a test HTTPS server. '
                    'By default, the current directory is served.')
    cli.add_argument('-p', '--port', type=int, default=4433,
                     help='port to listen on (default: %(default)s)')
    cli.add_argument('-q', '--quiet', dest='verbose', default=True,
                     action='store_false', help='be less verbose')
    cli.add_argument('-s', '--stats', dest='use_stats_handler', default=False,
                     action='store_true', help='always return stats page')
    cli.add_argument('--curve-name', dest='curve_name', type=str,
                     action='store',
                     help='curve name for EC-based Diffie-Hellman')
    cli.add_argument('--ciphers', dest='ciphers', type=str,
                     help='allowed cipher list')
    cli.add_argument('--dh', dest='dh_file', type=str, action='store',
                     help='PEM file containing DH parameters')
    args = cli.parse_args()

    support.verbose = args.verbose

    if args.use_stats_handler:
        handler_class = StatsRequestHandler
    else:
        # Serve the directory the script was launched from.
        RootedHTTPRequestHandler.root = os.getcwd()
        handler_class = RootedHTTPRequestHandler

    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(CERTFILE)
    # Optional tuning, applied only for the options actually supplied.
    if args.curve_name:
        context.set_ecdh_curve(args.curve_name)
    if args.dh_file:
        context.load_dh_params(args.dh_file)
    if args.ciphers:
        context.set_ciphers(args.ciphers)

    server = HTTPSServer(("", args.port), handler_class, context)
    if args.verbose:
        print("Listening on https://localhost:{0.port}".format(args))
    server.serve_forever(0.1)
 |