deno/ext/net/tunnel.rs
林炳权 68297b5f10
Some checks are pending
ci / pre-build (push) Waiting to run
ci / test debug linux-aarch64 (push) Blocked by required conditions
ci / test release linux-aarch64 (push) Blocked by required conditions
ci / test debug macos-aarch64 (push) Blocked by required conditions
ci / test release macos-aarch64 (push) Blocked by required conditions
ci / bench release linux-x86_64 (push) Blocked by required conditions
ci / lint debug linux-x86_64 (push) Blocked by required conditions
ci / lint debug macos-x86_64 (push) Blocked by required conditions
ci / lint debug windows-x86_64 (push) Blocked by required conditions
ci / test debug linux-x86_64 (push) Blocked by required conditions
ci / test release linux-x86_64 (push) Blocked by required conditions
ci / test debug macos-x86_64 (push) Blocked by required conditions
ci / test release macos-x86_64 (push) Blocked by required conditions
ci / test debug windows-x86_64 (push) Blocked by required conditions
ci / test release windows-x86_64 (push) Blocked by required conditions
ci / build libs (push) Blocked by required conditions
ci / publish canary (push) Blocked by required conditions
chore: Rust 1.89.0 (#30364)
Related PR: https://github.com/denoland/deno/pull/30354
2025-08-09 11:11:48 +00:00

196 lines
4.7 KiB
Rust

// Copyright 2018-2025 the Deno authors. MIT license.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::OnceLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::futures::TryFutureExt;
use deno_error::JsErrorBox;
pub use deno_tunnel::Authentication;
pub use deno_tunnel::Error;
pub use deno_tunnel::Event;
pub use deno_tunnel::OwnedReadHalf;
pub use deno_tunnel::OwnedWriteHalf;
pub use deno_tunnel::TunnelAddr;
pub use deno_tunnel::TunnelConnection;
pub use deno_tunnel::TunnelStream;
pub use deno_tunnel::quinn;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
// Process-wide tunnel connection. Written at most once, by `set_tunnel`, and
// read back through `get_tunnel`.
static TUNNEL: OnceLock<TunnelConnection> = OnceLock::new();
// Whether `before_exit_internal` should call `before_exit`. Starts out `true`
// and is cleared by `disable_before_exit`.
static RUN_BEFORE_EXIT: AtomicBool = AtomicBool::new(true);
/// Installs `tunnel` as the process-wide tunnel connection.
///
/// Only the first call has any effect: it stores the connection and hands
/// `before_exit_internal` to `deno_signals::before_exit` (presumably
/// registering it as an exit callback). If a tunnel has already been stored,
/// the connection passed in is dropped and nothing else happens.
pub fn set_tunnel(tunnel: TunnelConnection) {
  let newly_installed = TUNNEL.set(tunnel).is_ok();
  if !newly_installed {
    return;
  }
  deno_signals::before_exit(before_exit_internal);
}
/// Callback handed to `deno_signals::before_exit` by `set_tunnel`.
///
/// Forwards to `before_exit` unless the `RUN_BEFORE_EXIT` flag has been
/// cleared via `disable_before_exit`.
fn before_exit_internal() {
  let enabled = RUN_BEFORE_EXIT.load(Ordering::Relaxed);
  if !enabled {
    return;
  }
  before_exit();
}
/// Turns off the `before_exit` call made by `before_exit_internal`.
///
/// Only the flag checked by `before_exit_internal` is changed; calling
/// `before_exit` directly is unaffected.
pub fn disable_before_exit() {
  let run_on_exit = false;
  RUN_BEFORE_EXIT.store(run_on_exit, Ordering::Relaxed);
}
/// Closes the process-wide tunnel, if one has been set, blocking the calling
/// thread until the future returned by `close` has completed.
///
/// Does nothing when no tunnel has been stored via `set_tunnel`.
pub fn before_exit() {
  let Some(tunnel) = get_tunnel() else {
    return;
  };

  log::trace!("deno_net::tunnel::before_exit >");
  // Block right here so the close frame really goes out before the process
  // ends; unlike with tcp, the linux kernel cannot be relied on to close
  // this for us.
  let closing = tunnel.close(1u32, b"");
  deno_core::futures::executor::block_on(closing);
  log::trace!("deno_net::tunnel::before_exit <");
}
pub fn get_tunnel() -> Option<&'static TunnelConnection> {
TUNNEL.get()
}
/// Resource holding a `TunnelStream` split into its read and write halves,
/// together with a cancel handle that pending reads are raced against.
#[derive(Debug)]
pub struct TunnelStreamResource {
// Write half of the stream; borrowed by `write` and `shutdown`.
tx: AsyncRefCell<OwnedWriteHalf>,
// Read half of the stream; borrowed by `read` and `read_byob`.
rx: AsyncRefCell<OwnedReadHalf>,
// Cancelled by `Resource::close`.
cancel_handle: CancelHandle,
}
impl TunnelStreamResource {
pub fn new(stream: TunnelStream) -> Self {
let (read_half, write_half) = stream.into_split();
Self {
tx: AsyncRefCell::new(write_half),
rx: AsyncRefCell::new(read_half),
cancel_handle: Default::default(),
}
}
pub fn into_inner(self) -> TunnelStream {
let tx = self.tx.into_inner();
let rx = self.rx.into_inner();
rx.unsplit(tx)
}
fn rd_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<OwnedReadHalf> {
RcRef::map(self, |r| &r.rx).borrow_mut()
}
fn wr_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<OwnedWriteHalf> {
RcRef::map(self, |r| &r.tx).borrow_mut()
}
pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> {
RcRef::map(self, |r| &r.cancel_handle)
}
}
impl Resource for TunnelStreamResource {
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<deno_core::BufView> {
Box::pin(async move {
let mut vec = vec![0; limit];
let nread = self
.rd_borrow_mut()
.await
.read(&mut vec)
.map_err(|e| JsErrorBox::generic(format!("{e}")))
.try_or_cancel(self.cancel_handle())
.await?;
if nread != vec.len() {
vec.truncate(nread);
}
Ok(vec.into())
})
}
fn read_byob(
self: Rc<Self>,
mut buf: deno_core::BufMutView,
) -> AsyncResult<(usize, deno_core::BufMutView)> {
Box::pin(async move {
let nread = self
.rd_borrow_mut()
.await
.read(&mut buf)
.map_err(|e| JsErrorBox::generic(format!("{e}")))
.try_or_cancel(self.cancel_handle())
.await?;
Ok((nread, buf))
})
}
fn write(
self: Rc<Self>,
buf: deno_core::BufView,
) -> AsyncResult<deno_core::WriteOutcome> {
Box::pin(async move {
let nwritten = self
.wr_borrow_mut()
.await
.write(&buf)
.await
.map_err(|e| JsErrorBox::generic(format!("{e}")))?;
Ok(deno_core::WriteOutcome::Partial {
nwritten,
view: buf,
})
})
}
fn name(&self) -> std::borrow::Cow<'_, str> {
"tunnelStream".into()
}
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(async move {
let mut wr = self.wr_borrow_mut().await;
wr.reset(0u32)
.map_err(|e| JsErrorBox::generic(format!("{e}")))?;
Ok(())
})
}
fn close(self: Rc<Self>) {
self.cancel_handle.cancel()
}
}
// Header variants that can be serialized and deserialized with serde.
//
// NOTE(review): presumably identifies the purpose of a newly opened tunnel
// stream. Nothing in this file constructs or reads it, so confirm against the
// protocol definition before relying on that.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
enum StreamHeader {
// Carries a token together with an org and an app name.
Control {
token: String,
org: String,
app: String,
},
// Carries a local and a remote socket address.
Stream {
local_addr: SocketAddr,
remote_addr: SocketAddr,
},
// Empty struct-style variant; kept as `{}` rather than a unit variant,
// which serde may encode differently.
Agent {},
}
// Message variants that can be serialized and deserialized with serde.
//
// NOTE(review): presumably exchanged on the control stream of the tunnel.
// Nothing in this file constructs or reads it, so confirm against the
// protocol definition before relying on that.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
enum ControlMessage {
// Carries string-keyed metadata, a socket address, a list of hostnames and
// string-keyed environment entries.
Authenticated {
metadata: HashMap<String, String>,
addr: SocketAddr,
hostnames: Vec<String>,
env: HashMap<String, String>,
},
// Empty struct-style variants; kept as `{}` rather than unit variants,
// which serde may encode differently.
Routed {},
Migrate {},
}