feat: support named pipe listen, connect and open (#31624)

Co-authored-by:  Cyan Changes <contact@cyans.me>

Add support to Pipe.prototype.listen, Pipe.prototype.connect for windows
with named pipes.
Also support to Pipe.prototype.open for unix.

Alternative of https://github.com/denoland/deno/pull/29308, but without
`Deno:pipe`.
Only with node compatibilities.


Solution for:
https://github.com/denoland/deno/issues/25867
https://github.com/denoland/deno/issues/28332
https://github.com/denoland/deno/issues/31032


Maybe related:
https://github.com/denoland/deno/issues/10244


I tested the Nuxt and Nx, they are working fine on Windows now.
I also tested node-pty, and it's working on Unix.

Co-authored-by: Cyan Changes <contact@cyans.me>
This commit is contained in:
Felipe Cardozo 2025-12-19 05:18:48 -03:00 committed by GitHub
parent e76aa96376
commit dd47e25a30
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 781 additions and 73 deletions

1
Cargo.lock generated
View file

@ -2406,6 +2406,7 @@ dependencies = [
"deno_tunnel",
"hickory-proto",
"hickory-resolver",
"libc",
"log",
"pin-project",
"quinn",

View file

@ -265,6 +265,17 @@ class VsockConn extends Conn {
}
}
class PipeConn extends Conn {
constructor(rid) {
super(rid, null, null);
ObjectDefineProperty(this, internalRidSymbol, {
__proto__: null,
enumerable: false,
value: rid,
});
}
}
class TunnelConn extends Conn {
constructor(rid, remoteAddr, localAddr) {
super(rid, remoteAddr, localAddr);
@ -749,6 +760,7 @@ export {
listen,
Listener,
listenOptionApiName,
PipeConn,
resolveDns,
setDatagramBroadcast,
setMulticastLoopback,

View file

@ -35,6 +35,9 @@ tokio.workspace = true
url.workspace = true
web-transport-proto.workspace = true
[target.'cfg(unix)'.dependencies]
libc.workspace = true
[target.'cfg(any(target_os = "android", target_os = "linux", target_os = "macos"))'.dependencies]
tokio-vsock.workspace = true

View file

@ -5,11 +5,15 @@ pub mod ops;
pub mod ops_tls;
#[cfg(unix)]
pub mod ops_unix;
#[cfg(windows)]
mod ops_win_pipe;
mod quic;
pub mod raw;
pub mod resolve_addr;
pub mod tcp;
pub mod tunnel;
#[cfg(windows)]
mod win_pipe;
use std::sync::Arc;
@ -97,6 +101,11 @@ deno_core::extension!(deno_net,
ops_unix::op_node_unstable_net_listen_unixpacket,
ops_unix::op_net_recv_unixpacket,
ops_unix::op_net_send_unixpacket,
ops_unix::op_net_unix_stream_from_fd,
ops_win_pipe::op_pipe_open,
ops_win_pipe::op_pipe_connect,
ops_win_pipe::op_pipe_windows_wait,
quic::op_quic_connecting_0rtt,
quic::op_quic_connecting_1rtt,
@ -169,19 +178,6 @@ mod ops_unix {
))
}
};
($name:ident) => {
#[op2(fast)]
pub fn $name() -> Result<(), std::io::Error> {
let error_msg = format!(
"Operation `{:?}` not supported on non-unix platforms.",
stringify!($name)
);
Err(std::io::Error::new(
std::io::ErrorKind::Unsupported,
error_msg,
))
}
};
}
stub_op!(op_net_accept_unix);
@ -191,4 +187,39 @@ mod ops_unix {
stub_op!(op_node_unstable_net_listen_unixpacket);
stub_op!(op_net_recv_unixpacket);
stub_op!(op_net_send_unixpacket);
stub_op!(op_net_unix_stream_from_fd);
}
/// Stub ops for non-windows platforms.
#[cfg(not(windows))]
mod ops_win_pipe {
use deno_core::op2;
use crate::ops::NetError;
#[op2(fast)]
#[smi]
pub fn op_pipe_open() -> Result<u32, NetError> {
Err(NetError::Io(std::io::Error::new(
std::io::ErrorKind::Unsupported,
"Windows named pipes are not supported on this platform",
)))
}
#[op2(fast)]
#[smi]
pub fn op_pipe_connect() -> Result<u32, NetError> {
Err(NetError::Io(std::io::Error::new(
std::io::ErrorKind::Unsupported,
"Windows named pipes are not supported on this platform",
)))
}
#[op2(fast)]
pub fn op_pipe_windows_wait() -> Result<(), NetError> {
Err(NetError::Io(std::io::Error::new(
std::io::ErrorKind::Unsupported,
"Windows named pipes are not supported on this platform",
)))
}
}

View file

@ -241,3 +241,50 @@ pub fn op_node_unstable_net_listen_unixpacket(
pub fn pathstring(pathname: &Path) -> Result<String, NetError> {
into_string(pathname.into())
}
/// Check if fd is a socket using fstat
fn is_socket_fd(fd: i32) -> bool {
// SAFETY: It is safe to zero-initialize a libc::stat struct
let mut stat_buf: libc::stat = unsafe { std::mem::zeroed() };
// SAFETY: fd is a valid file descriptor, stat_buf is a valid pointer
let result = unsafe { libc::fstat(fd, &mut stat_buf) };
if result != 0 {
return false;
}
// S_IFSOCK = 0o140000 on most Unix systems
(stat_buf.st_mode & libc::S_IFMT) == libc::S_IFSOCK
}
#[op2(fast)]
#[smi]
pub fn op_net_unix_stream_from_fd(
state: &mut OpState,
fd: i32,
) -> Result<ResourceId, NetError> {
use std::os::unix::io::FromRawFd;
// Validate fd is non-negative
if fd < 0 {
return Err(NetError::Io(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Invalid file descriptor",
)));
}
// Check if fd is a socket - if not, we can't use UnixStream
if !is_socket_fd(fd) {
return Err(NetError::Io(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"File descriptor is not a socket",
)));
}
// SAFETY: The caller is responsible for passing a valid fd that they own.
// The fd will be owned by the created UnixStream from this point on.
let std_stream = unsafe { std::os::unix::net::UnixStream::from_raw_fd(fd) };
std_stream.set_nonblocking(true)?;
let unix_stream = UnixStream::from_std(std_stream)?;
let resource = UnixStreamResource::new(unix_stream.into_split());
let rid = state.resource_table.add(resource);
Ok(rid)
}

116
ext/net/ops_win_pipe.rs Normal file
View file

@ -0,0 +1,116 @@
// Copyright 2018-2025 the Deno authors. MIT license.
use std::borrow::Cow;
use std::cell::RefCell;
use std::path::Path;
use std::rc::Rc;
use deno_core::OpState;
use deno_core::ResourceId;
use deno_core::op2;
use deno_permissions::OpenAccessKind;
use deno_permissions::PermissionsContainer;
use tokio::net::windows::named_pipe;
use crate::ops::NetError;
use crate::win_pipe::NamedPipe;
#[op2(stack_trace)]
#[smi]
pub fn op_pipe_open(
state: &mut OpState,
#[string] path: String,
#[smi] max_instances: Option<u32>,
is_message_mode: bool,
inbound: bool,
outbound: bool,
#[string] api_name: String,
) -> Result<ResourceId, NetError> {
let permissions = state.borrow_mut::<PermissionsContainer>();
let path = permissions
.check_open(
Cow::Borrowed(Path::new(&path)),
OpenAccessKind::ReadWriteNoFollow,
Some(&api_name),
)
.map_err(NetError::Permission)?;
let pipe_mode = if is_message_mode {
named_pipe::PipeMode::Message
} else {
named_pipe::PipeMode::Byte
};
let mut opts = named_pipe::ServerOptions::new();
opts
.pipe_mode(pipe_mode)
.access_inbound(inbound)
.access_outbound(outbound);
if let Some(max_instances) = max_instances {
opts.max_instances(max_instances as usize);
}
let pipe = NamedPipe::new_server(AsRef::<Path>::as_ref(&path), &opts)?;
let rid = state.resource_table.add(pipe);
Ok(rid)
}
#[op2(async, stack_trace)]
pub async fn op_pipe_windows_wait(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Result<(), NetError> {
let pipe = state.borrow().resource_table.get::<NamedPipe>(rid)?;
pipe.connect().await?;
Ok(())
}
#[op2(fast)]
#[smi]
pub fn op_pipe_connect(
state: &mut OpState,
#[string] path: String,
read: bool,
write: bool,
#[string] api_name: &str,
) -> Result<ResourceId, NetError> {
let permissions = state.borrow_mut::<PermissionsContainer>();
let checked_path = permissions
.check_open(
Cow::Borrowed(Path::new(&path)),
OpenAccessKind::ReadWriteNoFollow,
Some(api_name),
)
.map_err(NetError::Permission)?;
// Check if this looks like a named pipe path
// Windows named pipes must start with \\.\pipe\ or \\?\pipe\
let is_named_pipe = path.starts_with("\\\\.\\pipe\\")
|| path.starts_with("\\\\?\\pipe\\")
|| path.starts_with("//./pipe/")
|| path.starts_with("//?/pipe/");
if !is_named_pipe {
// For non-pipe paths, check if the path exists as a file
// If it does, return ENOTSOCK (not a socket)
// If it doesn't exist, return ENOENT
let path = Path::new(&path);
if path.exists() {
return Err(NetError::Io(std::io::Error::other(
"ENOTSOCK: not a socket",
)));
} else {
return Err(NetError::Io(std::io::Error::other(
"ENOENT: no such file or directory",
)));
}
}
let mut opts = named_pipe::ClientOptions::new();
opts.read(read).write(write);
let pipe =
NamedPipe::new_client(AsRef::<Path>::as_ref(&checked_path), &opts)?;
let rid = state.resource_table.add(pipe);
Ok(rid)
}

136
ext/net/win_pipe.rs Normal file
View file

@ -0,0 +1,136 @@
// Copyright 2018-2025 the Deno authors. MIT license.
use std::borrow::Cow;
use std::ffi::OsStr;
use std::io;
use std::rc::Rc;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::RcRef;
use deno_core::Resource;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::ReadHalf;
use tokio::io::WriteHalf;
use tokio::net::windows::named_pipe;
/// A Windows named pipe resource that supports concurrent read and write.
/// This is achieved by splitting the pipe into separate read and write halves,
/// each with its own async lock, allowing duplex communication.
pub struct NamedPipe {
read_half: AsyncRefCell<NamedPipeRead>,
write_half: AsyncRefCell<NamedPipeWrite>,
cancel: CancelHandle,
/// Server pipe waiting for connection (before split)
pending_server: AsyncRefCell<Option<named_pipe::NamedPipeServer>>,
}
enum NamedPipeRead {
Server(ReadHalf<named_pipe::NamedPipeServer>),
Client(ReadHalf<named_pipe::NamedPipeClient>),
None,
}
enum NamedPipeWrite {
Server(WriteHalf<named_pipe::NamedPipeServer>),
Client(WriteHalf<named_pipe::NamedPipeClient>),
None,
}
impl NamedPipe {
pub fn new_server(
addr: impl AsRef<OsStr>,
options: &named_pipe::ServerOptions,
) -> io::Result<NamedPipe> {
let server = options.create(addr)?;
// Server starts in pending state - will be split after connect()
Ok(NamedPipe {
read_half: AsyncRefCell::new(NamedPipeRead::None),
write_half: AsyncRefCell::new(NamedPipeWrite::None),
cancel: Default::default(),
pending_server: AsyncRefCell::new(Some(server)),
})
}
pub fn new_client(
addr: impl AsRef<OsStr>,
options: &named_pipe::ClientOptions,
) -> io::Result<NamedPipe> {
let client = options.open(addr)?;
// Client is immediately connected, split into read/write halves
let (read, write) = tokio::io::split(client);
Ok(NamedPipe {
read_half: AsyncRefCell::new(NamedPipeRead::Client(read)),
write_half: AsyncRefCell::new(NamedPipeWrite::Client(write)),
cancel: Default::default(),
pending_server: AsyncRefCell::new(None),
})
}
pub async fn connect(self: Rc<Self>) -> io::Result<()> {
let mut pending =
RcRef::map(&self, |s| &s.pending_server).borrow_mut().await;
let cancel = RcRef::map(&self, |s| &s.cancel);
if let Some(server) = pending.take() {
// Wait for client to connect
server.connect().try_or_cancel(cancel).await?;
// Now split the connected server into read/write halves
let (read, write) = tokio::io::split(server);
let mut read_half =
RcRef::map(&self, |s| &s.read_half).borrow_mut().await;
let mut write_half =
RcRef::map(&self, |s| &s.write_half).borrow_mut().await;
*read_half = NamedPipeRead::Server(read);
*write_half = NamedPipeWrite::Server(write);
}
// Client is already connected, nothing to do
Ok(())
}
pub async fn write(self: Rc<Self>, buf: &[u8]) -> io::Result<usize> {
let mut write_half =
RcRef::map(&self, |s| &s.write_half).borrow_mut().await;
let cancel = RcRef::map(&self, |s| &s.cancel);
match &mut *write_half {
NamedPipeWrite::Server(w) => w.write(buf).try_or_cancel(cancel).await,
NamedPipeWrite::Client(w) => w.write(buf).try_or_cancel(cancel).await,
NamedPipeWrite::None => Err(io::Error::new(
io::ErrorKind::NotConnected,
"pipe not connected",
)),
}
}
pub async fn read(self: Rc<Self>, buf: &mut [u8]) -> io::Result<usize> {
let mut read_half = RcRef::map(&self, |s| &s.read_half).borrow_mut().await;
let cancel = RcRef::map(&self, |s| &s.cancel);
match &mut *read_half {
NamedPipeRead::Server(r) => r.read(buf).try_or_cancel(cancel).await,
NamedPipeRead::Client(r) => r.read(buf).try_or_cancel(cancel).await,
NamedPipeRead::None => Err(io::Error::new(
io::ErrorKind::NotConnected,
"pipe not connected",
)),
}
}
}
impl Resource for NamedPipe {
deno_core::impl_readable_byob!();
deno_core::impl_writable!();
fn name(&self) -> Cow<'_, str> {
Cow::Borrowed("namedPipe")
}
fn close(self: Rc<Self>) {
self.cancel.cancel();
}
}

View file

@ -281,6 +281,7 @@ deno_core::extension!(deno_node,
ops::fs::op_node_open,
ops::fs::op_node_statfs_sync,
ops::fs::op_node_statfs,
ops::fs::op_node_file_from_fd,
ops::winerror::op_node_sys_to_uv_error,
ops::v8::op_v8_cached_data_version_tag,
ops::v8::op_v8_get_heap_statistics,

View file

@ -530,3 +530,47 @@ fn temp_path_append_suffix(prefix: &str) -> String {
(0..6).map(|_| OsRng.sample(Alphanumeric) as char).collect();
format!("{}{}", prefix, suffix)
}
/// Create a file resource from a raw file descriptor.
/// This is used for wrapping PTYs and other non-socket file descriptors
/// that can't be wrapped as Unix streams.
#[cfg(unix)]
#[op2(fast)]
#[smi]
pub fn op_node_file_from_fd(
state: &mut OpState,
fd: i32,
) -> Result<ResourceId, FsError> {
use std::fs::File as StdFile;
use std::os::unix::io::FromRawFd;
if fd < 0 {
return Err(FsError::Io(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Invalid file descriptor",
)));
}
// SAFETY: The caller is responsible for passing a valid fd that they own.
// The fd will be owned by the created File from this point on.
let std_file = unsafe { StdFile::from_raw_fd(fd) };
let file = Rc::new(deno_io::StdFileResourceInner::file(std_file, None));
let rid = state
.resource_table
.add(FileResource::new(file, "pipe".to_string()));
Ok(rid)
}
#[cfg(not(unix))]
#[op2(fast)]
#[smi]
pub fn op_node_file_from_fd(
_state: &mut OpState,
_fd: i32,
) -> Result<ResourceId, FsError> {
Err(FsError::Io(std::io::Error::new(
std::io::ErrorKind::Unsupported,
"op_node_file_from_fd is not supported on this platform",
)))
}

View file

@ -24,6 +24,17 @@
// - https://github.com/nodejs/node/blob/master/src/pipe_wrap.cc
// - https://github.com/nodejs/node/blob/master/src/pipe_wrap.h
import { core, primordials } from "ext:core/mod.js";
import {
op_net_unix_stream_from_fd,
op_node_file_from_fd,
op_pipe_connect,
op_pipe_open,
op_pipe_windows_wait,
} from "ext:core/ops";
import { PipeConn, UnixConn } from "ext:deno_net/01_net.js";
const { internalRidSymbol } = core;
import { notImplemented } from "ext:deno_node/_utils.ts";
import { unreachable } from "ext:deno_node/_util/asserts.ts";
import { ConnectionWrap } from "ext:deno_node/internal_binding/connection_wrap.ts";
@ -32,12 +43,12 @@ import {
providerType,
} from "ext:deno_node/internal_binding/async_wrap.ts";
import { LibuvStreamWrap } from "ext:deno_node/internal_binding/stream_wrap.ts";
import { codeMap } from "ext:deno_node/internal_binding/uv.ts";
import { delay } from "ext:deno_node/_util/async.ts";
import {
kStreamBaseField,
StreamBase,
} from "ext:deno_node/internal_binding/stream_wrap.ts";
codeMap,
mapSysErrnoToUvErrno,
} from "ext:deno_node/internal_binding/uv.ts";
import { delay } from "ext:deno_node/_util/async.ts";
import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts";
import {
ceilPowOf2,
INITIAL_ACCEPT_BACKOFF_DELAY,
@ -45,14 +56,17 @@ import {
} from "ext:deno_node/internal_binding/_listen.ts";
import { isWindows } from "ext:deno_node/_util/os.ts";
import { fs } from "ext:deno_node/internal_binding/constants.ts";
import { primordials } from "ext:core/mod.js";
const {
ErrorPrototype,
FunctionPrototypeCall,
MapPrototypeGet,
ObjectDefineProperty,
ObjectPrototypeIsPrototypeOf,
PromisePrototypeThen,
ReflectHas,
StringPrototypeIncludes,
queueMicrotask,
} = primordials;
export enum socketType {
@ -61,6 +75,56 @@ export enum socketType {
IPC,
}
/**
* A wrapper for file-based streams (PTYs, pipes, etc.) that provides
* the interface expected by LibuvStreamWrap.
*/
class FileStreamConn {
#rid: number;
#closed = false;
constructor(rid: number) {
this.#rid = rid;
ObjectDefineProperty(this, internalRidSymbol, {
__proto__: null,
enumerable: false,
value: rid,
});
}
async read(buf: Uint8Array): Promise<number | null> {
// Loop to handle EAGAIN/EWOULDBLOCK for non-blocking fds (PTYs, pipes)
while (!this.#closed) {
try {
const nread = await core.read(this.#rid, buf);
return nread === 0 ? null : nread;
} catch (e) {
// Handle EAGAIN/EWOULDBLOCK by waiting and retrying
if (
ObjectPrototypeIsPrototypeOf(ErrorPrototype, e) &&
((e as Error).name === "WouldBlock" ||
(e as { code?: string }).code === "EAGAIN")
) {
// Wait a bit before retrying to avoid busy-looping
await delay(10);
continue;
}
throw e;
}
}
return null;
}
async write(data: Uint8Array): Promise<number> {
return await core.write(this.#rid, data);
}
close(): void {
this.#closed = true;
core.tryClose(this.#rid);
}
}
export class Pipe extends ConnectionWrap {
override reading = false;
ipc: boolean;
@ -76,6 +140,7 @@ export class Pipe extends ConnectionWrap {
#closed = false;
#acceptBackoffDelay?: number;
#serverPipeRid?: number;
constructor(type: number, conn?: Deno.UnixConn | StreamBase) {
let provider: providerType;
@ -118,9 +183,47 @@ export class Pipe extends ConnectionWrap {
}
}
open(_fd: number): number {
// REF: https://github.com/denoland/deno/issues/6529
notImplemented("Pipe.prototype.open");
open(fd: number): number {
if (isWindows) {
// Windows named pipes don't support opening from fd
notImplemented("Pipe.prototype.open on Windows");
}
try {
// First, try to open as a Unix socket (for actual Unix domain sockets)
const rid = op_net_unix_stream_from_fd(fd);
this[kStreamBaseField] = new UnixConn(rid, null, null);
return 0;
} catch (e) {
// If the fd is not a socket (e.g., PTY, pipe), fall back to file-based I/O
if (
ObjectPrototypeIsPrototypeOf(ErrorPrototype, e) &&
(StringPrototypeIncludes((e as Error).message, "not a socket") ||
StringPrototypeIncludes((e as Error).message, "ENOTSOCK"))
) {
try {
const rid = op_node_file_from_fd(fd);
this[kStreamBaseField] = new FileStreamConn(rid);
return 0;
} catch (e2) {
if (
ObjectPrototypeIsPrototypeOf(ErrorPrototype, e2) &&
ReflectHas(e2 as Error, "code")
) {
return codeMap.get((e2 as { code: string }).code) ??
codeMap.get("UNKNOWN")!;
}
return codeMap.get("UNKNOWN")!;
}
}
if (
ObjectPrototypeIsPrototypeOf(ErrorPrototype, e) &&
ReflectHas(e as Error, "code")
) {
return codeMap.get((e as { code: string }).code) ??
codeMap.get("UNKNOWN")!;
}
return codeMap.get("UNKNOWN")!;
}
}
/**
@ -146,8 +249,73 @@ export class Pipe extends ConnectionWrap {
*/
connect(req: PipeConnectWrap, address: string) {
if (isWindows) {
// REF: https://github.com/denoland/deno/issues/10244
notImplemented("Pipe.prototype.connect - Windows");
// On Windows, use the named pipe API
try {
const rid = op_pipe_connect(
address,
true,
true,
"net.createConnection()",
);
this[kStreamBaseField] = new PipeConn(rid);
this.#address = req.address = address;
// Use queueMicrotask to match async behavior
queueMicrotask(() => {
try {
this.afterConnect(req, 0);
} catch {
// swallow callback errors.
}
});
} catch (e: unknown) {
// Handle Windows named pipe errors
// Map the error to UV error codes
let code;
const err = e as {
code?: string;
message?: string;
rawOsError?: number;
cause?: { rawOsError?: number };
};
if (err.code !== undefined) {
code = MapPrototypeGet(codeMap, err.code) ??
MapPrototypeGet(codeMap, "UNKNOWN")!;
} else {
// Check error message for known patterns
const msg = err.message ?? "";
if (StringPrototypeIncludes(msg, "ENOTSOCK")) {
code = MapPrototypeGet(codeMap, "ENOTSOCK")!;
} else if (
StringPrototypeIncludes(msg, "ENOENT") ||
StringPrototypeIncludes(msg, "NotFound")
) {
code = MapPrototypeGet(codeMap, "ENOENT")!;
} else {
// Try to extract Windows error codes from the error
// Windows error 2 = ERROR_FILE_NOT_FOUND -> ENOENT
// Windows error 3 = ERROR_PATH_NOT_FOUND -> ENOENT
// Windows error 231 = ERROR_PIPE_BUSY -> EAGAIN
// Windows error 232 = ERROR_NO_DATA -> EPIPE
const rawOsError = err.rawOsError ?? err.cause?.rawOsError;
if (rawOsError !== undefined) {
code = mapSysErrnoToUvErrno(rawOsError);
} else {
code = MapPrototypeGet(codeMap, "UNKNOWN")!;
}
}
}
queueMicrotask(() => {
try {
this.afterConnect(req, code);
} catch {
// swallow callback errors.
}
});
}
return 0;
}
const connectOptions: Deno.UnixConnectOptions = {
@ -190,15 +358,34 @@ export class Pipe extends ConnectionWrap {
* @return An error status code.
*/
listen(backlog: number): number {
if (isWindows) {
// REF: https://github.com/denoland/deno/issues/10244
notImplemented("Pipe.prototype.listen - Windows");
}
this.#backlog = isWindows
? this.#pendingInstances
: ceilPowOf2(backlog + 1);
if (isWindows) {
try {
const rid = op_pipe_open(
this.#address!,
this.#pendingInstances,
false,
true,
true,
"net.Server.listen()",
);
this.#serverPipeRid = rid;
this.#acceptWindows();
return 0;
} catch (e) {
if (ObjectPrototypeIsPrototypeOf(Deno.errors.NotCapable.prototype, e)) {
throw e;
}
return MapPrototypeGet(codeMap, e.code ?? "UNKNOWN") ??
MapPrototypeGet(codeMap, "UNKNOWN")!;
}
}
const listenOptions = {
path: this.#address!,
transport: "unix" as const,
@ -284,8 +471,8 @@ export class Pipe extends ConnectionWrap {
return 0;
}
/** Handle backoff delays following an unsuccessful accept. */
async #acceptBackoff() {
/** Calculate and apply backoff delay following an unsuccessful accept. */
async #acceptBackoff(): Promise<void> {
// Backoff after transient errors to allow time for the system to
// recover, and avoid blocking up the event loop with a continuously
// running loop.
@ -300,60 +487,101 @@ export class Pipe extends ConnectionWrap {
}
await delay(this.#acceptBackoffDelay);
}
this.#accept();
/** Accept new connections on Windows named pipes. */
async #acceptWindows(): Promise<void> {
while (!this.#closed) {
try {
// Wait for a client to connect
await op_pipe_windows_wait(this.#serverPipeRid!);
// Connection established, wrap it
const connectionHandle = new Pipe(socketType.SOCKET);
connectionHandle[kStreamBaseField] = new PipeConn(this.#serverPipeRid!);
this.#connections++;
try {
this.onconnection!(0, connectionHandle);
} catch {
// swallow callback errors.
}
// Reset the backoff delay upon successful accept.
this.#acceptBackoffDelay = undefined;
// Create a new server pipe for the next connection
const newRid = op_pipe_open(
this.#address!,
this.#pendingInstances,
false,
true,
true,
"net.Server.listen()",
);
this.#serverPipeRid = newRid;
} catch {
if (this.#closed) {
return;
}
try {
this.onconnection!(MapPrototypeGet(codeMap, "UNKNOWN")!, undefined);
} catch {
// swallow callback errors.
}
await delay(this.#acceptBackoffDelay || INITIAL_ACCEPT_BACKOFF_DELAY);
}
}
}
/** Accept new connections. */
async #accept(): Promise<void> {
if (this.#closed) {
return;
}
if (this.#connections > this.#backlog!) {
this.#acceptBackoff();
return;
}
let connection: Deno.Conn;
try {
connection = await this.#listener.accept();
} catch (e) {
if (
ObjectPrototypeIsPrototypeOf(Deno.errors.BadResource.prototype, e) &&
this.#closed
) {
// Listener and server has closed.
return;
while (!this.#closed) {
if (this.#connections > this.#backlog!) {
await this.#acceptBackoff();
continue;
}
let connection: Deno.Conn;
try {
// TODO(cmorten): map errors to appropriate error codes.
this.onconnection!(MapPrototypeGet(codeMap, "UNKNOWN")!, undefined);
connection = await this.#listener.accept();
} catch (e) {
if (
ObjectPrototypeIsPrototypeOf(Deno.errors.BadResource.prototype, e) &&
this.#closed
) {
// Listener and server has closed.
return;
}
try {
// TODO(cmorten): map errors to appropriate error codes.
this.onconnection!(MapPrototypeGet(codeMap, "UNKNOWN")!, undefined);
} catch {
// swallow callback errors.
}
await this.#acceptBackoff();
continue;
}
// Reset the backoff delay upon successful accept.
this.#acceptBackoffDelay = undefined;
const connectionHandle = new Pipe(socketType.SOCKET, connection);
this.#connections++;
try {
this.onconnection!(0, connectionHandle);
} catch {
// swallow callback errors.
}
this.#acceptBackoff();
return;
}
// Reset the backoff delay upon successful accept.
this.#acceptBackoffDelay = undefined;
const connectionHandle = new Pipe(socketType.SOCKET, connection);
this.#connections++;
try {
this.onconnection!(0, connectionHandle);
} catch {
// swallow callback errors.
}
return this.#accept();
}
/** Handle server closure. */
@ -368,6 +596,11 @@ export class Pipe extends ConnectionWrap {
this.#acceptBackoffDelay = undefined;
if (this.provider === providerType.PIPESERVERWRAP) {
if (this.#serverPipeRid !== undefined) {
core.tryClose(this.#serverPipeRid);
this.#serverPipeRid = undefined;
}
try {
this.#listener.close();
} catch {

View file

@ -808,6 +808,7 @@
"parallel/test-net-persistent-keepalive.js" = {}
"parallel/test-net-persistent-nodelay.js" = {}
"parallel/test-net-persistent-ref-unref.js" = {}
"parallel/test-net-pipe-connect-errors.js" = {}
"parallel/test-net-pipe-with-long-path.js" = {}
"parallel/test-net-reconnect.js" = {}
"parallel/test-net-remote-address-port.js" = {}

View file

@ -0,0 +1,15 @@
{
"tempDir": true,
"tests": {
"invalid_fd": {
"if": "unix",
"args": "run -A invalid_fd.ts",
"output": "invalid_fd.out"
},
"open_fd": {
"if": "unix",
"args": "run -A --unstable-ffi open_fd.ts",
"output": "open_fd.out"
}
}
}

View file

@ -0,0 +1,2 @@
open(-1) returned: -22
PASS: Invalid fd returns error code

View file

@ -0,0 +1,18 @@
// Test Pipe.prototype.open() with invalid fd
import { createRequire } from "node:module";
const require = createRequire(import.meta.url);
const { Pipe, constants: PipeConstants } = require("internal/test/binding")
.internalBinding("pipe_wrap");
const pipe = new Pipe(PipeConstants.SOCKET);
const result = pipe.open(-1);
// Should return a non-zero error code for invalid fd
console.log(`open(-1) returned: ${result}`);
if (result !== 0) {
console.log("PASS: Invalid fd returns error code");
} else {
console.log("FAIL: Invalid fd should return error code");
Deno.exit(1);
}

View file

@ -0,0 +1,3 @@
Created socketpair: fd [WILDCARD]
Pipe.open([WILDCARD]) returned: 0
PASS: Pipe.open() succeeded with valid fd

View file

@ -0,0 +1,45 @@
// Test Pipe.prototype.open(fd) happy path using socketpair via FFI
import { createRequire } from "node:module";
const require = createRequire(import.meta.url);
const { Pipe, constants: PipeConstants } = require("internal/test/binding")
.internalBinding("pipe_wrap");
// Use FFI to create a socketpair
const libName = Deno.build.os === "darwin" ? "libSystem.B.dylib" : "libc.so.6";
const libc = Deno.dlopen(libName, {
socketpair: { parameters: ["i32", "i32", "i32", "buffer"], result: "i32" },
close: { parameters: ["i32"], result: "i32" },
});
const AF_UNIX = 1;
const SOCK_STREAM = 1;
const fds = new Int32Array(2);
const result = libc.symbols.socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
if (result !== 0) {
console.log("FAIL: socketpair failed");
Deno.exit(1);
}
const fd0 = fds[0];
const fd1 = fds[1];
console.log(`Created socketpair: fd ${fd0} <-> fd ${fd1}`);
// Test Pipe.prototype.open() with a valid fd
const pipe = new Pipe(PipeConstants.SOCKET);
const openResult = pipe.open(fd0);
console.log(`Pipe.open(${fd0}) returned: ${openResult}`);
if (openResult === 0) {
console.log("PASS: Pipe.open() succeeded with valid fd");
} else {
console.log("FAIL: Pipe.open() should return 0 for valid fd");
libc.symbols.close(fd1);
libc.close();
Deno.exit(1);
}
// Clean up the other fd (fd0 is now owned by the Pipe)
libc.symbols.close(fd1);
libc.close();