// Copyright 2018-2025 the Deno authors. MIT license. pub use inner::LaxSingleProcessFsFlag; pub use inner::LaxSingleProcessFsFlagSys; #[cfg(not(target_arch = "wasm32"))] mod inner { use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use parking_lot::Mutex; use sys_traits::FsFileLock; use sys_traits::FsMetadataValue; use crate::Reporter; #[sys_traits::auto_impl] pub trait LaxSingleProcessFsFlagSys: sys_traits::FsOpen + sys_traits::FsMetadata + sys_traits::FsRemoveFile + sys_traits::FsWrite + sys_traits::ThreadSleep + sys_traits::SystemTimeNow + Clone + Send + Sync + 'static { } struct PollFile { sys: TSys, file_path: PathBuf, count: usize, } impl Drop for PollFile { fn drop(&mut self) { // cleanup the poll file so the node_modules folder is more // deterministic and so it doesn't end up in `deno compile` _ = self.sys.fs_remove_file(&self.file_path); } } impl PollFile { pub fn new(sys: TSys, file_path: PathBuf) -> Self { Self { sys, file_path, count: 0, } } pub fn touch(&mut self) { self.count += 1; _ = self.sys.fs_write(&self.file_path, self.count.to_string()); } } struct LaxSingleProcessFsFlagInner { file_path: PathBuf, fs_file: TSys::File, poll_file: Arc>>>, } impl Drop for LaxSingleProcessFsFlagInner { fn drop(&mut self) { // kill the poll thread and clean up the poll file self.poll_file.lock().take(); // release the file lock if let Err(err) = self.fs_file.fs_file_unlock() { log::debug!( "Failed releasing lock for {}. {:#}", self.file_path.display(), err ); } } } /// A file system based flag that will attempt to synchronize multiple /// processes so they go one after the other. In scenarios where /// synchronization cannot be achieved, it will allow the current process /// to proceed. /// /// This should only be used in places where it's ideal for multiple /// processes to not update something on the file system at the same time, /// but it's not that big of a deal. pub struct LaxSingleProcessFsFlag( #[allow(dead_code)] Option>, ); impl LaxSingleProcessFsFlag { pub async fn lock( sys: &TSys, file_path: PathBuf, reporter: &impl Reporter, long_wait_message: &str, ) -> Self { log::debug!("Acquiring file lock at {}", file_path.display()); let last_updated_path = file_path.with_extension("lock.poll"); let start_instant = std::time::Instant::now(); let mut open_options = sys_traits::OpenOptions::new(); open_options.create = true; open_options.read = true; open_options.write = true; let open_result = sys.fs_open(&file_path, &open_options); match open_result { Ok(mut fs_file) => { let mut pb_update_guard = None; let mut error_count = 0; while error_count < 10 { let lock_result = fs_file.fs_file_try_lock(sys_traits::FsFileLockMode::Exclusive); let poll_file_update_ms = 100; match lock_result { Ok(_) => { log::debug!("Acquired file lock at {}", file_path.display()); let mut poll_file = PollFile::new(sys.clone(), last_updated_path); poll_file.touch(); let poll_file = Arc::new(Mutex::new(Some(poll_file))); // Spawn a blocking task that will continually update a file // signalling the lock is alive. This is a fail safe for when // a file lock is never released. For example, on some operating // systems, if a process does not release the lock (say it's // killed), then the OS may release it at an indeterminate time // // This uses a blocking task because we use a single threaded // runtime and this is time sensitive so we don't want it to update // at the whims of whatever is occurring on the runtime thread. let sys = sys.clone(); deno_unsync::spawn_blocking({ let poll_file = poll_file.clone(); move || loop { sys .thread_sleep(Duration::from_millis(poll_file_update_ms)); match &mut *poll_file.lock() { Some(poll_file) => poll_file.touch(), None => return, } } }); return Self(Some(LaxSingleProcessFsFlagInner { file_path, fs_file, poll_file, })); } Err(_) => { // show a message if it's been a while if pb_update_guard.is_none() && start_instant.elapsed().as_millis() > 1_000 { let guard = reporter.on_blocking(long_wait_message); pb_update_guard = Some(guard); } // sleep for a little bit tokio::time::sleep(Duration::from_millis(20)).await; // Poll the last updated path to check if it's stopped updating, // which is an indication that the file lock is claimed, but // was never properly released. match sys .fs_metadata(&last_updated_path) .and_then(|p| p.modified()) { Ok(last_updated_time) => { let current_time = sys.sys_time_now(); match current_time.duration_since(last_updated_time) { Ok(duration) => { if duration.as_millis() > (poll_file_update_ms * 2) as u128 { // the other process hasn't updated this file in a long time // so maybe it was killed and the operating system hasn't // released the file lock yet return Self(None); } else { error_count = 0; // reset } } Err(_) => { error_count += 1; } } } Err(_) => { error_count += 1; } } } } } drop(pb_update_guard); // explicit for clarity Self(None) } Err(err) => { log::debug!( "Failed to open file lock at {}. {:#}", file_path.display(), err ); Self(None) // let the process through } } } } } #[cfg(target_arch = "wasm32")] mod inner { use std::marker::PhantomData; use std::path::PathBuf; use crate::Reporter; // Don't bother locking the folder when installing via Wasm for now. // In the future, what we'd need is a way to spawn a thread (worker) // and have it reliably do the update of the .poll file #[sys_traits::auto_impl] pub trait LaxSingleProcessFsFlagSys: Clone + Send + Sync + 'static {} pub struct LaxSingleProcessFsFlag { _data: PhantomData, } impl LaxSingleProcessFsFlag { pub async fn lock( _sys: &TSys, _file_path: PathBuf, _reporter: &impl Reporter, _long_wait_message: &str, ) -> Self { Self { _data: Default::default(), } } } } #[allow(clippy::disallowed_methods)] #[cfg(all(test, not(target_arch = "wasm32")))] mod test { use std::sync::Arc; use std::time::Duration; use parking_lot::Mutex; use test_util::TempDir; use tokio::sync::Notify; use super::*; use crate::LogReporter; #[tokio::test] async fn lax_fs_lock_basic() { let temp_dir = TempDir::new(); let lock_path = temp_dir.path().join("file.lock"); let signal1 = Arc::new(Notify::new()); let signal2 = Arc::new(Notify::new()); let signal3 = Arc::new(Notify::new()); let signal4 = Arc::new(Notify::new()); tokio::spawn({ let lock_path = lock_path.clone(); let signal1 = signal1.clone(); let signal2 = signal2.clone(); let signal3 = signal3.clone(); let signal4 = signal4.clone(); let temp_dir = temp_dir.clone(); async move { let flag = LaxSingleProcessFsFlag::lock( &sys_traits::impls::RealSys, lock_path.to_path_buf(), &LogReporter, "waiting", ) .await; signal1.notify_one(); signal2.notified().await; tokio::time::sleep(Duration::from_millis(10)).await; // give the other thread time to acquire the lock temp_dir.write("file.txt", "update1"); signal3.notify_one(); signal4.notified().await; drop(flag); } }); let signal5 = Arc::new(Notify::new()); tokio::spawn({ let lock_path = lock_path.clone(); let temp_dir = temp_dir.clone(); let signal5 = signal5.clone(); async move { signal1.notified().await; signal2.notify_one(); let flag = LaxSingleProcessFsFlag::lock( &sys_traits::impls::RealSys, lock_path.to_path_buf(), &LogReporter, "waiting", ) .await; temp_dir.write("file.txt", "update2"); signal5.notify_one(); drop(flag); } }); signal3.notified().await; assert_eq!(temp_dir.read_to_string("file.txt"), "update1"); signal4.notify_one(); signal5.notified().await; assert_eq!(temp_dir.read_to_string("file.txt"), "update2"); // ensure this is cleaned up assert!(!lock_path.with_extension("lock.poll").exists()) } #[tokio::test] async fn lax_fs_lock_ordered() { let temp_dir = TempDir::new(); let lock_path = temp_dir.path().join("file.lock"); let output_path = temp_dir.path().join("output"); let expected_order = Arc::new(Mutex::new(Vec::new())); let count = 10; let mut tasks = Vec::with_capacity(count); std::fs::write(&output_path, "").unwrap(); for i in 0..count { let lock_path = lock_path.clone(); let output_path = output_path.clone(); let expected_order = expected_order.clone(); tasks.push(tokio::spawn(async move { let flag = LaxSingleProcessFsFlag::lock( &sys_traits::impls::RealSys, lock_path.to_path_buf(), &LogReporter, "waiting", ) .await; expected_order.lock().push(i.to_string()); // be extremely racy let mut output = std::fs::read_to_string(&output_path).unwrap(); if !output.is_empty() { output.push('\n'); } output.push_str(&i.to_string()); std::fs::write(&output_path, output).unwrap(); drop(flag); })); } futures::future::join_all(tasks).await; let expected_output = expected_order.lock().join("\n"); assert_eq!( std::fs::read_to_string(output_path).unwrap(), expected_output ); } }