mirror of
https://github.com/python/cpython.git
synced 2025-07-24 11:44:31 +00:00
Bill Janssen wrote:
Here's a patch which makes test_ssl a better player in the buildbots environment. I deep-ended on "try-except-else" clauses.
This commit is contained in:
parent
7fc8e2993a
commit
e472933e27
2 changed files with 87 additions and 43 deletions
|
@ -91,38 +91,66 @@ class ConnectedTests(unittest.TestCase):
|
|||
def testTLSecho (self):
|
||||
|
||||
s1 = socket.socket()
|
||||
s1.connect(('127.0.0.1', 10024))
|
||||
c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
|
||||
indata = "FOO\n"
|
||||
c1.write(indata)
|
||||
outdata = c1.read()
|
||||
if outdata != indata.lower():
|
||||
sys.stderr.write("bad data <<%s>> received\n" % data)
|
||||
c1.close()
|
||||
try:
|
||||
s1.connect(('127.0.0.1', 10024))
|
||||
except:
|
||||
sys.stdout.write("connection failure:\n" + string.join(
|
||||
traceback.format_exception(*sys.exc_info())))
|
||||
raise test_support.TestFailed("Can't connect to test server")
|
||||
else:
|
||||
try:
|
||||
c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
|
||||
except:
|
||||
sys.stdout.write("SSL handshake failure:\n" + string.join(
|
||||
traceback.format_exception(*sys.exc_info())))
|
||||
raise test_support.TestFailed("Can't SSL-handshake with test server")
|
||||
else:
|
||||
if not c1:
|
||||
raise test_support.TestFailed("Can't SSL-handshake with test server")
|
||||
indata = "FOO\n"
|
||||
c1.write(indata)
|
||||
outdata = c1.read()
|
||||
if outdata != indata.lower():
|
||||
raise test_support.TestFailed("bad data <<%s>> received; expected <<%s>>\n" % (data, indata.lower()))
|
||||
c1.close()
|
||||
|
||||
def testReadCert(self):
|
||||
|
||||
s2 = socket.socket()
|
||||
s2.connect(('127.0.0.1', 10024))
|
||||
c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
|
||||
cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
|
||||
cert = c2.getpeercert()
|
||||
if not cert:
|
||||
raise test_support.TestFailed("Can't get peer certificate.")
|
||||
if not cert.has_key('subject'):
|
||||
raise test_support.TestFailed(
|
||||
"No subject field in certificate: %s." %
|
||||
pprint.pformat(cert))
|
||||
if not (cert['subject'].has_key('organizationName')):
|
||||
raise test_support.TestFailed(
|
||||
"No 'organizationName' field in certificate subject: %s." %
|
||||
pprint.pformat(cert))
|
||||
if (cert['subject']['organizationName'] !=
|
||||
"Python Software Foundation"):
|
||||
raise test_support.TestFailed(
|
||||
"Invalid 'organizationName' field in certificate subject; "
|
||||
"should be 'Python Software Foundation'.");
|
||||
c2.close()
|
||||
try:
|
||||
s2.connect(('127.0.0.1', 10024))
|
||||
except:
|
||||
sys.stdout.write("connection failure:\n" + string.join(
|
||||
traceback.format_exception(*sys.exc_info())))
|
||||
raise test_support.TestFailed("Can't connect to test server")
|
||||
else:
|
||||
try:
|
||||
c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
|
||||
cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
|
||||
except:
|
||||
sys.stdout.write("SSL handshake failure:\n" + string.join(
|
||||
traceback.format_exception(*sys.exc_info())))
|
||||
raise test_support.TestFailed("Can't SSL-handshake with test server")
|
||||
else:
|
||||
if not c2:
|
||||
raise test_support.TestFailed("Can't SSL-handshake with test server")
|
||||
cert = c2.getpeercert()
|
||||
if not cert:
|
||||
raise test_support.TestFailed("Can't get peer certificate.")
|
||||
if not cert.has_key('subject'):
|
||||
raise test_support.TestFailed(
|
||||
"No subject field in certificate: %s." %
|
||||
pprint.pformat(cert))
|
||||
if not (cert['subject'].has_key('organizationName')):
|
||||
raise test_support.TestFailed(
|
||||
"No 'organizationName' field in certificate subject: %s." %
|
||||
pprint.pformat(cert))
|
||||
if (cert['subject']['organizationName'] !=
|
||||
"Python Software Foundation"):
|
||||
raise test_support.TestFailed(
|
||||
"Invalid 'organizationName' field in certificate subject; "
|
||||
"should be 'Python Software Foundation'.");
|
||||
c2.close()
|
||||
|
||||
|
||||
class threadedEchoServer(threading.Thread):
|
||||
|
@ -138,10 +166,22 @@ class threadedEchoServer(threading.Thread):
|
|||
|
||||
def run (self):
|
||||
self.running = True
|
||||
sslconn = ssl.sslsocket(self.sock, server_side=True,
|
||||
certfile=self.server.certificate,
|
||||
ssl_version=self.server.protocol,
|
||||
cert_reqs=self.server.certreqs)
|
||||
try:
|
||||
sslconn = ssl.sslsocket(self.sock, server_side=True,
|
||||
certfile=self.server.certificate,
|
||||
ssl_version=self.server.protocol,
|
||||
cert_reqs=self.server.certreqs)
|
||||
except:
|
||||
# here, we want to stop the server, because this shouldn't
|
||||
# happen in the context of our test case
|
||||
sys.stdout.write("Test server failure:\n" + string.join(
|
||||
traceback.format_exception(*sys.exc_info())))
|
||||
self.running = False
|
||||
# normally, we'd just stop here, but for the test
|
||||
# harness, we want to stop the server
|
||||
self.server.stop()
|
||||
return
|
||||
|
||||
while self.running:
|
||||
try:
|
||||
msg = sslconn.read()
|
||||
|
@ -154,15 +194,18 @@ class threadedEchoServer(threading.Thread):
|
|||
self.server.stop()
|
||||
self.running = False
|
||||
else:
|
||||
# print "server:", msg.strip().lower()
|
||||
sys.stdout.write("\nserver: %s\n" % msg.strip().lower())
|
||||
sslconn.write(msg.lower())
|
||||
except ssl.sslerror:
|
||||
sys.stderr.write(string.join(
|
||||
sys.stdout.write("Test server failure:\n" + string.join(
|
||||
traceback.format_exception(*sys.exc_info())))
|
||||
sslconn.close()
|
||||
self.running = False
|
||||
# normally, we'd just stop here, but for the test
|
||||
# harness, we want to stop the server
|
||||
self.server.stop()
|
||||
except:
|
||||
sys.stderr.write(string.join(
|
||||
sys.stdout.write(string.join(
|
||||
traceback.format_exception(*sys.exc_info())))
|
||||
|
||||
def __init__(self, port, certificate, ssl_version=None,
|
||||
|
@ -192,20 +235,20 @@ class threadedEchoServer(threading.Thread):
|
|||
while self.active:
|
||||
try:
|
||||
newconn, connaddr = self.sock.accept()
|
||||
# sys.stderr.write('new connection from ' + str(connaddr))
|
||||
sys.stdout.write('\nserver: new connection from ' + str(connaddr) + '\n')
|
||||
handler = self.connectionHandler(self, newconn)
|
||||
handler.start()
|
||||
except socket.timeout:
|
||||
pass
|
||||
except KeyboardInterrupt:
|
||||
self.active = False
|
||||
self.stop()
|
||||
except:
|
||||
sys.stderr.write(string.join(
|
||||
sys.stdout.write("Test server failure:\n" + string.join(
|
||||
traceback.format_exception(*sys.exc_info())))
|
||||
|
||||
def stop (self):
|
||||
self.active = False
|
||||
|
||||
self.sock.close()
|
||||
|
||||
CERTFILE_CONFIG_TEMPLATE = """
|
||||
# create RSA certs - Server
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue