Lock directories to synchronize wheel-install copies (#4978)

## Summary

Closes https://github.com/astral-sh/uv/issues/4831.
This commit is contained in:
Charlie Marsh 2024-07-11 17:53:20 -07:00 committed by GitHub
parent 22cca77329
commit 55b41d7d3d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 54 additions and 18 deletions

View file

@ -1,13 +1,15 @@
//! Like `wheel.rs`, but for installing wheels that have already been unzipped, rather than //! Like `wheel.rs`, but for installing wheels that have already been unzipped, rather than
//! reading from a zip file. //! reading from a zip file.
use std::path::Path; use std::path::{Path, PathBuf};
use std::str::FromStr; use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::SystemTime; use std::time::SystemTime;
use fs_err as fs; use fs_err as fs;
use fs_err::{DirEntry, File}; use fs_err::{DirEntry, File};
use reflink_copy as reflink; use reflink_copy as reflink;
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tempfile::tempdir_in; use tempfile::tempdir_in;
use tracing::{debug, instrument}; use tracing::{debug, instrument};
@ -25,6 +27,9 @@ use crate::wheel::{
}; };
use crate::{Error, Layout}; use crate::{Error, Layout};
#[derive(Debug, Default)]
pub struct Locks(Mutex<FxHashMap<PathBuf, Arc<Mutex<()>>>>);
/// Install the given wheel to the given venv /// Install the given wheel to the given venv
/// ///
/// The caller must ensure that the wheel is compatible to the environment. /// The caller must ensure that the wheel is compatible to the environment.
@ -40,6 +45,7 @@ pub fn install_wheel(
direct_url: Option<&DirectUrl>, direct_url: Option<&DirectUrl>,
installer: Option<&str>, installer: Option<&str>,
link_mode: LinkMode, link_mode: LinkMode,
locks: &Locks,
) -> Result<(), Error> { ) -> Result<(), Error> {
let dist_info_prefix = find_dist_info(&wheel)?; let dist_info_prefix = find_dist_info(&wheel)?;
let metadata = dist_info_metadata(&dist_info_prefix, &wheel)?; let metadata = dist_info_metadata(&dist_info_prefix, &wheel)?;
@ -75,7 +81,7 @@ pub fn install_wheel(
LibKind::Pure => &layout.scheme.purelib, LibKind::Pure => &layout.scheme.purelib,
LibKind::Plat => &layout.scheme.platlib, LibKind::Plat => &layout.scheme.platlib,
}; };
let num_unpacked = link_mode.link_wheel_files(site_packages, &wheel)?; let num_unpacked = link_mode.link_wheel_files(site_packages, &wheel, locks)?;
debug!(name, "Extracted {num_unpacked} files"); debug!(name, "Extracted {num_unpacked} files");
// Read the RECORD file. // Read the RECORD file.
@ -240,11 +246,12 @@ impl LinkMode {
self, self,
site_packages: impl AsRef<Path>, site_packages: impl AsRef<Path>,
wheel: impl AsRef<Path>, wheel: impl AsRef<Path>,
locks: &Locks,
) -> Result<usize, Error> { ) -> Result<usize, Error> {
match self { match self {
Self::Clone => clone_wheel_files(site_packages, wheel), Self::Clone => clone_wheel_files(site_packages, wheel, locks),
Self::Copy => copy_wheel_files(site_packages, wheel), Self::Copy => copy_wheel_files(site_packages, wheel, locks),
Self::Hardlink => hardlink_wheel_files(site_packages, wheel), Self::Hardlink => hardlink_wheel_files(site_packages, wheel, locks),
} }
} }
} }
@ -257,6 +264,7 @@ impl LinkMode {
fn clone_wheel_files( fn clone_wheel_files(
site_packages: impl AsRef<Path>, site_packages: impl AsRef<Path>,
wheel: impl AsRef<Path>, wheel: impl AsRef<Path>,
locks: &Locks,
) -> Result<usize, Error> { ) -> Result<usize, Error> {
let mut count = 0usize; let mut count = 0usize;
let mut attempt = Attempt::default(); let mut attempt = Attempt::default();
@ -269,6 +277,7 @@ fn clone_wheel_files(
clone_recursive( clone_recursive(
site_packages.as_ref(), site_packages.as_ref(),
wheel.as_ref(), wheel.as_ref(),
locks,
&entry?, &entry?,
&mut attempt, &mut attempt,
)?; )?;
@ -319,6 +328,7 @@ enum Attempt {
fn clone_recursive( fn clone_recursive(
site_packages: &Path, site_packages: &Path,
wheel: &Path, wheel: &Path,
locks: &Locks,
entry: &DirEntry, entry: &DirEntry,
attempt: &mut Attempt, attempt: &mut Attempt,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -332,7 +342,7 @@ fn clone_recursive(
// On Windows, reflinking directories is not supported, so we copy each file instead. // On Windows, reflinking directories is not supported, so we copy each file instead.
fs::create_dir_all(&to)?; fs::create_dir_all(&to)?;
for entry in fs::read_dir(from)? { for entry in fs::read_dir(from)? {
clone_recursive(site_packages, wheel, &entry?, attempt)?; clone_recursive(site_packages, wheel, locks, &entry?, attempt)?;
} }
return Ok(()); return Ok(());
} }
@ -344,7 +354,7 @@ fn clone_recursive(
// If cloning/copying fails and the directory exists already, it must be merged recursively. // If cloning/copying fails and the directory exists already, it must be merged recursively.
if entry.file_type()?.is_dir() { if entry.file_type()?.is_dir() {
for entry in fs::read_dir(from)? { for entry in fs::read_dir(from)? {
clone_recursive(site_packages, wheel, &entry?, attempt)?; clone_recursive(site_packages, wheel, locks, &entry?, attempt)?;
} }
} else { } else {
// If file already exists, overwrite it. // If file already exists, overwrite it.
@ -359,7 +369,7 @@ fn clone_recursive(
tempfile.display(), tempfile.display(),
); );
*attempt = Attempt::UseCopyFallback; *attempt = Attempt::UseCopyFallback;
fs::copy(&from, &to)?; synchronized_copy(&from, &to, locks)?;
} }
} }
} else { } else {
@ -370,7 +380,7 @@ fn clone_recursive(
); );
// switch to copy fallback // switch to copy fallback
*attempt = Attempt::UseCopyFallback; *attempt = Attempt::UseCopyFallback;
clone_recursive(site_packages, wheel, entry, attempt)?; clone_recursive(site_packages, wheel, locks, entry, attempt)?;
} }
} }
} }
@ -380,7 +390,7 @@ fn clone_recursive(
// If cloning/copying fails and the directory exists already, it must be merged recursively. // If cloning/copying fails and the directory exists already, it must be merged recursively.
if entry.file_type()?.is_dir() { if entry.file_type()?.is_dir() {
for entry in fs::read_dir(from)? { for entry in fs::read_dir(from)? {
clone_recursive(site_packages, wheel, &entry?, attempt)?; clone_recursive(site_packages, wheel, locks, &entry?, attempt)?;
} }
} else { } else {
// If file already exists, overwrite it. // If file already exists, overwrite it.
@ -398,10 +408,10 @@ fn clone_recursive(
if entry.file_type()?.is_dir() { if entry.file_type()?.is_dir() {
fs::create_dir_all(&to)?; fs::create_dir_all(&to)?;
for entry in fs::read_dir(from)? { for entry in fs::read_dir(from)? {
clone_recursive(site_packages, wheel, &entry?, attempt)?; clone_recursive(site_packages, wheel, locks, &entry?, attempt)?;
} }
} else { } else {
fs::copy(&from, &to)?; synchronized_copy(&from, &to, locks)?;
} }
warn_user_once!("Failed to clone files; falling back to full copy. This may lead to degraded performance. If this is intentional, use `--link-mode=copy` to suppress this warning.\n\nhint: If the cache and target directories are on different filesystems, reflinking may not be supported."); warn_user_once!("Failed to clone files; falling back to full copy. This may lead to degraded performance. If this is intentional, use `--link-mode=copy` to suppress this warning.\n\nhint: If the cache and target directories are on different filesystems, reflinking may not be supported.");
} }
@ -417,6 +427,7 @@ fn clone_recursive(
fn copy_wheel_files( fn copy_wheel_files(
site_packages: impl AsRef<Path>, site_packages: impl AsRef<Path>,
wheel: impl AsRef<Path>, wheel: impl AsRef<Path>,
locks: &Locks,
) -> Result<usize, Error> { ) -> Result<usize, Error> {
let mut count = 0usize; let mut count = 0usize;
@ -433,8 +444,7 @@ fn copy_wheel_files(
continue; continue;
} }
// Copy the file, which will also set its permissions. synchronized_copy(path, &out_path, locks)?;
fs::copy(path, &out_path)?;
count += 1; count += 1;
} }
@ -446,6 +456,7 @@ fn copy_wheel_files(
fn hardlink_wheel_files( fn hardlink_wheel_files(
site_packages: impl AsRef<Path>, site_packages: impl AsRef<Path>,
wheel: impl AsRef<Path>, wheel: impl AsRef<Path>,
locks: &Locks,
) -> Result<usize, Error> { ) -> Result<usize, Error> {
let mut attempt = Attempt::default(); let mut attempt = Attempt::default();
let mut count = 0usize; let mut count = 0usize;
@ -465,7 +476,7 @@ fn hardlink_wheel_files(
// The `RECORD` file is modified during installation, so we copy it instead of hard-linking. // The `RECORD` file is modified during installation, so we copy it instead of hard-linking.
if path.ends_with("RECORD") { if path.ends_with("RECORD") {
fs::copy(path, &out_path)?; synchronized_copy(path, &out_path, locks)?;
count += 1; count += 1;
continue; continue;
} }
@ -493,7 +504,7 @@ fn hardlink_wheel_files(
out_path.display(), out_path.display(),
path.display() path.display()
); );
fs::copy(path, &out_path)?; synchronized_copy(path, &out_path, locks)?;
attempt = Attempt::UseCopyFallback; attempt = Attempt::UseCopyFallback;
} }
} else { } else {
@ -502,7 +513,7 @@ fn hardlink_wheel_files(
out_path.display(), out_path.display(),
path.display() path.display()
); );
fs::copy(path, &out_path)?; synchronized_copy(path, &out_path, locks)?;
attempt = Attempt::UseCopyFallback; attempt = Attempt::UseCopyFallback;
} }
} }
@ -526,7 +537,7 @@ fn hardlink_wheel_files(
} }
} }
Attempt::UseCopyFallback => { Attempt::UseCopyFallback => {
fs::copy(path, &out_path)?; synchronized_copy(path, &out_path, locks)?;
warn_user_once!("Failed to hardlink files; falling back to full copy. This may lead to degraded performance. If this is intentional, use `--link-mode=copy` to suppress this warning.\n\nhint: If the cache and target directories are on different filesystems, hardlinking may not be supported."); warn_user_once!("Failed to hardlink files; falling back to full copy. This may lead to degraded performance. If this is intentional, use `--link-mode=copy` to suppress this warning.\n\nhint: If the cache and target directories are on different filesystems, hardlinking may not be supported.");
} }
} }
@ -536,3 +547,26 @@ fn hardlink_wheel_files(
Ok(count) Ok(count)
} }
/// Copy from `from` to `to`, ensuring that the parent directory is locked. Avoids simultaneous
/// writes to the same file, which can lead to corruption.
///
/// See: <https://github.com/astral-sh/uv/issues/4831>
fn synchronized_copy(from: &Path, to: &Path, locks: &Locks) -> std::io::Result<()> {
// Ensure we have a lock for the directory.
let dir_lock = {
let mut locks_guard = locks.0.lock().unwrap();
locks_guard
.entry(to.parent().unwrap().to_path_buf())
.or_insert_with(|| Arc::new(Mutex::new(())))
.clone()
};
// Acquire a lock on the directory.
let _dir_guard = dir_lock.lock().unwrap();
// Copy the file, which will also set its permissions.
fs::copy(from, to)?;
Ok(())
}

View file

@ -96,6 +96,7 @@ fn install(
link_mode: LinkMode, link_mode: LinkMode,
reporter: Option<Box<dyn Reporter>>, reporter: Option<Box<dyn Reporter>>,
) -> Result<Vec<CachedDist>> { ) -> Result<Vec<CachedDist>> {
let locks = install_wheel_rs::linker::Locks::default();
wheels.par_iter().try_for_each(|wheel| { wheels.par_iter().try_for_each(|wheel| {
install_wheel_rs::linker::install_wheel( install_wheel_rs::linker::install_wheel(
&layout, &layout,
@ -109,6 +110,7 @@ fn install(
.as_ref(), .as_ref(),
installer_name.as_deref(), installer_name.as_deref(),
link_mode, link_mode,
&locks,
) )
.with_context(|| format!("Failed to install: {} ({wheel})", wheel.filename()))?; .with_context(|| format!("Failed to install: {} ({wheel})", wheel.filename()))?;