From 6d672b895165c0db838dabdadc77c58ee631e514 Mon Sep 17 00:00:00 2001 From: Charlie Marsh Date: Mon, 6 Nov 2023 05:22:36 -0800 Subject: [PATCH] Add source distribution support to `pip-compile` (#323) ## Summary This is a first-pass at adding source distribution support to the installer. The previous installation flow was: 1. Come up with a plan. 1. Find a distribution (specific file) for every package that we'll need to download. 1. Download those distributions. 1. Unzip them (since we assumed they were all wheels). 1. Install them into the virtual environment. Now, Step (3) downloads both wheels and source distributions, and we insert a step between Steps (3) and (4) to build any source distributions into zipped wheels. There are a bunch of TODOs, the most important (IMO) is that we basically have two implementations of downloading and building, between the stuff in `puffin_installer` and `puffin_resolver` (namely in `crates/puffin-resolver/src/distribution`). I didn't attempt to clean that up here -- it's already a problem, and it's related to the overall problem we need to solve around unified caching and resource management. Closes #243. --- Cargo.lock | 4 + crates/puffin-cli/src/commands/pip_sync.rs | 52 +++- crates/puffin-cli/src/commands/reporters.rs | 35 +++ crates/puffin-cli/tests/pip_sync.rs | 137 +++++++++ .../snapshots/pip_sync__install_git.snap | 24 ++ .../pip_sync__install_git_subdirectories.snap | 25 ++ .../snapshots/pip_sync__install_sdist.snap | 24 ++ crates/puffin-dispatch/src/lib.rs | 31 +- crates/puffin-distribution/src/lib.rs | 47 +++ crates/puffin-installer/Cargo.toml | 4 + crates/puffin-installer/src/builder.rs | 109 +++++++ crates/puffin-installer/src/downloader.rs | 272 ++++++++++++++---- crates/puffin-installer/src/lib.rs | 5 +- crates/puffin-installer/src/locks.rs | 19 ++ crates/puffin-installer/src/plan.rs | 3 + crates/puffin-installer/src/unzipper.rs | 27 +- crates/puffin-installer/src/vendor/mod.rs | 2 +- .../src/distribution/source_distribution.rs | 4 + 18 files changed, 745 insertions(+), 79 deletions(-) create mode 100644 crates/puffin-cli/tests/snapshots/pip_sync__install_git.snap create mode 100644 crates/puffin-cli/tests/snapshots/pip_sync__install_git_subdirectories.snap create mode 100644 crates/puffin-cli/tests/snapshots/pip_sync__install_sdist.snap create mode 100644 crates/puffin-installer/src/builder.rs create mode 100644 crates/puffin-installer/src/locks.rs diff --git a/Cargo.lock b/Cargo.lock index 56d7ca1b8..f230688cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2278,13 +2278,17 @@ dependencies = [ "install-wheel-rs", "pep440_rs 0.3.12", "pep508_rs", + "puffin-cache", "puffin-client", "puffin-distribution", + "puffin-git", "puffin-interpreter", "puffin-normalize", "puffin-package", + "puffin-traits", "rayon", "tempfile", + "thiserror", "tokio", "tokio-util", "tracing", diff --git a/crates/puffin-cli/src/commands/pip_sync.rs b/crates/puffin-cli/src/commands/pip_sync.rs index 7f89fd8d5..ddd3562b0 100644 --- a/crates/puffin-cli/src/commands/pip_sync.rs +++ b/crates/puffin-cli/src/commands/pip_sync.rs @@ -3,20 +3,22 @@ use std::path::Path; use anyhow::{Context, Result}; use colored::Colorize; -use itertools::Itertools; +use itertools::{Either, Itertools}; use tracing::debug; +use fs_err as fs; use install_wheel_rs::linker::LinkMode; use pep508_rs::Requirement; use platform_host::Platform; use platform_tags::Tags; use puffin_client::RegistryClientBuilder; +use puffin_dispatch::BuildDispatch; use puffin_distribution::Distribution; -use puffin_installer::PartitionedRequirements; +use puffin_installer::{Builder, PartitionedRequirements}; use puffin_interpreter::Virtualenv; use crate::commands::reporters::{ - DownloadReporter, FinderReporter, InstallReporter, UnzipReporter, + BuildReporter, DownloadReporter, FinderReporter, InstallReporter, UnzipReporter, }; use crate::commands::{elapsed, ExitStatus}; use crate::index_urls::IndexUrls; @@ -55,7 +57,6 @@ pub(crate) async fn sync_requirements( cache: &Path, mut printer: Printer, ) -> Result { - // Audit the requirements. let start = std::time::Instant::now(); // Detect the current Python interpreter. @@ -163,6 +164,49 @@ pub(crate) async fn sync_requirements( downloads }; + let (wheels, sdists): (Vec<_>, Vec<_>) = + downloads + .into_iter() + .partition_map(|download| match download { + puffin_installer::Download::Wheel(wheel) => Either::Left(wheel), + puffin_installer::Download::SourceDistribution(sdist) => Either::Right(sdist), + }); + + // Build any missing source distributions. + let sdists = if sdists.is_empty() { + vec![] + } else { + let start = std::time::Instant::now(); + + let build_dispatch = BuildDispatch::new( + RegistryClientBuilder::default().build(), + cache.to_path_buf(), + venv.interpreter_info().clone(), + fs::canonicalize(venv.python_executable())?, + ); + + let builder = Builder::new(&build_dispatch) + .with_reporter(BuildReporter::from(printer).with_length(sdists.len() as u64)); + + let wheels = builder.build(sdists).await?; + + let s = if wheels.len() == 1 { "" } else { "s" }; + writeln!( + printer, + "{}", + format!( + "Built {} in {}", + format!("{} package{}", wheels.len(), s).bold(), + elapsed(start.elapsed()) + ) + .dimmed() + )?; + + wheels + }; + + let downloads = wheels.into_iter().chain(sdists).collect::>(); + // Unzip any downloaded distributions. let unzips = if downloads.is_empty() { vec![] diff --git a/crates/puffin-cli/src/commands/reporters.rs b/crates/puffin-cli/src/commands/reporters.rs index ef7e0381f..54235770d 100644 --- a/crates/puffin-cli/src/commands/reporters.rs +++ b/crates/puffin-cli/src/commands/reporters.rs @@ -148,6 +148,41 @@ impl puffin_installer::InstallReporter for InstallReporter { } } +#[derive(Debug)] +pub(crate) struct BuildReporter { + progress: ProgressBar, +} + +impl From for BuildReporter { + fn from(printer: Printer) -> Self { + let progress = ProgressBar::with_draw_target(None, printer.target()); + progress.set_style( + ProgressStyle::with_template("{bar:20} [{pos}/{len}] {wide_msg:.dim}").unwrap(), + ); + progress.set_message("Building wheels..."); + Self { progress } + } +} + +impl BuildReporter { + #[must_use] + pub(crate) fn with_length(self, length: u64) -> Self { + self.progress.set_length(length); + self + } +} + +impl puffin_installer::BuildReporter for BuildReporter { + fn on_progress(&self, wheel: &RemoteDistribution) { + self.progress.set_message(format!("{wheel}")); + self.progress.inc(1); + } + + fn on_complete(&self) { + self.progress.finish_and_clear(); + } +} + #[derive(Debug)] pub(crate) struct ResolverReporter { progress: ProgressBar, diff --git a/crates/puffin-cli/tests/pip_sync.rs b/crates/puffin-cli/tests/pip_sync.rs index ae93aa557..43783c4d6 100644 --- a/crates/puffin-cli/tests/pip_sync.rs +++ b/crates/puffin-cli/tests/pip_sync.rs @@ -569,3 +569,140 @@ fn install_url() -> Result<()> { Ok(()) } + +/// Install a package into a virtual environment from a Git repository. +#[test] +#[cfg(feature = "git")] +fn install_git() -> Result<()> { + let temp_dir = assert_fs::TempDir::new()?; + let cache_dir = assert_fs::TempDir::new()?; + let venv = temp_dir.child(".venv"); + + Command::new(get_cargo_bin(BIN_NAME)) + .arg("venv") + .arg(venv.as_os_str()) + .arg("--cache-dir") + .arg(cache_dir.path()) + .current_dir(&temp_dir) + .assert() + .success(); + venv.assert(predicates::path::is_dir()); + + let requirements_txt = temp_dir.child("requirements.txt"); + requirements_txt.touch()?; + requirements_txt.write_str("werkzeug @ git+https://github.com/pallets/werkzeug.git@2.0.0")?; + + insta::with_settings!({ + filters => vec![ + (r"(\d|\.)+(ms|s)", "[TIME]"), + ] + }, { + assert_cmd_snapshot!(Command::new(get_cargo_bin(BIN_NAME)) + .arg("pip-sync") + .arg("requirements.txt") + .arg("--cache-dir") + .arg(cache_dir.path()) + .env("VIRTUAL_ENV", venv.as_os_str()) + .current_dir(&temp_dir)); + }); + + Command::new(venv.join("bin").join("python")) + .arg("-c") + .arg("import werkzeug") + .current_dir(&temp_dir) + .assert() + .success(); + + Ok(()) +} + +/// Install two packages from the same Git repository. +#[test] +#[cfg(feature = "git")] +fn install_git_subdirectories() -> Result<()> { + let temp_dir = assert_fs::TempDir::new()?; + let cache_dir = assert_fs::TempDir::new()?; + let venv = temp_dir.child(".venv"); + + Command::new(get_cargo_bin(BIN_NAME)) + .arg("venv") + .arg(venv.as_os_str()) + .arg("--cache-dir") + .arg(cache_dir.path()) + .current_dir(&temp_dir) + .assert() + .success(); + venv.assert(predicates::path::is_dir()); + + let requirements_txt = temp_dir.child("requirements.txt"); + requirements_txt.touch()?; + requirements_txt.write_str("example-pkg-a @ git+https://github.com/pypa/sample-namespace-packages.git@df7530eeb8fa0cb7dbb8ecb28363e8e36bfa2f45#subdirectory=pkg_resources/pkg_a\nexample-pkg-b @ git+https://github.com/pypa/sample-namespace-packages.git@df7530eeb8fa0cb7dbb8ecb28363e8e36bfa2f45#subdirectory=pkg_resources/pkg_b")?; + + insta::with_settings!({ + filters => vec![ + (r"(\d|\.)+(ms|s)", "[TIME]"), + ] + }, { + assert_cmd_snapshot!(Command::new(get_cargo_bin(BIN_NAME)) + .arg("pip-sync") + .arg("requirements.txt") + .arg("--cache-dir") + .arg(cache_dir.path()) + .env("VIRTUAL_ENV", venv.as_os_str()) + .current_dir(&temp_dir)); + }); + + Command::new(venv.join("bin").join("python")) + .arg("-c") + .arg("import example_pkg") + .current_dir(&temp_dir) + .assert() + .success(); + + Ok(()) +} + +/// Install a source distribution into a virtual environment. +#[test] +fn install_sdist() -> Result<()> { + let temp_dir = assert_fs::TempDir::new()?; + let cache_dir = assert_fs::TempDir::new()?; + let venv = temp_dir.child(".venv"); + + Command::new(get_cargo_bin(BIN_NAME)) + .arg("venv") + .arg(venv.as_os_str()) + .arg("--cache-dir") + .arg(cache_dir.path()) + .current_dir(&temp_dir) + .assert() + .success(); + venv.assert(predicates::path::is_dir()); + + let requirements_txt = temp_dir.child("requirements.txt"); + requirements_txt.touch()?; + requirements_txt.write_str("Werkzeug==0.9.6")?; + + insta::with_settings!({ + filters => vec![ + (r"(\d|\.)+(ms|s)", "[TIME]"), + ] + }, { + assert_cmd_snapshot!(Command::new(get_cargo_bin(BIN_NAME)) + .arg("pip-sync") + .arg("requirements.txt") + .arg("--cache-dir") + .arg(cache_dir.path()) + .env("VIRTUAL_ENV", venv.as_os_str()) + .current_dir(&temp_dir)); + }); + + Command::new(venv.join("bin").join("python")) + .arg("-c") + .arg("import werkzeug") + .current_dir(&temp_dir) + .assert() + .success(); + + Ok(()) +} diff --git a/crates/puffin-cli/tests/snapshots/pip_sync__install_git.snap b/crates/puffin-cli/tests/snapshots/pip_sync__install_git.snap new file mode 100644 index 000000000..870aec6d4 --- /dev/null +++ b/crates/puffin-cli/tests/snapshots/pip_sync__install_git.snap @@ -0,0 +1,24 @@ +--- +source: crates/puffin-cli/tests/pip_sync.rs +info: + program: puffin + args: + - pip-sync + - requirements.txt + - "--cache-dir" + - /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpZNXBuT + env: + VIRTUAL_ENV: /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpBYANH7/.venv +--- +success: true +exit_code: 0 +----- stdout ----- + +----- stderr ----- +Resolved 1 package in [TIME] +Downloaded 1 package in [TIME] +Built 1 package in [TIME] +Unzipped 1 package in [TIME] +Installed 1 package in [TIME] + + werkzeug @ git+https://github.com/pallets/werkzeug.git@2.0.0 + diff --git a/crates/puffin-cli/tests/snapshots/pip_sync__install_git_subdirectories.snap b/crates/puffin-cli/tests/snapshots/pip_sync__install_git_subdirectories.snap new file mode 100644 index 000000000..a1a42bd22 --- /dev/null +++ b/crates/puffin-cli/tests/snapshots/pip_sync__install_git_subdirectories.snap @@ -0,0 +1,25 @@ +--- +source: crates/puffin-cli/tests/pip_sync.rs +info: + program: puffin + args: + - pip-sync + - requirements.txt + - "--cache-dir" + - /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpJvtAcl + env: + VIRTUAL_ENV: /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpYIBn2G/.venv +--- +success: true +exit_code: 0 +----- stdout ----- + +----- stderr ----- +Resolved 2 packages in [TIME] +Downloaded 2 packages in [TIME] +Built 2 packages in [TIME] +Unzipped 2 packages in [TIME] +Installed 2 packages in [TIME] + + example-pkg-a @ git+https://github.com/pypa/sample-namespace-packages.git@df7530eeb8fa0cb7dbb8ecb28363e8e36bfa2f45#subdirectory=pkg_resources/pkg_a + + example-pkg-b @ git+https://github.com/pypa/sample-namespace-packages.git@df7530eeb8fa0cb7dbb8ecb28363e8e36bfa2f45#subdirectory=pkg_resources/pkg_b + diff --git a/crates/puffin-cli/tests/snapshots/pip_sync__install_sdist.snap b/crates/puffin-cli/tests/snapshots/pip_sync__install_sdist.snap new file mode 100644 index 000000000..bdeb70436 --- /dev/null +++ b/crates/puffin-cli/tests/snapshots/pip_sync__install_sdist.snap @@ -0,0 +1,24 @@ +--- +source: crates/puffin-cli/tests/pip_sync.rs +info: + program: puffin + args: + - pip-sync + - requirements.txt + - "--cache-dir" + - /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpTtWuay + env: + VIRTUAL_ENV: /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpdSdhVI/.venv +--- +success: true +exit_code: 0 +----- stdout ----- + +----- stderr ----- +Resolved 1 package in [TIME] +Downloaded 1 package in [TIME] +Built 1 package in [TIME] +Unzipped 1 package in [TIME] +Installed 1 package in [TIME] + + werkzeug==0.9.6 + diff --git a/crates/puffin-dispatch/src/lib.rs b/crates/puffin-dispatch/src/lib.rs index 5ab815f0e..7db635105 100644 --- a/crates/puffin-dispatch/src/lib.rs +++ b/crates/puffin-dispatch/src/lib.rs @@ -8,14 +8,14 @@ use std::pin::Pin; use anyhow::Context; use anyhow::Result; -use itertools::Itertools; +use itertools::{Either, Itertools}; use tracing::{debug, instrument}; use pep508_rs::Requirement; use platform_tags::Tags; use puffin_build::SourceDistributionBuilder; use puffin_client::RegistryClient; -use puffin_installer::{Downloader, Installer, PartitionedRequirements, Unzipper}; +use puffin_installer::{Builder, Downloader, Installer, PartitionedRequirements, Unzipper}; use puffin_interpreter::{InterpreterInfo, Virtualenv}; use puffin_resolver::{DistributionFinder, Manifest, PreReleaseMode, ResolutionMode, Resolver}; use puffin_traits::BuildContext; @@ -144,6 +144,33 @@ impl BuildContext for BuildDispatch { .context("Failed to download build dependencies")? }; + let (wheels, sdists): (Vec<_>, Vec<_>) = + downloads + .into_iter() + .partition_map(|download| match download { + puffin_installer::Download::Wheel(wheel) => Either::Left(wheel), + puffin_installer::Download::SourceDistribution(sdist) => { + Either::Right(sdist) + } + }); + + // Build any missing source distributions. + let sdists = if sdists.is_empty() { + vec![] + } else { + debug!( + "Building source distributions{}: {}", + if sdists.len() == 1 { "" } else { "s" }, + sdists.iter().map(ToString::to_string).join(", ") + ); + Builder::new(self) + .build(sdists) + .await + .context("Failed to build source distributions")? + }; + + let downloads = wheels.into_iter().chain(sdists).collect::>(); + // Unzip any downloaded distributions. let unzips = if downloads.is_empty() { vec![] diff --git a/crates/puffin-distribution/src/lib.rs b/crates/puffin-distribution/src/lib.rs index b348d6ddf..7337f8391 100644 --- a/crates/puffin-distribution/src/lib.rs +++ b/crates/puffin-distribution/src/lib.rs @@ -97,6 +97,31 @@ impl RemoteDistribution { Self::Url(name, url) } + /// Return the URL of the distribution. + pub fn url(&self) -> Result> { + match self { + Self::Registry(_, _, file) => { + let url = Url::parse(&file.url)?; + Ok(Cow::Owned(url)) + } + Self::Url(_, url) => Ok(Cow::Borrowed(url)), + } + } + + /// Return the filename of the distribution. + pub fn filename(&self) -> Result> { + match self { + Self::Registry(_, _, file) => Ok(Cow::Borrowed(&file.filename)), + Self::Url(_, url) => { + let filename = url + .path_segments() + .and_then(std::iter::Iterator::last) + .ok_or_else(|| anyhow!("Could not parse filename from URL: {}", url))?; + Ok(Cow::Owned(filename.to_owned())) + } + } + } + /// Return the normalized [`PackageName`] of the distribution. pub fn name(&self) -> &PackageName { match self { @@ -125,6 +150,17 @@ impl RemoteDistribution { Self::Url(_name, url) => puffin_cache::digest(&CanonicalUrl::new(url)), } } + + /// Returns `true` if this distribution is a wheel. + pub fn is_wheel(&self) -> bool { + let filename = match self { + Self::Registry(_name, _version, file) => &file.filename, + Self::Url(_name, url) => url.path(), + }; + Path::new(filename) + .extension() + .is_some_and(|ext| ext.eq_ignore_ascii_case("whl")) + } } impl std::fmt::Display for RemoteDistribution { @@ -379,3 +415,14 @@ impl std::fmt::Display for RemoteDistributionRef<'_> { } } } + +impl<'a> From<&'a RemoteDistribution> for RemoteDistributionRef<'a> { + fn from(dist: &'a RemoteDistribution) -> Self { + match dist { + RemoteDistribution::Registry(name, version, file) => { + Self::Registry(name, version, file) + } + RemoteDistribution::Url(name, url) => Self::Url(name, url), + } + } +} diff --git a/crates/puffin-installer/Cargo.toml b/crates/puffin-installer/Cargo.toml index 2bb983a68..3005cd889 100644 --- a/crates/puffin-installer/Cargo.toml +++ b/crates/puffin-installer/Cargo.toml @@ -13,11 +13,14 @@ license = { workspace = true } install-wheel-rs = { path = "../install-wheel-rs", default-features = false } pep440_rs = { path = "../pep440-rs" } pep508_rs = { path = "../pep508-rs" } +puffin-cache = { path = "../puffin-cache" } puffin-client = { path = "../puffin-client" } puffin-distribution = { path = "../puffin-distribution" } +puffin-git = { path = "../puffin-git" } puffin-interpreter = { path = "../puffin-interpreter" } puffin-normalize = { path = "../puffin-normalize" } puffin-package = { path = "../puffin-package" } +puffin-traits = { path = "../puffin-traits" } distribution-filename = { path = "../distribution-filename" } anyhow = { workspace = true } @@ -26,6 +29,7 @@ fs-err = { workspace = true } fxhash = { workspace = true } rayon = { workspace = true } tempfile = { workspace = true } +thiserror = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } tracing = { workspace = true } diff --git a/crates/puffin-installer/src/builder.rs b/crates/puffin-installer/src/builder.rs new file mode 100644 index 000000000..b982f4e23 --- /dev/null +++ b/crates/puffin-installer/src/builder.rs @@ -0,0 +1,109 @@ +//! Build source distributions from downloaded archives. +//! +//! TODO(charlie): Unify with `crates/puffin-resolver/src/distribution/source_distribution.rs`. + +use std::cmp::Reverse; + +use anyhow::Result; +use fs_err::tokio as fs; +use tracing::debug; + +use puffin_distribution::RemoteDistribution; +use puffin_traits::BuildContext; + +use crate::downloader::{DiskWheel, SourceDistribution, Wheel}; + +const BUILT_WHEELS_CACHE: &str = "built-wheels-v0"; + +pub struct Builder<'a, T: BuildContext + Send + Sync> { + build_context: &'a T, + reporter: Option>, +} + +impl<'a, T: BuildContext + Send + Sync> Builder<'a, T> { + /// Initialize a new source distribution downloader. + pub fn new(build_context: &'a T) -> Self { + Self { + build_context, + reporter: None, + } + } + + /// Set the [`Reporter`] to use for this downloader. + #[must_use] + pub fn with_reporter(self, reporter: impl Reporter + 'static) -> Self { + Self { + reporter: Some(Box::new(reporter)), + ..self + } + } + + /// Build a set of source distributions. + pub async fn build(&'a self, distributions: Vec) -> Result> { + // Sort the distributions by size. + let mut distributions = distributions; + distributions.sort_unstable_by_key(|distribution| match &distribution.remote { + RemoteDistribution::Registry(_package, _version, file) => Reverse(file.size), + RemoteDistribution::Url(_, _) => Reverse(usize::MIN), + }); + + // Build the distributions serially. + let mut builds = Vec::with_capacity(distributions.len()); + for distribution in distributions { + debug!("Building source distribution: {distribution}"); + + let result = build_sdist(distribution, self.build_context).await?; + + if let Some(reporter) = self.reporter.as_ref() { + reporter.on_progress(result.remote()); + } + + builds.push(result); + } + + if let Some(reporter) = self.reporter.as_ref() { + reporter.on_complete(); + } + + Ok(builds) + } +} + +/// Build a source distribution into a wheel. +async fn build_sdist( + distribution: SourceDistribution, + build_context: &T, +) -> Result { + // Create a directory for the wheel. + let wheel_dir = build_context + .cache() + .join(BUILT_WHEELS_CACHE) + .join(distribution.remote.id()); + fs::create_dir_all(&wheel_dir).await?; + + // Build the wheel. + // TODO(charlie): If this is a Git dependency, we should do another checkout. If the same + // repository is used by multiple dependencies, at multiple commits, the local checkout may now + // point to the wrong commit. + let disk_filename = build_context + .build_source_distribution( + &distribution.sdist_file, + distribution.subdirectory.as_deref(), + &wheel_dir, + ) + .await?; + let wheel_filename = wheel_dir.join(disk_filename); + + Ok(Wheel::Disk(DiskWheel { + remote: distribution.remote, + path: wheel_filename, + })) +} + +pub trait Reporter: Send + Sync { + /// Callback to invoke when a source distribution is built. + fn on_progress(&self, wheel: &RemoteDistribution); + + /// Callback to invoke when the operation is complete. + fn on_complete(&self); +} diff --git a/crates/puffin-installer/src/downloader.rs b/crates/puffin-installer/src/downloader.rs index 6dbf3696a..672f95caf 100644 --- a/crates/puffin-installer/src/downloader.rs +++ b/crates/puffin-installer/src/downloader.rs @@ -1,5 +1,6 @@ use std::cmp::Reverse; use std::path::{Path, PathBuf}; +use std::sync::Arc; use anyhow::Result; use cacache::{Algorithm, Integrity}; @@ -9,20 +10,28 @@ use tracing::debug; use url::Url; use puffin_client::RegistryClient; -use puffin_distribution::RemoteDistribution; +use puffin_distribution::source::Source; +use puffin_distribution::{RemoteDistribution, RemoteDistributionRef}; +use puffin_git::GitSource; + +use crate::locks::Locks; + +const GIT_CACHE: &str = "git-v0"; pub struct Downloader<'a> { client: &'a RegistryClient, cache: &'a Path, + locks: Arc, reporter: Option>, } impl<'a> Downloader<'a> { - /// Initialize a new downloader. + /// Initialize a new distribution downloader. pub fn new(client: &'a RegistryClient, cache: &'a Path) -> Self { Self { client, cache, + locks: Arc::new(Locks::default()), reporter: None, } } @@ -36,28 +45,29 @@ impl<'a> Downloader<'a> { } } - /// Install a set of wheels into a Python virtual environment. + /// Download a set of distributions. pub async fn download( &'a self, - wheels: Vec, - ) -> Result> { - // Sort the wheels by size. - let mut wheels = wheels; - wheels.sort_unstable_by_key(|wheel| match wheel { + distributions: Vec, + ) -> Result> { + // Sort the distributions by size. + let mut distributions = distributions; + distributions.sort_unstable_by_key(|wheel| match wheel { RemoteDistribution::Registry(_package, _version, file) => Reverse(file.size), RemoteDistribution::Url(_, _) => Reverse(usize::MIN), }); - // Phase 1: Fetch the wheels in parallel. + // Fetch the distributions in parallel. let mut fetches = JoinSet::new(); - let mut downloads = Vec::with_capacity(wheels.len()); - for remote in wheels { - debug!("Downloading wheel: {remote}"); + let mut downloads = Vec::with_capacity(distributions.len()); + for distribution in distributions { + debug!("Downloading wheel: {distribution}"); - fetches.spawn(fetch_wheel( - remote.clone(), + fetches.spawn(fetch_distribution( + distribution.clone(), self.client.clone(), self.cache.to_path_buf(), + self.locks.clone(), )); } @@ -65,7 +75,7 @@ impl<'a> Downloader<'a> { let result = result?; if let Some(reporter) = self.reporter.as_ref() { - reporter.on_download_progress(&result.remote); + reporter.on_download_progress(result.remote()); } downloads.push(result); @@ -79,62 +89,119 @@ impl<'a> Downloader<'a> { } } -#[derive(Debug)] -pub struct InMemoryDistribution { - /// The remote file from which this wheel was downloaded. - pub(crate) remote: RemoteDistribution, - /// The contents of the wheel. - pub(crate) buffer: Vec, -} - -impl std::fmt::Display for InMemoryDistribution { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.remote) - } -} - -/// Download a wheel to a given path. -async fn fetch_wheel( - remote: RemoteDistribution, +/// Download a built distribution (wheel) or source distribution (sdist). +async fn fetch_distribution( + distribution: RemoteDistribution, client: RegistryClient, cache: PathBuf, -) -> Result { - match &remote { - RemoteDistribution::Registry(.., file) => { - // Parse the wheel's SRI. - let sri = Integrity::from_hex(&file.hashes.sha256, Algorithm::Sha256)?; + locks: Arc, +) -> Result { + let url = distribution.url()?; + let lock = locks.acquire(&url).await; + let _guard = lock.lock().await; - // Read from the cache, if possible. - if let Ok(buffer) = cacache::read_hash(&cache, &sri).await { - debug!("Extracted wheel from cache: {remote}"); - return Ok(InMemoryDistribution { remote, buffer }); + if distribution.is_wheel() { + match &distribution { + RemoteDistribution::Registry(.., file) => { + // Parse the wheel's SRI. + let sri = Integrity::from_hex(&file.hashes.sha256, Algorithm::Sha256)?; + + // Read from the cache, if possible. + if let Ok(buffer) = cacache::read_hash(&cache, &sri).await { + debug!("Extracted wheel from cache: {distribution}"); + return Ok(Download::Wheel(Wheel::InMemory(InMemoryWheel { + remote: distribution, + buffer, + }))); + } + + // Fetch the wheel. + let url = Url::parse(&file.url)?; + let reader = client.stream_external(&url).await?; + + // Read into a buffer. + let mut buffer = Vec::with_capacity(file.size); + let mut reader = tokio::io::BufReader::new(reader.compat()); + tokio::io::copy(&mut reader, &mut buffer).await?; + + // Write the buffer to the cache. + cacache::write_hash(&cache, &buffer).await?; + + Ok(Download::Wheel(Wheel::InMemory(InMemoryWheel { + remote: distribution, + buffer, + }))) } + RemoteDistribution::Url(.., url) => { + // Fetch the wheel. + let reader = client.stream_external(url).await?; - // Fetch the wheel. - let url = Url::parse(&file.url)?; - let reader = client.stream_external(&url).await?; + // Read into a buffer. + let mut buffer = Vec::with_capacity(1024 * 1024); + let mut reader = tokio::io::BufReader::new(reader.compat()); + tokio::io::copy(&mut reader, &mut buffer).await?; - // Read into a buffer. - let mut buffer = Vec::with_capacity(file.size); - let mut reader = tokio::io::BufReader::new(reader.compat()); - tokio::io::copy(&mut reader, &mut buffer).await?; - - // Write the buffer to the cache, if possible. - cacache::write_hash(&cache, &buffer).await?; - - Ok(InMemoryDistribution { remote, buffer }) + Ok(Download::Wheel(Wheel::InMemory(InMemoryWheel { + remote: distribution, + buffer, + }))) + } } - RemoteDistribution::Url(.., url) => { - // Fetch the wheel. - let reader = client.stream_external(url).await?; + } else { + let distribution_ref = RemoteDistributionRef::from(&distribution); + let source = Source::try_from(&distribution_ref)?; + let (sdist_file, subdirectory) = match source { + Source::RegistryUrl(url) => { + debug!("Fetching source distribution from registry: {url}"); - // Read into a buffer. - let mut buffer = Vec::with_capacity(1024 * 1024); - let mut reader = tokio::io::BufReader::new(reader.compat()); - tokio::io::copy(&mut reader, &mut buffer).await?; + let reader = client.stream_external(&url).await?; + let mut reader = tokio::io::BufReader::new(reader.compat()); - Ok(InMemoryDistribution { remote, buffer }) - } + // Download the source distribution. + let temp_dir = tempfile::tempdir_in(cache)?.into_path(); + let sdist_filename = distribution.filename()?; + let sdist_file = temp_dir.join(sdist_filename.as_ref()); + let mut writer = tokio::fs::File::create(&sdist_file).await?; + tokio::io::copy(&mut reader, &mut writer).await?; + + // Registry dependencies can't specify a subdirectory. + let subdirectory = None; + + (sdist_file, subdirectory) + } + Source::RemoteUrl(url, subdirectory) => { + debug!("Fetching source distribution from URL: {url}"); + + let reader = client.stream_external(url).await?; + let mut reader = tokio::io::BufReader::new(reader.compat()); + + // Download the source distribution. + let temp_dir = tempfile::tempdir_in(cache)?.into_path(); + let sdist_filename = distribution.filename()?; + let sdist_file = temp_dir.join(sdist_filename.as_ref()); + let mut writer = tokio::fs::File::create(&sdist_file).await?; + tokio::io::copy(&mut reader, &mut writer).await?; + + (sdist_file, subdirectory) + } + Source::Git(git, subdirectory) => { + debug!("Fetching source distribution from Git: {git}"); + + let git_dir = cache.join(GIT_CACHE); + let source = GitSource::new(git, git_dir); + let sdist_file = tokio::task::spawn_blocking(move || source.fetch()) + .await?? + .into(); + + (sdist_file, subdirectory) + } + }; + + Ok(Download::SourceDistribution(SourceDistribution { + remote: distribution, + sdist_file, + subdirectory, + })) } } @@ -145,3 +212,84 @@ pub trait Reporter: Send + Sync { /// Callback to invoke when the operation is complete. fn on_download_complete(&self); } + +/// A downloaded wheel that's stored in-memory. +#[derive(Debug)] +pub struct InMemoryWheel { + /// The remote file from which this wheel was downloaded. + pub(crate) remote: RemoteDistribution, + /// The contents of the wheel. + pub(crate) buffer: Vec, +} + +/// A downloaded wheel that's stored on-disk. +#[derive(Debug)] +pub struct DiskWheel { + /// The remote file from which this wheel was downloaded. + pub(crate) remote: RemoteDistribution, + /// The path to the downloaded wheel. + pub(crate) path: PathBuf, +} + +/// A downloaded wheel. +#[derive(Debug)] +pub enum Wheel { + InMemory(InMemoryWheel), + Disk(DiskWheel), +} + +impl Wheel { + /// Return the [`RemoteDistribution`] from which this wheel was downloaded. + pub fn remote(&self) -> &RemoteDistribution { + match self { + Wheel::InMemory(wheel) => &wheel.remote, + Wheel::Disk(wheel) => &wheel.remote, + } + } +} + +impl std::fmt::Display for Wheel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.remote()) + } +} + +/// A downloaded source distribution. +#[derive(Debug, Clone)] +pub struct SourceDistribution { + /// The remote file from which this wheel was downloaded. + pub(crate) remote: RemoteDistribution, + /// The path to the downloaded archive or directory. + pub(crate) sdist_file: PathBuf, + /// The subdirectory within the archive or directory. + pub(crate) subdirectory: Option, +} + +impl std::fmt::Display for SourceDistribution { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.remote) + } +} + +/// A downloaded distribution, either a wheel or a source distribution. +#[derive(Debug)] +pub enum Download { + Wheel(Wheel), + SourceDistribution(SourceDistribution), +} + +impl Download { + /// Return the [`RemoteDistribution`] from which this distribution was downloaded. + pub fn remote(&self) -> &RemoteDistribution { + match self { + Download::Wheel(distribution) => distribution.remote(), + Download::SourceDistribution(distribution) => &distribution.remote, + } + } +} + +impl std::fmt::Display for Download { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.remote()) + } +} diff --git a/crates/puffin-installer/src/lib.rs b/crates/puffin-installer/src/lib.rs index 3ed82cffd..969ca5374 100644 --- a/crates/puffin-installer/src/lib.rs +++ b/crates/puffin-installer/src/lib.rs @@ -1,4 +1,5 @@ -pub use downloader::{Downloader, Reporter as DownloadReporter}; +pub use builder::{Builder, Reporter as BuildReporter}; +pub use downloader::{Download, Downloader, Reporter as DownloadReporter}; pub use installer::{Installer, Reporter as InstallReporter}; pub use plan::PartitionedRequirements; pub use registry_index::RegistryIndex; @@ -6,9 +7,11 @@ pub use site_packages::SitePackages; pub use uninstall::uninstall; pub use unzipper::{Reporter as UnzipReporter, Unzipper}; +mod builder; mod cache; mod downloader; mod installer; +mod locks; mod plan; mod registry_index; mod site_packages; diff --git a/crates/puffin-installer/src/locks.rs b/crates/puffin-installer/src/locks.rs new file mode 100644 index 000000000..40cf48671 --- /dev/null +++ b/crates/puffin-installer/src/locks.rs @@ -0,0 +1,19 @@ +use fxhash::FxHashMap; +use puffin_cache::RepositoryUrl; +use std::sync::Arc; +use tokio::sync::Mutex; +use url::Url; + +/// A set of locks used to prevent concurrent access to the same resource. +#[derive(Debug, Default)] +pub(crate) struct Locks(Mutex>>>); + +impl Locks { + /// Acquire a lock on the given resource. + pub(crate) async fn acquire(&self, url: &Url) -> Arc> { + let mut map = self.0.lock().await; + map.entry(puffin_cache::digest(&RepositoryUrl::new(url))) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone() + } +} diff --git a/crates/puffin-installer/src/plan.rs b/crates/puffin-installer/src/plan.rs index 680e038fd..2d6844ea6 100644 --- a/crates/puffin-installer/src/plan.rs +++ b/crates/puffin-installer/src/plan.rs @@ -46,6 +46,9 @@ impl PartitionedRequirements { for requirement in requirements { // Filter out already-installed packages. + // TODO(charlie): Detect packages installed via URL. Right now, like pip, we _always_ + // attempt to reinstall a package if it was installed via URL. This is often very + // fast, since the wheel is cached, but it should still be avoidable. if let Some(distribution) = site_packages.remove(&requirement.name) { if requirement.is_satisfied_by(distribution.version()) { debug!("Requirement already satisfied: {distribution}",); diff --git a/crates/puffin-installer/src/unzipper.rs b/crates/puffin-installer/src/unzipper.rs index f1c24efe0..5a712d86d 100644 --- a/crates/puffin-installer/src/unzipper.rs +++ b/crates/puffin-installer/src/unzipper.rs @@ -1,4 +1,5 @@ use std::cmp::Reverse; +use std::io::{Read, Seek}; use std::path::Path; use anyhow::Result; @@ -10,8 +11,8 @@ use zip::ZipArchive; use puffin_distribution::{CachedDistribution, RemoteDistribution}; use crate::cache::WheelCache; -use crate::downloader::InMemoryDistribution; -use crate::vendor::CloneableSeekableReader; +use crate::downloader::Wheel; +use crate::vendor::{CloneableSeekableReader, HasLength}; #[derive(Default)] pub struct Unzipper { @@ -30,7 +31,7 @@ impl Unzipper { /// Unzip a set of downloaded wheels. pub async fn unzip( &self, - downloads: Vec, + downloads: Vec, target: &Path, ) -> Result> { // Create the wheel cache subdirectory, if necessary. @@ -39,14 +40,17 @@ impl Unzipper { // Sort the wheels by size. let mut downloads = downloads; - downloads.sort_unstable_by_key(|wheel| Reverse(wheel.buffer.len())); + downloads.sort_unstable_by_key(|wheel| match wheel { + Wheel::Disk(_) => Reverse(usize::MIN), + Wheel::InMemory(wheel) => Reverse(wheel.buffer.len()), + }); let staging = tempfile::tempdir_in(wheel_cache.root())?; // Unpack the wheels into the cache. let mut wheels = Vec::with_capacity(downloads.len()); for download in downloads { - let remote = download.remote.clone(); + let remote = download.remote().clone(); debug!("Unpacking wheel: {remote}"); @@ -89,12 +93,17 @@ impl Unzipper { } /// Unzip a wheel into the target directory. -fn unzip_wheel(wheel: InMemoryDistribution, target: &Path) -> Result<()> { - // Read the wheel into a buffer. - let reader = std::io::Cursor::new(wheel.buffer); - let archive = ZipArchive::new(CloneableSeekableReader::new(reader))?; +fn unzip_wheel(wheel: Wheel, target: &Path) -> Result<()> { + match wheel { + Wheel::InMemory(wheel) => unzip_archive(std::io::Cursor::new(wheel.buffer), target), + Wheel::Disk(wheel) => unzip_archive(std::fs::File::open(wheel.path)?, target), + } +} +/// Unzip a zip archive into the target directory. +fn unzip_archive(reader: R, target: &Path) -> Result<()> { // Unzip in parallel. + let archive = ZipArchive::new(CloneableSeekableReader::new(reader))?; (0..archive.len()) .par_bridge() .map(|file_number| { diff --git a/crates/puffin-installer/src/vendor/mod.rs b/crates/puffin-installer/src/vendor/mod.rs index d55ab7ed7..3148e2edd 100644 --- a/crates/puffin-installer/src/vendor/mod.rs +++ b/crates/puffin-installer/src/vendor/mod.rs @@ -1,3 +1,3 @@ -pub(crate) use cloneable_seekable_reader::CloneableSeekableReader; +pub(crate) use cloneable_seekable_reader::{CloneableSeekableReader, HasLength}; mod cloneable_seekable_reader; diff --git a/crates/puffin-resolver/src/distribution/source_distribution.rs b/crates/puffin-resolver/src/distribution/source_distribution.rs index 31a9f9ea4..113e3f78d 100644 --- a/crates/puffin-resolver/src/distribution/source_distribution.rs +++ b/crates/puffin-resolver/src/distribution/source_distribution.rs @@ -1,3 +1,7 @@ +//! Fetch and build source distributions from remote sources. +//! +//! TODO(charlie): Unify with `crates/puffin-installer/src/sdist_builder.rs`. + use std::str::FromStr; use anyhow::Result;