Parallelize wheel installations with Rayon (#84)

It looks like using _either_ async Rust with a `JoinSet` _or_
parallelizing a fixed threadpool with Rayon provide about a ~5% speed-up
over our current serial approach:

```console
❯ hyperfine --runs 30 --warmup 5 --prepare "./target/release/puffin venv .venv" \
  "./target/release/rayon sync ./scripts/benchmarks/requirements-large.txt" \
  "./target/release/async sync ./scripts/benchmarks/requirements-large.txt" \
  "./target/release/main sync ./scripts/benchmarks/requirements-large.txt"
Benchmark 1: ./target/release/rayon sync ./scripts/benchmarks/requirements-large.txt
  Time (mean ± σ):     295.7 ms ±  16.9 ms    [User: 28.6 ms, System: 263.3 ms]
  Range (min … max):   249.2 ms … 315.9 ms    30 runs

Benchmark 2: ./target/release/async sync ./scripts/benchmarks/requirements-large.txt
  Time (mean ± σ):     296.2 ms ±  20.2 ms    [User: 36.1 ms, System: 340.1 ms]
  Range (min … max):   258.0 ms … 359.4 ms    30 runs

Benchmark 3: ./target/release/main sync ./scripts/benchmarks/requirements-large.txt
  Time (mean ± σ):     306.6 ms ±  19.5 ms    [User: 25.3 ms, System: 220.5 ms]
  Range (min … max):   269.6 ms … 332.2 ms    30 runs

Summary
  './target/release/rayon sync ./scripts/benchmarks/requirements-large.txt' ran
    1.00 ± 0.09 times faster than './target/release/async sync ./scripts/benchmarks/requirements-large.txt'
    1.04 ± 0.09 times faster than './target/release/main sync ./scripts/benchmarks/requirements-large.txt'
```

It's much easier to just parallelize with Rayon and avoid async in the
underlying wheel code, so this PR takes that approach for now.
This commit is contained in:
Charlie Marsh 2023-10-10 23:46:30 -04:00 committed by GitHub
parent ed68d31e03
commit 85162d1111
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 65 additions and 44 deletions

View file

@ -73,7 +73,7 @@ impl AsRef<Path> for LockedDir {
/// We use a lockfile to prevent multiple instance writing stuff on the same time
/// As of pip 22.0, e.g. `pip install numpy; pip install numpy; pip install numpy` will
/// non-deterministically fail.
pub struct InstallLocation<T: AsRef<Path>> {
pub struct InstallLocation<T> {
/// absolute path
venv_base: T,
python_version: (u8, u8),

View file

@ -10,7 +10,7 @@ use fs_err::File;
use mailparse::MailHeaderMap;
use tracing::{debug, span, Level};
use crate::install_location::{InstallLocation, LockedDir};
use crate::install_location::InstallLocation;
use crate::wheel::{
extra_dist_info, install_data, parse_wheel_version, read_scripts_from_section,
write_script_entrypoints,
@ -24,7 +24,10 @@ use crate::{read_record_file, Error, Script};
/// <https://packaging.python.org/en/latest/specifications/binary-distribution-format/#installing-a-wheel-distribution-1-0-py32-none-any-whl>
///
/// Wheel 1.0: <https://www.python.org/dev/peps/pep-0427/>
pub fn install_wheel(location: &InstallLocation<LockedDir>, wheel: &Path) -> Result<(), Error> {
pub fn install_wheel(
location: &InstallLocation<impl AsRef<Path>>,
wheel: impl AsRef<Path>,
) -> Result<(), Error> {
let base_location = location.venv_base();
// TODO(charlie): Pass this in.
@ -43,8 +46,8 @@ pub fn install_wheel(location: &InstallLocation<LockedDir>, wheel: &Path) -> Res
.join("site-packages")
};
let dist_info_prefix = find_dist_info(wheel)?;
let (name, _version) = read_metadata(&dist_info_prefix, wheel)?;
let dist_info_prefix = find_dist_info(&wheel)?;
let (name, _version) = read_metadata(&dist_info_prefix, &wheel)?;
let _my_span = span!(Level::DEBUG, "install_wheel", name);
@ -52,7 +55,9 @@ pub fn install_wheel(location: &InstallLocation<LockedDir>, wheel: &Path) -> Res
// https://packaging.python.org/en/latest/specifications/binary-distribution-format/#installing-a-wheel-distribution-1-0-py32-none-any-whl
// > 1.a Parse distribution-1.0.dist-info/WHEEL.
// > 1.b Check that installer is compatible with Wheel-Version. Warn if minor version is greater, abort if major version is greater.
let wheel_file_path = wheel.join(format!("{dist_info_prefix}.dist-info/WHEEL"));
let wheel_file_path = wheel
.as_ref()
.join(format!("{dist_info_prefix}.dist-info/WHEEL"));
let wheel_text = std::fs::read_to_string(&wheel_file_path)?;
parse_wheel_version(&wheel_text)?;
@ -60,15 +65,19 @@ pub fn install_wheel(location: &InstallLocation<LockedDir>, wheel: &Path) -> Res
// > 1.d Else unpack archive into platlib (site-packages).
// We always install in the same virtualenv site packages
debug!(name, "Extracting file");
let num_unpacked = unpack_wheel_files(&site_packages, wheel)?;
let num_unpacked = unpack_wheel_files(&site_packages, &wheel)?;
debug!(name, "Extracted {num_unpacked} files");
// Read the RECORD file.
let mut record_file = File::open(&wheel.join(format!("{dist_info_prefix}.dist-info/RECORD")))?;
let mut record_file = File::open(
wheel
.as_ref()
.join(format!("{dist_info_prefix}.dist-info/RECORD")),
)?;
let mut record = read_record_file(&mut record_file)?;
debug!(name, "Writing entrypoints");
let (console_scripts, gui_scripts) = parse_scripts(wheel, &dist_info_prefix, None)?;
let (console_scripts, gui_scripts) = parse_scripts(&wheel, &dist_info_prefix, None)?;
write_script_entrypoints(&site_packages, location, &console_scripts, &mut record)?;
write_script_entrypoints(&site_packages, location, &gui_scripts, &mut record)?;
@ -117,7 +126,7 @@ pub fn install_wheel(location: &InstallLocation<LockedDir>, wheel: &Path) -> Res
/// Either way, we just search the wheel for the name
///
/// <https://github.com/PyO3/python-pkginfo-rs>
fn find_dist_info(path: &Path) -> Result<String, Error> {
fn find_dist_info(path: impl AsRef<Path>) -> Result<String, Error> {
// Iterate over `path` to find the `.dist-info` directory. It should be at the top-level.
let Some(dist_info) = std::fs::read_dir(path)?.find_map(|entry| {
let entry = entry.ok()?;
@ -147,8 +156,13 @@ fn find_dist_info(path: &Path) -> Result<String, Error> {
}
/// <https://github.com/PyO3/python-pkginfo-rs>
fn read_metadata(dist_info_prefix: &str, wheel: &Path) -> Result<(String, String), Error> {
let metadata_file = wheel.join(format!("{dist_info_prefix}.dist-info/METADATA"));
fn read_metadata(
dist_info_prefix: &str,
wheel: impl AsRef<Path>,
) -> Result<(String, String), Error> {
let metadata_file = wheel
.as_ref()
.join(format!("{dist_info_prefix}.dist-info/METADATA"));
// Read into a buffer.
let mut content = Vec::new();
@ -197,11 +211,13 @@ fn read_metadata(dist_info_prefix: &str, wheel: &Path) -> Result<(String, String
///
/// Extras are supposed to be ignored, which happens if you pass None for extras
fn parse_scripts(
wheel: &Path,
wheel: impl AsRef<Path>,
dist_info_prefix: &str,
extras: Option<&[String]>,
) -> Result<(Vec<Script>, Vec<Script>), Error> {
let entry_points_path = wheel.join(format!("{dist_info_prefix}.dist-info/entry_points.txt"));
let entry_points_path = wheel
.as_ref()
.join(format!("{dist_info_prefix}.dist-info/entry_points.txt"));
// Read the entry points mapping. If the file doesn't exist, we just return an empty mapping.
let Ok(ini) = std::fs::read_to_string(entry_points_path) else {
@ -229,7 +245,10 @@ fn parse_scripts(
/// Extract all files from the wheel into the site packages.
#[cfg(any(target_os = "macos", target_os = "ios"))]
fn unpack_wheel_files(site_packages: &Path, wheel: &Path) -> Result<usize, Error> {
fn unpack_wheel_files(
site_packages: impl AsRef<Path>,
wheel: impl AsRef<Path>,
) -> Result<usize, Error> {
use crate::reflink::reflink;
let mut count = 0usize;
@ -237,10 +256,12 @@ fn unpack_wheel_files(site_packages: &Path, wheel: &Path) -> Result<usize, Error
// On macOS, directly can be recursively copied with a single `clonefile` call.
// So we only need to iterate over the top-level of the directory, and copy each file or
// subdirectory.
for entry in std::fs::read_dir(wheel)? {
for entry in std::fs::read_dir(&wheel)? {
let entry = entry?;
let from = entry.path();
let to = site_packages.join(from.strip_prefix(wheel).unwrap());
let to = site_packages
.as_ref()
.join(from.strip_prefix(&wheel).unwrap());
// Delete the destination if it already exists.
if let Ok(metadata) = to.metadata() {
@ -262,14 +283,17 @@ fn unpack_wheel_files(site_packages: &Path, wheel: &Path) -> Result<usize, Error
/// Extract all files from the wheel into the site packages
#[cfg(not(any(target_os = "macos", target_os = "ios")))]
fn unpack_wheel_files(site_packages: &Path, wheel: &Path) -> Result<usize, Error> {
fn unpack_wheel_files(
site_packages: impl AsRef<Path>,
wheel: impl AsRef<Path>,
) -> Result<usize, Error> {
let mut count = 0usize;
// Walk over the directory.
for entry in walkdir::WalkDir::new(wheel) {
for entry in walkdir::WalkDir::new(&wheel) {
let entry = entry?;
let relative = entry.path().strip_prefix(wheel).unwrap();
let out_path = site_packages.join(relative);
let relative = entry.path().strip_prefix(&wheel).unwrap();
let out_path = site_packages.as_ref().join(relative);
if entry.file_type().is_dir() {
fs::create_dir_all(&out_path)?;

View file

@ -261,8 +261,8 @@ fn unpack_wheel_files<R: Read + Seek>(
Ok(extracted_paths)
}
pub(crate) fn get_shebang(location: &InstallLocation<LockedDir>) -> String {
let path = location.python().display().to_string();
pub(crate) fn get_shebang(location: &InstallLocation<impl AsRef<Path>>) -> String {
let path = location.python().to_string_lossy().to_string();
let path = if cfg!(windows) {
// https://stackoverflow.com/a/50323079
const VERBATIM_PREFIX: &str = r"\\?\";
@ -326,7 +326,7 @@ pub(crate) fn windows_script_launcher(launcher_python_script: &str) -> Result<Ve
/// TODO: Test for this launcher directly in install-wheel-rs
pub(crate) fn write_script_entrypoints(
site_packages: &Path,
location: &InstallLocation<LockedDir>,
location: &InstallLocation<impl AsRef<Path>>,
entrypoints: &[Script],
record: &mut Vec<RecordEntry>,
) -> Result<(), Error> {
@ -649,7 +649,7 @@ fn install_script(
site_packages: &Path,
record: &mut [RecordEntry],
file: &DirEntry,
location: &InstallLocation<LockedDir>,
location: &InstallLocation<impl AsRef<Path>>,
) -> Result<(), Error> {
let path = file.path();
if !path.is_file() {
@ -726,7 +726,7 @@ pub(crate) fn install_data(
site_packages: &Path,
data_dir: &Path,
dist_name: &str,
location: &InstallLocation<LockedDir>,
location: &InstallLocation<impl AsRef<Path>>,
console_scripts: &[Script],
gui_scripts: &[Script],
record: &mut [RecordEntry],

View file

@ -1,4 +1,5 @@
use anyhow::Result;
use anyhow::{Error, Result};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use pep440_rs::Version;
use puffin_interpreter::PythonExecutable;
@ -31,26 +32,22 @@ impl<'a> Installer<'a> {
/// Install a set of wheels into a Python virtual environment.
pub fn install(self, wheels: &[LocalDistribution]) -> Result<()> {
// Install each wheel.
let location = install_wheel_rs::InstallLocation::new(
self.python.venv().to_path_buf(),
self.python.simple_version(),
);
let locked_dir = location.acquire_lock()?;
tokio::task::block_in_place(|| {
wheels.par_iter().try_for_each(|wheel| {
let location = install_wheel_rs::InstallLocation::new(
self.python.venv().to_path_buf(),
self.python.simple_version(),
);
for wheel in wheels {
install_wheel_rs::unpacked::install_wheel(&locked_dir, wheel.path())?;
install_wheel_rs::unpacked::install_wheel(&location, wheel.path())?;
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_install_progress(wheel.name(), wheel.version());
}
}
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_install_progress(wheel.name(), wheel.version());
}
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_install_complete();
}
Ok(())
Ok::<(), Error>(())
})
})
}
}