mirror of
https://github.com/denoland/deno.git
synced 2025-08-04 02:48:24 +00:00
perf(ext/websocket): Make send sync for non-stream websockets (#19376)
No need to go through the async machinery for `send(String | Buffer)` -- we can fire and forget, and then route any send errors into the async call we're already making (`op_ws_next_event`). Early benchmark on MacOS: Before: 155.8k msg/sec After: 166.2k msg/sec (+6.6%) Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
parent
42991017e9
commit
df76a062fa
3 changed files with 93 additions and 59 deletions
|
@ -281,8 +281,10 @@ where
|
|||
}
|
||||
|
||||
let resource = ServerWebSocket {
|
||||
buffered: Cell::new(0),
|
||||
errored: Cell::new(None),
|
||||
ws: AsyncRefCell::new(FragmentCollector::new(stream)),
|
||||
closed: Rc::new(Cell::new(false)),
|
||||
closed: Cell::new(false),
|
||||
tx_lock: AsyncRefCell::new(()),
|
||||
};
|
||||
let mut state = state.borrow_mut();
|
||||
|
@ -315,18 +317,20 @@ pub enum MessageKind {
|
|||
}
|
||||
|
||||
pub struct ServerWebSocket {
|
||||
buffered: Cell<usize>,
|
||||
errored: Cell<Option<AnyError>>,
|
||||
ws: AsyncRefCell<FragmentCollector<WebSocketStream>>,
|
||||
closed: Rc<Cell<bool>>,
|
||||
closed: Cell<bool>,
|
||||
tx_lock: AsyncRefCell<()>,
|
||||
}
|
||||
|
||||
impl ServerWebSocket {
|
||||
#[inline]
|
||||
pub async fn write_frame(
|
||||
self: Rc<Self>,
|
||||
self: &Rc<Self>,
|
||||
frame: Frame,
|
||||
) -> Result<(), AnyError> {
|
||||
let _lock = RcRef::map(&self, |r| &r.tx_lock).borrow_mut().await;
|
||||
let _lock = RcRef::map(self, |r| &r.tx_lock).borrow_mut().await;
|
||||
// SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
|
||||
// to populate the write buffer. We encounter an await point when writing
|
||||
// to the socket after the frame has already been written to the buffer.
|
||||
|
@ -361,8 +365,10 @@ pub fn ws_create_server_stream(
|
|||
ws.set_auto_pong(true);
|
||||
|
||||
let ws_resource = ServerWebSocket {
|
||||
buffered: Cell::new(0),
|
||||
errored: Cell::new(None),
|
||||
ws: AsyncRefCell::new(FragmentCollector::new(ws)),
|
||||
closed: Rc::new(Cell::new(false)),
|
||||
closed: Cell::new(false),
|
||||
tx_lock: AsyncRefCell::new(()),
|
||||
};
|
||||
|
||||
|
@ -370,8 +376,48 @@ pub fn ws_create_server_stream(
|
|||
Ok(rid)
|
||||
}
|
||||
|
||||
#[op]
|
||||
pub async fn op_ws_send_binary(
|
||||
#[op(fast)]
|
||||
pub fn op_ws_send_binary(
|
||||
state: &mut OpState,
|
||||
rid: ResourceId,
|
||||
data: ZeroCopyBuf,
|
||||
) {
|
||||
let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap();
|
||||
let data = data.to_vec();
|
||||
let len = data.len();
|
||||
resource.buffered.set(resource.buffered.get() + len);
|
||||
deno_core::task::spawn(async move {
|
||||
if let Err(err) = resource
|
||||
.write_frame(Frame::new(true, OpCode::Binary, None, data))
|
||||
.await
|
||||
{
|
||||
resource.errored.set(Some(err));
|
||||
} else {
|
||||
resource.buffered.set(resource.buffered.get() - len);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[op(fast)]
|
||||
pub fn op_ws_send_text(state: &mut OpState, rid: ResourceId, data: String) {
|
||||
let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap();
|
||||
let len = data.len();
|
||||
resource.buffered.set(resource.buffered.get() + len);
|
||||
deno_core::task::spawn(async move {
|
||||
if let Err(err) = resource
|
||||
.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
|
||||
.await
|
||||
{
|
||||
resource.errored.set(Some(err));
|
||||
} else {
|
||||
resource.buffered.set(resource.buffered.get() - len);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Async version of send. Does not update buffered amount as we rely on the socket itself for backpressure.
|
||||
#[op(fast)]
|
||||
pub async fn op_ws_send_binary_async(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
rid: ResourceId,
|
||||
data: ZeroCopyBuf,
|
||||
|
@ -380,13 +426,15 @@ pub async fn op_ws_send_binary(
|
|||
.borrow_mut()
|
||||
.resource_table
|
||||
.get::<ServerWebSocket>(rid)?;
|
||||
let data = data.to_vec();
|
||||
resource
|
||||
.write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec()))
|
||||
.write_frame(Frame::new(true, OpCode::Binary, None, data))
|
||||
.await
|
||||
}
|
||||
|
||||
#[op]
|
||||
pub async fn op_ws_send_text(
|
||||
/// Async version of send. Does not update buffered amount as we rely on the socket itself for backpressure.
|
||||
#[op(fast)]
|
||||
pub async fn op_ws_send_text_async(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
rid: ResourceId,
|
||||
data: String,
|
||||
|
@ -400,6 +448,16 @@ pub async fn op_ws_send_text(
|
|||
.await
|
||||
}
|
||||
|
||||
#[op(fast)]
|
||||
pub fn op_ws_get_buffered_amount(state: &mut OpState, rid: ResourceId) -> u32 {
|
||||
state
|
||||
.resource_table
|
||||
.get::<ServerWebSocket>(rid)
|
||||
.unwrap()
|
||||
.buffered
|
||||
.get() as u32
|
||||
}
|
||||
|
||||
#[op]
|
||||
pub async fn op_ws_send_pong(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
|
@ -441,8 +499,7 @@ pub async fn op_ws_close(
|
|||
.map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes()))
|
||||
.unwrap_or_else(|| Frame::close_raw(vec![]));
|
||||
|
||||
let cell = Rc::clone(&resource.closed);
|
||||
cell.set(true);
|
||||
resource.closed.set(true);
|
||||
resource.write_frame(frame).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -457,6 +514,10 @@ pub async fn op_ws_next_event(
|
|||
.resource_table
|
||||
.get::<ServerWebSocket>(rid)?;
|
||||
|
||||
if let Some(err) = resource.errored.take() {
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
|
||||
loop {
|
||||
let val = match ws.read_frame().await {
|
||||
|
@ -519,8 +580,11 @@ deno_core::extension!(deno_websocket,
|
|||
op_ws_next_event,
|
||||
op_ws_send_binary,
|
||||
op_ws_send_text,
|
||||
op_ws_send_binary_async,
|
||||
op_ws_send_text_async,
|
||||
op_ws_send_ping,
|
||||
op_ws_send_pong,
|
||||
op_ws_get_buffered_amount,
|
||||
],
|
||||
esm = [ "01_websocket.js", "02_websocketstream.js" ],
|
||||
options = {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue