Add Seek fallback for zip files (#2320)

## Summary

Some zip files can't be streamed; in particular, `rs-async-zip` doesn't
support data descriptors right now (though it may in the future). This
PR adds a fallback path for such zips that downloads the entire zip file
to disk, then unzips it from disk (which gives us `Seek`).

Closes https://github.com/astral-sh/uv/issues/2216.

## Test Plan

`cargo run pip install --extra-index-url https://buf.build/gen/python
hashb_foxglove_protocolbuffers_python==25.3.0.1.20240226043130+465630478360
--force-reinstall -n`
This commit is contained in:
Charlie Marsh 2024-03-10 08:39:28 -07:00 committed by GitHub
parent 67fb023f10
commit a267a501b6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 591 additions and 160 deletions

View file

@ -4,8 +4,9 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use futures::{FutureExt, TryStreamExt};
use tokio::io::AsyncSeekExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{info_span, instrument, Instrument};
use tracing::{info_span, instrument, warn, Instrument};
use url::Url;
use distribution_filename::WheelFilename;
@ -158,14 +159,33 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
);
// Download and unzip.
let archive = self
match self
.stream_wheel(url.clone(), &wheel.filename, &wheel_entry, &dist)
.await?;
Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
archive,
filename: wheel.filename.clone(),
}))
.await
{
Ok(archive) => Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
archive,
filename: wheel.filename.clone(),
})),
Err(Error::Extract(err)) if err.is_http_streaming_unsupported() => {
warn!(
"Streaming unsupported for {dist}; downloading wheel to disk ({err})"
);
// If the request failed because streaming is unsupported, download the
// wheel directly.
let archive = self
.download_wheel(url, &wheel.filename, &wheel_entry, &dist)
.await?;
Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
archive,
filename: wheel.filename.clone(),
}))
}
Err(err) => Err(err),
}
}
Dist::Built(BuiltDist::DirectUrl(wheel)) => {
@ -181,19 +201,43 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
);
// Download and unzip.
let archive = self
match self
.stream_wheel(
wheel.url.raw().clone(),
&wheel.filename,
&wheel_entry,
&dist,
)
.await?;
Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
archive,
filename: wheel.filename.clone(),
}))
.await
{
Ok(archive) => Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
archive,
filename: wheel.filename.clone(),
})),
Err(Error::Client(err)) if err.is_http_streaming_unsupported() => {
warn!(
"Streaming unsupported for {dist}; downloading wheel to disk ({err})"
);
// If the request failed because streaming is unsupported, download the
// wheel directly.
let archive = self
.download_wheel(
wheel.url.raw().clone(),
&wheel.filename,
&wheel_entry,
&dist,
)
.await?;
Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
archive,
filename: wheel.filename.clone(),
}))
}
Err(err) => Err(err),
}
}
Dist::Built(BuiltDist::Path(wheel)) => {
@ -277,7 +321,18 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
) -> Result<(Metadata23, Option<Url>), Error> {
match dist {
Dist::Built(built_dist) => {
Ok((self.client.wheel_metadata(built_dist).boxed().await?, None))
match self.client.wheel_metadata(built_dist).boxed().await {
Ok(metadata) => Ok((metadata, None)),
Err(err) if err.is_http_streaming_unsupported() => {
warn!("Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})");
// If the request failed due to an error that could be resolved by
// downloading the wheel directly, try that.
let wheel = self.get_or_build_wheel(dist.clone()).await?;
Ok((wheel.metadata()?, None))
}
Err(err) => Err(err.into()),
}
}
Dist::Source(source_dist) => {
let no_build = match self.build_context.no_build() {
@ -437,6 +492,87 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
Ok(archive)
}
/// Download a wheel from a URL, then unzip it into the cache.
async fn download_wheel(
&self,
url: Url,
filename: &WheelFilename,
wheel_entry: &CacheEntry,
dist: &Dist,
) -> Result<PathBuf, Error> {
// Create an entry for the HTTP cache.
let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem()));
let download = |response: reqwest::Response| {
async {
let reader = response
.bytes_stream()
.map_err(|err| self.handle_response_errors(err))
.into_async_read();
// Download the wheel to a temporary file.
let temp_file =
tempfile::tempfile_in(self.cache.root()).map_err(Error::CacheWrite)?;
let mut writer = tokio::io::BufWriter::new(tokio::fs::File::from_std(temp_file));
tokio::io::copy(&mut reader.compat(), &mut writer)
.await
.map_err(Error::CacheWrite)?;
// Unzip the wheel to a temporary directory.
let temp_dir =
tempfile::tempdir_in(self.cache.root()).map_err(Error::CacheWrite)?;
let mut file = writer.into_inner();
file.seek(io::SeekFrom::Start(0))
.await
.map_err(Error::CacheWrite)?;
let reader = tokio::io::BufReader::new(file);
uv_extract::seek::unzip(reader, temp_dir.path()).await?;
// Persist the temporary directory to the directory store.
let archive = self
.cache
.persist(temp_dir.into_path(), wheel_entry.path())
.map_err(Error::CacheRead)?;
Ok(archive)
}
.instrument(info_span!("wheel", wheel = %dist))
};
let req = self
.client
.cached_client()
.uncached()
.get(url)
.header(
// `reqwest` defaults to accepting compressed responses.
// Specify identity encoding to get consistent .whl downloading
// behavior from servers. ref: https://github.com/pypa/pip/pull/1688
"accept-encoding",
reqwest::header::HeaderValue::from_static("identity"),
)
.build()?;
let cache_control = match self.client.connectivity() {
Connectivity::Online => CacheControl::from(
self.cache
.freshness(&http_entry, Some(&filename.name))
.map_err(Error::CacheRead)?,
),
Connectivity::Offline => CacheControl::AllowStale,
};
let archive = self
.client
.cached_client()
.get_serde(req, &http_entry, cache_control, download)
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
CachedClientError::Client(err) => Error::Client(err),
})?;
Ok(archive)
}
/// Return the [`IndexLocations`] used by this resolver.
pub fn index_locations(&self) -> &IndexLocations {
self.build_context.index_locations()