mirror of
https://github.com/python/cpython.git
synced 2025-08-10 11:58:39 +00:00
[3.12] gh-111085: Fix invalid state handling in TaskGroup and Timeout (GH-111111) (GH-111171)
asyncio.TaskGroup and asyncio.Timeout classes now raise proper RuntimeError
if they are improperly used.
* When they are used without entering the context manager.
* When they are used after finishing.
* When the context manager is entered more than once (simultaneously or
sequentially).
* If there is no current task when entering the context manager.
They now remain in a consistent state after an exception is thrown,
so subsequent operations can be performed correctly (if they are allowed).
(cherry picked from commit 6c23635f2b
)
Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
Co-authored-by: James Hilton-Balfe <gobot1234yt@gmail.com>
This commit is contained in:
parent
322f79f5a2
commit
028f47754c
6 changed files with 121 additions and 10 deletions
|
@ -8,6 +8,8 @@ import contextlib
|
|||
from asyncio import taskgroups
|
||||
import unittest
|
||||
|
||||
from test.test_asyncio.utils import await_without_task
|
||||
|
||||
|
||||
# To prevent a warning "test altered the execution environment"
|
||||
def tearDownModule():
|
||||
|
@ -779,6 +781,49 @@ class TestTaskGroup(unittest.IsolatedAsyncioTestCase):
|
|||
|
||||
await asyncio.create_task(main())
|
||||
|
||||
async def test_taskgroup_already_entered(self):
|
||||
tg = taskgroups.TaskGroup()
|
||||
async with tg:
|
||||
with self.assertRaisesRegex(RuntimeError, "has already been entered"):
|
||||
async with tg:
|
||||
pass
|
||||
|
||||
async def test_taskgroup_double_enter(self):
|
||||
tg = taskgroups.TaskGroup()
|
||||
async with tg:
|
||||
pass
|
||||
with self.assertRaisesRegex(RuntimeError, "has already been entered"):
|
||||
async with tg:
|
||||
pass
|
||||
|
||||
async def test_taskgroup_finished(self):
|
||||
tg = taskgroups.TaskGroup()
|
||||
async with tg:
|
||||
pass
|
||||
coro = asyncio.sleep(0)
|
||||
with self.assertRaisesRegex(RuntimeError, "is finished"):
|
||||
tg.create_task(coro)
|
||||
# We still have to await coro to avoid a warning
|
||||
await coro
|
||||
|
||||
async def test_taskgroup_not_entered(self):
|
||||
tg = taskgroups.TaskGroup()
|
||||
coro = asyncio.sleep(0)
|
||||
with self.assertRaisesRegex(RuntimeError, "has not been entered"):
|
||||
tg.create_task(coro)
|
||||
# We still have to await coro to avoid a warning
|
||||
await coro
|
||||
|
||||
async def test_taskgroup_without_parent_task(self):
|
||||
tg = taskgroups.TaskGroup()
|
||||
with self.assertRaisesRegex(RuntimeError, "parent task"):
|
||||
await await_without_task(tg.__aenter__())
|
||||
coro = asyncio.sleep(0)
|
||||
with self.assertRaisesRegex(RuntimeError, "has not been entered"):
|
||||
tg.create_task(coro)
|
||||
# We still have to await coro to avoid a warning
|
||||
await coro
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue