mirror of
https://github.com/python/cpython.git
synced 2025-08-30 21:48:47 +00:00
gh-111085: Fix invalid state handling in TaskGroup and Timeout (#111111)
asyncio.TaskGroup and asyncio.Timeout classes now raise proper RuntimeError if they are improperly used. * When they are used without entering the context manager. * When they are used after finishing. * When the context manager is entered more than once (simultaneously or sequentially). * If there is no current task when entering the context manager. They now remain in a consistent state after an exception is thrown, so subsequent operations can be performed correctly (if they are allowed). Co-authored-by: James Hilton-Balfe <gobot1234yt@gmail.com>
This commit is contained in:
parent
fd60549c0a
commit
6c23635f2b
6 changed files with 121 additions and 10 deletions
|
@ -8,6 +8,8 @@ import contextlib
|
|||
from asyncio import taskgroups
|
||||
import unittest
|
||||
|
||||
from test.test_asyncio.utils import await_without_task
|
||||
|
||||
|
||||
# To prevent a warning "test altered the execution environment"
|
||||
def tearDownModule():
|
||||
|
@ -779,6 +781,49 @@ class TestTaskGroup(unittest.IsolatedAsyncioTestCase):
|
|||
|
||||
await asyncio.create_task(main())
|
||||
|
||||
async def test_taskgroup_already_entered(self):
|
||||
tg = taskgroups.TaskGroup()
|
||||
async with tg:
|
||||
with self.assertRaisesRegex(RuntimeError, "has already been entered"):
|
||||
async with tg:
|
||||
pass
|
||||
|
||||
async def test_taskgroup_double_enter(self):
|
||||
tg = taskgroups.TaskGroup()
|
||||
async with tg:
|
||||
pass
|
||||
with self.assertRaisesRegex(RuntimeError, "has already been entered"):
|
||||
async with tg:
|
||||
pass
|
||||
|
||||
async def test_taskgroup_finished(self):
|
||||
tg = taskgroups.TaskGroup()
|
||||
async with tg:
|
||||
pass
|
||||
coro = asyncio.sleep(0)
|
||||
with self.assertRaisesRegex(RuntimeError, "is finished"):
|
||||
tg.create_task(coro)
|
||||
# We still have to await coro to avoid a warning
|
||||
await coro
|
||||
|
||||
async def test_taskgroup_not_entered(self):
|
||||
tg = taskgroups.TaskGroup()
|
||||
coro = asyncio.sleep(0)
|
||||
with self.assertRaisesRegex(RuntimeError, "has not been entered"):
|
||||
tg.create_task(coro)
|
||||
# We still have to await coro to avoid a warning
|
||||
await coro
|
||||
|
||||
async def test_taskgroup_without_parent_task(self):
|
||||
tg = taskgroups.TaskGroup()
|
||||
with self.assertRaisesRegex(RuntimeError, "parent task"):
|
||||
await await_without_task(tg.__aenter__())
|
||||
coro = asyncio.sleep(0)
|
||||
with self.assertRaisesRegex(RuntimeError, "has not been entered"):
|
||||
tg.create_task(coro)
|
||||
# We still have to await coro to avoid a warning
|
||||
await coro
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue