gh-104812: Run Pending Calls in any Thread (gh-104813)

For a while now, pending calls have only run in the main thread (in the main interpreter). This PR changes things to allow any thread to run a pending call, unless the pending call was explicitly added for the main thread to run.
Eric Snow 2023-06-13 15:02:19 -06:00 committed by GitHub
parent 4e80082723
commit 757b402ea1
16 changed files with 766 additions and 123 deletions
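
The description above notes that a pending call can still be pinned to the main thread; in the C changes below this appears as a new mainthreadonly argument on the internal _PyEval_AddPendingCall(). The following is a minimal, hedged sketch (not part of this commit) of how an internal-API consumer might queue a callback for any thread of an interpreter, mirroring the retry loop used by the new _testinternalcapi.pending_threadfunc() helper. The callback name my_callback is hypothetical, and the sketch assumes a build with Py_BUILD_CORE_MODULE so the pycore_* headers are available.

#include "Python.h"
#include "pycore_ceval.h"   // _PyEval_AddPendingCall()

/* Hypothetical callback: runs later in whichever of the interpreter's
   threads next trips the eval breaker (mainthreadonly=0).  Return 0 on
   success, -1 (with an exception set) on failure. */
static int
my_callback(void *arg)
{
    return 0;
}

/* Assumes the GIL is held on entry. */
static void
queue_my_callback(PyInterpreterState *interp)
{
    int r;
    do {
        /* The queue is bounded (NPENDINGCALLS entries), so retry on
           failure, releasing the GIL so another thread can drain it. */
        Py_BEGIN_ALLOW_THREADS
        r = _PyEval_AddPendingCall(interp, &my_callback, NULL,
                                   /*mainthreadonly=*/0);
        Py_END_ALLOW_THREADS
    } while (r < 0);
}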


@ -22,6 +22,8 @@ PyAPI_FUNC(PyObject *) _PyEval_EvalFrameDefault(PyThreadState *tstate, struct _P
PyAPI_FUNC(void) _PyEval_SetSwitchInterval(unsigned long microseconds);
PyAPI_FUNC(unsigned long) _PyEval_GetSwitchInterval(void);
PyAPI_FUNC(int) _PyEval_MakePendingCalls(PyThreadState *);
PyAPI_FUNC(Py_ssize_t) PyUnstable_Eval_RequestCodeExtraIndex(freefunc);
// Old name -- remove when this API changes:
_Py_DEPRECATED_EXTERNALLY(3.12) static inline Py_ssize_t


@ -27,7 +27,8 @@ PyAPI_FUNC(void) _PyEval_SignalReceived(PyInterpreterState *interp);
PyAPI_FUNC(int) _PyEval_AddPendingCall(
PyInterpreterState *interp,
int (*func)(void *),
void *arg);
void *arg,
int mainthreadonly);
PyAPI_FUNC(void) _PyEval_SignalAsyncExc(PyInterpreterState *interp);
#ifdef HAVE_FORK
extern PyStatus _PyEval_ReInitThreads(PyThreadState *tstate);


@ -13,6 +13,24 @@ extern "C" {
#include "pycore_gil.h" // struct _gil_runtime_state
struct _pending_calls {
int busy;
PyThread_type_lock lock;
/* Request for running pending calls. */
_Py_atomic_int calls_to_do;
/* Request for looking at the `async_exc` field of the current
thread state.
Guarded by the GIL. */
int async_exc;
#define NPENDINGCALLS 32
struct _pending_call {
int (*func)(void *);
void *arg;
} calls[NPENDINGCALLS];
int first;
int last;
};
typedef enum {
PERF_STATUS_FAILED = -1, // Perf trampoline is in an invalid state
PERF_STATUS_NO_INIT = 0, // Perf trampoline is not initialized
@ -49,6 +67,8 @@ struct _ceval_runtime_state {
the main thread of the main interpreter can handle signals: see
_Py_ThreadCanHandleSignals(). */
_Py_atomic_int signals_pending;
/* Pending calls to be made only on the main thread. */
struct _pending_calls pending_mainthread;
};
#ifdef PY_HAVE_PERF_TRAMPOLINE
@ -62,24 +82,6 @@ struct _ceval_runtime_state {
#endif
struct _pending_calls {
int busy;
PyThread_type_lock lock;
/* Request for running pending calls. */
_Py_atomic_int calls_to_do;
/* Request for looking at the `async_exc` field of the current
thread state.
Guarded by the GIL. */
int async_exc;
#define NPENDINGCALLS 32
struct {
int (*func)(void *);
void *arg;
} calls[NPENDINGCALLS];
int first;
int last;
};
struct _ceval_state {
/* This single variable consolidates all requests to break out of
the fast path in the eval loop. */
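
The struct _pending_calls shown above is a fixed-size ring buffer: calls holds at most NPENDINGCALLS (32) entries, and first/last act as pop/push cursors modulo the buffer size (compare the _push_pending_call/_pop_pending_call changes further down). A simplified, self-contained sketch of that data structure, with illustrative names rather than the exact CPython code:

#define NPENDINGCALLS 32

struct pending_call { int (*func)(void *); void *arg; };

struct pending_queue {
    struct pending_call calls[NPENDINGCALLS];
    int first;   /* next slot to pop */
    int last;    /* next free slot to push into */
};

/* Returns 0 on success, -1 if the queue is full. */
static int
push(struct pending_queue *q, int (*func)(void *), void *arg)
{
    int next = (q->last + 1) % NPENDINGCALLS;
    if (next == q->first) {
        return -1;   /* full: one slot stays empty to distinguish from "empty" */
    }
    q->calls[q->last] = (struct pending_call){func, arg};
    q->last = next;
    return 0;
}

/* Returns 0 on success, -1 if the queue is empty. */
static int
pop(struct pending_queue *q, int (**func)(void *), void **arg)
{
    if (q->first == q->last) {
        return -1;   /* empty */
    }
    *func = q->calls[q->first].func;
    *arg = q->calls[q->first].arg;
    q->calls[q->first] = (struct pending_call){0};
    q->first = (q->first + 1) % NPENDINGCALLS;
    return 0;
}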


@ -60,14 +60,6 @@ _Py_ThreadCanHandleSignals(PyInterpreterState *interp)
}
/* Only execute pending calls on the main thread. */
static inline int
_Py_ThreadCanHandlePendingCalls(void)
{
return _Py_IsMainThread();
}
/* Variable and static inline functions for in-line access to current thread
and interpreter state */


@ -115,7 +115,11 @@ def join_thread(thread, timeout=None):
@contextlib.contextmanager
def start_threads(threads, unlock=None):
import faulthandler
try:
import faulthandler
except ImportError:
# It isn't supported on subinterpreters yet.
faulthandler = None
threads = list(threads)
started = []
try:
@ -147,7 +151,8 @@ def start_threads(threads, unlock=None):
finally:
started = [t for t in started if t.is_alive()]
if started:
faulthandler.dump_traceback(sys.stdout)
if faulthandler is not None:
faulthandler.dump_traceback(sys.stdout)
raise AssertionError('Unable to join %d threads' % len(started))


@ -2,17 +2,20 @@
# these are all functions _testcapi exports whose name begins with 'test_'.
import _thread
from collections import OrderedDict
from collections import OrderedDict, deque
import contextlib
import importlib.machinery
import importlib.util
import json
import os
import pickle
import queue
import random
import sys
import textwrap
import threading
import time
import types
import unittest
import warnings
import weakref
@ -36,6 +39,10 @@ try:
import _testsinglephase
except ImportError:
_testsinglephase = None
try:
import _xxsubinterpreters as _interpreters
except ModuleNotFoundError:
_interpreters = None
# Skip this test if the _testcapi module isn't available.
_testcapi = import_helper.import_module('_testcapi')
@ -47,6 +54,12 @@ def decode_stderr(err):
return err.decode('utf-8', 'replace').replace('\r', '')
def requires_subinterpreters(meth):
"""Decorator to skip a test if subinterpreters are not supported."""
return unittest.skipIf(_interpreters is None,
'subinterpreters required')(meth)
def testfunction(self):
"""some doc"""
return self
@ -1259,6 +1272,10 @@ class TestHeapTypeRelative(unittest.TestCase):
class TestPendingCalls(unittest.TestCase):
# See the comment in ceval.c (at the "handle_eval_breaker" label)
# about when pending calls get run. This is especially relevant
# here for creating deterministic tests.
def pendingcalls_submit(self, l, n):
def callback():
#this function can be interrupted by thread switching so let's
@ -1341,6 +1358,388 @@ class TestPendingCalls(unittest.TestCase):
gen = genf()
self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
class PendingTask(types.SimpleNamespace):
_add_pending = _testinternalcapi.pending_threadfunc
def __init__(self, req, taskid=None, notify_done=None):
self.id = taskid
self.req = req
self.notify_done = notify_done
self.creator_tid = threading.get_ident()
self.requester_tid = None
self.runner_tid = None
self.result = None
def run(self):
assert self.result is None
self.runner_tid = threading.get_ident()
self._run()
if self.notify_done is not None:
self.notify_done()
def _run(self):
self.result = self.req
def run_in_pending_call(self, worker_tids):
assert self._add_pending is _testinternalcapi.pending_threadfunc
self.requester_tid = threading.get_ident()
def callback():
assert self.result is None
# It can be tricky to control which thread handles
# the eval breaker, so we take a naive approach to
# make sure.
if threading.get_ident() not in worker_tids:
self._add_pending(callback, ensure_added=True)
return
self.run()
self._add_pending(callback, ensure_added=True)
def create_thread(self, worker_tids):
return threading.Thread(
target=self.run_in_pending_call,
args=(worker_tids,),
)
def wait_for_result(self):
while self.result is None:
time.sleep(0.01)
def test_subthreads_can_handle_pending_calls(self):
payload = 'Spam spam spam spam. Lovely spam! Wonderful spam!'
task = self.PendingTask(payload)
def do_the_work():
tid = threading.get_ident()
t = task.create_thread({tid})
with threading_helper.start_threads([t]):
task.wait_for_result()
t = threading.Thread(target=do_the_work)
with threading_helper.start_threads([t]):
pass
self.assertEqual(task.result, payload)
def test_many_subthreads_can_handle_pending_calls(self):
main_tid = threading.get_ident()
self.assertEqual(threading.main_thread().ident, main_tid)
# We can't use queue.Queue since it isn't reentrant relative
# to pending calls.
_queue = deque()
_active = deque()
_done_lock = threading.Lock()
def queue_put(task):
_queue.append(task)
_active.append(True)
def queue_get():
try:
task = _queue.popleft()
except IndexError:
raise queue.Empty
return task
def queue_task_done():
_active.pop()
if not _active:
try:
_done_lock.release()
except RuntimeError:
assert not _done_lock.locked()
def queue_empty():
return not _queue
def queue_join():
_done_lock.acquire()
_done_lock.release()
tasks = []
for i in range(20):
task = self.PendingTask(
req=f'request {i}',
taskid=i,
notify_done=queue_task_done,
)
tasks.append(task)
queue_put(task)
# This will be released once all the tasks have finished.
_done_lock.acquire()
def add_tasks(worker_tids):
while True:
if done:
return
try:
task = queue_get()
except queue.Empty:
break
task.run_in_pending_call(worker_tids)
done = False
def run_tasks():
while not queue_empty():
if done:
return
time.sleep(0.01)
# Give the worker a chance to handle any remaining pending calls.
while not done:
time.sleep(0.01)
# Start the workers and wait for them to finish.
worker_threads = [threading.Thread(target=run_tasks)
for _ in range(3)]
with threading_helper.start_threads(worker_threads):
try:
# Add a pending call for each task.
worker_tids = [t.ident for t in worker_threads]
threads = [threading.Thread(target=add_tasks, args=(worker_tids,))
for _ in range(3)]
with threading_helper.start_threads(threads):
try:
pass
except BaseException:
done = True
raise # re-raise
# Wait for the pending calls to finish.
queue_join()
# Notify the workers that they can stop.
done = True
except BaseException:
done = True
raise # re-raise
runner_tids = [t.runner_tid for t in tasks]
self.assertNotIn(main_tid, runner_tids)
for task in tasks:
with self.subTest(f'task {task.id}'):
self.assertNotEqual(task.requester_tid, main_tid)
self.assertNotEqual(task.requester_tid, task.runner_tid)
self.assertNotIn(task.requester_tid, runner_tids)
@requires_subinterpreters
def test_isolated_subinterpreter(self):
# We exercise the most important permutations.
# This test relies on pending calls getting called
# (eval breaker tripped) at each loop iteration
# and at each call.
maxtext = 250
main_interpid = 0
interpid = _interpreters.create()
_interpreters.run_string(interpid, f"""if True:
import json
import os
import threading
import time
import _testinternalcapi
from test.support import threading_helper
""")
def create_pipe():
r, w = os.pipe()
self.addCleanup(lambda: os.close(r))
self.addCleanup(lambda: os.close(w))
return r, w
with self.subTest('add in main, run in subinterpreter'):
r_ready, w_ready = create_pipe()
r_done, w_done= create_pipe()
timeout = time.time() + 30 # seconds
def do_work():
_interpreters.run_string(interpid, f"""if True:
# Wait until this interp has handled the pending call.
waiting = False
done = False
def wait(os_read=os.read):
global done, waiting
waiting = True
os_read({r_done}, 1)
done = True
t = threading.Thread(target=wait)
with threading_helper.start_threads([t]):
while not waiting:
pass
os.write({w_ready}, b'\\0')
# Loop to trigger the eval breaker.
while not done:
time.sleep(0.01)
if time.time() > {timeout}:
raise Exception('timed out!')
""")
t = threading.Thread(target=do_work)
with threading_helper.start_threads([t]):
os.read(r_ready, 1)
# Add the pending call and wait for it to finish.
actual = _testinternalcapi.pending_identify(interpid)
# Signal the subinterpreter to stop.
os.write(w_done, b'\0')
self.assertEqual(actual, int(interpid))
with self.subTest('add in main, run in subinterpreter sub-thread'):
r_ready, w_ready = create_pipe()
r_done, w_done= create_pipe()
timeout = time.time() + 30 # seconds
def do_work():
_interpreters.run_string(interpid, f"""if True:
waiting = False
done = False
def subthread():
while not waiting:
pass
os.write({w_ready}, b'\\0')
# Loop to trigger the eval breaker.
while not done:
time.sleep(0.01)
if time.time() > {timeout}:
raise Exception('timed out!')
t = threading.Thread(target=subthread)
with threading_helper.start_threads([t]):
# Wait until this interp has handled the pending call.
waiting = True
os.read({r_done}, 1)
done = True
""")
t = threading.Thread(target=do_work)
with threading_helper.start_threads([t]):
os.read(r_ready, 1)
# Add the pending call and wait for it to finish.
actual = _testinternalcapi.pending_identify(interpid)
# Signal the subinterpreter to stop.
os.write(w_done, b'\0')
self.assertEqual(actual, int(interpid))
with self.subTest('add in subinterpreter, run in main'):
r_ready, w_ready = create_pipe()
r_done, w_done= create_pipe()
r_data, w_data= create_pipe()
timeout = time.time() + 30 # seconds
def add_job():
os.read(r_ready, 1)
_interpreters.run_string(interpid, f"""if True:
# Add the pending call and wait for it to finish.
actual = _testinternalcapi.pending_identify({main_interpid})
# Signal the subinterpreter to stop.
os.write({w_done}, b'\\0')
os.write({w_data}, actual.to_bytes(1, 'little'))
""")
# Wait until this interp has handled the pending call.
waiting = False
done = False
def wait(os_read=os.read):
nonlocal done, waiting
waiting = True
os_read(r_done, 1)
done = True
t1 = threading.Thread(target=add_job)
t2 = threading.Thread(target=wait)
with threading_helper.start_threads([t1, t2]):
while not waiting:
pass
os.write(w_ready, b'\0')
# Loop to trigger the eval breaker.
while not done:
time.sleep(0.01)
if time.time() > timeout:
raise Exception('timed out!')
text = os.read(r_data, 1)
actual = int.from_bytes(text, 'little')
self.assertEqual(actual, int(main_interpid))
with self.subTest('add in subinterpreter, run in sub-thread'):
r_ready, w_ready = create_pipe()
r_done, w_done= create_pipe()
r_data, w_data= create_pipe()
timeout = time.time() + 30 # seconds
def add_job():
os.read(r_ready, 1)
_interpreters.run_string(interpid, f"""if True:
# Add the pending call and wait for it to finish.
actual = _testinternalcapi.pending_identify({main_interpid})
# Signal the subinterpreter to stop.
os.write({w_done}, b'\\0')
os.write({w_data}, actual.to_bytes(1, 'little'))
""")
# Wait until this interp has handled the pending call.
waiting = False
done = False
def wait(os_read=os.read):
nonlocal done, waiting
waiting = True
os_read(r_done, 1)
done = True
def subthread():
while not waiting:
pass
os.write(w_ready, b'\0')
# Loop to trigger the eval breaker.
while not done:
time.sleep(0.01)
if time.time() > timeout:
raise Exception('timed out!')
t1 = threading.Thread(target=add_job)
t2 = threading.Thread(target=wait)
t3 = threading.Thread(target=subthread)
with threading_helper.start_threads([t1, t2, t3]):
pass
text = os.read(r_data, 1)
actual = int.from_bytes(text, 'little')
self.assertEqual(actual, int(main_interpid))
# XXX We can't use the rest until gh-105716 is fixed.
return
with self.subTest('add in subinterpreter, run in subinterpreter sub-thread'):
r_ready, w_ready = create_pipe()
r_done, w_done= create_pipe()
r_data, w_data= create_pipe()
timeout = time.time() + 30 # seconds
def do_work():
_interpreters.run_string(interpid, f"""if True:
waiting = False
done = False
def subthread():
while not waiting:
pass
os.write({w_ready}, b'\\0')
# Loop to trigger the eval breaker.
while not done:
time.sleep(0.01)
if time.time() > {timeout}:
raise Exception('timed out!')
t = threading.Thread(target=subthread)
with threading_helper.start_threads([t]):
# Wait until this interp has handled the pending call.
waiting = True
os.read({r_done}, 1)
done = True
""")
t = threading.Thread(target=do_work)
#with threading_helper.start_threads([t]):
t.start()
if True:
os.read(r_ready, 1)
_interpreters.run_string(interpid, f"""if True:
# Add the pending call and wait for it to finish.
actual = _testinternalcapi.pending_identify({interpid})
# Signal the subinterpreter to stop.
os.write({w_done}, b'\\0')
os.write({w_data}, actual.to_bytes(1, 'little'))
""")
t.join()
text = os.read(r_data, 1)
actual = int.from_bytes(text, 'little')
self.assertEqual(actual, int(interpid))
class SubinterpreterTest(unittest.TestCase):


@ -0,0 +1,9 @@
The "pending call" machinery now works for all interpreters, not just the
main interpreter, and runs in all threads, not just the main thread. Some
calls are still only done in the main thread, ergo in the main interpreter.
This change does not affect signal handling nor the existing public C-API
(``Py_AddPendingCall()``), which both still only target the main thread.
The new functionality is meant strictly for internal use for now, since
consequences of its use are not well understood yet outside some very
restricted cases. This change brings the capability in line with the
intention when the state was made per-interpreter several years ago.
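
As the entry above says, the public Py_AddPendingCall() keeps its old behavior: after this commit it explicitly targets the main thread of the main interpreter (see the ceval change below). A hedged sketch of that unchanged public usage; notify_main_thread is a hypothetical callback:

#include <Python.h>

/* Hypothetical callback: executed later by the main thread of the main
   interpreter, between bytecode instructions.  Return 0 on success, or
   -1 with an exception set on failure. */
static int
notify_main_thread(void *arg)
{
    return 0;
}

void
schedule_notification(void)
{
    /* Py_AddPendingCall() is documented as callable from any thread and
       as not requiring the GIL; a negative result means the bounded
       queue was full and the caller may retry later. */
    if (Py_AddPendingCall(&notify_main_thread, NULL) < 0) {
        /* retry later, or drop the notification */
    }
}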


@ -210,6 +210,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
PyObject *item;
PyLockStatus r;
PY_TIMEOUT_T microseconds;
PyThreadState *tstate = PyThreadState_Get();
if (block == 0) {
/* Non-blocking */
@ -253,7 +254,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
Py_END_ALLOW_THREADS
}
if (r == PY_LOCK_INTR && Py_MakePendingCalls() < 0) {
if (r == PY_LOCK_INTR && _PyEval_MakePendingCalls(tstate) < 0) {
return NULL;
}
if (r == PY_LOCK_FAILURE) {

View file

@ -13,16 +13,18 @@
#include "Python.h"
#include "frameobject.h"
#include "interpreteridobject.h" // _PyInterpreterID_LookUp()
#include "pycore_atomic_funcs.h" // _Py_atomic_int_get()
#include "pycore_bitutils.h" // _Py_bswap32()
#include "pycore_compile.h" // _PyCompile_CodeGen, _PyCompile_OptimizeCfg, _PyCompile_Assemble
#include "pycore_ceval.h" // _PyEval_AddPendingCall
#include "pycore_fileutils.h" // _Py_normpath
#include "pycore_frame.h" // _PyInterpreterFrame
#include "pycore_gc.h" // PyGC_Head
#include "pycore_hashtable.h" // _Py_hashtable_new()
#include "pycore_initconfig.h" // _Py_GetConfigsAsDict()
#include "pycore_pathconfig.h" // _PyPathConfig_ClearGlobal()
#include "pycore_interp.h" // _PyInterpreterState_GetConfigCopy()
#include "pycore_pathconfig.h" // _PyPathConfig_ClearGlobal()
#include "pycore_pyerrors.h" // _Py_UTF8_Edit_Cost()
#include "pycore_pystate.h" // _PyThreadState_GET()
#include "osdefs.h" // MAXPATHLEN
@ -838,6 +840,120 @@ set_optimizer(PyObject *self, PyObject *opt)
Py_RETURN_NONE;
}
static int _pending_callback(void *arg)
{
/* we assume the argument is callable object to which we own a reference */
PyObject *callable = (PyObject *)arg;
PyObject *r = PyObject_CallNoArgs(callable);
Py_DECREF(callable);
Py_XDECREF(r);
return r != NULL ? 0 : -1;
}
/* The following requests n callbacks to _pending_callback. It can be
* run from any python thread.
*/
static PyObject *
pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs)
{
PyObject *callable;
int ensure_added = 0;
static char *kwlist[] = {"", "ensure_added", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs,
"O|$p:pending_threadfunc", kwlist,
&callable, &ensure_added))
{
return NULL;
}
PyInterpreterState *interp = PyInterpreterState_Get();
/* create the reference for the callback while we hold the lock */
Py_INCREF(callable);
int r;
Py_BEGIN_ALLOW_THREADS
r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
Py_END_ALLOW_THREADS
if (r < 0) {
/* unsuccessful add */
if (!ensure_added) {
Py_DECREF(callable);
Py_RETURN_FALSE;
}
do {
Py_BEGIN_ALLOW_THREADS
r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
Py_END_ALLOW_THREADS
} while (r < 0);
}
Py_RETURN_TRUE;
}
static struct {
int64_t interpid;
} pending_identify_result;
static int
_pending_identify_callback(void *arg)
{
PyThread_type_lock mutex = (PyThread_type_lock)arg;
assert(pending_identify_result.interpid == -1);
PyThreadState *tstate = PyThreadState_Get();
pending_identify_result.interpid = PyInterpreterState_GetID(tstate->interp);
PyThread_release_lock(mutex);
return 0;
}
static PyObject *
pending_identify(PyObject *self, PyObject *args)
{
PyObject *interpid;
if (!PyArg_ParseTuple(args, "O:pending_identify", &interpid)) {
return NULL;
}
PyInterpreterState *interp = _PyInterpreterID_LookUp(interpid);
if (interp == NULL) {
if (!PyErr_Occurred()) {
PyErr_SetString(PyExc_ValueError, "interpreter not found");
}
return NULL;
}
pending_identify_result.interpid = -1;
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
return NULL;
}
PyThread_acquire_lock(mutex, WAIT_LOCK);
/* It gets released in _pending_identify_callback(). */
int r;
do {
Py_BEGIN_ALLOW_THREADS
r = _PyEval_AddPendingCall(interp,
&_pending_identify_callback, (void *)mutex,
0);
Py_END_ALLOW_THREADS
} while (r < 0);
/* Wait for the pending call to complete. */
PyThread_acquire_lock(mutex, WAIT_LOCK);
PyThread_release_lock(mutex);
PyThread_free_lock(mutex);
PyObject *res = PyLong_FromLongLong(pending_identify_result.interpid);
pending_identify_result.interpid = -1;
if (res == NULL) {
return NULL;
}
return res;
}
static PyMethodDef module_functions[] = {
{"get_configs", get_configs, METH_NOARGS},
{"get_recursion_depth", get_recursion_depth, METH_NOARGS},
@ -868,6 +984,10 @@ static PyMethodDef module_functions[] = {
{"iframe_getlasti", iframe_getlasti, METH_O, NULL},
{"set_optimizer", set_optimizer, METH_O, NULL},
{"get_counter_optimizer", get_counter_optimizer, METH_NOARGS, NULL},
{"pending_threadfunc", _PyCFunction_CAST(pending_threadfunc),
METH_VARARGS | METH_KEYWORDS},
// {"pending_fd_identify", pending_fd_identify, METH_VARARGS, NULL},
{"pending_identify", pending_identify, METH_VARARGS, NULL},
{NULL, NULL} /* sentinel */
};


@ -81,6 +81,7 @@ lock_dealloc(lockobject *self)
static PyLockStatus
acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
{
PyThreadState *tstate = _PyThreadState_GET();
_PyTime_t endtime = 0;
if (timeout > 0) {
endtime = _PyDeadline_Init(timeout);
@ -103,7 +104,7 @@ acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
/* Run signal handlers if we were interrupted. Propagate
* exceptions from signal handlers, such as KeyboardInterrupt, by
* passing up PY_LOCK_INTR. */
if (Py_MakePendingCalls() < 0) {
if (_PyEval_MakePendingCalls(tstate) < 0) {
return PY_LOCK_INTR;
}


@ -314,7 +314,8 @@ trip_signal(int sig_num)
still use it for this exceptional case. */
_PyEval_AddPendingCall(interp,
report_wakeup_send_error,
(void *)(intptr_t) last_error);
(void *)(intptr_t) last_error,
1);
}
}
}
@ -333,7 +334,8 @@ trip_signal(int sig_num)
still use it for this exceptional case. */
_PyEval_AddPendingCall(interp,
report_wakeup_write_error,
(void *)(intptr_t)errno);
(void *)(intptr_t)errno,
1);
}
}
}


@ -758,6 +758,61 @@ handle_eval_breaker:
* We need to do reasonably frequently, but not too frequently.
* All loops should include a check of the eval breaker.
* We also check on return from any builtin function.
*
* ## More Details ###
*
* The eval loop (this function) normally executes the instructions
* of a code object sequentially. However, the runtime supports a
* number of out-of-band execution scenarios that may pause that
* sequential execution long enough to do that out-of-band work
* in the current thread using the current PyThreadState.
*
* The scenarios include:
*
* - cyclic garbage collection
* - GIL drop requests
* - "async" exceptions
* - "pending calls" (some only in the main thread)
* - signal handling (only in the main thread)
*
* When the need for one of the above is detected, the eval loop
* pauses long enough to handle the detected case. Then, if doing
* so didn't trigger an exception, the eval loop resumes executing
* the sequential instructions.
*
* To make this work, the eval loop periodically checks if any
* of the above needs to happen. The individual checks can be
* expensive if computed each time, so a while back we switched
* to using pre-computed, per-interpreter variables for the checks,
* and later consolidated that to a single "eval breaker" variable
* (now a PyInterpreterState field).
*
* For the longest time, the eval breaker check would happen
* frequently, every 5 or so times through the loop, regardless
* of what instruction ran last or what would run next. Then, in
* early 2021 (gh-18334, commit 4958f5d), we switched to checking
* the eval breaker less frequently, by hard-coding the check to
* specific places in the eval loop (e.g. certain instructions).
* The intent then was to check after returning from calls
* and on the back edges of loops.
*
* In addition to being more efficient, that approach keeps
* the eval loop from running arbitrary code between instructions
* that don't handle that well. (See gh-74174.)
*
* Currently, the eval breaker check happens here at the
* "handle_eval_breaker" label. Some instructions come here
* explicitly (goto) and some indirectly. Notably, the check
* happens on back edges in the control flow graph, which
* pretty much applies to all loops and most calls.
* (See bytecodes.c for exact information.)
*
* One consequence of this approach is that it might not be obvious
* how to force any specific thread to pick up the eval breaker,
* or for any specific thread to not pick it up. Mostly this
* involves judicious uses of locks and careful ordering of code,
* while avoiding code that might trigger the eval breaker
* until so desired.
*/
if (_Py_HandlePending(tstate) != 0) {
goto error;
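
The comment above describes how these separate conditions are pre-computed into a single per-interpreter "eval breaker" flag, and the COMPUTE_EVAL_BREAKER() change in the next file folds in the new main-thread-only pending-call queue as well. Below is a hedged, self-contained sketch of that consolidation, with illustrative field names rather than the exact CPython macro:

#include <stdatomic.h>
#include <stdbool.h>

struct breaker_inputs {
    bool gil_drop_request;
    bool signals_pending;              /* handled only in the main thread */
    bool pending_calls_to_do;          /* per-interpreter queue */
    bool mainthread_calls_to_do;       /* new main-thread-only queue */
    bool async_exc;
    bool gc_scheduled;
    bool is_main_thread_of_main_interp;
};

/* Fold the slow-to-evaluate conditions into one flag so the hot path only
   loads a single atomic int at the "handle_eval_breaker" points. */
static void
compute_eval_breaker(const struct breaker_inputs *in, atomic_int *eval_breaker)
{
    int value =
        in->gil_drop_request
        | (in->signals_pending && in->is_main_thread_of_main_interp)
        | in->pending_calls_to_do
        | (in->is_main_thread_of_main_interp && in->mainthread_calls_to_do)
        | in->async_exc
        | in->gc_scheduled;
    atomic_store_explicit(eval_breaker, value, memory_order_relaxed);
}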


@ -68,8 +68,9 @@ COMPUTE_EVAL_BREAKER(PyInterpreterState *interp,
_Py_atomic_load_relaxed_int32(&ceval2->gil_drop_request)
| (_Py_atomic_load_relaxed_int32(&ceval->signals_pending)
&& _Py_ThreadCanHandleSignals(interp))
| (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do)
&& _Py_ThreadCanHandlePendingCalls())
| (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do))
| (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)
&& _Py_atomic_load_relaxed_int32(&ceval->pending_mainthread.calls_to_do))
| ceval2->pending.async_exc
| _Py_atomic_load_relaxed_int32(&ceval2->gc_scheduled));
}
@ -95,11 +96,11 @@ RESET_GIL_DROP_REQUEST(PyInterpreterState *interp)
static inline void
SIGNAL_PENDING_CALLS(PyInterpreterState *interp)
SIGNAL_PENDING_CALLS(struct _pending_calls *pending, PyInterpreterState *interp)
{
struct _ceval_runtime_state *ceval = &interp->runtime->ceval;
struct _ceval_state *ceval2 = &interp->ceval;
_Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 1);
_Py_atomic_store_relaxed(&pending->calls_to_do, 1);
COMPUTE_EVAL_BREAKER(interp, ceval, ceval2);
}
@ -109,6 +110,9 @@ UNSIGNAL_PENDING_CALLS(PyInterpreterState *interp)
{
struct _ceval_runtime_state *ceval = &interp->runtime->ceval;
struct _ceval_state *ceval2 = &interp->ceval;
if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) {
_Py_atomic_store_relaxed(&ceval->pending_mainthread.calls_to_do, 0);
}
_Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 0);
COMPUTE_EVAL_BREAKER(interp, ceval, ceval2);
}
@ -803,19 +807,31 @@ _push_pending_call(struct _pending_calls *pending,
return 0;
}
static int
_next_pending_call(struct _pending_calls *pending,
int (**func)(void *), void **arg)
{
int i = pending->first;
if (i == pending->last) {
/* Queue empty */
assert(pending->calls[i].func == NULL);
return -1;
}
*func = pending->calls[i].func;
*arg = pending->calls[i].arg;
return i;
}
/* Pop one item off the queue while holding the lock. */
static void
_pop_pending_call(struct _pending_calls *pending,
int (**func)(void *), void **arg)
{
int i = pending->first;
if (i == pending->last) {
return; /* Queue empty */
int i = _next_pending_call(pending, func, arg);
if (i >= 0) {
pending->calls[i] = (struct _pending_call){0};
pending->first = (i + 1) % NPENDINGCALLS;
}
*func = pending->calls[i].func;
*arg = pending->calls[i].arg;
pending->first = (i + 1) % NPENDINGCALLS;
}
/* This implementation is thread-safe. It allows
@ -825,9 +841,16 @@ _pop_pending_call(struct _pending_calls *pending,
int
_PyEval_AddPendingCall(PyInterpreterState *interp,
int (*func)(void *), void *arg)
int (*func)(void *), void *arg,
int mainthreadonly)
{
assert(!mainthreadonly || _Py_IsMainInterpreter(interp));
struct _pending_calls *pending = &interp->ceval.pending;
if (mainthreadonly) {
/* The main thread only exists in the main interpreter. */
assert(_Py_IsMainInterpreter(interp));
pending = &_PyRuntime.ceval.pending_mainthread;
}
/* Ensure that _PyEval_InitState() was called
and that _PyEval_FiniState() is not called yet. */
assert(pending->lock != NULL);
@ -837,39 +860,17 @@ _PyEval_AddPendingCall(PyInterpreterState *interp,
PyThread_release_lock(pending->lock);
/* signal main loop */
SIGNAL_PENDING_CALLS(interp);
SIGNAL_PENDING_CALLS(pending, interp);
return result;
}
int
Py_AddPendingCall(int (*func)(void *), void *arg)
{
/* Best-effort to support subinterpreters and calls with the GIL released.
First attempt _PyThreadState_GET() since it supports subinterpreters.
If the GIL is released, _PyThreadState_GET() returns NULL . In this
case, use PyGILState_GetThisThreadState() which works even if the GIL
is released.
Sadly, PyGILState_GetThisThreadState() doesn't support subinterpreters:
see bpo-10915 and bpo-15751.
Py_AddPendingCall() doesn't require the caller to hold the GIL. */
PyThreadState *tstate = _PyThreadState_GET();
if (tstate == NULL) {
tstate = PyGILState_GetThisThreadState();
}
PyInterpreterState *interp;
if (tstate != NULL) {
interp = tstate->interp;
}
else {
/* Last resort: use the main interpreter */
interp = _PyInterpreterState_Main();
}
return _PyEval_AddPendingCall(interp, func, arg);
/* Legacy users of this API will continue to target the main thread
(of the main interpreter). */
PyInterpreterState *interp = _PyInterpreterState_Main();
return _PyEval_AddPendingCall(interp, func, arg, 1);
}
static int
@ -889,27 +890,24 @@ handle_signals(PyThreadState *tstate)
return 0;
}
static int
make_pending_calls(PyInterpreterState *interp)
static inline int
maybe_has_pending_calls(PyInterpreterState *interp)
{
/* only execute pending calls on main thread */
if (!_Py_ThreadCanHandlePendingCalls()) {
return 0;
}
/* don't perform recursive pending calls */
if (interp->ceval.pending.busy) {
return 0;
}
interp->ceval.pending.busy = 1;
/* unsignal before starting to call callbacks, so that any callback
added in-between re-signals */
UNSIGNAL_PENDING_CALLS(interp);
int res = 0;
/* perform a bounded number of calls, in case of recursion */
struct _pending_calls *pending = &interp->ceval.pending;
if (_Py_atomic_load_relaxed_int32(&pending->calls_to_do)) {
return 1;
}
if (!_Py_IsMainThread() || !_Py_IsMainInterpreter(interp)) {
return 0;
}
pending = &_PyRuntime.ceval.pending_mainthread;
return _Py_atomic_load_relaxed_int32(&pending->calls_to_do);
}
static int
_make_pending_calls(struct _pending_calls *pending)
{
/* perform a bounded number of calls, in case of recursion */
for (int i=0; i<NPENDINGCALLS; i++) {
int (*func)(void *) = NULL;
void *arg = NULL;
@ -923,19 +921,61 @@ make_pending_calls(PyInterpreterState *interp)
if (func == NULL) {
break;
}
res = func(arg);
if (res) {
goto error;
if (func(arg) != 0) {
return -1;
}
}
return 0;
}
static int
make_pending_calls(PyInterpreterState *interp)
{
struct _pending_calls *pending = &interp->ceval.pending;
struct _pending_calls *pending_main = &_PyRuntime.ceval.pending_mainthread;
/* Only one thread (per interpreter) may run the pending calls
at once. In the same way, we don't do recursive pending calls. */
PyThread_acquire_lock(pending->lock, WAIT_LOCK);
if (pending->busy) {
/* A pending call was added after another thread was already
handling the pending calls (and had already "unsignaled").
Once that thread is done, it may have taken care of all the
pending calls, or there might be some still waiting.
Regardless, this interpreter's pending calls will stay
"signaled" until that first thread has finished. At that
point the next thread to trip the eval breaker will take
care of any remaining pending calls. Until then, though,
all the interpreter's threads will be tripping the eval
breaker every time it's checked. */
PyThread_release_lock(pending->lock);
return 0;
}
pending->busy = 1;
PyThread_release_lock(pending->lock);
/* unsignal before starting to call callbacks, so that any callback
added in-between re-signals */
UNSIGNAL_PENDING_CALLS(interp);
if (_make_pending_calls(pending) != 0) {
pending->busy = 0;
/* There might not be more calls to make, but we play it safe. */
SIGNAL_PENDING_CALLS(pending, interp);
return -1;
}
if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) {
if (_make_pending_calls(pending_main) != 0) {
pending->busy = 0;
/* There might not be more calls to make, but we play it safe. */
SIGNAL_PENDING_CALLS(pending_main, interp);
return -1;
}
}
interp->ceval.pending.busy = 0;
return res;
error:
interp->ceval.pending.busy = 0;
SIGNAL_PENDING_CALLS(interp);
return res;
pending->busy = 0;
return 0;
}
void
@ -944,12 +984,6 @@ _Py_FinishPendingCalls(PyThreadState *tstate)
assert(PyGILState_Check());
assert(is_tstate_valid(tstate));
struct _pending_calls *pending = &tstate->interp->ceval.pending;
if (!_Py_atomic_load_relaxed_int32(&(pending->calls_to_do))) {
return;
}
if (make_pending_calls(tstate->interp) < 0) {
PyObject *exc = _PyErr_GetRaisedException(tstate);
PyErr_BadInternalCall();
@ -958,6 +992,29 @@ _Py_FinishPendingCalls(PyThreadState *tstate)
}
}
int
_PyEval_MakePendingCalls(PyThreadState *tstate)
{
int res;
if (_Py_IsMainThread() && _Py_IsMainInterpreter(tstate->interp)) {
/* Python signal handler doesn't really queue a callback:
it only signals that a signal was received,
see _PyEval_SignalReceived(). */
res = handle_signals(tstate);
if (res != 0) {
return res;
}
}
res = make_pending_calls(tstate->interp);
if (res != 0) {
return res;
}
return 0;
}
/* Py_MakePendingCalls() is a simple wrapper for the sake
of backward-compatibility. */
int
@ -968,19 +1025,11 @@ Py_MakePendingCalls(void)
PyThreadState *tstate = _PyThreadState_GET();
assert(is_tstate_valid(tstate));
/* Python signal handler doesn't really queue a callback: it only signals
that a signal was received, see _PyEval_SignalReceived(). */
int res = handle_signals(tstate);
if (res != 0) {
return res;
/* Only execute pending calls on the main thread. */
if (!_Py_IsMainThread() || !_Py_IsMainInterpreter(tstate->interp)) {
return 0;
}
res = make_pending_calls(tstate->interp);
if (res != 0) {
return res;
}
return 0;
return _PyEval_MakePendingCalls(tstate);
}
void
@ -1020,7 +1069,7 @@ _Py_HandlePending(PyThreadState *tstate)
}
/* Pending calls */
if (_Py_atomic_load_relaxed_int32(&interp_ceval_state->pending.calls_to_do)) {
if (maybe_has_pending_calls(tstate->interp)) {
if (make_pending_calls(tstate->interp) != 0) {
return -1;
}
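
To summarize the make_pending_calls() rewrite above: one thread per interpreter claims the queue via the busy flag, unsignals the eval breaker before running callbacks (so a callback queued mid-drain re-signals it), runs at most NPENDINGCALLS calls, re-signals on error, and, when running in the main thread of the main interpreter, also drains the new main-thread-only queue. A compressed, self-contained sketch of that protocol, with illustrative names, no locking, and only a single queue, not the exact CPython code:

#include <stdatomic.h>

#define NCALLS 32

struct sketch_pending {
    atomic_int calls_to_do;            /* "signaled" flag */
    int busy;                          /* only one drainer at a time */
    int (*funcs[NCALLS])(void *);
    void *args[NCALLS];
    int first, last;                   /* ring-buffer cursors */
};

static int
sketch_make_pending_calls(struct sketch_pending *p)
{
    if (p->busy) {
        return 0;                      /* someone else is already draining */
    }
    p->busy = 1;
    /* Unsignal first, so a callback queued while we drain re-signals. */
    atomic_store_explicit(&p->calls_to_do, 0, memory_order_relaxed);

    for (int i = 0; i < NCALLS; i++) { /* bounded, in case of recursion */
        if (p->first == p->last) {
            break;                     /* queue empty */
        }
        int (*func)(void *) = p->funcs[p->first];
        void *arg = p->args[p->first];
        p->first = (p->first + 1) % NCALLS;
        if (func(arg) != 0) {
            /* Leave the remaining calls queued and re-signal. */
            p->busy = 0;
            atomic_store_explicit(&p->calls_to_do, 1, memory_order_relaxed);
            return -1;
        }
    }
    p->busy = 0;
    return 0;
}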


@ -2152,6 +2152,9 @@ Py_EndInterpreter(PyThreadState *tstate)
// Wrap up existing "threading"-module-created, non-daemon threads.
wait_for_thread_shutdown(tstate);
// Make any remaining pending calls.
_Py_FinishPendingCalls(tstate);
_PyAtExit_Call(tstate->interp);
if (tstate != interp->threads.head || tstate->next != NULL) {


@ -380,7 +380,7 @@ _Py_COMP_DIAG_IGNORE_DEPR_DECLS
static const _PyRuntimeState initial = _PyRuntimeState_INIT(_PyRuntime);
_Py_COMP_DIAG_POP
#define NUMLOCKS 8
#define NUMLOCKS 9
#define LOCKS_INIT(runtime) \
{ \
&(runtime)->interpreters.mutex, \
@ -388,6 +388,7 @@ _Py_COMP_DIAG_POP
&(runtime)->getargs.mutex, \
&(runtime)->unicode_state.ids.lock, \
&(runtime)->imports.extensions.mutex, \
&(runtime)->ceval.pending_mainthread.lock, \
&(runtime)->atexit.mutex, \
&(runtime)->audit_hooks.mutex, \
&(runtime)->allocators.mutex, \


@ -517,6 +517,7 @@ Modules/_testcapimodule.c - g_type_watchers_installed -
Modules/_testimportmultiple.c - _barmodule -
Modules/_testimportmultiple.c - _foomodule -
Modules/_testimportmultiple.c - _testimportmultiple -
Modules/_testinternalcapi.c - pending_identify_result -
Modules/_testmultiphase.c - Example_Type_slots -
Modules/_testmultiphase.c - Example_Type_spec -
Modules/_testmultiphase.c - Example_methods -
