initial implementation of zero-copy deserialization for SimpleMetadata (#1249)

(Please review this PR commit by commit.)

This PR closes an initial loop on zero-copy deserialization. That
is, provides a way to get a `Archived<SimpleMetadata>` (spelled
`OwnedArchive<SimpleMetadata>` in the code) from a `CachedClient`. The
main benefit of zero-copy deserialization is that we can read bytes
from a file, cast those bytes to a structured representation without
cost, and then start using that type as any other Rust type. The
"catch" is that the structured representation is not the actual type
you started with, but the "archived" version of it.

In order to make all this work, we ended up needing to shave a rather
large yak: we had to re-implement HTTP cache semantics. Previously,
we were using the `http-cache-semantics` crate. While it does support
Serde, it doesn't support `rkyv`. Moreover, even simple support for
`rkyv` wouldn't be enough. What we actually want is for the HTTP cache
semantics to be implemented on the *archived* type so that we can
decide whether our cached response is stale or not without needing to
do a full deserialization into the unarchived type. This is why, in
this PR, you'll see `impl ArchivedCachePolicy { ... }` instead of
`impl CachePolicy { ... }`. (The `derive(rkyv::Archive)` macro
automatically introduces the `ArchivedCachePolicy` type into the
current namespace.)

Unfortunately, this PR does not fully realize the dream that is
zero-copy deserialization. Namely, while a `CachedClient` can now
provide an `OwnedArchive<SimpleMetadata>`, the rest of our code
doesn't really make use of it. Indeed, as soon as we go to build a
`VersionMap`, we eagerly convert our archived metadata into an owned
`SimpleMetadata` via deserialization (that *isn't* zero-copy). After
this change, a lot of the work now shifts to `rkyv` deserialization
and `VersionMap` construction. More precisely, the main thing we drop
here is `CachePolicy` deserialization (which is now truly zero-copy)
and the parsing of the MessagePack format for `SimpleMetadata`. But we
are still paying for deserialization. We're just paying for it in a
different place.

This PR does seem to bring a speed-up, but it is somewhat underwhelming.
My measurements have been pretty noisy, but I get a 1.1x speedup fairly
often:

```
$ hyperfine -w5 "puffin-main pip compile --cache-dir ~/astral/tmp/cache-main ~/astral/tmp/reqs/home-assistant-reduced.in -o /dev/null" "puffin-test pip compile --cache-dir ~/astral/tmp/cache-test ~/astral/tmp/reqs/home-assistant-reduced.in -o /dev/null" ; A kang
Benchmark 1: puffin-main pip compile --cache-dir ~/astral/tmp/cache-main ~/astral/tmp/reqs/home-assistant-reduced.in -o /dev/null
  Time (mean ± σ):     164.4 ms ±  18.8 ms    [User: 427.1 ms, System: 348.6 ms]
  Range (min … max):   131.1 ms … 190.5 ms    18 runs

Benchmark 2: puffin-test pip compile --cache-dir ~/astral/tmp/cache-test ~/astral/tmp/reqs/home-assistant-reduced.in -o /dev/null
  Time (mean ± σ):     148.3 ms ±  10.2 ms    [User: 357.1 ms, System: 319.4 ms]
  Range (min … max):   136.8 ms … 184.4 ms    19 runs

Summary
  puffin-test pip compile --cache-dir ~/astral/tmp/cache-test ~/astral/tmp/reqs/home-assistant-reduced.in -o /dev/null ran
    1.11 ± 0.15 times faster than puffin-main pip compile --cache-dir ~/astral/tmp/cache-main ~/astral/tmp/reqs/home-assistant-reduced.in -o /dev/null
```

One downside is that this does increase cache size (`rkyv`'s
serialization format is not as compact as MessagePack). On disk size
increases by about 1.8x for our `simple-v0` cache.

```
$ sort-filesize cache-main
4.0K    cache-main/CACHEDIR.TAG
4.0K    cache-main/.gitignore
8.0K    cache-main/interpreter-v0
8.7M    cache-main/wheels-v0
18M     cache-main/archive-v0
59M     cache-main/simple-v0
109M    cache-main/built-wheels-v0
193M    cache-main
193M    total

$ sort-filesize cache-test
4.0K    cache-test/CACHEDIR.TAG
4.0K    cache-test/.gitignore
8.0K    cache-test/interpreter-v0
8.7M    cache-test/wheels-v0
18M     cache-test/archive-v0
107M    cache-test/simple-v0
109M    cache-test/built-wheels-v0
242M    cache-test
242M    total
```

Also, while I initially intended to do a simplistic implementation of
HTTP cache semantics, I found that everything was somewhat
inter-connected. I could have wrote code that _specifically_ only worked
with the present behavior of PyPI, but then it would need to be special
cased and everything else would need to continue to use
`http-cache-sematics`. By implementing what we need based on what Puffin
actually is (which is still less than what `http-cache-semantics` does),
we can avoid special casing and use zero-copy deserialization for our
cache policy in _all_ cases.
This commit is contained in:
Andrew Gallant 2024-02-05 16:47:53 -05:00 committed by GitHub
parent 398659a9b0
commit d4b4c21133
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 2763 additions and 429 deletions

24
Cargo.lock generated
View file

@ -1361,34 +1361,12 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "http-cache-semantics"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7aec9f678bca3f4a15194b980f20ed9bfe0dd38e8d298c65c559a93dfbd6380a"
dependencies = [
"http",
"http-serde",
"serde",
"time",
]
[[package]]
name = "http-content-range"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f0d1a8ef218a86416107794b34cc446958d9203556c312bb41eab4c924c1d2e"
[[package]]
name = "http-serde"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f560b665ad9f1572cfcaf034f7fb84338a7ce945216d64a90fd81f046a3caee"
dependencies = [
"http",
"serde",
]
[[package]]
name = "httparse"
version = "1.8.0"
@ -2584,7 +2562,6 @@ dependencies = [
"futures",
"html-escape",
"http",
"http-cache-semantics",
"insta",
"install-wheel-rs",
"pep440_rs",
@ -2868,7 +2845,6 @@ dependencies = [
"fs-err",
"futures",
"gourgeist",
"http-cache-semantics",
"indexmap 2.2.2",
"insta",
"install-wheel-rs",

View file

@ -50,7 +50,6 @@ hmac = { version = "0.12.1" }
home = { version = "0.5.9" }
html-escape = { version = "0.2.13" }
http = { version = "0.2.11" }
http-cache-semantics = { version = "1.0.2" }
indexmap = { version = "2.1.0" }
indicatif = { version = "0.17.7" }
indoc = { version = "2.0.4" }

View file

@ -488,8 +488,8 @@ pub enum CacheBucket {
/// Index responses through the simple metadata API.
///
/// Cache structure:
/// * `simple-v0/pypi/<package_name>.msgpack`
/// * `simple-v0/<digest(index_url)>/<package_name>.msgpack`
/// * `simple-v0/pypi/<package_name>.rkyv`
/// * `simple-v0/<digest(index_url)>/<package_name>.rkyv`
///
/// The response is parsed into `puffin_client::SimpleMetadata` before storage.
Simple,
@ -575,15 +575,15 @@ impl CacheBucket {
}
}
CacheBucket::Simple => {
// For `pypi` wheels, we expect a MsgPack file per package, indexed by name.
// For `pypi` wheels, we expect a rkyv file per package, indexed by name.
let root = cache.bucket(self).join(WheelCacheKind::Pypi);
summary += rm_rf(root.join(format!("{name}.msgpack")))?;
summary += rm_rf(root.join(format!("{name}.rkyv")))?;
// For alternate indices, we expect a directory for every index, followed by a
// MsgPack file per package, indexed by name.
let root = cache.bucket(self).join(WheelCacheKind::Url);
for directory in directories(root) {
summary += rm_rf(directory.join(format!("{name}.msgpack")))?;
summary += rm_rf(directory.join(format!("{name}.rkyv")))?;
}
}
CacheBucket::FlatIndex => {

View file

@ -23,7 +23,6 @@ fs-err = { workspace = true, features = ["tokio"] }
futures = { workspace = true }
html-escape = { workspace = true }
http = { workspace = true }
http-cache-semantics = { workspace = true }
reqwest = { workspace = true }
reqwest-middleware = { workspace = true }
reqwest-retry = { workspace = true }

View file

@ -1,56 +0,0 @@
use http::HeaderValue;
use rustc_hash::FxHashMap;
use std::collections::hash_map::Entry;
/// Cache headers from an HTTP response.
#[derive(Debug, Default)]
pub(crate) struct CacheHeaders(FxHashMap<Box<str>, Option<Box<str>>>);
impl CacheHeaders {
/// Parse the `Cache-Control` header from an HTTP response.
///
/// See: <https://github.com/kornelski/rusty-http-cache-semantics/blob/8fba3b9a3ddf01ba24f2d1a7994f4e9500644c1a/src/lib.rs#L51>
pub(crate) fn from_response<'a>(
headers: impl IntoIterator<Item = &'a HeaderValue>,
) -> CacheHeaders {
let mut cc = FxHashMap::<Box<str>, Option<Box<str>>>::default();
let mut is_valid = true;
for h in headers.into_iter().filter_map(|v| v.to_str().ok()) {
for part in h.split(',') {
// TODO: lame parsing
if part.trim().is_empty() {
continue;
}
let mut kv = part.splitn(2, '=');
let k = kv.next().unwrap().trim();
if k.is_empty() {
continue;
}
let v = kv.next().map(str::trim);
match cc.entry(k.into()) {
Entry::Occupied(e) => {
// When there is more than one value present for a given directive (e.g., two Expires header fields, multiple Cache-Control: max-age directives),
// the directive's value is considered invalid. Caches are encouraged to consider responses that have invalid freshness information to be stale
if e.get().as_deref() != v {
is_valid = false;
}
}
Entry::Vacant(e) => {
e.insert(v.map(|v| v.trim_matches('"')).map(From::from));
// TODO: bad unquoting
}
}
}
}
if !is_valid {
cc.insert("must-revalidate".into(), None);
}
Self(cc)
}
/// Returns `true` if the response has an `immutable` directive.
pub(crate) fn is_immutable(&self) -> bool {
self.0.contains_key("immutable")
}
}

View file

@ -1,20 +1,98 @@
use std::future::Future;
use std::time::SystemTime;
use std::{borrow::Cow, future::Future, path::Path};
use futures::FutureExt;
use http::request::Parts;
use http_cache_semantics::{AfterResponse, BeforeRequest, CachePolicy};
use reqwest::{Request, Response};
use reqwest_middleware::ClientWithMiddleware;
use rkyv::util::AlignedVec;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tracing::{debug, info_span, instrument, trace, warn, Instrument};
use url::Url;
use puffin_cache::{CacheEntry, Freshness};
use puffin_fs::write_atomic;
use crate::{cache_headers::CacheHeaders, Error, ErrorKind};
use crate::{
httpcache::{AfterResponse, BeforeRequest, CachePolicy, CachePolicyBuilder},
rkyvutil::OwnedArchive,
Error, ErrorKind,
};
/// A trait the generalizes (de)serialization at a high level.
///
/// The main purpose of this trait is to make the `CachedClient` work for
/// either serde or other mechanisms of serialization such as `rkyv`.
///
/// If you're using Serde, then unless you want to control the format, callers
/// should just use `CachedClient::get_serde`. This will use a default
/// implementation of `Cacheable` internally.
///
/// Alternatively, callers using `rkyv` should use
/// `CachedClient::get_cacheable`. If your types fit into the
/// `rkyvutil::OwnedArchive` mold, then an implementation of `Cacheable` is
/// already provided for that type.
pub trait Cacheable: Sized + Send {
/// This associated type permits customizing what the "output" type of
/// deserialization is. It can be identical to `Self`.
///
/// Typical use of this is for wrapper types used to proviate blanket trait
/// impls without hitting overlapping impl problems.
type Target;
/// Deserialize a value from bytes aligned to a 16-byte boundary.
fn from_aligned_bytes(bytes: AlignedVec) -> Result<Self::Target, crate::Error>;
/// Serialize bytes to a possibly owned byte buffer.
fn to_bytes(&self) -> Result<Cow<'_, [u8]>, crate::Error>;
/// Convert this type into its final form.
fn into_target(self) -> Self::Target;
}
/// A wrapper type that makes anything with Serde support automatically
/// implement `Cacheable`.
#[derive(Debug, Deserialize, Serialize)]
#[serde(transparent)]
pub struct SerdeCacheable<T> {
inner: T,
}
impl<T: Send + Serialize + DeserializeOwned> Cacheable for SerdeCacheable<T> {
type Target = T;
fn from_aligned_bytes(bytes: AlignedVec) -> Result<T, Error> {
Ok(rmp_serde::from_slice::<T>(&bytes).map_err(ErrorKind::Decode)?)
}
fn to_bytes(&self) -> Result<Cow<'_, [u8]>, Error> {
Ok(Cow::from(
rmp_serde::to_vec(&self.inner).map_err(ErrorKind::Encode)?,
))
}
fn into_target(self) -> Self::Target {
self.inner
}
}
/// All `OwnedArchive` values are cacheable.
impl<A> Cacheable for OwnedArchive<A>
where
A: rkyv::Archive + rkyv::Serialize<crate::rkyvutil::Serializer<4096>> + Send,
A::Archived: for<'a> rkyv::CheckBytes<rkyv::validation::validators::DefaultValidator<'a>>
+ rkyv::Deserialize<A, rkyv::de::deserializers::SharedDeserializeMap>,
{
type Target = OwnedArchive<A>;
fn from_aligned_bytes(bytes: AlignedVec) -> Result<OwnedArchive<A>, Error> {
OwnedArchive::new(bytes)
}
fn to_bytes(&self) -> Result<Cow<'_, [u8]>, Error> {
Ok(Cow::from(OwnedArchive::as_bytes(self)))
}
fn into_target(self) -> Self::Target {
self
}
}
/// Either a cached client error or a (user specified) error from the callback
#[derive(Debug)]
@ -44,285 +122,6 @@ impl<E: Into<Error>> From<CachedClientError<E>> for Error {
}
}
#[derive(Debug)]
enum CachedResponse<Payload: Serialize> {
/// The cached response is fresh without an HTTP request (e.g. immutable)
FreshCache(Payload),
/// The cached response is fresh after an HTTP request (e.g. 304 not modified)
NotModified(DataWithCachePolicy<Payload>),
/// There was no prior cached response or the cache was outdated
///
/// The cache policy is `None` if it isn't storable
ModifiedOrNew(Response, Option<Box<CachePolicy>>),
}
/// Serialize the actual payload together with its caching information.
#[derive(Debug, Deserialize, Serialize)]
pub struct DataWithCachePolicy<Payload: Serialize> {
pub data: Payload,
/// Whether the response should be considered immutable.
immutable: bool,
/// The [`CachePolicy`] is used to determine if the response is fresh or stale.
/// The policy is large (448 bytes at time of writing), so we reduce the stack size by
/// boxing it.
cache_policy: Box<CachePolicy>,
}
/// Custom caching layer over [`reqwest::Client`] using `http-cache-semantics`.
///
/// The implementation takes inspiration from the `http-cache` crate, but adds support for running
/// an async callback on the response before caching. We use this to e.g. store a
/// parsed version of the wheel metadata and for our remote zip reader. In the latter case, we want
/// to read a single file from a remote zip using range requests (so we don't have to download the
/// entire file). We send a HEAD request in the caching layer to check if the remote file has
/// changed (and if range requests are supported), and in the callback we make the actual range
/// requests if required.
///
/// Unlike `http-cache`, all outputs must be serde-able. Currently everything is json, but we can
/// transparently switch to a faster/smaller format.
///
/// Again unlike `http-cache`, the caller gets full control over the cache key with the assumption
/// that it's a file.
#[derive(Debug, Clone)]
pub struct CachedClient(ClientWithMiddleware);
impl CachedClient {
pub fn new(client: ClientWithMiddleware) -> Self {
Self(client)
}
/// The middleware is the retry strategy
pub fn uncached(&self) -> ClientWithMiddleware {
self.0.clone()
}
/// Make a cached request with a custom response transformation
///
/// If a new response was received (no prior cached response or modified on the remote), the
/// response is passed through `response_callback` and only the result is cached and returned.
/// The `response_callback` is allowed to make subsequent requests, e.g. through the uncached
/// client.
#[instrument(skip_all)]
pub async fn get_cached_with_callback<
Payload: Serialize + DeserializeOwned + Send + 'static,
CallBackError,
Callback,
CallbackReturn,
>(
&self,
req: Request,
cache_entry: &CacheEntry,
cache_control: CacheControl,
response_callback: Callback,
) -> Result<Payload, CachedClientError<CallBackError>>
where
Callback: FnOnce(Response) -> CallbackReturn,
CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
{
let cached = Self::read_cache(cache_entry).await;
let cached_response = self.send_cached(req, cache_control, cached).boxed().await?;
let write_cache = info_span!("write_cache", file = %cache_entry.path().display());
match cached_response {
CachedResponse::FreshCache(data) => Ok(data),
CachedResponse::NotModified(data_with_cache_policy) => {
async {
let data =
rmp_serde::to_vec(&data_with_cache_policy).map_err(ErrorKind::Encode)?;
write_atomic(cache_entry.path(), data)
.await
.map_err(ErrorKind::CacheWrite)?;
Ok(data_with_cache_policy.data)
}
.instrument(write_cache)
.await
}
CachedResponse::ModifiedOrNew(res, cache_policy) => {
let headers = CacheHeaders::from_response(res.headers().get_all("cache-control"));
let immutable = headers.is_immutable();
let data = response_callback(res)
.boxed()
.await
.map_err(|err| CachedClientError::Callback(err))?;
if let Some(cache_policy) = cache_policy {
let data_with_cache_policy = DataWithCachePolicy {
data,
immutable,
cache_policy,
};
async {
fs_err::tokio::create_dir_all(cache_entry.dir())
.await
.map_err(ErrorKind::CacheWrite)?;
let data = rmp_serde::to_vec(&data_with_cache_policy)
.map_err(ErrorKind::Encode)?;
write_atomic(cache_entry.path(), data)
.await
.map_err(ErrorKind::CacheWrite)?;
Ok(data_with_cache_policy.data)
}
.instrument(write_cache)
.await
} else {
Ok(data)
}
}
}
}
async fn read_cache<Payload: Serialize + DeserializeOwned + Send + 'static>(
cache_entry: &CacheEntry,
) -> Option<DataWithCachePolicy<Payload>> {
let read_span = info_span!("read_cache", file = %cache_entry.path().display());
let read_result = fs_err::tokio::read(cache_entry.path())
.instrument(read_span)
.await;
if let Ok(cached) = read_result {
let parse_span = info_span!(
"parse_cache",
path = %cache_entry.path().display()
);
let parse_result = tokio::task::spawn_blocking(move || {
parse_span
.in_scope(|| rmp_serde::from_slice::<DataWithCachePolicy<Payload>>(&cached))
})
.await
.expect("Tokio executor failed, was there a panic?");
match parse_result {
Ok(data) => Some(data),
Err(err) => {
warn!(
"Broken cache entry at {}, removing: {err}",
cache_entry.path().display()
);
let _ = fs_err::tokio::remove_file(&cache_entry.path()).await;
None
}
}
} else {
None
}
}
/// `http-cache-semantics` to `reqwest` wrapper
async fn send_cached<T: Serialize + DeserializeOwned>(
&self,
mut req: Request,
cache_control: CacheControl,
cached: Option<DataWithCachePolicy<T>>,
) -> Result<CachedResponse<T>, Error> {
let url = req.url().clone();
let cached_response = if let Some(cached) = cached {
// Avoid sending revalidation requests for immutable responses.
if cached.immutable && !cached.cache_policy.is_stale(SystemTime::now()) {
debug!("Found immutable response for: {url}");
return Ok(CachedResponse::FreshCache(cached.data));
}
// Apply the cache control header, if necessary.
match cache_control {
CacheControl::None => {}
CacheControl::MustRevalidate => {
req.headers_mut().insert(
http::header::CACHE_CONTROL,
http::HeaderValue::from_static("max-age=0, must-revalidate"),
);
}
}
match cached
.cache_policy
.before_request(&RequestLikeReqwest(&req), SystemTime::now())
{
BeforeRequest::Fresh(_) => {
debug!("Found fresh response for: {url}");
CachedResponse::FreshCache(cached.data)
}
BeforeRequest::Stale { request, matches } => {
self.send_cached_handle_stale(req, url, cached, &request, matches)
.await?
}
}
} else {
debug!("No cache entry for: {url}");
self.fresh_request(req).await?
};
Ok(cached_response)
}
async fn send_cached_handle_stale<T: Serialize + DeserializeOwned>(
&self,
mut req: Request,
url: Url,
cached: DataWithCachePolicy<T>,
request: &Parts,
matches: bool,
) -> Result<CachedResponse<T>, Error> {
if !matches {
// This shouldn't happen; if it does, we'll override the cache.
warn!("Cached request doesn't match current request for: {url}");
return self.fresh_request(req).await;
}
debug!("Sending revalidation request for: {url}");
for header in &request.headers {
req.headers_mut().insert(header.0.clone(), header.1.clone());
}
let res = self
.0
.execute(req.try_clone().expect("streaming requests not supported"))
.instrument(info_span!("revalidation_request", url = url.as_str()))
.await
.map_err(ErrorKind::RequestMiddlewareError)?
.error_for_status()
.map_err(ErrorKind::RequestError)?;
let after_response = cached.cache_policy.after_response(
&RequestLikeReqwest(&req),
&ResponseLikeReqwest(&res),
SystemTime::now(),
);
match after_response {
AfterResponse::NotModified(new_policy, _parts) => {
debug!("Found not-modified response for: {url}");
let headers = CacheHeaders::from_response(res.headers().get_all("cache-control"));
let immutable = headers.is_immutable();
Ok(CachedResponse::NotModified(DataWithCachePolicy {
data: cached.data,
immutable,
cache_policy: Box::new(new_policy),
}))
}
AfterResponse::Modified(new_policy, _parts) => {
debug!("Found modified response for: {url}");
Ok(CachedResponse::ModifiedOrNew(
res,
new_policy.is_storable().then(|| Box::new(new_policy)),
))
}
}
}
#[instrument(skip_all, fields(url = req.url().as_str()))]
async fn fresh_request<T: Serialize>(&self, req: Request) -> Result<CachedResponse<T>, Error> {
trace!("{} {}", req.method(), req.url());
let res = self
.0
.execute(req.try_clone().expect("streaming requests not supported"))
.await
.map_err(ErrorKind::RequestMiddlewareError)?
.error_for_status()
.map_err(ErrorKind::RequestError)?;
let cache_policy = CachePolicy::new(&RequestLikeReqwest(&req), &ResponseLikeReqwest(&res));
Ok(CachedResponse::ModifiedOrNew(
res,
cache_policy.is_storable().then(|| Box::new(cache_policy)),
))
}
}
#[derive(Debug, Clone, Copy)]
pub enum CacheControl {
/// Respect the `cache-control` header from the response.
@ -341,44 +140,515 @@ impl From<Freshness> for CacheControl {
}
}
#[derive(Debug)]
struct RequestLikeReqwest<'a>(&'a Request);
/// Custom caching layer over [`reqwest::Client`].
///
/// The implementation takes inspiration from the `http-cache` crate, but adds support for running
/// an async callback on the response before caching. We use this to e.g. store a
/// parsed version of the wheel metadata and for our remote zip reader. In the latter case, we want
/// to read a single file from a remote zip using range requests (so we don't have to download the
/// entire file). We send a HEAD request in the caching layer to check if the remote file has
/// changed (and if range requests are supported), and in the callback we make the actual range
/// requests if required.
///
/// Unlike `http-cache`, all outputs must be serializable/deserializable in some way, by
/// implementing the `Cacheable` trait.
///
/// Again unlike `http-cache`, the caller gets full control over the cache key with the assumption
/// that it's a file.
#[derive(Debug, Clone)]
pub struct CachedClient(ClientWithMiddleware);
impl<'a> http_cache_semantics::RequestLike for RequestLikeReqwest<'a> {
fn uri(&self) -> http::uri::Uri {
// This converts from a url::Url (as returned by reqwest::Request::url)
// to a http::uri::Uri. The conversion requires parsing, but this is
// only called ~once per HTTP request. We can afford it.
self.0
.url()
.as_str()
.parse()
.expect("reqwest::Request::url always returns a valid URL")
impl CachedClient {
pub fn new(client: ClientWithMiddleware) -> Self {
Self(client)
}
fn is_same_uri(&self, other: &http::uri::Uri) -> bool {
// At time of writing, I saw no way to cheaply compare a http::uri::Uri
// with a url::Url. We can at least avoid parsing anything, and
// Url::as_str() is free. In practice though, this routine is called
// ~once per HTTP request. We can afford it. (And it looks like
// http::uri::Uri's PartialEq<str> implementation has been tuned.)
self.0.url().as_str() == *other
/// The middleware is the retry strategy
pub fn uncached(&self) -> ClientWithMiddleware {
self.0.clone()
}
fn method(&self) -> &http::method::Method {
self.0.method()
/// Make a cached request with a custom response transformation
/// while using serde to (de)serialize cached responses.
///
/// If a new response was received (no prior cached response or modified
/// on the remote), the response is passed through `response_callback` and
/// only the result is cached and returned. The `response_callback` is
/// allowed to make subsequent requests, e.g. through the uncached client.
#[instrument(skip_all)]
pub async fn get_serde<
Payload: Serialize + DeserializeOwned + Send + 'static,
CallBackError,
Callback,
CallbackReturn,
>(
&self,
req: Request,
cache_entry: &CacheEntry,
cache_control: CacheControl,
response_callback: Callback,
) -> Result<Payload, CachedClientError<CallBackError>>
where
Callback: FnOnce(Response) -> CallbackReturn + Send,
CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
{
let payload = self
.get_cacheable(req, cache_entry, cache_control, move |resp| async {
let payload = response_callback(resp).await?;
Ok(SerdeCacheable { inner: payload })
})
.await?;
Ok(payload)
}
fn headers(&self) -> &http::header::HeaderMap {
self.0.headers()
/// Make a cached request with a custom response transformation while using
/// the `Cacheable` trait to (de)serialize cached responses.
///
/// The purpose of this routine is the use of `Cacheable`. Namely, it
/// generalizes over (de)serialization such that mechanisms other than
/// serde (such as rkyv) can be used to manage (de)serialization of cached
/// data.
///
/// If a new response was received (no prior cached response or modified
/// on the remote), the response is passed through `response_callback` and
/// only the result is cached and returned. The `response_callback` is
/// allowed to make subsequent requests, e.g. through the uncached client.
#[instrument(skip_all)]
pub async fn get_cacheable<Payload: Cacheable, CallBackError, Callback, CallbackReturn>(
&self,
req: Request,
cache_entry: &CacheEntry,
cache_control: CacheControl,
response_callback: Callback,
) -> Result<Payload::Target, CachedClientError<CallBackError>>
where
Callback: FnOnce(Response) -> CallbackReturn,
CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
{
let cached_response = match Self::read_cache(cache_entry).await {
Some(cached) => self.send_cached(req, cache_control, cached).boxed().await?,
None => {
debug!("No cache entry for: {}", req.url());
self.fresh_request(req).await?
}
};
match cached_response {
CachedResponse::FreshCache(cached) => Ok(Payload::from_aligned_bytes(cached.data)?),
CachedResponse::NotModified { cached, new_policy } => {
let refresh_cache =
info_span!("refresh_cache", file = %cache_entry.path().display());
async {
let data_with_cache_policy_bytes =
DataWithCachePolicy::serialize(&new_policy, &cached.data)?;
write_atomic(cache_entry.path(), data_with_cache_policy_bytes)
.await
.map_err(ErrorKind::CacheWrite)?;
Ok(Payload::from_aligned_bytes(cached.data)?)
}
.instrument(refresh_cache)
.await
}
CachedResponse::ModifiedOrNew {
response,
cache_policy,
} => {
let new_cache = info_span!("new_cache", file = %cache_entry.path().display());
let data = response_callback(response)
.boxed()
.await
.map_err(|err| CachedClientError::Callback(err))?;
let Some(cache_policy) = cache_policy else {
return Ok(data.into_target());
};
async {
fs_err::tokio::create_dir_all(cache_entry.dir())
.await
.map_err(ErrorKind::CacheWrite)?;
let data_with_cache_policy_bytes =
DataWithCachePolicy::serialize(&cache_policy, &data.to_bytes()?)?;
write_atomic(cache_entry.path(), data_with_cache_policy_bytes)
.await
.map_err(ErrorKind::CacheWrite)?;
Ok(data.into_target())
}
.instrument(new_cache)
.await
}
}
}
async fn read_cache(cache_entry: &CacheEntry) -> Option<DataWithCachePolicy> {
let span = info_span!("read_and_parse_cache", file = %cache_entry.path().display());
match span
.in_scope(|| DataWithCachePolicy::from_path_async(cache_entry.path()))
.await
{
Ok(data) => Some(data),
Err(err) => {
warn!(
"Broken cache entry at {}, removing: {err}",
cache_entry.path().display()
);
let _ = fs_err::tokio::remove_file(&cache_entry.path()).await;
None
}
}
}
/// Send a request given that we have a (possibly) stale cached response.
///
/// If the cached response is valid but stale, then this will attempt a
/// revalidation request.
async fn send_cached(
&self,
mut req: Request,
cache_control: CacheControl,
cached: DataWithCachePolicy,
) -> Result<CachedResponse, Error> {
// Apply the cache control header, if necessary.
match cache_control {
CacheControl::None => {}
CacheControl::MustRevalidate => {
req.headers_mut().insert(
http::header::CACHE_CONTROL,
http::HeaderValue::from_static("no-cache"),
);
}
}
Ok(match cached.cache_policy.before_request(&mut req) {
BeforeRequest::Fresh => {
debug!("Found fresh response for: {}", req.url());
CachedResponse::FreshCache(cached)
}
BeforeRequest::Stale(new_cache_policy_builder) => {
debug!("Found stale response for: {}", req.url());
self.send_cached_handle_stale(req, cached, new_cache_policy_builder)
.await?
}
BeforeRequest::NoMatch => {
// This shouldn't happen; if it does, we'll override the cache.
warn!(
"Cached request doesn't match current request for: {}",
req.url()
);
self.fresh_request(req).await?
}
})
}
async fn send_cached_handle_stale(
&self,
req: Request,
cached: DataWithCachePolicy,
new_cache_policy_builder: CachePolicyBuilder,
) -> Result<CachedResponse, Error> {
let url = req.url().clone();
debug!("Sending revalidation request for: {url}");
let response = self
.0
.execute(req)
.instrument(info_span!("revalidation_request", url = url.as_str()))
.await
.map_err(ErrorKind::RequestMiddlewareError)?
.error_for_status()
.map_err(ErrorKind::RequestError)?;
match cached
.cache_policy
.after_response(new_cache_policy_builder, &response)
{
AfterResponse::NotModified(new_policy) => {
debug!("Found not-modified response for: {url}");
Ok(CachedResponse::NotModified {
cached,
new_policy: Box::new(new_policy),
})
}
AfterResponse::Modified(new_policy) => {
debug!("Found modified response for: {url}");
Ok(CachedResponse::ModifiedOrNew {
response,
cache_policy: new_policy
.to_archived()
.is_storable()
.then(|| Box::new(new_policy)),
})
}
}
}
#[instrument(skip_all, fields(url = req.url().as_str()))]
async fn fresh_request(&self, req: Request) -> Result<CachedResponse, Error> {
trace!("Sending fresh {} request for {}", req.method(), req.url());
let cache_policy_builder = CachePolicyBuilder::new(&req);
let response = self
.0
.execute(req)
.await
.map_err(ErrorKind::RequestMiddlewareError)?
.error_for_status()
.map_err(ErrorKind::RequestError)?;
let cache_policy = cache_policy_builder.build(&response);
Ok(CachedResponse::ModifiedOrNew {
response,
cache_policy: cache_policy
.to_archived()
.is_storable()
.then(|| Box::new(cache_policy)),
})
}
}
#[derive(Debug)]
struct ResponseLikeReqwest<'a>(&'a Response);
enum CachedResponse {
/// The cached response is fresh without an HTTP request (e.g. age < max-age).
FreshCache(DataWithCachePolicy),
/// The cached response is fresh after an HTTP request (e.g. 304 not modified)
NotModified {
/// The cached response (with its old cache policy).
cached: DataWithCachePolicy,
/// The new [`CachePolicy`] is used to determine if the response
/// is fresh or stale when making subsequent requests for the same
/// resource. This policy should overwrite the old policy associated
/// with the cached response. In particular, this new policy is derived
/// from data received in a revalidation response, which might change
/// the parameters of cache behavior.
///
/// The policy is large (352 bytes at time of writing), so we reduce
/// the stack size by boxing it.
new_policy: Box<CachePolicy>,
},
/// There was no prior cached response or the cache was outdated
///
/// The cache policy is `None` if it isn't storable
ModifiedOrNew {
/// The response received from the server.
response: Response,
/// The [`CachePolicy`] is used to determine if the response is fresh or
/// stale when making subsequent requests for the same resource.
///
/// The policy is large (352 bytes at time of writing), so we reduce
/// the stack size by boxing it.
cache_policy: Option<Box<CachePolicy>>,
},
}
impl<'a> http_cache_semantics::ResponseLike for ResponseLikeReqwest<'a> {
fn status(&self) -> http::status::StatusCode {
self.0.status()
/// Represents an arbitrary data blob with an associated HTTP cache policy.
///
/// The cache policy is used to determine whether the data blob is stale or
/// not.
///
/// # Format
///
/// This type encapsulates the format for how blobs of data are stored on
/// disk. The format is very simple. First, the blob of data is written as-is.
/// Second, the archived representation of a `CachePolicy` is written. Thirdly,
/// the length, in bytes, of the archived `CachePolicy` is written as a 64-bit
/// little endian integer.
///
/// Reading the format is done via an `AlignedVec` so that `rkyv` can correctly
/// read the archived representation of the data blob. The cache policy is
/// split into its own `AlignedVec` allocation.
///
/// # Future ideas
///
/// This format was also chosen because it should in theory permit rewriting
/// the cache policy without needing to rewrite the data blob if the blob has
/// not changed. For example, this case occurs when a revalidation request
/// responds with HTTP 304 NOT MODIFIED. At time of writing, this is not yet
/// implemented because 1) the synchronization specifics of mutating a cache
/// file have not been worked out and 2) it's not clear if it's a win.
///
/// An alternative format would be to write the cache policy and the
/// blob in two distinct files. This would avoid needing to worry about
/// synchronization, but it means reading two files instead of one for every
/// cached response in the fast path. It's unclear whether it's worth it.
/// (Experiments have not yet been done.)
///
/// Another approach here would be to memory map the file and rejigger
/// `OwnedArchive` (or create a new type) that works with a memory map instead
/// of an `AlignedVec`. This will require care to ensure alignment is handled
/// correctly. This approach has not been litigated yet. I did not start with
/// it because experiments with ripgrep have tended to show that (on Linux)
/// memory mapping a bunch of small files ends up being quite a bit slower than
/// just reading them on to the heap.
#[derive(Debug)]
pub struct DataWithCachePolicy {
pub data: AlignedVec,
cache_policy: OwnedArchive<CachePolicy>,
}
impl DataWithCachePolicy {
/// Loads cached data and its associated HTTP cache policy from the given
/// file path in an asynchronous fashion (via `spawn_blocking`).
///
/// # Errors
///
/// If the given byte buffer is not in a valid format or if reading the
/// file given fails, then this returns an error.
async fn from_path_async(path: &Path) -> Result<DataWithCachePolicy, Error> {
let path = path.to_path_buf();
tokio::task::spawn_blocking(move || DataWithCachePolicy::from_path_sync(&path))
.await
// This just forwards panics from the closure.
.unwrap()
}
fn headers(&self) -> &http::header::HeaderMap {
self.0.headers()
/// Loads cached data and its associated HTTP cache policy from the given
/// file path in a synchronous fashion.
///
/// # Errors
///
/// If the given byte buffer is not in a valid format or if reading the
/// file given fails, then this returns an error.
fn from_path_sync(path: &Path) -> Result<DataWithCachePolicy, Error> {
let file = fs_err::File::open(path).map_err(ErrorKind::Io)?;
// Note that we don't wrap our file in a buffer because it will just
// get passed to AlignedVec::extend_from_reader, which doesn't benefit
// from an intermediary buffer. In effect, the AlignedVec acts as the
// buffer.
DataWithCachePolicy::from_reader(file)
}
/// Loads cached data and its associated HTTP cache policy from the given
/// reader.
///
/// # Errors
///
/// If the given byte buffer is not in a valid format or if the reader
/// fails, then this returns an error.
pub fn from_reader(mut rdr: impl std::io::Read) -> Result<DataWithCachePolicy, Error> {
let mut aligned_bytes = rkyv::util::AlignedVec::new();
aligned_bytes
.extend_from_reader(&mut rdr)
.map_err(ErrorKind::Io)?;
DataWithCachePolicy::from_aligned_bytes(aligned_bytes)
}
/// Loads cached data and its associated HTTP cache policy form an in
/// memory byte buffer.
///
/// # Errors
///
/// If the given byte buffer is not in a valid format, then this
/// returns an error.
fn from_aligned_bytes(mut bytes: AlignedVec) -> Result<DataWithCachePolicy, Error> {
let cache_policy = DataWithCachePolicy::deserialize_cache_policy(&mut bytes)?;
Ok(DataWithCachePolicy {
data: bytes,
cache_policy,
})
}
/// Serializes the given cache policy and arbitrary data blob to an in
/// memory byte buffer.
///
/// # Errors
///
/// If there was a problem converting the given cache policy to its
/// serialized representation, then this routine will return an error.
fn serialize(cache_policy: &CachePolicy, data: &[u8]) -> Result<Vec<u8>, Error> {
let mut buf = vec![];
DataWithCachePolicy::serialize_to_writer(cache_policy, data, &mut buf)?;
Ok(buf)
}
/// Serializes the given cache policy and arbitrary data blob to the given
/// writer.
///
/// # Errors
///
/// If there was a problem converting the given cache policy to its
/// serialized representation or if the writer returns an error, then
/// this routine will return an error.
fn serialize_to_writer(
cache_policy: &CachePolicy,
data: &[u8],
mut wtr: impl std::io::Write,
) -> Result<(), Error> {
let cache_policy_archived = OwnedArchive::from_unarchived(cache_policy)?;
let cache_policy_bytes = OwnedArchive::as_bytes(&cache_policy_archived);
wtr.write_all(data).map_err(ErrorKind::Io)?;
wtr.write_all(cache_policy_bytes).map_err(ErrorKind::Io)?;
let len = u64::try_from(cache_policy_bytes.len()).map_err(|_| {
let msg = format!(
"failed to represent {} (length of cache policy) in a u64",
cache_policy_bytes.len()
);
ErrorKind::Io(std::io::Error::other(msg))
})?;
wtr.write_all(&len.to_le_bytes()).map_err(ErrorKind::Io)?;
Ok(())
}
/// Deserializes a `OwnedArchive<CachePolicy>` off the end of the given
/// aligned bytes. Upon success, the given bytes will only contain the
/// data itself. The bytes representing the cached policy will have been
/// removed.
///
/// # Errors
///
/// This returns an error if the cache policy could not be deserialized
/// from the end of the given bytes.
fn deserialize_cache_policy(
bytes: &mut AlignedVec,
) -> Result<OwnedArchive<CachePolicy>, Error> {
let len = DataWithCachePolicy::deserialize_cache_policy_len(bytes)?;
let cache_policy_bytes_start = bytes.len() - (len + 8);
let cache_policy_bytes = &bytes[cache_policy_bytes_start..][..len];
let mut cache_policy_bytes_aligned = AlignedVec::with_capacity(len);
cache_policy_bytes_aligned.extend_from_slice(cache_policy_bytes);
assert!(
cache_policy_bytes_start <= bytes.len(),
"slicing cache policy should result in a truncation"
);
// Technically this will keep the extra capacity used to store the
// cache policy around. But it should be pretty small, and it saves a
// realloc. (It's unclear whether that matters more or less than the
// extra memory usage.)
bytes.resize(cache_policy_bytes_start, 0);
OwnedArchive::new(cache_policy_bytes_aligned)
}
/// Deserializes the length, in bytes, of the cache policy given a complete
/// serialized byte buffer of a `DataWithCachePolicy`.
///
/// Upon success, callers are guaranteed that
/// `&bytes[bytes.len() - (len + 8)..][..len]` will not panic.
///
/// # Errors
///
/// This returns an error if the length could not be read as a `usize` or is
/// otherwise known to be invalid. (For example, it is a length that is bigger
/// than `bytes.len()`.)
fn deserialize_cache_policy_len(bytes: &[u8]) -> Result<usize, Error> {
let Some(cache_policy_len_start) = bytes.len().checked_sub(8) else {
let msg = format!(
"data-with-cache-policy buffer should be at least 8 bytes \
in length, but is {} bytes",
bytes.len(),
);
return Err(ErrorKind::ArchiveRead(msg).into());
};
let cache_policy_len_bytes = <[u8; 8]>::try_from(&bytes[cache_policy_len_start..])
.expect("cache policy length is 8 bytes");
let len_u64 = u64::from_le_bytes(cache_policy_len_bytes);
let Ok(len_usize) = usize::try_from(len_u64) else {
let msg = format!(
"data-with-cache-policy has cache policy length of {}, \
but overflows usize",
len_u64,
);
return Err(ErrorKind::ArchiveRead(msg).into());
};
if bytes.len() < len_usize + 8 {
let msg = format!(
"invalid cache entry: data-with-cache-policy has cache policy length of {}, \
but total buffer size is {}",
len_usize,
bytes.len(),
);
return Err(ErrorKind::ArchiveRead(msg).into());
}
Ok(len_usize)
}
}

View file

@ -132,7 +132,7 @@ impl<'a> FlatIndexClient<'a> {
.instrument(info_span!("parse_flat_index_html", url = % url))
};
let files = cached_client
.get_cached_with_callback(
.get_serde(
flat_index_request,
&cache_entry,
cache_control,

View file

@ -0,0 +1,775 @@
use std::collections::HashSet;
use crate::rkyvutil::OwnedArchive;
/// Represents values for relevant cache-control directives.
///
/// This does include some directives that we don't use mostly because they are
/// trivial to support. (For example, `must-understand` at time of writing is
/// not used in our HTTP cache semantics. Neither is `proxy-revalidate` since
/// we are not a proxy.)
#[derive(
Clone,
Debug,
Default,
Eq,
PartialEq,
serde::Serialize,
serde::Deserialize,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
)]
#[archive(check_bytes)]
#[archive_attr(derive(Debug))]
pub struct CacheControl {
// directives for requests and responses
/// * https://www.rfc-editor.org/rfc/rfc9111.html#name-max-age
/// * https://www.rfc-editor.org/rfc/rfc9111.html#name-max-age-2
pub max_age_seconds: Option<u64>,
/// * https://www.rfc-editor.org/rfc/rfc9111.html#name-no-cache
/// * https://www.rfc-editor.org/rfc/rfc9111.html#name-no-cache-2
pub no_cache: bool,
/// * https://www.rfc-editor.org/rfc/rfc9111.html#name-no-store
/// * https://www.rfc-editor.org/rfc/rfc9111.html#name-no-store-2
pub no_store: bool,
/// * https://www.rfc-editor.org/rfc/rfc9111.html#name-no-transform
/// * https://www.rfc-editor.org/rfc/rfc9111.html#name-no-transform-2
pub no_transform: bool,
// request-only directives
/// https://www.rfc-editor.org/rfc/rfc9111.html#name-max-stale
pub max_stale_seconds: Option<u64>,
/// https://www.rfc-editor.org/rfc/rfc9111.html#name-min-fresh
pub min_fresh_seconds: Option<u64>,
// response-only directives
/// https://www.rfc-editor.org/rfc/rfc9111.html#name-only-if-cached
pub only_if_cached: bool,
/// https://www.rfc-editor.org/rfc/rfc9111.html#name-must-revalidate
pub must_revalidate: bool,
/// https://www.rfc-editor.org/rfc/rfc9111.html#name-must-understand
pub must_understand: bool,
/// https://www.rfc-editor.org/rfc/rfc9111.html#name-private
pub private: bool,
/// https://www.rfc-editor.org/rfc/rfc9111.html#name-proxy-revalidate
pub proxy_revalidate: bool,
/// https://www.rfc-editor.org/rfc/rfc9111.html#name-public
pub public: bool,
/// https://www.rfc-editor.org/rfc/rfc9111.html#name-s-maxage
pub s_maxage_seconds: Option<u64>,
/// https://httpwg.org/specs/rfc8246.html
pub immutable: bool,
}
impl CacheControl {
/// Convert this to an owned archive value.
pub fn to_archived(&self) -> OwnedArchive<CacheControl> {
// There's no way (other than OOM) for serializing this type to fail.
OwnedArchive::from_unarchived(self).expect("all possible values can be archived")
}
}
impl<'b, B: 'b + ?Sized + AsRef<[u8]>> FromIterator<&'b B> for CacheControl {
fn from_iter<T: IntoIterator<Item = &'b B>>(it: T) -> CacheControl {
CacheControl::from_iter(CacheControlParser::new(it))
}
}
impl FromIterator<CacheControlDirective> for CacheControl {
fn from_iter<T: IntoIterator<Item = CacheControlDirective>>(it: T) -> CacheControl {
fn parse_int(value: &[u8]) -> Option<u64> {
if !value.iter().all(u8::is_ascii_digit) {
return None;
}
std::str::from_utf8(value).ok()?.parse().ok()
}
let mut cc = CacheControl::default();
for ccd in it {
// Note that when we see invalid directive values, we follow [RFC
// 9111 S4.2.1]. It says that invalid cache-control directives
// should result in treating the response as stale. (Which we
// accomplished by setting `must_revalidate` to `true`.)
//
// [RFC 9111 S4.2.1]: https://www.rfc-editor.org/rfc/rfc9111.html#section-4.2.1
match &*ccd.name {
// request + response directives
"max-age" => match parse_int(&ccd.value) {
None => cc.must_revalidate = true,
Some(seconds) => cc.max_age_seconds = Some(seconds),
},
"no-cache" => cc.no_cache = true,
"no-store" => cc.no_store = true,
"no-transform" => cc.no_transform = true,
// request-only directives
"max-stale" => {
// As per [RFC 9111 S5.2.1.2], "If no value is assigned to
// max-stale, then the client will accept a stale response
// of any age." We implement that by just using the maximum
// number of seconds.
//
// [RFC 9111 S5.2.1.2]: https://www.rfc-editor.org/rfc/rfc9111.html#section-5.2.1.2
if ccd.value.is_empty() {
cc.max_stale_seconds = Some(u64::MAX);
} else {
match parse_int(&ccd.value) {
None => cc.must_revalidate = true,
Some(seconds) => cc.max_stale_seconds = Some(seconds),
}
}
}
"min-fresh" => match parse_int(&ccd.value) {
None => cc.must_revalidate = true,
Some(seconds) => cc.min_fresh_seconds = Some(seconds),
},
"only-if-cached" => cc.only_if_cached = true,
"must-revalidate" => cc.must_revalidate = true,
"must-understand" => cc.must_understand = true,
"private" => cc.private = true,
"proxy-revalidate" => cc.proxy_revalidate = true,
"public" => cc.public = true,
"s-maxage" => match parse_int(&ccd.value) {
None => cc.must_revalidate = true,
Some(seconds) => cc.s_maxage_seconds = Some(seconds),
},
"immutable" => cc.immutable = true,
_ => {}
}
}
cc
}
}
/// A parser for the HTTP `Cache-Control` header.
///
/// The parser is mostly defined across multiple parts of multiple RFCs.
/// Namely, [RFC 9110 S5.6.2] says how to parse the names (or "keys") of each
/// directive (whose format is a "token"). [RFC 9110 S5.6.4] says how to parse
/// quoted values. And finally, [RFC 9111 Appendex A] gives the ABNF for the
/// overall header value.
///
/// This parser accepts an iterator of anything that can be cheaply converted
/// to a byte string (e.g., `http::header::HeaderValue`). Directives are parsed
/// from zero or more of these byte strings. Parsing cannot return an error,
/// but if something unexpected is found, the rest of that header value is
/// skipped.
///
/// Duplicate directives provoke an automatic insertion of `must-revalidate`,
/// as implied by [RFC 9111 S4.2.1], to ensure that the client will talk to the
/// server before using anything in case.
///
/// This parser handles a bit more than what we actually need in
/// `puffin-client`. For example, we don't need to handle quoted values at all
/// since either don't use or care about values that require quoted. With that
/// said, the parser handles these because it wasn't that much extra work to do
/// so and just generally seemed like good sense. (If we didn't handle them and
/// parsed them incorrectly, that might mean parsing subsequent directives that
/// we do care about incorrectly.)
///
/// [RFC 9110 S5.6.2]: https://www.rfc-editor.org/rfc/rfc9110.html#name-tokens
/// [RFC 9110 S5.6.4]: https://www.rfc-editor.org/rfc/rfc9110.html#name-quoted-strings
/// [RFC 9111 Appendix A]: https://www.rfc-editor.org/rfc/rfc9111.html#name-collected-abnf
/// [RFC 9111 S4.2.1]: https://www.rfc-editor.org/rfc/rfc9111.html#calculating.freshness.lifetime
struct CacheControlParser<'b, I> {
cur: &'b [u8],
directives: I,
seen: HashSet<String>,
}
impl<'b, B: 'b + ?Sized + AsRef<[u8]>, I: Iterator<Item = &'b B>> CacheControlParser<'b, I> {
/// Create a new parser of zero or more `Cache-Control` header values. The
/// given iterator should yield elements that satisfy `AsRef<[u8]>`.
fn new<II: IntoIterator<IntoIter = I>>(headers: II) -> CacheControlParser<'b, I> {
let mut directives = headers.into_iter();
let cur = directives.next().map(|h| h.as_ref()).unwrap_or(b"");
CacheControlParser {
cur,
directives,
seen: HashSet::new(),
}
}
/// Parses a token according to [RFC 9110 S5.6.2].
///
/// If no token is found at the current position, then this returns `None`.
/// Usually this indicates an invalid cache-control directive.
///
/// This does not trim whitespace before or after the token.
///
/// [RFC 9110 S5.6.2]: https://www.rfc-editor.org/rfc/rfc9110.html#name-tokens
fn parse_token(&mut self) -> Option<String> {
/// Returns true when the given byte can appear in a token, as
/// defined by [RFC 9110 S5.6.2].
///
/// [RFC 9110 S5.6.2]: https://www.rfc-editor.org/rfc/rfc9110.html#name-tokens
fn is_token_byte(byte: u8) -> bool {
matches!(
byte,
| b'!' | b'#' | b'$' | b'%' | b'&' | b'\'' | b'*' | b'+'
| b'-' | b'.' | b'^' | b'_' | b'`' | b'|' | b'~'
| b'0'..=b'9' | b'A'..=b'Z' | b'a'..=b'z',
)
}
let mut end = 0;
while self.cur.get(end).copied().map_or(false, is_token_byte) {
end += 1;
}
if end == 0 {
None
} else {
let (token, rest) = self.cur.split_at(end);
self.cur = rest;
// This can't fail because `end` is only incremented when the
// current byte is a valid token byte. And all valid token bytes
// are ASCII and thus valid UTF-8.
Some(String::from_utf8(token.to_vec()).expect("all valid token bytes are valid UTF-8"))
}
}
/// Looks for an `=` as per [RFC 9111 Appendix A] which indicates that a
/// cache directive has a value.
///
/// This returns true if one was found. In which case, the `=` is consumed.
///
/// This does not trim whitespace before or after the token.
///
/// [RFC 9111 Appendix A]: https://www.rfc-editor.org/rfc/rfc9111.html#name-collected-abnf
fn maybe_parse_equals(&mut self) -> bool {
if self.cur.first().map_or(false, |&byte| byte == b'=') {
self.cur = &self.cur[1..];
true
} else {
false
}
}
/// Parses a directive value as either an unquoted token or a quoted string
/// as per [RFC 9111 Appendix A].
///
/// If a valid value could not be found (for example, end-of-input or an
/// opening quote without a closing quote), then `None` is returned. In
/// this case, one should consider the cache-control header invalid.
///
/// This does not trim whitespace before or after the token.
///
/// Note that the returned value is *not* guaranteed to be valid UTF-8.
/// Namely, it is possible for a quoted string to contain invalid UTF-8.
///
/// [RFC 9111 Appendix A]: https://www.rfc-editor.org/rfc/rfc9111.html#name-collected-abnf
fn parse_value(&mut self) -> Option<Vec<u8>> {
if *self.cur.first()? == b'"' {
self.cur = &self.cur[1..];
self.parse_quoted_string()
} else {
self.parse_token().map(|s| s.into_bytes())
}
}
/// Parses a quoted string as per [RFC 9110 S5.6.4].
///
/// This assumes the opening quote has already been consumed.
///
/// If an invalid quoted string was found (e.g., no closing quote), then
/// `None` is returned. An empty value may be returned.
///
/// Note that the returned value is *not* guaranteed to be valid UTF-8.
/// Namely, it is possible for a quoted string to contain invalid UTF-8.
///
/// [RFC 9110 S5.6.4]: https://www.rfc-editor.org/rfc/rfc9110.html#name-quoted-strings
fn parse_quoted_string(&mut self) -> Option<Vec<u8>> {
fn is_qdtext_byte(byte: u8) -> bool {
matches!(byte, b'\t' | b' ' | 0x21 | 0x23..=0x5B | 0x5D..=0x7E | 0x80..=0xFF)
}
fn is_quoted_pair_byte(byte: u8) -> bool {
matches!(byte, b'\t' | b' ' | 0x21..=0x7E | 0x80..=0xFF)
}
let mut value = vec![];
while !self.cur.is_empty() {
let byte = self.cur[0];
self.cur = &self.cur[1..];
if byte == b'"' {
return Some(value);
} else if byte == b'\\' {
let byte = *self.cur.first()?;
self.cur = &self.cur[1..];
// If we saw an escape but didn't see a valid
// escaped byte, then we treat this value as
// invalid.
if !is_quoted_pair_byte(byte) {
return None;
}
value.push(byte);
} else if is_qdtext_byte(byte) {
value.push(byte);
} else {
break;
}
}
// If we got here, it means we hit end-of-input before seeing a closing
// quote. So we treat this as invalid and return `None`.
None
}
/// Looks for a `,` as per [RFC 9111 Appendix A]. If one is found, then it
/// is consumed and this returns true.
///
/// This does not trim whitespace before or after the token.
///
/// [RFC 9111 Appendix A]: https://www.rfc-editor.org/rfc/rfc9111.html#name-collected-abnf
fn maybe_parse_directive_delimiter(&mut self) -> bool {
if self.cur.first().map_or(false, |&byte| byte == b',') {
self.cur = &self.cur[1..];
true
} else {
false
}
}
/// [RFC 9111 Appendix A] says that optional whitespace may appear between
/// cache directives. We actually also allow whitespace to appear before
/// the first directive and after the last directive.
///
/// [RFC 9111 Appendix A]: https://www.rfc-editor.org/rfc/rfc9111.html#name-collected-abnf
fn skip_whitespace(&mut self) {
while self.cur.first().map_or(false, u8::is_ascii_whitespace) {
self.cur = &self.cur[1..];
}
}
fn emit_directive(
&mut self,
directive: CacheControlDirective,
) -> Option<CacheControlDirective> {
let duplicate = !self.seen.insert(directive.name.clone());
if duplicate {
self.emit_revalidation()
} else {
Some(directive)
}
}
fn emit_revalidation(&mut self) -> Option<CacheControlDirective> {
if self.seen.insert("must-revalidate".to_string()) {
Some(CacheControlDirective::must_revalidate())
} else {
// If we've already emitted a must-revalidate
// directive, then don't do it again.
None
}
}
}
impl<'b, B: 'b + ?Sized + AsRef<[u8]>, I: Iterator<Item = &'b B>> Iterator
for CacheControlParser<'b, I>
{
type Item = CacheControlDirective;
fn next(&mut self) -> Option<CacheControlDirective> {
loop {
if self.cur.is_empty() {
self.cur = self.directives.next().map(|h| h.as_ref())?;
}
while !self.cur.is_empty() {
self.skip_whitespace();
let Some(mut name) = self.parse_token() else {
// If we fail to parse a token, then this header value is
// either corrupt or empty. So skip the rest of it.
let invalid = !self.cur.is_empty();
self.cur = b"";
// But if it was invalid, force revalidation.
if invalid {
if let Some(d) = self.emit_revalidation() {
return Some(d);
}
}
break;
};
name.make_ascii_lowercase();
if !self.maybe_parse_equals() {
// Eat up whitespace and the next delimiter. We don't care
// if we find a terminator.
self.skip_whitespace();
self.maybe_parse_directive_delimiter();
let directive = CacheControlDirective {
name,
value: vec![],
};
match self.emit_directive(directive) {
None => continue,
Some(d) => return Some(d),
}
}
let Some(value) = self.parse_value() else {
// If we expected a value (we saw an =) but couldn't find a
// valid value, then this header value is probably corrupt.
// So skip the rest of it.
self.cur = b"";
match self.emit_revalidation() {
None => break,
Some(d) => return Some(d),
}
};
// Eat up whitespace and the next delimiter. We don't care if
// we find a terminator.
self.skip_whitespace();
self.maybe_parse_directive_delimiter();
let directive = CacheControlDirective { name, value };
match self.emit_directive(directive) {
None => continue,
Some(d) => return Some(d),
}
}
}
}
}
/// A single directive from the `Cache-Control` header.
#[derive(Debug, Eq, PartialEq)]
struct CacheControlDirective {
/// The name of the directive.
name: String,
/// A possibly empty value.
///
/// Note that directive values may contain invalid UTF-8. (Although they
/// cannot actually contain arbitrary bytes. For example, NUL bytes, among
/// others, are not allowed.)
value: Vec<u8>,
}
impl CacheControlDirective {
/// Returns a `must-revalidate` directive. This is useful for forcing a
/// cache decision that the response is stale, and thus the server should
/// be consulted for whether the cached response ought to be used or not.
fn must_revalidate() -> CacheControlDirective {
CacheControlDirective {
name: "must-revalidate".to_string(),
value: vec![],
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn cache_control_token() {
let cc: CacheControl = CacheControlParser::new(["no-cache"]).collect();
assert!(cc.no_cache);
assert!(!cc.must_revalidate);
}
#[test]
fn cache_control_max_age() {
let cc: CacheControl = CacheControlParser::new(["max-age=60"]).collect();
assert_eq!(Some(60), cc.max_age_seconds);
assert!(!cc.must_revalidate);
}
// [RFC 9111 S5.2.1.1] says that client MUST NOT quote max-age, but we
// support parsing it that way anyway.
//
// [RFC 9111 S5.2.1.1]: https://www.rfc-editor.org/rfc/rfc9111.html#section-5.2.1.1
    #[test]
    fn cache_control_max_age_quoted() {
        let cc: CacheControl = CacheControlParser::new([r#"max-age="60""#]).collect();
        assert_eq!(Some(60), cc.max_age_seconds);
        assert!(!cc.must_revalidate);
    }

    #[test]
    fn cache_control_max_age_invalid() {
        let cc: CacheControl = CacheControlParser::new(["max-age=6a0"]).collect();
        assert_eq!(None, cc.max_age_seconds);
        assert!(cc.must_revalidate);
    }

    #[test]
    fn cache_control_immutable() {
        let cc: CacheControl = CacheControlParser::new(["max-age=31536000, immutable"]).collect();
        assert_eq!(Some(31_536_000), cc.max_age_seconds);
        assert!(cc.immutable);
        assert!(!cc.must_revalidate);
    }

    #[test]
    fn cache_control_unrecognized() {
        let cc: CacheControl = CacheControlParser::new(["lion,max-age=60,zebra"]).collect();
        assert_eq!(Some(60), cc.max_age_seconds);
    }

    #[test]
    fn cache_control_invalid_squashes_remainder() {
        let cc: CacheControl = CacheControlParser::new(["no-cache,\x00,max-age=60"]).collect();
        // The invalid data doesn't impact things before it.
        assert!(cc.no_cache);
        // The invalid data precludes parsing anything after.
        assert_eq!(None, cc.max_age_seconds);
        // The invalid contents should force revalidation.
        assert!(cc.must_revalidate);
    }

    #[test]
    fn cache_control_invalid_squashes_remainder_but_not_other_header_values() {
        let cc: CacheControl =
            CacheControlParser::new(["no-cache,\x00,max-age=60", "max-stale=30"]).collect();
        // The invalid data doesn't impact things before it.
        assert!(cc.no_cache);
        // The invalid data precludes parsing anything after in the same
        // header value, but not in other header values.
        assert_eq!(Some(30), cc.max_stale_seconds);
        // The invalid contents should force revalidation.
        assert!(cc.must_revalidate);
    }

    #[test]
    fn cache_control_parse_token() {
        let directives = CacheControlParser::new(["no-cache"]).collect::<Vec<_>>();
        assert_eq!(
            directives,
            vec![CacheControlDirective {
                name: "no-cache".to_string(),
                value: vec![]
            }]
        );
    }

    #[test]
    fn cache_control_parse_token_to_token_value() {
        let directives = CacheControlParser::new(["max-age=60"]).collect::<Vec<_>>();
        assert_eq!(
            directives,
            vec![CacheControlDirective {
                name: "max-age".to_string(),
                value: b"60".to_vec(),
            }]
        );
    }

    #[test]
    fn cache_control_parse_token_to_quoted_string() {
        let directives =
            CacheControlParser::new([r#"private="cookie,x-something-else""#]).collect::<Vec<_>>();
        assert_eq!(
            directives,
            vec![CacheControlDirective {
                name: "private".to_string(),
                value: b"cookie,x-something-else".to_vec(),
            }]
        );
    }

    #[test]
    fn cache_control_parse_token_to_quoted_string_with_escape() {
        let directives =
            CacheControlParser::new([r#"private="something\"crazy""#]).collect::<Vec<_>>();
        assert_eq!(
            directives,
            vec![CacheControlDirective {
                name: "private".to_string(),
                value: br#"something"crazy"#.to_vec(),
            }]
        );
    }

    #[test]
    fn cache_control_parse_multiple_directives() {
        let header = r#"max-age=60, no-cache, private="cookie", no-transform"#;
        let directives = CacheControlParser::new([header]).collect::<Vec<_>>();
        assert_eq!(
            directives,
            vec![
                CacheControlDirective {
                    name: "max-age".to_string(),
                    value: b"60".to_vec(),
                },
                CacheControlDirective {
                    name: "no-cache".to_string(),
                    value: vec![]
                },
                CacheControlDirective {
                    name: "private".to_string(),
                    value: b"cookie".to_vec(),
                },
                CacheControlDirective {
                    name: "no-transform".to_string(),
                    value: vec![]
                },
            ]
        );
    }

    #[test]
    fn cache_control_parse_multiple_directives_across_multiple_header_values() {
        let headers = [
            r#"max-age=60, no-cache"#,
            r#"private="cookie""#,
            r#"no-transform"#,
        ];
        let directives = CacheControlParser::new(headers).collect::<Vec<_>>();
        assert_eq!(
            directives,
            vec![
                CacheControlDirective {
                    name: "max-age".to_string(),
                    value: b"60".to_vec(),
                },
                CacheControlDirective {
                    name: "no-cache".to_string(),
                    value: vec![]
                },
                CacheControlDirective {
                    name: "private".to_string(),
                    value: b"cookie".to_vec(),
                },
                CacheControlDirective {
                    name: "no-transform".to_string(),
                    value: vec![]
                },
            ]
        );
    }

    #[test]
    fn cache_control_parse_one_header_invalid() {
        let headers = [
            r#"max-age=60, no-cache"#,
            r#", private="cookie""#,
            r#"no-transform"#,
        ];
        let directives = CacheControlParser::new(headers).collect::<Vec<_>>();
        assert_eq!(
            directives,
            vec![
                CacheControlDirective {
                    name: "max-age".to_string(),
                    value: b"60".to_vec(),
                },
                CacheControlDirective {
                    name: "no-cache".to_string(),
                    value: vec![]
                },
                CacheControlDirective {
                    name: "must-revalidate".to_string(),
                    value: vec![]
                },
                CacheControlDirective {
                    name: "no-transform".to_string(),
                    value: vec![]
                },
            ]
        );
    }

    #[test]
    fn cache_control_parse_invalid_directive_drops_remainder() {
        let header = r#"max-age=60, no-cache, ="cookie", no-transform"#;
        let directives = CacheControlParser::new([header]).collect::<Vec<_>>();
        assert_eq!(
            directives,
            vec![
                CacheControlDirective {
                    name: "max-age".to_string(),
                    value: b"60".to_vec(),
                },
                CacheControlDirective {
                    name: "no-cache".to_string(),
                    value: vec![]
                },
                CacheControlDirective {
                    name: "must-revalidate".to_string(),
                    value: vec![]
                },
            ]
        );
    }

    #[test]
    fn cache_control_parse_name_normalized() {
        let header = r#"MAX-AGE=60"#;
        let directives = CacheControlParser::new([header]).collect::<Vec<_>>();
        assert_eq!(
            directives,
            vec![CacheControlDirective {
                name: "max-age".to_string(),
                value: b"60".to_vec(),
            }]
        );
    }

    // When a duplicate directive is found, we keep the first one
    // and add in a `must-revalidate` directive to indicate that
    // things are stale and the client should do a re-check.
    #[test]
    fn cache_control_parse_duplicate_directives() {
        let header = r#"max-age=60, no-cache, max-age=30"#;
        let directives = CacheControlParser::new([header]).collect::<Vec<_>>();
        assert_eq!(
            directives,
            vec![
                CacheControlDirective {
                    name: "max-age".to_string(),
                    value: b"60".to_vec(),
                },
                CacheControlDirective {
                    name: "no-cache".to_string(),
                    value: vec![]
                },
                CacheControlDirective {
                    name: "must-revalidate".to_string(),
                    value: vec![]
                },
            ]
        );
    }

    #[test]
    fn cache_control_parse_duplicate_directives_across_headers() {
        let headers = [r#"max-age=60, no-cache"#, r#"max-age=30"#];
        let directives = CacheControlParser::new(headers).collect::<Vec<_>>();
        assert_eq!(
            directives,
            vec![
                CacheControlDirective {
                    name: "max-age".to_string(),
                    value: b"60".to_vec(),
                },
                CacheControlDirective {
                    name: "no-cache".to_string(),
                    value: vec![]
                },
                CacheControlDirective {
                    name: "must-revalidate".to_string(),
                    value: vec![]
                },
            ]
        );
    }

    // Tests that we don't emit must-revalidate multiple times
    // even when something is duplicated multiple times.
    #[test]
    fn cache_control_parse_duplicate_redux() {
        let header = r#"max-age=60, no-cache, no-cache, max-age=30"#;
        let directives = CacheControlParser::new([header]).collect::<Vec<_>>();
        assert_eq!(
            directives,
            vec![
                CacheControlDirective {
                    name: "max-age".to_string(),
                    value: b"60".to_vec(),
                },
                CacheControlDirective {
                    name: "no-cache".to_string(),
                    value: vec![]
                },
                CacheControlDirective {
                    name: "must-revalidate".to_string(),
                    value: vec![]
                },
            ]
        );
    }
}

(File diff suppressed because it is too large.)

@@ -7,11 +7,11 @@ pub use registry_client::{
 };
+pub use rkyvutil::OwnedArchive;
 mod cache_headers;
 mod cached_client;
 mod error;
 mod flat_index;
 mod html;
+mod httpcache;
 mod registry_client;
 mod remote_metadata;
+mod rkyvutil;

@@ -27,6 +27,7 @@ use pypi_types::{Metadata21, SimpleJson};
 use crate::cached_client::CacheControl;
 use crate::html::SimpleHtml;
 use crate::remote_metadata::wheel_metadata_from_remote_zip;
+use crate::rkyvutil::OwnedArchive;
 use crate::{CachedClient, CachedClientError, Error, ErrorKind};
 
 /// A builder for an [`RegistryClient`].
@@ -122,7 +123,7 @@ impl RegistryClient {
     pub async fn simple(
         &self,
         package_name: &PackageName,
-    ) -> Result<(IndexUrl, SimpleMetadata), Error> {
+    ) -> Result<(IndexUrl, OwnedArchive<SimpleMetadata>), Error> {
         if self.index_urls.no_index() {
             return Err(ErrorKind::NoIndex(package_name.as_ref().to_string()).into());
         }
@@ -152,7 +153,7 @@
         &self,
         package_name: &PackageName,
         index: &IndexUrl,
-    ) -> Result<Result<SimpleMetadata, CachedClientError<Error>>, Error> {
+    ) -> Result<Result<OwnedArchive<SimpleMetadata>, CachedClientError<Error>>, Error> {
         // Format the URL for PyPI.
         let mut url: Url = index.clone().into();
         url.path_segments_mut()
@@ -168,7 +169,7 @@
                 IndexUrl::Pypi => "pypi".to_string(),
                 IndexUrl::Url(url) => cache_key::digest(&cache_key::CanonicalUrl::new(url)),
             }),
-            format!("{package_name}.msgpack"),
+            format!("{package_name}.rkyv"),
         );
         let cache_control = CacheControl::from(
             self.cache
@@ -201,14 +202,14 @@
                 ))
             })?;
-            match media_type {
+            let unarchived = match media_type {
                 MediaType::Json => {
                     let bytes = response.bytes().await.map_err(ErrorKind::RequestError)?;
                     let data: SimpleJson = serde_json::from_slice(bytes.as_ref())
                         .map_err(|err| Error::from_json_err(err, url.clone()))?;
                     let metadata =
                         SimpleMetadata::from_files(data.files, package_name, url.as_str());
-                    Ok(metadata)
+                    metadata
                 }
                 MediaType::Html => {
                     let text = response.text().await.map_err(ErrorKind::RequestError)?;
@@ -216,16 +217,17 @@
                         .map_err(|err| Error::from_html_err(err, url.clone()))?;
                     let metadata =
                         SimpleMetadata::from_files(files, package_name, base.as_url().as_str());
-                    Ok(metadata)
+                    metadata
                 }
-            }
+            };
+            OwnedArchive::from_unarchived(&unarchived)
         }
         .boxed()
         .instrument(info_span!("parse_simple_api", package = %package_name))
     };
     let result = self
         .client
-        .get_cached_with_callback(
+        .get_cacheable(
             simple_request,
             &cache_entry,
             cache_control,
@@ -339,7 +341,7 @@
                 .map_err(ErrorKind::RequestError)?;
             Ok(self
                 .client
-                .get_cached_with_callback(req, &cache_entry, cache_control, response_callback)
+                .get_serde(req, &cache_entry, cache_control, response_callback)
                 .await?)
         } else {
             // If we lack PEP 658 support, try using HTTP range requests to read only the
@@ -399,7 +401,7 @@
                 .map_err(ErrorKind::RequestError)?;
             let result = self
                 .client
-                .get_cached_with_callback(
+                .get_serde(
                     req,
                     &cache_entry,
                     cache_control,

@@ -15,7 +15,7 @@ use pep440_rs::{Version, VersionSpecifier, VersionSpecifiers};
 use pep508_rs::{Requirement, VersionOrUrl};
 use platform_host::Platform;
 use puffin_cache::{Cache, CacheArgs};
-use puffin_client::{FlatIndex, RegistryClient, RegistryClientBuilder};
+use puffin_client::{FlatIndex, OwnedArchive, RegistryClient, RegistryClientBuilder};
 use puffin_dispatch::BuildDispatch;
 use puffin_installer::NoBinary;
 use puffin_interpreter::Virtualenv;
@@ -48,7 +48,8 @@ async fn find_latest_version(
     client: &RegistryClient,
     package_name: &PackageName,
 ) -> Option<Version> {
-    let (_, simple_metadata) = client.simple(package_name).await.ok()?;
+    let (_, raw_simple_metadata) = client.simple(package_name).await.ok()?;
+    let simple_metadata = OwnedArchive::deserialize(&raw_simple_metadata);
     let version = simple_metadata.into_iter().next()?.version;
     Some(version.clone())
 }

@@ -177,7 +177,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
         let archive = self
             .client
             .cached_client()
-            .get_cached_with_callback(req, &http_entry, cache_control, download)
+            .get_serde(req, &http_entry, cache_control, download)
             .await
             .map_err(|err| match err {
                 CachedClientError::Callback(err) => err,
@@ -240,7 +240,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
         let archive = self
             .client
             .cached_client()
-            .get_cached_with_callback(req, &http_entry, cache_control, download)
+            .get_serde(req, &http_entry, cache_control, download)
             .await
             .map_err(|err| match err {
                 CachedClientError::Callback(err) => err,

@@ -273,7 +273,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
         let req = self.cached_client.uncached().get(url.clone()).build()?;
         let manifest = self
             .cached_client
-            .get_cached_with_callback(req, &cache_entry, cache_control, download)
+            .get_serde(req, &cache_entry, cache_control, download)
             .await
             .map_err(|err| match err {
                 CachedClientError::Callback(err) => err,
@@ -366,7 +366,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
         let req = self.cached_client.uncached().get(url.clone()).build()?;
         let manifest = self
             .cached_client
-            .get_cached_with_callback(req, &cache_entry, cache_control, download)
+            .get_serde(req, &cache_entry, cache_control, download)
             .await
             .map_err(|err| match err {
                 CachedClientError::Callback(err) => err,
@@ -941,10 +941,11 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
 /// Read an existing HTTP-cached [`Manifest`], if it exists.
 pub(crate) fn read_http_manifest(cache_entry: &CacheEntry) -> Result<Option<Manifest>, Error> {
-    match std::fs::read(cache_entry.path()) {
-        Ok(cached) => Ok(Some(
-            rmp_serde::from_slice::<DataWithCachePolicy<Manifest>>(&cached)?.data,
-        )),
+    match std::fs::File::open(cache_entry.path()) {
+        Ok(file) => {
+            let data = DataWithCachePolicy::from_reader(file)?.data;
+            Ok(Some(rmp_serde::from_slice::<Manifest>(&data)?))
+        }
         Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
         Err(err) => Err(Error::CacheRead(err)),
     }

@@ -40,7 +40,6 @@ dashmap = { workspace = true }
 derivative = { workspace = true }
 fs-err = { workspace = true, features = ["tokio"] }
 futures = { workspace = true }
-http-cache-semantics = { workspace = true }
 indexmap = { workspace = true }
 itertools = { workspace = true }
 once_cell = { workspace = true }

@@ -12,7 +12,7 @@ use distribution_types::{Dist, IndexUrl, Resolution};
 use pep508_rs::{Requirement, VersionOrUrl};
 use platform_tags::Tags;
 use puffin_client::{
-    FlatDistributions, FlatIndex, RegistryClient, SimpleMetadata, SimpleMetadatum,
+    FlatDistributions, FlatIndex, OwnedArchive, RegistryClient, SimpleMetadata, SimpleMetadatum,
 };
 use puffin_interpreter::Interpreter;
 use puffin_normalize::PackageName;
@@ -67,7 +67,8 @@ impl<'a> DistFinder<'a> {
         match requirement.version_or_url.as_ref() {
             None | Some(VersionOrUrl::VersionSpecifier(_)) => {
                 // Query the index(es) (cached) to get the URLs for the available files.
-                let (index, metadata) = self.client.simple(&requirement.name).await?;
+                let (index, raw_metadata) = self.client.simple(&requirement.name).await?;
+                let metadata = OwnedArchive::deserialize(&raw_metadata);
                 // Pick a version that satisfies the requirement.
                 let Some(dist) = self.select(requirement, metadata, &index, flat_index) else {

@@ -8,7 +8,7 @@ use distribution_filename::DistFilename;
 use distribution_types::{Dist, IndexUrl, PrioritizedDistribution, ResolvableDist};
 use pep440_rs::Version;
 use platform_tags::Tags;
-use puffin_client::{FlatDistributions, SimpleMetadata, SimpleMetadatum};
+use puffin_client::{FlatDistributions, OwnedArchive, SimpleMetadata, SimpleMetadatum};
 use puffin_normalize::PackageName;
 use puffin_traits::NoBinary;
 use puffin_warnings::warn_user_once;
@@ -26,7 +26,7 @@ impl VersionMap {
     #[instrument(skip_all, fields(package_name))]
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn from_metadata(
-        metadata: SimpleMetadata,
+        raw_metadata: OwnedArchive<SimpleMetadata>,
         package_name: &PackageName,
         index: &IndexUrl,
         tags: &Tags,
@@ -36,6 +36,15 @@
         flat_index: Option<FlatDistributions>,
         no_binary: &NoBinary,
     ) -> Self {
+        // NOTE: We should experiment with refactoring the code
+        // below to work on rkyv::Archived<SimpleMetadata>. More
+        // specifically, we may want to adjust VersionMap itself to
+        // contain an Archived<SimpleMetadata> of some kind, that in
+        // turn is used in the resolver. The idea here is to avoid
+        // eagerly deserializing all of the metadata for a package
+        // up-front.
+        let metadata = OwnedArchive::deserialize(&raw_metadata);
         // If we have packages of the same name from find links, gives them priority, otherwise start empty
         let mut version_map: BTreeMap<Version, PrioritizedDistribution> =
             flat_index.map(Into::into).unwrap_or_default();
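
To make the new calling convention concrete: the call sites above that still need an unarchived `SimpleMetadata` all follow the same two-step pattern, where the client hands back a zero-copy `OwnedArchive<SimpleMetadata>` and the caller pays for deserialization explicitly. A minimal sketch of that pattern (the `fetch_metadata` helper and the `anyhow` error handling are illustrative, not code from this PR):

```
use puffin_client::{OwnedArchive, RegistryClient, SimpleMetadata};
use puffin_normalize::PackageName;

/// Illustrative helper (not part of this PR): fetch metadata via the
/// cache-aware client, then eagerly deserialize the archived form.
async fn fetch_metadata(
    client: &RegistryClient,
    package_name: &PackageName,
) -> anyhow::Result<SimpleMetadata> {
    // Zero-copy step: the cached bytes are viewed in place as an
    // Archived<SimpleMetadata>, with staleness already decided against
    // the archived CachePolicy.
    let (_index, raw) = client.simple(package_name).await?;
    // Non-zero-copy step: this is the eager deserialization that the
    // NOTE in the final hunk hopes to push down into the resolver.
    Ok(OwnedArchive::deserialize(&raw))
}
```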