Switched to simpler version of locking mechanisim

Signed-off-by: faldor20 <eli.jambu@yahoo.com>
This commit is contained in:
Eli Dowling 2023-12-18 10:34:54 +10:00 committed by faldor20
parent a2c8acd9ac
commit b125cc22aa
No known key found for this signature in database
GPG key ID: F2216079B890CD57
4 changed files with 95 additions and 160 deletions

View file

@ -1,14 +1,15 @@
use analysis::HIGHLIGHT_TOKENS_LEGEND;
use registry::Registry;
use std::future::Future;
use tokio::sync::RwLock;
use std::sync::Arc;
use std::time::Duration;
use tower_lsp::jsonrpc::Result;
use tower_lsp::lsp_types::*;
use tower_lsp::{Client, LanguageServer, LspService, Server};
use crate::analysis::global_analysis;
use crate::registry::LatestDocument;
mod analysis;
mod convert;
@ -21,34 +22,21 @@ struct RocLs {
#[derive(Debug)]
struct Inner {
client: Client,
registry: Registry,
change_cancel_handle: parking_lot::Mutex<(
tokio::sync::watch::Sender<i32>,
tokio::sync::watch::Receiver<i32>,
)>,
documents_updating: tokio::sync::Semaphore,
registry: RwLock<Registry>,
}
impl std::panic::RefUnwindSafe for RocLs {}
const SEMLIMIT: u32 = 20;
impl RocLs {
pub fn new(client: Client) -> Self {
Self {
inner: Arc::new(Inner {
client,
registry: Registry::default(),
change_cancel_handle: parking_lot::Mutex::new(tokio::sync::watch::channel(0)),
///Used for identifying if the intial stage of a document update is complete
documents_updating: tokio::sync::Semaphore::new(SEMLIMIT as usize),
registry: RwLock::new(Registry::default()),
}),
}
}
///Wait for all the semaphores associated with an in-progress document_info update to be released
async fn wait_for_changes(&self) {
//We don't actually need to return the permit, all we want is to have it momenterily which guarentees that any updates started before the call to this are complete
drop(self.inner.documents_updating.acquire_many(SEMLIMIT).await.expect("semaphore permits could not be aquired for the document updating process. This is unrecoverable"));
}
pub fn capabilities() -> ServerCapabilities {
let text_document_sync = TextDocumentSyncCapability::Options(
// TODO: later on make this incremental
@ -104,89 +92,72 @@ impl RocLs {
/// Records a document content change.
async fn change(&self, fi: Url, text: String, version: i32) {
eprintln!("starting change");
let (updating_doc_info, mut new_change) = self.change_preamble(version).await;
tokio::time::sleep(Duration::from_millis(20)).await;
if new_change.has_changed().unwrap() {
eprintln!("newer task started almost immediately");
return;
}
let registry_write_lock = self.inner.registry.write().await;
eprintln!("finished checking for cancellation");
let (results, partial) = global_analysis(fi.clone(), text, version);
let partial_document = Arc::new(LatestDocument::new(partial.clone()));
let partial_doc_write_lock = partial_document.get_lock();
self.inner
.registry
.apply_doc_info_changes(fi.clone(), partial)
registry_write_lock
.apply_doc_info_changes(fi.clone(), partial_document.clone())
.await;
drop(updating_doc_info);
//Now that we've got our new partial document written and we hold the exclusive write_handle to its analysis we can allow other tasks to access the registry and the doc_info inside this partial document
drop(registry_write_lock);
eprintln!("finished updating docinfo, starting analysis ",);
let inner_ref = self.inner.clone();
let handle = async {
let updating_result = async {
let results = match tokio::task::spawn_blocking(results).await {
Err(e) => return Err(format!("document analysis failed. reason:{:?}", e)),
Ok(a) => a,
};
let latest_version = inner_ref
.registry
.read()
.await
.get_latest_version(&fi)
.await;
inner_ref.registry().apply_changes(results).await;
//if this version is not the latest another change must have come in and this analysis is useless
//if there is no older version we can just proceed with the update
if let Some(latest_version) = latest_version {
return Err(format!(
"version {0} doesn't match latest: {1} discarding analysis ",
version, latest_version
));
}
inner_ref
.registry
.write()
.await
.apply_changes(results, partial_doc_write_lock, fi.clone())
.await;
Ok(())
};
eprintln!("waiting on analysis or cancel");
}
.await;
//The analysis task can be cancelled by another change coming in which will update the watched variable
let finished = tokio::select! {
a=handle=>a,
_=new_change.changed()=>Err("Cancellation triggered from change".to_string())
};
match finished {
Err(e) => {
eprintln!("cancelled change. Reason:{:?}", e);
return;
}
Ok(_) => (),
if let Err(e) = updating_result {
eprintln!("cancelled change. Reason:{:?}", e);
return;
}
eprintln!("applied_change getting and returning diagnostics");
let diagnostics = self.inner.registry().diagnostics(&fi).await;
let diagnostics = self.inner.registry().await.diagnostics(&fi).await;
self.inner
.client
.publish_diagnostics(fi, diagnostics, Some(version))
.await;
}
/// Does some bookkeeping before the change.
///aquires a semphore permit to let future functions know that there is a change ongoing
///sends a message to the watched change_handle to cancel other changes
///returns a copy of the watcher to use in our own cancellation
async fn change_preamble(
&self,
version: i32,
) -> (
tokio::sync::SemaphorePermit<'_>,
tokio::sync::watch::Receiver<i32>,
) {
let updating_doc_info = self.inner.documents_updating.acquire().await.unwrap();
let new_change = {
let cancel_handle = self.inner.change_cancel_handle.lock();
//This will cancel any other onging changes in favour of this one
cancel_handle
.0
.send(version)
.expect("change_handle disposed, this shouldn't happen");
let mut watched = cancel_handle.1.clone();
watched.borrow_and_update();
watched
};
(updating_doc_info, new_change)
}
}
impl Inner {
fn registry(&self) -> &Registry {
&self.registry
fn registry(&self) -> impl Future<Output = tokio::sync::RwLockReadGuard<Registry>> {
self.registry.read()
}
async fn close(&self, _fi: Url) {
@ -250,6 +221,7 @@ impl LanguageServer for RocLs {
panic_wrapper_async(|| async {
self.inner
.registry()
.await
.hover(&text_document.uri, position)
.await
})
@ -273,6 +245,7 @@ impl LanguageServer for RocLs {
panic_wrapper_async(|| async {
self.inner
.registry()
.await
.goto_definition(&text_document.uri, position)
.await
})
@ -286,7 +259,8 @@ impl LanguageServer for RocLs {
work_done_progress_params: _,
} = params;
panic_wrapper_async(|| async { self.inner.registry().formatting(&text_document.uri) }).await
panic_wrapper_async(|| async { self.inner.registry().await.formatting(&text_document.uri) })
.await
}
async fn semantic_tokens_full(
@ -299,24 +273,25 @@ impl LanguageServer for RocLs {
partial_result_params: _,
} = params;
panic_wrapper_async(|| async { self.inner.registry().semantic_tokens(&text_document.uri) })
.await
panic_wrapper_async(|| async {
self.inner
.registry()
.await
.semantic_tokens(&text_document.uri)
})
.await
}
async fn completion(&self, params: CompletionParams) -> Result<Option<CompletionResponse>> {
let doc = params.text_document_position;
eprintln!("starting completion");
eprintln!(
"permits::{:?} ",
self.inner.documents_updating.available_permits()
);
//We need to wait untill any changes that were in progress when we requested completion have applied
self.wait_for_changes().await;
eprintln!("waited for doc update to get sorted ");
let res = panic_wrapper_async(|| async {
self.inner
.registry()
.await
.completion_items(&doc.text_document.uri, doc.position)
.await
})
@ -327,12 +302,6 @@ impl LanguageServer for RocLs {
}
}
fn panic_wrapper<T>(f: impl FnOnce() -> Option<T> + std::panic::UnwindSafe) -> Result<Option<T>> {
match std::panic::catch_unwind(f) {
Ok(r) => Ok(r),
Err(_) => Err(tower_lsp::jsonrpc::Error::internal_error()),
}
}
async fn panic_wrapper_async<Fut, T>(
f: impl FnOnce() -> Fut + std::panic::UnwindSafe,
) -> Result<Option<T>>