mirror of
https://github.com/denoland/deno.git
synced 2025-10-03 07:34:36 +00:00
Use gotham-like state for ops (#7385)
Provides a concrete state type that can be dynamically added. This is necessary for op crates. * renames BasicState to OpState * async ops take `Rc<RefCell<OpState>>` * sync ops take `&mut OpState` * removes `OpRegistry`, `OpRouter` traits * `get_error_class_fn` moved to OpState * ResourceTable moved to OpState
This commit is contained in:
parent
6f70e6e72b
commit
7c2e7c6608
44 changed files with 1576 additions and 1348 deletions
117
cli/ops/net.rs
117
cli/ops/net.rs
|
@ -2,14 +2,14 @@
|
|||
|
||||
use crate::ops::io::{StreamResource, StreamResourceHolder};
|
||||
use crate::resolve_addr::resolve_addr;
|
||||
use crate::state::State;
|
||||
use deno_core::BufVec;
|
||||
use deno_core::ErrBox;
|
||||
use deno_core::OpRegistry;
|
||||
use deno_core::OpState;
|
||||
use deno_core::ZeroCopyBuf;
|
||||
use futures::future::poll_fn;
|
||||
use serde_derive::Deserialize;
|
||||
use serde_json::Value;
|
||||
use std::cell::RefCell;
|
||||
use std::net::Shutdown;
|
||||
use std::net::SocketAddr;
|
||||
use std::rc::Rc;
|
||||
|
@ -22,13 +22,13 @@ use tokio::net::UdpSocket;
|
|||
#[cfg(unix)]
|
||||
use super::net_unix;
|
||||
|
||||
pub fn init(s: &Rc<State>) {
|
||||
s.register_op_json_async("op_accept", op_accept);
|
||||
s.register_op_json_async("op_connect", op_connect);
|
||||
s.register_op_json_sync("op_shutdown", op_shutdown);
|
||||
s.register_op_json_sync("op_listen", op_listen);
|
||||
s.register_op_json_async("op_datagram_receive", op_datagram_receive);
|
||||
s.register_op_json_async("op_datagram_send", op_datagram_send);
|
||||
pub fn init(rt: &mut deno_core::JsRuntime) {
|
||||
super::reg_json_async(rt, "op_accept", op_accept);
|
||||
super::reg_json_async(rt, "op_connect", op_connect);
|
||||
super::reg_json_sync(rt, "op_shutdown", op_shutdown);
|
||||
super::reg_json_sync(rt, "op_listen", op_listen);
|
||||
super::reg_json_async(rt, "op_datagram_receive", op_datagram_receive);
|
||||
super::reg_json_async(rt, "op_datagram_send", op_datagram_send);
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
|
@ -38,15 +38,16 @@ pub(crate) struct AcceptArgs {
|
|||
}
|
||||
|
||||
async fn accept_tcp(
|
||||
state: Rc<State>,
|
||||
state: Rc<RefCell<OpState>>,
|
||||
args: AcceptArgs,
|
||||
_zero_copy: BufVec,
|
||||
) -> Result<Value, ErrBox> {
|
||||
let rid = args.rid as u32;
|
||||
|
||||
let accept_fut = poll_fn(|cx| {
|
||||
let mut resource_table = state.resource_table.borrow_mut();
|
||||
let listener_resource = resource_table
|
||||
let mut state = state.borrow_mut();
|
||||
let listener_resource = state
|
||||
.resource_table
|
||||
.get_mut::<TcpListenerResource>(rid)
|
||||
.ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?;
|
||||
let listener = &mut listener_resource.listener;
|
||||
|
@ -68,7 +69,9 @@ async fn accept_tcp(
|
|||
let (tcp_stream, _socket_addr) = accept_fut.await?;
|
||||
let local_addr = tcp_stream.local_addr()?;
|
||||
let remote_addr = tcp_stream.peer_addr()?;
|
||||
let rid = state.resource_table.borrow_mut().add(
|
||||
|
||||
let mut state = state.borrow_mut();
|
||||
let rid = state.resource_table.add(
|
||||
"tcpStream",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
|
||||
tcp_stream,
|
||||
|
@ -90,7 +93,7 @@ async fn accept_tcp(
|
|||
}
|
||||
|
||||
async fn op_accept(
|
||||
state: Rc<State>,
|
||||
state: Rc<RefCell<OpState>>,
|
||||
args: Value,
|
||||
bufs: BufVec,
|
||||
) -> Result<Value, ErrBox> {
|
||||
|
@ -113,7 +116,7 @@ pub(crate) struct ReceiveArgs {
|
|||
}
|
||||
|
||||
async fn receive_udp(
|
||||
state: Rc<State>,
|
||||
state: Rc<RefCell<OpState>>,
|
||||
args: ReceiveArgs,
|
||||
zero_copy: BufVec,
|
||||
) -> Result<Value, ErrBox> {
|
||||
|
@ -123,8 +126,9 @@ async fn receive_udp(
|
|||
let rid = args.rid as u32;
|
||||
|
||||
let receive_fut = poll_fn(|cx| {
|
||||
let mut resource_table = state.resource_table.borrow_mut();
|
||||
let resource = resource_table
|
||||
let mut state = state.borrow_mut();
|
||||
let resource = state
|
||||
.resource_table
|
||||
.get_mut::<UdpSocketResource>(rid)
|
||||
.ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?;
|
||||
let socket = &mut resource.socket;
|
||||
|
@ -144,7 +148,7 @@ async fn receive_udp(
|
|||
}
|
||||
|
||||
async fn op_datagram_receive(
|
||||
state: Rc<State>,
|
||||
state: Rc<RefCell<OpState>>,
|
||||
args: Value,
|
||||
zero_copy: BufVec,
|
||||
) -> Result<Value, ErrBox> {
|
||||
|
@ -171,12 +175,13 @@ struct SendArgs {
|
|||
}
|
||||
|
||||
async fn op_datagram_send(
|
||||
state: Rc<State>,
|
||||
state: Rc<RefCell<OpState>>,
|
||||
args: Value,
|
||||
zero_copy: BufVec,
|
||||
) -> Result<Value, ErrBox> {
|
||||
assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
|
||||
let zero_copy = zero_copy[0].clone();
|
||||
let cli_state = super::cli_state2(&state);
|
||||
|
||||
match serde_json::from_value(args)? {
|
||||
SendArgs {
|
||||
|
@ -184,11 +189,12 @@ async fn op_datagram_send(
|
|||
transport,
|
||||
transport_args: ArgsEnum::Ip(args),
|
||||
} if transport == "udp" => {
|
||||
state.check_net(&args.hostname, args.port)?;
|
||||
cli_state.check_net(&args.hostname, args.port)?;
|
||||
let addr = resolve_addr(&args.hostname, args.port)?;
|
||||
poll_fn(move |cx| {
|
||||
let mut resource_table = state.resource_table.borrow_mut();
|
||||
let resource = resource_table
|
||||
let mut state = state.borrow_mut();
|
||||
let resource = state
|
||||
.resource_table
|
||||
.get_mut::<UdpSocketResource>(rid as u32)
|
||||
.ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?;
|
||||
resource
|
||||
|
@ -206,9 +212,10 @@ async fn op_datagram_send(
|
|||
transport_args: ArgsEnum::Unix(args),
|
||||
} if transport == "unixpacket" => {
|
||||
let address_path = net_unix::Path::new(&args.path);
|
||||
state.check_read(&address_path)?;
|
||||
let mut resource_table = state.resource_table.borrow_mut();
|
||||
let resource = resource_table
|
||||
cli_state.check_read(&address_path)?;
|
||||
let mut state = state.borrow_mut();
|
||||
let resource = state
|
||||
.resource_table
|
||||
.get_mut::<net_unix::UnixDatagramResource>(rid as u32)
|
||||
.ok_or_else(|| ErrBox::new("NotConnected", "Socket has been closed"))?;
|
||||
let socket = &mut resource.socket;
|
||||
|
@ -230,21 +237,24 @@ struct ConnectArgs {
|
|||
}
|
||||
|
||||
async fn op_connect(
|
||||
state: Rc<State>,
|
||||
state: Rc<RefCell<OpState>>,
|
||||
args: Value,
|
||||
_zero_copy: BufVec,
|
||||
) -> Result<Value, ErrBox> {
|
||||
let cli_state = super::cli_state2(&state);
|
||||
match serde_json::from_value(args)? {
|
||||
ConnectArgs {
|
||||
transport,
|
||||
transport_args: ArgsEnum::Ip(args),
|
||||
} if transport == "tcp" => {
|
||||
state.check_net(&args.hostname, args.port)?;
|
||||
cli_state.check_net(&args.hostname, args.port)?;
|
||||
let addr = resolve_addr(&args.hostname, args.port)?;
|
||||
let tcp_stream = TcpStream::connect(&addr).await?;
|
||||
let local_addr = tcp_stream.local_addr()?;
|
||||
let remote_addr = tcp_stream.peer_addr()?;
|
||||
let rid = state.resource_table.borrow_mut().add(
|
||||
|
||||
let mut state_ = state.borrow_mut();
|
||||
let rid = state_.resource_table.add(
|
||||
"tcpStream",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
|
||||
tcp_stream,
|
||||
|
@ -270,14 +280,16 @@ async fn op_connect(
|
|||
transport_args: ArgsEnum::Unix(args),
|
||||
} if transport == "unix" => {
|
||||
let address_path = net_unix::Path::new(&args.path);
|
||||
state.check_unstable("Deno.connect");
|
||||
state.check_read(&address_path)?;
|
||||
cli_state.check_unstable("Deno.connect");
|
||||
cli_state.check_read(&address_path)?;
|
||||
let path = args.path;
|
||||
let unix_stream =
|
||||
net_unix::UnixStream::connect(net_unix::Path::new(&path)).await?;
|
||||
let local_addr = unix_stream.local_addr()?;
|
||||
let remote_addr = unix_stream.peer_addr()?;
|
||||
let rid = state.resource_table.borrow_mut().add(
|
||||
|
||||
let mut state_ = state.borrow_mut();
|
||||
let rid = state_.resource_table.add(
|
||||
"unixStream",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
|
||||
unix_stream,
|
||||
|
@ -306,11 +318,11 @@ struct ShutdownArgs {
|
|||
}
|
||||
|
||||
fn op_shutdown(
|
||||
state: &State,
|
||||
state: &mut OpState,
|
||||
args: Value,
|
||||
_zero_copy: &mut [ZeroCopyBuf],
|
||||
) -> Result<Value, ErrBox> {
|
||||
state.check_unstable("Deno.shutdown");
|
||||
super::cli_state(state).check_unstable("Deno.shutdown");
|
||||
|
||||
let args: ShutdownArgs = serde_json::from_value(args)?;
|
||||
|
||||
|
@ -323,8 +335,8 @@ fn op_shutdown(
|
|||
_ => unimplemented!(),
|
||||
};
|
||||
|
||||
let mut resource_table = state.resource_table.borrow_mut();
|
||||
let resource_holder = resource_table
|
||||
let resource_holder = state
|
||||
.resource_table
|
||||
.get_mut::<StreamResourceHolder>(rid)
|
||||
.ok_or_else(ErrBox::bad_resource_id)?;
|
||||
match resource_holder.resource {
|
||||
|
@ -416,7 +428,7 @@ struct ListenArgs {
|
|||
}
|
||||
|
||||
fn listen_tcp(
|
||||
state: &State,
|
||||
state: &mut OpState,
|
||||
addr: SocketAddr,
|
||||
) -> Result<(u32, SocketAddr), ErrBox> {
|
||||
let std_listener = std::net::TcpListener::bind(&addr)?;
|
||||
|
@ -429,14 +441,13 @@ fn listen_tcp(
|
|||
};
|
||||
let rid = state
|
||||
.resource_table
|
||||
.borrow_mut()
|
||||
.add("tcpListener", Box::new(listener_resource));
|
||||
|
||||
Ok((rid, local_addr))
|
||||
}
|
||||
|
||||
fn listen_udp(
|
||||
state: &State,
|
||||
state: &mut OpState,
|
||||
addr: SocketAddr,
|
||||
) -> Result<(u32, SocketAddr), ErrBox> {
|
||||
let std_socket = std::net::UdpSocket::bind(&addr)?;
|
||||
|
@ -445,26 +456,28 @@ fn listen_udp(
|
|||
let socket_resource = UdpSocketResource { socket };
|
||||
let rid = state
|
||||
.resource_table
|
||||
.borrow_mut()
|
||||
.add("udpSocket", Box::new(socket_resource));
|
||||
|
||||
Ok((rid, local_addr))
|
||||
}
|
||||
|
||||
fn op_listen(
|
||||
state: &State,
|
||||
state: &mut OpState,
|
||||
args: Value,
|
||||
_zero_copy: &mut [ZeroCopyBuf],
|
||||
) -> Result<Value, ErrBox> {
|
||||
let cli_state = super::cli_state(state);
|
||||
match serde_json::from_value(args)? {
|
||||
ListenArgs {
|
||||
transport,
|
||||
transport_args: ArgsEnum::Ip(args),
|
||||
} => {
|
||||
if transport == "udp" {
|
||||
state.check_unstable("Deno.listenDatagram");
|
||||
{
|
||||
if transport == "udp" {
|
||||
cli_state.check_unstable("Deno.listenDatagram");
|
||||
}
|
||||
cli_state.check_net(&args.hostname, args.port)?;
|
||||
}
|
||||
state.check_net(&args.hostname, args.port)?;
|
||||
let addr = resolve_addr(&args.hostname, args.port)?;
|
||||
let (rid, local_addr) = if transport == "tcp" {
|
||||
listen_tcp(state, addr)?
|
||||
|
@ -491,15 +504,17 @@ fn op_listen(
|
|||
transport,
|
||||
transport_args: ArgsEnum::Unix(args),
|
||||
} if transport == "unix" || transport == "unixpacket" => {
|
||||
if transport == "unix" {
|
||||
state.check_unstable("Deno.listen");
|
||||
}
|
||||
if transport == "unixpacket" {
|
||||
state.check_unstable("Deno.listenDatagram");
|
||||
}
|
||||
let address_path = net_unix::Path::new(&args.path);
|
||||
state.check_read(&address_path)?;
|
||||
state.check_write(&address_path)?;
|
||||
{
|
||||
if transport == "unix" {
|
||||
cli_state.check_unstable("Deno.listen");
|
||||
}
|
||||
if transport == "unixpacket" {
|
||||
cli_state.check_unstable("Deno.listenDatagram");
|
||||
}
|
||||
cli_state.check_read(&address_path)?;
|
||||
cli_state.check_write(&address_path)?;
|
||||
}
|
||||
let (rid, local_addr) = if transport == "unix" {
|
||||
net_unix::listen_unix(state, &address_path)?
|
||||
} else {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue