deno/ext/node/polyfills/internal/streams/duplexify.js

408 lines
8.3 KiB
JavaScript

// deno-lint-ignore-file
// Copyright 2018-2025 the Deno authors. MIT license.
import process from "node:process";
import { primordials } from "ext:core/mod.js";
import {
isDuplexNodeStream,
isIterable,
isNodeStream,
isReadable,
isReadableNodeStream,
isReadableStream,
isWritable,
isWritableNodeStream,
isWritableStream,
} from "ext:deno_node/internal/streams/utils.js";
import eos from "ext:deno_node/internal/streams/end-of-stream.js";
import imported1 from "ext:deno_node/internal/errors.ts";
import { destroyer } from "ext:deno_node/internal/streams/destroy.js";
import Duplex from "node:_stream_duplex";
import Readable from "node:_stream_readable";
import Writable from "node:_stream_writable";
import from from "ext:deno_node/internal/streams/from.js";
import { isBlob } from "ext:deno_web/09_file.js";
import { AbortController } from "ext:deno_web/03_abort_signal.js";
const {
AbortError,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_RETURN_VALUE,
},
} = imported1;
"use strict";
const {
FunctionPrototypeCall,
PromiseWithResolvers,
} = primordials;
let _Duplexify;
export default function duplexify(body, name) {
// This is needed for pre node 17.
class Duplexify extends Duplex {
constructor(options) {
super(options);
// https://github.com/nodejs/node/pull/34385
if (options?.readable === false) {
this._readableState.readable = false;
this._readableState.ended = true;
this._readableState.endEmitted = true;
}
if (options?.writable === false) {
this._writableState.writable = false;
this._writableState.ending = true;
this._writableState.ended = true;
this._writableState.finished = true;
}
}
}
_Duplexify = Duplexify;
if (isDuplexNodeStream(body)) {
return body;
}
if (isReadableNodeStream(body)) {
return _duplexify({ readable: body });
}
if (isWritableNodeStream(body)) {
return _duplexify({ writable: body });
}
if (isNodeStream(body)) {
return _duplexify({ writable: false, readable: false });
}
if (isReadableStream(body)) {
return _duplexify({ readable: Readable.fromWeb(body) });
}
if (isWritableStream(body)) {
return _duplexify({ writable: Writable.fromWeb(body) });
}
if (typeof body === "function") {
const { value, write, final, destroy } = fromAsyncGen(body);
// Body might be a constructor function instead of an async generator function.
if (isDuplexNodeStream(value)) {
return value;
}
if (isIterable(value)) {
return from(Duplexify, value, {
// TODO (ronag): highWaterMark?
objectMode: true,
write,
final,
destroy,
});
}
const then = value?.then;
if (typeof then === "function") {
let d;
const promise = FunctionPrototypeCall(
then,
value,
(val) => {
if (val != null) {
throw new ERR_INVALID_RETURN_VALUE("nully", "body", val);
}
},
(err) => {
destroyer(d, err);
},
);
return d = new Duplexify({
// TODO (ronag): highWaterMark?
objectMode: true,
readable: false,
write,
final(cb) {
final(async () => {
try {
await promise;
process.nextTick(cb, null);
} catch (err) {
process.nextTick(cb, err);
}
});
},
destroy,
});
}
throw new ERR_INVALID_RETURN_VALUE(
"Iterable, AsyncIterable or AsyncFunction",
name,
value,
);
}
if (isBlob(body)) {
return duplexify(body.arrayBuffer());
}
if (isIterable(body)) {
return from(Duplexify, body, {
// TODO (ronag): highWaterMark?
objectMode: true,
writable: false,
});
}
if (
isReadableStream(body?.readable) &&
isWritableStream(body?.writable)
) {
return Duplexify.fromWeb(body);
}
if (
typeof body?.writable === "object" ||
typeof body?.readable === "object"
) {
const readable = body?.readable
? isReadableNodeStream(body?.readable)
? body?.readable
: duplexify(body.readable)
: undefined;
const writable = body?.writable
? isWritableNodeStream(body?.writable)
? body?.writable
: duplexify(body.writable)
: undefined;
return _duplexify({ readable, writable });
}
const then = body?.then;
if (typeof then === "function") {
let d;
FunctionPrototypeCall(
then,
body,
(val) => {
if (val != null) {
d.push(val);
}
d.push(null);
},
(err) => {
destroyer(d, err);
},
);
return d = new Duplexify({
objectMode: true,
writable: false,
read() {},
});
}
throw new ERR_INVALID_ARG_TYPE(
name,
[
"Blob",
"ReadableStream",
"WritableStream",
"Stream",
"Iterable",
"AsyncIterable",
"Function",
"{ readable, writable } pair",
"Promise",
],
body,
);
}
function fromAsyncGen(fn) {
let { promise, resolve } = PromiseWithResolvers();
const ac = new AbortController();
const signal = ac.signal;
const value = fn(
async function* () {
while (true) {
const _promise = promise;
promise = null;
const { chunk, done, cb } = await _promise;
process.nextTick(cb);
if (done) return;
if (signal.aborted) {
throw new AbortError(undefined, { cause: signal.reason });
}
({ promise, resolve } = PromiseWithResolvers());
yield chunk;
}
}(),
{ signal },
);
return {
value,
write(chunk, encoding, cb) {
const _resolve = resolve;
resolve = null;
_resolve({ chunk, done: false, cb });
},
final(cb) {
const _resolve = resolve;
resolve = null;
_resolve({ done: true, cb });
},
destroy(err, cb) {
ac.abort();
cb(err);
},
};
}
function _duplexify(pair) {
const r = pair.readable && typeof pair.readable.read !== "function"
? Readable.wrap(pair.readable)
: pair.readable;
const w = pair.writable;
let readable = !!isReadable(r);
let writable = !!isWritable(w);
let ondrain;
let onfinish;
let onreadable;
let onclose;
let d;
function onfinished(err) {
const cb = onclose;
onclose = null;
if (cb) {
cb(err);
} else if (err) {
d.destroy(err);
}
}
// TODO(ronag): Avoid double buffering.
// Implement Writable/Readable/Duplex traits.
// See, https://github.com/nodejs/node/pull/33515.
d = new _Duplexify({
// TODO (ronag): highWaterMark?
readableObjectMode: !!r?.readableObjectMode,
writableObjectMode: !!w?.writableObjectMode,
readable,
writable,
});
if (writable) {
eos(w, (err) => {
writable = false;
if (err) {
destroyer(r, err);
}
onfinished(err);
});
d._write = function (chunk, encoding, callback) {
if (w.write(chunk, encoding)) {
callback();
} else {
ondrain = callback;
}
};
d._final = function (callback) {
w.end();
onfinish = callback;
};
w.on("drain", function () {
if (ondrain) {
const cb = ondrain;
ondrain = null;
cb();
}
});
w.on("finish", function () {
if (onfinish) {
const cb = onfinish;
onfinish = null;
cb();
}
});
}
if (readable) {
eos(r, (err) => {
readable = false;
if (err) {
destroyer(r, err);
}
onfinished(err);
});
r.on("readable", function () {
if (onreadable) {
const cb = onreadable;
onreadable = null;
cb();
}
});
r.on("end", function () {
d.push(null);
});
d._read = function () {
while (true) {
const buf = r.read();
if (buf === null) {
onreadable = d._read;
return;
}
if (!d.push(buf)) {
return;
}
}
};
}
d._destroy = function (err, callback) {
if (!err && onclose !== null) {
err = new AbortError();
}
onreadable = null;
ondrain = null;
onfinish = null;
if (onclose === null) {
callback(err);
} else {
onclose = callback;
destroyer(w, err);
destroyer(r, err);
}
};
return d;
}