mirror of
https://github.com/denoland/deno.git
synced 2025-09-26 12:19:12 +00:00
feat: op registration in core (#3002)
This commit is contained in:
parent
ae26a9c7a2
commit
ffbf0c20cc
8 changed files with 207 additions and 91 deletions
|
@ -1,11 +1,6 @@
|
||||||
// This is not a real HTTP server. We read blindly one time into 'requestBuf',
|
// This is not a real HTTP server. We read blindly one time into 'requestBuf',
|
||||||
// then write this fixed 'responseBuf'. The point of this benchmark is to
|
// then write this fixed 'responseBuf'. The point of this benchmark is to
|
||||||
// exercise the event loop in a simple yet semi-realistic way.
|
// exercise the event loop in a simple yet semi-realistic way.
|
||||||
const OP_LISTEN = 1;
|
|
||||||
const OP_ACCEPT = 2;
|
|
||||||
const OP_READ = 3;
|
|
||||||
const OP_WRITE = 4;
|
|
||||||
const OP_CLOSE = 5;
|
|
||||||
const requestBuf = new Uint8Array(64 * 1024);
|
const requestBuf = new Uint8Array(64 * 1024);
|
||||||
const responseBuf = new Uint8Array(
|
const responseBuf = new Uint8Array(
|
||||||
"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n"
|
"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n"
|
||||||
|
@ -80,12 +75,12 @@ function handleAsyncMsgFromRust(opId, buf) {
|
||||||
|
|
||||||
/** Listens on 0.0.0.0:4500, returns rid. */
|
/** Listens on 0.0.0.0:4500, returns rid. */
|
||||||
function listen() {
|
function listen() {
|
||||||
return sendSync(OP_LISTEN, -1);
|
return sendSync(ops["listen"], -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Accepts a connection, returns rid. */
|
/** Accepts a connection, returns rid. */
|
||||||
async function accept(rid) {
|
async function accept(rid) {
|
||||||
return await sendAsync(OP_ACCEPT, rid);
|
return await sendAsync(ops["accept"], rid);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -93,16 +88,16 @@ async function accept(rid) {
|
||||||
* Returns bytes read.
|
* Returns bytes read.
|
||||||
*/
|
*/
|
||||||
async function read(rid, data) {
|
async function read(rid, data) {
|
||||||
return await sendAsync(OP_READ, rid, data);
|
return await sendAsync(ops["read"], rid, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */
|
/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */
|
||||||
async function write(rid, data) {
|
async function write(rid, data) {
|
||||||
return await sendAsync(OP_WRITE, rid, data);
|
return await sendAsync(ops["write"], rid, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
function close(rid) {
|
function close(rid) {
|
||||||
return sendSync(OP_CLOSE, rid);
|
return sendSync(ops["close"], rid);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function serve(rid) {
|
async function serve(rid) {
|
||||||
|
@ -120,8 +115,11 @@ async function serve(rid) {
|
||||||
close(rid);
|
close(rid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let ops;
|
||||||
|
|
||||||
async function main() {
|
async function main() {
|
||||||
Deno.core.setAsyncHandler(handleAsyncMsgFromRust);
|
Deno.core.setAsyncHandler(handleAsyncMsgFromRust);
|
||||||
|
ops = Deno.core.ops();
|
||||||
|
|
||||||
Deno.core.print("http_bench.js start\n");
|
Deno.core.print("http_bench.js start\n");
|
||||||
|
|
||||||
|
|
|
@ -36,12 +36,6 @@ impl log::Log for Logger {
|
||||||
fn flush(&self) {}
|
fn flush(&self) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
const OP_LISTEN: OpId = 1;
|
|
||||||
const OP_ACCEPT: OpId = 2;
|
|
||||||
const OP_READ: OpId = 3;
|
|
||||||
const OP_WRITE: OpId = 4;
|
|
||||||
const OP_CLOSE: OpId = 5;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
pub struct Record {
|
pub struct Record {
|
||||||
pub promise_id: i32,
|
pub promise_id: i32,
|
||||||
|
@ -104,48 +98,24 @@ fn test_record_from() {
|
||||||
// TODO test From<&[u8]> for Record
|
// TODO test From<&[u8]> for Record
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
|
pub type HttpOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
|
||||||
|
|
||||||
fn dispatch(
|
pub type HttpOpHandler =
|
||||||
op_id: OpId,
|
fn(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp>;
|
||||||
control: &[u8],
|
|
||||||
zero_copy_buf: Option<PinnedBuf>,
|
fn http_op(
|
||||||
) -> CoreOp {
|
handler: HttpOpHandler,
|
||||||
|
) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp {
|
||||||
|
move |control: &[u8], zero_copy_buf: Option<PinnedBuf>| -> CoreOp {
|
||||||
let record = Record::from(control);
|
let record = Record::from(control);
|
||||||
let is_sync = record.promise_id == 0;
|
let is_sync = record.promise_id == 0;
|
||||||
let http_bench_op = match op_id {
|
let op = handler(record.clone(), zero_copy_buf);
|
||||||
OP_LISTEN => {
|
|
||||||
assert!(is_sync);
|
|
||||||
op_listen()
|
|
||||||
}
|
|
||||||
OP_CLOSE => {
|
|
||||||
assert!(is_sync);
|
|
||||||
let rid = record.arg;
|
|
||||||
op_close(rid)
|
|
||||||
}
|
|
||||||
OP_ACCEPT => {
|
|
||||||
assert!(!is_sync);
|
|
||||||
let listener_rid = record.arg;
|
|
||||||
op_accept(listener_rid)
|
|
||||||
}
|
|
||||||
OP_READ => {
|
|
||||||
assert!(!is_sync);
|
|
||||||
let rid = record.arg;
|
|
||||||
op_read(rid, zero_copy_buf)
|
|
||||||
}
|
|
||||||
OP_WRITE => {
|
|
||||||
assert!(!is_sync);
|
|
||||||
let rid = record.arg;
|
|
||||||
op_write(rid, zero_copy_buf)
|
|
||||||
}
|
|
||||||
_ => panic!("bad op {}", op_id),
|
|
||||||
};
|
|
||||||
let mut record_a = record.clone();
|
let mut record_a = record.clone();
|
||||||
let mut record_b = record.clone();
|
let mut record_b = record.clone();
|
||||||
|
|
||||||
let fut = Box::new(
|
let fut = Box::new(
|
||||||
http_bench_op
|
op.and_then(move |result| {
|
||||||
.and_then(move |result| {
|
|
||||||
record_a.result = result;
|
record_a.result = result;
|
||||||
Ok(record_a)
|
Ok(record_a)
|
||||||
})
|
})
|
||||||
|
@ -165,6 +135,7 @@ fn dispatch(
|
||||||
} else {
|
} else {
|
||||||
Op::Async(fut)
|
Op::Async(fut)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
|
@ -181,7 +152,11 @@ fn main() {
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut isolate = deno::Isolate::new(startup_data, false);
|
let mut isolate = deno::Isolate::new(startup_data, false);
|
||||||
isolate.set_dispatch(dispatch);
|
isolate.register_op("listen", http_op(op_listen));
|
||||||
|
isolate.register_op("accept", http_op(op_accept));
|
||||||
|
isolate.register_op("read", http_op(op_read));
|
||||||
|
isolate.register_op("write", http_op(op_write));
|
||||||
|
isolate.register_op("close", http_op(op_close));
|
||||||
|
|
||||||
isolate.then(|r| {
|
isolate.then(|r| {
|
||||||
js_check(r);
|
js_check(r);
|
||||||
|
@ -225,7 +200,8 @@ fn new_rid() -> i32 {
|
||||||
rid as i32
|
rid as i32
|
||||||
}
|
}
|
||||||
|
|
||||||
fn op_accept(listener_rid: i32) -> Box<HttpBenchOp> {
|
fn op_accept(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
|
||||||
|
let listener_rid = record.arg;
|
||||||
debug!("accept {}", listener_rid);
|
debug!("accept {}", listener_rid);
|
||||||
Box::new(
|
Box::new(
|
||||||
futures::future::poll_fn(move || {
|
futures::future::poll_fn(move || {
|
||||||
|
@ -248,9 +224,11 @@ fn op_accept(listener_rid: i32) -> Box<HttpBenchOp> {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn op_listen() -> Box<HttpBenchOp> {
|
fn op_listen(
|
||||||
|
_record: Record,
|
||||||
|
_zero_copy_buf: Option<PinnedBuf>,
|
||||||
|
) -> Box<HttpOp> {
|
||||||
debug!("listen");
|
debug!("listen");
|
||||||
|
|
||||||
Box::new(lazy(move || {
|
Box::new(lazy(move || {
|
||||||
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
|
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
|
||||||
let listener = tokio::net::TcpListener::bind(&addr).unwrap();
|
let listener = tokio::net::TcpListener::bind(&addr).unwrap();
|
||||||
|
@ -262,8 +240,9 @@ fn op_listen() -> Box<HttpBenchOp> {
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn op_close(rid: i32) -> Box<HttpBenchOp> {
|
fn op_close(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
|
||||||
debug!("close");
|
debug!("close");
|
||||||
|
let rid = record.arg;
|
||||||
Box::new(lazy(move || {
|
Box::new(lazy(move || {
|
||||||
let mut table = RESOURCE_TABLE.lock().unwrap();
|
let mut table = RESOURCE_TABLE.lock().unwrap();
|
||||||
let r = table.remove(&rid);
|
let r = table.remove(&rid);
|
||||||
|
@ -272,7 +251,8 @@ fn op_close(rid: i32) -> Box<HttpBenchOp> {
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn op_read(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
|
fn op_read(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
|
||||||
|
let rid = record.arg;
|
||||||
debug!("read rid={}", rid);
|
debug!("read rid={}", rid);
|
||||||
let mut zero_copy_buf = zero_copy_buf.unwrap();
|
let mut zero_copy_buf = zero_copy_buf.unwrap();
|
||||||
Box::new(
|
Box::new(
|
||||||
|
@ -293,7 +273,8 @@ fn op_read(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn op_write(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
|
fn op_write(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
|
||||||
|
let rid = record.arg;
|
||||||
debug!("write rid={}", rid);
|
debug!("write rid={}", rid);
|
||||||
let zero_copy_buf = zero_copy_buf.unwrap();
|
let zero_copy_buf = zero_copy_buf.unwrap();
|
||||||
Box::new(
|
Box::new(
|
||||||
|
|
|
@ -13,10 +13,10 @@ use crate::libdeno::deno_buf;
|
||||||
use crate::libdeno::deno_dyn_import_id;
|
use crate::libdeno::deno_dyn_import_id;
|
||||||
use crate::libdeno::deno_mod;
|
use crate::libdeno::deno_mod;
|
||||||
use crate::libdeno::deno_pinned_buf;
|
use crate::libdeno::deno_pinned_buf;
|
||||||
use crate::libdeno::OpId;
|
|
||||||
use crate::libdeno::PinnedBuf;
|
use crate::libdeno::PinnedBuf;
|
||||||
use crate::libdeno::Snapshot1;
|
use crate::libdeno::Snapshot1;
|
||||||
use crate::libdeno::Snapshot2;
|
use crate::libdeno::Snapshot2;
|
||||||
|
use crate::ops::*;
|
||||||
use crate::shared_queue::SharedQueue;
|
use crate::shared_queue::SharedQueue;
|
||||||
use crate::shared_queue::RECOMMENDED_SIZE;
|
use crate::shared_queue::RECOMMENDED_SIZE;
|
||||||
use futures::stream::FuturesUnordered;
|
use futures::stream::FuturesUnordered;
|
||||||
|
@ -34,24 +34,6 @@ use std::fmt;
|
||||||
use std::ptr::null;
|
use std::ptr::null;
|
||||||
use std::sync::{Arc, Mutex, Once};
|
use std::sync::{Arc, Mutex, Once};
|
||||||
|
|
||||||
pub type Buf = Box<[u8]>;
|
|
||||||
|
|
||||||
pub type OpAsyncFuture<E> = Box<dyn Future<Item = Buf, Error = E> + Send>;
|
|
||||||
|
|
||||||
type PendingOpFuture =
|
|
||||||
Box<dyn Future<Item = (OpId, Buf), Error = CoreError> + Send>;
|
|
||||||
|
|
||||||
pub enum Op<E> {
|
|
||||||
Sync(Buf),
|
|
||||||
Async(OpAsyncFuture<E>),
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type CoreError = ();
|
|
||||||
|
|
||||||
pub type CoreOp = Op<CoreError>;
|
|
||||||
|
|
||||||
pub type OpResult<E> = Result<Op<E>, E>;
|
|
||||||
|
|
||||||
/// Args: op_id, control_buf, zero_copy_buf
|
/// Args: op_id, control_buf, zero_copy_buf
|
||||||
type CoreDispatchFn = dyn Fn(OpId, &[u8], Option<PinnedBuf>) -> CoreOp;
|
type CoreDispatchFn = dyn Fn(OpId, &[u8], Option<PinnedBuf>) -> CoreOp;
|
||||||
|
|
||||||
|
@ -179,6 +161,7 @@ pub struct Isolate {
|
||||||
pending_dyn_imports: FuturesUnordered<StreamFuture<DynImport>>,
|
pending_dyn_imports: FuturesUnordered<StreamFuture<DynImport>>,
|
||||||
have_unpolled_ops: bool,
|
have_unpolled_ops: bool,
|
||||||
startup_script: Option<OwnedScript>,
|
startup_script: Option<OwnedScript>,
|
||||||
|
op_registry: OpRegistry,
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe impl Send for Isolate {}
|
unsafe impl Send for Isolate {}
|
||||||
|
@ -244,12 +227,17 @@ impl Isolate {
|
||||||
have_unpolled_ops: false,
|
have_unpolled_ops: false,
|
||||||
pending_dyn_imports: FuturesUnordered::new(),
|
pending_dyn_imports: FuturesUnordered::new(),
|
||||||
startup_script,
|
startup_script,
|
||||||
|
op_registry: OpRegistry::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Defines the how Deno.core.dispatch() acts.
|
/// Defines the how Deno.core.dispatch() acts.
|
||||||
/// Called whenever Deno.core.dispatch() is called in JavaScript. zero_copy_buf
|
/// Called whenever Deno.core.dispatch() is called in JavaScript. zero_copy_buf
|
||||||
/// corresponds to the second argument of Deno.core.dispatch().
|
/// corresponds to the second argument of Deno.core.dispatch().
|
||||||
|
///
|
||||||
|
/// If this method is used then ops registered using `op_register` function are
|
||||||
|
/// ignored and all dispatching must be handled manually in provided callback.
|
||||||
|
// TODO: we want to deprecate and remove this API and move to `register_op` API
|
||||||
pub fn set_dispatch<F>(&mut self, f: F)
|
pub fn set_dispatch<F>(&mut self, f: F)
|
||||||
where
|
where
|
||||||
F: Fn(OpId, &[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
|
F: Fn(OpId, &[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
|
||||||
|
@ -257,6 +245,22 @@ impl Isolate {
|
||||||
self.dispatch = Some(Arc::new(f));
|
self.dispatch = Some(Arc::new(f));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// New dispatch mechanism. Requires runtime to explicitly ask for op ids
|
||||||
|
/// before using any of the ops.
|
||||||
|
///
|
||||||
|
/// Ops added using this method are only usable if `dispatch` is not set
|
||||||
|
/// (using `set_dispatch` method).
|
||||||
|
pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId
|
||||||
|
where
|
||||||
|
F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
assert!(
|
||||||
|
self.dispatch.is_none(),
|
||||||
|
"set_dispatch should not be used in conjunction with register_op"
|
||||||
|
);
|
||||||
|
self.op_registry.register(name, op)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn set_dyn_import<F>(&mut self, f: F)
|
pub fn set_dyn_import<F>(&mut self, f: F)
|
||||||
where
|
where
|
||||||
F: Fn(deno_dyn_import_id, &str, &str) -> DynImportStream
|
F: Fn(deno_dyn_import_id, &str, &str) -> DynImportStream
|
||||||
|
@ -329,9 +333,17 @@ impl Isolate {
|
||||||
let isolate = unsafe { Isolate::from_raw_ptr(user_data) };
|
let isolate = unsafe { Isolate::from_raw_ptr(user_data) };
|
||||||
|
|
||||||
let op = if let Some(ref f) = isolate.dispatch {
|
let op = if let Some(ref f) = isolate.dispatch {
|
||||||
|
assert!(
|
||||||
|
op_id != 0,
|
||||||
|
"op_id 0 is a special value that shouldn't be used with dispatch"
|
||||||
|
);
|
||||||
f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf))
|
f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf))
|
||||||
} else {
|
} else {
|
||||||
panic!("isolate.dispatch not set")
|
isolate.op_registry.call(
|
||||||
|
op_id,
|
||||||
|
control_buf.as_ref(),
|
||||||
|
PinnedBuf::new(zero_copy_buf),
|
||||||
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
debug_assert_eq!(isolate.shared.size(), 0);
|
debug_assert_eq!(isolate.shared.size(), 0);
|
||||||
|
|
|
@ -11,6 +11,7 @@ mod js_errors;
|
||||||
mod libdeno;
|
mod libdeno;
|
||||||
mod module_specifier;
|
mod module_specifier;
|
||||||
mod modules;
|
mod modules;
|
||||||
|
mod ops;
|
||||||
mod shared_queue;
|
mod shared_queue;
|
||||||
|
|
||||||
pub use crate::any_error::*;
|
pub use crate::any_error::*;
|
||||||
|
@ -22,6 +23,7 @@ pub use crate::libdeno::OpId;
|
||||||
pub use crate::libdeno::PinnedBuf;
|
pub use crate::libdeno::PinnedBuf;
|
||||||
pub use crate::module_specifier::*;
|
pub use crate::module_specifier::*;
|
||||||
pub use crate::modules::*;
|
pub use crate::modules::*;
|
||||||
|
pub use crate::ops::*;
|
||||||
|
|
||||||
pub fn v8_version() -> &'static str {
|
pub fn v8_version() -> &'static str {
|
||||||
use std::ffi::CStr;
|
use std::ffi::CStr;
|
||||||
|
|
111
core/ops.rs
Normal file
111
core/ops.rs
Normal file
|
@ -0,0 +1,111 @@
|
||||||
|
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
|
||||||
|
pub use crate::libdeno::OpId;
|
||||||
|
use crate::PinnedBuf;
|
||||||
|
use futures::Future;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
pub type Buf = Box<[u8]>;
|
||||||
|
|
||||||
|
pub type OpAsyncFuture<E> = Box<dyn Future<Item = Buf, Error = E> + Send>;
|
||||||
|
|
||||||
|
pub(crate) type PendingOpFuture =
|
||||||
|
Box<dyn Future<Item = (OpId, Buf), Error = CoreError> + Send>;
|
||||||
|
|
||||||
|
pub type OpResult<E> = Result<Op<E>, E>;
|
||||||
|
|
||||||
|
pub enum Op<E> {
|
||||||
|
Sync(Buf),
|
||||||
|
Async(OpAsyncFuture<E>),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type CoreError = ();
|
||||||
|
|
||||||
|
pub type CoreOp = Op<CoreError>;
|
||||||
|
|
||||||
|
/// Main type describing op
|
||||||
|
type OpDispatcher = dyn Fn(&[u8], Option<PinnedBuf>) -> CoreOp;
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct OpRegistry {
|
||||||
|
dispatchers: Vec<Box<OpDispatcher>>,
|
||||||
|
name_to_id: HashMap<String, OpId>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl OpRegistry {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
let mut registry = Self::default();
|
||||||
|
let op_id = registry.register("ops", |_, _| {
|
||||||
|
// ops is a special op which is handled in call.
|
||||||
|
unreachable!()
|
||||||
|
});
|
||||||
|
assert_eq!(op_id, 0);
|
||||||
|
registry
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register<F>(&mut self, name: &str, op: F) -> OpId
|
||||||
|
where
|
||||||
|
F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
let op_id = self.dispatchers.len() as u32;
|
||||||
|
|
||||||
|
let existing = self.name_to_id.insert(name.to_string(), op_id);
|
||||||
|
assert!(
|
||||||
|
existing.is_none(),
|
||||||
|
format!("Op already registered: {}", name)
|
||||||
|
);
|
||||||
|
|
||||||
|
self.dispatchers.push(Box::new(op));
|
||||||
|
op_id
|
||||||
|
}
|
||||||
|
|
||||||
|
fn json_map(&self) -> Buf {
|
||||||
|
let op_map_json = serde_json::to_string(&self.name_to_id).unwrap();
|
||||||
|
op_map_json.as_bytes().to_owned().into_boxed_slice()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn call(
|
||||||
|
&self,
|
||||||
|
op_id: OpId,
|
||||||
|
control: &[u8],
|
||||||
|
zero_copy_buf: Option<PinnedBuf>,
|
||||||
|
) -> CoreOp {
|
||||||
|
// Op with id 0 has special meaning - it's a special op that is always
|
||||||
|
// provided to retrieve op id map. The map consists of name to `OpId`
|
||||||
|
// mappings.
|
||||||
|
if op_id == 0 {
|
||||||
|
return Op::Sync(self.json_map());
|
||||||
|
}
|
||||||
|
|
||||||
|
let d = &*self.dispatchers.get(op_id as usize).expect("Op not found!");
|
||||||
|
d(control, zero_copy_buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_op_registry() {
|
||||||
|
use std::sync::atomic;
|
||||||
|
use std::sync::Arc;
|
||||||
|
let mut op_registry = OpRegistry::new();
|
||||||
|
|
||||||
|
let c = Arc::new(atomic::AtomicUsize::new(0));
|
||||||
|
let c_ = c.clone();
|
||||||
|
|
||||||
|
let test_id = op_registry.register("test", move |_, _| {
|
||||||
|
c_.fetch_add(1, atomic::Ordering::SeqCst);
|
||||||
|
CoreOp::Sync(Box::new([]))
|
||||||
|
});
|
||||||
|
assert!(test_id != 0);
|
||||||
|
|
||||||
|
let mut expected = HashMap::new();
|
||||||
|
expected.insert("ops".to_string(), 0);
|
||||||
|
expected.insert("test".to_string(), 1);
|
||||||
|
assert_eq!(op_registry.name_to_id, expected);
|
||||||
|
|
||||||
|
let res = op_registry.call(test_id, &[], None);
|
||||||
|
if let Op::Sync(buf) = res {
|
||||||
|
assert_eq!(buf.len(), 0);
|
||||||
|
} else {
|
||||||
|
unreachable!();
|
||||||
|
}
|
||||||
|
assert_eq!(c.load(atomic::Ordering::SeqCst), 1);
|
||||||
|
}
|
|
@ -58,6 +58,13 @@ SharedQueue Binary Layout
|
||||||
Deno.core.recv(handleAsyncMsgFromRust);
|
Deno.core.recv(handleAsyncMsgFromRust);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function ops() {
|
||||||
|
// op id 0 is a special value to retreive the map of registered ops.
|
||||||
|
const opsMapBytes = Deno.core.send(0, new Uint8Array([]), null);
|
||||||
|
const opsMapJson = String.fromCharCode.apply(null, opsMapBytes);
|
||||||
|
return JSON.parse(opsMapJson);
|
||||||
|
}
|
||||||
|
|
||||||
function assert(cond) {
|
function assert(cond) {
|
||||||
if (!cond) {
|
if (!cond) {
|
||||||
throw Error("assert");
|
throw Error("assert");
|
||||||
|
@ -84,7 +91,6 @@ SharedQueue Binary Layout
|
||||||
return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF];
|
return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF];
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(ry) rename to setMeta
|
|
||||||
function setMeta(index, end, opId) {
|
function setMeta(index, end, opId) {
|
||||||
shared32[INDEX_OFFSETS + 2 * index] = end;
|
shared32[INDEX_OFFSETS + 2 * index] = end;
|
||||||
shared32[INDEX_OFFSETS + 2 * index + 1] = opId;
|
shared32[INDEX_OFFSETS + 2 * index + 1] = opId;
|
||||||
|
@ -189,7 +195,8 @@ SharedQueue Binary Layout
|
||||||
push,
|
push,
|
||||||
reset,
|
reset,
|
||||||
shift
|
shift
|
||||||
}
|
},
|
||||||
|
ops
|
||||||
};
|
};
|
||||||
|
|
||||||
assert(window[GLOBAL_NAMESPACE] != null);
|
assert(window[GLOBAL_NAMESPACE] != null);
|
||||||
|
|
|
@ -196,7 +196,7 @@ impl SharedQueue {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::isolate::Buf;
|
use crate::ops::Buf;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn basic() {
|
fn basic() {
|
||||||
|
|
5
deno_typescript/lib.deno_core.d.ts
vendored
5
deno_typescript/lib.deno_core.d.ts
vendored
|
@ -37,6 +37,11 @@ declare interface DenoCore {
|
||||||
shift(): Uint8Array | null;
|
shift(): Uint8Array | null;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
ops: {
|
||||||
|
init(): void;
|
||||||
|
get(name: string): number;
|
||||||
|
};
|
||||||
|
|
||||||
recv(cb: MessageCallback): void;
|
recv(cb: MessageCallback): void;
|
||||||
|
|
||||||
send(
|
send(
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue