internal: Load VFS config changes in parallel

This commit is contained in:
Lukas Wirth 2024-08-02 12:57:15 +02:00
parent 758ad25229
commit 8286847bee
7 changed files with 128 additions and 80 deletions

View file

@ -10,12 +10,14 @@
use std::{
fs,
path::{Component, Path},
sync::atomic::AtomicUsize,
};
use crossbeam_channel::{never, select, unbounded, Receiver, Sender};
use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use paths::{AbsPath, AbsPathBuf, Utf8PathBuf};
use vfs::loader;
use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator};
use vfs::loader::{self, LoadingProgress};
use walkdir::WalkDir;
#[derive(Debug)]
@ -104,35 +106,61 @@ impl NotifyActor {
let config_version = config.version;
let n_total = config.load.len();
self.send(loader::Message::Progress {
self.watched_entries.clear();
let send = |msg| (self.sender)(msg);
send(loader::Message::Progress {
n_total,
n_done: None,
n_done: LoadingProgress::Started,
config_version,
dir: None,
});
self.watched_entries.clear();
for (i, entry) in config.load.into_iter().enumerate() {
let watch = config.watch.contains(&i);
if watch {
self.watched_entries.push(entry.clone());
let (entry_tx, entry_rx) = unbounded();
let (watch_tx, watch_rx) = unbounded();
let processed = AtomicUsize::new(0);
config.load.into_par_iter().enumerate().for_each(move |(i, entry)| {
let do_watch = config.watch.contains(&i);
if do_watch {
_ = entry_tx.send(entry.clone());
}
let files =
self.load_entry(entry, watch, |file| loader::Message::Progress {
n_total,
n_done: Some(i),
dir: Some(file),
config_version,
});
self.send(loader::Message::Loaded { files });
self.send(loader::Message::Progress {
let files = Self::load_entry(
|f| _ = watch_tx.send(f.to_owned()),
entry,
do_watch,
|file| {
send(loader::Message::Progress {
n_total,
n_done: LoadingProgress::Progress(
processed.load(std::sync::atomic::Ordering::Relaxed),
),
dir: Some(file),
config_version,
})
},
);
send(loader::Message::Loaded { files });
send(loader::Message::Progress {
n_total,
n_done: Some(i + 1),
n_done: LoadingProgress::Progress(
processed.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1,
),
config_version,
dir: None,
});
});
for path in watch_rx {
self.watch(&path);
}
for entry in entry_rx {
self.watched_entries.push(entry);
}
self.send(loader::Message::Progress {
n_total,
n_done: LoadingProgress::Finished,
config_version,
dir: None,
});
}
Message::Invalidate(path) => {
let contents = read(path.as_path());
@ -142,60 +170,67 @@ impl NotifyActor {
},
Event::NotifyEvent(event) => {
if let Some(event) = log_notify_error(event) {
let files = event
.paths
.into_iter()
.filter_map(|path| {
Some(
AbsPathBuf::try_from(Utf8PathBuf::from_path_buf(path).ok()?)
if let EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) =
event.kind
{
let files = event
.paths
.into_iter()
.filter_map(|path| {
Some(
AbsPathBuf::try_from(
Utf8PathBuf::from_path_buf(path).ok()?,
)
.expect("path is absolute"),
)
})
.filter_map(|path| {
let meta = fs::metadata(&path).ok()?;
if meta.file_type().is_dir()
&& self
)
})
.filter_map(|path| {
let meta = fs::metadata(&path).ok()?;
if meta.file_type().is_dir()
&& self
.watched_entries
.iter()
.any(|entry| entry.contains_dir(&path))
{
self.watch(path.as_ref());
return None;
}
if !meta.file_type().is_file() {
return None;
}
if !self
.watched_entries
.iter()
.any(|entry| entry.contains_dir(&path))
{
self.watch(path);
return None;
}
.any(|entry| entry.contains_file(&path))
{
return None;
}
if !meta.file_type().is_file() {
return None;
}
if !self
.watched_entries
.iter()
.any(|entry| entry.contains_file(&path))
{
return None;
}
let contents = read(&path);
Some((path, contents))
})
.collect();
self.send(loader::Message::Changed { files });
let contents = read(&path);
Some((path, contents))
})
.collect();
self.send(loader::Message::Changed { files });
}
}
}
}
}
}
fn load_entry(
&mut self,
mut watch: impl FnMut(&Path),
entry: loader::Entry,
watch: bool,
make_message: impl Fn(AbsPathBuf) -> loader::Message,
do_watch: bool,
send_message: impl Fn(AbsPathBuf),
) -> Vec<(AbsPathBuf, Option<Vec<u8>>)> {
match entry {
loader::Entry::Files(files) => files
.into_iter()
.map(|file| {
if watch {
self.watch(file.clone());
if do_watch {
watch(file.as_ref());
}
let contents = read(file.as_path());
(file, contents)
@ -205,7 +240,7 @@ impl NotifyActor {
let mut res = Vec::new();
for root in &dirs.include {
self.send(make_message(root.clone()));
send_message(root.clone());
let walkdir =
WalkDir::new(root).follow_links(true).into_iter().filter_entry(|entry| {
if !entry.file_type().is_dir() {
@ -213,7 +248,7 @@ impl NotifyActor {
}
let path = entry.path();
if path_is_parent_symlink(path) {
if path_might_be_cyclic(path) {
return false;
}
@ -230,10 +265,10 @@ impl NotifyActor {
)
.ok()?;
if depth < 2 && is_dir {
self.send(make_message(abs_path.clone()));
send_message(abs_path.clone());
}
if is_dir && watch {
self.watch(abs_path.clone());
if is_dir && do_watch {
watch(abs_path.as_ref());
}
if !is_file {
return None;
@ -255,12 +290,13 @@ impl NotifyActor {
}
}
fn watch(&mut self, path: AbsPathBuf) {
fn watch(&mut self, path: &Path) {
if let Some((watcher, _)) = &mut self.watcher {
log_notify_error(watcher.watch(path.as_ref(), RecursiveMode::NonRecursive));
log_notify_error(watcher.watch(path, RecursiveMode::NonRecursive));
}
}
fn send(&mut self, msg: loader::Message) {
fn send(&self, msg: loader::Message) {
(self.sender)(msg);
}
}
@ -279,7 +315,7 @@ fn log_notify_error<T>(res: notify::Result<T>) -> Option<T> {
/// heuristic is not sufficient to catch all symlink cycles (it's
/// possible to construct cycle using two or more symlinks), but it
/// catches common cases.
fn path_is_parent_symlink(path: &Path) -> bool {
fn path_might_be_cyclic(path: &Path) -> bool {
let Ok(destination) = std::fs::read_link(path) else {
return false;
};