swtich lsp server to vfs

This commit is contained in:
Aleksey Kladov 2018-12-19 15:04:15 +03:00
parent 6a755ed83a
commit a5ef8ad05b
14 changed files with 235 additions and 399 deletions

View file

@ -1,7 +1,10 @@
mod handlers;
mod subscriptions;
use std::path::PathBuf;
use std::{
path::PathBuf,
sync::Arc,
};
use crossbeam_channel::{unbounded, select, Receiver, Sender};
use gen_lsp_server::{
@ -9,8 +12,8 @@ use gen_lsp_server::{
};
use languageserver_types::NumberOrString;
use ra_analysis::{Canceled, FileId, LibraryData};
use ra_vfs::{VfsTask};
use rayon;
use thread_worker::Worker;
use threadpool::ThreadPool;
use rustc_hash::FxHashSet;
use serde::{de::DeserializeOwned, Serialize};
@ -19,10 +22,9 @@ use failure_derive::Fail;
use crate::{
main_loop::subscriptions::Subscriptions,
project_model::{workspace_loader, CargoWorkspace},
project_model::{workspace_loader},
req,
server_world::{ServerWorld, ServerWorldState},
vfs::{self, FileEvent},
Result,
};
@ -50,32 +52,42 @@ enum Task {
pub fn main_loop(
internal_mode: bool,
root: PathBuf,
ws_root: PathBuf,
publish_decorations: bool,
msg_receiver: &Receiver<RawMessage>,
msg_sender: &Sender<RawMessage>,
) -> Result<()> {
let pool = ThreadPool::new(8);
let (task_sender, task_receiver) = unbounded::<Task>();
let (fs_worker, fs_watcher) = vfs::roots_loader();
let (ws_worker, ws_watcher) = workspace_loader();
ws_worker.send(ws_root.clone());
// FIXME: support dynamic workspace loading.
let workspaces = match ws_worker.recv().unwrap() {
Ok(ws) => vec![ws],
Err(e) => {
log::warn!("loading workspace failed: {}", e);
Vec::new()
}
};
ws_worker.shutdown();
ws_watcher
.shutdown()
.map_err(|_| format_err!("ws watcher died"))?;
let mut state = ServerWorldState::new(ws_root.clone(), workspaces);
log::info!("server initialized, serving requests");
let mut state = ServerWorldState::default();
let mut pending_requests = FxHashSet::default();
let mut subs = Subscriptions::new();
let main_res = main_loop_inner(
internal_mode,
publish_decorations,
root,
&pool,
msg_sender,
msg_receiver,
task_sender,
task_receiver.clone(),
fs_worker,
ws_worker,
&mut state,
&mut pending_requests,
&mut subs,
@ -88,12 +100,11 @@ pub fn main_loop(
drop(pool);
log::info!("...threadpool has finished");
let fs_res = fs_watcher.shutdown();
let ws_res = ws_watcher.shutdown();
let vfs = Arc::try_unwrap(state.vfs).expect("all snapshots should be dead");
let vfs_res = vfs.into_inner().shutdown();
main_res?;
fs_res.map_err(|_| format_err!("fs watcher died"))?;
ws_res.map_err(|_| format_err!("ws watcher died"))?;
vfs_res.map_err(|_| format_err!("fs watcher died"))?;
Ok(())
}
@ -101,28 +112,22 @@ pub fn main_loop(
fn main_loop_inner(
internal_mode: bool,
publish_decorations: bool,
ws_root: PathBuf,
pool: &ThreadPool,
msg_sender: &Sender<RawMessage>,
msg_receiver: &Receiver<RawMessage>,
task_sender: Sender<Task>,
task_receiver: Receiver<Task>,
fs_worker: Worker<PathBuf, (PathBuf, Vec<FileEvent>)>,
ws_worker: Worker<PathBuf, Result<CargoWorkspace>>,
state: &mut ServerWorldState,
pending_requests: &mut FxHashSet<u64>,
subs: &mut Subscriptions,
) -> Result<()> {
let (libdata_sender, libdata_receiver) = unbounded();
ws_worker.send(ws_root.clone());
fs_worker.send(ws_root.clone());
loop {
#[derive(Debug)]
enum Event {
Msg(RawMessage),
Task(Task),
Fs(PathBuf, Vec<FileEvent>),
Ws(Result<CargoWorkspace>),
Vfs(VfsTask),
Lib(LibraryData),
}
log::trace!("selecting");
@ -132,77 +137,19 @@ fn main_loop_inner(
None => bail!("client exited without shutdown"),
},
recv(task_receiver, task) => Event::Task(task.unwrap()),
recv(fs_worker.out, events) => match events {
None => bail!("roots watcher died"),
Some((pb, events)) => Event::Fs(pb, events),
}
recv(ws_worker.out, ws) => match ws {
None => bail!("workspace watcher died"),
Some(ws) => Event::Ws(ws),
recv(state.vfs.read().task_receiver(), task) => match task {
None => bail!("vfs died"),
Some(task) => Event::Vfs(task),
}
recv(libdata_receiver, data) => Event::Lib(data.unwrap())
};
let mut state_changed = false;
match event {
Event::Task(task) => on_task(task, msg_sender, pending_requests),
Event::Fs(root, events) => {
log::info!("fs change, {}, {} events", root.display(), events.len());
if root == ws_root {
state.apply_fs_changes(events);
} else {
let (files, resolver) = state.events_to_files(events);
let sender = libdata_sender.clone();
pool.execute(move || {
let start = ::std::time::Instant::now();
log::info!("indexing {} ... ", root.display());
let data = LibraryData::prepare(files, resolver);
log::info!("indexed {:?} {}", start.elapsed(), root.display());
sender.send(data);
});
}
Event::Vfs(task) => {
state.vfs.write().handle_task(task);
state_changed = true;
}
Event::Ws(ws) => match ws {
Ok(ws) => {
let workspaces = vec![ws];
feedback(internal_mode, "workspace loaded", msg_sender);
for ws in workspaces.iter() {
// Add each library as constant input. If library is
// within the workspace, don't treat it as a library.
//
// HACK: If source roots are nested, pick the outer one.
let mut roots = ws
.packages()
.filter(|pkg| !pkg.is_member(ws))
.filter_map(|pkg| {
let root = pkg.root(ws).to_path_buf();
if root.starts_with(&ws_root) {
None
} else {
Some(root)
}
})
.collect::<Vec<_>>();
roots.sort_by_key(|it| it.as_os_str().len());
let unique = roots
.iter()
.enumerate()
.filter(|&(idx, long)| {
!roots[..idx].iter().any(|short| long.starts_with(short))
})
.map(|(_idx, root)| root);
for root in unique {
log::debug!("sending root, {}", root.display());
fs_worker.send(root.to_owned());
}
}
state.set_workspaces(workspaces);
state_changed = true;
}
Err(e) => log::warn!("loading workspace failed: {}", e),
},
Event::Lib(lib) => {
feedback(internal_mode, "library loaded", msg_sender);
state.add_lib(lib);
@ -234,6 +181,18 @@ fn main_loop_inner(
},
};
for lib in state.process_changes() {
let (root, files) = lib;
let sender = libdata_sender.clone();
pool.execute(move || {
let start = ::std::time::Instant::now();
log::info!("indexing {:?} ... ", root);
let data = LibraryData::prepare(root, files);
log::info!("indexed {:?} {:?}", start.elapsed(), root);
sender.send(data);
});
}
if state_changed {
update_file_notifications_on_threadpool(
pool,
@ -336,8 +295,13 @@ fn on_notification(
let path = uri
.to_file_path()
.map_err(|()| format_err!("invalid uri: {}", uri))?;
let file_id = state.add_mem_file(path, params.text_document.text);
subs.add_sub(file_id);
if let Some(file_id) = state
.vfs
.write()
.add_file_overlay(&path, params.text_document.text)
{
subs.add_sub(FileId(file_id.0));
}
return Ok(());
}
Err(not) => not,
@ -353,7 +317,7 @@ fn on_notification(
.pop()
.ok_or_else(|| format_err!("empty changes"))?
.text;
state.change_mem_file(path.as_path(), text)?;
state.vfs.write().change_file_overlay(path.as_path(), text);
return Ok(());
}
Err(not) => not,
@ -364,8 +328,9 @@ fn on_notification(
let path = uri
.to_file_path()
.map_err(|()| format_err!("invalid uri: {}", uri))?;
let file_id = state.remove_mem_file(path.as_path())?;
subs.remove_sub(file_id);
if let Some(file_id) = state.vfs.write().remove_file_overlay(path.as_path()) {
subs.remove_sub(FileId(file_id.0));
}
let params = req::PublishDiagnosticsParams {
uri,
diagnostics: Vec::new(),