mirror of
https://github.com/denoland/deno.git
synced 2025-09-27 12:49:10 +00:00
Crashes while running wrk against
js/deps/https/deno.land/std/http/http_bench.ts
This reverts commit 972ac03858
.
This commit is contained in:
parent
a4551c853e
commit
1af02b405e
3 changed files with 40 additions and 110 deletions
|
@ -171,49 +171,12 @@ impl Resource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Track the current task (for TcpListener resource).
|
|
||||||
/// Throws an error if another task is already tracked.
|
|
||||||
pub fn track_task(&mut self) -> Result<(), std::io::Error> {
|
|
||||||
let mut table = RESOURCE_TABLE.lock().unwrap();
|
|
||||||
// Only track if is TcpListener.
|
|
||||||
if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) {
|
|
||||||
// Currently, we only allow tracking a single accept task for a listener.
|
|
||||||
// This might be changed in the future with multiple workers.
|
|
||||||
// Caveat: TcpListener by itself also only tracks an accept task at a time.
|
|
||||||
// See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
|
|
||||||
if t.is_some() {
|
|
||||||
return Err(std::io::Error::new(
|
|
||||||
std::io::ErrorKind::Other,
|
|
||||||
"Another accept task is ongoing",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
t.replace(futures::task::current());
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Stop tracking a task (for TcpListener resource).
|
|
||||||
/// Happens when the task is done and thus no further tracking is needed.
|
|
||||||
pub fn untrack_task(&mut self) {
|
|
||||||
let mut table = RESOURCE_TABLE.lock().unwrap();
|
|
||||||
// Only untrack if is TcpListener.
|
|
||||||
if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) {
|
|
||||||
assert!(t.is_some());
|
|
||||||
t.take();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// close(2) is done by dropping the value. Therefore we just need to remove
|
// close(2) is done by dropping the value. Therefore we just need to remove
|
||||||
// the resource from the RESOURCE_TABLE.
|
// the resource from the RESOURCE_TABLE.
|
||||||
pub fn close(&self) {
|
pub fn close(&self) {
|
||||||
let mut table = RESOURCE_TABLE.lock().unwrap();
|
let mut table = RESOURCE_TABLE.lock().unwrap();
|
||||||
let r = table.remove(&self.rid);
|
let r = table.remove(&self.rid);
|
||||||
assert!(r.is_some());
|
assert!(r.is_some());
|
||||||
// If TcpListener, we must kill all pending accepts!
|
|
||||||
if let Repr::TcpListener(_, Some(t)) = r.unwrap() {
|
|
||||||
// Call notify on the tracked task, so that they would error out.
|
|
||||||
t.notify();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> {
|
pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> {
|
||||||
|
|
|
@ -78,31 +78,14 @@ pub fn accept(r: Resource) -> Accept {
|
||||||
pub struct Accept {
|
pub struct Accept {
|
||||||
state: AcceptState,
|
state: AcceptState,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Future for Accept {
|
impl Future for Accept {
|
||||||
type Item = (TcpStream, SocketAddr);
|
type Item = (TcpStream, SocketAddr);
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
let (stream, addr) = match self.state {
|
let (stream, addr) = match self.state {
|
||||||
// Similar to try_ready!, but also track/untrack accept task
|
AcceptState::Pending(ref mut r) => try_ready!(r.poll_accept()),
|
||||||
// in TcpListener resource.
|
|
||||||
// In this way, when the listener is closed, the task can be
|
|
||||||
// notified to error out (instead of stuck forever).
|
|
||||||
AcceptState::Pending(ref mut r) => match r.poll_accept() {
|
|
||||||
Ok(futures::prelude::Async::Ready(t)) => {
|
|
||||||
r.untrack_task();
|
|
||||||
t
|
|
||||||
}
|
|
||||||
Ok(futures::prelude::Async::NotReady) => {
|
|
||||||
// Would error out if another accept task is being tracked.
|
|
||||||
r.track_task()?;
|
|
||||||
return Ok(futures::prelude::Async::NotReady);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
r.untrack_task();
|
|
||||||
return Err(From::from(e));
|
|
||||||
}
|
|
||||||
},
|
|
||||||
AcceptState::Empty => panic!("poll Accept after it's done"),
|
AcceptState::Empty => panic!("poll Accept after it's done"),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -1,22 +1,6 @@
|
||||||
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
|
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
|
||||||
import { testPerm, assert, assertEquals } from "./test_util.ts";
|
import { testPerm, assert, assertEquals } from "./test_util.ts";
|
||||||
|
|
||||||
function deferred(): {
|
|
||||||
resolve: () => void;
|
|
||||||
reject: () => void;
|
|
||||||
promise: Promise<void>;
|
|
||||||
} {
|
|
||||||
let resolve: () => void;
|
|
||||||
let reject: () => void;
|
|
||||||
const promise = new Promise<void>(
|
|
||||||
(a, b): void => {
|
|
||||||
resolve = a;
|
|
||||||
reject = b;
|
|
||||||
}
|
|
||||||
);
|
|
||||||
return { resolve, reject, promise };
|
|
||||||
}
|
|
||||||
|
|
||||||
testPerm({ net: true }, function netListenClose(): void {
|
testPerm({ net: true }, function netListenClose(): void {
|
||||||
const listener = Deno.listen("tcp", "127.0.0.1:4500");
|
const listener = Deno.listen("tcp", "127.0.0.1:4500");
|
||||||
listener.close();
|
listener.close();
|
||||||
|
@ -88,25 +72,24 @@ testPerm({ net: true }, async function netDialListen(): Promise<void> {
|
||||||
conn.close();
|
conn.close();
|
||||||
});
|
});
|
||||||
|
|
||||||
testPerm({ net: true }, async function netCloseReadSuccess(): Promise<void> {
|
/* TODO Fix broken test.
|
||||||
|
testPerm({ net: true }, async function netCloseReadSuccess() {
|
||||||
const addr = "127.0.0.1:4500";
|
const addr = "127.0.0.1:4500";
|
||||||
const listener = Deno.listen("tcp", addr);
|
const listener = Deno.listen("tcp", addr);
|
||||||
const closeDeferred = deferred();
|
const closeDeferred = deferred();
|
||||||
const closeReadDeferred = deferred();
|
const closeReadDeferred = deferred();
|
||||||
listener.accept().then(
|
listener.accept().then(async conn => {
|
||||||
async (conn): Promise<void> => {
|
await closeReadDeferred.promise;
|
||||||
await closeReadDeferred.promise;
|
await conn.write(new Uint8Array([1, 2, 3]));
|
||||||
await conn.write(new Uint8Array([1, 2, 3]));
|
const buf = new Uint8Array(1024);
|
||||||
const buf = new Uint8Array(1024);
|
const readResult = await conn.read(buf);
|
||||||
const readResult = await conn.read(buf);
|
assertEquals(3, readResult.nread);
|
||||||
assertEquals(3, readResult.nread);
|
assertEquals(4, buf[0]);
|
||||||
assertEquals(4, buf[0]);
|
assertEquals(5, buf[1]);
|
||||||
assertEquals(5, buf[1]);
|
assertEquals(6, buf[2]);
|
||||||
assertEquals(6, buf[2]);
|
conn.close();
|
||||||
conn.close();
|
closeDeferred.resolve();
|
||||||
closeDeferred.resolve();
|
});
|
||||||
}
|
|
||||||
);
|
|
||||||
const conn = await Deno.dial("tcp", addr);
|
const conn = await Deno.dial("tcp", addr);
|
||||||
conn.closeRead(); // closing read
|
conn.closeRead(); // closing read
|
||||||
closeReadDeferred.resolve();
|
closeReadDeferred.resolve();
|
||||||
|
@ -120,18 +103,18 @@ testPerm({ net: true }, async function netCloseReadSuccess(): Promise<void> {
|
||||||
listener.close();
|
listener.close();
|
||||||
conn.close();
|
conn.close();
|
||||||
});
|
});
|
||||||
|
*/
|
||||||
|
|
||||||
testPerm({ net: true }, async function netDoubleCloseRead(): Promise<void> {
|
/* TODO Fix broken test.
|
||||||
|
testPerm({ net: true }, async function netDoubleCloseRead() {
|
||||||
const addr = "127.0.0.1:4500";
|
const addr = "127.0.0.1:4500";
|
||||||
const listener = Deno.listen("tcp", addr);
|
const listener = Deno.listen("tcp", addr);
|
||||||
const closeDeferred = deferred();
|
const closeDeferred = deferred();
|
||||||
listener.accept().then(
|
listener.accept().then(async conn => {
|
||||||
async (conn): Promise<void> => {
|
await conn.write(new Uint8Array([1, 2, 3]));
|
||||||
await conn.write(new Uint8Array([1, 2, 3]));
|
await closeDeferred.promise;
|
||||||
await closeDeferred.promise;
|
conn.close();
|
||||||
conn.close();
|
});
|
||||||
}
|
|
||||||
);
|
|
||||||
const conn = await Deno.dial("tcp", addr);
|
const conn = await Deno.dial("tcp", addr);
|
||||||
conn.closeRead(); // closing read
|
conn.closeRead(); // closing read
|
||||||
let err;
|
let err;
|
||||||
|
@ -148,18 +131,18 @@ testPerm({ net: true }, async function netDoubleCloseRead(): Promise<void> {
|
||||||
listener.close();
|
listener.close();
|
||||||
conn.close();
|
conn.close();
|
||||||
});
|
});
|
||||||
|
*/
|
||||||
|
|
||||||
testPerm({ net: true }, async function netCloseWriteSuccess(): Promise<void> {
|
/* TODO Fix broken test.
|
||||||
|
testPerm({ net: true }, async function netCloseWriteSuccess() {
|
||||||
const addr = "127.0.0.1:4500";
|
const addr = "127.0.0.1:4500";
|
||||||
const listener = Deno.listen("tcp", addr);
|
const listener = Deno.listen("tcp", addr);
|
||||||
const closeDeferred = deferred();
|
const closeDeferred = deferred();
|
||||||
listener.accept().then(
|
listener.accept().then(async conn => {
|
||||||
async (conn): Promise<void> => {
|
await conn.write(new Uint8Array([1, 2, 3]));
|
||||||
await conn.write(new Uint8Array([1, 2, 3]));
|
await closeDeferred.promise;
|
||||||
await closeDeferred.promise;
|
conn.close();
|
||||||
conn.close();
|
});
|
||||||
}
|
|
||||||
);
|
|
||||||
const conn = await Deno.dial("tcp", addr);
|
const conn = await Deno.dial("tcp", addr);
|
||||||
conn.closeWrite(); // closing write
|
conn.closeWrite(); // closing write
|
||||||
const buf = new Uint8Array(1024);
|
const buf = new Uint8Array(1024);
|
||||||
|
@ -183,17 +166,17 @@ testPerm({ net: true }, async function netCloseWriteSuccess(): Promise<void> {
|
||||||
listener.close();
|
listener.close();
|
||||||
conn.close();
|
conn.close();
|
||||||
});
|
});
|
||||||
|
*/
|
||||||
|
|
||||||
testPerm({ net: true }, async function netDoubleCloseWrite(): Promise<void> {
|
/* TODO Fix broken test.
|
||||||
|
testPerm({ net: true }, async function netDoubleCloseWrite() {
|
||||||
const addr = "127.0.0.1:4500";
|
const addr = "127.0.0.1:4500";
|
||||||
const listener = Deno.listen("tcp", addr);
|
const listener = Deno.listen("tcp", addr);
|
||||||
const closeDeferred = deferred();
|
const closeDeferred = deferred();
|
||||||
listener.accept().then(
|
listener.accept().then(async conn => {
|
||||||
async (conn): Promise<void> => {
|
await closeDeferred.promise;
|
||||||
await closeDeferred.promise;
|
conn.close();
|
||||||
conn.close();
|
});
|
||||||
}
|
|
||||||
);
|
|
||||||
const conn = await Deno.dial("tcp", addr);
|
const conn = await Deno.dial("tcp", addr);
|
||||||
conn.closeWrite(); // closing write
|
conn.closeWrite(); // closing write
|
||||||
let err;
|
let err;
|
||||||
|
@ -210,3 +193,4 @@ testPerm({ net: true }, async function netDoubleCloseWrite(): Promise<void> {
|
||||||
listener.close();
|
listener.close();
|
||||||
conn.close();
|
conn.close();
|
||||||
});
|
});
|
||||||
|
*/
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue