fix(streams): handle Resource stream error (#27975)

Fixes #27715
This commit is contained in:
Leo Kettmeir 2025-02-11 18:27:52 +01:00 committed by GitHub
parent a2afae46b6
commit c1a0d63753
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 149 additions and 23 deletions

View file

@ -779,37 +779,35 @@ class ResourceStreamResourceSink {
* @param {any} sink
* @param {Uint8Array} chunk
*/
function readableStreamWriteChunkFn(reader, sink, chunk) {
async function readableStreamWriteChunkFn(reader, sink, chunk) {
// Empty chunk. Re-read.
if (chunk.length == 0) {
readableStreamReadFn(reader, sink);
await readableStreamReadFn(reader, sink);
return;
}
const res = op_readable_stream_resource_write_sync(sink.external, chunk);
if (res == 0) {
// Closed
reader.cancel("resource closed");
await reader.cancel("resource closed");
sink.close();
} else if (res == 1) {
// Successfully written (synchronous). Re-read.
readableStreamReadFn(reader, sink);
await readableStreamReadFn(reader, sink);
} else if (res == 2) {
// Full. If the channel is full, we perform an async await until we can write, and then return
// to a synchronous loop.
(async () => {
if (
await op_readable_stream_resource_write_buf(
sink.external,
chunk,
)
) {
readableStreamReadFn(reader, sink);
} else {
reader.cancel("resource closed");
sink.close();
}
})();
if (
await op_readable_stream_resource_write_buf(
sink.external,
chunk,
)
) {
await readableStreamReadFn(reader, sink);
} else {
await reader.cancel("resource closed");
sink.close();
}
}
}
@ -822,17 +820,23 @@ function readableStreamReadFn(reader, sink) {
// real resource.
let reentrant = true;
let gotChunk = undefined;
const promise = new Deferred();
readableStreamDefaultReaderRead(reader, {
chunkSteps(chunk) {
// If the chunk has non-zero length, write it
if (reentrant) {
gotChunk = chunk;
} else {
readableStreamWriteChunkFn(reader, sink, chunk);
PromisePrototypeThen(
readableStreamWriteChunkFn(reader, sink, chunk),
() => promise.resolve(),
(e) => promise.reject(e),
);
}
},
closeSteps() {
sink.close();
promise.resolve();
},
errorSteps(error) {
const success = op_readable_stream_resource_write_error(
@ -842,15 +846,29 @@ function readableStreamReadFn(reader, sink) {
// We don't cancel the reader if there was an error reading. We'll let the downstream
// consumer close the resource after it receives the error.
if (!success) {
reader.cancel("resource closed");
PromisePrototypeThen(
reader.cancel("resource closed"),
() => {
sink.close();
promise.resolve();
},
(e) => promise.reject(e),
);
} else {
sink.close();
promise.resolve();
}
sink.close();
},
});
reentrant = false;
if (gotChunk) {
readableStreamWriteChunkFn(reader, sink, gotChunk);
PromisePrototypeThen(
readableStreamWriteChunkFn(reader, sink, gotChunk),
() => promise.resolve(),
(e) => promise.reject(e),
);
}
return promise.promise;
}
/**
@ -873,7 +891,9 @@ function resourceForReadableStream(stream, length) {
PromisePrototypeCatch(
PromisePrototypeThen(
op_readable_stream_resource_await_close(rid),
() => reader.cancel("resource closed"),
() => {
PromisePrototypeCatch(reader.cancel("resource closed"), () => {});
},
),
() => {},
);
@ -884,7 +904,9 @@ function resourceForReadableStream(stream, length) {
);
// Trigger the first read
readableStreamReadFn(reader, sink);
PromisePrototypeCatch(readableStreamReadFn(reader, sink), (err) => {
PromisePrototypeCatch(reader.cancel(err), () => {});
});
return rid;
}