mirror of
https://github.com/denoland/deno.git
synced 2025-09-22 02:12:33 +00:00

Ref https://github.com/denoland/deno/issues/28836 This PR replaces the _stream.mjs bundle with a file-by-file port instead. A codemod transpiles Node.js internals to ESM. The codemod performs three tasks: translating CJS to ESM, remapping internal dependencies, and hoisting lazy requires as imports. The process is fully automated through the `update_node_stream.ts` script, simplifying future internal updates. The script checks out Node.js from a specific tag defined in the `tests/node_compat/runner`. Additionally, the update enables new tests in our Node test runner and adds features (like compose()) that were missing from the outdated bundle. ## Performance There is a 140KB+ binary size increase on aarch64-apple-darwin and nop startup time stays the same.
218 lines
4.5 KiB
JavaScript
218 lines
4.5 KiB
JavaScript
// deno-lint-ignore-file
|
|
// Copyright 2018-2025 the Deno authors. MIT license.
|
|
|
|
import process from "node:process";
|
|
import { primordials } from "ext:core/mod.js";
|
|
import { Buffer } from "node:buffer";
|
|
import _mod1 from "ext:deno_node/internal/errors.ts";
|
|
|
|
const {
|
|
ERR_INVALID_ARG_TYPE,
|
|
ERR_STREAM_NULL_VALUES,
|
|
} = _mod1.codes;
|
|
|
|
"use strict";
|
|
|
|
const {
|
|
PromisePrototypeThen,
|
|
SymbolAsyncIterator,
|
|
SymbolIterator,
|
|
} = primordials;
|
|
|
|
function from(Readable, iterable, opts) {
|
|
let iterator;
|
|
if (typeof iterable === "string" || iterable instanceof Buffer) {
|
|
return new Readable({
|
|
objectMode: true,
|
|
...opts,
|
|
read() {
|
|
this.push(iterable);
|
|
this.push(null);
|
|
},
|
|
});
|
|
}
|
|
|
|
let isAsync;
|
|
if (iterable?.[SymbolAsyncIterator]) {
|
|
isAsync = true;
|
|
iterator = iterable[SymbolAsyncIterator]();
|
|
} else if (iterable?.[SymbolIterator]) {
|
|
isAsync = false;
|
|
iterator = iterable[SymbolIterator]();
|
|
} else {
|
|
throw new ERR_INVALID_ARG_TYPE("iterable", ["Iterable"], iterable);
|
|
}
|
|
|
|
const readable = new Readable({
|
|
objectMode: true,
|
|
highWaterMark: 1,
|
|
// TODO(ronag): What options should be allowed?
|
|
...opts,
|
|
});
|
|
|
|
// Flag to protect against _read
|
|
// being called before last iteration completion.
|
|
let reading = false;
|
|
let isAsyncValues = false;
|
|
|
|
readable._read = function () {
|
|
if (!reading) {
|
|
reading = true;
|
|
|
|
if (isAsync) {
|
|
nextAsync();
|
|
} else if (isAsyncValues) {
|
|
nextSyncWithAsyncValues();
|
|
} else {
|
|
nextSyncWithSyncValues();
|
|
}
|
|
}
|
|
};
|
|
|
|
readable._destroy = function (error, cb) {
|
|
PromisePrototypeThen(
|
|
close(error),
|
|
() => process.nextTick(cb, error), // nextTick is here in case cb throws
|
|
(e) => process.nextTick(cb, e || error),
|
|
);
|
|
};
|
|
|
|
async function close(error) {
|
|
const hadError = (error !== undefined) && (error !== null);
|
|
const hasThrow = typeof iterator.throw === "function";
|
|
if (hadError && hasThrow) {
|
|
const { value, done } = await iterator.throw(error);
|
|
await value;
|
|
if (done) {
|
|
return;
|
|
}
|
|
}
|
|
if (typeof iterator.return === "function") {
|
|
const { value } = await iterator.return();
|
|
await value;
|
|
}
|
|
}
|
|
|
|
// There are a lot of duplication here, it's done on purpose for performance
|
|
// reasons - avoid await when not needed.
|
|
|
|
function nextSyncWithSyncValues() {
|
|
for (;;) {
|
|
try {
|
|
const { value, done } = iterator.next();
|
|
|
|
if (done) {
|
|
readable.push(null);
|
|
return;
|
|
}
|
|
|
|
if (
|
|
value &&
|
|
typeof value.then === "function"
|
|
) {
|
|
return changeToAsyncValues(value);
|
|
}
|
|
|
|
if (value === null) {
|
|
reading = false;
|
|
throw new ERR_STREAM_NULL_VALUES();
|
|
}
|
|
|
|
if (readable.push(value)) {
|
|
continue;
|
|
}
|
|
|
|
reading = false;
|
|
} catch (err) {
|
|
readable.destroy(err);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
async function changeToAsyncValues(value) {
|
|
isAsyncValues = true;
|
|
|
|
try {
|
|
const res = await value;
|
|
|
|
if (res === null) {
|
|
reading = false;
|
|
throw new ERR_STREAM_NULL_VALUES();
|
|
}
|
|
|
|
if (readable.push(res)) {
|
|
nextSyncWithAsyncValues();
|
|
return;
|
|
}
|
|
|
|
reading = false;
|
|
} catch (err) {
|
|
readable.destroy(err);
|
|
}
|
|
}
|
|
|
|
async function nextSyncWithAsyncValues() {
|
|
for (;;) {
|
|
try {
|
|
const { value, done } = iterator.next();
|
|
|
|
if (done) {
|
|
readable.push(null);
|
|
return;
|
|
}
|
|
|
|
const res = (value &&
|
|
typeof value.then === "function")
|
|
? await value
|
|
: value;
|
|
|
|
if (res === null) {
|
|
reading = false;
|
|
throw new ERR_STREAM_NULL_VALUES();
|
|
}
|
|
|
|
if (readable.push(res)) {
|
|
continue;
|
|
}
|
|
|
|
reading = false;
|
|
} catch (err) {
|
|
readable.destroy(err);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
async function nextAsync() {
|
|
for (;;) {
|
|
try {
|
|
const { value, done } = await iterator.next();
|
|
|
|
if (done) {
|
|
readable.push(null);
|
|
return;
|
|
}
|
|
|
|
if (value === null) {
|
|
reading = false;
|
|
throw new ERR_STREAM_NULL_VALUES();
|
|
}
|
|
|
|
if (readable.push(value)) {
|
|
continue;
|
|
}
|
|
|
|
reading = false;
|
|
} catch (err) {
|
|
readable.destroy(err);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
return readable;
|
|
}
|
|
|
|
const _defaultExport = from;
|
|
export default _defaultExport;
|
|
export { from };
|