mirror of
https://github.com/python/cpython.git
synced 2025-08-04 08:59:19 +00:00
gh-86802: Fix asyncio memory leak; shielded task exceptions log once through the exception handler (gh-134331)
Co-authored-by: Łukasz Langa <lukasz@langa.pl>
This commit is contained in:
parent
f3acbb72ff
commit
f695eca60c
3 changed files with 72 additions and 7 deletions
|
@ -908,6 +908,25 @@ def gather(*coros_or_futures, return_exceptions=False):
|
||||||
return outer
|
return outer
|
||||||
|
|
||||||
|
|
||||||
|
def _log_on_exception(fut):
|
||||||
|
if fut.cancelled():
|
||||||
|
return
|
||||||
|
|
||||||
|
exc = fut.exception()
|
||||||
|
if exc is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
context = {
|
||||||
|
'message':
|
||||||
|
f'{exc.__class__.__name__} exception in shielded future',
|
||||||
|
'exception': exc,
|
||||||
|
'future': fut,
|
||||||
|
}
|
||||||
|
if fut._source_traceback:
|
||||||
|
context['source_traceback'] = fut._source_traceback
|
||||||
|
fut._loop.call_exception_handler(context)
|
||||||
|
|
||||||
|
|
||||||
def shield(arg):
|
def shield(arg):
|
||||||
"""Wait for a future, shielding it from cancellation.
|
"""Wait for a future, shielding it from cancellation.
|
||||||
|
|
||||||
|
@ -953,14 +972,11 @@ def shield(arg):
|
||||||
else:
|
else:
|
||||||
cur_task = None
|
cur_task = None
|
||||||
|
|
||||||
def _inner_done_callback(inner, cur_task=cur_task):
|
def _clear_awaited_by_callback(inner):
|
||||||
if cur_task is not None:
|
futures.future_discard_from_awaited_by(inner, cur_task)
|
||||||
futures.future_discard_from_awaited_by(inner, cur_task)
|
|
||||||
|
|
||||||
|
def _inner_done_callback(inner):
|
||||||
if outer.cancelled():
|
if outer.cancelled():
|
||||||
if not inner.cancelled():
|
|
||||||
# Mark inner's result as retrieved.
|
|
||||||
inner.exception()
|
|
||||||
return
|
return
|
||||||
|
|
||||||
if inner.cancelled():
|
if inner.cancelled():
|
||||||
|
@ -972,10 +988,16 @@ def shield(arg):
|
||||||
else:
|
else:
|
||||||
outer.set_result(inner.result())
|
outer.set_result(inner.result())
|
||||||
|
|
||||||
|
|
||||||
def _outer_done_callback(outer):
|
def _outer_done_callback(outer):
|
||||||
if not inner.done():
|
if not inner.done():
|
||||||
inner.remove_done_callback(_inner_done_callback)
|
inner.remove_done_callback(_inner_done_callback)
|
||||||
|
# Keep only one callback to log on cancel
|
||||||
|
inner.remove_done_callback(_log_on_exception)
|
||||||
|
inner.add_done_callback(_log_on_exception)
|
||||||
|
|
||||||
|
if cur_task is not None:
|
||||||
|
inner.add_done_callback(_clear_awaited_by_callback)
|
||||||
|
|
||||||
|
|
||||||
inner.add_done_callback(_inner_done_callback)
|
inner.add_done_callback(_inner_done_callback)
|
||||||
outer.add_done_callback(_outer_done_callback)
|
outer.add_done_callback(_outer_done_callback)
|
||||||
|
|
|
@ -2116,6 +2116,46 @@ class BaseTaskTests:
|
||||||
self.assertTrue(outer.cancelled())
|
self.assertTrue(outer.cancelled())
|
||||||
self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks))
|
self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks))
|
||||||
|
|
||||||
|
def test_shield_cancel_outer_result(self):
|
||||||
|
mock_handler = mock.Mock()
|
||||||
|
self.loop.set_exception_handler(mock_handler)
|
||||||
|
inner = self.new_future(self.loop)
|
||||||
|
outer = asyncio.shield(inner)
|
||||||
|
test_utils.run_briefly(self.loop)
|
||||||
|
outer.cancel()
|
||||||
|
test_utils.run_briefly(self.loop)
|
||||||
|
inner.set_result(1)
|
||||||
|
test_utils.run_briefly(self.loop)
|
||||||
|
mock_handler.assert_not_called()
|
||||||
|
|
||||||
|
def test_shield_cancel_outer_exception(self):
|
||||||
|
mock_handler = mock.Mock()
|
||||||
|
self.loop.set_exception_handler(mock_handler)
|
||||||
|
inner = self.new_future(self.loop)
|
||||||
|
outer = asyncio.shield(inner)
|
||||||
|
test_utils.run_briefly(self.loop)
|
||||||
|
outer.cancel()
|
||||||
|
test_utils.run_briefly(self.loop)
|
||||||
|
inner.set_exception(Exception('foo'))
|
||||||
|
test_utils.run_briefly(self.loop)
|
||||||
|
mock_handler.assert_called_once()
|
||||||
|
|
||||||
|
def test_shield_duplicate_log_once(self):
|
||||||
|
mock_handler = mock.Mock()
|
||||||
|
self.loop.set_exception_handler(mock_handler)
|
||||||
|
inner = self.new_future(self.loop)
|
||||||
|
outer = asyncio.shield(inner)
|
||||||
|
test_utils.run_briefly(self.loop)
|
||||||
|
outer.cancel()
|
||||||
|
test_utils.run_briefly(self.loop)
|
||||||
|
outer = asyncio.shield(inner)
|
||||||
|
test_utils.run_briefly(self.loop)
|
||||||
|
outer.cancel()
|
||||||
|
test_utils.run_briefly(self.loop)
|
||||||
|
inner.set_exception(Exception('foo'))
|
||||||
|
test_utils.run_briefly(self.loop)
|
||||||
|
mock_handler.assert_called_once()
|
||||||
|
|
||||||
def test_shield_shortcut(self):
|
def test_shield_shortcut(self):
|
||||||
fut = self.new_future(self.loop)
|
fut = self.new_future(self.loop)
|
||||||
fut.set_result(42)
|
fut.set_result(42)
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
Fixed asyncio memory leak in cancelled shield tasks. For shielded tasks
|
||||||
|
where the shield was cancelled, log potential exceptions through the
|
||||||
|
exception handler. Contributed by Christian Harries.
|
Loading…
Add table
Add a link
Reference in a new issue