diff --git a/Cargo.lock b/Cargo.lock index cd2f631f31..d87306562e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3060,6 +3060,7 @@ dependencies = [ "roc_derive_key", "roc_error_macros", "roc_exhaustive", + "roc_load", "roc_module", "roc_packaging", "roc_parse", @@ -3072,6 +3073,7 @@ dependencies = [ "roc_types", "roc_unify", "tempfile", + "test_solve_helpers", ] [[package]] @@ -3094,42 +3096,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "roc_solve_tests" -version = "0.0.1" -dependencies = [ - "arrayvec 0.7.4", - "bumpalo", - "indoc", - "insta", - "lazy_static", - "libtest-mimic", - "pretty_assertions", - "regex", - "roc_builtins", - "roc_can", - "roc_checkmate", - "roc_collections", - "roc_debug_flags", - "roc_derive", - "roc_derive_key", - "roc_error_macros", - "roc_exhaustive", - "roc_load_internal", - "roc_module", - "roc_packaging", - "roc_parse", - "roc_problem", - "roc_region", - "roc_reporting", - "roc_solve_problem", - "roc_solve_schema", - "roc_target", - "roc_types", - "roc_unify", - "tempfile", -] - [[package]] name = "roc_std" version = "0.0.1" diff --git a/crates/lang_srv/src/analysis.rs b/crates/lang_srv/src/analysis.rs index 072bf1673a..69f26ea26b 100644 --- a/crates/lang_srv/src/analysis.rs +++ b/crates/lang_srv/src/analysis.rs @@ -309,6 +309,8 @@ pub struct DocInfo { pub version: i32, } impl DocInfo { + #[cfg(debug_assertions)] + #[allow(unused)] fn debug_log_prefix(&self, offset: u32) { eprintln!("prefix source{:?}", self.source); diff --git a/crates/lang_srv/src/registry.rs b/crates/lang_srv/src/registry.rs index 20b0d6795d..07f16b1c3f 100644 --- a/crates/lang_srv/src/registry.rs +++ b/crates/lang_srv/src/registry.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, sync::Arc}; -use tokio::sync::{Mutex, MutexGuard}; +use tokio::sync::{Mutex, MutexGuard, RwLock, RwLockWriteGuard}; use tower_lsp::lsp_types::{ CompletionResponse, Diagnostic, GotoDefinitionResponse, Hover, Position, SemanticTokensResult, TextEdit, Url, @@ -10,51 +10,34 @@ use crate::analysis::{AnalyzedDocument, DocInfo}; #[derive(Debug)] pub(crate) struct LatestDocument { - info: DocInfo, - //We can hold this mutex locked during updating while the latest and doc_info are out of sync - //the lock should be aquired and immediately freed by and task looking to get a copy of info - //At the top level we will need to store our lock - latest: tokio::sync::watch::Receiver>>, - latest_sender: tokio::sync::watch::Sender>>, + pub info: DocInfo, + analyzed: tokio::sync::RwLock>>, } impl LatestDocument { pub(crate) async fn get_latest(&self) -> Arc { - let mut my_reciever = self.latest.clone(); - - let a = my_reciever.wait_for(|x| x.is_some()).await.unwrap(); - match a.as_ref() { - Some(latest) => latest.clone(), - None => todo!(), - } + self.analyzed.read().await.as_ref().unwrap().clone() } - pub(crate) fn set_latest(&self, latest: Arc) { - self.latest_sender.send(Some(latest)).unwrap() - } - pub(crate) fn waiting_for_doc(&self) -> bool { - self.latest.borrow().is_none() + pub(crate) fn get_lock(&self) -> RwLockWriteGuard>> { + self.analyzed.blocking_write() } pub(crate) fn new(doc_info: DocInfo) -> LatestDocument { - let chan = tokio::sync::watch::channel(None); + let val = RwLock::new(None); LatestDocument { info: doc_info, - latest_sender: chan.0, - latest: chan.1, + analyzed: val, } } - pub(crate) fn new_initialised(doc: Arc) -> LatestDocument { - let info = doc.doc_info.clone(); - let chan = tokio::sync::watch::channel(Some(doc)); + pub(crate) fn new_initialised(analyzed: Arc) -> LatestDocument { LatestDocument { - info, - latest_sender: chan.0, - latest: chan.1, + info: analyzed.doc_info.clone(), + analyzed: RwLock::new(Some(analyzed)), } } } #[derive(Debug)] pub(crate) struct DocumentPair { - latest_document: LatestDocument, + latest_document: Arc, last_good_document: Arc, } @@ -64,26 +47,28 @@ pub(crate) struct Registry { } impl Registry { + pub async fn get_latest_version(&self, url: &Url) -> Option { + self.documents + .lock() + .await + .get(&url) + .map(|x| x.latest_document.info.version) + } fn update_document<'a>( documents: &mut MutexGuard<'a, HashMap>, document: AnalyzedDocument, ) { let url = document.url().clone(); let document = Arc::new(document); - let latest_doc = LatestDocument::new_initialised(document.clone()); + let latest_doc = Arc::new(LatestDocument::new_initialised(document.clone())); match documents.get_mut(&url) { Some(old_doc) => { - //This is a special case where we know we should - if old_doc.latest_document.waiting_for_doc() { - old_doc.latest_document.set_latest(document.clone()); - } if document.type_checked() { *old_doc = DocumentPair { latest_document: latest_doc, last_good_document: document, }; } else { - //TODO this seems ugly but for now i'll let it slide. shoudl be immutable *old_doc = DocumentPair { latest_document: latest_doc, last_good_document: old_doc.last_good_document.clone(), @@ -102,7 +87,12 @@ impl Registry { } } - pub async fn apply_changes(&self, analysed_docs: Vec) -> () { + pub async fn apply_changes<'a>( + &self, + analysed_docs: Vec, + mut partial_writer: RwLockWriteGuard<'a, Option>>, + updating_url: Url, + ) { let mut documents = self.documents.lock().await; eprintln!( "finised doc analysis updating docs {:?}", @@ -111,13 +101,21 @@ impl Registry { .map(|a| a.doc_info.url.to_string()) .collect::>() ); + let updates = analysed_docs.into_iter().filter_map(|a| { + if a.doc_info.url == updating_url { + *partial_writer = Some(Arc::new(a)); + None + } else { + Some(a) + } + }); - for document in analysed_docs { + for document in updates { Registry::update_document(&mut documents, document); } } - pub async fn apply_doc_info_changes(&self, url: Url, partial: DocInfo) { + pub async fn apply_doc_info_changes(&self, url: Url, partial: Arc) { let mut lock = self.documents.lock().await; let doc = lock.get_mut(&url); match doc { @@ -125,10 +123,10 @@ impl Registry { eprintln!( "set the docInfo for {:?} to version:{:?}", url.as_str(), - partial.version + partial.info.version ); - a.latest_document = LatestDocument::new(partial); + a.latest_document = partial; } None => (), diff --git a/crates/lang_srv/src/server.rs b/crates/lang_srv/src/server.rs index c2a5d334bf..3d98b14e97 100644 --- a/crates/lang_srv/src/server.rs +++ b/crates/lang_srv/src/server.rs @@ -1,14 +1,15 @@ use analysis::HIGHLIGHT_TOKENS_LEGEND; use registry::Registry; use std::future::Future; +use tokio::sync::RwLock; use std::sync::Arc; -use std::time::Duration; use tower_lsp::jsonrpc::Result; use tower_lsp::lsp_types::*; use tower_lsp::{Client, LanguageServer, LspService, Server}; use crate::analysis::global_analysis; +use crate::registry::LatestDocument; mod analysis; mod convert; @@ -21,34 +22,21 @@ struct RocLs { #[derive(Debug)] struct Inner { client: Client, - registry: Registry, - change_cancel_handle: parking_lot::Mutex<( - tokio::sync::watch::Sender, - tokio::sync::watch::Receiver, - )>, - documents_updating: tokio::sync::Semaphore, + registry: RwLock, } impl std::panic::RefUnwindSafe for RocLs {} -const SEMLIMIT: u32 = 20; impl RocLs { pub fn new(client: Client) -> Self { Self { inner: Arc::new(Inner { client, - registry: Registry::default(), - change_cancel_handle: parking_lot::Mutex::new(tokio::sync::watch::channel(0)), - ///Used for identifying if the intial stage of a document update is complete - documents_updating: tokio::sync::Semaphore::new(SEMLIMIT as usize), + registry: RwLock::new(Registry::default()), }), } } ///Wait for all the semaphores associated with an in-progress document_info update to be released - async fn wait_for_changes(&self) { - //We don't actually need to return the permit, all we want is to have it momenterily which guarentees that any updates started before the call to this are complete - drop(self.inner.documents_updating.acquire_many(SEMLIMIT).await.expect("semaphore permits could not be aquired for the document updating process. This is unrecoverable")); - } pub fn capabilities() -> ServerCapabilities { let text_document_sync = TextDocumentSyncCapability::Options( // TODO: later on make this incremental @@ -104,89 +92,72 @@ impl RocLs { /// Records a document content change. async fn change(&self, fi: Url, text: String, version: i32) { eprintln!("starting change"); - - let (updating_doc_info, mut new_change) = self.change_preamble(version).await; - tokio::time::sleep(Duration::from_millis(20)).await; - if new_change.has_changed().unwrap() { - eprintln!("newer task started almost immediately"); - return; - } + let registry_write_lock = self.inner.registry.write().await; eprintln!("finished checking for cancellation"); let (results, partial) = global_analysis(fi.clone(), text, version); + let partial_document = Arc::new(LatestDocument::new(partial.clone())); + let partial_doc_write_lock = partial_document.get_lock(); - self.inner - .registry - .apply_doc_info_changes(fi.clone(), partial) + registry_write_lock + .apply_doc_info_changes(fi.clone(), partial_document.clone()) .await; - drop(updating_doc_info); + //Now that we've got our new partial document written and we hold the exclusive write_handle to its analysis we can allow other tasks to access the registry and the doc_info inside this partial document + drop(registry_write_lock); eprintln!("finished updating docinfo, starting analysis ",); let inner_ref = self.inner.clone(); - let handle = async { + let updating_result = async { let results = match tokio::task::spawn_blocking(results).await { Err(e) => return Err(format!("document analysis failed. reason:{:?}", e)), Ok(a) => a, }; + let latest_version = inner_ref + .registry + .read() + .await + .get_latest_version(&fi) + .await; - inner_ref.registry().apply_changes(results).await; + //if this version is not the latest another change must have come in and this analysis is useless + //if there is no older version we can just proceed with the update + if let Some(latest_version) = latest_version { + return Err(format!( + "version {0} doesn't match latest: {1} discarding analysis ", + version, latest_version + )); + } + + inner_ref + .registry + .write() + .await + .apply_changes(results, partial_doc_write_lock, fi.clone()) + .await; Ok(()) - }; - - eprintln!("waiting on analysis or cancel"); + } + .await; //The analysis task can be cancelled by another change coming in which will update the watched variable - let finished = tokio::select! { - a=handle=>a, - _=new_change.changed()=>Err("Cancellation triggered from change".to_string()) - }; - match finished { - Err(e) => { - eprintln!("cancelled change. Reason:{:?}", e); - return; - } - Ok(_) => (), + if let Err(e) = updating_result { + eprintln!("cancelled change. Reason:{:?}", e); + return; } eprintln!("applied_change getting and returning diagnostics"); - let diagnostics = self.inner.registry().diagnostics(&fi).await; + let diagnostics = self.inner.registry().await.diagnostics(&fi).await; self.inner .client .publish_diagnostics(fi, diagnostics, Some(version)) .await; } - /// Does some bookkeeping before the change. - ///aquires a semphore permit to let future functions know that there is a change ongoing - ///sends a message to the watched change_handle to cancel other changes - ///returns a copy of the watcher to use in our own cancellation - async fn change_preamble( - &self, - version: i32, - ) -> ( - tokio::sync::SemaphorePermit<'_>, - tokio::sync::watch::Receiver, - ) { - let updating_doc_info = self.inner.documents_updating.acquire().await.unwrap(); - let new_change = { - let cancel_handle = self.inner.change_cancel_handle.lock(); - //This will cancel any other onging changes in favour of this one - cancel_handle - .0 - .send(version) - .expect("change_handle disposed, this shouldn't happen"); - let mut watched = cancel_handle.1.clone(); - watched.borrow_and_update(); - watched - }; - (updating_doc_info, new_change) - } } impl Inner { - fn registry(&self) -> &Registry { - &self.registry + fn registry(&self) -> impl Future> { + self.registry.read() } async fn close(&self, _fi: Url) { @@ -250,6 +221,7 @@ impl LanguageServer for RocLs { panic_wrapper_async(|| async { self.inner .registry() + .await .hover(&text_document.uri, position) .await }) @@ -273,6 +245,7 @@ impl LanguageServer for RocLs { panic_wrapper_async(|| async { self.inner .registry() + .await .goto_definition(&text_document.uri, position) .await }) @@ -286,7 +259,8 @@ impl LanguageServer for RocLs { work_done_progress_params: _, } = params; - panic_wrapper_async(|| async { self.inner.registry().formatting(&text_document.uri) }).await + panic_wrapper_async(|| async { self.inner.registry().await.formatting(&text_document.uri) }) + .await } async fn semantic_tokens_full( @@ -299,24 +273,25 @@ impl LanguageServer for RocLs { partial_result_params: _, } = params; - panic_wrapper_async(|| async { self.inner.registry().semantic_tokens(&text_document.uri) }) - .await + panic_wrapper_async(|| async { + self.inner + .registry() + .await + .semantic_tokens(&text_document.uri) + }) + .await } async fn completion(&self, params: CompletionParams) -> Result> { let doc = params.text_document_position; eprintln!("starting completion"); - eprintln!( - "permits::{:?} ", - self.inner.documents_updating.available_permits() - ); //We need to wait untill any changes that were in progress when we requested completion have applied - self.wait_for_changes().await; eprintln!("waited for doc update to get sorted "); let res = panic_wrapper_async(|| async { self.inner .registry() + .await .completion_items(&doc.text_document.uri, doc.position) .await }) @@ -327,12 +302,6 @@ impl LanguageServer for RocLs { } } -fn panic_wrapper(f: impl FnOnce() -> Option + std::panic::UnwindSafe) -> Result> { - match std::panic::catch_unwind(f) { - Ok(r) => Ok(r), - Err(_) => Err(tower_lsp::jsonrpc::Error::internal_error()), - } -} async fn panic_wrapper_async( f: impl FnOnce() -> Fut + std::panic::UnwindSafe, ) -> Result>