mirror of
https://github.com/python/cpython.git
synced 2025-08-04 00:48:58 +00:00
bpo-32415: Add asyncio.Task.get_loop() and Future.get_loop() (#4992)
This commit is contained in:
parent
558aa30f79
commit
ca9b36cd1a
9 changed files with 152 additions and 76 deletions
|
@ -34,7 +34,7 @@ def all_tasks(loop=None):
|
|||
"""Return a set of all tasks for the loop."""
|
||||
if loop is None:
|
||||
loop = events.get_event_loop()
|
||||
return {t for t, l in _all_tasks.items() if l is loop}
|
||||
return {t for t in _all_tasks if futures._get_loop(t) is loop}
|
||||
|
||||
|
||||
class Task(futures.Future):
|
||||
|
@ -96,7 +96,7 @@ class Task(futures.Future):
|
|||
self._coro = coro
|
||||
|
||||
self._loop.call_soon(self._step)
|
||||
_register_task(self._loop, self)
|
||||
_register_task(self)
|
||||
|
||||
def __del__(self):
|
||||
if self._state == futures._PENDING and self._log_destroy_pending:
|
||||
|
@ -215,7 +215,7 @@ class Task(futures.Future):
|
|||
blocking = getattr(result, '_asyncio_future_blocking', None)
|
||||
if blocking is not None:
|
||||
# Yielded Future must come from Future.__iter__().
|
||||
if result._loop is not self._loop:
|
||||
if futures._get_loop(result) is not self._loop:
|
||||
new_exc = RuntimeError(
|
||||
f'Task {self!r} got Future '
|
||||
f'{result!r} attached to a different loop')
|
||||
|
@ -510,9 +510,9 @@ async def sleep(delay, result=None, *, loop=None):
|
|||
if loop is None:
|
||||
loop = events.get_event_loop()
|
||||
future = loop.create_future()
|
||||
h = future._loop.call_later(delay,
|
||||
futures._set_result_unless_cancelled,
|
||||
future, result)
|
||||
h = loop.call_later(delay,
|
||||
futures._set_result_unless_cancelled,
|
||||
future, result)
|
||||
try:
|
||||
return await future
|
||||
finally:
|
||||
|
@ -525,7 +525,7 @@ def ensure_future(coro_or_future, *, loop=None):
|
|||
If the argument is a Future, it is returned directly.
|
||||
"""
|
||||
if futures.isfuture(coro_or_future):
|
||||
if loop is not None and loop is not coro_or_future._loop:
|
||||
if loop is not None and loop is not futures._get_loop(coro_or_future):
|
||||
raise ValueError('loop argument must agree with Future')
|
||||
return coro_or_future
|
||||
elif coroutines.iscoroutine(coro_or_future):
|
||||
|
@ -655,7 +655,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
|
|||
if arg not in arg_to_fut:
|
||||
fut = ensure_future(arg, loop=loop)
|
||||
if loop is None:
|
||||
loop = fut._loop
|
||||
loop = futures._get_loop(fut)
|
||||
if fut is not arg:
|
||||
# 'arg' was not a Future, therefore, 'fut' is a new
|
||||
# Future created specifically for 'arg'. Since the caller
|
||||
|
@ -707,7 +707,7 @@ def shield(arg, *, loop=None):
|
|||
if inner.done():
|
||||
# Shortcut.
|
||||
return inner
|
||||
loop = inner._loop
|
||||
loop = futures._get_loop(inner)
|
||||
outer = loop.create_future()
|
||||
|
||||
def _done_callback(inner):
|
||||
|
@ -751,23 +751,17 @@ def run_coroutine_threadsafe(coro, loop):
|
|||
return future
|
||||
|
||||
|
||||
# WeakKeyDictionary of {Task: EventLoop} containing all tasks alive.
|
||||
# Task should be a weak reference to remove entry on task garbage
|
||||
# collection, EventLoop is required
|
||||
# to not access to private task._loop attribute.
|
||||
_all_tasks = weakref.WeakKeyDictionary()
|
||||
# WeakSet containing all alive tasks.
|
||||
_all_tasks = weakref.WeakSet()
|
||||
|
||||
# Dictionary containing tasks that are currently active in
|
||||
# all running event loops. {EventLoop: Task}
|
||||
_current_tasks = {}
|
||||
|
||||
|
||||
def _register_task(loop, task):
|
||||
"""Register a new task in asyncio as executed by loop.
|
||||
|
||||
Returns None.
|
||||
"""
|
||||
_all_tasks[task] = loop
|
||||
def _register_task(task):
|
||||
"""Register a new task in asyncio as executed by loop."""
|
||||
_all_tasks.add(task)
|
||||
|
||||
|
||||
def _enter_task(loop, task):
|
||||
|
@ -786,8 +780,9 @@ def _leave_task(loop, task):
|
|||
del _current_tasks[loop]
|
||||
|
||||
|
||||
def _unregister_task(loop, task):
|
||||
_all_tasks.pop(task, None)
|
||||
def _unregister_task(task):
|
||||
"""Unregister a task."""
|
||||
_all_tasks.discard(task)
|
||||
|
||||
|
||||
_py_register_task = _register_task
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue