Add --refresh behavior to the cache (#1057)

## Summary

This PR is an alternative approach to #949 which should be much safer.
As in #949, we add a `Refresh` policy to the cache. However, instead of
deleting entries from the cache the first time we read them, we now
check if the entry is sufficiently new (created after the start of the
command) if the refresh policy applies. If the entry is stale, then we
avoid reading it and continue onward, relying on the cache to
appropriately overwrite based on "new" data. (This relies on the
preceding PRs, which ensure the cache is append-only, and ensure that we
can atomically overwrite.)

Unfortunately, there are just a lot of paths through the cache, and
different data is handled with different policies, so I really had to go
through and consider the "right" behavior for each case. For example,
the HTTP requests can use `max-age=0, must-revalidate`. But for the
routes that are based on filesystem modification, we need to do
something slightly different.

Closes #945.
This commit is contained in:
Charlie Marsh 2024-01-23 18:30:26 -05:00 committed by GitHub
parent cf8b452414
commit 1b3a3f4e80
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 657 additions and 184 deletions

View file

@ -23,8 +23,10 @@ use distribution_types::{
use install_wheel_rs::read_dist_info;
use pep508_rs::VerbatimUrl;
use platform_tags::Tags;
use puffin_cache::{CacheBucket, CacheEntry, CacheShard, CachedByTimestamp, WheelCache};
use puffin_client::{CachedClient, CachedClientError, DataWithCachePolicy};
use puffin_cache::{
ArchiveTimestamp, CacheBucket, CacheEntry, CacheShard, CachedByTimestamp, Freshness, WheelCache,
};
use puffin_client::{CacheControl, CachedClient, CachedClientError, DataWithCachePolicy};
use puffin_fs::{write_atomic, LockedFile};
use puffin_git::{Fetch, GitSource};
use puffin_traits::{BuildContext, BuildKind, SourceBuildTrait};
@ -247,6 +249,11 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
subdirectory: Option<&'data Path>,
) -> Result<BuiltWheelMetadata, SourceDistError> {
let cache_entry = cache_shard.entry(MANIFEST);
let cache_control = CacheControl::from(
self.build_context
.cache()
.freshness(&cache_entry, Some(source_dist.name()))?,
);
let download = |response| {
async {
@ -267,14 +274,16 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
let req = self.cached_client.uncached().get(url.clone()).build()?;
let manifest = self
.cached_client
.get_cached_with_callback(req, &cache_entry, download)
.get_cached_with_callback(req, &cache_entry, cache_control, download)
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
CachedClientError::Client(err) => SourceDistError::Client(err),
})?;
// From here on, scope all operations to the current build.
// From here on, scope all operations to the current build. Within the manifest shard,
// there's no need to check for freshness, since entries have to be fresher than the
// manifest itself.
let cache_shard = cache_shard.shard(manifest.digest());
// If the cache contains a compatible wheel, return it.
@ -282,8 +291,6 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
return Ok(built_wheel);
}
// At this point, we're seeing cached metadata (as in, we have an up-to-date source
// distribution), but the wheel(s) we built previously are incompatible.
let task = self
.reporter
.as_ref()
@ -310,12 +317,9 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
let metadata_entry = cache_shard.entry(METADATA);
write_atomic(metadata_entry.path(), rmp_serde::to_vec(&metadata)?).await?;
let path = cache_shard.join(&disk_filename);
let target = cache_shard.join(wheel_filename.stem());
Ok(BuiltWheelMetadata {
path,
target,
path: cache_shard.join(&disk_filename),
target: cache_shard.join(wheel_filename.stem()),
filename: wheel_filename,
})
}
@ -334,6 +338,11 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
subdirectory: Option<&'data Path>,
) -> Result<Metadata21, SourceDistError> {
let cache_entry = cache_shard.entry(MANIFEST);
let cache_control = CacheControl::from(
self.build_context
.cache()
.freshness(&cache_entry, Some(source_dist.name()))?,
);
let download = |response| {
async {
@ -354,18 +363,22 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
let req = self.cached_client.uncached().get(url.clone()).build()?;
let manifest = self
.cached_client
.get_cached_with_callback(req, &cache_entry, download)
.get_cached_with_callback(req, &cache_entry, cache_control, download)
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
CachedClientError::Client(err) => SourceDistError::Client(err),
})?;
// From here on, scope all operations to the current build.
// From here on, scope all operations to the current build. Within the manifest shard,
// there's no need to check for freshness, since entries have to be fresher than the
// manifest itself.
let cache_shard = cache_shard.shard(manifest.digest());
// If the cache contains compatible metadata, return it.
if let Some(metadata) = read_cached_metadata(&cache_shard.entry(METADATA)).await? {
let metadata_entry = cache_shard.entry(METADATA);
if let Some(metadata) = read_cached_metadata(&metadata_entry).await? {
debug!("Using cached metadata for {source_dist}");
return Ok(metadata.clone());
}
@ -386,8 +399,6 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
return Ok(metadata);
}
// At this point, we're seeing cached metadata (as in, we have an up-to-date source
// distribution), but the wheel(s) we built previously are incompatible.
let task = self
.reporter
.as_ref()
@ -429,15 +440,22 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
);
// Determine the last-modified time of the source distribution.
let Some(modified) = puffin_cache::archive_mtime(&path_source_dist.path)? else {
let Some(modified) = ArchiveTimestamp::from_path(&path_source_dist.path)? else {
return Err(SourceDistError::DirWithoutEntrypoint);
};
// Read the existing metadata from the cache, to clear stale wheels.
// Read the existing metadata from the cache.
let manifest_entry = cache_shard.entry(MANIFEST);
let manifest = refresh_timestamp_manifest(&manifest_entry, modified).await?;
let manifest_freshness = self
.build_context
.cache()
.freshness(&manifest_entry, Some(source_dist.name()))?;
let manifest =
refresh_timestamp_manifest(&manifest_entry, manifest_freshness, modified).await?;
// From here on, scope all operations to the current build.
// From here on, scope all operations to the current build. Within the manifest shard,
// there's no need to check for freshness, since entries have to be fresher than the
// manifest itself.
let cache_shard = cache_shard.shard(manifest.digest());
// If the cache contains a compatible wheel, return it.
@ -488,20 +506,36 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
);
// Determine the last-modified time of the source distribution.
let Some(modified) = puffin_cache::archive_mtime(&path_source_dist.path)? else {
let Some(modified) = ArchiveTimestamp::from_path(&path_source_dist.path)? else {
return Err(SourceDistError::DirWithoutEntrypoint);
};
// Read the existing metadata from the cache, to clear stale entries.
let manifest_entry = cache_shard.entry(MANIFEST);
let manifest = refresh_timestamp_manifest(&manifest_entry, modified).await?;
let manifest_freshness = self
.build_context
.cache()
.freshness(&manifest_entry, Some(source_dist.name()))?;
let manifest =
refresh_timestamp_manifest(&manifest_entry, manifest_freshness, modified).await?;
// From here on, scope all operations to the current build.
// From here on, scope all operations to the current build. Within the manifest shard,
// there's no need to check for freshness, since entries have to be fresher than the
// manifest itself.
let cache_shard = cache_shard.shard(manifest.digest());
// If the cache contains compatible metadata, return it.
if let Some(metadata) = read_cached_metadata(&cache_shard.entry(METADATA)).await? {
return Ok(metadata.clone());
let metadata_entry = cache_shard.entry(METADATA);
if self
.build_context
.cache()
.freshness(&metadata_entry, Some(source_dist.name()))
.is_ok_and(Freshness::is_fresh)
{
if let Some(metadata) = read_cached_metadata(&metadata_entry).await? {
debug!("Using cached metadata for {source_dist}");
return Ok(metadata.clone());
}
}
// If the backend supports `prepare_metadata_for_build_wheel`, use it.
@ -611,8 +645,17 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
);
// If the cache contains compatible metadata, return it.
if let Some(metadata) = read_cached_metadata(&cache_shard.entry(METADATA)).await? {
return Ok(metadata.clone());
let metadata_entry = cache_shard.entry(METADATA);
if self
.build_context
.cache()
.freshness(&metadata_entry, Some(source_dist.name()))
.is_ok_and(Freshness::is_fresh)
{
if let Some(metadata) = read_cached_metadata(&metadata_entry).await? {
debug!("Using cached metadata for {source_dist}");
return Ok(metadata.clone());
}
}
// If the backend supports `prepare_metadata_for_build_wheel`, use it.
@ -902,13 +945,13 @@ pub(crate) fn read_http_manifest(
/// If the cache entry is stale, a new entry will be created.
pub(crate) fn read_timestamp_manifest(
cache_entry: &CacheEntry,
modified: SystemTime,
modified: ArchiveTimestamp,
) -> Result<Option<Manifest>, SourceDistError> {
// If the cache entry is up-to-date, return it.
match std::fs::read(cache_entry.path()) {
Ok(cached) => {
let cached = rmp_serde::from_slice::<CachedByTimestamp<SystemTime, Manifest>>(&cached)?;
if cached.timestamp == modified {
if cached.timestamp == modified.timestamp() {
return Ok(Some(cached.data));
}
}
@ -923,11 +966,14 @@ pub(crate) fn read_timestamp_manifest(
/// If the cache entry is stale, a new entry will be created.
pub(crate) async fn refresh_timestamp_manifest(
cache_entry: &CacheEntry,
modified: SystemTime,
freshness: Freshness,
modified: ArchiveTimestamp,
) -> Result<Manifest, SourceDistError> {
// If the cache entry is up-to-date, return it.
if let Some(manifest) = read_timestamp_manifest(cache_entry, modified)? {
return Ok(manifest);
// If we know the exact modification time, we don't need to force a revalidate.
if matches!(modified, ArchiveTimestamp::Exact(_)) || freshness.is_fresh() {
if let Some(manifest) = read_timestamp_manifest(cache_entry, modified)? {
return Ok(manifest);
}
}
// Otherwise, create a new manifest.
@ -936,7 +982,7 @@ pub(crate) async fn refresh_timestamp_manifest(
write_atomic(
cache_entry.path(),
rmp_serde::to_vec(&CachedByTimestamp {
timestamp: modified,
timestamp: modified.timestamp(),
data: manifest,
})?,
)