fix(node): support advanced serialization in IPC (#31380)
Some checks are pending
ci / test release macos-aarch64 (push) Blocked by required conditions
ci / bench release linux-x86_64 (push) Blocked by required conditions
ci / test debug linux-x86_64 (push) Blocked by required conditions
ci / test release linux-x86_64 (push) Blocked by required conditions
ci / test debug macos-x86_64 (push) Blocked by required conditions
ci / test release macos-x86_64 (push) Blocked by required conditions
ci / test debug windows-x86_64 (push) Blocked by required conditions
ci / test release windows-x86_64 (push) Blocked by required conditions
ci / pre-build (push) Waiting to run
ci / test debug linux-aarch64 (push) Blocked by required conditions
ci / test release linux-aarch64 (push) Blocked by required conditions
ci / test debug macos-aarch64 (push) Blocked by required conditions
ci / lint debug linux-x86_64 (push) Blocked by required conditions
ci / lint debug macos-x86_64 (push) Blocked by required conditions
ci / lint debug windows-x86_64 (push) Blocked by required conditions
ci / build libs (push) Blocked by required conditions
ci / publish canary (push) Blocked by required conditions

Fixes #31354.
This commit is contained in:
Nathan Whitaker 2025-11-26 18:15:32 -05:00 committed by GitHub
parent 3e22a737ed
commit faf9505814
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 1097 additions and 99 deletions

1
Cargo.lock generated
View file

@ -2474,6 +2474,7 @@ dependencies = [
"k256",
"libc",
"libz-sys",
"log",
"md-5",
"md4",
"nix 0.27.1",

View file

@ -201,7 +201,7 @@ jupyter_runtime = "=0.19.0"
lazy-regex = "3"
libc = "0.2.168"
libz-sys = { version = "1.1.20", default-features = false }
log = { version = "0.4.20", features = ["kv"] }
log = { version = "0.4.28", features = ["kv"] }
lsp-types = "=0.97.0" # used by tower-lsp and "proposed" feature is unstable in patch releases
memchr = "2.7.4"
mime = "0.3.16"

View file

@ -48,6 +48,7 @@ use deno_npm_installer::LifecycleScriptsConfig;
use deno_npm_installer::graph::NpmCachingStrategy;
use deno_path_util::resolve_url_or_path;
use deno_resolver::factory::resolve_jsr_url;
use deno_runtime::deno_node::ops::ipc::ChildIpcSerialization;
use deno_runtime::deno_permissions::AllowRunDescriptor;
use deno_runtime::deno_permissions::PathDescriptor;
use deno_runtime::deno_permissions::PermissionsOptions;
@ -586,18 +587,31 @@ impl CliOptions {
)
}
pub fn node_ipc_fd(&self) -> Option<i64> {
pub fn node_ipc_init(
&self,
) -> Result<Option<(i64, ChildIpcSerialization)>, AnyError> {
let maybe_node_channel_fd = std::env::var("NODE_CHANNEL_FD").ok();
if let Some(node_channel_fd) = maybe_node_channel_fd {
// Remove so that child processes don't inherit this environment variable.
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe {
std::env::remove_var("NODE_CHANNEL_FD")
};
node_channel_fd.parse::<i64>().ok()
let maybe_node_channel_serialization = if let Ok(serialization) =
std::env::var("NODE_CHANNEL_SERIALIZATION_MODE")
{
Some(serialization.parse::<ChildIpcSerialization>()?)
} else {
None
};
if let Some(node_channel_fd) = maybe_node_channel_fd {
// Remove so that child processes don't inherit this environment variables.
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe {
std::env::remove_var("NODE_CHANNEL_FD");
std::env::remove_var("NODE_CHANNEL_SERIALIZATION_MODE");
}
let node_channel_fd = node_channel_fd.parse::<i64>()?;
Ok(Some((
node_channel_fd,
maybe_node_channel_serialization.unwrap_or(ChildIpcSerialization::Json),
)))
} else {
Ok(None)
}
}

View file

@ -1134,7 +1134,7 @@ impl CliFactory {
unsafely_ignore_certificate_errors: cli_options
.unsafely_ignore_certificate_errors()
.clone(),
node_ipc: cli_options.node_ipc_fd(),
node_ipc_init: cli_options.node_ipc_init()?,
serve_port: cli_options.serve_port(),
serve_host: cli_options.serve_host(),
otel_config: cli_options.otel_config(),

View file

@ -8,6 +8,7 @@ use std::sync::Arc;
use deno_bundle_runtime::BundleProvider;
use deno_core::error::JsError;
use deno_node::NodeRequireLoaderRc;
use deno_node::ops::ipc::ChildIpcSerialization;
use deno_path_util::url_from_file_path;
use deno_path_util::url_to_file_path;
use deno_resolver::npm::DenoInNpmPackageChecker;
@ -345,7 +346,7 @@ pub struct LibMainWorkerOptions {
pub seed: Option<u64>,
pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
pub skip_op_registration: bool,
pub node_ipc: Option<i64>,
pub node_ipc_init: Option<(i64, ChildIpcSerialization)>,
pub no_legacy_abort: bool,
pub startup_snapshot: Option<&'static [u8]>,
pub serve_port: Option<u16>,
@ -489,7 +490,7 @@ impl<TSys: DenoLibSys> LibWorkerFactorySharedState<TSys> {
has_node_modules_dir: shared.options.has_node_modules_dir,
argv0: shared.options.argv0.clone(),
node_debug: shared.options.node_debug.clone(),
node_ipc_fd: None,
node_ipc_init: None,
mode: WorkerExecutionMode::Worker,
serve_port: shared.options.serve_port,
serve_host: shared.options.serve_host.clone(),
@ -688,7 +689,7 @@ impl<TSys: DenoLibSys> LibMainWorkerFactory<TSys> {
has_node_modules_dir: shared.options.has_node_modules_dir,
argv0: shared.options.argv0.clone(),
node_debug: shared.options.node_debug.clone(),
node_ipc_fd: shared.options.node_ipc,
node_ipc_init: shared.options.node_ipc_init,
mode,
no_legacy_abort: shared.options.no_legacy_abort,
serve_port: shared.options.serve_port,

View file

@ -1029,7 +1029,7 @@ pub async fn run(
seed: metadata.seed,
unsafely_ignore_certificate_errors: metadata
.unsafely_ignore_certificate_errors,
node_ipc: None,
node_ipc_init: None,
serve_port: None,
serve_host: None,
otel_config: metadata.otel_config,

View file

@ -62,6 +62,7 @@ ipnetwork.workspace = true
k256.workspace = true
libc.workspace = true
libz-sys.workspace = true
log.workspace = true
md-5 = { workspace = true, features = ["oid"] }
md4.workspace = true
node_resolver.workspace = true

View file

@ -367,8 +367,11 @@ deno_core::extension!(deno_node,
ops::util::op_node_in_npm_package<TInNpmPackageChecker, TNpmPackageFolderResolver, TSys>,
ops::worker_threads::op_worker_threads_filename<TSys>,
ops::ipc::op_node_child_ipc_pipe,
ops::ipc::op_node_ipc_write,
ops::ipc::op_node_ipc_read,
ops::ipc::op_node_ipc_write_json,
ops::ipc::op_node_ipc_read_json,
ops::ipc::op_node_ipc_read_advanced,
ops::ipc::op_node_ipc_write_advanced,
ops::ipc::op_node_ipc_buffer_constructor,
ops::ipc::op_node_ipc_ref,
ops::ipc::op_node_ipc_unref,
ops::process::op_node_process_kill,

View file

@ -2,7 +2,27 @@
pub use impl_::*;
pub struct ChildPipeFd(pub i64);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChildIpcSerialization {
Json,
Advanced,
}
impl std::str::FromStr for ChildIpcSerialization {
type Err = deno_core::anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"json" => Ok(ChildIpcSerialization::Json),
"advanced" => Ok(ChildIpcSerialization::Advanced),
_ => Err(deno_core::anyhow::anyhow!(
"Invalid serialization type: {}",
s
)),
}
}
}
pub struct ChildPipeFd(pub i64, pub ChildIpcSerialization);
mod impl_ {
use std::cell::RefCell;
@ -20,13 +40,20 @@ mod impl_ {
use deno_core::serde::Serializer;
use deno_core::serde_json;
use deno_core::v8;
use deno_core::v8::ValueDeserializerHelper;
use deno_core::v8::ValueSerializerHelper;
use deno_error::JsErrorBox;
pub use deno_process::ipc::INITIAL_CAPACITY;
use deno_process::ipc::IpcAdvancedStreamError;
use deno_process::ipc::IpcAdvancedStreamResource;
use deno_process::ipc::IpcJsonStreamError;
pub use deno_process::ipc::IpcJsonStreamResource;
pub use deno_process::ipc::IpcRefTracker;
use serde::Serialize;
use crate::ChildPipeFd;
use crate::ops::ipc::ChildIpcSerialization;
/// Wrapper around v8 value that implements Serialize.
struct SerializeWrapper<'a, 'b, 'c>(
RefCell<&'b mut v8::PinScope<'a, 'c>>,
@ -148,20 +175,31 @@ mod impl_ {
// Open IPC pipe from bootstrap options.
#[op2]
#[smi]
#[to_v8]
pub fn op_node_child_ipc_pipe(
state: &mut OpState,
) -> Result<Option<ResourceId>, io::Error> {
let fd = match state.try_borrow_mut::<crate::ChildPipeFd>() {
Some(child_pipe_fd) => child_pipe_fd.0,
) -> Result<Option<(ResourceId, u8)>, io::Error> {
let (fd, serialization) = match state.try_borrow_mut::<crate::ChildPipeFd>()
{
Some(ChildPipeFd(fd, serialization)) => (*fd, *serialization),
None => return Ok(None),
};
log::debug!("op_node_child_ipc_pipe: {:?}, {:?}", fd, serialization);
let ref_tracker = IpcRefTracker::new(state.external_ops_tracker.clone());
Ok(Some(
state
.resource_table
.add(IpcJsonStreamResource::new(fd, ref_tracker)?),
))
match serialization {
ChildIpcSerialization::Json => Ok(Some((
state
.resource_table
.add(IpcJsonStreamResource::new(fd, ref_tracker)?),
0,
))),
ChildIpcSerialization::Advanced => Ok(Some((
state
.resource_table
.add(IpcAdvancedStreamResource::new(fd, ref_tracker)?),
1,
))),
}
}
#[derive(Debug, thiserror::Error, deno_error::JsError)]
@ -171,6 +209,9 @@ mod impl_ {
Resource(#[from] deno_core::error::ResourceError),
#[class(inherit)]
#[error(transparent)]
IpcAdvancedStream(#[from] IpcAdvancedStreamError),
#[class(inherit)]
#[error(transparent)]
IpcJsonStream(#[from] IpcJsonStreamError),
#[class(inherit)]
#[error(transparent)]
@ -178,10 +219,16 @@ mod impl_ {
#[class(inherit)]
#[error("failed to serialize json value: {0}")]
SerdeJson(serde_json::Error),
#[class(type)]
#[error("Failed to read header")]
ReadHeaderFailed,
#[class(type)]
#[error("Failed to read value")]
ReadValueFailed,
}
#[op2(async)]
pub fn op_node_ipc_write<'a>(
pub fn op_node_ipc_write_json<'a>(
scope: &mut v8::PinScope<'a, '_>,
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
@ -226,6 +273,510 @@ mod impl_ {
})
}
pub struct AdvancedSerializerDelegate {
constants: AdvancedIpcConstants,
}
impl AdvancedSerializerDelegate {
fn new(constants: AdvancedIpcConstants) -> Self {
Self { constants }
}
}
const ARRAY_BUFFER_VIEW_TAG: u32 = 0;
const NOT_ARRAY_BUFFER_VIEW_TAG: u32 = 1;
fn ab_view_to_index<'s>(
scope: &mut v8::PinScope<'s, '_>,
view: v8::Local<'s, v8::ArrayBufferView>,
constants: &AdvancedIpcConstants,
) -> Option<u32> {
if view.is_int8_array() {
Some(0)
} else if view.is_uint8_array() {
let constructor = view
.get(
scope,
v8::Local::new(scope, &constants.inner.constructor_key).into(),
)
.unwrap();
let buffer_constructor = v8::Local::<v8::Value>::from(v8::Local::new(
scope,
&constants.inner.buffer_constructor,
));
if constructor == buffer_constructor {
Some(10)
} else {
Some(1)
}
} else if view.is_uint8_clamped_array() {
Some(2)
} else if view.is_int16_array() {
Some(3)
} else if view.is_uint16_array() {
Some(4)
} else if view.is_int32_array() {
Some(5)
} else if view.is_uint32_array() {
Some(6)
} else if view.is_float32_array() {
Some(7)
} else if view.is_float64_array() {
Some(8)
} else if view.is_data_view() {
Some(9)
} else if view.is_big_int64_array() {
Some(11)
} else if view.is_big_uint64_array() {
Some(12)
} else if view.is_float16_array() {
Some(13)
} else {
None
}
}
impl v8::ValueSerializerImpl for AdvancedSerializerDelegate {
fn throw_data_clone_error<'s>(
&self,
scope: &mut v8::PinScope<'s, '_>,
message: v8::Local<'s, v8::String>,
) {
let error = v8::Exception::type_error(scope, message);
scope.throw_exception(error);
}
fn has_custom_host_object(&self, _isolate: &v8::Isolate) -> bool {
false
}
fn write_host_object<'s>(
&self,
scope: &mut v8::PinScope<'s, '_>,
object: v8::Local<'s, v8::Object>,
value_serializer: &dyn v8::ValueSerializerHelper,
) -> Option<bool> {
if object.is_array_buffer_view() {
let ab_view = object.cast::<v8::ArrayBufferView>();
value_serializer.write_uint32(ARRAY_BUFFER_VIEW_TAG);
let Some(index) = ab_view_to_index(scope, ab_view, &self.constants)
else {
scope.throw_exception(v8::Exception::type_error(
scope,
v8::String::new_from_utf8(
scope,
format!("Unserializable host object: {}", object.type_repr())
.as_bytes(),
v8::NewStringType::Normal,
)
.unwrap(),
));
return None;
};
value_serializer.write_uint32(index);
value_serializer.write_uint32(ab_view.byte_length() as u32);
let mut storage = [0u8; v8::TYPED_ARRAY_MAX_SIZE_IN_HEAP];
let slice = ab_view.get_contents(&mut storage);
value_serializer.write_raw_bytes(slice);
Some(true)
} else {
value_serializer.write_uint32(NOT_ARRAY_BUFFER_VIEW_TAG);
value_serializer
.write_value(scope.get_current_context(), object.into());
Some(true)
}
}
fn get_shared_array_buffer_id<'s>(
&self,
_scope: &mut v8::PinScope<'s, '_>,
_shared_array_buffer: v8::Local<'s, v8::SharedArrayBuffer>,
) -> Option<u32> {
None
}
}
#[derive(Clone)]
struct AdvancedIpcConstants {
inner: Rc<AdvancedIpcConstantsInner>,
}
struct AdvancedIpcConstantsInner {
buffer_constructor: v8::Global<v8::Function>,
constructor_key: v8::Global<v8::String>,
fast_buffer_prototype: v8::Global<v8::Object>,
}
#[op2(fast)]
pub fn op_node_ipc_buffer_constructor(
scope: &mut v8::PinScope<'_, '_>,
state: &mut OpState,
buffer_constructor: v8::Local<'_, v8::Function>,
fast_buffer_prototype: v8::Local<'_, v8::Object>,
) {
if state.has::<AdvancedIpcConstants>() {
return;
}
let constants = AdvancedIpcConstants {
inner: Rc::new(AdvancedIpcConstantsInner {
buffer_constructor: v8::Global::new(scope, buffer_constructor),
constructor_key: v8::Global::new(
scope,
v8::String::new_from_utf8(
scope,
b"constructor",
v8::NewStringType::Internalized,
)
.unwrap(),
),
fast_buffer_prototype: v8::Global::new(scope, fast_buffer_prototype),
}),
};
state.put(constants);
}
#[op2(async)]
pub fn op_node_ipc_write_advanced<'a>(
scope: &mut v8::PinScope<'a, '_>,
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
value: v8::Local<'a, v8::Value>,
// using an array as an "out parameter".
// index 0 is a boolean indicating whether the queue is under the limit.
//
// ideally we would just return `Result<(impl Future, bool), ..>`, but that's not
// supported by `op2` currently.
queue_ok: v8::Local<'a, v8::Array>,
) -> Result<impl Future<Output = Result<(), io::Error>> + use<>, IpcError> {
let constants = state.borrow().borrow::<AdvancedIpcConstants>().clone();
let serializer = AdvancedSerializer::new(scope, constants);
let serialized = serializer.serialize(scope, value)?;
let stream = state
.borrow()
.resource_table
.get::<IpcAdvancedStreamResource>(rid)?;
let old = stream
.queued_bytes
.fetch_add(serialized.len(), std::sync::atomic::Ordering::Relaxed);
if old + serialized.len() > 2 * INITIAL_CAPACITY {
// sending messages too fast
let Ok(v) = false.to_v8(scope);
queue_ok.set_index(scope, 0, v);
}
Ok(async move {
let cancel = stream.cancel.clone();
let result = stream
.clone()
.write_msg_bytes(&serialized)
.or_cancel(cancel)
.await;
// adjust count even on error
stream
.queued_bytes
.fetch_sub(serialized.len(), std::sync::atomic::Ordering::Relaxed);
result??;
Ok(())
})
}
struct AdvancedSerializer {
inner: v8::ValueSerializer<'static>,
}
impl AdvancedSerializer {
fn new(
scope: &mut v8::PinScope<'_, '_>,
constants: AdvancedIpcConstants,
) -> Self {
let inner = v8::ValueSerializer::new(
scope,
Box::new(AdvancedSerializerDelegate::new(constants)),
);
inner.set_treat_array_buffer_views_as_host_objects(true);
Self { inner }
}
fn serialize<'s, 'i>(
&self,
scope: &mut v8::PinScope<'s, 'i>,
value: v8::Local<'s, v8::Value>,
) -> Result<Vec<u8>, IpcError> {
self.inner.write_raw_bytes(&[0, 0, 0, 0]);
self.inner.write_header();
let context = scope.get_current_context();
self.inner.write_value(context, value);
let mut ser = self.inner.release();
let length = ser.len() - 4;
ser[0] = ((length >> 24) & 0xFF) as u8;
ser[1] = ((length >> 16) & 0xFF) as u8;
ser[2] = ((length >> 8) & 0xFF) as u8;
ser[3] = (length & 0xFF) as u8;
Ok(ser)
}
}
struct AdvancedIpcDeserializer {
inner: v8::ValueDeserializer<'static>,
}
struct AdvancedIpcDeserializerDelegate {
constants: AdvancedIpcConstants,
}
impl v8::ValueDeserializerImpl for AdvancedIpcDeserializerDelegate {
fn read_host_object<'s>(
&self,
scope: &mut v8::PinScope<'s, '_>,
deser: &dyn ValueDeserializerHelper,
) -> Option<v8::Local<'s, v8::Object>> {
let throw_error = |message: &str| {
scope.throw_exception(v8::Exception::type_error(
scope,
v8::String::new_from_utf8(
scope,
message.as_bytes(),
v8::NewStringType::Normal,
)
.unwrap(),
));
None
};
let mut tag = 0;
if !deser.read_uint32(&mut tag) {
return throw_error("Failed to read tag");
}
match tag {
ARRAY_BUFFER_VIEW_TAG => {
let mut index = 0;
if !deser.read_uint32(&mut index) {
return throw_error("Failed to read array buffer view type tag");
}
let mut byte_length = 0;
if !deser.read_uint32(&mut byte_length) {
return throw_error("Failed to read byte length");
}
let Some(buf) = deser.read_raw_bytes(byte_length as usize) else {
return throw_error("failed to read bytes for typed array");
};
let array_buffer = v8::ArrayBuffer::new(scope, byte_length as usize);
// SAFETY: array_buffer is valid as v8 is keeping it alive, and is byte_length bytes
// buf is also byte_length bytes long
unsafe {
std::ptr::copy(
buf.as_ptr(),
array_buffer.data().unwrap().as_ptr().cast::<u8>(),
byte_length as usize,
);
}
let value = match index {
0 => {
v8::Int8Array::new(scope, array_buffer, 0, byte_length as usize)
.unwrap()
.into()
}
1 => {
v8::Uint8Array::new(scope, array_buffer, 0, byte_length as usize)
.unwrap()
.into()
}
10 => {
let obj: v8::Local<v8::Object> = v8::Uint8Array::new(
scope,
array_buffer,
0,
byte_length as usize,
)?
.into();
let fast_proto = v8::Local::new(
scope,
&self.constants.inner.fast_buffer_prototype,
);
obj.set_prototype(scope, fast_proto.into());
obj
}
2 => v8::Uint8ClampedArray::new(
scope,
array_buffer,
0,
byte_length as usize,
)?
.into(),
3 => v8::Int16Array::new(
scope,
array_buffer,
0,
byte_length as usize / 2,
)?
.into(),
4 => v8::Uint16Array::new(
scope,
array_buffer,
0,
byte_length as usize / 2,
)?
.into(),
5 => v8::Int32Array::new(
scope,
array_buffer,
0,
byte_length as usize / 4,
)?
.into(),
6 => v8::Uint32Array::new(
scope,
array_buffer,
0,
byte_length as usize / 4,
)?
.into(),
7 => v8::Float32Array::new(
scope,
array_buffer,
0,
byte_length as usize / 4,
)
.unwrap()
.into(),
8 => v8::Float64Array::new(
scope,
array_buffer,
0,
byte_length as usize / 8,
)?
.into(),
9 => {
v8::DataView::new(scope, array_buffer, 0, byte_length as usize)
.into()
}
11 => v8::BigInt64Array::new(
scope,
array_buffer,
0,
byte_length as usize / 8,
)?
.into(),
12 => v8::BigUint64Array::new(
scope,
array_buffer,
0,
byte_length as usize / 8,
)?
.into(),
// TODO(nathanwhit): this should just be `into()`, but I forgot to impl it in rusty_v8.
// the underlying impl is just a transmute though.
// SAFETY: float16array is an object
13 => unsafe {
std::mem::transmute::<
v8::Local<v8::Float16Array>,
v8::Local<v8::Object>,
>(v8::Float16Array::new(
scope,
array_buffer,
0,
byte_length as usize / 2,
)?)
},
_ => return None,
};
Some(value)
}
NOT_ARRAY_BUFFER_VIEW_TAG => {
let value = deser.read_value(scope.get_current_context());
Some(value.unwrap_or_else(|| v8::null(scope).into()).cast())
}
_ => {
throw_error(&format!("Invalid tag: {}", tag));
None
}
}
}
}
impl AdvancedIpcDeserializer {
fn new(
scope: &mut v8::PinScope<'_, '_>,
constants: AdvancedIpcConstants,
msg_bytes: &[u8],
) -> Self {
let inner = v8::ValueDeserializer::new(
scope,
Box::new(AdvancedIpcDeserializerDelegate { constants }),
msg_bytes,
);
Self { inner }
}
}
struct AdvancedIpcReadResult {
msg_bytes: Option<Vec<u8>>,
constants: AdvancedIpcConstants,
}
fn make_stop_sentinel<'s>(
scope: &mut v8::PinScope<'s, '_>,
) -> v8::Local<'s, v8::Value> {
let obj = v8::Object::new(scope);
obj.set(
scope,
v8::String::new_from_utf8(scope, b"cmd", v8::NewStringType::Internalized)
.unwrap()
.into(),
v8::String::new_from_utf8(
scope,
b"NODE_CLOSE",
v8::NewStringType::Internalized,
)
.unwrap()
.into(),
);
obj.into()
}
impl<'a> deno_core::ToV8<'a> for AdvancedIpcReadResult {
type Error = IpcError;
fn to_v8(
self,
scope: &mut v8::PinScope<'a, '_>,
) -> Result<v8::Local<'a, v8::Value>, Self::Error> {
let Some(msg_bytes) = self.msg_bytes else {
return Ok(make_stop_sentinel(scope));
};
let deser =
AdvancedIpcDeserializer::new(scope, self.constants, &msg_bytes);
let context = scope.get_current_context();
let header_success = deser.inner.read_header(context).unwrap_or(false);
if !header_success {
return Err(IpcError::ReadHeaderFailed);
}
let Some(value) = deser.inner.read_value(context) else {
return Err(IpcError::ReadValueFailed);
};
Ok(value)
}
}
#[op2(async)]
#[to_v8]
pub async fn op_node_ipc_read_advanced(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Result<AdvancedIpcReadResult, IpcError> {
let stream = state
.borrow()
.resource_table
.get::<IpcAdvancedStreamResource>(rid)?;
let cancel = stream.cancel.clone();
let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await;
let msg_bytes = stream.read_msg_bytes().or_cancel(cancel).await??;
Ok(AdvancedIpcReadResult {
msg_bytes,
constants: state.borrow().borrow::<AdvancedIpcConstants>().clone(),
})
}
/// Value signaling that the other end ipc channel has closed.
///
/// Node reserves objects of this form (`{ "cmd": "NODE_<something>"`)
@ -238,7 +789,7 @@ mod impl_ {
#[op2(async)]
#[serde]
pub async fn op_node_ipc_read(
pub async fn op_node_ipc_read_json(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Result<serde_json::Value, IpcError> {
@ -258,21 +809,45 @@ mod impl_ {
}
#[op2(fast)]
pub fn op_node_ipc_ref(state: &mut OpState, #[smi] rid: ResourceId) {
let stream = state
.resource_table
.get::<IpcJsonStreamResource>(rid)
.expect("Invalid resource ID");
stream.ref_tracker.ref_();
pub fn op_node_ipc_ref(
state: &mut OpState,
#[smi] rid: ResourceId,
serialization_json: bool,
) {
if serialization_json {
let stream = state
.resource_table
.get::<IpcJsonStreamResource>(rid)
.expect("Invalid resource ID");
stream.ref_tracker.ref_();
} else {
let stream = state
.resource_table
.get::<IpcAdvancedStreamResource>(rid)
.expect("Invalid resource ID");
stream.ref_tracker.ref_();
}
}
#[op2(fast)]
pub fn op_node_ipc_unref(state: &mut OpState, #[smi] rid: ResourceId) {
let stream = state
.resource_table
.get::<IpcJsonStreamResource>(rid)
.expect("Invalid resource ID");
stream.ref_tracker.unref();
pub fn op_node_ipc_unref(
state: &mut OpState,
#[smi] rid: ResourceId,
serialization_json: bool,
) {
if serialization_json {
let stream = state
.resource_table
.get::<IpcJsonStreamResource>(rid)
.expect("Invalid resource ID");
stream.ref_tracker.unref();
} else {
let stream = state
.resource_table
.get::<IpcAdvancedStreamResource>(rid)
.expect("Invalid resource ID");
stream.ref_tracker.unref();
}
}
#[cfg(test)]

View file

@ -330,11 +330,18 @@ pub fn op_v8_new_deserializer(
pub fn op_v8_transfer_array_buffer_de(
#[cppgc] deser: &Deserializer,
#[smi] id: u32,
array_buffer: v8::Local<v8::ArrayBuffer>,
) {
// TODO(nathanwhit): also need binding for TransferSharedArrayBuffer, then call that if
// array_buffer is shared
array_buffer: v8::Local<v8::Value>,
) -> Result<(), deno_core::error::DataError> {
if let Ok(shared_array_buffer) =
array_buffer.try_cast::<v8::SharedArrayBuffer>()
{
deser
.inner
.transfer_shared_array_buffer(id, shared_array_buffer)
}
let array_buffer = array_buffer.try_cast::<v8::ArrayBuffer>()?;
deser.inner.transfer_array_buffer(id, array_buffer);
Ok(())
}
#[op2(fast)]

View file

@ -835,9 +835,12 @@ export function execFileSync(
}
function setupChildProcessIpcChannel() {
const fd = op_node_child_ipc_pipe();
const maybePipe = op_node_child_ipc_pipe();
if (!maybePipe) return;
const [fd, serialization] = maybePipe;
const serializationMode = serialization === 0 ? "json" : "advanced";
if (typeof fd != "number" || fd < 0) return;
const control = setupChannel(process, fd);
const control = setupChannel(process, fd, serializationMode);
process.on("newListener", (name: string) => {
if (name === "message" || name === "disconnect") {
control.refCounted();

View file

@ -176,7 +176,7 @@ function showFlaggedDeprecation() {
bufferWarningAlreadyEmitted = true;
}
class FastBuffer extends Uint8Array {
export class FastBuffer extends Uint8Array {
constructor(bufferOrLength, byteOffset, length) {
super(bufferOrLength, byteOffset, length);
}

View file

@ -9,10 +9,13 @@
import { core, internals } from "ext:core/mod.js";
import {
op_node_in_npm_package,
op_node_ipc_read,
op_node_ipc_buffer_constructor,
op_node_ipc_read_advanced,
op_node_ipc_read_json,
op_node_ipc_ref,
op_node_ipc_unref,
op_node_ipc_write,
op_node_ipc_write_advanced,
op_node_ipc_write_json,
} from "ext:core/ops";
import {
ArrayIsArray,
@ -41,6 +44,7 @@ import {
ERR_UNKNOWN_SIGNAL,
} from "ext:deno_node/internal/errors.ts";
import { Buffer } from "node:buffer";
import { FastBuffer } from "ext:deno_node/internal/buffer.mjs";
import { errnoException } from "ext:deno_node/internal/errors.ts";
import { ErrnoException } from "ext:deno_node/_global.d.ts";
import { codeMap } from "ext:deno_node/internal_binding/uv.ts";
@ -48,6 +52,7 @@ import {
isInt32,
validateBoolean,
validateObject,
validateOneOf,
validateString,
} from "ext:deno_node/internal/validators.mjs";
import { kEmptyObject } from "ext:deno_node/internal/util.mjs";
@ -62,6 +67,7 @@ import {
kInputOption,
kIpc,
kNeedsNpmProcessState,
kSerialization,
} from "ext:deno_process/40_process.js";
export function mapValues<T, O>(
@ -248,6 +254,8 @@ export class ChildProcess extends EventEmitter {
windowsVerbatimArguments = false,
detached,
} = options || {};
const serialization = options?.serialization || "json";
const normalizedStdio = normalizeStdioOption(stdio);
const [
stdin = "pipe",
@ -287,6 +295,7 @@ export class ChildProcess extends EventEmitter {
stderr: toDenoStdio(stderr),
windowsRawArguments: windowsVerbatimArguments,
detached,
[kSerialization]: serialization,
[kIpc]: ipc, // internal
[kExtraStdio]: extraStdioNormalized,
[kNeedsNpmProcessState]:
@ -386,7 +395,7 @@ export class ChildProcess extends EventEmitter {
const pipeRid = internals.getIpcPipeRid(this.#process);
if (typeof pipeRid == "number") {
setupChannel(this, pipeRid);
setupChannel(this, pipeRid, serialization);
this[kClosesNeeded]++;
this.on("disconnect", () => {
maybeClose(this);
@ -763,6 +772,12 @@ export function normalizeSpawnArguments(
);
}
validateOneOf(options.serialization, "options.serialization", [
undefined,
"json",
"advanced",
]);
if (options.shell) {
const command = ArrayPrototypeJoin([file, ...args], " ");
// Set the shell, switches, and commands.
@ -849,6 +864,7 @@ export function normalizeSpawnArguments(
file,
windowsHide: !!options.windowsHide,
windowsVerbatimArguments: !!windowsVerbatimArguments,
serialization: options.serialization || "json",
};
}
@ -1332,20 +1348,22 @@ class Control extends EventEmitter {
#refExplicitlySet = false;
#connected = true;
[kPendingMessages] = [];
constructor(channel: number) {
#serialization: "json" | "advanced";
constructor(channel: number, serialization: "json" | "advanced") {
super();
this.#channel = channel;
this.#serialization = serialization;
}
#ref() {
if (this.#connected) {
op_node_ipc_ref(this.#channel);
op_node_ipc_ref(this.#channel, this.#serialization === "json");
}
}
#unref() {
if (this.#connected) {
op_node_ipc_unref(this.#channel);
op_node_ipc_unref(this.#channel, this.#serialization === "json");
}
}
@ -1397,18 +1415,37 @@ function internalCmdName(msg: InternalMessage): string {
return StringPrototypeSlice(msg.cmd, 5);
}
// deno-lint-ignore no-explicit-any
export function setupChannel(target: any, ipc: number) {
const control = new Control(ipc);
let hasSetBufferConstructor = false;
export function setupChannel(
// deno-lint-ignore no-explicit-any
target: any,
ipc: number,
serialization: "json" | "advanced",
) {
const control = new Control(ipc, serialization);
target.channel = control;
if (!hasSetBufferConstructor) {
op_node_ipc_buffer_constructor(Buffer, FastBuffer.prototype);
hasSetBufferConstructor = true;
}
const writeFn = serialization === "json"
? op_node_ipc_write_json
: op_node_ipc_write_advanced;
const readFn = serialization === "json"
? op_node_ipc_read_json
: op_node_ipc_read_advanced;
async function readLoop() {
try {
while (true) {
if (!target.connected || target.killed) {
return;
}
const prom = op_node_ipc_read(ipc);
// TODO(nathanwhit): maybe allow returning multiple messages in a single read? needs benchmarking.
const prom = readFn(ipc);
// there will always be a pending read promise,
// but it shouldn't keep the event loop from exiting
core.unrefOpPromise(prom);
@ -1500,7 +1537,7 @@ export function setupChannel(target: any, ipc: number) {
// this acts as a backpressure mechanism.
const queueOk = [true];
control.refCounted();
op_node_ipc_write(ipc, message, queueOk)
writeFn(ipc, message, queueOk)
.then(() => {
control.unrefCounted();
if (callback) {

View file

@ -32,6 +32,7 @@ import { primordials } from "ext:core/mod.js";
const {
ObjectPrototypeHasOwnProperty,
ObjectPrototypePropertyIsEnumerable,
TypedArrayPrototypeGetSymbolToStringTag,
} = primordials;
enum valueType {
@ -145,23 +146,6 @@ function innerDeepEqual(
return false;
}
} else if (isArrayBufferView(val1)) {
const TypedArrayPrototypeGetSymbolToStringTag = (
val:
| BigInt64Array
| BigUint64Array
| Float32Array
| Float64Array
| Int8Array
| Int16Array
| Int32Array
| Uint8Array
| Uint8ClampedArray
| Uint16Array
| Uint32Array,
) =>
Object.getOwnPropertySymbols(val)
.map((item) => item.toString())
.toString();
if (
isTypedArray(val1) &&
isTypedArray(val2) &&

View file

@ -162,6 +162,7 @@ function run({
export const kExtraStdio = Symbol("extraStdio");
export const kIpc = Symbol("ipc");
export const kNeedsNpmProcessState = Symbol("needsNpmProcessState");
export const kSerialization = Symbol("serialization");
const illegalConstructorKey = Symbol("illegalConstructorKey");
@ -178,6 +179,7 @@ function spawnChildInner(command, apiName, {
stderr = "piped",
windowsRawArguments = false,
detached = false,
[kSerialization]: serialization = "json",
[kExtraStdio]: extraStdio = [],
[kIpc]: ipc = -1,
[kNeedsNpmProcessState]: needsNpmProcessState = false,
@ -195,6 +197,7 @@ function spawnChildInner(command, apiName, {
stderr,
windowsRawArguments,
ipc,
serialization,
extraStdio,
detached,
needsNpmProcessState,

View file

@ -1,8 +1,5 @@
// Copyright 2018-2025 the Deno authors. MIT license.
#![allow(unused)]
use std::cell::RefCell;
use std::future::Future;
use std::io;
use std::mem;
@ -18,7 +15,6 @@ use deno_core::AsyncRefCell;
use deno_core::CancelHandle;
use deno_core::ExternalOpsTracker;
use deno_core::RcRef;
use deno_core::serde;
use deno_core::serde_json;
use deno_io::BiPipe;
use deno_io::BiPipeRead;
@ -219,6 +215,231 @@ pub enum IpcJsonStreamError {
SimdJson(#[source] simd_json::Error),
}
#[derive(Debug, thiserror::Error, deno_error::JsError)]
pub enum IpcAdvancedStreamError {
#[class(inherit)]
#[error("{0}")]
Io(#[source] std::io::Error),
}
pub struct IpcAdvancedStream {
pipe: BiPipeRead,
read_buffer: ReadBuffer,
}
struct MessageLengthBuffer {
buffer: [u8; 4],
pos: u8,
value: u32,
}
impl MessageLengthBuffer {
fn new() -> Self {
Self {
buffer: [0; 4],
pos: 0,
value: 0,
}
}
fn message_len(&mut self) -> Option<usize> {
if self.pos == 4 {
self.value = u32::from_be_bytes(self.buffer);
self.pos = 5;
Some(self.value as usize)
} else if self.pos == 5 {
Some(self.value as usize)
} else {
None
}
}
fn update_pos_by(&mut self, num: usize) {
self.pos = (self.pos + num as u8).min(4);
}
fn available_mut(&mut self) -> &mut [u8] {
&mut self.buffer[self.pos as usize..4]
}
}
pin_project! {
#[must_use = "futures do nothing unless you `.await` or poll them"]
struct ReadMsgBytesInner<'a, R: ?Sized> {
length_buffer: &'a mut MessageLengthBuffer,
reader: &'a mut R,
out_buf: &'a mut Vec<u8>,
// The number of bytes appended to buf. This can be less than buf.len() if
// the buffer was not empty when the operation was started.
read: usize,
read_buffer: &'a mut ReadBuffer,
}
}
fn read_msg_bytes_inner<'a, R: AsyncRead + ?Sized + Unpin>(
reader: &'a mut R,
length_buffer: &'a mut MessageLengthBuffer,
out_buf: &'a mut Vec<u8>,
read: usize,
read_buffer: &'a mut ReadBuffer,
) -> ReadMsgBytesInner<'a, R> {
ReadMsgBytesInner {
length_buffer,
reader,
out_buf,
read,
read_buffer,
}
}
impl IpcAdvancedStream {
fn new(pipe: BiPipeRead) -> Self {
Self {
pipe,
read_buffer: ReadBuffer::new(),
}
}
pub async fn read_msg_bytes(
&mut self,
) -> Result<Option<Vec<u8>>, IpcAdvancedStreamError> {
let mut length_buffer = MessageLengthBuffer::new();
let mut out_buf = Vec::with_capacity(32);
let nread = read_msg_bytes_inner(
&mut self.pipe,
&mut length_buffer,
&mut out_buf,
0,
&mut self.read_buffer,
)
.await
.map_err(IpcAdvancedStreamError::Io)?;
if nread == 0 {
return Ok(None);
}
Ok(Some(std::mem::take(&mut out_buf)))
}
}
impl<R: AsyncRead + ?Sized + Unpin> Future for ReadMsgBytesInner<'_, R> {
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
read_advanced_msg_bytes_internal(
Pin::new(me.reader),
cx,
me.length_buffer,
me.read_buffer,
me.out_buf,
me.read,
)
}
}
pub struct IpcAdvancedStreamResource {
pub read_half: AsyncRefCell<IpcAdvancedStream>,
pub write_half: AsyncRefCell<BiPipeWrite>,
pub cancel: Rc<CancelHandle>,
pub queued_bytes: AtomicUsize,
pub ref_tracker: IpcRefTracker,
}
impl IpcAdvancedStreamResource {
pub fn new(
stream: i64,
ref_tracker: IpcRefTracker,
) -> Result<Self, std::io::Error> {
let (read_half, write_half) = BiPipe::from_raw(stream as _)?.split();
Ok(Self {
read_half: AsyncRefCell::new(IpcAdvancedStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
cancel: Default::default(),
queued_bytes: Default::default(),
ref_tracker,
})
}
/// writes serialized message to the IPC pipe. the first 4 bytes must be the length of the following message.
pub async fn write_msg_bytes(
self: Rc<Self>,
msg: &[u8],
) -> Result<(), io::Error> {
let mut write_half = RcRef::map(self, |r| &r.write_half).borrow_mut().await;
write_half.write_all(msg).await?;
Ok(())
}
}
impl deno_core::Resource for IpcAdvancedStreamResource {
fn close(self: Rc<Self>) {
self.cancel.cancel();
}
}
fn read_advanced_msg_bytes_internal<R: AsyncRead + ?Sized>(
mut reader: Pin<&mut R>,
cx: &mut Context<'_>,
length_buffer: &mut MessageLengthBuffer,
read_buffer: &mut ReadBuffer,
out_buffer: &mut Vec<u8>,
read: &mut usize,
) -> Poll<io::Result<usize>> {
loop {
if read_buffer.needs_fill() {
let mut read_buf = ReadBuf::new(read_buffer.get_mut());
ready!(reader.as_mut().poll_read(cx, &mut read_buf))?;
read_buffer.cap = read_buf.filled().len();
read_buffer.pos = 0;
}
let available = read_buffer.available_mut();
let msg_len = length_buffer.message_len();
let (done, used) = if let Some(msg_len) = msg_len {
if out_buffer.len() >= msg_len {
(true, 0)
} else if available.is_empty() {
if *read == 0 {
return Poll::Ready(Ok(0));
} else {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"ipc stream closed while reading message",
)));
}
} else {
let remaining = msg_len - out_buffer.len();
out_buffer.reserve(remaining);
let to_copy = available.len().min(remaining);
out_buffer.extend_from_slice(&available[..to_copy]);
(out_buffer.len() == msg_len, to_copy)
}
} else {
if available.is_empty() {
if *read == 0 {
return Poll::Ready(Ok(0));
} else {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"ipc stream closed before message length",
)));
}
}
let len_avail = length_buffer.available_mut();
let to_copy = available.len().min(len_avail.len());
len_avail[..to_copy].copy_from_slice(&available[..to_copy]);
length_buffer.update_pos_by(to_copy);
(false, to_copy)
};
read_buffer.consume(used);
*read += used;
if done || *read == 0 {
return Poll::Ready(Ok(mem::replace(read, 0)));
}
}
}
// JSON serialization stream over IPC pipe.
//
// `\n` is used as a delimiter between messages.
@ -304,7 +525,7 @@ where
}
}
fn read_msg_internal<R: AsyncRead + ?Sized>(
fn read_json_msg_internal<R: AsyncRead + ?Sized>(
mut reader: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut Vec<u8>,
@ -354,7 +575,7 @@ impl<R: AsyncRead + ?Sized + Unpin> Future for ReadMsgInner<'_, R> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
read_msg_internal(
read_json_msg_internal(
Pin::new(*me.reader),
cx,
me.buf,
@ -369,11 +590,8 @@ impl<R: AsyncRead + ?Sized + Unpin> Future for ReadMsgInner<'_, R> {
mod tests {
use std::rc::Rc;
use deno_core::JsRuntime;
use deno_core::RcRef;
use deno_core::RuntimeOptions;
use deno_core::serde_json::json;
use deno_core::v8;
use super::IpcJsonStreamResource;

View file

@ -50,6 +50,7 @@ use serde::Serialize;
use tokio::process::Child as AsyncChild;
pub mod ipc;
use ipc::IpcAdvancedStreamResource;
use ipc::IpcJsonStreamResource;
use ipc::IpcRefTracker;
@ -201,6 +202,8 @@ pub struct SpawnArgs {
windows_raw_arguments: bool,
ipc: Option<i32>,
serialization: Option<ChildIpcSerialization>,
#[serde(flatten)]
stdio: ChildStdio,
@ -211,6 +214,26 @@ pub struct SpawnArgs {
needs_npm_process_state: bool,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ChildIpcSerialization {
Json,
Advanced,
}
impl std::fmt::Display for ChildIpcSerialization {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
ChildIpcSerialization::Json => "json",
ChildIpcSerialization::Advanced => "advanced",
}
)
}
}
#[cfg(unix)]
deno_error::js_error_wrapper!(nix::Error, JsNixError, |err| {
match err {
@ -477,12 +500,30 @@ fn create_command(
fds_to_dup.push((ipc_fd2, ipc));
fds_to_close.push(ipc_fd2);
/* One end returned to parent process (this) */
let pipe_rid = state.resource_table.add(IpcJsonStreamResource::new(
ipc_fd1 as _,
IpcRefTracker::new(state.external_ops_tracker.clone()),
)?);
let pipe_rid = match args.serialization {
Some(ChildIpcSerialization::Json) | None => {
state.resource_table.add(IpcJsonStreamResource::new(
ipc_fd1 as _,
IpcRefTracker::new(state.external_ops_tracker.clone()),
)?)
}
Some(ChildIpcSerialization::Advanced) => {
state.resource_table.add(IpcAdvancedStreamResource::new(
ipc_fd1 as _,
IpcRefTracker::new(state.external_ops_tracker.clone()),
)?)
}
};
/* The other end passed to child process via NODE_CHANNEL_FD */
command.env("NODE_CHANNEL_FD", format!("{}", ipc));
command.env(
"NODE_CHANNEL_SERIALIZATION_MODE",
args
.serialization
.unwrap_or(ChildIpcSerialization::Json)
.to_string(),
);
ipc_rid = Some(pipe_rid);
}
@ -548,18 +589,34 @@ fn create_command(
let (hd1, hd2) = deno_io::bi_pipe_pair_raw()?;
/* One end returned to parent process (this) */
let pipe_rid =
Some(state.resource_table.add(IpcJsonStreamResource::new(
hd1 as i64,
IpcRefTracker::new(state.external_ops_tracker.clone()),
)?));
let pipe_rid = match args.serialization {
Some(ChildIpcSerialization::Json) | None => {
state.resource_table.add(IpcJsonStreamResource::new(
hd1 as _,
IpcRefTracker::new(state.external_ops_tracker.clone()),
)?)
}
Some(ChildIpcSerialization::Advanced) => {
state.resource_table.add(IpcAdvancedStreamResource::new(
hd1 as _,
IpcRefTracker::new(state.external_ops_tracker.clone()),
)?)
}
};
/* The other end passed to child process via NODE_CHANNEL_FD */
command.env("NODE_CHANNEL_FD", format!("{}", hd2 as i64));
command.env(
"NODE_CHANNEL_SERIALIZATION_MODE",
args
.serialization
.unwrap_or(ChildIpcSerialization::Json)
.to_string(),
);
handles_to_close.push(hd2);
ipc_rid = pipe_rid;
ipc_rid = Some(pipe_rid);
}
for (i, stdio) in args.extra_stdio.into_iter().enumerate() {

View file

@ -739,8 +739,8 @@ impl MainWorker {
let op_state = self.js_runtime.op_state();
let mut state = op_state.borrow_mut();
state.put(options.clone());
if let Some(node_ipc_fd) = options.node_ipc_fd {
state.put(deno_node::ChildPipeFd(node_ipc_fd));
if let Some((fd, serialization)) = options.node_ipc_init {
state.put(deno_node::ChildPipeFd(fd, serialization));
}
}

View file

@ -5,6 +5,7 @@ use std::thread;
use deno_core::ModuleSpecifier;
use deno_core::v8;
use deno_node::ops::ipc::ChildIpcSerialization;
use deno_telemetry::OtelConfig;
use deno_terminal::colors;
use serde::Serialize;
@ -109,7 +110,7 @@ pub struct BootstrapOptions {
pub has_node_modules_dir: bool,
pub argv0: Option<String>,
pub node_debug: Option<String>,
pub node_ipc_fd: Option<i64>,
pub node_ipc_init: Option<(i64, ChildIpcSerialization)>,
pub mode: WorkerExecutionMode,
pub no_legacy_abort: bool,
// Used by `deno serve`
@ -149,7 +150,7 @@ impl Default for BootstrapOptions {
has_node_modules_dir: false,
argv0: None,
node_debug: None,
node_ipc_fd: None,
node_ipc_init: None,
mode: WorkerExecutionMode::None,
no_legacy_abort: false,
serve_port: Default::default(),

View file

@ -135,6 +135,10 @@
"parallel/test-buffer-zero-fill-cli.js" = {}
"parallel/test-buffer-zero-fill-reset.js" = {}
"parallel/test-buffer-zero-fill.js" = {}
"parallel/test-child-process-advanced-serialization.js" = {}
# TODO(nathanwhit): figure out why this is failing on windows
"parallel/test-child-process-advanced-serialization-largebuffer.js" = { windows = false }
"parallel/test-child-process-advanced-serialization-splitted-length-field.js" = {}
"parallel/test-child-process-can-write-to-stdout.js" = { flaky = true }
"parallel/test-child-process-default-options.js" = {}
"parallel/test-child-process-detached.js" = {}

View file

@ -1006,6 +1006,95 @@ Deno.test(
},
);
Deno.test(async function ipcSerializationAdvanced() {
const timeout = withTimeout<void>();
const script = `
if (typeof process.send !== "function") {
console.error("process.send is not a function");
process.exit(1);
}
class BigIntWrapper {
constructor(value) {
this.value = value;
}
toJSON() {
return this.value.toString();
}
}
const makeSab = (arr) => {
const sab = new SharedArrayBuffer(arr.length);
const buf = new Uint8Array(sab);
for (let i = 0; i < arr.length; i++) {
buf[i] = arr[i];
}
return buf;
};
const inputs = [
"foo",
{
foo: "bar",
},
42,
true,
null,
new Uint8Array([1, 2, 3]),
{
foo: new Uint8Array([1, 2, 3]),
bar: makeSab([4, 5, 6]),
},
[1, { foo: 2 }, [3, 4]],
42n,
];
for (const input of inputs) {
process.send(input);
}
`;
const makeSab = (arr: number[]) => {
const sab = new SharedArrayBuffer(arr.length);
const buf = new Uint8Array(sab);
for (let i = 0; i < arr.length; i++) {
buf[i] = arr[i];
}
return buf;
};
const file = await Deno.makeTempFile();
await Deno.writeTextFile(file, script);
const child = CP.fork(file, [], {
stdio: ["inherit", "inherit", "inherit", "ipc"],
serialization: "advanced",
});
const expect = [
"foo",
{
foo: "bar",
},
42,
true,
null,
new Uint8Array([1, 2, 3]),
{
foo: new Uint8Array([1, 2, 3]),
bar: makeSab([4, 5, 6]),
},
[1, { foo: 2 }, [3, 4]],
42n,
];
let i = 0;
child.on("message", (message) => {
assertEquals(message, expect[i]);
i++;
});
child.on("close", () => timeout.resolve());
await timeout.promise;
assertEquals(i, expect.length);
});
Deno.test(async function childProcessExitsGracefully() {
const testdataDir = path.join(
path.dirname(path.fromFileUrl(import.meta.url)),