[ty] Support cancellation and retry in the server (#18273)
Some checks are pending
CI / Determine changes (push) Waiting to run
CI / cargo fmt (push) Waiting to run
CI / cargo clippy (push) Blocked by required conditions
CI / cargo test (linux) (push) Blocked by required conditions
CI / cargo test (linux, release) (push) Blocked by required conditions
CI / cargo test (windows) (push) Blocked by required conditions
CI / cargo test (wasm) (push) Blocked by required conditions
CI / cargo build (release) (push) Waiting to run
CI / cargo build (msrv) (push) Blocked by required conditions
CI / cargo fuzz build (push) Blocked by required conditions
CI / fuzz parser (push) Blocked by required conditions
CI / test scripts (push) Blocked by required conditions
CI / ecosystem (push) Blocked by required conditions
CI / Fuzz for new ty panics (push) Blocked by required conditions
CI / cargo shear (push) Blocked by required conditions
CI / python package (push) Waiting to run
CI / pre-commit (push) Waiting to run
CI / mkdocs (push) Waiting to run
CI / formatter instabilities and black similarity (push) Blocked by required conditions
CI / test ruff-lsp (push) Blocked by required conditions
CI / check playground (push) Blocked by required conditions
CI / benchmarks (push) Blocked by required conditions
[ty Playground] Release / publish (push) Waiting to run

This commit is contained in:
Micha Reiser 2025-05-28 10:59:29 +02:00 committed by GitHub
parent bbcd7e0196
commit 66ba1d8775
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
28 changed files with 1015 additions and 623 deletions

View file

@ -1,12 +1,9 @@
use crate::server::Server;
use crate::server::{ConnectionInitializer, Server};
use anyhow::Context;
pub use document::{DocumentKey, NotebookDocument, PositionEncoding, TextDocument};
pub use session::{ClientSettings, DocumentQuery, DocumentSnapshot, Session};
use std::num::NonZeroUsize;
#[macro_use]
mod message;
mod document;
mod logging;
mod server;
@ -32,9 +29,18 @@ pub fn run_server() -> anyhow::Result<()> {
.unwrap_or(four)
.min(four);
Server::new(worker_threads)
.context("Failed to start server")?
.run()?;
let (connection, io_threads) = ConnectionInitializer::stdio();
Ok(())
let server_result = Server::new(worker_threads, connection)
.context("Failed to start server")?
.run();
let io_result = io_threads.join();
match (server_result, io_result) {
(Ok(()), Ok(())) => Ok(()),
(Err(server), Err(io)) => Err(server).context(format!("IO thread error: {io}")),
(Err(server), _) => Err(server),
(_, Err(io)) => Err(io).context("IO thread error"),
}
}

View file

@ -1,54 +0,0 @@
use anyhow::Context;
use lsp_types::notification::Notification;
use std::sync::OnceLock;
use crate::server::ClientSender;
static MESSENGER: OnceLock<ClientSender> = OnceLock::new();
pub(crate) fn init_messenger(client_sender: ClientSender) {
MESSENGER
.set(client_sender)
.expect("messenger should only be initialized once");
}
pub(crate) fn show_message(message: String, message_type: lsp_types::MessageType) {
try_show_message(message, message_type).unwrap();
}
pub(super) fn try_show_message(
message: String,
message_type: lsp_types::MessageType,
) -> crate::Result<()> {
MESSENGER
.get()
.ok_or_else(|| anyhow::anyhow!("messenger not initialized"))?
.send(lsp_server::Message::Notification(
lsp_server::Notification {
method: lsp_types::notification::ShowMessage::METHOD.into(),
params: serde_json::to_value(lsp_types::ShowMessageParams {
typ: message_type,
message,
})?,
},
))
.context("Failed to send message")?;
Ok(())
}
/// Sends an error to the client with a formatted message. The error is sent in a
/// `window/showMessage` notification.
macro_rules! show_err_msg {
($msg:expr$(, $($arg:tt)*)?) => {
crate::message::show_message(::core::format_args!($msg$(, $($arg)*)?).to_string(), lsp_types::MessageType::ERROR)
};
}
/// Sends a request to display a warning to the client with a formatted message. The warning is
/// sent in a `window/showMessage` notification.
macro_rules! show_warn_msg {
($msg:expr$(, $($arg:tt)*)?) => {
crate::message::show_message(::core::format_args!($msg$(, $($arg)*)?).to_string(), lsp_types::MessageType::WARNING)
};
}

View file

@ -1,9 +1,7 @@
//! Scheduling, I/O, and API endpoints.
use lsp_server::Message;
use lsp_types::{
ClientCapabilities, DiagnosticOptions, DiagnosticServerCapabilities,
DidChangeWatchedFilesRegistrationOptions, FileSystemWatcher, HoverProviderCapability,
ClientCapabilities, DiagnosticOptions, DiagnosticServerCapabilities, HoverProviderCapability,
InlayHintOptions, InlayHintServerCapabilities, MessageType, ServerCapabilities,
TextDocumentSyncCapability, TextDocumentSyncKind, TextDocumentSyncOptions,
TypeDefinitionProviderCapability, Url,
@ -11,19 +9,20 @@ use lsp_types::{
use std::num::NonZeroUsize;
use std::panic::PanicHookInfo;
use self::connection::{Connection, ConnectionInitializer};
use self::schedule::event_loop_thread;
use self::connection::Connection;
use self::schedule::spawn_main_loop;
use crate::PositionEncoding;
use crate::session::{AllSettings, ClientSettings, Experimental, Session};
mod api;
mod client;
mod connection;
mod main_loop;
mod schedule;
use crate::message::try_show_message;
use crate::server::schedule::Task;
pub(crate) use connection::ClientSender;
use crate::session::client::Client;
pub(crate) use api::Error;
pub(crate) use connection::{ConnectionInitializer, ConnectionSender};
pub(crate) use main_loop::{Action, Event, MainLoopReceiver, MainLoopSender};
pub(crate) type Result<T> = std::result::Result<T, api::Error>;
@ -31,13 +30,16 @@ pub(crate) struct Server {
connection: Connection,
client_capabilities: ClientCapabilities,
worker_threads: NonZeroUsize,
main_loop_receiver: MainLoopReceiver,
main_loop_sender: MainLoopSender,
session: Session,
}
impl Server {
pub(crate) fn new(worker_threads: NonZeroUsize) -> crate::Result<Self> {
let connection = ConnectionInitializer::stdio();
pub(crate) fn new(
worker_threads: NonZeroUsize,
connection: ConnectionInitializer,
) -> crate::Result<Self> {
let (id, init_params) = connection.initialize_start()?;
let AllSettings {
@ -61,7 +63,11 @@ impl Server {
crate::version(),
)?;
crate::message::init_messenger(connection.make_sender());
// The number 32 was chosen arbitrarily. The main goal was to have enough capacity to queue
// some responses before blocking.
let (main_loop_sender, main_loop_receiver) = crossbeam::channel::bounded(32);
let client = Client::new(main_loop_sender.clone(), connection.sender());
crate::logging::init_logging(
global_settings.tracing.log_level.unwrap_or_default(),
global_settings.tracing.log_file.as_deref(),
@ -99,10 +105,10 @@ impl Server {
"Multiple workspaces are not yet supported, using the first workspace: {}",
&first_workspace.0
);
show_warn_msg!(
client.show_warning_message(format_args!(
"Multiple workspaces are not yet supported, using the first workspace: {}",
&first_workspace.0
);
&first_workspace.0,
));
vec![first_workspace]
} else {
workspaces
@ -111,6 +117,8 @@ impl Server {
Ok(Self {
connection,
worker_threads,
main_loop_receiver,
main_loop_sender,
session: Session::new(
&client_capabilities,
position_encoding,
@ -121,7 +129,7 @@ impl Server {
})
}
pub(crate) fn run(self) -> crate::Result<()> {
pub(crate) fn run(mut self) -> crate::Result<()> {
type PanicHook = Box<dyn Fn(&PanicHookInfo<'_>) + 'static + Sync + Send>;
struct RestorePanicHook {
hook: Option<PanicHook>,
@ -141,6 +149,8 @@ impl Server {
hook: Some(std::panic::take_hook()),
};
let client = Client::new(self.main_loop_sender.clone(), self.connection.sender());
// When we panic, try to notify the client.
std::panic::set_hook(Box::new(move |panic_info| {
use std::io::Write;
@ -154,118 +164,15 @@ impl Server {
let mut stderr = std::io::stderr().lock();
writeln!(stderr, "{panic_info}\n{backtrace}").ok();
try_show_message(
"The ty language server exited with a panic. See the logs for more details."
.to_string(),
MessageType::ERROR,
)
.ok();
client
.show_message(
"The ty language server exited with a panic. See the logs for more details.",
MessageType::ERROR,
)
.ok();
}));
event_loop_thread(move || {
Self::event_loop(
&self.connection,
&self.client_capabilities,
self.session,
self.worker_threads,
)?;
self.connection.close()?;
Ok(())
})?
.join()
}
fn event_loop(
connection: &Connection,
client_capabilities: &ClientCapabilities,
mut session: Session,
worker_threads: NonZeroUsize,
) -> crate::Result<()> {
let mut scheduler =
schedule::Scheduler::new(&mut session, worker_threads, connection.make_sender());
let fs_watcher = client_capabilities
.workspace
.as_ref()
.and_then(|workspace| workspace.did_change_watched_files?.dynamic_registration)
.unwrap_or_default();
if fs_watcher {
let registration = lsp_types::Registration {
id: "workspace/didChangeWatchedFiles".to_owned(),
method: "workspace/didChangeWatchedFiles".to_owned(),
register_options: Some(
serde_json::to_value(DidChangeWatchedFilesRegistrationOptions {
watchers: vec![
FileSystemWatcher {
glob_pattern: lsp_types::GlobPattern::String("**/ty.toml".into()),
kind: None,
},
FileSystemWatcher {
glob_pattern: lsp_types::GlobPattern::String(
"**/.gitignore".into(),
),
kind: None,
},
FileSystemWatcher {
glob_pattern: lsp_types::GlobPattern::String("**/.ignore".into()),
kind: None,
},
FileSystemWatcher {
glob_pattern: lsp_types::GlobPattern::String(
"**/pyproject.toml".into(),
),
kind: None,
},
FileSystemWatcher {
glob_pattern: lsp_types::GlobPattern::String("**/*.py".into()),
kind: None,
},
FileSystemWatcher {
glob_pattern: lsp_types::GlobPattern::String("**/*.pyi".into()),
kind: None,
},
FileSystemWatcher {
glob_pattern: lsp_types::GlobPattern::String("**/*.ipynb".into()),
kind: None,
},
],
})
.unwrap(),
),
};
let response_handler = |()| {
tracing::info!("File watcher successfully registered");
Task::nothing()
};
if let Err(err) = scheduler.request::<lsp_types::request::RegisterCapability>(
lsp_types::RegistrationParams {
registrations: vec![registration],
},
response_handler,
) {
tracing::error!(
"An error occurred when trying to register the configuration file watcher: {err}"
);
}
} else {
tracing::warn!("The client does not support file system watching.");
}
for msg in connection.incoming() {
if connection.handle_shutdown(&msg)? {
break;
}
let task = match msg {
Message::Request(req) => api::request(req),
Message::Notification(notification) => api::notification(notification),
Message::Response(response) => scheduler.response(response),
};
scheduler.dispatch(task);
}
Ok(())
spawn_main_loop(move || self.main_loop())?.join()
}
fn find_best_position_encoding(client_capabilities: &ClientCapabilities) -> PositionEncoding {

View file

@ -5,7 +5,7 @@ use anyhow::anyhow;
use lsp_server as server;
use lsp_server::RequestId;
use lsp_types::notification::Notification;
use ruff_db::panic::PanicError;
use lsp_types::request::Request;
use std::panic::UnwindSafe;
mod diagnostics;
@ -14,8 +14,16 @@ mod requests;
mod traits;
use self::traits::{NotificationHandler, RequestHandler};
use super::{Result, client::Responder, schedule::BackgroundSchedule};
use super::{Result, schedule::BackgroundSchedule};
use crate::session::client::Client;
use ruff_db::panic::PanicError;
/// Processes a request from the client to the server.
///
/// The LSP specification requires that each request has exactly one response. Therefore,
/// it's crucial that all paths in this method call [`Client::respond`] exactly once.
/// The only exception to this is requests that were cancelled by the client. In this case,
/// the response was already sent by the [`notification::CancelNotificationHandler`].
pub(super) fn request(req: server::Request) -> Task {
let id = req.id.clone();
@ -45,7 +53,7 @@ pub(super) fn request(req: server::Request) -> Task {
method => {
tracing::warn!("Received request {method} which does not have a handler");
let result: Result<()> = Err(Error::new(
anyhow!("Unknown request"),
anyhow!("Unknown request: {method}"),
server::ErrorCode::MethodNotFound,
));
return Task::immediate(id, result);
@ -53,11 +61,21 @@ pub(super) fn request(req: server::Request) -> Task {
}
.unwrap_or_else(|err| {
tracing::error!("Encountered error when routing request with ID {id}: {err}");
show_err_msg!(
"ty failed to handle a request from the editor. Check the logs for more details."
);
let result: Result<()> = Err(err);
Task::immediate(id, result)
Task::local(move |_session, client| {
client.show_error_message(
"ty failed to handle a request from the editor. Check the logs for more details.",
);
respond_silent_error(
id,
client,
lsp_server::ResponseError {
code: err.code as i32,
message: err.to_string(),
data: None,
},
);
})
})
}
@ -81,6 +99,9 @@ pub(super) fn notification(notif: server::Notification) -> Task {
notifications::DidChangeWatchedFiles::METHOD => {
local_notification_task::<notifications::DidChangeWatchedFiles>(notif)
}
lsp_types::notification::Cancel::METHOD => {
local_notification_task::<notifications::CancelNotificationHandler>(notif)
}
lsp_types::notification::SetTrace::METHOD => {
tracing::trace!("Ignoring `setTrace` notification");
return Task::nothing();
@ -91,41 +112,50 @@ pub(super) fn notification(notif: server::Notification) -> Task {
return Task::nothing();
}
}
.unwrap_or_else(|err| {
tracing::error!("Encountered error when routing notification: {err}");
show_err_msg!(
"ty failed to handle a notification from the editor. Check the logs for more details."
);
Task::nothing()
})
.unwrap_or_else(|err| {
tracing::error!("Encountered error when routing notification: {err}");
Task::local(|_session, client| {
client.show_error_message(
"ty failed to handle a notification from the editor. Check the logs for more details."
);
})
})
}
fn _local_request_task<R: traits::SyncRequestHandler>(req: server::Request) -> super::Result<Task>
fn _local_request_task<R: traits::SyncRequestHandler>(req: server::Request) -> Result<Task>
where
<<R as RequestHandler>::RequestType as lsp_types::request::Request>::Params: UnwindSafe,
<<R as RequestHandler>::RequestType as Request>::Params: UnwindSafe,
{
let (id, params) = cast_request::<R>(req)?;
Ok(Task::local(|session, notifier, requester, responder| {
Ok(Task::local(move |session, client: &Client| {
let _span = tracing::debug_span!("request", %id, method = R::METHOD).entered();
let result = R::run(session, notifier, requester, params);
respond::<R>(id, result, &responder);
let result = R::run(session, client, params);
respond::<R>(&id, result, client);
}))
}
fn background_request_task<R: traits::BackgroundDocumentRequestHandler>(
req: server::Request,
schedule: BackgroundSchedule,
) -> super::Result<Task>
) -> Result<Task>
where
<<R as RequestHandler>::RequestType as lsp_types::request::Request>::Params: UnwindSafe,
<<R as RequestHandler>::RequestType as Request>::Params: UnwindSafe,
{
let retry = R::RETRY_ON_CANCELLATION.then(|| req.clone());
let (id, params) = cast_request::<R>(req)?;
Ok(Task::background(schedule, move |session: &Session| {
let cancellation_token = session
.request_queue()
.incoming()
.cancellation_token(&id)
.expect("request should have been tested for cancellation before scheduling");
let url = R::document_url(&params).into_owned();
let Ok(path) = url_to_any_system_path(&url) else {
tracing::warn!("Ignoring request for invalid `{url}`");
return Box::new(|_, _| {});
return Box::new(|_| {});
};
let db = match &path {
@ -138,17 +168,31 @@ where
let Some(snapshot) = session.take_snapshot(url) else {
tracing::warn!("Ignoring request because snapshot for path `{path:?}` doesn't exist.");
return Box::new(|_, _| {});
return Box::new(|_| {});
};
Box::new(move |notifier, responder| {
Box::new(move |client| {
let _span = tracing::debug_span!("request", %id, method = R::METHOD).entered();
// Test again if the request was cancelled since it was scheduled on the background task
// and, if so, return early
if cancellation_token.is_cancelled() {
tracing::trace!(
"Ignoring request id={id} method={} because it was cancelled",
R::METHOD
);
// We don't need to send a response here because the `cancel` notification
// handler already responded with a message.
return;
}
let result = ruff_db::panic::catch_unwind(|| {
R::run_with_snapshot(&db, snapshot, notifier, params)
R::run_with_snapshot(&db, snapshot, client, params)
});
if let Some(response) = request_result_to_response(&id, &responder, result) {
respond::<R>(id, response, &responder);
if let Some(response) = request_result_to_response::<R>(&id, client, result, retry) {
respond::<R>(&id, response, client);
}
})
}))
@ -156,29 +200,45 @@ where
fn request_result_to_response<R>(
id: &RequestId,
responder: &Responder,
result: std::result::Result<Result<R>, PanicError>,
) -> Option<Result<R>> {
client: &Client,
result: std::result::Result<
Result<<<R as RequestHandler>::RequestType as Request>::Result>,
PanicError,
>,
request: Option<lsp_server::Request>,
) -> Option<Result<<<R as RequestHandler>::RequestType as Request>::Result>>
where
R: traits::BackgroundDocumentRequestHandler,
{
match result {
Ok(response) => Some(response),
Err(error) => {
// Check if the request was canceled due to some modifications to the salsa database.
if error.payload.downcast_ref::<salsa::Cancelled>().is_some() {
// Request was cancelled by Salsa. TODO: Retry
respond_silent_error(
id.clone(),
responder,
Error {
code: lsp_server::ErrorCode::ContentModified,
error: anyhow!("content modified"),
},
// If the query supports retry, re-queue the request.
// The query is still likely to succeed if the user modified any other document.
if let Some(request) = request {
tracing::trace!(
"request id={} method={} was cancelled by salsa, re-queueing for retry",
request.id,
request.method
);
if client.retry(request).is_ok() {
return None;
}
}
tracing::trace!(
"request id={} was cancelled by salsa, sending content modified",
id
);
respond_silent_error(id.clone(), client, R::salsa_cancellation_error());
None
} else {
let message = format!("request handler {error}");
Some(Err(Error {
code: lsp_server::ErrorCode::InternalError,
error: anyhow!(message),
error: anyhow!("request handler {error}"),
}))
}
}
@ -187,13 +247,13 @@ fn request_result_to_response<R>(
fn local_notification_task<N: traits::SyncNotificationHandler>(
notif: server::Notification,
) -> super::Result<Task> {
) -> Result<Task> {
let (id, params) = cast_notification::<N>(notif)?;
Ok(Task::local(move |session, notifier, requester, _| {
Ok(Task::local(move |session, client| {
let _span = tracing::debug_span!("notification", method = N::METHOD).entered();
if let Err(err) = N::run(session, notifier, requester, params) {
if let Err(err) = N::run(session, client, params) {
tracing::error!("An error occurred while running {id}: {err}");
show_err_msg!("ty encountered a problem. Check the logs for more details.");
client.show_error_message("ty encountered a problem. Check the logs for more details.");
}
}))
}
@ -202,11 +262,10 @@ fn local_notification_task<N: traits::SyncNotificationHandler>(
fn background_notification_thread<N>(
req: server::Notification,
schedule: BackgroundSchedule,
) -> super::Result<Task>
) -> Result<Task>
where
N: traits::BackgroundDocumentNotificationHandler,
<<N as NotificationHandler>::NotificationType as lsp_types::notification::Notification>::Params:
UnwindSafe,
<<N as NotificationHandler>::NotificationType as Notification>::Params: UnwindSafe,
{
let (id, params) = cast_notification::<N>(req)?;
Ok(Task::background(schedule, move |session: &Session| {
@ -215,25 +274,29 @@ where
tracing::debug!(
"Ignoring notification because snapshot for url `{url}` doesn't exist."
);
return Box::new(|_, _| {});
return Box::new(|_| {});
};
Box::new(move |notifier, _| {
Box::new(move |client| {
let _span = tracing::debug_span!("notification", method = N::METHOD).entered();
let result = match ruff_db::panic::catch_unwind(|| {
N::run_with_snapshot(snapshot, notifier, params)
N::run_with_snapshot(snapshot, client, params)
}) {
Ok(result) => result,
Err(panic) => {
tracing::error!("An error occurred while running {id}: {panic}");
show_err_msg!("ty encountered a panic. Check the logs for more details.");
client.show_error_message(
"ty encountered a panic. Check the logs for more details.",
);
return;
}
};
if let Err(err) = result {
tracing::error!("An error occurred while running {id}: {err}");
show_err_msg!("ty encountered a problem. Check the logs for more details.");
client.show_error_message(
"ty encountered a problem. Check the logs for more details.",
);
}
})
}))
@ -245,13 +308,13 @@ where
/// implementation.
fn cast_request<Req>(
request: server::Request,
) -> super::Result<(
server::RequestId,
<<Req as RequestHandler>::RequestType as lsp_types::request::Request>::Params,
) -> Result<(
RequestId,
<<Req as RequestHandler>::RequestType as Request>::Params,
)>
where
Req: traits::RequestHandler,
<<Req as RequestHandler>::RequestType as lsp_types::request::Request>::Params: UnwindSafe,
Req: RequestHandler,
<<Req as RequestHandler>::RequestType as Request>::Params: UnwindSafe,
{
request
.extract(Req::METHOD)
@ -269,27 +332,25 @@ where
/// Sends back a response to the server using a [`Responder`].
fn respond<Req>(
id: server::RequestId,
result: crate::server::Result<
<<Req as traits::RequestHandler>::RequestType as lsp_types::request::Request>::Result,
>,
responder: &Responder,
id: &RequestId,
result: Result<<<Req as RequestHandler>::RequestType as Request>::Result>,
client: &Client,
) where
Req: traits::RequestHandler,
Req: RequestHandler,
{
if let Err(err) = &result {
tracing::error!("An error occurred with request ID {id}: {err}");
show_err_msg!("ty encountered a problem. Check the logs for more details.");
client.show_error_message("ty encountered a problem. Check the logs for more details.");
}
if let Err(err) = responder.respond(id, result) {
if let Err(err) = client.respond(id, result) {
tracing::error!("Failed to send response: {err}");
}
}
/// Sends back an error response to the server using a [`Responder`] without showing a warning
/// Sends back an error response to the server using a [`Client`] without showing a warning
/// to the user.
fn respond_silent_error(id: server::RequestId, responder: &Responder, error: Error) {
if let Err(err) = responder.respond::<()>(id, Err(error)) {
fn respond_silent_error(id: RequestId, client: &Client, error: lsp_server::ResponseError) {
if let Err(err) = client.respond_err(id, error) {
tracing::error!("Failed to send response: {err}");
}
}
@ -298,12 +359,12 @@ fn respond_silent_error(id: server::RequestId, responder: &Responder, error: Err
/// a parameter type for a specific request handler.
fn cast_notification<N>(
notification: server::Notification,
) -> super::Result<
(
&'static str,
<<N as traits::NotificationHandler>::NotificationType as lsp_types::notification::Notification>::Params,
)> where
N: traits::NotificationHandler,
) -> Result<(
&'static str,
<<N as NotificationHandler>::NotificationType as Notification>::Params,
)>
where
N: NotificationHandler,
{
Ok((
N::METHOD,

View file

@ -11,14 +11,13 @@ use ruff_db::files::FileRange;
use ruff_db::source::{line_index, source_text};
use ty_project::{Db, ProjectDatabase};
use super::LSPResult;
use crate::document::{FileRangeExt, ToRangeExt};
use crate::server::Result;
use crate::server::client::Notifier;
use crate::session::client::Client;
use crate::system::url_to_any_system_path;
use crate::{DocumentSnapshot, PositionEncoding, Session};
use super::LSPResult;
/// Represents the diagnostics for a text document or a notebook document.
pub(super) enum Diagnostics {
TextDocument(Vec<Diagnostic>),
@ -46,9 +45,9 @@ impl Diagnostics {
/// Clears the diagnostics for the document at `uri`.
///
/// This is done by notifying the client with an empty list of diagnostics for the document.
pub(super) fn clear_diagnostics(uri: &Url, notifier: &Notifier) -> Result<()> {
notifier
.notify::<PublishDiagnostics>(PublishDiagnosticsParams {
pub(super) fn clear_diagnostics(uri: &Url, client: &Client) -> Result<()> {
client
.send_notification::<PublishDiagnostics>(PublishDiagnosticsParams {
uri: uri.clone(),
diagnostics: vec![],
version: None,
@ -63,7 +62,7 @@ pub(super) fn clear_diagnostics(uri: &Url, notifier: &Notifier) -> Result<()> {
/// This function is a no-op if the client supports pull diagnostics.
///
/// [publish diagnostics notification]: https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#textDocument_publishDiagnostics
pub(super) fn publish_diagnostics(session: &Session, url: Url, notifier: &Notifier) -> Result<()> {
pub(super) fn publish_diagnostics(session: &Session, url: Url, client: &Client) -> Result<()> {
if session.client_capabilities().pull_diagnostics {
return Ok(());
}
@ -85,8 +84,8 @@ pub(super) fn publish_diagnostics(session: &Session, url: Url, notifier: &Notifi
// Sends a notification to the client with the diagnostics for the document.
let publish_diagnostics_notification = |uri: Url, diagnostics: Vec<Diagnostic>| {
notifier
.notify::<PublishDiagnostics>(PublishDiagnosticsParams {
client
.send_notification::<PublishDiagnostics>(PublishDiagnosticsParams {
uri,
diagnostics,
version: Some(snapshot.query().version()),

View file

@ -1,3 +1,4 @@
mod cancel;
mod did_change;
mod did_change_watched_files;
mod did_close;
@ -5,6 +6,7 @@ mod did_close_notebook;
mod did_open;
mod did_open_notebook;
pub(super) use cancel::CancelNotificationHandler;
pub(super) use did_change::DidChangeTextDocumentHandler;
pub(super) use did_change_watched_files::DidChangeWatchedFiles;
pub(super) use did_close::DidCloseTextDocumentHandler;

View file

@ -0,0 +1,27 @@
use lsp_server::RequestId;
use lsp_types::CancelParams;
use lsp_types::notification::Cancel;
use crate::server::Result;
use crate::server::api::traits::{NotificationHandler, SyncNotificationHandler};
use crate::session::Session;
use crate::session::client::Client;
pub(crate) struct CancelNotificationHandler;
impl NotificationHandler for CancelNotificationHandler {
type NotificationType = Cancel;
}
impl SyncNotificationHandler for CancelNotificationHandler {
fn run(session: &mut Session, client: &Client, params: CancelParams) -> Result<()> {
let id: RequestId = match params.id {
lsp_types::NumberOrString::Number(id) => id.into(),
lsp_types::NumberOrString::String(id) => id.into(),
};
let _ = client.cancel(session, id);
Ok(())
}
}

View file

@ -2,15 +2,14 @@ use lsp_server::ErrorCode;
use lsp_types::notification::DidChangeTextDocument;
use lsp_types::{DidChangeTextDocumentParams, VersionedTextDocumentIdentifier};
use ty_project::watch::ChangeEvent;
use crate::server::Result;
use crate::server::api::LSPResult;
use crate::server::api::diagnostics::publish_diagnostics;
use crate::server::api::traits::{NotificationHandler, SyncNotificationHandler};
use crate::server::client::{Notifier, Requester};
use crate::session::Session;
use crate::session::client::Client;
use crate::system::{AnySystemPath, url_to_any_system_path};
use ty_project::watch::ChangeEvent;
pub(crate) struct DidChangeTextDocumentHandler;
@ -21,8 +20,7 @@ impl NotificationHandler for DidChangeTextDocumentHandler {
impl SyncNotificationHandler for DidChangeTextDocumentHandler {
fn run(
session: &mut Session,
notifier: Notifier,
_requester: &mut Requester,
client: &Client,
params: DidChangeTextDocumentParams,
) -> Result<()> {
let DidChangeTextDocumentParams {
@ -54,6 +52,6 @@ impl SyncNotificationHandler for DidChangeTextDocumentHandler {
}
}
publish_diagnostics(session, uri, &notifier)
publish_diagnostics(session, uri, client)
}
}

View file

@ -2,9 +2,8 @@ use crate::server::Result;
use crate::server::api::LSPResult;
use crate::server::api::diagnostics::publish_diagnostics;
use crate::server::api::traits::{NotificationHandler, SyncNotificationHandler};
use crate::server::client::{Notifier, Requester};
use crate::server::schedule::Task;
use crate::session::Session;
use crate::session::client::Client;
use crate::system::{AnySystemPath, url_to_any_system_path};
use lsp_types as types;
use lsp_types::{FileChangeType, notification as notif};
@ -21,8 +20,7 @@ impl NotificationHandler for DidChangeWatchedFiles {
impl SyncNotificationHandler for DidChangeWatchedFiles {
fn run(
session: &mut Session,
notifier: Notifier,
requester: &mut Requester,
client: &Client,
params: types::DidChangeWatchedFilesParams,
) -> Result<()> {
let mut events_by_db: FxHashMap<_, Vec<ChangeEvent>> = FxHashMap::default();
@ -105,12 +103,16 @@ impl SyncNotificationHandler for DidChangeWatchedFiles {
if project_changed {
if client_capabilities.diagnostics_refresh {
requester
.request::<types::request::WorkspaceDiagnosticRefresh>((), |()| Task::nothing())
client
.send_request::<types::request::WorkspaceDiagnosticRefresh>(
session,
(),
|_, ()| {},
)
.with_failure_code(lsp_server::ErrorCode::InternalError)?;
} else {
for url in session.text_document_urls() {
publish_diagnostics(session, url.clone(), &notifier)?;
publish_diagnostics(session, url.clone(), client)?;
}
}
@ -118,8 +120,8 @@ impl SyncNotificationHandler for DidChangeWatchedFiles {
}
if client_capabilities.inlay_refresh {
requester
.request::<types::request::InlayHintRefreshRequest>((), |()| Task::nothing())
client
.send_request::<types::request::InlayHintRefreshRequest>(session, (), |_, ()| {})
.with_failure_code(lsp_server::ErrorCode::InternalError)?;
}

View file

@ -1,15 +1,14 @@
use lsp_server::ErrorCode;
use lsp_types::DidCloseTextDocumentParams;
use lsp_types::notification::DidCloseTextDocument;
use ty_project::watch::ChangeEvent;
use crate::server::Result;
use crate::server::api::LSPResult;
use crate::server::api::diagnostics::clear_diagnostics;
use crate::server::api::traits::{NotificationHandler, SyncNotificationHandler};
use crate::server::client::{Notifier, Requester};
use crate::session::Session;
use crate::session::client::Client;
use crate::system::{AnySystemPath, url_to_any_system_path};
use lsp_server::ErrorCode;
use lsp_types::DidCloseTextDocumentParams;
use lsp_types::notification::DidCloseTextDocument;
use ty_project::watch::ChangeEvent;
pub(crate) struct DidCloseTextDocumentHandler;
@ -20,8 +19,7 @@ impl NotificationHandler for DidCloseTextDocumentHandler {
impl SyncNotificationHandler for DidCloseTextDocumentHandler {
fn run(
session: &mut Session,
notifier: Notifier,
_requester: &mut Requester,
client: &Client,
params: DidCloseTextDocumentParams,
) -> Result<()> {
let Ok(path) = url_to_any_system_path(&params.text_document.uri) else {
@ -38,7 +36,7 @@ impl SyncNotificationHandler for DidCloseTextDocumentHandler {
db.apply_changes(vec![ChangeEvent::DeletedVirtual(virtual_path)], None);
}
clear_diagnostics(key.url(), &notifier)?;
clear_diagnostics(key.url(), client)?;
Ok(())
}

View file

@ -1,14 +1,13 @@
use lsp_types::DidCloseNotebookDocumentParams;
use lsp_types::notification::DidCloseNotebookDocument;
use ty_project::watch::ChangeEvent;
use crate::server::Result;
use crate::server::api::LSPResult;
use crate::server::api::traits::{NotificationHandler, SyncNotificationHandler};
use crate::server::client::{Notifier, Requester};
use crate::session::Session;
use crate::session::client::Client;
use crate::system::{AnySystemPath, url_to_any_system_path};
use ty_project::watch::ChangeEvent;
pub(crate) struct DidCloseNotebookHandler;
@ -19,8 +18,7 @@ impl NotificationHandler for DidCloseNotebookHandler {
impl SyncNotificationHandler for DidCloseNotebookHandler {
fn run(
session: &mut Session,
_notifier: Notifier,
_requester: &mut Requester,
_client: &Client,
params: DidCloseNotebookDocumentParams,
) -> Result<()> {
let Ok(path) = url_to_any_system_path(&params.notebook_document.uri) else {

View file

@ -1,16 +1,15 @@
use lsp_types::notification::DidOpenTextDocument;
use lsp_types::{DidOpenTextDocumentParams, TextDocumentItem};
use ruff_db::Db;
use ty_project::watch::ChangeEvent;
use crate::TextDocument;
use crate::server::Result;
use crate::server::api::diagnostics::publish_diagnostics;
use crate::server::api::traits::{NotificationHandler, SyncNotificationHandler};
use crate::server::client::{Notifier, Requester};
use crate::session::Session;
use crate::session::client::Client;
use crate::system::{AnySystemPath, url_to_any_system_path};
use ruff_db::Db;
use ty_project::watch::ChangeEvent;
pub(crate) struct DidOpenTextDocumentHandler;
@ -21,8 +20,7 @@ impl NotificationHandler for DidOpenTextDocumentHandler {
impl SyncNotificationHandler for DidOpenTextDocumentHandler {
fn run(
session: &mut Session,
notifier: Notifier,
_requester: &mut Requester,
client: &Client,
DidOpenTextDocumentParams {
text_document:
TextDocumentItem {
@ -54,6 +52,6 @@ impl SyncNotificationHandler for DidOpenTextDocumentHandler {
}
}
publish_diagnostics(session, uri, &notifier)
publish_diagnostics(session, uri, client)
}
}

View file

@ -9,8 +9,8 @@ use crate::document::NotebookDocument;
use crate::server::Result;
use crate::server::api::LSPResult;
use crate::server::api::traits::{NotificationHandler, SyncNotificationHandler};
use crate::server::client::{Notifier, Requester};
use crate::session::Session;
use crate::session::client::Client;
use crate::system::{AnySystemPath, url_to_any_system_path};
pub(crate) struct DidOpenNotebookHandler;
@ -22,8 +22,7 @@ impl NotificationHandler for DidOpenNotebookHandler {
impl SyncNotificationHandler for DidOpenNotebookHandler {
fn run(
session: &mut Session,
_notifier: Notifier,
_requester: &mut Requester,
_client: &Client,
params: DidOpenNotebookDocumentParams,
) -> Result<()> {
let Ok(path) = url_to_any_system_path(&params.notebook_document.uri) else {

View file

@ -9,7 +9,7 @@ use ty_project::ProjectDatabase;
use crate::DocumentSnapshot;
use crate::document::PositionExt;
use crate::server::api::traits::{BackgroundDocumentRequestHandler, RequestHandler};
use crate::server::client::Notifier;
use crate::session::client::Client;
pub(crate) struct CompletionRequestHandler;
@ -18,6 +18,8 @@ impl RequestHandler for CompletionRequestHandler {
}
impl BackgroundDocumentRequestHandler for CompletionRequestHandler {
const RETRY_ON_CANCELLATION: bool = true;
fn document_url(params: &CompletionParams) -> Cow<Url> {
Cow::Borrowed(&params.text_document_position.text_document.uri)
}
@ -25,7 +27,7 @@ impl BackgroundDocumentRequestHandler for CompletionRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
_notifier: Notifier,
_client: &Client,
params: CompletionParams,
) -> crate::server::Result<Option<CompletionResponse>> {
let Some(file) = snapshot.file(db) else {

View file

@ -6,10 +6,11 @@ use lsp_types::{
FullDocumentDiagnosticReport, RelatedFullDocumentDiagnosticReport,
};
use crate::server::Result;
use crate::server::api::diagnostics::{Diagnostics, compute_diagnostics};
use crate::server::api::traits::{BackgroundDocumentRequestHandler, RequestHandler};
use crate::server::{Result, client::Notifier};
use crate::session::DocumentSnapshot;
use crate::session::client::Client;
use ty_project::ProjectDatabase;
pub(crate) struct DocumentDiagnosticRequestHandler;
@ -26,7 +27,7 @@ impl BackgroundDocumentRequestHandler for DocumentDiagnosticRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
_notifier: Notifier,
_client: &Client,
_params: DocumentDiagnosticParams,
) -> Result<DocumentDiagnosticReportResult> {
Ok(DocumentDiagnosticReportResult::Report(
@ -42,4 +43,15 @@ impl BackgroundDocumentRequestHandler for DocumentDiagnosticRequestHandler {
}),
))
}
fn salsa_cancellation_error() -> lsp_server::ResponseError {
lsp_server::ResponseError {
code: lsp_server::ErrorCode::ServerCancelled as i32,
message: "server cancelled the request".to_owned(),
data: serde_json::to_value(lsp_types::DiagnosticServerCancellationData {
retrigger_request: true,
})
.ok(),
}
}
}

View file

@ -9,7 +9,7 @@ use ty_project::ProjectDatabase;
use crate::DocumentSnapshot;
use crate::document::{PositionExt, ToLink};
use crate::server::api::traits::{BackgroundDocumentRequestHandler, RequestHandler};
use crate::server::client::Notifier;
use crate::session::client::Client;
pub(crate) struct GotoTypeDefinitionRequestHandler;
@ -25,7 +25,7 @@ impl BackgroundDocumentRequestHandler for GotoTypeDefinitionRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
_notifier: Notifier,
_client: &Client,
params: GotoTypeDefinitionParams,
) -> crate::server::Result<Option<GotoDefinitionResponse>> {
let Some(file) = snapshot.file(db) else {

View file

@ -3,7 +3,7 @@ use std::borrow::Cow;
use crate::DocumentSnapshot;
use crate::document::{PositionExt, ToRangeExt};
use crate::server::api::traits::{BackgroundDocumentRequestHandler, RequestHandler};
use crate::server::client::Notifier;
use crate::session::client::Client;
use lsp_types::request::HoverRequest;
use lsp_types::{HoverContents, HoverParams, MarkupContent, Url};
use ruff_db::source::{line_index, source_text};
@ -25,7 +25,7 @@ impl BackgroundDocumentRequestHandler for HoverRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
_notifier: Notifier,
_client: &Client,
params: HoverParams,
) -> crate::server::Result<Option<lsp_types::Hover>> {
let Some(file) = snapshot.file(db) else {

View file

@ -3,7 +3,7 @@ use std::borrow::Cow;
use crate::DocumentSnapshot;
use crate::document::{RangeExt, TextSizeExt};
use crate::server::api::traits::{BackgroundDocumentRequestHandler, RequestHandler};
use crate::server::client::Notifier;
use crate::session::client::Client;
use lsp_types::request::InlayHintRequest;
use lsp_types::{InlayHintParams, Url};
use ruff_db::source::{line_index, source_text};
@ -24,7 +24,7 @@ impl BackgroundDocumentRequestHandler for InlayHintRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
_notifier: Notifier,
_client: &Client,
params: InlayHintParams,
) -> crate::server::Result<Option<Vec<lsp_types::InlayHint>>> {
let Some(file) = snapshot.file(db) else {

View file

@ -1,6 +1,6 @@
//! A stateful LSP implementation that calls into the ty API.
use crate::server::client::{Notifier, Requester};
use crate::session::client::Client;
use crate::session::{DocumentSnapshot, Session};
use lsp_types::notification::Notification as LSPNotification;
@ -21,14 +21,16 @@ pub(super) trait RequestHandler {
pub(super) trait SyncRequestHandler: RequestHandler {
fn run(
session: &mut Session,
notifier: Notifier,
requester: &mut Requester,
client: &Client,
params: <<Self as RequestHandler>::RequestType as Request>::Params,
) -> super::Result<<<Self as RequestHandler>::RequestType as Request>::Result>;
}
/// A request handler that can be run on a background thread.
pub(super) trait BackgroundDocumentRequestHandler: RequestHandler {
/// Whether this request be retried if it was cancelled due to a modification to the Salsa database.
const RETRY_ON_CANCELLATION: bool = false;
fn document_url(
params: &<<Self as RequestHandler>::RequestType as Request>::Params,
) -> std::borrow::Cow<lsp_types::Url>;
@ -36,9 +38,17 @@ pub(super) trait BackgroundDocumentRequestHandler: RequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
notifier: Notifier,
client: &Client,
params: <<Self as RequestHandler>::RequestType as Request>::Params,
) -> super::Result<<<Self as RequestHandler>::RequestType as Request>::Result>;
fn salsa_cancellation_error() -> lsp_server::ResponseError {
lsp_server::ResponseError {
code: lsp_server::ErrorCode::ContentModified as i32,
message: "content modified".to_string(),
data: None,
}
}
}
/// A supertrait for any server notification handler.
@ -55,8 +65,7 @@ pub(super) trait NotificationHandler {
pub(super) trait SyncNotificationHandler: NotificationHandler {
fn run(
session: &mut Session,
notifier: Notifier,
requester: &mut Requester,
client: &Client,
params: <<Self as NotificationHandler>::NotificationType as LSPNotification>::Params,
) -> super::Result<()>;
}
@ -72,7 +81,7 @@ pub(super) trait BackgroundDocumentNotificationHandler: NotificationHandler {
fn run_with_snapshot(
snapshot: DocumentSnapshot,
notifier: Notifier,
client: &Client,
params: <<Self as NotificationHandler>::NotificationType as LSPNotification>::Params,
) -> super::Result<()>;
}

View file

@ -1,169 +0,0 @@
use std::any::TypeId;
use lsp_server::{Notification, RequestId};
use rustc_hash::FxHashMap;
use serde_json::Value;
use super::{ClientSender, schedule::Task};
type ResponseBuilder = Box<dyn FnOnce(lsp_server::Response) -> Task>;
pub(crate) struct Client {
notifier: Notifier,
responder: Responder,
pub(super) requester: Requester,
}
#[derive(Clone)]
pub(crate) struct Notifier(ClientSender);
#[derive(Clone)]
pub(crate) struct Responder(ClientSender);
pub(crate) struct Requester {
sender: ClientSender,
next_request_id: i32,
response_handlers: FxHashMap<lsp_server::RequestId, ResponseBuilder>,
}
impl Client {
pub(super) fn new(sender: ClientSender) -> Self {
Self {
notifier: Notifier(sender.clone()),
responder: Responder(sender.clone()),
requester: Requester {
sender,
next_request_id: 1,
response_handlers: FxHashMap::default(),
},
}
}
pub(super) fn notifier(&self) -> Notifier {
self.notifier.clone()
}
pub(super) fn responder(&self) -> Responder {
self.responder.clone()
}
}
#[expect(dead_code)] // we'll need to use `Notifier` in the future
impl Notifier {
pub(crate) fn notify<N>(&self, params: N::Params) -> crate::Result<()>
where
N: lsp_types::notification::Notification,
{
let method = N::METHOD.to_string();
let message = lsp_server::Message::Notification(Notification::new(method, params));
self.0.send(message)
}
pub(crate) fn notify_method(&self, method: String) -> crate::Result<()> {
self.0
.send(lsp_server::Message::Notification(Notification::new(
method,
Value::Null,
)))
}
}
impl Responder {
pub(crate) fn respond<R>(
&self,
id: RequestId,
result: crate::server::Result<R>,
) -> crate::Result<()>
where
R: serde::Serialize,
{
self.0.send(
match result {
Ok(res) => lsp_server::Response::new_ok(id, res),
Err(crate::server::api::Error { code, error }) => {
lsp_server::Response::new_err(id, code as i32, format!("{error}"))
}
}
.into(),
)
}
}
impl Requester {
/// Sends a request of kind `R` to the client, with associated parameters.
/// The task provided by `response_handler` will be dispatched as soon as the response
/// comes back from the client.
pub(crate) fn request<R>(
&mut self,
params: R::Params,
response_handler: impl Fn(R::Result) -> Task + 'static,
) -> crate::Result<()>
where
R: lsp_types::request::Request,
{
let serialized_params = serde_json::to_value(params)?;
self.response_handlers.insert(
self.next_request_id.into(),
Box::new(move |response: lsp_server::Response| {
match (response.error, response.result) {
(Some(err), _) => {
tracing::error!(
"Got an error from the client (code {}): {}",
err.code,
err.message
);
Task::nothing()
}
(None, Some(response)) => match serde_json::from_value(response) {
Ok(response) => response_handler(response),
Err(error) => {
tracing::error!("Failed to deserialize response from server: {error}");
Task::nothing()
}
},
(None, None) => {
if TypeId::of::<R::Result>() == TypeId::of::<()>() {
// We can't call `response_handler(())` directly here, but
// since we _know_ the type expected is `()`, we can use
// `from_value(Value::Null)`. `R::Result` implements `DeserializeOwned`,
// so this branch works in the general case but we'll only
// hit it if the concrete type is `()`, so the `unwrap()` is safe here.
response_handler(serde_json::from_value(Value::Null).unwrap());
} else {
tracing::error!(
"Server response was invalid: did not contain a result or error"
);
}
Task::nothing()
}
}
}),
);
self.sender
.send(lsp_server::Message::Request(lsp_server::Request {
id: self.next_request_id.into(),
method: R::METHOD.into(),
params: serialized_params,
}))?;
self.next_request_id += 1;
Ok(())
}
pub(crate) fn pop_response_task(&mut self, response: lsp_server::Response) -> Task {
if let Some(handler) = self.response_handlers.remove(&response.id) {
handler(response)
} else {
tracing::error!(
"Received a response with ID {}, which was not expected",
response.id
);
Task::nothing()
}
}
}

View file

@ -1,31 +1,25 @@
use lsp_server as lsp;
use lsp_types::{notification::Notification, request::Request};
use std::sync::{Arc, Weak};
type ConnectionSender = crossbeam::channel::Sender<lsp::Message>;
pub(crate) type ConnectionSender = crossbeam::channel::Sender<lsp::Message>;
type ConnectionReceiver = crossbeam::channel::Receiver<lsp::Message>;
/// A builder for `Connection` that handles LSP initialization.
pub(crate) struct ConnectionInitializer {
connection: lsp::Connection,
threads: lsp::IoThreads,
}
/// Handles inbound and outbound messages with the client.
pub(crate) struct Connection {
sender: Arc<ConnectionSender>,
sender: ConnectionSender,
receiver: ConnectionReceiver,
threads: lsp::IoThreads,
}
impl ConnectionInitializer {
/// Create a new LSP server connection over stdin/stdout.
pub(super) fn stdio() -> Self {
pub(crate) fn stdio() -> (Self, lsp::IoThreads) {
let (connection, threads) = lsp::Connection::stdio();
Self {
connection,
threads,
}
(Self { connection }, threads)
}
/// Starts the initialization process with the client by listening for an initialization request.
@ -59,27 +53,25 @@ impl ConnectionInitializer {
)?;
let Self {
connection: lsp::Connection { sender, receiver },
threads,
} = self;
Ok(Connection {
sender: Arc::new(sender),
receiver,
threads,
})
Ok(Connection { sender, receiver })
}
}
impl Connection {
/// Make a new `ClientSender` for sending messages to the client.
pub(super) fn make_sender(&self) -> ClientSender {
ClientSender {
weak_sender: Arc::downgrade(&self.sender),
}
pub(super) fn sender(&self) -> ConnectionSender {
self.sender.clone()
}
pub(super) fn send(&self, msg: lsp::Message) -> crate::Result<()> {
self.sender.send(msg)?;
Ok(())
}
/// An iterator over incoming messages from the client.
pub(super) fn incoming(&self) -> crossbeam::channel::Iter<lsp::Message> {
self.receiver.iter()
pub(super) fn incoming(&self) -> &crossbeam::channel::Receiver<lsp::Message> {
&self.receiver
}
/// Check and respond to any incoming shutdown requests; returns`true` if the server should be shutdown.
@ -131,37 +123,4 @@ impl Connection {
_ => Ok(false),
}
}
/// Join the I/O threads that underpin this connection.
/// This is guaranteed to be nearly immediate since
/// we close the only active channels to these threads prior
/// to joining them.
pub(super) fn close(self) -> crate::Result<()> {
std::mem::drop(
Arc::into_inner(self.sender)
.expect("the client sender shouldn't have more than one strong reference"),
);
std::mem::drop(self.receiver);
self.threads.join()?;
Ok(())
}
}
/// A weak reference to an underlying sender channel, used for communication with the client.
/// If the `Connection` that created this `ClientSender` is dropped, any `send` calls will throw
/// an error.
#[derive(Clone, Debug)]
pub(crate) struct ClientSender {
weak_sender: Weak<ConnectionSender>,
}
// note: additional wrapper functions for senders may be implemented as needed.
impl ClientSender {
pub(crate) fn send(&self, msg: lsp::Message) -> crate::Result<()> {
let Some(sender) = self.weak_sender.upgrade() else {
anyhow::bail!("The connection with the client has been closed");
};
Ok(sender.send(msg)?)
}
}

View file

@ -0,0 +1,212 @@
use crate::Session;
use crate::server::schedule::Scheduler;
use crate::server::{Server, api};
use crate::session::client::Client;
use crossbeam::select;
use lsp_server::Message;
use lsp_types::{DidChangeWatchedFilesRegistrationOptions, FileSystemWatcher};
pub(crate) type MainLoopSender = crossbeam::channel::Sender<Event>;
pub(crate) type MainLoopReceiver = crossbeam::channel::Receiver<Event>;
impl Server {
pub(super) fn main_loop(&mut self) -> crate::Result<()> {
self.initialize(&Client::new(
self.main_loop_sender.clone(),
self.connection.sender(),
));
let mut scheduler = Scheduler::new(self.worker_threads);
while let Ok(next_event) = self.next_event() {
let Some(next_event) = next_event else {
anyhow::bail!("client exited without proper shutdown sequence");
};
match next_event {
Event::Message(msg) => {
if self.connection.handle_shutdown(&msg)? {
break;
}
let task = match msg {
Message::Request(req) => {
self.session
.request_queue_mut()
.incoming_mut()
.register(req.id.clone(), req.method.clone());
api::request(req)
}
Message::Notification(notification) => api::notification(notification),
// Handle the response from the client to a server request
Message::Response(response) => {
if let Some(handler) = self
.session
.request_queue_mut()
.outgoing_mut()
.complete(&response.id)
{
handler(&self.session, response);
} else {
tracing::error!(
"Received a response with ID {}, which was not expected",
response.id
);
}
continue;
}
};
let client =
Client::new(self.main_loop_sender.clone(), self.connection.sender());
scheduler.dispatch(task, &mut self.session, client);
}
Event::Action(action) => match action {
Action::SendResponse(response) => {
// Filter out responses for already canceled requests.
if let Some((start_time, method)) = self
.session
.request_queue_mut()
.incoming_mut()
.complete(&response.id)
{
let duration = start_time.elapsed();
tracing::trace!(name: "message response", method, %response.id, duration = format_args!("{:0.2?}", duration));
self.connection.send(Message::Response(response))?;
} else {
tracing::trace!(
"Ignoring response for canceled request id={}",
response.id
);
}
}
Action::RetryRequest(request) => {
// Never retry canceled requests.
if self
.session
.request_queue()
.incoming()
.is_pending(&request.id)
{
api::request(request);
} else {
tracing::debug!(
"Request {}/{} was cancelled, not retrying",
request.method,
request.id
);
}
}
},
}
}
Ok(())
}
/// Waits for the next message from the client or action.
///
/// Returns `Ok(None)` if the client connection is closed.
fn next_event(&self) -> Result<Option<Event>, crossbeam::channel::RecvError> {
let next = select!(
recv(self.connection.incoming()) -> msg => msg.map(Event::Message),
recv(self.main_loop_receiver) -> event => return Ok(event.ok()),
);
next.map(Some)
}
fn initialize(&mut self, client: &Client) {
let fs_watcher = self
.client_capabilities
.workspace
.as_ref()
.and_then(|workspace| workspace.did_change_watched_files?.dynamic_registration)
.unwrap_or_default();
if fs_watcher {
let registration = lsp_types::Registration {
id: "workspace/didChangeWatchedFiles".to_owned(),
method: "workspace/didChangeWatchedFiles".to_owned(),
register_options: Some(
serde_json::to_value(DidChangeWatchedFilesRegistrationOptions {
watchers: vec![
FileSystemWatcher {
glob_pattern: lsp_types::GlobPattern::String("**/ty.toml".into()),
kind: None,
},
FileSystemWatcher {
glob_pattern: lsp_types::GlobPattern::String(
"**/.gitignore".into(),
),
kind: None,
},
FileSystemWatcher {
glob_pattern: lsp_types::GlobPattern::String("**/.ignore".into()),
kind: None,
},
FileSystemWatcher {
glob_pattern: lsp_types::GlobPattern::String(
"**/pyproject.toml".into(),
),
kind: None,
},
FileSystemWatcher {
glob_pattern: lsp_types::GlobPattern::String("**/*.py".into()),
kind: None,
},
FileSystemWatcher {
glob_pattern: lsp_types::GlobPattern::String("**/*.pyi".into()),
kind: None,
},
FileSystemWatcher {
glob_pattern: lsp_types::GlobPattern::String("**/*.ipynb".into()),
kind: None,
},
],
})
.unwrap(),
),
};
let response_handler = move |_session: &Session, ()| {
tracing::info!("File watcher successfully registered");
};
if let Err(err) = client.send_request::<lsp_types::request::RegisterCapability>(
&self.session,
lsp_types::RegistrationParams {
registrations: vec![registration],
},
response_handler,
) {
tracing::error!(
"An error occurred when trying to register the configuration file watcher: {err}"
);
}
} else {
tracing::warn!("The client does not support file system watching.");
}
}
}
/// An action that should be performed on the main loop.
#[derive(Debug)]
pub(crate) enum Action {
/// Send a response to the client
SendResponse(lsp_server::Response),
/// Retry a request that previously failed due to a salsa cancellation.
RetryRequest(lsp_server::Request),
}
#[derive(Debug)]
pub(crate) enum Event {
/// An incoming message from the LSP client.
Message(lsp_server::Message),
Action(Action),
}

View file

@ -5,20 +5,18 @@ use crate::session::Session;
mod task;
mod thread;
pub(super) use task::{BackgroundSchedule, Task};
use self::{
task::{BackgroundTaskBuilder, SyncTask},
thread::ThreadPriority,
};
use super::{ClientSender, client::Client};
use crate::session::client::Client;
pub(super) use task::{BackgroundSchedule, Task};
/// The event loop thread is actually a secondary thread that we spawn from the
/// _actual_ main thread. This secondary thread has a larger stack size
/// than some OS defaults (Windows, for example) and is also designated as
/// high-priority.
pub(crate) fn event_loop_thread(
pub(crate) fn spawn_main_loop(
func: impl FnOnce() -> crate::Result<()> + Send + 'static,
) -> crate::Result<thread::JoinHandle<crate::Result<()>>> {
// Override OS defaults to avoid stack overflows on platforms with low stack size defaults.
@ -32,69 +30,33 @@ pub(crate) fn event_loop_thread(
)
}
pub(crate) struct Scheduler<'s> {
session: &'s mut Session,
client: Client,
pub(crate) struct Scheduler {
fmt_pool: thread::Pool,
background_pool: thread::Pool,
}
impl<'s> Scheduler<'s> {
pub(super) fn new(
session: &'s mut Session,
worker_threads: NonZeroUsize,
sender: ClientSender,
) -> Self {
impl Scheduler {
pub(super) fn new(worker_threads: NonZeroUsize) -> Self {
const FMT_THREADS: usize = 1;
Self {
session,
fmt_pool: thread::Pool::new(NonZeroUsize::try_from(FMT_THREADS).unwrap()),
background_pool: thread::Pool::new(worker_threads),
client: Client::new(sender),
}
}
/// Immediately sends a request of kind `R` to the client, with associated parameters.
/// The task provided by `response_handler` will be dispatched as soon as the response
/// comes back from the client.
pub(super) fn request<R>(
&mut self,
params: R::Params,
response_handler: impl Fn(R::Result) -> Task + 'static,
) -> crate::Result<()>
where
R: lsp_types::request::Request,
{
self.client.requester.request::<R>(params, response_handler)
}
/// Creates a task to handle a response from the client.
pub(super) fn response(&mut self, response: lsp_server::Response) -> Task {
self.client.requester.pop_response_task(response)
}
/// Dispatches a `task` by either running it as a blocking function or
/// executing it on a background thread pool.
pub(super) fn dispatch(&mut self, task: task::Task) {
pub(super) fn dispatch(&mut self, task: task::Task, session: &mut Session, client: Client) {
match task {
Task::Sync(SyncTask { func }) => {
let notifier = self.client.notifier();
let responder = self.client.responder();
func(
self.session,
notifier,
&mut self.client.requester,
responder,
);
func(session, &client);
}
Task::Background(BackgroundTaskBuilder {
schedule,
builder: func,
}) => {
let static_func = func(self.session);
let notifier = self.client.notifier();
let responder = self.client.responder();
let task = move || static_func(notifier, responder);
let static_func = func(session);
let task = move || static_func(&client);
match schedule {
BackgroundSchedule::Worker => {
self.background_pool.spawn(ThreadPriority::Worker, task);

View file

@ -1,14 +1,12 @@
use lsp_server::RequestId;
use serde::Serialize;
use crate::{
server::client::{Notifier, Requester, Responder},
session::Session,
};
use crate::session::Session;
use crate::session::client::Client;
type LocalFn = Box<dyn FnOnce(&mut Session, Notifier, &mut Requester, Responder)>;
type LocalFn = Box<dyn FnOnce(&mut Session, &Client)>;
type BackgroundFn = Box<dyn FnOnce(Notifier, Responder) + Send + 'static>;
type BackgroundFn = Box<dyn FnOnce(&Client) + Send + 'static>;
type BackgroundFnBuilder = Box<dyn FnOnce(&Session) -> BackgroundFn>;
@ -62,7 +60,7 @@ impl Task {
/// Creates a new background task.
pub(crate) fn background<F>(schedule: BackgroundSchedule, func: F) -> Self
where
F: FnOnce(&Session) -> Box<dyn FnOnce(Notifier, Responder) + Send + 'static> + 'static,
F: FnOnce(&Session) -> Box<dyn FnOnce(&Client) + Send + 'static> + 'static,
{
Self::Background(BackgroundTaskBuilder {
schedule,
@ -72,7 +70,7 @@ impl Task {
/// Creates a new local task.
pub(crate) fn local<F>(func: F) -> Self
where
F: FnOnce(&mut Session, Notifier, &mut Requester, Responder) + 'static,
F: FnOnce(&mut Session, &Client) + 'static,
{
Self::Sync(SyncTask {
func: Box::new(func),
@ -84,8 +82,8 @@ impl Task {
where
R: Serialize + Send + 'static,
{
Self::local(move |_, _, _, responder| {
if let Err(err) = responder.respond(id, result) {
Self::local(move |_, client| {
if let Err(err) = client.respond(&id, result) {
tracing::error!("Unable to send immediate response: {err}");
}
})
@ -93,6 +91,6 @@ impl Task {
/// Creates a local task that does nothing.
pub(crate) fn nothing() -> Self {
Self::local(move |_, _, _, _| {})
Self::local(move |_, _| {})
}
}

View file

@ -7,29 +7,27 @@ use std::sync::Arc;
use anyhow::anyhow;
use lsp_types::{ClientCapabilities, TextDocumentContentChangeEvent, Url};
use ruff_db::Db;
use ruff_db::files::{File, system_path_to_file};
use ruff_db::system::SystemPath;
use ty_project::{ProjectDatabase, ProjectMetadata};
use crate::document::{DocumentKey, DocumentVersion, NotebookDocument};
use crate::system::{AnySystemPath, LSPSystem, url_to_any_system_path};
use crate::{PositionEncoding, TextDocument};
pub(crate) use self::capabilities::ResolvedClientCapabilities;
pub use self::index::DocumentQuery;
pub(crate) use self::settings::AllSettings;
pub use self::settings::ClientSettings;
pub(crate) use self::settings::Experimental;
use crate::document::{DocumentKey, DocumentVersion, NotebookDocument};
use crate::session::request_queue::RequestQueue;
use crate::system::{AnySystemPath, LSPSystem, url_to_any_system_path};
use crate::{PositionEncoding, TextDocument};
mod capabilities;
pub(crate) mod client;
pub(crate) mod index;
mod request_queue;
mod settings;
// TODO(dhruvmanila): In general, the server shouldn't use any salsa queries directly and instead
// should use methods on `ProjectDatabase`.
/// The global state for the LSP
pub struct Session {
/// Used to retrieve information about open documents and settings.
@ -49,10 +47,13 @@ pub struct Session {
/// Tracks what LSP features the client supports and doesn't support.
resolved_client_capabilities: Arc<ResolvedClientCapabilities>,
/// Tracks the pending requests between client and server.
request_queue: RequestQueue,
}
impl Session {
pub fn new(
pub(crate) fn new(
client_capabilities: &ClientCapabilities,
position_encoding: PositionEncoding,
global_settings: ClientSettings,
@ -84,9 +85,18 @@ impl Session {
resolved_client_capabilities: Arc::new(ResolvedClientCapabilities::new(
client_capabilities,
)),
request_queue: RequestQueue::new(),
})
}
pub(crate) fn request_queue(&self) -> &RequestQueue {
&self.request_queue
}
pub(crate) fn request_queue_mut(&mut self) -> &mut RequestQueue {
&mut self.request_queue
}
// TODO(dhruvmanila): Ideally, we should have a single method for `workspace_db_for_path_mut`
// and `default_workspace_db_mut` but the borrow checker doesn't allow that.
// https://github.com/astral-sh/ruff/pull/13041#discussion_r1726725437

View file

@ -0,0 +1,260 @@
use crate::Session;
use crate::server::{Action, ConnectionSender};
use crate::server::{Event, MainLoopSender};
use anyhow::{Context, anyhow};
use lsp_server::{ErrorCode, Message, Notification, RequestId, ResponseError};
use serde_json::Value;
use std::any::TypeId;
use std::fmt::Display;
pub(crate) type ClientResponseHandler = Box<dyn FnOnce(&Session, lsp_server::Response) + Send>;
#[derive(Debug)]
pub(crate) struct Client {
/// Channel to send messages back to the main loop.
main_loop_sender: MainLoopSender,
/// Channel to send messages directly to the LSP client without going through the main loop.
///
/// This is generally preferred because it reduces pressure on the main loop but it may not always be
/// possible if access to data on [`Session`] is required, which background tasks don't have.
client_sender: ConnectionSender,
}
impl Client {
pub(crate) fn new(main_loop_sender: MainLoopSender, client_sender: ConnectionSender) -> Self {
Self {
main_loop_sender,
client_sender,
}
}
/// Sends a request of kind `R` to the client, with associated parameters.
///
/// The request is sent immediately.
/// The `response_handler` will be dispatched as soon as the client response
/// is processed on the main-loop. The handler always runs on the main-loop thread.
///
/// # Note
/// This method takes a `session` so that we can register the pending-request
/// and send the response directly to the client. If this ever becomes too limiting (because we
/// need to send a request from somewhere where we don't have access to session), consider introducing
/// a new `send_deferred_request` method that doesn't take a session and instead sends
/// an `Action` to the main loop to send the request (the main loop has always access to session).
pub(crate) fn send_request<R>(
&self,
session: &Session,
params: R::Params,
response_handler: impl FnOnce(&Session, R::Result) + Send + 'static,
) -> crate::Result<()>
where
R: lsp_types::request::Request,
{
let response_handler = Box::new(
move |session: &Session, response: lsp_server::Response| {
let _span =
tracing::debug_span!("client_response", id=%response.id, method = R::METHOD)
.entered();
match (response.error, response.result) {
(Some(err), _) => {
tracing::error!(
"Got an error from the client (code {code}, method {method}): {message}",
code = err.code,
message = err.message,
method = R::METHOD
);
}
(None, Some(response)) => match serde_json::from_value(response) {
Ok(response) => response_handler(session, response),
Err(error) => {
tracing::error!(
"Failed to deserialize client response (method={method}): {error}",
method = R::METHOD
);
}
},
(None, None) => {
if TypeId::of::<R::Result>() == TypeId::of::<()>() {
// We can't call `response_handler(())` directly here, but
// since we _know_ the type expected is `()`, we can use
// `from_value(Value::Null)`. `R::Result` implements `DeserializeOwned`,
// so this branch works in the general case but we'll only
// hit it if the concrete type is `()`, so the `unwrap()` is safe here.
response_handler(session, serde_json::from_value(Value::Null).unwrap());
} else {
tracing::error!(
"Invalid client response: did not contain a result or error (method={method})",
method = R::METHOD
);
}
}
}
},
);
let id = session
.request_queue()
.outgoing()
.register(response_handler);
self.client_sender
.send(Message::Request(lsp_server::Request {
id,
method: R::METHOD.to_string(),
params: serde_json::to_value(params).context("Failed to serialize params")?,
}))
.with_context(|| {
format!("Failed to send request method={method}", method = R::METHOD)
})?;
Ok(())
}
/// Sends a notification to the client.
pub(crate) fn send_notification<N>(&self, params: N::Params) -> crate::Result<()>
where
N: lsp_types::notification::Notification,
{
let method = N::METHOD.to_string();
self.client_sender
.send(lsp_server::Message::Notification(Notification::new(
method, params,
)))
.map_err(|error| {
anyhow!(
"Failed to send notification (method={method}): {error}",
method = N::METHOD
)
})
}
/// Sends a notification without any parameters to the client.
///
/// This is useful for notifications that don't require any data.
#[expect(dead_code)]
pub(crate) fn send_notification_no_params(&self, method: &str) -> crate::Result<()> {
self.client_sender
.send(lsp_server::Message::Notification(Notification::new(
method.to_string(),
Value::Null,
)))
.map_err(|error| anyhow!("Failed to send notification (method={method}): {error}",))
}
/// Sends a response to the client for a given request ID.
///
/// The response isn't sent immediately. Instead, it's queued up in the main loop
/// and checked for cancellation (each request must have exactly one response).
pub(crate) fn respond<R>(
&self,
id: &RequestId,
result: crate::server::Result<R>,
) -> crate::Result<()>
where
R: serde::Serialize,
{
let response = match result {
Ok(res) => lsp_server::Response::new_ok(id.clone(), res),
Err(crate::server::Error { code, error }) => {
lsp_server::Response::new_err(id.clone(), code as i32, error.to_string())
}
};
self.main_loop_sender
.send(Event::Action(Action::SendResponse(response)))
.map_err(|error| anyhow!("Failed to send response for request {id}: {error}"))
}
/// Sends an error response to the client for a given request ID.
///
/// The response isn't sent immediately. Instead, it's queued up in the main loop.
pub(crate) fn respond_err(
&self,
id: RequestId,
error: lsp_server::ResponseError,
) -> crate::Result<()> {
let response = lsp_server::Response {
id,
result: None,
error: Some(error),
};
self.main_loop_sender
.send(Event::Action(Action::SendResponse(response)))
.map_err(|error| anyhow!("Failed to send response: {error}"))
}
/// Shows a message to the user.
///
/// This opens a pop up in VS Code showing `message`.
pub(crate) fn show_message(
&self,
message: impl Display,
message_type: lsp_types::MessageType,
) -> crate::Result<()> {
self.send_notification::<lsp_types::notification::ShowMessage>(
lsp_types::ShowMessageParams {
typ: message_type,
message: message.to_string(),
},
)
}
/// Sends a request to display a warning to the client with a formatted message. The warning is
/// sent in a `window/showMessage` notification.
///
/// Logs an error if the message could not be sent.
pub(crate) fn show_warning_message(&self, message: impl Display) {
let result = self.show_message(message, lsp_types::MessageType::WARNING);
if let Err(err) = result {
tracing::error!("Failed to send warning message to the client: {err}");
}
}
/// Sends a request to display an error to the client with a formatted message. The error is
/// sent in a `window/showMessage` notification.
///
/// Logs an error if the message could not be sent.
pub(crate) fn show_error_message(&self, message: impl Display) {
let result = self.show_message(message, lsp_types::MessageType::ERROR);
if let Err(err) = result {
tracing::error!("Failed to send error message to the client: {err}");
}
}
/// Re-queues this request after a salsa cancellation for a retry.
///
/// The main loop will skip the retry if the client cancelled the request in the meantime.
pub(crate) fn retry(&self, request: lsp_server::Request) -> crate::Result<()> {
self.main_loop_sender
.send(Event::Action(Action::RetryRequest(request)))
.map_err(|error| anyhow!("Failed to send retry request: {error}"))
}
pub(crate) fn cancel(&self, session: &mut Session, id: RequestId) -> crate::Result<()> {
let method_name = session.request_queue_mut().incoming_mut().cancel(&id);
if let Some(method_name) = method_name {
tracing::debug!("Cancelled request id={id} method={method_name}");
let error = ResponseError {
code: ErrorCode::RequestCanceled as i32,
message: "request was cancelled by client".to_owned(),
data: None,
};
// Use `client_sender` here instead of `respond_err` because
// `respond_err` filters out responses for canceled requests (which we just did!).
self.client_sender
.send(Message::Response(lsp_server::Response {
id,
result: None,
error: Some(error),
}))?;
}
Ok(())
}
}

View file

@ -0,0 +1,197 @@
use crate::session::client::ClientResponseHandler;
use lsp_server::RequestId;
use rustc_hash::FxHashMap;
use std::cell::{Cell, OnceCell, RefCell};
use std::fmt::Formatter;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Instant;
/// Tracks the pending requests between client and server.
pub(crate) struct RequestQueue {
incoming: Incoming,
outgoing: Outgoing,
}
impl RequestQueue {
pub(super) fn new() -> Self {
Self {
incoming: Incoming::default(),
outgoing: Outgoing::default(),
}
}
pub(crate) fn outgoing_mut(&mut self) -> &mut Outgoing {
&mut self.outgoing
}
/// Returns the server to client request queue.
pub(crate) fn outgoing(&self) -> &Outgoing {
&self.outgoing
}
/// Returns the client to server request queue.
pub(crate) fn incoming(&self) -> &Incoming {
&self.incoming
}
pub(crate) fn incoming_mut(&mut self) -> &mut Incoming {
&mut self.incoming
}
}
/// Requests from client -> server.
///
/// Tracks which requests are pending. Requests that aren't registered are considered completed.
///
/// A request is pending if:
///
/// * it has been registered
/// * it hasn't been cancelled
/// * it hasn't been completed
///
/// Tracking whether a request is pending is required to ensure that the server sends exactly
/// one response for every request as required by the LSP specification.
#[derive(Default, Debug)]
pub(crate) struct Incoming {
pending: FxHashMap<RequestId, PendingRequest>,
}
impl Incoming {
/// Registers a new pending request.
pub(crate) fn register(&mut self, request_id: RequestId, method: String) {
self.pending.insert(request_id, PendingRequest::new(method));
}
/// Cancels the pending request with the given id.
///
/// Returns the method name if the request was still pending, `None` if it was already completed.
pub(super) fn cancel(&mut self, request_id: &RequestId) -> Option<String> {
self.pending.remove(request_id).map(|mut pending| {
if let Some(cancellation_token) = pending.cancellation_token.take() {
cancellation_token.cancel();
}
pending.method
})
}
/// Returns `true` if the request with the given id is still pending.
pub(crate) fn is_pending(&self, request_id: &RequestId) -> bool {
self.pending.contains_key(request_id)
}
/// Returns the cancellation token for the given request id if the request is still pending.
pub(crate) fn cancellation_token(
&self,
request_id: &RequestId,
) -> Option<RequestCancellationToken> {
let pending = self.pending.get(request_id)?;
Some(RequestCancellationToken::clone(
pending
.cancellation_token
.get_or_init(RequestCancellationToken::default),
))
}
/// Marks the request as completed.
///
/// Returns the time when the request was registered and the request method name, or `None` if the request was not pending.
pub(crate) fn complete(&mut self, request_id: &RequestId) -> Option<(Instant, String)> {
self.pending
.remove(request_id)
.map(|pending| (pending.start_time, pending.method))
}
}
/// A request from the client to the server that hasn't been responded yet.
#[derive(Debug)]
struct PendingRequest {
/// The time when the request was registered.
///
/// This does not include the time the request was queued in the main loop before it was registered.
start_time: Instant,
/// The method name of the request.
method: String,
/// A cancellation token to cancel this request.
///
/// This is only initialized for background requests. Local tasks don't support cancellation (unless retried)
/// as they're processed immediately after receiving the request; Making it impossible for a
/// cancellation message to be processed before the task is completed.
cancellation_token: OnceCell<RequestCancellationToken>,
}
impl PendingRequest {
fn new(method: String) -> Self {
Self {
start_time: Instant::now(),
method,
cancellation_token: OnceCell::new(),
}
}
}
/// Token to cancel a specific request.
///
/// Can be shared between threads to check for cancellation *after* a request has been scheduled.
#[derive(Debug, Default)]
pub(crate) struct RequestCancellationToken(Arc<AtomicBool>);
impl RequestCancellationToken {
/// Returns true if the request was cancelled.
pub(crate) fn is_cancelled(&self) -> bool {
self.0.load(std::sync::atomic::Ordering::Relaxed)
}
/// Signals that the request should not be processed because it was cancelled.
fn cancel(&self) {
self.0.store(true, std::sync::atomic::Ordering::Relaxed);
}
fn clone(this: &Self) -> Self {
RequestCancellationToken(this.0.clone())
}
}
/// Requests from server -> client.
#[derive(Default)]
pub(crate) struct Outgoing {
/// The id of the next request sent from the server to the client.
next_request_id: Cell<i32>,
/// A map of request ids to the handlers that process the client-response.
response_handlers: RefCell<FxHashMap<RequestId, ClientResponseHandler>>,
}
impl Outgoing {
/// Registers a handler, returns the id for the request.
#[must_use]
pub(crate) fn register(&self, handler: ClientResponseHandler) -> RequestId {
let id = self.next_request_id.get();
self.next_request_id.set(id + 1);
self.response_handlers
.borrow_mut()
.insert(id.into(), handler);
id.into()
}
/// Marks the request with the given id as complete and returns the handler to process the response.
///
/// Returns `None` if the request was not found.
#[must_use]
pub(crate) fn complete(&mut self, request_id: &RequestId) -> Option<ClientResponseHandler> {
self.response_handlers.get_mut().remove(request_id)
}
}
impl std::fmt::Debug for Outgoing {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Outgoing")
.field("next_request_id", &self.next_request_id)
.field("response_handlers", &"<response handlers>")
.finish()
}
}

View file

@ -99,7 +99,6 @@ impl AllSettings {
serde_json::from_value(options)
.map_err(|err| {
tracing::error!("Failed to deserialize initialization options: {err}. Falling back to default client settings...");
show_err_msg!("ty received invalid client settings - falling back to default client settings.");
})
.unwrap_or_default(),
)