Deal with deadlocks in a more principaled way

This commit is contained in:
Aleksey Kladov 2018-09-08 12:36:02 +03:00
parent d9ccebd913
commit 326ffcefe0
6 changed files with 39 additions and 27 deletions

View file

@ -30,9 +30,8 @@ mod vfs;
mod path_map; mod path_map;
mod server_world; mod server_world;
mod project_model; mod project_model;
mod thread_watcher; pub mod thread_watcher;
pub type Result<T> = ::std::result::Result<T, ::failure::Error>; pub type Result<T> = ::std::result::Result<T, ::failure::Error>;
pub use caps::server_capabilities; pub use caps::server_capabilities;
pub use main_loop::main_loop; pub use main_loop::main_loop;

View file

@ -43,8 +43,8 @@ pub fn main_loop(
.build() .build()
.unwrap(); .unwrap();
let (task_sender, task_receiver) = unbounded::<Task>(); let (task_sender, task_receiver) = unbounded::<Task>();
let (fs_sender, fs_receiver, fs_watcher) = vfs::roots_loader(); let ((fs_sender, fs_receiver), fs_watcher) = vfs::roots_loader();
let (ws_sender, ws_receiver, ws_watcher) = workspace_loader(); let ((ws_sender, ws_receiver), ws_watcher) = workspace_loader();
info!("server initialized, serving requests"); info!("server initialized, serving requests");
let mut state = ServerWorldState::new(); let mut state = ServerWorldState::new();

View file

@ -3,12 +3,12 @@ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
}; };
use cargo_metadata::{metadata_run, CargoOpt}; use cargo_metadata::{metadata_run, CargoOpt};
use crossbeam_channel::{bounded, Sender, Receiver}; use crossbeam_channel::{Sender, Receiver};
use libsyntax2::SmolStr; use libsyntax2::SmolStr;
use { use {
Result, Result,
thread_watcher::ThreadWatcher, thread_watcher::{ThreadWatcher, worker_chan},
}; };
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -162,15 +162,14 @@ impl TargetKind {
} }
} }
pub fn workspace_loader() -> (Sender<PathBuf>, Receiver<Result<CargoWorkspace>>, ThreadWatcher) { pub fn workspace_loader() -> ((Sender<PathBuf>, Receiver<Result<CargoWorkspace>>), ThreadWatcher) {
let (path_sender, path_receiver) = bounded::<PathBuf>(16); let (interface, input_receiver, output_sender) = worker_chan::<PathBuf, Result<CargoWorkspace>>(1);
let (ws_sender, ws_receiver) = bounded::<Result<CargoWorkspace>>(1);
let thread = ThreadWatcher::spawn("workspace loader", move || { let thread = ThreadWatcher::spawn("workspace loader", move || {
path_receiver input_receiver
.into_iter() .into_iter()
.map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path()))
.for_each(|it| ws_sender.send(it)) .for_each(|it| output_sender.send(it))
}); });
(path_sender, ws_receiver, thread) (interface, thread)
} }

View file

@ -1,4 +1,5 @@
use std::thread; use std::thread;
use crossbeam_channel::{bounded, unbounded, Sender, Receiver};
use drop_bomb::DropBomb; use drop_bomb::DropBomb;
use Result; use Result;
@ -31,3 +32,12 @@ impl ThreadWatcher {
res res
} }
} }
/// Sets up worker channels in a deadlock-avoind way.
/// If one sets both input and output buffers to a fixed size,
/// a worker might get stuck.
pub fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) {
let (input_sender, input_receiver) = bounded::<I>(buf);
let (output_sender, output_receiver) = unbounded::<O>();
((input_sender, output_receiver), input_receiver, output_sender)
}

View file

@ -3,11 +3,11 @@ use std::{
fs, fs,
}; };
use crossbeam_channel::{Sender, Receiver, unbounded}; use crossbeam_channel::{Sender, Receiver};
use walkdir::WalkDir; use walkdir::WalkDir;
use { use {
thread_watcher::ThreadWatcher, thread_watcher::{ThreadWatcher, worker_chan},
}; };
@ -22,11 +22,11 @@ pub enum FileEventKind {
Add(String), Add(String),
} }
pub fn roots_loader() -> (Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>, ThreadWatcher) { pub fn roots_loader() -> ((Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>), ThreadWatcher) {
let (path_sender, path_receiver) = unbounded::<PathBuf>(); let (interface, input_receiver, output_sender) =
let (event_sender, event_receiver) = unbounded::<(PathBuf, Vec<FileEvent>)>(); worker_chan::<PathBuf, (PathBuf, Vec<FileEvent>)>(128);
let thread = ThreadWatcher::spawn("roots loader", move || { let thread = ThreadWatcher::spawn("roots loader", move || {
path_receiver input_receiver
.into_iter() .into_iter()
.map(|path| { .map(|path| {
debug!("loading {} ...", path.as_path().display()); debug!("loading {} ...", path.as_path().display());
@ -34,10 +34,10 @@ pub fn roots_loader() -> (Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>,
debug!("... loaded {}", path.as_path().display()); debug!("... loaded {}", path.as_path().display());
(path, events) (path, events)
}) })
.for_each(|it| event_sender.send(it)) .for_each(|it| output_sender.send(it))
}); });
(path_sender, event_receiver, thread) (interface, thread)
} }
fn load_root(path: &Path) -> Vec<FileEvent> { fn load_root(path: &Path) -> Vec<FileEvent> {

View file

@ -8,7 +8,7 @@ use std::{
}; };
use tempdir::TempDir; use tempdir::TempDir;
use crossbeam_channel::{unbounded, after, Sender, Receiver}; use crossbeam_channel::{after, Sender, Receiver};
use flexi_logger::Logger; use flexi_logger::Logger;
use languageserver_types::{ use languageserver_types::{
Url, Url,
@ -22,7 +22,7 @@ use serde::Serialize;
use serde_json::{Value, from_str, to_string_pretty}; use serde_json::{Value, from_str, to_string_pretty};
use gen_lsp_server::{RawMessage, RawRequest, RawNotification}; use gen_lsp_server::{RawMessage, RawRequest, RawNotification};
use m::{Result, main_loop, req}; use m::{Result, main_loop, req, thread_watcher::worker_chan};
pub fn project(fixture: &str) -> Server { pub fn project(fixture: &str) -> Server {
static INIT: Once = Once::new(); static INIT: Once = Once::new();
@ -69,15 +69,19 @@ pub struct Server {
impl Server { impl Server {
fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server {
let path = dir.path().to_path_buf(); let path = dir.path().to_path_buf();
let (client_sender, mut server_receiver) = unbounded(); let ((msg_sender, msg_receiver), server) = {
let (mut server_sender, client_receiver) = unbounded(); let (api, mut msg_receiver, mut msg_sender) = worker_chan::<RawMessage, RawMessage>(128);
let server = thread::spawn(move || main_loop(true, path, &mut server_receiver, &mut server_sender)); let server = thread::spawn(move || {
main_loop(true, path, &mut msg_receiver, &mut msg_sender)
});
(api, server)
};
let res = Server { let res = Server {
req_id: Cell::new(1), req_id: Cell::new(1),
dir, dir,
messages: Default::default(), messages: Default::default(),
sender: Some(client_sender), sender: Some(msg_sender),
receiver: client_receiver, receiver: msg_receiver,
server: Some(server), server: Some(server),
}; };