Surface request stream errors in the resolver (#107)

Closes https://github.com/astral-sh/puffin/issues/105.
This commit is contained in:
Charlie Marsh 2023-10-16 13:26:46 -04:00 committed by GitHub
parent 7e8ffeb2df
commit bae52d5edd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 49 deletions

View file

@ -10,12 +10,18 @@ pub enum ResolveError {
#[error("Failed to find a version of {0} that satisfies the requirement")]
NotFound(Requirement),
#[error("The request stream terminated unexpectedly")]
StreamTermination,
#[error(transparent)]
Client(#[from] puffin_client::PypiClientError),
#[error(transparent)]
TrySend(#[from] futures::channel::mpsc::SendError),
#[error(transparent)]
Join(#[from] tokio::task::JoinError),
#[error(transparent)]
PubGrub(#[from] pubgrub::error::PubGrubError<PubGrubPackage, PubGrubVersion>),
}

View file

@ -7,11 +7,6 @@ use once_cell::sync::Lazy;
pub struct PubGrubVersion(pep440_rs::Version);
impl PubGrubVersion {
/// Returns `true` if this is a pre-release version.
pub fn is_prerelease(&self) -> bool {
self.0.pre.is_some() || self.0.dev.is_some()
}
/// Returns the smallest PEP 440 version that is larger than `self`.
pub fn next(&self) -> PubGrubVersion {
let mut next = self.clone();

View file

@ -7,11 +7,12 @@ use std::sync::Arc;
use anyhow::Result;
use futures::future::Either;
use futures::{StreamExt, TryFutureExt};
use futures::{pin_mut, FutureExt, StreamExt, TryFutureExt};
use pubgrub::error::PubGrubError;
use pubgrub::range::Range;
use pubgrub::solver::{DependencyConstraints, Incompatibility, State};
use pubgrub::type_aliases::SelectedDependencies;
use tokio::select;
use tracing::{debug, trace};
use waitmap::WaitMap;
@ -60,7 +61,7 @@ impl<'a> Resolver<'a> {
// A channel to fetch package metadata (e.g., given `flask`, fetch all versions) and version
// metadata (e.g., given `flask==1.0.0`, fetch the metadata for that version).
let (request_sink, request_stream) = futures::channel::mpsc::unbounded();
tokio::spawn({
let requests_fut = tokio::spawn({
let cache = cache.clone();
let client = client.clone();
async move {
@ -101,7 +102,7 @@ impl<'a> Resolver<'a> {
}
}
Ok::<(), anyhow::Error>(())
Ok::<(), ResolveError>(())
}
});
@ -112,52 +113,39 @@ impl<'a> Resolver<'a> {
request_sink.unbounded_send(Request::Package(package_name))?;
}
// Track the packages and versions that we've requested.
let mut requested_packages = HashSet::new();
let mut requested_versions = HashSet::new();
let mut pins = HashMap::new();
// Run the solver.
let resolve_fut = self.solve(&cache, &request_sink);
let selected_dependencies = self
.solve(
&cache,
&mut pins,
&mut requested_packages,
&mut requested_versions,
&request_sink,
)
.await?;
let requests_fut = requests_fut.fuse();
let resolve_fut = resolve_fut.fuse();
pin_mut!(requests_fut, resolve_fut);
// Map to our own internal resolution type.
let mut resolution = BTreeMap::new();
for (package, version) in selected_dependencies {
let PubGrubPackage::Package(package_name, None) = package else {
continue;
};
let resolution = select! {
result = requests_fut => {
result??;
return Err(ResolveError::StreamTermination);
}
resolution = resolve_fut => {
resolution?
}
};
let version = pep440_rs::Version::from(version);
let file = pins
.get(&package_name)
.and_then(|versions| versions.get(&version))
.unwrap()
.clone();
let pinned_package = PinnedPackage::new(package_name.clone(), version, file);
resolution.insert(package_name, pinned_package);
}
Ok(Resolution::new(resolution))
Ok(resolution.into())
}
/// Run the `PubGrub` solver.
async fn solve(
&self,
cache: &SolverCache,
pins: &mut HashMap<PackageName, HashMap<pep440_rs::Version, File>>,
requested_packages: &mut HashSet<PackageName>,
requested_versions: &mut HashSet<String>,
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
) -> Result<SelectedDependencies<PubGrubPackage, PubGrubVersion>, ResolveError> {
) -> Result<PubGrubResolution, ResolveError> {
let root = PubGrubPackage::Root;
// Keep track of the packages for which we've requested metadata.
let mut requested_packages = HashSet::new();
let mut requested_versions = HashSet::new();
let mut pins = HashMap::new();
// Start the solve.
let mut state = State::init(root.clone(), MIN_VERSION.clone());
let mut added_dependencies: HashMap<PubGrubPackage, HashSet<PubGrubVersion>> =
@ -170,11 +158,15 @@ impl<'a> Resolver<'a> {
// Fetch the list of candidates.
let Some(potential_packages) = state.partial_solution.potential_packages() else {
return state.partial_solution.extract_solution().ok_or_else(|| {
PubGrubError::Failure(
let Some(selected_dependencies) = state.partial_solution.extract_solution() else {
return Err(PubGrubError::Failure(
"How did we end up with no package to choose but no solution?".into(),
)
.into()
.into());
};
return Ok(PubGrubResolution {
selected_dependencies,
pins,
});
};
@ -184,8 +176,8 @@ impl<'a> Resolver<'a> {
.choose_package_version(
potential_packages,
cache,
pins,
requested_versions,
&mut pins,
&mut requested_versions,
request_sink,
)
.await?;
@ -218,8 +210,8 @@ impl<'a> Resolver<'a> {
package,
&version,
cache,
pins,
requested_packages,
&mut pins,
&mut requested_packages,
request_sink,
)
.await?
@ -516,3 +508,33 @@ enum Dependencies {
/// Container for all available package versions.
Known(DependencyConstraints<PubGrubPackage, PubGrubVersion>),
}
#[derive(Debug)]
struct PubGrubResolution {
/// The selected dependencies.
selected_dependencies: SelectedDependencies<PubGrubPackage, PubGrubVersion>,
/// The selected file (source or built distribution) for each package.
pins: HashMap<PackageName, HashMap<pep440_rs::Version, File>>,
}
impl From<PubGrubResolution> for Resolution {
fn from(value: PubGrubResolution) -> Self {
let mut packages = BTreeMap::new();
for (package, version) in value.selected_dependencies {
let PubGrubPackage::Package(package_name, None) = package else {
continue;
};
let version = pep440_rs::Version::from(version);
let file = value
.pins
.get(&package_name)
.and_then(|versions| versions.get(&version))
.unwrap()
.clone();
let pinned_package = PinnedPackage::new(package_name.clone(), version, file);
packages.insert(package_name, pinned_package);
}
Resolution::new(packages)
}
}