cpython/Lib/test/test_asyncio/test_tools.py

1706 lines
63 KiB
Python

import unittest
from asyncio import tools
from collections import namedtuple
FrameInfo = namedtuple('FrameInfo', ['funcname', 'filename', 'lineno'])
CoroInfo = namedtuple('CoroInfo', ['call_stack', 'task_name'])
TaskInfo = namedtuple('TaskInfo', ['task_id', 'task_name', 'coroutine_stack', 'awaited_by'])
AwaitedInfo = namedtuple('AwaitedInfo', ['thread_id', 'awaited_by'])
# mock output of get_all_awaited_by function.
TEST_INPUTS_TREE = [
[
# test case containing a task called timer being awaited in two
# different subtasks part of a TaskGroup (root1 and root2) which call
# awaiter functions.
(
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=2,
task_name="Task-1",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=3,
task_name="timer",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("awaiter3", "/path/to/app.py", 130),
FrameInfo("awaiter2", "/path/to/app.py", 120),
FrameInfo("awaiter", "/path/to/app.py", 110)
],
task_name=4
),
CoroInfo(
call_stack=[
FrameInfo("awaiterB3", "/path/to/app.py", 190),
FrameInfo("awaiterB2", "/path/to/app.py", 180),
FrameInfo("awaiterB", "/path/to/app.py", 170)
],
task_name=5
),
CoroInfo(
call_stack=[
FrameInfo("awaiterB3", "/path/to/app.py", 190),
FrameInfo("awaiterB2", "/path/to/app.py", 180),
FrameInfo("awaiterB", "/path/to/app.py", 170)
],
task_name=6
),
CoroInfo(
call_stack=[
FrameInfo("awaiter3", "/path/to/app.py", 130),
FrameInfo("awaiter2", "/path/to/app.py", 120),
FrameInfo("awaiter", "/path/to/app.py", 110)
],
task_name=7
)
]
),
TaskInfo(
task_id=8,
task_name="root1",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("_aexit", "", 0),
FrameInfo("__aexit__", "", 0),
FrameInfo("main", "", 0)
],
task_name=2
)
]
),
TaskInfo(
task_id=9,
task_name="root2",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("_aexit", "", 0),
FrameInfo("__aexit__", "", 0),
FrameInfo("main", "", 0)
],
task_name=2
)
]
),
TaskInfo(
task_id=4,
task_name="child1_1",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("_aexit", "", 0),
FrameInfo("__aexit__", "", 0),
FrameInfo("blocho_caller", "", 0),
FrameInfo("bloch", "", 0)
],
task_name=8
)
]
),
TaskInfo(
task_id=6,
task_name="child2_1",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("_aexit", "", 0),
FrameInfo("__aexit__", "", 0),
FrameInfo("blocho_caller", "", 0),
FrameInfo("bloch", "", 0)
],
task_name=8
)
]
),
TaskInfo(
task_id=7,
task_name="child1_2",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("_aexit", "", 0),
FrameInfo("__aexit__", "", 0),
FrameInfo("blocho_caller", "", 0),
FrameInfo("bloch", "", 0)
],
task_name=9
)
]
),
TaskInfo(
task_id=5,
task_name="child2_2",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("_aexit", "", 0),
FrameInfo("__aexit__", "", 0),
FrameInfo("blocho_caller", "", 0),
FrameInfo("bloch", "", 0)
],
task_name=9
)
]
)
]
),
AwaitedInfo(thread_id=0, awaited_by=[])
),
(
[
[
"└── (T) Task-1",
" └── main",
" └── __aexit__",
" └── _aexit",
" ├── (T) root1",
" │ └── bloch",
" │ └── blocho_caller",
" │ └── __aexit__",
" │ └── _aexit",
" │ ├── (T) child1_1",
" │ │ └── awaiter /path/to/app.py:110",
" │ │ └── awaiter2 /path/to/app.py:120",
" │ │ └── awaiter3 /path/to/app.py:130",
" │ │ └── (T) timer",
" │ └── (T) child2_1",
" │ └── awaiterB /path/to/app.py:170",
" │ └── awaiterB2 /path/to/app.py:180",
" │ └── awaiterB3 /path/to/app.py:190",
" │ └── (T) timer",
" └── (T) root2",
" └── bloch",
" └── blocho_caller",
" └── __aexit__",
" └── _aexit",
" ├── (T) child1_2",
" │ └── awaiter /path/to/app.py:110",
" │ └── awaiter2 /path/to/app.py:120",
" │ └── awaiter3 /path/to/app.py:130",
" │ └── (T) timer",
" └── (T) child2_2",
" └── awaiterB /path/to/app.py:170",
" └── awaiterB2 /path/to/app.py:180",
" └── awaiterB3 /path/to/app.py:190",
" └── (T) timer",
]
]
),
],
[
# test case containing two roots
(
AwaitedInfo(
thread_id=9,
awaited_by=[
TaskInfo(
task_id=5,
task_name="Task-5",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=6,
task_name="Task-6",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main2", "", 0)],
task_name=5
)
]
),
TaskInfo(
task_id=7,
task_name="Task-7",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main2", "", 0)],
task_name=5
)
]
),
TaskInfo(
task_id=8,
task_name="Task-8",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main2", "", 0)],
task_name=5
)
]
)
]
),
AwaitedInfo(
thread_id=10,
awaited_by=[
TaskInfo(
task_id=1,
task_name="Task-1",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=2,
task_name="Task-2",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=1
)
]
),
TaskInfo(
task_id=3,
task_name="Task-3",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=1
)
]
),
TaskInfo(
task_id=4,
task_name="Task-4",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=1
)
]
)
]
),
AwaitedInfo(thread_id=11, awaited_by=[]),
AwaitedInfo(thread_id=0, awaited_by=[])
),
(
[
[
"└── (T) Task-5",
" └── main2",
" ├── (T) Task-6",
" ├── (T) Task-7",
" └── (T) Task-8",
],
[
"└── (T) Task-1",
" └── main",
" ├── (T) Task-2",
" ├── (T) Task-3",
" └── (T) Task-4",
],
]
),
],
[
# test case containing two roots, one of them without subtasks
(
[
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=2,
task_name="Task-5",
coroutine_stack=[],
awaited_by=[]
)
]
),
AwaitedInfo(
thread_id=3,
awaited_by=[
TaskInfo(
task_id=4,
task_name="Task-1",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=5,
task_name="Task-2",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=4
)
]
),
TaskInfo(
task_id=6,
task_name="Task-3",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=4
)
]
),
TaskInfo(
task_id=7,
task_name="Task-4",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=4
)
]
)
]
),
AwaitedInfo(thread_id=8, awaited_by=[]),
AwaitedInfo(thread_id=0, awaited_by=[])
]
),
(
[
["└── (T) Task-5"],
[
"└── (T) Task-1",
" └── main",
" ├── (T) Task-2",
" ├── (T) Task-3",
" └── (T) Task-4",
],
]
),
],
]
TEST_INPUTS_CYCLES_TREE = [
[
# this test case contains a cycle: two tasks awaiting each other.
(
[
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=2,
task_name="Task-1",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=3,
task_name="a",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("awaiter2", "", 0)],
task_name=4
),
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=2
)
]
),
TaskInfo(
task_id=4,
task_name="b",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("awaiter", "", 0)],
task_name=3
)
]
)
]
),
AwaitedInfo(thread_id=0, awaited_by=[])
]
),
([[4, 3, 4]]),
],
[
# this test case contains two cycles
(
[
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=2,
task_name="Task-1",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=3,
task_name="A",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("nested", "", 0),
FrameInfo("nested", "", 0),
FrameInfo("task_b", "", 0)
],
task_name=4
)
]
),
TaskInfo(
task_id=4,
task_name="B",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("nested", "", 0),
FrameInfo("nested", "", 0),
FrameInfo("task_c", "", 0)
],
task_name=5
),
CoroInfo(
call_stack=[
FrameInfo("nested", "", 0),
FrameInfo("nested", "", 0),
FrameInfo("task_a", "", 0)
],
task_name=3
)
]
),
TaskInfo(
task_id=5,
task_name="C",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("nested", "", 0),
FrameInfo("nested", "", 0)
],
task_name=6
)
]
),
TaskInfo(
task_id=6,
task_name="Task-2",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("nested", "", 0),
FrameInfo("nested", "", 0),
FrameInfo("task_b", "", 0)
],
task_name=4
)
]
)
]
),
AwaitedInfo(thread_id=0, awaited_by=[])
]
),
([[4, 3, 4], [4, 6, 5, 4]]),
],
]
TEST_INPUTS_TABLE = [
[
# test case containing a task called timer being awaited in two
# different subtasks part of a TaskGroup (root1 and root2) which call
# awaiter functions.
(
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=2,
task_name="Task-1",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=3,
task_name="timer",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("awaiter3", "", 0),
FrameInfo("awaiter2", "", 0),
FrameInfo("awaiter", "", 0)
],
task_name=4
),
CoroInfo(
call_stack=[
FrameInfo("awaiter1_3", "", 0),
FrameInfo("awaiter1_2", "", 0),
FrameInfo("awaiter1", "", 0)
],
task_name=5
),
CoroInfo(
call_stack=[
FrameInfo("awaiter1_3", "", 0),
FrameInfo("awaiter1_2", "", 0),
FrameInfo("awaiter1", "", 0)
],
task_name=6
),
CoroInfo(
call_stack=[
FrameInfo("awaiter3", "", 0),
FrameInfo("awaiter2", "", 0),
FrameInfo("awaiter", "", 0)
],
task_name=7
)
]
),
TaskInfo(
task_id=8,
task_name="root1",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("_aexit", "", 0),
FrameInfo("__aexit__", "", 0),
FrameInfo("main", "", 0)
],
task_name=2
)
]
),
TaskInfo(
task_id=9,
task_name="root2",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("_aexit", "", 0),
FrameInfo("__aexit__", "", 0),
FrameInfo("main", "", 0)
],
task_name=2
)
]
),
TaskInfo(
task_id=4,
task_name="child1_1",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("_aexit", "", 0),
FrameInfo("__aexit__", "", 0),
FrameInfo("blocho_caller", "", 0),
FrameInfo("bloch", "", 0)
],
task_name=8
)
]
),
TaskInfo(
task_id=6,
task_name="child2_1",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("_aexit", "", 0),
FrameInfo("__aexit__", "", 0),
FrameInfo("blocho_caller", "", 0),
FrameInfo("bloch", "", 0)
],
task_name=8
)
]
),
TaskInfo(
task_id=7,
task_name="child1_2",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("_aexit", "", 0),
FrameInfo("__aexit__", "", 0),
FrameInfo("blocho_caller", "", 0),
FrameInfo("bloch", "", 0)
],
task_name=9
)
]
),
TaskInfo(
task_id=5,
task_name="child2_2",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("_aexit", "", 0),
FrameInfo("__aexit__", "", 0),
FrameInfo("blocho_caller", "", 0),
FrameInfo("bloch", "", 0)
],
task_name=9
)
]
)
]
),
AwaitedInfo(thread_id=0, awaited_by=[])
),
(
[
[1, "0x2", "Task-1", "", "", "", "0x0"],
[
1,
"0x3",
"timer",
"",
"awaiter3 -> awaiter2 -> awaiter",
"child1_1",
"0x4",
],
[
1,
"0x3",
"timer",
"",
"awaiter1_3 -> awaiter1_2 -> awaiter1",
"child2_2",
"0x5",
],
[
1,
"0x3",
"timer",
"",
"awaiter1_3 -> awaiter1_2 -> awaiter1",
"child2_1",
"0x6",
],
[
1,
"0x3",
"timer",
"",
"awaiter3 -> awaiter2 -> awaiter",
"child1_2",
"0x7",
],
[
1,
"0x8",
"root1",
"",
"_aexit -> __aexit__ -> main",
"Task-1",
"0x2",
],
[
1,
"0x9",
"root2",
"",
"_aexit -> __aexit__ -> main",
"Task-1",
"0x2",
],
[
1,
"0x4",
"child1_1",
"",
"_aexit -> __aexit__ -> blocho_caller -> bloch",
"root1",
"0x8",
],
[
1,
"0x6",
"child2_1",
"",
"_aexit -> __aexit__ -> blocho_caller -> bloch",
"root1",
"0x8",
],
[
1,
"0x7",
"child1_2",
"",
"_aexit -> __aexit__ -> blocho_caller -> bloch",
"root2",
"0x9",
],
[
1,
"0x5",
"child2_2",
"",
"_aexit -> __aexit__ -> blocho_caller -> bloch",
"root2",
"0x9",
],
]
),
],
[
# test case containing two roots
(
AwaitedInfo(
thread_id=9,
awaited_by=[
TaskInfo(
task_id=5,
task_name="Task-5",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=6,
task_name="Task-6",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main2", "", 0)],
task_name=5
)
]
),
TaskInfo(
task_id=7,
task_name="Task-7",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main2", "", 0)],
task_name=5
)
]
),
TaskInfo(
task_id=8,
task_name="Task-8",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main2", "", 0)],
task_name=5
)
]
)
]
),
AwaitedInfo(
thread_id=10,
awaited_by=[
TaskInfo(
task_id=1,
task_name="Task-1",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=2,
task_name="Task-2",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=1
)
]
),
TaskInfo(
task_id=3,
task_name="Task-3",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=1
)
]
),
TaskInfo(
task_id=4,
task_name="Task-4",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=1
)
]
)
]
),
AwaitedInfo(thread_id=11, awaited_by=[]),
AwaitedInfo(thread_id=0, awaited_by=[])
),
(
[
[9, "0x5", "Task-5", "", "", "", "0x0"],
[9, "0x6", "Task-6", "", "main2", "Task-5", "0x5"],
[9, "0x7", "Task-7", "", "main2", "Task-5", "0x5"],
[9, "0x8", "Task-8", "", "main2", "Task-5", "0x5"],
[10, "0x1", "Task-1", "", "", "", "0x0"],
[10, "0x2", "Task-2", "", "main", "Task-1", "0x1"],
[10, "0x3", "Task-3", "", "main", "Task-1", "0x1"],
[10, "0x4", "Task-4", "", "main", "Task-1", "0x1"],
]
),
],
[
# test case containing two roots, one of them without subtasks
(
[
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=2,
task_name="Task-5",
coroutine_stack=[],
awaited_by=[]
)
]
),
AwaitedInfo(
thread_id=3,
awaited_by=[
TaskInfo(
task_id=4,
task_name="Task-1",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=5,
task_name="Task-2",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=4
)
]
),
TaskInfo(
task_id=6,
task_name="Task-3",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=4
)
]
),
TaskInfo(
task_id=7,
task_name="Task-4",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=4
)
]
)
]
),
AwaitedInfo(thread_id=8, awaited_by=[]),
AwaitedInfo(thread_id=0, awaited_by=[])
]
),
(
[
[1, "0x2", "Task-5", "", "", "", "0x0"],
[3, "0x4", "Task-1", "", "", "", "0x0"],
[3, "0x5", "Task-2", "", "main", "Task-1", "0x4"],
[3, "0x6", "Task-3", "", "main", "Task-1", "0x4"],
[3, "0x7", "Task-4", "", "main", "Task-1", "0x4"],
]
),
],
# CASES WITH CYCLES
[
# this test case contains a cycle: two tasks awaiting each other.
(
[
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=2,
task_name="Task-1",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=3,
task_name="a",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("awaiter2", "", 0)],
task_name=4
),
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=2
)
]
),
TaskInfo(
task_id=4,
task_name="b",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("awaiter", "", 0)],
task_name=3
)
]
)
]
),
AwaitedInfo(thread_id=0, awaited_by=[])
]
),
(
[
[1, "0x2", "Task-1", "", "", "", "0x0"],
[1, "0x3", "a", "", "awaiter2", "b", "0x4"],
[1, "0x3", "a", "", "main", "Task-1", "0x2"],
[1, "0x4", "b", "", "awaiter", "a", "0x3"],
]
),
],
[
# this test case contains two cycles
(
[
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=2,
task_name="Task-1",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=3,
task_name="A",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("nested", "", 0),
FrameInfo("nested", "", 0),
FrameInfo("task_b", "", 0)
],
task_name=4
)
]
),
TaskInfo(
task_id=4,
task_name="B",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("nested", "", 0),
FrameInfo("nested", "", 0),
FrameInfo("task_c", "", 0)
],
task_name=5
),
CoroInfo(
call_stack=[
FrameInfo("nested", "", 0),
FrameInfo("nested", "", 0),
FrameInfo("task_a", "", 0)
],
task_name=3
)
]
),
TaskInfo(
task_id=5,
task_name="C",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("nested", "", 0),
FrameInfo("nested", "", 0)
],
task_name=6
)
]
),
TaskInfo(
task_id=6,
task_name="Task-2",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("nested", "", 0),
FrameInfo("nested", "", 0),
FrameInfo("task_b", "", 0)
],
task_name=4
)
]
)
]
),
AwaitedInfo(thread_id=0, awaited_by=[])
]
),
(
[
[1, "0x2", "Task-1", "", "", "", "0x0"],
[
1,
"0x3",
"A",
"",
"nested -> nested -> task_b",
"B",
"0x4",
],
[
1,
"0x4",
"B",
"",
"nested -> nested -> task_c",
"C",
"0x5",
],
[
1,
"0x4",
"B",
"",
"nested -> nested -> task_a",
"A",
"0x3",
],
[
1,
"0x5",
"C",
"",
"nested -> nested",
"Task-2",
"0x6",
],
[
1,
"0x6",
"Task-2",
"",
"nested -> nested -> task_b",
"B",
"0x4",
],
]
),
],
]
class TestAsyncioToolsTree(unittest.TestCase):
def test_asyncio_utils(self):
for input_, tree in TEST_INPUTS_TREE:
with self.subTest(input_):
result = tools.build_async_tree(input_)
self.assertEqual(result, tree)
def test_asyncio_utils_cycles(self):
for input_, cycles in TEST_INPUTS_CYCLES_TREE:
with self.subTest(input_):
try:
tools.build_async_tree(input_)
except tools.CycleFoundException as e:
self.assertEqual(e.cycles, cycles)
class TestAsyncioToolsTable(unittest.TestCase):
def test_asyncio_utils(self):
for input_, table in TEST_INPUTS_TABLE:
with self.subTest(input_):
result = tools.build_task_table(input_)
self.assertEqual(result, table)
class TestAsyncioToolsBasic(unittest.TestCase):
def test_empty_input_tree(self):
"""Test build_async_tree with empty input."""
result = []
expected_output = []
self.assertEqual(tools.build_async_tree(result), expected_output)
def test_empty_input_table(self):
"""Test build_task_table with empty input."""
result = []
expected_output = []
self.assertEqual(tools.build_task_table(result), expected_output)
def test_only_independent_tasks_tree(self):
input_ = [
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=10,
task_name="taskA",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=11,
task_name="taskB",
coroutine_stack=[],
awaited_by=[]
)
]
)
]
expected = [["└── (T) taskA"], ["└── (T) taskB"]]
result = tools.build_async_tree(input_)
self.assertEqual(sorted(result), sorted(expected))
def test_only_independent_tasks_table(self):
input_ = [
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=10,
task_name="taskA",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=11,
task_name="taskB",
coroutine_stack=[],
awaited_by=[]
)
]
)
]
self.assertEqual(
tools.build_task_table(input_),
[[1, '0xa', 'taskA', '', '', '', '0x0'], [1, '0xb', 'taskB', '', '', '', '0x0']]
)
def test_single_task_tree(self):
"""Test build_async_tree with a single task and no awaits."""
result = [
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=2,
task_name="Task-1",
coroutine_stack=[],
awaited_by=[]
)
]
)
]
expected_output = [
[
"└── (T) Task-1",
]
]
self.assertEqual(tools.build_async_tree(result), expected_output)
def test_single_task_table(self):
"""Test build_task_table with a single task and no awaits."""
result = [
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=2,
task_name="Task-1",
coroutine_stack=[],
awaited_by=[]
)
]
)
]
expected_output = [[1, '0x2', 'Task-1', '', '', '', '0x0']]
self.assertEqual(tools.build_task_table(result), expected_output)
def test_cycle_detection(self):
"""Test build_async_tree raises CycleFoundException for cyclic input."""
result = [
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=2,
task_name="Task-1",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=3
)
]
),
TaskInfo(
task_id=3,
task_name="Task-2",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=2
)
]
)
]
)
]
with self.assertRaises(tools.CycleFoundException) as context:
tools.build_async_tree(result)
self.assertEqual(context.exception.cycles, [[3, 2, 3]])
def test_complex_tree(self):
"""Test build_async_tree with a more complex tree structure."""
result = [
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=2,
task_name="Task-1",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=3,
task_name="Task-2",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=2
)
]
),
TaskInfo(
task_id=4,
task_name="Task-3",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=3
)
]
)
]
)
]
expected_output = [
[
"└── (T) Task-1",
" └── main",
" └── (T) Task-2",
" └── main",
" └── (T) Task-3",
]
]
self.assertEqual(tools.build_async_tree(result), expected_output)
def test_complex_table(self):
"""Test build_task_table with a more complex tree structure."""
result = [
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=2,
task_name="Task-1",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=3,
task_name="Task-2",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=2
)
]
),
TaskInfo(
task_id=4,
task_name="Task-3",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("main", "", 0)],
task_name=3
)
]
)
]
)
]
expected_output = [
[1, '0x2', 'Task-1', '', '', '', '0x0'],
[1, '0x3', 'Task-2', '', 'main', 'Task-1', '0x2'],
[1, '0x4', 'Task-3', '', 'main', 'Task-2', '0x3']
]
self.assertEqual(tools.build_task_table(result), expected_output)
def test_deep_coroutine_chain(self):
input_ = [
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=10,
task_name="leaf",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("c1", "", 0),
FrameInfo("c2", "", 0),
FrameInfo("c3", "", 0),
FrameInfo("c4", "", 0),
FrameInfo("c5", "", 0)
],
task_name=11
)
]
),
TaskInfo(
task_id=11,
task_name="root",
coroutine_stack=[],
awaited_by=[]
)
]
)
]
expected = [
[
"└── (T) root",
" └── c5",
" └── c4",
" └── c3",
" └── c2",
" └── c1",
" └── (T) leaf",
]
]
result = tools.build_async_tree(input_)
self.assertEqual(result, expected)
def test_multiple_cycles_same_node(self):
input_ = [
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=1,
task_name="Task-A",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("call1", "", 0)],
task_name=2
)
]
),
TaskInfo(
task_id=2,
task_name="Task-B",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("call2", "", 0)],
task_name=3
)
]
),
TaskInfo(
task_id=3,
task_name="Task-C",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("call3", "", 0)],
task_name=1
),
CoroInfo(
call_stack=[FrameInfo("call4", "", 0)],
task_name=2
)
]
)
]
)
]
with self.assertRaises(tools.CycleFoundException) as ctx:
tools.build_async_tree(input_)
cycles = ctx.exception.cycles
self.assertTrue(any(set(c) == {1, 2, 3} for c in cycles))
def test_table_output_format(self):
input_ = [
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=1,
task_name="Task-A",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("foo", "", 0)],
task_name=2
)
]
),
TaskInfo(
task_id=2,
task_name="Task-B",
coroutine_stack=[],
awaited_by=[]
)
]
)
]
table = tools.build_task_table(input_)
for row in table:
self.assertEqual(len(row), 7)
self.assertIsInstance(row[0], int) # thread ID
self.assertTrue(
isinstance(row[1], str) and row[1].startswith("0x")
) # hex task ID
self.assertIsInstance(row[2], str) # task name
self.assertIsInstance(row[3], str) # coroutine stack
self.assertIsInstance(row[4], str) # coroutine chain
self.assertIsInstance(row[5], str) # awaiter name
self.assertTrue(
isinstance(row[6], str) and row[6].startswith("0x")
) # hex awaiter ID
class TestAsyncioToolsEdgeCases(unittest.TestCase):
def test_task_awaits_self(self):
"""A task directly awaits itself - should raise a cycle."""
input_ = [
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=1,
task_name="Self-Awaiter",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("loopback", "", 0)],
task_name=1
)
]
)
]
)
]
with self.assertRaises(tools.CycleFoundException) as ctx:
tools.build_async_tree(input_)
self.assertIn([1, 1], ctx.exception.cycles)
def test_task_with_missing_awaiter_id(self):
"""Awaiter ID not in task list - should not crash, just show 'Unknown'."""
input_ = [
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=1,
task_name="Task-A",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("coro", "", 0)],
task_name=999
)
]
)
]
)
]
table = tools.build_task_table(input_)
self.assertEqual(len(table), 1)
self.assertEqual(table[0][5], "Unknown")
def test_duplicate_coroutine_frames(self):
"""Same coroutine frame repeated under a parent - should deduplicate."""
input_ = [
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=1,
task_name="Task-1",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("frameA", "", 0)],
task_name=2
),
CoroInfo(
call_stack=[FrameInfo("frameA", "", 0)],
task_name=3
)
]
),
TaskInfo(
task_id=2,
task_name="Task-2",
coroutine_stack=[],
awaited_by=[]
),
TaskInfo(
task_id=3,
task_name="Task-3",
coroutine_stack=[],
awaited_by=[]
)
]
)
]
tree = tools.build_async_tree(input_)
# Both children should be under the same coroutine node
flat = "\n".join(tree[0])
self.assertIn("frameA", flat)
self.assertIn("Task-2", flat)
self.assertIn("Task-1", flat)
flat = "\n".join(tree[1])
self.assertIn("frameA", flat)
self.assertIn("Task-3", flat)
self.assertIn("Task-1", flat)
def test_task_with_no_name(self):
"""Task with no name in id2name - should still render with fallback."""
input_ = [
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=1,
task_name="root",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[FrameInfo("f1", "", 0)],
task_name=2
)
]
),
TaskInfo(
task_id=2,
task_name=None,
coroutine_stack=[],
awaited_by=[]
)
]
)
]
# If name is None, fallback to string should not crash
tree = tools.build_async_tree(input_)
self.assertIn("(T) None", "\n".join(tree[0]))
def test_tree_rendering_with_custom_emojis(self):
"""Pass custom emojis to the tree renderer."""
input_ = [
AwaitedInfo(
thread_id=1,
awaited_by=[
TaskInfo(
task_id=1,
task_name="MainTask",
coroutine_stack=[],
awaited_by=[
CoroInfo(
call_stack=[
FrameInfo("f1", "", 0),
FrameInfo("f2", "", 0)
],
task_name=2
)
]
),
TaskInfo(
task_id=2,
task_name="SubTask",
coroutine_stack=[],
awaited_by=[]
)
]
)
]
tree = tools.build_async_tree(input_, task_emoji="🧵", cor_emoji="🔁")
flat = "\n".join(tree[0])
self.assertIn("🧵 MainTask", flat)
self.assertIn("🔁 f1", flat)
self.assertIn("🔁 f2", flat)
self.assertIn("🧵 SubTask", flat)