gh-115103: Implement delayed free mechanism for free-threaded builds (#115367)

This adds `_PyMem_FreeDelayed()` and supporting functions. The
`_PyMem_FreeDelayed()` function frees memory with the same allocator as
`PyMem_Free()`, but defers the actual free until the QSBR (quiescent state
based reclamation) machinery indicates that concurrent lock-free readers
have finished with the memory.
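
As a sketch of the usage pattern this enables (illustrative only: the
`table`/`entry` structures and both functions below are hypothetical, not
part of this commit):

    struct entry { int value; };
    struct table { struct entry *entry; };

    // A writer swaps in a new entry and retires the old one. Concurrent
    // lock-free readers may still hold the old pointer, so it must not be
    // freed immediately.
    static void
    replace_entry(struct table *t, struct entry *new_entry)
    {
        struct entry *old = _Py_atomic_load_ptr_relaxed(&t->entry);
        _Py_atomic_store_ptr_release(&t->entry, new_entry);
        _PyMem_FreeDelayed(old);  // freed once concurrent readers quiesce
    }

    // A reader takes no lock: QSBR keeps the entry alive at least until
    // this thread passes its next quiescent state.
    static struct entry *
    lookup_entry(struct table *t)
    {
        return _Py_atomic_load_ptr_acquire(&t->entry);
    }
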
Author: Sam Gross
Committed by: GitHub
Date: 2024-02-20 13:04:37 -05:00
Parent: d207c7cd5a
Commit: e3ad6ca56f
8 changed files with 226 additions and 0 deletions

@@ -948,6 +948,196 @@ _PyMem_Strdup(const char *str)
    return copy;
}

/***********************************************/
/* Delayed freeing support for Py_GIL_DISABLED */
/***********************************************/

// So that sizeof(struct _mem_work_chunk) is 4096 bytes on 64-bit platforms.
#define WORK_ITEMS_PER_CHUNK 254

// A pointer to be freed once the QSBR read sequence reaches qsbr_goal.
struct _mem_work_item {
    void *ptr;
    uint64_t qsbr_goal;
};

// A fixed-size buffer of pointers to be freed
struct _mem_work_chunk {
    // Linked list node of chunks in queue
    struct llist_node node;

    Py_ssize_t rd_idx;  // index of next item to read
    Py_ssize_t wr_idx;  // index of next item to write
    struct _mem_work_item array[WORK_ITEMS_PER_CHUNK];
};
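
// Sizing check, assuming 8-byte pointers and no struct padding: each
// _mem_work_item is 16 bytes (ptr + qsbr_goal), so the array occupies
// 254 * 16 = 4064 bytes; the llist_node (two pointers, 16 bytes) and the
// two Py_ssize_t indices (16 bytes) bring the total to 4064 + 32 = 4096.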

void
_PyMem_FreeDelayed(void *ptr)
{
#ifndef Py_GIL_DISABLED
    PyMem_Free(ptr);
#else
    if (_PyRuntime.stoptheworld.world_stopped) {
        // Free immediately if the world is stopped, including during
        // interpreter shutdown.
        PyMem_Free(ptr);
        return;
    }

    _PyThreadStateImpl *tstate = (_PyThreadStateImpl *)_PyThreadState_GET();
    struct llist_node *head = &tstate->mem_free_queue;

    struct _mem_work_chunk *buf = NULL;
    if (!llist_empty(head)) {
        // Try to re-use the last buffer
        buf = llist_data(head->prev, struct _mem_work_chunk, node);
        if (buf->wr_idx == WORK_ITEMS_PER_CHUNK) {
            // already full
            buf = NULL;
        }
    }

    if (buf == NULL) {
        buf = PyMem_Calloc(1, sizeof(*buf));
        if (buf != NULL) {
            llist_insert_tail(head, &buf->node);
        }
    }

    if (buf == NULL) {
        // failed to allocate a buffer, free immediately
        _PyEval_StopTheWorld(tstate->base.interp);
        PyMem_Free(ptr);
        _PyEval_StartTheWorld(tstate->base.interp);
        return;
    }

    assert(buf != NULL && buf->wr_idx < WORK_ITEMS_PER_CHUNK);
    uint64_t seq = _Py_qsbr_deferred_advance(tstate->qsbr);
    buf->array[buf->wr_idx].ptr = ptr;
    buf->array[buf->wr_idx].qsbr_goal = seq;
    buf->wr_idx++;

    if (buf->wr_idx == WORK_ITEMS_PER_CHUNK) {
        _PyMem_ProcessDelayed((PyThreadState *)tstate);
    }
#endif
}
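
// A sketch of the handshake above: _Py_qsbr_deferred_advance() returns a
// sequence value ("goal") just past the current global write sequence, and
// _Py_qsbr_poll() in process_queue() only reports that goal as reached once
// every attached thread has passed through a quiescent state after the
// pointer was retired. Until then, the item simply waits in the queue.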

static struct _mem_work_chunk *
work_queue_first(struct llist_node *head)
{
    return llist_data(head->next, struct _mem_work_chunk, node);
}

static void
process_queue(struct llist_node *head, struct _qsbr_thread_state *qsbr,
              bool keep_empty)
{
    while (!llist_empty(head)) {
        struct _mem_work_chunk *buf = work_queue_first(head);

        while (buf->rd_idx < buf->wr_idx) {
            struct _mem_work_item *item = &buf->array[buf->rd_idx];
            if (!_Py_qsbr_poll(qsbr, item->qsbr_goal)) {
                return;
            }

            PyMem_Free(item->ptr);
            buf->rd_idx++;
        }

        assert(buf->rd_idx == buf->wr_idx);
        if (keep_empty && buf->node.next == head) {
            // Keep the last buffer in the queue to reduce re-allocations
            buf->rd_idx = buf->wr_idx = 0;
            return;
        }

        llist_remove(&buf->node);
        PyMem_Free(buf);
    }
}
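
// Returning at the first unmet goal is always safe: it only delays frees,
// never performs them early. It is also cheap in the common case, since a
// single thread appends items in program order, so within its queue the
// qsbr_goal values are non-decreasing and later items would not be ready
// either.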

static void
process_interp_queue(struct _Py_mem_interp_free_queue *queue,
                     struct _qsbr_thread_state *qsbr)
{
    if (!_Py_atomic_load_int_relaxed(&queue->has_work)) {
        return;
    }

    // Try to acquire the lock, but don't block if it's already held.
    if (_PyMutex_LockTimed(&queue->mutex, 0, 0) == PY_LOCK_ACQUIRED) {
        process_queue(&queue->head, qsbr, false);

        int more_work = !llist_empty(&queue->head);
        _Py_atomic_store_int_relaxed(&queue->has_work, more_work);

        PyMutex_Unlock(&queue->mutex);
    }
}
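
// The has_work flag is a best-effort fast path: the relaxed load lets
// threads skip the mutex when the shared queue is known to be empty. A
// stale read at worst postpones processing to a later call; it cannot lose
// work, because the flag is set under the mutex whenever items are merged
// in (see _PyMem_AbandonDelayed() below).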

void
_PyMem_ProcessDelayed(PyThreadState *tstate)
{
    PyInterpreterState *interp = tstate->interp;
    _PyThreadStateImpl *tstate_impl = (_PyThreadStateImpl *)tstate;

    // Process thread-local work
    process_queue(&tstate_impl->mem_free_queue, tstate_impl->qsbr, true);

    // Process shared interpreter work
    process_interp_queue(&interp->mem_free_queue, tstate_impl->qsbr);
}
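
// _PyMem_AbandonDelayed() below handles a thread that can no longer drain
// its own queue, presumably because its thread state is being torn down
// (the call site is not part of this hunk). Pending items are handed off to
// the interpreter-wide queue so that other threads can free them via
// _PyMem_ProcessDelayed().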

void
_PyMem_AbandonDelayed(PyThreadState *tstate)
{
    PyInterpreterState *interp = tstate->interp;
    struct llist_node *queue = &((_PyThreadStateImpl *)tstate)->mem_free_queue;
    if (llist_empty(queue)) {
        return;
    }

    // Check if the queue contains one empty buffer
    struct _mem_work_chunk *buf = work_queue_first(queue);
    if (buf->rd_idx == buf->wr_idx) {
        llist_remove(&buf->node);
        PyMem_Free(buf);
        assert(llist_empty(queue));
        return;
    }

    // Merge the thread's work queue into the interpreter's work queue.
    PyMutex_Lock(&interp->mem_free_queue.mutex);
    llist_concat(&interp->mem_free_queue.head, queue);
    _Py_atomic_store_int_relaxed(&interp->mem_free_queue.has_work, 1);
    PyMutex_Unlock(&interp->mem_free_queue.mutex);

    assert(llist_empty(queue));  // the thread's queue is now empty
}

void
_PyMem_FiniDelayed(PyInterpreterState *interp)
{
    struct llist_node *head = &interp->mem_free_queue.head;
    while (!llist_empty(head)) {
        struct _mem_work_chunk *buf = work_queue_first(head);
        while (buf->rd_idx < buf->wr_idx) {
            // Free the remaining items immediately. There should be no other
            // threads accessing the memory at this point during shutdown.
            struct _mem_work_item *item = &buf->array[buf->rd_idx];
            PyMem_Free(item->ptr);
            buf->rd_idx++;
        }

        llist_remove(&buf->node);
        PyMem_Free(buf);
    }
}

/**************************/
/* the "object" allocator */