[ty] Abort process if worker thread panics (#18211)

This commit is contained in:
Micha Reiser 2025-05-26 14:09:06 +02:00 committed by GitHub
parent 5d93d619f3
commit 66b082ff71
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 32 additions and 5 deletions

View file

@ -26,6 +26,7 @@ jod-thread = { workspace = true }
lsp-server = { workspace = true }
lsp-types = { workspace = true }
rustc-hash = { workspace = true }
salsa = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
shellexpand = { workspace = true }

View file

@ -13,6 +13,8 @@
//! The thread pool is implemented entirely using
//! the threading utilities in [`crate::server::schedule::thread`].
use crossbeam::channel::{Receiver, Sender};
use std::panic::AssertUnwindSafe;
use std::{
num::NonZeroUsize,
sync::{
@ -21,8 +23,6 @@ use std::{
},
};
use crossbeam::channel::{Receiver, Sender};
use super::{Builder, JoinHandle, ThreadPriority};
pub(crate) struct Pool {
@ -51,8 +51,7 @@ impl Pool {
let threads = usize::from(threads);
// Channel buffer capacity is between 2 and 4, depending on the pool size.
let (job_sender, job_receiver) = crossbeam::channel::bounded(std::cmp::min(threads * 2, 4));
let (job_sender, job_receiver) = crossbeam::channel::bounded(std::cmp::max(threads * 2, 4));
let extant_tasks = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::with_capacity(threads);
@ -71,7 +70,33 @@ impl Pool {
current_priority = job.requested_priority;
}
extant_tasks.fetch_add(1, Ordering::SeqCst);
(job.f)();
// SAFETY: it's safe to assume that `job.f` is unwind safe because we always
// abort the process if it panics.
// Panicking here ensures that we don't swallow errors and is the same as
// what rayon does.
// Any recovery should be implemented outside the thread pool (e.g. when
// dispatching requests/notifications etc).
if let Err(error) = std::panic::catch_unwind(AssertUnwindSafe(job.f)) {
if let Some(msg) = error.downcast_ref::<String>() {
tracing::error!("Worker thread panicked with: {msg}; aborting");
} else if let Some(msg) = error.downcast_ref::<&str>() {
tracing::error!("Worker thread panicked with: {msg}; aborting");
} else if let Some(cancelled) =
error.downcast_ref::<salsa::Cancelled>()
{
tracing::error!(
"Worker thread got cancelled: {cancelled}; aborting"
);
} else {
tracing::error!(
"Worker thread panicked with: {error:?}; aborting"
);
}
std::process::abort();
}
extant_tasks.fetch_sub(1, Ordering::SeqCst);
}
}