Fetch package metadata in parallel

This commit is contained in:
Charlie Marsh 2023-10-04 20:02:05 -04:00
parent b08e8c78b5
commit 44b444494e
3 changed files with 37 additions and 6 deletions

View file

@ -18,3 +18,4 @@ async-std = { version = "1.12.0", features = [
"tokio1", "tokio1",
"unstable", "unstable",
] } ] }
futures = "0.3.28"

View file

@ -2,7 +2,10 @@ use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
use anyhow::Result; use anyhow::Result;
use futures::{StreamExt, TryFutureExt};
use puffin_client::PypiClientBuilder; use puffin_client::PypiClientBuilder;
use puffin_requirements::Requirement;
use crate::commands::ExitStatus; use crate::commands::ExitStatus;
@ -16,12 +19,39 @@ pub(crate) async fn install(src: &Path) -> Result<ExitStatus> {
// Instantiate a client. // Instantiate a client.
let client = PypiClientBuilder::default().build(); let client = PypiClientBuilder::default().build();
// Fetch metadata in parallel.
let (package_sink, package_stream) = futures::channel::mpsc::unbounded();
// Create a stream of futures that fetch metadata for each requirement.
let mut package_stream = package_stream
.map(|requirement: Requirement| {
client
.simple(requirement.clone().name)
.map_ok(move |metadata| (metadata, requirement))
})
.buffer_unordered(32)
.ready_chunks(32);
// Push all the requirements into the sink.
let mut in_flight = 0;
for requirement in requirements.iter() { for requirement in requirements.iter() {
let packument = client.simple(&requirement.name).await?; package_sink.unbounded_send(requirement.clone())?;
#[allow(clippy::print_stdout)] in_flight += 1;
{ }
println!("{packument:#?}");
println!("{requirement:#?}"); while let Some(chunk) = package_stream.next().await {
in_flight -= chunk.len();
for result in chunk {
let (metadata, requirement) = result?;
#[allow(clippy::print_stdout)]
{
println!("{metadata:#?}");
println!("{requirement:#?}");
}
}
if in_flight == 0 {
break;
} }
} }

View file

@ -4,7 +4,7 @@ use std::str::FromStr;
use anyhow::Result; use anyhow::Result;
use memchr::{memchr2, memchr_iter}; use memchr::{memchr2, memchr_iter};
use pep508_rs::{Pep508Error, Requirement}; pub use pep508_rs::{Pep508Error, Requirement};
#[derive(Debug)] #[derive(Debug)]
pub struct Requirements(Vec<Requirement>); pub struct Requirements(Vec<Requirement>);