Fix inspector hanging when task budget is exceeded (#5083)

The issue is solved by proxying websocket messages over a pair of
`futures::mpsc::unbounded` channels. As these are are implemented in
the 'futures' crate, they can't participate in Tokio's cooperative
task yielding.
This commit is contained in:
Bert Belder 2020-05-05 05:39:42 +02:00
parent 6e287d9518
commit e574437922
No known key found for this signature in database
GPG key ID: 7A77887B2E2ED461
3 changed files with 204 additions and 71 deletions

View file

@ -37,7 +37,6 @@ use std::sync::Once;
use std::thread;
use uuid::Uuid;
use warp::filters::ws;
use warp::filters::ws::WebSocket;
use warp::Filter;
struct InspectorServer {
@ -91,7 +90,7 @@ struct InspectorInfo {
host: SocketAddr,
uuid: Uuid,
thread_name: Option<String>,
new_websocket_tx: UnboundedSender<WebSocket>,
new_websocket_tx: UnboundedSender<WebSocketProxy>,
canary_rx: oneshot::Receiver<Never>,
}
@ -178,7 +177,9 @@ async fn server(
g.get(&uuid).map(|info| info.new_websocket_tx.clone()).map(
|new_websocket_tx| {
ws.on_upgrade(move |websocket| async move {
let _ = new_websocket_tx.unbounded_send(websocket);
let (proxy, pump) = create_websocket_proxy(websocket);
let _ = new_websocket_tx.unbounded_send(proxy);
pump.await;
})
},
)
@ -223,6 +224,69 @@ async fn server(
}
}
type WebSocketProxySender = UnboundedSender<ws::Message>;
type WebSocketProxyReceiver =
UnboundedReceiver<Result<ws::Message, warp::Error>>;
/// Encapsulates an UnboundedSender/UnboundedReceiver pair that together form
/// a duplex channel for sending/receiving websocket messages.
struct WebSocketProxy {
tx: WebSocketProxySender,
rx: WebSocketProxyReceiver,
}
impl WebSocketProxy {
pub fn split(self) -> (WebSocketProxySender, WebSocketProxyReceiver) {
(self.tx, self.rx)
}
}
/// Creates a future that proxies messages sent and received on a warp WebSocket
/// to a UnboundedSender/UnboundedReceiver pair. We need this to sidestep
/// Tokio's task budget, which causes issues when DenoInspector::poll_sessions()
/// needs to block the thread because JavaScript execution is paused.
///
/// This works because UnboundedSender/UnboundedReceiver are implemented in the
/// 'futures' crate, therefore they can't participate in Tokio's cooperative
/// task yielding.
///
/// A tuple is returned, where the first element is a duplex channel that can
/// be used to send/receive messages on the websocket, and the second element
/// is a future that does the forwarding.
fn create_websocket_proxy(
websocket: ws::WebSocket,
) -> (WebSocketProxy, impl Future<Output = ()> + Send) {
// The 'outbound' channel carries messages sent to the websocket.
let (outbound_tx, outbound_rx) = mpsc::unbounded();
// The 'inbound' channel carries messages received from the websocket.
let (inbound_tx, inbound_rx) = mpsc::unbounded();
let proxy = WebSocketProxy {
tx: outbound_tx,
rx: inbound_rx,
};
// The pump future takes care of forwarding messages between the websocket
// and channels. It resolves to () when either side disconnects, ignoring any
// errors.
let pump = async move {
let (websocket_tx, websocket_rx) = websocket.split();
let outbound_pump =
outbound_rx.map(Ok).forward(websocket_tx).map_err(|_| ());
let inbound_pump = websocket_rx
.map(|msg| inbound_tx.unbounded_send(msg))
.map_err(|_| ())
.try_collect::<()>();
let _ = future::try_join(outbound_pump, inbound_pump).await;
};
(proxy, pump)
}
#[derive(Clone, Copy)]
enum PollState {
Idle,
@ -322,7 +386,8 @@ impl DenoInspector {
let mut hs = v8::HandleScope::new(v8_isolate);
let scope = hs.enter();
let (new_websocket_tx, new_websocket_rx) = mpsc::unbounded::<WebSocket>();
let (new_websocket_tx, new_websocket_rx) =
mpsc::unbounded::<WebSocketProxy>();
let (canary_tx, canary_rx) = oneshot::channel::<Never>();
let info = InspectorInfo {
@ -511,7 +576,7 @@ struct InspectorSessions {
impl InspectorSessions {
fn new(
inspector_ptr: *mut DenoInspector,
new_websocket_rx: UnboundedReceiver<WebSocket>,
new_websocket_rx: UnboundedReceiver<WebSocketProxy>,
) -> RefCell<Self> {
let new_incoming = new_websocket_rx
.map(move |websocket| DenoInspectorSession::new(inspector_ptr, websocket))
@ -609,11 +674,8 @@ impl task::ArcWake for InspectorWaker {
struct DenoInspectorSession {
v8_channel: v8::inspector::ChannelBase,
v8_session: v8::UniqueRef<v8::inspector::V8InspectorSession>,
message_handler: Pin<Box<dyn Future<Output = ()> + 'static>>,
// Internal channel/queue that temporarily stores messages sent by V8 to
// the front-end, before they are sent over the websocket.
outbound_queue_tx:
UnboundedSender<v8::UniquePtr<v8::inspector::StringBuffer>>,
websocket_tx: WebSocketProxySender,
websocket_rx_handler: Pin<Box<dyn Future<Output = ()> + 'static>>,
}
impl Deref for DenoInspectorSession {
@ -634,7 +696,7 @@ impl DenoInspectorSession {
pub fn new(
inspector_ptr: *mut DenoInspector,
websocket: WebSocket,
websocket: WebSocketProxy,
) -> Box<Self> {
new_box_with(move |self_ptr| {
let v8_channel = v8::inspector::ChannelBase::new::<Self>();
@ -648,54 +710,38 @@ impl DenoInspectorSession {
&empty_view,
);
let (outbound_queue_tx, outbound_queue_rx) =
mpsc::unbounded::<v8::UniquePtr<v8::inspector::StringBuffer>>();
let message_handler =
Self::create_message_handler(self_ptr, websocket, outbound_queue_rx);
let (websocket_tx, websocket_rx) = websocket.split();
let websocket_rx_handler =
Self::receive_from_websocket(self_ptr, websocket_rx);
Self {
v8_channel,
v8_session,
message_handler,
outbound_queue_tx,
websocket_tx,
websocket_rx_handler,
}
})
}
fn create_message_handler(
/// Returns a future that receives messages from the websocket and dispatches
/// them to the V8 session.
fn receive_from_websocket(
self_ptr: *mut Self,
websocket: WebSocket,
outbound_queue_rx: UnboundedReceiver<
v8::UniquePtr<v8::inspector::StringBuffer>,
>,
websocket_rx: WebSocketProxyReceiver,
) -> Pin<Box<dyn Future<Output = ()> + 'static>> {
let (websocket_tx, websocket_rx) = websocket.split();
// Receive messages from the websocket and dispatch them to the V8 session.
let inbound_pump = websocket_rx
.map_ok(move |msg| {
let msg = msg.as_bytes();
let msg = v8::inspector::StringView::from(msg);
unsafe { &mut *self_ptr }.dispatch_protocol_message(&msg);
})
.try_collect::<()>();
// Convert and forward messages from the outbound message queue to the
// websocket.
let outbound_pump = outbound_queue_rx
.map(move |msg| {
let msg = msg.unwrap().string().to_string();
let msg = ws::Message::text(msg);
Ok(msg)
})
.forward(websocket_tx);
let disconnect_future = future::try_join(inbound_pump, outbound_pump);
async move {
eprintln!("Debugger session started.");
match disconnect_future.await {
let result = websocket_rx
.map_ok(move |msg| {
let msg = msg.as_bytes();
let msg = v8::inspector::StringView::from(msg);
unsafe { &mut *self_ptr }.dispatch_protocol_message(&msg);
})
.try_collect::<()>()
.await;
match result {
Ok(_) => eprintln!("Debugger session ended."),
Err(err) => eprintln!("Debugger session ended: {}.", err),
};
@ -703,6 +749,12 @@ impl DenoInspectorSession {
.boxed_local()
}
fn send_to_websocket(&self, msg: v8::UniquePtr<v8::inspector::StringBuffer>) {
let msg = msg.unwrap().string().to_string();
let msg = ws::Message::text(msg);
let _ = self.websocket_tx.unbounded_send(msg);
}
pub fn break_on_first_statement(&mut self) {
let reason = v8::inspector::StringView::from(&b"debugCommand"[..]);
let detail = v8::inspector::StringView::empty();
@ -724,14 +776,14 @@ impl v8::inspector::ChannelImpl for DenoInspectorSession {
_call_id: i32,
message: v8::UniquePtr<v8::inspector::StringBuffer>,
) {
let _ = self.outbound_queue_tx.unbounded_send(message);
self.send_to_websocket(message);
}
fn send_notification(
&mut self,
message: v8::UniquePtr<v8::inspector::StringBuffer>,
) {
let _ = self.outbound_queue_tx.unbounded_send(message);
self.send_to_websocket(message);
}
fn flush_protocol_notifications(&mut self) {}
@ -740,7 +792,7 @@ impl v8::inspector::ChannelImpl for DenoInspectorSession {
impl Future for DenoInspectorSession {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.message_handler.poll_unpin(cx)
self.websocket_rx_handler.poll_unpin(cx)
}
}