From cf8b452414c5c4708b3676dec642a7df2d67bde5 Mon Sep 17 00:00:00 2001 From: Charlie Marsh Date: Tue, 23 Jan 2024 17:31:42 -0500 Subject: [PATCH] Track HTTP caches for URL wheels (#1071) ## Summary This PR ensures that we store HTTP caching information for wheels. Previously, we only stored it for source distributions. This will be helpful for refresh, since we can avoid re-downloading wheels that are unchanged per HTTP caching semantics. There should be zero performance hit here for warm installs, and only an extremely small hit for cold installs (writing the HTTP cache data to disk). The hyperfine benchmarks reflect this. --- .../src/distribution_database.rs | 133 +++++++++++------- 1 file changed, 80 insertions(+), 53 deletions(-) diff --git a/crates/puffin-distribution/src/distribution_database.rs b/crates/puffin-distribution/src/distribution_database.rs index f761404ef..9ff6e902b 100644 --- a/crates/puffin-distribution/src/distribution_database.rs +++ b/crates/puffin-distribution/src/distribution_database.rs @@ -1,23 +1,21 @@ use std::borrow::Cow; use std::io; use std::path::Path; -use std::str::FromStr; use std::sync::Arc; -use futures::FutureExt; +use futures::{FutureExt, TryStreamExt}; use thiserror::Error; use tokio::task::JoinError; use tokio_util::compat::FuturesAsyncReadCompatExt; -use tracing::instrument; +use tracing::{info_span, instrument, Instrument}; use url::Url; -use distribution_filename::{WheelFilename, WheelFilenameError}; use distribution_types::{ BuiltDist, DirectGitUrl, Dist, FileLocation, LocalEditable, Name, SourceDist, }; use platform_tags::Tags; use puffin_cache::{Cache, CacheBucket, WheelCache}; -use puffin_client::RegistryClient; +use puffin_client::{CachedClientError, RegistryClient}; use puffin_extract::unzip_no_seek; use puffin_git::GitSource; use puffin_traits::{BuildContext, NoBinary}; @@ -33,15 +31,9 @@ pub enum DistributionDatabaseError { #[error("Failed to parse URL: {0}")] Url(String, #[source] url::ParseError), 
#[error(transparent)] - WheelFilename(#[from] WheelFilenameError), - #[error(transparent)] Client(#[from] puffin_client::Error), #[error(transparent)] - Extract(#[from] puffin_extract::Error), - #[error(transparent)] - Io(#[from] io::Error), - #[error(transparent)] - Distribution(#[from] distribution_types::Error), + Request(#[from] reqwest::Error), #[error(transparent)] SourceBuild(#[from] SourceDistError), #[error("Git operation failed")] @@ -146,41 +138,48 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> } }; - // Download and unzip on the same tokio task. - // - // In all wheels we've seen so far, unzipping while downloading is - // faster than downloading into a file and then unzipping on multiple - // threads. - // - // Writing to a file first may be faster if the wheel takes longer to - // unzip than it takes to download. This may happen if the wheel is a - // zip bomb, or if the machine has a weak cpu (with many cores), but a - // fast network. - // - // If we find such a case, it may make sense to create separate tasks - // for downloading and unzipping (with a buffer in between) and switch - // to rayon if this buffer grows large by the time the file is fully - // downloaded. - let reader = self.client.stream_external(&url).await?; - - // Download and unzip the wheel to a temporary directory. - let temp_dir = tempfile::tempdir_in(self.cache.root())?; - unzip_no_seek(reader.compat(), temp_dir.path()).await?; - - // Persist the temporary directory to the directory store. - let wheel_filename = WheelFilename::from_str(&wheel.file.filename)?; - let cache_entry = self.cache.entry( + // Create an entry for the wheel itself alongside its HTTP cache. 
+ let wheel_entry = self.cache.entry( CacheBucket::Wheels, - WheelCache::Index(&wheel.index).remote_wheel_dir(wheel_filename.name.as_ref()), - wheel_filename.stem(), + WheelCache::Index(&wheel.index).remote_wheel_dir(wheel.name().as_ref()), + wheel.filename.stem(), ); - self.cache - .persist(temp_dir.into_path(), cache_entry.path())?; + let http_entry = wheel_entry.with_file(format!("{}.http", wheel.filename.stem())); + + let download = |response: reqwest::Response| { + async { + let reader = response + .bytes_stream() + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + .into_async_read(); + + // Download and unzip the wheel to a temporary directory. + let temp_dir = tempfile::tempdir_in(self.cache.root())?; + unzip_no_seek(reader.compat(), temp_dir.path()).await?; + + // Persist the temporary directory to the directory store. + self.cache + .persist(temp_dir.into_path(), wheel_entry.path())?; + + Ok(()) + } + .instrument(info_span!("download", wheel = %wheel)) + }; + + let req = self.client.cached_client().uncached().get(url).build()?; + self.client + .cached_client() + .get_cached_with_callback(req, &http_entry, download) + .await + .map_err(|err| match err { + CachedClientError::Callback(err) => err, + CachedClientError::Client(err) => SourceDistError::Client(err), + })?; Ok(LocalWheel::Unzipped(UnzippedWheel { dist: dist.clone(), - target: cache_entry.into_path_buf(), - filename: wheel_filename, + target: wheel_entry.into_path_buf(), + filename: wheel.filename.clone(), })) } @@ -189,24 +188,52 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> return Err(DistributionDatabaseError::NoBinary); } - let reader = self.client.stream_external(&wheel.url).await?; - - // Download and unzip the wheel to a temporary directory. - let temp_dir = tempfile::tempdir_in(self.cache.root())?; - unzip_no_seek(reader.compat(), temp_dir.path()).await?; - - // Persist the temporary directory to the directory store. 
- let cache_entry = self.cache.entry( + // Create an entry for the wheel itself alongside its HTTP cache. + let wheel_entry = self.cache.entry( CacheBucket::Wheels, WheelCache::Url(&wheel.url).remote_wheel_dir(wheel.name().as_ref()), wheel.filename.stem(), ); - self.cache - .persist(temp_dir.into_path(), cache_entry.path())?; + let http_entry = wheel_entry.with_file(format!("{}.http", wheel.filename.stem())); + + let download = |response: reqwest::Response| { + async { + let reader = response + .bytes_stream() + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + .into_async_read(); + + // Download and unzip the wheel to a temporary directory. + let temp_dir = tempfile::tempdir_in(self.cache.root())?; + unzip_no_seek(reader.compat(), temp_dir.path()).await?; + + // Persist the temporary directory to the directory store. + self.cache + .persist(temp_dir.into_path(), wheel_entry.path())?; + + Ok(()) + } + .instrument(info_span!("download", wheel = %wheel)) + }; + + let req = self + .client + .cached_client() + .uncached() + .get(wheel.url.raw().clone()) + .build()?; + self.client + .cached_client() + .get_cached_with_callback(req, &http_entry, download) + .await + .map_err(|err| match err { + CachedClientError::Callback(err) => err, + CachedClientError::Client(err) => SourceDistError::Client(err), + })?; Ok(LocalWheel::Unzipped(UnzippedWheel { dist: dist.clone(), - target: cache_entry.into_path_buf(), + target: wheel_entry.into_path_buf(), filename: wheel.filename.clone(), })) }