mirror of
https://github.com/astral-sh/uv.git
synced 2025-11-03 05:03:46 +00:00
Reduce stack usage by boxing File in Dist, CachePolicy and large futures (#1004)
This is https://github.com/astral-sh/puffin/pull/947 again but this time merging into main instead of downstack, sorry for the noise. --- Windows has a default stack size of 1MB, which makes puffin often fail with stack overflows. The PR reduces stack size by three changes: * Boxing `File` in `Dist`, reducing the size from 496 to 240. * Boxing the largest futures. * Boxing `CachePolicy` ## Method Debugging happened on linux using https://github.com/astral-sh/puffin/pull/941 to limit the stack size to 1MB. Used ran the command below. ``` RUSTFLAGS=-Zprint-type-sizes cargo +nightly build -p puffin-cli -j 1 > type-sizes.txt && top-type-sizes -w -s -h 10 < type-sizes.txt > sizes.txt ``` The main drawback is top-type-sizes not saying what the `__awaitee` is, so it requires manually looking up with a future with matching size. When the `brotli` features on `reqwest` is active, a lot of brotli types show up. Toggling this feature however seems to have no effect. I assume they are false positives since the `brotli` crate has elaborate control about allocation. The sizes are therefore shown with the feature off. ## Results The largest future goes from 12208B to 6416B, the largest type (`PrioritizedDistribution`, see also #948) from 17448B to 9264B. Full diff: https://gist.github.com/konstin/62635c0d12110a616a1b2bfcde21304f For the second commit, i iteratively boxed the largest file until the tests passed, then with an 800KB stack limit looked through the backtrace of a failing test and added some more boxing. Quick benchmarking showed no difference: ```console $ hyperfine --warmup 2 "target/profiling/main-dev resolve meine_stadt_transparent" "target/profiling/puffin-dev resolve meine_stadt_transparent" Benchmark 1: target/profiling/main-dev resolve meine_stadt_transparent Time (mean ± σ): 49.2 ms ± 3.0 ms [User: 39.8 ms, System: 24.0 ms] Range (min … max): 46.6 ms … 63.0 ms 55 runs Warning: Statistical outliers were detected. Consider re-running this benchmark on a quiet system without any interferences from other programs. It might help to use the '--warmup' or '--prepare' options. Benchmark 2: target/profiling/puffin-dev resolve meine_stadt_transparent Time (mean ± σ): 47.4 ms ± 3.2 ms [User: 41.3 ms, System: 20.6 ms] Range (min … max): 44.6 ms … 60.5 ms 62 runs Warning: Statistical outliers were detected. Consider re-running this benchmark on a quiet system without any interferences from other programs. It might help to use the '--warmup' or '--prepare' options. Summary target/profiling/puffin-dev resolve meine_stadt_transparent ran 1.04 ± 0.09 times faster than target/profiling/main-dev resolve meine_stadt_transparent ```
This commit is contained in:
parent
66e651901e
commit
47fc90d1b3
9 changed files with 65 additions and 25 deletions
|
|
@ -147,7 +147,7 @@ pub enum SourceDist {
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RegistryBuiltDist {
|
pub struct RegistryBuiltDist {
|
||||||
pub filename: WheelFilename,
|
pub filename: WheelFilename,
|
||||||
pub file: File,
|
pub file: Box<File>,
|
||||||
pub index: IndexUrl,
|
pub index: IndexUrl,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -172,7 +172,7 @@ pub struct PathBuiltDist {
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RegistrySourceDist {
|
pub struct RegistrySourceDist {
|
||||||
pub filename: SourceDistFilename,
|
pub filename: SourceDistFilename,
|
||||||
pub file: File,
|
pub file: Box<File>,
|
||||||
pub index: IndexUrl,
|
pub index: IndexUrl,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -208,14 +208,14 @@ impl Dist {
|
||||||
DistFilename::WheelFilename(filename) => {
|
DistFilename::WheelFilename(filename) => {
|
||||||
Self::Built(BuiltDist::Registry(RegistryBuiltDist {
|
Self::Built(BuiltDist::Registry(RegistryBuiltDist {
|
||||||
filename,
|
filename,
|
||||||
file,
|
file: Box::new(file),
|
||||||
index,
|
index,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
DistFilename::SourceDistFilename(filename) => {
|
DistFilename::SourceDistFilename(filename) => {
|
||||||
Self::Source(SourceDist::Registry(RegistrySourceDist {
|
Self::Source(SourceDist::Registry(RegistrySourceDist {
|
||||||
filename,
|
filename,
|
||||||
file,
|
file: Box::new(file),
|
||||||
index,
|
index,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
@ -865,3 +865,16 @@ impl Identifier for Dist {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use crate::{BuiltDist, Dist, SourceDist};
|
||||||
|
|
||||||
|
/// Ensure that we don't accidentally grow the `Dist` sizes.
|
||||||
|
#[test]
|
||||||
|
fn dist_size() {
|
||||||
|
assert!(std::mem::size_of::<Dist>() <= 240);
|
||||||
|
assert!(std::mem::size_of::<BuiltDist>() <= 240);
|
||||||
|
assert!(std::mem::size_of::<SourceDist>() <= 168);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
use futures::FutureExt;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
|
|
||||||
|
|
@ -42,14 +43,15 @@ enum CachedResponse<Payload: Serialize> {
|
||||||
/// There was no prior cached response or the cache was outdated
|
/// There was no prior cached response or the cache was outdated
|
||||||
///
|
///
|
||||||
/// The cache policy is `None` if it isn't storable
|
/// The cache policy is `None` if it isn't storable
|
||||||
ModifiedOrNew(Response, Option<CachePolicy>),
|
ModifiedOrNew(Response, Option<Box<CachePolicy>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Serialize the actual payload together with its caching information
|
/// Serialize the actual payload together with its caching information
|
||||||
#[derive(Debug, Deserialize, Serialize)]
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
pub struct DataWithCachePolicy<Payload: Serialize> {
|
pub struct DataWithCachePolicy<Payload: Serialize> {
|
||||||
pub data: Payload,
|
pub data: Payload,
|
||||||
cache_policy: CachePolicy,
|
// The cache policy is large (448 bytes at time of writing), reduce the stack size
|
||||||
|
cache_policy: Box<CachePolicy>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Custom caching layer over [`reqwest::Client`] using `http-cache-semantics`.
|
/// Custom caching layer over [`reqwest::Client`] using `http-cache-semantics`.
|
||||||
|
|
@ -88,7 +90,7 @@ impl CachedClient {
|
||||||
/// client.
|
/// client.
|
||||||
#[instrument(skip_all)]
|
#[instrument(skip_all)]
|
||||||
pub async fn get_cached_with_callback<
|
pub async fn get_cached_with_callback<
|
||||||
Payload: Serialize + DeserializeOwned,
|
Payload: Serialize + DeserializeOwned + Send,
|
||||||
CallBackError,
|
CallBackError,
|
||||||
Callback,
|
Callback,
|
||||||
CallbackReturn,
|
CallbackReturn,
|
||||||
|
|
@ -128,7 +130,7 @@ impl CachedClient {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
let cached_response = self.send_cached(req, cached).await?;
|
let cached_response = self.send_cached(req, cached).boxed().await?;
|
||||||
|
|
||||||
let write_cache = info_span!("write_cache", file = %cache_entry.path().display());
|
let write_cache = info_span!("write_cache", file = %cache_entry.path().display());
|
||||||
match cached_response {
|
match cached_response {
|
||||||
|
|
@ -231,14 +233,14 @@ impl CachedClient {
|
||||||
debug!("Found not-modified response for: {url}");
|
debug!("Found not-modified response for: {url}");
|
||||||
CachedResponse::NotModified(DataWithCachePolicy {
|
CachedResponse::NotModified(DataWithCachePolicy {
|
||||||
data: cached.data,
|
data: cached.data,
|
||||||
cache_policy: new_policy,
|
cache_policy: Box::new(new_policy),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
AfterResponse::Modified(new_policy, _parts) => {
|
AfterResponse::Modified(new_policy, _parts) => {
|
||||||
debug!("Found modified response for: {url}");
|
debug!("Found modified response for: {url}");
|
||||||
CachedResponse::ModifiedOrNew(
|
CachedResponse::ModifiedOrNew(
|
||||||
res,
|
res,
|
||||||
new_policy.is_storable().then_some(new_policy),
|
new_policy.is_storable().then(|| Box::new(new_policy)),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -271,7 +273,7 @@ impl CachedClient {
|
||||||
CachePolicy::new(&converted_req.into_parts().0, &converted_res.into_parts().0);
|
CachePolicy::new(&converted_req.into_parts().0, &converted_res.into_parts().0);
|
||||||
Ok(CachedResponse::ModifiedOrNew(
|
Ok(CachedResponse::ModifiedOrNew(
|
||||||
res,
|
res,
|
||||||
cache_policy.is_storable().then_some(cache_policy),
|
cache_policy.is_storable().then(|| Box::new(cache_policy)),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@ use std::collections::btree_map::Entry;
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
use futures::StreamExt;
|
use futures::{FutureExt, StreamExt};
|
||||||
use reqwest::Response;
|
use reqwest::Response;
|
||||||
use rustc_hash::FxHashMap;
|
use rustc_hash::FxHashMap;
|
||||||
use tracing::{debug, info_span, instrument, warn, Instrument};
|
use tracing::{debug, info_span, instrument, warn, Instrument};
|
||||||
|
|
@ -120,6 +120,7 @@ impl<'a> FlatIndexClient<'a> {
|
||||||
.collect();
|
.collect();
|
||||||
Ok(files)
|
Ok(files)
|
||||||
}
|
}
|
||||||
|
.boxed()
|
||||||
.instrument(info_span!("parse_flat_index_html", url = % url))
|
.instrument(info_span!("parse_flat_index_html", url = % url))
|
||||||
};
|
};
|
||||||
let files = cached_client
|
let files = cached_client
|
||||||
|
|
@ -218,7 +219,7 @@ impl FlatIndex {
|
||||||
|
|
||||||
let dist = Dist::Built(BuiltDist::Registry(RegistryBuiltDist {
|
let dist = Dist::Built(BuiltDist::Registry(RegistryBuiltDist {
|
||||||
filename,
|
filename,
|
||||||
file,
|
file: Box::new(file),
|
||||||
index,
|
index,
|
||||||
}));
|
}));
|
||||||
match distributions.0.entry(version) {
|
match distributions.0.entry(version) {
|
||||||
|
|
@ -235,7 +236,7 @@ impl FlatIndex {
|
||||||
DistFilename::SourceDistFilename(filename) => {
|
DistFilename::SourceDistFilename(filename) => {
|
||||||
let dist = Dist::Source(SourceDist::Registry(RegistrySourceDist {
|
let dist = Dist::Source(SourceDist::Registry(RegistrySourceDist {
|
||||||
filename: filename.clone(),
|
filename: filename.clone(),
|
||||||
file,
|
file: Box::new(file),
|
||||||
index,
|
index,
|
||||||
}));
|
}));
|
||||||
match distributions.0.entry(filename.version.clone()) {
|
match distributions.0.entry(filename.version.clone()) {
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ use std::str::FromStr;
|
||||||
|
|
||||||
use async_http_range_reader::{AsyncHttpRangeReader, AsyncHttpRangeReaderError};
|
use async_http_range_reader::{AsyncHttpRangeReader, AsyncHttpRangeReaderError};
|
||||||
use async_zip::tokio::read::seek::ZipFileReader;
|
use async_zip::tokio::read::seek::ZipFileReader;
|
||||||
use futures::TryStreamExt;
|
use futures::{FutureExt, TryStreamExt};
|
||||||
use reqwest::{Client, ClientBuilder, Response, StatusCode};
|
use reqwest::{Client, ClientBuilder, Response, StatusCode};
|
||||||
use reqwest_retry::policies::ExponentialBackoff;
|
use reqwest_retry::policies::ExponentialBackoff;
|
||||||
use reqwest_retry::RetryTransientMiddleware;
|
use reqwest_retry::RetryTransientMiddleware;
|
||||||
|
|
@ -206,6 +206,7 @@ impl RegistryClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
.boxed()
|
||||||
.instrument(info_span!("parse_simple_api", package = %package_name))
|
.instrument(info_span!("parse_simple_api", package = %package_name))
|
||||||
};
|
};
|
||||||
let result = self
|
let result = self
|
||||||
|
|
@ -335,6 +336,7 @@ impl RegistryClient {
|
||||||
})?;
|
})?;
|
||||||
Ok(metadata)
|
Ok(metadata)
|
||||||
}
|
}
|
||||||
|
.boxed()
|
||||||
.instrument(info_span!("read_metadata_range_request", wheel = %filename))
|
.instrument(info_span!("read_metadata_range_request", wheel = %filename))
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use fs_err::tokio as fs;
|
use fs_err::tokio as fs;
|
||||||
|
use futures::FutureExt;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tokio::task::JoinError;
|
use tokio::task::JoinError;
|
||||||
use tokio_util::compat::FuturesAsyncReadCompatExt;
|
use tokio_util::compat::FuturesAsyncReadCompatExt;
|
||||||
|
|
@ -219,7 +220,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
|
||||||
let lock = self.locks.acquire(&dist).await;
|
let lock = self.locks.acquire(&dist).await;
|
||||||
let _guard = lock.lock().await;
|
let _guard = lock.lock().await;
|
||||||
|
|
||||||
let built_wheel = self.builder.download_and_build(source_dist).await?;
|
let built_wheel = self.builder.download_and_build(source_dist).boxed().await?;
|
||||||
Ok(LocalWheel::Built(BuiltWheel {
|
Ok(LocalWheel::Built(BuiltWheel {
|
||||||
dist: dist.clone(),
|
dist: dist.clone(),
|
||||||
path: built_wheel.path,
|
path: built_wheel.path,
|
||||||
|
|
@ -242,7 +243,9 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
|
||||||
dist: &Dist,
|
dist: &Dist,
|
||||||
) -> Result<(Metadata21, Option<Url>), DistributionDatabaseError> {
|
) -> Result<(Metadata21, Option<Url>), DistributionDatabaseError> {
|
||||||
match dist {
|
match dist {
|
||||||
Dist::Built(built_dist) => Ok((self.client.wheel_metadata(built_dist).await?, None)),
|
Dist::Built(built_dist) => {
|
||||||
|
Ok((self.client.wheel_metadata(built_dist).boxed().await?, None))
|
||||||
|
}
|
||||||
Dist::Source(source_dist) => {
|
Dist::Source(source_dist) => {
|
||||||
// Optimization: Skip source dist download when we must not build them anyway.
|
// Optimization: Skip source dist download when we must not build them anyway.
|
||||||
if self.build_context.no_build() {
|
if self.build_context.no_build() {
|
||||||
|
|
@ -263,6 +266,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
|
||||||
let metadata = self
|
let metadata = self
|
||||||
.builder
|
.builder
|
||||||
.download_and_build_metadata(&source_dist)
|
.download_and_build_metadata(&source_dist)
|
||||||
|
.boxed()
|
||||||
.await?;
|
.await?;
|
||||||
Ok((metadata, precise))
|
Ok((metadata, precise))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use fs_err::tokio as fs;
|
use fs_err::tokio as fs;
|
||||||
use futures::TryStreamExt;
|
use futures::{FutureExt, TryStreamExt};
|
||||||
use reqwest::Response;
|
use reqwest::Response;
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
use tokio_util::compat::FuturesAsyncReadCompatExt;
|
use tokio_util::compat::FuturesAsyncReadCompatExt;
|
||||||
|
|
@ -96,6 +96,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
|
||||||
&cache_shard,
|
&cache_shard,
|
||||||
subdirectory.as_deref(),
|
subdirectory.as_deref(),
|
||||||
)
|
)
|
||||||
|
.boxed()
|
||||||
.await?
|
.await?
|
||||||
}
|
}
|
||||||
SourceDist::Registry(registry_source_dist) => {
|
SourceDist::Registry(registry_source_dist) => {
|
||||||
|
|
@ -134,6 +135,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
|
||||||
&cache_shard,
|
&cache_shard,
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
|
.boxed()
|
||||||
.await?
|
.await?
|
||||||
}
|
}
|
||||||
SourceDist::Git(git_source_dist) => self.git(source_dist, git_source_dist).await?,
|
SourceDist::Git(git_source_dist) => self.git(source_dist, git_source_dist).await?,
|
||||||
|
|
@ -171,6 +173,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
|
||||||
&cache_shard,
|
&cache_shard,
|
||||||
subdirectory.as_deref(),
|
subdirectory.as_deref(),
|
||||||
)
|
)
|
||||||
|
.boxed()
|
||||||
.await?
|
.await?
|
||||||
}
|
}
|
||||||
SourceDist::Registry(registry_source_dist) => {
|
SourceDist::Registry(registry_source_dist) => {
|
||||||
|
|
@ -189,7 +192,10 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
|
||||||
path: path.clone(),
|
path: path.clone(),
|
||||||
editable: false,
|
editable: false,
|
||||||
};
|
};
|
||||||
return self.path_metadata(source_dist, &path_source_dist).await;
|
return self
|
||||||
|
.path_metadata(source_dist, &path_source_dist)
|
||||||
|
.boxed()
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -209,13 +215,18 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
|
||||||
&cache_shard,
|
&cache_shard,
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
|
.boxed()
|
||||||
.await?
|
.await?
|
||||||
}
|
}
|
||||||
SourceDist::Git(git_source_dist) => {
|
SourceDist::Git(git_source_dist) => {
|
||||||
self.git_metadata(source_dist, git_source_dist).await?
|
self.git_metadata(source_dist, git_source_dist)
|
||||||
|
.boxed()
|
||||||
|
.await?
|
||||||
}
|
}
|
||||||
SourceDist::Path(path_source_dist) => {
|
SourceDist::Path(path_source_dist) => {
|
||||||
self.path_metadata(source_dist, path_source_dist).await?
|
self.path_metadata(source_dist, path_source_dist)
|
||||||
|
.boxed()
|
||||||
|
.await?
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -380,6 +391,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
|
||||||
// If the backend supports `prepare_metadata_for_build_wheel`, use it.
|
// If the backend supports `prepare_metadata_for_build_wheel`, use it.
|
||||||
if let Some(metadata) = self
|
if let Some(metadata) = self
|
||||||
.build_source_dist_metadata(source_dist, source_dist_entry.path(), subdirectory)
|
.build_source_dist_metadata(source_dist, source_dist_entry.path(), subdirectory)
|
||||||
|
.boxed()
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
if let Ok(cached) = fs::read(cache_entry.path()).await {
|
if let Ok(cached) = fs::read(cache_entry.path()).await {
|
||||||
|
|
@ -564,6 +576,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
|
||||||
// If the backend supports `prepare_metadata_for_build_wheel`, use it.
|
// If the backend supports `prepare_metadata_for_build_wheel`, use it.
|
||||||
if let Some(metadata) = self
|
if let Some(metadata) = self
|
||||||
.build_source_dist_metadata(source_dist, &path_source_dist.path, None)
|
.build_source_dist_metadata(source_dist, &path_source_dist.path, None)
|
||||||
|
.boxed()
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
// Store the metadata for this build along with all the other builds.
|
// Store the metadata for this build along with all the other builds.
|
||||||
|
|
@ -712,6 +725,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
|
||||||
// If the backend supports `prepare_metadata_for_build_wheel`, use it.
|
// If the backend supports `prepare_metadata_for_build_wheel`, use it.
|
||||||
if let Some(metadata) = self
|
if let Some(metadata) = self
|
||||||
.build_source_dist_metadata(source_dist, fetch.path(), subdirectory.as_deref())
|
.build_source_dist_metadata(source_dist, fetch.path(), subdirectory.as_deref())
|
||||||
|
.boxed()
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
// Store the metadata for this build along with all the other builds.
|
// Store the metadata for this build along with all the other builds.
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@ use std::cmp::Reverse;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
|
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
|
||||||
use tokio::task::JoinError;
|
use tokio::task::JoinError;
|
||||||
use tracing::{instrument, warn};
|
use tracing::{instrument, warn};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
@ -68,7 +68,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
|
||||||
) -> impl Stream<Item = Result<CachedDist, Error>> + 'stream {
|
) -> impl Stream<Item = Result<CachedDist, Error>> + 'stream {
|
||||||
futures::stream::iter(distributions)
|
futures::stream::iter(distributions)
|
||||||
.map(|dist| async {
|
.map(|dist| async {
|
||||||
let wheel = self.get_wheel(dist, in_flight).await?;
|
let wheel = self.get_wheel(dist, in_flight).boxed().await?;
|
||||||
if let Some(reporter) = self.reporter.as_ref() {
|
if let Some(reporter) = self.reporter.as_ref() {
|
||||||
reporter.on_progress(&wheel);
|
reporter.on_progress(&wheel);
|
||||||
}
|
}
|
||||||
|
|
@ -158,6 +158,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
|
||||||
let download: LocalWheel = self
|
let download: LocalWheel = self
|
||||||
.database
|
.database
|
||||||
.get_or_build_wheel(dist.clone())
|
.get_or_build_wheel(dist.clone())
|
||||||
|
.boxed()
|
||||||
.map_err(|err| Error::Fetch(dist.clone(), err))
|
.map_err(|err| Error::Fetch(dist.clone(), err))
|
||||||
.await?;
|
.await?;
|
||||||
let result = Self::unzip_wheel(download).await;
|
let result = Self::unzip_wheel(download).await;
|
||||||
|
|
|
||||||
|
|
@ -683,7 +683,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
|
||||||
/// Fetch the metadata for a stream of packages and versions.
|
/// Fetch the metadata for a stream of packages and versions.
|
||||||
async fn fetch(&self, request_stream: UnboundedReceiver<Request>) -> Result<(), ResolveError> {
|
async fn fetch(&self, request_stream: UnboundedReceiver<Request>) -> Result<(), ResolveError> {
|
||||||
let mut response_stream = request_stream
|
let mut response_stream = request_stream
|
||||||
.map(|request| self.process_request(request))
|
.map(|request| self.process_request(request).boxed())
|
||||||
.buffer_unordered(50);
|
.buffer_unordered(50);
|
||||||
|
|
||||||
while let Some(response) = response_stream.next().await {
|
while let Some(response) = response_stream.next().await {
|
||||||
|
|
@ -738,6 +738,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
|
||||||
let version_map = self
|
let version_map = self
|
||||||
.provider
|
.provider
|
||||||
.get_version_map(&package_name)
|
.get_version_map(&package_name)
|
||||||
|
.boxed()
|
||||||
.await
|
.await
|
||||||
.map_err(ResolveError::Client)?;
|
.map_err(ResolveError::Client)?;
|
||||||
Ok(Some(Response::Package(package_name, version_map)))
|
Ok(Some(Response::Package(package_name, version_map)))
|
||||||
|
|
@ -748,6 +749,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
|
||||||
let (metadata, precise) = self
|
let (metadata, precise) = self
|
||||||
.provider
|
.provider
|
||||||
.get_or_build_wheel_metadata(&dist)
|
.get_or_build_wheel_metadata(&dist)
|
||||||
|
.boxed()
|
||||||
.await
|
.await
|
||||||
.map_err(|err| match dist.clone() {
|
.map_err(|err| match dist.clone() {
|
||||||
Dist::Built(BuiltDist::Path(built_dist)) => {
|
Dist::Built(BuiltDist::Path(built_dist)) => {
|
||||||
|
|
@ -800,6 +802,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
|
||||||
let (metadata, precise) = self
|
let (metadata, precise) = self
|
||||||
.provider
|
.provider
|
||||||
.get_or_build_wheel_metadata(&dist)
|
.get_or_build_wheel_metadata(&dist)
|
||||||
|
.boxed()
|
||||||
.await
|
.await
|
||||||
.map_err(|err| match dist.clone() {
|
.map_err(|err| match dist.clone() {
|
||||||
Dist::Built(BuiltDist::Path(built_dist)) => {
|
Dist::Built(BuiltDist::Path(built_dist)) => {
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,7 @@ use puffin_interpreter::{Interpreter, Virtualenv};
|
||||||
/// them.
|
/// them.
|
||||||
|
|
||||||
// TODO(konstin): Proper error types
|
// TODO(konstin): Proper error types
|
||||||
pub trait BuildContext {
|
pub trait BuildContext: Sync {
|
||||||
type SourceDistBuilder: SourceBuildTrait + Send + Sync;
|
type SourceDistBuilder: SourceBuildTrait + Send + Sync;
|
||||||
|
|
||||||
fn cache(&self) -> &Cache;
|
fn cache(&self) -> &Cache;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue