Use futures 0.3 API (#3358)

This commit is contained in:
Bartek Iwańczuk 2019-11-17 01:17:47 +01:00 committed by Ry Dahl
parent cb00fd6e98
commit 8f9a942cb9
36 changed files with 1463 additions and 1018 deletions

View file

@ -8,11 +8,15 @@ use crate::fs as deno_fs;
use crate::ops::json_op;
use crate::state::ThreadSafeState;
use deno::*;
use futures::Future;
use futures::Poll;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use std;
use std::convert::From;
use std::future::Future;
use std::io::SeekFrom;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
@ -88,19 +92,21 @@ fn op_open(
}
let is_sync = args.promise_id.is_none();
let op = open_options.open(filename).map_err(ErrBox::from).and_then(
move |fs_file| {
let mut table = state_.lock_resource_table();
let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file)));
futures::future::ok(json!(rid))
},
);
let op = futures::compat::Compat01As03::new(tokio::prelude::Future::map_err(
open_options.open(filename),
ErrBox::from,
))
.and_then(move |fs_file| {
let mut table = state_.lock_resource_table();
let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file)));
futures::future::ok(json!(rid))
});
if is_sync {
let buf = op.wait()?;
let buf = futures::executor::block_on(op)?;
Ok(JsonOp::Sync(buf))
} else {
Ok(JsonOp::Async(Box::new(op)))
Ok(JsonOp::Async(op.boxed()))
}
}
@ -128,21 +134,27 @@ pub struct SeekFuture {
}
impl Future for SeekFuture {
type Item = u64;
type Error = ErrBox;
type Output = Result<u64, ErrBox>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut table = self.state.lock_resource_table();
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
let mut table = inner.state.lock_resource_table();
let resource = table
.get_mut::<StreamResource>(self.rid)
.get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?;
let tokio_file = match resource {
StreamResource::FsFile(ref mut file) => file,
_ => return Err(bad_resource()),
_ => return Poll::Ready(Err(bad_resource())),
};
tokio_file.poll_seek(self.seek_from).map_err(ErrBox::from)
use tokio::prelude::Async::*;
match tokio_file.poll_seek(inner.seek_from).map_err(ErrBox::from) {
Ok(Ready(v)) => Poll::Ready(Ok(v)),
Err(err) => Poll::Ready(Err(err)),
Ok(NotReady) => Poll::Pending,
}
}
}
@ -185,9 +197,9 @@ fn op_seek(
let op = fut.and_then(move |_| futures::future::ok(json!({})));
if args.promise_id.is_none() {
let buf = op.wait()?;
let buf = futures::executor::block_on(op)?;
Ok(JsonOp::Sync(buf))
} else {
Ok(JsonOp::Async(Box::new(op)))
Ok(JsonOp::Async(op.boxed()))
}
}