mirror of
https://github.com/python/cpython.git
synced 2025-08-21 17:25:34 +00:00
bpo-35621: Support running subprocesses in asyncio when loop is executed in non-main thread (GH-14344)
(cherry picked from commit 0d671c04c3
)
Co-authored-by: Andrew Svetlov <andrew.svetlov@gmail.com>
This commit is contained in:
parent
ffcc161c75
commit
bf8cb31803
7 changed files with 378 additions and 72 deletions
|
@ -117,6 +117,7 @@ asyncio ships with the following built-in policies:
|
||||||
|
|
||||||
.. availability:: Windows.
|
.. availability:: Windows.
|
||||||
|
|
||||||
|
.. _asyncio-watchers:
|
||||||
|
|
||||||
Process Watchers
|
Process Watchers
|
||||||
================
|
================
|
||||||
|
@ -129,10 +130,11 @@ In asyncio, child processes are created with
|
||||||
:func:`create_subprocess_exec` and :meth:`loop.subprocess_exec`
|
:func:`create_subprocess_exec` and :meth:`loop.subprocess_exec`
|
||||||
functions.
|
functions.
|
||||||
|
|
||||||
asyncio defines the :class:`AbstractChildWatcher` abstract base class,
|
asyncio defines the :class:`AbstractChildWatcher` abstract base class, which child
|
||||||
which child watchers should implement, and has two different
|
watchers should implement, and has four different implementations:
|
||||||
implementations: :class:`SafeChildWatcher` (configured to be used
|
:class:`ThreadedChildWatcher` (configured to be used by default),
|
||||||
by default) and :class:`FastChildWatcher`.
|
:class:`MultiLoopChildWatcher`, :class:`SafeChildWatcher`, and
|
||||||
|
:class:`FastChildWatcher`.
|
||||||
|
|
||||||
See also the :ref:`Subprocess and Threads <asyncio-subprocess-threads>`
|
See also the :ref:`Subprocess and Threads <asyncio-subprocess-threads>`
|
||||||
section.
|
section.
|
||||||
|
@ -184,6 +186,15 @@ implementation used by the asyncio event loop:
|
||||||
|
|
||||||
Note: loop may be ``None``.
|
Note: loop may be ``None``.
|
||||||
|
|
||||||
|
.. method:: is_active()
|
||||||
|
|
||||||
|
Return ``True`` if the watcher is ready to use.
|
||||||
|
|
||||||
|
Spawning a subprocess with *inactive* current child watcher raises
|
||||||
|
:exc:`RuntimeError`.
|
||||||
|
|
||||||
|
.. versionadded:: 3.8
|
||||||
|
|
||||||
.. method:: close()
|
.. method:: close()
|
||||||
|
|
||||||
Close the watcher.
|
Close the watcher.
|
||||||
|
@ -191,16 +202,48 @@ implementation used by the asyncio event loop:
|
||||||
This method has to be called to ensure that underlying
|
This method has to be called to ensure that underlying
|
||||||
resources are cleaned-up.
|
resources are cleaned-up.
|
||||||
|
|
||||||
.. class:: SafeChildWatcher
|
.. class:: ThreadedChildWatcher
|
||||||
|
|
||||||
This implementation avoids disrupting other code spawning processes
|
This implementation starts a new waiting thread for every subprocess spawn.
|
||||||
|
|
||||||
|
It works reliably even when the asyncio event loop is run in a non-main OS thread.
|
||||||
|
|
||||||
|
There is no noticeable overhead when handling a big number of children (*O(1)* each
|
||||||
|
time a child terminates), but stating a thread per process requires extra memory.
|
||||||
|
|
||||||
|
This watcher is used by default.
|
||||||
|
|
||||||
|
.. versionadded:: 3.8
|
||||||
|
|
||||||
|
.. class:: MultiLoopChildWatcher
|
||||||
|
|
||||||
|
This implementation registers a :py:data:`SIGCHLD` signal handler on
|
||||||
|
instantiation. That can break third-party code that installs a custom handler for
|
||||||
|
`SIGCHLD`. signal).
|
||||||
|
|
||||||
|
The watcher avoids disrupting other code spawning processes
|
||||||
by polling every process explicitly on a :py:data:`SIGCHLD` signal.
|
by polling every process explicitly on a :py:data:`SIGCHLD` signal.
|
||||||
|
|
||||||
This is a safe solution but it has a significant overhead when
|
There is no limitation for running subprocesses from different threads once the
|
||||||
|
watcher is installed.
|
||||||
|
|
||||||
|
The solution is safe but it has a significant overhead when
|
||||||
handling a big number of processes (*O(n)* each time a
|
handling a big number of processes (*O(n)* each time a
|
||||||
:py:data:`SIGCHLD` is received).
|
:py:data:`SIGCHLD` is received).
|
||||||
|
|
||||||
asyncio uses this safe implementation by default.
|
.. versionadded:: 3.8
|
||||||
|
|
||||||
|
.. class:: SafeChildWatcher
|
||||||
|
|
||||||
|
This implementation uses active event loop from the main thread to handle
|
||||||
|
:py:data:`SIGCHLD` signal. If the main thread has no running event loop another
|
||||||
|
thread cannot spawn a subprocess (:exc:`RuntimeError` is raised).
|
||||||
|
|
||||||
|
The watcher avoids disrupting other code spawning processes
|
||||||
|
by polling every process explicitly on a :py:data:`SIGCHLD` signal.
|
||||||
|
|
||||||
|
This solution is as safe as :class:`MultiLoopChildWatcher` and has the same *O(N)*
|
||||||
|
complexity but requires a running event loop in the main thread to work.
|
||||||
|
|
||||||
.. class:: FastChildWatcher
|
.. class:: FastChildWatcher
|
||||||
|
|
||||||
|
@ -211,6 +254,9 @@ implementation used by the asyncio event loop:
|
||||||
There is no noticeable overhead when handling a big number of
|
There is no noticeable overhead when handling a big number of
|
||||||
children (*O(1)* each time a child terminates).
|
children (*O(1)* each time a child terminates).
|
||||||
|
|
||||||
|
This solution requires a running event loop in the main thread to work, as
|
||||||
|
:class:`SafeChildWatcher`.
|
||||||
|
|
||||||
|
|
||||||
Custom Policies
|
Custom Policies
|
||||||
===============
|
===============
|
||||||
|
|
|
@ -293,18 +293,26 @@ their completion.
|
||||||
Subprocess and Threads
|
Subprocess and Threads
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
Standard asyncio event loop supports running subprocesses from
|
Standard asyncio event loop supports running subprocesses from different threads by
|
||||||
different threads, but there are limitations:
|
default.
|
||||||
|
|
||||||
* An event loop must run in the main thread.
|
On Windows subprocesses are provided by :class:`ProactorEventLoop` only (default),
|
||||||
|
:class:`SelectorEventLoop` has no subprocess support.
|
||||||
|
|
||||||
* The child watcher must be instantiated in the main thread
|
On UNIX *child watchers* are used for subprocess finish waiting, see
|
||||||
before executing subprocesses from other threads. Call the
|
:ref:`asyncio-watchers` for more info.
|
||||||
:func:`get_child_watcher` function in the main thread to instantiate
|
|
||||||
the child watcher.
|
|
||||||
|
|
||||||
Note that alternative event loop implementations might not share
|
|
||||||
the above limitations; please refer to their documentation.
|
.. versionchanged:: 3.8
|
||||||
|
|
||||||
|
UNIX switched to use :class:`ThreadedChildWatcher` for spawning subprocesses from
|
||||||
|
different threads without any limitation.
|
||||||
|
|
||||||
|
Spawning a subprocess with *inactive* current child watcher raises
|
||||||
|
:exc:`RuntimeError`.
|
||||||
|
|
||||||
|
Note that alternative event loop implementations might have own limitations;
|
||||||
|
please refer to their documentation.
|
||||||
|
|
||||||
.. seealso::
|
.. seealso::
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
import errno
|
import errno
|
||||||
import io
|
import io
|
||||||
|
import itertools
|
||||||
import os
|
import os
|
||||||
import selectors
|
import selectors
|
||||||
import signal
|
import signal
|
||||||
|
@ -12,7 +13,6 @@ import sys
|
||||||
import threading
|
import threading
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
|
|
||||||
from . import base_events
|
from . import base_events
|
||||||
from . import base_subprocess
|
from . import base_subprocess
|
||||||
from . import constants
|
from . import constants
|
||||||
|
@ -29,7 +29,9 @@ from .log import logger
|
||||||
__all__ = (
|
__all__ = (
|
||||||
'SelectorEventLoop',
|
'SelectorEventLoop',
|
||||||
'AbstractChildWatcher', 'SafeChildWatcher',
|
'AbstractChildWatcher', 'SafeChildWatcher',
|
||||||
'FastChildWatcher', 'DefaultEventLoopPolicy',
|
'FastChildWatcher',
|
||||||
|
'MultiLoopChildWatcher', 'ThreadedChildWatcher',
|
||||||
|
'DefaultEventLoopPolicy',
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -184,6 +186,13 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
|
||||||
stdin, stdout, stderr, bufsize,
|
stdin, stdout, stderr, bufsize,
|
||||||
extra=None, **kwargs):
|
extra=None, **kwargs):
|
||||||
with events.get_child_watcher() as watcher:
|
with events.get_child_watcher() as watcher:
|
||||||
|
if not watcher.is_active():
|
||||||
|
# Check early.
|
||||||
|
# Raising exception before process creation
|
||||||
|
# prevents subprocess execution if the watcher
|
||||||
|
# is not ready to handle it.
|
||||||
|
raise RuntimeError("asyncio.get_child_watcher() is not activated, "
|
||||||
|
"subprocess support is not installed.")
|
||||||
waiter = self.create_future()
|
waiter = self.create_future()
|
||||||
transp = _UnixSubprocessTransport(self, protocol, args, shell,
|
transp = _UnixSubprocessTransport(self, protocol, args, shell,
|
||||||
stdin, stdout, stderr, bufsize,
|
stdin, stdout, stderr, bufsize,
|
||||||
|
@ -838,6 +847,15 @@ class AbstractChildWatcher:
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def is_active(self):
|
||||||
|
"""Return ``True`` if the watcher is active and is used by the event loop.
|
||||||
|
|
||||||
|
Return True if the watcher is installed and ready to handle process exit
|
||||||
|
notifications.
|
||||||
|
|
||||||
|
"""
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
"""Enter the watcher's context and allow starting new processes
|
"""Enter the watcher's context and allow starting new processes
|
||||||
|
|
||||||
|
@ -849,6 +867,20 @@ class AbstractChildWatcher:
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
|
||||||
|
def _compute_returncode(status):
|
||||||
|
if os.WIFSIGNALED(status):
|
||||||
|
# The child process died because of a signal.
|
||||||
|
return -os.WTERMSIG(status)
|
||||||
|
elif os.WIFEXITED(status):
|
||||||
|
# The child process exited (e.g sys.exit()).
|
||||||
|
return os.WEXITSTATUS(status)
|
||||||
|
else:
|
||||||
|
# The child exited, but we don't understand its status.
|
||||||
|
# This shouldn't happen, but if it does, let's just
|
||||||
|
# return that status; perhaps that helps debug it.
|
||||||
|
return status
|
||||||
|
|
||||||
|
|
||||||
class BaseChildWatcher(AbstractChildWatcher):
|
class BaseChildWatcher(AbstractChildWatcher):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
@ -858,6 +890,9 @@ class BaseChildWatcher(AbstractChildWatcher):
|
||||||
def close(self):
|
def close(self):
|
||||||
self.attach_loop(None)
|
self.attach_loop(None)
|
||||||
|
|
||||||
|
def is_active(self):
|
||||||
|
return self._loop is not None and self._loop.is_running()
|
||||||
|
|
||||||
def _do_waitpid(self, expected_pid):
|
def _do_waitpid(self, expected_pid):
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
@ -898,19 +933,6 @@ class BaseChildWatcher(AbstractChildWatcher):
|
||||||
'exception': exc,
|
'exception': exc,
|
||||||
})
|
})
|
||||||
|
|
||||||
def _compute_returncode(self, status):
|
|
||||||
if os.WIFSIGNALED(status):
|
|
||||||
# The child process died because of a signal.
|
|
||||||
return -os.WTERMSIG(status)
|
|
||||||
elif os.WIFEXITED(status):
|
|
||||||
# The child process exited (e.g sys.exit()).
|
|
||||||
return os.WEXITSTATUS(status)
|
|
||||||
else:
|
|
||||||
# The child exited, but we don't understand its status.
|
|
||||||
# This shouldn't happen, but if it does, let's just
|
|
||||||
# return that status; perhaps that helps debug it.
|
|
||||||
return status
|
|
||||||
|
|
||||||
|
|
||||||
class SafeChildWatcher(BaseChildWatcher):
|
class SafeChildWatcher(BaseChildWatcher):
|
||||||
"""'Safe' child watcher implementation.
|
"""'Safe' child watcher implementation.
|
||||||
|
@ -934,11 +956,6 @@ class SafeChildWatcher(BaseChildWatcher):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def add_child_handler(self, pid, callback, *args):
|
def add_child_handler(self, pid, callback, *args):
|
||||||
if self._loop is None:
|
|
||||||
raise RuntimeError(
|
|
||||||
"Cannot add child handler, "
|
|
||||||
"the child watcher does not have a loop attached")
|
|
||||||
|
|
||||||
self._callbacks[pid] = (callback, args)
|
self._callbacks[pid] = (callback, args)
|
||||||
|
|
||||||
# Prevent a race condition in case the child is already terminated.
|
# Prevent a race condition in case the child is already terminated.
|
||||||
|
@ -974,7 +991,7 @@ class SafeChildWatcher(BaseChildWatcher):
|
||||||
# The child process is still alive.
|
# The child process is still alive.
|
||||||
return
|
return
|
||||||
|
|
||||||
returncode = self._compute_returncode(status)
|
returncode = _compute_returncode(status)
|
||||||
if self._loop.get_debug():
|
if self._loop.get_debug():
|
||||||
logger.debug('process %s exited with returncode %s',
|
logger.debug('process %s exited with returncode %s',
|
||||||
expected_pid, returncode)
|
expected_pid, returncode)
|
||||||
|
@ -1035,11 +1052,6 @@ class FastChildWatcher(BaseChildWatcher):
|
||||||
def add_child_handler(self, pid, callback, *args):
|
def add_child_handler(self, pid, callback, *args):
|
||||||
assert self._forks, "Must use the context manager"
|
assert self._forks, "Must use the context manager"
|
||||||
|
|
||||||
if self._loop is None:
|
|
||||||
raise RuntimeError(
|
|
||||||
"Cannot add child handler, "
|
|
||||||
"the child watcher does not have a loop attached")
|
|
||||||
|
|
||||||
with self._lock:
|
with self._lock:
|
||||||
try:
|
try:
|
||||||
returncode = self._zombies.pop(pid)
|
returncode = self._zombies.pop(pid)
|
||||||
|
@ -1072,7 +1084,7 @@ class FastChildWatcher(BaseChildWatcher):
|
||||||
# A child process is still alive.
|
# A child process is still alive.
|
||||||
return
|
return
|
||||||
|
|
||||||
returncode = self._compute_returncode(status)
|
returncode = _compute_returncode(status)
|
||||||
|
|
||||||
with self._lock:
|
with self._lock:
|
||||||
try:
|
try:
|
||||||
|
@ -1101,6 +1113,209 @@ class FastChildWatcher(BaseChildWatcher):
|
||||||
callback(pid, returncode, *args)
|
callback(pid, returncode, *args)
|
||||||
|
|
||||||
|
|
||||||
|
class MultiLoopChildWatcher(AbstractChildWatcher):
|
||||||
|
"""A watcher that doesn't require running loop in the main thread.
|
||||||
|
|
||||||
|
This implementation registers a SIGCHLD signal handler on
|
||||||
|
instantiation (which may conflict with other code that
|
||||||
|
install own handler for this signal).
|
||||||
|
|
||||||
|
The solution is safe but it has a significant overhead when
|
||||||
|
handling a big number of processes (*O(n)* each time a
|
||||||
|
SIGCHLD is received).
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Implementation note:
|
||||||
|
# The class keeps compatibility with AbstractChildWatcher ABC
|
||||||
|
# To achieve this it has empty attach_loop() method
|
||||||
|
# and doesn't accept explicit loop argument
|
||||||
|
# for add_child_handler()/remove_child_handler()
|
||||||
|
# but retrieves the current loop by get_running_loop()
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._callbacks = {}
|
||||||
|
self._saved_sighandler = None
|
||||||
|
|
||||||
|
def is_active(self):
|
||||||
|
return self._saved_sighandler is not None
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self._callbacks.clear()
|
||||||
|
if self._saved_sighandler is not None:
|
||||||
|
handler = signal.getsignal(signal.SIGCHLD)
|
||||||
|
if handler != self._sig_chld:
|
||||||
|
logger.warning("SIGCHLD handler was changed by outside code")
|
||||||
|
else:
|
||||||
|
signal.signal(signal.SIGCHLD, self._saved_sighandler)
|
||||||
|
self._saved_sighandler = None
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def add_child_handler(self, pid, callback, *args):
|
||||||
|
loop = events.get_running_loop()
|
||||||
|
self._callbacks[pid] = (loop, callback, args)
|
||||||
|
|
||||||
|
# Prevent a race condition in case the child is already terminated.
|
||||||
|
self._do_waitpid(pid)
|
||||||
|
|
||||||
|
def remove_child_handler(self, pid):
|
||||||
|
try:
|
||||||
|
del self._callbacks[pid]
|
||||||
|
return True
|
||||||
|
except KeyError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def attach_loop(self, loop):
|
||||||
|
# Don't save the loop but initialize itself if called first time
|
||||||
|
# The reason to do it here is that attach_loop() is called from
|
||||||
|
# unix policy only for the main thread.
|
||||||
|
# Main thread is required for subscription on SIGCHLD signal
|
||||||
|
if self._saved_sighandler is None:
|
||||||
|
self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
|
||||||
|
if self._saved_sighandler is None:
|
||||||
|
logger.warning("Previous SIGCHLD handler was set by non-Python code, "
|
||||||
|
"restore to default handler on watcher close.")
|
||||||
|
self._saved_sighandler = signal.SIG_DFL
|
||||||
|
|
||||||
|
# Set SA_RESTART to limit EINTR occurrences.
|
||||||
|
signal.siginterrupt(signal.SIGCHLD, False)
|
||||||
|
|
||||||
|
def _do_waitpid_all(self):
|
||||||
|
for pid in list(self._callbacks):
|
||||||
|
self._do_waitpid(pid)
|
||||||
|
|
||||||
|
def _do_waitpid(self, expected_pid):
|
||||||
|
assert expected_pid > 0
|
||||||
|
|
||||||
|
try:
|
||||||
|
pid, status = os.waitpid(expected_pid, os.WNOHANG)
|
||||||
|
except ChildProcessError:
|
||||||
|
# The child process is already reaped
|
||||||
|
# (may happen if waitpid() is called elsewhere).
|
||||||
|
pid = expected_pid
|
||||||
|
returncode = 255
|
||||||
|
logger.warning(
|
||||||
|
"Unknown child process pid %d, will report returncode 255",
|
||||||
|
pid)
|
||||||
|
debug_log = False
|
||||||
|
else:
|
||||||
|
if pid == 0:
|
||||||
|
# The child process is still alive.
|
||||||
|
return
|
||||||
|
|
||||||
|
returncode = _compute_returncode(status)
|
||||||
|
debug_log = True
|
||||||
|
try:
|
||||||
|
loop, callback, args = self._callbacks.pop(pid)
|
||||||
|
except KeyError: # pragma: no cover
|
||||||
|
# May happen if .remove_child_handler() is called
|
||||||
|
# after os.waitpid() returns.
|
||||||
|
logger.warning("Child watcher got an unexpected pid: %r",
|
||||||
|
pid, exc_info=True)
|
||||||
|
else:
|
||||||
|
if loop.is_closed():
|
||||||
|
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
|
||||||
|
else:
|
||||||
|
if debug_log and loop.get_debug():
|
||||||
|
logger.debug('process %s exited with returncode %s',
|
||||||
|
expected_pid, returncode)
|
||||||
|
loop.call_soon_threadsafe(callback, pid, returncode, *args)
|
||||||
|
|
||||||
|
def _sig_chld(self, signum, frame):
|
||||||
|
try:
|
||||||
|
self._do_waitpid_all()
|
||||||
|
except (SystemExit, KeyboardInterrupt):
|
||||||
|
raise
|
||||||
|
except BaseException:
|
||||||
|
logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
|
||||||
|
|
||||||
|
|
||||||
|
class ThreadedChildWatcher(AbstractChildWatcher):
|
||||||
|
"""Threaded child watcher implementation.
|
||||||
|
|
||||||
|
The watcher uses a thread per process
|
||||||
|
for waiting for the process finish.
|
||||||
|
|
||||||
|
It doesn't require subscription on POSIX signal
|
||||||
|
but a thread creation is not free.
|
||||||
|
|
||||||
|
The watcher has O(1) complexity, its perfomance doesn't depend
|
||||||
|
on amount of spawn processes.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._pid_counter = itertools.count(0)
|
||||||
|
self._threads = {}
|
||||||
|
|
||||||
|
def is_active(self):
|
||||||
|
return True
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __del__(self, _warn=warnings.warn):
|
||||||
|
threads = [thread for thread in list(self._threads.values())
|
||||||
|
if thread.is_alive()]
|
||||||
|
if threads:
|
||||||
|
_warn(f"{self.__class__} has registered but not finished child processes",
|
||||||
|
ResourceWarning,
|
||||||
|
source=self)
|
||||||
|
|
||||||
|
def add_child_handler(self, pid, callback, *args):
|
||||||
|
loop = events.get_running_loop()
|
||||||
|
thread = threading.Thread(target=self._do_waitpid,
|
||||||
|
name=f"waitpid-{next(self._pid_counter)}",
|
||||||
|
args=(loop, pid, callback, args),
|
||||||
|
daemon=True)
|
||||||
|
self._threads[pid] = thread
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
def remove_child_handler(self, pid):
|
||||||
|
# asyncio never calls remove_child_handler() !!!
|
||||||
|
# The method is no-op but is implemented because
|
||||||
|
# abstract base classe requires it
|
||||||
|
return True
|
||||||
|
|
||||||
|
def attach_loop(self, loop):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _do_waitpid(self, loop, expected_pid, callback, args):
|
||||||
|
assert expected_pid > 0
|
||||||
|
|
||||||
|
try:
|
||||||
|
pid, status = os.waitpid(expected_pid, 0)
|
||||||
|
except ChildProcessError:
|
||||||
|
# The child process is already reaped
|
||||||
|
# (may happen if waitpid() is called elsewhere).
|
||||||
|
pid = expected_pid
|
||||||
|
returncode = 255
|
||||||
|
logger.warning(
|
||||||
|
"Unknown child process pid %d, will report returncode 255",
|
||||||
|
pid)
|
||||||
|
else:
|
||||||
|
returncode = _compute_returncode(status)
|
||||||
|
if loop.get_debug():
|
||||||
|
logger.debug('process %s exited with returncode %s',
|
||||||
|
expected_pid, returncode)
|
||||||
|
|
||||||
|
if loop.is_closed():
|
||||||
|
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
|
||||||
|
else:
|
||||||
|
loop.call_soon_threadsafe(callback, pid, returncode, *args)
|
||||||
|
|
||||||
|
self._threads.pop(expected_pid)
|
||||||
|
|
||||||
|
|
||||||
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
|
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
|
||||||
"""UNIX event loop policy with a watcher for child processes."""
|
"""UNIX event loop policy with a watcher for child processes."""
|
||||||
_loop_factory = _UnixSelectorEventLoop
|
_loop_factory = _UnixSelectorEventLoop
|
||||||
|
@ -1112,7 +1327,7 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
|
||||||
def _init_watcher(self):
|
def _init_watcher(self):
|
||||||
with events._lock:
|
with events._lock:
|
||||||
if self._watcher is None: # pragma: no branch
|
if self._watcher is None: # pragma: no branch
|
||||||
self._watcher = SafeChildWatcher()
|
self._watcher = ThreadedChildWatcher()
|
||||||
if isinstance(threading.current_thread(),
|
if isinstance(threading.current_thread(),
|
||||||
threading._MainThread):
|
threading._MainThread):
|
||||||
self._watcher.attach_loop(self._local._loop)
|
self._watcher.attach_loop(self._local._loop)
|
||||||
|
@ -1134,7 +1349,7 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
|
||||||
def get_child_watcher(self):
|
def get_child_watcher(self):
|
||||||
"""Get the watcher for child processes.
|
"""Get the watcher for child processes.
|
||||||
|
|
||||||
If not yet set, a SafeChildWatcher object is automatically created.
|
If not yet set, a ThreadedChildWatcher object is automatically created.
|
||||||
"""
|
"""
|
||||||
if self._watcher is None:
|
if self._watcher is None:
|
||||||
self._init_watcher()
|
self._init_watcher()
|
||||||
|
|
|
@ -633,6 +633,7 @@ class SubprocessMixin:
|
||||||
|
|
||||||
self.assertIsNone(self.loop.run_until_complete(execute()))
|
self.assertIsNone(self.loop.run_until_complete(execute()))
|
||||||
|
|
||||||
|
|
||||||
if sys.platform != 'win32':
|
if sys.platform != 'win32':
|
||||||
# Unix
|
# Unix
|
||||||
class SubprocessWatcherMixin(SubprocessMixin):
|
class SubprocessWatcherMixin(SubprocessMixin):
|
||||||
|
@ -648,7 +649,24 @@ if sys.platform != 'win32':
|
||||||
watcher = self.Watcher()
|
watcher = self.Watcher()
|
||||||
watcher.attach_loop(self.loop)
|
watcher.attach_loop(self.loop)
|
||||||
policy.set_child_watcher(watcher)
|
policy.set_child_watcher(watcher)
|
||||||
self.addCleanup(policy.set_child_watcher, None)
|
|
||||||
|
def tearDown(self):
|
||||||
|
super().tearDown()
|
||||||
|
policy = asyncio.get_event_loop_policy()
|
||||||
|
watcher = policy.get_child_watcher()
|
||||||
|
policy.set_child_watcher(None)
|
||||||
|
watcher.attach_loop(None)
|
||||||
|
watcher.close()
|
||||||
|
|
||||||
|
class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
|
||||||
|
test_utils.TestCase):
|
||||||
|
|
||||||
|
Watcher = unix_events.ThreadedChildWatcher
|
||||||
|
|
||||||
|
class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin,
|
||||||
|
test_utils.TestCase):
|
||||||
|
|
||||||
|
Watcher = unix_events.MultiLoopChildWatcher
|
||||||
|
|
||||||
class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
|
class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
|
||||||
test_utils.TestCase):
|
test_utils.TestCase):
|
||||||
|
@ -670,5 +688,25 @@ else:
|
||||||
self.set_event_loop(self.loop)
|
self.set_event_loop(self.loop)
|
||||||
|
|
||||||
|
|
||||||
|
class GenericWatcherTests:
|
||||||
|
|
||||||
|
def test_create_subprocess_fails_with_inactive_watcher(self):
|
||||||
|
|
||||||
|
async def execute():
|
||||||
|
watcher = mock.create_authspec(asyncio.AbstractChildWatcher)
|
||||||
|
watcher.is_active.return_value = False
|
||||||
|
asyncio.set_child_watcher(watcher)
|
||||||
|
|
||||||
|
with self.assertRaises(RuntimeError):
|
||||||
|
await subprocess.create_subprocess_exec(
|
||||||
|
support.FakePath(sys.executable), '-c', 'pass')
|
||||||
|
|
||||||
|
watcher.add_child_handler.assert_not_called()
|
||||||
|
|
||||||
|
self.assertIsNone(self.loop.run_until_complete(execute()))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
|
@ -1082,6 +1082,8 @@ class AbstractChildWatcherTests(unittest.TestCase):
|
||||||
NotImplementedError, watcher.attach_loop, f)
|
NotImplementedError, watcher.attach_loop, f)
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
NotImplementedError, watcher.close)
|
NotImplementedError, watcher.close)
|
||||||
|
self.assertRaises(
|
||||||
|
NotImplementedError, watcher.is_active)
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
NotImplementedError, watcher.__enter__)
|
NotImplementedError, watcher.__enter__)
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
|
@ -1784,15 +1786,6 @@ class ChildWatcherTestsMixin:
|
||||||
if isinstance(self.watcher, asyncio.FastChildWatcher):
|
if isinstance(self.watcher, asyncio.FastChildWatcher):
|
||||||
self.assertFalse(self.watcher._zombies)
|
self.assertFalse(self.watcher._zombies)
|
||||||
|
|
||||||
@waitpid_mocks
|
|
||||||
def test_add_child_handler_with_no_loop_attached(self, m):
|
|
||||||
callback = mock.Mock()
|
|
||||||
with self.create_watcher() as watcher:
|
|
||||||
with self.assertRaisesRegex(
|
|
||||||
RuntimeError,
|
|
||||||
'the child watcher does not have a loop attached'):
|
|
||||||
watcher.add_child_handler(100, callback)
|
|
||||||
|
|
||||||
|
|
||||||
class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
|
class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
|
||||||
def create_watcher(self):
|
def create_watcher(self):
|
||||||
|
@ -1809,17 +1802,16 @@ class PolicyTests(unittest.TestCase):
|
||||||
def create_policy(self):
|
def create_policy(self):
|
||||||
return asyncio.DefaultEventLoopPolicy()
|
return asyncio.DefaultEventLoopPolicy()
|
||||||
|
|
||||||
def test_get_child_watcher(self):
|
def test_get_default_child_watcher(self):
|
||||||
policy = self.create_policy()
|
policy = self.create_policy()
|
||||||
self.assertIsNone(policy._watcher)
|
self.assertIsNone(policy._watcher)
|
||||||
|
|
||||||
watcher = policy.get_child_watcher()
|
watcher = policy.get_child_watcher()
|
||||||
self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
|
self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher)
|
||||||
|
|
||||||
self.assertIs(policy._watcher, watcher)
|
self.assertIs(policy._watcher, watcher)
|
||||||
|
|
||||||
self.assertIs(watcher, policy.get_child_watcher())
|
self.assertIs(watcher, policy.get_child_watcher())
|
||||||
self.assertIsNone(watcher._loop)
|
|
||||||
|
|
||||||
def test_get_child_watcher_after_set(self):
|
def test_get_child_watcher_after_set(self):
|
||||||
policy = self.create_policy()
|
policy = self.create_policy()
|
||||||
|
@ -1829,18 +1821,6 @@ class PolicyTests(unittest.TestCase):
|
||||||
self.assertIs(policy._watcher, watcher)
|
self.assertIs(policy._watcher, watcher)
|
||||||
self.assertIs(watcher, policy.get_child_watcher())
|
self.assertIs(watcher, policy.get_child_watcher())
|
||||||
|
|
||||||
def test_get_child_watcher_with_mainloop_existing(self):
|
|
||||||
policy = self.create_policy()
|
|
||||||
loop = policy.get_event_loop()
|
|
||||||
|
|
||||||
self.assertIsNone(policy._watcher)
|
|
||||||
watcher = policy.get_child_watcher()
|
|
||||||
|
|
||||||
self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
|
|
||||||
self.assertIs(watcher._loop, loop)
|
|
||||||
|
|
||||||
loop.close()
|
|
||||||
|
|
||||||
def test_get_child_watcher_thread(self):
|
def test_get_child_watcher_thread(self):
|
||||||
|
|
||||||
def f():
|
def f():
|
||||||
|
@ -1866,7 +1846,11 @@ class PolicyTests(unittest.TestCase):
|
||||||
policy = self.create_policy()
|
policy = self.create_policy()
|
||||||
loop = policy.get_event_loop()
|
loop = policy.get_event_loop()
|
||||||
|
|
||||||
watcher = policy.get_child_watcher()
|
# Explicitly setup SafeChildWatcher,
|
||||||
|
# default ThreadedChildWatcher has no _loop property
|
||||||
|
watcher = asyncio.SafeChildWatcher()
|
||||||
|
policy.set_child_watcher(watcher)
|
||||||
|
watcher.attach_loop(loop)
|
||||||
|
|
||||||
self.assertIs(watcher._loop, loop)
|
self.assertIs(watcher._loop, loop)
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
"""Utilities shared by tests."""
|
"""Utilities shared by tests."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import collections
|
import collections
|
||||||
import contextlib
|
import contextlib
|
||||||
import io
|
import io
|
||||||
|
@ -512,6 +513,18 @@ class TestCase(unittest.TestCase):
|
||||||
if executor is not None:
|
if executor is not None:
|
||||||
executor.shutdown(wait=True)
|
executor.shutdown(wait=True)
|
||||||
loop.close()
|
loop.close()
|
||||||
|
policy = support.maybe_get_event_loop_policy()
|
||||||
|
if policy is not None:
|
||||||
|
try:
|
||||||
|
watcher = policy.get_child_watcher()
|
||||||
|
except NotImplementedError:
|
||||||
|
# watcher is not implemented by EventLoopPolicy, e.g. Windows
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
if isinstance(watcher, asyncio.ThreadedChildWatcher):
|
||||||
|
threads = list(watcher._threads.values())
|
||||||
|
for thread in threads:
|
||||||
|
thread.join()
|
||||||
|
|
||||||
def set_event_loop(self, loop, *, cleanup=True):
|
def set_event_loop(self, loop, *, cleanup=True):
|
||||||
assert loop is not None
|
assert loop is not None
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
Support running asyncio subprocesses when execution event loop in a thread
|
||||||
|
on UNIX.
|
Loading…
Add table
Add a link
Reference in a new issue