mirror of
https://github.com/python/cpython.git
synced 2025-09-26 18:29:57 +00:00
Issue #16923: Fix ResourceWarnings in test_ssl.
This commit is contained in:
commit
e9bb4733d9
2 changed files with 37 additions and 33 deletions
|
@ -217,6 +217,7 @@ class BasicSocketTests(unittest.TestCase):
|
||||||
s = socket.socket(socket.AF_INET)
|
s = socket.socket(socket.AF_INET)
|
||||||
ss = ssl.wrap_socket(s)
|
ss = ssl.wrap_socket(s)
|
||||||
wr = weakref.ref(ss)
|
wr = weakref.ref(ss)
|
||||||
|
with support.check_warnings(("", ResourceWarning)):
|
||||||
del ss
|
del ss
|
||||||
self.assertEqual(wr(), None)
|
self.assertEqual(wr(), None)
|
||||||
|
|
||||||
|
@ -224,7 +225,7 @@ class BasicSocketTests(unittest.TestCase):
|
||||||
# Methods on an unconnected SSLSocket propagate the original
|
# Methods on an unconnected SSLSocket propagate the original
|
||||||
# OSError raise by the underlying socket object.
|
# OSError raise by the underlying socket object.
|
||||||
s = socket.socket(socket.AF_INET)
|
s = socket.socket(socket.AF_INET)
|
||||||
ss = ssl.wrap_socket(s)
|
with ssl.wrap_socket(s) as ss:
|
||||||
self.assertRaises(OSError, ss.recv, 1)
|
self.assertRaises(OSError, ss.recv, 1)
|
||||||
self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
|
self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
|
||||||
self.assertRaises(OSError, ss.recvfrom, 1)
|
self.assertRaises(OSError, ss.recvfrom, 1)
|
||||||
|
@ -238,7 +239,7 @@ class BasicSocketTests(unittest.TestCase):
|
||||||
for timeout in (None, 0.0, 5.0):
|
for timeout in (None, 0.0, 5.0):
|
||||||
s = socket.socket(socket.AF_INET)
|
s = socket.socket(socket.AF_INET)
|
||||||
s.settimeout(timeout)
|
s.settimeout(timeout)
|
||||||
ss = ssl.wrap_socket(s)
|
with ssl.wrap_socket(s) as ss:
|
||||||
self.assertEqual(timeout, ss.gettimeout())
|
self.assertEqual(timeout, ss.gettimeout())
|
||||||
|
|
||||||
def test_errors(self):
|
def test_errors(self):
|
||||||
|
@ -252,7 +253,7 @@ class BasicSocketTests(unittest.TestCase):
|
||||||
self.assertRaisesRegex(ValueError,
|
self.assertRaisesRegex(ValueError,
|
||||||
"certfile must be specified for server-side operations",
|
"certfile must be specified for server-side operations",
|
||||||
ssl.wrap_socket, sock, server_side=True, certfile="")
|
ssl.wrap_socket, sock, server_side=True, certfile="")
|
||||||
s = ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)
|
with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
|
||||||
self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
|
self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
|
||||||
s.connect, (HOST, 8080))
|
s.connect, (HOST, 8080))
|
||||||
with self.assertRaises(OSError) as cm:
|
with self.assertRaises(OSError) as cm:
|
||||||
|
@ -367,7 +368,7 @@ class BasicSocketTests(unittest.TestCase):
|
||||||
def test_unknown_channel_binding(self):
|
def test_unknown_channel_binding(self):
|
||||||
# should raise ValueError for unknown type
|
# should raise ValueError for unknown type
|
||||||
s = socket.socket(socket.AF_INET)
|
s = socket.socket(socket.AF_INET)
|
||||||
ss = ssl.wrap_socket(s)
|
with ssl.wrap_socket(s) as ss:
|
||||||
with self.assertRaises(ValueError):
|
with self.assertRaises(ValueError):
|
||||||
ss.get_channel_binding("unknown-type")
|
ss.get_channel_binding("unknown-type")
|
||||||
|
|
||||||
|
@ -376,11 +377,11 @@ class BasicSocketTests(unittest.TestCase):
|
||||||
def test_tls_unique_channel_binding(self):
|
def test_tls_unique_channel_binding(self):
|
||||||
# unconnected should return None for known type
|
# unconnected should return None for known type
|
||||||
s = socket.socket(socket.AF_INET)
|
s = socket.socket(socket.AF_INET)
|
||||||
ss = ssl.wrap_socket(s)
|
with ssl.wrap_socket(s) as ss:
|
||||||
self.assertIsNone(ss.get_channel_binding("tls-unique"))
|
self.assertIsNone(ss.get_channel_binding("tls-unique"))
|
||||||
# the same for server-side
|
# the same for server-side
|
||||||
s = socket.socket(socket.AF_INET)
|
s = socket.socket(socket.AF_INET)
|
||||||
ss = ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)
|
with ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
|
||||||
self.assertIsNone(ss.get_channel_binding("tls-unique"))
|
self.assertIsNone(ss.get_channel_binding("tls-unique"))
|
||||||
|
|
||||||
def test_dealloc_warn(self):
|
def test_dealloc_warn(self):
|
||||||
|
@ -660,10 +661,10 @@ class SSLErrorTests(unittest.TestCase):
|
||||||
with socket.socket() as s:
|
with socket.socket() as s:
|
||||||
s.bind(("127.0.0.1", 0))
|
s.bind(("127.0.0.1", 0))
|
||||||
s.listen(5)
|
s.listen(5)
|
||||||
with socket.socket() as c:
|
c = socket.socket()
|
||||||
c.connect(s.getsockname())
|
c.connect(s.getsockname())
|
||||||
c.setblocking(False)
|
c.setblocking(False)
|
||||||
c = ctx.wrap_socket(c, False, do_handshake_on_connect=False)
|
with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
|
||||||
with self.assertRaises(ssl.SSLWantReadError) as cm:
|
with self.assertRaises(ssl.SSLWantReadError) as cm:
|
||||||
c.do_handshake()
|
c.do_handshake()
|
||||||
s = str(cm.exception)
|
s = str(cm.exception)
|
||||||
|
@ -904,11 +905,11 @@ class NetworkedTests(unittest.TestCase):
|
||||||
def test_ciphers(self):
|
def test_ciphers(self):
|
||||||
remote = ("svn.python.org", 443)
|
remote = ("svn.python.org", 443)
|
||||||
with support.transient_internet(remote[0]):
|
with support.transient_internet(remote[0]):
|
||||||
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
with ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||||
cert_reqs=ssl.CERT_NONE, ciphers="ALL")
|
cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
|
||||||
s.connect(remote)
|
s.connect(remote)
|
||||||
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
with ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||||
cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")
|
cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
|
||||||
s.connect(remote)
|
s.connect(remote)
|
||||||
# Error checking can happen at instantiation or when connecting
|
# Error checking can happen at instantiation or when connecting
|
||||||
with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
|
with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
|
||||||
|
@ -1886,6 +1887,8 @@ else:
|
||||||
client_addr = client.getsockname()
|
client_addr = client.getsockname()
|
||||||
client.close()
|
client.close()
|
||||||
t.join()
|
t.join()
|
||||||
|
remote.close()
|
||||||
|
server.close()
|
||||||
# Sanity checks.
|
# Sanity checks.
|
||||||
self.assertIsInstance(remote, ssl.SSLSocket)
|
self.assertIsInstance(remote, ssl.SSLSocket)
|
||||||
self.assertEqual(peer, client_addr)
|
self.assertEqual(peer, client_addr)
|
||||||
|
@ -1900,8 +1903,7 @@ else:
|
||||||
with ThreadedEchoServer(CERTFILE,
|
with ThreadedEchoServer(CERTFILE,
|
||||||
ssl_version=ssl.PROTOCOL_SSLv23,
|
ssl_version=ssl.PROTOCOL_SSLv23,
|
||||||
chatty=False) as server:
|
chatty=False) as server:
|
||||||
with socket.socket() as sock:
|
with context.wrap_socket(socket.socket()) as s:
|
||||||
s = context.wrap_socket(sock)
|
|
||||||
with self.assertRaises(OSError):
|
with self.assertRaises(OSError):
|
||||||
s.connect((HOST, server.port))
|
s.connect((HOST, server.port))
|
||||||
self.assertIn("no shared cipher", str(server.conn_errors[0]))
|
self.assertIn("no shared cipher", str(server.conn_errors[0]))
|
||||||
|
|
|
@ -635,6 +635,8 @@ Extension Modules
|
||||||
Tests
|
Tests
|
||||||
-----
|
-----
|
||||||
|
|
||||||
|
- Issue #16923: Fix ResourceWarnings in test_ssl.
|
||||||
|
|
||||||
- Issue #15539: Added regression tests for Tools/scripts/pindent.py.
|
- Issue #15539: Added regression tests for Tools/scripts/pindent.py.
|
||||||
|
|
||||||
- Issue #16836: Enable IPv6 support even if IPv6 is disabled on the build host.
|
- Issue #16836: Enable IPv6 support even if IPv6 is disabled on the build host.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue