mirror of
https://github.com/denoland/deno.git
synced 2025-09-22 02:12:33 +00:00
408 lines
8.3 KiB
JavaScript
408 lines
8.3 KiB
JavaScript
// deno-lint-ignore-file
|
|
// Copyright 2018-2025 the Deno authors. MIT license.
|
|
import process from "node:process";
|
|
import { primordials } from "ext:core/mod.js";
|
|
|
|
import {
|
|
isDuplexNodeStream,
|
|
isIterable,
|
|
isNodeStream,
|
|
isReadable,
|
|
isReadableNodeStream,
|
|
isReadableStream,
|
|
isWritable,
|
|
isWritableNodeStream,
|
|
isWritableStream,
|
|
} from "ext:deno_node/internal/streams/utils.js";
|
|
|
|
import eos from "ext:deno_node/internal/streams/end-of-stream.js";
|
|
import imported1 from "ext:deno_node/internal/errors.ts";
|
|
import { destroyer } from "ext:deno_node/internal/streams/destroy.js";
|
|
import Duplex from "node:_stream_duplex";
|
|
import Readable from "node:_stream_readable";
|
|
import Writable from "node:_stream_writable";
|
|
import from from "ext:deno_node/internal/streams/from.js";
|
|
import { isBlob } from "ext:deno_web/09_file.js";
|
|
import { AbortController } from "ext:deno_web/03_abort_signal.js";
|
|
|
|
const {
|
|
AbortError,
|
|
codes: {
|
|
ERR_INVALID_ARG_TYPE,
|
|
ERR_INVALID_RETURN_VALUE,
|
|
},
|
|
} = imported1;
|
|
|
|
"use strict";
|
|
|
|
const {
|
|
FunctionPrototypeCall,
|
|
PromiseWithResolvers,
|
|
} = primordials;
|
|
|
|
let _Duplexify;
|
|
|
|
export default function duplexify(body, name) {
|
|
// This is needed for pre node 17.
|
|
class Duplexify extends Duplex {
|
|
constructor(options) {
|
|
super(options);
|
|
|
|
// https://github.com/nodejs/node/pull/34385
|
|
|
|
if (options?.readable === false) {
|
|
this._readableState.readable = false;
|
|
this._readableState.ended = true;
|
|
this._readableState.endEmitted = true;
|
|
}
|
|
|
|
if (options?.writable === false) {
|
|
this._writableState.writable = false;
|
|
this._writableState.ending = true;
|
|
this._writableState.ended = true;
|
|
this._writableState.finished = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
_Duplexify = Duplexify;
|
|
|
|
if (isDuplexNodeStream(body)) {
|
|
return body;
|
|
}
|
|
|
|
if (isReadableNodeStream(body)) {
|
|
return _duplexify({ readable: body });
|
|
}
|
|
|
|
if (isWritableNodeStream(body)) {
|
|
return _duplexify({ writable: body });
|
|
}
|
|
|
|
if (isNodeStream(body)) {
|
|
return _duplexify({ writable: false, readable: false });
|
|
}
|
|
|
|
if (isReadableStream(body)) {
|
|
return _duplexify({ readable: Readable.fromWeb(body) });
|
|
}
|
|
|
|
if (isWritableStream(body)) {
|
|
return _duplexify({ writable: Writable.fromWeb(body) });
|
|
}
|
|
|
|
if (typeof body === "function") {
|
|
const { value, write, final, destroy } = fromAsyncGen(body);
|
|
|
|
// Body might be a constructor function instead of an async generator function.
|
|
if (isDuplexNodeStream(value)) {
|
|
return value;
|
|
}
|
|
|
|
if (isIterable(value)) {
|
|
return from(Duplexify, value, {
|
|
// TODO (ronag): highWaterMark?
|
|
objectMode: true,
|
|
write,
|
|
final,
|
|
destroy,
|
|
});
|
|
}
|
|
|
|
const then = value?.then;
|
|
if (typeof then === "function") {
|
|
let d;
|
|
|
|
const promise = FunctionPrototypeCall(
|
|
then,
|
|
value,
|
|
(val) => {
|
|
if (val != null) {
|
|
throw new ERR_INVALID_RETURN_VALUE("nully", "body", val);
|
|
}
|
|
},
|
|
(err) => {
|
|
destroyer(d, err);
|
|
},
|
|
);
|
|
|
|
return d = new Duplexify({
|
|
// TODO (ronag): highWaterMark?
|
|
objectMode: true,
|
|
readable: false,
|
|
write,
|
|
final(cb) {
|
|
final(async () => {
|
|
try {
|
|
await promise;
|
|
process.nextTick(cb, null);
|
|
} catch (err) {
|
|
process.nextTick(cb, err);
|
|
}
|
|
});
|
|
},
|
|
destroy,
|
|
});
|
|
}
|
|
|
|
throw new ERR_INVALID_RETURN_VALUE(
|
|
"Iterable, AsyncIterable or AsyncFunction",
|
|
name,
|
|
value,
|
|
);
|
|
}
|
|
|
|
if (isBlob(body)) {
|
|
return duplexify(body.arrayBuffer());
|
|
}
|
|
|
|
if (isIterable(body)) {
|
|
return from(Duplexify, body, {
|
|
// TODO (ronag): highWaterMark?
|
|
objectMode: true,
|
|
writable: false,
|
|
});
|
|
}
|
|
|
|
if (
|
|
isReadableStream(body?.readable) &&
|
|
isWritableStream(body?.writable)
|
|
) {
|
|
return Duplexify.fromWeb(body);
|
|
}
|
|
|
|
if (
|
|
typeof body?.writable === "object" ||
|
|
typeof body?.readable === "object"
|
|
) {
|
|
const readable = body?.readable
|
|
? isReadableNodeStream(body?.readable)
|
|
? body?.readable
|
|
: duplexify(body.readable)
|
|
: undefined;
|
|
|
|
const writable = body?.writable
|
|
? isWritableNodeStream(body?.writable)
|
|
? body?.writable
|
|
: duplexify(body.writable)
|
|
: undefined;
|
|
|
|
return _duplexify({ readable, writable });
|
|
}
|
|
|
|
const then = body?.then;
|
|
if (typeof then === "function") {
|
|
let d;
|
|
|
|
FunctionPrototypeCall(
|
|
then,
|
|
body,
|
|
(val) => {
|
|
if (val != null) {
|
|
d.push(val);
|
|
}
|
|
d.push(null);
|
|
},
|
|
(err) => {
|
|
destroyer(d, err);
|
|
},
|
|
);
|
|
|
|
return d = new Duplexify({
|
|
objectMode: true,
|
|
writable: false,
|
|
read() {},
|
|
});
|
|
}
|
|
|
|
throw new ERR_INVALID_ARG_TYPE(
|
|
name,
|
|
[
|
|
"Blob",
|
|
"ReadableStream",
|
|
"WritableStream",
|
|
"Stream",
|
|
"Iterable",
|
|
"AsyncIterable",
|
|
"Function",
|
|
"{ readable, writable } pair",
|
|
"Promise",
|
|
],
|
|
body,
|
|
);
|
|
}
|
|
|
|
function fromAsyncGen(fn) {
|
|
let { promise, resolve } = PromiseWithResolvers();
|
|
const ac = new AbortController();
|
|
const signal = ac.signal;
|
|
const value = fn(
|
|
async function* () {
|
|
while (true) {
|
|
const _promise = promise;
|
|
promise = null;
|
|
const { chunk, done, cb } = await _promise;
|
|
process.nextTick(cb);
|
|
if (done) return;
|
|
if (signal.aborted) {
|
|
throw new AbortError(undefined, { cause: signal.reason });
|
|
}
|
|
({ promise, resolve } = PromiseWithResolvers());
|
|
yield chunk;
|
|
}
|
|
}(),
|
|
{ signal },
|
|
);
|
|
|
|
return {
|
|
value,
|
|
write(chunk, encoding, cb) {
|
|
const _resolve = resolve;
|
|
resolve = null;
|
|
_resolve({ chunk, done: false, cb });
|
|
},
|
|
final(cb) {
|
|
const _resolve = resolve;
|
|
resolve = null;
|
|
_resolve({ done: true, cb });
|
|
},
|
|
destroy(err, cb) {
|
|
ac.abort();
|
|
cb(err);
|
|
},
|
|
};
|
|
}
|
|
|
|
function _duplexify(pair) {
|
|
const r = pair.readable && typeof pair.readable.read !== "function"
|
|
? Readable.wrap(pair.readable)
|
|
: pair.readable;
|
|
const w = pair.writable;
|
|
|
|
let readable = !!isReadable(r);
|
|
let writable = !!isWritable(w);
|
|
|
|
let ondrain;
|
|
let onfinish;
|
|
let onreadable;
|
|
let onclose;
|
|
let d;
|
|
|
|
function onfinished(err) {
|
|
const cb = onclose;
|
|
onclose = null;
|
|
|
|
if (cb) {
|
|
cb(err);
|
|
} else if (err) {
|
|
d.destroy(err);
|
|
}
|
|
}
|
|
|
|
// TODO(ronag): Avoid double buffering.
|
|
// Implement Writable/Readable/Duplex traits.
|
|
// See, https://github.com/nodejs/node/pull/33515.
|
|
d = new _Duplexify({
|
|
// TODO (ronag): highWaterMark?
|
|
readableObjectMode: !!r?.readableObjectMode,
|
|
writableObjectMode: !!w?.writableObjectMode,
|
|
readable,
|
|
writable,
|
|
});
|
|
|
|
if (writable) {
|
|
eos(w, (err) => {
|
|
writable = false;
|
|
if (err) {
|
|
destroyer(r, err);
|
|
}
|
|
onfinished(err);
|
|
});
|
|
|
|
d._write = function (chunk, encoding, callback) {
|
|
if (w.write(chunk, encoding)) {
|
|
callback();
|
|
} else {
|
|
ondrain = callback;
|
|
}
|
|
};
|
|
|
|
d._final = function (callback) {
|
|
w.end();
|
|
onfinish = callback;
|
|
};
|
|
|
|
w.on("drain", function () {
|
|
if (ondrain) {
|
|
const cb = ondrain;
|
|
ondrain = null;
|
|
cb();
|
|
}
|
|
});
|
|
|
|
w.on("finish", function () {
|
|
if (onfinish) {
|
|
const cb = onfinish;
|
|
onfinish = null;
|
|
cb();
|
|
}
|
|
});
|
|
}
|
|
|
|
if (readable) {
|
|
eos(r, (err) => {
|
|
readable = false;
|
|
if (err) {
|
|
destroyer(r, err);
|
|
}
|
|
onfinished(err);
|
|
});
|
|
|
|
r.on("readable", function () {
|
|
if (onreadable) {
|
|
const cb = onreadable;
|
|
onreadable = null;
|
|
cb();
|
|
}
|
|
});
|
|
|
|
r.on("end", function () {
|
|
d.push(null);
|
|
});
|
|
|
|
d._read = function () {
|
|
while (true) {
|
|
const buf = r.read();
|
|
|
|
if (buf === null) {
|
|
onreadable = d._read;
|
|
return;
|
|
}
|
|
|
|
if (!d.push(buf)) {
|
|
return;
|
|
}
|
|
}
|
|
};
|
|
}
|
|
|
|
d._destroy = function (err, callback) {
|
|
if (!err && onclose !== null) {
|
|
err = new AbortError();
|
|
}
|
|
|
|
onreadable = null;
|
|
ondrain = null;
|
|
onfinish = null;
|
|
|
|
if (onclose === null) {
|
|
callback(err);
|
|
} else {
|
|
onclose = callback;
|
|
destroyer(w, err);
|
|
destroyer(r, err);
|
|
}
|
|
};
|
|
|
|
return d;
|
|
}
|