completed refactor

added cargo lock

Signed-off-by: faldor20 <eli.jambu@yahoo.com>
This commit is contained in:
Eli Dowling 2023-12-10 10:28:17 +10:00 committed by faldor20
parent 58dec9af28
commit a2c8acd9ac
No known key found for this signature in database
GPG key ID: F2216079B890CD57
6 changed files with 217 additions and 362 deletions

View file

@ -1,17 +1,14 @@
use analysis::HIGHLIGHT_TOKENS_LEGEND;
use registry::{DocumentChange, Registry};
use registry::Registry;
use std::future::Future;
use std::io::Write;
use std::sync::{Arc, OnceLock};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{oneshot, Mutex, MutexGuard, RwLock};
use tokio::task::{JoinError, JoinHandle};
use tower_lsp::jsonrpc::Result;
use tower_lsp::lsp_types::request::RegisterCapability;
use tower_lsp::lsp_types::*;
use tower_lsp::{Client, LanguageServer, LspService, Server};
use crate::analysis::global_anal;
use crate::analysis::global_analysis;
mod analysis;
mod convert;
@ -25,7 +22,7 @@ struct RocLs {
struct Inner {
client: Client,
registry: Registry,
change_handle: parking_lot::Mutex<(
change_cancel_handle: parking_lot::Mutex<(
tokio::sync::watch::Sender<i32>,
tokio::sync::watch::Receiver<i32>,
)>,
@ -41,7 +38,7 @@ impl RocLs {
inner: Arc::new(Inner {
client,
registry: Registry::default(),
change_handle: parking_lot::Mutex::new(tokio::sync::watch::channel(0)),
change_cancel_handle: parking_lot::Mutex::new(tokio::sync::watch::channel(0)),
///Used for identifying if the intial stage of a document update is complete
documents_updating: tokio::sync::Semaphore::new(SEMLIMIT as usize),
}),
@ -49,7 +46,8 @@ impl RocLs {
}
///Wait for all the semaphores associated with an in-progress document_info update to be released
async fn wait_for_changes(&self) {
self.inner.documents_updating.acquire_many(SEMLIMIT).await;
//We don't actually need to return the permit, all we want is to have it momenterily which guarentees that any updates started before the call to this are complete
drop(self.inner.documents_updating.acquire_many(SEMLIMIT).await.expect("semaphore permits could not be aquired for the document updating process. This is unrecoverable"));
}
pub fn capabilities() -> ServerCapabilities {
let text_document_sync = TextDocumentSyncCapability::Options(
@ -105,27 +103,17 @@ impl RocLs {
/// Records a document content change.
async fn change(&self, fi: Url, text: String, version: i32) {
writeln!(std::io::stderr(), "starting change");
let updating_doc_info = self.inner.documents_updating.acquire().await.unwrap();
let mut new_change = {
let change_handle = self.inner.change_handle.lock();
//This will cancel any other onging changes in favour of this one
change_handle
.0
.send(version)
.expect("change_handle disposed, this shouldn't happen");
let mut watched = change_handle.1.clone();
drop(watched.borrow_and_update());
watched
};
//We will wait just a tiny amount of time to catch any requests that come in at the exact same time
eprintln!("starting change");
let (updating_doc_info, mut new_change) = self.change_preamble(version).await;
tokio::time::sleep(Duration::from_millis(20)).await;
if (new_change.has_changed().unwrap()) {
writeln!(std::io::stderr(), "newer task started almost immediately");
if new_change.has_changed().unwrap() {
eprintln!("newer task started almost immediately");
return;
}
writeln!(std::io::stderr(), "finished checking for cancellation");
let (results, partial) = global_anal(fi.clone(), text, version as u32);
eprintln!("finished checking for cancellation");
let (results, partial) = global_analysis(fi.clone(), text, version);
self.inner
.registry
@ -133,64 +121,76 @@ impl RocLs {
.await;
drop(updating_doc_info);
writeln!(std::io::stderr(), "finished applying");
eprintln!("finished updating docinfo, starting analysis ",);
let inner_ref = self.inner.clone();
let handle: JoinHandle<core::result::Result<&str, JoinError>> =
tokio::task::spawn(async move {
let results = tokio::task::spawn_blocking(results).await?;
let handle = async {
let results = match tokio::task::spawn_blocking(results).await {
Err(e) => return Err(format!("document analysis failed. reason:{:?}", e)),
Ok(a) => a,
};
inner_ref.registry().apply_change(results).await;
Ok("okay")
});
inner_ref.registry().apply_changes(results).await;
Ok(())
};
writeln!(std::io::stderr(), "waiting on analisys or cancel");
eprintln!("waiting on analysis or cancel");
//The analysis task can be cancelled by another change coming in which will update the watched variable
let cancelled = tokio::select! {
a=handle=>{
match a{
Err(a)=>
{
writeln!(std::io::stderr(), "error in task{:?}",a);
true},
Ok(_)=>false
}
},
_=new_change.changed()=>true
let finished = tokio::select! {
a=handle=>a,
_=new_change.changed()=>Err("Cancellation triggered from change".to_string())
};
if cancelled {
writeln!(std::io::stderr(), "cancelled change");
return;
match finished {
Err(e) => {
eprintln!("cancelled change. Reason:{:?}", e);
return;
}
Ok(_) => (),
}
writeln!(std::io::stderr(), "applied_change getting diagnostics");
//We do this to briefly yeild
eprintln!("applied_change getting and returning diagnostics");
let diagnostics = self.inner.registry().diagnostics(&fi).await;
writeln!(std::io::stderr(), "applied_change returning diagnostics");
self.inner
.client
.publish_diagnostics(fi, diagnostics, Some(version))
.await;
}
/// Does some bookkeeping before the change.
///aquires a semphore permit to let future functions know that there is a change ongoing
///sends a message to the watched change_handle to cancel other changes
///returns a copy of the watcher to use in our own cancellation
async fn change_preamble(
&self,
version: i32,
) -> (
tokio::sync::SemaphorePermit<'_>,
tokio::sync::watch::Receiver<i32>,
) {
let updating_doc_info = self.inner.documents_updating.acquire().await.unwrap();
let new_change = {
let cancel_handle = self.inner.change_cancel_handle.lock();
//This will cancel any other onging changes in favour of this one
cancel_handle
.0
.send(version)
.expect("change_handle disposed, this shouldn't happen");
let mut watched = cancel_handle.1.clone();
watched.borrow_and_update();
watched
};
(updating_doc_info, new_change)
}
}
impl Inner {
fn registry(&self) -> &Registry {
&self.registry
}
fn registry_write(&mut self) -> &mut Registry {
&mut self.registry
}
async fn close(&self, fi: Url) {
// self.registry()
// .apply_change(DocumentChange::Closed(fi))
// .await;
async fn close(&self, _fi: Url) {
()
}
}
@ -304,16 +304,15 @@ impl LanguageServer for RocLs {
}
async fn completion(&self, params: CompletionParams) -> Result<Option<CompletionResponse>> {
let doc = params.text_document_position;
writeln!(std::io::stderr(), "starting completion");
writeln!(
std::io::stderr(),
eprintln!("starting completion");
eprintln!(
"permits::{:?} ",
self.inner.documents_updating.available_permits()
);
//We need to wait untill any changes that were in progress when we requested completion have applied
self.wait_for_changes().await;
writeln!(std::io::stderr(), "waited for doc update to get sorted ");
eprintln!("waited for doc update to get sorted ");
let res = panic_wrapper_async(|| async {
self.inner
@ -323,7 +322,7 @@ impl LanguageServer for RocLs {
})
.await;
writeln!(std::io::stderr(), "finished completion");
eprintln!("finished completion");
res
}
}