asyncio
=======

This document describes the working and implementation details of the
[`asyncio`](https://docs.python.org/3/library/asyncio.html) module.

**The following section describes the implementation details of the C implementation**.

# Task management

## Pre-Python 3.14 implementation

Before Python 3.14, the C implementation of `asyncio` used a
[`WeakSet`](https://docs.python.org/3/library/weakref.html#weakref.WeakSet)
to store all the tasks created by the event loop. `WeakSet` was used
so that the event loop doesn't hold strong references to the tasks,
allowing them to be garbage collected when they are no longer needed.
The current task of the event loop was stored in a dict mapping the
event loop to the current task.

```c
/* Dictionary containing tasks that are currently active in
   all running event loops.  {EventLoop: Task} */
PyObject *current_tasks;

/* WeakSet containing all tasks scheduled to run on event loops. */
PyObject *scheduled_tasks;
```

This implementation had a few drawbacks:

1. **Performance**: Using a `WeakSet` for storing tasks is
   inefficient, as it requires maintaining a full set of weak references
   to tasks along with a corresponding weakref callback to clean up the
   tasks when they are garbage collected. This increases the work done
   by the garbage collector, and in applications with a large number of
   tasks, this becomes a bottleneck, with increased memory usage and
   lower performance. Looking up the current task was also slow, as it
   required a dictionary lookup on the `current_tasks` dict.

2. **Thread safety**: Before Python 3.14, concurrent iteration over
   a `WeakSet` was not thread-safe[^1]. This meant calling APIs like
   `asyncio.all_tasks()` could lead to inconsistent results or even
   a `RuntimeError` if used in multiple threads[^2].

3. **Poor scaling in free-threading**: Using a global `WeakSet` for
   storing all tasks across all threads led to contention when adding
   and removing tasks from the set, which is a frequent operation. As such,
   it performed poorly in free-threading and did not scale well with the
   number of threads. Similarly, accessing the current task in multiple
   threads did not scale due to contention on the global `current_tasks`
   dictionary.

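
The pre-3.14 bookkeeping described above can be approximated in pure Python. This is a minimal sketch under stated assumptions, not the C implementation: `FakeTask`, `register`, and `enter` are hypothetical names, and only the `WeakSet`-plus-dict shape is taken from the text.

```python
import gc
import weakref

# Hypothetical stand-ins for the module-level state described above.
scheduled_tasks = weakref.WeakSet()   # all tasks, weakly referenced
current_tasks = {}                    # {loop: task currently running on it}


class FakeTask:
    """Placeholder for a task object; real tasks carry far more state."""


def register(task):
    # Adding to a WeakSet creates a weak reference with a cleanup callback.
    scheduled_tasks.add(task)


def enter(loop, task):
    # Every current-task update (and lookup) goes through this shared dict.
    current_tasks[loop] = task


task = FakeTask()
register(task)
enter("loop-1", task)
count_while_alive = len(scheduled_tasks)

# Drop every strong reference; the weakref callback removes the set entry.
del current_tasks["loop-1"]
del task
gc.collect()
count_after_collect = len(scheduled_tasks)
```
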
## Python 3.14 implementation

To address these issues, Python 3.14 implements several changes to
improve the performance and thread safety of task management.

- **Per-thread double linked list for tasks**: Python 3.14 introduces
  a per-thread circular doubly linked list implementation for
  storing tasks. This allows each thread to maintain its own list of
  tasks and allows for lock-free addition and removal of tasks. The
  design is efficient and thread-safe, and it scales well with
  the number of threads in free-threading. It also allows external
  introspection tools such as `python -m asyncio pstree` to inspect
  tasks running in all threads. It was implemented as part of
  [Audit asyncio thread safety](https://github.com/python/cpython/issues/128002).

- **Per-thread current task**: Python 3.14 stores the current task on
  the current thread state instead of a global dictionary. This
  allows for faster access to the current task without the need for
  a dictionary lookup. Each thread maintains its own current task,
  which is stored in the `PyThreadState` structure. This was
  implemented in https://github.com/python/cpython/issues/129898.

Storing the current task and list of all tasks per-thread instead of
storing them per-loop was chosen primarily to support external
introspection tools such as `python -m asyncio pstree`, as looking up
arbitrary attributes on the loop object is not possible
externally. Storing data per-thread also makes it easy to support
third-party event loop implementations such as `uvloop`, and is more
efficient for the single-threaded asyncio use case, as it avoids the
overhead of attribute lookups on the loop object and several other
calls on the performance-critical path of adding and removing tasks
from the per-loop task list.

## Per-thread double linked list for tasks

This implementation uses a circular doubly linked list to store tasks
on the thread states. This is used for all tasks which are instances
of `asyncio.Task` or subclasses of it; for third-party tasks, a
fallback `WeakSet` implementation is used. The linked list is
implemented using an embedded `llist_node` structure within each
`TaskObj`. By embedding the list node directly into the task object,
the implementation avoids additional memory allocations for linked
list nodes.

The `PyThreadState` structure gained a new field `asyncio_tasks_head`,
which serves as the head of the circular linked list of tasks. This
allows for lock-free addition and removal of tasks from the list.

It is possible that when a thread state is deallocated, there are
lingering tasks in its list; this can happen if another thread has
references to the tasks of this thread. Therefore, the
`PyInterpreterState` structure also gains a new `asyncio_tasks_head`
field to store any lingering tasks. When a thread state is
deallocated, any remaining lingering tasks are moved to the
interpreter state tasks list, and the thread state tasks list is
cleared. The `asyncio_tasks_lock` is used to protect the interpreter's
tasks list from concurrent modifications.

```c
typedef struct TaskObj {
    ...
    struct llist_node asyncio_node;
} TaskObj;

typedef struct PyThreadState {
    ...
    struct llist_node asyncio_tasks_head;
} PyThreadState;

typedef struct PyInterpreterState {
    ...
    struct llist_node asyncio_tasks_head;
    PyMutex asyncio_tasks_lock;
} PyInterpreterState;
```

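
To make the data structure concrete, here is a rough Python model of an intrusive circular doubly linked list with a sentinel head. It is a sketch for illustration only, not the C code: `llist_empty` and `llist_concat` are named in this document, while `Node`, `llist_insert_tail`, `llist_remove`, and `owners` are assumptions chosen for the example, and no locking is modeled.

```python
class Node:
    """List node; a node that points at itself represents an empty list."""

    def __init__(self, owner=None):
        self.owner = owner      # stands in for the enclosing task object
        self.prev = self
        self.next = self


def llist_empty(head):
    return head.next is head


def llist_insert_tail(head, node):
    # Link the node between the current last element and the head.
    last = head.prev
    node.prev, node.next = last, head
    last.next = node
    head.prev = node


def llist_remove(node):
    # Unlink in O(1) without needing a reference to the list head.
    node.prev.next = node.next
    node.next.prev = node.prev
    node.prev = node.next = node


def llist_concat(dst, src):
    """Move every node of src to the tail of dst, leaving src empty."""
    if llist_empty(src):
        return
    first, last = src.next, src.prev
    dst_last = dst.prev
    dst_last.next = first
    first.prev = dst_last
    last.next = dst
    dst.prev = last
    src.prev = src.next = src


def owners(head):
    out, node = [], head.next
    while node is not head:
        out.append(node.owner)
        node = node.next
    return out


thread_head = Node()    # models tstate->asyncio_tasks_head
interp_head = Node()    # models interp->asyncio_tasks_head

a, b, c = Node("a"), Node("b"), Node("c")
for n in (a, b, c):
    llist_insert_tail(thread_head, n)
llist_remove(b)                          # task "b" finished
after_remove = owners(thread_head)
llist_concat(interp_head, thread_head)   # thread exits with lingering tasks
after_concat = owners(interp_head)
```
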
When a task is created, it is added to the current thread's list of
tasks by the `register_task` function. When the task is done, it is
removed from the list by the `unregister_task` function. In
free-threading, the thread id of the thread which created the task is
stored in the `task_tid` field of the `TaskObj`. This is used to check
whether the task is being removed from the correct thread's task list.
If the current thread is the same as the thread which created it, then
no locking is required; otherwise, in free-threading, the
`stop-the-world` pause is used to pause all other threads and then
safely remove the task from the tasks list.

```mermaid

flowchart TD
    subgraph one["Executing Thread"]
        A["task = asyncio.create_task(coro())"] -->B("register_task(task)")
        B --> C{"task->task_state?"}
        C -->|pending| D["task_step(task)"]
        C -->|done| F["unregister_task(task)"]
        C -->|cancelled| F["unregister_task(task)"]
        D --> C
        F --> G{"free-threading?"}
        G --> |false| H["unregister_task_safe(task)"]
        G --> |true| J{"correct thread? <br>task->task_tid == _Py_ThreadId()"}
        J --> |true| H
        J --> |false| I["stop the world <br> pause all threads"]
        I --> H["unregister_task_safe(task)"]
    end
    subgraph two["Thread deallocating"]
        A1{"thread's task list empty? <br> llist_empty(tstate->asyncio_tasks_head)"}
        A1 --> |true| B1["deallocate thread<br>free_threadstate(tstate)"]
        A1 --> |false| C1["add tasks to interpreter's task list<br> llist_concat(&tstate->interp->asyncio_tasks_head,
        &tstate->asyncio_tasks_head)"]
        C1 --> B1
    end

    one --> two
```

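
The unregistration branch of the flowchart can be restated as a small decision function. This is only a model of the diagram's control flow: `unregister_steps` is a hypothetical name, and the returned strings merely label the steps; nothing here pauses threads or touches real task lists.

```python
def unregister_steps(free_threading, task_tid, current_tid):
    """Return the steps the flowchart takes to unregister a finished task."""
    steps = []
    if free_threading and task_tid != current_tid:
        # The task lives on another thread's list, so the other threads
        # must be paused before that list can be modified safely.
        steps.append("stop the world")
    steps.append("unregister_task_safe")
    return steps


default_build = unregister_steps(False, task_tid=1, current_tid=2)
same_thread = unregister_steps(True, task_tid=1, current_tid=1)
other_thread = unregister_steps(True, task_tid=1, current_tid=2)
```
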
`asyncio.all_tasks` now iterates over the per-thread task lists of all
threads and the interpreter's task list to get all the tasks. In
free-threading, this is done by pausing all the threads using the
`stop-the-world` pause to ensure that no tasks are being added or
removed while iterating over the lists. This gives a consistent,
thread-safe view of all task lists across all threads.

This design allows for lock-free execution and scales well in
free-threading with multiple event loops running in different threads.

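
From the caller's side none of this is visible; `asyncio.all_tasks()` behaves as before. The following short example, in which `worker` and the task names are illustrative, takes a snapshot of the unfinished tasks of the running loop:

```python
import asyncio


async def worker(delay):
    await asyncio.sleep(delay)


async def main():
    # Name the tasks so they are easy to recognise in the snapshot.
    tasks = [
        asyncio.create_task(worker(0.01), name=f"worker-{i}")
        for i in range(3)
    ]
    # all_tasks() returns the not-yet-finished tasks of the running loop,
    # which includes the task that is executing main() itself.
    snapshot = asyncio.all_tasks()
    names = sorted(t.get_name() for t in snapshot if t in tasks)
    includes_main = asyncio.current_task() in snapshot
    await asyncio.gather(*tasks)
    return names, includes_main


names, includes_main = asyncio.run(main())
```
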
## Per-thread current task

This implementation stores the current task in the `PyThreadState`
structure, which allows for faster access to the current task without
the need for a dictionary lookup.

```c
typedef struct PyThreadState {
    ...
    PyObject *asyncio_current_loop;
    PyObject *asyncio_current_task;
} PyThreadState;
```

When a task is entered or left, the current task is updated in the
thread state using the `enter_task` and `leave_task` functions. When
`current_task(loop)` is called where `loop` is the currently running
event loop of the current thread, no locking is required, as the
current task is stored in the thread state and is returned directly
(general case). Otherwise, if `loop` is not the currently running event
loop, the `stop-the-world` pause is used to pause all threads in
free-threading; the thread states are then iterated over, and the
current task of the thread state whose `tstate->asyncio_current_loop`
matches `loop` is returned. If no matching thread state is
found, `None` is returned.

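
The common case and the no-match case can both be observed through the public `asyncio.current_task()` API. In this small example, `child`, `main`, and the variable names are illustrative:

```python
import asyncio


async def child():
    # Inside a task, current_task() returns that task object.
    return asyncio.current_task()


async def main():
    task = asyncio.create_task(child())
    seen_by_child = await task
    seen_by_main = asyncio.current_task()
    return task is seen_by_child, seen_by_main is not task


child_matches, main_differs = asyncio.run(main())

# A loop that is not running in any thread has no current task.
idle_loop = asyncio.new_event_loop()
idle_current = asyncio.current_task(idle_loop)
idle_loop.close()
```
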
In free-threading, this design avoids contention on a global dictionary, as
threads can access the current task of their running loop without any
locking.

---

**The following section describes the implementation details of the Python implementation**.

# async generators

This section describes the implementation details of async generators in `asyncio`.

Since async generators are meant to be used from coroutines,
their finalization (execution of finally blocks) needs
to be done while the loop is running.
Most async generators are closed automatically
when they are fully iterated over and exhausted; however,
if the async generator is not fully iterated over,
it may not be closed properly, leading to the `finally` blocks not being executed.

Consider the following code:

```py
import asyncio

async def agen():
    try:
        yield 1
    finally:
        await asyncio.sleep(1)
        print("finally executed")


async def main():
    async for i in agen():
        break

loop = asyncio.EventLoop()
loop.run_until_complete(main())
```

The above code will not print "finally executed", because the
async generator `agen` is not fully iterated over
and it is not closed manually by awaiting `agen.aclose()`.

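
For comparison, here is a variant that closes the generator explicitly while the loop is still running. It is an illustrative rewrite of the example above: the `events` list replaces `print` so the outcome can be checked, and the sleep is shortened to `0`.

```python
import asyncio

events = []


async def agen():
    try:
        yield 1
    finally:
        await asyncio.sleep(0)
        events.append("finally executed")


async def main():
    gen = agen()
    async for _ in gen:
        break
    # Close the generator explicitly while the loop is still running,
    # so the finally block (including its await) gets to complete.
    await gen.aclose()


asyncio.run(main())
```
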
To solve this, `asyncio` uses the `sys.set_asyncgen_hooks` function to
set hooks for finalizing async generators as described in
[PEP 525](https://peps.python.org/pep-0525/).

- **firstiter hook**: When the async generator is iterated over for the first time,
  the *firstiter hook* is called. The async generator is added to the `loop._asyncgens` WeakSet
  and the event loop tracks all active async generators.

- **finalizer hook**: When the async generator is about to be finalized,
  the *finalizer hook* is called. The event loop removes the async generator
  from the `loop._asyncgens` WeakSet, and schedules the finalization of the async
  generator by creating a task calling `agen.aclose()`. This ensures that the
  finally block is executed while the event loop is running. When the loop is
  shutting down, the loop checks if there are active async generators and if so,
  it similarly schedules the finalization of all active async generators by calling
  `agen.aclose()` on each of them and waits for them to complete before shutting
  down the loop.

This ensures that the async generator's `finally` blocks are executed even
if the generator is not explicitly closed.

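
The installed hooks can be inspected with `sys.get_asyncgen_hooks()`. This small check, in which `inspect_hooks` and the variable names are illustrative, shows that both hooks are set while the loop runs and that the previous hooks are restored afterwards:

```python
import asyncio
import sys


async def inspect_hooks():
    # While the loop runs, asyncio's hooks are installed for this thread.
    hooks = sys.get_asyncgen_hooks()
    return hooks.firstiter is not None, hooks.finalizer is not None


before = sys.get_asyncgen_hooks()
during = asyncio.run(inspect_hooks())
after = sys.get_asyncgen_hooks()

# The loop puts back whatever hooks were installed before it started.
hooks_restored = (
    after.firstiter is before.firstiter
    and after.finalizer is before.finalizer
)
```
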
Consider the following example:

```python
import asyncio

async def agen():
    try:
        yield 1
        yield 2
    finally:
        print("executing finally block")

async def main():
    async for item in agen():
        print(item)
        break  # not fully iterated

asyncio.run(main())
```

```mermaid
flowchart TD
    subgraph one["Loop running"]
        A["asyncio.run(main())"] --> B
        B["set async generator hooks <br> sys.set_asyncgen_hooks()"] --> C
        C["async for item in agen"] --> F
        F{"first iteration?"} --> |true|D
        F{"first iteration?"} --> |false|H
        D["calls firstiter hook<br>loop._asyncgen_firstiter_hook(agen)"] --> E
        E["add agen to WeakSet<br> loop._asyncgens.add(agen)"] --> H
        H["item = await agen.\_\_anext\_\_()"] --> J
        J{"StopAsyncIteration?"} --> |true|M
        J{"StopAsyncIteration?"} --> |false|I
        I["print(item)"] --> S
        S{"continue iterating?"} --> |true|C
        S{"continue iterating?"} --> |false|M
        M{"agen is no longer referenced?"} --> |true|N
        M{"agen is no longer referenced?"} --> |false|two
        N["finalize agen<br>_PyGen_Finalize(agen)"] --> O
        O["calls finalizer hook<br>loop._asyncgen_finalizer_hook(agen)"] --> P
        P["remove agen from WeakSet<br>loop._asyncgens.discard(agen)"] --> Q
        Q["schedule task to close it<br>self.create_task(agen.aclose())"] --> R
        R["print('executing finally block')"] --> E1

    end

    subgraph two["Loop shutting down"]
        A1{"check for alive async generators?"} --> |true|B1
        B1["close all async generators <br> await asyncio.gather\(*\[ag.aclose\(\) for ag in loop._asyncgens\]"] --> R
        A1{"check for alive async generators?"} --> |false|E1
        E1["loop.close()"]
    end

```

[^1]: https://github.com/python/cpython/issues/123089
[^2]: https://github.com/python/cpython/issues/80788