feat: reduce deps of the tests crate (#1612)

* feat: reduce deps of the tests crate

* feat: reduce deps of the tests crate (2)
This commit is contained in:
Myriad-Dreamin 2025-04-01 16:42:25 +08:00 committed by GitHub
parent 64044ea514
commit 03a30f241e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 469 additions and 455 deletions

2
Cargo.lock generated
View file

@ -3891,10 +3891,10 @@ version = "0.13.10"
dependencies = [
"insta",
"lsp-types",
"reflexo",
"serde",
"serde_json",
"sync-ls",
"tinymist-std",
]
[[package]]

View file

@ -151,7 +151,7 @@ typlite = { path = "./crates/typlite" }
crossbeam-channel = "0.5.12"
lsp-types = { version = "=0.95.0", features = ["proposed"] }
dapts = "0.0.6"
sync-ls = { path = "./crates/sync-lsp", features = ["clap"] }
sync-ls = { path = "./crates/sync-lsp" }
# CLI
clap = { version = "4.5", features = ["derive", "env", "unicode"] }

View file

@ -14,25 +14,33 @@ rust-version.workspace = true
[dependencies]
anyhow.workspace = true
clap = { workspace = true, optional = true }
crossbeam-channel.workspace = true
dapts = { workspace = true, optional = true }
futures.workspace = true
log.workspace = true
lsp-types = { workspace = true, optional = true }
parking_lot.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["rt", "time"] }
tokio-util.workspace = true
clap = { workspace = true, optional = true }
crossbeam-channel = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
parking_lot = { workspace = true, optional = true }
tokio = { workspace = true, features = ["rt", "time"], optional = true }
tokio-util = { workspace = true, optional = true }
[features]
default = ["dap", "lsp"]
dap = ["dapts"]
lsp = ["lsp-types"]
server = [
"crossbeam-channel",
"futures",
"tokio",
"tokio-util",
"clap",
"parking_lot",
]
[package.metadata.docs.rs]
features = ["dap", "lsp"]
features = ["dap", "lsp", "server"]
[lints]
workspace = true

View file

@ -4,420 +4,27 @@
pub mod dap;
#[cfg(feature = "lsp")]
pub mod lsp;
pub mod req_queue;
pub mod transport;
pub use error::*;
pub use msg::*;
pub use server::*;
mod error;
pub use error::*;
mod msg;
pub use msg::*;
#[cfg(feature = "server")]
pub use server::*;
#[cfg(feature = "server")]
pub mod req_queue;
#[cfg(feature = "server")]
mod server;
#[cfg(feature = "server")]
pub mod transport;
use std::any::Any;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::time::Instant;
use futures::future::MaybeDone;
use parking_lot::Mutex;
use serde::Serialize;
use serde_json::Value as JsonValue;
type Event = Box<dyn Any + Send>;
/// The sender of the language server.
#[derive(Debug, Clone)]
pub struct TConnectionTx<M> {
/// The sender of the events.
pub event: crossbeam_channel::Sender<Event>,
/// The sender of the LSP messages.
pub lsp: crossbeam_channel::Sender<Message>,
marker: std::marker::PhantomData<M>,
}
/// The sender of the language server.
#[derive(Debug, Clone)]
pub struct TConnectionRx<M> {
/// The receiver of the events.
pub event: crossbeam_channel::Receiver<Event>,
/// The receiver of the LSP messages.
pub lsp: crossbeam_channel::Receiver<Message>,
marker: std::marker::PhantomData<M>,
}
impl<M: TryFrom<Message, Error = anyhow::Error>> TConnectionRx<M> {
/// Receives a message or an event.
fn recv(&self) -> anyhow::Result<EventOrMessage<M>> {
crossbeam_channel::select_biased! {
recv(self.lsp) -> msg => Ok(EventOrMessage::Msg(msg?.try_into()?)),
recv(self.event) -> event => Ok(event.map(EventOrMessage::Evt)?),
}
}
}
/// The untyped connect tx for the language server.
pub type ConnectionTx = TConnectionTx<Message>;
/// The untyped connect rx for the language server.
pub type ConnectionRx = TConnectionRx<Message>;
/// This is a helper enum to handle both events and messages.
enum EventOrMessage<M> {
Evt(Event),
Msg(M),
}
/// Connection is just a pair of channels of LSP messages.
pub struct Connection<M> {
/// The senders of the connection.
pub sender: TConnectionTx<M>,
/// The receivers of the connection.
pub receiver: TConnectionRx<M>,
}
impl<M: TryFrom<Message, Error = anyhow::Error>> From<Connection<Message>> for Connection<M> {
fn from(conn: Connection<Message>) -> Self {
Self {
sender: TConnectionTx {
event: conn.sender.event,
lsp: conn.sender.lsp,
marker: std::marker::PhantomData,
},
receiver: TConnectionRx {
event: conn.receiver.event,
lsp: conn.receiver.lsp,
marker: std::marker::PhantomData,
},
}
}
}
impl<M: TryFrom<Message, Error = anyhow::Error>> From<TConnectionTx<M>> for ConnectionTx {
fn from(conn: TConnectionTx<M>) -> Self {
Self {
event: conn.event,
lsp: conn.lsp,
marker: std::marker::PhantomData,
}
}
}
/// The common error type for the language server.
pub use msg::ResponseError;
/// The common result type for the language server.
/// The common error type for language servers.
pub use crate::msg::ResponseError;
/// The common result type for language servers.
pub type LspResult<T> = Result<T, ResponseError>;
/// A future that may be done in place or not.
pub type ResponseFuture<T> = MaybeDone<Pin<Box<dyn std::future::Future<Output = T> + Send>>>;
/// A future that may be rejected before actual started.
pub type LspResponseFuture<T> = LspResult<ResponseFuture<T>>;
/// A future that could be rejected by common error in `LspResponseFuture`.
pub type SchedulableResponse<T> = LspResponseFuture<LspResult<T>>;
/// The common future type for the language server.
pub type AnySchedulableResponse = SchedulableResponse<JsonValue>;
/// The result of a scheduled response which could be finally caught by
/// `schedule_tail`.
/// - Returns Ok(Some()) -> Already responded
/// - Returns Ok(None) -> Need to respond none
/// - Returns Err(..) -> Need to respond error
pub type ScheduledResult = LspResult<Option<()>>;
/// A helper function to create a `LspResponseFuture`
pub fn just_ok<T, E>(res: T) -> Result<ResponseFuture<Result<T, E>>, E> {
Ok(futures::future::MaybeDone::Done(Ok(res)))
}
/// A helper function to create a `LspResponseFuture`
pub fn just_result<T, E>(res: Result<T, E>) -> Result<ResponseFuture<Result<T, E>>, E> {
Ok(futures::future::MaybeDone::Done(res))
}
/// A helper function to create a `LspResponseFuture`
pub fn just_future<T, E>(
fut: impl std::future::Future<Output = Result<T, E>> + Send + 'static,
) -> Result<ResponseFuture<Result<T, E>>, E> {
Ok(futures::future::MaybeDone::Future(Box::pin(fut)))
}
type AnyCaster<S> = Arc<dyn Fn(&mut dyn Any) -> &mut S + Send + Sync>;
/// A Lsp client with typed service `S`.
pub struct TypedLspClient<S> {
client: LspClient,
caster: AnyCaster<S>,
}
impl<S> TypedLspClient<S> {
/// Converts the client to an untyped client.
pub fn to_untyped(self) -> LspClient {
self.client
}
}
impl<S: 'static> TypedLspClient<S> {
/// Returns the untyped lsp client.
pub fn untyped(&self) -> &LspClient {
&self.client
}
/// Casts the service to another type.
pub fn cast<T: 'static>(&self, f: fn(&mut S) -> &mut T) -> TypedLspClient<T> {
let caster = self.caster.clone();
TypedLspClient {
client: self.client.clone(),
caster: Arc::new(move |s| f(caster(s))),
}
}
/// Sends a event to the client itself.
pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
let Some(sender) = self.sender.upgrade() else {
log::warn!("failed to send request: connection closed");
return;
};
let Err(res) = sender.event.send(Box::new(event)) else {
return;
};
log::warn!("failed to send event: {res:?}");
}
}
impl<S> Clone for TypedLspClient<S> {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
caster: self.caster.clone(),
}
}
}
impl<S> std::ops::Deref for TypedLspClient<S> {
type Target = LspClient;
fn deref(&self) -> &Self::Target {
&self.client
}
}
/// The root of the language server host.
/// Will close connection when dropped.
#[derive(Debug, Clone)]
pub struct LspClientRoot {
weak: LspClient,
_strong: Arc<ConnectionTx>,
}
impl LspClientRoot {
/// Creates a new language server host.
pub fn new<M: TryFrom<Message, Error = anyhow::Error> + GetMessageKind>(
handle: tokio::runtime::Handle,
sender: TConnectionTx<M>,
) -> Self {
let _strong = Arc::new(sender.into());
let weak = LspClient {
handle,
msg_kind: M::get_message_kind(),
sender: Arc::downgrade(&_strong),
req_queue: Arc::new(Mutex::new(ReqQueue::default())),
};
Self { weak, _strong }
}
/// Returns the weak reference to the language server host.
pub fn weak(&self) -> LspClient {
self.weak.clone()
}
}
type ReqHandler = Box<dyn for<'a> FnOnce(&'a mut dyn Any, LspOrDapResponse) + Send + Sync>;
type ReqQueue = req_queue::ReqQueue<(String, Instant), ReqHandler>;
/// The host for the language server, or known as the LSP client.
#[derive(Debug, Clone)]
pub struct LspClient {
/// The tokio handle.
pub handle: tokio::runtime::Handle,
msg_kind: MessageKind,
sender: Weak<ConnectionTx>,
req_queue: Arc<Mutex<ReqQueue>>,
}
impl LspClient {
/// Returns the untyped lsp client.
pub fn untyped(&self) -> &Self {
self
}
/// converts the client to a typed client.
pub fn to_typed<S: Any>(&self) -> TypedLspClient<S> {
TypedLspClient {
client: self.clone(),
caster: Arc::new(|s| s.downcast_mut().expect("invalid cast")),
}
}
/// Checks if there are pending requests.
pub fn has_pending_requests(&self) -> bool {
self.req_queue.lock().incoming.has_pending()
}
/// Prints states of the request queue and panics.
pub fn begin_panic(&self) {
self.req_queue.lock().begin_panic();
}
/// Sends a event to the server itself.
pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
let Some(sender) = self.sender.upgrade() else {
log::warn!("failed to send request: connection closed");
return;
};
if let Err(res) = sender.event.send(Box::new(event)) {
log::warn!("failed to send event: {res:?}");
}
}
/// Completes an server2client request in the request queue.
#[cfg(feature = "lsp")]
pub fn complete_lsp_request<S: Any>(&self, service: &mut S, response: lsp::Response) {
let mut req_queue = self.req_queue.lock();
let Some(handler) = req_queue.outgoing.complete(response.id.clone()) else {
log::warn!("received response for unknown request");
return;
};
drop(req_queue);
handler(service, response.into())
}
/// Completes an server2client request in the request queue.
#[cfg(feature = "dap")]
pub fn complete_dap_request<S: Any>(&self, service: &mut S, response: dap::Response) {
let mut req_queue = self.req_queue.lock();
let Some(handler) = req_queue
.outgoing
// todo: casting i64 to i32
.complete((response.request_seq as i32).into())
else {
log::warn!("received response for unknown request");
return;
};
drop(req_queue);
handler(service, response.into())
}
/// Registers an client2server request in the request queue.
pub fn register_request(&self, method: &str, id: &RequestId, received_at: Instant) {
let mut req_queue = self.req_queue.lock();
self.start_request(id, method);
req_queue
.incoming
.register(id.clone(), (method.to_owned(), received_at));
}
/// Responds a typed result to the client.
pub fn respond_result<T: Serialize>(&self, id: RequestId, result: LspResult<T>) {
let result = result.and_then(|t| serde_json::to_value(t).map_err(internal_error));
self.respond_any_result(id, result);
}
fn respond_any_result(&self, id: RequestId, result: LspResult<JsonValue>) {
let req_id = id.clone();
let msg: Message = match (self.msg_kind, result) {
#[cfg(feature = "lsp")]
(MessageKind::Lsp, Ok(resp)) => lsp::Response::new_ok(id, resp).into(),
#[cfg(feature = "lsp")]
(MessageKind::Lsp, Err(e)) => lsp::Response::new_err(id, e.code, e.message).into(),
#[cfg(feature = "dap")]
(MessageKind::Dap, Ok(resp)) => dap::Response::success(RequestId::dap(id), resp).into(),
#[cfg(feature = "dap")]
(MessageKind::Dap, Err(e)) => {
dap::Response::error(RequestId::dap(id), Some(e.message), None).into()
}
};
self.respond(req_id, msg);
}
/// Completes an client2server request in the request queue.
pub fn respond(&self, id: RequestId, response: Message) {
let mut req_queue = self.req_queue.lock();
let Some((method, received_at)) = req_queue.incoming.complete(&id) else {
return;
};
self.stop_request(&id, &method, received_at);
let Some(sender) = self.sender.upgrade() else {
log::warn!("failed to send response ({method}, {id}): connection closed");
return;
};
if let Err(res) = sender.lsp.send(response) {
log::warn!("failed to send response ({method}, {id}): {res:?}");
}
}
}
impl LspClient {
/// Schedules a request from the client.
pub fn schedule<T: Serialize + 'static>(
&self,
req_id: RequestId,
resp: SchedulableResponse<T>,
) -> ScheduledResult {
let resp = resp?;
use futures::future::MaybeDone::*;
match resp {
Done(output) => {
self.respond_result(req_id, output);
}
Future(fut) => {
let client = self.clone();
let req_id = req_id.clone();
self.handle.spawn(async move {
client.respond_result(req_id, fut.await);
});
}
Gone => {
log::warn!("response for request({req_id:?}) already taken");
}
};
Ok(Some(()))
}
/// Catch the early rejected requests.
fn schedule_tail(&self, req_id: RequestId, resp: ScheduledResult) {
match resp {
// Already responded
Ok(Some(())) => {}
// The requests that doesn't start.
_ => self.respond_result(req_id, resp),
}
}
}
impl LspClient {
fn start_request(&self, req_id: &RequestId, method: &str) {
log::info!("handling {method} - ({req_id})");
}
fn stop_request(&self, req_id: &RequestId, method: &str, received_at: Instant) {
let duration = received_at.elapsed();
log::info!("handled {method} - ({req_id}) in {duration:0.2?}");
}
fn start_notification(&self, method: &str) {
log::info!("notifying {method}");
}
fn stop_notification(&self, method: &str, received_at: Instant, result: LspResult<()>) {
let request_duration = received_at.elapsed();
if let Err(err) = result {
log::error!("notify {method} failed in {request_duration:0.2?}: {err:?}");
} else {
log::info!("notify {method} succeeded in {request_duration:0.2?}");
}
}
}
/// The common event type for language servers.
pub type Event = Box<dyn Any + Send>;

View file

@ -176,6 +176,8 @@ impl Notification {
}),
}
}
#[cfg(feature = "server")]
pub(crate) fn is_exit(&self) -> bool {
self.method == "exit"
}

View file

@ -18,7 +18,7 @@ use crate::lsp;
pub struct RequestId(IdRepr);
impl RequestId {
#[cfg(feature = "dap")]
#[cfg(all(feature = "dap", feature = "server"))]
pub(crate) fn dap(id: RequestId) -> i64 {
match id.0 {
IdRepr::I32(it) => it as i64,

View file

@ -1,28 +1,408 @@
//! A synchronous language server implementation.
use core::fmt;
use std::any::Any;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::Instant;
use serde::Serialize;
use serde_json::{from_value, Value as JsonValue};
#[cfg(feature = "dap")]
mod dap_srv;
#[cfg(feature = "lsp")]
mod lsp_srv;
#[cfg(feature = "lsp")]
use lsp::{Notification, Request};
use core::fmt;
use std::any::Any;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::time::Instant;
use futures::future::MaybeDone;
use parking_lot::Mutex;
use serde::Serialize;
use serde_json::{from_value, Value as JsonValue};
#[cfg(feature = "lsp")]
use crate::lsp::{Notification, Request};
use crate::msg::*;
use crate::req_queue;
use crate::*;
type Event = Box<dyn Any + Send>;
type ImmutPath = Arc<Path>;
/// A future that may be done in place or not.
pub type ResponseFuture<T> = MaybeDone<Pin<Box<dyn std::future::Future<Output = T> + Send>>>;
/// A future that may be rejected before actual started.
pub type LspResponseFuture<T> = LspResult<ResponseFuture<T>>;
/// A future that could be rejected by common error in `LspResponseFuture`.
pub type SchedulableResponse<T> = LspResponseFuture<LspResult<T>>;
/// The common response future type for language servers.
pub type AnySchedulableResponse = SchedulableResponse<JsonValue>;
/// The result of a scheduled response which could be finally caught by
/// `schedule_tail`.
/// - Returns Ok(Some()) -> Already responded
/// - Returns Ok(None) -> Need to respond none
/// - Returns Err(..) -> Need to respond error
pub type ScheduledResult = LspResult<Option<()>>;
/// The untyped connect tx for language servers.
pub type ConnectionTx = TConnectionTx<Message>;
/// The untyped connect rx for language servers.
pub type ConnectionRx = TConnectionRx<Message>;
/// The sender of the language server.
#[derive(Debug, Clone)]
pub struct TConnectionTx<M> {
/// The sender of the events.
pub event: crossbeam_channel::Sender<Event>,
/// The sender of the LSP messages.
pub lsp: crossbeam_channel::Sender<Message>,
pub(crate) marker: std::marker::PhantomData<M>,
}
/// The sender of the language server.
#[derive(Debug, Clone)]
pub struct TConnectionRx<M> {
/// The receiver of the events.
pub event: crossbeam_channel::Receiver<Event>,
/// The receiver of the LSP messages.
pub lsp: crossbeam_channel::Receiver<Message>,
pub(crate) marker: std::marker::PhantomData<M>,
}
impl<M: TryFrom<Message, Error = anyhow::Error>> TConnectionRx<M> {
/// Receives a message or an event.
pub(crate) fn recv(&self) -> anyhow::Result<EventOrMessage<M>> {
crossbeam_channel::select_biased! {
recv(self.lsp) -> msg => Ok(EventOrMessage::Msg(msg?.try_into()?)),
recv(self.event) -> event => Ok(event.map(EventOrMessage::Evt)?),
}
}
}
/// This is a helper enum to handle both events and messages.
pub(crate) enum EventOrMessage<M> {
Evt(Event),
Msg(M),
}
/// Connection is just a pair of channels of LSP messages.
pub struct Connection<M> {
/// The senders of the connection.
pub sender: TConnectionTx<M>,
/// The receivers of the connection.
pub receiver: TConnectionRx<M>,
}
impl<M: TryFrom<Message, Error = anyhow::Error>> From<Connection<Message>> for Connection<M> {
fn from(conn: Connection<Message>) -> Self {
Self {
sender: TConnectionTx {
event: conn.sender.event,
lsp: conn.sender.lsp,
marker: std::marker::PhantomData,
},
receiver: TConnectionRx {
event: conn.receiver.event,
lsp: conn.receiver.lsp,
marker: std::marker::PhantomData,
},
}
}
}
impl<M: TryFrom<Message, Error = anyhow::Error>> From<TConnectionTx<M>> for ConnectionTx {
fn from(conn: TConnectionTx<M>) -> Self {
Self {
event: conn.event,
lsp: conn.lsp,
marker: std::marker::PhantomData,
}
}
}
type AnyCaster<S> = Arc<dyn Fn(&mut dyn Any) -> &mut S + Send + Sync>;
/// A Lsp client with typed service `S`.
pub struct TypedLspClient<S> {
client: LspClient,
caster: AnyCaster<S>,
}
impl<S> TypedLspClient<S> {
/// Converts the client to an untyped client.
pub fn to_untyped(self) -> LspClient {
self.client
}
}
impl<S: 'static> TypedLspClient<S> {
/// Returns the untyped lsp client.
pub fn untyped(&self) -> &LspClient {
&self.client
}
/// Casts the service to another type.
pub fn cast<T: 'static>(&self, f: fn(&mut S) -> &mut T) -> TypedLspClient<T> {
let caster = self.caster.clone();
TypedLspClient {
client: self.client.clone(),
caster: Arc::new(move |s| f(caster(s))),
}
}
/// Sends a event to the client itself.
pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
let Some(sender) = self.sender.upgrade() else {
log::warn!("failed to send request: connection closed");
return;
};
let Err(res) = sender.event.send(Box::new(event)) else {
return;
};
log::warn!("failed to send event: {res:?}");
}
}
impl<S> Clone for TypedLspClient<S> {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
caster: self.caster.clone(),
}
}
}
impl<S> std::ops::Deref for TypedLspClient<S> {
type Target = LspClient;
fn deref(&self) -> &Self::Target {
&self.client
}
}
/// The root of the language server host.
/// Will close connection when dropped.
#[derive(Debug, Clone)]
pub struct LspClientRoot {
weak: LspClient,
_strong: Arc<ConnectionTx>,
}
impl LspClientRoot {
/// Creates a new language server host.
pub fn new<M: TryFrom<Message, Error = anyhow::Error> + GetMessageKind>(
handle: tokio::runtime::Handle,
sender: TConnectionTx<M>,
) -> Self {
let _strong = Arc::new(sender.into());
let weak = LspClient {
handle,
msg_kind: M::get_message_kind(),
sender: Arc::downgrade(&_strong),
req_queue: Arc::new(Mutex::new(ReqQueue::default())),
};
Self { weak, _strong }
}
/// Returns the weak reference to the language server host.
pub fn weak(&self) -> LspClient {
self.weak.clone()
}
}
type ReqHandler = Box<dyn for<'a> FnOnce(&'a mut dyn Any, LspOrDapResponse) + Send + Sync>;
type ReqQueue = req_queue::ReqQueue<(String, Instant), ReqHandler>;
/// The host for the language server, or known as the LSP client.
#[derive(Debug, Clone)]
pub struct LspClient {
/// The tokio handle.
pub handle: tokio::runtime::Handle,
pub(crate) msg_kind: MessageKind,
pub(crate) sender: Weak<ConnectionTx>,
pub(crate) req_queue: Arc<Mutex<ReqQueue>>,
}
impl LspClient {
/// Returns the untyped lsp client.
pub fn untyped(&self) -> &Self {
self
}
/// converts the client to a typed client.
pub fn to_typed<S: Any>(&self) -> TypedLspClient<S> {
TypedLspClient {
client: self.clone(),
caster: Arc::new(|s| s.downcast_mut().expect("invalid cast")),
}
}
/// Checks if there are pending requests.
pub fn has_pending_requests(&self) -> bool {
self.req_queue.lock().incoming.has_pending()
}
/// Prints states of the request queue and panics.
pub fn begin_panic(&self) {
self.req_queue.lock().begin_panic();
}
/// Sends a event to the server itself.
pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
let Some(sender) = self.sender.upgrade() else {
log::warn!("failed to send request: connection closed");
return;
};
if let Err(res) = sender.event.send(Box::new(event)) {
log::warn!("failed to send event: {res:?}");
}
}
/// Completes an server2client request in the request queue.
#[cfg(feature = "lsp")]
pub fn complete_lsp_request<S: Any>(&self, service: &mut S, response: lsp::Response) {
let mut req_queue = self.req_queue.lock();
let Some(handler) = req_queue.outgoing.complete(response.id.clone()) else {
log::warn!("received response for unknown request");
return;
};
drop(req_queue);
handler(service, response.into())
}
/// Completes an server2client request in the request queue.
#[cfg(feature = "dap")]
pub fn complete_dap_request<S: Any>(&self, service: &mut S, response: dap::Response) {
let mut req_queue = self.req_queue.lock();
let Some(handler) = req_queue
.outgoing
// todo: casting i64 to i32
.complete((response.request_seq as i32).into())
else {
log::warn!("received response for unknown request");
return;
};
drop(req_queue);
handler(service, response.into())
}
/// Registers an client2server request in the request queue.
pub fn register_request(&self, method: &str, id: &RequestId, received_at: Instant) {
let mut req_queue = self.req_queue.lock();
self.start_request(id, method);
req_queue
.incoming
.register(id.clone(), (method.to_owned(), received_at));
}
/// Responds a typed result to the client.
pub fn respond_result<T: Serialize>(&self, id: RequestId, result: LspResult<T>) {
let result = result.and_then(|t| serde_json::to_value(t).map_err(internal_error));
self.respond_any_result(id, result);
}
fn respond_any_result(&self, id: RequestId, result: LspResult<JsonValue>) {
let req_id = id.clone();
let msg: Message = match (self.msg_kind, result) {
#[cfg(feature = "lsp")]
(MessageKind::Lsp, Ok(resp)) => lsp::Response::new_ok(id, resp).into(),
#[cfg(feature = "lsp")]
(MessageKind::Lsp, Err(e)) => lsp::Response::new_err(id, e.code, e.message).into(),
#[cfg(feature = "dap")]
(MessageKind::Dap, Ok(resp)) => dap::Response::success(RequestId::dap(id), resp).into(),
#[cfg(feature = "dap")]
(MessageKind::Dap, Err(e)) => {
dap::Response::error(RequestId::dap(id), Some(e.message), None).into()
}
};
self.respond(req_id, msg);
}
/// Completes an client2server request in the request queue.
pub fn respond(&self, id: RequestId, response: Message) {
let mut req_queue = self.req_queue.lock();
let Some((method, received_at)) = req_queue.incoming.complete(&id) else {
return;
};
self.stop_request(&id, &method, received_at);
let Some(sender) = self.sender.upgrade() else {
log::warn!("failed to send response ({method}, {id}): connection closed");
return;
};
if let Err(res) = sender.lsp.send(response) {
log::warn!("failed to send response ({method}, {id}): {res:?}");
}
}
}
impl LspClient {
/// Schedules a request from the client.
pub fn schedule<T: Serialize + 'static>(
&self,
req_id: RequestId,
resp: SchedulableResponse<T>,
) -> ScheduledResult {
let resp = resp?;
use futures::future::MaybeDone::*;
match resp {
Done(output) => {
self.respond_result(req_id, output);
}
Future(fut) => {
let client = self.clone();
let req_id = req_id.clone();
self.handle.spawn(async move {
client.respond_result(req_id, fut.await);
});
}
Gone => {
log::warn!("response for request({req_id:?}) already taken");
}
};
Ok(Some(()))
}
/// Catch the early rejected requests.
pub(crate) fn schedule_tail(&self, req_id: RequestId, resp: ScheduledResult) {
match resp {
// Already responded
Ok(Some(())) => {}
// The requests that doesn't start.
_ => self.respond_result(req_id, resp),
}
}
}
impl LspClient {
fn start_request(&self, req_id: &RequestId, method: &str) {
log::info!("handling {method} - ({req_id})");
}
fn stop_request(&self, req_id: &RequestId, method: &str, received_at: Instant) {
let duration = received_at.elapsed();
log::info!("handled {method} - ({req_id}) in {duration:0.2?}");
}
fn start_notification(&self, method: &str) {
log::info!("notifying {method}");
}
fn stop_notification(&self, method: &str, received_at: Instant, result: LspResult<()>) {
let request_duration = received_at.elapsed();
if let Err(err) = result {
log::error!("notify {method} failed in {request_duration:0.2?}: {err:?}");
} else {
log::info!("notify {method} succeeded in {request_duration:0.2?}");
}
}
}
type AsyncHandler<S, T, R> = fn(srv: &mut S, args: T) -> SchedulableResponse<R>;
type RawHandler<S, T> = fn(srv: &mut S, req_id: RequestId, args: T) -> ScheduledResult;
type BoxPureHandler<S, T> = Box<dyn Fn(&mut S, T) -> LspResult<()>>;
@ -48,6 +428,13 @@ pub trait Initializer {
fn initialize(self, req: Self::I) -> (Self::S, AnySchedulableResponse);
}
/// The language server builder serving LSP.
#[cfg(feature = "lsp")]
pub type LspBuilder<Args> = LsBuilder<LspMessage, Args>;
/// The language server builder serving DAP.
#[cfg(feature = "dap")]
pub type DapBuilder<Args> = LsBuilder<DapMessage, Args>;
/// The builder pattern for the language server.
pub struct LsBuilder<M, Args: Initializer> {
/// The extra initialization arguments.
@ -67,13 +454,6 @@ pub struct LsBuilder<M, Args: Initializer> {
_marker: std::marker::PhantomData<M>,
}
/// The language server builder serving LSP.
#[cfg(feature = "lsp")]
pub type LspBuilder<Args> = LsBuilder<LspMessage, Args>;
/// The language server builder serving DAP.
#[cfg(feature = "dap")]
pub type DapBuilder<Args> = LsBuilder<DapMessage, Args>;
impl<M, Args: Initializer> LsBuilder<M, Args>
where
Args::S: 'static,
@ -250,20 +630,21 @@ impl<M, Args: Initializer> LsDriver<M, Args> {
}
}
fn from_json<T: serde::de::DeserializeOwned>(json: JsonValue) -> LspResult<T> {
serde_json::from_value(json).map_err(invalid_request)
/// A helper function to create a `LspResponseFuture`
pub fn just_ok<T, E>(res: T) -> Result<ResponseFuture<Result<T, E>>, E> {
Ok(futures::future::MaybeDone::Done(Ok(res)))
}
fn raw_to_boxed<S: 'static, T: 'static>(handler: RawHandler<S, T>) -> BoxHandler<S, T> {
Box::new(move |s, _client, req_id, req| handler(s, req_id, req))
/// A helper function to create a `LspResponseFuture`
pub fn just_result<T, E>(res: Result<T, E>) -> Result<ResponseFuture<Result<T, E>>, E> {
Ok(futures::future::MaybeDone::Done(res))
}
fn resp_err(code: ErrorCode, msg: impl fmt::Display) -> ResponseError {
ResponseError {
code: code as i32,
message: msg.to_string(),
data: None,
}
/// A helper function to create a `LspResponseFuture`
pub fn just_future<T, E>(
fut: impl std::future::Future<Output = Result<T, E>> + Send + 'static,
) -> Result<ResponseFuture<Result<T, E>>, E> {
Ok(futures::future::MaybeDone::Future(Box::pin(fut)))
}
/// Creates an invalid params error.
@ -290,3 +671,19 @@ pub fn method_not_found() -> ResponseError {
pub fn invalid_request(msg: impl fmt::Display) -> ResponseError {
resp_err(ErrorCode::InvalidRequest, msg)
}
fn from_json<T: serde::de::DeserializeOwned>(json: JsonValue) -> LspResult<T> {
serde_json::from_value(json).map_err(invalid_request)
}
fn raw_to_boxed<S: 'static, T: 'static>(handler: RawHandler<S, T>) -> BoxHandler<S, T> {
Box::new(move |s, _client, req_id, req| handler(s, req_id, req))
}
fn resp_err(code: ErrorCode, msg: impl fmt::Display) -> ResponseError {
ResponseError {
code: code as i32,
message: msg.to_string(),
data: None,
}
}

View file

@ -16,7 +16,7 @@ base64.workspace = true
bitvec.workspace = true
comemo.workspace = true
dashmap.workspace = true
ecow.workspace = true
ecow = { workspace = true, features = ["serde"] }
fxhash.workspace = true
log.workspace = true
path-clean.workspace = true

View file

@ -55,7 +55,7 @@ serde.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
strum.workspace = true
sync-ls.workspace = true
sync-ls = { workspace = true, features = ["lsp", "server"] }
tinymist-assets = { workspace = true }
tinymist-query.workspace = true
tinymist-std.workspace = true
@ -132,7 +132,7 @@ preview = [
"hyper-tungstenite",
]
dap = []
dap = ["sync-ls/dap"]
l10n = ["tinymist-assets/l10n"]

View file

@ -9,9 +9,9 @@ name = "tinymist-e2e-tests"
path = "e2e/main.rs"
[dev-dependencies]
sync-ls.workspace = true
sync-ls = { workspace = true, default-features = false, features = ["lsp"] }
lsp-types.workspace = true
serde.workspace = true
serde_json.workspace = true
reflexo.workspace = true
tinymist-std.workspace = true
insta.workspace = true

View file

@ -345,7 +345,7 @@ fn replay_log(tinymist_binary: &Path, root: &Path) -> String {
// let sorted_res
let sorted_res = sort_and_redact_value(res);
let c = serde_json::to_string_pretty(&sorted_res).unwrap();
let hash = reflexo::hash::hash128(&c);
let hash = tinymist_std::hash::hash128(&c);
std::fs::write(root.join("result_sorted.json"), c).unwrap();
format!("siphash128_13:{:x}", hash)