deno/ext/node/polyfills/internal/webstreams/adapters.js
gitstart-app[bot] c1f873dc4f
fix(ext/node): Readable adapter cancel semantics (#30526)
Fix WebStreams adapter race condition when canceling during stream close

---------

Co-authored-by: gitstart-app[bot] <80938352+gitstart-app[bot]@users.noreply.github.com>
Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
2025-09-01 09:29:06 +00:00

699 lines
17 KiB
JavaScript

// deno-lint-ignore-file
// Copyright 2018-2025 the Deno authors. MIT license.
import { destroy } from "ext:deno_node/internal/streams/destroy.js";
import finished from "ext:deno_node/internal/streams/end-of-stream.js";
import {
  isDestroyed,
  isReadable,
  isReadableEnded,
  isWritable,
  isWritableEnded,
} from "ext:deno_node/internal/streams/utils.js";
import { ReadableStream, WritableStream } from "node:stream/web";
import {
  validateBoolean,
  validateObject,
} from "ext:deno_node/internal/validators.mjs";
import {
  kEmptyObject,
  normalizeEncoding,
} from "ext:deno_node/internal/util.mjs";
import {
  AbortError,
  ERR_INVALID_ARG_TYPE,
  ERR_INVALID_ARG_VALUE,
} from "ext:deno_node/internal/errors.ts";
import process from "node:process";
import { Buffer } from "node:buffer";
import { Duplex, Readable, Writable } from "node:stream";
/**
 * Reports whether a value is a WHATWG `WritableStream` instance.
 *
 * @param {unknown} object Value to test.
 * @returns {boolean} `true` when `object instanceof WritableStream`.
 */
function isWritableStream(object) {
  const matches = object instanceof WritableStream;
  return matches;
}
/**
 * Reports whether a value is a WHATWG `ReadableStream` instance.
 *
 * @param {unknown} object Value to test.
 * @returns {boolean} `true` when `object instanceof ReadableStream`.
 */
function isReadableStream(object) {
  const matches = object instanceof ReadableStream;
  return matches;
}
/**
 * Exposes a WHATWG `ReadableStream` as a Node.js `stream.Readable`.
 *
 * A reader is acquired on `readableStream` (locking it). Each `_read` call
 * pulls one chunk from the reader and pushes it into the Node stream; a
 * `done` result is translated into `push(null)`.
 *
 * @param {ReadableStream} readableStream Source web stream.
 * @param {object} [options] Forwarded to the `Readable` constructor.
 * @param {number} [options.highWaterMark]
 * @param {string} [options.encoding] Must satisfy `Buffer.isEncoding`.
 * @param {boolean} [options.objectMode=false]
 * @param {AbortSignal} [options.signal]
 * @returns {Readable} The adapting Node.js readable.
 * @throws {ERR_INVALID_ARG_TYPE} If `readableStream` is not a ReadableStream.
 * @throws {ERR_INVALID_ARG_VALUE} If `options.encoding` is not a known encoding.
 */
export function newStreamReadableFromReadableStream(
  readableStream,
  options = kEmptyObject,
) {
  if (!isReadableStream(readableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      "readableStream",
      "ReadableStream",
      readableStream,
    );
  }

  validateObject(options, "options");
  const { highWaterMark, encoding, objectMode = false, signal } = options;

  const encodingIsInvalid = encoding !== undefined &&
    !Buffer.isEncoding(encoding);
  if (encodingIsInvalid) {
    throw new ERR_INVALID_ARG_VALUE(encoding, "options.encoding");
  }
  validateBoolean(objectMode, "options.objectMode");

  const reader = readableStream.getReader();
  // Flipped once reader.closed settles (either way); after that the reader
  // no longer needs to be cancelled on destroy.
  let closed = false;

  const readable = new Readable({
    objectMode,
    highWaterMark,
    encoding,
    signal,

    read() {
      const onResult = ({ done, value }) => {
        readable.push(done ? null : value);
      };
      const onFailure = (error) => destroy.call(readable, error);
      reader.read().then(onResult, onFailure);
    },

    destroy(error, callback) {
      const settle = () => {
        try {
          callback(error);
        } catch (thrown) {
          // We are inside a promise reaction here. Re-throw on the next
          // tick so a throwing callback surfaces as a regular exception
          // instead of an unhandled rejection.
          process.nextTick(() => {
            throw thrown;
          });
        }
      };

      if (closed) {
        settle();
        return;
      }
      // Cancel the web stream first; report back whether or not the
      // cancellation itself succeeded.
      reader.cancel(error).then(settle, settle);
    },
  });

  reader.closed.then(
    () => {
      closed = true;
    },
    (error) => {
      closed = true;
      destroy.call(readable, error);
    },
  );

  return readable;
}
/**
 * Exposes a WHATWG `WritableStream` as a Node.js `stream.Writable`.
 *
 * A writer is acquired on `writableStream` (locking it). Node-side writes
 * wait for `writer.ready` and are then forwarded through `writer.write()`;
 * `final` closes the writer and `destroy` aborts (with an error) or closes
 * (without one) the writer unless it has already settled.
 *
 * @param {WritableStream} writableStream Destination web stream.
 * @param {object} [options] Forwarded to the `Writable` constructor.
 * @param {number} [options.highWaterMark]
 * @param {boolean} [options.decodeStrings=true]
 * @param {boolean} [options.objectMode=false]
 * @param {AbortSignal} [options.signal]
 * @returns {Writable} The adapting Node.js writable.
 * @throws {ERR_INVALID_ARG_TYPE} If `writableStream` is not a WritableStream.
 */
export function newStreamWritableFromWritableStream(
  writableStream,
  options = kEmptyObject,
) {
  if (!isWritableStream(writableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      "writableStream",
      "WritableStream",
      writableStream,
    );
  }
  validateObject(options, "options");
  const {
    highWaterMark,
    decodeStrings = true,
    objectMode = false,
    signal,
  } = options;
  validateBoolean(objectMode, "options.objectMode");
  validateBoolean(decodeStrings, "options.decodeStrings");

  const writer = writableStream.getWriter();
  // Set once writer.closed settles, whether it fulfilled or rejected.
  let closed = false;

  const writable = new Writable({
    highWaterMark,
    objectMode,
    decodeStrings,
    signal,

    writev(chunks, callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy.call(writable, error));
        }
      }
      writer.ready.then(
        () =>
          Promise.all(
            chunks.map((data) => writer.write(data.chunk)),
          ).then(
            // Promise.all fulfills with an array of per-write results, which
            // carries no error information: report plain success.
            () => done(),
            // A rejection delivers a single reason (not an array), so it is
            // passed to the callback as-is.
            done,
          ),
        done,
      );
    },

    write(chunk, encoding, callback) {
      if (typeof chunk === "string" && decodeStrings && !objectMode) {
        chunk = Buffer.from(chunk, encoding);
        // Hand the web stream a plain Uint8Array view over the same bytes.
        chunk = new Uint8Array(
          chunk.buffer,
          chunk.byteOffset,
          chunk.byteLength,
        );
      }
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // Use the same this-based call form as the other handlers in
          // this adapter, targeting the stream created here.
          destroy.call(writable, error);
        }
      }
      writer.ready.then(
        () => writer.write(chunk).then(done, done),
        done,
      );
    },

    destroy(error, callback) {
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => {
            throw error;
          });
        }
      }
      if (!closed) {
        if (error != null) {
          writer.abort(error).then(done, done);
        } else {
          writer.close().then(done, done);
        }
        return;
      }
      done();
    },

    final(callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy.call(writable, error));
        }
      }
      if (!closed) {
        writer.close().then(done, done);
      } else {
        // Nothing left to close; still signal completion so the stream
        // is not left waiting on this callback.
        done();
      }
    },
  });

  writer.closed.then(
    () => {
      closed = true;
    },
    (error) => {
      closed = true;
      destroy.call(writable, error);
    },
  );

  return writable;
}
/**
 * Exposes a `{ readable, writable }` pair of WHATWG streams as a single
 * Node.js `stream.Duplex`.
 *
 * A reader and a writer are acquired on the two streams (locking both).
 * Reads pull one chunk at a time from the reader; writes wait for
 * `writer.ready` and go through `writer.write()`. Destroying the duplex
 * aborts the writer and cancels the reader unless they already settled.
 *
 * @param {{ readable: ReadableStream, writable: WritableStream }} pair
 * @param {object} [options] Forwarded to the `Duplex` constructor.
 * @param {boolean} [options.allowHalfOpen=false]
 * @param {boolean} [options.objectMode=false]
 * @param {string} [options.encoding] Must satisfy `Buffer.isEncoding`.
 * @param {boolean} [options.decodeStrings=true]
 * @param {number} [options.highWaterMark]
 * @param {AbortSignal} [options.signal]
 * @returns {Duplex} The adapting Node.js duplex.
 * @throws {ERR_INVALID_ARG_TYPE} If either member of `pair` has the wrong type.
 * @throws {ERR_INVALID_ARG_VALUE} If `options.encoding` is not a known encoding.
 */
export function newStreamDuplexFromReadableWritablePair(
  pair,
  options = kEmptyObject,
) {
  validateObject(pair, "pair");
  const {
    readable: readableStream,
    writable: writableStream,
  } = pair;
  if (!isReadableStream(readableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      "pair.readable",
      "ReadableStream",
      readableStream,
    );
  }
  if (!isWritableStream(writableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      "pair.writable",
      "WritableStream",
      writableStream,
    );
  }
  validateObject(options, "options");
  const {
    allowHalfOpen = false,
    objectMode = false,
    encoding,
    decodeStrings = true,
    highWaterMark,
    signal,
  } = options;
  validateBoolean(objectMode, "options.objectMode");
  if (encoding !== undefined && !Buffer.isEncoding(encoding)) {
    throw new ERR_INVALID_ARG_VALUE(encoding, "options.encoding");
  }

  const writer = writableStream.getWriter();
  const reader = readableStream.getReader();
  // Set once the corresponding `closed` promise settles.
  let writableClosed = false;
  let readableClosed = false;

  const duplex = new Duplex({
    allowHalfOpen,
    highWaterMark,
    objectMode,
    encoding,
    decodeStrings,
    signal,

    writev(chunks, callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy.call(duplex, error));
        }
      }
      writer.ready.then(
        () =>
          Promise.all(
            chunks.map((data) => writer.write(data.chunk)),
          ).then(
            // Promise.all fulfills with an array of per-write results, which
            // carries no error information: report plain success.
            () => done(),
            // A rejection delivers a single reason (not an array), so it is
            // passed to the callback as-is.
            done,
          ),
        done,
      );
    },

    write(chunk, encoding, callback) {
      if (typeof chunk === "string" && decodeStrings && !objectMode) {
        chunk = Buffer.from(chunk, encoding);
        // Hand the web stream a plain Uint8Array view over the same bytes.
        chunk = new Uint8Array(
          chunk.buffer,
          chunk.byteOffset,
          chunk.byteLength,
        );
      }
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          destroy.call(duplex, error);
        }
      }
      writer.ready.then(
        () => writer.write(chunk).then(done, done),
        done,
      );
    },

    final(callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy.call(duplex, error));
        }
      }
      if (!writableClosed) {
        writer.close().then(done, done);
      } else {
        // Nothing left to close; still signal completion so the stream
        // is not left waiting on this callback.
        done();
      }
    },

    read() {
      reader.read().then(
        (chunk) => {
          if (chunk.done) {
            duplex.push(null);
          } else {
            duplex.push(chunk.value);
          }
        },
        (error) => destroy.call(duplex, error),
      );
    },

    destroy(error, callback) {
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => {
            throw error;
          });
        }
      }
      async function closeWriter() {
        if (!writableClosed) {
          await writer.abort(error);
        }
      }
      async function closeReader() {
        if (!readableClosed) {
          await reader.cancel(error);
        }
      }
      if (!writableClosed || !readableClosed) {
        // Tear down both sides; report back regardless of the outcome.
        Promise.all([
          closeWriter(),
          closeReader(),
        ]).then(done, done);
        return;
      }
      done();
    },
  });

  writer.closed.then(
    () => {
      writableClosed = true;
    },
    (error) => {
      // A failure on either side marks both sides closed and destroys
      // the duplex with that error.
      writableClosed = true;
      readableClosed = true;
      destroy.call(duplex, error);
    },
  );
  reader.closed.then(
    () => {
      readableClosed = true;
    },
    (error) => {
      writableClosed = true;
      readableClosed = true;
      destroy.call(duplex, error);
    },
  );

  return duplex;
}
/**
 * Exposes a Node.js readable stream as a WHATWG `ReadableStream`.
 *
 * The Node stream is paused up front and a "data" listener enqueues every
 * chunk into the web stream; the Node stream is paused again whenever the
 * controller's desired size drops to zero or below and resumed when the web
 * stream pulls. Cancelling the web stream destroys the Node stream.
 *
 * @param {object} streamReadable Any object with a `_readableState` object.
 * @param {object} [options]
 * @param {object} [options.strategy] Queuing strategy for the web stream.
 *   When absent, a `CountQueuingStrategy` (object mode) or a bare
 *   `{ highWaterMark }` is derived from the Node stream's settings.
 * @returns {ReadableStream}
 * @throws {ERR_INVALID_ARG_TYPE} If `streamReadable` has no `_readableState`.
 */
export function newReadableStreamFromStreamReadable(
  streamReadable,
  options = kEmptyObject,
) {
  // The internal/streams/utils isReadableNodeStream helper is deliberately
  // not used: it reports false for a Duplex created with readable: false,
  // and such a Duplex should be accepted here and mapped to a closed
  // ReadableStream instead of being rejected.
  if (typeof streamReadable?._readableState !== "object") {
    throw new ERR_INVALID_ARG_TYPE(
      "streamReadable",
      "stream.Readable",
      streamReadable,
    );
  }

  const unusable = isDestroyed(streamReadable) || !isReadable(streamReadable);
  if (unusable) {
    const closedStream = new ReadableStream();
    closedStream.cancel();
    return closedStream;
  }

  const objectMode = streamReadable.readableObjectMode;
  const highWaterMark = streamReadable.readableHighWaterMark;

  // Prefer a caller-supplied strategy. Otherwise count chunks in object
  // mode, or fall back to a size-less strategy carrying only the
  // highWaterMark (a ByteLengthQueuingStrategy is unnecessary here).
  let strategy = options?.strategy;
  if (!strategy) {
    strategy = objectMode
      ? new CountQueuingStrategy({ highWaterMark })
      : { highWaterMark };
  }

  let controller;
  let isCanceled = false;

  const forwardChunk = (chunk) => {
    // Copy Buffers so the enqueued chunk is detached from the pool.
    const payload = Buffer.isBuffer(chunk) && !objectMode
      ? new Uint8Array(chunk)
      : chunk;
    controller.enqueue(payload);
    if (controller.desiredSize <= 0) {
      streamReadable.pause();
    }
  };

  streamReadable.pause();

  const cleanup = finished(streamReadable, (error) => {
    const failure = error?.code === "ERR_STREAM_PREMATURE_CLOSE"
      ? new AbortError(undefined, { cause: error })
      : error;
    cleanup();
    // Guards against non-standard, legacy streams that emit another
    // "error" event after finished has already fired.
    streamReadable.on("error", () => {});
    if (failure) {
      return controller.error(failure);
    }
    if (!isCanceled) {
      controller.close();
    }
  });

  streamReadable.on("data", forwardChunk);

  const underlyingSource = {
    start(c) {
      controller = c;
    },
    pull() {
      streamReadable.resume();
    },
    cancel(reason) {
      isCanceled = true;
      destroy.call(streamReadable, reason);
    },
  };
  return new ReadableStream(underlyingSource, strategy);
}
/**
 * Exposes a Node.js writable stream as a WHATWG `WritableStream`.
 *
 * Web-side writes call `streamWritable.write()`; when the Node stream
 * signals backpressure the returned promise stays pending until "drain".
 * Closing the web stream ends the Node stream and waits for it to finish;
 * aborting the web stream destroys the Node stream with the abort reason.
 *
 * @param {object} streamWritable Any object with a `_writableState` object.
 * @returns {WritableStream}
 * @throws {ERR_INVALID_ARG_TYPE} If `streamWritable` has no `_writableState`.
 */
export function newWritableStreamFromStreamWritable(streamWritable) {
  // Not using the internal/streams/utils isWritableNodeStream utility
  // here because it will return false if streamWritable is a Duplex
  // whose writable option is false. For a Duplex that is not writable,
  // we want it to pass this check but return a closed WritableStream.
  if (typeof streamWritable?._writableState !== "object") {
    throw new ERR_INVALID_ARG_TYPE(
      "streamWritable",
      "stream.Writable",
      streamWritable,
    );
  }
  if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
    const writable = new WritableStream();
    writable.close();
    return writable;
  }

  const highWaterMark = streamWritable.writableHighWaterMark;
  const strategy = streamWritable.writableObjectMode
    ? new CountQueuingStrategy({ highWaterMark })
    : { highWaterMark };

  let controller;
  // Pending { promise, resolve, reject } while a write waits for "drain".
  let backpressurePromise;
  // Pending { promise, resolve, reject } while close() waits for the Node
  // stream to finish.
  let closed;

  function onDrain() {
    if (backpressurePromise !== undefined) {
      backpressurePromise.resolve();
    }
  }

  const cleanup = finished(streamWritable, (error) => {
    if (error?.code === "ERR_STREAM_PREMATURE_CLOSE") {
      const err = new AbortError(undefined, { cause: error });
      error = err;
    }
    cleanup();
    // This is a protection against non-standard, legacy streams
    // that happen to emit an error event again after finished is called.
    streamWritable.on("error", () => {});
    if (error != null) {
      if (backpressurePromise !== undefined) {
        backpressurePromise.reject(error);
      }
      // If closed is not undefined, the error is happening
      // after the WritableStream close has already started.
      // We need to reject it here.
      if (closed !== undefined) {
        closed.reject(error);
        closed = undefined;
      }
      controller.error(error);
      controller = undefined;
      return;
    }
    if (closed !== undefined) {
      closed.resolve();
      closed = undefined;
      return;
    }
    // The Node stream finished without the web stream having asked for
    // it: surface that to the web side as an abort.
    controller.error(new AbortError());
    controller = undefined;
  });

  streamWritable.on("drain", onDrain);

  return new WritableStream({
    start(c) {
      controller = c;
    },
    async write(chunk) {
      if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
        backpressurePromise = Promise.withResolvers();
        return backpressurePromise.promise.finally(() => {
          backpressurePromise = undefined;
        });
      }
    },
    abort(reason) {
      // Same this-based call form the readable adapter in this file uses
      // for cancel(); the stream is the receiver, the reason the argument.
      destroy.call(streamWritable, reason);
    },
    close() {
      if (closed === undefined && !isWritableEnded(streamWritable)) {
        closed = Promise.withResolvers();
        streamWritable.end();
        return closed.promise;
      }
      controller = undefined;
      return Promise.resolve();
    },
  }, strategy);
}
/**
 * Splits a Node.js duplex stream into a `{ readable, writable }` pair of
 * WHATWG streams.
 *
 * A side that is not usable (the duplex is destroyed, or that side is not
 * writable / readable) is represented by a fresh web stream that is
 * immediately closed or cancelled.
 *
 * @param {object} duplex Object carrying both `_writableState` and
 *   `_readableState` objects.
 * @returns {{ readable: ReadableStream, writable: WritableStream }}
 * @throws {ERR_INVALID_ARG_TYPE} If either state object is missing.
 */
export function newReadableWritablePairFromDuplex(duplex) {
  // The isWritableNodeStream / isReadableNodeStream helpers are not used:
  // they report false for a duplex built with writable: false or
  // readable: false. Such a duplex is accepted here and the disabled side
  // is handed back as an already closed stream instead.
  const hasWritableState = typeof duplex?._writableState === "object";
  if (!hasWritableState || typeof duplex?._readableState !== "object") {
    throw new ERR_INVALID_ARG_TYPE("duplex", "stream.Duplex", duplex);
  }

  if (isDestroyed(duplex)) {
    const deadWritable = new WritableStream();
    const deadReadable = new ReadableStream();
    deadWritable.close();
    deadReadable.cancel();
    return { readable: deadReadable, writable: deadWritable };
  }

  let writable;
  if (isWritable(duplex)) {
    writable = newWritableStreamFromStreamWritable(duplex);
  } else {
    writable = new WritableStream();
    writable.close();
  }

  let readable;
  if (isReadable(duplex)) {
    readable = newReadableStreamFromStreamReadable(duplex);
  } else {
    readable = new ReadableStream();
    readable.cancel();
  }

  return { writable, readable };
}