Move simple index queries to CachedClient (#504)

Replaces the usage of `http-cache-reqwest` for simple index queries with
our custom cached client, removing `http-cache-reqwest` altogether.

The new cache paths are `<cache>/simple-v0/<index>/<package_name>.json`.
I could not test with a non-PyPI index since I'm not aware of any other
JSON indices (jax and torch are both HTML indices).

In a future step, we can transform the response to be a
`HashMap<Version, {source_dists: Vec<(SourceDistFilename, File)>,
wheels: Vec<(WheelFilename, File)>}` (independent of Python version, this
cache is used by all environments together). This should speed up cache
deserialization a bit, since we don't need to try source dist and wheel
anymore and drop incompatible dists, and it should make building the
`VersionMap` simpler. We can speed this up even further by splitting
into a version list and the info for each version. I'm mentioning this
because deserialization was a major bottleneck in the Rust part of the
old Python prototype.

Fixes #481
This commit is contained in:
konsti 2023-11-28 01:11:03 +01:00 committed by GitHub
parent 1142a14f4d
commit 8855f44b5f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 74 additions and 126 deletions

View file

@ -16,7 +16,6 @@ async_zip = { workspace = true, features = ["tokio"] }
futures = { workspace = true }
fs-err = { workspace = true, features = ["tokio"] }
http = { workspace = true }
http-cache-reqwest = { workspace = true }
http-cache-semantics = { workspace = true }
reqwest = { workspace = true }
reqwest-middleware = { workspace = true }

View file

@ -8,7 +8,7 @@ use reqwest_middleware::ClientWithMiddleware;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tempfile::NamedTempFile;
use tracing::{trace, warn};
use tracing::{debug, trace, warn};
/// Either a cached client error or a (user specified) error from the callback
pub enum CachedClientError<CallbackError> {
@ -93,23 +93,23 @@ impl CachedClient {
&self,
req: Request,
cache_dir: &Path,
filename: &str,
cache_file: &str,
response_callback: Callback,
) -> Result<Payload, CachedClientError<CallBackError>>
where
Callback: FnOnce(Response) -> CallbackReturn,
CallbackReturn: Future<Output = Result<Payload, CallBackError>>,
{
let cache_file = cache_dir.join(filename);
let cached = if let Ok(cached) = fs_err::tokio::read(&cache_file).await {
let cache_path = cache_dir.join(cache_file);
let cached = if let Ok(cached) = fs_err::tokio::read(&cache_path).await {
match serde_json::from_slice::<DataWithCachePolicy<Payload>>(&cached) {
Ok(data) => Some(data),
Err(err) => {
warn!(
"Broken cache entry at {}, removing: {err}",
cache_file.display()
cache_path.display()
);
let _ = fs_err::tokio::remove_file(&cache_file).await;
let _ = fs_err::tokio::remove_file(&cache_path).await;
None
}
}
@ -129,7 +129,7 @@ impl CachedClient {
)
.await
.map_err(crate::Error::from)?;
temp_file.persist(cache_file).map_err(crate::Error::from)?;
temp_file.persist(cache_path).map_err(crate::Error::from)?;
Ok(data_with_cache_policy.data)
}
CachedResponse::ModifiedOrNew(res, cache_policy) => {
@ -148,7 +148,7 @@ impl CachedClient {
)
.await
.map_err(crate::Error::from)?;
temp_file.persist(cache_file).map_err(crate::Error::from)?;
temp_file.persist(cache_path).map_err(crate::Error::from)?;
Ok(data_with_cache_policy.data)
} else {
Ok(data)
@ -169,26 +169,24 @@ impl CachedClient {
req.try_clone()
.expect("You can't use streaming request bodies with this function"),
)?;
let url = req.url().clone();
let cached_response = if let Some(cached) = cached {
match cached
.cache_policy
.before_request(&converted_req, SystemTime::now())
{
BeforeRequest::Fresh(_) => {
trace!("Fresh cache for {}", req.url());
debug!("Found fresh response for: {url}");
CachedResponse::FreshCache(cached.data)
}
BeforeRequest::Stale { request, matches } => {
if !matches {
// This should not happen
warn!(
"Cached request doesn't match current request for {}",
req.url(),
);
// This will override the bogus cache
// This shouldn't happen; if it does, we'll override the cache.
warn!("Cached request doesn't match current request for: {url}");
return self.fresh_request(req, converted_req).await;
}
trace!("Revalidation request for {}", req.url());
debug!("Sending revalidation request for: {url}");
for header in &request.headers {
req.headers_mut().insert(header.0.clone(), header.1.clone());
converted_req
@ -211,12 +209,14 @@ impl CachedClient {
);
match after_response {
AfterResponse::NotModified(new_policy, _parts) => {
debug!("Found not-modified response for: {url}");
CachedResponse::NotModified(DataWithCachePolicy {
data: cached.data,
cache_policy: new_policy,
})
}
AfterResponse::Modified(new_policy, _parts) => {
debug!("Found modified response for: {url}");
CachedResponse::ModifiedOrNew(
res,
new_policy.is_storable().then_some(new_policy),
@ -226,6 +226,7 @@ impl CachedClient {
}
}
} else {
debug!("Not cached {url}");
// No reusable cache
self.fresh_request(req, converted_req).await?
};

View file

@ -1,8 +1,8 @@
pub use cached_client::{CachedClient, CachedClientError, DataWithCachePolicy};
pub use client::{RegistryClient, RegistryClientBuilder};
pub use error::Error;
pub use registry_client::{RegistryClient, RegistryClientBuilder};
mod cached_client;
mod client;
mod error;
mod registry_client;
mod remote_metadata;

View file

@ -5,9 +5,7 @@ use std::str::FromStr;
use async_http_range_reader::{AsyncHttpRangeReader, AsyncHttpRangeReaderError};
use async_zip::tokio::read::seek::ZipFileReader;
use futures::TryStreamExt;
use http_cache_reqwest::{CACacheManager, Cache, CacheMode, HttpCache, HttpCacheOptions};
use reqwest::{Client, ClientBuilder, Response, StatusCode};
use reqwest_middleware::ClientWithMiddleware;
use reqwest_retry::policies::ExponentialBackoff;
use reqwest_retry::RetryTransientMiddleware;
use tempfile::tempfile;
@ -19,13 +17,15 @@ use url::Url;
use distribution_filename::WheelFilename;
use distribution_types::{BuiltDist, Metadata};
use install_wheel_rs::find_dist_info;
use puffin_cache::WheelMetadataCache;
use puffin_cache::{digest, CanonicalUrl, WheelMetadataCache};
use puffin_normalize::PackageName;
use pypi_types::{File, IndexUrl, Metadata21, SimpleJson};
use crate::remote_metadata::wheel_metadata_from_remote_zip;
use crate::{CachedClient, CachedClientError, Error};
const SIMPLE_CACHE: &str = "simple-v0";
/// A builder for an [`RegistryClient`].
#[derive(Debug, Clone)]
pub struct RegistryClientBuilder {
@ -100,34 +100,18 @@ impl RegistryClientBuilder {
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(self.retries);
let retry_strategy = RetryTransientMiddleware::new_with_policy(retry_policy);
let mut client_builder =
reqwest_middleware::ClientBuilder::new(client_raw.clone()).with(retry_strategy);
client_builder = client_builder.with(Cache(HttpCache {
mode: CacheMode::Default,
manager: CACacheManager {
path: self.cache.clone(),
},
options: HttpCacheOptions::default(),
}));
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(self.retries);
let retry_strategy = RetryTransientMiddleware::new_with_policy(retry_policy);
let uncached_client = reqwest_middleware::ClientBuilder::new(client_raw.clone())
.with(retry_strategy)
.build();
let cached_client = CachedClient::new(uncached_client.clone());
let client = CachedClient::new(uncached_client.clone());
RegistryClient {
index: self.index,
extra_index: self.extra_index,
no_index: self.no_index,
client: client_builder.build(),
client_raw: client_raw.clone(),
uncached_client,
cache: self.cache,
cached_client,
client,
}
}
}
@ -140,20 +124,24 @@ pub struct RegistryClient {
pub(crate) extra_index: Vec<IndexUrl>,
/// Ignore the package index, instead relying on local archives and caches.
pub(crate) no_index: bool,
pub(crate) client: ClientWithMiddleware,
pub(crate) uncached_client: ClientWithMiddleware,
pub(crate) client: CachedClient,
/// Don't use this client, it only exists because `async_http_range_reader` needs
/// [`reqwest::Client`] instead of [`reqwest_middleware::Client`]
pub(crate) client_raw: Client,
pub(crate) cached_client: CachedClient,
/// Used for the remote wheel METADATA cache
pub(crate) cache: PathBuf,
}
impl RegistryClient {
pub fn cached_client(&self) -> &CachedClient {
&self.cached_client
&self.client
}
/// Fetch a package from the `PyPI` simple API.
///
/// "simple" here refers to [PEP 503 Simple Repository API](https://peps.python.org/pep-0503/)
/// and [PEP 691 JSON-based Simple API for Python Package Indexes](https://peps.python.org/pep-0691/),
/// which the pypi json api approximately implements.
pub async fn simple(&self, package_name: PackageName) -> Result<(IndexUrl, SimpleJson), Error> {
if self.no_index {
return Err(Error::NoIndex(package_name.as_ref().to_string()));
@ -172,37 +160,54 @@ impl RegistryClient {
url
);
let cache_dir = self.cache.join(SIMPLE_CACHE).join(match index {
IndexUrl::Pypi => "pypi".to_string(),
IndexUrl::Url(url) => digest(&CanonicalUrl::new(url)),
});
let cache_file = format!("{}.json", package_name.as_ref());
let simple_request = self
.client
.uncached()
.get(url.clone())
.header("Accept-Encoding", "gzip")
.build()?;
let parse_simple_response = |response: Response| async {
let bytes = response.bytes().await?;
let data: SimpleJson = serde_json::from_slice(bytes.as_ref())
.map_err(|err| Error::from_json_err(err, url))?;
Ok(data)
};
let result = self
.client
.get_cached_with_callback(
simple_request,
&cache_dir,
&cache_file,
parse_simple_response,
)
.await;
// Fetch from the index.
match self.simple_impl(&url).await {
Ok(text) => {
let data = serde_json::from_str(&text)
.map_err(move |e| Error::from_json_err(e, url))?;
return Ok((index.clone(), data));
match result {
Ok(simple_json) => {
return Ok((index.clone(), simple_json));
}
Err(err) => {
Err(CachedClientError::Client(Error::RequestError(err))) => {
if err.status() == Some(StatusCode::NOT_FOUND) {
continue;
}
return Err(err.into());
}
Err(err) => {
return Err(err.into());
}
}
}
Err(Error::PackageNotFound(package_name.as_ref().to_string()))
}
async fn simple_impl(&self, url: &Url) -> Result<String, reqwest_middleware::Error> {
Ok(self
.client
.get(url.clone())
.header("Accept-Encoding", "gzip")
.send()
.await?
.error_for_status()?
.text()
.await?)
}
/// Fetch the metadata for a remote wheel file.
///
/// For a remote wheel, we try the following ways to fetch the metadata:
@ -269,9 +274,9 @@ impl RegistryClient {
Metadata21::parse(response.bytes().await?.as_ref())
.map_err(|err| Error::MetadataParseError(filename, url.to_string(), err))
};
let req = self.client_raw.get(url.clone()).build()?;
let req = self.client.uncached().get(url.clone()).build()?;
Ok(self
.cached_client
.client
.get_cached_with_callback(req, &cache_dir, &cache_file, response_callback)
.await?)
} else {
@ -309,9 +314,9 @@ impl RegistryClient {
Ok(metadata)
};
let req = self.client_raw.head(url.clone()).build()?;
let req = self.client.uncached().head(url.clone()).build()?;
let result = self
.cached_client
.client
.get_cached_with_callback(
req,
&cache_dir,
@ -392,7 +397,8 @@ impl RegistryClient {
}
Ok(Box::new(
self.uncached_client
self.client
.uncached()
.get(url.to_string())
.send()
.await?

View file

@ -17,7 +17,6 @@ pep440_rs = { path = "../pep440-rs" }
pep508_rs = { path = "../pep508-rs", features = ["serde"] }
platform-host = { path = "../platform-host" }
anyhow = { workspace = true }
cacache = { workspace = true }
fs-err = { workspace = true, features = ["tokio"] }
serde_json = { workspace = true }