diff --git a/Cargo.lock b/Cargo.lock index 892309cbe2..8d151c07f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2369,6 +2369,7 @@ dependencies = [ "deno_permissions", "deno_signals", "deno_tls", + "deno_tunnel", "hickory-proto", "hickory-resolver", "log", @@ -2978,6 +2979,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "deno_tunnel" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38d54bc70cc874c19c3703d3b3b7d5fe709cbf98ff0aa7e9298825bf0e7c6f74" +dependencies = [ + "pin-project", + "quinn", + "serde", + "serde_json", + "thiserror 2.0.12", + "tokio", +] + [[package]] name = "deno_unsync" version = "0.4.4" @@ -3848,6 +3863,18 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "fastbloom" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27cea6e7f512d43b098939ff4d5a5d6fe3db07971e1d05176fe26c642d33f5b8" +dependencies = [ + "getrandom 0.3.3", + "rand 0.9.1", + "siphasher 1.0.1", + "wide", +] + [[package]] name = "faster-hex" version = "0.10.0" @@ -5180,6 +5207,17 @@ dependencies = [ "generic-array", ] +[[package]] +name = "io-uring" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013" +dependencies = [ + "bitflags 2.8.0", + "cfg-if", + "libc", +] + [[package]] name = "ipconfig" version = "0.3.2" @@ -6680,7 +6718,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" dependencies = [ - "siphasher", + "siphasher 0.3.11", ] [[package]] @@ -7055,6 +7093,7 @@ checksum = "49df843a9161c85bb8aae55f101bc0bac8bcafd637a620d9122fd7e0b2f7422e" dependencies = [ "aws-lc-rs", "bytes", + "fastbloom", "getrandom 0.3.3", "lru-slab", "rand 0.9.1", @@ -7703,6 +7742,15 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad97d4ce1560a5e27cec89519dc8300d1aa6035b099821261c651486a19e44d5" +[[package]] +name = "safe_arch" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b02de82ddbe1b636e6170c21be622223aea188ef2e139be0a5b219ec215323" +dependencies = [ + "bytemuck", +] + [[package]] name = "saffron" version = "0.1.0" @@ -8087,6 +8135,12 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "siphasher" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" + [[package]] name = "slab" version = "0.4.9" @@ -8391,7 +8445,7 @@ dependencies = [ "once_cell", "rustc-hash 2.1.1", "serde", - "siphasher", + "siphasher 0.3.11", "sourcemap", "swc_allocator", "swc_atoms", @@ -9150,17 +9204,19 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.45.1" +version = "1.46.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" +checksum = "0cc3a2344dafbe23a245241fe8b09735b521110d30fcefbbd5feb1797ca35d17" dependencies = [ "backtrace", "bytes", + "io-uring", "libc", "mio 1.0.3", "parking_lot", "pin-project-lite", "signal-hook-registry", + "slab", "socket2", "tokio-macros", "windows-sys 0.52.0", @@ -10135,6 +10191,16 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wide" +version = "0.7.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce5da8ecb62bcd8ec8b7ea19f69a51275e91299be594ea5cc6ef7819e16cd03" +dependencies = [ + "bytemuck", + "safe_arch", +] + [[package]] name = "widestring" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index d3ef612625..c541d1f220 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -131,6 +131,8 @@ napi_sym = { version = "0.140.0", path = "./ext/napi/sym" } node_resolver = { version = "0.48.0", path = "./libs/node_resolver" } test_util = { package = "test_server", path = "./tests/util/server" } +deno_tunnel = "0.2.0" + # widely used libraries anyhow = "1.0.57" arc-swap = "1.7" diff --git a/cli/lib/util/logger.rs b/cli/lib/util/logger.rs index 9487a9fbf4..1077cd0e6b 100644 --- a/cli/lib/util/logger.rs +++ b/cli/lib/util/logger.rs @@ -91,6 +91,7 @@ pub fn init(options: InitLoggingOptions) { .filter_module("swc_ecma_codegen", log::LevelFilter::Off) .filter_module("swc_ecma_transforms_optimization", log::LevelFilter::Off) .filter_module("swc_ecma_parser", log::LevelFilter::Error) + .filter_module("swc_ecma_lexer", log::LevelFilter::Error) // Suppress span lifecycle logs since they are too verbose .filter_module("tracing::span", log::LevelFilter::Off) .filter_module("tower_lsp", log::LevelFilter::Trace) diff --git a/cli/main.rs b/cli/main.rs index f100077298..86bceaf788 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -903,29 +903,41 @@ async fn initialize_tunnel( let cert_store_provider = factory.root_cert_store_provider(); let root_cert_store = cert_store_provider.get_or_try_init()?.clone(); + let tls_config = deno_runtime::deno_tls::create_client_config( + Some(root_cert_store), + vec![], + None, + deno_runtime::deno_tls::TlsKeys::Null, + deno_runtime::deno_tls::SocketUse::GeneralSsl, + )?; + let (tunnel, metadata, mut events) = - match deno_runtime::deno_net::tunnel::TunnelListener::connect( + match deno_runtime::deno_net::tunnel::TunnelConnection::connect( addr, hostname, - Some(root_cert_store.clone()), - token, - org.clone(), - app.clone(), + tls_config.clone(), + deno_runtime::deno_net::tunnel::Authentication::App { + token, + org: org.clone(), + app: app.clone(), + }, ) .await { Ok(res) => res, - Err(deno_runtime::deno_net::tunnel::Error::InvalidToken) => { + Err(deno_runtime::deno_net::tunnel::Error::Unauthorized) => { tools::deploy::get_token_entry()?.delete_credential()?; let token = auth_tunnel().await?; - deno_runtime::deno_net::tunnel::TunnelListener::connect( + deno_runtime::deno_net::tunnel::TunnelConnection::connect( addr, hostname, - Some(root_cert_store), - token, - org, - app, + tls_config, + deno_runtime::deno_net::tunnel::Authentication::App { + token, + org, + app, + }, ) .await? } diff --git a/ext/net/Cargo.toml b/ext/net/Cargo.toml index 1a03a0ebf5..aae42adc43 100644 --- a/ext/net/Cargo.toml +++ b/ext/net/Cargo.toml @@ -20,6 +20,7 @@ deno_features.workspace = true deno_permissions.workspace = true deno_signals.workspace = true deno_tls.workspace = true +deno_tunnel.workspace = true hickory-proto.workspace = true hickory-resolver.workspace = true log.workspace = true diff --git a/ext/net/ops.rs b/ext/net/ops.rs index 651b20c9db..bc398e8f8b 100644 --- a/ext/net/ops.rs +++ b/ext/net/ops.rs @@ -864,7 +864,7 @@ pub async fn op_net_accept_tunnel( let resource = state .borrow() .resource_table - .get::>(rid) + .get::>(rid) .map_err(|_| NetError::ListenerClosed)?; let listener = RcRef::map(&resource, |r| &r.listener) .try_borrow_mut() diff --git a/ext/net/raw.rs b/ext/net/raw.rs index cc51823f5f..aa21bf73db 100644 --- a/ext/net/raw.rs +++ b/ext/net/raw.rs @@ -295,7 +295,7 @@ network_stream!( Tunnel, tunnel, crate::tunnel::TunnelStream, - crate::tunnel::TunnelListener, + crate::tunnel::TunnelConnection, crate::tunnel::TunnelAddr, crate::tunnel::TunnelStreamResource ] @@ -332,7 +332,7 @@ network_stream!( Tunnel, tunnel, crate::tunnel::TunnelStream, - crate::tunnel::TunnelListener, + crate::tunnel::TunnelConnection, crate::tunnel::TunnelAddr, crate::tunnel::TunnelStreamResource ] @@ -360,7 +360,7 @@ network_stream!( Tunnel, tunnel, crate::tunnel::TunnelStream, - crate::tunnel::TunnelListener, + crate::tunnel::TunnelConnection, crate::tunnel::TunnelAddr, crate::tunnel::TunnelStreamResource ] diff --git a/ext/net/tunnel.rs b/ext/net/tunnel.rs index e579babb32..59f09af81d 100644 --- a/ext/net/tunnel.rs +++ b/ext/net/tunnel.rs @@ -1,12 +1,9 @@ // Copyright 2018-2025 the Deno authors. MIT license. use std::collections::HashMap; -use std::net::IpAddr; use std::net::SocketAddr; use std::rc::Rc; -use std::sync::Arc; use std::sync::OnceLock; -use std::time::Duration; use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; @@ -17,50 +14,21 @@ use deno_core::RcRef; use deno_core::Resource; use deno_core::futures::TryFutureExt; use deno_error::JsErrorBox; -use deno_tls::SocketUse; -use deno_tls::TlsKeys; -use deno_tls::create_client_config; -use deno_tls::rustls::RootCertStore; -pub use quinn; -use quinn::ConnectionError; -use quinn::crypto::rustls::QuicClientConfig; -use tokio::io::AsyncRead; +pub use deno_tunnel::Authentication; +pub use deno_tunnel::Error; +pub use deno_tunnel::Event; +pub use deno_tunnel::OwnedReadHalf; +pub use deno_tunnel::OwnedWriteHalf; +pub use deno_tunnel::TunnelAddr; +pub use deno_tunnel::TunnelConnection; +pub use deno_tunnel::TunnelStream; +pub use deno_tunnel::quinn; use tokio::io::AsyncReadExt; -use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; -const VERSION: u32 = 1; +static TUNNEL: OnceLock = OnceLock::new(); -#[derive(thiserror::Error, Debug)] -pub enum Error { - #[error(transparent)] - StdIo(#[from] std::io::Error), - #[error(transparent)] - SerdeJson(#[from] deno_core::serde_json::Error), - #[error(transparent)] - Tls(#[from] deno_tls::TlsError), - #[error(transparent)] - QuinnConnect(#[from] quinn::ConnectError), - #[error(transparent)] - QuinnConnection(#[from] quinn::ConnectionError), - #[error(transparent)] - QuinnRead(#[from] quinn::ReadError), - #[error(transparent)] - QuinnReadExact(#[from] quinn::ReadExactError), - #[error(transparent)] - QuinnWrite(#[from] quinn::WriteError), - - #[error("Unexpected header")] - UnexpectedHeader, - #[error("Unsupported version")] - UnsupportedVersion, - #[error("Invalid authorization token")] - InvalidToken, -} - -static TUNNEL: OnceLock = OnceLock::new(); - -pub fn set_tunnel(tunnel: crate::tunnel::TunnelListener) { +pub fn set_tunnel(tunnel: TunnelConnection) { if TUNNEL.set(tunnel).is_ok() { deno_signals::before_exit(before_exit); } @@ -68,288 +36,29 @@ pub fn set_tunnel(tunnel: crate::tunnel::TunnelListener) { fn before_exit() { if let Some(tunnel) = get_tunnel() { - tunnel.connection.close(1u32.into(), b""); // stay alive long enough to actually send the close frame, since // we can't rely on the linux kernel to close this like with tcp. - deno_core::futures::executor::block_on(tunnel.endpoint.wait_idle()); + deno_core::futures::executor::block_on(tunnel.close(1u32, b"")); } } -pub fn get_tunnel() -> Option<&'static crate::tunnel::TunnelListener> { +pub fn get_tunnel() -> Option<&'static TunnelConnection> { TUNNEL.get() } -/// Essentially a SocketAddr, except we prefer a human -/// readable hostname to identify the remote endpoint. -#[derive(Debug, Clone)] -pub struct TunnelAddr { - socket: SocketAddr, - hostname: Option, -} - -impl TunnelAddr { - pub fn hostname(&self) -> String { - self - .hostname - .clone() - .unwrap_or_else(|| self.socket.ip().to_string()) - } - - pub fn ip(&self) -> IpAddr { - self.socket.ip() - } - - pub fn port(&self) -> u16 { - self.socket.port() - } -} - -#[derive(Debug)] -pub struct Metadata { - pub hostnames: Vec, - pub env: HashMap, -} - -#[derive(Debug)] -pub enum Event { - Routed, - Migrate, -} - -#[derive(Debug)] -pub struct Events { - event_rx: tokio::sync::mpsc::Receiver, -} - -impl Events { - pub async fn next(&mut self) -> Option { - self.event_rx.recv().await - } -} - -#[derive(Debug, Clone)] -pub struct TunnelListener { - endpoint: quinn::Endpoint, - connection: quinn::Connection, - local_addr: TunnelAddr, -} - -impl TunnelListener { - pub async fn connect( - addr: std::net::SocketAddr, - hostname: &str, - root_cert_store: Option, - token: String, - org: String, - app: String, - ) -> Result<(Self, Metadata, Events), Error> { - let config = quinn::EndpointConfig::default(); - let socket = std::net::UdpSocket::bind(("::", 0))?; - let endpoint = quinn::Endpoint::new( - config, - None, - socket, - quinn::default_runtime().unwrap(), - )?; - - let mut tls_config = create_client_config( - root_cert_store, - vec![], - None, - TlsKeys::Null, - SocketUse::GeneralSsl, - )?; - - tls_config.alpn_protocols = vec!["🦕🕳️".into()]; - tls_config.enable_early_data = true; - - let mut transport_config = quinn::TransportConfig::default(); - transport_config.keep_alive_interval(Some(Duration::from_secs(5))); - transport_config - .max_idle_timeout(Some(Duration::from_secs(15).try_into().unwrap())); - - let client_config = - QuicClientConfig::try_from(tls_config).expect("TLS13 supported"); - let mut client_config = quinn::ClientConfig::new(Arc::new(client_config)); - client_config.transport_config(Arc::new(transport_config)); - - let connecting = endpoint.connect_with(client_config, addr, hostname)?; - - let connection = connecting.await?; - - let mut control = connection.open_bi().await?; - control.0.write_u32_le(VERSION).await?; - if control.1.read_u32_le().await? != VERSION { - return Err(Error::UnsupportedVersion); - } - - write_message(&mut control.0, StreamHeader::Control { token, org, app }) - .await?; - - let ControlMessage::Authenticated { - addr, - hostnames, - env, - metadata, - } = read_message(&mut control.1).await? - else { - return Err(Error::UnexpectedHeader); - }; - - let (event_tx, event_rx) = tokio::sync::mpsc::channel(1); - tokio::spawn(async move { - while let Ok(message) = read_message(&mut control.1).await { - let event = match message { - ControlMessage::Routed {} => Event::Routed, - ControlMessage::Migrate {} => Event::Migrate, - _ => { - continue; - } - }; - if event_tx.send(event).await.is_err() { - break; - } - } - }); - - log::debug!("tunnel connected: {metadata:?}"); - - let local_addr = TunnelAddr { - socket: addr, - hostname: hostnames.first().cloned(), - }; - - let metadata = Metadata { hostnames, env }; - let routed = Events { event_rx }; - - Ok(( - Self { - endpoint, - connection, - local_addr, - }, - metadata, - routed, - )) - } -} - -impl TunnelListener { - pub fn local_addr(&self) -> Result { - Ok(self.local_addr.clone()) - } - - pub async fn accept( - &self, - ) -> Result<(TunnelStream, TunnelAddr), std::io::Error> { - let (tx, mut rx) = self.connection.accept_bi().await?; - - let StreamHeader::Stream { - remote_addr, - local_addr, - } = read_message(&mut rx).await.map_err(std::io::Error::other)? - else { - return Err(std::io::Error::other(Error::UnexpectedHeader)); - }; - - Ok(( - TunnelStream { - tx, - rx, - local_addr, - remote_addr, - }, - TunnelAddr { - hostname: None, - socket: remote_addr, - }, - )) - } - - pub async fn create_agent_stream(&self) -> Result { - let (mut tx, rx) = self.connection.open_bi().await?; - write_message(&mut tx, StreamHeader::Agent {}).await?; - Ok(TunnelStream { - tx, - rx, - local_addr: self.endpoint.local_addr()?, - remote_addr: self.connection.remote_address(), - }) - } -} - -#[derive(Debug)] -#[pin_project::pin_project] -pub struct TunnelStream { - #[pin] - tx: quinn::SendStream, - #[pin] - rx: quinn::RecvStream, - - local_addr: SocketAddr, - remote_addr: SocketAddr, -} - -impl TunnelStream { - pub fn local_addr(&self) -> Result { - Ok(self.local_addr) - } - - pub fn peer_addr(&self) -> Result { - Ok(self.remote_addr) - } -} - -impl AsyncRead for TunnelStream { - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { - self.project().rx.poll_read(cx, buf) - } -} - -impl AsyncWrite for TunnelStream { - fn poll_write( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - AsyncWrite::poll_write(self.project().tx, cx, buf) - } - - fn poll_flush( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.project().tx.poll_flush(cx) - } - - fn poll_shutdown( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.project().tx.poll_shutdown(cx) - } -} - #[derive(Debug)] pub struct TunnelStreamResource { - tx: AsyncRefCell, - rx: AsyncRefCell, - local_addr: SocketAddr, - remote_addr: SocketAddr, + tx: AsyncRefCell, + rx: AsyncRefCell, cancel_handle: CancelHandle, } impl TunnelStreamResource { pub fn new(stream: TunnelStream) -> Self { + let (read_half, write_half) = stream.into_split(); Self { - tx: AsyncRefCell::new(stream.tx), - rx: AsyncRefCell::new(stream.rx), - local_addr: stream.local_addr, - remote_addr: stream.remote_addr, + tx: AsyncRefCell::new(write_half), + rx: AsyncRefCell::new(read_half), cancel_handle: Default::default(), } } @@ -357,19 +66,14 @@ impl TunnelStreamResource { pub fn into_inner(self) -> TunnelStream { let tx = self.tx.into_inner(); let rx = self.rx.into_inner(); - TunnelStream { - tx, - rx, - local_addr: self.local_addr, - remote_addr: self.remote_addr, - } + rx.unsplit(tx) } - fn rd_borrow_mut(self: &Rc) -> AsyncMutFuture { + fn rd_borrow_mut(self: &Rc) -> AsyncMutFuture { RcRef::map(self, |r| &r.rx).borrow_mut() } - fn wr_borrow_mut(self: &Rc) -> AsyncMutFuture { + fn wr_borrow_mut(self: &Rc) -> AsyncMutFuture { RcRef::map(self, |r| &r.tx).borrow_mut() } @@ -388,8 +92,7 @@ impl Resource for TunnelStreamResource { .read(&mut vec) .map_err(|e| JsErrorBox::generic(format!("{e}"))) .try_or_cancel(self.cancel_handle()) - .await? - .unwrap_or(0); + .await?; if nread != vec.len() { vec.truncate(nread); } @@ -408,8 +111,7 @@ impl Resource for TunnelStreamResource { .read(&mut buf) .map_err(|e| JsErrorBox::generic(format!("{e}"))) .try_or_cancel(self.cancel_handle()) - .await? - .unwrap_or(0); + .await?; Ok((nread, buf)) }) } @@ -439,7 +141,7 @@ impl Resource for TunnelStreamResource { fn shutdown(self: Rc) -> AsyncResult<()> { Box::pin(async move { let mut wr = self.wr_borrow_mut().await; - wr.reset(quinn::VarInt::from_u32(0)) + wr.reset(0u32) .map_err(|e| JsErrorBox::generic(format!("{e}")))?; Ok(()) }) @@ -475,38 +177,3 @@ enum ControlMessage { Routed {}, Migrate {}, } - -async fn write_message( - tx: &mut quinn::SendStream, - message: T, -) -> Result<(), Error> { - let data = deno_core::serde_json::to_vec(&message)?; - tx.write_u32_le(data.len() as _).await?; - tx.write_all(&data).await?; - Ok(()) -} - -async fn read_message( - rx: &mut quinn::RecvStream, -) -> Result { - let length = rx.read_u32_le().await.map_err(|e| { - if let Some(custom_error) = e.get_ref() { - if let Some(quinn::ReadError::ConnectionLost( - ConnectionError::ApplicationClosed(err), - )) = custom_error.downcast_ref::() - { - if err.reason == b"invalid token".as_slice() { - return Error::InvalidToken; - } - } - } - - e.into() - })?; - let mut data = vec![0; length as usize]; - rx.read_exact(&mut data).await?; - - let message = deno_core::serde_json::from_slice(&data)?; - - Ok(message) -} diff --git a/ext/telemetry/lib.rs b/ext/telemetry/lib.rs index eb921d1ff8..08067250dc 100644 --- a/ext/telemetry/lib.rs +++ b/ext/telemetry/lib.rs @@ -500,7 +500,7 @@ mod hyper_client { use std::pin::Pin; use std::task::Poll; - use deno_net::tunnel::TunnelListener; + use deno_net::tunnel::TunnelConnection; use deno_net::tunnel::TunnelStream; use deno_net::tunnel::get_tunnel; use deno_tls::SocketUse; @@ -544,7 +544,7 @@ mod hyper_client { #[derive(Debug, Clone)] enum Connector { Http(HttpsConnector), - Tunnel(TunnelListener), + Tunnel(TunnelConnection), #[cfg(any(target_os = "linux", target_os = "macos"))] Vsock(VsockAddr), } diff --git a/tests/specs/run/tunnel/test.ts b/tests/specs/run/tunnel/test.ts index 55bd1ad2a0..7b2b681e38 100644 --- a/tests/specs/run/tunnel/test.ts +++ b/tests/specs/run/tunnel/test.ts @@ -58,6 +58,11 @@ async function handleConnection(conn: Deno.QuicConn) { conn.close(); return; } + const auth = await readStreamHeader(reader); + if (auth.headerType !== "AuthenticateApp") { + conn.close(); + return; + } await writeStreamHeader(writer, { headerType: "Authenticated", hostnames: ["localhost"],