diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 2ad350c553..c9fab6364a 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -779,37 +779,35 @@ class ResourceStreamResourceSink { * @param {any} sink * @param {Uint8Array} chunk */ -function readableStreamWriteChunkFn(reader, sink, chunk) { +async function readableStreamWriteChunkFn(reader, sink, chunk) { // Empty chunk. Re-read. if (chunk.length == 0) { - readableStreamReadFn(reader, sink); + await readableStreamReadFn(reader, sink); return; } const res = op_readable_stream_resource_write_sync(sink.external, chunk); if (res == 0) { // Closed - reader.cancel("resource closed"); + await reader.cancel("resource closed"); sink.close(); } else if (res == 1) { // Successfully written (synchronous). Re-read. - readableStreamReadFn(reader, sink); + await readableStreamReadFn(reader, sink); } else if (res == 2) { // Full. If the channel is full, we perform an async await until we can write, and then return // to a synchronous loop. - (async () => { - if ( - await op_readable_stream_resource_write_buf( - sink.external, - chunk, - ) - ) { - readableStreamReadFn(reader, sink); - } else { - reader.cancel("resource closed"); - sink.close(); - } - })(); + if ( + await op_readable_stream_resource_write_buf( + sink.external, + chunk, + ) + ) { + await readableStreamReadFn(reader, sink); + } else { + await reader.cancel("resource closed"); + sink.close(); + } } } @@ -822,17 +820,23 @@ function readableStreamReadFn(reader, sink) { // real resource. let reentrant = true; let gotChunk = undefined; + const promise = new Deferred(); readableStreamDefaultReaderRead(reader, { chunkSteps(chunk) { // If the chunk has non-zero length, write it if (reentrant) { gotChunk = chunk; } else { - readableStreamWriteChunkFn(reader, sink, chunk); + PromisePrototypeThen( + readableStreamWriteChunkFn(reader, sink, chunk), + () => promise.resolve(), + (e) => promise.reject(e), + ); } }, closeSteps() { sink.close(); + promise.resolve(); }, errorSteps(error) { const success = op_readable_stream_resource_write_error( @@ -842,15 +846,29 @@ function readableStreamReadFn(reader, sink) { // We don't cancel the reader if there was an error reading. We'll let the downstream // consumer close the resource after it receives the error. if (!success) { - reader.cancel("resource closed"); + PromisePrototypeThen( + reader.cancel("resource closed"), + () => { + sink.close(); + promise.resolve(); + }, + (e) => promise.reject(e), + ); + } else { + sink.close(); + promise.resolve(); } - sink.close(); }, }); reentrant = false; if (gotChunk) { - readableStreamWriteChunkFn(reader, sink, gotChunk); + PromisePrototypeThen( + readableStreamWriteChunkFn(reader, sink, gotChunk), + () => promise.resolve(), + (e) => promise.reject(e), + ); } + return promise.promise; } /** @@ -873,7 +891,9 @@ function resourceForReadableStream(stream, length) { PromisePrototypeCatch( PromisePrototypeThen( op_readable_stream_resource_await_close(rid), - () => reader.cancel("resource closed"), + () => { + PromisePrototypeCatch(reader.cancel("resource closed"), () => {}); + }, ), () => {}, ); @@ -884,7 +904,9 @@ function resourceForReadableStream(stream, length) { ); // Trigger the first read - readableStreamReadFn(reader, sink); + PromisePrototypeCatch(readableStreamReadFn(reader, sink), (err) => { + PromisePrototypeCatch(reader.cancel(err), () => {}); + }); return rid; } diff --git a/tests/unit/streams_test.ts b/tests/unit/streams_test.ts index 84a87d166d..d0a457ddc4 100644 --- a/tests/unit/streams_test.ts +++ b/tests/unit/streams_test.ts @@ -547,3 +547,107 @@ Deno.test(function readableStreamFromWithStringThrows() { "Failed to execute 'ReadableStream.from': Argument 1 can not be converted to async iterable.", ); }); + +Deno.test(async function readableStreamFromWithStringThrows() { + const serverPort = 4592; + const upstreamServerPort = 4593; + + const stopSignal = new AbortController(); + const promise = Promise.withResolvers(); + // Response transforming server that crashes with an uncaught AbortError. + function startServer() { + Deno.serve({ port: serverPort, signal: stopSignal.signal }, async (req) => { + const upstreamResponse = await fetch( + `http://localhost:${upstreamServerPort}`, + req, + ); + + // Use a TransformStream to convert the response body to uppercase. + const transformStream = new TransformStream({ + transform(chunk, controller) { + const decoder = new TextDecoder(); + const encoder = new TextEncoder(); + const chunk2 = encoder.encode(decoder.decode(chunk).toUpperCase()); + controller.enqueue(chunk2); + }, + }); + + upstreamResponse.body?.pipeTo(transformStream.writable).catch(() => {}); + + return new Response(transformStream.readable); + }); + } + + // ==== THE ISSUE IS NOT IN THE CODE BELOW ==== + + // Upstream server that sends a response with a body that never ends. + // This is not where the error happens (it handlers the cancellation correctly). + function startUpstreamServer() { + Deno.serve({ port: upstreamServerPort, signal: stopSignal.signal }, (_) => { + // Create an infinite readable stream that emits 'a' + let pushTimeout: number | null = null; + const readableStream = new ReadableStream({ + start(controller) { + const encoder = new TextEncoder(); + const chunk = encoder.encode("a"); + + function push() { + controller.enqueue(chunk); + pushTimeout = setTimeout(push, 100); + } + + push(); + }, + + cancel(reason) { + assertEquals(reason, "resource closed"); + promise.resolve(undefined); + clearTimeout(pushTimeout!); + }, + }); + + return new Response(readableStream, { + headers: { "Content-Type": "text/plain" }, + }); + }); + } + + // The client is just there to simulate a client that cancels a request. + async function startClient() { + const controller = new AbortController(); + const signal = controller.signal; + + try { + const response = await fetch(`http://localhost:${serverPort}`, { + signal, + }); + const reader = response.body?.getReader(); + if (!reader) { + throw new Error("client: failed to get reader from response"); + } + + let received = 0; + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + received += value.length; + + if (received >= 5) { + controller.abort(); + break; + } + } + } catch (_) { + // + } + } + + startUpstreamServer(); + startServer(); + const p = startClient(); + + await promise.promise; + stopSignal.abort(); + await p; +});