mirror of
https://github.com/python/cpython.git
synced 2025-09-26 18:29:57 +00:00
bpo-32751: Wait for task cancellation in asyncio.wait_for() (GH-7216)
Currently, asyncio.wait_for(fut), upon reaching the timeout deadline, cancels the future and returns immediately. This is problematic for when *fut* is a Task, because it will be left running for an arbitrary amount of time. This behavior is iself surprising and may lead to related bugs such as the one described in bpo-33638: condition = asyncio.Condition() async with condition: await asyncio.wait_for(condition.wait(), timeout=0.5) Currently, instead of raising a TimeoutError, the above code will fail with `RuntimeError: cannot wait on un-acquired lock`, because `__aexit__` is reached _before_ `condition.wait()` finishes its cancellation and re-acquires the condition lock. To resolve this, make `wait_for` await for the task cancellation. The tradeoff here is that the `timeout` promise may be broken if the task decides to handle its cancellation in a slow way. This represents a behavior change and should probably not be back-patched to 3.6 and earlier.
This commit is contained in:
parent
863b674909
commit
e2b340ab41
5 changed files with 100 additions and 3 deletions
|
@ -790,7 +790,9 @@ Task functions
|
||||||
|
|
||||||
Returns result of the Future or coroutine. When a timeout occurs, it
|
Returns result of the Future or coroutine. When a timeout occurs, it
|
||||||
cancels the task and raises :exc:`asyncio.TimeoutError`. To avoid the task
|
cancels the task and raises :exc:`asyncio.TimeoutError`. To avoid the task
|
||||||
cancellation, wrap it in :func:`shield`.
|
cancellation, wrap it in :func:`shield`. The function will wait until
|
||||||
|
the future is actually cancelled, so the total wait time may exceed
|
||||||
|
the *timeout*.
|
||||||
|
|
||||||
If the wait is cancelled, the future *fut* is also cancelled.
|
If the wait is cancelled, the future *fut* is also cancelled.
|
||||||
|
|
||||||
|
@ -800,3 +802,8 @@ Task functions
|
||||||
|
|
||||||
.. versionchanged:: 3.4.3
|
.. versionchanged:: 3.4.3
|
||||||
If the wait is cancelled, the future *fut* is now also cancelled.
|
If the wait is cancelled, the future *fut* is now also cancelled.
|
||||||
|
|
||||||
|
.. versionchanged:: 3.7
|
||||||
|
When *fut* is cancelled due to a timeout, ``wait_for`` now waits
|
||||||
|
for *fut* to be cancelled. Previously,
|
||||||
|
it raised :exc:`~asyncio.TimeoutError` immediately.
|
||||||
|
|
|
@ -412,14 +412,17 @@ async def wait_for(fut, timeout, *, loop=None):
|
||||||
return fut.result()
|
return fut.result()
|
||||||
else:
|
else:
|
||||||
fut.remove_done_callback(cb)
|
fut.remove_done_callback(cb)
|
||||||
fut.cancel()
|
# We must ensure that the task is not running
|
||||||
|
# after wait_for() returns.
|
||||||
|
# See https://bugs.python.org/issue32751
|
||||||
|
await _cancel_and_wait(fut, loop=loop)
|
||||||
raise futures.TimeoutError()
|
raise futures.TimeoutError()
|
||||||
finally:
|
finally:
|
||||||
timeout_handle.cancel()
|
timeout_handle.cancel()
|
||||||
|
|
||||||
|
|
||||||
async def _wait(fs, timeout, return_when, loop):
|
async def _wait(fs, timeout, return_when, loop):
|
||||||
"""Internal helper for wait() and wait_for().
|
"""Internal helper for wait().
|
||||||
|
|
||||||
The fs argument must be a collection of Futures.
|
The fs argument must be a collection of Futures.
|
||||||
"""
|
"""
|
||||||
|
@ -461,6 +464,22 @@ async def _wait(fs, timeout, return_when, loop):
|
||||||
return done, pending
|
return done, pending
|
||||||
|
|
||||||
|
|
||||||
|
async def _cancel_and_wait(fut, loop):
|
||||||
|
"""Cancel the *fut* future or task and wait until it completes."""
|
||||||
|
|
||||||
|
waiter = loop.create_future()
|
||||||
|
cb = functools.partial(_release_waiter, waiter)
|
||||||
|
fut.add_done_callback(cb)
|
||||||
|
|
||||||
|
try:
|
||||||
|
fut.cancel()
|
||||||
|
# We cannot wait on *fut* directly to make
|
||||||
|
# sure _cancel_and_wait itself is reliably cancellable.
|
||||||
|
await waiter
|
||||||
|
finally:
|
||||||
|
fut.remove_done_callback(cb)
|
||||||
|
|
||||||
|
|
||||||
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
|
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
|
||||||
def as_completed(fs, *, loop=None, timeout=None):
|
def as_completed(fs, *, loop=None, timeout=None):
|
||||||
"""Return an iterator whose values are coroutines.
|
"""Return an iterator whose values are coroutines.
|
||||||
|
|
|
@ -807,6 +807,19 @@ class ConditionTests(test_utils.TestCase):
|
||||||
with self.assertRaises(ValueError):
|
with self.assertRaises(ValueError):
|
||||||
asyncio.Condition(lock, loop=loop)
|
asyncio.Condition(lock, loop=loop)
|
||||||
|
|
||||||
|
def test_timeout_in_block(self):
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
self.addCleanup(loop.close)
|
||||||
|
|
||||||
|
async def task_timeout():
|
||||||
|
condition = asyncio.Condition(loop=loop)
|
||||||
|
async with condition:
|
||||||
|
with self.assertRaises(asyncio.TimeoutError):
|
||||||
|
await asyncio.wait_for(condition.wait(), timeout=0.5,
|
||||||
|
loop=loop)
|
||||||
|
|
||||||
|
loop.run_until_complete(task_timeout())
|
||||||
|
|
||||||
|
|
||||||
class SemaphoreTests(test_utils.TestCase):
|
class SemaphoreTests(test_utils.TestCase):
|
||||||
|
|
||||||
|
|
|
@ -789,6 +789,62 @@ class BaseTaskTests:
|
||||||
res = loop.run_until_complete(task)
|
res = loop.run_until_complete(task)
|
||||||
self.assertEqual(res, "ok")
|
self.assertEqual(res, "ok")
|
||||||
|
|
||||||
|
def test_wait_for_waits_for_task_cancellation(self):
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
self.addCleanup(loop.close)
|
||||||
|
|
||||||
|
task_done = False
|
||||||
|
|
||||||
|
async def foo():
|
||||||
|
async def inner():
|
||||||
|
nonlocal task_done
|
||||||
|
try:
|
||||||
|
await asyncio.sleep(0.2, loop=loop)
|
||||||
|
finally:
|
||||||
|
task_done = True
|
||||||
|
|
||||||
|
inner_task = self.new_task(loop, inner())
|
||||||
|
|
||||||
|
with self.assertRaises(asyncio.TimeoutError):
|
||||||
|
await asyncio.wait_for(inner_task, timeout=0.1, loop=loop)
|
||||||
|
|
||||||
|
self.assertTrue(task_done)
|
||||||
|
|
||||||
|
loop.run_until_complete(foo())
|
||||||
|
|
||||||
|
def test_wait_for_self_cancellation(self):
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
self.addCleanup(loop.close)
|
||||||
|
|
||||||
|
async def foo():
|
||||||
|
async def inner():
|
||||||
|
try:
|
||||||
|
await asyncio.sleep(0.3, loop=loop)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
try:
|
||||||
|
await asyncio.sleep(0.3, loop=loop)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
await asyncio.sleep(0.3, loop=loop)
|
||||||
|
|
||||||
|
return 42
|
||||||
|
|
||||||
|
inner_task = self.new_task(loop, inner())
|
||||||
|
|
||||||
|
wait = asyncio.wait_for(inner_task, timeout=0.1, loop=loop)
|
||||||
|
|
||||||
|
# Test that wait_for itself is properly cancellable
|
||||||
|
# even when the initial task holds up the initial cancellation.
|
||||||
|
task = self.new_task(loop, wait)
|
||||||
|
await asyncio.sleep(0.2, loop=loop)
|
||||||
|
task.cancel()
|
||||||
|
|
||||||
|
with self.assertRaises(asyncio.CancelledError):
|
||||||
|
await task
|
||||||
|
|
||||||
|
self.assertEqual(await inner_task, 42)
|
||||||
|
|
||||||
|
loop.run_until_complete(foo())
|
||||||
|
|
||||||
def test_wait(self):
|
def test_wait(self):
|
||||||
|
|
||||||
def gen():
|
def gen():
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
When cancelling the task due to a timeout, :meth:`asyncio.wait_for` will now
|
||||||
|
wait until the cancellation is complete.
|
Loading…
Add table
Add a link
Reference in a new issue