From dd47e25a3056dea79620ddb778834c462713881b Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Fri, 19 Dec 2025 05:18:48 -0300 Subject: [PATCH] feat: support named pipe listen, connect and open (#31624) Co-authored-by: Cyan Changes Add support to Pipe.prototype.listen, Pipe.prototype.connect for windows with named pipes. Also support to Pipe.prototype.open for unix. Alternative of https://github.com/denoland/deno/pull/29308, but without `Deno:pipe`. Only with node compatibilities. Solution for: https://github.com/denoland/deno/issues/25867 https://github.com/denoland/deno/issues/28332 https://github.com/denoland/deno/issues/31032 Maybe related: https://github.com/denoland/deno/issues/10244 I tested the Nuxt and Nx, they are working fine on Windows now. I also tested node-pty, and it's working on Unix. Co-authored-by: Cyan Changes --- Cargo.lock | 1 + ext/net/01_net.js | 12 + ext/net/Cargo.toml | 3 + ext/net/lib.rs | 57 ++- ext/net/ops_unix.rs | 47 +++ ext/net/ops_win_pipe.rs | 116 ++++++ ext/net/win_pipe.rs | 136 +++++++ ext/node/lib.rs | 1 + ext/node/ops/fs.rs | 44 +++ .../polyfills/internal_binding/pipe_wrap.ts | 353 +++++++++++++++--- tests/node_compat/config.toml | 1 + tests/specs/node/pipe_open_fd/__test__.jsonc | 15 + tests/specs/node/pipe_open_fd/invalid_fd.out | 2 + tests/specs/node/pipe_open_fd/invalid_fd.ts | 18 + tests/specs/node/pipe_open_fd/open_fd.out | 3 + tests/specs/node/pipe_open_fd/open_fd.ts | 45 +++ 16 files changed, 781 insertions(+), 73 deletions(-) create mode 100644 ext/net/ops_win_pipe.rs create mode 100644 ext/net/win_pipe.rs create mode 100644 tests/specs/node/pipe_open_fd/__test__.jsonc create mode 100644 tests/specs/node/pipe_open_fd/invalid_fd.out create mode 100644 tests/specs/node/pipe_open_fd/invalid_fd.ts create mode 100644 tests/specs/node/pipe_open_fd/open_fd.out create mode 100644 tests/specs/node/pipe_open_fd/open_fd.ts diff --git a/Cargo.lock b/Cargo.lock index 07675cd294..a7ba9ec9d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2406,6 +2406,7 @@ dependencies = [ "deno_tunnel", "hickory-proto", "hickory-resolver", + "libc", "log", "pin-project", "quinn", diff --git a/ext/net/01_net.js b/ext/net/01_net.js index d4af48d33d..ed3217dd61 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -265,6 +265,17 @@ class VsockConn extends Conn { } } +class PipeConn extends Conn { + constructor(rid) { + super(rid, null, null); + ObjectDefineProperty(this, internalRidSymbol, { + __proto__: null, + enumerable: false, + value: rid, + }); + } +} + class TunnelConn extends Conn { constructor(rid, remoteAddr, localAddr) { super(rid, remoteAddr, localAddr); @@ -749,6 +760,7 @@ export { listen, Listener, listenOptionApiName, + PipeConn, resolveDns, setDatagramBroadcast, setMulticastLoopback, diff --git a/ext/net/Cargo.toml b/ext/net/Cargo.toml index a93cb21d9e..9f76bf73d3 100644 --- a/ext/net/Cargo.toml +++ b/ext/net/Cargo.toml @@ -35,6 +35,9 @@ tokio.workspace = true url.workspace = true web-transport-proto.workspace = true +[target.'cfg(unix)'.dependencies] +libc.workspace = true + [target.'cfg(any(target_os = "android", target_os = "linux", target_os = "macos"))'.dependencies] tokio-vsock.workspace = true diff --git a/ext/net/lib.rs b/ext/net/lib.rs index 4a85ef26d9..041788854e 100644 --- a/ext/net/lib.rs +++ b/ext/net/lib.rs @@ -5,11 +5,15 @@ pub mod ops; pub mod ops_tls; #[cfg(unix)] pub mod ops_unix; +#[cfg(windows)] +mod ops_win_pipe; mod quic; pub mod raw; pub mod resolve_addr; pub mod tcp; pub mod tunnel; +#[cfg(windows)] +mod win_pipe; use std::sync::Arc; @@ -97,6 +101,11 @@ deno_core::extension!(deno_net, ops_unix::op_node_unstable_net_listen_unixpacket, ops_unix::op_net_recv_unixpacket, ops_unix::op_net_send_unixpacket, + ops_unix::op_net_unix_stream_from_fd, + + ops_win_pipe::op_pipe_open, + ops_win_pipe::op_pipe_connect, + ops_win_pipe::op_pipe_windows_wait, quic::op_quic_connecting_0rtt, quic::op_quic_connecting_1rtt, @@ -169,19 +178,6 @@ mod ops_unix { )) } }; - ($name:ident) => { - #[op2(fast)] - pub fn $name() -> Result<(), std::io::Error> { - let error_msg = format!( - "Operation `{:?}` not supported on non-unix platforms.", - stringify!($name) - ); - Err(std::io::Error::new( - std::io::ErrorKind::Unsupported, - error_msg, - )) - } - }; } stub_op!(op_net_accept_unix); @@ -191,4 +187,39 @@ mod ops_unix { stub_op!(op_node_unstable_net_listen_unixpacket); stub_op!(op_net_recv_unixpacket); stub_op!(op_net_send_unixpacket); + stub_op!(op_net_unix_stream_from_fd); +} + +/// Stub ops for non-windows platforms. +#[cfg(not(windows))] +mod ops_win_pipe { + use deno_core::op2; + + use crate::ops::NetError; + + #[op2(fast)] + #[smi] + pub fn op_pipe_open() -> Result { + Err(NetError::Io(std::io::Error::new( + std::io::ErrorKind::Unsupported, + "Windows named pipes are not supported on this platform", + ))) + } + + #[op2(fast)] + #[smi] + pub fn op_pipe_connect() -> Result { + Err(NetError::Io(std::io::Error::new( + std::io::ErrorKind::Unsupported, + "Windows named pipes are not supported on this platform", + ))) + } + + #[op2(fast)] + pub fn op_pipe_windows_wait() -> Result<(), NetError> { + Err(NetError::Io(std::io::Error::new( + std::io::ErrorKind::Unsupported, + "Windows named pipes are not supported on this platform", + ))) + } } diff --git a/ext/net/ops_unix.rs b/ext/net/ops_unix.rs index ee54567863..ae3762eb31 100644 --- a/ext/net/ops_unix.rs +++ b/ext/net/ops_unix.rs @@ -241,3 +241,50 @@ pub fn op_node_unstable_net_listen_unixpacket( pub fn pathstring(pathname: &Path) -> Result { into_string(pathname.into()) } + +/// Check if fd is a socket using fstat +fn is_socket_fd(fd: i32) -> bool { + // SAFETY: It is safe to zero-initialize a libc::stat struct + let mut stat_buf: libc::stat = unsafe { std::mem::zeroed() }; + // SAFETY: fd is a valid file descriptor, stat_buf is a valid pointer + let result = unsafe { libc::fstat(fd, &mut stat_buf) }; + if result != 0 { + return false; + } + // S_IFSOCK = 0o140000 on most Unix systems + (stat_buf.st_mode & libc::S_IFMT) == libc::S_IFSOCK +} + +#[op2(fast)] +#[smi] +pub fn op_net_unix_stream_from_fd( + state: &mut OpState, + fd: i32, +) -> Result { + use std::os::unix::io::FromRawFd; + + // Validate fd is non-negative + if fd < 0 { + return Err(NetError::Io(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Invalid file descriptor", + ))); + } + + // Check if fd is a socket - if not, we can't use UnixStream + if !is_socket_fd(fd) { + return Err(NetError::Io(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "File descriptor is not a socket", + ))); + } + + // SAFETY: The caller is responsible for passing a valid fd that they own. + // The fd will be owned by the created UnixStream from this point on. + let std_stream = unsafe { std::os::unix::net::UnixStream::from_raw_fd(fd) }; + std_stream.set_nonblocking(true)?; + let unix_stream = UnixStream::from_std(std_stream)?; + let resource = UnixStreamResource::new(unix_stream.into_split()); + let rid = state.resource_table.add(resource); + Ok(rid) +} diff --git a/ext/net/ops_win_pipe.rs b/ext/net/ops_win_pipe.rs new file mode 100644 index 0000000000..ebcdd38729 --- /dev/null +++ b/ext/net/ops_win_pipe.rs @@ -0,0 +1,116 @@ +// Copyright 2018-2025 the Deno authors. MIT license. + +use std::borrow::Cow; +use std::cell::RefCell; +use std::path::Path; +use std::rc::Rc; + +use deno_core::OpState; +use deno_core::ResourceId; +use deno_core::op2; +use deno_permissions::OpenAccessKind; +use deno_permissions::PermissionsContainer; +use tokio::net::windows::named_pipe; + +use crate::ops::NetError; +use crate::win_pipe::NamedPipe; + +#[op2(stack_trace)] +#[smi] +pub fn op_pipe_open( + state: &mut OpState, + #[string] path: String, + #[smi] max_instances: Option, + is_message_mode: bool, + inbound: bool, + outbound: bool, + #[string] api_name: String, +) -> Result { + let permissions = state.borrow_mut::(); + + let path = permissions + .check_open( + Cow::Borrowed(Path::new(&path)), + OpenAccessKind::ReadWriteNoFollow, + Some(&api_name), + ) + .map_err(NetError::Permission)?; + + let pipe_mode = if is_message_mode { + named_pipe::PipeMode::Message + } else { + named_pipe::PipeMode::Byte + }; + + let mut opts = named_pipe::ServerOptions::new(); + opts + .pipe_mode(pipe_mode) + .access_inbound(inbound) + .access_outbound(outbound); + if let Some(max_instances) = max_instances { + opts.max_instances(max_instances as usize); + } + let pipe = NamedPipe::new_server(AsRef::::as_ref(&path), &opts)?; + let rid = state.resource_table.add(pipe); + Ok(rid) +} + +#[op2(async, stack_trace)] +pub async fn op_pipe_windows_wait( + state: Rc>, + #[smi] rid: ResourceId, +) -> Result<(), NetError> { + let pipe = state.borrow().resource_table.get::(rid)?; + pipe.connect().await?; + Ok(()) +} + +#[op2(fast)] +#[smi] +pub fn op_pipe_connect( + state: &mut OpState, + #[string] path: String, + read: bool, + write: bool, + #[string] api_name: &str, +) -> Result { + let permissions = state.borrow_mut::(); + + let checked_path = permissions + .check_open( + Cow::Borrowed(Path::new(&path)), + OpenAccessKind::ReadWriteNoFollow, + Some(api_name), + ) + .map_err(NetError::Permission)?; + + // Check if this looks like a named pipe path + // Windows named pipes must start with \\.\pipe\ or \\?\pipe\ + let is_named_pipe = path.starts_with("\\\\.\\pipe\\") + || path.starts_with("\\\\?\\pipe\\") + || path.starts_with("//./pipe/") + || path.starts_with("//?/pipe/"); + + if !is_named_pipe { + // For non-pipe paths, check if the path exists as a file + // If it does, return ENOTSOCK (not a socket) + // If it doesn't exist, return ENOENT + let path = Path::new(&path); + if path.exists() { + return Err(NetError::Io(std::io::Error::other( + "ENOTSOCK: not a socket", + ))); + } else { + return Err(NetError::Io(std::io::Error::other( + "ENOENT: no such file or directory", + ))); + } + } + + let mut opts = named_pipe::ClientOptions::new(); + opts.read(read).write(write); + let pipe = + NamedPipe::new_client(AsRef::::as_ref(&checked_path), &opts)?; + let rid = state.resource_table.add(pipe); + Ok(rid) +} diff --git a/ext/net/win_pipe.rs b/ext/net/win_pipe.rs new file mode 100644 index 0000000000..3dafe480ed --- /dev/null +++ b/ext/net/win_pipe.rs @@ -0,0 +1,136 @@ +// Copyright 2018-2025 the Deno authors. MIT license. + +use std::borrow::Cow; +use std::ffi::OsStr; +use std::io; +use std::rc::Rc; + +use deno_core::AsyncRefCell; +use deno_core::AsyncResult; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::RcRef; +use deno_core::Resource; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; +use tokio::io::ReadHalf; +use tokio::io::WriteHalf; +use tokio::net::windows::named_pipe; + +/// A Windows named pipe resource that supports concurrent read and write. +/// This is achieved by splitting the pipe into separate read and write halves, +/// each with its own async lock, allowing duplex communication. +pub struct NamedPipe { + read_half: AsyncRefCell, + write_half: AsyncRefCell, + cancel: CancelHandle, + /// Server pipe waiting for connection (before split) + pending_server: AsyncRefCell>, +} + +enum NamedPipeRead { + Server(ReadHalf), + Client(ReadHalf), + None, +} + +enum NamedPipeWrite { + Server(WriteHalf), + Client(WriteHalf), + None, +} + +impl NamedPipe { + pub fn new_server( + addr: impl AsRef, + options: &named_pipe::ServerOptions, + ) -> io::Result { + let server = options.create(addr)?; + // Server starts in pending state - will be split after connect() + Ok(NamedPipe { + read_half: AsyncRefCell::new(NamedPipeRead::None), + write_half: AsyncRefCell::new(NamedPipeWrite::None), + cancel: Default::default(), + pending_server: AsyncRefCell::new(Some(server)), + }) + } + + pub fn new_client( + addr: impl AsRef, + options: &named_pipe::ClientOptions, + ) -> io::Result { + let client = options.open(addr)?; + // Client is immediately connected, split into read/write halves + let (read, write) = tokio::io::split(client); + Ok(NamedPipe { + read_half: AsyncRefCell::new(NamedPipeRead::Client(read)), + write_half: AsyncRefCell::new(NamedPipeWrite::Client(write)), + cancel: Default::default(), + pending_server: AsyncRefCell::new(None), + }) + } + + pub async fn connect(self: Rc) -> io::Result<()> { + let mut pending = + RcRef::map(&self, |s| &s.pending_server).borrow_mut().await; + let cancel = RcRef::map(&self, |s| &s.cancel); + + if let Some(server) = pending.take() { + // Wait for client to connect + server.connect().try_or_cancel(cancel).await?; + + // Now split the connected server into read/write halves + let (read, write) = tokio::io::split(server); + + let mut read_half = + RcRef::map(&self, |s| &s.read_half).borrow_mut().await; + let mut write_half = + RcRef::map(&self, |s| &s.write_half).borrow_mut().await; + + *read_half = NamedPipeRead::Server(read); + *write_half = NamedPipeWrite::Server(write); + } + // Client is already connected, nothing to do + Ok(()) + } + + pub async fn write(self: Rc, buf: &[u8]) -> io::Result { + let mut write_half = + RcRef::map(&self, |s| &s.write_half).borrow_mut().await; + let cancel = RcRef::map(&self, |s| &s.cancel); + match &mut *write_half { + NamedPipeWrite::Server(w) => w.write(buf).try_or_cancel(cancel).await, + NamedPipeWrite::Client(w) => w.write(buf).try_or_cancel(cancel).await, + NamedPipeWrite::None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "pipe not connected", + )), + } + } + + pub async fn read(self: Rc, buf: &mut [u8]) -> io::Result { + let mut read_half = RcRef::map(&self, |s| &s.read_half).borrow_mut().await; + let cancel = RcRef::map(&self, |s| &s.cancel); + match &mut *read_half { + NamedPipeRead::Server(r) => r.read(buf).try_or_cancel(cancel).await, + NamedPipeRead::Client(r) => r.read(buf).try_or_cancel(cancel).await, + NamedPipeRead::None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "pipe not connected", + )), + } + } +} + +impl Resource for NamedPipe { + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + + fn name(&self) -> Cow<'_, str> { + Cow::Borrowed("namedPipe") + } + + fn close(self: Rc) { + self.cancel.cancel(); + } +} diff --git a/ext/node/lib.rs b/ext/node/lib.rs index 81111f03c7..7d494c88ef 100644 --- a/ext/node/lib.rs +++ b/ext/node/lib.rs @@ -281,6 +281,7 @@ deno_core::extension!(deno_node, ops::fs::op_node_open, ops::fs::op_node_statfs_sync, ops::fs::op_node_statfs, + ops::fs::op_node_file_from_fd, ops::winerror::op_node_sys_to_uv_error, ops::v8::op_v8_cached_data_version_tag, ops::v8::op_v8_get_heap_statistics, diff --git a/ext/node/ops/fs.rs b/ext/node/ops/fs.rs index dc16f7b885..19a479e4c5 100644 --- a/ext/node/ops/fs.rs +++ b/ext/node/ops/fs.rs @@ -530,3 +530,47 @@ fn temp_path_append_suffix(prefix: &str) -> String { (0..6).map(|_| OsRng.sample(Alphanumeric) as char).collect(); format!("{}{}", prefix, suffix) } + +/// Create a file resource from a raw file descriptor. +/// This is used for wrapping PTYs and other non-socket file descriptors +/// that can't be wrapped as Unix streams. +#[cfg(unix)] +#[op2(fast)] +#[smi] +pub fn op_node_file_from_fd( + state: &mut OpState, + fd: i32, +) -> Result { + use std::fs::File as StdFile; + use std::os::unix::io::FromRawFd; + + if fd < 0 { + return Err(FsError::Io(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Invalid file descriptor", + ))); + } + + // SAFETY: The caller is responsible for passing a valid fd that they own. + // The fd will be owned by the created File from this point on. + let std_file = unsafe { StdFile::from_raw_fd(fd) }; + + let file = Rc::new(deno_io::StdFileResourceInner::file(std_file, None)); + let rid = state + .resource_table + .add(FileResource::new(file, "pipe".to_string())); + Ok(rid) +} + +#[cfg(not(unix))] +#[op2(fast)] +#[smi] +pub fn op_node_file_from_fd( + _state: &mut OpState, + _fd: i32, +) -> Result { + Err(FsError::Io(std::io::Error::new( + std::io::ErrorKind::Unsupported, + "op_node_file_from_fd is not supported on this platform", + ))) +} diff --git a/ext/node/polyfills/internal_binding/pipe_wrap.ts b/ext/node/polyfills/internal_binding/pipe_wrap.ts index d39128e993..dede8e8781 100644 --- a/ext/node/polyfills/internal_binding/pipe_wrap.ts +++ b/ext/node/polyfills/internal_binding/pipe_wrap.ts @@ -24,6 +24,17 @@ // - https://github.com/nodejs/node/blob/master/src/pipe_wrap.cc // - https://github.com/nodejs/node/blob/master/src/pipe_wrap.h +import { core, primordials } from "ext:core/mod.js"; +import { + op_net_unix_stream_from_fd, + op_node_file_from_fd, + op_pipe_connect, + op_pipe_open, + op_pipe_windows_wait, +} from "ext:core/ops"; +import { PipeConn, UnixConn } from "ext:deno_net/01_net.js"; + +const { internalRidSymbol } = core; import { notImplemented } from "ext:deno_node/_utils.ts"; import { unreachable } from "ext:deno_node/_util/asserts.ts"; import { ConnectionWrap } from "ext:deno_node/internal_binding/connection_wrap.ts"; @@ -32,12 +43,12 @@ import { providerType, } from "ext:deno_node/internal_binding/async_wrap.ts"; import { LibuvStreamWrap } from "ext:deno_node/internal_binding/stream_wrap.ts"; -import { codeMap } from "ext:deno_node/internal_binding/uv.ts"; -import { delay } from "ext:deno_node/_util/async.ts"; import { - kStreamBaseField, - StreamBase, -} from "ext:deno_node/internal_binding/stream_wrap.ts"; + codeMap, + mapSysErrnoToUvErrno, +} from "ext:deno_node/internal_binding/uv.ts"; +import { delay } from "ext:deno_node/_util/async.ts"; +import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts"; import { ceilPowOf2, INITIAL_ACCEPT_BACKOFF_DELAY, @@ -45,14 +56,17 @@ import { } from "ext:deno_node/internal_binding/_listen.ts"; import { isWindows } from "ext:deno_node/_util/os.ts"; import { fs } from "ext:deno_node/internal_binding/constants.ts"; -import { primordials } from "ext:core/mod.js"; const { + ErrorPrototype, FunctionPrototypeCall, MapPrototypeGet, + ObjectDefineProperty, ObjectPrototypeIsPrototypeOf, PromisePrototypeThen, ReflectHas, + StringPrototypeIncludes, + queueMicrotask, } = primordials; export enum socketType { @@ -61,6 +75,56 @@ export enum socketType { IPC, } +/** + * A wrapper for file-based streams (PTYs, pipes, etc.) that provides + * the interface expected by LibuvStreamWrap. + */ +class FileStreamConn { + #rid: number; + #closed = false; + + constructor(rid: number) { + this.#rid = rid; + ObjectDefineProperty(this, internalRidSymbol, { + __proto__: null, + enumerable: false, + value: rid, + }); + } + + async read(buf: Uint8Array): Promise { + // Loop to handle EAGAIN/EWOULDBLOCK for non-blocking fds (PTYs, pipes) + while (!this.#closed) { + try { + const nread = await core.read(this.#rid, buf); + return nread === 0 ? null : nread; + } catch (e) { + // Handle EAGAIN/EWOULDBLOCK by waiting and retrying + if ( + ObjectPrototypeIsPrototypeOf(ErrorPrototype, e) && + ((e as Error).name === "WouldBlock" || + (e as { code?: string }).code === "EAGAIN") + ) { + // Wait a bit before retrying to avoid busy-looping + await delay(10); + continue; + } + throw e; + } + } + return null; + } + + async write(data: Uint8Array): Promise { + return await core.write(this.#rid, data); + } + + close(): void { + this.#closed = true; + core.tryClose(this.#rid); + } +} + export class Pipe extends ConnectionWrap { override reading = false; ipc: boolean; @@ -76,6 +140,7 @@ export class Pipe extends ConnectionWrap { #closed = false; #acceptBackoffDelay?: number; + #serverPipeRid?: number; constructor(type: number, conn?: Deno.UnixConn | StreamBase) { let provider: providerType; @@ -118,9 +183,47 @@ export class Pipe extends ConnectionWrap { } } - open(_fd: number): number { - // REF: https://github.com/denoland/deno/issues/6529 - notImplemented("Pipe.prototype.open"); + open(fd: number): number { + if (isWindows) { + // Windows named pipes don't support opening from fd + notImplemented("Pipe.prototype.open on Windows"); + } + try { + // First, try to open as a Unix socket (for actual Unix domain sockets) + const rid = op_net_unix_stream_from_fd(fd); + this[kStreamBaseField] = new UnixConn(rid, null, null); + return 0; + } catch (e) { + // If the fd is not a socket (e.g., PTY, pipe), fall back to file-based I/O + if ( + ObjectPrototypeIsPrototypeOf(ErrorPrototype, e) && + (StringPrototypeIncludes((e as Error).message, "not a socket") || + StringPrototypeIncludes((e as Error).message, "ENOTSOCK")) + ) { + try { + const rid = op_node_file_from_fd(fd); + this[kStreamBaseField] = new FileStreamConn(rid); + return 0; + } catch (e2) { + if ( + ObjectPrototypeIsPrototypeOf(ErrorPrototype, e2) && + ReflectHas(e2 as Error, "code") + ) { + return codeMap.get((e2 as { code: string }).code) ?? + codeMap.get("UNKNOWN")!; + } + return codeMap.get("UNKNOWN")!; + } + } + if ( + ObjectPrototypeIsPrototypeOf(ErrorPrototype, e) && + ReflectHas(e as Error, "code") + ) { + return codeMap.get((e as { code: string }).code) ?? + codeMap.get("UNKNOWN")!; + } + return codeMap.get("UNKNOWN")!; + } } /** @@ -146,8 +249,73 @@ export class Pipe extends ConnectionWrap { */ connect(req: PipeConnectWrap, address: string) { if (isWindows) { - // REF: https://github.com/denoland/deno/issues/10244 - notImplemented("Pipe.prototype.connect - Windows"); + // On Windows, use the named pipe API + try { + const rid = op_pipe_connect( + address, + true, + true, + "net.createConnection()", + ); + this[kStreamBaseField] = new PipeConn(rid); + this.#address = req.address = address; + + // Use queueMicrotask to match async behavior + queueMicrotask(() => { + try { + this.afterConnect(req, 0); + } catch { + // swallow callback errors. + } + }); + } catch (e: unknown) { + // Handle Windows named pipe errors + // Map the error to UV error codes + let code; + const err = e as { + code?: string; + message?: string; + rawOsError?: number; + cause?: { rawOsError?: number }; + }; + if (err.code !== undefined) { + code = MapPrototypeGet(codeMap, err.code) ?? + MapPrototypeGet(codeMap, "UNKNOWN")!; + } else { + // Check error message for known patterns + const msg = err.message ?? ""; + if (StringPrototypeIncludes(msg, "ENOTSOCK")) { + code = MapPrototypeGet(codeMap, "ENOTSOCK")!; + } else if ( + StringPrototypeIncludes(msg, "ENOENT") || + StringPrototypeIncludes(msg, "NotFound") + ) { + code = MapPrototypeGet(codeMap, "ENOENT")!; + } else { + // Try to extract Windows error codes from the error + // Windows error 2 = ERROR_FILE_NOT_FOUND -> ENOENT + // Windows error 3 = ERROR_PATH_NOT_FOUND -> ENOENT + // Windows error 231 = ERROR_PIPE_BUSY -> EAGAIN + // Windows error 232 = ERROR_NO_DATA -> EPIPE + const rawOsError = err.rawOsError ?? err.cause?.rawOsError; + if (rawOsError !== undefined) { + code = mapSysErrnoToUvErrno(rawOsError); + } else { + code = MapPrototypeGet(codeMap, "UNKNOWN")!; + } + } + } + + queueMicrotask(() => { + try { + this.afterConnect(req, code); + } catch { + // swallow callback errors. + } + }); + } + + return 0; } const connectOptions: Deno.UnixConnectOptions = { @@ -190,15 +358,34 @@ export class Pipe extends ConnectionWrap { * @return An error status code. */ listen(backlog: number): number { - if (isWindows) { - // REF: https://github.com/denoland/deno/issues/10244 - notImplemented("Pipe.prototype.listen - Windows"); - } - this.#backlog = isWindows ? this.#pendingInstances : ceilPowOf2(backlog + 1); + if (isWindows) { + try { + const rid = op_pipe_open( + this.#address!, + this.#pendingInstances, + false, + true, + true, + "net.Server.listen()", + ); + + this.#serverPipeRid = rid; + this.#acceptWindows(); + + return 0; + } catch (e) { + if (ObjectPrototypeIsPrototypeOf(Deno.errors.NotCapable.prototype, e)) { + throw e; + } + return MapPrototypeGet(codeMap, e.code ?? "UNKNOWN") ?? + MapPrototypeGet(codeMap, "UNKNOWN")!; + } + } + const listenOptions = { path: this.#address!, transport: "unix" as const, @@ -284,8 +471,8 @@ export class Pipe extends ConnectionWrap { return 0; } - /** Handle backoff delays following an unsuccessful accept. */ - async #acceptBackoff() { + /** Calculate and apply backoff delay following an unsuccessful accept. */ + async #acceptBackoff(): Promise { // Backoff after transient errors to allow time for the system to // recover, and avoid blocking up the event loop with a continuously // running loop. @@ -300,60 +487,101 @@ export class Pipe extends ConnectionWrap { } await delay(this.#acceptBackoffDelay); + } - this.#accept(); + /** Accept new connections on Windows named pipes. */ + async #acceptWindows(): Promise { + while (!this.#closed) { + try { + // Wait for a client to connect + await op_pipe_windows_wait(this.#serverPipeRid!); + + // Connection established, wrap it + const connectionHandle = new Pipe(socketType.SOCKET); + connectionHandle[kStreamBaseField] = new PipeConn(this.#serverPipeRid!); + + this.#connections++; + + try { + this.onconnection!(0, connectionHandle); + } catch { + // swallow callback errors. + } + + // Reset the backoff delay upon successful accept. + this.#acceptBackoffDelay = undefined; + + // Create a new server pipe for the next connection + const newRid = op_pipe_open( + this.#address!, + this.#pendingInstances, + false, + true, + true, + "net.Server.listen()", + ); + + this.#serverPipeRid = newRid; + } catch { + if (this.#closed) { + return; + } + + try { + this.onconnection!(MapPrototypeGet(codeMap, "UNKNOWN")!, undefined); + } catch { + // swallow callback errors. + } + + await delay(this.#acceptBackoffDelay || INITIAL_ACCEPT_BACKOFF_DELAY); + } + } } /** Accept new connections. */ async #accept(): Promise { - if (this.#closed) { - return; - } - - if (this.#connections > this.#backlog!) { - this.#acceptBackoff(); - - return; - } - - let connection: Deno.Conn; - - try { - connection = await this.#listener.accept(); - } catch (e) { - if ( - ObjectPrototypeIsPrototypeOf(Deno.errors.BadResource.prototype, e) && - this.#closed - ) { - // Listener and server has closed. - return; + while (!this.#closed) { + if (this.#connections > this.#backlog!) { + await this.#acceptBackoff(); + continue; } + let connection: Deno.Conn; + try { - // TODO(cmorten): map errors to appropriate error codes. - this.onconnection!(MapPrototypeGet(codeMap, "UNKNOWN")!, undefined); + connection = await this.#listener.accept(); + } catch (e) { + if ( + ObjectPrototypeIsPrototypeOf(Deno.errors.BadResource.prototype, e) && + this.#closed + ) { + // Listener and server has closed. + return; + } + + try { + // TODO(cmorten): map errors to appropriate error codes. + this.onconnection!(MapPrototypeGet(codeMap, "UNKNOWN")!, undefined); + } catch { + // swallow callback errors. + } + + await this.#acceptBackoff(); + continue; + } + + // Reset the backoff delay upon successful accept. + this.#acceptBackoffDelay = undefined; + + const connectionHandle = new Pipe(socketType.SOCKET, connection); + this.#connections++; + + try { + this.onconnection!(0, connectionHandle); } catch { // swallow callback errors. } - - this.#acceptBackoff(); - - return; } - - // Reset the backoff delay upon successful accept. - this.#acceptBackoffDelay = undefined; - - const connectionHandle = new Pipe(socketType.SOCKET, connection); - this.#connections++; - - try { - this.onconnection!(0, connectionHandle); - } catch { - // swallow callback errors. - } - - return this.#accept(); } /** Handle server closure. */ @@ -368,6 +596,11 @@ export class Pipe extends ConnectionWrap { this.#acceptBackoffDelay = undefined; if (this.provider === providerType.PIPESERVERWRAP) { + if (this.#serverPipeRid !== undefined) { + core.tryClose(this.#serverPipeRid); + this.#serverPipeRid = undefined; + } + try { this.#listener.close(); } catch { diff --git a/tests/node_compat/config.toml b/tests/node_compat/config.toml index c6e7cf5826..89d811d183 100644 --- a/tests/node_compat/config.toml +++ b/tests/node_compat/config.toml @@ -808,6 +808,7 @@ "parallel/test-net-persistent-keepalive.js" = {} "parallel/test-net-persistent-nodelay.js" = {} "parallel/test-net-persistent-ref-unref.js" = {} +"parallel/test-net-pipe-connect-errors.js" = {} "parallel/test-net-pipe-with-long-path.js" = {} "parallel/test-net-reconnect.js" = {} "parallel/test-net-remote-address-port.js" = {} diff --git a/tests/specs/node/pipe_open_fd/__test__.jsonc b/tests/specs/node/pipe_open_fd/__test__.jsonc new file mode 100644 index 0000000000..b70cd0e008 --- /dev/null +++ b/tests/specs/node/pipe_open_fd/__test__.jsonc @@ -0,0 +1,15 @@ +{ + "tempDir": true, + "tests": { + "invalid_fd": { + "if": "unix", + "args": "run -A invalid_fd.ts", + "output": "invalid_fd.out" + }, + "open_fd": { + "if": "unix", + "args": "run -A --unstable-ffi open_fd.ts", + "output": "open_fd.out" + } + } +} diff --git a/tests/specs/node/pipe_open_fd/invalid_fd.out b/tests/specs/node/pipe_open_fd/invalid_fd.out new file mode 100644 index 0000000000..495fc0c006 --- /dev/null +++ b/tests/specs/node/pipe_open_fd/invalid_fd.out @@ -0,0 +1,2 @@ +open(-1) returned: -22 +PASS: Invalid fd returns error code diff --git a/tests/specs/node/pipe_open_fd/invalid_fd.ts b/tests/specs/node/pipe_open_fd/invalid_fd.ts new file mode 100644 index 0000000000..30e0ddc936 --- /dev/null +++ b/tests/specs/node/pipe_open_fd/invalid_fd.ts @@ -0,0 +1,18 @@ +// Test Pipe.prototype.open() with invalid fd +import { createRequire } from "node:module"; + +const require = createRequire(import.meta.url); +const { Pipe, constants: PipeConstants } = require("internal/test/binding") + .internalBinding("pipe_wrap"); + +const pipe = new Pipe(PipeConstants.SOCKET); +const result = pipe.open(-1); + +// Should return a non-zero error code for invalid fd +console.log(`open(-1) returned: ${result}`); +if (result !== 0) { + console.log("PASS: Invalid fd returns error code"); +} else { + console.log("FAIL: Invalid fd should return error code"); + Deno.exit(1); +} diff --git a/tests/specs/node/pipe_open_fd/open_fd.out b/tests/specs/node/pipe_open_fd/open_fd.out new file mode 100644 index 0000000000..b39211c1f1 --- /dev/null +++ b/tests/specs/node/pipe_open_fd/open_fd.out @@ -0,0 +1,3 @@ +Created socketpair: fd [WILDCARD] +Pipe.open([WILDCARD]) returned: 0 +PASS: Pipe.open() succeeded with valid fd diff --git a/tests/specs/node/pipe_open_fd/open_fd.ts b/tests/specs/node/pipe_open_fd/open_fd.ts new file mode 100644 index 0000000000..d406e49ec5 --- /dev/null +++ b/tests/specs/node/pipe_open_fd/open_fd.ts @@ -0,0 +1,45 @@ +// Test Pipe.prototype.open(fd) happy path using socketpair via FFI +import { createRequire } from "node:module"; + +const require = createRequire(import.meta.url); +const { Pipe, constants: PipeConstants } = require("internal/test/binding") + .internalBinding("pipe_wrap"); + +// Use FFI to create a socketpair +const libName = Deno.build.os === "darwin" ? "libSystem.B.dylib" : "libc.so.6"; +const libc = Deno.dlopen(libName, { + socketpair: { parameters: ["i32", "i32", "i32", "buffer"], result: "i32" }, + close: { parameters: ["i32"], result: "i32" }, +}); + +const AF_UNIX = 1; +const SOCK_STREAM = 1; +const fds = new Int32Array(2); + +const result = libc.symbols.socketpair(AF_UNIX, SOCK_STREAM, 0, fds); +if (result !== 0) { + console.log("FAIL: socketpair failed"); + Deno.exit(1); +} + +const fd0 = fds[0]; +const fd1 = fds[1]; +console.log(`Created socketpair: fd ${fd0} <-> fd ${fd1}`); + +// Test Pipe.prototype.open() with a valid fd +const pipe = new Pipe(PipeConstants.SOCKET); +const openResult = pipe.open(fd0); +console.log(`Pipe.open(${fd0}) returned: ${openResult}`); + +if (openResult === 0) { + console.log("PASS: Pipe.open() succeeded with valid fd"); +} else { + console.log("FAIL: Pipe.open() should return 0 for valid fd"); + libc.symbols.close(fd1); + libc.close(); + Deno.exit(1); +} + +// Clean up the other fd (fd0 is now owned by the Pipe) +libc.symbols.close(fd1); +libc.close();