Issue #18138: Implement cadata argument of SSLContext.load_verify_location()

to load CA certificates and CRL from memory. It supports PEM and DER
encoded strings.
This commit is contained in:
Christian Heimes 2013-11-21 03:35:02 +01:00
parent e6e2d9be6e
commit efff7060f8
4 changed files with 267 additions and 30 deletions

View file

@ -25,7 +25,8 @@ ssl = support.import_module("ssl")
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
data_file = lambda name: os.path.join(os.path.dirname(__file__), name)
def data_file(*name):
return os.path.join(os.path.dirname(__file__), *name)
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
@ -43,6 +44,9 @@ ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")
# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
@ -726,7 +730,7 @@ class ContextTests(unittest.TestCase):
ctx.load_verify_locations(BYTES_CERTFILE)
ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
self.assertRaises(TypeError, ctx.load_verify_locations)
self.assertRaises(TypeError, ctx.load_verify_locations, None, None)
self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
with self.assertRaises(OSError) as cm:
ctx.load_verify_locations(WRONGCERT)
self.assertEqual(cm.exception.errno, errno.ENOENT)
@ -738,6 +742,64 @@ class ContextTests(unittest.TestCase):
# Issue #10989: crash if the second argument type is invalid
self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
def test_load_verify_cadata(self):
# test cadata
with open(CAFILE_CACERT) as f:
cacert_pem = f.read()
cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
with open(CAFILE_NEURONIO) as f:
neuronio_pem = f.read()
neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
# test PEM
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
ctx.load_verify_locations(cadata=cacert_pem)
self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
ctx.load_verify_locations(cadata=neuronio_pem)
self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
# cert already in hash table
ctx.load_verify_locations(cadata=neuronio_pem)
self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
# combined
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
combined = "\n".join((cacert_pem, neuronio_pem))
ctx.load_verify_locations(cadata=combined)
self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
# with junk around the certs
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
combined = ["head", cacert_pem, "other", neuronio_pem, "again",
neuronio_pem, "tail"]
ctx.load_verify_locations(cadata="\n".join(combined))
self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
# test DER
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx.load_verify_locations(cadata=cacert_der)
ctx.load_verify_locations(cadata=neuronio_der)
self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
# cert already in hash table
ctx.load_verify_locations(cadata=cacert_der)
self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
# combined
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
combined = b"".join((cacert_der, neuronio_der))
ctx.load_verify_locations(cadata=combined)
self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
# error cases
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
with self.assertRaisesRegex(ssl.SSLError, "no start line"):
ctx.load_verify_locations(cadata="broken")
with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
ctx.load_verify_locations(cadata=b"broken")
def test_load_dh_params(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx.load_dh_params(DHFILE)
@ -1057,6 +1119,28 @@ class NetworkedTests(unittest.TestCase):
finally:
s.close()
def test_connect_cadata(self):
with open(CAFILE_CACERT) as f:
pem = f.read()
der = ssl.PEM_cert_to_DER_cert(pem)
with support.transient_internet("svn.python.org"):
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(cadata=pem)
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
s.connect(("svn.python.org", 443))
cert = s.getpeercert()
self.assertTrue(cert)
# same with DER
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(cadata=der)
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
s.connect(("svn.python.org", 443))
cert = s.getpeercert()
self.assertTrue(cert)
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
# Issue #5238: creating a file-like object with makefile() shouldn't