diff --git a/Cargo.lock b/Cargo.lock index 7980823a85..e7ae42a2d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -408,7 +408,6 @@ dependencies = [ "cargo_metadata", "command-group", "crossbeam-channel", - "jod-thread", "paths", "rustc-hash", "serde", @@ -1278,6 +1277,7 @@ dependencies = [ "paths", "proc-macro-api", "proc-macro-test", + "stdx", "tt", ] @@ -1493,7 +1493,6 @@ dependencies = [ "ide-db", "ide-ssr", "itertools", - "jod-thread", "lsp-server", "lsp-types", "mbe", @@ -1712,6 +1711,7 @@ version = "0.0.0" dependencies = [ "always-assert", "backtrace", + "jod-thread", "libc", "miow", "winapi", @@ -2123,9 +2123,9 @@ name = "vfs-notify" version = "0.0.0" dependencies = [ "crossbeam-channel", - "jod-thread", "notify", "paths", + "stdx", "tracing", "vfs", "walkdir", diff --git a/crates/flycheck/Cargo.toml b/crates/flycheck/Cargo.toml index 1e0b3605b1..3f6671b1c4 100644 --- a/crates/flycheck/Cargo.toml +++ b/crates/flycheck/Cargo.toml @@ -18,7 +18,6 @@ cargo_metadata = "0.15.0" rustc-hash = "1.1.0" serde_json.workspace = true serde.workspace = true -jod-thread = "0.1.2" command-group = "2.0.1" # local deps diff --git a/crates/flycheck/src/lib.rs b/crates/flycheck/src/lib.rs index accb14a51d..a4aa346a1c 100644 --- a/crates/flycheck/src/lib.rs +++ b/crates/flycheck/src/lib.rs @@ -77,7 +77,7 @@ impl fmt::Display for FlycheckConfig { pub struct FlycheckHandle { // XXX: drop order is significant sender: Sender, - _thread: jod_thread::JoinHandle, + _thread: stdx::thread::JoinHandle, id: usize, } @@ -90,7 +90,7 @@ impl FlycheckHandle { ) -> FlycheckHandle { let actor = FlycheckActor::new(id, sender, config, workspace_root); let (sender, receiver) = unbounded::(); - let thread = jod_thread::Builder::new() + let thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility) .name("Flycheck".to_owned()) .spawn(move || actor.run(receiver)) .expect("failed to spawn thread"); @@ -395,7 +395,7 @@ struct CargoHandle { /// The handle to the actual cargo process. As we cannot cancel directly from with /// a read syscall dropping and therefore terminating the process is our best option. child: JodGroupChild, - thread: jod_thread::JoinHandle>, + thread: stdx::thread::JoinHandle>, receiver: Receiver, } @@ -409,7 +409,7 @@ impl CargoHandle { let (sender, receiver) = unbounded(); let actor = CargoActor::new(sender, stdout, stderr); - let thread = jod_thread::Builder::new() + let thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility) .name("CargoHandle".to_owned()) .spawn(move || actor.run()) .expect("failed to spawn thread"); diff --git a/crates/ide/src/prime_caches.rs b/crates/ide/src/prime_caches.rs index 2962700360..f049a225f0 100644 --- a/crates/ide/src/prime_caches.rs +++ b/crates/ide/src/prime_caches.rs @@ -80,7 +80,11 @@ pub(crate) fn parallel_prime_caches( for _ in 0..num_worker_threads { let worker = prime_caches_worker.clone(); let db = db.snapshot(); - std::thread::spawn(move || Cancelled::catch(|| worker(db))); + + stdx::thread::Builder::new(stdx::thread::QoSClass::Utility) + .allow_leak(true) + .spawn(move || Cancelled::catch(|| worker(db))) + .expect("failed to spawn thread"); } (work_sender, progress_receiver) diff --git a/crates/proc-macro-srv/Cargo.toml b/crates/proc-macro-srv/Cargo.toml index f7f07cfcb2..d5eb157bfe 100644 --- a/crates/proc-macro-srv/Cargo.toml +++ b/crates/proc-macro-srv/Cargo.toml @@ -22,6 +22,7 @@ object = { version = "0.30.2", default-features = false, features = [ libloading = "0.7.3" memmap2 = "0.5.4" +stdx.workspace = true tt.workspace = true mbe.workspace = true paths.workspace = true diff --git a/crates/rust-analyzer/Cargo.toml b/crates/rust-analyzer/Cargo.toml index ae5b8e4c42..3f795340b2 100644 --- a/crates/rust-analyzer/Cargo.toml +++ b/crates/rust-analyzer/Cargo.toml @@ -86,7 +86,6 @@ jemallocator = { version = "0.5.0", package = "tikv-jemallocator", optional = tr [dev-dependencies] expect-test = "1.4.0" -jod-thread = "0.1.2" xshell = "0.2.2" test-utils.workspace = true diff --git a/crates/rust-analyzer/src/bin/main.rs b/crates/rust-analyzer/src/bin/main.rs index 992e174a42..660a780eb0 100644 --- a/crates/rust-analyzer/src/bin/main.rs +++ b/crates/rust-analyzer/src/bin/main.rs @@ -78,7 +78,7 @@ fn try_main(flags: flags::RustAnalyzer) -> Result<()> { println!("rust-analyzer {}", rust_analyzer::version()); return Ok(()); } - with_extra_thread("LspServer", run_server)?; + with_extra_thread("LspServer", stdx::thread::QoSClass::Utility, run_server)?; } flags::RustAnalyzerCmd::Parse(cmd) => cmd.run()?, flags::RustAnalyzerCmd::Symbols(cmd) => cmd.run()?, @@ -136,14 +136,17 @@ const STACK_SIZE: usize = 1024 * 1024 * 8; /// space. fn with_extra_thread( thread_name: impl Into, + qos_class: stdx::thread::QoSClass, f: impl FnOnce() -> Result<()> + Send + 'static, ) -> Result<()> { - let handle = - std::thread::Builder::new().name(thread_name.into()).stack_size(STACK_SIZE).spawn(f)?; - match handle.join() { - Ok(res) => res, - Err(panic) => std::panic::resume_unwind(panic), - } + let handle = stdx::thread::Builder::new(qos_class) + .name(thread_name.into()) + .stack_size(STACK_SIZE) + .spawn(f)?; + + handle.join()?; + + Ok(()) } fn run_server() -> Result<()> { diff --git a/crates/rust-analyzer/src/task_pool.rs b/crates/rust-analyzer/src/task_pool.rs index 616e449984..0c5a4f3055 100644 --- a/crates/rust-analyzer/src/task_pool.rs +++ b/crates/rust-analyzer/src/task_pool.rs @@ -1,5 +1,7 @@ //! A thin wrapper around `ThreadPool` to make sure that we join all things //! properly. +use std::sync::{Arc, Barrier}; + use crossbeam_channel::Sender; pub(crate) struct TaskPool { @@ -16,6 +18,18 @@ impl TaskPool { .thread_stack_size(STACK_SIZE) .num_threads(threads) .build(); + + // Set QoS of all threads in threadpool. + let barrier = Arc::new(Barrier::new(threads + 1)); + for _ in 0..threads { + let barrier = barrier.clone(); + inner.execute(move || { + stdx::thread::set_current_thread_qos_class(stdx::thread::QoSClass::Utility); + barrier.wait(); + }); + } + barrier.wait(); + TaskPool { sender, inner } } @@ -26,7 +40,16 @@ impl TaskPool { { self.inner.execute({ let sender = self.sender.clone(); - move || sender.send(task()).unwrap() + move || { + if stdx::thread::IS_QOS_AVAILABLE { + debug_assert_eq!( + stdx::thread::get_current_thread_qos_class(), + Some(stdx::thread::QoSClass::Utility) + ); + } + + sender.send(task()).unwrap() + } }) } diff --git a/crates/rust-analyzer/tests/slow-tests/support.rs b/crates/rust-analyzer/tests/slow-tests/support.rs index d0eeee189c..33d7f6576c 100644 --- a/crates/rust-analyzer/tests/slow-tests/support.rs +++ b/crates/rust-analyzer/tests/slow-tests/support.rs @@ -155,7 +155,7 @@ pub(crate) fn project(fixture: &str) -> Server { pub(crate) struct Server { req_id: Cell, messages: RefCell>, - _thread: jod_thread::JoinHandle<()>, + _thread: stdx::thread::JoinHandle, client: Connection, /// XXX: remove the tempdir last dir: TestDir, @@ -165,7 +165,7 @@ impl Server { fn new(dir: TestDir, config: Config) -> Server { let (connection, client) = Connection::memory(); - let _thread = jod_thread::Builder::new() + let _thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility) .name("test server".to_string()) .spawn(move || main_loop(config, connection).unwrap()) .expect("failed to spawn a thread"); diff --git a/crates/stdx/Cargo.toml b/crates/stdx/Cargo.toml index c881f2fd3f..986e3fcdcf 100644 --- a/crates/stdx/Cargo.toml +++ b/crates/stdx/Cargo.toml @@ -15,6 +15,7 @@ doctest = false libc = "0.2.135" backtrace = { version = "0.3.65", optional = true } always-assert = { version = "0.1.2", features = ["log"] } +jod-thread = "0.1.2" # Think twice before adding anything here [target.'cfg(windows)'.dependencies] diff --git a/crates/stdx/src/lib.rs b/crates/stdx/src/lib.rs index 8df86e8100..24990d6a0e 100644 --- a/crates/stdx/src/lib.rs +++ b/crates/stdx/src/lib.rs @@ -11,6 +11,7 @@ pub mod process; pub mod panic_context; pub mod non_empty_vec; pub mod rand; +pub mod thread; pub use always_assert::{always, never}; diff --git a/crates/stdx/src/thread.rs b/crates/stdx/src/thread.rs new file mode 100644 index 0000000000..2bf9141cbf --- /dev/null +++ b/crates/stdx/src/thread.rs @@ -0,0 +1,200 @@ +//! A utility module for working with threads that automatically joins threads upon drop +//! and provides functionality for interfacing with operating system quality of service (QoS) APIs. +//! +//! As a system, rust-analyzer should have the property that +//! old manual scheduling APIs are replaced entirely by QoS. +//! To maintain this invariant, we panic when it is clear that +//! old scheduling APIs have been used. +//! +//! Moreover, we also want to ensure that every thread has a QoS set explicitly +//! to force a decision about its importance to the system. +//! Thus, [`QoSClass`] has no default value +//! and every entry point to creating a thread requires a [`QoSClass`] upfront. + +use std::fmt; + +pub fn spawn(qos_class: QoSClass, f: F) -> JoinHandle +where + F: FnOnce() -> T, + F: Send + 'static, + T: Send + 'static, +{ + Builder::new(qos_class).spawn(f).expect("failed to spawn thread") +} + +pub struct Builder { + qos_class: QoSClass, + inner: jod_thread::Builder, + allow_leak: bool, +} + +impl Builder { + pub fn new(qos_class: QoSClass) -> Builder { + Builder { qos_class, inner: jod_thread::Builder::new(), allow_leak: false } + } + + pub fn name(self, name: String) -> Builder { + Builder { inner: self.inner.name(name), ..self } + } + + pub fn stack_size(self, size: usize) -> Builder { + Builder { inner: self.inner.stack_size(size), ..self } + } + + pub fn allow_leak(self, b: bool) -> Builder { + Builder { allow_leak: b, ..self } + } + + pub fn spawn(self, f: F) -> std::io::Result> + where + F: FnOnce() -> T, + F: Send + 'static, + T: Send + 'static, + { + let inner_handle = self.inner.spawn(move || { + set_current_thread_qos_class(self.qos_class); + f() + })?; + + Ok(JoinHandle { inner: Some(inner_handle), allow_leak: self.allow_leak }) + } +} + +pub struct JoinHandle { + // `inner` is an `Option` so that we can + // take ownership of the contained `JoinHandle` + // in the `Drop` impl below. + inner: Option>, + allow_leak: bool, +} + +impl JoinHandle { + pub fn join(mut self) -> T { + self.inner.take().unwrap().join() + } +} + +impl Drop for JoinHandle { + fn drop(&mut self) { + if !self.allow_leak { + return; + } + + if let Some(join_handle) = self.inner.take() { + join_handle.detach(); + } + } +} + +impl fmt::Debug for JoinHandle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("JoinHandle { .. }") + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum QoSClass { + // Maintain order in priority from least to most. + Background, + Utility, + UserInitiated, + UserInteractive, +} + +#[cfg(target_vendor = "apple")] +pub const IS_QOS_AVAILABLE: bool = true; + +#[cfg(not(target_vendor = "apple"))] +pub const IS_QOS_AVAILABLE: bool = false; + +// All Apple platforms use XNU as their kernel +// and thus have the concept of QoS. +#[cfg(target_vendor = "apple")] +pub fn set_current_thread_qos_class(class: QoSClass) { + let c = match class { + QoSClass::UserInteractive => libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE, + QoSClass::UserInitiated => libc::qos_class_t::QOS_CLASS_USER_INITIATED, + QoSClass::Utility => libc::qos_class_t::QOS_CLASS_UTILITY, + QoSClass::Background => libc::qos_class_t::QOS_CLASS_BACKGROUND, + }; + + let code = unsafe { libc::pthread_set_qos_class_self_np(c, 0) }; + + if code == 0 { + return; + } + + let errno = unsafe { *libc::__error() }; + + match errno { + libc::EPERM => { + // This thread has been excluded from the QoS system + // due to a previous call to a function such as `pthread_setschedparam` + // which is incompatible with QoS. + // + // Let’s just panic here because rust-analyzer as a system + // should have the property that QoS is used consistently + // instead of old manual scheduling management APIs. + panic!("tried to set QoS of thread which has opted out of QoS (os error {errno})") + } + + libc::EINVAL => { + // This is returned if we pass something other than a qos_class_t + // to `pthread_set_qos_class_self_np`. + // + // This is impossible, so again panic. + unreachable!("invalid qos_class_t value was passed to pthread_set_qos_class_self_np") + } + + _ => { + // `pthread_set_qos_class_self_np`’s documentation + // does not mention any other errors. + unreachable!("`pthread_set_qos_class_self_np` returned unexpected error {errno}") + } + } +} + +#[cfg(not(target_vendor = "apple"))] +pub fn set_current_thread_qos_class(class: QoSClass) { + // FIXME: Windows has QoS APIs, we should use them! +} + +#[cfg(target_vendor = "apple")] +pub fn get_current_thread_qos_class() -> Option { + let current_thread = unsafe { libc::pthread_self() }; + let mut qos_class_raw = libc::qos_class_t::QOS_CLASS_UNSPECIFIED; + let code = unsafe { + libc::pthread_get_qos_class_np(current_thread, &mut qos_class_raw, std::ptr::null_mut()) + }; + + if code != 0 { + // `pthread_get_qos_class_np`’s documentation states that + // an error value is placed into errno if the return code is not zero. + // However, it never states what errors are possible. + // Inspecting the source[0] shows that, as of this writing, it always returns zero. + // + // Whatever errors the function could report in future are likely to be + // ones which we cannot handle anyway + // + // 0: https://github.com/apple-oss-distributions/libpthread/blob/67e155c94093be9a204b69637d198eceff2c7c46/src/qos.c#L171-L177 + let errno = unsafe { *libc::__error() }; + unreachable!("`pthread_get_qos_class_np` failed unexpectedly (os error {errno})"); + } + + match qos_class_raw { + libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE => Some(QoSClass::UserInteractive), + libc::qos_class_t::QOS_CLASS_USER_INITIATED => Some(QoSClass::UserInitiated), + libc::qos_class_t::QOS_CLASS_DEFAULT => None, // QoS has never been set + libc::qos_class_t::QOS_CLASS_UTILITY => Some(QoSClass::Utility), + libc::qos_class_t::QOS_CLASS_BACKGROUND => Some(QoSClass::Background), + libc::qos_class_t::QOS_CLASS_UNSPECIFIED => { + // We panic here because rust-analyzer should never use + panic!("tried to get QoS of thread which has opted out of QoS") + } + } +} + +#[cfg(not(target_vendor = "apple"))] +pub fn get_current_thread_qos_class() -> Option { + None +} diff --git a/crates/vfs-notify/Cargo.toml b/crates/vfs-notify/Cargo.toml index e06b98d811..5d61a22728 100644 --- a/crates/vfs-notify/Cargo.toml +++ b/crates/vfs-notify/Cargo.toml @@ -13,10 +13,10 @@ doctest = false [dependencies] tracing = "0.1.35" -jod-thread = "0.1.2" walkdir = "2.3.2" crossbeam-channel = "0.5.5" notify = "5.0" +stdx.workspace = true vfs.workspace = true paths.workspace = true diff --git a/crates/vfs-notify/src/lib.rs b/crates/vfs-notify/src/lib.rs index c95304e55a..26f7a9fc42 100644 --- a/crates/vfs-notify/src/lib.rs +++ b/crates/vfs-notify/src/lib.rs @@ -21,7 +21,7 @@ use walkdir::WalkDir; pub struct NotifyHandle { // Relative order of fields below is significant. sender: Sender, - _thread: jod_thread::JoinHandle, + _thread: stdx::thread::JoinHandle, } #[derive(Debug)] @@ -34,7 +34,7 @@ impl loader::Handle for NotifyHandle { fn spawn(sender: loader::Sender) -> NotifyHandle { let actor = NotifyActor::new(sender); let (sender, receiver) = unbounded::(); - let thread = jod_thread::Builder::new() + let thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility) .name("VfsLoader".to_owned()) .spawn(move || actor.run(receiver)) .expect("failed to spawn thread");