Issue #23972: updates to asyncio datagram API. By Chris Laws. (Merge 3.4->3.5.)

This commit is contained in:
Guido van Rossum 2015-10-05 09:19:11 -07:00
commit eda1955d82
7 changed files with 385 additions and 74 deletions

View file

@ -3,6 +3,7 @@
import errno
import logging
import math
import os
import socket
import sys
import threading
@ -790,11 +791,11 @@ class MyProto(asyncio.Protocol):
class MyDatagramProto(asyncio.DatagramProtocol):
done = None
def __init__(self, create_future=False):
def __init__(self, create_future=False, loop=None):
self.state = 'INITIAL'
self.nbytes = 0
if create_future:
self.done = asyncio.Future()
self.done = asyncio.Future(loop=loop)
def connection_made(self, transport):
self.transport = transport
@ -1099,6 +1100,19 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
f = self.loop.create_server(MyProto, '0.0.0.0', 0)
self.assertRaises(OSError, self.loop.run_until_complete, f)
@mock.patch('asyncio.base_events.socket')
def test_create_server_nosoreuseport(self, m_socket):
m_socket.getaddrinfo = socket.getaddrinfo
m_socket.SOCK_STREAM = socket.SOCK_STREAM
m_socket.SOL_SOCKET = socket.SOL_SOCKET
del m_socket.SO_REUSEPORT
m_socket.socket.return_value = mock.Mock()
f = self.loop.create_server(
MyProto, '0.0.0.0', 0, reuse_port=True)
self.assertRaises(ValueError, self.loop.run_until_complete, f)
@mock.patch('asyncio.base_events.socket')
def test_create_server_cant_bind(self, m_socket):
@ -1199,6 +1213,128 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
self.assertRaises(Err, self.loop.run_until_complete, fut)
self.assertTrue(m_sock.close.called)
def test_create_datagram_endpoint_sock(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
fut = self.loop.create_datagram_endpoint(
lambda: MyDatagramProto(create_future=True, loop=self.loop),
sock=sock)
transport, protocol = self.loop.run_until_complete(fut)
transport.close()
self.loop.run_until_complete(protocol.done)
self.assertEqual('CLOSED', protocol.state)
def test_create_datagram_endpoint_sock_sockopts(self):
fut = self.loop.create_datagram_endpoint(
MyDatagramProto, local_addr=('127.0.0.1', 0), sock=object())
self.assertRaises(ValueError, self.loop.run_until_complete, fut)
fut = self.loop.create_datagram_endpoint(
MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=object())
self.assertRaises(ValueError, self.loop.run_until_complete, fut)
fut = self.loop.create_datagram_endpoint(
MyDatagramProto, family=1, sock=object())
self.assertRaises(ValueError, self.loop.run_until_complete, fut)
fut = self.loop.create_datagram_endpoint(
MyDatagramProto, proto=1, sock=object())
self.assertRaises(ValueError, self.loop.run_until_complete, fut)
fut = self.loop.create_datagram_endpoint(
MyDatagramProto, flags=1, sock=object())
self.assertRaises(ValueError, self.loop.run_until_complete, fut)
fut = self.loop.create_datagram_endpoint(
MyDatagramProto, reuse_address=True, sock=object())
self.assertRaises(ValueError, self.loop.run_until_complete, fut)
fut = self.loop.create_datagram_endpoint(
MyDatagramProto, reuse_port=True, sock=object())
self.assertRaises(ValueError, self.loop.run_until_complete, fut)
fut = self.loop.create_datagram_endpoint(
MyDatagramProto, allow_broadcast=True, sock=object())
self.assertRaises(ValueError, self.loop.run_until_complete, fut)
def test_create_datagram_endpoint_sockopts(self):
# Socket options should not be applied unless asked for.
# SO_REUSEADDR defaults to on for UNIX.
# SO_REUSEPORT is not available on all platforms.
coro = self.loop.create_datagram_endpoint(
lambda: MyDatagramProto(create_future=True, loop=self.loop),
local_addr=('127.0.0.1', 0))
transport, protocol = self.loop.run_until_complete(coro)
sock = transport.get_extra_info('socket')
reuse_address_default_on = (
os.name == 'posix' and sys.platform != 'cygwin')
reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
if reuse_address_default_on:
self.assertTrue(
sock.getsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR))
else:
self.assertFalse(
sock.getsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR))
if reuseport_supported:
self.assertFalse(
sock.getsockopt(
socket.SOL_SOCKET, socket.SO_REUSEPORT))
self.assertFalse(
sock.getsockopt(
socket.SOL_SOCKET, socket.SO_BROADCAST))
transport.close()
self.loop.run_until_complete(protocol.done)
self.assertEqual('CLOSED', protocol.state)
coro = self.loop.create_datagram_endpoint(
lambda: MyDatagramProto(create_future=True, loop=self.loop),
local_addr=('127.0.0.1', 0),
reuse_address=True,
reuse_port=reuseport_supported,
allow_broadcast=True)
transport, protocol = self.loop.run_until_complete(coro)
sock = transport.get_extra_info('socket')
self.assertTrue(
sock.getsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR))
if reuseport_supported:
self.assertTrue(
sock.getsockopt(
socket.SOL_SOCKET, socket.SO_REUSEPORT))
else:
self.assertFalse(
sock.getsockopt(
socket.SOL_SOCKET, socket.SO_REUSEPORT))
self.assertTrue(
sock.getsockopt(
socket.SOL_SOCKET, socket.SO_BROADCAST))
transport.close()
self.loop.run_until_complete(protocol.done)
self.assertEqual('CLOSED', protocol.state)
@mock.patch('asyncio.base_events.socket')
def test_create_datagram_endpoint_nosoreuseport(self, m_socket):
m_socket.getaddrinfo = socket.getaddrinfo
m_socket.SOCK_DGRAM = socket.SOCK_DGRAM
m_socket.SOL_SOCKET = socket.SOL_SOCKET
del m_socket.SO_REUSEPORT
m_socket.socket.return_value = mock.Mock()
coro = self.loop.create_datagram_endpoint(
lambda: MyDatagramProto(loop=self.loop),
local_addr=('127.0.0.1', 0),
reuse_address=False,
reuse_port=True)
self.assertRaises(ValueError, self.loop.run_until_complete, coro)
def test_accept_connection_retry(self):
sock = mock.Mock()
sock.accept.side_effect = BlockingIOError()

View file

@ -814,6 +814,32 @@ class EventLoopTestsMixin:
# close server
server.close()
@unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT')
def test_create_server_reuse_port(self):
proto = MyProto(self.loop)
f = self.loop.create_server(
lambda: proto, '0.0.0.0', 0)
server = self.loop.run_until_complete(f)
self.assertEqual(len(server.sockets), 1)
sock = server.sockets[0]
self.assertFalse(
sock.getsockopt(
socket.SOL_SOCKET, socket.SO_REUSEPORT))
server.close()
test_utils.run_briefly(self.loop)
proto = MyProto(self.loop)
f = self.loop.create_server(
lambda: proto, '0.0.0.0', 0, reuse_port=True)
server = self.loop.run_until_complete(f)
self.assertEqual(len(server.sockets), 1)
sock = server.sockets[0]
self.assertTrue(
sock.getsockopt(
socket.SOL_SOCKET, socket.SO_REUSEPORT))
server.close()
def _make_unix_server(self, factory, **kwargs):
path = test_utils.gen_unix_socket_path()
self.addCleanup(lambda: os.path.exists(path) and os.unlink(path))
@ -1264,6 +1290,32 @@ class EventLoopTestsMixin:
self.assertEqual('CLOSED', client.state)
server.transport.close()
def test_create_datagram_endpoint_sock(self):
sock = None
local_address = ('127.0.0.1', 0)
infos = self.loop.run_until_complete(
self.loop.getaddrinfo(
*local_address, type=socket.SOCK_DGRAM))
for family, type, proto, cname, address in infos:
try:
sock = socket.socket(family=family, type=type, proto=proto)
sock.setblocking(False)
sock.bind(address)
except:
pass
else:
break
else:
assert False, 'Can not create socket.'
f = self.loop.create_connection(
lambda: MyDatagramProto(loop=self.loop), sock=sock)
tr, pr = self.loop.run_until_complete(f)
self.assertIsInstance(tr, asyncio.Transport)
self.assertIsInstance(pr, MyDatagramProto)
tr.close()
self.loop.run_until_complete(pr.done)
def test_internal_fds(self):
loop = self.create_event_loop()
if not isinstance(loop, selector_events.BaseSelectorEventLoop):