mirror of
https://github.com/python/cpython.git
synced 2025-07-21 18:25:22 +00:00

asyncio/taskgroups.py is an adaptation of taskgroup.py from EdgeDB, with the following key changes:

- Allow creating new tasks as long as the last task hasn't finished
- Raise [Base]ExceptionGroup (directly) rather than TaskGroupError deriving from MultiError
- Instead of monkey-patching the parent task's cancel() method, add a new public API to Task

The Task class has a new internal flag, `_cancel_requested`, which is set when `.cancel()` is called successfully. The `.cancelling()` method returns the value of this flag. Further `.cancel()` calls while this flag is set return False. To reset this flag, call `.uncancel()`.

Thus, a Task that catches and ignores `CancelledError` should call `.uncancel()` if it wants to be cancellable again; until it does so, it is deemed to be busy with uninterruptible cleanup.

This new Task API helps solve the problem where TaskGroup needs to distinguish whether the parent task is being cancelled "from the outside" or "from inside".

Co-authored-by: Yury Selivanov <yury@edgedb.com>
Co-authored-by: Andrew Svetlov <andrew.svetlov@gmail.com>
85 lines
2.4 KiB
Python
85 lines
2.4 KiB
Python
import linecache
|
|
import traceback
|
|
|
|
from . import base_futures
|
|
from . import coroutines
|
|
|
|
|
|
def _task_repr_info(task):
|
|
info = base_futures._future_repr_info(task)
|
|
|
|
if task.cancelling() and not task.done():
|
|
# replace status
|
|
info[0] = 'cancelling'
|
|
|
|
info.insert(1, 'name=%r' % task.get_name())
|
|
|
|
coro = coroutines._format_coroutine(task._coro)
|
|
info.insert(2, f'coro=<{coro}>')
|
|
|
|
if task._fut_waiter is not None:
|
|
info.insert(3, f'wait_for={task._fut_waiter!r}')
|
|
return info
|
|
|
|
|
|
def _task_get_stack(task, limit):
|
|
frames = []
|
|
if hasattr(task._coro, 'cr_frame'):
|
|
# case 1: 'async def' coroutines
|
|
f = task._coro.cr_frame
|
|
elif hasattr(task._coro, 'gi_frame'):
|
|
# case 2: legacy coroutines
|
|
f = task._coro.gi_frame
|
|
elif hasattr(task._coro, 'ag_frame'):
|
|
# case 3: async generators
|
|
f = task._coro.ag_frame
|
|
else:
|
|
# case 4: unknown objects
|
|
f = None
|
|
if f is not None:
|
|
while f is not None:
|
|
if limit is not None:
|
|
if limit <= 0:
|
|
break
|
|
limit -= 1
|
|
frames.append(f)
|
|
f = f.f_back
|
|
frames.reverse()
|
|
elif task._exception is not None:
|
|
tb = task._exception.__traceback__
|
|
while tb is not None:
|
|
if limit is not None:
|
|
if limit <= 0:
|
|
break
|
|
limit -= 1
|
|
frames.append(tb.tb_frame)
|
|
tb = tb.tb_next
|
|
return frames
|
|
|
|
|
|
def _task_print_stack(task, limit, file):
|
|
extracted_list = []
|
|
checked = set()
|
|
for f in task.get_stack(limit=limit):
|
|
lineno = f.f_lineno
|
|
co = f.f_code
|
|
filename = co.co_filename
|
|
name = co.co_name
|
|
if filename not in checked:
|
|
checked.add(filename)
|
|
linecache.checkcache(filename)
|
|
line = linecache.getline(filename, lineno, f.f_globals)
|
|
extracted_list.append((filename, lineno, name, line))
|
|
|
|
exc = task._exception
|
|
if not extracted_list:
|
|
print(f'No stack for {task!r}', file=file)
|
|
elif exc is not None:
|
|
print(f'Traceback for {task!r} (most recent call last):', file=file)
|
|
else:
|
|
print(f'Stack for {task!r} (most recent call last):', file=file)
|
|
|
|
traceback.print_list(extracted_list, file=file)
|
|
if exc is not None:
|
|
for line in traceback.format_exception_only(exc.__class__, exc):
|
|
print(line, file=file, end='')
|