mirror of
https://github.com/python/cpython.git
synced 2025-08-03 16:39:00 +00:00
Issue #28022: Deprecate ssl-related arguments in favor of SSLContext.
The deprecation include manual creation of SSLSocket and certfile/keyfile (or similar) in ftplib, httplib, imaplib, smtplib, poplib and urllib. ssl.wrap_socket() is not marked as deprecated yet.
This commit is contained in:
parent
130bbe5fd3
commit
d04863771b
23 changed files with 189 additions and 85 deletions
|
@ -143,6 +143,21 @@ def skip_if_broken_ubuntu_ssl(func):
|
|||
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
|
||||
|
||||
|
||||
def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
|
||||
cert_reqs=ssl.CERT_NONE, ca_certs=None,
|
||||
ciphers=None, certfile=None, keyfile=None,
|
||||
**kwargs):
|
||||
context = ssl.SSLContext(ssl_version)
|
||||
if cert_reqs is not None:
|
||||
context.verify_mode = cert_reqs
|
||||
if ca_certs is not None:
|
||||
context.load_verify_locations(ca_certs)
|
||||
if certfile is not None or keyfile is not None:
|
||||
context.load_cert_chain(certfile, keyfile)
|
||||
if ciphers is not None:
|
||||
context.set_ciphers(ciphers)
|
||||
return context.wrap_socket(sock, **kwargs)
|
||||
|
||||
class BasicSocketTests(unittest.TestCase):
|
||||
|
||||
def test_constants(self):
|
||||
|
@ -363,7 +378,7 @@ class BasicSocketTests(unittest.TestCase):
|
|||
# Issue #7943: an SSL object doesn't create reference cycles with
|
||||
# itself.
|
||||
s = socket.socket(socket.AF_INET)
|
||||
ss = ssl.wrap_socket(s)
|
||||
ss = test_wrap_socket(s)
|
||||
wr = weakref.ref(ss)
|
||||
with support.check_warnings(("", ResourceWarning)):
|
||||
del ss
|
||||
|
@ -373,7 +388,7 @@ class BasicSocketTests(unittest.TestCase):
|
|||
# Methods on an unconnected SSLSocket propagate the original
|
||||
# OSError raise by the underlying socket object.
|
||||
s = socket.socket(socket.AF_INET)
|
||||
with ssl.wrap_socket(s) as ss:
|
||||
with test_wrap_socket(s) as ss:
|
||||
self.assertRaises(OSError, ss.recv, 1)
|
||||
self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
|
||||
self.assertRaises(OSError, ss.recvfrom, 1)
|
||||
|
@ -387,10 +402,10 @@ class BasicSocketTests(unittest.TestCase):
|
|||
for timeout in (None, 0.0, 5.0):
|
||||
s = socket.socket(socket.AF_INET)
|
||||
s.settimeout(timeout)
|
||||
with ssl.wrap_socket(s) as ss:
|
||||
with test_wrap_socket(s) as ss:
|
||||
self.assertEqual(timeout, ss.gettimeout())
|
||||
|
||||
def test_errors(self):
|
||||
def test_errors_sslwrap(self):
|
||||
sock = socket.socket()
|
||||
self.assertRaisesRegex(ValueError,
|
||||
"certfile must be specified",
|
||||
|
@ -400,10 +415,10 @@ class BasicSocketTests(unittest.TestCase):
|
|||
ssl.wrap_socket, sock, server_side=True)
|
||||
self.assertRaisesRegex(ValueError,
|
||||
"certfile must be specified for server-side operations",
|
||||
ssl.wrap_socket, sock, server_side=True, certfile="")
|
||||
ssl.wrap_socket, sock, server_side=True, certfile="")
|
||||
with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
|
||||
self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
|
||||
s.connect, (HOST, 8080))
|
||||
s.connect, (HOST, 8080))
|
||||
with self.assertRaises(OSError) as cm:
|
||||
with socket.socket() as sock:
|
||||
ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
|
||||
|
@ -426,7 +441,7 @@ class BasicSocketTests(unittest.TestCase):
|
|||
sock = socket.socket()
|
||||
self.addCleanup(sock.close)
|
||||
with self.assertRaises(ssl.SSLError):
|
||||
ssl.wrap_socket(sock,
|
||||
test_wrap_socket(sock,
|
||||
certfile=certfile,
|
||||
ssl_version=ssl.PROTOCOL_TLSv1)
|
||||
|
||||
|
@ -613,7 +628,7 @@ class BasicSocketTests(unittest.TestCase):
|
|||
s.listen()
|
||||
c = socket.socket(socket.AF_INET)
|
||||
c.connect(s.getsockname())
|
||||
with ssl.wrap_socket(c, do_handshake_on_connect=False) as ss:
|
||||
with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
|
||||
with self.assertRaises(ValueError):
|
||||
ss.get_channel_binding("unknown-type")
|
||||
s.close()
|
||||
|
@ -623,15 +638,15 @@ class BasicSocketTests(unittest.TestCase):
|
|||
def test_tls_unique_channel_binding(self):
|
||||
# unconnected should return None for known type
|
||||
s = socket.socket(socket.AF_INET)
|
||||
with ssl.wrap_socket(s) as ss:
|
||||
with test_wrap_socket(s) as ss:
|
||||
self.assertIsNone(ss.get_channel_binding("tls-unique"))
|
||||
# the same for server-side
|
||||
s = socket.socket(socket.AF_INET)
|
||||
with ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
|
||||
with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
|
||||
self.assertIsNone(ss.get_channel_binding("tls-unique"))
|
||||
|
||||
def test_dealloc_warn(self):
|
||||
ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
|
||||
ss = test_wrap_socket(socket.socket(socket.AF_INET))
|
||||
r = repr(ss)
|
||||
with self.assertWarns(ResourceWarning) as cm:
|
||||
ss = None
|
||||
|
@ -750,7 +765,7 @@ class BasicSocketTests(unittest.TestCase):
|
|||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.addCleanup(s.close)
|
||||
with self.assertRaises(NotImplementedError) as cx:
|
||||
ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
|
||||
test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
|
||||
self.assertEqual(str(cx.exception), "only stream sockets are supported")
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
||||
with self.assertRaises(NotImplementedError) as cx:
|
||||
|
@ -826,7 +841,7 @@ class BasicSocketTests(unittest.TestCase):
|
|||
server = socket.socket(socket.AF_INET)
|
||||
self.addCleanup(server.close)
|
||||
port = support.bind_port(server) # Reserve port but don't listen
|
||||
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||
s = test_wrap_socket(socket.socket(socket.AF_INET),
|
||||
cert_reqs=ssl.CERT_REQUIRED)
|
||||
self.addCleanup(s.close)
|
||||
rc = s.connect_ex((HOST, port))
|
||||
|
@ -1444,13 +1459,13 @@ class SimpleBackgroundTests(unittest.TestCase):
|
|||
self.addCleanup(server.__exit__, None, None, None)
|
||||
|
||||
def test_connect(self):
|
||||
with ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||
with test_wrap_socket(socket.socket(socket.AF_INET),
|
||||
cert_reqs=ssl.CERT_NONE) as s:
|
||||
s.connect(self.server_addr)
|
||||
self.assertEqual({}, s.getpeercert())
|
||||
|
||||
# this should succeed because we specify the root cert
|
||||
with ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||
with test_wrap_socket(socket.socket(socket.AF_INET),
|
||||
cert_reqs=ssl.CERT_REQUIRED,
|
||||
ca_certs=SIGNING_CA) as s:
|
||||
s.connect(self.server_addr)
|
||||
|
@ -1460,7 +1475,7 @@ class SimpleBackgroundTests(unittest.TestCase):
|
|||
# This should fail because we have no verification certs. Connection
|
||||
# failure crashes ThreadedEchoServer, so run this in an independent
|
||||
# test method.
|
||||
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||
s = test_wrap_socket(socket.socket(socket.AF_INET),
|
||||
cert_reqs=ssl.CERT_REQUIRED)
|
||||
self.addCleanup(s.close)
|
||||
self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
|
||||
|
@ -1468,7 +1483,7 @@ class SimpleBackgroundTests(unittest.TestCase):
|
|||
|
||||
def test_connect_ex(self):
|
||||
# Issue #11326: check connect_ex() implementation
|
||||
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||
s = test_wrap_socket(socket.socket(socket.AF_INET),
|
||||
cert_reqs=ssl.CERT_REQUIRED,
|
||||
ca_certs=SIGNING_CA)
|
||||
self.addCleanup(s.close)
|
||||
|
@ -1478,7 +1493,7 @@ class SimpleBackgroundTests(unittest.TestCase):
|
|||
def test_non_blocking_connect_ex(self):
|
||||
# Issue #11326: non-blocking connect_ex() should allow handshake
|
||||
# to proceed after the socket gets ready.
|
||||
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||
s = test_wrap_socket(socket.socket(socket.AF_INET),
|
||||
cert_reqs=ssl.CERT_REQUIRED,
|
||||
ca_certs=SIGNING_CA,
|
||||
do_handshake_on_connect=False)
|
||||
|
@ -1578,7 +1593,7 @@ class SimpleBackgroundTests(unittest.TestCase):
|
|||
# Issue #5238: creating a file-like object with makefile() shouldn't
|
||||
# delay closing the underlying "real socket" (here tested with its
|
||||
# file descriptor, hence skipping the test under Windows).
|
||||
ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
|
||||
ss = test_wrap_socket(socket.socket(socket.AF_INET))
|
||||
ss.connect(self.server_addr)
|
||||
fd = ss.fileno()
|
||||
f = ss.makefile()
|
||||
|
@ -1596,7 +1611,7 @@ class SimpleBackgroundTests(unittest.TestCase):
|
|||
s = socket.socket(socket.AF_INET)
|
||||
s.connect(self.server_addr)
|
||||
s.setblocking(False)
|
||||
s = ssl.wrap_socket(s,
|
||||
s = test_wrap_socket(s,
|
||||
cert_reqs=ssl.CERT_NONE,
|
||||
do_handshake_on_connect=False)
|
||||
self.addCleanup(s.close)
|
||||
|
@ -1622,16 +1637,16 @@ class SimpleBackgroundTests(unittest.TestCase):
|
|||
_test_get_server_certificate_fail(self, *self.server_addr)
|
||||
|
||||
def test_ciphers(self):
|
||||
with ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||
with test_wrap_socket(socket.socket(socket.AF_INET),
|
||||
cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
|
||||
s.connect(self.server_addr)
|
||||
with ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||
with test_wrap_socket(socket.socket(socket.AF_INET),
|
||||
cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
|
||||
s.connect(self.server_addr)
|
||||
# Error checking can happen at instantiation or when connecting
|
||||
with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
|
||||
with socket.socket(socket.AF_INET) as sock:
|
||||
s = ssl.wrap_socket(sock,
|
||||
s = test_wrap_socket(sock,
|
||||
cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
|
||||
s.connect(self.server_addr)
|
||||
|
||||
|
@ -1749,7 +1764,7 @@ class NetworkedTests(unittest.TestCase):
|
|||
# Issue #12065: on a timeout, connect_ex() should return the original
|
||||
# errno (mimicking the behaviour of non-SSL sockets).
|
||||
with support.transient_internet(REMOTE_HOST):
|
||||
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||
s = test_wrap_socket(socket.socket(socket.AF_INET),
|
||||
cert_reqs=ssl.CERT_REQUIRED,
|
||||
do_handshake_on_connect=False)
|
||||
self.addCleanup(s.close)
|
||||
|
@ -2040,7 +2055,7 @@ if _have_threads:
|
|||
class ConnectionHandler (asyncore.dispatcher_with_send):
|
||||
|
||||
def __init__(self, conn, certfile):
|
||||
self.socket = ssl.wrap_socket(conn, server_side=True,
|
||||
self.socket = test_wrap_socket(conn, server_side=True,
|
||||
certfile=certfile,
|
||||
do_handshake_on_connect=False)
|
||||
asyncore.dispatcher_with_send.__init__(self, self.socket)
|
||||
|
@ -2401,7 +2416,7 @@ if _have_threads:
|
|||
connectionchatty=False)
|
||||
with server, \
|
||||
socket.socket() as sock, \
|
||||
ssl.wrap_socket(sock,
|
||||
test_wrap_socket(sock,
|
||||
certfile=certfile,
|
||||
ssl_version=ssl.PROTOCOL_TLSv1) as s:
|
||||
try:
|
||||
|
@ -2448,7 +2463,7 @@ if _have_threads:
|
|||
c.connect((HOST, port))
|
||||
listener_gone.wait()
|
||||
try:
|
||||
ssl_sock = ssl.wrap_socket(c)
|
||||
ssl_sock = test_wrap_socket(c)
|
||||
except OSError:
|
||||
pass
|
||||
else:
|
||||
|
@ -2638,7 +2653,7 @@ if _have_threads:
|
|||
sys.stdout.write(
|
||||
" client: read %r from server, starting TLS...\n"
|
||||
% msg)
|
||||
conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
|
||||
conn = test_wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
|
||||
wrapped = True
|
||||
elif indata == b"ENDTLS" and msg.startswith(b"ok"):
|
||||
# ENDTLS ok, switch back to clear text
|
||||
|
@ -2699,7 +2714,7 @@ if _have_threads:
|
|||
indata = b"FOO\n"
|
||||
server = AsyncoreEchoServer(CERTFILE)
|
||||
with server:
|
||||
s = ssl.wrap_socket(socket.socket())
|
||||
s = test_wrap_socket(socket.socket())
|
||||
s.connect(('127.0.0.1', server.port))
|
||||
if support.verbose:
|
||||
sys.stdout.write(
|
||||
|
@ -2732,7 +2747,7 @@ if _have_threads:
|
|||
chatty=True,
|
||||
connectionchatty=False)
|
||||
with server:
|
||||
s = ssl.wrap_socket(socket.socket(),
|
||||
s = test_wrap_socket(socket.socket(),
|
||||
server_side=False,
|
||||
certfile=CERTFILE,
|
||||
ca_certs=CERTFILE,
|
||||
|
@ -2856,7 +2871,7 @@ if _have_threads:
|
|||
self.addCleanup(server.__exit__, None, None)
|
||||
s = socket.create_connection((HOST, server.port))
|
||||
self.addCleanup(s.close)
|
||||
s = ssl.wrap_socket(s, suppress_ragged_eofs=False)
|
||||
s = test_wrap_socket(s, suppress_ragged_eofs=False)
|
||||
self.addCleanup(s.close)
|
||||
|
||||
# recv/read(0) should return no data
|
||||
|
@ -2878,7 +2893,7 @@ if _have_threads:
|
|||
chatty=True,
|
||||
connectionchatty=False)
|
||||
with server:
|
||||
s = ssl.wrap_socket(socket.socket(),
|
||||
s = test_wrap_socket(socket.socket(),
|
||||
server_side=False,
|
||||
certfile=CERTFILE,
|
||||
ca_certs=CERTFILE,
|
||||
|
@ -2932,12 +2947,12 @@ if _have_threads:
|
|||
c.connect((host, port))
|
||||
# Will attempt handshake and time out
|
||||
self.assertRaisesRegex(socket.timeout, "timed out",
|
||||
ssl.wrap_socket, c)
|
||||
test_wrap_socket, c)
|
||||
finally:
|
||||
c.close()
|
||||
try:
|
||||
c = socket.socket(socket.AF_INET)
|
||||
c = ssl.wrap_socket(c)
|
||||
c = test_wrap_socket(c)
|
||||
c.settimeout(0.2)
|
||||
# Will attempt handshake and time out
|
||||
self.assertRaisesRegex(socket.timeout, "timed out",
|
||||
|
@ -3062,7 +3077,7 @@ if _have_threads:
|
|||
chatty=True,
|
||||
connectionchatty=False)
|
||||
with server:
|
||||
s = ssl.wrap_socket(socket.socket(),
|
||||
s = test_wrap_socket(socket.socket(),
|
||||
server_side=False,
|
||||
certfile=CERTFILE,
|
||||
ca_certs=CERTFILE,
|
||||
|
@ -3087,7 +3102,7 @@ if _have_threads:
|
|||
s.close()
|
||||
|
||||
# now, again
|
||||
s = ssl.wrap_socket(socket.socket(),
|
||||
s = test_wrap_socket(socket.socket(),
|
||||
server_side=False,
|
||||
certfile=CERTFILE,
|
||||
ca_certs=CERTFILE,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue