add sync requests

This commit is contained in:
Aleksey Kladov 2019-05-31 20:50:16 +03:00
parent 9697d8afad
commit c6537c3280
2 changed files with 56 additions and 43 deletions

View file

@ -2,7 +2,7 @@ mod handlers;
mod subscriptions; mod subscriptions;
pub(crate) mod pending_requests; pub(crate) mod pending_requests;
use std::{fmt, path::PathBuf, sync::Arc, time::Instant, any::TypeId}; use std::{fmt, path::PathBuf, sync::Arc, time::Instant};
use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender}; use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender};
use failure::{bail, format_err}; use failure::{bail, format_err};
@ -219,22 +219,15 @@ fn main_loop_inner(
Some(req) => req, Some(req) => req,
None => return Ok(()), None => return Ok(()),
}; };
match req.cast::<req::CollectGarbage>() { on_request(
Ok((id, ())) => { state,
state.collect_garbage(); pending_requests,
let resp = RawResponse::ok::<req::CollectGarbage>(id, &()); pool,
msg_sender.send(resp.into()).unwrap() &task_sender,
} msg_sender,
Err(req) => on_request( loop_start,
state, req,
pending_requests, )?
pool,
&task_sender,
msg_sender,
loop_start,
req,
)?,
}
} }
RawMessage::Notification(not) => { RawMessage::Notification(not) => {
on_notification(msg_sender, state, pending_requests, subs, not)?; on_notification(msg_sender, state, pending_requests, subs, not)?;
@ -322,6 +315,7 @@ fn on_request(
request_received, request_received,
}; };
pool_dispatcher pool_dispatcher
.on_sync::<req::CollectGarbage>(|state, ()| Ok(state.collect_garbage()))?
.on::<req::AnalyzerStatus>(handlers::handle_analyzer_status)? .on::<req::AnalyzerStatus>(handlers::handle_analyzer_status)?
.on::<req::SyntaxTree>(handlers::handle_syntax_tree)? .on::<req::SyntaxTree>(handlers::handle_syntax_tree)?
.on::<req::ExtendSelection>(handlers::handle_extend_selection)? .on::<req::ExtendSelection>(handlers::handle_extend_selection)?
@ -434,37 +428,39 @@ struct PoolDispatcher<'a> {
} }
impl<'a> PoolDispatcher<'a> { impl<'a> PoolDispatcher<'a> {
fn on_sync<R>(
&mut self,
f: fn(&mut ServerWorldState, R::Params) -> Result<R::Result>,
) -> Result<&mut Self>
where
R: req::Request + 'static,
R::Params: DeserializeOwned + Send + 'static,
R::Result: Serialize + 'static,
{
let (id, params) = match self.parse::<R>() {
Some(it) => it,
None => {
return Ok(self);
}
};
let result = f(self.world, params);
let task = result_to_task::<R>(id, result);
on_task(task, self.msg_sender, self.pending_requests, self.world);
Ok(self)
}
fn on<R>(&mut self, f: fn(ServerWorld, R::Params) -> Result<R::Result>) -> Result<&mut Self> fn on<R>(&mut self, f: fn(ServerWorld, R::Params) -> Result<R::Result>) -> Result<&mut Self>
where where
R: req::Request + 'static, R: req::Request + 'static,
R::Params: DeserializeOwned + Send + 'static, R::Params: DeserializeOwned + Send + 'static,
R::Result: Serialize + 'static, R::Result: Serialize + 'static,
{ {
let req = match self.req.take() { let (id, params) = match self.parse::<R>() {
None => return Ok(self), Some(it) => it,
Some(req) => req, None => {
};
let (id, params) = match req.cast::<R>() {
Ok(it) => it,
Err(req) => {
self.req = Some(req);
return Ok(self); return Ok(self);
} }
}; };
self.pending_requests.start(PendingRequest {
id,
method: R::METHOD.to_string(),
received: self.request_received,
});
// Real time requests block user typing, so we should react quickly to them.
// Currently this means that we try to cancel background jobs if we don't have
// a spare thread.
let is_real_time = TypeId::of::<R>() == TypeId::of::<req::JoinLines>()
|| TypeId::of::<R>() == TypeId::of::<req::OnEnter>();
if self.pool.queued_count() > 0 && is_real_time {
self.world.cancel_requests();
}
self.pool.execute({ self.pool.execute({
let world = self.world.snapshot(); let world = self.world.snapshot();
@ -479,6 +475,27 @@ impl<'a> PoolDispatcher<'a> {
Ok(self) Ok(self)
} }
fn parse<R>(&mut self) -> Option<(u64, R::Params)>
where
R: req::Request + 'static,
R::Params: DeserializeOwned + Send + 'static,
{
let req = self.req.take()?;
let (id, params) = match req.cast::<R>() {
Ok(it) => it,
Err(req) => {
self.req = Some(req);
return None;
}
};
self.pending_requests.start(PendingRequest {
id,
method: R::METHOD.to_string(),
received: self.request_received,
});
Some((id, params))
}
fn finish(&mut self) { fn finish(&mut self) {
match self.req.take() { match self.req.take() {
None => (), None => (),

View file

@ -136,10 +136,6 @@ impl ServerWorldState {
self.analysis_host.apply_change(change); self.analysis_host.apply_change(change);
} }
pub fn cancel_requests(&mut self) {
self.analysis_host.apply_change(AnalysisChange::new());
}
pub fn snapshot(&self) -> ServerWorld { pub fn snapshot(&self) -> ServerWorld {
ServerWorld { ServerWorld {
workspaces: Arc::clone(&self.workspaces), workspaces: Arc::clone(&self.workspaces),