Store source distribution sources in the cache (#653)

## Summary

This PR modifies `source_dist.rs` to store source distributions (from
remote URLs) in the cache. The cache structure for registries now looks
like:


(I will update the docs prior to merging, if approved.)

The benefit here is that we can reuse the source distribution (avoiding a repeat download and unzip) if we need to build multiple wheels. In the future, this will become even more relevant, since we'll need to reuse the source distribution to support
https://github.com/astral-sh/puffin/issues/599.

I also included some misc. refactors to DRY up repeated operations and
add some more abstraction to `source_dist.rs`.
Authored by Charlie Marsh on 2023-12-15 12:19:33 -05:00 (committed via GitHub)
Commit 84093773ef, parent a361ccfbb3
3 changed files with 187 additions and 126 deletions


@@ -287,7 +287,7 @@ pub enum CacheBucket {
/// directories in the cache.
///
/// Cache structure:
/// * `built-wheels-v0/pypi/foo/foo-1.0.0.zip/{metadata.json, foo-1.0.0-py3-none-any.whl, ...other wheels}`
/// * `built-wheels-v0/pypi/foo/34a17436ed1e9669/{metadata.json, foo-1.0.0.zip, foo-1.0.0-py3-none-any.whl, ...other wheels}`
/// * `built-wheels-v0/<digest(index-url)>/foo/foo-1.0.0.zip/{metadata.json, foo-1.0.0-py3-none-any.whl, ...other wheels}`
/// * `built-wheels-v0/url/<digest(url)>/foo/foo-1.0.0.zip/{metadata.json, foo-1.0.0-py3-none-any.whl, ...other wheels}`
/// * `built-wheels-v0/git/<digest(url)>/<git sha>/foo/foo-1.0.0.zip/{metadata.json, foo-1.0.0-py3-none-any.whl, ...other wheels}`


@@ -158,7 +158,7 @@ impl CachedClient {
cached: Option<DataWithCachePolicy<T>>,
) -> Result<CachedResponse<T>, crate::Error> {
// The converted types are from the specific `reqwest` types to the more generic `http`
// types
// types.
let mut converted_req = http::Request::try_from(
req.try_clone()
.expect("You can't use streaming request bodies with this function"),


@@ -77,6 +77,8 @@ pub enum SourceDistError {
Zip(#[from] ZipError),
#[error("Source distribution directory contains neither readable pyproject.toml nor setup.py")]
DirWithoutEntrypoint,
#[error("Failed to extract source distribution: {0}")]
Extract(#[from] puffin_extract::Error),
/// Should not occur; only seen when another task panicked.
#[error("The task executor is broken, did some other task panic?")]
@@ -91,8 +93,6 @@ struct DiskFilenameAndMetadata {
metadata: Metadata21,
}
type Metadata21s = FxHashMap<WheelFilename, DiskFilenameAndMetadata>;
/// The information about the wheel we either just built or got from the cache
#[derive(Debug, Clone)]
pub struct BuiltWheelMetadata {
@@ -121,6 +121,37 @@ impl BuiltWheelMetadata {
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct Manifest(FxHashMap<WheelFilename, DiskFilenameAndMetadata>);
impl Manifest {
/// Initialize a [`Manifest`] from an iterator over entries.
fn from_iter(iter: impl IntoIterator<Item = (WheelFilename, DiskFilenameAndMetadata)>) -> Self {
Self(iter.into_iter().collect())
}
/// Find a compatible wheel in the cache.
fn find_compatible(&self, tags: &Tags) -> Option<(&WheelFilename, &DiskFilenameAndMetadata)> {
self.0
.iter()
.find(|(filename, _metadata)| filename.is_compatible(tags))
}
}
impl std::ops::Deref for Manifest {
type Target = FxHashMap<WheelFilename, DiskFilenameAndMetadata>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for Manifest {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// Fetch and build a source distribution from a remote source, or from a local cache.
pub struct SourceDistCachedBuilder<'a, T: BuildContext> {
build_context: &'a T,
@@ -159,7 +190,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
SourceDist::DirectUrl(direct_url_source_dist) => {
let filename = direct_url_source_dist
.filename()
.unwrap_or(direct_url_source_dist.url.path());
.expect("Distribution must have a filename");
let DirectArchiveUrl { url, subdirectory } =
DirectArchiveUrl::from(direct_url_source_dist.url.raw());
@@ -173,7 +204,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
source_dist,
filename,
&url,
cache_shard,
&cache_shard,
subdirectory.as_deref(),
)
.await?
@@ -183,20 +214,20 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
SourceDistError::UrlParse(registry_source_dist.file.url.clone(), err)
})?;
// For registry source distributions, shard by distribution, then by filename.
// Ex) `pypi/requests/requests-2.25.1.tar.gz`
// For registry source distributions, shard by package, then by SHA.
// Ex) `pypi/requests/a673187abc19fe6c`
let cache_shard = self.build_context.cache().shard(
CacheBucket::BuiltWheels,
WheelCache::Index(&registry_source_dist.index)
.remote_wheel_dir(registry_source_dist.name.as_ref())
.join(&registry_source_dist.file.filename),
.join(&registry_source_dist.file.hashes.sha256[..16]),
);
self.url(
source_dist,
&registry_source_dist.file.filename,
&url,
cache_shard,
&cache_shard,
None,
)
.await?
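For reference, that shard segment is derived directly from the index-reported hash. A minimal, hypothetical sketch of the mapping for the default PyPI index (`registry_shard` is illustrative, not part of the crate):

```rust
use std::path::{Path, PathBuf};

/// Hypothetical helper: the cache-relative shard for a registry source
/// distribution is `pypi/<package>/<first 16 hex chars of the file's SHA-256>`.
/// Assumes `sha256` is the full hex digest from the index (`file.hashes.sha256`).
fn registry_shard(package: &str, sha256: &str) -> PathBuf {
    Path::new("pypi").join(package).join(&sha256[..16])
}

// e.g. yields "pypi/requests/a673187abc19fe6c" for the digest in the comment above.
```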
@@ -215,36 +246,34 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
source_dist: &'data SourceDist,
filename: &'data str,
url: &'data Url,
cache_shard: CacheShard,
cache_shard: &CacheShard,
subdirectory: Option<&'data Path>,
) -> Result<BuiltWheelMetadata, SourceDistError> {
let cache_entry = cache_shard.entry(METADATA_JSON.to_string());
let response_callback = |response| async {
// New or changed source distribution, delete all built wheels
if cache_entry.dir.exists() {
debug!("Clearing built wheels and metadata for {source_dist}");
fs::remove_dir_all(&cache_entry.dir).await?;
// At this point, we're seeing a new or updated source distribution; delete all
// wheels, and rebuild.
match fs::remove_dir_all(&cache_entry.dir).await {
Ok(()) => debug!("Cleared built wheels and metadata for {source_dist}"),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => (),
Err(err) => return Err(err.into()),
}
debug!("Downloading and building source distribution: {source_dist}");
debug!("Downloading and building source distribution: {source_dist}");
let task = self
.reporter
.as_ref()
.map(|reporter| reporter.on_build_start(source_dist));
let span =
info_span!("download_source_dist", filename = filename, source_dist = %source_dist);
let (temp_dir, sdist_file) = self.download_source_dist_url(response, filename).await?;
drop(span);
// Download the source distribution.
let cache_dir = self
.persist_source_dist_url(response, source_dist, filename, cache_shard)
.await?;
// Build the source distribution.
let (disk_filename, wheel_filename, metadata) = self
.build_source_dist(
source_dist,
temp_dir,
&sdist_file,
subdirectory,
&cache_entry,
)
.build_source_dist(source_dist, &cache_dir, subdirectory, &cache_entry)
.await?;
if let Some(task) = task {
@@ -253,18 +282,16 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
}
}
let mut metadatas = Metadata21s::default();
metadatas.insert(
Ok(Manifest::from_iter([(
wheel_filename,
DiskFilenameAndMetadata {
disk_filename,
metadata,
},
);
Ok(metadatas)
)]))
};
let req = self.cached_client.uncached().get(url.clone()).build()?;
let metadatas = self
let manifest = self
.cached_client
.get_cached_with_callback(req, &cache_entry, response_callback)
.await
@@ -273,10 +300,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
CachedClientError::Client(err) => SourceDistError::Client(err),
})?;
if let Some((filename, cached_data)) = metadatas
.iter()
.find(|(filename, _metadata)| filename.is_compatible(self.tags))
{
if let Some((filename, cached_data)) = manifest.find_compatible(self.tags) {
return Ok(BuiltWheelMetadata::from_cached(
filename,
cached_data,
@@ -284,12 +308,14 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
));
}
// At this point, we're seeing cached metadata (fresh source dist) but the
// wheel(s) we built previously are incompatible
// At this point, we're seeing cached metadata (as in, we have an up-to-date source
// distribution), but the wheel(s) we built previously are incompatible.
let task = self
.reporter
.as_ref()
.map(|reporter| reporter.on_build_start(source_dist));
// Start by downloading the source distribution.
let response = self
.cached_client
.uncached()
@@ -297,19 +323,15 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
.send()
.await
.map_err(puffin_client::Error::RequestMiddlewareError)?;
let span =
info_span!("download_source_dist", filename = filename, source_dist = %source_dist);
let (temp_dir, sdist_file) = self.download_source_dist_url(response, filename).await?;
drop(span);
let (disk_filename, wheel_filename, metadata) = self
.build_source_dist(
source_dist,
temp_dir,
&sdist_file,
subdirectory,
&cache_entry,
)
let cache_dir = self
.persist_source_dist_url(response, source_dist, filename, cache_shard)
.await?;
// Build the source distribution.
let (disk_filename, wheel_filename, metadata) = self
.build_source_dist(source_dist, &cache_dir, subdirectory, &cache_entry)
.await?;
if let Some(task) = task {
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_build_complete(source_dist, task);
@@ -328,7 +350,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
if let Ok(cached) = fs::read(cache_entry.path()).await {
// If the file exists and it was just read or written by `CachedClient`, we assume it must
// be correct.
let mut cached = serde_json::from_slice::<DataWithCachePolicy<Metadata21s>>(&cached)?;
let mut cached = serde_json::from_slice::<DataWithCachePolicy<Manifest>>(&cached)?;
cached
.data
@@ -356,9 +378,10 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
METADATA_JSON.to_string(),
);
// Determine the last-modified time of the source distribution.
let file_metadata = fs_err::metadata(&path_source_dist.path)?;
// `modified()` is infallible on Windows and Unix (i.e., all platforms we support).
let modified = if file_metadata.is_file() {
// `modified()` is infallible on windows and unix (i.e., all platforms we support).
file_metadata.modified()?
} else {
if let Some(metadata) = path_source_dist
@@ -382,55 +405,28 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
}
};
let mut metadatas = if cache_entry.path().is_file() {
let cached = fs::read(&cache_entry.path()).await.ok().and_then(|cached| {
serde_json::from_slice::<CachedByTimestamp<Metadata21s>>(&cached).ok()
});
if let Some(cached) = cached {
if cached.timestamp == modified {
// Do we have previous compatible build of this source dist?
if let Some((filename, cached_data)) = cached
.data
.iter()
.find(|(filename, _metadata)| filename.is_compatible(self.tags))
{
return Ok(BuiltWheelMetadata::from_cached(
filename,
cached_data,
&cache_entry,
));
}
cached.data
} else {
debug!(
"Removing stale built wheels for: {}",
cache_entry.path().display()
);
if let Err(err) = fs::remove_dir_all(&cache_entry.dir).await {
warn!("Failed to remove stale built wheel cache directory: {err}");
}
Metadata21s::default()
}
} else {
Metadata21s::default()
}
} else {
Metadata21s::default()
};
// Read the existing metadata from the cache.
let mut manifest = Self::read_fresh_metadata(&cache_entry, modified)
.await?
.unwrap_or_default();
// If the cache contains a compatible wheel, return it.
if let Some((filename, cached_data)) = manifest.find_compatible(self.tags) {
return Ok(BuiltWheelMetadata::from_cached(
filename,
cached_data,
&cache_entry,
));
}
// Otherwise, we need to build a wheel.
let task = self
.reporter
.as_ref()
.map(|reporter| reporter.on_build_start(source_dist));
let (disk_filename, filename, metadata) = self
.build_source_dist(
source_dist,
None,
&path_source_dist.path,
None,
&cache_entry,
)
.build_source_dist(source_dist, &path_source_dist.path, None, &cache_entry)
.await?;
if metadata.name != path_source_dist.name {
@@ -441,7 +437,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
}
// Store the metadata for this build along with all the other builds.
metadatas.insert(
manifest.insert(
filename.clone(),
DiskFilenameAndMetadata {
disk_filename: disk_filename.clone(),
@@ -450,9 +446,8 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
);
let cached = CachedByTimestamp {
timestamp: modified,
data: metadatas,
data: manifest,
};
fs::create_dir_all(&cache_entry.dir).await?;
let data = serde_json::to_vec(&cached)?;
write_atomic(cache_entry.path(), data).await?;
@@ -490,24 +485,17 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
METADATA_JSON.to_string(),
);
let mut metadatas = if cache_entry.path().is_file() {
let cached = fs::read(&cache_entry.path()).await?;
let metadatas = serde_json::from_slice::<Metadata21s>(&cached)?;
// Do we have previous compatible build of this source dist?
if let Some((filename, cached_data)) = metadatas
.iter()
.find(|(filename, _metadata)| filename.is_compatible(self.tags))
{
return Ok(BuiltWheelMetadata::from_cached(
filename,
cached_data,
&cache_entry,
));
}
metadatas
} else {
Metadata21s::default()
};
// Read the existing metadata from the cache.
let mut manifest = Self::read_metadata(&cache_entry).await?.unwrap_or_default();
// If the cache contains a compatible wheel, return it.
if let Some((filename, cached_data)) = manifest.find_compatible(self.tags) {
return Ok(BuiltWheelMetadata::from_cached(
filename,
cached_data,
&cache_entry,
));
}
let task = self
.reporter
@@ -517,7 +505,6 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
let (disk_filename, filename, metadata) = self
.build_source_dist(
source_dist,
None,
fetch.path(),
subdirectory.as_deref(),
&cache_entry,
@@ -532,15 +519,14 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
}
// Store the metadata for this build along with all the other builds.
metadatas.insert(
manifest.insert(
filename.clone(),
DiskFilenameAndMetadata {
disk_filename: disk_filename.clone(),
metadata: metadata.clone(),
},
);
fs::create_dir_all(&cache_entry.dir).await?;
let data = serde_json::to_vec(&metadatas)?;
let data = serde_json::to_vec(&manifest)?;
write_atomic(cache_entry.path(), data).await?;
if let Some(task) = task {
@@ -560,12 +546,57 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
})
}
/// Download and unzip a source distribution into the cache from an HTTP response.
async fn persist_source_dist_url(
&self,
response: Response,
source_dist: &SourceDist,
filename: &str,
cache_shard: &CacheShard,
) -> Result<PathBuf, SourceDistError> {
let cache_entry = cache_shard.entry(filename);
let cache_path = cache_entry.path();
if cache_path.is_dir() {
debug!("Distribution is already cached: {source_dist}");
return Ok(cache_path);
}
// Download the source distribution to a temporary file.
let span =
info_span!("download_source_dist", filename = filename, source_dist = %source_dist);
let (temp_dir, source_dist_archive) =
self.download_source_dist_url(response, filename).await?;
drop(span);
// Unzip the source distribution to a temporary directory.
let span =
info_span!("extract_source_dist", filename = filename, source_dist = %source_dist);
let source_dist_dir = puffin_extract::extract_source(
&source_dist_archive,
temp_dir.path().join("extracted"),
)?;
drop(span);
// Persist the unzipped distribution to the cache.
fs::create_dir_all(&cache_entry.dir).await?;
if let Err(err) = fs_err::rename(&source_dist_dir, &cache_path) {
// If another thread already cached the distribution, we can ignore the error.
if cache_path.is_dir() {
warn!("Downloaded already-cached distribution: {source_dist}");
} else {
return Err(err.into());
};
}
Ok(cache_path)
}
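The persistence step above is a rename-into-place pattern: extract into a temporary directory, then move the result into the cache entry, treating a failed rename as benign if another task already populated the entry. A minimal synchronous sketch of that pattern using only `std` (the real code uses async `fs`/`fs_err` and creates the parent directory first):

```rust
use std::{fs, io, path::Path};

/// Sketch of the rename-into-place pattern used by `persist_source_dist_url`.
fn persist_extracted(extracted: &Path, cache_path: &Path) -> io::Result<()> {
    if let Err(err) = fs::rename(extracted, cache_path) {
        if cache_path.is_dir() {
            // Another task won the race and already cached this distribution.
            return Ok(());
        }
        return Err(err);
    }
    Ok(())
}
```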
/// Download a source distribution from a URL to a temporary file.
async fn download_source_dist_url(
&self,
response: Response,
source_dist_filename: &str,
) -> Result<(Option<TempDir>, PathBuf), puffin_client::Error> {
) -> Result<(TempDir, PathBuf), puffin_client::Error> {
let reader = response
.bytes_stream()
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
@@ -590,7 +621,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
.await
.map_err(puffin_client::Error::CacheWrite)?;
Ok((Some(temp_dir), sdist_file))
Ok((temp_dir, sdist_file))
}
/// Download a source distribution from a Git repository.
@@ -626,7 +657,6 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
async fn build_source_dist(
&self,
dist: &SourceDist,
temp_dir: Option<TempDir>,
source_dist: &Path,
subdirectory: Option<&Path>,
cache_entry: &CacheEntry,
@@ -648,10 +678,6 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
.await
.map_err(|err| SourceDistError::Build(Box::new(dist.clone()), err))?;
if let Some(temp_dir) = temp_dir {
temp_dir.close()?;
}
// Read the metadata from the wheel.
let filename = WheelFilename::from_str(&disk_filename)?;
let metadata = read_metadata(&filename, cache_entry.dir.join(&disk_filename))?;
@@ -659,6 +685,41 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
debug!("Finished building: {dist}");
Ok((disk_filename, filename, metadata))
}
/// Read an existing cache entry, if it exists and is up-to-date.
async fn read_fresh_metadata(
cache_entry: &CacheEntry,
modified: std::time::SystemTime,
) -> Result<Option<Manifest>, SourceDistError> {
match fs::read(&cache_entry.path()).await {
Ok(cached) => {
let cached = serde_json::from_slice::<CachedByTimestamp<Manifest>>(&cached)?;
if cached.timestamp == modified {
Ok(Some(cached.data))
} else {
debug!(
"Removing stale built wheels for: {}",
cache_entry.path().display()
);
if let Err(err) = fs::remove_dir_all(&cache_entry.dir).await {
warn!("Failed to remove stale built wheel cache directory: {err}");
}
Ok(None)
}
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(err.into()),
}
}
/// Read an existing cache entry, if it exists.
async fn read_metadata(cache_entry: &CacheEntry) -> Result<Option<Manifest>, SourceDistError> {
match fs::read(&cache_entry.path()).await {
Ok(cached) => Ok(Some(serde_json::from_slice::<Manifest>(&cached)?)),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(err.into()),
}
}
}
/// Read the [`Metadata21`] from a built wheel.