fix(ext/flash): revert #16284 and add test case (#16576)

This commit is contained in:
Yoshiya Hinosawa 2022-11-09 17:20:05 +09:00 committed by GitHub
parent c08fcd96c1
commit 9edcab524f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 75 deletions

View file

@ -2282,6 +2282,40 @@ Deno.test(
},
);
// Checks large streaming response
// https://github.com/denoland/deno/issues/16567
Deno.test(
{ permissions: { net: true } },
async function testIssue16567() {
const ac = new AbortController();
const promise = deferred();
const server = Deno.serve(() =>
new Response(
new ReadableStream({
start(c) {
// 2MB "a...a" response with 40 chunks
for (const _ of Array(40)) {
c.enqueue(new Uint8Array(50_000).fill(97));
}
c.close();
},
}),
), {
async onListen() {
const res1 = await fetch("http://localhost:9000/");
assertEquals((await res1.text()).length, 40 * 50_000);
promise.resolve();
ac.abort();
},
signal: ac.signal,
});
await promise;
await server;
},
);
function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(r);

View file

@ -371,8 +371,6 @@
}
} else {
const reader = respBody.getReader();
const { value, done } = await reader.read();
// Best case: sends headers + first chunk in a single go.
writeFixedResponse(
serverId,
i,
@ -387,21 +385,14 @@
false,
respondFast,
);
await respondChunked(
i,
value,
done,
);
if (!done) {
while (true) {
const chunk = await reader.read();
await respondChunked(
i,
chunk.value,
chunk.done,
);
if (chunk.done) break;
}
while (true) {
const { value, done } = await reader.read();
await respondChunked(
i,
value,
done,
);
if (done) break;
}
}
}
@ -632,22 +623,13 @@
});
function respondChunked(token, chunk, shutdown) {
const nwritten = core.ops.op_try_flash_respond_chuncked(
return core.opAsync(
"op_flash_respond_chuncked",
serverId,
token,
chunk ?? new Uint8Array(),
chunk,
shutdown,
);
if (nwritten > 0) {
return core.opAsync(
"op_flash_respond_chuncked",
serverId,
token,
chunk,
shutdown,
nwritten,
);
}
}
const fastOp = prepareFastCalls();

View file

@ -150,34 +150,6 @@ async fn op_flash_respond_async(
Ok(())
}
#[op(fast)]
fn op_try_flash_respond_chuncked(
op_state: &mut OpState,
server_id: u32,
token: u32,
response: &[u8],
shutdown: bool,
) -> u32 {
let flash_ctx = op_state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
let tx = ctx.requests.get(&token).unwrap();
let sock = tx.socket();
// TODO(@littledivy): Use writev when `UnixIoSlice` lands.
// https://github.com/denoland/deno/pull/15629
let h = format!("{:x}\r\n", response.len());
let concat = [h.as_bytes(), response, b"\r\n"].concat();
let expected = sock.try_write(&concat);
if expected != concat.len() {
return expected as u32;
}
if shutdown {
// Best case: We've written everything and the stream is done too.
let _ = ctx.requests.remove(&token).unwrap();
}
0
}
#[op]
async fn op_flash_respond_chuncked(
op_state: Rc<RefCell<OpState>>,
@ -185,7 +157,6 @@ async fn op_flash_respond_chuncked(
token: u32,
response: Option<ZeroCopyBuf>,
shutdown: bool,
nwritten: u32,
) -> Result<(), AnyError> {
let mut op_state = op_state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>();
@ -207,27 +178,17 @@ async fn op_flash_respond_chuncked(
.with_async_stream(|stream| {
Box::pin(async move {
use tokio::io::AsyncWriteExt;
// TODO(@littledivy): Use writev when `UnixIoSlice` lands.
// https://github.com/denoland/deno/pull/15629
macro_rules! write_whats_not_written {
($e:expr) => {
let e = $e;
let n = nwritten as usize;
if n < e.len() {
stream.write_all(&e[n..]).await?;
}
};
}
if let Some(response) = response {
let h = format!("{:x}\r\n", response.len());
write_whats_not_written!(h.as_bytes());
write_whats_not_written!(&response);
write_whats_not_written!(b"\r\n");
stream
.write_all(format!("{:x}\r\n", response.len()).as_bytes())
.await?;
stream.write_all(&response).await?;
stream.write_all(b"\r\n").await?;
}
// The last chunk
if shutdown {
write_whats_not_written!(b"0\r\n\r\n");
stream.write_all(b"0\r\n\r\n").await?;
}
Ok(())
@ -1487,7 +1448,6 @@ pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension {
op_flash_respond::decl(),
op_flash_respond_async::decl(),
op_flash_respond_chuncked::decl(),
op_try_flash_respond_chuncked::decl(),
op_flash_method::decl(),
op_flash_path::decl(),
op_flash_headers::decl(),