return the builder after completion

This commit is contained in:
Danny McClanahan 2025-03-06 06:05:04 -05:00
parent 6fafb724c1
commit 8838712212
No known key found for this signature in database
GPG key ID: 2FCF4469781EB9E5
2 changed files with 95 additions and 125 deletions

View file

@ -1,49 +1,29 @@
use std::{
collections::BTreeSet,
mem, ops,
path::PathBuf,
ptr,
sync::{
atomic::{AtomicPtr, Ordering},
mpsc, Mutex,
},
thread,
ops,
path::{Path, PathBuf},
sync::{Mutex, OnceLock},
};
struct VisitorBuilder {
traversal_error: AtomicPtr<ignore::Error>,
traversal_error: OnceLock<ignore::Error>,
all_matches: Mutex<BTreeSet<PathBuf>>,
result: mpsc::SyncSender<Result<BTreeSet<PathBuf>, ignore::Error>>,
}
impl VisitorBuilder {
fn new(
result: mpsc::SyncSender<Result<BTreeSet<PathBuf>, ignore::Error>>,
) -> Self {
fn new() -> Self {
Self {
traversal_error: AtomicPtr::new(ptr::null_mut()),
traversal_error: OnceLock::new(),
all_matches: Mutex::new(BTreeSet::new()),
result,
}
}
}
impl ops::Drop for VisitorBuilder {
fn drop(&mut self) {
let err_ptr = self.traversal_error.get_mut();
if !err_ptr.is_null() {
let e: Box<ignore::Error> = unsafe { Box::from_raw(*err_ptr) };
self.result.send(Err(*e)).unwrap();
return;
}
match self
.result
.send(Ok(mem::take(&mut self.all_matches.get_mut().unwrap())))
{
Ok(()) => (),
Err(_e) => {
eprintln!("failed to send result (hangup)");
}
fn into_result(self) -> Result<BTreeSet<PathBuf>, ignore::Error> {
let Self { traversal_error, all_matches } = self;
if let Some(e) = traversal_error.into_inner() {
Err(e)
} else {
Ok(all_matches.into_inner().unwrap())
}
}
}
@ -59,46 +39,23 @@ impl ignore::ParallelVisitorBuilder for VisitorBuilder {
}
struct Visitor<'s> {
traversal_error: &'s AtomicPtr<ignore::Error>,
traversal_error: &'s OnceLock<ignore::Error>,
cur_matches: Vec<PathBuf>,
all_matches: &'s Mutex<BTreeSet<PathBuf>>,
}
impl<'s> Visitor<'s> {
fn new(
traversal_error: &'s AtomicPtr<ignore::Error>,
traversal_error: &'s OnceLock<ignore::Error>,
all_matches: &'s Mutex<BTreeSet<PathBuf>>,
) -> Self {
Self { traversal_error, cur_matches: Vec::new(), all_matches }
}
#[inline(always)]
fn fatal_error_was_signaled(&self) -> bool {
!self.traversal_error.load(Ordering::Acquire).is_null()
}
fn handle_fatal_error(&self, e: ignore::Error) {
let boxed = Box::into_raw(Box::new(e));
match self.traversal_error.compare_exchange(
ptr::null_mut(),
boxed,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(prev) => {
debug_assert!(prev.is_null());
}
Err(_prior_error) => {
let e = unsafe { Box::from_raw(boxed) };
eprintln!("dropped racing error: {}", e);
}
}
}
}
impl<'s> ops::Drop for Visitor<'s> {
fn drop(&mut self) {
if !self.traversal_error.load(Ordering::Relaxed).is_null() {
if self.traversal_error.get().is_some() {
return;
}
self.all_matches.lock().unwrap().extend(self.cur_matches.drain(..));
@ -110,12 +67,12 @@ impl<'s> ignore::ParallelVisitor for Visitor<'s> {
&mut self,
entry: Result<ignore::DirEntry, ignore::Error>,
) -> ignore::WalkState {
if self.fatal_error_was_signaled() {
if self.traversal_error.get().is_some() {
return ignore::WalkState::Quit;
}
match entry {
Err(e) => {
self.handle_fatal_error(e);
let _ = self.traversal_error.set(e);
ignore::WalkState::Quit
}
Ok(entry) => {
@ -135,18 +92,16 @@ impl<'s> ignore::ParallelVisitor for Visitor<'s> {
}
}
fn main() {
let (send, recv) = mpsc::sync_channel(0);
let t = thread::spawn(move || {
ignore::WalkBuilder::new(".")
.build_parallel()
.visit(VisitorBuilder::new(send.clone()));
ignore::WalkBuilder::new("asdf")
.build_parallel()
.visit(VisitorBuilder::new(send));
});
for result in recv.iter() {
println!("result: {:?}", result.unwrap());
}
t.join().unwrap();
fn walk_dir(
dir: impl AsRef<Path>,
) -> Result<BTreeSet<PathBuf>, ignore::Error> {
ignore::WalkBuilder::new(dir)
.build_parallel()
.visit(VisitorBuilder::new())
.into_result()
}
fn main() {
println!("success: {:?}", walk_dir(".").unwrap());
println!("err: {:?}", walk_dir("asdf"));
}

View file

@ -1220,7 +1220,62 @@ impl WalkParallel {
V: FnMut(Result<DirEntry, Error>) -> WalkState + Send,
F: Fn() -> V,
{
self.visit(FnBuilder { builder: mkf })
self.visit(FnBuilder { builder: mkf });
}
fn send_root_paths<B>(&mut self, builder: &B) -> Result<Vec<Message>, ()>
where
B: ParallelVisitorBuilder,
{
let mut stack = vec![];
let mut visitor = builder.build();
let mut paths = Vec::new().into_iter();
std::mem::swap(&mut paths, &mut self.paths);
// Send the initial set of root paths to the pool of workers. Note
// that we only send directories. For files, we send to them the
// callback directly.
for path in paths {
let (dent, root_device) = if path == Path::new("-") {
(DirEntry::new_stdin(), None)
} else {
let root_device = if !self.same_file_system {
None
} else {
match device_num(&path) {
Ok(root_device) => Some(root_device),
Err(err) => {
let err = Error::Io(err).with_path(path);
if visitor.visit(Err(err)).is_quit() {
return Err(());
}
continue;
}
}
};
match DirEntryRaw::from_path(0, path, false) {
Ok(dent) => (DirEntry::new_raw(dent, None), root_device),
Err(err) => {
if visitor.visit(Err(err)).is_quit() {
return Err(());
}
continue;
}
}
};
stack.push(Message::Work(Work {
dent,
ignore: self.ig_root.clone(),
root_device,
}));
}
// ... but there's no need to start workers if we don't need them.
if stack.is_empty() {
Err(())
} else {
Ok(stack)
}
}
/// Execute the parallel recursive directory iterator using a custom
@ -1240,57 +1295,16 @@ impl WalkParallel {
/// visitor runs on only one thread, this build-up can be done without
/// synchronization. Then, once traversal is complete, all of the results
/// can be merged together into a single data structure.
pub fn visit(mut self, builder: impl ParallelVisitorBuilder) {
pub fn visit<B>(mut self, builder: B) -> B
where
B: ParallelVisitorBuilder,
{
let threads = self.threads();
let mut stack = vec![];
{
let mut visitor = builder.build();
let mut paths = Vec::new().into_iter();
std::mem::swap(&mut paths, &mut self.paths);
// Send the initial set of root paths to the pool of workers. Note
// that we only send directories. For files, we send to them the
// callback directly.
for path in paths {
let (dent, root_device) = if path == Path::new("-") {
(DirEntry::new_stdin(), None)
} else {
let root_device = if !self.same_file_system {
None
} else {
match device_num(&path) {
Ok(root_device) => Some(root_device),
Err(err) => {
let err = Error::Io(err).with_path(path);
if visitor.visit(Err(err)).is_quit() {
return;
}
continue;
}
}
};
match DirEntryRaw::from_path(0, path, false) {
Ok(dent) => {
(DirEntry::new_raw(dent, None), root_device)
}
Err(err) => {
if visitor.visit(Err(err)).is_quit() {
return;
}
continue;
}
}
};
stack.push(Message::Work(Work {
dent,
ignore: self.ig_root.clone(),
root_device,
}));
}
// ... but there's no need to start workers if we don't need them.
if stack.is_empty() {
return;
}
}
let stack = match self.send_root_paths(&builder) {
Ok(stack) => stack,
Err(()) => return builder,
};
// Create the workers and then wait for them to finish.
let quit_now = Arc::new(AtomicBool::new(false));
let active_workers = Arc::new(AtomicUsize::new(threads));
@ -1315,6 +1329,7 @@ impl WalkParallel {
handle.join().unwrap();
}
});
builder
}
fn threads(&self) -> usize {