mirror of
https://github.com/denoland/deno.git
synced 2025-09-26 20:29:11 +00:00
fix(ext/node): support JS underlying stream in TLS (#30465)
Some checks are pending
ci / pre-build (push) Waiting to run
ci / test debug linux-aarch64 (push) Blocked by required conditions
ci / test release linux-aarch64 (push) Blocked by required conditions
ci / test debug macos-aarch64 (push) Blocked by required conditions
ci / test release macos-aarch64 (push) Blocked by required conditions
ci / bench release linux-x86_64 (push) Blocked by required conditions
ci / lint debug linux-x86_64 (push) Blocked by required conditions
ci / lint debug macos-x86_64 (push) Blocked by required conditions
ci / lint debug windows-x86_64 (push) Blocked by required conditions
ci / test debug linux-x86_64 (push) Blocked by required conditions
ci / test release linux-x86_64 (push) Blocked by required conditions
ci / test debug macos-x86_64 (push) Blocked by required conditions
ci / test release macos-x86_64 (push) Blocked by required conditions
ci / test debug windows-x86_64 (push) Blocked by required conditions
ci / test release windows-x86_64 (push) Blocked by required conditions
ci / build libs (push) Blocked by required conditions
ci / publish canary (push) Blocked by required conditions
Some checks are pending
ci / pre-build (push) Waiting to run
ci / test debug linux-aarch64 (push) Blocked by required conditions
ci / test release linux-aarch64 (push) Blocked by required conditions
ci / test debug macos-aarch64 (push) Blocked by required conditions
ci / test release macos-aarch64 (push) Blocked by required conditions
ci / bench release linux-x86_64 (push) Blocked by required conditions
ci / lint debug linux-x86_64 (push) Blocked by required conditions
ci / lint debug macos-x86_64 (push) Blocked by required conditions
ci / lint debug windows-x86_64 (push) Blocked by required conditions
ci / test debug linux-x86_64 (push) Blocked by required conditions
ci / test release linux-x86_64 (push) Blocked by required conditions
ci / test debug macos-x86_64 (push) Blocked by required conditions
ci / test release macos-x86_64 (push) Blocked by required conditions
ci / test debug windows-x86_64 (push) Blocked by required conditions
ci / test release windows-x86_64 (push) Blocked by required conditions
ci / build libs (push) Blocked by required conditions
ci / publish canary (push) Blocked by required conditions
Fixes https://github.com/denoland/deno/issues/20594 This implements `JSStreamSocket` which drives the TLS underlying stream in `rustls_tokio_stream` using 2 sets of channels. One for piping the encrypted protocol transport and the other for plaintext application data. This fixes connecting to `npm:mssql`: ```js import sql from "npm:mssql"; const sqlConfig = { server: "localhost", user: "divy", password: "123", database: "master", options: { trustServerCertificate: true, }, }; const pool = await sql.connect(sqlConfig); const result = await pool.request().query(`SELECT * FROM sys.databases`); ```
This commit is contained in:
parent
c217928649
commit
36e9eb2023
9 changed files with 651 additions and 25 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -2403,6 +2403,7 @@ dependencies = [
|
||||||
"deno_permissions",
|
"deno_permissions",
|
||||||
"deno_process",
|
"deno_process",
|
||||||
"deno_subprocess_windows",
|
"deno_subprocess_windows",
|
||||||
|
"deno_tls",
|
||||||
"deno_whoami",
|
"deno_whoami",
|
||||||
"der",
|
"der",
|
||||||
"digest",
|
"digest",
|
||||||
|
@ -2442,6 +2443,7 @@ dependencies = [
|
||||||
"ripemd",
|
"ripemd",
|
||||||
"rsa",
|
"rsa",
|
||||||
"rusqlite",
|
"rusqlite",
|
||||||
|
"rustls-tokio-stream",
|
||||||
"scrypt",
|
"scrypt",
|
||||||
"sec1",
|
"sec1",
|
||||||
"serde",
|
"serde",
|
||||||
|
|
|
@ -40,6 +40,7 @@ deno_package_json.workspace = true
|
||||||
deno_path_util.workspace = true
|
deno_path_util.workspace = true
|
||||||
deno_permissions.workspace = true
|
deno_permissions.workspace = true
|
||||||
deno_process.workspace = true
|
deno_process.workspace = true
|
||||||
|
deno_tls.workspace = true
|
||||||
deno_whoami.workspace = true
|
deno_whoami.workspace = true
|
||||||
der = { workspace = true, features = ["derive"] }
|
der = { workspace = true, features = ["derive"] }
|
||||||
digest = { workspace = true, features = ["core-api", "std"] }
|
digest = { workspace = true, features = ["core-api", "std"] }
|
||||||
|
@ -78,6 +79,7 @@ rand.workspace = true
|
||||||
ripemd = { workspace = true, features = ["oid"] }
|
ripemd = { workspace = true, features = ["oid"] }
|
||||||
rsa.workspace = true
|
rsa.workspace = true
|
||||||
rusqlite.workspace = true
|
rusqlite.workspace = true
|
||||||
|
rustls-tokio-stream.workspace = true
|
||||||
scrypt.workspace = true
|
scrypt.workspace = true
|
||||||
sec1.workspace = true
|
sec1.workspace = true
|
||||||
serde.workspace = true
|
serde.workspace = true
|
||||||
|
|
|
@ -28,6 +28,3 @@ disallowed-methods = [
|
||||||
{ path = "std::fs::symlink_metadata", reason = "File system operations should be done using FileSystem trait" },
|
{ path = "std::fs::symlink_metadata", reason = "File system operations should be done using FileSystem trait" },
|
||||||
{ path = "std::fs::write", reason = "File system operations should be done using FileSystem trait" },
|
{ path = "std::fs::write", reason = "File system operations should be done using FileSystem trait" },
|
||||||
]
|
]
|
||||||
disallowed-types = [
|
|
||||||
{ path = "std::sync::Arc", reason = "use deno_fs::sync::MaybeArc instead" },
|
|
||||||
]
|
|
||||||
|
|
|
@ -460,6 +460,8 @@ deno_core::extension!(deno_node,
|
||||||
ops::tls::op_get_root_certificates,
|
ops::tls::op_get_root_certificates,
|
||||||
ops::tls::op_tls_peer_certificate,
|
ops::tls::op_tls_peer_certificate,
|
||||||
ops::tls::op_tls_canonicalize_ipv4_address,
|
ops::tls::op_tls_canonicalize_ipv4_address,
|
||||||
|
ops::tls::op_node_tls_start,
|
||||||
|
ops::tls::op_node_tls_handshake,
|
||||||
ops::inspector::op_inspector_open<P>,
|
ops::inspector::op_inspector_open<P>,
|
||||||
ops::inspector::op_inspector_close,
|
ops::inspector::op_inspector_close,
|
||||||
ops::inspector::op_inspector_url,
|
ops::inspector::op_inspector_url,
|
||||||
|
|
|
@ -1,9 +1,46 @@
|
||||||
// Copyright 2018-2025 the Deno authors. MIT license.
|
// Copyright 2018-2025 the Deno authors. MIT license.
|
||||||
|
use std::borrow::Cow;
|
||||||
|
use std::cell::RefCell;
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use std::future::Future;
|
||||||
|
use std::io::Error;
|
||||||
|
use std::io::ErrorKind;
|
||||||
|
use std::num::NonZeroUsize;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::rc::Rc;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
use std::sync::atomic::AtomicBool;
|
||||||
|
use std::sync::atomic::Ordering;
|
||||||
|
use std::task::Context;
|
||||||
|
use std::task::Poll;
|
||||||
|
|
||||||
use base64::Engine;
|
use base64::Engine;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use deno_core::AsyncRefCell;
|
||||||
|
use deno_core::AsyncResult;
|
||||||
use deno_core::OpState;
|
use deno_core::OpState;
|
||||||
|
use deno_core::RcRef;
|
||||||
|
use deno_core::Resource;
|
||||||
use deno_core::ResourceId;
|
use deno_core::ResourceId;
|
||||||
use deno_core::op2;
|
use deno_core::op2;
|
||||||
|
use deno_net::DefaultTlsOptions;
|
||||||
|
use deno_net::UnsafelyIgnoreCertificateErrors;
|
||||||
|
use deno_net::ops::NetError;
|
||||||
|
use deno_net::ops::TlsHandshakeInfo;
|
||||||
use deno_net::ops_tls::TlsStreamResource;
|
use deno_net::ops_tls::TlsStreamResource;
|
||||||
|
use deno_tls::SocketUse;
|
||||||
|
use deno_tls::TlsClientConfigOptions;
|
||||||
|
use deno_tls::TlsKeys;
|
||||||
|
use deno_tls::TlsKeysHolder;
|
||||||
|
use deno_tls::create_client_config;
|
||||||
|
use deno_tls::rustls::ClientConnection;
|
||||||
|
use deno_tls::rustls::pki_types::ServerName;
|
||||||
|
use rustls_tokio_stream::TlsStream;
|
||||||
|
use rustls_tokio_stream::TlsStreamRead;
|
||||||
|
use rustls_tokio_stream::TlsStreamWrite;
|
||||||
|
use rustls_tokio_stream::UnderlyingStream;
|
||||||
|
use serde::Deserialize;
|
||||||
use webpki_root_certs;
|
use webpki_root_certs;
|
||||||
|
|
||||||
use super::crypto::x509::Certificate;
|
use super::crypto::x509::Certificate;
|
||||||
|
@ -68,3 +105,472 @@ pub fn op_tls_canonicalize_ipv4_address(
|
||||||
|
|
||||||
Some(canonical_ip)
|
Some(canonical_ip)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct ReadableFuture<'a> {
|
||||||
|
socket: &'a JSStreamSocket,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> Future for ReadableFuture<'a> {
|
||||||
|
type Output = std::io::Result<()>;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
self.socket.poll_read_ready(cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct WritableFuture<'a> {
|
||||||
|
socket: &'a JSStreamSocket,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> Future for WritableFuture<'a> {
|
||||||
|
type Output = std::io::Result<()>;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
self.socket.poll_write_ready(cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct JSStreamSocket {
|
||||||
|
readable: Arc<Mutex<tokio::sync::mpsc::Receiver<Bytes>>>,
|
||||||
|
writable: tokio::sync::mpsc::Sender<Bytes>,
|
||||||
|
read_buffer: Arc<Mutex<VecDeque<Bytes>>>,
|
||||||
|
closed: AtomicBool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JSStreamSocket {
|
||||||
|
pub fn new(
|
||||||
|
readable: tokio::sync::mpsc::Receiver<Bytes>,
|
||||||
|
writable: tokio::sync::mpsc::Sender<Bytes>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
readable: Arc::new(Mutex::new(readable)),
|
||||||
|
writable,
|
||||||
|
read_buffer: Arc::new(Mutex::new(VecDeque::new())),
|
||||||
|
closed: AtomicBool::new(false),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UnderlyingStream for JSStreamSocket {
|
||||||
|
type StdType = ();
|
||||||
|
|
||||||
|
fn poll_read_ready(
|
||||||
|
&self,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
) -> std::task::Poll<std::io::Result<()>> {
|
||||||
|
// Check if we have buffered data
|
||||||
|
if let Ok(buffer) = self.read_buffer.lock()
|
||||||
|
&& !buffer.is_empty()
|
||||||
|
{
|
||||||
|
return Poll::Ready(Ok(()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.closed.load(Ordering::Relaxed) {
|
||||||
|
return Poll::Ready(Err(Error::new(
|
||||||
|
ErrorKind::UnexpectedEof,
|
||||||
|
"Stream closed",
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to poll for data without consuming it
|
||||||
|
if let Ok(mut receiver) = self.readable.lock() {
|
||||||
|
match receiver.poll_recv(cx) {
|
||||||
|
Poll::Ready(Some(data)) => {
|
||||||
|
// Store the data in buffer for try_read
|
||||||
|
if let Ok(mut buffer) = self.read_buffer.lock() {
|
||||||
|
buffer.push_back(data);
|
||||||
|
}
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
Poll::Ready(None) => {
|
||||||
|
// Channel closed
|
||||||
|
self.closed.store(true, Ordering::Relaxed);
|
||||||
|
Poll::Ready(Err(Error::new(
|
||||||
|
ErrorKind::UnexpectedEof,
|
||||||
|
"Channel closed",
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
panic!("Failed to acquire lock")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_ready(
|
||||||
|
&self,
|
||||||
|
_cx: &mut std::task::Context<'_>,
|
||||||
|
) -> std::task::Poll<std::io::Result<()>> {
|
||||||
|
if self.closed.load(Ordering::Relaxed) {
|
||||||
|
return Poll::Ready(Err(Error::new(
|
||||||
|
ErrorKind::BrokenPipe,
|
||||||
|
"Stream closed",
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
// For bounded sender, check if channel is ready
|
||||||
|
if self.writable.is_closed() {
|
||||||
|
self.closed.store(true, Ordering::Relaxed);
|
||||||
|
Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, "Channel closed")))
|
||||||
|
} else {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_read(&self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
|
if self.closed.load(Ordering::Relaxed) {
|
||||||
|
return Err(Error::new(ErrorKind::UnexpectedEof, "Stream closed"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we have buffered data first
|
||||||
|
if let Ok(mut buffer) = self.read_buffer.lock()
|
||||||
|
&& let Some(data) = buffer.pop_front()
|
||||||
|
{
|
||||||
|
let len = std::cmp::min(buf.len(), data.len());
|
||||||
|
buf[..len].copy_from_slice(&data[..len]);
|
||||||
|
|
||||||
|
// If there's leftover data, put it back in the buffer
|
||||||
|
if data.len() > len {
|
||||||
|
buffer.push_front(data.slice(len..));
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok(len);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to read from channel non-blocking
|
||||||
|
if let Ok(mut receiver) = self.readable.lock() {
|
||||||
|
match receiver.try_recv() {
|
||||||
|
Ok(data) => {
|
||||||
|
let len = std::cmp::min(buf.len(), data.len());
|
||||||
|
buf[..len].copy_from_slice(&data[..len]);
|
||||||
|
|
||||||
|
// If there's leftover data, store it in buffer
|
||||||
|
if data.len() > len
|
||||||
|
&& let Ok(mut buffer) = self.read_buffer.lock()
|
||||||
|
{
|
||||||
|
buffer.push_front(data.slice(len..));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(len)
|
||||||
|
}
|
||||||
|
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
|
||||||
|
Err(Error::new(ErrorKind::WouldBlock, "No data available"))
|
||||||
|
}
|
||||||
|
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
|
||||||
|
self.closed.store(true, Ordering::Relaxed);
|
||||||
|
Err(Error::new(ErrorKind::UnexpectedEof, "Channel closed"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Err(Error::other("Failed to acquire lock"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_write(&self, buf: &[u8]) -> std::io::Result<usize> {
|
||||||
|
if self.closed.load(Ordering::Relaxed) {
|
||||||
|
return Err(Error::new(ErrorKind::BrokenPipe, "Stream closed"));
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.writable.is_closed() {
|
||||||
|
self.closed.store(true, Ordering::Relaxed);
|
||||||
|
return Err(Error::new(ErrorKind::BrokenPipe, "Channel closed"));
|
||||||
|
}
|
||||||
|
|
||||||
|
let data = Bytes::copy_from_slice(buf);
|
||||||
|
match self.writable.try_send(data) {
|
||||||
|
Ok(()) => Ok(buf.len()),
|
||||||
|
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
|
||||||
|
Err(Error::new(ErrorKind::WouldBlock, "Channel full"))
|
||||||
|
}
|
||||||
|
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
|
||||||
|
self.closed.store(true, Ordering::Relaxed);
|
||||||
|
Err(Error::new(ErrorKind::BrokenPipe, "Channel closed"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn readable(&self) -> impl Future<Output = std::io::Result<()>> + Send {
|
||||||
|
ReadableFuture { socket: self }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn writable(&self) -> impl Future<Output = std::io::Result<()>> + Send {
|
||||||
|
WritableFuture { socket: self }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn shutdown(&self, _: std::net::Shutdown) -> std::io::Result<()> {
|
||||||
|
self.closed.store(true, Ordering::Relaxed);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn into_std(self) -> Option<std::io::Result<Self::StdType>> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct JSDuplexResource {
|
||||||
|
readable: Arc<Mutex<tokio::sync::mpsc::Receiver<Bytes>>>,
|
||||||
|
writable: tokio::sync::mpsc::Sender<Bytes>,
|
||||||
|
read_buffer: Arc<Mutex<VecDeque<Bytes>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JSDuplexResource {
|
||||||
|
pub fn new(
|
||||||
|
readable: tokio::sync::mpsc::Receiver<Bytes>,
|
||||||
|
writable: tokio::sync::mpsc::Sender<Bytes>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
readable: Arc::new(Mutex::new(readable)),
|
||||||
|
writable,
|
||||||
|
read_buffer: Arc::new(Mutex::new(VecDeque::new())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::await_holding_lock)]
|
||||||
|
pub async fn read(
|
||||||
|
self: Rc<Self>,
|
||||||
|
data: &mut [u8],
|
||||||
|
) -> Result<usize, std::io::Error> {
|
||||||
|
// First check if we have buffered data from previous partial read
|
||||||
|
if let Ok(mut buffer) = self.read_buffer.lock()
|
||||||
|
&& let Some(buffered_data) = buffer.pop_front()
|
||||||
|
{
|
||||||
|
let len = std::cmp::min(data.len(), buffered_data.len());
|
||||||
|
data[..len].copy_from_slice(&buffered_data[..len]);
|
||||||
|
|
||||||
|
// If there's remaining data, put it back in buffer
|
||||||
|
if buffered_data.len() > len {
|
||||||
|
buffer.push_front(buffered_data.slice(len..));
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok(len);
|
||||||
|
}
|
||||||
|
|
||||||
|
// No buffered data, receive new data from channel
|
||||||
|
let bytes = {
|
||||||
|
let mut receiver = self
|
||||||
|
.readable
|
||||||
|
.lock()
|
||||||
|
.map_err(|_| Error::other("Failed to acquire lock"))?;
|
||||||
|
receiver.recv().await
|
||||||
|
};
|
||||||
|
|
||||||
|
match bytes {
|
||||||
|
Some(bytes) => {
|
||||||
|
let len = std::cmp::min(data.len(), bytes.len());
|
||||||
|
data[..len].copy_from_slice(&bytes[..len]);
|
||||||
|
|
||||||
|
// If there's remaining data, buffer it for next read
|
||||||
|
if bytes.len() > len
|
||||||
|
&& let Ok(mut buffer) = self.read_buffer.lock()
|
||||||
|
{
|
||||||
|
buffer.push_back(bytes.slice(len..));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(len)
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// Channel closed
|
||||||
|
Ok(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn write(
|
||||||
|
self: Rc<Self>,
|
||||||
|
data: &[u8],
|
||||||
|
) -> Result<usize, std::io::Error> {
|
||||||
|
let bytes = Bytes::copy_from_slice(data);
|
||||||
|
|
||||||
|
self
|
||||||
|
.writable
|
||||||
|
.send(bytes)
|
||||||
|
.await
|
||||||
|
.map_err(|_| Error::new(ErrorKind::BrokenPipe, "Channel closed"))?;
|
||||||
|
|
||||||
|
Ok(data.len())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Resource for JSDuplexResource {
|
||||||
|
deno_core::impl_readable_byob!();
|
||||||
|
deno_core::impl_writable!();
|
||||||
|
|
||||||
|
fn name(&self) -> Cow<'_, str> {
|
||||||
|
"JSDuplexResource".into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct StartJSTlsArgs {
|
||||||
|
ca_certs: Vec<String>,
|
||||||
|
hostname: String,
|
||||||
|
alpn_protocols: Option<Vec<String>>,
|
||||||
|
reject_unauthorized: Option<bool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct JSStreamTlsResource {
|
||||||
|
rd: AsyncRefCell<TlsStreamRead<JSStreamSocket>>,
|
||||||
|
wr: AsyncRefCell<TlsStreamWrite<JSStreamSocket>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JSStreamTlsResource {
|
||||||
|
pub fn new(
|
||||||
|
(rd, wr): (
|
||||||
|
TlsStreamRead<JSStreamSocket>,
|
||||||
|
TlsStreamWrite<JSStreamSocket>,
|
||||||
|
),
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
rd: AsyncRefCell::new(rd),
|
||||||
|
wr: AsyncRefCell::new(wr),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handshake(
|
||||||
|
self: &Rc<Self>,
|
||||||
|
) -> Result<TlsHandshakeInfo, std::io::Error> {
|
||||||
|
let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await;
|
||||||
|
|
||||||
|
let handshake = wr.handshake().await?;
|
||||||
|
|
||||||
|
let alpn_protocol = handshake.alpn.map(|alpn| alpn.into());
|
||||||
|
let peer_certificates = handshake.peer_certificates.clone();
|
||||||
|
let tls_info = TlsHandshakeInfo {
|
||||||
|
alpn_protocol,
|
||||||
|
peer_certificates,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(tls_info)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn read(
|
||||||
|
self: Rc<Self>,
|
||||||
|
data: &mut [u8],
|
||||||
|
) -> Result<usize, std::io::Error> {
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
|
||||||
|
rd.read(data).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn write(
|
||||||
|
self: Rc<Self>,
|
||||||
|
data: &[u8],
|
||||||
|
) -> Result<usize, std::io::Error> {
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
let mut wr = RcRef::map(&self, |r| &r.wr).borrow_mut().await;
|
||||||
|
let nwritten = wr.write(data).await?;
|
||||||
|
wr.flush().await?;
|
||||||
|
Ok(nwritten)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Resource for JSStreamTlsResource {
|
||||||
|
deno_core::impl_readable_byob!();
|
||||||
|
deno_core::impl_writable!();
|
||||||
|
|
||||||
|
fn name(&self) -> Cow<'_, str> {
|
||||||
|
"JSStreamTlsResource".into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[op2]
|
||||||
|
pub fn op_node_tls_start(
|
||||||
|
state: Rc<RefCell<OpState>>,
|
||||||
|
#[serde] args: StartJSTlsArgs,
|
||||||
|
#[buffer] output: &mut [u32],
|
||||||
|
) -> Result<(), NetError> {
|
||||||
|
let reject_unauthorized = args.reject_unauthorized.unwrap_or(true);
|
||||||
|
let hostname = match &*args.hostname {
|
||||||
|
"" => "localhost".to_string(),
|
||||||
|
n => n.to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(output.len(), 2);
|
||||||
|
|
||||||
|
let ca_certs = args
|
||||||
|
.ca_certs
|
||||||
|
.into_iter()
|
||||||
|
.map(|s| s.into_bytes())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let hostname_dns = ServerName::try_from(hostname.to_string())
|
||||||
|
.map_err(|_| NetError::InvalidHostname(hostname))?;
|
||||||
|
// --unsafely-ignore-certificate-errors overrides the `rejectUnauthorized` option.
|
||||||
|
let unsafely_ignore_certificate_errors = if reject_unauthorized {
|
||||||
|
state
|
||||||
|
.borrow()
|
||||||
|
.try_borrow::<UnsafelyIgnoreCertificateErrors>()
|
||||||
|
.and_then(|it| it.0.clone())
|
||||||
|
} else {
|
||||||
|
Some(Vec::new())
|
||||||
|
};
|
||||||
|
|
||||||
|
let root_cert_store = state
|
||||||
|
.borrow()
|
||||||
|
.borrow::<DefaultTlsOptions>()
|
||||||
|
.root_cert_store()
|
||||||
|
.map_err(NetError::RootCertStore)?;
|
||||||
|
|
||||||
|
let (network_to_tls_tx, network_to_tls_rx) =
|
||||||
|
tokio::sync::mpsc::channel::<Bytes>(10);
|
||||||
|
let (tls_to_network_tx, tls_to_network_rx) =
|
||||||
|
tokio::sync::mpsc::channel::<Bytes>(10);
|
||||||
|
|
||||||
|
let js_stream = JSStreamSocket::new(network_to_tls_rx, tls_to_network_tx);
|
||||||
|
|
||||||
|
let tls_null = TlsKeysHolder::from(TlsKeys::Null);
|
||||||
|
let mut tls_config = create_client_config(TlsClientConfigOptions {
|
||||||
|
root_cert_store,
|
||||||
|
ca_certs,
|
||||||
|
unsafely_ignore_certificate_errors,
|
||||||
|
unsafely_disable_hostname_verification: false,
|
||||||
|
cert_chain_and_key: tls_null.take(),
|
||||||
|
socket_use: SocketUse::GeneralSsl,
|
||||||
|
})?;
|
||||||
|
|
||||||
|
if let Some(alpn_protocols) = args.alpn_protocols {
|
||||||
|
tls_config.alpn_protocols =
|
||||||
|
alpn_protocols.into_iter().map(|s| s.into_bytes()).collect();
|
||||||
|
}
|
||||||
|
|
||||||
|
let tls_config = Arc::new(tls_config);
|
||||||
|
let tls_stream = TlsStream::new_client_side(
|
||||||
|
js_stream,
|
||||||
|
ClientConnection::new(tls_config, hostname_dns)?,
|
||||||
|
NonZeroUsize::new(65536),
|
||||||
|
);
|
||||||
|
|
||||||
|
let tls_resource = JSStreamTlsResource::new(tls_stream.into_split());
|
||||||
|
let user_duplex = JSDuplexResource::new(tls_to_network_rx, network_to_tls_tx);
|
||||||
|
|
||||||
|
let (tls_rid, duplex_rid) = {
|
||||||
|
let mut state = state.borrow_mut();
|
||||||
|
let tls_rid = state.resource_table.add(tls_resource);
|
||||||
|
let duplex_rid = state.resource_table.add(user_duplex);
|
||||||
|
(tls_rid, duplex_rid)
|
||||||
|
};
|
||||||
|
|
||||||
|
output[0] = tls_rid;
|
||||||
|
output[1] = duplex_rid;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[op2(async)]
|
||||||
|
#[serde]
|
||||||
|
pub async fn op_node_tls_handshake(
|
||||||
|
state: Rc<RefCell<OpState>>,
|
||||||
|
#[smi] rid: ResourceId,
|
||||||
|
) -> Result<TlsHandshakeInfo, NetError> {
|
||||||
|
let resource = state
|
||||||
|
.borrow()
|
||||||
|
.resource_table
|
||||||
|
.get::<JSStreamTlsResource>(rid)
|
||||||
|
.map_err(|_| NetError::ListenerClosed)?;
|
||||||
|
resource.handshake().await.map_err(Into::into)
|
||||||
|
}
|
||||||
|
|
|
@ -35,8 +35,10 @@ import {
|
||||||
isArrayBufferView,
|
isArrayBufferView,
|
||||||
} from "ext:deno_node/internal/util/types.ts";
|
} from "ext:deno_node/internal/util/types.ts";
|
||||||
import { startTlsInternal } from "ext:deno_net/02_tls.js";
|
import { startTlsInternal } from "ext:deno_net/02_tls.js";
|
||||||
import { internals } from "ext:core/mod.js";
|
import { core, internals } from "ext:core/mod.js";
|
||||||
import {
|
import {
|
||||||
|
op_node_tls_handshake,
|
||||||
|
op_node_tls_start,
|
||||||
op_tls_canonicalize_ipv4_address,
|
op_tls_canonicalize_ipv4_address,
|
||||||
op_tls_key_null,
|
op_tls_key_null,
|
||||||
op_tls_key_static,
|
op_tls_key_static,
|
||||||
|
@ -47,6 +49,8 @@ const kIsVerified = Symbol("verified");
|
||||||
const kPendingSession = Symbol("pendingSession");
|
const kPendingSession = Symbol("pendingSession");
|
||||||
const kRes = Symbol("res");
|
const kRes = Symbol("res");
|
||||||
|
|
||||||
|
const tlsStreamRids = new Uint32Array(2);
|
||||||
|
|
||||||
let debug = debuglog("tls", (fn) => {
|
let debug = debuglog("tls", (fn) => {
|
||||||
debug = fn;
|
debug = fn;
|
||||||
});
|
});
|
||||||
|
@ -160,10 +164,17 @@ export class TLSSocket extends net.Socket {
|
||||||
|
|
||||||
/** Wraps the given socket and adds the tls capability to the underlying
|
/** Wraps the given socket and adds the tls capability to the underlying
|
||||||
* handle */
|
* handle */
|
||||||
function _wrapHandle(tlsOptions, wrap) {
|
function _wrapHandle(tlsOptions, socket) {
|
||||||
let handle;
|
let handle;
|
||||||
|
let wrap;
|
||||||
|
|
||||||
|
if (socket) {
|
||||||
|
if (socket instanceof net.Socket && socket._handle) {
|
||||||
|
wrap = socket;
|
||||||
|
} else {
|
||||||
|
wrap = new JSStreamSocket(socket);
|
||||||
|
}
|
||||||
|
|
||||||
if (wrap) {
|
|
||||||
handle = wrap._handle;
|
handle = wrap._handle;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,13 +199,14 @@ export class TLSSocket extends net.Socket {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const conn = await startTlsInternal(
|
const conn = await startTls(
|
||||||
handle[kStreamBaseField],
|
wrap,
|
||||||
|
handle,
|
||||||
options,
|
options,
|
||||||
);
|
);
|
||||||
try {
|
try {
|
||||||
const hs = await conn.handshake();
|
const hs = await conn.handshake();
|
||||||
if (hs.alpnProtocol) {
|
if (hs?.alpnProtocol) {
|
||||||
tlssock.alpnProtocol = hs.alpnProtocol;
|
tlssock.alpnProtocol = hs.alpnProtocol;
|
||||||
} else {
|
} else {
|
||||||
tlssock.alpnProtocol = false;
|
tlssock.alpnProtocol = false;
|
||||||
|
@ -228,7 +240,7 @@ export class TLSSocket extends net.Socket {
|
||||||
// An example usage of `_parentWrap` in npm module:
|
// An example usage of `_parentWrap` in npm module:
|
||||||
// https://github.com/szmarczak/http2-wrapper/blob/51eeaf59ff9344fb192b092241bfda8506983620/source/utils/js-stream-socket.js#L6
|
// https://github.com/szmarczak/http2-wrapper/blob/51eeaf59ff9344fb192b092241bfda8506983620/source/utils/js-stream-socket.js#L6
|
||||||
handle._parent = handle;
|
handle._parent = handle;
|
||||||
handle._parentWrap = wrap;
|
handle._parentWrap = socket;
|
||||||
|
|
||||||
return handle;
|
return handle;
|
||||||
}
|
}
|
||||||
|
@ -267,10 +279,75 @@ export class TLSSocket extends net.Socket {
|
||||||
// TODO(kt3k): implement this
|
// TODO(kt3k): implement this
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setMaxSendFragment(_maxSendFragment) {
|
||||||
|
// TODO(littledivy): implement this
|
||||||
|
}
|
||||||
|
|
||||||
getPeerCertificate(detailed = false) {
|
getPeerCertificate(detailed = false) {
|
||||||
const conn = this[kHandle]?.[kStreamBaseField];
|
const conn = this[kHandle]?.[kStreamBaseField];
|
||||||
if (conn) return conn[internals.getPeerCertificate](detailed);
|
if (conn) return conn[internals.getPeerCertificate](detailed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getCipher() {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class JSStreamSocket {
|
||||||
|
#rid;
|
||||||
|
|
||||||
|
constructor(stream) {
|
||||||
|
this.stream = stream;
|
||||||
|
}
|
||||||
|
|
||||||
|
init(options) {
|
||||||
|
op_node_tls_start(options, tlsStreamRids);
|
||||||
|
this.#rid = tlsStreamRids[0];
|
||||||
|
const channelRid = tlsStreamRids[1];
|
||||||
|
|
||||||
|
this.stream.on("data", (data) => {
|
||||||
|
core.write(channelRid, data);
|
||||||
|
});
|
||||||
|
|
||||||
|
const buf = new Uint8Array(1024 * 16);
|
||||||
|
(async () => {
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
const nread = await core.read(channelRid, buf);
|
||||||
|
this.stream.write(buf.slice(0, nread));
|
||||||
|
} catch {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
this.stream.on("close", () => {
|
||||||
|
core.close(this.#rid);
|
||||||
|
core.close(channelRid);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
handshake() {
|
||||||
|
return op_node_tls_handshake(this.#rid);
|
||||||
|
}
|
||||||
|
|
||||||
|
read(buf) {
|
||||||
|
return core.read(this.#rid, buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
write(data) {
|
||||||
|
return core.write(this.#rid, data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function startTls(wrap, handle, options) {
|
||||||
|
if (wrap instanceof JSStreamSocket) {
|
||||||
|
options.caCerts ??= [];
|
||||||
|
wrap.init(options);
|
||||||
|
return wrap;
|
||||||
|
} else {
|
||||||
|
return startTlsInternal(handle[kStreamBaseField], options);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function normalizeConnectArgs(listArgs) {
|
function normalizeConnectArgs(listArgs) {
|
||||||
|
|
|
@ -355,7 +355,6 @@ export class LibuvStreamWrap extends HandleWrap {
|
||||||
let buf = this.#buf;
|
let buf = this.#buf;
|
||||||
|
|
||||||
let nread: number | null;
|
let nread: number | null;
|
||||||
const ridBefore = this[kStreamBaseField]![internalRidSymbol];
|
|
||||||
|
|
||||||
if (this.upgrading) {
|
if (this.upgrading) {
|
||||||
// Starting an upgrade, stop reading. Upgrading will resume reading.
|
// Starting an upgrade, stop reading. Upgrading will resume reading.
|
||||||
|
@ -363,6 +362,7 @@ export class LibuvStreamWrap extends HandleWrap {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const ridBefore = this[kStreamBaseField]![internalRidSymbol];
|
||||||
try {
|
try {
|
||||||
if (this[kStreamBaseField]![_readWithCancelHandle]) {
|
if (this[kStreamBaseField]![_readWithCancelHandle]) {
|
||||||
const { cancelHandle, nread: p } = this[kStreamBaseField]!
|
const { cancelHandle, nread: p } = this[kStreamBaseField]!
|
||||||
|
|
|
@ -1598,7 +1598,7 @@ Socket.prototype.destroySoon = function () {
|
||||||
|
|
||||||
Socket.prototype._unrefTimer = function () {
|
Socket.prototype._unrefTimer = function () {
|
||||||
// deno-lint-ignore no-this-alias
|
// deno-lint-ignore no-this-alias
|
||||||
for (let s = this; s !== null; s = s._parent) {
|
for (let s = this; s != null; s = s._parent) {
|
||||||
if (s[kTimeout]) {
|
if (s[kTimeout]) {
|
||||||
s[kTimeout].refresh();
|
s[kTimeout].refresh();
|
||||||
}
|
}
|
||||||
|
@ -1664,7 +1664,7 @@ Socket.prototype._destroy = function (exception, cb) {
|
||||||
this.connecting = false;
|
this.connecting = false;
|
||||||
|
|
||||||
// deno-lint-ignore no-this-alias
|
// deno-lint-ignore no-this-alias
|
||||||
for (let s = this; s !== null; s = s._parent) {
|
for (let s = this; s != null; s = s._parent) {
|
||||||
clearTimeout(s[kTimeout]);
|
clearTimeout(s[kTimeout]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -234,18 +234,6 @@ Deno.test("TLSSocket can construct without options", () => {
|
||||||
new tls.TLSSocket(new stream.PassThrough() as any);
|
new tls.TLSSocket(new stream.PassThrough() as any);
|
||||||
});
|
});
|
||||||
|
|
||||||
Deno.test("tlssocket._handle._parentWrap is set", () => {
|
|
||||||
// Note: This feature is used in popular 'http2-wrapper' module
|
|
||||||
// https://github.com/szmarczak/http2-wrapper/blob/51eeaf59ff9344fb192b092241bfda8506983620/source/utils/js-stream-socket.js#L6
|
|
||||||
const parentWrap =
|
|
||||||
// deno-lint-ignore no-explicit-any
|
|
||||||
((new tls.TLSSocket(new stream.PassThrough() as any, {}) as any)
|
|
||||||
// deno-lint-ignore no-explicit-any
|
|
||||||
._handle as any)!
|
|
||||||
._parentWrap;
|
|
||||||
assertInstanceOf(parentWrap, stream.PassThrough);
|
|
||||||
});
|
|
||||||
|
|
||||||
Deno.test("tls.connect() throws InvalidData when there's error in certificate", async () => {
|
Deno.test("tls.connect() throws InvalidData when there's error in certificate", async () => {
|
||||||
// Uses execCode to avoid `--unsafely-ignore-certificate-errors` option applied
|
// Uses execCode to avoid `--unsafely-ignore-certificate-errors` option applied
|
||||||
const [status, output] = await execCode(`
|
const [status, output] = await execCode(`
|
||||||
|
@ -319,6 +307,58 @@ Deno.test("tls connect upgrade tcp", async () => {
|
||||||
socket.destroy();
|
socket.destroy();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Deno.test("tlssocket._handle._parentWrap is set", () => {
|
||||||
|
// Note: This feature is used in popular 'http2-wrapper' module
|
||||||
|
// https://github.com/szmarczak/http2-wrapper/blob/51eeaf59ff9344fb192b092241bfda8506983620/source/utils/js-stream-socket.js#L6
|
||||||
|
const parentWrap =
|
||||||
|
// deno-lint-ignore no-explicit-any
|
||||||
|
((new tls.TLSSocket(new stream.PassThrough() as any, {}) as any)
|
||||||
|
// deno-lint-ignore no-explicit-any
|
||||||
|
._handle as any)!
|
||||||
|
._parentWrap;
|
||||||
|
assertInstanceOf(parentWrap, stream.PassThrough);
|
||||||
|
});
|
||||||
|
|
||||||
|
Deno.test({
|
||||||
|
name: "tls connect upgrade js socket wrapper",
|
||||||
|
sanitizeOps: false,
|
||||||
|
sanitizeResources: false,
|
||||||
|
}, async () => {
|
||||||
|
const { promise, resolve } = Promise.withResolvers<void>();
|
||||||
|
|
||||||
|
class SocketWrapper extends stream.Duplex {
|
||||||
|
socket: net.Socket;
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
super();
|
||||||
|
this.socket = new net.Socket();
|
||||||
|
}
|
||||||
|
|
||||||
|
// deno-lint-ignore no-explicit-any
|
||||||
|
override _write(chunk: any, encoding: any, callback: any) {
|
||||||
|
this.socket.write(chunk, encoding, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
override _read() {
|
||||||
|
}
|
||||||
|
|
||||||
|
connect(port: number, host: string) {
|
||||||
|
this.socket.connect(port, host);
|
||||||
|
this.socket.on("data", (data) => this.push(data));
|
||||||
|
this.socket.on("end", () => this.push(null));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const socket = new SocketWrapper();
|
||||||
|
socket.connect(443, "google.com");
|
||||||
|
|
||||||
|
const secure = tls.connect({ socket, host: "google.com" });
|
||||||
|
secure.on("secureConnect", () => resolve());
|
||||||
|
|
||||||
|
await promise;
|
||||||
|
socket.destroy();
|
||||||
|
});
|
||||||
|
|
||||||
Deno.test({
|
Deno.test({
|
||||||
name: "[node/tls] tls.Server.unref() works",
|
name: "[node/tls] tls.Server.unref() works",
|
||||||
ignore: Deno.build.os === "windows",
|
ignore: Deno.build.os === "windows",
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue