Optional managed Python archive download cache (#12175)

Part of #11834

Currently, all Python installations are a streaming download-and-extract.
With this PR, we add the `UV_PYTHON_CACHE_DIR` variable. When set, the
installation is split into downloading the interpreter archive into
`UV_PYTHON_CACHE_DIR` and extracting it from there in a second step. If
the archive is already present in `UV_PYTHON_CACHE_DIR`, we skip the
download.

The feature can be used to speed up tests and CI. Locally for me, `cargo
test -p uv -- python_install` goes from 43s to 7s (1.7s in release mode)
when setting `UV_PYTHON_CACHE_DIR`. It can also be used for offline
installation of Python interpreters, by copying the archives to a
directory on the offline machine; the path rewriting is still
performed on the target machine at installation time.
This commit is contained in:
konsti 2025-04-28 12:09:09 +02:00 committed by GitHub
parent cfe82dc22a
commit b33a19689c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 371 additions and 96 deletions

View file

@ -1,12 +1,12 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::Display;
use std::io;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime};
use std::{env, io};
use futures::TryStreamExt;
use itertools::Itertools;
@ -15,13 +15,13 @@ use owo_colors::OwoColorize;
use reqwest_retry::RetryPolicy;
use serde::Deserialize;
use thiserror::Error;
use tokio::io::{AsyncRead, ReadBuf};
use tokio::io::{AsyncRead, AsyncWriteExt, BufWriter, ReadBuf};
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::either::Either;
use tracing::{debug, instrument};
use url::Url;
use uv_client::{is_extended_transient_error, WrappedReqwestError};
use uv_client::{is_extended_transient_error, BaseClient, WrappedReqwestError};
use uv_distribution_filename::{ExtensionError, SourceDistExtension};
use uv_extract::hash::Hasher;
use uv_fs::{rename_with_retry, Simplified};
@ -96,6 +96,12 @@ pub enum Error {
RemoteJSONNotSupported(),
#[error("The json of the python downloads is invalid: {0}")]
InvalidPythonDownloadsJSON(String, #[source] serde_json::Error),
#[error("An offline Python installation was requested, but {file} (from {url}) is missing in {}", python_builds_dir.user_display())]
OfflinePythonMissing {
file: Box<PythonInstallationKey>,
url: Box<Url>,
python_builds_dir: PathBuf,
},
}
#[derive(Debug, PartialEq, Clone)]
@ -509,6 +515,7 @@ impl ManagedPythonDownload {
Err(Error::NoDownloadFound(request.clone()))
}
//noinspection RsUnresolvedPath - RustRover can't see through the `include!`
/// Iterate over all [`ManagedPythonDownload`]s.
pub fn iter_all() -> Result<impl Iterator<Item = &'static ManagedPythonDownload>, Error> {
let runtime_source = std::env::var(EnvVars::UV_PYTHON_DOWNLOADS_JSON_URL);
@ -560,7 +567,7 @@ impl ManagedPythonDownload {
#[instrument(skip(client, installation_dir, scratch_dir, reporter), fields(download = % self.key()))]
pub async fn fetch_with_retry(
&self,
client: &uv_client::BaseClient,
client: &BaseClient,
installation_dir: &Path,
scratch_dir: &Path,
reinstall: bool,
@ -610,7 +617,7 @@ impl ManagedPythonDownload {
#[instrument(skip(client, installation_dir, scratch_dir, reporter), fields(download = % self.key()))]
pub async fn fetch(
&self,
client: &uv_client::BaseClient,
client: &BaseClient,
installation_dir: &Path,
scratch_dir: &Path,
reinstall: bool,
@ -626,62 +633,109 @@ impl ManagedPythonDownload {
return Ok(DownloadResult::AlreadyAvailable(path));
}
let filename = url.path_segments().unwrap().next_back().unwrap();
let ext = SourceDistExtension::from_path(filename)
// We improve filesystem compatibility by using neither the URL-encoded `%2B` nor the `+` it
// decodes to.
let filename = url
.path_segments()
.unwrap()
.next_back()
.unwrap()
.replace("%2B", "-");
debug_assert!(
filename
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == '.'),
"Unexpected char in filename: {filename}"
);
let ext = SourceDistExtension::from_path(&filename)
.map_err(|err| Error::MissingExtension(url.to_string(), err))?;
let (reader, size) = read_url(&url, client).await?;
let progress = reporter
.as_ref()
.map(|reporter| (reporter, reporter.on_download_start(&self.key, size)));
// Download and extract into a temporary directory.
let temp_dir = tempfile::tempdir_in(scratch_dir).map_err(Error::DownloadDirError)?;
debug!(
"Downloading {url} to temporary location: {}",
temp_dir.path().simplified_display()
);
if let Some(python_builds_dir) = env::var_os(EnvVars::UV_PYTHON_CACHE_DIR) {
let python_builds_dir = PathBuf::from(python_builds_dir);
fs_err::create_dir_all(&python_builds_dir)?;
let hash_prefix = match self.sha256 {
Some(sha) => {
// Shorten the hash to avoid too-long-filename errors
&sha[..9]
}
None => "none",
};
let target_cache_file = python_builds_dir.join(format!("{hash_prefix}-{filename}"));
let mut hashers = self
.sha256
.into_iter()
.map(|_| Hasher::from(HashAlgorithm::Sha256))
.collect::<Vec<_>>();
let mut hasher = uv_extract::hash::HashReader::new(reader, &mut hashers);
// Download the archive to the cache, or return a reader if we have it in cache.
// TODO(konsti): We should "tee" the write so we can do the download-to-cache and unpacking
// in one step.
let (reader, size): (Box<dyn AsyncRead + Unpin>, Option<u64>) =
match fs_err::tokio::File::open(&target_cache_file).await {
Ok(file) => {
debug!(
"Extracting existing `{}`",
target_cache_file.simplified_display()
);
let size = file.metadata().await?.len();
let reader = Box::new(tokio::io::BufReader::new(file));
(reader, Some(size))
}
Err(err) if err.kind() == io::ErrorKind::NotFound => {
// Point the user to which file is missing where and where to download it
if client.connectivity().is_offline() {
return Err(Error::OfflinePythonMissing {
file: Box::new(self.key().clone()),
url: Box::new(url),
python_builds_dir,
});
}
debug!("Extracting {filename}");
self.download_archive(
&url,
client,
reporter,
&python_builds_dir,
&target_cache_file,
)
.await?;
match progress {
Some((&reporter, progress)) => {
let mut reader = ProgressReader::new(&mut hasher, progress, reporter);
uv_extract::stream::archive(&mut reader, ext, temp_dir.path())
.await
.map_err(|err| Error::ExtractError(filename.to_string(), err))?;
}
None => {
uv_extract::stream::archive(&mut hasher, ext, temp_dir.path())
.await
.map_err(|err| Error::ExtractError(filename.to_string(), err))?;
}
}
debug!("Extracting `{}`", target_cache_file.simplified_display());
let file = fs_err::tokio::File::open(&target_cache_file).await?;
let size = file.metadata().await?.len();
let reader = Box::new(tokio::io::BufReader::new(file));
(reader, Some(size))
}
Err(err) => return Err(err.into()),
};
hasher.finish().await.map_err(Error::HashExhaustion)?;
// Extract the downloaded archive into a temporary directory.
self.extract_reader(
reader,
temp_dir.path(),
&filename,
ext,
size,
reporter,
Direction::Extract,
)
.await?;
} else {
// Avoid overlong log lines
debug!("Downloading {url}");
debug!(
"Extracting {filename} to temporary location: {}",
temp_dir.path().simplified_display()
);
if let Some((&reporter, progress)) = progress {
reporter.on_progress(&self.key, progress);
}
// Check the hash
if let Some(expected) = self.sha256 {
let actual = HashDigest::from(hashers.pop().unwrap()).digest;
if !actual.eq_ignore_ascii_case(expected) {
return Err(Error::HashMismatch {
installation: self.key.to_string(),
expected: expected.to_string(),
actual: actual.to_string(),
});
}
let (reader, size) = read_url(&url, client).await?;
self.extract_reader(
reader,
temp_dir.path(),
&filename,
ext,
size,
reporter,
Direction::Download,
)
.await?;
}
// Extract the top-level directory.
@ -729,6 +783,97 @@ impl ManagedPythonDownload {
Ok(DownloadResult::Fetched(path))
}
/// Download the managed Python archive into the cache directory.
///
/// The archive is streamed from `url` into a temporary file created inside
/// `python_builds_dir`, and is only renamed to `target_cache_file` once the copy has finished
/// and the writer has been flushed. A download that fails part-way therefore never shows up
/// under the final cache file name.
///
/// The hash of the archive is not checked here; it is verified when the archive is unpacked.
///
/// When a `reporter` is given, the transfer is reported as a [`Direction::Download`] request.
///
/// # Errors
///
/// Returns an error if the URL cannot be read, or if creating, writing, flushing or renaming
/// the temporary file fails.
async fn download_archive(
    &self,
    url: &Url,
    client: &BaseClient,
    reporter: Option<&dyn Reporter>,
    python_builds_dir: &Path,
    target_cache_file: &Path,
) -> Result<(), Error> {
    debug!(
        "Downloading {} to `{}`",
        url,
        target_cache_file.simplified_display()
    );

    let (mut reader, size) = read_url(url, client).await?;

    // Stage the download inside the cache directory itself, so that the final step is a
    // rename within that directory tree rather than a copy from elsewhere.
    let temp_dir = tempfile::tempdir_in(python_builds_dir)?;
    let temp_file = temp_dir.path().join("download");

    // Download to a temporary file. We verify the hash when unpacking the file.
    {
        let mut archive_writer = BufWriter::new(fs_err::tokio::File::create(&temp_file).await?);

        // Download with or without progress bar.
        if let Some(reporter) = reporter {
            let key = reporter.on_request_start(Direction::Download, &self.key, size);
            tokio::io::copy(
                &mut ProgressReader::new(reader, key, reporter),
                &mut archive_writer,
            )
            .await?;
            reporter.on_request_complete(Direction::Download, key);
        } else {
            tokio::io::copy(&mut reader, &mut archive_writer).await?;
        }

        // Flush explicitly: dropping a `BufWriter` does not surface write errors.
        archive_writer.flush().await?;
    }

    // Move the completed file into place, invalidating the `File` instance. Use the async
    // rename so this `async fn` does not issue a blocking filesystem call directly.
    fs_err::tokio::rename(&temp_file, target_cache_file).await?;

    Ok(())
}
/// Extract a Python interpreter archive into a (temporary) directory, either from a file or
/// from a download stream.
///
/// All bytes read from `reader` pass through a hashing reader. If this download has an
/// expected SHA-256, the digest of the consumed stream is compared against it
/// (case-insensitively) after extraction.
///
/// * `reader` - the archive byte stream.
/// * `target` - the directory the archive is unpacked into.
/// * `filename` - the archive's file name, used only in the extraction error.
/// * `ext` - the archive format passed to the extractor.
/// * `size` - the size of the stream, if known; forwarded to the reporter.
/// * `reporter` - optional progress reporter.
/// * `direction` - the [`Direction`] reported to the reporter for this request.
///
/// # Errors
///
/// Returns [`Error::ExtractError`] if unpacking fails, [`Error::HashExhaustion`] if the rest
/// of the stream cannot be consumed for hashing, and [`Error::HashMismatch`] if the computed
/// digest differs from the expected one.
async fn extract_reader(
    &self,
    reader: impl AsyncRead + Unpin,
    target: &Path,
    filename: &str,
    ext: SourceDistExtension,
    size: Option<u64>,
    reporter: Option<&dyn Reporter>,
    direction: Direction,
) -> Result<(), Error> {
    // One hasher if an expected SHA-256 is known, none otherwise.
    let mut hashers = self
        .sha256
        .into_iter()
        .map(|_| Hasher::from(HashAlgorithm::Sha256))
        .collect::<Vec<_>>();
    let mut hasher = uv_extract::hash::HashReader::new(reader, &mut hashers);

    if let Some(reporter) = reporter {
        let progress_key = reporter.on_request_start(direction, &self.key, size);
        let mut reader = ProgressReader::new(&mut hasher, progress_key, reporter);
        uv_extract::stream::archive(&mut reader, ext, target)
            .await
            .map_err(|err| Error::ExtractError(filename.to_string(), err))?;
        reporter.on_request_complete(direction, progress_key);
    } else {
        uv_extract::stream::archive(&mut hasher, ext, target)
            .await
            .map_err(|err| Error::ExtractError(filename.to_string(), err))?;
    }

    // Finish the hashing reader before reading the digest below.
    hasher.finish().await.map_err(Error::HashExhaustion)?;

    // Check the hash
    if let Some(expected) = self.sha256 {
        let actual = HashDigest::from(
            hashers
                .pop()
                .expect("a hasher is created whenever an expected SHA-256 is present"),
        )
        .digest;
        if !actual.eq_ignore_ascii_case(expected) {
            return Err(Error::HashMismatch {
                installation: self.key.to_string(),
                expected: expected.to_string(),
                actual: actual.to_string(),
            });
        }
    }

    Ok(())
}
/// Return the Python version recorded in this download's installation key.
pub fn python_version(&self) -> PythonVersion {
    let key = &self.key;
    key.version()
}
@ -905,11 +1050,36 @@ impl Display for ManagedPythonDownload {
}
}
/// The kind of transfer a progress report refers to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    /// Bytes are being downloaded.
    Download,
    /// Bytes are being extracted.
    Extract,
}

impl Direction {
    /// Return the lowercase label for this direction.
    ///
    /// The labels are string literals, so the result is `'static` and is not tied to the
    /// borrow of `self`.
    fn as_str(&self) -> &'static str {
        match self {
            Direction::Download => "download",
            Direction::Extract => "extract",
        }
    }
}

impl Display for Direction {
    /// Write the same lowercase label that [`Direction::as_str`] returns.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
/// Progress callbacks invoked while a managed Python archive is downloaded or extracted.
///
/// NOTE(review): this listing appears to interleave the `on_progress` / `on_download_*`
/// methods with the `on_request_*` methods that take a [`Direction`]; the trait in the actual
/// file may contain only one of the two sets — confirm against the file itself.
pub trait Reporter: Send + Sync {
/// Called with the installation key and an id previously returned by `on_download_start`.
/// NOTE(review): the exact meaning of this event is not visible here — confirm.
fn on_progress(&self, name: &PythonInstallationKey, id: usize);
/// Called before a download with its installation key and, if known, its size; the returned
/// id is passed back to the other callbacks.
fn on_download_start(&self, name: &PythonInstallationKey, size: Option<u64>) -> usize;
/// Called from the progress reader with the id and a byte count for the read that completed.
fn on_download_progress(&self, id: usize, inc: u64);
/// NOTE(review): no call site is visible here; presumably signals the end of a download.
fn on_download_complete(&self);
/// Called before a download or extraction starts, with its direction, installation key and,
/// if known, its size. The returned id identifies the request in the callbacks below.
fn on_request_start(
&self,
direction: Direction,
name: &PythonInstallationKey,
size: Option<u64>,
) -> usize;
/// Called from the progress reader with the request id and a byte count for the read that
/// completed.
fn on_request_progress(&self, id: usize, inc: u64);
/// Called after the download or extraction identified by `id` has finished successfully.
fn on_request_complete(&self, direction: Direction, id: usize);
}
/// An asynchronous reader that reports progress as bytes are read.
@ -943,7 +1113,7 @@ where
.poll_read(cx, buf)
.map_ok(|()| {
self.reporter
.on_download_progress(self.index, buf.filled().len() as u64);
.on_request_progress(self.index, buf.filled().len() as u64);
})
}
}
@ -951,7 +1121,7 @@ where
/// Convert a [`Url`] into an [`AsyncRead`] stream.
async fn read_url(
url: &Url,
client: &uv_client::BaseClient,
client: &BaseClient,
) -> Result<(impl AsyncRead + Unpin, Option<u64>), Error> {
if url.scheme() == "file" {
// Loads downloaded distribution from the given `file://` URL.