mirror of
https://github.com/denoland/deno.git
synced 2025-09-26 12:19:12 +00:00
fix(ext/node): upgrade node:stream
(#28855)
Ref https://github.com/denoland/deno/issues/28836 This PR replaces the _stream.mjs bundle with a file-by-file port instead. A codemod transpiles Node.js internals to ESM. The codemod performs three tasks: translating CJS to ESM, remapping internal dependencies, and hoisting lazy requires as imports. The process is fully automated through the `update_node_stream.ts` script, simplifying future internal updates. The script checks out Node.js from a specific tag defined in the `tests/node_compat/runner`. Additionally, the update enables new tests in our Node test runner and adds features (like compose()) that were missing from the outdated bundle. ## Performance There is a 140KB+ binary size increase on aarch64-apple-darwin and nop startup time stays the same.
This commit is contained in:
parent
6e49a4b3bd
commit
01b6da9d9b
76 changed files with 11145 additions and 7032 deletions
473
ext/node/polyfills/internal/streams/operators.js
Normal file
473
ext/node/polyfills/internal/streams/operators.js
Normal file
|
@ -0,0 +1,473 @@
|
|||
// deno-lint-ignore-file
|
||||
// Copyright 2018-2025 the Deno authors. MIT license.
|
||||
|
||||
import { primordials } from "ext:core/mod.js";
|
||||
import { AbortController, AbortSignal } from "ext:deno_web/03_abort_signal.js";
|
||||
import imported1 from "ext:deno_node/internal/errors.ts";
|
||||
import {
|
||||
validateAbortSignal,
|
||||
validateInteger,
|
||||
validateObject,
|
||||
} from "ext:deno_node/internal/validators.mjs";
|
||||
import {
|
||||
kResistStopPropagation,
|
||||
kWeakHandler,
|
||||
} from "ext:deno_node/internal/event_target.mjs";
|
||||
import { finished } from "ext:deno_node/internal/streams/end-of-stream.js";
|
||||
import staticCompose from "ext:deno_node/internal/streams/compose.js";
|
||||
import { addAbortSignalNoValidate } from "ext:deno_node/internal/streams/add-abort-signal.js";
|
||||
import {
|
||||
isNodeStream,
|
||||
isWritable,
|
||||
} from "ext:deno_node/internal/streams/utils.js";
|
||||
|
||||
const {
|
||||
AbortError,
|
||||
codes: {
|
||||
ERR_INVALID_ARG_TYPE,
|
||||
ERR_INVALID_ARG_VALUE,
|
||||
ERR_MISSING_ARGS,
|
||||
ERR_OUT_OF_RANGE,
|
||||
},
|
||||
} = imported1;
|
||||
|
||||
"use strict";
|
||||
|
||||
const {
|
||||
ArrayPrototypePush,
|
||||
Boolean,
|
||||
MathFloor,
|
||||
Number,
|
||||
NumberIsNaN,
|
||||
Promise,
|
||||
PromisePrototypeThen,
|
||||
PromiseReject,
|
||||
PromiseResolve,
|
||||
Symbol,
|
||||
} = primordials;
|
||||
|
||||
const kEmpty = Symbol("kEmpty");
|
||||
const kEof = Symbol("kEof");
|
||||
|
||||
function compose(stream, options) {
|
||||
if (options != null) {
|
||||
validateObject(options, "options");
|
||||
}
|
||||
if (options?.signal != null) {
|
||||
validateAbortSignal(options.signal, "options.signal");
|
||||
}
|
||||
|
||||
if (isNodeStream(stream) && !isWritable(stream)) {
|
||||
throw new ERR_INVALID_ARG_VALUE("stream", stream, "must be writable");
|
||||
}
|
||||
|
||||
const composedStream = staticCompose(this, stream);
|
||||
|
||||
if (options?.signal) {
|
||||
// Not validating as we already validated before
|
||||
addAbortSignalNoValidate(
|
||||
options.signal,
|
||||
composedStream,
|
||||
);
|
||||
}
|
||||
|
||||
return composedStream;
|
||||
}
|
||||
|
||||
function map(fn, options) {
|
||||
if (typeof fn !== "function") {
|
||||
throw new ERR_INVALID_ARG_TYPE(
|
||||
"fn",
|
||||
["Function", "AsyncFunction"],
|
||||
fn,
|
||||
);
|
||||
}
|
||||
if (options != null) {
|
||||
validateObject(options, "options");
|
||||
}
|
||||
if (options?.signal != null) {
|
||||
validateAbortSignal(options.signal, "options.signal");
|
||||
}
|
||||
|
||||
let concurrency = 1;
|
||||
if (options?.concurrency != null) {
|
||||
concurrency = MathFloor(options.concurrency);
|
||||
}
|
||||
|
||||
let highWaterMark = concurrency - 1;
|
||||
if (options?.highWaterMark != null) {
|
||||
highWaterMark = MathFloor(options.highWaterMark);
|
||||
}
|
||||
|
||||
validateInteger(concurrency, "options.concurrency", 1);
|
||||
validateInteger(highWaterMark, "options.highWaterMark", 0);
|
||||
|
||||
highWaterMark += concurrency;
|
||||
|
||||
return async function* map() {
|
||||
const signal = AbortSignal.any([options?.signal].filter(Boolean));
|
||||
const stream = this;
|
||||
const queue = [];
|
||||
const signalOpt = { signal };
|
||||
|
||||
let next;
|
||||
let resume;
|
||||
let done = false;
|
||||
let cnt = 0;
|
||||
|
||||
function onCatch() {
|
||||
done = true;
|
||||
afterItemProcessed();
|
||||
}
|
||||
|
||||
function afterItemProcessed() {
|
||||
cnt -= 1;
|
||||
maybeResume();
|
||||
}
|
||||
|
||||
function maybeResume() {
|
||||
if (
|
||||
resume &&
|
||||
!done &&
|
||||
cnt < concurrency &&
|
||||
queue.length < highWaterMark
|
||||
) {
|
||||
resume();
|
||||
resume = null;
|
||||
}
|
||||
}
|
||||
|
||||
async function pump() {
|
||||
try {
|
||||
for await (let val of stream) {
|
||||
if (done) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (signal.aborted) {
|
||||
throw new AbortError();
|
||||
}
|
||||
|
||||
try {
|
||||
val = fn(val, signalOpt);
|
||||
|
||||
if (val === kEmpty) {
|
||||
continue;
|
||||
}
|
||||
|
||||
val = PromiseResolve(val);
|
||||
} catch (err) {
|
||||
val = PromiseReject(err);
|
||||
}
|
||||
|
||||
cnt += 1;
|
||||
|
||||
PromisePrototypeThen(val, afterItemProcessed, onCatch);
|
||||
|
||||
queue.push(val);
|
||||
if (next) {
|
||||
next();
|
||||
next = null;
|
||||
}
|
||||
|
||||
if (!done && (queue.length >= highWaterMark || cnt >= concurrency)) {
|
||||
await new Promise((resolve) => {
|
||||
resume = resolve;
|
||||
});
|
||||
}
|
||||
}
|
||||
queue.push(kEof);
|
||||
} catch (err) {
|
||||
const val = PromiseReject(err);
|
||||
PromisePrototypeThen(val, afterItemProcessed, onCatch);
|
||||
queue.push(val);
|
||||
} finally {
|
||||
done = true;
|
||||
if (next) {
|
||||
next();
|
||||
next = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pump();
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
while (queue.length > 0) {
|
||||
const val = await queue[0];
|
||||
|
||||
if (val === kEof) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (signal.aborted) {
|
||||
throw new AbortError();
|
||||
}
|
||||
|
||||
if (val !== kEmpty) {
|
||||
yield val;
|
||||
}
|
||||
|
||||
queue.shift();
|
||||
maybeResume();
|
||||
}
|
||||
|
||||
await new Promise((resolve) => {
|
||||
next = resolve;
|
||||
});
|
||||
}
|
||||
} finally {
|
||||
done = true;
|
||||
if (resume) {
|
||||
resume();
|
||||
resume = null;
|
||||
}
|
||||
}
|
||||
}.call(this);
|
||||
}
|
||||
|
||||
async function some(fn, options = undefined) {
|
||||
for await (const unused of filter.call(this, fn, options)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
async function every(fn, options = undefined) {
|
||||
if (typeof fn !== "function") {
|
||||
throw new ERR_INVALID_ARG_TYPE(
|
||||
"fn",
|
||||
["Function", "AsyncFunction"],
|
||||
fn,
|
||||
);
|
||||
}
|
||||
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
|
||||
return !(await some.call(this, async (...args) => {
|
||||
return !(await fn(...args));
|
||||
}, options));
|
||||
}
|
||||
|
||||
async function find(fn, options) {
|
||||
for await (const result of filter.call(this, fn, options)) {
|
||||
return result;
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
async function forEach(fn, options) {
|
||||
if (typeof fn !== "function") {
|
||||
throw new ERR_INVALID_ARG_TYPE(
|
||||
"fn",
|
||||
["Function", "AsyncFunction"],
|
||||
fn,
|
||||
);
|
||||
}
|
||||
async function forEachFn(value, options) {
|
||||
await fn(value, options);
|
||||
return kEmpty;
|
||||
}
|
||||
// eslint-disable-next-line no-unused-vars
|
||||
for await (const unused of map.call(this, forEachFn, options));
|
||||
}
|
||||
|
||||
function filter(fn, options) {
|
||||
if (typeof fn !== "function") {
|
||||
throw new ERR_INVALID_ARG_TYPE(
|
||||
"fn",
|
||||
["Function", "AsyncFunction"],
|
||||
fn,
|
||||
);
|
||||
}
|
||||
async function filterFn(value, options) {
|
||||
if (await fn(value, options)) {
|
||||
return value;
|
||||
}
|
||||
return kEmpty;
|
||||
}
|
||||
return map.call(this, filterFn, options);
|
||||
}
|
||||
|
||||
// Specific to provide better error to reduce since the argument is only
|
||||
// missing if the stream has no items in it - but the code is still appropriate
|
||||
class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
|
||||
constructor() {
|
||||
super("reduce");
|
||||
this.message = "Reduce of an empty stream requires an initial value";
|
||||
}
|
||||
}
|
||||
|
||||
async function reduce(reducer, initialValue, options) {
|
||||
if (typeof reducer !== "function") {
|
||||
throw new ERR_INVALID_ARG_TYPE(
|
||||
"reducer",
|
||||
["Function", "AsyncFunction"],
|
||||
reducer,
|
||||
);
|
||||
}
|
||||
if (options != null) {
|
||||
validateObject(options, "options");
|
||||
}
|
||||
if (options?.signal != null) {
|
||||
validateAbortSignal(options.signal, "options.signal");
|
||||
}
|
||||
|
||||
let hasInitialValue = arguments.length > 1;
|
||||
if (options?.signal?.aborted) {
|
||||
const err = new AbortError(undefined, { cause: options.signal.reason });
|
||||
this.once("error", () => {}); // The error is already propagated
|
||||
await finished(this.destroy(err));
|
||||
throw err;
|
||||
}
|
||||
const ac = new AbortController();
|
||||
const signal = ac.signal;
|
||||
if (options?.signal) {
|
||||
const opts = {
|
||||
once: true,
|
||||
[kWeakHandler]: this,
|
||||
[kResistStopPropagation]: true,
|
||||
};
|
||||
options.signal.addEventListener("abort", () => ac.abort(), opts);
|
||||
}
|
||||
let gotAnyItemFromStream = false;
|
||||
try {
|
||||
for await (const value of this) {
|
||||
gotAnyItemFromStream = true;
|
||||
if (options?.signal?.aborted) {
|
||||
throw new AbortError();
|
||||
}
|
||||
if (!hasInitialValue) {
|
||||
initialValue = value;
|
||||
hasInitialValue = true;
|
||||
} else {
|
||||
initialValue = await reducer(initialValue, value, { signal });
|
||||
}
|
||||
}
|
||||
if (!gotAnyItemFromStream && !hasInitialValue) {
|
||||
throw new ReduceAwareErrMissingArgs();
|
||||
}
|
||||
} finally {
|
||||
ac.abort();
|
||||
}
|
||||
return initialValue;
|
||||
}
|
||||
|
||||
async function toArray(options) {
|
||||
if (options != null) {
|
||||
validateObject(options, "options");
|
||||
}
|
||||
if (options?.signal != null) {
|
||||
validateAbortSignal(options.signal, "options.signal");
|
||||
}
|
||||
|
||||
const result = [];
|
||||
for await (const val of this) {
|
||||
if (options?.signal?.aborted) {
|
||||
throw new AbortError(undefined, { cause: options.signal.reason });
|
||||
}
|
||||
ArrayPrototypePush(result, val);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
function flatMap(fn, options) {
|
||||
const values = map.call(this, fn, options);
|
||||
return async function* flatMap() {
|
||||
for await (const val of values) {
|
||||
yield* val;
|
||||
}
|
||||
}.call(this);
|
||||
}
|
||||
|
||||
function toIntegerOrInfinity(number) {
|
||||
// We coerce here to align with the spec
|
||||
// https://github.com/tc39/proposal-iterator-helpers/issues/169
|
||||
number = Number(number);
|
||||
if (NumberIsNaN(number)) {
|
||||
return 0;
|
||||
}
|
||||
if (number < 0) {
|
||||
throw new ERR_OUT_OF_RANGE("number", ">= 0", number);
|
||||
}
|
||||
return number;
|
||||
}
|
||||
|
||||
function drop(number, options = undefined) {
|
||||
if (options != null) {
|
||||
validateObject(options, "options");
|
||||
}
|
||||
if (options?.signal != null) {
|
||||
validateAbortSignal(options.signal, "options.signal");
|
||||
}
|
||||
|
||||
number = toIntegerOrInfinity(number);
|
||||
return async function* drop() {
|
||||
if (options?.signal?.aborted) {
|
||||
throw new AbortError();
|
||||
}
|
||||
for await (const val of this) {
|
||||
if (options?.signal?.aborted) {
|
||||
throw new AbortError();
|
||||
}
|
||||
if (number-- <= 0) {
|
||||
yield val;
|
||||
}
|
||||
}
|
||||
}.call(this);
|
||||
}
|
||||
|
||||
function take(number, options = undefined) {
|
||||
if (options != null) {
|
||||
validateObject(options, "options");
|
||||
}
|
||||
if (options?.signal != null) {
|
||||
validateAbortSignal(options.signal, "options.signal");
|
||||
}
|
||||
|
||||
number = toIntegerOrInfinity(number);
|
||||
return async function* take() {
|
||||
if (options?.signal?.aborted) {
|
||||
throw new AbortError();
|
||||
}
|
||||
for await (const val of this) {
|
||||
if (options?.signal?.aborted) {
|
||||
throw new AbortError();
|
||||
}
|
||||
if (number-- > 0) {
|
||||
yield val;
|
||||
}
|
||||
|
||||
// Don't get another item from iterator in case we reached the end
|
||||
if (number <= 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}.call(this);
|
||||
}
|
||||
|
||||
const streamReturningOperators = {
|
||||
drop,
|
||||
filter,
|
||||
flatMap,
|
||||
map,
|
||||
take,
|
||||
compose,
|
||||
};
|
||||
|
||||
export { streamReturningOperators };
|
||||
|
||||
const promiseReturningOperators = {
|
||||
every,
|
||||
forEach,
|
||||
reduce,
|
||||
toArray,
|
||||
some,
|
||||
find,
|
||||
};
|
||||
|
||||
export { promiseReturningOperators };
|
||||
|
||||
export default {
|
||||
streamReturningOperators,
|
||||
promiseReturningOperators,
|
||||
};
|
Loading…
Add table
Add a link
Reference in a new issue