use Roots in watcher

This commit is contained in:
Bernardo 2019-01-25 18:39:35 +01:00 committed by Aleksey Kladov
parent 86fadbd4e5
commit d63e1cebff
4 changed files with 275 additions and 331 deletions

View file

@ -1,95 +1,72 @@
use std::{
fmt, fs,
path::{Path, PathBuf},
sync::Arc,
thread,
};
use std::{fs, sync::Arc, thread};
use crossbeam_channel::{Receiver, Sender};
use parking_lot::Mutex;
use relative_path::RelativePathBuf;
use thread_worker::WorkerHandle;
use walkdir::WalkDir;
mod watcher;
use watcher::Watcher;
pub use watcher::WatcherChange;
use crate::{RootFilter, VfsRoot};
use crate::{RootFilter, Roots, VfsRoot};
pub(crate) enum Task {
AddRoot {
root: VfsRoot,
path: PathBuf,
root_filter: Arc<RootFilter>,
nested_roots: Vec<PathBuf>,
},
/// this variant should only be created by the watcher
HandleChange(WatcherChange),
LoadChange(WatcherChange),
Watch {
dir: PathBuf,
root_filter: Arc<RootFilter>,
filter: Arc<RootFilter>,
},
}
#[derive(Debug)]
pub struct AddRootResult {
pub(crate) root: VfsRoot,
pub(crate) files: Vec<(RelativePathBuf, String)>,
}
#[derive(Debug)]
pub enum WatcherChangeData {
Create { path: PathBuf, text: String },
Write { path: PathBuf, text: String },
Remove { path: PathBuf },
}
pub enum TaskResult {
AddRoot(AddRootResult),
HandleChange(WatcherChange),
LoadChange(WatcherChangeData),
}
impl fmt::Debug for TaskResult {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
TaskResult::AddRoot(..) => f.write_str("TaskResult::AddRoot(..)"),
TaskResult::HandleChange(c) => write!(f, "TaskResult::HandleChange({:?})", c),
TaskResult::LoadChange(c) => write!(f, "TaskResult::LoadChange({:?})", c),
}
}
BulkLoadRoot {
root: VfsRoot,
files: Vec<(RelativePathBuf, String)>,
},
AddSingleFile {
root: VfsRoot,
path: RelativePathBuf,
text: String,
},
ChangeSingleFile {
root: VfsRoot,
path: RelativePathBuf,
text: String,
},
RemoveSingleFile {
root: VfsRoot,
path: RelativePathBuf,
},
}
pub(crate) struct Worker {
worker: thread_worker::Worker<Task, TaskResult>,
worker_handle: WorkerHandle,
watcher: Arc<Mutex<Option<Watcher>>>,
}
impl Worker {
pub(crate) fn start() -> Worker {
let watcher = Arc::new(Mutex::new(None));
let watcher_clone = watcher.clone();
pub(crate) fn start(roots: Arc<Roots>) -> Worker {
let (worker, worker_handle) =
thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| {
input_receiver
let mut watcher = match Watcher::start(roots, output_sender.clone()) {
Ok(w) => Some(w),
Err(e) => {
log::error!("could not start watcher: {}", e);
None
}
};
let res = input_receiver
.into_iter()
.filter_map(|t| handle_task(t, &watcher_clone))
.try_for_each(|it| output_sender.send(it))
.unwrap()
.filter_map(|t| handle_task(t, &mut watcher))
.try_for_each(|it| output_sender.send(it));
if let Some(watcher) = watcher {
let _ = watcher.shutdown();
}
res.unwrap()
});
match Watcher::start(worker.inp.clone()) {
Ok(w) => {
watcher.lock().replace(w);
}
Err(e) => log::error!("could not start watcher: {}", e),
};
Worker {
worker,
worker_handle,
watcher,
}
}
@ -102,72 +79,31 @@ impl Worker {
}
pub(crate) fn shutdown(self) -> thread::Result<()> {
if let Some(watcher) = self.watcher.lock().take() {
let _ = watcher.shutdown();
}
let _ = self.worker.shutdown();
self.worker_handle.shutdown()
}
}
fn watch(
watcher: &Arc<Mutex<Option<Watcher>>>,
dir: &Path,
filter_entry: &RootFilter,
emit_for_existing: bool,
) {
if let Some(watcher) = watcher.lock().as_mut() {
watcher.watch_recursive(dir, filter_entry, emit_for_existing)
}
}
fn handle_task(task: Task, watcher: &Arc<Mutex<Option<Watcher>>>) -> Option<TaskResult> {
fn handle_task(task: Task, watcher: &mut Option<Watcher>) -> Option<TaskResult> {
match task {
Task::AddRoot {
root,
path,
root_filter,
nested_roots,
} => {
watch(watcher, &path, root_filter.as_ref(), false);
log::debug!("loading {} ...", path.as_path().display());
let files = load_root(
path.as_path(),
root_filter.as_ref(),
nested_roots.as_slice(),
);
log::debug!("... loaded {}", path.as_path().display());
Some(TaskResult::AddRoot(AddRootResult { root, files }))
}
Task::HandleChange(change) => {
// forward as is because Vfs has to decide if we should load it
Some(TaskResult::HandleChange(change))
}
Task::LoadChange(change) => {
log::debug!("loading {:?} ...", change);
load_change(change).map(TaskResult::LoadChange)
}
Task::Watch { dir, root_filter } => {
watch(watcher, &dir, root_filter.as_ref(), true);
None
Task::AddRoot { root, filter } => {
if let Some(watcher) = watcher {
watcher.watch_root(&filter)
}
log::debug!("loading {} ...", filter.root.as_path().display());
let files = load_root(filter.as_ref());
log::debug!("... loaded {}", filter.root.as_path().display());
Some(TaskResult::BulkLoadRoot { root, files })
}
}
}
fn load_root(
root: &Path,
root_filter: &RootFilter,
nested_roots: &[PathBuf],
) -> Vec<(RelativePathBuf, String)> {
fn load_root(filter: &RootFilter) -> Vec<(RelativePathBuf, String)> {
let mut res = Vec::new();
for entry in WalkDir::new(root).into_iter().filter_entry(|entry| {
if entry.file_type().is_dir() && nested_roots.iter().any(|it| it == entry.path()) {
// do not load files of a nested root
false
} else {
root_filter.can_contain(entry.path()).is_some()
}
}) {
for entry in WalkDir::new(&filter.root)
.into_iter()
.filter_entry(filter.entry_filter())
{
let entry = match entry {
Ok(entry) => entry,
Err(e) => {
@ -186,42 +122,8 @@ fn load_root(
continue;
}
};
let path = RelativePathBuf::from_path(path.strip_prefix(root).unwrap()).unwrap();
let path = RelativePathBuf::from_path(path.strip_prefix(&filter.root).unwrap()).unwrap();
res.push((path.to_owned(), text))
}
res
}
fn load_change(change: WatcherChange) -> Option<WatcherChangeData> {
let data = match change {
WatcherChange::Create(path) => {
if path.is_dir() {
return None;
}
let text = match fs::read_to_string(&path) {
Ok(text) => text,
Err(e) => {
log::warn!("watcher error \"{}\": {}", path.display(), e);
return None;
}
};
WatcherChangeData::Create { path, text }
}
WatcherChange::Write(path) => {
let text = match fs::read_to_string(&path) {
Ok(text) => text,
Err(e) => {
log::warn!("watcher error \"{}\": {}", path.display(), e);
return None;
}
};
WatcherChangeData::Write { path, text }
}
WatcherChange::Remove(path) => WatcherChangeData::Remove { path },
WatcherChange::Rescan => {
// this should be handled by Vfs::handle_task
return None;
}
};
Some(data)
}