mirror of
https://github.com/python/cpython.git
synced 2025-08-30 21:48:47 +00:00
Issue #12551: Provide a get_channel_binding() method on SSL sockets so as
to get channel binding data for the current SSL session (only the "tls-unique" channel binding is implemented). This allows the implementation of certain authentication mechanisms such as SCRAM-SHA-1-PLUS. Patch by Jacek Konieczny.
This commit is contained in:
parent
875048bd4c
commit
d649480739
6 changed files with 196 additions and 0 deletions
|
@ -321,6 +321,25 @@ class BasicSocketTests(unittest.TestCase):
|
|||
self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
|
||||
server_hostname="some.hostname")
|
||||
|
||||
def test_unknown_channel_binding(self):
|
||||
# should raise ValueError for unknown type
|
||||
s = socket.socket(socket.AF_INET)
|
||||
ss = ssl.wrap_socket(s)
|
||||
with self.assertRaises(ValueError):
|
||||
ss.get_channel_binding("unknown-type")
|
||||
|
||||
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
|
||||
"'tls-unique' channel binding not available")
|
||||
def test_tls_unique_channel_binding(self):
|
||||
# unconnected should return None for known type
|
||||
s = socket.socket(socket.AF_INET)
|
||||
ss = ssl.wrap_socket(s)
|
||||
self.assertIsNone(ss.get_channel_binding("tls-unique"))
|
||||
# the same for server-side
|
||||
s = socket.socket(socket.AF_INET)
|
||||
ss = ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)
|
||||
self.assertIsNone(ss.get_channel_binding("tls-unique"))
|
||||
|
||||
class ContextTests(unittest.TestCase):
|
||||
|
||||
@skip_if_broken_ubuntu_ssl
|
||||
|
@ -826,6 +845,11 @@ else:
|
|||
self.sslconn = None
|
||||
if support.verbose and self.server.connectionchatty:
|
||||
sys.stdout.write(" server: connection is now unencrypted...\n")
|
||||
elif stripped == b'CB tls-unique':
|
||||
if support.verbose and self.server.connectionchatty:
|
||||
sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
|
||||
data = self.sslconn.get_channel_binding("tls-unique")
|
||||
self.write(repr(data).encode("us-ascii") + b"\n")
|
||||
else:
|
||||
if (support.verbose and
|
||||
self.server.connectionchatty):
|
||||
|
@ -1625,6 +1649,73 @@ else:
|
|||
t.join()
|
||||
server.close()
|
||||
|
||||
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
|
||||
"'tls-unique' channel binding not available")
|
||||
def test_tls_unique_channel_binding(self):
|
||||
"""Test tls-unique channel binding."""
|
||||
if support.verbose:
|
||||
sys.stdout.write("\n")
|
||||
|
||||
server = ThreadedEchoServer(CERTFILE,
|
||||
certreqs=ssl.CERT_NONE,
|
||||
ssl_version=ssl.PROTOCOL_TLSv1,
|
||||
cacerts=CERTFILE,
|
||||
chatty=True,
|
||||
connectionchatty=False)
|
||||
flag = threading.Event()
|
||||
server.start(flag)
|
||||
# wait for it to start
|
||||
flag.wait()
|
||||
# try to connect
|
||||
s = ssl.wrap_socket(socket.socket(),
|
||||
server_side=False,
|
||||
certfile=CERTFILE,
|
||||
ca_certs=CERTFILE,
|
||||
cert_reqs=ssl.CERT_NONE,
|
||||
ssl_version=ssl.PROTOCOL_TLSv1)
|
||||
s.connect((HOST, server.port))
|
||||
try:
|
||||
# get the data
|
||||
cb_data = s.get_channel_binding("tls-unique")
|
||||
if support.verbose:
|
||||
sys.stdout.write(" got channel binding data: {0!r}\n"
|
||||
.format(cb_data))
|
||||
|
||||
# check if it is sane
|
||||
self.assertIsNotNone(cb_data)
|
||||
self.assertEqual(len(cb_data), 12) # True for TLSv1
|
||||
|
||||
# and compare with the peers version
|
||||
s.write(b"CB tls-unique\n")
|
||||
peer_data_repr = s.read().strip()
|
||||
self.assertEqual(peer_data_repr,
|
||||
repr(cb_data).encode("us-ascii"))
|
||||
s.close()
|
||||
|
||||
# now, again
|
||||
s = ssl.wrap_socket(socket.socket(),
|
||||
server_side=False,
|
||||
certfile=CERTFILE,
|
||||
ca_certs=CERTFILE,
|
||||
cert_reqs=ssl.CERT_NONE,
|
||||
ssl_version=ssl.PROTOCOL_TLSv1)
|
||||
s.connect((HOST, server.port))
|
||||
new_cb_data = s.get_channel_binding("tls-unique")
|
||||
if support.verbose:
|
||||
sys.stdout.write(" got another channel binding data: {0!r}\n"
|
||||
.format(new_cb_data))
|
||||
# is it really unique
|
||||
self.assertNotEqual(cb_data, new_cb_data)
|
||||
self.assertIsNotNone(cb_data)
|
||||
self.assertEqual(len(cb_data), 12) # True for TLSv1
|
||||
s.write(b"CB tls-unique\n")
|
||||
peer_data_repr = s.read().strip()
|
||||
self.assertEqual(peer_data_repr,
|
||||
repr(new_cb_data).encode("us-ascii"))
|
||||
s.close()
|
||||
finally:
|
||||
server.stop()
|
||||
server.join()
|
||||
|
||||
def test_main(verbose=False):
|
||||
if support.verbose:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue