Consolidate concurrency limits (#3493)

## Summary

This PR consolidates the concurrency limits used throughout `uv` and
exposes two limits, `UV_CONCURRENT_DOWNLOADS` and
`UV_CONCURRENT_BUILDS`, as environment variables.
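
Each limit is just a positive integer read from the environment. Below is a minimal sketch of how such a variable might be parsed, with a hypothetical helper and illustrative defaults (uv's actual parsing and default values are not part of this diff):

```rust
use std::env;

/// Hypothetical helper: read a concurrency limit from an environment
/// variable, falling back to `default` when the variable is unset, empty,
/// or not a positive integer.
fn concurrency_limit(var: &str, default: usize) -> usize {
    env::var(var)
        .ok()
        .and_then(|value| value.parse::<usize>().ok())
        .filter(|&limit| limit > 0)
        .unwrap_or(default)
}

fn main() {
    // The defaults here are placeholders, not uv's real values.
    let downloads = concurrency_limit("UV_CONCURRENT_DOWNLOADS", 50);
    let builds = concurrency_limit("UV_CONCURRENT_BUILDS", 16);
    println!("downloads: {downloads}, builds: {builds}");
}
```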

Currently, `uv` has a number of concurrent streams that it buffers using
relatively arbitrary limits for backpressure (see the sketch after the
list below). However, many of these limits are conflated. We run a
relatively small number of tasks overall
and should start most things as soon as possible. What we really want to
limit are three separate operations:
- File I/O. This is managed by tokio's blocking pool and we should not
really have to worry about it.
- Network I/O.
- Python build processes.
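
The "buffered streams" mentioned above are the usual `futures` combinator pattern: a stream of futures driven with a fixed buffer size, where the buffer size is the concurrency limit. A minimal sketch of that pattern follows (illustrative only, not uv's actual code; requires the `futures` and `tokio` crates). Note that the limit caps whatever work each future happens to do, not one specific resource:

```rust
use std::time::Duration;

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // At most 8 of these futures are polled concurrently; the buffer size is
    // the "relatively arbitrary limit". It applies to whatever the future
    // does: a download, a build, file I/O, or just waiting on another task.
    let results: Vec<u64> = stream::iter(0..20u64)
        .map(|i| async move {
            tokio::time::sleep(Duration::from_millis(10)).await; // stand-in work
            i * 2
        })
        .buffer_unordered(8)
        .collect()
        .await;

    println!("{results:?}");
}
```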

Because the current limits span a broad range of tasks, a limit meant
for network I/O can end up occupied by tasks that are performing builds,
reading from the file system, or even just waiting on a `OnceMap`. We
also don't limit build processes that end up being required in order to
complete a download. While this may not pose a performance problem (our
limits are relatively high), it does mean that the limits do not do what
we want, which makes it tricky to expose them to users
(https://github.com/astral-sh/uv/issues/1205,
https://github.com/astral-sh/uv/issues/3311).

After this change, the limits on network I/O and build processes are
centralized and managed by semaphores. All other tasks are unbuffered
(note that these tasks are still bounded, so backpressure should not be
a problem).
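
Concretely, each centralized limit is a `tokio::sync::Semaphore`: a download (or build) must acquire a permit before it runs, so at most N operations of that kind are in flight at once, while everything else proceeds unbuffered. The `ManagedClient` wrapper introduced in the diff below applies this pattern to the registry client; here is a minimal, self-contained sketch of the idea (illustrative names, requires `tokio`):

```rust
use std::{sync::Arc, time::Duration};

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Allow at most two "downloads" in flight; the other tasks wait for a permit.
    let downloads = Arc::new(Semaphore::new(2));

    let tasks: Vec<_> = (0..5)
        .map(|i| {
            let downloads = Arc::clone(&downloads);
            tokio::spawn(async move {
                // The permit is the only point of backpressure; it is released
                // when `_permit` is dropped at the end of the scope.
                let _permit = downloads.acquire().await.unwrap();
                println!("download {i} running");
                tokio::time::sleep(Duration::from_millis(50)).await;
            })
        })
        .collect();

    for task in tasks {
        task.await.unwrap();
    }
}
```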
Commit 783df8f657 (parent eab2b832a6), authored by Ibraheem Ahmed, 2024-05-10 12:43:08 -04:00, committed by GitHub.
35 changed files with 575 additions and 218 deletions


@@ -1,3 +1,4 @@
use std::future::Future;
use std::io;
use std::path::Path;
use std::rc::Rc;
@@ -6,6 +7,7 @@ use std::sync::Arc;
use futures::{FutureExt, TryStreamExt};
use tempfile::TempDir;
use tokio::io::AsyncSeekExt;
use tokio::sync::Semaphore;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{info_span, instrument, warn, Instrument};
use url::Url;
@@ -41,21 +43,25 @@ use crate::{ArchiveMetadata, Error, LocalWheel, Reporter, SourceDistributionBuil
/// git) are supported.
///
/// This struct also has the task of acquiring locks around source dist builds in general and git
/// operation especially.
/// operation especially, as well as respecting concurrency limits.
pub struct DistributionDatabase<'a, Context: BuildContext> {
client: &'a RegistryClient,
build_context: &'a Context,
builder: SourceDistributionBuilder<'a, Context>,
locks: Rc<Locks>,
client: ManagedClient<'a>,
}
impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
pub fn new(client: &'a RegistryClient, build_context: &'a Context) -> Self {
pub fn new(
client: &'a RegistryClient,
build_context: &'a Context,
concurrent_downloads: usize,
) -> Self {
Self {
client,
build_context,
builder: SourceDistributionBuilder::new(client, build_context),
builder: SourceDistributionBuilder::new(build_context),
locks: Rc::new(Locks::default()),
client: ManagedClient::new(client, concurrent_downloads),
}
}
@@ -75,7 +81,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
io::Error::new(
io::ErrorKind::TimedOut,
format!(
"Failed to download distribution due to network timeout. Try increasing UV_HTTP_TIMEOUT (current value: {}s).", self.client.timeout()
"Failed to download distribution due to network timeout. Try increasing UV_HTTP_TIMEOUT (current value: {}s).", self.client.unmanaged.timeout()
),
)
} else {
@@ -307,7 +313,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
let built_wheel = self
.builder
.download_and_build(&BuildableSource::Dist(dist), tags, hashes)
.download_and_build(&BuildableSource::Dist(dist), tags, hashes, &self.client)
.boxed_local()
.await?;
@@ -361,7 +367,12 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
return Ok(ArchiveMetadata { metadata, hashes });
}
match self.client.wheel_metadata(dist).boxed_local().await {
let result = self
.client
.managed(|client| client.wheel_metadata(dist).boxed_local())
.await;
match result {
Ok(metadata) => Ok(ArchiveMetadata::from(metadata)),
Err(err) if err.is_http_streaming_unsupported() => {
warn!("Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})");
@@ -404,7 +415,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
let metadata = self
.builder
.download_and_build_metadata(source, hashes)
.download_and_build_metadata(source, hashes, &self.client)
.boxed_local()
.await?;
Ok(metadata)
@@ -462,7 +473,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
// Fetch the archive from the cache, or download it if necessary.
let req = self.request(url.clone())?;
let cache_control = match self.client.connectivity() {
let cache_control = match self.client.unmanaged.connectivity() {
Connectivity::Online => CacheControl::from(
self.build_context
.cache()
@@ -471,10 +482,14 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
),
Connectivity::Offline => CacheControl::AllowStale,
};
let archive = self
.client
.cached_client()
.get_serde(req, &http_entry, cache_control, download)
.managed(|client| {
client
.cached_client()
.get_serde(req, &http_entry, cache_control, download)
})
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
@@ -486,13 +501,17 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
archive
} else {
self.client
.cached_client()
.skip_cache(self.request(url)?, &http_entry, download)
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
CachedClientError::Client(err) => Error::Client(err),
})?
.managed(|client| async {
client
.cached_client()
.skip_cache(self.request(url)?, &http_entry, download)
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
CachedClientError::Client(err) => Error::Client(err),
})
})
.await?
};
Ok(archive)
@@ -574,7 +593,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
};
let req = self.request(url.clone())?;
let cache_control = match self.client.connectivity() {
let cache_control = match self.client.unmanaged.connectivity() {
Connectivity::Online => CacheControl::from(
self.build_context
.cache()
@@ -583,10 +602,14 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
),
Connectivity::Offline => CacheControl::AllowStale,
};
let archive = self
.client
.cached_client()
.get_serde(req, &http_entry, cache_control, download)
.managed(|client| {
client
.cached_client()
.get_serde(req, &http_entry, cache_control, download)
})
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
@@ -598,13 +621,17 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
archive
} else {
self.client
.cached_client()
.skip_cache(self.request(url)?, &http_entry, download)
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
CachedClientError::Client(err) => Error::Client(err),
})?
.managed(|client| async move {
client
.cached_client()
.skip_cache(self.request(url)?, &http_entry, download)
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
CachedClientError::Client(err) => Error::Client(err),
})
})
.await?
};
Ok(archive)
@@ -733,6 +760,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
/// Returns a GET [`reqwest::Request`] for the given URL.
fn request(&self, url: Url) -> Result<reqwest::Request, reqwest::Error> {
self.client
.unmanaged
.uncached_client()
.get(url)
.header(
@@ -749,6 +777,39 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
pub fn index_locations(&self) -> &IndexLocations {
self.build_context.index_locations()
}
/// Return the [`ManagedClient`] used by this resolver.
pub fn client(&self) -> &ManagedClient<'a> {
&self.client
}
}
/// A wrapper around `RegistryClient` that manages a concurrency limit.
pub struct ManagedClient<'a> {
pub unmanaged: &'a RegistryClient,
control: Semaphore,
}
impl<'a> ManagedClient<'a> {
/// Create a new `ManagedClient` using the given client and concurrency limit.
fn new(client: &'a RegistryClient, concurrency: usize) -> ManagedClient<'a> {
ManagedClient {
unmanaged: client,
control: Semaphore::new(concurrency),
}
}
/// Perform a request using the client, respecting the concurrency limit.
///
/// If the concurrency limit has been reached, this method will wait until a pending
/// operation completes before executing the closure.
pub async fn managed<F, T>(&self, f: impl FnOnce(&'a RegistryClient) -> F) -> T
where
F: Future<Output = T>,
{
let _permit = self.control.acquire().await.unwrap();
f(self.unmanaged).await
}
}
/// A pointer to an archive in the cache, fetched from an HTTP archive.