# Avoid persisting manifest data in standalone file (#1044)

## Summary

This PR gets rid of the manifest data that we store for source distributions.
Historically, that manifest included the source distribution metadata,
plus a list of built wheels.

The problem with the manifest is that it duplicates state: we have to
look at both the manifest and the filesystem to understand the cache
state. Instead, I think we should treat the cache itself as the source of
truth, and get rid of the duplicated state in the manifest.
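
For context, the old manifest looked roughly like the sketch below. This is reconstructed from the diff that follows, with stub types standing in for the real `Metadata21` and `WheelFilename`; the exact field layout is an assumption:

```rust
#![allow(dead_code)]
use std::collections::HashMap;

// Stub types standing in for the real `pypi_types::Metadata21` and
// `WheelFilename` types (assumptions, for illustration only).
type Metadata21 = String;
type WheelFilename = String;

/// Sketch of the old manifest entry for a single built wheel.
struct DiskFilenameAndMetadata {
    disk_filename: String,
    metadata: Metadata21,
}

/// Sketch of the old on-disk manifest. Everything in here mirrors state
/// that also lives on the filesystem, which is the duplication this PR
/// removes.
struct Manifest {
    /// The source distribution metadata.
    metadata: Option<Metadata21>,
    /// One entry per built wheel, keyed by wheel filename -- duplicating
    /// the wheels that already exist in the cache directory.
    built_wheels: HashMap<WheelFilename, DiskFilenameAndMetadata>,
}
```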

Now, we store the manifest (which is merely used to check for cache
freshness -- I've left it around, since I plan to repurpose it in future
PRs), the distribution metadata as its own file, and any built
distributions, all in the same directory. When we want to see if there
are any valid distributions, we `readdir` the directory. This is also
much more consistent with how the install plan works.
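
Concretely, each built-wheel cache shard now looks something like the layout sketched below, and checking for cached wheels is just a directory scan. This is a minimal illustration under assumed paths, not the real `BuiltWheelMetadata::find_in_cache`, which additionally matches each wheel filename against the platform `Tags`:

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Shard layout after this change (illustrative; directory names are
/// assumptions):
///
///   <cache>/built-wheels-v0/<source>/
///       manifest.msgpack           -- freshness marker only
///       metadata.msgpack           -- distribution metadata
///       foo-1.0-py3-none-any.whl   -- a built wheel
///       foo-1.0-py3-none-any/      -- its unzipped contents
///
/// Minimal sketch of the `readdir`-based lookup; the real
/// `BuiltWheelMetadata::find_in_cache` also filters candidates by
/// compatibility with the platform tags.
fn find_any_built_wheel(shard: &Path) -> io::Result<Option<PathBuf>> {
    if !shard.exists() {
        return Ok(None);
    }
    for entry in fs::read_dir(shard)? {
        let entry = entry?;
        let name = entry.file_name();
        // Skip the manifest and metadata files; every other entry is a
        // built distribution.
        if name == "manifest.msgpack" || name == "metadata.msgpack" {
            continue;
        }
        return Ok(Some(entry.path()));
    }
    Ok(None)
}
```
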
Charlie Marsh, 2024-01-23 14:46:48 -05:00 (committed by GitHub)
commit e32027e384 (parent 19c5cc8aba), GPG key ID: B5690EEEBB952194
4 changed files with 148 additions and 277 deletions

@@ -23,7 +23,7 @@ use install_wheel_rs::read_dist_info;
use pep508_rs::VerbatimUrl;
use platform_tags::Tags;
use puffin_cache::{CacheBucket, CacheEntry, CacheShard, CachedByTimestamp, WheelCache};
use puffin_client::{CachedClient, CachedClientError, DataWithCachePolicy};
use puffin_client::{CachedClient, CachedClientError};
use puffin_fs::{write_atomic, LockedFile};
use puffin_git::{Fetch, GitSource};
use puffin_traits::{BuildContext, BuildKind, SourceBuildTrait};
@@ -32,7 +32,7 @@ use pypi_types::Metadata21;
use crate::reporter::Facade;
use crate::source::built_wheel_metadata::BuiltWheelMetadata;
pub use crate::source::error::SourceDistError;
use crate::source::manifest::{DiskFilenameAndMetadata, Manifest};
use crate::source::manifest::Manifest;
use crate::Reporter;
mod built_wheel_metadata;
@@ -50,6 +50,9 @@ pub struct SourceDistCachedBuilder<'a, T: BuildContext> {
/// The name of the file that contains the cached manifest, encoded via `MsgPack`.
const MANIFEST: &str = "manifest.msgpack";
/// The name of the file that contains the cached distribution metadata, encoded via `MsgPack`.
const METADATA: &str = "metadata.msgpack";
impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
/// Initialize a [`SourceDistCachedBuilder`] from a [`BuildContext`].
pub fn new(build_context: &'a T, cached_client: &'a CachedClient, tags: &'a Tags) -> Self {
@@ -262,13 +265,12 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
self.persist_source_dist_url(response, source_dist, filename, &source_dist_entry)
.await?;
Ok(Manifest::default())
Ok(Manifest)
}
.instrument(info_span!("download", source_dist = %source_dist))
};
let req = self.cached_client.uncached().get(url.clone()).build()?;
let manifest = self
.cached_client
self.cached_client
.get_cached_with_callback(req, &cache_entry, download)
.await
.map_err(|err| match err {
@@ -277,9 +279,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
})?;
// If the cache contains a compatible wheel, return it.
if let Some(built_wheel) =
BuiltWheelMetadata::find_in_cache(self.tags, &manifest, &cache_entry)
{
if let Some(built_wheel) = BuiltWheelMetadata::find_in_cache(self.tags, &cache_entry) {
return Ok(built_wheel);
}
@@ -301,37 +301,24 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
)
.await?;
let cached_data = DiskFilenameAndMetadata {
disk_filename: disk_filename.clone(),
metadata: metadata.clone(),
};
// Not elegant that we have to read again here, but also not too relevant given that we
// have to build a source dist next.
// Just return if the response wasn't cacheable or there were other errors that
// `CachedClient` already complained about
if let Ok(cached) = fs::read(cache_entry.path()).await {
// If the file exists and it was just read or written by `CachedClient`, we assume it
// must be correct.
let mut cached = rmp_serde::from_slice::<DataWithCachePolicy<Manifest>>(&cached)?;
cached
.data
.insert_wheel(wheel_filename.clone(), cached_data.clone());
write_atomic(cache_entry.path(), rmp_serde::to_vec(&cached)?).await?;
};
if let Some(task) = task {
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_build_complete(source_dist, task);
}
}
Ok(BuiltWheelMetadata::from_cached(
wheel_filename,
cached_data,
&cache_entry,
))
// Store the metadata.
let cache_entry = cache_entry.with_file(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
let path = cache_entry.dir().join(&disk_filename);
let target = cache_entry.dir().join(wheel_filename.stem());
Ok(BuiltWheelMetadata {
path,
target,
filename: wheel_filename,
})
}
/// Build the source distribution's metadata from a local path.
@@ -366,13 +353,12 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
self.persist_source_dist_url(response, source_dist, filename, &source_dist_entry)
.await?;
Ok(Manifest::default())
Ok(Manifest)
}
.instrument(info_span!("download", source_dist = %source_dist))
};
let req = self.cached_client.uncached().get(url.clone()).build()?;
let manifest = self
.cached_client
self.cached_client
.get_cached_with_callback(req, &cache_entry, download)
.await
.map_err(|err| match err {
@@ -381,7 +367,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
})?;
// If the cache contains compatible metadata, return it.
if let Some(metadata) = manifest.find_metadata() {
if let Some(metadata) = Self::read_metadata(&cache_entry.with_file(METADATA)).await? {
return Ok(metadata.clone());
}
@@ -394,12 +380,10 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
.boxed()
.await?
{
if let Ok(cached) = fs::read(cache_entry.path()).await {
let mut cached = rmp_serde::from_slice::<DataWithCachePolicy<Manifest>>(&cached)?;
cached.data.set_metadata(metadata.clone());
write_atomic(cache_entry.path(), rmp_serde::to_vec(&cached)?).await?;
};
// Store the metadata.
let cache_entry = cache_entry.with_file(METADATA);
fs::create_dir_all(cache_entry.dir()).await?;
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
return Ok(metadata);
}
@@ -412,7 +396,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
.map(|reporter| reporter.on_build_start(source_dist));
// Build the source distribution.
let (disk_filename, wheel_filename, metadata) = self
let (_disk_filename, _wheel_filename, metadata) = self
.build_source_dist(
source_dist,
source_dist_entry.path(),
@@ -421,24 +405,9 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
)
.await?;
// Not elegant that we have to read again here, but also not too relevant given that we
// have to build a source dist next.
// Just return if the response wasn't cacheable or there were other errors that
// `CachedClient` already complained about
if let Ok(cached) = fs::read(cache_entry.path()).await {
// If the file exists and it was just read or written by `CachedClient`, we assume it
// must be correct.
let mut cached = rmp_serde::from_slice::<DataWithCachePolicy<Manifest>>(&cached)?;
cached.data.insert_wheel(
wheel_filename.clone(),
DiskFilenameAndMetadata {
disk_filename: disk_filename.clone(),
metadata: metadata.clone(),
},
);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&cached)?).await?;
};
// Store the metadata.
let cache_entry = cache_entry.with_file(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
if let Some(task) = task {
if let Some(reporter) = self.reporter.as_ref() {
@@ -467,15 +436,11 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
return Err(SourceDistError::DirWithoutEntrypoint);
};
// Read the existing metadata from the cache.
let mut manifest = Self::read_cached_manifest(&cache_entry, modified)
.await?
.unwrap_or_default();
// Read the existing metadata from the cache, to clear stale wheels.
Self::refresh_cached_manifest(&cache_entry, modified).await?;
// If the cache contains a compatible wheel, return it.
if let Some(built_wheel) =
BuiltWheelMetadata::find_in_cache(self.tags, &manifest, &cache_entry)
{
if let Some(built_wheel) = BuiltWheelMetadata::find_in_cache(self.tags, &cache_entry) {
return Ok(built_wheel);
}
@@ -489,27 +454,16 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
.build_source_dist(source_dist, &path_source_dist.path, None, &cache_entry)
.await?;
// Store the metadata for this build along with all the other builds.
manifest.insert_wheel(
filename.clone(),
DiskFilenameAndMetadata {
disk_filename: disk_filename.clone(),
metadata: metadata.clone(),
},
);
let cached = CachedByTimestamp {
timestamp: modified,
data: manifest,
};
let data = rmp_serde::to_vec(&cached)?;
write_atomic(cache_entry.path(), data).await?;
if let Some(task) = task {
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_build_complete(source_dist, task);
}
}
// Store the metadata.
let cache_entry = cache_entry.with_file(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
let path = cache_entry.dir().join(&disk_filename);
let target = cache_entry.dir().join(filename.stem());
@@ -541,13 +495,11 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
return Err(SourceDistError::DirWithoutEntrypoint);
};
// Read the existing metadata from the cache.
let mut manifest = Self::read_cached_manifest(&cache_entry, modified)
.await?
.unwrap_or_default();
// Read the existing metadata from the cache, to clear stale entries.
Self::refresh_cached_manifest(&cache_entry, modified).await?;
// If the cache contains compatible metadata, return it.
if let Some(metadata) = manifest.find_metadata() {
if let Some(metadata) = Self::read_metadata(&cache_entry.with_file(METADATA)).await? {
return Ok(metadata.clone());
}
@@ -557,16 +509,9 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
.boxed()
.await?
{
// Store the metadata for this build along with all the other builds.
manifest.set_metadata(metadata.clone());
let cached = CachedByTimestamp {
timestamp: modified,
data: manifest,
};
let data = rmp_serde::to_vec(&cached)?;
fs::create_dir_all(&cache_entry.dir()).await?;
write_atomic(cache_entry.path(), data).await?;
// Store the metadata.
let cache_entry = cache_entry.with_file(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
return Ok(metadata);
}
@@ -577,31 +522,20 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
.as_ref()
.map(|reporter| reporter.on_build_start(source_dist));
let (disk_filename, filename, metadata) = self
let (_disk_filename, _filename, metadata) = self
.build_source_dist(source_dist, &path_source_dist.path, None, &cache_entry)
.await?;
// Store the metadata for this build along with all the other builds.
manifest.insert_wheel(
filename.clone(),
DiskFilenameAndMetadata {
disk_filename: disk_filename.clone(),
metadata: metadata.clone(),
},
);
let cached = CachedByTimestamp {
timestamp: modified,
data: manifest,
};
let data = rmp_serde::to_vec(&cached)?;
write_atomic(cache_entry.path(), data).await?;
if let Some(task) = task {
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_build_complete(source_dist, task);
}
}
// Store the metadata.
let cache_entry = cache_entry.with_file(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
Ok(metadata)
}
@@ -613,7 +547,6 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
) -> Result<BuiltWheelMetadata, SourceDistError> {
let (fetch, subdirectory) = self.download_source_dist_git(&git_source_dist.url).await?;
// TODO(konstin): Do we want to delete old built wheels when the git sha changed?
let git_sha = fetch.git().precise().expect("Exact commit after checkout");
let cache_entry = self.build_context.cache().entry(
CacheBucket::BuiltWheels,
@@ -622,13 +555,11 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
MANIFEST,
);
// Read the existing metadata from the cache.
let mut manifest = Self::read_manifest(&cache_entry).await?.unwrap_or_default();
// Read the existing metadata from the cache, to clear stale entries.
Self::refresh_manifest(&cache_entry).await?;
// If the cache contains a compatible wheel, return it.
if let Some(built_wheel) =
BuiltWheelMetadata::find_in_cache(self.tags, &manifest, &cache_entry)
{
if let Some(built_wheel) = BuiltWheelMetadata::find_in_cache(self.tags, &cache_entry) {
return Ok(built_wheel);
}
@@ -646,23 +577,16 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
)
.await?;
// Store the metadata for this build along with all the other builds.
manifest.insert_wheel(
filename.clone(),
DiskFilenameAndMetadata {
disk_filename: disk_filename.clone(),
metadata: metadata.clone(),
},
);
let data = rmp_serde::to_vec(&manifest)?;
write_atomic(cache_entry.path(), data).await?;
if let Some(task) = task {
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_build_complete(source_dist, task);
}
}
// Store the metadata.
let cache_entry = cache_entry.with_file(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
let path = cache_entry.dir().join(&disk_filename);
let target = cache_entry.dir().join(filename.stem());
@@ -692,11 +616,11 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
MANIFEST,
);
// Read the existing metadata from the cache.
let mut manifest = Self::read_manifest(&cache_entry).await?.unwrap_or_default();
// Read the existing metadata from the cache, to clear stale entries.
Self::refresh_manifest(&cache_entry).await?;
// If the cache contains compatible metadata, return it.
if let Some(metadata) = manifest.find_metadata() {
if let Some(metadata) = Self::read_metadata(&cache_entry.with_file(METADATA)).await? {
return Ok(metadata.clone());
}
@@ -706,12 +630,9 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
.boxed()
.await?
{
// Store the metadata for this build along with all the other builds.
manifest.set_metadata(metadata.clone());
let data = rmp_serde::to_vec(&manifest)?;
fs::create_dir_all(&cache_entry.dir()).await?;
write_atomic(cache_entry.path(), data).await?;
// Store the metadata.
let cache_entry = cache_entry.with_file(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
return Ok(metadata);
}
@@ -722,7 +643,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
.as_ref()
.map(|reporter| reporter.on_build_start(source_dist));
let (disk_filename, filename, metadata) = self
let (_disk_filename, _filename, metadata) = self
.build_source_dist(
source_dist,
fetch.path(),
@@ -731,23 +652,16 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
)
.await?;
// Store the metadata for this build along with all the other builds.
manifest.insert_wheel(
filename.clone(),
DiskFilenameAndMetadata {
disk_filename: disk_filename.clone(),
metadata: metadata.clone(),
},
);
let data = rmp_serde::to_vec(&manifest)?;
write_atomic(cache_entry.path(), data).await?;
if let Some(task) = task {
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_build_complete(source_dist, task);
}
}
// Store the metadata.
let cache_entry = cache_entry.with_file(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
Ok(metadata)
}
@@ -975,7 +889,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
.await
.map_err(|err| SourceDistError::BuildEditable(editable.to_string(), err))?;
let filename = WheelFilename::from_str(&disk_filename)?;
// We finally have the name of the package and can construct the dist
// We finally have the name of the package and can construct the dist.
let dist = Dist::Source(SourceDist::Path(PathSourceDist {
name: filename.name.clone(),
url: editable.url().clone(),
@@ -989,35 +903,67 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
}
/// Read an existing cached [`Manifest`], if it exists and is up-to-date.
async fn read_cached_manifest(
///
/// If the cache entry is stale, it will be removed and recreated.
async fn refresh_cached_manifest(
cache_entry: &CacheEntry,
modified: std::time::SystemTime,
) -> Result<Option<Manifest>, SourceDistError> {
) -> Result<Manifest, SourceDistError> {
// If the cache entry is up-to-date, return it; if it's stale, remove it.
match fs::read(&cache_entry.path()).await {
Ok(cached) => {
let cached = rmp_serde::from_slice::<CachedByTimestamp<Manifest>>(&cached)?;
if cached.timestamp == modified {
Ok(Some(cached.data))
} else {
debug!(
"Removing stale built wheels for: {}",
cache_entry.path().display()
);
if let Err(err) = fs::remove_dir_all(&cache_entry.dir()).await {
warn!("Failed to remove stale built wheel cache directory: {err}");
}
Ok(None)
return Ok(cached.data);
}
debug!(
"Removing stale built wheels for: {}",
cache_entry.path().display()
);
if let Err(err) = fs::remove_dir_all(&cache_entry.dir()).await {
warn!("Failed to remove stale built wheel cache directory: {err}");
}
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
Err(err) => return Err(err.into()),
}
// Write a new cache entry.
fs::create_dir_all(&cache_entry.dir()).await?;
write_atomic(
cache_entry.path(),
rmp_serde::to_vec(&CachedByTimestamp {
timestamp: modified,
data: Manifest,
})?,
)
.await?;
Ok(Manifest)
}
/// Read an existing cached [`Manifest`], if it exists.
///
/// If the cache entry does not exist, it will be created.
async fn refresh_manifest(cache_entry: &CacheEntry) -> Result<Manifest, SourceDistError> {
match fs::read(&cache_entry.path()).await {
Ok(cached) => Ok(rmp_serde::from_slice::<Manifest>(&cached)?),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
fs::create_dir_all(&cache_entry.dir()).await?;
write_atomic(cache_entry.path(), rmp_serde::to_vec(&Manifest)?).await?;
Ok(Manifest)
}
Err(err) => Err(err.into()),
}
}
/// Read an existing cached [`Manifest`], if it exists.
async fn read_manifest(cache_entry: &CacheEntry) -> Result<Option<Manifest>, SourceDistError> {
/// Read an existing cached [`Metadata21`], if it exists.
async fn read_metadata(
cache_entry: &CacheEntry,
) -> Result<Option<Metadata21>, SourceDistError> {
match fs::read(&cache_entry.path()).await {
Ok(cached) => Ok(Some(rmp_serde::from_slice::<Manifest>(&cached)?)),
Ok(cached) => Ok(Some(rmp_serde::from_slice::<Metadata21>(&cached)?)),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(err.into()),
}