From dc2c289dfff2e7cae6b4417c1d9661adf488c2ce Mon Sep 17 00:00:00 2001 From: Charlie Marsh Date: Wed, 3 Apr 2024 21:31:40 -0400 Subject: [PATCH] Upgrade `rs-async-zip` to support data descriptors (#2809) ## Summary Upgrading `rs-async-zip` enables us to support data descriptors in streaming. This both greatly improves performance for indexes that use data descriptors _and_ ensures that we support them in a few other places (e.g., zipped source distributions created in Finder). Closes #2808. --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- crates/uv-client/src/registry_client.rs | 6 ++++-- crates/uv-client/src/remote_metadata.rs | 6 ++++-- crates/uv-distribution/src/distribution_database.rs | 3 +-- crates/uv-distribution/src/source/mod.rs | 3 +-- crates/uv-extract/src/seek.rs | 4 ++-- crates/uv-extract/src/stream.rs | 10 ++++------ 8 files changed, 19 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0395e2662..7a4676e40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -246,8 +246,8 @@ dependencies = [ [[package]] name = "async_zip" -version = "0.0.16" -source = "git+https://github.com/charliermarsh/rs-async-zip?rev=d76801da0943de985254fc6255c0e476b57c5836#d76801da0943de985254fc6255c0e476b57c5836" +version = "0.0.17" +source = "git+https://github.com/charliermarsh/rs-async-zip?rev=1dcb40cfe1bf5325a6fd4bfcf9894db40241f585#1dcb40cfe1bf5325a6fd4bfcf9894db40241f585" dependencies = [ "async-compression", "crc32fast", diff --git a/Cargo.toml b/Cargo.toml index 1b05003d5..3621f00fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,7 +55,7 @@ async-channel = { version = "2.2.0" } async-compression = { version = "0.4.6" } async-trait = { version = "0.1.78" } async_http_range_reader = { version = "0.7.0" } -async_zip = { git = "https://github.com/charliermarsh/rs-async-zip", rev = "d76801da0943de985254fc6255c0e476b57c5836", features = ["deflate"] } +async_zip = { git = "https://github.com/charliermarsh/rs-async-zip", rev = "1dcb40cfe1bf5325a6fd4bfcf9894db40241f585", features = ["deflate"] } axoupdater = { version = "0.3.1", default-features = false } backoff = { version = "0.4.0" } base64 = { version = "0.21.7" } diff --git a/crates/uv-client/src/registry_client.rs b/crates/uv-client/src/registry_client.rs index ec9bbf572..b5f51d76e 100644 --- a/crates/uv-client/src/registry_client.rs +++ b/crates/uv-client/src/registry_client.rs @@ -9,7 +9,7 @@ use http::HeaderMap; use reqwest::{Client, Response, StatusCode}; use serde::{Deserialize, Serialize}; use tokio::io::AsyncReadExt; -use tokio_util::compat::FuturesAsyncReadCompatExt; +use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt}; use tracing::{info_span, instrument, trace, warn, Instrument}; use url::Url; @@ -618,7 +618,8 @@ async fn read_metadata_async_seek( debug_source: String, reader: impl tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin, ) -> Result { - let mut zip_reader = async_zip::tokio::read::seek::ZipFileReader::with_tokio(reader) + let reader = futures::io::BufReader::new(reader.compat()); + let mut zip_reader = async_zip::base::read::seek::ZipFileReader::new(reader) .await .map_err(|err| ErrorKind::Zip(filename.clone(), err))?; @@ -655,6 +656,7 @@ async fn read_metadata_async_stream( debug_source: String, reader: R, ) -> Result { + let reader = futures::io::BufReader::with_capacity(128 * 1024, reader); let mut zip = async_zip::base::read::stream::ZipFileReader::new(reader); while let Some(mut entry) = zip diff --git a/crates/uv-client/src/remote_metadata.rs b/crates/uv-client/src/remote_metadata.rs index 548968e2f..11b156c70 100644 --- a/crates/uv-client/src/remote_metadata.rs +++ b/crates/uv-client/src/remote_metadata.rs @@ -1,5 +1,5 @@ use async_http_range_reader::AsyncHttpRangeReader; -use async_zip::tokio::read::seek::ZipFileReader; +use futures::io::BufReader; use tokio_util::compat::TokioAsyncReadCompatExt; use distribution_filename::WheelFilename; @@ -61,7 +61,8 @@ pub(crate) async fn wheel_metadata_from_remote_zip( .await; // Construct a zip reader to uses the stream. - let mut reader = ZipFileReader::new(reader.compat()) + let buf = BufReader::new(reader.compat()); + let mut reader = async_zip::base::read::seek::ZipFileReader::new(buf) .await .map_err(|err| ErrorKind::Zip(filename.clone(), err))?; @@ -90,6 +91,7 @@ pub(crate) async fn wheel_metadata_from_remote_zip( reader .inner_mut() .get_mut() + .get_mut() .prefetch(offset..offset + size) .await; diff --git a/crates/uv-distribution/src/distribution_database.rs b/crates/uv-distribution/src/distribution_database.rs index e4b1cec23..dbe968067 100644 --- a/crates/uv-distribution/src/distribution_database.rs +++ b/crates/uv-distribution/src/distribution_database.rs @@ -481,8 +481,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> file.seek(io::SeekFrom::Start(0)) .await .map_err(Error::CacheWrite)?; - let reader = tokio::io::BufReader::new(file); - uv_extract::seek::unzip(reader, temp_dir.path()).await?; + uv_extract::seek::unzip(file, temp_dir.path()).await?; // Persist the temporary directory to the directory store. let archive = self diff --git a/crates/uv-distribution/src/source/mod.rs b/crates/uv-distribution/src/source/mod.rs index 2ff2873e9..4be6443d4 100644 --- a/crates/uv-distribution/src/source/mod.rs +++ b/crates/uv-distribution/src/source/mod.rs @@ -1293,8 +1293,7 @@ async fn extract_archive(path: &Path, cache: &Cache) -> Result( target: impl AsRef, ) -> Result<(), Error> { let target = target.as_ref(); - let mut reader = reader.compat(); + let mut reader = futures::io::BufReader::new(reader.compat()); let mut zip = async_zip::base::read::seek::ZipFileReader::new(&mut reader).await?; let mut directories = FxHashSet::default(); @@ -81,7 +81,7 @@ pub async fn unzip( } /// Unzip a `.zip` or `.tar.gz` archive into the target directory, requiring `Seek`. -pub async fn archive( +pub async fn archive( reader: R, source: impl AsRef, target: impl AsRef, diff --git a/crates/uv-extract/src/stream.rs b/crates/uv-extract/src/stream.rs index 3754064a7..7fb4707fc 100644 --- a/crates/uv-extract/src/stream.rs +++ b/crates/uv-extract/src/stream.rs @@ -66,10 +66,7 @@ pub async fn unzip( use std::fs::Permissions; use std::os::unix::fs::PermissionsExt; - // To avoid lots of small reads to `reader` when parsing the central directory, wrap it in - // a buffer. - let mut buf = futures::io::BufReader::new(reader); - let mut directory = async_zip::base::read::cd::CentralDirectoryReader::new(&mut buf); + let mut directory = async_zip::base::read::cd::CentralDirectoryReader::new(&mut reader); while let Some(entry) = directory.next().await? { if entry.dir()? { continue; @@ -154,10 +151,11 @@ async fn untar_in>( /// Unzip a `.tar.gz` archive into the target directory, without requiring `Seek`. /// /// This is useful for unpacking files as they're being downloaded. -pub async fn untar( +pub async fn untar( reader: R, target: impl AsRef, ) -> Result<(), Error> { + let reader = tokio::io::BufReader::new(reader); let decompressed_bytes = async_compression::tokio::bufread::GzipDecoder::new(reader); let mut archive = tokio_tar::ArchiveBuilder::new(decompressed_bytes) .set_preserve_mtime(false) @@ -166,7 +164,7 @@ pub async fn untar( } /// Unzip a `.zip` or `.tar.gz` archive into the target directory, without requiring `Seek`. -pub async fn archive( +pub async fn archive( reader: R, source: impl AsRef, target: impl AsRef,