upgrade: Tokio 0.2 (#3418)

This commit is contained in:
Bartek Iwańczuk 2019-12-30 14:57:17 +01:00 committed by GitHub
parent df1665a8fc
commit 46d76a7562
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 850 additions and 943 deletions

1005
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -28,12 +28,13 @@ deno_typescript = { path = "../deno_typescript", version = "0.27.0" }
ansi_term = "0.12.1" ansi_term = "0.12.1"
atty = "0.2.13" atty = "0.2.13"
base64 = "0.11.0" base64 = "0.11.0"
bytes = "0.5"
byteorder = "1.3.2" byteorder = "1.3.2"
clap = "2.33.0" clap = "2.33.0"
dirs = "2.0.2" dirs = "2.0.2"
dlopen = "0.1.8" dlopen = "0.1.8"
futures = { version = "0.3", features = [ "compat", "io-compat" ] } futures = { version = "0.3", features = [ "compat", "io-compat" ] }
http = "0.1.19" http = "0.2"
hyper = "0.12.35" hyper = "0.12.35"
hyper-rustls = "0.17.1" hyper-rustls = "0.17.1"
indexmap = "1.3.0" indexmap = "1.3.0"
@ -43,7 +44,7 @@ log = "0.4.8"
rand = "0.7.2" rand = "0.7.2"
regex = "1.3.1" regex = "1.3.1"
remove_dir_all = "0.5.2" remove_dir_all = "0.5.2"
reqwest = { version = "0.9.22", default-features = false, features = ["rustls-tls"] } reqwest = { git = "https://github.com/seanmonstar/reqwest.git", rev = "0ab5df3", features = ["rustls-tls", "stream"] }
ring = "0.16.9" ring = "0.16.9"
rustyline = "5.0.4" rustyline = "5.0.4"
serde = { version = "1.0.102", features = ["derive"] } serde = { version = "1.0.102", features = ["derive"] }
@ -53,11 +54,10 @@ source-map-mappings = "0.5.0"
sys-info = "0.5.8" sys-info = "0.5.8"
tempfile = "3.1.0" tempfile = "3.1.0"
termcolor = "1.0.5" termcolor = "1.0.5"
tokio = "0.1.22" tokio = { version = "0.2.6", features = ["full"] }
tokio-executor = "0.1.8" tokio-executor = "0.1.8"
tokio-process = "0.2.4" tokio-rustls = "0.12.0"
tokio-rustls = "0.10.2" url = "2.1"
url = "1.7.2"
utime = "0.2.1" utime = "0.2.1"
webpki = "0.21.0" webpki = "0.21.0"
webpki-roots = "0.17.0" webpki-roots = "0.17.0"

View file

@ -211,6 +211,7 @@ impl GetErrorKind for url::ParseError {
} }
RelativeUrlWithoutBase => ErrorKind::RelativeUrlWithoutBase, RelativeUrlWithoutBase => ErrorKind::RelativeUrlWithoutBase,
SetHostOnCannotBeABaseUrl => ErrorKind::SetHostOnCannotBeABaseUrl, SetHostOnCannotBeABaseUrl => ErrorKind::SetHostOnCannotBeABaseUrl,
_ => ErrorKind::Other,
} }
} }
} }
@ -231,7 +232,7 @@ impl GetErrorKind for reqwest::Error {
fn kind(&self) -> ErrorKind { fn kind(&self) -> ErrorKind {
use self::GetErrorKind as Get; use self::GetErrorKind as Get;
match self.get_ref() { match self.source() {
Some(err_ref) => None Some(err_ref) => None
.or_else(|| err_ref.downcast_ref::<hyper::Error>().map(Get::kind)) .or_else(|| err_ref.downcast_ref::<hyper::Error>().map(Get::kind))
.or_else(|| err_ref.downcast_ref::<url::ParseError>().map(Get::kind)) .or_else(|| err_ref.downcast_ref::<url::ParseError>().map(Get::kind))
@ -242,7 +243,7 @@ impl GetErrorKind for reqwest::Error {
.map(Get::kind) .map(Get::kind)
}) })
.unwrap_or_else(|| ErrorKind::HttpOther), .unwrap_or_else(|| ErrorKind::HttpOther),
_ => ErrorKind::HttpOther, None => ErrorKind::HttpOther,
} }
} }
} }

View file

@ -13,7 +13,7 @@ use futures::channel::oneshot;
use futures::future::FutureExt; use futures::future::FutureExt;
use std::future::Future; use std::future::Future;
use std::time::Instant; use std::time::Instant;
use tokio::timer::Delay; use tokio;
#[derive(Default)] #[derive(Default)]
pub struct GlobalTimer { pub struct GlobalTimer {
@ -43,8 +43,7 @@ impl GlobalTimer {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.tx = Some(tx); self.tx = Some(tx);
let delay = futures::compat::Compat01As03::new(Delay::new(deadline)) let delay = tokio::time::delay_until(deadline.into());
.map_err(|err| panic!("Unexpected error in timeout {:?}", err));
let rx = rx let rx = rx
.map_err(|err| panic!("Unexpected error in receiving channel {:?}", err)); .map_err(|err| panic!("Unexpected error in receiving channel {:?}", err));

View file

@ -1,28 +1,31 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use bytes::Bytes;
use futures::io::AsyncRead; use futures::Stream;
use futures::stream::StreamExt; use futures::StreamExt;
use reqwest::r#async::Chunk; use reqwest;
use reqwest::r#async::Decoder;
use std::cmp::min; use std::cmp::min;
use std::io; use std::io;
use std::io::Read; use std::io::Read;
use std::pin::Pin; use std::pin::Pin;
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
use tokio::io::AsyncRead;
/// Wraps `reqwest::Decoder` so that it can be exposed as an `AsyncRead` and integrated // TODO(bartlomieju): most of this stuff can be moved to `cli/ops/fetch.rs`
type ReqwestStream = Pin<Box<dyn Stream<Item = reqwest::Result<Bytes>> + Send>>;
/// Wraps `ReqwestStream` so that it can be exposed as an `AsyncRead` and integrated
/// into resources more easily. /// into resources more easily.
pub struct HttpBody { pub struct HttpBody {
decoder: futures::compat::Compat01As03<Decoder>, stream: ReqwestStream,
chunk: Option<Chunk>, chunk: Option<Bytes>,
pos: usize, pos: usize,
} }
impl HttpBody { impl HttpBody {
pub fn from(body: Decoder) -> Self { pub fn from(body: ReqwestStream) -> Self {
Self { Self {
decoder: futures::compat::Compat01As03::new(body), stream: body,
chunk: None, chunk: None,
pos: 0, pos: 0,
} }
@ -65,10 +68,10 @@ impl AsyncRead for HttpBody {
assert_eq!(inner.pos, 0); assert_eq!(inner.pos, 0);
} }
let p = inner.decoder.poll_next_unpin(cx); let p = inner.stream.poll_next_unpin(cx);
match p { match p {
Poll::Ready(Some(Err(e))) => Poll::Ready(Err( Poll::Ready(Some(Err(e))) => Poll::Ready(Err(
// TODO Need to map hyper::Error into std::io::Error. // TODO(bartlomieju): rewrite it to use ErrBox
io::Error::new(io::ErrorKind::Other, e), io::Error::new(io::ErrorKind::Other, e),
)), )),
Poll::Ready(Some(Ok(chunk))) => { Poll::Ready(Some(Ok(chunk))) => {

View file

@ -3,18 +3,15 @@ use crate::deno_error;
use crate::deno_error::DenoError; use crate::deno_error::DenoError;
use crate::version; use crate::version;
use deno::ErrBox; use deno::ErrBox;
use futures::future;
use futures::future::FutureExt; use futures::future::FutureExt;
use futures::future::TryFutureExt;
use reqwest; use reqwest;
use reqwest::header::HeaderMap; use reqwest::header::HeaderMap;
use reqwest::header::CONTENT_TYPE; use reqwest::header::CONTENT_TYPE;
use reqwest::header::LOCATION; use reqwest::header::LOCATION;
use reqwest::header::USER_AGENT; use reqwest::header::USER_AGENT;
use reqwest::r#async::Client; use reqwest::redirect::Policy;
use reqwest::RedirectPolicy; use reqwest::Client;
use std::future::Future; use std::future::Future;
use std::pin::Pin;
use url::Url; use url::Url;
/// Create new instance of async reqwest::Client. This client supports /// Create new instance of async reqwest::Client. This client supports
@ -26,9 +23,9 @@ pub fn get_client() -> Client {
format!("Deno/{}", version::DENO).parse().unwrap(), format!("Deno/{}", version::DENO).parse().unwrap(),
); );
Client::builder() Client::builder()
.redirect(RedirectPolicy::none()) .redirect(Policy::none())
.default_headers(headers) .default_headers(headers)
.use_sys_proxy() .use_rustls_tls()
.build() .build()
.unwrap() .unwrap()
} }
@ -75,17 +72,12 @@ pub enum FetchOnceResult {
pub fn fetch_string_once( pub fn fetch_string_once(
url: &Url, url: &Url,
) -> impl Future<Output = Result<FetchOnceResult, ErrBox>> { ) -> impl Future<Output = Result<FetchOnceResult, ErrBox>> {
type FetchAttempt = (Option<String>, Option<String>, Option<FetchOnceResult>);
let url = url.clone(); let url = url.clone();
let client = get_client(); let client = get_client();
futures::compat::Compat01As03::new(client.get(url.clone()).send()) let fut = async move {
.map_err(ErrBox::from) let response = client.get(url.clone()).send().await?;
.and_then(
move |mut response| -> Pin<
Box<dyn Future<Output = Result<FetchAttempt, ErrBox>> + Send>,
> {
if response.status().is_redirection() { if response.status().is_redirection() {
let location_string = response let location_string = response
.headers() .headers()
@ -96,26 +88,17 @@ pub fn fetch_string_once(
debug!("Redirecting to {:?}...", &location_string); debug!("Redirecting to {:?}...", &location_string);
let new_url = resolve_url_from_location(&url, location_string); let new_url = resolve_url_from_location(&url, location_string);
// Boxed trait object turns out to be the savior for 2+ types yielding same results. return Ok(FetchOnceResult::Redirect(new_url));
return futures::future::try_join3(
future::ok(None),
future::ok(None),
future::ok(Some(FetchOnceResult::Redirect(new_url))),
)
.boxed();
} }
if response.status().is_client_error() if response.status().is_client_error()
|| response.status().is_server_error() || response.status().is_server_error()
{ {
return future::err( let err = DenoError::new(
DenoError::new(
deno_error::ErrorKind::Other, deno_error::ErrorKind::Other,
format!("Import '{}' failed: {}", &url, response.status()), format!("Import '{}' failed: {}", &url, response.status()),
) );
.into(), return Err(err.into());
)
.boxed();
} }
let content_type = response let content_type = response
@ -123,29 +106,11 @@ pub fn fetch_string_once(
.get(CONTENT_TYPE) .get(CONTENT_TYPE)
.map(|content_type| content_type.to_str().unwrap().to_owned()); .map(|content_type| content_type.to_str().unwrap().to_owned());
let body = futures::compat::Compat01As03::new(response.text()) let body = response.text().await?;
.map_ok(Some) return Ok(FetchOnceResult::Code(body, content_type));
.map_err(ErrBox::from); };
futures::future::try_join3( fut.boxed()
body,
future::ok(content_type),
future::ok(None),
)
.boxed()
},
)
.and_then(move |(maybe_code, maybe_content_type, maybe_redirect)| {
if let Some(redirect) = maybe_redirect {
future::ok(redirect)
} else {
// maybe_code should always contain code here!
future::ok(FetchOnceResult::Code(
maybe_code.unwrap(),
maybe_content_type,
))
}
})
} }
#[cfg(test)] #[cfg(test)]

View file

@ -166,7 +166,10 @@ testPerm({ read: true, net: true }, async function dialAndListenTLS(): Promise<
assert(conn.remoteAddr != null); assert(conn.remoteAddr != null);
assert(conn.localAddr != null); assert(conn.localAddr != null);
await conn.write(response); await conn.write(response);
// TODO(bartlomieju): this might be a bug
setTimeout(() => {
conn.close(); conn.close();
}, 0);
} }
); );

View file

@ -7,7 +7,7 @@ use crate::ops::json_op;
use crate::state::ThreadSafeState; use crate::state::ThreadSafeState;
use deno::*; use deno::*;
use futures::future::FutureExt; use futures::future::FutureExt;
use futures::future::TryFutureExt; use futures::StreamExt;
use http::header::HeaderName; use http::header::HeaderName;
use http::header::HeaderValue; use http::header::HeaderValue;
use http::Method; use http::Method;
@ -56,9 +56,9 @@ pub fn op_fetch(
} }
debug!("Before fetch {}", url); debug!("Before fetch {}", url);
let state_ = state.clone(); let state_ = state.clone();
let future = futures::compat::Compat01As03::new(request.send())
.map_err(ErrBox::from) let future = async move {
.and_then(move |res| { let res = request.send().await?;
debug!("Fetch response {}", url); debug!("Fetch response {}", url);
let status = res.status(); let status = res.status();
let mut res_headers = Vec::new(); let mut res_headers = Vec::new();
@ -66,7 +66,7 @@ pub fn op_fetch(
res_headers.push((key.to_string(), val.to_str().unwrap().to_owned())); res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
} }
let body = HttpBody::from(res.into_body()); let body = HttpBody::from(res.bytes_stream().boxed());
let mut table = state_.lock_resource_table(); let mut table = state_.lock_resource_table();
let rid = table.add( let rid = table.add(
"httpBody", "httpBody",
@ -80,8 +80,8 @@ pub fn op_fetch(
"headers": res_headers "headers": res_headers
}); });
futures::future::ok(json_res) Ok(json_res)
}); };
Ok(JsonOp::Async(future.boxed())) Ok(JsonOp::Async(future.boxed()))
} }

View file

@ -9,14 +9,9 @@ use crate::ops::json_op;
use crate::state::ThreadSafeState; use crate::state::ThreadSafeState;
use deno::*; use deno::*;
use futures::future::FutureExt; use futures::future::FutureExt;
use futures::future::TryFutureExt;
use std; use std;
use std::convert::From; use std::convert::From;
use std::future::Future;
use std::io::SeekFrom; use std::io::SeekFrom;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio; use tokio;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
@ -92,21 +87,19 @@ fn op_open(
} }
let is_sync = args.promise_id.is_none(); let is_sync = args.promise_id.is_none();
let op = futures::compat::Compat01As03::new(tokio::prelude::Future::map_err(
open_options.open(filename), let fut = async move {
ErrBox::from, let fs_file = open_options.open(filename).await?;
))
.and_then(move |fs_file| {
let mut table = state_.lock_resource_table(); let mut table = state_.lock_resource_table();
let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file))); let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file)));
futures::future::ok(json!(rid)) Ok(json!(rid))
}); };
if is_sync { if is_sync {
let buf = futures::executor::block_on(op)?; let buf = futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(buf)) Ok(JsonOp::Sync(buf))
} else { } else {
Ok(JsonOp::Async(op.boxed())) Ok(JsonOp::Async(fut.boxed()))
} }
} }
@ -127,37 +120,6 @@ fn op_close(
Ok(JsonOp::Sync(json!({}))) Ok(JsonOp::Sync(json!({})))
} }
pub struct SeekFuture {
seek_from: SeekFrom,
rid: ResourceId,
state: ThreadSafeState,
}
impl Future for SeekFuture {
type Output = Result<u64, ErrBox>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
let mut table = inner.state.lock_resource_table();
let resource = table
.get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?;
let tokio_file = match resource {
StreamResource::FsFile(ref mut file) => file,
_ => return Poll::Ready(Err(bad_resource())),
};
use tokio::prelude::Async::*;
match tokio_file.poll_seek(inner.seek_from).map_err(ErrBox::from) {
Ok(Ready(v)) => Poll::Ready(Ok(v)),
Err(err) => Poll::Ready(Err(err)),
Ok(NotReady) => Poll::Pending,
}
}
}
#[derive(Deserialize)] #[derive(Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct SeekArgs { struct SeekArgs {
@ -189,17 +151,26 @@ fn op_seek(
} }
}; };
let fut = SeekFuture { let mut table = state.lock_resource_table();
state: state.clone(), let resource = table
seek_from, .get_mut::<StreamResource>(rid)
rid, .ok_or_else(bad_resource)?;
let tokio_file = match resource {
StreamResource::FsFile(ref mut file) => file,
_ => return Err(bad_resource()),
};
let mut file = futures::executor::block_on(tokio_file.try_clone())?;
let fut = async move {
file.seek(seek_from).await?;
Ok(json!({}))
}; };
let op = fut.and_then(move |_| futures::future::ok(json!({})));
if args.promise_id.is_none() { if args.promise_id.is_none() {
let buf = futures::executor::block_on(op)?; let buf = futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(buf)) Ok(JsonOp::Sync(buf))
} else { } else {
Ok(JsonOp::Async(op.boxed())) Ok(JsonOp::Async(fut.boxed()))
} }
} }

View file

@ -8,18 +8,15 @@ use deno::ErrBox;
use deno::Resource; use deno::Resource;
use deno::*; use deno::*;
use futures; use futures;
use futures::compat::AsyncRead01CompatExt;
use futures::compat::AsyncWrite01CompatExt;
use futures::future::FutureExt; use futures::future::FutureExt;
use futures::io::{AsyncRead, AsyncWrite};
use std; use std;
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
use tokio; use tokio;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_process;
use tokio_rustls::client::TlsStream as ClientTlsStream; use tokio_rustls::client::TlsStream as ClientTlsStream;
use tokio_rustls::server::TlsStream as ServerTlsStream; use tokio_rustls::server::TlsStream as ServerTlsStream;
@ -86,9 +83,9 @@ pub enum StreamResource {
ServerTlsStream(Box<ServerTlsStream<TcpStream>>), ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
ClientTlsStream(Box<ClientTlsStream<TcpStream>>), ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
HttpBody(Box<HttpBody>), HttpBody(Box<HttpBody>),
ChildStdin(tokio_process::ChildStdin), ChildStdin(tokio::process::ChildStdin),
ChildStdout(tokio_process::ChildStdout), ChildStdout(tokio::process::ChildStdout),
ChildStderr(tokio_process::ChildStderr), ChildStderr(tokio::process::ChildStderr),
} }
impl Resource for StreamResource {} impl Resource for StreamResource {}
@ -111,22 +108,14 @@ impl DenoAsyncRead for StreamResource {
) -> Poll<Result<usize, ErrBox>> { ) -> Poll<Result<usize, ErrBox>> {
let inner = self.get_mut(); let inner = self.get_mut();
let mut f: Box<dyn AsyncRead + Unpin> = match inner { let mut f: Box<dyn AsyncRead + Unpin> = match inner {
StreamResource::FsFile(f) => Box::new(AsyncRead01CompatExt::compat(f)), StreamResource::FsFile(f) => Box::new(f),
StreamResource::Stdin(f) => Box::new(AsyncRead01CompatExt::compat(f)), StreamResource::Stdin(f) => Box::new(f),
StreamResource::TcpStream(f) => Box::new(AsyncRead01CompatExt::compat(f)), StreamResource::TcpStream(f) => Box::new(f),
StreamResource::ClientTlsStream(f) => { StreamResource::ClientTlsStream(f) => Box::new(f),
Box::new(AsyncRead01CompatExt::compat(f)) StreamResource::ServerTlsStream(f) => Box::new(f),
} StreamResource::ChildStdout(f) => Box::new(f),
StreamResource::ServerTlsStream(f) => { StreamResource::ChildStderr(f) => Box::new(f),
Box::new(AsyncRead01CompatExt::compat(f))
}
StreamResource::HttpBody(f) => Box::new(f), StreamResource::HttpBody(f) => Box::new(f),
StreamResource::ChildStdout(f) => {
Box::new(AsyncRead01CompatExt::compat(f))
}
StreamResource::ChildStderr(f) => {
Box::new(AsyncRead01CompatExt::compat(f))
}
_ => { _ => {
return Poll::Ready(Err(bad_resource())); return Poll::Ready(Err(bad_resource()));
} }
@ -145,6 +134,7 @@ impl DenoAsyncRead for StreamResource {
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
enum IoState { enum IoState {
Pending, Pending,
Flush,
Done, Done,
} }
@ -237,6 +227,11 @@ pub trait DenoAsyncWrite {
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context, cx: &mut Context,
) -> Poll<Result<(), ErrBox>>; ) -> Poll<Result<(), ErrBox>>;
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), ErrBox>>;
} }
impl DenoAsyncWrite for StreamResource { impl DenoAsyncWrite for StreamResource {
@ -247,21 +242,13 @@ impl DenoAsyncWrite for StreamResource {
) -> Poll<Result<usize, ErrBox>> { ) -> Poll<Result<usize, ErrBox>> {
let inner = self.get_mut(); let inner = self.get_mut();
let mut f: Box<dyn AsyncWrite + Unpin> = match inner { let mut f: Box<dyn AsyncWrite + Unpin> = match inner {
StreamResource::FsFile(f) => Box::new(AsyncWrite01CompatExt::compat(f)), StreamResource::FsFile(f) => Box::new(f),
StreamResource::Stdout(f) => Box::new(AsyncWrite01CompatExt::compat(f)), StreamResource::Stdout(f) => Box::new(f),
StreamResource::Stderr(f) => Box::new(AsyncWrite01CompatExt::compat(f)), StreamResource::Stderr(f) => Box::new(f),
StreamResource::TcpStream(f) => { StreamResource::TcpStream(f) => Box::new(f),
Box::new(AsyncWrite01CompatExt::compat(f)) StreamResource::ClientTlsStream(f) => Box::new(f),
} StreamResource::ServerTlsStream(f) => Box::new(f),
StreamResource::ClientTlsStream(f) => { StreamResource::ChildStdin(f) => Box::new(f),
Box::new(AsyncWrite01CompatExt::compat(f))
}
StreamResource::ServerTlsStream(f) => {
Box::new(AsyncWrite01CompatExt::compat(f))
}
StreamResource::ChildStdin(f) => {
Box::new(AsyncWrite01CompatExt::compat(f))
}
_ => { _ => {
return Poll::Ready(Err(bad_resource())); return Poll::Ready(Err(bad_resource()));
} }
@ -276,6 +263,33 @@ impl DenoAsyncWrite for StreamResource {
} }
} }
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), ErrBox>> {
let inner = self.get_mut();
let mut f: Box<dyn AsyncWrite + Unpin> = match inner {
StreamResource::FsFile(f) => Box::new(f),
StreamResource::Stdout(f) => Box::new(f),
StreamResource::Stderr(f) => Box::new(f),
StreamResource::TcpStream(f) => Box::new(f),
StreamResource::ClientTlsStream(f) => Box::new(f),
StreamResource::ServerTlsStream(f) => Box::new(f),
StreamResource::ChildStdin(f) => Box::new(f),
_ => {
return Poll::Ready(Err(bad_resource()));
}
};
let r = AsyncWrite::poll_flush(Pin::new(&mut f), cx);
match r {
Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))),
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
Poll::Pending => Poll::Pending,
}
}
fn poll_close( fn poll_close(
self: Pin<&mut Self>, self: Pin<&mut Self>,
_cx: &mut Context, _cx: &mut Context,
@ -290,6 +304,7 @@ pub struct Write<T> {
buf: T, buf: T,
io_state: IoState, io_state: IoState,
state: ThreadSafeState, state: ThreadSafeState,
nwritten: i32,
} }
/// Creates a future that will write some of the buffer `buf` to /// Creates a future that will write some of the buffer `buf` to
@ -306,6 +321,7 @@ where
buf, buf,
io_state: IoState::Pending, io_state: IoState::Pending,
state: state.clone(), state: state.clone(),
nwritten: 0,
} }
} }
@ -323,10 +339,12 @@ where
panic!("poll a Read after it's done"); panic!("poll a Read after it's done");
} }
if inner.io_state == IoState::Pending {
let mut table = inner.state.lock_resource_table(); let mut table = inner.state.lock_resource_table();
let resource = table let resource = table
.get_mut::<StreamResource>(inner.rid) .get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?; .ok_or_else(bad_resource)?;
let nwritten = match DenoAsyncWrite::poll_write( let nwritten = match DenoAsyncWrite::poll_write(
Pin::new(resource), Pin::new(resource),
cx, cx,
@ -336,8 +354,29 @@ where
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
}; };
inner.io_state = IoState::Flush;
inner.nwritten = nwritten as i32;
}
// TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
// and the reasons for the need to explicitly flush are not fully known.
// Figure out why it's needed and preferably remove it.
// https://github.com/denoland/deno/issues/3565
if inner.io_state == IoState::Flush {
let mut table = inner.state.lock_resource_table();
let resource = table
.get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?;
match DenoAsyncWrite::poll_flush(Pin::new(resource), cx) {
Poll::Ready(Ok(_)) => {
inner.io_state = IoState::Done; inner.io_state = IoState::Done;
Poll::Ready(Ok(nwritten as i32)) }
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
};
}
Poll::Ready(Ok(inner.nwritten))
} }
} }

View file

@ -9,8 +9,6 @@ use deno::Resource;
use deno::*; use deno::*;
use futures::future::FutureExt; use futures::future::FutureExt;
use futures::future::TryFutureExt; use futures::future::TryFutureExt;
use futures::stream::StreamExt;
use futures::stream::TryStreamExt;
use std; use std;
use std::convert::From; use std::convert::From;
use std::future::Future; use std::future::Future;
@ -20,7 +18,6 @@ use std::pin::Pin;
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
use tokio; use tokio;
use tokio::net::tcp::Incoming;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::net::TcpStream; use tokio::net::TcpStream;
@ -73,27 +70,23 @@ impl Future for Accept {
ErrBox::from(e) ErrBox::from(e)
})?; })?;
let mut listener = let listener = &mut listener_resource.listener;
futures::compat::Compat01As03::new(&mut listener_resource.listener)
.map_err(ErrBox::from);
match listener.poll_next_unpin(cx) { match listener.poll_accept(cx).map_err(ErrBox::from) {
Poll::Ready(Some(Ok(stream))) => { Poll::Ready(Ok((stream, addr))) => {
listener_resource.untrack_task(); listener_resource.untrack_task();
inner.accept_state = AcceptState::Done; inner.accept_state = AcceptState::Done;
let addr = stream.peer_addr().unwrap();
Poll::Ready(Ok((stream, addr))) Poll::Ready(Ok((stream, addr)))
} }
Poll::Pending => { Poll::Pending => {
listener_resource.track_task(cx)?; listener_resource.track_task(cx)?;
Poll::Pending Poll::Pending
} }
Poll::Ready(Some(Err(e))) => { Poll::Ready(Err(e)) => {
listener_resource.untrack_task(); listener_resource.untrack_task();
inner.accept_state = AcceptState::Done; inner.accept_state = AcceptState::Done;
Poll::Ready(Err(e)) Poll::Ready(Err(e))
} }
_ => unreachable!(),
} }
} }
} }
@ -160,32 +153,20 @@ fn op_dial(
let state_ = state.clone(); let state_ = state.clone();
state.check_net(&args.hostname, args.port)?; state.check_net(&args.hostname, args.port)?;
let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| { let op = async move {
futures::compat::Compat01As03::new(TcpStream::connect(&addr)) let addr = resolve_addr(&args.hostname, args.port).await?;
.map_err(ErrBox::from) let tcp_stream = TcpStream::connect(&addr).await?;
.and_then(move |tcp_stream| { let local_addr = tcp_stream.local_addr()?;
let local_addr = match tcp_stream.local_addr() { let remote_addr = tcp_stream.peer_addr()?;
Ok(v) => v,
Err(e) => return futures::future::err(ErrBox::from(e)),
};
let remote_addr = match tcp_stream.peer_addr() {
Ok(v) => v,
Err(e) => return futures::future::err(ErrBox::from(e)),
};
let mut table = state_.lock_resource_table(); let mut table = state_.lock_resource_table();
let rid = table let rid =
.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
futures::future::ok((rid, local_addr, remote_addr)) Ok(json!({
})
.map_err(ErrBox::from)
.and_then(move |(rid, local_addr, remote_addr)| {
futures::future::ok(json!({
"rid": rid, "rid": rid,
"localAddr": local_addr.to_string(), "localAddr": local_addr.to_string(),
"remoteAddr": remote_addr.to_string(), "remoteAddr": remote_addr.to_string(),
})) }))
}) };
});
Ok(JsonOp::Async(op.boxed())) Ok(JsonOp::Async(op.boxed()))
} }
@ -235,7 +216,7 @@ struct ListenArgs {
#[allow(dead_code)] #[allow(dead_code)]
struct TcpListenerResource { struct TcpListenerResource {
listener: Incoming, listener: TcpListener,
waker: Option<futures::task::AtomicWaker>, waker: Option<futures::task::AtomicWaker>,
local_addr: SocketAddr, local_addr: SocketAddr,
} }
@ -300,11 +281,11 @@ fn op_listen(
let addr = let addr =
futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
let listener = TcpListener::bind(&addr)?; let listener = futures::executor::block_on(TcpListener::bind(&addr))?;
let local_addr = listener.local_addr()?; let local_addr = listener.local_addr()?;
let local_addr_str = local_addr.to_string(); let local_addr_str = local_addr.to_string();
let listener_resource = TcpListenerResource { let listener_resource = TcpListenerResource {
listener: listener.incoming(), listener,
waker: None, waker: None,
local_addr, local_addr,
}; };

View file

@ -14,12 +14,10 @@ use std;
use std::convert::From; use std::convert::From;
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::process::Command;
use std::process::ExitStatus; use std::process::ExitStatus;
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
use tokio::prelude::Async; use tokio::process::Command;
use tokio_process::CommandExt;
#[cfg(unix)] #[cfg(unix)]
use std::os::unix::process::ExitStatusExt; use std::os::unix::process::ExitStatusExt;
@ -33,42 +31,21 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op("kill", s.core_op(json_op(s.stateful_op(op_kill)))); i.register_op("kill", s.core_op(json_op(s.stateful_op(op_kill))));
} }
struct CloneFileFuture {
rid: ResourceId,
state: ThreadSafeState,
}
impl Future for CloneFileFuture {
type Output = Result<tokio::fs::File, ErrBox>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
let mut table = inner.state.lock_resource_table();
let repr = table
.get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?;
match repr {
StreamResource::FsFile(ref mut file) => {
match file.poll_try_clone().map_err(ErrBox::from) {
Err(err) => Poll::Ready(Err(err)),
Ok(Async::Ready(v)) => Poll::Ready(Ok(v)),
Ok(Async::NotReady) => Poll::Pending,
}
}
_ => Poll::Ready(Err(bad_resource())),
}
}
}
fn clone_file( fn clone_file(
rid: u32, rid: u32,
state: &ThreadSafeState, state: &ThreadSafeState,
) -> Result<std::fs::File, ErrBox> { ) -> Result<std::fs::File, ErrBox> {
futures::executor::block_on(CloneFileFuture { let mut table = state.lock_resource_table();
rid, let repr = table
state: state.clone(), .get_mut::<StreamResource>(rid)
}) .ok_or_else(bad_resource)?;
.map(|f| f.into_std()) let file = match repr {
StreamResource::FsFile(ref mut file) => file,
_ => return Err(bad_resource()),
};
let tokio_file = futures::executor::block_on(file.try_clone())?;
let std_file = futures::executor::block_on(tokio_file.into_std());
Ok(std_file)
} }
fn subprocess_stdio_map(s: &str) -> std::process::Stdio { fn subprocess_stdio_map(s: &str) -> std::process::Stdio {
@ -95,7 +72,7 @@ struct RunArgs {
} }
struct ChildResource { struct ChildResource {
child: futures::compat::Compat01As03<tokio_process::Child>, child: tokio::process::Child,
} }
impl Resource for ChildResource {} impl Resource for ChildResource {}
@ -149,8 +126,11 @@ fn op_run(
c.stderr(subprocess_stdio_map(run_args.stderr.as_ref())); c.stderr(subprocess_stdio_map(run_args.stderr.as_ref()));
} }
// We want to kill child when it's closed
c.kill_on_drop(true);
// Spawn the command. // Spawn the command.
let mut child = c.spawn_async().map_err(ErrBox::from)?; let mut child = c.spawn()?;
let pid = child.id(); let pid = child.id();
let mut table = state_.lock_resource_table(); let mut table = state_.lock_resource_table();
@ -188,9 +168,7 @@ fn op_run(
None => None, None => None,
}; };
let child_resource = ChildResource { let child_resource = ChildResource { child };
child: futures::compat::Compat01As03::new(child),
};
let child_rid = table.add("child", Box::new(child_resource)); let child_rid = table.add("child", Box::new(child_resource));
Ok(JsonOp::Sync(json!({ Ok(JsonOp::Sync(json!({

View file

@ -10,9 +10,6 @@ use crate::state::ThreadSafeState;
use deno::Resource; use deno::Resource;
use deno::*; use deno::*;
use futures::future::FutureExt; use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::stream::StreamExt;
use futures::stream::TryStreamExt;
use std; use std;
use std::convert::From; use std::convert::From;
use std::fs::File; use std::fs::File;
@ -24,7 +21,6 @@ use std::sync::Arc;
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
use tokio; use tokio;
use tokio::net::tcp::Incoming;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_rustls::{rustls::ClientConfig, TlsConnector}; use tokio_rustls::{rustls::ClientConfig, TlsConnector};
@ -65,7 +61,7 @@ pub fn op_dial_tls(
_zero_copy: Option<PinnedBuf>, _zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let args: DialTLSArgs = serde_json::from_value(args)?; let args: DialTLSArgs = serde_json::from_value(args)?;
let cert_file = args.cert_file; let cert_file = args.cert_file.clone();
let state_ = state.clone(); let state_ = state.clone();
state.check_net(&args.hostname, args.port)?; state.check_net(&args.hostname, args.port)?;
if let Some(path) = cert_file.clone() { if let Some(path) = cert_file.clone() {
@ -77,62 +73,35 @@ pub fn op_dial_tls(
domain.push_str("localhost"); domain.push_str("localhost");
} }
let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| { let op = async move {
futures::compat::Compat01As03::new(TcpStream::connect(&addr)) let addr = resolve_addr(&args.hostname, args.port).await?;
.and_then(move |tcp_stream| { let tcp_stream = TcpStream::connect(&addr).await?;
let local_addr = match tcp_stream.local_addr() { let local_addr = tcp_stream.local_addr()?;
Ok(v) => v, let remote_addr = tcp_stream.peer_addr()?;
Err(e) => return futures::future::err(e),
};
let remote_addr = match tcp_stream.peer_addr() {
Ok(v) => v,
Err(e) => return futures::future::err(e),
};
let mut config = ClientConfig::new(); let mut config = ClientConfig::new();
config config
.root_store .root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
if let Some(path) = cert_file { if let Some(path) = cert_file {
let key_file = match File::open(path) { let key_file = File::open(path)?;
Ok(v) => v,
Err(e) => return futures::future::err(e),
};
let reader = &mut BufReader::new(key_file); let reader = &mut BufReader::new(key_file);
config.root_store.add_pem_file(reader).unwrap(); config.root_store.add_pem_file(reader).unwrap();
} }
let tls_connector = TlsConnector::from(Arc::new(config)); let tls_connector = TlsConnector::from(Arc::new(config));
futures::future::ok(( let dnsname =
tls_connector, DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
tcp_stream, let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
local_addr,
remote_addr,
))
})
.map_err(ErrBox::from)
.and_then(
move |(tls_connector, tcp_stream, local_addr, remote_addr)| {
let dnsname = DNSNameRef::try_from_ascii_str(&domain)
.expect("Invalid DNS lookup");
futures::compat::Compat01As03::new(
tls_connector.connect(dnsname, tcp_stream),
)
.map_err(ErrBox::from)
.and_then(move |tls_stream| {
let mut table = state_.lock_resource_table(); let mut table = state_.lock_resource_table();
let rid = table.add( let rid = table.add(
"clientTlsStream", "clientTlsStream",
Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))), Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
); );
futures::future::ok(json!({ Ok(json!({
"rid": rid, "rid": rid,
"localAddr": local_addr.to_string(), "localAddr": local_addr.to_string(),
"remoteAddr": remote_addr.to_string(), "remoteAddr": remote_addr.to_string(),
})) }))
}) };
},
)
});
Ok(JsonOp::Async(op.boxed())) Ok(JsonOp::Async(op.boxed()))
} }
@ -197,7 +166,7 @@ fn load_keys(path: &str) -> Result<Vec<PrivateKey>, ErrBox> {
#[allow(dead_code)] #[allow(dead_code)]
pub struct TlsListenerResource { pub struct TlsListenerResource {
listener: Incoming, listener: TcpListener,
tls_acceptor: TlsAcceptor, tls_acceptor: TlsAcceptor,
waker: Option<futures::task::AtomicWaker>, waker: Option<futures::task::AtomicWaker>,
local_addr: SocketAddr, local_addr: SocketAddr,
@ -283,11 +252,11 @@ fn op_listen_tls(
let tls_acceptor = TlsAcceptor::from(Arc::new(config)); let tls_acceptor = TlsAcceptor::from(Arc::new(config));
let addr = let addr =
futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
let listener = TcpListener::bind(&addr)?; let listener = futures::executor::block_on(TcpListener::bind(&addr))?;
let local_addr = listener.local_addr()?; let local_addr = listener.local_addr()?;
let local_addr_str = local_addr.to_string(); let local_addr_str = local_addr.to_string();
let tls_listener_resource = TlsListenerResource { let tls_listener_resource = TlsListenerResource {
listener: listener.incoming(), listener,
tls_acceptor, tls_acceptor,
waker: None, waker: None,
local_addr, local_addr,
@ -343,27 +312,23 @@ impl Future for AcceptTls {
ErrBox::from(e) ErrBox::from(e)
})?; })?;
let mut listener = let listener = &mut listener_resource.listener;
futures::compat::Compat01As03::new(&mut listener_resource.listener)
.map_err(ErrBox::from);
match listener.poll_next_unpin(cx) { match listener.poll_accept(cx).map_err(ErrBox::from) {
Poll::Ready(Some(Ok(stream))) => { Poll::Ready(Ok((stream, addr))) => {
listener_resource.untrack_task(); listener_resource.untrack_task();
inner.accept_state = AcceptTlsState::Done; inner.accept_state = AcceptTlsState::Done;
let addr = stream.peer_addr().unwrap();
Poll::Ready(Ok((stream, addr))) Poll::Ready(Ok((stream, addr)))
} }
Poll::Pending => { Poll::Pending => {
listener_resource.track_task(cx)?; listener_resource.track_task(cx)?;
Poll::Pending Poll::Pending
} }
Poll::Ready(Some(Err(e))) => { Poll::Ready(Err(e)) => {
listener_resource.untrack_task(); listener_resource.untrack_task();
inner.accept_state = AcceptTlsState::Done; inner.accept_state = AcceptTlsState::Done;
Poll::Ready(Err(e)) Poll::Ready(Err(e))
} }
_ => unreachable!(),
} }
} }
} }
@ -380,47 +345,33 @@ fn op_accept_tls(
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let args: AcceptTlsArgs = serde_json::from_value(args)?; let args: AcceptTlsArgs = serde_json::from_value(args)?;
let rid = args.rid as u32; let rid = args.rid as u32;
let state1 = state.clone(); let state = state.clone();
let state2 = state.clone(); let op = async move {
let op = accept_tls(state, rid) let (tcp_stream, _socket_addr) = accept_tls(&state.clone(), rid).await?;
.and_then(move |(tcp_stream, _socket_addr)| { let local_addr = tcp_stream.local_addr()?;
let local_addr = match tcp_stream.local_addr() { let remote_addr = tcp_stream.peer_addr()?;
Ok(v) => v, let tls_acceptor = {
Err(e) => return futures::future::err(ErrBox::from(e)), let table = state.lock_resource_table();
};
let remote_addr = match tcp_stream.peer_addr() {
Ok(v) => v,
Err(e) => return futures::future::err(ErrBox::from(e)),
};
futures::future::ok((tcp_stream, local_addr, remote_addr))
})
.and_then(move |(tcp_stream, local_addr, remote_addr)| {
let table = state1.lock_resource_table();
let resource = table let resource = table
.get::<TlsListenerResource>(rid) .get::<TlsListenerResource>(rid)
.ok_or_else(bad_resource) .ok_or_else(bad_resource)
.expect("Can't find tls listener"); .expect("Can't find tls listener");
resource.tls_acceptor.clone()
futures::compat::Compat01As03::new( };
resource.tls_acceptor.accept(tcp_stream), let tls_stream = tls_acceptor.accept(tcp_stream).await?;
) let rid = {
.map_err(ErrBox::from) let mut table = state.lock_resource_table();
.and_then(move |tls_stream| { table.add(
let mut table = state2.lock_resource_table();
let rid = table.add(
"serverTlsStream", "serverTlsStream",
Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))), Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
); )
futures::future::ok((rid, local_addr, remote_addr)) };
}) Ok(json!({
})
.and_then(move |(rid, local_addr, remote_addr)| {
futures::future::ok(json!({
"rid": rid, "rid": rid,
"localAddr": local_addr.to_string(), "localAddr": local_addr.to_string(),
"remoteAddr": remote_addr.to_string(), "remoteAddr": remote_addr.to_string(),
})) }))
}); };
Ok(JsonOp::Async(op.boxed())) Ok(JsonOp::Async(op.boxed()))
} }

View file

@ -168,14 +168,13 @@ fn op_create_worker(
// TODO(bartlomieju): this should spawn mod execution on separate tokio task // TODO(bartlomieju): this should spawn mod execution on separate tokio task
// and block on receving message on a channel or even use sync channel /shrug // and block on receving message on a channel or even use sync channel /shrug
let (sender, receiver) = mpsc::sync_channel::<Result<(), ErrBox>>(1); let (sender, receiver) = mpsc::sync_channel::<Result<(), ErrBox>>(1);
let fut = worker let fut = async move {
let result = worker
.execute_mod_async(&module_specifier, None, false) .execute_mod_async(&module_specifier, None, false)
.then(move |result| { .await;
sender.send(result).expect("Failed to send message"); sender.send(result).expect("Failed to send message");
futures::future::ok(()) }
}) .boxed();
.boxed()
.compat();
tokio::spawn(fut); tokio::spawn(fut);
let result = receiver.recv().expect("Failed to receive message"); let result = receiver.recv().expect("Failed to receive message");

View file

@ -1,30 +1,30 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use futures;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use std::future::Future; use std::future::Future;
use tokio; use tokio;
use tokio::runtime; use tokio::runtime;
pub fn create_threadpool_runtime(
) -> Result<tokio::runtime::Runtime, tokio::io::Error> {
runtime::Builder::new()
.panic_handler(|err| std::panic::resume_unwind(err))
.build()
}
pub fn run<F>(future: F) pub fn run<F>(future: F)
where where
F: Future<Output = Result<(), ()>> + Send + 'static, F: Future<Output = Result<(), ()>> + Send + 'static,
{ {
// tokio::runtime::current_thread::run(future) let mut rt = runtime::Builder::new()
let rt = create_threadpool_runtime().expect("Unable to create Tokio runtime"); .threaded_scheduler()
rt.block_on_all(future.boxed().compat()).unwrap(); .enable_all()
.thread_name("deno")
.build()
.expect("Unable to create Tokio runtime");
rt.block_on(future).unwrap();
} }
pub fn run_on_current_thread<F>(future: F) pub fn run_on_current_thread<F>(future: F)
where where
F: Future<Output = Result<(), ()>> + Send + 'static, F: Future<Output = Result<(), ()>> + Send + 'static,
{ {
tokio::runtime::current_thread::run(future.boxed().compat()); let mut rt = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.thread_name("deno")
.build()
.expect("Unable to create Tokio runtime");
rt.block_on(future).unwrap();
} }

View file

@ -420,10 +420,9 @@ mod tests {
let fut = async move { let fut = async move {
let r = worker.await; let r = worker.await;
r.unwrap(); r.unwrap();
Ok(())
}; };
tokio::spawn(fut.boxed().compat()); tokio::spawn(fut);
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
@ -453,26 +452,21 @@ mod tests {
.unwrap(); .unwrap();
let worker_ = worker.clone(); let worker_ = worker.clone();
let worker_future = worker let worker_future = async move {
.then(move |r| { let result = worker.await;
println!("workers.rs after resource close"); println!("workers.rs after resource close");
r.unwrap(); result.unwrap();
futures::future::ok(()) }
})
.shared(); .shared();
let worker_future_ = worker_future.clone(); let worker_future_ = worker_future.clone();
tokio::spawn( tokio::spawn(worker_future_);
worker_future_
.then(|_: Result<(), ()>| futures::future::ok(()))
.compat(),
);
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = block_on(worker_.post_message(msg)); let r = block_on(worker_.post_message(msg));
assert!(r.is_ok()); assert!(r.is_ok());
block_on(worker_future).unwrap(); block_on(worker_future);
}) })
} }

View file

@ -20,7 +20,7 @@ lazy_static = "1.4.0"
libc = "0.2.65" libc = "0.2.65"
log = "0.4.8" log = "0.4.8"
serde_json = "1.0.41" serde_json = "1.0.41"
url = "1.7.2" url = "2.1"
[[example]] [[example]]
name = "deno_core_http_bench" name = "deno_core_http_bench"

View file

@ -272,11 +272,17 @@ async function formatSourceFiles(
for await (const { filename } of files) { for await (const { filename } of files) {
const parser = selectParser(filename); const parser = selectParser(filename);
if (parser) { if (parser) {
if (prettierOpts.write) {
formats.push(formatFile(filename, parser, prettierOpts)); formats.push(formatFile(filename, parser, prettierOpts));
} else {
await formatFile(filename, parser, prettierOpts);
}
} }
} }
if (prettierOpts.write) {
await Promise.all(formats); await Promise.all(formats);
}
exit(0); exit(0);
} }