mirror of
https://github.com/python/cpython.git
synced 2025-07-20 01:35:19 +00:00
Use transient_internet() where appropriate in test_ssl
(svn.python.org is sometimes unavailable)
This commit is contained in:
parent
6e6cc830c4
commit
350c7229be
1 changed files with 131 additions and 130 deletions
|
@ -305,63 +305,59 @@ class ContextTests(unittest.TestCase):
|
||||||
|
|
||||||
|
|
||||||
class NetworkedTests(unittest.TestCase):
|
class NetworkedTests(unittest.TestCase):
|
||||||
def setUp(self):
|
|
||||||
self.old_timeout = socket.getdefaulttimeout()
|
|
||||||
socket.setdefaulttimeout(30)
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
socket.setdefaulttimeout(self.old_timeout)
|
|
||||||
|
|
||||||
def test_connect(self):
|
def test_connect(self):
|
||||||
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
with support.transient_internet("svn.python.org"):
|
||||||
cert_reqs=ssl.CERT_NONE)
|
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||||
try:
|
cert_reqs=ssl.CERT_NONE)
|
||||||
s.connect(("svn.python.org", 443))
|
try:
|
||||||
self.assertEqual({}, s.getpeercert())
|
s.connect(("svn.python.org", 443))
|
||||||
finally:
|
self.assertEqual({}, s.getpeercert())
|
||||||
|
finally:
|
||||||
|
s.close()
|
||||||
|
|
||||||
|
# this should fail because we have no verification certs
|
||||||
|
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||||
|
cert_reqs=ssl.CERT_REQUIRED)
|
||||||
|
self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
|
||||||
|
s.connect, ("svn.python.org", 443))
|
||||||
s.close()
|
s.close()
|
||||||
|
|
||||||
# this should fail because we have no verification certs
|
# this should succeed because we specify the root cert
|
||||||
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||||
cert_reqs=ssl.CERT_REQUIRED)
|
cert_reqs=ssl.CERT_REQUIRED,
|
||||||
self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
|
ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
|
||||||
s.connect, ("svn.python.org", 443))
|
try:
|
||||||
s.close()
|
s.connect(("svn.python.org", 443))
|
||||||
|
self.assertTrue(s.getpeercert())
|
||||||
# this should succeed because we specify the root cert
|
finally:
|
||||||
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
s.close()
|
||||||
cert_reqs=ssl.CERT_REQUIRED,
|
|
||||||
ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
|
|
||||||
try:
|
|
||||||
s.connect(("svn.python.org", 443))
|
|
||||||
self.assertTrue(s.getpeercert())
|
|
||||||
finally:
|
|
||||||
s.close()
|
|
||||||
|
|
||||||
def test_connect_with_context(self):
|
def test_connect_with_context(self):
|
||||||
# Same as test_connect, but with a separately created context
|
with support.transient_internet("svn.python.org"):
|
||||||
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
# Same as test_connect, but with a separately created context
|
||||||
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
|
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
||||||
s.connect(("svn.python.org", 443))
|
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
|
||||||
try:
|
s.connect(("svn.python.org", 443))
|
||||||
self.assertEqual({}, s.getpeercert())
|
try:
|
||||||
finally:
|
self.assertEqual({}, s.getpeercert())
|
||||||
s.close()
|
finally:
|
||||||
# This should fail because we have no verification certs
|
s.close()
|
||||||
ctx.verify_mode = ssl.CERT_REQUIRED
|
# This should fail because we have no verification certs
|
||||||
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
|
ctx.verify_mode = ssl.CERT_REQUIRED
|
||||||
self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
|
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
|
||||||
s.connect, ("svn.python.org", 443))
|
self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
|
||||||
s.close()
|
s.connect, ("svn.python.org", 443))
|
||||||
# This should succeed because we specify the root cert
|
|
||||||
ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
|
|
||||||
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
|
|
||||||
s.connect(("svn.python.org", 443))
|
|
||||||
try:
|
|
||||||
cert = s.getpeercert()
|
|
||||||
self.assertTrue(cert)
|
|
||||||
finally:
|
|
||||||
s.close()
|
s.close()
|
||||||
|
# This should succeed because we specify the root cert
|
||||||
|
ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
|
||||||
|
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
|
||||||
|
s.connect(("svn.python.org", 443))
|
||||||
|
try:
|
||||||
|
cert = s.getpeercert()
|
||||||
|
self.assertTrue(cert)
|
||||||
|
finally:
|
||||||
|
s.close()
|
||||||
|
|
||||||
def test_connect_capath(self):
|
def test_connect_capath(self):
|
||||||
# Verify server certificates using the `capath` argument
|
# Verify server certificates using the `capath` argument
|
||||||
|
@ -369,104 +365,109 @@ class NetworkedTests(unittest.TestCase):
|
||||||
# OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
|
# OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
|
||||||
# contain both versions of each certificate (same content, different
|
# contain both versions of each certificate (same content, different
|
||||||
# filename) for this test to be portable across OpenSSL releases.
|
# filename) for this test to be portable across OpenSSL releases.
|
||||||
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
with support.transient_internet("svn.python.org"):
|
||||||
ctx.verify_mode = ssl.CERT_REQUIRED
|
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
||||||
ctx.load_verify_locations(capath=CAPATH)
|
ctx.verify_mode = ssl.CERT_REQUIRED
|
||||||
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
|
ctx.load_verify_locations(capath=CAPATH)
|
||||||
s.connect(("svn.python.org", 443))
|
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
|
||||||
try:
|
s.connect(("svn.python.org", 443))
|
||||||
cert = s.getpeercert()
|
try:
|
||||||
self.assertTrue(cert)
|
cert = s.getpeercert()
|
||||||
finally:
|
self.assertTrue(cert)
|
||||||
s.close()
|
finally:
|
||||||
# Same with a bytes `capath` argument
|
s.close()
|
||||||
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
# Same with a bytes `capath` argument
|
||||||
ctx.verify_mode = ssl.CERT_REQUIRED
|
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
||||||
ctx.load_verify_locations(capath=BYTES_CAPATH)
|
ctx.verify_mode = ssl.CERT_REQUIRED
|
||||||
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
|
ctx.load_verify_locations(capath=BYTES_CAPATH)
|
||||||
s.connect(("svn.python.org", 443))
|
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
|
||||||
try:
|
s.connect(("svn.python.org", 443))
|
||||||
cert = s.getpeercert()
|
try:
|
||||||
self.assertTrue(cert)
|
cert = s.getpeercert()
|
||||||
finally:
|
self.assertTrue(cert)
|
||||||
s.close()
|
finally:
|
||||||
|
s.close()
|
||||||
|
|
||||||
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
|
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
|
||||||
def test_makefile_close(self):
|
def test_makefile_close(self):
|
||||||
# Issue #5238: creating a file-like object with makefile() shouldn't
|
# Issue #5238: creating a file-like object with makefile() shouldn't
|
||||||
# delay closing the underlying "real socket" (here tested with its
|
# delay closing the underlying "real socket" (here tested with its
|
||||||
# file descriptor, hence skipping the test under Windows).
|
# file descriptor, hence skipping the test under Windows).
|
||||||
ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
|
with support.transient_internet("svn.python.org"):
|
||||||
ss.connect(("svn.python.org", 443))
|
ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
|
||||||
fd = ss.fileno()
|
ss.connect(("svn.python.org", 443))
|
||||||
f = ss.makefile()
|
fd = ss.fileno()
|
||||||
f.close()
|
f = ss.makefile()
|
||||||
# The fd is still open
|
f.close()
|
||||||
os.read(fd, 0)
|
# The fd is still open
|
||||||
# Closing the SSL socket should close the fd too
|
|
||||||
ss.close()
|
|
||||||
gc.collect()
|
|
||||||
with self.assertRaises(OSError) as e:
|
|
||||||
os.read(fd, 0)
|
os.read(fd, 0)
|
||||||
self.assertEqual(e.exception.errno, errno.EBADF)
|
# Closing the SSL socket should close the fd too
|
||||||
|
ss.close()
|
||||||
|
gc.collect()
|
||||||
|
with self.assertRaises(OSError) as e:
|
||||||
|
os.read(fd, 0)
|
||||||
|
self.assertEqual(e.exception.errno, errno.EBADF)
|
||||||
|
|
||||||
def test_non_blocking_handshake(self):
|
def test_non_blocking_handshake(self):
|
||||||
s = socket.socket(socket.AF_INET)
|
with support.transient_internet("svn.python.org"):
|
||||||
s.connect(("svn.python.org", 443))
|
s = socket.socket(socket.AF_INET)
|
||||||
s.setblocking(False)
|
s.connect(("svn.python.org", 443))
|
||||||
s = ssl.wrap_socket(s,
|
s.setblocking(False)
|
||||||
cert_reqs=ssl.CERT_NONE,
|
s = ssl.wrap_socket(s,
|
||||||
do_handshake_on_connect=False)
|
cert_reqs=ssl.CERT_NONE,
|
||||||
count = 0
|
do_handshake_on_connect=False)
|
||||||
while True:
|
count = 0
|
||||||
try:
|
while True:
|
||||||
count += 1
|
try:
|
||||||
s.do_handshake()
|
count += 1
|
||||||
break
|
s.do_handshake()
|
||||||
except ssl.SSLError as err:
|
break
|
||||||
if err.args[0] == ssl.SSL_ERROR_WANT_READ:
|
except ssl.SSLError as err:
|
||||||
select.select([s], [], [])
|
if err.args[0] == ssl.SSL_ERROR_WANT_READ:
|
||||||
elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
|
select.select([s], [], [])
|
||||||
select.select([], [s], [])
|
elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
|
||||||
else:
|
select.select([], [s], [])
|
||||||
raise
|
else:
|
||||||
s.close()
|
raise
|
||||||
if support.verbose:
|
s.close()
|
||||||
sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
|
if support.verbose:
|
||||||
|
sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
|
||||||
|
|
||||||
def test_get_server_certificate(self):
|
def test_get_server_certificate(self):
|
||||||
pem = ssl.get_server_certificate(("svn.python.org", 443))
|
with support.transient_internet("svn.python.org"):
|
||||||
if not pem:
|
pem = ssl.get_server_certificate(("svn.python.org", 443))
|
||||||
self.fail("No server certificate on svn.python.org:443!")
|
if not pem:
|
||||||
|
self.fail("No server certificate on svn.python.org:443!")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
|
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
|
||||||
except ssl.SSLError as x:
|
except ssl.SSLError as x:
|
||||||
#should fail
|
#should fail
|
||||||
|
if support.verbose:
|
||||||
|
sys.stdout.write("%s\n" % x)
|
||||||
|
else:
|
||||||
|
self.fail("Got server certificate %s for svn.python.org!" % pem)
|
||||||
|
|
||||||
|
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
|
||||||
|
if not pem:
|
||||||
|
self.fail("No server certificate on svn.python.org:443!")
|
||||||
if support.verbose:
|
if support.verbose:
|
||||||
sys.stdout.write("%s\n" % x)
|
sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
|
||||||
else:
|
|
||||||
self.fail("Got server certificate %s for svn.python.org!" % pem)
|
|
||||||
|
|
||||||
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
|
|
||||||
if not pem:
|
|
||||||
self.fail("No server certificate on svn.python.org:443!")
|
|
||||||
if support.verbose:
|
|
||||||
sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
|
|
||||||
|
|
||||||
def test_ciphers(self):
|
def test_ciphers(self):
|
||||||
remote = ("svn.python.org", 443)
|
remote = ("svn.python.org", 443)
|
||||||
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
with support.transient_internet(remote[0]):
|
||||||
cert_reqs=ssl.CERT_NONE, ciphers="ALL")
|
|
||||||
s.connect(remote)
|
|
||||||
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
|
||||||
cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")
|
|
||||||
s.connect(remote)
|
|
||||||
# Error checking can happen at instantiation or when connecting
|
|
||||||
with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
|
|
||||||
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||||
cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
|
cert_reqs=ssl.CERT_NONE, ciphers="ALL")
|
||||||
s.connect(remote)
|
s.connect(remote)
|
||||||
|
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||||
|
cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")
|
||||||
|
s.connect(remote)
|
||||||
|
# Error checking can happen at instantiation or when connecting
|
||||||
|
with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
|
||||||
|
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||||
|
cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
|
||||||
|
s.connect(remote)
|
||||||
|
|
||||||
def test_algorithms(self):
|
def test_algorithms(self):
|
||||||
# Issue #8484: all algorithms should be available when verifying a
|
# Issue #8484: all algorithms should be available when verifying a
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue