Refactor batch prefetch (#10349)

This commit is contained in:
konsti 2025-01-07 14:58:36 +01:00 committed by GitHub
parent 3dc481b063
commit c6ac121ed0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 194 additions and 143 deletions

View file

@ -38,7 +38,6 @@ uv-types = { workspace = true }
uv-warnings = { workspace = true }
uv-workspace = { workspace = true }
anyhow = { workspace = true }
clap = { workspace = true, features = ["derive"], optional = true }
dashmap = { workspace = true }
either = { workspace = true }

View file

@ -1,7 +1,8 @@
use std::cmp::min;
use std::sync::Arc;
use itertools::Itertools;
use pubgrub::{Range, Term};
use pubgrub::{Range, Ranges, Term};
use rustc_hash::FxHashMap;
use tokio::sync::mpsc::Sender;
use tracing::{debug, trace};
@ -41,10 +42,19 @@ enum BatchPrefetchStrategy {
/// Note that these are all heuristics that could totally prefetch lots of irrelevant versions.
#[derive(Clone)]
pub(crate) struct BatchPrefetcher {
    // Internal types: bookkeeping used to determine whether we need to prefetch.
    /// Per-package counter keyed by package name.
    // NOTE(review): presumably the number of versions tried so far for each package; confirm
    // against the code that updates this map.
    tried_versions: FxHashMap<PackageName, usize>,
    /// For each package, the `num_tried` value that was current when a prefetch was last issued
    /// for it.
    last_prefetch: FxHashMap<PackageName, usize>,
    // Shared (e.g., `Arc`) types: everything needed to execute the prefetch.
    /// Bundles the index capabilities, the in-memory index and the request channel that are
    /// used to actually send the prefetch requests.
    prefetch_runner: BatchPrefetcherRunner,
}
/// The types that are needed for running the batch prefetching after we determined that we need to
/// prefetch.
///
/// These types are shared (e.g., `Arc`) so they can be cheaply cloned and moved between threads.
#[derive(Clone)]
pub(crate) struct BatchPrefetcherRunner {
capabilities: IndexCapabilities,
index: InMemoryIndex,
request_sink: Sender<Request>,
@ -59,9 +69,11 @@ impl BatchPrefetcher {
Self {
tried_versions: FxHashMap::default(),
last_prefetch: FxHashMap::default(),
capabilities,
index,
request_sink,
prefetch_runner: BatchPrefetcherRunner {
capabilities,
index,
request_sink,
},
}
}
@ -76,7 +88,7 @@ impl BatchPrefetcher {
python_requirement: &PythonRequirement,
selector: &CandidateSelector,
env: &ResolverEnvironment,
) -> anyhow::Result<(), ResolveError> {
) -> Result<(), ResolveError> {
let PubGrubPackageInner::Package {
name,
extra: None,
@ -95,154 +107,37 @@ impl BatchPrefetcher {
// This is immediate, we already fetched the version map.
let versions_response = if let Some(index) = index {
self.index
self.prefetch_runner
.index
.explicit()
.wait_blocking(&(name.clone(), index.clone()))
.ok_or_else(|| ResolveError::UnregisteredTask(name.to_string()))?
} else {
self.index
self.prefetch_runner
.index
.implicit()
.wait_blocking(name)
.ok_or_else(|| ResolveError::UnregisteredTask(name.to_string()))?
};
let VersionsResponse::Found(ref version_map) = *versions_response else {
return Ok(());
};
let mut phase = BatchPrefetchStrategy::Compatible {
let phase = BatchPrefetchStrategy::Compatible {
compatible: current_range.clone(),
previous: version.clone(),
};
let mut prefetch_count = 0;
for _ in 0..total_prefetch {
let candidate = match phase {
BatchPrefetchStrategy::Compatible {
compatible,
previous,
} => {
if let Some(candidate) =
selector.select_no_preference(name, &compatible, version_map, env)
{
let compatible = compatible.intersection(
&Range::singleton(candidate.version().clone()).complement(),
);
phase = BatchPrefetchStrategy::Compatible {
compatible,
previous: candidate.version().clone(),
};
candidate
} else {
// We exhausted the compatible version, switch to ignoring the existing
// constraints on the package and instead going through versions in order.
phase = BatchPrefetchStrategy::InOrder { previous };
continue;
}
}
BatchPrefetchStrategy::InOrder { previous } => {
let mut range = if selector.use_highest_version(name, env) {
Range::strictly_lower_than(previous)
} else {
Range::strictly_higher_than(previous)
};
// If we have constraints from root, don't go beyond those. Example: We are
// prefetching for foo 1.60 and have a dependency for `foo>=1.50`, so we should
// only prefetch 1.60 to 1.50, knowing 1.49 will always be rejected.
if let Some(unchangeable_constraints) = unchangeable_constraints {
range = match unchangeable_constraints {
Term::Positive(constraints) => range.intersection(constraints),
Term::Negative(negative_constraints) => {
range.intersection(&negative_constraints.complement())
}
};
}
if let Some(candidate) =
selector.select_no_preference(name, &range, version_map, env)
{
phase = BatchPrefetchStrategy::InOrder {
previous: candidate.version().clone(),
};
candidate
} else {
// Both strategies exhausted their candidates.
break;
}
}
};
let Some(dist) = candidate.compatible() else {
continue;
};
// Avoid prefetching source distributions, which could be expensive.
let Some(wheel) = dist.wheel() else {
continue;
};
// Avoid prefetching built distributions that don't support _either_ PEP 658 (`.metadata`)
// or range requests.
if !(wheel.file.dist_info_metadata
|| self.capabilities.supports_range_requests(&wheel.index))
{
debug!("Abandoning prefetch for {wheel} due to missing registry capabilities");
return Ok(());
}
// Avoid prefetching for distributions that don't satisfy the Python requirement.
match dist {
CompatibleDist::InstalledDist(_) => {}
CompatibleDist::SourceDist { sdist, .. }
| CompatibleDist::IncompatibleWheel { sdist, .. } => {
// Source distributions must meet both the _target_ Python version and the
// _installed_ Python version (to build successfully).
if let Some(requires_python) = sdist.file.requires_python.as_ref() {
if !python_requirement
.installed()
.is_contained_by(requires_python)
{
continue;
}
if !python_requirement.target().is_contained_by(requires_python) {
continue;
}
}
}
CompatibleDist::CompatibleWheel { wheel, .. } => {
// Wheels must meet the _target_ Python version.
if let Some(requires_python) = wheel.file.requires_python.as_ref() {
if !python_requirement.target().is_contained_by(requires_python) {
continue;
}
}
}
};
let dist = dist.for_resolution();
// Emit a request to fetch the metadata for this version.
trace!(
"Prefetching {prefetch_count} ({}) {}",
match phase {
BatchPrefetchStrategy::Compatible { .. } => "compatible",
BatchPrefetchStrategy::InOrder { .. } => "in order",
},
dist
);
prefetch_count += 1;
if self.index.distributions().register(candidate.version_id()) {
let request = Request::from(dist);
self.request_sink.blocking_send(request)?;
}
}
match prefetch_count {
0 => debug!("No `{name}` versions to prefetch"),
1 => debug!("Prefetched 1 `{name}` version"),
_ => debug!("Prefetched {prefetch_count} `{name}` versions"),
}
self.last_prefetch.insert(name.clone(), num_tried);
self.prefetch_runner.send_prefetch(
name,
unchangeable_constraints,
total_prefetch,
&versions_response,
phase,
python_requirement,
selector,
env,
)?;
Ok(())
}
@ -303,3 +198,161 @@ impl BatchPrefetcher {
debug!("Tried {total_versions} versions: {counts}");
}
}
impl BatchPrefetcherRunner {
    /// Pick versions of `name` to prefetch and queue a metadata request for each of them.
    ///
    /// The caller has already decided that prefetching should happen; this method only chooses
    /// the versions. Selection runs through two stages, tracked in `phase`:
    ///
    /// 1. `Compatible`: select from the `compatible` range, removing every selected version from
    ///    that range before the next attempt.
    /// 2. `InOrder`: once the compatible range yields nothing, select versions strictly below
    ///    (if the selector prefers the highest version) or strictly above the previously selected
    ///    version, restricted to `unchangeable_constraints` when those are given.
    ///
    /// The loop runs at most `total_prefetch` times. Iterations that only switch the stage or
    /// that skip a candidate count towards that limit, too.
    ///
    /// Nothing is sent if `versions_response` is not `VersionsResponse::Found`. The whole prefetch
    /// is abandoned as soon as a wheel is encountered that offers neither PEP 658 metadata nor an
    /// index with range request support.
    ///
    /// # Errors
    ///
    /// Propagates the failure of sending a request through `request_sink`.
    fn send_prefetch(
        &self,
        name: &PackageName,
        unchangeable_constraints: Option<&Term<Ranges<Version>>>,
        total_prefetch: usize,
        versions_response: &Arc<VersionsResponse>,
        mut phase: BatchPrefetchStrategy,
        python_requirement: &PythonRequirement,
        selector: &CandidateSelector,
        env: &ResolverEnvironment,
    ) -> Result<(), ResolveError> {
        let VersionsResponse::Found(ref version_map) = &**versions_response else {
            return Ok(());
        };

        let mut prefetch_count = 0;
        for _ in 0..total_prefetch {
            let candidate = match phase {
                BatchPrefetchStrategy::Compatible {
                    compatible,
                    previous,
                } => {
                    let Some(selected) =
                        selector.select_no_preference(name, &compatible, version_map, env)
                    else {
                        // The compatible range is exhausted: from now on, ignore the existing
                        // constraints on the package and walk the versions in order instead.
                        phase = BatchPrefetchStrategy::InOrder { previous };
                        continue;
                    };

                    // Make sure the next attempt cannot select the same version again.
                    let excluded = Range::singleton(selected.version().clone()).complement();
                    phase = BatchPrefetchStrategy::Compatible {
                        compatible: compatible.intersection(&excluded),
                        previous: selected.version().clone(),
                    };
                    selected
                }
                BatchPrefetchStrategy::InOrder { previous } => {
                    let unbounded = if selector.use_highest_version(name, env) {
                        Range::strictly_lower_than(previous)
                    } else {
                        Range::strictly_higher_than(previous)
                    };

                    // Never walk past constraints coming from root. Example: when prefetching
                    // for foo 1.60 with a dependency on `foo>=1.50`, only 1.60 down to 1.50 are
                    // worth fetching, since 1.49 will always be rejected.
                    let range = match unchangeable_constraints {
                        Some(Term::Positive(constraints)) => unbounded.intersection(constraints),
                        Some(Term::Negative(negative_constraints)) => {
                            unbounded.intersection(&negative_constraints.complement())
                        }
                        None => unbounded,
                    };

                    let Some(selected) =
                        selector.select_no_preference(name, &range, version_map, env)
                    else {
                        // Neither strategy has any candidates left.
                        break;
                    };
                    phase = BatchPrefetchStrategy::InOrder {
                        previous: selected.version().clone(),
                    };
                    selected
                }
            };

            // Only versions with a compatible distribution are worth fetching.
            let Some(dist) = candidate.compatible() else {
                continue;
            };

            // Source distributions could be expensive to prefetch, so require a wheel.
            let Some(wheel) = dist.wheel() else {
                continue;
            };

            // The wheel metadata must be reachable through _either_ PEP 658 (`.metadata`) or
            // range requests; if it is through neither, give up on the prefetch entirely.
            let metadata_reachable = wheel.file.dist_info_metadata
                || self.capabilities.supports_range_requests(&wheel.index);
            if !metadata_reachable {
                debug!("Abandoning prefetch for {wheel} due to missing registry capabilities");
                return Ok(());
            }

            // Distributions that don't satisfy the Python requirement are skipped.
            if !satisfies_python(dist, python_requirement) {
                continue;
            }

            let resolution_dist = dist.for_resolution();

            // Queue the metadata fetch for this version.
            trace!(
                "Prefetching {prefetch_count} ({}) {}",
                match phase {
                    BatchPrefetchStrategy::Compatible { .. } => "compatible",
                    BatchPrefetchStrategy::InOrder { .. } => "in order",
                },
                resolution_dist
            );
            prefetch_count += 1;

            // The request is only sent when `register` reports `true` for this version.
            let should_request = self.index.distributions().register(candidate.version_id());
            if should_request {
                self.request_sink
                    .blocking_send(Request::from(resolution_dist))?;
            }
        }

        match prefetch_count {
            0 => debug!("No `{name}` versions to prefetch"),
            1 => debug!("Prefetched 1 `{name}` version"),
            _ => debug!("Prefetched {prefetch_count} `{name}` versions"),
        }

        Ok(())
    }
}
/// Check the `requires_python` of `dist` against `python_requirement`.
///
/// * Installed distributions always pass.
/// * Source distributions, including the source distribution carried by an incompatible wheel,
///   must accept both the _installed_ Python version (to build successfully) and the _target_
///   Python version.
/// * Compatible wheels only have to accept the _target_ Python version.
///
/// A distribution whose file declares no `requires_python` passes.
fn satisfies_python(dist: &CompatibleDist, python_requirement: &PythonRequirement) -> bool {
    match dist {
        CompatibleDist::InstalledDist(_) => true,
        CompatibleDist::SourceDist { sdist, .. }
        | CompatibleDist::IncompatibleWheel { sdist, .. } => {
            match sdist.file.requires_python.as_ref() {
                // The installed version is checked first, then the target version.
                Some(requires_python) => {
                    python_requirement
                        .installed()
                        .is_contained_by(requires_python)
                        && python_requirement.target().is_contained_by(requires_python)
                }
                None => true,
            }
        }
        CompatibleDist::CompatibleWheel { wheel, .. } => {
            match wheel.file.requires_python.as_ref() {
                Some(requires_python) => {
                    python_requirement.target().is_contained_by(requires_python)
                }
                None => true,
            }
        }
    }
}