Stream zip archive when fetching non-range-request metadata (#1792)

## Summary

If a registry doesn't support range requests, then today, we download
the entire wheel to disk and then read the metadata from the downloaded
archive. This PR instead modifies the registry client to stream the
zipfile and stop as soon as it's seen the metadata, which should be more
efficient.

Closes https://github.com/astral-sh/uv/issues/1596.

## Test Plan

Made this the _only_ path for downloading metadata; verified that the
test suite passed.
This commit is contained in:
Charlie Marsh 2024-02-20 22:12:21 -05:00 committed by GitHub
parent 31752bf4be
commit 5d53040465
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 102 additions and 32 deletions

View file

@ -95,6 +95,36 @@ pub enum Error {
MismatchedVersion(Version, Version), MismatchedVersion(Version, Version),
} }
/// Returns `true` if the file is a `METADATA` file in a `dist-info` directory that matches the
/// wheel filename.
pub fn is_metadata_entry(path: &str, filename: &WheelFilename) -> bool {
let Some((dist_info_dir, file)) = path.split_once('/') else {
return false;
};
if file != "METADATA" {
return false;
}
let Some(dir_stem) = dist_info_dir.strip_suffix(".dist-info") else {
return false;
};
let Some((name, version)) = dir_stem.rsplit_once('-') else {
return false;
};
let Ok(name) = PackageName::from_str(name) else {
return false;
};
if name != filename.name {
return false;
}
let Ok(version) = Version::from_str(version) else {
return false;
};
if version != filename.version {
return false;
}
true
}
/// Find the `dist-info` directory from a list of files. /// Find the `dist-info` directory from a list of files.
/// ///
/// The metadata name may be uppercase, while the wheel and dist info names are lowercase, or /// The metadata name may be uppercase, while the wheel and dist info names are lowercase, or
@ -111,16 +141,21 @@ pub fn find_dist_info<'a, T: Copy>(
let metadatas: Vec<_> = files let metadatas: Vec<_> = files
.filter_map(|(payload, path)| { .filter_map(|(payload, path)| {
let (dist_info_dir, file) = path.split_once('/')?; let (dist_info_dir, file) = path.split_once('/')?;
if file != "METADATA" {
return None;
}
let dir_stem = dist_info_dir.strip_suffix(".dist-info")?; let dir_stem = dist_info_dir.strip_suffix(".dist-info")?;
let (name, version) = dir_stem.rsplit_once('-')?; let (name, version) = dir_stem.rsplit_once('-')?;
if PackageName::from_str(name).ok()? == filename.name if PackageName::from_str(name).ok()? != filename.name {
&& Version::from_str(version).ok()? == filename.version return None;
&& file == "METADATA"
{
Some((payload, dir_stem))
} else {
None
} }
if Version::from_str(version).ok()? != filename.version {
return None;
}
Some((payload, dir_stem))
}) })
.collect(); .collect();
let (payload, dist_info_prefix) = match metadatas[..] { let (payload, dist_info_prefix) = match metadatas[..] {

View file

@ -68,6 +68,10 @@ pub enum ErrorKind {
#[error("Couldn't parse metadata of {0} from {1}")] #[error("Couldn't parse metadata of {0} from {1}")]
MetadataParseError(WheelFilename, String, #[source] Box<pypi_types::Error>), MetadataParseError(WheelFilename, String, #[source] Box<pypi_types::Error>),
/// The metadata file was not found in the wheel.
#[error("Metadata file `{0}` was not found in {1}")]
MetadataNotFound(WheelFilename, String),
/// The metadata file was not found in the registry. /// The metadata file was not found in the registry.
#[error("File `{0}` was not found in the registry at {1}.")] #[error("File `{0}` was not found in the registry at {1}.")]
FileNotFound(String, #[source] reqwest::Error), FileNotFound(String, #[source] reqwest::Error),

View file

@ -5,21 +5,19 @@ use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
use async_http_range_reader::AsyncHttpRangeReader; use async_http_range_reader::AsyncHttpRangeReader;
use async_zip::tokio::read::seek::ZipFileReader;
use futures::{FutureExt, TryStreamExt}; use futures::{FutureExt, TryStreamExt};
use reqwest::{Client, ClientBuilder, Response, StatusCode}; use reqwest::{Client, ClientBuilder, Response, StatusCode};
use reqwest_retry::policies::ExponentialBackoff; use reqwest_retry::policies::ExponentialBackoff;
use reqwest_retry::RetryTransientMiddleware; use reqwest_retry::RetryTransientMiddleware;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tempfile::tempfile_in; use tokio::io::AsyncReadExt;
use tokio::io::BufWriter;
use tokio_util::compat::FuturesAsyncReadCompatExt; use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{debug, info_span, instrument, trace, warn, Instrument}; use tracing::{debug, info_span, instrument, trace, warn, Instrument};
use url::Url; use url::Url;
use distribution_filename::{DistFilename, SourceDistFilename, WheelFilename}; use distribution_filename::{DistFilename, SourceDistFilename, WheelFilename};
use distribution_types::{BuiltDist, File, FileLocation, IndexUrl, IndexUrls, Name}; use distribution_types::{BuiltDist, File, FileLocation, IndexUrl, IndexUrls, Name};
use install_wheel_rs::find_dist_info; use install_wheel_rs::{find_dist_info, is_metadata_entry};
use pep440_rs::Version; use pep440_rs::Version;
use pypi_types::{Metadata21, SimpleJson}; use pypi_types::{Metadata21, SimpleJson};
use uv_cache::{Cache, CacheBucket, WheelCache}; use uv_cache::{Cache, CacheBucket, WheelCache};
@ -323,7 +321,8 @@ impl RegistryClient {
.await .await
.map_err(ErrorKind::Io)?; .map_err(ErrorKind::Io)?;
let reader = tokio::io::BufReader::new(file); let reader = tokio::io::BufReader::new(file);
read_metadata_async(&wheel.filename, built_dist.to_string(), reader).await? read_metadata_async_seek(&wheel.filename, built_dist.to_string(), reader)
.await?
} }
}, },
BuiltDist::DirectUrl(wheel) => { BuiltDist::DirectUrl(wheel) => {
@ -339,7 +338,7 @@ impl RegistryClient {
.await .await
.map_err(ErrorKind::Io)?; .map_err(ErrorKind::Io)?;
let reader = tokio::io::BufReader::new(file); let reader = tokio::io::BufReader::new(file);
read_metadata_async(&wheel.filename, built_dist.to_string(), reader).await? read_metadata_async_seek(&wheel.filename, built_dist.to_string(), reader).await?
} }
}; };
@ -489,20 +488,9 @@ impl RegistryClient {
} }
} }
// TODO(konstin): Download the wheel into a cache shared with the installer instead // Stream the file, searching for the METADATA.
// Note that this branch is only hit when you're not using and the server where let reader = self.stream_external(url).await?;
// you host your wheels for some reasons doesn't support range requests read_metadata_async_stream(filename, url.to_string(), reader).await
// (tbh we should probably warn here and tell users to get a better registry because
// their current one makes resolution unnecessary slow).
let temp_download = tempfile_in(self.cache.root()).map_err(ErrorKind::CacheWrite)?;
let mut writer = BufWriter::new(tokio::fs::File::from_std(temp_download));
let mut reader = self.stream_external(url).await?.compat();
tokio::io::copy(&mut reader, &mut writer)
.await
.map_err(ErrorKind::CacheWrite)?;
let reader = writer.into_inner();
read_metadata_async(filename, url.to_string(), reader).await
} }
/// Stream a file from an external URL. /// Stream a file from an external URL.
@ -526,13 +514,13 @@ impl RegistryClient {
} }
} }
/// It doesn't really fit into `uv_client`, but it avoids cyclical crate dependencies. /// Read a wheel's `METADATA` file from a zip file.
async fn read_metadata_async( async fn read_metadata_async_seek(
filename: &WheelFilename, filename: &WheelFilename,
debug_source: String, debug_source: String,
reader: impl tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin, reader: impl tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin,
) -> Result<Metadata21, Error> { ) -> Result<Metadata21, Error> {
let mut zip_reader = ZipFileReader::with_tokio(reader) let mut zip_reader = async_zip::tokio::read::seek::ZipFileReader::with_tokio(reader)
.await .await
.map_err(|err| ErrorKind::Zip(filename.clone(), err))?; .map_err(|err| ErrorKind::Zip(filename.clone(), err))?;
@ -543,11 +531,11 @@ async fn read_metadata_async(
.entries() .entries()
.iter() .iter()
.enumerate() .enumerate()
.filter_map(|(idx, e)| Some((idx, e.filename().as_str().ok()?))), .filter_map(|(index, entry)| Some((index, entry.filename().as_str().ok()?))),
) )
.map_err(ErrorKind::InstallWheel)?; .map_err(ErrorKind::InstallWheel)?;
// Read the contents of the METADATA file // Read the contents of the `METADATA` file.
let mut contents = Vec::new(); let mut contents = Vec::new();
zip_reader zip_reader
.reader_with_entry(metadata_idx) .reader_with_entry(metadata_idx)
@ -563,6 +551,49 @@ async fn read_metadata_async(
Ok(metadata) Ok(metadata)
} }
/// Like [`read_metadata_async_seek`], but doesn't use seek.
async fn read_metadata_async_stream<R: futures::AsyncRead + Unpin>(
filename: &WheelFilename,
debug_source: String,
reader: R,
) -> Result<Metadata21, Error> {
let mut zip = async_zip::base::read::stream::ZipFileReader::new(reader);
while let Some(mut entry) = zip
.next_with_entry()
.await
.map_err(|err| ErrorKind::Zip(filename.clone(), err))?
{
// Find the `METADATA` entry.
let path = entry
.reader()
.entry()
.filename()
.as_str()
.map_err(|err| ErrorKind::Zip(filename.clone(), err))?;
if is_metadata_entry(path, filename) {
let mut reader = entry.reader_mut().compat();
let mut contents = Vec::new();
reader.read_to_end(&mut contents).await.unwrap();
let metadata = Metadata21::parse(&contents).map_err(|err| {
ErrorKind::MetadataParseError(filename.clone(), debug_source, Box::new(err))
})?;
return Ok(metadata);
}
// Close current file to get access to the next one. See docs:
// https://docs.rs/async_zip/0.0.16/async_zip/base/read/stream/
zip = entry
.skip()
.await
.map_err(|err| ErrorKind::Zip(filename.clone(), err))?;
}
Err(ErrorKind::MetadataNotFound(filename.clone(), debug_source).into())
}
#[derive( #[derive(
Default, Debug, Serialize, Deserialize, rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Default, Debug, Serialize, Deserialize, rkyv::Archive, rkyv::Deserialize, rkyv::Serialize,
)] )]