diff --git a/crates/uv-distribution/src/source/mod.rs b/crates/uv-distribution/src/source/mod.rs
index e9c713360..59c86adb3 100644
--- a/crates/uv-distribution/src/source/mod.rs
+++ b/crates/uv-distribution/src/source/mod.rs
@@ -9,7 +9,6 @@ use anyhow::Result;
 use fs_err::tokio as fs;
 use futures::{FutureExt, TryStreamExt};
 use reqwest::Response;
-use tempfile::TempDir;
 use tokio_util::compat::FuturesAsyncReadCompatExt;
 use tracing::{debug, info_span, instrument, Instrument};
 use url::Url;
@@ -24,8 +23,7 @@ use install_wheel_rs::metadata::read_archive_metadata;
 use platform_tags::Tags;
 use pypi_types::Metadata23;
 use uv_cache::{
-    ArchiveTimestamp, Cache, CacheBucket, CacheEntry, CacheShard, CachedByTimestamp, Freshness,
-    WheelCache,
+    ArchiveTimestamp, CacheBucket, CacheEntry, CacheShard, CachedByTimestamp, Freshness, WheelCache,
 };
 use uv_client::{
     CacheControl, CachedClientError, Connectivity, DataWithCachePolicy, RegistryClient,
@@ -93,18 +91,13 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                 }
                 FileLocation::Path(path) => {
                     let url = Url::from_file_path(path).expect("path is absolute");
-
-                    // If necessary, extract the archive.
-                    let extracted = extract_archive(path, self.build_context.cache()).await?;
-
                     return self
-                        .path(
+                        .archive(
                             source,
                             &PathSourceUrl {
                                 url: &url,
                                 path: Cow::Borrowed(path),
                             },
-                            extracted.path(),
                             tags,
                         )
                         .boxed()
@@ -152,12 +145,15 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                 .await?
             }
             BuildableSource::Dist(SourceDist::Path(dist)) => {
-                // If necessary, extract the archive.
-                let extracted = extract_archive(&dist.path, self.build_context.cache()).await?;
-
-                self.path(source, &PathSourceUrl::from(dist), extracted.path(), tags)
-                    .boxed()
-                    .await?
+                if dist.path.is_dir() {
+                    self.source_tree(source, &PathSourceUrl::from(dist), tags)
+                        .boxed()
+                        .await?
+                } else {
+                    self.archive(source, &PathSourceUrl::from(dist), tags)
+                        .boxed()
+                        .await?
+                }
             }
             BuildableSource::Url(SourceUrl::Direct(resource)) => {
                 let filename = resource
@@ -187,12 +183,11 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                 self.git(source, resource, tags).boxed().await?
             }
             BuildableSource::Url(SourceUrl::Path(resource)) => {
-                // If necessary, extract the archive.
-                let extracted = extract_archive(&resource.path, self.build_context.cache()).await?;
-
-                self.path(source, resource, extracted.path(), tags)
-                    .boxed()
-                    .await?
+                if resource.path.is_dir() {
+                    self.source_tree(source, resource, tags).boxed().await?
+                } else {
+                    self.archive(source, resource, tags).boxed().await?
+                }
             }
         };

@@ -217,18 +212,13 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                 }
                 FileLocation::Path(path) => {
                     let url = Url::from_file_path(path).expect("path is absolute");
-
-                    // If necessary, extract the archive.
-                    let extracted = extract_archive(path, self.build_context.cache()).await?;
-
                     return self
-                        .path_metadata(
+                        .archive_metadata(
                             source,
                             &PathSourceUrl {
                                 url: &url,
                                 path: Cow::Borrowed(path),
                             },
-                            extracted.path(),
                         )
                         .boxed()
                         .await;
@@ -273,12 +263,15 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                 .await?
             }
             BuildableSource::Dist(SourceDist::Path(dist)) => {
-                // If necessary, extract the archive.
-                let extracted = extract_archive(&dist.path, self.build_context.cache()).await?;
-
-                self.path_metadata(source, &PathSourceUrl::from(dist), extracted.path())
-                    .boxed()
-                    .await?
+                if dist.path.is_dir() {
+                    self.source_tree_metadata(source, &PathSourceUrl::from(dist))
+                        .boxed()
+                        .await?
+                } else {
+                    self.archive_metadata(source, &PathSourceUrl::from(dist))
+                        .boxed()
+                        .await?
+                }
             }
             BuildableSource::Url(SourceUrl::Direct(resource)) => {
                 let filename = resource
@@ -307,12 +300,11 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                 self.git_metadata(source, resource).boxed().await?
             }
             BuildableSource::Url(SourceUrl::Path(resource)) => {
-                // If necessary, extract the archive.
-                let extracted = extract_archive(&resource.path, self.build_context.cache()).await?;
-
-                self.path_metadata(source, resource, extracted.path())
-                    .boxed()
-                    .await?
+                if resource.path.is_dir() {
+                    self.source_tree_metadata(source, resource).boxed().await?
+                } else {
+                    self.archive_metadata(source, resource).boxed().await?
+                }
             }
         };

@@ -335,10 +327,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
             .url_revision(source, filename, url, cache_shard)
             .await?;

-        // From here on, scope all operations to the current build. Within the revision shard,
-        // there's no need to check for freshness, since entries have to be fresher than the
-        // revision itself. There's also no need to lock, since we never replace entries within the
-        // shard.
+        // Scope all operations to the revision. Within the revision, there's no need to check for
+        // freshness, since entries have to be fresher than the revision itself.
         let cache_shard = cache_shard.shard(revision.id());

         // If the cache contains a compatible wheel, return it.
@@ -394,10 +384,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
             .url_revision(source, filename, url, cache_shard)
             .await?;

-        // From here on, scope all operations to the current build. Within the revision shard,
-        // there's no need to check for freshness, since entries have to be fresher than the
-        // revision itself. There's also no need to lock, since we never replace entries within the
-        // shard.
+        // Scope all operations to the revision. Within the revision, there's no need to check for
+        // freshness, since entries have to be fresher than the revision itself.
         let cache_shard = cache_shard.shard(revision.id());

         // If the cache contains compatible metadata, return it.
@@ -500,12 +488,11 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         })
     }

-    /// Build a source distribution from a local path.
-    async fn path(
+    /// Build a source distribution from a local archive (e.g., `.tar.gz` or `.zip`).
+    async fn archive(
         &self,
         source: &BuildableSource<'_>,
         resource: &PathSourceUrl<'_>,
-        source_root: &Path,
         tags: &Tags,
     ) -> Result<BuiltWheelMetadata, Error> {
         let cache_shard = self.build_context.cache().shard(
@@ -514,12 +501,12 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         );

         // Fetch the revision for the source distribution.
-        let revision = self.path_revision(source, resource, &cache_shard).await?;
+        let revision = self
+            .archive_revision(source, resource, &cache_shard)
+            .await?;

-        // From here on, scope all operations to the current build. Within the revision shard,
-        // there's no need to check for freshness, since entries have to be fresher than the
-        // revision itself. There's also no need to lock, since we never replace entries within the
-        // shard.
+        // Scope all operations to the revision. Within the revision, there's no need to check for
+        // freshness, since entries have to be fresher than the revision itself.
         let cache_shard = cache_shard.shard(revision.id());

         // If the cache contains a compatible wheel, return it.
@@ -527,6 +514,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
             return Ok(built_wheel);
         }

+        let source_entry = cache_shard.entry("source");
+
         // Otherwise, we need to build a wheel.
         let task = self
             .reporter
@@ -534,7 +523,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
             .map(|reporter| reporter.on_build_start(source));

         let (disk_filename, filename, metadata) = self
-            .build_distribution(source, source_root, None, &cache_shard)
+            .build_distribution(source, source_entry.path(), None, &cache_shard)
             .await?;

         if let Some(task) = task {
@@ -556,15 +545,14 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         })
     }

-    /// Build the source distribution's metadata from a local path.
+    /// Build the source distribution's metadata from a local archive (e.g., `.tar.gz` or `.zip`).
     ///
     /// If the build backend supports `prepare_metadata_for_build_wheel`, this method will avoid
     /// building the wheel.
-    async fn path_metadata(
+    async fn archive_metadata(
         &self,
         source: &BuildableSource<'_>,
         resource: &PathSourceUrl<'_>,
-        source_root: &Path,
     ) -> Result<Metadata23, Error> {
         let cache_shard = self.build_context.cache().shard(
             CacheBucket::BuiltWheels,
@@ -572,31 +560,26 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         );

         // Fetch the revision for the source distribution.
-        let revision = self.path_revision(source, resource, &cache_shard).await?;
+        let revision = self
+            .archive_revision(source, resource, &cache_shard)
+            .await?;

-        // From here on, scope all operations to the current build. Within the revision shard,
-        // there's no need to check for freshness, since entries have to be fresher than the
-        // revision itself. There's also no need to lock, since we never replace entries within the
-        // shard.
+        // Scope all operations to the revision. Within the revision, there's no need to check for
+        // freshness, since entries have to be fresher than the revision itself.
         let cache_shard = cache_shard.shard(revision.id());

         // If the cache contains compatible metadata, return it.
         let metadata_entry = cache_shard.entry(METADATA);
-        if self
-            .build_context
-            .cache()
-            .freshness(&metadata_entry, source.name())
-            .is_ok_and(Freshness::is_fresh)
-        {
-            if let Some(metadata) = read_cached_metadata(&metadata_entry).await? {
-                debug!("Using cached metadata for: {source}");
-                return Ok(metadata);
-            }
+        if let Some(metadata) = read_cached_metadata(&metadata_entry).await? {
+            debug!("Using cached metadata for: {source}");
+            return Ok(metadata);
         }

+        let source_entry = cache_shard.entry("source");
+
         // If the backend supports `prepare_metadata_for_build_wheel`, use it.
         if let Some(metadata) = self
-            .build_metadata(source, source_root, None)
+            .build_metadata(source, source_entry.path(), None)
             .boxed()
             .await?
         {
@@ -619,7 +602,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
             .map(|reporter| reporter.on_build_start(source));

         let (_disk_filename, _filename, metadata) = self
-            .build_distribution(source, source_root, None, &cache_shard)
+            .build_distribution(source, source_entry.path(), None, &cache_shard)
             .await?;

         if let Some(task) = task {
@@ -637,8 +620,184 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         Ok(metadata)
     }

-    /// Return the [`Revision`] for a local path, refreshing it if necessary.
-    async fn path_revision(
+    /// Return the [`Revision`] for a local archive, refreshing it if necessary.
+    async fn archive_revision(
+        &self,
+        source: &BuildableSource<'_>,
+        resource: &PathSourceUrl<'_>,
+        cache_shard: &CacheShard,
+    ) -> Result<Revision, Error> {
+        // Determine the last-modified time of the source distribution.
+        let modified = ArchiveTimestamp::from_file(&resource.path).map_err(Error::CacheRead)?;
+
+        // Read the existing metadata from the cache.
+        let revision_entry = cache_shard.entry(REVISION);
+
+        // If the revision already exists, return it. There's no need to check for freshness, since
+        // we use an exact timestamp.
+        if let Some(revision) = read_timestamped_revision(&revision_entry, modified)? {
+            return Ok(revision);
+        }
+
+        // Otherwise, we need to create a new revision.
+        let revision = Revision::new();
+
+        // Unzip the archive to a temporary directory.
+        debug!("Unpacking source distribution: {source}");
+        let entry = cache_shard.shard(revision.id()).entry("source");
+        self.persist_archive(&resource.path, source, &entry).await?;
+
+        // Persist the revision.
+        write_atomic(
+            revision_entry.path(),
+            rmp_serde::to_vec(&CachedByTimestamp {
+                timestamp: modified.timestamp(),
+                data: revision.clone(),
+            })?,
+        )
+        .await
+        .map_err(Error::CacheWrite)?;
+
+        Ok(revision)
+    }
+
+    /// Build a source distribution from a local source tree (i.e., directory).
+    async fn source_tree(
+        &self,
+        source: &BuildableSource<'_>,
+        resource: &PathSourceUrl<'_>,
+        tags: &Tags,
+    ) -> Result<BuiltWheelMetadata, Error> {
+        let cache_shard = self.build_context.cache().shard(
+            CacheBucket::BuiltWheels,
+            WheelCache::Path(resource.url).root(),
+        );
+
+        // Fetch the revision for the source distribution.
+        let revision = self
+            .source_tree_revision(source, resource, &cache_shard)
+            .await?;
+
+        // Scope all operations to the revision. Within the revision, there's no need to check for
+        // freshness, since entries have to be fresher than the revision itself.
+        let cache_shard = cache_shard.shard(revision.id());
+
+        // If the cache contains a compatible wheel, return it.
+        if let Some(built_wheel) = BuiltWheelMetadata::find_in_cache(tags, &cache_shard) {
+            return Ok(built_wheel);
+        }
+
+        // Otherwise, we need to build a wheel.
+        let task = self
+            .reporter
+            .as_ref()
+            .map(|reporter| reporter.on_build_start(source));
+
+        let (disk_filename, filename, metadata) = self
+            .build_distribution(source, &resource.path, None, &cache_shard)
+            .await?;
+
+        if let Some(task) = task {
+            if let Some(reporter) = self.reporter.as_ref() {
+                reporter.on_build_complete(source, task);
+            }
+        }
+
+        // Store the metadata.
+        let metadata_entry = cache_shard.entry(METADATA);
+        write_atomic(metadata_entry.path(), rmp_serde::to_vec(&metadata)?)
+            .await
+            .map_err(Error::CacheWrite)?;
+
+        Ok(BuiltWheelMetadata {
+            path: cache_shard.join(&disk_filename),
+            target: cache_shard.join(filename.stem()),
+            filename,
+        })
+    }
+
+    /// Build the source distribution's metadata from a local source tree (i.e., a directory).
+    ///
+    /// If the build backend supports `prepare_metadata_for_build_wheel`, this method will avoid
+    /// building the wheel.
+    async fn source_tree_metadata(
+        &self,
+        source: &BuildableSource<'_>,
+        resource: &PathSourceUrl<'_>,
+    ) -> Result<Metadata23, Error> {
+        let cache_shard = self.build_context.cache().shard(
+            CacheBucket::BuiltWheels,
+            WheelCache::Path(resource.url).root(),
+        );
+
+        // Fetch the revision for the source distribution.
+        let revision = self
+            .source_tree_revision(source, resource, &cache_shard)
+            .await?;
+
+        // Scope all operations to the revision. Within the revision, there's no need to check for
+        // freshness, since entries have to be fresher than the revision itself.
+        let cache_shard = cache_shard.shard(revision.id());
+
+        // If the cache contains compatible metadata, return it.
+        let metadata_entry = cache_shard.entry(METADATA);
+        if self
+            .build_context
+            .cache()
+            .freshness(&metadata_entry, source.name())
+            .is_ok_and(Freshness::is_fresh)
+        {
+            if let Some(metadata) = read_cached_metadata(&metadata_entry).await? {
+                debug!("Using cached metadata for: {source}");
+                return Ok(metadata);
+            }
+        }
+
+        // If the backend supports `prepare_metadata_for_build_wheel`, use it.
+        if let Some(metadata) = self
+            .build_metadata(source, &resource.path, None)
+            .boxed()
+            .await?
+        {
+            // Store the metadata.
+            let cache_entry = cache_shard.entry(METADATA);
+            fs::create_dir_all(cache_entry.dir())
+                .await
+                .map_err(Error::CacheWrite)?;
+            write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?)
+                .await
+                .map_err(Error::CacheWrite)?;
+
+            return Ok(metadata);
+        }
+
+        // Otherwise, we need to build a wheel.
+        let task = self
+            .reporter
+            .as_ref()
+            .map(|reporter| reporter.on_build_start(source));
+
+        let (_disk_filename, _filename, metadata) = self
+            .build_distribution(source, &resource.path, None, &cache_shard)
+            .await?;
+
+        if let Some(task) = task {
+            if let Some(reporter) = self.reporter.as_ref() {
+                reporter.on_build_complete(source, task);
+            }
+        }
+
+        // Store the metadata.
+        let metadata_entry = cache_shard.entry(METADATA);
+        write_atomic(metadata_entry.path(), rmp_serde::to_vec(&metadata)?)
+            .await
+            .map_err(Error::CacheWrite)?;
+
+        Ok(metadata)
+    }
+
+    /// Return the [`Revision`] for a local source tree, refreshing it if necessary.
+    async fn source_tree_revision(
         &self,
         source: &BuildableSource<'_>,
         resource: &PathSourceUrl<'_>,
@@ -815,21 +974,21 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
     }

     /// Download and unzip a source distribution into the cache from an HTTP response.
-    async fn persist_url<'data>(
+    async fn persist_url(
         &self,
         response: Response,
         source: &BuildableSource<'_>,
         filename: &str,
-        cache_entry: &'data CacheEntry,
-    ) -> Result<&'data Path, Error> {
+        cache_entry: &CacheEntry,
+    ) -> Result<(), Error> {
         let cache_path = cache_entry.path();
         if cache_path.is_dir() {
             debug!("Distribution is already cached: {source}");
-            return Ok(cache_path);
+            return Ok(());
         }

         // Download and unzip the source distribution into a temporary directory.
-        let span = info_span!("download_source_dist", filename = filename, source_dist = %source);
+        let span = info_span!("persist_url", filename = filename, source_dist = %source);
         let temp_dir =
             tempfile::tempdir_in(self.build_context.cache().bucket(CacheBucket::BuiltWheels))
                 .map_err(Error::CacheWrite)?;
@@ -855,7 +1014,49 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
             .await
             .map_err(Error::CacheWrite)?;

-        Ok(cache_path)
+        Ok(())
+    }
+
+    /// Extract a local archive, and store it at the given [`CacheEntry`].
+    async fn persist_archive(
+        &self,
+        path: &Path,
+        source: &BuildableSource<'_>,
+        cache_entry: &CacheEntry,
+    ) -> Result<(), Error> {
+        let cache_path = cache_entry.path();
+        if cache_path.is_dir() {
+            debug!("Distribution is already cached: {source}");
+            return Ok(());
+        }
+
+        debug!("Unpacking for build: {}", path.display());
+
+        // Unzip the archive into a temporary directory.
+        let temp_dir =
+            tempfile::tempdir_in(self.build_context.cache().bucket(CacheBucket::BuiltWheels))
+                .map_err(Error::CacheWrite)?;
+        let reader = fs_err::tokio::File::open(&path)
+            .await
+            .map_err(Error::CacheRead)?;
+        uv_extract::seek::archive(reader, path, &temp_dir.path()).await?;
+
+        // Extract the top-level directory from the archive.
+        let extracted = match uv_extract::strip_component(temp_dir.path()) {
+            Ok(top_level) => top_level,
+            Err(uv_extract::Error::NonSingularArchive(_)) => temp_dir.path().to_path_buf(),
+            Err(err) => return Err(err.into()),
+        };
+
+        // Persist it to the cache.
+        fs_err::tokio::create_dir_all(cache_path.parent().expect("Cache entry to have parent"))
+            .await
+            .map_err(Error::CacheWrite)?;
+        fs_err::tokio::rename(extracted, &cache_path)
+            .await
+            .map_err(Error::CacheWrite)?;
+
+        Ok(())
     }

     /// Build a source distribution, storing the built wheel in the cache.
@@ -1109,29 +1310,6 @@ pub(crate) fn read_timestamped_revision(
     Ok(None)
 }

-#[derive(Debug)]
-enum ExtractedSource {
-    /// The source distribution was passed in as a directory, and so doesn't need to be extracted.
-    Directory(PathBuf),
-    /// The source distribution was passed in as an archive, and was extracted into a temporary
-    /// directory.
-    ///
-    /// The extracted archive and temporary directory will be deleted when the `ExtractedSource` is
-    /// dropped.
-    #[allow(dead_code)]
-    Archive(PathBuf, TempDir),
-}
-
-impl ExtractedSource {
-    /// Return the [`Path`] to the extracted source root.
-    fn path(&self) -> &Path {
-        match self {
-            ExtractedSource::Directory(path) => path,
-            ExtractedSource::Archive(path, _) => path,
-        }
-    }
-}
-
 /// Read the [`Metadata23`] from a source distribution's `PKG-INFO` file, if it uses Metadata 2.2
 /// or later _and_ none of the required fields (`Requires-Python`, `Requires-Dist`, and
 /// `Provides-Extra`) are marked as dynamic.
@@ -1236,41 +1414,3 @@ fn read_wheel_metadata(
     let dist_info = read_archive_metadata(filename, &mut archive)?;
     Ok(Metadata23::parse_metadata(&dist_info)?)
 }
-
-/// Extract a local source distribution, if it's stored as a `.tar.gz` or `.zip` archive.
-///
-/// TODO(charlie): Consider storing the extracted source in the cache, to avoid re-extracting
-/// on every invocation.
-async fn extract_archive(path: &Path, cache: &Cache) -> Result<ExtractedSource, Error> {
-    let metadata = match fs::metadata(&path).await {
-        Ok(metadata) => metadata,
-        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
-            return Err(Error::NotFound(path.to_path_buf()));
-        }
-        Err(err) => return Err(Error::CacheRead(err)),
-    };
-
-    if metadata.is_dir() {
-        Ok(ExtractedSource::Directory(path.to_path_buf()))
-    } else {
-        debug!("Unpacking for build: {}", path.display());
-
-        let temp_dir = tempfile::tempdir_in(cache.bucket(CacheBucket::BuiltWheels))
-            .map_err(Error::CacheWrite)?;
-
-        // Unzip the archive into the temporary directory.
-        let reader = fs_err::tokio::File::open(&path)
-            .await
-            .map_err(Error::CacheRead)?;
-        uv_extract::seek::archive(reader, path, &temp_dir.path()).await?;
-
-        // Extract the top-level directory from the archive.
-        let extracted = match uv_extract::strip_component(temp_dir.path()) {
-            Ok(top_level) => top_level,
-            Err(uv_extract::Error::NonSingularArchive(_)) => temp_dir.path().to_path_buf(),
-            Err(err) => return Err(err.into()),
-        };
-
-        Ok(ExtractedSource::Archive(extracted, temp_dir))
-    }
-}
diff --git a/crates/uv/tests/pip_sync.rs b/crates/uv/tests/pip_sync.rs
index 255912a11..a874e85df 100644
--- a/crates/uv/tests/pip_sync.rs
+++ b/crates/uv/tests/pip_sync.rs
@@ -1601,7 +1601,7 @@ fn install_path_source_dist_cached() -> Result<()> {
     ----- stdout -----

     ----- stderr -----
-    Removed 4 files for wheel ([SIZE])
+    Removed 102 files for wheel ([SIZE])
     "###
     );

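The diff above replaces per-invocation temporary extraction with a timestamp-keyed "revision" in the cache: an archive is unpacked once into a `source` entry under the revision's shard, and later builds reuse that directory for as long as the archive's mtime matches the recorded revision. A minimal sketch of that pattern using only the Rust standard library follows; `extract_archive_into` is a hypothetical stand-in for a real tar/zip extractor, and the stamp-file layout is illustrative, not uv's actual `Revision`/`CacheShard` format.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::time::UNIX_EPOCH;

/// Last-modified time of the archive, in whole seconds since the epoch.
fn modified_secs(archive: &Path) -> io::Result<u64> {
    let modified = fs::metadata(archive)?.modified()?;
    modified
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_secs())
        .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
}

/// No-op placeholder: substitute a real tar/zip extractor here.
fn extract_archive_into(_archive: &Path, dest: &Path) -> io::Result<()> {
    fs::create_dir_all(dest)
}

/// Return a cached extraction of `archive`, re-extracting only when the
/// archive's mtime no longer matches the recorded revision stamp.
fn cached_source_root(archive: &Path, cache_dir: &Path) -> io::Result<PathBuf> {
    let stamp = cache_dir.join("revision.stamp");
    let source = cache_dir.join("source");
    let modified = modified_secs(archive)?;

    // Cache hit: the stamp matches the archive's mtime and the extraction exists.
    if let Ok(prior) = fs::read_to_string(&stamp) {
        if prior.trim().parse() == Ok(modified) && source.is_dir() {
            return Ok(source);
        }
    }

    // Cache miss: extract into a staging directory, then move it into place
    // before recording the new stamp.
    let staging = cache_dir.join(".staging");
    let _ = fs::remove_dir_all(&staging);
    fs::create_dir_all(cache_dir)?;
    extract_archive_into(archive, &staging)?;
    let _ = fs::remove_dir_all(&source);
    fs::rename(&staging, &source)?;
    fs::write(&stamp, modified.to_string())?;
    Ok(source)
}

fn main() -> io::Result<()> {
    // Hypothetical paths, for illustration only.
    let root = cached_source_root(Path::new("pkg-1.0.tar.gz"), Path::new(".cache/pkg-1.0"))?;
    println!("source tree at {}", root.display());
    Ok(())
}
```

As in the diff's `persist_archive`, extraction is staged in a scratch directory and renamed into place, so an interrupted run never leaves a half-populated `source` directory that a later invocation would mistake for a valid cache hit. This also explains the test-snapshot change at the end of the patch: since the extracted sources now live in the `wheel` cache bucket alongside the built wheel, clearing that bucket removes many more files than before.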