Source dist metadata refactor (#468)

## Summary and motivation

For a given source dist, we store the metadata of each wheel built
from it in `built-wheel-metadata-v0/pypi/<source dist
filename>/metadata.json`. During resolution, we check the cache status
of the source dist. If it is fresh, we check `metadata.json` for a
matching wheel: if there is one, we use its metadata; if there isn't,
we build one. If the source dist is stale, we build a wheel and
overwrite `metadata.json` with that single wheel. This PR thereby ties
the local built wheel metadata cache to the freshness of the remote
source dist. This functionality is available through
`SourceDistCachedBuilder`.
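
In pseudo-Rust, the lookup described above is roughly the following;
`CachedMetadata` mirrors the `metadata.json` layout, while `is_fresh`,
`find_compatible_wheel`, and `build_wheel` are illustrative stand-ins
for the real freshness check, tag matching, and build step:

```rust
use std::collections::HashMap;

/// Mirrors `metadata.json`: built wheel filename -> wheel metadata.
#[derive(Default)]
struct CachedMetadata {
    data: HashMap<String, String>, // metadata simplified to a string here
}

// Illustrative stand-ins for the real freshness check, wheel tag
// matching, and wheel build.
fn is_fresh(_source_dist: &str) -> bool {
    true
}
fn find_compatible_wheel(cache: &CachedMetadata) -> Option<String> {
    cache.data.values().next().cloned()
}
fn build_wheel(source_dist: &str) -> (String, String) {
    (format!("{source_dist}-built.whl"), "<metadata>".to_string())
}

fn resolve_metadata(cache: &mut CachedMetadata, source_dist: &str) -> String {
    if is_fresh(source_dist) {
        // Fresh source dist: reuse a matching wheel if we already built one...
        if let Some(metadata) = find_compatible_wheel(cache) {
            return metadata;
        }
        // ...otherwise build a wheel and add it alongside the existing entries.
        let (filename, metadata) = build_wheel(source_dist);
        cache.data.insert(filename, metadata.clone());
        metadata
    } else {
        // Stale source dist: build and overwrite `metadata.json` with that
        // single wheel, dropping entries built from the outdated version.
        let (filename, metadata) = build_wheel(source_dist);
        cache.data = HashMap::from([(filename, metadata.clone())]);
        metadata
    }
}

fn main() {
    let mut cache = CachedMetadata::default();
    let metadata = resolve_metadata(&mut cache, "django-allauth-0.51.0.tar.gz");
    println!("{metadata}");
}
```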

`puffin_installer::Builder`, `puffin_installer::Downloader`, and
`Fetcher` are removed; in their place there is a new `FetchAndBuild`,
which calls into the also-new `SourceDistCachedBuilder`.
`FetchAndBuild` is the new main high-level abstraction: it spawns
parallel fetching/building; for wheel metadata it calls into the
registry client, for wheel files it fetches them, and for source dists
it calls `SourceDistCachedBuilder`. It handles locks around builds, as
well as the newly added inter-process file locking for git operations.
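
The build locks follow a simple pattern: one async mutex per
distribution, handed out from a shared map, so two tasks never build
the same source dist concurrently. A minimal sketch of that pattern
(assuming `tokio`; the `Locks` helper and the plain-string distribution
id are illustrative):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::Mutex;

/// One async mutex per distribution, handed out from a shared map.
#[derive(Default)]
struct Locks(Mutex<HashMap<String, Arc<Mutex<()>>>>);

impl Locks {
    /// Return the lock for the given distribution, creating it on first use.
    async fn acquire(&self, distribution_id: &str) -> Arc<Mutex<()>> {
        let mut map = self.0.lock().await;
        map.entry(distribution_id.to_string()).or_default().clone()
    }
}

#[tokio::main]
async fn main() {
    let locks = Locks::default();
    let lock = locks.acquire("django-allauth-0.51.0.tar.gz").await;
    let _guard = lock.lock().await;
    // ...download and build the source dist while holding the guard...
}
```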

Fetching and building source distributions now happen in parallel in
`pip-sync`, i.e. we no longer have to wait for the largest wheel to
finish downloading before we start building source distributions.
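
The new `get_wheels` drives this with a bounded `buffer_unordered`
stream, sorting distributions largest-first so slow downloads overlap
with builds. Here is a compilable toy version of that loop (assuming
`tokio` and `futures`; `Dist`, `LocalWheel`, and `get_or_build_wheel`
are stand-ins for the real types):

```rust
use std::cmp::Reverse;

use futures::StreamExt;

/// Toy stand-ins for the real `Dist` and `LocalWheel` types.
struct Dist {
    name: &'static str,
    size: Option<usize>,
}
struct LocalWheel(&'static str);

/// Pretend to either download a wheel or build one from a source dist.
async fn get_or_build_wheel(dist: Dist) -> Result<LocalWheel, std::io::Error> {
    Ok(LocalWheel(dist.name))
}

async fn get_wheels(mut dists: Vec<Dist>) -> Result<Vec<LocalWheel>, std::io::Error> {
    // Start the largest distributions first, so that slow downloads
    // overlap with source dist builds instead of trailing after them.
    dists.sort_unstable_by_key(|dist| Reverse(dist.size.unwrap_or(usize::MAX)));

    let mut wheels = Vec::with_capacity(dists.len());
    let mut fetches = futures::stream::iter(dists)
        .map(get_or_build_wheel)
        .buffer_unordered(50); // up to 50 fetches/builds in flight at once
    while let Some(wheel) = fetches.next().await.transpose()? {
        wheels.push(wheel);
    }
    Ok(wheels)
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let dists = vec![Dist { name: "flask", size: Some(101_000) }];
    println!("fetched {} wheels", get_wheels(dists).await?.len());
    Ok(())
}
```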

In a follow-up PR, I'll also clear built wheels when they've become
stale.

Another effect is that a fully cached resolution needs neither zip
reading nor email-format metadata parsing.

Closes #473

## Source dist cache structure

Entries by supported sources:
* `<build wheel metadata cache>/pypi/foo-1.0.0.zip/metadata.json`
* `<build wheel metadata cache>/<sha256(index-url)>/foo-1.0.0.zip/metadata.json`
* `<build wheel metadata cache>/url/<sha256(url)>/foo-1.0.0.zip/metadata.json`

But the url filename does not need to be a valid source dist filename
(<https://github.com/search?q=path%3A**%2Frequirements.txt+master.zip&type=code>),
so we have to accept any string as the filename (see the sketch after
this list):
* `<build wheel metadata cache>/url/<sha256(url)>/master.zip/metadata.json`
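
A rough sketch of how these cache paths are computed; the `digest`
helper here is only a stand-in for the real hash of the canonicalized
URL:

```rust
use std::path::PathBuf;

/// The supported source buckets.
enum WheelMetadataCache {
    PypiIndex,
    Index(String), // alternative index, keyed by index url
    Url(String),   // direct url dependency, keyed by url
    Git(String),   // git dependency, keyed by repository url
}

impl WheelMetadataCache {
    fn built_wheel_dir(&self, filename: &str) -> PathBuf {
        let bucket = match self {
            Self::PypiIndex => PathBuf::from("pypi"),
            Self::Index(url) => PathBuf::from("index").join(digest(url)),
            Self::Url(url) => PathBuf::from("url").join(digest(url)),
            Self::Git(url) => PathBuf::from("git").join(digest(url)),
        };
        PathBuf::from("built-wheel-metadata-v0")
            .join(bucket)
            .join(filename)
    }
}

fn digest(url: &str) -> String {
    // Stand-in only: the real implementation hashes the canonicalized URL.
    format!("{:016x}", url.len())
}

fn main() {
    let cache = WheelMetadataCache::Url("https://example.com/master.zip".to_string());
    // e.g. built-wheel-metadata-v0/url/<digest>/master.zip/metadata.json
    println!("{}", cache.built_wheel_dir("master.zip").join("metadata.json").display());
}
```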

Example:
```text
# git source dist
pydantic-extra-types @ git+https://github.com/pydantic/pydantic-extra-types.git
# pypi source dist
django_allauth==0.51.0
# url source dist
werkzeug @ https://files.pythonhosted.org/packages/0d/cc/ff1904eb5eb4b455e442834dabf9427331ac0fa02853bf83db817a7dd53d/werkzeug-3.0.1.tar.gz
```
will be stored as
```text
built-wheel-metadata-v0
├── git
│   └── 5c56bc1c58c34c11
│       └── 843b753e9e8cb74e83cac55598719b39a4d5ef1f
│           └── metadata.json
├── pypi
│   └── django-allauth-0.51.0.tar.gz
│       └── metadata.json
└── url
    └── 6781bd6440ae72c2
        └── werkzeug-3.0.1.tar.gz
            └── metadata.json
```

The contents of a `metadata.json`:
```json
{
  "data": {
    "django_allauth-0.51.0-py3-none-any.whl": {
      "metadata-version": "2.1",
      "name": "django-allauth",
      "version": "0.51.0",
      ...
    }
  }
}
```
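
Reading this back is plain serde deserialization. A sketch (assuming
`serde` with the derive feature and `serde_json`) that leaves the
per-wheel metadata untyped rather than asserting the exact field set:

```rust
use std::collections::HashMap;

use serde::Deserialize;

/// Mirrors the `data` map shown above: built wheel filename -> metadata.
#[derive(Deserialize)]
struct CachedBuiltWheels {
    data: HashMap<String, serde_json::Value>,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let path = "built-wheel-metadata-v0/pypi/django-allauth-0.51.0.tar.gz/metadata.json";
    let cache: CachedBuiltWheels = serde_json::from_slice(&std::fs::read(path)?)?;
    for wheel_filename in cache.data.keys() {
        println!("cached built wheel: {wheel_filename}");
    }
    Ok(())
}
```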

Cargo.lock (generated)

@ -2374,7 +2374,6 @@ version = "0.0.1"
dependencies = [
"clap",
"directories",
"distribution-filename",
"fs-err",
"hex",
"pypi-types",
@ -2447,6 +2446,7 @@ dependencies = [
"async_http_range_reader",
"async_zip",
"distribution-filename",
"distribution-types",
"fs-err",
"futures",
"http",
@ -2479,6 +2479,7 @@ dependencies = [
"clap",
"colored",
"distribution-filename",
"distribution-types",
"fs-err",
"futures",
"gourgeist",
@ -2541,14 +2542,22 @@ dependencies = [
"distribution-filename",
"distribution-types",
"fs-err",
"fs2",
"futures",
"fxhash",
"install-wheel-rs",
"platform-tags",
"puffin-cache",
"puffin-client",
"puffin-git",
"puffin-normalize",
"puffin-traits",
"pypi-types",
"rayon",
"reqwest",
"serde",
"serde_json",
"sha2",
"tempfile",
"thiserror",
"tokio",
@ -2590,6 +2599,7 @@ dependencies = [
"install-wheel-rs",
"pep440_rs 0.3.12",
"pep508_rs",
"platform-tags",
"puffin-cache",
"puffin-client",
"puffin-distribution",
@ -2658,6 +2668,7 @@ dependencies = [
"futures",
"fxhash",
"gourgeist",
"http-cache-semantics",
"insta",
"install-wheel-rs",
"itertools 0.11.0",
@ -2677,6 +2688,8 @@ dependencies = [
"puffin-normalize",
"puffin-traits",
"pypi-types",
"reqwest",
"serde_json",
"sha2",
"tempfile",
"thiserror",


@ -15,6 +15,11 @@ pub enum DirectUrl {
Archive(DirectArchiveUrl),
}
/// A git repository url
///
/// Examples:
/// * `git+https://git.example.com/MyProject.git`
/// * `git+https://git.example.com/MyProject.git@v1.0#egg=pkg&subdirectory=pkg_dir`
#[derive(Debug)]
pub struct LocalFileUrl {
pub url: Url,
@ -26,6 +31,12 @@ pub struct DirectGitUrl {
pub subdirectory: Option<PathBuf>,
}
/// An archive url
///
/// Examples:
/// * wheel: `https://download.pytorch.org/whl/torch-2.0.1-cp39-cp39-manylinux2014_aarch64.whl#sha256=423e0ae257b756bb45a4b49072046772d1ad0c592265c5080070e0767da4e490`
/// * source dist, correctly named: `https://files.pythonhosted.org/packages/62/06/d5604a70d160f6a6ca5fd2ba25597c24abd5c5ca5f437263d177ac242308/tqdm-4.66.1.tar.gz`
/// * source dist, only extension recognizable: `https://github.com/foo-labs/foo/archive/master.zip#egg=pkg&subdirectory=packages/bar`
#[derive(Debug)]
pub struct DirectArchiveUrl {
pub url: Url,
@ -42,14 +53,7 @@ impl TryFrom<&Url> for DirectGitUrl {
type Error = Error;
fn try_from(url: &Url) -> Result<Self, Self::Error> {
// If the URL points to a subdirectory, extract it, as in:
// `https://git.example.com/MyProject.git@v1.0#subdirectory=pkg_dir`
// `https://git.example.com/MyProject.git@v1.0#egg=pkg&subdirectory=pkg_dir`
let subdirectory = url.fragment().and_then(|fragment| {
fragment
.split('&')
.find_map(|fragment| fragment.strip_prefix("subdirectory=").map(PathBuf::from))
});
let subdirectory = get_subdirectory(url);
let url = url
.as_str()
@ -63,20 +67,27 @@ impl TryFrom<&Url> for DirectGitUrl {
impl From<&Url> for DirectArchiveUrl {
fn from(url: &Url) -> Self {
// If the URL points to a subdirectory, extract it, as in:
// `https://git.example.com/MyProject.git@v1.0#subdirectory=pkg_dir`
// `https://git.example.com/MyProject.git@v1.0#egg=pkg&subdirectory=pkg_dir`
let subdirectory = url.fragment().and_then(|fragment| {
fragment
.split('&')
.find_map(|fragment| fragment.strip_prefix("subdirectory=").map(PathBuf::from))
});
let url = url.clone();
Self { url, subdirectory }
Self {
url: url.clone(),
subdirectory: get_subdirectory(url),
}
}
}
/// If the URL points to a subdirectory, extract it, as in (git):
/// `git+https://git.example.com/MyProject.git@v1.0#subdirectory=pkg_dir`
/// `git+https://git.example.com/MyProject.git@v1.0#egg=pkg&subdirectory=pkg_dir`
/// or (direct archive url):
/// `https://github.com/foo-labs/foo/archive/master.zip#subdirectory=packages/bar`
/// `https://github.com/foo-labs/foo/archive/master.zip#egg=pkg&subdirectory=packages/bar`
fn get_subdirectory(url: &Url) -> Option<PathBuf> {
let fragment = url.fragment()?;
let subdirectory = fragment
.split('&')
.find_map(|fragment| fragment.strip_prefix("subdirectory="))?;
Some(PathBuf::from(subdirectory))
}
impl TryFrom<&Url> for DirectUrl {
type Error = Error;


@ -240,27 +240,6 @@ impl Dist {
Dist::Source(source) => source.file(),
}
}
#[must_use]
pub fn with_url(self, url: Url) -> Self {
match self {
Self::Built(built) => Self::Built(match built {
BuiltDist::DirectUrl(dist) => {
BuiltDist::DirectUrl(DirectUrlBuiltDist { url, ..dist })
}
BuiltDist::Path(dist) => BuiltDist::Path(PathBuiltDist { url, ..dist }),
dist @ BuiltDist::Registry(_) => dist,
}),
Self::Source(source) => Self::Source(match source {
SourceDist::DirectUrl(dist) => {
SourceDist::DirectUrl(DirectUrlSourceDist { url, ..dist })
}
SourceDist::Git(dist) => SourceDist::Git(GitSourceDist { url, ..dist }),
SourceDist::Path(dist) => SourceDist::Path(PathSourceDist { url, ..dist }),
dist @ SourceDist::Registry(_) => dist,
}),
}
}
}
impl BuiltDist {


@ -14,7 +14,6 @@ license = { workspace = true }
workspace = true
[dependencies]
distribution-filename = { path = "../distribution-filename" }
pypi-types = { path = "../pypi-types" }
clap = { workspace = true, features = ["derive"], optional = true }


@ -1,4 +1,6 @@
use std::hash::{Hash, Hasher};
use std::ops::Deref;
use url::Url;
use crate::cache_key::{CacheKey, CacheKeyHasher};
@ -133,6 +135,14 @@ impl Hash for RepositoryUrl {
}
}
impl Deref for RepositoryUrl {
type Target = Url;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[cfg(test)]
mod tests {
use super::*;


@ -6,12 +6,13 @@ pub use canonical_url::{CanonicalUrl, RepositoryUrl};
#[cfg(feature = "clap")]
pub use cli::{CacheArgs, CacheDir};
pub use digest::digest;
pub use metadata::WheelMetadataCache;
mod cache_key;
mod canonical_url;
mod cli;
mod digest;
pub mod metadata;
mod metadata;
/// A trait for types that can be hashed in a stable way across versions and platforms.
pub trait StableHash {


@ -1,4 +1,4 @@
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use url::Url;
@ -6,33 +6,128 @@ use pypi_types::IndexUrl;
use crate::{digest, CanonicalUrl};
/// Cache for metadata from (remote) wheel files
const WHEEL_METADATA_CACHE: &str = "wheel-metadata-v0";
/// Cache for metadata from wheels built from source dists
const BUILT_WHEEL_METADATA_CACHE: &str = "built-wheel-metadata-v0";
/// Cache wheel metadata.
/// Cache wheel metadata, both from remote wheels and built from source distributions.
///
/// Wheel metadata can come from a remote wheel or from building a source
/// distribution. For a remote wheel, we try the following ways to fetch the metadata:
/// 1. From a [PEP 658](https://peps.python.org/pep-0658/) data-dist-info-metadata url
/// 2. From a remote wheel by partial zip reading
/// 3. From a (temp) download of a remote wheel (this is a fallback, the webserver should support range requests)
/// See [`WheelMetadataCache::wheel_dir`] for remote wheel metadata caching and
/// [`WheelMetadataCache::built_wheel_dir`] for caching of metadata of built source
/// distributions.
pub enum WheelMetadataCache {
/// Either pypi or an alternative index, which we key by index url
Index(IndexUrl),
Url,
/// A direct url dependency, which we key by url
Url(Url),
/// A git dependency, which we key by repository url. We use the revision as filename.
///
/// Note that this variant only exists for source distributions, wheels can't be delivered
/// through git.
Git(Url),
}
impl WheelMetadataCache {
fn bucket(&self) -> PathBuf {
match self {
WheelMetadataCache::Index(IndexUrl::Pypi) => PathBuf::from("pypi"),
WheelMetadataCache::Index(url) => {
PathBuf::from("index").join(digest(&CanonicalUrl::new(url)))
}
WheelMetadataCache::Url(url) => {
PathBuf::from("url").join(digest(&CanonicalUrl::new(url)))
}
WheelMetadataCache::Git(url) => {
PathBuf::from("git").join(digest(&CanonicalUrl::new(url)))
}
}
}
/// Metadata of a remote wheel
///
/// Cache structure:
/// * `<wheel metadata cache>/pypi/foo-1.0.0-py3-none-any.json`
/// * `<wheel metadata cache>/<digest(index-url)>/foo-1.0.0-py3-none-any.json`
/// * `<wheel metadata cache>/url/<digest(url)>/foo-1.0.0-py3-none-any.json`
pub fn cache_dir(&self, cache: &Path, url: &Url) -> PathBuf {
let cache_root = cache.join(WHEEL_METADATA_CACHE);
match self {
WheelMetadataCache::Index(IndexUrl::Pypi) => cache_root.join("pypi"),
WheelMetadataCache::Index(url) => cache_root
.join("index")
.join(digest(&CanonicalUrl::new(url))),
WheelMetadataCache::Url => cache_root.join("url").join(digest(&CanonicalUrl::new(url))),
}
///
/// See `puffin_client::RegistryClient::wheel_metadata` for information on how wheel metadata
/// is fetched.
///
/// # Example
/// ```text
/// # pypi wheel
/// pandas
/// # url wheel
/// flask @ https://files.pythonhosted.org/packages/36/42/015c23096649b908c809c69388a805a571a3bea44362fe87e33fc3afa01f/flask-3.0.0-py3-none-any.whl
/// ```
/// may be cached as
/// ```text
/// wheel-metadata-v0
/// ├── pypi
/// │ ...
/// │ ├── pandas-2.1.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.json
/// │ ...
/// └── url
/// └── 4b8be67c801a7ecb
/// └── flask-3.0.0-py3-none-any.json
/// ```
pub fn wheel_dir(&self) -> PathBuf {
PathBuf::from(WHEEL_METADATA_CACHE).join(self.bucket())
}
/// Metadata of a built source distribution
///
/// Cache structure:
/// * `<build wheel metadata cache>/pypi/foo-1.0.0.zip/metadata.json`
/// * `<build wheel metadata cache>/<sha256(index-url)>/foo-1.0.0.zip/metadata.json`
/// * `<build wheel metadata cache>/url/<sha256(url)>/foo-1.0.0.zip/metadata.json`
/// But the url filename does not need to be a valid source dist filename
/// (<https://github.com/search?q=path%3A**%2Frequirements.txt+master.zip&type=code>),
/// so it could also be the following and we have to take any string as filename:
/// * `<build wheel metadata cache>/url/<sha256(url)>/master.zip/metadata.json`
///
/// # Example
/// ```text
/// # git source dist
/// pydantic-extra-types @ git+https://github.com/pydantic/pydantic-extra-types.git
/// # pypi source dist
/// django_allauth==0.51.0
/// # url source dist
/// werkzeug @ https://files.pythonhosted.org/packages/0d/cc/ff1904eb5eb4b455e442834dabf9427331ac0fa02853bf83db817a7dd53d/werkzeug-3.0.1.tar.gz
/// ```
/// may be cached as
/// ```text
/// built-wheel-metadata-v0
/// ├── git
/// │ └── 5c56bc1c58c34c11
/// │ └── 843b753e9e8cb74e83cac55598719b39a4d5ef1f
/// │ └── metadata.json
/// ├── pypi
/// │ └── django-allauth-0.51.0.tar.gz
/// │ └── metadata.json
/// └── url
/// └── 6781bd6440ae72c2
/// └── werkzeug-3.0.1.tar.gz
/// └── metadata.json
/// ```
///
/// The inside of a `metadata.json`:
/// ```json
/// {
/// "data": {
/// "django_allauth-0.51.0-py3-none-any.whl": {
/// "metadata-version": "2.1",
/// "name": "django-allauth",
/// "version": "0.51.0",
/// ...
/// }
/// }
/// }
/// ```
pub fn built_wheel_dir(&self, filename: &str) -> PathBuf {
PathBuf::from(BUILT_WHEEL_METADATA_CACHE)
.join(self.bucket())
.join(filename)
}
}


@ -4,7 +4,7 @@ use std::path::Path;
use anyhow::{Context, Result};
use colored::Colorize;
use fs_err as fs;
use itertools::{Either, Itertools};
use itertools::Itertools;
use tracing::debug;
use distribution_types::{AnyDist, Metadata};
@ -14,13 +14,12 @@ use platform_host::Platform;
use platform_tags::Tags;
use puffin_client::RegistryClientBuilder;
use puffin_dispatch::BuildDispatch;
use puffin_installer::{Builder, InstallPlan};
use puffin_distribution::DistributionDatabase;
use puffin_installer::InstallPlan;
use puffin_interpreter::Virtualenv;
use pypi_types::Yanked;
use crate::commands::reporters::{
BuildReporter, DownloadReporter, FinderReporter, InstallReporter, UnzipReporter,
};
use crate::commands::reporters::{FetcherReporter, FinderReporter, InstallReporter, UnzipReporter};
use crate::commands::{elapsed, ExitStatus};
use crate::index_urls::IndexUrls;
use crate::printer::Printer;
@ -173,72 +172,34 @@ pub(crate) async fn sync_requirements(
}
// Download any missing distributions.
let downloads = if remote.is_empty() {
vec![]
} else {
let start = std::time::Instant::now();
let downloader = puffin_installer::Downloader::new(&client, cache)
.with_reporter(DownloadReporter::from(printer).with_length(remote.len() as u64))
.with_no_build(no_build);
let downloads = downloader
.download(remote)
.await
.context("Failed to download distributions")?;
let s = if downloads.len() == 1 { "" } else { "s" };
writeln!(
printer,
"{}",
format!(
"Downloaded {} in {}",
format!("{} package{}", downloads.len(), s).bold(),
elapsed(start.elapsed())
)
.dimmed()
)?;
downloads
};
let (wheels, sdists): (Vec<_>, Vec<_>) =
downloads
.into_iter()
.partition_map(|download| match download {
puffin_distribution::Download::Wheel(wheel) => Either::Left(wheel),
puffin_distribution::Download::SourceDist(sdist) => Either::Right(sdist),
});
// Build any missing source distributions.
let sdists = if sdists.is_empty() {
let wheels = if remote.is_empty() {
vec![]
} else {
let start = std::time::Instant::now();
let build_dispatch = BuildDispatch::new(
client,
client.clone(),
cache.to_path_buf(),
venv.interpreter().clone(),
fs::canonicalize(venv.python_executable())?,
no_build,
);
let builder = Builder::new(&build_dispatch)
.with_reporter(BuildReporter::from(printer).with_length(sdists.len() as u64));
let fetcher = DistributionDatabase::new(cache, &tags, &client, &build_dispatch)
.with_reporter(FetcherReporter::from(printer).with_length(remote.len() as u64));
let wheels = builder
.build(sdists)
let wheels = fetcher
.get_wheels(remote)
.await
.context("Failed to build source distributions")?;
.context("Failed to download and build distributions")?;
let s = if wheels.len() == 1 { "" } else { "s" };
let download_s = if wheels.len() == 1 { "" } else { "s" };
writeln!(
printer,
"{}",
format!(
"Built {} in {}",
format!("{} package{}", wheels.len(), s).bold(),
"Downloaded {} in {}",
format!("{} package{}", wheels.len(), download_s).bold(),
elapsed(start.elapsed())
)
.dimmed()
@ -247,19 +208,17 @@ pub(crate) async fn sync_requirements(
wheels
};
let downloads = wheels.into_iter().chain(sdists).collect::<Vec<_>>();
// Unzip any downloaded distributions.
let unzips = if downloads.is_empty() {
let unzips = if wheels.is_empty() {
vec![]
} else {
let start = std::time::Instant::now();
let unzipper = puffin_installer::Unzipper::default()
.with_reporter(UnzipReporter::from(printer).with_length(downloads.len() as u64));
.with_reporter(UnzipReporter::from(printer).with_length(wheels.len() as u64));
let unzips = unzipper
.unzip(downloads, cache)
.unzip(wheels, cache)
.await
.context("Failed to unpack wheels")?;


@ -5,7 +5,7 @@ use colored::Colorize;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use url::Url;
use distribution_types::{CachedDist, Dist, Metadata, VersionOrUrl};
use distribution_types::{CachedDist, Dist, Metadata, SourceDist, VersionOrUrl};
use puffin_distribution::Download;
use puffin_normalize::ExtraName;
use puffin_normalize::PackageName;
@ -83,22 +83,36 @@ impl puffin_installer::UnzipReporter for UnzipReporter {
}
#[derive(Debug)]
pub(crate) struct DownloadReporter {
pub(crate) struct FetcherReporter {
printer: Printer,
multi_progress: MultiProgress,
progress: ProgressBar,
bars: Arc<Mutex<Vec<ProgressBar>>>,
}
impl From<Printer> for DownloadReporter {
impl From<Printer> for FetcherReporter {
fn from(printer: Printer) -> Self {
let progress = ProgressBar::with_draw_target(None, printer.target());
let multi_progress = MultiProgress::with_draw_target(printer.target());
let progress = multi_progress.add(ProgressBar::with_draw_target(None, printer.target()));
progress.enable_steady_tick(Duration::from_millis(200));
progress.set_style(
ProgressStyle::with_template("{bar:20} [{pos}/{len}] {wide_msg:.dim}").unwrap(),
ProgressStyle::with_template("{spinner:.white} {wide_msg:.dim}")
.unwrap()
.tick_strings(&["", "", "", "", "", "", "", "", "", ""]),
);
progress.set_message("Downloading wheels...");
Self { progress }
progress.set_message("Resolving dependencies...");
Self {
printer,
multi_progress,
progress,
bars: Arc::new(Mutex::new(Vec::new())),
}
}
}
impl DownloadReporter {
impl FetcherReporter {
#[must_use]
pub(crate) fn with_length(self, length: u64) -> Self {
self.progress.set_length(length);
@ -106,15 +120,74 @@ impl DownloadReporter {
}
}
impl puffin_installer::DownloadReporter for DownloadReporter {
impl puffin_distribution::Reporter for FetcherReporter {
fn on_download_progress(&self, download: &Download) {
self.progress.set_message(format!("{download}"));
self.progress.inc(1);
}
fn on_download_complete(&self) {
fn on_build_start(&self, dist: &SourceDist) -> usize {
let progress = self.multi_progress.insert_before(
&self.progress,
ProgressBar::with_draw_target(None, self.printer.target()),
);
progress.set_style(ProgressStyle::with_template("{wide_msg}").unwrap());
progress.set_message(format!(
"{} {}",
"Building".bold().green(),
dist.to_color_string()
));
let mut bars = self.bars.lock().unwrap();
bars.push(progress);
bars.len() - 1
}
fn on_build_complete(&self, dist: &SourceDist, index: usize) {
let bars = self.bars.lock().unwrap();
let progress = &bars[index];
progress.finish_with_message(format!(
"{} {}",
"Built".bold().green(),
dist.to_color_string()
));
}
fn on_download_and_build_complete(&self) {
self.progress.finish_and_clear();
}
fn on_checkout_start(&self, url: &Url, rev: &str) -> usize {
let progress = self.multi_progress.insert_before(
&self.progress,
ProgressBar::with_draw_target(None, self.printer.target()),
);
progress.set_style(ProgressStyle::with_template("{wide_msg}").unwrap());
progress.set_message(format!(
"{} {} ({})",
"Updating".bold().green(),
url,
rev.dimmed()
));
progress.finish();
let mut bars = self.bars.lock().unwrap();
bars.push(progress);
bars.len() - 1
}
fn on_checkout_complete(&self, url: &Url, rev: &str, index: usize) {
let bars = self.bars.lock().unwrap();
let progress = &bars[index];
progress.finish_with_message(format!(
"{} {} ({})",
"Updated".bold().green(),
url,
rev.dimmed()
));
}
}
#[derive(Debug)]
@ -152,41 +225,6 @@ impl puffin_installer::InstallReporter for InstallReporter {
}
}
#[derive(Debug)]
pub(crate) struct BuildReporter {
progress: ProgressBar,
}
impl From<Printer> for BuildReporter {
fn from(printer: Printer) -> Self {
let progress = ProgressBar::with_draw_target(None, printer.target());
progress.set_style(
ProgressStyle::with_template("{bar:20} [{pos}/{len}] {wide_msg:.dim}").unwrap(),
);
progress.set_message("Building wheels...");
Self { progress }
}
}
impl BuildReporter {
#[must_use]
pub(crate) fn with_length(self, length: u64) -> Self {
self.progress.set_length(length);
self
}
}
impl puffin_installer::BuildReporter for BuildReporter {
fn on_progress(&self, wheel: &Dist) {
self.progress.set_message(format!("{wheel}"));
self.progress.inc(1);
}
fn on_complete(&self) {
self.progress.finish_and_clear();
}
}
#[derive(Debug)]
pub(crate) struct ResolverReporter {
printer: Printer,
@ -242,11 +280,16 @@ impl puffin_resolver::ResolverReporter for ResolverReporter {
}
}
fn on_download_progress(&self, download: &Download) {
self.progress.set_message(format!("{download}"));
self.progress.inc(1);
}
fn on_complete(&self) {
self.progress.finish_and_clear();
}
fn on_build_start(&self, dist: &Dist) -> usize {
fn on_build_start(&self, dist: &SourceDist) -> usize {
let progress = self.multi_progress.insert_before(
&self.progress,
ProgressBar::with_draw_target(None, self.printer.target()),
@ -264,7 +307,7 @@ impl puffin_resolver::ResolverReporter for ResolverReporter {
bars.len() - 1
}
fn on_build_complete(&self, dist: &Dist, index: usize) {
fn on_build_complete(&self, dist: &SourceDist, index: usize) {
let bars = self.bars.lock().unwrap();
let progress = &bars[index];
progress.finish_with_message(format!(
@ -274,6 +317,10 @@ impl puffin_resolver::ResolverReporter for ResolverReporter {
));
}
fn on_download_and_build_complete(&self) {
self.progress.finish_and_clear();
}
fn on_checkout_start(&self, url: &Url, rev: &str) -> usize {
let progress = self.multi_progress.insert_before(
&self.progress,
@ -318,3 +365,11 @@ impl ColorDisplay for &Dist {
format!("{}{}", name, version_or_url.to_string().dimmed())
}
}
impl ColorDisplay for SourceDist {
fn to_color_string(&self) -> String {
let name = self.name();
let version_or_url = self.version_or_url();
format!("{}{}", name, version_or_url.to_string().dimmed())
}
}


@ -6,14 +6,17 @@ info:
- pip-compile
- requirements.in
- "--cache-dir"
- /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpGuzVHM
- /tmp/.tmpR1rZex
- "--exclude-newer"
- "2023-11-18T12:00:00Z"
env:
VIRTUAL_ENV: /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpwgTo7l/.venv
VIRTUAL_ENV: /tmp/.tmpy0g4rP/.venv
---
success: false
exit_code: 2
----- stdout -----
----- stderr -----
error: Package metadata name `flask` does not match given name `dask`
error: Failed to download and build dask @ git+https://github.com/pallets/flask.git@3.0.0
Caused by: Package metadata name `flask` does not match given name `dask`


@ -6,9 +6,9 @@ info:
- pip-sync
- requirements.txt
- "--cache-dir"
- /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpMEVMlO
- /tmp/.tmpoJsrzd
env:
VIRTUAL_ENV: /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpevmjat/.venv
VIRTUAL_ENV: /tmp/.tmpkdU3Lp/.venv
---
success: true
exit_code: 0
@ -17,7 +17,6 @@ exit_code: 0
----- stderr -----
Resolved 1 package in [TIME]
Downloaded 1 package in [TIME]
Built 1 package in [TIME]
Unzipped 1 package in [TIME]
Installed 1 package in [TIME]
+ werkzeug @ git+https://github.com/pallets/werkzeug.git@af160e0b6b7ddd81c22f1652c728ff5ac72d5c74


@ -6,9 +6,9 @@ info:
- pip-sync
- requirements.txt
- "--cache-dir"
- /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpJvtAcl
- /tmp/.tmpLAs2NP
env:
VIRTUAL_ENV: /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpYIBn2G/.venv
VIRTUAL_ENV: /tmp/.tmpVSMTrb/.venv
---
success: true
exit_code: 0
@ -17,7 +17,6 @@ exit_code: 0
----- stderr -----
Resolved 2 packages in [TIME]
Downloaded 2 packages in [TIME]
Built 2 packages in [TIME]
Unzipped 2 packages in [TIME]
Installed 2 packages in [TIME]
+ example-pkg-a @ git+https://github.com/pypa/sample-namespace-packages.git@df7530eeb8fa0cb7dbb8ecb28363e8e36bfa2f45#subdirectory=pkg_resources/pkg_a


@ -6,9 +6,9 @@ info:
- pip-sync
- requirements.txt
- "--cache-dir"
- /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpaUnvqN
- /tmp/.tmpHd5hvb
env:
VIRTUAL_ENV: /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpd1OThU/.venv
VIRTUAL_ENV: /tmp/.tmpvqrQH1/.venv
---
success: true
exit_code: 0
@ -17,7 +17,6 @@ exit_code: 0
----- stderr -----
Resolved 1 package in [TIME]
Downloaded 1 package in [TIME]
Built 1 package in [TIME]
Unzipped 1 package in [TIME]
Installed 1 package in [TIME]
+ werkzeug @ git+https://github.com/pallets/werkzeug.git@2.0.0


@ -6,9 +6,9 @@ info:
- pip-sync
- requirements.txt
- "--cache-dir"
- /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmp1HfKKY
- /tmp/.tmpvKBMwz
env:
VIRTUAL_ENV: /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpBmYNQk/.venv
VIRTUAL_ENV: /tmp/.tmpDB96k4/.venv
---
success: true
exit_code: 0
@ -17,7 +17,6 @@ exit_code: 0
----- stderr -----
Resolved 1 package in [TIME]
Downloaded 1 package in [TIME]
Built 1 package in [TIME]
Unzipped 1 package in [TIME]
Installed 1 package in [TIME]
+ flask @ file://[TEMP_DIR]/flask-3.0.0.tar.gz


@ -6,9 +6,9 @@ info:
- pip-sync
- requirements.txt
- "--cache-dir"
- /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpTtWuay
- /tmp/.tmpVf4dq6
env:
VIRTUAL_ENV: /var/folders/nt/6gf2v7_s3k13zq_t3944rwz40000gn/T/.tmpdSdhVI/.venv
VIRTUAL_ENV: /tmp/.tmpbJAXdQ/.venv
---
success: true
exit_code: 0
@ -17,7 +17,6 @@ exit_code: 0
----- stderr -----
Resolved 1 package in [TIME]
Downloaded 1 package in [TIME]
Built 1 package in [TIME]
Unzipped 1 package in [TIME]
Installed 1 package in [TIME]
+ werkzeug==0.9.6


@ -5,13 +5,14 @@ edition = "2021"
[dependencies]
distribution-filename = { path = "../distribution-filename" }
distribution-types = { path = "../distribution-types" }
install-wheel-rs = { path = "../install-wheel-rs" }
puffin-cache = { path = "../puffin-cache" }
puffin-normalize = { path = "../puffin-normalize" }
pypi-types = { path = "../pypi-types" }
async_http_range_reader = { workspace = true }
async_zip = { workspace = true }
async_zip = { workspace = true, features = ["tokio"] }
futures = { workspace = true }
fs-err = { workspace = true, features = ["tokio"] }
http = { workspace = true }


@ -3,13 +3,33 @@ use std::path::Path;
use std::time::SystemTime;
use http_cache_semantics::{AfterResponse, BeforeRequest, CachePolicy};
use reqwest::{Client, Request, Response};
use reqwest::{Request, Response};
use reqwest_middleware::ClientWithMiddleware;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tempfile::NamedTempFile;
use tracing::{trace, warn};
use crate::error::Error;
/// Either a cached client error or a (user specified) error from the callback
pub enum CachedClientError<CallbackError> {
Client(crate::Error),
Callback(CallbackError),
}
impl<CallbackError> From<crate::Error> for CachedClientError<CallbackError> {
fn from(error: crate::Error) -> Self {
CachedClientError::Client(error)
}
}
impl From<CachedClientError<crate::Error>> for crate::Error {
fn from(error: CachedClientError<crate::Error>) -> crate::Error {
match error {
CachedClientError::Client(error) => error,
CachedClientError::Callback(error) => error,
}
}
}
#[derive(Debug)]
enum CachedResponse<Payload: Serialize> {
@ -25,15 +45,15 @@ enum CachedResponse<Payload: Serialize> {
/// Serialize the actual payload together with its caching information
#[derive(Debug, Deserialize, Serialize)]
struct DataWithCachePolicy<Payload: Serialize> {
data: Payload,
pub struct DataWithCachePolicy<Payload: Serialize> {
pub data: Payload,
cache_policy: CachePolicy,
}
/// Custom caching layer over [`reqwest::Client`] using `http-cache-semantics`.
///
/// This effective middleware takes inspiration from the `http-cache` crate, but unlike this crate,
/// we allow running an async callback on the response before caching. We use this to e.g. store a
/// The implementation takes inspiration from the `http-cache` crate, but adds support for running
/// an async callback on the response before caching. We use this to e.g. store a
/// parsed version of the wheel metadata and for our remote zip reader. In the latter case, we want
/// to read a single file from a remote zip using range requests (so we don't have to download the
/// entire file). We send a HEAD request in the caching layer to check if the remote file has
@ -44,21 +64,29 @@ struct DataWithCachePolicy<Payload: Serialize> {
/// transparently switch to a faster/smaller format.
///
/// Again unlike `http-cache`, the caller gets full control over the cache key with the assumption
/// that it's a file. TODO(konstin): Centralize the cache bucket management.
/// that it's a file.
#[derive(Debug, Clone)]
pub(crate) struct CachedClient(Client);
pub struct CachedClient(ClientWithMiddleware);
impl CachedClient {
pub(crate) fn new(client: Client) -> Self {
pub fn new(client: ClientWithMiddleware) -> Self {
Self(client)
}
/// Makes a cached request with a custom response transformation
/// The middleware is the retry strategy
pub fn uncached(&self) -> ClientWithMiddleware {
self.0.clone()
}
/// Make a cached request with a custom response transformation
///
/// If a new response was received (no prior cached response or modified on the remote), the
/// response through `response_callback` and only the result is cached and returned
pub(crate) async fn get_cached_with_callback<
/// response is passed through `response_callback` and only the result is cached and returned.
/// The `response_callback` is allowed to make subsequent requests, e.g. through the uncached
/// client.
pub async fn get_cached_with_callback<
Payload: Serialize + DeserializeOwned,
CallBackError,
Callback,
CallbackReturn,
>(
@ -67,10 +95,10 @@ impl CachedClient {
cache_dir: &Path,
filename: &str,
response_callback: Callback,
) -> Result<Payload, Error>
) -> Result<Payload, CachedClientError<CallBackError>>
where
Callback: FnOnce(Response) -> CallbackReturn,
CallbackReturn: Future<Output = Result<Payload, Error>>,
CallbackReturn: Future<Output = Result<Payload, CallBackError>>,
{
let cache_file = cache_dir.join(filename);
let cached = if let Ok(cached) = fs_err::tokio::read(&cache_file).await {
@ -94,21 +122,33 @@ impl CachedClient {
match cached_response {
CachedResponse::FreshCache(data) => Ok(data),
CachedResponse::NotModified(data_with_cache_policy) => {
let temp_file = NamedTempFile::new_in(cache_dir)?;
fs_err::tokio::write(&temp_file, &serde_json::to_vec(&data_with_cache_policy)?)
.await?;
temp_file.persist(cache_file)?;
let temp_file = NamedTempFile::new_in(cache_dir).map_err(crate::Error::from)?;
fs_err::tokio::write(
&temp_file,
&serde_json::to_vec(&data_with_cache_policy).map_err(crate::Error::from)?,
)
.await
.map_err(crate::Error::from)?;
temp_file.persist(cache_file).map_err(crate::Error::from)?;
Ok(data_with_cache_policy.data)
}
CachedResponse::ModifiedOrNew(res, cache_policy) => {
let data = response_callback(res).await?;
let data = response_callback(res)
.await
.map_err(|err| CachedClientError::Callback(err))?;
if let Some(cache_policy) = cache_policy {
let data_with_cache_policy = DataWithCachePolicy { data, cache_policy };
fs_err::tokio::create_dir_all(cache_dir).await?;
let temp_file = NamedTempFile::new_in(cache_dir)?;
fs_err::tokio::write(&temp_file, &serde_json::to_vec(&data_with_cache_policy)?)
.await?;
temp_file.persist(cache_file)?;
fs_err::tokio::create_dir_all(cache_dir)
.await
.map_err(crate::Error::from)?;
let temp_file = NamedTempFile::new_in(cache_dir).map_err(crate::Error::from)?;
fs_err::tokio::write(
&temp_file,
&serde_json::to_vec(&data_with_cache_policy).map_err(crate::Error::from)?,
)
.await
.map_err(crate::Error::from)?;
temp_file.persist(cache_file).map_err(crate::Error::from)?;
Ok(data_with_cache_policy.data)
} else {
Ok(data)
@ -122,7 +162,7 @@ impl CachedClient {
&self,
mut req: Request,
cached: Option<DataWithCachePolicy<T>>,
) -> Result<CachedResponse<T>, Error> {
) -> Result<CachedResponse<T>, crate::Error> {
// The converted types are from the specific `reqwest` types to the more generic `http`
// types
let mut converted_req = http::Request::try_from(
@ -196,7 +236,7 @@ impl CachedClient {
&self,
req: Request,
converted_req: http::Request<reqwest::Body>,
) -> Result<CachedResponse<T>, Error> {
) -> Result<CachedResponse<T>, crate::Error> {
trace!("{} {}", req.method(), req.url());
let res = self.0.execute(req).await?.error_for_status()?;
let mut converted_res = http::Response::new(());


@ -4,7 +4,7 @@ use std::str::FromStr;
use async_http_range_reader::{AsyncHttpRangeReader, AsyncHttpRangeReaderError};
use async_zip::tokio::read::seek::ZipFileReader;
use futures::{AsyncRead, StreamExt, TryStreamExt};
use futures::TryStreamExt;
use http_cache_reqwest::{CACacheManager, Cache, CacheMode, HttpCache, HttpCacheOptions};
use reqwest::{Client, ClientBuilder, Response, StatusCode};
use reqwest_middleware::ClientWithMiddleware;
@ -12,19 +12,19 @@ use reqwest_retry::policies::ExponentialBackoff;
use reqwest_retry::RetryTransientMiddleware;
use tempfile::tempfile;
use tokio::io::BufWriter;
use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{debug, trace};
use url::Url;
use distribution_filename::WheelFilename;
use distribution_types::{BuiltDist, Metadata};
use install_wheel_rs::find_dist_info;
use puffin_cache::metadata::WheelMetadataCache;
use puffin_cache::WheelMetadataCache;
use puffin_normalize::PackageName;
use pypi_types::{File, IndexUrl, Metadata21, SimpleJson};
use crate::cached_client::CachedClient;
use crate::error::Error;
use crate::remote_metadata::wheel_metadata_from_remote_zip;
use crate::{CachedClient, CachedClientError, Error};
/// A builder for an [`RegistryClient`].
#[derive(Debug, Clone)]
@ -114,18 +114,20 @@ impl RegistryClientBuilder {
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(self.retries);
let retry_strategy = RetryTransientMiddleware::new_with_policy(retry_policy);
let uncached_client_builder =
reqwest_middleware::ClientBuilder::new(client_raw.clone()).with(retry_strategy);
let uncached_client = reqwest_middleware::ClientBuilder::new(client_raw.clone())
.with(retry_strategy)
.build();
let cached_client = CachedClient::new(uncached_client.clone());
RegistryClient {
index: self.index,
extra_index: self.extra_index,
no_index: self.no_index,
client: client_builder.build(),
client_raw: client_raw.clone(),
uncached_client: uncached_client_builder.build(),
uncached_client,
cache: self.cache,
cached_client: CachedClient::new(client_raw),
cached_client,
}
}
}
@ -147,6 +149,10 @@ pub struct RegistryClient {
}
impl RegistryClient {
pub fn cached_client(&self) -> &CachedClient {
&self.cached_client
}
/// Fetch a package from the `PyPI` simple API.
pub async fn simple(&self, package_name: PackageName) -> Result<(IndexUrl, SimpleJson), Error> {
if self.no_index {
@ -197,8 +203,49 @@ impl RegistryClient {
.await?)
}
/// Fetch the metadata for a remote wheel file.
///
/// For a remote wheel, we try the following ways to fetch the metadata:
/// 1. From a [PEP 658](https://peps.python.org/pep-0658/) data-dist-info-metadata url
/// 2. From a remote wheel by partial zip reading
/// 3. From a (temp) download of a remote wheel (this is a fallback, the webserver should support range requests)
pub async fn wheel_metadata(&self, built_dist: &BuiltDist) -> Result<Metadata21, Error> {
let metadata = match &built_dist {
BuiltDist::Registry(wheel) => {
self.wheel_metadata_registry(wheel.index.clone(), wheel.file.clone())
.await?
}
BuiltDist::DirectUrl(wheel) => {
self.wheel_metadata_no_pep658(
&wheel.filename,
&wheel.url,
WheelMetadataCache::Url(wheel.url.clone()),
)
.await?
}
BuiltDist::Path(wheel) => {
let reader = fs_err::tokio::File::open(&wheel.path).await?;
Self::metadata_from_async_read(&wheel.filename, built_dist.to_string(), reader)
.await?
}
};
if metadata.name != *built_dist.name() {
return Err(Error::NameMismatch {
metadata: metadata.name,
given: built_dist.name().clone(),
});
}
Ok(metadata)
}
/// Fetch the metadata from a wheel file.
pub async fn wheel_metadata(&self, index: IndexUrl, file: File) -> Result<Metadata21, Error> {
async fn wheel_metadata_registry(
&self,
index: IndexUrl,
file: File,
) -> Result<Metadata21, Error> {
if self.no_index {
return Err(Error::NoIndex(file.filename));
}
@ -213,17 +260,20 @@ impl RegistryClient {
{
let url = Url::parse(&format!("{}.metadata", file.url))?;
let cache_dir = WheelMetadataCache::Index(index).cache_dir(&self.cache, &url);
let cache_dir = self
.cache
.join(WheelMetadataCache::Index(index).wheel_dir());
let cache_file = format!("{}.json", filename.stem());
let response_callback = |response: Response| async {
Metadata21::parse(response.bytes().await?.as_ref())
.map_err(|err| Error::MetadataParseError(filename, url.clone(), err))
.map_err(|err| Error::MetadataParseError(filename, url.to_string(), err))
};
let req = self.client_raw.get(url.clone()).build()?;
self.cached_client
Ok(self
.cached_client
.get_cached_with_callback(req, &cache_dir, &cache_file, response_callback)
.await
.await?)
} else {
// If we lack PEP 658 support, try using HTTP range requests to read only the
// `.dist-info/METADATA` file from the zip, and if that also fails, download the whole wheel
@ -234,7 +284,7 @@ impl RegistryClient {
}
/// Get the wheel metadata if it isn't available in an index through PEP 658
pub async fn wheel_metadata_no_pep658(
async fn wheel_metadata_no_pep658(
&self,
filename: &WheelFilename,
url: &Url,
@ -244,7 +294,7 @@ impl RegistryClient {
return Err(Error::NoIndex(url.to_string()));
}
let cache_dir = cache_shard.cache_dir(&self.cache, url);
let cache_dir = self.cache.join(cache_shard.wheel_dir());
let cache_file = format!("{}.json", filename.stem());
// This response callback is special, we actually make a number of subsequent requests to
@ -255,7 +305,7 @@ impl RegistryClient {
trace!("Getting metadata for {filename} by range request");
let text = wheel_metadata_from_remote_zip(filename, &mut reader).await?;
let metadata = Metadata21::parse(text.as_bytes())
.map_err(|err| Error::MetadataParseError(filename.clone(), url.clone(), err))?;
.map_err(|err| Error::MetadataParseError(filename.clone(), url.to_string(), err))?;
Ok(metadata)
};
@ -274,10 +324,10 @@ impl RegistryClient {
Ok(metadata) => {
return Ok(metadata);
}
Err(Error::AsyncHttpRangeReader(
Err(CachedClientError::Client(Error::AsyncHttpRangeReader(
AsyncHttpRangeReaderError::HttpRangeRequestUnsupported,
)) => {}
Err(err) => return Err(err),
))) => {}
Err(err) => return Err(err.into()),
}
// The range request version failed (this is bad, the webserver should support this), fall
@ -293,15 +343,23 @@ impl RegistryClient {
let mut writer = BufWriter::new(tokio::fs::File::from_std(temp_download));
let mut reader = self.stream_external(url).await?.compat();
tokio::io::copy(&mut reader, &mut writer).await?;
let temp_download = writer.into_inner();
let reader = writer.into_inner();
let mut reader = ZipFileReader::new(temp_download.compat())
Self::metadata_from_async_read(filename, url.to_string(), reader).await
}
async fn metadata_from_async_read(
filename: &WheelFilename,
debug_source: String,
reader: impl tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin,
) -> Result<Metadata21, Error> {
let mut zip_reader = ZipFileReader::with_tokio(reader)
.await
.map_err(|err| Error::Zip(filename.clone(), err))?;
let (metadata_idx, _dist_info_dir) = find_dist_info(
filename,
reader
zip_reader
.file()
.entries()
.iter()
@ -311,7 +369,7 @@ impl RegistryClient {
// Read the contents of the METADATA file
let mut contents = Vec::new();
reader
zip_reader
.reader_with_entry(metadata_idx)
.await
.map_err(|err| Error::Zip(filename.clone(), err))?
@ -320,7 +378,7 @@ impl RegistryClient {
.map_err(|err| Error::Zip(filename.clone(), err))?;
let metadata = Metadata21::parse(&contents)
.map_err(|err| Error::MetadataParseError(filename.clone(), url.clone(), err))?;
.map_err(|err| Error::MetadataParseError(filename.clone(), debug_source, err))?;
Ok(metadata)
}
@ -328,7 +386,7 @@ impl RegistryClient {
pub async fn stream_external(
&self,
url: &Url,
) -> Result<Box<dyn AsyncRead + Unpin + Send + Sync>, Error> {
) -> Result<Box<dyn futures::AsyncRead + Unpin + Send + Sync>, Error> {
if self.no_index {
return Err(Error::NoIndex(url.to_string()));
}
@ -340,10 +398,7 @@ impl RegistryClient {
.await?
.error_for_status()?
.bytes_stream()
.map(|r| match r {
Ok(bytes) => Ok(bytes),
Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
})
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
.into_async_read(),
))
}


@ -6,6 +6,7 @@ use thiserror::Error;
use url::Url;
use distribution_filename::{WheelFilename, WheelFilenameError};
use puffin_normalize::PackageName;
#[derive(Debug, Error)]
pub enum Error {
@ -27,8 +28,8 @@ pub enum Error {
PackageNotFound(String),
/// The metadata file could not be parsed.
#[error("Couldn't parse metadata in {0} ({1})")]
MetadataParseError(WheelFilename, Url, #[source] pypi_types::Error),
#[error("Couldn't parse metadata of {0} from {1}")]
MetadataParseError(WheelFilename, String, #[source] pypi_types::Error),
/// The metadata file was not found in the registry.
#[error("File `{0}` was not found in the registry at {1}.")]
@ -56,6 +57,12 @@ pub enum Error {
#[error("{0} is not a valid wheel filename")]
WheelFilename(#[from] WheelFilenameError),
#[error("Package metadata name `{metadata}` does not match given name `{given}`")]
NameMismatch {
given: PackageName,
metadata: PackageName,
},
#[error("The wheel {0} is not a valid zip file")]
Zip(WheelFilename, #[source] ZipError),


@ -1,3 +1,4 @@
pub use cached_client::{CachedClient, CachedClientError, DataWithCachePolicy};
pub use client::{RegistryClient, RegistryClientBuilder};
pub use error::Error;


@ -5,7 +5,7 @@ use tempfile::tempdir;
use url::Url;
use distribution_filename::WheelFilename;
use puffin_cache::metadata::WheelMetadataCache;
use distribution_types::{BuiltDist, DirectUrlBuiltDist};
use puffin_client::RegistryClientBuilder;
#[tokio::test]
@ -18,9 +18,11 @@ async fn remote_metadata_with_and_without_cache() -> Result<()> {
for _ in 0..2 {
let url = "https://files.pythonhosted.org/packages/00/e5/f12a80907d0884e6dff9c16d0c0114d81b8cd07dc3ae54c5e962cc83037e/tqdm-4.66.1-py3-none-any.whl";
let filename = WheelFilename::from_str(url.rsplit_once('/').unwrap().1)?;
let metadata = client
.wheel_metadata_no_pep658(&filename, &Url::parse(url)?, WheelMetadataCache::Url)
.await?;
let dist = BuiltDist::DirectUrl(DirectUrlBuiltDist {
filename,
url: Url::parse(url).unwrap(),
});
let metadata = client.wheel_metadata(&dist).await.unwrap();
assert_eq!(metadata.version.to_string(), "4.66.1");
}


@ -15,6 +15,7 @@ workspace = true
[dependencies]
distribution-filename = { path = "../distribution-filename" }
distribution-types = { path = "../distribution-types" }
gourgeist = { path = "../gourgeist" }
pep508_rs = { path = "../pep508-rs" }
platform-host = { path = "../platform-host" }


@ -1,11 +1,11 @@
use std::str::FromStr;
use anyhow::Result;
use clap::Parser;
use url::Url;
use anyhow::Result;
use distribution_filename::WheelFilename;
use puffin_cache::metadata::WheelMetadataCache;
use distribution_types::{BuiltDist, DirectUrlBuiltDist};
use puffin_cache::{CacheArgs, CacheDir};
use puffin_client::RegistryClientBuilder;
@ -30,7 +30,10 @@ pub(crate) async fn wheel_metadata(args: WheelMetadataArgs) -> Result<()> {
)?;
let metadata = client
.wheel_metadata_no_pep658(&filename, &args.url, WheelMetadataCache::Url)
.wheel_metadata(&BuiltDist::DirectUrl(DirectUrlBuiltDist {
filename,
url: args.url,
}))
.await?;
println!("{metadata:?}");
Ok(())


@ -8,7 +8,7 @@ use std::pin::Pin;
use anyhow::Result;
use anyhow::{bail, Context};
use itertools::{Either, Itertools};
use itertools::Itertools;
use tracing::{debug, instrument};
use distribution_types::Metadata;
@ -16,7 +16,8 @@ use pep508_rs::Requirement;
use platform_tags::Tags;
use puffin_build::{SourceBuild, SourceBuildContext};
use puffin_client::RegistryClient;
use puffin_installer::{Builder, Downloader, InstallPlan, Installer, Unzipper};
use puffin_distribution::DistributionDatabase;
use puffin_installer::{InstallPlan, Installer, Unzipper};
use puffin_interpreter::{Interpreter, Virtualenv};
use puffin_resolver::{DistFinder, Manifest, ResolutionOptions, Resolver};
use puffin_traits::BuildContext;
@ -139,57 +140,34 @@ impl BuildContext for BuildDispatch {
};
// Download any missing distributions.
let downloads = if remote.is_empty() {
let wheels = if remote.is_empty() {
vec![]
} else {
// TODO(konstin): Check that there is no endless recursion
let fetcher = DistributionDatabase::new(self.cache(), &tags, &self.client, self);
debug!(
"Downloading build requirement{}: {}",
"Downloading and building requirement{} for build: {}",
if remote.len() == 1 { "" } else { "s" },
remote.iter().map(ToString::to_string).join(", ")
);
Downloader::new(&self.client, &self.cache)
.with_no_build(self.no_build)
.download(remote)
fetcher
.get_wheels(remote)
.await
.context("Failed to download build dependencies")?
.context("Failed to download and build distributions")?
};
let (wheels, sdists): (Vec<_>, Vec<_>) =
downloads
.into_iter()
.partition_map(|download| match download {
puffin_distribution::Download::Wheel(wheel) => Either::Left(wheel),
puffin_distribution::Download::SourceDist(sdist) => Either::Right(sdist),
});
// Build any missing source distributions.
let sdists = if sdists.is_empty() {
vec![]
} else {
debug!(
"Building source distributions{}: {}",
if sdists.len() == 1 { "" } else { "s" },
sdists.iter().map(ToString::to_string).join(", ")
);
Builder::new(self)
.build(sdists)
.await
.context("Failed to build source distributions")?
};
let downloads = wheels.into_iter().chain(sdists).collect::<Vec<_>>();
// Unzip any downloaded distributions.
let unzips = if downloads.is_empty() {
let unzips = if wheels.is_empty() {
vec![]
} else {
debug!(
"Unzipping build requirement{}: {}",
if downloads.len() == 1 { "" } else { "s" },
downloads.iter().map(ToString::to_string).join(", ")
if wheels.len() == 1 { "" } else { "s" },
wheels.iter().map(ToString::to_string).join(", ")
);
Unzipper::default()
.unzip(downloads, &self.cache)
.unzip(wheels, &self.cache)
.await
.context("Failed to unpack build dependencies")?
};


@ -13,20 +13,28 @@ license = { workspace = true }
workspace = true
[dependencies]
distribution-filename = { path = "../distribution-filename" }
distribution-filename = { path = "../distribution-filename", features = ["serde"] }
distribution-types = { path = "../distribution-types" }
install-wheel-rs = { path = "../install-wheel-rs" }
platform-tags = { path = "../platform-tags" }
puffin-cache = { path = "../puffin-cache" }
puffin-client = { path = "../puffin-client" }
puffin-git = { path = "../puffin-git" }
puffin-normalize = { path = "../puffin-normalize" }
puffin-traits = { path = "../puffin-traits" }
pypi-types = { path = "../pypi-types" }
anyhow = { workspace = true }
bytesize = { workspace = true }
fs-err = { workspace = true }
fs2 = { workspace = true }
futures = { workspace = true }
fxhash = { workspace = true }
rayon = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true , features = ["derive"] }
serde_json = { workspace = true }
sha2 = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }


@ -0,0 +1,329 @@
use std::cmp::Reverse;
use std::io;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use bytesize::ByteSize;
use fs_err::tokio as fs;
use futures::StreamExt;
use thiserror::Error;
use tokio::task::JoinError;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::debug;
use url::Url;
use distribution_filename::{WheelFilename, WheelFilenameError};
use distribution_types::direct_url::DirectGitUrl;
use distribution_types::{BuiltDist, Dist, Metadata, RemoteSource, SourceDist};
use platform_tags::Tags;
use puffin_client::RegistryClient;
use puffin_git::GitSource;
use puffin_traits::BuildContext;
use pypi_types::Metadata21;
use crate::download::BuiltWheel;
use crate::locks::Locks;
use crate::reporter::Facade;
use crate::{
DiskWheel, Download, InMemoryWheel, LocalWheel, Reporter, SourceDistCachedBuilder,
SourceDistError,
};
// The cache subdirectory in which to store Git repositories.
const GIT_CACHE: &str = "git-v0";
// The cache subdirectory in which to store downloaded wheel archives.
const ARCHIVES_CACHE: &str = "archives-v0";
#[derive(Debug, Error)]
pub enum DistributionDatabaseError {
#[error("Failed to parse '{0}' as url")]
Url(String, #[source] url::ParseError),
#[error(transparent)]
WheelFilename(#[from] WheelFilenameError),
#[error(transparent)]
Client(#[from] puffin_client::Error),
#[error(transparent)]
Io(#[from] io::Error),
#[error(transparent)]
Distribution(#[from] distribution_types::Error),
#[error(transparent)]
SourceBuild(#[from] SourceDistError),
#[error("Failed to build")]
Build(#[source] anyhow::Error),
#[error("Git operation failed")]
Git(#[source] anyhow::Error),
/// Should not occur; I've only seen it when another task panicked
#[error("The task executor is broken, did some other task panic?")]
Join(#[from] JoinError),
#[error("Building source distributions is disabled")]
NoBuild,
}
/// A cached high-level interface to convert distributions (a requirement resolved to a location)
/// to a wheel or wheel metadata.
///
/// For wheel metadata, this happens by either fetching the metadata from the remote wheel or by
/// building the source distribution. For wheel files, either the wheel is downloaded or a source
/// distribution is downloaded, built and the new wheel gets returned.
///
/// All kinds of wheel sources (index, url, path) and source distribution source (index, url, path,
/// git) are supported.
///
/// This struct also has the task of acquiring locks around source dist builds in general and git
/// operations in particular.
pub struct DistributionDatabase<'a, Context: BuildContext + Send + Sync> {
cache: &'a Path,
reporter: Option<Arc<dyn Reporter>>,
locks: Arc<Locks>,
client: &'a RegistryClient,
build_context: &'a Context,
builder: SourceDistCachedBuilder<'a, Context>,
}
impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> {
pub fn new(
cache: &'a Path,
tags: &'a Tags,
client: &'a RegistryClient,
build_context: &'a Context,
) -> Self {
Self {
cache,
reporter: None,
locks: Arc::new(Locks::default()),
client,
build_context,
builder: SourceDistCachedBuilder::new(build_context, client.cached_client(), tags),
}
}
/// Set the [`Reporter`] to use for this source distribution fetcher.
#[must_use]
pub fn with_reporter(self, reporter: impl Reporter + 'static) -> Self {
let reporter = Arc::new(reporter);
Self {
reporter: Some(reporter.clone()),
builder: self.builder.with_reporter(reporter),
..self
}
}
/// In parallel, either fetch the wheel or fetch and build source distributions.
pub async fn get_wheels(
&self,
dists: Vec<Dist>,
) -> Result<Vec<LocalWheel>, DistributionDatabaseError> {
// Sort the distributions by size.
let mut dists = dists;
dists.sort_unstable_by_key(|distribution| {
Reverse(distribution.size().unwrap_or(usize::MAX))
});
// Optimization: Skip source dist download when we must not build them anyway
if self.build_context.no_build() && dists.iter().any(|dist| matches!(dist, Dist::Source(_)))
{
return Err(DistributionDatabaseError::NoBuild);
}
// Fetch the distributions in parallel.
let mut downloads_and_builds = Vec::with_capacity(dists.len());
let mut fetches = futures::stream::iter(dists)
.map(|dist| self.get_or_build_wheel(dist))
.buffer_unordered(50);
while let Some(result) = fetches.next().await.transpose()? {
downloads_and_builds.push(result);
}
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_download_and_build_complete();
}
Ok(downloads_and_builds)
}
/// Either fetch the wheel or fetch and build the source distribution
async fn get_or_build_wheel(
&self,
dist: Dist,
) -> Result<LocalWheel, DistributionDatabaseError> {
match &dist {
Dist::Built(BuiltDist::Registry(wheel)) => {
// Fetch the wheel.
let url = Url::parse(&wheel.file.url).map_err(|err| {
DistributionDatabaseError::Url(wheel.file.url.to_string(), err)
})?;
let filename = WheelFilename::from_str(&wheel.file.filename)?;
let reader = self.client.stream_external(&url).await?;
// If the file is greater than 5MB, write it to disk; otherwise, keep it in memory.
let small_size = if let Some(size) = wheel.file.size {
let byte_size = ByteSize::b(size as u64);
if byte_size < ByteSize::mb(5) {
Some(size)
} else {
None
}
} else {
None
};
let local_wheel = if let Some(small_size) = small_size {
debug!(
"Fetching in-memory wheel from registry: {dist} ({})",
ByteSize::b(small_size as u64)
);
// Read into a buffer.
let mut buffer = Vec::with_capacity(small_size);
let mut reader = tokio::io::BufReader::new(reader.compat());
tokio::io::copy(&mut reader, &mut buffer).await?;
LocalWheel::InMemory(InMemoryWheel {
dist: dist.clone(),
filename,
buffer,
})
} else {
let size =
small_size.map_or("unknown size".to_string(), |size| size.to_string());
debug!("Fetching disk-based wheel from registry: {dist} ({size})");
// Create a directory for the wheel.
// TODO(konstin): Change this when the built wheel naming scheme is fixed.
let wheel_dir = self.cache.join(ARCHIVES_CACHE).join(wheel.package_id());
fs::create_dir_all(&wheel_dir).await?;
// Download the wheel to a temporary file.
let wheel_filename = &wheel.file.filename;
let wheel_file = wheel_dir.join(wheel_filename);
let mut writer = tokio::fs::File::create(&wheel_file).await?;
tokio::io::copy(&mut reader.compat(), &mut writer).await?;
LocalWheel::Disk(DiskWheel {
dist: dist.clone(),
filename,
path: wheel_file,
})
};
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_download_progress(&Download::Wheel(local_wheel.clone()));
}
Ok(local_wheel)
}
Dist::Built(BuiltDist::DirectUrl(wheel)) => {
debug!("Fetching disk-based wheel from URL: {}", &wheel.url);
// Create a directory for the wheel.
// TODO(konstin): Change this when the built wheel naming scheme is fixed.
let wheel_dir = self.cache.join(ARCHIVES_CACHE).join(wheel.package_id());
fs::create_dir_all(&wheel_dir).await?;
// Fetch the wheel.
let reader = self.client.stream_external(&wheel.url).await?;
// Download the wheel to the directory.
let wheel_filename = wheel.filename()?;
let wheel_file = wheel_dir.join(wheel_filename);
let mut writer = tokio::fs::File::create(&wheel_file).await?;
tokio::io::copy(&mut reader.compat(), &mut writer).await?;
let local_wheel = LocalWheel::Disk(DiskWheel {
dist: dist.clone(),
filename: wheel.filename.clone(),
path: wheel_file,
});
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_download_progress(&Download::Wheel(local_wheel.clone()));
}
Ok(local_wheel)
}
Dist::Built(BuiltDist::Path(wheel)) => Ok(LocalWheel::Disk(DiskWheel {
dist: dist.clone(),
path: wheel.path.clone(),
filename: wheel.filename.clone(),
})),
Dist::Source(source_dist) => {
let lock = self.locks.acquire(&dist).await;
let _guard = lock.lock().await;
let (built_wheel, _) = self.builder.download_and_build(source_dist).await?;
Ok(LocalWheel::Built(BuiltWheel {
dist: dist.clone(),
filename: built_wheel.filename,
path: built_wheel.path,
}))
}
}
}
/// Either fetch only the wheel metadata (directly from the index or with range requests) or
/// fetch and build the source distribution
pub async fn get_or_build_wheel_metadata(
&self,
dist: &Dist,
) -> Result<(Metadata21, Option<Url>), DistributionDatabaseError> {
match dist {
Dist::Built(built_dist) => Ok((self.client.wheel_metadata(built_dist).await?, None)),
Dist::Source(source_dist) => {
// Optimization: Skip the source dist download when builds are disabled anyway.
if self.build_context.no_build() {
return Err(DistributionDatabaseError::NoBuild);
}
let lock = self.locks.acquire(dist).await;
let _guard = lock.lock().await;
let (built_wheel, precise) = self.builder.download_and_build(source_dist).await?;
Ok((built_wheel.metadata, precise))
}
}
}
/// Given a remote source distribution, return a precise variant, if possible.
///
/// For example, given a Git dependency with a reference to a branch or tag, return a URL
/// with a precise reference to the current commit of that branch or tag.
///
/// This method takes into account various normalizations that are independent from the Git
/// layer. For example: removing `#subdirectory=pkg_dir`-like fragments, and removing `git+`
/// prefix kinds.
pub async fn precise(&self, dist: &Dist) -> Result<Option<Url>, DistributionDatabaseError> {
let Dist::Source(SourceDist::Git(source_dist)) = dist else {
return Ok(None);
};
let git_dir = self.cache.join(GIT_CACHE);
let DirectGitUrl { url, subdirectory } =
DirectGitUrl::try_from(&source_dist.url).map_err(DistributionDatabaseError::Git)?;
// If the commit already contains a complete SHA, short-circuit.
if url.precise().is_some() {
return Ok(None);
}
// Fetch the precise SHA of the Git reference (which could be a branch, a tag, a partial
// commit, etc.).
// Git locks internally (https://stackoverflow.com/a/62841655/3549270)
let source = if let Some(reporter) = self.reporter.clone() {
GitSource::new(url, git_dir).with_reporter(Facade::from(reporter))
} else {
GitSource::new(url, git_dir)
};
let precise = tokio::task::spawn_blocking(move || source.fetch())
.await?
.map_err(DistributionDatabaseError::Git)?;
let url = precise.into_git();
// Re-encode as a URL.
Ok(Some(DirectGitUrl { url, subdirectory }.into()))
}
}
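A note on the small-wheel heuristic in `get_or_build_wheel` above: the in-memory/disk decision reduces to a single threshold check on the size reported by the index. A minimal sketch of that check, assuming the `bytesize` crate used in the diff (the `keep_in_memory` helper is illustrative, not part of the codebase):

```rust
use bytesize::ByteSize;

/// Decide whether a wheel of `size` bytes (when the index reports one)
/// should be buffered in memory rather than streamed to disk.
fn keep_in_memory(size: Option<usize>) -> bool {
    // Unknown sizes fall through to the disk path, as in the match above.
    size.is_some_and(|size| ByteSize::b(size as u64) < ByteSize::mb(5))
}

fn main() {
    assert!(keep_in_memory(Some(1_024))); // 1 KiB wheel stays in memory
    assert!(!keep_in_memory(Some(50_000_000))); // 50 MB wheel is written to disk
    assert!(!keep_in_memory(None)); // unknown size is written to disk
}
```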


@ -1,64 +1,71 @@
use std::path::PathBuf;
use std::str::FromStr;
use anyhow::{Context, Result};
use tempfile::TempDir;
use anyhow::Result;
use zip::ZipArchive;
use distribution_filename::WheelFilename;
use distribution_types::{Dist, RemoteSource};
use distribution_types::{Dist, SourceDist};
use install_wheel_rs::find_dist_info;
use pypi_types::Metadata21;
use crate::error::Error;
/// A downloaded wheel that's stored in-memory.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct InMemoryWheel {
/// The remote distribution from which this wheel was downloaded.
pub(crate) dist: Dist,
/// The parsed filename
pub(crate) filename: WheelFilename,
/// The contents of the wheel.
pub(crate) buffer: Vec<u8>,
}
/// A downloaded wheel that's stored on-disk.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DiskWheel {
/// The remote distribution from which this wheel was downloaded.
pub(crate) dist: Dist,
/// The parsed filename
pub(crate) filename: WheelFilename,
/// The path to the downloaded wheel.
pub(crate) path: PathBuf,
/// The download location, to be dropped after use.
#[allow(dead_code)]
pub(crate) temp_dir: Option<TempDir>,
}
/// A downloaded wheel.
#[derive(Debug)]
pub enum WheelDownload {
/// A wheel built from a source distribution that's stored on-disk.
#[derive(Debug, Clone)]
pub struct BuiltWheel {
/// The remote source distribution from which this wheel was built.
pub(crate) dist: Dist,
/// The parsed filename
pub(crate) filename: WheelFilename,
/// The path to the built wheel.
pub(crate) path: PathBuf,
}
/// A downloaded or built wheel.
#[derive(Debug, Clone)]
pub enum LocalWheel {
InMemory(InMemoryWheel),
Disk(DiskWheel),
Built(BuiltWheel),
}
/// A downloaded source distribution.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SourceDistDownload {
/// The remote distribution from which this source distribution was downloaded.
pub(crate) dist: Dist,
pub(crate) dist: SourceDist,
/// The path to the downloaded archive or directory.
pub(crate) sdist_file: PathBuf,
/// The subdirectory within the archive or directory.
pub(crate) subdirectory: Option<PathBuf>,
/// We can't use source dist archives directly; we build them into wheels, which we persist,
/// and then drop the source distribution. This field is `None` for git dependencies, which
/// we keep in the cache.
#[allow(dead_code)]
pub(crate) temp_dir: Option<TempDir>,
}
/// A downloaded distribution, either a wheel or a source distribution.
#[derive(Debug)]
pub enum Download {
Wheel(WheelDownload),
Wheel(LocalWheel),
SourceDist(SourceDistDownload),
}
@ -71,11 +78,12 @@ impl std::fmt::Display for Download {
}
}
impl std::fmt::Display for WheelDownload {
impl std::fmt::Display for LocalWheel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WheelDownload::InMemory(wheel) => write!(f, "{}", wheel.dist),
WheelDownload::Disk(wheel) => write!(f, "{}", wheel.dist),
LocalWheel::InMemory(wheel) => write!(f, "{} from {}", wheel.filename, wheel.dist),
LocalWheel::Disk(wheel) => write!(f, "{} from {}", wheel.filename, wheel.dist),
LocalWheel::Built(wheel) => write!(f, "{} from {}", wheel.filename, wheel.dist),
}
}
}
@ -90,13 +98,14 @@ impl InMemoryWheel {
/// Read the [`Metadata21`] from a wheel.
pub fn read_dist_info(&self) -> Result<Metadata21, Error> {
let mut archive = ZipArchive::new(std::io::Cursor::new(&self.buffer))?;
let filename = self
.filename()
.map_err(|err| Error::FilenameParse(self.dist.to_string(), err))?;
let dist_info_dir =
find_dist_info(&filename, archive.file_names().map(|name| (name, name)))
.map_err(|err| Error::DistInfo(self.dist.to_string(), err))?
.1;
let dist_info_dir = find_dist_info(
&self.filename,
archive.file_names().map(|name| (name, name)),
)
.map_err(|err| {
Error::DistInfo(Box::new(self.filename.clone()), self.dist.to_string(), err)
})?
.1;
let dist_info =
std::io::read_to_string(archive.by_name(&format!("{dist_info_dir}/METADATA"))?)?;
Ok(Metadata21::parse(dist_info.as_bytes())?)
@ -107,25 +116,45 @@ impl DiskWheel {
/// Read the [`Metadata21`] from a wheel.
pub fn read_dist_info(&self) -> Result<Metadata21, Error> {
let mut archive = ZipArchive::new(fs_err::File::open(&self.path)?)?;
let filename = self
.filename()
.map_err(|err| Error::FilenameParse(self.dist.to_string(), err))?;
let dist_info_dir =
find_dist_info(&filename, archive.file_names().map(|name| (name, name)))
.map_err(|err| Error::DistInfo(self.dist.to_string(), err))?
.1;
let dist_info_dir = find_dist_info(
&self.filename,
archive.file_names().map(|name| (name, name)),
)
.map_err(|err| {
Error::DistInfo(Box::new(self.filename.clone()), self.dist.to_string(), err)
})?
.1;
let dist_info =
std::io::read_to_string(archive.by_name(&format!("{dist_info_dir}/METADATA"))?)?;
Ok(Metadata21::parse(dist_info.as_bytes())?)
}
}
impl WheelDownload {
impl BuiltWheel {
/// Read the [`Metadata21`] from a wheel.
pub fn read_dist_info(&self) -> Result<Metadata21, Error> {
let mut archive = ZipArchive::new(fs_err::File::open(&self.path)?)?;
let dist_info_dir = find_dist_info(
&self.filename,
archive.file_names().map(|name| (name, name)),
)
.map_err(|err| {
Error::DistInfo(Box::new(self.filename.clone()), self.dist.to_string(), err)
})?
.1;
let dist_info =
std::io::read_to_string(archive.by_name(&format!("{dist_info_dir}/METADATA"))?)?;
Ok(Metadata21::parse(dist_info.as_bytes())?)
}
}
impl LocalWheel {
/// Read the [`Metadata21`] from a wheel.
pub fn read_dist_info(&self) -> Result<Metadata21, Error> {
match self {
WheelDownload::InMemory(wheel) => wheel.read_dist_info(),
WheelDownload::Disk(wheel) => wheel.read_dist_info(),
LocalWheel::InMemory(wheel) => wheel.read_dist_info(),
LocalWheel::Disk(wheel) => wheel.read_dist_info(),
LocalWheel::Built(wheel) => wheel.read_dist_info(),
}
}
}
@ -135,20 +164,6 @@ impl DiskWheel {
pub fn remote(&self) -> &Dist {
&self.dist
}
/// Return the [`WheelFilename`] of this wheel.
pub fn filename(&self) -> Result<WheelFilename> {
// If the wheel was downloaded to disk, it's either a download of a remote wheel, or a
// built source distribution, both of which imply a valid wheel filename.
let filename = WheelFilename::from_str(
self.path
.file_name()
.context("Missing filename")?
.to_str()
.context("Invalid filename")?,
)?;
Ok(filename)
}
}
impl InMemoryWheel {
@ -156,37 +171,39 @@ impl InMemoryWheel {
pub fn remote(&self) -> &Dist {
&self.dist
}
}
/// Return the [`WheelFilename`] of this wheel.
pub fn filename(&self) -> Result<WheelFilename> {
// If the wheel is an in-memory buffer, it's assumed that the underlying distribution is
// itself a wheel, which in turn requires that the filename be parseable.
let filename = WheelFilename::from_str(self.dist.filename()?)?;
Ok(filename)
impl BuiltWheel {
/// Return the [`Dist`] for the source distribution from which this wheel was built.
pub fn remote(&self) -> &Dist {
&self.dist
}
}
impl WheelDownload {
impl LocalWheel {
/// Return the [`Dist`] from which this wheel was downloaded.
pub fn remote(&self) -> &Dist {
match self {
WheelDownload::InMemory(wheel) => wheel.remote(),
WheelDownload::Disk(wheel) => wheel.remote(),
LocalWheel::InMemory(wheel) => wheel.remote(),
LocalWheel::Disk(wheel) => wheel.remote(),
LocalWheel::Built(wheel) => wheel.remote(),
}
}
/// Return the [`WheelFilename`] of this wheel.
pub fn filename(&self) -> Result<WheelFilename> {
pub fn filename(&self) -> &WheelFilename {
match self {
WheelDownload::InMemory(wheel) => wheel.filename(),
WheelDownload::Disk(wheel) => wheel.filename(),
LocalWheel::InMemory(wheel) => &wheel.filename,
LocalWheel::Disk(wheel) => &wheel.filename,
LocalWheel::Built(wheel) => &wheel.filename,
}
}
}
impl SourceDistDownload {
/// Return the [`Dist`] from which this source distribution was downloaded.
pub fn remote(&self) -> &Dist {
pub fn remote(&self) -> &SourceDist {
&self.dist
}
}
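The three `LocalWheel` variants above differ only in where the wheel's bytes live; every consumer (`read_dist_info`, `Unzip`, `remote`) matches once and delegates. A condensed model of that dispatch, with simplified stand-in fields rather than the real struct definitions:

```rust
use std::path::PathBuf;

enum LocalWheel {
    InMemory { buffer: Vec<u8> },
    Disk { path: PathBuf },
    Built { path: PathBuf },
}

impl LocalWheel {
    /// Return the wheel's raw bytes, reading from disk where necessary.
    fn bytes(&self) -> std::io::Result<Vec<u8>> {
        match self {
            LocalWheel::InMemory { buffer } => Ok(buffer.clone()),
            // Downloaded and built wheels are both plain files on disk.
            LocalWheel::Disk { path } | LocalWheel::Built { path } => std::fs::read(path),
        }
    }
}

fn main() {
    let wheel = LocalWheel::InMemory { buffer: vec![0x50, 0x4b] };
    assert_eq!(wheel.bytes().unwrap().len(), 2);
}
```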


@ -1,3 +1,5 @@
use distribution_filename::WheelFilename;
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error(transparent)]
@ -6,8 +8,12 @@ pub enum Error {
PypiTypes(#[from] pypi_types::Error),
#[error(transparent)]
Zip(#[from] zip::result::ZipError),
#[error("Unable to read .dist-info directory for: {0}")]
DistInfo(String, #[source] install_wheel_rs::Error),
#[error("Unable to read .dist-info directory in {0} from {1}")]
DistInfo(
Box<WheelFilename>,
String,
#[source] install_wheel_rs::Error,
),
#[error("Unable to parse wheel filename for: {0}")]
FilenameParse(String, #[source] anyhow::Error),
}
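The reworked `DistInfo` variant now carries the parsed wheel filename in the message, boxed to keep the enum small. A minimal sketch of the same `thiserror` pattern, with `String` and `std::io::Error` as stand-ins for `WheelFilename` and `install_wheel_rs::Error`:

```rust
use thiserror::Error;

#[derive(Debug, Error)]
enum Error {
    // Boxing the large field keeps the enum itself small on the stack.
    #[error("Unable to read .dist-info directory in {0} from {1}")]
    DistInfo(Box<String>, String, #[source] std::io::Error),
}

fn main() {
    let err = Error::DistInfo(
        Box::new("foo-1.0.0-py3-none-any.whl".to_string()),
        "foo==1.0.0".to_string(),
        std::io::Error::new(std::io::ErrorKind::NotFound, "no .dist-info"),
    );
    println!("{err}");
}
```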


@ -1,376 +0,0 @@
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use anyhow::{bail, Result};
use bytesize::ByteSize;
use fs_err::tokio as fs;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::debug;
use url::Url;
use distribution_filename::WheelFilename;
use distribution_types::direct_url::{DirectArchiveUrl, DirectGitUrl};
use distribution_types::{BuiltDist, Dist, Identifier, Metadata, RemoteSource, SourceDist};
use platform_tags::Tags;
use puffin_cache::metadata::WheelMetadataCache;
use puffin_client::RegistryClient;
use puffin_git::{GitSource, GitUrl};
use puffin_traits::BuildContext;
use pypi_types::Metadata21;
use crate::error::Error;
use crate::reporter::Facade;
use crate::{DiskWheel, Download, InMemoryWheel, Reporter, SourceDistDownload, WheelDownload};
// The cache subdirectory in which to store Git repositories.
const GIT_CACHE: &str = "git-v0";
// The cache subdirectory in which to store downloaded wheel archives.
const ARCHIVES_CACHE: &str = "archives-v0";
/// A high-level interface for accessing distribution metadata and source contents.
pub struct Fetcher<'a> {
cache: &'a Path,
reporter: Option<Arc<dyn Reporter>>,
}
impl<'a> Fetcher<'a> {
/// Initialize a [`Fetcher`].
pub fn new(cache: &'a Path) -> Self {
Self {
cache,
reporter: None,
}
}
/// Set the [`Reporter`] to use for this source distribution fetcher.
#[must_use]
pub fn with_reporter(self, reporter: impl Reporter + 'static) -> Self {
Self {
reporter: Some(Arc::new(reporter)),
..self
}
}
/// Return the [`Metadata21`] for a distribution, if it exists in the cache.
pub fn find_metadata(&self, dist: &Dist, tags: &Tags) -> Result<Option<Metadata21>, Error> {
self.find_in_cache(dist, tags)
.map(|wheel| wheel.read_dist_info())
.transpose()
}
/// Fetch the [`Metadata21`] for a distribution.
///
/// If the given [`Dist`] is a source distribution, the distribution will be downloaded, built,
/// and cached.
pub async fn fetch_metadata(
&self,
dist: &Dist,
client: &RegistryClient,
build_context: &impl BuildContext,
) -> Result<Metadata21> {
match dist {
// Fetch the metadata directly from the registry.
Dist::Built(BuiltDist::Registry(wheel)) => {
let metadata = client
.wheel_metadata(wheel.index.clone(), wheel.file.clone())
.await?;
Ok(metadata)
}
// Fetch the metadata directly from the wheel URL.
Dist::Built(BuiltDist::DirectUrl(wheel)) => {
let metadata = client
.wheel_metadata_no_pep658(&wheel.filename, &wheel.url, WheelMetadataCache::Url)
.await?;
Ok(metadata)
}
// Fetch the distribution, then read the metadata (for built distributions), or build
// the distribution and _then_ read the metadata (for source distributions).
dist => {
// Optimization: Skip source dist downloads when builds are disabled anyway.
if build_context.no_build() && matches!(dist, Dist::Source(_)) {
bail!("Building source distributions is disabled");
}
match self.fetch_dist(dist, client).await? {
Download::Wheel(wheel) => {
let metadata = wheel.read_dist_info()?;
Ok(metadata)
}
Download::SourceDist(sdist) => {
let wheel = self.build_sdist(sdist, build_context).await?;
let metadata = wheel.read_dist_info()?;
Ok(metadata)
}
}
}
}
}
/// Download a distribution.
pub async fn fetch_dist(&self, dist: &Dist, client: &RegistryClient) -> Result<Download> {
match &dist {
Dist::Built(BuiltDist::Registry(wheel)) => {
// Fetch the wheel.
let url = Url::parse(&wheel.file.url)?;
let reader = client.stream_external(&url).await?;
// If the file is greater than 5MB, write it to disk; otherwise, keep it in memory.
let small_size = if let Some(size) = wheel.file.size {
let byte_size = ByteSize::b(size as u64);
if byte_size < ByteSize::mb(5) {
Some(size)
} else {
None
}
} else {
None
};
if let Some(small_size) = small_size {
debug!(
"Fetching in-memory wheel from registry: {dist} ({})",
ByteSize::b(small_size as u64)
);
// Read into a buffer.
let mut buffer = Vec::with_capacity(small_size);
let mut reader = tokio::io::BufReader::new(reader.compat());
tokio::io::copy(&mut reader, &mut buffer).await?;
Ok(Download::Wheel(WheelDownload::InMemory(InMemoryWheel {
dist: dist.clone(),
buffer,
})))
} else {
let size =
small_size.map_or("unknown size".to_string(), |size| size.to_string());
debug!("Fetching disk-based wheel from registry: {dist} ({size})");
// Create a directory for the wheel.
let wheel_dir = self.cache.join(ARCHIVES_CACHE).join(wheel.package_id());
fs::create_dir_all(&wheel_dir).await?;
// Download the wheel to a temporary file.
let wheel_filename = &wheel.file.filename;
let wheel_file = wheel_dir.join(wheel_filename);
let mut writer = tokio::fs::File::create(&wheel_file).await?;
tokio::io::copy(&mut reader.compat(), &mut writer).await?;
Ok(Download::Wheel(WheelDownload::Disk(DiskWheel {
dist: dist.clone(),
path: wheel_file,
temp_dir: None,
})))
}
}
Dist::Built(BuiltDist::DirectUrl(wheel)) => {
debug!("Fetching disk-based wheel from URL: {}", &wheel.url);
// Create a directory for the wheel.
let wheel_dir = self.cache.join(ARCHIVES_CACHE).join(wheel.package_id());
fs::create_dir_all(&wheel_dir).await?;
// Fetch the wheel.
let reader = client.stream_external(&wheel.url).await?;
// Download the wheel to the directory.
let wheel_filename = wheel.filename()?;
let wheel_file = wheel_dir.join(wheel_filename);
let mut writer = tokio::fs::File::create(&wheel_file).await?;
tokio::io::copy(&mut reader.compat(), &mut writer).await?;
Ok(Download::Wheel(WheelDownload::Disk(DiskWheel {
dist: dist.clone(),
path: wheel_file,
temp_dir: None,
})))
}
Dist::Built(BuiltDist::Path(wheel)) => {
Ok(Download::Wheel(WheelDownload::Disk(DiskWheel {
dist: dist.clone(),
path: wheel.path.clone(),
temp_dir: None,
})))
}
Dist::Source(SourceDist::Registry(sdist)) => {
debug!(
"Fetching source distribution from registry: {}",
&sdist.file.url
);
let url = Url::parse(&sdist.file.url)?;
let reader = client.stream_external(&url).await?;
// Download the source distribution.
let temp_dir = tempfile::tempdir_in(self.cache)?;
let sdist_filename = sdist.filename()?;
let sdist_file = temp_dir.path().join(sdist_filename);
let mut writer = tokio::fs::File::create(&sdist_file).await?;
tokio::io::copy(&mut reader.compat(), &mut writer).await?;
Ok(Download::SourceDist(SourceDistDownload {
dist: dist.clone(),
sdist_file,
subdirectory: None,
temp_dir: Some(temp_dir),
}))
}
Dist::Source(SourceDist::DirectUrl(sdist)) => {
debug!("Fetching source distribution from URL: {}", sdist.url);
let DirectArchiveUrl { url, subdirectory } = DirectArchiveUrl::from(&sdist.url);
let reader = client.stream_external(&url).await?;
let mut reader = tokio::io::BufReader::new(reader.compat());
// Download the source distribution.
let temp_dir = tempfile::tempdir_in(self.cache)?;
let sdist_filename = sdist.filename()?;
let sdist_file = temp_dir.path().join(sdist_filename);
let mut writer = tokio::fs::File::create(&sdist_file).await?;
tokio::io::copy(&mut reader, &mut writer).await?;
Ok(Download::SourceDist(SourceDistDownload {
dist: dist.clone(),
sdist_file,
subdirectory,
temp_dir: Some(temp_dir),
}))
}
Dist::Source(SourceDist::Git(sdist)) => {
debug!("Fetching source distribution from Git: {}", sdist.url);
let DirectGitUrl { url, subdirectory } = DirectGitUrl::try_from(&sdist.url)?;
let git_dir = self.cache.join(GIT_CACHE);
let source = GitSource::new(url, git_dir);
let sdist_file = tokio::task::spawn_blocking(move || source.fetch())
.await??
.into();
Ok(Download::SourceDist(SourceDistDownload {
dist: dist.clone(),
sdist_file,
subdirectory,
temp_dir: None,
}))
}
Dist::Source(SourceDist::Path(sdist)) => Ok(Download::SourceDist(SourceDistDownload {
dist: dist.clone(),
sdist_file: sdist.path.clone(),
subdirectory: None,
temp_dir: None,
})),
}
}
/// Build a downloaded source distribution.
pub async fn build_sdist(
&self,
dist: SourceDistDownload,
build_context: &impl BuildContext,
) -> Result<WheelDownload> {
let task = self
.reporter
.as_ref()
.map(|reporter| reporter.on_build_start(&dist.dist));
// Create a directory for the wheel.
let wheel_dir = self
.cache
.join(ARCHIVES_CACHE)
.join(dist.remote().package_id());
fs::create_dir_all(&wheel_dir).await?;
// Build the wheel.
// TODO(charlie): If this is a Git dependency, we should do another checkout. If the same
// repository is used by multiple dependencies, at multiple commits, the local checkout may now
// point to the wrong commit.
let disk_filename = build_context
.build_source(
&dist.sdist_file,
dist.subdirectory.as_deref(),
&wheel_dir,
&dist.dist.to_string(),
)
.await?;
let wheel_filename = wheel_dir.join(disk_filename);
if let Some(task) = task {
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_build_complete(&dist.dist, task);
}
}
Ok(WheelDownload::Disk(DiskWheel {
dist: dist.dist,
path: wheel_filename,
temp_dir: None,
}))
}
/// Find a built wheel in the cache.
fn find_in_cache(&self, dist: &Dist, tags: &Tags) -> Option<DiskWheel> {
let wheel_dir = self.cache.join(ARCHIVES_CACHE).join(dist.distribution_id());
let read_dir = fs_err::read_dir(wheel_dir).ok()?;
for entry in read_dir {
let Ok(entry) = entry else {
continue;
};
let Ok(filename) =
WheelFilename::from_str(entry.file_name().to_string_lossy().as_ref())
else {
continue;
};
if filename.is_compatible(tags) {
return Some(DiskWheel {
dist: dist.clone(),
path: entry.path(),
temp_dir: None,
});
}
}
None
}
/// Given a remote source distribution, return a precise variant, if possible.
///
/// For example, given a Git dependency with a reference to a branch or tag, return a URL
/// with a precise reference to the current commit of that branch or tag.
///
/// This method takes into account various normalizations that are independent from the Git
/// layer. For example: removing `#subdirectory=pkg_dir`-like fragments, and removing `git+`
/// prefix kinds.
pub async fn precise(&self, dist: &Dist) -> Result<Option<Url>> {
let Dist::Source(SourceDist::Git(sdist)) = dist else {
return Ok(None);
};
let DirectGitUrl { url, subdirectory } = DirectGitUrl::try_from(&sdist.url)?;
// If the commit already contains a complete SHA, short-circuit.
if url.precise().is_some() {
return Ok(None);
}
// Fetch the precise SHA of the Git reference (which could be a branch, a tag, a partial
// commit, etc.).
let git_dir = self.cache.join(GIT_CACHE);
let source = if let Some(reporter) = self.reporter.clone() {
GitSource::new(url, git_dir).with_reporter(Facade::from(reporter))
} else {
GitSource::new(url, git_dir)
};
let precise = tokio::task::spawn_blocking(move || source.fetch()).await??;
let url = GitUrl::from(precise);
// Re-encode as a URL.
Ok(Some(DirectGitUrl { url, subdirectory }.into()))
}
}


@ -1,11 +1,14 @@
pub use crate::download::{DiskWheel, Download, InMemoryWheel, SourceDistDownload, WheelDownload};
pub use crate::fetcher::Fetcher;
pub use crate::reporter::Reporter;
pub use crate::unzip::Unzip;
pub use distribution_database::{DistributionDatabase, DistributionDatabaseError};
pub use download::{DiskWheel, Download, InMemoryWheel, LocalWheel, SourceDistDownload};
pub use reporter::Reporter;
pub use source_dist::{SourceDistCachedBuilder, SourceDistError};
pub use unzip::Unzip;
mod distribution_database;
mod download;
mod error;
mod fetcher;
mod locks;
mod reporter;
mod source_dist;
mod unzip;
mod vendor;


@ -0,0 +1,49 @@
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
use fs2::FileExt;
use fs_err::File;
use fxhash::FxHashMap;
use tokio::sync::Mutex;
use tracing::error;
use distribution_types::Identifier;
/// A set of locks used to prevent concurrent access to the same resource.
#[derive(Debug, Default)]
pub(crate) struct Locks(Mutex<FxHashMap<String, Arc<Mutex<()>>>>);
impl Locks {
/// Acquire a lock on the given resource.
pub(crate) async fn acquire(&self, dist: &impl Identifier) -> Arc<Mutex<()>> {
let mut map = self.0.lock().await;
map.entry(dist.resource_id())
.or_insert_with(|| Arc::new(Mutex::new(())))
.clone()
}
}
pub(crate) struct LockedFile(File);
impl LockedFile {
pub(crate) fn new(path: impl Into<PathBuf>) -> Result<Self, io::Error> {
let file = File::create(path)?;
// TODO(konstin): Notify the user when the lock isn't free so they know why nothing is
// happening
file.file().lock_exclusive()?;
Ok(Self(file))
}
}
impl Drop for LockedFile {
fn drop(&mut self) {
if let Err(err) = self.0.file().unlock() {
error!(
"Failed to unlock {}, the program might be stuck! Error: {}",
self.0.path().display(),
err
);
}
}
}
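`LockedFile` adds inter-process exclusion on top of the in-process `Locks` map: two puffin processes that target the same git checkout serialize on an advisory file lock that's released on drop. A standalone sketch of the guard, assuming the `fs2` crate and an illustrative lock path:

```rust
use fs2::FileExt;
use std::fs::File;
use std::io;
use std::path::Path;

/// Hold an exclusive advisory lock on `path` for the guard's lifetime.
struct Guard(File);

impl Guard {
    fn acquire(path: &Path) -> io::Result<Self> {
        let file = File::create(path)?;
        file.lock_exclusive()?; // blocks until no other process holds the lock
        Ok(Self(file))
    }
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Best-effort unlock; the OS also releases the lock when the file closes.
        let _ = self.0.unlock();
    }
}

fn main() -> io::Result<()> {
    let _guard = Guard::acquire(Path::new("checkout.lock"))?;
    // ... perform the git operation while the lock is held ...
    Ok(())
}
```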


@ -1,14 +1,22 @@
use std::sync::Arc;
use distribution_types::Dist;
use url::Url;
use crate::Download;
use distribution_types::SourceDist;
pub trait Reporter: Send + Sync {
/// Callback to invoke when a wheel is downloaded.
fn on_download_progress(&self, download: &Download);
/// Callback to invoke when a source distribution build is kicked off.
fn on_build_start(&self, dist: &Dist) -> usize;
fn on_build_start(&self, dist: &SourceDist) -> usize;
/// Callback to invoke when a source distribution build is complete.
fn on_build_complete(&self, dist: &Dist, id: usize);
fn on_build_complete(&self, dist: &SourceDist, id: usize);
/// Callback to invoke when the operation is complete.
fn on_download_and_build_complete(&self);
/// Callback to invoke when a repository checkout begins.
fn on_checkout_start(&self, url: &Url, rev: &str) -> usize;
@ -17,7 +25,7 @@ pub trait Reporter: Send + Sync {
fn on_checkout_complete(&self, url: &Url, rev: &str, index: usize);
}
/// A facade for converting from [`Reporter`] to [`puffin_git::Reporter`].
pub(crate) struct Facade {
reporter: Arc<dyn Reporter>,
}


@ -0,0 +1,621 @@
//! Fetch and build source distributions from remote sources.
use std::io;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use anyhow::bail;
use fs_err::tokio as fs;
use futures::TryStreamExt;
use fxhash::FxHashMap;
use reqwest::Response;
use serde::{Deserialize, Serialize};
use tempfile::TempDir;
use thiserror::Error;
use tokio::task::JoinError;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::debug;
use url::Url;
use zip::ZipArchive;
use distribution_filename::{WheelFilename, WheelFilenameError};
use distribution_types::direct_url::{DirectArchiveUrl, DirectGitUrl};
use distribution_types::{Dist, GitSourceDist, Identifier, RemoteSource, SourceDist};
use install_wheel_rs::find_dist_info;
use platform_tags::Tags;
use puffin_cache::{digest, CanonicalUrl, WheelMetadataCache};
use puffin_client::{CachedClient, CachedClientError, DataWithCachePolicy};
use puffin_git::{Fetch, GitSource};
use puffin_normalize::PackageName;
use puffin_traits::BuildContext;
use pypi_types::Metadata21;
use crate::download::BuiltWheel;
use crate::locks::LockedFile;
use crate::{Download, Reporter, SourceDistDownload};
const BUILT_WHEELS_CACHE: &str = "built-wheels-v0";
const GIT_CACHE: &str = "git-v0";
/// The caller is responsible for adding the source dist information to the error chain
#[derive(Debug, Error)]
pub enum SourceDistError {
// Network error
#[error("Failed to parse url '{0}'")]
UrlParse(String, #[source] url::ParseError),
#[error("Git operation failed")]
Git(#[source] anyhow::Error),
#[error(transparent)]
Request(#[from] reqwest::Error),
#[error(transparent)]
Client(#[from] puffin_client::Error),
// Cache writing error
#[error(transparent)]
Io(#[from] io::Error),
#[error("Cache (de)serialization failed")]
Serde(#[from] serde_json::Error),
// Build error
#[error("Failed to build")]
Build(#[source] anyhow::Error),
#[error("Built wheel has an invalid filename")]
WheelFilename(#[from] WheelFilenameError),
#[error("Package metadata name `{metadata}` does not match given name `{given}`")]
NameMismatch {
given: PackageName,
metadata: PackageName,
},
#[error("Failed to parse metadata from built wheel")]
Metadata(#[from] crate::error::Error),
/// Should not occur; I've only seen it when another task panicked.
#[error("The task executor is broken, did some other task panic?")]
Join(#[from] JoinError),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DiskFilenameAndMetadata {
/// Relative, un-normalized wheel filename in the cache, which can be different from
/// `WheelFilename::to_string`.
disk_filename: String,
metadata: Metadata21,
}
type Metadata21s = FxHashMap<WheelFilename, DiskFilenameAndMetadata>;
/// The information about the wheel we either just built or got from the cache
#[derive(Debug, Clone)]
pub struct BuiltWheelMetadata {
pub path: PathBuf,
pub filename: WheelFilename,
pub metadata: Metadata21,
}
impl BuiltWheelMetadata {
fn from_cached(
filename: &WheelFilename,
cached_data: &DiskFilenameAndMetadata,
cache: &Path,
source_dist: &SourceDist,
) -> Self {
// TODO(konstin): Change this when the built wheel naming scheme is fixed
let wheel_dir = cache
.join(BUILT_WHEELS_CACHE)
.join(source_dist.distribution_id());
Self {
path: wheel_dir.join(&cached_data.disk_filename),
filename: filename.clone(),
metadata: cached_data.metadata.clone(),
}
}
}
/// Fetch and build a source distribution from a remote source, or from a local cache.
pub struct SourceDistCachedBuilder<'a, T: BuildContext> {
build_context: &'a T,
cached_client: &'a CachedClient,
reporter: Option<Arc<dyn Reporter>>,
tags: &'a Tags,
}
const METADATA_JSON: &str = "metadata.json";
impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
/// Initialize a [`SourceDistCachedBuilder`] from a [`BuildContext`].
pub fn new(build_context: &'a T, cached_client: &'a CachedClient, tags: &'a Tags) -> Self {
Self {
build_context,
reporter: None,
cached_client,
tags,
}
}
/// Set the [`Reporter`] to use for this source distribution fetcher.
#[must_use]
pub fn with_reporter(self, reporter: Arc<dyn Reporter>) -> Self {
Self {
reporter: Some(reporter),
..self
}
}
pub async fn download_and_build(
&self,
source_dist: &SourceDist,
) -> Result<(BuiltWheelMetadata, Option<Url>), SourceDistError> {
let precise = self.precise(source_dist).await?;
let built_wheel_metadata = match &source_dist {
SourceDist::DirectUrl(direct_url_source_dist) => {
let filename = direct_url_source_dist
.filename()
.unwrap_or(direct_url_source_dist.url.path())
.to_string();
let DirectArchiveUrl { url, subdirectory } =
DirectArchiveUrl::from(&direct_url_source_dist.url);
self.url(
source_dist,
&filename,
&url,
WheelMetadataCache::Url(url.clone()),
subdirectory.as_deref(),
)
.await?
}
SourceDist::Registry(registry_source_dist) => {
let url = Url::parse(&registry_source_dist.file.url).map_err(|err| {
SourceDistError::UrlParse(registry_source_dist.file.url.to_string(), err)
})?;
self.url(
source_dist,
&registry_source_dist.file.filename,
&url,
WheelMetadataCache::Index(registry_source_dist.index.clone()),
None,
)
.await?
}
SourceDist::Git(git_source_dist) => {
// TODO(konstin): return precise git reference
self.git(source_dist, git_source_dist).await?
}
SourceDist::Path(path_source_dist) => {
// TODO: Add caching here. See also https://github.com/astral-sh/puffin/issues/478
// TODO(konstin): Change this when the built wheel naming scheme is fixed
let wheel_dir = self
.build_context
.cache()
.join(BUILT_WHEELS_CACHE)
.join(source_dist.distribution_id());
fs::create_dir_all(&wheel_dir).await?;
// Build the wheel.
let disk_filename = self
.build_context
.build_source(
&path_source_dist.path,
None,
&wheel_dir,
&path_source_dist.to_string(),
)
.await
.map_err(SourceDistError::Build)?;
// Read the metadata from the wheel.
let filename = WheelFilename::from_str(&disk_filename)?;
// TODO(konstin): https://github.com/astral-sh/puffin/issues/484
let metadata = BuiltWheel {
dist: Dist::Source(source_dist.clone()),
filename: filename.clone(),
path: wheel_dir.join(&disk_filename),
}
.read_dist_info()?;
BuiltWheelMetadata {
path: wheel_dir.join(disk_filename),
filename,
metadata,
}
}
};
Ok((built_wheel_metadata, precise))
}
#[allow(clippy::too_many_arguments)]
async fn url(
&self,
source_dist: &SourceDist,
filename: &str,
url: &Url,
cache_shard: WheelMetadataCache,
subdirectory: Option<&Path>,
) -> Result<BuiltWheelMetadata, SourceDistError> {
let cache_dir = self
.build_context
.cache()
.join(cache_shard.built_wheel_dir(filename));
let cache_file = METADATA_JSON;
let response_callback = |response| async {
debug!("Downloading and building source distribution: {source_dist}");
let task = self
.reporter
.as_ref()
.map(|reporter| reporter.on_build_start(source_dist));
let (temp_dir, sdist_file) = self.download_source_dist_url(response, filename).await?;
let download = SourceDistDownload {
dist: source_dist.clone(),
sdist_file: sdist_file.clone(),
subdirectory: subdirectory.map(Path::to_path_buf),
};
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_download_progress(&Download::SourceDist(download.clone()));
}
let (disk_filename, wheel_filename, metadata) = self
.build_source_dist(
&download.dist,
temp_dir,
&download.sdist_file,
download.subdirectory.as_deref(),
)
.await
.map_err(SourceDistError::Build)?;
if let Some(task) = task {
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_build_complete(source_dist, task);
}
}
let mut metadatas = Metadata21s::default();
metadatas.insert(
wheel_filename,
DiskFilenameAndMetadata {
disk_filename,
metadata,
},
);
Ok(metadatas)
};
let req = self.cached_client.uncached().get(url.clone()).build()?;
let metadatas = self
.cached_client
.get_cached_with_callback(req, &cache_dir, cache_file, response_callback)
.await
.map_err(|err| match err {
CachedClientError::Callback(err) => err,
CachedClientError::Client(err) => SourceDistError::Client(err),
})?;
if let Some((filename, cached_data)) = metadatas
.iter()
.find(|(filename, _metadata)| filename.is_compatible(self.tags))
{
return Ok(BuiltWheelMetadata::from_cached(
filename,
cached_data,
self.build_context.cache(),
source_dist,
));
}
// At this point, we're seeing cached metadata (fresh source dist) but the
// wheel(s) we built previously are incompatible
let task = self
.reporter
.as_ref()
.map(|reporter| reporter.on_build_start(source_dist));
let response = self
.cached_client
.uncached()
.get(url.clone())
.send()
.await
.map_err(puffin_client::Error::RequestMiddlewareError)?;
let (temp_dir, sdist_file) = self.download_source_dist_url(response, filename).await?;
let (disk_filename, wheel_filename, metadata) = self
.build_source_dist(source_dist, temp_dir, &sdist_file, subdirectory)
.await
.map_err(SourceDistError::Build)?;
if let Some(task) = task {
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_build_complete(source_dist, task);
}
}
let cached_data = DiskFilenameAndMetadata {
disk_filename: disk_filename.clone(),
metadata: metadata.clone(),
};
// Not elegant that we have to read again here, but also not too relevant given that we
// have to build a source dist next.
// Just return if the response wasn't cacheable or there was another error that
// `CachedClient` already complained about.
if let Ok(cached) = fs::read(cache_dir.join(cache_file)).await {
// If the file exists and it was just read or written by `CachedClient`, we assume it must
// be correct.
let mut cached = serde_json::from_slice::<DataWithCachePolicy<Metadata21s>>(&cached)?;
cached
.data
.insert(wheel_filename.clone(), cached_data.clone());
fs::write(cache_dir.join(cache_file), serde_json::to_vec(&cached)?).await?;
};
Ok(BuiltWheelMetadata::from_cached(
&wheel_filename,
&cached_data,
self.build_context.cache(),
source_dist,
))
}
async fn git(
&self,
source_dist: &SourceDist,
git_source_dist: &GitSourceDist,
) -> Result<BuiltWheelMetadata, SourceDistError> {
// TODO(konstin): Can we special case when we have a git sha so we know there is no change?
let (fetch, subdirectory) = self
.download_source_dist_git(git_source_dist.url.clone())
.await?;
let git_sha = fetch
.git()
.precise()
.expect("Exact commit after checkout")
.to_string();
let cache_shard = WheelMetadataCache::Git(git_source_dist.url.clone());
let cache_dir = self
.build_context
.cache()
.join(cache_shard.built_wheel_dir(&git_sha));
let cache_file = cache_dir.join(METADATA_JSON);
// TODO(konstin): Change this when the built wheel naming scheme is fixed
let wheel_dir = self
.build_context
.cache()
.join(BUILT_WHEELS_CACHE)
.join(source_dist.distribution_id());
fs::create_dir_all(&wheel_dir).await?;
let mut metadatas = if cache_file.is_file() {
let cached = fs::read(&cache_file).await?;
let metadatas = serde_json::from_slice::<Metadata21s>(&cached)?;
// Do we have a previous compatible build of this source dist?
if let Some((filename, cached_data)) = metadatas
.iter()
.find(|(filename, _metadata)| filename.is_compatible(self.tags))
{
return Ok(BuiltWheelMetadata::from_cached(
filename,
cached_data,
self.build_context.cache(),
source_dist,
));
}
metadatas
} else {
Metadata21s::default()
};
let task = self
.reporter
.as_ref()
.map(|reporter| reporter.on_build_start(source_dist));
let (disk_filename, filename, metadata) = self
.build_source_dist(source_dist, None, fetch.path(), subdirectory.as_deref())
.await
.map_err(SourceDistError::Build)?;
if metadata.name != git_source_dist.name {
return Err(SourceDistError::NameMismatch {
metadata: metadata.name,
given: git_source_dist.name.clone(),
});
}
// Store the metadata for this build along with all the other builds
metadatas.insert(
filename.clone(),
DiskFilenameAndMetadata {
disk_filename: disk_filename.clone(),
metadata: metadata.clone(),
},
);
let cached = serde_json::to_vec(&metadatas)?;
fs::create_dir_all(cache_dir).await?;
fs::write(cache_file, cached).await?;
if let Some(task) = task {
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_build_complete(source_dist, task);
}
}
Ok(BuiltWheelMetadata {
path: wheel_dir.join(&disk_filename),
filename,
metadata,
})
}
async fn download_source_dist_url(
&self,
response: Response,
source_dist_filename: &str,
) -> Result<(Option<TempDir>, PathBuf), puffin_client::Error> {
let reader = response
.bytes_stream()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.into_async_read();
let mut reader = tokio::io::BufReader::new(reader.compat());
// Download the source distribution.
let temp_dir = tempfile::tempdir_in(self.build_context.cache())?;
let sdist_file = temp_dir.path().join(source_dist_filename);
let mut writer = tokio::fs::File::create(&sdist_file).await?;
tokio::io::copy(&mut reader, &mut writer).await?;
Ok((Some(temp_dir), sdist_file))
}
async fn download_source_dist_git(
&self,
url: Url,
) -> Result<(Fetch, Option<PathBuf>), SourceDistError> {
debug!("Fetching source distribution from Git: {}", url);
let git_dir = self.build_context.cache().join(GIT_CACHE);
// Avoid races between different processes, too
let locks_dir = git_dir.join("locks");
fs::create_dir_all(&locks_dir).await?;
let _lockfile = LockedFile::new(locks_dir.join(digest(&CanonicalUrl::new(&url))))?;
let DirectGitUrl { url, subdirectory } =
DirectGitUrl::try_from(&url).map_err(SourceDistError::Git)?;
let source = if let Some(reporter) = &self.reporter {
GitSource::new(url, git_dir).with_reporter(Facade::from(reporter.clone()))
} else {
GitSource::new(url, git_dir)
};
let fetch = tokio::task::spawn_blocking(move || source.fetch())
.await?
.map_err(SourceDistError::Git)?;
Ok((fetch, subdirectory))
}
/// Build a source distribution, storing the built wheel in the cache.
///
/// Returns the un-normalized disk filename, the parsed, normalized filename, and the metadata.
async fn build_source_dist(
&self,
dist: &SourceDist,
temp_dir: Option<TempDir>,
source_dist: &Path,
subdirectory: Option<&Path>,
) -> anyhow::Result<(String, WheelFilename, Metadata21)> {
debug!("Building: {dist}");
if self.build_context.no_build() {
bail!("Building source distributions is disabled");
}
// Create a directory for the wheel.
let wheel_dir = self
.build_context
.cache()
.join(BUILT_WHEELS_CACHE)
.join(dist.distribution_id());
fs::create_dir_all(&wheel_dir).await?;
// Build the wheel.
let disk_filename = self
.build_context
.build_source(source_dist, subdirectory, &wheel_dir, &dist.to_string())
.await?;
if let Some(temp_dir) = temp_dir {
temp_dir.close()?;
}
// Read the metadata from the wheel.
let filename = WheelFilename::from_str(&disk_filename)?;
let mut archive = ZipArchive::new(fs_err::File::open(wheel_dir.join(&disk_filename))?)?;
let dist_info_dir =
find_dist_info(&filename, archive.file_names().map(|name| (name, name)))?.1;
let dist_info =
std::io::read_to_string(archive.by_name(&format!("{dist_info_dir}/METADATA"))?)?;
let metadata = Metadata21::parse(dist_info.as_bytes())?;
debug!("Finished building: {dist}");
Ok((disk_filename, filename, metadata))
}
/// Given a remote source distribution, return a precise variant, if possible.
///
/// For example, given a Git dependency with a reference to a branch or tag, return a URL
/// with a precise reference to the current commit of that branch or tag.
///
/// This method takes into account various normalizations that are independent from the Git
/// layer. For example: removing `#subdirectory=pkg_dir`-like fragments, and removing `git+`
/// prefix kinds.
async fn precise(&self, dist: &SourceDist) -> Result<Option<Url>, SourceDistError> {
let SourceDist::Git(source_dist) = dist else {
return Ok(None);
};
let git_dir = self.build_context.cache().join(GIT_CACHE);
// Avoid races between different processes
let locks = git_dir.join("locks");
fs::create_dir_all(&locks).await?;
let _lockfile = LockedFile::new(locks.join(digest(&CanonicalUrl::new(&source_dist.url))))?;
let DirectGitUrl { url, subdirectory } =
DirectGitUrl::try_from(&source_dist.url).map_err(SourceDistError::Git)?;
// If the commit already contains a complete SHA, short-circuit.
if url.precise().is_some() {
return Ok(None);
}
// Fetch the precise SHA of the Git reference (which could be a branch, a tag, a partial
// commit, etc.).
let git_dir = self.build_context.cache().join(GIT_CACHE);
let source = if let Some(reporter) = &self.reporter {
GitSource::new(url, git_dir).with_reporter(Facade::from(reporter.clone()))
} else {
GitSource::new(url, git_dir)
};
let precise = tokio::task::spawn_blocking(move || source.fetch())
.await?
.map_err(SourceDistError::Git)?;
let url = precise.into_git();
// Re-encode as a URL.
Ok(Some(DirectGitUrl { url, subdirectory }.into()))
}
}
trait SourceDistReporter: Send + Sync {
/// Callback to invoke when a repository checkout begins.
fn on_checkout_start(&self, url: &Url, rev: &str) -> usize;
/// Callback to invoke when a repository checkout completes.
fn on_checkout_complete(&self, url: &Url, rev: &str, index: usize);
}
/// A facade for converting from [`Reporter`] to [`puffin_git::Reporter`].
struct Facade {
reporter: Arc<dyn Reporter>,
}
impl From<Arc<dyn Reporter>> for Facade {
fn from(reporter: Arc<dyn Reporter>) -> Self {
Self { reporter }
}
}
impl puffin_git::Reporter for Facade {
fn on_checkout_start(&self, url: &Url, rev: &str) -> usize {
self.reporter.on_checkout_start(url, rev)
}
fn on_checkout_complete(&self, url: &Url, rev: &str, index: usize) {
self.reporter.on_checkout_complete(url, rev, index);
}
}
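The cache lookups in `url` and `git` above boil down to: deserialize `metadata.json` into a map keyed by wheel filename, then return the first entry whose filename is compatible with the current platform tags, building a new wheel only on a miss. A simplified sketch with `String` stand-ins for `WheelFilename` and `Metadata21`:

```rust
use std::collections::HashMap;

/// Pick the first cached wheel whose filename satisfies the tag check.
fn find_compatible<'a>(
    metadatas: &'a HashMap<String, String>,
    is_compatible: impl Fn(&str) -> bool,
) -> Option<(&'a String, &'a String)> {
    metadatas
        .iter()
        .find(|(filename, _metadata)| is_compatible(filename))
}

fn main() {
    let mut metadatas = HashMap::new();
    metadatas.insert(
        "django_allauth-0.51.0-py3-none-any.whl".to_string(),
        "stand-in metadata".to_string(),
    );
    // Pure-Python wheels are compatible with any platform.
    let hit = find_compatible(&metadatas, |name| name.ends_with("-py3-none-any.whl"));
    assert!(hit.is_some());
}
```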


@ -5,31 +5,39 @@ use anyhow::Result;
use rayon::prelude::*;
use zip::ZipArchive;
use crate::download::BuiltWheel;
use crate::vendor::{CloneableSeekableReader, HasLength};
use crate::{DiskWheel, InMemoryWheel, WheelDownload};
use crate::{DiskWheel, InMemoryWheel, LocalWheel};
pub trait Unzip {
/// Unzip a wheel into the target directory.
fn unzip(&self, target: &Path) -> Result<()>;
}
impl Unzip for DiskWheel {
fn unzip(&self, target: &Path) -> Result<()> {
unzip_archive(fs_err::File::open(&self.path)?, target)
}
}
impl Unzip for InMemoryWheel {
fn unzip(&self, target: &Path) -> Result<()> {
unzip_archive(std::io::Cursor::new(&self.buffer), target)
}
}
impl Unzip for WheelDownload {
impl Unzip for DiskWheel {
fn unzip(&self, target: &Path) -> Result<()> {
unzip_archive(fs_err::File::open(&self.path)?, target)
}
}
impl Unzip for BuiltWheel {
fn unzip(&self, target: &Path) -> Result<()> {
unzip_archive(fs_err::File::open(&self.path)?, target)
}
}
impl Unzip for LocalWheel {
fn unzip(&self, target: &Path) -> Result<()> {
match self {
WheelDownload::InMemory(wheel) => wheel.unzip(target),
WheelDownload::Disk(wheel) => wheel.unzip(target),
LocalWheel::InMemory(wheel) => wheel.unzip(target),
LocalWheel::Disk(wheel) => wheel.unzip(target),
LocalWheel::Built(wheel) => wheel.unzip(target),
}
}
}
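All of the `Unzip` impls above funnel into a shared `unzip_archive` helper over a seekable byte source; the real helper in this crate extracts entries in parallel with rayon, but a serial sketch using `zip`'s built-in `extract` shows the shape:

```rust
use std::io::{Read, Seek};
use std::path::Path;
use zip::ZipArchive;

/// Extract any seekable zip stream into `target` (serial variant).
fn unzip_archive<R: Read + Seek>(reader: R, target: &Path) -> zip::result::ZipResult<()> {
    ZipArchive::new(reader)?.extract(target)
}

fn main() {
    // In-memory wheels wrap their buffer in a `Cursor`; disk wheels open a file.
    let buffer: Vec<u8> = Vec::new(); // stand-in for downloaded wheel bytes
    let result = unzip_archive(std::io::Cursor::new(&buffer), Path::new("unzipped"));
    assert!(result.is_err()); // an empty buffer isn't a valid zip
}
```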


@ -1,7 +1,7 @@
use url::Url;
use crate::git::GitReference;
pub use crate::source::{GitSource, Reporter};
use git::GitReference;
pub use source::{Fetch, GitSource, Reporter};
mod git;
mod source;


@ -1,7 +1,7 @@
//! Git support is derived from Cargo's implementation.
//! Cargo is dual-licensed under either Apache 2.0 or MIT, at the user's choice.
//! Source: <https://github.com/rust-lang/cargo/blob/23eb492cf920ce051abfc56bbaf838514dc8365c/src/cargo/sources/git/source.rs>
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use anyhow::Result;
use reqwest::Client;
@ -120,15 +120,20 @@ pub struct Fetch {
path: PathBuf,
}
impl From<Fetch> for GitUrl {
fn from(fetch: Fetch) -> Self {
fetch.git
impl Fetch {
pub fn git(&self) -> &GitUrl {
&self.git
}
}
impl From<Fetch> for PathBuf {
fn from(fetch: Fetch) -> Self {
fetch.path
pub fn path(&self) -> &Path {
&self.path
}
pub fn into_git(self) -> GitUrl {
self.git
}
pub fn into_path(self) -> PathBuf {
self.path
}
}
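Replacing the lossy `From<Fetch>` conversions with borrowing accessors plus explicit `into_*` consumers is what lets the source dist builder read both the checkout path and the resolved SHA from a single `Fetch`. A stand-in sketch of the pattern (`GitUrl` here is a dummy, not the real type; `into_path` is analogous):

```rust
use std::path::{Path, PathBuf};

/// Dummy stand-in for `puffin_git::GitUrl`.
struct GitUrl {
    precise: Option<String>,
}

struct Fetch {
    git: GitUrl,
    path: PathBuf,
}

impl Fetch {
    fn git(&self) -> &GitUrl {
        &self.git
    }
    fn path(&self) -> &Path {
        &self.path
    }
    fn into_git(self) -> GitUrl {
        self.git
    }
}

fn main() {
    let fetch = Fetch {
        git: GitUrl {
            precise: Some("abc123".to_string()), // illustrative SHA
        },
        path: PathBuf::from("git-v0/checkouts/example"),
    };
    // Borrow both pieces, as `download_and_build` does...
    let sha = fetch.git().precise.clone().expect("Exact commit after checkout");
    let build_dir = fetch.path().to_path_buf();
    // ...then consume the fetch when only the URL is needed, as `precise()` does.
    let url = fetch.into_git();
    let _ = (sha, build_dir, url);
}
```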


@ -20,6 +20,7 @@ pep508_rs = { path = "../pep508-rs" }
puffin-cache = { path = "../puffin-cache" }
puffin-client = { path = "../puffin-client" }
distribution-types = { path = "../distribution-types" }
platform-tags = { path = "../platform-tags" }
puffin-distribution = { path = "../puffin-distribution" }
puffin-git = { path = "../puffin-git" }
puffin-interpreter = { path = "../puffin-interpreter" }


@ -1,73 +0,0 @@
//! Build source distributions from downloaded archives.
use std::cmp::Reverse;
use anyhow::Result;
use tracing::debug;
use distribution_types::{Dist, RemoteSource};
use puffin_distribution::{Fetcher, SourceDistDownload, WheelDownload};
use puffin_traits::BuildContext;
pub struct Builder<'a, T: BuildContext + Send + Sync> {
build_context: &'a T,
reporter: Option<Box<dyn Reporter>>,
}
impl<'a, T: BuildContext + Send + Sync> Builder<'a, T> {
/// Initialize a new source distribution downloader.
pub fn new(build_context: &'a T) -> Self {
Self {
build_context,
reporter: None,
}
}
/// Set the [`Reporter`] to use for this downloader.
#[must_use]
pub fn with_reporter(self, reporter: impl Reporter + 'static) -> Self {
Self {
reporter: Some(Box::new(reporter)),
..self
}
}
/// Build a set of source distributions.
pub async fn build(&self, dists: Vec<SourceDistDownload>) -> Result<Vec<WheelDownload>> {
// Sort the distributions by size.
let mut dists = dists;
dists.sort_unstable_by_key(|distribution| {
Reverse(distribution.remote().size().unwrap_or(usize::MAX))
});
// Build the distributions serially.
let mut builds = Vec::with_capacity(dists.len());
for dist in dists {
debug!("Building source distribution: {dist}");
let result = Fetcher::new(self.build_context.cache())
.build_sdist(dist, self.build_context)
.await?;
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_progress(result.remote());
}
builds.push(result);
}
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_complete();
}
Ok(builds)
}
}
pub trait Reporter: Send + Sync {
/// Callback to invoke when a source distribution is built.
fn on_progress(&self, dist: &Dist);
/// Callback to invoke when the operation is complete.
fn on_complete(&self);
}


@ -1,110 +0,0 @@
use std::cmp::Reverse;
use std::path::Path;
use std::sync::Arc;
use anyhow::{bail, Result};
use futures::StreamExt;
use distribution_types::{Dist, RemoteSource};
use puffin_client::RegistryClient;
use puffin_distribution::{Download, Fetcher};
use crate::locks::Locks;
pub struct Downloader<'a> {
client: &'a RegistryClient,
cache: &'a Path,
locks: Arc<Locks>,
reporter: Option<Box<dyn Reporter>>,
no_build: bool,
}
impl<'a> Downloader<'a> {
/// Initialize a new distribution downloader.
pub fn new(client: &'a RegistryClient, cache: &'a Path) -> Self {
Self {
client,
cache,
locks: Arc::new(Locks::default()),
reporter: None,
no_build: false,
}
}
/// Set the [`Reporter`] to use for this downloader.
#[must_use]
pub fn with_reporter(self, reporter: impl Reporter + 'static) -> Self {
Self {
reporter: Some(Box::new(reporter)),
..self
}
}
/// Optionally, block downloading source distributions.
#[must_use]
pub fn with_no_build(self, no_build: bool) -> Self {
Self { no_build, ..self }
}
/// Download a set of distributions.
pub async fn download(&self, dists: Vec<Dist>) -> Result<Vec<Download>> {
// Sort the distributions by size.
let mut dists = dists;
dists.sort_unstable_by_key(|distribution| {
Reverse(distribution.size().unwrap_or(usize::MAX))
});
// Fetch the distributions in parallel.
let mut downloads = Vec::with_capacity(dists.len());
let mut fetches = futures::stream::iter(dists)
.map(|dist| self.fetch(dist))
.buffer_unordered(50);
while let Some(result) = fetches.next().await.transpose()? {
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_download_progress(&result);
}
downloads.push(result);
}
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_download_complete();
}
Ok(downloads)
}
/// Download a built distribution (wheel) or source distribution (sdist).
async fn fetch(&self, dist: Dist) -> Result<Download> {
match dist {
Dist::Source(_) => {
if self.no_build {
bail!("Building source distributions is disabled; skipping: {dist}");
}
let lock = self.locks.acquire(&dist).await;
let _guard = lock.lock().await;
let metadata = Fetcher::new(self.cache)
.fetch_dist(&dist, self.client)
.await?;
Ok(metadata)
}
Dist::Built(_) => {
let metadata = Fetcher::new(self.cache)
.fetch_dist(&dist, self.client)
.await?;
Ok(metadata)
}
}
}
}
pub trait Reporter: Send + Sync {
/// Callback to invoke when a wheel is downloaded.
fn on_download_progress(&self, download: &Download);
/// Callback to invoke when the operation is complete.
fn on_download_complete(&self);
}


@ -1,5 +1,3 @@
pub use builder::{Builder, Reporter as BuildReporter};
pub use downloader::{Downloader, Reporter as DownloadReporter};
pub use installer::{Installer, Reporter as InstallReporter};
pub use plan::InstallPlan;
pub use registry_index::RegistryIndex;
@ -7,11 +5,8 @@ pub use site_packages::SitePackages;
pub use uninstall::uninstall;
pub use unzipper::{Reporter as UnzipReporter, Unzipper};
mod builder;
mod cache;
mod downloader;
mod installer;
mod locks;
mod plan;
mod registry_index;
mod site_packages;


@ -1,20 +0,0 @@
use std::sync::Arc;
use fxhash::FxHashMap;
use tokio::sync::Mutex;
use distribution_types::Identifier;
/// A set of locks used to prevent concurrent access to the same resource.
#[derive(Debug, Default)]
pub(crate) struct Locks(Mutex<FxHashMap<String, Arc<Mutex<()>>>>);
impl Locks {
/// Acquire a lock on the given resource.
pub(crate) async fn acquire(&self, dist: &impl Identifier) -> Arc<Mutex<()>> {
let mut map = self.0.lock().await;
map.entry(dist.resource_id())
.or_insert_with(|| Arc::new(Mutex::new(())))
.clone()
}
}


@ -5,7 +5,7 @@ use anyhow::Result;
use tracing::debug;
use distribution_types::{CachedDist, Dist, Identifier, RemoteSource};
use puffin_distribution::{Unzip, WheelDownload};
use puffin_distribution::{LocalWheel, Unzip};
use crate::cache::WheelCache;
@ -26,7 +26,7 @@ impl Unzipper {
/// Unzip a set of downloaded wheels.
pub async fn unzip(
&self,
downloads: Vec<WheelDownload>,
downloads: Vec<LocalWheel>,
target: &Path,
) -> Result<Vec<CachedDist>> {
// Create the wheel cache subdirectory, if necessary.


@ -13,7 +13,7 @@ license = { workspace = true }
workspace = true
[dependencies]
distribution-filename = { path = "../distribution-filename" }
distribution-filename = { path = "../distribution-filename", features = ["serde"] }
install-wheel-rs = { path = "../install-wheel-rs" }
pep440_rs = { path = "../pep440-rs" }
pep508_rs = { path = "../pep508-rs" }
@ -32,16 +32,21 @@ pypi-types = { path = "../pypi-types" }
anyhow = { workspace = true }
bitflags = { workspace = true }
clap = { workspace = true, features = ["derive"], optional = true }
chrono = { workspace = true }
clap = { workspace = true, features = ["derive"], optional = true }
colored = { workspace = true }
derivative = { workspace = true }
fs-err = { workspace = true, features = ["tokio"] }
futures = { workspace = true }
fxhash = { workspace = true }
http-cache-semantics = { workspace = true }
itertools = { workspace = true }
once_cell = { workspace = true }
petgraph = { workspace = true }
pubgrub = { workspace = true }
reqwest = { workspace = true }
serde_json = { workspace = true }
sha2 = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
@ -50,8 +55,6 @@ tracing = { workspace = true }
url = { workspace = true }
waitmap = { workspace = true }
zip = { workspace = true }
derivative = { workspace = true }
sha2 = { workspace = true }
[dev-dependencies]
gourgeist = { path = "../gourgeist" }


@ -5,8 +5,9 @@ use pubgrub::report::Reporter;
use thiserror::Error;
use url::Url;
use distribution_types::{BuiltDist, Dist, SourceDist};
use distribution_types::{BuiltDist, SourceDist};
use pep508_rs::Requirement;
use puffin_distribution::DistributionDatabaseError;
use puffin_normalize::PackageName;
use crate::pubgrub::{PubGrubPackage, PubGrubVersion};
@ -53,37 +54,11 @@ pub enum ResolveError {
#[error(transparent)]
DistributionType(#[from] distribution_types::Error),
#[error("Failed to fetch wheel metadata from: {filename}")]
RegistryBuiltDist {
filename: String,
// TODO(konstin): Gives this a proper error type
#[source]
err: anyhow::Error,
},
#[error("Failed to download {0}")]
Fetch(Box<BuiltDist>, #[source] DistributionDatabaseError),
#[error("Failed to fetch wheel metadata from: {url}")]
UrlBuiltDist {
url: Url,
// TODO(konstin): Gives this a proper error type
#[source]
err: anyhow::Error,
},
#[error("Failed to build distribution: {filename}")]
RegistrySourceDist {
filename: String,
// TODO(konstin): Gives this a proper error type
#[source]
err: anyhow::Error,
},
#[error("Failed to build distribution from URL: {url}")]
UrlSourceDist {
url: Url,
// TODO(konstin): Gives this a proper error type
#[source]
err: anyhow::Error,
},
#[error("Failed to download and build {0}")]
FetchAndBuild(Box<SourceDist>, #[source] DistributionDatabaseError),
}
impl<T> From<futures::channel::mpsc::TrySendError<T>> for ResolveError {
@ -116,38 +91,3 @@ impl From<pubgrub::error::PubGrubError<PubGrubPackage, Range<PubGrubVersion>>> f
ResolveError::PubGrub(RichPubGrubError { source: value })
}
}
impl ResolveError {
pub fn from_dist(dist: Dist, err: anyhow::Error) -> Self {
match dist {
Dist::Built(BuiltDist::Registry(wheel)) => Self::RegistryBuiltDist {
filename: wheel.file.filename.clone(),
err,
},
Dist::Built(BuiltDist::DirectUrl(wheel)) => Self::UrlBuiltDist {
url: wheel.url.clone(),
err,
},
Dist::Built(BuiltDist::Path(wheel)) => Self::UrlBuiltDist {
url: wheel.url.clone(),
err,
},
Dist::Source(SourceDist::Registry(sdist)) => Self::RegistrySourceDist {
filename: sdist.file.filename.clone(),
err,
},
Dist::Source(SourceDist::DirectUrl(sdist)) => Self::UrlSourceDist {
url: sdist.url.clone(),
err,
},
Dist::Source(SourceDist::Git(sdist)) => Self::UrlSourceDist {
url: sdist.url.clone(),
err,
},
Dist::Source(SourceDist::Path(sdist)) => Self::UrlBuiltDist {
url: sdist.url.clone(),
err,
},
}
}
}


@ -12,7 +12,6 @@ mod candidate_selector;
mod error;
mod file;
mod finder;
mod locks;
mod manifest;
mod prerelease_mode;
mod pubgrub;


@ -1,20 +0,0 @@
use std::sync::Arc;
use fxhash::FxHashMap;
use tokio::sync::Mutex;
use distribution_types::Identifier;
/// A set of locks used to prevent concurrent access to the same resource.
#[derive(Debug, Default)]
pub(crate) struct Locks(Mutex<FxHashMap<String, Arc<Mutex<()>>>>);
impl Locks {
/// Acquire a lock on the given resource.
pub(crate) async fn acquire(&self, distribution: &impl Identifier) -> Arc<Mutex<()>> {
let mut map = self.0.lock().await;
map.entry(distribution.resource_id())
.or_insert_with(|| Arc::new(Mutex::new(())))
.clone()
}
}


@ -1,6 +1,5 @@
//! Given a set of requirements, find a set of compatible packages.
use std::borrow::Cow;
use std::sync::Arc;
use anyhow::Result;
@ -18,13 +17,12 @@ use url::Url;
use waitmap::WaitMap;
use distribution_filename::WheelFilename;
use distribution_types::{BuiltDist, Dist, Identifier, Metadata, SourceDist, VersionOrUrl};
use distribution_types::{Dist, Identifier, Metadata, SourceDist, VersionOrUrl};
use pep508_rs::{MarkerEnvironment, Requirement};
use platform_tags::Tags;
use puffin_cache::CanonicalUrl;
use puffin_client::RegistryClient;
use puffin_distribution::Fetcher;
use puffin_distribution::{DistributionDatabase, Download};
use puffin_normalize::{ExtraName, PackageName};
use puffin_traits::BuildContext;
use pypi_types::{File, IndexUrl, Metadata21, SimpleJson};
@ -32,7 +30,6 @@ use pypi_types::{File, IndexUrl, Metadata21, SimpleJson};
use crate::candidate_selector::CandidateSelector;
use crate::error::ResolveError;
use crate::file::DistFile;
use crate::locks::Locks;
use crate::manifest::Manifest;
use crate::pubgrub::{
PubGrubDependencies, PubGrubPackage, PubGrubPriorities, PubGrubVersion, MIN_VERSION,
@ -52,7 +49,7 @@ pub struct Resolver<'a, Context: BuildContext + Send + Sync> {
selector: CandidateSelector,
index: Arc<Index>,
exclude_newer: Option<DateTime<Utc>>,
locks: Arc<Locks>,
fetcher: DistributionDatabase<'a, Context>,
build_context: &'a Context,
reporter: Option<Arc<dyn Reporter>>,
}
@ -69,7 +66,6 @@ impl<'a, Context: BuildContext + Send + Sync> Resolver<'a, Context> {
) -> Self {
Self {
index: Arc::new(Index::default()),
locks: Arc::new(Locks::default()),
selector: CandidateSelector::for_resolution(&manifest, options),
allowed_urls: manifest
.requirements
@ -90,6 +86,7 @@ impl<'a, Context: BuildContext + Send + Sync> Resolver<'a, Context> {
markers,
tags,
client,
fetcher: DistributionDatabase::new(build_context.cache(), tags, client, build_context),
build_context,
reporter: None,
}
@ -98,8 +95,10 @@ impl<'a, Context: BuildContext + Send + Sync> Resolver<'a, Context> {
/// Set the [`Reporter`] to use for this installer.
#[must_use]
pub fn with_reporter(self, reporter: impl Reporter + 'static) -> Self {
let reporter = Arc::new(reporter);
Self {
reporter: Some(Arc::new(reporter)),
reporter: Some(reporter.clone()),
fetcher: self.fetcher.with_reporter(Facade { reporter }),
..self
}
}
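
A detail worth noting in `with_reporter`: the reporter is wrapped in an `Arc` once and that same `Arc` is shared between the resolver and the `DistributionDatabase` (via the `Facade` defined at the end of this file), so a single reporter observes resolution, download, and build progress alike. A runnable sketch of the same sharing pattern, with all names (`Database`, `Resolver`, `report`, `Stdout`) hypothetical stand-ins rather than the crate's types:

```rust
use std::sync::Arc;

trait Reporter: Send + Sync {
    fn report(&self, event: &str);
}

struct Database {
    reporter: Option<Arc<dyn Reporter>>,
}

struct Resolver {
    database: Database,
    reporter: Option<Arc<dyn Reporter>>,
}

impl Resolver {
    /// Wrap the reporter once, then hand the same `Arc` to the inner database,
    /// mirroring how the resolver shares its reporter with the fetcher above.
    #[must_use]
    fn with_reporter(self, reporter: impl Reporter + 'static) -> Self {
        let reporter: Arc<dyn Reporter> = Arc::new(reporter);
        Self {
            database: Database { reporter: Some(reporter.clone()) },
            reporter: Some(reporter),
        }
    }
}

struct Stdout;

impl Reporter for Stdout {
    fn report(&self, event: &str) {
        println!("{event}");
    }
}

fn main() {
    let resolver = Resolver {
        database: Database { reporter: None },
        reporter: None,
    }
    .with_reporter(Stdout);

    // Both handles point at the same underlying reporter.
    resolver.reporter.as_deref().unwrap().report("resolution started");
    resolver.database.reporter.as_deref().unwrap().report("download started");
}
```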
@ -604,72 +603,18 @@ impl<'a, Context: BuildContext + Send + Sync> Resolver<'a, Context> {
.await
}
// Fetch registry-based wheel metadata.
Request::Dist(Dist::Built(BuiltDist::Registry(wheel))) => {
let metadata = self
.client
.wheel_metadata(wheel.index.clone(), wheel.file.clone())
.await?;
if metadata.name != *wheel.name() {
return Err(ResolveError::NameMismatch {
metadata: metadata.name,
given: wheel.name().clone(),
});
}
Ok(Response::Dist(
Dist::Built(BuiltDist::Registry(wheel)),
metadata,
None,
))
}
// Fetch distribution metadata.
Request::Dist(distribution) => {
let lock = self.locks.acquire(&distribution).await;
let _guard = lock.lock().await;
let fetcher = if let Some(reporter) = self.reporter.clone() {
Fetcher::new(self.build_context.cache()).with_reporter(Facade { reporter })
} else {
Fetcher::new(self.build_context.cache())
};
let precise_url = fetcher
.precise(&distribution)
Request::Dist(dist) => {
let (metadata, precise) = self
.fetcher
.get_or_build_wheel_metadata(&dist)
.await
.map_err(|err| ResolveError::from_dist(distribution.clone(), err))?;
// Insert the `precise`, if it exists.
let precise_distribution = match precise_url.as_ref() {
Some(url) => Cow::Owned(distribution.clone().with_url(url.clone())),
None => Cow::Borrowed(&distribution),
};
// Fetch the metadata for the distribution.
let metadata = {
if let Ok(Some(metadata)) =
fetcher.find_metadata(&precise_distribution, self.tags)
{
debug!("Found distribution metadata in cache: {precise_distribution}");
metadata
} else {
debug!("Downloading distribution: {precise_distribution}");
fetcher
.fetch_metadata(&precise_distribution, self.client, self.build_context)
.await
.map_err(|err| ResolveError::from_dist(distribution.clone(), err))?
}
};
if metadata.name != *distribution.name() {
return Err(ResolveError::NameMismatch {
metadata: metadata.name,
given: distribution.name().clone(),
});
}
Ok(Response::Dist(distribution, metadata, precise_url))
.map_err(|err| match dist.clone() {
Dist::Built(built_dist) => ResolveError::Fetch(Box::new(built_dist), err),
Dist::Source(source_dist) => {
ResolveError::FetchAndBuild(Box::new(source_dist), err)
}
})?;
Ok(Response::Dist(dist, metadata, precise))
}
}
}
@ -705,14 +650,20 @@ pub trait Reporter: Send + Sync {
/// Callback to invoke when a dependency is resolved.
fn on_progress(&self, name: &PackageName, extra: Option<&ExtraName>, version: VersionOrUrl);
/// Callback to invoke when a wheel is downloaded.
fn on_download_progress(&self, download: &Download);
/// Callback to invoke when the resolution is complete.
fn on_complete(&self);
/// Callback to invoke when a source distribution build is kicked off.
fn on_build_start(&self, dist: &Dist) -> usize;
fn on_build_start(&self, dist: &SourceDist) -> usize;
/// Callback to invoke when a source distribution build is complete.
fn on_build_complete(&self, dist: &Dist, id: usize);
fn on_build_complete(&self, dist: &SourceDist, id: usize);
/// Callback to invoke when the operation is complete.
fn on_download_and_build_complete(&self);
/// Callback to invoke when a repository checkout begins.
fn on_checkout_start(&self, url: &Url, rev: &str) -> usize;
@ -721,20 +672,28 @@ pub trait Reporter: Send + Sync {
fn on_checkout_complete(&self, url: &Url, rev: &str, index: usize);
}
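
For downstream implementors, the shape of the resolver's `Reporter` trait after this change: builds and checkouts now report against `SourceDist` (rather than `Dist`) and hand back progress-bar ids, and download progress is newly surfaced. A sketch of a minimal no-op implementation; it is crate-bound (it only compiles inside this workspace), with the signatures taken from the diff above and the `QuietReporter` name hypothetical:

```rust
use url::Url;

use distribution_types::{SourceDist, VersionOrUrl};
use puffin_distribution::Download;
use puffin_normalize::{ExtraName, PackageName};

/// A reporter that swallows every event.
struct QuietReporter;

impl Reporter for QuietReporter {
    fn on_progress(&self, _name: &PackageName, _extra: Option<&ExtraName>, _version: VersionOrUrl) {}
    fn on_download_progress(&self, _download: &Download) {}
    fn on_complete(&self) {}
    fn on_build_start(&self, _dist: &SourceDist) -> usize {
        0 // no progress bar to hand back
    }
    fn on_build_complete(&self, _dist: &SourceDist, _id: usize) {}
    fn on_download_and_build_complete(&self) {}
    fn on_checkout_start(&self, _url: &Url, _rev: &str) -> usize {
        0
    }
    fn on_checkout_complete(&self, _url: &Url, _rev: &str, _index: usize) {}
}
```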
/// A facade for converting from [`Reporter`] to [`puffin_distribution::Reporter`].
struct Facade {
reporter: Arc<dyn Reporter>,
}
impl puffin_distribution::Reporter for Facade {
fn on_build_start(&self, dist: &Dist) -> usize {
fn on_download_progress(&self, download: &Download) {
self.reporter.on_download_progress(download);
}
fn on_build_start(&self, dist: &SourceDist) -> usize {
self.reporter.on_build_start(dist)
}
fn on_build_complete(&self, dist: &Dist, id: usize) {
fn on_build_complete(&self, dist: &SourceDist, id: usize) {
self.reporter.on_build_complete(dist, id);
}
fn on_download_and_build_complete(&self) {
self.reporter.on_download_and_build_complete();
}
fn on_checkout_start(&self, url: &Url, rev: &str) -> usize {
self.reporter.on_checkout_start(url, rev)
}

View file

@ -31,12 +31,13 @@ static EXCLUDE_NEWER: Lazy<DateTime<Utc>> = Lazy::new(|| {
});
struct DummyContext {
cache: PathBuf,
interpreter: Interpreter,
}
impl BuildContext for DummyContext {
fn cache(&self) -> &Path {
panic!("The test should not need to build source distributions")
&self.cache
}
fn interpreter(&self) -> &Interpreter {
@ -82,6 +83,7 @@ async fn resolve(
let temp_dir = tempdir()?;
let client = RegistryClientBuilder::new(temp_dir.path()).build();
let build_context = DummyContext {
cache: temp_dir.path().to_path_buf(),
interpreter: Interpreter::artificial(
Platform::current()?,
markers.clone(),

View file

@ -1,5 +1,13 @@
# All kinds of dependencies
flask @ https://files.pythonhosted.org/packages/36/42/015c23096649b908c809c69388a805a571a3bea44362fe87e33fc3afa01f/flask-3.0.0-py3-none-any.whl
django_allauth==0.51.0
# pypi wheel
pandas
# url wheel
flask @ https://files.pythonhosted.org/packages/36/42/015c23096649b908c809c69388a805a571a3bea44362fe87e33fc3afa01f/flask-3.0.0-py3-none-any.whl
# pypi source dist
django_allauth==0.51.0
# url source dist
werkzeug @ https://files.pythonhosted.org/packages/0d/cc/ff1904eb5eb4b455e442834dabf9427331ac0fa02853bf83db817a7dd53d/werkzeug-3.0.1.tar.gz
# git source dist
pydantic-extra-types @ git+https://github.com/pydantic/pydantic-extra-types.git