dev: remove threaded_receive uses (#391)

This commit is contained in:
Myriad-Dreamin 2024-07-09 12:37:33 +08:00 committed by GitHub
parent 30a446086e
commit 7cf100ec96
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 172 additions and 223 deletions

View file

@ -379,8 +379,7 @@ fn analyze_dyn_signature_inner(func: Func) -> Arc<PrimarySignature> {
let sig_ty = SigTy::new(
pos.iter().map(|e| e.base_type.clone()),
named_vec,
rest.as_ref()
.map(|e| e.base_type.clone()),
rest.as_ref().map(|e| e.base_type.clone()),
ret_ty.clone(),
);
Arc::new(PrimarySignature {

View file

@ -2,7 +2,7 @@ use once_cell::sync::OnceCell;
use crate::{
adt::interner::Interned,
analysis::{analyze_dyn_signature, find_definition, },
analysis::{analyze_dyn_signature, find_definition},
prelude::*,
syntax::{get_check_target, get_deref_target, CheckTarget, ParamTarget},
DocTooltip, LspParamInfo, SemanticRequest,

View file

@ -35,8 +35,8 @@ impl LanguageState {
self.compile_config().determine_inputs(),
self.vfs_snapshot(),
);
if let Some(mut previous_server) = self.primary.replace(server) {
std::thread::spawn(move || previous_server.settle());
if let Some(mut prev) = self.primary.replace(server) {
self.client.handle.spawn(async move { prev.settle().await });
}
}

View file

@ -62,7 +62,6 @@ use super::{
use crate::{
task::{ExportConfig, ExportSignal, ExportTask},
tool::preview::CompileStatus,
utils::{self, threaded_receive},
world::{LspCompilerFeat, LspWorld},
CompileConfig,
};
@ -97,17 +96,6 @@ impl CompileHandler {
})
}
/// Snapshot the compiler thread for tasks
pub fn sync_snapshot(&self) -> ZResult<CompileSnapshot<LspCompilerFeat>> {
let (tx, rx) = oneshot::channel();
self.intr_tx
.send(Interrupt::Snapshot(tx))
.map_err(map_string_err("failed to send snapshot request"))?;
threaded_receive(rx).map_err(map_string_err("failed to get snapshot"))
}
pub fn add_memory_changes(&self, event: MemoryEvent) {
let _ = self.intr_tx.send(Interrupt::Memory(event));
}
@ -344,11 +332,6 @@ impl CompileClientActor {
self.handle.clone().snapshot()
}
/// Snapshot the compiler thread for tasks
pub fn sync_snapshot(&self) -> ZResult<CompileSnapshot<LspCompilerFeat>> {
self.handle.sync_snapshot()
}
pub fn add_memory_changes(&self, event: MemoryEvent) {
self.handle.add_memory_changes(event);
}
@ -388,12 +371,12 @@ impl CompileClientActor {
}
impl CompileClientActor {
pub fn settle(&mut self) {
pub async fn settle(&mut self) {
let _ = self.change_entry(None);
info!("TypstActor({}): settle requested", self.handle.diag_group);
let (tx, rx) = oneshot::channel();
let _ = self.handle.intr_tx.send(Interrupt::Settle(tx));
match utils::threaded_receive(rx) {
match rx.await {
Ok(()) => info!("TypstActor({}): settled", self.handle.diag_group),
Err(err) => error!(
"TypstActor({}): failed to settle: {err:#}",
@ -432,10 +415,12 @@ impl CompileClientActor {
self.handle.analysis.clear_cache();
}
pub fn collect_server_info(&self) -> anyhow::Result<HashMap<String, ServerInfoResponse>> {
pub fn collect_server_info(&self) -> QueryFuture {
let dg = self.handle.diag_group.clone();
let snap = self.sync_snapshot()?;
let snap = self.snapshot()?;
just_future!(async move {
let snap = snap.snapshot().await?;
let w = &snap.world;
let info = ServerInfoResponse {
@ -450,7 +435,9 @@ impl CompileClientActor {
]),
};
Ok(HashMap::from_iter([(dg, info)]))
let info = Some(HashMap::from_iter([(dg, info)]));
Ok(tinymist_query::CompilerQueryResponse::ServerInfo(info))
})
}
}
@ -470,14 +457,4 @@ impl QuerySnap {
.await
.clone()
}
/// Snapshot the compiler thread for tasks
pub fn snapshot_sync(&self) -> ZResult<CompileSnapshot<LspCompilerFeat>> {
if let Some(snap) = self.snap.get() {
return snap.clone();
}
let rx = self.rx.lock().take().unwrap();
threaded_receive(rx).map_err(map_string_err("failed to get snapshot"))
}
}

View file

@ -166,7 +166,10 @@ impl LanguageState {
let from_source = get_arg!(args[0] as String);
let to_path = get_arg!(args[1] as Option<PathBuf>).map(From::from);
let snap = self.primary().sync_snapshot().map_err(z_internal_error)?;
let snap = self.primary().snapshot().map_err(z_internal_error)?;
just_future!(async move {
let snap = snap.snapshot().await.map_err(z_internal_error)?;
// Parse the package specification. If the user didn't specify the version,
// we try to figure it out automatically by downloading the package index
@ -197,9 +200,9 @@ impl LanguageState {
log::info!("template initialized: {from_source:?} to {to_path:?}");
let res = serde_json::to_value(InitResult { entry_path })
.map_err(|_| internal_error("Cannot serialize path"));
just_result!(res)
serde_json::to_value(InitResult { entry_path })
.map_err(|_| internal_error("Cannot serialize path"))
})
}
/// Get the entry of a template.
@ -208,7 +211,10 @@ impl LanguageState {
let from_source = get_arg!(args[0] as String);
let snap = self.primary().sync_snapshot().map_err(z_internal_error)?;
let snap = self.primary().snapshot().map_err(z_internal_error)?;
just_future!(async move {
let snap = snap.snapshot().await.map_err(z_internal_error)?;
// Parse the package specification. If the user didn't specify the version,
// we try to figure it out automatically by downloading the package index
@ -234,7 +240,8 @@ impl LanguageState {
let entry = String::from_utf8(entry.to_vec())
.map_err(|_| invalid_params("template entry is not a valid UTF-8 string"))?;
just_ok!(JsonValue::String(entry))
Ok(JsonValue::String(entry))
})
}
/// Interact with the code context at the source file.
@ -263,11 +270,7 @@ impl LanguageState {
}
/// Get the trace data of the document.
pub fn get_document_trace(
&mut self,
req_id: RequestId,
mut args: Vec<JsonValue>,
) -> LspResult<Option<()>> {
pub fn get_document_trace(&mut self, mut args: Vec<JsonValue>) -> AnySchedulableResponse {
let path = get_arg!(args[0] as PathBuf).into();
// get path to self program
@ -276,7 +279,11 @@ impl LanguageState {
let entry = self.config.compile.determine_entry(Some(path));
let snap = self.primary().sync_snapshot().map_err(z_internal_error)?;
let snap = self.primary().snapshot().map_err(z_internal_error)?;
let user_action = self.user_action;
just_future!(async move {
let snap = snap.snapshot().await.map_err(z_internal_error)?;
// todo: rootless file
// todo: memory dirty file
@ -291,16 +298,20 @@ impl LanguageState {
)
.map_err(z_internal_error)?;
self.client.schedule(
req_id,
self.user_action.trace(TraceParams {
let task = user_action.trace(TraceParams {
compiler_program: self_path,
root: root.as_ref().to_owned(),
main,
inputs: snap.world.inputs().as_ref().deref().clone(),
font_paths: snap.world.font_resolver.font_paths().to_owned(),
}),
)
})?;
tokio::pin!(task);
task.as_mut().await;
let resp = task.take_output().unwrap()?;
serde_json::to_value(resp).map_err(|e| internal_error(e.to_string()))
})
}
/// Get the metrics of the document.
@ -326,8 +337,8 @@ impl LanguageState {
impl LanguageState {
/// Get the all valid symbols
pub fn resource_symbols(&mut self, _arguments: Vec<JsonValue>) -> AnySchedulableResponse {
let resp = self.get_symbol_resources();
just_result!(resp.map_err(|e| internal_error(e.to_string())))
let snapshot = self.primary().snapshot().map_err(z_internal_error)?;
just_future!(Self::get_symbol_resources(snapshot))
}
/// Get resource preview html

View file

@ -188,7 +188,11 @@ pub fn compiler_main(args: CompileArgs) -> anyhow::Result<()> {
.compile_config()
.determine_entry(Some(input.as_path().into()));
let snap = state.primary().sync_snapshot().unwrap();
let snap = state.primary().snapshot().unwrap();
RUNTIMES.tokio_runtime.block_on(async {
let snap = snap.snapshot().await.unwrap();
let w = snap.world.task(TaskInputs {
entry: Some(entry),
inputs: Some(inputs),
@ -213,7 +217,10 @@ pub fn compiler_main(args: CompileArgs) -> anyhow::Result<()> {
let warnings = env.tracer.map(|e| e.warnings());
let diagnostics = state.primary().handle.run_analysis(&w, |ctx| {
tinymist_query::convert_diagnostics(ctx, warnings.iter().flatten().chain(errors.iter()))
tinymist_query::convert_diagnostics(
ctx,
warnings.iter().flatten().chain(errors.iter()),
)
});
let diagnostics = diagnostics.unwrap_or_default();
@ -230,6 +237,7 @@ pub fn compiler_main(args: CompileArgs) -> anyhow::Result<()> {
})),
error: None,
});
});
Ok(())
})?;

View file

@ -1,8 +1,11 @@
use std::{collections::BTreeMap, path::Path, sync::Arc};
use sync_lsp::LspResult;
use typst_ts_compiler::{ShadowApi, TaskInputs};
use typst_ts_core::{config::compiler::EntryState, font::GlyphId, TypstDocument, TypstFont};
use crate::{actor::typ_client::QuerySnap, z_internal_error};
pub use super::prelude::*;
#[derive(Debug, Serialize, Deserialize)]
@ -159,7 +162,9 @@ static CAT_MAP: Lazy<HashMap<&str, SymCategory>> = Lazy::new(|| {
impl LanguageState {
/// Get the all valid symbols
pub fn get_symbol_resources(&self) -> ZResult<JsonValue> {
pub async fn get_symbol_resources(snap: QuerySnap) -> LspResult<JsonValue> {
let snap = snap.snapshot().await.map_err(z_internal_error)?;
let mut symbols = ResourceSymbolMap::new();
use typst::symbols::{emoji, sym};
populate_scope(sym().scope(), "sym", SymCategory::Misc, &mut symbols);
@ -186,24 +191,26 @@ impl LanguageState {
let symbols_ref = symbols.keys().cloned().collect::<Vec<_>>();
let snapshot = self.primary().sync_snapshot()?;
let font = {
let entry_path: Arc<Path> = Path::new("/._sym_.typ").into();
let new_entry = EntryState::new_rootless(entry_path.clone())
.ok_or_else(|| error_once!("cannot change entry"))?;
.ok_or_else(|| error_once!("cannot change entry"))
.map_err(z_internal_error)?;
let mut forked = snapshot.world.task(TaskInputs {
let mut forked = snap.world.task(TaskInputs {
entry: Some(new_entry),
..Default::default()
});
forked
.map_shadow(&entry_path, math_shaping_text.into_bytes().into())
.map_err(|e| error_once!("cannot map shadow", err: e))?;
.map_err(|e| error_once!("cannot map shadow", err: e))
.map_err(z_internal_error)?;
let sym_doc = std::marker::PhantomData
.compile(&forked, &mut Default::default())
.map_err(|e| error_once!("cannot compile symbols", err: format!("{e:?}")))?;
.map_err(|e| error_once!("cannot compile symbols", err: format!("{e:?}")))
.map_err(z_internal_error)?;
log::debug!("sym doc: {sym_doc:?}");
Some(trait_symbol_fonts(&sym_doc, &symbols_ref))
@ -295,7 +302,9 @@ impl LanguageState {
glyph_defs: glyph_def,
};
serde_json::to_value(resp).context("cannot serialize response")
serde_json::to_value(resp)
.context("cannot serialize response")
.map_err(z_internal_error)
}
}

View file

@ -243,7 +243,7 @@ impl LanguageState {
.with_command("tinymist.doInitTemplate", State::init_template)
.with_command("tinymist.doGetTemplateEntry", State::get_template_entry)
.with_command_("tinymist.interactCodeContext", State::interact_code_context)
.with_command_("tinymist.getDocumentTrace", State::get_document_trace)
.with_command("tinymist.getDocumentTrace", State::get_document_trace)
.with_command_("tinymist.getDocumentMetrics", State::get_document_metrics)
.with_command_("tinymist.getServerInfo", State::get_server_info)
// resources
@ -1030,10 +1030,7 @@ impl LanguageState {
PrepareRename(req) => query_state!(client, PrepareRename, req),
Symbol(req) => query_world!(client, Symbol, req),
DocumentMetrics(req) => query_state!(client, DocumentMetrics, req),
ServerInfo(_) => {
let res = client.collect_server_info()?;
just_ok!(CompilerQueryResponse::ServerInfo(Some(res)))
}
ServerInfo(_) => client.collect_server_info(),
_ => unreachable!(),
}
}

View file

@ -20,7 +20,7 @@ pub struct TraceParams {
pub font_paths: Vec<PathBuf>,
}
#[derive(Default)]
#[derive(Default, Clone, Copy)]
pub struct UserActionTask;
impl UserActionTask {

View file

@ -1,10 +1,6 @@
use core::fmt;
use std::thread;
use lsp_server::{ErrorCode, ResponseError};
use tokio::sync::oneshot;
use typst_ts_core::error::prelude::*;
use typst_ts_core::Error;
#[derive(Clone)]
pub struct Derived<T>(pub T);
@ -64,51 +60,3 @@ pub fn try_or<T>(f: impl FnOnce() -> Option<T>, default: T) -> T {
pub fn try_or_default<T: Default>(f: impl FnOnce() -> Option<T>) -> T {
f().unwrap_or_default()
}
pub fn threaded_receive<T: Send>(f: oneshot::Receiver<T>) -> Result<T, Error> {
// get current async handle
if let Ok(e) = tokio::runtime::Handle::try_current() {
// todo: remove blocking
return thread::scope(|s| {
s.spawn(move || {
e.block_on(f)
.map_err(map_string_err("failed to receive data"))
})
.join()
.map_err(|_| error_once!("failed to join"))?
});
}
f.blocking_recv()
.map_err(map_string_err("failed to recv from receive data"))
}
#[cfg(test)]
mod tests {
fn do_receive() {
let (tx, rx) = tokio::sync::oneshot::channel();
tx.send(1).unwrap();
let res = super::threaded_receive(rx).unwrap();
assert_eq!(res, 1);
}
#[test]
fn test_sync() {
do_receive();
}
#[test]
fn test_single_threaded() {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { do_receive() });
}
#[test]
fn test_multiple_threaded() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { do_receive() });
}
}