mirror of
https://github.com/Myriad-Dreamin/tinymist.git
synced 2025-08-04 10:18:16 +00:00
dev: reduce dependencies a bit (#400)
* dev: disable default feature of typstyle * dev: add release launch * dev: remove threads and await tree * dev: remove await-tree
This commit is contained in:
parent
4acc39b237
commit
adff5a8a96
16 changed files with 62 additions and 499 deletions
|
@ -30,7 +30,6 @@ serde_json.workspace = true
|
|||
futures.workspace = true
|
||||
indexmap.workspace = true
|
||||
serde.workspace = true
|
||||
await-tree.workspace = true
|
||||
|
||||
clap = { workspace = true, optional = true }
|
||||
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
use await_tree::InstrumentAwait;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use log::{debug, info, trace, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -7,7 +6,6 @@ use tokio::{net::TcpStream, sync::broadcast};
|
|||
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
|
||||
use typst_ts_core::debug_loc::DocumentPosition;
|
||||
|
||||
use crate::await_tree::REGISTRY;
|
||||
use crate::debug_loc::{InternQuery, SpanInterner};
|
||||
use crate::outline::Outline;
|
||||
use crate::{
|
||||
|
@ -96,7 +94,6 @@ impl EditorConnection {
|
|||
.send(Message::Text(
|
||||
serde_json::to_string(&ControlPlaneResponse::SyncEditorChanges(())).unwrap(),
|
||||
))
|
||||
.instrument_await("sync editor changes")
|
||||
.await
|
||||
else {
|
||||
warn!("failed to send sync editor changes to editor");
|
||||
|
@ -108,7 +105,6 @@ impl EditorConnection {
|
|||
let sent = match self {
|
||||
EditorConnection::WebSocket(ws) => ws
|
||||
.send(Message::Text(serde_json::to_string(&resp).unwrap()))
|
||||
.instrument_await("send response to editor")
|
||||
.await
|
||||
.is_ok(),
|
||||
EditorConnection::Lsp(LspControlPlaneTx { resp_tx, .. }) => resp_tx.send(resp).is_ok(),
|
||||
|
@ -125,9 +121,7 @@ impl EditorConnection {
|
|||
match self {
|
||||
EditorConnection::Lsp(LspControlPlaneTx { ctl_rx, .. }) => ctl_rx.recv().await,
|
||||
EditorConnection::WebSocket(ws) => {
|
||||
let Some(Ok(Message::Text(msg))) =
|
||||
ws.next().instrument_await("waiting for websocket").await
|
||||
else {
|
||||
let Some(Ok(Message::Text(msg))) = ws.next().await else {
|
||||
return None;
|
||||
};
|
||||
|
||||
|
@ -202,22 +196,14 @@ impl EditorActor {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn run(self) {
|
||||
let root = REGISTRY
|
||||
.lock()
|
||||
.await
|
||||
.register("editor actor".into(), "editor actor");
|
||||
root.instrument(self.run_instrumented()).await;
|
||||
}
|
||||
|
||||
async fn run_instrumented(mut self) {
|
||||
pub async fn run(mut self) {
|
||||
if self.editor_conn.need_sync_files() {
|
||||
self.editor_conn.sync_editor_changes().await;
|
||||
}
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(msg) = self.mailbox.recv().instrument_await("waiting for mailbox") => {
|
||||
Some(msg) = self.mailbox.recv() => {
|
||||
trace!("EditorActor: received message from mailbox: {:?}", msg);
|
||||
let sent = match msg {
|
||||
EditorActorRequest::Shutdown => {
|
||||
|
@ -229,7 +215,6 @@ impl EditorActor {
|
|||
},
|
||||
EditorActorRequest::DocToSrcJumpResolve(req) => {
|
||||
self.source_scroll_by_span(req.span)
|
||||
.instrument_await("source scroll by span")
|
||||
.await;
|
||||
|
||||
false
|
||||
|
@ -246,7 +231,7 @@ impl EditorActor {
|
|||
break;
|
||||
}
|
||||
}
|
||||
Some(msg) = self.editor_conn.next().instrument_await("waiting for editor message") => {
|
||||
Some(msg) = self.editor_conn.next() => {
|
||||
match msg {
|
||||
ControlPlaneMessage::ChangeCursorPosition(cursor_info) => {
|
||||
debug!("EditorActor: received message from editor: {:?}", cursor_info);
|
||||
|
@ -264,7 +249,6 @@ impl EditorActor {
|
|||
debug!("EditorActor: received message from editor: {:?}", jump_info);
|
||||
|
||||
self.source_scroll_by_span(jump_info.span)
|
||||
.instrument_await("source scroll by span")
|
||||
.await;
|
||||
}
|
||||
ControlPlaneMessage::SyncMemoryFiles(memory_files) => {
|
||||
|
@ -294,12 +278,7 @@ impl EditorActor {
|
|||
|
||||
async fn source_scroll_by_span(&mut self, span: String) {
|
||||
let jump_info = {
|
||||
match self
|
||||
.span_interner
|
||||
.span_by_str(&span)
|
||||
.instrument_await("get span by str")
|
||||
.await
|
||||
{
|
||||
match self.span_interner.span_by_str(&span).await {
|
||||
InternQuery::Ok(s) => s,
|
||||
InternQuery::UseAfterFree => {
|
||||
warn!("EditorActor: out of date span id: {}", span);
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use await_tree::InstrumentAwait;
|
||||
use log::{debug, info, trace};
|
||||
use tokio::sync::{broadcast, mpsc, watch};
|
||||
use typst::model::Document;
|
||||
|
@ -8,7 +7,6 @@ use typst_ts_core::debug_loc::{ElementPoint, SourceSpanOffset};
|
|||
use typst_ts_core::TypstDocument;
|
||||
use typst_ts_svg_exporter::IncrSvgDocServer;
|
||||
|
||||
use crate::await_tree::REGISTRY;
|
||||
use crate::{debug_loc::SpanInterner, outline::Outline};
|
||||
|
||||
use super::{editor::EditorActorRequest, typst::TypstActorRequest, webview::WebviewActorRequest};
|
||||
|
@ -64,13 +62,6 @@ impl RenderActor {
|
|||
res
|
||||
}
|
||||
|
||||
pub fn spawn(self, peer_addr: String) {
|
||||
std::thread::Builder::new()
|
||||
.name("RenderActor".to_owned())
|
||||
.spawn(move || self.run(&peer_addr))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
async fn process_message(&mut self, msg: RenderActorRequest) -> bool {
|
||||
trace!("RenderActor: received message: {:?}", msg);
|
||||
|
||||
|
@ -117,28 +108,13 @@ impl RenderActor {
|
|||
res
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn run(self, peer_addr: &str) {
|
||||
let span = format!("render actor<{}>", peer_addr);
|
||||
let root = REGISTRY.lock().await.register(span.clone().into(), span);
|
||||
root.instrument(self.run_instrumented()).await;
|
||||
}
|
||||
|
||||
async fn run_instrumented(mut self) {
|
||||
pub async fn run(mut self) {
|
||||
loop {
|
||||
let mut has_full_render = false;
|
||||
debug!("RenderActor: waiting for message");
|
||||
match self
|
||||
.mailbox
|
||||
.recv()
|
||||
.instrument_await("waiting for message")
|
||||
.await
|
||||
{
|
||||
match self.mailbox.recv().await {
|
||||
Ok(msg) => {
|
||||
has_full_render |= self
|
||||
.process_message(msg)
|
||||
.instrument_await("processing message")
|
||||
.await;
|
||||
has_full_render |= self.process_message(msg).await;
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
info!("RenderActor: no more messages");
|
||||
|
@ -150,10 +126,7 @@ impl RenderActor {
|
|||
}
|
||||
// read the queue to empty
|
||||
while let Ok(msg) = self.mailbox.try_recv() {
|
||||
has_full_render |= self
|
||||
.process_message(msg)
|
||||
.instrument_await("processing message")
|
||||
.await;
|
||||
has_full_render |= self.process_message(msg).await;
|
||||
}
|
||||
// if a full render is requested, we render the latest document
|
||||
// otherwise, we render the incremental changes for only once
|
||||
|
@ -204,29 +177,10 @@ impl OutlineRenderActor {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn spawn(self, peer_addr: String) {
|
||||
std::thread::Builder::new()
|
||||
.name("OutlineRenderActor".to_owned())
|
||||
.spawn(move || self.run(&peer_addr))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn run(self, peer_addr: &str) {
|
||||
let span = format!("outline render actor<{}>", peer_addr);
|
||||
let root = REGISTRY.lock().await.register(span.clone().into(), span);
|
||||
root.instrument(self.run_instrumented()).await;
|
||||
}
|
||||
|
||||
async fn run_instrumented(mut self) {
|
||||
pub async fn run(mut self) {
|
||||
loop {
|
||||
debug!("OutlineRenderActor: waiting for message");
|
||||
match self
|
||||
.signal
|
||||
.recv()
|
||||
.instrument_await("waiting for message")
|
||||
.await
|
||||
{
|
||||
match self.signal.recv().await {
|
||||
Ok(msg) => {
|
||||
debug!("OutlineRenderActor: received message: {:?}", msg);
|
||||
}
|
||||
|
@ -246,7 +200,7 @@ impl OutlineRenderActor {
|
|||
info!("OutlineRenderActor: document is not ready");
|
||||
continue;
|
||||
};
|
||||
let data = self.outline(&document).instrument_await("outline").await;
|
||||
let data = self.outline(&document).await;
|
||||
debug!("OutlineRenderActor: sending outline");
|
||||
let Ok(_) = self.editor_tx.send(EditorActorRequest::Outline(data)) else {
|
||||
info!("OutlineRenderActor: outline_sender is dropped");
|
||||
|
@ -262,7 +216,6 @@ impl OutlineRenderActor {
|
|||
interner.reset();
|
||||
crate::outline::outline(interner, document)
|
||||
})
|
||||
.instrument_await("generating outline with span interner")
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,13 +1,10 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use await_tree::InstrumentAwait;
|
||||
|
||||
use log::{debug, error, info};
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
use typst::syntax::Span;
|
||||
use typst_ts_core::debug_loc::{CharPosition, DocumentPosition, SourceLocation, SourceSpanOffset};
|
||||
|
||||
use crate::await_tree::REGISTRY;
|
||||
use crate::{
|
||||
ChangeCursorPositionRequest, EditorServer, MemoryFiles, MemoryFilesShort, SourceFileServer,
|
||||
SrcToDocJumpRequest,
|
||||
|
@ -80,25 +77,10 @@ impl<T> TypstActor<T> {
|
|||
}
|
||||
|
||||
impl<T: SourceFileServer + EditorServer> TypstActor<T> {
|
||||
pub async fn run(self) {
|
||||
let root = REGISTRY
|
||||
.lock()
|
||||
.await
|
||||
.register("typst actor".into(), "typst actor");
|
||||
root.instrument(self.run_instrumented()).await;
|
||||
}
|
||||
|
||||
pub async fn run_instrumented(mut self) {
|
||||
pub async fn run(mut self) {
|
||||
debug!("TypstActor: waiting for message");
|
||||
while let Some(mail) = self
|
||||
.mailbox
|
||||
.recv()
|
||||
.instrument_await("waiting for message")
|
||||
.await
|
||||
{
|
||||
self.process_mail(mail)
|
||||
.instrument_await("processing mail")
|
||||
.await;
|
||||
while let Some(mail) = self.mailbox.recv().await {
|
||||
self.process_mail(mail).await;
|
||||
}
|
||||
info!("TypstActor: exiting");
|
||||
}
|
||||
|
@ -107,10 +89,7 @@ impl<T: SourceFileServer + EditorServer> TypstActor<T> {
|
|||
match mail {
|
||||
TypstActorRequest::DocToSrcJumpResolve(span_range) => {
|
||||
debug!("TypstActor: processing doc2src: {:?}", span_range);
|
||||
let res = self
|
||||
.resolve_span_range(span_range)
|
||||
.instrument_await("resolve span range")
|
||||
.await;
|
||||
let res = self.resolve_span_range(span_range).await;
|
||||
|
||||
if let Some(info) = res {
|
||||
let _ = self
|
||||
|
@ -130,7 +109,6 @@ impl<T: SourceFileServer + EditorServer> TypstActor<T> {
|
|||
column: req.character,
|
||||
},
|
||||
}))
|
||||
.instrument_await("resolve span range")
|
||||
.await
|
||||
.map_err(|err| {
|
||||
error!("TypstActor: failed to resolve cursor position: {:#}", err);
|
||||
|
@ -157,7 +135,6 @@ impl<T: SourceFileServer + EditorServer> TypstActor<T> {
|
|||
column: req.character,
|
||||
},
|
||||
}))
|
||||
.instrument_await("resolve doc position")
|
||||
.await
|
||||
.map_err(|err| {
|
||||
error!("TypstActor: failed to resolve src to doc jump: {:#}", err);
|
||||
|
@ -191,10 +168,7 @@ impl<T: SourceFileServer + EditorServer> TypstActor<T> {
|
|||
);
|
||||
handle_error(
|
||||
"SyncMemoryFiles",
|
||||
self.client
|
||||
.update_memory_files(m, true)
|
||||
.instrument_await("sync memory files")
|
||||
.await,
|
||||
self.client.update_memory_files(m, true).await,
|
||||
);
|
||||
}
|
||||
TypstActorRequest::UpdateMemoryFiles(m) => {
|
||||
|
@ -204,20 +178,14 @@ impl<T: SourceFileServer + EditorServer> TypstActor<T> {
|
|||
);
|
||||
handle_error(
|
||||
"UpdateMemoryFiles",
|
||||
self.client
|
||||
.update_memory_files(m, false)
|
||||
.instrument_await("update memory files")
|
||||
.await,
|
||||
self.client.update_memory_files(m, false).await,
|
||||
);
|
||||
}
|
||||
TypstActorRequest::RemoveMemoryFiles(m) => {
|
||||
debug!("TypstActor: processing REMOVE memory files: {:?}", m.files);
|
||||
handle_error(
|
||||
"RemoveMemoryFiles",
|
||||
self.client
|
||||
.remove_shadow_files(m)
|
||||
.instrument_await("remove memory files")
|
||||
.await,
|
||||
self.client.remove_shadow_files(m).await,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -226,7 +194,6 @@ impl<T: SourceFileServer + EditorServer> TypstActor<T> {
|
|||
async fn resolve_span(&mut self, s: Span, offset: Option<usize>) -> Option<DocToSrcJumpInfo> {
|
||||
self.client
|
||||
.resolve_source_location(s, offset)
|
||||
.instrument_await("resolve span")
|
||||
.await
|
||||
.map_err(|err| {
|
||||
error!("TypstActor: failed to resolve doc to src jump: {:#}", err);
|
||||
|
@ -236,9 +203,7 @@ impl<T: SourceFileServer + EditorServer> TypstActor<T> {
|
|||
}
|
||||
|
||||
async fn resolve_span_offset(&mut self, s: SourceSpanOffset) -> Option<DocToSrcJumpInfo> {
|
||||
self.resolve_span(s.span, Some(s.offset))
|
||||
.instrument_await("resolve span offset")
|
||||
.await
|
||||
self.resolve_span(s.span, Some(s.offset)).await
|
||||
}
|
||||
|
||||
async fn resolve_span_range(
|
||||
|
@ -248,10 +213,7 @@ impl<T: SourceFileServer + EditorServer> TypstActor<T> {
|
|||
// Resolves FileLoC of start, end, and the element wide
|
||||
let st_res = self.resolve_span_offset(span_range.0).await;
|
||||
let ed_res = self.resolve_span_offset(span_range.1).await;
|
||||
let elem_res = self
|
||||
.resolve_span(span_range.1.span, None)
|
||||
.instrument_await("resolve span")
|
||||
.await;
|
||||
let elem_res = self.resolve_span(span_range.1.span, None).await;
|
||||
|
||||
// Combines the result of start and end
|
||||
let range_res = match (st_res, ed_res) {
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
use await_tree::InstrumentAwait;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use log::{info, trace};
|
||||
use tokio::{
|
||||
|
@ -8,10 +7,7 @@ use tokio::{
|
|||
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
|
||||
use typst_ts_core::debug_loc::{DocumentPosition, ElementPoint};
|
||||
|
||||
use crate::{
|
||||
actor::{editor::DocToSrcJumpResolveRequest, render::ResolveSpanRequest},
|
||||
await_tree::REGISTRY,
|
||||
};
|
||||
use crate::actor::{editor::DocToSrcJumpResolveRequest, render::ResolveSpanRequest};
|
||||
|
||||
use super::{editor::EditorActorRequest, render::RenderActorRequest};
|
||||
|
||||
|
@ -74,28 +70,20 @@ impl WebviewActor {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn run(self, peer_addr: String) {
|
||||
let span = format!("webview actor<{}>", peer_addr);
|
||||
let root = REGISTRY.lock().await.register(span.clone().into(), span);
|
||||
root.instrument(self.run_instrumented()).await;
|
||||
}
|
||||
|
||||
pub async fn run_instrumented(mut self) {
|
||||
pub async fn run(mut self) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
Ok(msg) = self.mailbox.recv().instrument_await("waiting for mailbox") => {
|
||||
Ok(msg) = self.mailbox.recv() => {
|
||||
trace!("WebviewActor: received message from mailbox: {:?}", msg);
|
||||
match msg {
|
||||
WebviewActorRequest::SrcToDocJump(jump_info) => {
|
||||
let msg = position_req("jump", jump_info);
|
||||
self.webview_websocket_conn.send(Message::Binary(msg.into_bytes()))
|
||||
.instrument_await("send SrcToDocJump message to webview")
|
||||
.await.unwrap();
|
||||
}
|
||||
WebviewActorRequest::ViewportPosition(jump_info) => {
|
||||
let msg = position_req("viewport", jump_info);
|
||||
self.webview_websocket_conn.send(Message::Binary(msg.into_bytes()))
|
||||
.instrument_await("send ViewportPosition message to webview")
|
||||
.await.unwrap();
|
||||
}
|
||||
// WebviewActorRequest::CursorPosition(jump_info) => {
|
||||
|
@ -106,18 +94,16 @@ impl WebviewActor {
|
|||
let json = serde_json::to_string(&jump_info).unwrap();
|
||||
let msg = format!("cursor-paths,{json}");
|
||||
self.webview_websocket_conn.send(Message::Binary(msg.into_bytes()))
|
||||
.instrument_await("send CursorPaths message to webview")
|
||||
.await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(svg) = self.svg_receiver.recv().instrument_await("waiting for renderer") => {
|
||||
Some(svg) = self.svg_receiver.recv() => {
|
||||
trace!("WebviewActor: received svg from renderer");
|
||||
self.webview_websocket_conn.send(Message::Binary(svg))
|
||||
.instrument_await("send svg to webview")
|
||||
.await.unwrap();
|
||||
}
|
||||
Some(msg) = self.webview_websocket_conn.next().instrument_await("waiting for websocket") => {
|
||||
Some(msg) = self.webview_websocket_conn.next() => {
|
||||
trace!("WebviewActor: received message from websocket: {:?}", msg);
|
||||
let Ok(msg) = msg else {
|
||||
info!("WebviewActor: no more messages from websocket: {}", msg.unwrap_err());
|
||||
|
@ -126,7 +112,6 @@ impl WebviewActor {
|
|||
let Message::Text(msg) = msg else {
|
||||
info!("WebviewActor: received non-text message from websocket: {:?}", msg);
|
||||
let _ = self.webview_websocket_conn.send(Message::Text(format!("Webview Actor: error, received non-text message: {}", msg)))
|
||||
.instrument_await("send error message to webview")
|
||||
.await;
|
||||
break;
|
||||
};
|
||||
|
@ -158,9 +143,7 @@ impl WebviewActor {
|
|||
};
|
||||
} else {
|
||||
info!("WebviewActor: received unknown message from websocket: {}", msg);
|
||||
self.webview_websocket_conn.send(Message::Text(format!("error, received unknown message: {}", msg)))
|
||||
.instrument_await("send error message to webview")
|
||||
.await.unwrap();
|
||||
self.webview_websocket_conn.send(Message::Text(format!("error, received unknown message: {}", msg))).await.unwrap();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,25 +0,0 @@
|
|||
use std::borrow::Cow;
|
||||
|
||||
use await_tree::{Config, Registry};
|
||||
use once_cell::sync::Lazy;
|
||||
use tokio::sync::Mutex;
|
||||
pub static REGISTRY: Lazy<Mutex<Registry<Cow<'static, str>>>> =
|
||||
Lazy::new(|| Mutex::new(Registry::new(Config::default())));
|
||||
|
||||
pub async fn get_await_tree_async() -> String {
|
||||
let trace = REGISTRY.lock().await;
|
||||
get_await_tree_impl(&trace)
|
||||
}
|
||||
|
||||
pub fn get_await_tree_blocking() -> String {
|
||||
let trace = REGISTRY.blocking_lock();
|
||||
get_await_tree_impl(&trace)
|
||||
}
|
||||
|
||||
fn get_await_tree_impl(trace: &Registry<Cow<'static, str>>) -> String {
|
||||
let mut res = trace.iter().collect::<Vec<_>>();
|
||||
res.sort_by_key(|&(k, _)| k);
|
||||
res.into_iter()
|
||||
.map(|(_, tree)| tree.to_string())
|
||||
.collect::<String>()
|
||||
}
|
|
@ -1,6 +1,5 @@
|
|||
mod actor;
|
||||
mod args;
|
||||
pub mod await_tree;
|
||||
mod debug_loc;
|
||||
mod outline;
|
||||
|
||||
|
@ -14,7 +13,6 @@ use std::pin::Pin;
|
|||
use std::time::Duration;
|
||||
use std::{collections::HashMap, future::Future, path::PathBuf, sync::Arc};
|
||||
|
||||
use ::await_tree::InstrumentAwait;
|
||||
use debug_loc::SpanInterner;
|
||||
use futures::SinkExt;
|
||||
use log::info;
|
||||
|
@ -260,9 +258,7 @@ pub async fn preview<T: CompileHost + Send + Sync + 'static>(
|
|||
let shutdown_tx = lsp_connection.as_ref().map(|e| e.shutdown_tx.clone());
|
||||
tokio::spawn(async move {
|
||||
// Create the event loop and TCP listener we'll accept connections on.
|
||||
let try_socket = TcpListener::bind(&data_plane_addr)
|
||||
.instrument_await("bind data plane server")
|
||||
.await;
|
||||
let try_socket = TcpListener::bind(&data_plane_addr).await;
|
||||
let listener = try_socket.expect("Failed to bind");
|
||||
info!(
|
||||
"Data plane server listening on: {}",
|
||||
|
@ -275,15 +271,9 @@ pub async fn preview<T: CompileHost + Send + Sync + 'static>(
|
|||
let webview_tx = webview_tx.clone();
|
||||
let webview_rx = webview_tx.subscribe();
|
||||
let typst_tx = typst_tx.clone();
|
||||
let peer_addr = stream
|
||||
.peer_addr()
|
||||
.map_or("unknown".to_string(), |addr| addr.to_string());
|
||||
let mut conn = accept_connection(stream)
|
||||
.instrument_await("accept data plane websocket connection")
|
||||
.await;
|
||||
let mut conn = accept_connection(stream).await;
|
||||
if enable_partial_rendering {
|
||||
conn.send(Message::Binary("partial-rendering,true".into()))
|
||||
.instrument_await("send partial-rendering message to webview")
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
@ -291,7 +281,6 @@ pub async fn preview<T: CompileHost + Send + Sync + 'static>(
|
|||
conn.send(Message::Binary(
|
||||
format!("invert-colors,{}", invert_colors).into(),
|
||||
))
|
||||
.instrument_await("send invert-colors message to webview")
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
@ -307,7 +296,6 @@ pub async fn preview<T: CompileHost + Send + Sync + 'static>(
|
|||
);
|
||||
|
||||
let alive_tx = alive_tx.clone();
|
||||
let wr = webview_actor.run(peer_addr.clone());
|
||||
tokio::spawn(async move {
|
||||
struct FinallySend(mpsc::UnboundedSender<()>);
|
||||
impl Drop for FinallySend {
|
||||
|
@ -317,7 +305,7 @@ pub async fn preview<T: CompileHost + Send + Sync + 'static>(
|
|||
}
|
||||
|
||||
let _send = FinallySend(alive_tx);
|
||||
wr.await;
|
||||
webview_actor.run().await;
|
||||
});
|
||||
let render_actor = actor::render::RenderActor::new(
|
||||
renderer_tx.subscribe(),
|
||||
|
@ -326,14 +314,14 @@ pub async fn preview<T: CompileHost + Send + Sync + 'static>(
|
|||
svg.0,
|
||||
webview_tx,
|
||||
);
|
||||
render_actor.spawn(peer_addr.clone());
|
||||
tokio::spawn(render_actor.run());
|
||||
let outline_render_actor = actor::render::OutlineRenderActor::new(
|
||||
renderer_tx.subscribe(),
|
||||
doc_watcher.1.clone(),
|
||||
editor_tx.clone(),
|
||||
span_interner,
|
||||
);
|
||||
outline_render_actor.spawn(peer_addr);
|
||||
tokio::spawn(outline_render_actor.run());
|
||||
};
|
||||
|
||||
let mut alive_cnt = 0;
|
||||
|
@ -343,18 +331,18 @@ pub async fn preview<T: CompileHost + Send + Sync + 'static>(
|
|||
shutdown_bell.reset();
|
||||
}
|
||||
tokio::select! {
|
||||
Some(()) = shutdown_data_plane_rx.recv().instrument_await("data plane exit signal") => {
|
||||
Some(()) = shutdown_data_plane_rx.recv() => {
|
||||
info!("Data plane server shutdown");
|
||||
return;
|
||||
}
|
||||
Ok((stream, _)) = listener.accept().instrument_await("accept data plane connection") => {
|
||||
Ok((stream, _)) = listener.accept() => {
|
||||
alive_cnt += 1;
|
||||
recv(stream).await;
|
||||
},
|
||||
_ = alive_rx.recv().instrument_await("data plane alive signal") => {
|
||||
_ = alive_rx.recv() => {
|
||||
alive_cnt -= 1;
|
||||
}
|
||||
_ = shutdown_bell.tick().instrument_await("data plane interval"), if alive_cnt == 0 && shutdown_tx.is_some() => {
|
||||
_ = shutdown_bell.tick(), if alive_cnt == 0 && shutdown_tx.is_some() => {
|
||||
let shutdown_tx = shutdown_tx.expect("scheduled shutdown without shutdown_tx");
|
||||
|
||||
info!("Data plane server has been idle for {idle_timeout:?}, shutting down.");
|
||||
|
@ -374,23 +362,15 @@ pub async fn preview<T: CompileHost + Send + Sync + 'static>(
|
|||
let editor_rx = editor_conn.1;
|
||||
tokio::spawn(async move {
|
||||
let conn = if !control_plane_addr.is_empty() {
|
||||
let try_socket = TcpListener::bind(&control_plane_addr)
|
||||
.instrument_await("bind control plane server")
|
||||
.await;
|
||||
let try_socket = TcpListener::bind(&control_plane_addr).await;
|
||||
let listener = try_socket.expect("Failed to bind");
|
||||
info!(
|
||||
"Control plane server listening on: {}",
|
||||
listener.local_addr().unwrap()
|
||||
);
|
||||
let (stream, _) = listener
|
||||
.accept()
|
||||
.instrument_await("accept control plane connection")
|
||||
.await
|
||||
.unwrap();
|
||||
let (stream, _) = listener.accept().await.unwrap();
|
||||
|
||||
let conn = accept_connection(stream)
|
||||
.instrument_await("accept control plane websocket connection")
|
||||
.await;
|
||||
let conn = accept_connection(stream).await;
|
||||
|
||||
EditorConnection::WebSocket(conn)
|
||||
} else {
|
||||
|
@ -399,10 +379,7 @@ pub async fn preview<T: CompileHost + Send + Sync + 'static>(
|
|||
|
||||
let editor_actor =
|
||||
EditorActor::new(editor_rx, conn, typst_tx, webview_tx, span_interner);
|
||||
editor_actor
|
||||
.run()
|
||||
.instrument_await("run editor actor")
|
||||
.await;
|
||||
editor_actor.run().await;
|
||||
info!("Control plane client shutdown");
|
||||
})
|
||||
};
|
||||
|
@ -427,10 +404,7 @@ pub async fn preview<T: CompileHost + Send + Sync + 'static>(
|
|||
let editor_tx = editor_conn.0;
|
||||
let stop = move || -> StopFuture {
|
||||
Box::pin(async move {
|
||||
let _ = shutdown_data_plane_tx
|
||||
.send(())
|
||||
.instrument_await("wait data plane")
|
||||
.await;
|
||||
let _ = shutdown_data_plane_tx.send(()).await;
|
||||
let _ = editor_tx.send(EditorActorRequest::Shutdown);
|
||||
})
|
||||
};
|
||||
|
@ -452,7 +426,6 @@ async fn accept_connection(stream: TcpStream) -> WebSocketStream<TcpStream> {
|
|||
info!("Peer address: {}", addr);
|
||||
|
||||
let ws_stream = tokio_tungstenite::accept_async(stream)
|
||||
.instrument_await("accept websocket connection")
|
||||
.await
|
||||
.expect("Error during the websocket handshake occurred");
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue