Support cancellation requests (#18627)
Some checks are pending
CI / Determine changes (push) Waiting to run
CI / cargo fmt (push) Waiting to run
CI / cargo clippy (push) Blocked by required conditions
CI / cargo test (linux) (push) Blocked by required conditions
CI / cargo test (linux, release) (push) Blocked by required conditions
CI / cargo test (windows) (push) Blocked by required conditions
CI / cargo test (wasm) (push) Blocked by required conditions
CI / cargo build (release) (push) Waiting to run
CI / cargo build (msrv) (push) Blocked by required conditions
CI / cargo fuzz build (push) Blocked by required conditions
CI / fuzz parser (push) Blocked by required conditions
CI / test scripts (push) Blocked by required conditions
CI / ecosystem (push) Blocked by required conditions
CI / Fuzz for new ty panics (push) Blocked by required conditions
CI / cargo shear (push) Blocked by required conditions
CI / python package (push) Waiting to run
CI / pre-commit (push) Waiting to run
CI / mkdocs (push) Waiting to run
CI / formatter instabilities and black similarity (push) Blocked by required conditions
CI / test ruff-lsp (push) Blocked by required conditions
CI / check playground (push) Blocked by required conditions
CI / benchmarks (push) Blocked by required conditions
[ty Playground] Release / publish (push) Waiting to run

This commit is contained in:
Micha Reiser 2025-06-12 22:08:42 +02:00 committed by GitHub
parent 1f27d53fd5
commit 015222900f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
46 changed files with 1324 additions and 857 deletions

View file

@ -0,0 +1,248 @@
use crate::Session;
use crate::server::{ConnectionSender, Event, MainLoopSender};
use anyhow::{Context, anyhow};
use lsp_server::{ErrorCode, Message, Notification, RequestId, ResponseError};
use serde_json::Value;
use std::any::TypeId;
use std::fmt::Display;
pub(crate) type ClientResponseHandler = Box<dyn FnOnce(&Client, lsp_server::Response) + Send>;
#[derive(Clone, Debug)]
pub struct Client {
/// Channel to send messages back to the main loop.
main_loop_sender: MainLoopSender,
/// Channel to send messages directly to the LSP client without going through the main loop.
///
/// This is generally preferred because it reduces pressure on the main loop but it may not always be
/// possible if access to data on [`Session`] is required, which background tasks don't have.
client_sender: ConnectionSender,
}
impl Client {
pub fn new(main_loop_sender: MainLoopSender, client_sender: ConnectionSender) -> Self {
Self {
main_loop_sender,
client_sender,
}
}
/// Sends a request of kind `R` to the client, with associated parameters.
///
/// The request is sent immediately.
/// The `response_handler` will be dispatched as soon as the client response
/// is processed on the main-loop. The handler always runs on the main-loop thread.
///
/// # Note
/// This method takes a `session` so that we can register the pending-request
/// and send the response directly to the client. If this ever becomes too limiting (because we
/// need to send a request from somewhere where we don't have access to session), consider introducing
/// a new `send_deferred_request` method that doesn't take a session and instead sends
/// an `Action` to the main loop to send the request (the main loop has always access to session).
pub(crate) fn send_request<R>(
&self,
session: &Session,
params: R::Params,
response_handler: impl FnOnce(&Client, R::Result) + Send + 'static,
) -> crate::Result<()>
where
R: lsp_types::request::Request,
{
let response_handler = Box::new(move |client: &Client, response: lsp_server::Response| {
let _span =
tracing::debug_span!("client_response", id=%response.id, method = R::METHOD)
.entered();
match (response.error, response.result) {
(Some(err), _) => {
tracing::error!(
"Got an error from the client (code {code}, method {method}): {message}",
code = err.code,
message = err.message,
method = R::METHOD
);
}
(None, Some(response)) => match serde_json::from_value(response) {
Ok(response) => response_handler(client, response),
Err(error) => {
tracing::error!(
"Failed to deserialize client response (method={method}): {error}",
method = R::METHOD
);
}
},
(None, None) => {
if TypeId::of::<R::Result>() == TypeId::of::<()>() {
// We can't call `response_handler(())` directly here, but
// since we _know_ the type expected is `()`, we can use
// `from_value(Value::Null)`. `R::Result` implements `DeserializeOwned`,
// so this branch works in the general case but we'll only
// hit it if the concrete type is `()`, so the `unwrap()` is safe here.
response_handler(client, serde_json::from_value(Value::Null).unwrap());
} else {
tracing::error!(
"Invalid client response: did not contain a result or error (method={method})",
method = R::METHOD
);
}
}
}
});
let id = session
.request_queue()
.outgoing()
.register(response_handler);
self.client_sender
.send(Message::Request(lsp_server::Request {
id,
method: R::METHOD.to_string(),
params: serde_json::to_value(params).context("Failed to serialize params")?,
}))
.with_context(|| {
format!("Failed to send request method={method}", method = R::METHOD)
})?;
Ok(())
}
/// Sends a notification to the client.
pub(crate) fn send_notification<N>(&self, params: N::Params) -> crate::Result<()>
where
N: lsp_types::notification::Notification,
{
let method = N::METHOD.to_string();
self.client_sender
.send(lsp_server::Message::Notification(Notification::new(
method, params,
)))
.map_err(|error| {
anyhow!(
"Failed to send notification (method={method}): {error}",
method = N::METHOD
)
})
}
/// Sends a notification without any parameters to the client.
///
/// This is useful for notifications that don't require any data.
#[expect(dead_code)]
pub(crate) fn send_notification_no_params(&self, method: &str) -> crate::Result<()> {
self.client_sender
.send(lsp_server::Message::Notification(Notification::new(
method.to_string(),
Value::Null,
)))
.map_err(|error| anyhow!("Failed to send notification (method={method}): {error}",))
}
/// Sends a response to the client for a given request ID.
///
/// The response isn't sent immediately. Instead, it's queued up in the main loop
/// and checked for cancellation (each request must have exactly one response).
pub(crate) fn respond<R>(
&self,
id: &RequestId,
result: crate::server::Result<R>,
) -> crate::Result<()>
where
R: serde::Serialize,
{
let response = match result {
Ok(res) => lsp_server::Response::new_ok(id.clone(), res),
Err(crate::server::Error { code, error }) => {
lsp_server::Response::new_err(id.clone(), code as i32, error.to_string())
}
};
self.main_loop_sender
.send(Event::SendResponse(response))
.map_err(|error| anyhow!("Failed to send response for request {id}: {error}"))
}
/// Sends an error response to the client for a given request ID.
///
/// The response isn't sent immediately. Instead, it's queued up in the main loop.
pub(crate) fn respond_err(
&self,
id: RequestId,
error: lsp_server::ResponseError,
) -> crate::Result<()> {
let response = lsp_server::Response {
id,
result: None,
error: Some(error),
};
self.main_loop_sender
.send(Event::SendResponse(response))
.map_err(|error| anyhow!("Failed to send response: {error}"))
}
/// Shows a message to the user.
///
/// This opens a pop up in VS Code showing `message`.
pub(crate) fn show_message(
&self,
message: impl Display,
message_type: lsp_types::MessageType,
) -> crate::Result<()> {
self.send_notification::<lsp_types::notification::ShowMessage>(
lsp_types::ShowMessageParams {
typ: message_type,
message: message.to_string(),
},
)
}
/// Sends a request to display a warning to the client with a formatted message. The warning is
/// sent in a `window/showMessage` notification.
///
/// Logs an error if the message could not be sent.
pub(crate) fn show_warning_message(&self, message: impl Display) {
let result = self.show_message(message, lsp_types::MessageType::WARNING);
if let Err(err) = result {
tracing::error!("Failed to send warning message to the client: {err}");
}
}
/// Sends a request to display an error to the client with a formatted message. The error is
/// sent in a `window/showMessage` notification.
///
/// Logs an error if the message could not be sent.
pub(crate) fn show_error_message(&self, message: impl Display) {
let result = self.show_message(message, lsp_types::MessageType::ERROR);
if let Err(err) = result {
tracing::error!("Failed to send error message to the client: {err}");
}
}
pub(crate) fn cancel(&self, session: &mut Session, id: RequestId) -> crate::Result<()> {
let method_name = session.request_queue_mut().incoming_mut().cancel(&id);
if let Some(method_name) = method_name {
tracing::debug!("Cancelled request id={id} method={method_name}");
let error = ResponseError {
code: ErrorCode::RequestCanceled as i32,
message: "request was cancelled by client".to_owned(),
data: None,
};
// Use `client_sender` here instead of `respond_err` because
// `respond_err` filters out responses for canceled requests (which we just did!).
self.client_sender
.send(Message::Response(lsp_server::Response {
id,
result: None,
error: Some(error),
}))?;
}
Ok(())
}
}

View file

@ -11,6 +11,7 @@ use thiserror::Error;
pub(crate) use ruff_settings::RuffSettings;
use crate::edit::LanguageId;
use crate::session::Client;
use crate::session::options::Combine;
use crate::session::settings::GlobalClientSettings;
use crate::workspace::{Workspace, Workspaces};
@ -73,10 +74,11 @@ impl Index {
pub(super) fn new(
workspaces: &Workspaces,
global: &GlobalClientSettings,
client: &Client,
) -> crate::Result<Self> {
let mut settings = WorkspaceSettingsIndex::default();
for workspace in &**workspaces {
settings.register_workspace(workspace, global)?;
settings.register_workspace(workspace, global, client)?;
}
Ok(Self {
@ -173,10 +175,11 @@ impl Index {
&mut self,
url: Url,
global: &GlobalClientSettings,
client: &Client,
) -> crate::Result<()> {
// TODO(jane): Find a way for workspace client settings to be added or changed dynamically.
self.settings
.register_workspace(&Workspace::new(url), global)
.register_workspace(&Workspace::new(url), global, client)
}
pub(super) fn close_workspace_folder(&mut self, workspace_url: &Url) -> crate::Result<()> {
@ -259,7 +262,7 @@ impl Index {
/// registered in [`try_register_capabilities`] method.
///
/// [`try_register_capabilities`]: crate::server::Server::try_register_capabilities
pub(super) fn reload_settings(&mut self, changes: &[FileEvent]) {
pub(super) fn reload_settings(&mut self, changes: &[FileEvent], client: &Client) {
let mut indexed = FxHashSet::default();
for change in changes {
@ -287,6 +290,7 @@ impl Index {
indexed.insert(root.clone());
settings.ruff_settings = ruff_settings::RuffSettingsIndex::new(
client,
root,
settings.client_settings.editor_settings(),
false,
@ -415,11 +419,14 @@ impl WorkspaceSettingsIndex {
&mut self,
workspace: &Workspace,
global: &GlobalClientSettings,
client: &Client,
) -> crate::Result<()> {
let workspace_url = workspace.url();
if workspace_url.scheme() != "file" {
tracing::info!("Ignoring non-file workspace URL: {workspace_url}");
show_warn_msg!("Ruff does not support non-file workspaces; Ignoring {workspace_url}");
client.show_warning_message(format_args!(
"Ruff does not support non-file workspaces; Ignoring {workspace_url}"
));
return Ok(());
}
let workspace_path = workspace_url.to_file_path().map_err(|()| {
@ -431,10 +438,10 @@ impl WorkspaceSettingsIndex {
let settings = match options.into_settings() {
Ok(settings) => settings,
Err(settings) => {
show_err_msg!(
client.show_error_message(format_args!(
"The settings for the workspace {workspace_path} are invalid. Refer to the logs for more information.",
workspace_path = workspace_path.display()
);
));
settings
}
};
@ -444,6 +451,7 @@ impl WorkspaceSettingsIndex {
};
let workspace_settings_index = ruff_settings::RuffSettingsIndex::new(
client,
&workspace_path,
client_settings.editor_settings(),
workspace.is_default(),

View file

@ -18,6 +18,7 @@ use ruff_workspace::{
resolver::ConfigurationTransformer,
};
use crate::session::Client;
use crate::session::options::ConfigurationPreference;
use crate::session::settings::{EditorSettings, ResolvedConfiguration};
@ -155,6 +156,7 @@ impl RuffSettingsIndex {
/// server will be running in a single file mode, then only (1) and (2) will be resolved,
/// skipping (3).
pub(super) fn new(
client: &Client,
root: &Path,
editor_settings: &EditorSettings,
is_default_workspace: bool,
@ -242,10 +244,10 @@ impl RuffSettingsIndex {
// means for different editors.
if is_default_workspace {
if has_error {
show_err_msg!(
client.show_error_message(format!(
"Error while resolving settings from workspace {}. Please refer to the logs for more details.",
root.display()
);
));
}
return RuffSettingsIndex { index, fallback };
@ -358,10 +360,10 @@ impl RuffSettingsIndex {
});
if has_error.load(Ordering::Relaxed) {
show_err_msg!(
client.show_error_message(format!(
"Error while resolving settings from workspace {}. Please refer to the logs for more details.",
root.display()
);
));
}
RuffSettingsIndex {

View file

@ -7,8 +7,9 @@ use serde_json::{Map, Value};
use ruff_linter::{RuleSelector, line_width::LineLength, rule_selector::ParseError};
use crate::session::settings::{
ClientSettings, EditorSettings, GlobalClientSettings, ResolvedConfiguration,
use crate::session::{
Client,
settings::{ClientSettings, EditorSettings, GlobalClientSettings, ResolvedConfiguration},
};
pub(crate) type WorkspaceOptionsMap = FxHashMap<Url, ClientOptions>;
@ -62,10 +63,11 @@ impl GlobalOptions {
&self.client
}
pub fn into_settings(self) -> GlobalClientSettings {
pub fn into_settings(self, client: Client) -> GlobalClientSettings {
GlobalClientSettings {
options: self.client,
settings: std::cell::OnceCell::default(),
client,
}
}
}
@ -367,12 +369,12 @@ pub(crate) struct AllOptions {
impl AllOptions {
/// Initializes the controller from the serialized initialization options.
/// This fails if `options` are not valid initialization options.
pub(crate) fn from_value(options: serde_json::Value) -> Self {
pub(crate) fn from_value(options: serde_json::Value, client: &Client) -> Self {
Self::from_init_options(
serde_json::from_value(options)
.map_err(|err| {
tracing::error!("Failed to deserialize initialization options: {err}. Falling back to default client settings...");
show_err_msg!("Ruff received invalid client settings - falling back to default client settings.");
client.show_error_message("Ruff received invalid client settings - falling back to default client settings.");
})
.unwrap_or_default(),
)
@ -896,10 +898,14 @@ mod tests {
#[test]
fn test_global_only_resolves_correctly() {
let (main_loop_sender, main_loop_receiver) = crossbeam::channel::unbounded();
let (client_sender, client_receiver) = crossbeam::channel::unbounded();
let options = deserialize_fixture(GLOBAL_ONLY_INIT_OPTIONS_FIXTURE);
let AllOptions { global, .. } = AllOptions::from_init_options(options);
let global = global.into_settings();
let client = Client::new(main_loop_sender, client_sender);
let global = global.into_settings(client);
assert_eq!(
global.to_settings(),
&ClientSettings {
@ -922,6 +928,9 @@ mod tests {
},
}
);
assert!(main_loop_receiver.is_empty());
assert!(client_receiver.is_empty());
}
#[test]
@ -959,6 +968,10 @@ mod tests {
#[test]
fn inline_configuration() {
let (main_loop_sender, main_loop_receiver) = crossbeam::channel::unbounded();
let (client_sender, client_receiver) = crossbeam::channel::unbounded();
let client = Client::new(main_loop_sender, client_sender);
let options: InitializationOptions = deserialize_fixture(INLINE_CONFIGURATION_FIXTURE);
let AllOptions {
@ -969,7 +982,7 @@ mod tests {
panic!("Expected global settings only");
};
let global = global.into_settings();
let global = global.into_settings(client);
assert_eq!(
global.to_settings(),
@ -1001,5 +1014,8 @@ mod tests {
}
}
);
assert!(main_loop_receiver.is_empty());
assert!(client_receiver.is_empty());
}
}

View file

@ -0,0 +1,198 @@
use crate::session::client::ClientResponseHandler;
use lsp_server::RequestId;
use rustc_hash::FxHashMap;
use std::cell::{Cell, OnceCell, RefCell};
use std::fmt::Formatter;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Instant;
/// Tracks the pending requests between client and server.
pub(crate) struct RequestQueue {
incoming: Incoming,
outgoing: Outgoing,
}
impl RequestQueue {
pub(super) fn new() -> Self {
Self {
incoming: Incoming::default(),
outgoing: Outgoing::default(),
}
}
pub(crate) fn outgoing_mut(&mut self) -> &mut Outgoing {
&mut self.outgoing
}
/// Returns the server to client request queue.
pub(crate) fn outgoing(&self) -> &Outgoing {
&self.outgoing
}
/// Returns the client to server request queue.
pub(crate) fn incoming(&self) -> &Incoming {
&self.incoming
}
pub(crate) fn incoming_mut(&mut self) -> &mut Incoming {
&mut self.incoming
}
}
/// Requests from client -> server.
///
/// Tracks which requests are pending. Requests that aren't registered are considered completed.
///
/// A request is pending if:
///
/// * it has been registered
/// * it hasn't been cancelled
/// * it hasn't been completed
///
/// Tracking whether a request is pending is required to ensure that the server sends exactly
/// one response for every request as required by the LSP specification.
#[derive(Default, Debug)]
pub(crate) struct Incoming {
pending: FxHashMap<RequestId, PendingRequest>,
}
impl Incoming {
/// Registers a new pending request.
pub(crate) fn register(&mut self, request_id: RequestId, method: String) {
self.pending.insert(request_id, PendingRequest::new(method));
}
/// Cancels the pending request with the given id.
///
/// Returns the method name if the request was still pending, `None` if it was already completed.
pub(super) fn cancel(&mut self, request_id: &RequestId) -> Option<String> {
self.pending.remove(request_id).map(|mut pending| {
if let Some(cancellation_token) = pending.cancellation_token.take() {
cancellation_token.cancel();
}
pending.method
})
}
/// Returns `true` if the request with the given id is still pending.
#[expect(dead_code)]
pub(crate) fn is_pending(&self, request_id: &RequestId) -> bool {
self.pending.contains_key(request_id)
}
/// Returns the cancellation token for the given request id if the request is still pending.
pub(crate) fn cancellation_token(
&self,
request_id: &RequestId,
) -> Option<RequestCancellationToken> {
let pending = self.pending.get(request_id)?;
Some(RequestCancellationToken::clone(
pending
.cancellation_token
.get_or_init(RequestCancellationToken::default),
))
}
/// Marks the request as completed.
///
/// Returns the time when the request was registered and the request method name, or `None` if the request was not pending.
pub(crate) fn complete(&mut self, request_id: &RequestId) -> Option<(Instant, String)> {
self.pending
.remove(request_id)
.map(|pending| (pending.start_time, pending.method))
}
}
/// A request from the client to the server that hasn't been responded yet.
#[derive(Debug)]
struct PendingRequest {
/// The time when the request was registered.
///
/// This does not include the time the request was queued in the main loop before it was registered.
start_time: Instant,
/// The method name of the request.
method: String,
/// A cancellation token to cancel this request.
///
/// This is only initialized for background requests. Local tasks don't support cancellation (unless retried)
/// as they're processed immediately after receiving the request; Making it impossible for a
/// cancellation message to be processed before the task is completed.
cancellation_token: OnceCell<RequestCancellationToken>,
}
impl PendingRequest {
fn new(method: String) -> Self {
Self {
start_time: Instant::now(),
method,
cancellation_token: OnceCell::new(),
}
}
}
/// Token to cancel a specific request.
///
/// Can be shared between threads to check for cancellation *after* a request has been scheduled.
#[derive(Debug, Default)]
pub(crate) struct RequestCancellationToken(Arc<AtomicBool>);
impl RequestCancellationToken {
/// Returns true if the request was cancelled.
pub(crate) fn is_cancelled(&self) -> bool {
self.0.load(std::sync::atomic::Ordering::Relaxed)
}
/// Signals that the request should not be processed because it was cancelled.
fn cancel(&self) {
self.0.store(true, std::sync::atomic::Ordering::Relaxed);
}
fn clone(this: &Self) -> Self {
RequestCancellationToken(this.0.clone())
}
}
/// Requests from server -> client.
#[derive(Default)]
pub(crate) struct Outgoing {
/// The id of the next request sent from the server to the client.
next_request_id: Cell<i32>,
/// A map of request ids to the handlers that process the client-response.
response_handlers: RefCell<FxHashMap<RequestId, ClientResponseHandler>>,
}
impl Outgoing {
/// Registers a handler, returns the id for the request.
#[must_use]
pub(crate) fn register(&self, handler: ClientResponseHandler) -> RequestId {
let id = self.next_request_id.get();
self.next_request_id.set(id + 1);
self.response_handlers
.borrow_mut()
.insert(id.into(), handler);
id.into()
}
/// Marks the request with the given id as complete and returns the handler to process the response.
///
/// Returns `None` if the request was not found.
#[must_use]
pub(crate) fn complete(&mut self, request_id: &RequestId) -> Option<ClientResponseHandler> {
self.response_handlers.get_mut().remove(request_id)
}
}
impl std::fmt::Debug for Outgoing {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Outgoing")
.field("next_request_id", &self.next_request_id)
.field("response_handlers", &"<response handlers>")
.finish()
}
}

View file

@ -8,7 +8,10 @@ use ruff_workspace::options::Options;
use crate::{
ClientOptions,
session::options::{ClientConfiguration, ConfigurationPreference},
session::{
Client,
options::{ClientConfiguration, ConfigurationPreference},
},
};
pub struct GlobalClientSettings {
@ -20,6 +23,8 @@ pub struct GlobalClientSettings {
/// when the workspace settings e.g. select some rules that aren't available in a specific workspace
/// and said workspace overrides the selected rules.
pub(super) settings: std::cell::OnceCell<Arc<ClientSettings>>,
pub(super) client: Client,
}
impl GlobalClientSettings {
@ -33,7 +38,7 @@ impl GlobalClientSettings {
let settings = match settings {
Ok(settings) => settings,
Err(settings) => {
show_err_msg!(
self.client.show_error_message(
"Ruff received invalid settings from the editor. Refer to the logs for more information."
);
settings