gh-115957: Close coroutine if TaskGroup.create_task() raises an error (#116009)

This commit is contained in:
Jason Zhang 2024-03-06 20:20:26 +00:00 committed by GitHub
parent 7114cf20c0
commit ce0ae1d784
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 42 additions and 16 deletions

View file

@ -7,6 +7,7 @@ import contextvars
import contextlib
from asyncio import taskgroups
import unittest
import warnings
from test.test_asyncio.utils import await_without_task
@ -738,10 +739,7 @@ class TestTaskGroup(unittest.IsolatedAsyncioTestCase):
await asyncio.sleep(1)
except asyncio.CancelledError:
with self.assertRaises(RuntimeError):
g.create_task(c1 := coro1())
# We still have to await c1 to avoid a warning
with self.assertRaises(ZeroDivisionError):
await c1
g.create_task(coro1())
with self.assertRaises(ExceptionGroup) as cm:
async with taskgroups.TaskGroup() as g:
@ -797,22 +795,25 @@ class TestTaskGroup(unittest.IsolatedAsyncioTestCase):
pass
async def test_taskgroup_finished(self):
tg = taskgroups.TaskGroup()
async with tg:
pass
coro = asyncio.sleep(0)
with self.assertRaisesRegex(RuntimeError, "is finished"):
tg.create_task(coro)
# We still have to await coro to avoid a warning
await coro
async def create_task_after_tg_finish():
tg = taskgroups.TaskGroup()
async with tg:
pass
coro = asyncio.sleep(0)
with self.assertRaisesRegex(RuntimeError, "is finished"):
tg.create_task(coro)
# Make sure the coroutine was closed when submitted to the inactive tg
# (if not closed, a RuntimeWarning should have been raised)
with warnings.catch_warnings(record=True) as w:
await create_task_after_tg_finish()
self.assertEqual(len(w), 0)
async def test_taskgroup_not_entered(self):
tg = taskgroups.TaskGroup()
coro = asyncio.sleep(0)
with self.assertRaisesRegex(RuntimeError, "has not been entered"):
tg.create_task(coro)
# We still have to await coro to avoid a warning
await coro
async def test_taskgroup_without_parent_task(self):
tg = taskgroups.TaskGroup()
@ -821,8 +822,16 @@ class TestTaskGroup(unittest.IsolatedAsyncioTestCase):
coro = asyncio.sleep(0)
with self.assertRaisesRegex(RuntimeError, "has not been entered"):
tg.create_task(coro)
# We still have to await coro to avoid a warning
await coro
def test_coro_closed_when_tg_closed(self):
async def run_coro_after_tg_closes():
async with taskgroups.TaskGroup() as tg:
pass
coro = asyncio.sleep(0)
with self.assertRaisesRegex(RuntimeError, "is finished"):
tg.create_task(coro)
loop = asyncio.get_event_loop()
loop.run_until_complete(run_coro_after_tg_closes())
if __name__ == "__main__":