chore: switch to deno_tunnel crate (#30049)

switch to `deno_tunnel` crate for shared code with deploy
This commit is contained in:
snek 2025-07-14 16:11:02 +02:00 committed by GitHub
parent 1dd0db2e16
commit 46009f7368
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 132 additions and 378 deletions

74
Cargo.lock generated
View file

@ -2369,6 +2369,7 @@ dependencies = [
"deno_permissions",
"deno_signals",
"deno_tls",
"deno_tunnel",
"hickory-proto",
"hickory-resolver",
"log",
@ -2978,6 +2979,20 @@ dependencies = [
"tracing",
]
[[package]]
name = "deno_tunnel"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38d54bc70cc874c19c3703d3b3b7d5fe709cbf98ff0aa7e9298825bf0e7c6f74"
dependencies = [
"pin-project",
"quinn",
"serde",
"serde_json",
"thiserror 2.0.12",
"tokio",
]
[[package]]
name = "deno_unsync"
version = "0.4.4"
@ -3848,6 +3863,18 @@ dependencies = [
"tokio-stream",
]
[[package]]
name = "fastbloom"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "27cea6e7f512d43b098939ff4d5a5d6fe3db07971e1d05176fe26c642d33f5b8"
dependencies = [
"getrandom 0.3.3",
"rand 0.9.1",
"siphasher 1.0.1",
"wide",
]
[[package]]
name = "faster-hex"
version = "0.10.0"
@ -5180,6 +5207,17 @@ dependencies = [
"generic-array",
]
[[package]]
name = "io-uring"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013"
dependencies = [
"bitflags 2.8.0",
"cfg-if",
"libc",
]
[[package]]
name = "ipconfig"
version = "0.3.2"
@ -6680,7 +6718,7 @@ version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b"
dependencies = [
"siphasher",
"siphasher 0.3.11",
]
[[package]]
@ -7055,6 +7093,7 @@ checksum = "49df843a9161c85bb8aae55f101bc0bac8bcafd637a620d9122fd7e0b2f7422e"
dependencies = [
"aws-lc-rs",
"bytes",
"fastbloom",
"getrandom 0.3.3",
"lru-slab",
"rand 0.9.1",
@ -7703,6 +7742,15 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad97d4ce1560a5e27cec89519dc8300d1aa6035b099821261c651486a19e44d5"
[[package]]
name = "safe_arch"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96b02de82ddbe1b636e6170c21be622223aea188ef2e139be0a5b219ec215323"
dependencies = [
"bytemuck",
]
[[package]]
name = "saffron"
version = "0.1.0"
@ -8087,6 +8135,12 @@ version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d"
[[package]]
name = "siphasher"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
[[package]]
name = "slab"
version = "0.4.9"
@ -8391,7 +8445,7 @@ dependencies = [
"once_cell",
"rustc-hash 2.1.1",
"serde",
"siphasher",
"siphasher 0.3.11",
"sourcemap",
"swc_allocator",
"swc_atoms",
@ -9150,17 +9204,19 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.45.1"
version = "1.46.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779"
checksum = "0cc3a2344dafbe23a245241fe8b09735b521110d30fcefbbd5feb1797ca35d17"
dependencies = [
"backtrace",
"bytes",
"io-uring",
"libc",
"mio 1.0.3",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"slab",
"socket2",
"tokio-macros",
"windows-sys 0.52.0",
@ -10135,6 +10191,16 @@ dependencies = [
"web-sys",
]
[[package]]
name = "wide"
version = "0.7.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce5da8ecb62bcd8ec8b7ea19f69a51275e91299be594ea5cc6ef7819e16cd03"
dependencies = [
"bytemuck",
"safe_arch",
]
[[package]]
name = "widestring"
version = "1.1.0"

View file

@ -131,6 +131,8 @@ napi_sym = { version = "0.140.0", path = "./ext/napi/sym" }
node_resolver = { version = "0.48.0", path = "./libs/node_resolver" }
test_util = { package = "test_server", path = "./tests/util/server" }
deno_tunnel = "0.2.0"
# widely used libraries
anyhow = "1.0.57"
arc-swap = "1.7"

View file

@ -91,6 +91,7 @@ pub fn init(options: InitLoggingOptions) {
.filter_module("swc_ecma_codegen", log::LevelFilter::Off)
.filter_module("swc_ecma_transforms_optimization", log::LevelFilter::Off)
.filter_module("swc_ecma_parser", log::LevelFilter::Error)
.filter_module("swc_ecma_lexer", log::LevelFilter::Error)
// Suppress span lifecycle logs since they are too verbose
.filter_module("tracing::span", log::LevelFilter::Off)
.filter_module("tower_lsp", log::LevelFilter::Trace)

View file

@ -903,29 +903,41 @@ async fn initialize_tunnel(
let cert_store_provider = factory.root_cert_store_provider();
let root_cert_store = cert_store_provider.get_or_try_init()?.clone();
let tls_config = deno_runtime::deno_tls::create_client_config(
Some(root_cert_store),
vec![],
None,
deno_runtime::deno_tls::TlsKeys::Null,
deno_runtime::deno_tls::SocketUse::GeneralSsl,
)?;
let (tunnel, metadata, mut events) =
match deno_runtime::deno_net::tunnel::TunnelListener::connect(
match deno_runtime::deno_net::tunnel::TunnelConnection::connect(
addr,
hostname,
Some(root_cert_store.clone()),
token,
org.clone(),
app.clone(),
tls_config.clone(),
deno_runtime::deno_net::tunnel::Authentication::App {
token,
org: org.clone(),
app: app.clone(),
},
)
.await
{
Ok(res) => res,
Err(deno_runtime::deno_net::tunnel::Error::InvalidToken) => {
Err(deno_runtime::deno_net::tunnel::Error::Unauthorized) => {
tools::deploy::get_token_entry()?.delete_credential()?;
let token = auth_tunnel().await?;
deno_runtime::deno_net::tunnel::TunnelListener::connect(
deno_runtime::deno_net::tunnel::TunnelConnection::connect(
addr,
hostname,
Some(root_cert_store),
token,
org,
app,
tls_config,
deno_runtime::deno_net::tunnel::Authentication::App {
token,
org,
app,
},
)
.await?
}

View file

@ -20,6 +20,7 @@ deno_features.workspace = true
deno_permissions.workspace = true
deno_signals.workspace = true
deno_tls.workspace = true
deno_tunnel.workspace = true
hickory-proto.workspace = true
hickory-resolver.workspace = true
log.workspace = true

View file

@ -864,7 +864,7 @@ pub async fn op_net_accept_tunnel(
let resource = state
.borrow()
.resource_table
.get::<NetworkListenerResource<crate::tunnel::TunnelListener>>(rid)
.get::<NetworkListenerResource<crate::tunnel::TunnelConnection>>(rid)
.map_err(|_| NetError::ListenerClosed)?;
let listener = RcRef::map(&resource, |r| &r.listener)
.try_borrow_mut()

View file

@ -295,7 +295,7 @@ network_stream!(
Tunnel,
tunnel,
crate::tunnel::TunnelStream,
crate::tunnel::TunnelListener,
crate::tunnel::TunnelConnection,
crate::tunnel::TunnelAddr,
crate::tunnel::TunnelStreamResource
]
@ -332,7 +332,7 @@ network_stream!(
Tunnel,
tunnel,
crate::tunnel::TunnelStream,
crate::tunnel::TunnelListener,
crate::tunnel::TunnelConnection,
crate::tunnel::TunnelAddr,
crate::tunnel::TunnelStreamResource
]
@ -360,7 +360,7 @@ network_stream!(
Tunnel,
tunnel,
crate::tunnel::TunnelStream,
crate::tunnel::TunnelListener,
crate::tunnel::TunnelConnection,
crate::tunnel::TunnelAddr,
crate::tunnel::TunnelStreamResource
]

View file

@ -1,12 +1,9 @@
// Copyright 2018-2025 the Deno authors. MIT license.
use std::collections::HashMap;
use std::net::IpAddr;
use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;
use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
@ -17,50 +14,21 @@ use deno_core::RcRef;
use deno_core::Resource;
use deno_core::futures::TryFutureExt;
use deno_error::JsErrorBox;
use deno_tls::SocketUse;
use deno_tls::TlsKeys;
use deno_tls::create_client_config;
use deno_tls::rustls::RootCertStore;
pub use quinn;
use quinn::ConnectionError;
use quinn::crypto::rustls::QuicClientConfig;
use tokio::io::AsyncRead;
pub use deno_tunnel::Authentication;
pub use deno_tunnel::Error;
pub use deno_tunnel::Event;
pub use deno_tunnel::OwnedReadHalf;
pub use deno_tunnel::OwnedWriteHalf;
pub use deno_tunnel::TunnelAddr;
pub use deno_tunnel::TunnelConnection;
pub use deno_tunnel::TunnelStream;
pub use deno_tunnel::quinn;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
const VERSION: u32 = 1;
static TUNNEL: OnceLock<TunnelConnection> = OnceLock::new();
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error(transparent)]
StdIo(#[from] std::io::Error),
#[error(transparent)]
SerdeJson(#[from] deno_core::serde_json::Error),
#[error(transparent)]
Tls(#[from] deno_tls::TlsError),
#[error(transparent)]
QuinnConnect(#[from] quinn::ConnectError),
#[error(transparent)]
QuinnConnection(#[from] quinn::ConnectionError),
#[error(transparent)]
QuinnRead(#[from] quinn::ReadError),
#[error(transparent)]
QuinnReadExact(#[from] quinn::ReadExactError),
#[error(transparent)]
QuinnWrite(#[from] quinn::WriteError),
#[error("Unexpected header")]
UnexpectedHeader,
#[error("Unsupported version")]
UnsupportedVersion,
#[error("Invalid authorization token")]
InvalidToken,
}
static TUNNEL: OnceLock<crate::tunnel::TunnelListener> = OnceLock::new();
pub fn set_tunnel(tunnel: crate::tunnel::TunnelListener) {
pub fn set_tunnel(tunnel: TunnelConnection) {
if TUNNEL.set(tunnel).is_ok() {
deno_signals::before_exit(before_exit);
}
@ -68,288 +36,29 @@ pub fn set_tunnel(tunnel: crate::tunnel::TunnelListener) {
fn before_exit() {
if let Some(tunnel) = get_tunnel() {
tunnel.connection.close(1u32.into(), b"");
// stay alive long enough to actually send the close frame, since
// we can't rely on the linux kernel to close this like with tcp.
deno_core::futures::executor::block_on(tunnel.endpoint.wait_idle());
deno_core::futures::executor::block_on(tunnel.close(1u32, b""));
}
}
pub fn get_tunnel() -> Option<&'static crate::tunnel::TunnelListener> {
pub fn get_tunnel() -> Option<&'static TunnelConnection> {
TUNNEL.get()
}
/// Essentially a SocketAddr, except we prefer a human
/// readable hostname to identify the remote endpoint.
#[derive(Debug, Clone)]
pub struct TunnelAddr {
socket: SocketAddr,
hostname: Option<String>,
}
impl TunnelAddr {
pub fn hostname(&self) -> String {
self
.hostname
.clone()
.unwrap_or_else(|| self.socket.ip().to_string())
}
pub fn ip(&self) -> IpAddr {
self.socket.ip()
}
pub fn port(&self) -> u16 {
self.socket.port()
}
}
#[derive(Debug)]
pub struct Metadata {
pub hostnames: Vec<String>,
pub env: HashMap<String, String>,
}
#[derive(Debug)]
pub enum Event {
Routed,
Migrate,
}
#[derive(Debug)]
pub struct Events {
event_rx: tokio::sync::mpsc::Receiver<Event>,
}
impl Events {
pub async fn next(&mut self) -> Option<Event> {
self.event_rx.recv().await
}
}
#[derive(Debug, Clone)]
pub struct TunnelListener {
endpoint: quinn::Endpoint,
connection: quinn::Connection,
local_addr: TunnelAddr,
}
impl TunnelListener {
pub async fn connect(
addr: std::net::SocketAddr,
hostname: &str,
root_cert_store: Option<RootCertStore>,
token: String,
org: String,
app: String,
) -> Result<(Self, Metadata, Events), Error> {
let config = quinn::EndpointConfig::default();
let socket = std::net::UdpSocket::bind(("::", 0))?;
let endpoint = quinn::Endpoint::new(
config,
None,
socket,
quinn::default_runtime().unwrap(),
)?;
let mut tls_config = create_client_config(
root_cert_store,
vec![],
None,
TlsKeys::Null,
SocketUse::GeneralSsl,
)?;
tls_config.alpn_protocols = vec!["🦕🕳️".into()];
tls_config.enable_early_data = true;
let mut transport_config = quinn::TransportConfig::default();
transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
transport_config
.max_idle_timeout(Some(Duration::from_secs(15).try_into().unwrap()));
let client_config =
QuicClientConfig::try_from(tls_config).expect("TLS13 supported");
let mut client_config = quinn::ClientConfig::new(Arc::new(client_config));
client_config.transport_config(Arc::new(transport_config));
let connecting = endpoint.connect_with(client_config, addr, hostname)?;
let connection = connecting.await?;
let mut control = connection.open_bi().await?;
control.0.write_u32_le(VERSION).await?;
if control.1.read_u32_le().await? != VERSION {
return Err(Error::UnsupportedVersion);
}
write_message(&mut control.0, StreamHeader::Control { token, org, app })
.await?;
let ControlMessage::Authenticated {
addr,
hostnames,
env,
metadata,
} = read_message(&mut control.1).await?
else {
return Err(Error::UnexpectedHeader);
};
let (event_tx, event_rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
while let Ok(message) = read_message(&mut control.1).await {
let event = match message {
ControlMessage::Routed {} => Event::Routed,
ControlMessage::Migrate {} => Event::Migrate,
_ => {
continue;
}
};
if event_tx.send(event).await.is_err() {
break;
}
}
});
log::debug!("tunnel connected: {metadata:?}");
let local_addr = TunnelAddr {
socket: addr,
hostname: hostnames.first().cloned(),
};
let metadata = Metadata { hostnames, env };
let routed = Events { event_rx };
Ok((
Self {
endpoint,
connection,
local_addr,
},
metadata,
routed,
))
}
}
impl TunnelListener {
pub fn local_addr(&self) -> Result<TunnelAddr, std::io::Error> {
Ok(self.local_addr.clone())
}
pub async fn accept(
&self,
) -> Result<(TunnelStream, TunnelAddr), std::io::Error> {
let (tx, mut rx) = self.connection.accept_bi().await?;
let StreamHeader::Stream {
remote_addr,
local_addr,
} = read_message(&mut rx).await.map_err(std::io::Error::other)?
else {
return Err(std::io::Error::other(Error::UnexpectedHeader));
};
Ok((
TunnelStream {
tx,
rx,
local_addr,
remote_addr,
},
TunnelAddr {
hostname: None,
socket: remote_addr,
},
))
}
pub async fn create_agent_stream(&self) -> Result<TunnelStream, Error> {
let (mut tx, rx) = self.connection.open_bi().await?;
write_message(&mut tx, StreamHeader::Agent {}).await?;
Ok(TunnelStream {
tx,
rx,
local_addr: self.endpoint.local_addr()?,
remote_addr: self.connection.remote_address(),
})
}
}
#[derive(Debug)]
#[pin_project::pin_project]
pub struct TunnelStream {
#[pin]
tx: quinn::SendStream,
#[pin]
rx: quinn::RecvStream,
local_addr: SocketAddr,
remote_addr: SocketAddr,
}
impl TunnelStream {
pub fn local_addr(&self) -> Result<SocketAddr, std::io::Error> {
Ok(self.local_addr)
}
pub fn peer_addr(&self) -> Result<SocketAddr, std::io::Error> {
Ok(self.remote_addr)
}
}
impl AsyncRead for TunnelStream {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
self.project().rx.poll_read(cx, buf)
}
}
impl AsyncWrite for TunnelStream {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
AsyncWrite::poll_write(self.project().tx, cx, buf)
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
self.project().tx.poll_flush(cx)
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
self.project().tx.poll_shutdown(cx)
}
}
#[derive(Debug)]
pub struct TunnelStreamResource {
tx: AsyncRefCell<quinn::SendStream>,
rx: AsyncRefCell<quinn::RecvStream>,
local_addr: SocketAddr,
remote_addr: SocketAddr,
tx: AsyncRefCell<OwnedWriteHalf>,
rx: AsyncRefCell<OwnedReadHalf>,
cancel_handle: CancelHandle,
}
impl TunnelStreamResource {
pub fn new(stream: TunnelStream) -> Self {
let (read_half, write_half) = stream.into_split();
Self {
tx: AsyncRefCell::new(stream.tx),
rx: AsyncRefCell::new(stream.rx),
local_addr: stream.local_addr,
remote_addr: stream.remote_addr,
tx: AsyncRefCell::new(write_half),
rx: AsyncRefCell::new(read_half),
cancel_handle: Default::default(),
}
}
@ -357,19 +66,14 @@ impl TunnelStreamResource {
pub fn into_inner(self) -> TunnelStream {
let tx = self.tx.into_inner();
let rx = self.rx.into_inner();
TunnelStream {
tx,
rx,
local_addr: self.local_addr,
remote_addr: self.remote_addr,
}
rx.unsplit(tx)
}
fn rd_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<quinn::RecvStream> {
fn rd_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<OwnedReadHalf> {
RcRef::map(self, |r| &r.rx).borrow_mut()
}
fn wr_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<quinn::SendStream> {
fn wr_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<OwnedWriteHalf> {
RcRef::map(self, |r| &r.tx).borrow_mut()
}
@ -388,8 +92,7 @@ impl Resource for TunnelStreamResource {
.read(&mut vec)
.map_err(|e| JsErrorBox::generic(format!("{e}")))
.try_or_cancel(self.cancel_handle())
.await?
.unwrap_or(0);
.await?;
if nread != vec.len() {
vec.truncate(nread);
}
@ -408,8 +111,7 @@ impl Resource for TunnelStreamResource {
.read(&mut buf)
.map_err(|e| JsErrorBox::generic(format!("{e}")))
.try_or_cancel(self.cancel_handle())
.await?
.unwrap_or(0);
.await?;
Ok((nread, buf))
})
}
@ -439,7 +141,7 @@ impl Resource for TunnelStreamResource {
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(async move {
let mut wr = self.wr_borrow_mut().await;
wr.reset(quinn::VarInt::from_u32(0))
wr.reset(0u32)
.map_err(|e| JsErrorBox::generic(format!("{e}")))?;
Ok(())
})
@ -475,38 +177,3 @@ enum ControlMessage {
Routed {},
Migrate {},
}
async fn write_message<T: serde::Serialize>(
tx: &mut quinn::SendStream,
message: T,
) -> Result<(), Error> {
let data = deno_core::serde_json::to_vec(&message)?;
tx.write_u32_le(data.len() as _).await?;
tx.write_all(&data).await?;
Ok(())
}
async fn read_message<T: serde::de::DeserializeOwned>(
rx: &mut quinn::RecvStream,
) -> Result<T, Error> {
let length = rx.read_u32_le().await.map_err(|e| {
if let Some(custom_error) = e.get_ref() {
if let Some(quinn::ReadError::ConnectionLost(
ConnectionError::ApplicationClosed(err),
)) = custom_error.downcast_ref::<quinn::ReadError>()
{
if err.reason == b"invalid token".as_slice() {
return Error::InvalidToken;
}
}
}
e.into()
})?;
let mut data = vec![0; length as usize];
rx.read_exact(&mut data).await?;
let message = deno_core::serde_json::from_slice(&data)?;
Ok(message)
}

View file

@ -500,7 +500,7 @@ mod hyper_client {
use std::pin::Pin;
use std::task::Poll;
use deno_net::tunnel::TunnelListener;
use deno_net::tunnel::TunnelConnection;
use deno_net::tunnel::TunnelStream;
use deno_net::tunnel::get_tunnel;
use deno_tls::SocketUse;
@ -544,7 +544,7 @@ mod hyper_client {
#[derive(Debug, Clone)]
enum Connector {
Http(HttpsConnector<HttpConnector>),
Tunnel(TunnelListener),
Tunnel(TunnelConnection),
#[cfg(any(target_os = "linux", target_os = "macos"))]
Vsock(VsockAddr),
}

View file

@ -58,6 +58,11 @@ async function handleConnection(conn: Deno.QuicConn) {
conn.close();
return;
}
const auth = await readStreamHeader(reader);
if (auth.headerType !== "AuthenticateApp") {
conn.close();
return;
}
await writeStreamHeader(writer, {
headerType: "Authenticated",
hostnames: ["localhost"],