bpo-34037: Fix test_asyncio failure and add loop.shutdown_default_executor() (GH-15735)

This commit is contained in:
Kyle Stanley 2019-09-19 08:47:22 -04:00 committed by Andrew Svetlov
parent 3171d67a6a
commit 9fdc64cf12
6 changed files with 54 additions and 2 deletions

View file

@ -167,6 +167,18 @@ Running and stopping the loop
.. versionadded:: 3.6 .. versionadded:: 3.6
.. coroutinemethod:: loop.shutdown_default_executor()
Schedule the closure of the default executor and wait for it to join all of
the threads in the :class:`ThreadPoolExecutor`. After calling this method, a
:exc:`RuntimeError` will be raised if :meth:`loop.run_in_executor` is called
while using the default executor.
Note that there is no need to call this function when
:func:`asyncio.run` is used.
.. versionadded:: 3.9
Scheduling callbacks Scheduling callbacks
^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^

View file

@ -213,8 +213,8 @@ Running an asyncio Program
.. function:: run(coro, \*, debug=False) .. function:: run(coro, \*, debug=False)
This function runs the passed coroutine, taking care of This function runs the passed coroutine, taking care of
managing the asyncio event loop and *finalizing asynchronous managing the asyncio event loop, *finalizing asynchronous
generators*. generators*, and closing the threadpool.
This function cannot be called when another asyncio event loop is This function cannot be called when another asyncio event loop is
running in the same thread. running in the same thread.
@ -229,6 +229,8 @@ Running an asyncio Program
**Important:** this function has been added to asyncio in **Important:** this function has been added to asyncio in
Python 3.7 on a :term:`provisional basis <provisional api>`. Python 3.7 on a :term:`provisional basis <provisional api>`.
.. versionchanged:: 3.9
Updated to use :meth:`loop.shutdown_default_executor`.
Creating Tasks Creating Tasks
============== ==============

View file

@ -406,6 +406,8 @@ class BaseEventLoop(events.AbstractEventLoop):
self._asyncgens = weakref.WeakSet() self._asyncgens = weakref.WeakSet()
# Set to True when `loop.shutdown_asyncgens` is called. # Set to True when `loop.shutdown_asyncgens` is called.
self._asyncgens_shutdown_called = False self._asyncgens_shutdown_called = False
# Set to True when `loop.shutdown_default_executor` is called.
self._executor_shutdown_called = False
def __repr__(self): def __repr__(self):
return ( return (
@ -503,6 +505,10 @@ class BaseEventLoop(events.AbstractEventLoop):
if self._closed: if self._closed:
raise RuntimeError('Event loop is closed') raise RuntimeError('Event loop is closed')
def _check_default_executor(self):
if self._executor_shutdown_called:
raise RuntimeError('Executor shutdown has been called')
def _asyncgen_finalizer_hook(self, agen): def _asyncgen_finalizer_hook(self, agen):
self._asyncgens.discard(agen) self._asyncgens.discard(agen)
if not self.is_closed(): if not self.is_closed():
@ -543,6 +549,26 @@ class BaseEventLoop(events.AbstractEventLoop):
'asyncgen': agen 'asyncgen': agen
}) })
async def shutdown_default_executor(self):
"""Schedule the shutdown of the default executor."""
self._executor_shutdown_called = True
if self._default_executor is None:
return
future = self.create_future()
thread = threading.Thread(target=self._do_shutdown, args=(future,))
thread.start()
try:
await future
finally:
thread.join()
def _do_shutdown(self, future):
try:
self._default_executor.shutdown(wait=True)
self.call_soon_threadsafe(future.set_result, None)
except Exception as ex:
self.call_soon_threadsafe(future.set_exception, ex)
def run_forever(self): def run_forever(self):
"""Run until stop() is called.""" """Run until stop() is called."""
self._check_closed() self._check_closed()
@ -632,6 +658,7 @@ class BaseEventLoop(events.AbstractEventLoop):
self._closed = True self._closed = True
self._ready.clear() self._ready.clear()
self._scheduled.clear() self._scheduled.clear()
self._executor_shutdown_called = True
executor = self._default_executor executor = self._default_executor
if executor is not None: if executor is not None:
self._default_executor = None self._default_executor = None
@ -768,6 +795,8 @@ class BaseEventLoop(events.AbstractEventLoop):
self._check_callback(func, 'run_in_executor') self._check_callback(func, 'run_in_executor')
if executor is None: if executor is None:
executor = self._default_executor executor = self._default_executor
# Only check when the default executor is being used
self._check_default_executor()
if executor is None: if executor is None:
executor = concurrent.futures.ThreadPoolExecutor() executor = concurrent.futures.ThreadPoolExecutor()
self._default_executor = executor self._default_executor = executor

View file

@ -249,6 +249,10 @@ class AbstractEventLoop:
"""Shutdown all active asynchronous generators.""" """Shutdown all active asynchronous generators."""
raise NotImplementedError raise NotImplementedError
async def shutdown_default_executor(self):
"""Schedule the shutdown of the default executor."""
raise NotImplementedError
# Methods scheduling callbacks. All these return Handles. # Methods scheduling callbacks. All these return Handles.
def _timer_handle_cancelled(self, handle): def _timer_handle_cancelled(self, handle):

View file

@ -45,6 +45,7 @@ def run(main, *, debug=False):
try: try:
_cancel_all_tasks(loop) _cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens()) loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
finally: finally:
events.set_event_loop(None) events.set_event_loop(None)
loop.close() loop.close()

View file

@ -0,0 +1,4 @@
For :mod:`asyncio`, add a new coroutine :meth:`loop.shutdown_default_executor`.
The new coroutine provides an API to schedule an executor shutdown that waits
on the threadpool to finish closing. Also, :func:`asyncio.run` has been updated
to utilize the new coroutine. Patch by Kyle Stanley.