mirror of
https://github.com/python/cpython.git
synced 2025-09-26 10:19:53 +00:00
[3.12] gh-124309: Revert eager task factory fix to prevent breaking downstream (GH-124810) (#124817)
gh-124309: Revert eager task factory fix to prevent breaking downstream (GH-124810) * Revert "GH-124639: add back loop param to staggered_race (GH-124700)" This reverts commite0a41a5dd1
. * Revert "gh-124309: Modernize the `staggered_race` implementation to support eager task factories (GH-124390)" This reverts commitde929f353c
. (cherry picked from commit133e929a79
) Co-authored-by: Peter Bierma <zintensitydev@gmail.com>
This commit is contained in:
parent
ffc6a10149
commit
3b5bc8d228
5 changed files with 64 additions and 123 deletions
|
@ -1110,7 +1110,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
||||||
(functools.partial(self._connect_sock,
|
(functools.partial(self._connect_sock,
|
||||||
exceptions, addrinfo, laddr_infos)
|
exceptions, addrinfo, laddr_infos)
|
||||||
for addrinfo in infos),
|
for addrinfo in infos),
|
||||||
happy_eyeballs_delay)
|
happy_eyeballs_delay, loop=self)
|
||||||
|
|
||||||
if sock is None:
|
if sock is None:
|
||||||
exceptions = [exc for sub in exceptions for exc in sub]
|
exceptions = [exc for sub in exceptions for exc in sub]
|
||||||
|
|
|
@ -4,12 +4,11 @@ __all__ = 'staggered_race',
|
||||||
|
|
||||||
import contextlib
|
import contextlib
|
||||||
|
|
||||||
|
from . import events
|
||||||
|
from . import exceptions as exceptions_mod
|
||||||
from . import locks
|
from . import locks
|
||||||
from . import tasks
|
from . import tasks
|
||||||
from . import taskgroups
|
|
||||||
|
|
||||||
class _Done(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def staggered_race(coro_fns, delay, *, loop=None):
|
async def staggered_race(coro_fns, delay, *, loop=None):
|
||||||
"""Run coroutines with staggered start times and take the first to finish.
|
"""Run coroutines with staggered start times and take the first to finish.
|
||||||
|
@ -43,6 +42,8 @@ async def staggered_race(coro_fns, delay, *, loop=None):
|
||||||
delay: amount of time, in seconds, between starting coroutines. If
|
delay: amount of time, in seconds, between starting coroutines. If
|
||||||
``None``, the coroutines will run sequentially.
|
``None``, the coroutines will run sequentially.
|
||||||
|
|
||||||
|
loop: the event loop to use.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
tuple *(winner_result, winner_index, exceptions)* where
|
tuple *(winner_result, winner_index, exceptions)* where
|
||||||
|
|
||||||
|
@ -61,11 +62,36 @@ async def staggered_race(coro_fns, delay, *, loop=None):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
# TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
|
# TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
|
||||||
|
loop = loop or events.get_running_loop()
|
||||||
|
enum_coro_fns = enumerate(coro_fns)
|
||||||
winner_result = None
|
winner_result = None
|
||||||
winner_index = None
|
winner_index = None
|
||||||
exceptions = []
|
exceptions = []
|
||||||
|
running_tasks = []
|
||||||
|
|
||||||
|
async def run_one_coro(previous_failed) -> None:
|
||||||
|
# Wait for the previous task to finish, or for delay seconds
|
||||||
|
if previous_failed is not None:
|
||||||
|
with contextlib.suppress(exceptions_mod.TimeoutError):
|
||||||
|
# Use asyncio.wait_for() instead of asyncio.wait() here, so
|
||||||
|
# that if we get cancelled at this point, Event.wait() is also
|
||||||
|
# cancelled, otherwise there will be a "Task destroyed but it is
|
||||||
|
# pending" later.
|
||||||
|
await tasks.wait_for(previous_failed.wait(), delay)
|
||||||
|
# Get the next coroutine to run
|
||||||
|
try:
|
||||||
|
this_index, coro_fn = next(enum_coro_fns)
|
||||||
|
except StopIteration:
|
||||||
|
return
|
||||||
|
# Start task that will run the next coroutine
|
||||||
|
this_failed = locks.Event()
|
||||||
|
next_task = loop.create_task(run_one_coro(this_failed))
|
||||||
|
running_tasks.append(next_task)
|
||||||
|
assert len(running_tasks) == this_index + 2
|
||||||
|
# Prepare place to put this coroutine's exceptions if not won
|
||||||
|
exceptions.append(None)
|
||||||
|
assert len(exceptions) == this_index + 1
|
||||||
|
|
||||||
async def run_one_coro(this_index, coro_fn, this_failed):
|
|
||||||
try:
|
try:
|
||||||
result = await coro_fn()
|
result = await coro_fn()
|
||||||
except (SystemExit, KeyboardInterrupt):
|
except (SystemExit, KeyboardInterrupt):
|
||||||
|
@ -79,23 +105,34 @@ async def staggered_race(coro_fns, delay, *, loop=None):
|
||||||
assert winner_index is None
|
assert winner_index is None
|
||||||
winner_index = this_index
|
winner_index = this_index
|
||||||
winner_result = result
|
winner_result = result
|
||||||
raise _Done
|
# Cancel all other tasks. We take care to not cancel the current
|
||||||
|
# task as well. If we do so, then since there is no `await` after
|
||||||
|
# here and CancelledError are usually thrown at one, we will
|
||||||
|
# encounter a curious corner case where the current task will end
|
||||||
|
# up as done() == True, cancelled() == False, exception() ==
|
||||||
|
# asyncio.CancelledError. This behavior is specified in
|
||||||
|
# https://bugs.python.org/issue30048
|
||||||
|
for i, t in enumerate(running_tasks):
|
||||||
|
if i != this_index:
|
||||||
|
t.cancel()
|
||||||
|
|
||||||
|
first_task = loop.create_task(run_one_coro(None))
|
||||||
|
running_tasks.append(first_task)
|
||||||
try:
|
try:
|
||||||
tg = taskgroups.TaskGroup()
|
# Wait for a growing list of tasks to all finish: poor man's version of
|
||||||
# Intentionally override the loop in the TaskGroup to avoid
|
# curio's TaskGroup or trio's nursery
|
||||||
# using the running loop, preserving backwards compatibility
|
done_count = 0
|
||||||
# TaskGroup only starts using `_loop` after `__aenter__`
|
while done_count != len(running_tasks):
|
||||||
# so overriding it here is safe.
|
done, _ = await tasks.wait(running_tasks)
|
||||||
tg._loop = loop
|
done_count = len(done)
|
||||||
async with tg:
|
# If run_one_coro raises an unhandled exception, it's probably a
|
||||||
for this_index, coro_fn in enumerate(coro_fns):
|
# programming error, and I want to see it.
|
||||||
this_failed = locks.Event()
|
if __debug__:
|
||||||
exceptions.append(None)
|
for d in done:
|
||||||
tg.create_task(run_one_coro(this_index, coro_fn, this_failed))
|
if d.done() and not d.cancelled() and d.exception():
|
||||||
with contextlib.suppress(TimeoutError):
|
raise d.exception()
|
||||||
await tasks.wait_for(this_failed.wait(), delay)
|
return winner_result, winner_index, exceptions
|
||||||
except* _Done:
|
finally:
|
||||||
pass
|
# Make sure no tasks are left running if we leave this function
|
||||||
|
for t in running_tasks:
|
||||||
return winner_result, winner_index, exceptions
|
t.cancel()
|
||||||
|
|
|
@ -218,53 +218,6 @@ class EagerTaskFactoryLoopTests:
|
||||||
|
|
||||||
self.run_coro(run())
|
self.run_coro(run())
|
||||||
|
|
||||||
def test_staggered_race_with_eager_tasks(self):
|
|
||||||
# See https://github.com/python/cpython/issues/124309
|
|
||||||
|
|
||||||
async def fail():
|
|
||||||
await asyncio.sleep(0)
|
|
||||||
raise ValueError("no good")
|
|
||||||
|
|
||||||
async def run():
|
|
||||||
winner, index, excs = await asyncio.staggered.staggered_race(
|
|
||||||
[
|
|
||||||
lambda: asyncio.sleep(2, result="sleep2"),
|
|
||||||
lambda: asyncio.sleep(1, result="sleep1"),
|
|
||||||
lambda: fail()
|
|
||||||
],
|
|
||||||
delay=0.25
|
|
||||||
)
|
|
||||||
self.assertEqual(winner, 'sleep1')
|
|
||||||
self.assertEqual(index, 1)
|
|
||||||
self.assertIsNone(excs[index])
|
|
||||||
self.assertIsInstance(excs[0], asyncio.CancelledError)
|
|
||||||
self.assertIsInstance(excs[2], ValueError)
|
|
||||||
|
|
||||||
self.run_coro(run())
|
|
||||||
|
|
||||||
def test_staggered_race_with_eager_tasks_no_delay(self):
|
|
||||||
# See https://github.com/python/cpython/issues/124309
|
|
||||||
async def fail():
|
|
||||||
raise ValueError("no good")
|
|
||||||
|
|
||||||
async def run():
|
|
||||||
winner, index, excs = await asyncio.staggered.staggered_race(
|
|
||||||
[
|
|
||||||
lambda: fail(),
|
|
||||||
lambda: asyncio.sleep(1, result="sleep1"),
|
|
||||||
lambda: asyncio.sleep(0, result="sleep0"),
|
|
||||||
],
|
|
||||||
delay=None
|
|
||||||
)
|
|
||||||
self.assertEqual(winner, 'sleep1')
|
|
||||||
self.assertEqual(index, 1)
|
|
||||||
self.assertIsNone(excs[index])
|
|
||||||
self.assertIsInstance(excs[0], ValueError)
|
|
||||||
self.assertEqual(len(excs), 2)
|
|
||||||
|
|
||||||
self.run_coro(run())
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class PyEagerTaskFactoryLoopTests(EagerTaskFactoryLoopTests, test_utils.TestCase):
|
class PyEagerTaskFactoryLoopTests(EagerTaskFactoryLoopTests, test_utils.TestCase):
|
||||||
Task = tasks._PyTask
|
Task = tasks._PyTask
|
||||||
|
|
|
@ -82,64 +82,16 @@ class StaggeredTests(unittest.IsolatedAsyncioTestCase):
|
||||||
async def coro(index):
|
async def coro(index):
|
||||||
raise ValueError(index)
|
raise ValueError(index)
|
||||||
|
|
||||||
for delay in [None, 0, 0.1, 1]:
|
|
||||||
with self.subTest(delay=delay):
|
|
||||||
winner, index, excs = await staggered_race(
|
|
||||||
[
|
|
||||||
lambda: coro(0),
|
|
||||||
lambda: coro(1),
|
|
||||||
],
|
|
||||||
delay=delay,
|
|
||||||
)
|
|
||||||
|
|
||||||
self.assertIs(winner, None)
|
|
||||||
self.assertIs(index, None)
|
|
||||||
self.assertEqual(len(excs), 2)
|
|
||||||
self.assertIsInstance(excs[0], ValueError)
|
|
||||||
self.assertIsInstance(excs[1], ValueError)
|
|
||||||
|
|
||||||
async def test_long_delay_early_failure(self):
|
|
||||||
async def coro(index):
|
|
||||||
await asyncio.sleep(0) # Dummy coroutine for the 1 case
|
|
||||||
if index == 0:
|
|
||||||
await asyncio.sleep(0.1) # Dummy coroutine
|
|
||||||
raise ValueError(index)
|
|
||||||
|
|
||||||
return f'Res: {index}'
|
|
||||||
|
|
||||||
winner, index, excs = await staggered_race(
|
winner, index, excs = await staggered_race(
|
||||||
[
|
[
|
||||||
lambda: coro(0),
|
lambda: coro(0),
|
||||||
lambda: coro(1),
|
lambda: coro(1),
|
||||||
],
|
],
|
||||||
delay=10,
|
delay=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.assertEqual(winner, 'Res: 1')
|
self.assertIs(winner, None)
|
||||||
self.assertEqual(index, 1)
|
self.assertIs(index, None)
|
||||||
self.assertEqual(len(excs), 2)
|
self.assertEqual(len(excs), 2)
|
||||||
self.assertIsInstance(excs[0], ValueError)
|
self.assertIsInstance(excs[0], ValueError)
|
||||||
self.assertIsNone(excs[1])
|
self.assertIsInstance(excs[1], ValueError)
|
||||||
|
|
||||||
def test_loop_argument(self):
|
|
||||||
loop = asyncio.new_event_loop()
|
|
||||||
async def coro():
|
|
||||||
self.assertEqual(loop, asyncio.get_running_loop())
|
|
||||||
return 'coro'
|
|
||||||
|
|
||||||
async def main():
|
|
||||||
winner, index, excs = await staggered_race(
|
|
||||||
[coro],
|
|
||||||
delay=0.1,
|
|
||||||
loop=loop
|
|
||||||
)
|
|
||||||
|
|
||||||
self.assertEqual(winner, 'coro')
|
|
||||||
self.assertEqual(index, 0)
|
|
||||||
|
|
||||||
loop.run_until_complete(main())
|
|
||||||
loop.close()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main()
|
|
||||||
|
|
|
@ -1 +0,0 @@
|
||||||
Fixed :exc:`AssertionError` when using :func:`!asyncio.staggered.staggered_race` with :attr:`asyncio.eager_task_factory`.
|
|
Loading…
Add table
Add a link
Reference in a new issue