gh-97696: asyncio eager tasks factory (#102853)

Co-authored-by: Jacob Bower <jbower@meta.com>
Co-authored-by: Carol Willing <carolcode@willingconsulting.com>
This commit is contained in:
Itamar Ostricher 2023-05-01 14:10:13 -07:00 committed by GitHub
parent 59bc36aacd
commit a474e04388
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 945 additions and 47 deletions

View file

@ -6,6 +6,7 @@ __all__ = (
'wait', 'wait_for', 'as_completed', 'sleep',
'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
'current_task', 'all_tasks',
'create_eager_task_factory', 'eager_task_factory',
'_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
@ -43,22 +44,26 @@ def all_tasks(loop=None):
"""Return a set of all tasks for the loop."""
if loop is None:
loop = events.get_running_loop()
# Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
# thread while we do so. Therefore we cast it to list prior to filtering. The list
# cast itself requires iteration, so we repeat it several times ignoring
# RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
# details.
# capturing the set of eager tasks first, so if an eager task "graduates"
# to a regular task in another thread, we don't risk missing it.
eager_tasks = list(_eager_tasks)
# Looping over the WeakSet isn't safe as it can be updated from another
# thread, therefore we cast it to list prior to filtering. The list cast
# itself requires iteration, so we repeat it several times ignoring
# RuntimeErrors (which are not very likely to occur).
# See issues 34970 and 36607 for details.
scheduled_tasks = None
i = 0
while True:
try:
tasks = list(_all_tasks)
scheduled_tasks = list(_scheduled_tasks)
except RuntimeError:
i += 1
if i >= 1000:
raise
else:
break
return {t for t in tasks
return {t for t in itertools.chain(scheduled_tasks, eager_tasks)
if futures._get_loop(t) is loop and not t.done()}
@ -93,7 +98,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
# status is still pending
_log_destroy_pending = True
def __init__(self, coro, *, loop=None, name=None, context=None):
def __init__(self, coro, *, loop=None, name=None, context=None,
eager_start=False):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
@ -117,8 +123,11 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
else:
self._context = context
self._loop.call_soon(self.__step, context=self._context)
_register_task(self)
if eager_start and self._loop.is_running():
self.__eager_start()
else:
self._loop.call_soon(self.__step, context=self._context)
_register_task(self)
def __del__(self):
if self._state == futures._PENDING and self._log_destroy_pending:
@ -250,6 +259,25 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self._num_cancels_requested -= 1
return self._num_cancels_requested
def __eager_start(self):
prev_task = _swap_current_task(self._loop, self)
try:
_register_eager_task(self)
try:
self._context.run(self.__step_run_and_handle_result, None)
finally:
_unregister_eager_task(self)
finally:
try:
curtask = _swap_current_task(self._loop, prev_task)
assert curtask is self
finally:
if self.done():
self._coro = None
self = None # Needed to break cycles when an exception occurs.
else:
_register_task(self)
def __step(self, exc=None):
if self.done():
raise exceptions.InvalidStateError(
@ -258,11 +286,17 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
if not isinstance(exc, exceptions.CancelledError):
exc = self._make_cancelled_error()
self._must_cancel = False
coro = self._coro
self._fut_waiter = None
_enter_task(self._loop, self)
# Call either coro.throw(exc) or coro.send(None).
try:
self.__step_run_and_handle_result(exc)
finally:
_leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.
def __step_run_and_handle_result(self, exc):
coro = self._coro
try:
if exc is None:
# We use the `send` method directly, because coroutines
@ -334,7 +368,6 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self._loop.call_soon(
self.__step, new_exc, context=self._context)
finally:
_leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.
def __wakeup(self, future):
@ -897,8 +930,27 @@ def run_coroutine_threadsafe(coro, loop):
return future
# WeakSet containing all alive tasks.
_all_tasks = weakref.WeakSet()
def create_eager_task_factory(custom_task_constructor):
if "eager_start" not in inspect.signature(custom_task_constructor).parameters:
raise TypeError(
"Provided constructor does not support eager task execution")
def factory(loop, coro, *, name=None, context=None):
return custom_task_constructor(
coro, loop=loop, name=name, context=context, eager_start=True)
return factory
eager_task_factory = create_eager_task_factory(Task)
# Collectively these two sets hold references to the complete set of active
# tasks. Eagerly executed tasks use a faster regular set as an optimization
# but may graduate to a WeakSet if the task blocks on IO.
_scheduled_tasks = weakref.WeakSet()
_eager_tasks = set()
# Dictionary containing tasks that are currently active in
# all running event loops. {EventLoop: Task}
@ -906,8 +958,13 @@ _current_tasks = {}
def _register_task(task):
"""Register a new task in asyncio as executed by loop."""
_all_tasks.add(task)
"""Register an asyncio Task scheduled to run on an event loop."""
_scheduled_tasks.add(task)
def _register_eager_task(task):
"""Register an asyncio Task about to be eagerly executed."""
_eager_tasks.add(task)
def _enter_task(loop, task):
@ -926,28 +983,49 @@ def _leave_task(loop, task):
del _current_tasks[loop]
def _swap_current_task(loop, task):
prev_task = _current_tasks.get(loop)
if task is None:
del _current_tasks[loop]
else:
_current_tasks[loop] = task
return prev_task
def _unregister_task(task):
"""Unregister a task."""
_all_tasks.discard(task)
"""Unregister a completed, scheduled Task."""
_scheduled_tasks.discard(task)
def _unregister_eager_task(task):
"""Unregister a task which finished its first eager step."""
_eager_tasks.discard(task)
_py_current_task = current_task
_py_register_task = _register_task
_py_register_eager_task = _register_eager_task
_py_unregister_task = _unregister_task
_py_unregister_eager_task = _unregister_eager_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task
_py_swap_current_task = _swap_current_task
try:
from _asyncio import (_register_task, _unregister_task,
_enter_task, _leave_task,
_all_tasks, _current_tasks,
from _asyncio import (_register_task, _register_eager_task,
_unregister_task, _unregister_eager_task,
_enter_task, _leave_task, _swap_current_task,
_scheduled_tasks, _eager_tasks, _current_tasks,
current_task)
except ImportError:
pass
else:
_c_current_task = current_task
_c_register_task = _register_task
_c_register_eager_task = _register_eager_task
_c_unregister_task = _unregister_task
_c_unregister_eager_task = _unregister_eager_task
_c_enter_task = _enter_task
_c_leave_task = _leave_task
_c_swap_current_task = _swap_current_task