chore(ext/cache): remove CachePutResource in preparation for resource rewrite (#21949)

We can use `resourceForReadableStream` to ensure that cached resources
are implemented more efficiently and remove one more resource special
case.
This commit is contained in:
Matt Mastracci 2024-01-15 13:14:54 -07:00 committed by GitHub
parent 72ecfe0419
commit 3ff80eb152
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 114 additions and 145 deletions

152
ext/cache/sqlite.rs vendored
View file

@ -1,7 +1,7 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use std::borrow::Cow;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::time::SystemTime;
@ -9,16 +9,19 @@ use std::time::UNIX_EPOCH;
use async_trait::async_trait;
use deno_core::error::AnyError;
use deno_core::futures::future::poll_fn;
use deno_core::parking_lot::Mutex;
use deno_core::unsync::spawn_blocking;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufMutView;
use deno_core::ByteString;
use deno_core::Resource;
use rusqlite::params;
use rusqlite::Connection;
use rusqlite::OptionalExtension;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use crate::deserialize_headers;
@ -94,7 +97,7 @@ impl SqliteBackedCache {
#[async_trait(?Send)]
impl Cache for SqliteBackedCache {
type CachePutResourceType = CachePutResource;
type CacheMatchResourceType = CacheResponseResource;
/// Open a cache storage. Internally, this creates a row in the
/// sqlite db if the cache doesn't exist and returns the internal id
@ -169,58 +172,63 @@ impl Cache for SqliteBackedCache {
.await?
}
async fn put_create(
async fn put(
&self,
request_response: CachePutRequest,
) -> Result<Option<Rc<CachePutResource>>, AnyError> {
resource: Option<Rc<dyn Resource>>,
) -> Result<(), AnyError> {
let db = self.connection.clone();
let cache_storage_dir = self.cache_storage_dir.clone();
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
let response_body_key = if request_response.response_has_body {
Some(hash(&format!(
if let Some(resource) = resource {
let body_key = hash(&format!(
"{}_{}",
&request_response.request_url,
now.as_nanos()
)))
} else {
None
};
if let Some(body_key) = response_body_key {
));
let responses_dir =
get_responses_dir(cache_storage_dir, request_response.cache_id);
let response_path = responses_dir.join(&body_key);
let file = tokio::fs::File::create(response_path).await?;
Ok(Some(Rc::new(CachePutResource {
file: AsyncRefCell::new(file),
db,
put_request: request_response,
response_body_key: body_key,
start_time: now.as_secs(),
})))
} else {
insert_cache_asset(db, request_response, None).await?;
Ok(None)
}
}
let mut file = tokio::fs::File::create(response_path).await?;
let mut buf = BufMutView::new(64 * 1024);
loop {
let (size, buf2) = resource.clone().read_byob(buf).await?;
if size == 0 {
break;
}
buf = buf2;
async fn put_finish(
&self,
resource: Rc<CachePutResource>,
) -> Result<(), AnyError> {
resource.write_to_cache().await
// Use poll_write to avoid holding a slice across await points
poll_fn(|cx| Pin::new(&mut file).poll_write(cx, &buf[..size])).await?;
}
file.flush().await?;
file.sync_all().await?;
assert_eq!(
insert_cache_asset(db, request_response, Some(body_key.clone()),)
.await?,
Some(body_key)
);
} else {
assert!(insert_cache_asset(db, request_response, None)
.await?
.is_none());
}
Ok(())
}
async fn r#match(
&self,
request: CacheMatchRequest,
) -> Result<
Option<(CacheMatchResponseMeta, Option<Rc<dyn Resource>>)>,
Option<(CacheMatchResponseMeta, Option<CacheResponseResource>)>,
AnyError,
> {
let db = self.connection.clone();
let cache_storage_dir = self.cache_storage_dir.clone();
let query_result = spawn_blocking(move || {
let (query_result, request) = spawn_blocking(move || {
let db = db.lock();
let result = db.query_row(
"SELECT response_body_key, response_headers, response_status, response_status_text, request_headers
@ -235,10 +243,17 @@ impl Cache for SqliteBackedCache {
let request_headers: Vec<u8> = row.get(4)?;
let response_headers: Vec<(ByteString, ByteString)> = deserialize_headers(&response_headers);
let request_headers: Vec<(ByteString, ByteString)> = deserialize_headers(&request_headers);
Ok((CacheMatchResponseMeta {request_headers, response_headers,response_status,response_status_text}, response_body_key))
Ok((CacheMatchResponseMeta {
request_headers,
response_headers,
response_status,
response_status_text},
response_body_key
))
},
);
result.optional()
// Return ownership of request to the caller
result.optional().map(|x| (x, request))
})
.await??;
@ -261,11 +276,21 @@ impl Cache for SqliteBackedCache {
let response_path =
get_responses_dir(cache_storage_dir, request.cache_id)
.join(response_body_key);
let file = tokio::fs::File::open(response_path).await?;
return Ok(Some((
cache_meta,
Some(Rc::new(CacheResponseResource::new(file))),
)));
let file = match tokio::fs::File::open(response_path).await {
Ok(file) => file,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
// Best efforts to delete the old cache item
_ = self
.delete(CacheDeleteRequest {
cache_id: request.cache_id,
request_url: request.request_url,
})
.await;
return Ok(None);
}
Err(err) => return Err(err.into()),
};
return Ok(Some((cache_meta, Some(CacheResponseResource::new(file)))));
}
Some((cache_meta, None)) => {
return Ok(Some((cache_meta, None)));
@ -339,55 +364,6 @@ impl deno_core::Resource for SqliteBackedCache {
}
}
pub struct CachePutResource {
pub db: Arc<Mutex<rusqlite::Connection>>,
pub put_request: CachePutRequest,
pub response_body_key: String,
pub file: AsyncRefCell<tokio::fs::File>,
pub start_time: u64,
}
impl CachePutResource {
async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
let resource = deno_core::RcRef::map(&self, |r| &r.file);
let mut file = resource.borrow_mut().await;
file.write_all(data).await?;
Ok(data.len())
}
async fn write_to_cache(self: Rc<Self>) -> Result<(), AnyError> {
let resource = deno_core::RcRef::map(&self, |r| &r.file);
let mut file = resource.borrow_mut().await;
file.flush().await?;
file.sync_all().await?;
let maybe_body_key = insert_cache_asset(
self.db.clone(),
self.put_request.clone(),
Some(self.response_body_key.clone()),
)
.await?;
match maybe_body_key {
Some(key) => {
assert_eq!(key, self.response_body_key);
Ok(())
}
// This should never happen because we will always have
// body key associated with CachePutResource
None => Err(deno_core::anyhow::anyhow!(
"unexpected: response body key is None"
)),
}
}
}
impl Resource for CachePutResource {
fn name(&self) -> Cow<str> {
"CachePutResource".into()
}
deno_core::impl_writable!();
}
pub struct CacheResponseResource {
file: AsyncRefCell<tokio::fs::File>,
}