mirror of
https://github.com/python/cpython.git
synced 2025-08-04 08:59:19 +00:00
bpo-18233: Add internal methods to access peer chain (GH-25467)
The internal `_ssl._SSLSocket` object now provides methods to retrieve the peer cert chain and verified cert chain as a list of Certificate objects. Certificate objects have methods to convert the cert to a dict, PEM, or DER (ASN.1). These are private APIs for now. There is a slim chance to stabilize the approach and provide a public API for 3.10. Otherwise I'll provide a stable API in 3.11. Signed-off-by: Christian Heimes <christian@python.org>
This commit is contained in:
parent
3c586ca500
commit
666991fc59
9 changed files with 563 additions and 6 deletions
|
@ -32,6 +32,7 @@ except ImportError:
|
|||
ctypes = None
|
||||
|
||||
ssl = import_helper.import_module("ssl")
|
||||
import _ssl
|
||||
|
||||
from ssl import TLSVersion, _TLSContentType, _TLSMessageType, _TLSAlertType
|
||||
|
||||
|
@ -297,7 +298,7 @@ def test_wrap_socket(sock, *,
|
|||
return context.wrap_socket(sock, **kwargs)
|
||||
|
||||
|
||||
def testing_context(server_cert=SIGNED_CERTFILE):
|
||||
def testing_context(server_cert=SIGNED_CERTFILE, *, server_chain=True):
|
||||
"""Create context
|
||||
|
||||
client_context, server_context, hostname = testing_context()
|
||||
|
@ -316,7 +317,8 @@ def testing_context(server_cert=SIGNED_CERTFILE):
|
|||
|
||||
server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||
server_context.load_cert_chain(server_cert)
|
||||
server_context.load_verify_locations(SIGNING_CA)
|
||||
if server_chain:
|
||||
server_context.load_verify_locations(SIGNING_CA)
|
||||
|
||||
return client_context, server_context, hostname
|
||||
|
||||
|
@ -2482,6 +2484,12 @@ class ThreadedEchoServer(threading.Thread):
|
|||
elif stripped == b'GETCERT':
|
||||
cert = self.sslconn.getpeercert()
|
||||
self.write(repr(cert).encode("us-ascii") + b"\n")
|
||||
elif stripped == b'VERIFIEDCHAIN':
|
||||
certs = self.sslconn._sslobj.get_verified_chain()
|
||||
self.write(len(certs).to_bytes(1, "big") + b"\n")
|
||||
elif stripped == b'UNVERIFIEDCHAIN':
|
||||
certs = self.sslconn._sslobj.get_unverified_chain()
|
||||
self.write(len(certs).to_bytes(1, "big") + b"\n")
|
||||
else:
|
||||
if (support.verbose and
|
||||
self.server.connectionchatty):
|
||||
|
@ -4567,6 +4575,63 @@ class TestPostHandshakeAuth(unittest.TestCase):
|
|||
# server cert has not been validated
|
||||
self.assertEqual(s.getpeercert(), {})
|
||||
|
||||
def test_internal_chain_client(self):
|
||||
client_context, server_context, hostname = testing_context(
|
||||
server_chain=False
|
||||
)
|
||||
server = ThreadedEchoServer(context=server_context, chatty=False)
|
||||
with server:
|
||||
with client_context.wrap_socket(
|
||||
socket.socket(),
|
||||
server_hostname=hostname
|
||||
) as s:
|
||||
s.connect((HOST, server.port))
|
||||
vc = s._sslobj.get_verified_chain()
|
||||
self.assertEqual(len(vc), 2)
|
||||
ee, ca = vc
|
||||
uvc = s._sslobj.get_unverified_chain()
|
||||
self.assertEqual(len(uvc), 1)
|
||||
|
||||
self.assertEqual(ee, uvc[0])
|
||||
self.assertEqual(hash(ee), hash(uvc[0]))
|
||||
self.assertEqual(repr(ee), repr(uvc[0]))
|
||||
|
||||
self.assertNotEqual(ee, ca)
|
||||
self.assertNotEqual(hash(ee), hash(ca))
|
||||
self.assertNotEqual(repr(ee), repr(ca))
|
||||
self.assertNotEqual(ee.get_info(), ca.get_info())
|
||||
self.assertIn("CN=localhost", repr(ee))
|
||||
self.assertIn("CN=our-ca-server", repr(ca))
|
||||
|
||||
pem = ee.public_bytes(_ssl.ENCODING_PEM)
|
||||
der = ee.public_bytes(_ssl.ENCODING_DER)
|
||||
self.assertIsInstance(pem, str)
|
||||
self.assertIn("-----BEGIN CERTIFICATE-----", pem)
|
||||
self.assertIsInstance(der, bytes)
|
||||
self.assertEqual(
|
||||
ssl.PEM_cert_to_DER_cert(pem), der
|
||||
)
|
||||
|
||||
def test_internal_chain_server(self):
|
||||
client_context, server_context, hostname = testing_context()
|
||||
client_context.load_cert_chain(SIGNED_CERTFILE)
|
||||
server_context.verify_mode = ssl.CERT_REQUIRED
|
||||
server_context.maximum_version = ssl.TLSVersion.TLSv1_2
|
||||
|
||||
server = ThreadedEchoServer(context=server_context, chatty=False)
|
||||
with server:
|
||||
with client_context.wrap_socket(
|
||||
socket.socket(),
|
||||
server_hostname=hostname
|
||||
) as s:
|
||||
s.connect((HOST, server.port))
|
||||
s.write(b'VERIFIEDCHAIN\n')
|
||||
res = s.recv(1024)
|
||||
self.assertEqual(res, b'\x02\n')
|
||||
s.write(b'UNVERIFIEDCHAIN\n')
|
||||
res = s.recv(1024)
|
||||
self.assertEqual(res, b'\x02\n')
|
||||
|
||||
|
||||
HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename')
|
||||
requires_keylog = unittest.skipUnless(
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue