// deno-lint-ignore-file // Copyright 2018-2025 the Deno authors. MIT license. import { primordials } from "ext:core/mod.js"; import { pipeline } from "ext:deno_node/internal/streams/pipeline.js"; import Duplex from "node:_stream_duplex"; import { destroyer } from "ext:deno_node/internal/streams/destroy.js"; import { isNodeStream, isReadable, isReadableStream, isTransformStream, isWebStream, isWritable, isWritableStream, } from "ext:deno_node/internal/streams/utils.js"; import imported1 from "ext:deno_node/internal/errors.ts"; import eos from "ext:deno_node/internal/streams/end-of-stream.js"; const { AbortError, codes: { ERR_INVALID_ARG_VALUE, ERR_MISSING_ARGS, }, } = imported1; "use strict"; const { ArrayPrototypeSlice, } = primordials; export default function compose(...streams) { if (streams.length === 0) { throw new ERR_MISSING_ARGS("streams"); } if (streams.length === 1) { return Duplex.from(streams[0]); } const orgStreams = ArrayPrototypeSlice(streams); if (typeof streams[0] === "function") { streams[0] = Duplex.from(streams[0]); } if (typeof streams[streams.length - 1] === "function") { const idx = streams.length - 1; streams[idx] = Duplex.from(streams[idx]); } for (let n = 0; n < streams.length; ++n) { if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) { // TODO(ronag): Add checks for non streams. continue; } if ( n < streams.length - 1 && !( isReadable(streams[n]) || isReadableStream(streams[n]) || isTransformStream(streams[n]) ) ) { throw new ERR_INVALID_ARG_VALUE( `streams[${n}]`, orgStreams[n], "must be readable", ); } if ( n > 0 && !( isWritable(streams[n]) || isWritableStream(streams[n]) || isTransformStream(streams[n]) ) ) { throw new ERR_INVALID_ARG_VALUE( `streams[${n}]`, orgStreams[n], "must be writable", ); } } let ondrain; let onfinish; let onreadable; let onclose; let d; function onfinished(err) { const cb = onclose; onclose = null; if (cb) { cb(err); } else if (err) { d.destroy(err); } else if (!readable && !writable) { d.destroy(); } } const head = streams[0]; const tail = pipeline(streams, onfinished); const writable = !!( isWritable(head) || isWritableStream(head) || isTransformStream(head) ); const readable = !!( isReadable(tail) || isReadableStream(tail) || isTransformStream(tail) ); // TODO(ronag): Avoid double buffering. // Implement Writable/Readable/Duplex traits. // See, https://github.com/nodejs/node/pull/33515. d = new Duplex({ // TODO (ronag): highWaterMark? writableObjectMode: !!head?.writableObjectMode, readableObjectMode: !!tail?.readableObjectMode, writable, readable, }); if (writable) { if (isNodeStream(head)) { d._write = function (chunk, encoding, callback) { if (head.write(chunk, encoding)) { callback(); } else { ondrain = callback; } }; d._final = function (callback) { head.end(); onfinish = callback; }; head.on("drain", function () { if (ondrain) { const cb = ondrain; ondrain = null; cb(); } }); } else if (isWebStream(head)) { const writable = isTransformStream(head) ? head.writable : head; const writer = writable.getWriter(); d._write = async function (chunk, encoding, callback) { try { await writer.ready; writer.write(chunk).catch(() => {}); callback(); } catch (err) { callback(err); } }; d._final = async function (callback) { try { await writer.ready; writer.close().catch(() => {}); onfinish = callback; } catch (err) { callback(err); } }; } const toRead = isTransformStream(tail) ? tail.readable : tail; eos(toRead, () => { if (onfinish) { const cb = onfinish; onfinish = null; cb(); } }); } if (readable) { if (isNodeStream(tail)) { tail.on("readable", function () { if (onreadable) { const cb = onreadable; onreadable = null; cb(); } }); tail.on("end", function () { d.push(null); }); d._read = function () { while (true) { const buf = tail.read(); if (buf === null) { onreadable = d._read; return; } if (!d.push(buf)) { return; } } }; } else if (isWebStream(tail)) { const readable = isTransformStream(tail) ? tail.readable : tail; const reader = readable.getReader(); d._read = async function () { while (true) { try { const { value, done } = await reader.read(); if (!d.push(value)) { return; } if (done) { d.push(null); return; } } catch { return; } } }; } } d._destroy = function (err, callback) { if (!err && onclose !== null) { err = new AbortError(); } onreadable = null; ondrain = null; onfinish = null; if (isNodeStream(tail)) { destroyer(tail, err); } if (onclose === null) { callback(err); } else { onclose = callback; } }; return d; }