refator to move all io to io module

use same channel for scanner and watcher
some implementations pending
This commit is contained in:
Bernardo 2019-01-07 21:35:18 +01:00 committed by Aleksey Kladov
parent d032a1a4e8
commit 6b86f038d6
5 changed files with 136 additions and 103 deletions

View file

@ -113,7 +113,6 @@ enum Event {
Msg(RawMessage), Msg(RawMessage),
Task(Task), Task(Task),
Vfs(VfsTask), Vfs(VfsTask),
Watcher(WatcherChange),
Lib(LibraryData), Lib(LibraryData),
} }
@ -150,7 +149,6 @@ impl fmt::Debug for Event {
Event::Task(it) => fmt::Debug::fmt(it, f), Event::Task(it) => fmt::Debug::fmt(it, f),
Event::Vfs(it) => fmt::Debug::fmt(it, f), Event::Vfs(it) => fmt::Debug::fmt(it, f),
Event::Lib(it) => fmt::Debug::fmt(it, f), Event::Lib(it) => fmt::Debug::fmt(it, f),
Event::Watcher(it) => fmt::Debug::fmt(it, f),
} }
} }
} }
@ -185,10 +183,6 @@ fn main_loop_inner(
Ok(task) => Event::Vfs(task), Ok(task) => Event::Vfs(task),
Err(RecvError) => bail!("vfs died"), Err(RecvError) => bail!("vfs died"),
}, },
recv(state.vfs.read().change_receiver()) -> change => match change {
Ok(change) => Event::Watcher(change),
Err(RecvError) => bail!("vfs watcher died"),
},
recv(libdata_receiver) -> data => Event::Lib(data.unwrap()) recv(libdata_receiver) -> data => Event::Lib(data.unwrap())
}; };
log::info!("loop_turn = {:?}", event); log::info!("loop_turn = {:?}", event);
@ -200,10 +194,6 @@ fn main_loop_inner(
state.vfs.write().handle_task(task); state.vfs.write().handle_task(task);
state_changed = true; state_changed = true;
} }
Event::Watcher(change) => {
state.vfs.write().handle_change(change);
state_changed = true;
}
Event::Lib(lib) => { Event::Lib(lib) => {
feedback(internal_mode, "library loaded", msg_sender); feedback(internal_mode, "library loaded", msg_sender);
state.add_lib(lib); state.add_lib(lib);
@ -375,7 +365,7 @@ fn on_notification(
if let Some(file_id) = state if let Some(file_id) = state
.vfs .vfs
.write() .write()
.add_file_overlay(&path, Some(params.text_document.text)) .add_file_overlay(&path, params.text_document.text)
{ {
subs.add_sub(FileId(file_id.0.into())); subs.add_sub(FileId(file_id.0.into()));
} }
@ -394,10 +384,7 @@ fn on_notification(
.pop() .pop()
.ok_or_else(|| format_err!("empty changes"))? .ok_or_else(|| format_err!("empty changes"))?
.text; .text;
state state.vfs.write().change_file_overlay(path.as_path(), text);
.vfs
.write()
.change_file_overlay(path.as_path(), Some(text));
return Ok(()); return Ok(());
} }
Err(not) => not, Err(not) => not,

View file

@ -10,17 +10,47 @@ use relative_path::RelativePathBuf;
use crate::{VfsRoot, has_rs_extension}; use crate::{VfsRoot, has_rs_extension};
pub(crate) struct Task { pub(crate) enum Task {
pub(crate) root: VfsRoot, AddRoot {
pub(crate) path: PathBuf, root: VfsRoot,
pub(crate) filter: Box<Fn(&DirEntry) -> bool + Send>, path: PathBuf,
filter: Box<Fn(&DirEntry) -> bool + Send>,
},
WatcherChange(crate::watcher::WatcherChange),
} }
pub struct TaskResult { #[derive(Debug)]
pub struct AddRootResult {
pub(crate) root: VfsRoot, pub(crate) root: VfsRoot,
pub(crate) files: Vec<(RelativePathBuf, String)>, pub(crate) files: Vec<(RelativePathBuf, String)>,
} }
#[derive(Debug)]
pub enum WatcherChangeResult {
Create {
path: PathBuf,
text: String,
},
Write {
path: PathBuf,
text: String,
},
Remove {
path: PathBuf,
},
// can this be replaced and use Remove and Create instead?
Rename {
src: PathBuf,
dst: PathBuf,
text: String,
},
}
pub enum TaskResult {
AddRoot(AddRootResult),
WatcherChange(WatcherChangeResult),
}
impl fmt::Debug for TaskResult { impl fmt::Debug for TaskResult {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("TaskResult { ... }") f.write_str("TaskResult { ... }")
@ -40,11 +70,18 @@ pub(crate) fn start() -> (Worker, WorkerHandle) {
} }
fn handle_task(task: Task) -> TaskResult { fn handle_task(task: Task) -> TaskResult {
let Task { root, path, filter } = task; match task {
log::debug!("loading {} ...", path.as_path().display()); Task::AddRoot { root, path, filter } => {
let files = load_root(path.as_path(), &*filter); log::debug!("loading {} ...", path.as_path().display());
log::debug!("... loaded {}", path.as_path().display()); let files = load_root(path.as_path(), &*filter);
TaskResult { root, files } log::debug!("... loaded {}", path.as_path().display());
TaskResult::AddRoot(AddRootResult { root, files })
}
Task::WatcherChange(change) => {
// TODO
unimplemented!()
}
}
} }
fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePathBuf, String)> { fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePathBuf, String)> {

View file

@ -60,7 +60,7 @@ impl RootFilter {
} }
} }
fn has_rs_extension(p: &Path) -> bool { pub(crate) fn has_rs_extension(p: &Path) -> bool {
p.extension() == Some(OsStr::new("rs")) p.extension() == Some(OsStr::new("rs"))
} }
@ -98,7 +98,7 @@ impl Vfs {
pub fn new(mut roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) { pub fn new(mut roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) {
let (worker, worker_handle) = io::start(); let (worker, worker_handle) = io::start();
let watcher = Watcher::start().unwrap(); // TODO return Result? let watcher = Watcher::start(worker.inp.clone()).unwrap(); // TODO return Result?
let mut res = Vfs { let mut res = Vfs {
roots: Arena::default(), roots: Arena::default(),
@ -127,7 +127,7 @@ impl Vfs {
nested.iter().all(|it| it != entry.path()) nested.iter().all(|it| it != entry.path())
} }
}; };
let task = io::Task { let task = io::Task::AddRoot {
root, root,
path: path.clone(), path: path.clone(),
filter: Box::new(filter), filter: Box::new(filter),
@ -188,58 +188,43 @@ impl Vfs {
&self.worker.out &self.worker.out
} }
pub fn change_receiver(&self) -> &Receiver<WatcherChange> {
&self.watcher.change_receiver()
}
pub fn handle_task(&mut self, task: io::TaskResult) { pub fn handle_task(&mut self, task: io::TaskResult) {
let mut files = Vec::new(); match task {
// While we were scanning the root in the backgound, a file might have io::TaskResult::AddRoot(task) => {
// been open in the editor, so we need to account for that. let mut files = Vec::new();
let exising = self.root2files[&task.root] // While we were scanning the root in the backgound, a file might have
.iter() // been open in the editor, so we need to account for that.
.map(|&file| (self.files[file].path.clone(), file)) let exising = self.root2files[&task.root]
.collect::<FxHashMap<_, _>>(); .iter()
for (path, text) in task.files { .map(|&file| (self.files[file].path.clone(), file))
if let Some(&file) = exising.get(&path) { .collect::<FxHashMap<_, _>>();
let text = Arc::clone(&self.files[file].text); for (path, text) in task.files {
files.push((file, path, text)); if let Some(&file) = exising.get(&path) {
continue; let text = Arc::clone(&self.files[file].text);
} files.push((file, path, text));
let text = Arc::new(text); continue;
let file = self.add_file(task.root, path.clone(), Arc::clone(&text)); }
files.push((file, path, text)); let text = Arc::new(text);
} let file = self.add_file(task.root, path.clone(), Arc::clone(&text));
files.push((file, path, text));
}
let change = VfsChange::AddRoot { let change = VfsChange::AddRoot {
root: task.root, root: task.root,
files, files,
}; };
self.pending_changes.push(change); self.pending_changes.push(change);
}
pub fn handle_change(&mut self, change: WatcherChange) {
match change {
WatcherChange::Create(path) => {
self.add_file_overlay(&path, None);
} }
WatcherChange::Remove(path) => { io::TaskResult::WatcherChange(change) => {
self.remove_file_overlay(&path); // TODO
} unimplemented!()
WatcherChange::Rename(src, dst) => {
self.remove_file_overlay(&src);
self.add_file_overlay(&dst, None);
}
WatcherChange::Write(path) => {
self.change_file_overlay(&path, None);
} }
} }
} }
pub fn add_file_overlay(&mut self, path: &Path, text: Option<String>) -> Option<VfsFile> { pub fn add_file_overlay(&mut self, path: &Path, text: String) -> Option<VfsFile> {
let mut res = None; let mut res = None;
if let Some((root, rel_path, file)) = self.find_root(path) { if let Some((root, rel_path, file)) = self.find_root(path) {
let text = text.unwrap_or_else(|| fs::read_to_string(&path).unwrap_or_default());
let text = Arc::new(text); let text = Arc::new(text);
let change = if let Some(file) = file { let change = if let Some(file) = file {
res = Some(file); res = Some(file);
@ -260,10 +245,8 @@ impl Vfs {
res res
} }
pub fn change_file_overlay(&mut self, path: &Path, new_text: Option<String>) { pub fn change_file_overlay(&mut self, path: &Path, new_text: String) {
if let Some((_root, _path, file)) = self.find_root(path) { if let Some((_root, _path, file)) = self.find_root(path) {
let new_text =
new_text.unwrap_or_else(|| fs::read_to_string(&path).unwrap_or_default());
let file = file.expect("can't change a file which wasn't added"); let file = file.expect("can't change a file which wasn't added");
let text = Arc::new(new_text); let text = Arc::new(new_text);
self.change_file(file, Arc::clone(&text)); self.change_file(file, Arc::clone(&text));

View file

@ -5,12 +5,12 @@ use std::{
time::Duration, time::Duration,
}; };
use crossbeam_channel::Receiver; use crossbeam_channel::Sender;
use drop_bomb::DropBomb; use drop_bomb::DropBomb;
use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
use crate::{has_rs_extension, io};
pub struct Watcher { pub struct Watcher {
receiver: Receiver<WatcherChange>,
watcher: RecommendedWatcher, watcher: RecommendedWatcher,
thread: thread::JoinHandle<()>, thread: thread::JoinHandle<()>,
bomb: DropBomb, bomb: DropBomb,
@ -21,24 +21,54 @@ pub enum WatcherChange {
Create(PathBuf), Create(PathBuf),
Write(PathBuf), Write(PathBuf),
Remove(PathBuf), Remove(PathBuf),
// can this be replaced and use Remove and Create instead?
Rename(PathBuf, PathBuf), Rename(PathBuf, PathBuf),
} }
impl WatcherChange { impl WatcherChange {
fn from_debounced_event(ev: DebouncedEvent) -> Option<WatcherChange> { fn try_from_debounced_event(ev: DebouncedEvent) -> Option<WatcherChange> {
match ev { match ev {
DebouncedEvent::NoticeWrite(_) DebouncedEvent::NoticeWrite(_)
| DebouncedEvent::NoticeRemove(_) | DebouncedEvent::NoticeRemove(_)
| DebouncedEvent::Chmod(_) | DebouncedEvent::Chmod(_) => {
| DebouncedEvent::Rescan => {
// ignore // ignore
None None
} }
DebouncedEvent::Create(path) => Some(WatcherChange::Create(path)), DebouncedEvent::Rescan => {
DebouncedEvent::Write(path) => Some(WatcherChange::Write(path)), // TODO should we rescan the root?
DebouncedEvent::Remove(path) => Some(WatcherChange::Remove(path)), None
DebouncedEvent::Rename(src, dst) => Some(WatcherChange::Rename(src, dst)), }
DebouncedEvent::Create(path) => {
if has_rs_extension(&path) {
Some(WatcherChange::Create(path))
} else {
None
}
}
DebouncedEvent::Write(path) => {
if has_rs_extension(&path) {
Some(WatcherChange::Write(path))
} else {
None
}
}
DebouncedEvent::Remove(path) => {
if has_rs_extension(&path) {
Some(WatcherChange::Remove(path))
} else {
None
}
}
DebouncedEvent::Rename(src, dst) => {
match (has_rs_extension(&src), has_rs_extension(&dst)) {
(true, true) => Some(WatcherChange::Rename(src, dst)),
(true, false) => Some(WatcherChange::Remove(src)),
(false, true) => Some(WatcherChange::Create(dst)),
(false, false) => None,
}
}
DebouncedEvent::Error(err, path) => { DebouncedEvent::Error(err, path) => {
// TODO should we reload the file contents?
log::warn!("watch error {}, {:?}", err, path); log::warn!("watch error {}, {:?}", err, path);
None None
} }
@ -47,20 +77,20 @@ impl WatcherChange {
} }
impl Watcher { impl Watcher {
pub fn start() -> Result<Watcher, Box<std::error::Error>> { pub(crate) fn start(
output_sender: Sender<io::Task>,
) -> Result<Watcher, Box<std::error::Error>> {
let (input_sender, input_receiver) = mpsc::channel(); let (input_sender, input_receiver) = mpsc::channel();
let watcher = notify::watcher(input_sender, Duration::from_millis(250))?; let watcher = notify::watcher(input_sender, Duration::from_millis(250))?;
let (output_sender, output_receiver) = crossbeam_channel::unbounded();
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
input_receiver input_receiver
.into_iter() .into_iter()
// forward relevant events only // forward relevant events only
.filter_map(WatcherChange::from_debounced_event) .filter_map(WatcherChange::try_from_debounced_event)
.try_for_each(|change| output_sender.send(change)) .try_for_each(|change| output_sender.send(io::Task::WatcherChange(change)))
.unwrap() .unwrap()
}); });
Ok(Watcher { Ok(Watcher {
receiver: output_receiver,
watcher, watcher,
thread, thread,
bomb: DropBomb::new(format!("Watcher was not shutdown")), bomb: DropBomb::new(format!("Watcher was not shutdown")),
@ -72,10 +102,6 @@ impl Watcher {
Ok(()) Ok(())
} }
pub fn change_receiver(&self) -> &Receiver<WatcherChange> {
&self.receiver
}
pub fn shutdown(mut self) -> thread::Result<()> { pub fn shutdown(mut self) -> thread::Result<()> {
self.bomb.defuse(); self.bomb.defuse();
drop(self.watcher); drop(self.watcher);

View file

@ -59,15 +59,15 @@ fn test_vfs_works() -> std::io::Result<()> {
// on disk change // on disk change
fs::write(&dir.path().join("a/b/baz.rs"), "quux").unwrap(); fs::write(&dir.path().join("a/b/baz.rs"), "quux").unwrap();
let change = vfs.change_receiver().recv().unwrap(); let task = vfs.task_receiver().recv().unwrap();
vfs.handle_change(change); vfs.handle_task(task);
match vfs.commit_changes().as_slice() { match vfs.commit_changes().as_slice() {
[VfsChange::ChangeFile { text, .. }] => assert_eq!(text.as_str(), "quux"), [VfsChange::ChangeFile { text, .. }] => assert_eq!(text.as_str(), "quux"),
_ => panic!("unexpected changes"), _ => panic!("unexpected changes"),
} }
// in memory change // in memory change
vfs.change_file_overlay(&dir.path().join("a/b/baz.rs"), Some("m".to_string())); vfs.change_file_overlay(&dir.path().join("a/b/baz.rs"), "m".to_string());
match vfs.commit_changes().as_slice() { match vfs.commit_changes().as_slice() {
[VfsChange::ChangeFile { text, .. }] => assert_eq!(text.as_str(), "m"), [VfsChange::ChangeFile { text, .. }] => assert_eq!(text.as_str(), "m"),
_ => panic!("unexpected changes"), _ => panic!("unexpected changes"),
@ -81,7 +81,7 @@ fn test_vfs_works() -> std::io::Result<()> {
} }
// in memory add // in memory add
vfs.add_file_overlay(&dir.path().join("a/b/spam.rs"), Some("spam".to_string())); vfs.add_file_overlay(&dir.path().join("a/b/spam.rs"), "spam".to_string());
match vfs.commit_changes().as_slice() { match vfs.commit_changes().as_slice() {
[VfsChange::AddFile { text, path, .. }] => { [VfsChange::AddFile { text, path, .. }] => {
assert_eq!(text.as_str(), "spam"); assert_eq!(text.as_str(), "spam");
@ -99,8 +99,8 @@ fn test_vfs_works() -> std::io::Result<()> {
// on disk add // on disk add
fs::write(&dir.path().join("a/new.rs"), "new hello").unwrap(); fs::write(&dir.path().join("a/new.rs"), "new hello").unwrap();
let change = vfs.change_receiver().recv().unwrap(); let task = vfs.task_receiver().recv().unwrap();
vfs.handle_change(change); vfs.handle_task(task);
match vfs.commit_changes().as_slice() { match vfs.commit_changes().as_slice() {
[VfsChange::AddFile { text, path, .. }] => { [VfsChange::AddFile { text, path, .. }] => {
assert_eq!(text.as_str(), "new hello"); assert_eq!(text.as_str(), "new hello");
@ -111,8 +111,8 @@ fn test_vfs_works() -> std::io::Result<()> {
// on disk rename // on disk rename
fs::rename(&dir.path().join("a/new.rs"), &dir.path().join("a/new1.rs")).unwrap(); fs::rename(&dir.path().join("a/new.rs"), &dir.path().join("a/new1.rs")).unwrap();
let change = vfs.change_receiver().recv().unwrap(); let task = vfs.task_receiver().recv().unwrap();
vfs.handle_change(change); vfs.handle_task(task);
match vfs.commit_changes().as_slice() { match vfs.commit_changes().as_slice() {
[VfsChange::RemoveFile { [VfsChange::RemoveFile {
path: removed_path, .. path: removed_path, ..
@ -130,14 +130,14 @@ fn test_vfs_works() -> std::io::Result<()> {
// on disk remove // on disk remove
fs::remove_file(&dir.path().join("a/new1.rs")).unwrap(); fs::remove_file(&dir.path().join("a/new1.rs")).unwrap();
let change = vfs.change_receiver().recv().unwrap(); let task = vfs.task_receiver().recv().unwrap();
vfs.handle_change(change); vfs.handle_task(task);
match vfs.commit_changes().as_slice() { match vfs.commit_changes().as_slice() {
[VfsChange::RemoveFile { path, .. }] => assert_eq!(path, "new1.rs"), [VfsChange::RemoveFile { path, .. }] => assert_eq!(path, "new1.rs"),
_ => panic!("unexpected changes"), _ => panic!("unexpected changes"),
} }
match vfs.change_receiver().try_recv() { match vfs.task_receiver().try_recv() {
Err(crossbeam_channel::TryRecvError::Empty) => (), Err(crossbeam_channel::TryRecvError::Empty) => (),
res => panic!("unexpected {:?}", res), res => panic!("unexpected {:?}", res),
} }