[ty] Fix server hang after shutdown request (#18414)

This commit is contained in:
Micha Reiser 2025-06-02 08:57:51 +02:00 committed by GitHub
parent 844c8626c3
commit 1e6d76c878
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 176 additions and 158 deletions

View file

@ -37,10 +37,18 @@ pub fn run_server() -> anyhow::Result<()> {
let io_result = io_threads.join(); let io_result = io_threads.join();
match (server_result, io_result) { let result = match (server_result, io_result) {
(Ok(()), Ok(())) => Ok(()), (Ok(()), Ok(())) => Ok(()),
(Err(server), Err(io)) => Err(server).context(format!("IO thread error: {io}")), (Err(server), Err(io)) => Err(server).context(format!("IO thread error: {io}")),
(Err(server), _) => Err(server), (Err(server), _) => Err(server),
(_, Err(io)) => Err(io).context("IO thread error"), (_, Err(io)) => Err(io).context("IO thread error"),
};
if let Err(err) = result.as_ref() {
tracing::warn!("Server shut down with an error: {err}");
} else {
tracing::info!("Server shut down");
} }
result
} }

View file

@ -1,5 +1,9 @@
//! Scheduling, I/O, and API endpoints. //! Scheduling, I/O, and API endpoints.
use self::schedule::spawn_main_loop;
use crate::PositionEncoding;
use crate::session::{AllSettings, ClientSettings, Experimental, Session};
use lsp_server::Connection;
use lsp_types::{ use lsp_types::{
ClientCapabilities, DiagnosticOptions, DiagnosticServerCapabilities, HoverProviderCapability, ClientCapabilities, DiagnosticOptions, DiagnosticServerCapabilities, HoverProviderCapability,
InlayHintOptions, InlayHintServerCapabilities, MessageType, ServerCapabilities, InlayHintOptions, InlayHintServerCapabilities, MessageType, ServerCapabilities,
@ -8,11 +12,7 @@ use lsp_types::{
}; };
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
use std::panic::PanicHookInfo; use std::panic::PanicHookInfo;
use std::sync::Arc;
use self::connection::Connection;
use self::schedule::spawn_main_loop;
use crate::PositionEncoding;
use crate::session::{AllSettings, ClientSettings, Experimental, Session};
mod api; mod api;
mod connection; mod connection;
@ -66,7 +66,7 @@ impl Server {
// The number 32 was chosen arbitrarily. The main goal was to have enough capacity to queue // The number 32 was chosen arbitrarily. The main goal was to have enough capacity to queue
// some responses before blocking. // some responses before blocking.
let (main_loop_sender, main_loop_receiver) = crossbeam::channel::bounded(32); let (main_loop_sender, main_loop_receiver) = crossbeam::channel::bounded(32);
let client = Client::new(main_loop_sender.clone(), connection.sender()); let client = Client::new(main_loop_sender.clone(), connection.sender.clone());
crate::logging::init_logging( crate::logging::init_logging(
global_settings.tracing.log_level.unwrap_or_default(), global_settings.tracing.log_level.unwrap_or_default(),
@ -130,47 +130,12 @@ impl Server {
} }
pub(crate) fn run(mut self) -> crate::Result<()> { pub(crate) fn run(mut self) -> crate::Result<()> {
type PanicHook = Box<dyn Fn(&PanicHookInfo<'_>) + 'static + Sync + Send>; let client = Client::new(
struct RestorePanicHook { self.main_loop_sender.clone(),
hook: Option<PanicHook>, self.connection.sender.clone(),
} );
impl Drop for RestorePanicHook { let _panic_hook = ServerPanicHookHandler::new(client);
fn drop(&mut self) {
if let Some(hook) = self.hook.take() {
std::panic::set_hook(hook);
}
}
}
// unregister any previously registered panic hook
// The hook will be restored when this function exits.
let _ = RestorePanicHook {
hook: Some(std::panic::take_hook()),
};
let client = Client::new(self.main_loop_sender.clone(), self.connection.sender());
// When we panic, try to notify the client.
std::panic::set_hook(Box::new(move |panic_info| {
use std::io::Write;
let backtrace = std::backtrace::Backtrace::force_capture();
tracing::error!("{panic_info}\n{backtrace}");
// we also need to print to stderr directly for when using `$logTrace` because
// the message won't be sent to the client.
// But don't use `eprintln` because `eprintln` itself may panic if the pipe is broken.
let mut stderr = std::io::stderr().lock();
writeln!(stderr, "{panic_info}\n{backtrace}").ok();
client
.show_message(
"The ty language server exited with a panic. See the logs for more details.",
MessageType::ERROR,
)
.ok();
}));
spawn_main_loop(move || self.main_loop())?.join() spawn_main_loop(move || self.main_loop())?.join()
} }
@ -221,3 +186,63 @@ impl Server {
} }
} }
} }
type PanicHook = Box<dyn Fn(&PanicHookInfo<'_>) + 'static + Sync + Send>;
struct ServerPanicHookHandler {
hook: Option<PanicHook>,
// Hold on to the strong reference for as long as the panic hook is set.
_client: Arc<Client>,
}
impl ServerPanicHookHandler {
fn new(client: Client) -> Self {
let hook = std::panic::take_hook();
let client = Arc::new(client);
// Use a weak reference to the client because it must be dropped when exiting or the
// io-threads join hangs forever (because client has a reference to the connection sender).
let hook_client = Arc::downgrade(&client);
// When we panic, try to notify the client.
std::panic::set_hook(Box::new(move |panic_info| {
use std::io::Write;
let backtrace = std::backtrace::Backtrace::force_capture();
tracing::error!("{panic_info}\n{backtrace}");
// we also need to print to stderr directly for when using `$logTrace` because
// the message won't be sent to the client.
// But don't use `eprintln` because `eprintln` itself may panic if the pipe is broken.
let mut stderr = std::io::stderr().lock();
writeln!(stderr, "{panic_info}\n{backtrace}").ok();
if let Some(client) = hook_client.upgrade() {
client
.show_message(
"The ty language server exited with a panic. See the logs for more details.",
MessageType::ERROR,
)
.ok();
}
}));
Self {
hook: Some(hook),
_client: client,
}
}
}
impl Drop for ServerPanicHookHandler {
fn drop(&mut self) {
if std::thread::panicking() {
// Calling `std::panic::set_hook` while panicking results in a panic.
return;
}
if let Some(hook) = self.hook.take() {
std::panic::set_hook(hook);
}
}
}

View file

@ -49,6 +49,7 @@ pub(super) fn request(req: server::Request) -> Task {
>( >(
req, BackgroundSchedule::LatencySensitive req, BackgroundSchedule::LatencySensitive
), ),
lsp_types::request::Shutdown::METHOD => sync_request_task::<requests::ShutdownHandler>(req),
method => { method => {
tracing::warn!("Received request {method} which does not have a handler"); tracing::warn!("Received request {method} which does not have a handler");
@ -62,7 +63,7 @@ pub(super) fn request(req: server::Request) -> Task {
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
tracing::error!("Encountered error when routing request with ID {id}: {err}"); tracing::error!("Encountered error when routing request with ID {id}: {err}");
Task::local(move |_session, client| { Task::sync(move |_session, client| {
client.show_error_message( client.show_error_message(
"ty failed to handle a request from the editor. Check the logs for more details.", "ty failed to handle a request from the editor. Check the logs for more details.",
); );
@ -82,25 +83,25 @@ pub(super) fn request(req: server::Request) -> Task {
pub(super) fn notification(notif: server::Notification) -> Task { pub(super) fn notification(notif: server::Notification) -> Task {
match notif.method.as_str() { match notif.method.as_str() {
notifications::DidCloseTextDocumentHandler::METHOD => { notifications::DidCloseTextDocumentHandler::METHOD => {
local_notification_task::<notifications::DidCloseTextDocumentHandler>(notif) sync_notification_task::<notifications::DidCloseTextDocumentHandler>(notif)
} }
notifications::DidOpenTextDocumentHandler::METHOD => { notifications::DidOpenTextDocumentHandler::METHOD => {
local_notification_task::<notifications::DidOpenTextDocumentHandler>(notif) sync_notification_task::<notifications::DidOpenTextDocumentHandler>(notif)
} }
notifications::DidChangeTextDocumentHandler::METHOD => { notifications::DidChangeTextDocumentHandler::METHOD => {
local_notification_task::<notifications::DidChangeTextDocumentHandler>(notif) sync_notification_task::<notifications::DidChangeTextDocumentHandler>(notif)
} }
notifications::DidOpenNotebookHandler::METHOD => { notifications::DidOpenNotebookHandler::METHOD => {
local_notification_task::<notifications::DidOpenNotebookHandler>(notif) sync_notification_task::<notifications::DidOpenNotebookHandler>(notif)
} }
notifications::DidCloseNotebookHandler::METHOD => { notifications::DidCloseNotebookHandler::METHOD => {
local_notification_task::<notifications::DidCloseNotebookHandler>(notif) sync_notification_task::<notifications::DidCloseNotebookHandler>(notif)
} }
notifications::DidChangeWatchedFiles::METHOD => { notifications::DidChangeWatchedFiles::METHOD => {
local_notification_task::<notifications::DidChangeWatchedFiles>(notif) sync_notification_task::<notifications::DidChangeWatchedFiles>(notif)
} }
lsp_types::notification::Cancel::METHOD => { lsp_types::notification::Cancel::METHOD => {
local_notification_task::<notifications::CancelNotificationHandler>(notif) sync_notification_task::<notifications::CancelNotificationHandler>(notif)
} }
lsp_types::notification::SetTrace::METHOD => { lsp_types::notification::SetTrace::METHOD => {
tracing::trace!("Ignoring `setTrace` notification"); tracing::trace!("Ignoring `setTrace` notification");
@ -114,7 +115,7 @@ pub(super) fn notification(notif: server::Notification) -> Task {
} }
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
tracing::error!("Encountered error when routing notification: {err}"); tracing::error!("Encountered error when routing notification: {err}");
Task::local(|_session, client| { Task::sync(|_session, client| {
client.show_error_message( client.show_error_message(
"ty failed to handle a notification from the editor. Check the logs for more details." "ty failed to handle a notification from the editor. Check the logs for more details."
); );
@ -122,12 +123,12 @@ pub(super) fn notification(notif: server::Notification) -> Task {
}) })
} }
fn _local_request_task<R: traits::SyncRequestHandler>(req: server::Request) -> Result<Task> fn sync_request_task<R: traits::SyncRequestHandler>(req: server::Request) -> Result<Task>
where where
<<R as RequestHandler>::RequestType as Request>::Params: UnwindSafe, <<R as RequestHandler>::RequestType as Request>::Params: UnwindSafe,
{ {
let (id, params) = cast_request::<R>(req)?; let (id, params) = cast_request::<R>(req)?;
Ok(Task::local(move |session, client: &Client| { Ok(Task::sync(move |session, client: &Client| {
let _span = tracing::debug_span!("request", %id, method = R::METHOD).entered(); let _span = tracing::debug_span!("request", %id, method = R::METHOD).entered();
let result = R::run(session, client, params); let result = R::run(session, client, params);
respond::<R>(&id, result, client); respond::<R>(&id, result, client);
@ -245,11 +246,11 @@ where
} }
} }
fn local_notification_task<N: traits::SyncNotificationHandler>( fn sync_notification_task<N: traits::SyncNotificationHandler>(
notif: server::Notification, notif: server::Notification,
) -> Result<Task> { ) -> Result<Task> {
let (id, params) = cast_notification::<N>(notif)?; let (id, params) = cast_notification::<N>(notif)?;
Ok(Task::local(move |session, client| { Ok(Task::sync(move |session, client| {
let _span = tracing::debug_span!("notification", method = N::METHOD).entered(); let _span = tracing::debug_span!("notification", method = N::METHOD).entered();
if let Err(err) = N::run(session, client, params) { if let Err(err) = N::run(session, client, params) {
tracing::error!("An error occurred while running {id}: {err}"); tracing::error!("An error occurred while running {id}: {err}");

View file

@ -3,9 +3,11 @@ mod diagnostic;
mod goto_type_definition; mod goto_type_definition;
mod hover; mod hover;
mod inlay_hints; mod inlay_hints;
mod shutdown;
pub(super) use completion::CompletionRequestHandler; pub(super) use completion::CompletionRequestHandler;
pub(super) use diagnostic::DocumentDiagnosticRequestHandler; pub(super) use diagnostic::DocumentDiagnosticRequestHandler;
pub(super) use goto_type_definition::GotoTypeDefinitionRequestHandler; pub(super) use goto_type_definition::GotoTypeDefinitionRequestHandler;
pub(super) use hover::HoverRequestHandler; pub(super) use hover::HoverRequestHandler;
pub(super) use inlay_hints::InlayHintRequestHandler; pub(super) use inlay_hints::InlayHintRequestHandler;
pub(super) use shutdown::ShutdownHandler;

View file

@ -0,0 +1,17 @@
use crate::Session;
use crate::server::api::traits::{RequestHandler, SyncRequestHandler};
use crate::session::client::Client;
pub(crate) struct ShutdownHandler;
impl RequestHandler for ShutdownHandler {
type RequestType = lsp_types::request::Shutdown;
}
impl SyncRequestHandler for ShutdownHandler {
fn run(session: &mut Session, _client: &Client, _params: ()) -> crate::server::Result<()> {
tracing::debug!("Received shutdown request, waiting for shutdown notification");
session.set_shutdown_requested(true);
Ok(())
}
}

View file

@ -17,7 +17,6 @@ pub(super) trait RequestHandler {
/// This will block the main message receiver loop, meaning that no /// This will block the main message receiver loop, meaning that no
/// incoming requests or notifications will be handled while `run` is /// incoming requests or notifications will be handled while `run` is
/// executing. Try to avoid doing any I/O or long-running computations. /// executing. Try to avoid doing any I/O or long-running computations.
#[expect(dead_code)]
pub(super) trait SyncRequestHandler: RequestHandler { pub(super) trait SyncRequestHandler: RequestHandler {
fn run( fn run(
session: &mut Session, session: &mut Session,

View file

@ -1,20 +1,12 @@
use lsp_server as lsp; use lsp_server as lsp;
use lsp_types::{notification::Notification, request::Request};
pub(crate) type ConnectionSender = crossbeam::channel::Sender<lsp::Message>; pub(crate) type ConnectionSender = crossbeam::channel::Sender<lsp::Message>;
type ConnectionReceiver = crossbeam::channel::Receiver<lsp::Message>;
/// A builder for `Connection` that handles LSP initialization. /// A builder for `Connection` that handles LSP initialization.
pub(crate) struct ConnectionInitializer { pub(crate) struct ConnectionInitializer {
connection: lsp::Connection, connection: lsp::Connection,
} }
/// Handles inbound and outbound messages with the client.
pub(crate) struct Connection {
sender: ConnectionSender,
receiver: ConnectionReceiver,
}
impl ConnectionInitializer { impl ConnectionInitializer {
/// Create a new LSP server connection over stdin/stdout. /// Create a new LSP server connection over stdin/stdout.
pub(crate) fn stdio() -> (Self, lsp::IoThreads) { pub(crate) fn stdio() -> (Self, lsp::IoThreads) {
@ -40,7 +32,7 @@ impl ConnectionInitializer {
server_capabilities: &lsp_types::ServerCapabilities, server_capabilities: &lsp_types::ServerCapabilities,
name: &str, name: &str,
version: &str, version: &str,
) -> crate::Result<Connection> { ) -> crate::Result<lsp_server::Connection> {
self.connection.initialize_finish( self.connection.initialize_finish(
id, id,
serde_json::json!({ serde_json::json!({
@ -51,76 +43,7 @@ impl ConnectionInitializer {
} }
}), }),
)?; )?;
let Self {
connection: lsp::Connection { sender, receiver },
} = self;
Ok(Connection { sender, receiver })
}
}
impl Connection { Ok(self.connection)
/// Make a new `ClientSender` for sending messages to the client.
pub(super) fn sender(&self) -> ConnectionSender {
self.sender.clone()
}
pub(super) fn send(&self, msg: lsp::Message) -> crate::Result<()> {
self.sender.send(msg)?;
Ok(())
}
/// An iterator over incoming messages from the client.
pub(super) fn incoming(&self) -> &crossbeam::channel::Receiver<lsp::Message> {
&self.receiver
}
/// Check and respond to any incoming shutdown requests; returns`true` if the server should be shutdown.
pub(super) fn handle_shutdown(&self, message: &lsp::Message) -> crate::Result<bool> {
match message {
lsp::Message::Request(lsp::Request { id, method, .. })
if method == lsp_types::request::Shutdown::METHOD =>
{
self.sender
.send(lsp::Response::new_ok(id.clone(), ()).into())?;
tracing::info!("Shutdown request received. Waiting for an exit notification...");
loop {
match &self
.receiver
.recv_timeout(std::time::Duration::from_secs(30))?
{
lsp::Message::Notification(lsp::Notification { method, .. })
if method == lsp_types::notification::Exit::METHOD =>
{
tracing::info!("Exit notification received. Server shutting down...");
return Ok(true);
}
lsp::Message::Request(lsp::Request { id, method, .. }) => {
tracing::warn!(
"Server received unexpected request {method} ({id}) while waiting for exit notification",
);
self.sender.send(lsp::Message::Response(lsp::Response::new_err(
id.clone(),
lsp::ErrorCode::InvalidRequest as i32,
"Server received unexpected request while waiting for exit notification".to_string(),
)))?;
}
message => {
tracing::warn!(
"Server received unexpected message while waiting for exit notification: {message:?}"
);
}
}
}
}
lsp::Message::Notification(lsp::Notification { method, .. })
if method == lsp_types::notification::Exit::METHOD =>
{
anyhow::bail!(
"Server received an exit notification before a shutdown request was sent. Exiting..."
);
}
_ => Ok(false),
}
} }
} }

View file

@ -2,8 +2,10 @@ use crate::Session;
use crate::server::schedule::Scheduler; use crate::server::schedule::Scheduler;
use crate::server::{Server, api}; use crate::server::{Server, api};
use crate::session::client::Client; use crate::session::client::Client;
use anyhow::anyhow;
use crossbeam::select; use crossbeam::select;
use lsp_server::Message; use lsp_server::Message;
use lsp_types::notification::Notification;
use lsp_types::{DidChangeWatchedFilesRegistrationOptions, FileSystemWatcher}; use lsp_types::{DidChangeWatchedFilesRegistrationOptions, FileSystemWatcher};
pub(crate) type MainLoopSender = crossbeam::channel::Sender<Event>; pub(crate) type MainLoopSender = crossbeam::channel::Sender<Event>;
@ -13,7 +15,7 @@ impl Server {
pub(super) fn main_loop(&mut self) -> crate::Result<()> { pub(super) fn main_loop(&mut self) -> crate::Result<()> {
self.initialize(&Client::new( self.initialize(&Client::new(
self.main_loop_sender.clone(), self.main_loop_sender.clone(),
self.connection.sender(), self.connection.sender.clone(),
)); ));
let mut scheduler = Scheduler::new(self.worker_threads); let mut scheduler = Scheduler::new(self.worker_threads);
@ -25,9 +27,11 @@ impl Server {
match next_event { match next_event {
Event::Message(msg) => { Event::Message(msg) => {
if self.connection.handle_shutdown(&msg)? { let client = Client::new(
break; self.main_loop_sender.clone(),
} self.connection.sender.clone(),
);
let task = match msg { let task = match msg {
Message::Request(req) => { Message::Request(req) => {
self.session self.session
@ -35,9 +39,37 @@ impl Server {
.incoming_mut() .incoming_mut()
.register(req.id.clone(), req.method.clone()); .register(req.id.clone(), req.method.clone());
if self.session.is_shutdown_requested() {
tracing::warn!(
"Received request after server shutdown was requested, discarding"
);
client.respond_err(
req.id,
lsp_server::ResponseError {
code: lsp_server::ErrorCode::InvalidRequest as i32,
message: "Shutdown already requested".to_owned(),
data: None,
},
)?;
continue;
}
api::request(req) api::request(req)
} }
Message::Notification(notification) => api::notification(notification), Message::Notification(notification) => {
if notification.method == lsp_types::notification::Exit::METHOD {
if !self.session.is_shutdown_requested() {
return Err(anyhow!(
"Received exit notification before a shutdown request"
));
}
tracing::debug!("Received exit notification, exiting");
return Ok(());
}
api::notification(notification)
}
// Handle the response from the client to a server request // Handle the response from the client to a server request
Message::Response(response) => { Message::Response(response) => {
@ -59,8 +91,6 @@ impl Server {
} }
}; };
let client =
Client::new(self.main_loop_sender.clone(), self.connection.sender());
scheduler.dispatch(task, &mut self.session, client); scheduler.dispatch(task, &mut self.session, client);
} }
Event::Action(action) => match action { Event::Action(action) => match action {
@ -75,7 +105,7 @@ impl Server {
let duration = start_time.elapsed(); let duration = start_time.elapsed();
tracing::trace!(name: "message response", method, %response.id, duration = format_args!("{:0.2?}", duration)); tracing::trace!(name: "message response", method, %response.id, duration = format_args!("{:0.2?}", duration));
self.connection.send(Message::Response(response))?; self.connection.sender.send(Message::Response(response))?;
} else { } else {
tracing::trace!( tracing::trace!(
"Ignoring response for canceled request id={}", "Ignoring response for canceled request id={}",
@ -112,12 +142,13 @@ impl Server {
/// ///
/// Returns `Ok(None)` if the client connection is closed. /// Returns `Ok(None)` if the client connection is closed.
fn next_event(&self) -> Result<Option<Event>, crossbeam::channel::RecvError> { fn next_event(&self) -> Result<Option<Event>, crossbeam::channel::RecvError> {
let next = select!( select!(
recv(self.connection.incoming()) -> msg => msg.map(Event::Message), recv(self.connection.receiver) -> msg => {
recv(self.main_loop_receiver) -> event => return Ok(event.ok()), // Ignore disconnect errors, they're handled by the main loop (it will exit).
); Ok(msg.ok().map(Event::Message))
},
next.map(Some) recv(self.main_loop_receiver) -> event => event.map(Some),
)
} }
fn initialize(&mut self, client: &Client) { fn initialize(&mut self, client: &Client) {

View file

@ -68,7 +68,7 @@ impl Task {
}) })
} }
/// Creates a new local task. /// Creates a new local task.
pub(crate) fn local<F>(func: F) -> Self pub(crate) fn sync<F>(func: F) -> Self
where where
F: FnOnce(&mut Session, &Client) + 'static, F: FnOnce(&mut Session, &Client) + 'static,
{ {
@ -82,7 +82,7 @@ impl Task {
where where
R: Serialize + Send + 'static, R: Serialize + Send + 'static,
{ {
Self::local(move |_, client| { Self::sync(move |_, client| {
if let Err(err) = client.respond(&id, result) { if let Err(err) = client.respond(&id, result) {
tracing::error!("Unable to send immediate response: {err}"); tracing::error!("Unable to send immediate response: {err}");
} }
@ -91,6 +91,6 @@ impl Task {
/// Creates a local task that does nothing. /// Creates a local task that does nothing.
pub(crate) fn nothing() -> Self { pub(crate) fn nothing() -> Self {
Self::local(move |_, _| {}) Self::sync(move |_, _| {})
} }
} }

View file

@ -50,6 +50,9 @@ pub struct Session {
/// Tracks the pending requests between client and server. /// Tracks the pending requests between client and server.
request_queue: RequestQueue, request_queue: RequestQueue,
/// Has the client requested the server to shutdown.
shutdown_requested: bool,
} }
impl Session { impl Session {
@ -86,6 +89,7 @@ impl Session {
client_capabilities, client_capabilities,
)), )),
request_queue: RequestQueue::new(), request_queue: RequestQueue::new(),
shutdown_requested: false,
}) })
} }
@ -97,6 +101,14 @@ impl Session {
&mut self.request_queue &mut self.request_queue
} }
pub(crate) fn is_shutdown_requested(&self) -> bool {
self.shutdown_requested
}
pub(crate) fn set_shutdown_requested(&mut self, requested: bool) {
self.shutdown_requested = requested;
}
// TODO(dhruvmanila): Ideally, we should have a single method for `workspace_db_for_path_mut` // TODO(dhruvmanila): Ideally, we should have a single method for `workspace_db_for_path_mut`
// and `default_workspace_db_mut` but the borrow checker doesn't allow that. // and `default_workspace_db_mut` but the borrow checker doesn't allow that.
// https://github.com/astral-sh/ruff/pull/13041#discussion_r1726725437 // https://github.com/astral-sh/ruff/pull/13041#discussion_r1726725437