// deno-lint-ignore-file
// Copyright 2018-2025 the Deno authors. MIT license.

import { destroy } from "ext:deno_node/internal/streams/destroy.js";
import finished from "ext:deno_node/internal/streams/end-of-stream.js";
import {
  isDestroyed,
  isReadable,
  isReadableEnded,
  isWritable,
  isWritableEnded,
} from "ext:deno_node/internal/streams/utils.js";
import { ReadableStream, WritableStream } from "node:stream/web";
import {
  validateBoolean,
  validateObject,
} from "ext:deno_node/internal/validators.mjs";
import {
  kEmptyObject,
  normalizeEncoding,
} from "ext:deno_node/internal/util.mjs";
import {
  AbortError,
  ERR_INVALID_ARG_TYPE,
  ERR_INVALID_ARG_VALUE,
} from "ext:deno_node/internal/errors.ts";
import process from "node:process";
import { Buffer } from "node:buffer";
import { Duplex, Readable, Writable } from "node:stream";

/**
 * @param {unknown} object
 * @returns {boolean} `true` when `object` is an instance of `WritableStream`.
 */
function isWritableStream(object) {
  return object instanceof WritableStream;
}

/**
 * @param {unknown} object
 * @returns {boolean} `true` when `object` is an instance of `ReadableStream`.
 */
function isReadableStream(object) {
  return object instanceof ReadableStream;
}

/**
 * Wraps a web `ReadableStream` in a Node.js `Readable`.
 *
 * The web stream is locked with a reader; each `read()` request on the
 * returned `Readable` pulls one chunk from that reader. Destroying the
 * `Readable` cancels the reader unless it has already closed.
 *
 * @param {ReadableStream} readableStream
 * @param {{
 *   highWaterMark?: number,
 *   encoding?: string,
 *   objectMode?: boolean,
 *   signal?: AbortSignal,
 * }} [options]
 * @returns {Readable}
 * @throws {ERR_INVALID_ARG_TYPE} if `readableStream` is not a `ReadableStream`.
 * @throws {ERR_INVALID_ARG_VALUE} if `options.encoding` is not a known encoding.
 */
export function newStreamReadableFromReadableStream(
  readableStream,
  options = kEmptyObject,
) {
  if (!isReadableStream(readableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      "readableStream",
      "ReadableStream",
      readableStream,
    );
  }

  validateObject(options, "options");
  const {
    highWaterMark,
    encoding,
    objectMode = false,
    signal,
  } = options;

  if (encoding !== undefined && !Buffer.isEncoding(encoding)) {
    throw new ERR_INVALID_ARG_VALUE(encoding, "options.encoding");
  }
  validateBoolean(objectMode, "options.objectMode");

  const reader = readableStream.getReader();
  // Flipped once `reader.closed` settles, so destroy() knows whether a
  // cancel() is still needed.
  let closed = false;

  const readable = new Readable({
    objectMode,
    highWaterMark,
    encoding,
    signal,

    read() {
      reader.read().then(
        (chunk) => {
          if (chunk.done) {
            readable.push(null);
          } else {
            readable.push(chunk.value);
          }
        },
        (error) => destroy.call(readable, error),
      );
    },

    destroy(error, callback) {
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => {
            throw error;
          });
        }
      }

      if (!closed) {
        reader.cancel(error).then(done, done);
        return;
      }
      done();
    },
  });

  reader.closed.then(
    () => {
      closed = true;
    },
    (error) => {
      closed = true;
      destroy.call(readable, error);
    },
  );

  return readable;
}

/**
 * Wraps a web `WritableStream` in a Node.js `Writable`.
 *
 * The web stream is locked with a writer. Writes wait for `writer.ready`
 * before forwarding the chunk. Destroying the `Writable` aborts the writer
 * when an error is given and closes it otherwise.
 *
 * @param {WritableStream} writableStream
 * @param {{
 *   highWaterMark?: number,
 *   decodeStrings?: boolean,
 *   objectMode?: boolean,
 *   signal?: AbortSignal,
 * }} [options]
 * @returns {Writable}
 * @throws {ERR_INVALID_ARG_TYPE} if `writableStream` is not a `WritableStream`.
 */
export function newStreamWritableFromWritableStream(
  writableStream,
  options = kEmptyObject,
) {
  if (!isWritableStream(writableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      "writableStream",
      "WritableStream",
      writableStream,
    );
  }

  validateObject(options, "options");
  const {
    highWaterMark,
    decodeStrings = true,
    objectMode = false,
    signal,
  } = options;

  validateBoolean(objectMode, "options.objectMode");
  validateBoolean(decodeStrings, "options.decodeStrings");

  const writer = writableStream.getWriter();
  // Flipped once `writer.closed` settles.
  let closed = false;

  const writable = new Writable({
    highWaterMark,
    objectMode,
    decodeStrings,
    signal,

    writev(chunks, callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy.call(writable, error));
        }
      }

      writer.ready.then(
        () =>
          Promise.all(chunks.map((data) => writer.write(data.chunk))).then(
            // Promise.all fulfills with an array of write results; that
            // array must not be handed to the callback as an error.
            () => done(),
            done,
          ),
        done,
      );
    },

    write(chunk, encoding, callback) {
      if (typeof chunk === "string" && decodeStrings && !objectMode) {
        chunk = Buffer.from(chunk, encoding);
        chunk = new Uint8Array(
          chunk.buffer,
          chunk.byteOffset,
          chunk.byteLength,
        );
      }

      function done(error) {
        try {
          callback(error);
        } catch (error) {
          destroy.call(writable, error);
        }
      }

      writer.ready.then(
        () => writer.write(chunk).then(done, done),
        done,
      );
    },

    destroy(error, callback) {
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => {
            throw error;
          });
        }
      }

      if (!closed) {
        if (error != null) {
          writer.abort(error).then(done, done);
        } else {
          writer.close().then(done, done);
        }
        return;
      }

      done();
    },

    final(callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy.call(writable, error));
        }
      }

      if (!closed) {
        writer.close().then(done, done);
      }
    },
  });

  writer.closed.then(
    () => {
      closed = true;
    },
    (error) => {
      closed = true;
      destroy.call(writable, error);
    },
  );

  return writable;
}

/**
 * Wraps a `{ readable: ReadableStream, writable: WritableStream }` pair in a
 * Node.js `Duplex`.
 *
 * Both web streams are locked (a reader and a writer are acquired). If either
 * side's `closed` promise rejects, both sides are marked closed and the
 * `Duplex` is destroyed with that error.
 *
 * @param {{ readable: ReadableStream, writable: WritableStream }} pair
 * @param {{
 *   allowHalfOpen?: boolean,
 *   objectMode?: boolean,
 *   encoding?: string,
 *   decodeStrings?: boolean,
 *   highWaterMark?: number,
 *   signal?: AbortSignal,
 * }} [options]
 * @returns {Duplex}
 * @throws {ERR_INVALID_ARG_TYPE} if either member of `pair` has the wrong type.
 * @throws {ERR_INVALID_ARG_VALUE} if `options.encoding` is not a known encoding.
 */
export function newStreamDuplexFromReadableWritablePair(
  pair,
  options = kEmptyObject,
) {
  validateObject(pair, "pair");
  const {
    readable: readableStream,
    writable: writableStream,
  } = pair;

  if (!isReadableStream(readableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      "pair.readable",
      "ReadableStream",
      readableStream,
    );
  }
  if (!isWritableStream(writableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      "pair.writable",
      "WritableStream",
      writableStream,
    );
  }

  validateObject(options, "options");
  const {
    allowHalfOpen = false,
    objectMode = false,
    encoding,
    decodeStrings = true,
    highWaterMark,
    signal,
  } = options;

  validateBoolean(objectMode, "options.objectMode");
  if (encoding !== undefined && !Buffer.isEncoding(encoding)) {
    throw new ERR_INVALID_ARG_VALUE(encoding, "options.encoding");
  }

  const writer = writableStream.getWriter();
  const reader = readableStream.getReader();
  let writableClosed = false;
  let readableClosed = false;

  const duplex = new Duplex({
    allowHalfOpen,
    highWaterMark,
    objectMode,
    encoding,
    decodeStrings,
    signal,

    writev(chunks, callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy.call(duplex, error));
        }
      }

      writer.ready.then(
        () =>
          Promise.all(chunks.map((data) => writer.write(data.chunk))).then(
            // Promise.all fulfills with an array of write results; that
            // array must not be handed to the callback as an error.
            () => done(),
            done,
          ),
        done,
      );
    },

    write(chunk, encoding, callback) {
      if (typeof chunk === "string" && decodeStrings && !objectMode) {
        chunk = Buffer.from(chunk, encoding);
        chunk = new Uint8Array(
          chunk.buffer,
          chunk.byteOffset,
          chunk.byteLength,
        );
      }

      function done(error) {
        try {
          callback(error);
        } catch (error) {
          destroy.call(duplex, error);
        }
      }

      writer.ready.then(
        () => writer.write(chunk).then(done, done),
        done,
      );
    },

    final(callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy.call(duplex, error));
        }
      }

      if (!writableClosed) {
        writer.close().then(done, done);
      }
    },

    read() {
      reader.read().then(
        (chunk) => {
          if (chunk.done) {
            duplex.push(null);
          } else {
            duplex.push(chunk.value);
          }
        },
        (error) => destroy.call(duplex, error),
      );
    },

    destroy(error, callback) {
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => {
            throw error;
          });
        }
      }

      async function closeWriter() {
        if (!writableClosed) {
          await writer.abort(error);
        }
      }

      async function closeReader() {
        if (!readableClosed) {
          await reader.cancel(error);
        }
      }

      if (!writableClosed || !readableClosed) {
        Promise.all([
          closeWriter(),
          closeReader(),
        ]).then(done, done);
        return;
      }

      done();
    },
  });

  writer.closed.then(
    () => {
      writableClosed = true;
    },
    (error) => {
      writableClosed = true;
      readableClosed = true;
      destroy.call(duplex, error);
    },
  );

  reader.closed.then(
    () => {
      readableClosed = true;
    },
    (error) => {
      writableClosed = true;
      readableClosed = true;
      destroy.call(duplex, error);
    },
  );

  return duplex;
}

/**
 * Wraps a Node.js readable stream in a web `ReadableStream`.
 *
 * The Node.js stream is paused up front and resumed on each `pull()`; it is
 * paused again whenever the controller's `desiredSize` drops to zero or
 * below. Cancelling the web stream destroys the Node.js stream.
 *
 * @param {Readable} streamReadable Any object carrying a `_readableState`.
 * @param {{ strategy?: QueuingStrategy }} [options]
 * @returns {ReadableStream} An already-cancelled stream when `streamReadable`
 *   is destroyed or not readable.
 * @throws {ERR_INVALID_ARG_TYPE} if `streamReadable` has no `_readableState`
 *   object.
 */
export function newReadableStreamFromStreamReadable(
  streamReadable,
  options = kEmptyObject,
) {
  // Not using the internal/streams/utils isReadableNodeStream utility
  // here because it will return false if streamReadable is a Duplex
  // whose readable option is false. For a Duplex that is not readable,
  // we want it to pass this check but return a closed ReadableStream.
  if (typeof streamReadable?._readableState !== "object") {
    throw new ERR_INVALID_ARG_TYPE(
      "streamReadable",
      "stream.Readable",
      streamReadable,
    );
  }

  if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
    const readable = new ReadableStream();
    readable.cancel();
    return readable;
  }

  const objectMode = streamReadable.readableObjectMode;
  const highWaterMark = streamReadable.readableHighWaterMark;

  const evaluateStrategyOrFallback = (strategy) => {
    // If there is a strategy available, use it
    if (strategy) {
      return strategy;
    }

    if (objectMode) {
      // When running in objectMode explicitly but no strategy, we just fall
      // back to CountQueuingStrategy
      return new CountQueuingStrategy({ highWaterMark });
    }

    // When not running in objectMode explicitly, we just fall
    // back to a minimal strategy that just specifies the highWaterMark
    // and no size algorithm. Using a ByteLengthQueuingStrategy here
    // is unnecessary.
    return { highWaterMark };
  };

  const strategy = evaluateStrategyOrFallback(options?.strategy);

  let controller;

  function onData(chunk) {
    // Copy the Buffer to detach it from the pool.
    if (Buffer.isBuffer(chunk) && !objectMode) {
      chunk = new Uint8Array(chunk);
    }
    controller.enqueue(chunk);
    if (controller.desiredSize <= 0) {
      streamReadable.pause();
    }
  }

  streamReadable.pause();

  let isCanceled = false;

  const cleanup = finished(streamReadable, (error) => {
    if (error?.code === "ERR_STREAM_PREMATURE_CLOSE") {
      const err = new AbortError(undefined, { cause: error });
      error = err;
    }

    cleanup();
    // This is a protection against non-standard, legacy streams
    // that happen to emit an error event again after finished is called.
    streamReadable.on("error", () => {});
    if (error) {
      return controller.error(error);
    }
    // The web stream was cancelled by the consumer; there is nothing left
    // to close.
    if (isCanceled) {
      return;
    }
    controller.close();
  });

  streamReadable.on("data", onData);

  return new ReadableStream({
    start(c) {
      controller = c;
    },

    pull() {
      streamReadable.resume();
    },

    cancel(reason) {
      isCanceled = true;
      destroy.call(streamReadable, reason);
    },
  }, strategy);
}

/**
 * Wraps a Node.js writable stream in a web `WritableStream`.
 *
 * Backpressure is bridged through a deferred promise: when the Node.js
 * stream needs draining, `write()` returns a promise that settles on the
 * next `"drain"` event (or rejects if the stream finishes with an error).
 *
 * @param {Writable} streamWritable Any object carrying a `_writableState`.
 * @returns {WritableStream} An already-closed stream when `streamWritable`
 *   is destroyed or not writable.
 * @throws {ERR_INVALID_ARG_TYPE} if `streamWritable` has no `_writableState`
 *   object.
 */
export function newWritableStreamFromStreamWritable(streamWritable) {
  // Not using the internal/streams/utils isWritableNodeStream utility
  // here because it will return false if streamWritable is a Duplex
  // whose writable option is false. For a Duplex that is not writable,
  // we want it to pass this check but return a closed WritableStream.
  if (typeof streamWritable?._writableState !== "object") {
    throw new ERR_INVALID_ARG_TYPE(
      "streamWritable",
      "stream.Writable",
      streamWritable,
    );
  }

  if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
    const writable = new WritableStream();
    writable.close();
    return writable;
  }

  const highWaterMark = streamWritable.writableHighWaterMark;
  const strategy = streamWritable.writableObjectMode
    ? new CountQueuingStrategy({ highWaterMark })
    : { highWaterMark };

  let controller;
  // Deferred ({ promise, resolve, reject }) pending while waiting for "drain".
  let backpressurePromise;
  // Deferred pending between close() and the stream actually finishing.
  let closed;

  function onDrain() {
    if (backpressurePromise !== undefined) {
      backpressurePromise.resolve();
    }
  }

  const cleanup = finished(streamWritable, (error) => {
    if (error?.code === "ERR_STREAM_PREMATURE_CLOSE") {
      const err = new AbortError(undefined, { cause: error });
      error = err;
    }

    cleanup();
    // This is a protection against non-standard, legacy streams
    // that happen to emit an error event again after finished is called.
    streamWritable.on("error", () => {});
    if (error != null) {
      if (backpressurePromise !== undefined) {
        backpressurePromise.reject(error);
      }
      // If closed is not undefined, the error is happening
      // after the WritableStream close has already started.
      // We need to reject it here.
      if (closed !== undefined) {
        closed.reject(error);
        closed = undefined;
      }
      controller.error(error);
      controller = undefined;
      return;
    }

    if (closed !== undefined) {
      closed.resolve();
      closed = undefined;
      return;
    }
    // The Node.js stream finished without close() having been requested
    // through the web stream, so surface that as an abort.
    controller.error(new AbortError());
    controller = undefined;
  });

  streamWritable.on("drain", onDrain);

  return new WritableStream({
    start(c) {
      controller = c;
    },

    async write(chunk) {
      if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
        backpressurePromise = Promise.withResolvers();
        return backpressurePromise.promise.finally(() => {
          backpressurePromise = undefined;
        });
      }
    },

    abort(reason) {
      destroy.call(streamWritable, reason);
    },

    close() {
      if (closed === undefined && !isWritableEnded(streamWritable)) {
        closed = Promise.withResolvers();
        streamWritable.end();
        return closed.promise;
      }

      controller = undefined;
      return Promise.resolve();
    },
  }, strategy);
}

/**
 * Splits a Node.js `Duplex` into a web `{ readable, writable }` pair.
 *
 * A side that is not readable/writable (or the whole duplex being destroyed)
 * yields an already-cancelled `ReadableStream` / already-closed
 * `WritableStream` for that side.
 *
 * @param {Duplex} duplex Any object carrying both `_readableState` and
 *   `_writableState`.
 * @returns {{ readable: ReadableStream, writable: WritableStream }}
 * @throws {ERR_INVALID_ARG_TYPE} if `duplex` lacks either state object.
 */
export function newReadableWritablePairFromDuplex(duplex) {
  // Not using the internal/streams/utils isWritableNodeStream and
  // isReadableNodestream utilities here because they will return false
  // if the duplex was created with writable or readable options set to
  // false. Instead, we'll check the readable and writable state after
  // and return closed WritableStream or closed ReadableStream as
  // necessary.
  if (
    typeof duplex?._writableState !== "object" ||
    typeof duplex?._readableState !== "object"
  ) {
    throw new ERR_INVALID_ARG_TYPE("duplex", "stream.Duplex", duplex);
  }

  if (isDestroyed(duplex)) {
    const writable = new WritableStream();
    const readable = new ReadableStream();
    writable.close();
    readable.cancel();
    return { readable, writable };
  }

  const writable = isWritable(duplex)
    ? newWritableStreamFromStreamWritable(duplex)
    : new WritableStream();

  if (!isWritable(duplex)) {
    writable.close();
  }

  const readable = isReadable(duplex)
    ? newReadableStreamFromStreamReadable(duplex)
    : new ReadableStream();

  if (!isReadable(duplex)) {
    readable.cancel();
  }

  return { writable, readable };
}