GH-91048: Add utils for capturing async call stack for asyncio programs and enable profiling (#124640)

Signed-off-by: Pablo Galindo <pablogsal@gmail.com>
Co-authored-by: Pablo Galindo <pablogsal@gmail.com>
Co-authored-by: Kumar Aditya <kumaraditya@python.org>
Co-authored-by: Łukasz Langa <lukasz@langa.pl>
Co-authored-by: Savannah Ostrowski <savannahostrowski@gmail.com>
Co-authored-by: Jacob Coffee <jacob@z7x.org>
Co-authored-by: Irit Katriel <1055913+iritkatriel@users.noreply.github.com>
This commit is contained in:
Yury Selivanov 2025-01-22 08:25:29 -08:00 committed by GitHub
parent 60a3a0dd6f
commit 188598851d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 2923 additions and 241 deletions

View file

@ -0,0 +1,145 @@
.. currentmodule:: asyncio
.. _asyncio-graph:
========================
Call Graph Introspection
========================
**Source code:** :source:`Lib/asyncio/graph.py`
-------------------------------------
asyncio has powerful runtime call graph introspection utilities
to trace the entire call graph of a running *coroutine* or *task*, or
a suspended *future*. These utilities and the underlying machinery
can be used from within a Python program or by external profilers
and debuggers.
.. versionadded:: next
.. function:: print_call_graph(future=None, /, *, file=None, depth=1, limit=None)
Print the async call graph for the current task or the provided
:class:`Task` or :class:`Future`.
This function prints entries starting from the top frame and going
down towards the invocation point.
The function receives an optional *future* argument.
If not passed, the current running task will be used.
If the function is called on *the current task*, the optional
keyword-only *depth* argument can be used to skip the specified
number of frames from top of the stack.
If the optional keyword-only *limit* argument is provided, each call stack
in the resulting graph is truncated to include at most ``abs(limit)``
entries. If *limit* is positive, the entries left are the closest to
the invocation point. If *limit* is negative, the topmost entries are
left. If *limit* is omitted or ``None``, all entries are present.
If *limit* is ``0``, the call stack is not printed at all, only
"awaited by" information is printed.
If *file* is omitted or ``None``, the function will print
to :data:`sys.stdout`.
**Example:**
The following Python code:
.. code-block:: python
import asyncio
async def test():
asyncio.print_call_graph()
async def main():
async with asyncio.TaskGroup() as g:
g.create_task(test())
asyncio.run(main())
will print::
* Task(name='Task-2', id=0x1039f0fe0)
+ Call stack:
| File 't2.py', line 4, in async test()
+ Awaited by:
* Task(name='Task-1', id=0x103a5e060)
+ Call stack:
| File 'taskgroups.py', line 107, in async TaskGroup.__aexit__()
| File 't2.py', line 7, in async main()
.. function:: format_call_graph(future=None, /, *, depth=1, limit=None)
Like :func:`print_call_graph`, but returns a string.
If *future* is ``None`` and there's no current task,
the function returns an empty string.
.. function:: capture_call_graph(future=None, /, *, depth=1, limit=None)
Capture the async call graph for the current task or the provided
:class:`Task` or :class:`Future`.
The function receives an optional *future* argument.
If not passed, the current running task will be used. If there's no
current task, the function returns ``None``.
If the function is called on *the current task*, the optional
keyword-only *depth* argument can be used to skip the specified
number of frames from top of the stack.
Returns a ``FutureCallGraph`` data class object:
* ``FutureCallGraph(future, call_stack, awaited_by)``
Where *future* is a reference to a :class:`Future` or
a :class:`Task` (or their subclasses.)
``call_stack`` is a tuple of ``FrameCallGraphEntry`` objects.
``awaited_by`` is a tuple of ``FutureCallGraph`` objects.
* ``FrameCallGraphEntry(frame)``
Where *frame* is a frame object of a regular Python function
in the call stack.
Low level utility functions
===========================
To introspect an async call graph asyncio requires cooperation from
control flow structures, such as :func:`shield` or :class:`TaskGroup`.
Any time an intermediate :class:`Future` object with low-level APIs like
:meth:`Future.add_done_callback() <asyncio.Future.add_done_callback>` is
involved, the following two functions should be used to inform asyncio
about how exactly such intermediate future objects are connected with
the tasks they wrap or control.
.. function:: future_add_to_awaited_by(future, waiter, /)
Record that *future* is awaited on by *waiter*.
Both *future* and *waiter* must be instances of
:class:`Future` or :class:`Task` or their subclasses,
otherwise the call would have no effect.
A call to ``future_add_to_awaited_by()`` must be followed by an
eventual call to the :func:`future_discard_from_awaited_by` function
with the same arguments.
.. function:: future_discard_from_awaited_by(future, waiter, /)
Record that *future* is no longer awaited on by *waiter*.
Both *future* and *waiter* must be instances of
:class:`Future` or :class:`Task` or their subclasses, otherwise
the call would have no effect.

View file

@ -99,6 +99,7 @@ You can experiment with an ``asyncio`` concurrent context in the :term:`REPL`:
asyncio-subprocess.rst
asyncio-queue.rst
asyncio-exceptions.rst
asyncio-graph.rst
.. toctree::
:caption: Low-level APIs

View file

@ -150,6 +150,12 @@ attributes (see :ref:`import-mod-attrs` for module attributes):
| | f_locals | local namespace seen by |
| | | this frame |
+-----------------+-------------------+---------------------------+
| | f_generator | returns the generator or |
| | | coroutine object that |
| | | owns this frame, or |
| | | ``None`` if the frame is |
| | | of a regular function |
+-----------------+-------------------+---------------------------+
| | f_trace | tracing function for this |
| | | frame, or ``None`` |
+-----------------+-------------------+---------------------------+
@ -310,6 +316,10 @@ attributes (see :ref:`import-mod-attrs` for module attributes):
Add ``__builtins__`` attribute to functions.
.. versionchanged:: next
Add ``f_generator`` attribute to frames.
.. function:: getmembers(object[, predicate])
Return all the members of an object in a list of ``(name, value)``

View file

@ -755,6 +755,11 @@ asyncio
reduces memory usage.
(Contributed by Kumar Aditya in :gh:`107803`.)
* :mod:`asyncio` has new utility functions for introspecting and printing
the program's call graph: :func:`asyncio.capture_call_graph` and
:func:`asyncio.print_call_graph`.
(Contributed by Yury Selivanov, Pablo Galindo Salgado, and Łukasz Langa
in :gh:`91048`.)
base64
------

View file

@ -11,6 +11,41 @@ extern "C" {
#define _Py_Debug_Cookie "xdebugpy"
#if defined(__APPLE__)
# include <mach-o/loader.h>
#endif
// Macros to burn global values in custom sections so out-of-process
// profilers can locate them easily.
#define GENERATE_DEBUG_SECTION(name, declaration) \
_GENERATE_DEBUG_SECTION_WINDOWS(name) \
_GENERATE_DEBUG_SECTION_APPLE(name) \
declaration \
_GENERATE_DEBUG_SECTION_LINUX(name)
#if defined(MS_WINDOWS)
#define _GENERATE_DEBUG_SECTION_WINDOWS(name) \
_Pragma(Py_STRINGIFY(section(Py_STRINGIFY(name), read, write))) \
__declspec(allocate(Py_STRINGIFY(name)))
#else
#define _GENERATE_DEBUG_SECTION_WINDOWS(name)
#endif
#if defined(__APPLE__)
#define _GENERATE_DEBUG_SECTION_APPLE(name) \
__attribute__((section(SEG_DATA "," Py_STRINGIFY(name))))
#else
#define _GENERATE_DEBUG_SECTION_APPLE(name)
#endif
#if defined(__linux__) && (defined(__GNUC__) || defined(__clang__))
#define _GENERATE_DEBUG_SECTION_LINUX(name) \
__attribute__((section("." Py_STRINGIFY(name))))
#else
#define _GENERATE_DEBUG_SECTION_LINUX(name)
#endif
#ifdef Py_GIL_DISABLED
# define _Py_Debug_gilruntimestate_enabled offsetof(struct _gil_runtime_state, enabled)
# define _Py_Debug_Free_Threaded 1
@ -69,6 +104,7 @@ typedef struct _Py_DebugOffsets {
uint64_t instr_ptr;
uint64_t localsplus;
uint64_t owner;
uint64_t stackpointer;
} interpreter_frame;
// Code object offset;
@ -113,6 +149,14 @@ typedef struct _Py_DebugOffsets {
uint64_t ob_size;
} list_object;
// PySet object offset;
struct _set_object {
uint64_t size;
uint64_t used;
uint64_t table;
uint64_t mask;
} set_object;
// PyDict object offset;
struct _dict_object {
uint64_t size;
@ -153,6 +197,14 @@ typedef struct _Py_DebugOffsets {
uint64_t size;
uint64_t collecting;
} gc;
// Generator object offset;
struct _gen_object {
uint64_t size;
uint64_t gi_name;
uint64_t gi_iframe;
uint64_t gi_frame_state;
} gen_object;
} _Py_DebugOffsets;
@ -198,6 +250,7 @@ typedef struct _Py_DebugOffsets {
.instr_ptr = offsetof(_PyInterpreterFrame, instr_ptr), \
.localsplus = offsetof(_PyInterpreterFrame, localsplus), \
.owner = offsetof(_PyInterpreterFrame, owner), \
.stackpointer = offsetof(_PyInterpreterFrame, stackpointer), \
}, \
.code_object = { \
.size = sizeof(PyCodeObject), \
@ -231,6 +284,12 @@ typedef struct _Py_DebugOffsets {
.ob_item = offsetof(PyListObject, ob_item), \
.ob_size = offsetof(PyListObject, ob_base.ob_size), \
}, \
.set_object = { \
.size = sizeof(PySetObject), \
.used = offsetof(PySetObject, used), \
.table = offsetof(PySetObject, table), \
.mask = offsetof(PySetObject, mask), \
}, \
.dict_object = { \
.size = sizeof(PyDictObject), \
.ma_keys = offsetof(PyDictObject, ma_keys), \
@ -260,6 +319,12 @@ typedef struct _Py_DebugOffsets {
.size = sizeof(struct _gc_runtime_state), \
.collecting = offsetof(struct _gc_runtime_state, collecting), \
}, \
.gen_object = { \
.size = sizeof(PyGenObject), \
.gi_name = offsetof(PyGenObject, gi_name), \
.gi_iframe = offsetof(PyGenObject, gi_iframe), \
.gi_frame_state = offsetof(PyGenObject, gi_frame_state), \
}, \
}

View file

@ -22,6 +22,7 @@ typedef struct _PyThreadStateImpl {
PyThreadState base;
PyObject *asyncio_running_loop; // Strong reference
PyObject *asyncio_running_task; // Strong reference
struct _qsbr_thread_state *qsbr; // only used by free-threaded build
struct llist_node mem_free_queue; // delayed free queue

View file

@ -10,6 +10,7 @@ from .coroutines import *
from .events import *
from .exceptions import *
from .futures import *
from .graph import *
from .locks import *
from .protocols import *
from .runners import *
@ -27,6 +28,7 @@ __all__ = (base_events.__all__ +
events.__all__ +
exceptions.__all__ +
futures.__all__ +
graph.__all__ +
locks.__all__ +
protocols.__all__ +
runners.__all__ +

View file

@ -2,6 +2,7 @@
__all__ = (
'Future', 'wrap_future', 'isfuture',
'future_add_to_awaited_by', 'future_discard_from_awaited_by',
)
import concurrent.futures
@ -66,6 +67,9 @@ class Future:
# `yield Future()` (incorrect).
_asyncio_future_blocking = False
# Used by the capture_call_stack() API.
__asyncio_awaited_by = None
__log_traceback = False
def __init__(self, *, loop=None):
@ -115,6 +119,12 @@ class Future:
raise ValueError('_log_traceback can only be set to False')
self.__log_traceback = False
@property
def _asyncio_awaited_by(self):
if self.__asyncio_awaited_by is None:
return None
return frozenset(self.__asyncio_awaited_by)
def get_loop(self):
"""Return the event loop the Future is bound to."""
loop = self._loop
@ -415,6 +425,49 @@ def wrap_future(future, *, loop=None):
return new_future
def future_add_to_awaited_by(fut, waiter, /):
"""Record that `fut` is awaited on by `waiter`."""
# For the sake of keeping the implementation minimal and assuming
# that most of asyncio users use the built-in Futures and Tasks
# (or their subclasses), we only support native Future objects
# and their subclasses.
#
# Longer version: tracking requires storing the caller-callee
# dependency somewhere. One obvious choice is to store that
# information right in the future itself in a dedicated attribute.
# This means that we'd have to require all duck-type compatible
# futures to implement a specific attribute used by asyncio for
# the book keeping. Another solution would be to store that in
# a global dictionary. The downside here is that that would create
# strong references and any scenario where the "add" call isn't
# followed by a "discard" call would lead to a memory leak.
# Using WeakDict would resolve that issue, but would complicate
# the C code (_asynciomodule.c). The bottom line here is that
# it's not clear that all this work would be worth the effort.
#
# Note that there's an accelerated version of this function
# shadowing this implementation later in this file.
if isinstance(fut, _PyFuture) and isinstance(waiter, _PyFuture):
if fut._Future__asyncio_awaited_by is None:
fut._Future__asyncio_awaited_by = set()
fut._Future__asyncio_awaited_by.add(waiter)
def future_discard_from_awaited_by(fut, waiter, /):
"""Record that `fut` is no longer awaited on by `waiter`."""
# See the comment in "future_add_to_awaited_by()" body for
# details on implementation.
#
# Note that there's an accelerated version of this function
# shadowing this implementation later in this file.
if isinstance(fut, _PyFuture) and isinstance(waiter, _PyFuture):
if fut._Future__asyncio_awaited_by is not None:
fut._Future__asyncio_awaited_by.discard(waiter)
_py_future_add_to_awaited_by = future_add_to_awaited_by
_py_future_discard_from_awaited_by = future_discard_from_awaited_by
try:
import _asyncio
except ImportError:
@ -422,3 +475,7 @@ except ImportError:
else:
# _CFuture is needed for tests.
Future = _CFuture = _asyncio.Future
future_add_to_awaited_by = _asyncio.future_add_to_awaited_by
future_discard_from_awaited_by = _asyncio.future_discard_from_awaited_by
_c_future_add_to_awaited_by = future_add_to_awaited_by
_c_future_discard_from_awaited_by = future_discard_from_awaited_by

278
Lib/asyncio/graph.py Normal file
View file

@ -0,0 +1,278 @@
"""Introspection utils for tasks call graphs."""
import dataclasses
import sys
import types
from . import events
from . import futures
from . import tasks
__all__ = (
'capture_call_graph',
'format_call_graph',
'print_call_graph',
'FrameCallGraphEntry',
'FutureCallGraph',
)
if False: # for type checkers
from typing import TextIO
# Sadly, we can't re-use the traceback module's datastructures as those
# are tailored for error reporting, whereas we need to represent an
# async call graph.
#
# Going with pretty verbose names as we'd like to export them to the
# top level asyncio namespace, and want to avoid future name clashes.
@dataclasses.dataclass(frozen=True, slots=True)
class FrameCallGraphEntry:
frame: types.FrameType
@dataclasses.dataclass(frozen=True, slots=True)
class FutureCallGraph:
future: futures.Future
call_stack: tuple["FrameCallGraphEntry", ...]
awaited_by: tuple["FutureCallGraph", ...]
def _build_graph_for_future(
future: futures.Future,
*,
limit: int | None = None,
) -> FutureCallGraph:
if not isinstance(future, futures.Future):
raise TypeError(
f"{future!r} object does not appear to be compatible "
f"with asyncio.Future"
)
coro = None
if get_coro := getattr(future, 'get_coro', None):
coro = get_coro() if limit != 0 else None
st: list[FrameCallGraphEntry] = []
awaited_by: list[FutureCallGraph] = []
while coro is not None:
if hasattr(coro, 'cr_await'):
# A native coroutine or duck-type compatible iterator
st.append(FrameCallGraphEntry(coro.cr_frame))
coro = coro.cr_await
elif hasattr(coro, 'ag_await'):
# A native async generator or duck-type compatible iterator
st.append(FrameCallGraphEntry(coro.cr_frame))
coro = coro.ag_await
else:
break
if future._asyncio_awaited_by:
for parent in future._asyncio_awaited_by:
awaited_by.append(_build_graph_for_future(parent, limit=limit))
if limit is not None:
if limit > 0:
st = st[:limit]
elif limit < 0:
st = st[limit:]
st.reverse()
return FutureCallGraph(future, tuple(st), tuple(awaited_by))
def capture_call_graph(
future: futures.Future | None = None,
/,
*,
depth: int = 1,
limit: int | None = None,
) -> FutureCallGraph | None:
"""Capture the async call graph for the current task or the provided Future.
The graph is represented with three data structures:
* FutureCallGraph(future, call_stack, awaited_by)
Where 'future' is an instance of asyncio.Future or asyncio.Task.
'call_stack' is a tuple of FrameGraphEntry objects.
'awaited_by' is a tuple of FutureCallGraph objects.
* FrameCallGraphEntry(frame)
Where 'frame' is a frame object of a regular Python function
in the call stack.
Receives an optional 'future' argument. If not passed,
the current task will be used. If there's no current task, the function
returns None.
If "capture_call_graph()" is introspecting *the current task*, the
optional keyword-only 'depth' argument can be used to skip the specified
number of frames from top of the stack.
If the optional keyword-only 'limit' argument is provided, each call stack
in the resulting graph is truncated to include at most ``abs(limit)``
entries. If 'limit' is positive, the entries left are the closest to
the invocation point. If 'limit' is negative, the topmost entries are
left. If 'limit' is omitted or None, all entries are present.
If 'limit' is 0, the call stack is not captured at all, only
"awaited by" information is present.
"""
loop = events._get_running_loop()
if future is not None:
# Check if we're in a context of a running event loop;
# if yes - check if the passed future is the currently
# running task or not.
if loop is None or future is not tasks.current_task(loop=loop):
return _build_graph_for_future(future, limit=limit)
# else: future is the current task, move on.
else:
if loop is None:
raise RuntimeError(
'capture_call_graph() is called outside of a running '
'event loop and no *future* to introspect was provided')
future = tasks.current_task(loop=loop)
if future is None:
# This isn't a generic call stack introspection utility. If we
# can't determine the current task and none was provided, we
# just return.
return None
if not isinstance(future, futures.Future):
raise TypeError(
f"{future!r} object does not appear to be compatible "
f"with asyncio.Future"
)
call_stack: list[FrameCallGraphEntry] = []
f = sys._getframe(depth) if limit != 0 else None
try:
while f is not None:
is_async = f.f_generator is not None
call_stack.append(FrameCallGraphEntry(f))
if is_async:
if f.f_back is not None and f.f_back.f_generator is None:
# We've reached the bottom of the coroutine stack, which
# must be the Task that runs it.
break
f = f.f_back
finally:
del f
awaited_by = []
if future._asyncio_awaited_by:
for parent in future._asyncio_awaited_by:
awaited_by.append(_build_graph_for_future(parent, limit=limit))
if limit is not None:
limit *= -1
if limit > 0:
call_stack = call_stack[:limit]
elif limit < 0:
call_stack = call_stack[limit:]
return FutureCallGraph(future, tuple(call_stack), tuple(awaited_by))
def format_call_graph(
future: futures.Future | None = None,
/,
*,
depth: int = 1,
limit: int | None = None,
) -> str:
"""Return the async call graph as a string for `future`.
If `future` is not provided, format the call graph for the current task.
"""
def render_level(st: FutureCallGraph, buf: list[str], level: int) -> None:
def add_line(line: str) -> None:
buf.append(level * ' ' + line)
if isinstance(st.future, tasks.Task):
add_line(
f'* Task(name={st.future.get_name()!r}, id={id(st.future):#x})'
)
else:
add_line(
f'* Future(id={id(st.future):#x})'
)
if st.call_stack:
add_line(
f' + Call stack:'
)
for ste in st.call_stack:
f = ste.frame
if f.f_generator is None:
f = ste.frame
add_line(
f' | File {f.f_code.co_filename!r},'
f' line {f.f_lineno}, in'
f' {f.f_code.co_qualname}()'
)
else:
c = f.f_generator
try:
f = c.cr_frame
code = c.cr_code
tag = 'async'
except AttributeError:
try:
f = c.ag_frame
code = c.ag_code
tag = 'async generator'
except AttributeError:
f = c.gi_frame
code = c.gi_code
tag = 'generator'
add_line(
f' | File {f.f_code.co_filename!r},'
f' line {f.f_lineno}, in'
f' {tag} {code.co_qualname}()'
)
if st.awaited_by:
add_line(
f' + Awaited by:'
)
for fut in st.awaited_by:
render_level(fut, buf, level + 1)
graph = capture_call_graph(future, depth=depth + 1, limit=limit)
if graph is None:
return ""
buf: list[str] = []
try:
render_level(graph, buf, 0)
finally:
# 'graph' has references to frames so we should
# make sure it's GC'ed as soon as we don't need it.
del graph
return '\n'.join(buf)
def print_call_graph(
future: futures.Future | None = None,
/,
*,
file: TextIO | None = None,
depth: int = 1,
limit: int | None = None,
) -> None:
"""Print the async call graph for the current task or the provided Future."""
print(format_call_graph(future, depth=depth, limit=limit), file=file)

View file

@ -6,6 +6,7 @@ __all__ = ("TaskGroup",)
from . import events
from . import exceptions
from . import futures
from . import tasks
@ -197,6 +198,8 @@ class TaskGroup:
else:
task = self._loop.create_task(coro, name=name, context=context)
futures.future_add_to_awaited_by(task, self._parent_task)
# Always schedule the done callback even if the task is
# already done (e.g. if the coro was able to complete eagerly),
# otherwise if the task completes with an exception then it will cancel
@ -228,6 +231,8 @@ class TaskGroup:
def _on_task_done(self, task):
self._tasks.discard(task)
futures.future_discard_from_awaited_by(task, self._parent_task)
if self._on_completed_fut is not None and not self._tasks:
if not self._on_completed_fut.done():
self._on_completed_fut.set_result(True)

View file

@ -322,6 +322,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self._loop.call_soon(
self.__step, new_exc, context=self._context)
else:
futures.future_add_to_awaited_by(result, self)
result._asyncio_future_blocking = False
result.add_done_callback(
self.__wakeup, context=self._context)
@ -356,6 +357,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self = None # Needed to break cycles when an exception occurs.
def __wakeup(self, future):
futures.future_discard_from_awaited_by(future, self)
try:
future.result()
except BaseException as exc:
@ -502,6 +504,7 @@ async def _wait(fs, timeout, return_when, loop):
if timeout is not None:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
counter = len(fs)
cur_task = current_task()
def _on_completion(f):
nonlocal counter
@ -514,9 +517,11 @@ async def _wait(fs, timeout, return_when, loop):
timeout_handle.cancel()
if not waiter.done():
waiter.set_result(None)
futures.future_discard_from_awaited_by(f, cur_task)
for f in fs:
f.add_done_callback(_on_completion)
futures.future_add_to_awaited_by(f, cur_task)
try:
await waiter
@ -802,10 +807,19 @@ def gather(*coros_or_futures, return_exceptions=False):
outer.set_result([])
return outer
def _done_callback(fut):
loop = events._get_running_loop()
if loop is not None:
cur_task = current_task(loop)
else:
cur_task = None
def _done_callback(fut, cur_task=cur_task):
nonlocal nfinished
nfinished += 1
if cur_task is not None:
futures.future_discard_from_awaited_by(fut, cur_task)
if outer is None or outer.done():
if not fut.cancelled():
# Mark exception retrieved.
@ -862,7 +876,6 @@ def gather(*coros_or_futures, return_exceptions=False):
nfuts = 0
nfinished = 0
done_futs = []
loop = None
outer = None # bpo-46672
for arg in coros_or_futures:
if arg not in arg_to_fut:
@ -875,12 +888,13 @@ def gather(*coros_or_futures, return_exceptions=False):
# can't control it, disable the "destroy pending task"
# warning.
fut._log_destroy_pending = False
nfuts += 1
arg_to_fut[arg] = fut
if fut.done():
done_futs.append(fut)
else:
if cur_task is not None:
futures.future_add_to_awaited_by(fut, cur_task)
fut.add_done_callback(_done_callback)
else:
@ -940,7 +954,15 @@ def shield(arg):
loop = futures._get_loop(inner)
outer = loop.create_future()
def _inner_done_callback(inner):
if loop is not None and (cur_task := current_task(loop)) is not None:
futures.future_add_to_awaited_by(inner, cur_task)
else:
cur_task = None
def _inner_done_callback(inner, cur_task=cur_task):
if cur_task is not None:
futures.future_discard_from_awaited_by(inner, cur_task)
if outer.cancelled():
if not inner.cancelled():
# Mark inner's result as retrieved.

View file

@ -0,0 +1,436 @@
import asyncio
import io
import unittest
# To prevent a warning "test altered the execution environment"
def tearDownModule():
asyncio._set_event_loop_policy(None)
def capture_test_stack(*, fut=None, depth=1):
def walk(s):
ret = [
(f"T<{n}>" if '-' not in (n := s.future.get_name()) else 'T<anon>')
if isinstance(s.future, asyncio.Task) else 'F'
]
ret.append(
[
(
f"s {entry.frame.f_code.co_name}"
if entry.frame.f_generator is None else
(
f"a {entry.frame.f_generator.cr_code.co_name}"
if hasattr(entry.frame.f_generator, 'cr_code') else
f"ag {entry.frame.f_generator.ag_code.co_name}"
)
) for entry in s.call_stack
]
)
ret.append(
sorted([
walk(ab) for ab in s.awaited_by
], key=lambda entry: entry[0])
)
return ret
buf = io.StringIO()
asyncio.print_call_graph(fut, file=buf, depth=depth+1)
stack = asyncio.capture_call_graph(fut, depth=depth)
return walk(stack), buf.getvalue()
class CallStackTestBase:
async def test_stack_tgroup(self):
stack_for_c5 = None
def c5():
nonlocal stack_for_c5
stack_for_c5 = capture_test_stack(depth=2)
async def c4():
await asyncio.sleep(0)
c5()
async def c3():
await c4()
async def c2():
await c3()
async def c1(task):
await task
async def main():
async with asyncio.TaskGroup() as tg:
task = tg.create_task(c2(), name="c2_root")
tg.create_task(c1(task), name="sub_main_1")
tg.create_task(c1(task), name="sub_main_2")
await main()
self.assertEqual(stack_for_c5[0], [
# task name
'T<c2_root>',
# call stack
['s c5', 'a c4', 'a c3', 'a c2'],
# awaited by
[
['T<anon>',
['a _aexit', 'a __aexit__', 'a main', 'a test_stack_tgroup'], []
],
['T<sub_main_1>',
['a c1'],
[
['T<anon>',
['a _aexit', 'a __aexit__', 'a main', 'a test_stack_tgroup'], []
]
]
],
['T<sub_main_2>',
['a c1'],
[
['T<anon>',
['a _aexit', 'a __aexit__', 'a main', 'a test_stack_tgroup'], []
]
]
]
]
])
self.assertIn(
' async CallStackTestBase.test_stack_tgroup()',
stack_for_c5[1])
async def test_stack_async_gen(self):
stack_for_gen_nested_call = None
async def gen_nested_call():
nonlocal stack_for_gen_nested_call
stack_for_gen_nested_call = capture_test_stack()
async def gen():
for num in range(2):
yield num
if num == 1:
await gen_nested_call()
async def main():
async for el in gen():
pass
await main()
self.assertEqual(stack_for_gen_nested_call[0], [
'T<anon>',
[
's capture_test_stack',
'a gen_nested_call',
'ag gen',
'a main',
'a test_stack_async_gen'
],
[]
])
self.assertIn(
'async generator CallStackTestBase.test_stack_async_gen.<locals>.gen()',
stack_for_gen_nested_call[1])
async def test_stack_gather(self):
stack_for_deep = None
async def deep():
await asyncio.sleep(0)
nonlocal stack_for_deep
stack_for_deep = capture_test_stack()
async def c1():
await asyncio.sleep(0)
await deep()
async def c2():
await asyncio.sleep(0)
async def main():
await asyncio.gather(c1(), c2())
await main()
self.assertEqual(stack_for_deep[0], [
'T<anon>',
['s capture_test_stack', 'a deep', 'a c1'],
[
['T<anon>', ['a main', 'a test_stack_gather'], []]
]
])
async def test_stack_shield(self):
stack_for_shield = None
async def deep():
await asyncio.sleep(0)
nonlocal stack_for_shield
stack_for_shield = capture_test_stack()
async def c1():
await asyncio.sleep(0)
await deep()
async def main():
await asyncio.shield(c1())
await main()
self.assertEqual(stack_for_shield[0], [
'T<anon>',
['s capture_test_stack', 'a deep', 'a c1'],
[
['T<anon>', ['a main', 'a test_stack_shield'], []]
]
])
async def test_stack_timeout(self):
stack_for_inner = None
async def inner():
await asyncio.sleep(0)
nonlocal stack_for_inner
stack_for_inner = capture_test_stack()
async def c1():
async with asyncio.timeout(1):
await asyncio.sleep(0)
await inner()
async def main():
await asyncio.shield(c1())
await main()
self.assertEqual(stack_for_inner[0], [
'T<anon>',
['s capture_test_stack', 'a inner', 'a c1'],
[
['T<anon>', ['a main', 'a test_stack_timeout'], []]
]
])
async def test_stack_wait(self):
stack_for_inner = None
async def inner():
await asyncio.sleep(0)
nonlocal stack_for_inner
stack_for_inner = capture_test_stack()
async def c1():
async with asyncio.timeout(1):
await asyncio.sleep(0)
await inner()
async def c2():
for i in range(3):
await asyncio.sleep(0)
async def main(t1, t2):
while True:
_, pending = await asyncio.wait([t1, t2])
if not pending:
break
t1 = asyncio.create_task(c1())
t2 = asyncio.create_task(c2())
try:
await main(t1, t2)
finally:
await t1
await t2
self.assertEqual(stack_for_inner[0], [
'T<anon>',
['s capture_test_stack', 'a inner', 'a c1'],
[
['T<anon>',
['a _wait', 'a wait', 'a main', 'a test_stack_wait'],
[]
]
]
])
async def test_stack_task(self):
stack_for_inner = None
async def inner():
await asyncio.sleep(0)
nonlocal stack_for_inner
stack_for_inner = capture_test_stack()
async def c1():
await inner()
async def c2():
await asyncio.create_task(c1(), name='there there')
async def main():
await c2()
await main()
self.assertEqual(stack_for_inner[0], [
'T<there there>',
['s capture_test_stack', 'a inner', 'a c1'],
[['T<anon>', ['a c2', 'a main', 'a test_stack_task'], []]]
])
async def test_stack_future(self):
stack_for_fut = None
async def a2(fut):
await fut
async def a1(fut):
await a2(fut)
async def b1(fut):
await fut
async def main():
nonlocal stack_for_fut
fut = asyncio.Future()
async with asyncio.TaskGroup() as g:
g.create_task(a1(fut), name="task A")
g.create_task(b1(fut), name='task B')
for _ in range(5):
# Do a few iterations to ensure that both a1 and b1
# await on the future
await asyncio.sleep(0)
stack_for_fut = capture_test_stack(fut=fut)
fut.set_result(None)
await main()
self.assertEqual(stack_for_fut[0],
['F',
[],
[
['T<task A>',
['a a2', 'a a1'],
[['T<anon>', ['a test_stack_future'], []]]
],
['T<task B>',
['a b1'],
[['T<anon>', ['a test_stack_future'], []]]
],
]]
)
self.assertTrue(stack_for_fut[1].startswith('* Future(id='))
@unittest.skipIf(
not hasattr(asyncio.futures, "_c_future_add_to_awaited_by"),
"C-accelerated asyncio call graph backend missing",
)
class TestCallStackC(CallStackTestBase, unittest.IsolatedAsyncioTestCase):
def setUp(self):
futures = asyncio.futures
tasks = asyncio.tasks
self._Future = asyncio.Future
asyncio.Future = futures.Future = futures._CFuture
self._Task = asyncio.Task
asyncio.Task = tasks.Task = tasks._CTask
self._future_add_to_awaited_by = asyncio.future_add_to_awaited_by
futures.future_add_to_awaited_by = futures._c_future_add_to_awaited_by
asyncio.future_add_to_awaited_by = futures.future_add_to_awaited_by
self._future_discard_from_awaited_by = asyncio.future_discard_from_awaited_by
futures.future_discard_from_awaited_by = futures._c_future_discard_from_awaited_by
asyncio.future_discard_from_awaited_by = futures.future_discard_from_awaited_by
def tearDown(self):
futures = asyncio.futures
tasks = asyncio.tasks
futures.future_discard_from_awaited_by = self._future_discard_from_awaited_by
asyncio.future_discard_from_awaited_by = self._future_discard_from_awaited_by
del self._future_discard_from_awaited_by
futures.future_add_to_awaited_by = self._future_add_to_awaited_by
asyncio.future_add_to_awaited_by = self._future_add_to_awaited_by
del self._future_add_to_awaited_by
asyncio.Task = self._Task
tasks.Task = self._Task
del self._Task
asyncio.Future = self._Future
futures.Future = self._Future
del self._Future
@unittest.skipIf(
not hasattr(asyncio.futures, "_py_future_add_to_awaited_by"),
"Pure Python asyncio call graph backend missing",
)
class TestCallStackPy(CallStackTestBase, unittest.IsolatedAsyncioTestCase):
def setUp(self):
futures = asyncio.futures
tasks = asyncio.tasks
self._Future = asyncio.Future
asyncio.Future = futures.Future = futures._PyFuture
self._Task = asyncio.Task
asyncio.Task = tasks.Task = tasks._PyTask
self._future_add_to_awaited_by = asyncio.future_add_to_awaited_by
futures.future_add_to_awaited_by = futures._py_future_add_to_awaited_by
asyncio.future_add_to_awaited_by = futures.future_add_to_awaited_by
self._future_discard_from_awaited_by = asyncio.future_discard_from_awaited_by
futures.future_discard_from_awaited_by = futures._py_future_discard_from_awaited_by
asyncio.future_discard_from_awaited_by = futures.future_discard_from_awaited_by
def tearDown(self):
futures = asyncio.futures
tasks = asyncio.tasks
futures.future_discard_from_awaited_by = self._future_discard_from_awaited_by
asyncio.future_discard_from_awaited_by = self._future_discard_from_awaited_by
del self._future_discard_from_awaited_by
futures.future_add_to_awaited_by = self._future_add_to_awaited_by
asyncio.future_add_to_awaited_by = self._future_add_to_awaited_by
del self._future_add_to_awaited_by
asyncio.Task = self._Task
tasks.Task = self._Task
del self._Task
asyncio.Future = self._Future
futures.Future = self._Future
del self._Future

View file

@ -13,8 +13,10 @@ PROCESS_VM_READV_SUPPORTED = False
try:
from _testexternalinspection import PROCESS_VM_READV_SUPPORTED
from _testexternalinspection import get_stack_trace
from _testexternalinspection import get_async_stack_trace
except ImportError:
raise unittest.SkipTest("Test only runs when _testexternalinspection is available")
raise unittest.SkipTest(
"Test only runs when _testexternalinspection is available")
def _make_test_script(script_dir, script_basename, source):
to_return = make_script(script_dir, script_basename, source)
@ -23,12 +25,14 @@ def _make_test_script(script_dir, script_basename, source):
class TestGetStackTrace(unittest.TestCase):
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", "Test only runs on Linux and MacOS")
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, "Test only runs on Linux with process_vm_readv support")
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
"Test only runs on Linux and MacOS")
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support")
def test_remote_stack_trace(self):
# Spawn a process with some realistic Python code
script = textwrap.dedent("""\
import time, sys, os
import time, sys
def bar():
for x in range(100):
if x == 50:
@ -37,8 +41,8 @@ class TestGetStackTrace(unittest.TestCase):
foo()
def foo():
fifo = sys.argv[1]
with open(sys.argv[1], "w") as fifo:
fifo_path = sys.argv[1]
with open(fifo_path, "w") as fifo:
fifo.write("ready")
time.sleep(1000)
@ -74,8 +78,218 @@ class TestGetStackTrace(unittest.TestCase):
]
self.assertEqual(stack_trace, expected_stack_trace)
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", "Test only runs on Linux and MacOS")
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, "Test only runs on Linux with process_vm_readv support")
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
"Test only runs on Linux and MacOS")
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support")
def test_async_remote_stack_trace(self):
# Spawn a process with some realistic Python code
script = textwrap.dedent("""\
import asyncio
import time
import sys
def c5():
fifo_path = sys.argv[1]
with open(fifo_path, "w") as fifo:
fifo.write("ready")
time.sleep(10000)
async def c4():
await asyncio.sleep(0)
c5()
async def c3():
await c4()
async def c2():
await c3()
async def c1(task):
await task
async def main():
async with asyncio.TaskGroup() as tg:
task = tg.create_task(c2(), name="c2_root")
tg.create_task(c1(task), name="sub_main_1")
tg.create_task(c1(task), name="sub_main_2")
def new_eager_loop():
loop = asyncio.new_event_loop()
eager_task_factory = asyncio.create_eager_task_factory(
asyncio.Task)
loop.set_task_factory(eager_task_factory)
return loop
asyncio.run(main(), loop_factory={TASK_FACTORY})
""")
stack_trace = None
for task_factory_variant in "asyncio.new_event_loop", "new_eager_loop":
with (
self.subTest(task_factory_variant=task_factory_variant),
os_helper.temp_dir() as work_dir,
):
script_dir = os.path.join(work_dir, "script_pkg")
os.mkdir(script_dir)
fifo = f"{work_dir}/the_fifo"
os.mkfifo(fifo)
script_name = _make_test_script(
script_dir, 'script',
script.format(TASK_FACTORY=task_factory_variant))
try:
p = subprocess.Popen(
[sys.executable, script_name, str(fifo)]
)
with open(fifo, "r") as fifo_file:
response = fifo_file.read()
self.assertEqual(response, "ready")
stack_trace = get_async_stack_trace(p.pid)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace")
finally:
os.remove(fifo)
p.kill()
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
# sets are unordered, so we want to sort "awaited_by"s
stack_trace[2].sort(key=lambda x: x[1])
root_task = "Task-1"
expected_stack_trace = [
["c5", "c4", "c3", "c2"],
"c2_root",
[
[["main"], root_task, []],
[["c1"], "sub_main_1", [[["main"], root_task, []]]],
[["c1"], "sub_main_2", [[["main"], root_task, []]]],
],
]
self.assertEqual(stack_trace, expected_stack_trace)
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
"Test only runs on Linux and MacOS")
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support")
def test_asyncgen_remote_stack_trace(self):
# Spawn a process with some realistic Python code
script = textwrap.dedent("""\
import asyncio
import time
import sys
async def gen_nested_call():
fifo_path = sys.argv[1]
with open(fifo_path, "w") as fifo:
fifo.write("ready")
time.sleep(10000)
async def gen():
for num in range(2):
yield num
if num == 1:
await gen_nested_call()
async def main():
async for el in gen():
pass
asyncio.run(main())
""")
stack_trace = None
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
os.mkdir(script_dir)
fifo = f"{work_dir}/the_fifo"
os.mkfifo(fifo)
script_name = _make_test_script(script_dir, 'script', script)
try:
p = subprocess.Popen([sys.executable, script_name, str(fifo)])
with open(fifo, "r") as fifo_file:
response = fifo_file.read()
self.assertEqual(response, "ready")
stack_trace = get_async_stack_trace(p.pid)
except PermissionError:
self.skipTest("Insufficient permissions to read the stack trace")
finally:
os.remove(fifo)
p.kill()
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
# sets are unordered, so we want to sort "awaited_by"s
stack_trace[2].sort(key=lambda x: x[1])
expected_stack_trace = [
['gen_nested_call', 'gen', 'main'], 'Task-1', []
]
self.assertEqual(stack_trace, expected_stack_trace)
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
"Test only runs on Linux and MacOS")
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support")
def test_async_gather_remote_stack_trace(self):
# Spawn a process with some realistic Python code
script = textwrap.dedent("""\
import asyncio
import time
import sys
async def deep():
await asyncio.sleep(0)
fifo_path = sys.argv[1]
with open(fifo_path, "w") as fifo:
fifo.write("ready")
time.sleep(10000)
async def c1():
await asyncio.sleep(0)
await deep()
async def c2():
await asyncio.sleep(0)
async def main():
await asyncio.gather(c1(), c2())
asyncio.run(main())
""")
stack_trace = None
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
os.mkdir(script_dir)
fifo = f"{work_dir}/the_fifo"
os.mkfifo(fifo)
script_name = _make_test_script(script_dir, 'script', script)
try:
p = subprocess.Popen([sys.executable, script_name, str(fifo)])
with open(fifo, "r") as fifo_file:
response = fifo_file.read()
self.assertEqual(response, "ready")
stack_trace = get_async_stack_trace(p.pid)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace")
finally:
os.remove(fifo)
p.kill()
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
# sets are unordered, so we want to sort "awaited_by"s
stack_trace[2].sort(key=lambda x: x[1])
expected_stack_trace = [
['deep', 'c1'], 'Task-2', [[['main'], 'Task-1', []]]
]
self.assertEqual(stack_trace, expected_stack_trace)
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
"Test only runs on Linux and MacOS")
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support")
def test_self_trace(self):
stack_trace = get_stack_trace(os.getpid())
self.assertEqual(stack_trace[0], "test_self_trace")

View file

@ -222,6 +222,56 @@ class FrameAttrsTest(unittest.TestCase):
with self.assertRaises(AttributeError):
del f.f_lineno
def test_f_generator(self):
# Test f_generator in different contexts.
def t0():
def nested():
frame = sys._getframe()
return frame.f_generator
def gen():
yield nested()
g = gen()
try:
return next(g)
finally:
g.close()
def t1():
frame = sys._getframe()
return frame.f_generator
def t2():
frame = sys._getframe()
yield frame.f_generator
async def t3():
frame = sys._getframe()
return frame.f_generator
# For regular functions f_generator is None
self.assertIsNone(t0())
self.assertIsNone(t1())
# For generators f_generator is equal to self
g = t2()
try:
frame_g = next(g)
self.assertIs(g, frame_g)
finally:
g.close()
# Ditto for coroutines
c = t3()
try:
c.send(None)
except StopIteration as ex:
self.assertIs(ex.value, c)
else:
raise AssertionError('coroutine did not exit')
class ReprTest(unittest.TestCase):
"""

View file

@ -0,0 +1,2 @@
Add :func:`asyncio.capture_call_graph` and
:func:`asyncio.print_call_graph` functions.

View file

@ -40,12 +40,17 @@ typedef enum {
PyObject *prefix##_source_tb; \
PyObject *prefix##_cancel_msg; \
PyObject *prefix##_cancelled_exc; \
PyObject *prefix##_awaited_by; \
fut_state prefix##_state; \
/* These bitfields need to be at the end of the struct
so that these and bitfields from TaskObj are contiguous.
/* Used by profilers to make traversing the stack from an external \
process faster. */ \
char prefix##_is_task; \
char prefix##_awaited_by_is_set; \
/* These bitfields need to be at the end of the struct \
so that these and bitfields from TaskObj are contiguous. \
*/ \
unsigned prefix##_log_tb: 1; \
unsigned prefix##_blocking: 1;
unsigned prefix##_blocking: 1; \
typedef struct {
FutureObj_HEAD(fut)
@ -69,12 +74,24 @@ typedef struct {
PyObject *sw_arg;
} TaskStepMethWrapper;
#define Future_CheckExact(state, obj) Py_IS_TYPE(obj, state->FutureType)
#define Task_CheckExact(state, obj) Py_IS_TYPE(obj, state->TaskType)
#define Future_Check(state, obj) PyObject_TypeCheck(obj, state->FutureType)
#define Task_Check(state, obj) PyObject_TypeCheck(obj, state->TaskType)
#define Future_Check(state, obj) \
(Future_CheckExact(state, obj) \
|| PyObject_TypeCheck(obj, state->FutureType))
#define Task_Check(state, obj) \
(Task_CheckExact(state, obj) \
|| PyObject_TypeCheck(obj, state->TaskType))
// This macro is optimized to quickly return for native Future *or* Task
// objects by inlining fast "exact" checks to be called first.
#define TaskOrFuture_Check(state, obj) \
(Task_CheckExact(state, obj) \
|| Future_CheckExact(state, obj) \
|| PyObject_TypeCheck(obj, state->FutureType) \
|| PyObject_TypeCheck(obj, state->TaskType))
#ifdef Py_GIL_DISABLED
# define ASYNCIO_STATE_LOCK(state) Py_BEGIN_CRITICAL_SECTION_MUT(&state->mutex)
@ -84,6 +101,37 @@ typedef struct {
# define ASYNCIO_STATE_UNLOCK(state) ((void)state)
#endif
typedef struct _Py_AsyncioModuleDebugOffsets {
struct _asyncio_task_object {
uint64_t size;
uint64_t task_name;
uint64_t task_awaited_by;
uint64_t task_is_task;
uint64_t task_awaited_by_is_set;
uint64_t task_coro;
} asyncio_task_object;
struct _asyncio_thread_state {
uint64_t size;
uint64_t asyncio_running_loop;
uint64_t asyncio_running_task;
} asyncio_thread_state;
} Py_AsyncioModuleDebugOffsets;
GENERATE_DEBUG_SECTION(AsyncioDebug, Py_AsyncioModuleDebugOffsets AsyncioDebug)
= {.asyncio_task_object = {
.size = sizeof(TaskObj),
.task_name = offsetof(TaskObj, task_name),
.task_awaited_by = offsetof(TaskObj, task_awaited_by),
.task_is_task = offsetof(TaskObj, task_is_task),
.task_awaited_by_is_set = offsetof(TaskObj, task_awaited_by_is_set),
.task_coro = offsetof(TaskObj, task_coro),
},
.asyncio_thread_state = {
.size = sizeof(_PyThreadStateImpl),
.asyncio_running_loop = offsetof(_PyThreadStateImpl, asyncio_running_loop),
.asyncio_running_task = offsetof(_PyThreadStateImpl, asyncio_running_task),
}};
/* State of the _asyncio module */
typedef struct {
#ifdef Py_GIL_DISABLED
@ -185,6 +233,22 @@ static PyObject *
task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *result);
static void
clear_task_coro(TaskObj *task)
{
Py_CLEAR(task->task_coro);
}
static void
set_task_coro(TaskObj *task, PyObject *coro)
{
assert(coro != NULL);
Py_INCREF(coro);
Py_XSETREF(task->task_coro, coro);
}
static int
_is_coroutine(asyncio_state *state, PyObject *coro)
{
@ -437,10 +501,13 @@ future_init(FutureObj *fut, PyObject *loop)
Py_CLEAR(fut->fut_source_tb);
Py_CLEAR(fut->fut_cancel_msg);
Py_CLEAR(fut->fut_cancelled_exc);
Py_CLEAR(fut->fut_awaited_by);
fut->fut_state = STATE_PENDING;
fut->fut_log_tb = 0;
fut->fut_blocking = 0;
fut->fut_awaited_by_is_set = 0;
fut->fut_is_task = 0;
if (loop == Py_None) {
asyncio_state *state = get_asyncio_state_by_def((PyObject *)fut);
@ -480,6 +547,115 @@ future_init(FutureObj *fut, PyObject *loop)
return 0;
}
static int
future_awaited_by_add(asyncio_state *state, PyObject *fut, PyObject *thing)
{
if (!TaskOrFuture_Check(state, fut) || !TaskOrFuture_Check(state, thing)) {
// We only want to support native asyncio Futures.
// For further insight see the comment in the Python
// implementation of "future_add_to_awaited_by()".
return 0;
}
FutureObj *_fut = (FutureObj *)fut;
/* Most futures/task are only awaited by one entity, so we want
to avoid always creating a set for `fut_awaited_by`.
*/
if (_fut->fut_awaited_by == NULL) {
assert(!_fut->fut_awaited_by_is_set);
Py_INCREF(thing);
_fut->fut_awaited_by = thing;
return 0;
}
if (_fut->fut_awaited_by_is_set) {
assert(PySet_CheckExact(_fut->fut_awaited_by));
return PySet_Add(_fut->fut_awaited_by, thing);
}
PyObject *set = PySet_New(NULL);
if (set == NULL) {
return -1;
}
if (PySet_Add(set, thing)) {
Py_DECREF(set);
return -1;
}
if (PySet_Add(set, _fut->fut_awaited_by)) {
Py_DECREF(set);
return -1;
}
Py_SETREF(_fut->fut_awaited_by, set);
_fut->fut_awaited_by_is_set = 1;
return 0;
}
static int
future_awaited_by_discard(asyncio_state *state, PyObject *fut, PyObject *thing)
{
if (!TaskOrFuture_Check(state, fut) || !TaskOrFuture_Check(state, thing)) {
// We only want to support native asyncio Futures.
// For further insight see the comment in the Python
// implementation of "future_add_to_awaited_by()".
return 0;
}
FutureObj *_fut = (FutureObj *)fut;
/* Following the semantics of 'set.discard()' here in not
raising an error if `thing` isn't in the `awaited_by` "set".
*/
if (_fut->fut_awaited_by == NULL) {
return 0;
}
if (_fut->fut_awaited_by == thing) {
Py_CLEAR(_fut->fut_awaited_by);
return 0;
}
if (_fut->fut_awaited_by_is_set) {
assert(PySet_CheckExact(_fut->fut_awaited_by));
int err = PySet_Discard(_fut->fut_awaited_by, thing);
if (err < 0) {
return -1;
} else {
return 0;
}
}
return 0;
}
/*[clinic input]
@critical_section
@getter
_asyncio.Future._asyncio_awaited_by
[clinic start generated code]*/
static PyObject *
_asyncio_Future__asyncio_awaited_by_get_impl(FutureObj *self)
/*[clinic end generated code: output=932af76d385d2e2a input=64c1783df2d44d2b]*/
{
/* Implementation of a Python getter. */
if (self->fut_awaited_by == NULL) {
Py_RETURN_NONE;
}
if (self->fut_awaited_by_is_set) {
/* Already a set, just wrap it into a frozen set and return. */
assert(PySet_CheckExact(self->fut_awaited_by));
return PyFrozenSet_New(self->fut_awaited_by);
}
PyObject *set = PyFrozenSet_New(NULL);
if (set == NULL) {
return NULL;
}
if (PySet_Add(set, self->fut_awaited_by)) {
Py_DECREF(set);
return NULL;
}
return set;
}
static PyObject *
future_set_result(asyncio_state *state, FutureObj *fut, PyObject *res)
{
@ -780,6 +956,8 @@ FutureObj_clear(FutureObj *fut)
Py_CLEAR(fut->fut_source_tb);
Py_CLEAR(fut->fut_cancel_msg);
Py_CLEAR(fut->fut_cancelled_exc);
Py_CLEAR(fut->fut_awaited_by);
fut->fut_awaited_by_is_set = 0;
PyObject_ClearManagedDict((PyObject *)fut);
return 0;
}
@ -798,6 +976,7 @@ FutureObj_traverse(FutureObj *fut, visitproc visit, void *arg)
Py_VISIT(fut->fut_source_tb);
Py_VISIT(fut->fut_cancel_msg);
Py_VISIT(fut->fut_cancelled_exc);
Py_VISIT(fut->fut_awaited_by);
PyObject_VisitManagedDict((PyObject *)fut, visit, arg);
return 0;
}
@ -1577,6 +1756,7 @@ static PyGetSetDef FutureType_getsetlist[] = {
_ASYNCIO_FUTURE__LOG_TRACEBACK_GETSETDEF
_ASYNCIO_FUTURE__SOURCE_TRACEBACK_GETSETDEF
_ASYNCIO_FUTURE__CANCEL_MESSAGE_GETSETDEF
_ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF
{NULL} /* Sentinel */
};
@ -2053,7 +2233,50 @@ enter_task(asyncio_state *state, PyObject *loop, PyObject *task)
Py_DECREF(item);
return -1;
}
Py_DECREF(item);
assert(task == item);
Py_CLEAR(item);
// This block is needed to enable `asyncio.capture_call_graph()` API.
// We want to be enable debuggers and profilers to be able to quickly
// introspect the asyncio running state from another process.
// When we do that, we need to essentially traverse the address space
// of a Python process and understand what every Python thread in it is
// currently doing, mainly:
//
// * current frame
// * current asyncio task
//
// A naive solution would be to require profilers and debuggers to
// find the current task in the "_asynciomodule" module state, but
// unfortunately that would require a lot of complicated remote
// memory reads and logic, as Python's dict is a notoriously complex
// and ever-changing data structure.
//
// So the easier solution is to put a strong reference to the currently
// running `asyncio.Task` on the interpreter thread state (we already
// have some asyncio state there.)
_PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
if (ts->asyncio_running_loop == loop) {
// Protect from a situation when someone calls this method
// from another thread. This shouldn't ever happen though,
// as `enter_task` and `leave_task` can either be called by:
//
// - `asyncio.Task` itself, in `Task.__step()`. That method
// can only be called by the event loop itself.
//
// - third-party Task "from scratch" implementations, that
// our `capture_call_graph` API doesn't support anyway.
//
// That said, we still want to make sure we don't end up in
// a broken state, so we check that we're in the correct thread
// by comparing the *loop* argument to the event loop running
// in the current thread. If they match we know we're in the
// right thread, as asyncio event loops don't change threads.
assert(ts->asyncio_running_task == NULL);
ts->asyncio_running_task = Py_NewRef(task);
}
return 0;
}
@ -2078,7 +2301,6 @@ leave_task_predicate(PyObject *item, void *task)
static int
leave_task(asyncio_state *state, PyObject *loop, PyObject *task)
/*[clinic end generated code: output=0ebf6db4b858fb41 input=51296a46313d1ad8]*/
{
int res = _PyDict_DelItemIf(state->current_tasks, loop,
leave_task_predicate, task);
@ -2086,6 +2308,14 @@ leave_task(asyncio_state *state, PyObject *loop, PyObject *task)
// task was not found
return err_leave_task(Py_None, task);
}
// See the comment in `enter_task` for the explanation of why
// the following is needed.
_PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
if (ts->asyncio_running_loop == NULL || ts->asyncio_running_loop == loop) {
Py_CLEAR(ts->asyncio_running_task);
}
return res;
}
@ -2158,6 +2388,7 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
if (future_init((FutureObj*)self, loop)) {
return -1;
}
self->task_is_task = 1;
asyncio_state *state = get_asyncio_state_by_def((PyObject *)self);
int is_coro = is_coroutine(state, coro);
@ -2185,8 +2416,7 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
self->task_must_cancel = 0;
self->task_log_destroy_pending = 1;
self->task_num_cancels_requested = 0;
Py_INCREF(coro);
Py_XSETREF(self->task_coro, coro);
set_task_coro(self, coro);
if (name == Py_None) {
// optimization: defer task name formatting
@ -2234,8 +2464,8 @@ static int
TaskObj_clear(TaskObj *task)
{
(void)FutureObj_clear((FutureObj*) task);
clear_task_coro(task);
Py_CLEAR(task->task_context);
Py_CLEAR(task->task_coro);
Py_CLEAR(task->task_name);
Py_CLEAR(task->task_fut_waiter);
return 0;
@ -2260,6 +2490,7 @@ TaskObj_traverse(TaskObj *task, visitproc visit, void *arg)
Py_VISIT(fut->fut_source_tb);
Py_VISIT(fut->fut_cancel_msg);
Py_VISIT(fut->fut_cancelled_exc);
Py_VISIT(fut->fut_awaited_by);
PyObject_VisitManagedDict((PyObject *)fut, visit, arg);
return 0;
}
@ -3050,6 +3281,10 @@ task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *resu
goto yield_insteadof_yf;
}
if (future_awaited_by_add(state, result, (PyObject *)task)) {
goto fail;
}
fut->fut_blocking = 0;
/* result.add_done_callback(task._wakeup) */
@ -3139,6 +3374,10 @@ task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *resu
goto yield_insteadof_yf;
}
if (future_awaited_by_add(state, result, (PyObject *)task)) {
goto fail;
}
/* result._asyncio_future_blocking = False */
if (PyObject_SetAttr(
result, &_Py_ID(_asyncio_future_blocking), Py_False) == -1) {
@ -3335,7 +3574,7 @@ task_eager_start(asyncio_state *state, TaskObj *task)
register_task(state, task);
} else {
// This seems to really help performance on pyperformance benchmarks
Py_CLEAR(task->task_coro);
clear_task_coro(task);
}
return retval;
@ -3350,6 +3589,11 @@ task_wakeup_lock_held(TaskObj *task, PyObject *o)
assert(o);
asyncio_state *state = get_asyncio_state_by_def((PyObject *)task);
if (future_awaited_by_discard(state, o, (PyObject *)task)) {
return NULL;
}
if (Future_CheckExact(state, o) || Task_CheckExact(state, o)) {
PyObject *fut_result = NULL;
int res;
@ -3833,6 +4077,50 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop)
return res;
}
/*[clinic input]
_asyncio.future_add_to_awaited_by
fut: object
waiter: object
/
Record that `fut` is awaited on by `waiter`.
[clinic start generated code]*/
static PyObject *
_asyncio_future_add_to_awaited_by_impl(PyObject *module, PyObject *fut,
PyObject *waiter)
/*[clinic end generated code: output=0ab9a1a63389e4df input=06e6eaac51f532b9]*/
{
asyncio_state *state = get_asyncio_state(module);
if (future_awaited_by_add(state, fut, waiter)) {
return NULL;
}
Py_RETURN_NONE;
}
/*[clinic input]
_asyncio.future_discard_from_awaited_by
fut: object
waiter: object
/
[clinic start generated code]*/
static PyObject *
_asyncio_future_discard_from_awaited_by_impl(PyObject *module, PyObject *fut,
PyObject *waiter)
/*[clinic end generated code: output=a03b0b4323b779de input=3833f7639e88e483]*/
{
asyncio_state *state = get_asyncio_state(module);
if (future_awaited_by_discard(state, fut, waiter)) {
return NULL;
}
Py_RETURN_NONE;
}
static int
module_traverse(PyObject *mod, visitproc visit, void *arg)
{
@ -3896,6 +4184,7 @@ module_clear(PyObject *mod)
// those get cleared in PyThreadState_Clear.
_PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
Py_CLEAR(ts->asyncio_running_loop);
Py_CLEAR(ts->asyncio_running_task);
return 0;
}
@ -3926,7 +4215,6 @@ module_init(asyncio_state *state)
goto fail;
}
state->context_kwname = Py_BuildValue("(s)", "context");
if (state->context_kwname == NULL) {
goto fail;
@ -4007,6 +4295,8 @@ static PyMethodDef asyncio_methods[] = {
_ASYNCIO__LEAVE_TASK_METHODDEF
_ASYNCIO__SWAP_CURRENT_TASK_METHODDEF
_ASYNCIO_ALL_TASKS_METHODDEF
_ASYNCIO_FUTURE_ADD_TO_AWAITED_BY_METHODDEF
_ASYNCIO_FUTURE_DISCARD_FROM_AWAITED_BY_METHODDEF
{NULL, NULL}
};

File diff suppressed because it is too large Load diff

View file

@ -9,6 +9,31 @@ preserve
#include "pycore_critical_section.h"// Py_BEGIN_CRITICAL_SECTION()
#include "pycore_modsupport.h" // _PyArg_UnpackKeywords()
#if !defined(_asyncio_Future__asyncio_awaited_by_DOCSTR)
# define _asyncio_Future__asyncio_awaited_by_DOCSTR NULL
#endif
#if defined(_ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF)
# undef _ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF
# define _ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF {"_asyncio_awaited_by", (getter)_asyncio_Future__asyncio_awaited_by_get, (setter)_asyncio_Future__asyncio_awaited_by_set, _asyncio_Future__asyncio_awaited_by_DOCSTR},
#else
# define _ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF {"_asyncio_awaited_by", (getter)_asyncio_Future__asyncio_awaited_by_get, NULL, _asyncio_Future__asyncio_awaited_by_DOCSTR},
#endif
static PyObject *
_asyncio_Future__asyncio_awaited_by_get_impl(FutureObj *self);
static PyObject *
_asyncio_Future__asyncio_awaited_by_get(PyObject *self, void *Py_UNUSED(context))
{
PyObject *return_value = NULL;
Py_BEGIN_CRITICAL_SECTION(self);
return_value = _asyncio_Future__asyncio_awaited_by_get_impl((FutureObj *)self);
Py_END_CRITICAL_SECTION();
return return_value;
}
PyDoc_STRVAR(_asyncio_Future___init____doc__,
"Future(*, loop=None)\n"
"--\n"
@ -2088,4 +2113,65 @@ skip_optional_pos:
exit:
return return_value;
}
/*[clinic end generated code: output=ec2fa1d60b094978 input=a9049054013a1b77]*/
PyDoc_STRVAR(_asyncio_future_add_to_awaited_by__doc__,
"future_add_to_awaited_by($module, fut, waiter, /)\n"
"--\n"
"\n"
"Record that `fut` is awaited on by `waiter`.");
#define _ASYNCIO_FUTURE_ADD_TO_AWAITED_BY_METHODDEF \
{"future_add_to_awaited_by", _PyCFunction_CAST(_asyncio_future_add_to_awaited_by), METH_FASTCALL, _asyncio_future_add_to_awaited_by__doc__},
static PyObject *
_asyncio_future_add_to_awaited_by_impl(PyObject *module, PyObject *fut,
PyObject *waiter);
static PyObject *
_asyncio_future_add_to_awaited_by(PyObject *module, PyObject *const *args, Py_ssize_t nargs)
{
PyObject *return_value = NULL;
PyObject *fut;
PyObject *waiter;
if (!_PyArg_CheckPositional("future_add_to_awaited_by", nargs, 2, 2)) {
goto exit;
}
fut = args[0];
waiter = args[1];
return_value = _asyncio_future_add_to_awaited_by_impl(module, fut, waiter);
exit:
return return_value;
}
PyDoc_STRVAR(_asyncio_future_discard_from_awaited_by__doc__,
"future_discard_from_awaited_by($module, fut, waiter, /)\n"
"--\n"
"\n");
#define _ASYNCIO_FUTURE_DISCARD_FROM_AWAITED_BY_METHODDEF \
{"future_discard_from_awaited_by", _PyCFunction_CAST(_asyncio_future_discard_from_awaited_by), METH_FASTCALL, _asyncio_future_discard_from_awaited_by__doc__},
static PyObject *
_asyncio_future_discard_from_awaited_by_impl(PyObject *module, PyObject *fut,
PyObject *waiter);
static PyObject *
_asyncio_future_discard_from_awaited_by(PyObject *module, PyObject *const *args, Py_ssize_t nargs)
{
PyObject *return_value = NULL;
PyObject *fut;
PyObject *waiter;
if (!_PyArg_CheckPositional("future_discard_from_awaited_by", nargs, 2, 2)) {
goto exit;
}
fut = args[0];
waiter = args[1];
return_value = _asyncio_future_discard_from_awaited_by_impl(module, fut, waiter);
exit:
return return_value;
}
/*[clinic end generated code: output=fe4ffe08404ad566 input=a9049054013a1b77]*/

View file

@ -1672,6 +1672,15 @@ frame_settrace(PyFrameObject *f, PyObject* v, void *closure)
return 0;
}
static PyObject *
frame_getgenerator(PyFrameObject *f, void *arg) {
if (f->f_frame->owner == FRAME_OWNED_BY_GENERATOR) {
PyObject *gen = (PyObject *)_PyGen_GetGeneratorFromFrame(f->f_frame);
return Py_NewRef(gen);
}
Py_RETURN_NONE;
}
static PyGetSetDef frame_getsetlist[] = {
{"f_back", (getter)frame_getback, NULL, NULL},
@ -1684,6 +1693,7 @@ static PyGetSetDef frame_getsetlist[] = {
{"f_builtins", (getter)frame_getbuiltins, NULL, NULL},
{"f_code", (getter)frame_getcode, NULL, NULL},
{"f_trace_opcodes", (getter)frame_gettrace_opcodes, (setter)frame_settrace_opcodes, NULL},
{"f_generator", (getter)frame_getgenerator, NULL, NULL},
{0}
};

View file

@ -1153,7 +1153,6 @@ cr_getcode(PyObject *coro, void *Py_UNUSED(ignored))
return _gen_getcode(_PyGen_CAST(coro), "cr_code");
}
static PyGetSetDef coro_getsetlist[] = {
{"__name__", gen_get_name, gen_set_name,
PyDoc_STR("name of the coroutine")},

View file

@ -111,23 +111,7 @@ static void call_ll_exitfuncs(_PyRuntimeState *runtime);
_Py_COMP_DIAG_PUSH
_Py_COMP_DIAG_IGNORE_DEPR_DECLS
#if defined(MS_WINDOWS)
#pragma section("PyRuntime", read, write)
__declspec(allocate("PyRuntime"))
#elif defined(__APPLE__)
__attribute__((
section(SEG_DATA ",PyRuntime")
))
#endif
_PyRuntimeState _PyRuntime
#if defined(__linux__) && (defined(__GNUC__) || defined(__clang__))
__attribute__ ((section (".PyRuntime")))
#endif
GENERATE_DEBUG_SECTION(PyRuntime, _PyRuntimeState _PyRuntime)
= _PyRuntimeState_INIT(_PyRuntime, _Py_Debug_Cookie);
_Py_COMP_DIAG_POP

View file

@ -1515,6 +1515,7 @@ init_threadstate(_PyThreadStateImpl *_tstate,
tstate->dict_global_version = 0;
_tstate->asyncio_running_loop = NULL;
_tstate->asyncio_running_task = NULL;
tstate->delete_later = NULL;
@ -1697,6 +1698,7 @@ PyThreadState_Clear(PyThreadState *tstate)
Py_CLEAR(tstate->threading_local_sentinel);
Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_loop);
Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_task);
Py_CLEAR(tstate->dict);
Py_CLEAR(tstate->async_exc);

View file

@ -53,6 +53,9 @@ Python/pyhash.c - _Py_HashSecret -
## thread-safe hashtable (internal locks)
Python/parking_lot.c - buckets -
## data needed for introspecting asyncio state from debuggers and profilers
Modules/_asynciomodule.c - AsyncioDebug -
##################################
## state tied to Py_Main()

Can't render this file because it has a wrong number of fields in line 4.