GH-117881: fix athrow().throw()/asend().throw() concurrent access (GH-117882)

This commit is contained in:
Thomas Grainger 2024-05-01 07:44:01 +01:00 committed by GitHub
parent 2520eed0a5
commit fc7e1aa3c0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 235 additions and 2 deletions

View file

@ -393,6 +393,151 @@ class AsyncGenTest(unittest.TestCase):
r'anext\(\): asynchronous generator is already running'):
an.__next__()
with self.assertRaisesRegex(RuntimeError,
r"cannot reuse already awaited __anext__\(\)/asend\(\)"):
an.send(None)
def test_async_gen_asend_throw_concurrent_with_send(self):
import types
@types.coroutine
def _async_yield(v):
return (yield v)
class MyExc(Exception):
pass
async def agenfn():
while True:
try:
await _async_yield(None)
except MyExc:
pass
return
yield
agen = agenfn()
gen = agen.asend(None)
gen.send(None)
gen2 = agen.asend(None)
with self.assertRaisesRegex(RuntimeError,
r'anext\(\): asynchronous generator is already running'):
gen2.throw(MyExc)
with self.assertRaisesRegex(RuntimeError,
r"cannot reuse already awaited __anext__\(\)/asend\(\)"):
gen2.send(None)
def test_async_gen_athrow_throw_concurrent_with_send(self):
import types
@types.coroutine
def _async_yield(v):
return (yield v)
class MyExc(Exception):
pass
async def agenfn():
while True:
try:
await _async_yield(None)
except MyExc:
pass
return
yield
agen = agenfn()
gen = agen.asend(None)
gen.send(None)
gen2 = agen.athrow(MyExc)
with self.assertRaisesRegex(RuntimeError,
r'athrow\(\): asynchronous generator is already running'):
gen2.throw(MyExc)
with self.assertRaisesRegex(RuntimeError,
r"cannot reuse already awaited aclose\(\)/athrow\(\)"):
gen2.send(None)
def test_async_gen_asend_throw_concurrent_with_throw(self):
import types
@types.coroutine
def _async_yield(v):
return (yield v)
class MyExc(Exception):
pass
async def agenfn():
try:
yield
except MyExc:
pass
while True:
try:
await _async_yield(None)
except MyExc:
pass
agen = agenfn()
with self.assertRaises(StopIteration):
agen.asend(None).send(None)
gen = agen.athrow(MyExc)
gen.throw(MyExc)
gen2 = agen.asend(MyExc)
with self.assertRaisesRegex(RuntimeError,
r'anext\(\): asynchronous generator is already running'):
gen2.throw(MyExc)
with self.assertRaisesRegex(RuntimeError,
r"cannot reuse already awaited __anext__\(\)/asend\(\)"):
gen2.send(None)
def test_async_gen_athrow_throw_concurrent_with_throw(self):
import types
@types.coroutine
def _async_yield(v):
return (yield v)
class MyExc(Exception):
pass
async def agenfn():
try:
yield
except MyExc:
pass
while True:
try:
await _async_yield(None)
except MyExc:
pass
agen = agenfn()
with self.assertRaises(StopIteration):
agen.asend(None).send(None)
gen = agen.athrow(MyExc)
gen.throw(MyExc)
gen2 = agen.athrow(None)
with self.assertRaisesRegex(RuntimeError,
r'athrow\(\): asynchronous generator is already running'):
gen2.throw(MyExc)
with self.assertRaisesRegex(RuntimeError,
r"cannot reuse already awaited aclose\(\)/athrow\(\)"):
gen2.send(None)
def test_async_gen_3_arg_deprecation_warning(self):
async def gen():
yield 123
@ -1571,6 +1716,8 @@ class AsyncGenAsyncioTest(unittest.TestCase):
self.assertIsInstance(message['exception'], ZeroDivisionError)
self.assertIn('unhandled exception during asyncio.run() shutdown',
message['message'])
del message, messages
gc_collect()
def test_async_gen_expression_01(self):
async def arange(n):
@ -1624,6 +1771,7 @@ class AsyncGenAsyncioTest(unittest.TestCase):
asyncio.run(main())
self.assertEqual([], messages)
gc_collect()
def test_async_gen_await_same_anext_coro_twice(self):
async def async_iterate():
@ -1809,9 +1957,56 @@ class TestUnawaitedWarnings(unittest.TestCase):
g = gen()
with self.assertRaises(MyException):
g.aclose().throw(MyException)
del g
gc_collect()
del g
gc_collect() # does not warn unawaited
def test_asend_send_already_running(self):
@types.coroutine
def _async_yield(v):
return (yield v)
async def agenfn():
while True:
await _async_yield(1)
return
yield
agen = agenfn()
gen = agen.asend(None)
gen.send(None)
gen2 = agen.asend(None)
with self.assertRaisesRegex(RuntimeError,
r'anext\(\): asynchronous generator is already running'):
gen2.send(None)
del gen2
gc_collect() # does not warn unawaited
def test_athrow_send_already_running(self):
@types.coroutine
def _async_yield(v):
return (yield v)
async def agenfn():
while True:
await _async_yield(1)
return
yield
agen = agenfn()
gen = agen.asend(None)
gen.send(None)
gen2 = agen.athrow(Exception)
with self.assertRaisesRegex(RuntimeError,
r'athrow\(\): asynchronous generator is already running'):
gen2.send(None)
del gen2
gc_collect() # does not warn unawaited
if __name__ == "__main__":
unittest.main()