diff --git a/Cargo.lock b/Cargo.lock index 56d7ca1b8..f230688cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2278,13 +2278,17 @@ dependencies = [ "install-wheel-rs", "pep440_rs 0.3.12", "pep508_rs", + "puffin-cache", "puffin-client", "puffin-distribution", + "puffin-git", "puffin-interpreter", "puffin-normalize", "puffin-package", + "puffin-traits", "rayon", "tempfile", + "thiserror", "tokio", "tokio-util", "tracing", diff --git a/crates/puffin-cli/src/commands/pip_sync.rs b/crates/puffin-cli/src/commands/pip_sync.rs index 7f89fd8d5..ddd3562b0 100644 --- a/crates/puffin-cli/src/commands/pip_sync.rs +++ b/crates/puffin-cli/src/commands/pip_sync.rs @@ -3,20 +3,22 @@ use std::path::Path; use anyhow::{Context, Result}; use colored::Colorize; -use itertools::Itertools; +use itertools::{Either, Itertools}; use tracing::debug; +use fs_err as fs; use install_wheel_rs::linker::LinkMode; use pep508_rs::Requirement; use platform_host::Platform; use platform_tags::Tags; use puffin_client::RegistryClientBuilder; +use puffin_dispatch::BuildDispatch; use puffin_distribution::Distribution; -use puffin_installer::PartitionedRequirements; +use puffin_installer::{Builder, PartitionedRequirements}; use puffin_interpreter::Virtualenv; use crate::commands::reporters::{ - DownloadReporter, FinderReporter, InstallReporter, UnzipReporter, + BuildReporter, DownloadReporter, FinderReporter, InstallReporter, UnzipReporter, }; use crate::commands::{elapsed, ExitStatus}; use crate::index_urls::IndexUrls; @@ -55,7 +57,6 @@ pub(crate) async fn sync_requirements( cache: &Path, mut printer: Printer, ) -> Result { - // Audit the requirements. let start = std::time::Instant::now(); // Detect the current Python interpreter. @@ -163,6 +164,49 @@ pub(crate) async fn sync_requirements( downloads }; + let (wheels, sdists): (Vec<_>, Vec<_>) = + downloads + .into_iter() + .partition_map(|download| match download { + puffin_installer::Download::Wheel(wheel) => Either::Left(wheel), + puffin_installer::Download::SourceDistribution(sdist) => Either::Right(sdist), + }); + + // Build any missing source distributions. + let sdists = if sdists.is_empty() { + vec![] + } else { + let start = std::time::Instant::now(); + + let build_dispatch = BuildDispatch::new( + RegistryClientBuilder::default().build(), + cache.to_path_buf(), + venv.interpreter_info().clone(), + fs::canonicalize(venv.python_executable())?, + ); + + let builder = Builder::new(&build_dispatch) + .with_reporter(BuildReporter::from(printer).with_length(sdists.len() as u64)); + + let wheels = builder.build(sdists).await?; + + let s = if wheels.len() == 1 { "" } else { "s" }; + writeln!( + printer, + "{}", + format!( + "Built {} in {}", + format!("{} package{}", wheels.len(), s).bold(), + elapsed(start.elapsed()) + ) + .dimmed() + )?; + + wheels + }; + + let downloads = wheels.into_iter().chain(sdists).collect::>(); + // Unzip any downloaded distributions. let unzips = if downloads.is_empty() { vec![] diff --git a/crates/puffin-cli/src/commands/reporters.rs b/crates/puffin-cli/src/commands/reporters.rs index ef7e0381f..54235770d 100644 --- a/crates/puffin-cli/src/commands/reporters.rs +++ b/crates/puffin-cli/src/commands/reporters.rs @@ -148,6 +148,41 @@ impl puffin_installer::InstallReporter for InstallReporter { } } +#[derive(Debug)] +pub(crate) struct BuildReporter { + progress: ProgressBar, +} + +impl From for BuildReporter { + fn from(printer: Printer) -> Self { + let progress = ProgressBar::with_draw_target(None, printer.target()); + progress.set_style( + ProgressStyle::with_template("{bar:20} [{pos}/{len}] {wide_msg:.dim}").unwrap(), + ); + progress.set_message("Building wheels..."); + Self { progress } + } +} + +impl BuildReporter { + #[must_use] + pub(crate) fn with_length(self, length: u64) -> Self { + self.progress.set_length(length); + self + } +} + +impl puffin_installer::BuildReporter for BuildReporter { + fn on_progress(&self, wheel: &RemoteDistribution) { + self.progress.set_message(format!("{wheel}")); + self.progress.inc(1); + } + + fn on_complete(&self) { + self.progress.finish_and_clear(); + } +} + #[derive(Debug)] pub(crate) struct ResolverReporter { progress: ProgressBar, diff --git a/crates/puffin-cli/tests/pip_sync.rs b/crates/puffin-cli/tests/pip_sync.rs index ae93aa557..43783c4d6 100644 --- a/crates/puffin-cli/tests/pip_sync.rs +++ b/crates/puffin-cli/tests/pip_sync.rs @@ -569,3 +569,140 @@ fn install_url() -> Result<()> { Ok(()) } + +/// Install a package into a virtual environment from a Git repository. +#[test] +#[cfg(feature = "git")] +fn install_git() -> Result<()> { + let temp_dir = assert_fs::TempDir::new()?; + let cache_dir = assert_fs::TempDir::new()?; + let venv = temp_dir.child(".venv"); + + Command::new(get_cargo_bin(BIN_NAME)) + .arg("venv") + .arg(venv.as_os_str()) + .arg("--cache-dir") + .arg(cache_dir.path()) + .current_dir(&temp_dir) + .assert() + .success(); + venv.assert(predicates::path::is_dir()); + + let requirements_txt = temp_dir.child("requirements.txt"); + requirements_txt.touch()?; + requirements_txt.write_str("werkzeug @ git+https://github.com/pallets/werkzeug.git@2.0.0")?; + + insta::with_settings!({ + filters => vec![ + (r"(\d|\.)+(ms|s)", "[TIME]"), + ] + }, { + assert_cmd_snapshot!(Command::new(get_cargo_bin(BIN_NAME)) + .arg("pip-sync") + .arg("requirements.txt") + .arg("--cache-dir") + .arg(cache_dir.path()) + .env("VIRTUAL_ENV", venv.as_os_str()) + .current_dir(&temp_dir)); + }); + + Command::new(venv.join("bin").join("python")) + .arg("-c") + .arg("import werkzeug") + .current_dir(&temp_dir) + .assert() + .success(); + + Ok(()) +} + +/// Install two packages from the same Git repository. +#[test] +#[cfg(feature = "git")] +fn install_git_subdirectories() -> Result<()> { + let temp_dir = assert_fs::TempDir::new()?; + let cache_dir = assert_fs::TempDir::new()?; + let venv = temp_dir.child(".venv"); + + Command::new(get_cargo_bin(BIN_NAME)) + .arg("venv") + .arg(venv.as_os_str()) + .arg("--cache-dir") + .arg(cache_dir.path()) + .current_dir(&temp_dir) + .assert() + .success(); + venv.assert(predicates::path::is_dir()); + + let requirements_txt = temp_dir.child("requirements.txt"); + requirements_txt.touch()?; + requirements_txt.write_str("example-pkg-a @ git+https://github.com/pypa/sample-namespace-packages.git@df7530eeb8fa0cb7dbb8ecb28363e8e36bfa2f45#subdirectory=pkg_resources/pkg_a\nexample-pkg-b @ git+https://github.com/pypa/sample-namespace-packages.git@df7530eeb8fa0cb7dbb8ecb28363e8e36bfa2f45#subdirectory=pkg_resources/pkg_b")?; + + insta::with_settings!({ + filters => vec![ + (r"(\d|\.)+(ms|s)", "[TIME]"), + ] + }, { + assert_cmd_snapshot!(Command::new(get_cargo_bin(BIN_NAME)) + .arg("pip-sync") + .arg("requirements.txt") + .arg("--cache-dir") + .arg(cache_dir.path()) + .env("VIRTUAL_ENV", venv.as_os_str()) + .current_dir(&temp_dir)); + }); + + Command::new(venv.join("bin").join("python")) + .arg("-c") + .arg("import example_pkg") + .current_dir(&temp_dir) + .assert() + .success(); + + Ok(()) +} + +/// Install a source distribution into a virtual environment. +#[test] +fn install_sdist() -> Result<()> { + let temp_dir = assert_fs::TempDir::new()?; + let cache_dir = assert_fs::TempDir::new()?; + let venv = temp_dir.child(".venv"); + + Command::new(get_cargo_bin(BIN_NAME)) + .arg("venv") + .arg(venv.as_os_str()) + .arg("--cache-dir") + .arg(cache_dir.path()) + .current_dir(&temp_dir) + .assert() + .success(); + venv.assert(predicates::path::is_dir()); + + let requirements_txt = temp_dir.child("requirements.txt"); + requirements_txt.touch()?; + requirements_txt.write_str("Werkzeug==0.9.6")?; + + insta::with_settings!({ + filters => vec![ + (r"(\d|\.)+(ms|s)", "[TIME]"), + ] + }, { + assert_cmd_snapshot!(Command::new(get_cargo_bin(BIN_NAME)) + .arg("pip-sync") + .arg("requirements.txt") + .arg("--cache-dir") + .arg(cache_dir.path()) + .env("VIRTUAL_ENV", venv.as_os_str()) + .current_dir(&temp_dir)); + }); + + Command::new(venv.join("bin").join("python")) + .arg("-c") + .arg("import werkzeug") + .current_dir(&temp_dir) + .assert() + .success(); + + Ok(()) +} diff --git a/crates/puffin-cli/tests/snapshots/pip_sync__install_git.snap b/crates/puffin-cli/tests/snapshots/pip_sync__install_git.snap new file mode 100644 index 000000000..870aec6d4 --- /dev/null +++ b/crates/puffin-cli/tests/snapshots/pip_sync__install_git.snap @@ -0,0 +1,24 @@ +--- +source: crates/puffin-cli/tests/pip_sync.rs +info: + program: puffin + args: + - pip-sync + - requirements.txt + - "--cache-dir" + - /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpZNXBuT + env: + VIRTUAL_ENV: /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpBYANH7/.venv +--- +success: true +exit_code: 0 +----- stdout ----- + +----- stderr ----- +Resolved 1 package in [TIME] +Downloaded 1 package in [TIME] +Built 1 package in [TIME] +Unzipped 1 package in [TIME] +Installed 1 package in [TIME] + + werkzeug @ git+https://github.com/pallets/werkzeug.git@2.0.0 + diff --git a/crates/puffin-cli/tests/snapshots/pip_sync__install_git_subdirectories.snap b/crates/puffin-cli/tests/snapshots/pip_sync__install_git_subdirectories.snap new file mode 100644 index 000000000..a1a42bd22 --- /dev/null +++ b/crates/puffin-cli/tests/snapshots/pip_sync__install_git_subdirectories.snap @@ -0,0 +1,25 @@ +--- +source: crates/puffin-cli/tests/pip_sync.rs +info: + program: puffin + args: + - pip-sync + - requirements.txt + - "--cache-dir" + - /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpJvtAcl + env: + VIRTUAL_ENV: /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpYIBn2G/.venv +--- +success: true +exit_code: 0 +----- stdout ----- + +----- stderr ----- +Resolved 2 packages in [TIME] +Downloaded 2 packages in [TIME] +Built 2 packages in [TIME] +Unzipped 2 packages in [TIME] +Installed 2 packages in [TIME] + + example-pkg-a @ git+https://github.com/pypa/sample-namespace-packages.git@df7530eeb8fa0cb7dbb8ecb28363e8e36bfa2f45#subdirectory=pkg_resources/pkg_a + + example-pkg-b @ git+https://github.com/pypa/sample-namespace-packages.git@df7530eeb8fa0cb7dbb8ecb28363e8e36bfa2f45#subdirectory=pkg_resources/pkg_b + diff --git a/crates/puffin-cli/tests/snapshots/pip_sync__install_sdist.snap b/crates/puffin-cli/tests/snapshots/pip_sync__install_sdist.snap new file mode 100644 index 000000000..bdeb70436 --- /dev/null +++ b/crates/puffin-cli/tests/snapshots/pip_sync__install_sdist.snap @@ -0,0 +1,24 @@ +--- +source: crates/puffin-cli/tests/pip_sync.rs +info: + program: puffin + args: + - pip-sync + - requirements.txt + - "--cache-dir" + - /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpTtWuay + env: + VIRTUAL_ENV: /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpdSdhVI/.venv +--- +success: true +exit_code: 0 +----- stdout ----- + +----- stderr ----- +Resolved 1 package in [TIME] +Downloaded 1 package in [TIME] +Built 1 package in [TIME] +Unzipped 1 package in [TIME] +Installed 1 package in [TIME] + + werkzeug==0.9.6 + diff --git a/crates/puffin-dispatch/src/lib.rs b/crates/puffin-dispatch/src/lib.rs index 5ab815f0e..7db635105 100644 --- a/crates/puffin-dispatch/src/lib.rs +++ b/crates/puffin-dispatch/src/lib.rs @@ -8,14 +8,14 @@ use std::pin::Pin; use anyhow::Context; use anyhow::Result; -use itertools::Itertools; +use itertools::{Either, Itertools}; use tracing::{debug, instrument}; use pep508_rs::Requirement; use platform_tags::Tags; use puffin_build::SourceDistributionBuilder; use puffin_client::RegistryClient; -use puffin_installer::{Downloader, Installer, PartitionedRequirements, Unzipper}; +use puffin_installer::{Builder, Downloader, Installer, PartitionedRequirements, Unzipper}; use puffin_interpreter::{InterpreterInfo, Virtualenv}; use puffin_resolver::{DistributionFinder, Manifest, PreReleaseMode, ResolutionMode, Resolver}; use puffin_traits::BuildContext; @@ -144,6 +144,33 @@ impl BuildContext for BuildDispatch { .context("Failed to download build dependencies")? }; + let (wheels, sdists): (Vec<_>, Vec<_>) = + downloads + .into_iter() + .partition_map(|download| match download { + puffin_installer::Download::Wheel(wheel) => Either::Left(wheel), + puffin_installer::Download::SourceDistribution(sdist) => { + Either::Right(sdist) + } + }); + + // Build any missing source distributions. + let sdists = if sdists.is_empty() { + vec![] + } else { + debug!( + "Building source distributions{}: {}", + if sdists.len() == 1 { "" } else { "s" }, + sdists.iter().map(ToString::to_string).join(", ") + ); + Builder::new(self) + .build(sdists) + .await + .context("Failed to build source distributions")? + }; + + let downloads = wheels.into_iter().chain(sdists).collect::>(); + // Unzip any downloaded distributions. let unzips = if downloads.is_empty() { vec![] diff --git a/crates/puffin-distribution/src/lib.rs b/crates/puffin-distribution/src/lib.rs index b348d6ddf..7337f8391 100644 --- a/crates/puffin-distribution/src/lib.rs +++ b/crates/puffin-distribution/src/lib.rs @@ -97,6 +97,31 @@ impl RemoteDistribution { Self::Url(name, url) } + /// Return the URL of the distribution. + pub fn url(&self) -> Result> { + match self { + Self::Registry(_, _, file) => { + let url = Url::parse(&file.url)?; + Ok(Cow::Owned(url)) + } + Self::Url(_, url) => Ok(Cow::Borrowed(url)), + } + } + + /// Return the filename of the distribution. + pub fn filename(&self) -> Result> { + match self { + Self::Registry(_, _, file) => Ok(Cow::Borrowed(&file.filename)), + Self::Url(_, url) => { + let filename = url + .path_segments() + .and_then(std::iter::Iterator::last) + .ok_or_else(|| anyhow!("Could not parse filename from URL: {}", url))?; + Ok(Cow::Owned(filename.to_owned())) + } + } + } + /// Return the normalized [`PackageName`] of the distribution. pub fn name(&self) -> &PackageName { match self { @@ -125,6 +150,17 @@ impl RemoteDistribution { Self::Url(_name, url) => puffin_cache::digest(&CanonicalUrl::new(url)), } } + + /// Returns `true` if this distribution is a wheel. + pub fn is_wheel(&self) -> bool { + let filename = match self { + Self::Registry(_name, _version, file) => &file.filename, + Self::Url(_name, url) => url.path(), + }; + Path::new(filename) + .extension() + .is_some_and(|ext| ext.eq_ignore_ascii_case("whl")) + } } impl std::fmt::Display for RemoteDistribution { @@ -379,3 +415,14 @@ impl std::fmt::Display for RemoteDistributionRef<'_> { } } } + +impl<'a> From<&'a RemoteDistribution> for RemoteDistributionRef<'a> { + fn from(dist: &'a RemoteDistribution) -> Self { + match dist { + RemoteDistribution::Registry(name, version, file) => { + Self::Registry(name, version, file) + } + RemoteDistribution::Url(name, url) => Self::Url(name, url), + } + } +} diff --git a/crates/puffin-installer/Cargo.toml b/crates/puffin-installer/Cargo.toml index 2bb983a68..3005cd889 100644 --- a/crates/puffin-installer/Cargo.toml +++ b/crates/puffin-installer/Cargo.toml @@ -13,11 +13,14 @@ license = { workspace = true } install-wheel-rs = { path = "../install-wheel-rs", default-features = false } pep440_rs = { path = "../pep440-rs" } pep508_rs = { path = "../pep508-rs" } +puffin-cache = { path = "../puffin-cache" } puffin-client = { path = "../puffin-client" } puffin-distribution = { path = "../puffin-distribution" } +puffin-git = { path = "../puffin-git" } puffin-interpreter = { path = "../puffin-interpreter" } puffin-normalize = { path = "../puffin-normalize" } puffin-package = { path = "../puffin-package" } +puffin-traits = { path = "../puffin-traits" } distribution-filename = { path = "../distribution-filename" } anyhow = { workspace = true } @@ -26,6 +29,7 @@ fs-err = { workspace = true } fxhash = { workspace = true } rayon = { workspace = true } tempfile = { workspace = true } +thiserror = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } tracing = { workspace = true } diff --git a/crates/puffin-installer/src/builder.rs b/crates/puffin-installer/src/builder.rs new file mode 100644 index 000000000..b982f4e23 --- /dev/null +++ b/crates/puffin-installer/src/builder.rs @@ -0,0 +1,109 @@ +//! Build source distributions from downloaded archives. +//! +//! TODO(charlie): Unify with `crates/puffin-resolver/src/distribution/source_distribution.rs`. + +use std::cmp::Reverse; + +use anyhow::Result; +use fs_err::tokio as fs; +use tracing::debug; + +use puffin_distribution::RemoteDistribution; +use puffin_traits::BuildContext; + +use crate::downloader::{DiskWheel, SourceDistribution, Wheel}; + +const BUILT_WHEELS_CACHE: &str = "built-wheels-v0"; + +pub struct Builder<'a, T: BuildContext + Send + Sync> { + build_context: &'a T, + reporter: Option>, +} + +impl<'a, T: BuildContext + Send + Sync> Builder<'a, T> { + /// Initialize a new source distribution downloader. + pub fn new(build_context: &'a T) -> Self { + Self { + build_context, + reporter: None, + } + } + + /// Set the [`Reporter`] to use for this downloader. + #[must_use] + pub fn with_reporter(self, reporter: impl Reporter + 'static) -> Self { + Self { + reporter: Some(Box::new(reporter)), + ..self + } + } + + /// Build a set of source distributions. + pub async fn build(&'a self, distributions: Vec) -> Result> { + // Sort the distributions by size. + let mut distributions = distributions; + distributions.sort_unstable_by_key(|distribution| match &distribution.remote { + RemoteDistribution::Registry(_package, _version, file) => Reverse(file.size), + RemoteDistribution::Url(_, _) => Reverse(usize::MIN), + }); + + // Build the distributions serially. + let mut builds = Vec::with_capacity(distributions.len()); + for distribution in distributions { + debug!("Building source distribution: {distribution}"); + + let result = build_sdist(distribution, self.build_context).await?; + + if let Some(reporter) = self.reporter.as_ref() { + reporter.on_progress(result.remote()); + } + + builds.push(result); + } + + if let Some(reporter) = self.reporter.as_ref() { + reporter.on_complete(); + } + + Ok(builds) + } +} + +/// Build a source distribution into a wheel. +async fn build_sdist( + distribution: SourceDistribution, + build_context: &T, +) -> Result { + // Create a directory for the wheel. + let wheel_dir = build_context + .cache() + .join(BUILT_WHEELS_CACHE) + .join(distribution.remote.id()); + fs::create_dir_all(&wheel_dir).await?; + + // Build the wheel. + // TODO(charlie): If this is a Git dependency, we should do another checkout. If the same + // repository is used by multiple dependencies, at multiple commits, the local checkout may now + // point to the wrong commit. + let disk_filename = build_context + .build_source_distribution( + &distribution.sdist_file, + distribution.subdirectory.as_deref(), + &wheel_dir, + ) + .await?; + let wheel_filename = wheel_dir.join(disk_filename); + + Ok(Wheel::Disk(DiskWheel { + remote: distribution.remote, + path: wheel_filename, + })) +} + +pub trait Reporter: Send + Sync { + /// Callback to invoke when a source distribution is built. + fn on_progress(&self, wheel: &RemoteDistribution); + + /// Callback to invoke when the operation is complete. + fn on_complete(&self); +} diff --git a/crates/puffin-installer/src/downloader.rs b/crates/puffin-installer/src/downloader.rs index 6dbf3696a..672f95caf 100644 --- a/crates/puffin-installer/src/downloader.rs +++ b/crates/puffin-installer/src/downloader.rs @@ -1,5 +1,6 @@ use std::cmp::Reverse; use std::path::{Path, PathBuf}; +use std::sync::Arc; use anyhow::Result; use cacache::{Algorithm, Integrity}; @@ -9,20 +10,28 @@ use tracing::debug; use url::Url; use puffin_client::RegistryClient; -use puffin_distribution::RemoteDistribution; +use puffin_distribution::source::Source; +use puffin_distribution::{RemoteDistribution, RemoteDistributionRef}; +use puffin_git::GitSource; + +use crate::locks::Locks; + +const GIT_CACHE: &str = "git-v0"; pub struct Downloader<'a> { client: &'a RegistryClient, cache: &'a Path, + locks: Arc, reporter: Option>, } impl<'a> Downloader<'a> { - /// Initialize a new downloader. + /// Initialize a new distribution downloader. pub fn new(client: &'a RegistryClient, cache: &'a Path) -> Self { Self { client, cache, + locks: Arc::new(Locks::default()), reporter: None, } } @@ -36,28 +45,29 @@ impl<'a> Downloader<'a> { } } - /// Install a set of wheels into a Python virtual environment. + /// Download a set of distributions. pub async fn download( &'a self, - wheels: Vec, - ) -> Result> { - // Sort the wheels by size. - let mut wheels = wheels; - wheels.sort_unstable_by_key(|wheel| match wheel { + distributions: Vec, + ) -> Result> { + // Sort the distributions by size. + let mut distributions = distributions; + distributions.sort_unstable_by_key(|wheel| match wheel { RemoteDistribution::Registry(_package, _version, file) => Reverse(file.size), RemoteDistribution::Url(_, _) => Reverse(usize::MIN), }); - // Phase 1: Fetch the wheels in parallel. + // Fetch the distributions in parallel. let mut fetches = JoinSet::new(); - let mut downloads = Vec::with_capacity(wheels.len()); - for remote in wheels { - debug!("Downloading wheel: {remote}"); + let mut downloads = Vec::with_capacity(distributions.len()); + for distribution in distributions { + debug!("Downloading wheel: {distribution}"); - fetches.spawn(fetch_wheel( - remote.clone(), + fetches.spawn(fetch_distribution( + distribution.clone(), self.client.clone(), self.cache.to_path_buf(), + self.locks.clone(), )); } @@ -65,7 +75,7 @@ impl<'a> Downloader<'a> { let result = result?; if let Some(reporter) = self.reporter.as_ref() { - reporter.on_download_progress(&result.remote); + reporter.on_download_progress(result.remote()); } downloads.push(result); @@ -79,62 +89,119 @@ impl<'a> Downloader<'a> { } } -#[derive(Debug)] -pub struct InMemoryDistribution { - /// The remote file from which this wheel was downloaded. - pub(crate) remote: RemoteDistribution, - /// The contents of the wheel. - pub(crate) buffer: Vec, -} - -impl std::fmt::Display for InMemoryDistribution { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.remote) - } -} - -/// Download a wheel to a given path. -async fn fetch_wheel( - remote: RemoteDistribution, +/// Download a built distribution (wheel) or source distribution (sdist). +async fn fetch_distribution( + distribution: RemoteDistribution, client: RegistryClient, cache: PathBuf, -) -> Result { - match &remote { - RemoteDistribution::Registry(.., file) => { - // Parse the wheel's SRI. - let sri = Integrity::from_hex(&file.hashes.sha256, Algorithm::Sha256)?; + locks: Arc, +) -> Result { + let url = distribution.url()?; + let lock = locks.acquire(&url).await; + let _guard = lock.lock().await; - // Read from the cache, if possible. - if let Ok(buffer) = cacache::read_hash(&cache, &sri).await { - debug!("Extracted wheel from cache: {remote}"); - return Ok(InMemoryDistribution { remote, buffer }); + if distribution.is_wheel() { + match &distribution { + RemoteDistribution::Registry(.., file) => { + // Parse the wheel's SRI. + let sri = Integrity::from_hex(&file.hashes.sha256, Algorithm::Sha256)?; + + // Read from the cache, if possible. + if let Ok(buffer) = cacache::read_hash(&cache, &sri).await { + debug!("Extracted wheel from cache: {distribution}"); + return Ok(Download::Wheel(Wheel::InMemory(InMemoryWheel { + remote: distribution, + buffer, + }))); + } + + // Fetch the wheel. + let url = Url::parse(&file.url)?; + let reader = client.stream_external(&url).await?; + + // Read into a buffer. + let mut buffer = Vec::with_capacity(file.size); + let mut reader = tokio::io::BufReader::new(reader.compat()); + tokio::io::copy(&mut reader, &mut buffer).await?; + + // Write the buffer to the cache. + cacache::write_hash(&cache, &buffer).await?; + + Ok(Download::Wheel(Wheel::InMemory(InMemoryWheel { + remote: distribution, + buffer, + }))) } + RemoteDistribution::Url(.., url) => { + // Fetch the wheel. + let reader = client.stream_external(url).await?; - // Fetch the wheel. - let url = Url::parse(&file.url)?; - let reader = client.stream_external(&url).await?; + // Read into a buffer. + let mut buffer = Vec::with_capacity(1024 * 1024); + let mut reader = tokio::io::BufReader::new(reader.compat()); + tokio::io::copy(&mut reader, &mut buffer).await?; - // Read into a buffer. - let mut buffer = Vec::with_capacity(file.size); - let mut reader = tokio::io::BufReader::new(reader.compat()); - tokio::io::copy(&mut reader, &mut buffer).await?; - - // Write the buffer to the cache, if possible. - cacache::write_hash(&cache, &buffer).await?; - - Ok(InMemoryDistribution { remote, buffer }) + Ok(Download::Wheel(Wheel::InMemory(InMemoryWheel { + remote: distribution, + buffer, + }))) + } } - RemoteDistribution::Url(.., url) => { - // Fetch the wheel. - let reader = client.stream_external(url).await?; + } else { + let distribution_ref = RemoteDistributionRef::from(&distribution); + let source = Source::try_from(&distribution_ref)?; + let (sdist_file, subdirectory) = match source { + Source::RegistryUrl(url) => { + debug!("Fetching source distribution from registry: {url}"); - // Read into a buffer. - let mut buffer = Vec::with_capacity(1024 * 1024); - let mut reader = tokio::io::BufReader::new(reader.compat()); - tokio::io::copy(&mut reader, &mut buffer).await?; + let reader = client.stream_external(&url).await?; + let mut reader = tokio::io::BufReader::new(reader.compat()); - Ok(InMemoryDistribution { remote, buffer }) - } + // Download the source distribution. + let temp_dir = tempfile::tempdir_in(cache)?.into_path(); + let sdist_filename = distribution.filename()?; + let sdist_file = temp_dir.join(sdist_filename.as_ref()); + let mut writer = tokio::fs::File::create(&sdist_file).await?; + tokio::io::copy(&mut reader, &mut writer).await?; + + // Registry dependencies can't specify a subdirectory. + let subdirectory = None; + + (sdist_file, subdirectory) + } + Source::RemoteUrl(url, subdirectory) => { + debug!("Fetching source distribution from URL: {url}"); + + let reader = client.stream_external(url).await?; + let mut reader = tokio::io::BufReader::new(reader.compat()); + + // Download the source distribution. + let temp_dir = tempfile::tempdir_in(cache)?.into_path(); + let sdist_filename = distribution.filename()?; + let sdist_file = temp_dir.join(sdist_filename.as_ref()); + let mut writer = tokio::fs::File::create(&sdist_file).await?; + tokio::io::copy(&mut reader, &mut writer).await?; + + (sdist_file, subdirectory) + } + Source::Git(git, subdirectory) => { + debug!("Fetching source distribution from Git: {git}"); + + let git_dir = cache.join(GIT_CACHE); + let source = GitSource::new(git, git_dir); + let sdist_file = tokio::task::spawn_blocking(move || source.fetch()) + .await?? + .into(); + + (sdist_file, subdirectory) + } + }; + + Ok(Download::SourceDistribution(SourceDistribution { + remote: distribution, + sdist_file, + subdirectory, + })) } } @@ -145,3 +212,84 @@ pub trait Reporter: Send + Sync { /// Callback to invoke when the operation is complete. fn on_download_complete(&self); } + +/// A downloaded wheel that's stored in-memory. +#[derive(Debug)] +pub struct InMemoryWheel { + /// The remote file from which this wheel was downloaded. + pub(crate) remote: RemoteDistribution, + /// The contents of the wheel. + pub(crate) buffer: Vec, +} + +/// A downloaded wheel that's stored on-disk. +#[derive(Debug)] +pub struct DiskWheel { + /// The remote file from which this wheel was downloaded. + pub(crate) remote: RemoteDistribution, + /// The path to the downloaded wheel. + pub(crate) path: PathBuf, +} + +/// A downloaded wheel. +#[derive(Debug)] +pub enum Wheel { + InMemory(InMemoryWheel), + Disk(DiskWheel), +} + +impl Wheel { + /// Return the [`RemoteDistribution`] from which this wheel was downloaded. + pub fn remote(&self) -> &RemoteDistribution { + match self { + Wheel::InMemory(wheel) => &wheel.remote, + Wheel::Disk(wheel) => &wheel.remote, + } + } +} + +impl std::fmt::Display for Wheel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.remote()) + } +} + +/// A downloaded source distribution. +#[derive(Debug, Clone)] +pub struct SourceDistribution { + /// The remote file from which this wheel was downloaded. + pub(crate) remote: RemoteDistribution, + /// The path to the downloaded archive or directory. + pub(crate) sdist_file: PathBuf, + /// The subdirectory within the archive or directory. + pub(crate) subdirectory: Option, +} + +impl std::fmt::Display for SourceDistribution { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.remote) + } +} + +/// A downloaded distribution, either a wheel or a source distribution. +#[derive(Debug)] +pub enum Download { + Wheel(Wheel), + SourceDistribution(SourceDistribution), +} + +impl Download { + /// Return the [`RemoteDistribution`] from which this distribution was downloaded. + pub fn remote(&self) -> &RemoteDistribution { + match self { + Download::Wheel(distribution) => distribution.remote(), + Download::SourceDistribution(distribution) => &distribution.remote, + } + } +} + +impl std::fmt::Display for Download { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.remote()) + } +} diff --git a/crates/puffin-installer/src/lib.rs b/crates/puffin-installer/src/lib.rs index 3ed82cffd..969ca5374 100644 --- a/crates/puffin-installer/src/lib.rs +++ b/crates/puffin-installer/src/lib.rs @@ -1,4 +1,5 @@ -pub use downloader::{Downloader, Reporter as DownloadReporter}; +pub use builder::{Builder, Reporter as BuildReporter}; +pub use downloader::{Download, Downloader, Reporter as DownloadReporter}; pub use installer::{Installer, Reporter as InstallReporter}; pub use plan::PartitionedRequirements; pub use registry_index::RegistryIndex; @@ -6,9 +7,11 @@ pub use site_packages::SitePackages; pub use uninstall::uninstall; pub use unzipper::{Reporter as UnzipReporter, Unzipper}; +mod builder; mod cache; mod downloader; mod installer; +mod locks; mod plan; mod registry_index; mod site_packages; diff --git a/crates/puffin-installer/src/locks.rs b/crates/puffin-installer/src/locks.rs new file mode 100644 index 000000000..40cf48671 --- /dev/null +++ b/crates/puffin-installer/src/locks.rs @@ -0,0 +1,19 @@ +use fxhash::FxHashMap; +use puffin_cache::RepositoryUrl; +use std::sync::Arc; +use tokio::sync::Mutex; +use url::Url; + +/// A set of locks used to prevent concurrent access to the same resource. +#[derive(Debug, Default)] +pub(crate) struct Locks(Mutex>>>); + +impl Locks { + /// Acquire a lock on the given resource. + pub(crate) async fn acquire(&self, url: &Url) -> Arc> { + let mut map = self.0.lock().await; + map.entry(puffin_cache::digest(&RepositoryUrl::new(url))) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone() + } +} diff --git a/crates/puffin-installer/src/plan.rs b/crates/puffin-installer/src/plan.rs index 680e038fd..2d6844ea6 100644 --- a/crates/puffin-installer/src/plan.rs +++ b/crates/puffin-installer/src/plan.rs @@ -46,6 +46,9 @@ impl PartitionedRequirements { for requirement in requirements { // Filter out already-installed packages. + // TODO(charlie): Detect packages installed via URL. Right now, like pip, we _always_ + // attempt to reinstall a package if it was installed via URL. This is often very + // fast, since the wheel is cached, but it should still be avoidable. if let Some(distribution) = site_packages.remove(&requirement.name) { if requirement.is_satisfied_by(distribution.version()) { debug!("Requirement already satisfied: {distribution}",); diff --git a/crates/puffin-installer/src/unzipper.rs b/crates/puffin-installer/src/unzipper.rs index f1c24efe0..5a712d86d 100644 --- a/crates/puffin-installer/src/unzipper.rs +++ b/crates/puffin-installer/src/unzipper.rs @@ -1,4 +1,5 @@ use std::cmp::Reverse; +use std::io::{Read, Seek}; use std::path::Path; use anyhow::Result; @@ -10,8 +11,8 @@ use zip::ZipArchive; use puffin_distribution::{CachedDistribution, RemoteDistribution}; use crate::cache::WheelCache; -use crate::downloader::InMemoryDistribution; -use crate::vendor::CloneableSeekableReader; +use crate::downloader::Wheel; +use crate::vendor::{CloneableSeekableReader, HasLength}; #[derive(Default)] pub struct Unzipper { @@ -30,7 +31,7 @@ impl Unzipper { /// Unzip a set of downloaded wheels. pub async fn unzip( &self, - downloads: Vec, + downloads: Vec, target: &Path, ) -> Result> { // Create the wheel cache subdirectory, if necessary. @@ -39,14 +40,17 @@ impl Unzipper { // Sort the wheels by size. let mut downloads = downloads; - downloads.sort_unstable_by_key(|wheel| Reverse(wheel.buffer.len())); + downloads.sort_unstable_by_key(|wheel| match wheel { + Wheel::Disk(_) => Reverse(usize::MIN), + Wheel::InMemory(wheel) => Reverse(wheel.buffer.len()), + }); let staging = tempfile::tempdir_in(wheel_cache.root())?; // Unpack the wheels into the cache. let mut wheels = Vec::with_capacity(downloads.len()); for download in downloads { - let remote = download.remote.clone(); + let remote = download.remote().clone(); debug!("Unpacking wheel: {remote}"); @@ -89,12 +93,17 @@ impl Unzipper { } /// Unzip a wheel into the target directory. -fn unzip_wheel(wheel: InMemoryDistribution, target: &Path) -> Result<()> { - // Read the wheel into a buffer. - let reader = std::io::Cursor::new(wheel.buffer); - let archive = ZipArchive::new(CloneableSeekableReader::new(reader))?; +fn unzip_wheel(wheel: Wheel, target: &Path) -> Result<()> { + match wheel { + Wheel::InMemory(wheel) => unzip_archive(std::io::Cursor::new(wheel.buffer), target), + Wheel::Disk(wheel) => unzip_archive(std::fs::File::open(wheel.path)?, target), + } +} +/// Unzip a zip archive into the target directory. +fn unzip_archive(reader: R, target: &Path) -> Result<()> { // Unzip in parallel. + let archive = ZipArchive::new(CloneableSeekableReader::new(reader))?; (0..archive.len()) .par_bridge() .map(|file_number| { diff --git a/crates/puffin-installer/src/vendor/mod.rs b/crates/puffin-installer/src/vendor/mod.rs index d55ab7ed7..3148e2edd 100644 --- a/crates/puffin-installer/src/vendor/mod.rs +++ b/crates/puffin-installer/src/vendor/mod.rs @@ -1,3 +1,3 @@ -pub(crate) use cloneable_seekable_reader::CloneableSeekableReader; +pub(crate) use cloneable_seekable_reader::{CloneableSeekableReader, HasLength}; mod cloneable_seekable_reader; diff --git a/crates/puffin-resolver/src/distribution/source_distribution.rs b/crates/puffin-resolver/src/distribution/source_distribution.rs index 31a9f9ea4..113e3f78d 100644 --- a/crates/puffin-resolver/src/distribution/source_distribution.rs +++ b/crates/puffin-resolver/src/distribution/source_distribution.rs @@ -1,3 +1,7 @@ +//! Fetch and build source distributions from remote sources. +//! +//! TODO(charlie): Unify with `crates/puffin-installer/src/sdist_builder.rs`. + use std::str::FromStr; use anyhow::Result;