Return error if more than one listener calls WorkerHandle::get_event() (#5461)

This commit is contained in:
Yiyu Lin 2020-05-17 12:50:38 +08:00 committed by GitHub
parent f12dffca9f
commit c4fe58d8df
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 11 additions and 11 deletions

View file

@ -306,7 +306,7 @@ fn op_host_get_message(
}; };
let state_ = state.clone(); let state_ = state.clone();
let op = async move { let op = async move {
let response = match worker_handle.get_event().await { let response = match worker_handle.get_event().await? {
Some(event) => { Some(event) => {
// Terminal error means that worker should be removed from worker table. // Terminal error means that worker should be removed from worker table.
if let WorkerEvent::TerminalError(_) = &event { if let WorkerEvent::TerminalError(_) = &event {

View file

@ -794,7 +794,7 @@ async fn execute_in_thread(
})?; })?;
let handle = handle_receiver.recv().unwrap()?; let handle = handle_receiver.recv().unwrap()?;
handle.post_message(req)?; handle.post_message(req)?;
let event = handle.get_event().await.expect("Compiler didn't respond"); let event = handle.get_event().await?.expect("Compiler didn't respond");
let buf = match event { let buf = match event {
WorkerEvent::Message(buf) => Ok(buf), WorkerEvent::Message(buf) => Ok(buf),
WorkerEvent::Error(error) => Err(error), WorkerEvent::Error(error) => Err(error),

View file

@ -300,13 +300,13 @@ mod tests {
let r = handle.post_message(msg.clone()); let r = handle.post_message(msg.clone());
assert!(r.is_ok()); assert!(r.is_ok());
let maybe_msg = handle.get_event().await; let maybe_msg = handle.get_event().await.unwrap();
assert!(maybe_msg.is_some()); assert!(maybe_msg.is_some());
let r = handle.post_message(msg.clone()); let r = handle.post_message(msg.clone());
assert!(r.is_ok()); assert!(r.is_ok());
let maybe_msg = handle.get_event().await; let maybe_msg = handle.get_event().await.unwrap();
assert!(maybe_msg.is_some()); assert!(maybe_msg.is_some());
match maybe_msg { match maybe_msg {
Some(WorkerEvent::Message(buf)) => { Some(WorkerEvent::Message(buf)) => {
@ -321,7 +321,7 @@ mod tests {
.into_boxed_bytes(); .into_boxed_bytes();
let r = handle.post_message(msg); let r = handle.post_message(msg);
assert!(r.is_ok()); assert!(r.is_ok());
let event = handle.get_event().await; let event = handle.get_event().await.unwrap();
assert!(event.is_none()); assert!(event.is_none());
handle.sender.close_channel(); handle.sender.close_channel();
}); });
@ -348,7 +348,7 @@ mod tests {
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = handle.post_message(msg.clone()); let r = handle.post_message(msg.clone());
assert!(r.is_ok()); assert!(r.is_ok());
let event = handle.get_event().await; let event = handle.get_event().await.unwrap();
assert!(event.is_none()); assert!(event.is_none());
handle.sender.close_channel(); handle.sender.close_channel();
}); });

View file

@ -51,11 +51,11 @@ impl WorkerHandle {
sender.try_send(buf).map_err(ErrBox::from) sender.try_send(buf).map_err(ErrBox::from)
} }
// TODO: should use `try_lock` and return error if /// Get the event with lock.
// more than one listener tries to get event /// Return error if more than one listener tries to get event
pub async fn get_event(&self) -> Option<WorkerEvent> { pub async fn get_event(&self) -> Result<Option<WorkerEvent>, ErrBox> {
let mut receiver = self.receiver.lock().await; let mut receiver = self.receiver.try_lock()?;
receiver.next().await Ok(receiver.next().await)
} }
} }