feat(otel): support vsock transport for telemetry (#30001)
Some checks are pending
ci / test debug macos-x86_64 (push) Blocked by required conditions
ci / build libs (push) Blocked by required conditions
ci / test debug macos-aarch64 (push) Blocked by required conditions
ci / test release macos-aarch64 (push) Blocked by required conditions
ci / bench release linux-x86_64 (push) Blocked by required conditions
ci / lint debug linux-x86_64 (push) Blocked by required conditions
ci / lint debug macos-x86_64 (push) Blocked by required conditions
ci / lint debug windows-x86_64 (push) Blocked by required conditions
ci / test debug linux-x86_64 (push) Blocked by required conditions
ci / test release linux-x86_64 (push) Blocked by required conditions
ci / test release macos-x86_64 (push) Blocked by required conditions
ci / test debug windows-x86_64 (push) Blocked by required conditions
ci / test release windows-x86_64 (push) Blocked by required conditions
ci / pre-build (push) Waiting to run
ci / test debug linux-aarch64 (push) Blocked by required conditions
ci / test release linux-aarch64 (push) Blocked by required conditions
ci / publish canary (push) Blocked by required conditions

implement new connector which uses either HttpsConnector or VsockStream.
will also be easy to add tunnels to this later.
This commit is contained in:
snek 2025-07-07 15:43:43 +02:00 committed by GitHub
parent 478121e118
commit 50e5c69f18
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 329 additions and 166 deletions

2
Cargo.lock generated
View file

@ -2855,6 +2855,8 @@ dependencies = [
"serde",
"thiserror 2.0.12",
"tokio",
"tokio-vsock",
"tower-service",
]
[[package]]

View file

@ -33,3 +33,7 @@ pin-project.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
tower-service.workspace = true
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dependencies]
tokio-vsock.workspace = true

View file

@ -499,7 +499,6 @@ mod hyper_client {
use std::fmt::Debug;
use std::pin::Pin;
use std::task::Poll;
use std::task::{self};
use deno_tls::SocketUse;
use deno_tls::TlsKey;
@ -509,27 +508,192 @@ mod hyper_client {
use deno_tls::load_private_keys;
use http_body_util::BodyExt;
use http_body_util::Full;
use hyper::body::Body as HttpBody;
use hyper::body::Frame;
use hyper::Uri;
use hyper_rustls::HttpsConnector;
use hyper_rustls::MaybeHttpsStream;
use hyper_util::client::legacy::Client;
use hyper_util::client::legacy::connect::Connected;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::rt::TokioIo;
use opentelemetry_http::Bytes;
use opentelemetry_http::HttpError;
use opentelemetry_http::Request;
use opentelemetry_http::Response;
use opentelemetry_http::ResponseExt;
use tokio::net::TcpStream;
#[cfg(any(target_os = "linux", target_os = "macos"))]
use tokio_vsock::VsockAddr;
#[cfg(any(target_os = "linux", target_os = "macos"))]
use tokio_vsock::VsockStream;
use super::OtelSharedRuntime;
// same as opentelemetry_http::HyperClient except it uses OtelSharedRuntime
#[derive(Debug, thiserror::Error)]
enum Error {
#[error(transparent)]
StdIo(#[from] std::io::Error),
#[error(transparent)]
Box(#[from] Box<dyn std::error::Error + Send + Sync>),
}
#[derive(Debug, Clone)]
enum Connector {
Http(HttpsConnector<HttpConnector>),
#[cfg(any(target_os = "linux", target_os = "macos"))]
Vsock(VsockAddr),
}
#[pin_project::pin_project(project = IOProj)]
enum IO {
Tls(#[pin] TokioIo<MaybeHttpsStream<TokioIo<TcpStream>>>),
#[cfg(any(target_os = "linux", target_os = "macos"))]
Vsock(#[pin] VsockStream),
}
impl tokio::io::AsyncRead for IO {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
match self.project() {
IOProj::Tls(stream) => stream.poll_read(cx, buf),
#[cfg(any(target_os = "linux", target_os = "macos"))]
IOProj::Vsock(stream) => stream.poll_read(cx, buf),
}
}
}
impl tokio::io::AsyncWrite for IO {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
match self.project() {
IOProj::Tls(stream) => stream.poll_write(cx, buf),
#[cfg(any(target_os = "linux", target_os = "macos"))]
IOProj::Vsock(stream) => stream.poll_write(cx, buf),
}
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
match self.project() {
IOProj::Tls(stream) => stream.poll_flush(cx),
#[cfg(any(target_os = "linux", target_os = "macos"))]
IOProj::Vsock(stream) => stream.poll_flush(cx),
}
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
match self.project() {
IOProj::Tls(stream) => stream.poll_shutdown(cx),
#[cfg(any(target_os = "linux", target_os = "macos"))]
IOProj::Vsock(stream) => stream.poll_shutdown(cx),
}
}
fn is_write_vectored(&self) -> bool {
match self {
IO::Tls(stream) => stream.is_write_vectored(),
#[cfg(any(target_os = "linux", target_os = "macos"))]
IO::Vsock(stream) => stream.is_write_vectored(),
}
}
fn poll_write_vectored(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
match self.project() {
IOProj::Tls(stream) => stream.poll_write_vectored(cx, bufs),
#[cfg(any(target_os = "linux", target_os = "macos"))]
IOProj::Vsock(stream) => stream.poll_write_vectored(cx, bufs),
}
}
}
impl hyper_util::client::legacy::connect::Connection for IO {
fn connected(&self) -> Connected {
match self {
Self::Tls(stream) => stream.connected(),
#[cfg(any(target_os = "linux", target_os = "macos"))]
Self::Vsock(_) => Connected::new().proxy(true),
}
}
}
impl tower_service::Service<Uri> for Connector {
type Response = TokioIo<IO>;
type Error = Error;
type Future = Pin<
Box<
dyn std::future::Future<Output = Result<Self::Response, Self::Error>>
+ Send,
>,
>;
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
match self {
Self::Http(c) => c.poll_ready(cx).map_err(Into::into),
#[cfg(any(target_os = "linux", target_os = "macos"))]
Self::Vsock(_) => Poll::Ready(Ok(())),
}
}
fn call(&mut self, dst: Uri) -> Self::Future {
let this = self.clone();
Box::pin(async move {
match this {
Self::Http(mut connector) => {
let stream = connector.call(dst).await?;
Ok(TokioIo::new(IO::Tls(TokioIo::new(stream))))
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
Self::Vsock(addr) => {
let stream = VsockStream::connect(addr).await?;
Ok(TokioIo::new(IO::Vsock(stream)))
}
}
})
}
}
#[derive(Debug, Clone)]
pub struct HyperClient {
inner: Client<HttpsConnector<HttpConnector>, Body>,
inner: Client<Connector, Full<Bytes>>,
}
impl HyperClient {
pub fn new() -> deno_core::anyhow::Result<Self> {
let connector = if let Ok(addr) = std::env::var("OTEL_DENO_VSOCK") {
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
let _ = addr;
deno_core::anyhow::bail!("vsock is not supported on this platform")
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
{
let Some((cid, port)) = addr.split_once(':') else {
deno_core::anyhow::bail!("invalid vsock addr");
};
let cid = if cid == "-1" { u32::MAX } else { cid.parse()? };
let port = port.parse()?;
let addr = VsockAddr::new(cid, port);
Connector::Vsock(addr)
}
} else {
let ca_certs = match std::env::var("OTEL_EXPORTER_OTLP_CERTIFICATE") {
Ok(path) => vec![std::fs::read(path)?],
_ => vec![],
@ -556,6 +720,8 @@ mod hyper_client {
let mut http_connector = HttpConnector::new();
http_connector.enforce_http(false);
let connector = HttpsConnector::from((http_connector, tls_config));
Connector::Http(connector)
};
Ok(Self {
inner: Client::builder(OtelSharedRuntime).build(connector),
@ -570,42 +736,12 @@ mod hyper_client {
request: Request<Vec<u8>>,
) -> Result<Response<Bytes>, HttpError> {
let (parts, body) = request.into_parts();
let request = Request::from_parts(parts, Body(Full::from(body)));
let mut response = self.inner.request(request).await?;
let headers = std::mem::take(response.headers_mut());
let mut http_response = Response::builder()
.status(response.status())
.body(response.into_body().collect().await?.to_bytes())?;
*http_response.headers_mut() = headers;
Ok(http_response.error_for_status()?)
}
}
#[pin_project::pin_project]
pub struct Body(#[pin] Full<Bytes>);
impl HttpBody for Body {
type Data = Bytes;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
#[inline]
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
self.project().0.poll_frame(cx).map_err(Into::into)
}
#[inline]
fn is_end_stream(&self) -> bool {
self.0.is_end_stream()
}
#[inline]
fn size_hint(&self) -> hyper::body::SizeHint {
self.0.size_hint()
let request = Request::from_parts(parts, Full::from(body));
let response = self.inner.request(request).await?;
let (parts, body) = response.into_parts();
let body = body.collect().await?.to_bytes();
let response = Response::from_parts(parts, body);
Ok(response.error_for_status()?)
}
}
}

View file

@ -4,6 +4,14 @@
"args": "run -A main.ts basic.ts",
"output": "basic.out"
},
"basic_vsock": {
"if": "linux",
"args": "run -A --unstable-vsock main.ts basic.ts",
"envs": {
"OTEL_DENO_VSOCK": "1:4317"
},
"output": "basic.out"
},
"natural_exit": {
"args": "run -A main.ts natural_exit.ts",
"output": "natural_exit.out"

View file

@ -6,12 +6,29 @@ const data = {
metrics: [],
};
const server = Deno.serve(
{
key: Deno.readTextFileSync("../../../testdata/tls/localhost.key"),
cert: Deno.readTextFileSync("../../../testdata/tls/localhost.crt"),
port: 0,
onListen({ port }) {
async function handler(req) {
const body = await req.json();
body.resourceLogs?.forEach((rLogs) => {
rLogs.scopeLogs.forEach((sLogs) => {
data.logs.push(...sLogs.logRecords);
});
});
body.resourceSpans?.forEach((rSpans) => {
rSpans.scopeSpans.forEach((sSpans) => {
data.spans.push(...sSpans.spans);
});
});
body.resourceMetrics?.forEach((rMetrics) => {
rMetrics.scopeMetrics.forEach((sMetrics) => {
data.metrics.push(...sMetrics.metrics);
});
});
return Response.json({ partialSuccess: {} }, { status: 200 });
}
let server;
function onListen({ port }) {
const command = new Deno.Command(Deno.execPath(), {
args: [
"run",
@ -88,25 +105,21 @@ const server = Deno.serve(
}
console.log(JSON.stringify(data, null, 2));
});
},
async handler(req) {
const body = await req.json();
body.resourceLogs?.forEach((rLogs) => {
rLogs.scopeLogs.forEach((sLogs) => {
data.logs.push(...sLogs.logRecords);
}
if (Deno.env.get("OTEL_DENO_VSOCK")) {
server = Deno.serve({
cid: -1,
port: 4317,
onListen,
handler,
});
} else {
server = Deno.serve({
key: Deno.readTextFileSync("../../../testdata/tls/localhost.key"),
cert: Deno.readTextFileSync("../../../testdata/tls/localhost.crt"),
port: 0,
onListen,
handler,
});
body.resourceSpans?.forEach((rSpans) => {
rSpans.scopeSpans.forEach((sSpans) => {
data.spans.push(...sSpans.spans);
});
});
body.resourceMetrics?.forEach((rMetrics) => {
rMetrics.scopeMetrics.forEach((sMetrics) => {
data.metrics.push(...sMetrics.metrics);
});
});
return Response.json({ partialSuccess: {} }, { status: 200 });
},
},
);
}