Misc. changes

This commit is contained in:
Charlie Marsh 2023-10-05 00:31:47 -04:00
parent f51432382a
commit 8032d4606e
4 changed files with 134 additions and 92 deletions

View file

@ -3,17 +3,29 @@ use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
use anyhow::Result; use anyhow::Result;
use futures::future::Either;
use futures::{StreamExt, TryFutureExt}; use futures::{StreamExt, TryFutureExt};
use pep440_rs::Version; use pep440_rs::Version;
use pep508_rs::{MarkerEnvironment, Requirement, StringVersion, VersionOrUrl}; use pep508_rs::{MarkerEnvironment, Requirement, StringVersion, VersionOrUrl};
use tracing::trace; use tracing::debug;
use puffin_client::{PypiClientBuilder, SimpleJson}; use puffin_client::{File, PypiClientBuilder, SimpleJson};
use puffin_requirements::metadata::Metadata21;
use puffin_requirements::package_name::PackageName; use puffin_requirements::package_name::PackageName;
use puffin_requirements::wheel::WheelName; use puffin_requirements::wheel::WheelName;
use crate::commands::ExitStatus; use crate::commands::ExitStatus;
enum Request {
Package(Requirement),
Version(Requirement, File),
}
enum Response {
Package(SimpleJson, Requirement),
Version(Metadata21, Requirement),
}
pub(crate) async fn install(src: &Path) -> Result<ExitStatus> { pub(crate) async fn install(src: &Path) -> Result<ExitStatus> {
// TODO(charlie): Fetch from the environment. // TODO(charlie): Fetch from the environment.
let env = MarkerEnvironment { let env = MarkerEnvironment {
@ -37,40 +49,49 @@ pub(crate) async fn install(src: &Path) -> Result<ExitStatus> {
let requirements = puffin_requirements::Requirements::from_str(&requirements_txt)?; let requirements = puffin_requirements::Requirements::from_str(&requirements_txt)?;
// Instantiate a client. // Instantiate a client.
let client = PypiClientBuilder::default().build(); let pypi_client = PypiClientBuilder::default().build();
let proxy_client = PypiClientBuilder::default().build();
// Fetch metadata in parallel. // A channel to fetch package metadata (e.g., given `flask`, fetch all versions) and version
// metadata (e.g., given `flask==1.0.0`, fetch the metadata for that version).
let (package_sink, package_stream) = futures::channel::mpsc::unbounded(); let (package_sink, package_stream) = futures::channel::mpsc::unbounded();
let mut resolution: HashMap<PackageName, Version> = HashMap::with_capacity(requirements.len()); // Initialize the package stream.
// Create a stream of futures that fetch metadata for each requirement.
let mut package_stream = package_stream let mut package_stream = package_stream
.map(|requirement: Requirement| { .map(|request: Request| match request {
client Request::Package(requirement) => Either::Left(
pypi_client
.simple(requirement.name.clone()) .simple(requirement.name.clone())
.map_ok(move |metadata| (metadata, requirement)) .map_ok(move |metadata| Response::Package(metadata, requirement)),
),
Request::Version(requirement, file) => Either::Right(
proxy_client
.file(file.clone())
.map_ok(move |metadata| Response::Version(metadata, requirement)),
),
}) })
.buffer_unordered(48) .buffer_unordered(32)
.ready_chunks(48); .ready_chunks(32);
// Push all the requirements into the sink. // Push all the requirements into the package sink.
let mut in_flight: HashSet<PackageName> = HashSet::with_capacity(requirements.len()); let mut in_flight: HashSet<PackageName> = HashSet::with_capacity(requirements.len());
for requirement in requirements.iter() { for requirement in requirements.iter() {
package_sink.unbounded_send(requirement.clone())?; debug!("--> adding root dependency: {}", requirement);
package_sink.unbounded_send(Request::Package(requirement.clone()))?;
in_flight.insert(PackageName::normalize(&requirement.name)); in_flight.insert(PackageName::normalize(&requirement.name));
} }
// Resolve the requirements.
let mut resolution: HashMap<PackageName, Version> = HashMap::with_capacity(requirements.len());
while let Some(chunk) = package_stream.next().await { while let Some(chunk) = package_stream.next().await {
for result in chunk { for result in chunk {
let (metadata, requirement): (SimpleJson, Requirement) = result?; let result: Response = result?;
match result {
// Remove this requirement from the in-flight set. Response::Package(metadata, requirement) => {
let normalized_name = PackageName::normalize(&requirement.name);
in_flight.remove(&normalized_name);
// TODO(charlie): Support URLs. Right now, we treat a URL as an unpinned dependency. // TODO(charlie): Support URLs. Right now, we treat a URL as an unpinned dependency.
let specifiers = requirement let specifiers =
requirement
.version_or_url .version_or_url
.as_ref() .as_ref()
.and_then(|version_or_url| match version_or_url { .and_then(|version_or_url| match version_or_url {
@ -92,16 +113,17 @@ pub(crate) async fn install(src: &Path) -> Result<ExitStatus> {
continue; continue;
}; };
// Fetch the metadata for this specific version. package_sink.unbounded_send(Request::Version(requirement, file.clone()))?;
let metadata = client.file(file).await?; }
trace!( Response::Version(metadata, requirement) => {
"Selecting {version} for {requirement}", debug!(
version = metadata.version, "--> selected version {} for {}",
requirement = requirement metadata.version, requirement
); );
// Add to the resolved set. // Add to the resolved set.
let normalized_name = PackageName::normalize(&requirement.name); let normalized_name = PackageName::normalize(&requirement.name);
in_flight.remove(&normalized_name);
resolution.insert(normalized_name, metadata.version); resolution.insert(normalized_name, metadata.version);
// Enqueue its dependencies. // Enqueue its dependencies.
@ -109,20 +131,12 @@ pub(crate) async fn install(src: &Path) -> Result<ExitStatus> {
if !dependency if !dependency
.evaluate_markers(&env, requirement.extras.clone().unwrap_or_default()) .evaluate_markers(&env, requirement.extras.clone().unwrap_or_default())
{ {
trace!("Ignoring {dependency} because it doesn't match the environment"); debug!("--> ignoring {dependency} due to environment mismatch");
continue;
}
if dependency
.extras
.as_ref()
.is_some_and(|extras| !extras.is_empty())
{
trace!("Ignoring {dependency} because it has extras");
continue; continue;
} }
let normalized_name = PackageName::normalize(&dependency.name); let normalized_name = PackageName::normalize(&dependency.name);
if resolution.contains_key(&normalized_name) { if resolution.contains_key(&normalized_name) {
continue; continue;
} }
@ -131,8 +145,11 @@ pub(crate) async fn install(src: &Path) -> Result<ExitStatus> {
continue; continue;
} }
trace!("Enqueueing {dependency}"); debug!("--> adding transitive dependency: {}", dependency);
package_sink.unbounded_send(dependency)?;
package_sink.unbounded_send(Request::Package(dependency))?;
}
}
} }
} }

View file

@ -9,10 +9,11 @@ edition = "2021"
puffin-requirements = { path = "../puffin-requirements" } puffin-requirements = { path = "../puffin-requirements" }
http-cache-reqwest = { version = "0.11.3" } http-cache-reqwest = { version = "0.11.3" }
reqwest = { version = "0.11.22", features = ["json", "gzip", "stream"] } reqwest = { version = "0.11.22", features = ["json", "gzip", "brotli"] }
reqwest-middleware = { version = "0.2.3" } reqwest-middleware = { version = "0.2.3" }
reqwest-retry = { version = "0.3.0" } reqwest-retry = { version = "0.3.0" }
serde = { version = "1.0.188" } serde = { version = "1.0.188" }
serde_json = { version = "1.0.107" } serde_json = { version = "1.0.107" }
thiserror = { version = "1.0.49" } thiserror = { version = "1.0.49" }
url = { version = "2.4.1" } url = { version = "2.4.1" }
tracing = "0.1.37"

View file

@ -1,5 +1,8 @@
use std::fmt::Debug;
use reqwest::StatusCode; use reqwest::StatusCode;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tracing::trace;
use url::Url; use url::Url;
use puffin_requirements::metadata::Metadata21; use puffin_requirements::metadata::Metadata21;
@ -13,6 +16,8 @@ impl PypiClient {
&self, &self,
package_name: impl AsRef<str>, package_name: impl AsRef<str>,
) -> Result<SimpleJson, PypiClientError> { ) -> Result<SimpleJson, PypiClientError> {
let start = std::time::Instant::now();
// Format the URL for PyPI. // Format the URL for PyPI.
let mut url = self.registry.join("simple")?; let mut url = self.registry.join("simple")?;
url.path_segments_mut() url.path_segments_mut()
@ -21,10 +26,20 @@ impl PypiClient {
url.path_segments_mut().unwrap().push(""); url.path_segments_mut().unwrap().push("");
url.set_query(Some("format=application/vnd.pypi.simple.v1+json")); url.set_query(Some("format=application/vnd.pypi.simple.v1+json"));
trace!(
"fetching metadata for {} from {}",
package_name.as_ref(),
url
);
// Fetch from the registry. // Fetch from the registry.
let text = self.simple_impl(package_name, &url).await?; let text = self.simple_impl(&package_name, &url).await?;
serde_json::from_str(&text) let payload = serde_json::from_str(&text)
.map_err(move |e| PypiClientError::from_json_err(e, url.to_string())) .map_err(move |e| PypiClientError::from_json_err(e, "".to_string()));
trace!("fetched metadata for {} in {:?}", url, start.elapsed());
payload
} }
async fn simple_impl( async fn simple_impl(
@ -35,6 +50,7 @@ impl PypiClient {
Ok(self Ok(self
.client .client
.get(url.clone()) .get(url.clone())
.header("Accept-Encoding", "gzip")
.send() .send()
.await? .await?
.error_for_status() .error_for_status()
@ -52,7 +68,9 @@ impl PypiClient {
.await?) .await?)
} }
pub async fn file(&self, file: &File) -> Result<Metadata21, PypiClientError> { pub async fn file(&self, file: File) -> Result<Metadata21, PypiClientError> {
let start = std::time::Instant::now();
// Send to the proxy. // Send to the proxy.
let url = self.proxy.join( let url = self.proxy.join(
file.url file.url
@ -60,9 +78,15 @@ impl PypiClient {
.unwrap(), .unwrap(),
)?; )?;
trace!("fetching file {} from {}", file.filename, url);
// Fetch from the registry. // Fetch from the registry.
let text = self.file_impl(&file.filename, &url).await?; let text = self.file_impl(&file.filename, &url).await?;
Metadata21::parse(text.as_bytes()).map_err(std::convert::Into::into) let payload = Metadata21::parse(text.as_bytes()).map_err(std::convert::Into::into);
trace!("fetched file {} in {:?}", url, start.elapsed());
payload
} }
async fn file_impl( async fn file_impl(
@ -91,7 +115,7 @@ impl PypiClient {
} }
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SimpleJson { pub struct SimpleJson {
pub files: Vec<File>, pub files: Vec<File>,
pub meta: Meta, pub meta: Meta,
@ -99,7 +123,7 @@ pub struct SimpleJson {
pub versions: Vec<String>, pub versions: Vec<String>,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")] #[serde(rename_all = "kebab-case")]
pub struct File { pub struct File {
pub core_metadata: Metadata, pub core_metadata: Metadata,
@ -113,26 +137,26 @@ pub struct File {
pub yanked: Yanked, pub yanked: Yanked,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)] #[serde(untagged)]
pub enum Metadata { pub enum Metadata {
Bool(bool), Bool(bool),
Hashes(Hashes), Hashes(Hashes),
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)] #[serde(untagged)]
pub enum Yanked { pub enum Yanked {
Bool(bool), Bool(bool),
Reason(String), Reason(String),
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Hashes { pub struct Hashes {
pub sha256: String, pub sha256: String,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")] #[serde(rename_all = "kebab-case")]
pub struct Meta { pub struct Meta {
#[serde(rename = "_last-serial")] #[serde(rename = "_last-serial")]

View file

@ -1,4 +1,4 @@
pub use api::SimpleJson; pub use api::{File, SimpleJson};
pub use client::PypiClientBuilder; pub use client::PypiClientBuilder;
mod api; mod api;