Merge pull request #7 from Myriad-Dreamin/shutdown-sender

feat: correctly drop sender after the server shutting down
This commit is contained in:
Myriad-Dreamin 2024-03-11 19:57:10 +08:00 committed by GitHub
commit 9823dfa4f5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 42 additions and 11 deletions

View file

@ -50,7 +50,7 @@ use lsp_types::notification::{Notification as NotificationTrait, PublishDiagnost
use lsp_types::request::{RegisterCapability, UnregisterCapability, WorkspaceConfiguration};
use lsp_types::*;
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use paste::paste;
use query::MemoryFileMeta;
use serde_json::{Map, Value as JsonValue};
@ -79,13 +79,13 @@ type ReqQueue = lsp_server::ReqQueue<(String, Instant), ReqHandler>;
/// The host for the language server, or known as the LSP client.
#[derive(Debug, Clone)]
pub struct LspHost {
sender: crossbeam_channel::Sender<Message>,
sender: Arc<RwLock<Option<crossbeam_channel::Sender<Message>>>>,
req_queue: Arc<Mutex<ReqQueue>>,
}
impl LspHost {
/// Creates a new language server host.
pub fn new(sender: crossbeam_channel::Sender<Message>) -> Self {
pub fn new(sender: Arc<RwLock<Option<crossbeam_channel::Sender<Message>>>>) -> Self {
Self {
sender,
req_queue: Arc::new(Mutex::new(ReqQueue::default())),
@ -98,10 +98,15 @@ impl LspHost {
handler: ReqHandler,
) {
let mut req_queue = self.req_queue.lock();
let sender = self.sender.read();
let Some(sender) = sender.as_ref() else {
warn!("closed connection, failed to send request");
return;
};
let request = req_queue
.outgoing
.register(R::METHOD.to_owned(), params, handler);
let Err(res) = self.sender.send(request.into()) else {
let Err(res) = sender.send(request.into()) else {
return;
};
warn!("failed to send request: {res:?}");
@ -123,7 +128,13 @@ impl LspHost {
pub fn send_notification<N: lsp_types::notification::Notification>(&self, params: N::Params) {
let not = lsp_server::Notification::new(N::METHOD.to_owned(), params);
let Err(res) = self.sender.send(not.into()) else {
let sender = self.sender.read();
let Some(sender) = sender.as_ref() else {
warn!("closed connection, failed to send request");
return;
};
let Err(res) = sender.send(not.into()) else {
return;
};
warn!("failed to send notification: {res:?}");
@ -143,6 +154,12 @@ impl LspHost {
pub fn respond(&self, response: lsp_server::Response) {
let mut req_queue = self.req_queue.lock();
if let Some((method, start)) = req_queue.incoming.complete(response.id.clone()) {
let sender = self.sender.read();
let Some(sender) = sender.as_ref() else {
warn!("closed connection, failed to send request");
return;
};
// if let Some(err) = &response.error {
// if err.message.starts_with("server panicked") {
// self.poke_rust_analyzer_developer(format!("{}, check the log",
@ -154,7 +171,7 @@ impl LspHost {
"handled {} - ({}) in {:0.2?}",
method, response.id, duration
);
let Err(res) = self.sender.send(response.into()) else {
let Err(res) = sender.send(response.into()) else {
return;
};
warn!("failed to send response: {res:?}");

View file

@ -2,11 +2,15 @@
mod args;
use std::io::{self, BufRead, Read, Write};
use std::{
io::{self, BufRead, Read, Write},
sync::Arc,
};
use clap::Parser;
use log::{info, trace, warn};
use lsp_types::{InitializeParams, InitializedParams};
use parking_lot::RwLock;
use serde::de::DeserializeOwned;
use tinymist::{init::Init, transport::io_transport, LspHost};
use typst_ts_core::config::CompileOpts;
@ -99,7 +103,8 @@ async fn main() -> anyhow::Result<()> {
};
let request_received = std::time::Instant::now();
trace!("InitializeParams: {initialize_params}");
let host = LspHost::new(connection.sender);
let sender = Arc::new(RwLock::new(Some(connection.sender)));
let host = LspHost::new(sender.clone());
let req = lsp_server::Request::new(initialize_id, "initialize".to_owned(), initialize_params);
host.register_request(&req, request_received);
@ -168,6 +173,10 @@ async fn main() -> anyhow::Result<()> {
service.main_loop(connection.receiver)?;
// Drop it on the main thread
{
sender.write().take();
}
io_threads.join()?;
info!("server did shut down");
Ok(())

View file

@ -3,7 +3,7 @@ use std::{
thread,
};
use log::trace;
use log::{info, trace};
use crossbeam_channel::{bounded, Receiver, Sender};
@ -29,9 +29,12 @@ pub fn io_transport<I: BufRead, O: Write>(
let (writer_sender, writer_receiver) = bounded::<Message>(0);
let writer = thread::spawn(move || {
let mut out = out();
writer_receiver
let res = writer_receiver
.into_iter()
.try_for_each(|it| it.write(&mut out))
.try_for_each(|it| it.write(&mut out));
info!("writer thread finished");
res
});
let (reader_sender, reader_receiver) = bounded::<Message>(0);
let reader = thread::spawn(move || {
@ -48,6 +51,8 @@ pub fn io_transport<I: BufRead, O: Write>(
break;
}
}
info!("reader thread finished");
Ok(())
});
let threads = IoThreads { reader, writer };