diff --git a/.github/workflows/ci.generate.ts b/.github/workflows/ci.generate.ts index f28ade6223..8499d48210 100755 --- a/.github/workflows/ci.generate.ts +++ b/.github/workflows/ci.generate.ts @@ -1250,8 +1250,9 @@ const ci = { "cargo check --target wasm32-unknown-unknown -p deno_resolver && cargo check --target wasm32-unknown-unknown -p deno_resolver --features graph", }, { - name: "Cargo check (deno_npm_cache)", - run: "cargo check --target wasm32-unknown-unknown -p deno_npm_cache", + name: "Cargo check (deno_npm_installer)", + run: + "cargo check --target wasm32-unknown-unknown -p deno_npm_installer", }, ]), }, diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d33817f998..371a72075a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -808,8 +808,8 @@ jobs: - name: Cargo check (deno_resolver) run: cargo check --target wasm32-unknown-unknown -p deno_resolver && cargo check --target wasm32-unknown-unknown -p deno_resolver --features graph if: '!(matrix.skip)' - - name: Cargo check (deno_npm_cache) - run: cargo check --target wasm32-unknown-unknown -p deno_npm_cache + - name: Cargo check (deno_npm_installer) + run: cargo check --target wasm32-unknown-unknown -p deno_npm_installer if: '!(matrix.skip)' publish-canary: name: publish canary diff --git a/resolvers/npm_installer/extra_info.rs b/resolvers/npm_installer/extra_info.rs index ac4bbc4aa2..ec1fa7ed22 100644 --- a/resolvers/npm_installer/extra_info.rs +++ b/resolvers/npm_installer/extra_info.rs @@ -156,7 +156,7 @@ impl NpmPackageExtraInfoProvider { let package_json_path = package_path.join("package.json"); let sys = self.sys.clone(); let extra_info: NpmPackageExtraInfo = - deno_unsync::spawn_blocking(move || { + crate::rt::spawn_blocking(move || { let package_json = sys .base_fs_read(&package_json_path) .map_err(JsErrorBox::from_err)?; diff --git a/resolvers/npm_installer/flag.rs b/resolvers/npm_installer/flag.rs index 358798d7dd..2e6f0431c6 100644 --- a/resolvers/npm_installer/flag.rs +++ b/resolvers/npm_installer/flag.rs @@ -1,182 +1,220 @@ // Copyright 2018-2025 the Deno authors. MIT license. -use std::path::PathBuf; -use std::sync::Arc; -use std::time::Duration; +pub use inner::LaxSingleProcessFsFlag; +pub use inner::LaxSingleProcessFsFlagSys; -use deno_unsync::sync::AtomicFlag; -use sys_traits::FsFileLock; -use sys_traits::FsMetadataValue; +#[cfg(not(target_arch = "wasm32"))] +mod inner { + use std::path::PathBuf; + use std::sync::Arc; + use std::time::Duration; -use crate::Reporter; + use deno_unsync::sync::AtomicFlag; + use sys_traits::FsFileLock; + use sys_traits::FsMetadataValue; -#[sys_traits::auto_impl] -pub trait LaxSingleProcessFsFlagSys: - sys_traits::FsOpen - + sys_traits::FsMetadata - + sys_traits::FsWrite - + sys_traits::ThreadSleep - + sys_traits::SystemTimeNow - + Clone - + Send - + Sync - + 'static -{ -} + use crate::Reporter; -struct LaxSingleProcessFsFlagInner { - file_path: PathBuf, - fs_file: TSys::File, - finished_flag: Arc, -} + #[sys_traits::auto_impl] + pub trait LaxSingleProcessFsFlagSys: + sys_traits::FsOpen + + sys_traits::FsMetadata + + sys_traits::FsWrite + + sys_traits::ThreadSleep + + sys_traits::SystemTimeNow + + Clone + + Send + + Sync + + 'static + { + } -impl Drop - for LaxSingleProcessFsFlagInner -{ - fn drop(&mut self) { - // kill the poll thread - self.finished_flag.raise(); - // release the file lock - if let Err(err) = self.fs_file.fs_file_unlock() { - log::debug!( - "Failed releasing lock for {}. {:#}", - self.file_path.display(), - err - ); + struct LaxSingleProcessFsFlagInner { + file_path: PathBuf, + fs_file: TSys::File, + finished_flag: Arc, + } + + impl Drop + for LaxSingleProcessFsFlagInner + { + fn drop(&mut self) { + // kill the poll thread + self.finished_flag.raise(); + // release the file lock + if let Err(err) = self.fs_file.fs_file_unlock() { + log::debug!( + "Failed releasing lock for {}. {:#}", + self.file_path.display(), + err + ); + } } } -} -/// A file system based flag that will attempt to synchronize multiple -/// processes so they go one after the other. In scenarios where -/// synchronization cannot be achieved, it will allow the current process -/// to proceed. -/// -/// This should only be used in places where it's ideal for multiple -/// processes to not update something on the file system at the same time, -/// but it's not that big of a deal. -pub struct LaxSingleProcessFsFlag( - #[allow(dead_code)] Option>, -); + /// A file system based flag that will attempt to synchronize multiple + /// processes so they go one after the other. In scenarios where + /// synchronization cannot be achieved, it will allow the current process + /// to proceed. + /// + /// This should only be used in places where it's ideal for multiple + /// processes to not update something on the file system at the same time, + /// but it's not that big of a deal. + pub struct LaxSingleProcessFsFlag( + #[allow(dead_code)] Option>, + ); -impl LaxSingleProcessFsFlag { - pub async fn lock( - sys: &TSys, - file_path: PathBuf, - reporter: &impl Reporter, - long_wait_message: &str, - ) -> Self { - log::debug!("Acquiring file lock at {}", file_path.display()); - let last_updated_path = file_path.with_extension("lock.poll"); - let start_instant = std::time::Instant::now(); - let mut open_options = sys_traits::OpenOptions::new(); - open_options.create = true; - open_options.read = true; - open_options.write = true; - let open_result = sys.fs_open(&file_path, &open_options); + impl LaxSingleProcessFsFlag { + pub async fn lock( + sys: &TSys, + file_path: PathBuf, + reporter: &impl Reporter, + long_wait_message: &str, + ) -> Self { + log::debug!("Acquiring file lock at {}", file_path.display()); + let last_updated_path = file_path.with_extension("lock.poll"); + let start_instant = std::time::Instant::now(); + let mut open_options = sys_traits::OpenOptions::new(); + open_options.create = true; + open_options.read = true; + open_options.write = true; + let open_result = sys.fs_open(&file_path, &open_options); - match open_result { - Ok(mut fs_file) => { - let mut pb_update_guard = None; - let mut error_count = 0; - while error_count < 10 { - let lock_result = - fs_file.fs_file_try_lock(sys_traits::FsFileLockMode::Exclusive); - let poll_file_update_ms = 100; - match lock_result { - Ok(_) => { - log::debug!("Acquired file lock at {}", file_path.display()); - let _ignore = sys.fs_write(&last_updated_path, ""); - let finished_flag = Arc::new(AtomicFlag::lowered()); + match open_result { + Ok(mut fs_file) => { + let mut pb_update_guard = None; + let mut error_count = 0; + while error_count < 10 { + let lock_result = + fs_file.fs_file_try_lock(sys_traits::FsFileLockMode::Exclusive); + let poll_file_update_ms = 100; + match lock_result { + Ok(_) => { + log::debug!("Acquired file lock at {}", file_path.display()); + let _ignore = sys.fs_write(&last_updated_path, ""); + let finished_flag = Arc::new(AtomicFlag::lowered()); - // Spawn a blocking task that will continually update a file - // signalling the lock is alive. This is a fail safe for when - // a file lock is never released. For example, on some operating - // systems, if a process does not release the lock (say it's - // killed), then the OS may release it at an indeterminate time - // - // This uses a blocking task because we use a single threaded - // runtime and this is time sensitive so we don't want it to update - // at the whims of whatever is occurring on the runtime thread. - let sys = sys.clone(); - deno_unsync::spawn_blocking({ - let finished_flag = finished_flag.clone(); - let last_updated_path = last_updated_path.clone(); - move || { - let mut i = 0; - while !finished_flag.is_raised() { - i += 1; - let _ignore = - sys.fs_write(&last_updated_path, i.to_string()); - sys - .thread_sleep(Duration::from_millis(poll_file_update_ms)); + // Spawn a blocking task that will continually update a file + // signalling the lock is alive. This is a fail safe for when + // a file lock is never released. For example, on some operating + // systems, if a process does not release the lock (say it's + // killed), then the OS may release it at an indeterminate time + // + // This uses a blocking task because we use a single threaded + // runtime and this is time sensitive so we don't want it to update + // at the whims of whatever is occurring on the runtime thread. + let sys = sys.clone(); + deno_unsync::spawn_blocking({ + let finished_flag = finished_flag.clone(); + let last_updated_path = last_updated_path.clone(); + move || { + let mut i = 0; + while !finished_flag.is_raised() { + i += 1; + let _ignore = + sys.fs_write(&last_updated_path, i.to_string()); + sys.thread_sleep(Duration::from_millis( + poll_file_update_ms, + )); + } } - } - }); + }); - return Self(Some(LaxSingleProcessFsFlagInner { - file_path, - fs_file, - finished_flag, - })); - } - Err(_) => { - // show a message if it's been a while - if pb_update_guard.is_none() - && start_instant.elapsed().as_millis() > 1_000 - { - let guard = reporter.on_blocking(long_wait_message); - pb_update_guard = Some(guard); + return Self(Some(LaxSingleProcessFsFlagInner { + file_path, + fs_file, + finished_flag, + })); } + Err(_) => { + // show a message if it's been a while + if pb_update_guard.is_none() + && start_instant.elapsed().as_millis() > 1_000 + { + let guard = reporter.on_blocking(long_wait_message); + pb_update_guard = Some(guard); + } - // sleep for a little bit - tokio::time::sleep(Duration::from_millis(20)).await; + // sleep for a little bit + tokio::time::sleep(Duration::from_millis(20)).await; - // Poll the last updated path to check if it's stopped updating, - // which is an indication that the file lock is claimed, but - // was never properly released. - match sys - .fs_metadata(&last_updated_path) - .and_then(|p| p.modified()) - { - Ok(last_updated_time) => { - let current_time = sys.sys_time_now(); - match current_time.duration_since(last_updated_time) { - Ok(duration) => { - if duration.as_millis() - > (poll_file_update_ms * 2) as u128 - { - // the other process hasn't updated this file in a long time - // so maybe it was killed and the operating system hasn't - // released the file lock yet - return Self(None); - } else { - error_count = 0; // reset + // Poll the last updated path to check if it's stopped updating, + // which is an indication that the file lock is claimed, but + // was never properly released. + match sys + .fs_metadata(&last_updated_path) + .and_then(|p| p.modified()) + { + Ok(last_updated_time) => { + let current_time = sys.sys_time_now(); + match current_time.duration_since(last_updated_time) { + Ok(duration) => { + if duration.as_millis() + > (poll_file_update_ms * 2) as u128 + { + // the other process hasn't updated this file in a long time + // so maybe it was killed and the operating system hasn't + // released the file lock yet + return Self(None); + } else { + error_count = 0; // reset + } + } + Err(_) => { + error_count += 1; } } - Err(_) => { - error_count += 1; - } } - } - Err(_) => { - error_count += 1; + Err(_) => { + error_count += 1; + } } } } } - } - drop(pb_update_guard); // explicit for clarity - Self(None) + drop(pb_update_guard); // explicit for clarity + Self(None) + } + Err(err) => { + log::debug!( + "Failed to open file lock at {}. {:#}", + file_path.display(), + err + ); + Self(None) // let the process through + } } - Err(err) => { - log::debug!( - "Failed to open file lock at {}. {:#}", - file_path.display(), - err - ); - Self(None) // let the process through + } + } +} + +#[cfg(target_arch = "wasm32")] +mod inner { + use std::marker::PhantomData; + use std::path::PathBuf; + + use crate::Reporter; + + // Don't bother locking the folder when installing via Wasm for now. + // In the future, what we'd need is a way to spawn a thread (worker) + // and have it reliably do the update of the .poll file + #[sys_traits::auto_impl] + pub trait LaxSingleProcessFsFlagSys: Clone + Send + Sync + 'static {} + + pub struct LaxSingleProcessFsFlag { + _data: PhantomData, + } + + impl LaxSingleProcessFsFlag { + pub async fn lock( + _sys: &TSys, + _file_path: PathBuf, + _reporter: &impl Reporter, + _long_wait_message: &str, + ) -> Self { + Self { + _data: Default::default(), } } } diff --git a/resolvers/npm_installer/lib.rs b/resolvers/npm_installer/lib.rs index 617c087d15..53935f905f 100644 --- a/resolvers/npm_installer/lib.rs +++ b/resolvers/npm_installer/lib.rs @@ -28,6 +28,7 @@ mod local; pub mod package_json; pub mod process_state; pub mod resolution; +mod rt; pub use bin_entries::BinEntries; pub use bin_entries::BinEntriesError; diff --git a/resolvers/npm_installer/local.rs b/resolvers/npm_installer/local.rs index 93bfb6927e..f2d9e5a642 100644 --- a/resolvers/npm_installer/local.rs +++ b/resolvers/npm_installer/local.rs @@ -46,6 +46,7 @@ use sys_traits::FsWrite; use crate::bin_entries::EntrySetupOutcome; use crate::bin_entries::SetupBinEntrySys; use crate::flag::LaxSingleProcessFsFlag; +use crate::flag::LaxSingleProcessFsFlagSys; use crate::fs::clone_dir_recursive; use crate::fs::symlink_dir; use crate::fs::CloneDirRecursiveSys; @@ -71,10 +72,10 @@ pub trait LocalNpmInstallSys: NpmCacheSys + CloneDirRecursiveSys + SetupBinEntrySys + + LaxSingleProcessFsFlagSys + sys_traits::EnvVar + sys_traits::FsSymlinkDir + sys_traits::FsCreateJunction - + sys_traits::SystemTimeNow { } @@ -325,7 +326,7 @@ impl< let cache_folder = self.npm_cache.package_folder_for_nv(&package.id.nv); - let handle = deno_unsync::spawn_blocking({ + let handle = crate::rt::spawn_blocking({ let package_path = package_path.clone(); let sys = self.sys.clone(); move || { @@ -464,7 +465,7 @@ impl< async move { let from_path = patch_pkg.target_dir.clone(); let sys = self.sys.clone(); - deno_unsync::spawn_blocking({ + crate::rt::spawn_blocking({ move || { clone_dir_recrusive_except_node_modules_child( &sys, &from_path, &target, @@ -511,7 +512,7 @@ impl< cache_futures.push( async move { let sys = self.sys.clone(); - deno_unsync::spawn_blocking(move || { + crate::rt::spawn_blocking(move || { clone_dir_recursive(&sys, &source_path, &package_path) .map_err(JsErrorBox::from_err)?; // write out a file that indicates this folder has been initialized diff --git a/resolvers/npm_installer/process_state.rs b/resolvers/npm_installer/process_state.rs index 67639b4691..5dd72ac8f8 100644 --- a/resolvers/npm_installer/process_state.rs +++ b/resolvers/npm_installer/process_state.rs @@ -75,10 +75,13 @@ impl NpmProcessState { match self { FdOrPath::Fd(fd) => { #[cfg(target_arch = "wasm32")] - return Err(std::io::Error::new( - ErrorKind::Unsupported, - "Cannot pass fd for npm process state to Wasm. Use a file path instead.", - )); + { + let _fd = fd; + return Err(std::io::Error::new( + ErrorKind::Unsupported, + "Cannot pass fd for npm process state to Wasm. Use a file path instead.", + )); + } #[cfg(all(unix, not(target_arch = "wasm32")))] return Ok( // SAFETY: Assume valid file descriptor diff --git a/resolvers/npm_installer/rt.rs b/resolvers/npm_installer/rt.rs new file mode 100644 index 0000000000..83fbbb1da2 --- /dev/null +++ b/resolvers/npm_installer/rt.rs @@ -0,0 +1,24 @@ +// Copyright 2018-2025 the Deno authors. MIT license. + +#[cfg(not(target_arch = "wasm32"))] +use deno_unsync::JoinHandle; +#[cfg(target_arch = "wasm32")] +pub type JoinHandle = + std::future::Ready>; + +pub fn spawn_blocking< + F: (FnOnce() -> R) + Send + 'static, + R: Send + 'static, +>( + f: F, +) -> JoinHandle { + #[cfg(target_arch = "wasm32")] + { + let result = f(); + std::future::ready(Ok(result)) + } + #[cfg(not(target_arch = "wasm32"))] + { + deno_unsync::spawn_blocking(f) + } +}