diff --git a/Cargo.lock b/Cargo.lock index bbb501acf3..642136304d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2474,6 +2474,7 @@ dependencies = [ "k256", "libc", "libz-sys", + "log", "md-5", "md4", "nix 0.27.1", diff --git a/Cargo.toml b/Cargo.toml index 297417afb6..2eaa6c1a94 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -201,7 +201,7 @@ jupyter_runtime = "=0.19.0" lazy-regex = "3" libc = "0.2.168" libz-sys = { version = "1.1.20", default-features = false } -log = { version = "0.4.20", features = ["kv"] } +log = { version = "0.4.28", features = ["kv"] } lsp-types = "=0.97.0" # used by tower-lsp and "proposed" feature is unstable in patch releases memchr = "2.7.4" mime = "0.3.16" diff --git a/cli/args/mod.rs b/cli/args/mod.rs index 97f5dde788..87fb4d7bc1 100644 --- a/cli/args/mod.rs +++ b/cli/args/mod.rs @@ -48,6 +48,7 @@ use deno_npm_installer::LifecycleScriptsConfig; use deno_npm_installer::graph::NpmCachingStrategy; use deno_path_util::resolve_url_or_path; use deno_resolver::factory::resolve_jsr_url; +use deno_runtime::deno_node::ops::ipc::ChildIpcSerialization; use deno_runtime::deno_permissions::AllowRunDescriptor; use deno_runtime::deno_permissions::PathDescriptor; use deno_runtime::deno_permissions::PermissionsOptions; @@ -586,18 +587,31 @@ impl CliOptions { ) } - pub fn node_ipc_fd(&self) -> Option { + pub fn node_ipc_init( + &self, + ) -> Result, AnyError> { let maybe_node_channel_fd = std::env::var("NODE_CHANNEL_FD").ok(); - if let Some(node_channel_fd) = maybe_node_channel_fd { - // Remove so that child processes don't inherit this environment variable. - - #[allow(clippy::undocumented_unsafe_blocks)] - unsafe { - std::env::remove_var("NODE_CHANNEL_FD") - }; - node_channel_fd.parse::().ok() + let maybe_node_channel_serialization = if let Ok(serialization) = + std::env::var("NODE_CHANNEL_SERIALIZATION_MODE") + { + Some(serialization.parse::()?) } else { None + }; + if let Some(node_channel_fd) = maybe_node_channel_fd { + // Remove so that child processes don't inherit this environment variables. + #[allow(clippy::undocumented_unsafe_blocks)] + unsafe { + std::env::remove_var("NODE_CHANNEL_FD"); + std::env::remove_var("NODE_CHANNEL_SERIALIZATION_MODE"); + } + let node_channel_fd = node_channel_fd.parse::()?; + Ok(Some(( + node_channel_fd, + maybe_node_channel_serialization.unwrap_or(ChildIpcSerialization::Json), + ))) + } else { + Ok(None) } } diff --git a/cli/factory.rs b/cli/factory.rs index 4ba24f795a..c29c2a5273 100644 --- a/cli/factory.rs +++ b/cli/factory.rs @@ -1134,7 +1134,7 @@ impl CliFactory { unsafely_ignore_certificate_errors: cli_options .unsafely_ignore_certificate_errors() .clone(), - node_ipc: cli_options.node_ipc_fd(), + node_ipc_init: cli_options.node_ipc_init()?, serve_port: cli_options.serve_port(), serve_host: cli_options.serve_host(), otel_config: cli_options.otel_config(), diff --git a/cli/lib/worker.rs b/cli/lib/worker.rs index e1421b4760..d126705df8 100644 --- a/cli/lib/worker.rs +++ b/cli/lib/worker.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use deno_bundle_runtime::BundleProvider; use deno_core::error::JsError; use deno_node::NodeRequireLoaderRc; +use deno_node::ops::ipc::ChildIpcSerialization; use deno_path_util::url_from_file_path; use deno_path_util::url_to_file_path; use deno_resolver::npm::DenoInNpmPackageChecker; @@ -345,7 +346,7 @@ pub struct LibMainWorkerOptions { pub seed: Option, pub unsafely_ignore_certificate_errors: Option>, pub skip_op_registration: bool, - pub node_ipc: Option, + pub node_ipc_init: Option<(i64, ChildIpcSerialization)>, pub no_legacy_abort: bool, pub startup_snapshot: Option<&'static [u8]>, pub serve_port: Option, @@ -489,7 +490,7 @@ impl LibWorkerFactorySharedState { has_node_modules_dir: shared.options.has_node_modules_dir, argv0: shared.options.argv0.clone(), node_debug: shared.options.node_debug.clone(), - node_ipc_fd: None, + node_ipc_init: None, mode: WorkerExecutionMode::Worker, serve_port: shared.options.serve_port, serve_host: shared.options.serve_host.clone(), @@ -688,7 +689,7 @@ impl LibMainWorkerFactory { has_node_modules_dir: shared.options.has_node_modules_dir, argv0: shared.options.argv0.clone(), node_debug: shared.options.node_debug.clone(), - node_ipc_fd: shared.options.node_ipc, + node_ipc_init: shared.options.node_ipc_init, mode, no_legacy_abort: shared.options.no_legacy_abort, serve_port: shared.options.serve_port, diff --git a/cli/rt/run.rs b/cli/rt/run.rs index 6a4b547ac4..1f938bb22a 100644 --- a/cli/rt/run.rs +++ b/cli/rt/run.rs @@ -1029,7 +1029,7 @@ pub async fn run( seed: metadata.seed, unsafely_ignore_certificate_errors: metadata .unsafely_ignore_certificate_errors, - node_ipc: None, + node_ipc_init: None, serve_port: None, serve_host: None, otel_config: metadata.otel_config, diff --git a/ext/node/Cargo.toml b/ext/node/Cargo.toml index fa564ae45c..eff2d94bde 100644 --- a/ext/node/Cargo.toml +++ b/ext/node/Cargo.toml @@ -62,6 +62,7 @@ ipnetwork.workspace = true k256.workspace = true libc.workspace = true libz-sys.workspace = true +log.workspace = true md-5 = { workspace = true, features = ["oid"] } md4.workspace = true node_resolver.workspace = true diff --git a/ext/node/lib.rs b/ext/node/lib.rs index f47250228d..aa99220279 100644 --- a/ext/node/lib.rs +++ b/ext/node/lib.rs @@ -367,8 +367,11 @@ deno_core::extension!(deno_node, ops::util::op_node_in_npm_package, ops::worker_threads::op_worker_threads_filename, ops::ipc::op_node_child_ipc_pipe, - ops::ipc::op_node_ipc_write, - ops::ipc::op_node_ipc_read, + ops::ipc::op_node_ipc_write_json, + ops::ipc::op_node_ipc_read_json, + ops::ipc::op_node_ipc_read_advanced, + ops::ipc::op_node_ipc_write_advanced, + ops::ipc::op_node_ipc_buffer_constructor, ops::ipc::op_node_ipc_ref, ops::ipc::op_node_ipc_unref, ops::process::op_node_process_kill, diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs index 2754546f2e..958a2e3922 100644 --- a/ext/node/ops/ipc.rs +++ b/ext/node/ops/ipc.rs @@ -2,7 +2,27 @@ pub use impl_::*; -pub struct ChildPipeFd(pub i64); +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum ChildIpcSerialization { + Json, + Advanced, +} + +impl std::str::FromStr for ChildIpcSerialization { + type Err = deno_core::anyhow::Error; + fn from_str(s: &str) -> Result { + match s { + "json" => Ok(ChildIpcSerialization::Json), + "advanced" => Ok(ChildIpcSerialization::Advanced), + _ => Err(deno_core::anyhow::anyhow!( + "Invalid serialization type: {}", + s + )), + } + } +} + +pub struct ChildPipeFd(pub i64, pub ChildIpcSerialization); mod impl_ { use std::cell::RefCell; @@ -20,13 +40,20 @@ mod impl_ { use deno_core::serde::Serializer; use deno_core::serde_json; use deno_core::v8; + use deno_core::v8::ValueDeserializerHelper; + use deno_core::v8::ValueSerializerHelper; use deno_error::JsErrorBox; pub use deno_process::ipc::INITIAL_CAPACITY; + use deno_process::ipc::IpcAdvancedStreamError; + use deno_process::ipc::IpcAdvancedStreamResource; use deno_process::ipc::IpcJsonStreamError; pub use deno_process::ipc::IpcJsonStreamResource; pub use deno_process::ipc::IpcRefTracker; use serde::Serialize; + use crate::ChildPipeFd; + use crate::ops::ipc::ChildIpcSerialization; + /// Wrapper around v8 value that implements Serialize. struct SerializeWrapper<'a, 'b, 'c>( RefCell<&'b mut v8::PinScope<'a, 'c>>, @@ -148,20 +175,31 @@ mod impl_ { // Open IPC pipe from bootstrap options. #[op2] - #[smi] + #[to_v8] pub fn op_node_child_ipc_pipe( state: &mut OpState, - ) -> Result, io::Error> { - let fd = match state.try_borrow_mut::() { - Some(child_pipe_fd) => child_pipe_fd.0, + ) -> Result, io::Error> { + let (fd, serialization) = match state.try_borrow_mut::() + { + Some(ChildPipeFd(fd, serialization)) => (*fd, *serialization), None => return Ok(None), }; + log::debug!("op_node_child_ipc_pipe: {:?}, {:?}", fd, serialization); let ref_tracker = IpcRefTracker::new(state.external_ops_tracker.clone()); - Ok(Some( - state - .resource_table - .add(IpcJsonStreamResource::new(fd, ref_tracker)?), - )) + match serialization { + ChildIpcSerialization::Json => Ok(Some(( + state + .resource_table + .add(IpcJsonStreamResource::new(fd, ref_tracker)?), + 0, + ))), + ChildIpcSerialization::Advanced => Ok(Some(( + state + .resource_table + .add(IpcAdvancedStreamResource::new(fd, ref_tracker)?), + 1, + ))), + } } #[derive(Debug, thiserror::Error, deno_error::JsError)] @@ -171,6 +209,9 @@ mod impl_ { Resource(#[from] deno_core::error::ResourceError), #[class(inherit)] #[error(transparent)] + IpcAdvancedStream(#[from] IpcAdvancedStreamError), + #[class(inherit)] + #[error(transparent)] IpcJsonStream(#[from] IpcJsonStreamError), #[class(inherit)] #[error(transparent)] @@ -178,10 +219,16 @@ mod impl_ { #[class(inherit)] #[error("failed to serialize json value: {0}")] SerdeJson(serde_json::Error), + #[class(type)] + #[error("Failed to read header")] + ReadHeaderFailed, + #[class(type)] + #[error("Failed to read value")] + ReadValueFailed, } #[op2(async)] - pub fn op_node_ipc_write<'a>( + pub fn op_node_ipc_write_json<'a>( scope: &mut v8::PinScope<'a, '_>, state: Rc>, #[smi] rid: ResourceId, @@ -226,6 +273,510 @@ mod impl_ { }) } + pub struct AdvancedSerializerDelegate { + constants: AdvancedIpcConstants, + } + + impl AdvancedSerializerDelegate { + fn new(constants: AdvancedIpcConstants) -> Self { + Self { constants } + } + } + + const ARRAY_BUFFER_VIEW_TAG: u32 = 0; + const NOT_ARRAY_BUFFER_VIEW_TAG: u32 = 1; + + fn ab_view_to_index<'s>( + scope: &mut v8::PinScope<'s, '_>, + view: v8::Local<'s, v8::ArrayBufferView>, + constants: &AdvancedIpcConstants, + ) -> Option { + if view.is_int8_array() { + Some(0) + } else if view.is_uint8_array() { + let constructor = view + .get( + scope, + v8::Local::new(scope, &constants.inner.constructor_key).into(), + ) + .unwrap(); + let buffer_constructor = v8::Local::::from(v8::Local::new( + scope, + &constants.inner.buffer_constructor, + )); + if constructor == buffer_constructor { + Some(10) + } else { + Some(1) + } + } else if view.is_uint8_clamped_array() { + Some(2) + } else if view.is_int16_array() { + Some(3) + } else if view.is_uint16_array() { + Some(4) + } else if view.is_int32_array() { + Some(5) + } else if view.is_uint32_array() { + Some(6) + } else if view.is_float32_array() { + Some(7) + } else if view.is_float64_array() { + Some(8) + } else if view.is_data_view() { + Some(9) + } else if view.is_big_int64_array() { + Some(11) + } else if view.is_big_uint64_array() { + Some(12) + } else if view.is_float16_array() { + Some(13) + } else { + None + } + } + + impl v8::ValueSerializerImpl for AdvancedSerializerDelegate { + fn throw_data_clone_error<'s>( + &self, + scope: &mut v8::PinScope<'s, '_>, + message: v8::Local<'s, v8::String>, + ) { + let error = v8::Exception::type_error(scope, message); + scope.throw_exception(error); + } + + fn has_custom_host_object(&self, _isolate: &v8::Isolate) -> bool { + false + } + + fn write_host_object<'s>( + &self, + scope: &mut v8::PinScope<'s, '_>, + object: v8::Local<'s, v8::Object>, + value_serializer: &dyn v8::ValueSerializerHelper, + ) -> Option { + if object.is_array_buffer_view() { + let ab_view = object.cast::(); + value_serializer.write_uint32(ARRAY_BUFFER_VIEW_TAG); + let Some(index) = ab_view_to_index(scope, ab_view, &self.constants) + else { + scope.throw_exception(v8::Exception::type_error( + scope, + v8::String::new_from_utf8( + scope, + format!("Unserializable host object: {}", object.type_repr()) + .as_bytes(), + v8::NewStringType::Normal, + ) + .unwrap(), + )); + return None; + }; + value_serializer.write_uint32(index); + value_serializer.write_uint32(ab_view.byte_length() as u32); + let mut storage = [0u8; v8::TYPED_ARRAY_MAX_SIZE_IN_HEAP]; + let slice = ab_view.get_contents(&mut storage); + value_serializer.write_raw_bytes(slice); + Some(true) + } else { + value_serializer.write_uint32(NOT_ARRAY_BUFFER_VIEW_TAG); + value_serializer + .write_value(scope.get_current_context(), object.into()); + Some(true) + } + } + + fn get_shared_array_buffer_id<'s>( + &self, + _scope: &mut v8::PinScope<'s, '_>, + _shared_array_buffer: v8::Local<'s, v8::SharedArrayBuffer>, + ) -> Option { + None + } + } + + #[derive(Clone)] + struct AdvancedIpcConstants { + inner: Rc, + } + struct AdvancedIpcConstantsInner { + buffer_constructor: v8::Global, + constructor_key: v8::Global, + fast_buffer_prototype: v8::Global, + } + + #[op2(fast)] + pub fn op_node_ipc_buffer_constructor( + scope: &mut v8::PinScope<'_, '_>, + state: &mut OpState, + buffer_constructor: v8::Local<'_, v8::Function>, + fast_buffer_prototype: v8::Local<'_, v8::Object>, + ) { + if state.has::() { + return; + } + let constants = AdvancedIpcConstants { + inner: Rc::new(AdvancedIpcConstantsInner { + buffer_constructor: v8::Global::new(scope, buffer_constructor), + constructor_key: v8::Global::new( + scope, + v8::String::new_from_utf8( + scope, + b"constructor", + v8::NewStringType::Internalized, + ) + .unwrap(), + ), + fast_buffer_prototype: v8::Global::new(scope, fast_buffer_prototype), + }), + }; + state.put(constants); + } + + #[op2(async)] + pub fn op_node_ipc_write_advanced<'a>( + scope: &mut v8::PinScope<'a, '_>, + state: Rc>, + #[smi] rid: ResourceId, + value: v8::Local<'a, v8::Value>, + // using an array as an "out parameter". + // index 0 is a boolean indicating whether the queue is under the limit. + // + // ideally we would just return `Result<(impl Future, bool), ..>`, but that's not + // supported by `op2` currently. + queue_ok: v8::Local<'a, v8::Array>, + ) -> Result> + use<>, IpcError> { + let constants = state.borrow().borrow::().clone(); + let serializer = AdvancedSerializer::new(scope, constants); + let serialized = serializer.serialize(scope, value)?; + + let stream = state + .borrow() + .resource_table + .get::(rid)?; + let old = stream + .queued_bytes + .fetch_add(serialized.len(), std::sync::atomic::Ordering::Relaxed); + if old + serialized.len() > 2 * INITIAL_CAPACITY { + // sending messages too fast + let Ok(v) = false.to_v8(scope); + queue_ok.set_index(scope, 0, v); + } + Ok(async move { + let cancel = stream.cancel.clone(); + let result = stream + .clone() + .write_msg_bytes(&serialized) + .or_cancel(cancel) + .await; + // adjust count even on error + stream + .queued_bytes + .fetch_sub(serialized.len(), std::sync::atomic::Ordering::Relaxed); + result??; + Ok(()) + }) + } + + struct AdvancedSerializer { + inner: v8::ValueSerializer<'static>, + } + + impl AdvancedSerializer { + fn new( + scope: &mut v8::PinScope<'_, '_>, + constants: AdvancedIpcConstants, + ) -> Self { + let inner = v8::ValueSerializer::new( + scope, + Box::new(AdvancedSerializerDelegate::new(constants)), + ); + inner.set_treat_array_buffer_views_as_host_objects(true); + Self { inner } + } + + fn serialize<'s, 'i>( + &self, + scope: &mut v8::PinScope<'s, 'i>, + value: v8::Local<'s, v8::Value>, + ) -> Result, IpcError> { + self.inner.write_raw_bytes(&[0, 0, 0, 0]); + self.inner.write_header(); + let context = scope.get_current_context(); + self.inner.write_value(context, value); + let mut ser = self.inner.release(); + let length = ser.len() - 4; + ser[0] = ((length >> 24) & 0xFF) as u8; + ser[1] = ((length >> 16) & 0xFF) as u8; + ser[2] = ((length >> 8) & 0xFF) as u8; + ser[3] = (length & 0xFF) as u8; + Ok(ser) + } + } + + struct AdvancedIpcDeserializer { + inner: v8::ValueDeserializer<'static>, + } + + struct AdvancedIpcDeserializerDelegate { + constants: AdvancedIpcConstants, + } + + impl v8::ValueDeserializerImpl for AdvancedIpcDeserializerDelegate { + fn read_host_object<'s>( + &self, + scope: &mut v8::PinScope<'s, '_>, + deser: &dyn ValueDeserializerHelper, + ) -> Option> { + let throw_error = |message: &str| { + scope.throw_exception(v8::Exception::type_error( + scope, + v8::String::new_from_utf8( + scope, + message.as_bytes(), + v8::NewStringType::Normal, + ) + .unwrap(), + )); + None + }; + let mut tag = 0; + if !deser.read_uint32(&mut tag) { + return throw_error("Failed to read tag"); + } + match tag { + ARRAY_BUFFER_VIEW_TAG => { + let mut index = 0; + if !deser.read_uint32(&mut index) { + return throw_error("Failed to read array buffer view type tag"); + } + let mut byte_length = 0; + if !deser.read_uint32(&mut byte_length) { + return throw_error("Failed to read byte length"); + } + let Some(buf) = deser.read_raw_bytes(byte_length as usize) else { + return throw_error("failed to read bytes for typed array"); + }; + + let array_buffer = v8::ArrayBuffer::new(scope, byte_length as usize); + // SAFETY: array_buffer is valid as v8 is keeping it alive, and is byte_length bytes + // buf is also byte_length bytes long + unsafe { + std::ptr::copy( + buf.as_ptr(), + array_buffer.data().unwrap().as_ptr().cast::(), + byte_length as usize, + ); + } + + let value = match index { + 0 => { + v8::Int8Array::new(scope, array_buffer, 0, byte_length as usize) + .unwrap() + .into() + } + 1 => { + v8::Uint8Array::new(scope, array_buffer, 0, byte_length as usize) + .unwrap() + .into() + } + 10 => { + let obj: v8::Local = v8::Uint8Array::new( + scope, + array_buffer, + 0, + byte_length as usize, + )? + .into(); + let fast_proto = v8::Local::new( + scope, + &self.constants.inner.fast_buffer_prototype, + ); + obj.set_prototype(scope, fast_proto.into()); + obj + } + 2 => v8::Uint8ClampedArray::new( + scope, + array_buffer, + 0, + byte_length as usize, + )? + .into(), + 3 => v8::Int16Array::new( + scope, + array_buffer, + 0, + byte_length as usize / 2, + )? + .into(), + 4 => v8::Uint16Array::new( + scope, + array_buffer, + 0, + byte_length as usize / 2, + )? + .into(), + 5 => v8::Int32Array::new( + scope, + array_buffer, + 0, + byte_length as usize / 4, + )? + .into(), + 6 => v8::Uint32Array::new( + scope, + array_buffer, + 0, + byte_length as usize / 4, + )? + .into(), + 7 => v8::Float32Array::new( + scope, + array_buffer, + 0, + byte_length as usize / 4, + ) + .unwrap() + .into(), + 8 => v8::Float64Array::new( + scope, + array_buffer, + 0, + byte_length as usize / 8, + )? + .into(), + 9 => { + v8::DataView::new(scope, array_buffer, 0, byte_length as usize) + .into() + } + 11 => v8::BigInt64Array::new( + scope, + array_buffer, + 0, + byte_length as usize / 8, + )? + .into(), + 12 => v8::BigUint64Array::new( + scope, + array_buffer, + 0, + byte_length as usize / 8, + )? + .into(), + // TODO(nathanwhit): this should just be `into()`, but I forgot to impl it in rusty_v8. + // the underlying impl is just a transmute though. + // SAFETY: float16array is an object + 13 => unsafe { + std::mem::transmute::< + v8::Local, + v8::Local, + >(v8::Float16Array::new( + scope, + array_buffer, + 0, + byte_length as usize / 2, + )?) + }, + _ => return None, + }; + Some(value) + } + NOT_ARRAY_BUFFER_VIEW_TAG => { + let value = deser.read_value(scope.get_current_context()); + Some(value.unwrap_or_else(|| v8::null(scope).into()).cast()) + } + _ => { + throw_error(&format!("Invalid tag: {}", tag)); + None + } + } + } + } + + impl AdvancedIpcDeserializer { + fn new( + scope: &mut v8::PinScope<'_, '_>, + constants: AdvancedIpcConstants, + msg_bytes: &[u8], + ) -> Self { + let inner = v8::ValueDeserializer::new( + scope, + Box::new(AdvancedIpcDeserializerDelegate { constants }), + msg_bytes, + ); + Self { inner } + } + } + + struct AdvancedIpcReadResult { + msg_bytes: Option>, + constants: AdvancedIpcConstants, + } + + fn make_stop_sentinel<'s>( + scope: &mut v8::PinScope<'s, '_>, + ) -> v8::Local<'s, v8::Value> { + let obj = v8::Object::new(scope); + obj.set( + scope, + v8::String::new_from_utf8(scope, b"cmd", v8::NewStringType::Internalized) + .unwrap() + .into(), + v8::String::new_from_utf8( + scope, + b"NODE_CLOSE", + v8::NewStringType::Internalized, + ) + .unwrap() + .into(), + ); + obj.into() + } + + impl<'a> deno_core::ToV8<'a> for AdvancedIpcReadResult { + type Error = IpcError; + fn to_v8( + self, + scope: &mut v8::PinScope<'a, '_>, + ) -> Result, Self::Error> { + let Some(msg_bytes) = self.msg_bytes else { + return Ok(make_stop_sentinel(scope)); + }; + let deser = + AdvancedIpcDeserializer::new(scope, self.constants, &msg_bytes); + let context = scope.get_current_context(); + let header_success = deser.inner.read_header(context).unwrap_or(false); + if !header_success { + return Err(IpcError::ReadHeaderFailed); + } + let Some(value) = deser.inner.read_value(context) else { + return Err(IpcError::ReadValueFailed); + }; + Ok(value) + } + } + + #[op2(async)] + #[to_v8] + pub async fn op_node_ipc_read_advanced( + state: Rc>, + #[smi] rid: ResourceId, + ) -> Result { + let stream = state + .borrow() + .resource_table + .get::(rid)?; + let cancel = stream.cancel.clone(); + let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await; + let msg_bytes = stream.read_msg_bytes().or_cancel(cancel).await??; + + Ok(AdvancedIpcReadResult { + msg_bytes, + constants: state.borrow().borrow::().clone(), + }) + } + /// Value signaling that the other end ipc channel has closed. /// /// Node reserves objects of this form (`{ "cmd": "NODE_"`) @@ -238,7 +789,7 @@ mod impl_ { #[op2(async)] #[serde] - pub async fn op_node_ipc_read( + pub async fn op_node_ipc_read_json( state: Rc>, #[smi] rid: ResourceId, ) -> Result { @@ -258,21 +809,45 @@ mod impl_ { } #[op2(fast)] - pub fn op_node_ipc_ref(state: &mut OpState, #[smi] rid: ResourceId) { - let stream = state - .resource_table - .get::(rid) - .expect("Invalid resource ID"); - stream.ref_tracker.ref_(); + pub fn op_node_ipc_ref( + state: &mut OpState, + #[smi] rid: ResourceId, + serialization_json: bool, + ) { + if serialization_json { + let stream = state + .resource_table + .get::(rid) + .expect("Invalid resource ID"); + stream.ref_tracker.ref_(); + } else { + let stream = state + .resource_table + .get::(rid) + .expect("Invalid resource ID"); + stream.ref_tracker.ref_(); + } } #[op2(fast)] - pub fn op_node_ipc_unref(state: &mut OpState, #[smi] rid: ResourceId) { - let stream = state - .resource_table - .get::(rid) - .expect("Invalid resource ID"); - stream.ref_tracker.unref(); + pub fn op_node_ipc_unref( + state: &mut OpState, + #[smi] rid: ResourceId, + serialization_json: bool, + ) { + if serialization_json { + let stream = state + .resource_table + .get::(rid) + .expect("Invalid resource ID"); + stream.ref_tracker.unref(); + } else { + let stream = state + .resource_table + .get::(rid) + .expect("Invalid resource ID"); + stream.ref_tracker.unref(); + } } #[cfg(test)] diff --git a/ext/node/ops/v8.rs b/ext/node/ops/v8.rs index ad284b855c..4b584233d9 100644 --- a/ext/node/ops/v8.rs +++ b/ext/node/ops/v8.rs @@ -330,11 +330,18 @@ pub fn op_v8_new_deserializer( pub fn op_v8_transfer_array_buffer_de( #[cppgc] deser: &Deserializer, #[smi] id: u32, - array_buffer: v8::Local, -) { - // TODO(nathanwhit): also need binding for TransferSharedArrayBuffer, then call that if - // array_buffer is shared + array_buffer: v8::Local, +) -> Result<(), deno_core::error::DataError> { + if let Ok(shared_array_buffer) = + array_buffer.try_cast::() + { + deser + .inner + .transfer_shared_array_buffer(id, shared_array_buffer) + } + let array_buffer = array_buffer.try_cast::()?; deser.inner.transfer_array_buffer(id, array_buffer); + Ok(()) } #[op2(fast)] diff --git a/ext/node/polyfills/child_process.ts b/ext/node/polyfills/child_process.ts index 8c6a8732d3..1a3e7bd3e9 100644 --- a/ext/node/polyfills/child_process.ts +++ b/ext/node/polyfills/child_process.ts @@ -835,9 +835,12 @@ export function execFileSync( } function setupChildProcessIpcChannel() { - const fd = op_node_child_ipc_pipe(); + const maybePipe = op_node_child_ipc_pipe(); + if (!maybePipe) return; + const [fd, serialization] = maybePipe; + const serializationMode = serialization === 0 ? "json" : "advanced"; if (typeof fd != "number" || fd < 0) return; - const control = setupChannel(process, fd); + const control = setupChannel(process, fd, serializationMode); process.on("newListener", (name: string) => { if (name === "message" || name === "disconnect") { control.refCounted(); diff --git a/ext/node/polyfills/internal/buffer.mjs b/ext/node/polyfills/internal/buffer.mjs index 937f73a22a..22b5e02fa6 100644 --- a/ext/node/polyfills/internal/buffer.mjs +++ b/ext/node/polyfills/internal/buffer.mjs @@ -176,7 +176,7 @@ function showFlaggedDeprecation() { bufferWarningAlreadyEmitted = true; } -class FastBuffer extends Uint8Array { +export class FastBuffer extends Uint8Array { constructor(bufferOrLength, byteOffset, length) { super(bufferOrLength, byteOffset, length); } diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts index d5387633b9..11e7d5b939 100644 --- a/ext/node/polyfills/internal/child_process.ts +++ b/ext/node/polyfills/internal/child_process.ts @@ -9,10 +9,13 @@ import { core, internals } from "ext:core/mod.js"; import { op_node_in_npm_package, - op_node_ipc_read, + op_node_ipc_buffer_constructor, + op_node_ipc_read_advanced, + op_node_ipc_read_json, op_node_ipc_ref, op_node_ipc_unref, - op_node_ipc_write, + op_node_ipc_write_advanced, + op_node_ipc_write_json, } from "ext:core/ops"; import { ArrayIsArray, @@ -41,6 +44,7 @@ import { ERR_UNKNOWN_SIGNAL, } from "ext:deno_node/internal/errors.ts"; import { Buffer } from "node:buffer"; +import { FastBuffer } from "ext:deno_node/internal/buffer.mjs"; import { errnoException } from "ext:deno_node/internal/errors.ts"; import { ErrnoException } from "ext:deno_node/_global.d.ts"; import { codeMap } from "ext:deno_node/internal_binding/uv.ts"; @@ -48,6 +52,7 @@ import { isInt32, validateBoolean, validateObject, + validateOneOf, validateString, } from "ext:deno_node/internal/validators.mjs"; import { kEmptyObject } from "ext:deno_node/internal/util.mjs"; @@ -62,6 +67,7 @@ import { kInputOption, kIpc, kNeedsNpmProcessState, + kSerialization, } from "ext:deno_process/40_process.js"; export function mapValues( @@ -248,6 +254,8 @@ export class ChildProcess extends EventEmitter { windowsVerbatimArguments = false, detached, } = options || {}; + + const serialization = options?.serialization || "json"; const normalizedStdio = normalizeStdioOption(stdio); const [ stdin = "pipe", @@ -287,6 +295,7 @@ export class ChildProcess extends EventEmitter { stderr: toDenoStdio(stderr), windowsRawArguments: windowsVerbatimArguments, detached, + [kSerialization]: serialization, [kIpc]: ipc, // internal [kExtraStdio]: extraStdioNormalized, [kNeedsNpmProcessState]: @@ -386,7 +395,7 @@ export class ChildProcess extends EventEmitter { const pipeRid = internals.getIpcPipeRid(this.#process); if (typeof pipeRid == "number") { - setupChannel(this, pipeRid); + setupChannel(this, pipeRid, serialization); this[kClosesNeeded]++; this.on("disconnect", () => { maybeClose(this); @@ -763,6 +772,12 @@ export function normalizeSpawnArguments( ); } + validateOneOf(options.serialization, "options.serialization", [ + undefined, + "json", + "advanced", + ]); + if (options.shell) { const command = ArrayPrototypeJoin([file, ...args], " "); // Set the shell, switches, and commands. @@ -849,6 +864,7 @@ export function normalizeSpawnArguments( file, windowsHide: !!options.windowsHide, windowsVerbatimArguments: !!windowsVerbatimArguments, + serialization: options.serialization || "json", }; } @@ -1332,20 +1348,22 @@ class Control extends EventEmitter { #refExplicitlySet = false; #connected = true; [kPendingMessages] = []; - constructor(channel: number) { + #serialization: "json" | "advanced"; + constructor(channel: number, serialization: "json" | "advanced") { super(); this.#channel = channel; + this.#serialization = serialization; } #ref() { if (this.#connected) { - op_node_ipc_ref(this.#channel); + op_node_ipc_ref(this.#channel, this.#serialization === "json"); } } #unref() { if (this.#connected) { - op_node_ipc_unref(this.#channel); + op_node_ipc_unref(this.#channel, this.#serialization === "json"); } } @@ -1397,18 +1415,37 @@ function internalCmdName(msg: InternalMessage): string { return StringPrototypeSlice(msg.cmd, 5); } -// deno-lint-ignore no-explicit-any -export function setupChannel(target: any, ipc: number) { - const control = new Control(ipc); +let hasSetBufferConstructor = false; + +export function setupChannel( + // deno-lint-ignore no-explicit-any + target: any, + ipc: number, + serialization: "json" | "advanced", +) { + const control = new Control(ipc, serialization); target.channel = control; + if (!hasSetBufferConstructor) { + op_node_ipc_buffer_constructor(Buffer, FastBuffer.prototype); + hasSetBufferConstructor = true; + } + + const writeFn = serialization === "json" + ? op_node_ipc_write_json + : op_node_ipc_write_advanced; + const readFn = serialization === "json" + ? op_node_ipc_read_json + : op_node_ipc_read_advanced; + async function readLoop() { try { while (true) { if (!target.connected || target.killed) { return; } - const prom = op_node_ipc_read(ipc); + // TODO(nathanwhit): maybe allow returning multiple messages in a single read? needs benchmarking. + const prom = readFn(ipc); // there will always be a pending read promise, // but it shouldn't keep the event loop from exiting core.unrefOpPromise(prom); @@ -1500,7 +1537,7 @@ export function setupChannel(target: any, ipc: number) { // this acts as a backpressure mechanism. const queueOk = [true]; control.refCounted(); - op_node_ipc_write(ipc, message, queueOk) + writeFn(ipc, message, queueOk) .then(() => { control.unrefCounted(); if (callback) { diff --git a/ext/node/polyfills/internal/util/comparisons.ts b/ext/node/polyfills/internal/util/comparisons.ts index 8254f84a6c..c81d5d0cd1 100644 --- a/ext/node/polyfills/internal/util/comparisons.ts +++ b/ext/node/polyfills/internal/util/comparisons.ts @@ -32,6 +32,7 @@ import { primordials } from "ext:core/mod.js"; const { ObjectPrototypeHasOwnProperty, ObjectPrototypePropertyIsEnumerable, + TypedArrayPrototypeGetSymbolToStringTag, } = primordials; enum valueType { @@ -145,23 +146,6 @@ function innerDeepEqual( return false; } } else if (isArrayBufferView(val1)) { - const TypedArrayPrototypeGetSymbolToStringTag = ( - val: - | BigInt64Array - | BigUint64Array - | Float32Array - | Float64Array - | Int8Array - | Int16Array - | Int32Array - | Uint8Array - | Uint8ClampedArray - | Uint16Array - | Uint32Array, - ) => - Object.getOwnPropertySymbols(val) - .map((item) => item.toString()) - .toString(); if ( isTypedArray(val1) && isTypedArray(val2) && diff --git a/ext/process/40_process.js b/ext/process/40_process.js index 974fa79a9d..d22be546ae 100644 --- a/ext/process/40_process.js +++ b/ext/process/40_process.js @@ -162,6 +162,7 @@ function run({ export const kExtraStdio = Symbol("extraStdio"); export const kIpc = Symbol("ipc"); export const kNeedsNpmProcessState = Symbol("needsNpmProcessState"); +export const kSerialization = Symbol("serialization"); const illegalConstructorKey = Symbol("illegalConstructorKey"); @@ -178,6 +179,7 @@ function spawnChildInner(command, apiName, { stderr = "piped", windowsRawArguments = false, detached = false, + [kSerialization]: serialization = "json", [kExtraStdio]: extraStdio = [], [kIpc]: ipc = -1, [kNeedsNpmProcessState]: needsNpmProcessState = false, @@ -195,6 +197,7 @@ function spawnChildInner(command, apiName, { stderr, windowsRawArguments, ipc, + serialization, extraStdio, detached, needsNpmProcessState, diff --git a/ext/process/ipc.rs b/ext/process/ipc.rs index 3d6e6a789a..c02df8056d 100644 --- a/ext/process/ipc.rs +++ b/ext/process/ipc.rs @@ -1,8 +1,5 @@ // Copyright 2018-2025 the Deno authors. MIT license. -#![allow(unused)] - -use std::cell::RefCell; use std::future::Future; use std::io; use std::mem; @@ -18,7 +15,6 @@ use deno_core::AsyncRefCell; use deno_core::CancelHandle; use deno_core::ExternalOpsTracker; use deno_core::RcRef; -use deno_core::serde; use deno_core::serde_json; use deno_io::BiPipe; use deno_io::BiPipeRead; @@ -219,6 +215,231 @@ pub enum IpcJsonStreamError { SimdJson(#[source] simd_json::Error), } +#[derive(Debug, thiserror::Error, deno_error::JsError)] +pub enum IpcAdvancedStreamError { + #[class(inherit)] + #[error("{0}")] + Io(#[source] std::io::Error), +} + +pub struct IpcAdvancedStream { + pipe: BiPipeRead, + read_buffer: ReadBuffer, +} + +struct MessageLengthBuffer { + buffer: [u8; 4], + pos: u8, + value: u32, +} + +impl MessageLengthBuffer { + fn new() -> Self { + Self { + buffer: [0; 4], + pos: 0, + value: 0, + } + } + + fn message_len(&mut self) -> Option { + if self.pos == 4 { + self.value = u32::from_be_bytes(self.buffer); + self.pos = 5; + Some(self.value as usize) + } else if self.pos == 5 { + Some(self.value as usize) + } else { + None + } + } + + fn update_pos_by(&mut self, num: usize) { + self.pos = (self.pos + num as u8).min(4); + } + + fn available_mut(&mut self) -> &mut [u8] { + &mut self.buffer[self.pos as usize..4] + } +} + +pin_project! { + #[must_use = "futures do nothing unless you `.await` or poll them"] + struct ReadMsgBytesInner<'a, R: ?Sized> { + length_buffer: &'a mut MessageLengthBuffer, + reader: &'a mut R, + out_buf: &'a mut Vec, + // The number of bytes appended to buf. This can be less than buf.len() if + // the buffer was not empty when the operation was started. + read: usize, + read_buffer: &'a mut ReadBuffer, + } +} + +fn read_msg_bytes_inner<'a, R: AsyncRead + ?Sized + Unpin>( + reader: &'a mut R, + length_buffer: &'a mut MessageLengthBuffer, + out_buf: &'a mut Vec, + read: usize, + read_buffer: &'a mut ReadBuffer, +) -> ReadMsgBytesInner<'a, R> { + ReadMsgBytesInner { + length_buffer, + reader, + out_buf, + read, + read_buffer, + } +} + +impl IpcAdvancedStream { + fn new(pipe: BiPipeRead) -> Self { + Self { + pipe, + read_buffer: ReadBuffer::new(), + } + } + + pub async fn read_msg_bytes( + &mut self, + ) -> Result>, IpcAdvancedStreamError> { + let mut length_buffer = MessageLengthBuffer::new(); + let mut out_buf = Vec::with_capacity(32); + let nread = read_msg_bytes_inner( + &mut self.pipe, + &mut length_buffer, + &mut out_buf, + 0, + &mut self.read_buffer, + ) + .await + .map_err(IpcAdvancedStreamError::Io)?; + if nread == 0 { + return Ok(None); + } + Ok(Some(std::mem::take(&mut out_buf))) + } +} + +impl Future for ReadMsgBytesInner<'_, R> { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + read_advanced_msg_bytes_internal( + Pin::new(me.reader), + cx, + me.length_buffer, + me.read_buffer, + me.out_buf, + me.read, + ) + } +} + +pub struct IpcAdvancedStreamResource { + pub read_half: AsyncRefCell, + pub write_half: AsyncRefCell, + pub cancel: Rc, + pub queued_bytes: AtomicUsize, + pub ref_tracker: IpcRefTracker, +} + +impl IpcAdvancedStreamResource { + pub fn new( + stream: i64, + ref_tracker: IpcRefTracker, + ) -> Result { + let (read_half, write_half) = BiPipe::from_raw(stream as _)?.split(); + Ok(Self { + read_half: AsyncRefCell::new(IpcAdvancedStream::new(read_half)), + write_half: AsyncRefCell::new(write_half), + cancel: Default::default(), + queued_bytes: Default::default(), + ref_tracker, + }) + } + + /// writes serialized message to the IPC pipe. the first 4 bytes must be the length of the following message. + pub async fn write_msg_bytes( + self: Rc, + msg: &[u8], + ) -> Result<(), io::Error> { + let mut write_half = RcRef::map(self, |r| &r.write_half).borrow_mut().await; + write_half.write_all(msg).await?; + Ok(()) + } +} + +impl deno_core::Resource for IpcAdvancedStreamResource { + fn close(self: Rc) { + self.cancel.cancel(); + } +} + +fn read_advanced_msg_bytes_internal( + mut reader: Pin<&mut R>, + cx: &mut Context<'_>, + length_buffer: &mut MessageLengthBuffer, + read_buffer: &mut ReadBuffer, + out_buffer: &mut Vec, + read: &mut usize, +) -> Poll> { + loop { + if read_buffer.needs_fill() { + let mut read_buf = ReadBuf::new(read_buffer.get_mut()); + ready!(reader.as_mut().poll_read(cx, &mut read_buf))?; + read_buffer.cap = read_buf.filled().len(); + read_buffer.pos = 0; + } + let available = read_buffer.available_mut(); + let msg_len = length_buffer.message_len(); + let (done, used) = if let Some(msg_len) = msg_len { + if out_buffer.len() >= msg_len { + (true, 0) + } else if available.is_empty() { + if *read == 0 { + return Poll::Ready(Ok(0)); + } else { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "ipc stream closed while reading message", + ))); + } + } else { + let remaining = msg_len - out_buffer.len(); + out_buffer.reserve(remaining); + let to_copy = available.len().min(remaining); + out_buffer.extend_from_slice(&available[..to_copy]); + (out_buffer.len() == msg_len, to_copy) + } + } else { + if available.is_empty() { + if *read == 0 { + return Poll::Ready(Ok(0)); + } else { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "ipc stream closed before message length", + ))); + } + } + let len_avail = length_buffer.available_mut(); + let to_copy = available.len().min(len_avail.len()); + len_avail[..to_copy].copy_from_slice(&available[..to_copy]); + length_buffer.update_pos_by(to_copy); + (false, to_copy) + }; + + read_buffer.consume(used); + *read += used; + + if done || *read == 0 { + return Poll::Ready(Ok(mem::replace(read, 0))); + } + } +} + // JSON serialization stream over IPC pipe. // // `\n` is used as a delimiter between messages. @@ -304,7 +525,7 @@ where } } -fn read_msg_internal( +fn read_json_msg_internal( mut reader: Pin<&mut R>, cx: &mut Context<'_>, buf: &mut Vec, @@ -354,7 +575,7 @@ impl Future for ReadMsgInner<'_, R> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); - read_msg_internal( + read_json_msg_internal( Pin::new(*me.reader), cx, me.buf, @@ -369,11 +590,8 @@ impl Future for ReadMsgInner<'_, R> { mod tests { use std::rc::Rc; - use deno_core::JsRuntime; use deno_core::RcRef; - use deno_core::RuntimeOptions; use deno_core::serde_json::json; - use deno_core::v8; use super::IpcJsonStreamResource; diff --git a/ext/process/lib.rs b/ext/process/lib.rs index e67454a7b5..1f4600b6a2 100644 --- a/ext/process/lib.rs +++ b/ext/process/lib.rs @@ -50,6 +50,7 @@ use serde::Serialize; use tokio::process::Child as AsyncChild; pub mod ipc; +use ipc::IpcAdvancedStreamResource; use ipc::IpcJsonStreamResource; use ipc::IpcRefTracker; @@ -201,6 +202,8 @@ pub struct SpawnArgs { windows_raw_arguments: bool, ipc: Option, + serialization: Option, + #[serde(flatten)] stdio: ChildStdio, @@ -211,6 +214,26 @@ pub struct SpawnArgs { needs_npm_process_state: bool, } +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum ChildIpcSerialization { + Json, + Advanced, +} + +impl std::fmt::Display for ChildIpcSerialization { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + ChildIpcSerialization::Json => "json", + ChildIpcSerialization::Advanced => "advanced", + } + ) + } +} + #[cfg(unix)] deno_error::js_error_wrapper!(nix::Error, JsNixError, |err| { match err { @@ -477,12 +500,30 @@ fn create_command( fds_to_dup.push((ipc_fd2, ipc)); fds_to_close.push(ipc_fd2); /* One end returned to parent process (this) */ - let pipe_rid = state.resource_table.add(IpcJsonStreamResource::new( - ipc_fd1 as _, - IpcRefTracker::new(state.external_ops_tracker.clone()), - )?); + let pipe_rid = match args.serialization { + Some(ChildIpcSerialization::Json) | None => { + state.resource_table.add(IpcJsonStreamResource::new( + ipc_fd1 as _, + IpcRefTracker::new(state.external_ops_tracker.clone()), + )?) + } + Some(ChildIpcSerialization::Advanced) => { + state.resource_table.add(IpcAdvancedStreamResource::new( + ipc_fd1 as _, + IpcRefTracker::new(state.external_ops_tracker.clone()), + )?) + } + }; + /* The other end passed to child process via NODE_CHANNEL_FD */ command.env("NODE_CHANNEL_FD", format!("{}", ipc)); + command.env( + "NODE_CHANNEL_SERIALIZATION_MODE", + args + .serialization + .unwrap_or(ChildIpcSerialization::Json) + .to_string(), + ); ipc_rid = Some(pipe_rid); } @@ -548,18 +589,34 @@ fn create_command( let (hd1, hd2) = deno_io::bi_pipe_pair_raw()?; /* One end returned to parent process (this) */ - let pipe_rid = - Some(state.resource_table.add(IpcJsonStreamResource::new( - hd1 as i64, - IpcRefTracker::new(state.external_ops_tracker.clone()), - )?)); + let pipe_rid = match args.serialization { + Some(ChildIpcSerialization::Json) | None => { + state.resource_table.add(IpcJsonStreamResource::new( + hd1 as _, + IpcRefTracker::new(state.external_ops_tracker.clone()), + )?) + } + Some(ChildIpcSerialization::Advanced) => { + state.resource_table.add(IpcAdvancedStreamResource::new( + hd1 as _, + IpcRefTracker::new(state.external_ops_tracker.clone()), + )?) + } + }; /* The other end passed to child process via NODE_CHANNEL_FD */ command.env("NODE_CHANNEL_FD", format!("{}", hd2 as i64)); + command.env( + "NODE_CHANNEL_SERIALIZATION_MODE", + args + .serialization + .unwrap_or(ChildIpcSerialization::Json) + .to_string(), + ); handles_to_close.push(hd2); - ipc_rid = pipe_rid; + ipc_rid = Some(pipe_rid); } for (i, stdio) in args.extra_stdio.into_iter().enumerate() { diff --git a/runtime/worker.rs b/runtime/worker.rs index ad6a1882c9..3633ed5321 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -739,8 +739,8 @@ impl MainWorker { let op_state = self.js_runtime.op_state(); let mut state = op_state.borrow_mut(); state.put(options.clone()); - if let Some(node_ipc_fd) = options.node_ipc_fd { - state.put(deno_node::ChildPipeFd(node_ipc_fd)); + if let Some((fd, serialization)) = options.node_ipc_init { + state.put(deno_node::ChildPipeFd(fd, serialization)); } } diff --git a/runtime/worker_bootstrap.rs b/runtime/worker_bootstrap.rs index f2652848fd..ff886a9d00 100644 --- a/runtime/worker_bootstrap.rs +++ b/runtime/worker_bootstrap.rs @@ -5,6 +5,7 @@ use std::thread; use deno_core::ModuleSpecifier; use deno_core::v8; +use deno_node::ops::ipc::ChildIpcSerialization; use deno_telemetry::OtelConfig; use deno_terminal::colors; use serde::Serialize; @@ -109,7 +110,7 @@ pub struct BootstrapOptions { pub has_node_modules_dir: bool, pub argv0: Option, pub node_debug: Option, - pub node_ipc_fd: Option, + pub node_ipc_init: Option<(i64, ChildIpcSerialization)>, pub mode: WorkerExecutionMode, pub no_legacy_abort: bool, // Used by `deno serve` @@ -149,7 +150,7 @@ impl Default for BootstrapOptions { has_node_modules_dir: false, argv0: None, node_debug: None, - node_ipc_fd: None, + node_ipc_init: None, mode: WorkerExecutionMode::None, no_legacy_abort: false, serve_port: Default::default(), diff --git a/tests/node_compat/config.toml b/tests/node_compat/config.toml index fe648af4cc..4d028b0277 100644 --- a/tests/node_compat/config.toml +++ b/tests/node_compat/config.toml @@ -135,6 +135,10 @@ "parallel/test-buffer-zero-fill-cli.js" = {} "parallel/test-buffer-zero-fill-reset.js" = {} "parallel/test-buffer-zero-fill.js" = {} +"parallel/test-child-process-advanced-serialization.js" = {} +# TODO(nathanwhit): figure out why this is failing on windows +"parallel/test-child-process-advanced-serialization-largebuffer.js" = { windows = false } +"parallel/test-child-process-advanced-serialization-splitted-length-field.js" = {} "parallel/test-child-process-can-write-to-stdout.js" = { flaky = true } "parallel/test-child-process-default-options.js" = {} "parallel/test-child-process-detached.js" = {} diff --git a/tests/unit_node/child_process_test.ts b/tests/unit_node/child_process_test.ts index f7fc3ae32a..aac3c9e986 100644 --- a/tests/unit_node/child_process_test.ts +++ b/tests/unit_node/child_process_test.ts @@ -1006,6 +1006,95 @@ Deno.test( }, ); +Deno.test(async function ipcSerializationAdvanced() { + const timeout = withTimeout(); + const script = ` + if (typeof process.send !== "function") { + console.error("process.send is not a function"); + process.exit(1); + } + + class BigIntWrapper { + constructor(value) { + this.value = value; + } + toJSON() { + return this.value.toString(); + } + } + + const makeSab = (arr) => { + const sab = new SharedArrayBuffer(arr.length); + const buf = new Uint8Array(sab); + for (let i = 0; i < arr.length; i++) { + buf[i] = arr[i]; + } + return buf; + }; + + + const inputs = [ + "foo", + { + foo: "bar", + }, + 42, + true, + null, + new Uint8Array([1, 2, 3]), + { + foo: new Uint8Array([1, 2, 3]), + bar: makeSab([4, 5, 6]), + }, + [1, { foo: 2 }, [3, 4]], + 42n, + ]; + for (const input of inputs) { + process.send(input); + } + `; + const makeSab = (arr: number[]) => { + const sab = new SharedArrayBuffer(arr.length); + const buf = new Uint8Array(sab); + for (let i = 0; i < arr.length; i++) { + buf[i] = arr[i]; + } + return buf; + }; + + const file = await Deno.makeTempFile(); + await Deno.writeTextFile(file, script); + const child = CP.fork(file, [], { + stdio: ["inherit", "inherit", "inherit", "ipc"], + serialization: "advanced", + }); + const expect = [ + "foo", + { + foo: "bar", + }, + 42, + true, + null, + new Uint8Array([1, 2, 3]), + { + foo: new Uint8Array([1, 2, 3]), + bar: makeSab([4, 5, 6]), + }, + [1, { foo: 2 }, [3, 4]], + 42n, + ]; + let i = 0; + + child.on("message", (message) => { + assertEquals(message, expect[i]); + i++; + }); + child.on("close", () => timeout.resolve()); + await timeout.promise; + assertEquals(i, expect.length); +}); + Deno.test(async function childProcessExitsGracefully() { const testdataDir = path.join( path.dirname(path.fromFileUrl(import.meta.url)),