refactor: asynchronous blob backing store (#10969)

Co-authored-by: Luca Casonato <hello@lcas.dev>
This commit is contained in:
Jimmy Wärting 2021-07-05 15:34:37 +02:00 committed by GitHub
parent ea87d860be
commit 2c0b0e45b7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 652 additions and 269 deletions

View file

@ -26,7 +26,8 @@ use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use data_url::DataUrl;
use deno_web::BlobUrlStore;
use deno_web::BlobStore;
use http::header::CONTENT_LENGTH;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
@ -130,6 +131,7 @@ pub struct FetchArgs {
headers: Vec<(ByteString, ByteString)>,
client_rid: Option<u32>,
has_body: bool,
body_length: Option<u64>,
}
#[derive(Serialize)]
@ -176,6 +178,14 @@ where
None => {
// If no body is passed, we return a writer for streaming the body.
let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1);
// If the size of the body is known, we include a content-length
// header explicitly.
if let Some(body_size) = args.body_length {
request =
request.header(CONTENT_LENGTH, HeaderValue::from(body_size))
}
request = request.body(Body::wrap_stream(ReceiverStream::new(rx)));
let request_body_rid =
@ -207,7 +217,13 @@ where
let cancel_handle = CancelHandle::new_rc();
let cancel_handle_ = cancel_handle.clone();
let fut = async move { request.send().or_cancel(cancel_handle_).await };
let fut = async move {
request
.send()
.or_cancel(cancel_handle_)
.await
.map(|res| res.map_err(|err| type_error(err.to_string())))
};
let request_rid = state
.resource_table
@ -240,32 +256,49 @@ where
(request_rid, None, None)
}
"blob" => {
let blob_url_storage =
state.try_borrow::<BlobUrlStore>().ok_or_else(|| {
type_error("Blob URLs are not supported in this context.")
})?;
let blob_store = state.try_borrow::<BlobStore>().ok_or_else(|| {
type_error("Blob URLs are not supported in this context.")
})?;
let blob = blob_url_storage
.get(url)?
let blob = blob_store
.get_object_url(url)?
.ok_or_else(|| type_error("Blob for the given URL not found."))?;
if method != "GET" {
return Err(type_error("Blob URL fetch only supports GET method."));
}
let response = http::Response::builder()
.status(http::StatusCode::OK)
.header(http::header::CONTENT_LENGTH, blob.data.len())
.header(http::header::CONTENT_TYPE, blob.media_type)
.body(reqwest::Body::from(blob.data))?;
let cancel_handle = CancelHandle::new_rc();
let cancel_handle_ = cancel_handle.clone();
let fut = async move { Ok(Ok(Response::from(response))) };
let fut = async move {
// TODO(lucacsonato): this should be a stream!
let chunk = match blob.read_all().or_cancel(cancel_handle_).await? {
Ok(chunk) => chunk,
Err(err) => return Ok(Err(err)),
};
let res = http::Response::builder()
.status(http::StatusCode::OK)
.header(http::header::CONTENT_LENGTH, chunk.len())
.header(http::header::CONTENT_TYPE, blob.media_type.clone())
.body(reqwest::Body::from(chunk))
.map_err(|err| type_error(err.to_string()));
match res {
Ok(response) => Ok(Ok(Response::from(response))),
Err(err) => Ok(Err(err)),
}
};
let request_rid = state
.resource_table
.add(FetchRequestResource(Box::pin(fut)));
(request_rid, None, None)
let cancel_handle_rid =
state.resource_table.add(FetchCancelHandle(cancel_handle));
(request_rid, None, Some(cancel_handle_rid))
}
_ => return Err(type_error(format!("scheme '{}' not supported", scheme))),
};
@ -382,8 +415,7 @@ pub async fn op_fetch_response_read(
Ok(read)
}
type CancelableResponseResult =
Result<Result<Response, reqwest::Error>, Canceled>;
type CancelableResponseResult = Result<Result<Response, AnyError>, Canceled>;
struct FetchRequestResource(
Pin<Box<dyn Future<Output = CancelableResponseResult>>>,