deno/ext/node/polyfills/internal/streams/operators.js
Divy Srivastava 01b6da9d9b
fix(ext/node): upgrade node:stream (#28855)
Ref https://github.com/denoland/deno/issues/28836

This PR replaces the _stream.mjs bundle with a file-by-file port instead. A codemod transpiles Node.js internals to ESM. The codemod performs three tasks: translating CJS to ESM, remapping internal dependencies, and hoisting lazy requires as imports.

The process is fully automated through the `update_node_stream.ts` script, simplifying future internal updates. The script checks out Node.js from a specific tag defined in the `tests/node_compat/runner`.

Additionally, the update enables new tests in our Node test runner and adds features (like compose()) that were missing from the outdated bundle.

## Performance

There is a 140KB+ binary size increase on aarch64-apple-darwin and nop startup time stays the same.
2025-04-14 21:35:34 +05:30

473 lines
10 KiB
JavaScript

// deno-lint-ignore-file
// Copyright 2018-2025 the Deno authors. MIT license.
import { primordials } from "ext:core/mod.js";
import { AbortController, AbortSignal } from "ext:deno_web/03_abort_signal.js";
import imported1 from "ext:deno_node/internal/errors.ts";
import {
validateAbortSignal,
validateInteger,
validateObject,
} from "ext:deno_node/internal/validators.mjs";
import {
kResistStopPropagation,
kWeakHandler,
} from "ext:deno_node/internal/event_target.mjs";
import { finished } from "ext:deno_node/internal/streams/end-of-stream.js";
import staticCompose from "ext:deno_node/internal/streams/compose.js";
import { addAbortSignalNoValidate } from "ext:deno_node/internal/streams/add-abort-signal.js";
import {
isNodeStream,
isWritable,
} from "ext:deno_node/internal/streams/utils.js";
const {
AbortError,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_MISSING_ARGS,
ERR_OUT_OF_RANGE,
},
} = imported1;
"use strict";
const {
ArrayPrototypePush,
Boolean,
MathFloor,
Number,
NumberIsNaN,
Promise,
PromisePrototypeThen,
PromiseReject,
PromiseResolve,
Symbol,
} = primordials;
const kEmpty = Symbol("kEmpty");
const kEof = Symbol("kEof");
function compose(stream, options) {
if (options != null) {
validateObject(options, "options");
}
if (options?.signal != null) {
validateAbortSignal(options.signal, "options.signal");
}
if (isNodeStream(stream) && !isWritable(stream)) {
throw new ERR_INVALID_ARG_VALUE("stream", stream, "must be writable");
}
const composedStream = staticCompose(this, stream);
if (options?.signal) {
// Not validating as we already validated before
addAbortSignalNoValidate(
options.signal,
composedStream,
);
}
return composedStream;
}
function map(fn, options) {
if (typeof fn !== "function") {
throw new ERR_INVALID_ARG_TYPE(
"fn",
["Function", "AsyncFunction"],
fn,
);
}
if (options != null) {
validateObject(options, "options");
}
if (options?.signal != null) {
validateAbortSignal(options.signal, "options.signal");
}
let concurrency = 1;
if (options?.concurrency != null) {
concurrency = MathFloor(options.concurrency);
}
let highWaterMark = concurrency - 1;
if (options?.highWaterMark != null) {
highWaterMark = MathFloor(options.highWaterMark);
}
validateInteger(concurrency, "options.concurrency", 1);
validateInteger(highWaterMark, "options.highWaterMark", 0);
highWaterMark += concurrency;
return async function* map() {
const signal = AbortSignal.any([options?.signal].filter(Boolean));
const stream = this;
const queue = [];
const signalOpt = { signal };
let next;
let resume;
let done = false;
let cnt = 0;
function onCatch() {
done = true;
afterItemProcessed();
}
function afterItemProcessed() {
cnt -= 1;
maybeResume();
}
function maybeResume() {
if (
resume &&
!done &&
cnt < concurrency &&
queue.length < highWaterMark
) {
resume();
resume = null;
}
}
async function pump() {
try {
for await (let val of stream) {
if (done) {
return;
}
if (signal.aborted) {
throw new AbortError();
}
try {
val = fn(val, signalOpt);
if (val === kEmpty) {
continue;
}
val = PromiseResolve(val);
} catch (err) {
val = PromiseReject(err);
}
cnt += 1;
PromisePrototypeThen(val, afterItemProcessed, onCatch);
queue.push(val);
if (next) {
next();
next = null;
}
if (!done && (queue.length >= highWaterMark || cnt >= concurrency)) {
await new Promise((resolve) => {
resume = resolve;
});
}
}
queue.push(kEof);
} catch (err) {
const val = PromiseReject(err);
PromisePrototypeThen(val, afterItemProcessed, onCatch);
queue.push(val);
} finally {
done = true;
if (next) {
next();
next = null;
}
}
}
pump();
try {
while (true) {
while (queue.length > 0) {
const val = await queue[0];
if (val === kEof) {
return;
}
if (signal.aborted) {
throw new AbortError();
}
if (val !== kEmpty) {
yield val;
}
queue.shift();
maybeResume();
}
await new Promise((resolve) => {
next = resolve;
});
}
} finally {
done = true;
if (resume) {
resume();
resume = null;
}
}
}.call(this);
}
async function some(fn, options = undefined) {
for await (const unused of filter.call(this, fn, options)) {
return true;
}
return false;
}
async function every(fn, options = undefined) {
if (typeof fn !== "function") {
throw new ERR_INVALID_ARG_TYPE(
"fn",
["Function", "AsyncFunction"],
fn,
);
}
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
return !(await some.call(this, async (...args) => {
return !(await fn(...args));
}, options));
}
async function find(fn, options) {
for await (const result of filter.call(this, fn, options)) {
return result;
}
return undefined;
}
async function forEach(fn, options) {
if (typeof fn !== "function") {
throw new ERR_INVALID_ARG_TYPE(
"fn",
["Function", "AsyncFunction"],
fn,
);
}
async function forEachFn(value, options) {
await fn(value, options);
return kEmpty;
}
// eslint-disable-next-line no-unused-vars
for await (const unused of map.call(this, forEachFn, options));
}
function filter(fn, options) {
if (typeof fn !== "function") {
throw new ERR_INVALID_ARG_TYPE(
"fn",
["Function", "AsyncFunction"],
fn,
);
}
async function filterFn(value, options) {
if (await fn(value, options)) {
return value;
}
return kEmpty;
}
return map.call(this, filterFn, options);
}
// Specific to provide better error to reduce since the argument is only
// missing if the stream has no items in it - but the code is still appropriate
class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
constructor() {
super("reduce");
this.message = "Reduce of an empty stream requires an initial value";
}
}
async function reduce(reducer, initialValue, options) {
if (typeof reducer !== "function") {
throw new ERR_INVALID_ARG_TYPE(
"reducer",
["Function", "AsyncFunction"],
reducer,
);
}
if (options != null) {
validateObject(options, "options");
}
if (options?.signal != null) {
validateAbortSignal(options.signal, "options.signal");
}
let hasInitialValue = arguments.length > 1;
if (options?.signal?.aborted) {
const err = new AbortError(undefined, { cause: options.signal.reason });
this.once("error", () => {}); // The error is already propagated
await finished(this.destroy(err));
throw err;
}
const ac = new AbortController();
const signal = ac.signal;
if (options?.signal) {
const opts = {
once: true,
[kWeakHandler]: this,
[kResistStopPropagation]: true,
};
options.signal.addEventListener("abort", () => ac.abort(), opts);
}
let gotAnyItemFromStream = false;
try {
for await (const value of this) {
gotAnyItemFromStream = true;
if (options?.signal?.aborted) {
throw new AbortError();
}
if (!hasInitialValue) {
initialValue = value;
hasInitialValue = true;
} else {
initialValue = await reducer(initialValue, value, { signal });
}
}
if (!gotAnyItemFromStream && !hasInitialValue) {
throw new ReduceAwareErrMissingArgs();
}
} finally {
ac.abort();
}
return initialValue;
}
async function toArray(options) {
if (options != null) {
validateObject(options, "options");
}
if (options?.signal != null) {
validateAbortSignal(options.signal, "options.signal");
}
const result = [];
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError(undefined, { cause: options.signal.reason });
}
ArrayPrototypePush(result, val);
}
return result;
}
function flatMap(fn, options) {
const values = map.call(this, fn, options);
return async function* flatMap() {
for await (const val of values) {
yield* val;
}
}.call(this);
}
function toIntegerOrInfinity(number) {
// We coerce here to align with the spec
// https://github.com/tc39/proposal-iterator-helpers/issues/169
number = Number(number);
if (NumberIsNaN(number)) {
return 0;
}
if (number < 0) {
throw new ERR_OUT_OF_RANGE("number", ">= 0", number);
}
return number;
}
function drop(number, options = undefined) {
if (options != null) {
validateObject(options, "options");
}
if (options?.signal != null) {
validateAbortSignal(options.signal, "options.signal");
}
number = toIntegerOrInfinity(number);
return async function* drop() {
if (options?.signal?.aborted) {
throw new AbortError();
}
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError();
}
if (number-- <= 0) {
yield val;
}
}
}.call(this);
}
function take(number, options = undefined) {
if (options != null) {
validateObject(options, "options");
}
if (options?.signal != null) {
validateAbortSignal(options.signal, "options.signal");
}
number = toIntegerOrInfinity(number);
return async function* take() {
if (options?.signal?.aborted) {
throw new AbortError();
}
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError();
}
if (number-- > 0) {
yield val;
}
// Don't get another item from iterator in case we reached the end
if (number <= 0) {
return;
}
}
}.call(this);
}
const streamReturningOperators = {
drop,
filter,
flatMap,
map,
take,
compose,
};
export { streamReturningOperators };
const promiseReturningOperators = {
every,
forEach,
reduce,
toArray,
some,
find,
};
export { promiseReturningOperators };
export default {
streamReturningOperators,
promiseReturningOperators,
};