Remove unnecessary uses of DashMap and Arc (#3413)

## Summary

All of the resolver code is run on the main thread, so a lot of the
`Send` bounds and uses of `DashMap` and `Arc` are unnecessary. We could
also switch to using single-threaded versions of `Mutex` and `Notify` in
some places, but there isn't really a crate that provides those I would
be comfortable with using.

The `Arc` in `OnceMap` can't easily be removed because of the uv-auth
code which uses the
[reqwest-middleware](https://docs.rs/reqwest-middleware/latest/reqwest_middleware/trait.Middleware.html)
crate, that seems to adds unnecessary `Send` bounds because of
`async-trait`. We could duplicate the code and create a `OnceMapLocal`
variant, but I don't feel that's worth it.
This commit is contained in:
Ibraheem Ahmed 2024-05-06 22:30:43 -04:00 committed by GitHub
parent 2c84af15b8
commit 94cf604574
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 165 additions and 145 deletions

View file

@ -1,9 +1,9 @@
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Formatter;
use std::ops::Deref;
use std::rc::Rc;
use std::sync::Arc;
use dashmap::{DashMap, DashSet};
use indexmap::IndexMap;
use pubgrub::range::Range;
use pubgrub::report::{DefaultStringReporter, DerivationTree, External, Reporter};
@ -22,7 +22,9 @@ use crate::candidate_selector::CandidateSelector;
use crate::dependency_provider::UvDependencyProvider;
use crate::pubgrub::{PubGrubPackage, PubGrubPython, PubGrubReportFormatter};
use crate::python_requirement::PythonRequirement;
use crate::resolver::{IncompletePackage, UnavailablePackage, VersionsResponse};
use crate::resolver::{
IncompletePackage, SharedMap, SharedSet, UnavailablePackage, VersionsResponse,
};
#[derive(Debug, thiserror::Error)]
pub enum ResolveError {
@ -236,8 +238,8 @@ impl NoSolutionError {
pub(crate) fn with_available_versions(
mut self,
python_requirement: &PythonRequirement,
visited: &DashSet<PackageName>,
package_versions: &OnceMap<PackageName, Arc<VersionsResponse>>,
visited: &SharedSet<PackageName>,
package_versions: &OnceMap<PackageName, Rc<VersionsResponse>>,
) -> Self {
let mut available_versions = IndexMap::default();
for package in self.derivation_tree.packages() {
@ -261,7 +263,7 @@ impl NoSolutionError {
// tree, but were never visited during resolution. We _may_ have metadata for
// these packages, but it's non-deterministic, and omitting them ensures that
// we represent the state of the resolver at the time of failure.
if visited.contains(name) {
if visited.borrow().contains(name) {
if let Some(response) = package_versions.get(name) {
if let VersionsResponse::Found(ref version_maps) = *response {
for version_map in version_maps {
@ -300,13 +302,13 @@ impl NoSolutionError {
#[must_use]
pub(crate) fn with_unavailable_packages(
mut self,
unavailable_packages: &DashMap<PackageName, UnavailablePackage>,
unavailable_packages: &SharedMap<PackageName, UnavailablePackage>,
) -> Self {
let unavailable_packages = unavailable_packages.borrow();
let mut new = FxHashMap::default();
for package in self.derivation_tree.packages() {
if let PubGrubPackage::Package(name, _, _) = package {
if let Some(entry) = unavailable_packages.get(name) {
let reason = entry.value();
if let Some(reason) = unavailable_packages.get(name) {
new.insert(name.clone(), reason.clone());
}
}
@ -319,15 +321,14 @@ impl NoSolutionError {
#[must_use]
pub(crate) fn with_incomplete_packages(
mut self,
incomplete_packages: &DashMap<PackageName, DashMap<Version, IncompletePackage>>,
incomplete_packages: &SharedMap<PackageName, SharedMap<Version, IncompletePackage>>,
) -> Self {
let mut new = FxHashMap::default();
let incomplete_packages = incomplete_packages.borrow();
for package in self.derivation_tree.packages() {
if let PubGrubPackage::Package(name, _, _) = package {
if let Some(entry) = incomplete_packages.get(name) {
let versions = entry.value();
for entry in versions {
let (version, reason) = entry.pair();
if let Some(versions) = incomplete_packages.get(name) {
for (version, reason) in versions.borrow().iter() {
new.entry(name.clone())
.or_insert_with(BTreeMap::default)
.insert(version.clone(), reason.clone());

View file

@ -1,6 +1,6 @@
use std::borrow::Cow;
use std::hash::BuildHasherDefault;
use std::sync::Arc;
use std::rc::Rc;
use anyhow::Result;
use itertools::Itertools;
@ -69,8 +69,8 @@ impl ResolutionGraph {
pub(crate) fn from_state(
selection: &SelectedDependencies<UvDependencyProvider>,
pins: &FilePins,
packages: &OnceMap<PackageName, Arc<VersionsResponse>>,
distributions: &OnceMap<VersionId, Arc<MetadataResponse>>,
packages: &OnceMap<PackageName, Rc<VersionsResponse>>,
distributions: &OnceMap<VersionId, Rc<MetadataResponse>>,
state: &State<UvDependencyProvider>,
preferences: &Preferences,
editables: Editables,

View file

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::rc::Rc;
use distribution_types::VersionId;
use once_map::OnceMap;
@ -11,30 +11,30 @@ use crate::resolver::provider::{MetadataResponse, VersionsResponse};
pub struct InMemoryIndex {
/// A map from package name to the metadata for that package and the index where the metadata
/// came from.
pub(crate) packages: OnceMap<PackageName, Arc<VersionsResponse>>,
pub(crate) packages: OnceMap<PackageName, Rc<VersionsResponse>>,
/// A map from package ID to metadata for that distribution.
pub(crate) distributions: OnceMap<VersionId, Arc<MetadataResponse>>,
pub(crate) distributions: OnceMap<VersionId, Rc<MetadataResponse>>,
}
impl InMemoryIndex {
/// Insert a [`VersionsResponse`] into the index.
pub fn insert_package(&self, package_name: PackageName, response: VersionsResponse) {
self.packages.done(package_name, Arc::new(response));
self.packages.done(package_name, Rc::new(response));
}
/// Insert a [`Metadata23`] into the index.
pub fn insert_metadata(&self, version_id: VersionId, response: MetadataResponse) {
self.distributions.done(version_id, Arc::new(response));
self.distributions.done(version_id, Rc::new(response));
}
/// Get the [`VersionsResponse`] for a given package name, without waiting.
pub fn get_package(&self, package_name: &PackageName) -> Option<Arc<VersionsResponse>> {
pub fn get_package(&self, package_name: &PackageName) -> Option<Rc<VersionsResponse>> {
self.packages.get(package_name)
}
/// Get the [`MetadataResponse`] for a given package ID, without waiting.
pub fn get_metadata(&self, version_id: &VersionId) -> Option<Arc<MetadataResponse>> {
pub fn get_metadata(&self, version_id: &VersionId) -> Option<Rc<MetadataResponse>> {
self.distributions.get(version_id)
}
}

View file

@ -1,12 +1,14 @@
//! Given a set of requirements, find a set of compatible packages.
use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::rc::Rc;
use std::sync::Arc;
use anyhow::Result;
use dashmap::{DashMap, DashSet};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use pubgrub::error::PubGrubError;
@ -107,11 +109,10 @@ enum ResolverVersion {
Unavailable(Version, UnavailableVersion),
}
pub struct Resolver<
'a,
Provider: ResolverProvider,
InstalledPackages: InstalledPackagesProvider + Send + Sync,
> {
pub(crate) type SharedMap<K, V> = Rc<RefCell<HashMap<K, V>>>;
pub(crate) type SharedSet<K> = Rc<RefCell<HashSet<K>>>;
pub struct Resolver<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvider> {
project: Option<PackageName>,
requirements: Vec<Requirement>,
constraints: Constraints,
@ -129,20 +130,17 @@ pub struct Resolver<
index: &'a InMemoryIndex,
installed_packages: &'a InstalledPackages,
/// Incompatibilities for packages that are entirely unavailable.
unavailable_packages: DashMap<PackageName, UnavailablePackage>,
unavailable_packages: SharedMap<PackageName, UnavailablePackage>,
/// Incompatibilities for packages that are unavailable at specific versions.
incomplete_packages: DashMap<PackageName, DashMap<Version, IncompletePackage>>,
incomplete_packages: SharedMap<PackageName, SharedMap<Version, IncompletePackage>>,
/// The set of all registry-based packages visited during resolution.
visited: DashSet<PackageName>,
visited: SharedSet<PackageName>,
reporter: Option<Arc<dyn Reporter>>,
provider: Provider,
}
impl<
'a,
Context: BuildContext + Send + Sync,
InstalledPackages: InstalledPackagesProvider + Send + Sync,
> Resolver<'a, DefaultResolverProvider<'a, Context>, InstalledPackages>
impl<'a, Context: BuildContext, InstalledPackages: InstalledPackagesProvider>
Resolver<'a, DefaultResolverProvider<'a, Context>, InstalledPackages>
{
/// Initialize a new resolver using the default backend doing real requests.
///
@ -186,11 +184,8 @@ impl<
}
}
impl<
'a,
Provider: ResolverProvider,
InstalledPackages: InstalledPackagesProvider + Send + Sync,
> Resolver<'a, Provider, InstalledPackages>
impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvider>
Resolver<'a, Provider, InstalledPackages>
{
/// Initialize a new resolver using a user provided backend.
#[allow(clippy::too_many_arguments)]
@ -206,9 +201,9 @@ impl<
) -> Result<Self, ResolveError> {
Ok(Self {
index,
unavailable_packages: DashMap::default(),
incomplete_packages: DashMap::default(),
visited: DashSet::default(),
unavailable_packages: SharedMap::default(),
incomplete_packages: SharedMap::default(),
visited: SharedSet::default(),
selector: CandidateSelector::for_resolution(options, &manifest, markers),
dependency_mode: options.dependency_mode,
urls: Urls::from_manifest(&manifest, markers, options.dependency_mode)?,
@ -251,7 +246,7 @@ impl<
let requests_fut = self.fetch(request_stream).fuse();
// Run the solver.
let resolve_fut = self.solve(request_sink).boxed().fuse();
let resolve_fut = self.solve(request_sink).boxed_local().fuse();
// Wait for both to complete.
match tokio::try_join!(requests_fut, resolve_fut) {
@ -368,6 +363,7 @@ impl<
if let PubGrubPackage::Package(ref package_name, _, _) = next {
// Check if the decision was due to the package being unavailable
self.unavailable_packages
.borrow()
.get(package_name)
.map(|entry| match *entry {
UnavailablePackage::NoIndex => {
@ -648,25 +644,26 @@ impl<
MetadataResponse::Found(archive) => &archive.metadata,
MetadataResponse::Offline => {
self.unavailable_packages
.borrow_mut()
.insert(package_name.clone(), UnavailablePackage::Offline);
return Ok(None);
}
MetadataResponse::InvalidMetadata(err) => {
self.unavailable_packages.insert(
self.unavailable_packages.borrow_mut().insert(
package_name.clone(),
UnavailablePackage::InvalidMetadata(err.to_string()),
);
return Ok(None);
}
MetadataResponse::InconsistentMetadata(err) => {
self.unavailable_packages.insert(
self.unavailable_packages.borrow_mut().insert(
package_name.clone(),
UnavailablePackage::InvalidMetadata(err.to_string()),
);
return Ok(None);
}
MetadataResponse::InvalidStructure(err) => {
self.unavailable_packages.insert(
self.unavailable_packages.borrow_mut().insert(
package_name.clone(),
UnavailablePackage::InvalidStructure(err.to_string()),
);
@ -707,22 +704,25 @@ impl<
.instrument(info_span!("package_wait", %package_name))
.await
.ok_or(ResolveError::Unregistered)?;
self.visited.insert(package_name.clone());
self.visited.borrow_mut().insert(package_name.clone());
let version_maps = match *versions_response {
VersionsResponse::Found(ref version_maps) => version_maps.as_slice(),
VersionsResponse::NoIndex => {
self.unavailable_packages
.borrow_mut()
.insert(package_name.clone(), UnavailablePackage::NoIndex);
&[]
}
VersionsResponse::Offline => {
self.unavailable_packages
.borrow_mut()
.insert(package_name.clone(), UnavailablePackage::Offline);
&[]
}
VersionsResponse::NotFound => {
self.unavailable_packages
.borrow_mut()
.insert(package_name.clone(), UnavailablePackage::NotFound);
&[]
}
@ -925,7 +925,11 @@ impl<
let version_id = dist.version_id();
// If the package does not exist in the registry or locally, we cannot fetch its dependencies
if self.unavailable_packages.get(package_name).is_some()
if self
.unavailable_packages
.borrow()
.get(package_name)
.is_some()
&& self
.installed_packages
.get_packages(package_name)
@ -953,8 +957,10 @@ impl<
MetadataResponse::Found(archive) => &archive.metadata,
MetadataResponse::Offline => {
self.incomplete_packages
.borrow_mut()
.entry(package_name.clone())
.or_default()
.borrow_mut()
.insert(version.clone(), IncompletePackage::Offline);
return Ok(Dependencies::Unavailable(
"network connectivity is disabled, but the metadata wasn't found in the cache"
@ -964,8 +970,10 @@ impl<
MetadataResponse::InvalidMetadata(err) => {
warn!("Unable to extract metadata for {package_name}: {err}");
self.incomplete_packages
.borrow_mut()
.entry(package_name.clone())
.or_default()
.borrow_mut()
.insert(
version.clone(),
IncompletePackage::InvalidMetadata(err.to_string()),
@ -977,8 +985,10 @@ impl<
MetadataResponse::InconsistentMetadata(err) => {
warn!("Unable to extract metadata for {package_name}: {err}");
self.incomplete_packages
.borrow_mut()
.entry(package_name.clone())
.or_default()
.borrow_mut()
.insert(
version.clone(),
IncompletePackage::InconsistentMetadata(err.to_string()),
@ -990,8 +1000,10 @@ impl<
MetadataResponse::InvalidStructure(err) => {
warn!("Unable to extract metadata for {package_name}: {err}");
self.incomplete_packages
.borrow_mut()
.entry(package_name.clone())
.or_default()
.borrow_mut()
.insert(
version.clone(),
IncompletePackage::InvalidStructure(err.to_string()),
@ -1052,22 +1064,20 @@ impl<
request_stream: tokio::sync::mpsc::Receiver<Request>,
) -> Result<(), ResolveError> {
let mut response_stream = ReceiverStream::new(request_stream)
.map(|request| self.process_request(request).boxed())
.map(|request| self.process_request(request).boxed_local())
.buffer_unordered(50);
while let Some(response) = response_stream.next().await {
match response? {
Some(Response::Package(package_name, version_map)) => {
trace!("Received package metadata for: {package_name}");
self.index
.packages
.done(package_name, Arc::new(version_map));
self.index.packages.done(package_name, Rc::new(version_map));
}
Some(Response::Installed { dist, metadata }) => {
trace!("Received installed distribution metadata for: {dist}");
self.index.distributions.done(
dist.version_id(),
Arc::new(MetadataResponse::Found(ArchiveMetadata::from(metadata))),
Rc::new(MetadataResponse::Found(ArchiveMetadata::from(metadata))),
);
}
Some(Response::Dist {
@ -1086,7 +1096,7 @@ impl<
}
self.index
.distributions
.done(dist.version_id(), Arc::new(metadata));
.done(dist.version_id(), Rc::new(metadata));
}
Some(Response::Dist {
dist: Dist::Source(dist),
@ -1104,7 +1114,7 @@ impl<
}
self.index
.distributions
.done(dist.version_id(), Arc::new(metadata));
.done(dist.version_id(), Rc::new(metadata));
}
None => {}
}
@ -1121,7 +1131,7 @@ impl<
let package_versions = self
.provider
.get_package_versions(&package_name)
.boxed()
.boxed_local()
.await
.map_err(ResolveError::Client)?;
@ -1133,7 +1143,7 @@ impl<
let metadata = self
.provider
.get_or_build_wheel_metadata(&dist)
.boxed()
.boxed_local()
.await
.map_err(|err| match dist.clone() {
Dist::Built(BuiltDist::Path(built_dist)) => {
@ -1172,18 +1182,21 @@ impl<
// Short-circuit if we did not find any versions for the package
VersionsResponse::NoIndex => {
self.unavailable_packages
.borrow_mut()
.insert(package_name.clone(), UnavailablePackage::NoIndex);
return Ok(None);
}
VersionsResponse::Offline => {
self.unavailable_packages
.borrow_mut()
.insert(package_name.clone(), UnavailablePackage::Offline);
return Ok(None);
}
VersionsResponse::NotFound => {
self.unavailable_packages
.borrow_mut()
.insert(package_name.clone(), UnavailablePackage::NotFound);
return Ok(None);
@ -1217,7 +1230,7 @@ impl<
let metadata = self
.provider
.get_or_build_wheel_metadata(&dist)
.boxed()
.boxed_local()
.await
.map_err(|err| match dist.clone() {
Dist::Built(BuiltDist::Path(built_dist)) => {

View file

@ -46,12 +46,12 @@ pub enum MetadataResponse {
Offline,
}
pub trait ResolverProvider: Send + Sync {
pub trait ResolverProvider {
/// Get the version map for a package.
fn get_package_versions<'io>(
&'io self,
package_name: &'io PackageName,
) -> impl Future<Output = PackageVersionsResult> + Send + 'io;
) -> impl Future<Output = PackageVersionsResult> + 'io;
/// Get the metadata for a distribution.
///
@ -61,7 +61,7 @@ pub trait ResolverProvider: Send + Sync {
fn get_or_build_wheel_metadata<'io>(
&'io self,
dist: &'io Dist,
) -> impl Future<Output = WheelMetadataResult> + Send + 'io;
) -> impl Future<Output = WheelMetadataResult> + 'io;
fn index_locations(&self) -> &IndexLocations;
@ -72,7 +72,7 @@ pub trait ResolverProvider: Send + Sync {
/// The main IO backend for the resolver, which does cached requests network requests using the
/// [`RegistryClient`] and [`DistributionDatabase`].
pub struct DefaultResolverProvider<'a, Context: BuildContext + Send + Sync> {
pub struct DefaultResolverProvider<'a, Context: BuildContext> {
/// The [`DistributionDatabase`] used to build source distributions.
fetcher: DistributionDatabase<'a, Context>,
/// The [`RegistryClient`] used to query the index.
@ -88,7 +88,7 @@ pub struct DefaultResolverProvider<'a, Context: BuildContext + Send + Sync> {
no_build: NoBuild,
}
impl<'a, Context: BuildContext + Send + Sync> DefaultResolverProvider<'a, Context> {
impl<'a, Context: BuildContext> DefaultResolverProvider<'a, Context> {
/// Reads the flat index entries and builds the provider.
#[allow(clippy::too_many_arguments)]
pub fn new(
@ -118,9 +118,7 @@ impl<'a, Context: BuildContext + Send + Sync> DefaultResolverProvider<'a, Contex
}
}
impl<'a, Context: BuildContext + Send + Sync> ResolverProvider
for DefaultResolverProvider<'a, Context>
{
impl<'a, Context: BuildContext> ResolverProvider for DefaultResolverProvider<'a, Context> {
/// Make a "Simple API" request for the package and convert the result to a [`VersionMap`].
async fn get_package_versions<'io>(
&'io self,