Issue #8550: Add first class SSLContext objects to the ssl module.

This commit is contained in:
Antoine Pitrou 2010-05-16 18:19:27 +00:00
parent 8eac60d9af
commit 152efa2ae2
7 changed files with 864 additions and 283 deletions

View file

@ -10,6 +10,7 @@ import gc
import os
import errno
import pprint
import tempfile
import urllib.parse, urllib.request
import traceback
import asyncore
@ -25,8 +26,30 @@ except ImportError:
skip_expected = True
HOST = support.HOST
CERTFILE = None
SVN_PYTHON_ORG_ROOT_CERT = None
PROTOCOLS = [
ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3,
ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1
]
data_file = lambda name: os.path.join(os.path.dirname(__file__), name)
fsencode = lambda name: name.encode(sys.getfilesystemencoding(), "surrogateescape")
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = fsencode(ONLYCERT)
BYTES_ONLYKEY = fsencode(ONLYKEY)
CAPATH = data_file("capath")
BYTES_CAPATH = fsencode(CAPATH)
SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
WRONGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
def handle_error(prefix):
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
@ -34,7 +57,7 @@ def handle_error(prefix):
sys.stdout.write(prefix + exc_format)
class BasicTests(unittest.TestCase):
class BasicSocketTests(unittest.TestCase):
def test_constants(self):
ssl.PROTOCOL_SSLv2
@ -116,11 +139,10 @@ class BasicTests(unittest.TestCase):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")
s.connect(remote)
# Error checking occurs when connecting, because the SSL context
# isn't created before.
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
# Error checking can happen at instantiation or when connecting
with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
s.connect(remote)
@support.cpython_only
@ -143,26 +165,103 @@ class BasicTests(unittest.TestCase):
self.assertEqual(timeout, ss.gettimeout())
class ContextTests(unittest.TestCase):
def test_constructor(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv2)
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv3)
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
self.assertRaises(TypeError, ssl.SSLContext)
self.assertRaises(ValueError, ssl.SSLContext, -1)
self.assertRaises(ValueError, ssl.SSLContext, 42)
def test_protocol(self):
for proto in PROTOCOLS:
ctx = ssl.SSLContext(proto)
self.assertEqual(ctx.protocol, proto)
def test_ciphers(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx.set_ciphers("ALL")
ctx.set_ciphers("DEFAULT")
with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
ctx.set_ciphers("^$:,;?*'dorothyx")
def test_verify(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
# Default value
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
ctx.verify_mode = ssl.CERT_OPTIONAL
self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
ctx.verify_mode = ssl.CERT_REQUIRED
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
ctx.verify_mode = ssl.CERT_NONE
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
with self.assertRaises(TypeError):
ctx.verify_mode = None
with self.assertRaises(ValueError):
ctx.verify_mode = 42
def test_load_cert_chain(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
# Combined key and cert in a single file
ctx.load_cert_chain(CERTFILE)
ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
with self.assertRaisesRegexp(ssl.SSLError, "system lib"):
ctx.load_cert_chain(WRONGCERT)
with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
ctx.load_cert_chain(BADCERT)
with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
ctx.load_cert_chain(EMPTYCERT)
# Separate key and cert
ctx.load_cert_chain(ONLYCERT, ONLYKEY)
ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
ctx.load_cert_chain(ONLYCERT)
with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
ctx.load_cert_chain(ONLYKEY)
with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
# Mismatching key and cert
with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"):
ctx.load_cert_chain(CERTFILE, ONLYKEY)
def test_load_verify_locations(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx.load_verify_locations(CERTFILE)
ctx.load_verify_locations(cafile=CERTFILE, capath=None)
ctx.load_verify_locations(BYTES_CERTFILE)
ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
self.assertRaises(TypeError, ctx.load_verify_locations)
self.assertRaises(TypeError, ctx.load_verify_locations, None, None)
with self.assertRaisesRegexp(ssl.SSLError, "system lib"):
ctx.load_verify_locations(WRONGCERT)
with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
ctx.load_verify_locations(BADCERT)
ctx.load_verify_locations(CERTFILE, CAPATH)
ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
class NetworkedTests(unittest.TestCase):
def test_connect(self):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE)
s.connect(("svn.python.org", 443))
c = s.getpeercert()
if c:
self.fail("Peer cert %s shouldn't be here!")
s.close()
try:
s.connect(("svn.python.org", 443))
self.assertEqual({}, s.getpeercert())
finally:
s.close()
# this should fail because we have no verification certs
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED)
try:
s.connect(("svn.python.org", 443))
except ssl.SSLError:
pass
finally:
s.close()
self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
s.connect, ("svn.python.org", 443))
s.close()
# this should succeed because we specify the root cert
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
@ -170,6 +269,56 @@ class NetworkedTests(unittest.TestCase):
ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
try:
s.connect(("svn.python.org", 443))
self.assertTrue(s.getpeercert())
finally:
s.close()
def test_connect_with_context(self):
# Same as test_connect, but with a separately created context
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
s.connect(("svn.python.org", 443))
try:
self.assertEqual({}, s.getpeercert())
finally:
s.close()
# This should fail because we have no verification certs
ctx.verify_mode = ssl.CERT_REQUIRED
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
s.connect, ("svn.python.org", 443))
s.close()
# This should succeed because we specify the root cert
ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
s.connect(("svn.python.org", 443))
try:
cert = s.getpeercert()
self.assertTrue(cert)
finally:
s.close()
def test_connect_capath(self):
# Verify server certificates using the `capath` argument
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(capath=CAPATH)
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
s.connect(("svn.python.org", 443))
try:
cert = s.getpeercert()
self.assertTrue(cert)
finally:
s.close()
# Same with a bytes `capath` argument
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(capath=BYTES_CAPATH)
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
s.connect(("svn.python.org", 443))
try:
cert = s.getpeercert()
self.assertTrue(cert)
finally:
s.close()
@ -1227,18 +1376,14 @@ def test_main(verbose=False):
if skip_expected:
raise unittest.SkipTest("No SSL support")
global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
"keycert.pem")
SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
os.path.dirname(__file__) or os.curdir,
"https_svn_python_org_root.pem")
for filename in [
CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
BADCERT, BADKEY, EMPTYCERT]:
if not os.path.exists(filename):
raise support.TestFailed("Can't read certificate file %r" % filename)
if (not os.path.exists(CERTFILE) or
not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
raise support.TestFailed("Can't read certificate files!")
tests = [BasicTests]
tests = [ContextTests, BasicSocketTests]
if support.is_resource_enabled('network'):
tests.append(NetworkedTests)