From 94cf60457466d1234069c52b19e1bab545129c99 Mon Sep 17 00:00:00 2001
From: Ibraheem Ahmed
Date: Mon, 6 May 2024 22:30:43 -0400
Subject: [PATCH] Remove unnecessary uses of `DashMap` and `Arc` (#3413)

## Summary

All of the resolver code is run on the main thread, so a lot of the `Send`
bounds and uses of `DashMap` and `Arc` are unnecessary. We could also switch
to single-threaded versions of `Mutex` and `Notify` in some places, but there
isn't really a crate providing those that I would be comfortable using.

The `Arc` in `OnceMap` can't easily be removed because of the uv-auth code,
which uses the [reqwest-middleware](https://docs.rs/reqwest-middleware/latest/reqwest_middleware/trait.Middleware.html)
crate; that crate seems to add unnecessary `Send` bounds because of
`async-trait`. We could duplicate the code and create a `OnceMapLocal`
variant, but I don't feel that's worth it.
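As a standalone sketch, this is the single-threaded shape the resolver state moves to. It mirrors the `SharedMap` alias added to `crates/uv-resolver/src/resolver/mod.rs` below; the package name and reason strings are purely illustrative:

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

// The alias introduced in `crates/uv-resolver/src/resolver/mod.rs`.
type SharedMap<K, V> = Rc<RefCell<HashMap<K, V>>>;

fn main() {
    let unavailable: SharedMap<String, String> = SharedMap::default();

    // A short-lived mutable borrow replaces `DashMap`'s internal sharded locks.
    unavailable
        .borrow_mut()
        .insert("example-package".into(), "not found".into());

    // Reads take an immutable borrow; nothing here needs `Send` or `Sync`.
    if let Some(reason) = unavailable.borrow().get("example-package") {
        println!("unavailable: {reason}");
    }
}
```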
---
 crates/uv-build/src/lib.rs                    |  8 +-
 crates/uv-client/src/cached_client.rs         | 32 ++++---
 crates/uv-client/src/flat_index.rs            |  2 +-
 crates/uv-client/src/registry_client.rs       |  4 +-
 crates/uv-dispatch/src/lib.rs                 |  2 +-
 .../src/distribution_database.rs              | 15 ++--
 crates/uv-distribution/src/locks.rs           |  8 +-
 crates/uv-distribution/src/source/mod.rs      | 54 +++++------
 crates/uv-installer/src/downloader.rs         |  8 +-
 crates/uv-requirements/src/lookahead.rs       |  4 +-
 crates/uv-requirements/src/source_tree.rs     |  4 +-
 crates/uv-requirements/src/unnamed.rs         |  4 +-
 crates/uv-resolver/src/error.rs               | 27 +++---
 crates/uv-resolver/src/resolution.rs          |  6 +-
 crates/uv-resolver/src/resolver/index.rs      | 14 +--
 crates/uv-resolver/src/resolver/mod.rs        | 89 +++++++++++--------
 crates/uv-resolver/src/resolver/provider.rs   | 14 ++-
 crates/uv-types/src/traits.rs                 | 15 ++--
 18 files changed, 165 insertions(+), 145 deletions(-)

diff --git a/crates/uv-build/src/lib.rs b/crates/uv-build/src/lib.rs
index 488bc23fe..32ae5075d 100644
--- a/crates/uv-build/src/lib.rs
+++ b/crates/uv-build/src/lib.rs
@@ -7,8 +7,8 @@ use std::fmt::{Display, Formatter};
 use std::io;
 use std::path::{Path, PathBuf};
 use std::process::{ExitStatus, Output};
+use std::rc::Rc;
 use std::str::FromStr;
-use std::sync::Arc;
 use std::{env, iter};
 
 use fs_err as fs;
@@ -335,13 +335,13 @@ impl Pep517Backend {
     }
 }
 
-/// Uses an [`Arc`] internally, clone freely.
+/// Uses an [`Rc`] internally, clone freely.
 #[derive(Debug, Default, Clone)]
 pub struct SourceBuildContext {
     /// An in-memory resolution of the default backend's requirements for PEP 517 builds.
-    default_resolution: Arc<Mutex<Option<Resolution>>>,
+    default_resolution: Rc<Mutex<Option<Resolution>>>,
     /// An in-memory resolution of the build requirements for `--legacy-setup-py` builds.
-    setup_py_resolution: Arc<Mutex<Option<Resolution>>>,
+    setup_py_resolution: Rc<Mutex<Option<Resolution>>>,
 }
 
 /// Holds the state through a series of PEP 517 frontend to backend calls or a single setup.py
diff --git a/crates/uv-client/src/cached_client.rs b/crates/uv-client/src/cached_client.rs
index 672785328..2cb65431b 100644
--- a/crates/uv-client/src/cached_client.rs
+++ b/crates/uv-client/src/cached_client.rs
@@ -30,7 +30,7 @@ use crate::{
 /// `CachedClient::get_cacheable`. If your types fit into the
 /// `rkyvutil::OwnedArchive` mold, then an implementation of `Cacheable` is
 /// already provided for that type.
-pub trait Cacheable: Sized + Send {
+pub trait Cacheable: Sized {
     /// This associated type permits customizing what the "output" type of
     /// deserialization is. It can be identical to `Self`.
     ///
@@ -54,7 +54,7 @@ pub struct SerdeCacheable<T> {
     inner: T,
 }
 
-impl<T: Serialize + DeserializeOwned + Send> Cacheable for SerdeCacheable<T> {
+impl<T: Serialize + DeserializeOwned> Cacheable for SerdeCacheable<T> {
     type Target = T;
 
     fn from_aligned_bytes(bytes: AlignedVec) -> Result<T, Error> {
@@ -75,7 +75,7 @@ impl<T: Serialize + DeserializeOwned> Cacheable for SerdeCacheable<T> {
 /// All `OwnedArchive` values are cacheable.
 impl<A> Cacheable for OwnedArchive<A>
 where
-    A: rkyv::Archive + rkyv::Serialize<Serializer<4096>> + Send,
+    A: rkyv::Archive + rkyv::Serialize<Serializer<4096>>,
     A::Archived: for<'a> rkyv::CheckBytes<Validator<'a>>
         + rkyv::Deserialize<A, SharedDeserializeMap>,
 {
@@ -179,7 +179,7 @@ impl CachedClient {
     /// allowed to make subsequent requests, e.g. through the uncached client.
     #[instrument(skip_all)]
     pub async fn get_serde<
-        Payload: Serialize + DeserializeOwned + Send + 'static,
+        Payload: Serialize + DeserializeOwned + 'static,
         CallBackError,
         Callback,
         CallbackReturn,
     >(
@@ -191,8 +191,8 @@ impl CachedClient {
         response_callback: Callback,
     ) -> Result<Payload, CachedClientError<CallBackError>>
     where
-        Callback: FnOnce(Response) -> CallbackReturn + Send,
-        CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
+        Callback: FnOnce(Response) -> CallbackReturn,
+        CallbackReturn: Future<Output = Result<Payload, CallBackError>>,
     {
         let payload = self
             .get_cacheable(req, cache_entry, cache_control, move |resp| async {
@@ -225,11 +225,15 @@ impl CachedClient {
     ) -> Result<Payload::Target, CachedClientError<CallBackError>>
     where
         Callback: FnOnce(Response) -> CallbackReturn,
-        CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
+        CallbackReturn: Future<Output = Result<Payload, CallBackError>>,
     {
         let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
         let cached_response = match Self::read_cache(cache_entry).await {
-            Some(cached) => self.send_cached(req, cache_control, cached).boxed().await?,
+            Some(cached) => {
+                self.send_cached(req, cache_control, cached)
+                    .boxed_local()
+                    .await?
+            }
             None => {
                 debug!("No cache entry for: {}", req.url());
                 let (response, cache_policy) = self.fresh_request(req).await?;
@@ -301,7 +305,7 @@ impl CachedClient {
 
     /// Make a request without checking whether the cache is fresh.
     pub async fn skip_cache<
-        Payload: Serialize + DeserializeOwned + Send + 'static,
+        Payload: Serialize + DeserializeOwned + 'static,
         CallBackError,
         Callback,
         CallbackReturn,
     >(
@@ -312,8 +316,8 @@ impl CachedClient {
         response_callback: Callback,
     ) -> Result<Payload, CachedClientError<CallBackError>>
     where
-        Callback: FnOnce(Response) -> CallbackReturn + Send,
-        CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
+        Callback: FnOnce(Response) -> CallbackReturn,
+        CallbackReturn: Future<Output = Result<Payload, CallBackError>>,
     {
         let (response, cache_policy) = self.fresh_request(req).await?;
 
@@ -335,7 +339,7 @@ impl CachedClient {
     ) -> Result<Payload::Target, CachedClientError<CallBackError>>
     where
         Callback: FnOnce(Response) -> CallbackReturn,
-        CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
+        CallbackReturn: Future<Output = Result<Payload, CallBackError>>,
     {
         let _ = fs_err::tokio::remove_file(&cache_entry.path()).await;
         let (response, cache_policy) = self.fresh_request(req).await?;
@@ -352,11 +356,11 @@ impl CachedClient {
     ) -> Result<Payload::Target, CachedClientError<CallBackError>>
     where
         Callback: FnOnce(Response) -> CallbackReturn,
-        CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
+        CallbackReturn: Future<Output = Result<Payload, CallBackError>>,
     {
         let new_cache = info_span!("new_cache", file = %cache_entry.path().display());
         let data = response_callback(response)
-            .boxed()
+            .boxed_local()
             .await
             .map_err(|err| CachedClientError::Callback(err))?;
         let Some(cache_policy) = cache_policy else {
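The `.boxed()` → `.boxed_local()` swaps here (and throughout the diff) are the mechanical consequence of dropping the `Send` bounds: `FutureExt::boxed` returns `Pin<Box<dyn Future + Send>>` and so only compiles for `Send` futures, while `boxed_local` erases the type without that bound. A minimal sketch using only the `futures` crate:

```rust
use std::rc::Rc;

use futures::executor::block_on;
use futures::future::{BoxFuture, FutureExt, LocalBoxFuture};

// `BoxFuture` is `Pin<Box<dyn Future<Output = T> + Send>>`, so `.boxed()`
// requires the underlying future to be `Send`.
fn sendable() -> BoxFuture<'static, usize> {
    async { 1 }.boxed()
}

// This future captures an `Rc`, so it is not `Send`: `.boxed()` would not
// compile here, but `.boxed_local()` (no `Send` in the trait object) does.
fn local_only() -> LocalBoxFuture<'static, usize> {
    let count = Rc::new(1);
    async move { *count }.boxed_local()
}

fn main() {
    assert_eq!(block_on(sendable()), 1);
    assert_eq!(block_on(local_only()), 1);
}
```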
diff --git a/crates/uv-client/src/flat_index.rs b/crates/uv-client/src/flat_index.rs
index a468cc640..1973396b9 100644
--- a/crates/uv-client/src/flat_index.rs
+++ b/crates/uv-client/src/flat_index.rs
@@ -166,7 +166,7 @@ impl<'a> FlatIndexClient<'a> {
                 .collect();
             Ok::<Vec<File>, CachedClientError<Error>>(files)
         }
-        .boxed()
+        .boxed_local()
         .instrument(info_span!("parse_flat_index_html", url = % url))
     };
     let response = self
diff --git a/crates/uv-client/src/registry_client.rs b/crates/uv-client/src/registry_client.rs
index 69613b529..3b0538df3 100644
--- a/crates/uv-client/src/registry_client.rs
+++ b/crates/uv-client/src/registry_client.rs
@@ -345,7 +345,7 @@ impl RegistryClient {
             };
             OwnedArchive::from_unarchived(&unarchived)
         }
-        .boxed()
+        .boxed_local()
         .instrument(info_span!("parse_simple_api", package = %package_name))
     };
     let result = self
@@ -534,7 +534,7 @@ impl RegistryClient {
                 })?;
             Ok::<Metadata23, CachedClientError<Error>>(metadata)
         }
-        .boxed()
+        .boxed_local()
         .instrument(info_span!("read_metadata_range_request", wheel = %filename))
     };
diff --git a/crates/uv-dispatch/src/lib.rs b/crates/uv-dispatch/src/lib.rs
index 3cfbe9dd8..15e1952c6 100644
--- a/crates/uv-dispatch/src/lib.rs
+++ b/crates/uv-dispatch/src/lib.rs
@@ -314,7 +314,7 @@ impl<'a> BuildContext for BuildDispatch<'a> {
             build_kind,
             self.build_extra_env_vars.clone(),
         )
-        .boxed()
+        .boxed_local()
         .await?;
         Ok(builder)
     }
diff --git a/crates/uv-distribution/src/distribution_database.rs b/crates/uv-distribution/src/distribution_database.rs
index 67bc80cb5..c97f9bdce 100644
--- a/crates/uv-distribution/src/distribution_database.rs
+++ b/crates/uv-distribution/src/distribution_database.rs
@@ -1,5 +1,6 @@
 use std::io;
 use std::path::Path;
+use std::rc::Rc;
 use std::sync::Arc;
 
 use futures::{FutureExt, TryStreamExt};
@@ -41,20 +42,20 @@ use crate::{ArchiveMetadata, Error, LocalWheel, Reporter, SourceDistributionBuil
 ///
 /// This struct also has the task of acquiring locks around source dist builds in general and git
 /// operation especially.
-pub struct DistributionDatabase<'a, Context: BuildContext + Send + Sync> {
+pub struct DistributionDatabase<'a, Context: BuildContext> {
     client: &'a RegistryClient,
     build_context: &'a Context,
     builder: SourceDistributionBuilder<'a, Context>,
-    locks: Arc<Locks>,
+    locks: Rc<Locks>,
 }
 
-impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> {
+impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
     pub fn new(client: &'a RegistryClient, build_context: &'a Context) -> Self {
         Self {
             client,
             build_context,
             builder: SourceDistributionBuilder::new(client, build_context),
-            locks: Arc::new(Locks::default()),
+            locks: Rc::new(Locks::default()),
         }
     }
 
@@ -307,7 +308,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
         let built_wheel = self
             .builder
             .download_and_build(&BuildableSource::Dist(dist), tags, hashes)
-            .boxed()
+            .boxed_local()
             .await?;
 
         // If the wheel was unzipped previously, respect it. Source distributions are
@@ -360,7 +361,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
             return Ok(ArchiveMetadata { metadata, hashes });
         }
 
-        match self.client.wheel_metadata(dist).boxed().await {
+        match self.client.wheel_metadata(dist).boxed_local().await {
             Ok(metadata) => Ok(ArchiveMetadata::from(metadata)),
             Err(err) if err.is_http_streaming_unsupported() => {
                 warn!("Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})");
@@ -404,7 +405,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
         let metadata = self
             .builder
             .download_and_build_metadata(source, hashes)
-            .boxed()
+            .boxed_local()
             .await?;
         Ok(metadata)
     }
diff --git a/crates/uv-distribution/src/locks.rs b/crates/uv-distribution/src/locks.rs
index 90f72d5b0..0e3deda4a 100644
--- a/crates/uv-distribution/src/locks.rs
+++ b/crates/uv-distribution/src/locks.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::rc::Rc;
 
 use rustc_hash::FxHashMap;
 use tokio::sync::Mutex;
@@ -7,14 +7,14 @@ use distribution_types::{Identifier, ResourceId};
 
 /// A set of locks used to prevent concurrent access to the same resource.
 #[derive(Debug, Default)]
-pub(crate) struct Locks(Mutex<FxHashMap<ResourceId, Arc<Mutex<()>>>>);
+pub(crate) struct Locks(Mutex<FxHashMap<ResourceId, Rc<Mutex<()>>>>);
 
 impl Locks {
     /// Acquire a lock on the given resource.
-    pub(crate) async fn acquire(&self, dist: &impl Identifier) -> Arc<Mutex<()>> {
+    pub(crate) async fn acquire(&self, dist: &impl Identifier) -> Rc<Mutex<()>> {
         let mut map = self.0.lock().await;
         map.entry(dist.resource_id())
-            .or_insert_with(|| Arc::new(Mutex::new(())))
+            .or_insert_with(|| Rc::new(Mutex::new(())))
             .clone()
     }
 }
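For context, the per-resource locks keep using `tokio::sync::Mutex` (an async mutex is still needed so a task yields while another build holds the lock), but the handle becomes `Rc`. A runnable sketch of the same shape — `ResourceId` is simplified to a `String` here:

```rust
use std::collections::HashMap;
use std::rc::Rc;

use tokio::sync::Mutex;

// `ResourceId` stands in for the real identifier type in this sketch.
type ResourceId = String;

// The shape of `Locks` after this change: an async `Mutex` still guards the
// map, but the per-resource handles are `Rc` rather than `Arc`.
#[derive(Default)]
struct Locks(Mutex<HashMap<ResourceId, Rc<Mutex<()>>>>);

impl Locks {
    async fn acquire(&self, id: &str) -> Rc<Mutex<()>> {
        let mut map = self.0.lock().await;
        map.entry(id.to_string())
            .or_insert_with(|| Rc::new(Mutex::new(())))
            .clone()
    }
}

fn main() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap();
    rt.block_on(async {
        let locks = Locks::default();
        let lock = locks.acquire("some-source-dist").await;
        let _guard = lock.lock().await;
        // Exclusive section for this resource. `Rc` is fine because the
        // future is driven with `block_on`, never handed to `tokio::spawn`
        // (which would require `Send`).
    });
}
```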
diff --git a/crates/uv-distribution/src/source/mod.rs b/crates/uv-distribution/src/source/mod.rs
index b5fc03507..92bc2cfc7 100644
--- a/crates/uv-distribution/src/source/mod.rs
+++ b/crates/uv-distribution/src/source/mod.rs
@@ -116,7 +116,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                     tags,
                     hashes,
                 )
-                .boxed()
+                .boxed_local()
                 .await;
             }
         };
@@ -130,7 +130,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                     tags,
                     hashes,
                 )
-                .boxed()
+                .boxed_local()
                 .await?
             }
             BuildableSource::Dist(SourceDist::DirectUrl(dist)) => {
@@ -153,18 +153,18 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                     tags,
                     hashes,
                 )
-                .boxed()
+                .boxed_local()
                 .await?
             }
             BuildableSource::Dist(SourceDist::Git(dist)) => {
                 self.git(source, &GitSourceUrl::from(dist), tags, hashes)
-                    .boxed()
+                    .boxed_local()
                     .await?
             }
             BuildableSource::Dist(SourceDist::Path(dist)) => {
                 if dist.path.is_dir() {
                     self.source_tree(source, &PathSourceUrl::from(dist), tags, hashes)
-                        .boxed()
+                        .boxed_local()
                         .await?
                 } else {
                     let cache_shard = self
@@ -178,7 +178,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                         tags,
                         hashes,
                     )
-                    .boxed()
+                    .boxed_local()
                     .await?
                 }
             }
@@ -205,16 +205,18 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                     tags,
                     hashes,
                 )
-                .boxed()
+                .boxed_local()
                 .await?
             }
             BuildableSource::Url(SourceUrl::Git(resource)) => {
-                self.git(source, resource, tags, hashes).boxed().await?
+                self.git(source, resource, tags, hashes)
+                    .boxed_local()
+                    .await?
             }
             BuildableSource::Url(SourceUrl::Path(resource)) => {
                 if resource.path.is_dir() {
                     self.source_tree(source, resource, tags, hashes)
-                        .boxed()
+                        .boxed_local()
                         .await?
                 } else {
                     let cache_shard = self.build_context.cache().shard(
@@ -222,7 +224,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                         WheelCache::Path(resource.url).root(),
                     );
                     self.archive(source, resource, &cache_shard, tags, hashes)
-                        .boxed()
+                        .boxed_local()
                         .await?
                 }
             }
@@ -268,7 +270,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                     &cache_shard,
                     hashes,
                 )
-                .boxed()
+                .boxed_local()
                 .await;
             }
         };
@@ -281,7 +283,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                     None,
                     hashes,
                 )
-                .boxed()
+                .boxed_local()
                 .await?
             }
             BuildableSource::Dist(SourceDist::DirectUrl(dist)) => {
@@ -303,18 +305,18 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                     subdirectory.as_deref(),
                     hashes,
                 )
-                .boxed()
+                .boxed_local()
                 .await?
             }
             BuildableSource::Dist(SourceDist::Git(dist)) => {
                 self.git_metadata(source, &GitSourceUrl::from(dist), hashes)
-                    .boxed()
+                    .boxed_local()
                     .await?
             }
             BuildableSource::Dist(SourceDist::Path(dist)) => {
                 if dist.path.is_dir() {
                     self.source_tree_metadata(source, &PathSourceUrl::from(dist), hashes)
-                        .boxed()
+                        .boxed_local()
                         .await?
                 } else {
                     let cache_shard = self
@@ -322,7 +324,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                         .cache()
                         .shard(CacheBucket::BuiltWheels, WheelCache::Path(&dist.url).root());
                     self.archive_metadata(source, &PathSourceUrl::from(dist), &cache_shard, hashes)
-                        .boxed()
+                        .boxed_local()
                         .await?
                 }
             }
@@ -348,16 +350,18 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                     subdirectory.as_deref(),
                     hashes,
                 )
-                .boxed()
+                .boxed_local()
                 .await?
             }
             BuildableSource::Url(SourceUrl::Git(resource)) => {
-                self.git_metadata(source, resource, hashes).boxed().await?
+                self.git_metadata(source, resource, hashes)
+                    .boxed_local()
+                    .await?
             }
             BuildableSource::Url(SourceUrl::Path(resource)) => {
                 if resource.path.is_dir() {
                     self.source_tree_metadata(source, resource, hashes)
-                        .boxed()
+                        .boxed_local()
                         .await?
                 } else {
                     let cache_shard = self.build_context.cache().shard(
@@ -365,7 +369,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
                         WheelCache::Path(resource.url).root(),
                     );
                     self.archive_metadata(source, resource, &cache_shard, hashes)
-                        .boxed()
+                        .boxed_local()
                         .await?
                 }
             }
@@ -488,7 +492,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         // If the backend supports `prepare_metadata_for_build_wheel`, use it.
         if let Some(metadata) = self
             .build_metadata(source, source_dist_entry.path(), subdirectory)
-            .boxed()
+            .boxed_local()
             .await?
         {
             // Store the metadata.
@@ -569,7 +573,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
             Ok(revision.with_hashes(hashes))
         }
-        .boxed()
+        .boxed_local()
         .instrument(info_span!("download", source_dist = %source))
     };
     let req = self.request(url.clone())?;
@@ -706,7 +710,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         // If the backend supports `prepare_metadata_for_build_wheel`, use it.
         if let Some(metadata) = self
             .build_metadata(source, source_entry.path(), None)
-            .boxed()
+            .boxed_local()
             .await?
         {
             // Store the metadata.
@@ -903,7 +907,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         // If the backend supports `prepare_metadata_for_build_wheel`, use it.
         if let Some(metadata) = self
             .build_metadata(source, &resource.path, None)
-            .boxed()
+            .boxed_local()
             .await?
         {
             // Store the metadata.
@@ -1110,7 +1114,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         // If the backend supports `prepare_metadata_for_build_wheel`, use it.
         if let Some(metadata) = self
             .build_metadata(source, fetch.path(), subdirectory.as_deref())
-            .boxed()
+            .boxed_local()
             .await?
         {
             // Store the metadata.
diff --git a/crates/uv-installer/src/downloader.rs b/crates/uv-installer/src/downloader.rs
index e83d0bec8..746355638 100644
--- a/crates/uv-installer/src/downloader.rs
+++ b/crates/uv-installer/src/downloader.rs
@@ -37,7 +37,7 @@ pub enum Error {
 }
 
 /// Download, build, and unzip a set of distributions.
-pub struct Downloader<'a, Context: BuildContext + Send + Sync> {
+pub struct Downloader<'a, Context: BuildContext> {
     tags: &'a Tags,
     cache: &'a Cache,
     hashes: &'a HashStrategy,
@@ -45,7 +45,7 @@ pub struct Downloader<'a, Context: BuildContext + Send + Sync> {
     reporter: Option<Arc<dyn Reporter>>,
 }
 
-impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
+impl<'a, Context: BuildContext> Downloader<'a, Context> {
     pub fn new(
         cache: &'a Cache,
         tags: &'a Tags,
@@ -83,7 +83,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
     ) -> impl Stream<Item = Result<CachedDist, Error>> + 'stream {
         futures::stream::iter(distributions)
             .map(|dist| async {
-                let wheel = self.get_wheel(dist, in_flight).boxed().await?;
+                let wheel = self.get_wheel(dist, in_flight).boxed_local().await?;
                 if let Some(reporter) = self.reporter.as_ref() {
                     reporter.on_progress(&wheel);
                 }
@@ -174,7 +174,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
         let result = self
             .database
             .get_or_build_wheel(&dist, self.tags, policy)
-            .boxed()
+            .boxed_local()
             .map_err(|err| Error::Fetch(dist.clone(), err))
             .await
             .and_then(|wheel: LocalWheel| {
diff --git a/crates/uv-requirements/src/lookahead.rs b/crates/uv-requirements/src/lookahead.rs
index 49cb86ae3..bb96a790c 100644
--- a/crates/uv-requirements/src/lookahead.rs
+++ b/crates/uv-requirements/src/lookahead.rs
@@ -45,7 +45,7 @@ pub enum LookaheadError {
 /// possible because a direct URL points to a _specific_ version of a package, and so we know that
 /// any correct resolution will _have_ to include it (unlike with PyPI dependencies, which may
 /// require a range of versions and backtracking).
-pub struct LookaheadResolver<'a, Context: BuildContext + Send + Sync> {
+pub struct LookaheadResolver<'a, Context: BuildContext> {
     /// The direct requirements for the project.
     requirements: &'a [Requirement],
     /// The constraints for the project.
@@ -62,7 +62,7 @@ pub struct LookaheadResolver<'a, Context: BuildContext + Send + Sync> {
     database: DistributionDatabase<'a, Context>,
 }
 
-impl<'a, Context: BuildContext + Send + Sync> LookaheadResolver<'a, Context> {
+impl<'a, Context: BuildContext> LookaheadResolver<'a, Context> {
     /// Instantiate a new [`LookaheadResolver`] for a given set of requirements.
     #[allow(clippy::too_many_arguments)]
     pub fn new(
diff --git a/crates/uv-requirements/src/source_tree.rs b/crates/uv-requirements/src/source_tree.rs
index 0f2f0b76a..6da664f82 100644
--- a/crates/uv-requirements/src/source_tree.rs
+++ b/crates/uv-requirements/src/source_tree.rs
@@ -21,7 +21,7 @@ use crate::ExtrasSpecification;
 ///
 /// Used, e.g., to determine the input requirements when a user specifies a `pyproject.toml`
 /// file, which may require running PEP 517 build hooks to extract metadata.
-pub struct SourceTreeResolver<'a, Context: BuildContext + Send + Sync> {
+pub struct SourceTreeResolver<'a, Context: BuildContext> {
     /// The requirements for the project.
     source_trees: Vec<PathBuf>,
     /// The extras to include when resolving requirements.
@@ -34,7 +34,7 @@ pub struct SourceTreeResolver<'a, Context: BuildContext + Send + Sync> {
     database: DistributionDatabase<'a, Context>,
 }
 
-impl<'a, Context: BuildContext + Send + Sync> SourceTreeResolver<'a, Context> {
+impl<'a, Context: BuildContext> SourceTreeResolver<'a, Context> {
     /// Instantiate a new [`SourceTreeResolver`] for a given set of `source_trees`.
     pub fn new(
         source_trees: Vec<PathBuf>,
diff --git a/crates/uv-requirements/src/unnamed.rs b/crates/uv-requirements/src/unnamed.rs
index c20b9274e..4ef0f5aff 100644
--- a/crates/uv-requirements/src/unnamed.rs
+++ b/crates/uv-requirements/src/unnamed.rs
@@ -22,7 +22,7 @@ use uv_resolver::{InMemoryIndex, MetadataResponse};
 use uv_types::{BuildContext, HashStrategy};
 
 /// Like [`RequirementsSpecification`], but with concrete names for all requirements.
-pub struct NamedRequirementsResolver<'a, Context: BuildContext + Send + Sync> {
+pub struct NamedRequirementsResolver<'a, Context: BuildContext> {
     /// The requirements for the project.
     requirements: Vec<RequirementsTxtRequirement>,
     /// Whether to check hashes for distributions.
@@ -33,7 +33,7 @@ pub struct NamedRequirementsResolver<'a, Context: BuildContext + Send + Sync> {
     database: DistributionDatabase<'a, Context>,
 }
 
-impl<'a, Context: BuildContext + Send + Sync> NamedRequirementsResolver<'a, Context> {
+impl<'a, Context: BuildContext> NamedRequirementsResolver<'a, Context> {
     /// Instantiate a new [`NamedRequirementsResolver`] for a given set of requirements.
     pub fn new(
         requirements: Vec<RequirementsTxtRequirement>,
diff --git a/crates/uv-resolver/src/error.rs b/crates/uv-resolver/src/error.rs
index e135ebb95..6acafedd5 100644
--- a/crates/uv-resolver/src/error.rs
+++ b/crates/uv-resolver/src/error.rs
@@ -1,9 +1,9 @@
 use std::collections::{BTreeMap, BTreeSet};
 use std::fmt::Formatter;
 use std::ops::Deref;
+use std::rc::Rc;
 use std::sync::Arc;
 
-use dashmap::{DashMap, DashSet};
 use indexmap::IndexMap;
 use pubgrub::range::Range;
 use pubgrub::report::{DefaultStringReporter, DerivationTree, External, Reporter};
@@ -22,7 +22,9 @@ use crate::candidate_selector::CandidateSelector;
 use crate::dependency_provider::UvDependencyProvider;
 use crate::pubgrub::{PubGrubPackage, PubGrubPython, PubGrubReportFormatter};
 use crate::python_requirement::PythonRequirement;
-use crate::resolver::{IncompletePackage, UnavailablePackage, VersionsResponse};
+use crate::resolver::{
+    IncompletePackage, SharedMap, SharedSet, UnavailablePackage, VersionsResponse,
+};
 
 #[derive(Debug, thiserror::Error)]
 pub enum ResolveError {
@@ -236,8 +238,8 @@ impl NoSolutionError {
     pub(crate) fn with_available_versions(
         mut self,
         python_requirement: &PythonRequirement,
-        visited: &DashSet<PackageName>,
-        package_versions: &OnceMap<PackageName, Arc<VersionsResponse>>,
+        visited: &SharedSet<PackageName>,
+        package_versions: &OnceMap<PackageName, Rc<VersionsResponse>>,
     ) -> Self {
         let mut available_versions = IndexMap::default();
         for package in self.derivation_tree.packages() {
@@ -261,7 +263,7 @@ impl NoSolutionError {
             // tree, but were never visited during resolution. We _may_ have metadata for
             // these packages, but it's non-deterministic, and omitting them ensures that
             // we represent the state of the resolver at the time of failure.
-            if visited.contains(name) {
+            if visited.borrow().contains(name) {
                 if let Some(response) = package_versions.get(name) {
                     if let VersionsResponse::Found(ref version_maps) = *response {
                         for version_map in version_maps {
@@ -300,13 +302,13 @@ impl NoSolutionError {
     #[must_use]
     pub(crate) fn with_unavailable_packages(
         mut self,
-        unavailable_packages: &DashMap<PackageName, UnavailablePackage>,
+        unavailable_packages: &SharedMap<PackageName, UnavailablePackage>,
     ) -> Self {
+        let unavailable_packages = unavailable_packages.borrow();
         let mut new = FxHashMap::default();
         for package in self.derivation_tree.packages() {
             if let PubGrubPackage::Package(name, _, _) = package {
-                if let Some(entry) = unavailable_packages.get(name) {
-                    let reason = entry.value();
+                if let Some(reason) = unavailable_packages.get(name) {
                     new.insert(name.clone(), reason.clone());
                 }
             }
@@ -319,15 +321,14 @@ impl NoSolutionError {
     #[must_use]
     pub(crate) fn with_incomplete_packages(
         mut self,
-        incomplete_packages: &DashMap<PackageName, DashMap<Version, IncompletePackage>>,
+        incomplete_packages: &SharedMap<PackageName, SharedMap<Version, IncompletePackage>>,
     ) -> Self {
         let mut new = FxHashMap::default();
+        let incomplete_packages = incomplete_packages.borrow();
         for package in self.derivation_tree.packages() {
             if let PubGrubPackage::Package(name, _, _) = package {
-                if let Some(entry) = incomplete_packages.get(name) {
-                    let versions = entry.value();
-                    for entry in versions {
-                        let (version, reason) = entry.pair();
+                if let Some(versions) = incomplete_packages.get(name) {
+                    for (version, reason) in versions.borrow().iter() {
                         new.entry(name.clone())
                             .or_insert_with(BTreeMap::default)
                             .insert(version.clone(), reason.clone());
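The nested `SharedMap<_, SharedMap<_, _>>` reads above take one short borrow per `RefCell` level, replacing `DashMap`'s `entry.value()`/`entry.pair()` accessors. In miniature, with `String` standing in for `PackageName`, `Version`, and the reason types:

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

type SharedMap<K, V> = Rc<RefCell<HashMap<K, V>>>;

fn main() {
    // Mirrors `incomplete_packages: SharedMap<PackageName,
    // SharedMap<Version, IncompletePackage>>` from the resolver.
    let incomplete: SharedMap<String, SharedMap<String, String>> = SharedMap::default();

    // Writing goes through the same `entry(..).or_default()` chain as the
    // resolver, with one `borrow_mut()` per `RefCell` level.
    incomplete
        .borrow_mut()
        .entry("example-package".into())
        .or_default()
        .borrow_mut()
        .insert("1.0.0".into(), "offline".into());

    // Reading nests a second immutable borrow inside the first.
    let outer = incomplete.borrow();
    if let Some(versions) = outer.get("example-package") {
        for (version, reason) in versions.borrow().iter() {
            println!("{version}: {reason}");
        }
    }
}
```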
diff --git a/crates/uv-resolver/src/resolution.rs b/crates/uv-resolver/src/resolution.rs
index 77e6d64b0..3407cd483 100644
--- a/crates/uv-resolver/src/resolution.rs
+++ b/crates/uv-resolver/src/resolution.rs
@@ -1,6 +1,6 @@
 use std::borrow::Cow;
 use std::hash::BuildHasherDefault;
-use std::sync::Arc;
+use std::rc::Rc;
 
 use anyhow::Result;
 use itertools::Itertools;
@@ -69,8 +69,8 @@ impl ResolutionGraph {
     pub(crate) fn from_state(
         selection: &SelectedDependencies<UvDependencyProvider>,
         pins: &FilePins,
-        packages: &OnceMap<PackageName, Arc<VersionsResponse>>,
-        distributions: &OnceMap<VersionId, Arc<MetadataResponse>>,
+        packages: &OnceMap<PackageName, Rc<VersionsResponse>>,
+        distributions: &OnceMap<VersionId, Rc<MetadataResponse>>,
         state: &State<UvDependencyProvider>,
         preferences: &Preferences,
         editables: Editables,
diff --git a/crates/uv-resolver/src/resolver/index.rs b/crates/uv-resolver/src/resolver/index.rs
index aae0a3129..8f06bbd07 100644
--- a/crates/uv-resolver/src/resolver/index.rs
+++ b/crates/uv-resolver/src/resolver/index.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::rc::Rc;
 
 use distribution_types::VersionId;
 use once_map::OnceMap;
@@ -11,30 +11,30 @@
 use crate::resolver::provider::{MetadataResponse, VersionsResponse};
 
 pub struct InMemoryIndex {
     /// A map from package name to the metadata for that package and the index where the metadata
     /// came from.
-    pub(crate) packages: OnceMap<PackageName, Arc<VersionsResponse>>,
+    pub(crate) packages: OnceMap<PackageName, Rc<VersionsResponse>>,
 
     /// A map from package ID to metadata for that distribution.
-    pub(crate) distributions: OnceMap<VersionId, Arc<MetadataResponse>>,
+    pub(crate) distributions: OnceMap<VersionId, Rc<MetadataResponse>>,
 }
 
 impl InMemoryIndex {
     /// Insert a [`VersionsResponse`] into the index.
     pub fn insert_package(&self, package_name: PackageName, response: VersionsResponse) {
-        self.packages.done(package_name, Arc::new(response));
+        self.packages.done(package_name, Rc::new(response));
     }
 
     /// Insert a [`Metadata23`] into the index.
     pub fn insert_metadata(&self, version_id: VersionId, response: MetadataResponse) {
-        self.distributions.done(version_id, Arc::new(response));
+        self.distributions.done(version_id, Rc::new(response));
     }
 
     /// Get the [`VersionsResponse`] for a given package name, without waiting.
-    pub fn get_package(&self, package_name: &PackageName) -> Option<Arc<VersionsResponse>> {
+    pub fn get_package(&self, package_name: &PackageName) -> Option<Rc<VersionsResponse>> {
         self.packages.get(package_name)
     }
 
     /// Get the [`MetadataResponse`] for a given package ID, without waiting.
-    pub fn get_metadata(&self, version_id: &VersionId) -> Option<Arc<MetadataResponse>> {
+    pub fn get_metadata(&self, version_id: &VersionId) -> Option<Rc<MetadataResponse>> {
         self.distributions.get(version_id)
     }
 }
diff --git a/crates/uv-resolver/src/resolver/mod.rs b/crates/uv-resolver/src/resolver/mod.rs
index 3879ec168..2a7da763e 100644
--- a/crates/uv-resolver/src/resolver/mod.rs
+++ b/crates/uv-resolver/src/resolver/mod.rs
@@ -1,12 +1,14 @@
 //! Given a set of requirements, find a set of compatible packages.
 
 use std::borrow::Cow;
+use std::cell::RefCell;
+use std::collections::{HashMap, HashSet};
 use std::fmt::{Display, Formatter};
 use std::ops::Deref;
+use std::rc::Rc;
 use std::sync::Arc;
 
 use anyhow::Result;
-use dashmap::{DashMap, DashSet};
 use futures::{FutureExt, StreamExt};
 use itertools::Itertools;
 use pubgrub::error::PubGrubError;
@@ -107,11 +109,10 @@ enum ResolverVersion {
     Unavailable(Version, UnavailableVersion),
 }
 
-pub struct Resolver<
-    'a,
-    Provider: ResolverProvider,
-    InstalledPackages: InstalledPackagesProvider + Send + Sync,
-> {
+pub(crate) type SharedMap<K, V> = Rc<RefCell<HashMap<K, V>>>;
+pub(crate) type SharedSet<K> = Rc<RefCell<HashSet<K>>>;
+
+pub struct Resolver<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvider> {
     project: Option<PackageName>,
     requirements: Vec<Requirement>,
     constraints: Constraints,
@@ -129,20 +130,17 @@ pub struct Resolver<
     index: &'a InMemoryIndex,
     installed_packages: &'a InstalledPackages,
     /// Incompatibilities for packages that are entirely unavailable.
-    unavailable_packages: DashMap<PackageName, UnavailablePackage>,
+    unavailable_packages: SharedMap<PackageName, UnavailablePackage>,
     /// Incompatibilities for packages that are unavailable at specific versions.
-    incomplete_packages: DashMap<PackageName, DashMap<Version, IncompletePackage>>,
+    incomplete_packages: SharedMap<PackageName, SharedMap<Version, IncompletePackage>>,
     /// The set of all registry-based packages visited during resolution.
-    visited: DashSet<PackageName>,
+    visited: SharedSet<PackageName>,
     reporter: Option<Arc<dyn Reporter>>,
     provider: Provider,
 }
 
-impl<
-        'a,
-        Context: BuildContext + Send + Sync,
-        InstalledPackages: InstalledPackagesProvider + Send + Sync,
-    > Resolver<'a, DefaultResolverProvider<'a, Context>, InstalledPackages>
+impl<'a, Context: BuildContext, InstalledPackages: InstalledPackagesProvider>
+    Resolver<'a, DefaultResolverProvider<'a, Context>, InstalledPackages>
 {
     /// Initialize a new resolver using the default backend doing real requests.
     ///
@@ -186,11 +184,8 @@
     }
 }
 
-impl<
-        'a,
-        Provider: ResolverProvider,
-        InstalledPackages: InstalledPackagesProvider + Send + Sync,
-    > Resolver<'a, Provider, InstalledPackages>
+impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvider>
+    Resolver<'a, Provider, InstalledPackages>
 {
     /// Initialize a new resolver using a user provided backend.
     #[allow(clippy::too_many_arguments)]
     pub fn new(
@@ -206,9 +201,9 @@
     ) -> Result<Self, ResolveError> {
         Ok(Self {
             index,
-            unavailable_packages: DashMap::default(),
-            incomplete_packages: DashMap::default(),
-            visited: DashSet::default(),
+            unavailable_packages: SharedMap::default(),
+            incomplete_packages: SharedMap::default(),
+            visited: SharedSet::default(),
             selector: CandidateSelector::for_resolution(options, &manifest, markers),
             dependency_mode: options.dependency_mode,
             urls: Urls::from_manifest(&manifest, markers, options.dependency_mode)?,
@@ -251,7 +246,7 @@
         let requests_fut = self.fetch(request_stream).fuse();
 
         // Run the solver.
-        let resolve_fut = self.solve(request_sink).boxed().fuse();
+        let resolve_fut = self.solve(request_sink).boxed_local().fuse();
 
         // Wait for both to complete.
         match tokio::try_join!(requests_fut, resolve_fut) {
@@ -368,6 +363,7 @@
             if let PubGrubPackage::Package(ref package_name, _, _) = next {
                 // Check if the decision was due to the package being unavailable
                 self.unavailable_packages
+                    .borrow()
                     .get(package_name)
                     .map(|entry| match *entry {
                         UnavailablePackage::NoIndex => {
@@ -648,25 +644,26 @@
             MetadataResponse::Found(archive) => &archive.metadata,
             MetadataResponse::Offline => {
                 self.unavailable_packages
+                    .borrow_mut()
                     .insert(package_name.clone(), UnavailablePackage::Offline);
                 return Ok(None);
             }
             MetadataResponse::InvalidMetadata(err) => {
-                self.unavailable_packages.insert(
+                self.unavailable_packages.borrow_mut().insert(
                     package_name.clone(),
                     UnavailablePackage::InvalidMetadata(err.to_string()),
                 );
                 return Ok(None);
             }
             MetadataResponse::InconsistentMetadata(err) => {
-                self.unavailable_packages.insert(
+                self.unavailable_packages.borrow_mut().insert(
                     package_name.clone(),
                     UnavailablePackage::InvalidMetadata(err.to_string()),
                 );
                 return Ok(None);
             }
             MetadataResponse::InvalidStructure(err) => {
-                self.unavailable_packages.insert(
+                self.unavailable_packages.borrow_mut().insert(
                     package_name.clone(),
                     UnavailablePackage::InvalidStructure(err.to_string()),
                 );
@@ -707,22 +704,25 @@
             .instrument(info_span!("package_wait", %package_name))
             .await
             .ok_or(ResolveError::Unregistered)?;
-        self.visited.insert(package_name.clone());
+        self.visited.borrow_mut().insert(package_name.clone());
 
         let version_maps = match *versions_response {
             VersionsResponse::Found(ref version_maps) => version_maps.as_slice(),
             VersionsResponse::NoIndex => {
                 self.unavailable_packages
+                    .borrow_mut()
                     .insert(package_name.clone(), UnavailablePackage::NoIndex);
                 &[]
             }
             VersionsResponse::Offline => {
                 self.unavailable_packages
+                    .borrow_mut()
                     .insert(package_name.clone(), UnavailablePackage::Offline);
                 &[]
             }
             VersionsResponse::NotFound => {
                 self.unavailable_packages
+                    .borrow_mut()
                     .insert(package_name.clone(), UnavailablePackage::NotFound);
                 &[]
             }
@@ -925,7 +925,11 @@
         let version_id = dist.version_id();
 
         // If the package does not exist in the registry or locally, we cannot fetch its dependencies
-        if self.unavailable_packages.get(package_name).is_some()
+        if self
+            .unavailable_packages
+            .borrow()
+            .get(package_name)
+            .is_some()
             && self
                 .installed_packages
                 .get_packages(package_name)
@@ -953,8 +957,10 @@
             MetadataResponse::Found(archive) => &archive.metadata,
             MetadataResponse::Offline => {
                 self.incomplete_packages
+                    .borrow_mut()
                     .entry(package_name.clone())
                     .or_default()
+                    .borrow_mut()
                     .insert(version.clone(), IncompletePackage::Offline);
                 return Ok(Dependencies::Unavailable(
                     "network connectivity is disabled, but the metadata wasn't found in the cache"
@@ -964,8 +970,10 @@
             MetadataResponse::InvalidMetadata(err) => {
                 warn!("Unable to extract metadata for {package_name}: {err}");
                 self.incomplete_packages
+                    .borrow_mut()
                     .entry(package_name.clone())
                     .or_default()
+                    .borrow_mut()
                     .insert(
                         version.clone(),
                         IncompletePackage::InvalidMetadata(err.to_string()),
@@ -977,8 +985,10 @@
             MetadataResponse::InconsistentMetadata(err) => {
                 warn!("Unable to extract metadata for {package_name}: {err}");
                 self.incomplete_packages
+                    .borrow_mut()
                     .entry(package_name.clone())
                     .or_default()
+                    .borrow_mut()
                     .insert(
                         version.clone(),
                         IncompletePackage::InconsistentMetadata(err.to_string()),
@@ -990,8 +1000,10 @@
             MetadataResponse::InvalidStructure(err) => {
                 warn!("Unable to extract metadata for {package_name}: {err}");
                 self.incomplete_packages
+                    .borrow_mut()
                     .entry(package_name.clone())
                     .or_default()
+                    .borrow_mut()
                     .insert(
                         version.clone(),
                         IncompletePackage::InvalidStructure(err.to_string()),
@@ -1052,22 +1064,20 @@
         request_stream: tokio::sync::mpsc::Receiver<Request>,
     ) -> Result<(), ResolveError> {
         let mut response_stream = ReceiverStream::new(request_stream)
-            .map(|request| self.process_request(request).boxed())
+            .map(|request| self.process_request(request).boxed_local())
             .buffer_unordered(50);
 
         while let Some(response) = response_stream.next().await {
             match response? {
                 Some(Response::Package(package_name, version_map)) => {
                     trace!("Received package metadata for: {package_name}");
-                    self.index
-                        .packages
-                        .done(package_name, Arc::new(version_map));
+                    self.index.packages.done(package_name, Rc::new(version_map));
                 }
                 Some(Response::Installed { dist, metadata }) => {
                     trace!("Received installed distribution metadata for: {dist}");
                     self.index.distributions.done(
                         dist.version_id(),
-                        Arc::new(MetadataResponse::Found(ArchiveMetadata::from(metadata))),
+                        Rc::new(MetadataResponse::Found(ArchiveMetadata::from(metadata))),
                     );
                 }
                 Some(Response::Dist {
@@ -1086,7 +1096,7 @@
                     }
                     self.index
                         .distributions
-                        .done(dist.version_id(), Arc::new(metadata));
+                        .done(dist.version_id(), Rc::new(metadata));
                 }
                 Some(Response::Dist {
                     dist: Dist::Source(dist),
@@ -1104,7 +1114,7 @@
                     }
                     self.index
                         .distributions
-                        .done(dist.version_id(), Arc::new(metadata));
+                        .done(dist.version_id(), Rc::new(metadata));
                 }
                 None => {}
             }
@@ -1121,7 +1131,7 @@
                 let package_versions = self
                     .provider
                     .get_package_versions(&package_name)
-                    .boxed()
+                    .boxed_local()
                     .await
                     .map_err(ResolveError::Client)?;
@@ -1133,7 +1143,7 @@
                 let metadata = self
                     .provider
                     .get_or_build_wheel_metadata(&dist)
-                    .boxed()
+                    .boxed_local()
                     .await
                     .map_err(|err| match dist.clone() {
                         Dist::Built(BuiltDist::Path(built_dist)) => {
@@ -1172,18 +1182,21 @@
             // Short-circuit if we did not find any versions for the package
             VersionsResponse::NoIndex => {
                 self.unavailable_packages
+                    .borrow_mut()
                     .insert(package_name.clone(), UnavailablePackage::NoIndex);
                 return Ok(None);
             }
             VersionsResponse::Offline => {
                 self.unavailable_packages
+                    .borrow_mut()
                     .insert(package_name.clone(), UnavailablePackage::Offline);
                 return Ok(None);
             }
             VersionsResponse::NotFound => {
                 self.unavailable_packages
+                    .borrow_mut()
                     .insert(package_name.clone(), UnavailablePackage::NotFound);
                 return Ok(None);
@@ -1217,7 +1230,7 @@
                 let metadata = self
                     .provider
                     .get_or_build_wheel_metadata(&dist)
-                    .boxed()
+                    .boxed_local()
                     .await
                     .map_err(|err| match dist.clone() {
                         Dist::Built(BuiltDist::Path(built_dist)) => {
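One property worth noting about the `RefCell` conversion above: every `borrow()`/`borrow_mut()` is taken and released within a single statement, never held across an `.await`. A live borrow across a suspension point would panic at runtime if another task on the same thread touched the same map while the first was suspended. A sketch of the safe shape:

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

type SharedMap<K, V> = Rc<RefCell<HashMap<K, V>>>;

async fn fetch_reason() -> String {
    // Stand-in for a metadata request.
    "not found".to_string()
}

async fn record(unavailable: &SharedMap<String, String>, name: &str) {
    // Await first, with no borrow live...
    let reason = fetch_reason().await;
    // ...then take the mutable borrow and release it in the same statement.
    unavailable.borrow_mut().insert(name.to_string(), reason);
}

fn main() {
    let unavailable: SharedMap<String, String> = SharedMap::default();
    futures::executor::block_on(record(&unavailable, "example-package"));
    assert!(unavailable.borrow().contains_key("example-package"));
}
```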
diff --git a/crates/uv-resolver/src/resolver/provider.rs b/crates/uv-resolver/src/resolver/provider.rs
index c362da33c..60eb80caf 100644
--- a/crates/uv-resolver/src/resolver/provider.rs
+++ b/crates/uv-resolver/src/resolver/provider.rs
@@ -46,12 +46,12 @@ pub enum MetadataResponse {
     Offline,
 }
 
-pub trait ResolverProvider: Send + Sync {
+pub trait ResolverProvider {
     /// Get the version map for a package.
     fn get_package_versions<'io>(
         &'io self,
         package_name: &'io PackageName,
-    ) -> impl Future<Output = PackageVersionsResult> + Send + 'io;
+    ) -> impl Future<Output = PackageVersionsResult> + 'io;
 
     /// Get the metadata for a distribution.
     ///
@@ -61,7 +61,7 @@ pub trait ResolverProvider: Send + Sync {
     fn get_or_build_wheel_metadata<'io>(
         &'io self,
         dist: &'io Dist,
-    ) -> impl Future<Output = WheelMetadataResult> + Send + 'io;
+    ) -> impl Future<Output = WheelMetadataResult> + 'io;
 
     fn index_locations(&self) -> &IndexLocations;
 
@@ -72,7 +72,7 @@ pub trait ResolverProvider: Send + Sync {
 
 /// The main IO backend for the resolver, which does cached requests network requests using the
 /// [`RegistryClient`] and [`DistributionDatabase`].
-pub struct DefaultResolverProvider<'a, Context: BuildContext + Send + Sync> {
+pub struct DefaultResolverProvider<'a, Context: BuildContext> {
     /// The [`DistributionDatabase`] used to build source distributions.
     fetcher: DistributionDatabase<'a, Context>,
     /// The [`RegistryClient`] used to query the index.
@@ -88,7 +88,7 @@ pub struct DefaultResolverProvider<'a, Context: BuildContext + Send + Sync> {
     no_build: NoBuild,
 }
 
-impl<'a, Context: BuildContext + Send + Sync> DefaultResolverProvider<'a, Context> {
+impl<'a, Context: BuildContext> DefaultResolverProvider<'a, Context> {
     /// Reads the flat index entries and builds the provider.
     #[allow(clippy::too_many_arguments)]
     pub fn new(
@@ -118,9 +118,7 @@ impl<'a, Context: BuildContext + Send + Sync> DefaultResolverProvider<'a, Contex
     }
 }
 
-impl<'a, Context: BuildContext + Send + Sync> ResolverProvider
-    for DefaultResolverProvider<'a, Context>
-{
+impl<'a, Context: BuildContext> ResolverProvider for DefaultResolverProvider<'a, Context> {
     /// Make a "Simple API" request for the package and convert the result to a [`VersionMap`].
     async fn get_package_versions<'io>(
         &'io self,
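Dropping `+ Send` from the `impl Future` return types is what lets implementations hold `Rc`s across await points. (`#[async_trait]` methods desugar to `Pin<Box<dyn Future + Send>>` by default — the reqwest-middleware issue mentioned in the summary — whereas return-position `impl Trait` leaves that choice to the trait author.) A sketch with a hypothetical `Provider` trait mirroring the shape of `ResolverProvider`:

```rust
use std::future::Future;
use std::rc::Rc;

// With return-position `impl Trait` in traits (Rust 1.75+), omitting `+ Send`
// means implementors may hold non-`Send` state across awaits.
trait Provider {
    fn get(&self) -> impl Future<Output = usize> + '_;
}

struct LocalProvider {
    cache: Rc<usize>, // not `Send`, and that's fine here
}

impl Provider for LocalProvider {
    fn get(&self) -> impl Future<Output = usize> + '_ {
        async move { *self.cache }
    }
}

fn main() {
    let provider = LocalProvider { cache: Rc::new(7) };
    assert_eq!(futures::executor::block_on(provider.get()), 7);
}
```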
diff --git a/crates/uv-types/src/traits.rs b/crates/uv-types/src/traits.rs
index 7abd29fed..f074f12ec 100644
--- a/crates/uv-types/src/traits.rs
+++ b/crates/uv-types/src/traits.rs
@@ -48,8 +48,8 @@ use crate::BuildIsolation;
 /// Put in a different way, the types here allow `uv-resolver` to depend on `uv-build` and
 /// `uv-build` to depend on `uv-resolver` without having actual crate dependencies between
 /// them.
-pub trait BuildContext: Sync {
-    type SourceDistBuilder: SourceBuildTrait + Send + Sync;
+pub trait BuildContext {
+    type SourceDistBuilder: SourceBuildTrait;
 
     /// Return a reference to the cache.
     fn cache(&self) -> &Cache;
@@ -79,7 +79,7 @@ pub trait BuildContext: Sync {
     fn resolve<'a>(
         &'a self,
         requirements: &'a [Requirement],
-    ) -> impl Future<Output = Result<Resolution>> + Send + 'a;
+    ) -> impl Future<Output = Result<Resolution>> + 'a;
 
     /// Install the given set of package versions into the virtual environment. The environment must
     /// use the same base Python as [`BuildContext::interpreter`]
     fn install<'a>(
         &'a self,
         resolution: &'a Resolution,
         venv: &'a PythonEnvironment,
-    ) -> impl Future<Output = Result<()>> + Send + 'a;
+    ) -> impl Future<Output = Result<()>> + 'a;
 
     /// Setup a source distribution build by installing the required dependencies. A wrapper for
     /// `uv_build::SourceBuild::setup`.
@@ -103,7 +103,7 @@ pub trait BuildContext: Sync {
         version_id: &'a str,
         dist: Option<&'a SourceDist>,
         build_kind: BuildKind,
-    ) -> impl Future<Output = Result<Self::SourceDistBuilder>> + Send + 'a;
+    ) -> impl Future<Output = Result<Self::SourceDistBuilder>> + 'a;
 }
 
 /// A wrapper for `uv_build::SourceBuild` to avoid cyclical crate dependencies.
@@ -117,15 +117,14 @@ pub trait SourceBuildTrait {
     ///
     /// Returns the metadata directory if we're having a PEP 517 build and the
     /// `prepare_metadata_for_build_wheel` hook exists
-    fn metadata(&mut self) -> impl Future<Output = Result<Option<PathBuf>>> + Send;
+    fn metadata(&mut self) -> impl Future<Output = Result<Option<PathBuf>>>;
 
     /// A wrapper for `uv_build::SourceBuild::build`.
     ///
     /// For PEP 517 builds, this calls `build_wheel`.
     ///
     /// Returns the filename of the built wheel inside the given `wheel_dir`.
-    fn wheel<'a>(&'a self, wheel_dir: &'a Path)
-        -> impl Future<Output = Result<String>> + Send + 'a;
+    fn wheel<'a>(&'a self, wheel_dir: &'a Path) -> impl Future<Output = Result<String>> + 'a;
 }
 
 /// A wrapper for [`uv_installer::SitePackages`]