mirror of
https://github.com/astral-sh/uv.git
synced 2025-08-04 19:08:04 +00:00
Wait for request stream to flush before returning resolution (#2374)
## Summary This is a more robust fix for https://github.com/astral-sh/uv/issues/2300. The basic issue is: - When we resolve, we attempt to pre-fetch the distribution metadata for candidate packages. - It's possible that the resolution completes _without_ those pre-fetch responses. (In the linked issue, this was mainly because we were running with `--no-deps`, but the pre-fetch was causing us to attempt to build a package to get its dependencies. The resolution would then finish before the build completed.) - In that case, the `Index` will be marked as "waiting" for that response -- but it'll never come through. - If there's a subsequent call to the `Index`, to see if we should fetch or are waiting for that response, we'll end up waiting for it forever, since it _looks_ like it's in-flight (but isn't). (In the linked issue, we had to build the source distribution for the install phase of `pip install`, but `setuptools` was in this bad state from the _resolve_ phase.) This PR modifies the resolver to ensure that we flush the stream of requests before returning. Specifically, we now `join` rather than `select` between the resolution and request-handling futures. This _could_ be wasteful, since we don't _need_ those requests, but it at least ensures that every `.wait` is followed by ` .done`. In practice, I expect this not to have any significant effect on performance, since we end up using the pre-fetched distributions almost every time. ## Test Plan I ran through the test plan from https://github.com/astral-sh/uv/pull/2373, but ran the build 10 times and ensured it never crashed. (I reverted https://github.com/astral-sh/uv/pull/2373, since that _also_ fixes the issue in the proximate case, by never fetching `setuptools` during the resolve phase.) I also added logging to verify that requests are being handled _after_ the resolution completes, as expected. I also introduced an arbitrary error in `fetch` to ensure that the error was immediately propagated.
This commit is contained in:
parent
96290bf1a7
commit
79ac3a2a7e
2 changed files with 28 additions and 31 deletions
|
@ -27,7 +27,7 @@ pub enum ResolveError {
|
|||
#[error(transparent)]
|
||||
Client(#[from] uv_client::Error),
|
||||
|
||||
#[error("The channel is closed, was there a panic?")]
|
||||
#[error("The channel closed unexpectedly")]
|
||||
ChannelClosed,
|
||||
|
||||
#[error(transparent)]
|
||||
|
|
|
@ -12,7 +12,6 @@ use pubgrub::error::PubGrubError;
|
|||
use pubgrub::range::Range;
|
||||
use pubgrub::solver::{Incompatibility, State};
|
||||
use rustc_hash::{FxHashMap, FxHashSet};
|
||||
use tokio::select;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tracing::{debug, info_span, instrument, trace, Instrument};
|
||||
use url::Url;
|
||||
|
@ -197,42 +196,40 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
|
|||
let requests_fut = self.fetch(request_stream).fuse();
|
||||
|
||||
// Run the solver.
|
||||
let resolve_fut = self.solve(&request_sink).fuse();
|
||||
let resolve_fut = self.solve(request_sink).fuse();
|
||||
|
||||
let resolution = select! {
|
||||
result = requests_fut => {
|
||||
result?;
|
||||
return Err(ResolveError::ChannelClosed);
|
||||
// Wait for both to complete.
|
||||
match tokio::try_join!(requests_fut, resolve_fut) {
|
||||
Ok(((), resolution)) => {
|
||||
self.on_complete();
|
||||
Ok(resolution)
|
||||
}
|
||||
resolution = resolve_fut => {
|
||||
resolution.map_err(|err| {
|
||||
// Add version information to improve unsat error messages.
|
||||
if let ResolveError::NoSolution(err) = err {
|
||||
ResolveError::NoSolution(
|
||||
err
|
||||
.with_available_versions(&self.python_requirement, &self.visited, &self.index.packages)
|
||||
.with_selector(self.selector.clone())
|
||||
.with_python_requirement(&self.python_requirement)
|
||||
.with_index_locations(self.provider.index_locations())
|
||||
.with_unavailable_packages(&self.unavailable_packages)
|
||||
Err(err) => {
|
||||
// Add version information to improve unsat error messages.
|
||||
Err(if let ResolveError::NoSolution(err) = err {
|
||||
ResolveError::NoSolution(
|
||||
err.with_available_versions(
|
||||
&self.python_requirement,
|
||||
&self.visited,
|
||||
&self.index.packages,
|
||||
)
|
||||
} else {
|
||||
err
|
||||
}
|
||||
})?
|
||||
.with_selector(self.selector.clone())
|
||||
.with_python_requirement(&self.python_requirement)
|
||||
.with_index_locations(self.provider.index_locations())
|
||||
.with_unavailable_packages(&self.unavailable_packages),
|
||||
)
|
||||
} else {
|
||||
err
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
self.on_complete();
|
||||
|
||||
Ok(resolution)
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the `PubGrub` solver.
|
||||
#[instrument(skip_all)]
|
||||
async fn solve(
|
||||
&self,
|
||||
request_sink: &tokio::sync::mpsc::Sender<Request>,
|
||||
request_sink: tokio::sync::mpsc::Sender<Request>,
|
||||
) -> Result<ResolutionGraph, ResolveError> {
|
||||
let root = PubGrubPackage::Root(self.project.clone());
|
||||
|
||||
|
@ -258,7 +255,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
|
|||
// Pre-visit all candidate packages, to allow metadata to be fetched in parallel. If
|
||||
// the dependency mode is direct, we only need to visit the root package.
|
||||
if self.dependency_mode.is_transitive() {
|
||||
Self::pre_visit(state.partial_solution.prioritized_packages(), request_sink)
|
||||
Self::pre_visit(state.partial_solution.prioritized_packages(), &request_sink)
|
||||
.await?;
|
||||
}
|
||||
|
||||
|
@ -294,7 +291,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
|
|||
&next,
|
||||
term_intersection.unwrap_positive(),
|
||||
&mut pins,
|
||||
request_sink,
|
||||
&request_sink,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -434,7 +431,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
|
|||
// Retrieve that package dependencies.
|
||||
let package = &next;
|
||||
let dependencies = match self
|
||||
.get_dependencies(package, &version, &mut priorities, request_sink)
|
||||
.get_dependencies(package, &version, &mut priorities, &request_sink)
|
||||
.await?
|
||||
{
|
||||
Dependencies::Unavailable(reason) => {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue