GH-107803: double linked list implementation for asyncio tasks (GH-107804)

* linked list

* add tail optmiization to linked list

* wip

* wip

* wip

* more fixes

* finally it works

* add tests

* remove weakreflist

* add some comments

* reduce code duplication in _asynciomodule.c

* address some review comments

* add invariants about the state of the linked list

* add better explanation

* clinic regen

* reorder branches for better branch prediction

* Update Modules/_asynciomodule.c

* Apply suggestions from code review

Co-authored-by: Itamar Oren <itamarost@gmail.com>

* fix capturing of eager tasks

* add comment to task finalization

* fix tests and couple c implmentation to c task

improved linked-list logic and more comments

* fix test

---------

Co-authored-by: Itamar Oren <itamarost@gmail.com>
This commit is contained in:
Kumar Aditya 2024-06-22 23:28:35 +05:30 committed by GitHub
parent e213475495
commit 4717aaa1a7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 374 additions and 96 deletions

View file

@ -19,6 +19,60 @@ module _asyncio
[clinic start generated code]*/
/*[clinic end generated code: output=da39a3ee5e6b4b0d input=8fd17862aa989c69]*/
typedef enum {
STATE_PENDING,
STATE_CANCELLED,
STATE_FINISHED
} fut_state;
#define FutureObj_HEAD(prefix) \
PyObject_HEAD \
PyObject *prefix##_loop; \
PyObject *prefix##_callback0; \
PyObject *prefix##_context0; \
PyObject *prefix##_callbacks; \
PyObject *prefix##_exception; \
PyObject *prefix##_exception_tb; \
PyObject *prefix##_result; \
PyObject *prefix##_source_tb; \
PyObject *prefix##_cancel_msg; \
PyObject *prefix##_cancelled_exc; \
fut_state prefix##_state; \
/* These bitfields need to be at the end of the struct
so that these and bitfields from TaskObj are contiguous.
*/ \
unsigned prefix##_log_tb: 1; \
unsigned prefix##_blocking: 1;
typedef struct {
FutureObj_HEAD(fut)
} FutureObj;
typedef struct TaskObj {
FutureObj_HEAD(task)
unsigned task_must_cancel: 1;
unsigned task_log_destroy_pending: 1;
int task_num_cancels_requested;
PyObject *task_fut_waiter;
PyObject *task_coro;
PyObject *task_name;
PyObject *task_context;
struct TaskObj *next;
struct TaskObj *prev;
} TaskObj;
typedef struct {
PyObject_HEAD
TaskObj *sw_task;
PyObject *sw_arg;
} TaskStepMethWrapper;
#define Future_CheckExact(state, obj) Py_IS_TYPE(obj, state->FutureType)
#define Task_CheckExact(state, obj) Py_IS_TYPE(obj, state->TaskType)
#define Future_Check(state, obj) PyObject_TypeCheck(obj, state->FutureType)
#define Task_Check(state, obj) PyObject_TypeCheck(obj, state->TaskType)
#define FI_FREELIST_MAXLEN 255
@ -38,8 +92,9 @@ typedef struct {
all running event loops. {EventLoop: Task} */
PyObject *current_tasks;
/* WeakSet containing all tasks scheduled to run on event loops. */
PyObject *scheduled_tasks;
/* WeakSet containing scheduled 3rd party tasks which don't
inherit from native asyncio.Task */
PyObject *non_asyncio_tasks;
/* Set containing all eagerly executing tasks. */
PyObject *eager_tasks;
@ -76,6 +131,51 @@ typedef struct {
futureiterobject *fi_freelist;
Py_ssize_t fi_freelist_len;
/* Linked-list of all tasks which are instances of asyncio.Task or subclasses
of it. Third party tasks implementations which don't inherit from
asyncio.Task are tracked separately using the 'non_asyncio_tasks' WeakSet.
`tail` is used as a sentinel to mark the end of the linked-list. It avoids one
branch in checking for empty list when adding a new task, the list is
initialized with `head` pointing to `tail` to mark an empty list.
Invariants:
* When the list is empty:
- asyncio_tasks.head == &asyncio_tasks.tail
- asyncio_tasks.head->prev == NULL
- asyncio_tasks.head->next == NULL
* After adding the first task 'task1':
- asyncio_tasks.head == task1
- task1->next == &asyncio_tasks.tail
- task1->prev == NULL
- asyncio_tasks.tail.prev == task1
* After adding a second task 'task2':
- asyncio_tasks.head == task2
- task2->next == task1
- task2->prev == NULL
- task1->prev == task2
- asyncio_tasks.tail.prev == task1
* After removing task 'task1':
- asyncio_tasks.head == task2
- task2->next == &asyncio_tasks.tail
- task2->prev == NULL
- asyncio_tasks.tail.prev == task2
* After removing task 'task2', the list is empty:
- asyncio_tasks.head == &asyncio_tasks.tail
- asyncio_tasks.head->prev == NULL
- asyncio_tasks.tail.prev == NULL
- asyncio_tasks.tail.next == NULL
*/
struct {
TaskObj tail;
TaskObj *head;
} asyncio_tasks;
} asyncio_state;
static inline asyncio_state *
@ -105,59 +205,6 @@ get_asyncio_state_by_def(PyObject *self)
return get_asyncio_state(mod);
}
typedef enum {
STATE_PENDING,
STATE_CANCELLED,
STATE_FINISHED
} fut_state;
#define FutureObj_HEAD(prefix) \
PyObject_HEAD \
PyObject *prefix##_loop; \
PyObject *prefix##_callback0; \
PyObject *prefix##_context0; \
PyObject *prefix##_callbacks; \
PyObject *prefix##_exception; \
PyObject *prefix##_exception_tb; \
PyObject *prefix##_result; \
PyObject *prefix##_source_tb; \
PyObject *prefix##_cancel_msg; \
PyObject *prefix##_cancelled_exc; \
fut_state prefix##_state; \
/* These bitfields need to be at the end of the struct
so that these and bitfields from TaskObj are contiguous.
*/ \
unsigned prefix##_log_tb: 1; \
unsigned prefix##_blocking: 1;
typedef struct {
FutureObj_HEAD(fut)
} FutureObj;
typedef struct {
FutureObj_HEAD(task)
unsigned task_must_cancel: 1;
unsigned task_log_destroy_pending: 1;
int task_num_cancels_requested;
PyObject *task_fut_waiter;
PyObject *task_coro;
PyObject *task_name;
PyObject *task_context;
} TaskObj;
typedef struct {
PyObject_HEAD
TaskObj *sw_task;
PyObject *sw_arg;
} TaskStepMethWrapper;
#define Future_CheckExact(state, obj) Py_IS_TYPE(obj, state->FutureType)
#define Task_CheckExact(state, obj) Py_IS_TYPE(obj, state->TaskType)
#define Future_Check(state, obj) PyObject_TypeCheck(obj, state->FutureType)
#define Task_Check(state, obj) PyObject_TypeCheck(obj, state->TaskType)
#include "clinic/_asynciomodule.c.h"
@ -1967,16 +2014,21 @@ static PyMethodDef TaskWakeupDef = {
/* ----- Task introspection helpers */
static int
register_task(asyncio_state *state, PyObject *task)
static void
register_task(asyncio_state *state, TaskObj *task)
{
PyObject *res = PyObject_CallMethodOneArg(state->scheduled_tasks,
&_Py_ID(add), task);
if (res == NULL) {
return -1;
assert(Task_Check(state, task));
assert(task != &state->asyncio_tasks.tail);
if (task->next != NULL) {
// already registered
return;
}
Py_DECREF(res);
return 0;
assert(task->prev == NULL);
assert(state->asyncio_tasks.head != NULL);
task->next = state->asyncio_tasks.head;
state->asyncio_tasks.head->prev = task;
state->asyncio_tasks.head = task;
}
static int
@ -1985,16 +2037,27 @@ register_eager_task(asyncio_state *state, PyObject *task)
return PySet_Add(state->eager_tasks, task);
}
static int
unregister_task(asyncio_state *state, PyObject *task)
static void
unregister_task(asyncio_state *state, TaskObj *task)
{
PyObject *res = PyObject_CallMethodOneArg(state->scheduled_tasks,
&_Py_ID(discard), task);
if (res == NULL) {
return -1;
assert(Task_Check(state, task));
assert(task != &state->asyncio_tasks.tail);
if (task->next == NULL) {
// not registered
assert(task->prev == NULL);
assert(state->asyncio_tasks.head != task);
return;
}
Py_DECREF(res);
return 0;
task->next->prev = task->prev;
if (task->prev == NULL) {
assert(state->asyncio_tasks.head == task);
state->asyncio_tasks.head = task->next;
} else {
task->prev->next = task->next;
}
task->next = NULL;
task->prev = NULL;
assert(state->asyncio_tasks.head != task);
}
static int
@ -2178,7 +2241,8 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
if (task_call_step_soon(state, self, NULL)) {
return -1;
}
return register_task(state, (PyObject*)self);
register_task(state, self);
return 0;
}
static int
@ -2586,6 +2650,15 @@ _asyncio_Task_set_name(TaskObj *self, PyObject *value)
static void
TaskObj_finalize(TaskObj *task)
{
asyncio_state *state = get_asyncio_state_by_def((PyObject *)task);
// Unregister the task from the linked list of tasks.
// Since task is a native task, we directly call the
// unregister_task function. Third party event loops
// should use the asyncio._unregister_task function.
// See https://docs.python.org/3/library/asyncio-extending.html#task-lifetime-support
unregister_task(state, task);
PyObject *context;
PyObject *message = NULL;
PyObject *func;
@ -3197,9 +3270,7 @@ task_eager_start(asyncio_state *state, TaskObj *task)
}
if (task->task_state == STATE_PENDING) {
if (register_task(state, (PyObject *)task) == -1) {
retval = -1;
}
register_task(state, task);
} else {
// This seems to really help performance on pyperformance benchmarks
Py_CLEAR(task->task_coro);
@ -3365,9 +3436,20 @@ _asyncio__register_task_impl(PyObject *module, PyObject *task)
/*[clinic end generated code: output=8672dadd69a7d4e2 input=21075aaea14dfbad]*/
{
asyncio_state *state = get_asyncio_state(module);
if (register_task(state, task) < 0) {
if (Task_Check(state, task)) {
// task is an asyncio.Task instance or subclass, use efficient
// linked-list implementation.
register_task(state, (TaskObj *)task);
Py_RETURN_NONE;
}
// As task does not inherit from asyncio.Task, fallback to less efficient
// weakset implementation.
PyObject *res = PyObject_CallMethodOneArg(state->non_asyncio_tasks,
&_Py_ID(add), task);
if (res == NULL) {
return NULL;
}
Py_DECREF(res);
Py_RETURN_NONE;
}
@ -3408,9 +3490,16 @@ _asyncio__unregister_task_impl(PyObject *module, PyObject *task)
/*[clinic end generated code: output=6e5585706d568a46 input=28fb98c3975f7bdc]*/
{
asyncio_state *state = get_asyncio_state(module);
if (unregister_task(state, task) < 0) {
if (Task_Check(state, task)) {
unregister_task(state, (TaskObj *)task);
Py_RETURN_NONE;
}
PyObject *res = PyObject_CallMethodOneArg(state->non_asyncio_tasks,
&_Py_ID(discard), task);
if (res == NULL) {
return NULL;
}
Py_DECREF(res);
Py_RETURN_NONE;
}
@ -3541,8 +3630,115 @@ _asyncio_current_task_impl(PyObject *module, PyObject *loop)
}
static inline int
add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *loop)
{
PyObject *done = PyObject_CallMethodNoArgs(task, &_Py_ID(done));
if (done == NULL) {
return -1;
}
if (Py_IsTrue(done)) {
return 0;
}
Py_DECREF(done);
PyObject *task_loop = get_future_loop(state, task);
if (task_loop == NULL) {
return -1;
}
if (task_loop == loop) {
if (PySet_Add(tasks, task) < 0) {
Py_DECREF(task_loop);
return -1;
}
}
Py_DECREF(task_loop);
return 0;
}
/*********************** Module **************************/
/*[clinic input]
_asyncio.all_tasks
loop: object = None
Return a set of all tasks for the loop.
[clinic start generated code]*/
static PyObject *
_asyncio_all_tasks_impl(PyObject *module, PyObject *loop)
/*[clinic end generated code: output=0e107cbb7f72aa7b input=43a1b423c2d95bfa]*/
{
asyncio_state *state = get_asyncio_state(module);
PyObject *tasks = PySet_New(NULL);
if (tasks == NULL) {
return NULL;
}
if (loop == Py_None) {
loop = _asyncio_get_running_loop_impl(module);
if (loop == NULL) {
Py_DECREF(tasks);
return NULL;
}
} else {
Py_INCREF(loop);
}
// First add eager tasks to the set so that we don't miss
// any tasks which graduates from eager to non-eager
PyObject *eager_iter = PyObject_GetIter(state->eager_tasks);
if (eager_iter == NULL) {
Py_DECREF(tasks);
Py_DECREF(loop);
return NULL;
}
PyObject *item;
while ((item = PyIter_Next(eager_iter)) != NULL) {
if (add_one_task(state, tasks, item, loop) < 0) {
Py_DECREF(tasks);
Py_DECREF(loop);
Py_DECREF(item);
Py_DECREF(eager_iter);
return NULL;
}
Py_DECREF(item);
}
Py_DECREF(eager_iter);
TaskObj *head = state->asyncio_tasks.head;
assert(head != NULL);
assert(head->prev == NULL);
TaskObj *tail = &state->asyncio_tasks.tail;
while (head != tail)
{
if (add_one_task(state, tasks, (PyObject *)head, loop) < 0) {
Py_DECREF(tasks);
Py_DECREF(loop);
return NULL;
}
head = head->next;
assert(head != NULL);
}
PyObject *scheduled_iter = PyObject_GetIter(state->non_asyncio_tasks);
if (scheduled_iter == NULL) {
Py_DECREF(tasks);
Py_DECREF(loop);
return NULL;
}
while ((item = PyIter_Next(scheduled_iter)) != NULL) {
if (add_one_task(state, tasks, item, loop) < 0) {
Py_DECREF(tasks);
Py_DECREF(loop);
Py_DECREF(item);
Py_DECREF(scheduled_iter);
return NULL;
}
Py_DECREF(item);
}
Py_DECREF(scheduled_iter);
Py_DECREF(loop);
return tasks;
}
static void
module_free_freelists(asyncio_state *state)
@ -3584,7 +3780,7 @@ module_traverse(PyObject *mod, visitproc visit, void *arg)
Py_VISIT(state->asyncio_InvalidStateError);
Py_VISIT(state->asyncio_CancelledError);
Py_VISIT(state->scheduled_tasks);
Py_VISIT(state->non_asyncio_tasks);
Py_VISIT(state->eager_tasks);
Py_VISIT(state->current_tasks);
Py_VISIT(state->iscoroutine_typecache);
@ -3622,7 +3818,7 @@ module_clear(PyObject *mod)
Py_CLEAR(state->asyncio_InvalidStateError);
Py_CLEAR(state->asyncio_CancelledError);
Py_CLEAR(state->scheduled_tasks);
Py_CLEAR(state->non_asyncio_tasks);
Py_CLEAR(state->eager_tasks);
Py_CLEAR(state->current_tasks);
Py_CLEAR(state->iscoroutine_typecache);
@ -3703,9 +3899,9 @@ module_init(asyncio_state *state)
PyObject *weak_set;
WITH_MOD("weakref")
GET_MOD_ATTR(weak_set, "WeakSet");
state->scheduled_tasks = PyObject_CallNoArgs(weak_set);
state->non_asyncio_tasks = PyObject_CallNoArgs(weak_set);
Py_CLEAR(weak_set);
if (state->scheduled_tasks == NULL) {
if (state->non_asyncio_tasks == NULL) {
goto fail;
}
@ -3740,6 +3936,7 @@ static PyMethodDef asyncio_methods[] = {
_ASYNCIO__ENTER_TASK_METHODDEF
_ASYNCIO__LEAVE_TASK_METHODDEF
_ASYNCIO__SWAP_CURRENT_TASK_METHODDEF
_ASYNCIO_ALL_TASKS_METHODDEF
{NULL, NULL}
};
@ -3747,6 +3944,7 @@ static int
module_exec(PyObject *mod)
{
asyncio_state *state = get_asyncio_state(mod);
state->asyncio_tasks.head = &state->asyncio_tasks.tail;
#define CREATE_TYPE(m, tp, spec, base) \
do { \
@ -3776,7 +3974,7 @@ module_exec(PyObject *mod)
return -1;
}
if (PyModule_AddObjectRef(mod, "_scheduled_tasks", state->scheduled_tasks) < 0) {
if (PyModule_AddObjectRef(mod, "_scheduled_tasks", state->non_asyncio_tasks) < 0) {
return -1;
}

View file

@ -1487,4 +1487,64 @@ skip_optional_pos:
exit:
return return_value;
}
/*[clinic end generated code: output=b26155080c82c472 input=a9049054013a1b77]*/
PyDoc_STRVAR(_asyncio_all_tasks__doc__,
"all_tasks($module, /, loop=None)\n"
"--\n"
"\n"
"Return a set of all tasks for the loop.");
#define _ASYNCIO_ALL_TASKS_METHODDEF \
{"all_tasks", _PyCFunction_CAST(_asyncio_all_tasks), METH_FASTCALL|METH_KEYWORDS, _asyncio_all_tasks__doc__},
static PyObject *
_asyncio_all_tasks_impl(PyObject *module, PyObject *loop);
static PyObject *
_asyncio_all_tasks(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
{
PyObject *return_value = NULL;
#if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)
#define NUM_KEYWORDS 1
static struct {
PyGC_Head _this_is_not_used;
PyObject_VAR_HEAD
PyObject *ob_item[NUM_KEYWORDS];
} _kwtuple = {
.ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS)
.ob_item = { &_Py_ID(loop), },
};
#undef NUM_KEYWORDS
#define KWTUPLE (&_kwtuple.ob_base.ob_base)
#else // !Py_BUILD_CORE
# define KWTUPLE NULL
#endif // !Py_BUILD_CORE
static const char * const _keywords[] = {"loop", NULL};
static _PyArg_Parser _parser = {
.keywords = _keywords,
.fname = "all_tasks",
.kwtuple = KWTUPLE,
};
#undef KWTUPLE
PyObject *argsbuf[1];
Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 0;
PyObject *loop = Py_None;
args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 0, 1, 0, argsbuf);
if (!args) {
goto exit;
}
if (!noptargs) {
goto skip_optional_pos;
}
loop = args[0];
skip_optional_pos:
return_value = _asyncio_all_tasks_impl(module, loop);
exit:
return return_value;
}
/*[clinic end generated code: output=ffe9b71bc65888b3 input=a9049054013a1b77]*/