mirror of
https://github.com/uutils/coreutils.git
synced 2025-12-23 08:47:37 +00:00
fix(sort): GNU sort-continue.sh test (#9107)
* feat: dynamically adjust merge batch size based on file descriptor limits - Add `effective_merge_batch_size()` function to calculate batch size considering fd soft limit, with minimums and safety margins. - Generalize fd limit handling from Linux-only to Unix systems using `fd_soft_limit()`. - Update merge logic to use dynamic batch size instead of fixed `settings.merge_batch_size` to prevent fd exhaustion. * fix(sort): update rlimit fetching to use fd_soft_limit with error handling Replace direct call to get_rlimit()? with fd_soft_limit(), adding a check for None value to return a usage error if rlimit cannot be fetched. This improves robustness on Linux by ensuring proper error handling when retrieving the file descriptor soft limit. * refactor(sort): restrict nix::libc and fd_soft_limit to Linux Update conditional compilation attributes from #[cfg(unix)] to #[cfg(target_os = "linux")] for the nix::libc import and fd_soft_limit function implementations, ensuring these features are only enabled on Linux systems to improve portability and avoid issues on other Unix-like platforms. * refactor: improve thread management and replace unsafe libc calls Replace unsafe libc::getrlimit calls in fd_soft_limit with safe nix crate usage. Update Rayon thread configuration to use ThreadPoolBuilder instead of environment variables for better control. Add documentation comment to effective_merge_batch_size function for clarity. * refactor(linux): improve error handling in fd_soft_limit function Extract the rlimit fetching logic into a separate `get_rlimit` function that returns `UResult<usize>` and properly handles errors with `UUsageError`, instead of silently returning `None` on failure or infinity. This provides better error reporting for resource limit issues on Linux platforms. * refactor(sort): reorder imports in get_rlimit for consistency Reordered the nix::sys::resource imports to group constants first (RLIM_INFINITY), then types (Resource), and finally functions (getrlimit), improving code readability and adhering to import style guidelines.
This commit is contained in:
parent
1044809bdf
commit
ccd4bbdc8f
2 changed files with 70 additions and 17 deletions
|
|
@ -30,7 +30,7 @@ use uucore::error::{FromIo, UResult};
|
|||
use crate::{
|
||||
GlobalSettings, Output, SortError,
|
||||
chunks::{self, Chunk, RecycledChunk},
|
||||
compare_by, open,
|
||||
compare_by, fd_soft_limit, open,
|
||||
tmp_dir::TmpDirWrapper,
|
||||
};
|
||||
|
||||
|
|
@ -62,6 +62,28 @@ fn replace_output_file_in_input_files(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Determine the effective merge batch size, enforcing a minimum and respecting the
|
||||
/// file-descriptor soft limit after reserving stdio/output and a safety margin.
|
||||
fn effective_merge_batch_size(settings: &GlobalSettings) -> usize {
|
||||
const MIN_BATCH_SIZE: usize = 2;
|
||||
const RESERVED_STDIO: usize = 3;
|
||||
const RESERVED_OUTPUT: usize = 1;
|
||||
const SAFETY_MARGIN: usize = 1;
|
||||
let mut batch_size = settings.merge_batch_size.max(MIN_BATCH_SIZE);
|
||||
|
||||
if let Some(limit) = fd_soft_limit() {
|
||||
let reserved = RESERVED_STDIO + RESERVED_OUTPUT + SAFETY_MARGIN;
|
||||
let available_inputs = limit.saturating_sub(reserved);
|
||||
if available_inputs >= MIN_BATCH_SIZE {
|
||||
batch_size = batch_size.min(available_inputs);
|
||||
} else {
|
||||
batch_size = MIN_BATCH_SIZE;
|
||||
}
|
||||
}
|
||||
|
||||
batch_size
|
||||
}
|
||||
|
||||
/// Merge pre-sorted `Box<dyn Read>`s.
|
||||
///
|
||||
/// If `settings.merge_batch_size` is greater than the length of `files`, intermediate files will be used.
|
||||
|
|
@ -94,18 +116,21 @@ pub fn merge_with_file_limit<
|
|||
output: Output,
|
||||
tmp_dir: &mut TmpDirWrapper,
|
||||
) -> UResult<()> {
|
||||
if files.len() <= settings.merge_batch_size {
|
||||
let batch_size = effective_merge_batch_size(settings);
|
||||
debug_assert!(batch_size >= 2);
|
||||
|
||||
if files.len() <= batch_size {
|
||||
let merger = merge_without_limit(files, settings);
|
||||
merger?.write_all(settings, output)
|
||||
} else {
|
||||
let mut temporary_files = vec![];
|
||||
let mut batch = vec![];
|
||||
let mut batch = Vec::with_capacity(batch_size);
|
||||
for file in files {
|
||||
batch.push(file);
|
||||
if batch.len() >= settings.merge_batch_size {
|
||||
assert_eq!(batch.len(), settings.merge_batch_size);
|
||||
if batch.len() >= batch_size {
|
||||
assert_eq!(batch.len(), batch_size);
|
||||
let merger = merge_without_limit(batch.into_iter(), settings)?;
|
||||
batch = vec![];
|
||||
batch = Vec::with_capacity(batch_size);
|
||||
|
||||
let mut tmp_file =
|
||||
Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
|
||||
|
|
@ -115,7 +140,7 @@ pub fn merge_with_file_limit<
|
|||
}
|
||||
// Merge any remaining files that didn't get merged in a full batch above.
|
||||
if !batch.is_empty() {
|
||||
assert!(batch.len() < settings.merge_batch_size);
|
||||
assert!(batch.len() < batch_size);
|
||||
let merger = merge_without_limit(batch.into_iter(), settings)?;
|
||||
|
||||
let mut tmp_file =
|
||||
|
|
|
|||
|
|
@ -1073,13 +1073,27 @@ fn make_sort_mode_arg(mode: &'static str, short: char, help: String) -> Arg {
|
|||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn get_rlimit() -> UResult<usize> {
|
||||
use nix::sys::resource::{Resource, getrlimit};
|
||||
use nix::sys::resource::{RLIM_INFINITY, Resource, getrlimit};
|
||||
|
||||
getrlimit(Resource::RLIMIT_NOFILE)
|
||||
.map(|(rlim_cur, _)| rlim_cur as usize)
|
||||
let (rlim_cur, _rlim_max) = getrlimit(Resource::RLIMIT_NOFILE)
|
||||
.map_err(|_| UUsageError::new(2, translate!("sort-failed-fetch-rlimit")))?;
|
||||
if rlim_cur == RLIM_INFINITY {
|
||||
return Err(UUsageError::new(2, translate!("sort-failed-fetch-rlimit")));
|
||||
}
|
||||
usize::try_from(rlim_cur)
|
||||
.map_err(|_| UUsageError::new(2, translate!("sort-failed-fetch-rlimit")))
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub(crate) fn fd_soft_limit() -> Option<usize> {
|
||||
get_rlimit().ok()
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
pub(crate) fn fd_soft_limit() -> Option<usize> {
|
||||
None
|
||||
}
|
||||
|
||||
const STDIN_FILE: &str = "-";
|
||||
|
||||
/// Legacy `+POS1 [-POS2]` syntax is permitted unless `_POSIX2_VERSION` is in
|
||||
|
|
@ -1232,12 +1246,12 @@ fn default_merge_batch_size() -> usize {
|
|||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
// Adjust merge batch size dynamically based on available file descriptors.
|
||||
match get_rlimit() {
|
||||
Ok(limit) => {
|
||||
match fd_soft_limit() {
|
||||
Some(limit) => {
|
||||
let usable_limit = limit.saturating_div(LINUX_BATCH_DIVISOR);
|
||||
usable_limit.clamp(LINUX_BATCH_MIN, LINUX_BATCH_MAX)
|
||||
}
|
||||
Err(_) => 64,
|
||||
None => 64,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1366,9 +1380,15 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
|
|||
settings.threads = matches
|
||||
.get_one::<String>(options::PARALLEL)
|
||||
.map_or_else(|| "0".to_string(), String::from);
|
||||
unsafe {
|
||||
env::set_var("RAYON_NUM_THREADS", &settings.threads);
|
||||
}
|
||||
let num_threads = match settings.threads.parse::<usize>() {
|
||||
Ok(0) | Err(_) => std::thread::available_parallelism()
|
||||
.map(|n| n.get())
|
||||
.unwrap_or(1),
|
||||
Ok(n) => n,
|
||||
};
|
||||
let _ = rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(num_threads)
|
||||
.build_global();
|
||||
}
|
||||
|
||||
if let Some(size_str) = matches.get_one::<String>(options::BUF_SIZE) {
|
||||
|
|
@ -1419,7 +1439,15 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
|
|||
|
||||
translate!(
|
||||
"sort-maximum-batch-size-rlimit",
|
||||
"rlimit" => get_rlimit()?
|
||||
"rlimit" => {
|
||||
let Some(rlimit) = fd_soft_limit() else {
|
||||
return Err(UUsageError::new(
|
||||
2,
|
||||
translate!("sort-failed-fetch-rlimit"),
|
||||
));
|
||||
};
|
||||
rlimit
|
||||
}
|
||||
)
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue