Track HTTP caches for URL wheels (#1071)

## Summary

This PR ensures that we store HTTP caching information for wheels.
Previously, we only stored these for source distributions. This will be
helpful for refresh, since we can avoid re-downloading wheels that are
unchanged per HTTP caching semantics.

There should be zero performance hit here for warm installs, and only an
extremely small hit for cold installs (writing the HTTP cache data to
disk). The hyperfine benchmarks reflect this.
This commit is contained in:
Charlie Marsh 2024-01-23 17:31:42 -05:00 committed by GitHub
parent 09f5884f28
commit cf8b452414
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,23 +1,21 @@
use std::borrow::Cow;
use std::io;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use futures::FutureExt;
use futures::{FutureExt, TryStreamExt};
use thiserror::Error;
use tokio::task::JoinError;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::instrument;
use tracing::{info_span, instrument, Instrument};
use url::Url;
use distribution_filename::{WheelFilename, WheelFilenameError};
use distribution_types::{
BuiltDist, DirectGitUrl, Dist, FileLocation, LocalEditable, Name, SourceDist,
};
use platform_tags::Tags;
use puffin_cache::{Cache, CacheBucket, WheelCache};
use puffin_client::RegistryClient;
use puffin_client::{CachedClientError, RegistryClient};
use puffin_extract::unzip_no_seek;
use puffin_git::GitSource;
use puffin_traits::{BuildContext, NoBinary};
@ -33,15 +31,9 @@ pub enum DistributionDatabaseError {
#[error("Failed to parse URL: {0}")]
Url(String, #[source] url::ParseError),
#[error(transparent)]
WheelFilename(#[from] WheelFilenameError),
#[error(transparent)]
Client(#[from] puffin_client::Error),
#[error(transparent)]
Extract(#[from] puffin_extract::Error),
#[error(transparent)]
Io(#[from] io::Error),
#[error(transparent)]
Distribution(#[from] distribution_types::Error),
Request(#[from] reqwest::Error),
#[error(transparent)]
SourceBuild(#[from] SourceDistError),
#[error("Git operation failed")]
@ -146,41 +138,48 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
}
};
// Download and unzip on the same tokio task.
//
// In all wheels we've seen so far, unzipping while downloading is
// faster than downloading into a file and then unzipping on multiple
// threads.
//
// Writing to a file first may be faster if the wheel takes longer to
// unzip than it takes to download. This may happen if the wheel is a
// zip bomb, or if the machine has a weak cpu (with many cores), but a
// fast network.
//
// If we find such a case, it may make sense to create separate tasks
// for downloading and unzipping (with a buffer in between) and switch
// to rayon if this buffer grows large by the time the file is fully
// downloaded.
let reader = self.client.stream_external(&url).await?;
// Download and unzip the wheel to a temporary directory.
let temp_dir = tempfile::tempdir_in(self.cache.root())?;
unzip_no_seek(reader.compat(), temp_dir.path()).await?;
// Persist the temporary directory to the directory store.
let wheel_filename = WheelFilename::from_str(&wheel.file.filename)?;
let cache_entry = self.cache.entry(
// Create an entry for the wheel itself alongside its HTTP cache.
let wheel_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Index(&wheel.index).remote_wheel_dir(wheel_filename.name.as_ref()),
wheel_filename.stem(),
WheelCache::Index(&wheel.index).remote_wheel_dir(wheel.name().as_ref()),
wheel.filename.stem(),
);
self.cache
.persist(temp_dir.into_path(), cache_entry.path())?;
let http_entry = wheel_entry.with_file(format!("{}.http", wheel.filename.stem()));
let download = |response: reqwest::Response| {
async {
let reader = response
.bytes_stream()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.into_async_read();
// Download and unzip the wheel to a temporary directory.
let temp_dir = tempfile::tempdir_in(self.cache.root())?;
unzip_no_seek(reader.compat(), temp_dir.path()).await?;
// Persist the temporary directory to the directory store.
self.cache
.persist(temp_dir.into_path(), wheel_entry.path())?;
Ok(())
}
.instrument(info_span!("download", wheel = %wheel))
};
let req = self.client.cached_client().uncached().get(url).build()?;
self.client
.cached_client()
.get_cached_with_callback(req, &http_entry, download)
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
CachedClientError::Client(err) => SourceDistError::Client(err),
})?;
Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
target: cache_entry.into_path_buf(),
filename: wheel_filename,
target: wheel_entry.into_path_buf(),
filename: wheel.filename.clone(),
}))
}
@ -189,24 +188,52 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
return Err(DistributionDatabaseError::NoBinary);
}
let reader = self.client.stream_external(&wheel.url).await?;
// Download and unzip the wheel to a temporary directory.
let temp_dir = tempfile::tempdir_in(self.cache.root())?;
unzip_no_seek(reader.compat(), temp_dir.path()).await?;
// Persist the temporary directory to the directory store.
let cache_entry = self.cache.entry(
// Create an entry for the wheel itself alongside its HTTP cache.
let wheel_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Url(&wheel.url).remote_wheel_dir(wheel.name().as_ref()),
wheel.filename.stem(),
);
self.cache
.persist(temp_dir.into_path(), cache_entry.path())?;
let http_entry = wheel_entry.with_file(format!("{}.http", wheel.filename.stem()));
let download = |response: reqwest::Response| {
async {
let reader = response
.bytes_stream()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.into_async_read();
// Download and unzip the wheel to a temporary directory.
let temp_dir = tempfile::tempdir_in(self.cache.root())?;
unzip_no_seek(reader.compat(), temp_dir.path()).await?;
// Persist the temporary directory to the directory store.
self.cache
.persist(temp_dir.into_path(), wheel_entry.path())?;
Ok(())
}
.instrument(info_span!("download", wheel = %wheel))
};
let req = self
.client
.cached_client()
.uncached()
.get(wheel.url.raw().clone())
.build()?;
self.client
.cached_client()
.get_cached_with_callback(req, &http_entry, download)
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
CachedClientError::Client(err) => SourceDistError::Client(err),
})?;
Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
target: cache_entry.into_path_buf(),
target: wheel_entry.into_path_buf(),
filename: wheel.filename.clone(),
}))
}