Stream unpacking of source distribution downloads (#1157)

This PR migrates our source distribution downloads to unpack the archive as
it streams in, rather than first downloading it to a temporary file and then
extracting it, similar to our approach for wheels.

In my testing, this showed a consistent speedup (e.g., 6% here for a few
representative source distributions):

```text
❯ python -m scripts.bench --puffin-path ./target/release/main --puffin-path ./target/release/puffin --benchmark install-cold requirements.in
Benchmark 1: ./target/release/main (install-cold)
  Time (mean ± σ):      1.503 s ±  0.039 s    [User: 1.479 s, System: 0.537 s]
  Range (min … max):    1.466 s …  1.605 s    10 runs

Benchmark 2: ./target/release/puffin (install-cold)
  Time (mean ± σ):      1.421 s ±  0.024 s    [User: 1.505 s, System: 0.593 s]
  Range (min … max):    1.381 s …  1.454 s    10 runs

Summary
  './target/release/puffin (install-cold)' ran
    1.06 ± 0.03 times faster than './target/release/main (install-cold)'
```
This commit is contained in:
Charlie Marsh 2024-01-28 17:09:24 -08:00 committed by GitHub
parent 5219d37250
commit d88ce76979
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 750 additions and 294 deletions

View file

@ -8,7 +8,6 @@ use anyhow::Result;
use fs_err::tokio as fs;
use futures::{FutureExt, TryStreamExt};
use reqwest::Response;
use tempfile::TempDir;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{debug, info_span, instrument, Instrument};
use url::Url;
@ -750,60 +749,30 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
return Ok(cache_path);
}
// Download the source distribution to a temporary file.
// TODO(charlie): Unzip as we download, as with wheels.
// Download and unzip the source distribution into a temporary directory.
let span =
info_span!("download_source_dist", filename = filename, source_dist = %source_dist);
let download_dir = self.download_source_dist_url(response, filename).await?;
drop(span);
// Unzip the source distribution to a temporary directory.
let span =
info_span!("extract_source_dist", filename = filename, source_dist = %source_dist);
let source_dist_dir = puffin_extract::extract_source(
download_dir.path().join(filename),
download_dir.path().join("extracted"),
)?;
drop(span);
// Persist the unzipped distribution to the cache.
fs_err::tokio::create_dir_all(cache_path.parent().expect("Cache entry to have parent"))
.await
.map_err(Error::CacheWrite)?;
fs_err::tokio::rename(&source_dist_dir, &cache_path)
.await
.map_err(Error::CacheWrite)?;
Ok(cache_path)
}
/// Download a source distribution from a URL to a temporary file.
async fn download_source_dist_url(
&self,
response: Response,
source_dist_filename: &str,
) -> Result<TempDir, puffin_client::Error> {
let temp_dir =
tempfile::tempdir_in(self.build_context.cache().root()).map_err(Error::CacheWrite)?;
let reader = response
.bytes_stream()
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
.into_async_read();
let mut reader = tokio::io::BufReader::new(reader.compat());
puffin_extract::stream::archive(reader.compat(), filename, temp_dir.path()).await?;
drop(span);
// Create a temporary directory.
let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
.map_err(puffin_client::ErrorKind::CacheWrite)?;
// Extract the top-level directory.
let extracted = puffin_extract::strip_component(temp_dir.path())?;
// Download the source distribution to a temporary file.
let mut writer = tokio::io::BufWriter::new(
fs_err::tokio::File::create(temp_dir.path().join(source_dist_filename))
.await
.map_err(puffin_client::ErrorKind::CacheWrite)?,
);
tokio::io::copy(&mut reader, &mut writer)
// Persist it to the cache.
fs_err::tokio::create_dir_all(cache_path.parent().expect("Cache entry to have parent"))
.await
.map_err(puffin_client::ErrorKind::CacheWrite)?;
.map_err(Error::CacheWrite)?;
fs_err::tokio::rename(extracted, &cache_path)
.await
.map_err(Error::CacheWrite)?;
Ok(temp_dir)
Ok(cache_path)
}
/// Download a source distribution from a Git repository.