feat: initialize nodes in deferred context

This commit is contained in:
Myriad-Dreamin 2024-03-09 16:15:36 +08:00
parent 23d6237dfe
commit 6ca11dc93c
2 changed files with 120 additions and 53 deletions

View file

@ -1,5 +1,6 @@
use std::{borrow::Cow, path::PathBuf, sync::Arc};
use ::typst::util::Deferred;
use parking_lot::Mutex;
use tinymist_query::DiagnosticsMap;
use tokio::sync::{broadcast, mpsc, watch};
@ -7,7 +8,7 @@ use typst_ts_core::config::CompileOpts;
use self::{
render::PdfExportActor,
typst::{create_server, CompileCluster, CompileDriver, CompileHandler, CompileNode},
typst::{create_server, CompileCluster, CompileHandler, CompileNode},
};
use crate::{ConstConfig, LspHost};
@ -22,7 +23,12 @@ struct Repr {
}
impl Repr {
fn server(&mut self, name: String, roots: Vec<PathBuf>) -> CompileNode<CompileHandler> {
fn server(
&mut self,
name: String,
roots: Vec<PathBuf>,
entry: Option<PathBuf>,
) -> Deferred<CompileNode<CompileHandler>> {
let (doc_tx, doc_rx) = watch::channel(None);
let (render_tx, _) = broadcast::channel(10);
@ -39,7 +45,9 @@ impl Repr {
create_server(
name,
&self.config,
CompileDriver::new(roots.clone(), opts),
roots.clone(),
opts,
entry,
self.diag_tx.clone(),
doc_tx,
render_tx,
@ -54,7 +62,7 @@ impl Repr {
) -> CompileCluster {
let diag_rx = self.diag_rx.take().expect("diag_rx is poisoned");
let primary = self.server("primary".to_owned(), roots.clone());
let primary = self.server("primary".to_owned(), roots.clone(), None);
CompileCluster::new(fac, host, roots, &self.config, primary, diag_rx)
}
}
@ -73,8 +81,13 @@ impl ActorFactory {
})))
}
fn server(&self, name: String, roots: Vec<PathBuf>) -> CompileNode<CompileHandler> {
self.0.lock().server(name, roots)
fn server(
&self,
name: String,
roots: Vec<PathBuf>,
entry: Option<PathBuf>,
) -> Deferred<CompileNode<CompileHandler>> {
self.0.lock().server(name, roots, entry)
}
pub fn prepare_cluster(&self, host: LspHost, roots: Vec<PathBuf>) -> CompileCluster {

View file

@ -17,6 +17,7 @@ use typst::{
diag::{FileResult, SourceDiagnostic, SourceResult},
layout::Position,
syntax::{Source, Span, VirtualPath},
util::Deferred,
};
use typst_preview::{
CompilationHandle, CompilationHandleImpl, CompileHost, CompileStatus, DocToSrcJumpInfo,
@ -47,13 +48,42 @@ type Node = CompileNode<CompileHandler>;
type DiagnosticsSender = mpsc::UnboundedSender<(String, Option<DiagnosticsMap>)>;
// pub struct LazyCompileDriver {
// value: QueryRef<CompileDriver, (), (Vec<PathBuf>, CompileOpts)>,
// }
// impl LazyCompileDriver {
// pub fn new(roots: Vec<PathBuf>, opts: CompileOpts) -> Self {
// Self {
// value: QueryRef::with_context((roots, opts)),
// }
// }
// pub fn get(&self) -> &CompileDriver {
// let value = self.value.compute_with_context_ref(|(roots, opts)| {
// let driver = CompileDriver::new(roots.clone(), opts);
// Ok(driver)
// });
// value.unwrap()
// }
// }
// impl Deref for LazyCompileDriver {
// type Target = CompileDriver;
// fn deref(&self) -> &Self::Target {
// self.get()
// }
// }
pub struct CompileCluster {
roots: Vec<PathBuf>,
actor_factory: ActorFactory,
position_encoding: PositionEncoding,
memory_changes: RwLock<HashMap<Arc<Path>, MemoryFileMeta>>,
primary: Node,
main: Mutex<Option<Node>>,
primary: Deferred<Node>,
main: Arc<Mutex<Option<Deferred<Node>>>>,
pub tokens_cache: SemanticTokenCache,
actor: Option<CompileClusterActor>,
}
@ -64,7 +94,7 @@ impl CompileCluster {
host: LspHost,
roots: Vec<PathBuf>,
cfg: &ConstConfig,
primary: Node,
primary: Deferred<Node>,
diag_rx: mpsc::UnboundedReceiver<(String, Option<DiagnosticsMap>)>,
) -> Self {
Self {
@ -73,7 +103,7 @@ impl CompileCluster {
position_encoding: cfg.position_encoding,
memory_changes: RwLock::new(HashMap::new()),
primary,
main: Mutex::new(None),
main: Arc::new(Mutex::new(None)),
tokens_cache: Default::default(),
actor: Some(CompileClusterActor {
host,
@ -92,9 +122,9 @@ impl CompileCluster {
pub async fn activate_doc(&self, new_entry: Option<ImmutPath>) -> Result<(), Error> {
match new_entry {
Some(new_entry) => self.primary.change_entry(new_entry).await?,
Some(new_entry) => self.primary.wait().change_entry(new_entry).await?,
None => {
self.primary.disable().await;
self.primary.wait().disable().await;
}
}
@ -110,7 +140,7 @@ impl CompileCluster {
.map_err(|_| error_once!("invalid url"))?;
let path = path.as_path().into();
m.as_mut().unwrap().change_entry(path).await
m.as_mut().unwrap().wait().change_entry(path).await
}
(Some(new_entry), false) => {
let path = new_entry
@ -118,17 +148,16 @@ impl CompileCluster {
.map_err(|_| error_once!("invalid url"))?;
let path = path.as_path().into();
let main_node = self
.actor_factory
.server("main".to_owned(), self.roots.clone());
main_node.change_entry(path).await?;
let main_node =
self.actor_factory
.server("main".to_owned(), self.roots.clone(), Some(path));
*m = Some(main_node);
Ok(())
}
(None, true) => {
// todo: unpin main
m.as_mut().unwrap().disable().await;
m.as_mut().unwrap().wait().disable().await;
Ok(())
}
@ -137,40 +166,55 @@ impl CompileCluster {
}
}
#[allow(clippy::too_many_arguments)]
pub fn create_server(
diag_group: String,
cfg: &ConstConfig,
compiler_driver: CompileDriver,
roots: Vec<PathBuf>,
opts: CompileOpts,
entry: Option<PathBuf>,
diag_tx: DiagnosticsSender,
doc_sender: watch::Sender<Option<Arc<TypstDocument>>>,
render_tx: broadcast::Sender<RenderActorRequest>,
) -> Node {
let root = compiler_driver.inner.world.root.as_ref().to_owned();
let handler: CompileHandler = compiler_driver.handler.clone();
) -> Deferred<Node> {
let cfg = cfg.clone();
let current_runtime = tokio::runtime::Handle::current();
Deferred::new(move || {
let compiler_driver = CompileDriver::new(roots.clone(), opts, entry.clone());
let root = compiler_driver.inner.world.root.as_ref().to_owned();
let handler: CompileHandler = compiler_driver.handler.clone();
let driver = CompileExporter::new(compiler_driver).with_exporter(Box::new(
move |_w: &dyn TypstWorld, doc| {
let _ = doc_sender.send(Some(doc));
// todo: is it right that ignore zero broadcast receiver?
let _ = render_tx.send(RenderActorRequest::Render);
let driver = CompileExporter::new(compiler_driver).with_exporter(Box::new(
move |_w: &dyn TypstWorld, doc| {
let _ = doc_sender.send(Some(doc));
// todo: is it right that ignore zero broadcast receiver?
let _ = render_tx.send(RenderActorRequest::Render);
Ok(())
},
));
let driver = Reporter {
diag_group: diag_group.clone(),
position_encoding: cfg.position_encoding,
diag_tx,
inner: driver,
cb: handler.clone(),
};
let driver = CompileActor::new(driver, root).with_watch(true);
Ok(())
},
));
let driver = Reporter {
diag_group: diag_group.clone(),
position_encoding: cfg.position_encoding,
diag_tx,
inner: driver,
cb: handler.clone(),
};
let driver = CompileActor::new(driver, root).with_watch(true);
let (server, client) = driver.split();
let (server, client) = driver.split();
tokio::spawn(server.spawn());
current_runtime.spawn(server.spawn());
CompileNode::new(diag_group, cfg.position_encoding, handler, client)
let this = CompileNode::new(diag_group, cfg.position_encoding, handler, client);
// todo: less bug-prone code
if let Some(entry) = entry {
this.entry.lock().unwrap().replace(entry.into());
}
this
})
}
pub struct CompileClusterActor {
@ -338,15 +382,19 @@ struct MemoryFileMeta {
impl CompileCluster {
async fn update_source(&self, files: FileChangeSet) -> Result<(), Error> {
let primary = Some(&self.primary);
let main = self.main.lock().await;
let main = main.as_ref();
let clients_to_notify = (primary.iter()).chain(main.iter());
let primary = self.primary.clone();
let main = self.main.clone();
tokio::spawn(async move {
let primary = Some(&primary);
let main = main.lock().await;
let main = main.as_ref();
let clients_to_notify = (primary.iter()).chain(main.iter());
for client in clients_to_notify {
let iw = client.inner.lock().await;
iw.add_memory_changes(MemoryEvent::Update(files.clone()));
}
for client in clients_to_notify {
let iw = client.wait().inner.lock().await;
iw.add_memory_changes(MemoryEvent::Update(files.clone()));
}
});
Ok(())
}
@ -487,13 +535,13 @@ impl CompileCluster {
None => {
// todo: race condition, we need atomic primary query
if let Some(path) = query.associated_path() {
self.primary.change_entry(path.into()).await?;
self.primary.wait().change_entry(path.into()).await?;
}
&self.primary
}
};
query_target.query(query).await
query_target.wait().query(query).await
}
}
}
@ -542,7 +590,7 @@ impl CompileMiddleware for CompileDriver {
}
impl CompileDriver {
pub fn new(roots: Vec<PathBuf>, opts: CompileOpts) -> Self {
pub fn new(roots: Vec<PathBuf>, opts: CompileOpts, entry: Option<PathBuf>) -> Self {
let world = TypstSystemWorld::new(opts).expect("incorrect options");
let mut driver = CompileDriverInner::new(world);
@ -555,14 +603,20 @@ impl CompileDriver {
)]),
));
Self {
let mut this = Self {
inner: driver,
roots,
handler: CompileHandler {
result: Arc::new(SyncMutex::new(Err(CompileStatus::Compiling))),
inner: Arc::new(SyncMutex::new(None)),
},
};
if let Some(entry) = entry {
this.set_entry_file(entry);
}
this
}
// todo: determine root