mirror of
https://github.com/python/cpython.git
synced 2025-07-19 09:15:34 +00:00
gh-111964: Implement stop-the-world pauses (gh-112471)
The `--disable-gil` builds occasionally need to pause all but one thread. Some examples include:

* Cyclic garbage collection, where this is often called a "stop the world event"
* Before calling `fork()`, to ensure a consistent state for internal data structures
* During interpreter shutdown, to ensure that daemon threads aren't accessing Python objects

This adds the following functions to implement global and per-interpreter pauses:

* `_PyEval_StopTheWorldAll()` and `_PyEval_StartTheWorldAll()` (for the global runtime)
* `_PyEval_StopTheWorld()` and `_PyEval_StartTheWorld()` (per-interpreter)

(The function names may change.)

These functions are no-ops outside of the `--disable-gil` build.
This commit is contained in:
parent
5f1997896d
commit
441affc9e7
10 changed files with 336 additions and 29 deletions
|
@ -949,6 +949,15 @@ _Py_HandlePending(PyThreadState *tstate)
|
|||
{
|
||||
PyInterpreterState *interp = tstate->interp;
|
||||
|
||||
/* Stop-the-world */
|
||||
if (_Py_eval_breaker_bit_is_set(interp, _PY_EVAL_PLEASE_STOP_BIT)) {
|
||||
_Py_set_eval_breaker_bit(interp, _PY_EVAL_PLEASE_STOP_BIT, 0);
|
||||
_PyThreadState_Suspend(tstate);
|
||||
|
||||
/* The attach blocks until the stop-the-world event is complete. */
|
||||
_PyThreadState_Attach(tstate);
|
||||
}
|
||||
|
||||
/* Pending signals */
|
||||
if (_Py_eval_breaker_bit_is_set(interp, _PY_SIGNALS_PENDING_BIT)) {
|
||||
if (handle_signals(tstate) != 0) {
|
||||
|
|
269
Python/pystate.c
269
Python/pystate.c
|
@ -1336,6 +1336,11 @@ init_threadstate(_PyThreadStateImpl *_tstate,
|
|||
tstate->datastack_limit = NULL;
|
||||
tstate->what_event = -1;
|
||||
|
||||
if (interp->stoptheworld.requested || _PyRuntime.stoptheworld.requested) {
|
||||
// Start in the suspended state if there is an ongoing stop-the-world.
|
||||
tstate->state = _Py_THREAD_SUSPENDED;
|
||||
}
|
||||
|
||||
tstate->_status.initialized = 1;
|
||||
}
|
||||
|
||||
|
@ -1562,6 +1567,9 @@ PyThreadState_Clear(PyThreadState *tstate)
|
|||
// XXX Do it as early in the function as possible.
|
||||
}
|
||||
|
||||
static void
|
||||
decrement_stoptheworld_countdown(struct _stoptheworld_state *stw);
|
||||
|
||||
/* Common code for PyThreadState_Delete() and PyThreadState_DeleteCurrent() */
|
||||
static void
|
||||
tstate_delete_common(PyThreadState *tstate)
|
||||
|
@ -1585,6 +1593,16 @@ tstate_delete_common(PyThreadState *tstate)
|
|||
if (tstate->next) {
|
||||
tstate->next->prev = tstate->prev;
|
||||
}
|
||||
if (tstate->state != _Py_THREAD_SUSPENDED) {
|
||||
// Any ongoing stop-the-world request should not wait for us because
|
||||
// our thread is getting deleted.
|
||||
if (interp->stoptheworld.requested) {
|
||||
decrement_stoptheworld_countdown(&interp->stoptheworld);
|
||||
}
|
||||
if (runtime->stoptheworld.requested) {
|
||||
decrement_stoptheworld_countdown(&runtime->stoptheworld);
|
||||
}
|
||||
}
|
||||
HEAD_UNLOCK(runtime);
|
||||
|
||||
// XXX Unbind in PyThreadState_Clear(), or earlier
|
||||
|
@ -1790,13 +1808,9 @@ tstate_try_attach(PyThreadState *tstate)
|
|||
{
|
||||
#ifdef Py_GIL_DISABLED
|
||||
int expected = _Py_THREAD_DETACHED;
|
||||
if (_Py_atomic_compare_exchange_int(
|
||||
&tstate->state,
|
||||
&expected,
|
||||
_Py_THREAD_ATTACHED)) {
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
return _Py_atomic_compare_exchange_int(&tstate->state,
|
||||
&expected,
|
||||
_Py_THREAD_ATTACHED);
|
||||
#else
|
||||
assert(tstate->state == _Py_THREAD_DETACHED);
|
||||
tstate->state = _Py_THREAD_ATTACHED;
|
||||
|
@ -1815,6 +1829,20 @@ tstate_set_detached(PyThreadState *tstate)
|
|||
#endif
|
||||
}
|
||||
|
||||
static void
|
||||
tstate_wait_attach(PyThreadState *tstate)
|
||||
{
|
||||
do {
|
||||
int expected = _Py_THREAD_SUSPENDED;
|
||||
|
||||
// Wait until we're switched out of SUSPENDED to DETACHED.
|
||||
_PyParkingLot_Park(&tstate->state, &expected, sizeof(tstate->state),
|
||||
/*timeout=*/-1, NULL, /*detach=*/0);
|
||||
|
||||
// Once we're back in DETACHED we can re-attach
|
||||
} while (!tstate_try_attach(tstate));
|
||||
}
|
||||
|
||||
void
|
||||
_PyThreadState_Attach(PyThreadState *tstate)
|
||||
{
|
||||
|
@ -1836,10 +1864,7 @@ _PyThreadState_Attach(PyThreadState *tstate)
|
|||
tstate_activate(tstate);
|
||||
|
||||
if (!tstate_try_attach(tstate)) {
|
||||
// TODO: Once stop-the-world GC is implemented for --disable-gil builds
|
||||
// this will need to wait until the GC completes. For now, this case
|
||||
// should never happen.
|
||||
Py_FatalError("thread attach failed");
|
||||
tstate_wait_attach(tstate);
|
||||
}
|
||||
|
||||
// Resume previous critical section. This acquires the lock(s) from the
|
||||
|
@ -1853,8 +1878,8 @@ _PyThreadState_Attach(PyThreadState *tstate)
|
|||
#endif
|
||||
}
|
||||
|
||||
void
|
||||
_PyThreadState_Detach(PyThreadState *tstate)
|
||||
static void
|
||||
detach_thread(PyThreadState *tstate, int detached_state)
|
||||
{
|
||||
// XXX assert(tstate_is_alive(tstate) && tstate_is_bound(tstate));
|
||||
assert(tstate->state == _Py_THREAD_ATTACHED);
|
||||
|
@ -1862,12 +1887,228 @@ _PyThreadState_Detach(PyThreadState *tstate)
|
|||
if (tstate->critical_section != 0) {
|
||||
_PyCriticalSection_SuspendAll(tstate);
|
||||
}
|
||||
tstate_set_detached(tstate);
|
||||
tstate_deactivate(tstate);
|
||||
tstate_set_detached(tstate);
|
||||
current_fast_clear(&_PyRuntime);
|
||||
_PyEval_ReleaseLock(tstate->interp, tstate);
|
||||
}
|
||||
|
||||
void
|
||||
_PyThreadState_Detach(PyThreadState *tstate)
|
||||
{
|
||||
detach_thread(tstate, _Py_THREAD_DETACHED);
|
||||
}
|
||||
|
||||
void
|
||||
_PyThreadState_Suspend(PyThreadState *tstate)
|
||||
{
|
||||
_PyRuntimeState *runtime = &_PyRuntime;
|
||||
|
||||
assert(tstate->state == _Py_THREAD_ATTACHED);
|
||||
|
||||
struct _stoptheworld_state *stw = NULL;
|
||||
HEAD_LOCK(runtime);
|
||||
if (runtime->stoptheworld.requested) {
|
||||
stw = &runtime->stoptheworld;
|
||||
}
|
||||
else if (tstate->interp->stoptheworld.requested) {
|
||||
stw = &tstate->interp->stoptheworld;
|
||||
}
|
||||
HEAD_UNLOCK(runtime);
|
||||
|
||||
if (stw == NULL) {
|
||||
// Switch directly to "detached" if there is no active stop-the-world
|
||||
// request.
|
||||
detach_thread(tstate, _Py_THREAD_DETACHED);
|
||||
return;
|
||||
}
|
||||
|
||||
// Switch to "suspended" state.
|
||||
detach_thread(tstate, _Py_THREAD_SUSPENDED);
|
||||
|
||||
// Decrease the count of remaining threads needing to park.
|
||||
HEAD_LOCK(runtime);
|
||||
decrement_stoptheworld_countdown(stw);
|
||||
HEAD_UNLOCK(runtime);
|
||||
}
|
||||
|
||||
// Decrease stop-the-world counter of remaining number of threads that need to
|
||||
// pause. If we are the final thread to pause, notify the requesting thread.
|
||||
static void
|
||||
decrement_stoptheworld_countdown(struct _stoptheworld_state *stw)
|
||||
{
|
||||
assert(stw->thread_countdown > 0);
|
||||
if (--stw->thread_countdown == 0) {
|
||||
_PyEvent_Notify(&stw->stop_event);
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef Py_GIL_DISABLED
|
||||
// Interpreter for _Py_FOR_EACH_THREAD(). For global stop-the-world events,
|
||||
// we start with the first interpreter and then iterate over all interpreters.
|
||||
// For per-interpreter stop-the-world events, we only operate on the one
|
||||
// interpreter.
|
||||
static PyInterpreterState *
|
||||
interp_for_stop_the_world(struct _stoptheworld_state *stw)
|
||||
{
|
||||
return (stw->is_global
|
||||
? PyInterpreterState_Head()
|
||||
: _Py_CONTAINER_OF(stw, PyInterpreterState, stoptheworld));
|
||||
}
|
||||
|
||||
// Loops over threads for a stop-the-world event.
// For global: all threads in all interpreters
// For per-interpreter: all threads in the interpreter
//
//   stw: pointer to the struct _stoptheworld_state driving the event
//   i:   PyInterpreterState* lvalue used as the outer loop variable
//   t:   PyThreadState* lvalue used as the inner loop variable
//
// Expands to two nested `for` headers, so `break` only leaves the
// per-interpreter (inner) loop. Every macro argument is parenthesized so
// that arbitrary expressions can be passed; `stw` is evaluated once per
// outer iteration.
#define _Py_FOR_EACH_THREAD(stw, i, t)                                      \
    for ((i) = interp_for_stop_the_world((stw));                            \
            (i) != NULL; (i) = (((stw)->is_global) ? (i)->next : NULL))     \
        for ((t) = (i)->threads.head; (t); (t) = (t)->next)
|
||||
|
||||
|
||||
// Try to transition threads atomically from the "detached" state to the
|
||||
// "gc stopped" state. Returns true if all threads are in the "gc stopped"
|
||||
static bool
|
||||
park_detached_threads(struct _stoptheworld_state *stw)
|
||||
{
|
||||
int num_parked = 0;
|
||||
PyInterpreterState *i;
|
||||
PyThreadState *t;
|
||||
_Py_FOR_EACH_THREAD(stw, i, t) {
|
||||
int state = _Py_atomic_load_int_relaxed(&t->state);
|
||||
if (state == _Py_THREAD_DETACHED) {
|
||||
// Atomically transition to "suspended" if in "detached" state.
|
||||
if (_Py_atomic_compare_exchange_int(&t->state,
|
||||
&state, _Py_THREAD_SUSPENDED)) {
|
||||
num_parked++;
|
||||
}
|
||||
}
|
||||
else if (state == _Py_THREAD_ATTACHED && t != stw->requester) {
|
||||
// TODO: set this per-thread, rather than per-interpreter.
|
||||
_Py_set_eval_breaker_bit(t->interp, _PY_EVAL_PLEASE_STOP_BIT, 1);
|
||||
}
|
||||
}
|
||||
stw->thread_countdown -= num_parked;
|
||||
assert(stw->thread_countdown >= 0);
|
||||
return num_parked > 0 && stw->thread_countdown == 0;
|
||||
}
|
||||
|
||||
static void
|
||||
stop_the_world(struct _stoptheworld_state *stw)
|
||||
{
|
||||
_PyRuntimeState *runtime = &_PyRuntime;
|
||||
|
||||
PyMutex_Lock(&stw->mutex);
|
||||
if (stw->is_global) {
|
||||
_PyRWMutex_Lock(&runtime->stoptheworld_mutex);
|
||||
}
|
||||
else {
|
||||
_PyRWMutex_RLock(&runtime->stoptheworld_mutex);
|
||||
}
|
||||
|
||||
HEAD_LOCK(runtime);
|
||||
stw->requested = 1;
|
||||
stw->thread_countdown = 0;
|
||||
stw->stop_event = (PyEvent){0}; // zero-initialize (unset)
|
||||
stw->requester = _PyThreadState_GET(); // may be NULL
|
||||
|
||||
PyInterpreterState *i;
|
||||
PyThreadState *t;
|
||||
_Py_FOR_EACH_THREAD(stw, i, t) {
|
||||
if (t != stw->requester) {
|
||||
// Count all the other threads (we don't wait on ourself).
|
||||
stw->thread_countdown++;
|
||||
}
|
||||
}
|
||||
|
||||
if (stw->thread_countdown == 0) {
|
||||
HEAD_UNLOCK(runtime);
|
||||
stw->world_stopped = 1;
|
||||
return;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
// Switch threads that are detached to the GC stopped state
|
||||
bool stopped_all_threads = park_detached_threads(stw);
|
||||
HEAD_UNLOCK(runtime);
|
||||
|
||||
if (stopped_all_threads) {
|
||||
break;
|
||||
}
|
||||
|
||||
_PyTime_t wait_ns = 1000*1000; // 1ms (arbitrary, may need tuning)
|
||||
if (PyEvent_WaitTimed(&stw->stop_event, wait_ns)) {
|
||||
assert(stw->thread_countdown == 0);
|
||||
break;
|
||||
}
|
||||
|
||||
HEAD_LOCK(runtime);
|
||||
}
|
||||
stw->world_stopped = 1;
|
||||
}
|
||||
|
||||
static void
|
||||
start_the_world(struct _stoptheworld_state *stw)
|
||||
{
|
||||
_PyRuntimeState *runtime = &_PyRuntime;
|
||||
assert(PyMutex_IsLocked(&stw->mutex));
|
||||
|
||||
HEAD_LOCK(runtime);
|
||||
stw->requested = 0;
|
||||
stw->world_stopped = 0;
|
||||
stw->requester = NULL;
|
||||
// Switch threads back to the detached state.
|
||||
PyInterpreterState *i;
|
||||
PyThreadState *t;
|
||||
_Py_FOR_EACH_THREAD(stw, i, t) {
|
||||
if (t != stw->requester) {
|
||||
assert(t->state == _Py_THREAD_SUSPENDED);
|
||||
_Py_atomic_store_int(&t->state, _Py_THREAD_DETACHED);
|
||||
_PyParkingLot_UnparkAll(&t->state);
|
||||
}
|
||||
}
|
||||
HEAD_UNLOCK(runtime);
|
||||
if (stw->is_global) {
|
||||
_PyRWMutex_Unlock(&runtime->stoptheworld_mutex);
|
||||
}
|
||||
else {
|
||||
_PyRWMutex_RUnlock(&runtime->stoptheworld_mutex);
|
||||
}
|
||||
PyMutex_Unlock(&stw->mutex);
|
||||
}
|
||||
#endif // Py_GIL_DISABLED
|
||||
|
||||
void
|
||||
_PyEval_StopTheWorldAll(_PyRuntimeState *runtime)
|
||||
{
|
||||
#ifdef Py_GIL_DISABLED
|
||||
stop_the_world(&runtime->stoptheworld);
|
||||
#endif
|
||||
}
|
||||
|
||||
void
|
||||
_PyEval_StartTheWorldAll(_PyRuntimeState *runtime)
|
||||
{
|
||||
#ifdef Py_GIL_DISABLED
|
||||
start_the_world(&runtime->stoptheworld);
|
||||
#endif
|
||||
}
|
||||
|
||||
void
|
||||
_PyEval_StopTheWorld(PyInterpreterState *interp)
|
||||
{
|
||||
#ifdef Py_GIL_DISABLED
|
||||
stop_the_world(&interp->stoptheworld);
|
||||
#endif
|
||||
}
|
||||
|
||||
void
|
||||
_PyEval_StartTheWorld(PyInterpreterState *interp)
|
||||
{
|
||||
#ifdef Py_GIL_DISABLED
|
||||
start_the_world(&interp->stoptheworld);
|
||||
#endif
|
||||
}
|
||||
|
||||
//----------
|
||||
// other API
|
||||
//----------
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue