deno/ext/node/polyfills/internal/streams/compose.js

265 lines
5.6 KiB
JavaScript

// deno-lint-ignore-file
// Copyright 2018-2025 the Deno authors. MIT license.
import { primordials } from "ext:core/mod.js";
import { pipeline } from "ext:deno_node/internal/streams/pipeline.js";
import Duplex from "node:_stream_duplex";
import { destroyer } from "ext:deno_node/internal/streams/destroy.js";
import {
isNodeStream,
isReadable,
isReadableStream,
isTransformStream,
isWebStream,
isWritable,
isWritableStream,
} from "ext:deno_node/internal/streams/utils.js";
import imported1 from "ext:deno_node/internal/errors.ts";
import eos from "ext:deno_node/internal/streams/end-of-stream.js";
// Error classes used by compose(), pulled from the default export of the
// internal errors module.
const {
  AbortError,
  codes: {
    ERR_INVALID_ARG_VALUE,
    ERR_MISSING_ARGS,
  },
} = imported1;
// NOTE: a `"use strict";` expression statement used to sit here. Because it
// followed imports and a declaration it was never a directive, and ES modules
// are always strict anyway, so it has been dropped.
// Captured array helper from the core primordials object.
const {
  ArrayPrototypeSlice,
} = primordials;
/**
 * Combines the given streams into a single Duplex.
 *
 * The arguments are handed to `pipeline()`. Writes to the returned Duplex are
 * forwarded to the first stream (`head`), and reads are served from whatever
 * `pipeline()` returns (`tail`). Both Node streams and web streams
 * (ReadableStream / WritableStream / TransformStream) are handled; a function
 * in the first or last position is first wrapped with `Duplex.from()`.
 *
 * @param {...any} streams Streams (or functions accepted by `Duplex.from`)
 *   to chain together, in order.
 * @returns {Duplex} With exactly one argument, `Duplex.from(streams[0])`;
 *   otherwise a new Duplex bridging the head and tail of the pipeline.
 * @throws {ERR_MISSING_ARGS} If called with no arguments.
 * @throws {ERR_INVALID_ARG_VALUE} If any stream except the last is not
 *   readable, or any stream except the first is not writable.
 */
export default function compose(...streams) {
  if (streams.length === 0) {
    throw new ERR_MISSING_ARGS("streams");
  }

  if (streams.length === 1) {
    return Duplex.from(streams[0]);
  }

  // Snapshot the caller's arguments: the first/last entries may be replaced
  // by Duplex wrappers below, but error messages should show the originals.
  const orgStreams = ArrayPrototypeSlice(streams);

  if (typeof streams[0] === "function") {
    streams[0] = Duplex.from(streams[0]);
  }

  if (typeof streams[streams.length - 1] === "function") {
    const idx = streams.length - 1;
    streams[idx] = Duplex.from(streams[idx]);
  }

  // Validate direction: everything but the last entry must be readable and
  // everything but the first must be writable.
  for (let n = 0; n < streams.length; ++n) {
    if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) {
      // TODO(ronag): Add checks for non streams.
      continue;
    }
    if (
      n < streams.length - 1 &&
      !(
        isReadable(streams[n]) ||
        isReadableStream(streams[n]) ||
        isTransformStream(streams[n])
      )
    ) {
      throw new ERR_INVALID_ARG_VALUE(
        `streams[${n}]`,
        orgStreams[n],
        "must be readable",
      );
    }
    if (
      n > 0 &&
      !(
        isWritable(streams[n]) ||
        isWritableStream(streams[n]) ||
        isTransformStream(streams[n])
      )
    ) {
      throw new ERR_INVALID_ARG_VALUE(
        `streams[${n}]`,
        orgStreams[n],
        "must be writable",
      );
    }
  }

  // Pending callbacks, each parked until the matching event fires:
  let ondrain; // `_write` callback waiting for head's "drain"
  let onfinish; // `_final` callback waiting for the tail to finish
  let onreadable; // `_read` retry waiting for tail's "readable"
  // `_destroy` callback waiting for the pipeline callback. Set to `null` once
  // `onfinished` has run, which `_destroy` uses to tell "pipeline already
  // done" apart from "still running".
  let onclose;
  let d;

  // Completion callback handed to pipeline().
  function onfinished(err) {
    const cb = onclose;
    onclose = null;

    if (cb) {
      // A destroy was waiting on the pipeline; complete it now.
      cb(err);
    } else if (err) {
      d.destroy(err);
    } else if (!readable && !writable) {
      // Nothing can flow through `d` in either direction, so close it.
      d.destroy();
    }
  }

  const head = streams[0];
  const tail = pipeline(streams, onfinished);

  const writable = !!(
    isWritable(head) ||
    isWritableStream(head) ||
    isTransformStream(head)
  );
  const readable = !!(
    isReadable(tail) ||
    isReadableStream(tail) ||
    isTransformStream(tail)
  );

  // TODO(ronag): Avoid double buffering.
  // Implement Writable/Readable/Duplex traits.
  // See, https://github.com/nodejs/node/pull/33515.
  d = new Duplex({
    // TODO (ronag): highWaterMark?
    writableObjectMode: !!head?.writableObjectMode,
    readableObjectMode: !!tail?.readableObjectMode,
    writable,
    readable,
  });

  if (writable) {
    if (isNodeStream(head)) {
      d._write = function (chunk, encoding, callback) {
        if (head.write(chunk, encoding)) {
          callback();
        } else {
          // head.write() returned false: hold the callback until "drain".
          ondrain = callback;
        }
      };

      d._final = function (callback) {
        head.end();
        // Completed by the eos() handler registered on the tail below.
        onfinish = callback;
      };

      head.on("drain", function () {
        if (ondrain) {
          const cb = ondrain;
          ondrain = null;
          cb();
        }
      });
    } else if (isWebStream(head)) {
      // For a TransformStream the writable half is what we feed.
      const writableSide = isTransformStream(head) ? head.writable : head;
      const writer = writableSide.getWriter();

      d._write = async function (chunk, encoding, callback) {
        try {
          await writer.ready;
          // The write itself is not awaited and its rejection is ignored
          // here. NOTE(review): presumably such failures are reported through
          // the pipeline callback instead - confirm.
          writer.write(chunk).catch(() => {});
          callback();
        } catch (err) {
          callback(err);
        }
      };

      d._final = async function (callback) {
        try {
          await writer.ready;
          writer.close().catch(() => {});
          // Completed by the eos() handler registered on the tail below.
          onfinish = callback;
        } catch (err) {
          callback(err);
        }
      };
    }

    // Release a pending `_final` callback once eos() reports on the tail's
    // readable side.
    const toRead = isTransformStream(tail) ? tail.readable : tail;
    eos(toRead, () => {
      if (onfinish) {
        const cb = onfinish;
        onfinish = null;
        cb();
      }
    });
  }

  if (readable) {
    if (isNodeStream(tail)) {
      tail.on("readable", function () {
        if (onreadable) {
          const cb = onreadable;
          onreadable = null;
          cb();
        }
      });

      tail.on("end", function () {
        d.push(null);
      });

      d._read = function () {
        while (true) {
          const buf = tail.read();
          if (buf === null) {
            // Nothing buffered in tail: retry on its next "readable" event.
            onreadable = d._read;
            return;
          }

          if (!d.push(buf)) {
            return;
          }
        }
      };
    } else if (isWebStream(tail)) {
      // For a TransformStream the readable half is what we drain.
      const readableSide = isTransformStream(tail) ? tail.readable : tail;
      const reader = readableSide.getReader();

      d._read = async function () {
        while (true) {
          try {
            const { value, done } = await reader.read();

            // Check `done` before pushing: a finished read has no chunk
            // (`value` is undefined), so it must not be pushed as data, and
            // end-of-stream must be signalled regardless of buffer state.
            if (done) {
              d.push(null);
              return;
            }

            if (!d.push(value)) {
              return;
            }
          } catch {
            // Read failures are ignored here. NOTE(review): presumably they
            // are reported through the pipeline callback - confirm.
            return;
          }
        }
      };
    }
  }

  d._destroy = function (err, callback) {
    // Destroyed without an error while the pipeline callback has not run
    // yet: treat it as an abort.
    if (!err && onclose !== null) {
      err = new AbortError();
    }

    // Drop any parked callbacks; they must not fire after destruction.
    onreadable = null;
    ondrain = null;
    onfinish = null;

    if (isNodeStream(tail)) {
      destroyer(tail, err);
    }

    if (onclose === null) {
      // Pipeline callback already ran: finish destroying right away.
      callback(err);
    } else {
      // Otherwise let `onfinished` complete the destroy.
      onclose = callback;
    }
  };

  return d;
}