Refactor Worker and ThreadSafeState (#3242)

* Split ThreadSafeState into State and GlobalState. State is a "local"
  state belonging to "Worker" while "GlobalState" is state shared by
  whole program.
* Update "Worker" and ops to use "GlobalState" where applicable
* Move and refactor "WorkerChannels" resource
This commit is contained in:
Bartek Iwańczuk 2019-11-04 16:38:52 +01:00 committed by Ry Dahl
parent 429439d198
commit 0049d4e50c
16 changed files with 557 additions and 414 deletions

View file

@ -1,33 +1,57 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use crate::deno_error::bad_resource;
use crate::fmt_errors::JSError;
use crate::ops;
use crate::resources;
use crate::resources::CoreResource;
use crate::resources::ResourceId;
use crate::state::ThreadSafeState;
use deno;
use deno::Buf;
use deno::ErrBox;
use deno::ModuleSpecifier;
use deno::RecursiveLoad;
use deno::StartupData;
use futures::Async;
use futures::Future;
use futures::Poll;
use futures::Sink;
use futures::Stream;
use std::env;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::mpsc;
use url::Url;
/// Wraps mpsc channels into a generic resource so they can be referenced
/// from ops and used to facilitate parent-child communication
/// for workers.
pub struct WorkerChannels {
pub sender: mpsc::Sender<Buf>,
pub receiver: mpsc::Receiver<Buf>,
}
impl CoreResource for WorkerChannels {
fn inspect_repr(&self) -> &str {
"worker"
}
}
/// Wraps deno::Isolate to provide source maps, ops for the CLI, and
/// high-level module loading
/// high-level module loading.
#[derive(Clone)]
pub struct Worker {
pub name: String,
isolate: Arc<Mutex<deno::Isolate>>,
pub state: ThreadSafeState,
}
impl Worker {
pub fn new(
_name: String,
name: String,
startup_data: StartupData,
state: ThreadSafeState,
) -> Worker {
) -> Self {
let isolate = Arc::new(Mutex::new(deno::Isolate::new(startup_data, false)));
{
let mut i = isolate.lock().unwrap();
@ -61,12 +85,16 @@ impl Worker {
Box::new(load_stream)
});
let state_ = state.clone();
let global_state_ = state.global_state.clone();
i.set_js_error_create(move |v8_exception| {
JSError::from_v8_exception(v8_exception, &state_.ts_compiler)
JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
})
}
Self { isolate, state }
Self {
name,
isolate,
state,
}
}
/// Same as execute2() but the filename defaults to "$CWD/__anonymous__".
@ -106,7 +134,7 @@ impl Worker {
)
.get_future(isolate);
recursive_load.and_then(move |id| -> Result<(), ErrBox> {
worker.state.progress.done();
worker.state.global_state.progress.done();
if is_prefetch {
Ok(())
} else {
@ -115,6 +143,37 @@ impl Worker {
}
})
}
/// Post message to worker as a host or privileged overlord
pub fn post_message(self: &Self, buf: Buf) -> Result<Async<()>, ErrBox> {
Worker::post_message_to_resource(self.state.rid, buf)
}
pub fn post_message_to_resource(
rid: resources::ResourceId,
buf: Buf,
) -> Result<Async<()>, ErrBox> {
debug!("post message to resource {}", rid);
let mut table = resources::lock_resource_table();
let worker = table
.get_mut::<WorkerChannels>(rid)
.ok_or_else(bad_resource)?;
let sender = &mut worker.sender;
sender
.send(buf)
.poll()
.map(|_| Async::Ready(()))
.map_err(ErrBox::from)
}
pub fn get_message(self: &Self) -> WorkerReceiver {
Worker::get_message_from_resource(self.state.rid)
}
pub fn get_message_from_resource(rid: ResourceId) -> WorkerReceiver {
debug!("get message from resource {}", rid);
WorkerReceiver { rid }
}
}
impl Future for Worker {
@ -127,16 +186,39 @@ impl Future for Worker {
}
}
/// This structure wraps worker's resource id to implement future
/// that will return message received from worker or None
/// if worker's channel has been closed.
pub struct WorkerReceiver {
rid: ResourceId,
}
impl Future for WorkerReceiver {
type Item = Option<Buf>;
type Error = ErrBox;
fn poll(&mut self) -> Poll<Option<Buf>, ErrBox> {
let mut table = resources::lock_resource_table();
let worker = table
.get_mut::<WorkerChannels>(self.rid)
.ok_or_else(bad_resource)?;
let receiver = &mut worker.receiver;
receiver.poll().map_err(ErrBox::from)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::flags;
use crate::flags::DenoFlags;
use crate::global_state::ThreadSafeGlobalState;
use crate::progress::Progress;
use crate::resources;
use crate::startup_data;
use crate::state::ThreadSafeState;
use crate::tokio_util;
use futures::future::lazy;
use futures::IntoFuture;
use std::sync::atomic::Ordering;
#[test]
@ -149,13 +231,15 @@ mod tests {
let module_specifier =
ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap();
let argv = vec![String::from("./deno"), module_specifier.to_string()];
let state = ThreadSafeState::new(
let global_state = ThreadSafeGlobalState::new(
flags::DenoFlags::default(),
argv,
Progress::new(),
true,
)
.unwrap();
let state =
ThreadSafeState::new(global_state, Some(module_specifier.clone()), true)
.unwrap();
let state_ = state.clone();
tokio_util::run(lazy(move || {
let mut worker =
@ -186,13 +270,12 @@ mod tests {
let module_specifier =
ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap();
let argv = vec![String::from("deno"), module_specifier.to_string()];
let state = ThreadSafeState::new(
flags::DenoFlags::default(),
argv,
Progress::new(),
true,
)
.unwrap();
let global_state =
ThreadSafeGlobalState::new(DenoFlags::default(), argv, Progress::new())
.unwrap();
let state =
ThreadSafeState::new(global_state, Some(module_specifier.clone()), true)
.unwrap();
let state_ = state.clone();
tokio_util::run(lazy(move || {
let mut worker =
@ -227,8 +310,15 @@ mod tests {
let argv = vec![String::from("deno"), module_specifier.to_string()];
let mut flags = flags::DenoFlags::default();
flags.reload = true;
let state =
ThreadSafeState::new(flags, argv, Progress::new(), true).unwrap();
let global_state =
ThreadSafeGlobalState::new(flags, argv, Progress::new()).unwrap();
let state = ThreadSafeState::new(
global_state.clone(),
Some(module_specifier.clone()),
true,
)
.unwrap();
let global_state_ = global_state.clone();
let state_ = state.clone();
tokio_util::run(lazy(move || {
let mut worker = Worker::new(
@ -247,10 +337,12 @@ mod tests {
})
}));
let metrics = &state_.metrics;
assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 3);
assert_eq!(state_.metrics.resolve_count.load(Ordering::SeqCst), 3);
// Check that we've only invoked the compiler once.
assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 1);
assert_eq!(
global_state_.metrics.compiler_starts.load(Ordering::SeqCst),
1
);
drop(http_server_guard);
}
@ -285,8 +377,9 @@ mod tests {
"#;
worker.execute(source).unwrap();
let resource = worker.state.resource.clone();
let resource_ = resource.clone();
let worker_ = worker.clone();
let rid = worker.state.rid;
let resource_ = resources::Resource { rid };
tokio::spawn(lazy(move || {
worker.then(move |r| -> Result<(), ()> {
@ -298,14 +391,10 @@ mod tests {
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = resources::post_message_to_worker(resource.rid, msg)
.expect("Bad resource")
.wait();
let r = worker_.post_message(msg).into_future().wait();
assert!(r.is_ok());
let maybe_msg = resources::get_message_from_worker(resource.rid)
.wait()
.unwrap();
let maybe_msg = worker_.get_message().wait().unwrap();
assert!(maybe_msg.is_some());
// Check if message received is [1, 2, 3] in json
assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
@ -314,9 +403,7 @@ mod tests {
.to_string()
.into_boxed_str()
.into_boxed_bytes();
let r = resources::post_message_to_worker(resource.rid, msg)
.expect("Bad resource")
.wait();
let r = worker_.post_message(msg).into_future().wait();
assert!(r.is_ok());
})
}
@ -329,8 +416,9 @@ mod tests {
.execute("onmessage = () => { delete window.onmessage; }")
.unwrap();
let resource = worker.state.resource.clone();
let rid = resource.rid;
let rid = worker.state.rid;
let resource = resources::Resource { rid };
let worker_ = worker.clone();
let worker_future = worker
.then(move |r| -> Result<(), ()> {
@ -345,9 +433,7 @@ mod tests {
tokio::spawn(lazy(move || worker_future_.then(|_| Ok(()))));
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = resources::post_message_to_worker(rid, msg)
.expect("Bad resource")
.wait();
let r = worker_.post_message(msg).into_future().wait();
assert!(r.is_ok());
debug!("rid {:?}", rid);