Parallelize resolver (#3627)

## Summary

This PR introduces parallelism to the resolver. Specifically, we can
perform PubGrub resolution on a separate thread, while keeping all I/O
on the tokio thread. We already have the infrastructure set up for this
with the channel and `OnceMap`, which makes this change relatively
simple. The main change needed to make this possible is removing the
lifetimes from some of the types that are shared between the resolver
task and the PubGrub thread.
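
Concretely, the solver becomes a plain blocking function on a dedicated thread, sending metadata requests to the tokio side over an `mpsc` channel and returning the final result over a `oneshot` channel. Below is a minimal, self-contained sketch of that pattern (the placeholder request/result types stand in for the real `Request` and `ResolutionGraph`):

```rust
use std::thread;
use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main() {
    // Requests flow from the solver thread to the I/O task.
    let (request_sink, mut request_stream) = mpsc::channel::<String>(300);
    // The final resolution flows back to the async side.
    let (tx, rx) = oneshot::channel::<u32>();

    // The solver: plain blocking code on its own OS thread.
    thread::spawn(move || {
        // `blocking_send` parks the thread if the channel is full.
        request_sink.blocking_send("flask".to_string()).unwrap();
        // ...run PubGrub to completion...
        tx.send(42).unwrap();
    });

    // The I/O side: stays on the tokio runtime, servicing requests.
    let io = tokio::spawn(async move {
        while let Some(package) = request_stream.recv().await {
            println!("fetching metadata for {package}");
        }
    });

    let resolution = rx.await.unwrap();
    io.await.unwrap();
    println!("resolved: {resolution}");
}
```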

A related PR, https://github.com/astral-sh/uv/pull/1163, found that
adding `yield_now` calls improved throughput. With optimal scheduling,
we might be able to get away with keeping everything on the same thread
here.
However, in the ideal pipeline with perfect prefetching, the resolution
and prefetching can run completely in parallel without depending on one
another. While this would be very difficult to achieve, even with our
current prefetching pattern we see a consistent performance improvement
from parallelism.
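
For reference, the single-threaded alternative relies on cooperative scheduling: the solver yields between steps so that queued metadata fetches on the same runtime get polled. A rough sketch of that approach:

```rust
use tokio::task::yield_now;

#[tokio::main]
async fn main() {
    for step in 0..3 {
        // ...one unit of PubGrub work would happen here...
        println!("solver step {step}");
        // Yield so pending I/O futures on this runtime get a chance to run.
        yield_now().await;
    }
}
```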

This also requires reverting a few of the changes from
https://github.com/astral-sh/uv/pull/3413, though not all of them: the
sharing is isolated to the resolver task.

## Test Plan

On smaller tasks, performance is mixed, with ~2% improvements and
regressions in both directions. However, on medium-to-large resolution
tasks we see the benefits of parallelism, with improvements anywhere
from 10% to 50%.

```
./scripts/requirements/jupyter.in
Benchmark 1: ./target/profiling/baseline (resolve-warm)
  Time (mean ± σ):      29.2 ms ±   1.8 ms    [User: 20.3 ms, System: 29.8 ms]
  Range (min … max):    26.4 ms …  36.0 ms    91 runs
 
Benchmark 2: ./target/profiling/parallel (resolve-warm)
  Time (mean ± σ):      25.5 ms ±   1.0 ms    [User: 19.5 ms, System: 25.5 ms]
  Range (min … max):    23.6 ms …  27.8 ms    99 runs
 
Summary
  ./target/profiling/parallel (resolve-warm) ran
    1.15 ± 0.08 times faster than ./target/profiling/baseline (resolve-warm)
```
```
./scripts/requirements/boto3.in   
Benchmark 1: ./target/profiling/baseline (resolve-warm)
  Time (mean ± σ):     487.1 ms ±   6.2 ms    [User: 464.6 ms, System: 61.6 ms]
  Range (min … max):   480.0 ms … 497.3 ms    10 runs
 
Benchmark 2: ./target/profiling/parallel (resolve-warm)
  Time (mean ± σ):     430.8 ms ±   9.3 ms    [User: 529.0 ms, System: 77.2 ms]
  Range (min … max):   417.1 ms … 442.5 ms    10 runs
 
Summary
  ./target/profiling/parallel (resolve-warm) ran
    1.13 ± 0.03 times faster than ./target/profiling/baseline (resolve-warm)
```
```
./scripts/requirements/airflow.in 
Benchmark 1: ./target/profiling/baseline (resolve-warm)
  Time (mean ± σ):     478.1 ms ±  18.8 ms    [User: 482.6 ms, System: 205.0 ms]
  Range (min … max):   454.7 ms … 508.9 ms    10 runs
 
Benchmark 2: ./target/profiling/parallel (resolve-warm)
  Time (mean ± σ):     308.7 ms ±  11.7 ms    [User: 428.5 ms, System: 209.5 ms]
  Range (min … max):   287.8 ms … 323.1 ms    10 runs
 
Summary
  ./target/profiling/parallel (resolve-warm) ran
    1.55 ± 0.08 times faster than ./target/profiling/baseline (resolve-warm)
```
Commit 39af09f09b (parent 70a1782745) by Ibraheem Ahmed, 2024-05-17 11:47:30 -04:00, committed by GitHub.

27 changed files with 336 additions and 302 deletions.

---

Cargo.lock (generated):

@@ -2369,6 +2369,7 @@ name = "once-map"
version = "0.0.1"
dependencies = [
"dashmap",
"futures",
"tokio",
]
@@ -5003,6 +5004,7 @@ dependencies = [
"cache-key",
"chrono",
"clap",
"dashmap",
"derivative",
"distribution-filename",
"distribution-types",

---

@@ -108,7 +108,7 @@ mod resolver {
&index,
&hashes,
&build_context,
&installed_packages,
installed_packages,
DistributionDatabase::new(client, &build_context, concurrency.downloads),
)?;

---

@@ -15,3 +15,4 @@ workspace = true
[dependencies]
dashmap = { workspace = true }
tokio = { workspace = true }
futures = { workspace = true }

---

@@ -63,6 +63,27 @@ impl<K: Eq + Hash, V: Clone> OnceMap<K, V> {
}
}
/// Wait for the result of a job that is running, in a blocking context.
///
/// Will hang if [`OnceMap::done`] isn't called for this key.
pub fn wait_blocking(&self, key: &K) -> Option<V> {
let entry = self.items.get(key)?;
match entry.value() {
Value::Filled(value) => Some(value.clone()),
Value::Waiting(notify) => {
let notify = notify.clone();
drop(entry);
futures::executor::block_on(notify.notified());
let entry = self.items.get(key).expect("map is append-only");
match entry.value() {
Value::Filled(value) => Some(value.clone()),
Value::Waiting(_) => unreachable!("notify was called"),
}
}
}
}
/// Return the result of a previous job, if any.
pub fn get<Q: ?Sized + Hash + Eq>(&self, key: &Q) -> Option<V>
where

---

@@ -153,7 +153,7 @@ impl<'a> BuildContext for BuildDispatch<'a> {
self.index,
&HashStrategy::None,
self,
&EmptyInstalledPackages,
EmptyInstalledPackages,
DistributionDatabase::new(self.client, self, self.concurrency.downloads),
)?;
let graph = resolver.resolve().await.with_context(|| {

---

@@ -68,7 +68,7 @@ impl<'a> Planner<'a> {
#[allow(clippy::too_many_arguments)]
pub fn build(
self,
mut site_packages: SitePackages<'_>,
mut site_packages: SitePackages,
reinstall: &Reinstall,
no_binary: &NoBinary,
hasher: &HashStrategy,

---

@@ -23,9 +23,9 @@ use crate::satisfies::RequirementSatisfaction;
/// An index over the packages installed in an environment.
///
/// Packages are indexed by both name and (for editable installs) URL.
#[derive(Debug)]
pub struct SitePackages<'a> {
venv: &'a PythonEnvironment,
#[derive(Debug, Clone)]
pub struct SitePackages {
venv: PythonEnvironment,
/// The vector of all installed distributions. The `by_name` and `by_url` indices index into
/// this vector. The vector may contain `None` values, which represent distributions that were
/// removed from the virtual environment.
@@ -38,9 +38,9 @@ pub struct SitePackages<'a> {
by_url: FxHashMap<Url, Vec<usize>>,
}
impl<'a> SitePackages<'a> {
impl SitePackages {
/// Build an index of installed packages from the given Python executable.
pub fn from_executable(venv: &'a PythonEnvironment) -> Result<SitePackages<'a>> {
pub fn from_executable(venv: &PythonEnvironment) -> Result<SitePackages> {
let mut distributions: Vec<Option<InstalledDist>> = Vec::new();
let mut by_name = FxHashMap::default();
let mut by_url = FxHashMap::default();
@@ -68,7 +68,7 @@ impl<'a> SitePackages<'a> {
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
return Ok(Self {
venv,
venv: venv.clone(),
distributions,
by_name,
by_url,
@@ -107,7 +107,7 @@ impl<'a> SitePackages<'a> {
}
Ok(Self {
venv,
venv: venv.clone(),
distributions,
by_name,
by_url,
@@ -439,7 +439,7 @@ pub enum SatisfiesResult {
Unsatisfied(String),
}
impl IntoIterator for SitePackages<'_> {
impl IntoIterator for SitePackages {
type Item = InstalledDist;
type IntoIter = Flatten<std::vec::IntoIter<Option<InstalledDist>>>;
@@ -540,7 +540,7 @@ impl Diagnostic {
}
}
impl InstalledPackagesProvider for SitePackages<'_> {
impl InstalledPackagesProvider for SitePackages {
fn iter(&self) -> impl Iterator<Item = &InstalledDist> {
self.iter()
}

---

@@ -1,6 +1,7 @@
use itertools::Either;
use std::env;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use same_file::is_same_file;
@@ -12,7 +13,10 @@ use crate::{find_default_python, find_requested_python, Error, Interpreter, Targ
/// A Python environment, consisting of a Python [`Interpreter`] and its associated paths.
#[derive(Debug, Clone)]
pub struct PythonEnvironment {
pub struct PythonEnvironment(Arc<PythonEnvironmentShared>);
#[derive(Debug, Clone)]
struct PythonEnvironmentShared {
root: PathBuf,
interpreter: Interpreter,
}
@@ -46,10 +50,10 @@ impl PythonEnvironment {
interpreter.base_prefix().display()
);
Ok(Self {
Ok(Self(Arc::new(PythonEnvironmentShared {
root: venv,
interpreter,
})
})))
}
/// Create a [`PythonEnvironment`] for a Python interpreter specifier (e.g., a path or a binary name).
@@ -57,57 +61,58 @@ impl PythonEnvironment {
let Some(interpreter) = find_requested_python(python, cache)? else {
return Err(Error::RequestedPythonNotFound(python.to_string()));
};
Ok(Self {
Ok(Self(Arc::new(PythonEnvironmentShared {
root: interpreter.prefix().to_path_buf(),
interpreter,
})
})))
}
/// Create a [`PythonEnvironment`] for the default Python interpreter.
pub fn from_default_python(cache: &Cache) -> Result<Self, Error> {
let interpreter = find_default_python(cache)?;
Ok(Self {
Ok(Self(Arc::new(PythonEnvironmentShared {
root: interpreter.prefix().to_path_buf(),
interpreter,
})
})))
}
/// Create a [`PythonEnvironment`] from an existing [`Interpreter`] and root directory.
pub fn from_interpreter(interpreter: Interpreter) -> Self {
Self {
Self(Arc::new(PythonEnvironmentShared {
root: interpreter.prefix().to_path_buf(),
interpreter,
}
}))
}
/// Create a [`PythonEnvironment`] from an existing [`Interpreter`] and `--target` directory.
#[must_use]
pub fn with_target(self, target: Target) -> Self {
Self {
interpreter: self.interpreter.with_target(target),
..self
}
let inner = Arc::unwrap_or_clone(self.0);
Self(Arc::new(PythonEnvironmentShared {
interpreter: inner.interpreter.with_target(target),
..inner
}))
}
/// Returns the root (i.e., `prefix`) of the Python interpreter.
pub fn root(&self) -> &Path {
&self.root
&self.0.root
}
/// Return the [`Interpreter`] for this virtual environment.
pub fn interpreter(&self) -> &Interpreter {
&self.interpreter
&self.0.interpreter
}
/// Return the [`PyVenvConfiguration`] for this virtual environment, as extracted from the
/// `pyvenv.cfg` file.
pub fn cfg(&self) -> Result<PyVenvConfiguration, Error> {
Ok(PyVenvConfiguration::parse(self.root.join("pyvenv.cfg"))?)
Ok(PyVenvConfiguration::parse(self.0.root.join("pyvenv.cfg"))?)
}
/// Returns the location of the Python executable.
pub fn python_executable(&self) -> &Path {
self.interpreter.sys_executable()
self.0.interpreter.sys_executable()
}
/// Returns an iterator over the `site-packages` directories inside a virtual environment.
@@ -118,11 +123,11 @@ impl PythonEnvironment {
/// Some distributions also create symbolic links from `purelib` to `platlib`; in such cases, we
/// still deduplicate the entries, returning a single path.
pub fn site_packages(&self) -> impl Iterator<Item = &Path> {
if let Some(target) = self.interpreter.target() {
if let Some(target) = self.0.interpreter.target() {
Either::Left(std::iter::once(target.root()))
} else {
let purelib = self.interpreter.purelib();
let platlib = self.interpreter.platlib();
let purelib = self.0.interpreter.purelib();
let platlib = self.0.interpreter.platlib();
Either::Right(std::iter::once(purelib).chain(
if purelib == platlib || is_same_file(purelib, platlib).unwrap_or(false) {
None
@@ -135,31 +140,31 @@ impl PythonEnvironment {
/// Returns the path to the `bin` directory inside a virtual environment.
pub fn scripts(&self) -> &Path {
self.interpreter.scripts()
self.0.interpreter.scripts()
}
/// Grab a file lock for the virtual environment to prevent concurrent writes across processes.
pub fn lock(&self) -> Result<LockedFile, std::io::Error> {
if let Some(target) = self.interpreter.target() {
if let Some(target) = self.0.interpreter.target() {
// If we're installing into a `--target`, use a target-specific lock file.
LockedFile::acquire(
target.root().join(".lock"),
target.root().simplified_display(),
)
} else if self.interpreter.is_virtualenv() {
} else if self.0.interpreter.is_virtualenv() {
// If the environment is a virtualenv, use a virtualenv-specific lock file.
LockedFile::acquire(self.root.join(".lock"), self.root.simplified_display())
LockedFile::acquire(self.0.root.join(".lock"), self.0.root.simplified_display())
} else {
// Otherwise, use a global lock file.
LockedFile::acquire(
env::temp_dir().join(format!("uv-{}.lock", cache_key::digest(&self.root))),
self.root.simplified_display(),
env::temp_dir().join(format!("uv-{}.lock", cache_key::digest(&self.0.root))),
self.0.root.simplified_display(),
)
}
}
/// Return the [`Interpreter`] for this virtual environment.
pub fn into_interpreter(self) -> Interpreter {
self.interpreter
Arc::unwrap_or_clone(self.0).interpreter
}
}

---

@@ -1,4 +1,4 @@
use std::collections::VecDeque;
use std::{collections::VecDeque, sync::Arc};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
@@ -197,17 +197,18 @@ impl<'a, Context: BuildContext> LookaheadResolver<'a, Context> {
// Fetch the metadata for the distribution.
let requires_dist = {
let id = dist.version_id();
if let Some(archive) = self
.index
.get_metadata(&id)
.as_deref()
.and_then(|response| {
if let MetadataResponse::Found(archive, ..) = response {
Some(archive)
} else {
None
}
})
if let Some(archive) =
self.index
.distributions()
.get(&id)
.as_deref()
.and_then(|response| {
if let MetadataResponse::Found(archive, ..) = response {
Some(archive)
} else {
None
}
})
{
// If the metadata is already in the index, return it.
archive
@@ -234,7 +235,8 @@ impl<'a, Context: BuildContext> LookaheadResolver<'a, Context> {
// Insert the metadata into the index.
self.index
.insert_metadata(id, MetadataResponse::Found(archive));
.distributions()
.done(id, Arc::new(MetadataResponse::Found(archive)));
requires_dist
.into_iter()

---

@@ -1,5 +1,6 @@
use std::borrow::Cow;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{Context, Result};
use futures::stream::FuturesOrdered;
@@ -117,17 +118,18 @@ impl<'a, Context: BuildContext> SourceTreeResolver<'a, Context> {
// Fetch the metadata for the distribution.
let metadata = {
let id = VersionId::from_url(source.url());
if let Some(archive) = self
.index
.get_metadata(&id)
.as_deref()
.and_then(|response| {
if let MetadataResponse::Found(archive) = response {
Some(archive)
} else {
None
}
})
if let Some(archive) =
self.index
.distributions()
.get(&id)
.as_deref()
.and_then(|response| {
if let MetadataResponse::Found(archive) = response {
Some(archive)
} else {
None
}
})
{
// If the metadata is already in the index, return it.
archive.metadata.clone()
@@ -138,7 +140,8 @@ impl<'a, Context: BuildContext> SourceTreeResolver<'a, Context> {
// Insert the metadata into the index.
self.index
.insert_metadata(id, MetadataResponse::Found(archive.clone()));
.distributions()
.done(id, Arc::new(MetadataResponse::Found(archive.clone())));
archive.metadata
}

---

@@ -1,6 +1,7 @@
use std::borrow::Cow;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use anyhow::Result;
use configparser::ini::Ini;
@@ -254,13 +255,18 @@ impl<'a, Context: BuildContext> NamedRequirementsResolver<'a, Context> {
// Fetch the metadata for the distribution.
let name = {
let id = VersionId::from_url(source.url());
if let Some(archive) = index.get_metadata(&id).as_deref().and_then(|response| {
if let MetadataResponse::Found(archive) = response {
Some(archive)
} else {
None
}
}) {
if let Some(archive) = index
.distributions()
.get(&id)
.as_deref()
.and_then(|response| {
if let MetadataResponse::Found(archive) = response {
Some(archive)
} else {
None
}
})
{
// If the metadata is already in the index, return it.
archive.metadata.name.clone()
} else {
@@ -272,7 +278,9 @@ impl<'a, Context: BuildContext> NamedRequirementsResolver<'a, Context> {
let name = archive.metadata.name.clone();
// Insert the metadata into the index.
index.insert_metadata(id, MetadataResponse::Found(archive));
index
.distributions()
.done(id, Arc::new(MetadataResponse::Found(archive)));
name
}

---

@@ -56,6 +56,7 @@ tokio = { workspace = true }
tokio-stream = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
dashmap = { workspace = true }
[dev-dependencies]
uv-interpreter = { workspace = true }

---

@@ -1,7 +1,6 @@
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Formatter;
use std::ops::Deref;
use std::rc::Rc;
use std::sync::Arc;
use indexmap::IndexMap;
@@ -9,6 +8,7 @@ use pubgrub::range::Range;
use pubgrub::report::{DefaultStringReporter, DerivationTree, External, Reporter};
use rustc_hash::FxHashMap;
use dashmap::{DashMap, DashSet};
use distribution_types::{BuiltDist, IndexLocations, InstalledDist, ParsedUrlError, SourceDist};
use once_map::OnceMap;
use pep440_rs::Version;
@@ -19,10 +19,7 @@ use crate::candidate_selector::CandidateSelector;
use crate::dependency_provider::UvDependencyProvider;
use crate::pubgrub::{PubGrubPackage, PubGrubPython, PubGrubReportFormatter};
use crate::python_requirement::PythonRequirement;
use crate::resolver::{
IncompletePackage, SharedMap, SharedSet, UnavailablePackage, UnavailableReason,
VersionsResponse,
};
use crate::resolver::{IncompletePackage, UnavailablePackage, UnavailableReason, VersionsResponse};
#[derive(Debug, thiserror::Error)]
pub enum ResolveError {
@@ -238,8 +235,8 @@ impl NoSolutionError {
pub(crate) fn with_available_versions(
mut self,
python_requirement: &PythonRequirement,
visited: &SharedSet<PackageName>,
package_versions: &OnceMap<PackageName, Rc<VersionsResponse>>,
visited: &DashSet<PackageName>,
package_versions: &OnceMap<PackageName, Arc<VersionsResponse>>,
) -> Self {
let mut available_versions = IndexMap::default();
for package in self.derivation_tree.packages() {
@@ -263,7 +260,7 @@ impl NoSolutionError {
// tree, but were never visited during resolution. We _may_ have metadata for
// these packages, but it's non-deterministic, and omitting them ensures that
// we represent the state of the resolver at the time of failure.
if visited.borrow().contains(name) {
if visited.contains(name) {
if let Some(response) = package_versions.get(name) {
if let VersionsResponse::Found(ref version_maps) = *response {
for version_map in version_maps {
@@ -302,9 +299,8 @@ impl NoSolutionError {
#[must_use]
pub(crate) fn with_unavailable_packages(
mut self,
unavailable_packages: &SharedMap<PackageName, UnavailablePackage>,
unavailable_packages: &DashMap<PackageName, UnavailablePackage>,
) -> Self {
let unavailable_packages = unavailable_packages.borrow();
let mut new = FxHashMap::default();
for package in self.derivation_tree.packages() {
if let PubGrubPackage::Package(name, _, _) = package {
@@ -321,14 +317,14 @@ impl NoSolutionError {
#[must_use]
pub(crate) fn with_incomplete_packages(
mut self,
incomplete_packages: &SharedMap<PackageName, SharedMap<Version, IncompletePackage>>,
incomplete_packages: &DashMap<PackageName, DashMap<Version, IncompletePackage>>,
) -> Self {
let mut new = FxHashMap::default();
let incomplete_packages = incomplete_packages.borrow();
for package in self.derivation_tree.packages() {
if let PubGrubPackage::Package(name, _, _) = package {
if let Some(versions) = incomplete_packages.get(name) {
for (version, reason) in versions.borrow().iter() {
for entry in versions.iter() {
let (version, reason) = entry.pair();
new.entry(name.clone())
.or_insert_with(BTreeMap::default)
.insert(version.clone(), reason.clone());

---

@@ -1,4 +1,5 @@
use std::str::FromStr;
use std::sync::Arc;
use rustc_hash::FxHashMap;
use tracing::trace;
@ -69,7 +70,7 @@ impl Preference {
/// A set of pinned packages that should be preserved during resolution, if possible.
#[derive(Debug, Clone)]
pub(crate) struct Preferences(FxHashMap<PackageName, Pin>);
pub(crate) struct Preferences(Arc<FxHashMap<PackageName, Pin>>);
impl Preferences {
/// Create a map of pinned packages from an iterator of [`Preference`] entries.
@@ -81,10 +82,10 @@ impl Preferences {
preferences: PreferenceIterator,
markers: Option<&MarkerEnvironment>,
) -> Self {
Self(
// TODO(zanieb): We should explicitly ensure that when a package name is seen multiple times
// that the newest or oldest version is preferred depending on the resolution strategy;
// right now, the order is dependent on the given iterator.
// TODO(zanieb): We should explicitly ensure that when a package name is seen multiple times
// that the newest or oldest version is preferred depending on the resolution strategy;
// right now, the order is dependent on the given iterator.
let preferences =
preferences
.into_iter()
.filter_map(|preference| {
@@ -130,8 +131,9 @@ impl Preferences {
}
}
})
.collect(),
)
.collect();
Self(Arc::new(preferences))
}
/// Return the pinned version for a package, if any.

---

@@ -1,5 +1,5 @@
use std::hash::BuildHasherDefault;
use std::rc::Rc;
use std::sync::Arc;
use pubgrub::range::Range;
use pubgrub::solver::{Kind, State};
@@ -45,8 +45,8 @@ impl ResolutionGraph {
pub(crate) fn from_state(
selection: &SelectedDependencies<UvDependencyProvider>,
pins: &FilePins,
packages: &OnceMap<PackageName, Rc<VersionsResponse>>,
distributions: &OnceMap<VersionId, Rc<MetadataResponse>>,
packages: &OnceMap<PackageName, Arc<VersionsResponse>>,
distributions: &OnceMap<VersionId, Arc<MetadataResponse>>,
state: &State<UvDependencyProvider>,
preferences: &Preferences,
editables: Editables,
@@ -458,7 +458,7 @@ impl ResolutionGraph {
VersionOrUrlRef::Url(verbatim_url) => VersionId::from_url(verbatim_url.raw()),
};
let res = index
.distributions
.distributions()
.get(&version_id)
.expect("every package in resolution graph has metadata");
let MetadataResponse::Found(archive, ..) = &*res else {

---

@@ -44,7 +44,7 @@ pub(crate) struct BatchPrefetcher {
impl BatchPrefetcher {
/// Prefetch a large number of versions if we already unsuccessfully tried many versions.
pub(crate) async fn prefetch_batches(
pub(crate) fn prefetch_batches(
&mut self,
next: &PubGrubPackage,
version: &Version,
@@ -65,9 +65,8 @@ impl BatchPrefetcher {
// This is immediate, we already fetched the version map.
let versions_response = index
.packages
.wait(package_name)
.await
.packages()
.wait_blocking(package_name)
.ok_or(ResolveError::Unregistered)?;
let VersionsResponse::Found(ref version_map) = *versions_response else {
@@ -142,9 +141,10 @@ impl BatchPrefetcher {
dist
);
prefetch_count += 1;
if index.distributions.register(candidate.version_id()) {
if index.distributions().register(candidate.version_id()) {
let request = Request::from(dist);
request_sink.send(request).await?;
request_sink.blocking_send(request)?;
}
}

---

@@ -1,4 +1,4 @@
use std::rc::Rc;
use std::sync::Arc;
use distribution_types::VersionId;
use once_map::OnceMap;
@@ -7,34 +7,27 @@ use uv_normalize::PackageName;
use crate::resolver::provider::{MetadataResponse, VersionsResponse};
/// In-memory index of package metadata.
#[derive(Default, Clone)]
pub struct InMemoryIndex(Arc<SharedInMemoryIndex>);
#[derive(Default)]
pub struct InMemoryIndex {
struct SharedInMemoryIndex {
/// A map from package name to the metadata for that package and the index where the metadata
/// came from.
pub(crate) packages: OnceMap<PackageName, Rc<VersionsResponse>>,
packages: OnceMap<PackageName, Arc<VersionsResponse>>,
/// A map from package ID to metadata for that distribution.
pub(crate) distributions: OnceMap<VersionId, Rc<MetadataResponse>>,
distributions: OnceMap<VersionId, Arc<MetadataResponse>>,
}
impl InMemoryIndex {
/// Insert a [`VersionsResponse`] into the index.
pub fn insert_package(&self, package_name: PackageName, response: VersionsResponse) {
self.packages.done(package_name, Rc::new(response));
/// Returns a reference to the package metadata map.
pub fn packages(&self) -> &OnceMap<PackageName, Arc<VersionsResponse>> {
&self.0.packages
}
/// Insert a [`Metadata23`] into the index.
pub fn insert_metadata(&self, version_id: VersionId, response: MetadataResponse) {
self.distributions.done(version_id, Rc::new(response));
}
/// Get the [`VersionsResponse`] for a given package name, without waiting.
pub fn get_package(&self, package_name: &PackageName) -> Option<Rc<VersionsResponse>> {
self.packages.get(package_name)
}
/// Get the [`MetadataResponse`] for a given package ID, without waiting.
pub fn get_metadata(&self, version_id: &VersionId) -> Option<Rc<MetadataResponse>> {
self.distributions.get(version_id)
/// Returns a reference to the distribution metadata map.
pub fn distributions(&self) -> &OnceMap<VersionId, Arc<MetadataResponse>> {
&self.0.distributions
}
}

---

@@ -1,22 +1,23 @@
//! Given a set of requirements, find a set of compatible packages.
use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::rc::Rc;
use std::sync::Arc;
use std::thread;
use anyhow::Result;
use dashmap::{DashMap, DashSet};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use pubgrub::error::PubGrubError;
use pubgrub::range::Range;
use pubgrub::solver::{Incompatibility, State};
use rustc_hash::{FxHashMap, FxHashSet};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::oneshot;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, enabled, info_span, instrument, trace, warn, Instrument, Level};
use tracing::{debug, enabled, instrument, trace, warn, Level};
use distribution_types::{
BuiltDist, Dist, DistributionMetadata, IncompatibleDist, IncompatibleSource, IncompatibleWheel,
@@ -172,10 +173,14 @@ enum ResolverVersion {
Unavailable(Version, UnavailableVersion),
}
pub(crate) type SharedMap<K, V> = Rc<RefCell<HashMap<K, V>>>;
pub(crate) type SharedSet<K> = Rc<RefCell<HashSet<K>>>;
pub struct Resolver<Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvider> {
state: ResolverState<InstalledPackages>,
provider: Provider,
}
pub struct Resolver<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvider> {
/// State that is shared between the prefetcher and the PubGrub solver during
/// resolution.
struct ResolverState<InstalledPackages: InstalledPackagesProvider> {
project: Option<PackageName>,
requirements: Vec<Requirement>,
constraints: Constraints,
@@ -186,25 +191,24 @@ pub struct Resolver<'a, Provider: ResolverProvider, InstalledPackages: Installed
urls: Urls,
locals: Locals,
dependency_mode: DependencyMode,
hasher: &'a HashStrategy,
hasher: HashStrategy,
/// When not set, the resolver is in "universal" mode.
markers: Option<&'a MarkerEnvironment>,
python_requirement: &'a PythonRequirement,
markers: Option<MarkerEnvironment>,
python_requirement: PythonRequirement,
selector: CandidateSelector,
index: &'a InMemoryIndex,
installed_packages: &'a InstalledPackages,
index: InMemoryIndex,
installed_packages: InstalledPackages,
/// Incompatibilities for packages that are entirely unavailable.
unavailable_packages: SharedMap<PackageName, UnavailablePackage>,
unavailable_packages: DashMap<PackageName, UnavailablePackage>,
/// Incompatibilities for packages that are unavailable at specific versions.
incomplete_packages: SharedMap<PackageName, SharedMap<Version, IncompletePackage>>,
incomplete_packages: DashMap<PackageName, DashMap<Version, IncompletePackage>>,
/// The set of all registry-based packages visited during resolution.
visited: SharedSet<PackageName>,
visited: DashSet<PackageName>,
reporter: Option<Arc<dyn Reporter>>,
provider: Provider,
}
impl<'a, Context: BuildContext, InstalledPackages: InstalledPackagesProvider>
Resolver<'a, DefaultResolverProvider<'a, Context>, InstalledPackages>
Resolver<DefaultResolverProvider<'a, Context>, InstalledPackages>
{
/// Initialize a new resolver using the default backend doing real requests.
///
@@ -235,7 +239,7 @@ impl<'a, Context: BuildContext, InstalledPackages: InstalledPackagesProvider>
index: &'a InMemoryIndex,
hasher: &'a HashStrategy,
build_context: &'a Context,
installed_packages: &'a InstalledPackages,
installed_packages: InstalledPackages,
database: DistributionDatabase<'a, Context>,
) -> Result<Self, ResolveError> {
let provider = DefaultResolverProvider::new(
@@ -263,26 +267,26 @@ impl<'a, Context: BuildContext, InstalledPackages: InstalledPackagesProvider>
}
}
impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvider>
Resolver<'a, Provider, InstalledPackages>
impl<Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvider>
Resolver<Provider, InstalledPackages>
{
/// Initialize a new resolver using a user provided backend.
#[allow(clippy::too_many_arguments)]
pub fn new_custom_io(
manifest: Manifest,
options: Options,
hasher: &'a HashStrategy,
markers: Option<&'a MarkerEnvironment>,
python_requirement: &'a PythonRequirement,
index: &'a InMemoryIndex,
hasher: &HashStrategy,
markers: Option<&MarkerEnvironment>,
python_requirement: &PythonRequirement,
index: &InMemoryIndex,
provider: Provider,
installed_packages: &'a InstalledPackages,
installed_packages: InstalledPackages,
) -> Result<Self, ResolveError> {
Ok(Self {
index,
unavailable_packages: SharedMap::default(),
incomplete_packages: SharedMap::default(),
visited: SharedSet::default(),
let state = ResolverState {
index: index.clone(),
unavailable_packages: DashMap::default(),
incomplete_packages: DashMap::default(),
visited: DashSet::default(),
selector: CandidateSelector::for_resolution(options, &manifest, markers),
dependency_mode: options.dependency_mode,
urls: Urls::from_manifest(&manifest, markers, options.dependency_mode)?,
@@ -294,43 +298,63 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
preferences: Preferences::from_iter(manifest.preferences, markers),
exclusions: manifest.exclusions,
editables: Editables::from_requirements(manifest.editables),
hasher,
markers,
python_requirement,
hasher: hasher.clone(),
markers: markers.cloned(),
python_requirement: python_requirement.clone(),
reporter: None,
provider,
installed_packages,
})
};
Ok(Self { state, provider })
}
/// Set the [`Reporter`] to use for this installer.
#[must_use]
pub fn with_reporter(self, reporter: impl Reporter + 'static) -> Self {
let reporter = Arc::new(reporter);
Self {
reporter: Some(reporter.clone()),
state: ResolverState {
reporter: Some(reporter.clone()),
..self.state
},
provider: self.provider.with_reporter(Facade { reporter }),
..self
}
}
/// Resolve a set of requirements into a set of pinned versions.
pub async fn resolve(self) -> Result<ResolutionGraph, ResolveError> {
let state = Arc::new(self.state);
let provider = Arc::new(self.provider);
// A channel to fetch package metadata (e.g., given `flask`, fetch all versions) and version
// metadata (e.g., given `flask==1.0.0`, fetch the metadata for that version).
// Channel size is set large to accommodate batch prefetching.
let (request_sink, request_stream) = tokio::sync::mpsc::channel(300);
let (request_sink, request_stream) = mpsc::channel(300);
// Run the fetcher.
let requests_fut = self.fetch(request_stream).fuse();
let requests_fut = state.clone().fetch(provider.clone(), request_stream).fuse();
// Run the solver.
let resolve_fut = self.solve(request_sink).boxed_local().fuse();
// Spawn the PubGrub solver on a dedicated thread.
let solver = state.clone();
let (tx, rx) = oneshot::channel();
thread::Builder::new()
.name("uv-resolver".into())
.spawn(move || {
let result = solver.solve(request_sink);
tx.send(result).unwrap();
})
.unwrap();
let resolve_fut = async move {
rx.await
.map_err(|_| ResolveError::ChannelClosed)
.and_then(|result| result)
};
// Wait for both to complete.
match tokio::try_join!(requests_fut, resolve_fut) {
Ok(((), resolution)) => {
self.on_complete();
state.on_complete();
Ok(resolution)
}
Err(err) => {
@@ -338,15 +362,15 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
Err(if let ResolveError::NoSolution(err) = err {
ResolveError::NoSolution(
err.with_available_versions(
self.python_requirement,
&self.visited,
&self.index.packages,
&state.python_requirement,
&state.visited,
state.index.packages(),
)
.with_selector(self.selector.clone())
.with_python_requirement(self.python_requirement)
.with_index_locations(self.provider.index_locations())
.with_unavailable_packages(&self.unavailable_packages)
.with_incomplete_packages(&self.incomplete_packages),
.with_selector(state.selector.clone())
.with_python_requirement(&state.python_requirement)
.with_index_locations(provider.index_locations())
.with_unavailable_packages(&state.unavailable_packages)
.with_incomplete_packages(&state.incomplete_packages),
)
} else {
err
@@ -354,16 +378,18 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
}
}
}
}
impl<InstalledPackages: InstalledPackagesProvider> ResolverState<InstalledPackages> {
/// Run the PubGrub solver.
#[instrument(skip_all)]
async fn solve(
&self,
request_sink: tokio::sync::mpsc::Sender<Request>,
fn solve(
self: Arc<Self>,
request_sink: Sender<Request>,
) -> Result<ResolutionGraph, ResolveError> {
let root = PubGrubPackage::Root(self.project.clone());
let mut prefetcher = BatchPrefetcher::default();
let mut state = ResolverState {
let mut state = SolveState {
pubgrub: State::init(root.clone(), MIN_VERSION.clone()),
next: root,
pins: FilePins::default(),
@@ -386,8 +412,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
Self::pre_visit(
state.pubgrub.partial_solution.prioritized_packages(),
&request_sink,
)
.await?;
)?;
}
// Choose a package version.
@@ -403,8 +428,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
return ResolutionGraph::from_state(
&selection,
&state.pins,
&self.index.packages,
&self.index.distributions,
self.index.packages(),
self.index.distributions(),
&state.pubgrub,
&self.preferences,
self.editables.clone(),
@@ -421,14 +446,12 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
.ok_or_else(|| {
PubGrubError::Failure("a package was chosen but we don't have a term.".into())
})?;
let decision = self
.choose_version(
&state.next,
term_intersection.unwrap_positive(),
&mut state.pins,
&request_sink,
)
.await?;
let decision = self.choose_version(
&state.next,
term_intersection.unwrap_positive(),
&mut state.pins,
&request_sink,
)?;
// Pick the next compatible version.
let version = match decision {
@@ -443,7 +466,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
// Check if the decision was due to the package being unavailable
if let PubGrubPackage::Package(ref package_name, _, _) = state.next {
if let Some(entry) = self.unavailable_packages.borrow().get(package_name) {
if let Some(entry) = self.unavailable_packages.get(package_name) {
state
.pubgrub
.add_incompatibility(Incompatibility::custom_term(
@@ -512,16 +535,14 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
}
};
prefetcher
.prefetch_batches(
&state.next,
&version,
term_intersection.unwrap_positive(),
&request_sink,
self.index,
&self.selector,
)
.await?;
prefetcher.prefetch_batches(
&state.next,
&version,
term_intersection.unwrap_positive(),
&request_sink,
&self.index,
&self.selector,
)?;
self.on_progress(&state.next, &version);
@@ -533,10 +554,12 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
{
// Retrieve that package's dependencies.
let package = &state.next;
let dependencies = match self
.get_dependencies(package, &version, &mut state.priorities, &request_sink)
.await?
{
let dependencies = match self.get_dependencies(
package,
&version,
&mut state.priorities,
&request_sink,
)? {
Dependencies::Unavailable(reason) => {
state
.pubgrub
@@ -590,10 +613,10 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
/// Visit a [`PubGrubPackage`] prior to selection. This should be called on a [`PubGrubPackage`]
/// before it is selected, to allow metadata to be fetched in parallel.
async fn visit_package(
fn visit_package(
&self,
package: &PubGrubPackage,
request_sink: &tokio::sync::mpsc::Sender<Request>,
request_sink: &Sender<Request>,
) -> Result<(), ResolveError> {
match package {
PubGrubPackage::Root(_) => {}
@@ -606,8 +629,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
}
// Emit a request to fetch the metadata for this package.
if self.index.packages.register(name.clone()) {
request_sink.send(Request::Package(name.clone())).await?;
if self.index.packages().register(name.clone()) {
request_sink.blocking_send(Request::Package(name.clone()))?;
}
}
PubGrubPackage::Package(name, _extra, Some(url)) => {
@@ -623,8 +646,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
// Emit a request to fetch the metadata for this distribution.
let dist = Dist::from_url(name.clone(), url.clone())?;
if self.index.distributions.register(dist.version_id()) {
request_sink.send(Request::Dist(dist)).await?;
if self.index.distributions().register(dist.version_id()) {
request_sink.blocking_send(Request::Dist(dist))?;
}
}
}
@@ -633,9 +656,9 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
/// Visit the set of [`PubGrubPackage`] candidates prior to selection. This allows us to fetch
/// metadata for all of the packages in parallel.
async fn pre_visit<'data>(
fn pre_visit<'data>(
packages: impl Iterator<Item = (&'data PubGrubPackage, &'data Range<Version>)>,
request_sink: &tokio::sync::mpsc::Sender<Request>,
request_sink: &Sender<Request>,
) -> Result<(), ResolveError> {
// Iterate over the potential packages, and fetch file metadata for any of them. These
// represent our current best guesses for the versions that we _might_ select.
@@ -643,9 +666,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
let PubGrubPackage::Package(package_name, None, None) = package else {
continue;
};
request_sink
.send(Request::Prefetch(package_name.clone(), range.clone()))
.await?;
request_sink.blocking_send(Request::Prefetch(package_name.clone(), range.clone()))?;
}
Ok(())
}
@@ -655,12 +676,12 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
///
/// Returns [None] when there are no versions in the given range.
#[instrument(skip_all, fields(%package))]
async fn choose_version(
fn choose_version(
&self,
package: &'a PubGrubPackage,
package: &PubGrubPackage,
range: &Range<Version>,
pins: &mut FilePins,
request_sink: &tokio::sync::mpsc::Sender<Request>,
request_sink: &Sender<Request>,
) -> Result<Option<ResolverVersion>, ResolveError> {
match package {
PubGrubPackage::Root(_) => Ok(Some(ResolverVersion::Available(MIN_VERSION.clone()))),
@@ -718,9 +739,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
let dist = PubGrubDistribution::from_url(package_name, url);
let response = self
.index
.distributions
.wait(&dist.version_id())
.await
.distributions()
.wait_blocking(&dist.version_id())
.ok_or(ResolveError::Unregistered)?;
// If we failed to fetch the metadata for a URL, we can't proceed.
@@ -728,26 +748,25 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
MetadataResponse::Found(archive) => &archive.metadata,
MetadataResponse::Offline => {
self.unavailable_packages
.borrow_mut()
.insert(package_name.clone(), UnavailablePackage::Offline);
return Ok(None);
}
MetadataResponse::InvalidMetadata(err) => {
self.unavailable_packages.borrow_mut().insert(
self.unavailable_packages.insert(
package_name.clone(),
UnavailablePackage::InvalidMetadata(err.to_string()),
);
return Ok(None);
}
MetadataResponse::InconsistentMetadata(err) => {
self.unavailable_packages.borrow_mut().insert(
self.unavailable_packages.insert(
package_name.clone(),
UnavailablePackage::InvalidMetadata(err.to_string()),
);
return Ok(None);
}
MetadataResponse::InvalidStructure(err) => {
self.unavailable_packages.borrow_mut().insert(
self.unavailable_packages.insert(
package_name.clone(),
UnavailablePackage::InvalidStructure(err.to_string()),
);
@@ -783,30 +802,25 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
// Wait for the metadata to be available.
let versions_response = self
.index
.packages
.wait(package_name)
.instrument(info_span!("package_wait", %package_name))
.await
.packages()
.wait_blocking(package_name)
.ok_or(ResolveError::Unregistered)?;
self.visited.borrow_mut().insert(package_name.clone());
self.visited.insert(package_name.clone());
let version_maps = match *versions_response {
VersionsResponse::Found(ref version_maps) => version_maps.as_slice(),
VersionsResponse::NoIndex => {
self.unavailable_packages
.borrow_mut()
.insert(package_name.clone(), UnavailablePackage::NoIndex);
&[]
}
VersionsResponse::Offline => {
self.unavailable_packages
.borrow_mut()
.insert(package_name.clone(), UnavailablePackage::Offline);
&[]
}
VersionsResponse::NotFound => {
self.unavailable_packages
.borrow_mut()
.insert(package_name.clone(), UnavailablePackage::NotFound);
&[]
}
@@ -820,7 +834,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
range,
version_maps,
&self.preferences,
self.installed_packages,
&self.installed_packages,
&self.exclusions,
) else {
// Short circuit: we couldn't find _any_ versions for a package.
@@ -863,9 +877,9 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
// Emit a request to fetch the metadata for this version.
if matches!(package, PubGrubPackage::Package(_, _, _)) {
if self.index.distributions.register(candidate.version_id()) {
if self.index.distributions().register(candidate.version_id()) {
let request = Request::from(dist.for_resolution());
request_sink.send(request).await?;
request_sink.blocking_send(request)?;
}
}
@@ -876,12 +890,12 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
/// Given a candidate package and version, return its dependencies.
#[instrument(skip_all, fields(%package, %version))]
async fn get_dependencies(
fn get_dependencies(
&self,
package: &PubGrubPackage,
version: &Version,
priorities: &mut PubGrubPriorities,
request_sink: &tokio::sync::mpsc::Sender<Request>,
request_sink: &Sender<Request>,
) -> Result<Dependencies, ResolveError> {
match package {
PubGrubPackage::Root(_) => {
@@ -894,7 +908,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
None,
&self.urls,
&self.locals,
self.markers,
self.markers.as_ref(),
);
let mut dependencies = match dependencies {
@@ -913,7 +927,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
priorities.insert(package, version);
// Emit a request to fetch the metadata for this package.
self.visit_package(package, request_sink).await?;
self.visit_package(package, request_sink)?;
}
// Add a dependency on each editable.
@@ -942,7 +956,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
// Add any constraints.
for constraint in self.constraints.get(&metadata.name).into_iter().flatten() {
if constraint.evaluate_markers(self.markers, &[]) {
if constraint.evaluate_markers(self.markers.as_ref(), &[]) {
let PubGrubRequirement { package, version } =
PubGrubRequirement::from_constraint(
constraint,
@@ -974,10 +988,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
// Wait for the metadata to be available.
self.index
.distributions
.wait(&version_id)
.instrument(info_span!("distributions_wait", %version_id))
.await
.distributions()
.wait_blocking(&version_id)
.ok_or(ResolveError::Unregistered)?;
}
@@ -1000,7 +1012,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
extra.as_ref(),
&self.urls,
&self.locals,
self.markers,
self.markers.as_ref(),
)?;
for (dep_package, dep_version) in dependencies.iter() {
@@ -1010,7 +1022,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
priorities.insert(dep_package, dep_version);
// Emit a request to fetch the metadata for this package.
self.visit_package(dep_package, request_sink).await?;
self.visit_package(dep_package, request_sink)?;
}
return Ok(Dependencies::Available(dependencies.into()));
@@ -1024,11 +1036,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
let version_id = dist.version_id();
// If the package does not exist in the registry or locally, we cannot fetch its dependencies
if self
.unavailable_packages
.borrow()
.get(package_name)
.is_some()
if self.unavailable_packages.get(package_name).is_some()
&& self
.installed_packages
.get_packages(package_name)
@@ -1046,30 +1054,24 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
// Wait for the metadata to be available.
let response = self
.index
.distributions
.wait(&version_id)
.instrument(info_span!("distributions_wait", %version_id))
.await
.distributions()
.wait_blocking(&version_id)
.ok_or(ResolveError::Unregistered)?;
let metadata = match &*response {
MetadataResponse::Found(archive) => &archive.metadata,
MetadataResponse::Offline => {
self.incomplete_packages
.borrow_mut()
.entry(package_name.clone())
.or_default()
.borrow_mut()
.insert(version.clone(), IncompletePackage::Offline);
return Ok(Dependencies::Unavailable(UnavailableVersion::Offline));
}
MetadataResponse::InvalidMetadata(err) => {
warn!("Unable to extract metadata for {package_name}: {err}");
self.incomplete_packages
.borrow_mut()
.entry(package_name.clone())
.or_default()
.borrow_mut()
.insert(
version.clone(),
IncompletePackage::InvalidMetadata(err.to_string()),
@@ -1081,10 +1083,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
MetadataResponse::InconsistentMetadata(err) => {
warn!("Unable to extract metadata for {package_name}: {err}");
self.incomplete_packages
.borrow_mut()
.entry(package_name.clone())
.or_default()
.borrow_mut()
.insert(
version.clone(),
IncompletePackage::InconsistentMetadata(err.to_string()),
@@ -1096,10 +1096,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
MetadataResponse::InvalidStructure(err) => {
warn!("Unable to extract metadata for {package_name}: {err}");
self.incomplete_packages
.borrow_mut()
.entry(package_name.clone())
.or_default()
.borrow_mut()
.insert(
version.clone(),
IncompletePackage::InvalidStructure(err.to_string()),
@@ -1124,7 +1122,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
extra.as_ref(),
&self.urls,
&self.locals,
self.markers,
self.markers.as_ref(),
)?;
for (dep_package, dep_version) in dependencies.iter() {
@ -1134,7 +1132,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
priorities.insert(dep_package, dep_version);
// Emit a request to fetch the metadata for this package.
self.visit_package(dep_package, request_sink).await?;
self.visit_package(dep_package, request_sink)?;
}
Ok(Dependencies::Available(dependencies.into()))
@@ -1155,12 +1153,13 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
}
/// Fetch the metadata for a stream of packages and versions.
async fn fetch(
&self,
request_stream: tokio::sync::mpsc::Receiver<Request>,
async fn fetch<Provider: ResolverProvider>(
self: Arc<Self>,
provider: Arc<Provider>,
request_stream: Receiver<Request>,
) -> Result<(), ResolveError> {
let mut response_stream = ReceiverStream::new(request_stream)
.map(|request| self.process_request(request).boxed_local())
.map(|request| self.process_request(request, &*provider).boxed_local())
// Allow as many futures as possible to start in the background.
// Backpressure is provided at a more granular level by `DistributionDatabase`
// and `SourceDispatch`, as well as the bounded request channel.
@@ -1170,13 +1169,15 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
match response? {
Some(Response::Package(package_name, version_map)) => {
trace!("Received package metadata for: {package_name}");
self.index.packages.done(package_name, Rc::new(version_map));
self.index
.packages()
.done(package_name, Arc::new(version_map));
}
Some(Response::Installed { dist, metadata }) => {
trace!("Received installed distribution metadata for: {dist}");
self.index.distributions.done(
self.index.distributions().done(
dist.version_id(),
Rc::new(MetadataResponse::Found(ArchiveMetadata::from(metadata))),
Arc::new(MetadataResponse::Found(ArchiveMetadata::from(metadata))),
);
}
Some(Response::Dist {
@@ -1194,8 +1195,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
_ => {}
}
self.index
.distributions
.done(dist.version_id(), Rc::new(metadata));
.distributions()
.done(dist.version_id(), Arc::new(metadata));
}
Some(Response::Dist {
dist: Dist::Source(dist),
@@ -1212,8 +1213,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
_ => {}
}
self.index
.distributions
.done(dist.version_id(), Rc::new(metadata));
.distributions()
.done(dist.version_id(), Arc::new(metadata));
}
None => {}
}
@@ -1223,12 +1224,15 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
}
#[instrument(skip_all, fields(%request))]
async fn process_request(&self, request: Request) -> Result<Option<Response>, ResolveError> {
async fn process_request<Provider: ResolverProvider>(
&self,
request: Request,
provider: &Provider,
) -> Result<Option<Response>, ResolveError> {
match request {
// Fetch package metadata from the registry.
Request::Package(package_name) => {
let package_versions = self
.provider
let package_versions = provider
.get_package_versions(&package_name)
.boxed_local()
.await
@ -1239,8 +1243,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
// Fetch distribution metadata from the distribution database.
Request::Dist(dist) => {
let metadata = self
.provider
let metadata = provider
.get_or_build_wheel_metadata(&dist)
.boxed_local()
.await
@@ -1274,7 +1277,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
// Wait for the package metadata to become available.
let versions_response = self
.index
.packages
.packages()
.wait(&package_name)
.await
.ok_or(ResolveError::Unregistered)?;
@@ -1284,21 +1287,18 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
// Short-circuit if we did not find any versions for the package
VersionsResponse::NoIndex => {
self.unavailable_packages
.borrow_mut()
.insert(package_name.clone(), UnavailablePackage::NoIndex);
return Ok(None);
}
VersionsResponse::Offline => {
self.unavailable_packages
.borrow_mut()
.insert(package_name.clone(), UnavailablePackage::Offline);
return Ok(None);
}
VersionsResponse::NotFound => {
self.unavailable_packages
.borrow_mut()
.insert(package_name.clone(), UnavailablePackage::NotFound);
return Ok(None);
@@ -1312,7 +1312,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
&range,
version_map,
&self.preferences,
self.installed_packages,
&self.installed_packages,
&self.exclusions,
) else {
return Ok(None);
@@ -1324,13 +1324,12 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
};
// Emit a request to fetch the metadata for this version.
if self.index.distributions.register(candidate.version_id()) {
if self.index.distributions().register(candidate.version_id()) {
let dist = dist.for_resolution().to_owned();
let response = match dist {
ResolvedDist::Installable(dist) => {
let metadata = self
.provider
let metadata = provider
.get_or_build_wheel_metadata(&dist)
.boxed_local()
.await
@@ -1394,7 +1393,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide
/// State that is used during unit propagation in the resolver.
#[derive(Clone)]
struct ResolverState {
struct SolveState {
/// The internal state used by the resolver.
///
/// Note that not all parts of this state are strictly internal. For

---

@@ -145,7 +145,7 @@ async fn resolve(
&index,
&hashes,
&build_context,
&installed_packages,
installed_packages,
DistributionDatabase::new(&client, &build_context, concurrency.downloads),
)?;
Ok(resolver.resolve().await?)

---

@@ -128,12 +128,13 @@ pub trait SourceBuildTrait {
}
/// A wrapper for [`uv_installer::SitePackages`]
pub trait InstalledPackagesProvider {
pub trait InstalledPackagesProvider: Clone + Send + Sync + 'static {
fn iter(&self) -> impl Iterator<Item = &InstalledDist>;
fn get_packages(&self, name: &PackageName) -> Vec<&InstalledDist>;
}
/// An [`InstalledPackagesProvider`] with no packages in it.
#[derive(Clone)]
pub struct EmptyInstalledPackages;
impl InstalledPackagesProvider for EmptyInstalledPackages {

---

@@ -546,7 +546,7 @@ pub(crate) async fn pip_compile(
&top_level_index,
&hasher,
&build_dispatch,
&EmptyInstalledPackages,
EmptyInstalledPackages,
DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads),
)?
.with_reporter(ResolverReporter::from(printer));

---

@@ -43,7 +43,7 @@ impl ResolvedEditables {
#[allow(clippy::too_many_arguments)]
pub(crate) async fn resolve(
editables: Vec<EditableRequirement>,
site_packages: &SitePackages<'_>,
site_packages: &SitePackages,
reinstall: &Reinstall,
hasher: &HashStrategy,
interpreter: &Interpreter,

---

@@ -427,7 +427,7 @@ pub(crate) async fn pip_install(
project,
&editables,
&hasher,
&site_packages,
site_packages.clone(),
&reinstall,
&upgrade,
&interpreter,
@@ -574,7 +574,7 @@ async fn resolve(
project: Option<PackageName>,
editables: &[ResolvedEditable],
hasher: &HashStrategy,
site_packages: &SitePackages<'_>,
site_packages: SitePackages,
reinstall: &Reinstall,
upgrade: &Upgrade,
interpreter: &Interpreter,
@@ -733,7 +733,7 @@ async fn resolve(
async fn install(
resolution: &Resolution,
editables: &[ResolvedEditable],
site_packages: SitePackages<'_>,
site_packages: SitePackages,
reinstall: &Reinstall,
no_binary: &NoBinary,
link_mode: LinkMode,

---

@@ -381,7 +381,7 @@ pub(crate) async fn pip_sync(
&hasher,
&build_dispatch,
// TODO(zanieb): We should consider support for installed packages in pip sync
&EmptyInstalledPackages,
EmptyInstalledPackages,
DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads),
)?
.with_reporter(reporter);

---

@@ -111,7 +111,7 @@ pub(crate) async fn lock(
// Resolve the requirements.
let resolution = project::resolve(
spec,
&EmptyInstalledPackages,
EmptyInstalledPackages,
&hasher,
&interpreter,
tags,

---

@@ -115,7 +115,7 @@ pub(crate) fn init(
#[allow(clippy::too_many_arguments)]
pub(crate) async fn resolve<InstalledPackages: InstalledPackagesProvider>(
spec: RequirementsSpecification,
installed_packages: &InstalledPackages,
installed_packages: InstalledPackages,
hasher: &HashStrategy,
interpreter: &Interpreter,
tags: &Tags,
@@ -242,7 +242,7 @@ pub(crate) async fn resolve<InstalledPackages: InstalledPackagesProvider>(
#[allow(clippy::too_many_arguments)]
pub(crate) async fn install(
resolution: &Resolution,
site_packages: SitePackages<'_>,
site_packages: SitePackages,
no_binary: &NoBinary,
link_mode: LinkMode,
index_urls: &IndexLocations,

---

@@ -292,7 +292,7 @@ async fn update_environment(
// Resolve the requirements.
let resolution = match project::resolve(
spec,
&site_packages,
site_packages.clone(),
&hasher,
&interpreter,
tags,