Move wheel download into a shared method (#2324)

## Summary

No behavioral changes; just taking code that's duplicated between two
branches in `distribution_database.rs` and pulling it into its own
method.
This commit is contained in:
Charlie Marsh 2024-03-09 19:40:00 -08:00 committed by GitHub
parent 6866a55f20
commit 6dcd00e031
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,6 +1,6 @@
use std::borrow::Cow;
use std::io;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use futures::{FutureExt, TryStreamExt};
@ -8,14 +8,14 @@ use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{info_span, instrument, Instrument};
use url::Url;
use distribution_filename::WheelFilename;
use distribution_types::{
BuiltDist, DirectGitUrl, Dist, FileLocation, IndexLocations, LocalEditable, Name, SourceDist,
};
use platform_tags::Tags;
use pypi_types::Metadata23;
use uv_cache::{ArchiveTarget, ArchiveTimestamp, Cache, CacheBucket, WheelCache};
use uv_cache::{ArchiveTarget, ArchiveTimestamp, Cache, CacheBucket, CacheEntry, WheelCache};
use uv_client::{CacheControl, CachedClientError, Connectivity, RegistryClient};
use uv_git::GitSource;
use uv_traits::{BuildContext, NoBinary, NoBuild};
@ -150,68 +150,17 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
}
};
// Create an entry for the wheel itself alongside its HTTP cache.
// Create a cache entry for the wheel.
let wheel_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Index(&wheel.index).remote_wheel_dir(wheel.name().as_ref()),
wheel.filename.stem(),
);
let http_entry = wheel_entry.with_file(format!("{}.http", wheel.filename.stem()));
let download = |response: reqwest::Response| {
async {
let reader = response
.bytes_stream()
.map_err(|err| self.handle_response_errors(err))
.into_async_read();
// Download and unzip the wheel to a temporary directory.
let temp_dir =
tempfile::tempdir_in(self.cache.root()).map_err(Error::CacheWrite)?;
uv_extract::stream::unzip(reader.compat(), temp_dir.path()).await?;
// Persist the temporary directory to the directory store.
let archive = self
.cache
.persist(temp_dir.into_path(), wheel_entry.path())
.map_err(Error::CacheRead)?;
Ok(archive)
}
.instrument(info_span!("download", wheel = %wheel))
};
let req = self
.client
.cached_client()
.uncached()
.get(url)
.header(
// `reqwest` defaults to accepting compressed responses.
// Specify identity encoding to get consistent .whl downloading
// behavior from servers. ref: https://github.com/pypa/pip/pull/1688
"accept-encoding",
reqwest::header::HeaderValue::from_static("identity"),
)
.build()?;
let cache_control = match self.client.connectivity() {
Connectivity::Online => CacheControl::from(
self.cache
.freshness(&http_entry, Some(wheel.name()))
.map_err(Error::CacheRead)?,
),
Connectivity::Offline => CacheControl::AllowStale,
};
// Download and unzip.
let archive = self
.client
.cached_client()
.get_serde(req, &http_entry, cache_control, download)
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
CachedClientError::Client(err) => Error::Client(err),
})?;
.stream_wheel(url.clone(), &wheel.filename, &wheel_entry, &dist)
.await?;
Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
archive,
@ -224,67 +173,22 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
return Err(Error::NoBinary);
}
// Create an entry for the wheel itself alongside its HTTP cache.
// Create a cache entry for the wheel.
let wheel_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Url(&wheel.url).remote_wheel_dir(wheel.name().as_ref()),
wheel.filename.stem(),
);
let http_entry = wheel_entry.with_file(format!("{}.http", wheel.filename.stem()));
let download = |response: reqwest::Response| {
async {
let reader = response
.bytes_stream()
.map_err(|err| self.handle_response_errors(err))
.into_async_read();
// Download and unzip the wheel to a temporary directory.
let temp_dir =
tempfile::tempdir_in(self.cache.root()).map_err(Error::CacheWrite)?;
uv_extract::stream::unzip(reader.compat(), temp_dir.path()).await?;
// Persist the temporary directory to the directory store.
let archive = self
.cache
.persist(temp_dir.into_path(), wheel_entry.path())
.map_err(Error::CacheRead)?;
Ok(archive)
}
.instrument(info_span!("download", wheel = %wheel))
};
let req = self
.client
.cached_client()
.uncached()
.get(wheel.url.raw().clone())
.header(
// `reqwest` defaults to accepting compressed responses.
// Specify identity encoding to get consistent .whl downloading
// behavior from servers. ref: https://github.com/pypa/pip/pull/1688
"accept-encoding",
reqwest::header::HeaderValue::from_static("identity"),
)
.build()?;
let cache_control = match self.client.connectivity() {
Connectivity::Online => CacheControl::from(
self.cache
.freshness(&http_entry, Some(wheel.name()))
.map_err(Error::CacheRead)?,
),
Connectivity::Offline => CacheControl::AllowStale,
};
// Download and unzip.
let archive = self
.client
.cached_client()
.get_serde(req, &http_entry, cache_control, download)
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
CachedClientError::Client(err) => Error::Client(err),
})?;
.stream_wheel(
wheel.url.raw().clone(),
&wheel.filename,
&wheel_entry,
&dist,
)
.await?;
Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
archive,
@ -465,6 +369,75 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
Ok(Some(Url::from(DirectGitUrl { url, subdirectory })))
}
/// Stream a wheel from a URL, unzipping it into the cache as it's downloaded.
async fn stream_wheel(
&self,
url: Url,
filename: &WheelFilename,
wheel_entry: &CacheEntry,
dist: &Dist,
) -> Result<PathBuf, Error> {
// Create an entry for the HTTP cache.
let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem()));
let download = |response: reqwest::Response| {
async {
let reader = response
.bytes_stream()
.map_err(|err| self.handle_response_errors(err))
.into_async_read();
// Download and unzip the wheel to a temporary directory.
let temp_dir =
tempfile::tempdir_in(self.cache.root()).map_err(Error::CacheWrite)?;
uv_extract::stream::unzip(reader.compat(), temp_dir.path()).await?;
// Persist the temporary directory to the directory store.
let archive = self
.cache
.persist(temp_dir.into_path(), wheel_entry.path())
.map_err(Error::CacheRead)?;
Ok(archive)
}
.instrument(info_span!("wheel", wheel = %dist))
};
let req = self
.client
.cached_client()
.uncached()
.get(url)
.header(
// `reqwest` defaults to accepting compressed responses.
// Specify identity encoding to get consistent .whl downloading
// behavior from servers. ref: https://github.com/pypa/pip/pull/1688
"accept-encoding",
reqwest::header::HeaderValue::from_static("identity"),
)
.build()?;
let cache_control = match self.client.connectivity() {
Connectivity::Online => CacheControl::from(
self.cache
.freshness(&http_entry, Some(&filename.name))
.map_err(Error::CacheRead)?,
),
Connectivity::Offline => CacheControl::AllowStale,
};
let archive = self
.client
.cached_client()
.get_serde(req, &http_entry, cache_control, download)
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
CachedClientError::Client(err) => Error::Client(err),
})?;
Ok(archive)
}
/// Return the [`IndexLocations`] used by this resolver.
///
/// This is a thin accessor: the value is borrowed from the underlying build context
/// (`self.build_context`), so no copy is made.
pub fn index_locations(&self) -> &IndexLocations {
self.build_context.index_locations()
}