From 66b082ff71979f9cc5e10f9522b4588dc9b452ee Mon Sep 17 00:00:00 2001 From: Micha Reiser Date: Mon, 26 May 2025 14:09:06 +0200 Subject: [PATCH] [ty] Abort process if worker thread panics (#18211) --- Cargo.lock | 1 + crates/ty_server/Cargo.toml | 1 + .../src/server/schedule/thread/pool.rs | 35 ++++++++++++++++--- 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bab299de64..206a24a6a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4004,6 +4004,7 @@ dependencies = [ "ruff_source_file", "ruff_text_size", "rustc-hash 2.1.1", + "salsa", "serde", "serde_json", "shellexpand", diff --git a/crates/ty_server/Cargo.toml b/crates/ty_server/Cargo.toml index 0bae064fda..0012420f59 100644 --- a/crates/ty_server/Cargo.toml +++ b/crates/ty_server/Cargo.toml @@ -26,6 +26,7 @@ jod-thread = { workspace = true } lsp-server = { workspace = true } lsp-types = { workspace = true } rustc-hash = { workspace = true } +salsa = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } shellexpand = { workspace = true } diff --git a/crates/ty_server/src/server/schedule/thread/pool.rs b/crates/ty_server/src/server/schedule/thread/pool.rs index 391676ad5b..88abbd8e42 100644 --- a/crates/ty_server/src/server/schedule/thread/pool.rs +++ b/crates/ty_server/src/server/schedule/thread/pool.rs @@ -13,6 +13,8 @@ //! The thread pool is implemented entirely using //! the threading utilities in [`crate::server::schedule::thread`]. +use crossbeam::channel::{Receiver, Sender}; +use std::panic::AssertUnwindSafe; use std::{ num::NonZeroUsize, sync::{ @@ -21,8 +23,6 @@ use std::{ }, }; -use crossbeam::channel::{Receiver, Sender}; - use super::{Builder, JoinHandle, ThreadPriority}; pub(crate) struct Pool { @@ -51,8 +51,7 @@ impl Pool { let threads = usize::from(threads); - // Channel buffer capacity is between 2 and 4, depending on the pool size. - let (job_sender, job_receiver) = crossbeam::channel::bounded(std::cmp::min(threads * 2, 4)); + let (job_sender, job_receiver) = crossbeam::channel::bounded(std::cmp::max(threads * 2, 4)); let extant_tasks = Arc::new(AtomicUsize::new(0)); let mut handles = Vec::with_capacity(threads); @@ -71,7 +70,33 @@ impl Pool { current_priority = job.requested_priority; } extant_tasks.fetch_add(1, Ordering::SeqCst); - (job.f)(); + + // SAFETY: it's safe to assume that `job.f` is unwind safe because we always + // abort the process if it panics. + // Panicking here ensures that we don't swallow errors and is the same as + // what rayon does. + // Any recovery should be implemented outside the thread pool (e.g. when + // dispatching requests/notifications etc). + if let Err(error) = std::panic::catch_unwind(AssertUnwindSafe(job.f)) { + if let Some(msg) = error.downcast_ref::() { + tracing::error!("Worker thread panicked with: {msg}; aborting"); + } else if let Some(msg) = error.downcast_ref::<&str>() { + tracing::error!("Worker thread panicked with: {msg}; aborting"); + } else if let Some(cancelled) = + error.downcast_ref::() + { + tracing::error!( + "Worker thread got cancelled: {cancelled}; aborting" + ); + } else { + tracing::error!( + "Worker thread panicked with: {error:?}; aborting" + ); + } + + std::process::abort(); + } + extant_tasks.fetch_sub(1, Ordering::SeqCst); } }