diff --git a/crates/ruff_server/src/server.rs b/crates/ruff_server/src/server.rs index d5282560b0..5bce4bf794 100644 --- a/crates/ruff_server/src/server.rs +++ b/crates/ruff_server/src/server.rs @@ -1,9 +1,6 @@ //! Scheduling, I/O, and API endpoints. use std::num::NonZeroUsize; -use std::sync::atomic::AtomicI32; -use std::sync::atomic::Ordering; -use std::time::Duration; use lsp::Connection; use lsp_server as lsp; @@ -22,6 +19,8 @@ use types::WorkDoneProgressOptions; use types::WorkspaceFoldersServerCapabilities; use self::schedule::event_loop_thread; +use self::schedule::Scheduler; +use self::schedule::Task; use crate::session::Session; use crate::PositionEncoding; @@ -33,10 +32,10 @@ pub(crate) type Result = std::result::Result; pub struct Server { conn: lsp::Connection, + client_capabilities: ClientCapabilities, threads: lsp::IoThreads, worker_threads: NonZeroUsize, session: Session, - next_request_id: AtomicI32, } impl Server { @@ -50,12 +49,6 @@ impl Server { let client_capabilities = init_params.capabilities; let server_capabilities = Self::server_capabilities(&client_capabilities); - let dynamic_registration = client_capabilities - .workspace - .and_then(|workspace| workspace.did_change_watched_files) - .and_then(|watched_files| watched_files.dynamic_registration) - .unwrap_or_default(); - let workspaces = init_params .workspace_folders .map(|folders| folders.into_iter().map(|folder| folder.uri).collect()) @@ -76,56 +69,14 @@ impl Server { } }); - let next_request_id = AtomicI32::from(1); - conn.initialize_finish(id, initialize_data)?; - if dynamic_registration { - // Register capabilities - conn.sender - .send(lsp_server::Message::Request(lsp_server::Request { - id: next_request_id.fetch_add(1, Ordering::Relaxed).into(), - method: "client/registerCapability".into(), - params: serde_json::to_value(lsp_types::RegistrationParams { - registrations: vec![lsp_types::Registration { - id: "ruff-server-watch".into(), - method: "workspace/didChangeWatchedFiles".into(), - register_options: Some(serde_json::to_value( - DidChangeWatchedFilesRegistrationOptions { - watchers: vec![ - FileSystemWatcher { - glob_pattern: types::GlobPattern::String( - "**/.?ruff.toml".into(), - ), - kind: None, - }, - FileSystemWatcher { - glob_pattern: types::GlobPattern::String( - "**/pyproject.toml".into(), - ), - kind: None, - }, - ], - }, - )?), - }], - })?, - }))?; - - // Flush response from the client (to avoid an unexpected response appearing in the event loop) - let _ = conn.receiver.recv_timeout(Duration::from_secs(5)).map_err(|_| { - tracing::error!("Timed out while waiting for client to acknowledge registration of dynamic capabilities"); - }); - } else { - tracing::warn!("LSP client does not support dynamic file watcher registration - automatic configuration reloading will not be available."); - } - Ok(Self { conn, + client_capabilities, threads, worker_threads, session: Session::new(&server_capabilities, &workspaces)?, - next_request_id, }) } @@ -133,9 +84,9 @@ impl Server { let result = event_loop_thread(move || { Self::event_loop( &self.conn, + &self.client_capabilities, self.session, self.worker_threads, - self.next_request_id, ) })? .join(); @@ -146,11 +97,14 @@ impl Server { #[allow(clippy::needless_pass_by_value)] // this is because we aren't using `next_request_id` yet. fn event_loop( connection: &Connection, - session: Session, + client_capabilities: &ClientCapabilities, + mut session: Session, worker_threads: NonZeroUsize, - _next_request_id: AtomicI32, ) -> crate::Result<()> { - let mut scheduler = schedule::Scheduler::new(session, worker_threads, &connection.sender); + let mut scheduler = + schedule::Scheduler::new(&mut session, worker_threads, &connection.sender); + + Self::try_register_capabilities(client_capabilities, &mut scheduler); for msg in &connection.receiver { let task = match msg { lsp::Message::Request(req) => { @@ -160,18 +114,69 @@ impl Server { api::request(req) } lsp::Message::Notification(notification) => api::notification(notification), - lsp::Message::Response(response) => { - tracing::error!( - "Expected request or notification, got response instead: {response:?}" - ); - continue; - } + lsp::Message::Response(response) => scheduler.response(response), }; scheduler.dispatch(task); } Ok(()) } + fn try_register_capabilities( + client_capabilities: &ClientCapabilities, + scheduler: &mut Scheduler, + ) { + let dynamic_registration = client_capabilities + .workspace + .as_ref() + .and_then(|workspace| workspace.did_change_watched_files) + .and_then(|watched_files| watched_files.dynamic_registration) + .unwrap_or_default(); + if dynamic_registration { + // Register all dynamic capabilities here + + // `workspace/didChangeWatchedFiles` + // (this registers the configuration file watcher) + let params = lsp_types::RegistrationParams { + registrations: vec![lsp_types::Registration { + id: "ruff-server-watch".into(), + method: "workspace/didChangeWatchedFiles".into(), + register_options: Some( + serde_json::to_value(DidChangeWatchedFilesRegistrationOptions { + watchers: vec![ + FileSystemWatcher { + glob_pattern: types::GlobPattern::String( + "**/.?ruff.toml".into(), + ), + kind: None, + }, + FileSystemWatcher { + glob_pattern: types::GlobPattern::String( + "**/pyproject.toml".into(), + ), + kind: None, + }, + ], + }) + .unwrap(), + ), + }], + }; + + let response_handler = |()| { + tracing::info!("Configuration file watcher successfully registered"); + Task::nothing() + }; + + if let Err(err) = scheduler + .request::(params, response_handler) + { + tracing::error!("An error occurred when trying to register the configuration file watcher: {err}"); + } + } else { + tracing::warn!("LSP client does not support dynamic capability registration - automatic configuration reloading will not be available."); + } + } + fn server_capabilities(client_capabilities: &ClientCapabilities) -> types::ServerCapabilities { let position_encoding = client_capabilities .general diff --git a/crates/ruff_server/src/server/client.rs b/crates/ruff_server/src/server/client.rs index 5eafdf9b82..d36c50ef66 100644 --- a/crates/ruff_server/src/server/client.rs +++ b/crates/ruff_server/src/server/client.rs @@ -1,11 +1,19 @@ +use std::any::TypeId; + use lsp_server::{Notification, RequestId}; +use rustc_hash::FxHashMap; use serde_json::Value; +use super::schedule::Task; + pub(crate) type ClientSender = crossbeam::channel::Sender; -pub(crate) struct Client { +type ResponseBuilder<'s> = Box Task<'s>>; + +pub(crate) struct Client<'s> { notifier: Notifier, responder: Responder, + pub(super) requester: Requester<'s>, } #[derive(Clone)] @@ -14,11 +22,22 @@ pub(crate) struct Notifier(ClientSender); #[derive(Clone)] pub(crate) struct Responder(ClientSender); -impl Client { +pub(crate) struct Requester<'s> { + sender: ClientSender, + next_request_id: i32, + response_handlers: FxHashMap>, +} + +impl<'s> Client<'s> { pub(super) fn new(sender: &ClientSender) -> Self { Self { notifier: Notifier(sender.clone()), responder: Responder(sender.clone()), + requester: Requester { + sender: sender.clone(), + next_request_id: 1, + response_handlers: FxHashMap::default(), + }, } } @@ -74,3 +93,80 @@ impl Responder { )?) } } + +impl<'s> Requester<'s> { + /// Sends a request of kind `R` to the client, with associated parameters. + /// The task provided by `response_handler` will be dispatched as soon as the response + /// comes back from the client. + pub(crate) fn request( + &mut self, + params: R::Params, + response_handler: impl Fn(R::Result) -> Task<'s> + 'static, + ) -> crate::Result<()> + where + R: lsp_types::request::Request, + { + let serialized_params = serde_json::to_value(params)?; + + self.response_handlers.insert( + self.next_request_id.into(), + Box::new(move |response: lsp_server::Response| { + match (response.error, response.result) { + (Some(err), _) => { + tracing::error!( + "Got an error from the client (code {}): {}", + err.code, + err.message + ); + Task::nothing() + } + (None, Some(response)) => match serde_json::from_value(response) { + Ok(response) => response_handler(response), + Err(error) => { + tracing::error!("Failed to deserialize response from server: {error}"); + Task::nothing() + } + }, + (None, None) => { + if TypeId::of::() == TypeId::of::<()>() { + // We can't call `response_handler(())` directly here, but + // since we _know_ the type expected is `()`, we can use + // `from_value(Value::Null)`. `R::Result` implements `DeserializeOwned`, + // so this branch works in the general case but we'll only + // hit it if the concrete type is `()`, so the `unwrap()` is safe here. + response_handler(serde_json::from_value(Value::Null).unwrap()); + } else { + tracing::error!( + "Server response was invalid: did not contain a result or error" + ); + } + Task::nothing() + } + } + }), + ); + + self.sender + .send(lsp_server::Message::Request(lsp_server::Request { + id: self.next_request_id.into(), + method: R::METHOD.into(), + params: serialized_params, + }))?; + + self.next_request_id += 1; + + Ok(()) + } + + pub(crate) fn pop_response_task(&mut self, response: lsp_server::Response) -> Task<'s> { + if let Some(handler) = self.response_handlers.remove(&response.id) { + handler(response) + } else { + tracing::error!( + "Received a response with ID {}, which was not expected", + response.id + ); + Task::nothing() + } + } +} diff --git a/crates/ruff_server/src/server/schedule.rs b/crates/ruff_server/src/server/schedule.rs index 00368a411f..3e5ecbd35a 100644 --- a/crates/ruff_server/src/server/schedule.rs +++ b/crates/ruff_server/src/server/schedule.rs @@ -34,16 +34,16 @@ pub(crate) fn event_loop_thread( ) } -pub(crate) struct Scheduler { - session: Session, - client: Client, +pub(crate) struct Scheduler<'s> { + session: &'s mut Session, + client: Client<'s>, fmt_pool: thread::Pool, background_pool: thread::Pool, } -impl Scheduler { +impl<'s> Scheduler<'s> { pub(super) fn new( - session: Session, + session: &'s mut Session, worker_threads: NonZeroUsize, sender: &Sender, ) -> Self { @@ -56,13 +56,32 @@ impl Scheduler { } } + /// Immediately sends a request of kind `R` to the client, with associated parameters. + /// The task provided by `response_handler` will be dispatched as soon as the response + /// comes back from the client. + pub(super) fn request( + &mut self, + params: R::Params, + response_handler: impl Fn(R::Result) -> Task<'s> + 'static, + ) -> crate::Result<()> + where + R: lsp_types::request::Request, + { + self.client.requester.request::(params, response_handler) + } + + /// Creates a task to handle a response from the client. + pub(super) fn response(&mut self, response: lsp_server::Response) -> Task<'s> { + self.client.requester.pop_response_task(response) + } + /// Dispatches a `task` by either running it as a blocking function or /// executing it on a background thread pool. - pub(super) fn dispatch<'s>(&'s mut self, task: task::Task<'s>) { + pub(super) fn dispatch(&mut self, task: task::Task<'s>) { match task { Task::Sync(SyncTask { func }) => { func( - &mut self.session, + self.session, self.client.notifier(), self.client.responder(), ); @@ -71,7 +90,7 @@ impl Scheduler { schedule, builder: func, }) => { - let static_func = func(&self.session); + let static_func = func(self.session); let notifier = self.client.notifier(); let responder = self.client.responder(); let task = move || static_func(notifier, responder);