mirror of
https://github.com/python/cpython.git
synced 2025-07-24 03:35:53 +00:00
gh-104812: Run Pending Calls in any Thread (gh-104813)
For a while now, pending calls have only run in the main thread (in the main interpreter). This PR changes things to allow any thread to run a pending call, unless the pending call was explicitly added for the main thread to run.
This commit is contained in:
parent
4e80082723
commit
757b402ea1
16 changed files with 766 additions and 123 deletions
|
@ -758,6 +758,61 @@ handle_eval_breaker:
|
|||
* We need to do reasonably frequently, but not too frequently.
|
||||
* All loops should include a check of the eval breaker.
|
||||
* We also check on return from any builtin function.
|
||||
*
|
||||
* ## More Details ###
|
||||
*
|
||||
* The eval loop (this function) normally executes the instructions
|
||||
* of a code object sequentially. However, the runtime supports a
|
||||
* number of out-of-band execution scenarios that may pause that
|
||||
* sequential execution long enough to do that out-of-band work
|
||||
* in the current thread using the current PyThreadState.
|
||||
*
|
||||
* The scenarios include:
|
||||
*
|
||||
* - cyclic garbage collection
|
||||
* - GIL drop requests
|
||||
* - "async" exceptions
|
||||
* - "pending calls" (some only in the main thread)
|
||||
* - signal handling (only in the main thread)
|
||||
*
|
||||
* When the need for one of the above is detected, the eval loop
|
||||
* pauses long enough to handle the detected case. Then, if doing
|
||||
* so didn't trigger an exception, the eval loop resumes executing
|
||||
* the sequential instructions.
|
||||
*
|
||||
* To make this work, the eval loop periodically checks if any
|
||||
* of the above needs to happen. The individual checks can be
|
||||
* expensive if computed each time, so a while back we switched
|
||||
* to using pre-computed, per-interpreter variables for the checks,
|
||||
* and later consolidated that to a single "eval breaker" variable
|
||||
* (now a PyInterpreterState field).
|
||||
*
|
||||
* For the longest time, the eval breaker check would happen
|
||||
* frequently, every 5 or so times through the loop, regardless
|
||||
* of what instruction ran last or what would run next. Then, in
|
||||
* early 2021 (gh-18334, commit 4958f5d), we switched to checking
|
||||
* the eval breaker less frequently, by hard-coding the check to
|
||||
* specific places in the eval loop (e.g. certain instructions).
|
||||
* The intent then was to check after returning from calls
|
||||
* and on the back edges of loops.
|
||||
*
|
||||
* In addition to being more efficient, that approach keeps
|
||||
* the eval loop from running arbitrary code between instructions
|
||||
* that don't handle that well. (See gh-74174.)
|
||||
*
|
||||
* Currently, the eval breaker check happens here at the
|
||||
* "handle_eval_breaker" label. Some instructions come here
|
||||
* explicitly (goto) and some indirectly. Notably, the check
|
||||
* happens on back edges in the control flow graph, which
|
||||
* pretty much applies to all loops and most calls.
|
||||
* (See bytecodes.c for exact information.)
|
||||
*
|
||||
* One consequence of this approach is that it might not be obvious
|
||||
* how to force any specific thread to pick up the eval breaker,
|
||||
* or for any specific thread to not pick it up. Mostly this
|
||||
* involves judicious uses of locks and careful ordering of code,
|
||||
* while avoiding code that might trigger the eval breaker
|
||||
* until so desired.
|
||||
*/
|
||||
if (_Py_HandlePending(tstate) != 0) {
|
||||
goto error;
|
||||
|
|
|
@ -68,8 +68,9 @@ COMPUTE_EVAL_BREAKER(PyInterpreterState *interp,
|
|||
_Py_atomic_load_relaxed_int32(&ceval2->gil_drop_request)
|
||||
| (_Py_atomic_load_relaxed_int32(&ceval->signals_pending)
|
||||
&& _Py_ThreadCanHandleSignals(interp))
|
||||
| (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do)
|
||||
&& _Py_ThreadCanHandlePendingCalls())
|
||||
| (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do))
|
||||
| (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)
|
||||
&&_Py_atomic_load_relaxed_int32(&ceval->pending_mainthread.calls_to_do))
|
||||
| ceval2->pending.async_exc
|
||||
| _Py_atomic_load_relaxed_int32(&ceval2->gc_scheduled));
|
||||
}
|
||||
|
@ -95,11 +96,11 @@ RESET_GIL_DROP_REQUEST(PyInterpreterState *interp)
|
|||
|
||||
|
||||
static inline void
|
||||
SIGNAL_PENDING_CALLS(PyInterpreterState *interp)
|
||||
SIGNAL_PENDING_CALLS(struct _pending_calls *pending, PyInterpreterState *interp)
|
||||
{
|
||||
struct _ceval_runtime_state *ceval = &interp->runtime->ceval;
|
||||
struct _ceval_state *ceval2 = &interp->ceval;
|
||||
_Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 1);
|
||||
_Py_atomic_store_relaxed(&pending->calls_to_do, 1);
|
||||
COMPUTE_EVAL_BREAKER(interp, ceval, ceval2);
|
||||
}
|
||||
|
||||
|
@ -109,6 +110,9 @@ UNSIGNAL_PENDING_CALLS(PyInterpreterState *interp)
|
|||
{
|
||||
struct _ceval_runtime_state *ceval = &interp->runtime->ceval;
|
||||
struct _ceval_state *ceval2 = &interp->ceval;
|
||||
if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) {
|
||||
_Py_atomic_store_relaxed(&ceval->pending_mainthread.calls_to_do, 0);
|
||||
}
|
||||
_Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 0);
|
||||
COMPUTE_EVAL_BREAKER(interp, ceval, ceval2);
|
||||
}
|
||||
|
@ -803,19 +807,31 @@ _push_pending_call(struct _pending_calls *pending,
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
_next_pending_call(struct _pending_calls *pending,
|
||||
int (**func)(void *), void **arg)
|
||||
{
|
||||
int i = pending->first;
|
||||
if (i == pending->last) {
|
||||
/* Queue empty */
|
||||
assert(pending->calls[i].func == NULL);
|
||||
return -1;
|
||||
}
|
||||
*func = pending->calls[i].func;
|
||||
*arg = pending->calls[i].arg;
|
||||
return i;
|
||||
}
|
||||
|
||||
/* Pop one item off the queue while holding the lock. */
|
||||
static void
|
||||
_pop_pending_call(struct _pending_calls *pending,
|
||||
int (**func)(void *), void **arg)
|
||||
{
|
||||
int i = pending->first;
|
||||
if (i == pending->last) {
|
||||
return; /* Queue empty */
|
||||
int i = _next_pending_call(pending, func, arg);
|
||||
if (i >= 0) {
|
||||
pending->calls[i] = (struct _pending_call){0};
|
||||
pending->first = (i + 1) % NPENDINGCALLS;
|
||||
}
|
||||
|
||||
*func = pending->calls[i].func;
|
||||
*arg = pending->calls[i].arg;
|
||||
pending->first = (i + 1) % NPENDINGCALLS;
|
||||
}
|
||||
|
||||
/* This implementation is thread-safe. It allows
|
||||
|
@ -825,9 +841,16 @@ _pop_pending_call(struct _pending_calls *pending,
|
|||
|
||||
int
|
||||
_PyEval_AddPendingCall(PyInterpreterState *interp,
|
||||
int (*func)(void *), void *arg)
|
||||
int (*func)(void *), void *arg,
|
||||
int mainthreadonly)
|
||||
{
|
||||
assert(!mainthreadonly || _Py_IsMainInterpreter(interp));
|
||||
struct _pending_calls *pending = &interp->ceval.pending;
|
||||
if (mainthreadonly) {
|
||||
/* The main thread only exists in the main interpreter. */
|
||||
assert(_Py_IsMainInterpreter(interp));
|
||||
pending = &_PyRuntime.ceval.pending_mainthread;
|
||||
}
|
||||
/* Ensure that _PyEval_InitState() was called
|
||||
and that _PyEval_FiniState() is not called yet. */
|
||||
assert(pending->lock != NULL);
|
||||
|
@ -837,39 +860,17 @@ _PyEval_AddPendingCall(PyInterpreterState *interp,
|
|||
PyThread_release_lock(pending->lock);
|
||||
|
||||
/* signal main loop */
|
||||
SIGNAL_PENDING_CALLS(interp);
|
||||
SIGNAL_PENDING_CALLS(pending, interp);
|
||||
return result;
|
||||
}
|
||||
|
||||
int
|
||||
Py_AddPendingCall(int (*func)(void *), void *arg)
|
||||
{
|
||||
/* Best-effort to support subinterpreters and calls with the GIL released.
|
||||
|
||||
First attempt _PyThreadState_GET() since it supports subinterpreters.
|
||||
|
||||
If the GIL is released, _PyThreadState_GET() returns NULL . In this
|
||||
case, use PyGILState_GetThisThreadState() which works even if the GIL
|
||||
is released.
|
||||
|
||||
Sadly, PyGILState_GetThisThreadState() doesn't support subinterpreters:
|
||||
see bpo-10915 and bpo-15751.
|
||||
|
||||
Py_AddPendingCall() doesn't require the caller to hold the GIL. */
|
||||
PyThreadState *tstate = _PyThreadState_GET();
|
||||
if (tstate == NULL) {
|
||||
tstate = PyGILState_GetThisThreadState();
|
||||
}
|
||||
|
||||
PyInterpreterState *interp;
|
||||
if (tstate != NULL) {
|
||||
interp = tstate->interp;
|
||||
}
|
||||
else {
|
||||
/* Last resort: use the main interpreter */
|
||||
interp = _PyInterpreterState_Main();
|
||||
}
|
||||
return _PyEval_AddPendingCall(interp, func, arg);
|
||||
/* Legacy users of this API will continue to target the main thread
|
||||
(of the main interpreter). */
|
||||
PyInterpreterState *interp = _PyInterpreterState_Main();
|
||||
return _PyEval_AddPendingCall(interp, func, arg, 1);
|
||||
}
|
||||
|
||||
static int
|
||||
|
@ -889,27 +890,24 @@ handle_signals(PyThreadState *tstate)
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
make_pending_calls(PyInterpreterState *interp)
|
||||
static inline int
|
||||
maybe_has_pending_calls(PyInterpreterState *interp)
|
||||
{
|
||||
/* only execute pending calls on main thread */
|
||||
if (!_Py_ThreadCanHandlePendingCalls()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* don't perform recursive pending calls */
|
||||
if (interp->ceval.pending.busy) {
|
||||
return 0;
|
||||
}
|
||||
interp->ceval.pending.busy = 1;
|
||||
|
||||
/* unsignal before starting to call callbacks, so that any callback
|
||||
added in-between re-signals */
|
||||
UNSIGNAL_PENDING_CALLS(interp);
|
||||
int res = 0;
|
||||
|
||||
/* perform a bounded number of calls, in case of recursion */
|
||||
struct _pending_calls *pending = &interp->ceval.pending;
|
||||
if (_Py_atomic_load_relaxed_int32(&pending->calls_to_do)) {
|
||||
return 1;
|
||||
}
|
||||
if (!_Py_IsMainThread() || !_Py_IsMainInterpreter(interp)) {
|
||||
return 0;
|
||||
}
|
||||
pending = &_PyRuntime.ceval.pending_mainthread;
|
||||
return _Py_atomic_load_relaxed_int32(&pending->calls_to_do);
|
||||
}
|
||||
|
||||
static int
|
||||
_make_pending_calls(struct _pending_calls *pending)
|
||||
{
|
||||
/* perform a bounded number of calls, in case of recursion */
|
||||
for (int i=0; i<NPENDINGCALLS; i++) {
|
||||
int (*func)(void *) = NULL;
|
||||
void *arg = NULL;
|
||||
|
@ -923,19 +921,61 @@ make_pending_calls(PyInterpreterState *interp)
|
|||
if (func == NULL) {
|
||||
break;
|
||||
}
|
||||
res = func(arg);
|
||||
if (res) {
|
||||
goto error;
|
||||
if (func(arg) != 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
make_pending_calls(PyInterpreterState *interp)
|
||||
{
|
||||
struct _pending_calls *pending = &interp->ceval.pending;
|
||||
struct _pending_calls *pending_main = &_PyRuntime.ceval.pending_mainthread;
|
||||
|
||||
/* Only one thread (per interpreter) may run the pending calls
|
||||
at once. In the same way, we don't do recursive pending calls. */
|
||||
PyThread_acquire_lock(pending->lock, WAIT_LOCK);
|
||||
if (pending->busy) {
|
||||
/* A pending call was added after another thread was already
|
||||
handling the pending calls (and had already "unsignaled").
|
||||
Once that thread is done, it may have taken care of all the
|
||||
pending calls, or there might be some still waiting.
|
||||
Regardless, this interpreter's pending calls will stay
|
||||
"signaled" until that first thread has finished. At that
|
||||
point the next thread to trip the eval breaker will take
|
||||
care of any remaining pending calls. Until then, though,
|
||||
all the interpreter's threads will be tripping the eval
|
||||
breaker every time it's checked. */
|
||||
PyThread_release_lock(pending->lock);
|
||||
return 0;
|
||||
}
|
||||
pending->busy = 1;
|
||||
PyThread_release_lock(pending->lock);
|
||||
|
||||
/* unsignal before starting to call callbacks, so that any callback
|
||||
added in-between re-signals */
|
||||
UNSIGNAL_PENDING_CALLS(interp);
|
||||
|
||||
if (_make_pending_calls(pending) != 0) {
|
||||
pending->busy = 0;
|
||||
/* There might not be more calls to make, but we play it safe. */
|
||||
SIGNAL_PENDING_CALLS(pending, interp);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) {
|
||||
if (_make_pending_calls(pending_main) != 0) {
|
||||
pending->busy = 0;
|
||||
/* There might not be more calls to make, but we play it safe. */
|
||||
SIGNAL_PENDING_CALLS(pending_main, interp);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
interp->ceval.pending.busy = 0;
|
||||
return res;
|
||||
|
||||
error:
|
||||
interp->ceval.pending.busy = 0;
|
||||
SIGNAL_PENDING_CALLS(interp);
|
||||
return res;
|
||||
pending->busy = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -944,12 +984,6 @@ _Py_FinishPendingCalls(PyThreadState *tstate)
|
|||
assert(PyGILState_Check());
|
||||
assert(is_tstate_valid(tstate));
|
||||
|
||||
struct _pending_calls *pending = &tstate->interp->ceval.pending;
|
||||
|
||||
if (!_Py_atomic_load_relaxed_int32(&(pending->calls_to_do))) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (make_pending_calls(tstate->interp) < 0) {
|
||||
PyObject *exc = _PyErr_GetRaisedException(tstate);
|
||||
PyErr_BadInternalCall();
|
||||
|
@ -958,6 +992,29 @@ _Py_FinishPendingCalls(PyThreadState *tstate)
|
|||
}
|
||||
}
|
||||
|
||||
int
|
||||
_PyEval_MakePendingCalls(PyThreadState *tstate)
|
||||
{
|
||||
int res;
|
||||
|
||||
if (_Py_IsMainThread() && _Py_IsMainInterpreter(tstate->interp)) {
|
||||
/* Python signal handler doesn't really queue a callback:
|
||||
it only signals that a signal was received,
|
||||
see _PyEval_SignalReceived(). */
|
||||
res = handle_signals(tstate);
|
||||
if (res != 0) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
res = make_pending_calls(tstate->interp);
|
||||
if (res != 0) {
|
||||
return res;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Py_MakePendingCalls() is a simple wrapper for the sake
|
||||
of backward-compatibility. */
|
||||
int
|
||||
|
@ -968,19 +1025,11 @@ Py_MakePendingCalls(void)
|
|||
PyThreadState *tstate = _PyThreadState_GET();
|
||||
assert(is_tstate_valid(tstate));
|
||||
|
||||
/* Python signal handler doesn't really queue a callback: it only signals
|
||||
that a signal was received, see _PyEval_SignalReceived(). */
|
||||
int res = handle_signals(tstate);
|
||||
if (res != 0) {
|
||||
return res;
|
||||
/* Only execute pending calls on the main thread. */
|
||||
if (!_Py_IsMainThread() || !_Py_IsMainInterpreter(tstate->interp)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
res = make_pending_calls(tstate->interp);
|
||||
if (res != 0) {
|
||||
return res;
|
||||
}
|
||||
|
||||
return 0;
|
||||
return _PyEval_MakePendingCalls(tstate);
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -1020,7 +1069,7 @@ _Py_HandlePending(PyThreadState *tstate)
|
|||
}
|
||||
|
||||
/* Pending calls */
|
||||
if (_Py_atomic_load_relaxed_int32(&interp_ceval_state->pending.calls_to_do)) {
|
||||
if (maybe_has_pending_calls(tstate->interp)) {
|
||||
if (make_pending_calls(tstate->interp) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -2152,6 +2152,9 @@ Py_EndInterpreter(PyThreadState *tstate)
|
|||
// Wrap up existing "threading"-module-created, non-daemon threads.
|
||||
wait_for_thread_shutdown(tstate);
|
||||
|
||||
// Make any remaining pending calls.
|
||||
_Py_FinishPendingCalls(tstate);
|
||||
|
||||
_PyAtExit_Call(tstate->interp);
|
||||
|
||||
if (tstate != interp->threads.head || tstate->next != NULL) {
|
||||
|
|
|
@ -380,7 +380,7 @@ _Py_COMP_DIAG_IGNORE_DEPR_DECLS
|
|||
static const _PyRuntimeState initial = _PyRuntimeState_INIT(_PyRuntime);
|
||||
_Py_COMP_DIAG_POP
|
||||
|
||||
#define NUMLOCKS 8
|
||||
#define NUMLOCKS 9
|
||||
#define LOCKS_INIT(runtime) \
|
||||
{ \
|
||||
&(runtime)->interpreters.mutex, \
|
||||
|
@ -388,6 +388,7 @@ _Py_COMP_DIAG_POP
|
|||
&(runtime)->getargs.mutex, \
|
||||
&(runtime)->unicode_state.ids.lock, \
|
||||
&(runtime)->imports.extensions.mutex, \
|
||||
&(runtime)->ceval.pending_mainthread.lock, \
|
||||
&(runtime)->atexit.mutex, \
|
||||
&(runtime)->audit_hooks.mutex, \
|
||||
&(runtime)->allocators.mutex, \
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue