perf: Request cancellation while processing changed files

This commit is contained in:
Lukas Wirth 2025-05-07 08:44:54 +02:00
parent d7e977a8f1
commit 8d9b318a85
4 changed files with 134 additions and 78 deletions

View file

@ -17,6 +17,7 @@ jod-thread = "1.0.0"
crossbeam-channel.workspace = true
itertools.workspace = true
tracing.workspace = true
crossbeam-utils = "0.8.21"
# Think twice before adding anything here
[target.'cfg(unix)'.dependencies]

View file

@ -8,6 +8,7 @@
//! the threading utilities in [`crate::thread`].
use std::{
marker::PhantomData,
panic::{self, UnwindSafe},
sync::{
Arc,
@ -16,8 +17,9 @@ use std::{
};
use crossbeam_channel::{Receiver, Sender};
use crossbeam_utils::sync::WaitGroup;
use super::{Builder, JoinHandle, ThreadIntent};
use crate::thread::{Builder, JoinHandle, ThreadIntent};
pub struct Pool {
// `_handles` is never read: the field is present
@ -79,9 +81,6 @@ impl Pool {
Self { _handles: handles.into_boxed_slice(), extant_tasks, job_sender }
}
/// # Panics
///
/// Panics if job panics
pub fn spawn<F>(&self, intent: ThreadIntent, f: F)
where
F: FnOnce() + Send + UnwindSafe + 'static,
@ -97,6 +96,17 @@ impl Pool {
self.job_sender.send(job).unwrap();
}
pub fn scoped<'pool, 'scope, F, R>(&'pool self, f: F) -> R
where
F: FnOnce(&Scope<'pool, 'scope>) -> R,
{
let wg = WaitGroup::new();
let scope = Scope { pool: self, wg, _marker: PhantomData };
let r = f(&scope);
scope.wg.wait();
r
}
#[must_use]
pub fn len(&self) -> usize {
self.extant_tasks.load(Ordering::SeqCst)
@ -107,3 +117,36 @@ impl Pool {
self.len() == 0
}
}
pub struct Scope<'pool, 'scope> {
pool: &'pool Pool,
wg: WaitGroup,
_marker: PhantomData<fn(&'scope ()) -> &'scope ()>,
}
impl<'scope> Scope<'_, 'scope> {
pub fn spawn<F>(&self, intent: ThreadIntent, f: F)
where
F: 'scope + FnOnce() + Send + UnwindSafe,
{
let wg = self.wg.clone();
let f = Box::new(move || {
if cfg!(debug_assertions) {
intent.assert_is_used_on_current_thread();
}
f();
drop(wg);
});
let job = Job {
requested_intent: intent,
f: unsafe {
std::mem::transmute::<
Box<dyn 'scope + FnOnce() + Send + UnwindSafe>,
Box<dyn 'static + FnOnce() + Send + UnwindSafe>,
>(f)
},
};
self.pool.job_sender.send(job).unwrap();
}
}