diff --git a/crates/nil/src/server.rs b/crates/nil/src/server.rs index 1650c1f..6c7a246 100644 --- a/crates/nil/src/server.rs +++ b/crates/nil/src/server.rs @@ -7,10 +7,10 @@ use async_lsp::{ClientSocket, ErrorCode, LanguageClient, ResponseError}; use ide::{Analysis, AnalysisHost, Cancelled, FlakeInfo, VfsPath}; use lsp_types::request::{self as req, Request}; use lsp_types::{ - notification as notif, ConfigurationItem, ConfigurationParams, Diagnostic, - DidChangeConfigurationParams, DidChangeTextDocumentParams, DidCloseTextDocumentParams, - DidOpenTextDocumentParams, InitializeParams, InitializeResult, InitializedParams, MessageType, - PublishDiagnosticsParams, ServerInfo, ShowMessageParams, Url, + notification as notif, ConfigurationItem, ConfigurationParams, DidChangeConfigurationParams, + DidChangeTextDocumentParams, DidCloseTextDocumentParams, DidOpenTextDocumentParams, + InitializeParams, InitializeResult, InitializedParams, MessageType, PublishDiagnosticsParams, + ServerInfo, ShowMessageParams, Url, }; use nix_interop::nixos_options::{self, NixosOptions}; use nix_interop::{flake_lock, FLAKE_FILE, FLAKE_LOCK_FILE}; @@ -24,7 +24,7 @@ use std::ops::ControlFlow; use std::panic::UnwindSafe; use std::sync::{Arc, Once, RwLock}; use std::{fmt, panic}; -use tokio::task::JoinHandle; +use tokio::task::{AbortHandle, JoinHandle}; use tokio::{fs, task}; const LSP_SERVER_NAME: &str = "nil"; @@ -33,16 +33,6 @@ const NIXOS_OPTIONS_FLAKE_INPUT: &str = "nixpkgs"; type NotifyResult = ControlFlow>; -type Task = Box Event + Send + 'static>; - -enum Event { - Diagnostics { - uri: Url, - version: u64, - diagnostics: Vec, - }, -} - struct UpdateConfigEvent(serde_json::Value); struct SetFlakeInfoEvent(Option); struct SetNixosOptionsEvent(NixosOptions); @@ -54,8 +44,6 @@ pub struct Server { vfs: Arc>, opened_files: HashMap, config: Arc, - /// Monotonic version counter for diagnostics calculation ordering. - version_counter: u64, /// Tried to load flake? /// This is used to reload flake only once after the configuration is first loaded. tried_flake_load: bool, @@ -69,8 +57,7 @@ pub struct Server { #[derive(Debug, Default)] struct FileData { - diagnostics_version: u64, - diagnostics: Vec, + diagnostics_task: Option, } impl Server { @@ -111,9 +98,7 @@ impl Server { //// Events //// .event(Self::on_set_flake_info) .event(Self::on_set_nixos_options) - .event(Self::on_update_config) - // TODO: Use individual event types instead. - .event(Self::on_event); + .event(Self::on_update_config); router } @@ -124,7 +109,6 @@ impl Server { opened_files: HashMap::default(), // Will be set during initialization. config: Arc::new(Config::new("/non-existing-path".into())), - version_counter: 0, tried_flake_load: false, load_flake_workspace_fut: None, @@ -133,15 +117,6 @@ impl Server { } } - // TODO: Refactor blocking tasks into async tasks as possible. - fn spawn_task(&self, task: Task) { - let client = self.client.clone(); - task::spawn(async move { - let ret: Event = task::spawn_blocking(task).await.expect("Task panicked"); - let _: Result<_, _> = client.emit(ret); - }); - } - fn on_initialize( &mut self, params: InitializeParams, @@ -189,23 +164,36 @@ impl Server { return ControlFlow::Continue(()); } - let uri = ¶ms.text_document.uri; - self.set_vfs_file_content(uri, params.text_document.text); + let uri = params.text_document.uri; self.opened_files.insert(uri.clone(), FileData::default()); + self.set_vfs_file_content(&uri, params.text_document.text); + + self.spawn_update_diagnostics(uri); + ControlFlow::Continue(()) } fn on_did_close(&mut self, params: DidCloseTextDocumentParams) -> NotifyResult { // N.B. Don't clear text here. self.opened_files.remove(¶ms.text_document.uri); + + // Clear diagnostics for closed files. + let _: Result<_, _> = + self.client + .notify::(PublishDiagnosticsParams { + uri: params.text_document.uri, + diagnostics: Vec::new(), + version: None, + }); + ControlFlow::Continue(()) } fn on_did_change(&mut self, params: DidChangeTextDocumentParams) -> NotifyResult { let mut vfs = self.vfs.write().unwrap(); - let uri = ¶ms.text_document.uri; + let uri = params.text_document.uri; // Ignore files not maintained in Vfs. - let Ok(file) = vfs.file_for_uri(uri) else { return ControlFlow::Continue(()) }; + let Ok(file) = vfs.file_for_uri(&uri) else { return ControlFlow::Continue(()) }; for change in params.content_changes { let ret = (|| { let del_range = match change.range { @@ -222,13 +210,17 @@ impl Server { ); // Clear file states to minimize pollution of the broken state. - self.opened_files.remove(uri); + self.opened_files.remove(&uri); // TODO: Remove the file from Vfs. } } drop(vfs); + // FIXME: This blocks. self.apply_vfs_change(); + + self.spawn_update_diagnostics(uri); + ControlFlow::Continue(()) } @@ -242,37 +234,6 @@ impl Server { ControlFlow::Continue(()) } - fn on_event(&mut self, event: Event) -> NotifyResult { - match event { - Event::Diagnostics { - uri, - version, - diagnostics, - } => match self.opened_files.get_mut(&uri) { - Some(f) if f.diagnostics_version < version => { - f.diagnostics_version = version; - f.diagnostics = diagnostics.clone(); - tracing::trace!( - "Push {} diagnostics of {uri}, version {version}", - diagnostics.len(), - ); - task::spawn({ - let mut client = self.client.clone(); - async move { - client.publish_diagnostics(PublishDiagnosticsParams { - uri, - diagnostics, - version: None, - }) - } - }); - } - _ => tracing::debug!("Ignore raced diagnostics of {uri}, version {version}"), - }, - } - ControlFlow::Continue(()) - } - /// Spawn a task to (re)load the flake workspace via `flake.{nix,lock}`, including flake info, /// NixOS options and outputs (TODO). fn spawn_load_flake_workspace(&mut self) { @@ -477,10 +438,11 @@ impl Server { // Refresh all diagnostics since the filter may be changed. if updated_diagnostics { - let version = self.next_version(); - for uri in self.opened_files.keys() { - tracing::trace!("Recalculate diagnostics of {uri}, version {version}"); - self.update_diagnostics(uri.clone(), version); + // Pre-collect to avoid mutability violation. + let uris = self.opened_files.keys().cloned().collect::>(); + for uri in uris { + tracing::trace!("Recalculate diagnostics of {uri}"); + self.spawn_update_diagnostics(uri.clone()); } } @@ -494,31 +456,43 @@ impl Server { ControlFlow::Continue(()) } - fn update_diagnostics(&self, uri: Url, version: u64) { + fn spawn_update_diagnostics(&mut self, uri: Url) { let snap = self.snapshot(); - let task = move || { - // Return empty diagnostics for ignored files. - let diagnostics = (!snap.config.diagnostics_excluded_files.contains(&uri)) - .then(|| { - with_catch_unwind("diagnostics", || handler::diagnostics(snap, &uri)) - .unwrap_or_else(|err| { - tracing::error!("Failed to calculate diagnostics: {err}"); - Vec::new() - }) - }) - .unwrap_or_default(); - Event::Diagnostics { - uri, - version, - diagnostics, + let task = task::spawn_blocking({ + let uri = uri.clone(); + move || { + // Return empty diagnostics for ignored files. + (!snap.config.diagnostics_excluded_files.contains(&uri)) + .then(|| { + with_catch_unwind("diagnostics", || handler::diagnostics(snap, &uri)) + .unwrap_or_else(|err| { + tracing::error!("Failed to calculate diagnostics: {err}"); + Vec::new() + }) + }) + .unwrap_or_default() } - }; - self.spawn_task(Box::new(task)); - } + }); - fn next_version(&mut self) -> u64 { - self.version_counter += 1; - self.version_counter + // Can this really fail? + let Some(f) = self.opened_files.get_mut(&uri) else { task.abort(); return; }; + if let Some(prev_task) = f.diagnostics_task.replace(task.abort_handle()) { + prev_task.abort(); + } + + let mut client = self.client.clone(); + task::spawn(async move { + if let Ok(diagnostics) = task.await { + tracing::debug!("Publish {} diagnostics for {}", diagnostics.len(), uri); + let _: Result<_, _> = client.publish_diagnostics(PublishDiagnosticsParams { + uri, + diagnostics, + version: None, + }); + } else { + // Task cancelled, then there must be another task queued already. Do nothing. + } + }); } fn snapshot(&self) -> StateSnapshot { @@ -537,33 +511,11 @@ impl Server { fn apply_vfs_change(&mut self) { let changes = self.vfs.write().unwrap().take_change(); - tracing::trace!("Change: {:?}", changes); - let file_changes = changes.file_changes.clone(); + tracing::trace!("Apply VFS changes: {:?}", changes); // N.B. This acquires the internal write lock. // Must be called without holding the lock of `vfs`. self.host.apply_change(changes); - - let version = self.next_version(); - let vfs = self.vfs.read().unwrap(); - for (file, text) in file_changes { - let uri = vfs.uri_for_file(file); - if !self.opened_files.contains_key(&uri) { - continue; - } - - // FIXME: Removed or closed files are indistinguishable from empty files. - if !text.is_empty() { - self.update_diagnostics(uri, version); - } else { - // Clear diagnostics. - let _: Result<_, _> = self.client.emit(Event::Diagnostics { - uri, - version, - diagnostics: Vec::new(), - }); - } - } } }