refactor: rename ThreadSafeState, use RefCell for mutable state (#3931)

* rename ThreadSafeState to State
* State stores InnerState wrapped in Rc and RefCell
This commit is contained in:
Bartek Iwańczuk 2020-02-08 20:34:31 +01:00 committed by GitHub
parent 619a24390f
commit cdba5ab6fc
31 changed files with 454 additions and 464 deletions

View file

@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::ops; use crate::ops;
use crate::state::ThreadSafeState; use crate::state::State;
use crate::worker::Worker; use crate::worker::Worker;
use deno_core; use deno_core;
use deno_core::StartupData; use deno_core::StartupData;
@ -23,11 +23,7 @@ use std::ops::DerefMut;
pub struct CompilerWorker(Worker); pub struct CompilerWorker(Worker);
impl CompilerWorker { impl CompilerWorker {
pub fn new( pub fn new(name: String, startup_data: StartupData, state: State) -> Self {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
) -> Self {
let state_ = state.clone(); let state_ = state.clone();
let mut worker = Worker::new(name, startup_data, state_); let mut worker = Worker::new(name, startup_data, state_);
{ {

View file

@ -249,9 +249,8 @@ impl TsCompiler {
fn setup_worker(global_state: GlobalState) -> CompilerWorker { fn setup_worker(global_state: GlobalState) -> CompilerWorker {
let entry_point = let entry_point =
ModuleSpecifier::resolve_url_or_path("./__$deno$ts_compiler.ts").unwrap(); ModuleSpecifier::resolve_url_or_path("./__$deno$ts_compiler.ts").unwrap();
let worker_state = let worker_state = State::new(global_state.clone(), None, entry_point)
ThreadSafeState::new(global_state.clone(), None, entry_point) .expect("Unable to create worker state");
.expect("Unable to create worker state");
// Count how many times we start the compiler worker. // Count how many times we start the compiler worker.
global_state global_state

View file

@ -52,9 +52,8 @@ impl WasmCompiler {
let entry_point = let entry_point =
ModuleSpecifier::resolve_url_or_path("./__$deno$wasm_compiler.ts") ModuleSpecifier::resolve_url_or_path("./__$deno$wasm_compiler.ts")
.unwrap(); .unwrap();
let worker_state = let worker_state = State::new(global_state.clone(), None, entry_point)
ThreadSafeState::new(global_state.clone(), None, entry_point) .expect("Unable to create worker state");
.expect("Unable to create worker state");
// Count how many times we start the compiler worker. // Count how many times we start the compiler worker.
global_state global_state

View file

@ -38,7 +38,7 @@ const SUPPORTED_FETCH_SCHEMES: [&str; 3] = ["http", "https", "file"];
type SpecifierMap = IndexMap<String, Vec<ModuleSpecifier>>; type SpecifierMap = IndexMap<String, Vec<ModuleSpecifier>>;
type ScopesMap = IndexMap<String, SpecifierMap>; type ScopesMap = IndexMap<String, SpecifierMap>;
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct ImportMap { pub struct ImportMap {
base_url: String, base_url: String,
imports: SpecifierMap, imports: SpecifierMap,

View file

@ -61,7 +61,7 @@ use crate::deno_error::js_check;
use crate::deno_error::{print_err_and_exit, print_msg_and_exit}; use crate::deno_error::{print_err_and_exit, print_msg_and_exit};
use crate::global_state::GlobalState; use crate::global_state::GlobalState;
use crate::ops::io::get_stdio; use crate::ops::io::get_stdio;
use crate::state::ThreadSafeState; use crate::state::State;
use crate::worker::MainWorker; use crate::worker::MainWorker;
use deno_core::v8_set_flags; use deno_core::v8_set_flags;
use deno_core::ErrBox; use deno_core::ErrBox;
@ -107,17 +107,17 @@ fn create_main_worker(
global_state: GlobalState, global_state: GlobalState,
main_module: ModuleSpecifier, main_module: ModuleSpecifier,
) -> MainWorker { ) -> MainWorker {
let state = ThreadSafeState::new(global_state, None, main_module) let state = State::new(global_state, None, main_module)
.map_err(deno_error::print_err_and_exit) .map_err(deno_error::print_err_and_exit)
.unwrap(); .unwrap();
let state_ = state.clone(); let state_ = state.clone();
{ {
let mut resource_table = state_.lock_resource_table(); let mut state = state_.borrow_mut();
let (stdin, stdout, stderr) = get_stdio(); let (stdin, stdout, stderr) = get_stdio();
resource_table.add("stdin", Box::new(stdin)); state.resource_table.add("stdin", Box::new(stdin));
resource_table.add("stdout", Box::new(stdout)); state.resource_table.add("stdout", Box::new(stdout));
resource_table.add("stderr", Box::new(stderr)); state.resource_table.add("stderr", Box::new(stderr));
} }
MainWorker::new("main".to_string(), startup_data::deno_isolate_init(), state) MainWorker::new("main".to_string(), startup_data::deno_isolate_init(), state)
@ -157,7 +157,7 @@ async fn print_file_info(
worker: &MainWorker, worker: &MainWorker,
module_specifier: ModuleSpecifier, module_specifier: ModuleSpecifier,
) { ) {
let global_state = worker.state.global_state.clone(); let global_state = worker.state.borrow().global_state.clone();
let maybe_source_file = global_state let maybe_source_file = global_state
.file_fetcher .file_fetcher

View file

@ -3,11 +3,11 @@ use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::futures::future::try_join_all; use crate::futures::future::try_join_all;
use crate::msg; use crate::msg;
use crate::ops::json_op; use crate::ops::json_op;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::Loader; use deno_core::Loader;
use deno_core::*; use deno_core::*;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op("cache", s.core_op(json_op(s.stateful_op(op_cache)))); i.register_op("cache", s.core_op(json_op(s.stateful_op(op_cache))));
i.register_op( i.register_op(
"resolve_modules", "resolve_modules",
@ -28,7 +28,7 @@ struct CacheArgs {
} }
fn op_cache( fn op_cache(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -37,11 +37,15 @@ fn op_cache(
let module_specifier = ModuleSpecifier::resolve_url(&args.module_id) let module_specifier = ModuleSpecifier::resolve_url(&args.module_id)
.expect("Should be valid module specifier"); .expect("Should be valid module specifier");
state.global_state.ts_compiler.cache_compiler_output( state
&module_specifier, .borrow()
&args.extension, .global_state
&args.contents, .ts_compiler
)?; .cache_compiler_output(
&module_specifier,
&args.extension,
&args.contents,
)?;
Ok(JsonOp::Sync(json!({}))) Ok(JsonOp::Sync(json!({})))
} }
@ -53,7 +57,7 @@ struct SpecifiersReferrerArgs {
} }
fn op_resolve_modules( fn op_resolve_modules(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_data: Option<ZeroCopyBuf>, _data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -78,7 +82,7 @@ fn op_resolve_modules(
} }
fn op_fetch_source_files( fn op_fetch_source_files(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_data: Option<ZeroCopyBuf>, _data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -93,18 +97,17 @@ fn op_fetch_source_files(
}; };
let mut futures = vec![]; let mut futures = vec![];
let global_state = state.borrow().global_state.clone();
for specifier in &args.specifiers { for specifier in &args.specifiers {
let resolved_specifier = let resolved_specifier =
ModuleSpecifier::resolve_url(&specifier).expect("Invalid specifier"); ModuleSpecifier::resolve_url(&specifier).expect("Invalid specifier");
let fut = state let fut = global_state
.global_state
.file_fetcher .file_fetcher
.fetch_source_file_async(&resolved_specifier, ref_specifier.clone()); .fetch_source_file_async(&resolved_specifier, ref_specifier.clone());
futures.push(fut); futures.push(fut);
} }
let global_state = state.global_state.clone();
let future = Box::pin(async move { let future = Box::pin(async move {
let files = try_join_all(futures).await?; let files = try_join_all(futures).await?;

View file

@ -4,11 +4,11 @@ use crate::fmt_errors::JSError;
use crate::ops::json_op; use crate::ops::json_op;
use crate::source_maps::get_orig_position; use crate::source_maps::get_orig_position;
use crate::source_maps::CachedMaps; use crate::source_maps::CachedMaps;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::*; use deno_core::*;
use std::collections::HashMap; use std::collections::HashMap;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op( i.register_op(
"apply_source_map", "apply_source_map",
s.core_op(json_op(s.stateful_op(op_apply_source_map))), s.core_op(json_op(s.stateful_op(op_apply_source_map))),
@ -25,12 +25,13 @@ struct FormatErrorArgs {
} }
fn op_format_error( fn op_format_error(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let args: FormatErrorArgs = serde_json::from_value(args)?; let args: FormatErrorArgs = serde_json::from_value(args)?;
let error = JSError::from_json(&args.error, &state.global_state.ts_compiler); let error =
JSError::from_json(&args.error, &state.borrow().global_state.ts_compiler);
Ok(JsonOp::Sync(json!({ Ok(JsonOp::Sync(json!({
"error": error.to_string(), "error": error.to_string(),
@ -45,7 +46,7 @@ struct ApplySourceMap {
} }
fn op_apply_source_map( fn op_apply_source_map(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -57,7 +58,7 @@ fn op_apply_source_map(
args.line.into(), args.line.into(),
args.column.into(), args.column.into(),
&mut mappings_map, &mut mappings_map,
&state.global_state.ts_compiler, &state.borrow().global_state.ts_compiler,
); );
Ok(JsonOp::Sync(json!({ Ok(JsonOp::Sync(json!({

View file

@ -3,7 +3,7 @@ use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource; use super::io::StreamResource;
use crate::http_util::{create_http_client, HttpBody}; use crate::http_util::{create_http_client, HttpBody};
use crate::ops::json_op; use crate::ops::json_op;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::*; use deno_core::*;
use futures::future::FutureExt; use futures::future::FutureExt;
use http::header::HeaderName; use http::header::HeaderName;
@ -12,7 +12,7 @@ use http::Method;
use std; use std;
use std::convert::From; use std::convert::From;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op("fetch", s.core_op(json_op(s.stateful_op(op_fetch)))); i.register_op("fetch", s.core_op(json_op(s.stateful_op(op_fetch))));
} }
@ -24,7 +24,7 @@ struct FetchArgs {
} }
pub fn op_fetch( pub fn op_fetch(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
data: Option<ZeroCopyBuf>, data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -65,8 +65,8 @@ pub fn op_fetch(
} }
let body = HttpBody::from(res); let body = HttpBody::from(res);
let mut table = state_.lock_resource_table(); let mut state = state_.borrow_mut();
let rid = table.add( let rid = state.resource_table.add(
"httpBody", "httpBody",
Box::new(StreamResource::HttpBody(Box::new(body))), Box::new(StreamResource::HttpBody(Box::new(body))),
); );

View file

@ -6,7 +6,7 @@ use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind; use crate::deno_error::ErrorKind;
use crate::fs as deno_fs; use crate::fs as deno_fs;
use crate::ops::json_op; use crate::ops::json_op;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::*; use deno_core::*;
use futures::future::FutureExt; use futures::future::FutureExt;
use std; use std;
@ -15,7 +15,7 @@ use std::io::SeekFrom;
use std::path::Path; use std::path::Path;
use tokio; use tokio;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op("open", s.core_op(json_op(s.stateful_op(op_open)))); i.register_op("open", s.core_op(json_op(s.stateful_op(op_open))));
i.register_op("close", s.core_op(json_op(s.stateful_op(op_close)))); i.register_op("close", s.core_op(json_op(s.stateful_op(op_close))));
i.register_op("seek", s.core_op(json_op(s.stateful_op(op_seek)))); i.register_op("seek", s.core_op(json_op(s.stateful_op(op_seek))));
@ -43,7 +43,7 @@ struct OpenOptions {
} }
fn op_open( fn op_open(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -130,8 +130,10 @@ fn op_open(
let fut = async move { let fut = async move {
let fs_file = open_options.open(filename).await?; let fs_file = open_options.open(filename).await?;
let mut table = state_.lock_resource_table(); let mut state = state_.borrow_mut();
let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file))); let rid = state
.resource_table
.add("fsFile", Box::new(StreamResource::FsFile(fs_file)));
Ok(json!(rid)) Ok(json!(rid))
}; };
@ -149,14 +151,17 @@ struct CloseArgs {
} }
fn op_close( fn op_close(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let args: CloseArgs = serde_json::from_value(args)?; let args: CloseArgs = serde_json::from_value(args)?;
let mut table = state.lock_resource_table(); let mut state = state.borrow_mut();
table.close(args.rid as u32).ok_or_else(bad_resource)?; state
.resource_table
.close(args.rid as u32)
.ok_or_else(bad_resource)?;
Ok(JsonOp::Sync(json!({}))) Ok(JsonOp::Sync(json!({})))
} }
@ -170,7 +175,7 @@ struct SeekArgs {
} }
fn op_seek( fn op_seek(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -191,13 +196,14 @@ fn op_seek(
} }
}; };
let mut table = state.lock_resource_table(); let state = state.borrow();
let resource = table let resource = state
.get_mut::<StreamResource>(rid) .resource_table
.get::<StreamResource>(rid)
.ok_or_else(bad_resource)?; .ok_or_else(bad_resource)?;
let tokio_file = match resource { let tokio_file = match resource {
StreamResource::FsFile(ref mut file) => file, StreamResource::FsFile(ref file) => file,
_ => return Err(bad_resource()), _ => return Err(bad_resource()),
}; };
let mut file = futures::executor::block_on(tokio_file.try_clone())?; let mut file = futures::executor::block_on(tokio_file.try_clone())?;

View file

@ -6,7 +6,7 @@ use crate::deno_error::ErrorKind;
use crate::fs as deno_fs; use crate::fs as deno_fs;
use crate::ops::dispatch_json::JsonResult; use crate::ops::dispatch_json::JsonResult;
use crate::ops::json_op; use crate::ops::json_op;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::*; use deno_core::*;
use remove_dir_all::remove_dir_all; use remove_dir_all::remove_dir_all;
use std::convert::From; use std::convert::From;
@ -19,7 +19,7 @@ use std::os::unix::fs::MetadataExt;
#[cfg(unix)] #[cfg(unix)]
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op("chdir", s.core_op(json_op(s.stateful_op(op_chdir)))); i.register_op("chdir", s.core_op(json_op(s.stateful_op(op_chdir))));
i.register_op("mkdir", s.core_op(json_op(s.stateful_op(op_mkdir)))); i.register_op("mkdir", s.core_op(json_op(s.stateful_op(op_mkdir))));
i.register_op("chmod", s.core_op(json_op(s.stateful_op(op_chmod)))); i.register_op("chmod", s.core_op(json_op(s.stateful_op(op_chmod))));
@ -48,7 +48,7 @@ struct ChdirArgs {
} }
fn op_chdir( fn op_chdir(
_state: &ThreadSafeState, _state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -67,7 +67,7 @@ struct MkdirArgs {
} }
fn op_mkdir( fn op_mkdir(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -94,7 +94,7 @@ struct ChmodArgs {
} }
fn op_chmod( fn op_chmod(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -128,7 +128,7 @@ struct ChownArgs {
} }
fn op_chown( fn op_chown(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -156,7 +156,7 @@ struct RemoveArgs {
} }
fn op_remove( fn op_remove(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -191,7 +191,7 @@ struct CopyFileArgs {
} }
fn op_copy_file( fn op_copy_file(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -290,7 +290,7 @@ struct StatArgs {
} }
fn op_stat( fn op_stat(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -320,7 +320,7 @@ struct RealpathArgs {
} }
fn op_realpath( fn op_realpath(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -352,7 +352,7 @@ struct ReadDirArgs {
} }
fn op_read_dir( fn op_read_dir(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -389,7 +389,7 @@ struct RenameArgs {
} }
fn op_rename( fn op_rename(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -418,7 +418,7 @@ struct LinkArgs {
} }
fn op_link( fn op_link(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -446,7 +446,7 @@ struct SymlinkArgs {
} }
fn op_symlink( fn op_symlink(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -478,7 +478,7 @@ struct ReadLinkArgs {
} }
fn op_read_link( fn op_read_link(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -506,7 +506,7 @@ struct TruncateArgs {
} }
fn op_truncate( fn op_truncate(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -535,7 +535,7 @@ struct MakeTempDirArgs {
} }
fn op_make_temp_dir( fn op_make_temp_dir(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -575,7 +575,7 @@ struct Utime {
} }
fn op_utime( fn op_utime(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -590,7 +590,7 @@ fn op_utime(
} }
fn op_cwd( fn op_cwd(
_state: &ThreadSafeState, _state: &State,
_args: Value, _args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {

View file

@ -3,7 +3,7 @@ use crate::deno_error;
use crate::deno_error::bad_resource; use crate::deno_error::bad_resource;
use crate::http_util::HttpBody; use crate::http_util::HttpBody;
use crate::ops::minimal_op; use crate::ops::minimal_op;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::ErrBox; use deno_core::ErrBox;
use deno_core::*; use deno_core::*;
use futures::future::FutureExt; use futures::future::FutureExt;
@ -47,7 +47,7 @@ lazy_static! {
}; };
} }
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op( i.register_op(
"read", "read",
s.core_op(minimal_op(s.stateful_minimal_op(op_read))), s.core_op(minimal_op(s.stateful_minimal_op(op_read))),
@ -131,7 +131,7 @@ enum IoState {
/// ///
/// The returned future will resolve to both the I/O stream and the buffer /// The returned future will resolve to both the I/O stream and the buffer
/// as well as the number of bytes read once the read operation is completed. /// as well as the number of bytes read once the read operation is completed.
pub fn read<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Read<T> pub fn read<T>(state: &State, rid: ResourceId, buf: T) -> Read<T>
where where
T: AsMut<[u8]>, T: AsMut<[u8]>,
{ {
@ -151,7 +151,7 @@ pub struct Read<T> {
rid: ResourceId, rid: ResourceId,
buf: T, buf: T,
io_state: IoState, io_state: IoState,
state: ThreadSafeState, state: State,
} }
impl<T> Future for Read<T> impl<T> Future for Read<T>
@ -166,8 +166,9 @@ where
panic!("poll a Read after it's done"); panic!("poll a Read after it's done");
} }
let mut table = inner.state.lock_resource_table(); let mut state = inner.state.borrow_mut();
let resource = table let resource = state
.resource_table
.get_mut::<StreamResource>(inner.rid) .get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?; .ok_or_else(bad_resource)?;
let nread = ready!(resource.poll_read(cx, &mut inner.buf.as_mut()[..]))?; let nread = ready!(resource.poll_read(cx, &mut inner.buf.as_mut()[..]))?;
@ -177,7 +178,7 @@ where
} }
pub fn op_read( pub fn op_read(
state: &ThreadSafeState, state: &State,
rid: i32, rid: i32,
zero_copy: Option<ZeroCopyBuf>, zero_copy: Option<ZeroCopyBuf>,
) -> Pin<Box<MinimalOp>> { ) -> Pin<Box<MinimalOp>> {
@ -257,7 +258,7 @@ pub struct Write<T> {
rid: ResourceId, rid: ResourceId,
buf: T, buf: T,
io_state: IoState, io_state: IoState,
state: ThreadSafeState, state: State,
nwritten: i32, nwritten: i32,
} }
@ -266,7 +267,7 @@ pub struct Write<T> {
/// ///
/// Any error which happens during writing will cause both the stream and the /// Any error which happens during writing will cause both the stream and the
/// buffer to get destroyed. /// buffer to get destroyed.
pub fn write<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Write<T> pub fn write<T>(state: &State, rid: ResourceId, buf: T) -> Write<T>
where where
T: AsRef<[u8]>, T: AsRef<[u8]>,
{ {
@ -294,8 +295,9 @@ where
} }
if inner.io_state == IoState::Pending { if inner.io_state == IoState::Pending {
let mut table = inner.state.lock_resource_table(); let mut state = inner.state.borrow_mut();
let resource = table let resource = state
.resource_table
.get_mut::<StreamResource>(inner.rid) .get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?; .ok_or_else(bad_resource)?;
@ -309,8 +311,9 @@ where
// Figure out why it's needed and preferably remove it. // Figure out why it's needed and preferably remove it.
// https://github.com/denoland/deno/issues/3565 // https://github.com/denoland/deno/issues/3565
if inner.io_state == IoState::Flush { if inner.io_state == IoState::Flush {
let mut table = inner.state.lock_resource_table(); let mut state = inner.state.borrow_mut();
let resource = table let resource = state
.resource_table
.get_mut::<StreamResource>(inner.rid) .get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?; .ok_or_else(bad_resource)?;
ready!(resource.poll_flush(cx))?; ready!(resource.poll_flush(cx))?;
@ -322,7 +325,7 @@ where
} }
pub fn op_write( pub fn op_write(
state: &ThreadSafeState, state: &State,
rid: i32, rid: i32,
zero_copy: Option<ZeroCopyBuf>, zero_copy: Option<ZeroCopyBuf>,
) -> Pin<Box<MinimalOp>> { ) -> Pin<Box<MinimalOp>> {

View file

@ -4,7 +4,7 @@ use super::io::StreamResource;
use crate::deno_error::bad_resource; use crate::deno_error::bad_resource;
use crate::ops::json_op; use crate::ops::json_op;
use crate::resolve_addr::resolve_addr; use crate::resolve_addr::resolve_addr;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::*; use deno_core::*;
use futures::future::FutureExt; use futures::future::FutureExt;
use std; use std;
@ -19,7 +19,7 @@ use tokio;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::net::TcpStream; use tokio::net::TcpStream;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op("accept", s.core_op(json_op(s.stateful_op(op_accept)))); i.register_op("accept", s.core_op(json_op(s.stateful_op(op_accept))));
i.register_op("connect", s.core_op(json_op(s.stateful_op(op_connect)))); i.register_op("connect", s.core_op(json_op(s.stateful_op(op_connect))));
i.register_op("shutdown", s.core_op(json_op(s.stateful_op(op_shutdown)))); i.register_op("shutdown", s.core_op(json_op(s.stateful_op(op_shutdown))));
@ -33,7 +33,7 @@ enum AcceptState {
} }
/// Simply accepts a connection. /// Simply accepts a connection.
pub fn accept(state: &ThreadSafeState, rid: ResourceId) -> Accept { pub fn accept(state: &State, rid: ResourceId) -> Accept {
Accept { Accept {
accept_state: AcceptState::Pending, accept_state: AcceptState::Pending,
rid, rid,
@ -45,7 +45,7 @@ pub fn accept(state: &ThreadSafeState, rid: ResourceId) -> Accept {
pub struct Accept<'a> { pub struct Accept<'a> {
accept_state: AcceptState, accept_state: AcceptState,
rid: ResourceId, rid: ResourceId,
state: &'a ThreadSafeState, state: &'a State,
} }
impl Future for Accept<'_> { impl Future for Accept<'_> {
@ -57,8 +57,9 @@ impl Future for Accept<'_> {
panic!("poll Accept after it's done"); panic!("poll Accept after it's done");
} }
let mut table = inner.state.lock_resource_table(); let mut state = inner.state.borrow_mut();
let listener_resource = table let listener_resource = state
.resource_table
.get_mut::<TcpListenerResource>(inner.rid) .get_mut::<TcpListenerResource>(inner.rid)
.ok_or_else(|| { .ok_or_else(|| {
let e = std::io::Error::new( let e = std::io::Error::new(
@ -95,25 +96,29 @@ struct AcceptArgs {
} }
fn op_accept( fn op_accept(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let args: AcceptArgs = serde_json::from_value(args)?; let args: AcceptArgs = serde_json::from_value(args)?;
let rid = args.rid as u32; let rid = args.rid as u32;
let state_ = state.clone(); let state_ = state.clone();
let table = state.lock_resource_table(); {
table let state = state.borrow();
.get::<TcpListenerResource>(rid) state
.ok_or_else(bad_resource)?; .resource_table
.get::<TcpListenerResource>(rid)
.ok_or_else(bad_resource)?;
}
let op = async move { let op = async move {
let (tcp_stream, _socket_addr) = accept(&state_, rid).await?; let (tcp_stream, _socket_addr) = accept(&state_, rid).await?;
let local_addr = tcp_stream.local_addr()?; let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?; let remote_addr = tcp_stream.peer_addr()?;
let mut table = state_.lock_resource_table(); let mut state = state_.borrow_mut();
let rid = let rid = state
table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); .resource_table
.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
Ok(json!({ Ok(json!({
"rid": rid, "rid": rid,
"localAddr": { "localAddr": {
@ -140,7 +145,7 @@ struct ConnectArgs {
} }
fn op_connect( fn op_connect(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -154,9 +159,10 @@ fn op_connect(
let tcp_stream = TcpStream::connect(&addr).await?; let tcp_stream = TcpStream::connect(&addr).await?;
let local_addr = tcp_stream.local_addr()?; let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?; let remote_addr = tcp_stream.peer_addr()?;
let mut table = state_.lock_resource_table(); let mut state = state_.borrow_mut();
let rid = let rid = state
table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); .resource_table
.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
Ok(json!({ Ok(json!({
"rid": rid, "rid": rid,
"localAddr": { "localAddr": {
@ -182,7 +188,7 @@ struct ShutdownArgs {
} }
fn op_shutdown( fn op_shutdown(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -197,8 +203,9 @@ fn op_shutdown(
_ => unimplemented!(), _ => unimplemented!(),
}; };
let mut table = state.lock_resource_table(); let mut state = state.borrow_mut();
let resource = table let resource = state
.resource_table
.get_mut::<StreamResource>(rid) .get_mut::<StreamResource>(rid)
.ok_or_else(bad_resource)?; .ok_or_else(bad_resource)?;
match resource { match resource {
@ -272,7 +279,7 @@ impl TcpListenerResource {
} }
fn op_listen( fn op_listen(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -290,8 +297,10 @@ fn op_listen(
waker: None, waker: None,
local_addr, local_addr,
}; };
let mut table = state.lock_resource_table(); let mut state = state.borrow_mut();
let rid = table.add("tcpListener", Box::new(listener_resource)); let rid = state
.resource_table
.add("tcpListener", Box::new(listener_resource));
debug!( debug!(
"New listener {} {}:{}", "New listener {} {}:{}",
rid, rid,

View file

@ -1,7 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value}; use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::ops::json_op; use crate::ops::json_op;
use crate::state::ThreadSafeState; use crate::state::State;
use atty; use atty;
use deno_core::*; use deno_core::*;
use std::collections::HashMap; use std::collections::HashMap;
@ -10,7 +10,7 @@ use std::io::{Error, ErrorKind};
use sys_info; use sys_info;
use url::Url; use url::Url;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op("exit", s.core_op(json_op(s.stateful_op(op_exit)))); i.register_op("exit", s.core_op(json_op(s.stateful_op(op_exit))));
i.register_op("is_tty", s.core_op(json_op(s.stateful_op(op_is_tty)))); i.register_op("is_tty", s.core_op(json_op(s.stateful_op(op_is_tty))));
i.register_op("env", s.core_op(json_op(s.stateful_op(op_env)))); i.register_op("env", s.core_op(json_op(s.stateful_op(op_env))));
@ -27,7 +27,7 @@ struct GetDirArgs {
} }
fn op_get_dir( fn op_get_dir(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -73,7 +73,7 @@ fn op_get_dir(
} }
fn op_exec_path( fn op_exec_path(
state: &ThreadSafeState, state: &State,
_args: Value, _args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -93,7 +93,7 @@ struct SetEnv {
} }
fn op_set_env( fn op_set_env(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -104,7 +104,7 @@ fn op_set_env(
} }
fn op_env( fn op_env(
state: &ThreadSafeState, state: &State,
_args: Value, _args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -119,7 +119,7 @@ struct GetEnv {
} }
fn op_get_env( fn op_get_env(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -138,7 +138,7 @@ struct Exit {
} }
fn op_exit( fn op_exit(
_s: &ThreadSafeState, _s: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -147,7 +147,7 @@ fn op_exit(
} }
fn op_is_tty( fn op_is_tty(
_s: &ThreadSafeState, _s: &State,
_args: Value, _args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -159,7 +159,7 @@ fn op_is_tty(
} }
fn op_hostname( fn op_hostname(
state: &ThreadSafeState, state: &State,
_args: Value, _args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {

View file

@ -3,11 +3,11 @@ use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::deno_error::other_error; use crate::deno_error::other_error;
use crate::fs as deno_fs; use crate::fs as deno_fs;
use crate::ops::json_op; use crate::ops::json_op;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::*; use deno_core::*;
use std::path::Path; use std::path::Path;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op( i.register_op(
"query_permission", "query_permission",
s.core_op(json_op(s.stateful_op(op_query_permission))), s.core_op(json_op(s.stateful_op(op_query_permission))),
@ -38,14 +38,14 @@ fn resolve_path(path: &str) -> String {
} }
pub fn op_query_permission( pub fn op_query_permission(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let args: PermissionArgs = serde_json::from_value(args)?; let args: PermissionArgs = serde_json::from_value(args)?;
let permissions = state.permissions.lock().unwrap(); let state = state.borrow();
let resolved_path = args.path.as_ref().map(String::as_str).map(resolve_path); let resolved_path = args.path.as_ref().map(String::as_str).map(resolve_path);
let perm = permissions.get_permission_state( let perm = state.permissions.get_permission_state(
&args.name, &args.name,
&args.url.as_ref().map(String::as_str), &args.url.as_ref().map(String::as_str),
&resolved_path.as_ref().map(String::as_str).map(Path::new), &resolved_path.as_ref().map(String::as_str).map(Path::new),
@ -54,12 +54,13 @@ pub fn op_query_permission(
} }
pub fn op_revoke_permission( pub fn op_revoke_permission(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let args: PermissionArgs = serde_json::from_value(args)?; let args: PermissionArgs = serde_json::from_value(args)?;
let mut permissions = state.permissions.lock().unwrap(); let mut state = state.borrow_mut();
let permissions = &mut state.permissions;
match args.name.as_ref() { match args.name.as_ref() {
"run" => permissions.allow_run.revoke(), "run" => permissions.allow_run.revoke(),
"read" => permissions.allow_read.revoke(), "read" => permissions.allow_read.revoke(),
@ -80,12 +81,13 @@ pub fn op_revoke_permission(
} }
pub fn op_request_permission( pub fn op_request_permission(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let args: PermissionArgs = serde_json::from_value(args)?; let args: PermissionArgs = serde_json::from_value(args)?;
let mut permissions = state.permissions.lock().unwrap(); let mut state = state.borrow_mut();
let permissions = &mut state.permissions;
let resolved_path = args.path.as_ref().map(String::as_str).map(resolve_path); let resolved_path = args.path.as_ref().map(String::as_str).map(resolve_path);
let perm = match args.name.as_ref() { let perm = match args.name.as_ref() {
"run" => Ok(permissions.request_run()), "run" => Ok(permissions.request_run()),

View file

@ -1,7 +1,7 @@
use super::dispatch_json::{Deserialize, JsonOp, Value}; use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::fs as deno_fs; use crate::fs as deno_fs;
use crate::ops::json_op; use crate::ops::json_op;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::*; use deno_core::*;
use dlopen::symbor::Library; use dlopen::symbor::Library;
use std::collections::HashMap; use std::collections::HashMap;
@ -9,11 +9,7 @@ use std::ffi::OsStr;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
pub fn init( pub fn init(i: &mut Isolate, s: &State, r: Arc<deno_core::OpRegistry>) {
i: &mut Isolate,
s: &ThreadSafeState,
r: Arc<deno_core::OpRegistry>,
) {
let r_ = r; let r_ = r;
i.register_op( i.register_op(
"open_plugin", "open_plugin",
@ -56,7 +52,7 @@ struct OpenPluginArgs {
pub fn op_open_plugin( pub fn op_open_plugin(
registry: &Arc<deno_core::OpRegistry>, registry: &Arc<deno_core::OpRegistry>,
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -70,9 +66,14 @@ pub fn op_open_plugin(
lib, lib,
ops: HashMap::new(), ops: HashMap::new(),
}; };
let mut table = state.lock_resource_table(); let mut state_ = state.borrow_mut();
let rid = table.add("plugin", Box::new(plugin_resource)); let rid = state_
let plugin_resource = table.get_mut::<PluginResource>(rid).unwrap(); .resource_table
.add("plugin", Box::new(plugin_resource));
let plugin_resource = state_
.resource_table
.get_mut::<PluginResource>(rid)
.unwrap();
let init_fn = *unsafe { let init_fn = *unsafe {
plugin_resource plugin_resource

View file

@ -4,12 +4,11 @@ use super::io::StreamResource;
use crate::deno_error::bad_resource; use crate::deno_error::bad_resource;
use crate::ops::json_op; use crate::ops::json_op;
use crate::signal::kill; use crate::signal::kill;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::*; use deno_core::*;
use futures; use futures;
use futures::future::FutureExt; use futures::future::FutureExt;
use futures::future::TryFutureExt; use futures::future::TryFutureExt;
use futures::task::SpawnExt;
use std; use std;
use std::convert::From; use std::convert::From;
use std::future::Future; use std::future::Future;
@ -22,7 +21,7 @@ use tokio::process::Command;
#[cfg(unix)] #[cfg(unix)]
use std::os::unix::process::ExitStatusExt; use std::os::unix::process::ExitStatusExt;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op("run", s.core_op(json_op(s.stateful_op(op_run)))); i.register_op("run", s.core_op(json_op(s.stateful_op(op_run))));
i.register_op( i.register_op(
"run_status", "run_status",
@ -31,12 +30,10 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op("kill", s.core_op(json_op(s.stateful_op(op_kill)))); i.register_op("kill", s.core_op(json_op(s.stateful_op(op_kill))));
} }
fn clone_file( fn clone_file(rid: u32, state: &State) -> Result<std::fs::File, ErrBox> {
rid: u32, let mut state = state.borrow_mut();
state: &ThreadSafeState, let repr = state
) -> Result<std::fs::File, ErrBox> { .resource_table
let mut table = state.lock_resource_table();
let repr = table
.get_mut::<StreamResource>(rid) .get_mut::<StreamResource>(rid)
.ok_or_else(bad_resource)?; .ok_or_else(bad_resource)?;
let file = match repr { let file = match repr {
@ -76,7 +73,7 @@ struct ChildResource {
} }
fn op_run( fn op_run(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -131,7 +128,8 @@ fn op_run(
let mut child = c.spawn()?; let mut child = c.spawn()?;
let pid = child.id(); let pid = child.id();
let mut table = state_.lock_resource_table(); let mut state = state_.borrow_mut();
let table = &mut state.resource_table;
let stdin_rid = match child.stdin.take() { let stdin_rid = match child.stdin.take() {
Some(child_stdin) => { Some(child_stdin) => {
@ -180,7 +178,7 @@ fn op_run(
pub struct ChildStatus { pub struct ChildStatus {
rid: ResourceId, rid: ResourceId,
state: ThreadSafeState, state: State,
} }
impl Future for ChildStatus { impl Future for ChildStatus {
@ -188,8 +186,9 @@ impl Future for ChildStatus {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut(); let inner = self.get_mut();
let mut table = inner.state.lock_resource_table(); let mut state = inner.state.borrow_mut();
let child_resource = table let child_resource = state
.resource_table
.get_mut::<ChildResource>(inner.rid) .get_mut::<ChildResource>(inner.rid)
.ok_or_else(bad_resource)?; .ok_or_else(bad_resource)?;
let child = &mut child_resource.child; let child = &mut child_resource.child;
@ -204,7 +203,7 @@ struct RunStatusArgs {
} }
fn op_run_status( fn op_run_status(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -239,10 +238,7 @@ fn op_run_status(
})) }))
}; };
let pool = futures::executor::ThreadPool::new().unwrap(); Ok(JsonOp::Async(future.boxed_local()))
let handle = pool.spawn_with_handle(future).unwrap();
Ok(JsonOp::Async(handle.boxed()))
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -252,7 +248,7 @@ struct KillArgs {
} }
fn op_kill( fn op_kill(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {

View file

@ -1,12 +1,12 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{JsonOp, Value}; use super::dispatch_json::{JsonOp, Value};
use crate::ops::json_op; use crate::ops::json_op;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::*; use deno_core::*;
use rand::thread_rng; use rand::thread_rng;
use rand::Rng; use rand::Rng;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op( i.register_op(
"get_random_values", "get_random_values",
s.core_op(json_op(s.stateful_op(op_get_random_values))), s.core_op(json_op(s.stateful_op(op_get_random_values))),
@ -14,15 +14,14 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
} }
fn op_get_random_values( fn op_get_random_values(
state: &ThreadSafeState, state: &State,
_args: Value, _args: Value,
zero_copy: Option<ZeroCopyBuf>, zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
assert!(zero_copy.is_some()); assert!(zero_copy.is_some());
if let Some(ref seeded_rng) = state.seeded_rng { if let Some(ref mut seeded_rng) = state.borrow_mut().seeded_rng {
let mut rng = seeded_rng.lock().unwrap(); seeded_rng.fill(&mut zero_copy.unwrap()[..]);
rng.fill(&mut zero_copy.unwrap()[..]);
} else { } else {
let mut rng = thread_rng(); let mut rng = thread_rng();
rng.fill(&mut zero_copy.unwrap()[..]); rng.fill(&mut zero_copy.unwrap()[..]);

View file

@ -4,12 +4,12 @@ use crate::deno_error::bad_resource;
use crate::ops::json_op; use crate::ops::json_op;
use crate::repl; use crate::repl;
use crate::repl::Repl; use crate::repl::Repl;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::*; use deno_core::*;
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex; use std::sync::Mutex;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op( i.register_op(
"repl_start", "repl_start",
s.core_op(json_op(s.stateful_op(op_repl_start))), s.core_op(json_op(s.stateful_op(op_repl_start))),
@ -29,7 +29,7 @@ struct ReplStartArgs {
} }
fn op_repl_start( fn op_repl_start(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -37,11 +37,11 @@ fn op_repl_start(
debug!("op_repl_start {}", args.history_file); debug!("op_repl_start {}", args.history_file);
let history_path = let history_path =
repl::history_path(&state.global_state.dir, &args.history_file); repl::history_path(&state.borrow().global_state.dir, &args.history_file);
let repl = repl::Repl::new(history_path); let repl = repl::Repl::new(history_path);
let mut state = state.borrow_mut();
let resource = ReplResource(Arc::new(Mutex::new(repl))); let resource = ReplResource(Arc::new(Mutex::new(repl)));
let mut table = state.lock_resource_table(); let rid = state.resource_table.add("repl", Box::new(resource));
let rid = table.add("repl", Box::new(resource));
Ok(JsonOp::Sync(json!(rid))) Ok(JsonOp::Sync(json!(rid)))
} }
@ -52,7 +52,7 @@ struct ReplReadlineArgs {
} }
fn op_repl_readline( fn op_repl_readline(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -60,12 +60,14 @@ fn op_repl_readline(
let rid = args.rid as u32; let rid = args.rid as u32;
let prompt = args.prompt; let prompt = args.prompt;
debug!("op_repl_readline {} {}", rid, prompt); debug!("op_repl_readline {} {}", rid, prompt);
let state = state.clone(); let state = state.borrow();
let resource = state
.resource_table
.get::<ReplResource>(rid)
.ok_or_else(bad_resource)?;
let repl = resource.0.clone();
blocking_json(false, move || { blocking_json(false, move || {
let table = state.lock_resource_table();
let resource = table.get::<ReplResource>(rid).ok_or_else(bad_resource)?;
let repl = resource.0.clone();
let line = repl.lock().unwrap().readline(&prompt)?; let line = repl.lock().unwrap().readline(&prompt)?;
Ok(json!(line)) Ok(json!(line))
}) })

View file

@ -1,19 +1,19 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{JsonOp, Value}; use super::dispatch_json::{JsonOp, Value};
use crate::ops::json_op; use crate::ops::json_op;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::*; use deno_core::*;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op("resources", s.core_op(json_op(s.stateful_op(op_resources)))); i.register_op("resources", s.core_op(json_op(s.stateful_op(op_resources))));
} }
fn op_resources( fn op_resources(
state: &ThreadSafeState, state: &State,
_args: Value, _args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let resource_table = state.lock_resource_table(); let state = state.borrow();
let serialized_resources = resource_table.entries(); let serialized_resources = state.resource_table.entries();
Ok(JsonOp::Sync(json!(serialized_resources))) Ok(JsonOp::Sync(json!(serialized_resources)))
} }

View file

@ -3,7 +3,7 @@ use super::dispatch_json::{JsonOp, Value};
use crate::colors; use crate::colors;
use crate::fs as deno_fs; use crate::fs as deno_fs;
use crate::ops::json_op; use crate::ops::json_op;
use crate::state::ThreadSafeState; use crate::state::State;
use crate::version; use crate::version;
use crate::DenoSubcommand; use crate::DenoSubcommand;
use deno_core::*; use deno_core::*;
@ -19,15 +19,16 @@ static BUILD_OS: &str = "win";
#[cfg(target_arch = "x86_64")] #[cfg(target_arch = "x86_64")]
static BUILD_ARCH: &str = "x64"; static BUILD_ARCH: &str = "x64";
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op("start", s.core_op(json_op(s.stateful_op(op_start)))); i.register_op("start", s.core_op(json_op(s.stateful_op(op_start))));
} }
fn op_start( fn op_start(
state: &ThreadSafeState, state: &State,
_args: Value, _args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let state = state.borrow();
let gs = &state.global_state; let gs = &state.global_state;
Ok(JsonOp::Sync(json!({ Ok(JsonOp::Sync(json!({

View file

@ -3,11 +3,11 @@ use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::compilers::runtime_compile_async; use crate::compilers::runtime_compile_async;
use crate::compilers::runtime_transpile_async; use crate::compilers::runtime_transpile_async;
use crate::ops::json_op; use crate::ops::json_op;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::*; use deno_core::*;
use std::collections::HashMap; use std::collections::HashMap;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op("compile", s.core_op(json_op(s.stateful_op(op_compile)))); i.register_op("compile", s.core_op(json_op(s.stateful_op(op_compile))));
i.register_op("transpile", s.core_op(json_op(s.stateful_op(op_transpile)))); i.register_op("transpile", s.core_op(json_op(s.stateful_op(op_transpile))));
} }
@ -22,13 +22,13 @@ struct CompileArgs {
} }
fn op_compile( fn op_compile(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let args: CompileArgs = serde_json::from_value(args)?; let args: CompileArgs = serde_json::from_value(args)?;
Ok(JsonOp::Async(runtime_compile_async( Ok(JsonOp::Async(runtime_compile_async(
state.global_state.clone(), state.borrow().global_state.clone(),
&args.root_name, &args.root_name,
&args.sources, &args.sources,
args.bundle, args.bundle,
@ -43,13 +43,13 @@ struct TranspileArgs {
} }
fn op_transpile( fn op_transpile(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let args: TranspileArgs = serde_json::from_value(args)?; let args: TranspileArgs = serde_json::from_value(args)?;
Ok(JsonOp::Async(runtime_transpile_async( Ok(JsonOp::Async(runtime_transpile_async(
state.global_state.clone(), state.borrow().global_state.clone(),
&args.sources, &args.sources,
&args.options, &args.options,
))) )))

View file

@ -1,7 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{JsonOp, Value}; use super::dispatch_json::{JsonOp, Value};
use crate::ops::json_op; use crate::ops::json_op;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::*; use deno_core::*;
#[cfg(unix)] #[cfg(unix)]
@ -17,7 +17,7 @@ use std::task::Waker;
#[cfg(unix)] #[cfg(unix)]
use tokio::signal::unix::{signal, Signal, SignalKind}; use tokio::signal::unix::{signal, Signal, SignalKind};
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op( i.register_op(
"signal_bind", "signal_bind",
s.core_op(json_op(s.stateful_op(op_signal_bind))), s.core_op(json_op(s.stateful_op(op_signal_bind))),
@ -51,13 +51,13 @@ struct SignalArgs {
#[cfg(unix)] #[cfg(unix)]
fn op_signal_bind( fn op_signal_bind(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let args: BindSignalArgs = serde_json::from_value(args)?; let args: BindSignalArgs = serde_json::from_value(args)?;
let mut table = state.lock_resource_table(); let mut state = state.borrow_mut();
let rid = table.add( let rid = state.resource_table.add(
"signal", "signal",
Box::new(SignalStreamResource( Box::new(SignalStreamResource(
signal(SignalKind::from_raw(args.signo)).expect(""), signal(SignalKind::from_raw(args.signo)).expect(""),
@ -71,7 +71,7 @@ fn op_signal_bind(
#[cfg(unix)] #[cfg(unix)]
fn op_signal_poll( fn op_signal_poll(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -80,8 +80,10 @@ fn op_signal_poll(
let state_ = state.clone(); let state_ = state.clone();
let future = poll_fn(move |cx| { let future = poll_fn(move |cx| {
let mut table = state_.lock_resource_table(); let mut state = state_.borrow_mut();
if let Some(mut signal) = table.get_mut::<SignalStreamResource>(rid) { if let Some(mut signal) =
state.resource_table.get_mut::<SignalStreamResource>(rid)
{
signal.1 = Some(cx.waker().clone()); signal.1 = Some(cx.waker().clone());
return signal.0.poll_recv(cx); return signal.0.poll_recv(cx);
} }
@ -94,14 +96,14 @@ fn op_signal_poll(
#[cfg(unix)] #[cfg(unix)]
pub fn op_signal_unbind( pub fn op_signal_unbind(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let args: SignalArgs = serde_json::from_value(args)?; let args: SignalArgs = serde_json::from_value(args)?;
let rid = args.rid as u32; let rid = args.rid as u32;
let mut table = state.lock_resource_table(); let mut state = state.borrow_mut();
let resource = table.get::<SignalStreamResource>(rid); let resource = state.resource_table.get::<SignalStreamResource>(rid);
if let Some(signal) = resource { if let Some(signal) = resource {
if let Some(waker) = &signal.1 { if let Some(waker) = &signal.1 {
// Wakes up the pending poll if exists. // Wakes up the pending poll if exists.
@ -109,13 +111,13 @@ pub fn op_signal_unbind(
waker.clone().wake(); waker.clone().wake();
} }
} }
table.close(rid).ok_or_else(bad_resource)?; state.resource_table.close(rid).ok_or_else(bad_resource)?;
Ok(JsonOp::Sync(json!({}))) Ok(JsonOp::Sync(json!({})))
} }
#[cfg(not(unix))] #[cfg(not(unix))]
pub fn op_signal_bind( pub fn op_signal_bind(
_state: &ThreadSafeState, _state: &State,
_args: Value, _args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -124,7 +126,7 @@ pub fn op_signal_bind(
#[cfg(not(unix))] #[cfg(not(unix))]
fn op_signal_unbind( fn op_signal_unbind(
_state: &ThreadSafeState, _state: &State,
_args: Value, _args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -133,7 +135,7 @@ fn op_signal_unbind(
#[cfg(not(unix))] #[cfg(not(unix))]
fn op_signal_poll( fn op_signal_poll(
_state: &ThreadSafeState, _state: &State,
_args: Value, _args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {

View file

@ -1,14 +1,14 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value}; use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::ops::json_op; use crate::ops::json_op;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::*; use deno_core::*;
use futures::future::FutureExt; use futures::future::FutureExt;
use std; use std;
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op( i.register_op(
"global_timer_stop", "global_timer_stop",
s.core_op(json_op(s.stateful_op(op_global_timer_stop))), s.core_op(json_op(s.stateful_op(op_global_timer_stop))),
@ -21,13 +21,12 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
} }
fn op_global_timer_stop( fn op_global_timer_stop(
state: &ThreadSafeState, state: &State,
_args: Value, _args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let state = state; let mut state = state.borrow_mut();
let mut t = state.global_timer.lock().unwrap(); state.global_timer.cancel();
t.cancel();
Ok(JsonOp::Sync(json!({}))) Ok(JsonOp::Sync(json!({})))
} }
@ -37,17 +36,17 @@ struct GlobalTimerArgs {
} }
fn op_global_timer( fn op_global_timer(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let args: GlobalTimerArgs = serde_json::from_value(args)?; let args: GlobalTimerArgs = serde_json::from_value(args)?;
let val = args.timeout; let val = args.timeout;
let state = state; let mut state = state.borrow_mut();
let mut t = state.global_timer.lock().unwrap();
let deadline = Instant::now() + Duration::from_millis(val); let deadline = Instant::now() + Duration::from_millis(val);
let f = t let f = state
.global_timer
.new_timeout(deadline) .new_timeout(deadline)
.then(move |_| futures::future::ok(json!({}))); .then(move |_| futures::future::ok(json!({})));
@ -59,19 +58,19 @@ fn op_global_timer(
// If the High precision flag is not set, the // If the High precision flag is not set, the
// nanoseconds are rounded on 2ms. // nanoseconds are rounded on 2ms.
fn op_now( fn op_now(
state: &ThreadSafeState, state: &State,
_args: Value, _args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let state = state.borrow();
let seconds = state.start_time.elapsed().as_secs(); let seconds = state.start_time.elapsed().as_secs();
let mut subsec_nanos = state.start_time.elapsed().subsec_nanos(); let mut subsec_nanos = state.start_time.elapsed().subsec_nanos();
let reduced_time_precision = 2_000_000; // 2ms in nanoseconds let reduced_time_precision = 2_000_000; // 2ms in nanoseconds
let permissions = state.permissions.lock().unwrap();
// If the permission is not enabled // If the permission is not enabled
// Round the nano result on 2 milliseconds // Round the nano result on 2 milliseconds
// see: https://developer.mozilla.org/en-US/docs/Web/API/DOMHighResTimeStamp#Reduced_time_precision // see: https://developer.mozilla.org/en-US/docs/Web/API/DOMHighResTimeStamp#Reduced_time_precision
if !permissions.allow_hrtime.is_allow() { if !state.permissions.allow_hrtime.is_allow() {
subsec_nanos -= subsec_nanos % reduced_time_precision subsec_nanos -= subsec_nanos % reduced_time_precision
} }

View file

@ -6,7 +6,7 @@ use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind; use crate::deno_error::ErrorKind;
use crate::ops::json_op; use crate::ops::json_op;
use crate::resolve_addr::resolve_addr; use crate::resolve_addr::resolve_addr;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::*; use deno_core::*;
use futures::future::FutureExt; use futures::future::FutureExt;
use std; use std;
@ -35,7 +35,7 @@ use webpki;
use webpki::DNSNameRef; use webpki::DNSNameRef;
use webpki_roots; use webpki_roots;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op( i.register_op(
"connect_tls", "connect_tls",
s.core_op(json_op(s.stateful_op(op_connect_tls))), s.core_op(json_op(s.stateful_op(op_connect_tls))),
@ -60,7 +60,7 @@ struct ConnectTLSArgs {
} }
pub fn op_connect_tls( pub fn op_connect_tls(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -95,8 +95,8 @@ pub fn op_connect_tls(
let dnsname = let dnsname =
DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?; let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
let mut table = state_.lock_resource_table(); let mut state = state_.borrow_mut();
let rid = table.add( let rid = state.resource_table.add(
"clientTlsStream", "clientTlsStream",
Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))), Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
); );
@ -241,7 +241,7 @@ struct ListenTlsArgs {
} }
fn op_listen_tls( fn op_listen_tls(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -270,8 +270,10 @@ fn op_listen_tls(
waker: None, waker: None,
local_addr, local_addr,
}; };
let mut table = state.lock_resource_table(); let mut state = state.borrow_mut();
let rid = table.add("tlsListener", Box::new(tls_listener_resource)); let rid = state
.resource_table
.add("tlsListener", Box::new(tls_listener_resource));
Ok(JsonOp::Sync(json!({ Ok(JsonOp::Sync(json!({
"rid": rid, "rid": rid,
@ -290,7 +292,7 @@ enum AcceptTlsState {
} }
/// Simply accepts a TLS connection. /// Simply accepts a TLS connection.
pub fn accept_tls(state: &ThreadSafeState, rid: ResourceId) -> AcceptTls { pub fn accept_tls(state: &State, rid: ResourceId) -> AcceptTls {
AcceptTls { AcceptTls {
accept_state: AcceptTlsState::Pending, accept_state: AcceptTlsState::Pending,
rid, rid,
@ -302,7 +304,7 @@ pub fn accept_tls(state: &ThreadSafeState, rid: ResourceId) -> AcceptTls {
pub struct AcceptTls { pub struct AcceptTls {
accept_state: AcceptTlsState, accept_state: AcceptTlsState,
rid: ResourceId, rid: ResourceId,
state: ThreadSafeState, state: State,
} }
impl Future for AcceptTls { impl Future for AcceptTls {
@ -314,8 +316,9 @@ impl Future for AcceptTls {
panic!("poll AcceptTls after it's done"); panic!("poll AcceptTls after it's done");
} }
let mut table = inner.state.lock_resource_table(); let mut state = inner.state.borrow_mut();
let listener_resource = table let listener_resource = state
.resource_table
.get_mut::<TlsListenerResource>(inner.rid) .get_mut::<TlsListenerResource>(inner.rid)
.ok_or_else(|| { .ok_or_else(|| {
let e = std::io::Error::new( let e = std::io::Error::new(
@ -352,7 +355,7 @@ struct AcceptTlsArgs {
} }
fn op_accept_tls( fn op_accept_tls(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -364,8 +367,9 @@ fn op_accept_tls(
let local_addr = tcp_stream.local_addr()?; let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?; let remote_addr = tcp_stream.peer_addr()?;
let tls_acceptor = { let tls_acceptor = {
let table = state.lock_resource_table(); let state = state.borrow();
let resource = table let resource = state
.resource_table
.get::<TlsListenerResource>(rid) .get::<TlsListenerResource>(rid)
.ok_or_else(bad_resource) .ok_or_else(bad_resource)
.expect("Can't find tls listener"); .expect("Can't find tls listener");
@ -373,8 +377,8 @@ fn op_accept_tls(
}; };
let tls_stream = tls_acceptor.accept(tcp_stream).await?; let tls_stream = tls_acceptor.accept(tcp_stream).await?;
let rid = { let rid = {
let mut table = state.lock_resource_table(); let mut state = state.borrow_mut();
table.add( state.resource_table.add(
"serverTlsStream", "serverTlsStream",
Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))), Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
) )

View file

@ -3,14 +3,14 @@ use super::dispatch_json::{JsonOp, Value};
use crate::deno_error::DenoError; use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind; use crate::deno_error::ErrorKind;
use crate::ops::json_op; use crate::ops::json_op;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core::*; use deno_core::*;
use futures; use futures;
use futures::future::FutureExt; use futures::future::FutureExt;
use std; use std;
use std::convert::From; use std::convert::From;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op( i.register_op(
"worker_post_message", "worker_post_message",
s.core_op(json_op(s.stateful_op(op_worker_post_message))), s.core_op(json_op(s.stateful_op(op_worker_post_message))),
@ -23,14 +23,21 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
/// Get message from host as guest worker /// Get message from host as guest worker
fn op_worker_get_message( fn op_worker_get_message(
state: &ThreadSafeState, state: &State,
_args: Value, _args: Value,
_data: Option<ZeroCopyBuf>, _data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let state_ = state.clone(); let state_ = state.clone();
let op = async move { let op = async move {
let c = state_.worker_channels_internal.lock().unwrap(); let fut = {
let maybe_buf = c.as_ref().unwrap().get_message().await; let state = state_.borrow();
state
.worker_channels_internal
.as_ref()
.unwrap()
.get_message()
};
let maybe_buf = fut.await;
debug!("op_worker_get_message"); debug!("op_worker_get_message");
Ok(json!({ "data": maybe_buf })) Ok(json!({ "data": maybe_buf }))
}; };
@ -40,13 +47,17 @@ fn op_worker_get_message(
/// Post message to host as guest worker /// Post message to host as guest worker
fn op_worker_post_message( fn op_worker_post_message(
state: &ThreadSafeState, state: &State,
_args: Value, _args: Value,
data: Option<ZeroCopyBuf>, data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let c = state.worker_channels_internal.lock().unwrap(); let state = state.borrow();
let fut = c.as_ref().unwrap().post_message(d); let fut = state
.worker_channels_internal
.as_ref()
.unwrap()
.post_message(d);
futures::executor::block_on(fut) futures::executor::block_on(fut)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;

View file

@ -4,11 +4,11 @@ use crate::deno_error::bad_resource;
use crate::deno_error::js_check; use crate::deno_error::js_check;
use crate::deno_error::DenoError; use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind; use crate::deno_error::ErrorKind;
use crate::ops::dispatch_json::JsonResult;
use crate::ops::json_op; use crate::ops::json_op;
use crate::startup_data; use crate::startup_data;
use crate::state::ThreadSafeState; use crate::state::State;
use crate::web_worker::WebWorker; use crate::web_worker::WebWorker;
use crate::worker::WorkerChannelsExternal;
use deno_core::*; use deno_core::*;
use futures; use futures;
use futures::future::FutureExt; use futures::future::FutureExt;
@ -17,7 +17,7 @@ use std;
use std::convert::From; use std::convert::From;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op( i.register_op(
"create_worker", "create_worker",
s.core_op(json_op(s.stateful_op(op_create_worker))), s.core_op(json_op(s.stateful_op(op_create_worker))),
@ -48,7 +48,7 @@ struct CreateWorkerArgs {
/// Create worker as the host /// Create worker as the host
fn op_create_worker( fn op_create_worker(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_data: Option<ZeroCopyBuf>, _data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -59,37 +59,31 @@ fn op_create_worker(
let source_code = args.source_code.clone(); let source_code = args.source_code.clone();
let args_name = args.name; let args_name = args.name;
let parent_state = state.clone(); let parent_state = state.clone();
let state = state.borrow();
let global_state = state.global_state.clone();
let child_permissions = state.permissions.clone();
let referrer = state.main_module.to_string();
drop(state);
let (load_sender, load_receiver) = let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<JsonResult>(1); std::sync::mpsc::sync_channel::<Result<WorkerChannelsExternal, ErrBox>>(1);
// TODO(bartlomieju): Isn't this wrong?
let result = ModuleSpecifier::resolve_url_or_path(&specifier)?;
let module_specifier = if !has_source_code {
ModuleSpecifier::resolve_import(&specifier, &referrer)?
} else {
result
};
std::thread::spawn(move || { std::thread::spawn(move || {
// TODO(bartlomieju): Isn't this wrong? let result = State::new_for_worker(
let result = ModuleSpecifier::resolve_url_or_path(&specifier); global_state,
if let Err(err) = result { Some(child_permissions), // by default share with parent
load_sender.send(Err(err.into())).unwrap();
return;
}
let module_specifier = if !has_source_code {
let referrer = parent_state.main_module.to_string();
let result = ModuleSpecifier::resolve_import(&specifier, &referrer);
if let Err(err) = result {
load_sender.send(Err(err.into())).unwrap();
return;
}
result.unwrap()
} else {
result.unwrap()
};
let result = ThreadSafeState::new_for_worker(
parent_state.global_state.clone(),
Some(parent_state.permissions.clone()), // by default share with parent
module_specifier.clone(), module_specifier.clone(),
); );
if let Err(err) = result { if let Err(err) = result {
load_sender.send(Err(err)).unwrap(); handle_sender.send(Err(err)).unwrap();
return; return;
} }
let child_state = result.unwrap(); let child_state = result.unwrap();
@ -109,12 +103,12 @@ fn op_create_worker(
js_check(worker.execute(&script)); js_check(worker.execute(&script));
js_check(worker.execute("runWorkerMessageLoop()")); js_check(worker.execute("runWorkerMessageLoop()"));
let worker_id = parent_state.add_child_worker(&worker); handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
// Has provided source code, execute immediately. // Has provided source code, execute immediately.
if has_source_code { if has_source_code {
js_check(worker.execute(&source_code)); js_check(worker.execute(&source_code));
load_sender.send(Ok(json!({ "id": worker_id }))).unwrap(); // FIXME(bartlomieju): runtime is not run in this case
return; return;
} }
@ -128,14 +122,13 @@ fn op_create_worker(
} }
.boxed_local(); .boxed_local();
load_sender.send(Ok(json!({ "id": worker_id }))).unwrap();
crate::tokio_util::run_basic(fut); crate::tokio_util::run_basic(fut);
}); });
let r = load_receiver.recv().unwrap(); let handle = handle_receiver.recv().unwrap()?;
let worker_id = parent_state.add_child_worker(handle);
Ok(JsonOp::Sync(r.unwrap())) Ok(JsonOp::Sync(json!({ "id": worker_id })))
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -144,16 +137,15 @@ struct WorkerArgs {
} }
fn op_host_close_worker( fn op_host_close_worker(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_data: Option<ZeroCopyBuf>, _data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let args: WorkerArgs = serde_json::from_value(args)?; let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32; let id = args.id as u32;
let state_ = state.clone(); let mut state = state.borrow_mut();
let mut workers_table = state_.workers.lock().unwrap(); let maybe_worker_handle = state.workers.remove(&id);
let maybe_worker_handle = workers_table.remove(&id);
if let Some(worker_handle) = maybe_worker_handle { if let Some(worker_handle) = maybe_worker_handle {
let mut sender = worker_handle.sender.clone(); let mut sender = worker_handle.sender.clone();
sender.close_channel(); sender.close_channel();
@ -173,16 +165,16 @@ struct HostGetMessageArgs {
/// Get message from guest worker as host /// Get message from guest worker as host
fn op_host_get_message( fn op_host_get_message(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
_data: Option<ZeroCopyBuf>, _data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let args: HostGetMessageArgs = serde_json::from_value(args)?; let args: HostGetMessageArgs = serde_json::from_value(args)?;
let state_ = state.clone();
let id = args.id as u32; let id = args.id as u32;
let mut table = state_.workers.lock().unwrap();
let state = state.borrow();
// TODO: don't return bad resource anymore // TODO: don't return bad resource anymore
let worker_handle = table.get_mut(&id).ok_or_else(bad_resource)?; let worker_handle = state.workers.get(&id).ok_or_else(bad_resource)?;
let fut = worker_handle.get_message(); let fut = worker_handle.get_message();
let op = async move { let op = async move {
let maybe_buf = fut.await; let maybe_buf = fut.await;
@ -198,7 +190,7 @@ struct HostPostMessageArgs {
/// Post message to guest worker as host /// Post message to guest worker as host
fn op_host_post_message( fn op_host_post_message(
state: &ThreadSafeState, state: &State,
args: Value, args: Value,
data: Option<ZeroCopyBuf>, data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
@ -207,9 +199,9 @@ fn op_host_post_message(
let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
debug!("post message to worker {}", id); debug!("post message to worker {}", id);
let mut table = state.workers.lock().unwrap(); let state = state.borrow();
// TODO: don't return bad resource anymore // TODO: don't return bad resource anymore
let worker_handle = table.get_mut(&id).ok_or_else(bad_resource)?; let worker_handle = state.workers.get(&id).ok_or_else(bad_resource)?;
let fut = worker_handle let fut = worker_handle
.post_message(msg) .post_message(msg)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string())); .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()));
@ -218,10 +210,11 @@ fn op_host_post_message(
} }
fn op_metrics( fn op_metrics(
state: &ThreadSafeState, state: &State,
_args: Value, _args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> { ) -> Result<JsonOp, ErrBox> {
let state = state.borrow();
let m = &state.metrics; let m = &state.metrics;
Ok(JsonOp::Sync(json!({ Ok(JsonOp::Sync(json!({

View file

@ -55,7 +55,7 @@ impl fmt::Debug for Shell {
enum ShellOut { enum ShellOut {
/// A plain write object without color support /// A plain write object without color support
// TODO(ry) Disabling this type of output because it makes Shell // TODO(ry) Disabling this type of output because it makes Shell
// not thread safe and thus not includable in ThreadSafeState. // not thread safe and thus not includable in State.
// But I think we will want this in the future. // But I think we will want this in the future.
//Write(Box<dyn Write>), //Write(Box<dyn Write>),
/// Color-enabled stdio, with information on whether color should be used /// Color-enabled stdio, with information on whether color should be used

View file

@ -8,7 +8,6 @@ use crate::metrics::Metrics;
use crate::ops::JsonOp; use crate::ops::JsonOp;
use crate::ops::MinimalOp; use crate::ops::MinimalOp;
use crate::permissions::DenoPermissions; use crate::permissions::DenoPermissions;
use crate::web_worker::WebWorker;
use crate::worker::WorkerChannelsExternal; use crate::worker::WorkerChannelsExternal;
use crate::worker::WorkerChannelsInternal; use crate::worker::WorkerChannelsInternal;
use deno_core::Buf; use deno_core::Buf;
@ -19,68 +18,53 @@ use deno_core::ModuleSpecifier;
use deno_core::Op; use deno_core::Op;
use deno_core::ResourceTable; use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf; use deno_core::ZeroCopyBuf;
use futures::channel::mpsc;
use futures::future::FutureExt; use futures::future::FutureExt;
use futures::future::TryFutureExt; use futures::future::TryFutureExt;
use rand::rngs::StdRng; use rand::rngs::StdRng;
use rand::SeedableRng; use rand::SeedableRng;
use serde_json::Value; use serde_json::Value;
use std; use std;
use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use std::ops::Deref; use std::ops::Deref;
use std::path::Path; use std::path::Path;
use std::pin::Pin; use std::pin::Pin;
use std::rc::Rc;
use std::str; use std::str;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::time::Instant; use std::time::Instant;
/// Isolate cannot be passed between threads but ThreadSafeState can. #[derive(Clone)]
/// ThreadSafeState satisfies Send and Sync. So any state that needs to be pub struct State(Rc<RefCell<StateInner>>);
/// accessed outside the main V8 thread should be inside ThreadSafeState.
pub struct ThreadSafeState(Arc<State>);
#[cfg_attr(feature = "cargo-clippy", allow(stutter))] impl Deref for State {
pub struct State { type Target = Rc<RefCell<StateInner>>;
pub global_state: GlobalState,
pub permissions: Arc<Mutex<DenoPermissions>>,
pub main_module: ModuleSpecifier,
/// When flags contains a `.import_map_path` option, the content of the
/// import map file will be resolved and set.
pub import_map: Option<ImportMap>,
pub metrics: Metrics,
pub global_timer: Mutex<GlobalTimer>,
pub workers: Mutex<HashMap<u32, WorkerChannelsExternal>>,
pub worker_channels_internal: Mutex<Option<WorkerChannelsInternal>>,
pub loading_workers: Mutex<HashMap<u32, mpsc::Receiver<Result<(), ErrBox>>>>,
pub next_worker_id: AtomicUsize,
pub start_time: Instant,
pub seeded_rng: Option<Mutex<StdRng>>,
pub resource_table: Mutex<ResourceTable>,
pub target_lib: TargetLib,
}
impl Clone for ThreadSafeState {
fn clone(&self) -> Self {
ThreadSafeState(self.0.clone())
}
}
impl Deref for ThreadSafeState {
type Target = Arc<State>;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
&self.0 &self.0
} }
} }
impl ThreadSafeState { #[cfg_attr(feature = "cargo-clippy", allow(stutter))]
pub fn lock_resource_table(&self) -> MutexGuard<ResourceTable> { pub struct StateInner {
self.resource_table.lock().unwrap() pub global_state: GlobalState,
} pub permissions: DenoPermissions,
pub main_module: ModuleSpecifier,
/// When flags contains a `.import_map_path` option, the content of the
/// import map file will be resolved and set.
pub import_map: Option<ImportMap>,
pub metrics: Metrics,
pub global_timer: GlobalTimer,
pub workers: HashMap<u32, WorkerChannelsExternal>,
pub worker_channels_internal: Option<WorkerChannelsInternal>,
pub next_worker_id: AtomicUsize,
pub start_time: Instant,
pub seeded_rng: Option<StdRng>,
pub resource_table: ResourceTable,
pub target_lib: TargetLib,
}
impl State {
/// Wrap core `OpDispatcher` to collect metrics. /// Wrap core `OpDispatcher` to collect metrics.
pub fn core_op<D>( pub fn core_op<D>(
&self, &self,
@ -130,7 +114,7 @@ impl ThreadSafeState {
dispatcher: D, dispatcher: D,
) -> impl Fn(i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>> ) -> impl Fn(i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>>
where where
D: Fn(&ThreadSafeState, i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>>, D: Fn(&State, i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>>,
{ {
let state = self.clone(); let state = self.clone();
@ -149,11 +133,7 @@ impl ThreadSafeState {
dispatcher: D, dispatcher: D,
) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, ErrBox> ) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, ErrBox>
where where
D: Fn( D: Fn(&State, Value, Option<ZeroCopyBuf>) -> Result<JsonOp, ErrBox>,
&ThreadSafeState,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox>,
{ {
let state = self.clone(); let state = self.clone();
@ -163,7 +143,7 @@ impl ThreadSafeState {
} }
} }
impl Loader for ThreadSafeState { impl Loader for State {
fn resolve( fn resolve(
&self, &self,
specifier: &str, specifier: &str,
@ -171,7 +151,7 @@ impl Loader for ThreadSafeState {
is_main: bool, is_main: bool,
) -> Result<ModuleSpecifier, ErrBox> { ) -> Result<ModuleSpecifier, ErrBox> {
if !is_main { if !is_main {
if let Some(import_map) = &self.import_map { if let Some(import_map) = &self.borrow().import_map {
let result = import_map.resolve(specifier, referrer)?; let result = import_map.resolve(specifier, referrer)?;
if let Some(r) = result { if let Some(r) = result {
return Ok(r); return Ok(r);
@ -198,11 +178,12 @@ impl Loader for ThreadSafeState {
} }
} }
let state = self.borrow();
// TODO(bartlomieju): incrementing resolve_count here has no sense... // TODO(bartlomieju): incrementing resolve_count here has no sense...
self.metrics.resolve_count.fetch_add(1, Ordering::SeqCst); state.metrics.resolve_count.fetch_add(1, Ordering::SeqCst);
let module_url_specified = module_specifier.to_string(); let module_url_specified = module_specifier.to_string();
let global_state = self.global_state.clone(); let global_state = state.global_state.clone();
let target_lib = self.target_lib.clone(); let target_lib = state.target_lib.clone();
let fut = async move { let fut = async move {
let compiled_module = global_state let compiled_module = global_state
.fetch_compiled_module(module_specifier, maybe_referrer, target_lib) .fetch_compiled_module(module_specifier, maybe_referrer, target_lib)
@ -220,11 +201,11 @@ impl Loader for ThreadSafeState {
} }
} }
impl ThreadSafeState { impl State {
/// If `shared_permission` is None then permissions from globa state are used. /// If `shared_permission` is None then permissions from globa state are used.
pub fn new( pub fn new(
global_state: GlobalState, global_state: GlobalState,
shared_permissions: Option<Arc<Mutex<DenoPermissions>>>, shared_permissions: Option<DenoPermissions>,
main_module: ModuleSpecifier, main_module: ModuleSpecifier,
) -> Result<Self, ErrBox> { ) -> Result<Self, ErrBox> {
let import_map: Option<ImportMap> = let import_map: Option<ImportMap> =
@ -234,116 +215,114 @@ impl ThreadSafeState {
}; };
let seeded_rng = match global_state.flags.seed { let seeded_rng = match global_state.flags.seed {
Some(seed) => Some(Mutex::new(StdRng::seed_from_u64(seed))), Some(seed) => Some(StdRng::seed_from_u64(seed)),
None => None, None => None,
}; };
let permissions = if let Some(perm) = shared_permissions { let permissions = if let Some(perm) = shared_permissions {
perm perm
} else { } else {
Arc::new(Mutex::new(global_state.permissions.clone())) global_state.permissions.clone()
}; };
let state = State { let state = Rc::new(RefCell::new(StateInner {
global_state, global_state,
main_module, main_module,
permissions, permissions,
import_map, import_map,
metrics: Metrics::default(), metrics: Metrics::default(),
global_timer: Mutex::new(GlobalTimer::new()), global_timer: GlobalTimer::new(),
worker_channels_internal: Mutex::new(None), worker_channels_internal: None,
workers: Mutex::new(HashMap::new()), workers: HashMap::new(),
loading_workers: Mutex::new(HashMap::new()),
next_worker_id: AtomicUsize::new(0), next_worker_id: AtomicUsize::new(0),
start_time: Instant::now(), start_time: Instant::now(),
seeded_rng, seeded_rng,
resource_table: Mutex::new(ResourceTable::default()), resource_table: ResourceTable::default(),
target_lib: TargetLib::Main, target_lib: TargetLib::Main,
}; }));
Ok(ThreadSafeState(Arc::new(state))) Ok(Self(state))
} }
/// If `shared_permission` is None then permissions from globa state are used. /// If `shared_permission` is None then permissions from globa state are used.
pub fn new_for_worker( pub fn new_for_worker(
global_state: GlobalState, global_state: GlobalState,
shared_permissions: Option<Arc<Mutex<DenoPermissions>>>, shared_permissions: Option<DenoPermissions>,
main_module: ModuleSpecifier, main_module: ModuleSpecifier,
) -> Result<Self, ErrBox> { ) -> Result<Self, ErrBox> {
let seeded_rng = match global_state.flags.seed { let seeded_rng = match global_state.flags.seed {
Some(seed) => Some(Mutex::new(StdRng::seed_from_u64(seed))), Some(seed) => Some(StdRng::seed_from_u64(seed)),
None => None, None => None,
}; };
let permissions = if let Some(perm) = shared_permissions { let permissions = if let Some(perm) = shared_permissions {
perm perm
} else { } else {
Arc::new(Mutex::new(global_state.permissions.clone())) global_state.permissions.clone()
}; };
let state = State { let state = Rc::new(RefCell::new(StateInner {
global_state, global_state,
main_module, main_module,
permissions, permissions,
import_map: None, import_map: None,
metrics: Metrics::default(), metrics: Metrics::default(),
global_timer: Mutex::new(GlobalTimer::new()), global_timer: GlobalTimer::new(),
worker_channels_internal: Mutex::new(None), worker_channels_internal: None,
workers: Mutex::new(HashMap::new()), workers: HashMap::new(),
loading_workers: Mutex::new(HashMap::new()),
next_worker_id: AtomicUsize::new(0), next_worker_id: AtomicUsize::new(0),
start_time: Instant::now(), start_time: Instant::now(),
seeded_rng, seeded_rng,
resource_table: Mutex::new(ResourceTable::default()), resource_table: ResourceTable::default(),
target_lib: TargetLib::Worker, target_lib: TargetLib::Worker,
}; }));
Ok(ThreadSafeState(Arc::new(state))) Ok(Self(state))
} }
pub fn add_child_worker(&self, worker: &WebWorker) -> u32 { pub fn add_child_worker(&self, handle: WorkerChannelsExternal) -> u32 {
let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32; let mut inner_state = self.borrow_mut();
let handle = worker.thread_safe_handle(); let worker_id =
let mut workers_tl = self.workers.lock().unwrap(); inner_state.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32;
workers_tl.insert(worker_id, handle); inner_state.workers.insert(worker_id, handle);
worker_id worker_id
} }
#[inline] #[inline]
pub fn check_read(&self, path: &Path) -> Result<(), ErrBox> { pub fn check_read(&self, path: &Path) -> Result<(), ErrBox> {
self.permissions.lock().unwrap().check_read(path) self.borrow().permissions.check_read(path)
} }
#[inline] #[inline]
pub fn check_write(&self, path: &Path) -> Result<(), ErrBox> { pub fn check_write(&self, path: &Path) -> Result<(), ErrBox> {
self.permissions.lock().unwrap().check_write(path) self.borrow().permissions.check_write(path)
} }
#[inline] #[inline]
pub fn check_env(&self) -> Result<(), ErrBox> { pub fn check_env(&self) -> Result<(), ErrBox> {
self.permissions.lock().unwrap().check_env() self.borrow().permissions.check_env()
} }
#[inline] #[inline]
pub fn check_net(&self, hostname: &str, port: u16) -> Result<(), ErrBox> { pub fn check_net(&self, hostname: &str, port: u16) -> Result<(), ErrBox> {
self.permissions.lock().unwrap().check_net(hostname, port) self.borrow().permissions.check_net(hostname, port)
} }
#[inline] #[inline]
pub fn check_net_url(&self, url: &url::Url) -> Result<(), ErrBox> { pub fn check_net_url(&self, url: &url::Url) -> Result<(), ErrBox> {
self.permissions.lock().unwrap().check_net_url(url) self.borrow().permissions.check_net_url(url)
} }
#[inline] #[inline]
pub fn check_run(&self) -> Result<(), ErrBox> { pub fn check_run(&self) -> Result<(), ErrBox> {
self.permissions.lock().unwrap().check_run() self.borrow().permissions.check_run()
} }
#[inline] #[inline]
pub fn check_plugin(&self, filename: &Path) -> Result<(), ErrBox> { pub fn check_plugin(&self, filename: &Path) -> Result<(), ErrBox> {
self.permissions.lock().unwrap().check_plugin(filename) self.borrow().permissions.check_plugin(filename)
} }
pub fn check_dyn_import( pub fn check_dyn_import(
@ -371,10 +350,10 @@ impl ThreadSafeState {
} }
#[cfg(test)] #[cfg(test)]
pub fn mock(main_module: &str) -> ThreadSafeState { pub fn mock(main_module: &str) -> State {
let module_specifier = ModuleSpecifier::resolve_url_or_path(main_module) let module_specifier = ModuleSpecifier::resolve_url_or_path(main_module)
.expect("Invalid entry module"); .expect("Invalid entry module");
ThreadSafeState::new( State::new(
GlobalState::mock(vec!["deno".to_string()]), GlobalState::mock(vec!["deno".to_string()]),
None, None,
module_specifier, module_specifier,
@ -387,28 +366,24 @@ impl ThreadSafeState {
bytes_sent_control: usize, bytes_sent_control: usize,
bytes_sent_data: usize, bytes_sent_data: usize,
) { ) {
self.metrics.ops_dispatched.fetch_add(1, Ordering::SeqCst); let state = self.borrow();
self state.metrics.ops_dispatched.fetch_add(1, Ordering::SeqCst);
state
.metrics .metrics
.bytes_sent_control .bytes_sent_control
.fetch_add(bytes_sent_control, Ordering::SeqCst); .fetch_add(bytes_sent_control, Ordering::SeqCst);
self state
.metrics .metrics
.bytes_sent_data .bytes_sent_data
.fetch_add(bytes_sent_data, Ordering::SeqCst); .fetch_add(bytes_sent_data, Ordering::SeqCst);
} }
pub fn metrics_op_completed(&self, bytes_received: usize) { pub fn metrics_op_completed(&self, bytes_received: usize) {
self.metrics.ops_completed.fetch_add(1, Ordering::SeqCst); let state = self.borrow();
self state.metrics.ops_completed.fetch_add(1, Ordering::SeqCst);
state
.metrics .metrics
.bytes_received .bytes_received
.fetch_add(bytes_received, Ordering::SeqCst); .fetch_add(bytes_received, Ordering::SeqCst);
} }
} }
#[test]
fn thread_safe() {
fn f<S: Send + Sync>(_: S) {}
f(ThreadSafeState::mock("./hello.js"));
}

View file

@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::ops; use crate::ops;
use crate::state::ThreadSafeState; use crate::state::State;
use crate::worker::Worker; use crate::worker::Worker;
use deno_core; use deno_core;
use deno_core::ErrBox; use deno_core::ErrBox;
@ -23,11 +23,7 @@ use std::task::Poll;
pub struct WebWorker(Worker); pub struct WebWorker(Worker);
impl WebWorker { impl WebWorker {
pub fn new( pub fn new(name: String, startup_data: StartupData, state: State) -> Self {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
) -> Self {
let state_ = state.clone(); let state_ = state.clone();
let mut worker = Worker::new(name, startup_data, state_); let mut worker = Worker::new(name, startup_data, state_);
{ {
@ -70,11 +66,11 @@ impl Future for WebWorker {
mod tests { mod tests {
use super::*; use super::*;
use crate::startup_data; use crate::startup_data;
use crate::state::ThreadSafeState; use crate::state::State;
use crate::tokio_util; use crate::tokio_util;
fn create_test_worker() -> WebWorker { fn create_test_worker() -> WebWorker {
let state = ThreadSafeState::mock("./hello.js"); let state = State::mock("./hello.js");
let mut worker = WebWorker::new( let mut worker = WebWorker::new(
"TEST".to_string(), "TEST".to_string(),
startup_data::deno_isolate_init(), startup_data::deno_isolate_init(),
@ -104,48 +100,53 @@ mod tests {
worker.execute(source).unwrap(); worker.execute(source).unwrap();
let handle = worker.thread_safe_handle(); let handle = worker.thread_safe_handle();
let _ = tokio_util::spawn_thread(move || tokio_util::run_basic(worker)); let _ = tokio_util::spawn_thread(move || {
tokio_util::run_basic(async move {
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = handle.post_message(msg.clone()).await;
assert!(r.is_ok());
tokio_util::run_basic(async move { let maybe_msg = handle.get_message().await;
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); assert!(maybe_msg.is_some());
let r = handle.post_message(msg.clone()).await;
assert!(r.is_ok());
let maybe_msg = handle.get_message().await; let r = handle.post_message(msg.clone()).await;
assert!(maybe_msg.is_some()); assert!(r.is_ok());
let r = handle.post_message(msg.clone()).await; let maybe_msg = handle.get_message().await;
assert!(r.is_ok()); assert!(maybe_msg.is_some());
assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
let maybe_msg = handle.get_message().await; let msg = json!("exit")
assert!(maybe_msg.is_some()); .to_string()
assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); .into_boxed_str()
.into_boxed_bytes();
let msg = json!("exit") let r = handle.post_message(msg).await;
.to_string() assert!(r.is_ok());
.into_boxed_str() })
.into_boxed_bytes();
let r = handle.post_message(msg).await;
assert!(r.is_ok());
}); });
let r = tokio_util::run_basic(worker);
assert!(r.is_ok())
} }
#[test] #[test]
fn removed_from_resource_table_on_close() { fn removed_from_resource_table_on_close() {
let mut worker = create_test_worker(); let mut worker = create_test_worker();
let handle = worker.thread_safe_handle(); let handle = worker.thread_safe_handle();
let worker_complete_fut = tokio_util::spawn_thread(move || {
worker worker
.execute("onmessage = () => { delete self.onmessage; }") .execute("onmessage = () => { delete self.onmessage; }")
.unwrap(); .unwrap();
tokio_util::run_basic(worker)
let worker_post_message_fut = tokio_util::spawn_thread(move || {
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = futures::executor::block_on(handle.post_message(msg));
assert!(r.is_ok());
}); });
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
tokio_util::run_basic(async move { tokio_util::run_basic(async move {
let r = handle.post_message(msg).await; worker_post_message_fut.await;
assert!(r.is_ok()); let r = worker.await;
let r = worker_complete_fut.await;
assert!(r.is_ok()); assert!(r.is_ok());
}); });
} }

View file

@ -1,7 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::fmt_errors::JSError; use crate::fmt_errors::JSError;
use crate::ops; use crate::ops;
use crate::state::ThreadSafeState; use crate::state::State;
use deno_core; use deno_core;
use deno_core::Buf; use deno_core::Buf;
use deno_core::ErrBox; use deno_core::ErrBox;
@ -112,28 +112,24 @@ fn create_channels() -> (WorkerChannelsInternal, WorkerChannelsExternal) {
pub struct Worker { pub struct Worker {
pub name: String, pub name: String,
pub isolate: Box<deno_core::EsIsolate>, pub isolate: Box<deno_core::EsIsolate>,
pub state: ThreadSafeState, pub state: State,
external_channels: WorkerChannelsExternal, external_channels: WorkerChannelsExternal,
} }
impl Worker { impl Worker {
pub fn new( pub fn new(name: String, startup_data: StartupData, state: State) -> Self {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
) -> Self {
let mut isolate = let mut isolate =
deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false); deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false);
let global_state_ = state.global_state.clone(); let global_state_ = state.borrow().global_state.clone();
isolate.set_js_error_create(move |v8_exception| { isolate.set_js_error_create(move |v8_exception| {
JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler) JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
}); });
let (internal_channels, external_channels) = create_channels(); let (internal_channels, external_channels) = create_channels();
{ {
let mut c = state.worker_channels_internal.lock().unwrap(); let mut state = state.borrow_mut();
*c = Some(internal_channels); state.worker_channels_internal = Some(internal_channels);
} }
Self { Self {
@ -170,7 +166,7 @@ impl Worker {
) -> Result<(), ErrBox> { ) -> Result<(), ErrBox> {
let specifier = module_specifier.to_string(); let specifier = module_specifier.to_string();
let id = self.isolate.load_module(&specifier, maybe_code).await?; let id = self.isolate.load_module(&specifier, maybe_code).await?;
self.state.global_state.progress.done(); self.state.borrow().global_state.progress.done();
if !is_prefetch { if !is_prefetch {
return self.isolate.mod_evaluate(id); return self.isolate.mod_evaluate(id);
} }
@ -203,11 +199,7 @@ impl Future for Worker {
pub struct MainWorker(Worker); pub struct MainWorker(Worker);
impl MainWorker { impl MainWorker {
pub fn new( pub fn new(name: String, startup_data: StartupData, state: State) -> Self {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
) -> Self {
let state_ = state.clone(); let state_ = state.clone();
let mut worker = Worker::new(name, startup_data, state_); let mut worker = Worker::new(name, startup_data, state_);
{ {
@ -257,7 +249,7 @@ mod tests {
use crate::flags; use crate::flags;
use crate::global_state::GlobalState; use crate::global_state::GlobalState;
use crate::startup_data; use crate::startup_data;
use crate::state::ThreadSafeState; use crate::state::State;
use crate::tokio_util; use crate::tokio_util;
use futures::executor::block_on; use futures::executor::block_on;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
@ -280,8 +272,7 @@ mod tests {
ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap(); ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap();
let global_state = GlobalState::new(flags::DenoFlags::default()).unwrap(); let global_state = GlobalState::new(flags::DenoFlags::default()).unwrap();
let state = let state =
ThreadSafeState::new(global_state, None, module_specifier.clone()) State::new(global_state, None, module_specifier.clone()).unwrap();
.unwrap();
let state_ = state.clone(); let state_ = state.clone();
tokio_util::run_basic(async move { tokio_util::run_basic(async move {
let mut worker = let mut worker =
@ -296,8 +287,8 @@ mod tests {
panic!("Future got unexpected error: {:?}", e); panic!("Future got unexpected error: {:?}", e);
} }
}); });
let mut state = state_.borrow_mut();
let metrics = &state_.metrics; let metrics = &mut state.metrics;
assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 2); assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 2);
// Check that we didn't start the compiler. // Check that we didn't start the compiler.
assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 0); assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 0);
@ -313,8 +304,7 @@ mod tests {
ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap(); ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap();
let global_state = GlobalState::new(flags::DenoFlags::default()).unwrap(); let global_state = GlobalState::new(flags::DenoFlags::default()).unwrap();
let state = let state =
ThreadSafeState::new(global_state, None, module_specifier.clone()) State::new(global_state, None, module_specifier.clone()).unwrap();
.unwrap();
let state_ = state.clone(); let state_ = state.clone();
tokio_util::run_basic(async move { tokio_util::run_basic(async move {
let mut worker = let mut worker =
@ -330,7 +320,8 @@ mod tests {
} }
}); });
let metrics = &state_.metrics; let mut state = state_.borrow_mut();
let metrics = &mut state.metrics;
// TODO assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 2); // TODO assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 2);
// Check that we didn't start the compiler. // Check that we didn't start the compiler.
assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 0); assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 0);
@ -353,12 +344,8 @@ mod tests {
..flags::DenoFlags::default() ..flags::DenoFlags::default()
}; };
let global_state = GlobalState::new(flags).unwrap(); let global_state = GlobalState::new(flags).unwrap();
let state = ThreadSafeState::new( let state =
global_state.clone(), State::new(global_state.clone(), None, module_specifier.clone()).unwrap();
None,
module_specifier.clone(),
)
.unwrap();
let mut worker = MainWorker::new( let mut worker = MainWorker::new(
"TEST".to_string(), "TEST".to_string(),
startup_data::deno_isolate_init(), startup_data::deno_isolate_init(),
@ -374,6 +361,7 @@ mod tests {
if let Err(e) = (&mut *worker).await { if let Err(e) = (&mut *worker).await {
panic!("Future got unexpected error: {:?}", e); panic!("Future got unexpected error: {:?}", e);
} }
let state = state.borrow_mut();
assert_eq!(state.metrics.resolve_count.load(Ordering::SeqCst), 3); assert_eq!(state.metrics.resolve_count.load(Ordering::SeqCst), 3);
// Check that we've only invoked the compiler once. // Check that we've only invoked the compiler once.
assert_eq!( assert_eq!(
@ -384,7 +372,7 @@ mod tests {
} }
fn create_test_worker() -> MainWorker { fn create_test_worker() -> MainWorker {
let state = ThreadSafeState::mock("./hello.js"); let state = State::mock("./hello.js");
let mut worker = MainWorker::new( let mut worker = MainWorker::new(
"TEST".to_string(), "TEST".to_string(),
startup_data::deno_isolate_init(), startup_data::deno_isolate_init(),

View file

@ -23,7 +23,7 @@ use std::task::Poll;
pub type SourceCodeInfoFuture = pub type SourceCodeInfoFuture =
dyn Future<Output = Result<SourceCodeInfo, ErrBox>>; dyn Future<Output = Result<SourceCodeInfo, ErrBox>>;
pub trait Loader: Send { pub trait Loader {
/// Returns an absolute URL. /// Returns an absolute URL.
/// When implementing an spec-complaint VM, this should be exactly the /// When implementing an spec-complaint VM, this should be exactly the
/// algorithm described here: /// algorithm described here: