mirror of
https://github.com/python/cpython.git
synced 2025-11-02 03:01:58 +00:00
Issue #8109: The ssl module now has support for server-side SNI, thanks to a :meth:SSLContext.set_servername_callback method.
Patch by Daniel Black.
This commit is contained in:
parent
3c9850aad7
commit
58ddc9d743
11 changed files with 881 additions and 42 deletions
|
|
@ -48,6 +48,11 @@ KEY_PASSWORD = "somepass"
|
|||
CAPATH = data_file("capath")
|
||||
BYTES_CAPATH = os.fsencode(CAPATH)
|
||||
|
||||
# Two keys and certs signed by the same CA (for SNI tests)
|
||||
SIGNED_CERTFILE = data_file("keycert3.pem")
|
||||
SIGNED_CERTFILE2 = data_file("keycert4.pem")
|
||||
SIGNING_CA = data_file("pycacert.pem")
|
||||
|
||||
SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")
|
||||
|
||||
EMPTYCERT = data_file("nullcert.pem")
|
||||
|
|
@ -59,6 +64,7 @@ NOKIACERT = data_file("nokia.pem")
|
|||
DHFILE = data_file("dh512.pem")
|
||||
BYTES_DHFILE = os.fsencode(DHFILE)
|
||||
|
||||
|
||||
def handle_error(prefix):
|
||||
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
|
||||
if support.verbose:
|
||||
|
|
@ -89,6 +95,8 @@ def skip_if_broken_ubuntu_ssl(func):
|
|||
else:
|
||||
return func
|
||||
|
||||
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
|
||||
|
||||
|
||||
class BasicSocketTests(unittest.TestCase):
|
||||
|
||||
|
|
@ -142,6 +150,7 @@ class BasicSocketTests(unittest.TestCase):
|
|||
(('organizationName', 'Python Software Foundation'),),
|
||||
(('commonName', 'localhost'),))
|
||||
)
|
||||
# Note the next three asserts will fail if the keys are regenerated
|
||||
self.assertEqual(p['notAfter'], 'Oct 5 23:01:56 2020 GMT')
|
||||
self.assertEqual(p['notBefore'], 'Oct 8 23:01:56 2010 GMT')
|
||||
self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
|
||||
|
|
@ -585,6 +594,34 @@ class ContextTests(unittest.TestCase):
|
|||
self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
|
||||
self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
|
||||
|
||||
@needs_sni
|
||||
def test_sni_callback(self):
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
|
||||
|
||||
# set_servername_callback expects a callable, or None
|
||||
self.assertRaises(TypeError, ctx.set_servername_callback)
|
||||
self.assertRaises(TypeError, ctx.set_servername_callback, 4)
|
||||
self.assertRaises(TypeError, ctx.set_servername_callback, "")
|
||||
self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
|
||||
|
||||
def dummycallback(sock, servername, ctx):
|
||||
pass
|
||||
ctx.set_servername_callback(None)
|
||||
ctx.set_servername_callback(dummycallback)
|
||||
|
||||
@needs_sni
|
||||
def test_sni_callback_refcycle(self):
|
||||
# Reference cycles through the servername callback are detected
|
||||
# and cleared.
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
|
||||
def dummycallback(sock, servername, ctx, cycle=ctx):
|
||||
pass
|
||||
ctx.set_servername_callback(dummycallback)
|
||||
wr = weakref.ref(ctx)
|
||||
del ctx, dummycallback
|
||||
gc.collect()
|
||||
self.assertIs(wr(), None)
|
||||
|
||||
|
||||
class SSLErrorTests(unittest.TestCase):
|
||||
|
||||
|
|
@ -1249,7 +1286,7 @@ else:
|
|||
raise AssertionError("Use of invalid cert should have failed!")
|
||||
|
||||
def server_params_test(client_context, server_context, indata=b"FOO\n",
|
||||
chatty=True, connectionchatty=False):
|
||||
chatty=True, connectionchatty=False, sni_name=None):
|
||||
"""
|
||||
Launch a server, connect a client to it and try various reads
|
||||
and writes.
|
||||
|
|
@ -1259,7 +1296,8 @@ else:
|
|||
chatty=chatty,
|
||||
connectionchatty=False)
|
||||
with server:
|
||||
with client_context.wrap_socket(socket.socket()) as s:
|
||||
with client_context.wrap_socket(socket.socket(),
|
||||
server_hostname=sni_name) as s:
|
||||
s.connect((HOST, server.port))
|
||||
for arg in [indata, bytearray(indata), memoryview(indata)]:
|
||||
if connectionchatty:
|
||||
|
|
@ -1283,6 +1321,7 @@ else:
|
|||
stats.update({
|
||||
'compression': s.compression(),
|
||||
'cipher': s.cipher(),
|
||||
'peercert': s.getpeercert(),
|
||||
'client_npn_protocol': s.selected_npn_protocol()
|
||||
})
|
||||
s.close()
|
||||
|
|
@ -1988,6 +2027,100 @@ else:
|
|||
if len(stats['server_npn_protocols']) else 'nothing'
|
||||
self.assertEqual(server_result, expected, msg % (server_result, "server"))
|
||||
|
||||
def sni_contexts(self):
|
||||
server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
|
||||
server_context.load_cert_chain(SIGNED_CERTFILE)
|
||||
other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
|
||||
other_context.load_cert_chain(SIGNED_CERTFILE2)
|
||||
client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
|
||||
client_context.verify_mode = ssl.CERT_REQUIRED
|
||||
client_context.load_verify_locations(SIGNING_CA)
|
||||
return server_context, other_context, client_context
|
||||
|
||||
def check_common_name(self, stats, name):
|
||||
cert = stats['peercert']
|
||||
self.assertIn((('commonName', name),), cert['subject'])
|
||||
|
||||
@needs_sni
|
||||
def test_sni_callback(self):
|
||||
calls = []
|
||||
server_context, other_context, client_context = self.sni_contexts()
|
||||
|
||||
def servername_cb(ssl_sock, server_name, initial_context):
|
||||
calls.append((server_name, initial_context))
|
||||
ssl_sock.context = other_context
|
||||
server_context.set_servername_callback(servername_cb)
|
||||
|
||||
stats = server_params_test(client_context, server_context,
|
||||
chatty=True,
|
||||
sni_name='supermessage')
|
||||
# The hostname was fetched properly, and the certificate was
|
||||
# changed for the connection.
|
||||
self.assertEqual(calls, [("supermessage", server_context)])
|
||||
# CERTFILE4 was selected
|
||||
self.check_common_name(stats, 'fakehostname')
|
||||
|
||||
# Check disabling the callback
|
||||
calls = []
|
||||
server_context.set_servername_callback(None)
|
||||
|
||||
stats = server_params_test(client_context, server_context,
|
||||
chatty=True,
|
||||
sni_name='notfunny')
|
||||
# Certificate didn't change
|
||||
self.check_common_name(stats, 'localhost')
|
||||
self.assertEqual(calls, [])
|
||||
|
||||
@needs_sni
|
||||
def test_sni_callback_alert(self):
|
||||
# Returning a TLS alert is reflected to the connecting client
|
||||
server_context, other_context, client_context = self.sni_contexts()
|
||||
|
||||
def cb_returning_alert(ssl_sock, server_name, initial_context):
|
||||
return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
|
||||
server_context.set_servername_callback(cb_returning_alert)
|
||||
|
||||
with self.assertRaises(ssl.SSLError) as cm:
|
||||
stats = server_params_test(client_context, server_context,
|
||||
chatty=False,
|
||||
sni_name='supermessage')
|
||||
self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
|
||||
|
||||
@needs_sni
|
||||
def test_sni_callback_raising(self):
|
||||
# Raising fails the connection with a TLS handshake failure alert.
|
||||
server_context, other_context, client_context = self.sni_contexts()
|
||||
|
||||
def cb_raising(ssl_sock, server_name, initial_context):
|
||||
1/0
|
||||
server_context.set_servername_callback(cb_raising)
|
||||
|
||||
with self.assertRaises(ssl.SSLError) as cm, \
|
||||
support.captured_stderr() as stderr:
|
||||
stats = server_params_test(client_context, server_context,
|
||||
chatty=False,
|
||||
sni_name='supermessage')
|
||||
self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
|
||||
self.assertIn("ZeroDivisionError", stderr.getvalue())
|
||||
|
||||
@needs_sni
|
||||
def test_sni_callback_wrong_return_type(self):
|
||||
# Returning the wrong return type terminates the TLS connection
|
||||
# with an internal error alert.
|
||||
server_context, other_context, client_context = self.sni_contexts()
|
||||
|
||||
def cb_wrong_return_type(ssl_sock, server_name, initial_context):
|
||||
return "foo"
|
||||
server_context.set_servername_callback(cb_wrong_return_type)
|
||||
|
||||
with self.assertRaises(ssl.SSLError) as cm, \
|
||||
support.captured_stderr() as stderr:
|
||||
stats = server_params_test(client_context, server_context,
|
||||
chatty=False,
|
||||
sni_name='supermessage')
|
||||
self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
|
||||
self.assertIn("TypeError", stderr.getvalue())
|
||||
|
||||
|
||||
def test_main(verbose=False):
|
||||
if support.verbose:
|
||||
|
|
@ -2011,6 +2144,7 @@ def test_main(verbose=False):
|
|||
for filename in [
|
||||
CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
|
||||
ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
|
||||
SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
|
||||
BADCERT, BADKEY, EMPTYCERT]:
|
||||
if not os.path.exists(filename):
|
||||
raise support.TestFailed("Can't read certificate file %r" % filename)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue