feat: schedule async tasks when running the server on bare-metals (#2040)

This commit is contained in:
Myriad-Dreamin 2025-08-18 15:00:04 +08:00 committed by GitHub
parent 310c911996
commit d227ad2a53
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 96 additions and 51 deletions

View file

@ -50,6 +50,9 @@ pub struct EditorActor {
diagnostics: HashMap<Url, HashMap<ProjectInsId, EcoVec<Diagnostic>>>, diagnostics: HashMap<Url, HashMap<ProjectInsId, EcoVec<Diagnostic>>>,
/// The map from project ID to the affected files. /// The map from project ID to the affected files.
affect_map: HashMap<ProjectInsId, Vec<Url>>, affect_map: HashMap<ProjectInsId, Vec<Url>>,
/// The local state.
status: StatusAll,
} }
impl EditorActor { impl EditorActor {
@ -65,21 +68,34 @@ impl EditorActor {
diagnostics: HashMap::new(), diagnostics: HashMap::new(),
affect_map: HashMap::new(), affect_map: HashMap::new(),
config: EditorActorConfig { notify_status }, config: EditorActorConfig { notify_status },
status: StatusAll {
status: CompileStatusEnum::Compiling,
path: "".to_owned(),
page_count: 0,
words_count: None,
},
} }
} }
/// Runs the editor actor in background. It exits when the editor channel /// Runs the editor actor in background. It exits when the editor channel
/// is closed. /// is closed.
pub async fn run(mut self) { pub async fn run(mut self) {
// The local state.
let mut status = StatusAll {
status: CompileStatusEnum::Compiling,
path: "".to_owned(),
page_count: 0,
words_count: None,
};
while let Some(req) = self.editor_rx.recv().await { while let Some(req) = self.editor_rx.recv().await {
self.handle(req);
}
log::info!("editor actor is stopped");
}
#[cfg(not(feature = "system"))]
pub fn step(&mut self) {
while let Ok(req) = self.editor_rx.try_recv() {
self.handle(req);
}
}
fn handle(&mut self, req: EditorRequest) {
match req { match req {
EditorRequest::Config(config) => { EditorRequest::Config(config) => {
log::info!("received config request: {config:?}"); log::info!("received config request: {config:?}");
@ -91,42 +107,37 @@ impl EditorActor {
diagnostics.as_ref().map(|files| files.len()) diagnostics.as_ref().map(|files| files.len())
); );
self.publish(version.id, diagnostics).await; self.publish(version.id, diagnostics);
} }
EditorRequest::Status(compile_status) => { EditorRequest::Status(compile_status) => {
log::trace!("received status request: {compile_status:?}"); log::trace!("received status request: {compile_status:?}");
if self.config.notify_status && compile_status.id == ProjectInsId::PRIMARY { if self.config.notify_status && compile_status.id == ProjectInsId::PRIMARY {
use tinymist_project::CompileStatusEnum::*; use tinymist_project::CompileStatusEnum::*;
status.path = compile_status self.status.path = compile_status
.compiling_id .compiling_id
.map_or_default(|fid| unix_slash(fid.vpath().as_rooted_path())); .map_or_default(|fid| unix_slash(fid.vpath().as_rooted_path()));
status.page_count = compile_status.page_count; self.status.page_count = compile_status.page_count;
status.status = match &compile_status.status { self.status.status = match &compile_status.status {
Compiling => CompileStatusEnum::Compiling, Compiling => CompileStatusEnum::Compiling,
Suspend | CompileSuccess { .. } => CompileStatusEnum::CompileSuccess, Suspend | CompileSuccess { .. } => CompileStatusEnum::CompileSuccess,
ExportError { .. } | CompileError { .. } => { ExportError { .. } | CompileError { .. } => CompileStatusEnum::CompileError,
CompileStatusEnum::CompileError
}
}; };
self.client.send_notification::<StatusAll>(&status); self.client.send_notification::<StatusAll>(&self.status);
} }
} }
EditorRequest::WordCount(id, count) => { EditorRequest::WordCount(id, count) => {
log::trace!("received word count request"); log::trace!("received word count request");
if self.config.notify_status && id == ProjectInsId::PRIMARY { if self.config.notify_status && id == ProjectInsId::PRIMARY {
status.words_count = Some(count); self.status.words_count = Some(count);
self.client.send_notification::<StatusAll>(&status); self.client.send_notification::<StatusAll>(&self.status);
} }
} }
} }
} }
log::info!("editor actor is stopped");
}
/// Publishes diagnostics of a project to the editor. /// Publishes diagnostics of a project to the editor.
pub async fn publish(&mut self, id: ProjectInsId, next_diag: Option<DiagnosticsMap>) { pub fn publish(&mut self, id: ProjectInsId, next_diag: Option<DiagnosticsMap>) {
let affected = match next_diag.as_ref() { let affected = match next_diag.as_ref() {
Some(next_diag) => self Some(next_diag) => self
.affect_map .affect_map

View file

@ -60,6 +60,7 @@ impl ServerState {
.log_error("could not register to watch config changes"); .log_error("could not register to watch config changes");
} }
self.schedule_async();
log::info!("server initialized"); log::info!("server initialized");
Ok(()) Ok(())
} }
@ -94,6 +95,8 @@ impl ServerState {
// Focus after opening // Focus after opening
self.implicit_focus_entry(|| Some(path), 'o'); self.implicit_focus_entry(|| Some(path), 'o');
self.schedule_async();
Ok(()) Ok(())
} }
@ -101,6 +104,7 @@ impl ServerState {
let path = as_path(params.text_document).as_path().into(); let path = as_path(params.text_document).as_path().into();
self.remove_source(path).map_err(invalid_params)?; self.remove_source(path).map_err(invalid_params)?;
self.schedule_async();
Ok(()) Ok(())
} }
@ -110,6 +114,7 @@ impl ServerState {
self.edit_source(path, changes, self.const_config().position_encoding) self.edit_source(path, changes, self.const_config().position_encoding)
.map_err(invalid_params)?; .map_err(invalid_params)?;
self.schedule_async();
Ok(()) Ok(())
} }
@ -117,6 +122,7 @@ impl ServerState {
let path = as_path(params.text_document).as_path().into(); let path = as_path(params.text_document).as_path().into();
self.save_source(path).map_err(invalid_params)?; self.save_source(path).map_err(invalid_params)?;
self.schedule_async();
Ok(()) Ok(())
} }
} }
@ -176,6 +182,7 @@ impl ServerState {
} }
log::info!("new settings applied"); log::info!("new settings applied");
self.schedule_async();
Ok(()) Ok(())
} }

View file

@ -92,8 +92,11 @@ pub struct ServerState {
pub config: Config, pub config: Config,
/// Source synchronized with client /// Source synchronized with client
pub memory_changes: HashMap<Arc<Path>, Source>, pub memory_changes: HashMap<Arc<Path>, Source>,
/// The diagnostics sender to send diagnostics to `crate::actor::cluster`. /// The diagnostics sender to send diagnostics to `crate::actor::cluster`.
pub editor_tx: mpsc::UnboundedSender<EditorRequest>, pub editor_tx: mpsc::UnboundedSender<EditorRequest>,
/// The editor actor state
editor_actor: Option<EditorActor>,
} }
/// Getters and the main loop. /// Getters and the main loop.
@ -147,6 +150,7 @@ impl ServerState {
focusing: None, focusing: None,
implicit_position: None, implicit_position: None,
formatter, formatter,
editor_actor: None,
} }
} }
@ -196,8 +200,14 @@ impl ServerState {
#[cfg(feature = "preview")] #[cfg(feature = "preview")]
server.background_preview(); server.background_preview();
// Run the cluster in the background after we referencing it // Runs the editor actor. If the server is not running in the system, we do not
// spawn the editor actor and run it in the background, but steps it
// in the main thread using `Self::schedule_async`.
if cfg!(feature = "system") {
client.handle.spawn(editor_actor.run()); client.handle.spawn(editor_actor.run());
} else {
server.editor_actor = Some(editor_actor);
}
} }
server server
@ -341,6 +351,20 @@ impl ServerState {
.with_request::<request::Threads>(Self::debug_threads) .with_request::<request::Threads>(Self::debug_threads)
} }
#[cfg(not(feature = "system"))]
/// Schedules the async tasks of the server on some paths. This is used to
/// run the server in passive context, for example, in the web
/// environment where the server is not run in background.
pub(crate) fn schedule_async(&mut self) {
if let Some(editor_actor) = self.editor_actor.as_mut() {
editor_actor.step();
}
}
#[cfg(feature = "system")]
#[inline(always)]
pub(crate) fn schedule_async(&mut self) {}
/// Handles the project interrupts. /// Handles the project interrupts.
fn compile_interrupt<T: Initializer<S = Self>>( fn compile_interrupt<T: Initializer<S = Self>>(
mut state: ServiceState<T, T::S>, mut state: ServiceState<T, T::S>,
@ -354,6 +378,9 @@ impl ServerState {
}; };
ready.project.interrupt(params); ready.project.interrupt(params);
ready.schedule_async();
// log::info!("interrupted in {:?}", _start.elapsed()); // log::info!("interrupted in {:?}", _start.elapsed());
Ok(()) Ok(())
} }