gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-116431)

Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str).  Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object.

When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter.

Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed.  (See my comment on that PR.) 
 It took me a little while to realize what was going on.  I expect that users, which much less context than I have, would experience the same pain.

My approach, here, to improving the situation is to give users three options:

1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item
2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item
3. existing behavior: silently remove each item (i.e. Queue.get() skips each one)

The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior.

The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created.  (This is the same as I did for "synconly".)
This commit is contained in:
Eric Snow 2024-07-15 12:49:23 -06:00 committed by GitHub
parent 985dd8e17b
commit 6b98b274b6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 513 additions and 90 deletions

View file

@ -58,6 +58,19 @@ _release_xid_data(_PyCrossInterpreterData *data, int flags)
return res;
}
static inline int64_t
_get_interpid(_PyCrossInterpreterData *data)
{
int64_t interpid;
if (data != NULL) {
interpid = _PyCrossInterpreterData_INTERPID(data);
assert(!PyErr_Occurred());
}
else {
interpid = PyInterpreterState_GetID(PyInterpreterState_Get());
}
return interpid;
}
static PyInterpreterState *
_get_current_interp(void)
@ -389,47 +402,98 @@ handle_queue_error(int err, PyObject *mod, int64_t qid)
}
/* unbound items ************************************************************/
#define UNBOUND_REMOVE 1
#define UNBOUND_ERROR 2
#define UNBOUND_REPLACE 3
// It would also be possible to add UNBOUND_REPLACE where the replacement
// value is user-provided. There would be some limitations there, though.
// Another possibility would be something like UNBOUND_COPY, where the
// object is released but the underlying data is copied (with the "raw"
// allocator) and used when the item is popped off the queue.
static int
check_unbound(int unboundop)
{
switch (unboundop) {
case UNBOUND_REMOVE:
case UNBOUND_ERROR:
case UNBOUND_REPLACE:
return 1;
default:
return 0;
}
}
/* the basic queue **********************************************************/
struct _queueitem;
typedef struct _queueitem {
/* The interpreter that added the item to the queue.
The actual bound interpid is found in item->data.
This is necessary because item->data might be NULL,
meaning the interpreter has been destroyed. */
int64_t interpid;
_PyCrossInterpreterData *data;
int fmt;
int unboundop;
struct _queueitem *next;
} _queueitem;
static void
_queueitem_init(_queueitem *item,
_PyCrossInterpreterData *data, int fmt)
int64_t interpid, _PyCrossInterpreterData *data,
int fmt, int unboundop)
{
if (interpid < 0) {
interpid = _get_interpid(data);
}
else {
assert(data == NULL
|| _PyCrossInterpreterData_INTERPID(data) < 0
|| interpid == _PyCrossInterpreterData_INTERPID(data));
}
assert(check_unbound(unboundop));
*item = (_queueitem){
.interpid = interpid,
.data = data,
.fmt = fmt,
.unboundop = unboundop,
};
}
static void
_queueitem_clear_data(_queueitem *item)
{
if (item->data == NULL) {
return;
}
// It was allocated in queue_put().
(void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
item->data = NULL;
}
static void
_queueitem_clear(_queueitem *item)
{
item->next = NULL;
if (item->data != NULL) {
// It was allocated in queue_put().
(void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
item->data = NULL;
}
_queueitem_clear_data(item);
}
static _queueitem *
_queueitem_new(_PyCrossInterpreterData *data, int fmt)
_queueitem_new(int64_t interpid, _PyCrossInterpreterData *data,
int fmt, int unboundop)
{
_queueitem *item = GLOBAL_MALLOC(_queueitem);
if (item == NULL) {
PyErr_NoMemory();
return NULL;
}
_queueitem_init(item, data, fmt);
_queueitem_init(item, interpid, data, fmt, unboundop);
return item;
}
@ -452,15 +516,44 @@ _queueitem_free_all(_queueitem *item)
static void
_queueitem_popped(_queueitem *item,
_PyCrossInterpreterData **p_data, int *p_fmt)
_PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop)
{
*p_data = item->data;
*p_fmt = item->fmt;
*p_unboundop = item->unboundop;
// We clear them here, so they won't be released in _queueitem_clear().
item->data = NULL;
_queueitem_free(item);
}
static int
_queueitem_clear_interpreter(_queueitem *item)
{
assert(item->interpid >= 0);
if (item->data == NULL) {
// Its interpreter was already cleared (or it was never bound).
// For UNBOUND_REMOVE it should have been freed at that time.
assert(item->unboundop != UNBOUND_REMOVE);
return 0;
}
assert(_PyCrossInterpreterData_INTERPID(item->data) == item->interpid);
switch (item->unboundop) {
case UNBOUND_REMOVE:
// The caller must free/clear it.
return 1;
case UNBOUND_ERROR:
case UNBOUND_REPLACE:
// We won't need the cross-interpreter data later
// so we completely throw it away.
_queueitem_clear_data(item);
return 0;
default:
Py_FatalError("not reachable");
return -1;
}
}
/* the queue */
@ -474,12 +567,16 @@ typedef struct _queue {
_queueitem *first;
_queueitem *last;
} items;
int fmt;
struct {
int fmt;
int unboundop;
} defaults;
} _queue;
static int
_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt)
_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop)
{
assert(check_unbound(unboundop));
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
return ERR_QUEUE_ALLOC;
@ -490,7 +587,10 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt)
.items = {
.maxsize = maxsize,
},
.fmt = fmt,
.defaults = {
.fmt = fmt,
.unboundop = unboundop,
},
};
return 0;
}
@ -571,7 +671,8 @@ _queue_unlock(_queue *queue)
}
static int
_queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt)
_queue_add(_queue *queue, int64_t interpid, _PyCrossInterpreterData *data,
int fmt, int unboundop)
{
int err = _queue_lock(queue);
if (err < 0) {
@ -587,7 +688,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt)
return ERR_QUEUE_FULL;
}
_queueitem *item = _queueitem_new(data, fmt);
_queueitem *item = _queueitem_new(interpid, data, fmt, unboundop);
if (item == NULL) {
_queue_unlock(queue);
return -1;
@ -608,7 +709,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt)
static int
_queue_next(_queue *queue,
_PyCrossInterpreterData **p_data, int *p_fmt)
_PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop)
{
int err = _queue_lock(queue);
if (err < 0) {
@ -627,7 +728,7 @@ _queue_next(_queue *queue,
}
queue->items.count -= 1;
_queueitem_popped(item, p_data, p_fmt);
_queueitem_popped(item, p_data, p_fmt, p_unboundop);
_queue_unlock(queue);
return 0;
@ -692,14 +793,17 @@ _queue_clear_interpreter(_queue *queue, int64_t interpid)
while (next != NULL) {
_queueitem *item = next;
next = item->next;
if (_PyCrossInterpreterData_INTERPID(item->data) == interpid) {
int remove = (item->interpid == interpid)
? _queueitem_clear_interpreter(item)
: 0;
if (remove) {
_queueitem_free(item);
if (prev == NULL) {
queue->items.first = item->next;
queue->items.first = next;
}
else {
prev->next = item->next;
prev->next = next;
}
_queueitem_free(item);
queue->items.count -= 1;
}
else {
@ -966,18 +1070,19 @@ finally:
return res;
}
struct queue_id_and_fmt {
struct queue_id_and_info {
int64_t id;
int fmt;
int unboundop;
};
static struct queue_id_and_fmt *
_queues_list_all(_queues *queues, int64_t *count)
static struct queue_id_and_info *
_queues_list_all(_queues *queues, int64_t *p_count)
{
struct queue_id_and_fmt *qids = NULL;
struct queue_id_and_info *qids = NULL;
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
struct queue_id_and_fmt *ids = PyMem_NEW(struct queue_id_and_fmt,
(Py_ssize_t)(queues->count));
struct queue_id_and_info *ids = PyMem_NEW(struct queue_id_and_info,
(Py_ssize_t)(queues->count));
if (ids == NULL) {
goto done;
}
@ -985,9 +1090,10 @@ _queues_list_all(_queues *queues, int64_t *count)
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
ids[i].id = ref->qid;
assert(ref->queue != NULL);
ids[i].fmt = ref->queue->fmt;
ids[i].fmt = ref->queue->defaults.fmt;
ids[i].unboundop = ref->queue->defaults.unboundop;
}
*count = queues->count;
*p_count = queues->count;
qids = ids;
done:
@ -1021,13 +1127,13 @@ _queue_free(_queue *queue)
// Create a new queue.
static int64_t
queue_create(_queues *queues, Py_ssize_t maxsize, int fmt)
queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop)
{
_queue *queue = GLOBAL_MALLOC(_queue);
if (queue == NULL) {
return ERR_QUEUE_ALLOC;
}
int err = _queue_init(queue, maxsize, fmt);
int err = _queue_init(queue, maxsize, fmt, unboundop);
if (err < 0) {
GLOBAL_FREE(queue);
return (int64_t)err;
@ -1056,7 +1162,7 @@ queue_destroy(_queues *queues, int64_t qid)
// Push an object onto the queue.
static int
queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt)
queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
{
// Look up the queue.
_queue *queue = NULL;
@ -1077,9 +1183,12 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt)
GLOBAL_FREE(data);
return -1;
}
assert(_PyCrossInterpreterData_INTERPID(data) == \
PyInterpreterState_GetID(PyInterpreterState_Get()));
// Add the data to the queue.
int res = _queue_add(queue, data, fmt);
int64_t interpid = -1; // _queueitem_init() will set it.
int res = _queue_add(queue, interpid, data, fmt, unboundop);
_queue_unmark_waiter(queue, queues->mutex);
if (res != 0) {
// We may chain an exception here:
@ -1094,7 +1203,8 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt)
// Pop the next object off the queue. Fail if empty.
// XXX Support a "wait" mutex?
static int
queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt)
queue_get(_queues *queues, int64_t qid,
PyObject **res, int *p_fmt, int *p_unboundop)
{
int err;
*res = NULL;
@ -1110,7 +1220,7 @@ queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt)
// Pop off the next item from the queue.
_PyCrossInterpreterData *data = NULL;
err = _queue_next(queue, &data, p_fmt);
err = _queue_next(queue, &data, p_fmt, p_unboundop);
_queue_unmark_waiter(queue, queues->mutex);
if (err != 0) {
return err;
@ -1397,15 +1507,22 @@ qidarg_converter(PyObject *arg, void *ptr)
static PyObject *
queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"maxsize", "fmt", NULL};
static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL};
Py_ssize_t maxsize;
int fmt;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "ni:create", kwlist,
&maxsize, &fmt)) {
int unboundop;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist,
&maxsize, &fmt, &unboundop))
{
return NULL;
}
if (!check_unbound(unboundop)) {
PyErr_Format(PyExc_ValueError,
"unsupported unboundop %d", unboundop);
return NULL;
}
int64_t qid = queue_create(&_globals.queues, maxsize, fmt);
int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop);
if (qid < 0) {
(void)handle_queue_error((int)qid, self, qid);
return NULL;
@ -1427,7 +1544,7 @@ queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
}
PyDoc_STRVAR(queuesmod_create_doc,
"create(maxsize, fmt) -> qid\n\
"create(maxsize, fmt, unboundop) -> qid\n\
\n\
Create a new cross-interpreter queue and return its unique generated ID.\n\
It is a new reference as though bind() had been called on the queue.\n\
@ -1463,9 +1580,9 @@ static PyObject *
queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
{
int64_t count = 0;
struct queue_id_and_fmt *qids = _queues_list_all(&_globals.queues, &count);
struct queue_id_and_info *qids = _queues_list_all(&_globals.queues, &count);
if (qids == NULL) {
if (count == 0) {
if (!PyErr_Occurred() && count == 0) {
return PyList_New(0);
}
return NULL;
@ -1474,9 +1591,10 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
if (ids == NULL) {
goto finally;
}
struct queue_id_and_fmt *cur = qids;
struct queue_id_and_info *cur = qids;
for (int64_t i=0; i < count; cur++, i++) {
PyObject *item = Py_BuildValue("Li", cur->id, cur->fmt);
PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt,
cur->unboundop);
if (item == NULL) {
Py_SETREF(ids, NULL);
break;
@ -1498,18 +1616,26 @@ Each corresponding default format is also included.");
static PyObject *
queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"qid", "obj", "fmt", NULL};
static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL};
qidarg_converter_data qidarg;
PyObject *obj;
int fmt;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oi:put", kwlist,
qidarg_converter, &qidarg, &obj, &fmt)) {
int unboundop;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist,
qidarg_converter, &qidarg, &obj, &fmt,
&unboundop))
{
return NULL;
}
int64_t qid = qidarg.id;
if (!check_unbound(unboundop)) {
PyErr_Format(PyExc_ValueError,
"unsupported unboundop %d", unboundop);
return NULL;
}
/* Queue up the object. */
int err = queue_put(&_globals.queues, qid, obj, fmt);
int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop);
// This is the only place that raises QueueFull.
if (handle_queue_error(err, self, qid)) {
return NULL;
@ -1536,13 +1662,17 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds)
PyObject *obj = NULL;
int fmt = 0;
int err = queue_get(&_globals.queues, qid, &obj, &fmt);
int unboundop = 0;
int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop);
// This is the only place that raises QueueEmpty.
if (handle_queue_error(err, self, qid)) {
return NULL;
}
PyObject *res = Py_BuildValue("Oi", obj, fmt);
if (obj == NULL) {
return Py_BuildValue("Oii", Py_None, fmt, unboundop);
}
PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None);
Py_DECREF(obj);
return res;
}
@ -1656,17 +1786,12 @@ queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds)
if (handle_queue_error(err, self, qid)) {
return NULL;
}
int fmt = queue->fmt;
int fmt = queue->defaults.fmt;
int unboundop = queue->defaults.unboundop;
_queue_unmark_waiter(queue, _globals.queues.mutex);
PyObject *fmt_obj = PyLong_FromLong(fmt);
if (fmt_obj == NULL) {
return NULL;
}
// For now queues only have one default.
PyObject *res = PyTuple_Pack(1, fmt_obj);
Py_DECREF(fmt_obj);
return res;
PyObject *defaults = Py_BuildValue("ii", fmt, unboundop);
return defaults;
}
PyDoc_STRVAR(queuesmod_get_queue_defaults_doc,