From 44b444494e227ab4e978caabddd8729d7395463c Mon Sep 17 00:00:00 2001 From: Charlie Marsh Date: Wed, 4 Oct 2023 20:02:05 -0400 Subject: [PATCH] Fetch package metadata in parallel --- crates/puffin-cli/Cargo.toml | 1 + crates/puffin-cli/src/commands/install.rs | 40 ++++++++++++++++++++--- crates/puffin-requirements/src/lib.rs | 2 +- 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/crates/puffin-cli/Cargo.toml b/crates/puffin-cli/Cargo.toml index c634ad4f8..47d6ac0c1 100644 --- a/crates/puffin-cli/Cargo.toml +++ b/crates/puffin-cli/Cargo.toml @@ -18,3 +18,4 @@ async-std = { version = "1.12.0", features = [ "tokio1", "unstable", ] } +futures = "0.3.28" diff --git a/crates/puffin-cli/src/commands/install.rs b/crates/puffin-cli/src/commands/install.rs index 9bc5a81a6..fdd8a2980 100644 --- a/crates/puffin-cli/src/commands/install.rs +++ b/crates/puffin-cli/src/commands/install.rs @@ -2,7 +2,10 @@ use std::path::Path; use std::str::FromStr; use anyhow::Result; +use futures::{StreamExt, TryFutureExt}; + use puffin_client::PypiClientBuilder; +use puffin_requirements::Requirement; use crate::commands::ExitStatus; @@ -16,12 +19,39 @@ pub(crate) async fn install(src: &Path) -> Result { // Instantiate a client. let client = PypiClientBuilder::default().build(); + // Fetch metadata in parallel. + let (package_sink, package_stream) = futures::channel::mpsc::unbounded(); + + // Create a stream of futures that fetch metadata for each requirement. + let mut package_stream = package_stream + .map(|requirement: Requirement| { + client + .simple(requirement.clone().name) + .map_ok(move |metadata| (metadata, requirement)) + }) + .buffer_unordered(32) + .ready_chunks(32); + + // Push all the requirements into the sink. + let mut in_flight = 0; for requirement in requirements.iter() { - let packument = client.simple(&requirement.name).await?; - #[allow(clippy::print_stdout)] - { - println!("{packument:#?}"); - println!("{requirement:#?}"); + package_sink.unbounded_send(requirement.clone())?; + in_flight += 1; + } + + while let Some(chunk) = package_stream.next().await { + in_flight -= chunk.len(); + for result in chunk { + let (metadata, requirement) = result?; + #[allow(clippy::print_stdout)] + { + println!("{metadata:#?}"); + println!("{requirement:#?}"); + } + } + + if in_flight == 0 { + break; } } diff --git a/crates/puffin-requirements/src/lib.rs b/crates/puffin-requirements/src/lib.rs index b35b7b314..094387822 100644 --- a/crates/puffin-requirements/src/lib.rs +++ b/crates/puffin-requirements/src/lib.rs @@ -4,7 +4,7 @@ use std::str::FromStr; use anyhow::Result; use memchr::{memchr2, memchr_iter}; -use pep508_rs::{Pep508Error, Requirement}; +pub use pep508_rs::{Pep508Error, Requirement}; #[derive(Debug)] pub struct Requirements(Vec);