feat(runtime/http): server side websocket support (#10359)

Co-authored-by: Nayeem Rahman <nayeemrmn99@gmail.com>
Co-authored-by: Luca Casonato <hello@lcas.dev>
This commit is contained in:
Leo K 2021-07-08 13:33:01 +02:00 committed by GitHub
parent 215f6f2c9e
commit 5e092b19fe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 579 additions and 217 deletions

View file

@ -64,13 +64,81 @@ impl WebSocketPermissions for NoWebSocketPermissions {
}
type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
struct WsStreamResource {
tx: AsyncRefCell<SplitSink<WsStream, Message>>,
rx: AsyncRefCell<SplitStream<WsStream>>,
pub enum WebSocketStreamType {
Client {
tx: AsyncRefCell<SplitSink<WsStream, Message>>,
rx: AsyncRefCell<SplitStream<WsStream>>,
},
Server {
tx: AsyncRefCell<
SplitSink<WebSocketStream<hyper::upgrade::Upgraded>, Message>,
>,
rx: AsyncRefCell<SplitStream<WebSocketStream<hyper::upgrade::Upgraded>>>,
},
}
pub struct WsStreamResource {
pub stream: WebSocketStreamType,
// When a `WsStreamResource` resource is closed, all pending 'read' ops are
// canceled, while 'write' ops are allowed to complete. Therefore only
// 'read' futures are attached to this cancel handle.
cancel: CancelHandle,
pub cancel: CancelHandle,
}
impl WsStreamResource {
async fn send(self: &Rc<Self>, message: Message) -> Result<(), AnyError> {
match self.stream {
WebSocketStreamType::Client { .. } => {
let mut tx = RcRef::map(self, |r| match &r.stream {
WebSocketStreamType::Client { tx, .. } => tx,
WebSocketStreamType::Server { .. } => unreachable!(),
})
.borrow_mut()
.await;
tx.send(message).await?;
}
WebSocketStreamType::Server { .. } => {
let mut tx = RcRef::map(self, |r| match &r.stream {
WebSocketStreamType::Client { .. } => unreachable!(),
WebSocketStreamType::Server { tx, .. } => tx,
})
.borrow_mut()
.await;
tx.send(message).await?;
}
}
Ok(())
}
async fn next_message(
self: &Rc<Self>,
cancel: RcRef<CancelHandle>,
) -> Result<
Option<Result<Message, tokio_tungstenite::tungstenite::Error>>,
AnyError,
> {
match &self.stream {
WebSocketStreamType::Client { .. } => {
let mut rx = RcRef::map(self, |r| match &r.stream {
WebSocketStreamType::Client { rx, .. } => rx,
WebSocketStreamType::Server { .. } => unreachable!(),
})
.borrow_mut()
.await;
rx.next().or_cancel(cancel).await.map_err(AnyError::from)
}
WebSocketStreamType::Server { .. } => {
let mut rx = RcRef::map(self, |r| match &r.stream {
WebSocketStreamType::Client { .. } => unreachable!(),
WebSocketStreamType::Server { rx, .. } => rx,
})
.borrow_mut()
.await;
rx.next().or_cancel(cancel).await.map_err(AnyError::from)
}
}
}
}
impl Resource for WsStreamResource {
@ -79,8 +147,6 @@ impl Resource for WsStreamResource {
}
}
impl WsStreamResource {}
// This op is needed because creating a WS instance in JavaScript is a sync
// operation and should throw error when permissions are not fulfilled,
// but actual op that connects WS is async.
@ -184,8 +250,10 @@ where
let (ws_tx, ws_rx) = stream.split();
let resource = WsStreamResource {
rx: AsyncRefCell::new(ws_rx),
tx: AsyncRefCell::new(ws_tx),
stream: WebSocketStreamType::Client {
rx: AsyncRefCell::new(ws_rx),
tx: AsyncRefCell::new(ws_tx),
},
cancel: Default::default(),
};
let mut state = state.borrow_mut();
@ -227,15 +295,13 @@ pub async fn op_ws_send(
"pong" => Message::Pong(vec![]),
_ => unreachable!(),
};
let rid = args.rid;
let resource = state
.borrow_mut()
.resource_table
.get::<WsStreamResource>(rid)
.get::<WsStreamResource>(args.rid)
.ok_or_else(bad_resource_id)?;
let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await;
tx.send(msg).await?;
resource.send(msg).await?;
Ok(())
}
@ -266,8 +332,7 @@ pub async fn op_ws_close(
.resource_table
.get::<WsStreamResource>(rid)
.ok_or_else(bad_resource_id)?;
let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await;
tx.send(msg).await?;
resource.send(msg).await?;
Ok(())
}
@ -294,9 +359,8 @@ pub async fn op_ws_next_event(
.get::<WsStreamResource>(rid)
.ok_or_else(bad_resource_id)?;
let mut rx = RcRef::map(&resource, |r| &r.rx).borrow_mut().await;
let cancel = RcRef::map(resource, |r| &r.cancel);
let val = rx.next().or_cancel(cancel).await?;
let cancel = RcRef::map(&resource, |r| &r.cancel);
let val = resource.next_message(cancel).await?;
let res = match val {
Some(Ok(Message::Text(text))) => NextEventResponse::String(text),
Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()),