mirror of https://github.com/denoland/deno.git, synced 2025-09-22 02:12:33 +00:00
Fix WebStreams adapter race condition when canceling during stream close

Co-authored-by: gitstart-app[bot] <80938352+gitstart-app[bot]@users.noreply.github.com>
Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
699 lines
17 KiB
JavaScript
// deno-lint-ignore-file
// Copyright 2018-2025 the Deno authors. MIT license.
import { destroy } from "ext:deno_node/internal/streams/destroy.js";
import finished from "ext:deno_node/internal/streams/end-of-stream.js";
import {
  isDestroyed,
  isReadable,
  isReadableEnded,
  isWritable,
  isWritableEnded,
} from "ext:deno_node/internal/streams/utils.js";
import { ReadableStream, WritableStream } from "node:stream/web";
import {
  validateBoolean,
  validateObject,
} from "ext:deno_node/internal/validators.mjs";
import {
  kEmptyObject,
  normalizeEncoding,
} from "ext:deno_node/internal/util.mjs";
import {
  AbortError,
  ERR_INVALID_ARG_TYPE,
  ERR_INVALID_ARG_VALUE,
} from "ext:deno_node/internal/errors.ts";
import process from "node:process";
import { Buffer } from "node:buffer";
import { Duplex, Readable, Writable } from "node:stream";
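
// Adapters between Node.js streams (Readable, Writable, Duplex) and WHATWG
// web streams (ReadableStream, WritableStream). Each adapter tracks whether
// the wrapped stream has already closed so that a late destroy/cancel/abort
// does not race with an in-flight close.
//
// Illustrative usage sketch (the variable names below are assumptions, not
// part of this module):
//
//   const nodeReadable = newStreamReadableFromReadableStream(webReadable);
//   const webWritable = newWritableStreamFromStreamWritable(nodeWritable);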
function isWritableStream(object) {
  return object instanceof WritableStream;
}

function isReadableStream(object) {
  return object instanceof ReadableStream;
}
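
// Converts a whatwg ReadableStream into a Node.js stream.Readable. Chunks are
// pulled from the stream's reader on demand; destroying the Readable cancels
// the reader unless the underlying stream has already closed.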
export function newStreamReadableFromReadableStream(
  readableStream,
  options = kEmptyObject,
) {
  if (!isReadableStream(readableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      "readableStream",
      "ReadableStream",
      readableStream,
    );
  }

  validateObject(options, "options");
  const {
    highWaterMark,
    encoding,
    objectMode = false,
    signal,
  } = options;

  if (encoding !== undefined && !Buffer.isEncoding(encoding)) {
    throw new ERR_INVALID_ARG_VALUE(encoding, "options.encoding");
  }
  validateBoolean(objectMode, "options.objectMode");

  const reader = readableStream.getReader();
  let closed = false;

  const readable = new Readable({
    objectMode,
    highWaterMark,
    encoding,
    signal,

    read() {
      reader.read().then(
        (chunk) => {
          if (chunk.done) {
            readable.push(null);
          } else {
            readable.push(chunk.value);
          }
        },
        (error) => destroy.call(readable, error),
      );
    },

    destroy(error, callback) {
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => {
            throw error;
          });
        }
      }

      if (!closed) {
        reader.cancel(error).then(done, done);
        return;
      }

      done();
    },
  });

  reader.closed.then(
    () => {
      closed = true;
    },
    (error) => {
      closed = true;
      destroy.call(readable, error);
    },
  );

  return readable;
}
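
// Converts a whatwg WritableStream into a Node.js stream.Writable. Writes wait
// for the writer to become ready; destroy() aborts (or, without an error,
// closes) the writer and final() closes it, but only while the underlying
// stream is still open.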
export function newStreamWritableFromWritableStream(
  writableStream,
  options = kEmptyObject,
) {
  if (!isWritableStream(writableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      "writableStream",
      "WritableStream",
      writableStream,
    );
  }

  validateObject(options, "options");
  const {
    highWaterMark,
    decodeStrings = true,
    objectMode = false,
    signal,
  } = options;

  validateBoolean(objectMode, "options.objectMode");
  validateBoolean(decodeStrings, "options.decodeStrings");

  const writer = writableStream.getWriter();
  let closed = false;

  const writable = new Writable({
    highWaterMark,
    objectMode,
    decodeStrings,
    signal,

    writev(chunks, callback) {
      function done(error) {
        error = error.filter((e) => e);
        try {
          callback(error.length === 0 ? undefined : error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy.call(writable, error));
        }
      }

      writer.ready.then(
        () =>
          Promise.all(
            chunks.map((data) => writer.write(data.chunk)),
          ).then(done, done),
        done,
      );
    },

    write(chunk, encoding, callback) {
      if (typeof chunk === "string" && decodeStrings && !objectMode) {
        chunk = Buffer.from(chunk, encoding);
        chunk = new Uint8Array(
          chunk.buffer,
          chunk.byteOffset,
          chunk.byteLength,
        );
      }

      function done(error) {
        try {
          callback(error);
        } catch (error) {
          destroy.call(writable, error);
        }
      }

      writer.ready.then(
        () => writer.write(chunk).then(done, done),
        done,
      );
    },

    destroy(error, callback) {
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => {
            throw error;
          });
        }
      }

      if (!closed) {
        if (error != null) {
          writer.abort(error).then(done, done);
        } else {
          writer.close().then(done, done);
        }
        return;
      }

      done();
    },

    final(callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy.call(writable, error));
        }
      }

      if (!closed) {
        writer.close().then(done, done);
      }
    },
  });

  writer.closed.then(
    () => {
      closed = true;
    },
    (error) => {
      closed = true;
      destroy.call(writable, error);
    },
  );

  return writable;
}
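
// Converts a { readable, writable } pair of web streams into a single Node.js
// stream.Duplex, combining the reader and writer adapters above and tracking
// the close state of each side independently so destroy() only cancels or
// aborts the sides that are still open.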
export function newStreamDuplexFromReadableWritablePair(
  pair,
  options = kEmptyObject,
) {
  validateObject(pair, "pair");
  const {
    readable: readableStream,
    writable: writableStream,
  } = pair;

  if (!isReadableStream(readableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      "pair.readable",
      "ReadableStream",
      readableStream,
    );
  }
  if (!isWritableStream(writableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      "pair.writable",
      "WritableStream",
      writableStream,
    );
  }

  validateObject(options, "options");
  const {
    allowHalfOpen = false,
    objectMode = false,
    encoding,
    decodeStrings = true,
    highWaterMark,
    signal,
  } = options;

  validateBoolean(objectMode, "options.objectMode");
  if (encoding !== undefined && !Buffer.isEncoding(encoding)) {
    throw new ERR_INVALID_ARG_VALUE(encoding, "options.encoding");
  }

  const writer = writableStream.getWriter();
  const reader = readableStream.getReader();
  let writableClosed = false;
  let readableClosed = false;

  const duplex = new Duplex({
    allowHalfOpen,
    highWaterMark,
    objectMode,
    encoding,
    decodeStrings,
    signal,

    writev(chunks, callback) {
      function done(error) {
        error = error.filter((e) => e);
        try {
          callback(error.length === 0 ? undefined : error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy(duplex, error));
        }
      }

      writer.ready.then(
        () =>
          Promise.all(
            chunks.map((data) => writer.write(data.chunk)),
          ).then(done, done),
        done,
      );
    },

    write(chunk, encoding, callback) {
      if (typeof chunk === "string" && decodeStrings && !objectMode) {
        chunk = Buffer.from(chunk, encoding);
        chunk = new Uint8Array(
          chunk.buffer,
          chunk.byteOffset,
          chunk.byteLength,
        );
      }

      function done(error) {
        try {
          callback(error);
        } catch (error) {
          destroy(duplex, error);
        }
      }

      writer.ready.then(
        () => writer.write(chunk).then(done, done),
        done,
      );
    },

    final(callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy(duplex, error));
        }
      }

      if (!writableClosed) {
        writer.close().then(done, done);
      }
    },

    read() {
      reader.read().then(
        (chunk) => {
          if (chunk.done) {
            duplex.push(null);
          } else {
            duplex.push(chunk.value);
          }
        },
        (error) => destroy(duplex, error),
      );
    },

    destroy(error, callback) {
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => {
            throw error;
          });
        }
      }

      async function closeWriter() {
        if (!writableClosed) {
          await writer.abort(error);
        }
      }

      async function closeReader() {
        if (!readableClosed) {
          await reader.cancel(error);
        }
      }

      if (!writableClosed || !readableClosed) {
        Promise.all([
          closeWriter(),
          closeReader(),
        ]).then(done, done);
        return;
      }

      done();
    },
  });

  writer.closed.then(
    () => {
      writableClosed = true;
    },
    (error) => {
      writableClosed = true;
      readableClosed = true;
      destroy(duplex, error);
    },
  );

  reader.closed.then(
    () => {
      readableClosed = true;
    },
    (error) => {
      writableClosed = true;
      readableClosed = true;
      destroy(duplex, error);
    },
  );

  return duplex;
}
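
// Converts a Node.js stream.Readable into a whatwg ReadableStream.
// Backpressure is propagated by pausing the Readable whenever the controller's
// desired size drops to zero and resuming it from pull().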
export function newReadableStreamFromStreamReadable(
  streamReadable,
  options = kEmptyObject,
) {
  // Not using the internal/streams/utils isReadableNodeStream utility
  // here because it will return false if streamReadable is a Duplex
  // whose readable option is false. For a Duplex that is not readable,
  // we want it to pass this check but return a closed ReadableStream.
  if (typeof streamReadable?._readableState !== "object") {
    throw new ERR_INVALID_ARG_TYPE(
      "streamReadable",
      "stream.Readable",
      streamReadable,
    );
  }

  if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
    const readable = new ReadableStream();
    readable.cancel();
    return readable;
  }

  const objectMode = streamReadable.readableObjectMode;
  const highWaterMark = streamReadable.readableHighWaterMark;

  const evaluateStrategyOrFallback = (strategy) => {
    // If there is a strategy available, use it
    if (strategy) {
      return strategy;
    }

    if (objectMode) {
      // When running in objectMode explicitly but no strategy, we just fall
      // back to CountQueuingStrategy
      return new CountQueuingStrategy({ highWaterMark });
    }

    // When not running in objectMode explicitly, we just fall
    // back to a minimal strategy that just specifies the highWaterMark
    // and no size algorithm. Using a ByteLengthQueuingStrategy here
    // is unnecessary.
    return { highWaterMark };
  };

  const strategy = evaluateStrategyOrFallback(options?.strategy);

  let controller;

  function onData(chunk) {
    // Copy the Buffer to detach it from the pool.
    if (Buffer.isBuffer(chunk) && !objectMode) {
      chunk = new Uint8Array(chunk);
    }
    controller.enqueue(chunk);
    if (controller.desiredSize <= 0) {
      streamReadable.pause();
    }
  }

  streamReadable.pause();

  let isCanceled = false;

  const cleanup = finished(streamReadable, (error) => {
    if (error?.code === "ERR_STREAM_PREMATURE_CLOSE") {
      const err = new AbortError(undefined, { cause: error });
      error = err;
    }

    cleanup();
    // This is a protection against non-standard, legacy streams
    // that happen to emit an error event again after finished is called.
    streamReadable.on("error", () => {});
    if (error) {
      return controller.error(error);
    }
    if (isCanceled) {
      return;
    }
    controller.close();
  });

  streamReadable.on("data", onData);

  return new ReadableStream({
    start(c) {
      controller = c;
    },

    pull() {
      streamReadable.resume();
    },

    cancel(reason) {
      isCanceled = true;
      destroy.call(streamReadable, reason);
    },
  }, strategy);
}
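
// Converts a Node.js stream.Writable into a whatwg WritableStream. write()
// resolves once the Writable drains, abort() destroys it, and close() ends the
// Writable and resolves once finished() reports completion.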
export function newWritableStreamFromStreamWritable(streamWritable) {
  // Not using the internal/streams/utils isWritableNodeStream utility
  // here because it will return false if streamWritable is a Duplex
  // whose writable option is false. For a Duplex that is not writable,
  // we want it to pass this check but return a closed WritableStream.
  if (typeof streamWritable?._writableState !== "object") {
    throw new ERR_INVALID_ARG_TYPE(
      "streamWritable",
      "stream.Writable",
      streamWritable,
    );
  }

  if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
    const writable = new WritableStream();
    writable.close();
    return writable;
  }

  const highWaterMark = streamWritable.writableHighWaterMark;
  const strategy = streamWritable.writableObjectMode
    ? new CountQueuingStrategy({ highWaterMark })
    : { highWaterMark };

  let controller;
  let backpressurePromise;
  let closed;

  function onDrain() {
    if (backpressurePromise !== undefined) {
      backpressurePromise.resolve();
    }
  }

  const cleanup = finished(streamWritable, (error) => {
    if (error?.code === "ERR_STREAM_PREMATURE_CLOSE") {
      const err = new AbortError(undefined, { cause: error });
      error = err;
    }

    cleanup();
    // This is a protection against non-standard, legacy streams
    // that happen to emit an error event again after finished is called.
    streamWritable.on("error", () => {});
    if (error != null) {
      if (backpressurePromise !== undefined) {
        backpressurePromise.reject(error);
      }
      // If closed is not undefined, the error is happening
      // after the WritableStream close has already started.
      // We need to reject it here.
      if (closed !== undefined) {
        closed.reject(error);
        closed = undefined;
      }
      controller.error(error);
      controller = undefined;
      return;
    }

    if (closed !== undefined) {
      closed.resolve();
      closed = undefined;
      return;
    }
    controller.error(new AbortError());
    controller = undefined;
  });

  streamWritable.on("drain", onDrain);

  return new WritableStream({
    start(c) {
      controller = c;
    },

    async write(chunk) {
      if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
        backpressurePromise = Promise.withResolvers();
        return backpressurePromise.promise.finally(() => {
          backpressurePromise = undefined;
        });
      }
    },

    abort(reason) {
      destroy(streamWritable, reason);
    },

    close() {
      if (closed === undefined && !isWritableEnded(streamWritable)) {
        closed = Promise.withResolvers();
        streamWritable.end();
        return closed.promise;
      }

      controller = undefined;
      return Promise.resolve();
    },
  }, strategy);
}
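
// Splits a Node.js stream.Duplex into a { readable, writable } pair of web
// streams, substituting an already-closed stream for whichever side of the
// Duplex is not usable.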
export function newReadableWritablePairFromDuplex(duplex) {
  // Not using the internal/streams/utils isWritableNodeStream and
  // isReadableNodeStream utilities here because they will return false
  // if the duplex was created with writable or readable options set to
  // false. Instead, we'll check the readable and writable state after
  // and return closed WritableStream or closed ReadableStream as
  // necessary.
  if (
    typeof duplex?._writableState !== "object" ||
    typeof duplex?._readableState !== "object"
  ) {
    throw new ERR_INVALID_ARG_TYPE("duplex", "stream.Duplex", duplex);
  }

  if (isDestroyed(duplex)) {
    const writable = new WritableStream();
    const readable = new ReadableStream();
    writable.close();
    readable.cancel();
    return { readable, writable };
  }

  const writable = isWritable(duplex)
    ? newWritableStreamFromStreamWritable(duplex)
    : new WritableStream();

  if (!isWritable(duplex)) {
    writable.close();
  }

  const readable = isReadable(duplex)
    ? newReadableStreamFromStreamReadable(duplex)
    : new ReadableStream();

  if (!isReadable(duplex)) {
    readable.cancel();
  }

  return { writable, readable };
}