diff --git a/crates/install-wheel-rs/src/linker.rs b/crates/install-wheel-rs/src/linker.rs index c943483cd..9ba0019e2 100644 --- a/crates/install-wheel-rs/src/linker.rs +++ b/crates/install-wheel-rs/src/linker.rs @@ -1,13 +1,15 @@ //! Like `wheel.rs`, but for installing wheels that have already been unzipped, rather than //! reading from a zip file. -use std::path::Path; +use std::path::{Path, PathBuf}; use std::str::FromStr; +use std::sync::{Arc, Mutex}; use std::time::SystemTime; use fs_err as fs; use fs_err::{DirEntry, File}; use reflink_copy as reflink; +use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; use tempfile::tempdir_in; use tracing::{debug, instrument}; @@ -25,6 +27,9 @@ use crate::wheel::{ }; use crate::{Error, Layout}; +#[derive(Debug, Default)] +pub struct Locks(Mutex>>>); + /// Install the given wheel to the given venv /// /// The caller must ensure that the wheel is compatible to the environment. @@ -40,6 +45,7 @@ pub fn install_wheel( direct_url: Option<&DirectUrl>, installer: Option<&str>, link_mode: LinkMode, + locks: &Locks, ) -> Result<(), Error> { let dist_info_prefix = find_dist_info(&wheel)?; let metadata = dist_info_metadata(&dist_info_prefix, &wheel)?; @@ -75,7 +81,7 @@ pub fn install_wheel( LibKind::Pure => &layout.scheme.purelib, LibKind::Plat => &layout.scheme.platlib, }; - let num_unpacked = link_mode.link_wheel_files(site_packages, &wheel)?; + let num_unpacked = link_mode.link_wheel_files(site_packages, &wheel, locks)?; debug!(name, "Extracted {num_unpacked} files"); // Read the RECORD file. @@ -240,11 +246,12 @@ impl LinkMode { self, site_packages: impl AsRef, wheel: impl AsRef, + locks: &Locks, ) -> Result { match self { - Self::Clone => clone_wheel_files(site_packages, wheel), - Self::Copy => copy_wheel_files(site_packages, wheel), - Self::Hardlink => hardlink_wheel_files(site_packages, wheel), + Self::Clone => clone_wheel_files(site_packages, wheel, locks), + Self::Copy => copy_wheel_files(site_packages, wheel, locks), + Self::Hardlink => hardlink_wheel_files(site_packages, wheel, locks), } } } @@ -257,6 +264,7 @@ impl LinkMode { fn clone_wheel_files( site_packages: impl AsRef, wheel: impl AsRef, + locks: &Locks, ) -> Result { let mut count = 0usize; let mut attempt = Attempt::default(); @@ -269,6 +277,7 @@ fn clone_wheel_files( clone_recursive( site_packages.as_ref(), wheel.as_ref(), + locks, &entry?, &mut attempt, )?; @@ -319,6 +328,7 @@ enum Attempt { fn clone_recursive( site_packages: &Path, wheel: &Path, + locks: &Locks, entry: &DirEntry, attempt: &mut Attempt, ) -> Result<(), Error> { @@ -332,7 +342,7 @@ fn clone_recursive( // On Windows, reflinking directories is not supported, so we copy each file instead. fs::create_dir_all(&to)?; for entry in fs::read_dir(from)? { - clone_recursive(site_packages, wheel, &entry?, attempt)?; + clone_recursive(site_packages, wheel, locks, &entry?, attempt)?; } return Ok(()); } @@ -344,7 +354,7 @@ fn clone_recursive( // If cloning/copying fails and the directory exists already, it must be merged recursively. if entry.file_type()?.is_dir() { for entry in fs::read_dir(from)? { - clone_recursive(site_packages, wheel, &entry?, attempt)?; + clone_recursive(site_packages, wheel, locks, &entry?, attempt)?; } } else { // If file already exists, overwrite it. @@ -359,7 +369,7 @@ fn clone_recursive( tempfile.display(), ); *attempt = Attempt::UseCopyFallback; - fs::copy(&from, &to)?; + synchronized_copy(&from, &to, locks)?; } } } else { @@ -370,7 +380,7 @@ fn clone_recursive( ); // switch to copy fallback *attempt = Attempt::UseCopyFallback; - clone_recursive(site_packages, wheel, entry, attempt)?; + clone_recursive(site_packages, wheel, locks, entry, attempt)?; } } } @@ -380,7 +390,7 @@ fn clone_recursive( // If cloning/copying fails and the directory exists already, it must be merged recursively. if entry.file_type()?.is_dir() { for entry in fs::read_dir(from)? { - clone_recursive(site_packages, wheel, &entry?, attempt)?; + clone_recursive(site_packages, wheel, locks, &entry?, attempt)?; } } else { // If file already exists, overwrite it. @@ -398,10 +408,10 @@ fn clone_recursive( if entry.file_type()?.is_dir() { fs::create_dir_all(&to)?; for entry in fs::read_dir(from)? { - clone_recursive(site_packages, wheel, &entry?, attempt)?; + clone_recursive(site_packages, wheel, locks, &entry?, attempt)?; } } else { - fs::copy(&from, &to)?; + synchronized_copy(&from, &to, locks)?; } warn_user_once!("Failed to clone files; falling back to full copy. This may lead to degraded performance. If this is intentional, use `--link-mode=copy` to suppress this warning.\n\nhint: If the cache and target directories are on different filesystems, reflinking may not be supported."); } @@ -417,6 +427,7 @@ fn clone_recursive( fn copy_wheel_files( site_packages: impl AsRef, wheel: impl AsRef, + locks: &Locks, ) -> Result { let mut count = 0usize; @@ -433,8 +444,7 @@ fn copy_wheel_files( continue; } - // Copy the file, which will also set its permissions. - fs::copy(path, &out_path)?; + synchronized_copy(path, &out_path, locks)?; count += 1; } @@ -446,6 +456,7 @@ fn copy_wheel_files( fn hardlink_wheel_files( site_packages: impl AsRef, wheel: impl AsRef, + locks: &Locks, ) -> Result { let mut attempt = Attempt::default(); let mut count = 0usize; @@ -465,7 +476,7 @@ fn hardlink_wheel_files( // The `RECORD` file is modified during installation, so we copy it instead of hard-linking. if path.ends_with("RECORD") { - fs::copy(path, &out_path)?; + synchronized_copy(path, &out_path, locks)?; count += 1; continue; } @@ -493,7 +504,7 @@ fn hardlink_wheel_files( out_path.display(), path.display() ); - fs::copy(path, &out_path)?; + synchronized_copy(path, &out_path, locks)?; attempt = Attempt::UseCopyFallback; } } else { @@ -502,7 +513,7 @@ fn hardlink_wheel_files( out_path.display(), path.display() ); - fs::copy(path, &out_path)?; + synchronized_copy(path, &out_path, locks)?; attempt = Attempt::UseCopyFallback; } } @@ -526,7 +537,7 @@ fn hardlink_wheel_files( } } Attempt::UseCopyFallback => { - fs::copy(path, &out_path)?; + synchronized_copy(path, &out_path, locks)?; warn_user_once!("Failed to hardlink files; falling back to full copy. This may lead to degraded performance. If this is intentional, use `--link-mode=copy` to suppress this warning.\n\nhint: If the cache and target directories are on different filesystems, hardlinking may not be supported."); } } @@ -536,3 +547,26 @@ fn hardlink_wheel_files( Ok(count) } + +/// Copy from `from` to `to`, ensuring that the parent directory is locked. Avoids simultaneous +/// writes to the same file, which can lead to corruption. +/// +/// See: +fn synchronized_copy(from: &Path, to: &Path, locks: &Locks) -> std::io::Result<()> { + // Ensure we have a lock for the directory. + let dir_lock = { + let mut locks_guard = locks.0.lock().unwrap(); + locks_guard + .entry(to.parent().unwrap().to_path_buf()) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone() + }; + + // Acquire a lock on the directory. + let _dir_guard = dir_lock.lock().unwrap(); + + // Copy the file, which will also set its permissions. + fs::copy(from, to)?; + + Ok(()) +} diff --git a/crates/uv-installer/src/installer.rs b/crates/uv-installer/src/installer.rs index a39c5c7a6..008b8d325 100644 --- a/crates/uv-installer/src/installer.rs +++ b/crates/uv-installer/src/installer.rs @@ -96,6 +96,7 @@ fn install( link_mode: LinkMode, reporter: Option>, ) -> Result> { + let locks = install_wheel_rs::linker::Locks::default(); wheels.par_iter().try_for_each(|wheel| { install_wheel_rs::linker::install_wheel( &layout, @@ -109,6 +110,7 @@ fn install( .as_ref(), installer_name.as_deref(), link_mode, + &locks, ) .with_context(|| format!("Failed to install: {} ({wheel})", wheel.filename()))?;