fix(ext/node): worker_threads handles basic require calls (#30279)
Some checks are pending
ci / test release windows-x86_64 (push) Blocked by required conditions
ci / build libs (push) Blocked by required conditions
ci / test debug macos-aarch64 (push) Blocked by required conditions
ci / test release macos-aarch64 (push) Blocked by required conditions
ci / bench release linux-x86_64 (push) Blocked by required conditions
ci / lint debug linux-x86_64 (push) Blocked by required conditions
ci / lint debug macos-x86_64 (push) Blocked by required conditions
ci / lint debug windows-x86_64 (push) Blocked by required conditions
ci / test debug linux-x86_64 (push) Blocked by required conditions
ci / test release linux-x86_64 (push) Blocked by required conditions
ci / test debug macos-x86_64 (push) Blocked by required conditions
ci / test release macos-x86_64 (push) Blocked by required conditions
ci / test debug windows-x86_64 (push) Blocked by required conditions
ci / pre-build (push) Waiting to run
ci / test debug linux-aarch64 (push) Blocked by required conditions
ci / test release linux-aarch64 (push) Blocked by required conditions
ci / publish canary (push) Blocked by required conditions

This is not a full-fledged and fully correct `require`/CJS support
for `node:worker_threads`, but unlocks certain scenarios that
were not working at all previously.
This commit is contained in:
Bartek Iwańczuk 2025-08-02 13:27:06 +02:00 committed by GitHub
parent 43b376cd47
commit dcdd1d6139
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 54 additions and 7 deletions

View file

@ -19,6 +19,7 @@ function initialize(args) {
maybeWorkerMetadata, maybeWorkerMetadata,
nodeDebug, nodeDebug,
warmup = false, warmup = false,
moduleSpecifier = null,
} = args; } = args;
if (!warmup) { if (!warmup) {
if (initialized) { if (initialized) {
@ -41,6 +42,7 @@ function initialize(args) {
runningOnMainThread, runningOnMainThread,
workerId, workerId,
maybeWorkerMetadata, maybeWorkerMetadata,
moduleSpecifier,
); );
internals.__setupChildProcessIpcChannel(); internals.__setupChildProcessIpcChannel();
// `Deno[Deno.internal].requireImpl` will be unreachable after this line. // `Deno[Deno.internal].requireImpl` will be unreachable after this line.

View file

@ -29,22 +29,26 @@ import { EventEmitter } from "node:events";
import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js"; import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js";
import { untransferableSymbol } from "ext:deno_node/internal_binding/util.ts"; import { untransferableSymbol } from "ext:deno_node/internal_binding/util.ts";
import process from "node:process"; import process from "node:process";
import { createRequire } from "node:module";
const { JSONParse, JSONStringify, ObjectPrototypeIsPrototypeOf } = primordials;
const { const {
encodeURIComponent,
Error, Error,
ObjectHasOwn,
PromiseResolve,
FunctionPrototypeCall, FunctionPrototypeCall,
JSONParse,
JSONStringify,
ObjectHasOwn,
ObjectPrototypeIsPrototypeOf,
PromiseResolve,
SafeMap,
SafeSet, SafeSet,
SafeWeakMap,
StringPrototypeStartsWith,
StringPrototypeTrim,
Symbol, Symbol,
SymbolFor, SymbolFor,
SymbolIterator, SymbolIterator,
StringPrototypeTrim,
SafeWeakMap,
SafeMap,
TypeError, TypeError,
encodeURIComponent,
} = primordials; } = primordials;
const debugWorkerThreads = false; const debugWorkerThreads = false;
@ -363,6 +367,7 @@ internals.__initWorkerThreads = (
runningOnMainThread: boolean, runningOnMainThread: boolean,
workerId, workerId,
maybeWorkerMetadata, maybeWorkerMetadata,
moduleSpecifier,
) => { ) => {
isMainThread = runningOnMainThread; isMainThread = runningOnMainThread;
@ -377,6 +382,17 @@ internals.__initWorkerThreads = (
defaultExport.resourceLimits = resourceLimits; defaultExport.resourceLimits = resourceLimits;
if (!isMainThread) { if (!isMainThread) {
// TODO(bartlomieju): this is a really hacky way to provide
// require in worker_threads - this should be rewritten to use proper
// CJS/ESM loading
if (moduleSpecifier) {
globalThis.require = createRequire(
StringPrototypeStartsWith(moduleSpecifier, "data:")
? `${Deno.cwd()}/[worker eval]`
: moduleSpecifier,
);
}
const listeners = new SafeWeakMap< const listeners = new SafeWeakMap<
// deno-lint-ignore no-explicit-any // deno-lint-ignore no-explicit-any
(...args: any[]) => void, (...args: any[]) => void,

View file

@ -1089,6 +1089,7 @@ function bootstrapWorkerRuntime(
} }
// Not available in workers // Not available in workers
const moduleSpecifier = finalDenoNs.mainModule;
delete finalDenoNs.mainModule; delete finalDenoNs.mainModule;
if (!ArrayPrototypeIncludes(unstableFeatures, unstableIds.unsafeProto)) { if (!ArrayPrototypeIncludes(unstableFeatures, unstableIds.unsafeProto)) {
@ -1121,6 +1122,7 @@ function bootstrapWorkerRuntime(
workerId, workerId,
maybeWorkerMetadata: workerMetadata, maybeWorkerMetadata: workerMetadata,
nodeDebug, nodeDebug,
moduleSpecifier: workerType === "node" ? moduleSpecifier : null,
}); });
} }
} else { } else {

View file

@ -864,3 +864,30 @@ Deno.test("[node/worker_threads] Worker runs async ops correctly", async () => {
await recvMessage.promise; await recvMessage.promise;
}); });
Deno.test("[node/worker_threads] Worker works with CJS require", async () => {
const recvMessage = Promise.withResolvers<void>();
const worker = new workerThreads.Worker(
`
const assert = require("assert");
require("worker_threads").parentPort.on("message", ({ port }) => {
assert(port instanceof MessagePort);
port.postMessage("Hello from worker");
});
`,
{ eval: true },
);
const channel = new workerThreads.MessageChannel();
worker.postMessage({ port: channel.port2 }, [channel.port2]);
channel.port1.on("message", (msg) => {
assertEquals(msg, "Hello from worker");
channel.port1.close();
channel.port2.close();
worker.terminate();
recvMessage.resolve();
});
await recvMessage.promise;
});