fix(ext/node): worker_threads.receiveMessageOnPort doesn't panic (#23406)

Follow up to https://github.com/denoland/deno/pull/23386.
Instead of using async `recv()` method, it was replaced
with a poll based function that doesn't hold onto
RefCell borrow across await point.

Fixes https://github.com/denoland/deno/issues/23362
This commit is contained in:
Bartek Iwańczuk 2024-04-16 19:41:03 +01:00 committed by GitHub
parent 534dd34f86
commit 760d64bc6b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 34 additions and 6 deletions

View file

@ -15,6 +15,7 @@ use deno_core::OpState;
use deno_core::RcRef; use deno_core::RcRef;
use deno_core::Resource; use deno_core::Resource;
use deno_core::ResourceId; use deno_core::ResourceId;
use futures::future::poll_fn;
use serde::Deserialize; use serde::Deserialize;
use serde::Serialize; use serde::Serialize;
use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::error::TryRecvError;
@ -52,16 +53,19 @@ impl MessagePort {
Ok(()) Ok(())
} }
#[allow(clippy::await_holding_refcell_ref)] // TODO(ry) remove!
pub async fn recv( pub async fn recv(
&self, &self,
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
) -> Result<Option<JsMessageData>, AnyError> { ) -> Result<Option<JsMessageData>, AnyError> {
let mut rx = self let rx = &self.rx;
.rx
.try_borrow_mut() let maybe_data = poll_fn(|cx| {
.map_err(|_| type_error("Port receiver is already borrowed"))?; let mut rx = rx.borrow_mut();
if let Some((data, transferables)) = rx.recv().await { rx.poll_recv(cx)
})
.await;
if let Some((data, transferables)) = maybe_data {
let js_transferables = let js_transferables =
serialize_transferables(&mut state.borrow_mut(), transferables); serialize_transferables(&mut state.borrow_mut(), transferables);
return Ok(Some(JsMessageData { return Ok(Some(JsMessageData {

View file

@ -418,21 +418,45 @@ Deno.test({
// Regression test for https://github.com/denoland/deno/issues/23362 // Regression test for https://github.com/denoland/deno/issues/23362
Deno.test("[node/worker_threads] receiveMessageOnPort works if there's pending read", function () { Deno.test("[node/worker_threads] receiveMessageOnPort works if there's pending read", function () {
const { port1, port2 } = new workerThreads.MessageChannel(); const { port1, port2 } = new workerThreads.MessageChannel();
const { port1: port3, port2: port4 } = new workerThreads.MessageChannel();
const { port1: port5, port2: port6 } = new workerThreads.MessageChannel();
const message1 = { hello: "world" }; const message1 = { hello: "world" };
const message2 = { foo: "bar" }; const message2 = { foo: "bar" };
assertEquals(workerThreads.receiveMessageOnPort(port2), undefined); assertEquals(workerThreads.receiveMessageOnPort(port2), undefined);
port2.start(); port2.start();
port4.start();
port6.start();
port1.postMessage(message1); port1.postMessage(message1);
port1.postMessage(message2); port1.postMessage(message2);
port3.postMessage(message1);
port3.postMessage(message2);
port5.postMessage(message1);
port5.postMessage(message2);
assertEquals(workerThreads.receiveMessageOnPort(port2), { assertEquals(workerThreads.receiveMessageOnPort(port2), {
message: message1, message: message1,
}); });
assertEquals(workerThreads.receiveMessageOnPort(port2), { assertEquals(workerThreads.receiveMessageOnPort(port2), {
message: message2, message: message2,
}); });
assertEquals(workerThreads.receiveMessageOnPort(port4), {
message: message1,
});
assertEquals(workerThreads.receiveMessageOnPort(port4), {
message: message2,
});
assertEquals(workerThreads.receiveMessageOnPort(port6), {
message: message1,
});
assertEquals(workerThreads.receiveMessageOnPort(port6), {
message: message2,
});
port1.close(); port1.close();
port2.close(); port2.close();
port3.close();
port4.close();
port5.close();
port6.close();
}); });