diff --git a/crates/uv-cache/src/lib.rs b/crates/uv-cache/src/lib.rs
index dc8bea8c6..817e8db86 100644
--- a/crates/uv-cache/src/lib.rs
+++ b/crates/uv-cache/src/lib.rs
@@ -7,11 +7,10 @@ use std::str::FromStr;
 use std::sync::Arc;
 
 use rustc_hash::FxHashMap;
-use tracing::debug;
+use tracing::{debug, warn};
 
-pub use archive::ArchiveId;
 use uv_cache_info::Timestamp;
-use uv_fs::{LockedFile, cachedir, directories};
+use uv_fs::{LockedFile, Simplified, cachedir, directories};
 use uv_normalize::PackageName;
 use uv_pypi_types::ResolutionMetadata;
 
@@ -22,6 +21,7 @@ use crate::removal::Remover;
 pub use crate::removal::{Removal, rm_rf};
 pub use crate::wheel::WheelCache;
 use crate::wheel::WheelCacheKind;
+pub use archive::ArchiveId;
 
 mod archive;
 mod by_timestamp;
@@ -135,6 +135,8 @@ impl Deref for CacheShard {
 }
 
 /// The main cache abstraction.
+///
+/// While the cache is active, it holds a read (shared) lock that prevents cache cleaning from removing entries in use.
 #[derive(Debug, Clone)]
 pub struct Cache {
     /// The cache directory.
@@ -146,6 +148,9 @@ pub struct Cache {
     /// Included to ensure that the temporary directory exists for the length of the operation, but
     /// is dropped at the end as appropriate.
     temp_dir: Option<Arc<tempfile::TempDir>>,
+    /// Ensure that `uv cache` operations don't remove items from the cache that are used by another
+    /// uv process.
+    lock_file: Option<Arc<LockedFile>>,
 }
 
 impl Cache {
@@ -155,6 +160,7 @@
             root: root.into(),
             refresh: Refresh::None(Timestamp::now()),
             temp_dir: None,
+            lock_file: None,
         }
     }
 
@@ -165,6 +171,7 @@
             root: temp_dir.path().to_path_buf(),
             refresh: Refresh::None(Timestamp::now()),
             temp_dir: Some(Arc::new(temp_dir)),
+            lock_file: None,
         })
     }
 
@@ -174,6 +181,34 @@
         Self { refresh, ..self }
     }
 
+    /// Acquire a lock that allows removing entries from the cache.
+    pub fn with_exclusive_lock(self) -> Result<Self, io::Error> {
+        let Self {
+            root,
+            refresh,
+            temp_dir,
+            lock_file,
+        } = self;
+
+        // Release the existing lock to avoid deadlocks from a cloned cache.
+        if let Some(lock_file) = lock_file {
+            drop(
+                Arc::try_unwrap(lock_file).expect(
+                    "cloning the cache before acquiring an exclusive lock causes a deadlock",
+                ),
+            );
+        }
+        let lock_file =
+            LockedFile::acquire_blocking(root.join(".lock"), root.simplified_display())?;
+
+        Ok(Self {
+            root,
+            refresh,
+            temp_dir,
+            lock_file: Some(Arc::new(lock_file)),
+        })
+    }
+
     /// Return the root of the cache.
     pub fn root(&self) -> &Path {
         &self.root
@@ -359,15 +394,43 @@
                 .join(".git"),
         )?;
 
+        // Block cache removal operations from interfering.
+        let lock_file = match LockedFile::acquire_shared_blocking(
+            root.join(".lock"),
+            root.simplified_display(),
+        ) {
+            Ok(lock_file) => Some(Arc::new(lock_file)),
+            Err(err) if err.kind() == io::ErrorKind::Unsupported => {
+                warn!(
+                    "Shared locking is not supported by the current platform or filesystem; \
+                    parallel process safety with `uv cache clean` and `uv cache prune` is reduced."
+                );
+                None
+            }
+            Err(err) => return Err(err),
+        };
+
         Ok(Self {
             root: std::path::absolute(root)?,
+            lock_file,
             ..self
         })
     }
 
     /// Clear the cache, removing all entries.
-    pub fn clear(&self, reporter: Box<dyn CleanReporter>) -> Result<Removal, io::Error> {
-        Remover::new(reporter).rm_rf(&self.root)
+    pub fn clear(self, reporter: Box<dyn CleanReporter>) -> Result<Removal, io::Error> {
+        // Remove everything except `.lock`, which needs special handling for locked files on Windows.
+        let mut removal = Remover::new(reporter).rm_rf(&self.root, true)?;
+        let Self {
+            root, lock_file, ..
+        } = self;
+        // Unlock `.lock` before removing it.
+        drop(lock_file);
+        fs_err::remove_file(root.join(".lock"))?;
+        removal.num_files += 1;
+        fs_err::remove_dir(root)?;
+        removal.num_dirs += 1;
+        Ok(removal)
     }
 
     /// Remove a package from the cache.
@@ -407,6 +470,7 @@ impl Cache {
             if entry.file_name() == "CACHEDIR.TAG"
                 || entry.file_name() == ".gitignore"
                 || entry.file_name() == ".git"
+                || entry.file_name() == ".lock"
             {
                 continue;
             }
diff --git a/crates/uv-cache/src/removal.rs b/crates/uv-cache/src/removal.rs
index d37004dcc..5521da5c8 100644
--- a/crates/uv-cache/src/removal.rs
+++ b/crates/uv-cache/src/removal.rs
@@ -10,7 +10,7 @@ use crate::CleanReporter;
 /// Remove a file or directory and all its contents, returning a [`Removal`] with
 /// the number of files and directories removed, along with a total byte count.
 pub fn rm_rf(path: impl AsRef<Path>) -> io::Result<Removal> {
-    Remover::default().rm_rf(path)
+    Remover::default().rm_rf(path, false)
 }
 
 /// A builder for a [`Remover`] that can remove files and directories.
@@ -29,9 +29,13 @@ impl Remover {
     /// Remove a file or directory and all its contents, returning a [`Removal`] with
     /// the number of files and directories removed, along with a total byte count.
-    pub(crate) fn rm_rf(&self, path: impl AsRef<Path>) -> io::Result<Removal> {
+    pub(crate) fn rm_rf(
+        &self,
+        path: impl AsRef<Path>,
+        skip_locked_file: bool,
+    ) -> io::Result<Removal> {
         let mut removal = Removal::default();
-        removal.rm_rf(path.as_ref(), self.reporter.as_deref())?;
+        removal.rm_rf(path.as_ref(), self.reporter.as_deref(), skip_locked_file)?;
         Ok(removal)
     }
 }
@@ -52,7 +56,12 @@ pub struct Removal {
 impl Removal {
     /// Recursively remove a file or directory and all its contents.
-    fn rm_rf(&mut self, path: &Path, reporter: Option<&dyn CleanReporter>) -> io::Result<()> {
+    fn rm_rf(
+        &mut self,
+        path: &Path,
+        reporter: Option<&dyn CleanReporter>,
+        skip_locked_file: bool,
+    ) -> io::Result<()> {
         let metadata = match fs_err::symlink_metadata(path) {
             Ok(metadata) => metadata,
             Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(()),
@@ -100,13 +109,25 @@ impl Removal {
                     if set_readable(dir).unwrap_or(false) {
                         // Retry the operation; if we _just_ `self.rm_rf(dir)` and continue,
                         // `walkdir` may give us duplicate entries for the directory.
-                        return self.rm_rf(path, reporter);
+                        return self.rm_rf(path, reporter, skip_locked_file);
                     }
                 }
             }
         }
 
         let entry = entry?;
+
+        // Skip the top-level `.lock` file; the exclusive lock is removed last, by the caller.
+        if skip_locked_file
+            && entry.file_name() == ".lock"
+            && entry
+                .path()
+                .strip_prefix(path)
+                .is_ok_and(|suffix| suffix == Path::new(".lock"))
+        {
+            continue;
+        }
+
         if entry.file_type().is_symlink() && {
             #[cfg(windows)]
             {
@@ -121,6 +142,11 @@
             self.num_files += 1;
             remove_dir(entry.path())?;
         } else if entry.file_type().is_dir() {
+            // Skip the directory holding the exclusive lock; it is removed last.
+            if skip_locked_file && entry.path() == path {
+                continue;
+            }
+
             self.num_dirs += 1;
 
             // The contents should have been removed by now, but sometimes a race condition is
diff --git a/crates/uv-fs/src/lib.rs b/crates/uv-fs/src/lib.rs
index e8cfc4d8a..ad319b9f2 100644
--- a/crates/uv-fs/src/lib.rs
+++ b/crates/uv-fs/src/lib.rs
@@ -693,9 +693,50 @@ impl LockedFile {
         }
     }
 
-    /// The same as [`LockedFile::acquire`], but for synchronous contexts. Do not use from an async
-    /// context, as this can block the runtime while waiting for another process to release the
-    /// lock.
+    /// Inner implementation for [`LockedFile::acquire_shared_blocking`] and
+    /// [`LockedFile::acquire_shared`].
+    fn lock_file_shared_blocking(
+        file: fs_err::File,
+        resource: &str,
+    ) -> Result<Self, std::io::Error> {
+        trace!(
+            "Checking shared lock for `{resource}` at `{}`",
+            file.path().user_display()
+        );
+        // TODO(konsti): Update fs_err to support this.
+        match FileExt::try_lock_shared(file.file()) {
+            Ok(()) => {
+                debug!("Acquired shared lock for `{resource}`");
+                Ok(Self(file))
+            }
+            Err(err) => {
+                // Log the error code and enum kind to help debugging more exotic failures.
+                if err.kind() != std::io::ErrorKind::WouldBlock {
+                    debug!("Try lock error: {err:?}");
+                }
+                info!(
+                    "Waiting to acquire shared lock for `{resource}` at `{}`",
+                    file.path().user_display(),
+                );
+                FileExt::lock_shared(file.file()).map_err(|err| {
+                    // Not an fs_err method; we need to build our own path context.
+                    std::io::Error::other(format!(
+                        "Could not acquire shared lock for `{resource}` at `{}`: {}",
+                        file.path().user_display(),
+                        err
+                    ))
+                })?;
+
+                debug!("Acquired shared lock for `{resource}`");
+                Ok(Self(file))
+            }
+        }
+    }
+
+    /// The same as [`LockedFile::acquire`], but for synchronous contexts.
+    ///
+    /// Do not use from an async context, as this can block the runtime while waiting for another
+    /// process to release the lock.
     pub fn acquire_blocking(
         path: impl AsRef<Path>,
         resource: impl Display,
     ) -> Result<Self, std::io::Error> {
@@ -705,6 +746,19 @@ impl LockedFile {
         Self::lock_file_blocking(file, &resource)
     }
 
+    /// The same as [`LockedFile::acquire_shared`], but for synchronous contexts.
+    ///
+    /// Do not use from an async context, as this can block the runtime while waiting for another
+    /// process to release the lock.
+    pub fn acquire_shared_blocking(
+        path: impl AsRef<Path>,
+        resource: impl Display,
+    ) -> Result<Self, std::io::Error> {
+        let file = Self::create(path)?;
+        let resource = resource.to_string();
+        Self::lock_file_shared_blocking(file, &resource)
+    }
+
     /// Acquire a cross-process lock for a resource using a file at the provided path.
     #[cfg(feature = "tokio")]
     pub async fn acquire(
@@ -716,6 +770,18 @@ impl LockedFile {
         tokio::task::spawn_blocking(move || Self::lock_file_blocking(file, &resource)).await?
     }
 
+    /// Acquire a cross-process read lock for a shared resource using a file at the provided path.
+    #[cfg(feature = "tokio")]
+    pub async fn acquire_shared(
+        path: impl AsRef<Path>,
+        resource: impl Display,
+    ) -> Result<Self, std::io::Error> {
+        let file = Self::create(path)?;
+        let resource = resource.to_string();
+        tokio::task::spawn_blocking(move || Self::lock_file_shared_blocking(file, &resource))
+            .await?
+    }
+
     #[cfg(unix)]
     fn create(path: impl AsRef<Path>) -> Result<fs_err::File, std::io::Error> {
         use std::os::unix::fs::PermissionsExt;
diff --git a/crates/uv/src/commands/cache_clean.rs b/crates/uv/src/commands/cache_clean.rs
index ef61db57a..040aa809a 100644
--- a/crates/uv/src/commands/cache_clean.rs
+++ b/crates/uv/src/commands/cache_clean.rs
@@ -14,7 +14,7 @@ use crate::printer::Printer;
 /// Clear the cache, removing all entries or those linked to specific packages.
 pub(crate) fn cache_clean(
     packages: &[PackageName],
-    cache: &Cache,
+    cache: Cache,
     printer: Printer,
 ) -> Result<ExitStatus> {
     if !cache.root().exists() {
         writeln!(
@@ -25,6 +25,7 @@ pub(crate) fn cache_clean(
         )?;
         return Ok(ExitStatus::Success);
     }
+    let cache = cache.with_exclusive_lock()?;
 
     let summary = if packages.is_empty() {
         writeln!(
@@ -36,9 +37,10 @@ pub(crate) fn cache_clean(
 
         let num_paths = walkdir::WalkDir::new(cache.root()).into_iter().count();
         let reporter = CleaningDirectoryReporter::new(printer, num_paths);
+        let root = cache.root().to_path_buf();
         cache
             .clear(Box::new(reporter))
-            .with_context(|| format!("Failed to clear cache at: {}", cache.root().user_display()))?
+            .with_context(|| format!("Failed to clear cache at: {}", root.user_display()))?
     } else {
         let reporter = CleaningPackageReporter::new(printer, packages.len());
         let mut summary = Removal::default();
diff --git a/crates/uv/src/commands/cache_prune.rs b/crates/uv/src/commands/cache_prune.rs
index ba2241169..e5aa69336 100644
--- a/crates/uv/src/commands/cache_prune.rs
+++ b/crates/uv/src/commands/cache_prune.rs
@@ -10,7 +10,7 @@ use crate::commands::{ExitStatus, human_readable_bytes};
 use crate::printer::Printer;
 
 /// Prune all unreachable objects from the cache.
-pub(crate) fn cache_prune(ci: bool, cache: &Cache, printer: Printer) -> Result<ExitStatus> {
+pub(crate) fn cache_prune(ci: bool, cache: Cache, printer: Printer) -> Result<ExitStatus> {
     if !cache.root().exists() {
         writeln!(
             printer.stderr(),
@@ -19,6 +19,7 @@ pub(crate) fn cache_prune(ci: bool, cache: &Cache, printer: Printer) -> Result<ExitStatus> {
+    let cache = cache.with_exclusive_lock()?;
             Ok(handle),
             Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
diff --git a/crates/uv/src/lib.rs b/crates/uv/src/lib.rs
index 756198cf1..724a7a30a 100644
--- a/crates/uv/src/lib.rs
+++ b/crates/uv/src/lib.rs
@@ -1010,13 +1010,13 @@
         })
         | Commands::Clean(args) => {
             show_settings!(args);
-            commands::cache_clean(&args.package, &cache, printer)
+            commands::cache_clean(&args.package, cache, printer)
         }
         Commands::Cache(CacheNamespace {
             command: CacheCommand::Prune(args),
         }) => {
             show_settings!(args);
-            commands::cache_prune(args.ci, &cache, printer)
+            commands::cache_prune(args.ci, cache, printer)
         }
         Commands::Cache(CacheNamespace {
             command: CacheCommand::Dir,
@@ -1836,7 +1836,7 @@
             globals.python_downloads,
             globals.installer_metadata,
             globals.concurrency,
-            &cache,
+            cache,
             printer,
             args.env_file,
             globals.preview,
diff --git a/crates/uv/tests/it/cache_clean.rs b/crates/uv/tests/it/cache_clean.rs
index 06fd8f215..574be3c4f 100644
--- a/crates/uv/tests/it/cache_clean.rs
+++ b/crates/uv/tests/it/cache_clean.rs
@@ -19,16 +19,18 @@ fn clean_all() -> Result<()> {
         .assert()
         .success();
 
-    uv_snapshot!(context.with_filtered_counts().filters(), context.clean().arg("--verbose"), @r###"
+    uv_snapshot!(context.with_filtered_counts().filters(), context.clean().arg("--verbose"), @r"
    success: true
    exit_code: 0
    ----- stdout -----

    ----- stderr -----
    DEBUG uv [VERSION] ([COMMIT] DATE)
+   DEBUG Acquired lock for `[CACHE_DIR]/`
    Clearing cache at: [CACHE_DIR]/
+   DEBUG Released lock at `[CACHE_DIR]/.lock`
    Removed [N] files ([SIZE])
-    "###);
+    ");
 
     Ok(())
 }
@@ -73,16 +75,18 @@ fn clean_package_pypi() -> Result<()> {
     ])
     .collect();
 
-    uv_snapshot!(&filters, context.clean().arg("--verbose").arg("iniconfig"), @r###"
+    uv_snapshot!(&filters, context.clean().arg("--verbose").arg("iniconfig"), @r"
    success: true
    exit_code: 0
    ----- stdout -----

    ----- stderr -----
    DEBUG uv [VERSION] ([COMMIT] DATE)
+   DEBUG Acquired lock for `[CACHE_DIR]/`
    DEBUG Removing dangling cache entry: [CACHE_DIR]/archive-v0/[ENTRY]
    Removed [N] files ([SIZE])
-    "###);
+   DEBUG Released lock at `[CACHE_DIR]/.lock`
+    ");
 
     // Assert that the `.rkyv` file is removed for `iniconfig`.
     assert!(
     );
 
     // Running `uv cache prune` should have no effect.
-    uv_snapshot!(&filters, context.prune().arg("--verbose"), @r###"
+    uv_snapshot!(&filters, context.prune().arg("--verbose"), @r"
    success: true
    exit_code: 0
    ----- stdout -----

    ----- stderr -----
    DEBUG uv [VERSION] ([COMMIT] DATE)
+   DEBUG Acquired lock for `[CACHE_DIR]/`
    Pruning cache at: [CACHE_DIR]/
    No unused entries found
-    "###);
+   DEBUG Released lock at `[CACHE_DIR]/.lock`
+    ");
 
     Ok(())
 }
@@ -148,16 +154,18 @@ fn clean_package_index() -> Result<()> {
     ])
     .collect();
 
-    uv_snapshot!(&filters, context.clean().arg("--verbose").arg("iniconfig"), @r###"
+    uv_snapshot!(&filters, context.clean().arg("--verbose").arg("iniconfig"), @r"
    success: true
    exit_code: 0
    ----- stdout -----

    ----- stderr -----
    DEBUG uv [VERSION] ([COMMIT] DATE)
+   DEBUG Acquired lock for `[CACHE_DIR]/`
    DEBUG Removing dangling cache entry: [CACHE_DIR]/archive-v0/[ENTRY]
    Removed [N] files ([SIZE])
-    "###);
+   DEBUG Released lock at `[CACHE_DIR]/.lock`
+    ");
 
     // Assert that the `.rkyv` file is removed for `iniconfig`.
     assert!(
diff --git a/crates/uv/tests/it/cache_prune.rs b/crates/uv/tests/it/cache_prune.rs
index 99493fe21..b9a34d865 100644
--- a/crates/uv/tests/it/cache_prune.rs
+++ b/crates/uv/tests/it/cache_prune.rs
@@ -29,16 +29,18 @@ fn prune_no_op() -> Result<()> {
         .chain(std::iter::once((r"Removed \d+ files", "Removed [N] files")))
         .collect();
 
-    uv_snapshot!(&filters, context.prune().arg("--verbose"), @r###"
+    uv_snapshot!(&filters, context.prune().arg("--verbose"), @r"
    success: true
    exit_code: 0
    ----- stdout -----

    ----- stderr -----
    DEBUG uv [VERSION] ([COMMIT] DATE)
+   DEBUG Acquired lock for `[CACHE_DIR]/`
    Pruning cache at: [CACHE_DIR]/
    No unused entries found
-    "###);
+   DEBUG Released lock at `[CACHE_DIR]/.lock`
+    ");
 
     Ok(())
 }
@@ -68,17 +70,19 @@ fn prune_stale_directory() -> Result<()> {
         .chain(std::iter::once((r"Removed \d+ files", "Removed [N] files")))
         .collect();
 
-    uv_snapshot!(&filters, context.prune().arg("--verbose"), @r###"
+    uv_snapshot!(&filters, context.prune().arg("--verbose"), @r"
    success: true
    exit_code: 0
    ----- stdout -----

    ----- stderr -----
    DEBUG uv [VERSION] ([COMMIT] DATE)
+   DEBUG Acquired lock for `[CACHE_DIR]/`
    Pruning cache at: [CACHE_DIR]/
    DEBUG Removing dangling cache bucket: [CACHE_DIR]/simple-v4
    Removed 1 directory
-    "###);
+   DEBUG Released lock at `[CACHE_DIR]/.lock`
+    ");
 
     Ok(())
 }
@@ -128,18 +132,20 @@ fn prune_cached_env() {
     ])
     .collect();
 
-    uv_snapshot!(filters, context.prune().arg("--verbose"), @r###"
+    uv_snapshot!(filters, context.prune().arg("--verbose"), @r"
    success: true
    exit_code: 0
    ----- stdout -----

    ----- stderr -----
    DEBUG uv [VERSION] ([COMMIT] DATE)
+   DEBUG Acquired lock for `[CACHE_DIR]/`
    Pruning cache at: [CACHE_DIR]/
    DEBUG Removing dangling cache environment: [CACHE_DIR]/environments-v2/[ENTRY]
    DEBUG Removing dangling cache archive: [CACHE_DIR]/archive-v0/[ENTRY]
    Removed [N] files ([SIZE])
-    "###);
+   DEBUG Released lock at `[CACHE_DIR]/.lock`
+    ");
 }
 
 /// `cache prune` should remove any stale symlink from the cache.
@@ -173,17 +179,19 @@ fn prune_stale_symlink() -> Result<()> {
     ])
     .collect();
 
-    uv_snapshot!(filters, context.prune().arg("--verbose"), @r###"
+    uv_snapshot!(filters, context.prune().arg("--verbose"), @r"
    success: true
    exit_code: 0
    ----- stdout -----

    ----- stderr -----
    DEBUG uv [VERSION] ([COMMIT] DATE)
+   DEBUG Acquired lock for `[CACHE_DIR]/`
    Pruning cache at: [CACHE_DIR]/
    DEBUG Removing dangling cache archive: [CACHE_DIR]/archive-v0/[ENTRY]
    Removed 44 files ([SIZE])
-    "###);
+   DEBUG Released lock at `[CACHE_DIR]/.lock`
+    ");
 
     Ok(())
 }
@@ -340,18 +348,20 @@ fn prune_stale_revision() -> Result<()> {
         .collect();
 
     // Pruning should remove the unused revision.
-    uv_snapshot!(&filters, context.prune().arg("--verbose"), @r###"
+    uv_snapshot!(&filters, context.prune().arg("--verbose"), @r"
    success: true
    exit_code: 0
    ----- stdout -----

    ----- stderr -----
    DEBUG uv [VERSION] ([COMMIT] DATE)
+   DEBUG Acquired lock for `[CACHE_DIR]/`
    Pruning cache at: [CACHE_DIR]/
    DEBUG Removing dangling source revision: [CACHE_DIR]/sdists-v9/[ENTRY]
    DEBUG Removing dangling cache archive: [CACHE_DIR]/archive-v0/[ENTRY]
    Removed [N] files ([SIZE])
-    "###);
+   DEBUG Released lock at `[CACHE_DIR]/.lock`
+    ");
 
     // Uninstall and reinstall the package. We should use the cached version.
     uv_snapshot!(&filters, context
diff --git a/crates/uv/tests/it/lock.rs b/crates/uv/tests/it/lock.rs
index a45956619..4991617c0 100644
--- a/crates/uv/tests/it/lock.rs
+++ b/crates/uv/tests/it/lock.rs
@@ -17704,6 +17704,7 @@ fn lock_explicit_default_index() -> Result<()> {
     ----- stderr -----
     DEBUG uv [VERSION] ([COMMIT] DATE)
+    DEBUG Acquired shared lock for `[CACHE_DIR]/`
     DEBUG Found workspace root: `[TEMP_DIR]/`
     DEBUG Adding root workspace member: `[TEMP_DIR]/`
     DEBUG No Python version file found in workspace: [TEMP_DIR]/
@@ -17728,6 +17729,7 @@ fn lock_explicit_default_index() -> Result<()> {
     DEBUG No compatible version found for: project
       × No solution found when resolving dependencies:
       ╰─▶ Because anyio was not found in the package registry and your project depends on anyio, we can conclude that your project's requirements are unsatisfiable.
+    DEBUG Released lock at `[CACHE_DIR]/.lock`
     "#);
 
     let lock = fs_err::read_to_string(context.temp_dir.join("uv.lock")).unwrap();
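
Reviewer note (not part of the patch): below is a minimal, standalone sketch of the reader/writer file-locking scheme the diff builds on, using the `fs2` crate, which provides the same `FileExt::lock_shared`/`lock_exclusive` calls the new `lock_file_shared_blocking` uses. The wrapper type names, the demo path, and the `fs2 = "0.4"` dependency are illustrative assumptions; in uv the real entry points are `Cache::init` (shared lock on `<cache>/.lock`) and `Cache::with_exclusive_lock` (exclusive lock for `uv cache clean`/`uv cache prune`), as shown in the diff above.

```rust
// Readers (normal commands) hold a shared lock on `<cache>/.lock` for the lifetime
// of the cache handle; cleaning takes an exclusive lock so it cannot race with readers.
// Assumes `fs2 = "0.4"` in Cargo.toml (illustrative, not uv's actual wiring).
use std::fs::{File, OpenOptions};
use std::io;
use std::path::Path;

use fs2::FileExt;

/// Held by normal commands; any number of processes may hold it at once.
struct SharedCacheLock(File);

/// Held by cache-cleaning commands; excludes all shared holders.
struct ExclusiveCacheLock(File);

fn open_lock_file(root: &Path) -> io::Result<File> {
    OpenOptions::new().create(true).write(true).open(root.join(".lock"))
}

fn acquire_shared(root: &Path) -> io::Result<SharedCacheLock> {
    let file = open_lock_file(root)?;
    // Blocks while another process holds the exclusive lock.
    file.lock_shared()?;
    Ok(SharedCacheLock(file))
}

fn acquire_exclusive(root: &Path) -> io::Result<ExclusiveCacheLock> {
    let file = open_lock_file(root)?;
    // Blocks until every shared holder has released its lock.
    file.lock_exclusive()?;
    Ok(ExclusiveCacheLock(file))
}

fn main() -> io::Result<()> {
    let root = Path::new("uv-cache-lock-demo");
    std::fs::create_dir_all(root)?;

    // A resolver or installer would hold this while it reads the cache.
    let reader = acquire_shared(root)?;

    // A cleaner must drop its own shared lock before upgrading, mirroring
    // `Cache::with_exclusive_lock` above; holding both at once would deadlock.
    drop(reader);
    let _cleaner = acquire_exclusive(root)?;

    // ... remove cache entries here; the lock is released when the handle drops.
    Ok(())
}
```

The `Unsupported` fallback in `Cache::init` keeps this best-effort: on platforms or filesystems without shared-lock support, uv proceeds without the read lock rather than failing, at the cost of the reduced protection against concurrent `uv cache clean`/`uv cache prune` that the warning describes.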