From bae52d5edd88005ec54a99e7c8845bae1f6579ed Mon Sep 17 00:00:00 2001 From: Charlie Marsh Date: Mon, 16 Oct 2023 13:26:46 -0400 Subject: [PATCH] Surface request stream errors in the resolver (#107) Closes https://github.com/astral-sh/puffin/issues/105. --- crates/puffin-resolver/src/error.rs | 6 + crates/puffin-resolver/src/pubgrub/version.rs | 5 - crates/puffin-resolver/src/resolver.rs | 110 +++++++++++------- 3 files changed, 72 insertions(+), 49 deletions(-) diff --git a/crates/puffin-resolver/src/error.rs b/crates/puffin-resolver/src/error.rs index ab1b2ad39..a39e77aae 100644 --- a/crates/puffin-resolver/src/error.rs +++ b/crates/puffin-resolver/src/error.rs @@ -10,12 +10,18 @@ pub enum ResolveError { #[error("Failed to find a version of {0} that satisfies the requirement")] NotFound(Requirement), + #[error("The request stream terminated unexpectedly")] + StreamTermination, + #[error(transparent)] Client(#[from] puffin_client::PypiClientError), #[error(transparent)] TrySend(#[from] futures::channel::mpsc::SendError), + #[error(transparent)] + Join(#[from] tokio::task::JoinError), + #[error(transparent)] PubGrub(#[from] pubgrub::error::PubGrubError), } diff --git a/crates/puffin-resolver/src/pubgrub/version.rs b/crates/puffin-resolver/src/pubgrub/version.rs index b8a3792b1..4bf8945dd 100644 --- a/crates/puffin-resolver/src/pubgrub/version.rs +++ b/crates/puffin-resolver/src/pubgrub/version.rs @@ -7,11 +7,6 @@ use once_cell::sync::Lazy; pub struct PubGrubVersion(pep440_rs::Version); impl PubGrubVersion { - /// Returns `true` if this is a pre-release version. - pub fn is_prerelease(&self) -> bool { - self.0.pre.is_some() || self.0.dev.is_some() - } - /// Returns the smallest PEP 440 version that is larger than `self`. pub fn next(&self) -> PubGrubVersion { let mut next = self.clone(); diff --git a/crates/puffin-resolver/src/resolver.rs b/crates/puffin-resolver/src/resolver.rs index 3bd23f41b..c0dd8c706 100644 --- a/crates/puffin-resolver/src/resolver.rs +++ b/crates/puffin-resolver/src/resolver.rs @@ -7,11 +7,12 @@ use std::sync::Arc; use anyhow::Result; use futures::future::Either; -use futures::{StreamExt, TryFutureExt}; +use futures::{pin_mut, FutureExt, StreamExt, TryFutureExt}; use pubgrub::error::PubGrubError; use pubgrub::range::Range; use pubgrub::solver::{DependencyConstraints, Incompatibility, State}; use pubgrub::type_aliases::SelectedDependencies; +use tokio::select; use tracing::{debug, trace}; use waitmap::WaitMap; @@ -60,7 +61,7 @@ impl<'a> Resolver<'a> { // A channel to fetch package metadata (e.g., given `flask`, fetch all versions) and version // metadata (e.g., given `flask==1.0.0`, fetch the metadata for that version). let (request_sink, request_stream) = futures::channel::mpsc::unbounded(); - tokio::spawn({ + let requests_fut = tokio::spawn({ let cache = cache.clone(); let client = client.clone(); async move { @@ -101,7 +102,7 @@ impl<'a> Resolver<'a> { } } - Ok::<(), anyhow::Error>(()) + Ok::<(), ResolveError>(()) } }); @@ -112,52 +113,39 @@ impl<'a> Resolver<'a> { request_sink.unbounded_send(Request::Package(package_name))?; } - // Track the packages and versions that we've requested. - let mut requested_packages = HashSet::new(); - let mut requested_versions = HashSet::new(); - let mut pins = HashMap::new(); + // Run the solver. + let resolve_fut = self.solve(&cache, &request_sink); - let selected_dependencies = self - .solve( - &cache, - &mut pins, - &mut requested_packages, - &mut requested_versions, - &request_sink, - ) - .await?; + let requests_fut = requests_fut.fuse(); + let resolve_fut = resolve_fut.fuse(); + pin_mut!(requests_fut, resolve_fut); - // Map to our own internal resolution type. - let mut resolution = BTreeMap::new(); - for (package, version) in selected_dependencies { - let PubGrubPackage::Package(package_name, None) = package else { - continue; - }; + let resolution = select! { + result = requests_fut => { + result??; + return Err(ResolveError::StreamTermination); + } + resolution = resolve_fut => { + resolution? + } + }; - let version = pep440_rs::Version::from(version); - let file = pins - .get(&package_name) - .and_then(|versions| versions.get(&version)) - .unwrap() - .clone(); - let pinned_package = PinnedPackage::new(package_name.clone(), version, file); - resolution.insert(package_name, pinned_package); - } - - Ok(Resolution::new(resolution)) + Ok(resolution.into()) } /// Run the `PubGrub` solver. async fn solve( &self, cache: &SolverCache, - pins: &mut HashMap>, - requested_packages: &mut HashSet, - requested_versions: &mut HashSet, request_sink: &futures::channel::mpsc::UnboundedSender, - ) -> Result, ResolveError> { + ) -> Result { let root = PubGrubPackage::Root; + // Keep track of the packages for which we've requested metadata. + let mut requested_packages = HashSet::new(); + let mut requested_versions = HashSet::new(); + let mut pins = HashMap::new(); + // Start the solve. let mut state = State::init(root.clone(), MIN_VERSION.clone()); let mut added_dependencies: HashMap> = @@ -170,11 +158,15 @@ impl<'a> Resolver<'a> { // Fetch the list of candidates. let Some(potential_packages) = state.partial_solution.potential_packages() else { - return state.partial_solution.extract_solution().ok_or_else(|| { - PubGrubError::Failure( + let Some(selected_dependencies) = state.partial_solution.extract_solution() else { + return Err(PubGrubError::Failure( "How did we end up with no package to choose but no solution?".into(), ) - .into() + .into()); + }; + return Ok(PubGrubResolution { + selected_dependencies, + pins, }); }; @@ -184,8 +176,8 @@ impl<'a> Resolver<'a> { .choose_package_version( potential_packages, cache, - pins, - requested_versions, + &mut pins, + &mut requested_versions, request_sink, ) .await?; @@ -218,8 +210,8 @@ impl<'a> Resolver<'a> { package, &version, cache, - pins, - requested_packages, + &mut pins, + &mut requested_packages, request_sink, ) .await? @@ -516,3 +508,33 @@ enum Dependencies { /// Container for all available package versions. Known(DependencyConstraints), } + +#[derive(Debug)] +struct PubGrubResolution { + /// The selected dependencies. + selected_dependencies: SelectedDependencies, + /// The selected file (source or built distribution) for each package. + pins: HashMap>, +} + +impl From for Resolution { + fn from(value: PubGrubResolution) -> Self { + let mut packages = BTreeMap::new(); + for (package, version) in value.selected_dependencies { + let PubGrubPackage::Package(package_name, None) = package else { + continue; + }; + + let version = pep440_rs::Version::from(version); + let file = value + .pins + .get(&package_name) + .and_then(|versions| versions.get(&version)) + .unwrap() + .clone(); + let pinned_package = PinnedPackage::new(package_name.clone(), version, file); + packages.insert(package_name, pinned_package); + } + Resolution::new(packages) + } +}