From 783df8f6575f2f28ac5097eadc37458238e4b893 Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Fri, 10 May 2024 12:43:08 -0400 Subject: [PATCH] Consolidate concurrency limits (#3493) ## Summary This PR consolidates the concurrency limits used throughout `uv` and exposes two limits, `UV_CONCURRENT_DOWNLOADS` and `UV_CONCURRENT_BUILDS`, as environment variables. Currently, `uv` has a number of concurrent streams that it buffers using relatively arbitrary limits for backpressure. However, many of these limits are conflated. We run a relatively small number of tasks overall and should start most things as soon as possible. What we really want to limit are three separate operations: - File I/O. This is managed by tokio's blocking pool and we should not really have to worry about it. - Network I/O. - Python build processes. Because the current limits span a broad range of tasks, it's possible that a limit meant for network I/O is occupied by tasks performing builds, reading from the file system, or even waiting on a `OnceMap`. We also don't limit build processes that end up being required to perform a download. While this may not pose a performance problem because our limits are relatively high, it does mean that the limits do not do what we want, making it tricky to expose them to users (https://github.com/astral-sh/uv/issues/1205, https://github.com/astral-sh/uv/issues/3311). After this change, the limits on network I/O and build processes are centralized and managed by semaphores. All other tasks are unbuffered (note that these tasks are still bounded, so backpressure should not be a problem). --- Cargo.lock | 3 + README.md | 4 + crates/bench/Cargo.toml | 1 + crates/bench/benches/uv.rs | 6 +- crates/uv-build/src/lib.rs | 166 ++++++++++++------ crates/uv-configuration/src/concurrency.rs | 35 ++++ crates/uv-configuration/src/lib.rs | 2 + crates/uv-dev/Cargo.toml | 1 + crates/uv-dev/src/build.rs | 7 +- crates/uv-dev/src/resolve_cli.rs | 7 +- crates/uv-dev/src/resolve_many.rs | 4 +- crates/uv-dispatch/Cargo.toml | 1 + crates/uv-dispatch/src/lib.rs | 17 +- .../src/distribution_database.rs | 119 ++++++++++--- crates/uv-distribution/src/source/mod.rs | 56 +++--- crates/uv-installer/src/downloader.rs | 20 +-- crates/uv-requirements/src/lookahead.rs | 6 +- crates/uv-requirements/src/source_tree.rs | 15 +- crates/uv-requirements/src/unnamed.rs | 13 +- crates/uv-resolver/src/resolver/mod.rs | 12 +- crates/uv-resolver/src/resolver/provider.rs | 13 +- crates/uv-resolver/tests/resolver.rs | 8 +- crates/uv-workspace/src/combine.rs | 2 + crates/uv-workspace/src/settings.rs | 4 +- crates/uv/src/commands/pip_compile.rs | 30 ++-- crates/uv/src/commands/pip_install.rs | 61 +++++-- crates/uv/src/commands/pip_sync.rs | 45 +++-- crates/uv/src/commands/project/lock.rs | 7 +- crates/uv/src/commands/project/mod.rs | 25 +-- crates/uv/src/commands/project/run.rs | 8 +- crates/uv/src/commands/project/sync.rs | 7 +- crates/uv/src/commands/venv.rs | 6 +- crates/uv/src/main.rs | 3 + crates/uv/src/settings.rs | 63 ++++++- uv.schema.json | 16 ++ 35 files changed, 575 insertions(+), 218 deletions(-) create mode 100644 crates/uv-configuration/src/concurrency.rs diff --git a/Cargo.lock b/Cargo.lock index 46c31c8a0..36df2b216 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -408,6 +408,7 @@ dependencies = [ "uv-cache", "uv-client", "uv-configuration", + "uv-distribution", "uv-interpreter", "uv-resolver", "uv-types", @@ -4777,6 +4778,7 @@ dependencies = [ "uv-client", "uv-configuration", "uv-dispatch", + "uv-distribution", "uv-fs", "uv-installer", "uv-interpreter", @@ -4804,6 +4806,7 @@ dependencies = [ "uv-cache", "uv-client", "uv-configuration", + "uv-distribution", "uv-installer", "uv-interpreter", "uv-resolver", diff --git a/README.md b/README.md index d313919a8..531f540ff 100644 --- a/README.md +++ b/README.md @@ -559,6 +559,10 @@ uv accepts the following command-line arguments as environment variables: - `UV_CUSTOM_COMPILE_COMMAND`: Used to override `uv` in the output header of the `requirements.txt` files generated by `uv pip compile`. Intended for use-cases in which `uv pip compile` is called from within a wrapper script, to include the name of the wrapper script in the output file. +- `UV_CONCURRENT_DOWNLOADS`: Sets the maximum number of in-flight concurrent downloads that `uv` + will perform at any given time. +- `UV_CONCURRENT_BUILDS`: Sets the maximum number of source distributions that `uv` will build + concurrently at any given time. In each case, the corresponding command-line argument takes precedence over an environment variable. diff --git a/crates/bench/Cargo.toml b/crates/bench/Cargo.toml index d905dd4fe..3684aa25b 100644 --- a/crates/bench/Cargo.toml +++ b/crates/bench/Cargo.toml @@ -39,6 +39,7 @@ uv-resolver = { workspace = true } uv-cache = { workspace = true } uv-client = { workspace = true } uv-configuration = { workspace = true } +uv-distribution = { workspace = true } uv-types = { workspace = true } uv-interpreter = { workspace = true } platform-tags = { workspace = true } diff --git a/crates/bench/benches/uv.rs b/crates/bench/benches/uv.rs index f24926b29..5328a0a07 100644 --- a/crates/bench/benches/uv.rs +++ b/crates/bench/benches/uv.rs @@ -47,7 +47,8 @@ mod resolver { use platform_tags::{Arch, Os, Platform, Tags}; use uv_cache::Cache; use uv_client::RegistryClient; - use uv_configuration::{BuildKind, NoBinary, NoBuild, SetupPyStrategy}; + use uv_configuration::{BuildKind, Concurrency, NoBinary, NoBuild, SetupPyStrategy}; + use uv_distribution::DistributionDatabase; use uv_interpreter::{Interpreter, PythonEnvironment}; use uv_resolver::{ FlatIndex, InMemoryIndex, Manifest, Options, PythonRequirement, ResolutionGraph, Resolver, @@ -95,6 +96,7 @@ mod resolver { let hashes = HashStrategy::None; let installed_packages = EmptyInstalledPackages; let python_requirement = PythonRequirement::from_marker_environment(&interpreter, &MARKERS); + let concurrency = Concurrency::default(); let resolver = Resolver::new( manifest, @@ -102,12 +104,12 @@ mod resolver { &python_requirement, Some(&MARKERS), &TAGS, - client, &flat_index, &index, &hashes, &build_context, &installed_packages, + DistributionDatabase::new(client, &build_context, concurrency.downloads), )?; Ok(resolver.resolve().await?) diff --git a/crates/uv-build/src/lib.rs b/crates/uv-build/src/lib.rs index 32ae5075d..913d8ea3d 100644 --- a/crates/uv-build/src/lib.rs +++ b/crates/uv-build/src/lib.rs @@ -22,7 +22,7 @@ use serde::{de, Deserialize, Deserializer}; use tempfile::{tempdir_in, TempDir}; use thiserror::Error; use tokio::process::Command; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, Semaphore}; use tracing::{debug, info_span, instrument, Instrument}; use distribution_types::{ParsedUrlError, Requirement, Resolution}; @@ -377,6 +377,8 @@ pub struct SourceBuild { modified_path: OsString, /// Environment variables to be passed in during metadata or wheel building environment_variables: FxHashMap, + /// Runner for Python scripts. + runner: PythonRunner, } impl SourceBuild { @@ -397,6 +399,7 @@ impl SourceBuild { build_isolation: BuildIsolation<'_>, build_kind: BuildKind, mut environment_variables: FxHashMap, + concurrent_builds: usize, ) -> Result { let temp_dir = tempdir_in(build_context.cache().root())?; @@ -476,9 +479,11 @@ impl SourceBuild { // Create the PEP 517 build environment. If build isolation is disabled, we assume the build // environment is already setup. + let runner = PythonRunner::new(concurrent_builds); if build_isolation.is_isolated() { if let Some(pep517_backend) = &pep517_backend { create_pep517_build_environment( + &runner, &source_tree, &venv, pep517_backend, @@ -506,6 +511,7 @@ impl SourceBuild { version_id, environment_variables, modified_path, + runner, }) } @@ -693,15 +699,17 @@ impl SourceBuild { script="prepare_metadata_for_build_wheel", python_version = %self.venv.interpreter().python_version() ); - let output = run_python_script( - &self.venv, - &script, - &self.source_tree, - &self.environment_variables, - &self.modified_path, - ) - .instrument(span) - .await?; + let output = self + .runner + .run_script( + &self.venv, + &script, + &self.source_tree, + &self.environment_variables, + &self.modified_path, + ) + .instrument(span) + .await?; if !output.status.success() { return Err(Error::from_command_output( "Build backend failed to determine metadata through `prepare_metadata_for_build_wheel`".to_string(), @@ -744,19 +752,16 @@ impl SourceBuild { return Err(Error::EditableSetupPy); } // We checked earlier that setup.py exists. - let python_interpreter = self.venv.python_executable(); let span = info_span!( "run_python_script", script="setup.py bdist_wheel", python_version = %self.venv.interpreter().python_version() ); - let output = Command::new(python_interpreter) - .args(["setup.py", "bdist_wheel"]) - .current_dir(self.source_tree.simplified()) - .output() + let output = self + .runner + .run_setup_py(&self.venv, "bdist_wheel", &self.source_tree) .instrument(span) - .await - .map_err(|err| Error::CommandFailed(python_interpreter.to_path_buf(), err))?; + .await?; if !output.status.success() { return Err(Error::from_command_output( "Failed building wheel through setup.py".to_string(), @@ -826,15 +831,17 @@ impl SourceBuild { script=format!("build_{}", self.build_kind), python_version = %self.venv.interpreter().python_version() ); - let output = run_python_script( - &self.venv, - &script, - &self.source_tree, - &self.environment_variables, - &self.modified_path, - ) - .instrument(span) - .await?; + let output = self + .runner + .run_script( + &self.venv, + &script, + &self.source_tree, + &self.environment_variables, + &self.modified_path, + ) + .instrument(span) + .await?; if !output.status.success() { return Err(Error::from_command_output( format!( @@ -880,6 +887,7 @@ fn escape_path_for_python(path: &Path) -> String { /// Not a method because we call it before the builder is completely initialized #[allow(clippy::too_many_arguments)] async fn create_pep517_build_environment( + runner: &PythonRunner, source_tree: &Path, venv: &PythonEnvironment, pep517_backend: &Pep517Backend, @@ -925,15 +933,16 @@ async fn create_pep517_build_environment( script=format!("get_requires_for_build_{}", build_kind), python_version = %venv.interpreter().python_version() ); - let output = run_python_script( - venv, - &script, - source_tree, - environment_variables, - modified_path, - ) - .instrument(span) - .await?; + let output = runner + .run_script( + venv, + &script, + source_tree, + environment_variables, + modified_path, + ) + .instrument(span) + .await?; if !output.status.success() { return Err(Error::from_command_output( format!("Build backend failed to determine extra requires with `build_{build_kind}()`"), @@ -998,27 +1007,72 @@ async fn create_pep517_build_environment( Ok(()) } -/// It is the caller's responsibility to create an informative span. -async fn run_python_script( - venv: &PythonEnvironment, - script: &str, - source_tree: &Path, - environment_variables: &FxHashMap, - modified_path: &OsString, -) -> Result { - Command::new(venv.python_executable()) - .args(["-c", script]) - .current_dir(source_tree.simplified()) - // Pass in remaining environment variables - .envs(environment_variables) - // Set the modified PATH - .env("PATH", modified_path) - // Activate the venv - .env("VIRTUAL_ENV", venv.root()) - .env("CLICOLOR_FORCE", "1") - .output() - .await - .map_err(|err| Error::CommandFailed(venv.python_executable().to_path_buf(), err)) +/// A runner that manages the execution of external python processes with a +/// concurrency limit. +struct PythonRunner { + control: Semaphore, +} + +impl PythonRunner { + /// Create a `PythonRunner` with the provided concurrency limit. + fn new(concurrency: usize) -> PythonRunner { + PythonRunner { + control: Semaphore::new(concurrency), + } + } + + /// Spawn a process that runs a python script in the provided environment. + /// + /// If the concurrency limit has been reached this method will wait until a pending + /// script completes before spawning this one. + /// + /// Note: It is the caller's responsibility to create an informative span. + async fn run_script( + &self, + venv: &PythonEnvironment, + script: &str, + source_tree: &Path, + environment_variables: &FxHashMap, + modified_path: &OsString, + ) -> Result { + let _permit = self.control.acquire().await.unwrap(); + + Command::new(venv.python_executable()) + .args(["-c", script]) + .current_dir(source_tree.simplified()) + // Pass in remaining environment variables + .envs(environment_variables) + // Set the modified PATH + .env("PATH", modified_path) + // Activate the venv + .env("VIRTUAL_ENV", venv.root()) + .env("CLICOLOR_FORCE", "1") + .output() + .await + .map_err(|err| Error::CommandFailed(venv.python_executable().to_path_buf(), err)) + } + + /// Spawn a process that runs a `setup.py` script. + /// + /// If the concurrency limit has been reached this method will wait until a pending + /// script completes before spawning this one. + /// + /// Note: It is the caller's responsibility to create an informative span. + async fn run_setup_py( + &self, + venv: &PythonEnvironment, + script: &str, + source_tree: &Path, + ) -> Result { + let _permit = self.control.acquire().await.unwrap(); + + Command::new(venv.python_executable()) + .args(["setup.py", script]) + .current_dir(source_tree.simplified()) + .output() + .await + .map_err(|err| Error::CommandFailed(venv.python_executable().to_path_buf(), err)) + } } #[cfg(test)] diff --git a/crates/uv-configuration/src/concurrency.rs b/crates/uv-configuration/src/concurrency.rs new file mode 100644 index 000000000..64a86eb58 --- /dev/null +++ b/crates/uv-configuration/src/concurrency.rs @@ -0,0 +1,35 @@ +use std::num::NonZeroUsize; + +/// Concurrency limit settings. +#[derive(Copy, Clone, Debug)] +pub struct Concurrency { + /// The maximum number of concurrent downloads. + /// + /// Note this value must be non-zero. + pub downloads: usize, + /// The maximum number of concurrent builds. + /// + /// Note this value must be non-zero. + pub builds: usize, +} + +impl Default for Concurrency { + fn default() -> Self { + Concurrency { + downloads: Concurrency::DEFAULT_DOWNLOADS, + builds: Concurrency::default_builds(), + } + } +} + +impl Concurrency { + // The default concurrent downloads limit. + pub const DEFAULT_DOWNLOADS: usize = 50; + + // The default concurrent builds limit. + pub fn default_builds() -> usize { + std::thread::available_parallelism() + .map(NonZeroUsize::get) + .unwrap_or(1) + } +} diff --git a/crates/uv-configuration/src/lib.rs b/crates/uv-configuration/src/lib.rs index da3966701..38b32eee7 100644 --- a/crates/uv-configuration/src/lib.rs +++ b/crates/uv-configuration/src/lib.rs @@ -1,5 +1,6 @@ pub use authentication::*; pub use build_options::*; +pub use concurrency::*; pub use config_settings::*; pub use constraints::*; pub use name_specifiers::*; @@ -10,6 +11,7 @@ pub use target_triple::*; mod authentication; mod build_options; +mod concurrency; mod config_settings; mod constraints; mod name_specifiers; diff --git a/crates/uv-dev/Cargo.toml b/crates/uv-dev/Cargo.toml index a0da57f94..f69f07a59 100644 --- a/crates/uv-dev/Cargo.toml +++ b/crates/uv-dev/Cargo.toml @@ -26,6 +26,7 @@ uv-cache = { workspace = true, features = ["clap"] } uv-client = { workspace = true } uv-configuration = { workspace = true } uv-dispatch = { workspace = true } +uv-distribution = { workspace = true } uv-fs = { workspace = true } uv-installer = { workspace = true } uv-interpreter = { workspace = true } diff --git a/crates/uv-dev/src/build.rs b/crates/uv-dev/src/build.rs index 730a09b8b..79be5cc58 100644 --- a/crates/uv-dev/src/build.rs +++ b/crates/uv-dev/src/build.rs @@ -10,7 +10,9 @@ use rustc_hash::FxHashMap; use uv_build::{SourceBuild, SourceBuildContext}; use uv_cache::{Cache, CacheArgs}; use uv_client::RegistryClientBuilder; -use uv_configuration::{BuildKind, ConfigSettings, NoBinary, NoBuild, SetupPyStrategy}; +use uv_configuration::{ + BuildKind, Concurrency, ConfigSettings, NoBinary, NoBuild, SetupPyStrategy, +}; use uv_dispatch::BuildDispatch; use uv_interpreter::PythonEnvironment; use uv_resolver::{FlatIndex, InMemoryIndex}; @@ -61,6 +63,7 @@ pub(crate) async fn build(args: BuildArgs) -> Result { let setup_py = SetupPyStrategy::default(); let in_flight = InFlight::default(); let config_settings = ConfigSettings::default(); + let concurrency = Concurrency::default(); let build_dispatch = BuildDispatch::new( &client, @@ -76,6 +79,7 @@ pub(crate) async fn build(args: BuildArgs) -> Result { install_wheel_rs::linker::LinkMode::default(), &NoBuild::None, &NoBinary::None, + concurrency, ); let builder = SourceBuild::setup( @@ -90,6 +94,7 @@ pub(crate) async fn build(args: BuildArgs) -> Result { BuildIsolation::Isolated, build_kind, FxHashMap::default(), + concurrency.builds, ) .await?; Ok(wheel_dir.join(builder.build_wheel(&wheel_dir).await?)) diff --git a/crates/uv-dev/src/resolve_cli.rs b/crates/uv-dev/src/resolve_cli.rs index 713701f0a..8c0e3d351 100644 --- a/crates/uv-dev/src/resolve_cli.rs +++ b/crates/uv-dev/src/resolve_cli.rs @@ -11,8 +11,9 @@ use petgraph::dot::{Config as DotConfig, Dot}; use distribution_types::{FlatIndexLocation, IndexLocations, IndexUrl, Requirement, Resolution}; use uv_cache::{Cache, CacheArgs}; use uv_client::{FlatIndexClient, RegistryClientBuilder}; -use uv_configuration::{ConfigSettings, NoBinary, NoBuild, SetupPyStrategy}; +use uv_configuration::{Concurrency, ConfigSettings, NoBinary, NoBuild, SetupPyStrategy}; use uv_dispatch::BuildDispatch; +use uv_distribution::DistributionDatabase; use uv_installer::SitePackages; use uv_interpreter::PythonEnvironment; use uv_resolver::{ @@ -79,6 +80,7 @@ pub(crate) async fn resolve_cli(args: ResolveCliArgs) -> Result<()> { ) }; let config_settings = ConfigSettings::default(); + let concurrency = Concurrency::default(); let build_dispatch = BuildDispatch::new( &client, @@ -94,6 +96,7 @@ pub(crate) async fn resolve_cli(args: ResolveCliArgs) -> Result<()> { install_wheel_rs::linker::LinkMode::default(), &no_build, &NoBinary::None, + concurrency, ); let site_packages = SitePackages::from_executable(&venv)?; @@ -115,12 +118,12 @@ pub(crate) async fn resolve_cli(args: ResolveCliArgs) -> Result<()> { &python_requirement, Some(venv.interpreter().markers()), tags, - &client, &flat_index, &index, &HashStrategy::None, &build_dispatch, &site_packages, + DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads), )?; let resolution_graph = resolver.resolve().await.with_context(|| { format!( diff --git a/crates/uv-dev/src/resolve_many.rs b/crates/uv-dev/src/resolve_many.rs index 81a85b895..75cd47bf0 100644 --- a/crates/uv-dev/src/resolve_many.rs +++ b/crates/uv-dev/src/resolve_many.rs @@ -15,7 +15,7 @@ use pep440_rs::{Version, VersionSpecifier, VersionSpecifiers}; use pep508_rs::VersionOrUrl; use uv_cache::{Cache, CacheArgs}; use uv_client::{OwnedArchive, RegistryClient, RegistryClientBuilder}; -use uv_configuration::{ConfigSettings, NoBinary, NoBuild, SetupPyStrategy}; +use uv_configuration::{Concurrency, ConfigSettings, NoBinary, NoBuild, SetupPyStrategy}; use uv_dispatch::BuildDispatch; use uv_interpreter::PythonEnvironment; use uv_normalize::PackageName; @@ -80,6 +80,7 @@ pub(crate) async fn resolve_many(args: ResolveManyArgs) -> Result<()> { let venv = PythonEnvironment::from_virtualenv(&cache)?; let in_flight = InFlight::default(); + let concurrency = Concurrency::default(); let client = RegistryClientBuilder::new(cache.clone()).build(); let header_span = info_span!("resolve many"); @@ -118,6 +119,7 @@ pub(crate) async fn resolve_many(args: ResolveManyArgs) -> Result<()> { install_wheel_rs::linker::LinkMode::default(), &no_build, &NoBinary::None, + concurrency, ); let start = Instant::now(); diff --git a/crates/uv-dispatch/Cargo.toml b/crates/uv-dispatch/Cargo.toml index afbb53741..e23a783c7 100644 --- a/crates/uv-dispatch/Cargo.toml +++ b/crates/uv-dispatch/Cargo.toml @@ -23,6 +23,7 @@ uv-client = { workspace = true } uv-configuration = { workspace = true } uv-installer = { workspace = true } uv-interpreter = { workspace = true } +uv-distribution = { workspace = true } uv-resolver = { workspace = true } uv-types = { workspace = true } diff --git a/crates/uv-dispatch/src/lib.rs b/crates/uv-dispatch/src/lib.rs index 02a774452..49da99f6e 100644 --- a/crates/uv-dispatch/src/lib.rs +++ b/crates/uv-dispatch/src/lib.rs @@ -16,7 +16,9 @@ use distribution_types::{IndexLocations, Name, Requirement, Resolution, SourceDi use uv_build::{SourceBuild, SourceBuildContext}; use uv_cache::Cache; use uv_client::RegistryClient; +use uv_configuration::Concurrency; use uv_configuration::{BuildKind, ConfigSettings, NoBinary, NoBuild, Reinstall, SetupPyStrategy}; +use uv_distribution::DistributionDatabase; use uv_installer::{Downloader, Installer, Plan, Planner, SitePackages}; use uv_interpreter::{Interpreter, PythonEnvironment}; use uv_resolver::{FlatIndex, InMemoryIndex, Manifest, Options, PythonRequirement, Resolver}; @@ -41,6 +43,7 @@ pub struct BuildDispatch<'a> { source_build_context: SourceBuildContext, options: Options, build_extra_env_vars: FxHashMap, + concurrency: Concurrency, } impl<'a> BuildDispatch<'a> { @@ -59,6 +62,7 @@ impl<'a> BuildDispatch<'a> { link_mode: install_wheel_rs::linker::LinkMode, no_build: &'a NoBuild, no_binary: &'a NoBinary, + concurrency: Concurrency, ) -> Self { Self { client, @@ -74,6 +78,7 @@ impl<'a> BuildDispatch<'a> { link_mode, no_build, no_binary, + concurrency, source_build_context: SourceBuildContext::default(), options: Options::default(), build_extra_env_vars: FxHashMap::default(), @@ -144,12 +149,12 @@ impl<'a> BuildContext for BuildDispatch<'a> { &python_requirement, Some(markers), tags, - self.client, self.flat_index, self.index, &HashStrategy::None, self, &EmptyInstalledPackages, + DistributionDatabase::new(self.client, self, self.concurrency.downloads), )?; let graph = resolver.resolve().await.with_context(|| { format!( @@ -226,8 +231,13 @@ impl<'a> BuildContext for BuildDispatch<'a> { vec![] } else { // TODO(konstin): Check that there is no endless recursion. - let downloader = - Downloader::new(self.cache, tags, &HashStrategy::None, self.client, self); + let downloader = Downloader::new( + self.cache, + tags, + &HashStrategy::None, + DistributionDatabase::new(self.client, self, self.concurrency.downloads), + ); + debug!( "Downloading and building requirement{} for build: {}", if remote.len() == 1 { "" } else { "s" }, @@ -315,6 +325,7 @@ impl<'a> BuildContext for BuildDispatch<'a> { self.build_isolation, build_kind, self.build_extra_env_vars.clone(), + self.concurrency.builds, ) .boxed_local() .await?; diff --git a/crates/uv-distribution/src/distribution_database.rs b/crates/uv-distribution/src/distribution_database.rs index c97f9bdce..76e94bb4b 100644 --- a/crates/uv-distribution/src/distribution_database.rs +++ b/crates/uv-distribution/src/distribution_database.rs @@ -1,3 +1,4 @@ +use std::future::Future; use std::io; use std::path::Path; use std::rc::Rc; @@ -6,6 +7,7 @@ use std::sync::Arc; use futures::{FutureExt, TryStreamExt}; use tempfile::TempDir; use tokio::io::AsyncSeekExt; +use tokio::sync::Semaphore; use tokio_util::compat::FuturesAsyncReadCompatExt; use tracing::{info_span, instrument, warn, Instrument}; use url::Url; @@ -41,21 +43,25 @@ use crate::{ArchiveMetadata, Error, LocalWheel, Reporter, SourceDistributionBuil /// git) are supported. /// /// This struct also has the task of acquiring locks around source dist builds in general and git -/// operation especially. +/// operation especially, as well as respecting concurrency limits. pub struct DistributionDatabase<'a, Context: BuildContext> { - client: &'a RegistryClient, build_context: &'a Context, builder: SourceDistributionBuilder<'a, Context>, locks: Rc, + client: ManagedClient<'a>, } impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { - pub fn new(client: &'a RegistryClient, build_context: &'a Context) -> Self { + pub fn new( + client: &'a RegistryClient, + build_context: &'a Context, + concurrent_downloads: usize, + ) -> Self { Self { - client, build_context, - builder: SourceDistributionBuilder::new(client, build_context), + builder: SourceDistributionBuilder::new(build_context), locks: Rc::new(Locks::default()), + client: ManagedClient::new(client, concurrent_downloads), } } @@ -75,7 +81,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { io::Error::new( io::ErrorKind::TimedOut, format!( - "Failed to download distribution due to network timeout. Try increasing UV_HTTP_TIMEOUT (current value: {}s).", self.client.timeout() + "Failed to download distribution due to network timeout. Try increasing UV_HTTP_TIMEOUT (current value: {}s).", self.client.unmanaged.timeout() ), ) } else { @@ -307,7 +313,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { let built_wheel = self .builder - .download_and_build(&BuildableSource::Dist(dist), tags, hashes) + .download_and_build(&BuildableSource::Dist(dist), tags, hashes, &self.client) .boxed_local() .await?; @@ -361,7 +367,12 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { return Ok(ArchiveMetadata { metadata, hashes }); } - match self.client.wheel_metadata(dist).boxed_local().await { + let result = self + .client + .managed(|client| client.wheel_metadata(dist).boxed_local()) + .await; + + match result { Ok(metadata) => Ok(ArchiveMetadata::from(metadata)), Err(err) if err.is_http_streaming_unsupported() => { warn!("Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})"); @@ -404,7 +415,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { let metadata = self .builder - .download_and_build_metadata(source, hashes) + .download_and_build_metadata(source, hashes, &self.client) .boxed_local() .await?; Ok(metadata) @@ -462,7 +473,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { // Fetch the archive from the cache, or download it if necessary. let req = self.request(url.clone())?; - let cache_control = match self.client.connectivity() { + let cache_control = match self.client.unmanaged.connectivity() { Connectivity::Online => CacheControl::from( self.build_context .cache() @@ -471,10 +482,14 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { ), Connectivity::Offline => CacheControl::AllowStale, }; + let archive = self .client - .cached_client() - .get_serde(req, &http_entry, cache_control, download) + .managed(|client| { + client + .cached_client() + .get_serde(req, &http_entry, cache_control, download) + }) .await .map_err(|err| match err { CachedClientError::Callback(err) => err, @@ -486,13 +501,17 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { archive } else { self.client - .cached_client() - .skip_cache(self.request(url)?, &http_entry, download) - .await - .map_err(|err| match err { - CachedClientError::Callback(err) => err, - CachedClientError::Client(err) => Error::Client(err), - })? + .managed(|client| async { + client + .cached_client() + .skip_cache(self.request(url)?, &http_entry, download) + .await + .map_err(|err| match err { + CachedClientError::Callback(err) => err, + CachedClientError::Client(err) => Error::Client(err), + }) + }) + .await? }; Ok(archive) @@ -574,7 +593,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { }; let req = self.request(url.clone())?; - let cache_control = match self.client.connectivity() { + let cache_control = match self.client.unmanaged.connectivity() { Connectivity::Online => CacheControl::from( self.build_context .cache() @@ -583,10 +602,14 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { ), Connectivity::Offline => CacheControl::AllowStale, }; + let archive = self .client - .cached_client() - .get_serde(req, &http_entry, cache_control, download) + .managed(|client| { + client + .cached_client() + .get_serde(req, &http_entry, cache_control, download) + }) .await .map_err(|err| match err { CachedClientError::Callback(err) => err, @@ -598,13 +621,17 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { archive } else { self.client - .cached_client() - .skip_cache(self.request(url)?, &http_entry, download) - .await - .map_err(|err| match err { - CachedClientError::Callback(err) => err, - CachedClientError::Client(err) => Error::Client(err), - })? + .managed(|client| async move { + client + .cached_client() + .skip_cache(self.request(url)?, &http_entry, download) + .await + .map_err(|err| match err { + CachedClientError::Callback(err) => err, + CachedClientError::Client(err) => Error::Client(err), + }) + }) + .await? }; Ok(archive) @@ -733,6 +760,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { /// Returns a GET [`reqwest::Request`] for the given URL. fn request(&self, url: Url) -> Result { self.client + .unmanaged .uncached_client() .get(url) .header( @@ -749,6 +777,39 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { pub fn index_locations(&self) -> &IndexLocations { self.build_context.index_locations() } + + /// Return the [`ManagedClient`] used by this resolver. + pub fn client(&self) -> &ManagedClient<'a> { + &self.client + } +} + +/// A wrapper around `RegistryClient` that manages a concurrency limit. +pub struct ManagedClient<'a> { + pub unmanaged: &'a RegistryClient, + control: Semaphore, +} + +impl<'a> ManagedClient<'a> { + /// Create a new `ManagedClient` using the given client and concurrency limit. + fn new(client: &'a RegistryClient, concurrency: usize) -> ManagedClient<'a> { + ManagedClient { + unmanaged: client, + control: Semaphore::new(concurrency), + } + } + + /// Perform a request using the client, respecting the concurrency limit. + /// + /// If the concurrency limit has been reached, this method will wait until a pending + /// operation completes before executing the closure. + pub async fn managed(&self, f: impl FnOnce(&'a RegistryClient) -> F) -> T + where + F: Future, + { + let _permit = self.control.acquire().await.unwrap(); + f(self.unmanaged).await + } } /// A pointer to an archive in the cache, fetched from an HTTP archive. diff --git a/crates/uv-distribution/src/source/mod.rs b/crates/uv-distribution/src/source/mod.rs index 92bc2cfc7..48eabd9b8 100644 --- a/crates/uv-distribution/src/source/mod.rs +++ b/crates/uv-distribution/src/source/mod.rs @@ -34,6 +34,7 @@ use uv_extract::hash::Hasher; use uv_fs::write_atomic; use uv_types::{BuildContext, SourceBuildTrait}; +use crate::distribution_database::ManagedClient; use crate::error::Error; use crate::git::{fetch_git_archive, resolve_precise}; use crate::source::built_wheel_metadata::BuiltWheelMetadata; @@ -45,7 +46,6 @@ mod revision; /// Fetch and build a source distribution from a remote source, or from a local cache. pub struct SourceDistributionBuilder<'a, T: BuildContext> { - client: &'a RegistryClient, build_context: &'a T, reporter: Option>, } @@ -61,9 +61,8 @@ pub(crate) const METADATA: &str = "metadata.msgpack"; impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { /// Initialize a [`SourceDistributionBuilder`] from a [`BuildContext`]. - pub fn new(client: &'a RegistryClient, build_context: &'a T) -> Self { + pub fn new(build_context: &'a T) -> Self { Self { - client, build_context, reporter: None, } @@ -84,6 +83,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { source: &BuildableSource<'_>, tags: &Tags, hashes: HashPolicy<'_>, + client: &ManagedClient<'_>, ) -> Result { let built_wheel_metadata = match &source { BuildableSource::Dist(SourceDist::Registry(dist)) => { @@ -129,6 +129,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { None, tags, hashes, + client, ) .boxed_local() .await? @@ -152,6 +153,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { subdirectory.as_deref(), tags, hashes, + client, ) .boxed_local() .await? @@ -204,6 +206,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { subdirectory.as_deref(), tags, hashes, + client, ) .boxed_local() .await? @@ -240,6 +243,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { &self, source: &BuildableSource<'_>, hashes: HashPolicy<'_>, + client: &ManagedClient<'_>, ) -> Result { let metadata = match &source { BuildableSource::Dist(SourceDist::Registry(dist)) => { @@ -282,6 +286,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { &cache_shard, None, hashes, + client, ) .boxed_local() .await? @@ -304,6 +309,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { &cache_shard, subdirectory.as_deref(), hashes, + client, ) .boxed_local() .await? @@ -349,6 +355,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { &cache_shard, subdirectory.as_deref(), hashes, + client, ) .boxed_local() .await? @@ -389,10 +396,11 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { subdirectory: Option<&'data Path>, tags: &Tags, hashes: HashPolicy<'_>, + client: &ManagedClient<'_>, ) -> Result { // Fetch the revision for the source distribution. let revision = self - .url_revision(source, filename, url, cache_shard, hashes) + .url_revision(source, filename, url, cache_shard, hashes, client) .await?; // Before running the build, check that the hashes match. @@ -457,10 +465,11 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { cache_shard: &CacheShard, subdirectory: Option<&'data Path>, hashes: HashPolicy<'_>, + client: &ManagedClient<'_>, ) -> Result { // Fetch the revision for the source distribution. let revision = self - .url_revision(source, filename, url, cache_shard, hashes) + .url_revision(source, filename, url, cache_shard, hashes, client) .await?; // Before running the build, check that the hashes match. @@ -546,9 +555,10 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { url: &Url, cache_shard: &CacheShard, hashes: HashPolicy<'_>, + client: &ManagedClient<'_>, ) -> Result { let cache_entry = cache_shard.entry(HTTP_REVISION); - let cache_control = match self.client.connectivity() { + let cache_control = match client.unmanaged.connectivity() { Connectivity::Online => CacheControl::from( self.build_context .cache() @@ -576,11 +586,13 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { .boxed_local() .instrument(info_span!("download", source_dist = %source)) }; - let req = self.request(url.clone())?; - let revision = self - .client - .cached_client() - .get_serde(req, &cache_entry, cache_control, download) + let req = Self::request(url.clone(), client.unmanaged)?; + let revision = client + .managed(|client| { + client + .cached_client() + .get_serde(req, &cache_entry, cache_control, download) + }) .await .map_err(|err| match err { CachedClientError::Callback(err) => err, @@ -591,14 +603,18 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { if revision.has_digests(hashes) { Ok(revision) } else { - self.client - .cached_client() - .skip_cache(self.request(url.clone())?, &cache_entry, download) - .await - .map_err(|err| match err { - CachedClientError::Callback(err) => err, - CachedClientError::Client(err) => Error::Client(err), + client + .managed(|client| async move { + client + .cached_client() + .skip_cache(Self::request(url.clone(), client)?, &cache_entry, download) + .await + .map_err(|err| match err { + CachedClientError::Callback(err) => err, + CachedClientError::Client(err) => Error::Client(err), + }) }) + .await } } @@ -1430,8 +1446,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { } /// Returns a GET [`reqwest::Request`] for the given URL. - fn request(&self, url: Url) -> Result { - self.client + fn request(url: Url, client: &RegistryClient) -> Result { + client .uncached_client() .get(url) .header( diff --git a/crates/uv-installer/src/downloader.rs b/crates/uv-installer/src/downloader.rs index 746355638..dbcf6f6ea 100644 --- a/crates/uv-installer/src/downloader.rs +++ b/crates/uv-installer/src/downloader.rs @@ -2,7 +2,7 @@ use std::cmp::Reverse; use std::path::Path; use std::sync::Arc; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{stream::FuturesUnordered, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use tokio::task::JoinError; use tracing::instrument; use url::Url; @@ -13,7 +13,6 @@ use distribution_types::{ }; use platform_tags::Tags; use uv_cache::Cache; -use uv_client::RegistryClient; use uv_distribution::{DistributionDatabase, LocalWheel}; use uv_types::{BuildContext, HashStrategy, InFlight}; @@ -50,14 +49,13 @@ impl<'a, Context: BuildContext> Downloader<'a, Context> { cache: &'a Cache, tags: &'a Tags, hashes: &'a HashStrategy, - client: &'a RegistryClient, - build_context: &'a Context, + database: DistributionDatabase<'a, Context>, ) -> Self { Self { tags, cache, hashes, - database: DistributionDatabase::new(client, build_context), + database, reporter: None, } } @@ -81,7 +79,8 @@ impl<'a, Context: BuildContext> Downloader<'a, Context> { distributions: Vec, in_flight: &'stream InFlight, ) -> impl Stream> + 'stream { - futures::stream::iter(distributions) + distributions + .into_iter() .map(|dist| async { let wheel = self.get_wheel(dist, in_flight).boxed_local().await?; if let Some(reporter) = self.reporter.as_ref() { @@ -89,9 +88,7 @@ impl<'a, Context: BuildContext> Downloader<'a, Context> { } Ok::(wheel) }) - // TODO(charlie): The number of concurrent fetches, such that we limit the number of - // concurrent builds to the number of cores, while allowing more concurrent downloads. - .buffer_unordered(50) + .collect::>() } /// Download, build, and unzip a set of downloaded wheels. @@ -126,7 +123,8 @@ impl<'a, Context: BuildContext> Downloader<'a, Context> { ) -> Result, Error> { // Build editables in parallel let mut results = Vec::with_capacity(editables.len()); - let mut fetches = futures::stream::iter(editables) + let mut fetches = editables + .into_iter() .map(|editable| async move { let task_id = self .reporter @@ -145,7 +143,7 @@ impl<'a, Context: BuildContext> Downloader<'a, Context> { } Ok::<_, Error>((editable, cached_dist, metadata)) }) - .buffer_unordered(50); + .collect::>(); while let Some((editable, wheel, metadata)) = fetches.next().await.transpose()? { if let Some(reporter) = self.reporter.as_ref() { diff --git a/crates/uv-requirements/src/lookahead.rs b/crates/uv-requirements/src/lookahead.rs index 3cbc3ea18..f9f00cb93 100644 --- a/crates/uv-requirements/src/lookahead.rs +++ b/crates/uv-requirements/src/lookahead.rs @@ -11,7 +11,6 @@ use distribution_types::{ }; use pep508_rs::MarkerEnvironment; use pypi_types::Metadata23; -use uv_client::RegistryClient; use uv_configuration::{Constraints, Overrides}; use uv_distribution::{DistributionDatabase, Reporter}; use uv_resolver::{InMemoryIndex, MetadataResponse}; @@ -71,9 +70,8 @@ impl<'a, Context: BuildContext> LookaheadResolver<'a, Context> { overrides: &'a Overrides, editables: &'a [(LocalEditable, Metadata23, Requirements)], hasher: &'a HashStrategy, - context: &'a Context, - client: &'a RegistryClient, index: &'a InMemoryIndex, + database: DistributionDatabase<'a, Context>, ) -> Self { Self { requirements, @@ -82,7 +80,7 @@ impl<'a, Context: BuildContext> LookaheadResolver<'a, Context> { editables, hasher, index, - database: DistributionDatabase::new(client, context), + database, } } diff --git a/crates/uv-requirements/src/source_tree.rs b/crates/uv-requirements/src/source_tree.rs index 054cd1c75..706bac3cf 100644 --- a/crates/uv-requirements/src/source_tree.rs +++ b/crates/uv-requirements/src/source_tree.rs @@ -2,7 +2,8 @@ use std::borrow::Cow; use std::path::{Path, PathBuf}; use anyhow::{Context, Result}; -use futures::{StreamExt, TryStreamExt}; +use futures::stream::FuturesOrdered; +use futures::TryStreamExt; use url::Url; use distribution_types::{ @@ -10,7 +11,6 @@ use distribution_types::{ }; use pep508_rs::RequirementOrigin; -use uv_client::RegistryClient; use uv_distribution::{DistributionDatabase, Reporter}; use uv_fs::Simplified; use uv_resolver::{InMemoryIndex, MetadataResponse}; @@ -41,16 +41,15 @@ impl<'a, Context: BuildContext> SourceTreeResolver<'a, Context> { source_trees: Vec, extras: &'a ExtrasSpecification, hasher: &'a HashStrategy, - context: &'a Context, - client: &'a RegistryClient, index: &'a InMemoryIndex, + database: DistributionDatabase<'a, Context>, ) -> Self { Self { source_trees, extras, hasher, index, - database: DistributionDatabase::new(client, context), + database, } } @@ -65,9 +64,11 @@ impl<'a, Context: BuildContext> SourceTreeResolver<'a, Context> { /// Resolve the requirements from the provided source trees. pub async fn resolve(self) -> Result> { - let requirements: Vec<_> = futures::stream::iter(self.source_trees.iter()) + let requirements: Vec<_> = self + .source_trees + .iter() .map(|source_tree| async { self.resolve_source_tree(source_tree).await }) - .buffered(50) + .collect::>() .try_collect() .await?; Ok(requirements diff --git a/crates/uv-requirements/src/unnamed.rs b/crates/uv-requirements/src/unnamed.rs index 727fd83c7..7aa264d86 100644 --- a/crates/uv-requirements/src/unnamed.rs +++ b/crates/uv-requirements/src/unnamed.rs @@ -4,7 +4,7 @@ use std::str::FromStr; use anyhow::Result; use configparser::ini::Ini; -use futures::{StreamExt, TryStreamExt}; +use futures::{stream::FuturesOrdered, TryStreamExt}; use serde::Deserialize; use tracing::debug; @@ -15,7 +15,6 @@ use distribution_types::{ }; use pep508_rs::{Scheme, UnnamedRequirement, VersionOrUrl}; use pypi_types::Metadata10; -use uv_client::RegistryClient; use uv_distribution::{DistributionDatabase, Reporter}; use uv_normalize::PackageName; use uv_resolver::{InMemoryIndex, MetadataResponse}; @@ -38,15 +37,14 @@ impl<'a, Context: BuildContext> NamedRequirementsResolver<'a, Context> { pub fn new( requirements: Vec, hasher: &'a HashStrategy, - context: &'a Context, - client: &'a RegistryClient, index: &'a InMemoryIndex, + database: DistributionDatabase<'a, Context>, ) -> Self { Self { requirements, hasher, index, - database: DistributionDatabase::new(client, context), + database, } } @@ -67,7 +65,8 @@ impl<'a, Context: BuildContext> NamedRequirementsResolver<'a, Context> { index, database, } = self; - futures::stream::iter(requirements) + requirements + .into_iter() .map(|entry| async { match entry.requirement { UnresolvedRequirement::Named(requirement) => Ok(requirement), @@ -76,7 +75,7 @@ impl<'a, Context: BuildContext> NamedRequirementsResolver<'a, Context> { )?), } }) - .buffered(50) + .collect::>() .try_collect() .await } diff --git a/crates/uv-resolver/src/resolver/mod.rs b/crates/uv-resolver/src/resolver/mod.rs index cf0b37b78..a848128ab 100644 --- a/crates/uv-resolver/src/resolver/mod.rs +++ b/crates/uv-resolver/src/resolver/mod.rs @@ -29,7 +29,6 @@ use pep508_rs::MarkerEnvironment; use platform_tags::Tags; use pypi_types::Metadata23; pub(crate) use urls::Urls; -use uv_client::RegistryClient; use uv_configuration::{Constraints, Overrides}; use uv_distribution::{ArchiveMetadata, DistributionDatabase}; use uv_normalize::PackageName; @@ -232,16 +231,15 @@ impl<'a, Context: BuildContext, InstalledPackages: InstalledPackagesProvider> python_requirement: &'a PythonRequirement, markers: Option<&'a MarkerEnvironment>, tags: &'a Tags, - client: &'a RegistryClient, flat_index: &'a FlatIndex, index: &'a InMemoryIndex, hasher: &'a HashStrategy, build_context: &'a Context, installed_packages: &'a InstalledPackages, + database: DistributionDatabase<'a, Context>, ) -> Result { let provider = DefaultResolverProvider::new( - client, - DistributionDatabase::new(client, build_context), + database, flat_index, tags, python_requirement.clone(), @@ -251,6 +249,7 @@ impl<'a, Context: BuildContext, InstalledPackages: InstalledPackagesProvider> build_context.no_binary(), build_context.no_build(), ); + Self::new_custom_io( manifest, options, @@ -1141,7 +1140,10 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide ) -> Result<(), ResolveError> { let mut response_stream = ReceiverStream::new(request_stream) .map(|request| self.process_request(request).boxed_local()) - .buffer_unordered(50); + // Allow as many futures as possible to start in the background. + // Backpressure is provided by at a more granular level by `DistributionDatabase` + // and `SourceDispatch`, as well as the bounded request channel. + .buffer_unordered(usize::MAX); while let Some(response) = response_stream.next().await { match response? { diff --git a/crates/uv-resolver/src/resolver/provider.rs b/crates/uv-resolver/src/resolver/provider.rs index 60eb80caf..c9c4a3e98 100644 --- a/crates/uv-resolver/src/resolver/provider.rs +++ b/crates/uv-resolver/src/resolver/provider.rs @@ -4,7 +4,6 @@ use anyhow::Result; use distribution_types::{Dist, IndexLocations}; use platform_tags::Tags; -use uv_client::RegistryClient; use uv_configuration::{NoBinary, NoBuild}; use uv_distribution::{ArchiveMetadata, DistributionDatabase}; use uv_normalize::PackageName; @@ -75,8 +74,6 @@ pub trait ResolverProvider { pub struct DefaultResolverProvider<'a, Context: BuildContext> { /// The [`DistributionDatabase`] used to build source distributions. fetcher: DistributionDatabase<'a, Context>, - /// The [`RegistryClient`] used to query the index. - client: RegistryClient, /// These are the entries from `--find-links` that act as overrides for index responses. flat_index: FlatIndex, tags: Tags, @@ -92,7 +89,6 @@ impl<'a, Context: BuildContext> DefaultResolverProvider<'a, Context> { /// Reads the flat index entries and builds the provider. #[allow(clippy::too_many_arguments)] pub fn new( - client: &'a RegistryClient, fetcher: DistributionDatabase<'a, Context>, flat_index: &'a FlatIndex, tags: &'a Tags, @@ -105,7 +101,6 @@ impl<'a, Context: BuildContext> DefaultResolverProvider<'a, Context> { ) -> Self { Self { fetcher, - client: client.clone(), flat_index: flat_index.clone(), tags: tags.clone(), python_requirement, @@ -124,7 +119,13 @@ impl<'a, Context: BuildContext> ResolverProvider for DefaultResolverProvider<'a, &'io self, package_name: &'io PackageName, ) -> PackageVersionsResult { - match self.client.simple(package_name).await { + let result = self + .fetcher + .client() + .managed(|client| client.simple(package_name)) + .await; + + match result { Ok(results) => Ok(VersionsResponse::Found( results .into_iter() diff --git a/crates/uv-resolver/tests/resolver.rs b/crates/uv-resolver/tests/resolver.rs index 1a3580301..1a9f00361 100644 --- a/crates/uv-resolver/tests/resolver.rs +++ b/crates/uv-resolver/tests/resolver.rs @@ -15,7 +15,10 @@ use pep508_rs::{MarkerEnvironment, MarkerEnvironmentBuilder}; use platform_tags::{Arch, Os, Platform, Tags}; use uv_cache::Cache; use uv_client::RegistryClientBuilder; -use uv_configuration::{BuildKind, Constraints, NoBinary, NoBuild, Overrides, SetupPyStrategy}; +use uv_configuration::{ + BuildKind, Concurrency, Constraints, NoBinary, NoBuild, Overrides, SetupPyStrategy, +}; +use uv_distribution::DistributionDatabase; use uv_interpreter::{find_default_python, Interpreter, PythonEnvironment}; use uv_resolver::{ DisplayResolutionGraph, ExcludeNewer, Exclusions, FlatIndex, InMemoryIndex, Manifest, Options, @@ -131,18 +134,19 @@ async fn resolve( let build_context = DummyContext::new(Cache::temp()?, interpreter.clone()); let hashes = HashStrategy::None; let installed_packages = EmptyInstalledPackages; + let concurrency = Concurrency::default(); let resolver = Resolver::new( manifest, options, &python_requirement, Some(markers), tags, - &client, &flat_index, &index, &hashes, &build_context, &installed_packages, + DistributionDatabase::new(&client, &build_context, concurrency.downloads), )?; Ok(resolver.resolve().await?) } diff --git a/crates/uv-workspace/src/combine.rs b/crates/uv-workspace/src/combine.rs index ddc34146f..5f958dc44 100644 --- a/crates/uv-workspace/src/combine.rs +++ b/crates/uv-workspace/src/combine.rs @@ -91,6 +91,8 @@ impl Combine for PipOptions { link_mode: self.link_mode.or(other.link_mode), compile_bytecode: self.compile_bytecode.or(other.compile_bytecode), require_hashes: self.require_hashes.or(other.require_hashes), + concurrent_downloads: self.concurrent_downloads.or(other.concurrent_downloads), + concurrent_builds: self.concurrent_builds.or(other.concurrent_builds), } } } diff --git a/crates/uv-workspace/src/settings.rs b/crates/uv-workspace/src/settings.rs index 359854b7c..133eca118 100644 --- a/crates/uv-workspace/src/settings.rs +++ b/crates/uv-workspace/src/settings.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::{num::NonZeroUsize, path::PathBuf}; use serde::Deserialize; @@ -85,4 +85,6 @@ pub struct PipOptions { pub link_mode: Option, pub compile_bytecode: Option, pub require_hashes: Option, + pub concurrent_downloads: Option, + pub concurrent_builds: Option, } diff --git a/crates/uv/src/commands/pip_compile.rs b/crates/uv/src/commands/pip_compile.rs index 0f8870fae..208054302 100644 --- a/crates/uv/src/commands/pip_compile.rs +++ b/crates/uv/src/commands/pip_compile.rs @@ -28,11 +28,12 @@ use uv_auth::store_credentials_from_url; use uv_cache::Cache; use uv_client::{BaseClientBuilder, Connectivity, FlatIndexClient, RegistryClientBuilder}; use uv_configuration::{ - ConfigSettings, Constraints, IndexStrategy, NoBinary, NoBuild, Overrides, PreviewMode, - SetupPyStrategy, Upgrade, + Concurrency, ConfigSettings, Constraints, IndexStrategy, NoBinary, NoBuild, Overrides, + PreviewMode, SetupPyStrategy, Upgrade, }; use uv_configuration::{KeyringProviderType, TargetTriple}; use uv_dispatch::BuildDispatch; +use uv_distribution::DistributionDatabase; use uv_fs::Simplified; use uv_installer::Downloader; use uv_interpreter::PythonVersion; @@ -91,6 +92,7 @@ pub(crate) async fn pip_compile( link_mode: LinkMode, python: Option, system: bool, + concurrency: Concurrency, uv_lock: bool, native_tls: bool, quiet: bool, @@ -315,6 +317,7 @@ pub(crate) async fn pip_compile( link_mode, &no_build, &NoBinary::None, + concurrency, ) .with_options(OptionsBuilder::new().exclude_newer(exclude_newer).build()); @@ -324,9 +327,8 @@ pub(crate) async fn pip_compile( let mut requirements = NamedRequirementsResolver::new( requirements, &hasher, - &build_dispatch, - &client, &top_level_index, + DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads), ) .with_reporter(ResolverReporter::from(printer)) .resolve() @@ -339,9 +341,8 @@ pub(crate) async fn pip_compile( source_trees, &extras, &hasher, - &build_dispatch, - &client, &top_level_index, + DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads), ) .with_reporter(ResolverReporter::from(printer)) .resolve() @@ -356,9 +357,8 @@ pub(crate) async fn pip_compile( let overrides = NamedRequirementsResolver::new( overrides, &hasher, - &build_dispatch, - &client, &top_level_index, + DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads), ) .with_reporter(ResolverReporter::from(printer)) .resolve() @@ -432,8 +432,13 @@ pub(crate) async fn pip_compile( LocalEditable { url, path, extras } })); - let downloader = Downloader::new(&cache, &tags, &hasher, &client, &build_dispatch) - .with_reporter(DownloadReporter::from(printer).with_length(editables.len() as u64)); + let downloader = Downloader::new( + &cache, + &tags, + &hasher, + DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads), + ) + .with_reporter(DownloadReporter::from(printer).with_length(editables.len() as u64)); // Build all editables. let editable_wheel_dir = tempdir_in(cache.root())?; @@ -498,9 +503,8 @@ pub(crate) async fn pip_compile( &overrides, &editables, &hasher, - &build_dispatch, - &client, &top_level_index, + DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads), ) .with_reporter(ResolverReporter::from(printer)) .resolve(marker_filter) @@ -537,12 +541,12 @@ pub(crate) async fn pip_compile( &python_requirement, marker_filter, &tags, - &client, &flat_index, &top_level_index, &hasher, &build_dispatch, &EmptyInstalledPackages, + DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads), )? .with_reporter(ResolverReporter::from(printer)); diff --git a/crates/uv/src/commands/pip_install.rs b/crates/uv/src/commands/pip_install.rs index 1cc81fc1f..5e1b7a2bf 100644 --- a/crates/uv/src/commands/pip_install.rs +++ b/crates/uv/src/commands/pip_install.rs @@ -28,11 +28,12 @@ use uv_client::{ BaseClientBuilder, Connectivity, FlatIndexClient, RegistryClient, RegistryClientBuilder, }; use uv_configuration::{ - ConfigSettings, Constraints, IndexStrategy, NoBinary, NoBuild, Overrides, PreviewMode, - Reinstall, SetupPyStrategy, Upgrade, + Concurrency, ConfigSettings, Constraints, IndexStrategy, NoBinary, NoBuild, Overrides, + PreviewMode, Reinstall, SetupPyStrategy, Upgrade, }; use uv_configuration::{KeyringProviderType, TargetTriple}; use uv_dispatch::BuildDispatch; +use uv_distribution::DistributionDatabase; use uv_fs::Simplified; use uv_installer::{ BuiltEditable, Downloader, Plan, Planner, ResolvedEditable, SatisfiesResult, SitePackages, @@ -89,6 +90,7 @@ pub(crate) async fn pip_install( system: bool, break_system_packages: bool, target: Option, + concurrency: Concurrency, uv_lock: Option, native_tls: bool, preview: PreviewMode, @@ -334,6 +336,7 @@ pub(crate) async fn pip_install( link_mode, &no_build, &no_binary, + concurrency, ) .with_options(OptionsBuilder::new().exclude_newer(exclude_newer).build()); @@ -352,6 +355,7 @@ pub(crate) async fn pip_install( &cache, &interpreter, &tags, + concurrency, &client, &resolve_dispatch, printer, @@ -372,9 +376,8 @@ pub(crate) async fn pip_install( let mut requirements = NamedRequirementsResolver::new( requirements, &hasher, - &resolve_dispatch, - &client, &index, + DistributionDatabase::new(&client, &resolve_dispatch, concurrency.downloads), ) .with_reporter(ResolverReporter::from(printer)) .resolve() @@ -387,9 +390,12 @@ pub(crate) async fn pip_install( source_trees, extras, &hasher, - &resolve_dispatch, - &client, &index, + DistributionDatabase::new( + &client, + &resolve_dispatch, + concurrency.downloads, + ), ) .with_reporter(ResolverReporter::from(printer)) .resolve() @@ -401,11 +407,15 @@ pub(crate) async fn pip_install( }; // Resolve the overrides from the provided sources. - let overrides = - NamedRequirementsResolver::new(overrides, &hasher, &resolve_dispatch, &client, &index) - .with_reporter(ResolverReporter::from(printer)) - .resolve() - .await?; + let overrides = NamedRequirementsResolver::new( + overrides, + &hasher, + &index, + DistributionDatabase::new(&client, &resolve_dispatch, concurrency.downloads), + ) + .with_reporter(ResolverReporter::from(printer)) + .resolve() + .await?; let options = OptionsBuilder::new() .resolution_mode(resolution_mode) @@ -432,6 +442,7 @@ pub(crate) async fn pip_install( &flat_index, &index, &resolve_dispatch, + concurrency, options, printer, ) @@ -470,6 +481,7 @@ pub(crate) async fn pip_install( link_mode, &no_build, &no_binary, + concurrency, ) .with_options(OptionsBuilder::new().exclude_newer(exclude_newer).build()) }; @@ -488,6 +500,7 @@ pub(crate) async fn pip_install( &tags, &client, &in_flight, + concurrency, &install_dispatch, &cache, &venv, @@ -566,14 +579,20 @@ async fn build_editables( cache: &Cache, interpreter: &Interpreter, tags: &Tags, + concurrency: Concurrency, client: &RegistryClient, build_dispatch: &BuildDispatch<'_>, printer: Printer, ) -> Result, Error> { let start = std::time::Instant::now(); - let downloader = Downloader::new(cache, tags, hasher, client, build_dispatch) - .with_reporter(DownloadReporter::from(printer).with_length(editables.len() as u64)); + let downloader = Downloader::new( + cache, + tags, + hasher, + DistributionDatabase::new(client, build_dispatch, concurrency.downloads), + ) + .with_reporter(DownloadReporter::from(printer).with_length(editables.len() as u64)); let editables = LocalEditables::from_editables(editables.iter().map(|editable| { let EditableRequirement { @@ -645,6 +664,7 @@ async fn resolve( flat_index: &FlatIndex, index: &InMemoryIndex, build_dispatch: &BuildDispatch<'_>, + concurrency: Concurrency, options: Options, printer: Printer, ) -> Result { @@ -723,9 +743,8 @@ async fn resolve( &overrides, &editables, hasher, - build_dispatch, - client, index, + DistributionDatabase::new(client, build_dispatch, concurrency.downloads), ) .with_reporter(ResolverReporter::from(printer)) .resolve(Some(markers)) @@ -753,12 +772,12 @@ async fn resolve( &python_requirement, Some(markers), tags, - client, flat_index, index, hasher, build_dispatch, site_packages, + DistributionDatabase::new(client, build_dispatch, concurrency.downloads), )? .with_reporter(ResolverReporter::from(printer)); let resolution = resolver.resolve().await?; @@ -804,6 +823,7 @@ async fn install( tags: &Tags, client: &RegistryClient, in_flight: &InFlight, + concurrency: Concurrency, build_dispatch: &BuildDispatch<'_>, cache: &Cache, venv: &PythonEnvironment, @@ -881,8 +901,13 @@ async fn install( } else { let start = std::time::Instant::now(); - let downloader = Downloader::new(cache, tags, hasher, client, build_dispatch) - .with_reporter(DownloadReporter::from(printer).with_length(remote.len() as u64)); + let downloader = Downloader::new( + cache, + tags, + hasher, + DistributionDatabase::new(client, build_dispatch, concurrency.downloads), + ) + .with_reporter(DownloadReporter::from(printer).with_length(remote.len() as u64)); let wheels = downloader .download(remote.clone(), in_flight) diff --git a/crates/uv/src/commands/pip_sync.rs b/crates/uv/src/commands/pip_sync.rs index 068b06b61..008194f50 100644 --- a/crates/uv/src/commands/pip_sync.rs +++ b/crates/uv/src/commands/pip_sync.rs @@ -20,10 +20,12 @@ use uv_client::{ BaseClientBuilder, Connectivity, FlatIndexClient, RegistryClient, RegistryClientBuilder, }; use uv_configuration::{ - ConfigSettings, IndexStrategy, NoBinary, NoBuild, PreviewMode, Reinstall, SetupPyStrategy, + Concurrency, ConfigSettings, IndexStrategy, NoBinary, NoBuild, PreviewMode, Reinstall, + SetupPyStrategy, }; use uv_configuration::{KeyringProviderType, TargetTriple}; use uv_dispatch::BuildDispatch; +use uv_distribution::DistributionDatabase; use uv_fs::Simplified; use uv_installer::{is_dynamic, Downloader, Plan, Planner, ResolvedEditable, SitePackages}; use uv_interpreter::{Interpreter, PythonEnvironment, PythonVersion, Target}; @@ -65,6 +67,7 @@ pub(crate) async fn pip_sync( system: bool, break_system_packages: bool, target: Option, + concurrency: Concurrency, native_tls: bool, preview: PreviewMode, cache: Cache, @@ -261,16 +264,21 @@ pub(crate) async fn pip_sync( link_mode, &no_build, &no_binary, + concurrency, ); // Convert from unnamed to named requirements. let requirements = { // Convert from unnamed to named requirements. - let mut requirements = - NamedRequirementsResolver::new(requirements, &hasher, &build_dispatch, &client, &index) - .with_reporter(ResolverReporter::from(printer)) - .resolve() - .await?; + let mut requirements = NamedRequirementsResolver::new( + requirements, + &hasher, + &index, + DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads), + ) + .with_reporter(ResolverReporter::from(printer)) + .resolve() + .await?; // Resolve any source trees into requirements. if !source_trees.is_empty() { @@ -279,9 +287,8 @@ pub(crate) async fn pip_sync( source_trees, &ExtrasSpecification::None, &hasher, - &build_dispatch, - &client, &index, + DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads), ) .with_reporter(ResolverReporter::from(printer)) .resolve() @@ -303,6 +310,7 @@ pub(crate) async fn pip_sync( &cache, &client, &build_dispatch, + concurrency, printer, ) .await?; @@ -373,13 +381,13 @@ pub(crate) async fn pip_sync( &python_requirement, Some(markers), tags, - &client, &flat_index, &index, &hasher, &build_dispatch, // TODO(zanieb): We should consider support for installed packages in pip sync &EmptyInstalledPackages, + DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads), )? .with_reporter(reporter); @@ -420,8 +428,13 @@ pub(crate) async fn pip_sync( } else { let start = std::time::Instant::now(); - let downloader = Downloader::new(&cache, &tags, &hasher, &client, &build_dispatch) - .with_reporter(DownloadReporter::from(printer).with_length(remote.len() as u64)); + let downloader = Downloader::new( + &cache, + &tags, + &hasher, + DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads), + ) + .with_reporter(DownloadReporter::from(printer).with_length(remote.len() as u64)); let wheels = downloader .download(remote.clone(), &in_flight) @@ -622,6 +635,7 @@ async fn resolve_editables( cache: &Cache, client: &RegistryClient, build_dispatch: &BuildDispatch<'_>, + concurrency: Concurrency, printer: Printer, ) -> Result { // Partition the editables into those that are already installed, and those that must be built. @@ -683,8 +697,13 @@ async fn resolve_editables( } else { let start = std::time::Instant::now(); - let downloader = Downloader::new(cache, tags, hasher, client, build_dispatch) - .with_reporter(DownloadReporter::from(printer).with_length(uninstalled.len() as u64)); + let downloader = Downloader::new( + cache, + tags, + hasher, + DistributionDatabase::new(client, build_dispatch, concurrency.downloads), + ) + .with_reporter(DownloadReporter::from(printer).with_length(uninstalled.len() as u64)); let editables = LocalEditables::from_editables(uninstalled.iter().map(|editable| { let EditableRequirement { diff --git a/crates/uv/src/commands/project/lock.rs b/crates/uv/src/commands/project/lock.rs index 0450b5304..1ddd84da1 100644 --- a/crates/uv/src/commands/project/lock.rs +++ b/crates/uv/src/commands/project/lock.rs @@ -5,7 +5,9 @@ use distribution_types::IndexLocations; use install_wheel_rs::linker::LinkMode; use uv_cache::Cache; use uv_client::{BaseClientBuilder, RegistryClientBuilder}; -use uv_configuration::{ConfigSettings, NoBinary, NoBuild, PreviewMode, SetupPyStrategy}; +use uv_configuration::{ + Concurrency, ConfigSettings, NoBinary, NoBuild, PreviewMode, SetupPyStrategy, +}; use uv_dispatch::BuildDispatch; use uv_requirements::{ExtrasSpecification, RequirementsSpecification}; use uv_resolver::{FlatIndex, InMemoryIndex, OptionsBuilder}; @@ -78,6 +80,7 @@ pub(crate) async fn lock( let no_binary = NoBinary::default(); let no_build = NoBuild::default(); let setup_py = SetupPyStrategy::default(); + let concurrency = Concurrency::default(); // Create a build dispatch. let build_dispatch = BuildDispatch::new( @@ -94,6 +97,7 @@ pub(crate) async fn lock( link_mode, &no_build, &no_binary, + concurrency, ); let options = OptionsBuilder::new() @@ -117,6 +121,7 @@ pub(crate) async fn lock( &build_dispatch, options, printer, + concurrency, ) .await; diff --git a/crates/uv/src/commands/project/mod.rs b/crates/uv/src/commands/project/mod.rs index 4cd1616d0..c37e77865 100644 --- a/crates/uv/src/commands/project/mod.rs +++ b/crates/uv/src/commands/project/mod.rs @@ -11,8 +11,9 @@ use platform_tags::Tags; use pypi_types::Yanked; use uv_cache::Cache; use uv_client::RegistryClient; -use uv_configuration::{Constraints, NoBinary, Overrides, Reinstall}; +use uv_configuration::{Concurrency, Constraints, NoBinary, Overrides, Reinstall}; use uv_dispatch::BuildDispatch; +use uv_distribution::DistributionDatabase; use uv_fs::Simplified; use uv_installer::{Downloader, Plan, Planner, SitePackages}; use uv_interpreter::{find_default_python, Interpreter, PythonEnvironment}; @@ -124,6 +125,7 @@ pub(crate) async fn resolve( build_dispatch: &BuildDispatch<'_>, options: Options, printer: Printer, + concurrency: Concurrency, ) -> Result { let start = std::time::Instant::now(); @@ -141,9 +143,8 @@ pub(crate) async fn resolve( let mut requirements = NamedRequirementsResolver::new( spec.requirements, hasher, - build_dispatch, - client, index, + DistributionDatabase::new(client, build_dispatch, concurrency.downloads), ) .with_reporter(ResolverReporter::from(printer)) .resolve() @@ -156,9 +157,8 @@ pub(crate) async fn resolve( spec.source_trees, &ExtrasSpecification::None, hasher, - build_dispatch, - client, index, + DistributionDatabase::new(client, build_dispatch, concurrency.downloads), ) .with_reporter(ResolverReporter::from(printer)) .resolve() @@ -176,9 +176,8 @@ pub(crate) async fn resolve( &overrides, &editables, hasher, - build_dispatch, - client, index, + DistributionDatabase::new(client, build_dispatch, concurrency.downloads), ) .with_reporter(ResolverReporter::from(printer)) .resolve(Some(markers)) @@ -203,12 +202,12 @@ pub(crate) async fn resolve( &python_requirement, Some(markers), tags, - client, flat_index, index, hasher, build_dispatch, &installed_packages, + DistributionDatabase::new(client, build_dispatch, concurrency.downloads), )? .with_reporter(ResolverReporter::from(printer)); let resolution = resolver.resolve().await?; @@ -255,6 +254,7 @@ pub(crate) async fn install( cache: &Cache, venv: &PythonEnvironment, printer: Printer, + concurrency: Concurrency, ) -> Result<(), Error> { let start = std::time::Instant::now(); @@ -316,8 +316,13 @@ pub(crate) async fn install( } else { let start = std::time::Instant::now(); - let downloader = Downloader::new(cache, tags, hasher, client, build_dispatch) - .with_reporter(DownloadReporter::from(printer).with_length(remote.len() as u64)); + let downloader = Downloader::new( + cache, + tags, + hasher, + DistributionDatabase::new(client, build_dispatch, concurrency.downloads), + ) + .with_reporter(DownloadReporter::from(printer).with_length(remote.len() as u64)); let wheels = downloader .download(remote.clone(), in_flight) diff --git a/crates/uv/src/commands/project/run.rs b/crates/uv/src/commands/project/run.rs index 77d566af5..7df20e9fa 100644 --- a/crates/uv/src/commands/project/run.rs +++ b/crates/uv/src/commands/project/run.rs @@ -11,7 +11,9 @@ use distribution_types::{IndexLocations, Resolution}; use install_wheel_rs::linker::LinkMode; use uv_cache::Cache; use uv_client::{BaseClientBuilder, RegistryClientBuilder}; -use uv_configuration::{ConfigSettings, NoBinary, NoBuild, PreviewMode, SetupPyStrategy}; +use uv_configuration::{ + Concurrency, ConfigSettings, NoBinary, NoBuild, PreviewMode, SetupPyStrategy, +}; use uv_dispatch::BuildDispatch; use uv_installer::{SatisfiesResult, SitePackages}; use uv_interpreter::PythonEnvironment; @@ -259,6 +261,7 @@ async fn update_environment( let no_binary = NoBinary::default(); let no_build = NoBuild::default(); let setup_py = SetupPyStrategy::default(); + let concurrency = Concurrency::default(); // Create a build dispatch. let build_dispatch = BuildDispatch::new( @@ -275,6 +278,7 @@ async fn update_environment( link_mode, &no_build, &no_binary, + concurrency, ); let options = OptionsBuilder::new() @@ -298,6 +302,7 @@ async fn update_environment( &build_dispatch, options, printer, + concurrency, ) .await { @@ -323,6 +328,7 @@ async fn update_environment( cache, &venv, printer, + concurrency, ) .await?; diff --git a/crates/uv/src/commands/project/sync.rs b/crates/uv/src/commands/project/sync.rs index 92fe51be2..e6d74f355 100644 --- a/crates/uv/src/commands/project/sync.rs +++ b/crates/uv/src/commands/project/sync.rs @@ -4,7 +4,9 @@ use distribution_types::IndexLocations; use install_wheel_rs::linker::LinkMode; use uv_cache::Cache; use uv_client::RegistryClientBuilder; -use uv_configuration::{ConfigSettings, NoBinary, NoBuild, PreviewMode, SetupPyStrategy}; +use uv_configuration::{ + Concurrency, ConfigSettings, NoBinary, NoBuild, PreviewMode, SetupPyStrategy, +}; use uv_dispatch::BuildDispatch; use uv_installer::SitePackages; use uv_resolver::{FlatIndex, InMemoryIndex, Lock}; @@ -64,6 +66,7 @@ pub(crate) async fn sync( let no_binary = NoBinary::default(); let no_build = NoBuild::default(); let setup_py = SetupPyStrategy::default(); + let concurrency = Concurrency::default(); // Create a build dispatch. let build_dispatch = BuildDispatch::new( @@ -80,6 +83,7 @@ pub(crate) async fn sync( link_mode, &no_build, &no_binary, + concurrency, ); // Sync the environment. @@ -97,6 +101,7 @@ pub(crate) async fn sync( cache, &venv, printer, + concurrency, ) .await?; diff --git a/crates/uv/src/commands/venv.rs b/crates/uv/src/commands/venv.rs index 4f4572786..99d5d2437 100644 --- a/crates/uv/src/commands/venv.rs +++ b/crates/uv/src/commands/venv.rs @@ -15,7 +15,7 @@ use install_wheel_rs::linker::LinkMode; use uv_auth::store_credentials_from_url; use uv_cache::Cache; use uv_client::{Connectivity, FlatIndexClient, RegistryClientBuilder}; -use uv_configuration::KeyringProviderType; +use uv_configuration::{Concurrency, KeyringProviderType}; use uv_configuration::{ConfigSettings, IndexStrategy, NoBinary, NoBuild, SetupPyStrategy}; use uv_dispatch::BuildDispatch; use uv_fs::Simplified; @@ -194,8 +194,9 @@ async fn venv_impl( // Track in-flight downloads, builds, etc., across resolutions. let in_flight = InFlight::default(); - // For seed packages, assume the default settings are sufficient. + // For seed packages, assume the default settings and concurrency is sufficient. let config_settings = ConfigSettings::default(); + let concurrency = Concurrency::default(); // Prep the build context. let build_dispatch = BuildDispatch::new( @@ -212,6 +213,7 @@ async fn venv_impl( link_mode, &NoBuild::All, &NoBinary::None, + concurrency, ) .with_options(OptionsBuilder::new().exclude_newer(exclude_newer).build()); diff --git a/crates/uv/src/main.rs b/crates/uv/src/main.rs index 22fc37e50..7290c7929 100644 --- a/crates/uv/src/main.rs +++ b/crates/uv/src/main.rs @@ -229,6 +229,7 @@ async fn run() -> Result { args.shared.link_mode, args.shared.python, args.shared.system, + args.shared.concurrency, args.uv_lock, globals.native_tls, globals.quiet, @@ -275,6 +276,7 @@ async fn run() -> Result { args.shared.system, args.shared.break_system_packages, args.shared.target, + args.shared.concurrency, globals.native_tls, globals.preview, cache, @@ -342,6 +344,7 @@ async fn run() -> Result { args.shared.system, args.shared.break_system_packages, args.shared.target, + args.shared.concurrency, args.uv_lock, globals.native_tls, globals.preview, diff --git a/crates/uv/src/settings.rs b/crates/uv/src/settings.rs index 8efaf41eb..8cb4a9c3c 100644 --- a/crates/uv/src/settings.rs +++ b/crates/uv/src/settings.rs @@ -1,13 +1,17 @@ +use std::env::VarError; use std::ffi::OsString; +use std::num::NonZeroUsize; use std::path::PathBuf; +use std::process; +use std::str::FromStr; use distribution_types::IndexLocations; use install_wheel_rs::linker::LinkMode; use uv_cache::{CacheArgs, Refresh}; use uv_client::Connectivity; use uv_configuration::{ - ConfigSettings, IndexStrategy, KeyringProviderType, NoBinary, NoBuild, PreviewMode, Reinstall, - SetupPyStrategy, TargetTriple, Upgrade, + Concurrency, ConfigSettings, IndexStrategy, KeyringProviderType, NoBinary, NoBuild, + PreviewMode, Reinstall, SetupPyStrategy, TargetTriple, Upgrade, }; use uv_interpreter::{PythonVersion, Target}; use uv_normalize::PackageName; @@ -299,6 +303,8 @@ impl PipCompileSettings { emit_index_annotation: flag(emit_index_annotation, no_emit_index_annotation), annotation_style, link_mode, + concurrent_builds: env(env::CONCURRENT_BUILDS), + concurrent_downloads: env(env::CONCURRENT_DOWNLOADS), ..PipOptions::default() }, workspace, @@ -405,6 +411,8 @@ impl PipSyncSettings { link_mode, compile_bytecode: flag(compile_bytecode, no_compile_bytecode), require_hashes: flag(require_hashes, no_require_hashes), + concurrent_builds: env(env::CONCURRENT_BUILDS), + concurrent_downloads: env(env::CONCURRENT_DOWNLOADS), ..PipOptions::default() }, workspace, @@ -555,6 +563,8 @@ impl PipInstallSettings { link_mode, compile_bytecode: flag(compile_bytecode, no_compile_bytecode), require_hashes: flag(require_hashes, no_require_hashes), + concurrent_builds: env(env::CONCURRENT_BUILDS), + concurrent_downloads: env(env::CONCURRENT_DOWNLOADS), ..PipOptions::default() }, workspace, @@ -893,6 +903,7 @@ pub(crate) struct PipSharedSettings { pub(crate) link_mode: LinkMode, pub(crate) compile_bytecode: bool, pub(crate) require_hashes: bool, + pub(crate) concurrency: Concurrency, } impl PipSharedSettings { @@ -940,6 +951,8 @@ impl PipSharedSettings { link_mode, compile_bytecode, require_hashes, + concurrent_builds, + concurrent_downloads, } = workspace .and_then(|workspace| workspace.options.pip) .unwrap_or_default(); @@ -1025,10 +1038,56 @@ impl PipSharedSettings { .or(compile_bytecode) .unwrap_or_default(), strict: args.strict.or(strict).unwrap_or_default(), + concurrency: Concurrency { + downloads: args + .concurrent_downloads + .or(concurrent_downloads) + .map_or(Concurrency::DEFAULT_DOWNLOADS, NonZeroUsize::get), + builds: args + .concurrent_builds + .or(concurrent_builds) + .map_or_else(Concurrency::default_builds, NonZeroUsize::get), + }, } } } +// Environment variables that are not exposed as CLI arguments. +mod env { + pub(super) const CONCURRENT_DOWNLOADS: (&str, &str) = + ("UV_CONCURRENT_DOWNLOADS", "a non-zero integer"); + + pub(super) const CONCURRENT_BUILDS: (&str, &str) = + ("UV_CONCURRENT_BUILDS", "a non-zero integer"); +} + +/// Attempt to load and parse an environment variable with the given name. +/// +/// Exits the program and prints an error message containing the expected type if +/// parsing values. +fn env((name, expected): (&str, &str)) -> Option +where + T: FromStr, +{ + let val = match std::env::var(name) { + Ok(val) => val, + Err(VarError::NotPresent) => return None, + Err(VarError::NotUnicode(_)) => parse_failure(name, expected), + }; + + Some( + val.parse() + .unwrap_or_else(|_| parse_failure(name, expected)), + ) +} + +/// Prints a parse error and exits the process. +#[allow(clippy::exit, clippy::print_stderr)] +fn parse_failure(name: &str, expected: &str) -> ! { + eprintln!("error: invalid value for {name}, expected {expected}"); + process::exit(1) +} + /// Given a boolean flag pair (like `--upgrade` and `--no-upgrade`), resolve the value of the flag. fn flag(yes: bool, no: bool) -> Option { match (yes, no) { diff --git a/uv.schema.json b/uv.schema.json index 829f90da9..3141daeee 100644 --- a/uv.schema.json +++ b/uv.schema.json @@ -248,6 +248,22 @@ "null" ] }, + "concurrent-builds": { + "type": [ + "integer", + "null" + ], + "format": "uint", + "minimum": 1.0 + }, + "concurrent-downloads": { + "type": [ + "integer", + "null" + ], + "format": "uint", + "minimum": 1.0 + }, "config-settings": { "anyOf": [ {