mirror of
https://github.com/python/cpython.git
synced 2025-08-03 16:39:00 +00:00
bpo-32327: Convert asyncio functions documented as coroutines to coroutines. (#4872)
This commit is contained in:
parent
41264f1cd4
commit
19a44f63c7
11 changed files with 170 additions and 195 deletions
|
@ -157,20 +157,6 @@ def _ipaddr_info(host, port, family, type, proto):
|
|||
return None
|
||||
|
||||
|
||||
def _ensure_resolved(address, *, family=0, type=socket.SOCK_STREAM, proto=0,
|
||||
flags=0, loop):
|
||||
host, port = address[:2]
|
||||
info = _ipaddr_info(host, port, family, type, proto)
|
||||
if info is not None:
|
||||
# "host" is already a resolved IP.
|
||||
fut = loop.create_future()
|
||||
fut.set_result([info])
|
||||
return fut
|
||||
else:
|
||||
return loop.getaddrinfo(host, port, family=family, type=type,
|
||||
proto=proto, flags=flags)
|
||||
|
||||
|
||||
def _run_until_complete_cb(fut):
|
||||
exc = fut._exception
|
||||
if isinstance(exc, BaseException) and not isinstance(exc, Exception):
|
||||
|
@ -614,7 +600,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
self._write_to_self()
|
||||
return handle
|
||||
|
||||
def run_in_executor(self, executor, func, *args):
|
||||
async def run_in_executor(self, executor, func, *args):
|
||||
self._check_closed()
|
||||
if self._debug:
|
||||
self._check_callback(func, 'run_in_executor')
|
||||
|
@ -623,7 +609,8 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
if executor is None:
|
||||
executor = concurrent.futures.ThreadPoolExecutor()
|
||||
self._default_executor = executor
|
||||
return futures.wrap_future(executor.submit(func, *args), loop=self)
|
||||
return await futures.wrap_future(
|
||||
executor.submit(func, *args), loop=self)
|
||||
|
||||
def set_default_executor(self, executor):
|
||||
self._default_executor = executor
|
||||
|
@ -652,17 +639,19 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
logger.debug(msg)
|
||||
return addrinfo
|
||||
|
||||
def getaddrinfo(self, host, port, *,
|
||||
family=0, type=0, proto=0, flags=0):
|
||||
async def getaddrinfo(self, host, port, *,
|
||||
family=0, type=0, proto=0, flags=0):
|
||||
if self._debug:
|
||||
return self.run_in_executor(None, self._getaddrinfo_debug,
|
||||
host, port, family, type, proto, flags)
|
||||
getaddr_func = self._getaddrinfo_debug
|
||||
else:
|
||||
return self.run_in_executor(None, socket.getaddrinfo,
|
||||
host, port, family, type, proto, flags)
|
||||
getaddr_func = socket.getaddrinfo
|
||||
|
||||
def getnameinfo(self, sockaddr, flags=0):
|
||||
return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
|
||||
return await self.run_in_executor(
|
||||
None, getaddr_func, host, port, family, type, proto, flags)
|
||||
|
||||
async def getnameinfo(self, sockaddr, flags=0):
|
||||
return await self.run_in_executor(
|
||||
None, socket.getnameinfo, sockaddr, flags)
|
||||
|
||||
async def create_connection(self, protocol_factory, host=None, port=None,
|
||||
*, ssl=None, family=0,
|
||||
|
@ -703,25 +692,17 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
raise ValueError(
|
||||
'host/port and sock can not be specified at the same time')
|
||||
|
||||
f1 = _ensure_resolved((host, port), family=family,
|
||||
type=socket.SOCK_STREAM, proto=proto,
|
||||
flags=flags, loop=self)
|
||||
fs = [f1]
|
||||
if local_addr is not None:
|
||||
f2 = _ensure_resolved(local_addr, family=family,
|
||||
type=socket.SOCK_STREAM, proto=proto,
|
||||
flags=flags, loop=self)
|
||||
fs.append(f2)
|
||||
else:
|
||||
f2 = None
|
||||
|
||||
await tasks.wait(fs, loop=self)
|
||||
|
||||
infos = f1.result()
|
||||
infos = await self._ensure_resolved(
|
||||
(host, port), family=family,
|
||||
type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
|
||||
if not infos:
|
||||
raise OSError('getaddrinfo() returned empty list')
|
||||
if f2 is not None:
|
||||
laddr_infos = f2.result()
|
||||
|
||||
if local_addr is not None:
|
||||
laddr_infos = await self._ensure_resolved(
|
||||
local_addr, family=family,
|
||||
type=socket.SOCK_STREAM, proto=proto,
|
||||
flags=flags, loop=self)
|
||||
if not laddr_infos:
|
||||
raise OSError('getaddrinfo() returned empty list')
|
||||
|
||||
|
@ -730,7 +711,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
try:
|
||||
sock = socket.socket(family=family, type=type, proto=proto)
|
||||
sock.setblocking(False)
|
||||
if f2 is not None:
|
||||
if local_addr is not None:
|
||||
for _, _, _, _, laddr in laddr_infos:
|
||||
try:
|
||||
sock.bind(laddr)
|
||||
|
@ -863,7 +844,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
assert isinstance(addr, tuple) and len(addr) == 2, (
|
||||
'2-tuple is expected')
|
||||
|
||||
infos = await _ensure_resolved(
|
||||
infos = await self._ensure_resolved(
|
||||
addr, family=family, type=socket.SOCK_DGRAM,
|
||||
proto=proto, flags=flags, loop=self)
|
||||
if not infos:
|
||||
|
@ -946,10 +927,22 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
|
||||
return transport, protocol
|
||||
|
||||
async def _ensure_resolved(self, address, *,
|
||||
family=0, type=socket.SOCK_STREAM,
|
||||
proto=0, flags=0, loop):
|
||||
host, port = address[:2]
|
||||
info = _ipaddr_info(host, port, family, type, proto)
|
||||
if info is not None:
|
||||
# "host" is already a resolved IP.
|
||||
return [info]
|
||||
else:
|
||||
return await loop.getaddrinfo(host, port, family=family, type=type,
|
||||
proto=proto, flags=flags)
|
||||
|
||||
async def _create_server_getaddrinfo(self, host, port, family, flags):
|
||||
infos = await _ensure_resolved((host, port), family=family,
|
||||
type=socket.SOCK_STREAM,
|
||||
flags=flags, loop=self)
|
||||
infos = await self._ensure_resolved((host, port), family=family,
|
||||
type=socket.SOCK_STREAM,
|
||||
flags=flags, loop=self)
|
||||
if not infos:
|
||||
raise OSError(f'getaddrinfo({host!r}) returned empty list')
|
||||
return infos
|
||||
|
|
|
@ -432,20 +432,20 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
|||
# Close the event loop
|
||||
super().close()
|
||||
|
||||
def sock_recv(self, sock, n):
|
||||
return self._proactor.recv(sock, n)
|
||||
async def sock_recv(self, sock, n):
|
||||
return await self._proactor.recv(sock, n)
|
||||
|
||||
def sock_recv_into(self, sock, buf):
|
||||
return self._proactor.recv_into(sock, buf)
|
||||
async def sock_recv_into(self, sock, buf):
|
||||
return await self._proactor.recv_into(sock, buf)
|
||||
|
||||
def sock_sendall(self, sock, data):
|
||||
return self._proactor.send(sock, data)
|
||||
async def sock_sendall(self, sock, data):
|
||||
return await self._proactor.send(sock, data)
|
||||
|
||||
def sock_connect(self, sock, address):
|
||||
return self._proactor.connect(sock, address)
|
||||
async def sock_connect(self, sock, address):
|
||||
return await self._proactor.connect(sock, address)
|
||||
|
||||
def sock_accept(self, sock):
|
||||
return self._proactor.accept(sock)
|
||||
async def sock_accept(self, sock):
|
||||
return await self._proactor.accept(sock)
|
||||
|
||||
def _close_self_pipe(self):
|
||||
if self._self_reading_future is not None:
|
||||
|
|
|
@ -336,20 +336,18 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
self._ensure_fd_no_transport(fd)
|
||||
return self._remove_writer(fd)
|
||||
|
||||
def sock_recv(self, sock, n):
|
||||
async def sock_recv(self, sock, n):
|
||||
"""Receive data from the socket.
|
||||
|
||||
The return value is a bytes object representing the data received.
|
||||
The maximum amount of data to be received at once is specified by
|
||||
nbytes.
|
||||
|
||||
This method is a coroutine.
|
||||
"""
|
||||
if self._debug and sock.gettimeout() != 0:
|
||||
raise ValueError("the socket must be non-blocking")
|
||||
fut = self.create_future()
|
||||
self._sock_recv(fut, None, sock, n)
|
||||
return fut
|
||||
return await fut
|
||||
|
||||
def _sock_recv(self, fut, registered_fd, sock, n):
|
||||
# _sock_recv() can add itself as an I/O callback if the operation can't
|
||||
|
@ -372,19 +370,17 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
else:
|
||||
fut.set_result(data)
|
||||
|
||||
def sock_recv_into(self, sock, buf):
|
||||
async def sock_recv_into(self, sock, buf):
|
||||
"""Receive data from the socket.
|
||||
|
||||
The received data is written into *buf* (a writable buffer).
|
||||
The return value is the number of bytes written.
|
||||
|
||||
This method is a coroutine.
|
||||
"""
|
||||
if self._debug and sock.gettimeout() != 0:
|
||||
raise ValueError("the socket must be non-blocking")
|
||||
fut = self.create_future()
|
||||
self._sock_recv_into(fut, None, sock, buf)
|
||||
return fut
|
||||
return await fut
|
||||
|
||||
def _sock_recv_into(self, fut, registered_fd, sock, buf):
|
||||
# _sock_recv_into() can add itself as an I/O callback if the operation
|
||||
|
@ -408,7 +404,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
else:
|
||||
fut.set_result(nbytes)
|
||||
|
||||
def sock_sendall(self, sock, data):
|
||||
async def sock_sendall(self, sock, data):
|
||||
"""Send data to the socket.
|
||||
|
||||
The socket must be connected to a remote socket. This method continues
|
||||
|
@ -416,8 +412,6 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
error occurs. None is returned on success. On error, an exception is
|
||||
raised, and there is no way to determine how much data, if any, was
|
||||
successfully processed by the receiving end of the connection.
|
||||
|
||||
This method is a coroutine.
|
||||
"""
|
||||
if self._debug and sock.gettimeout() != 0:
|
||||
raise ValueError("the socket must be non-blocking")
|
||||
|
@ -426,7 +420,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
self._sock_sendall(fut, None, sock, data)
|
||||
else:
|
||||
fut.set_result(None)
|
||||
return fut
|
||||
return await fut
|
||||
|
||||
def _sock_sendall(self, fut, registered_fd, sock, data):
|
||||
if registered_fd is not None:
|
||||
|
@ -459,11 +453,9 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
raise ValueError("the socket must be non-blocking")
|
||||
|
||||
if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
|
||||
resolved = base_events._ensure_resolved(
|
||||
resolved = await self._ensure_resolved(
|
||||
address, family=sock.family, proto=sock.proto, loop=self)
|
||||
if not resolved.done():
|
||||
await resolved
|
||||
_, _, _, _, address = resolved.result()[0]
|
||||
_, _, _, _, address = resolved[0]
|
||||
|
||||
fut = self.create_future()
|
||||
self._sock_connect(fut, sock, address)
|
||||
|
@ -506,21 +498,19 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
else:
|
||||
fut.set_result(None)
|
||||
|
||||
def sock_accept(self, sock):
|
||||
async def sock_accept(self, sock):
|
||||
"""Accept a connection.
|
||||
|
||||
The socket must be bound to an address and listening for connections.
|
||||
The return value is a pair (conn, address) where conn is a new socket
|
||||
object usable to send and receive data on the connection, and address
|
||||
is the address bound to the socket on the other end of the connection.
|
||||
|
||||
This method is a coroutine.
|
||||
"""
|
||||
if self._debug and sock.gettimeout() != 0:
|
||||
raise ValueError("the socket must be non-blocking")
|
||||
fut = self.create_future()
|
||||
self._sock_accept(fut, False, sock)
|
||||
return fut
|
||||
return await fut
|
||||
|
||||
def _sock_accept(self, fut, registered, sock):
|
||||
fd = sock.fileno()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue