From 36d0124e60388beff280c1d9e9313963143849f3 Mon Sep 17 00:00:00 2001 From: Charlie Marsh Date: Fri, 6 Oct 2023 16:51:31 -0400 Subject: [PATCH] Do wheel downloads concurrently (#28) --- crates/puffin-installer/src/lib.rs | 35 +++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/crates/puffin-installer/src/lib.rs b/crates/puffin-installer/src/lib.rs index a801ab800..dfd786b97 100644 --- a/crates/puffin-installer/src/lib.rs +++ b/crates/puffin-installer/src/lib.rs @@ -1,7 +1,9 @@ +use std::path::Path; use std::str::FromStr; use anyhow::Result; use install_wheel_rs::{install_wheel, InstallLocation}; +use tokio::task::JoinSet; use tokio_util::compat::FuturesAsyncReadCompatExt; use url::Url; @@ -17,20 +19,20 @@ pub async fn install( // Create a temporary directory, in which we'll store the wheels. let tmp_dir = tempfile::tempdir()?; - // Download each wheel. - // TODO(charlie): Store these in a content-addressed cache. - // TODO(charlie): Use channels to efficiently stream-and-install. + // Download the wheels in parallel. + let mut downloads = JoinSet::new(); for wheel in wheels { - let url = Url::parse(&wheel.url)?; - let reader = client.stream_external(&url).await?; - - // TODO(charlie): Stream the unzip. - let mut writer = tokio::fs::File::create(tmp_dir.path().join(&wheel.hashes.sha256)).await?; - tokio::io::copy(&mut reader.compat(), &mut writer).await?; + downloads.spawn(do_download( + wheel.clone(), + client.clone(), + tmp_dir.path().join(&wheel.hashes.sha256), + )); + } + while let Some(result) = downloads.join_next().await.transpose()? { + result?; } // Install each wheel. - // TODO(charlie): Use channels to efficiently stream-and-install. let location = InstallLocation::Venv { venv_base: python.venv().to_path_buf(), python_version: python.simple_version(), @@ -54,3 +56,16 @@ pub async fn install( Ok(()) } + +/// Download a wheel to a given path. +async fn do_download(wheel: File, client: PypiClient, path: impl AsRef) -> Result { + // TODO(charlie): Store these in a content-addressed cache. + let url = Url::parse(&wheel.url)?; + let reader = client.stream_external(&url).await?; + + // TODO(charlie): Stream the unzip. + let mut writer = tokio::fs::File::create(path).await?; + tokio::io::copy(&mut reader.compat(), &mut writer).await?; + + Ok(wheel) +}