mirror of
https://github.com/python/cpython.git
synced 2025-07-24 11:44:31 +00:00
Issue #28003: Implement PEP 525 -- Asynchronous Generators.
This commit is contained in:
parent
b96ef55d49
commit
eb6364557f
27 changed files with 2189 additions and 96 deletions
|
@ -13,7 +13,6 @@ conscious design decision, leaving the door open for keyword arguments
|
|||
to modify the meaning of the API call itself.
|
||||
"""
|
||||
|
||||
|
||||
import collections
|
||||
import concurrent.futures
|
||||
import heapq
|
||||
|
@ -28,6 +27,7 @@ import time
|
|||
import traceback
|
||||
import sys
|
||||
import warnings
|
||||
import weakref
|
||||
|
||||
from . import compat
|
||||
from . import coroutines
|
||||
|
@ -242,6 +242,13 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
self._task_factory = None
|
||||
self._coroutine_wrapper_set = False
|
||||
|
||||
# A weak set of all asynchronous generators that are being iterated
|
||||
# by the loop.
|
||||
self._asyncgens = weakref.WeakSet()
|
||||
|
||||
# Set to True when `loop.shutdown_asyncgens` is called.
|
||||
self._asyncgens_shutdown_called = False
|
||||
|
||||
def __repr__(self):
|
||||
return ('<%s running=%s closed=%s debug=%s>'
|
||||
% (self.__class__.__name__, self.is_running(),
|
||||
|
@ -333,6 +340,46 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
if self._closed:
|
||||
raise RuntimeError('Event loop is closed')
|
||||
|
||||
def _asyncgen_finalizer_hook(self, agen):
|
||||
self._asyncgens.discard(agen)
|
||||
if not self.is_closed():
|
||||
self.create_task(agen.aclose())
|
||||
|
||||
def _asyncgen_firstiter_hook(self, agen):
|
||||
if self._asyncgens_shutdown_called:
|
||||
warnings.warn(
|
||||
"asynchronous generator {!r} was scheduled after "
|
||||
"loop.shutdown_asyncgens() call".format(agen),
|
||||
ResourceWarning, source=self)
|
||||
|
||||
self._asyncgens.add(agen)
|
||||
|
||||
@coroutine
|
||||
def shutdown_asyncgens(self):
|
||||
"""Shutdown all active asynchronous generators."""
|
||||
self._asyncgens_shutdown_called = True
|
||||
|
||||
if not len(self._asyncgens):
|
||||
return
|
||||
|
||||
closing_agens = list(self._asyncgens)
|
||||
self._asyncgens.clear()
|
||||
|
||||
shutdown_coro = tasks.gather(
|
||||
*[ag.aclose() for ag in closing_agens],
|
||||
return_exceptions=True,
|
||||
loop=self)
|
||||
|
||||
results = yield from shutdown_coro
|
||||
for result, agen in zip(results, closing_agens):
|
||||
if isinstance(result, Exception):
|
||||
self.call_exception_handler({
|
||||
'message': 'an error occurred during closing of '
|
||||
'asynchronous generator {!r}'.format(agen),
|
||||
'exception': result,
|
||||
'asyncgen': agen
|
||||
})
|
||||
|
||||
def run_forever(self):
|
||||
"""Run until stop() is called."""
|
||||
self._check_closed()
|
||||
|
@ -340,6 +387,9 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
raise RuntimeError('Event loop is running.')
|
||||
self._set_coroutine_wrapper(self._debug)
|
||||
self._thread_id = threading.get_ident()
|
||||
old_agen_hooks = sys.get_asyncgen_hooks()
|
||||
sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
|
||||
finalizer=self._asyncgen_finalizer_hook)
|
||||
try:
|
||||
while True:
|
||||
self._run_once()
|
||||
|
@ -349,6 +399,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
self._stopping = False
|
||||
self._thread_id = None
|
||||
self._set_coroutine_wrapper(False)
|
||||
sys.set_asyncgen_hooks(*old_agen_hooks)
|
||||
|
||||
def run_until_complete(self, future):
|
||||
"""Run until the Future is done.
|
||||
|
@ -1179,7 +1230,9 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
- 'handle' (optional): Handle instance;
|
||||
- 'protocol' (optional): Protocol instance;
|
||||
- 'transport' (optional): Transport instance;
|
||||
- 'socket' (optional): Socket instance.
|
||||
- 'socket' (optional): Socket instance;
|
||||
- 'asyncgen' (optional): Asynchronous generator that caused
|
||||
the exception.
|
||||
|
||||
New keys maybe introduced in the future.
|
||||
|
||||
|
|
|
@ -276,7 +276,10 @@ def _format_coroutine(coro):
|
|||
try:
|
||||
coro_code = coro.gi_code
|
||||
except AttributeError:
|
||||
coro_code = coro.cr_code
|
||||
try:
|
||||
coro_code = coro.cr_code
|
||||
except AttributeError:
|
||||
return repr(coro)
|
||||
|
||||
try:
|
||||
coro_frame = coro.gi_frame
|
||||
|
|
|
@ -248,6 +248,10 @@ class AbstractEventLoop:
|
|||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def shutdown_asyncgens(self):
|
||||
"""Shutdown all active asynchronous generators."""
|
||||
raise NotImplementedError
|
||||
|
||||
# Methods scheduling callbacks. All these return Handles.
|
||||
|
||||
def _timer_handle_cancelled(self, handle):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue