feat: send trace data via http instead of lsp stdio (#660)

This commit is contained in:
Myriad-Dreamin 2024-10-11 15:27:23 +08:00 committed by GitHub
parent 69aef36184
commit ae08ce8723
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 331 additions and 93 deletions

View file

@ -25,7 +25,7 @@ pub enum Commands {
Query(QueryCommands),
/// Runs language server for tracing some typst program.
#[clap(hide(true))]
TraceLsp(CompileArgs),
TraceLsp(TraceLspArgs),
/// Runs preview server
#[cfg(feature = "preview")]
Preview(tinymist::tool::preview::PreviewCliArgs),
@ -118,9 +118,12 @@ impl clap_complete::Generator for Shell {
}
#[derive(Debug, Clone, Default, clap::Parser)]
pub struct CompileArgs {
pub struct TraceLspArgs {
#[clap(long, default_value = "false")]
pub persist: bool,
// lsp or http
#[clap(long, default_value = "lsp")]
pub rpc_kind: String,
#[clap(flatten)]
pub mirror: MirrorArgs,
#[clap(flatten)]

View file

@ -443,13 +443,12 @@ impl LanguageState {
main,
inputs: snap.world.inputs().as_ref().deref().clone(),
font_paths: snap.world.font_resolver.font_paths().to_owned(),
rpc_kind: "http".into(),
})?;
tokio::pin!(task);
task.as_mut().await;
let resp = task.take_output().unwrap()?;
serde_json::to_value(resp).map_err(|e| internal_error(e.to_string()))
task.take_output().unwrap()
})
}

View file

@ -24,6 +24,7 @@ mod init;
mod resource;
mod server;
mod task;
pub use task::UserActionTask;
pub mod tool;
mod utils;

View file

@ -17,18 +17,16 @@ use comemo::Prehashed;
use futures::future::MaybeDone;
use lsp_server::RequestId;
use once_cell::sync::Lazy;
use reflexo_typst::{
package::PackageSpec, typst::prelude::EcoVec, CompileEnv, Compiler, TaskInputs, TypstDict,
};
use reflexo_typst::{package::PackageSpec, TaskInputs, TypstDict};
use serde_json::Value as JsonValue;
use sync_lsp::{
internal_error,
transport::{with_stdio_transport, MirrorArgs},
LspBuilder, LspClientRoot, LspResult,
};
use tinymist::{CompileConfig, Config, LanguageState, LspWorld, RegularInit, SuperInit};
use tinymist::{CompileConfig, Config, LanguageState, RegularInit, SuperInit, UserActionTask};
use tinymist_query::docs::PackageInfo;
use typst::{eval::Tracer, foundations::IntoValue, syntax::Span, World};
use typst::foundations::IntoValue;
use crate::args::*;
@ -80,7 +78,7 @@ fn main() -> anyhow::Result<()> {
Commands::Completion(args) => completion(args),
Commands::Query(query_cmds) => query_main(query_cmds),
Commands::Lsp(args) => lsp_main(args),
Commands::TraceLsp(args) => trace_main(args),
Commands::TraceLsp(args) => trace_lsp_main(args),
#[cfg(feature = "preview")]
Commands::Preview(args) => {
#[cfg(feature = "preview")]
@ -133,7 +131,7 @@ pub fn lsp_main(args: LspArgs) -> anyhow::Result<()> {
}
/// The main entry point for the compiler.
pub fn trace_main(args: CompileArgs) -> anyhow::Result<()> {
pub fn trace_lsp_main(args: TraceLspArgs) -> anyhow::Result<()> {
let mut input = PathBuf::from(match args.compile.input {
Some(value) => value,
None => return Err(anyhow::anyhow!("provide a valid path")),
@ -218,45 +216,7 @@ pub fn trace_main(args: CompileArgs) -> anyhow::Result<()> {
inputs: Some(inputs),
});
let mut env = CompileEnv {
tracer: Some(Tracer::default()),
..Default::default()
};
typst_timing::enable();
let mut errors = EcoVec::new();
if let Err(e) = std::marker::PhantomData.compile(&w, &mut env) {
errors = e;
}
let mut writer = std::io::BufWriter::new(Vec::new());
let _ = typst_timing::export_json(&mut writer, |span| {
resolve_span(&w, span).unwrap_or_else(|| ("unknown".to_string(), 0))
});
let timings = String::from_utf8(writer.into_inner().unwrap()).unwrap();
let warnings = env.tracer.map(|e| e.warnings());
let diagnostics = state.primary().handle.run_analysis(&w, |ctx| {
tinymist_query::convert_diagnostics(
ctx,
warnings.iter().flatten().chain(errors.iter()),
)
});
let diagnostics = diagnostics.unwrap_or_default();
client.send_notification_(lsp_server::Notification {
method: "tinymistExt/diagnostics".to_owned(),
params: serde_json::json!(diagnostics),
});
client.respond(lsp_server::Response {
id: req_id,
result: Some(serde_json::json!({
"tracingData": timings,
})),
error: None,
});
UserActionTask::trace_main(client, state, &w, args.rpc_kind, req_id).await
});
Ok(())
@ -327,12 +287,3 @@ pub fn query_main(cmds: QueryCommands) -> anyhow::Result<()> {
Ok(())
}
/// Turns a span into a (file, line) pair.
fn resolve_span(world: &LspWorld, span: Span) -> Option<(String, u32)> {
let id = span.id()?;
let source = world.source(id).ok()?;
let range = source.range(span)?;
let line = source.byte_to_line(range.start)?;
Some((format!("{id:?}"), line as u32 + 1))
}

View file

@ -4,11 +4,17 @@ use std::path::PathBuf;
use anyhow::bail;
use base64::Engine;
use reflexo_typst::TypstDict;
use hyper::service::service_fn;
use hyper_util::{rt::TokioIo, server::graceful::GracefulShutdown};
use lsp_server::RequestId;
use reflexo_typst::{typst::prelude::EcoVec, CompileEnv, Compiler, TypstDict};
use serde::{Deserialize, Serialize};
use sync_lsp::{just_future, SchedulableResponse};
use serde_json::Value as JsonValue;
use sync_lsp::{just_future, LspClient, SchedulableResponse};
use tinymist_world::LspWorld;
use typst::{eval::Tracer, syntax::Span, World};
use crate::internal_error;
use crate::{internal_error, LanguageState};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@ -18,30 +24,53 @@ pub struct TraceParams {
pub main: PathBuf,
pub inputs: TypstDict,
pub font_paths: Vec<PathBuf>,
pub rpc_kind: String,
}
/// The user action task.
#[derive(Default, Clone, Copy)]
pub struct UserActionTask;
impl UserActionTask {
pub fn trace(&self, params: TraceParams) -> SchedulableResponse<TraceReport> {
/// Run a trace.
pub fn trace(&self, params: TraceParams) -> SchedulableResponse<JsonValue> {
just_future(async move {
run_trace_program(params)
.await
.map_err(|e| internal_error(format!("failed to run trace program: {e:?}")))
})
}
/// Run a trace request in subprocess.
pub async fn trace_main(
client: LspClient,
state: &mut LanguageState,
w: &LspWorld,
rpc_kind: String,
req_id: RequestId,
) -> ! {
trace_main(client, state, w, rpc_kind, req_id).await
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceReport {
request: TraceParams,
messages: Vec<lsp_server::Message>,
stderr: String,
}
/// Run a perf trace to some typst program
async fn run_trace_program(params: TraceParams) -> anyhow::Result<TraceReport> {
async fn run_trace_program(params: TraceParams) -> anyhow::Result<JsonValue> {
// Typst compile root, input, font paths, inputs
let mut cmd = tokio::process::Command::new(&params.compiler_program);
let mut cmd = std::process::Command::new(&params.compiler_program);
let mut cmd = &mut cmd;
cmd = cmd.arg("trace-lsp");
cmd = cmd
.arg("--rpc-kind")
.arg(&params.rpc_kind)
.arg("--root")
.arg(params.root.as_path())
.arg(params.main.as_path());
@ -58,37 +87,285 @@ async fn run_trace_program(params: TraceParams) -> anyhow::Result<TraceReport> {
}
log::info!("running trace program: {cmd:?}");
let start = reflexo::time::Instant::now();
let output = cmd.output().await;
let output = output.expect("trace program command failed to start");
let stdout = output.stdout;
let stderr = output.stderr;
let mut child = cmd
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.expect("trace program command failed to start");
log::info!("trace program executed");
let stdout = child.stdout.take().expect("stdout missing");
let mut input_chan = std::io::Cursor::new(stdout);
let messages = std::iter::from_fn(|| {
if input_chan.position() == input_chan.get_ref().len() as u64 {
return None;
}
let msg = lsp_server::Message::read(&mut input_chan).ok()?;
Some(msg)
})
.flatten()
.collect::<Vec<_>>();
let (msg_tx, msg_rx) = tokio::sync::oneshot::channel();
std::thread::spawn(move || {
let mut input_chan = std::io::BufReader::new(stdout);
let mut has_response = false;
let messages = std::iter::from_fn(|| {
if has_response {
return None;
}
let msg = lsp_server::Message::read(&mut input_chan).ok()?;
if let Some(lsp_server::Message::Response(resp)) = &msg {
if resp.id == 0.into() {
has_response = true;
}
}
Some(msg)
})
.flatten()
.collect::<Vec<_>>();
msg_tx.send(messages).ok();
});
let stderr = base64::engine::general_purpose::STANDARD.encode(stderr);
let messages = msg_rx.await.unwrap();
Ok(TraceReport {
log::info!("trace program executed in {:?}", start.elapsed());
let start = reflexo::time::Instant::now();
let res = serde_json::to_value(TraceReport {
request: params,
messages,
stderr,
})
stderr: base64::engine::general_purpose::STANDARD.encode(String::new()),
})?;
log::info!("trace result encoded in {:?}", start.elapsed());
std::thread::spawn(move || {
let res = child.wait_with_output();
match res {
Ok(output) => {
log::info!("trace program exited with status: {:?}", output.status);
use std::io::BufRead;
for line in output.stderr.lines() {
let Ok(line) = line else {
continue;
};
log::error!("trace program stderr: {line}");
}
}
Err(e) => {
log::error!("trace program failed with error: {e:?}");
}
}
});
Ok(res)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceReport {
request: TraceParams,
messages: Vec<lsp_server::Message>,
stderr: String,
async fn trace_main(
client: LspClient,
state: &mut LanguageState,
w: &LspWorld,
rpc_kind: String,
req_id: RequestId,
) -> ! {
let mut env = CompileEnv {
tracer: Some(Tracer::default()),
..Default::default()
};
typst_timing::enable();
let mut errors = EcoVec::new();
if let Err(e) = std::marker::PhantomData.compile(w, &mut env) {
errors = e;
}
let mut writer = std::io::BufWriter::new(Vec::new());
let _ = typst_timing::export_json(&mut writer, |span| {
resolve_span(w, span).unwrap_or_else(|| ("unknown".to_string(), 0))
});
let timings = writer.into_inner().unwrap();
let warnings = env.tracer.map(|e| e.warnings());
let diagnostics = state.primary().handle.run_analysis(w, |ctx| {
tinymist_query::convert_diagnostics(ctx, warnings.iter().flatten().chain(errors.iter()))
});
let diagnostics = diagnostics.unwrap_or_default();
let rpc_kind = rpc_kind.as_str();
client.send_notification_(lsp_server::Notification {
method: "tinymistExt/diagnostics".to_owned(),
params: serde_json::json!(diagnostics),
});
match rpc_kind {
"lsp" => {
client.respond(lsp_server::Response {
id: req_id,
result: Some(serde_json::json!({
"tracingData": String::from_utf8(timings).unwrap(),
})),
error: None,
});
}
"http" => {
let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
let t = tokio::spawn(async move {
let static_file_addr = "127.0.0.1:0".to_owned();
make_http_server(timings, static_file_addr, addr_tx).await;
});
let addr = addr_rx.await.unwrap();
client.respond(lsp_server::Response {
id: req_id,
result: Some(serde_json::json!({
"tracingUrl": format!("http://{addr}"),
})),
error: None,
});
t.await.unwrap();
}
kind => {
panic!("unsupported rpc kind: {kind:?}");
}
}
std::process::exit(0);
}
// todo: reuse code from tools preview
/// Create a http server for the trace program.
pub async fn make_http_server(
timings: Vec<u8>,
static_file_addr: String,
addr_tx: tokio::sync::oneshot::Sender<std::net::SocketAddr>,
) -> ! {
use http_body_util::Full;
use hyper::body::{Bytes, Incoming};
type Server = hyper_util::server::conn::auto::Builder<hyper_util::rt::TokioExecutor>;
let (alive_tx, mut alive_rx) = tokio::sync::mpsc::unbounded_channel();
let timings = hyper::body::Bytes::from(timings);
let make_service = move || {
let timings = timings.clone();
let alive_tx = alive_tx.clone();
service_fn(move |req: hyper::Request<Incoming>| {
let timings = timings.clone();
let _ = alive_tx.send(());
async move {
if req.uri().path() == "/" {
let b = hyper::Response::builder()
.header(hyper::header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.header(hyper::header::ACCESS_CONTROL_ALLOW_METHODS, "GET, HEAD")
.header(
hyper::header::ACCESS_CONTROL_ALLOW_HEADERS,
"Origin, X-Requested-With, Content-Type, Accept",
);
let res = if req.method() == hyper::Method::HEAD {
b.body(Full::<Bytes>::default()).unwrap()
} else {
b.header(hyper::header::CONTENT_TYPE, "application/json")
.body(Full::<Bytes>::from(timings))
.unwrap()
};
Ok::<_, std::convert::Infallible>(res)
} else {
// jump to /
let res = hyper::Response::builder()
.status(hyper::StatusCode::FOUND)
.header(hyper::header::LOCATION, "/")
.body(Full::<Bytes>::default())
.unwrap();
Ok(res)
}
}
})
};
let listener = tokio::net::TcpListener::bind(&static_file_addr)
.await
.unwrap();
let addr = listener.local_addr().unwrap();
log::info!("trace server listening on http://{addr}");
let (final_tx, final_rx) = tokio::sync::oneshot::channel();
// the graceful watcher
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
let serve_conn = move |server: &Server, graceful: &GracefulShutdown, conn| {
let (stream, _peer_addr) = match conn {
Ok(conn) => conn,
Err(e) => {
log::error!("accept error: {e}");
return;
}
};
let conn = server.serve_connection(TokioIo::new(stream), make_service());
let conn = graceful.watch(conn.into_owned());
tokio::spawn(async move {
if let Err(err) = conn.await {
log::error!("error serving connection: {err:?}");
}
});
};
let join = tokio::spawn(async move {
// when this signal completes, start shutdown
let mut signal = std::pin::pin!(final_rx);
let mut server = Server::new(hyper_util::rt::TokioExecutor::new());
server.http1().keep_alive(true);
loop {
tokio::select! {
conn = listener.accept() => serve_conn(&server, &graceful, conn),
Ok(_) = &mut signal => {
log::info!("graceful shutdown signal received");
break;
}
}
}
tokio::select! {
_ = graceful.shutdown() => {
log::info!("gracefully shutdown!");
},
_ = tokio::time::sleep(reflexo::time::Duration::from_secs(10)) => {
log::info!("waited 10 seconds for graceful shutdown, aborting...");
}
}
});
// final_tx.send(()).ok();
tokio::spawn(async move {
// timemout alive_rx
loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
log::info!("trace-server: ctrl-c received, shutting down");
final_tx.send(()).ok();
break;
},
_ = tokio::time::sleep(reflexo::time::Duration::from_secs(15)) => {
log::info!("trace-server: No activity for 15 seconds, shutting down");
final_tx.send(()).ok();
break;
},
_ = alive_rx.recv() => {
log::info!("trace-server: Activity detected, resetting timer");
}
}
}
});
addr_tx.send(addr).ok();
join.await.unwrap();
std::process::exit(0);
}
/// Turns a span into a (file, line) pair.
fn resolve_span(world: &LspWorld, span: Span) -> Option<(String, u32)> {
let id = span.id()?;
let source = world.source(id).ok()?;
let range = source.range(span)?;
let line = source.byte_to_line(range.start)?;
Some((format!("{id:?}"), line as u32 + 1))
}

View file

@ -83,7 +83,7 @@ export const Tracing = () => {
);
const since = Date.now();
const collecting = setInterval(() => {
const collecting = setInterval(async () => {
const message = document.getElementById("message")!;
if (!message) {
return;
@ -92,7 +92,7 @@ export const Tracing = () => {
const elapsedAlign = (elapsed / 1000).toFixed(1).padStart(5, " ");
if (traceReport.val) {
console.log(JSON.stringify(traceReport.val));
// console.log(JSON.stringify(traceReport.val));
clearInterval(collecting);
const openTraceButton = document.getElementById(
@ -120,7 +120,14 @@ export const Tracing = () => {
msg = `Error: ${firstResponse.error.message}`;
} else {
msg = "";
tracingContent = enc.encode(firstResponse.result.tracingData).buffer;
if (firstResponse.result.tracingData) {
tracingContent = enc.encode(firstResponse.result.tracingData).buffer;
} else if (firstResponse.result.tracingUrl) {
const response = await fetch(firstResponse.result.tracingUrl);
tracingContent = await response.arrayBuffer();
} else {
msg = "No trace data or url found in response";
}
}
if (!firstResponse) {