Mirror of https://github.com/astral-sh/uv.git, synced 2025-07-07 13:25:00 +00:00

## Summary

Adds a regular-expression-based version filter to the Python mirror script.

## Test Plan

Manually, using `uv run ./scripts/create-python-mirror.py --name cpython --arch x86_64 --os linux --version "3.13.\d+$"`.
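For context on what that `--version` pattern matches: the script builds a `major.minor.patch` string (appending any prerelease after a hyphen) and applies the pattern with `re.Pattern.match`, which anchors it at the start of the version. A minimal sketch of the matching behavior, with illustrative version strings:

```python
import re

# Same rule as match_version() in the script below: re.Pattern.match
# anchors the pattern at the start of the version string.
pattern = re.compile(r"3.13.\d+$")

print(bool(pattern.match("3.13.2")))      # True
print(bool(pattern.match("3.12.8")))      # False: wrong minor version
print(bool(pattern.match("3.13.0-rc1")))  # False: "$" rejects the prerelease suffix

# Note: the unescaped "." matches any character, not just a literal dot;
# escape it (r"3\.13\.\d+$") for a strict match.
```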
scripts/create-python-mirror.py · 292 lines · 9.3 KiB · Python
"""Create a mirror of Python distributions for use with uv.
|
|
|
|
Example usage:
|
|
uv run ./scripts/create-python-mirror.py --name cpython --arch x86_64 --os linux
|
|
"""
|
|
|
|
# /// script
# requires-python = ">=3.8"
# dependencies = [
#     "gitpython",
#     "httpx",
#     "tqdm",
# ]
# ///

import argparse
import asyncio
import hashlib
import json
import logging
import re
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import unquote

import httpx
from git import GitCommandError, Repo
from tqdm import tqdm

SELF_DIR = Path(__file__).parent
REPO_ROOT = SELF_DIR.parent
VERSIONS_FILE = REPO_ROOT / "crates" / "uv-python" / "download-metadata.json"
PREFIXES = [
    "https://github.com/astral-sh/python-build-standalone/releases/download/",
    "https://downloads.python.org/pypy/",
]

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)


def sanitize_url(url: str) -> Path:
    """Remove the prefix from the URL, decode it, and convert it to a relative path."""
    for prefix in PREFIXES:
        if url.startswith(prefix):
            return Path(unquote(url[len(prefix) :]))  # Decode the URL path
    return Path(unquote(url))  # Fall back to the full decoded path if no prefix matched


def sha256_checksum(file_path: Path) -> str:
    """Calculate the SHA-256 checksum of a file."""
    hasher = hashlib.sha256()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            hasher.update(chunk)
    return hasher.hexdigest()


def collect_metadata_from_git_history() -> List[Dict]:
    """Collect all metadata entries from the history of the VERSIONS_FILE."""
    metadata = []
    try:
        repo = Repo(REPO_ROOT, search_parent_directories=True)

        for commit in repo.iter_commits(paths=VERSIONS_FILE):
            try:
                # Ensure the file exists in the commit tree
                blob = commit.tree / str(VERSIONS_FILE.relative_to(REPO_ROOT))
                content = blob.data_stream.read().decode()
                data = json.loads(content)
                metadata.extend(data.values())
            except KeyError:
                logger.warning(
                    f"File {VERSIONS_FILE} not found in commit {commit.hexsha}. Skipping."
                )
            except json.JSONDecodeError as e:
                logger.error(f"Error decoding JSON in commit {commit.hexsha}: {e}")

    except GitCommandError as e:
        logger.error(f"Git command error: {e}")
    except Exception as e:
        logger.exception(f"Unexpected error while collecting metadata: {e}")

    return metadata


def check_arch(entry, arch):
    """Check whether the arch entry in the metadata matches the provided filter."""
    if isinstance(entry, str):
        return entry == arch
    elif isinstance(entry, dict) and "family" in entry:
        return entry["family"] == arch
    return False


def match_version(entry, pattern):
    """Check whether the pattern matches the entry's version."""
    vers = f"{entry['major']}.{entry['minor']}.{entry['patch']}"
    if entry["prerelease"] != "":
        vers += f"-{entry['prerelease']}"
    return pattern.match(vers) is not None


def filter_metadata(
    metadata: List[Dict],
    name: Optional[str],
    arch: Optional[str],
    os: Optional[str],
    version: Optional[re.Pattern],
) -> List[Dict]:
    """Filter the metadata based on name, architecture, OS, and version, ensuring unique URLs."""
    filtered = [
        entry
        for entry in metadata
        if (not name or entry["name"] == name)
        and (not arch or check_arch(entry["arch"], arch))
        and (not os or entry["os"] == os)
        and (not version or match_version(entry, version))
    ]
    # Use a set to ensure unique URLs
    unique_urls = set()
    unique_filtered = []
    for entry in filtered:
        if entry["url"] not in unique_urls:
            unique_urls.add(entry["url"])
            unique_filtered.append(entry)
    return unique_filtered


async def download_file(
    client: httpx.AsyncClient,
    url: str,
    dest: Path,
    expected_sha256: Optional[str],
    progress_bar,
    errors,
):
    """Download a file and verify its SHA-256 checksum if provided."""
    if dest.exists() and expected_sha256 and sha256_checksum(dest) == expected_sha256:
        logger.debug(
            f"File {dest} already exists and SHA-256 matches. Skipping download."
        )
        progress_bar.update(1)
        return True  # Success, even though skipped
    elif dest.exists() and expected_sha256 is None:
        logger.debug(
            f"File {dest} already exists and no SHA-256 was provided. Skipping download."
        )
        progress_bar.update(1)
        return True  # Success, even though skipped

    if not any(url.startswith(prefix) for prefix in PREFIXES):
        error_msg = f"No valid prefix found for {url}. Skipping."
        logger.warning(error_msg)
        errors.append((url, error_msg))
        progress_bar.update(1)
        return False

    dest.parent.mkdir(parents=True, exist_ok=True)
    logger.debug(f"Downloading {url} to {dest}")

    try:
        async with client.stream("GET", url) as response:
            response.raise_for_status()
            with open(dest, "wb") as f:
                async for chunk in response.aiter_bytes():
                    f.write(chunk)

        if expected_sha256 and sha256_checksum(dest) != expected_sha256:
            error_msg = f"SHA-256 mismatch for {dest}. Deleting corrupted file."
            logger.error(error_msg)
            dest.unlink()
            errors.append((url, "Checksum mismatch"))
            progress_bar.update(1)
            return False

    except Exception as e:
        error_msg = f"Failed to download {url}: {str(e)}"
        logger.error(error_msg)
        errors.append((url, str(e)))
        progress_bar.update(1)
        return False

    progress_bar.update(1)
    return True


async def download_files(
    urls: Set[Tuple[str, Optional[str]]], target: Path, max_concurrent: int
):
    """Download files with a limit on concurrent downloads using httpx."""
    async with httpx.AsyncClient(follow_redirects=True) as client:
        progress_bar = tqdm(total=len(urls), desc="Downloading", unit="file")
        sem = asyncio.Semaphore(max_concurrent)
        errors: List[Tuple[str, str]] = []  # To collect errors
        success_count = 0  # Track number of successful downloads

        async def sem_download(url, sha256):
            nonlocal success_count
            async with sem:
                success = await download_file(
                    client,
                    url,
                    target / sanitize_url(url),
                    sha256,
                    progress_bar,
                    errors,
                )
                if success:
                    success_count += 1

        tasks = [sem_download(url, sha256) for url, sha256 in urls]
        await asyncio.gather(*tasks)
        progress_bar.close()

    return success_count, errors


def parse_arguments():
    """Parse command-line arguments using argparse."""
    parser = argparse.ArgumentParser(description="Download and mirror Python builds.")
    parser.add_argument("--name", help="Filter by name (e.g., 'cpython').")
    parser.add_argument("--arch", help="Filter by architecture (e.g., 'aarch64').")
    parser.add_argument("--os", help="Filter by operating system (e.g., 'darwin').")
    parser.add_argument(
        "--version", help="Filter version by regex (e.g., '3.13.\\d+$')."
    )
    parser.add_argument(
        "--max-concurrent",
        type=int,
        default=20,
        help="Maximum number of simultaneous downloads.",
    )
    parser.add_argument(
        "--from-all-history",
        action="store_true",
        help="Collect URLs from the entire git history.",
    )
    parser.add_argument(
        "--target",
        default=SELF_DIR / "mirror",
        help="Directory to store the downloaded files.",
    )
    return parser.parse_args()


def main():
    """Main function to run the CLI."""
    args = parse_arguments()

    if args.from_all_history:
        metadata = collect_metadata_from_git_history()
    else:
        with open(VERSIONS_FILE) as f:
            metadata = list(json.load(f).values())

    version = re.compile(args.version) if args.version else None
    filtered_metadata = filter_metadata(
        metadata, args.name, args.arch, args.os, version
    )
    urls = {(entry["url"], entry["sha256"]) for entry in filtered_metadata}

    if not urls:
        logger.error("No URLs found.")
        return

    target = Path(args.target)
    logger.info(f"Downloading {len(urls)} files to {target}...")
    try:
        success_count, errors = asyncio.run(
            download_files(urls, target, args.max_concurrent)
        )
        print(f"Successfully downloaded: {success_count} files.")
        if errors:
            print("Failed downloads:")
            for url, error in errors:
                print(f"- {url}: {error}")
        print(
            f"Example usage: `UV_PYTHON_INSTALL_MIRROR='file://{target.absolute()}' uv python install 3.13`"
        )
    except Exception as e:
        logger.error(f"Error during download: {e}")


if __name__ == "__main__":
    main()
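For a sense of the on-disk layout the mirror ends up with, here is a minimal sketch of the URL-to-path mapping that `sanitize_url` performs; the release URL below is hypothetical, for illustration only:

```python
from pathlib import Path
from urllib.parse import unquote

PREFIX = "https://github.com/astral-sh/python-build-standalone/releases/download/"

# Hypothetical release URL, for illustration only.
url = PREFIX + "20250702/cpython-3.13.5%2B20250702-x86_64-unknown-linux-gnu-install_only.tar.gz"

# Strip the known prefix and percent-decode, as sanitize_url() does.
relative = Path(unquote(url[len(PREFIX):]))
print(relative)
# -> 20250702/cpython-3.13.5+20250702-x86_64-unknown-linux-gnu-install_only.tar.gz
```

On success the script prints the matching `UV_PYTHON_INSTALL_MIRROR='file://...'` incantation, which makes uv resolve Python downloads against this directory layout instead of the default release hosts.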