Add support for asyncore server-side SSL support. This requires

adding the 'makefile' method to ssl.SSLSocket, and importing the
requisite fakefile class from socket.py, and making the appropriate
changes to it to make it use the SSL connection.

Added sample HTTPS server to test_ssl.py, and test that uses it.

Change SSL tests to use https://svn.python.org/, instead of
www.sf.net and pop.gmail.com.

Added utility function to ssl module, get_server_certificate,
to wrap up the several things to be done to pull a certificate
from a remote server.
This commit is contained in:
Bill Janssen 2007-09-16 22:06:00 +00:00
parent 7e84c7f4b5
commit 296a59d3be
5 changed files with 616 additions and 188 deletions

View file

@ -9,10 +9,13 @@ import subprocess
import time
import os
import pprint
import urllib
import urllib, urlparse
import shutil
import traceback
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
# Optionally test SSL support, if we have it in the tested platform
skip_expected = False
try:
@ -21,6 +24,7 @@ except ImportError:
skip_expected = True
CERTFILE = None
SVN_PYTHON_ORG_ROOT_CERT = None
TESTPORT = 10025
@ -34,24 +38,24 @@ class BasicTests(unittest.TestCase):
def testSSLconnect(self):
import os
with test_support.transient_internet():
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE)
s.connect(("pop.gmail.com", 995))
c = s.getpeercert()
if c:
raise test_support.TestFailed("Peer cert %s shouldn't be here!")
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE)
s.connect(("svn.python.org", 443))
c = s.getpeercert()
if c:
raise test_support.TestFailed("Peer cert %s shouldn't be here!")
s.close()
# this should fail because we have no verification certs
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED)
try:
s.connect(("svn.python.org", 443))
except ssl.SSLError:
pass
finally:
s.close()
# this should fail because we have no verification certs
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED)
try:
s.connect(("pop.gmail.com", 995))
except ssl.SSLError:
pass
finally:
s.close()
def testCrucialConstants(self):
ssl.PROTOCOL_SSLv2
@ -84,6 +88,70 @@ class BasicTests(unittest.TestCase):
if test_support.verbose:
sys.stdout.write("\n" + pprint.pformat(p) + "\n")
def testDERtoPEM(self):
pem = open(SVN_PYTHON_ORG_ROOT_CERT, 'r').read()
d1 = ssl.PEM_cert_to_DER_cert(pem)
p2 = ssl.DER_cert_to_PEM_cert(d1)
d2 = ssl.PEM_cert_to_DER_cert(p2)
if (d1 != d2):
raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
class NetworkTests(unittest.TestCase):
def testConnect(self):
import os
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE)
s.connect(("svn.python.org", 443))
c = s.getpeercert()
if c:
raise test_support.TestFailed("Peer cert %s shouldn't be here!")
s.close()
# this should fail because we have no verification certs
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED)
try:
s.connect(("svn.python.org", 443))
except ssl.SSLError:
pass
finally:
s.close()
# this should succeed because we specify the root cert
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
try:
s.connect(("svn.python.org", 443))
except ssl.SSLError, x:
raise test_support.TestFailed("Unexpected exception %s" % x)
finally:
s.close()
def testFetchServerCert(self):
pem = ssl.get_server_certificate(("svn.python.org", 443))
if not pem:
raise test_support.TestFailed("No server certificate on svn.python.org:443!")
try:
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
except ssl.SSLError:
#should fail
pass
else:
raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem)
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
if not pem:
raise test_support.TestFailed("No server certificate on svn.python.org:443!")
if test_support.verbose:
sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
try:
import threading
except ImportError:
@ -258,6 +326,133 @@ else:
self.active = False
self.sock.close()
class AsyncoreHTTPSServer(threading.Thread):
class HTTPSServer(HTTPServer):
def __init__(self, server_address, RequestHandlerClass, certfile):
HTTPServer.__init__(self, server_address, RequestHandlerClass)
# we assume the certfile contains both private key and certificate
self.certfile = certfile
self.active = False
self.allow_reuse_address = True
def get_request (self):
# override this to wrap socket with SSL
sock, addr = self.socket.accept()
sslconn = ssl.wrap_socket(sock, server_side=True,
certfile=self.certfile)
return sslconn, addr
# The methods overridden below this are mainly so that we
# can run it in a thread and be able to stop it from another
# You probably wouldn't need them in other uses.
def server_activate(self):
# We want to run this in a thread for testing purposes,
# so we override this to set timeout, so that we get
# a chance to stop the server
self.socket.settimeout(0.5)
HTTPServer.server_activate(self)
def serve_forever(self):
# We want this to run in a thread, so we use a slightly
# modified version of "forever".
self.active = True
while self.active:
try:
self.handle_request()
except socket.timeout:
pass
except KeyboardInterrupt:
self.server_close()
return
except:
sys.stdout.write(''.join(traceback.format_exception(*sys.exc_info())));
def server_close(self):
# Again, we want this to run in a thread, so we need to override
# close to clear the "active" flag, so that serve_forever() will
# terminate.
HTTPServer.server_close(self)
self.active = False
class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
# need to override translate_path to get a known root,
# instead of using os.curdir, since the test could be
# run from anywhere
server_version = "TestHTTPS/1.0"
root = None
def translate_path(self, path):
"""Translate a /-separated PATH to the local filename syntax.
Components that mean special things to the local file system
(e.g. drive or directory names) are ignored. (XXX They should
probably be diagnosed.)
"""
# abandon query parameters
path = urlparse.urlparse(path)[2]
path = os.path.normpath(urllib.unquote(path))
words = path.split('/')
words = filter(None, words)
path = self.root
for word in words:
drive, word = os.path.splitdrive(word)
head, word = os.path.split(word)
if word in self.root: continue
path = os.path.join(path, word)
return path
def log_message(self, format, *args):
# we override this to suppress logging unless "verbose"
if test_support.verbose:
sys.stdout.write(" server (%s, %d, %s):\n [%s] %s\n" %
(self.server.server_name,
self.server.server_port,
self.request.cipher(),
self.log_date_time_string(),
format%args))
def __init__(self, port, certfile):
self.flag = None
self.active = False
self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
self.server = self.HTTPSServer(
('', port), self.RootedHTTPRequestHandler, certfile)
threading.Thread.__init__(self)
self.setDaemon(True)
def __str__(self):
return '<%s %s:%d>' % (self.__class__.__name__,
self.server.server_name,
self.server.server_port)
def start (self, flag=None):
self.flag = flag
threading.Thread.start(self)
def run (self):
self.active = True
if self.flag:
self.flag.set()
self.server.serve_forever()
self.active = False
def stop (self):
self.active = False
self.server.server_close()
def badCertTest (certfile):
server = ThreadedEchoServer(TESTPORT, CERTFILE,
certreqs=ssl.CERT_REQUIRED,
@ -331,7 +526,6 @@ else:
if connectionchatty:
if test_support.verbose:
sys.stdout.write(" client: closing connection.\n")
s.ssl_shutdown()
s.close()
finally:
server.stop()
@ -388,7 +582,7 @@ else:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, 'SO_REUSEPORT'):
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
port = test_support.bind_port(s, '127.0.0.1', TESTPORT)
s.bind(('127.0.0.1', TESTPORT))
s.listen(5)
listener_ready.set()
s.accept()
@ -566,27 +760,31 @@ else:
sys.stdout.write("\n")
for indata in msgs:
if test_support.verbose:
sys.stdout.write(" client: sending %s...\n" % repr(indata))
sys.stdout.write(
" client: sending %s...\n" % repr(indata))
if wrapped:
conn.write(indata)
outdata = conn.read()
else:
s.send(indata)
outdata = s.recv(1024)
if indata == "STARTTLS" and outdata.strip().lower().startswith("ok"):
if (indata == "STARTTLS" and
outdata.strip().lower().startswith("ok")):
if test_support.verbose:
sys.stdout.write(" client: read %s from server, starting TLS...\n" % repr(outdata))
sys.stdout.write(
" client: read %s from server, starting TLS...\n"
% repr(outdata))
conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
wrapped = True
else:
if test_support.verbose:
sys.stdout.write(" client: read %s from server\n" % repr(outdata))
sys.stdout.write(
" client: read %s from server\n" % repr(outdata))
if test_support.verbose:
sys.stdout.write(" client: closing connection.\n")
if wrapped:
conn.write("over\n")
conn.ssl_shutdown()
else:
s.send("over\n")
s.close()
@ -594,83 +792,43 @@ else:
server.stop()
server.join()
def testAsyncore(self):
CERTFILE_CONFIG_TEMPLATE = """
# create RSA certs - Server
[ req ]
default_bits = 1024
encrypt_key = yes
distinguished_name = req_dn
x509_extensions = cert_type
[ req_dn ]
countryName = Country Name (2 letter code)
countryName_default = US
countryName_min = 2
countryName_max = 2
stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = %(state)s
localityName = Locality Name (eg, city)
localityName_default = %(city)s
0.organizationName = Organization Name (eg, company)
0.organizationName_default = %(organization)s
organizationalUnitName = Organizational Unit Name (eg, section)
organizationalUnitName_default = %(unit)s
0.commonName = Common Name (FQDN of your server)
0.commonName_default = %(common-name)s
# To create a certificate for more than one name uncomment:
# 1.commonName = DNS alias of your server
# 2.commonName = DNS alias of your server
# ...
# See http://home.netscape.com/eng/security/ssl_2.0_certificate.html
# to see how Netscape understands commonName.
[ cert_type ]
nsCertType = server
"""
def create_cert_files(hostname=None):
"""This is the routine that was run to create the certificate
and private key contained in keycert.pem."""
import tempfile, socket, os
d = tempfile.mkdtemp()
# now create a configuration file for the CA signing cert
fqdn = hostname or socket.getfqdn()
crtfile = os.path.join(d, "cert.pem")
conffile = os.path.join(d, "ca.conf")
fp = open(conffile, "w")
fp.write(CERTFILE_CONFIG_TEMPLATE %
{'state': "Delaware",
'city': "Wilmington",
'organization': "Python Software Foundation",
'unit': "SSL",
'common-name': fqdn,
})
fp.close()
error = os.system(
"openssl req -batch -new -x509 -days 2000 -nodes -config %s "
"-keyout \"%s\" -out \"%s\" > /dev/null < /dev/null 2>&1" %
(conffile, crtfile, crtfile))
# now we have a self-signed server cert in crtfile
os.unlink(conffile)
if (os.WEXITSTATUS(error) or
not os.path.exists(crtfile) or os.path.getsize(crtfile) == 0):
if test_support.verbose:
sys.stdout.write("Unable to create certificate for test, "
+ "error status %d\n" % (error >> 8))
crtfile = None
elif test_support.verbose:
sys.stdout.write(open(crtfile, 'r').read() + '\n')
return d, crtfile
server = AsyncoreHTTPSServer(TESTPORT, CERTFILE)
flag = threading.Event()
server.start(flag)
# wait for it to start
flag.wait()
# try to connect
try:
if test_support.verbose:
sys.stdout.write('\n')
d1 = open(CERTFILE, 'r').read()
d2 = ''
# now fetch the same data from the HTTPS server
url = 'https://127.0.0.1:%d/%s' % (
TESTPORT, os.path.split(CERTFILE)[1])
f = urllib.urlopen(url)
dlen = f.info().getheader("content-length")
if dlen and (int(dlen) > 0):
d2 = f.read(int(dlen))
if test_support.verbose:
sys.stdout.write(
" client: read %d bytes from remote server '%s'\n"
% (len(d2), server))
f.close()
except:
msg = ''.join(traceback.format_exception(*sys.exc_info()))
if test_support.verbose:
sys.stdout.write('\n' + msg)
raise test_support.TestFailed(msg)
else:
if not (d1 == d2):
raise test_support.TestFailed(
"Couldn't fetch data from HTTPS server")
finally:
server.stop()
server.join()
def findtestsocket(start, end):
@ -695,10 +853,15 @@ def test_main(verbose=False):
if skip_expected:
raise test_support.TestSkipped("No SSL support")
global CERTFILE, TESTPORT
global CERTFILE, TESTPORT, SVN_PYTHON_ORG_ROOT_CERT
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
"keycert.pem")
if (not os.path.exists(CERTFILE)):
"keycert.pem")
SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
os.path.dirname(__file__) or os.curdir,
"https_svn_python_org_root.pem")
if (not os.path.exists(CERTFILE) or
not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
raise test_support.TestFailed("Can't read certificate files!")
TESTPORT = findtestsocket(10025, 12000)
if not TESTPORT:
@ -706,9 +869,12 @@ def test_main(verbose=False):
tests = [BasicTests]
if test_support.is_resource_enabled('network'):
tests.append(NetworkTests)
if _have_threads:
thread_info = test_support.threading_setup()
if CERTFILE and thread_info and test_support.is_resource_enabled('network'):
if thread_info and test_support.is_resource_enabled('network'):
tests.append(ConnectedTests)
test_support.run_unittest(*tests)