Switch to Current-Thread Tokio Runtime (#4934)

## Summary

Move completely off tokio's multi-threaded runtime. We've slowly been
making changes to be smarter about scheduling in various places instead
of depending on tokio's general purpose work-stealing, notably
https://github.com/astral-sh/uv/pull/3627 and
https://github.com/astral-sh/uv/pull/4004. We no longer benefit from
the multi-threaded runtime, as we run all I/O on the main thread.
There's one remaining instance of `block_in_place` that can be swapped
for `rayon::spawn`.

This change is a small performance improvement due to removing some
unnecessary overhead of the multi-threaded runtime (e.g. spawning
threads), but nothing major. It also removes some noise from profiles.

## Test Plan

```
Benchmark 1: ./target/profiling/uv (resolve-warm)
  Time (mean ± σ):      14.9 ms ±   0.3 ms    [User: 3.0 ms, System: 17.3 ms]
  Range (min … max):    14.1 ms …  15.8 ms    169 runs
 
Benchmark 2: ./target/profiling/baseline (resolve-warm)
  Time (mean ± σ):      16.1 ms ±   0.3 ms    [User: 3.9 ms, System: 18.7 ms]
  Range (min … max):    15.1 ms …  17.3 ms    162 runs
 
Summary
  ./target/profiling/uv (resolve-warm) ran
    1.08 ± 0.03 times faster than ./target/profiling/baseline (resolve-warm)
```
This commit is contained in:
Ibraheem Ahmed 2024-07-09 18:21:16 -04:00 committed by GitHub
parent 540ff24302
commit aff9c9bd91
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 87 additions and 39 deletions

View file

@ -36,7 +36,7 @@ fn resolve_warm_jupyter(c: &mut Criterion<WallTime>) {
}
fn resolve_warm_airflow(c: &mut Criterion<WallTime>) {
let runtime = &tokio::runtime::Builder::new_multi_thread()
let runtime = &tokio::runtime::Builder::new_current_thread()
// CodSpeed limits the total number of threads to 500
.max_blocking_threads(256)
.enable_all()

View file

@ -84,7 +84,7 @@ async fn run() -> Result<()> {
Ok(())
}
#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() -> ExitCode {
let (duration_layer, _guard) = if let Ok(location) = env::var("TRACING_DURATIONS_FILE") {
let location = PathBuf::from(location);

View file

@ -278,16 +278,17 @@ impl<'a> BuildContext for BuildDispatch<'a> {
}
// Install the resolved distributions.
let wheels = wheels.into_iter().chain(cached).collect::<Vec<_>>();
let mut wheels = wheels.into_iter().chain(cached).collect::<Vec<_>>();
if !wheels.is_empty() {
debug!(
"Installing build requirement{}: {}",
if wheels.len() == 1 { "" } else { "s" },
wheels.iter().map(ToString::to_string).join(", ")
);
Installer::new(venv)
wheels = Installer::new(venv)
.with_link_mode(self.link_mode)
.install(&wheels)
.install(wheels)
.await
.context("Failed to install build dependencies")?;
}

View file

@ -1,5 +1,9 @@
use std::convert;
use anyhow::{Context, Error, Result};
use install_wheel_rs::{linker::LinkMode, Layout};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use tokio::sync::oneshot;
use tracing::instrument;
use distribution_types::CachedDist;
@ -7,7 +11,7 @@ use uv_python::PythonEnvironment;
pub struct Installer<'a> {
venv: &'a PythonEnvironment,
link_mode: install_wheel_rs::linker::LinkMode,
link_mode: LinkMode,
reporter: Option<Box<dyn Reporter>>,
installer_name: Option<String>,
}
@ -17,7 +21,7 @@ impl<'a> Installer<'a> {
pub fn new(venv: &'a PythonEnvironment) -> Self {
Self {
venv,
link_mode: install_wheel_rs::linker::LinkMode::default(),
link_mode: LinkMode::default(),
reporter: None,
installer_name: Some("uv".to_string()),
}
@ -25,7 +29,7 @@ impl<'a> Installer<'a> {
/// Set the [`LinkMode`][`install_wheel_rs::linker::LinkMode`] to use for this installer.
#[must_use]
pub fn with_link_mode(self, link_mode: install_wheel_rs::linker::LinkMode) -> Self {
pub fn with_link_mode(self, link_mode: LinkMode) -> Self {
Self { link_mode, ..self }
}
@ -49,33 +53,73 @@ impl<'a> Installer<'a> {
/// Install a set of wheels into a Python virtual environment.
#[instrument(skip_all, fields(num_wheels = %wheels.len()))]
pub fn install(self, wheels: &[CachedDist]) -> Result<()> {
let layout = self.venv.interpreter().layout();
tokio::task::block_in_place(|| {
wheels.par_iter().try_for_each(|wheel| {
install_wheel_rs::linker::install_wheel(
&layout,
wheel.path(),
wheel.filename(),
wheel
.parsed_url()?
.as_ref()
.map(pypi_types::DirectUrl::try_from)
.transpose()?
.as_ref(),
self.installer_name.as_deref(),
self.link_mode,
)
.with_context(|| format!("Failed to install: {} ({wheel})", wheel.filename()))?;
pub async fn install(self, wheels: Vec<CachedDist>) -> Result<Vec<CachedDist>> {
let (tx, rx) = oneshot::channel();
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_install_progress(wheel);
}
let Self {
venv,
link_mode,
reporter,
installer_name,
} = self;
let layout = venv.interpreter().layout();
Ok::<(), Error>(())
})
})
rayon::spawn(move || {
let result = install(wheels, layout, installer_name, link_mode, reporter);
tx.send(result).unwrap();
});
rx.await
.map_err(|_| anyhow::anyhow!("`install_blocking` task panicked"))
.and_then(convert::identity)
}
/// Install a set of wheels into a Python virtual environment synchronously.
#[instrument(skip_all, fields(num_wheels = %wheels.len()))]
pub fn install_blocking(self, wheels: Vec<CachedDist>) -> Result<Vec<CachedDist>> {
install(
wheels,
self.venv.interpreter().layout(),
self.installer_name,
self.link_mode,
self.reporter,
)
}
}
/// Install a set of wheels into a Python virtual environment synchronously.
#[instrument(skip_all, fields(num_wheels = %wheels.len()))]
fn install(
wheels: Vec<CachedDist>,
layout: Layout,
installer_name: Option<String>,
link_mode: LinkMode,
reporter: Option<Box<dyn Reporter>>,
) -> Result<Vec<CachedDist>> {
wheels.par_iter().try_for_each(|wheel| {
install_wheel_rs::linker::install_wheel(
&layout,
wheel.path(),
wheel.filename(),
wheel
.parsed_url()?
.as_ref()
.map(pypi_types::DirectUrl::try_from)
.transpose()?
.as_ref(),
installer_name.as_deref(),
link_mode,
)
.with_context(|| format!("Failed to install: {} ({wheel})", wheel.filename()))?;
if let Some(reporter) = reporter.as_ref() {
reporter.on_install_progress(wheel);
}
Ok::<(), Error>(())
})?;
Ok(wheels)
}
pub trait Reporter: Send + Sync {

View file

@ -442,13 +442,16 @@ pub(crate) async fn install(
}
// Install the resolved distributions.
let wheels = wheels.into_iter().chain(cached).collect::<Vec<_>>();
let mut wheels = wheels.into_iter().chain(cached).collect::<Vec<_>>();
if !wheels.is_empty() {
let start = std::time::Instant::now();
uv_installer::Installer::new(venv)
wheels = uv_installer::Installer::new(venv)
.with_link_mode(link_mode)
.with_reporter(InstallReporter::from(printer).with_length(wheels.len() as u64))
.install(&wheels)?;
// This technically can block the runtime, but we are on the main thread and
// have no other running tasks at this point, so this lets us avoid spawning a blocking
// task.
.install_blocking(wheels)?;
let s = if wheels.len() == 1 { "" } else { "s" };
writeln!(

View file

@ -1019,7 +1019,7 @@ fn main() -> ExitCode {
let result = if let Ok(stack_size) = env::var("UV_STACK_SIZE") {
let stack_size = stack_size.parse().expect("Invalid stack size");
let tokio_main = move || {
let runtime = tokio::runtime::Builder::new_multi_thread()
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.thread_stack_size(stack_size)
.build()
@ -1041,7 +1041,7 @@ fn main() -> ExitCode {
.join()
.expect("Tokio executor failed, was there a panic?")
} else {
let runtime = tokio::runtime::Builder::new_multi_thread()
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed building the Runtime");