mirror of
https://github.com/python/cpython.git
synced 2025-08-04 17:08:35 +00:00
gh-130794: Process interpreter QSBR queue in _PyMem_AbandonDelayed. (gh-130808)
This avoids a case where the interpreter's queue of memory to be freed could grow rapidly if there are many short lived threads.
This commit is contained in:
parent
cb67b44ca9
commit
2f6e0e9f70
3 changed files with 25 additions and 9 deletions
|
@ -0,0 +1,2 @@
|
|||
Fix memory leak in the :term:`free threaded <free threading>` build when
|
||||
resizing a shared list or dictionary from multiple short-lived threads.
|
|
@ -1303,6 +1303,18 @@ static void
|
|||
process_interp_queue(struct _Py_mem_interp_free_queue *queue,
|
||||
struct _qsbr_thread_state *qsbr, delayed_dealloc_cb cb,
|
||||
void *state)
|
||||
{
|
||||
assert(PyMutex_IsLocked(&queue->mutex));
|
||||
process_queue(&queue->head, qsbr, false, cb, state);
|
||||
|
||||
int more_work = !llist_empty(&queue->head);
|
||||
_Py_atomic_store_int_relaxed(&queue->has_work, more_work);
|
||||
}
|
||||
|
||||
static void
|
||||
maybe_process_interp_queue(struct _Py_mem_interp_free_queue *queue,
|
||||
struct _qsbr_thread_state *qsbr, delayed_dealloc_cb cb,
|
||||
void *state)
|
||||
{
|
||||
if (!_Py_atomic_load_int_relaxed(&queue->has_work)) {
|
||||
return;
|
||||
|
@ -1310,11 +1322,7 @@ process_interp_queue(struct _Py_mem_interp_free_queue *queue,
|
|||
|
||||
// Try to acquire the lock, but don't block if it's already held.
|
||||
if (_PyMutex_LockTimed(&queue->mutex, 0, 0) == PY_LOCK_ACQUIRED) {
|
||||
process_queue(&queue->head, qsbr, false, cb, state);
|
||||
|
||||
int more_work = !llist_empty(&queue->head);
|
||||
_Py_atomic_store_int_relaxed(&queue->has_work, more_work);
|
||||
|
||||
process_interp_queue(queue, qsbr, cb, state);
|
||||
PyMutex_Unlock(&queue->mutex);
|
||||
}
|
||||
}
|
||||
|
@ -1329,7 +1337,7 @@ _PyMem_ProcessDelayed(PyThreadState *tstate)
|
|||
process_queue(&tstate_impl->mem_free_queue, tstate_impl->qsbr, true, NULL, NULL);
|
||||
|
||||
// Process shared interpreter work
|
||||
process_interp_queue(&interp->mem_free_queue, tstate_impl->qsbr, NULL, NULL);
|
||||
maybe_process_interp_queue(&interp->mem_free_queue, tstate_impl->qsbr, NULL, NULL);
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -1342,7 +1350,7 @@ _PyMem_ProcessDelayedNoDealloc(PyThreadState *tstate, delayed_dealloc_cb cb, voi
|
|||
process_queue(&tstate_impl->mem_free_queue, tstate_impl->qsbr, true, cb, state);
|
||||
|
||||
// Process shared interpreter work
|
||||
process_interp_queue(&interp->mem_free_queue, tstate_impl->qsbr, cb, state);
|
||||
maybe_process_interp_queue(&interp->mem_free_queue, tstate_impl->qsbr, cb, state);
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -1364,10 +1372,15 @@ _PyMem_AbandonDelayed(PyThreadState *tstate)
|
|||
return;
|
||||
}
|
||||
|
||||
// Merge the thread's work queue into the interpreter's work queue.
|
||||
PyMutex_Lock(&interp->mem_free_queue.mutex);
|
||||
|
||||
// Merge the thread's work queue into the interpreter's work queue.
|
||||
llist_concat(&interp->mem_free_queue.head, queue);
|
||||
_Py_atomic_store_int_relaxed(&interp->mem_free_queue.has_work, 1);
|
||||
|
||||
// Process the merged queue now (see gh-130794).
|
||||
_PyThreadStateImpl *this_tstate = (_PyThreadStateImpl *)_PyThreadState_GET();
|
||||
process_interp_queue(&interp->mem_free_queue, this_tstate->qsbr, NULL, NULL);
|
||||
|
||||
PyMutex_Unlock(&interp->mem_free_queue.mutex);
|
||||
|
||||
assert(llist_empty(queue)); // the thread's queue is now empty
|
||||
|
|
|
@ -161,6 +161,7 @@ bool
|
|||
_Py_qsbr_poll(struct _qsbr_thread_state *qsbr, uint64_t goal)
|
||||
{
|
||||
assert(_Py_atomic_load_int_relaxed(&_PyThreadState_GET()->state) == _Py_THREAD_ATTACHED);
|
||||
assert(((_PyThreadStateImpl *)_PyThreadState_GET())->qsbr == qsbr);
|
||||
|
||||
if (_Py_qbsr_goal_reached(qsbr, goal)) {
|
||||
return true;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue