Add source distribution support to pip-compile (#323)

## Summary

This is a first pass at adding source distribution support to the
installer.

The previous installation flow was:

1. Come up with a plan.
2. Find a distribution (specific file) for every package that we'll need to download.
3. Download those distributions.
4. Unzip them (since we assumed they were all wheels).
5. Install them into the virtual environment.

Now, Step (3) downloads both wheels and source distributions, and we
insert a step between Steps (3) and (4) to build any source
distributions into zipped wheels.
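
For a rough illustration of that new step, here's a minimal, self-contained sketch. The `Download` enum, `build_sdist` helper, and filenames below are stand-ins invented for this example, not the crate's actual API; the real code partitions downloads with `itertools::partition_map` and builds through the new `Builder` in `puffin_installer`.

```rust
/// Stand-in for a downloaded artifact: either a ready-to-use wheel or a
/// source distribution that still needs to be built.
#[derive(Debug)]
enum Download {
    Wheel(String),
    SourceDist(String),
}

/// Hypothetical build step: pretend to turn a downloaded sdist into a wheel.
fn build_sdist(sdist: &str) -> String {
    format!("{}-built.whl", sdist.trim_end_matches(".tar.gz"))
}

fn main() {
    let downloads = vec![
        Download::Wheel("flask-3.0.0-py3-none-any.whl".to_string()),
        Download::SourceDist("werkzeug-0.9.6.tar.gz".to_string()),
    ];

    // The new step between download and unzip: split wheels from sdists,
    // build each sdist into a wheel, then hand a single list of wheels on
    // to the unzip and install steps.
    let (wheels, sdists): (Vec<_>, Vec<_>) = downloads
        .into_iter()
        .partition(|download| matches!(download, Download::Wheel(_)));

    let built = sdists.into_iter().map(|download| match download {
        Download::SourceDist(sdist) => build_sdist(&sdist),
        Download::Wheel(wheel) => wheel,
    });

    let all_wheels: Vec<String> = wheels
        .into_iter()
        .map(|download| match download {
            Download::Wheel(wheel) => wheel,
            Download::SourceDist(sdist) => sdist,
        })
        .chain(built)
        .collect();

    println!("wheels to unzip and install: {all_wheels:?}");
}
```

The point is that by the time we reach the unzip step, everything is a wheel again, so the later steps only ever see wheels.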

There are a bunch of TODOs; the most important (IMO) is that we
basically have two implementations of downloading and building, split
between `puffin_installer` and `puffin_resolver` (namely in
`crates/puffin-resolver/src/distribution`). I didn't attempt to clean
that up here -- it's already a problem, and it's tied to the broader
work we need to do around unified caching and resource management.

Closes #243.
Charlie Marsh 2023-11-06 05:22:36 -08:00 committed by GitHub
parent b79a15b458
commit 6d672b8951
18 changed files with 745 additions and 79 deletions

Cargo.lock (generated)

@@ -2278,13 +2278,17 @@ dependencies = [
"install-wheel-rs",
"pep440_rs 0.3.12",
"pep508_rs",
"puffin-cache",
"puffin-client",
"puffin-distribution",
"puffin-git",
"puffin-interpreter",
"puffin-normalize",
"puffin-package",
"puffin-traits",
"rayon",
"tempfile",
"thiserror",
"tokio",
"tokio-util",
"tracing",

@@ -3,20 +3,22 @@ use std::path::Path;
use anyhow::{Context, Result};
use colored::Colorize;
use itertools::Itertools;
use itertools::{Either, Itertools};
use tracing::debug;
use fs_err as fs;
use install_wheel_rs::linker::LinkMode;
use pep508_rs::Requirement;
use platform_host::Platform;
use platform_tags::Tags;
use puffin_client::RegistryClientBuilder;
use puffin_dispatch::BuildDispatch;
use puffin_distribution::Distribution;
use puffin_installer::PartitionedRequirements;
use puffin_installer::{Builder, PartitionedRequirements};
use puffin_interpreter::Virtualenv;
use crate::commands::reporters::{
DownloadReporter, FinderReporter, InstallReporter, UnzipReporter,
BuildReporter, DownloadReporter, FinderReporter, InstallReporter, UnzipReporter,
};
use crate::commands::{elapsed, ExitStatus};
use crate::index_urls::IndexUrls;
@@ -55,7 +57,6 @@ pub(crate) async fn sync_requirements(
cache: &Path,
mut printer: Printer,
) -> Result<ExitStatus> {
// Audit the requirements.
let start = std::time::Instant::now();
// Detect the current Python interpreter.
@@ -163,6 +164,49 @@ pub(crate) async fn sync_requirements(
downloads
};
let (wheels, sdists): (Vec<_>, Vec<_>) =
downloads
.into_iter()
.partition_map(|download| match download {
puffin_installer::Download::Wheel(wheel) => Either::Left(wheel),
puffin_installer::Download::SourceDistribution(sdist) => Either::Right(sdist),
});
// Build any missing source distributions.
let sdists = if sdists.is_empty() {
vec![]
} else {
let start = std::time::Instant::now();
let build_dispatch = BuildDispatch::new(
RegistryClientBuilder::default().build(),
cache.to_path_buf(),
venv.interpreter_info().clone(),
fs::canonicalize(venv.python_executable())?,
);
let builder = Builder::new(&build_dispatch)
.with_reporter(BuildReporter::from(printer).with_length(sdists.len() as u64));
let wheels = builder.build(sdists).await?;
let s = if wheels.len() == 1 { "" } else { "s" };
writeln!(
printer,
"{}",
format!(
"Built {} in {}",
format!("{} package{}", wheels.len(), s).bold(),
elapsed(start.elapsed())
)
.dimmed()
)?;
wheels
};
let downloads = wheels.into_iter().chain(sdists).collect::<Vec<_>>();
// Unzip any downloaded distributions.
let unzips = if downloads.is_empty() {
vec![]

@@ -148,6 +148,41 @@ impl puffin_installer::InstallReporter for InstallReporter {
}
}
#[derive(Debug)]
pub(crate) struct BuildReporter {
progress: ProgressBar,
}
impl From<Printer> for BuildReporter {
fn from(printer: Printer) -> Self {
let progress = ProgressBar::with_draw_target(None, printer.target());
progress.set_style(
ProgressStyle::with_template("{bar:20} [{pos}/{len}] {wide_msg:.dim}").unwrap(),
);
progress.set_message("Building wheels...");
Self { progress }
}
}
impl BuildReporter {
#[must_use]
pub(crate) fn with_length(self, length: u64) -> Self {
self.progress.set_length(length);
self
}
}
impl puffin_installer::BuildReporter for BuildReporter {
fn on_progress(&self, wheel: &RemoteDistribution) {
self.progress.set_message(format!("{wheel}"));
self.progress.inc(1);
}
fn on_complete(&self) {
self.progress.finish_and_clear();
}
}
#[derive(Debug)]
pub(crate) struct ResolverReporter {
progress: ProgressBar,

@@ -569,3 +569,140 @@ fn install_url() -> Result<()> {
Ok(())
}
/// Install a package into a virtual environment from a Git repository.
#[test]
#[cfg(feature = "git")]
fn install_git() -> Result<()> {
let temp_dir = assert_fs::TempDir::new()?;
let cache_dir = assert_fs::TempDir::new()?;
let venv = temp_dir.child(".venv");
Command::new(get_cargo_bin(BIN_NAME))
.arg("venv")
.arg(venv.as_os_str())
.arg("--cache-dir")
.arg(cache_dir.path())
.current_dir(&temp_dir)
.assert()
.success();
venv.assert(predicates::path::is_dir());
let requirements_txt = temp_dir.child("requirements.txt");
requirements_txt.touch()?;
requirements_txt.write_str("werkzeug @ git+https://github.com/pallets/werkzeug.git@2.0.0")?;
insta::with_settings!({
filters => vec![
(r"(\d|\.)+(ms|s)", "[TIME]"),
]
}, {
assert_cmd_snapshot!(Command::new(get_cargo_bin(BIN_NAME))
.arg("pip-sync")
.arg("requirements.txt")
.arg("--cache-dir")
.arg(cache_dir.path())
.env("VIRTUAL_ENV", venv.as_os_str())
.current_dir(&temp_dir));
});
Command::new(venv.join("bin").join("python"))
.arg("-c")
.arg("import werkzeug")
.current_dir(&temp_dir)
.assert()
.success();
Ok(())
}
/// Install two packages from the same Git repository.
#[test]
#[cfg(feature = "git")]
fn install_git_subdirectories() -> Result<()> {
let temp_dir = assert_fs::TempDir::new()?;
let cache_dir = assert_fs::TempDir::new()?;
let venv = temp_dir.child(".venv");
Command::new(get_cargo_bin(BIN_NAME))
.arg("venv")
.arg(venv.as_os_str())
.arg("--cache-dir")
.arg(cache_dir.path())
.current_dir(&temp_dir)
.assert()
.success();
venv.assert(predicates::path::is_dir());
let requirements_txt = temp_dir.child("requirements.txt");
requirements_txt.touch()?;
requirements_txt.write_str("example-pkg-a @ git+https://github.com/pypa/sample-namespace-packages.git@df7530eeb8fa0cb7dbb8ecb28363e8e36bfa2f45#subdirectory=pkg_resources/pkg_a\nexample-pkg-b @ git+https://github.com/pypa/sample-namespace-packages.git@df7530eeb8fa0cb7dbb8ecb28363e8e36bfa2f45#subdirectory=pkg_resources/pkg_b")?;
insta::with_settings!({
filters => vec![
(r"(\d|\.)+(ms|s)", "[TIME]"),
]
}, {
assert_cmd_snapshot!(Command::new(get_cargo_bin(BIN_NAME))
.arg("pip-sync")
.arg("requirements.txt")
.arg("--cache-dir")
.arg(cache_dir.path())
.env("VIRTUAL_ENV", venv.as_os_str())
.current_dir(&temp_dir));
});
Command::new(venv.join("bin").join("python"))
.arg("-c")
.arg("import example_pkg")
.current_dir(&temp_dir)
.assert()
.success();
Ok(())
}
/// Install a source distribution into a virtual environment.
#[test]
fn install_sdist() -> Result<()> {
let temp_dir = assert_fs::TempDir::new()?;
let cache_dir = assert_fs::TempDir::new()?;
let venv = temp_dir.child(".venv");
Command::new(get_cargo_bin(BIN_NAME))
.arg("venv")
.arg(venv.as_os_str())
.arg("--cache-dir")
.arg(cache_dir.path())
.current_dir(&temp_dir)
.assert()
.success();
venv.assert(predicates::path::is_dir());
let requirements_txt = temp_dir.child("requirements.txt");
requirements_txt.touch()?;
requirements_txt.write_str("Werkzeug==0.9.6")?;
insta::with_settings!({
filters => vec![
(r"(\d|\.)+(ms|s)", "[TIME]"),
]
}, {
assert_cmd_snapshot!(Command::new(get_cargo_bin(BIN_NAME))
.arg("pip-sync")
.arg("requirements.txt")
.arg("--cache-dir")
.arg(cache_dir.path())
.env("VIRTUAL_ENV", venv.as_os_str())
.current_dir(&temp_dir));
});
Command::new(venv.join("bin").join("python"))
.arg("-c")
.arg("import werkzeug")
.current_dir(&temp_dir)
.assert()
.success();
Ok(())
}

@@ -0,0 +1,24 @@
---
source: crates/puffin-cli/tests/pip_sync.rs
info:
program: puffin
args:
- pip-sync
- requirements.txt
- "--cache-dir"
- /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpZNXBuT
env:
VIRTUAL_ENV: /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpBYANH7/.venv
---
success: true
exit_code: 0
----- stdout -----
----- stderr -----
Resolved 1 package in [TIME]
Downloaded 1 package in [TIME]
Built 1 package in [TIME]
Unzipped 1 package in [TIME]
Installed 1 package in [TIME]
+ werkzeug @ git+https://github.com/pallets/werkzeug.git@2.0.0

@@ -0,0 +1,25 @@
---
source: crates/puffin-cli/tests/pip_sync.rs
info:
program: puffin
args:
- pip-sync
- requirements.txt
- "--cache-dir"
- /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpJvtAcl
env:
VIRTUAL_ENV: /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpYIBn2G/.venv
---
success: true
exit_code: 0
----- stdout -----
----- stderr -----
Resolved 2 packages in [TIME]
Downloaded 2 packages in [TIME]
Built 2 packages in [TIME]
Unzipped 2 packages in [TIME]
Installed 2 packages in [TIME]
+ example-pkg-a @ git+https://github.com/pypa/sample-namespace-packages.git@df7530eeb8fa0cb7dbb8ecb28363e8e36bfa2f45#subdirectory=pkg_resources/pkg_a
+ example-pkg-b @ git+https://github.com/pypa/sample-namespace-packages.git@df7530eeb8fa0cb7dbb8ecb28363e8e36bfa2f45#subdirectory=pkg_resources/pkg_b

@@ -0,0 +1,24 @@
---
source: crates/puffin-cli/tests/pip_sync.rs
info:
program: puffin
args:
- pip-sync
- requirements.txt
- "--cache-dir"
- /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpTtWuay
env:
VIRTUAL_ENV: /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpdSdhVI/.venv
---
success: true
exit_code: 0
----- stdout -----
----- stderr -----
Resolved 1 package in [TIME]
Downloaded 1 package in [TIME]
Built 1 package in [TIME]
Unzipped 1 package in [TIME]
Installed 1 package in [TIME]
+ werkzeug==0.9.6

@@ -8,14 +8,14 @@ use std::pin::Pin;
use anyhow::Context;
use anyhow::Result;
use itertools::Itertools;
use itertools::{Either, Itertools};
use tracing::{debug, instrument};
use pep508_rs::Requirement;
use platform_tags::Tags;
use puffin_build::SourceDistributionBuilder;
use puffin_client::RegistryClient;
use puffin_installer::{Downloader, Installer, PartitionedRequirements, Unzipper};
use puffin_installer::{Builder, Downloader, Installer, PartitionedRequirements, Unzipper};
use puffin_interpreter::{InterpreterInfo, Virtualenv};
use puffin_resolver::{DistributionFinder, Manifest, PreReleaseMode, ResolutionMode, Resolver};
use puffin_traits::BuildContext;
@@ -144,6 +144,33 @@ impl BuildContext for BuildDispatch {
.context("Failed to download build dependencies")?
};
let (wheels, sdists): (Vec<_>, Vec<_>) =
downloads
.into_iter()
.partition_map(|download| match download {
puffin_installer::Download::Wheel(wheel) => Either::Left(wheel),
puffin_installer::Download::SourceDistribution(sdist) => {
Either::Right(sdist)
}
});
// Build any missing source distributions.
let sdists = if sdists.is_empty() {
vec![]
} else {
debug!(
"Building source distributions{}: {}",
if sdists.len() == 1 { "" } else { "s" },
sdists.iter().map(ToString::to_string).join(", ")
);
Builder::new(self)
.build(sdists)
.await
.context("Failed to build source distributions")?
};
let downloads = wheels.into_iter().chain(sdists).collect::<Vec<_>>();
// Unzip any downloaded distributions.
let unzips = if downloads.is_empty() {
vec![]

@@ -97,6 +97,31 @@ impl RemoteDistribution {
Self::Url(name, url)
}
/// Return the URL of the distribution.
pub fn url(&self) -> Result<Cow<'_, Url>> {
match self {
Self::Registry(_, _, file) => {
let url = Url::parse(&file.url)?;
Ok(Cow::Owned(url))
}
Self::Url(_, url) => Ok(Cow::Borrowed(url)),
}
}
/// Return the filename of the distribution.
pub fn filename(&self) -> Result<Cow<'_, str>> {
match self {
Self::Registry(_, _, file) => Ok(Cow::Borrowed(&file.filename)),
Self::Url(_, url) => {
let filename = url
.path_segments()
.and_then(std::iter::Iterator::last)
.ok_or_else(|| anyhow!("Could not parse filename from URL: {}", url))?;
Ok(Cow::Owned(filename.to_owned()))
}
}
}
/// Return the normalized [`PackageName`] of the distribution.
pub fn name(&self) -> &PackageName {
match self {
@@ -125,6 +150,17 @@ impl RemoteDistribution {
Self::Url(_name, url) => puffin_cache::digest(&CanonicalUrl::new(url)),
}
}
/// Returns `true` if this distribution is a wheel.
pub fn is_wheel(&self) -> bool {
let filename = match self {
Self::Registry(_name, _version, file) => &file.filename,
Self::Url(_name, url) => url.path(),
};
Path::new(filename)
.extension()
.is_some_and(|ext| ext.eq_ignore_ascii_case("whl"))
}
}
impl std::fmt::Display for RemoteDistribution {
@@ -379,3 +415,14 @@ impl std::fmt::Display for RemoteDistributionRef<'_> {
}
}
}
impl<'a> From<&'a RemoteDistribution> for RemoteDistributionRef<'a> {
fn from(dist: &'a RemoteDistribution) -> Self {
match dist {
RemoteDistribution::Registry(name, version, file) => {
Self::Registry(name, version, file)
}
RemoteDistribution::Url(name, url) => Self::Url(name, url),
}
}
}

@@ -13,11 +13,14 @@ license = { workspace = true }
install-wheel-rs = { path = "../install-wheel-rs", default-features = false }
pep440_rs = { path = "../pep440-rs" }
pep508_rs = { path = "../pep508-rs" }
puffin-cache = { path = "../puffin-cache" }
puffin-client = { path = "../puffin-client" }
puffin-distribution = { path = "../puffin-distribution" }
puffin-git = { path = "../puffin-git" }
puffin-interpreter = { path = "../puffin-interpreter" }
puffin-normalize = { path = "../puffin-normalize" }
puffin-package = { path = "../puffin-package" }
puffin-traits = { path = "../puffin-traits" }
distribution-filename = { path = "../distribution-filename" }
anyhow = { workspace = true }
@@ -26,6 +29,7 @@ fs-err = { workspace = true }
fxhash = { workspace = true }
rayon = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }

@@ -0,0 +1,109 @@
//! Build source distributions from downloaded archives.
//!
//! TODO(charlie): Unify with `crates/puffin-resolver/src/distribution/source_distribution.rs`.
use std::cmp::Reverse;
use anyhow::Result;
use fs_err::tokio as fs;
use tracing::debug;
use puffin_distribution::RemoteDistribution;
use puffin_traits::BuildContext;
use crate::downloader::{DiskWheel, SourceDistribution, Wheel};
const BUILT_WHEELS_CACHE: &str = "built-wheels-v0";
pub struct Builder<'a, T: BuildContext + Send + Sync> {
build_context: &'a T,
reporter: Option<Box<dyn Reporter>>,
}
impl<'a, T: BuildContext + Send + Sync> Builder<'a, T> {
/// Initialize a new source distribution downloader.
pub fn new(build_context: &'a T) -> Self {
Self {
build_context,
reporter: None,
}
}
/// Set the [`Reporter`] to use for this downloader.
#[must_use]
pub fn with_reporter(self, reporter: impl Reporter + 'static) -> Self {
Self {
reporter: Some(Box::new(reporter)),
..self
}
}
/// Build a set of source distributions.
pub async fn build(&'a self, distributions: Vec<SourceDistribution>) -> Result<Vec<Wheel>> {
// Sort the distributions by size.
let mut distributions = distributions;
distributions.sort_unstable_by_key(|distribution| match &distribution.remote {
RemoteDistribution::Registry(_package, _version, file) => Reverse(file.size),
RemoteDistribution::Url(_, _) => Reverse(usize::MIN),
});
// Build the distributions serially.
let mut builds = Vec::with_capacity(distributions.len());
for distribution in distributions {
debug!("Building source distribution: {distribution}");
let result = build_sdist(distribution, self.build_context).await?;
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_progress(result.remote());
}
builds.push(result);
}
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_complete();
}
Ok(builds)
}
}
/// Build a source distribution into a wheel.
async fn build_sdist<T: BuildContext + Send + Sync>(
distribution: SourceDistribution,
build_context: &T,
) -> Result<Wheel> {
// Create a directory for the wheel.
let wheel_dir = build_context
.cache()
.join(BUILT_WHEELS_CACHE)
.join(distribution.remote.id());
fs::create_dir_all(&wheel_dir).await?;
// Build the wheel.
// TODO(charlie): If this is a Git dependency, we should do another checkout. If the same
// repository is used by multiple dependencies, at multiple commits, the local checkout may now
// point to the wrong commit.
let disk_filename = build_context
.build_source_distribution(
&distribution.sdist_file,
distribution.subdirectory.as_deref(),
&wheel_dir,
)
.await?;
let wheel_filename = wheel_dir.join(disk_filename);
Ok(Wheel::Disk(DiskWheel {
remote: distribution.remote,
path: wheel_filename,
}))
}
pub trait Reporter: Send + Sync {
/// Callback to invoke when a source distribution is built.
fn on_progress(&self, wheel: &RemoteDistribution);
/// Callback to invoke when the operation is complete.
fn on_complete(&self);
}

@@ -1,5 +1,6 @@
use std::cmp::Reverse;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::Result;
use cacache::{Algorithm, Integrity};
@@ -9,20 +10,28 @@ use tracing::debug;
use url::Url;
use puffin_client::RegistryClient;
use puffin_distribution::RemoteDistribution;
use puffin_distribution::source::Source;
use puffin_distribution::{RemoteDistribution, RemoteDistributionRef};
use puffin_git::GitSource;
use crate::locks::Locks;
const GIT_CACHE: &str = "git-v0";
pub struct Downloader<'a> {
client: &'a RegistryClient,
cache: &'a Path,
locks: Arc<Locks>,
reporter: Option<Box<dyn Reporter>>,
}
impl<'a> Downloader<'a> {
/// Initialize a new downloader.
/// Initialize a new distribution downloader.
pub fn new(client: &'a RegistryClient, cache: &'a Path) -> Self {
Self {
client,
cache,
locks: Arc::new(Locks::default()),
reporter: None,
}
}
@@ -36,28 +45,29 @@ impl<'a> Downloader<'a> {
}
}
/// Install a set of wheels into a Python virtual environment.
/// Download a set of distributions.
pub async fn download(
&'a self,
wheels: Vec<RemoteDistribution>,
) -> Result<Vec<InMemoryDistribution>> {
// Sort the wheels by size.
let mut wheels = wheels;
wheels.sort_unstable_by_key(|wheel| match wheel {
distributions: Vec<RemoteDistribution>,
) -> Result<Vec<Download>> {
// Sort the distributions by size.
let mut distributions = distributions;
distributions.sort_unstable_by_key(|wheel| match wheel {
RemoteDistribution::Registry(_package, _version, file) => Reverse(file.size),
RemoteDistribution::Url(_, _) => Reverse(usize::MIN),
});
// Phase 1: Fetch the wheels in parallel.
// Fetch the distributions in parallel.
let mut fetches = JoinSet::new();
let mut downloads = Vec::with_capacity(wheels.len());
for remote in wheels {
debug!("Downloading wheel: {remote}");
let mut downloads = Vec::with_capacity(distributions.len());
for distribution in distributions {
debug!("Downloading wheel: {distribution}");
fetches.spawn(fetch_wheel(
remote.clone(),
fetches.spawn(fetch_distribution(
distribution.clone(),
self.client.clone(),
self.cache.to_path_buf(),
self.locks.clone(),
));
}
@@ -65,7 +75,7 @@ impl<'a> Downloader<'a> {
let result = result?;
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_download_progress(&result.remote);
reporter.on_download_progress(result.remote());
}
downloads.push(result);
@@ -79,62 +89,119 @@ impl<'a> Downloader<'a> {
}
}
#[derive(Debug)]
pub struct InMemoryDistribution {
/// The remote file from which this wheel was downloaded.
pub(crate) remote: RemoteDistribution,
/// The contents of the wheel.
pub(crate) buffer: Vec<u8>,
}
impl std::fmt::Display for InMemoryDistribution {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.remote)
}
}
/// Download a wheel to a given path.
async fn fetch_wheel(
remote: RemoteDistribution,
/// Download a built distribution (wheel) or source distribution (sdist).
async fn fetch_distribution(
distribution: RemoteDistribution,
client: RegistryClient,
cache: PathBuf,
) -> Result<InMemoryDistribution> {
match &remote {
RemoteDistribution::Registry(.., file) => {
// Parse the wheel's SRI.
let sri = Integrity::from_hex(&file.hashes.sha256, Algorithm::Sha256)?;
locks: Arc<Locks>,
) -> Result<Download> {
let url = distribution.url()?;
let lock = locks.acquire(&url).await;
let _guard = lock.lock().await;
// Read from the cache, if possible.
if let Ok(buffer) = cacache::read_hash(&cache, &sri).await {
debug!("Extracted wheel from cache: {remote}");
return Ok(InMemoryDistribution { remote, buffer });
if distribution.is_wheel() {
match &distribution {
RemoteDistribution::Registry(.., file) => {
// Parse the wheel's SRI.
let sri = Integrity::from_hex(&file.hashes.sha256, Algorithm::Sha256)?;
// Read from the cache, if possible.
if let Ok(buffer) = cacache::read_hash(&cache, &sri).await {
debug!("Extracted wheel from cache: {distribution}");
return Ok(Download::Wheel(Wheel::InMemory(InMemoryWheel {
remote: distribution,
buffer,
})));
}
// Fetch the wheel.
let url = Url::parse(&file.url)?;
let reader = client.stream_external(&url).await?;
// Read into a buffer.
let mut buffer = Vec::with_capacity(file.size);
let mut reader = tokio::io::BufReader::new(reader.compat());
tokio::io::copy(&mut reader, &mut buffer).await?;
// Write the buffer to the cache.
cacache::write_hash(&cache, &buffer).await?;
Ok(Download::Wheel(Wheel::InMemory(InMemoryWheel {
remote: distribution,
buffer,
})))
}
RemoteDistribution::Url(.., url) => {
// Fetch the wheel.
let reader = client.stream_external(url).await?;
// Fetch the wheel.
let url = Url::parse(&file.url)?;
let reader = client.stream_external(&url).await?;
// Read into a buffer.
let mut buffer = Vec::with_capacity(1024 * 1024);
let mut reader = tokio::io::BufReader::new(reader.compat());
tokio::io::copy(&mut reader, &mut buffer).await?;
// Read into a buffer.
let mut buffer = Vec::with_capacity(file.size);
let mut reader = tokio::io::BufReader::new(reader.compat());
tokio::io::copy(&mut reader, &mut buffer).await?;
// Write the buffer to the cache, if possible.
cacache::write_hash(&cache, &buffer).await?;
Ok(InMemoryDistribution { remote, buffer })
Ok(Download::Wheel(Wheel::InMemory(InMemoryWheel {
remote: distribution,
buffer,
})))
}
}
RemoteDistribution::Url(.., url) => {
// Fetch the wheel.
let reader = client.stream_external(url).await?;
} else {
let distribution_ref = RemoteDistributionRef::from(&distribution);
let source = Source::try_from(&distribution_ref)?;
let (sdist_file, subdirectory) = match source {
Source::RegistryUrl(url) => {
debug!("Fetching source distribution from registry: {url}");
// Read into a buffer.
let mut buffer = Vec::with_capacity(1024 * 1024);
let mut reader = tokio::io::BufReader::new(reader.compat());
tokio::io::copy(&mut reader, &mut buffer).await?;
let reader = client.stream_external(&url).await?;
let mut reader = tokio::io::BufReader::new(reader.compat());
Ok(InMemoryDistribution { remote, buffer })
}
// Download the source distribution.
let temp_dir = tempfile::tempdir_in(cache)?.into_path();
let sdist_filename = distribution.filename()?;
let sdist_file = temp_dir.join(sdist_filename.as_ref());
let mut writer = tokio::fs::File::create(&sdist_file).await?;
tokio::io::copy(&mut reader, &mut writer).await?;
// Registry dependencies can't specify a subdirectory.
let subdirectory = None;
(sdist_file, subdirectory)
}
Source::RemoteUrl(url, subdirectory) => {
debug!("Fetching source distribution from URL: {url}");
let reader = client.stream_external(url).await?;
let mut reader = tokio::io::BufReader::new(reader.compat());
// Download the source distribution.
let temp_dir = tempfile::tempdir_in(cache)?.into_path();
let sdist_filename = distribution.filename()?;
let sdist_file = temp_dir.join(sdist_filename.as_ref());
let mut writer = tokio::fs::File::create(&sdist_file).await?;
tokio::io::copy(&mut reader, &mut writer).await?;
(sdist_file, subdirectory)
}
Source::Git(git, subdirectory) => {
debug!("Fetching source distribution from Git: {git}");
let git_dir = cache.join(GIT_CACHE);
let source = GitSource::new(git, git_dir);
let sdist_file = tokio::task::spawn_blocking(move || source.fetch())
.await??
.into();
(sdist_file, subdirectory)
}
};
Ok(Download::SourceDistribution(SourceDistribution {
remote: distribution,
sdist_file,
subdirectory,
}))
}
}
@@ -145,3 +212,84 @@ pub trait Reporter: Send + Sync {
/// Callback to invoke when the operation is complete.
fn on_download_complete(&self);
}
/// A downloaded wheel that's stored in-memory.
#[derive(Debug)]
pub struct InMemoryWheel {
/// The remote file from which this wheel was downloaded.
pub(crate) remote: RemoteDistribution,
/// The contents of the wheel.
pub(crate) buffer: Vec<u8>,
}
/// A downloaded wheel that's stored on-disk.
#[derive(Debug)]
pub struct DiskWheel {
/// The remote file from which this wheel was downloaded.
pub(crate) remote: RemoteDistribution,
/// The path to the downloaded wheel.
pub(crate) path: PathBuf,
}
/// A downloaded wheel.
#[derive(Debug)]
pub enum Wheel {
InMemory(InMemoryWheel),
Disk(DiskWheel),
}
impl Wheel {
/// Return the [`RemoteDistribution`] from which this wheel was downloaded.
pub fn remote(&self) -> &RemoteDistribution {
match self {
Wheel::InMemory(wheel) => &wheel.remote,
Wheel::Disk(wheel) => &wheel.remote,
}
}
}
impl std::fmt::Display for Wheel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.remote())
}
}
/// A downloaded source distribution.
#[derive(Debug, Clone)]
pub struct SourceDistribution {
/// The remote file from which this wheel was downloaded.
pub(crate) remote: RemoteDistribution,
/// The path to the downloaded archive or directory.
pub(crate) sdist_file: PathBuf,
/// The subdirectory within the archive or directory.
pub(crate) subdirectory: Option<PathBuf>,
}
impl std::fmt::Display for SourceDistribution {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.remote)
}
}
/// A downloaded distribution, either a wheel or a source distribution.
#[derive(Debug)]
pub enum Download {
Wheel(Wheel),
SourceDistribution(SourceDistribution),
}
impl Download {
/// Return the [`RemoteDistribution`] from which this distribution was downloaded.
pub fn remote(&self) -> &RemoteDistribution {
match self {
Download::Wheel(distribution) => distribution.remote(),
Download::SourceDistribution(distribution) => &distribution.remote,
}
}
}
impl std::fmt::Display for Download {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.remote())
}
}

@@ -1,4 +1,5 @@
pub use downloader::{Downloader, Reporter as DownloadReporter};
pub use builder::{Builder, Reporter as BuildReporter};
pub use downloader::{Download, Downloader, Reporter as DownloadReporter};
pub use installer::{Installer, Reporter as InstallReporter};
pub use plan::PartitionedRequirements;
pub use registry_index::RegistryIndex;
@@ -6,9 +7,11 @@ pub use site_packages::SitePackages;
pub use uninstall::uninstall;
pub use unzipper::{Reporter as UnzipReporter, Unzipper};
mod builder;
mod cache;
mod downloader;
mod installer;
mod locks;
mod plan;
mod registry_index;
mod site_packages;

@@ -0,0 +1,19 @@
use fxhash::FxHashMap;
use puffin_cache::RepositoryUrl;
use std::sync::Arc;
use tokio::sync::Mutex;
use url::Url;
/// A set of locks used to prevent concurrent access to the same resource.
#[derive(Debug, Default)]
pub(crate) struct Locks(Mutex<FxHashMap<String, Arc<Mutex<()>>>>);
impl Locks {
/// Acquire a lock on the given resource.
pub(crate) async fn acquire(&self, url: &Url) -> Arc<Mutex<()>> {
let mut map = self.0.lock().await;
map.entry(puffin_cache::digest(&RepositoryUrl::new(url)))
.or_insert_with(|| Arc::new(Mutex::new(())))
.clone()
}
}

@@ -46,6 +46,9 @@ impl PartitionedRequirements {
for requirement in requirements {
// Filter out already-installed packages.
// TODO(charlie): Detect packages installed via URL. Right now, like pip, we _always_
// attempt to reinstall a package if it was installed via URL. This is often very
// fast, since the wheel is cached, but it should still be avoidable.
if let Some(distribution) = site_packages.remove(&requirement.name) {
if requirement.is_satisfied_by(distribution.version()) {
debug!("Requirement already satisfied: {distribution}",);

@@ -1,4 +1,5 @@
use std::cmp::Reverse;
use std::io::{Read, Seek};
use std::path::Path;
use anyhow::Result;
@@ -10,8 +11,8 @@ use zip::ZipArchive;
use puffin_distribution::{CachedDistribution, RemoteDistribution};
use crate::cache::WheelCache;
use crate::downloader::InMemoryDistribution;
use crate::vendor::CloneableSeekableReader;
use crate::downloader::Wheel;
use crate::vendor::{CloneableSeekableReader, HasLength};
#[derive(Default)]
pub struct Unzipper {
@@ -30,7 +31,7 @@ impl Unzipper {
/// Unzip a set of downloaded wheels.
pub async fn unzip(
&self,
downloads: Vec<InMemoryDistribution>,
downloads: Vec<Wheel>,
target: &Path,
) -> Result<Vec<CachedDistribution>> {
// Create the wheel cache subdirectory, if necessary.
@@ -39,14 +40,17 @@ impl Unzipper {
// Sort the wheels by size.
let mut downloads = downloads;
downloads.sort_unstable_by_key(|wheel| Reverse(wheel.buffer.len()));
downloads.sort_unstable_by_key(|wheel| match wheel {
Wheel::Disk(_) => Reverse(usize::MIN),
Wheel::InMemory(wheel) => Reverse(wheel.buffer.len()),
});
let staging = tempfile::tempdir_in(wheel_cache.root())?;
// Unpack the wheels into the cache.
let mut wheels = Vec::with_capacity(downloads.len());
for download in downloads {
let remote = download.remote.clone();
let remote = download.remote().clone();
debug!("Unpacking wheel: {remote}");
@@ -89,12 +93,17 @@ impl Unzipper {
}
/// Unzip a wheel into the target directory.
fn unzip_wheel(wheel: InMemoryDistribution, target: &Path) -> Result<()> {
// Read the wheel into a buffer.
let reader = std::io::Cursor::new(wheel.buffer);
let archive = ZipArchive::new(CloneableSeekableReader::new(reader))?;
fn unzip_wheel(wheel: Wheel, target: &Path) -> Result<()> {
match wheel {
Wheel::InMemory(wheel) => unzip_archive(std::io::Cursor::new(wheel.buffer), target),
Wheel::Disk(wheel) => unzip_archive(std::fs::File::open(wheel.path)?, target),
}
}
/// Unzip a zip archive into the target directory.
fn unzip_archive<R: Send + Read + Seek + HasLength>(reader: R, target: &Path) -> Result<()> {
// Unzip in parallel.
let archive = ZipArchive::new(CloneableSeekableReader::new(reader))?;
(0..archive.len())
.par_bridge()
.map(|file_number| {

@@ -1,3 +1,3 @@
pub(crate) use cloneable_seekable_reader::CloneableSeekableReader;
pub(crate) use cloneable_seekable_reader::{CloneableSeekableReader, HasLength};
mod cloneable_seekable_reader;

@@ -1,3 +1,7 @@
//! Fetch and build source distributions from remote sources.
//!
//! TODO(charlie): Unify with `crates/puffin-installer/src/sdist_builder.rs`.
use std::str::FromStr;
use anyhow::Result;