Make tags non-required for fetching wheel metadata (#2700)

## Summary

This looks like a big change but it really isn't. Rather, I just split
`get_or_build_wheel` into separate `get_wheel` and `build_wheel` methods
internally, which made `get_or_build_wheel_metadata` capable of _not_
relying on `Tags`, which in turn makes it easier for us to use the
`DistributionDatabase` in various places without having it coupled to an
interpreter or environment (something we already did for
`SourceDistributionBuilder`).
This commit is contained in:
Charlie Marsh 2024-03-27 20:06:25 -04:00 committed by GitHub
parent cf30932831
commit b6ab919945
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 230 additions and 236 deletions

View file

@ -11,7 +11,7 @@ use url::Url;
use distribution_filename::WheelFilename; use distribution_filename::WheelFilename;
use distribution_types::{ use distribution_types::{
BuildableSource, BuiltDist, Dist, FileLocation, IndexLocations, LocalEditable, Name, BuildableSource, BuiltDist, Dist, FileLocation, IndexLocations, LocalEditable, Name, SourceDist,
}; };
use platform_tags::Tags; use platform_tags::Tags;
use pypi_types::Metadata23; use pypi_types::Metadata23;
@ -40,24 +40,17 @@ pub struct DistributionDatabase<'a, Context: BuildContext + Send + Sync> {
cache: &'a Cache, cache: &'a Cache,
reporter: Option<Arc<dyn Reporter>>, reporter: Option<Arc<dyn Reporter>>,
locks: Arc<Locks>, locks: Arc<Locks>,
tags: &'a Tags,
client: &'a RegistryClient, client: &'a RegistryClient,
build_context: &'a Context, build_context: &'a Context,
builder: SourceDistCachedBuilder<'a, Context>, builder: SourceDistCachedBuilder<'a, Context>,
} }
impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> { impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> {
pub fn new( pub fn new(cache: &'a Cache, client: &'a RegistryClient, build_context: &'a Context) -> Self {
cache: &'a Cache,
tags: &'a Tags,
client: &'a RegistryClient,
build_context: &'a Context,
) -> Self {
Self { Self {
cache, cache,
reporter: None, reporter: None,
locks: Arc::new(Locks::default()), locks: Arc::new(Locks::default()),
tags,
client, client,
build_context, build_context,
builder: SourceDistCachedBuilder::new(build_context, client), builder: SourceDistCachedBuilder::new(build_context, client),
@ -94,222 +87,10 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
/// If `no_remote_wheel` is set, the wheel will be built from a source distribution /// If `no_remote_wheel` is set, the wheel will be built from a source distribution
/// even if compatible pre-built wheels are available. /// even if compatible pre-built wheels are available.
#[instrument(skip(self))] #[instrument(skip(self))]
pub async fn get_or_build_wheel(&self, dist: Dist) -> Result<LocalWheel, Error> { pub async fn get_or_build_wheel(&self, dist: &Dist, tags: &Tags) -> Result<LocalWheel, Error> {
let no_binary = match self.build_context.no_binary() { match dist {
NoBinary::None => false, Dist::Built(built) => self.get_wheel(built).await,
NoBinary::All => true, Dist::Source(source) => self.build_wheel(source, tags).await,
NoBinary::Packages(packages) => packages.contains(dist.name()),
};
match &dist {
Dist::Built(BuiltDist::Registry(wheel)) => {
if no_binary {
return Err(Error::NoBinary);
}
let url = match &wheel.file.url {
FileLocation::RelativeUrl(base, url) => {
pypi_types::base_url_join_relative(base, url)?
}
FileLocation::AbsoluteUrl(url) => {
Url::parse(url).map_err(|err| Error::Url(url.clone(), err))?
}
FileLocation::Path(path) => {
let url = Url::from_file_path(path).expect("path is absolute");
let cache_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Url(&url).wheel_dir(wheel.name().as_ref()),
wheel.filename.stem(),
);
// If the file is already unzipped, and the unzipped directory is fresh,
// return it.
match cache_entry.path().canonicalize() {
Ok(archive) => {
if ArchiveTimestamp::up_to_date_with(
path,
ArchiveTarget::Cache(&archive),
)
.map_err(Error::CacheRead)?
{
return Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
archive,
filename: wheel.filename.clone(),
}));
}
}
Err(err) if err.kind() == io::ErrorKind::NotFound => {}
Err(err) => return Err(Error::CacheRead(err)),
}
// Otherwise, unzip the file.
return Ok(LocalWheel::Disk(DiskWheel {
dist: dist.clone(),
path: path.clone(),
target: cache_entry.into_path_buf(),
filename: wheel.filename.clone(),
}));
}
};
// Create a cache entry for the wheel.
let wheel_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Index(&wheel.index).wheel_dir(wheel.name().as_ref()),
wheel.filename.stem(),
);
// Download and unzip.
match self
.stream_wheel(url.clone(), &wheel.filename, &wheel_entry, &dist)
.await
{
Ok(archive) => Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
archive,
filename: wheel.filename.clone(),
})),
Err(Error::Extract(err)) if err.is_http_streaming_unsupported() => {
warn!(
"Streaming unsupported for {dist}; downloading wheel to disk ({err})"
);
// If the request failed because streaming is unsupported, download the
// wheel directly.
let archive = self
.download_wheel(url, &wheel.filename, &wheel_entry, &dist)
.await?;
Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
archive,
filename: wheel.filename.clone(),
}))
}
Err(err) => Err(err),
}
}
Dist::Built(BuiltDist::DirectUrl(wheel)) => {
if no_binary {
return Err(Error::NoBinary);
}
// Create a cache entry for the wheel.
let wheel_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
wheel.filename.stem(),
);
// Download and unzip.
match self
.stream_wheel(
wheel.url.raw().clone(),
&wheel.filename,
&wheel_entry,
&dist,
)
.await
{
Ok(archive) => Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
archive,
filename: wheel.filename.clone(),
})),
Err(Error::Client(err)) if err.is_http_streaming_unsupported() => {
warn!(
"Streaming unsupported for {dist}; downloading wheel to disk ({err})"
);
// If the request failed because streaming is unsupported, download the
// wheel directly.
let archive = self
.download_wheel(
wheel.url.raw().clone(),
&wheel.filename,
&wheel_entry,
&dist,
)
.await?;
Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
archive,
filename: wheel.filename.clone(),
}))
}
Err(err) => Err(err),
}
}
Dist::Built(BuiltDist::Path(wheel)) => {
if no_binary {
return Err(Error::NoBinary);
}
let cache_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
wheel.filename.stem(),
);
// If the file is already unzipped, and the unzipped directory is fresh,
// return it.
match cache_entry.path().canonicalize() {
Ok(archive) => {
if ArchiveTimestamp::up_to_date_with(
&wheel.path,
ArchiveTarget::Cache(&archive),
)
.map_err(Error::CacheRead)?
{
return Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
archive,
filename: wheel.filename.clone(),
}));
}
}
Err(err) if err.kind() == io::ErrorKind::NotFound => {}
Err(err) => return Err(Error::CacheRead(err)),
}
Ok(LocalWheel::Disk(DiskWheel {
dist: dist.clone(),
path: wheel.path.clone(),
target: cache_entry.into_path_buf(),
filename: wheel.filename.clone(),
}))
}
Dist::Source(source_dist) => {
let lock = self.locks.acquire(&dist).await;
let _guard = lock.lock().await;
let built_wheel = self
.builder
.download_and_build(&BuildableSource::Dist(source_dist), self.tags)
.boxed()
.await?;
// If the wheel was unzipped previously, respect it. Source distributions are
// cached under a unique build ID, so unzipped directories are never stale.
match built_wheel.target.canonicalize() {
Ok(archive) => Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
archive,
filename: built_wheel.filename,
})),
Err(err) if err.kind() == io::ErrorKind::NotFound => {
Ok(LocalWheel::Built(BuiltWheel {
dist: dist.clone(),
path: built_wheel.path,
target: built_wheel.target,
filename: built_wheel.filename,
}))
}
Err(err) => return Err(Error::CacheRead(err)),
}
}
} }
} }
@ -329,11 +110,11 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
match self.client.wheel_metadata(built_dist).boxed().await { match self.client.wheel_metadata(built_dist).boxed().await {
Ok(metadata) => Ok((metadata, None)), Ok(metadata) => Ok((metadata, None)),
Err(err) if err.is_http_streaming_unsupported() => { Err(err) if err.is_http_streaming_unsupported() => {
warn!("Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})"); warn!("Streaming unsupported when fetching metadata for {built_dist}; downloading wheel directly ({err})");
// If the request failed due to an error that could be resolved by // If the request failed due to an error that could be resolved by
// downloading the wheel directly, try that. // downloading the wheel directly, try that.
let wheel = self.get_or_build_wheel(dist.clone()).await?; let wheel = self.get_wheel(built_dist).await?;
Ok((wheel.metadata()?, None)) Ok((wheel.metadata()?, None))
} }
Err(err) => Err(err.into()), Err(err) => Err(err.into()),
@ -345,6 +126,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
NoBuild::None => false, NoBuild::None => false,
NoBuild::Packages(packages) => packages.contains(source_dist.name()), NoBuild::Packages(packages) => packages.contains(source_dist.name()),
}; };
// Optimization: Skip source dist download when we must not build them anyway. // Optimization: Skip source dist download when we must not build them anyway.
if no_build { if no_build {
return Err(Error::NoBuild); return Err(Error::NoBuild);
@ -396,13 +178,222 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
Ok((LocalWheel::Built(built_wheel), metadata)) Ok((LocalWheel::Built(built_wheel), metadata))
} }
/// Fetch a wheel from the cache or download it from the index.
async fn get_wheel(&self, dist: &BuiltDist) -> Result<LocalWheel, Error> {
let no_binary = match self.build_context.no_binary() {
NoBinary::None => false,
NoBinary::All => true,
NoBinary::Packages(packages) => packages.contains(dist.name()),
};
if no_binary {
return Err(Error::NoBinary);
}
match dist {
BuiltDist::Registry(wheel) => {
let url = match &wheel.file.url {
FileLocation::RelativeUrl(base, url) => {
pypi_types::base_url_join_relative(base, url)?
}
FileLocation::AbsoluteUrl(url) => {
Url::parse(url).map_err(|err| Error::Url(url.clone(), err))?
}
FileLocation::Path(path) => {
let url = Url::from_file_path(path).expect("path is absolute");
let cache_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Url(&url).wheel_dir(wheel.name().as_ref()),
wheel.filename.stem(),
);
// If the file is already unzipped, and the unzipped directory is fresh,
// return it.
match cache_entry.path().canonicalize() {
Ok(archive) => {
if ArchiveTimestamp::up_to_date_with(
path,
ArchiveTarget::Cache(&archive),
)
.map_err(Error::CacheRead)?
{
return Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: Dist::Built(dist.clone()),
archive,
filename: wheel.filename.clone(),
}));
}
}
Err(err) if err.kind() == io::ErrorKind::NotFound => {}
Err(err) => return Err(Error::CacheRead(err)),
}
// Otherwise, unzip the file.
return Ok(LocalWheel::Disk(DiskWheel {
dist: Dist::Built(dist.clone()),
path: path.clone(),
target: cache_entry.into_path_buf(),
filename: wheel.filename.clone(),
}));
}
};
// Create a cache entry for the wheel.
let wheel_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Index(&wheel.index).wheel_dir(wheel.name().as_ref()),
wheel.filename.stem(),
);
// Download and unzip.
match self
.stream_wheel(url.clone(), &wheel.filename, &wheel_entry, dist)
.await
{
Ok(archive) => Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: Dist::Built(dist.clone()),
archive,
filename: wheel.filename.clone(),
})),
Err(Error::Extract(err)) if err.is_http_streaming_unsupported() => {
warn!(
"Streaming unsupported for {dist}; downloading wheel to disk ({err})"
);
// If the request failed because streaming is unsupported, download the
// wheel directly.
let archive = self
.download_wheel(url, &wheel.filename, &wheel_entry, dist)
.await?;
Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: Dist::Built(dist.clone()),
archive,
filename: wheel.filename.clone(),
}))
}
Err(err) => Err(err),
}
}
BuiltDist::DirectUrl(wheel) => {
// Create a cache entry for the wheel.
let wheel_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
wheel.filename.stem(),
);
// Download and unzip.
match self
.stream_wheel(wheel.url.raw().clone(), &wheel.filename, &wheel_entry, dist)
.await
{
Ok(archive) => Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: Dist::Built(dist.clone()),
archive,
filename: wheel.filename.clone(),
})),
Err(Error::Client(err)) if err.is_http_streaming_unsupported() => {
warn!(
"Streaming unsupported for {dist}; downloading wheel to disk ({err})"
);
// If the request failed because streaming is unsupported, download the
// wheel directly.
let archive = self
.download_wheel(
wheel.url.raw().clone(),
&wheel.filename,
&wheel_entry,
dist,
)
.await?;
Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: Dist::Built(dist.clone()),
archive,
filename: wheel.filename.clone(),
}))
}
Err(err) => Err(err),
}
}
BuiltDist::Path(wheel) => {
let cache_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Url(&wheel.url).wheel_dir(wheel.name().as_ref()),
wheel.filename.stem(),
);
// If the file is already unzipped, and the unzipped directory is fresh,
// return it.
match cache_entry.path().canonicalize() {
Ok(archive) => {
if ArchiveTimestamp::up_to_date_with(
&wheel.path,
ArchiveTarget::Cache(&archive),
)
.map_err(Error::CacheRead)?
{
return Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: Dist::Built(dist.clone()),
archive,
filename: wheel.filename.clone(),
}));
}
}
Err(err) if err.kind() == io::ErrorKind::NotFound => {}
Err(err) => return Err(Error::CacheRead(err)),
}
Ok(LocalWheel::Disk(DiskWheel {
dist: Dist::Built(dist.clone()),
path: wheel.path.clone(),
target: cache_entry.into_path_buf(),
filename: wheel.filename.clone(),
}))
}
}
}
/// Convert a source distribution into a wheel, fetching it from the cache or building it if
/// necessary.
async fn build_wheel(&self, dist: &SourceDist, tags: &Tags) -> Result<LocalWheel, Error> {
let lock = self.locks.acquire(&Dist::Source(dist.clone())).await;
let _guard = lock.lock().await;
let built_wheel = self
.builder
.download_and_build(&BuildableSource::Dist(dist), tags)
.boxed()
.await?;
// If the wheel was unzipped previously, respect it. Source distributions are
// cached under a unique build ID, so unzipped directories are never stale.
match built_wheel.target.canonicalize() {
Ok(archive) => Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: Dist::Source(dist.clone()),
archive,
filename: built_wheel.filename,
})),
Err(err) if err.kind() == io::ErrorKind::NotFound => {
Ok(LocalWheel::Built(BuiltWheel {
dist: Dist::Source(dist.clone()),
path: built_wheel.path,
target: built_wheel.target,
filename: built_wheel.filename,
}))
}
Err(err) => Err(Error::CacheRead(err)),
}
}
/// Stream a wheel from a URL, unzipping it into the cache as it's downloaded. /// Stream a wheel from a URL, unzipping it into the cache as it's downloaded.
async fn stream_wheel( async fn stream_wheel(
&self, &self,
url: Url, url: Url,
filename: &WheelFilename, filename: &WheelFilename,
wheel_entry: &CacheEntry, wheel_entry: &CacheEntry,
dist: &Dist, dist: &BuiltDist,
) -> Result<PathBuf, Error> { ) -> Result<PathBuf, Error> {
// Create an entry for the HTTP cache. // Create an entry for the HTTP cache.
let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem())); let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem()));
@ -470,7 +461,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
url: Url, url: Url,
filename: &WheelFilename, filename: &WheelFilename,
wheel_entry: &CacheEntry, wheel_entry: &CacheEntry,
dist: &Dist, dist: &BuiltDist,
) -> Result<PathBuf, Error> { ) -> Result<PathBuf, Error> {
// Create an entry for the HTTP cache. // Create an entry for the HTTP cache.
let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem())); let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem()));

View file

@ -38,8 +38,9 @@ pub enum Error {
/// Download, build, and unzip a set of distributions. /// Download, build, and unzip a set of distributions.
pub struct Downloader<'a, Context: BuildContext + Send + Sync> { pub struct Downloader<'a, Context: BuildContext + Send + Sync> {
database: DistributionDatabase<'a, Context>, tags: &'a Tags,
cache: &'a Cache, cache: &'a Cache,
database: DistributionDatabase<'a, Context>,
reporter: Option<Arc<dyn Reporter>>, reporter: Option<Arc<dyn Reporter>>,
} }
@ -51,9 +52,10 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
build_context: &'a Context, build_context: &'a Context,
) -> Self { ) -> Self {
Self { Self {
database: DistributionDatabase::new(cache, tags, client, build_context), tags,
reporter: None,
cache, cache,
database: DistributionDatabase::new(cache, client, build_context),
reporter: None,
} }
} }
@ -62,9 +64,10 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
pub fn with_reporter(self, reporter: impl Reporter + 'static) -> Self { pub fn with_reporter(self, reporter: impl Reporter + 'static) -> Self {
let reporter: Arc<dyn Reporter> = Arc::new(reporter); let reporter: Arc<dyn Reporter> = Arc::new(reporter);
Self { Self {
reporter: Some(reporter.clone()), tags: self.tags,
database: self.database.with_reporter(Facade::from(reporter.clone())),
cache: self.cache, cache: self.cache,
database: self.database.with_reporter(Facade::from(reporter.clone())),
reporter: Some(reporter.clone()),
} }
} }
@ -165,7 +168,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
if in_flight.downloads.register(id.clone()) { if in_flight.downloads.register(id.clone()) {
let download: LocalWheel = self let download: LocalWheel = self
.database .database
.get_or_build_wheel(dist.clone()) .get_or_build_wheel(&dist, self.tags)
.boxed() .boxed()
.map_err(|err| Error::Fetch(dist.clone(), err)) .map_err(|err| Error::Fetch(dist.clone(), err))
.await?; .await?;

View file

@ -129,7 +129,7 @@ impl<'a, Context: BuildContext + Send + Sync> Resolver<'a, DefaultResolverProvid
) -> Result<Self, ResolveError> { ) -> Result<Self, ResolveError> {
let provider = DefaultResolverProvider::new( let provider = DefaultResolverProvider::new(
client, client,
DistributionDatabase::new(build_context.cache(), tags, client, build_context), DistributionDatabase::new(build_context.cache(), client, build_context),
flat_index, flat_index,
tags, tags,
PythonRequirement::new(interpreter, markers), PythonRequirement::new(interpreter, markers),