refactor: Use Tokio's single-threaded runtime (#3844)

This change simplifies how we execute V8. Previously V8 Isolates jumped
around threads every time they were woken up. This was overly complex and
potentially hurting performance in a myriad ways. Now isolates run on
their own dedicated thread and never move.

- blocking_json spawns a thread and does not use a thread pool
- op_host_poll_worker and op_host_resume_worker are non-operational
- removes Worker::get_message and Worker::post_message
- ThreadSafeState::workers table contains WorkerChannel entries instead
  of actual Worker instances.
- MainWorker and CompilerWorker are no longer Futures.
- The multi-threaded version of deno_core_http_bench was removed.
- AyncOps no longer need to be Send + Sync

This PR is very large and several tests were disabled to speed
integration:
- installer_test_local_module_run
- installer_test_remote_module_run
- _015_duplicate_parallel_import
- _026_workers
This commit is contained in:
Ryan Dahl 2020-02-03 18:08:44 -05:00 committed by GitHub
parent 0471243334
commit 161cf7cdfd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 655 additions and 682 deletions

View file

@ -4,15 +4,9 @@ use crate::state::ThreadSafeState;
use crate::worker::Worker;
use crate::worker::WorkerChannels;
use deno_core;
use deno_core::ErrBox;
use deno_core::StartupData;
use futures::future::FutureExt;
use std::future::Future;
use std::ops::Deref;
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
/// This worker is used to host TypeScript and WASM compilers.
///
@ -27,7 +21,6 @@ use std::task::Poll;
///
/// TODO(bartlomieju): add support to reuse the worker - or in other
/// words support stateful TS compiler
#[derive(Clone)]
pub struct CompilerWorker(Worker);
impl CompilerWorker {
@ -38,27 +31,24 @@ impl CompilerWorker {
external_channels: WorkerChannels,
) -> Self {
let state_ = state.clone();
let worker = Worker::new(name, startup_data, state_, external_channels);
let mut worker = Worker::new(name, startup_data, state_, external_channels);
{
let mut isolate = worker.isolate.try_lock().unwrap();
ops::runtime::init(&mut isolate, &state);
ops::compiler::init(&mut isolate, &state);
ops::web_worker::init(&mut isolate, &state);
ops::errors::init(&mut isolate, &state);
let isolate = &mut worker.isolate;
ops::runtime::init(isolate, &state);
ops::compiler::init(isolate, &state);
ops::web_worker::init(isolate, &state);
ops::errors::init(isolate, &state);
// for compatibility with Worker scope, though unused at
// the moment
ops::timers::init(&mut isolate, &state);
ops::fetch::init(&mut isolate, &state);
ops::timers::init(isolate, &state);
ops::fetch::init(isolate, &state);
// TODO(bartlomieju): CompilerWorker should not
// depend on those ops
ops::os::init(&mut isolate, &state);
ops::files::init(&mut isolate, &state);
ops::fs::init(&mut isolate, &state);
ops::io::init(&mut isolate, &state);
ops::os::init(isolate, &state);
ops::files::init(isolate, &state);
ops::fs::init(isolate, &state);
ops::io::init(isolate, &state);
}
Self(worker)
}
}
@ -75,12 +65,3 @@ impl DerefMut for CompilerWorker {
&mut self.0
}
}
impl Future for CompilerWorker {
type Output = Result<(), ErrBox>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
inner.0.poll_unpin(cx)
}
}

View file

@ -11,7 +11,7 @@ pub struct JsCompiler {}
impl JsCompiler {
pub fn compile_async(
&self,
source_file: &SourceFile,
source_file: SourceFile,
) -> Pin<Box<CompiledModuleFuture>> {
let module = CompiledModule {
code: str::from_utf8(&source_file.source_code)

View file

@ -1,7 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::ops::JsonResult;
use deno_core::ErrBox;
use futures::Future;
use serde_json::Value;
mod compiler_worker;
mod js;
@ -17,8 +17,7 @@ pub use ts::TargetLib;
pub use ts::TsCompiler;
pub use wasm::WasmCompiler;
pub type CompilationResultFuture =
dyn Future<Output = Result<Value, ErrBox>> + Send;
pub type CompilationResultFuture = dyn Future<Output = JsonResult>;
#[derive(Debug, Clone)]
pub struct CompiledModule {
@ -27,4 +26,4 @@ pub struct CompiledModule {
}
pub type CompiledModuleFuture =
dyn Future<Output = Result<CompiledModule, ErrBox>> + Send;
dyn Future<Output = Result<CompiledModule, ErrBox>>;

View file

@ -9,7 +9,7 @@ use crate::file_fetcher::SourceFile;
use crate::file_fetcher::SourceFileFetcher;
use crate::global_state::ThreadSafeGlobalState;
use crate::msg;
use crate::serde_json::json;
use crate::ops::JsonResult;
use crate::source_maps::SourceMapGetter;
use crate::startup_data;
use crate::state::*;
@ -20,6 +20,7 @@ use deno_core::ModuleSpecifier;
use futures::future::FutureExt;
use futures::Future;
use regex::Regex;
use serde_json::json;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fs;
@ -37,7 +38,7 @@ lazy_static! {
Regex::new(r#""checkJs"\s*?:\s*?true"#).unwrap();
}
#[derive(Clone, Copy)]
#[derive(Clone)]
pub enum TargetLib {
Main,
Worker,
@ -236,7 +237,8 @@ impl TsCompiler {
Ok(compiler)
}
/// Create a new V8 worker with snapshot of TS compiler and setup compiler's runtime.
/// Create a new V8 worker with snapshot of TS compiler and setup compiler's
/// runtime.
fn setup_worker(global_state: ThreadSafeGlobalState) -> CompilerWorker {
let (int, ext) = ThreadSafeState::create_channels();
let worker_state =
@ -280,34 +282,52 @@ impl TsCompiler {
true,
);
let worker = TsCompiler::setup_worker(global_state);
let worker_ = worker.clone();
// TODO(ry) The code below looks very similar to spawn_ts_compiler_worker.
// Can we combine them?
let (load_sender, load_receiver) =
tokio::sync::oneshot::channel::<Result<(), ErrBox>>();
std::thread::spawn(move || {
let mut worker = TsCompiler::setup_worker(global_state);
let handle = worker.thread_safe_handle();
async move {
worker.post_message(req_msg).await?;
worker.await?;
debug!("Sent message to worker");
let maybe_msg = worker_.get_message().await;
debug!("Received message from worker");
if let Some(msg) = maybe_msg {
let json_str = std::str::from_utf8(&msg).unwrap();
debug!("Message: {}", json_str);
if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) {
return Err(ErrBox::from(diagnostics));
let fut = async move {
if let Err(err) = handle.post_message(req_msg).await {
load_sender.send(Err(err)).unwrap();
return;
}
debug!("Sent message to worker");
if let Err(err) = (&mut *worker).await {
load_sender.send(Err(err)).unwrap();
return;
}
let maybe_msg = handle.get_message().await;
debug!("Received message from worker");
if let Some(ref msg) = maybe_msg {
let json_str = std::str::from_utf8(msg).unwrap();
debug!("Message: {}", json_str);
if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) {
let err = ErrBox::from(diagnostics);
load_sender.send(Err(err)).unwrap();
return;
}
}
load_sender.send(Ok(())).unwrap();
}
Ok(())
}
.boxed_local();
crate::tokio_util::run_basic(fut);
});
async { load_receiver.await.unwrap() }.boxed_local()
}
/// Mark given module URL as compiled to avoid multiple compilations of same module
/// in single run.
/// Mark given module URL as compiled to avoid multiple compilations of same
/// module in single run.
fn mark_compiled(&self, url: &Url) {
let mut c = self.compiled.lock().unwrap();
c.insert(url.clone());
}
/// Check if given module URL has already been compiled and can be fetched directly from disk.
/// Check if given module URL has already been compiled and can be fetched
/// directly from disk.
fn has_compiled(&self, url: &Url) -> bool {
let c = self.compiled.lock().unwrap();
c.contains(url)
@ -317,9 +337,11 @@ impl TsCompiler {
///
/// This method compiled every module at most once.
///
/// If `--reload` flag was provided then compiler will not on-disk cache and force recompilation.
/// If `--reload` flag was provided then compiler will not on-disk cache and
/// force recompilation.
///
/// If compilation is required then new V8 worker is spawned with fresh TS compiler.
/// If compilation is required then new V8 worker is spawned with fresh TS
/// compiler.
pub fn compile_async(
&self,
global_state: ThreadSafeGlobalState,
@ -356,22 +378,12 @@ impl TsCompiler {
}
}
}
let source_file_ = source_file.clone();
debug!(">>>>> compile_sync START");
let module_url = source_file.url.clone();
debug!(
"Running rust part of compile_sync, module specifier: {}",
&source_file.url
);
let target = match target {
TargetLib::Main => "main",
TargetLib::Worker => "worker",
};
let root_names = vec![module_url.to_string()];
let req_msg = req(
msg::CompilerRequestType::Compile,
@ -382,34 +394,51 @@ impl TsCompiler {
false,
);
let worker = TsCompiler::setup_worker(global_state.clone());
let worker_ = worker.clone();
let compiling_job = global_state
.progress
.add("Compile", &module_url.to_string());
let global_state_ = global_state;
// TODO(ry) The code below looks very similar to spawn_ts_compiler_worker.
// Can we combine them?
let (load_sender, load_receiver) =
tokio::sync::oneshot::channel::<Result<CompiledModule, ErrBox>>();
std::thread::spawn(move || {
debug!(">>>>> compile_async START");
async move {
worker.post_message(req_msg).await?;
worker.await?;
debug!("Sent message to worker");
let maybe_msg = worker_.get_message().await;
if let Some(msg) = maybe_msg {
let json_str = std::str::from_utf8(&msg).unwrap();
debug!("Message: {}", json_str);
if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) {
return Err(ErrBox::from(diagnostics));
let mut worker = TsCompiler::setup_worker(global_state.clone());
let handle = worker.thread_safe_handle();
let compiling_job = global_state
.progress
.add("Compile", &module_url.to_string());
let fut = async move {
if let Err(err) = handle.post_message(req_msg).await {
load_sender.send(Err(err)).unwrap();
return;
}
if let Err(err) = (&mut *worker).await {
load_sender.send(Err(err)).unwrap();
return;
}
let maybe_msg = handle.get_message().await;
if let Some(ref msg) = maybe_msg {
let json_str = std::str::from_utf8(msg).unwrap();
if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) {
let err = ErrBox::from(diagnostics);
load_sender.send(Err(err)).unwrap();
return;
}
}
let compiled_module = global_state
.ts_compiler
.get_compiled_module(&source_file_.url)
.expect("Expected to find compiled file");
drop(compiling_job);
debug!(">>>>> compile_sync END");
load_sender.send(Ok(compiled_module)).unwrap();
}
let compiled_module = global_state_
.ts_compiler
.get_compiled_module(&source_file_.url)
.expect("Expected to find compiled file");
drop(compiling_job);
debug!(">>>>> compile_sync END");
Ok(compiled_module)
}
.boxed()
.boxed_local();
crate::tokio_util::run_basic(fut);
});
async { load_receiver.await.unwrap() }.boxed_local()
}
/// Get associated `CompiledFileMetadata` for given module if it exists.
@ -625,6 +654,39 @@ impl TsCompiler {
}
}
// TODO(ry) this is pretty general purpose and should be lifted and generalized.
fn spawn_ts_compiler_worker(
req_msg: Buf,
global_state: ThreadSafeGlobalState,
) -> Pin<Box<CompilationResultFuture>> {
let (load_sender, load_receiver) =
tokio::sync::oneshot::channel::<JsonResult>();
std::thread::spawn(move || {
let mut worker = TsCompiler::setup_worker(global_state);
let handle = worker.thread_safe_handle();
let fut = async move {
debug!("Sent message to worker");
if let Err(err) = handle.post_message(req_msg).await {
load_sender.send(Err(err)).unwrap();
return;
}
if let Err(err) = (&mut *worker).await {
load_sender.send(Err(err)).unwrap();
return;
}
let msg = handle.get_message().await.unwrap();
let json_str = std::str::from_utf8(&msg).unwrap();
load_sender.send(Ok(json!(json_str))).unwrap();
};
crate::tokio_util::run_basic(fut);
});
let fut = async { load_receiver.await.unwrap() };
fut.boxed_local()
}
pub fn runtime_compile_async<S: BuildHasher>(
global_state: ThreadSafeGlobalState,
root_name: &str,
@ -644,18 +706,7 @@ pub fn runtime_compile_async<S: BuildHasher>(
.into_boxed_str()
.into_boxed_bytes();
let worker = TsCompiler::setup_worker(global_state);
let worker_ = worker.clone();
async move {
worker.post_message(req_msg).await?;
worker.await?;
debug!("Sent message to worker");
let msg = (worker_.get_message().await).unwrap();
let json_str = std::str::from_utf8(&msg).unwrap();
Ok(json!(json_str))
}
.boxed()
spawn_ts_compiler_worker(req_msg, global_state)
}
pub fn runtime_transpile_async<S: BuildHasher>(
@ -672,38 +723,25 @@ pub fn runtime_transpile_async<S: BuildHasher>(
.into_boxed_str()
.into_boxed_bytes();
let worker = TsCompiler::setup_worker(global_state);
let worker_ = worker.clone();
async move {
worker.post_message(req_msg).await?;
worker.await?;
debug!("Sent message to worker");
let msg = (worker_.get_message().await).unwrap();
let json_str = std::str::from_utf8(&msg).unwrap();
Ok(json!(json_str))
}
.boxed()
spawn_ts_compiler_worker(req_msg, global_state)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::fs as deno_fs;
use crate::tokio_util;
use deno_core::ModuleSpecifier;
use std::path::PathBuf;
use tempfile::TempDir;
#[test]
fn test_compile_async() {
#[tokio::test]
async fn test_compile_async() {
let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.parent()
.unwrap()
.join("cli/tests/002_hello.ts");
let specifier =
ModuleSpecifier::resolve_url_or_path(p.to_str().unwrap()).unwrap();
let out = SourceFile {
url: specifier.as_url().clone(),
filename: PathBuf::from(p.to_str().unwrap().to_string()),
@ -711,31 +749,24 @@ mod tests {
source_code: include_bytes!("../tests/002_hello.ts").to_vec(),
types_url: None,
};
let mock_state = ThreadSafeGlobalState::mock(vec![
String::from("deno"),
String::from("hello.js"),
]);
let fut = async move {
let result = mock_state
.ts_compiler
.compile_async(mock_state.clone(), &out, TargetLib::Main)
.await;
assert!(result.is_ok());
assert!(result
.unwrap()
.code
.as_bytes()
.starts_with(b"console.log(\"Hello World\");"));
};
tokio_util::run(fut.boxed())
let result = mock_state
.ts_compiler
.compile_async(mock_state.clone(), &out, TargetLib::Main)
.await;
assert!(result.is_ok());
assert!(result
.unwrap()
.code
.as_bytes()
.starts_with(b"console.log(\"Hello World\");"));
}
#[test]
fn test_bundle_async() {
#[tokio::test]
async fn test_bundle_async() {
let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.parent()
.unwrap()
@ -751,19 +782,15 @@ mod tests {
String::from("$deno$/bundle.js"),
]);
let fut = async move {
let result = state
.ts_compiler
.bundle_async(
state.clone(),
module_name,
Some(String::from("$deno$/bundle.js")),
)
.await;
assert!(result.is_ok());
};
tokio_util::run(fut.boxed())
let result = state
.ts_compiler
.bundle_async(
state.clone(),
module_name,
Some(String::from("$deno$/bundle.js")),
)
.await;
assert!(result.is_ok());
}
#[test]

View file

@ -6,6 +6,7 @@ use crate::file_fetcher::SourceFile;
use crate::global_state::ThreadSafeGlobalState;
use crate::startup_data;
use crate::state::*;
use deno_core::ErrBox;
use futures::FutureExt;
use serde_derive::Deserialize;
use serde_json;
@ -70,56 +71,68 @@ impl WasmCompiler {
source_file: &SourceFile,
) -> Pin<Box<CompiledModuleFuture>> {
let cache = self.cache.clone();
let source_file = source_file.clone();
let maybe_cached = { cache.lock().unwrap().get(&source_file.url).cloned() };
if let Some(m) = maybe_cached {
return futures::future::ok(m).boxed();
}
let cache_ = self.cache.clone();
debug!(">>>>> wasm_compile_async START");
let base64_data = base64::encode(&source_file.source_code);
let worker = WasmCompiler::setup_worker(global_state);
let worker_ = worker.clone();
let url = source_file.url.clone();
let (load_sender, load_receiver) =
tokio::sync::oneshot::channel::<Result<CompiledModule, ErrBox>>();
Box::pin(async move {
let _ = worker
.post_message(
serde_json::to_string(&base64_data)
.unwrap()
.into_boxed_str()
.into_boxed_bytes(),
)
.await;
std::thread::spawn(move || {
debug!(">>>>> wasm_compile_async START");
let base64_data = base64::encode(&source_file.source_code);
let mut worker = WasmCompiler::setup_worker(global_state);
let handle = worker.thread_safe_handle();
let url = source_file.url.clone();
if let Err(err) = worker.await {
// TODO(ry) Need to forward the error instead of exiting.
eprintln!("{}", err.to_string());
std::process::exit(1);
}
debug!("Sent message to worker");
let json_msg = worker_.get_message().await.expect("not handled");
let fut = async move {
let _ = handle
.post_message(
serde_json::to_string(&base64_data)
.unwrap()
.into_boxed_str()
.into_boxed_bytes(),
)
.await;
debug!("Received message from worker");
let module_info: WasmModuleInfo =
serde_json::from_slice(&json_msg).unwrap();
debug!("WASM module info: {:#?}", &module_info);
let code = wrap_wasm_code(
&base64_data,
&module_info.import_list,
&module_info.export_list,
);
debug!("Generated code: {}", &code);
let module = CompiledModule {
code,
name: url.to_string(),
if let Err(err) = (&mut *worker).await {
load_sender.send(Err(err)).unwrap();
return;
}
debug!("Sent message to worker");
let json_msg = handle.get_message().await.expect("not handled");
debug!("Received message from worker");
let module_info: WasmModuleInfo =
serde_json::from_slice(&json_msg).unwrap();
debug!("WASM module info: {:#?}", &module_info);
let code = wrap_wasm_code(
&base64_data,
&module_info.import_list,
&module_info.export_list,
);
debug!("Generated code: {}", &code);
let module = CompiledModule {
code,
name: url.to_string(),
};
{
cache_.lock().unwrap().insert(url.clone(), module.clone());
}
debug!("<<<<< wasm_compile_async END");
load_sender.send(Ok(module)).unwrap();
};
{
cache_.lock().unwrap().insert(url.clone(), module.clone());
}
debug!("<<<<< wasm_compile_async END");
Ok(module)
})
crate::tokio_util::run_basic(fut);
});
let fut = async { load_receiver.await.unwrap() };
fut.boxed_local()
}
}