mirror of
https://github.com/python/cpython.git
synced 2025-08-30 21:48:47 +00:00
gh-108724: Add PyMutex and _PyParkingLot APIs (gh-109344)
PyMutex is a one byte lock with fast, inlineable lock and unlock functions for the common uncontended case. The design is based on WebKit's WTF::Lock. PyMutex is built using the _PyParkingLot APIs, which provides a cross-platform futex-like API (based on WebKit's WTF::ParkingLot). This internal API will be used for building other synchronization primitives used to implement PEP 703, such as one-time initialization and events. This also includes tests and a mini benchmark in Tools/lockbench/lockbench.py to compare with the existing PyThread_type_lock. Uncontended acquisition + release: * Linux (x86-64): PyMutex: 11 ns, PyThread_type_lock: 44 ns * macOS (arm64): PyMutex: 13 ns, PyThread_type_lock: 18 ns * Windows (x86-64): PyMutex: 13 ns, PyThread_type_lock: 38 ns PR Overview: The primary purpose of this PR is to implement PyMutex, but there are a number of support pieces (described below). * PyMutex: A 1-byte lock that doesn't require memory allocation to initialize and is generally faster than the existing PyThread_type_lock. The API is internal only for now. * _PyParking_Lot: A futex-like API based on the API of the same name in WebKit. Used to implement PyMutex. * _PyRawMutex: A word sized lock used to implement _PyParking_Lot. * PyEvent: A one time event. This was used a bunch in the "nogil" fork and is useful for testing the PyMutex implementation, so I've included it as part of the PR. * pycore_llist.h: Defines common operations on doubly-linked list. Not strictly necessary (could do the list operations manually), but they come up frequently in the "nogil" fork. ( Similar to https://man.freebsd.org/cgi/man.cgi?queue) --------- Co-authored-by: Eric Snow <ericsnowcurrently@gmail.com>
This commit is contained in:
parent
0a31ff0050
commit
0c89056fe5
29 changed files with 1665 additions and 21 deletions
297
Python/lock.c
Normal file
297
Python/lock.c
Normal file
|
@ -0,0 +1,297 @@
|
|||
// Lock implementation
|
||||
|
||||
#include "Python.h"
|
||||
|
||||
#include "pycore_lock.h"
|
||||
#include "pycore_parking_lot.h"
|
||||
#include "pycore_semaphore.h"
|
||||
|
||||
#ifdef MS_WINDOWS
|
||||
#define WIN32_LEAN_AND_MEAN
|
||||
#include <windows.h> // SwitchToThread()
|
||||
#elif defined(HAVE_SCHED_H)
|
||||
#include <sched.h> // sched_yield()
|
||||
#endif
|
||||
|
||||
// If a thread waits on a lock for longer than TIME_TO_BE_FAIR_NS (1 ms), then
|
||||
// the unlocking thread directly hands off ownership of the lock. This avoids
|
||||
// starvation.
|
||||
static const _PyTime_t TIME_TO_BE_FAIR_NS = 1000*1000;
|
||||
|
||||
// Spin for a bit before parking the thread. This is only enabled for
|
||||
// `--disable-gil` builds because it is unlikely to be helpful if the GIL is
|
||||
// enabled.
|
||||
#if Py_NOGIL
|
||||
static const int MAX_SPIN_COUNT = 40;
|
||||
#else
|
||||
static const int MAX_SPIN_COUNT = 0;
|
||||
#endif
|
||||
|
||||
struct mutex_entry {
|
||||
// The time after which the unlocking thread should hand off lock ownership
|
||||
// directly to the waiting thread. Written by the waiting thread.
|
||||
_PyTime_t time_to_be_fair;
|
||||
|
||||
// Set to 1 if the lock was handed off. Written by the unlocking thread.
|
||||
int handed_off;
|
||||
};
|
||||
|
||||
static void
|
||||
_Py_yield(void)
|
||||
{
|
||||
#ifdef MS_WINDOWS
|
||||
SwitchToThread();
|
||||
#elif defined(HAVE_SCHED_H)
|
||||
sched_yield();
|
||||
#endif
|
||||
}
|
||||
|
||||
void
|
||||
_PyMutex_LockSlow(PyMutex *m)
|
||||
{
|
||||
_PyMutex_LockTimed(m, -1, _PY_LOCK_DETACH);
|
||||
}
|
||||
|
||||
PyLockStatus
|
||||
_PyMutex_LockTimed(PyMutex *m, _PyTime_t timeout, _PyLockFlags flags)
|
||||
{
|
||||
uint8_t v = _Py_atomic_load_uint8_relaxed(&m->v);
|
||||
if ((v & _Py_LOCKED) == 0) {
|
||||
if (_Py_atomic_compare_exchange_uint8(&m->v, &v, v|_Py_LOCKED)) {
|
||||
return PY_LOCK_ACQUIRED;
|
||||
}
|
||||
}
|
||||
else if (timeout == 0) {
|
||||
return PY_LOCK_FAILURE;
|
||||
}
|
||||
|
||||
_PyTime_t now = _PyTime_GetMonotonicClock();
|
||||
_PyTime_t endtime = 0;
|
||||
if (timeout > 0) {
|
||||
endtime = _PyTime_Add(now, timeout);
|
||||
}
|
||||
|
||||
struct mutex_entry entry = {
|
||||
.time_to_be_fair = now + TIME_TO_BE_FAIR_NS,
|
||||
.handed_off = 0,
|
||||
};
|
||||
|
||||
Py_ssize_t spin_count = 0;
|
||||
for (;;) {
|
||||
if ((v & _Py_LOCKED) == 0) {
|
||||
// The lock is unlocked. Try to grab it.
|
||||
if (_Py_atomic_compare_exchange_uint8(&m->v, &v, v|_Py_LOCKED)) {
|
||||
return PY_LOCK_ACQUIRED;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!(v & _Py_HAS_PARKED) && spin_count < MAX_SPIN_COUNT) {
|
||||
// Spin for a bit.
|
||||
_Py_yield();
|
||||
spin_count++;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (timeout == 0) {
|
||||
return PY_LOCK_FAILURE;
|
||||
}
|
||||
|
||||
uint8_t newv = v;
|
||||
if (!(v & _Py_HAS_PARKED)) {
|
||||
// We are the first waiter. Set the _Py_HAS_PARKED flag.
|
||||
newv = v | _Py_HAS_PARKED;
|
||||
if (!_Py_atomic_compare_exchange_uint8(&m->v, &v, newv)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
int ret = _PyParkingLot_Park(&m->v, &newv, sizeof(newv), timeout,
|
||||
&entry, (flags & _PY_LOCK_DETACH) != 0);
|
||||
if (ret == Py_PARK_OK) {
|
||||
if (entry.handed_off) {
|
||||
// We own the lock now.
|
||||
assert(_Py_atomic_load_uint8_relaxed(&m->v) & _Py_LOCKED);
|
||||
return PY_LOCK_ACQUIRED;
|
||||
}
|
||||
}
|
||||
else if (ret == Py_PARK_INTR && (flags & _PY_LOCK_HANDLE_SIGNALS)) {
|
||||
if (Py_MakePendingCalls() < 0) {
|
||||
return PY_LOCK_INTR;
|
||||
}
|
||||
}
|
||||
else if (ret == Py_PARK_TIMEOUT) {
|
||||
assert(timeout >= 0);
|
||||
return PY_LOCK_FAILURE;
|
||||
}
|
||||
|
||||
if (timeout > 0) {
|
||||
timeout = _PyDeadline_Get(endtime);
|
||||
if (timeout <= 0) {
|
||||
// Avoid negative values because those mean block forever.
|
||||
timeout = 0;
|
||||
}
|
||||
}
|
||||
|
||||
v = _Py_atomic_load_uint8_relaxed(&m->v);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
mutex_unpark(PyMutex *m, struct mutex_entry *entry, int has_more_waiters)
|
||||
{
|
||||
uint8_t v = 0;
|
||||
if (entry) {
|
||||
_PyTime_t now = _PyTime_GetMonotonicClock();
|
||||
int should_be_fair = now > entry->time_to_be_fair;
|
||||
|
||||
entry->handed_off = should_be_fair;
|
||||
if (should_be_fair) {
|
||||
v |= _Py_LOCKED;
|
||||
}
|
||||
if (has_more_waiters) {
|
||||
v |= _Py_HAS_PARKED;
|
||||
}
|
||||
}
|
||||
_Py_atomic_store_uint8(&m->v, v);
|
||||
}
|
||||
|
||||
int
|
||||
_PyMutex_TryUnlock(PyMutex *m)
|
||||
{
|
||||
uint8_t v = _Py_atomic_load_uint8(&m->v);
|
||||
for (;;) {
|
||||
if ((v & _Py_LOCKED) == 0) {
|
||||
// error: the mutex is not locked
|
||||
return -1;
|
||||
}
|
||||
else if ((v & _Py_HAS_PARKED)) {
|
||||
// wake up a single thread
|
||||
_PyParkingLot_Unpark(&m->v, (_Py_unpark_fn_t *)mutex_unpark, m);
|
||||
return 0;
|
||||
}
|
||||
else if (_Py_atomic_compare_exchange_uint8(&m->v, &v, _Py_UNLOCKED)) {
|
||||
// fast-path: no waiters
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
_PyMutex_UnlockSlow(PyMutex *m)
|
||||
{
|
||||
if (_PyMutex_TryUnlock(m) < 0) {
|
||||
Py_FatalError("unlocking mutex that is not locked");
|
||||
}
|
||||
}
|
||||
|
||||
// _PyRawMutex stores a linked list of `struct raw_mutex_entry`, one for each
|
||||
// thread waiting on the mutex, directly in the mutex itself.
|
||||
struct raw_mutex_entry {
|
||||
struct raw_mutex_entry *next;
|
||||
_PySemaphore sema;
|
||||
};
|
||||
|
||||
void
|
||||
_PyRawMutex_LockSlow(_PyRawMutex *m)
|
||||
{
|
||||
struct raw_mutex_entry waiter;
|
||||
_PySemaphore_Init(&waiter.sema);
|
||||
|
||||
uintptr_t v = _Py_atomic_load_uintptr(&m->v);
|
||||
for (;;) {
|
||||
if ((v & _Py_LOCKED) == 0) {
|
||||
// Unlocked: try to grab it (even if it has a waiter).
|
||||
if (_Py_atomic_compare_exchange_uintptr(&m->v, &v, v|_Py_LOCKED)) {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Locked: try to add ourselves as a waiter.
|
||||
waiter.next = (struct raw_mutex_entry *)(v & ~1);
|
||||
uintptr_t desired = ((uintptr_t)&waiter)|_Py_LOCKED;
|
||||
if (!_Py_atomic_compare_exchange_uintptr(&m->v, &v, desired)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Wait for us to be woken up. Note that we still have to lock the
|
||||
// mutex ourselves: it is NOT handed off to us.
|
||||
_PySemaphore_Wait(&waiter.sema, -1, /*detach=*/0);
|
||||
}
|
||||
|
||||
_PySemaphore_Destroy(&waiter.sema);
|
||||
}
|
||||
|
||||
void
|
||||
_PyRawMutex_UnlockSlow(_PyRawMutex *m)
|
||||
{
|
||||
uintptr_t v = _Py_atomic_load_uintptr(&m->v);
|
||||
for (;;) {
|
||||
if ((v & _Py_LOCKED) == 0) {
|
||||
Py_FatalError("unlocking mutex that is not locked");
|
||||
}
|
||||
|
||||
struct raw_mutex_entry *waiter = (struct raw_mutex_entry *)(v & ~1);
|
||||
if (waiter) {
|
||||
uintptr_t next_waiter = (uintptr_t)waiter->next;
|
||||
if (_Py_atomic_compare_exchange_uintptr(&m->v, &v, next_waiter)) {
|
||||
_PySemaphore_Wakeup(&waiter->sema);
|
||||
return;
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (_Py_atomic_compare_exchange_uintptr(&m->v, &v, _Py_UNLOCKED)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
_PyEvent_Notify(PyEvent *evt)
|
||||
{
|
||||
uintptr_t v = _Py_atomic_exchange_uint8(&evt->v, _Py_LOCKED);
|
||||
if (v == _Py_UNLOCKED) {
|
||||
// no waiters
|
||||
return;
|
||||
}
|
||||
else if (v == _Py_LOCKED) {
|
||||
// event already set
|
||||
return;
|
||||
}
|
||||
else {
|
||||
assert(v == _Py_HAS_PARKED);
|
||||
_PyParkingLot_UnparkAll(&evt->v);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
PyEvent_Wait(PyEvent *evt)
|
||||
{
|
||||
while (!PyEvent_WaitTimed(evt, -1))
|
||||
;
|
||||
}
|
||||
|
||||
int
|
||||
PyEvent_WaitTimed(PyEvent *evt, _PyTime_t timeout_ns)
|
||||
{
|
||||
for (;;) {
|
||||
uint8_t v = _Py_atomic_load_uint8(&evt->v);
|
||||
if (v == _Py_LOCKED) {
|
||||
// event already set
|
||||
return 1;
|
||||
}
|
||||
if (v == _Py_UNLOCKED) {
|
||||
if (!_Py_atomic_compare_exchange_uint8(&evt->v, &v, _Py_HAS_PARKED)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t expected = _Py_HAS_PARKED;
|
||||
(void) _PyParkingLot_Park(&evt->v, &expected, sizeof(evt->v),
|
||||
timeout_ns, NULL, 1);
|
||||
|
||||
return _Py_atomic_load_uint8(&evt->v) == _Py_LOCKED;
|
||||
}
|
||||
}
|
370
Python/parking_lot.c
Normal file
370
Python/parking_lot.c
Normal file
|
@ -0,0 +1,370 @@
|
|||
#include "Python.h"
|
||||
|
||||
#include "pycore_llist.h"
|
||||
#include "pycore_lock.h" // _PyRawMutex
|
||||
#include "pycore_parking_lot.h"
|
||||
#include "pycore_pyerrors.h" // _Py_FatalErrorFormat
|
||||
#include "pycore_pystate.h" // _PyThreadState_GET
|
||||
#include "pycore_semaphore.h" // _PySemaphore
|
||||
|
||||
#include <stdbool.h>
|
||||
|
||||
|
||||
typedef struct {
|
||||
// The mutex protects the waiter queue and the num_waiters counter.
|
||||
_PyRawMutex mutex;
|
||||
|
||||
// Linked list of `struct wait_entry` waiters in this bucket.
|
||||
struct llist_node root;
|
||||
size_t num_waiters;
|
||||
} Bucket;
|
||||
|
||||
struct wait_entry {
|
||||
void *park_arg;
|
||||
uintptr_t addr;
|
||||
_PySemaphore sema;
|
||||
struct llist_node node;
|
||||
bool is_unparking;
|
||||
};
|
||||
|
||||
// Prime number to avoid correlations with memory addresses.
|
||||
// We want this to be roughly proportional to the number of CPU cores
|
||||
// to minimize contention on the bucket locks, but not too big to avoid
|
||||
// wasting memory. The exact choice does not matter much.
|
||||
#define NUM_BUCKETS 257
|
||||
|
||||
#define BUCKET_INIT(b, i) [i] = { .root = LLIST_INIT(b[i].root) }
|
||||
#define BUCKET_INIT_2(b, i) BUCKET_INIT(b, i), BUCKET_INIT(b, i+1)
|
||||
#define BUCKET_INIT_4(b, i) BUCKET_INIT_2(b, i), BUCKET_INIT_2(b, i+2)
|
||||
#define BUCKET_INIT_8(b, i) BUCKET_INIT_4(b, i), BUCKET_INIT_4(b, i+4)
|
||||
#define BUCKET_INIT_16(b, i) BUCKET_INIT_8(b, i), BUCKET_INIT_8(b, i+8)
|
||||
#define BUCKET_INIT_32(b, i) BUCKET_INIT_16(b, i), BUCKET_INIT_16(b, i+16)
|
||||
#define BUCKET_INIT_64(b, i) BUCKET_INIT_32(b, i), BUCKET_INIT_32(b, i+32)
|
||||
#define BUCKET_INIT_128(b, i) BUCKET_INIT_64(b, i), BUCKET_INIT_64(b, i+64)
|
||||
#define BUCKET_INIT_256(b, i) BUCKET_INIT_128(b, i), BUCKET_INIT_128(b, i+128)
|
||||
|
||||
// Table of waiters (hashed by address)
|
||||
static Bucket buckets[NUM_BUCKETS] = {
|
||||
BUCKET_INIT_256(buckets, 0),
|
||||
BUCKET_INIT(buckets, 256),
|
||||
};
|
||||
|
||||
void
|
||||
_PySemaphore_Init(_PySemaphore *sema)
|
||||
{
|
||||
#if defined(MS_WINDOWS)
|
||||
sema->platform_sem = CreateSemaphore(
|
||||
NULL, // attributes
|
||||
0, // initial count
|
||||
10, // maximum count
|
||||
NULL // unnamed
|
||||
);
|
||||
if (!sema->platform_sem) {
|
||||
Py_FatalError("parking_lot: CreateSemaphore failed");
|
||||
}
|
||||
#elif defined(_Py_USE_SEMAPHORES)
|
||||
if (sem_init(&sema->platform_sem, /*pshared=*/0, /*value=*/0) < 0) {
|
||||
Py_FatalError("parking_lot: sem_init failed");
|
||||
}
|
||||
#else
|
||||
if (pthread_mutex_init(&sema->mutex, NULL) != 0) {
|
||||
Py_FatalError("parking_lot: pthread_mutex_init failed");
|
||||
}
|
||||
if (pthread_cond_init(&sema->cond, NULL)) {
|
||||
Py_FatalError("parking_lot: pthread_cond_init failed");
|
||||
}
|
||||
sema->counter = 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
void
|
||||
_PySemaphore_Destroy(_PySemaphore *sema)
|
||||
{
|
||||
#if defined(MS_WINDOWS)
|
||||
CloseHandle(sema->platform_sem);
|
||||
#elif defined(_Py_USE_SEMAPHORES)
|
||||
sem_destroy(&sema->platform_sem);
|
||||
#else
|
||||
pthread_mutex_destroy(&sema->mutex);
|
||||
pthread_cond_destroy(&sema->cond);
|
||||
#endif
|
||||
}
|
||||
|
||||
static int
|
||||
_PySemaphore_PlatformWait(_PySemaphore *sema, _PyTime_t timeout)
|
||||
{
|
||||
int res;
|
||||
#if defined(MS_WINDOWS)
|
||||
DWORD wait;
|
||||
DWORD millis = 0;
|
||||
if (timeout < 0) {
|
||||
millis = INFINITE;
|
||||
}
|
||||
else {
|
||||
millis = (DWORD) (timeout / 1000000);
|
||||
}
|
||||
wait = WaitForSingleObjectEx(sema->platform_sem, millis, FALSE);
|
||||
if (wait == WAIT_OBJECT_0) {
|
||||
res = Py_PARK_OK;
|
||||
}
|
||||
else if (wait == WAIT_TIMEOUT) {
|
||||
res = Py_PARK_TIMEOUT;
|
||||
}
|
||||
else {
|
||||
res = Py_PARK_INTR;
|
||||
}
|
||||
#elif defined(_Py_USE_SEMAPHORES)
|
||||
int err;
|
||||
if (timeout >= 0) {
|
||||
struct timespec ts;
|
||||
|
||||
_PyTime_t deadline = _PyTime_Add(_PyTime_GetSystemClock(), timeout);
|
||||
_PyTime_AsTimespec(deadline, &ts);
|
||||
|
||||
err = sem_timedwait(&sema->platform_sem, &ts);
|
||||
}
|
||||
else {
|
||||
err = sem_wait(&sema->platform_sem);
|
||||
}
|
||||
if (err == -1) {
|
||||
err = errno;
|
||||
if (err == EINTR) {
|
||||
res = Py_PARK_INTR;
|
||||
}
|
||||
else if (err == ETIMEDOUT) {
|
||||
res = Py_PARK_TIMEOUT;
|
||||
}
|
||||
else {
|
||||
_Py_FatalErrorFormat(__func__,
|
||||
"unexpected error from semaphore: %d",
|
||||
err);
|
||||
}
|
||||
}
|
||||
else {
|
||||
res = Py_PARK_OK;
|
||||
}
|
||||
#else
|
||||
pthread_mutex_lock(&sema->mutex);
|
||||
int err = 0;
|
||||
if (sema->counter == 0) {
|
||||
if (timeout >= 0) {
|
||||
struct timespec ts;
|
||||
|
||||
_PyTime_t deadline = _PyTime_Add(_PyTime_GetSystemClock(), timeout);
|
||||
_PyTime_AsTimespec(deadline, &ts);
|
||||
|
||||
err = pthread_cond_timedwait(&sema->cond, &sema->mutex, &ts);
|
||||
}
|
||||
else {
|
||||
err = pthread_cond_wait(&sema->cond, &sema->mutex);
|
||||
}
|
||||
}
|
||||
if (sema->counter > 0) {
|
||||
sema->counter--;
|
||||
res = Py_PARK_OK;
|
||||
}
|
||||
else if (err) {
|
||||
res = Py_PARK_TIMEOUT;
|
||||
}
|
||||
else {
|
||||
res = Py_PARK_INTR;
|
||||
}
|
||||
pthread_mutex_unlock(&sema->mutex);
|
||||
#endif
|
||||
return res;
|
||||
}
|
||||
|
||||
int
|
||||
_PySemaphore_Wait(_PySemaphore *sema, _PyTime_t timeout, int detach)
|
||||
{
|
||||
PyThreadState *tstate = NULL;
|
||||
if (detach) {
|
||||
tstate = _PyThreadState_GET();
|
||||
if (tstate) {
|
||||
PyEval_ReleaseThread(tstate);
|
||||
}
|
||||
}
|
||||
|
||||
int res = _PySemaphore_PlatformWait(sema, timeout);
|
||||
|
||||
if (detach && tstate) {
|
||||
PyEval_AcquireThread(tstate);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
void
|
||||
_PySemaphore_Wakeup(_PySemaphore *sema)
|
||||
{
|
||||
#if defined(MS_WINDOWS)
|
||||
if (!ReleaseSemaphore(sema->platform_sem, 1, NULL)) {
|
||||
Py_FatalError("parking_lot: ReleaseSemaphore failed");
|
||||
}
|
||||
#elif defined(_Py_USE_SEMAPHORES)
|
||||
int err = sem_post(&sema->platform_sem);
|
||||
if (err != 0) {
|
||||
Py_FatalError("parking_lot: sem_post failed");
|
||||
}
|
||||
#else
|
||||
pthread_mutex_lock(&sema->mutex);
|
||||
sema->counter++;
|
||||
pthread_cond_signal(&sema->cond);
|
||||
pthread_mutex_unlock(&sema->mutex);
|
||||
#endif
|
||||
}
|
||||
|
||||
static void
|
||||
enqueue(Bucket *bucket, const void *address, struct wait_entry *wait)
|
||||
{
|
||||
llist_insert_tail(&bucket->root, &wait->node);
|
||||
++bucket->num_waiters;
|
||||
}
|
||||
|
||||
static struct wait_entry *
|
||||
dequeue(Bucket *bucket, const void *address)
|
||||
{
|
||||
// find the first waiter that is waiting on `address`
|
||||
struct llist_node *root = &bucket->root;
|
||||
struct llist_node *node;
|
||||
llist_for_each(node, root) {
|
||||
struct wait_entry *wait = llist_data(node, struct wait_entry, node);
|
||||
if (wait->addr == (uintptr_t)address) {
|
||||
llist_remove(node);
|
||||
--bucket->num_waiters;
|
||||
return wait;
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void
|
||||
dequeue_all(Bucket *bucket, const void *address, struct llist_node *dst)
|
||||
{
|
||||
// remove and append all matching waiters to dst
|
||||
struct llist_node *root = &bucket->root;
|
||||
struct llist_node *node;
|
||||
llist_for_each_safe(node, root) {
|
||||
struct wait_entry *wait = llist_data(node, struct wait_entry, node);
|
||||
if (wait->addr == (uintptr_t)address) {
|
||||
llist_remove(node);
|
||||
llist_insert_tail(dst, node);
|
||||
--bucket->num_waiters;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Checks that `*addr == *expected` (only works for 1, 2, 4, or 8 bytes)
|
||||
static int
|
||||
atomic_memcmp(const void *addr, const void *expected, size_t addr_size)
|
||||
{
|
||||
switch (addr_size) {
|
||||
case 1: return _Py_atomic_load_uint8(addr) == *(const uint8_t *)expected;
|
||||
case 2: return _Py_atomic_load_uint16(addr) == *(const uint16_t *)expected;
|
||||
case 4: return _Py_atomic_load_uint32(addr) == *(const uint32_t *)expected;
|
||||
case 8: return _Py_atomic_load_uint64(addr) == *(const uint64_t *)expected;
|
||||
default: Py_UNREACHABLE();
|
||||
}
|
||||
}
|
||||
|
||||
int
|
||||
_PyParkingLot_Park(const void *addr, const void *expected, size_t size,
|
||||
_PyTime_t timeout_ns, void *park_arg, int detach)
|
||||
{
|
||||
struct wait_entry wait = {
|
||||
.park_arg = park_arg,
|
||||
.addr = (uintptr_t)addr,
|
||||
.is_unparking = false,
|
||||
};
|
||||
|
||||
Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
|
||||
|
||||
_PyRawMutex_Lock(&bucket->mutex);
|
||||
if (!atomic_memcmp(addr, expected, size)) {
|
||||
_PyRawMutex_Unlock(&bucket->mutex);
|
||||
return Py_PARK_AGAIN;
|
||||
}
|
||||
_PySemaphore_Init(&wait.sema);
|
||||
enqueue(bucket, addr, &wait);
|
||||
_PyRawMutex_Unlock(&bucket->mutex);
|
||||
|
||||
int res = _PySemaphore_Wait(&wait.sema, timeout_ns, detach);
|
||||
if (res == Py_PARK_OK) {
|
||||
goto done;
|
||||
}
|
||||
|
||||
// timeout or interrupt
|
||||
_PyRawMutex_Lock(&bucket->mutex);
|
||||
if (wait.is_unparking) {
|
||||
_PyRawMutex_Unlock(&bucket->mutex);
|
||||
// Another thread has started to unpark us. Wait until we process the
|
||||
// wakeup signal.
|
||||
do {
|
||||
res = _PySemaphore_Wait(&wait.sema, -1, detach);
|
||||
} while (res != Py_PARK_OK);
|
||||
goto done;
|
||||
}
|
||||
else {
|
||||
llist_remove(&wait.node);
|
||||
--bucket->num_waiters;
|
||||
}
|
||||
_PyRawMutex_Unlock(&bucket->mutex);
|
||||
|
||||
done:
|
||||
_PySemaphore_Destroy(&wait.sema);
|
||||
return res;
|
||||
|
||||
}
|
||||
|
||||
void
|
||||
_PyParkingLot_Unpark(const void *addr, _Py_unpark_fn_t *fn, void *arg)
|
||||
{
|
||||
Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
|
||||
|
||||
// Find the first waiter that is waiting on `addr`
|
||||
_PyRawMutex_Lock(&bucket->mutex);
|
||||
struct wait_entry *waiter = dequeue(bucket, addr);
|
||||
if (waiter) {
|
||||
waiter->is_unparking = true;
|
||||
|
||||
int has_more_waiters = (bucket->num_waiters > 0);
|
||||
fn(arg, waiter->park_arg, has_more_waiters);
|
||||
}
|
||||
else {
|
||||
fn(arg, NULL, 0);
|
||||
}
|
||||
_PyRawMutex_Unlock(&bucket->mutex);
|
||||
|
||||
if (waiter) {
|
||||
// Wakeup the waiter outside of the bucket lock
|
||||
_PySemaphore_Wakeup(&waiter->sema);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
_PyParkingLot_UnparkAll(const void *addr)
|
||||
{
|
||||
struct llist_node head = LLIST_INIT(head);
|
||||
Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
|
||||
|
||||
_PyRawMutex_Lock(&bucket->mutex);
|
||||
dequeue_all(bucket, addr, &head);
|
||||
_PyRawMutex_Unlock(&bucket->mutex);
|
||||
|
||||
struct llist_node *node;
|
||||
llist_for_each_safe(node, &head) {
|
||||
struct wait_entry *waiter = llist_data(node, struct wait_entry, node);
|
||||
llist_remove(node);
|
||||
_PySemaphore_Wakeup(&waiter->sema);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
_PyParkingLot_AfterFork(void)
|
||||
{
|
||||
// After a fork only one thread remains. That thread cannot be blocked
|
||||
// so all entries in the parking lot are for dead threads.
|
||||
memset(buckets, 0, sizeof(buckets));
|
||||
for (Py_ssize_t i = 0; i < NUM_BUCKETS; i++) {
|
||||
llist_init(&buckets[i].root);
|
||||
}
|
||||
}
|
|
@ -9,6 +9,7 @@
|
|||
#include "pycore_frame.h"
|
||||
#include "pycore_initconfig.h" // _PyStatus_OK()
|
||||
#include "pycore_object.h" // _PyType_InitCache()
|
||||
#include "pycore_parking_lot.h" // _PyParkingLot_AfterFork()
|
||||
#include "pycore_pyerrors.h" // _PyErr_Clear()
|
||||
#include "pycore_pylifecycle.h" // _PyAST_Fini()
|
||||
#include "pycore_pymem.h" // _PyMem_SetDefaultAllocator()
|
||||
|
@ -554,6 +555,10 @@ _PyRuntimeState_ReInitThreads(_PyRuntimeState *runtime)
|
|||
|
||||
PyMem_SetAllocator(PYMEM_DOMAIN_RAW, &old_alloc);
|
||||
|
||||
// Clears the parking lot. Any waiting threads are dead. This must be
|
||||
// called before releasing any locks that use the parking lot.
|
||||
_PyParkingLot_AfterFork();
|
||||
|
||||
/* bpo-42540: id_mutex is freed by _PyInterpreterState_Delete, which does
|
||||
* not force the default allocator. */
|
||||
reinit_err += _PyThread_at_fork_reinit(&runtime->interpreters.main->id_mutex);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue