gh-136306: Add support for getting and setting SSL groups (#136307)

Add support for getting and setting groups used for key agreement.

* `ssl.SSLSocket.group()` returns the name of the group used
  for the key agreement of the current session establishment.
  This feature requires Python to be built with OpenSSL 3.2 or later.

* `ssl.SSLContext.get_groups()` returns the list of names of groups
  that are compatible with the TLS version of the current context.
  This feature requires Python to be built with OpenSSL 3.5 or later.

* `ssl.SSLContext.set_groups()` sets the groups allowed for key agreement
  for sockets created with this context. This feature is always supported.
This commit is contained in:
Ron Frederick 2025-07-28 10:33:31 -07:00 committed by GitHub
parent 59e2330cf3
commit 377b787618
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 370 additions and 1 deletions

View file

@ -1290,6 +1290,13 @@ SSL sockets also have the following additional methods and attributes:
.. versionadded:: 3.5
.. method:: SSLSocket.group()
Return the group used for doing key agreement on this connection. If no
connection has been established, returns ``None``.
.. versionadded:: next
.. method:: SSLSocket.compression()
Return the compression algorithm being used as a string, or ``None``
@ -1647,6 +1654,25 @@ to speed up repeated connections from the same clients.
.. versionadded:: 3.6
.. method:: SSLContext.get_groups(*, include_aliases=False)
Get a list of groups implemented for key agreement, taking into
account the current TLS :attr:`~SSLContext.minimum_version` and
:attr:`~SSLContext.maximum_version` values. For example::
>>> ctx = ssl.create_default_context()
>>> ctx.minimum_version = ssl.TLSVersion.TLSv1_3
>>> ctx.maximum_version = ssl.TLSVersion.TLSv1_3
>>> ctx.get_groups() # doctest: +SKIP
['secp256r1', 'secp384r1', 'secp521r1', 'x25519', 'x448', ...]
By default, this method returns only the preferred IANA names for the
available groups. However, if the ``include_aliases`` parameter is set to
:const:`True` this method will also return any associated aliases such as
the ECDH curve names supported in older versions of OpenSSL.
.. versionadded:: next
.. method:: SSLContext.set_default_verify_paths()
Load a set of default "certification authority" (CA) certificates from
@ -1672,6 +1698,19 @@ to speed up repeated connections from the same clients.
TLS 1.3 cipher suites cannot be disabled with
:meth:`~SSLContext.set_ciphers`.
.. method:: SSLContext.set_groups(groups)
Set the groups allowed for key agreement for sockets created with this
context. It should be a string in the `OpenSSL group list format
<https://docs.openssl.org/master/man3/SSL_CTX_set1_groups_list/>`_.
.. note::
When connected, the :meth:`SSLSocket.group` method of SSL sockets will
return the group used for key agreement on that connection.
.. versionadded:: next
.. method:: SSLContext.set_alpn_protocols(protocols)
Specify which protocols the socket should advertise during the SSL/TLS

View file

@ -300,6 +300,24 @@ ssl
supports "External PSKs" in TLSv1.3, as described in RFC 9258.
(Contributed by Will Childs-Klein in :gh:`133624`.)
* Added new methods for managing groups used for SSL key agreement
* :meth:`ssl.SSLContext.set_groups` sets the groups allowed for doing
key agreement, extending the previous
:meth:`ssl.SSLContext.set_ecdh_curve` method.
This new API provides the ability to list multiple groups and
supports fixed-field and post-quantum groups in addition to ECDH
curves. This method can also be used to control what key shares
are sent in the TLS handshake.
* :meth:`ssl.SSLSocket.group` returns the group selected for doing key
agreement on the current connection after the TLS handshake completes.
This call requires OpenSSL 3.2 or later.
* :meth:`ssl.SSLContext.get_groups` returns a list of all available key
agreement groups compatible with the minimum and maximum TLS versions
currently set in the context. This call requires OpenSSL 3.5 or later.
(Contributed by Ron Frederick in :gh:`136306`)
tarfile
-------

View file

@ -1005,6 +1005,7 @@ _PyStaticObjects_CheckRefcnt(PyInterpreterState *interp) {
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(imag));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(importlib));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(in_fd));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(include_aliases));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(incoming));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(index));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(indexgroup));

View file

@ -496,6 +496,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(imag)
STRUCT_FOR_ID(importlib)
STRUCT_FOR_ID(in_fd)
STRUCT_FOR_ID(include_aliases)
STRUCT_FOR_ID(incoming)
STRUCT_FOR_ID(index)
STRUCT_FOR_ID(indexgroup)

View file

@ -1003,6 +1003,7 @@ extern "C" {
INIT_ID(imag), \
INIT_ID(importlib), \
INIT_ID(in_fd), \
INIT_ID(include_aliases), \
INIT_ID(incoming), \
INIT_ID(index), \
INIT_ID(indexgroup), \

View file

@ -1772,6 +1772,10 @@ _PyUnicode_InitStaticStrings(PyInterpreterState *interp) {
_PyUnicode_InternStatic(interp, &string);
assert(_PyUnicode_CheckConsistency(string, 1));
assert(PyUnicode_GET_LENGTH(string) != 1);
string = &_Py_ID(include_aliases);
_PyUnicode_InternStatic(interp, &string);
assert(_PyUnicode_CheckConsistency(string, 1));
assert(PyUnicode_GET_LENGTH(string) != 1);
string = &_Py_ID(incoming);
_PyUnicode_InternStatic(interp, &string);
assert(_PyUnicode_CheckConsistency(string, 1));

View file

@ -931,6 +931,10 @@ class SSLObject:
ssl_version, secret_bits)``."""
return self._sslobj.cipher()
def group(self):
"""Return the currently selected key agreement group name."""
return self._sslobj.group()
def shared_ciphers(self):
"""Return a list of ciphers shared by the client during the handshake or
None if this is not a valid server connection.
@ -1210,6 +1214,14 @@ class SSLSocket(socket):
else:
return self._sslobj.cipher()
@_sslcopydoc
def group(self):
self._checkClosed()
if self._sslobj is None:
return None
else:
return self._sslobj.group()
@_sslcopydoc
def shared_ciphers(self):
self._checkClosed()

View file

@ -48,6 +48,8 @@ Py_DEBUG_WIN32 = support.Py_DEBUG and sys.platform == 'win32'
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = socket_helper.HOST
IS_OPENSSL_3_0_0 = ssl.OPENSSL_VERSION_INFO >= (3, 0, 0)
CAN_GET_SELECTED_OPENSSL_GROUP = ssl.OPENSSL_VERSION_INFO >= (3, 2)
CAN_GET_AVAILABLE_OPENSSL_GROUPS = ssl.OPENSSL_VERSION_INFO >= (3, 5)
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
PROTOCOL_TO_TLS_VERSION = {}
@ -960,6 +962,19 @@ class ContextTests(unittest.TestCase):
len(intersection), 2, f"\ngot: {sorted(names)}\nexpected: {sorted(expected)}"
)
def test_set_groups(self):
ctx = ssl.create_default_context()
self.assertIsNone(ctx.set_groups('P-256:X25519'))
self.assertRaises(ssl.SSLError, ctx.set_groups, 'P-256:xxx')
@unittest.skipUnless(CAN_GET_AVAILABLE_OPENSSL_GROUPS,
"OpenSSL version doesn't support getting groups")
def test_get_groups(self):
ctx = ssl.create_default_context()
# By default, only return official IANA names.
self.assertNotIn('P-256', ctx.get_groups())
self.assertIn('P-256', ctx.get_groups(include_aliases=True))
def test_options(self):
# Test default SSLContext options
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
@ -2720,6 +2735,8 @@ def server_params_test(client_context, server_context, indata=b"FOO\n",
'session_reused': s.session_reused,
'session': s.session,
})
if CAN_GET_SELECTED_OPENSSL_GROUP:
stats.update({'group': s.group()})
s.close()
stats['server_alpn_protocols'] = server.selected_alpn_protocols
stats['server_shared_ciphers'] = server.shared_ciphers
@ -3870,6 +3887,8 @@ class ThreadedTests(unittest.TestCase):
with self.assertRaises(OSError):
s.connect((HOST, server.port))
self.assertIn("NO_SHARED_CIPHER", server.conn_errors[0])
self.assertIsNone(s.cipher())
self.assertIsNone(s.group())
def test_version_basic(self):
"""
@ -4145,6 +4164,38 @@ class ThreadedTests(unittest.TestCase):
chatty=True, connectionchatty=True,
sni_name=hostname)
def test_groups(self):
# server secp384r1, client auto
client_context, server_context, hostname = testing_context()
server_context.set_groups("secp384r1")
server_context.minimum_version = ssl.TLSVersion.TLSv1_3
stats = server_params_test(client_context, server_context,
chatty=True, connectionchatty=True,
sni_name=hostname)
if CAN_GET_SELECTED_OPENSSL_GROUP:
self.assertEqual(stats['group'], "secp384r1")
# server auto, client secp384r1
client_context, server_context, hostname = testing_context()
client_context.set_groups("secp384r1")
server_context.minimum_version = ssl.TLSVersion.TLSv1_3
stats = server_params_test(client_context, server_context,
chatty=True, connectionchatty=True,
sni_name=hostname)
if CAN_GET_SELECTED_OPENSSL_GROUP:
self.assertEqual(stats['group'], "secp384r1")
# server / client curve mismatch
client_context, server_context, hostname = testing_context()
client_context.set_groups("prime256v1")
server_context.set_groups("secp384r1")
server_context.minimum_version = ssl.TLSVersion.TLSv1_3
with self.assertRaises(ssl.SSLError):
server_params_test(client_context, server_context,
chatty=True, connectionchatty=True,
sni_name=hostname)
def test_selected_alpn_protocol(self):
# selected_alpn_protocol() is None unless ALPN is used.
client_context, server_context, hostname = testing_context()

View file

@ -0,0 +1 @@
:mod:`ssl` can now get and set groups used for key agreement.

View file

@ -2176,6 +2176,33 @@ _ssl__SSLSocket_cipher_impl(PySSLSocket *self)
return cipher_to_tuple(current);
}
/*[clinic input]
@critical_section
_ssl._SSLSocket.group
[clinic start generated code]*/
static PyObject *
_ssl__SSLSocket_group_impl(PySSLSocket *self)
/*[clinic end generated code: output=9c168ee877017b95 input=5f187d8bf0d433b7]*/
{
#if OPENSSL_VERSION_NUMBER >= 0x30200000L
const char *group_name;
if (self->ssl == NULL) {
Py_RETURN_NONE;
}
group_name = SSL_get0_group_name(self->ssl);
if (group_name == NULL) {
Py_RETURN_NONE;
}
return PyUnicode_DecodeFSDefault(group_name);
#else
PyErr_SetString(PyExc_NotImplementedError,
"Getting selected group requires OpenSSL 3.2 or later.");
return NULL;
#endif
}
/*[clinic input]
@critical_section
_ssl._SSLSocket.version
@ -3240,6 +3267,7 @@ static PyMethodDef PySSLMethods[] = {
_SSL__SSLSOCKET_GETPEERCERT_METHODDEF
_SSL__SSLSOCKET_GET_CHANNEL_BINDING_METHODDEF
_SSL__SSLSOCKET_CIPHER_METHODDEF
_SSL__SSLSOCKET_GROUP_METHODDEF
_SSL__SSLSOCKET_SHARED_CIPHERS_METHODDEF
_SSL__SSLSOCKET_VERSION_METHODDEF
_SSL__SSLSOCKET_SELECTED_ALPN_PROTOCOL_METHODDEF
@ -3622,6 +3650,89 @@ _ssl__SSLContext_get_ciphers_impl(PySSLContext *self)
}
/*[clinic input]
@critical_section
_ssl._SSLContext.set_groups
grouplist: str
/
[clinic start generated code]*/
static PyObject *
_ssl__SSLContext_set_groups_impl(PySSLContext *self, const char *grouplist)
/*[clinic end generated code: output=0b5d05dfd371ffd0 input=2cc64cef21930741]*/
{
if (!SSL_CTX_set1_groups_list(self->ctx, grouplist)) {
_setSSLError(get_state_ctx(self), "unrecognized group", 0, __FILE__, __LINE__);
return NULL;
}
Py_RETURN_NONE;
}
/*[clinic input]
@critical_section
_ssl._SSLContext.get_groups
*
include_aliases: bool = False
[clinic start generated code]*/
static PyObject *
_ssl__SSLContext_get_groups_impl(PySSLContext *self, int include_aliases)
/*[clinic end generated code: output=6d6209dd1051529b input=3e8ee5deb277dcc5]*/
{
#if OPENSSL_VERSION_NUMBER >= 0x30500000L
STACK_OF(OPENSSL_CSTRING) *groups = NULL;
const char *group;
int i, num;
PyObject *item, *result = NULL;
// This "groups" object is dynamically allocated, but the strings inside
// it are internal constants which shouldn't ever be modified or freed.
if ((groups = sk_OPENSSL_CSTRING_new_null()) == NULL) {
_setSSLError(get_state_ctx(self), "Can't allocate stack", 0, __FILE__, __LINE__);
goto error;
}
if (!SSL_CTX_get0_implemented_groups(self->ctx, include_aliases, groups)) {
_setSSLError(get_state_ctx(self), "Can't get groups", 0, __FILE__, __LINE__);
goto error;
}
num = sk_OPENSSL_CSTRING_num(groups);
result = PyList_New(num);
if (result == NULL) {
_setSSLError(get_state_ctx(self), "Can't allocate list", 0, __FILE__, __LINE__);
goto error;
}
for (i = 0; i < num; ++i) {
// There's no allocation here, so group won't ever be NULL.
group = sk_OPENSSL_CSTRING_value(groups, i);
assert(group != NULL);
// Group names are plain ASCII, so there's no chance of a decoding
// error here. However, an allocation failure could occur when
// constructing the Unicode version of the names.
item = PyUnicode_DecodeASCII(group, strlen(group), "strict");
if (item == NULL) {
_setSSLError(get_state_ctx(self), "Can't allocate group name", 0, __FILE__, __LINE__);
goto error;
}
PyList_SET_ITEM(result, i, item);
}
sk_OPENSSL_CSTRING_free(groups);
return result;
error:
Py_XDECREF(result);
sk_OPENSSL_CSTRING_free(groups);
return NULL;
#else
PyErr_SetString(PyExc_NotImplementedError,
"Getting implemented groups requires OpenSSL 3.5 or later.");
return NULL;
#endif
}
static int
do_protocol_selection(int alpn, unsigned char **out, unsigned char *outlen,
@ -5472,6 +5583,7 @@ static struct PyMethodDef context_methods[] = {
_SSL__SSLCONTEXT__WRAP_SOCKET_METHODDEF
_SSL__SSLCONTEXT__WRAP_BIO_METHODDEF
_SSL__SSLCONTEXT_SET_CIPHERS_METHODDEF
_SSL__SSLCONTEXT_SET_GROUPS_METHODDEF
_SSL__SSLCONTEXT__SET_ALPN_PROTOCOLS_METHODDEF
_SSL__SSLCONTEXT_LOAD_CERT_CHAIN_METHODDEF
_SSL__SSLCONTEXT_LOAD_DH_PARAMS_METHODDEF
@ -5482,6 +5594,7 @@ static struct PyMethodDef context_methods[] = {
_SSL__SSLCONTEXT_CERT_STORE_STATS_METHODDEF
_SSL__SSLCONTEXT_GET_CA_CERTS_METHODDEF
_SSL__SSLCONTEXT_GET_CIPHERS_METHODDEF
_SSL__SSLCONTEXT_GET_GROUPS_METHODDEF
_SSL__SSLCONTEXT_SET_PSK_CLIENT_CALLBACK_METHODDEF
_SSL__SSLCONTEXT_SET_PSK_SERVER_CALLBACK_METHODDEF
{NULL, NULL} /* sentinel */

130
Modules/clinic/_ssl.c.h generated
View file

@ -196,6 +196,29 @@ _ssl__SSLSocket_cipher(PyObject *self, PyObject *Py_UNUSED(ignored))
return return_value;
}
PyDoc_STRVAR(_ssl__SSLSocket_group__doc__,
"group($self, /)\n"
"--\n"
"\n");
#define _SSL__SSLSOCKET_GROUP_METHODDEF \
{"group", (PyCFunction)_ssl__SSLSocket_group, METH_NOARGS, _ssl__SSLSocket_group__doc__},
static PyObject *
_ssl__SSLSocket_group_impl(PySSLSocket *self);
static PyObject *
_ssl__SSLSocket_group(PyObject *self, PyObject *Py_UNUSED(ignored))
{
PyObject *return_value = NULL;
Py_BEGIN_CRITICAL_SECTION(self);
return_value = _ssl__SSLSocket_group_impl((PySSLSocket *)self);
Py_END_CRITICAL_SECTION();
return return_value;
}
PyDoc_STRVAR(_ssl__SSLSocket_version__doc__,
"version($self, /)\n"
"--\n"
@ -969,6 +992,111 @@ _ssl__SSLContext_get_ciphers(PyObject *self, PyObject *Py_UNUSED(ignored))
return return_value;
}
PyDoc_STRVAR(_ssl__SSLContext_set_groups__doc__,
"set_groups($self, grouplist, /)\n"
"--\n"
"\n");
#define _SSL__SSLCONTEXT_SET_GROUPS_METHODDEF \
{"set_groups", (PyCFunction)_ssl__SSLContext_set_groups, METH_O, _ssl__SSLContext_set_groups__doc__},
static PyObject *
_ssl__SSLContext_set_groups_impl(PySSLContext *self, const char *grouplist);
static PyObject *
_ssl__SSLContext_set_groups(PyObject *self, PyObject *arg)
{
PyObject *return_value = NULL;
const char *grouplist;
if (!PyUnicode_Check(arg)) {
_PyArg_BadArgument("set_groups", "argument", "str", arg);
goto exit;
}
Py_ssize_t grouplist_length;
grouplist = PyUnicode_AsUTF8AndSize(arg, &grouplist_length);
if (grouplist == NULL) {
goto exit;
}
if (strlen(grouplist) != (size_t)grouplist_length) {
PyErr_SetString(PyExc_ValueError, "embedded null character");
goto exit;
}
Py_BEGIN_CRITICAL_SECTION(self);
return_value = _ssl__SSLContext_set_groups_impl((PySSLContext *)self, grouplist);
Py_END_CRITICAL_SECTION();
exit:
return return_value;
}
PyDoc_STRVAR(_ssl__SSLContext_get_groups__doc__,
"get_groups($self, /, *, include_aliases=False)\n"
"--\n"
"\n");
#define _SSL__SSLCONTEXT_GET_GROUPS_METHODDEF \
{"get_groups", _PyCFunction_CAST(_ssl__SSLContext_get_groups), METH_FASTCALL|METH_KEYWORDS, _ssl__SSLContext_get_groups__doc__},
static PyObject *
_ssl__SSLContext_get_groups_impl(PySSLContext *self, int include_aliases);
static PyObject *
_ssl__SSLContext_get_groups(PyObject *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
{
PyObject *return_value = NULL;
#if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)
#define NUM_KEYWORDS 1
static struct {
PyGC_Head _this_is_not_used;
PyObject_VAR_HEAD
Py_hash_t ob_hash;
PyObject *ob_item[NUM_KEYWORDS];
} _kwtuple = {
.ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS)
.ob_hash = -1,
.ob_item = { &_Py_ID(include_aliases), },
};
#undef NUM_KEYWORDS
#define KWTUPLE (&_kwtuple.ob_base.ob_base)
#else // !Py_BUILD_CORE
# define KWTUPLE NULL
#endif // !Py_BUILD_CORE
static const char * const _keywords[] = {"include_aliases", NULL};
static _PyArg_Parser _parser = {
.keywords = _keywords,
.fname = "get_groups",
.kwtuple = KWTUPLE,
};
#undef KWTUPLE
PyObject *argsbuf[1];
Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 0;
int include_aliases = 0;
args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser,
/*minpos*/ 0, /*maxpos*/ 0, /*minkw*/ 0, /*varpos*/ 0, argsbuf);
if (!args) {
goto exit;
}
if (!noptargs) {
goto skip_optional_kwonly;
}
include_aliases = PyObject_IsTrue(args[0]);
if (include_aliases < 0) {
goto exit;
}
skip_optional_kwonly:
Py_BEGIN_CRITICAL_SECTION(self);
return_value = _ssl__SSLContext_get_groups_impl((PySSLContext *)self, include_aliases);
Py_END_CRITICAL_SECTION();
exit:
return return_value;
}
PyDoc_STRVAR(_ssl__SSLContext__set_alpn_protocols__doc__,
"_set_alpn_protocols($self, protos, /)\n"
"--\n"
@ -3014,4 +3142,4 @@ exit:
#ifndef _SSL_ENUM_CRLS_METHODDEF
#define _SSL_ENUM_CRLS_METHODDEF
#endif /* !defined(_SSL_ENUM_CRLS_METHODDEF) */
/*[clinic end generated code: output=1adc3780d8ca682a input=a9049054013a1b77]*/
/*[clinic end generated code: output=c409bdf3c123b28b input=a9049054013a1b77]*/