gh-101659: Add _Py_AtExit() (gh-103298)

The function is like Py_AtExit() but for a single interpreter.  This is a companion to the atexit module's register() function, taking a C callback instead of a Python one.

We also update the _xxinterpchannels module to use _Py_AtExit(), which is the motivating case.  (This is inspired by pain points felt while working on gh-101660.)
This commit is contained in:
Eric Snow 2023-04-05 18:42:02 -06:00 committed by GitHub
parent 4ec8dd10bd
commit 03089fdccc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 268 additions and 67 deletions

View file

@ -174,19 +174,7 @@ _release_xid_data(_PyCrossInterpreterData *data, int ignoreexc)
}
int res = _PyCrossInterpreterData_Release(data);
if (res < 0) {
// XXX Fix this!
/* The owning interpreter is already destroyed.
* Ideally, this shouldn't ever happen. When an interpreter is
* about to be destroyed, we should clear out all of its objects
* from every channel associated with that interpreter.
* For now we hack around that to resolve refleaks, by decref'ing
* the released object here, even if its the wrong interpreter.
* The owning interpreter has already been destroyed
* so we should be okay, especially since the currently
* shareable types are all very basic, with no GC.
* That said, it becomes much messier once interpreters
* no longer share a GIL, so this needs to be fixed before then. */
_PyCrossInterpreterData_Clear(NULL, data);
/* The owning interpreter is already destroyed. */
if (ignoreexc) {
// XXX Emit a warning?
PyErr_Clear();
@ -489,6 +477,30 @@ _channelqueue_get(_channelqueue *queue)
return _channelitem_popped(item);
}
static void
_channelqueue_drop_interpreter(_channelqueue *queue, int64_t interp)
{
_channelitem *prev = NULL;
_channelitem *next = queue->first;
while (next != NULL) {
_channelitem *item = next;
next = item->next;
if (item->data->interp == interp) {
if (prev == NULL) {
queue->first = item->next;
}
else {
prev->next = item->next;
}
_channelitem_free(item);
queue->count -= 1;
}
else {
prev = item;
}
}
}
/* channel-interpreter associations */
struct _channelend;
@ -693,6 +705,20 @@ _channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
return 0;
}
static void
_channelends_drop_interpreter(_channelends *ends, int64_t interp)
{
_channelend *end;
end = _channelend_find(ends->send, interp, NULL);
if (end != NULL) {
_channelends_close_end(ends, end, 1);
}
end = _channelend_find(ends->recv, interp, NULL);
if (end != NULL) {
_channelends_close_end(ends, end, 0);
}
}
static void
_channelends_close_all(_channelends *ends, int which, int force)
{
@ -841,6 +867,18 @@ done:
return res;
}
static void
_channel_drop_interpreter(_PyChannelState *chan, int64_t interp)
{
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
_channelqueue_drop_interpreter(chan->queue, interp);
_channelends_drop_interpreter(chan->ends, interp);
chan->open = _channelends_is_open(chan->ends);
PyThread_release_lock(chan->mutex);
}
static int
_channel_close_all(_PyChannelState *chan, int end, int force)
{
@ -1213,6 +1251,21 @@ done:
return cids;
}
static void
_channels_drop_interpreter(_channels *channels, int64_t interp)
{
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
_channelref *ref = channels->head;
for (; ref != NULL; ref = ref->next) {
if (ref->chan != NULL) {
_channel_drop_interpreter(ref->chan, interp);
}
}
PyThread_release_lock(channels->mutex);
}
/* support for closing non-empty channels */
struct _channel_closing {
@ -1932,6 +1985,19 @@ _global_channels(void) {
}
static void
clear_interpreter(void *data)
{
if (_globals.module_count == 0) {
return;
}
PyInterpreterState *interp = (PyInterpreterState *)data;
assert(interp == _get_current_interp());
int64_t id = PyInterpreterState_GetID(interp);
_channels_drop_interpreter(&_globals.channels, id);
}
static PyObject *
channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
{
@ -2339,6 +2405,10 @@ module_exec(PyObject *mod)
goto error;
}
// Make sure chnnels drop objects owned by this interpreter
PyInterpreterState *interp = _get_current_interp();
_Py_AtExit(interp, clear_interpreter, (void *)interp);
return 0;
error: