Review Python thread state management in blocking pools (#561)

This commit is contained in:
Giovanni Barillari 2025-04-11 18:09:55 +02:00 committed by GitHub
parent 1ebedeed37
commit d9fe22cf35
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -60,11 +60,7 @@ impl BlockingRunnerMono {
pub fn new() -> Self {
let (qtx, qrx) = channel::unbounded();
let ret = Self { queue: qtx };
#[cfg(not(Py_GIL_DISABLED))]
thread::spawn(move || blocking_worker(qrx));
#[cfg(Py_GIL_DISABLED)]
thread::spawn(move || Python::with_gil(|py| blocking_worker(qrx, py)));
ret
}
@ -104,10 +100,7 @@ impl BlockingRunnerPool {
};
// always spawn the first thread
#[cfg(not(Py_GIL_DISABLED))]
thread::spawn(move || blocking_worker(qrx));
#[cfg(Py_GIL_DISABLED)]
thread::spawn(move || Python::with_gil(|py| blocking_worker(qrx, py)));
ret
}
@ -129,20 +122,11 @@ impl BlockingRunnerPool {
let queue = self.tq.clone();
let tcount = self.threads.clone();
let timeout = self.idle_timeout;
#[cfg(not(Py_GIL_DISABLED))]
thread::spawn(move || {
tcount.fetch_add(1, atomic::Ordering::Release);
blocking_worker_idle(queue, timeout);
tcount.fetch_sub(1, atomic::Ordering::Release);
});
#[cfg(Py_GIL_DISABLED)]
thread::spawn(move || {
Python::with_gil(|py| {
tcount.fetch_add(1, atomic::Ordering::Release);
blocking_worker_idle(queue, timeout, py);
tcount.fetch_sub(1, atomic::Ordering::Release);
});
});
self.spawn_tick
.store(self.birth.elapsed().as_micros() as u64, atomic::Ordering::Relaxed);
@ -162,37 +146,18 @@ impl BlockingRunnerPool {
}
}
#[cfg(not(Py_GIL_DISABLED))]
fn blocking_worker(queue: channel::Receiver<BlockingTask>) {
while let Ok(task) = queue.recv() {
Python::with_gil(|py| task.run(py));
}
Python::with_gil(|py| {
while let Ok(task) = py.allow_threads(|| queue.recv()) {
task.run(py);
}
});
}
#[cfg(Py_GIL_DISABLED)]
fn blocking_worker(queue: channel::Receiver<BlockingTask>, py: Python) {
while let Ok(task) = queue.recv() {
task.run(py);
}
}
#[cfg(not(Py_GIL_DISABLED))]
fn blocking_worker_idle(queue: channel::Receiver<BlockingTask>, timeout: time::Duration) {
while let Ok(task) = queue.recv_timeout(timeout) {
Python::with_gil(|py| task.run(py));
}
}
// NOTE: for some reason, on no-gil callback watchers are not GCd until following req.
// This seems to cause the underlying function to hang on exiting the
// `Python::with_gil` block under some circumstances (seems correlated to no of threads).
// It's not clear atm wether this is an issue with pyo3, CPython itself, or smth
// different in terms of pointers due to multi-threaded environment.
// Given the whole free-threaded Python support is experimental, we avoid strange
// hacks and wait for additional info.
#[cfg(Py_GIL_DISABLED)]
fn blocking_worker_idle(queue: channel::Receiver<BlockingTask>, timeout: time::Duration, py: Python) {
while let Ok(task) = queue.recv_timeout(timeout) {
task.run(py);
}
Python::with_gil(|py| {
while let Ok(task) = py.allow_threads(|| queue.recv_timeout(timeout)) {
task.run(py);
}
});
}