From 36e9eb2023ce602b8ec11ab48a079492d30d6844 Mon Sep 17 00:00:00 2001 From: Divy Date: Thu, 28 Aug 2025 05:26:17 -0700 Subject: [PATCH] fix(ext/node): support JS underlying stream in TLS (#30465) Fixes https://github.com/denoland/deno/issues/20594 This implements `JSStreamSocket` which drives the TLS underlying stream in `rustls_tokio_stream` using 2 sets of channels. One for piping the encrypted protocol transport and the other for plaintext application data. This fixes connecting to `npm:mssql`: ```js import sql from "npm:mssql"; const sqlConfig = { server: "localhost", user: "divy", password: "123", database: "master", options: { trustServerCertificate: true, }, }; const pool = await sql.connect(sqlConfig); const result = await pool.request().query(`SELECT * FROM sys.databases`); ``` --- Cargo.lock | 2 + ext/node/Cargo.toml | 2 + ext/node/clippy.toml | 3 - ext/node/lib.rs | 2 + ext/node/ops/tls.rs | 506 ++++++++++++++++++ ext/node/polyfills/_tls_wrap.js | 91 +++- .../polyfills/internal_binding/stream_wrap.ts | 2 +- ext/node/polyfills/net.ts | 4 +- tests/unit_node/tls_test.ts | 64 ++- 9 files changed, 651 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7c8c06a133..1a2c855b21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2403,6 +2403,7 @@ dependencies = [ "deno_permissions", "deno_process", "deno_subprocess_windows", + "deno_tls", "deno_whoami", "der", "digest", @@ -2442,6 +2443,7 @@ dependencies = [ "ripemd", "rsa", "rusqlite", + "rustls-tokio-stream", "scrypt", "sec1", "serde", diff --git a/ext/node/Cargo.toml b/ext/node/Cargo.toml index 14767d8e0a..c1acd65d88 100644 --- a/ext/node/Cargo.toml +++ b/ext/node/Cargo.toml @@ -40,6 +40,7 @@ deno_package_json.workspace = true deno_path_util.workspace = true deno_permissions.workspace = true deno_process.workspace = true +deno_tls.workspace = true deno_whoami.workspace = true der = { workspace = true, features = ["derive"] } digest = { workspace = true, features = ["core-api", "std"] } @@ -78,6 +79,7 @@ rand.workspace = true ripemd = { workspace = true, features = ["oid"] } rsa.workspace = true rusqlite.workspace = true +rustls-tokio-stream.workspace = true scrypt.workspace = true sec1.workspace = true serde.workspace = true diff --git a/ext/node/clippy.toml b/ext/node/clippy.toml index 49e76c4a5a..58b1e31071 100644 --- a/ext/node/clippy.toml +++ b/ext/node/clippy.toml @@ -28,6 +28,3 @@ disallowed-methods = [ { path = "std::fs::symlink_metadata", reason = "File system operations should be done using FileSystem trait" }, { path = "std::fs::write", reason = "File system operations should be done using FileSystem trait" }, ] -disallowed-types = [ - { path = "std::sync::Arc", reason = "use deno_fs::sync::MaybeArc instead" }, -] diff --git a/ext/node/lib.rs b/ext/node/lib.rs index b9dfd690bb..23876e812a 100644 --- a/ext/node/lib.rs +++ b/ext/node/lib.rs @@ -460,6 +460,8 @@ deno_core::extension!(deno_node, ops::tls::op_get_root_certificates, ops::tls::op_tls_peer_certificate, ops::tls::op_tls_canonicalize_ipv4_address, + ops::tls::op_node_tls_start, + ops::tls::op_node_tls_handshake, ops::inspector::op_inspector_open

, ops::inspector::op_inspector_close, ops::inspector::op_inspector_url, diff --git a/ext/node/ops/tls.rs b/ext/node/ops/tls.rs index 8569e39a63..db1be31518 100644 --- a/ext/node/ops/tls.rs +++ b/ext/node/ops/tls.rs @@ -1,9 +1,46 @@ // Copyright 2018-2025 the Deno authors. MIT license. +use std::borrow::Cow; +use std::cell::RefCell; +use std::collections::VecDeque; +use std::future::Future; +use std::io::Error; +use std::io::ErrorKind; +use std::num::NonZeroUsize; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::task::Context; +use std::task::Poll; + use base64::Engine; +use bytes::Bytes; +use deno_core::AsyncRefCell; +use deno_core::AsyncResult; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::ResourceId; use deno_core::op2; +use deno_net::DefaultTlsOptions; +use deno_net::UnsafelyIgnoreCertificateErrors; +use deno_net::ops::NetError; +use deno_net::ops::TlsHandshakeInfo; use deno_net::ops_tls::TlsStreamResource; +use deno_tls::SocketUse; +use deno_tls::TlsClientConfigOptions; +use deno_tls::TlsKeys; +use deno_tls::TlsKeysHolder; +use deno_tls::create_client_config; +use deno_tls::rustls::ClientConnection; +use deno_tls::rustls::pki_types::ServerName; +use rustls_tokio_stream::TlsStream; +use rustls_tokio_stream::TlsStreamRead; +use rustls_tokio_stream::TlsStreamWrite; +use rustls_tokio_stream::UnderlyingStream; +use serde::Deserialize; use webpki_root_certs; use super::crypto::x509::Certificate; @@ -68,3 +105,472 @@ pub fn op_tls_canonicalize_ipv4_address( Some(canonical_ip) } + +struct ReadableFuture<'a> { + socket: &'a JSStreamSocket, +} + +impl<'a> Future for ReadableFuture<'a> { + type Output = std::io::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.socket.poll_read_ready(cx) + } +} + +struct WritableFuture<'a> { + socket: &'a JSStreamSocket, +} + +impl<'a> Future for WritableFuture<'a> { + type Output = std::io::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.socket.poll_write_ready(cx) + } +} + +#[derive(Debug)] +pub struct JSStreamSocket { + readable: Arc>>, + writable: tokio::sync::mpsc::Sender, + read_buffer: Arc>>, + closed: AtomicBool, +} + +impl JSStreamSocket { + pub fn new( + readable: tokio::sync::mpsc::Receiver, + writable: tokio::sync::mpsc::Sender, + ) -> Self { + Self { + readable: Arc::new(Mutex::new(readable)), + writable, + read_buffer: Arc::new(Mutex::new(VecDeque::new())), + closed: AtomicBool::new(false), + } + } +} + +impl UnderlyingStream for JSStreamSocket { + type StdType = (); + + fn poll_read_ready( + &self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // Check if we have buffered data + if let Ok(buffer) = self.read_buffer.lock() + && !buffer.is_empty() + { + return Poll::Ready(Ok(())); + } + + if self.closed.load(Ordering::Relaxed) { + return Poll::Ready(Err(Error::new( + ErrorKind::UnexpectedEof, + "Stream closed", + ))); + } + + // Try to poll for data without consuming it + if let Ok(mut receiver) = self.readable.lock() { + match receiver.poll_recv(cx) { + Poll::Ready(Some(data)) => { + // Store the data in buffer for try_read + if let Ok(mut buffer) = self.read_buffer.lock() { + buffer.push_back(data); + } + Poll::Ready(Ok(())) + } + Poll::Ready(None) => { + // Channel closed + self.closed.store(true, Ordering::Relaxed); + Poll::Ready(Err(Error::new( + ErrorKind::UnexpectedEof, + "Channel closed", + ))) + } + Poll::Pending => Poll::Pending, + } + } else { + panic!("Failed to acquire lock") + } + } + + fn poll_write_ready( + &self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + if self.closed.load(Ordering::Relaxed) { + return Poll::Ready(Err(Error::new( + ErrorKind::BrokenPipe, + "Stream closed", + ))); + } + + // For bounded sender, check if channel is ready + if self.writable.is_closed() { + self.closed.store(true, Ordering::Relaxed); + Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, "Channel closed"))) + } else { + Poll::Ready(Ok(())) + } + } + + fn try_read(&self, buf: &mut [u8]) -> std::io::Result { + if self.closed.load(Ordering::Relaxed) { + return Err(Error::new(ErrorKind::UnexpectedEof, "Stream closed")); + } + + // Check if we have buffered data first + if let Ok(mut buffer) = self.read_buffer.lock() + && let Some(data) = buffer.pop_front() + { + let len = std::cmp::min(buf.len(), data.len()); + buf[..len].copy_from_slice(&data[..len]); + + // If there's leftover data, put it back in the buffer + if data.len() > len { + buffer.push_front(data.slice(len..)); + } + + return Ok(len); + } + + // Try to read from channel non-blocking + if let Ok(mut receiver) = self.readable.lock() { + match receiver.try_recv() { + Ok(data) => { + let len = std::cmp::min(buf.len(), data.len()); + buf[..len].copy_from_slice(&data[..len]); + + // If there's leftover data, store it in buffer + if data.len() > len + && let Ok(mut buffer) = self.read_buffer.lock() + { + buffer.push_front(data.slice(len..)); + } + + Ok(len) + } + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { + Err(Error::new(ErrorKind::WouldBlock, "No data available")) + } + Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { + self.closed.store(true, Ordering::Relaxed); + Err(Error::new(ErrorKind::UnexpectedEof, "Channel closed")) + } + } + } else { + Err(Error::other("Failed to acquire lock")) + } + } + + fn try_write(&self, buf: &[u8]) -> std::io::Result { + if self.closed.load(Ordering::Relaxed) { + return Err(Error::new(ErrorKind::BrokenPipe, "Stream closed")); + } + + if self.writable.is_closed() { + self.closed.store(true, Ordering::Relaxed); + return Err(Error::new(ErrorKind::BrokenPipe, "Channel closed")); + } + + let data = Bytes::copy_from_slice(buf); + match self.writable.try_send(data) { + Ok(()) => Ok(buf.len()), + Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { + Err(Error::new(ErrorKind::WouldBlock, "Channel full")) + } + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { + self.closed.store(true, Ordering::Relaxed); + Err(Error::new(ErrorKind::BrokenPipe, "Channel closed")) + } + } + } + + fn readable(&self) -> impl Future> + Send { + ReadableFuture { socket: self } + } + + fn writable(&self) -> impl Future> + Send { + WritableFuture { socket: self } + } + + fn shutdown(&self, _: std::net::Shutdown) -> std::io::Result<()> { + self.closed.store(true, Ordering::Relaxed); + Ok(()) + } + + fn into_std(self) -> Option> { + None + } +} + +struct JSDuplexResource { + readable: Arc>>, + writable: tokio::sync::mpsc::Sender, + read_buffer: Arc>>, +} + +impl JSDuplexResource { + pub fn new( + readable: tokio::sync::mpsc::Receiver, + writable: tokio::sync::mpsc::Sender, + ) -> Self { + Self { + readable: Arc::new(Mutex::new(readable)), + writable, + read_buffer: Arc::new(Mutex::new(VecDeque::new())), + } + } + + #[allow(clippy::await_holding_lock)] + pub async fn read( + self: Rc, + data: &mut [u8], + ) -> Result { + // First check if we have buffered data from previous partial read + if let Ok(mut buffer) = self.read_buffer.lock() + && let Some(buffered_data) = buffer.pop_front() + { + let len = std::cmp::min(data.len(), buffered_data.len()); + data[..len].copy_from_slice(&buffered_data[..len]); + + // If there's remaining data, put it back in buffer + if buffered_data.len() > len { + buffer.push_front(buffered_data.slice(len..)); + } + + return Ok(len); + } + + // No buffered data, receive new data from channel + let bytes = { + let mut receiver = self + .readable + .lock() + .map_err(|_| Error::other("Failed to acquire lock"))?; + receiver.recv().await + }; + + match bytes { + Some(bytes) => { + let len = std::cmp::min(data.len(), bytes.len()); + data[..len].copy_from_slice(&bytes[..len]); + + // If there's remaining data, buffer it for next read + if bytes.len() > len + && let Ok(mut buffer) = self.read_buffer.lock() + { + buffer.push_back(bytes.slice(len..)); + } + + Ok(len) + } + None => { + // Channel closed + Ok(0) + } + } + } + + pub async fn write( + self: Rc, + data: &[u8], + ) -> Result { + let bytes = Bytes::copy_from_slice(data); + + self + .writable + .send(bytes) + .await + .map_err(|_| Error::new(ErrorKind::BrokenPipe, "Channel closed"))?; + + Ok(data.len()) + } +} + +impl Resource for JSDuplexResource { + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + + fn name(&self) -> Cow<'_, str> { + "JSDuplexResource".into() + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct StartJSTlsArgs { + ca_certs: Vec, + hostname: String, + alpn_protocols: Option>, + reject_unauthorized: Option, +} + +#[derive(Debug)] +pub struct JSStreamTlsResource { + rd: AsyncRefCell>, + wr: AsyncRefCell>, +} + +impl JSStreamTlsResource { + pub fn new( + (rd, wr): ( + TlsStreamRead, + TlsStreamWrite, + ), + ) -> Self { + Self { + rd: AsyncRefCell::new(rd), + wr: AsyncRefCell::new(wr), + } + } + + pub async fn handshake( + self: &Rc, + ) -> Result { + let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await; + + let handshake = wr.handshake().await?; + + let alpn_protocol = handshake.alpn.map(|alpn| alpn.into()); + let peer_certificates = handshake.peer_certificates.clone(); + let tls_info = TlsHandshakeInfo { + alpn_protocol, + peer_certificates, + }; + + Ok(tls_info) + } + + pub async fn read( + self: Rc, + data: &mut [u8], + ) -> Result { + use tokio::io::AsyncReadExt; + + let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; + rd.read(data).await + } + + pub async fn write( + self: Rc, + data: &[u8], + ) -> Result { + use tokio::io::AsyncWriteExt; + + let mut wr = RcRef::map(&self, |r| &r.wr).borrow_mut().await; + let nwritten = wr.write(data).await?; + wr.flush().await?; + Ok(nwritten) + } +} + +impl Resource for JSStreamTlsResource { + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + + fn name(&self) -> Cow<'_, str> { + "JSStreamTlsResource".into() + } +} + +#[op2] +pub fn op_node_tls_start( + state: Rc>, + #[serde] args: StartJSTlsArgs, + #[buffer] output: &mut [u32], +) -> Result<(), NetError> { + let reject_unauthorized = args.reject_unauthorized.unwrap_or(true); + let hostname = match &*args.hostname { + "" => "localhost".to_string(), + n => n.to_string(), + }; + + assert_eq!(output.len(), 2); + + let ca_certs = args + .ca_certs + .into_iter() + .map(|s| s.into_bytes()) + .collect::>(); + + let hostname_dns = ServerName::try_from(hostname.to_string()) + .map_err(|_| NetError::InvalidHostname(hostname))?; + // --unsafely-ignore-certificate-errors overrides the `rejectUnauthorized` option. + let unsafely_ignore_certificate_errors = if reject_unauthorized { + state + .borrow() + .try_borrow::() + .and_then(|it| it.0.clone()) + } else { + Some(Vec::new()) + }; + + let root_cert_store = state + .borrow() + .borrow::() + .root_cert_store() + .map_err(NetError::RootCertStore)?; + + let (network_to_tls_tx, network_to_tls_rx) = + tokio::sync::mpsc::channel::(10); + let (tls_to_network_tx, tls_to_network_rx) = + tokio::sync::mpsc::channel::(10); + + let js_stream = JSStreamSocket::new(network_to_tls_rx, tls_to_network_tx); + + let tls_null = TlsKeysHolder::from(TlsKeys::Null); + let mut tls_config = create_client_config(TlsClientConfigOptions { + root_cert_store, + ca_certs, + unsafely_ignore_certificate_errors, + unsafely_disable_hostname_verification: false, + cert_chain_and_key: tls_null.take(), + socket_use: SocketUse::GeneralSsl, + })?; + + if let Some(alpn_protocols) = args.alpn_protocols { + tls_config.alpn_protocols = + alpn_protocols.into_iter().map(|s| s.into_bytes()).collect(); + } + + let tls_config = Arc::new(tls_config); + let tls_stream = TlsStream::new_client_side( + js_stream, + ClientConnection::new(tls_config, hostname_dns)?, + NonZeroUsize::new(65536), + ); + + let tls_resource = JSStreamTlsResource::new(tls_stream.into_split()); + let user_duplex = JSDuplexResource::new(tls_to_network_rx, network_to_tls_tx); + + let (tls_rid, duplex_rid) = { + let mut state = state.borrow_mut(); + let tls_rid = state.resource_table.add(tls_resource); + let duplex_rid = state.resource_table.add(user_duplex); + (tls_rid, duplex_rid) + }; + + output[0] = tls_rid; + output[1] = duplex_rid; + + Ok(()) +} + +#[op2(async)] +#[serde] +pub async fn op_node_tls_handshake( + state: Rc>, + #[smi] rid: ResourceId, +) -> Result { + let resource = state + .borrow() + .resource_table + .get::(rid) + .map_err(|_| NetError::ListenerClosed)?; + resource.handshake().await.map_err(Into::into) +} diff --git a/ext/node/polyfills/_tls_wrap.js b/ext/node/polyfills/_tls_wrap.js index c9e0e2960f..ed3388d52d 100644 --- a/ext/node/polyfills/_tls_wrap.js +++ b/ext/node/polyfills/_tls_wrap.js @@ -35,8 +35,10 @@ import { isArrayBufferView, } from "ext:deno_node/internal/util/types.ts"; import { startTlsInternal } from "ext:deno_net/02_tls.js"; -import { internals } from "ext:core/mod.js"; +import { core, internals } from "ext:core/mod.js"; import { + op_node_tls_handshake, + op_node_tls_start, op_tls_canonicalize_ipv4_address, op_tls_key_null, op_tls_key_static, @@ -47,6 +49,8 @@ const kIsVerified = Symbol("verified"); const kPendingSession = Symbol("pendingSession"); const kRes = Symbol("res"); +const tlsStreamRids = new Uint32Array(2); + let debug = debuglog("tls", (fn) => { debug = fn; }); @@ -160,10 +164,17 @@ export class TLSSocket extends net.Socket { /** Wraps the given socket and adds the tls capability to the underlying * handle */ - function _wrapHandle(tlsOptions, wrap) { + function _wrapHandle(tlsOptions, socket) { let handle; + let wrap; + + if (socket) { + if (socket instanceof net.Socket && socket._handle) { + wrap = socket; + } else { + wrap = new JSStreamSocket(socket); + } - if (wrap) { handle = wrap._handle; } @@ -188,13 +199,14 @@ export class TLSSocket extends net.Socket { } try { - const conn = await startTlsInternal( - handle[kStreamBaseField], + const conn = await startTls( + wrap, + handle, options, ); try { const hs = await conn.handshake(); - if (hs.alpnProtocol) { + if (hs?.alpnProtocol) { tlssock.alpnProtocol = hs.alpnProtocol; } else { tlssock.alpnProtocol = false; @@ -228,7 +240,7 @@ export class TLSSocket extends net.Socket { // An example usage of `_parentWrap` in npm module: // https://github.com/szmarczak/http2-wrapper/blob/51eeaf59ff9344fb192b092241bfda8506983620/source/utils/js-stream-socket.js#L6 handle._parent = handle; - handle._parentWrap = wrap; + handle._parentWrap = socket; return handle; } @@ -267,10 +279,75 @@ export class TLSSocket extends net.Socket { // TODO(kt3k): implement this } + setMaxSendFragment(_maxSendFragment) { + // TODO(littledivy): implement this + } + getPeerCertificate(detailed = false) { const conn = this[kHandle]?.[kStreamBaseField]; if (conn) return conn[internals.getPeerCertificate](detailed); } + + getCipher() { + return ""; + } +} + +class JSStreamSocket { + #rid; + + constructor(stream) { + this.stream = stream; + } + + init(options) { + op_node_tls_start(options, tlsStreamRids); + this.#rid = tlsStreamRids[0]; + const channelRid = tlsStreamRids[1]; + + this.stream.on("data", (data) => { + core.write(channelRid, data); + }); + + const buf = new Uint8Array(1024 * 16); + (async () => { + while (true) { + try { + const nread = await core.read(channelRid, buf); + this.stream.write(buf.slice(0, nread)); + } catch { + break; + } + } + })(); + + this.stream.on("close", () => { + core.close(this.#rid); + core.close(channelRid); + }); + } + + handshake() { + return op_node_tls_handshake(this.#rid); + } + + read(buf) { + return core.read(this.#rid, buf); + } + + write(data) { + return core.write(this.#rid, data); + } +} + +function startTls(wrap, handle, options) { + if (wrap instanceof JSStreamSocket) { + options.caCerts ??= []; + wrap.init(options); + return wrap; + } else { + return startTlsInternal(handle[kStreamBaseField], options); + } } function normalizeConnectArgs(listArgs) { diff --git a/ext/node/polyfills/internal_binding/stream_wrap.ts b/ext/node/polyfills/internal_binding/stream_wrap.ts index a00f964c48..095faa82d8 100644 --- a/ext/node/polyfills/internal_binding/stream_wrap.ts +++ b/ext/node/polyfills/internal_binding/stream_wrap.ts @@ -355,7 +355,6 @@ export class LibuvStreamWrap extends HandleWrap { let buf = this.#buf; let nread: number | null; - const ridBefore = this[kStreamBaseField]![internalRidSymbol]; if (this.upgrading) { // Starting an upgrade, stop reading. Upgrading will resume reading. @@ -363,6 +362,7 @@ export class LibuvStreamWrap extends HandleWrap { return; } + const ridBefore = this[kStreamBaseField]![internalRidSymbol]; try { if (this[kStreamBaseField]![_readWithCancelHandle]) { const { cancelHandle, nread: p } = this[kStreamBaseField]! diff --git a/ext/node/polyfills/net.ts b/ext/node/polyfills/net.ts index 69823811fb..d842fde9c2 100644 --- a/ext/node/polyfills/net.ts +++ b/ext/node/polyfills/net.ts @@ -1598,7 +1598,7 @@ Socket.prototype.destroySoon = function () { Socket.prototype._unrefTimer = function () { // deno-lint-ignore no-this-alias - for (let s = this; s !== null; s = s._parent) { + for (let s = this; s != null; s = s._parent) { if (s[kTimeout]) { s[kTimeout].refresh(); } @@ -1664,7 +1664,7 @@ Socket.prototype._destroy = function (exception, cb) { this.connecting = false; // deno-lint-ignore no-this-alias - for (let s = this; s !== null; s = s._parent) { + for (let s = this; s != null; s = s._parent) { clearTimeout(s[kTimeout]); } diff --git a/tests/unit_node/tls_test.ts b/tests/unit_node/tls_test.ts index 3dd8302b2a..01a03c7ae1 100644 --- a/tests/unit_node/tls_test.ts +++ b/tests/unit_node/tls_test.ts @@ -234,18 +234,6 @@ Deno.test("TLSSocket can construct without options", () => { new tls.TLSSocket(new stream.PassThrough() as any); }); -Deno.test("tlssocket._handle._parentWrap is set", () => { - // Note: This feature is used in popular 'http2-wrapper' module - // https://github.com/szmarczak/http2-wrapper/blob/51eeaf59ff9344fb192b092241bfda8506983620/source/utils/js-stream-socket.js#L6 - const parentWrap = - // deno-lint-ignore no-explicit-any - ((new tls.TLSSocket(new stream.PassThrough() as any, {}) as any) - // deno-lint-ignore no-explicit-any - ._handle as any)! - ._parentWrap; - assertInstanceOf(parentWrap, stream.PassThrough); -}); - Deno.test("tls.connect() throws InvalidData when there's error in certificate", async () => { // Uses execCode to avoid `--unsafely-ignore-certificate-errors` option applied const [status, output] = await execCode(` @@ -319,6 +307,58 @@ Deno.test("tls connect upgrade tcp", async () => { socket.destroy(); }); +Deno.test("tlssocket._handle._parentWrap is set", () => { + // Note: This feature is used in popular 'http2-wrapper' module + // https://github.com/szmarczak/http2-wrapper/blob/51eeaf59ff9344fb192b092241bfda8506983620/source/utils/js-stream-socket.js#L6 + const parentWrap = + // deno-lint-ignore no-explicit-any + ((new tls.TLSSocket(new stream.PassThrough() as any, {}) as any) + // deno-lint-ignore no-explicit-any + ._handle as any)! + ._parentWrap; + assertInstanceOf(parentWrap, stream.PassThrough); +}); + +Deno.test({ + name: "tls connect upgrade js socket wrapper", + sanitizeOps: false, + sanitizeResources: false, +}, async () => { + const { promise, resolve } = Promise.withResolvers(); + + class SocketWrapper extends stream.Duplex { + socket: net.Socket; + + constructor() { + super(); + this.socket = new net.Socket(); + } + + // deno-lint-ignore no-explicit-any + override _write(chunk: any, encoding: any, callback: any) { + this.socket.write(chunk, encoding, callback); + } + + override _read() { + } + + connect(port: number, host: string) { + this.socket.connect(port, host); + this.socket.on("data", (data) => this.push(data)); + this.socket.on("end", () => this.push(null)); + } + } + + const socket = new SocketWrapper(); + socket.connect(443, "google.com"); + + const secure = tls.connect({ socket, host: "google.com" }); + secure.on("secureConnect", () => resolve()); + + await promise; + socket.destroy(); +}); + Deno.test({ name: "[node/tls] tls.Server.unref() works", ignore: Deno.build.os === "windows",