feat(core): streams (#12596)

This allows resources to be "streams" by implementing read/write/shutdown. These streams are implicit since their nature (read/write/duplex) isn't known until called, but we could easily add another method to explicitly tag resources as streams.

`op_read/op_write/op_shutdown` are now builtin ops provided by `deno_core`

Note: this current implementation is simple & straightforward but it results in an additional alloc per read/write call

Closes #12556
This commit is contained in:
Aaron O'Mullan 2021-11-09 19:26:17 +01:00 committed by GitHub
parent 1eae6c139e
commit 375ce63c63
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 258 additions and 304 deletions

View file

@ -13,6 +13,7 @@ use deno_core::op_async;
use deno_core::op_sync;
use deno_core::url::Url;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
@ -84,8 +85,6 @@ where
.ops(vec![
("op_fetch", op_sync(op_fetch::<FP, FH>)),
("op_fetch_send", op_async(op_fetch_send)),
("op_fetch_request_write", op_async(op_fetch_request_write)),
("op_fetch_response_read", op_async(op_fetch_response_read)),
(
"op_fetch_custom_client",
op_sync(op_fetch_custom_client::<FP>),
@ -420,42 +419,6 @@ pub async fn op_fetch_send(
})
}
pub async fn op_fetch_request_write(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
data: ZeroCopyBuf,
) -> Result<(), AnyError> {
let buf = data.to_vec();
let resource = state
.borrow()
.resource_table
.get::<FetchRequestBodyResource>(rid)?;
let body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
let cancel = RcRef::map(resource, |r| &r.cancel);
body.send(Ok(buf)).or_cancel(cancel).await?.map_err(|_| {
type_error("request body receiver not connected (request closed)")
})?;
Ok(())
}
pub async fn op_fetch_response_read(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
data: ZeroCopyBuf,
) -> Result<usize, AnyError> {
let resource = state
.borrow()
.resource_table
.get::<FetchResponseBodyResource>(rid)?;
let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await;
let cancel = RcRef::map(resource, |r| &r.cancel);
let mut buf = data.clone();
let read = reader.read(&mut buf).try_or_cancel(cancel).await?;
Ok(read)
}
type CancelableResponseResult = Result<Result<Response, AnyError>, Canceled>;
struct FetchRequestResource(
@ -490,6 +453,20 @@ impl Resource for FetchRequestBodyResource {
"fetchRequestBody".into()
}
fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
Box::pin(async move {
let data = buf.to_vec();
let len = data.len();
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
body.send(Ok(data)).or_cancel(cancel).await?.map_err(|_| {
type_error("request body receiver not connected (request closed)")
})?;
Ok(len)
})
}
fn close(self: Rc<Self>) {
self.cancel.cancel()
}
@ -508,6 +485,15 @@ impl Resource for FetchResponseBodyResource {
"fetchResponseBody".into()
}
fn read(self: Rc<Self>, mut buf: ZeroCopyBuf) -> AsyncResult<usize> {
Box::pin(async move {
let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
let read = reader.read(&mut buf).try_or_cancel(cancel).await?;
Ok(read)
})
}
fn close(self: Rc<Self>) {
self.cancel.cancel()
}