# cpython/Lib/test/test_asyncio/test_tools.py
#
# 846 lines
# 28 KiB
# Python

import unittest
from asyncio import tools
# Mock output of the get_all_awaited_by function, paired with the trees that
# tools.build_async_tree is expected to render for it.
#
# Each entry is [input, expected]:
#   - input: a sequence of (thread_id, tasks) pairs; tasks is a list of
#     (task_id, task_name, awaited_by) tuples, and awaited_by is a list of
#     [call_stack, awaiter_task_id] pairs. Frames in call_stack are either
#     bare function names or (function, filename, lineno) tuples.
#   - expected: a list of trees, each one a list of rendered lines.
#
# NOTE(review): nested lines in the expected trees below are indented by a
# single space; this looks like collapsed whitespace -- confirm against the
# real build_async_tree output before relying on these fixtures.
TEST_INPUTS_TREE = [
[
# Test case containing a task called timer being awaited in two
# different subtasks part of a TaskGroup (root1 and root2) which call
# awaiter functions. timer is awaited by tasks 4, 5, 6 and 7, which are
# themselves awaited by root1 (8) or root2 (9).
(
(
1,
[
(2, "Task-1", []),
(
3,
"timer",
[
[[("awaiter3", "/path/to/app.py", 130),
("awaiter2", "/path/to/app.py", 120),
("awaiter", "/path/to/app.py", 110)], 4],
[[("awaiterB3", "/path/to/app.py", 190),
("awaiterB2", "/path/to/app.py", 180),
("awaiterB", "/path/to/app.py", 170)], 5],
[[("awaiterB3", "/path/to/app.py", 190),
("awaiterB2", "/path/to/app.py", 180),
("awaiterB", "/path/to/app.py", 170)], 6],
[[("awaiter3", "/path/to/app.py", 130),
("awaiter2", "/path/to/app.py", 120),
("awaiter", "/path/to/app.py", 110)], 7],
],
),
(
8,
"root1",
[[["_aexit", "__aexit__", "main"], 2]],
),
(
9,
"root2",
[[["_aexit", "__aexit__", "main"], 2]],
),
(
4,
"child1_1",
[
[
["_aexit", "__aexit__", "blocho_caller", "bloch"],
8,
]
],
),
(
6,
"child2_1",
[
[
["_aexit", "__aexit__", "blocho_caller", "bloch"],
8,
]
],
),
(
7,
"child1_2",
[
[
["_aexit", "__aexit__", "blocho_caller", "bloch"],
9,
]
],
),
(
5,
"child2_2",
[
[
["_aexit", "__aexit__", "blocho_caller", "bloch"],
9,
]
],
),
],
),
(0, []),
),
(
[
[
"└── (T) Task-1",
" └── main",
" └── __aexit__",
" └── _aexit",
" ├── (T) root1",
" │ └── bloch",
" │ └── blocho_caller",
" │ └── __aexit__",
" │ └── _aexit",
" │ ├── (T) child1_1",
" │ │ └── awaiter /path/to/app.py:110",
" │ │ └── awaiter2 /path/to/app.py:120",
" │ │ └── awaiter3 /path/to/app.py:130",
" │ │ └── (T) timer",
" │ └── (T) child2_1",
" │ └── awaiterB /path/to/app.py:170",
" │ └── awaiterB2 /path/to/app.py:180",
" │ └── awaiterB3 /path/to/app.py:190",
" │ └── (T) timer",
" └── (T) root2",
" └── bloch",
" └── blocho_caller",
" └── __aexit__",
" └── _aexit",
" ├── (T) child1_2",
" │ └── awaiter /path/to/app.py:110",
" │ └── awaiter2 /path/to/app.py:120",
" │ └── awaiter3 /path/to/app.py:130",
" │ └── (T) timer",
" └── (T) child2_2",
" └── awaiterB /path/to/app.py:170",
" └── awaiterB2 /path/to/app.py:180",
" └── awaiterB3 /path/to/app.py:190",
" └── (T) timer",
]
]
),
],
[
# Test case containing two roots (Task-5 and Task-1), each in its own
# thread entry and each awaited-on by three tasks.
(
(
9,
[
(5, "Task-5", []),
(6, "Task-6", [[["main2"], 5]]),
(7, "Task-7", [[["main2"], 5]]),
(8, "Task-8", [[["main2"], 5]]),
],
),
(
10,
[
(1, "Task-1", []),
(2, "Task-2", [[["main"], 1]]),
(3, "Task-3", [[["main"], 1]]),
(4, "Task-4", [[["main"], 1]]),
],
),
(11, []),
(0, []),
),
(
[
[
"└── (T) Task-5",
" └── main2",
" ├── (T) Task-6",
" ├── (T) Task-7",
" └── (T) Task-8",
],
[
"└── (T) Task-1",
" └── main",
" ├── (T) Task-2",
" ├── (T) Task-3",
" └── (T) Task-4",
],
]
),
],
[
# Test case containing two roots, one of them without subtasks.
(
[
(1, [(2, "Task-5", [])]),
(
3,
[
(4, "Task-1", []),
(5, "Task-2", [[["main"], 4]]),
(6, "Task-3", [[["main"], 4]]),
(7, "Task-4", [[["main"], 4]]),
],
),
(8, []),
(0, []),
]
),
(
[
["└── (T) Task-5"],
[
"└── (T) Task-1",
" └── main",
" ├── (T) Task-2",
" ├── (T) Task-3",
" └── (T) Task-4",
],
]
),
],
]
# Inputs whose await relationships contain cycles, paired with the value
# expected on CycleFoundException.cycles when tools.build_async_tree is
# called on them.
#
# Each entry is [input, expected_cycles]; input has the same
# (thread_id, [(task_id, task_name, awaited_by), ...]) shape as the other
# fixtures in this module, and every expected cycle is a list of task ids
# that starts and ends on the same id.
TEST_INPUTS_CYCLES_TREE = [
[
# This test case contains a cycle: two tasks awaiting each other
# (task 3 "a" is awaited by 4 "b", and "b" is awaited by "a").
(
[
(
1,
[
(2, "Task-1", []),
(
3,
"a",
[[["awaiter2"], 4], [["main"], 2]],
),
(4, "b", [[["awaiter"], 3]]),
],
),
(0, []),
]
),
([[4, 3, 4]]),
],
[
# This test case contains two cycles, both passing through task 4 ("B").
(
[
(
1,
[
(2, "Task-1", []),
(
3,
"A",
[[["nested", "nested", "task_b"], 4]],
),
(
4,
"B",
[
[["nested", "nested", "task_c"], 5],
[["nested", "nested", "task_a"], 3],
],
),
(5, "C", [[["nested", "nested"], 6]]),
(
6,
"Task-2",
[[["nested", "nested", "task_b"], 4]],
),
],
),
(0, []),
]
),
([[4, 3, 4], [4, 6, 5, 4]]),
],
]
# Inputs paired with the rows tools.build_task_table is expected to return.
#
# Each entry is [input, expected_rows]; input has the same
# (thread_id, [(task_id, task_name, awaited_by), ...]) shape as the tree
# fixtures. Every expected row is
#   [thread id, hex task id, task name, coroutine chain,
#    awaiter name, hex awaiter id]
# with one row per awaited_by entry. A task with no awaiters gets a single
# row with an empty chain, an empty awaiter name and "0x0" as awaiter id.
TEST_INPUTS_TABLE = [
[
# Test case containing a task called timer being awaited in two
# different subtasks part of a TaskGroup (root1 and root2) which call
# awaiter functions.
(
(
1,
[
(2, "Task-1", []),
(
3,
"timer",
[
[["awaiter3", "awaiter2", "awaiter"], 4],
[["awaiter1_3", "awaiter1_2", "awaiter1"], 5],
[["awaiter1_3", "awaiter1_2", "awaiter1"], 6],
[["awaiter3", "awaiter2", "awaiter"], 7],
],
),
(
8,
"root1",
[[["_aexit", "__aexit__", "main"], 2]],
),
(
9,
"root2",
[[["_aexit", "__aexit__", "main"], 2]],
),
(
4,
"child1_1",
[
[
["_aexit", "__aexit__", "blocho_caller", "bloch"],
8,
]
],
),
(
6,
"child2_1",
[
[
["_aexit", "__aexit__", "blocho_caller", "bloch"],
8,
]
],
),
(
7,
"child1_2",
[
[
["_aexit", "__aexit__", "blocho_caller", "bloch"],
9,
]
],
),
(
5,
"child2_2",
[
[
["_aexit", "__aexit__", "blocho_caller", "bloch"],
9,
]
],
),
],
),
(0, []),
),
(
[
[1, "0x2", "Task-1", "", "", "0x0"],
[
1,
"0x3",
"timer",
"awaiter3 -> awaiter2 -> awaiter",
"child1_1",
"0x4",
],
[
1,
"0x3",
"timer",
"awaiter1_3 -> awaiter1_2 -> awaiter1",
"child2_2",
"0x5",
],
[
1,
"0x3",
"timer",
"awaiter1_3 -> awaiter1_2 -> awaiter1",
"child2_1",
"0x6",
],
[
1,
"0x3",
"timer",
"awaiter3 -> awaiter2 -> awaiter",
"child1_2",
"0x7",
],
[
1,
"0x8",
"root1",
"_aexit -> __aexit__ -> main",
"Task-1",
"0x2",
],
[
1,
"0x9",
"root2",
"_aexit -> __aexit__ -> main",
"Task-1",
"0x2",
],
[
1,
"0x4",
"child1_1",
"_aexit -> __aexit__ -> blocho_caller -> bloch",
"root1",
"0x8",
],
[
1,
"0x6",
"child2_1",
"_aexit -> __aexit__ -> blocho_caller -> bloch",
"root1",
"0x8",
],
[
1,
"0x7",
"child1_2",
"_aexit -> __aexit__ -> blocho_caller -> bloch",
"root2",
"0x9",
],
[
1,
"0x5",
"child2_2",
"_aexit -> __aexit__ -> blocho_caller -> bloch",
"root2",
"0x9",
],
]
),
],
[
# Test case containing two roots.
(
(
9,
[
(5, "Task-5", []),
(6, "Task-6", [[["main2"], 5]]),
(7, "Task-7", [[["main2"], 5]]),
(8, "Task-8", [[["main2"], 5]]),
],
),
(
10,
[
(1, "Task-1", []),
(2, "Task-2", [[["main"], 1]]),
(3, "Task-3", [[["main"], 1]]),
(4, "Task-4", [[["main"], 1]]),
],
),
(11, []),
(0, []),
),
(
[
[9, "0x5", "Task-5", "", "", "0x0"],
[9, "0x6", "Task-6", "main2", "Task-5", "0x5"],
[9, "0x7", "Task-7", "main2", "Task-5", "0x5"],
[9, "0x8", "Task-8", "main2", "Task-5", "0x5"],
[10, "0x1", "Task-1", "", "", "0x0"],
[10, "0x2", "Task-2", "main", "Task-1", "0x1"],
[10, "0x3", "Task-3", "main", "Task-1", "0x1"],
[10, "0x4", "Task-4", "main", "Task-1", "0x1"],
]
),
],
[
# Test case containing two roots, one of them without subtasks.
(
[
(1, [(2, "Task-5", [])]),
(
3,
[
(4, "Task-1", []),
(5, "Task-2", [[["main"], 4]]),
(6, "Task-3", [[["main"], 4]]),
(7, "Task-4", [[["main"], 4]]),
],
),
(8, []),
(0, []),
]
),
(
[
[1, "0x2", "Task-5", "", "", "0x0"],
[3, "0x4", "Task-1", "", "", "0x0"],
[3, "0x5", "Task-2", "main", "Task-1", "0x4"],
[3, "0x6", "Task-3", "main", "Task-1", "0x4"],
[3, "0x7", "Task-4", "main", "Task-1", "0x4"],
]
),
],
# CASES WITH CYCLES
# (the table is still expected for these inputs: one row per awaited_by
# entry, even though the tasks await each other)
[
# This test case contains a cycle: two tasks awaiting each other.
(
[
(
1,
[
(2, "Task-1", []),
(
3,
"a",
[[["awaiter2"], 4], [["main"], 2]],
),
(4, "b", [[["awaiter"], 3]]),
],
),
(0, []),
]
),
(
[
[1, "0x2", "Task-1", "", "", "0x0"],
[1, "0x3", "a", "awaiter2", "b", "0x4"],
[1, "0x3", "a", "main", "Task-1", "0x2"],
[1, "0x4", "b", "awaiter", "a", "0x3"],
]
),
],
[
# This test case contains two cycles.
(
[
(
1,
[
(2, "Task-1", []),
(
3,
"A",
[[["nested", "nested", "task_b"], 4]],
),
(
4,
"B",
[
[["nested", "nested", "task_c"], 5],
[["nested", "nested", "task_a"], 3],
],
),
(5, "C", [[["nested", "nested"], 6]]),
(
6,
"Task-2",
[[["nested", "nested", "task_b"], 4]],
),
],
),
(0, []),
]
),
(
[
[1, "0x2", "Task-1", "", "", "0x0"],
[
1,
"0x3",
"A",
"nested -> nested -> task_b",
"B",
"0x4",
],
[
1,
"0x4",
"B",
"nested -> nested -> task_c",
"C",
"0x5",
],
[
1,
"0x4",
"B",
"nested -> nested -> task_a",
"A",
"0x3",
],
[
1,
"0x5",
"C",
"nested -> nested",
"Task-2",
"0x6",
],
[
1,
"0x6",
"Task-2",
"nested -> nested -> task_b",
"B",
"0x4",
],
]
),
],
]
class TestAsyncioToolsTree(unittest.TestCase):
    """Fixture-driven checks of tools.build_async_tree."""

    def test_asyncio_utils(self):
        """Each TEST_INPUTS_TREE input renders exactly its expected trees."""
        for input_, tree in TEST_INPUTS_TREE:
            with self.subTest(input_):
                self.assertEqual(tools.build_async_tree(input_), tree)

    def test_asyncio_utils_cycles(self):
        """Each TEST_INPUTS_CYCLES_TREE input raises with the expected cycles."""
        for input_, cycles in TEST_INPUTS_CYCLES_TREE:
            with self.subTest(input_):
                # assertRaises instead of try/except: a bare try/except lets
                # the subtest pass silently when no exception is raised.
                with self.assertRaises(tools.CycleFoundException) as ctx:
                    tools.build_async_tree(input_)
                self.assertEqual(ctx.exception.cycles, cycles)
class TestAsyncioToolsTable(unittest.TestCase):
    """Fixture-driven checks of tools.build_task_table."""

    def test_asyncio_utils(self):
        """Each TEST_INPUTS_TABLE input produces exactly its expected rows."""
        for awaited_by_dump, expected_rows in TEST_INPUTS_TABLE:
            with self.subTest(awaited_by_dump):
                actual_rows = tools.build_task_table(awaited_by_dump)
                self.assertEqual(actual_rows, expected_rows)
class TestAsyncioToolsBasic(unittest.TestCase):
def test_empty_input_tree(self):
"""Test build_async_tree with empty input."""
result = []
expected_output = []
self.assertEqual(tools.build_async_tree(result), expected_output)
def test_empty_input_table(self):
"""Test build_task_table with empty input."""
result = []
expected_output = []
self.assertEqual(tools.build_task_table(result), expected_output)
def test_only_independent_tasks_tree(self):
input_ = [(1, [(10, "taskA", []), (11, "taskB", [])])]
expected = [["└── (T) taskA"], ["└── (T) taskB"]]
result = tools.build_async_tree(input_)
self.assertEqual(sorted(result), sorted(expected))
def test_only_independent_tasks_table(self):
input_ = [(1, [(10, "taskA", []), (11, "taskB", [])])]
self.assertEqual(
tools.build_task_table(input_),
[[1, "0xa", "taskA", "", "", "0x0"], [1, "0xb", "taskB", "", "", "0x0"]],
)
def test_single_task_tree(self):
"""Test build_async_tree with a single task and no awaits."""
result = [
(
1,
[
(2, "Task-1", []),
],
)
]
expected_output = [
[
"└── (T) Task-1",
]
]
self.assertEqual(tools.build_async_tree(result), expected_output)
def test_single_task_table(self):
"""Test build_task_table with a single task and no awaits."""
result = [
(
1,
[
(2, "Task-1", []),
],
)
]
expected_output = [[1, "0x2", "Task-1", "", "", "0x0"]]
self.assertEqual(tools.build_task_table(result), expected_output)
def test_cycle_detection(self):
"""Test build_async_tree raises CycleFoundException for cyclic input."""
result = [
(
1,
[
(2, "Task-1", [[["main"], 3]]),
(3, "Task-2", [[["main"], 2]]),
],
)
]
with self.assertRaises(tools.CycleFoundException) as context:
tools.build_async_tree(result)
self.assertEqual(context.exception.cycles, [[3, 2, 3]])
def test_complex_tree(self):
"""Test build_async_tree with a more complex tree structure."""
result = [
(
1,
[
(2, "Task-1", []),
(3, "Task-2", [[["main"], 2]]),
(4, "Task-3", [[["main"], 3]]),
],
)
]
expected_output = [
[
"└── (T) Task-1",
" └── main",
" └── (T) Task-2",
" └── main",
" └── (T) Task-3",
]
]
self.assertEqual(tools.build_async_tree(result), expected_output)
def test_complex_table(self):
"""Test build_task_table with a more complex tree structure."""
result = [
(
1,
[
(2, "Task-1", []),
(3, "Task-2", [[["main"], 2]]),
(4, "Task-3", [[["main"], 3]]),
],
)
]
expected_output = [
[1, "0x2", "Task-1", "", "", "0x0"],
[1, "0x3", "Task-2", "main", "Task-1", "0x2"],
[1, "0x4", "Task-3", "main", "Task-2", "0x3"],
]
self.assertEqual(tools.build_task_table(result), expected_output)
def test_deep_coroutine_chain(self):
input_ = [
(
1,
[
(10, "leaf", [[["c1", "c2", "c3", "c4", "c5"], 11]]),
(11, "root", []),
],
)
]
expected = [
[
"└── (T) root",
" └── c5",
" └── c4",
" └── c3",
" └── c2",
" └── c1",
" └── (T) leaf",
]
]
result = tools.build_async_tree(input_)
self.assertEqual(result, expected)
def test_multiple_cycles_same_node(self):
input_ = [
(
1,
[
(1, "Task-A", [[["call1"], 2]]),
(2, "Task-B", [[["call2"], 3]]),
(3, "Task-C", [[["call3"], 1], [["call4"], 2]]),
],
)
]
with self.assertRaises(tools.CycleFoundException) as ctx:
tools.build_async_tree(input_)
cycles = ctx.exception.cycles
self.assertTrue(any(set(c) == {1, 2, 3} for c in cycles))
def test_table_output_format(self):
input_ = [(1, [(1, "Task-A", [[["foo"], 2]]), (2, "Task-B", [])])]
table = tools.build_task_table(input_)
for row in table:
self.assertEqual(len(row), 6)
self.assertIsInstance(row[0], int) # thread ID
self.assertTrue(
isinstance(row[1], str) and row[1].startswith("0x")
) # hex task ID
self.assertIsInstance(row[2], str) # task name
self.assertIsInstance(row[3], str) # coroutine chain
self.assertIsInstance(row[4], str) # awaiter name
self.assertTrue(
isinstance(row[5], str) and row[5].startswith("0x")
) # hex awaiter ID
class TestAsyncioToolsEdgeCases(unittest.TestCase):
def test_task_awaits_self(self):
"""A task directly awaits itself - should raise a cycle."""
input_ = [(1, [(1, "Self-Awaiter", [[["loopback"], 1]])])]
with self.assertRaises(tools.CycleFoundException) as ctx:
tools.build_async_tree(input_)
self.assertIn([1, 1], ctx.exception.cycles)
def test_task_with_missing_awaiter_id(self):
"""Awaiter ID not in task list - should not crash, just show 'Unknown'."""
input_ = [(1, [(1, "Task-A", [[["coro"], 999]])])] # 999 not defined
table = tools.build_task_table(input_)
self.assertEqual(len(table), 1)
self.assertEqual(table[0][4], "Unknown")
def test_duplicate_coroutine_frames(self):
"""Same coroutine frame repeated under a parent - should deduplicate."""
input_ = [
(
1,
[
(1, "Task-1", [[["frameA"], 2], [["frameA"], 3]]),
(2, "Task-2", []),
(3, "Task-3", []),
],
)
]
tree = tools.build_async_tree(input_)
# Both children should be under the same coroutine node
flat = "\n".join(tree[0])
self.assertIn("frameA", flat)
self.assertIn("Task-2", flat)
self.assertIn("Task-1", flat)
flat = "\n".join(tree[1])
self.assertIn("frameA", flat)
self.assertIn("Task-3", flat)
self.assertIn("Task-1", flat)
def test_task_with_no_name(self):
"""Task with no name in id2name - should still render with fallback."""
input_ = [(1, [(1, "root", [[["f1"], 2]]), (2, None, [])])]
# If name is None, fallback to string should not crash
tree = tools.build_async_tree(input_)
self.assertIn("(T) None", "\n".join(tree[0]))
def test_tree_rendering_with_custom_emojis(self):
"""Pass custom emojis to the tree renderer."""
input_ = [(1, [(1, "MainTask", [[["f1", "f2"], 2]]), (2, "SubTask", [])])]
tree = tools.build_async_tree(input_, task_emoji="🧵", cor_emoji="🔁")
flat = "\n".join(tree[0])
self.assertIn("🧵 MainTask", flat)
self.assertIn("🔁 f1", flat)
self.assertIn("🔁 f2", flat)
self.assertIn("🧵 SubTask", flat)