Always pre-fetch distribution metadata (#744)

This PR fixes our prefetching logic to ensure that we always attempt to
prefetch the "best-guess" distribution for all dependencies. This logic
already existed, but because we only attempted to prefetch when package
metadata was available, it almost never triggered. Now, we wait for the
package metadata to become available, _then_ kick off the "best-guess"
prefetch (for every package).

In my testing, this dramatically improves performance (like 2x). I'm
wondering if this regressed at some point?

Closes #743.

Co-authored-by: konsti <konstin@mailbox.org>
This commit is contained in:
Charlie Marsh 2024-01-03 06:37:45 -04:00 committed by GitHub
parent ba23115465
commit 5a98add54e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -324,7 +324,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
state.unit_propagation(next)?;
// Pre-visit all candidate packages, to allow metadata to be fetched in parallel.
self.pre_visit(state.partial_solution.prioritized_packages(), request_sink)?;
Self::pre_visit(state.partial_solution.prioritized_packages(), request_sink)?;
// Choose a package version.
let Some(highest_priority_pkg) =
@ -461,9 +461,8 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
/// Visit the set of [`PubGrubPackage`] candidates prior to selection. This allows us to fetch
/// metadata for all of the packages in parallel.
fn pre_visit(
&self,
packages: impl Iterator<Item = (&'a PubGrubPackage, &'a Range<PubGrubVersion>)>,
fn pre_visit<'data>(
packages: impl Iterator<Item = (&'data PubGrubPackage, &'data Range<PubGrubVersion>)>,
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
) -> Result<(), ResolveError> {
// Iterate over the potential packages, and fetch file metadata for any of them. These
@ -472,29 +471,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
let PubGrubPackage::Package(package_name, _extra, None) = package else {
continue;
};
// If we don't have metadata for this package, we can't make an early decision.
let Some(entry) = self.index.packages.get(package_name) else {
continue;
};
let (index, base, version_map) = entry.value();
// Try to find a compatible version. If there aren't any compatible versions,
// short-circuit and return `None`.
let Some(candidate) = self.selector.select(package_name, range, version_map) else {
// Short-circuit: we couldn't find _any_ compatible versions for a package.
return Ok(());
};
// Emit a request to fetch the metadata for this version.
if self
.index
.distributions
.register_owned(candidate.package_id())
{
let distribution = candidate.into_distribution(index.clone(), base.clone());
request_sink.unbounded_send(Request::Dist(distribution))?;
}
request_sink.unbounded_send(Request::Prefetch(package_name.clone(), range.clone()))?;
}
Ok(())
}
@ -704,19 +681,19 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
while let Some(response) = response_stream.next().await {
match response? {
Response::Package(package_name, index, base, version_map) => {
Some(Response::Package(package_name, index, base, version_map)) => {
trace!("Received package metadata for: {package_name}");
self.index
.packages
.done(package_name, (index, base, version_map));
}
Response::Dist(Dist::Built(distribution), metadata, ..) => {
Some(Response::Dist(Dist::Built(distribution), metadata, ..)) => {
trace!("Received built distribution metadata for: {distribution}");
self.index
.distributions
.done(distribution.package_id(), metadata);
}
Response::Dist(Dist::Source(distribution), metadata, precise) => {
Some(Response::Dist(Dist::Source(distribution), metadata, precise)) => {
trace!("Received source distribution metadata for: {distribution}");
self.index
.distributions
@ -736,13 +713,14 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
}
}
}
None => {}
}
}
Ok::<(), ResolveError>(())
}
async fn process_request(&self, request: Request) -> Result<Response, ResolveError> {
async fn process_request(&self, request: Request) -> Result<Option<Response>, ResolveError> {
match request {
// Fetch package metadata from the registry.
Request::Package(package_name) => {
@ -751,9 +729,10 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
.get_version_map(&package_name)
.await
.map_err(ResolveError::Client)?;
Ok(Response::Package(package_name, index, base, metadata))
Ok(Some(Response::Package(package_name, index, base, metadata)))
}
// Fetch distribution metadata from the distribution database.
Request::Dist(dist) => {
let (metadata, precise) = self
.provider
@ -771,7 +750,50 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
ResolveError::FetchAndBuild(Box::new(source_dist), err)
}
})?;
Ok(Response::Dist(dist, metadata, precise))
Ok(Some(Response::Dist(dist, metadata, precise)))
}
// Pre-fetch the package and distribution metadata.
Request::Prefetch(package_name, range) => {
// Wait for the package metadata to become available.
let entry = self.index.packages.wait(&package_name).await;
let (index, base, version_map) = entry.value();
// Try to find a compatible version. If there aren't any compatible versions,
// short-circuit and return `None`.
let Some(candidate) = self.selector.select(&package_name, &range, version_map)
else {
return Ok(None);
};
// Emit a request to fetch the metadata for this version.
if self.index.distributions.register(&candidate.package_id()) {
let dist = candidate.into_distribution(index.clone(), base.clone());
drop(entry);
let (metadata, precise) = self
.provider
.get_or_build_wheel_metadata(&dist)
.await
.map_err(|err| match dist.clone() {
Dist::Built(BuiltDist::Path(built_dist)) => {
ResolveError::Read(Box::new(built_dist), err)
}
Dist::Source(SourceDist::Path(source_dist)) => {
ResolveError::Build(Box::new(source_dist), err)
}
Dist::Built(built_dist) => {
ResolveError::Fetch(Box::new(built_dist), err)
}
Dist::Source(source_dist) => {
ResolveError::FetchAndBuild(Box::new(source_dist), err)
}
})?;
Ok(Some(Response::Dist(dist, metadata, precise)))
} else {
Ok(None)
}
}
}
}
@ -850,6 +872,8 @@ enum Request {
Package(PackageName),
/// A request to fetch the metadata for a built or source distribution.
Dist(Dist),
/// A request to pre-fetch the metadata for a package and the best-guess distribution.
Prefetch(PackageName, Range<PubGrubVersion>),
}
#[derive(Debug)]