Store source distribution builds under a unique manifest ID (#1051)

## Summary

This is a refactor of the source distribution cache that again aims to
make the cache purely additive. Instead of deleting all built wheels
when the cache gets invalidated (e.g., because the source distribution
changed on PyPI or something), we now treat each invalidation as its own
cache directory. The manifest inside of the source distribution
directory now becomes a pointer to the "latest" version of the source
distribution cache.

Here's a visual example:

![Screenshot 2024-01-22 at 5:35:41 PM](ca103c83-e116-4956-b91c-8434fe62cffe)

With this change, we avoid deleting built distributions that might be
relied on elsewhere and maintain our invariant that the cache is purely
additive. The cost is that we now preserve stale wheels, but we should
add a garbage collection mechanism to deal with that.
This commit is contained in:
Charlie Marsh 2024-01-23 14:49:11 -05:00 committed by GitHub
parent e32027e384
commit 6561617c56
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 294 additions and 206 deletions

11
Cargo.lock generated
View file

@ -2515,6 +2515,7 @@ dependencies = [
"tokio-util", "tokio-util",
"tracing", "tracing",
"url", "url",
"uuid",
"zip", "zip",
] ]
@ -4007,6 +4008,16 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "uuid"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a"
dependencies = [
"getrandom",
"serde",
]
[[package]] [[package]]
name = "valuable" name = "valuable"
version = "0.1.0" version = "0.1.0"

View file

@ -88,6 +88,7 @@ tracing-tree = { version = "0.3.0" }
unicode-width = { version = "0.1.11" } unicode-width = { version = "0.1.11" }
unscanny = { version = "0.1.0" } unscanny = { version = "0.1.0" }
url = { version = "2.5.0" } url = { version = "2.5.0" }
uuid = { version = "1.7.0", default-features = false }
waitmap = { version = "1.1.0" } waitmap = { version = "1.1.0" }
walkdir = { version = "2.4.0" } walkdir = { version = "2.4.0" }
which = { version = "5.0.0" } which = { version = "5.0.0" }

View file

@ -68,9 +68,22 @@ impl CacheEntry {
pub struct CacheShard(PathBuf); pub struct CacheShard(PathBuf);
impl CacheShard { impl CacheShard {
/// Return a [`CacheEntry`] within this shard.
pub fn entry(&self, file: impl AsRef<Path>) -> CacheEntry { pub fn entry(&self, file: impl AsRef<Path>) -> CacheEntry {
CacheEntry::new(&self.0, file) CacheEntry::new(&self.0, file)
} }
/// Return a [`CacheShard`] within this shard.
#[must_use]
pub fn shard(&self, dir: impl AsRef<Path>) -> Self {
Self(self.0.join(dir.as_ref()))
}
}
impl AsRef<Path> for CacheShard {
fn as_ref(&self) -> &Path {
&self.0
}
} }
impl Deref for CacheShard { impl Deref for CacheShard {

View file

@ -43,4 +43,5 @@ tokio = { workspace = true }
tokio-util = { workspace = true, features = ["compat"] } tokio-util = { workspace = true, features = ["compat"] }
tracing = { workspace = true } tracing = { workspace = true }
url = { workspace = true } url = { workspace = true }
uuid = { workspace = true, default-features = false, features = ["v4", "serde"] }
zip = { workspace = true } zip = { workspace = true }

View file

@ -1,13 +1,80 @@
use distribution_types::{git_reference, DirectUrlSourceDist, GitSourceDist, Name, PathSourceDist};
use platform_tags::Tags; use platform_tags::Tags;
use puffin_cache::CacheShard; use puffin_cache::{Cache, CacheBucket, CacheShard, WheelCache};
use puffin_fs::directories; use puffin_fs::directories;
use crate::index::cached_wheel::CachedWheel; use crate::index::cached_wheel::CachedWheel;
use crate::source::{read_http_manifest, read_timestamp_manifest, MANIFEST};
use crate::SourceDistError;
/// A local index of built distributions for a specific source distribution. /// A local index of built distributions for a specific source distribution.
pub struct BuiltWheelIndex; pub struct BuiltWheelIndex;
impl BuiltWheelIndex { impl BuiltWheelIndex {
/// Return the most compatible [`CachedWheel`] for a given source distribution at a direct URL.
///
/// This method does not perform any freshness checks and assumes that the source distribution
/// is already up-to-date.
pub fn url(
    source_dist: &DirectUrlSourceDist,
    cache: &Cache,
    tags: &Tags,
) -> Result<Option<CachedWheel>, SourceDistError> {
    // Direct URLs are cached under the hash of the URL itself.
    let cache_shard = cache.shard(
        CacheBucket::BuiltWheels,
        WheelCache::Url(source_dist.url.raw()).remote_wheel_dir(source_dist.name().as_ref()),
    );

    // Without a manifest, there are no built wheels for this source distribution.
    let manifest = match read_http_manifest(&cache_shard.entry(MANIFEST))? {
        Some(manifest) => manifest,
        None => return Ok(None),
    };

    // Scope the search to the build identified by the manifest.
    Ok(Self::find(&cache_shard.shard(manifest.digest()), tags))
}
/// Return the most compatible [`CachedWheel`] for a given source distribution at a local path.
pub fn path(
    source_dist: &PathSourceDist,
    cache: &Cache,
    tags: &Tags,
) -> Result<Option<CachedWheel>, SourceDistError> {
    let cache_shard = cache.shard(
        CacheBucket::BuiltWheels,
        WheelCache::Path(&source_dist.url).remote_wheel_dir(source_dist.name().as_ref()),
    );

    // Determine the last-modified time of the source distribution; a source tree without a
    // recognizable entrypoint cannot be timestamped.
    let modified = puffin_cache::archive_mtime(&source_dist.path)?
        .ok_or(SourceDistError::DirWithoutEntrypoint)?;

    // A missing (or stale) manifest means there are no usable built wheels in the cache.
    let manifest = match read_timestamp_manifest(&cache_shard.entry(MANIFEST), modified)? {
        Some(manifest) => manifest,
        None => return Ok(None),
    };

    // Scope the search to the build identified by the manifest.
    Ok(Self::find(&cache_shard.shard(manifest.digest()), tags))
}
/// Return the most compatible [`CachedWheel`] for a given source distribution at a git URL.
pub fn git(source_dist: &GitSourceDist, cache: &Cache, tags: &Tags) -> Option<CachedWheel> {
    // Wheels are only cached once the URL has been resolved to a precise commit; bail out
    // if no commit SHA can be extracted from the URL.
    let git_sha = git_reference(&source_dist.url).ok().flatten()?;

    let cache_shard = cache.shard(
        CacheBucket::BuiltWheels,
        WheelCache::Git(&source_dist.url, &git_sha.to_short_string())
            .remote_wheel_dir(source_dist.name().as_ref()),
    );

    Self::find(&cache_shard, tags)
}
/// Find the "best" distribution in the index for a given source distribution. /// Find the "best" distribution in the index for a given source distribution.
/// ///
/// This lookup prefers newer versions over older versions, and aims to maximize compatibility /// This lookup prefers newer versions over older versions, and aims to maximize compatibility
@ -27,7 +94,7 @@ impl BuiltWheelIndex {
pub fn find(shard: &CacheShard, tags: &Tags) -> Option<CachedWheel> { pub fn find(shard: &CacheShard, tags: &Tags) -> Option<CachedWheel> {
let mut candidate: Option<CachedWheel> = None; let mut candidate: Option<CachedWheel> = None;
for subdir in directories(&**shard) { for subdir in directories(shard) {
match CachedWheel::from_path(&subdir) { match CachedWheel::from_path(&subdir) {
None => {} None => {}
Some(dist_info) => { Some(dist_info) => {

View file

@ -4,7 +4,6 @@ use std::path::Path;
use rustc_hash::FxHashMap; use rustc_hash::FxHashMap;
use crate::index::cached_wheel::CachedWheel;
use distribution_types::{CachedRegistryDist, FlatIndexLocation, IndexLocations, IndexUrl}; use distribution_types::{CachedRegistryDist, FlatIndexLocation, IndexLocations, IndexUrl};
use pep440_rs::Version; use pep440_rs::Version;
use platform_tags::Tags; use platform_tags::Tags;
@ -12,6 +11,9 @@ use puffin_cache::{Cache, CacheBucket, WheelCache};
use puffin_fs::directories; use puffin_fs::directories;
use puffin_normalize::PackageName; use puffin_normalize::PackageName;
use crate::index::cached_wheel::CachedWheel;
use crate::source::{read_http_manifest, MANIFEST};
/// A local index of distributions that originate from a registry, like `PyPI`. /// A local index of distributions that originate from a registry, like `PyPI`.
#[derive(Debug)] #[derive(Debug)]
pub struct RegistryWheelIndex<'a> { pub struct RegistryWheelIndex<'a> {
@ -96,15 +98,19 @@ impl<'a> RegistryWheelIndex<'a> {
// Index all the built wheels, created by downloading and building source distributions // Index all the built wheels, created by downloading and building source distributions
// from the registry. // from the registry.
let built_wheel_dir = cache.shard( let cache_shard = cache.shard(
CacheBucket::BuiltWheels, CacheBucket::BuiltWheels,
WheelCache::Index(index_url).built_wheel_dir(package.to_string()), WheelCache::Index(index_url).built_wheel_dir(package.to_string()),
); );
// Built wheels have one more level of indirection, as they are keyed by the source // For registry wheels, the cache structure is: `<index>/<package-name>/<version>/`.
// distribution filename. for shard in directories(&cache_shard) {
for subdir in directories(&*built_wheel_dir) { // Read the existing metadata from the cache, if it exists.
Self::add_directory(subdir, tags, &mut versions); let cache_shard = cache_shard.shard(shard);
let manifest_entry = cache_shard.entry(MANIFEST);
if let Ok(Some(manifest)) = read_http_manifest(&manifest_entry) {
Self::add_directory(cache_shard.join(manifest.digest()), tags, &mut versions);
};
} }
} }

View file

@ -3,7 +3,7 @@ use std::str::FromStr;
use distribution_filename::WheelFilename; use distribution_filename::WheelFilename;
use platform_tags::Tags; use platform_tags::Tags;
use puffin_cache::CacheEntry; use puffin_cache::CacheShard;
use puffin_fs::directories; use puffin_fs::directories;
/// The information about the wheel we either just built or got from the cache. /// The information about the wheel we either just built or got from the cache.
@ -19,8 +19,8 @@ pub struct BuiltWheelMetadata {
impl BuiltWheelMetadata { impl BuiltWheelMetadata {
/// Find a compatible wheel in the cache based on the given manifest. /// Find a compatible wheel in the cache based on the given manifest.
pub(crate) fn find_in_cache(tags: &Tags, cache_entry: &CacheEntry) -> Option<Self> { pub(crate) fn find_in_cache(tags: &Tags, cache_shard: &CacheShard) -> Option<Self> {
for directory in directories(cache_entry.dir()) { for directory in directories(cache_shard) {
if let Some(metadata) = Self::from_path(directory) { if let Some(metadata) = Self::from_path(directory) {
// Validate that the wheel is compatible with the target platform. // Validate that the wheel is compatible with the target platform.
if metadata.filename.is_compatible(tags) { if metadata.filename.is_compatible(tags) {

View file

@ -1,7 +1,18 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// The [`Manifest`] exists as an empty serializable struct we can use to test for cache freshness. /// The [`Manifest`] is a thin wrapper around a unique identifier for the source distribution.
/// #[derive(Debug, Copy, Clone, Serialize, Deserialize)]
/// TODO(charlie): Store a unique ID, rather than an empty struct. pub(crate) struct Manifest(uuid::Uuid);
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub(crate) struct Manifest; impl Manifest {
/// Initialize a new [`Manifest`] with a random UUID.
pub(crate) fn new() -> Self {
Self(uuid::Uuid::new_v4())
}
/// Return the digest of the manifest. At present, the digest is the first 8 bytes of the
/// [`uuid::Uuid`] as a string.
pub(crate) fn digest(&self) -> String {
    // The hyphenated UUID rendering is pure ASCII, so a byte slice of the first eight
    // characters is well-defined.
    let rendered = self.0.to_string();
    rendered[..8].to_owned()
}
}

View file

@ -23,7 +23,7 @@ use install_wheel_rs::read_dist_info;
use pep508_rs::VerbatimUrl; use pep508_rs::VerbatimUrl;
use platform_tags::Tags; use platform_tags::Tags;
use puffin_cache::{CacheBucket, CacheEntry, CacheShard, CachedByTimestamp, WheelCache}; use puffin_cache::{CacheBucket, CacheEntry, CacheShard, CachedByTimestamp, WheelCache};
use puffin_client::{CachedClient, CachedClientError}; use puffin_client::{CachedClient, CachedClientError, DataWithCachePolicy};
use puffin_fs::{write_atomic, LockedFile}; use puffin_fs::{write_atomic, LockedFile};
use puffin_git::{Fetch, GitSource}; use puffin_git::{Fetch, GitSource};
use puffin_traits::{BuildContext, BuildKind, SourceBuildTrait}; use puffin_traits::{BuildContext, BuildKind, SourceBuildTrait};
@ -48,10 +48,10 @@ pub struct SourceDistCachedBuilder<'a, T: BuildContext> {
} }
/// The name of the file that contains the cached manifest, encoded via `MsgPack`. /// The name of the file that contains the cached manifest, encoded via `MsgPack`.
const MANIFEST: &str = "manifest.msgpack"; pub(crate) const MANIFEST: &str = "manifest.msgpack";
/// The name of the file that contains the cached distribution metadata, encoded via `MsgPack`. /// The name of the file that contains the cached distribution metadata, encoded via `MsgPack`.
const METADATA: &str = "metadata.msgpack"; pub(crate) const METADATA: &str = "metadata.msgpack";
impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
/// Initialize a [`SourceDistCachedBuilder`] from a [`BuildContext`]. /// Initialize a [`SourceDistCachedBuilder`] from a [`BuildContext`].
@ -122,13 +122,12 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
} }
}; };
// For registry source distributions, shard by package, then by SHA. // For registry source distributions, shard by package, then version.
// Ex) `pypi/requests/a673187abc19fe6c`
let cache_shard = self.build_context.cache().shard( let cache_shard = self.build_context.cache().shard(
CacheBucket::BuiltWheels, CacheBucket::BuiltWheels,
WheelCache::Index(&registry_source_dist.index) WheelCache::Index(&registry_source_dist.index)
.remote_wheel_dir(registry_source_dist.filename.name.as_ref()) .remote_wheel_dir(registry_source_dist.filename.name.as_ref())
.join(&registry_source_dist.file.distribution_id().as_str()[..16]), .join(registry_source_dist.filename.version.to_string()),
); );
self.url( self.url(
@ -250,27 +249,23 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
let download = |response| { let download = |response| {
async { async {
// At this point, we're seeing a new or updated source distribution; delete all // At this point, we're seeing a new or updated source distribution. Initialize a
// wheels, and redownload. // new manifest, to collect the source and built artifacts.
match fs::remove_dir_all(&cache_entry.dir()).await { let manifest = Manifest::new();
Ok(()) => debug!("Cleared built wheels and metadata for {source_dist}"),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => (),
Err(err) => return Err(err.into()),
}
debug!("Downloading source distribution: {source_dist}");
// Download the source distribution. // Download the source distribution.
let source_dist_entry = cache_shard.entry(filename); debug!("Downloading source distribution: {source_dist}");
let source_dist_entry = cache_shard.shard(manifest.digest()).entry(filename);
self.persist_source_dist_url(response, source_dist, filename, &source_dist_entry) self.persist_source_dist_url(response, source_dist, filename, &source_dist_entry)
.await?; .await?;
Ok(Manifest) Ok(manifest)
} }
.instrument(info_span!("download", source_dist = %source_dist)) .instrument(info_span!("download", source_dist = %source_dist))
}; };
let req = self.cached_client.uncached().get(url.clone()).build()?; let req = self.cached_client.uncached().get(url.clone()).build()?;
self.cached_client let manifest = self
.cached_client
.get_cached_with_callback(req, &cache_entry, download) .get_cached_with_callback(req, &cache_entry, download)
.await .await
.map_err(|err| match err { .map_err(|err| match err {
@ -278,8 +273,11 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
CachedClientError::Client(err) => SourceDistError::Client(err), CachedClientError::Client(err) => SourceDistError::Client(err),
})?; })?;
// From here on, scope all operations to the current build.
let cache_shard = cache_shard.shard(manifest.digest());
// If the cache contains a compatible wheel, return it. // If the cache contains a compatible wheel, return it.
if let Some(built_wheel) = BuiltWheelMetadata::find_in_cache(self.tags, &cache_entry) { if let Some(built_wheel) = BuiltWheelMetadata::find_in_cache(self.tags, &cache_shard) {
return Ok(built_wheel); return Ok(built_wheel);
} }
@ -297,7 +295,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
source_dist, source_dist,
source_dist_entry.path(), source_dist_entry.path(),
subdirectory, subdirectory,
&cache_entry, &cache_shard,
) )
.await?; .await?;
@ -308,11 +306,11 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
} }
// Store the metadata. // Store the metadata.
let cache_entry = cache_entry.with_file(METADATA); let metadata_entry = cache_shard.entry(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?; write_atomic(metadata_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
let path = cache_entry.dir().join(&disk_filename); let path = cache_shard.join(&disk_filename);
let target = cache_entry.dir().join(wheel_filename.stem()); let target = cache_shard.join(wheel_filename.stem());
Ok(BuiltWheelMetadata { Ok(BuiltWheelMetadata {
path, path,
@ -338,27 +336,23 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
let download = |response| { let download = |response| {
async { async {
// At this point, we're seeing a new or updated source distribution; delete all // At this point, we're seeing a new or updated source distribution. Initialize a
// wheels, and redownload. // new manifest, to collect the source and built artifacts.
match fs::remove_dir_all(&cache_entry.dir()).await { let manifest = Manifest::new();
Ok(()) => debug!("Cleared built wheels and metadata for {source_dist}"),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => (),
Err(err) => return Err(err.into()),
}
debug!("Downloading source distribution: {source_dist}");
// Download the source distribution. // Download the source distribution.
let source_dist_entry = cache_shard.entry(filename); debug!("Downloading source distribution: {source_dist}");
let source_dist_entry = cache_shard.shard(manifest.digest()).entry(filename);
self.persist_source_dist_url(response, source_dist, filename, &source_dist_entry) self.persist_source_dist_url(response, source_dist, filename, &source_dist_entry)
.await?; .await?;
Ok(Manifest) Ok(manifest)
} }
.instrument(info_span!("download", source_dist = %source_dist)) .instrument(info_span!("download", source_dist = %source_dist))
}; };
let req = self.cached_client.uncached().get(url.clone()).build()?; let req = self.cached_client.uncached().get(url.clone()).build()?;
self.cached_client let manifest = self
.cached_client
.get_cached_with_callback(req, &cache_entry, download) .get_cached_with_callback(req, &cache_entry, download)
.await .await
.map_err(|err| match err { .map_err(|err| match err {
@ -366,8 +360,11 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
CachedClientError::Client(err) => SourceDistError::Client(err), CachedClientError::Client(err) => SourceDistError::Client(err),
})?; })?;
// From here on, scope all operations to the current build.
let cache_shard = cache_shard.shard(manifest.digest());
// If the cache contains compatible metadata, return it. // If the cache contains compatible metadata, return it.
if let Some(metadata) = Self::read_metadata(&cache_entry.with_file(METADATA)).await? { if let Some(metadata) = read_cached_metadata(&cache_shard.entry(METADATA)).await? {
return Ok(metadata.clone()); return Ok(metadata.clone());
} }
@ -381,7 +378,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
.await? .await?
{ {
// Store the metadata. // Store the metadata.
let cache_entry = cache_entry.with_file(METADATA); let cache_entry = cache_shard.entry(METADATA);
fs::create_dir_all(cache_entry.dir()).await?; fs::create_dir_all(cache_entry.dir()).await?;
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?; write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
@ -401,12 +398,12 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
source_dist, source_dist,
source_dist_entry.path(), source_dist_entry.path(),
subdirectory, subdirectory,
&cache_entry, &cache_shard,
) )
.await?; .await?;
// Store the metadata. // Store the metadata.
let cache_entry = cache_entry.with_file(METADATA); let cache_entry = cache_shard.entry(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?; write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
if let Some(task) = task { if let Some(task) = task {
@ -424,11 +421,10 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
source_dist: &SourceDist, source_dist: &SourceDist,
path_source_dist: &PathSourceDist, path_source_dist: &PathSourceDist,
) -> Result<BuiltWheelMetadata, SourceDistError> { ) -> Result<BuiltWheelMetadata, SourceDistError> {
let cache_entry = self.build_context.cache().entry( let cache_shard = self.build_context.cache().shard(
CacheBucket::BuiltWheels, CacheBucket::BuiltWheels,
WheelCache::Path(&path_source_dist.url) WheelCache::Path(&path_source_dist.url)
.remote_wheel_dir(path_source_dist.name().as_ref()), .remote_wheel_dir(path_source_dist.name().as_ref()),
MANIFEST,
); );
// Determine the last-modified time of the source distribution. // Determine the last-modified time of the source distribution.
@ -437,10 +433,14 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
}; };
// Read the existing metadata from the cache, to clear stale wheels. // Read the existing metadata from the cache, to clear stale wheels.
Self::refresh_cached_manifest(&cache_entry, modified).await?; let manifest_entry = cache_shard.entry(MANIFEST);
let manifest = refresh_timestamp_manifest(&manifest_entry, modified).await?;
// From here on, scope all operations to the current build.
let cache_shard = cache_shard.shard(manifest.digest());
// If the cache contains a compatible wheel, return it. // If the cache contains a compatible wheel, return it.
if let Some(built_wheel) = BuiltWheelMetadata::find_in_cache(self.tags, &cache_entry) { if let Some(built_wheel) = BuiltWheelMetadata::find_in_cache(self.tags, &cache_shard) {
return Ok(built_wheel); return Ok(built_wheel);
} }
@ -451,7 +451,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
.map(|reporter| reporter.on_build_start(source_dist)); .map(|reporter| reporter.on_build_start(source_dist));
let (disk_filename, filename, metadata) = self let (disk_filename, filename, metadata) = self
.build_source_dist(source_dist, &path_source_dist.path, None, &cache_entry) .build_source_dist(source_dist, &path_source_dist.path, None, &cache_shard)
.await?; .await?;
if let Some(task) = task { if let Some(task) = task {
@ -461,15 +461,12 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
} }
// Store the metadata. // Store the metadata.
let cache_entry = cache_entry.with_file(METADATA); let cache_entry = cache_shard.entry(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?; write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
let path = cache_entry.dir().join(&disk_filename);
let target = cache_entry.dir().join(filename.stem());
Ok(BuiltWheelMetadata { Ok(BuiltWheelMetadata {
path, path: cache_shard.join(&disk_filename),
target, target: cache_shard.join(filename.stem()),
filename, filename,
}) })
} }
@ -483,11 +480,10 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
source_dist: &SourceDist, source_dist: &SourceDist,
path_source_dist: &PathSourceDist, path_source_dist: &PathSourceDist,
) -> Result<Metadata21, SourceDistError> { ) -> Result<Metadata21, SourceDistError> {
let cache_entry = self.build_context.cache().entry( let cache_shard = self.build_context.cache().shard(
CacheBucket::BuiltWheels, CacheBucket::BuiltWheels,
WheelCache::Path(&path_source_dist.url) WheelCache::Path(&path_source_dist.url)
.remote_wheel_dir(path_source_dist.name().as_ref()), .remote_wheel_dir(path_source_dist.name().as_ref()),
MANIFEST,
); );
// Determine the last-modified time of the source distribution. // Determine the last-modified time of the source distribution.
@ -496,10 +492,14 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
}; };
// Read the existing metadata from the cache, to clear stale entries. // Read the existing metadata from the cache, to clear stale entries.
Self::refresh_cached_manifest(&cache_entry, modified).await?; let manifest_entry = cache_shard.entry(MANIFEST);
let manifest = refresh_timestamp_manifest(&manifest_entry, modified).await?;
// From here on, scope all operations to the current build.
let cache_shard = cache_shard.shard(manifest.digest());
// If the cache contains compatible metadata, return it. // If the cache contains compatible metadata, return it.
if let Some(metadata) = Self::read_metadata(&cache_entry.with_file(METADATA)).await? { if let Some(metadata) = read_cached_metadata(&cache_shard.entry(METADATA)).await? {
return Ok(metadata.clone()); return Ok(metadata.clone());
} }
@ -510,7 +510,8 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
.await? .await?
{ {
// Store the metadata. // Store the metadata.
let cache_entry = cache_entry.with_file(METADATA); let cache_entry = cache_shard.entry(METADATA);
fs::create_dir_all(cache_entry.dir()).await?;
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?; write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
return Ok(metadata); return Ok(metadata);
@ -523,7 +524,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
.map(|reporter| reporter.on_build_start(source_dist)); .map(|reporter| reporter.on_build_start(source_dist));
let (_disk_filename, _filename, metadata) = self let (_disk_filename, _filename, metadata) = self
.build_source_dist(source_dist, &path_source_dist.path, None, &cache_entry) .build_source_dist(source_dist, &path_source_dist.path, None, &cache_shard)
.await?; .await?;
if let Some(task) = task { if let Some(task) = task {
@ -533,7 +534,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
} }
// Store the metadata. // Store the metadata.
let cache_entry = cache_entry.with_file(METADATA); let cache_entry = cache_shard.entry(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?; write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
Ok(metadata) Ok(metadata)
@ -548,18 +549,14 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
let (fetch, subdirectory) = self.download_source_dist_git(&git_source_dist.url).await?; let (fetch, subdirectory) = self.download_source_dist_git(&git_source_dist.url).await?;
let git_sha = fetch.git().precise().expect("Exact commit after checkout"); let git_sha = fetch.git().precise().expect("Exact commit after checkout");
let cache_entry = self.build_context.cache().entry( let cache_shard = self.build_context.cache().shard(
CacheBucket::BuiltWheels, CacheBucket::BuiltWheels,
WheelCache::Git(&git_source_dist.url, &git_sha.to_short_string()) WheelCache::Git(&git_source_dist.url, &git_sha.to_short_string())
.remote_wheel_dir(git_source_dist.name().as_ref()), .remote_wheel_dir(git_source_dist.name().as_ref()),
MANIFEST,
); );
// Read the existing metadata from the cache, to clear stale entries.
Self::refresh_manifest(&cache_entry).await?;
// If the cache contains a compatible wheel, return it. // If the cache contains a compatible wheel, return it.
if let Some(built_wheel) = BuiltWheelMetadata::find_in_cache(self.tags, &cache_entry) { if let Some(built_wheel) = BuiltWheelMetadata::find_in_cache(self.tags, &cache_shard) {
return Ok(built_wheel); return Ok(built_wheel);
} }
@ -573,7 +570,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
source_dist, source_dist,
fetch.path(), fetch.path(),
subdirectory.as_deref(), subdirectory.as_deref(),
&cache_entry, &cache_shard,
) )
.await?; .await?;
@ -584,15 +581,12 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
} }
// Store the metadata. // Store the metadata.
let cache_entry = cache_entry.with_file(METADATA); let cache_entry = cache_shard.entry(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?; write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
let path = cache_entry.dir().join(&disk_filename);
let target = cache_entry.dir().join(filename.stem());
Ok(BuiltWheelMetadata { Ok(BuiltWheelMetadata {
path, path: cache_shard.join(&disk_filename),
target, target: cache_shard.join(filename.stem()),
filename, filename,
}) })
} }
@ -609,18 +603,14 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
let (fetch, subdirectory) = self.download_source_dist_git(&git_source_dist.url).await?; let (fetch, subdirectory) = self.download_source_dist_git(&git_source_dist.url).await?;
let git_sha = fetch.git().precise().expect("Exact commit after checkout"); let git_sha = fetch.git().precise().expect("Exact commit after checkout");
let cache_entry = self.build_context.cache().entry( let cache_shard = self.build_context.cache().shard(
CacheBucket::BuiltWheels, CacheBucket::BuiltWheels,
WheelCache::Git(&git_source_dist.url, &git_sha.to_short_string()) WheelCache::Git(&git_source_dist.url, &git_sha.to_short_string())
.remote_wheel_dir(git_source_dist.name().as_ref()), .remote_wheel_dir(git_source_dist.name().as_ref()),
MANIFEST,
); );
// Read the existing metadata from the cache, to clear stale entries.
Self::refresh_manifest(&cache_entry).await?;
// If the cache contains compatible metadata, return it. // If the cache contains compatible metadata, return it.
if let Some(metadata) = Self::read_metadata(&cache_entry.with_file(METADATA)).await? { if let Some(metadata) = read_cached_metadata(&cache_shard.entry(METADATA)).await? {
return Ok(metadata.clone()); return Ok(metadata.clone());
} }
@ -631,7 +621,8 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
.await? .await?
{ {
// Store the metadata. // Store the metadata.
let cache_entry = cache_entry.with_file(METADATA); let cache_entry = cache_shard.entry(METADATA);
fs::create_dir_all(cache_entry.dir()).await?;
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?; write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
return Ok(metadata); return Ok(metadata);
@ -648,7 +639,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
source_dist, source_dist,
fetch.path(), fetch.path(),
subdirectory.as_deref(), subdirectory.as_deref(),
&cache_entry, &cache_shard,
) )
.await?; .await?;
@ -659,7 +650,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
} }
// Store the metadata. // Store the metadata.
let cache_entry = cache_entry.with_file(METADATA); let cache_entry = cache_shard.entry(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?; write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
Ok(metadata) Ok(metadata)
@ -782,7 +773,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
dist: &SourceDist, dist: &SourceDist,
source_dist: &Path, source_dist: &Path,
subdirectory: Option<&Path>, subdirectory: Option<&Path>,
cache_entry: &CacheEntry, cache_shard: &CacheShard,
) -> Result<(String, WheelFilename, Metadata21), SourceDistError> { ) -> Result<(String, WheelFilename, Metadata21), SourceDistError> {
debug!("Building: {dist}"); debug!("Building: {dist}");
@ -791,7 +782,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
} }
// Build the wheel. // Build the wheel.
fs::create_dir_all(&cache_entry.dir()).await?; fs::create_dir_all(&cache_shard).await?;
let disk_filename = self let disk_filename = self
.build_context .build_context
.setup_build( .setup_build(
@ -802,13 +793,13 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
) )
.await .await
.map_err(|err| SourceDistError::Build(dist.to_string(), err))? .map_err(|err| SourceDistError::Build(dist.to_string(), err))?
.wheel(cache_entry.dir()) .wheel(cache_shard)
.await .await
.map_err(|err| SourceDistError::Build(dist.to_string(), err))?; .map_err(|err| SourceDistError::Build(dist.to_string(), err))?;
// Read the metadata from the wheel. // Read the metadata from the wheel.
let filename = WheelFilename::from_str(&disk_filename)?; let filename = WheelFilename::from_str(&disk_filename)?;
let metadata = read_metadata(&filename, cache_entry.dir().join(&disk_filename))?; let metadata = read_wheel_metadata(&filename, cache_shard.join(&disk_filename))?;
// Validate the metadata. // Validate the metadata.
if &metadata.name != dist.name() { if &metadata.name != dist.name() {
@ -896,70 +887,75 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
path: editable.path.clone(), path: editable.path.clone(),
editable: true, editable: true,
})); }));
let metadata = read_metadata(&filename, editable_wheel_dir.join(&disk_filename))?; let metadata = read_wheel_metadata(&filename, editable_wheel_dir.join(&disk_filename))?;
debug!("Finished building (editable): {dist}"); debug!("Finished building (editable): {dist}");
Ok((dist, disk_filename, filename, metadata)) Ok((dist, disk_filename, filename, metadata))
} }
}
/// Read an existing cached [`Manifest`], if it exists and is up-to-date. /// Read an existing HTTP-cached [`Manifest`], if it exists.
pub(crate) fn read_http_manifest(
cache_entry: &CacheEntry,
) -> Result<Option<Manifest>, SourceDistError> {
match std::fs::read(cache_entry.path()) {
Ok(cached) => Ok(Some(
rmp_serde::from_slice::<DataWithCachePolicy<Manifest>>(&cached)?.data,
)),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(err.into()),
}
}
/// Read an existing timestamped [`Manifest`], if it exists and is up-to-date.
/// ///
/// If the cache entry is stale, it will be removed and recreated. /// If the cache entry is stale, a new entry will be created.
async fn refresh_cached_manifest( pub(crate) fn read_timestamp_manifest(
cache_entry: &CacheEntry, cache_entry: &CacheEntry,
modified: std::time::SystemTime, modified: std::time::SystemTime,
) -> Result<Manifest, SourceDistError> { ) -> Result<Option<Manifest>, SourceDistError> {
// If the cache entry is up-to-date, return it; if it's stale, remove it. // If the cache entry is up-to-date, return it.
match fs::read(&cache_entry.path()).await { match std::fs::read(cache_entry.path()) {
Ok(cached) => { Ok(cached) => {
let cached = rmp_serde::from_slice::<CachedByTimestamp<Manifest>>(&cached)?; let cached = rmp_serde::from_slice::<CachedByTimestamp<Manifest>>(&cached)?;
if cached.timestamp == modified { if cached.timestamp == modified {
return Ok(cached.data); return Ok(Some(cached.data));
}
debug!(
"Removing stale built wheels for: {}",
cache_entry.path().display()
);
if let Err(err) = fs::remove_dir_all(&cache_entry.dir()).await {
warn!("Failed to remove stale built wheel cache directory: {err}");
} }
} }
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {} Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
Err(err) => return Err(err.into()), Err(err) => return Err(err.into()),
} }
Ok(None)
}
// Write a new cache entry. /// Read an existing timestamped [`Manifest`], if it exists and is up-to-date.
///
/// If the cache entry is stale, a new entry will be created.
pub(crate) async fn refresh_timestamp_manifest(
cache_entry: &CacheEntry,
modified: std::time::SystemTime,
) -> Result<Manifest, SourceDistError> {
// If the cache entry is up-to-date, return it.
if let Some(manifest) = read_timestamp_manifest(cache_entry, modified)? {
return Ok(manifest);
}
// Otherwise, create a new manifest.
let manifest = Manifest::new();
fs::create_dir_all(&cache_entry.dir()).await?; fs::create_dir_all(&cache_entry.dir()).await?;
write_atomic( write_atomic(
cache_entry.path(), cache_entry.path(),
rmp_serde::to_vec(&CachedByTimestamp { rmp_serde::to_vec(&CachedByTimestamp {
timestamp: modified, timestamp: modified,
data: Manifest, data: manifest,
})?, })?,
) )
.await?; .await?;
Ok(manifest)
Ok(Manifest)
}
/// Read an existing cached [`Manifest`], if it exists.
///
/// If the cache entry does not exist, it will be created.
async fn refresh_manifest(cache_entry: &CacheEntry) -> Result<Manifest, SourceDistError> {
match fs::read(&cache_entry.path()).await {
Ok(cached) => Ok(rmp_serde::from_slice::<Manifest>(&cached)?),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
fs::create_dir_all(&cache_entry.dir()).await?;
write_atomic(cache_entry.path(), rmp_serde::to_vec(&Manifest)?).await?;
Ok(Manifest)
}
Err(err) => Err(err.into()),
}
} }
/// Read an existing cached [`Metadata21`], if it exists. /// Read an existing cached [`Metadata21`], if it exists.
async fn read_metadata( pub(crate) async fn read_cached_metadata(
cache_entry: &CacheEntry, cache_entry: &CacheEntry,
) -> Result<Option<Metadata21>, SourceDistError> { ) -> Result<Option<Metadata21>, SourceDistError> {
match fs::read(&cache_entry.path()).await { match fs::read(&cache_entry.path()).await {
@ -968,10 +964,9 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
Err(err) => Err(err.into()), Err(err) => Err(err.into()),
} }
} }
}
/// Read the [`Metadata21`] from a built wheel. /// Read the [`Metadata21`] from a built wheel.
fn read_metadata( fn read_wheel_metadata(
filename: &WheelFilename, filename: &WheelFilename,
wheel: impl Into<PathBuf>, wheel: impl Into<PathBuf>,
) -> Result<Metadata21, SourceDistError> { ) -> Result<Metadata21, SourceDistError> {

View file

@ -3,13 +3,12 @@ use std::io;
use std::path::Path; use std::path::Path;
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use puffin_traits::NoBinary;
use rustc_hash::FxHashSet; use rustc_hash::FxHashSet;
use tracing::{debug, warn}; use tracing::{debug, warn};
use distribution_types::{ use distribution_types::{
git_reference, BuiltDist, CachedDirectUrlDist, CachedDist, Dist, IndexLocations, BuiltDist, CachedDirectUrlDist, CachedDist, Dist, IndexLocations, InstalledDirectUrlDist,
InstalledDirectUrlDist, InstalledDist, Name, SourceDist, InstalledDist, Name, SourceDist,
}; };
use pep508_rs::{Requirement, VersionOrUrl}; use pep508_rs::{Requirement, VersionOrUrl};
use platform_tags::Tags; use platform_tags::Tags;
@ -17,6 +16,7 @@ use puffin_cache::{Cache, CacheBucket, CacheEntry, WheelCache};
use puffin_distribution::{BuiltWheelIndex, RegistryWheelIndex}; use puffin_distribution::{BuiltWheelIndex, RegistryWheelIndex};
use puffin_interpreter::Virtualenv; use puffin_interpreter::Virtualenv;
use puffin_normalize::PackageName; use puffin_normalize::PackageName;
use puffin_traits::NoBinary;
use crate::{ResolvedEditable, SitePackages}; use crate::{ResolvedEditable, SitePackages};
@ -51,7 +51,7 @@ impl<'a> Planner<'a> {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn build( pub fn build(
self, self,
mut site_packages: SitePackages, mut site_packages: SitePackages<'_>,
reinstall: &Reinstall, reinstall: &Reinstall,
no_binary: &NoBinary, no_binary: &NoBinary,
index_locations: &IndexLocations, index_locations: &IndexLocations,
@ -235,11 +235,13 @@ impl<'a> Planner<'a> {
// Find the exact wheel from the cache, since we know the filename in // Find the exact wheel from the cache, since we know the filename in
// advance. // advance.
let cache_entry = cache.entry( let cache_entry = cache
.shard(
CacheBucket::Wheels, CacheBucket::Wheels,
WheelCache::Url(&wheel.url).remote_wheel_dir(wheel.name().as_ref()), WheelCache::Url(&wheel.url)
wheel.filename.stem(), .remote_wheel_dir(wheel.name().as_ref()),
); )
.entry(wheel.filename.stem());
if cache_entry.path().exists() { if cache_entry.path().exists() {
let cached_dist = CachedDirectUrlDist::from_url( let cached_dist = CachedDirectUrlDist::from_url(
@ -270,11 +272,13 @@ impl<'a> Planner<'a> {
// Find the exact wheel from the cache, since we know the filename in // Find the exact wheel from the cache, since we know the filename in
// advance. // advance.
let cache_entry = cache.entry( let cache_entry = cache
.shard(
CacheBucket::Wheels, CacheBucket::Wheels,
WheelCache::Url(&wheel.url).remote_wheel_dir(wheel.name().as_ref()), WheelCache::Url(&wheel.url)
wheel.filename.stem(), .remote_wheel_dir(wheel.name().as_ref()),
); )
.entry(wheel.filename.stem());
if is_fresh_cache(&cache_entry, &wheel.path)? { if is_fresh_cache(&cache_entry, &wheel.path)? {
let cached_dist = CachedDirectUrlDist::from_url( let cached_dist = CachedDirectUrlDist::from_url(
@ -291,12 +295,7 @@ impl<'a> Planner<'a> {
Dist::Source(SourceDist::DirectUrl(sdist)) => { Dist::Source(SourceDist::DirectUrl(sdist)) => {
// Find the most-compatible wheel from the cache, since we don't know // Find the most-compatible wheel from the cache, since we don't know
// the filename in advance. // the filename in advance.
let cache_shard = cache.shard( if let Some(wheel) = BuiltWheelIndex::url(&sdist, cache, tags)? {
CacheBucket::BuiltWheels,
WheelCache::Url(&sdist.url).remote_wheel_dir(sdist.name().as_ref()),
);
if let Some(wheel) = BuiltWheelIndex::find(&cache_shard, tags) {
let cached_dist = wheel.into_url_dist(url.clone()); let cached_dist = wheel.into_url_dist(url.clone());
debug!("URL source requirement already cached: {cached_dist}"); debug!("URL source requirement already cached: {cached_dist}");
local.push(CachedDist::Url(cached_dist)); local.push(CachedDist::Url(cached_dist));
@ -306,32 +305,17 @@ impl<'a> Planner<'a> {
Dist::Source(SourceDist::Path(sdist)) => { Dist::Source(SourceDist::Path(sdist)) => {
// Find the most-compatible wheel from the cache, since we don't know // Find the most-compatible wheel from the cache, since we don't know
// the filename in advance. // the filename in advance.
let cache_shard = cache.shard( if let Some(wheel) = BuiltWheelIndex::path(&sdist, cache, tags)? {
CacheBucket::BuiltWheels,
WheelCache::Path(&sdist.url)
.remote_wheel_dir(sdist.name().as_ref()),
);
if let Some(wheel) = BuiltWheelIndex::find(&cache_shard, tags) {
if is_fresh_cache(&wheel.entry, &sdist.path)? {
let cached_dist = wheel.into_url_dist(url.clone()); let cached_dist = wheel.into_url_dist(url.clone());
debug!("Path source requirement already cached: {cached_dist}"); debug!("Path source requirement already cached: {cached_dist}");
local.push(CachedDist::Url(cached_dist)); local.push(CachedDist::Url(cached_dist));
continue; continue;
} }
} }
}
Dist::Source(SourceDist::Git(sdist)) => { Dist::Source(SourceDist::Git(sdist)) => {
// Find the most-compatible wheel from the cache, since we don't know // Find the most-compatible wheel from the cache, since we don't know
// the filename in advance. // the filename in advance.
if let Ok(Some(git_sha)) = git_reference(&sdist.url) { if let Some(wheel) = BuiltWheelIndex::git(&sdist, cache, tags) {
let cache_shard = cache.shard(
CacheBucket::BuiltWheels,
WheelCache::Git(&sdist.url, &git_sha.to_short_string())
.remote_wheel_dir(sdist.name().as_ref()),
);
if let Some(wheel) = BuiltWheelIndex::find(&cache_shard, tags) {
let cached_dist = wheel.into_url_dist(url.clone()); let cached_dist = wheel.into_url_dist(url.clone());
debug!("Git source requirement already cached: {cached_dist}"); debug!("Git source requirement already cached: {cached_dist}");
local.push(CachedDist::Url(cached_dist)); local.push(CachedDist::Url(cached_dist));
@ -341,7 +325,6 @@ impl<'a> Planner<'a> {
} }
} }
} }
}
debug!("Identified uncached requirement: {requirement}"); debug!("Identified uncached requirement: {requirement}");
remote.push(requirement.clone()); remote.push(requirement.clone());