From 5a98add54ecc5a88e94d766ce5e12c6bf1bfcfb6 Mon Sep 17 00:00:00 2001 From: Charlie Marsh Date: Wed, 3 Jan 2024 06:37:45 -0400 Subject: [PATCH] Always pre-fetch distribution metadata (#744) This PR fixes our prefetching logic to ensure that we always attempt to prefetch the "best-guess" distribution for all dependencies. This logic already existed, but because we only attempted to prefetch when package metadata was available, it almost never triggered. Now, we wait for the package metadata to become available, _then_ kick off the "best-guess" prefetch (for every package). In my testing, this dramatically improves performance (like 2x). I'm wondering if this regressed at some point? Closes #743. Co-authored-by: konsti --- crates/puffin-resolver/src/resolver.rs | 90 ++++++++++++++++---------- 1 file changed, 57 insertions(+), 33 deletions(-) diff --git a/crates/puffin-resolver/src/resolver.rs b/crates/puffin-resolver/src/resolver.rs index d03b484b3..755b72406 100644 --- a/crates/puffin-resolver/src/resolver.rs +++ b/crates/puffin-resolver/src/resolver.rs @@ -324,7 +324,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { state.unit_propagation(next)?; // Pre-visit all candidate packages, to allow metadata to be fetched in parallel. - self.pre_visit(state.partial_solution.prioritized_packages(), request_sink)?; + Self::pre_visit(state.partial_solution.prioritized_packages(), request_sink)?; // Choose a package version. let Some(highest_priority_pkg) = @@ -461,9 +461,8 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { /// Visit the set of [`PubGrubPackage`] candidates prior to selection. This allows us to fetch /// metadata for all of the packages in parallel. - fn pre_visit( - &self, - packages: impl Iterator)>, + fn pre_visit<'data>( + packages: impl Iterator)>, request_sink: &futures::channel::mpsc::UnboundedSender, ) -> Result<(), ResolveError> { // Iterate over the potential packages, and fetch file metadata for any of them. These @@ -472,29 +471,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { let PubGrubPackage::Package(package_name, _extra, None) = package else { continue; }; - - // If we don't have metadata for this package, we can't make an early decision. - let Some(entry) = self.index.packages.get(package_name) else { - continue; - }; - let (index, base, version_map) = entry.value(); - - // Try to find a compatible version. If there aren't any compatible versions, - // short-circuit and return `None`. - let Some(candidate) = self.selector.select(package_name, range, version_map) else { - // Short-circuit: we couldn't find _any_ compatible versions for a package. - return Ok(()); - }; - - // Emit a request to fetch the metadata for this version. - if self - .index - .distributions - .register_owned(candidate.package_id()) - { - let distribution = candidate.into_distribution(index.clone(), base.clone()); - request_sink.unbounded_send(Request::Dist(distribution))?; - } + request_sink.unbounded_send(Request::Prefetch(package_name.clone(), range.clone()))?; } Ok(()) } @@ -704,19 +681,19 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { while let Some(response) = response_stream.next().await { match response? { - Response::Package(package_name, index, base, version_map) => { + Some(Response::Package(package_name, index, base, version_map)) => { trace!("Received package metadata for: {package_name}"); self.index .packages .done(package_name, (index, base, version_map)); } - Response::Dist(Dist::Built(distribution), metadata, ..) => { + Some(Response::Dist(Dist::Built(distribution), metadata, ..)) => { trace!("Received built distribution metadata for: {distribution}"); self.index .distributions .done(distribution.package_id(), metadata); } - Response::Dist(Dist::Source(distribution), metadata, precise) => { + Some(Response::Dist(Dist::Source(distribution), metadata, precise)) => { trace!("Received source distribution metadata for: {distribution}"); self.index .distributions @@ -736,13 +713,14 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { } } } + None => {} } } Ok::<(), ResolveError>(()) } - async fn process_request(&self, request: Request) -> Result { + async fn process_request(&self, request: Request) -> Result, ResolveError> { match request { // Fetch package metadata from the registry. Request::Package(package_name) => { @@ -751,9 +729,10 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { .get_version_map(&package_name) .await .map_err(ResolveError::Client)?; - Ok(Response::Package(package_name, index, base, metadata)) + Ok(Some(Response::Package(package_name, index, base, metadata))) } + // Fetch distribution metadata from the distribution database. Request::Dist(dist) => { let (metadata, precise) = self .provider @@ -771,7 +750,50 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { ResolveError::FetchAndBuild(Box::new(source_dist), err) } })?; - Ok(Response::Dist(dist, metadata, precise)) + Ok(Some(Response::Dist(dist, metadata, precise))) + } + + // Pre-fetch the package and distribution metadata. + Request::Prefetch(package_name, range) => { + // Wait for the package metadata to become available. + let entry = self.index.packages.wait(&package_name).await; + let (index, base, version_map) = entry.value(); + + // Try to find a compatible version. If there aren't any compatible versions, + // short-circuit and return `None`. + let Some(candidate) = self.selector.select(&package_name, &range, version_map) + else { + return Ok(None); + }; + + // Emit a request to fetch the metadata for this version. + if self.index.distributions.register(&candidate.package_id()) { + let dist = candidate.into_distribution(index.clone(), base.clone()); + drop(entry); + + let (metadata, precise) = self + .provider + .get_or_build_wheel_metadata(&dist) + .await + .map_err(|err| match dist.clone() { + Dist::Built(BuiltDist::Path(built_dist)) => { + ResolveError::Read(Box::new(built_dist), err) + } + Dist::Source(SourceDist::Path(source_dist)) => { + ResolveError::Build(Box::new(source_dist), err) + } + Dist::Built(built_dist) => { + ResolveError::Fetch(Box::new(built_dist), err) + } + Dist::Source(source_dist) => { + ResolveError::FetchAndBuild(Box::new(source_dist), err) + } + })?; + + Ok(Some(Response::Dist(dist, metadata, precise))) + } else { + Ok(None) + } } } } @@ -850,6 +872,8 @@ enum Request { Package(PackageName), /// A request to fetch the metadata for a built or source distribution. Dist(Dist), + /// A request to pre-fetch the metadata for a package and the best-guess distribution. + Prefetch(PackageName, Range), } #[derive(Debug)]