import asyncio import io import unittest # To prevent a warning "test altered the execution environment" def tearDownModule(): asyncio.events._set_event_loop_policy(None) def capture_test_stack(*, fut=None, depth=1): def walk(s): ret = [ (f"T<{n}>" if '-' not in (n := s.future.get_name()) else 'T') if isinstance(s.future, asyncio.Task) else 'F' ] ret.append( [ ( f"s {entry.frame.f_code.co_name}" if entry.frame.f_generator is None else ( f"a {entry.frame.f_generator.cr_code.co_name}" if hasattr(entry.frame.f_generator, 'cr_code') else f"ag {entry.frame.f_generator.ag_code.co_name}" ) ) for entry in s.call_stack ] ) ret.append( sorted([ walk(ab) for ab in s.awaited_by ], key=lambda entry: entry[0]) ) return ret buf = io.StringIO() asyncio.print_call_graph(fut, file=buf, depth=depth+1) stack = asyncio.capture_call_graph(fut, depth=depth) return walk(stack), buf.getvalue() class CallStackTestBase: async def test_stack_tgroup(self): stack_for_c5 = None def c5(): nonlocal stack_for_c5 stack_for_c5 = capture_test_stack(depth=2) async def c4(): await asyncio.sleep(0) c5() async def c3(): await c4() async def c2(): await c3() async def c1(task): await task async def main(): async with asyncio.TaskGroup() as tg: task = tg.create_task(c2(), name="c2_root") tg.create_task(c1(task), name="sub_main_1") tg.create_task(c1(task), name="sub_main_2") await main() self.assertEqual(stack_for_c5[0], [ # task name 'T', # call stack ['s c5', 'a c4', 'a c3', 'a c2'], # awaited by [ ['T', ['a _aexit', 'a __aexit__', 'a main', 'a test_stack_tgroup'], [] ], ['T', ['a c1'], [ ['T', ['a _aexit', 'a __aexit__', 'a main', 'a test_stack_tgroup'], [] ] ] ], ['T', ['a c1'], [ ['T', ['a _aexit', 'a __aexit__', 'a main', 'a test_stack_tgroup'], [] ] ] ] ] ]) self.assertIn( ' async CallStackTestBase.test_stack_tgroup()', stack_for_c5[1]) async def test_stack_async_gen(self): stack_for_gen_nested_call = None async def gen_nested_call(): nonlocal stack_for_gen_nested_call stack_for_gen_nested_call = capture_test_stack() async def gen(): for num in range(2): yield num if num == 1: await gen_nested_call() async def main(): async for el in gen(): pass await main() self.assertEqual(stack_for_gen_nested_call[0], [ 'T', [ 's capture_test_stack', 'a gen_nested_call', 'ag gen', 'a main', 'a test_stack_async_gen' ], [] ]) self.assertIn( 'async generator CallStackTestBase.test_stack_async_gen..gen()', stack_for_gen_nested_call[1]) async def test_stack_gather(self): stack_for_deep = None async def deep(): await asyncio.sleep(0) nonlocal stack_for_deep stack_for_deep = capture_test_stack() async def c1(): await asyncio.sleep(0) await deep() async def c2(): await asyncio.sleep(0) async def main(): await asyncio.gather(c1(), c2()) await main() self.assertEqual(stack_for_deep[0], [ 'T', ['s capture_test_stack', 'a deep', 'a c1'], [ ['T', ['a main', 'a test_stack_gather'], []] ] ]) async def test_stack_shield(self): stack_for_shield = None async def deep(): await asyncio.sleep(0) nonlocal stack_for_shield stack_for_shield = capture_test_stack() async def c1(): await asyncio.sleep(0) await deep() async def main(): await asyncio.shield(c1()) await main() self.assertEqual(stack_for_shield[0], [ 'T', ['s capture_test_stack', 'a deep', 'a c1'], [ ['T', ['a main', 'a test_stack_shield'], []] ] ]) async def test_stack_timeout(self): stack_for_inner = None async def inner(): await asyncio.sleep(0) nonlocal stack_for_inner stack_for_inner = capture_test_stack() async def c1(): async with asyncio.timeout(1): await asyncio.sleep(0) await inner() async def main(): await asyncio.shield(c1()) await main() self.assertEqual(stack_for_inner[0], [ 'T', ['s capture_test_stack', 'a inner', 'a c1'], [ ['T', ['a main', 'a test_stack_timeout'], []] ] ]) async def test_stack_wait(self): stack_for_inner = None async def inner(): await asyncio.sleep(0) nonlocal stack_for_inner stack_for_inner = capture_test_stack() async def c1(): async with asyncio.timeout(1): await asyncio.sleep(0) await inner() async def c2(): for i in range(3): await asyncio.sleep(0) async def main(t1, t2): while True: _, pending = await asyncio.wait([t1, t2]) if not pending: break t1 = asyncio.create_task(c1()) t2 = asyncio.create_task(c2()) try: await main(t1, t2) finally: await t1 await t2 self.assertEqual(stack_for_inner[0], [ 'T', ['s capture_test_stack', 'a inner', 'a c1'], [ ['T', ['a _wait', 'a wait', 'a main', 'a test_stack_wait'], [] ] ] ]) async def test_stack_task(self): stack_for_inner = None async def inner(): await asyncio.sleep(0) nonlocal stack_for_inner stack_for_inner = capture_test_stack() async def c1(): await inner() async def c2(): await asyncio.create_task(c1(), name='there there') async def main(): await c2() await main() self.assertEqual(stack_for_inner[0], [ 'T', ['s capture_test_stack', 'a inner', 'a c1'], [['T', ['a c2', 'a main', 'a test_stack_task'], []]] ]) async def test_stack_future(self): stack_for_fut = None async def a2(fut): await fut async def a1(fut): await a2(fut) async def b1(fut): await fut async def main(): nonlocal stack_for_fut fut = asyncio.Future() async with asyncio.TaskGroup() as g: g.create_task(a1(fut), name="task A") g.create_task(b1(fut), name='task B') for _ in range(5): # Do a few iterations to ensure that both a1 and b1 # await on the future await asyncio.sleep(0) stack_for_fut = capture_test_stack(fut=fut) fut.set_result(None) await main() self.assertEqual(stack_for_fut[0], ['F', [], [ ['T', ['a a2', 'a a1'], [['T', ['a test_stack_future'], []]] ], ['T', ['a b1'], [['T', ['a test_stack_future'], []]] ], ]] ) self.assertTrue(stack_for_fut[1].startswith('* Future(id=')) @unittest.skipIf( not hasattr(asyncio.futures, "_c_future_add_to_awaited_by"), "C-accelerated asyncio call graph backend missing", ) class TestCallStackC(CallStackTestBase, unittest.IsolatedAsyncioTestCase): def setUp(self): futures = asyncio.futures tasks = asyncio.tasks self._Future = asyncio.Future asyncio.Future = futures.Future = futures._CFuture self._Task = asyncio.Task asyncio.Task = tasks.Task = tasks._CTask self._future_add_to_awaited_by = asyncio.future_add_to_awaited_by futures.future_add_to_awaited_by = futures._c_future_add_to_awaited_by asyncio.future_add_to_awaited_by = futures.future_add_to_awaited_by self._future_discard_from_awaited_by = asyncio.future_discard_from_awaited_by futures.future_discard_from_awaited_by = futures._c_future_discard_from_awaited_by asyncio.future_discard_from_awaited_by = futures.future_discard_from_awaited_by self._current_task = asyncio.current_task asyncio.current_task = asyncio.tasks.current_task = tasks._c_current_task def tearDown(self): futures = asyncio.futures tasks = asyncio.tasks futures.future_discard_from_awaited_by = self._future_discard_from_awaited_by asyncio.future_discard_from_awaited_by = self._future_discard_from_awaited_by del self._future_discard_from_awaited_by futures.future_add_to_awaited_by = self._future_add_to_awaited_by asyncio.future_add_to_awaited_by = self._future_add_to_awaited_by del self._future_add_to_awaited_by asyncio.Task = self._Task tasks.Task = self._Task del self._Task asyncio.Future = self._Future futures.Future = self._Future del self._Future asyncio.current_task = asyncio.tasks.current_task = self._current_task @unittest.skipIf( not hasattr(asyncio.futures, "_py_future_add_to_awaited_by"), "Pure Python asyncio call graph backend missing", ) class TestCallStackPy(CallStackTestBase, unittest.IsolatedAsyncioTestCase): def setUp(self): futures = asyncio.futures tasks = asyncio.tasks self._Future = asyncio.Future asyncio.Future = futures.Future = futures._PyFuture self._Task = asyncio.Task asyncio.Task = tasks.Task = tasks._PyTask self._future_add_to_awaited_by = asyncio.future_add_to_awaited_by futures.future_add_to_awaited_by = futures._py_future_add_to_awaited_by asyncio.future_add_to_awaited_by = futures.future_add_to_awaited_by self._future_discard_from_awaited_by = asyncio.future_discard_from_awaited_by futures.future_discard_from_awaited_by = futures._py_future_discard_from_awaited_by asyncio.future_discard_from_awaited_by = futures.future_discard_from_awaited_by self._current_task = asyncio.current_task asyncio.current_task = asyncio.tasks.current_task = tasks._py_current_task def tearDown(self): futures = asyncio.futures tasks = asyncio.tasks futures.future_discard_from_awaited_by = self._future_discard_from_awaited_by asyncio.future_discard_from_awaited_by = self._future_discard_from_awaited_by del self._future_discard_from_awaited_by futures.future_add_to_awaited_by = self._future_add_to_awaited_by asyncio.future_add_to_awaited_by = self._future_add_to_awaited_by del self._future_add_to_awaited_by asyncio.Task = self._Task tasks.Task = self._Task del self._Task asyncio.Future = self._Future futures.Future = self._Future del self._Future asyncio.current_task = asyncio.tasks.current_task = self._current_task