diff --git a/Cargo.lock b/Cargo.lock index f3c2aec6..d83a4768 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3891,10 +3891,10 @@ version = "0.13.10" dependencies = [ "insta", "lsp-types", - "reflexo", "serde", "serde_json", "sync-ls", + "tinymist-std", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9759dc7b..b0f9d99b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,7 +151,7 @@ typlite = { path = "./crates/typlite" } crossbeam-channel = "0.5.12" lsp-types = { version = "=0.95.0", features = ["proposed"] } dapts = "0.0.6" -sync-ls = { path = "./crates/sync-lsp", features = ["clap"] } +sync-ls = { path = "./crates/sync-lsp" } # CLI clap = { version = "4.5", features = ["derive", "env", "unicode"] } diff --git a/crates/sync-lsp/Cargo.toml b/crates/sync-lsp/Cargo.toml index 35920f62..b3d68434 100644 --- a/crates/sync-lsp/Cargo.toml +++ b/crates/sync-lsp/Cargo.toml @@ -14,25 +14,33 @@ rust-version.workspace = true [dependencies] anyhow.workspace = true -clap = { workspace = true, optional = true } -crossbeam-channel.workspace = true dapts = { workspace = true, optional = true } -futures.workspace = true log.workspace = true lsp-types = { workspace = true, optional = true } -parking_lot.workspace = true serde.workspace = true serde_json.workspace = true -tokio = { workspace = true, features = ["rt", "time"] } -tokio-util.workspace = true + +clap = { workspace = true, optional = true } +crossbeam-channel = { workspace = true, optional = true } +futures = { workspace = true, optional = true } +parking_lot = { workspace = true, optional = true } +tokio = { workspace = true, features = ["rt", "time"], optional = true } +tokio-util = { workspace = true, optional = true } [features] -default = ["dap", "lsp"] dap = ["dapts"] lsp = ["lsp-types"] +server = [ + "crossbeam-channel", + "futures", + "tokio", + "tokio-util", + "clap", + "parking_lot", +] [package.metadata.docs.rs] -features = ["dap", "lsp"] +features = ["dap", "lsp", "server"] [lints] workspace = true diff --git a/crates/sync-lsp/src/lib.rs b/crates/sync-lsp/src/lib.rs index ebef51d7..d734a93e 100644 --- a/crates/sync-lsp/src/lib.rs +++ b/crates/sync-lsp/src/lib.rs @@ -4,420 +4,27 @@ pub mod dap; #[cfg(feature = "lsp")] pub mod lsp; -pub mod req_queue; -pub mod transport; - -pub use error::*; -pub use msg::*; -pub use server::*; mod error; +pub use error::*; + mod msg; +pub use msg::*; + +#[cfg(feature = "server")] +pub use server::*; +#[cfg(feature = "server")] +pub mod req_queue; +#[cfg(feature = "server")] mod server; +#[cfg(feature = "server")] +pub mod transport; use std::any::Any; -use std::pin::Pin; -use std::sync::{Arc, Weak}; -use std::time::Instant; -use futures::future::MaybeDone; -use parking_lot::Mutex; -use serde::Serialize; -use serde_json::Value as JsonValue; - -type Event = Box; - -/// The sender of the language server. -#[derive(Debug, Clone)] -pub struct TConnectionTx { - /// The sender of the events. - pub event: crossbeam_channel::Sender, - /// The sender of the LSP messages. - pub lsp: crossbeam_channel::Sender, - marker: std::marker::PhantomData, -} - -/// The sender of the language server. -#[derive(Debug, Clone)] -pub struct TConnectionRx { - /// The receiver of the events. - pub event: crossbeam_channel::Receiver, - /// The receiver of the LSP messages. - pub lsp: crossbeam_channel::Receiver, - marker: std::marker::PhantomData, -} - -impl> TConnectionRx { - /// Receives a message or an event. - fn recv(&self) -> anyhow::Result> { - crossbeam_channel::select_biased! { - recv(self.lsp) -> msg => Ok(EventOrMessage::Msg(msg?.try_into()?)), - recv(self.event) -> event => Ok(event.map(EventOrMessage::Evt)?), - } - } -} - -/// The untyped connect tx for the language server. -pub type ConnectionTx = TConnectionTx; -/// The untyped connect rx for the language server. -pub type ConnectionRx = TConnectionRx; - -/// This is a helper enum to handle both events and messages. -enum EventOrMessage { - Evt(Event), - Msg(M), -} - -/// Connection is just a pair of channels of LSP messages. -pub struct Connection { - /// The senders of the connection. - pub sender: TConnectionTx, - /// The receivers of the connection. - pub receiver: TConnectionRx, -} - -impl> From> for Connection { - fn from(conn: Connection) -> Self { - Self { - sender: TConnectionTx { - event: conn.sender.event, - lsp: conn.sender.lsp, - marker: std::marker::PhantomData, - }, - receiver: TConnectionRx { - event: conn.receiver.event, - lsp: conn.receiver.lsp, - marker: std::marker::PhantomData, - }, - } - } -} - -impl> From> for ConnectionTx { - fn from(conn: TConnectionTx) -> Self { - Self { - event: conn.event, - lsp: conn.lsp, - marker: std::marker::PhantomData, - } - } -} - -/// The common error type for the language server. -pub use msg::ResponseError; -/// The common result type for the language server. +/// The common error type for language servers. +pub use crate::msg::ResponseError; +/// The common result type for language servers. pub type LspResult = Result; -/// A future that may be done in place or not. -pub type ResponseFuture = MaybeDone + Send>>>; -/// A future that may be rejected before actual started. -pub type LspResponseFuture = LspResult>; -/// A future that could be rejected by common error in `LspResponseFuture`. -pub type SchedulableResponse = LspResponseFuture>; -/// The common future type for the language server. -pub type AnySchedulableResponse = SchedulableResponse; -/// The result of a scheduled response which could be finally caught by -/// `schedule_tail`. -/// - Returns Ok(Some()) -> Already responded -/// - Returns Ok(None) -> Need to respond none -/// - Returns Err(..) -> Need to respond error -pub type ScheduledResult = LspResult>; - -/// A helper function to create a `LspResponseFuture` -pub fn just_ok(res: T) -> Result>, E> { - Ok(futures::future::MaybeDone::Done(Ok(res))) -} -/// A helper function to create a `LspResponseFuture` -pub fn just_result(res: Result) -> Result>, E> { - Ok(futures::future::MaybeDone::Done(res)) -} -/// A helper function to create a `LspResponseFuture` -pub fn just_future( - fut: impl std::future::Future> + Send + 'static, -) -> Result>, E> { - Ok(futures::future::MaybeDone::Future(Box::pin(fut))) -} - -type AnyCaster = Arc &mut S + Send + Sync>; - -/// A Lsp client with typed service `S`. -pub struct TypedLspClient { - client: LspClient, - caster: AnyCaster, -} - -impl TypedLspClient { - /// Converts the client to an untyped client. - pub fn to_untyped(self) -> LspClient { - self.client - } -} - -impl TypedLspClient { - /// Returns the untyped lsp client. - pub fn untyped(&self) -> &LspClient { - &self.client - } - - /// Casts the service to another type. - pub fn cast(&self, f: fn(&mut S) -> &mut T) -> TypedLspClient { - let caster = self.caster.clone(); - TypedLspClient { - client: self.client.clone(), - caster: Arc::new(move |s| f(caster(s))), - } - } - - /// Sends a event to the client itself. - pub fn send_event(&self, event: T) { - let Some(sender) = self.sender.upgrade() else { - log::warn!("failed to send request: connection closed"); - return; - }; - - let Err(res) = sender.event.send(Box::new(event)) else { - return; - }; - log::warn!("failed to send event: {res:?}"); - } -} - -impl Clone for TypedLspClient { - fn clone(&self) -> Self { - Self { - client: self.client.clone(), - caster: self.caster.clone(), - } - } -} - -impl std::ops::Deref for TypedLspClient { - type Target = LspClient; - - fn deref(&self) -> &Self::Target { - &self.client - } -} - -/// The root of the language server host. -/// Will close connection when dropped. -#[derive(Debug, Clone)] -pub struct LspClientRoot { - weak: LspClient, - _strong: Arc, -} - -impl LspClientRoot { - /// Creates a new language server host. - pub fn new + GetMessageKind>( - handle: tokio::runtime::Handle, - sender: TConnectionTx, - ) -> Self { - let _strong = Arc::new(sender.into()); - let weak = LspClient { - handle, - msg_kind: M::get_message_kind(), - sender: Arc::downgrade(&_strong), - req_queue: Arc::new(Mutex::new(ReqQueue::default())), - }; - Self { weak, _strong } - } - - /// Returns the weak reference to the language server host. - pub fn weak(&self) -> LspClient { - self.weak.clone() - } -} - -type ReqHandler = Box FnOnce(&'a mut dyn Any, LspOrDapResponse) + Send + Sync>; -type ReqQueue = req_queue::ReqQueue<(String, Instant), ReqHandler>; - -/// The host for the language server, or known as the LSP client. -#[derive(Debug, Clone)] -pub struct LspClient { - /// The tokio handle. - pub handle: tokio::runtime::Handle, - - msg_kind: MessageKind, - sender: Weak, - req_queue: Arc>, -} - -impl LspClient { - /// Returns the untyped lsp client. - pub fn untyped(&self) -> &Self { - self - } - - /// converts the client to a typed client. - pub fn to_typed(&self) -> TypedLspClient { - TypedLspClient { - client: self.clone(), - caster: Arc::new(|s| s.downcast_mut().expect("invalid cast")), - } - } - - /// Checks if there are pending requests. - pub fn has_pending_requests(&self) -> bool { - self.req_queue.lock().incoming.has_pending() - } - - /// Prints states of the request queue and panics. - pub fn begin_panic(&self) { - self.req_queue.lock().begin_panic(); - } - - /// Sends a event to the server itself. - pub fn send_event(&self, event: T) { - let Some(sender) = self.sender.upgrade() else { - log::warn!("failed to send request: connection closed"); - return; - }; - - if let Err(res) = sender.event.send(Box::new(event)) { - log::warn!("failed to send event: {res:?}"); - } - } - - /// Completes an server2client request in the request queue. - #[cfg(feature = "lsp")] - pub fn complete_lsp_request(&self, service: &mut S, response: lsp::Response) { - let mut req_queue = self.req_queue.lock(); - let Some(handler) = req_queue.outgoing.complete(response.id.clone()) else { - log::warn!("received response for unknown request"); - return; - }; - drop(req_queue); - handler(service, response.into()) - } - - /// Completes an server2client request in the request queue. - #[cfg(feature = "dap")] - pub fn complete_dap_request(&self, service: &mut S, response: dap::Response) { - let mut req_queue = self.req_queue.lock(); - let Some(handler) = req_queue - .outgoing - // todo: casting i64 to i32 - .complete((response.request_seq as i32).into()) - else { - log::warn!("received response for unknown request"); - return; - }; - drop(req_queue); - handler(service, response.into()) - } - - /// Registers an client2server request in the request queue. - pub fn register_request(&self, method: &str, id: &RequestId, received_at: Instant) { - let mut req_queue = self.req_queue.lock(); - self.start_request(id, method); - req_queue - .incoming - .register(id.clone(), (method.to_owned(), received_at)); - } - - /// Responds a typed result to the client. - pub fn respond_result(&self, id: RequestId, result: LspResult) { - let result = result.and_then(|t| serde_json::to_value(t).map_err(internal_error)); - self.respond_any_result(id, result); - } - - fn respond_any_result(&self, id: RequestId, result: LspResult) { - let req_id = id.clone(); - let msg: Message = match (self.msg_kind, result) { - #[cfg(feature = "lsp")] - (MessageKind::Lsp, Ok(resp)) => lsp::Response::new_ok(id, resp).into(), - #[cfg(feature = "lsp")] - (MessageKind::Lsp, Err(e)) => lsp::Response::new_err(id, e.code, e.message).into(), - #[cfg(feature = "dap")] - (MessageKind::Dap, Ok(resp)) => dap::Response::success(RequestId::dap(id), resp).into(), - #[cfg(feature = "dap")] - (MessageKind::Dap, Err(e)) => { - dap::Response::error(RequestId::dap(id), Some(e.message), None).into() - } - }; - - self.respond(req_id, msg); - } - - /// Completes an client2server request in the request queue. - pub fn respond(&self, id: RequestId, response: Message) { - let mut req_queue = self.req_queue.lock(); - let Some((method, received_at)) = req_queue.incoming.complete(&id) else { - return; - }; - - self.stop_request(&id, &method, received_at); - - let Some(sender) = self.sender.upgrade() else { - log::warn!("failed to send response ({method}, {id}): connection closed"); - return; - }; - if let Err(res) = sender.lsp.send(response) { - log::warn!("failed to send response ({method}, {id}): {res:?}"); - } - } -} - -impl LspClient { - /// Schedules a request from the client. - pub fn schedule( - &self, - req_id: RequestId, - resp: SchedulableResponse, - ) -> ScheduledResult { - let resp = resp?; - - use futures::future::MaybeDone::*; - match resp { - Done(output) => { - self.respond_result(req_id, output); - } - Future(fut) => { - let client = self.clone(); - let req_id = req_id.clone(); - self.handle.spawn(async move { - client.respond_result(req_id, fut.await); - }); - } - Gone => { - log::warn!("response for request({req_id:?}) already taken"); - } - }; - - Ok(Some(())) - } - - /// Catch the early rejected requests. - fn schedule_tail(&self, req_id: RequestId, resp: ScheduledResult) { - match resp { - // Already responded - Ok(Some(())) => {} - // The requests that doesn't start. - _ => self.respond_result(req_id, resp), - } - } -} - -impl LspClient { - fn start_request(&self, req_id: &RequestId, method: &str) { - log::info!("handling {method} - ({req_id})"); - } - - fn stop_request(&self, req_id: &RequestId, method: &str, received_at: Instant) { - let duration = received_at.elapsed(); - log::info!("handled {method} - ({req_id}) in {duration:0.2?}"); - } - - fn start_notification(&self, method: &str) { - log::info!("notifying {method}"); - } - - fn stop_notification(&self, method: &str, received_at: Instant, result: LspResult<()>) { - let request_duration = received_at.elapsed(); - if let Err(err) = result { - log::error!("notify {method} failed in {request_duration:0.2?}: {err:?}"); - } else { - log::info!("notify {method} succeeded in {request_duration:0.2?}"); - } - } -} +/// The common event type for language servers. +pub type Event = Box; diff --git a/crates/sync-lsp/src/lsp.rs b/crates/sync-lsp/src/lsp.rs index 17a7e8b7..be8332e9 100644 --- a/crates/sync-lsp/src/lsp.rs +++ b/crates/sync-lsp/src/lsp.rs @@ -176,6 +176,8 @@ impl Notification { }), } } + + #[cfg(feature = "server")] pub(crate) fn is_exit(&self) -> bool { self.method == "exit" } diff --git a/crates/sync-lsp/src/msg.rs b/crates/sync-lsp/src/msg.rs index 27d9a56e..1cca9bba 100644 --- a/crates/sync-lsp/src/msg.rs +++ b/crates/sync-lsp/src/msg.rs @@ -18,7 +18,7 @@ use crate::lsp; pub struct RequestId(IdRepr); impl RequestId { - #[cfg(feature = "dap")] + #[cfg(all(feature = "dap", feature = "server"))] pub(crate) fn dap(id: RequestId) -> i64 { match id.0 { IdRepr::I32(it) => it as i64, diff --git a/crates/sync-lsp/src/server.rs b/crates/sync-lsp/src/server.rs index b78857d5..08b48b4a 100644 --- a/crates/sync-lsp/src/server.rs +++ b/crates/sync-lsp/src/server.rs @@ -1,28 +1,408 @@ //! A synchronous language server implementation. -use core::fmt; -use std::any::Any; -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::time::Instant; - -use serde::Serialize; -use serde_json::{from_value, Value as JsonValue}; - #[cfg(feature = "dap")] mod dap_srv; #[cfg(feature = "lsp")] mod lsp_srv; -#[cfg(feature = "lsp")] -use lsp::{Notification, Request}; +use core::fmt; +use std::any::Any; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::sync::{Arc, Weak}; +use std::time::Instant; + +use futures::future::MaybeDone; +use parking_lot::Mutex; +use serde::Serialize; +use serde_json::{from_value, Value as JsonValue}; + +#[cfg(feature = "lsp")] +use crate::lsp::{Notification, Request}; use crate::msg::*; +use crate::req_queue; use crate::*; -type Event = Box; type ImmutPath = Arc; +/// A future that may be done in place or not. +pub type ResponseFuture = MaybeDone + Send>>>; +/// A future that may be rejected before actual started. +pub type LspResponseFuture = LspResult>; +/// A future that could be rejected by common error in `LspResponseFuture`. +pub type SchedulableResponse = LspResponseFuture>; +/// The common response future type for language servers. +pub type AnySchedulableResponse = SchedulableResponse; +/// The result of a scheduled response which could be finally caught by +/// `schedule_tail`. +/// - Returns Ok(Some()) -> Already responded +/// - Returns Ok(None) -> Need to respond none +/// - Returns Err(..) -> Need to respond error +pub type ScheduledResult = LspResult>; + +/// The untyped connect tx for language servers. +pub type ConnectionTx = TConnectionTx; +/// The untyped connect rx for language servers. +pub type ConnectionRx = TConnectionRx; + +/// The sender of the language server. +#[derive(Debug, Clone)] +pub struct TConnectionTx { + /// The sender of the events. + pub event: crossbeam_channel::Sender, + /// The sender of the LSP messages. + pub lsp: crossbeam_channel::Sender, + pub(crate) marker: std::marker::PhantomData, +} + +/// The sender of the language server. +#[derive(Debug, Clone)] +pub struct TConnectionRx { + /// The receiver of the events. + pub event: crossbeam_channel::Receiver, + /// The receiver of the LSP messages. + pub lsp: crossbeam_channel::Receiver, + pub(crate) marker: std::marker::PhantomData, +} + +impl> TConnectionRx { + /// Receives a message or an event. + pub(crate) fn recv(&self) -> anyhow::Result> { + crossbeam_channel::select_biased! { + recv(self.lsp) -> msg => Ok(EventOrMessage::Msg(msg?.try_into()?)), + recv(self.event) -> event => Ok(event.map(EventOrMessage::Evt)?), + } + } +} + +/// This is a helper enum to handle both events and messages. +pub(crate) enum EventOrMessage { + Evt(Event), + Msg(M), +} + +/// Connection is just a pair of channels of LSP messages. +pub struct Connection { + /// The senders of the connection. + pub sender: TConnectionTx, + /// The receivers of the connection. + pub receiver: TConnectionRx, +} + +impl> From> for Connection { + fn from(conn: Connection) -> Self { + Self { + sender: TConnectionTx { + event: conn.sender.event, + lsp: conn.sender.lsp, + marker: std::marker::PhantomData, + }, + receiver: TConnectionRx { + event: conn.receiver.event, + lsp: conn.receiver.lsp, + marker: std::marker::PhantomData, + }, + } + } +} + +impl> From> for ConnectionTx { + fn from(conn: TConnectionTx) -> Self { + Self { + event: conn.event, + lsp: conn.lsp, + marker: std::marker::PhantomData, + } + } +} + +type AnyCaster = Arc &mut S + Send + Sync>; + +/// A Lsp client with typed service `S`. +pub struct TypedLspClient { + client: LspClient, + caster: AnyCaster, +} + +impl TypedLspClient { + /// Converts the client to an untyped client. + pub fn to_untyped(self) -> LspClient { + self.client + } +} + +impl TypedLspClient { + /// Returns the untyped lsp client. + pub fn untyped(&self) -> &LspClient { + &self.client + } + + /// Casts the service to another type. + pub fn cast(&self, f: fn(&mut S) -> &mut T) -> TypedLspClient { + let caster = self.caster.clone(); + TypedLspClient { + client: self.client.clone(), + caster: Arc::new(move |s| f(caster(s))), + } + } + + /// Sends a event to the client itself. + pub fn send_event(&self, event: T) { + let Some(sender) = self.sender.upgrade() else { + log::warn!("failed to send request: connection closed"); + return; + }; + + let Err(res) = sender.event.send(Box::new(event)) else { + return; + }; + log::warn!("failed to send event: {res:?}"); + } +} + +impl Clone for TypedLspClient { + fn clone(&self) -> Self { + Self { + client: self.client.clone(), + caster: self.caster.clone(), + } + } +} + +impl std::ops::Deref for TypedLspClient { + type Target = LspClient; + + fn deref(&self) -> &Self::Target { + &self.client + } +} + +/// The root of the language server host. +/// Will close connection when dropped. +#[derive(Debug, Clone)] +pub struct LspClientRoot { + weak: LspClient, + _strong: Arc, +} + +impl LspClientRoot { + /// Creates a new language server host. + pub fn new + GetMessageKind>( + handle: tokio::runtime::Handle, + sender: TConnectionTx, + ) -> Self { + let _strong = Arc::new(sender.into()); + let weak = LspClient { + handle, + msg_kind: M::get_message_kind(), + sender: Arc::downgrade(&_strong), + req_queue: Arc::new(Mutex::new(ReqQueue::default())), + }; + Self { weak, _strong } + } + + /// Returns the weak reference to the language server host. + pub fn weak(&self) -> LspClient { + self.weak.clone() + } +} + +type ReqHandler = Box FnOnce(&'a mut dyn Any, LspOrDapResponse) + Send + Sync>; +type ReqQueue = req_queue::ReqQueue<(String, Instant), ReqHandler>; + +/// The host for the language server, or known as the LSP client. +#[derive(Debug, Clone)] +pub struct LspClient { + /// The tokio handle. + pub handle: tokio::runtime::Handle, + + pub(crate) msg_kind: MessageKind, + pub(crate) sender: Weak, + pub(crate) req_queue: Arc>, +} + +impl LspClient { + /// Returns the untyped lsp client. + pub fn untyped(&self) -> &Self { + self + } + + /// converts the client to a typed client. + pub fn to_typed(&self) -> TypedLspClient { + TypedLspClient { + client: self.clone(), + caster: Arc::new(|s| s.downcast_mut().expect("invalid cast")), + } + } + + /// Checks if there are pending requests. + pub fn has_pending_requests(&self) -> bool { + self.req_queue.lock().incoming.has_pending() + } + + /// Prints states of the request queue and panics. + pub fn begin_panic(&self) { + self.req_queue.lock().begin_panic(); + } + + /// Sends a event to the server itself. + pub fn send_event(&self, event: T) { + let Some(sender) = self.sender.upgrade() else { + log::warn!("failed to send request: connection closed"); + return; + }; + + if let Err(res) = sender.event.send(Box::new(event)) { + log::warn!("failed to send event: {res:?}"); + } + } + + /// Completes an server2client request in the request queue. + #[cfg(feature = "lsp")] + pub fn complete_lsp_request(&self, service: &mut S, response: lsp::Response) { + let mut req_queue = self.req_queue.lock(); + let Some(handler) = req_queue.outgoing.complete(response.id.clone()) else { + log::warn!("received response for unknown request"); + return; + }; + drop(req_queue); + handler(service, response.into()) + } + + /// Completes an server2client request in the request queue. + #[cfg(feature = "dap")] + pub fn complete_dap_request(&self, service: &mut S, response: dap::Response) { + let mut req_queue = self.req_queue.lock(); + let Some(handler) = req_queue + .outgoing + // todo: casting i64 to i32 + .complete((response.request_seq as i32).into()) + else { + log::warn!("received response for unknown request"); + return; + }; + drop(req_queue); + handler(service, response.into()) + } + + /// Registers an client2server request in the request queue. + pub fn register_request(&self, method: &str, id: &RequestId, received_at: Instant) { + let mut req_queue = self.req_queue.lock(); + self.start_request(id, method); + req_queue + .incoming + .register(id.clone(), (method.to_owned(), received_at)); + } + + /// Responds a typed result to the client. + pub fn respond_result(&self, id: RequestId, result: LspResult) { + let result = result.and_then(|t| serde_json::to_value(t).map_err(internal_error)); + self.respond_any_result(id, result); + } + + fn respond_any_result(&self, id: RequestId, result: LspResult) { + let req_id = id.clone(); + let msg: Message = match (self.msg_kind, result) { + #[cfg(feature = "lsp")] + (MessageKind::Lsp, Ok(resp)) => lsp::Response::new_ok(id, resp).into(), + #[cfg(feature = "lsp")] + (MessageKind::Lsp, Err(e)) => lsp::Response::new_err(id, e.code, e.message).into(), + #[cfg(feature = "dap")] + (MessageKind::Dap, Ok(resp)) => dap::Response::success(RequestId::dap(id), resp).into(), + #[cfg(feature = "dap")] + (MessageKind::Dap, Err(e)) => { + dap::Response::error(RequestId::dap(id), Some(e.message), None).into() + } + }; + + self.respond(req_id, msg); + } + + /// Completes an client2server request in the request queue. + pub fn respond(&self, id: RequestId, response: Message) { + let mut req_queue = self.req_queue.lock(); + let Some((method, received_at)) = req_queue.incoming.complete(&id) else { + return; + }; + + self.stop_request(&id, &method, received_at); + + let Some(sender) = self.sender.upgrade() else { + log::warn!("failed to send response ({method}, {id}): connection closed"); + return; + }; + if let Err(res) = sender.lsp.send(response) { + log::warn!("failed to send response ({method}, {id}): {res:?}"); + } + } +} + +impl LspClient { + /// Schedules a request from the client. + pub fn schedule( + &self, + req_id: RequestId, + resp: SchedulableResponse, + ) -> ScheduledResult { + let resp = resp?; + + use futures::future::MaybeDone::*; + match resp { + Done(output) => { + self.respond_result(req_id, output); + } + Future(fut) => { + let client = self.clone(); + let req_id = req_id.clone(); + self.handle.spawn(async move { + client.respond_result(req_id, fut.await); + }); + } + Gone => { + log::warn!("response for request({req_id:?}) already taken"); + } + }; + + Ok(Some(())) + } + + /// Catch the early rejected requests. + pub(crate) fn schedule_tail(&self, req_id: RequestId, resp: ScheduledResult) { + match resp { + // Already responded + Ok(Some(())) => {} + // The requests that doesn't start. + _ => self.respond_result(req_id, resp), + } + } +} + +impl LspClient { + fn start_request(&self, req_id: &RequestId, method: &str) { + log::info!("handling {method} - ({req_id})"); + } + + fn stop_request(&self, req_id: &RequestId, method: &str, received_at: Instant) { + let duration = received_at.elapsed(); + log::info!("handled {method} - ({req_id}) in {duration:0.2?}"); + } + + fn start_notification(&self, method: &str) { + log::info!("notifying {method}"); + } + + fn stop_notification(&self, method: &str, received_at: Instant, result: LspResult<()>) { + let request_duration = received_at.elapsed(); + if let Err(err) = result { + log::error!("notify {method} failed in {request_duration:0.2?}: {err:?}"); + } else { + log::info!("notify {method} succeeded in {request_duration:0.2?}"); + } + } +} + type AsyncHandler = fn(srv: &mut S, args: T) -> SchedulableResponse; type RawHandler = fn(srv: &mut S, req_id: RequestId, args: T) -> ScheduledResult; type BoxPureHandler = Box LspResult<()>>; @@ -48,6 +428,13 @@ pub trait Initializer { fn initialize(self, req: Self::I) -> (Self::S, AnySchedulableResponse); } +/// The language server builder serving LSP. +#[cfg(feature = "lsp")] +pub type LspBuilder = LsBuilder; +/// The language server builder serving DAP. +#[cfg(feature = "dap")] +pub type DapBuilder = LsBuilder; + /// The builder pattern for the language server. pub struct LsBuilder { /// The extra initialization arguments. @@ -67,13 +454,6 @@ pub struct LsBuilder { _marker: std::marker::PhantomData, } -/// The language server builder serving LSP. -#[cfg(feature = "lsp")] -pub type LspBuilder = LsBuilder; -/// The language server builder serving DAP. -#[cfg(feature = "dap")] -pub type DapBuilder = LsBuilder; - impl LsBuilder where Args::S: 'static, @@ -250,20 +630,21 @@ impl LsDriver { } } -fn from_json(json: JsonValue) -> LspResult { - serde_json::from_value(json).map_err(invalid_request) +/// A helper function to create a `LspResponseFuture` +pub fn just_ok(res: T) -> Result>, E> { + Ok(futures::future::MaybeDone::Done(Ok(res))) } -fn raw_to_boxed(handler: RawHandler) -> BoxHandler { - Box::new(move |s, _client, req_id, req| handler(s, req_id, req)) +/// A helper function to create a `LspResponseFuture` +pub fn just_result(res: Result) -> Result>, E> { + Ok(futures::future::MaybeDone::Done(res)) } -fn resp_err(code: ErrorCode, msg: impl fmt::Display) -> ResponseError { - ResponseError { - code: code as i32, - message: msg.to_string(), - data: None, - } +/// A helper function to create a `LspResponseFuture` +pub fn just_future( + fut: impl std::future::Future> + Send + 'static, +) -> Result>, E> { + Ok(futures::future::MaybeDone::Future(Box::pin(fut))) } /// Creates an invalid params error. @@ -290,3 +671,19 @@ pub fn method_not_found() -> ResponseError { pub fn invalid_request(msg: impl fmt::Display) -> ResponseError { resp_err(ErrorCode::InvalidRequest, msg) } + +fn from_json(json: JsonValue) -> LspResult { + serde_json::from_value(json).map_err(invalid_request) +} + +fn raw_to_boxed(handler: RawHandler) -> BoxHandler { + Box::new(move |s, _client, req_id, req| handler(s, req_id, req)) +} + +fn resp_err(code: ErrorCode, msg: impl fmt::Display) -> ResponseError { + ResponseError { + code: code as i32, + message: msg.to_string(), + data: None, + } +} diff --git a/crates/tinymist-std/Cargo.toml b/crates/tinymist-std/Cargo.toml index 80bea5a2..8c0a2b70 100644 --- a/crates/tinymist-std/Cargo.toml +++ b/crates/tinymist-std/Cargo.toml @@ -16,7 +16,7 @@ base64.workspace = true bitvec.workspace = true comemo.workspace = true dashmap.workspace = true -ecow.workspace = true +ecow = { workspace = true, features = ["serde"] } fxhash.workspace = true log.workspace = true path-clean.workspace = true diff --git a/crates/tinymist/Cargo.toml b/crates/tinymist/Cargo.toml index ed316c68..3cdaddbb 100644 --- a/crates/tinymist/Cargo.toml +++ b/crates/tinymist/Cargo.toml @@ -55,7 +55,7 @@ serde.workspace = true serde_json.workspace = true serde_yaml.workspace = true strum.workspace = true -sync-ls.workspace = true +sync-ls = { workspace = true, features = ["lsp", "server"] } tinymist-assets = { workspace = true } tinymist-query.workspace = true tinymist-std.workspace = true @@ -132,7 +132,7 @@ preview = [ "hyper-tungstenite", ] -dap = [] +dap = ["sync-ls/dap"] l10n = ["tinymist-assets/l10n"] diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 7fd0092d..263b70dc 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -9,9 +9,9 @@ name = "tinymist-e2e-tests" path = "e2e/main.rs" [dev-dependencies] -sync-ls.workspace = true +sync-ls = { workspace = true, default-features = false, features = ["lsp"] } lsp-types.workspace = true serde.workspace = true serde_json.workspace = true -reflexo.workspace = true +tinymist-std.workspace = true insta.workspace = true diff --git a/tests/e2e/main.rs b/tests/e2e/main.rs index 4b0af426..3bd671c9 100644 --- a/tests/e2e/main.rs +++ b/tests/e2e/main.rs @@ -345,7 +345,7 @@ fn replay_log(tinymist_binary: &Path, root: &Path) -> String { // let sorted_res let sorted_res = sort_and_redact_value(res); let c = serde_json::to_string_pretty(&sorted_res).unwrap(); - let hash = reflexo::hash::hash128(&c); + let hash = tinymist_std::hash::hash128(&c); std::fs::write(root.join("result_sorted.json"), c).unwrap(); format!("siphash128_13:{:x}", hash)