bpo-42392: Remove loop parameter from asyncio.tasks and asyncio.subprocess (GH-23521)

This commit is contained in:
Yurii Karabas 2020-11-28 10:21:17 +02:00 committed by GitHub
parent f9195318a8
commit e4fe303b8c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 66 additions and 158 deletions

View file

@ -350,7 +350,7 @@ class Server(events.AbstractServer):
self._start_serving()
# Skip one loop iteration so that all 'loop.add_reader'
# go through.
await tasks.sleep(0, loop=self._loop)
await tasks.sleep(0)
async def serve_forever(self):
if self._serving_forever_fut is not None:
@ -541,8 +541,7 @@ class BaseEventLoop(events.AbstractEventLoop):
results = await tasks.gather(
*[ag.aclose() for ag in closing_agens],
return_exceptions=True,
loop=self)
return_exceptions=True)
for result, agen in zip(results, closing_agens):
if isinstance(result, Exception):
@ -1457,7 +1456,7 @@ class BaseEventLoop(events.AbstractEventLoop):
fs = [self._create_server_getaddrinfo(host, port, family=family,
flags=flags)
for host in hosts]
infos = await tasks.gather(*fs, loop=self)
infos = await tasks.gather(*fs)
infos = set(itertools.chain.from_iterable(infos))
completed = False
@ -1515,7 +1514,7 @@ class BaseEventLoop(events.AbstractEventLoop):
server._start_serving()
# Skip one loop iteration so that all 'loop.add_reader'
# go through.
await tasks.sleep(0, loop=self)
await tasks.sleep(0)
if self._debug:
logger.info("%r is serving", server)

View file

@ -60,8 +60,7 @@ def _cancel_all_tasks(loop):
for task in to_cancel:
task.cancel()
loop.run_until_complete(
tasks.gather(*to_cancel, loop=loop, return_exceptions=True))
loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
for task in to_cancel:
if task.cancelled():

View file

@ -1,7 +1,6 @@
__all__ = 'create_subprocess_exec', 'create_subprocess_shell'
import subprocess
import warnings
from . import events
from . import protocols
@ -193,24 +192,14 @@ class Process:
stderr = self._read_stream(2)
else:
stderr = self._noop()
stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr,
loop=self._loop)
stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr)
await self.wait()
return (stdout, stderr)
async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
loop=None, limit=streams._DEFAULT_LIMIT,
**kwds):
if loop is None:
loop = events.get_event_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8 "
"and scheduled for removal in Python 3.10.",
DeprecationWarning,
stacklevel=2
)
limit=streams._DEFAULT_LIMIT, **kwds):
loop = events.get_running_loop()
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
loop=loop)
transport, protocol = await loop.subprocess_shell(
@ -221,16 +210,9 @@ async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
stderr=None, loop=None,
limit=streams._DEFAULT_LIMIT, **kwds):
if loop is None:
loop = events.get_event_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8 "
"and scheduled for removal in Python 3.10.",
DeprecationWarning,
stacklevel=2
)
stderr=None, limit=streams._DEFAULT_LIMIT,
**kwds):
loop = events.get_running_loop()
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
loop=loop)
transport, protocol = await loop.subprocess_exec(

View file

@ -370,7 +370,7 @@ FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the Futures and coroutines given by fs to complete.
The fs iterable must not be empty.
@ -393,12 +393,7 @@ async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
raise ValueError(f'Invalid return_when value: {return_when}')
if loop is None:
loop = events.get_running_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
loop = events.get_running_loop()
fs = set(fs)
@ -418,7 +413,7 @@ def _release_waiter(waiter, *args):
waiter.set_result(None)
async def wait_for(fut, timeout, *, loop=None):
async def wait_for(fut, timeout):
"""Wait for the single Future or coroutine to complete, with timeout.
Coroutine will be wrapped in Task.
@ -431,12 +426,7 @@ async def wait_for(fut, timeout, *, loop=None):
This function is a coroutine.
"""
if loop is None:
loop = events.get_running_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
loop = events.get_running_loop()
if timeout is None:
return await fut
@ -556,7 +546,7 @@ async def _cancel_and_wait(fut, loop):
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
def as_completed(fs, *, timeout=None):
"""Return an iterator whose values are coroutines.
When waiting for the yielded coroutines you'll get the results (or
@ -580,12 +570,7 @@ def as_completed(fs, *, loop=None, timeout=None):
from .queues import Queue # Import here to avoid circular import problem.
done = Queue()
if loop is None:
loop = events.get_event_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
loop = events.get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
timeout_handle = None
@ -630,19 +615,13 @@ def __sleep0():
yield
async def sleep(delay, result=None, *, loop=None):
async def sleep(delay, result=None):
"""Coroutine that completes after a given time (in seconds)."""
if delay <= 0:
await __sleep0()
return result
if loop is None:
loop = events.get_running_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
loop = events.get_running_loop()
future = loop.create_future()
h = loop.call_later(delay,
futures._set_result_unless_cancelled,
@ -717,7 +696,7 @@ class _GatheringFuture(futures.Future):
return ret
def gather(*coros_or_futures, loop=None, return_exceptions=False):
def gather(*coros_or_futures, return_exceptions=False):
"""Return a future aggregating results from the given coroutines/futures.
Coroutines will be wrapped in a future and scheduled in the event
@ -748,12 +727,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
gather won't cancel any other awaitables.
"""
if not coros_or_futures:
if loop is None:
loop = events.get_event_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
loop = events.get_event_loop()
outer = loop.create_future()
outer.set_result([])
return outer
@ -817,6 +791,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
children = []
nfuts = 0
nfinished = 0
loop = None
for arg in coros_or_futures:
if arg not in arg_to_fut:
fut = ensure_future(arg, loop=loop)
@ -843,7 +818,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
return outer
def shield(arg, *, loop=None):
def shield(arg):
"""Wait for a future, shielding it from cancellation.
The statement
@ -869,11 +844,7 @@ def shield(arg, *, loop=None):
except CancelledError:
res = None
"""
if loop is not None:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
inner = ensure_future(arg, loop=loop)
inner = ensure_future(arg)
if inner.done():
# Shortcut.
return inner

View file

@ -323,7 +323,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
server._start_serving()
# Skip one loop iteration so that all 'loop.add_reader'
# go through.
await tasks.sleep(0, loop=self)
await tasks.sleep(0)
return server