Fetch --find-links indexes in parallel (#934)

## Summary

Removes a TODO.

## Test Plan

Tested manually with:

```shell
cargo run -p puffin-cli -- \
    pip compile requirements.in -n \
    --find-links 'https://download.pytorch.org/whl/torch_stable.html' \
    --find-links 'https://storage.googleapis.com/jax-releases/jax_cuda_releases.html' \
    --verbose
```

And inspecting the logs to ensure that the two requests were kicked off
concrurently.
This commit is contained in:
Charlie Marsh 2024-01-16 05:37:35 -05:00 committed by GitHub
parent 2f8f126f2f
commit b50e5fcbc5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -2,6 +2,7 @@ use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::path::PathBuf;
use futures::StreamExt;
use reqwest::Response;
use rustc_hash::FxHashMap;
use tracing::{debug, info_span, instrument, warn, Instrument};
@ -53,30 +54,35 @@ impl<'a> FlatIndexClient<'a> {
&self,
indexes: impl Iterator<Item = &FlatIndexLocation>,
) -> Result<Vec<FlatIndexEntry>, FlatIndexError> {
let mut dists = Vec::new();
// TODO(konstin): Parallelize reads over flat indexes.
for flat_index in indexes {
let index_dists = match flat_index {
FlatIndexLocation::Path(path) => Self::read_from_directory(path)
.map_err(|err| FlatIndexError::FindLinksDirectory(path.clone(), err))?,
FlatIndexLocation::Url(url) => self
.read_from_url(url)
.await
.map_err(|err| FlatIndexError::FindLinksUrl(url.clone(), err))?,
};
if index_dists.is_empty() {
warn!("No packages found in `--find-links` entry: {}", flat_index);
} else {
debug!(
"Found {} package{} in `--find-links` entry: {}",
index_dists.len(),
if index_dists.len() == 1 { "" } else { "s" },
flat_index
);
}
dists.extend(index_dists);
let mut fetches = futures::stream::iter(indexes)
.map(|index| async move {
let entries = match index {
FlatIndexLocation::Path(path) => Self::read_from_directory(path)
.map_err(|err| FlatIndexError::FindLinksDirectory(path.clone(), err))?,
FlatIndexLocation::Url(url) => self
.read_from_url(url)
.await
.map_err(|err| FlatIndexError::FindLinksUrl(url.clone(), err))?,
};
if entries.is_empty() {
warn!("No packages found in `--find-links` entry: {}", index);
} else {
debug!(
"Found {} package{} in `--find-links` entry: {}",
entries.len(),
if entries.len() == 1 { "" } else { "s" },
index
);
}
Ok::<Vec<FlatIndexEntry>, FlatIndexError>(entries)
})
.buffered(16);
let mut results = Vec::new();
while let Some(entries) = fetches.next().await.transpose()? {
results.extend(entries);
}
Ok(dists)
Ok(results)
}
/// Read a flat remote index from a `--find-links` URL.