mirror of
https://github.com/python/cpython.git
synced 2025-09-26 18:29:57 +00:00
GH-82448: Add thread timeout for loop.shutdown_default_executor (#97561)
Co-authored-by: Kyle Stanley <aeros167@gmail.com>
This commit is contained in:
parent
9a404b173e
commit
575a253b5c
6 changed files with 44 additions and 9 deletions
|
@ -180,18 +180,27 @@ Running and stopping the loop
|
||||||
|
|
||||||
.. versionadded:: 3.6
|
.. versionadded:: 3.6
|
||||||
|
|
||||||
.. coroutinemethod:: loop.shutdown_default_executor()
|
.. coroutinemethod:: loop.shutdown_default_executor(timeout=None)
|
||||||
|
|
||||||
Schedule the closure of the default executor and wait for it to join all of
|
Schedule the closure of the default executor and wait for it to join all of
|
||||||
the threads in the :class:`ThreadPoolExecutor`. After calling this method, a
|
the threads in the :class:`ThreadPoolExecutor`. After calling this method, a
|
||||||
:exc:`RuntimeError` will be raised if :meth:`loop.run_in_executor` is called
|
:exc:`RuntimeError` will be raised if :meth:`loop.run_in_executor` is called
|
||||||
while using the default executor.
|
while using the default executor.
|
||||||
|
|
||||||
|
The *timeout* parameter specifies the amount of time the executor will
|
||||||
|
be given to finish joining. The default value is ``None``, which means the
|
||||||
|
executor will be given an unlimited amount of time.
|
||||||
|
|
||||||
|
If the timeout duration is reached, a warning is emitted and executor is
|
||||||
|
terminated without waiting for its threads to finish joining.
|
||||||
|
|
||||||
Note that there is no need to call this function when
|
Note that there is no need to call this function when
|
||||||
:func:`asyncio.run` is used.
|
:func:`asyncio.run` is used.
|
||||||
|
|
||||||
.. versionadded:: 3.9
|
.. versionadded:: 3.9
|
||||||
|
|
||||||
|
.. versionchanged:: 3.12
|
||||||
|
Added the *timeout* parameter.
|
||||||
|
|
||||||
Scheduling callbacks
|
Scheduling callbacks
|
||||||
^^^^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
|
@ -28,7 +28,7 @@ Running an asyncio Program
|
||||||
|
|
||||||
This function runs the passed coroutine, taking care of
|
This function runs the passed coroutine, taking care of
|
||||||
managing the asyncio event loop, *finalizing asynchronous
|
managing the asyncio event loop, *finalizing asynchronous
|
||||||
generators*, and closing the threadpool.
|
generators*, and closing the executor.
|
||||||
|
|
||||||
This function cannot be called when another asyncio event loop is
|
This function cannot be called when another asyncio event loop is
|
||||||
running in the same thread.
|
running in the same thread.
|
||||||
|
@ -41,6 +41,10 @@ Running an asyncio Program
|
||||||
the end. It should be used as a main entry point for asyncio
|
the end. It should be used as a main entry point for asyncio
|
||||||
programs, and should ideally only be called once.
|
programs, and should ideally only be called once.
|
||||||
|
|
||||||
|
The executor is given a timeout duration of 5 minutes to shutdown.
|
||||||
|
If the executor hasn't finished within that duration, a warning is
|
||||||
|
emitted and the executor is closed.
|
||||||
|
|
||||||
Example::
|
Example::
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
|
|
@ -561,8 +561,13 @@ class BaseEventLoop(events.AbstractEventLoop):
|
||||||
'asyncgen': agen
|
'asyncgen': agen
|
||||||
})
|
})
|
||||||
|
|
||||||
async def shutdown_default_executor(self):
|
async def shutdown_default_executor(self, timeout=None):
|
||||||
"""Schedule the shutdown of the default executor."""
|
"""Schedule the shutdown of the default executor.
|
||||||
|
|
||||||
|
The timeout parameter specifies the amount of time the executor will
|
||||||
|
be given to finish joining. The default value is None, which means
|
||||||
|
that the executor will be given an unlimited amount of time.
|
||||||
|
"""
|
||||||
self._executor_shutdown_called = True
|
self._executor_shutdown_called = True
|
||||||
if self._default_executor is None:
|
if self._default_executor is None:
|
||||||
return
|
return
|
||||||
|
@ -572,7 +577,13 @@ class BaseEventLoop(events.AbstractEventLoop):
|
||||||
try:
|
try:
|
||||||
await future
|
await future
|
||||||
finally:
|
finally:
|
||||||
thread.join()
|
thread.join(timeout)
|
||||||
|
|
||||||
|
if thread.is_alive():
|
||||||
|
warnings.warn("The executor did not finishing joining "
|
||||||
|
f"its threads within {timeout} seconds.",
|
||||||
|
RuntimeWarning, stacklevel=2)
|
||||||
|
self._default_executor.shutdown(wait=False)
|
||||||
|
|
||||||
def _do_shutdown(self, future):
|
def _do_shutdown(self, future):
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -26,6 +26,9 @@ SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 256
|
||||||
FLOW_CONTROL_HIGH_WATER_SSL_READ = 256 # KiB
|
FLOW_CONTROL_HIGH_WATER_SSL_READ = 256 # KiB
|
||||||
FLOW_CONTROL_HIGH_WATER_SSL_WRITE = 512 # KiB
|
FLOW_CONTROL_HIGH_WATER_SSL_WRITE = 512 # KiB
|
||||||
|
|
||||||
|
# Default timeout for joining the threads in the threadpool
|
||||||
|
THREAD_JOIN_TIMEOUT = 300
|
||||||
|
|
||||||
# The enum should be here to break circular dependencies between
|
# The enum should be here to break circular dependencies between
|
||||||
# base_events and sslproto
|
# base_events and sslproto
|
||||||
class _SendfileMode(enum.Enum):
|
class _SendfileMode(enum.Enum):
|
||||||
|
|
|
@ -9,7 +9,7 @@ from . import coroutines
|
||||||
from . import events
|
from . import events
|
||||||
from . import exceptions
|
from . import exceptions
|
||||||
from . import tasks
|
from . import tasks
|
||||||
|
from . import constants
|
||||||
|
|
||||||
class _State(enum.Enum):
|
class _State(enum.Enum):
|
||||||
CREATED = "created"
|
CREATED = "created"
|
||||||
|
@ -69,7 +69,8 @@ class Runner:
|
||||||
loop = self._loop
|
loop = self._loop
|
||||||
_cancel_all_tasks(loop)
|
_cancel_all_tasks(loop)
|
||||||
loop.run_until_complete(loop.shutdown_asyncgens())
|
loop.run_until_complete(loop.shutdown_asyncgens())
|
||||||
loop.run_until_complete(loop.shutdown_default_executor())
|
loop.run_until_complete(
|
||||||
|
loop.shutdown_default_executor(constants.THREAD_JOIN_TIMEOUT))
|
||||||
finally:
|
finally:
|
||||||
if self._set_event_loop:
|
if self._set_event_loop:
|
||||||
events.set_event_loop(None)
|
events.set_event_loop(None)
|
||||||
|
@ -160,8 +161,8 @@ def run(main, *, debug=None):
|
||||||
"""Execute the coroutine and return the result.
|
"""Execute the coroutine and return the result.
|
||||||
|
|
||||||
This function runs the passed coroutine, taking care of
|
This function runs the passed coroutine, taking care of
|
||||||
managing the asyncio event loop and finalizing asynchronous
|
managing the asyncio event loop, finalizing asynchronous
|
||||||
generators.
|
generators and closing the default executor.
|
||||||
|
|
||||||
This function cannot be called when another asyncio event loop is
|
This function cannot be called when another asyncio event loop is
|
||||||
running in the same thread.
|
running in the same thread.
|
||||||
|
@ -172,6 +173,10 @@ def run(main, *, debug=None):
|
||||||
It should be used as a main entry point for asyncio programs, and should
|
It should be used as a main entry point for asyncio programs, and should
|
||||||
ideally only be called once.
|
ideally only be called once.
|
||||||
|
|
||||||
|
The executor is given a timeout duration of 5 minutes to shutdown.
|
||||||
|
If the executor hasn't finished within that duration, a warning is
|
||||||
|
emitted and the executor is closed.
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
Add *timeout* parameter to :meth:`asyncio.loop.shutdown_default_executor`.
|
||||||
|
The default value is ``None``, which means the executor will be given an unlimited amount of time.
|
||||||
|
When called from :class:`asyncio.Runner` or :func:`asyncio.run`, the default timeout is 5 minutes.
|
Loading…
Add table
Add a link
Reference in a new issue