From b6ab919945bea94210d07de92bcb17c3fd344523 Mon Sep 17 00:00:00 2001 From: Charlie Marsh Date: Wed, 27 Mar 2024 20:06:25 -0400 Subject: [PATCH] Make tags non-required for fetching wheel metadata (#2700) ## Summary This looks like a big change but it really isn't. Rather, I just split `get_or_build_wheel` into separate `get_wheel` and `build_wheel` methods internally, which made `get_or_build_wheel_metadata` capable of _not_ relying on `Tags`, which in turn makes it easier for us to use the `DistributionDatabase` in various places without having it coupled to an interpreter or environment (something we already did for `SourceDistributionBuilder`). --- .../src/distribution_database.rs | 449 +++++++++--------- crates/uv-installer/src/downloader.rs | 15 +- crates/uv-resolver/src/resolver/mod.rs | 2 +- 3 files changed, 230 insertions(+), 236 deletions(-) diff --git a/crates/uv-distribution/src/distribution_database.rs b/crates/uv-distribution/src/distribution_database.rs index 6f9d1be0b..8d92e8db2 100644 --- a/crates/uv-distribution/src/distribution_database.rs +++ b/crates/uv-distribution/src/distribution_database.rs @@ -11,7 +11,7 @@ use url::Url; use distribution_filename::WheelFilename; use distribution_types::{ - BuildableSource, BuiltDist, Dist, FileLocation, IndexLocations, LocalEditable, Name, + BuildableSource, BuiltDist, Dist, FileLocation, IndexLocations, LocalEditable, Name, SourceDist, }; use platform_tags::Tags; use pypi_types::Metadata23; @@ -40,24 +40,17 @@ pub struct DistributionDatabase<'a, Context: BuildContext + Send + Sync> { cache: &'a Cache, reporter: Option>, locks: Arc, - tags: &'a Tags, client: &'a RegistryClient, build_context: &'a Context, builder: SourceDistCachedBuilder<'a, Context>, } impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> { - pub fn new( - cache: &'a Cache, - tags: &'a Tags, - client: &'a RegistryClient, - build_context: &'a Context, - ) -> Self { + pub fn new(cache: &'a Cache, client: &'a RegistryClient, build_context: &'a Context) -> Self { Self { cache, reporter: None, locks: Arc::new(Locks::default()), - tags, client, build_context, builder: SourceDistCachedBuilder::new(build_context, client), @@ -94,222 +87,10 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> /// If `no_remote_wheel` is set, the wheel will be built from a source distribution /// even if compatible pre-built wheels are available. #[instrument(skip(self))] - pub async fn get_or_build_wheel(&self, dist: Dist) -> Result { - let no_binary = match self.build_context.no_binary() { - NoBinary::None => false, - NoBinary::All => true, - NoBinary::Packages(packages) => packages.contains(dist.name()), - }; - match &dist { - Dist::Built(BuiltDist::Registry(wheel)) => { - if no_binary { - return Err(Error::NoBinary); - } - - let url = match &wheel.file.url { - FileLocation::RelativeUrl(base, url) => { - pypi_types::base_url_join_relative(base, url)? - } - FileLocation::AbsoluteUrl(url) => { - Url::parse(url).map_err(|err| Error::Url(url.clone(), err))? - } - FileLocation::Path(path) => { - let url = Url::from_file_path(path).expect("path is absolute"); - let cache_entry = self.cache.entry( - CacheBucket::Wheels, - WheelCache::Url(&url).wheel_dir(wheel.name().as_ref()), - wheel.filename.stem(), - ); - - // If the file is already unzipped, and the unzipped directory is fresh, - // return it. - match cache_entry.path().canonicalize() { - Ok(archive) => { - if ArchiveTimestamp::up_to_date_with( - path, - ArchiveTarget::Cache(&archive), - ) - .map_err(Error::CacheRead)? - { - return Ok(LocalWheel::Unzipped(UnzippedWheel { - dist: dist.clone(), - archive, - filename: wheel.filename.clone(), - })); - } - } - Err(err) if err.kind() == io::ErrorKind::NotFound => {} - Err(err) => return Err(Error::CacheRead(err)), - } - - // Otherwise, unzip the file. - return Ok(LocalWheel::Disk(DiskWheel { - dist: dist.clone(), - path: path.clone(), - target: cache_entry.into_path_buf(), - filename: wheel.filename.clone(), - })); - } - }; - - // Create a cache entry for the wheel. - let wheel_entry = self.cache.entry( - CacheBucket::Wheels, - WheelCache::Index(&wheel.index).wheel_dir(wheel.name().as_ref()), - wheel.filename.stem(), - ); - - // Download and unzip. - match self - .stream_wheel(url.clone(), &wheel.filename, &wheel_entry, &dist) - .await - { - Ok(archive) => Ok(LocalWheel::Unzipped(UnzippedWheel { - dist: dist.clone(), - archive, - filename: wheel.filename.clone(), - })), - Err(Error::Extract(err)) if err.is_http_streaming_unsupported() => { - warn!( - "Streaming unsupported for {dist}; downloading wheel to disk ({err})" - ); - - // If the request failed because streaming is unsupported, download the - // wheel directly. - let archive = self - .download_wheel(url, &wheel.filename, &wheel_entry, &dist) - .await?; - Ok(LocalWheel::Unzipped(UnzippedWheel { - dist: dist.clone(), - archive, - filename: wheel.filename.clone(), - })) - } - Err(err) => Err(err), - } - } - - Dist::Built(BuiltDist::DirectUrl(wheel)) => { - if no_binary { - return Err(Error::NoBinary); - } - - // Create a cache entry for the wheel. - let wheel_entry = self.cache.entry( - CacheBucket::Wheels, - WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()), - wheel.filename.stem(), - ); - - // Download and unzip. - match self - .stream_wheel( - wheel.url.raw().clone(), - &wheel.filename, - &wheel_entry, - &dist, - ) - .await - { - Ok(archive) => Ok(LocalWheel::Unzipped(UnzippedWheel { - dist: dist.clone(), - archive, - filename: wheel.filename.clone(), - })), - Err(Error::Client(err)) if err.is_http_streaming_unsupported() => { - warn!( - "Streaming unsupported for {dist}; downloading wheel to disk ({err})" - ); - - // If the request failed because streaming is unsupported, download the - // wheel directly. - let archive = self - .download_wheel( - wheel.url.raw().clone(), - &wheel.filename, - &wheel_entry, - &dist, - ) - .await?; - Ok(LocalWheel::Unzipped(UnzippedWheel { - dist: dist.clone(), - archive, - filename: wheel.filename.clone(), - })) - } - Err(err) => Err(err), - } - } - - Dist::Built(BuiltDist::Path(wheel)) => { - if no_binary { - return Err(Error::NoBinary); - } - - let cache_entry = self.cache.entry( - CacheBucket::Wheels, - WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()), - wheel.filename.stem(), - ); - - // If the file is already unzipped, and the unzipped directory is fresh, - // return it. - match cache_entry.path().canonicalize() { - Ok(archive) => { - if ArchiveTimestamp::up_to_date_with( - &wheel.path, - ArchiveTarget::Cache(&archive), - ) - .map_err(Error::CacheRead)? - { - return Ok(LocalWheel::Unzipped(UnzippedWheel { - dist: dist.clone(), - archive, - filename: wheel.filename.clone(), - })); - } - } - Err(err) if err.kind() == io::ErrorKind::NotFound => {} - Err(err) => return Err(Error::CacheRead(err)), - } - - Ok(LocalWheel::Disk(DiskWheel { - dist: dist.clone(), - path: wheel.path.clone(), - target: cache_entry.into_path_buf(), - filename: wheel.filename.clone(), - })) - } - - Dist::Source(source_dist) => { - let lock = self.locks.acquire(&dist).await; - let _guard = lock.lock().await; - - let built_wheel = self - .builder - .download_and_build(&BuildableSource::Dist(source_dist), self.tags) - .boxed() - .await?; - - // If the wheel was unzipped previously, respect it. Source distributions are - // cached under a unique build ID, so unzipped directories are never stale. - match built_wheel.target.canonicalize() { - Ok(archive) => Ok(LocalWheel::Unzipped(UnzippedWheel { - dist: dist.clone(), - archive, - filename: built_wheel.filename, - })), - Err(err) if err.kind() == io::ErrorKind::NotFound => { - Ok(LocalWheel::Built(BuiltWheel { - dist: dist.clone(), - path: built_wheel.path, - target: built_wheel.target, - filename: built_wheel.filename, - })) - } - Err(err) => return Err(Error::CacheRead(err)), - } - } + pub async fn get_or_build_wheel(&self, dist: &Dist, tags: &Tags) -> Result { + match dist { + Dist::Built(built) => self.get_wheel(built).await, + Dist::Source(source) => self.build_wheel(source, tags).await, } } @@ -329,11 +110,11 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> match self.client.wheel_metadata(built_dist).boxed().await { Ok(metadata) => Ok((metadata, None)), Err(err) if err.is_http_streaming_unsupported() => { - warn!("Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})"); + warn!("Streaming unsupported when fetching metadata for {built_dist}; downloading wheel directly ({err})"); // If the request failed due to an error that could be resolved by // downloading the wheel directly, try that. - let wheel = self.get_or_build_wheel(dist.clone()).await?; + let wheel = self.get_wheel(built_dist).await?; Ok((wheel.metadata()?, None)) } Err(err) => Err(err.into()), @@ -345,6 +126,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> NoBuild::None => false, NoBuild::Packages(packages) => packages.contains(source_dist.name()), }; + // Optimization: Skip source dist download when we must not build them anyway. if no_build { return Err(Error::NoBuild); @@ -396,13 +178,222 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> Ok((LocalWheel::Built(built_wheel), metadata)) } + /// Fetch a wheel from the cache or download it from the index. + async fn get_wheel(&self, dist: &BuiltDist) -> Result { + let no_binary = match self.build_context.no_binary() { + NoBinary::None => false, + NoBinary::All => true, + NoBinary::Packages(packages) => packages.contains(dist.name()), + }; + if no_binary { + return Err(Error::NoBinary); + } + + match dist { + BuiltDist::Registry(wheel) => { + let url = match &wheel.file.url { + FileLocation::RelativeUrl(base, url) => { + pypi_types::base_url_join_relative(base, url)? + } + FileLocation::AbsoluteUrl(url) => { + Url::parse(url).map_err(|err| Error::Url(url.clone(), err))? + } + FileLocation::Path(path) => { + let url = Url::from_file_path(path).expect("path is absolute"); + let cache_entry = self.cache.entry( + CacheBucket::Wheels, + WheelCache::Url(&url).wheel_dir(wheel.name().as_ref()), + wheel.filename.stem(), + ); + + // If the file is already unzipped, and the unzipped directory is fresh, + // return it. + match cache_entry.path().canonicalize() { + Ok(archive) => { + if ArchiveTimestamp::up_to_date_with( + path, + ArchiveTarget::Cache(&archive), + ) + .map_err(Error::CacheRead)? + { + return Ok(LocalWheel::Unzipped(UnzippedWheel { + dist: Dist::Built(dist.clone()), + archive, + filename: wheel.filename.clone(), + })); + } + } + Err(err) if err.kind() == io::ErrorKind::NotFound => {} + Err(err) => return Err(Error::CacheRead(err)), + } + + // Otherwise, unzip the file. + return Ok(LocalWheel::Disk(DiskWheel { + dist: Dist::Built(dist.clone()), + path: path.clone(), + target: cache_entry.into_path_buf(), + filename: wheel.filename.clone(), + })); + } + }; + + // Create a cache entry for the wheel. + let wheel_entry = self.cache.entry( + CacheBucket::Wheels, + WheelCache::Index(&wheel.index).wheel_dir(wheel.name().as_ref()), + wheel.filename.stem(), + ); + + // Download and unzip. + match self + .stream_wheel(url.clone(), &wheel.filename, &wheel_entry, dist) + .await + { + Ok(archive) => Ok(LocalWheel::Unzipped(UnzippedWheel { + dist: Dist::Built(dist.clone()), + archive, + filename: wheel.filename.clone(), + })), + Err(Error::Extract(err)) if err.is_http_streaming_unsupported() => { + warn!( + "Streaming unsupported for {dist}; downloading wheel to disk ({err})" + ); + + // If the request failed because streaming is unsupported, download the + // wheel directly. + let archive = self + .download_wheel(url, &wheel.filename, &wheel_entry, dist) + .await?; + Ok(LocalWheel::Unzipped(UnzippedWheel { + dist: Dist::Built(dist.clone()), + archive, + filename: wheel.filename.clone(), + })) + } + Err(err) => Err(err), + } + } + + BuiltDist::DirectUrl(wheel) => { + // Create a cache entry for the wheel. + let wheel_entry = self.cache.entry( + CacheBucket::Wheels, + WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()), + wheel.filename.stem(), + ); + + // Download and unzip. + match self + .stream_wheel(wheel.url.raw().clone(), &wheel.filename, &wheel_entry, dist) + .await + { + Ok(archive) => Ok(LocalWheel::Unzipped(UnzippedWheel { + dist: Dist::Built(dist.clone()), + archive, + filename: wheel.filename.clone(), + })), + Err(Error::Client(err)) if err.is_http_streaming_unsupported() => { + warn!( + "Streaming unsupported for {dist}; downloading wheel to disk ({err})" + ); + + // If the request failed because streaming is unsupported, download the + // wheel directly. + let archive = self + .download_wheel( + wheel.url.raw().clone(), + &wheel.filename, + &wheel_entry, + dist, + ) + .await?; + Ok(LocalWheel::Unzipped(UnzippedWheel { + dist: Dist::Built(dist.clone()), + archive, + filename: wheel.filename.clone(), + })) + } + Err(err) => Err(err), + } + } + + BuiltDist::Path(wheel) => { + let cache_entry = self.cache.entry( + CacheBucket::Wheels, + WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()), + wheel.filename.stem(), + ); + + // If the file is already unzipped, and the unzipped directory is fresh, + // return it. + match cache_entry.path().canonicalize() { + Ok(archive) => { + if ArchiveTimestamp::up_to_date_with( + &wheel.path, + ArchiveTarget::Cache(&archive), + ) + .map_err(Error::CacheRead)? + { + return Ok(LocalWheel::Unzipped(UnzippedWheel { + dist: Dist::Built(dist.clone()), + archive, + filename: wheel.filename.clone(), + })); + } + } + Err(err) if err.kind() == io::ErrorKind::NotFound => {} + Err(err) => return Err(Error::CacheRead(err)), + } + + Ok(LocalWheel::Disk(DiskWheel { + dist: Dist::Built(dist.clone()), + path: wheel.path.clone(), + target: cache_entry.into_path_buf(), + filename: wheel.filename.clone(), + })) + } + } + } + + /// Convert a source distribution into a wheel, fetching it from the cache or building it if + /// necessary. + async fn build_wheel(&self, dist: &SourceDist, tags: &Tags) -> Result { + let lock = self.locks.acquire(&Dist::Source(dist.clone())).await; + let _guard = lock.lock().await; + + let built_wheel = self + .builder + .download_and_build(&BuildableSource::Dist(dist), tags) + .boxed() + .await?; + + // If the wheel was unzipped previously, respect it. Source distributions are + // cached under a unique build ID, so unzipped directories are never stale. + match built_wheel.target.canonicalize() { + Ok(archive) => Ok(LocalWheel::Unzipped(UnzippedWheel { + dist: Dist::Source(dist.clone()), + archive, + filename: built_wheel.filename, + })), + Err(err) if err.kind() == io::ErrorKind::NotFound => { + Ok(LocalWheel::Built(BuiltWheel { + dist: Dist::Source(dist.clone()), + path: built_wheel.path, + target: built_wheel.target, + filename: built_wheel.filename, + })) + } + Err(err) => Err(Error::CacheRead(err)), + } + } + /// Stream a wheel from a URL, unzipping it into the cache as it's downloaded. async fn stream_wheel( &self, url: Url, filename: &WheelFilename, wheel_entry: &CacheEntry, - dist: &Dist, + dist: &BuiltDist, ) -> Result { // Create an entry for the HTTP cache. let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem())); @@ -470,7 +461,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> url: Url, filename: &WheelFilename, wheel_entry: &CacheEntry, - dist: &Dist, + dist: &BuiltDist, ) -> Result { // Create an entry for the HTTP cache. let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem())); diff --git a/crates/uv-installer/src/downloader.rs b/crates/uv-installer/src/downloader.rs index 8e140d241..7ee10205e 100644 --- a/crates/uv-installer/src/downloader.rs +++ b/crates/uv-installer/src/downloader.rs @@ -38,8 +38,9 @@ pub enum Error { /// Download, build, and unzip a set of distributions. pub struct Downloader<'a, Context: BuildContext + Send + Sync> { - database: DistributionDatabase<'a, Context>, + tags: &'a Tags, cache: &'a Cache, + database: DistributionDatabase<'a, Context>, reporter: Option>, } @@ -51,9 +52,10 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { build_context: &'a Context, ) -> Self { Self { - database: DistributionDatabase::new(cache, tags, client, build_context), - reporter: None, + tags, cache, + database: DistributionDatabase::new(cache, client, build_context), + reporter: None, } } @@ -62,9 +64,10 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { pub fn with_reporter(self, reporter: impl Reporter + 'static) -> Self { let reporter: Arc = Arc::new(reporter); Self { - reporter: Some(reporter.clone()), - database: self.database.with_reporter(Facade::from(reporter.clone())), + tags: self.tags, cache: self.cache, + database: self.database.with_reporter(Facade::from(reporter.clone())), + reporter: Some(reporter.clone()), } } @@ -165,7 +168,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { if in_flight.downloads.register(id.clone()) { let download: LocalWheel = self .database - .get_or_build_wheel(dist.clone()) + .get_or_build_wheel(&dist, self.tags) .boxed() .map_err(|err| Error::Fetch(dist.clone(), err)) .await?; diff --git a/crates/uv-resolver/src/resolver/mod.rs b/crates/uv-resolver/src/resolver/mod.rs index 229ef6489..e7d717eab 100644 --- a/crates/uv-resolver/src/resolver/mod.rs +++ b/crates/uv-resolver/src/resolver/mod.rs @@ -129,7 +129,7 @@ impl<'a, Context: BuildContext + Send + Sync> Resolver<'a, DefaultResolverProvid ) -> Result { let provider = DefaultResolverProvider::new( client, - DistributionDatabase::new(build_context.cache(), tags, client, build_context), + DistributionDatabase::new(build_context.cache(), client, build_context), flat_index, tags, PythonRequirement::new(interpreter, markers),