mirror of
https://github.com/python/cpython.git
synced 2025-07-24 11:44:31 +00:00
bpo-32193: Convert asyncio to async/await usage (#4753)
* Convert asyncio/tasks.py to async/await * Convert asyncio/queues.py to async/await * Convert asyncio/test_utils.py to async/await * Convert asyncio/base_subprocess.py to async/await * Convert asyncio/subprocess.py to async/await * Convert asyncio/streams.py to async/await * Fix comments * Convert asyncio/locks.py to async/await * Convert asyncio.sleep to async def * Add a comment * Add missing news * Convert stubs from AbstrctEventLoop to async functions * Convert subprocess_shell/subprocess_exec * Convert connect_read_pipe/connect_write_pip to async/await syntax * Convert create_datagram_endpoint * Convert create_unix_server/create_unix_connection * Get rid of old style coroutines in unix_events.py * Convert selector_events.py to async/await * Convert wait_closed and create_connection * Drop redundant line * Convert base_events.py * Code cleanup * Drop redundant comments * Fix indentation * Add explicit tests for compatibility between old and new coroutines * Convert windows event loop to use async/await * Fix double awaiting of async function * Convert asyncio/locks.py * Improve docstring * Convert tests to async/await * Convert more tests * Convert more tests * Convert more tests * Convert tests * Improve test
This commit is contained in:
parent
ede157331b
commit
5f841b5538
22 changed files with 647 additions and 771 deletions
|
@ -33,7 +33,6 @@ from . import coroutines
|
|||
from . import events
|
||||
from . import futures
|
||||
from . import tasks
|
||||
from .coroutines import coroutine
|
||||
from .log import logger
|
||||
|
||||
|
||||
|
@ -220,13 +219,12 @@ class Server(events.AbstractServer):
|
|||
if not waiter.done():
|
||||
waiter.set_result(waiter)
|
||||
|
||||
@coroutine
|
||||
def wait_closed(self):
|
||||
async def wait_closed(self):
|
||||
if self.sockets is None or self._waiters is None:
|
||||
return
|
||||
waiter = self._loop.create_future()
|
||||
self._waiters.append(waiter)
|
||||
yield from waiter
|
||||
await waiter
|
||||
|
||||
|
||||
class BaseEventLoop(events.AbstractEventLoop):
|
||||
|
@ -330,10 +328,9 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
"""Create write pipe transport."""
|
||||
raise NotImplementedError
|
||||
|
||||
@coroutine
|
||||
def _make_subprocess_transport(self, protocol, args, shell,
|
||||
stdin, stdout, stderr, bufsize,
|
||||
extra=None, **kwargs):
|
||||
async def _make_subprocess_transport(self, protocol, args, shell,
|
||||
stdin, stdout, stderr, bufsize,
|
||||
extra=None, **kwargs):
|
||||
"""Create subprocess transport."""
|
||||
raise NotImplementedError
|
||||
|
||||
|
@ -371,8 +368,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
|
||||
self._asyncgens.add(agen)
|
||||
|
||||
@coroutine
|
||||
def shutdown_asyncgens(self):
|
||||
async def shutdown_asyncgens(self):
|
||||
"""Shutdown all active asynchronous generators."""
|
||||
self._asyncgens_shutdown_called = True
|
||||
|
||||
|
@ -384,12 +380,11 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
closing_agens = list(self._asyncgens)
|
||||
self._asyncgens.clear()
|
||||
|
||||
shutdown_coro = tasks.gather(
|
||||
results = await tasks.gather(
|
||||
*[ag.aclose() for ag in closing_agens],
|
||||
return_exceptions=True,
|
||||
loop=self)
|
||||
|
||||
results = yield from shutdown_coro
|
||||
for result, agen in zip(results, closing_agens):
|
||||
if isinstance(result, Exception):
|
||||
self.call_exception_handler({
|
||||
|
@ -671,10 +666,10 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
def getnameinfo(self, sockaddr, flags=0):
|
||||
return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
|
||||
|
||||
@coroutine
|
||||
def create_connection(self, protocol_factory, host=None, port=None, *,
|
||||
ssl=None, family=0, proto=0, flags=0, sock=None,
|
||||
local_addr=None, server_hostname=None):
|
||||
async def create_connection(self, protocol_factory, host=None, port=None,
|
||||
*, ssl=None, family=0,
|
||||
proto=0, flags=0, sock=None,
|
||||
local_addr=None, server_hostname=None):
|
||||
"""Connect to a TCP server.
|
||||
|
||||
Create a streaming transport connection to a given Internet host and
|
||||
|
@ -722,7 +717,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
else:
|
||||
f2 = None
|
||||
|
||||
yield from tasks.wait(fs, loop=self)
|
||||
await tasks.wait(fs, loop=self)
|
||||
|
||||
infos = f1.result()
|
||||
if not infos:
|
||||
|
@ -755,7 +750,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
continue
|
||||
if self._debug:
|
||||
logger.debug("connect %r to %r", sock, address)
|
||||
yield from self.sock_connect(sock, address)
|
||||
await self.sock_connect(sock, address)
|
||||
except OSError as exc:
|
||||
if sock is not None:
|
||||
sock.close()
|
||||
|
@ -793,7 +788,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
raise ValueError(
|
||||
'A Stream Socket was expected, got {!r}'.format(sock))
|
||||
|
||||
transport, protocol = yield from self._create_connection_transport(
|
||||
transport, protocol = await self._create_connection_transport(
|
||||
sock, protocol_factory, ssl, server_hostname)
|
||||
if self._debug:
|
||||
# Get the socket from the transport because SSL transport closes
|
||||
|
@ -803,9 +798,8 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
sock, host, port, transport, protocol)
|
||||
return transport, protocol
|
||||
|
||||
@coroutine
|
||||
def _create_connection_transport(self, sock, protocol_factory, ssl,
|
||||
server_hostname, server_side=False):
|
||||
async def _create_connection_transport(self, sock, protocol_factory, ssl,
|
||||
server_hostname, server_side=False):
|
||||
|
||||
sock.setblocking(False)
|
||||
|
||||
|
@ -820,19 +814,18 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
transport = self._make_socket_transport(sock, protocol, waiter)
|
||||
|
||||
try:
|
||||
yield from waiter
|
||||
await waiter
|
||||
except:
|
||||
transport.close()
|
||||
raise
|
||||
|
||||
return transport, protocol
|
||||
|
||||
@coroutine
|
||||
def create_datagram_endpoint(self, protocol_factory,
|
||||
local_addr=None, remote_addr=None, *,
|
||||
family=0, proto=0, flags=0,
|
||||
reuse_address=None, reuse_port=None,
|
||||
allow_broadcast=None, sock=None):
|
||||
async def create_datagram_endpoint(self, protocol_factory,
|
||||
local_addr=None, remote_addr=None, *,
|
||||
family=0, proto=0, flags=0,
|
||||
reuse_address=None, reuse_port=None,
|
||||
allow_broadcast=None, sock=None):
|
||||
"""Create datagram connection."""
|
||||
if sock is not None:
|
||||
if not _is_dgram_socket(sock):
|
||||
|
@ -872,7 +865,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
assert isinstance(addr, tuple) and len(addr) == 2, (
|
||||
'2-tuple is expected')
|
||||
|
||||
infos = yield from _ensure_resolved(
|
||||
infos = await _ensure_resolved(
|
||||
addr, family=family, type=socket.SOCK_DGRAM,
|
||||
proto=proto, flags=flags, loop=self)
|
||||
if not infos:
|
||||
|
@ -918,7 +911,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
if local_addr:
|
||||
sock.bind(local_address)
|
||||
if remote_addr:
|
||||
yield from self.sock_connect(sock, remote_address)
|
||||
await self.sock_connect(sock, remote_address)
|
||||
r_addr = remote_address
|
||||
except OSError as exc:
|
||||
if sock is not None:
|
||||
|
@ -948,32 +941,30 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
remote_addr, transport, protocol)
|
||||
|
||||
try:
|
||||
yield from waiter
|
||||
await waiter
|
||||
except:
|
||||
transport.close()
|
||||
raise
|
||||
|
||||
return transport, protocol
|
||||
|
||||
@coroutine
|
||||
def _create_server_getaddrinfo(self, host, port, family, flags):
|
||||
infos = yield from _ensure_resolved((host, port), family=family,
|
||||
type=socket.SOCK_STREAM,
|
||||
flags=flags, loop=self)
|
||||
async def _create_server_getaddrinfo(self, host, port, family, flags):
|
||||
infos = await _ensure_resolved((host, port), family=family,
|
||||
type=socket.SOCK_STREAM,
|
||||
flags=flags, loop=self)
|
||||
if not infos:
|
||||
raise OSError('getaddrinfo({!r}) returned empty list'.format(host))
|
||||
return infos
|
||||
|
||||
@coroutine
|
||||
def create_server(self, protocol_factory, host=None, port=None,
|
||||
*,
|
||||
family=socket.AF_UNSPEC,
|
||||
flags=socket.AI_PASSIVE,
|
||||
sock=None,
|
||||
backlog=100,
|
||||
ssl=None,
|
||||
reuse_address=None,
|
||||
reuse_port=None):
|
||||
async def create_server(self, protocol_factory, host=None, port=None,
|
||||
*,
|
||||
family=socket.AF_UNSPEC,
|
||||
flags=socket.AI_PASSIVE,
|
||||
sock=None,
|
||||
backlog=100,
|
||||
ssl=None,
|
||||
reuse_address=None,
|
||||
reuse_port=None):
|
||||
"""Create a TCP server.
|
||||
|
||||
The host parameter can be a string, in that case the TCP server is bound
|
||||
|
@ -1011,7 +1002,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
fs = [self._create_server_getaddrinfo(host, port, family=family,
|
||||
flags=flags)
|
||||
for host in hosts]
|
||||
infos = yield from tasks.gather(*fs, loop=self)
|
||||
infos = await tasks.gather(*fs, loop=self)
|
||||
infos = set(itertools.chain.from_iterable(infos))
|
||||
|
||||
completed = False
|
||||
|
@ -1068,8 +1059,8 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
logger.info("%r is serving", server)
|
||||
return server
|
||||
|
||||
@coroutine
|
||||
def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None):
|
||||
async def connect_accepted_socket(self, protocol_factory, sock,
|
||||
*, ssl=None):
|
||||
"""Handle an accepted connection.
|
||||
|
||||
This is used by servers that accept connections outside of
|
||||
|
@ -1082,7 +1073,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
raise ValueError(
|
||||
'A Stream Socket was expected, got {!r}'.format(sock))
|
||||
|
||||
transport, protocol = yield from self._create_connection_transport(
|
||||
transport, protocol = await self._create_connection_transport(
|
||||
sock, protocol_factory, ssl, '', server_side=True)
|
||||
if self._debug:
|
||||
# Get the socket from the transport because SSL transport closes
|
||||
|
@ -1091,14 +1082,13 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
|
||||
return transport, protocol
|
||||
|
||||
@coroutine
|
||||
def connect_read_pipe(self, protocol_factory, pipe):
|
||||
async def connect_read_pipe(self, protocol_factory, pipe):
|
||||
protocol = protocol_factory()
|
||||
waiter = self.create_future()
|
||||
transport = self._make_read_pipe_transport(pipe, protocol, waiter)
|
||||
|
||||
try:
|
||||
yield from waiter
|
||||
await waiter
|
||||
except:
|
||||
transport.close()
|
||||
raise
|
||||
|
@ -1108,14 +1098,13 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
pipe.fileno(), transport, protocol)
|
||||
return transport, protocol
|
||||
|
||||
@coroutine
|
||||
def connect_write_pipe(self, protocol_factory, pipe):
|
||||
async def connect_write_pipe(self, protocol_factory, pipe):
|
||||
protocol = protocol_factory()
|
||||
waiter = self.create_future()
|
||||
transport = self._make_write_pipe_transport(pipe, protocol, waiter)
|
||||
|
||||
try:
|
||||
yield from waiter
|
||||
await waiter
|
||||
except:
|
||||
transport.close()
|
||||
raise
|
||||
|
@ -1138,11 +1127,13 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
info.append('stderr=%s' % _format_pipe(stderr))
|
||||
logger.debug(' '.join(info))
|
||||
|
||||
@coroutine
|
||||
def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
universal_newlines=False, shell=True, bufsize=0,
|
||||
**kwargs):
|
||||
async def subprocess_shell(self, protocol_factory, cmd, *,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
universal_newlines=False,
|
||||
shell=True, bufsize=0,
|
||||
**kwargs):
|
||||
if not isinstance(cmd, (bytes, str)):
|
||||
raise ValueError("cmd must be a string")
|
||||
if universal_newlines:
|
||||
|
@ -1157,17 +1148,16 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
# (password) and may be too long
|
||||
debug_log = 'run shell command %r' % cmd
|
||||
self._log_subprocess(debug_log, stdin, stdout, stderr)
|
||||
transport = yield from self._make_subprocess_transport(
|
||||
transport = await self._make_subprocess_transport(
|
||||
protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
|
||||
if self._debug:
|
||||
logger.info('%s: %r', debug_log, transport)
|
||||
return transport, protocol
|
||||
|
||||
@coroutine
|
||||
def subprocess_exec(self, protocol_factory, program, *args,
|
||||
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE, universal_newlines=False,
|
||||
shell=False, bufsize=0, **kwargs):
|
||||
async def subprocess_exec(self, protocol_factory, program, *args,
|
||||
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE, universal_newlines=False,
|
||||
shell=False, bufsize=0, **kwargs):
|
||||
if universal_newlines:
|
||||
raise ValueError("universal_newlines must be False")
|
||||
if shell:
|
||||
|
@ -1186,7 +1176,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
# (password) and may be too long
|
||||
debug_log = 'execute program %r' % program
|
||||
self._log_subprocess(debug_log, stdin, stdout, stderr)
|
||||
transport = yield from self._make_subprocess_transport(
|
||||
transport = await self._make_subprocess_transport(
|
||||
protocol, popen_args, False, stdin, stdout, stderr,
|
||||
bufsize, **kwargs)
|
||||
if self._debug:
|
||||
|
|
|
@ -4,7 +4,6 @@ import warnings
|
|||
|
||||
from . import protocols
|
||||
from . import transports
|
||||
from .coroutines import coroutine
|
||||
from .log import logger
|
||||
|
||||
|
||||
|
@ -154,26 +153,25 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
|
|||
self._check_proc()
|
||||
self._proc.kill()
|
||||
|
||||
@coroutine
|
||||
def _connect_pipes(self, waiter):
|
||||
async def _connect_pipes(self, waiter):
|
||||
try:
|
||||
proc = self._proc
|
||||
loop = self._loop
|
||||
|
||||
if proc.stdin is not None:
|
||||
_, pipe = yield from loop.connect_write_pipe(
|
||||
_, pipe = await loop.connect_write_pipe(
|
||||
lambda: WriteSubprocessPipeProto(self, 0),
|
||||
proc.stdin)
|
||||
self._pipes[0] = pipe
|
||||
|
||||
if proc.stdout is not None:
|
||||
_, pipe = yield from loop.connect_read_pipe(
|
||||
_, pipe = await loop.connect_read_pipe(
|
||||
lambda: ReadSubprocessPipeProto(self, 1),
|
||||
proc.stdout)
|
||||
self._pipes[1] = pipe
|
||||
|
||||
if proc.stderr is not None:
|
||||
_, pipe = yield from loop.connect_read_pipe(
|
||||
_, pipe = await loop.connect_read_pipe(
|
||||
lambda: ReadSubprocessPipeProto(self, 2),
|
||||
proc.stderr)
|
||||
self._pipes[2] = pipe
|
||||
|
@ -224,8 +222,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
|
|||
waiter.set_result(returncode)
|
||||
self._exit_waiters = None
|
||||
|
||||
@coroutine
|
||||
def _wait(self):
|
||||
async def _wait(self):
|
||||
"""Wait until the process exit and return the process return code.
|
||||
|
||||
This method is a coroutine."""
|
||||
|
@ -234,7 +231,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
|
|||
|
||||
waiter = self._loop.create_future()
|
||||
self._exit_waiters.append(waiter)
|
||||
return (yield from waiter)
|
||||
return await waiter
|
||||
|
||||
def _try_finish(self):
|
||||
assert not self._finished
|
||||
|
|
|
@ -219,7 +219,7 @@ class AbstractServer:
|
|||
"""Stop serving. This leaves existing connections open."""
|
||||
return NotImplemented
|
||||
|
||||
def wait_closed(self):
|
||||
async def wait_closed(self):
|
||||
"""Coroutine to wait until service is closed."""
|
||||
return NotImplemented
|
||||
|
||||
|
@ -267,7 +267,7 @@ class AbstractEventLoop:
|
|||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def shutdown_asyncgens(self):
|
||||
async def shutdown_asyncgens(self):
|
||||
"""Shutdown all active asynchronous generators."""
|
||||
raise NotImplementedError
|
||||
|
||||
|
@ -302,7 +302,7 @@ class AbstractEventLoop:
|
|||
def call_soon_threadsafe(self, callback, *args):
|
||||
raise NotImplementedError
|
||||
|
||||
def run_in_executor(self, executor, func, *args):
|
||||
async def run_in_executor(self, executor, func, *args):
|
||||
raise NotImplementedError
|
||||
|
||||
def set_default_executor(self, executor):
|
||||
|
@ -310,21 +310,23 @@ class AbstractEventLoop:
|
|||
|
||||
# Network I/O methods returning Futures.
|
||||
|
||||
def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
|
||||
async def getaddrinfo(self, host, port, *,
|
||||
family=0, type=0, proto=0, flags=0):
|
||||
raise NotImplementedError
|
||||
|
||||
def getnameinfo(self, sockaddr, flags=0):
|
||||
async def getnameinfo(self, sockaddr, flags=0):
|
||||
raise NotImplementedError
|
||||
|
||||
def create_connection(self, protocol_factory, host=None, port=None, *,
|
||||
ssl=None, family=0, proto=0, flags=0, sock=None,
|
||||
local_addr=None, server_hostname=None):
|
||||
async def create_connection(self, protocol_factory, host=None, port=None,
|
||||
*, ssl=None, family=0, proto=0,
|
||||
flags=0, sock=None, local_addr=None,
|
||||
server_hostname=None):
|
||||
raise NotImplementedError
|
||||
|
||||
def create_server(self, protocol_factory, host=None, port=None, *,
|
||||
family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
|
||||
sock=None, backlog=100, ssl=None, reuse_address=None,
|
||||
reuse_port=None):
|
||||
async def create_server(self, protocol_factory, host=None, port=None,
|
||||
*, family=socket.AF_UNSPEC,
|
||||
flags=socket.AI_PASSIVE, sock=None, backlog=100,
|
||||
ssl=None, reuse_address=None, reuse_port=None):
|
||||
"""A coroutine which creates a TCP server bound to host and port.
|
||||
|
||||
The return value is a Server object which can be used to stop
|
||||
|
@ -362,13 +364,13 @@ class AbstractEventLoop:
|
|||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def create_unix_connection(self, protocol_factory, path=None, *,
|
||||
ssl=None, sock=None,
|
||||
server_hostname=None):
|
||||
async def create_unix_connection(self, protocol_factory, path=None, *,
|
||||
ssl=None, sock=None,
|
||||
server_hostname=None):
|
||||
raise NotImplementedError
|
||||
|
||||
def create_unix_server(self, protocol_factory, path=None, *,
|
||||
sock=None, backlog=100, ssl=None):
|
||||
async def create_unix_server(self, protocol_factory, path=None, *,
|
||||
sock=None, backlog=100, ssl=None):
|
||||
"""A coroutine which creates a UNIX Domain Socket server.
|
||||
|
||||
The return value is a Server object, which can be used to stop
|
||||
|
@ -388,11 +390,11 @@ class AbstractEventLoop:
|
|||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def create_datagram_endpoint(self, protocol_factory,
|
||||
local_addr=None, remote_addr=None, *,
|
||||
family=0, proto=0, flags=0,
|
||||
reuse_address=None, reuse_port=None,
|
||||
allow_broadcast=None, sock=None):
|
||||
async def create_datagram_endpoint(self, protocol_factory,
|
||||
local_addr=None, remote_addr=None, *,
|
||||
family=0, proto=0, flags=0,
|
||||
reuse_address=None, reuse_port=None,
|
||||
allow_broadcast=None, sock=None):
|
||||
"""A coroutine which creates a datagram endpoint.
|
||||
|
||||
This method will try to establish the endpoint in the background.
|
||||
|
@ -425,7 +427,7 @@ class AbstractEventLoop:
|
|||
|
||||
# Pipes and subprocesses.
|
||||
|
||||
def connect_read_pipe(self, protocol_factory, pipe):
|
||||
async def connect_read_pipe(self, protocol_factory, pipe):
|
||||
"""Register read pipe in event loop. Set the pipe to non-blocking mode.
|
||||
|
||||
protocol_factory should instantiate object with Protocol interface.
|
||||
|
@ -438,7 +440,7 @@ class AbstractEventLoop:
|
|||
# close fd in pipe transport then close f and vise versa.
|
||||
raise NotImplementedError
|
||||
|
||||
def connect_write_pipe(self, protocol_factory, pipe):
|
||||
async def connect_write_pipe(self, protocol_factory, pipe):
|
||||
"""Register write pipe in event loop.
|
||||
|
||||
protocol_factory should instantiate object with BaseProtocol interface.
|
||||
|
@ -451,14 +453,18 @@ class AbstractEventLoop:
|
|||
# close fd in pipe transport then close f and vise versa.
|
||||
raise NotImplementedError
|
||||
|
||||
def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
**kwargs):
|
||||
async def subprocess_shell(self, protocol_factory, cmd, *,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
**kwargs):
|
||||
raise NotImplementedError
|
||||
|
||||
def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
**kwargs):
|
||||
async def subprocess_exec(self, protocol_factory, *args,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
**kwargs):
|
||||
raise NotImplementedError
|
||||
|
||||
# Ready-based callback registration methods.
|
||||
|
@ -480,19 +486,19 @@ class AbstractEventLoop:
|
|||
|
||||
# Completion based I/O methods returning Futures.
|
||||
|
||||
def sock_recv(self, sock, nbytes):
|
||||
async def sock_recv(self, sock, nbytes):
|
||||
raise NotImplementedError
|
||||
|
||||
def sock_recv_into(self, sock, buf):
|
||||
async def sock_recv_into(self, sock, buf):
|
||||
raise NotImplementedError
|
||||
|
||||
def sock_sendall(self, sock, data):
|
||||
async def sock_sendall(self, sock, data):
|
||||
raise NotImplementedError
|
||||
|
||||
def sock_connect(self, sock, address):
|
||||
async def sock_connect(self, sock, address):
|
||||
raise NotImplementedError
|
||||
|
||||
def sock_accept(self, sock):
|
||||
async def sock_accept(self, sock):
|
||||
raise NotImplementedError
|
||||
|
||||
# Signal handling.
|
||||
|
|
|
@ -66,20 +66,21 @@ class _ContextManagerMixin:
|
|||
yield from self.acquire()
|
||||
return _ContextManager(self)
|
||||
|
||||
def __await__(self):
|
||||
# To make "with await lock" work.
|
||||
yield from self.acquire()
|
||||
async def __acquire_ctx(self):
|
||||
await self.acquire()
|
||||
return _ContextManager(self)
|
||||
|
||||
@coroutine
|
||||
def __aenter__(self):
|
||||
yield from self.acquire()
|
||||
def __await__(self):
|
||||
# To make "with await lock" work.
|
||||
return self.__acquire_ctx().__await__()
|
||||
|
||||
async def __aenter__(self):
|
||||
await self.acquire()
|
||||
# We have no use for the "as ..." clause in the with
|
||||
# statement for locks.
|
||||
return None
|
||||
|
||||
@coroutine
|
||||
def __aexit__(self, exc_type, exc, tb):
|
||||
async def __aexit__(self, exc_type, exc, tb):
|
||||
self.release()
|
||||
|
||||
|
||||
|
@ -156,8 +157,7 @@ class Lock(_ContextManagerMixin):
|
|||
"""Return True if lock is acquired."""
|
||||
return self._locked
|
||||
|
||||
@coroutine
|
||||
def acquire(self):
|
||||
async def acquire(self):
|
||||
"""Acquire a lock.
|
||||
|
||||
This method blocks until the lock is unlocked, then sets it to
|
||||
|
@ -170,7 +170,7 @@ class Lock(_ContextManagerMixin):
|
|||
fut = self._loop.create_future()
|
||||
self._waiters.append(fut)
|
||||
try:
|
||||
yield from fut
|
||||
await fut
|
||||
self._locked = True
|
||||
return True
|
||||
except futures.CancelledError:
|
||||
|
@ -251,8 +251,7 @@ class Event:
|
|||
to true again."""
|
||||
self._value = False
|
||||
|
||||
@coroutine
|
||||
def wait(self):
|
||||
async def wait(self):
|
||||
"""Block until the internal flag is true.
|
||||
|
||||
If the internal flag is true on entry, return True
|
||||
|
@ -265,7 +264,7 @@ class Event:
|
|||
fut = self._loop.create_future()
|
||||
self._waiters.append(fut)
|
||||
try:
|
||||
yield from fut
|
||||
await fut
|
||||
return True
|
||||
finally:
|
||||
self._waiters.remove(fut)
|
||||
|
@ -307,8 +306,7 @@ class Condition(_ContextManagerMixin):
|
|||
extra = '{},waiters:{}'.format(extra, len(self._waiters))
|
||||
return '<{} [{}]>'.format(res[1:-1], extra)
|
||||
|
||||
@coroutine
|
||||
def wait(self):
|
||||
async def wait(self):
|
||||
"""Wait until notified.
|
||||
|
||||
If the calling coroutine has not acquired the lock when this
|
||||
|
@ -327,7 +325,7 @@ class Condition(_ContextManagerMixin):
|
|||
fut = self._loop.create_future()
|
||||
self._waiters.append(fut)
|
||||
try:
|
||||
yield from fut
|
||||
await fut
|
||||
return True
|
||||
finally:
|
||||
self._waiters.remove(fut)
|
||||
|
@ -336,13 +334,12 @@ class Condition(_ContextManagerMixin):
|
|||
# Must reacquire lock even if wait is cancelled
|
||||
while True:
|
||||
try:
|
||||
yield from self.acquire()
|
||||
await self.acquire()
|
||||
break
|
||||
except futures.CancelledError:
|
||||
pass
|
||||
|
||||
@coroutine
|
||||
def wait_for(self, predicate):
|
||||
async def wait_for(self, predicate):
|
||||
"""Wait until a predicate becomes true.
|
||||
|
||||
The predicate should be a callable which result will be
|
||||
|
@ -351,7 +348,7 @@ class Condition(_ContextManagerMixin):
|
|||
"""
|
||||
result = predicate()
|
||||
while not result:
|
||||
yield from self.wait()
|
||||
await self.wait()
|
||||
result = predicate()
|
||||
return result
|
||||
|
||||
|
@ -432,8 +429,7 @@ class Semaphore(_ContextManagerMixin):
|
|||
"""Returns True if semaphore can not be acquired immediately."""
|
||||
return self._value == 0
|
||||
|
||||
@coroutine
|
||||
def acquire(self):
|
||||
async def acquire(self):
|
||||
"""Acquire a semaphore.
|
||||
|
||||
If the internal counter is larger than zero on entry,
|
||||
|
@ -446,7 +442,7 @@ class Semaphore(_ContextManagerMixin):
|
|||
fut = self._loop.create_future()
|
||||
self._waiters.append(fut)
|
||||
try:
|
||||
yield from fut
|
||||
await fut
|
||||
except:
|
||||
# See the similar code in Queue.get.
|
||||
fut.cancel()
|
||||
|
|
|
@ -7,7 +7,6 @@ import heapq
|
|||
|
||||
from . import events
|
||||
from . import locks
|
||||
from .coroutines import coroutine
|
||||
|
||||
|
||||
class QueueEmpty(Exception):
|
||||
|
@ -28,7 +27,7 @@ class Queue:
|
|||
"""A queue, useful for coordinating producer and consumer coroutines.
|
||||
|
||||
If maxsize is less than or equal to zero, the queue size is infinite. If it
|
||||
is an integer greater than 0, then "yield from put()" will block when the
|
||||
is an integer greater than 0, then "await put()" will block when the
|
||||
queue reaches maxsize, until an item is removed by get().
|
||||
|
||||
Unlike the standard library Queue, you can reliably know this Queue's size
|
||||
|
@ -116,20 +115,17 @@ class Queue:
|
|||
else:
|
||||
return self.qsize() >= self._maxsize
|
||||
|
||||
@coroutine
|
||||
def put(self, item):
|
||||
async def put(self, item):
|
||||
"""Put an item into the queue.
|
||||
|
||||
Put an item into the queue. If the queue is full, wait until a free
|
||||
slot is available before adding item.
|
||||
|
||||
This method is a coroutine.
|
||||
"""
|
||||
while self.full():
|
||||
putter = self._loop.create_future()
|
||||
self._putters.append(putter)
|
||||
try:
|
||||
yield from putter
|
||||
await putter
|
||||
except:
|
||||
putter.cancel() # Just in case putter is not done yet.
|
||||
if not self.full() and not putter.cancelled():
|
||||
|
@ -151,19 +147,16 @@ class Queue:
|
|||
self._finished.clear()
|
||||
self._wakeup_next(self._getters)
|
||||
|
||||
@coroutine
|
||||
def get(self):
|
||||
async def get(self):
|
||||
"""Remove and return an item from the queue.
|
||||
|
||||
If queue is empty, wait until an item is available.
|
||||
|
||||
This method is a coroutine.
|
||||
"""
|
||||
while self.empty():
|
||||
getter = self._loop.create_future()
|
||||
self._getters.append(getter)
|
||||
try:
|
||||
yield from getter
|
||||
await getter
|
||||
except:
|
||||
getter.cancel() # Just in case getter is not done yet.
|
||||
|
||||
|
@ -210,8 +203,7 @@ class Queue:
|
|||
if self._unfinished_tasks == 0:
|
||||
self._finished.set()
|
||||
|
||||
@coroutine
|
||||
def join(self):
|
||||
async def join(self):
|
||||
"""Block until all items in the queue have been gotten and processed.
|
||||
|
||||
The count of unfinished tasks goes up whenever an item is added to the
|
||||
|
@ -220,7 +212,7 @@ class Queue:
|
|||
When the count of unfinished tasks drops to zero, join() unblocks.
|
||||
"""
|
||||
if self._unfinished_tasks > 0:
|
||||
yield from self._finished.wait()
|
||||
await self._finished.wait()
|
||||
|
||||
|
||||
class PriorityQueue(Queue):
|
||||
|
|
|
@ -24,7 +24,6 @@ from . import events
|
|||
from . import futures
|
||||
from . import transports
|
||||
from . import sslproto
|
||||
from .coroutines import coroutine
|
||||
from .log import logger
|
||||
|
||||
|
||||
|
@ -189,9 +188,8 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
sslcontext, server)
|
||||
self.create_task(accept)
|
||||
|
||||
@coroutine
|
||||
def _accept_connection2(self, protocol_factory, conn, extra,
|
||||
sslcontext=None, server=None):
|
||||
async def _accept_connection2(self, protocol_factory, conn, extra,
|
||||
sslcontext=None, server=None):
|
||||
protocol = None
|
||||
transport = None
|
||||
try:
|
||||
|
@ -207,7 +205,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
server=server)
|
||||
|
||||
try:
|
||||
yield from waiter
|
||||
await waiter
|
||||
except:
|
||||
transport.close()
|
||||
raise
|
||||
|
@ -452,8 +450,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
fd = sock.fileno()
|
||||
self.add_writer(fd, self._sock_sendall, fut, fd, sock, data)
|
||||
|
||||
@coroutine
|
||||
def sock_connect(self, sock, address):
|
||||
async def sock_connect(self, sock, address):
|
||||
"""Connect to a remote socket at address.
|
||||
|
||||
This method is a coroutine.
|
||||
|
@ -465,12 +462,12 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
resolved = base_events._ensure_resolved(
|
||||
address, family=sock.family, proto=sock.proto, loop=self)
|
||||
if not resolved.done():
|
||||
yield from resolved
|
||||
await resolved
|
||||
_, _, _, _, address = resolved.result()[0]
|
||||
|
||||
fut = self.create_future()
|
||||
self._sock_connect(fut, sock, address)
|
||||
return (yield from fut)
|
||||
return await fut
|
||||
|
||||
def _sock_connect(self, fut, sock, address):
|
||||
fd = sock.fileno()
|
||||
|
|
|
@ -14,8 +14,8 @@ if hasattr(socket, 'AF_UNIX'):
|
|||
from . import coroutines
|
||||
from . import events
|
||||
from . import protocols
|
||||
from .coroutines import coroutine
|
||||
from .log import logger
|
||||
from .tasks import sleep
|
||||
|
||||
|
||||
_DEFAULT_LIMIT = 2 ** 16
|
||||
|
@ -52,9 +52,8 @@ class LimitOverrunError(Exception):
|
|||
return type(self), (self.args[0], self.consumed)
|
||||
|
||||
|
||||
@coroutine
|
||||
def open_connection(host=None, port=None, *,
|
||||
loop=None, limit=_DEFAULT_LIMIT, **kwds):
|
||||
async def open_connection(host=None, port=None, *,
|
||||
loop=None, limit=_DEFAULT_LIMIT, **kwds):
|
||||
"""A wrapper for create_connection() returning a (reader, writer) pair.
|
||||
|
||||
The reader returned is a StreamReader instance; the writer is a
|
||||
|
@ -76,15 +75,14 @@ def open_connection(host=None, port=None, *,
|
|||
loop = events.get_event_loop()
|
||||
reader = StreamReader(limit=limit, loop=loop)
|
||||
protocol = StreamReaderProtocol(reader, loop=loop)
|
||||
transport, _ = yield from loop.create_connection(
|
||||
transport, _ = await loop.create_connection(
|
||||
lambda: protocol, host, port, **kwds)
|
||||
writer = StreamWriter(transport, protocol, reader, loop)
|
||||
return reader, writer
|
||||
|
||||
|
||||
@coroutine
|
||||
def start_server(client_connected_cb, host=None, port=None, *,
|
||||
loop=None, limit=_DEFAULT_LIMIT, **kwds):
|
||||
async def start_server(client_connected_cb, host=None, port=None, *,
|
||||
loop=None, limit=_DEFAULT_LIMIT, **kwds):
|
||||
"""Start a socket server, call back for each client connected.
|
||||
|
||||
The first parameter, `client_connected_cb`, takes two parameters:
|
||||
|
@ -115,28 +113,26 @@ def start_server(client_connected_cb, host=None, port=None, *,
|
|||
loop=loop)
|
||||
return protocol
|
||||
|
||||
return (yield from loop.create_server(factory, host, port, **kwds))
|
||||
return await loop.create_server(factory, host, port, **kwds)
|
||||
|
||||
|
||||
if hasattr(socket, 'AF_UNIX'):
|
||||
# UNIX Domain Sockets are supported on this platform
|
||||
|
||||
@coroutine
|
||||
def open_unix_connection(path=None, *,
|
||||
loop=None, limit=_DEFAULT_LIMIT, **kwds):
|
||||
async def open_unix_connection(path=None, *,
|
||||
loop=None, limit=_DEFAULT_LIMIT, **kwds):
|
||||
"""Similar to `open_connection` but works with UNIX Domain Sockets."""
|
||||
if loop is None:
|
||||
loop = events.get_event_loop()
|
||||
reader = StreamReader(limit=limit, loop=loop)
|
||||
protocol = StreamReaderProtocol(reader, loop=loop)
|
||||
transport, _ = yield from loop.create_unix_connection(
|
||||
transport, _ = await loop.create_unix_connection(
|
||||
lambda: protocol, path, **kwds)
|
||||
writer = StreamWriter(transport, protocol, reader, loop)
|
||||
return reader, writer
|
||||
|
||||
@coroutine
|
||||
def start_unix_server(client_connected_cb, path=None, *,
|
||||
loop=None, limit=_DEFAULT_LIMIT, **kwds):
|
||||
async def start_unix_server(client_connected_cb, path=None, *,
|
||||
loop=None, limit=_DEFAULT_LIMIT, **kwds):
|
||||
"""Similar to `start_server` but works with UNIX Domain Sockets."""
|
||||
if loop is None:
|
||||
loop = events.get_event_loop()
|
||||
|
@ -147,7 +143,7 @@ if hasattr(socket, 'AF_UNIX'):
|
|||
loop=loop)
|
||||
return protocol
|
||||
|
||||
return (yield from loop.create_unix_server(factory, path, **kwds))
|
||||
return await loop.create_unix_server(factory, path, **kwds)
|
||||
|
||||
|
||||
class FlowControlMixin(protocols.Protocol):
|
||||
|
@ -203,8 +199,7 @@ class FlowControlMixin(protocols.Protocol):
|
|||
else:
|
||||
waiter.set_exception(exc)
|
||||
|
||||
@coroutine
|
||||
def _drain_helper(self):
|
||||
async def _drain_helper(self):
|
||||
if self._connection_lost:
|
||||
raise ConnectionResetError('Connection lost')
|
||||
if not self._paused:
|
||||
|
@ -213,7 +208,7 @@ class FlowControlMixin(protocols.Protocol):
|
|||
assert waiter is None or waiter.cancelled()
|
||||
waiter = self._loop.create_future()
|
||||
self._drain_waiter = waiter
|
||||
yield from waiter
|
||||
await waiter
|
||||
|
||||
|
||||
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
|
||||
|
@ -313,14 +308,13 @@ class StreamWriter:
|
|||
def get_extra_info(self, name, default=None):
|
||||
return self._transport.get_extra_info(name, default)
|
||||
|
||||
@coroutine
|
||||
def drain(self):
|
||||
async def drain(self):
|
||||
"""Flush the write buffer.
|
||||
|
||||
The intended use is to write
|
||||
|
||||
w.write(data)
|
||||
yield from w.drain()
|
||||
await w.drain()
|
||||
"""
|
||||
if self._reader is not None:
|
||||
exc = self._reader.exception()
|
||||
|
@ -331,11 +325,11 @@ class StreamWriter:
|
|||
# Yield to the event loop so connection_lost() may be
|
||||
# called. Without this, _drain_helper() would return
|
||||
# immediately, and code that calls
|
||||
# write(...); yield from drain()
|
||||
# write(...); await drain()
|
||||
# in a loop would never call connection_lost(), so it
|
||||
# would not see an error when the socket is closed.
|
||||
yield
|
||||
yield from self._protocol._drain_helper()
|
||||
await sleep(0, loop=self._loop)
|
||||
await self._protocol._drain_helper()
|
||||
|
||||
|
||||
class StreamReader:
|
||||
|
@ -436,8 +430,7 @@ class StreamReader:
|
|||
else:
|
||||
self._paused = True
|
||||
|
||||
@coroutine
|
||||
def _wait_for_data(self, func_name):
|
||||
async def _wait_for_data(self, func_name):
|
||||
"""Wait until feed_data() or feed_eof() is called.
|
||||
|
||||
If stream was paused, automatically resume it.
|
||||
|
@ -460,12 +453,11 @@ class StreamReader:
|
|||
|
||||
self._waiter = self._loop.create_future()
|
||||
try:
|
||||
yield from self._waiter
|
||||
await self._waiter
|
||||
finally:
|
||||
self._waiter = None
|
||||
|
||||
@coroutine
|
||||
def readline(self):
|
||||
async def readline(self):
|
||||
"""Read chunk of data from the stream until newline (b'\n') is found.
|
||||
|
||||
On success, return chunk that ends with newline. If only partial
|
||||
|
@ -484,7 +476,7 @@ class StreamReader:
|
|||
sep = b'\n'
|
||||
seplen = len(sep)
|
||||
try:
|
||||
line = yield from self.readuntil(sep)
|
||||
line = await self.readuntil(sep)
|
||||
except IncompleteReadError as e:
|
||||
return e.partial
|
||||
except LimitOverrunError as e:
|
||||
|
@ -496,8 +488,7 @@ class StreamReader:
|
|||
raise ValueError(e.args[0])
|
||||
return line
|
||||
|
||||
@coroutine
|
||||
def readuntil(self, separator=b'\n'):
|
||||
async def readuntil(self, separator=b'\n'):
|
||||
"""Read data from the stream until ``separator`` is found.
|
||||
|
||||
On success, the data and separator will be removed from the
|
||||
|
@ -577,7 +568,7 @@ class StreamReader:
|
|||
raise IncompleteReadError(chunk, None)
|
||||
|
||||
# _wait_for_data() will resume reading if stream was paused.
|
||||
yield from self._wait_for_data('readuntil')
|
||||
await self._wait_for_data('readuntil')
|
||||
|
||||
if isep > self._limit:
|
||||
raise LimitOverrunError(
|
||||
|
@ -588,8 +579,7 @@ class StreamReader:
|
|||
self._maybe_resume_transport()
|
||||
return bytes(chunk)
|
||||
|
||||
@coroutine
|
||||
def read(self, n=-1):
|
||||
async def read(self, n=-1):
|
||||
"""Read up to `n` bytes from the stream.
|
||||
|
||||
If n is not provided, or set to -1, read until EOF and return all read
|
||||
|
@ -623,14 +613,14 @@ class StreamReader:
|
|||
# bytes. So just call self.read(self._limit) until EOF.
|
||||
blocks = []
|
||||
while True:
|
||||
block = yield from self.read(self._limit)
|
||||
block = await self.read(self._limit)
|
||||
if not block:
|
||||
break
|
||||
blocks.append(block)
|
||||
return b''.join(blocks)
|
||||
|
||||
if not self._buffer and not self._eof:
|
||||
yield from self._wait_for_data('read')
|
||||
await self._wait_for_data('read')
|
||||
|
||||
# This will work right even if buffer is less than n bytes
|
||||
data = bytes(self._buffer[:n])
|
||||
|
@ -639,8 +629,7 @@ class StreamReader:
|
|||
self._maybe_resume_transport()
|
||||
return data
|
||||
|
||||
@coroutine
|
||||
def readexactly(self, n):
|
||||
async def readexactly(self, n):
|
||||
"""Read exactly `n` bytes.
|
||||
|
||||
Raise an IncompleteReadError if EOF is reached before `n` bytes can be
|
||||
|
@ -670,7 +659,7 @@ class StreamReader:
|
|||
self._buffer.clear()
|
||||
raise IncompleteReadError(incomplete, n)
|
||||
|
||||
yield from self._wait_for_data('readexactly')
|
||||
await self._wait_for_data('readexactly')
|
||||
|
||||
if len(self._buffer) == n:
|
||||
data = bytes(self._buffer)
|
||||
|
@ -684,9 +673,8 @@ class StreamReader:
|
|||
def __aiter__(self):
|
||||
return self
|
||||
|
||||
@coroutine
|
||||
def __anext__(self):
|
||||
val = yield from self.readline()
|
||||
async def __anext__(self):
|
||||
val = await self.readline()
|
||||
if val == b'':
|
||||
raise StopAsyncIteration
|
||||
return val
|
||||
|
|
|
@ -6,7 +6,6 @@ from . import events
|
|||
from . import protocols
|
||||
from . import streams
|
||||
from . import tasks
|
||||
from .coroutines import coroutine
|
||||
from .log import logger
|
||||
|
||||
|
||||
|
@ -121,12 +120,9 @@ class Process:
|
|||
def returncode(self):
|
||||
return self._transport.get_returncode()
|
||||
|
||||
@coroutine
|
||||
def wait(self):
|
||||
"""Wait until the process exit and return the process return code.
|
||||
|
||||
This method is a coroutine."""
|
||||
return (yield from self._transport._wait())
|
||||
async def wait(self):
|
||||
"""Wait until the process exit and return the process return code."""
|
||||
return await self._transport._wait()
|
||||
|
||||
def send_signal(self, signal):
|
||||
self._transport.send_signal(signal)
|
||||
|
@ -137,15 +133,14 @@ class Process:
|
|||
def kill(self):
|
||||
self._transport.kill()
|
||||
|
||||
@coroutine
|
||||
def _feed_stdin(self, input):
|
||||
async def _feed_stdin(self, input):
|
||||
debug = self._loop.get_debug()
|
||||
self.stdin.write(input)
|
||||
if debug:
|
||||
logger.debug('%r communicate: feed stdin (%s bytes)',
|
||||
self, len(input))
|
||||
try:
|
||||
yield from self.stdin.drain()
|
||||
await self.stdin.drain()
|
||||
except (BrokenPipeError, ConnectionResetError) as exc:
|
||||
# communicate() ignores BrokenPipeError and ConnectionResetError
|
||||
if debug:
|
||||
|
@ -155,12 +150,10 @@ class Process:
|
|||
logger.debug('%r communicate: close stdin', self)
|
||||
self.stdin.close()
|
||||
|
||||
@coroutine
|
||||
def _noop(self):
|
||||
async def _noop(self):
|
||||
return None
|
||||
|
||||
@coroutine
|
||||
def _read_stream(self, fd):
|
||||
async def _read_stream(self, fd):
|
||||
transport = self._transport.get_pipe_transport(fd)
|
||||
if fd == 2:
|
||||
stream = self.stderr
|
||||
|
@ -170,15 +163,14 @@ class Process:
|
|||
if self._loop.get_debug():
|
||||
name = 'stdout' if fd == 1 else 'stderr'
|
||||
logger.debug('%r communicate: read %s', self, name)
|
||||
output = yield from stream.read()
|
||||
output = await stream.read()
|
||||
if self._loop.get_debug():
|
||||
name = 'stdout' if fd == 1 else 'stderr'
|
||||
logger.debug('%r communicate: close %s', self, name)
|
||||
transport.close()
|
||||
return output
|
||||
|
||||
@coroutine
|
||||
def communicate(self, input=None):
|
||||
async def communicate(self, input=None):
|
||||
if input is not None:
|
||||
stdin = self._feed_stdin(input)
|
||||
else:
|
||||
|
@ -191,36 +183,36 @@ class Process:
|
|||
stderr = self._read_stream(2)
|
||||
else:
|
||||
stderr = self._noop()
|
||||
stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr,
|
||||
loop=self._loop)
|
||||
yield from self.wait()
|
||||
stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr,
|
||||
loop=self._loop)
|
||||
await self.wait()
|
||||
return (stdout, stderr)
|
||||
|
||||
|
||||
@coroutine
|
||||
def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
|
||||
loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
|
||||
async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
|
||||
loop=None, limit=streams._DEFAULT_LIMIT,
|
||||
**kwds):
|
||||
if loop is None:
|
||||
loop = events.get_event_loop()
|
||||
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
|
||||
loop=loop)
|
||||
transport, protocol = yield from loop.subprocess_shell(
|
||||
protocol_factory,
|
||||
cmd, stdin=stdin, stdout=stdout,
|
||||
stderr=stderr, **kwds)
|
||||
transport, protocol = await loop.subprocess_shell(
|
||||
protocol_factory,
|
||||
cmd, stdin=stdin, stdout=stdout,
|
||||
stderr=stderr, **kwds)
|
||||
return Process(transport, protocol, loop)
|
||||
|
||||
@coroutine
|
||||
def create_subprocess_exec(program, *args, stdin=None, stdout=None,
|
||||
stderr=None, loop=None,
|
||||
limit=streams._DEFAULT_LIMIT, **kwds):
|
||||
|
||||
async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
|
||||
stderr=None, loop=None,
|
||||
limit=streams._DEFAULT_LIMIT, **kwds):
|
||||
if loop is None:
|
||||
loop = events.get_event_loop()
|
||||
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
|
||||
loop=loop)
|
||||
transport, protocol = yield from loop.subprocess_exec(
|
||||
protocol_factory,
|
||||
program, *args,
|
||||
stdin=stdin, stdout=stdout,
|
||||
stderr=stderr, **kwds)
|
||||
transport, protocol = await loop.subprocess_exec(
|
||||
protocol_factory,
|
||||
program, *args,
|
||||
stdin=stdin, stdout=stdout,
|
||||
stderr=stderr, **kwds)
|
||||
return Process(transport, protocol, loop)
|
||||
|
|
|
@ -9,6 +9,7 @@ __all__ = ['Task',
|
|||
import concurrent.futures
|
||||
import functools
|
||||
import inspect
|
||||
import types
|
||||
import warnings
|
||||
import weakref
|
||||
|
||||
|
@ -276,8 +277,7 @@ FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
|
|||
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
|
||||
|
||||
|
||||
@coroutine
|
||||
def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
|
||||
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
|
||||
"""Wait for the Futures and coroutines given by fs to complete.
|
||||
|
||||
The sequence futures must not be empty.
|
||||
|
@ -288,7 +288,7 @@ def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
|
|||
|
||||
Usage:
|
||||
|
||||
done, pending = yield from asyncio.wait(fs)
|
||||
done, pending = await asyncio.wait(fs)
|
||||
|
||||
Note: This does not raise TimeoutError! Futures that aren't done
|
||||
when the timeout occurs are returned in the second set.
|
||||
|
@ -305,7 +305,7 @@ def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
|
|||
|
||||
fs = {ensure_future(f, loop=loop) for f in set(fs)}
|
||||
|
||||
return (yield from _wait(fs, timeout, return_when, loop))
|
||||
return await _wait(fs, timeout, return_when, loop)
|
||||
|
||||
|
||||
def _release_waiter(waiter, *args):
|
||||
|
@ -313,8 +313,7 @@ def _release_waiter(waiter, *args):
|
|||
waiter.set_result(None)
|
||||
|
||||
|
||||
@coroutine
|
||||
def wait_for(fut, timeout, *, loop=None):
|
||||
async def wait_for(fut, timeout, *, loop=None):
|
||||
"""Wait for the single Future or coroutine to complete, with timeout.
|
||||
|
||||
Coroutine will be wrapped in Task.
|
||||
|
@ -331,7 +330,7 @@ def wait_for(fut, timeout, *, loop=None):
|
|||
loop = events.get_event_loop()
|
||||
|
||||
if timeout is None:
|
||||
return (yield from fut)
|
||||
return await fut
|
||||
|
||||
if timeout <= 0:
|
||||
fut = ensure_future(fut, loop=loop)
|
||||
|
@ -352,7 +351,7 @@ def wait_for(fut, timeout, *, loop=None):
|
|||
try:
|
||||
# wait until the future completes or the timeout
|
||||
try:
|
||||
yield from waiter
|
||||
await waiter
|
||||
except futures.CancelledError:
|
||||
fut.remove_done_callback(cb)
|
||||
fut.cancel()
|
||||
|
@ -368,8 +367,7 @@ def wait_for(fut, timeout, *, loop=None):
|
|||
timeout_handle.cancel()
|
||||
|
||||
|
||||
@coroutine
|
||||
def _wait(fs, timeout, return_when, loop):
|
||||
async def _wait(fs, timeout, return_when, loop):
|
||||
"""Internal helper for wait() and wait_for().
|
||||
|
||||
The fs argument must be a collection of Futures.
|
||||
|
@ -397,7 +395,7 @@ def _wait(fs, timeout, return_when, loop):
|
|||
f.add_done_callback(_on_completion)
|
||||
|
||||
try:
|
||||
yield from waiter
|
||||
await waiter
|
||||
finally:
|
||||
if timeout_handle is not None:
|
||||
timeout_handle.cancel()
|
||||
|
@ -423,10 +421,10 @@ def as_completed(fs, *, loop=None, timeout=None):
|
|||
This differs from PEP 3148; the proper way to use this is:
|
||||
|
||||
for f in as_completed(fs):
|
||||
result = yield from f # The 'yield from' may raise.
|
||||
result = await f # The 'await' may raise.
|
||||
# Use result.
|
||||
|
||||
If a timeout is specified, the 'yield from' will raise
|
||||
If a timeout is specified, the 'await' will raise
|
||||
TimeoutError when the timeout occurs before all Futures are done.
|
||||
|
||||
Note: The futures 'f' are not necessarily members of fs.
|
||||
|
@ -453,9 +451,8 @@ def as_completed(fs, *, loop=None, timeout=None):
|
|||
if not todo and timeout_handle is not None:
|
||||
timeout_handle.cancel()
|
||||
|
||||
@coroutine
|
||||
def _wait_for_one():
|
||||
f = yield from done.get()
|
||||
async def _wait_for_one():
|
||||
f = await done.get()
|
||||
if f is None:
|
||||
# Dummy value from _on_timeout().
|
||||
raise futures.TimeoutError
|
||||
|
@ -469,11 +466,22 @@ def as_completed(fs, *, loop=None, timeout=None):
|
|||
yield _wait_for_one()
|
||||
|
||||
|
||||
@coroutine
|
||||
def sleep(delay, result=None, *, loop=None):
|
||||
@types.coroutine
|
||||
def __sleep0():
|
||||
"""Skip one event loop run cycle.
|
||||
|
||||
This is a private helper for 'asyncio.sleep()', used
|
||||
when the 'delay' is set to 0. It uses a bare 'yield'
|
||||
expression (which Task._step knows how to handle)
|
||||
instead of creating a Future object.
|
||||
"""
|
||||
yield
|
||||
|
||||
|
||||
async def sleep(delay, result=None, *, loop=None):
|
||||
"""Coroutine that completes after a given time (in seconds)."""
|
||||
if delay == 0:
|
||||
yield
|
||||
await __sleep0()
|
||||
return result
|
||||
|
||||
if loop is None:
|
||||
|
@ -483,7 +491,7 @@ def sleep(delay, result=None, *, loop=None):
|
|||
futures._set_result_unless_cancelled,
|
||||
future, result)
|
||||
try:
|
||||
return (yield from future)
|
||||
return await future
|
||||
finally:
|
||||
h.cancel()
|
||||
|
||||
|
@ -652,11 +660,11 @@ def shield(arg, *, loop=None):
|
|||
|
||||
The statement
|
||||
|
||||
res = yield from shield(something())
|
||||
res = await shield(something())
|
||||
|
||||
is exactly equivalent to the statement
|
||||
|
||||
res = yield from something()
|
||||
res = await something()
|
||||
|
||||
*except* that if the coroutine containing it is cancelled, the
|
||||
task running in something() is not cancelled. From the POV of
|
||||
|
@ -669,7 +677,7 @@ def shield(arg, *, loop=None):
|
|||
you can combine shield() with a try/except clause, as follows:
|
||||
|
||||
try:
|
||||
res = yield from shield(something())
|
||||
res = await shield(something())
|
||||
except CancelledError:
|
||||
res = None
|
||||
"""
|
||||
|
|
|
@ -30,7 +30,6 @@ from . import base_events
|
|||
from . import events
|
||||
from . import futures
|
||||
from . import tasks
|
||||
from .coroutines import coroutine
|
||||
from .log import logger
|
||||
from test import support
|
||||
|
||||
|
@ -43,8 +42,7 @@ def dummy_ssl_context():
|
|||
|
||||
|
||||
def run_briefly(loop):
|
||||
@coroutine
|
||||
def once():
|
||||
async def once():
|
||||
pass
|
||||
gen = once()
|
||||
t = loop.create_task(gen)
|
||||
|
|
|
@ -20,7 +20,6 @@ from . import events
|
|||
from . import futures
|
||||
from . import selector_events
|
||||
from . import transports
|
||||
from .coroutines import coroutine
|
||||
from .log import logger
|
||||
|
||||
|
||||
|
@ -168,10 +167,9 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
|
|||
extra=None):
|
||||
return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
|
||||
|
||||
@coroutine
|
||||
def _make_subprocess_transport(self, protocol, args, shell,
|
||||
stdin, stdout, stderr, bufsize,
|
||||
extra=None, **kwargs):
|
||||
async def _make_subprocess_transport(self, protocol, args, shell,
|
||||
stdin, stdout, stderr, bufsize,
|
||||
extra=None, **kwargs):
|
||||
with events.get_child_watcher() as watcher:
|
||||
waiter = self.create_future()
|
||||
transp = _UnixSubprocessTransport(self, protocol, args, shell,
|
||||
|
@ -182,29 +180,20 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
|
|||
watcher.add_child_handler(transp.get_pid(),
|
||||
self._child_watcher_callback, transp)
|
||||
try:
|
||||
yield from waiter
|
||||
except Exception as exc:
|
||||
# Workaround CPython bug #23353: using yield/yield-from in an
|
||||
# except block of a generator doesn't clear properly
|
||||
# sys.exc_info()
|
||||
err = exc
|
||||
else:
|
||||
err = None
|
||||
|
||||
if err is not None:
|
||||
await waiter
|
||||
except Exception:
|
||||
transp.close()
|
||||
yield from transp._wait()
|
||||
raise err
|
||||
await transp._wait()
|
||||
raise
|
||||
|
||||
return transp
|
||||
|
||||
def _child_watcher_callback(self, pid, returncode, transp):
|
||||
self.call_soon_threadsafe(transp._process_exited, returncode)
|
||||
|
||||
@coroutine
|
||||
def create_unix_connection(self, protocol_factory, path=None, *,
|
||||
ssl=None, sock=None,
|
||||
server_hostname=None):
|
||||
async def create_unix_connection(self, protocol_factory, path=None, *,
|
||||
ssl=None, sock=None,
|
||||
server_hostname=None):
|
||||
assert server_hostname is None or isinstance(server_hostname, str)
|
||||
if ssl:
|
||||
if server_hostname is None:
|
||||
|
@ -223,7 +212,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
|
|||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
|
||||
try:
|
||||
sock.setblocking(False)
|
||||
yield from self.sock_connect(sock, path)
|
||||
await self.sock_connect(sock, path)
|
||||
except:
|
||||
sock.close()
|
||||
raise
|
||||
|
@ -238,13 +227,12 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
|
|||
.format(sock))
|
||||
sock.setblocking(False)
|
||||
|
||||
transport, protocol = yield from self._create_connection_transport(
|
||||
transport, protocol = await self._create_connection_transport(
|
||||
sock, protocol_factory, ssl, server_hostname)
|
||||
return transport, protocol
|
||||
|
||||
@coroutine
|
||||
def create_unix_server(self, protocol_factory, path=None, *,
|
||||
sock=None, backlog=100, ssl=None):
|
||||
async def create_unix_server(self, protocol_factory, path=None, *,
|
||||
sock=None, backlog=100, ssl=None):
|
||||
if isinstance(ssl, bool):
|
||||
raise TypeError('ssl argument must be an SSLContext or None')
|
||||
|
||||
|
|
|
@ -15,7 +15,6 @@ from . import proactor_events
|
|||
from . import selector_events
|
||||
from . import tasks
|
||||
from . import windows_utils
|
||||
from .coroutines import coroutine
|
||||
from .log import logger
|
||||
|
||||
|
||||
|
@ -305,17 +304,15 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
|
|||
proactor = IocpProactor()
|
||||
super().__init__(proactor)
|
||||
|
||||
@coroutine
|
||||
def create_pipe_connection(self, protocol_factory, address):
|
||||
async def create_pipe_connection(self, protocol_factory, address):
|
||||
f = self._proactor.connect_pipe(address)
|
||||
pipe = yield from f
|
||||
pipe = await f
|
||||
protocol = protocol_factory()
|
||||
trans = self._make_duplex_pipe_transport(pipe, protocol,
|
||||
extra={'addr': address})
|
||||
return trans, protocol
|
||||
|
||||
@coroutine
|
||||
def start_serving_pipe(self, protocol_factory, address):
|
||||
async def start_serving_pipe(self, protocol_factory, address):
|
||||
server = PipeServer(address)
|
||||
|
||||
def loop_accept_pipe(f=None):
|
||||
|
@ -361,28 +358,20 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
|
|||
self.call_soon(loop_accept_pipe)
|
||||
return [server]
|
||||
|
||||
@coroutine
|
||||
def _make_subprocess_transport(self, protocol, args, shell,
|
||||
stdin, stdout, stderr, bufsize,
|
||||
extra=None, **kwargs):
|
||||
async def _make_subprocess_transport(self, protocol, args, shell,
|
||||
stdin, stdout, stderr, bufsize,
|
||||
extra=None, **kwargs):
|
||||
waiter = self.create_future()
|
||||
transp = _WindowsSubprocessTransport(self, protocol, args, shell,
|
||||
stdin, stdout, stderr, bufsize,
|
||||
waiter=waiter, extra=extra,
|
||||
**kwargs)
|
||||
try:
|
||||
yield from waiter
|
||||
except Exception as exc:
|
||||
# Workaround CPython bug #23353: using yield/yield-from in an
|
||||
# except block of a generator doesn't clear properly sys.exc_info()
|
||||
err = exc
|
||||
else:
|
||||
err = None
|
||||
|
||||
if err is not None:
|
||||
await waiter
|
||||
except Exception:
|
||||
transp.close()
|
||||
yield from transp._wait()
|
||||
raise err
|
||||
await transp._wait()
|
||||
raise
|
||||
|
||||
return transp
|
||||
|
||||
|
@ -498,11 +487,10 @@ class IocpProactor:
|
|||
conn.settimeout(listener.gettimeout())
|
||||
return conn, conn.getpeername()
|
||||
|
||||
@coroutine
|
||||
def accept_coro(future, conn):
|
||||
async def accept_coro(future, conn):
|
||||
# Coroutine closing the accept socket if the future is cancelled
|
||||
try:
|
||||
yield from future
|
||||
await future
|
||||
except futures.CancelledError:
|
||||
conn.close()
|
||||
raise
|
||||
|
@ -552,8 +540,7 @@ class IocpProactor:
|
|||
|
||||
return self._register(ov, pipe, finish_accept_pipe)
|
||||
|
||||
@coroutine
|
||||
def connect_pipe(self, address):
|
||||
async def connect_pipe(self, address):
|
||||
delay = CONNECT_PIPE_INIT_DELAY
|
||||
while True:
|
||||
# Unfortunately there is no way to do an overlapped connect to a pipe.
|
||||
|
@ -568,7 +555,7 @@ class IocpProactor:
|
|||
|
||||
# ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
|
||||
delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
|
||||
yield from tasks.sleep(delay, loop=self._loop)
|
||||
await tasks.sleep(delay, loop=self._loop)
|
||||
|
||||
return windows_utils.PipeHandle(handle)
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue