Add hash-checking support to install and sync (#2945)

## Summary

This PR adds support for hash-checking mode in `pip install` and `pip
sync`. It's a large change, both in the size of the diff and in the
extent of the behavioral changes, but it's also one that's hard to merge
in pieces (at least, with any test coverage), since it needs to work
end-to-end to be useful and testable.

Here are some of the most important highlights:

- We store hashes in the cache. Where we previously stored pointers to
unzipped wheels in the `archives` directory, every pointer to an
unzipped wheel now also carries that wheel's set of known hashes (see
the `Archive` sketch just after this list).
- By default, we don't compute any hashes. If the user runs with
`--require-hashes` and the cache doesn't contain those hashes, we
invalidate the cache, redownload the wheel, and compute the hashes as we
go. For users that don't run with `--require-hashes`, there will be no
change in performance. For users that _do_, the only change comes if
they don't run with `--generate-hashes`: they may see some repeated work
between resolution and installation when using `pip compile` followed by
`pip sync`.
- Many of the distribution types now include a `hashes` field, like
`CachedDist` and `LocalWheel`.
- Our behavior is similar to pip's, in that we enforce hashes when
pulling any remote distributions and when pulling from our own cache.
Like pip, though, we _don't_ enforce hashes if a distribution is
_already_ installed.
- Hash validity is enforced in a few different places:
1. During resolution, we enforce hash validity based on the hashes
reported by the registry. If we need to access a source distribution,
though, we then enforce hash validity at that point too, prior to
running any untrusted code. (This is enforced in the distribution
database.)
2. In the install plan, we _only_ add cached distributions that have
matching hashes. If a cached distribution is missing any hashes, or the
hashes don't match, we don't return it from the install plan.
3. In the downloader, we _only_ return distributions with matching
hashes.
4. The final set of "things we install" is the combination of (1) the
wheels from the cache and (2) the downloaded wheels, so this ensures
that we never install any mismatching distributions.
- Like pip, if `--require-hashes` is provided, we require that _all_
distributions are pinned with either `==` or a direct URL. We also
require that _all_ distributions have hashes (an example requirements
file follows this list).
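
To make the cache-pointer change from the first bullet concrete, here's a minimal sketch of what such a pointer could look like. The `Archive` name, its `path`/`hashes` fields, and `has_digests` all appear in the diff below; the `HashDigest` stand-in and the exact types are simplified assumptions, not uv's real definitions:

```rust
use std::path::PathBuf;

use serde::{Deserialize, Serialize};

/// Simplified stand-in for uv's `HashDigest`: an algorithm name plus a
/// hex-encoded digest.
#[derive(Clone, PartialEq, Serialize, Deserialize)]
pub struct HashDigest {
    pub algorithm: String, // e.g., "sha256"
    pub digest: String,
}

/// A cache pointer: the path to an unzipped wheel in the `archives`
/// directory, plus every hash known for the original archive.
#[derive(Clone, Serialize, Deserialize)]
pub struct Archive {
    pub path: PathBuf,
    pub hashes: Vec<HashDigest>,
}

impl Archive {
    /// Whether this pointer satisfies the required hashes. An empty
    /// requirement is trivially satisfied; a pointer that's missing a
    /// required digest forces a redownload with hashing enabled.
    pub fn has_digests(&self, required: &[HashDigest]) -> bool {
        required.iter().all(|digest| self.hashes.contains(digest))
    }
}
```

As the last hunk of the diff shows, the pointer is stored timestamped (`CachedByTimestamp<Archive>`) and serialized with `rmp_serde`.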
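
The pinning rules in the last bullet mirror pip's hash-checking mode, which expects a requirements file in pip's documented `--hash` format. A small example (the digests are placeholders, not real values):

```
# Generated with `uv pip compile --generate-hashes requirements.in`
idna==3.6 \
    --hash=sha256:<digest-of-the-wheel> \
    --hash=sha256:<digest-of-the-sdist>
```

Under `--require-hashes`, every requirement has to be pinned like this (or via a direct URL) and has to carry at least one hash; anything looser is rejected.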

There are a few notable TODOs:

- We don't support hash-checking mode for unnamed requirements. These
should be _somewhat_ rare, though, since `pip compile` never outputs
unnamed requirements. I can fix this; it's just some additional work.
- We don't automatically enable hash-checking mode when a hash exists in
the requirements file; the user has to pass `--require-hashes` explicitly.

Closes #474.

## Test Plan

I'd like to add some tests for registries that report incorrect hashes,
but otherwise: `cargo test`
Charlie Marsh, 2024-04-10 15:09:03 -04:00, committed by GitHub
commit 1f3b5bb093 (parent 715a309dd5)
56 changed files with 3186 additions and 333 deletions

@@ -11,16 +11,19 @@ use url::Url;
use distribution_filename::WheelFilename;
use distribution_types::{
BuildableSource, BuiltDist, Dist, FileLocation, IndexLocations, LocalEditable, Name, SourceDist,
BuildableSource, BuiltDist, Dist, FileLocation, Hashed, IndexLocations, LocalEditable, Name,
SourceDist,
};
use platform_tags::Tags;
use pypi_types::Metadata23;
use pypi_types::{HashDigest, Metadata23};
use uv_cache::{ArchiveTimestamp, CacheBucket, CacheEntry, CachedByTimestamp, WheelCache};
use uv_client::{CacheControl, CachedClientError, Connectivity, RegistryClient};
use uv_configuration::{NoBinary, NoBuild};
use uv_extract::hash::Hasher;
use uv_fs::write_atomic;
use uv_types::BuildContext;
use crate::archive::Archive;
use crate::locks::Locks;
use crate::{Error, LocalWheel, Reporter, SourceDistributionBuilder};
@@ -79,28 +82,38 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
/// Either fetch the wheel or fetch and build the source distribution
///
/// If `no_remote_wheel` is set, the wheel will be built from a source distribution
/// even if compatible pre-built wheels are available.
/// Returns a wheel that's compliant with the given platform tags.
///
/// While hashes will be generated in some cases, hash-checking is only enforced for source
/// distributions, and should be enforced by the caller for wheels.
#[instrument(skip_all, fields(%dist))]
pub async fn get_or_build_wheel(&self, dist: &Dist, tags: &Tags) -> Result<LocalWheel, Error> {
pub async fn get_or_build_wheel(
&self,
dist: &Dist,
tags: &Tags,
hashes: &[HashDigest],
) -> Result<LocalWheel, Error> {
match dist {
Dist::Built(built) => self.get_wheel(built).await,
Dist::Source(source) => self.build_wheel(source, tags).await,
Dist::Built(built) => self.get_wheel(built, hashes).await,
Dist::Source(source) => self.build_wheel(source, tags, hashes).await,
}
}
/// Either fetch the only wheel metadata (directly from the index or with range requests) or
/// fetch and build the source distribution.
///
/// Returns the [`Metadata23`], along with a "precise" URL for the source distribution, if
/// possible. For example, given a Git dependency with a reference to a branch or tag, return a
/// URL with a precise reference to the current commit of that branch or tag.
/// While hashes will be generated in some cases, hash-checking is only enforced for source
/// distributions, and should be enforced by the caller for wheels.
#[instrument(skip_all, fields(%dist))]
pub async fn get_or_build_wheel_metadata(&self, dist: &Dist) -> Result<Metadata23, Error> {
pub async fn get_or_build_wheel_metadata(
&self,
dist: &Dist,
hashes: &[HashDigest],
) -> Result<Metadata23, Error> {
match dist {
Dist::Built(built) => self.get_wheel_metadata(built).await,
Dist::Built(built) => self.get_wheel_metadata(built, hashes).await,
Dist::Source(source) => {
self.build_wheel_metadata(&BuildableSource::Dist(source))
self.build_wheel_metadata(&BuildableSource::Dist(source), hashes)
.await
}
}
@@ -118,7 +131,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
.build_editable(editable, editable_wheel_dir)
.await?;
// Unzip.
// Unzip into the editable wheel directory.
let path = editable_wheel_dir.join(&disk_filename);
let target = editable_wheel_dir.join(cache_key::digest(&editable.path));
let archive = self.unzip_wheel(&path, &target).await?;
@@ -126,13 +139,21 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
dist,
filename,
archive,
hashes: vec![],
};
Ok((wheel, metadata))
}
/// Fetch a wheel from the cache or download it from the index.
async fn get_wheel(&self, dist: &BuiltDist) -> Result<LocalWheel, Error> {
///
/// While hashes will be generated in some cases, hash-checking is _not_ enforced and should
/// instead be enforced by the caller.
async fn get_wheel(
&self,
dist: &BuiltDist,
hashes: &[HashDigest],
) -> Result<LocalWheel, Error> {
let no_binary = match self.build_context.no_binary() {
NoBinary::None => false,
NoBinary::All => true,
@@ -157,8 +178,9 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
WheelCache::Index(&wheel.index).wheel_dir(wheel.name().as_ref()),
wheel.filename.stem(),
);
return self
.load_wheel(path, &wheel.filename, cache_entry, dist)
.load_wheel(path, &wheel.filename, cache_entry, dist, hashes)
.await;
}
};
@@ -172,12 +194,13 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
// Download and unzip.
match self
.stream_wheel(url.clone(), &wheel.filename, &wheel_entry, dist)
.stream_wheel(url.clone(), &wheel.filename, &wheel_entry, dist, hashes)
.await
{
Ok(archive) => Ok(LocalWheel {
dist: Dist::Built(dist.clone()),
archive,
archive: archive.path,
hashes: archive.hashes,
filename: wheel.filename.clone(),
}),
Err(Error::Extract(err)) if err.is_http_streaming_unsupported() => {
@@ -188,11 +211,12 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
// If the request failed because streaming is unsupported, download the
// wheel directly.
let archive = self
.download_wheel(url, &wheel.filename, &wheel_entry, dist)
.download_wheel(url, &wheel.filename, &wheel_entry, dist, hashes)
.await?;
Ok(LocalWheel {
dist: Dist::Built(dist.clone()),
archive,
archive: archive.path,
hashes: archive.hashes,
filename: wheel.filename.clone(),
})
}
@@ -210,12 +234,19 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
// Download and unzip.
match self
.stream_wheel(wheel.url.raw().clone(), &wheel.filename, &wheel_entry, dist)
.stream_wheel(
wheel.url.raw().clone(),
&wheel.filename,
&wheel_entry,
dist,
hashes,
)
.await
{
Ok(archive) => Ok(LocalWheel {
dist: Dist::Built(dist.clone()),
archive,
archive: archive.path,
hashes: archive.hashes,
filename: wheel.filename.clone(),
}),
Err(Error::Client(err)) if err.is_http_streaming_unsupported() => {
@@ -231,11 +262,13 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
&wheel.filename,
&wheel_entry,
dist,
hashes,
)
.await?;
Ok(LocalWheel {
dist: Dist::Built(dist.clone()),
archive,
archive: archive.path,
hashes: archive.hashes,
filename: wheel.filename.clone(),
})
}
@@ -249,7 +282,8 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
wheel.filename.stem(),
);
self.load_wheel(&wheel.path, &wheel.filename, cache_entry, dist)
self.load_wheel(&wheel.path, &wheel.filename, cache_entry, dist, hashes)
.await
}
}
@@ -257,24 +291,33 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
/// Convert a source distribution into a wheel, fetching it from the cache or building it if
/// necessary.
async fn build_wheel(&self, dist: &SourceDist, tags: &Tags) -> Result<LocalWheel, Error> {
///
/// The returned wheel is guaranteed to come from a distribution with a matching hash, and
/// no build processes will be executed for distributions with mismatched hashes.
async fn build_wheel(
&self,
dist: &SourceDist,
tags: &Tags,
hashes: &[HashDigest],
) -> Result<LocalWheel, Error> {
let lock = self.locks.acquire(&Dist::Source(dist.clone())).await;
let _guard = lock.lock().await;
let built_wheel = self
.builder
.download_and_build(&BuildableSource::Dist(dist), tags)
.download_and_build(&BuildableSource::Dist(dist), tags, hashes)
.boxed()
.await?;
// If the wheel was unzipped previously, respect it. Source distributions are
// cached under a unique build ID, so unzipped directories are never stale.
// cached under a unique revision ID, so unzipped directories are never stale.
match built_wheel.target.canonicalize() {
Ok(archive) => {
return Ok(LocalWheel {
dist: Dist::Source(dist.clone()),
archive,
filename: built_wheel.filename,
hashes: built_wheel.hashes,
});
}
Err(err) if err.kind() == io::ErrorKind::NotFound => {}
@@ -287,12 +330,20 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
archive: self
.unzip_wheel(&built_wheel.path, &built_wheel.target)
.await?,
hashes: built_wheel.hashes,
filename: built_wheel.filename,
})
}
/// Fetch the wheel metadata from the index, or from the cache if possible.
pub async fn get_wheel_metadata(&self, dist: &BuiltDist) -> Result<Metadata23, Error> {
///
/// While hashes will be generated in some cases, hash-checking is _not_ enforced and should
/// instead be enforced by the caller.
pub async fn get_wheel_metadata(
&self,
dist: &BuiltDist,
hashes: &[HashDigest],
) -> Result<Metadata23, Error> {
match self.client.wheel_metadata(dist).boxed().await {
Ok(metadata) => Ok(metadata),
Err(err) if err.is_http_streaming_unsupported() => {
@@ -300,7 +351,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
// If the request failed due to an error that could be resolved by
// downloading the wheel directly, try that.
let wheel = self.get_wheel(dist).await?;
let wheel = self.get_wheel(dist, hashes).await?;
Ok(wheel.metadata()?)
}
Err(err) => Err(err.into()),
@@ -308,9 +359,13 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
}
/// Build the wheel metadata for a source distribution, or fetch it from the cache if possible.
///
/// The returned metadata is guaranteed to come from a distribution with a matching hash, and
/// no build processes will be executed for distributions with mismatched hashes.
pub async fn build_wheel_metadata(
&self,
source: &BuildableSource<'_>,
hashes: &[HashDigest],
) -> Result<Metadata23, Error> {
let no_build = match self.build_context.no_build() {
NoBuild::All => true,
@@ -330,7 +385,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
let metadata = self
.builder
.download_and_build_metadata(source)
.download_and_build_metadata(source, hashes)
.boxed()
.await?;
Ok(metadata)
@@ -343,7 +398,8 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
filename: &WheelFilename,
wheel_entry: &CacheEntry,
dist: &BuiltDist,
) -> Result<PathBuf, Error> {
hashes: &[HashDigest],
) -> Result<Archive, Error> {
// Create an entry for the HTTP cache.
let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem()));
@@ -354,23 +410,42 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
.map_err(|err| self.handle_response_errors(err))
.into_async_read();
// Create a hasher for each hash algorithm.
let algorithms = {
let mut hash = hashes.iter().map(HashDigest::algorithm).collect::<Vec<_>>();
hash.sort();
hash.dedup();
hash
};
let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
let mut hasher = uv_extract::hash::HashReader::new(reader.compat(), &mut hashers);
// Download and unzip the wheel to a temporary directory.
let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
.map_err(Error::CacheWrite)?;
uv_extract::stream::unzip(reader.compat(), temp_dir.path()).await?;
uv_extract::stream::unzip(&mut hasher, temp_dir.path()).await?;
// If necessary, exhaust the reader to compute the hash.
if !hashes.is_empty() {
hasher.finish().await.map_err(Error::HashExhaustion)?;
}
// Persist the temporary directory to the directory store.
let archive = self
let path = self
.build_context
.cache()
.persist(temp_dir.into_path(), wheel_entry.path())
.await
.map_err(Error::CacheRead)?;
Ok(archive)
Ok(Archive::new(
path,
hashers.into_iter().map(HashDigest::from).collect(),
))
}
.instrument(info_span!("wheel", wheel = %dist))
};
// Fetch the archive from the cache, or download it if necessary.
let req = self.request(url.clone())?;
let cache_control = match self.client.connectivity() {
Connectivity::Online => CacheControl::from(
@@ -391,6 +466,20 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
CachedClientError::Client(err) => Error::Client(err),
})?;
// If the archive is missing the required hashes, force a refresh.
let archive = if archive.has_digests(hashes) {
archive
} else {
self.client
.cached_client()
.skip_cache(self.request(url)?, &http_entry, download)
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
CachedClientError::Client(err) => Error::Client(err),
})?
};
Ok(archive)
}
@@ -401,7 +490,8 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
filename: &WheelFilename,
wheel_entry: &CacheEntry,
dist: &BuiltDist,
) -> Result<PathBuf, Error> {
hashes: &[HashDigest],
) -> Result<Archive, Error> {
// Create an entry for the HTTP cache.
let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem()));
@@ -427,16 +517,48 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
file.seek(io::SeekFrom::Start(0))
.await
.map_err(Error::CacheWrite)?;
uv_extract::seek::unzip(file, temp_dir.path()).await?;
// If no hashes are required, parallelize the unzip operation.
let hashes = if hashes.is_empty() {
let file = file.into_std().await;
tokio::task::spawn_blocking({
let target = temp_dir.path().to_owned();
move || -> Result<(), uv_extract::Error> {
// Unzip the wheel into a temporary directory.
uv_extract::unzip(file, &target)?;
Ok(())
}
})
.await??;
vec![]
} else {
// Create a hasher for each hash algorithm.
let algorithms = {
let mut hash = hashes.iter().map(HashDigest::algorithm).collect::<Vec<_>>();
hash.sort();
hash.dedup();
hash
};
let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);
uv_extract::stream::unzip(&mut hasher, temp_dir.path()).await?;
// If necessary, exhaust the reader to compute the hash.
hasher.finish().await.map_err(Error::HashExhaustion)?;
hashers.into_iter().map(HashDigest::from).collect()
};
// Persist the temporary directory to the directory store.
let archive = self
let path = self
.build_context
.cache()
.persist(temp_dir.into_path(), wheel_entry.path())
.await
.map_err(Error::CacheRead)?;
Ok(archive)
Ok(Archive::new(path, hashes))
}
.instrument(info_span!("wheel", wheel = %dist))
};
@@ -451,7 +573,6 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
),
Connectivity::Offline => CacheControl::AllowStale,
};
let archive = self
.client
.cached_client()
@@ -462,6 +583,20 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
CachedClientError::Client(err) => Error::Client(err),
})?;
// If the archive is missing the required hashes, force a refresh.
let archive = if archive.has_digests(hashes) {
archive
} else {
self.client
.cached_client()
.skip_cache(self.request(url)?, &http_entry, download)
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
CachedClientError::Client(err) => Error::Client(err),
})?
};
Ok(archive)
}
@@ -472,6 +607,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
filename: &WheelFilename,
wheel_entry: CacheEntry,
dist: &BuiltDist,
hashes: &[HashDigest],
) -> Result<LocalWheel, Error> {
// Determine the last-modified time of the wheel.
let modified = ArchiveTimestamp::from_file(path).map_err(Error::CacheRead)?;
@@ -481,20 +617,66 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
let archive = read_timestamped_archive(&archive_entry, modified)?;
// If the file is already unzipped, and the cache is up-to-date, return it.
if let Some(archive) = archive {
if let Some(archive) = archive.filter(|archive| archive.has_digests(hashes)) {
Ok(LocalWheel {
dist: Dist::Built(dist.clone()),
archive,
archive: archive.path,
hashes: archive.hashes,
filename: filename.clone(),
})
} else {
} else if hashes.is_empty() {
// Otherwise, unzip the wheel.
let archive = self.unzip_wheel(path, wheel_entry.path()).await?;
let archive = Archive::new(self.unzip_wheel(path, wheel_entry.path()).await?, vec![]);
write_timestamped_archive(&archive_entry, archive.clone(), modified).await?;
Ok(LocalWheel {
dist: Dist::Built(dist.clone()),
archive,
archive: archive.path,
hashes: archive.hashes,
filename: filename.clone(),
})
} else {
// If necessary, compute the hashes of the wheel.
let file = fs_err::tokio::File::open(path)
.await
.map_err(Error::CacheRead)?;
let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
.map_err(Error::CacheWrite)?;
// Create a hasher for each hash algorithm.
let algorithms = {
let mut hash = hashes.iter().map(HashDigest::algorithm).collect::<Vec<_>>();
hash.sort();
hash.dedup();
hash
};
let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);
// Unzip the wheel to a temporary directory.
uv_extract::stream::unzip(&mut hasher, temp_dir.path()).await?;
// Exhaust the reader to compute the hash.
hasher.finish().await.map_err(Error::HashExhaustion)?;
// Persist the temporary directory to the directory store.
let archive = self
.build_context
.cache()
.persist(temp_dir.into_path(), wheel_entry.path())
.await
.map_err(Error::CacheWrite)?;
let hashes = hashers.into_iter().map(HashDigest::from).collect();
// Write the archive pointer to the cache.
let archive = Archive::new(archive, hashes);
write_timestamped_archive(&archive_entry, archive.clone(), modified).await?;
Ok(LocalWheel {
dist: Dist::Built(dist.clone()),
archive: archive.path,
hashes: archive.hashes,
filename: filename.clone(),
})
}
@@ -549,7 +731,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
/// Write a timestamped archive path to the cache.
async fn write_timestamped_archive(
cache_entry: &CacheEntry,
data: PathBuf,
data: Archive,
modified: ArchiveTimestamp,
) -> Result<(), Error> {
write_atomic(
@@ -564,13 +746,13 @@ async fn write_timestamped_archive(
}
/// Read an existing timestamped archive path, if it exists and is up-to-date.
fn read_timestamped_archive(
pub fn read_timestamped_archive(
cache_entry: &CacheEntry,
modified: ArchiveTimestamp,
) -> Result<Option<PathBuf>, Error> {
) -> Result<Option<Archive>, Error> {
match fs_err::read(cache_entry.path()) {
Ok(cached) => {
let cached = rmp_serde::from_slice::<CachedByTimestamp<PathBuf>>(&cached)?;
let cached = rmp_serde::from_slice::<CachedByTimestamp<Archive>>(&cached)?;
if cached.timestamp == modified.timestamp() {
return Ok(Some(cached.data));
}
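
One pattern repeats three times in the hunks above (streamed downloads, direct downloads, and local wheels): deduplicate the requested hash algorithms, wrap the underlying reader in a hashing reader, unzip through it, then exhaust the reader so that any bytes the extractor skipped still contribute to the digest. Below is a condensed, self-contained sketch of that idea using the `sha2` crate and synchronous I/O; the `HashingReader` adapter and its `finalize` method are illustrative stand-ins for `uv_extract::hash::HashReader` and the `hasher.finish()` calls, not uv's actual API:

```rust
use std::io::{self, Read};

use sha2::{Digest, Sha256};

/// A reader adapter that hashes every byte it yields (one algorithm only;
/// the diff runs one `Hasher` per requested algorithm).
struct HashingReader<R: Read> {
    inner: R,
    hasher: Sha256,
}

impl<R: Read> Read for HashingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        self.hasher.update(&buf[..n]);
        Ok(n)
    }
}

impl<R: Read> HashingReader<R> {
    /// Drain whatever the consumer didn't read -- e.g., trailing zip data
    /// the extractor skipped -- so the digest covers the whole archive.
    fn finalize(mut self) -> io::Result<[u8; 32]> {
        io::copy(&mut self, &mut io::sink())?;
        Ok(self.hasher.finalize().into())
    }
}

fn main() -> io::Result<()> {
    let archive: &[u8] = b"pretend these bytes are a wheel";
    let mut reader = HashingReader {
        inner: archive,
        hasher: Sha256::new(),
    };

    // Stand-in for `uv_extract::stream::unzip`: consume only part of the stream.
    let mut prefix = [0u8; 8];
    reader.read_exact(&mut prefix)?;

    // Exhaust the rest and produce the digest.
    let digest = reader.finalize()?;
    let hex: String = digest.iter().map(|b| format!("{b:02x}")).collect();
    println!("sha256:{hex}");
    Ok(())
}
```

This "hash as you go" output is what feeds the `skip_cache` fallback in the diff: when a cached `Archive` is missing a required digest, the download callback reruns with hashing enabled and the refreshed pointer replaces the stale one.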