diff --git a/Cargo.lock b/Cargo.lock index 824d2c12cd..ed87f39c71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2855,6 +2855,8 @@ dependencies = [ "serde", "thiserror 2.0.12", "tokio", + "tokio-vsock", + "tower-service", ] [[package]] diff --git a/ext/telemetry/Cargo.toml b/ext/telemetry/Cargo.toml index c586614761..f7c9b2df4b 100644 --- a/ext/telemetry/Cargo.toml +++ b/ext/telemetry/Cargo.toml @@ -33,3 +33,7 @@ pin-project.workspace = true serde.workspace = true thiserror.workspace = true tokio.workspace = true +tower-service.workspace = true + +[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dependencies] +tokio-vsock.workspace = true diff --git a/ext/telemetry/lib.rs b/ext/telemetry/lib.rs index 4ff074c2cd..c6c940fda3 100644 --- a/ext/telemetry/lib.rs +++ b/ext/telemetry/lib.rs @@ -499,7 +499,6 @@ mod hyper_client { use std::fmt::Debug; use std::pin::Pin; use std::task::Poll; - use std::task::{self}; use deno_tls::SocketUse; use deno_tls::TlsKey; @@ -509,53 +508,220 @@ mod hyper_client { use deno_tls::load_private_keys; use http_body_util::BodyExt; use http_body_util::Full; - use hyper::body::Body as HttpBody; - use hyper::body::Frame; + use hyper::Uri; use hyper_rustls::HttpsConnector; + use hyper_rustls::MaybeHttpsStream; use hyper_util::client::legacy::Client; + use hyper_util::client::legacy::connect::Connected; use hyper_util::client::legacy::connect::HttpConnector; + use hyper_util::rt::TokioIo; use opentelemetry_http::Bytes; use opentelemetry_http::HttpError; use opentelemetry_http::Request; use opentelemetry_http::Response; use opentelemetry_http::ResponseExt; + use tokio::net::TcpStream; + #[cfg(any(target_os = "linux", target_os = "macos"))] + use tokio_vsock::VsockAddr; + #[cfg(any(target_os = "linux", target_os = "macos"))] + use tokio_vsock::VsockStream; use super::OtelSharedRuntime; - // same as opentelemetry_http::HyperClient except it uses OtelSharedRuntime + #[derive(Debug, thiserror::Error)] + enum Error { + #[error(transparent)] + StdIo(#[from] std::io::Error), + #[error(transparent)] + Box(#[from] Box), + } + + #[derive(Debug, Clone)] + enum Connector { + Http(HttpsConnector), + #[cfg(any(target_os = "linux", target_os = "macos"))] + Vsock(VsockAddr), + } + + #[pin_project::pin_project(project = IOProj)] + enum IO { + Tls(#[pin] TokioIo>>), + #[cfg(any(target_os = "linux", target_os = "macos"))] + Vsock(#[pin] VsockStream), + } + + impl tokio::io::AsyncRead for IO { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + match self.project() { + IOProj::Tls(stream) => stream.poll_read(cx, buf), + #[cfg(any(target_os = "linux", target_os = "macos"))] + IOProj::Vsock(stream) => stream.poll_read(cx, buf), + } + } + } + + impl tokio::io::AsyncWrite for IO { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + match self.project() { + IOProj::Tls(stream) => stream.poll_write(cx, buf), + #[cfg(any(target_os = "linux", target_os = "macos"))] + IOProj::Vsock(stream) => stream.poll_write(cx, buf), + } + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + match self.project() { + IOProj::Tls(stream) => stream.poll_flush(cx), + #[cfg(any(target_os = "linux", target_os = "macos"))] + IOProj::Vsock(stream) => stream.poll_flush(cx), + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + match self.project() { + IOProj::Tls(stream) => stream.poll_shutdown(cx), + #[cfg(any(target_os = "linux", target_os = "macos"))] + IOProj::Vsock(stream) => stream.poll_shutdown(cx), + } + } + + fn is_write_vectored(&self) -> bool { + match self { + IO::Tls(stream) => stream.is_write_vectored(), + #[cfg(any(target_os = "linux", target_os = "macos"))] + IO::Vsock(stream) => stream.is_write_vectored(), + } + } + + fn poll_write_vectored( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + match self.project() { + IOProj::Tls(stream) => stream.poll_write_vectored(cx, bufs), + #[cfg(any(target_os = "linux", target_os = "macos"))] + IOProj::Vsock(stream) => stream.poll_write_vectored(cx, bufs), + } + } + } + + impl hyper_util::client::legacy::connect::Connection for IO { + fn connected(&self) -> Connected { + match self { + Self::Tls(stream) => stream.connected(), + #[cfg(any(target_os = "linux", target_os = "macos"))] + Self::Vsock(_) => Connected::new().proxy(true), + } + } + } + + impl tower_service::Service for Connector { + type Response = TokioIo; + type Error = Error; + type Future = Pin< + Box< + dyn std::future::Future> + + Send, + >, + >; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + match self { + Self::Http(c) => c.poll_ready(cx).map_err(Into::into), + #[cfg(any(target_os = "linux", target_os = "macos"))] + Self::Vsock(_) => Poll::Ready(Ok(())), + } + } + + fn call(&mut self, dst: Uri) -> Self::Future { + let this = self.clone(); + Box::pin(async move { + match this { + Self::Http(mut connector) => { + let stream = connector.call(dst).await?; + Ok(TokioIo::new(IO::Tls(TokioIo::new(stream)))) + } + #[cfg(any(target_os = "linux", target_os = "macos"))] + Self::Vsock(addr) => { + let stream = VsockStream::connect(addr).await?; + Ok(TokioIo::new(IO::Vsock(stream))) + } + } + }) + } + } + #[derive(Debug, Clone)] pub struct HyperClient { - inner: Client, Body>, + inner: Client>, } impl HyperClient { pub fn new() -> deno_core::anyhow::Result { - let ca_certs = match std::env::var("OTEL_EXPORTER_OTLP_CERTIFICATE") { - Ok(path) => vec![std::fs::read(path)?], - _ => vec![], - }; - - let keys = match ( - std::env::var("OTEL_EXPORTER_OTLP_CLIENT_KEY"), - std::env::var("OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE"), - ) { - (Ok(key_path), Ok(cert_path)) => { - let key = std::fs::read(key_path)?; - let cert = std::fs::read(cert_path)?; - - let certs = load_certs(&mut std::io::Cursor::new(cert))?; - let key = load_private_keys(&key)?.into_iter().next().unwrap(); - - TlsKeys::Static(TlsKey(certs, key)) + let connector = if let Ok(addr) = std::env::var("OTEL_DENO_VSOCK") { + #[cfg(not(any(target_os = "linux", target_os = "macos")))] + { + let _ = addr; + deno_core::anyhow::bail!("vsock is not supported on this platform") } - _ => TlsKeys::Null, - }; - let tls_config = - create_client_config(None, ca_certs, None, keys, SocketUse::Http)?; - let mut http_connector = HttpConnector::new(); - http_connector.enforce_http(false); - let connector = HttpsConnector::from((http_connector, tls_config)); + #[cfg(any(target_os = "linux", target_os = "macos"))] + { + let Some((cid, port)) = addr.split_once(':') else { + deno_core::anyhow::bail!("invalid vsock addr"); + }; + let cid = if cid == "-1" { u32::MAX } else { cid.parse()? }; + let port = port.parse()?; + let addr = VsockAddr::new(cid, port); + Connector::Vsock(addr) + } + } else { + let ca_certs = match std::env::var("OTEL_EXPORTER_OTLP_CERTIFICATE") { + Ok(path) => vec![std::fs::read(path)?], + _ => vec![], + }; + + let keys = match ( + std::env::var("OTEL_EXPORTER_OTLP_CLIENT_KEY"), + std::env::var("OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE"), + ) { + (Ok(key_path), Ok(cert_path)) => { + let key = std::fs::read(key_path)?; + let cert = std::fs::read(cert_path)?; + + let certs = load_certs(&mut std::io::Cursor::new(cert))?; + let key = load_private_keys(&key)?.into_iter().next().unwrap(); + + TlsKeys::Static(TlsKey(certs, key)) + } + _ => TlsKeys::Null, + }; + + let tls_config = + create_client_config(None, ca_certs, None, keys, SocketUse::Http)?; + let mut http_connector = HttpConnector::new(); + http_connector.enforce_http(false); + let connector = HttpsConnector::from((http_connector, tls_config)); + Connector::Http(connector) + }; Ok(Self { inner: Client::builder(OtelSharedRuntime).build(connector), @@ -570,42 +736,12 @@ mod hyper_client { request: Request>, ) -> Result, HttpError> { let (parts, body) = request.into_parts(); - let request = Request::from_parts(parts, Body(Full::from(body))); - let mut response = self.inner.request(request).await?; - let headers = std::mem::take(response.headers_mut()); - - let mut http_response = Response::builder() - .status(response.status()) - .body(response.into_body().collect().await?.to_bytes())?; - *http_response.headers_mut() = headers; - - Ok(http_response.error_for_status()?) - } - } - - #[pin_project::pin_project] - pub struct Body(#[pin] Full); - - impl HttpBody for Body { - type Data = Bytes; - type Error = Box; - - #[inline] - fn poll_frame( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll, Self::Error>>> { - self.project().0.poll_frame(cx).map_err(Into::into) - } - - #[inline] - fn is_end_stream(&self) -> bool { - self.0.is_end_stream() - } - - #[inline] - fn size_hint(&self) -> hyper::body::SizeHint { - self.0.size_hint() + let request = Request::from_parts(parts, Full::from(body)); + let response = self.inner.request(request).await?; + let (parts, body) = response.into_parts(); + let body = body.collect().await?.to_bytes(); + let response = Response::from_parts(parts, body); + Ok(response.error_for_status()?) } } } diff --git a/tests/specs/cli/otel_basic/__test__.jsonc b/tests/specs/cli/otel_basic/__test__.jsonc index 283ad3b34a..31fe5e0a2b 100644 --- a/tests/specs/cli/otel_basic/__test__.jsonc +++ b/tests/specs/cli/otel_basic/__test__.jsonc @@ -4,6 +4,14 @@ "args": "run -A main.ts basic.ts", "output": "basic.out" }, + "basic_vsock": { + "if": "linux", + "args": "run -A --unstable-vsock main.ts basic.ts", + "envs": { + "OTEL_DENO_VSOCK": "1:4317" + }, + "output": "basic.out" + }, "natural_exit": { "args": "run -A main.ts natural_exit.ts", "output": "natural_exit.out" diff --git a/tests/specs/cli/otel_basic/main.ts b/tests/specs/cli/otel_basic/main.ts index 341a0ecd25..984ac685e3 100644 --- a/tests/specs/cli/otel_basic/main.ts +++ b/tests/specs/cli/otel_basic/main.ts @@ -6,107 +6,120 @@ const data = { metrics: [], }; -const server = Deno.serve( - { +async function handler(req) { + const body = await req.json(); + body.resourceLogs?.forEach((rLogs) => { + rLogs.scopeLogs.forEach((sLogs) => { + data.logs.push(...sLogs.logRecords); + }); + }); + body.resourceSpans?.forEach((rSpans) => { + rSpans.scopeSpans.forEach((sSpans) => { + data.spans.push(...sSpans.spans); + }); + }); + body.resourceMetrics?.forEach((rMetrics) => { + rMetrics.scopeMetrics.forEach((sMetrics) => { + data.metrics.push(...sMetrics.metrics); + }); + }); + return Response.json({ partialSuccess: {} }, { status: 200 }); +} + +let server; + +function onListen({ port }) { + const command = new Deno.Command(Deno.execPath(), { + args: [ + "run", + "--env-file=env_file", + "-A", + "-q", + Deno.args[0], + ], + env: { + // rest of env is in env_file + OTEL_EXPORTER_OTLP_ENDPOINT: `https://localhost:${port}`, + }, + stdout: "null", + }); + const child = command.spawn(); + child.status + .then((status) => { + if (status.signal) { + throw new Error("child process failed: " + JSON.stringify(status)); + } + return server.shutdown(); + }) + .then(() => { + data.logs.sort((a, b) => + Number( + BigInt(a.observedTimeUnixNano) - BigInt(b.observedTimeUnixNano), + ) + ); + data.spans.sort((a, b) => + Number(BigInt(`0x${a.spanId}`) - BigInt(`0x${b.spanId}`)) + ); + // v8js metrics are non-deterministic + data.metrics = data.metrics.filter((m) => !m.name.startsWith("v8js")); + data.metrics.sort((a, b) => a.name.localeCompare(b.name)); + for (const metric of data.metrics) { + if ("histogram" in metric) { + metric.histogram.dataPoints.sort((a, b) => { + const aKey = a.attributes + .sort((x, y) => x.key.localeCompare(y.key)) + .map(({ key, value }) => `${key}:${JSON.stringify(value)}`) + .join("|"); + const bKey = b.attributes + .sort((x, y) => x.key.localeCompare(y.key)) + .map(({ key, value }) => `${key}:${JSON.stringify(value)}`) + .join("|"); + return aKey.localeCompare(bKey); + }); + + for (const dataPoint of metric.histogram.dataPoints) { + dataPoint.attributes.sort((a, b) => { + return a.key.localeCompare(b.key); + }); + } + } + if ("sum" in metric) { + metric.sum.dataPoints.sort((a, b) => { + const aKey = a.attributes + .sort((x, y) => x.key.localeCompare(y.key)) + .map(({ key, value }) => `${key}:${JSON.stringify(value)}`) + .join("|"); + const bKey = b.attributes + .sort((x, y) => x.key.localeCompare(y.key)) + .map(({ key, value }) => `${key}:${JSON.stringify(value)}`) + .join("|"); + return aKey.localeCompare(bKey); + }); + + for (const dataPoint of metric.sum.dataPoints) { + dataPoint.attributes.sort((a, b) => { + return a.key.localeCompare(b.key); + }); + } + } + } + console.log(JSON.stringify(data, null, 2)); + }); +} + +if (Deno.env.get("OTEL_DENO_VSOCK")) { + server = Deno.serve({ + cid: -1, + port: 4317, + onListen, + handler, + }); +} else { + server = Deno.serve({ key: Deno.readTextFileSync("../../../testdata/tls/localhost.key"), cert: Deno.readTextFileSync("../../../testdata/tls/localhost.crt"), port: 0, - onListen({ port }) { - const command = new Deno.Command(Deno.execPath(), { - args: [ - "run", - "--env-file=env_file", - "-A", - "-q", - Deno.args[0], - ], - env: { - // rest of env is in env_file - OTEL_EXPORTER_OTLP_ENDPOINT: `https://localhost:${port}`, - }, - stdout: "null", - }); - const child = command.spawn(); - child.status - .then((status) => { - if (status.signal) { - throw new Error("child process failed: " + JSON.stringify(status)); - } - return server.shutdown(); - }) - .then(() => { - data.logs.sort((a, b) => - Number( - BigInt(a.observedTimeUnixNano) - BigInt(b.observedTimeUnixNano), - ) - ); - data.spans.sort((a, b) => - Number(BigInt(`0x${a.spanId}`) - BigInt(`0x${b.spanId}`)) - ); - // v8js metrics are non-deterministic - data.metrics = data.metrics.filter((m) => !m.name.startsWith("v8js")); - data.metrics.sort((a, b) => a.name.localeCompare(b.name)); - for (const metric of data.metrics) { - if ("histogram" in metric) { - metric.histogram.dataPoints.sort((a, b) => { - const aKey = a.attributes - .sort((x, y) => x.key.localeCompare(y.key)) - .map(({ key, value }) => `${key}:${JSON.stringify(value)}`) - .join("|"); - const bKey = b.attributes - .sort((x, y) => x.key.localeCompare(y.key)) - .map(({ key, value }) => `${key}:${JSON.stringify(value)}`) - .join("|"); - return aKey.localeCompare(bKey); - }); - - for (const dataPoint of metric.histogram.dataPoints) { - dataPoint.attributes.sort((a, b) => { - return a.key.localeCompare(b.key); - }); - } - } - if ("sum" in metric) { - metric.sum.dataPoints.sort((a, b) => { - const aKey = a.attributes - .sort((x, y) => x.key.localeCompare(y.key)) - .map(({ key, value }) => `${key}:${JSON.stringify(value)}`) - .join("|"); - const bKey = b.attributes - .sort((x, y) => x.key.localeCompare(y.key)) - .map(({ key, value }) => `${key}:${JSON.stringify(value)}`) - .join("|"); - return aKey.localeCompare(bKey); - }); - - for (const dataPoint of metric.sum.dataPoints) { - dataPoint.attributes.sort((a, b) => { - return a.key.localeCompare(b.key); - }); - } - } - } - console.log(JSON.stringify(data, null, 2)); - }); - }, - async handler(req) { - const body = await req.json(); - body.resourceLogs?.forEach((rLogs) => { - rLogs.scopeLogs.forEach((sLogs) => { - data.logs.push(...sLogs.logRecords); - }); - }); - body.resourceSpans?.forEach((rSpans) => { - rSpans.scopeSpans.forEach((sSpans) => { - data.spans.push(...sSpans.spans); - }); - }); - body.resourceMetrics?.forEach((rMetrics) => { - rMetrics.scopeMetrics.forEach((sMetrics) => { - data.metrics.push(...sMetrics.metrics); - }); - }); - return Response.json({ partialSuccess: {} }, { status: 200 }); - }, - }, -); + onListen, + handler, + }); +}