refactor: remove run_worker_loop (#4028)

* remove run_worker_loop, impl poll for WebWorker
* store JoinHandle to worker thread
This commit is contained in:
Bartek Iwańczuk 2020-02-18 14:47:11 -05:00 committed by GitHub
parent 08dcf6bff7
commit 3d5bed35e0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 117 additions and 126 deletions

View file

@ -1,11 +1,17 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::ops; use crate::ops;
use crate::state::State; use crate::state::State;
use crate::worker::Worker; use crate::web_worker::WebWorker;
use core::task::Context;
use deno_core; use deno_core;
use deno_core::ErrBox;
use deno_core::StartupData; use deno_core::StartupData;
use futures::future::Future;
use futures::future::FutureExt;
use std::ops::Deref; use std::ops::Deref;
use std::ops::DerefMut; use std::ops::DerefMut;
use std::pin::Pin;
use std::task::Poll;
/// This worker is used to host TypeScript and WASM compilers. /// This worker is used to host TypeScript and WASM compilers.
/// ///
@ -20,22 +26,15 @@ use std::ops::DerefMut;
/// ///
/// TODO(bartlomieju): add support to reuse the worker - or in other /// TODO(bartlomieju): add support to reuse the worker - or in other
/// words support stateful TS compiler /// words support stateful TS compiler
pub struct CompilerWorker(Worker); pub struct CompilerWorker(WebWorker);
impl CompilerWorker { impl CompilerWorker {
pub fn new(name: String, startup_data: StartupData, state: State) -> Self { pub fn new(name: String, startup_data: StartupData, state: State) -> Self {
let state_ = state.clone(); let state_ = state.clone();
let mut worker = Worker::new(name, startup_data, state_); let mut worker = WebWorker::new(name, startup_data, state_);
{ {
let isolate = &mut worker.isolate; let isolate = &mut worker.isolate;
ops::runtime::init(isolate, &state);
ops::compiler::init(isolate, &state); ops::compiler::init(isolate, &state);
ops::web_worker::init(isolate, &state, &worker.internal_channels.sender);
ops::errors::init(isolate, &state);
// for compatibility with Worker scope, though unused at
// the moment
ops::timers::init(isolate, &state);
ops::fetch::init(isolate, &state);
// TODO(bartlomieju): CompilerWorker should not // TODO(bartlomieju): CompilerWorker should not
// depend on those ops // depend on those ops
ops::os::init(isolate, &state); ops::os::init(isolate, &state);
@ -48,7 +47,7 @@ impl CompilerWorker {
} }
impl Deref for CompilerWorker { impl Deref for CompilerWorker {
type Target = Worker; type Target = WebWorker;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
&self.0 &self.0
} }
@ -59,3 +58,12 @@ impl DerefMut for CompilerWorker {
&mut self.0 &mut self.0
} }
} }
impl Future for CompilerWorker {
type Output = Result<(), ErrBox>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
inner.0.poll_unpin(cx)
}
}

View file

@ -9,12 +9,11 @@ use crate::file_fetcher::SourceFile;
use crate::file_fetcher::SourceFileFetcher; use crate::file_fetcher::SourceFileFetcher;
use crate::global_state::GlobalState; use crate::global_state::GlobalState;
use crate::msg; use crate::msg;
use crate::ops::worker_host::run_worker_loop;
use crate::ops::JsonResult; use crate::ops::JsonResult;
use crate::source_maps::SourceMapGetter; use crate::source_maps::SourceMapGetter;
use crate::startup_data; use crate::startup_data;
use crate::state::*; use crate::state::*;
use crate::tokio_util::create_basic_runtime; use crate::tokio_util;
use crate::version; use crate::version;
use crate::worker::WorkerEvent; use crate::worker::WorkerEvent;
use crate::worker::WorkerHandle; use crate::worker::WorkerHandle;
@ -611,11 +610,10 @@ async fn execute_in_thread(
let builder = let builder =
std::thread::Builder::new().name("deno-ts-compiler".to_string()); std::thread::Builder::new().name("deno-ts-compiler".to_string());
let join_handle = builder.spawn(move || { let join_handle = builder.spawn(move || {
let mut worker = TsCompiler::setup_worker(global_state.clone()); let worker = TsCompiler::setup_worker(global_state.clone());
handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
drop(handle_sender); drop(handle_sender);
let mut rt = create_basic_runtime(); tokio_util::run_basic(worker).expect("Panic in event loop");
run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop");
})?; })?;
let mut handle = handle_receiver.recv().unwrap()?; let mut handle = handle_receiver.recv().unwrap()?;
handle.post_message(req).await?; handle.post_message(req).await?;

View file

@ -3,10 +3,9 @@ use super::compiler_worker::CompilerWorker;
use crate::compilers::CompiledModule; use crate::compilers::CompiledModule;
use crate::file_fetcher::SourceFile; use crate::file_fetcher::SourceFile;
use crate::global_state::GlobalState; use crate::global_state::GlobalState;
use crate::ops::worker_host::run_worker_loop;
use crate::startup_data; use crate::startup_data;
use crate::state::*; use crate::state::*;
use crate::tokio_util::create_basic_runtime; use crate::tokio_util;
use crate::worker::WorkerEvent; use crate::worker::WorkerEvent;
use crate::worker::WorkerHandle; use crate::worker::WorkerHandle;
use deno_core::Buf; use deno_core::Buf;
@ -123,11 +122,10 @@ async fn execute_in_thread(
let builder = let builder =
std::thread::Builder::new().name("deno-wasm-compiler".to_string()); std::thread::Builder::new().name("deno-wasm-compiler".to_string());
let join_handle = builder.spawn(move || { let join_handle = builder.spawn(move || {
let mut worker = WasmCompiler::setup_worker(global_state); let worker = WasmCompiler::setup_worker(global_state);
handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
drop(handle_sender); drop(handle_sender);
let mut rt = create_basic_runtime(); tokio_util::run_basic(worker).expect("Panic in event loop");
run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop");
})?; })?;
let mut handle = handle_receiver.recv().unwrap()?; let mut handle = handle_receiver.recv().unwrap()?;
handle.post_message(req).await?; handle.post_message(req).await?;

View file

@ -12,18 +12,15 @@ use crate::startup_data;
use crate::state::State; use crate::state::State;
use crate::tokio_util::create_basic_runtime; use crate::tokio_util::create_basic_runtime;
use crate::web_worker::WebWorker; use crate::web_worker::WebWorker;
use crate::worker::Worker;
use crate::worker::WorkerEvent; use crate::worker::WorkerEvent;
use crate::worker::WorkerHandle; use crate::worker::WorkerHandle;
use deno_core::*; use deno_core::*;
use futures; use futures;
use futures::future::poll_fn;
use futures::future::FutureExt; use futures::future::FutureExt;
use futures::future::TryFutureExt; use futures::future::TryFutureExt;
use futures::stream::StreamExt;
use std; use std;
use std::convert::From; use std::convert::From;
use std::task::Poll; use std::thread::JoinHandle;
pub fn init(i: &mut Isolate, s: &State) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op( i.register_op(
@ -61,64 +58,6 @@ fn create_web_worker(
Ok(worker) Ok(worker)
} }
// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs`
pub fn run_worker_loop(
rt: &mut tokio::runtime::Runtime,
worker: &mut Worker,
) -> Result<(), ErrBox> {
let mut worker_is_ready = false;
let fut = poll_fn(|cx| -> Poll<Result<(), ErrBox>> {
if !worker_is_ready {
match worker.poll_unpin(cx) {
Poll::Ready(r) => {
if let Err(e) = r {
let mut sender = worker.internal_channels.sender.clone();
futures::executor::block_on(sender.send(WorkerEvent::Error(e)))
.expect("Failed to post message to host");
}
worker_is_ready = true;
}
Poll::Pending => {}
}
}
let maybe_msg = {
match worker.internal_channels.receiver.poll_next_unpin(cx) {
Poll::Ready(r) => match r {
Some(msg) => {
let msg_str = String::from_utf8(msg.to_vec()).unwrap();
debug!("received message from host: {}", msg_str);
Some(msg_str)
}
None => {
debug!("channel closed by host, worker event loop shuts down");
return Poll::Ready(Ok(()));
}
},
Poll::Pending => None,
}
};
if let Some(msg) = maybe_msg {
// TODO: just add second value and then bind using rusty_v8
// to get structured clone/transfer working
let script = format!("workerMessageRecvCallback({})", msg);
worker
.execute(&script)
.expect("Failed to execute message cb");
// Let worker be polled again
worker_is_ready = false;
worker.waker.wake();
}
Poll::Pending
});
rt.block_on(fut)
}
// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs`
// TODO(bartlomieju): check if order of actions is aligned to Worker spec // TODO(bartlomieju): check if order of actions is aligned to Worker spec
fn run_worker_thread( fn run_worker_thread(
name: String, name: String,
@ -127,14 +66,13 @@ fn run_worker_thread(
specifier: ModuleSpecifier, specifier: ModuleSpecifier,
has_source_code: bool, has_source_code: bool,
source_code: String, source_code: String,
) -> Result<WorkerHandle, ErrBox> { ) -> Result<(JoinHandle<()>, WorkerHandle), ErrBox> {
let (handle_sender, handle_receiver) = let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1); std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1);
let builder = let builder =
std::thread::Builder::new().name(format!("deno-worker-{}", name)); std::thread::Builder::new().name(format!("deno-worker-{}", name));
// TODO(bartlomieju): store JoinHandle as well let join_handle = builder.spawn(move || {
builder.spawn(move || {
// Any error inside this block is terminal: // Any error inside this block is terminal:
// - JS worker is useless - meaning it throws an exception and can't do anything else, // - JS worker is useless - meaning it throws an exception and can't do anything else,
// all action done upon it should be noops // all action done upon it should be noops
@ -189,10 +127,11 @@ fn run_worker_thread(
// TODO(bartlomieju): this thread should return result of event loop // TODO(bartlomieju): this thread should return result of event loop
// that means that we should store JoinHandle to thread to ensure // that means that we should store JoinHandle to thread to ensure
// that it actually terminates. // that it actually terminates.
run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop"); rt.block_on(worker).expect("Panic in event loop");
})?; })?;
handle_receiver.recv().unwrap() let worker_handle = handle_receiver.recv().unwrap()?;
Ok((join_handle, worker_handle))
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -230,7 +169,7 @@ fn op_create_worker(
format!("USER-WORKER-{}", specifier) format!("USER-WORKER-{}", specifier)
}); });
let worker_handle = run_worker_thread( let (join_handle, worker_handle) = run_worker_thread(
worker_name, worker_name,
global_state, global_state,
permissions, permissions,
@ -240,7 +179,12 @@ fn op_create_worker(
)?; )?;
// At this point all interactions with worker happen using thread // At this point all interactions with worker happen using thread
// safe handler returned from previous function call // safe handler returned from previous function call
let worker_id = parent_state.add_child_worker(worker_handle); let mut parent_state = parent_state.borrow_mut();
let worker_id = parent_state.next_worker_id;
parent_state.next_worker_id += 1;
parent_state
.workers
.insert(worker_id, (join_handle, worker_handle));
Ok(JsonOp::Sync(json!({ "id": worker_id }))) Ok(JsonOp::Sync(json!({ "id": worker_id })))
} }
@ -258,9 +202,10 @@ fn op_host_terminate_worker(
let args: WorkerArgs = serde_json::from_value(args)?; let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32; let id = args.id as u32;
let mut state = state.borrow_mut(); let mut state = state.borrow_mut();
let worker_handle = let (join_handle, worker_handle) =
state.workers.remove(&id).expect("No worker handle found"); state.workers.remove(&id).expect("No worker handle found");
worker_handle.terminate(); worker_handle.terminate();
join_handle.join().expect("Panic in worker thread");
Ok(JsonOp::Sync(json!({}))) Ok(JsonOp::Sync(json!({})))
} }
@ -299,22 +244,22 @@ fn op_host_get_message(
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let args: WorkerArgs = serde_json::from_value(args)?; let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32; let id = args.id as u32;
let state_ = state.borrow(); let worker_handle = {
let worker_handle = state_ let state_ = state.borrow();
.workers let (_join_handle, worker_handle) =
.get(&id) state_.workers.get(&id).expect("No worker handle found");
.expect("No worker handle found") worker_handle.clone()
.clone(); };
let state_ = state.clone(); let state_ = state.clone();
let op = async move { let op = async move {
let response = match worker_handle.get_event().await { let response = match worker_handle.get_event().await {
Some(event) => serialize_worker_event(event), Some(event) => serialize_worker_event(event),
None => { None => {
let mut state_ = state_.borrow_mut(); let mut state_ = state_.borrow_mut();
let mut handle = let (join_handle, mut worker_handle) =
state_.workers.remove(&id).expect("No worker handle found"); state_.workers.remove(&id).expect("No worker handle found");
handle.sender.close_channel(); worker_handle.sender.close_channel();
// TODO(bartlomieju): join thread handle here join_handle.join().expect("Worker thread panicked");
json!({ "type": "close" }) json!({ "type": "close" })
} }
}; };
@ -335,7 +280,8 @@ fn op_host_post_message(
debug!("post message to worker {}", id); debug!("post message to worker {}", id);
let state = state.borrow(); let state = state.borrow();
let worker_handle = state.workers.get(&id).expect("No worker handle found"); let (_, worker_handle) =
state.workers.get(&id).expect("No worker handle found");
let fut = worker_handle let fut = worker_handle
.post_message(msg) .post_message(msg)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string())); .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()));

View file

@ -30,8 +30,7 @@ use std::path::Path;
use std::pin::Pin; use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
use std::str; use std::str;
use std::sync::atomic::AtomicUsize; use std::thread::JoinHandle;
use std::sync::atomic::Ordering;
use std::time::Instant; use std::time::Instant;
#[derive(Clone)] #[derive(Clone)]
@ -54,8 +53,8 @@ pub struct StateInner {
pub import_map: Option<ImportMap>, pub import_map: Option<ImportMap>,
pub metrics: Metrics, pub metrics: Metrics,
pub global_timer: GlobalTimer, pub global_timer: GlobalTimer,
pub workers: HashMap<u32, WorkerHandle>, pub workers: HashMap<u32, (JoinHandle<()>, WorkerHandle)>,
pub next_worker_id: AtomicUsize, pub next_worker_id: u32,
pub start_time: Instant, pub start_time: Instant,
pub seeded_rng: Option<StdRng>, pub seeded_rng: Option<StdRng>,
pub resource_table: ResourceTable, pub resource_table: ResourceTable,
@ -231,7 +230,7 @@ impl State {
metrics: Metrics::default(), metrics: Metrics::default(),
global_timer: GlobalTimer::new(), global_timer: GlobalTimer::new(),
workers: HashMap::new(), workers: HashMap::new(),
next_worker_id: AtomicUsize::new(0), next_worker_id: 0,
start_time: Instant::now(), start_time: Instant::now(),
seeded_rng, seeded_rng,
@ -267,7 +266,7 @@ impl State {
metrics: Metrics::default(), metrics: Metrics::default(),
global_timer: GlobalTimer::new(), global_timer: GlobalTimer::new(),
workers: HashMap::new(), workers: HashMap::new(),
next_worker_id: AtomicUsize::new(0), next_worker_id: 0,
start_time: Instant::now(), start_time: Instant::now(),
seeded_rng, seeded_rng,
@ -278,14 +277,6 @@ impl State {
Ok(Self(state)) Ok(Self(state))
} }
pub fn add_child_worker(&self, handle: WorkerHandle) -> u32 {
let mut inner_state = self.borrow_mut();
let worker_id =
inner_state.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32;
inner_state.workers.insert(worker_id, handle);
worker_id
}
#[inline] #[inline]
pub fn check_read(&self, path: &Path) -> Result<(), ErrBox> { pub fn check_read(&self, path: &Path) -> Result<(), ErrBox> {
self.borrow().permissions.check_read(path) self.borrow().permissions.check_read(path)

View file

@ -2,17 +2,19 @@
use crate::ops; use crate::ops;
use crate::state::State; use crate::state::State;
use crate::worker::Worker; use crate::worker::Worker;
use crate::worker::WorkerEvent;
use deno_core; use deno_core;
use deno_core::ErrBox; use deno_core::ErrBox;
use deno_core::StartupData; use deno_core::StartupData;
use futures::future::FutureExt; use futures::future::FutureExt;
use futures::stream::StreamExt;
use futures::SinkExt;
use std::future::Future; use std::future::Future;
use std::ops::Deref; use std::ops::Deref;
use std::ops::DerefMut; use std::ops::DerefMut;
use std::pin::Pin; use std::pin::Pin;
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
/// This worker is implementation of `Worker` Web API /// This worker is implementation of `Worker` Web API
/// ///
/// At the moment this type of worker supports only /// At the moment this type of worker supports only
@ -20,7 +22,10 @@ use std::task::Poll;
/// ///
/// Each `WebWorker` is either a child of `MainWorker` or other /// Each `WebWorker` is either a child of `MainWorker` or other
/// `WebWorker`. /// `WebWorker`.
pub struct WebWorker(Worker); pub struct WebWorker {
worker: Worker,
is_ready: bool,
}
impl WebWorker { impl WebWorker {
pub fn new(name: String, startup_data: StartupData, state: State) -> Self { pub fn new(name: String, startup_data: StartupData, state: State) -> Self {
@ -36,20 +41,23 @@ impl WebWorker {
ops::fetch::init(isolate, &state); ops::fetch::init(isolate, &state);
} }
Self(worker) Self {
worker,
is_ready: false,
}
} }
} }
impl Deref for WebWorker { impl Deref for WebWorker {
type Target = Worker; type Target = Worker;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
&self.0 &self.worker
} }
} }
impl DerefMut for WebWorker { impl DerefMut for WebWorker {
fn deref_mut(&mut self) -> &mut Self::Target { fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0 &mut self.worker
} }
} }
@ -58,14 +66,58 @@ impl Future for WebWorker {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut(); let inner = self.get_mut();
inner.0.poll_unpin(cx) let worker = &mut inner.worker;
if !inner.is_ready {
match worker.poll_unpin(cx) {
Poll::Ready(r) => {
if let Err(e) = r {
let mut sender = worker.internal_channels.sender.clone();
futures::executor::block_on(sender.send(WorkerEvent::Error(e)))
.expect("Failed to post message to host");
}
inner.is_ready = true;
}
Poll::Pending => {}
}
}
let maybe_msg = {
match worker.internal_channels.receiver.poll_next_unpin(cx) {
Poll::Ready(r) => match r {
Some(msg) => {
let msg_str = String::from_utf8(msg.to_vec()).unwrap();
debug!("received message from host: {}", msg_str);
Some(msg_str)
}
None => {
debug!("channel closed by host, worker event loop shuts down");
return Poll::Ready(Ok(()));
}
},
Poll::Pending => None,
}
};
if let Some(msg) = maybe_msg {
// TODO: just add second value and then bind using rusty_v8
// to get structured clone/transfer working
let script = format!("workerMessageRecvCallback({})", msg);
worker
.execute(&script)
.expect("Failed to execute message cb");
// Let worker be polled again
inner.is_ready = false;
worker.waker.wake();
}
Poll::Pending
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::ops::worker_host::run_worker_loop;
use crate::startup_data; use crate::startup_data;
use crate::state::State; use crate::state::State;
use crate::tokio_util; use crate::tokio_util;
@ -104,8 +156,7 @@ mod tests {
worker.execute(source).unwrap(); worker.execute(source).unwrap();
let handle = worker.thread_safe_handle(); let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap(); handle_sender.send(handle).unwrap();
let mut rt = tokio_util::create_basic_runtime(); let r = tokio_util::run_basic(worker);
let r = run_worker_loop(&mut rt, &mut worker);
assert!(r.is_ok()) assert!(r.is_ok())
}); });
@ -154,8 +205,7 @@ mod tests {
worker.execute("onmessage = () => { close(); }").unwrap(); worker.execute("onmessage = () => { close(); }").unwrap();
let handle = worker.thread_safe_handle(); let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap(); handle_sender.send(handle).unwrap();
let mut rt = tokio_util::create_basic_runtime(); let r = tokio_util::run_basic(worker);
let r = run_worker_loop(&mut rt, &mut worker);
assert!(r.is_ok()) assert!(r.is_ok())
}); });