Separate unzip into its own install phase (#87)

Charlie Marsh 2023-10-11 11:18:23 -04:00 committed by GitHub
parent 85162d1111
commit 906a482499
9 changed files with 226 additions and 100 deletions
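In outline: the Downloader no longer unzips what it fetches; it now hands back in-memory archives (InMemoryDistribution), and a new Unzipper unpacks them into the wheel cache as a separate phase with its own UnzipReporter progress bar. Wheels already unzipped in the local cache skip both phases and go straight to install. Below is a condensed sketch of the resulting flow in the sync command, using the names from this diff; reporters, the empty-list short-circuits, and the temp-dir fallback for a missing cache are elided, and cache_dir stands in for the chosen cache or staging directory, so this is a sketch rather than code lifted verbatim from the hunks.

// Phase 1: fetch only the distributions missing from the local cache.
let downloads: Vec<InMemoryDistribution> = downloader.download(&uncached, cache_dir).await?;
// Phase 2: unpack the fresh downloads into the wheel cache.
let unzips: Vec<LocalDistribution> = unzipper.download(downloads, cache_dir).await?;
// Phase 3: install the fresh wheels together with the previously cached ones.
let wheels: Vec<LocalDistribution> = unzips.into_iter().chain(cached).collect();
installer.install(&wheels)?;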

@@ -47,6 +47,41 @@ impl puffin_resolver::Reporter for ResolverReporter {
}
}
#[derive(Debug)]
pub(crate) struct UnzipReporter {
progress: ProgressBar,
}
impl From<Printer> for UnzipReporter {
fn from(printer: Printer) -> Self {
let progress = ProgressBar::with_draw_target(None, printer.target());
progress.set_message("Unzipping wheels...");
progress.set_style(
ProgressStyle::with_template("{bar:20} [{pos}/{len}] {wide_msg:.dim}").unwrap(),
);
Self { progress }
}
}
impl UnzipReporter {
#[must_use]
pub(crate) fn with_length(self, length: u64) -> Self {
self.progress.set_length(length);
self
}
}
impl puffin_installer::UnzipReporter for UnzipReporter {
fn on_unzip_progress(&self, name: &PackageName, version: &Version) {
self.progress.set_message(format!("{name}=={version}"));
self.progress.inc(1);
}
fn on_unzip_complete(&self) {
self.progress.finish_and_clear();
}
}
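UnzipReporter mirrors the existing download and install reporters: it adapts a progress bar to the new puffin_installer::UnzipReporter trait and advances it once per unpacked wheel. A small standalone sketch of the bar it configures follows; it assumes the ProgressBar and ProgressStyle types here come from the indicatif crate, and the package names are just illustrative.

use indicatif::{ProgressBar, ProgressStyle};

fn main() {
    // Same template as UnzipReporter::from: a 20-character bar, a "[pos/len]"
    // counter, and a dimmed wide message naming the current package.
    let progress = ProgressBar::new(2);
    progress.set_style(
        ProgressStyle::with_template("{bar:20} [{pos}/{len}] {wide_msg:.dim}").unwrap(),
    );
    for (name, version) in [("numpy", "1.25.2"), ("scipy", "1.11.2")] {
        progress.set_message(format!("{name}=={version}"));
        progress.inc(1);
    }
    // Clear the bar once every wheel has been unpacked.
    progress.finish_and_clear();
}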
#[derive(Debug)]
pub(crate) struct DownloadReporter {
progress: ProgressBar,

@@ -16,7 +16,9 @@ use puffin_interpreter::{PythonExecutable, SitePackages};
use puffin_package::package_name::PackageName;
use puffin_package::requirements::Requirements;
use crate::commands::reporters::{DownloadReporter, InstallReporter, ResolverReporter};
use crate::commands::reporters::{
DownloadReporter, InstallReporter, ResolverReporter, UnzipReporter,
};
use crate::commands::{elapsed, ExitStatus};
use crate::printer::Printer;
@@ -161,6 +163,8 @@ pub(crate) async fn sync(
resolution
};
let start = std::time::Instant::now();
let uncached = resolution
.into_files()
.map(RemoteDistribution::from_file)
@@ -168,8 +172,8 @@
let staging = tempfile::tempdir()?;
// Download any missing distributions.
let wheels = if uncached.is_empty() {
cached
let downloads = if uncached.is_empty() {
vec![]
} else {
let downloader = puffin_installer::Downloader::new(&client, cache)
.with_reporter(DownloadReporter::from(printer).with_length(uncached.len() as u64));
@@ -190,10 +194,41 @@
.dimmed()
)?;
downloads.into_iter().chain(cached).collect::<Vec<_>>()
downloads
};
let start = std::time::Instant::now();
// Unzip any downloaded distributions.
let unzips = if downloads.is_empty() {
vec![]
} else {
let unzipper = puffin_installer::Unzipper::default()
.with_reporter(UnzipReporter::from(printer).with_length(downloads.len() as u64));
let unzips = unzipper
.download(downloads, cache.unwrap_or(staging.path()))
.await?;
let s = if unzips.len() == 1 { "" } else { "s" };
writeln!(
printer,
"{}",
format!(
"Unzipped {} in {}",
format!("{} package{}", unzips.len(), s).bold(),
elapsed(start.elapsed())
)
.dimmed()
)?;
unzips
};
let start = std::time::Instant::now();
// Install the resolved distributions.
let wheels = unzips.into_iter().chain(cached).collect::<Vec<_>>();
puffin_installer::Installer::new(&python)
.with_reporter(InstallReporter::from(printer).with_length(wheels.len() as u64))
.install(&wheels)?;

@@ -2,13 +2,10 @@ use std::path::Path;
use anyhow::Result;
use cacache::{Algorithm, Integrity};
use rayon::iter::ParallelBridge;
use rayon::iter::ParallelIterator;
use tokio::task::JoinSet;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::debug;
use url::Url;
use zip::ZipArchive;
use pep440_rs::Version;
use puffin_client::PypiClient;
@@ -16,8 +13,6 @@ use puffin_package::package_name::PackageName;
use crate::cache::WheelCache;
use crate::distribution::RemoteDistribution;
use crate::vendor::CloneableSeekableReader;
use crate::LocalDistribution;
pub struct Downloader<'a> {
client: &'a PypiClient,
@@ -49,7 +44,7 @@ impl<'a> Downloader<'a> {
&'a self,
wheels: &'a [RemoteDistribution],
target: &'a Path,
) -> Result<Vec<LocalDistribution>> {
) -> Result<Vec<InMemoryDistribution>> {
// Create the wheel cache subdirectory, if necessary.
let wheel_cache = WheelCache::new(target);
wheel_cache.init().await?;
@@ -68,57 +63,29 @@ impl<'a> Downloader<'a> {
}
while let Some(result) = fetches.join_next().await.transpose()? {
downloads.push(result?);
}
let mut wheels = Vec::with_capacity(downloads.len());
// Phase 2: Unpack the wheels into the cache.
let staging = tempfile::tempdir()?;
for download in downloads {
let remote = download.remote.clone();
debug!("Unpacking wheel: {}", remote.file().filename);
// Unzip the wheel.
tokio::task::spawn_blocking({
let target = staging.path().join(remote.id());
move || unzip_wheel(download, &target)
})
.await??;
// Write the unzipped wheel to the target directory.
tokio::fs::rename(
staging.path().join(remote.id()),
wheel_cache.entry(&remote.id()),
)
.await?;
wheels.push(LocalDistribution::new(
remote.name().clone(),
remote.version().clone(),
wheel_cache.entry(&remote.id()),
));
let result = result?;
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_download_progress(remote.name(), remote.version());
reporter.on_download_progress(result.remote.name(), result.remote.version());
}
downloads.push(result);
}
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_download_complete();
}
Ok(wheels)
Ok(downloads)
}
}
#[derive(Debug, Clone)]
struct InMemoryDistribution {
pub struct InMemoryDistribution {
/// The remote file from which this wheel was downloaded.
remote: RemoteDistribution,
pub(crate) remote: RemoteDistribution,
/// The contents of the wheel.
buffer: Vec<u8>,
pub(crate) buffer: Vec<u8>,
}
/// Download a wheel to a given path.
@@ -154,55 +121,10 @@ async fn fetch_wheel(
Ok(InMemoryDistribution { remote, buffer })
}
/// Write a wheel into the target directory.
fn unzip_wheel(wheel: InMemoryDistribution, target: &Path) -> Result<()> {
// Read the wheel into a buffer.
let reader = std::io::Cursor::new(wheel.buffer);
let archive = ZipArchive::new(CloneableSeekableReader::new(reader))?;
// Unzip in parallel.
(0..archive.len())
.par_bridge()
.map(|file_number| {
let mut archive = archive.clone();
let mut file = archive.by_index(file_number)?;
// Determine the path of the file within the wheel.
let file_path = match file.enclosed_name() {
Some(path) => path.to_owned(),
None => return Ok(()),
};
// Create necessary parent directories.
let path = target.join(file_path);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
// Write the file.
let mut outfile = std::fs::File::create(&path)?;
std::io::copy(&mut file, &mut outfile)?;
// Set permissions.
#[cfg(unix)]
{
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
if let Some(mode) = file.unix_mode() {
std::fs::set_permissions(&path, Permissions::from_mode(mode))?;
}
}
Ok(())
})
.collect::<Result<_>>()
}
pub trait Reporter: Send + Sync {
/// Callback to invoke when a wheel is downloaded.
fn on_download_progress(&self, name: &PackageName, version: &Version);
/// Callback to invoke when the download is complete.
/// Callback to invoke when the operation is complete.
fn on_download_complete(&self);
}

@@ -3,6 +3,7 @@ pub use downloader::{Downloader, Reporter as DownloadReporter};
pub use index::LocalIndex;
pub use installer::{Installer, Reporter as InstallReporter};
pub use uninstall::uninstall;
pub use unzipper::{Reporter as UnzipReporter, Unzipper};
mod cache;
mod distribution;
@@ -10,4 +11,5 @@ mod downloader;
mod index;
mod installer;
mod uninstall;
mod unzipper;
mod vendor;

@@ -0,0 +1,134 @@
use std::path::Path;
use anyhow::Result;
use rayon::iter::ParallelBridge;
use rayon::iter::ParallelIterator;
use tracing::debug;
use zip::ZipArchive;
use pep440_rs::Version;
use puffin_package::package_name::PackageName;
use crate::cache::WheelCache;
use crate::downloader::InMemoryDistribution;
use crate::vendor::CloneableSeekableReader;
use crate::LocalDistribution;
#[derive(Default)]
pub struct Unzipper {
reporter: Option<Box<dyn Reporter>>,
}
impl Unzipper {
/// Set the [`Reporter`] to use for this unzipper.
#[must_use]
pub fn with_reporter(self, reporter: impl Reporter + 'static) -> Self {
Self {
reporter: Some(Box::new(reporter)),
}
}
/// Install a set of wheels into a Python virtual environment.
pub async fn download(
&self,
downloads: Vec<InMemoryDistribution>,
target: &Path,
) -> Result<Vec<LocalDistribution>> {
// Create the wheel cache subdirectory, if necessary.
let wheel_cache = WheelCache::new(target);
wheel_cache.init().await?;
let staging = tempfile::tempdir()?;
// Unpack the wheels into the cache.
let mut wheels = Vec::with_capacity(downloads.len());
for download in downloads {
let remote = download.remote.clone();
debug!("Unpacking wheel: {}", remote.file().filename);
// Unzip the wheel.
tokio::task::spawn_blocking({
let target = staging.path().join(remote.id());
move || unzip_wheel(download, &target)
})
.await??;
// Write the unzipped wheel to the target directory.
tokio::fs::rename(
staging.path().join(remote.id()),
wheel_cache.entry(&remote.id()),
)
.await?;
wheels.push(LocalDistribution::new(
remote.name().clone(),
remote.version().clone(),
wheel_cache.entry(&remote.id()),
));
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_unzip_progress(remote.name(), remote.version());
}
}
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_unzip_complete();
}
Ok(wheels)
}
}
/// Write a wheel into the target directory.
fn unzip_wheel(wheel: InMemoryDistribution, target: &Path) -> Result<()> {
// Read the wheel into a buffer.
let reader = std::io::Cursor::new(wheel.buffer);
let archive = ZipArchive::new(CloneableSeekableReader::new(reader))?;
// Unzip in parallel.
(0..archive.len())
.par_bridge()
.map(|file_number| {
let mut archive = archive.clone();
let mut file = archive.by_index(file_number)?;
// Determine the path of the file within the wheel.
let file_path = match file.enclosed_name() {
Some(path) => path.to_owned(),
None => return Ok(()),
};
// Create necessary parent directories.
let path = target.join(file_path);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
// Write the file.
let mut outfile = std::fs::File::create(&path)?;
std::io::copy(&mut file, &mut outfile)?;
// Set permissions.
#[cfg(unix)]
{
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
if let Some(mode) = file.unix_mode() {
std::fs::set_permissions(&path, Permissions::from_mode(mode))?;
}
}
Ok(())
})
.collect::<Result<_>>()
}
pub trait Reporter: Send + Sync {
/// Callback to invoke when a wheel is unzipped.
fn on_unzip_progress(&self, name: &PackageName, version: &Version);
/// Callback to invoke when the operation is complete.
fn on_unzip_complete(&self);
}
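unzip_wheel fans the archive entries out across rayon's thread pool: each worker clones the cheap, seekable handle over the in-memory buffer, extracts one entry, and the per-entry Results are folded back into a single Result. A minimal standalone illustration of that par_bridge-and-collect pattern follows; it assumes only the anyhow and rayon crates, and the closure body is a stand-in for extracting one zip entry.

use anyhow::{bail, Result};
use rayon::iter::{ParallelBridge, ParallelIterator};

fn main() -> Result<()> {
    // Bridge a sequential range onto the rayon thread pool, do fallible work
    // per item, and collect the unit Results into one Result (first error wins).
    let outcome: Result<()> = (0..16u32)
        .par_bridge()
        .map(|entry| {
            // Stand-in for "unpack entry N of the wheel archive".
            if entry == 13 {
                bail!("failed to unpack entry {entry}");
            }
            Ok(())
        })
        .collect();

    assert!(outcome.is_err());
    Ok(())
}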

@@ -1 +0,0 @@
mypy

@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::BTreeMap;
use pep440_rs::Version;
use puffin_client::File;
@@ -6,11 +6,11 @@ use puffin_package::metadata::Metadata21;
use puffin_package::package_name::PackageName;
#[derive(Debug, Default)]
pub struct Resolution(HashMap<PackageName, PinnedPackage>);
pub struct Resolution(BTreeMap<PackageName, PinnedPackage>);
impl Resolution {
/// Create a new resolution from the given pinned packages.
pub(crate) fn new(packages: HashMap<PackageName, PinnedPackage>) -> Self {
pub(crate) fn new(packages: BTreeMap<PackageName, PinnedPackage>) -> Self {
Self(packages)
}
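Switching Resolution here (and the resolver's working map in the next file) from HashMap to BTreeMap makes iteration deterministic: packages come back sorted by name rather than in hash order, which is why the scipy test expectation at the end of this commit flips to list numpy first. A standalone illustration, with names and versions taken from that test:

use std::collections::BTreeMap;

fn main() {
    // Insertion order does not matter: a BTreeMap iterates in key order.
    let mut resolution = BTreeMap::new();
    resolution.insert("scipy", "1.11.2");
    resolution.insert("numpy", "1.25.2");

    let rendered = resolution
        .iter()
        .map(|(name, version)| format!("{name}=={version}"))
        .collect::<Vec<_>>()
        .join("\n");

    // Deterministic, name-sorted output, matching the updated test expectation.
    assert_eq!(rendered, "numpy==1.25.2\nscipy==1.11.2");
}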

@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashSet};
use std::str::FromStr;
use anyhow::Result;
@@ -87,8 +87,7 @@ impl<'a> Resolver<'a> {
}
// Resolve the requirements.
let mut resolution: HashMap<PackageName, PinnedPackage> =
HashMap::with_capacity(in_flight.len());
let mut resolution: BTreeMap<PackageName, PinnedPackage> = BTreeMap::new();
while let Some(chunk) = package_stream.next().await {
for result in chunk {

@@ -99,7 +99,7 @@ async fn scipy() -> Result<()> {
)
.await?;
assert_eq!(format!("{resolution}"), "scipy==1.11.2\nnumpy==1.25.2");
assert_eq!(format!("{resolution}"), "numpy==1.25.2\nscipy==1.11.2");
Ok(())
}