Client request sender and inbound response handling for ruff server (#10620)

## Summary

Fixes #10618.

This PR introduces a proper API for sending requests to the client and
handling any response sent back. Dynamic capability registration now
uses this new API, fixing an issue where a much more simplistic response
handler silently flushes a code action request that needed a response.

## Test Plan

#10618 can no longer be reproduced. No errors about unhandled responses
should appear in the extension output, and you should see this new log
when the server starts:
```
<DATE> <TIME> [info] <DURATION> INFO ruff_server::server Configuration file watcher successfully registered
```
This commit is contained in:
Jane Lewis 2024-03-26 13:53:56 -07:00 committed by GitHub
parent 72aa1ce00f
commit 4d59142255
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 193 additions and 73 deletions

View file

@ -1,9 +1,6 @@
//! Scheduling, I/O, and API endpoints. //! Scheduling, I/O, and API endpoints.
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use std::time::Duration;
use lsp::Connection; use lsp::Connection;
use lsp_server as lsp; use lsp_server as lsp;
@ -22,6 +19,8 @@ use types::WorkDoneProgressOptions;
use types::WorkspaceFoldersServerCapabilities; use types::WorkspaceFoldersServerCapabilities;
use self::schedule::event_loop_thread; use self::schedule::event_loop_thread;
use self::schedule::Scheduler;
use self::schedule::Task;
use crate::session::Session; use crate::session::Session;
use crate::PositionEncoding; use crate::PositionEncoding;
@ -33,10 +32,10 @@ pub(crate) type Result<T> = std::result::Result<T, api::Error>;
pub struct Server { pub struct Server {
conn: lsp::Connection, conn: lsp::Connection,
client_capabilities: ClientCapabilities,
threads: lsp::IoThreads, threads: lsp::IoThreads,
worker_threads: NonZeroUsize, worker_threads: NonZeroUsize,
session: Session, session: Session,
next_request_id: AtomicI32,
} }
impl Server { impl Server {
@ -50,12 +49,6 @@ impl Server {
let client_capabilities = init_params.capabilities; let client_capabilities = init_params.capabilities;
let server_capabilities = Self::server_capabilities(&client_capabilities); let server_capabilities = Self::server_capabilities(&client_capabilities);
let dynamic_registration = client_capabilities
.workspace
.and_then(|workspace| workspace.did_change_watched_files)
.and_then(|watched_files| watched_files.dynamic_registration)
.unwrap_or_default();
let workspaces = init_params let workspaces = init_params
.workspace_folders .workspace_folders
.map(|folders| folders.into_iter().map(|folder| folder.uri).collect()) .map(|folders| folders.into_iter().map(|folder| folder.uri).collect())
@ -76,22 +69,79 @@ impl Server {
} }
}); });
let next_request_id = AtomicI32::from(1);
conn.initialize_finish(id, initialize_data)?; conn.initialize_finish(id, initialize_data)?;
Ok(Self {
conn,
client_capabilities,
threads,
worker_threads,
session: Session::new(&server_capabilities, &workspaces)?,
})
}
pub fn run(self) -> crate::Result<()> {
let result = event_loop_thread(move || {
Self::event_loop(
&self.conn,
&self.client_capabilities,
self.session,
self.worker_threads,
)
})?
.join();
self.threads.join()?;
result
}
#[allow(clippy::needless_pass_by_value)] // this is because we aren't using `next_request_id` yet.
fn event_loop(
connection: &Connection,
client_capabilities: &ClientCapabilities,
mut session: Session,
worker_threads: NonZeroUsize,
) -> crate::Result<()> {
let mut scheduler =
schedule::Scheduler::new(&mut session, worker_threads, &connection.sender);
Self::try_register_capabilities(client_capabilities, &mut scheduler);
for msg in &connection.receiver {
let task = match msg {
lsp::Message::Request(req) => {
if connection.handle_shutdown(&req)? {
return Ok(());
}
api::request(req)
}
lsp::Message::Notification(notification) => api::notification(notification),
lsp::Message::Response(response) => scheduler.response(response),
};
scheduler.dispatch(task);
}
Ok(())
}
fn try_register_capabilities(
client_capabilities: &ClientCapabilities,
scheduler: &mut Scheduler,
) {
let dynamic_registration = client_capabilities
.workspace
.as_ref()
.and_then(|workspace| workspace.did_change_watched_files)
.and_then(|watched_files| watched_files.dynamic_registration)
.unwrap_or_default();
if dynamic_registration { if dynamic_registration {
// Register capabilities // Register all dynamic capabilities here
conn.sender
.send(lsp_server::Message::Request(lsp_server::Request { // `workspace/didChangeWatchedFiles`
id: next_request_id.fetch_add(1, Ordering::Relaxed).into(), // (this registers the configuration file watcher)
method: "client/registerCapability".into(), let params = lsp_types::RegistrationParams {
params: serde_json::to_value(lsp_types::RegistrationParams {
registrations: vec![lsp_types::Registration { registrations: vec![lsp_types::Registration {
id: "ruff-server-watch".into(), id: "ruff-server-watch".into(),
method: "workspace/didChangeWatchedFiles".into(), method: "workspace/didChangeWatchedFiles".into(),
register_options: Some(serde_json::to_value( register_options: Some(
DidChangeWatchedFilesRegistrationOptions { serde_json::to_value(DidChangeWatchedFilesRegistrationOptions {
watchers: vec![ watchers: vec![
FileSystemWatcher { FileSystemWatcher {
glob_pattern: types::GlobPattern::String( glob_pattern: types::GlobPattern::String(
@ -106,70 +156,25 @@ impl Server {
kind: None, kind: None,
}, },
], ],
},
)?),
}],
})?,
}))?;
// Flush response from the client (to avoid an unexpected response appearing in the event loop)
let _ = conn.receiver.recv_timeout(Duration::from_secs(5)).map_err(|_| {
tracing::error!("Timed out while waiting for client to acknowledge registration of dynamic capabilities");
});
} else {
tracing::warn!("LSP client does not support dynamic file watcher registration - automatic configuration reloading will not be available.");
}
Ok(Self {
conn,
threads,
worker_threads,
session: Session::new(&server_capabilities, &workspaces)?,
next_request_id,
}) })
} .unwrap(),
),
pub fn run(self) -> crate::Result<()> { }],
let result = event_loop_thread(move || {
Self::event_loop(
&self.conn,
self.session,
self.worker_threads,
self.next_request_id,
)
})?
.join();
self.threads.join()?;
result
}
#[allow(clippy::needless_pass_by_value)] // this is because we aren't using `next_request_id` yet.
fn event_loop(
connection: &Connection,
session: Session,
worker_threads: NonZeroUsize,
_next_request_id: AtomicI32,
) -> crate::Result<()> {
let mut scheduler = schedule::Scheduler::new(session, worker_threads, &connection.sender);
for msg in &connection.receiver {
let task = match msg {
lsp::Message::Request(req) => {
if connection.handle_shutdown(&req)? {
return Ok(());
}
api::request(req)
}
lsp::Message::Notification(notification) => api::notification(notification),
lsp::Message::Response(response) => {
tracing::error!(
"Expected request or notification, got response instead: {response:?}"
);
continue;
}
}; };
scheduler.dispatch(task);
let response_handler = |()| {
tracing::info!("Configuration file watcher successfully registered");
Task::nothing()
};
if let Err(err) = scheduler
.request::<lsp_types::request::RegisterCapability>(params, response_handler)
{
tracing::error!("An error occurred when trying to register the configuration file watcher: {err}");
}
} else {
tracing::warn!("LSP client does not support dynamic capability registration - automatic configuration reloading will not be available.");
} }
Ok(())
} }
fn server_capabilities(client_capabilities: &ClientCapabilities) -> types::ServerCapabilities { fn server_capabilities(client_capabilities: &ClientCapabilities) -> types::ServerCapabilities {

View file

@ -1,11 +1,19 @@
use std::any::TypeId;
use lsp_server::{Notification, RequestId}; use lsp_server::{Notification, RequestId};
use rustc_hash::FxHashMap;
use serde_json::Value; use serde_json::Value;
use super::schedule::Task;
pub(crate) type ClientSender = crossbeam::channel::Sender<lsp_server::Message>; pub(crate) type ClientSender = crossbeam::channel::Sender<lsp_server::Message>;
pub(crate) struct Client { type ResponseBuilder<'s> = Box<dyn FnOnce(lsp_server::Response) -> Task<'s>>;
pub(crate) struct Client<'s> {
notifier: Notifier, notifier: Notifier,
responder: Responder, responder: Responder,
pub(super) requester: Requester<'s>,
} }
#[derive(Clone)] #[derive(Clone)]
@ -14,11 +22,22 @@ pub(crate) struct Notifier(ClientSender);
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct Responder(ClientSender); pub(crate) struct Responder(ClientSender);
impl Client { pub(crate) struct Requester<'s> {
sender: ClientSender,
next_request_id: i32,
response_handlers: FxHashMap<lsp_server::RequestId, ResponseBuilder<'s>>,
}
impl<'s> Client<'s> {
pub(super) fn new(sender: &ClientSender) -> Self { pub(super) fn new(sender: &ClientSender) -> Self {
Self { Self {
notifier: Notifier(sender.clone()), notifier: Notifier(sender.clone()),
responder: Responder(sender.clone()), responder: Responder(sender.clone()),
requester: Requester {
sender: sender.clone(),
next_request_id: 1,
response_handlers: FxHashMap::default(),
},
} }
} }
@ -74,3 +93,80 @@ impl Responder {
)?) )?)
} }
} }
impl<'s> Requester<'s> {
/// Sends a request of kind `R` to the client, with associated parameters.
/// The task provided by `response_handler` will be dispatched as soon as the response
/// comes back from the client.
pub(crate) fn request<R>(
&mut self,
params: R::Params,
response_handler: impl Fn(R::Result) -> Task<'s> + 'static,
) -> crate::Result<()>
where
R: lsp_types::request::Request,
{
let serialized_params = serde_json::to_value(params)?;
self.response_handlers.insert(
self.next_request_id.into(),
Box::new(move |response: lsp_server::Response| {
match (response.error, response.result) {
(Some(err), _) => {
tracing::error!(
"Got an error from the client (code {}): {}",
err.code,
err.message
);
Task::nothing()
}
(None, Some(response)) => match serde_json::from_value(response) {
Ok(response) => response_handler(response),
Err(error) => {
tracing::error!("Failed to deserialize response from server: {error}");
Task::nothing()
}
},
(None, None) => {
if TypeId::of::<R::Result>() == TypeId::of::<()>() {
// We can't call `response_handler(())` directly here, but
// since we _know_ the type expected is `()`, we can use
// `from_value(Value::Null)`. `R::Result` implements `DeserializeOwned`,
// so this branch works in the general case but we'll only
// hit it if the concrete type is `()`, so the `unwrap()` is safe here.
response_handler(serde_json::from_value(Value::Null).unwrap());
} else {
tracing::error!(
"Server response was invalid: did not contain a result or error"
);
}
Task::nothing()
}
}
}),
);
self.sender
.send(lsp_server::Message::Request(lsp_server::Request {
id: self.next_request_id.into(),
method: R::METHOD.into(),
params: serialized_params,
}))?;
self.next_request_id += 1;
Ok(())
}
pub(crate) fn pop_response_task(&mut self, response: lsp_server::Response) -> Task<'s> {
if let Some(handler) = self.response_handlers.remove(&response.id) {
handler(response)
} else {
tracing::error!(
"Received a response with ID {}, which was not expected",
response.id
);
Task::nothing()
}
}
}

View file

@ -34,16 +34,16 @@ pub(crate) fn event_loop_thread(
) )
} }
pub(crate) struct Scheduler { pub(crate) struct Scheduler<'s> {
session: Session, session: &'s mut Session,
client: Client, client: Client<'s>,
fmt_pool: thread::Pool, fmt_pool: thread::Pool,
background_pool: thread::Pool, background_pool: thread::Pool,
} }
impl Scheduler { impl<'s> Scheduler<'s> {
pub(super) fn new( pub(super) fn new(
session: Session, session: &'s mut Session,
worker_threads: NonZeroUsize, worker_threads: NonZeroUsize,
sender: &Sender<lsp_server::Message>, sender: &Sender<lsp_server::Message>,
) -> Self { ) -> Self {
@ -56,13 +56,32 @@ impl Scheduler {
} }
} }
/// Immediately sends a request of kind `R` to the client, with associated parameters.
/// The task provided by `response_handler` will be dispatched as soon as the response
/// comes back from the client.
pub(super) fn request<R>(
&mut self,
params: R::Params,
response_handler: impl Fn(R::Result) -> Task<'s> + 'static,
) -> crate::Result<()>
where
R: lsp_types::request::Request,
{
self.client.requester.request::<R>(params, response_handler)
}
/// Creates a task to handle a response from the client.
pub(super) fn response(&mut self, response: lsp_server::Response) -> Task<'s> {
self.client.requester.pop_response_task(response)
}
/// Dispatches a `task` by either running it as a blocking function or /// Dispatches a `task` by either running it as a blocking function or
/// executing it on a background thread pool. /// executing it on a background thread pool.
pub(super) fn dispatch<'s>(&'s mut self, task: task::Task<'s>) { pub(super) fn dispatch(&mut self, task: task::Task<'s>) {
match task { match task {
Task::Sync(SyncTask { func }) => { Task::Sync(SyncTask { func }) => {
func( func(
&mut self.session, self.session,
self.client.notifier(), self.client.notifier(),
self.client.responder(), self.client.responder(),
); );
@ -71,7 +90,7 @@ impl Scheduler {
schedule, schedule,
builder: func, builder: func,
}) => { }) => {
let static_func = func(&self.session); let static_func = func(self.session);
let notifier = self.client.notifier(); let notifier = self.client.notifier();
let responder = self.client.responder(); let responder = self.client.responder();
let task = move || static_func(notifier, responder); let task = move || static_func(notifier, responder);