mirror of
https://github.com/denoland/deno.git
synced 2025-09-26 12:19:12 +00:00
fix(ext/node): make worker setup synchronous (#22815)
This commit fixes race condition in "node:worker_threads" module were the first message did a setup of "threadId", "workerData" and "environmentData". Now this data is passed explicitly during workers creation and is set up before any user code is executed. Closes https://github.com/denoland/deno/issues/22783 Closes https://github.com/denoland/deno/issues/22672 --------- Co-authored-by: Satya Rohith <me@satyarohith.com>
This commit is contained in:
parent
28b362adfc
commit
d69aab62b0
6 changed files with 60 additions and 46 deletions
|
@ -841,6 +841,7 @@ fn create_web_worker_callback(
|
||||||
stdio: stdio.clone(),
|
stdio: stdio.clone(),
|
||||||
cache_storage_dir,
|
cache_storage_dir,
|
||||||
feature_checker,
|
feature_checker,
|
||||||
|
maybe_worker_metadata: args.maybe_worker_metadata,
|
||||||
};
|
};
|
||||||
|
|
||||||
WebWorker::bootstrap_from_options(
|
WebWorker::bootstrap_from_options(
|
||||||
|
|
|
@ -14,6 +14,7 @@ function initialize(
|
||||||
usesLocalNodeModulesDir,
|
usesLocalNodeModulesDir,
|
||||||
argv0,
|
argv0,
|
||||||
runningOnMainThread,
|
runningOnMainThread,
|
||||||
|
maybeWorkerMetadata,
|
||||||
) {
|
) {
|
||||||
if (initialized) {
|
if (initialized) {
|
||||||
throw Error("Node runtime already initialized");
|
throw Error("Node runtime already initialized");
|
||||||
|
@ -38,7 +39,7 @@ function initialize(
|
||||||
// FIXME(bartlomieju): not nice to depend on `Deno` namespace here
|
// FIXME(bartlomieju): not nice to depend on `Deno` namespace here
|
||||||
// but it's the only way to get `args` and `version` and this point.
|
// but it's the only way to get `args` and `version` and this point.
|
||||||
internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version);
|
internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version);
|
||||||
internals.__initWorkerThreads(runningOnMainThread);
|
internals.__initWorkerThreads(runningOnMainThread, maybeWorkerMetadata);
|
||||||
internals.__setupChildProcessIpcChannel();
|
internals.__setupChildProcessIpcChannel();
|
||||||
// `Deno[Deno.internal].requireImpl` will be unreachable after this line.
|
// `Deno[Deno.internal].requireImpl` will be unreachable after this line.
|
||||||
delete internals.requireImpl;
|
delete internals.requireImpl;
|
||||||
|
|
|
@ -22,7 +22,7 @@ import {
|
||||||
import * as webidl from "ext:deno_webidl/00_webidl.js";
|
import * as webidl from "ext:deno_webidl/00_webidl.js";
|
||||||
import { log } from "ext:runtime/06_util.js";
|
import { log } from "ext:runtime/06_util.js";
|
||||||
import { notImplemented } from "ext:deno_node/_utils.ts";
|
import { notImplemented } from "ext:deno_node/_utils.ts";
|
||||||
import { EventEmitter, once } from "node:events";
|
import { EventEmitter } from "node:events";
|
||||||
import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js";
|
import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js";
|
||||||
import { isAbsolute, resolve } from "node:path";
|
import { isAbsolute, resolve } from "node:path";
|
||||||
|
|
||||||
|
@ -42,7 +42,6 @@ const {
|
||||||
SafeRegExp,
|
SafeRegExp,
|
||||||
SafeMap,
|
SafeMap,
|
||||||
TypeError,
|
TypeError,
|
||||||
PromisePrototypeThen,
|
|
||||||
} = primordials;
|
} = primordials;
|
||||||
|
|
||||||
export interface WorkerOptions {
|
export interface WorkerOptions {
|
||||||
|
@ -196,6 +195,13 @@ class NodeWorker extends EventEmitter {
|
||||||
name = "[worker eval]";
|
name = "[worker eval]";
|
||||||
}
|
}
|
||||||
this.#name = name;
|
this.#name = name;
|
||||||
|
this.threadId = ++threads;
|
||||||
|
|
||||||
|
const serializedWorkerMetadata = serializeJsMessageData({
|
||||||
|
workerData: options?.workerData,
|
||||||
|
environmentData: environmentData,
|
||||||
|
threadId: this.threadId,
|
||||||
|
}, options?.transferList ?? []);
|
||||||
const id = op_create_worker(
|
const id = op_create_worker(
|
||||||
{
|
{
|
||||||
// deno-lint-ignore prefer-primordials
|
// deno-lint-ignore prefer-primordials
|
||||||
|
@ -206,16 +212,11 @@ class NodeWorker extends EventEmitter {
|
||||||
name: this.#name,
|
name: this.#name,
|
||||||
workerType: "module",
|
workerType: "module",
|
||||||
},
|
},
|
||||||
|
serializedWorkerMetadata,
|
||||||
);
|
);
|
||||||
this.#id = id;
|
this.#id = id;
|
||||||
this.#pollControl();
|
this.#pollControl();
|
||||||
this.#pollMessages();
|
this.#pollMessages();
|
||||||
|
|
||||||
this.postMessage({
|
|
||||||
environmentData,
|
|
||||||
threadId: (this.threadId = ++threads),
|
|
||||||
workerData: options?.workerData,
|
|
||||||
}, options?.transferList || []);
|
|
||||||
// https://nodejs.org/api/worker_threads.html#event-online
|
// https://nodejs.org/api/worker_threads.html#event-online
|
||||||
this.emit("online");
|
this.emit("online");
|
||||||
}
|
}
|
||||||
|
@ -387,7 +388,10 @@ type ParentPort = typeof self & NodeEventTarget;
|
||||||
// deno-lint-ignore no-explicit-any
|
// deno-lint-ignore no-explicit-any
|
||||||
let parentPort: ParentPort = null as any;
|
let parentPort: ParentPort = null as any;
|
||||||
|
|
||||||
internals.__initWorkerThreads = (runningOnMainThread: boolean) => {
|
internals.__initWorkerThreads = (
|
||||||
|
runningOnMainThread: boolean,
|
||||||
|
maybeWorkerMetadata,
|
||||||
|
) => {
|
||||||
isMainThread = runningOnMainThread;
|
isMainThread = runningOnMainThread;
|
||||||
|
|
||||||
defaultExport.isMainThread = isMainThread;
|
defaultExport.isMainThread = isMainThread;
|
||||||
|
@ -409,29 +413,15 @@ internals.__initWorkerThreads = (runningOnMainThread: boolean) => {
|
||||||
>();
|
>();
|
||||||
|
|
||||||
parentPort = self as ParentPort;
|
parentPort = self as ParentPort;
|
||||||
|
if (typeof maybeWorkerMetadata !== "undefined") {
|
||||||
|
const { 0: metadata, 1: _ } = maybeWorkerMetadata;
|
||||||
|
workerData = metadata.workerData;
|
||||||
|
environmentData = metadata.environmentData;
|
||||||
|
threadId = metadata.threadId;
|
||||||
|
}
|
||||||
|
defaultExport.workerData = workerData;
|
||||||
defaultExport.parentPort = parentPort;
|
defaultExport.parentPort = parentPort;
|
||||||
|
defaultExport.threadId = threadId;
|
||||||
const initPromise = PromisePrototypeThen(
|
|
||||||
once(
|
|
||||||
parentPort,
|
|
||||||
"message",
|
|
||||||
),
|
|
||||||
(result) => {
|
|
||||||
// TODO(bartlomieju): just so we don't error out here. It's still racy,
|
|
||||||
// but should be addressed by https://github.com/denoland/deno/issues/22783
|
|
||||||
// shortly.
|
|
||||||
const data = result[0].data ?? {};
|
|
||||||
// TODO(kt3k): The below values are set asynchronously
|
|
||||||
// using the first message from the parent.
|
|
||||||
// This should be done synchronously.
|
|
||||||
threadId = data.threadId;
|
|
||||||
workerData = data.workerData;
|
|
||||||
environmentData = data.environmentData;
|
|
||||||
|
|
||||||
defaultExport.threadId = threadId;
|
|
||||||
defaultExport.workerData = workerData;
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
parentPort.off = parentPort.removeListener = function (
|
parentPort.off = parentPort.removeListener = function (
|
||||||
this: ParentPort,
|
this: ParentPort,
|
||||||
|
@ -447,22 +437,18 @@ internals.__initWorkerThreads = (runningOnMainThread: boolean) => {
|
||||||
name,
|
name,
|
||||||
listener,
|
listener,
|
||||||
) {
|
) {
|
||||||
PromisePrototypeThen(initPromise, () => {
|
// deno-lint-ignore no-explicit-any
|
||||||
// deno-lint-ignore no-explicit-any
|
const _listener = (ev: any) => listener(ev.data);
|
||||||
const _listener = (ev: any) => listener(ev.data);
|
listeners.set(listener, _listener);
|
||||||
listeners.set(listener, _listener);
|
this.addEventListener(name, _listener);
|
||||||
this.addEventListener(name, _listener);
|
|
||||||
});
|
|
||||||
return this;
|
return this;
|
||||||
};
|
};
|
||||||
|
|
||||||
parentPort.once = function (this: ParentPort, name, listener) {
|
parentPort.once = function (this: ParentPort, name, listener) {
|
||||||
PromisePrototypeThen(initPromise, () => {
|
// deno-lint-ignore no-explicit-any
|
||||||
// deno-lint-ignore no-explicit-any
|
const _listener = (ev: any) => listener(ev.data);
|
||||||
const _listener = (ev: any) => listener(ev.data);
|
listeners.set(listener, _listener);
|
||||||
listeners.set(listener, _listener);
|
this.addEventListener(name, _listener);
|
||||||
this.addEventListener(name, _listener);
|
|
||||||
});
|
|
||||||
return this;
|
return this;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -786,6 +786,7 @@ function bootstrapWorkerRuntime(
|
||||||
runtimeOptions,
|
runtimeOptions,
|
||||||
name,
|
name,
|
||||||
internalName,
|
internalName,
|
||||||
|
maybeWorkerMetadata,
|
||||||
) {
|
) {
|
||||||
if (hasBootstrapped) {
|
if (hasBootstrapped) {
|
||||||
throw new Error("Worker runtime already bootstrapped");
|
throw new Error("Worker runtime already bootstrapped");
|
||||||
|
@ -908,8 +909,17 @@ function bootstrapWorkerRuntime(
|
||||||
// existing global `Deno` with `Deno` namespace from "./deno.ts".
|
// existing global `Deno` with `Deno` namespace from "./deno.ts".
|
||||||
ObjectDefineProperty(globalThis, "Deno", core.propReadOnly(finalDenoNs));
|
ObjectDefineProperty(globalThis, "Deno", core.propReadOnly(finalDenoNs));
|
||||||
|
|
||||||
|
const workerMetadata = maybeWorkerMetadata
|
||||||
|
? messagePort.deserializeJsMessageData(maybeWorkerMetadata)
|
||||||
|
: undefined;
|
||||||
|
|
||||||
if (nodeBootstrap) {
|
if (nodeBootstrap) {
|
||||||
nodeBootstrap(hasNodeModulesDir, argv0, /* runningOnMainThread */ false);
|
nodeBootstrap(
|
||||||
|
hasNodeModulesDir,
|
||||||
|
argv0,
|
||||||
|
/* runningOnMainThread */ false,
|
||||||
|
workerMetadata,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,7 @@ pub struct CreateWebWorkerArgs {
|
||||||
pub permissions: PermissionsContainer,
|
pub permissions: PermissionsContainer,
|
||||||
pub main_module: ModuleSpecifier,
|
pub main_module: ModuleSpecifier,
|
||||||
pub worker_type: WebWorkerType,
|
pub worker_type: WebWorkerType,
|
||||||
|
pub maybe_worker_metadata: Option<JsMessageData>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle)
|
pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle)
|
||||||
|
@ -121,6 +122,7 @@ pub struct CreateWorkerArgs {
|
||||||
fn op_create_worker(
|
fn op_create_worker(
|
||||||
state: &mut OpState,
|
state: &mut OpState,
|
||||||
#[serde] args: CreateWorkerArgs,
|
#[serde] args: CreateWorkerArgs,
|
||||||
|
#[serde] maybe_worker_metadata: Option<JsMessageData>,
|
||||||
) -> Result<WorkerId, AnyError> {
|
) -> Result<WorkerId, AnyError> {
|
||||||
let specifier = args.specifier.clone();
|
let specifier = args.specifier.clone();
|
||||||
let maybe_source_code = if args.has_source_code {
|
let maybe_source_code = if args.has_source_code {
|
||||||
|
@ -189,6 +191,7 @@ fn op_create_worker(
|
||||||
permissions: worker_permissions,
|
permissions: worker_permissions,
|
||||||
main_module: module_specifier.clone(),
|
main_module: module_specifier.clone(),
|
||||||
worker_type,
|
worker_type,
|
||||||
|
maybe_worker_metadata,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Send thread safe handle from newly created worker to host thread
|
// Send thread safe handle from newly created worker to host thread
|
||||||
|
|
|
@ -48,6 +48,7 @@ use deno_terminal::colors;
|
||||||
use deno_tls::RootCertStoreProvider;
|
use deno_tls::RootCertStoreProvider;
|
||||||
use deno_web::create_entangled_message_port;
|
use deno_web::create_entangled_message_port;
|
||||||
use deno_web::BlobStore;
|
use deno_web::BlobStore;
|
||||||
|
use deno_web::JsMessageData;
|
||||||
use deno_web::MessagePort;
|
use deno_web::MessagePort;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
|
@ -331,6 +332,8 @@ pub struct WebWorker {
|
||||||
pub main_module: ModuleSpecifier,
|
pub main_module: ModuleSpecifier,
|
||||||
poll_for_messages_fn: Option<v8::Global<v8::Value>>,
|
poll_for_messages_fn: Option<v8::Global<v8::Value>>,
|
||||||
bootstrap_fn_global: Option<v8::Global<v8::Function>>,
|
bootstrap_fn_global: Option<v8::Global<v8::Function>>,
|
||||||
|
// Consumed when `bootstrap_fn` is called
|
||||||
|
maybe_worker_metadata: Option<JsMessageData>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct WebWorkerOptions {
|
pub struct WebWorkerOptions {
|
||||||
|
@ -356,6 +359,7 @@ pub struct WebWorkerOptions {
|
||||||
pub cache_storage_dir: Option<std::path::PathBuf>,
|
pub cache_storage_dir: Option<std::path::PathBuf>,
|
||||||
pub stdio: Stdio,
|
pub stdio: Stdio,
|
||||||
pub feature_checker: Arc<FeatureChecker>,
|
pub feature_checker: Arc<FeatureChecker>,
|
||||||
|
pub maybe_worker_metadata: Option<JsMessageData>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WebWorker {
|
impl WebWorker {
|
||||||
|
@ -601,6 +605,7 @@ impl WebWorker {
|
||||||
main_module,
|
main_module,
|
||||||
poll_for_messages_fn: None,
|
poll_for_messages_fn: None,
|
||||||
bootstrap_fn_global: Some(bootstrap_fn_global),
|
bootstrap_fn_global: Some(bootstrap_fn_global),
|
||||||
|
maybe_worker_metadata: options.maybe_worker_metadata,
|
||||||
},
|
},
|
||||||
external_handle,
|
external_handle,
|
||||||
)
|
)
|
||||||
|
@ -616,6 +621,10 @@ impl WebWorker {
|
||||||
let bootstrap_fn = self.bootstrap_fn_global.take().unwrap();
|
let bootstrap_fn = self.bootstrap_fn_global.take().unwrap();
|
||||||
let bootstrap_fn = v8::Local::new(scope, bootstrap_fn);
|
let bootstrap_fn = v8::Local::new(scope, bootstrap_fn);
|
||||||
let undefined = v8::undefined(scope);
|
let undefined = v8::undefined(scope);
|
||||||
|
let mut worker_data: v8::Local<v8::Value> = v8::undefined(scope).into();
|
||||||
|
if let Some(data) = self.maybe_worker_metadata.take() {
|
||||||
|
worker_data = deno_core::serde_v8::to_v8(scope, data).unwrap();
|
||||||
|
}
|
||||||
let name_str: v8::Local<v8::Value> =
|
let name_str: v8::Local<v8::Value> =
|
||||||
v8::String::new(scope, &self.name).unwrap().into();
|
v8::String::new(scope, &self.name).unwrap().into();
|
||||||
let id_str: v8::Local<v8::Value> =
|
let id_str: v8::Local<v8::Value> =
|
||||||
|
@ -623,7 +632,11 @@ impl WebWorker {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.into();
|
.into();
|
||||||
bootstrap_fn
|
bootstrap_fn
|
||||||
.call(scope, undefined.into(), &[args, name_str, id_str])
|
.call(
|
||||||
|
scope,
|
||||||
|
undefined.into(),
|
||||||
|
&[args, name_str, id_str, worker_data],
|
||||||
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
// TODO(bartlomieju): this could be done using V8 API, without calling `execute_script`.
|
// TODO(bartlomieju): this could be done using V8 API, without calling `execute_script`.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue