mirror of
https://github.com/python/cpython.git
synced 2025-08-04 17:08:35 +00:00
gh-113884: Make queue.SimpleQueue thread-safe when the GIL is disabled (#114161)
* use the ParkingLot API to manage waiting threads * use Argument Clinic's critical section directive to protect queue methods * remove unnecessary overflow check Co-authored-by: Erlend E. Aasland <erlend.aasland@protonmail.com>
This commit is contained in:
parent
441affc9e7
commit
925907ea36
3 changed files with 138 additions and 91 deletions
|
@ -3,8 +3,9 @@
|
|||
#endif
|
||||
|
||||
#include "Python.h"
|
||||
#include "pycore_ceval.h" // _PyEval_MakePendingCalls()
|
||||
#include "pycore_ceval.h" // Py_MakePendingCalls()
|
||||
#include "pycore_moduleobject.h" // _PyModule_GetState()
|
||||
#include "pycore_parking_lot.h"
|
||||
#include "pycore_time.h" // _PyTime_t
|
||||
|
||||
#include <stdbool.h>
|
||||
|
@ -151,7 +152,9 @@ RingBuf_Get(RingBuf *buf)
|
|||
return item;
|
||||
}
|
||||
|
||||
// Returns 0 on success or -1 if the buffer failed to grow
|
||||
// Returns 0 on success or -1 if the buffer failed to grow.
|
||||
//
|
||||
// Steals a reference to item.
|
||||
static int
|
||||
RingBuf_Put(RingBuf *buf, PyObject *item)
|
||||
{
|
||||
|
@ -164,7 +167,7 @@ RingBuf_Put(RingBuf *buf, PyObject *item)
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
buf->items[buf->put_idx] = Py_NewRef(item);
|
||||
buf->items[buf->put_idx] = item;
|
||||
buf->put_idx = (buf->put_idx + 1) % buf->items_cap;
|
||||
buf->num_items++;
|
||||
return 0;
|
||||
|
@ -184,9 +187,13 @@ RingBuf_IsEmpty(RingBuf *buf)
|
|||
|
||||
typedef struct {
|
||||
PyObject_HEAD
|
||||
PyThread_type_lock lock;
|
||||
int locked;
|
||||
|
||||
// Are there threads waiting for items
|
||||
bool has_threads_waiting;
|
||||
|
||||
// Items in the queue
|
||||
RingBuf buf;
|
||||
|
||||
PyObject *weakreflist;
|
||||
} simplequeueobject;
|
||||
|
||||
|
@ -209,12 +216,6 @@ simplequeue_dealloc(simplequeueobject *self)
|
|||
PyTypeObject *tp = Py_TYPE(self);
|
||||
|
||||
PyObject_GC_UnTrack(self);
|
||||
if (self->lock != NULL) {
|
||||
/* Unlock the lock so it's safe to free it */
|
||||
if (self->locked > 0)
|
||||
PyThread_release_lock(self->lock);
|
||||
PyThread_free_lock(self->lock);
|
||||
}
|
||||
(void)simplequeue_clear(self);
|
||||
if (self->weakreflist != NULL)
|
||||
PyObject_ClearWeakRefs((PyObject *) self);
|
||||
|
@ -249,12 +250,6 @@ simplequeue_new_impl(PyTypeObject *type)
|
|||
self = (simplequeueobject *) type->tp_alloc(type, 0);
|
||||
if (self != NULL) {
|
||||
self->weakreflist = NULL;
|
||||
self->lock = PyThread_allocate_lock();
|
||||
if (self->lock == NULL) {
|
||||
Py_DECREF(self);
|
||||
PyErr_SetString(PyExc_MemoryError, "can't allocate lock");
|
||||
return NULL;
|
||||
}
|
||||
if (RingBuf_Init(&self->buf) < 0) {
|
||||
Py_DECREF(self);
|
||||
return NULL;
|
||||
|
@ -264,7 +259,29 @@ simplequeue_new_impl(PyTypeObject *type)
|
|||
return (PyObject *) self;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
bool handed_off;
|
||||
simplequeueobject *queue;
|
||||
PyObject *item;
|
||||
} HandoffData;
|
||||
|
||||
static void
|
||||
maybe_handoff_item(HandoffData *data, PyObject **item, int has_more_waiters)
|
||||
{
|
||||
if (item == NULL) {
|
||||
// No threads were waiting
|
||||
data->handed_off = false;
|
||||
}
|
||||
else {
|
||||
// There was at least one waiting thread, hand off the item
|
||||
*item = data->item;
|
||||
data->handed_off = true;
|
||||
}
|
||||
data->queue->has_threads_waiting = has_more_waiters;
|
||||
}
|
||||
|
||||
/*[clinic input]
|
||||
@critical_section
|
||||
_queue.SimpleQueue.put
|
||||
item: object
|
||||
block: bool = True
|
||||
|
@ -280,21 +297,28 @@ never blocks. They are provided for compatibility with the Queue class.
|
|||
static PyObject *
|
||||
_queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
|
||||
int block, PyObject *timeout)
|
||||
/*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/
|
||||
/*[clinic end generated code: output=4333136e88f90d8b input=a16dbb33363c0fa8]*/
|
||||
{
|
||||
/* BEGIN GIL-protected critical section */
|
||||
if (RingBuf_Put(&self->buf, item) < 0)
|
||||
return NULL;
|
||||
if (self->locked) {
|
||||
/* A get() may be waiting, wake it up */
|
||||
self->locked = 0;
|
||||
PyThread_release_lock(self->lock);
|
||||
HandoffData data = {
|
||||
.handed_off = 0,
|
||||
.item = Py_NewRef(item),
|
||||
.queue = self,
|
||||
};
|
||||
if (self->has_threads_waiting) {
|
||||
// Try to hand the item off directly if there are threads waiting
|
||||
_PyParkingLot_Unpark(&self->has_threads_waiting,
|
||||
(_Py_unpark_fn_t *)maybe_handoff_item, &data);
|
||||
}
|
||||
if (!data.handed_off) {
|
||||
if (RingBuf_Put(&self->buf, item) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
/* END GIL-protected critical section */
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
/*[clinic input]
|
||||
@critical_section
|
||||
_queue.SimpleQueue.put_nowait
|
||||
item: object
|
||||
|
||||
|
@ -307,12 +331,23 @@ for compatibility with the Queue class.
|
|||
|
||||
static PyObject *
|
||||
_queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item)
|
||||
/*[clinic end generated code: output=0990536715efb1f1 input=36b1ea96756b2ece]*/
|
||||
/*[clinic end generated code: output=0990536715efb1f1 input=ce949cc2cd8a4119]*/
|
||||
{
|
||||
return _queue_SimpleQueue_put_impl(self, item, 0, Py_None);
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
empty_error(PyTypeObject *cls)
|
||||
{
|
||||
PyObject *module = PyType_GetModule(cls);
|
||||
assert(module != NULL);
|
||||
simplequeue_state *state = simplequeue_get_state(module);
|
||||
PyErr_SetNone(state->EmptyError);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*[clinic input]
|
||||
@critical_section
|
||||
_queue.SimpleQueue.get
|
||||
|
||||
cls: defining_class
|
||||
|
@ -335,23 +370,15 @@ in that case).
|
|||
static PyObject *
|
||||
_queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
|
||||
int block, PyObject *timeout_obj)
|
||||
/*[clinic end generated code: output=5c2cca914cd1e55b input=5b4047bfbc645ec1]*/
|
||||
/*[clinic end generated code: output=5c2cca914cd1e55b input=f7836c65e5839c51]*/
|
||||
{
|
||||
_PyTime_t endtime = 0;
|
||||
_PyTime_t timeout;
|
||||
PyObject *item;
|
||||
PyLockStatus r;
|
||||
PY_TIMEOUT_T microseconds;
|
||||
PyThreadState *tstate = PyThreadState_Get();
|
||||
|
||||
// XXX Use PyThread_ParseTimeoutArg().
|
||||
|
||||
if (block == 0) {
|
||||
/* Non-blocking */
|
||||
microseconds = 0;
|
||||
}
|
||||
else if (timeout_obj != Py_None) {
|
||||
if (block != 0 && !Py_IsNone(timeout_obj)) {
|
||||
/* With timeout */
|
||||
_PyTime_t timeout;
|
||||
if (_PyTime_FromSecondsObject(&timeout,
|
||||
timeout_obj, _PyTime_ROUND_CEILING) < 0) {
|
||||
return NULL;
|
||||
|
@ -361,65 +388,64 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
|
|||
"'timeout' must be a non-negative number");
|
||||
return NULL;
|
||||
}
|
||||
microseconds = _PyTime_AsMicroseconds(timeout,
|
||||
_PyTime_ROUND_CEILING);
|
||||
if (microseconds > PY_TIMEOUT_MAX) {
|
||||
PyErr_SetString(PyExc_OverflowError,
|
||||
"timeout value is too large");
|
||||
return NULL;
|
||||
}
|
||||
endtime = _PyDeadline_Init(timeout);
|
||||
}
|
||||
else {
|
||||
/* Infinitely blocking */
|
||||
microseconds = -1;
|
||||
}
|
||||
|
||||
/* put() signals the queue to be non-empty by releasing the lock.
|
||||
* So we simply try to acquire the lock in a loop, until the condition
|
||||
* (queue non-empty) becomes true.
|
||||
*/
|
||||
while (RingBuf_IsEmpty(&self->buf)) {
|
||||
/* First a simple non-blocking try without releasing the GIL */
|
||||
r = PyThread_acquire_lock_timed(self->lock, 0, 0);
|
||||
if (r == PY_LOCK_FAILURE && microseconds != 0) {
|
||||
Py_BEGIN_ALLOW_THREADS
|
||||
r = PyThread_acquire_lock_timed(self->lock, microseconds, 1);
|
||||
Py_END_ALLOW_THREADS
|
||||
for (;;) {
|
||||
if (!RingBuf_IsEmpty(&self->buf)) {
|
||||
return RingBuf_Get(&self->buf);
|
||||
}
|
||||
|
||||
if (r == PY_LOCK_INTR && _PyEval_MakePendingCalls(tstate) < 0) {
|
||||
return NULL;
|
||||
if (!block) {
|
||||
return empty_error(cls);
|
||||
}
|
||||
if (r == PY_LOCK_FAILURE) {
|
||||
PyObject *module = PyType_GetModule(cls);
|
||||
simplequeue_state *state = simplequeue_get_state(module);
|
||||
/* Timed out */
|
||||
PyErr_SetNone(state->EmptyError);
|
||||
return NULL;
|
||||
}
|
||||
self->locked = 1;
|
||||
|
||||
/* Adjust timeout for next iteration (if any) */
|
||||
if (microseconds > 0) {
|
||||
timeout = _PyDeadline_Get(endtime);
|
||||
microseconds = _PyTime_AsMicroseconds(timeout,
|
||||
_PyTime_ROUND_CEILING);
|
||||
int64_t timeout_ns = -1;
|
||||
if (endtime != 0) {
|
||||
timeout_ns = _PyDeadline_Get(endtime);
|
||||
if (timeout_ns < 0) {
|
||||
return empty_error(cls);
|
||||
}
|
||||
}
|
||||
|
||||
bool waiting = 1;
|
||||
self->has_threads_waiting = waiting;
|
||||
|
||||
PyObject *item = NULL;
|
||||
int st = _PyParkingLot_Park(&self->has_threads_waiting, &waiting,
|
||||
sizeof(bool), timeout_ns, &item,
|
||||
/* detach */ 1);
|
||||
switch (st) {
|
||||
case Py_PARK_OK: {
|
||||
assert(item != NULL);
|
||||
return item;
|
||||
}
|
||||
case Py_PARK_TIMEOUT: {
|
||||
return empty_error(cls);
|
||||
}
|
||||
case Py_PARK_INTR: {
|
||||
// Interrupted
|
||||
if (Py_MakePendingCalls() < 0) {
|
||||
return NULL;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Py_PARK_AGAIN: {
|
||||
// This should be impossible with the current implementation of
|
||||
// PyParkingLot, but would be possible if critical sections /
|
||||
// the GIL were released before the thread was added to the
|
||||
// internal thread queue in the parking lot.
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
Py_UNREACHABLE();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* BEGIN GIL-protected critical section */
|
||||
item = RingBuf_Get(&self->buf);
|
||||
if (self->locked) {
|
||||
PyThread_release_lock(self->lock);
|
||||
self->locked = 0;
|
||||
}
|
||||
/* END GIL-protected critical section */
|
||||
|
||||
return item;
|
||||
}
|
||||
|
||||
/*[clinic input]
|
||||
@critical_section
|
||||
_queue.SimpleQueue.get_nowait
|
||||
|
||||
cls: defining_class
|
||||
|
@ -434,12 +460,13 @@ raise the Empty exception.
|
|||
static PyObject *
|
||||
_queue_SimpleQueue_get_nowait_impl(simplequeueobject *self,
|
||||
PyTypeObject *cls)
|
||||
/*[clinic end generated code: output=620c58e2750f8b8a input=842f732bf04216d3]*/
|
||||
/*[clinic end generated code: output=620c58e2750f8b8a input=d48be63633fefae9]*/
|
||||
{
|
||||
return _queue_SimpleQueue_get_impl(self, cls, 0, Py_None);
|
||||
}
|
||||
|
||||
/*[clinic input]
|
||||
@critical_section
|
||||
_queue.SimpleQueue.empty -> bool
|
||||
|
||||
Return True if the queue is empty, False otherwise (not reliable!).
|
||||
|
@ -447,12 +474,13 @@ Return True if the queue is empty, False otherwise (not reliable!).
|
|||
|
||||
static int
|
||||
_queue_SimpleQueue_empty_impl(simplequeueobject *self)
|
||||
/*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/
|
||||
/*[clinic end generated code: output=1a02a1b87c0ef838 input=96cb22df5a67d831]*/
|
||||
{
|
||||
return RingBuf_IsEmpty(&self->buf);
|
||||
}
|
||||
|
||||
/*[clinic input]
|
||||
@critical_section
|
||||
_queue.SimpleQueue.qsize -> Py_ssize_t
|
||||
|
||||
Return the approximate size of the queue (not reliable!).
|
||||
|
@ -460,7 +488,7 @@ Return the approximate size of the queue (not reliable!).
|
|||
|
||||
static Py_ssize_t
|
||||
_queue_SimpleQueue_qsize_impl(simplequeueobject *self)
|
||||
/*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/
|
||||
/*[clinic end generated code: output=f9dcd9d0a90e121e input=e218623cb8c16a79]*/
|
||||
{
|
||||
return RingBuf_Len(&self->buf);
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue