gh-104812: Run Pending Calls in any Thread (gh-104813)
For a while now, pending calls have only run in the main thread (in the main interpreter). This PR changes things to allow any thread to run a pending call, unless the pending call was explicitly added for the main thread to run.
parent 4e80082723
commit 757b402ea1
16 changed files with 766 additions and 123 deletions
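The core of the change is a new `mainthreadonly` flag on the internal `_PyEval_AddPendingCall()` (see the header hunks below). As a rough illustration only, not part of this commit: a hedged sketch of how an internal caller might use it, assuming a core build where `pycore_ceval.h` is available; `my_callback` and `queue_examples` are hypothetical names.

/* Illustrative sketch only: the internal entry point declared in the diff
   below.  Assumes Py_BUILD_CORE so the internal header is visible. */
#include "Python.h"
#include "pycore_ceval.h"   // _PyEval_AddPendingCall()

static int
my_callback(void *arg)
{
    /* Runs later, with the GIL held, in whichever thread of the target
       interpreter next trips the eval breaker. */
    (void)arg;
    return 0;
}

static void
queue_examples(PyInterpreterState *interp)
{
    /* Any thread of `interp` may end up running this call.
       A negative result means the fixed-size queue was full. */
    (void)_PyEval_AddPendingCall(interp, my_callback, NULL, 0);

    /* Main-thread-only request; per the assertions in ceval.c this form
       is only valid when `interp` is the main interpreter. */
    (void)_PyEval_AddPendingCall(interp, my_callback, NULL, 1);
}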
@@ -22,6 +22,8 @@ PyAPI_FUNC(PyObject *) _PyEval_EvalFrameDefault(PyThreadState *tstate, struct _P
PyAPI_FUNC(void) _PyEval_SetSwitchInterval(unsigned long microseconds);
PyAPI_FUNC(unsigned long) _PyEval_GetSwitchInterval(void);

PyAPI_FUNC(int) _PyEval_MakePendingCalls(PyThreadState *);

PyAPI_FUNC(Py_ssize_t) PyUnstable_Eval_RequestCodeExtraIndex(freefunc);
// Old name -- remove when this API changes:
_Py_DEPRECATED_EXTERNALLY(3.12) static inline Py_ssize_t
@@ -27,7 +27,8 @@ PyAPI_FUNC(void) _PyEval_SignalReceived(PyInterpreterState *interp);
PyAPI_FUNC(int) _PyEval_AddPendingCall(
    PyInterpreterState *interp,
    int (*func)(void *),
    void *arg);
    void *arg,
    int mainthreadonly);
PyAPI_FUNC(void) _PyEval_SignalAsyncExc(PyInterpreterState *interp);
#ifdef HAVE_FORK
extern PyStatus _PyEval_ReInitThreads(PyThreadState *tstate);
@@ -13,6 +13,24 @@ extern "C" {
#include "pycore_gil.h" // struct _gil_runtime_state


struct _pending_calls {
    int busy;
    PyThread_type_lock lock;
    /* Request for running pending calls. */
    _Py_atomic_int calls_to_do;
    /* Request for looking at the `async_exc` field of the current
       thread state.
       Guarded by the GIL. */
    int async_exc;
#define NPENDINGCALLS 32
    struct _pending_call {
        int (*func)(void *);
        void *arg;
    } calls[NPENDINGCALLS];
    int first;
    int last;
};

typedef enum {
    PERF_STATUS_FAILED = -1, // Perf trampoline is in an invalid state
    PERF_STATUS_NO_INIT = 0, // Perf trampoline is not initialized
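The `calls` array together with the `first`/`last` indices above forms a small fixed-size ring buffer, pushed and popped modulo NPENDINGCALLS (see `_push_pending_call()`/`_pop_pending_call()` later in this diff). A standalone model of that bookkeeping, for illustration only; `pending_ring`, `ring_push`, and `ring_pop` are names invented here, not CPython identifiers.

/* Minimal standalone model of the pending-call ring buffer above. */
#include <stddef.h>

#define NPENDINGCALLS 32

struct pending_ring {
    struct { int (*func)(void *); void *arg; } calls[NPENDINGCALLS];
    int first;   /* index of the oldest queued call */
    int last;    /* index one past the newest queued call */
};

static int
ring_push(struct pending_ring *q, int (*func)(void *), void *arg)
{
    int j = (q->last + 1) % NPENDINGCALLS;
    if (j == q->first) {
        return -1;              /* full */
    }
    q->calls[q->last].func = func;
    q->calls[q->last].arg = arg;
    q->last = j;
    return 0;
}

static int
ring_pop(struct pending_ring *q, int (**func)(void *), void **arg)
{
    if (q->first == q->last) {
        return -1;              /* empty */
    }
    *func = q->calls[q->first].func;
    *arg = q->calls[q->first].arg;
    q->first = (q->first + 1) % NPENDINGCALLS;
    return 0;
}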
@@ -49,6 +67,8 @@ struct _ceval_runtime_state {
       the main thread of the main interpreter can handle signals: see
       _Py_ThreadCanHandleSignals(). */
    _Py_atomic_int signals_pending;
    /* Pending calls to be made only on the main thread. */
    struct _pending_calls pending_mainthread;
};

#ifdef PY_HAVE_PERF_TRAMPOLINE
@@ -62,24 +82,6 @@ struct _ceval_runtime_state {
#endif


struct _pending_calls {
    int busy;
    PyThread_type_lock lock;
    /* Request for running pending calls. */
    _Py_atomic_int calls_to_do;
    /* Request for looking at the `async_exc` field of the current
       thread state.
       Guarded by the GIL. */
    int async_exc;
#define NPENDINGCALLS 32
    struct {
        int (*func)(void *);
        void *arg;
    } calls[NPENDINGCALLS];
    int first;
    int last;
};

struct _ceval_state {
    /* This single variable consolidates all requests to break out of
       the fast path in the eval loop. */
@@ -60,14 +60,6 @@ _Py_ThreadCanHandleSignals(PyInterpreterState *interp)
}


/* Only execute pending calls on the main thread. */
static inline int
_Py_ThreadCanHandlePendingCalls(void)
{
    return _Py_IsMainThread();
}


/* Variable and static inline functions for in-line access to current thread
   and interpreter state */
@@ -115,7 +115,11 @@ def join_thread(thread, timeout=None):

@contextlib.contextmanager
def start_threads(threads, unlock=None):
    try:
        import faulthandler
    except ImportError:
        # It isn't supported on subinterpreters yet.
        faulthandler = None
    threads = list(threads)
    started = []
    try:
@@ -147,6 +151,7 @@ def start_threads(threads, unlock=None):
    finally:
        started = [t for t in started if t.is_alive()]
        if started:
            if faulthandler is not None:
                faulthandler.dump_traceback(sys.stdout)
            raise AssertionError('Unable to join %d threads' % len(started))
@@ -2,17 +2,20 @@
# these are all functions _testcapi exports whose name begins with 'test_'.

import _thread
from collections import OrderedDict
from collections import OrderedDict, deque
import contextlib
import importlib.machinery
import importlib.util
import json
import os
import pickle
import queue
import random
import sys
import textwrap
import threading
import time
import types
import unittest
import warnings
import weakref
@@ -36,6 +39,10 @@ try:
    import _testsinglephase
except ImportError:
    _testsinglephase = None
try:
    import _xxsubinterpreters as _interpreters
except ModuleNotFoundError:
    _interpreters = None

# Skip this test if the _testcapi module isn't available.
_testcapi = import_helper.import_module('_testcapi')
@@ -47,6 +54,12 @@ def decode_stderr(err):
    return err.decode('utf-8', 'replace').replace('\r', '')


def requires_subinterpreters(meth):
    """Decorator to skip a test if subinterpreters are not supported."""
    return unittest.skipIf(_interpreters is None,
                           'subinterpreters required')(meth)


def testfunction(self):
    """some doc"""
    return self
@@ -1259,6 +1272,10 @@ class TestHeapTypeRelative(unittest.TestCase):

class TestPendingCalls(unittest.TestCase):

    # See the comment in ceval.c (at the "handle_eval_breaker" label)
    # about when pending calls get run. This is especially relevant
    # here for creating deterministic tests.

    def pendingcalls_submit(self, l, n):
        def callback():
            #this function can be interrupted by thread switching so let's
@@ -1341,6 +1358,388 @@ class TestPendingCalls(unittest.TestCase):
        gen = genf()
        self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)

    class PendingTask(types.SimpleNamespace):

        _add_pending = _testinternalcapi.pending_threadfunc

        def __init__(self, req, taskid=None, notify_done=None):
            self.id = taskid
            self.req = req
            self.notify_done = notify_done

            self.creator_tid = threading.get_ident()
            self.requester_tid = None
            self.runner_tid = None
            self.result = None

        def run(self):
            assert self.result is None
            self.runner_tid = threading.get_ident()
            self._run()
            if self.notify_done is not None:
                self.notify_done()

        def _run(self):
            self.result = self.req

        def run_in_pending_call(self, worker_tids):
            assert self._add_pending is _testinternalcapi.pending_threadfunc
            self.requester_tid = threading.get_ident()
            def callback():
                assert self.result is None
                # It can be tricky to control which thread handles
                # the eval breaker, so we take a naive approach to
                # make sure.
                if threading.get_ident() not in worker_tids:
                    self._add_pending(callback, ensure_added=True)
                    return
                self.run()
            self._add_pending(callback, ensure_added=True)

        def create_thread(self, worker_tids):
            return threading.Thread(
                target=self.run_in_pending_call,
                args=(worker_tids,),
            )

        def wait_for_result(self):
            while self.result is None:
                time.sleep(0.01)

    def test_subthreads_can_handle_pending_calls(self):
        payload = 'Spam spam spam spam. Lovely spam! Wonderful spam!'

        task = self.PendingTask(payload)
        def do_the_work():
            tid = threading.get_ident()
            t = task.create_thread({tid})
            with threading_helper.start_threads([t]):
                task.wait_for_result()
        t = threading.Thread(target=do_the_work)
        with threading_helper.start_threads([t]):
            pass

        self.assertEqual(task.result, payload)

    def test_many_subthreads_can_handle_pending_calls(self):
        main_tid = threading.get_ident()
        self.assertEqual(threading.main_thread().ident, main_tid)

        # We can't use queue.Queue since it isn't reentrant relative
        # to pending calls.
        _queue = deque()
        _active = deque()
        _done_lock = threading.Lock()
        def queue_put(task):
            _queue.append(task)
            _active.append(True)
        def queue_get():
            try:
                task = _queue.popleft()
            except IndexError:
                raise queue.Empty
            return task
        def queue_task_done():
            _active.pop()
            if not _active:
                try:
                    _done_lock.release()
                except RuntimeError:
                    assert not _done_lock.locked()
        def queue_empty():
            return not _queue
        def queue_join():
            _done_lock.acquire()
            _done_lock.release()

        tasks = []
        for i in range(20):
            task = self.PendingTask(
                req=f'request {i}',
                taskid=i,
                notify_done=queue_task_done,
            )
            tasks.append(task)
            queue_put(task)
        # This will be released once all the tasks have finished.
        _done_lock.acquire()

        def add_tasks(worker_tids):
            while True:
                if done:
                    return
                try:
                    task = queue_get()
                except queue.Empty:
                    break
                task.run_in_pending_call(worker_tids)

        done = False
        def run_tasks():
            while not queue_empty():
                if done:
                    return
                time.sleep(0.01)
            # Give the worker a chance to handle any remaining pending calls.
            while not done:
                time.sleep(0.01)

        # Start the workers and wait for them to finish.
        worker_threads = [threading.Thread(target=run_tasks)
                          for _ in range(3)]
        with threading_helper.start_threads(worker_threads):
            try:
                # Add a pending call for each task.
                worker_tids = [t.ident for t in worker_threads]
                threads = [threading.Thread(target=add_tasks, args=(worker_tids,))
                           for _ in range(3)]
                with threading_helper.start_threads(threads):
                    try:
                        pass
                    except BaseException:
                        done = True
                        raise  # re-raise
                # Wait for the pending calls to finish.
                queue_join()
                # Notify the workers that they can stop.
                done = True
            except BaseException:
                done = True
                raise  # re-raise
        runner_tids = [t.runner_tid for t in tasks]

        self.assertNotIn(main_tid, runner_tids)
        for task in tasks:
            with self.subTest(f'task {task.id}'):
                self.assertNotEqual(task.requester_tid, main_tid)
                self.assertNotEqual(task.requester_tid, task.runner_tid)
                self.assertNotIn(task.requester_tid, runner_tids)

    @requires_subinterpreters
    def test_isolated_subinterpreter(self):
        # We exercise the most important permutations.

        # This test relies on pending calls getting called
        # (eval breaker tripped) at each loop iteration
        # and at each call.

        maxtext = 250
        main_interpid = 0
        interpid = _interpreters.create()
        _interpreters.run_string(interpid, f"""if True:
            import json
            import os
            import threading
            import time
            import _testinternalcapi
            from test.support import threading_helper
            """)

        def create_pipe():
            r, w = os.pipe()
            self.addCleanup(lambda: os.close(r))
            self.addCleanup(lambda: os.close(w))
            return r, w

        with self.subTest('add in main, run in subinterpreter'):
            r_ready, w_ready = create_pipe()
            r_done, w_done= create_pipe()
            timeout = time.time() + 30  # seconds

            def do_work():
                _interpreters.run_string(interpid, f"""if True:
                    # Wait until this interp has handled the pending call.
                    waiting = False
                    done = False
                    def wait(os_read=os.read):
                        global done, waiting
                        waiting = True
                        os_read({r_done}, 1)
                        done = True
                    t = threading.Thread(target=wait)
                    with threading_helper.start_threads([t]):
                        while not waiting:
                            pass
                        os.write({w_ready}, b'\\0')
                        # Loop to trigger the eval breaker.
                        while not done:
                            time.sleep(0.01)
                            if time.time() > {timeout}:
                                raise Exception('timed out!')
                    """)
            t = threading.Thread(target=do_work)
            with threading_helper.start_threads([t]):
                os.read(r_ready, 1)
                # Add the pending call and wait for it to finish.
                actual = _testinternalcapi.pending_identify(interpid)
                # Signal the subinterpreter to stop.
                os.write(w_done, b'\0')

            self.assertEqual(actual, int(interpid))

        with self.subTest('add in main, run in subinterpreter sub-thread'):
            r_ready, w_ready = create_pipe()
            r_done, w_done= create_pipe()
            timeout = time.time() + 30  # seconds

            def do_work():
                _interpreters.run_string(interpid, f"""if True:
                    waiting = False
                    done = False
                    def subthread():
                        while not waiting:
                            pass
                        os.write({w_ready}, b'\\0')
                        # Loop to trigger the eval breaker.
                        while not done:
                            time.sleep(0.01)
                            if time.time() > {timeout}:
                                raise Exception('timed out!')
                    t = threading.Thread(target=subthread)
                    with threading_helper.start_threads([t]):
                        # Wait until this interp has handled the pending call.
                        waiting = True
                        os.read({r_done}, 1)
                        done = True
                    """)
            t = threading.Thread(target=do_work)
            with threading_helper.start_threads([t]):
                os.read(r_ready, 1)
                # Add the pending call and wait for it to finish.
                actual = _testinternalcapi.pending_identify(interpid)
                # Signal the subinterpreter to stop.
                os.write(w_done, b'\0')

            self.assertEqual(actual, int(interpid))

        with self.subTest('add in subinterpreter, run in main'):
            r_ready, w_ready = create_pipe()
            r_done, w_done= create_pipe()
            r_data, w_data= create_pipe()
            timeout = time.time() + 30  # seconds

            def add_job():
                os.read(r_ready, 1)
                _interpreters.run_string(interpid, f"""if True:
                    # Add the pending call and wait for it to finish.
                    actual = _testinternalcapi.pending_identify({main_interpid})
                    # Signal the subinterpreter to stop.
                    os.write({w_done}, b'\\0')
                    os.write({w_data}, actual.to_bytes(1, 'little'))
                    """)
            # Wait until this interp has handled the pending call.
            waiting = False
            done = False
            def wait(os_read=os.read):
                nonlocal done, waiting
                waiting = True
                os_read(r_done, 1)
                done = True
            t1 = threading.Thread(target=add_job)
            t2 = threading.Thread(target=wait)
            with threading_helper.start_threads([t1, t2]):
                while not waiting:
                    pass
                os.write(w_ready, b'\0')
                # Loop to trigger the eval breaker.
                while not done:
                    time.sleep(0.01)
                    if time.time() > timeout:
                        raise Exception('timed out!')
            text = os.read(r_data, 1)
            actual = int.from_bytes(text, 'little')

            self.assertEqual(actual, int(main_interpid))

        with self.subTest('add in subinterpreter, run in sub-thread'):
            r_ready, w_ready = create_pipe()
            r_done, w_done= create_pipe()
            r_data, w_data= create_pipe()
            timeout = time.time() + 30  # seconds

            def add_job():
                os.read(r_ready, 1)
                _interpreters.run_string(interpid, f"""if True:
                    # Add the pending call and wait for it to finish.
                    actual = _testinternalcapi.pending_identify({main_interpid})
                    # Signal the subinterpreter to stop.
                    os.write({w_done}, b'\\0')
                    os.write({w_data}, actual.to_bytes(1, 'little'))
                    """)
            # Wait until this interp has handled the pending call.
            waiting = False
            done = False
            def wait(os_read=os.read):
                nonlocal done, waiting
                waiting = True
                os_read(r_done, 1)
                done = True
            def subthread():
                while not waiting:
                    pass
                os.write(w_ready, b'\0')
                # Loop to trigger the eval breaker.
                while not done:
                    time.sleep(0.01)
                    if time.time() > timeout:
                        raise Exception('timed out!')
            t1 = threading.Thread(target=add_job)
            t2 = threading.Thread(target=wait)
            t3 = threading.Thread(target=subthread)
            with threading_helper.start_threads([t1, t2, t3]):
                pass
            text = os.read(r_data, 1)
            actual = int.from_bytes(text, 'little')

            self.assertEqual(actual, int(main_interpid))

        # XXX We can't use the rest until gh-105716 is fixed.
        return

        with self.subTest('add in subinterpreter, run in subinterpreter sub-thread'):
            r_ready, w_ready = create_pipe()
            r_done, w_done= create_pipe()
            r_data, w_data= create_pipe()
            timeout = time.time() + 30  # seconds

            def do_work():
                _interpreters.run_string(interpid, f"""if True:
                    waiting = False
                    done = False
                    def subthread():
                        while not waiting:
                            pass
                        os.write({w_ready}, b'\\0')
                        # Loop to trigger the eval breaker.
                        while not done:
                            time.sleep(0.01)
                            if time.time() > {timeout}:
                                raise Exception('timed out!')
                    t = threading.Thread(target=subthread)
                    with threading_helper.start_threads([t]):
                        # Wait until this interp has handled the pending call.
                        waiting = True
                        os.read({r_done}, 1)
                        done = True
                    """)
            t = threading.Thread(target=do_work)
            #with threading_helper.start_threads([t]):
            t.start()
            if True:
                os.read(r_ready, 1)
                _interpreters.run_string(interpid, f"""if True:
                    # Add the pending call and wait for it to finish.
                    actual = _testinternalcapi.pending_identify({interpid})
                    # Signal the subinterpreter to stop.
                    os.write({w_done}, b'\\0')
                    os.write({w_data}, actual.to_bytes(1, 'little'))
                    """)
            t.join()
            text = os.read(r_data, 1)
            actual = int.from_bytes(text, 'little')

            self.assertEqual(actual, int(interpid))


class SubinterpreterTest(unittest.TestCase):
@@ -0,0 +1,9 @@
The "pending call" machinery now works for all interpreters, not just the
main interpreter, and runs in all threads, not just the main thread. Some
calls are still only done in the main thread, ergo in the main interpreter.
This change does not affect signal handling nor the existing public C-API
(``Py_AddPendingCall()``), which both still only target the main thread.
The new functionality is meant strictly for internal use for now, since
consequences of its use are not well understood yet outside some very
restricted cases. This change brings the capability in line with the
intention when the state was made per-interpreter several years ago.
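For context, a hedged sketch of the unchanged public API mentioned above: `Py_AddPendingCall()` keeps targeting the main thread of the main interpreter, as the `Py_AddPendingCall()` hunk later in this diff shows. The `report`/`request_report` names are illustrative only, not part of the commit.

/* Sketch: the public API keeps its old meaning. */
#include <Python.h>
#include <stdio.h>

static int
report(void *arg)
{
    /* Will run in the main thread of the main interpreter, GIL held. */
    printf("pending call ran: %s\n", (const char *)arg);
    return 0;
}

static void
request_report(void)
{
    /* May be called from any thread, even without the GIL. */
    if (Py_AddPendingCall(report, "hello") < 0) {
        /* The fixed-size queue was full; callers are expected to retry. */
    }
}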
@@ -210,6 +210,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
    PyObject *item;
    PyLockStatus r;
    PY_TIMEOUT_T microseconds;
    PyThreadState *tstate = PyThreadState_Get();

    if (block == 0) {
        /* Non-blocking */
@@ -253,7 +254,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
        Py_END_ALLOW_THREADS
    }

    if (r == PY_LOCK_INTR && Py_MakePendingCalls() < 0) {
    if (r == PY_LOCK_INTR && _PyEval_MakePendingCalls(tstate) < 0) {
        return NULL;
    }
    if (r == PY_LOCK_FAILURE) {
@@ -13,16 +13,18 @@

#include "Python.h"
#include "frameobject.h"
#include "interpreteridobject.h" // _PyInterpreterID_LookUp()
#include "pycore_atomic_funcs.h" // _Py_atomic_int_get()
#include "pycore_bitutils.h" // _Py_bswap32()
#include "pycore_compile.h" // _PyCompile_CodeGen, _PyCompile_OptimizeCfg, _PyCompile_Assemble
#include "pycore_ceval.h" // _PyEval_AddPendingCall
#include "pycore_fileutils.h" // _Py_normpath
#include "pycore_frame.h" // _PyInterpreterFrame
#include "pycore_gc.h" // PyGC_Head
#include "pycore_hashtable.h" // _Py_hashtable_new()
#include "pycore_initconfig.h" // _Py_GetConfigsAsDict()
#include "pycore_pathconfig.h" // _PyPathConfig_ClearGlobal()
#include "pycore_interp.h" // _PyInterpreterState_GetConfigCopy()
#include "pycore_pathconfig.h" // _PyPathConfig_ClearGlobal()
#include "pycore_pyerrors.h" // _Py_UTF8_Edit_Cost()
#include "pycore_pystate.h" // _PyThreadState_GET()
#include "osdefs.h" // MAXPATHLEN
@@ -838,6 +840,120 @@ set_optimizer(PyObject *self, PyObject *opt)
    Py_RETURN_NONE;
}


static int _pending_callback(void *arg)
{
    /* we assume the argument is callable object to which we own a reference */
    PyObject *callable = (PyObject *)arg;
    PyObject *r = PyObject_CallNoArgs(callable);
    Py_DECREF(callable);
    Py_XDECREF(r);
    return r != NULL ? 0 : -1;
}

/* The following requests n callbacks to _pending_callback. It can be
 * run from any python thread.
 */
static PyObject *
pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs)
{
    PyObject *callable;
    int ensure_added = 0;
    static char *kwlist[] = {"", "ensure_added", NULL};
    if (!PyArg_ParseTupleAndKeywords(args, kwargs,
                                     "O|$p:pending_threadfunc", kwlist,
                                     &callable, &ensure_added))
    {
        return NULL;
    }
    PyInterpreterState *interp = PyInterpreterState_Get();

    /* create the reference for the callbackwhile we hold the lock */
    Py_INCREF(callable);

    int r;
    Py_BEGIN_ALLOW_THREADS
    r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
    Py_END_ALLOW_THREADS
    if (r < 0) {
        /* unsuccessful add */
        if (!ensure_added) {
            Py_DECREF(callable);
            Py_RETURN_FALSE;
        }
        do {
            Py_BEGIN_ALLOW_THREADS
            r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
            Py_END_ALLOW_THREADS
        } while (r < 0);
    }

    Py_RETURN_TRUE;
}


static struct {
    int64_t interpid;
} pending_identify_result;

static int
_pending_identify_callback(void *arg)
{
    PyThread_type_lock mutex = (PyThread_type_lock)arg;
    assert(pending_identify_result.interpid == -1);
    PyThreadState *tstate = PyThreadState_Get();
    pending_identify_result.interpid = PyInterpreterState_GetID(tstate->interp);
    PyThread_release_lock(mutex);
    return 0;
}

static PyObject *
pending_identify(PyObject *self, PyObject *args)
{
    PyObject *interpid;
    if (!PyArg_ParseTuple(args, "O:pending_identify", &interpid)) {
        return NULL;
    }
    PyInterpreterState *interp = _PyInterpreterID_LookUp(interpid);
    if (interp == NULL) {
        if (!PyErr_Occurred()) {
            PyErr_SetString(PyExc_ValueError, "interpreter not found");
        }
        return NULL;
    }

    pending_identify_result.interpid = -1;

    PyThread_type_lock mutex = PyThread_allocate_lock();
    if (mutex == NULL) {
        return NULL;
    }
    PyThread_acquire_lock(mutex, WAIT_LOCK);
    /* It gets released in _pending_identify_callback(). */

    int r;
    do {
        Py_BEGIN_ALLOW_THREADS
        r = _PyEval_AddPendingCall(interp,
                                   &_pending_identify_callback, (void *)mutex,
                                   0);
        Py_END_ALLOW_THREADS
    } while (r < 0);

    /* Wait for the pending call to complete. */
    PyThread_acquire_lock(mutex, WAIT_LOCK);
    PyThread_release_lock(mutex);
    PyThread_free_lock(mutex);

    PyObject *res = PyLong_FromLongLong(pending_identify_result.interpid);
    pending_identify_result.interpid = -1;
    if (res == NULL) {
        return NULL;
    }
    return res;
}


static PyMethodDef module_functions[] = {
    {"get_configs", get_configs, METH_NOARGS},
    {"get_recursion_depth", get_recursion_depth, METH_NOARGS},
@@ -868,6 +984,10 @@ static PyMethodDef module_functions[] = {
    {"iframe_getlasti", iframe_getlasti, METH_O, NULL},
    {"set_optimizer", set_optimizer, METH_O, NULL},
    {"get_counter_optimizer", get_counter_optimizer, METH_NOARGS, NULL},
    {"pending_threadfunc", _PyCFunction_CAST(pending_threadfunc),
     METH_VARARGS | METH_KEYWORDS},
//    {"pending_fd_identify", pending_fd_identify, METH_VARARGS, NULL},
    {"pending_identify", pending_identify, METH_VARARGS, NULL},
    {NULL, NULL} /* sentinel */
};
@@ -81,6 +81,7 @@ lock_dealloc(lockobject *self)
static PyLockStatus
acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
{
    PyThreadState *tstate = _PyThreadState_GET();
    _PyTime_t endtime = 0;
    if (timeout > 0) {
        endtime = _PyDeadline_Init(timeout);
@@ -103,7 +104,7 @@ acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
        /* Run signal handlers if we were interrupted. Propagate
         * exceptions from signal handlers, such as KeyboardInterrupt, by
         * passing up PY_LOCK_INTR. */
        if (Py_MakePendingCalls() < 0) {
        if (_PyEval_MakePendingCalls(tstate) < 0) {
            return PY_LOCK_INTR;
        }
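The pattern this hunk (and the _queuemodule.c hunk above) applies: after an interruptible lock wait, drain pending calls through the new per-thread-state helper instead of the main-thread-only Py_MakePendingCalls(). A minimal sketch under the assumption of a core build where the internal headers are visible; `after_interrupted_wait` is a hypothetical name.

/* Illustrative sketch of the "drain after an interrupted wait" pattern. */
#include "Python.h"
#include "pycore_ceval.h"     // _PyEval_MakePendingCalls()
#include "pycore_pystate.h"   // _PyThreadState_GET()

static int
after_interrupted_wait(void)
{
    PyThreadState *tstate = _PyThreadState_GET();
    /* Runs signal handlers (main thread only) and any pending calls queued
       for this thread's interpreter; returns nonzero if one of them raised. */
    if (_PyEval_MakePendingCalls(tstate) < 0) {
        return -1;   /* e.g. propagate KeyboardInterrupt */
    }
    return 0;
}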
@@ -314,7 +314,8 @@ trip_signal(int sig_num)
               still use it for this exceptional case. */
            _PyEval_AddPendingCall(interp,
                                   report_wakeup_send_error,
                                   (void *)(intptr_t) last_error);
                                   (void *)(intptr_t) last_error,
                                   1);
        }
    }
}
@@ -333,7 +334,8 @@ trip_signal(int sig_num)
               still use it for this exceptional case. */
            _PyEval_AddPendingCall(interp,
                                   report_wakeup_write_error,
                                   (void *)(intptr_t)errno);
                                   (void *)(intptr_t)errno,
                                   1);
        }
    }
}
@@ -758,6 +758,61 @@ handle_eval_breaker:
     * We need to do reasonably frequently, but not too frequently.
     * All loops should include a check of the eval breaker.
     * We also check on return from any builtin function.
     *
     * ## More Details ###
     *
     * The eval loop (this function) normally executes the instructions
     * of a code object sequentially. However, the runtime supports a
     * number of out-of-band execution scenarios that may pause that
     * sequential execution long enough to do that out-of-band work
     * in the current thread using the current PyThreadState.
     *
     * The scenarios include:
     *
     *  - cyclic garbage collection
     *  - GIL drop requests
     *  - "async" exceptions
     *  - "pending calls"  (some only in the main thread)
     *  - signal handling  (only in the main thread)
     *
     * When the need for one of the above is detected, the eval loop
     * pauses long enough to handle the detected case. Then, if doing
     * so didn't trigger an exception, the eval loop resumes executing
     * the sequential instructions.
     *
     * To make this work, the eval loop periodically checks if any
     * of the above needs to happen. The individual checks can be
     * expensive if computed each time, so a while back we switched
     * to using pre-computed, per-interpreter variables for the checks,
     * and later consolidated that to a single "eval breaker" variable
     * (now a PyInterpreterState field).
     *
     * For the longest time, the eval breaker check would happen
     * frequently, every 5 or so times through the loop, regardless
     * of what instruction ran last or what would run next. Then, in
     * early 2021 (gh-18334, commit 4958f5d), we switched to checking
     * the eval breaker less frequently, by hard-coding the check to
     * specific places in the eval loop (e.g. certain instructions).
     * The intent then was to check after returning from calls
     * and on the back edges of loops.
     *
     * In addition to being more efficient, that approach keeps
     * the eval loop from running arbitrary code between instructions
     * that don't handle that well. (See gh-74174.)
     *
     * Currently, the eval breaker check happens here at the
     * "handle_eval_breaker" label. Some instructions come here
     * explicitly (goto) and some indirectly. Notably, the check
     * happens on back edges in the control flow graph, which
     * pretty much applies to all loops and most calls.
     * (See bytecodes.c for exact information.)
     *
     * One consequence of this approach is that it might not be obvious
     * how to force any specific thread to pick up the eval breaker,
     * or for any specific thread to not pick it up. Mostly this
     * involves judicious uses of locks and careful ordering of code,
     * while avoiding code that might trigger the eval breaker
     * until so desired.
     */
    if (_Py_HandlePending(tstate) != 0) {
        goto error;
@@ -68,8 +68,9 @@ COMPUTE_EVAL_BREAKER(PyInterpreterState *interp,
        _Py_atomic_load_relaxed_int32(&ceval2->gil_drop_request)
        | (_Py_atomic_load_relaxed_int32(&ceval->signals_pending)
           && _Py_ThreadCanHandleSignals(interp))
        | (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do)
           && _Py_ThreadCanHandlePendingCalls())
        | (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do))
        | (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)
           &&_Py_atomic_load_relaxed_int32(&ceval->pending_mainthread.calls_to_do))
        | ceval2->pending.async_exc
        | _Py_atomic_load_relaxed_int32(&ceval2->gc_scheduled));
}
@@ -95,11 +96,11 @@ RESET_GIL_DROP_REQUEST(PyInterpreterState *interp)


static inline void
SIGNAL_PENDING_CALLS(PyInterpreterState *interp)
SIGNAL_PENDING_CALLS(struct _pending_calls *pending, PyInterpreterState *interp)
{
    struct _ceval_runtime_state *ceval = &interp->runtime->ceval;
    struct _ceval_state *ceval2 = &interp->ceval;
    _Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 1);
    _Py_atomic_store_relaxed(&pending->calls_to_do, 1);
    COMPUTE_EVAL_BREAKER(interp, ceval, ceval2);
}

@@ -109,6 +110,9 @@ UNSIGNAL_PENDING_CALLS(PyInterpreterState *interp)
{
    struct _ceval_runtime_state *ceval = &interp->runtime->ceval;
    struct _ceval_state *ceval2 = &interp->ceval;
    if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) {
        _Py_atomic_store_relaxed(&ceval->pending_mainthread.calls_to_do, 0);
    }
    _Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 0);
    COMPUTE_EVAL_BREAKER(interp, ceval, ceval2);
}
@@ -803,19 +807,31 @@ _push_pending_call(struct _pending_calls *pending,
    return 0;
}

static int
_next_pending_call(struct _pending_calls *pending,
                   int (**func)(void *), void **arg)
{
    int i = pending->first;
    if (i == pending->last) {
        /* Queue empty */
        assert(pending->calls[i].func == NULL);
        return -1;
    }
    *func = pending->calls[i].func;
    *arg = pending->calls[i].arg;
    return i;
}

/* Pop one item off the queue while holding the lock. */
static void
_pop_pending_call(struct _pending_calls *pending,
                  int (**func)(void *), void **arg)
{
    int i = pending->first;
    if (i == pending->last) {
        return; /* Queue empty */
    }

    *func = pending->calls[i].func;
    *arg = pending->calls[i].arg;
    int i = _next_pending_call(pending, func, arg);
    if (i >= 0) {
        pending->calls[i] = (struct _pending_call){0};
        pending->first = (i + 1) % NPENDINGCALLS;
    }
}

/* This implementation is thread-safe. It allows
@@ -825,9 +841,16 @@ _pop_pending_call(struct _pending_calls *pending,

int
_PyEval_AddPendingCall(PyInterpreterState *interp,
                       int (*func)(void *), void *arg)
                       int (*func)(void *), void *arg,
                       int mainthreadonly)
{
    assert(!mainthreadonly || _Py_IsMainInterpreter(interp));
    struct _pending_calls *pending = &interp->ceval.pending;
    if (mainthreadonly) {
        /* The main thread only exists in the main interpreter. */
        assert(_Py_IsMainInterpreter(interp));
        pending = &_PyRuntime.ceval.pending_mainthread;
    }
    /* Ensure that _PyEval_InitState() was called
       and that _PyEval_FiniState() is not called yet. */
    assert(pending->lock != NULL);
@@ -837,39 +860,17 @@ _PyEval_AddPendingCall(PyInterpreterState *interp,
    PyThread_release_lock(pending->lock);

    /* signal main loop */
    SIGNAL_PENDING_CALLS(interp);
    SIGNAL_PENDING_CALLS(pending, interp);
    return result;
}

int
Py_AddPendingCall(int (*func)(void *), void *arg)
{
    /* Best-effort to support subinterpreters and calls with the GIL released.

       First attempt _PyThreadState_GET() since it supports subinterpreters.

       If the GIL is released, _PyThreadState_GET() returns NULL . In this
       case, use PyGILState_GetThisThreadState() which works even if the GIL
       is released.

       Sadly, PyGILState_GetThisThreadState() doesn't support subinterpreters:
       see bpo-10915 and bpo-15751.

       Py_AddPendingCall() doesn't require the caller to hold the GIL. */
    PyThreadState *tstate = _PyThreadState_GET();
    if (tstate == NULL) {
        tstate = PyGILState_GetThisThreadState();
    }

    PyInterpreterState *interp;
    if (tstate != NULL) {
        interp = tstate->interp;
    }
    else {
        /* Last resort: use the main interpreter */
        interp = _PyInterpreterState_Main();
    }
    return _PyEval_AddPendingCall(interp, func, arg);
    /* Legacy users of this API will continue to target the main thread
       (of the main interpreter). */
    PyInterpreterState *interp = _PyInterpreterState_Main();
    return _PyEval_AddPendingCall(interp, func, arg, 1);
}

static int
@@ -889,27 +890,24 @@ handle_signals(PyThreadState *tstate)
    return 0;
}

static int
make_pending_calls(PyInterpreterState *interp)
static inline int
maybe_has_pending_calls(PyInterpreterState *interp)
{
    /* only execute pending calls on main thread */
    if (!_Py_ThreadCanHandlePendingCalls()) {
        return 0;
    }

    /* don't perform recursive pending calls */
    if (interp->ceval.pending.busy) {
        return 0;
    }
    interp->ceval.pending.busy = 1;

    /* unsignal before starting to call callbacks, so that any callback
       added in-between re-signals */
    UNSIGNAL_PENDING_CALLS(interp);
    int res = 0;

    /* perform a bounded number of calls, in case of recursion */
    struct _pending_calls *pending = &interp->ceval.pending;
    if (_Py_atomic_load_relaxed_int32(&pending->calls_to_do)) {
        return 1;
    }
    if (!_Py_IsMainThread() || !_Py_IsMainInterpreter(interp)) {
        return 0;
    }
    pending = &_PyRuntime.ceval.pending_mainthread;
    return _Py_atomic_load_relaxed_int32(&pending->calls_to_do);
}

static int
_make_pending_calls(struct _pending_calls *pending)
{
    /* perform a bounded number of calls, in case of recursion */
    for (int i=0; i<NPENDINGCALLS; i++) {
        int (*func)(void *) = NULL;
        void *arg = NULL;
@@ -923,19 +921,61 @@ make_pending_calls(PyInterpreterState *interp)
        if (func == NULL) {
            break;
        }
        res = func(arg);
        if (res) {
            goto error;
        if (func(arg) != 0) {
            return -1;
        }
    }
    return 0;
}

static int
make_pending_calls(PyInterpreterState *interp)
{
    struct _pending_calls *pending = &interp->ceval.pending;
    struct _pending_calls *pending_main = &_PyRuntime.ceval.pending_mainthread;

    /* Only one thread (per interpreter) may run the pending calls
       at once. In the same way, we don't do recursive pending calls. */
    PyThread_acquire_lock(pending->lock, WAIT_LOCK);
    if (pending->busy) {
        /* A pending call was added after another thread was already
           handling the pending calls (and had already "unsignaled").
           Once that thread is done, it may have taken care of all the
           pending calls, or there might be some still waiting.
           Regardless, this interpreter's pending calls will stay
           "signaled" until that first thread has finished. At that
           point the next thread to trip the eval breaker will take
           care of any remaining pending calls. Until then, though,
           all the interpreter's threads will be tripping the eval
           breaker every time it's checked. */
        PyThread_release_lock(pending->lock);
        return 0;
    }
    pending->busy = 1;
    PyThread_release_lock(pending->lock);

    /* unsignal before starting to call callbacks, so that any callback
       added in-between re-signals */
    UNSIGNAL_PENDING_CALLS(interp);

    if (_make_pending_calls(pending) != 0) {
        pending->busy = 0;
        /* There might not be more calls to make, but we play it safe. */
        SIGNAL_PENDING_CALLS(pending, interp);
        return -1;
    }

    if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) {
        if (_make_pending_calls(pending_main) != 0) {
            pending->busy = 0;
            /* There might not be more calls to make, but we play it safe. */
            SIGNAL_PENDING_CALLS(pending_main, interp);
            return -1;
        }
    }

    interp->ceval.pending.busy = 0;
    return res;

error:
    interp->ceval.pending.busy = 0;
    SIGNAL_PENDING_CALLS(interp);
    return res;
    pending->busy = 0;
    return 0;
}

void
@@ -944,12 +984,6 @@ _Py_FinishPendingCalls(PyThreadState *tstate)
    assert(PyGILState_Check());
    assert(is_tstate_valid(tstate));

    struct _pending_calls *pending = &tstate->interp->ceval.pending;

    if (!_Py_atomic_load_relaxed_int32(&(pending->calls_to_do))) {
        return;
    }

    if (make_pending_calls(tstate->interp) < 0) {
        PyObject *exc = _PyErr_GetRaisedException(tstate);
        PyErr_BadInternalCall();
@@ -958,6 +992,29 @@ _Py_FinishPendingCalls(PyThreadState *tstate)
    }
}

int
_PyEval_MakePendingCalls(PyThreadState *tstate)
{
    int res;

    if (_Py_IsMainThread() && _Py_IsMainInterpreter(tstate->interp)) {
        /* Python signal handler doesn't really queue a callback:
           it only signals that a signal was received,
           see _PyEval_SignalReceived(). */
        res = handle_signals(tstate);
        if (res != 0) {
            return res;
        }
    }

    res = make_pending_calls(tstate->interp);
    if (res != 0) {
        return res;
    }

    return 0;
}

/* Py_MakePendingCalls() is a simple wrapper for the sake
   of backward-compatibility. */
int
@@ -968,19 +1025,11 @@ Py_MakePendingCalls(void)
    PyThreadState *tstate = _PyThreadState_GET();
    assert(is_tstate_valid(tstate));

    /* Python signal handler doesn't really queue a callback: it only signals
       that a signal was received, see _PyEval_SignalReceived(). */
    int res = handle_signals(tstate);
    if (res != 0) {
        return res;
    }

    res = make_pending_calls(tstate->interp);
    if (res != 0) {
        return res;
    }

    /* Only execute pending calls on the main thread. */
    if (!_Py_IsMainThread() || !_Py_IsMainInterpreter(tstate->interp)) {
        return 0;
    }
    return _PyEval_MakePendingCalls(tstate);
}

void
@@ -1020,7 +1069,7 @@ _Py_HandlePending(PyThreadState *tstate)
    }

    /* Pending calls */
    if (_Py_atomic_load_relaxed_int32(&interp_ceval_state->pending.calls_to_do)) {
    if (maybe_has_pending_calls(tstate->interp)) {
        if (make_pending_calls(tstate->interp) != 0) {
            return -1;
        }
@@ -2152,6 +2152,9 @@ Py_EndInterpreter(PyThreadState *tstate)
    // Wrap up existing "threading"-module-created, non-daemon threads.
    wait_for_thread_shutdown(tstate);

    // Make any remaining pending calls.
    _Py_FinishPendingCalls(tstate);

    _PyAtExit_Call(tstate->interp);

    if (tstate != interp->threads.head || tstate->next != NULL) {
@@ -380,7 +380,7 @@ _Py_COMP_DIAG_IGNORE_DEPR_DECLS
static const _PyRuntimeState initial = _PyRuntimeState_INIT(_PyRuntime);
_Py_COMP_DIAG_POP

#define NUMLOCKS 8
#define NUMLOCKS 9
#define LOCKS_INIT(runtime) \
    { \
        &(runtime)->interpreters.mutex, \
@@ -388,6 +388,7 @@ _Py_COMP_DIAG_POP
        &(runtime)->getargs.mutex, \
        &(runtime)->unicode_state.ids.lock, \
        &(runtime)->imports.extensions.mutex, \
        &(runtime)->ceval.pending_mainthread.lock, \
        &(runtime)->atexit.mutex, \
        &(runtime)->audit_hooks.mutex, \
        &(runtime)->allocators.mutex, \
@@ -517,6 +517,7 @@ Modules/_testcapimodule.c - g_type_watchers_installed -
Modules/_testimportmultiple.c - _barmodule -
Modules/_testimportmultiple.c - _foomodule -
Modules/_testimportmultiple.c - _testimportmultiple -
Modules/_testinternalcapi.c - pending_identify_result -
Modules/_testmultiphase.c - Example_Type_slots -
Modules/_testmultiphase.c - Example_Type_spec -
Modules/_testmultiphase.c - Example_methods -