mirror of
https://github.com/python/cpython.git
synced 2025-07-12 22:05:16 +00:00
gh-104812: Run Pending Calls in any Thread (gh-104813)
For a while now, pending calls only run in the main thread (in the main interpreter). This PR changes things to allow any thread run a pending call, unless the pending call was explicitly added for the main thread to run.
This commit is contained in:
parent
4e80082723
commit
757b402ea1
16 changed files with 766 additions and 123 deletions
|
@ -22,6 +22,8 @@ PyAPI_FUNC(PyObject *) _PyEval_EvalFrameDefault(PyThreadState *tstate, struct _P
|
||||||
PyAPI_FUNC(void) _PyEval_SetSwitchInterval(unsigned long microseconds);
|
PyAPI_FUNC(void) _PyEval_SetSwitchInterval(unsigned long microseconds);
|
||||||
PyAPI_FUNC(unsigned long) _PyEval_GetSwitchInterval(void);
|
PyAPI_FUNC(unsigned long) _PyEval_GetSwitchInterval(void);
|
||||||
|
|
||||||
|
PyAPI_FUNC(int) _PyEval_MakePendingCalls(PyThreadState *);
|
||||||
|
|
||||||
PyAPI_FUNC(Py_ssize_t) PyUnstable_Eval_RequestCodeExtraIndex(freefunc);
|
PyAPI_FUNC(Py_ssize_t) PyUnstable_Eval_RequestCodeExtraIndex(freefunc);
|
||||||
// Old name -- remove when this API changes:
|
// Old name -- remove when this API changes:
|
||||||
_Py_DEPRECATED_EXTERNALLY(3.12) static inline Py_ssize_t
|
_Py_DEPRECATED_EXTERNALLY(3.12) static inline Py_ssize_t
|
||||||
|
|
|
@ -27,7 +27,8 @@ PyAPI_FUNC(void) _PyEval_SignalReceived(PyInterpreterState *interp);
|
||||||
PyAPI_FUNC(int) _PyEval_AddPendingCall(
|
PyAPI_FUNC(int) _PyEval_AddPendingCall(
|
||||||
PyInterpreterState *interp,
|
PyInterpreterState *interp,
|
||||||
int (*func)(void *),
|
int (*func)(void *),
|
||||||
void *arg);
|
void *arg,
|
||||||
|
int mainthreadonly);
|
||||||
PyAPI_FUNC(void) _PyEval_SignalAsyncExc(PyInterpreterState *interp);
|
PyAPI_FUNC(void) _PyEval_SignalAsyncExc(PyInterpreterState *interp);
|
||||||
#ifdef HAVE_FORK
|
#ifdef HAVE_FORK
|
||||||
extern PyStatus _PyEval_ReInitThreads(PyThreadState *tstate);
|
extern PyStatus _PyEval_ReInitThreads(PyThreadState *tstate);
|
||||||
|
|
|
@ -13,6 +13,24 @@ extern "C" {
|
||||||
#include "pycore_gil.h" // struct _gil_runtime_state
|
#include "pycore_gil.h" // struct _gil_runtime_state
|
||||||
|
|
||||||
|
|
||||||
|
struct _pending_calls {
|
||||||
|
int busy;
|
||||||
|
PyThread_type_lock lock;
|
||||||
|
/* Request for running pending calls. */
|
||||||
|
_Py_atomic_int calls_to_do;
|
||||||
|
/* Request for looking at the `async_exc` field of the current
|
||||||
|
thread state.
|
||||||
|
Guarded by the GIL. */
|
||||||
|
int async_exc;
|
||||||
|
#define NPENDINGCALLS 32
|
||||||
|
struct _pending_call {
|
||||||
|
int (*func)(void *);
|
||||||
|
void *arg;
|
||||||
|
} calls[NPENDINGCALLS];
|
||||||
|
int first;
|
||||||
|
int last;
|
||||||
|
};
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
PERF_STATUS_FAILED = -1, // Perf trampoline is in an invalid state
|
PERF_STATUS_FAILED = -1, // Perf trampoline is in an invalid state
|
||||||
PERF_STATUS_NO_INIT = 0, // Perf trampoline is not initialized
|
PERF_STATUS_NO_INIT = 0, // Perf trampoline is not initialized
|
||||||
|
@ -49,6 +67,8 @@ struct _ceval_runtime_state {
|
||||||
the main thread of the main interpreter can handle signals: see
|
the main thread of the main interpreter can handle signals: see
|
||||||
_Py_ThreadCanHandleSignals(). */
|
_Py_ThreadCanHandleSignals(). */
|
||||||
_Py_atomic_int signals_pending;
|
_Py_atomic_int signals_pending;
|
||||||
|
/* Pending calls to be made only on the main thread. */
|
||||||
|
struct _pending_calls pending_mainthread;
|
||||||
};
|
};
|
||||||
|
|
||||||
#ifdef PY_HAVE_PERF_TRAMPOLINE
|
#ifdef PY_HAVE_PERF_TRAMPOLINE
|
||||||
|
@ -62,24 +82,6 @@ struct _ceval_runtime_state {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
struct _pending_calls {
|
|
||||||
int busy;
|
|
||||||
PyThread_type_lock lock;
|
|
||||||
/* Request for running pending calls. */
|
|
||||||
_Py_atomic_int calls_to_do;
|
|
||||||
/* Request for looking at the `async_exc` field of the current
|
|
||||||
thread state.
|
|
||||||
Guarded by the GIL. */
|
|
||||||
int async_exc;
|
|
||||||
#define NPENDINGCALLS 32
|
|
||||||
struct {
|
|
||||||
int (*func)(void *);
|
|
||||||
void *arg;
|
|
||||||
} calls[NPENDINGCALLS];
|
|
||||||
int first;
|
|
||||||
int last;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct _ceval_state {
|
struct _ceval_state {
|
||||||
/* This single variable consolidates all requests to break out of
|
/* This single variable consolidates all requests to break out of
|
||||||
the fast path in the eval loop. */
|
the fast path in the eval loop. */
|
||||||
|
|
|
@ -60,14 +60,6 @@ _Py_ThreadCanHandleSignals(PyInterpreterState *interp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* Only execute pending calls on the main thread. */
|
|
||||||
static inline int
|
|
||||||
_Py_ThreadCanHandlePendingCalls(void)
|
|
||||||
{
|
|
||||||
return _Py_IsMainThread();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* Variable and static inline functions for in-line access to current thread
|
/* Variable and static inline functions for in-line access to current thread
|
||||||
and interpreter state */
|
and interpreter state */
|
||||||
|
|
||||||
|
|
|
@ -115,7 +115,11 @@ def join_thread(thread, timeout=None):
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def start_threads(threads, unlock=None):
|
def start_threads(threads, unlock=None):
|
||||||
import faulthandler
|
try:
|
||||||
|
import faulthandler
|
||||||
|
except ImportError:
|
||||||
|
# It isn't supported on subinterpreters yet.
|
||||||
|
faulthandler = None
|
||||||
threads = list(threads)
|
threads = list(threads)
|
||||||
started = []
|
started = []
|
||||||
try:
|
try:
|
||||||
|
@ -147,7 +151,8 @@ def start_threads(threads, unlock=None):
|
||||||
finally:
|
finally:
|
||||||
started = [t for t in started if t.is_alive()]
|
started = [t for t in started if t.is_alive()]
|
||||||
if started:
|
if started:
|
||||||
faulthandler.dump_traceback(sys.stdout)
|
if faulthandler is not None:
|
||||||
|
faulthandler.dump_traceback(sys.stdout)
|
||||||
raise AssertionError('Unable to join %d threads' % len(started))
|
raise AssertionError('Unable to join %d threads' % len(started))
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -2,17 +2,20 @@
|
||||||
# these are all functions _testcapi exports whose name begins with 'test_'.
|
# these are all functions _testcapi exports whose name begins with 'test_'.
|
||||||
|
|
||||||
import _thread
|
import _thread
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict, deque
|
||||||
import contextlib
|
import contextlib
|
||||||
import importlib.machinery
|
import importlib.machinery
|
||||||
import importlib.util
|
import importlib.util
|
||||||
|
import json
|
||||||
import os
|
import os
|
||||||
import pickle
|
import pickle
|
||||||
|
import queue
|
||||||
import random
|
import random
|
||||||
import sys
|
import sys
|
||||||
import textwrap
|
import textwrap
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
import types
|
||||||
import unittest
|
import unittest
|
||||||
import warnings
|
import warnings
|
||||||
import weakref
|
import weakref
|
||||||
|
@ -36,6 +39,10 @@ try:
|
||||||
import _testsinglephase
|
import _testsinglephase
|
||||||
except ImportError:
|
except ImportError:
|
||||||
_testsinglephase = None
|
_testsinglephase = None
|
||||||
|
try:
|
||||||
|
import _xxsubinterpreters as _interpreters
|
||||||
|
except ModuleNotFoundError:
|
||||||
|
_interpreters = None
|
||||||
|
|
||||||
# Skip this test if the _testcapi module isn't available.
|
# Skip this test if the _testcapi module isn't available.
|
||||||
_testcapi = import_helper.import_module('_testcapi')
|
_testcapi = import_helper.import_module('_testcapi')
|
||||||
|
@ -47,6 +54,12 @@ def decode_stderr(err):
|
||||||
return err.decode('utf-8', 'replace').replace('\r', '')
|
return err.decode('utf-8', 'replace').replace('\r', '')
|
||||||
|
|
||||||
|
|
||||||
|
def requires_subinterpreters(meth):
|
||||||
|
"""Decorator to skip a test if subinterpreters are not supported."""
|
||||||
|
return unittest.skipIf(_interpreters is None,
|
||||||
|
'subinterpreters required')(meth)
|
||||||
|
|
||||||
|
|
||||||
def testfunction(self):
|
def testfunction(self):
|
||||||
"""some doc"""
|
"""some doc"""
|
||||||
return self
|
return self
|
||||||
|
@ -1259,6 +1272,10 @@ class TestHeapTypeRelative(unittest.TestCase):
|
||||||
|
|
||||||
class TestPendingCalls(unittest.TestCase):
|
class TestPendingCalls(unittest.TestCase):
|
||||||
|
|
||||||
|
# See the comment in ceval.c (at the "handle_eval_breaker" label)
|
||||||
|
# about when pending calls get run. This is especially relevant
|
||||||
|
# here for creating deterministic tests.
|
||||||
|
|
||||||
def pendingcalls_submit(self, l, n):
|
def pendingcalls_submit(self, l, n):
|
||||||
def callback():
|
def callback():
|
||||||
#this function can be interrupted by thread switching so let's
|
#this function can be interrupted by thread switching so let's
|
||||||
|
@ -1341,6 +1358,388 @@ class TestPendingCalls(unittest.TestCase):
|
||||||
gen = genf()
|
gen = genf()
|
||||||
self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
|
self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
|
||||||
|
|
||||||
|
class PendingTask(types.SimpleNamespace):
|
||||||
|
|
||||||
|
_add_pending = _testinternalcapi.pending_threadfunc
|
||||||
|
|
||||||
|
def __init__(self, req, taskid=None, notify_done=None):
|
||||||
|
self.id = taskid
|
||||||
|
self.req = req
|
||||||
|
self.notify_done = notify_done
|
||||||
|
|
||||||
|
self.creator_tid = threading.get_ident()
|
||||||
|
self.requester_tid = None
|
||||||
|
self.runner_tid = None
|
||||||
|
self.result = None
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
assert self.result is None
|
||||||
|
self.runner_tid = threading.get_ident()
|
||||||
|
self._run()
|
||||||
|
if self.notify_done is not None:
|
||||||
|
self.notify_done()
|
||||||
|
|
||||||
|
def _run(self):
|
||||||
|
self.result = self.req
|
||||||
|
|
||||||
|
def run_in_pending_call(self, worker_tids):
|
||||||
|
assert self._add_pending is _testinternalcapi.pending_threadfunc
|
||||||
|
self.requester_tid = threading.get_ident()
|
||||||
|
def callback():
|
||||||
|
assert self.result is None
|
||||||
|
# It can be tricky to control which thread handles
|
||||||
|
# the eval breaker, so we take a naive approach to
|
||||||
|
# make sure.
|
||||||
|
if threading.get_ident() not in worker_tids:
|
||||||
|
self._add_pending(callback, ensure_added=True)
|
||||||
|
return
|
||||||
|
self.run()
|
||||||
|
self._add_pending(callback, ensure_added=True)
|
||||||
|
|
||||||
|
def create_thread(self, worker_tids):
|
||||||
|
return threading.Thread(
|
||||||
|
target=self.run_in_pending_call,
|
||||||
|
args=(worker_tids,),
|
||||||
|
)
|
||||||
|
|
||||||
|
def wait_for_result(self):
|
||||||
|
while self.result is None:
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
def test_subthreads_can_handle_pending_calls(self):
|
||||||
|
payload = 'Spam spam spam spam. Lovely spam! Wonderful spam!'
|
||||||
|
|
||||||
|
task = self.PendingTask(payload)
|
||||||
|
def do_the_work():
|
||||||
|
tid = threading.get_ident()
|
||||||
|
t = task.create_thread({tid})
|
||||||
|
with threading_helper.start_threads([t]):
|
||||||
|
task.wait_for_result()
|
||||||
|
t = threading.Thread(target=do_the_work)
|
||||||
|
with threading_helper.start_threads([t]):
|
||||||
|
pass
|
||||||
|
|
||||||
|
self.assertEqual(task.result, payload)
|
||||||
|
|
||||||
|
def test_many_subthreads_can_handle_pending_calls(self):
|
||||||
|
main_tid = threading.get_ident()
|
||||||
|
self.assertEqual(threading.main_thread().ident, main_tid)
|
||||||
|
|
||||||
|
# We can't use queue.Queue since it isn't reentrant relative
|
||||||
|
# to pending calls.
|
||||||
|
_queue = deque()
|
||||||
|
_active = deque()
|
||||||
|
_done_lock = threading.Lock()
|
||||||
|
def queue_put(task):
|
||||||
|
_queue.append(task)
|
||||||
|
_active.append(True)
|
||||||
|
def queue_get():
|
||||||
|
try:
|
||||||
|
task = _queue.popleft()
|
||||||
|
except IndexError:
|
||||||
|
raise queue.Empty
|
||||||
|
return task
|
||||||
|
def queue_task_done():
|
||||||
|
_active.pop()
|
||||||
|
if not _active:
|
||||||
|
try:
|
||||||
|
_done_lock.release()
|
||||||
|
except RuntimeError:
|
||||||
|
assert not _done_lock.locked()
|
||||||
|
def queue_empty():
|
||||||
|
return not _queue
|
||||||
|
def queue_join():
|
||||||
|
_done_lock.acquire()
|
||||||
|
_done_lock.release()
|
||||||
|
|
||||||
|
tasks = []
|
||||||
|
for i in range(20):
|
||||||
|
task = self.PendingTask(
|
||||||
|
req=f'request {i}',
|
||||||
|
taskid=i,
|
||||||
|
notify_done=queue_task_done,
|
||||||
|
)
|
||||||
|
tasks.append(task)
|
||||||
|
queue_put(task)
|
||||||
|
# This will be released once all the tasks have finished.
|
||||||
|
_done_lock.acquire()
|
||||||
|
|
||||||
|
def add_tasks(worker_tids):
|
||||||
|
while True:
|
||||||
|
if done:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
task = queue_get()
|
||||||
|
except queue.Empty:
|
||||||
|
break
|
||||||
|
task.run_in_pending_call(worker_tids)
|
||||||
|
|
||||||
|
done = False
|
||||||
|
def run_tasks():
|
||||||
|
while not queue_empty():
|
||||||
|
if done:
|
||||||
|
return
|
||||||
|
time.sleep(0.01)
|
||||||
|
# Give the worker a chance to handle any remaining pending calls.
|
||||||
|
while not done:
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
# Start the workers and wait for them to finish.
|
||||||
|
worker_threads = [threading.Thread(target=run_tasks)
|
||||||
|
for _ in range(3)]
|
||||||
|
with threading_helper.start_threads(worker_threads):
|
||||||
|
try:
|
||||||
|
# Add a pending call for each task.
|
||||||
|
worker_tids = [t.ident for t in worker_threads]
|
||||||
|
threads = [threading.Thread(target=add_tasks, args=(worker_tids,))
|
||||||
|
for _ in range(3)]
|
||||||
|
with threading_helper.start_threads(threads):
|
||||||
|
try:
|
||||||
|
pass
|
||||||
|
except BaseException:
|
||||||
|
done = True
|
||||||
|
raise # re-raise
|
||||||
|
# Wait for the pending calls to finish.
|
||||||
|
queue_join()
|
||||||
|
# Notify the workers that they can stop.
|
||||||
|
done = True
|
||||||
|
except BaseException:
|
||||||
|
done = True
|
||||||
|
raise # re-raise
|
||||||
|
runner_tids = [t.runner_tid for t in tasks]
|
||||||
|
|
||||||
|
self.assertNotIn(main_tid, runner_tids)
|
||||||
|
for task in tasks:
|
||||||
|
with self.subTest(f'task {task.id}'):
|
||||||
|
self.assertNotEqual(task.requester_tid, main_tid)
|
||||||
|
self.assertNotEqual(task.requester_tid, task.runner_tid)
|
||||||
|
self.assertNotIn(task.requester_tid, runner_tids)
|
||||||
|
|
||||||
|
@requires_subinterpreters
|
||||||
|
def test_isolated_subinterpreter(self):
|
||||||
|
# We exercise the most important permutations.
|
||||||
|
|
||||||
|
# This test relies on pending calls getting called
|
||||||
|
# (eval breaker tripped) at each loop iteration
|
||||||
|
# and at each call.
|
||||||
|
|
||||||
|
maxtext = 250
|
||||||
|
main_interpid = 0
|
||||||
|
interpid = _interpreters.create()
|
||||||
|
_interpreters.run_string(interpid, f"""if True:
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import _testinternalcapi
|
||||||
|
from test.support import threading_helper
|
||||||
|
""")
|
||||||
|
|
||||||
|
def create_pipe():
|
||||||
|
r, w = os.pipe()
|
||||||
|
self.addCleanup(lambda: os.close(r))
|
||||||
|
self.addCleanup(lambda: os.close(w))
|
||||||
|
return r, w
|
||||||
|
|
||||||
|
with self.subTest('add in main, run in subinterpreter'):
|
||||||
|
r_ready, w_ready = create_pipe()
|
||||||
|
r_done, w_done= create_pipe()
|
||||||
|
timeout = time.time() + 30 # seconds
|
||||||
|
|
||||||
|
def do_work():
|
||||||
|
_interpreters.run_string(interpid, f"""if True:
|
||||||
|
# Wait until this interp has handled the pending call.
|
||||||
|
waiting = False
|
||||||
|
done = False
|
||||||
|
def wait(os_read=os.read):
|
||||||
|
global done, waiting
|
||||||
|
waiting = True
|
||||||
|
os_read({r_done}, 1)
|
||||||
|
done = True
|
||||||
|
t = threading.Thread(target=wait)
|
||||||
|
with threading_helper.start_threads([t]):
|
||||||
|
while not waiting:
|
||||||
|
pass
|
||||||
|
os.write({w_ready}, b'\\0')
|
||||||
|
# Loop to trigger the eval breaker.
|
||||||
|
while not done:
|
||||||
|
time.sleep(0.01)
|
||||||
|
if time.time() > {timeout}:
|
||||||
|
raise Exception('timed out!')
|
||||||
|
""")
|
||||||
|
t = threading.Thread(target=do_work)
|
||||||
|
with threading_helper.start_threads([t]):
|
||||||
|
os.read(r_ready, 1)
|
||||||
|
# Add the pending call and wait for it to finish.
|
||||||
|
actual = _testinternalcapi.pending_identify(interpid)
|
||||||
|
# Signal the subinterpreter to stop.
|
||||||
|
os.write(w_done, b'\0')
|
||||||
|
|
||||||
|
self.assertEqual(actual, int(interpid))
|
||||||
|
|
||||||
|
with self.subTest('add in main, run in subinterpreter sub-thread'):
|
||||||
|
r_ready, w_ready = create_pipe()
|
||||||
|
r_done, w_done= create_pipe()
|
||||||
|
timeout = time.time() + 30 # seconds
|
||||||
|
|
||||||
|
def do_work():
|
||||||
|
_interpreters.run_string(interpid, f"""if True:
|
||||||
|
waiting = False
|
||||||
|
done = False
|
||||||
|
def subthread():
|
||||||
|
while not waiting:
|
||||||
|
pass
|
||||||
|
os.write({w_ready}, b'\\0')
|
||||||
|
# Loop to trigger the eval breaker.
|
||||||
|
while not done:
|
||||||
|
time.sleep(0.01)
|
||||||
|
if time.time() > {timeout}:
|
||||||
|
raise Exception('timed out!')
|
||||||
|
t = threading.Thread(target=subthread)
|
||||||
|
with threading_helper.start_threads([t]):
|
||||||
|
# Wait until this interp has handled the pending call.
|
||||||
|
waiting = True
|
||||||
|
os.read({r_done}, 1)
|
||||||
|
done = True
|
||||||
|
""")
|
||||||
|
t = threading.Thread(target=do_work)
|
||||||
|
with threading_helper.start_threads([t]):
|
||||||
|
os.read(r_ready, 1)
|
||||||
|
# Add the pending call and wait for it to finish.
|
||||||
|
actual = _testinternalcapi.pending_identify(interpid)
|
||||||
|
# Signal the subinterpreter to stop.
|
||||||
|
os.write(w_done, b'\0')
|
||||||
|
|
||||||
|
self.assertEqual(actual, int(interpid))
|
||||||
|
|
||||||
|
with self.subTest('add in subinterpreter, run in main'):
|
||||||
|
r_ready, w_ready = create_pipe()
|
||||||
|
r_done, w_done= create_pipe()
|
||||||
|
r_data, w_data= create_pipe()
|
||||||
|
timeout = time.time() + 30 # seconds
|
||||||
|
|
||||||
|
def add_job():
|
||||||
|
os.read(r_ready, 1)
|
||||||
|
_interpreters.run_string(interpid, f"""if True:
|
||||||
|
# Add the pending call and wait for it to finish.
|
||||||
|
actual = _testinternalcapi.pending_identify({main_interpid})
|
||||||
|
# Signal the subinterpreter to stop.
|
||||||
|
os.write({w_done}, b'\\0')
|
||||||
|
os.write({w_data}, actual.to_bytes(1, 'little'))
|
||||||
|
""")
|
||||||
|
# Wait until this interp has handled the pending call.
|
||||||
|
waiting = False
|
||||||
|
done = False
|
||||||
|
def wait(os_read=os.read):
|
||||||
|
nonlocal done, waiting
|
||||||
|
waiting = True
|
||||||
|
os_read(r_done, 1)
|
||||||
|
done = True
|
||||||
|
t1 = threading.Thread(target=add_job)
|
||||||
|
t2 = threading.Thread(target=wait)
|
||||||
|
with threading_helper.start_threads([t1, t2]):
|
||||||
|
while not waiting:
|
||||||
|
pass
|
||||||
|
os.write(w_ready, b'\0')
|
||||||
|
# Loop to trigger the eval breaker.
|
||||||
|
while not done:
|
||||||
|
time.sleep(0.01)
|
||||||
|
if time.time() > timeout:
|
||||||
|
raise Exception('timed out!')
|
||||||
|
text = os.read(r_data, 1)
|
||||||
|
actual = int.from_bytes(text, 'little')
|
||||||
|
|
||||||
|
self.assertEqual(actual, int(main_interpid))
|
||||||
|
|
||||||
|
with self.subTest('add in subinterpreter, run in sub-thread'):
|
||||||
|
r_ready, w_ready = create_pipe()
|
||||||
|
r_done, w_done= create_pipe()
|
||||||
|
r_data, w_data= create_pipe()
|
||||||
|
timeout = time.time() + 30 # seconds
|
||||||
|
|
||||||
|
def add_job():
|
||||||
|
os.read(r_ready, 1)
|
||||||
|
_interpreters.run_string(interpid, f"""if True:
|
||||||
|
# Add the pending call and wait for it to finish.
|
||||||
|
actual = _testinternalcapi.pending_identify({main_interpid})
|
||||||
|
# Signal the subinterpreter to stop.
|
||||||
|
os.write({w_done}, b'\\0')
|
||||||
|
os.write({w_data}, actual.to_bytes(1, 'little'))
|
||||||
|
""")
|
||||||
|
# Wait until this interp has handled the pending call.
|
||||||
|
waiting = False
|
||||||
|
done = False
|
||||||
|
def wait(os_read=os.read):
|
||||||
|
nonlocal done, waiting
|
||||||
|
waiting = True
|
||||||
|
os_read(r_done, 1)
|
||||||
|
done = True
|
||||||
|
def subthread():
|
||||||
|
while not waiting:
|
||||||
|
pass
|
||||||
|
os.write(w_ready, b'\0')
|
||||||
|
# Loop to trigger the eval breaker.
|
||||||
|
while not done:
|
||||||
|
time.sleep(0.01)
|
||||||
|
if time.time() > timeout:
|
||||||
|
raise Exception('timed out!')
|
||||||
|
t1 = threading.Thread(target=add_job)
|
||||||
|
t2 = threading.Thread(target=wait)
|
||||||
|
t3 = threading.Thread(target=subthread)
|
||||||
|
with threading_helper.start_threads([t1, t2, t3]):
|
||||||
|
pass
|
||||||
|
text = os.read(r_data, 1)
|
||||||
|
actual = int.from_bytes(text, 'little')
|
||||||
|
|
||||||
|
self.assertEqual(actual, int(main_interpid))
|
||||||
|
|
||||||
|
# XXX We can't use the rest until gh-105716 is fixed.
|
||||||
|
return
|
||||||
|
|
||||||
|
with self.subTest('add in subinterpreter, run in subinterpreter sub-thread'):
|
||||||
|
r_ready, w_ready = create_pipe()
|
||||||
|
r_done, w_done= create_pipe()
|
||||||
|
r_data, w_data= create_pipe()
|
||||||
|
timeout = time.time() + 30 # seconds
|
||||||
|
|
||||||
|
def do_work():
|
||||||
|
_interpreters.run_string(interpid, f"""if True:
|
||||||
|
waiting = False
|
||||||
|
done = False
|
||||||
|
def subthread():
|
||||||
|
while not waiting:
|
||||||
|
pass
|
||||||
|
os.write({w_ready}, b'\\0')
|
||||||
|
# Loop to trigger the eval breaker.
|
||||||
|
while not done:
|
||||||
|
time.sleep(0.01)
|
||||||
|
if time.time() > {timeout}:
|
||||||
|
raise Exception('timed out!')
|
||||||
|
t = threading.Thread(target=subthread)
|
||||||
|
with threading_helper.start_threads([t]):
|
||||||
|
# Wait until this interp has handled the pending call.
|
||||||
|
waiting = True
|
||||||
|
os.read({r_done}, 1)
|
||||||
|
done = True
|
||||||
|
""")
|
||||||
|
t = threading.Thread(target=do_work)
|
||||||
|
#with threading_helper.start_threads([t]):
|
||||||
|
t.start()
|
||||||
|
if True:
|
||||||
|
os.read(r_ready, 1)
|
||||||
|
_interpreters.run_string(interpid, f"""if True:
|
||||||
|
# Add the pending call and wait for it to finish.
|
||||||
|
actual = _testinternalcapi.pending_identify({interpid})
|
||||||
|
# Signal the subinterpreter to stop.
|
||||||
|
os.write({w_done}, b'\\0')
|
||||||
|
os.write({w_data}, actual.to_bytes(1, 'little'))
|
||||||
|
""")
|
||||||
|
t.join()
|
||||||
|
text = os.read(r_data, 1)
|
||||||
|
actual = int.from_bytes(text, 'little')
|
||||||
|
|
||||||
|
self.assertEqual(actual, int(interpid))
|
||||||
|
|
||||||
|
|
||||||
class SubinterpreterTest(unittest.TestCase):
|
class SubinterpreterTest(unittest.TestCase):
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,9 @@
|
||||||
|
The "pending call" machinery now works for all interpreters, not just the
|
||||||
|
main interpreter, and runs in all threads, not just the main thread. Some
|
||||||
|
calls are still only done in the main thread, ergo in the main interpreter.
|
||||||
|
This change does not affect signal handling nor the existing public C-API
|
||||||
|
(``Py_AddPendingCall()``), which both still only target the main thread.
|
||||||
|
The new functionality is meant strictly for internal use for now, since
|
||||||
|
consequences of its use are not well understood yet outside some very
|
||||||
|
restricted cases. This change brings the capability in line with the
|
||||||
|
intention when the state was made per-interpreter several years ago.
|
|
@ -210,6 +210,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
|
||||||
PyObject *item;
|
PyObject *item;
|
||||||
PyLockStatus r;
|
PyLockStatus r;
|
||||||
PY_TIMEOUT_T microseconds;
|
PY_TIMEOUT_T microseconds;
|
||||||
|
PyThreadState *tstate = PyThreadState_Get();
|
||||||
|
|
||||||
if (block == 0) {
|
if (block == 0) {
|
||||||
/* Non-blocking */
|
/* Non-blocking */
|
||||||
|
@ -253,7 +254,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
|
||||||
Py_END_ALLOW_THREADS
|
Py_END_ALLOW_THREADS
|
||||||
}
|
}
|
||||||
|
|
||||||
if (r == PY_LOCK_INTR && Py_MakePendingCalls() < 0) {
|
if (r == PY_LOCK_INTR && _PyEval_MakePendingCalls(tstate) < 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (r == PY_LOCK_FAILURE) {
|
if (r == PY_LOCK_FAILURE) {
|
||||||
|
|
|
@ -13,16 +13,18 @@
|
||||||
|
|
||||||
#include "Python.h"
|
#include "Python.h"
|
||||||
#include "frameobject.h"
|
#include "frameobject.h"
|
||||||
|
#include "interpreteridobject.h" // _PyInterpreterID_LookUp()
|
||||||
#include "pycore_atomic_funcs.h" // _Py_atomic_int_get()
|
#include "pycore_atomic_funcs.h" // _Py_atomic_int_get()
|
||||||
#include "pycore_bitutils.h" // _Py_bswap32()
|
#include "pycore_bitutils.h" // _Py_bswap32()
|
||||||
#include "pycore_compile.h" // _PyCompile_CodeGen, _PyCompile_OptimizeCfg, _PyCompile_Assemble
|
#include "pycore_compile.h" // _PyCompile_CodeGen, _PyCompile_OptimizeCfg, _PyCompile_Assemble
|
||||||
|
#include "pycore_ceval.h" // _PyEval_AddPendingCall
|
||||||
#include "pycore_fileutils.h" // _Py_normpath
|
#include "pycore_fileutils.h" // _Py_normpath
|
||||||
#include "pycore_frame.h" // _PyInterpreterFrame
|
#include "pycore_frame.h" // _PyInterpreterFrame
|
||||||
#include "pycore_gc.h" // PyGC_Head
|
#include "pycore_gc.h" // PyGC_Head
|
||||||
#include "pycore_hashtable.h" // _Py_hashtable_new()
|
#include "pycore_hashtable.h" // _Py_hashtable_new()
|
||||||
#include "pycore_initconfig.h" // _Py_GetConfigsAsDict()
|
#include "pycore_initconfig.h" // _Py_GetConfigsAsDict()
|
||||||
#include "pycore_pathconfig.h" // _PyPathConfig_ClearGlobal()
|
|
||||||
#include "pycore_interp.h" // _PyInterpreterState_GetConfigCopy()
|
#include "pycore_interp.h" // _PyInterpreterState_GetConfigCopy()
|
||||||
|
#include "pycore_pathconfig.h" // _PyPathConfig_ClearGlobal()
|
||||||
#include "pycore_pyerrors.h" // _Py_UTF8_Edit_Cost()
|
#include "pycore_pyerrors.h" // _Py_UTF8_Edit_Cost()
|
||||||
#include "pycore_pystate.h" // _PyThreadState_GET()
|
#include "pycore_pystate.h" // _PyThreadState_GET()
|
||||||
#include "osdefs.h" // MAXPATHLEN
|
#include "osdefs.h" // MAXPATHLEN
|
||||||
|
@ -838,6 +840,120 @@ set_optimizer(PyObject *self, PyObject *opt)
|
||||||
Py_RETURN_NONE;
|
Py_RETURN_NONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int _pending_callback(void *arg)
|
||||||
|
{
|
||||||
|
/* we assume the argument is callable object to which we own a reference */
|
||||||
|
PyObject *callable = (PyObject *)arg;
|
||||||
|
PyObject *r = PyObject_CallNoArgs(callable);
|
||||||
|
Py_DECREF(callable);
|
||||||
|
Py_XDECREF(r);
|
||||||
|
return r != NULL ? 0 : -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* The following requests n callbacks to _pending_callback. It can be
|
||||||
|
* run from any python thread.
|
||||||
|
*/
|
||||||
|
static PyObject *
|
||||||
|
pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs)
|
||||||
|
{
|
||||||
|
PyObject *callable;
|
||||||
|
int ensure_added = 0;
|
||||||
|
static char *kwlist[] = {"", "ensure_added", NULL};
|
||||||
|
if (!PyArg_ParseTupleAndKeywords(args, kwargs,
|
||||||
|
"O|$p:pending_threadfunc", kwlist,
|
||||||
|
&callable, &ensure_added))
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
PyInterpreterState *interp = PyInterpreterState_Get();
|
||||||
|
|
||||||
|
/* create the reference for the callbackwhile we hold the lock */
|
||||||
|
Py_INCREF(callable);
|
||||||
|
|
||||||
|
int r;
|
||||||
|
Py_BEGIN_ALLOW_THREADS
|
||||||
|
r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
|
||||||
|
Py_END_ALLOW_THREADS
|
||||||
|
if (r < 0) {
|
||||||
|
/* unsuccessful add */
|
||||||
|
if (!ensure_added) {
|
||||||
|
Py_DECREF(callable);
|
||||||
|
Py_RETURN_FALSE;
|
||||||
|
}
|
||||||
|
do {
|
||||||
|
Py_BEGIN_ALLOW_THREADS
|
||||||
|
r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
|
||||||
|
Py_END_ALLOW_THREADS
|
||||||
|
} while (r < 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
Py_RETURN_TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static struct {
|
||||||
|
int64_t interpid;
|
||||||
|
} pending_identify_result;
|
||||||
|
|
||||||
|
static int
|
||||||
|
_pending_identify_callback(void *arg)
|
||||||
|
{
|
||||||
|
PyThread_type_lock mutex = (PyThread_type_lock)arg;
|
||||||
|
assert(pending_identify_result.interpid == -1);
|
||||||
|
PyThreadState *tstate = PyThreadState_Get();
|
||||||
|
pending_identify_result.interpid = PyInterpreterState_GetID(tstate->interp);
|
||||||
|
PyThread_release_lock(mutex);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static PyObject *
|
||||||
|
pending_identify(PyObject *self, PyObject *args)
|
||||||
|
{
|
||||||
|
PyObject *interpid;
|
||||||
|
if (!PyArg_ParseTuple(args, "O:pending_identify", &interpid)) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
PyInterpreterState *interp = _PyInterpreterID_LookUp(interpid);
|
||||||
|
if (interp == NULL) {
|
||||||
|
if (!PyErr_Occurred()) {
|
||||||
|
PyErr_SetString(PyExc_ValueError, "interpreter not found");
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pending_identify_result.interpid = -1;
|
||||||
|
|
||||||
|
PyThread_type_lock mutex = PyThread_allocate_lock();
|
||||||
|
if (mutex == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
PyThread_acquire_lock(mutex, WAIT_LOCK);
|
||||||
|
/* It gets released in _pending_identify_callback(). */
|
||||||
|
|
||||||
|
int r;
|
||||||
|
do {
|
||||||
|
Py_BEGIN_ALLOW_THREADS
|
||||||
|
r = _PyEval_AddPendingCall(interp,
|
||||||
|
&_pending_identify_callback, (void *)mutex,
|
||||||
|
0);
|
||||||
|
Py_END_ALLOW_THREADS
|
||||||
|
} while (r < 0);
|
||||||
|
|
||||||
|
/* Wait for the pending call to complete. */
|
||||||
|
PyThread_acquire_lock(mutex, WAIT_LOCK);
|
||||||
|
PyThread_release_lock(mutex);
|
||||||
|
PyThread_free_lock(mutex);
|
||||||
|
|
||||||
|
PyObject *res = PyLong_FromLongLong(pending_identify_result.interpid);
|
||||||
|
pending_identify_result.interpid = -1;
|
||||||
|
if (res == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static PyMethodDef module_functions[] = {
|
static PyMethodDef module_functions[] = {
|
||||||
{"get_configs", get_configs, METH_NOARGS},
|
{"get_configs", get_configs, METH_NOARGS},
|
||||||
{"get_recursion_depth", get_recursion_depth, METH_NOARGS},
|
{"get_recursion_depth", get_recursion_depth, METH_NOARGS},
|
||||||
|
@ -868,6 +984,10 @@ static PyMethodDef module_functions[] = {
|
||||||
{"iframe_getlasti", iframe_getlasti, METH_O, NULL},
|
{"iframe_getlasti", iframe_getlasti, METH_O, NULL},
|
||||||
{"set_optimizer", set_optimizer, METH_O, NULL},
|
{"set_optimizer", set_optimizer, METH_O, NULL},
|
||||||
{"get_counter_optimizer", get_counter_optimizer, METH_NOARGS, NULL},
|
{"get_counter_optimizer", get_counter_optimizer, METH_NOARGS, NULL},
|
||||||
|
{"pending_threadfunc", _PyCFunction_CAST(pending_threadfunc),
|
||||||
|
METH_VARARGS | METH_KEYWORDS},
|
||||||
|
// {"pending_fd_identify", pending_fd_identify, METH_VARARGS, NULL},
|
||||||
|
{"pending_identify", pending_identify, METH_VARARGS, NULL},
|
||||||
{NULL, NULL} /* sentinel */
|
{NULL, NULL} /* sentinel */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -81,6 +81,7 @@ lock_dealloc(lockobject *self)
|
||||||
static PyLockStatus
|
static PyLockStatus
|
||||||
acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
|
acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
|
||||||
{
|
{
|
||||||
|
PyThreadState *tstate = _PyThreadState_GET();
|
||||||
_PyTime_t endtime = 0;
|
_PyTime_t endtime = 0;
|
||||||
if (timeout > 0) {
|
if (timeout > 0) {
|
||||||
endtime = _PyDeadline_Init(timeout);
|
endtime = _PyDeadline_Init(timeout);
|
||||||
|
@ -103,7 +104,7 @@ acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
|
||||||
/* Run signal handlers if we were interrupted. Propagate
|
/* Run signal handlers if we were interrupted. Propagate
|
||||||
* exceptions from signal handlers, such as KeyboardInterrupt, by
|
* exceptions from signal handlers, such as KeyboardInterrupt, by
|
||||||
* passing up PY_LOCK_INTR. */
|
* passing up PY_LOCK_INTR. */
|
||||||
if (Py_MakePendingCalls() < 0) {
|
if (_PyEval_MakePendingCalls(tstate) < 0) {
|
||||||
return PY_LOCK_INTR;
|
return PY_LOCK_INTR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -314,7 +314,8 @@ trip_signal(int sig_num)
|
||||||
still use it for this exceptional case. */
|
still use it for this exceptional case. */
|
||||||
_PyEval_AddPendingCall(interp,
|
_PyEval_AddPendingCall(interp,
|
||||||
report_wakeup_send_error,
|
report_wakeup_send_error,
|
||||||
(void *)(intptr_t) last_error);
|
(void *)(intptr_t) last_error,
|
||||||
|
1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -333,7 +334,8 @@ trip_signal(int sig_num)
|
||||||
still use it for this exceptional case. */
|
still use it for this exceptional case. */
|
||||||
_PyEval_AddPendingCall(interp,
|
_PyEval_AddPendingCall(interp,
|
||||||
report_wakeup_write_error,
|
report_wakeup_write_error,
|
||||||
(void *)(intptr_t)errno);
|
(void *)(intptr_t)errno,
|
||||||
|
1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -758,6 +758,61 @@ handle_eval_breaker:
|
||||||
* We need to do reasonably frequently, but not too frequently.
|
* We need to do reasonably frequently, but not too frequently.
|
||||||
* All loops should include a check of the eval breaker.
|
* All loops should include a check of the eval breaker.
|
||||||
* We also check on return from any builtin function.
|
* We also check on return from any builtin function.
|
||||||
|
*
|
||||||
|
* ## More Details ###
|
||||||
|
*
|
||||||
|
* The eval loop (this function) normally executes the instructions
|
||||||
|
* of a code object sequentially. However, the runtime supports a
|
||||||
|
* number of out-of-band execution scenarios that may pause that
|
||||||
|
* sequential execution long enough to do that out-of-band work
|
||||||
|
* in the current thread using the current PyThreadState.
|
||||||
|
*
|
||||||
|
* The scenarios include:
|
||||||
|
*
|
||||||
|
* - cyclic garbage collection
|
||||||
|
* - GIL drop requests
|
||||||
|
* - "async" exceptions
|
||||||
|
* - "pending calls" (some only in the main thread)
|
||||||
|
* - signal handling (only in the main thread)
|
||||||
|
*
|
||||||
|
* When the need for one of the above is detected, the eval loop
|
||||||
|
* pauses long enough to handle the detected case. Then, if doing
|
||||||
|
* so didn't trigger an exception, the eval loop resumes executing
|
||||||
|
* the sequential instructions.
|
||||||
|
*
|
||||||
|
* To make this work, the eval loop periodically checks if any
|
||||||
|
* of the above needs to happen. The individual checks can be
|
||||||
|
* expensive if computed each time, so a while back we switched
|
||||||
|
* to using pre-computed, per-interpreter variables for the checks,
|
||||||
|
* and later consolidated that to a single "eval breaker" variable
|
||||||
|
* (now a PyInterpreterState field).
|
||||||
|
*
|
||||||
|
* For the longest time, the eval breaker check would happen
|
||||||
|
* frequently, every 5 or so times through the loop, regardless
|
||||||
|
* of what instruction ran last or what would run next. Then, in
|
||||||
|
* early 2021 (gh-18334, commit 4958f5d), we switched to checking
|
||||||
|
* the eval breaker less frequently, by hard-coding the check to
|
||||||
|
* specific places in the eval loop (e.g. certain instructions).
|
||||||
|
* The intent then was to check after returning from calls
|
||||||
|
* and on the back edges of loops.
|
||||||
|
*
|
||||||
|
* In addition to being more efficient, that approach keeps
|
||||||
|
* the eval loop from running arbitrary code between instructions
|
||||||
|
* that don't handle that well. (See gh-74174.)
|
||||||
|
*
|
||||||
|
* Currently, the eval breaker check happens here at the
|
||||||
|
* "handle_eval_breaker" label. Some instructions come here
|
||||||
|
* explicitly (goto) and some indirectly. Notably, the check
|
||||||
|
* happens on back edges in the control flow graph, which
|
||||||
|
* pretty much applies to all loops and most calls.
|
||||||
|
* (See bytecodes.c for exact information.)
|
||||||
|
*
|
||||||
|
* One consequence of this approach is that it might not be obvious
|
||||||
|
* how to force any specific thread to pick up the eval breaker,
|
||||||
|
* or for any specific thread to not pick it up. Mostly this
|
||||||
|
* involves judicious uses of locks and careful ordering of code,
|
||||||
|
* while avoiding code that might trigger the eval breaker
|
||||||
|
* until so desired.
|
||||||
*/
|
*/
|
||||||
if (_Py_HandlePending(tstate) != 0) {
|
if (_Py_HandlePending(tstate) != 0) {
|
||||||
goto error;
|
goto error;
|
||||||
|
|
|
@ -68,8 +68,9 @@ COMPUTE_EVAL_BREAKER(PyInterpreterState *interp,
|
||||||
_Py_atomic_load_relaxed_int32(&ceval2->gil_drop_request)
|
_Py_atomic_load_relaxed_int32(&ceval2->gil_drop_request)
|
||||||
| (_Py_atomic_load_relaxed_int32(&ceval->signals_pending)
|
| (_Py_atomic_load_relaxed_int32(&ceval->signals_pending)
|
||||||
&& _Py_ThreadCanHandleSignals(interp))
|
&& _Py_ThreadCanHandleSignals(interp))
|
||||||
| (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do)
|
| (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do))
|
||||||
&& _Py_ThreadCanHandlePendingCalls())
|
| (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)
|
||||||
|
&&_Py_atomic_load_relaxed_int32(&ceval->pending_mainthread.calls_to_do))
|
||||||
| ceval2->pending.async_exc
|
| ceval2->pending.async_exc
|
||||||
| _Py_atomic_load_relaxed_int32(&ceval2->gc_scheduled));
|
| _Py_atomic_load_relaxed_int32(&ceval2->gc_scheduled));
|
||||||
}
|
}
|
||||||
|
@ -95,11 +96,11 @@ RESET_GIL_DROP_REQUEST(PyInterpreterState *interp)
|
||||||
|
|
||||||
|
|
||||||
static inline void
|
static inline void
|
||||||
SIGNAL_PENDING_CALLS(PyInterpreterState *interp)
|
SIGNAL_PENDING_CALLS(struct _pending_calls *pending, PyInterpreterState *interp)
|
||||||
{
|
{
|
||||||
struct _ceval_runtime_state *ceval = &interp->runtime->ceval;
|
struct _ceval_runtime_state *ceval = &interp->runtime->ceval;
|
||||||
struct _ceval_state *ceval2 = &interp->ceval;
|
struct _ceval_state *ceval2 = &interp->ceval;
|
||||||
_Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 1);
|
_Py_atomic_store_relaxed(&pending->calls_to_do, 1);
|
||||||
COMPUTE_EVAL_BREAKER(interp, ceval, ceval2);
|
COMPUTE_EVAL_BREAKER(interp, ceval, ceval2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,6 +110,9 @@ UNSIGNAL_PENDING_CALLS(PyInterpreterState *interp)
|
||||||
{
|
{
|
||||||
struct _ceval_runtime_state *ceval = &interp->runtime->ceval;
|
struct _ceval_runtime_state *ceval = &interp->runtime->ceval;
|
||||||
struct _ceval_state *ceval2 = &interp->ceval;
|
struct _ceval_state *ceval2 = &interp->ceval;
|
||||||
|
if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) {
|
||||||
|
_Py_atomic_store_relaxed(&ceval->pending_mainthread.calls_to_do, 0);
|
||||||
|
}
|
||||||
_Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 0);
|
_Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 0);
|
||||||
COMPUTE_EVAL_BREAKER(interp, ceval, ceval2);
|
COMPUTE_EVAL_BREAKER(interp, ceval, ceval2);
|
||||||
}
|
}
|
||||||
|
@ -803,19 +807,31 @@ _push_pending_call(struct _pending_calls *pending,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
_next_pending_call(struct _pending_calls *pending,
|
||||||
|
int (**func)(void *), void **arg)
|
||||||
|
{
|
||||||
|
int i = pending->first;
|
||||||
|
if (i == pending->last) {
|
||||||
|
/* Queue empty */
|
||||||
|
assert(pending->calls[i].func == NULL);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
*func = pending->calls[i].func;
|
||||||
|
*arg = pending->calls[i].arg;
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
|
||||||
/* Pop one item off the queue while holding the lock. */
|
/* Pop one item off the queue while holding the lock. */
|
||||||
static void
|
static void
|
||||||
_pop_pending_call(struct _pending_calls *pending,
|
_pop_pending_call(struct _pending_calls *pending,
|
||||||
int (**func)(void *), void **arg)
|
int (**func)(void *), void **arg)
|
||||||
{
|
{
|
||||||
int i = pending->first;
|
int i = _next_pending_call(pending, func, arg);
|
||||||
if (i == pending->last) {
|
if (i >= 0) {
|
||||||
return; /* Queue empty */
|
pending->calls[i] = (struct _pending_call){0};
|
||||||
|
pending->first = (i + 1) % NPENDINGCALLS;
|
||||||
}
|
}
|
||||||
|
|
||||||
*func = pending->calls[i].func;
|
|
||||||
*arg = pending->calls[i].arg;
|
|
||||||
pending->first = (i + 1) % NPENDINGCALLS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This implementation is thread-safe. It allows
|
/* This implementation is thread-safe. It allows
|
||||||
|
@ -825,9 +841,16 @@ _pop_pending_call(struct _pending_calls *pending,
|
||||||
|
|
||||||
int
|
int
|
||||||
_PyEval_AddPendingCall(PyInterpreterState *interp,
|
_PyEval_AddPendingCall(PyInterpreterState *interp,
|
||||||
int (*func)(void *), void *arg)
|
int (*func)(void *), void *arg,
|
||||||
|
int mainthreadonly)
|
||||||
{
|
{
|
||||||
|
assert(!mainthreadonly || _Py_IsMainInterpreter(interp));
|
||||||
struct _pending_calls *pending = &interp->ceval.pending;
|
struct _pending_calls *pending = &interp->ceval.pending;
|
||||||
|
if (mainthreadonly) {
|
||||||
|
/* The main thread only exists in the main interpreter. */
|
||||||
|
assert(_Py_IsMainInterpreter(interp));
|
||||||
|
pending = &_PyRuntime.ceval.pending_mainthread;
|
||||||
|
}
|
||||||
/* Ensure that _PyEval_InitState() was called
|
/* Ensure that _PyEval_InitState() was called
|
||||||
and that _PyEval_FiniState() is not called yet. */
|
and that _PyEval_FiniState() is not called yet. */
|
||||||
assert(pending->lock != NULL);
|
assert(pending->lock != NULL);
|
||||||
|
@ -837,39 +860,17 @@ _PyEval_AddPendingCall(PyInterpreterState *interp,
|
||||||
PyThread_release_lock(pending->lock);
|
PyThread_release_lock(pending->lock);
|
||||||
|
|
||||||
/* signal main loop */
|
/* signal main loop */
|
||||||
SIGNAL_PENDING_CALLS(interp);
|
SIGNAL_PENDING_CALLS(pending, interp);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
Py_AddPendingCall(int (*func)(void *), void *arg)
|
Py_AddPendingCall(int (*func)(void *), void *arg)
|
||||||
{
|
{
|
||||||
/* Best-effort to support subinterpreters and calls with the GIL released.
|
/* Legacy users of this API will continue to target the main thread
|
||||||
|
(of the main interpreter). */
|
||||||
First attempt _PyThreadState_GET() since it supports subinterpreters.
|
PyInterpreterState *interp = _PyInterpreterState_Main();
|
||||||
|
return _PyEval_AddPendingCall(interp, func, arg, 1);
|
||||||
If the GIL is released, _PyThreadState_GET() returns NULL . In this
|
|
||||||
case, use PyGILState_GetThisThreadState() which works even if the GIL
|
|
||||||
is released.
|
|
||||||
|
|
||||||
Sadly, PyGILState_GetThisThreadState() doesn't support subinterpreters:
|
|
||||||
see bpo-10915 and bpo-15751.
|
|
||||||
|
|
||||||
Py_AddPendingCall() doesn't require the caller to hold the GIL. */
|
|
||||||
PyThreadState *tstate = _PyThreadState_GET();
|
|
||||||
if (tstate == NULL) {
|
|
||||||
tstate = PyGILState_GetThisThreadState();
|
|
||||||
}
|
|
||||||
|
|
||||||
PyInterpreterState *interp;
|
|
||||||
if (tstate != NULL) {
|
|
||||||
interp = tstate->interp;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
/* Last resort: use the main interpreter */
|
|
||||||
interp = _PyInterpreterState_Main();
|
|
||||||
}
|
|
||||||
return _PyEval_AddPendingCall(interp, func, arg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
|
@ -889,27 +890,24 @@ handle_signals(PyThreadState *tstate)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static inline int
|
||||||
make_pending_calls(PyInterpreterState *interp)
|
maybe_has_pending_calls(PyInterpreterState *interp)
|
||||||
{
|
{
|
||||||
/* only execute pending calls on main thread */
|
|
||||||
if (!_Py_ThreadCanHandlePendingCalls()) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* don't perform recursive pending calls */
|
|
||||||
if (interp->ceval.pending.busy) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
interp->ceval.pending.busy = 1;
|
|
||||||
|
|
||||||
/* unsignal before starting to call callbacks, so that any callback
|
|
||||||
added in-between re-signals */
|
|
||||||
UNSIGNAL_PENDING_CALLS(interp);
|
|
||||||
int res = 0;
|
|
||||||
|
|
||||||
/* perform a bounded number of calls, in case of recursion */
|
|
||||||
struct _pending_calls *pending = &interp->ceval.pending;
|
struct _pending_calls *pending = &interp->ceval.pending;
|
||||||
|
if (_Py_atomic_load_relaxed_int32(&pending->calls_to_do)) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (!_Py_IsMainThread() || !_Py_IsMainInterpreter(interp)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
pending = &_PyRuntime.ceval.pending_mainthread;
|
||||||
|
return _Py_atomic_load_relaxed_int32(&pending->calls_to_do);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
_make_pending_calls(struct _pending_calls *pending)
|
||||||
|
{
|
||||||
|
/* perform a bounded number of calls, in case of recursion */
|
||||||
for (int i=0; i<NPENDINGCALLS; i++) {
|
for (int i=0; i<NPENDINGCALLS; i++) {
|
||||||
int (*func)(void *) = NULL;
|
int (*func)(void *) = NULL;
|
||||||
void *arg = NULL;
|
void *arg = NULL;
|
||||||
|
@ -923,19 +921,61 @@ make_pending_calls(PyInterpreterState *interp)
|
||||||
if (func == NULL) {
|
if (func == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
res = func(arg);
|
if (func(arg) != 0) {
|
||||||
if (res) {
|
return -1;
|
||||||
goto error;
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
make_pending_calls(PyInterpreterState *interp)
|
||||||
|
{
|
||||||
|
struct _pending_calls *pending = &interp->ceval.pending;
|
||||||
|
struct _pending_calls *pending_main = &_PyRuntime.ceval.pending_mainthread;
|
||||||
|
|
||||||
|
/* Only one thread (per interpreter) may run the pending calls
|
||||||
|
at once. In the same way, we don't do recursive pending calls. */
|
||||||
|
PyThread_acquire_lock(pending->lock, WAIT_LOCK);
|
||||||
|
if (pending->busy) {
|
||||||
|
/* A pending call was added after another thread was already
|
||||||
|
handling the pending calls (and had already "unsignaled").
|
||||||
|
Once that thread is done, it may have taken care of all the
|
||||||
|
pending calls, or there might be some still waiting.
|
||||||
|
Regardless, this interpreter's pending calls will stay
|
||||||
|
"signaled" until that first thread has finished. At that
|
||||||
|
point the next thread to trip the eval breaker will take
|
||||||
|
care of any remaining pending calls. Until then, though,
|
||||||
|
all the interpreter's threads will be tripping the eval
|
||||||
|
breaker every time it's checked. */
|
||||||
|
PyThread_release_lock(pending->lock);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
pending->busy = 1;
|
||||||
|
PyThread_release_lock(pending->lock);
|
||||||
|
|
||||||
|
/* unsignal before starting to call callbacks, so that any callback
|
||||||
|
added in-between re-signals */
|
||||||
|
UNSIGNAL_PENDING_CALLS(interp);
|
||||||
|
|
||||||
|
if (_make_pending_calls(pending) != 0) {
|
||||||
|
pending->busy = 0;
|
||||||
|
/* There might not be more calls to make, but we play it safe. */
|
||||||
|
SIGNAL_PENDING_CALLS(pending, interp);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) {
|
||||||
|
if (_make_pending_calls(pending_main) != 0) {
|
||||||
|
pending->busy = 0;
|
||||||
|
/* There might not be more calls to make, but we play it safe. */
|
||||||
|
SIGNAL_PENDING_CALLS(pending_main, interp);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
interp->ceval.pending.busy = 0;
|
pending->busy = 0;
|
||||||
return res;
|
return 0;
|
||||||
|
|
||||||
error:
|
|
||||||
interp->ceval.pending.busy = 0;
|
|
||||||
SIGNAL_PENDING_CALLS(interp);
|
|
||||||
return res;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
@ -944,12 +984,6 @@ _Py_FinishPendingCalls(PyThreadState *tstate)
|
||||||
assert(PyGILState_Check());
|
assert(PyGILState_Check());
|
||||||
assert(is_tstate_valid(tstate));
|
assert(is_tstate_valid(tstate));
|
||||||
|
|
||||||
struct _pending_calls *pending = &tstate->interp->ceval.pending;
|
|
||||||
|
|
||||||
if (!_Py_atomic_load_relaxed_int32(&(pending->calls_to_do))) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (make_pending_calls(tstate->interp) < 0) {
|
if (make_pending_calls(tstate->interp) < 0) {
|
||||||
PyObject *exc = _PyErr_GetRaisedException(tstate);
|
PyObject *exc = _PyErr_GetRaisedException(tstate);
|
||||||
PyErr_BadInternalCall();
|
PyErr_BadInternalCall();
|
||||||
|
@ -958,6 +992,29 @@ _Py_FinishPendingCalls(PyThreadState *tstate)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
_PyEval_MakePendingCalls(PyThreadState *tstate)
|
||||||
|
{
|
||||||
|
int res;
|
||||||
|
|
||||||
|
if (_Py_IsMainThread() && _Py_IsMainInterpreter(tstate->interp)) {
|
||||||
|
/* Python signal handler doesn't really queue a callback:
|
||||||
|
it only signals that a signal was received,
|
||||||
|
see _PyEval_SignalReceived(). */
|
||||||
|
res = handle_signals(tstate);
|
||||||
|
if (res != 0) {
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res = make_pending_calls(tstate->interp);
|
||||||
|
if (res != 0) {
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/* Py_MakePendingCalls() is a simple wrapper for the sake
|
/* Py_MakePendingCalls() is a simple wrapper for the sake
|
||||||
of backward-compatibility. */
|
of backward-compatibility. */
|
||||||
int
|
int
|
||||||
|
@ -968,19 +1025,11 @@ Py_MakePendingCalls(void)
|
||||||
PyThreadState *tstate = _PyThreadState_GET();
|
PyThreadState *tstate = _PyThreadState_GET();
|
||||||
assert(is_tstate_valid(tstate));
|
assert(is_tstate_valid(tstate));
|
||||||
|
|
||||||
/* Python signal handler doesn't really queue a callback: it only signals
|
/* Only execute pending calls on the main thread. */
|
||||||
that a signal was received, see _PyEval_SignalReceived(). */
|
if (!_Py_IsMainThread() || !_Py_IsMainInterpreter(tstate->interp)) {
|
||||||
int res = handle_signals(tstate);
|
return 0;
|
||||||
if (res != 0) {
|
|
||||||
return res;
|
|
||||||
}
|
}
|
||||||
|
return _PyEval_MakePendingCalls(tstate);
|
||||||
res = make_pending_calls(tstate->interp);
|
|
||||||
if (res != 0) {
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
@ -1020,7 +1069,7 @@ _Py_HandlePending(PyThreadState *tstate)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Pending calls */
|
/* Pending calls */
|
||||||
if (_Py_atomic_load_relaxed_int32(&interp_ceval_state->pending.calls_to_do)) {
|
if (maybe_has_pending_calls(tstate->interp)) {
|
||||||
if (make_pending_calls(tstate->interp) != 0) {
|
if (make_pending_calls(tstate->interp) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2152,6 +2152,9 @@ Py_EndInterpreter(PyThreadState *tstate)
|
||||||
// Wrap up existing "threading"-module-created, non-daemon threads.
|
// Wrap up existing "threading"-module-created, non-daemon threads.
|
||||||
wait_for_thread_shutdown(tstate);
|
wait_for_thread_shutdown(tstate);
|
||||||
|
|
||||||
|
// Make any remaining pending calls.
|
||||||
|
_Py_FinishPendingCalls(tstate);
|
||||||
|
|
||||||
_PyAtExit_Call(tstate->interp);
|
_PyAtExit_Call(tstate->interp);
|
||||||
|
|
||||||
if (tstate != interp->threads.head || tstate->next != NULL) {
|
if (tstate != interp->threads.head || tstate->next != NULL) {
|
||||||
|
|
|
@ -380,7 +380,7 @@ _Py_COMP_DIAG_IGNORE_DEPR_DECLS
|
||||||
static const _PyRuntimeState initial = _PyRuntimeState_INIT(_PyRuntime);
|
static const _PyRuntimeState initial = _PyRuntimeState_INIT(_PyRuntime);
|
||||||
_Py_COMP_DIAG_POP
|
_Py_COMP_DIAG_POP
|
||||||
|
|
||||||
#define NUMLOCKS 8
|
#define NUMLOCKS 9
|
||||||
#define LOCKS_INIT(runtime) \
|
#define LOCKS_INIT(runtime) \
|
||||||
{ \
|
{ \
|
||||||
&(runtime)->interpreters.mutex, \
|
&(runtime)->interpreters.mutex, \
|
||||||
|
@ -388,6 +388,7 @@ _Py_COMP_DIAG_POP
|
||||||
&(runtime)->getargs.mutex, \
|
&(runtime)->getargs.mutex, \
|
||||||
&(runtime)->unicode_state.ids.lock, \
|
&(runtime)->unicode_state.ids.lock, \
|
||||||
&(runtime)->imports.extensions.mutex, \
|
&(runtime)->imports.extensions.mutex, \
|
||||||
|
&(runtime)->ceval.pending_mainthread.lock, \
|
||||||
&(runtime)->atexit.mutex, \
|
&(runtime)->atexit.mutex, \
|
||||||
&(runtime)->audit_hooks.mutex, \
|
&(runtime)->audit_hooks.mutex, \
|
||||||
&(runtime)->allocators.mutex, \
|
&(runtime)->allocators.mutex, \
|
||||||
|
|
|
@ -517,6 +517,7 @@ Modules/_testcapimodule.c - g_type_watchers_installed -
|
||||||
Modules/_testimportmultiple.c - _barmodule -
|
Modules/_testimportmultiple.c - _barmodule -
|
||||||
Modules/_testimportmultiple.c - _foomodule -
|
Modules/_testimportmultiple.c - _foomodule -
|
||||||
Modules/_testimportmultiple.c - _testimportmultiple -
|
Modules/_testimportmultiple.c - _testimportmultiple -
|
||||||
|
Modules/_testinternalcapi.c - pending_identify_result -
|
||||||
Modules/_testmultiphase.c - Example_Type_slots -
|
Modules/_testmultiphase.c - Example_Type_slots -
|
||||||
Modules/_testmultiphase.c - Example_Type_spec -
|
Modules/_testmultiphase.c - Example_Type_spec -
|
||||||
Modules/_testmultiphase.c - Example_methods -
|
Modules/_testmultiphase.c - Example_methods -
|
||||||
|
|
Can't render this file because it has a wrong number of fields in line 4.
|
Loading…
Add table
Add a link
Reference in a new issue