diff --git a/src/uu/sort/src/merge.rs b/src/uu/sort/src/merge.rs
index ea212f62f..502dcda82 100644
--- a/src/uu/sort/src/merge.rs
+++ b/src/uu/sort/src/merge.rs
@@ -30,7 +30,7 @@ use uucore::error::{FromIo, UResult};
 use crate::{
     GlobalSettings, Output, SortError,
     chunks::{self, Chunk, RecycledChunk},
-    compare_by, open,
+    compare_by, fd_soft_limit, open,
     tmp_dir::TmpDirWrapper,
 };
 
@@ -62,6 +62,28 @@ fn replace_output_file_in_input_files(
     Ok(())
 }
 
+/// Determine the effective merge batch size, enforcing a minimum and respecting the
+/// file-descriptor soft limit after reserving stdio/output and a safety margin.
+fn effective_merge_batch_size(settings: &GlobalSettings) -> usize {
+    const MIN_BATCH_SIZE: usize = 2;
+    const RESERVED_STDIO: usize = 3;
+    const RESERVED_OUTPUT: usize = 1;
+    const SAFETY_MARGIN: usize = 1;
+    let mut batch_size = settings.merge_batch_size.max(MIN_BATCH_SIZE);
+
+    if let Some(limit) = fd_soft_limit() {
+        let reserved = RESERVED_STDIO + RESERVED_OUTPUT + SAFETY_MARGIN;
+        let available_inputs = limit.saturating_sub(reserved);
+        if available_inputs >= MIN_BATCH_SIZE {
+            batch_size = batch_size.min(available_inputs);
+        } else {
+            batch_size = MIN_BATCH_SIZE;
+        }
+    }
+
+    batch_size
+}
+
 /// Merge pre-sorted `Box<dyn Read>`s.
 ///
 /// If `settings.merge_batch_size` is greater than the length of `files`, intermediate files will be used.
@@ -94,18 +116,21 @@ pub fn merge_with_file_limit<
     output: Output,
     tmp_dir: &mut TmpDirWrapper,
 ) -> UResult<()> {
-    if files.len() <= settings.merge_batch_size {
+    let batch_size = effective_merge_batch_size(settings);
+    debug_assert!(batch_size >= 2);
+
+    if files.len() <= batch_size {
         let merger = merge_without_limit(files, settings);
         merger?.write_all(settings, output)
     } else {
         let mut temporary_files = vec![];
-        let mut batch = vec![];
+        let mut batch = Vec::with_capacity(batch_size);
         for file in files {
             batch.push(file);
-            if batch.len() >= settings.merge_batch_size {
-                assert_eq!(batch.len(), settings.merge_batch_size);
+            if batch.len() >= batch_size {
+                assert_eq!(batch.len(), batch_size);
                 let merger = merge_without_limit(batch.into_iter(), settings)?;
-                batch = vec![];
+                batch = Vec::with_capacity(batch_size);
 
                 let mut tmp_file =
                     Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
@@ -115,7 +140,7 @@ pub fn merge_with_file_limit<
         }
         // Merge any remaining files that didn't get merged in a full batch above.
         if !batch.is_empty() {
-            assert!(batch.len() < settings.merge_batch_size);
+            assert!(batch.len() < batch_size);
             let merger = merge_without_limit(batch.into_iter(), settings)?;
 
             let mut tmp_file =
diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs
index 3b967d042..6122089e2 100644
--- a/src/uu/sort/src/sort.rs
+++ b/src/uu/sort/src/sort.rs
@@ -1073,13 +1073,27 @@ fn make_sort_mode_arg(mode: &'static str, short: char, help: String) -> Arg {
 
 #[cfg(target_os = "linux")]
 fn get_rlimit() -> UResult<usize> {
-    use nix::sys::resource::{Resource, getrlimit};
+    use nix::sys::resource::{RLIM_INFINITY, Resource, getrlimit};
 
-    getrlimit(Resource::RLIMIT_NOFILE)
-        .map(|(rlim_cur, _)| rlim_cur as usize)
+    let (rlim_cur, _rlim_max) = getrlimit(Resource::RLIMIT_NOFILE)
+        .map_err(|_| UUsageError::new(2, translate!("sort-failed-fetch-rlimit")))?;
+    if rlim_cur == RLIM_INFINITY {
+        return Err(UUsageError::new(2, translate!("sort-failed-fetch-rlimit")));
+    }
+    usize::try_from(rlim_cur)
         .map_err(|_| UUsageError::new(2, translate!("sort-failed-fetch-rlimit")))
 }
 
+#[cfg(target_os = "linux")]
+pub(crate) fn fd_soft_limit() -> Option<usize> {
+    get_rlimit().ok()
+}
+
+#[cfg(not(target_os = "linux"))]
+pub(crate) fn fd_soft_limit() -> Option<usize> {
+    None
+}
+
 const STDIN_FILE: &str = "-";
 
 /// Legacy `+POS1 [-POS2]` syntax is permitted unless `_POSIX2_VERSION` is in
@@ -1232,12 +1246,12 @@ fn default_merge_batch_size() -> usize {
     #[cfg(target_os = "linux")]
     {
         // Adjust merge batch size dynamically based on available file descriptors.
-        match get_rlimit() {
-            Ok(limit) => {
+        match fd_soft_limit() {
+            Some(limit) => {
                 let usable_limit = limit.saturating_div(LINUX_BATCH_DIVISOR);
                 usable_limit.clamp(LINUX_BATCH_MIN, LINUX_BATCH_MAX)
             }
-            Err(_) => 64,
+            None => 64,
         }
     }
 
@@ -1366,9 +1380,15 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
         settings.threads = matches
             .get_one::<String>(options::PARALLEL)
             .map_or_else(|| "0".to_string(), String::from);
-        unsafe {
-            env::set_var("RAYON_NUM_THREADS", &settings.threads);
-        }
+        let num_threads = match settings.threads.parse::<usize>() {
+            Ok(0) | Err(_) => std::thread::available_parallelism()
+                .map(|n| n.get())
+                .unwrap_or(1),
+            Ok(n) => n,
+        };
+        let _ = rayon::ThreadPoolBuilder::new()
+            .num_threads(num_threads)
+            .build_global();
     }
 
     if let Some(size_str) = matches.get_one::<String>(options::BUF_SIZE) {
@@ -1419,7 +1439,15 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
 
                         translate!(
                             "sort-maximum-batch-size-rlimit",
-                            "rlimit" => get_rlimit()?
+                            "rlimit" => {
+                                let Some(rlimit) = fd_soft_limit() else {
+                                    return Err(UUsageError::new(
+                                        2,
+                                        translate!("sort-failed-fetch-rlimit"),
+                                    ));
+                                };
+                                rlimit
+                            }
                         )
                     }
                     #[cfg(not(target_os = "linux"))]