This commit is contained in:
Danny McClanahan 2025-07-05 15:10:03 +02:00 committed by GitHub
commit e0ef33b442
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 217 additions and 78 deletions

View file

@ -180,7 +180,7 @@ fn search_parallel(args: &HiArgs, mode: SearchMode) -> anyhow::Result<bool> {
let haystack_builder = &haystack_builder;
let mut searcher = searcher.clone();
Box::new(move |result| {
move |result| {
let haystack = match haystack_builder.build_from_result(result) {
Some(haystack) => haystack,
None => return WalkState::Continue,
@ -214,7 +214,7 @@ fn search_parallel(args: &HiArgs, mode: SearchMode) -> anyhow::Result<bool> {
} else {
WalkState::Continue
}
})
}
});
if args.has_implicit_path() && !searched.load(Ordering::SeqCst) {
eprint_nothing_searched();
@ -297,7 +297,7 @@ fn files_parallel(args: &HiArgs) -> anyhow::Result<bool> {
let matched = &matched;
let tx = tx.clone();
Box::new(move |result| {
move |result| {
let haystack = match haystack_builder.build_from_result(result) {
Some(haystack) => haystack,
None => return WalkState::Continue,
@ -311,7 +311,7 @@ fn files_parallel(args: &HiArgs) -> anyhow::Result<bool> {
Err(_) => WalkState::Quit,
}
}
})
}
});
drop(tx);
if let Err(err) = print_thread.join().unwrap() {

View file

@ -0,0 +1,107 @@
use std::{
collections::BTreeSet,
ops,
path::{Path, PathBuf},
sync::{Mutex, OnceLock},
};
struct VisitorBuilder {
traversal_error: OnceLock<ignore::Error>,
all_matches: Mutex<BTreeSet<PathBuf>>,
}
impl VisitorBuilder {
fn new() -> Self {
Self {
traversal_error: OnceLock::new(),
all_matches: Mutex::new(BTreeSet::new()),
}
}
fn into_result(self) -> Result<BTreeSet<PathBuf>, ignore::Error> {
let Self { traversal_error, all_matches } = self;
if let Some(e) = traversal_error.into_inner() {
Err(e)
} else {
Ok(all_matches.into_inner().unwrap())
}
}
}
impl ignore::ParallelVisitorBuilder for VisitorBuilder {
type Visitor<'s>
= Visitor<'s>
where
Self: 's;
fn build<'s, 't: 's>(&'t self) -> Self::Visitor<'s> {
Visitor::new(&self.traversal_error, &self.all_matches)
}
}
struct Visitor<'s> {
traversal_error: &'s OnceLock<ignore::Error>,
cur_matches: Vec<PathBuf>,
all_matches: &'s Mutex<BTreeSet<PathBuf>>,
}
impl<'s> Visitor<'s> {
fn new(
traversal_error: &'s OnceLock<ignore::Error>,
all_matches: &'s Mutex<BTreeSet<PathBuf>>,
) -> Self {
Self { traversal_error, cur_matches: Vec::new(), all_matches }
}
}
impl<'s> ops::Drop for Visitor<'s> {
fn drop(&mut self) {
if self.traversal_error.get().is_some() {
return;
}
self.all_matches.lock().unwrap().extend(self.cur_matches.drain(..));
}
}
impl<'s> ignore::ParallelVisitor for Visitor<'s> {
fn visit(
&mut self,
entry: Result<ignore::DirEntry, ignore::Error>,
) -> ignore::WalkState {
if self.traversal_error.get().is_some() {
return ignore::WalkState::Quit;
}
match entry {
Err(e) => {
let _ = self.traversal_error.set(e);
ignore::WalkState::Quit
}
Ok(entry) => {
if let Some(e) = entry.error() {
eprintln!(
"non-fatal error while processing entry {:?}: {}",
&entry, e
);
}
let file_type = entry.file_type().unwrap();
if file_type.is_file() {
self.cur_matches.push(entry.into_path());
}
ignore::WalkState::Continue
}
}
}
}
fn walk_dir(
dir: impl AsRef<Path>,
) -> Result<BTreeSet<PathBuf>, ignore::Error> {
ignore::WalkBuilder::new(dir)
.build_parallel()
.visit(VisitorBuilder::new())
.into_result()
}
fn main() {
println!("success: {:?}", walk_dir(".").unwrap());
println!("err: {:?}", walk_dir("asdf"));
}

View file

@ -27,12 +27,12 @@ fn main() {
let walker = WalkBuilder::new(path).threads(6).build_parallel();
walker.run(|| {
let tx = tx.clone();
Box::new(move |result| {
move |result| {
use ignore::WalkState::*;
tx.send(DirEntry::Y(result.unwrap())).unwrap();
Continue
})
}
});
} else if simple {
let walker = WalkDir::new(path);

View file

@ -1128,15 +1128,21 @@ impl WalkState {
/// The builder will be called for each thread started by `WalkParallel`. The
/// visitor returned from each builder is then called for every directory
/// entry.
pub trait ParallelVisitorBuilder<'s> {
pub trait ParallelVisitorBuilder {
/// The visitor implementation, which may or may not share state with the builder.
type Visitor<'s>: ParallelVisitor
where
Self: 's;
/// Create per-thread `ParallelVisitor`s for `WalkParallel`.
fn build(&mut self) -> Box<dyn ParallelVisitor + 's>;
fn build<'s, 't: 's>(&'t self) -> Self::Visitor<'s>;
}
impl<'a, 's, P: ParallelVisitorBuilder<'s>> ParallelVisitorBuilder<'s>
for &'a mut P
{
fn build(&mut self) -> Box<dyn ParallelVisitor + 's> {
impl<P: ParallelVisitorBuilder> ParallelVisitorBuilder for &P {
type Visitor<'s>
= P::Visitor<'s>
where
Self: 's;
fn build<'s, 't: 's>(&'t self) -> Self::Visitor<'s> {
(**self).build()
}
}
@ -1156,23 +1162,30 @@ struct FnBuilder<F> {
builder: F,
}
impl<'s, F: FnMut() -> FnVisitor<'s>> ParallelVisitorBuilder<'s>
for FnBuilder<F>
impl<V, F> ParallelVisitorBuilder for FnBuilder<F>
where
V: FnMut(Result<DirEntry, Error>) -> WalkState + Send,
F: Fn() -> V,
{
fn build(&mut self) -> Box<dyn ParallelVisitor + 's> {
type Visitor<'s>
= FnVisitorImp<V>
where
F: 's;
fn build<'s, 't: 's>(&'t self) -> Self::Visitor<'s> {
let visitor = (self.builder)();
Box::new(FnVisitorImp { visitor })
FnVisitorImp { visitor }
}
}
type FnVisitor<'s> =
Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 's>;
struct FnVisitorImp<'s> {
visitor: FnVisitor<'s>,
struct FnVisitorImp<F> {
visitor: F,
}
impl<'s> ParallelVisitor for FnVisitorImp<'s> {
impl<F> ParallelVisitor for FnVisitorImp<F>
where
F: FnMut(Result<DirEntry, Error>) -> WalkState + Send,
{
fn visit(&mut self, entry: Result<DirEntry, Error>) -> WalkState {
(self.visitor)(entry)
}
@ -1202,11 +1215,67 @@ impl WalkParallel {
/// Execute the parallel recursive directory iterator. `mkf` is called
/// for each thread used for iteration. The function produced by `mkf`
/// is then in turn called for each visited file path.
pub fn run<'s, F>(self, mkf: F)
pub fn run<V, F>(self, mkf: F)
where
F: FnMut() -> FnVisitor<'s>,
V: FnMut(Result<DirEntry, Error>) -> WalkState + Send,
F: Fn() -> V,
{
self.visit(&mut FnBuilder { builder: mkf })
self.visit(FnBuilder { builder: mkf });
}
fn send_root_paths<B>(&mut self, builder: &B) -> Result<Vec<Message>, ()>
where
B: ParallelVisitorBuilder,
{
let mut stack = vec![];
let mut visitor = builder.build();
let mut paths = Vec::new().into_iter();
std::mem::swap(&mut paths, &mut self.paths);
// Send the initial set of root paths to the pool of workers. Note
// that we only send directories. For files, we send to them the
// callback directly.
for path in paths {
let (dent, root_device) = if path == Path::new("-") {
(DirEntry::new_stdin(), None)
} else {
let root_device = if !self.same_file_system {
None
} else {
match device_num(&path) {
Ok(root_device) => Some(root_device),
Err(err) => {
let err = Error::Io(err).with_path(path);
if visitor.visit(Err(err)).is_quit() {
return Err(());
}
continue;
}
}
};
match DirEntryRaw::from_path(0, path, false) {
Ok(dent) => (DirEntry::new_raw(dent, None), root_device),
Err(err) => {
if visitor.visit(Err(err)).is_quit() {
return Err(());
}
continue;
}
}
};
stack.push(Message::Work(Work {
dent,
ignore: self.ig_root.clone(),
root_device,
}));
}
// ... but there's no need to start workers if we don't need them.
if stack.is_empty() {
Err(())
} else {
Ok(stack)
}
}
/// Execute the parallel recursive directory iterator using a custom
@ -1226,57 +1295,16 @@ impl WalkParallel {
/// visitor runs on only one thread, this build-up can be done without
/// synchronization. Then, once traversal is complete, all of the results
/// can be merged together into a single data structure.
pub fn visit(mut self, builder: &mut dyn ParallelVisitorBuilder<'_>) {
pub fn visit<B>(mut self, builder: B) -> B
where
B: ParallelVisitorBuilder,
{
let threads = self.threads();
let mut stack = vec![];
{
let mut visitor = builder.build();
let mut paths = Vec::new().into_iter();
std::mem::swap(&mut paths, &mut self.paths);
// Send the initial set of root paths to the pool of workers. Note
// that we only send directories. For files, we send to them the
// callback directly.
for path in paths {
let (dent, root_device) = if path == Path::new("-") {
(DirEntry::new_stdin(), None)
} else {
let root_device = if !self.same_file_system {
None
} else {
match device_num(&path) {
Ok(root_device) => Some(root_device),
Err(err) => {
let err = Error::Io(err).with_path(path);
if visitor.visit(Err(err)).is_quit() {
return;
}
continue;
}
}
};
match DirEntryRaw::from_path(0, path, false) {
Ok(dent) => {
(DirEntry::new_raw(dent, None), root_device)
}
Err(err) => {
if visitor.visit(Err(err)).is_quit() {
return;
}
continue;
}
}
};
stack.push(Message::Work(Work {
dent,
ignore: self.ig_root.clone(),
root_device,
}));
}
// ... but there's no need to start workers if we don't need them.
if stack.is_empty() {
return;
}
}
let stack = match self.send_root_paths(&builder) {
Ok(stack) => stack,
Err(()) => return builder,
};
// Create the workers and then wait for them to finish.
let quit_now = Arc::new(AtomicBool::new(false));
let active_workers = Arc::new(AtomicUsize::new(threads));
@ -1301,6 +1329,7 @@ impl WalkParallel {
handle.join().unwrap();
}
});
builder
}
fn threads(&self) -> usize {
@ -1457,9 +1486,9 @@ impl Stack {
/// ignore matchers, producing new work and invoking the caller's callback.
///
/// Note that a worker is *both* a producer and a consumer.
struct Worker<'s> {
struct Worker<V> {
/// The caller's callback.
visitor: Box<dyn ParallelVisitor + 's>,
visitor: V,
/// A work-stealing stack of work to do.
///
/// We use a stack instead of a channel because a stack lets us visit
@ -1490,7 +1519,10 @@ struct Worker<'s> {
filter: Option<Filter>,
}
impl<'s> Worker<'s> {
impl<V> Worker<V>
where
V: ParallelVisitor,
{
/// Runs this worker until there is no more work left to do.
///
/// The worker will call the caller's callback for all entries that aren't