mirror of
https://github.com/python/cpython.git
synced 2025-08-04 00:48:58 +00:00
Merge 3.4 (asyncio)
This commit is contained in:
commit
d1fcca8ada
4 changed files with 73 additions and 12 deletions
|
@ -57,6 +57,17 @@ ONLYCERT = data_file('ssl_cert.pem')
|
|||
ONLYKEY = data_file('ssl_key.pem')
|
||||
SIGNED_CERTFILE = data_file('keycert3.pem')
|
||||
SIGNING_CA = data_file('pycacert.pem')
|
||||
PEERCERT = {'serialNumber': 'B09264B1F2DA21D1',
|
||||
'version': 1,
|
||||
'subject': ((('countryName', 'XY'),),
|
||||
(('localityName', 'Castle Anthrax'),),
|
||||
(('organizationName', 'Python Software Foundation'),),
|
||||
(('commonName', 'localhost'),)),
|
||||
'issuer': ((('countryName', 'XY'),),
|
||||
(('organizationName', 'Python Software Foundation CA'),),
|
||||
(('commonName', 'our-ca-server'),)),
|
||||
'notAfter': 'Nov 13 19:47:07 2022 GMT',
|
||||
'notBefore': 'Jan 4 19:47:07 2013 GMT'}
|
||||
|
||||
|
||||
class MyBaseProto(asyncio.Protocol):
|
||||
|
@ -596,22 +607,56 @@ class EventLoopTestsMixin:
|
|||
self.assertGreater(pr.nbytes, 0)
|
||||
tr.close()
|
||||
|
||||
def check_ssl_extra_info(self, client, check_sockname=True,
|
||||
peername=None, peercert={}):
|
||||
if check_sockname:
|
||||
self.assertIsNotNone(client.get_extra_info('sockname'))
|
||||
if peername:
|
||||
self.assertEqual(peername,
|
||||
client.get_extra_info('peername'))
|
||||
else:
|
||||
self.assertIsNotNone(client.get_extra_info('peername'))
|
||||
self.assertEqual(peercert,
|
||||
client.get_extra_info('peercert'))
|
||||
|
||||
# Python disables compression to prevent CRIME attacks by default
|
||||
self.assertIsNone(client.get_extra_info('compression'))
|
||||
|
||||
# test SSL cipher
|
||||
cipher = client.get_extra_info('cipher')
|
||||
self.assertIsInstance(cipher, tuple)
|
||||
self.assertEqual(len(cipher), 3, cipher)
|
||||
self.assertIsInstance(cipher[0], str)
|
||||
self.assertIsInstance(cipher[1], str)
|
||||
self.assertIsInstance(cipher[2], int)
|
||||
|
||||
# test SSL object
|
||||
sslobj = client.get_extra_info('ssl_object')
|
||||
self.assertIsNotNone(sslobj)
|
||||
self.assertEqual(sslobj.compression(),
|
||||
client.get_extra_info('compression'))
|
||||
self.assertEqual(sslobj.cipher(),
|
||||
client.get_extra_info('cipher'))
|
||||
self.assertEqual(sslobj.getpeercert(),
|
||||
client.get_extra_info('peercert'))
|
||||
|
||||
def _basetest_create_ssl_connection(self, connection_fut,
|
||||
check_sockname=True):
|
||||
check_sockname=True,
|
||||
peername=None):
|
||||
tr, pr = self.loop.run_until_complete(connection_fut)
|
||||
self.assertIsInstance(tr, asyncio.Transport)
|
||||
self.assertIsInstance(pr, asyncio.Protocol)
|
||||
self.assertTrue('ssl' in tr.__class__.__name__.lower())
|
||||
if check_sockname:
|
||||
self.assertIsNotNone(tr.get_extra_info('sockname'))
|
||||
self.check_ssl_extra_info(tr, check_sockname, peername)
|
||||
self.loop.run_until_complete(pr.done)
|
||||
self.assertGreater(pr.nbytes, 0)
|
||||
tr.close()
|
||||
|
||||
def _test_create_ssl_connection(self, httpd, create_connection,
|
||||
check_sockname=True):
|
||||
check_sockname=True, peername=None):
|
||||
conn_fut = create_connection(ssl=test_utils.dummy_ssl_context())
|
||||
self._basetest_create_ssl_connection(conn_fut, check_sockname)
|
||||
self._basetest_create_ssl_connection(conn_fut, check_sockname,
|
||||
peername)
|
||||
|
||||
# ssl.Purpose was introduced in Python 3.4
|
||||
if hasattr(ssl, 'Purpose'):
|
||||
|
@ -629,7 +674,8 @@ class EventLoopTestsMixin:
|
|||
with mock.patch('ssl.create_default_context',
|
||||
side_effect=_dummy_ssl_create_context) as m:
|
||||
conn_fut = create_connection(ssl=True)
|
||||
self._basetest_create_ssl_connection(conn_fut, check_sockname)
|
||||
self._basetest_create_ssl_connection(conn_fut, check_sockname,
|
||||
peername)
|
||||
self.assertEqual(m.call_count, 1)
|
||||
|
||||
# With the real ssl.create_default_context(), certificate
|
||||
|
@ -638,7 +684,8 @@ class EventLoopTestsMixin:
|
|||
conn_fut = create_connection(ssl=True)
|
||||
# Ignore the "SSL handshake failed" log in debug mode
|
||||
with test_utils.disable_logger():
|
||||
self._basetest_create_ssl_connection(conn_fut, check_sockname)
|
||||
self._basetest_create_ssl_connection(conn_fut, check_sockname,
|
||||
peername)
|
||||
|
||||
self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
|
||||
|
||||
|
@ -649,7 +696,8 @@ class EventLoopTestsMixin:
|
|||
self.loop.create_connection,
|
||||
lambda: MyProto(loop=self.loop),
|
||||
*httpd.address)
|
||||
self._test_create_ssl_connection(httpd, create_connection)
|
||||
self._test_create_ssl_connection(httpd, create_connection,
|
||||
peername=httpd.address)
|
||||
|
||||
def test_legacy_create_ssl_connection(self):
|
||||
with test_utils.force_legacy_ssl_support():
|
||||
|
@ -669,7 +717,8 @@ class EventLoopTestsMixin:
|
|||
server_hostname='127.0.0.1')
|
||||
|
||||
self._test_create_ssl_connection(httpd, create_connection,
|
||||
check_sockname)
|
||||
check_sockname,
|
||||
peername=httpd.address)
|
||||
|
||||
def test_legacy_create_ssl_unix_connection(self):
|
||||
with test_utils.force_legacy_ssl_support():
|
||||
|
@ -819,9 +868,7 @@ class EventLoopTestsMixin:
|
|||
self.assertEqual(3, proto.nbytes)
|
||||
|
||||
# extra info is available
|
||||
self.assertIsNotNone(proto.transport.get_extra_info('sockname'))
|
||||
self.assertEqual('127.0.0.1',
|
||||
proto.transport.get_extra_info('peername')[0])
|
||||
self.check_ssl_extra_info(client, peername=(host, port))
|
||||
|
||||
# close connection
|
||||
proto.transport.close()
|
||||
|
@ -1023,6 +1070,10 @@ class EventLoopTestsMixin:
|
|||
server_hostname='localhost')
|
||||
client, pr = self.loop.run_until_complete(f_c)
|
||||
|
||||
# extra info is available
|
||||
self.check_ssl_extra_info(client,peername=(host, port),
|
||||
peercert=PEERCERT)
|
||||
|
||||
# close connection
|
||||
proto.transport.close()
|
||||
client.close()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue