feat: listen data plane socket and serve frontend html on same address (#577)

* g1

* g2

* g3

* g4

* dev: clean up

* dev: remove deps

* dev: extract recv function

* move serve function

* hide generics

* recover fut change

* remove a useless function

* rename message back

* remove dyn frontend_html

* remove duplicated types

* remove wrong comment

* reduce glue codes

* remove dirty code

* feat: maintain compatibility

* dev: human name
This commit is contained in:
Myriad-Dreamin 2024-09-06 15:33:40 +08:00 committed by GitHub
parent 5b1d5ce331
commit 67f148ce44
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 661 additions and 438 deletions

View file

@ -19,7 +19,6 @@ reflexo-vec2svg.workspace = true
reflexo-typst.workspace = true
once_cell.workspace = true
tokio.workspace = true
tokio-tungstenite.workspace = true
env_logger.workspace = true
log.workspace = true
serde_json.workspace = true

View file

@ -1,10 +1,8 @@
use futures::{SinkExt, StreamExt};
use log::{debug, info, trace, warn};
use reflexo_typst::debug_loc::DocumentPosition;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::{net::TcpStream, sync::broadcast};
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
use crate::debug_loc::{InternQuery, SpanInterner};
use crate::outline::Outline;
@ -42,31 +40,33 @@ pub enum EditorActorRequest {
CompileStatus(CompileStatus),
}
pub struct LspControlPlaneTx {
pub struct ControlPlaneTx {
pub is_standalone: bool,
pub resp_tx: mpsc::UnboundedSender<ControlPlaneResponse>,
pub ctl_rx: mpsc::UnboundedReceiver<ControlPlaneMessage>,
pub shutdown_tx: mpsc::Sender<()>,
}
pub struct LspControlPlaneRx {
pub struct ControlPlaneRx {
pub resp_rx: mpsc::UnboundedReceiver<ControlPlaneResponse>,
pub ctl_tx: mpsc::UnboundedSender<ControlPlaneMessage>,
pub shutdown_rx: mpsc::Receiver<()>,
}
impl LspControlPlaneTx {
pub fn new() -> (LspControlPlaneTx, LspControlPlaneRx) {
impl ControlPlaneTx {
pub fn new(need_sync_files: bool) -> (ControlPlaneTx, ControlPlaneRx) {
let (resp_tx, resp_rx) = mpsc::unbounded_channel();
let (ctl_tx, ctl_rx) = mpsc::unbounded_channel();
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
(
Self {
is_standalone: need_sync_files,
resp_tx,
ctl_rx,
shutdown_tx,
},
LspControlPlaneRx {
ControlPlaneRx {
resp_rx,
ctl_tx,
shutdown_rx,
@ -75,41 +75,21 @@ impl LspControlPlaneTx {
}
}
pub enum EditorConnection {
WebSocket(WebSocketStream<TcpStream>),
Lsp(LspControlPlaneTx),
}
impl EditorConnection {
impl ControlPlaneTx {
fn need_sync_files(&self) -> bool {
matches!(self, EditorConnection::WebSocket(_))
self.is_standalone
}
async fn sync_editor_changes(&mut self) {
let EditorConnection::WebSocket(ws) = self else {
return;
};
let Ok(_) = ws
.send(Message::Text(
serde_json::to_string(&ControlPlaneResponse::SyncEditorChanges(())).unwrap(),
))
.await
else {
warn!("failed to send sync editor changes to editor");
return;
};
self.resp_ctl_plane(
"SyncEditorChanges",
ControlPlaneResponse::SyncEditorChanges(()),
)
.await;
}
async fn resp_ctl_plane(&mut self, loc: &str, resp: ControlPlaneResponse) -> bool {
let sent = match self {
EditorConnection::WebSocket(ws) => ws
.send(Message::Text(serde_json::to_string(&resp).unwrap()))
.await
.is_ok(),
EditorConnection::Lsp(LspControlPlaneTx { resp_tx, .. }) => resp_tx.send(resp).is_ok(),
};
let sent = self.resp_tx.send(resp).is_ok();
if !sent {
warn!("failed to send {loc} response to editor");
}
@ -118,27 +98,13 @@ impl EditorConnection {
}
async fn next(&mut self) -> Option<ControlPlaneMessage> {
match self {
EditorConnection::Lsp(LspControlPlaneTx { ctl_rx, .. }) => ctl_rx.recv().await,
EditorConnection::WebSocket(ws) => {
let Some(Ok(Message::Text(msg))) = ws.next().await else {
return None;
};
let Ok(msg) = serde_json::from_str::<ControlPlaneMessage>(&msg) else {
warn!("failed to parse control plane request: {msg:?}");
return None;
};
Some(msg)
}
}
self.ctl_rx.recv().await
}
}
pub struct EditorActor {
mailbox: mpsc::UnboundedReceiver<EditorActorRequest>,
editor_conn: EditorConnection,
editor_conn: ControlPlaneTx,
world_sender: mpsc::UnboundedSender<TypstActorRequest>,
webview_sender: broadcast::Sender<WebviewActorRequest>,
@ -181,7 +147,7 @@ pub enum ControlPlaneResponse {
impl EditorActor {
pub fn new(
mailbox: mpsc::UnboundedReceiver<EditorActorRequest>,
editor_websocket_conn: EditorConnection,
editor_websocket_conn: ControlPlaneTx,
world_sender: mpsc::UnboundedSender<TypstActorRequest>,
webview_sender: broadcast::Sender<WebviewActorRequest>,
span_interner: SpanInterner,
@ -270,7 +236,7 @@ impl EditorActor {
info!("EditorActor: editor disconnected");
if !matches!(self.editor_conn, EditorConnection::Lsp(_)) {
if self.editor_conn.is_standalone {
info!("EditorActor: shutting down whole program");
std::process::exit(0);
}

View file

@ -1,13 +1,13 @@
use futures::{SinkExt, StreamExt};
// use hyper_tungstenite::tungstenite::Message;
use log::{info, trace};
use reflexo_typst::debug_loc::{DocumentPosition, ElementPoint};
use tokio::{
net::TcpStream,
sync::{broadcast, mpsc},
};
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
use tokio::sync::{broadcast, mpsc};
use crate::actor::{editor::DocToSrcJumpResolveRequest, render::ResolveSpanRequest};
use crate::{
actor::{editor::DocToSrcJumpResolveRequest, render::ResolveSpanRequest},
Message, WsError,
};
use super::{editor::EditorActorRequest, render::RenderActorRequest};
@ -29,8 +29,11 @@ fn position_req(
format!("{event},{page_no} {x} {y}")
}
pub struct WebviewActor {
webview_websocket_conn: WebSocketStream<TcpStream>,
pub struct WebviewActor<
'a,
C: futures::Sink<Message, Error = WsError> + futures::Stream<Item = Result<Message, WsError>>,
> {
webview_websocket_conn: std::pin::Pin<&'a mut C>,
svg_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
mailbox: broadcast::Receiver<WebviewActorRequest>,
@ -46,14 +49,18 @@ pub struct Channels {
),
}
impl WebviewActor {
impl<
'a,
C: futures::Sink<Message, Error = WsError> + futures::Stream<Item = Result<Message, WsError>>,
> WebviewActor<'a, C>
{
pub fn set_up_channels() -> Channels {
Channels {
svg: mpsc::unbounded_channel(),
}
}
pub fn new(
websocket_conn: WebSocketStream<TcpStream>,
websocket_conn: std::pin::Pin<&'a mut C>,
svg_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
broadcast_sender: broadcast::Sender<WebviewActorRequest>,
mailbox: broadcast::Receiver<WebviewActorRequest>,
@ -88,7 +95,7 @@ impl WebviewActor {
}
// WebviewActorRequest::CursorPosition(jump_info) => {
// let msg = position_req("cursor", jump_info);
// self.webview_websocket_conn.send(Message::Binary(msg.into_bytes())).await.unwrap();
// self.webview_websocket_conn.send(WsMessage::Binary(msg.into_bytes())).await.unwrap();
// }
WebviewActorRequest::CursorPaths(jump_info) => {
let json = serde_json::to_string(&jump_info).unwrap();
@ -111,7 +118,7 @@ impl WebviewActor {
};
let Message::Text(msg) = msg else {
info!("WebviewActor: received non-text message from websocket: {:?}", msg);
let _ = self.webview_websocket_conn.send(Message::Text(format!("Webview Actor: error, received non-text message: {}", msg)))
let _ = self.webview_websocket_conn.send(Message::Text(format!("Webview Actor: error, received non-text message: {msg:?}")))
.await;
break;
};
@ -142,8 +149,8 @@ impl WebviewActor {
self.render_sender.send(RenderActorRequest::ResolveSpan(ResolveSpanRequest(path))).unwrap();
};
} else {
info!("WebviewActor: received unknown message from websocket: {}", msg);
self.webview_websocket_conn.send(Message::Text(format!("error, received unknown message: {}", msg))).await.unwrap();
let err = self.webview_websocket_conn.send(Message::Text(format!("error, received unknown message: {}", msg))).await;
info!("WebviewActor: received unknown message from websocket: {msg} {err:?}");
break;
}
}

View file

@ -28,30 +28,6 @@ pub enum RefreshStyle {
#[derive(Debug, Clone)]
#[cfg_attr(feature = "clap", derive(clap::Parser))]
pub struct PreviewArgs {
/// Data plane server will bind to this address
#[cfg_attr(
feature = "clap",
clap(
long = "data-plane-host",
default_value = "127.0.0.1:23625",
value_name = "HOST",
hide(true)
)
)]
pub data_plane_host: String,
/// Control plane server will bind to this address
#[cfg_attr(
feature = "clap",
clap(
long = "control-plane-host",
default_value = "127.0.0.1:23626",
value_name = "HOST",
hide(true)
)
)]
pub control_plane_host: String,
/// Only render visible part of the document. This can improve performance
/// but still being experimental.
#[cfg_attr(feature = "clap", clap(long = "partial-rendering"))]

View file

@ -4,189 +4,154 @@ mod debug_loc;
mod outline;
pub use actor::editor::{
CompileStatus, ControlPlaneMessage, ControlPlaneResponse, LspControlPlaneRx, LspControlPlaneTx,
CompileStatus, ControlPlaneMessage, ControlPlaneResponse, ControlPlaneRx, ControlPlaneTx,
};
pub use args::*;
pub use outline::Outline;
use std::pin::Pin;
use std::time::Duration;
use std::{collections::HashMap, future::Future, path::PathBuf, sync::Arc};
use std::{collections::HashMap, future::Future, path::PathBuf, pin::Pin, sync::Arc};
use futures::SinkExt;
use log::info;
use futures::sink::SinkExt;
use once_cell::sync::OnceCell;
use reflexo_typst::debug_loc::SourceSpanOffset;
use reflexo_typst::Error;
use reflexo_typst::TypstDocument as Document;
use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::WebSocketStream;
use tokio::sync::{broadcast, mpsc};
use typst::{layout::Position, syntax::Span};
use crate::actor::editor::EditorActorRequest;
use crate::actor::render::RenderActorRequest;
use actor::editor::{EditorActor, EditorConnection};
use actor::editor::{EditorActor, EditorActorRequest};
use actor::render::RenderActorRequest;
use actor::typst::{TypstActor, TypstActorRequest};
use actor::webview::WebviewActorRequest;
use debug_loc::SpanInterner;
type StopFuture = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
pub struct Previewer {
frontend_html_factory: Box<dyn Fn(PreviewMode) -> String + Send + Sync>,
stop: Option<Box<dyn FnOnce() -> StopFuture + Send + Sync>>,
data_plane_handle: tokio::task::JoinHandle<()>,
control_plane_handle: tokio::task::JoinHandle<()>,
data_plane_port: u16,
}
impl Previewer {
/// Get the HTML for the frontend by a given preview mode
pub fn frontend_html(&self, mode: PreviewMode) -> String {
(self.frontend_html_factory)(mode)
}
pub fn data_plane_port(&self) -> u16 {
self.data_plane_port
}
/// Join the previewer actors.
pub async fn join(self) {
let _ = tokio::join!(self.data_plane_handle, self.control_plane_handle);
}
pub async fn stop(&mut self) {
if let Some(stop) = self.stop.take() {
stop().await;
}
}
}
type WsError = Error;
type Message = WsMessage;
pub trait CompileHost: SourceFileServer + EditorServer {}
pub async fn preview<T: CompileHost + Send + Sync + 'static>(
arguments: PreviewArgs,
client: Arc<T>,
html: &str,
) -> Previewer {
PreviewBuilder::new(arguments).start(client, html).await
/// Get the HTML for the frontend by a given preview mode and server to connect
pub fn frontend_html(html: &str, mode: PreviewMode, to: &str) -> String {
let mode = match mode {
PreviewMode::Document => "Doc",
PreviewMode::Slide => "Slide",
};
html.replace("ws://127.0.0.1:23625", to).replace(
"preview-arg:previewMode:Doc",
format!("preview-arg:previewMode:{mode}").as_str(),
)
}
async fn preview_<T: CompileHost + Send + Sync + 'static>(
builder: PreviewBuilder,
client: Arc<T>,
html: &str,
/// Shortcut to create a previewer.
pub async fn preview(
arguments: PreviewArgs,
conn: ControlPlaneTx,
client: Arc<impl CompileHost + Send + Sync + 'static>,
) -> Previewer {
let PreviewBuilder {
arguments,
lsp_connection,
typst_mailbox,
renderer_mailbox,
editor_conn,
webview_conn: (webview_tx, _),
doc_sender,
..
} = builder;
let enable_partial_rendering = arguments.enable_partial_rendering;
let invert_colors = arguments.invert_colors;
let idle_timeout = Duration::from_secs(5);
PreviewBuilder::new(arguments).build(conn, client).await
}
// Shared resource
let span_interner = SpanInterner::new();
pub struct Previewer {
stop: Option<Box<dyn FnOnce() -> StopFuture + Send + Sync>>,
data_plane_handle: Option<tokio::task::JoinHandle<()>>,
data_plane_resources: Option<(DataPlane, Option<mpsc::Sender<()>>, mpsc::Receiver<()>)>,
control_plane_handle: tokio::task::JoinHandle<()>,
}
// Spawns the typst actor
let typst_actor = TypstActor::new(
client,
typst_mailbox.1,
renderer_mailbox.0.clone(),
editor_conn.0.clone(),
webview_tx.clone(),
);
tokio::spawn(typst_actor.run());
impl Previewer {
/// Send stop requests to preview actors.
pub async fn stop(&mut self) {
if let Some(stop) = self.stop.take() {
let _ = stop().await;
}
}
log::info!("Previewer: typst actor spawned");
/// Join all the previewer actors. Note: send stop request first.
pub async fn join(mut self) {
let data_plane_handle = self.data_plane_handle.take().expect("must bind data plane");
let _ = tokio::join!(data_plane_handle, self.control_plane_handle);
}
let (data_plane_port_tx, data_plane_port_rx) = oneshot::channel();
let data_plane_addr = arguments.data_plane_host;
let (shutdown_data_plane_tx, mut shutdown_data_plane_rx) = mpsc::channel(1);
let data_plane_handle = {
let span_interner = span_interner.clone();
let typst_tx = typst_mailbox.0.clone();
let webview_tx = webview_tx.clone();
let renderer_tx = renderer_mailbox.0.clone();
let editor_tx = editor_conn.0.clone();
let shutdown_tx = lsp_connection.as_ref().map(|e| e.shutdown_tx.clone());
tokio::spawn(async move {
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&data_plane_addr).await;
let listener = try_socket.expect("Failed to bind");
info!(
"Data plane server listening on: {}",
listener.local_addr().unwrap()
);
let _ = data_plane_port_tx.send(listener.local_addr().unwrap().port());
let (alive_tx, mut alive_rx) = mpsc::unbounded_channel();
let recv = |stream: TcpStream| async {
let span_interner = span_interner.clone();
let webview_tx = webview_tx.clone();
let webview_rx = webview_tx.subscribe();
let typst_tx = typst_tx.clone();
let mut conn = accept_connection(stream).await;
if enable_partial_rendering {
conn.send(Message::Binary("partial-rendering,true".into()))
/// Listen streams that accepting data plane messages.
pub fn start_data_plane<
C: futures::Sink<WsMessage, Error = WsError>
+ futures::Stream<Item = Result<WsMessage, WsError>>
+ Send
+ 'static,
S: 'static,
SFut: Future<Output = S> + Send + 'static,
>(
&mut self,
mut streams: mpsc::UnboundedReceiver<SFut>,
caster: impl Fn(S) -> Result<C, Error> + Send + Sync + Copy + 'static,
) {
let idle_timeout = reflexo_typst::time::Duration::from_secs(5);
let (conn_handler, shutdown_tx, mut shutdown_data_plane_rx) =
self.data_plane_resources.take().unwrap();
let (alive_tx, mut alive_rx) = mpsc::unbounded_channel::<()>();
let recv = move |conn| {
let h = conn_handler.clone();
let alive_tx = alive_tx.clone();
tokio::spawn(async move {
let conn: C = caster(conn.await).unwrap();
tokio::pin!(conn);
if h.enable_partial_rendering {
conn.send(WsMessage::Binary("partial-rendering,true".into()))
.await
.unwrap();
}
if !invert_colors.is_empty() {
conn.send(Message::Binary(
format!("invert-colors,{}", invert_colors).into(),
if !h.invert_colors.is_empty() {
conn.send(WsMessage::Binary(
format!("invert-colors,{}", h.invert_colors).into(),
))
.await
.unwrap();
}
let actor::webview::Channels { svg } =
actor::webview::WebviewActor::set_up_channels();
actor::webview::WebviewActor::<'_, C>::set_up_channels();
let webview_actor = actor::webview::WebviewActor::new(
conn,
svg.1,
webview_tx.clone(),
webview_rx,
editor_tx.clone(),
renderer_tx.clone(),
h.webview_tx.clone(),
h.webview_tx.subscribe(),
h.editor_tx.clone(),
h.renderer_tx.clone(),
);
let alive_tx = alive_tx.clone();
tokio::spawn(async move {
struct FinallySend(mpsc::UnboundedSender<()>);
impl Drop for FinallySend {
fn drop(&mut self) {
let _ = self.0.send(());
}
}
let _send = FinallySend(alive_tx);
webview_actor.run().await;
});
let render_actor = actor::render::RenderActor::new(
renderer_tx.subscribe(),
doc_sender.clone(),
typst_tx,
h.renderer_tx.subscribe(),
h.doc_sender.clone(),
h.typst_tx,
svg.0,
webview_tx,
h.webview_tx,
);
tokio::spawn(render_actor.run());
let outline_render_actor = actor::render::OutlineRenderActor::new(
renderer_tx.subscribe(),
doc_sender.clone(),
editor_tx.clone(),
span_interner,
h.renderer_tx.subscribe(),
h.doc_sender.clone(),
h.editor_tx.clone(),
h.span_interner,
);
tokio::spawn(outline_render_actor.run());
};
struct FinallySend(mpsc::UnboundedSender<()>);
impl Drop for FinallySend {
fn drop(&mut self) {
let _ = self.0.send(());
}
}
let _send = FinallySend(alive_tx);
webview_actor.run().await;
})
};
let data_plane_handle = tokio::spawn(async move {
let mut alive_cnt = 0;
let mut shutdown_bell = tokio::time::interval(idle_timeout);
loop {
@ -195,88 +160,30 @@ async fn preview_<T: CompileHost + Send + Sync + 'static>(
}
tokio::select! {
Some(()) = shutdown_data_plane_rx.recv() => {
info!("Data plane server shutdown");
log::info!("Data plane server shutdown");
return;
}
Ok((stream, _)) = listener.accept() => {
Some(stream) = streams.recv() => {
alive_cnt += 1;
recv(stream).await;
tokio::spawn(recv(stream));
},
_ = alive_rx.recv() => {
alive_cnt -= 1;
}
_ = shutdown_bell.tick(), if alive_cnt == 0 && shutdown_tx.is_some() => {
let shutdown_tx = shutdown_tx.expect("scheduled shutdown without shutdown_tx");
info!("Data plane server has been idle for {idle_timeout:?}, shutting down.");
log::info!(
"Data plane server has been idle for {idle_timeout:?}, shutting down."
);
let _ = shutdown_tx.send(()).await;
info!("Data plane server shutdown");
log::info!("Data plane server shutdown");
return;
}
}
}
})
};
});
let control_plane_addr = arguments.control_plane_host;
let control_plane_handle = {
let span_interner = span_interner.clone();
let typst_tx = typst_mailbox.0.clone();
let editor_rx = editor_conn.1;
tokio::spawn(async move {
let conn = if !control_plane_addr.is_empty() {
let try_socket = TcpListener::bind(&control_plane_addr).await;
let listener = try_socket.expect("Failed to bind");
info!(
"Control plane server listening on: {}",
listener.local_addr().unwrap()
);
let (stream, _) = listener.accept().await.unwrap();
let conn = accept_connection(stream).await;
EditorConnection::WebSocket(conn)
} else {
EditorConnection::Lsp(lsp_connection.unwrap())
};
let editor_actor =
EditorActor::new(editor_rx, conn, typst_tx, webview_tx, span_interner);
editor_actor.run().await;
info!("Control plane client shutdown");
})
};
let data_plane_port = data_plane_port_rx.await.unwrap();
let html = html.replace(
"ws://127.0.0.1:23625",
format!("ws://127.0.0.1:{data_plane_port}").as_str(),
);
// previewMode
let frontend_html_factory = Box::new(move |mode| -> String {
let mode = match mode {
PreviewMode::Document => "Doc",
PreviewMode::Slide => "Slide",
};
html.replace(
"preview-arg:previewMode:Doc",
format!("preview-arg:previewMode:{mode}").as_str(),
)
});
let editor_tx = editor_conn.0;
let stop = move || -> StopFuture {
Box::pin(async move {
let _ = shutdown_data_plane_tx.send(()).await;
let _ = editor_tx.send(EditorActorRequest::Shutdown);
})
};
Previewer {
frontend_html_factory,
data_plane_handle,
control_plane_handle,
data_plane_port,
stop: Some(Box::new(stop)),
self.data_plane_handle = Some(data_plane_handle);
}
}
@ -285,8 +192,7 @@ type BroadcastChannel<T> = (broadcast::Sender<T>, broadcast::Receiver<T>);
pub struct PreviewBuilder {
arguments: PreviewArgs,
lsp_connection: Option<LspControlPlaneTx>,
shutdown_tx: Option<mpsc::Sender<()>>,
typst_mailbox: MpScChannel<TypstActorRequest>,
renderer_mailbox: BroadcastChannel<RenderActorRequest>,
editor_conn: MpScChannel<EditorActorRequest>,
@ -300,7 +206,7 @@ impl PreviewBuilder {
pub fn new(arguments: PreviewArgs) -> Self {
Self {
arguments,
lsp_connection: None,
shutdown_tx: None,
typst_mailbox: mpsc::unbounded_channel(),
renderer_mailbox: broadcast::channel(1024),
editor_conn: mpsc::unbounded_channel(),
@ -310,8 +216,8 @@ impl PreviewBuilder {
}
}
pub fn with_lsp_connection(mut self, lsp_connection: Option<LspControlPlaneTx>) -> Self {
self.lsp_connection = lsp_connection;
pub fn with_shutdown_tx(mut self, shutdown_tx: mpsc::Sender<()>) -> Self {
self.shutdown_tx = Some(shutdown_tx);
self
}
@ -327,14 +233,81 @@ impl PreviewBuilder {
})
}
pub async fn start<T>(self, client: Arc<T>, html: &str) -> Previewer
pub async fn build<T>(self, conn: ControlPlaneTx, client: Arc<T>) -> Previewer
where
T: CompileHost + Send + Sync + 'static,
{
preview_(self, client, html).await
let PreviewBuilder {
arguments,
shutdown_tx,
typst_mailbox,
renderer_mailbox,
editor_conn: (editor_tx, editor_rx),
webview_conn: (webview_tx, _),
doc_sender,
..
} = self;
// Shared resource
let span_interner = SpanInterner::new();
let (shutdown_data_plane_tx, shutdown_data_plane_rx) = mpsc::channel(1);
// Spawns the typst actor
let typst_actor = TypstActor::new(
client,
typst_mailbox.1,
renderer_mailbox.0.clone(),
editor_tx.clone(),
webview_tx.clone(),
);
tokio::spawn(typst_actor.run());
log::info!("Previewer: typst actor spawned");
// Spawns the editor actor
let editor_actor = EditorActor::new(
editor_rx,
conn,
typst_mailbox.0.clone(),
webview_tx.clone(),
span_interner.clone(),
);
let control_plane_handle = tokio::spawn(editor_actor.run());
log::info!("Previewer: editor actor spawned");
// Delayed data plane binding
let data_plane = DataPlane {
span_interner: span_interner.clone(),
webview_tx: webview_tx.clone(),
typst_tx: typst_mailbox.0.clone(),
editor_tx: editor_tx.clone(),
invert_colors: arguments.invert_colors.clone(),
renderer_tx: renderer_mailbox.0.clone(),
enable_partial_rendering: arguments.enable_partial_rendering,
doc_sender,
};
Previewer {
control_plane_handle,
data_plane_handle: None,
data_plane_resources: Some((data_plane, shutdown_tx, shutdown_data_plane_rx)),
stop: Some(Box::new(move || {
Box::pin(async move {
let _ = shutdown_data_plane_tx.send(()).await;
let _ = editor_tx.send(EditorActorRequest::Shutdown);
})
})),
}
}
}
#[derive(Debug)]
pub enum WsMessage {
/// A text WebSocket message
Text(String),
/// A binary WebSocket message
Binary(Vec<u8>),
}
pub type SourceLocation = reflexo_typst::debug_loc::SourceLocation;
pub enum Location {
@ -473,16 +446,14 @@ impl CompileWatcher {
}
}
async fn accept_connection(stream: TcpStream) -> WebSocketStream<TcpStream> {
let addr = stream
.peer_addr()
.expect("connected streams should have a peer address");
info!("Peer address: {}", addr);
let ws_stream = tokio_tungstenite::accept_async(stream)
.await
.expect("Error during the websocket handshake occurred");
info!("New WebSocket connection: {}", addr);
ws_stream
#[derive(Clone)]
struct DataPlane {
span_interner: SpanInterner,
webview_tx: broadcast::Sender<WebviewActorRequest>,
typst_tx: mpsc::UnboundedSender<TypstActorRequest>,
editor_tx: mpsc::UnboundedSender<EditorActorRequest>,
enable_partial_rendering: bool,
invert_colors: String,
renderer_tx: broadcast::Sender<RenderActorRequest>,
doc_sender: Arc<std::sync::RwLock<Option<Arc<Document>>>>,
}