gh-123358: Use _PyStackRef in LOAD_DEREF (gh-130064)

Concurrent accesses from multiple threads to the same `cell` object did not
scale well in the free-threaded build. Use `_PyStackRef` and optimistically
avoid locking to improve scaling.

With the locks around cell reads gone, some of the free threading tests were
prone to starvation: the readers were able to run in a tight loop and the
writer threads weren't scheduled frequently enough to make timely progress.
Adjust the tests to avoid this.

Co-authored-by: Donghee Na <donghee.na@python.org>
This commit is contained in:
Sam Gross 2025-03-26 12:08:20 -04:00 committed by GitHub
parent 1b8bb1ed0c
commit 3d4ac1a2c2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 90 additions and 44 deletions

View file

@ -2,6 +2,8 @@
#define Py_INTERNAL_CELL_H
#include "pycore_critical_section.h"
#include "pycore_object.h"
#include "pycore_stackref.h"
#ifdef __cplusplus
extern "C" {
@ -19,7 +21,7 @@ PyCell_SwapTakeRef(PyCellObject *cell, PyObject *value)
PyObject *old_value;
Py_BEGIN_CRITICAL_SECTION(cell);
old_value = cell->ob_ref;
cell->ob_ref = value;
FT_ATOMIC_STORE_PTR_RELEASE(cell->ob_ref, value);
Py_END_CRITICAL_SECTION();
return old_value;
}
@ -37,11 +39,36 @@ PyCell_GetRef(PyCellObject *cell)
{
PyObject *res;
Py_BEGIN_CRITICAL_SECTION(cell);
#ifdef Py_GIL_DISABLED
res = _Py_XNewRefWithLock(cell->ob_ref);
#else
res = Py_XNewRef(cell->ob_ref);
#endif
Py_END_CRITICAL_SECTION();
return res;
}
static inline _PyStackRef
_PyCell_GetStackRef(PyCellObject *cell)
{
PyObject *value;
#ifdef Py_GIL_DISABLED
value = _Py_atomic_load_ptr(&cell->ob_ref);
if (value == NULL) {
return PyStackRef_NULL;
}
_PyStackRef ref;
if (_Py_TryIncrefCompareStackRef(&cell->ob_ref, value, &ref)) {
return ref;
}
#endif
value = PyCell_GetRef(cell);
if (value == NULL) {
return PyStackRef_NULL;
}
return PyStackRef_FromPyObjectSteal(value);
}
#ifdef __cplusplus
}
#endif

View file

@ -1195,7 +1195,7 @@ const struct opcode_metadata _PyOpcode_opcode_metadata[266] = {
[LOAD_CONST] = { true, INSTR_FMT_IB, HAS_ARG_FLAG | HAS_CONST_FLAG },
[LOAD_CONST_IMMORTAL] = { true, INSTR_FMT_IB, HAS_ARG_FLAG | HAS_CONST_FLAG },
[LOAD_CONST_MORTAL] = { true, INSTR_FMT_IB, HAS_ARG_FLAG | HAS_CONST_FLAG },
[LOAD_DEREF] = { true, INSTR_FMT_IB, HAS_ARG_FLAG | HAS_FREE_FLAG | HAS_ERROR_FLAG | HAS_ESCAPES_FLAG },
[LOAD_DEREF] = { true, INSTR_FMT_IB, HAS_ARG_FLAG | HAS_LOCAL_FLAG | HAS_ERROR_FLAG | HAS_ESCAPES_FLAG },
[LOAD_FAST] = { true, INSTR_FMT_IB, HAS_ARG_FLAG | HAS_LOCAL_FLAG | HAS_PURE_FLAG },
[LOAD_FAST_AND_CLEAR] = { true, INSTR_FMT_IB, HAS_ARG_FLAG | HAS_LOCAL_FLAG },
[LOAD_FAST_CHECK] = { true, INSTR_FMT_IB, HAS_ARG_FLAG | HAS_LOCAL_FLAG | HAS_ERROR_FLAG | HAS_ESCAPES_FLAG },

View file

@ -131,7 +131,7 @@ const uint16_t _PyUop_Flags[MAX_UOP_ID+1] = {
[_MAKE_CELL] = HAS_ARG_FLAG | HAS_FREE_FLAG | HAS_ERROR_FLAG | HAS_ERROR_NO_POP_FLAG | HAS_ESCAPES_FLAG,
[_DELETE_DEREF] = HAS_ARG_FLAG | HAS_FREE_FLAG | HAS_ERROR_FLAG | HAS_ERROR_NO_POP_FLAG | HAS_ESCAPES_FLAG,
[_LOAD_FROM_DICT_OR_DEREF] = HAS_ARG_FLAG | HAS_FREE_FLAG | HAS_ERROR_FLAG | HAS_ERROR_NO_POP_FLAG | HAS_ESCAPES_FLAG,
[_LOAD_DEREF] = HAS_ARG_FLAG | HAS_FREE_FLAG | HAS_ERROR_FLAG | HAS_ESCAPES_FLAG,
[_LOAD_DEREF] = HAS_ARG_FLAG | HAS_LOCAL_FLAG | HAS_ERROR_FLAG | HAS_ESCAPES_FLAG,
[_STORE_DEREF] = HAS_ARG_FLAG | HAS_FREE_FLAG | HAS_ESCAPES_FLAG,
[_COPY_FREE_VARS] = HAS_ARG_FLAG,
[_BUILD_STRING] = HAS_ARG_FLAG | HAS_ERROR_FLAG,

View file

@ -74,6 +74,7 @@ class TestDict(TestCase):
last = -1
while True:
if CUR == last:
time.sleep(0.001)
continue
elif CUR == OBJECT_COUNT:
break

View file

@ -27,13 +27,13 @@ def set_func_annotation(f, b):
@unittest.skipUnless(Py_GIL_DISABLED, "Enable only in FT build")
class TestFTFuncAnnotations(TestCase):
NUM_THREADS = 8
NUM_THREADS = 4
def test_concurrent_read(self):
def f(x: int) -> int:
return x + 1
for _ in range(100):
for _ in range(10):
with concurrent.futures.ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
b = Barrier(self.NUM_THREADS)
futures = {executor.submit(get_func_annotation, f, b): i for i in range(self.NUM_THREADS)}
@ -54,7 +54,7 @@ class TestFTFuncAnnotations(TestCase):
def bar(x: int, y: float) -> float:
return y ** x
for _ in range(100):
for _ in range(10):
with concurrent.futures.ThreadPoolExecutor(max_workers=self.NUM_THREADS) as executor:
b = Barrier(self.NUM_THREADS)
futures = {executor.submit(set_func_annotation, bar, b): i for i in range(self.NUM_THREADS)}

View file

@ -35,24 +35,30 @@ class TestGC(TestCase):
pass
def test_get_referrers(self):
NUM_GC = 2
NUM_MUTATORS = 4
b = threading.Barrier(NUM_GC + NUM_MUTATORS)
event = threading.Event()
obj = MyObj()
def gc_thread():
b.wait()
for i in range(100):
o = gc.get_referrers(obj)
event.set()
def mutator_thread():
b.wait()
while not event.is_set():
d1 = { "key": obj }
d2 = { "key": obj }
d3 = { "key": obj }
d4 = { "key": obj }
gcs = [Thread(target=gc_thread) for _ in range(2)]
mutators = [Thread(target=mutator_thread) for _ in range(4)]
gcs = [Thread(target=gc_thread) for _ in range(NUM_GC)]
mutators = [Thread(target=mutator_thread) for _ in range(NUM_MUTATORS)]
with threading_helper.start_threads(gcs + mutators):
pass

View file

@ -20,11 +20,14 @@ class TestList(TestCase):
def test_racing_iter_append(self):
l = []
def writer_func():
barrier = Barrier(NTHREAD + 1)
def writer_func(l):
barrier.wait()
for i in range(OBJECT_COUNT):
l.append(C(i + OBJECT_COUNT))
def reader_func():
def reader_func(l):
barrier.wait()
while True:
count = len(l)
for i, x in enumerate(l):
@ -32,10 +35,10 @@ class TestList(TestCase):
if count == OBJECT_COUNT:
break
writer = Thread(target=writer_func)
writer = Thread(target=writer_func, args=(l,))
readers = []
for x in range(NTHREAD):
reader = Thread(target=reader_func)
reader = Thread(target=reader_func, args=(l,))
readers.append(reader)
reader.start()
@ -47,11 +50,14 @@ class TestList(TestCase):
def test_racing_iter_extend(self):
l = []
barrier = Barrier(NTHREAD + 1)
def writer_func():
barrier.wait()
for i in range(OBJECT_COUNT):
l.extend([C(i + OBJECT_COUNT)])
def reader_func():
barrier.wait()
while True:
count = len(l)
for i, x in enumerate(l):

View file

@ -8,7 +8,7 @@ import weakref
from sys import monitoring
from test.support import threading_helper
from threading import Thread, _PyRLock
from threading import Thread, _PyRLock, Barrier
from unittest import TestCase
@ -194,7 +194,9 @@ class SetProfileMultiThreaded(InstrumentationMultiThreadedMixin, TestCase):
@threading_helper.requires_working_threading()
class MonitoringMisc(MonitoringTestMixin, TestCase):
def register_callback(self):
def register_callback(self, barrier):
barrier.wait()
def callback(*args):
pass
@ -206,8 +208,9 @@ class MonitoringMisc(MonitoringTestMixin, TestCase):
def test_register_callback(self):
self.refs = []
threads = []
for i in range(50):
t = Thread(target=self.register_callback)
barrier = Barrier(5)
for i in range(5):
t = Thread(target=self.register_callback, args=(barrier,))
t.start()
threads.append(t)

View file

@ -45,26 +45,20 @@ class TestType(TestCase):
class C:
x = 0
DONE = False
def writer_func():
for i in range(3000):
for _ in range(3000):
C.x
C.x
C.x += 1
nonlocal DONE
DONE = True
def reader_func():
while True:
for _ in range(3000):
# We should always see a greater value read from the type than the
# dictionary
a = C.__dict__['x']
b = C.x
self.assertGreaterEqual(b, a)
if DONE:
break
self.run_one(writer_func, reader_func)
def test_attr_cache_consistency_subclass(self):
@ -74,26 +68,20 @@ class TestType(TestCase):
class D(C):
pass
DONE = False
def writer_func():
for i in range(3000):
for _ in range(3000):
D.x
D.x
C.x += 1
nonlocal DONE
DONE = True
def reader_func():
while True:
for _ in range(3000):
# We should always see a greater value read from the type than the
# dictionary
a = C.__dict__['x']
b = D.x
self.assertGreaterEqual(b, a)
if DONE:
break
self.run_one(writer_func, reader_func)
def test___class___modification(self):
@ -140,10 +128,18 @@ class TestType(TestCase):
def run_one(self, writer_func, reader_func):
writer = Thread(target=writer_func)
barrier = threading.Barrier(NTHREADS)
def wrap_target(target):
def wrapper():
barrier.wait()
target()
return wrapper
writer = Thread(target=wrap_target(writer_func))
readers = []
for x in range(30):
reader = Thread(target=reader_func)
for x in range(NTHREADS - 1):
reader = Thread(target=wrap_target(reader_func))
readers.append(reader)
reader.start()

View file

@ -1822,12 +1822,11 @@ dummy_func(
inst(LOAD_DEREF, ( -- value)) {
PyCellObject *cell = (PyCellObject *)PyStackRef_AsPyObjectBorrow(GETLOCAL(oparg));
PyObject *value_o = PyCell_GetRef(cell);
if (value_o == NULL) {
value = _PyCell_GetStackRef(cell);
if (PyStackRef_IsNull(value)) {
_PyEval_FormatExcUnbound(tstate, _PyFrame_GetCode(frame), oparg);
ERROR_IF(true, error);
}
value = PyStackRef_FromPyObjectSteal(value_o);
}
inst(STORE_DEREF, (v --)) {

View file

@ -2479,14 +2479,18 @@
_PyStackRef value;
oparg = CURRENT_OPARG();
PyCellObject *cell = (PyCellObject *)PyStackRef_AsPyObjectBorrow(GETLOCAL(oparg));
PyObject *value_o = PyCell_GetRef(cell);
if (value_o == NULL) {
_PyFrame_SetStackPointer(frame, stack_pointer);
value = _PyCell_GetStackRef(cell);
stack_pointer = _PyFrame_GetStackPointer(frame);
if (PyStackRef_IsNull(value)) {
stack_pointer[0] = value;
stack_pointer += 1;
assert(WITHIN_STACK_BOUNDS());
_PyFrame_SetStackPointer(frame, stack_pointer);
_PyEval_FormatExcUnbound(tstate, _PyFrame_GetCode(frame), oparg);
stack_pointer = _PyFrame_GetStackPointer(frame);
JUMP_TO_ERROR();
}
value = PyStackRef_FromPyObjectSteal(value_o);
stack_pointer[0] = value;
stack_pointer += 1;
assert(WITHIN_STACK_BOUNDS());

View file

@ -8822,14 +8822,18 @@
INSTRUCTION_STATS(LOAD_DEREF);
_PyStackRef value;
PyCellObject *cell = (PyCellObject *)PyStackRef_AsPyObjectBorrow(GETLOCAL(oparg));
PyObject *value_o = PyCell_GetRef(cell);
if (value_o == NULL) {
_PyFrame_SetStackPointer(frame, stack_pointer);
value = _PyCell_GetStackRef(cell);
stack_pointer = _PyFrame_GetStackPointer(frame);
if (PyStackRef_IsNull(value)) {
stack_pointer[0] = value;
stack_pointer += 1;
assert(WITHIN_STACK_BOUNDS());
_PyFrame_SetStackPointer(frame, stack_pointer);
_PyEval_FormatExcUnbound(tstate, _PyFrame_GetCode(frame), oparg);
stack_pointer = _PyFrame_GetStackPointer(frame);
JUMP_TO_LABEL(error);
}
value = PyStackRef_FromPyObjectSteal(value_o);
stack_pointer[0] = value;
stack_pointer += 1;
assert(WITHIN_STACK_BOUNDS());