Use a channel to read stdout

This commit is contained in:
Zanie 2024-01-17 11:24:36 -06:00
parent cabb9f4ba2
commit cc8459ecb5
2 changed files with 38 additions and 12 deletions

View file

@ -7,6 +7,7 @@ use tokio::fs::File;
use tokio::io::{self, BufReader};
use tokio::io::{AsyncWriteExt, Lines};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::{debug, error};
use crate::{BuildKind, Pep517Backend};
@ -126,9 +127,23 @@ impl HookErrorKind {
}
}
}
async fn read_stdout_into_channel(
mut lines: Lines<BufReader<ChildStdout>>,
sender: Sender<String>,
) -> () {
// We do not handle any errors here. If there's an IO error or the receiver is dropped,
// we will just exit. The receiver will report the daemon as closed if it attempts to
// read after this channel exits.
while let Ok(Some(line)) = lines.next_line().await {
if let Err(_) = sender.send(line).await {
return;
}
}
}
#[derive(Debug)]
struct DaemonIO {
stdout: Lines<BufReader<ChildStdout>>,
stdout: Receiver<String>,
stdin: ChildStdin,
handle: Child,
}
@ -143,8 +158,14 @@ impl DaemonIO {
// Take standard input
let stdin = handle.stdin.take().expect("stdin is available");
// Create a channel to read standard output continuously
let (sender, receiver) = tokio::sync::mpsc::channel(20);
// We let this handle drop, as we don't care about its result
tokio::spawn(read_stdout_into_channel(stdout, sender));
Self {
stdout,
stdout: receiver,
stdin,
handle,
}
@ -159,6 +180,11 @@ impl DaemonIO {
self.stdin.flush().await?;
Ok(())
}
async fn recv(&mut self) -> Option<String> {
self.stdout.recv().await
}
async fn close(mut self) -> Result<Option<Output>, DaemonError> {
if !self.exited()? {
// Send a shutdown command if it's not closed yet
@ -195,7 +221,6 @@ impl Pep517Daemon {
/// Ensure the daemon is started and ready.
/// If the daemon is not started, [`Self::start`] will be called.
///
async fn ensure_started(&mut self) -> Result<&mut DaemonIO, DaemonError> {
let started = {
if let Some(io) = self.io.as_mut() {
@ -250,11 +275,14 @@ impl Pep517Daemon {
/// Reads a single response from the daemon.
async fn receive_one(&mut self) -> Result<DaemonResponse, DaemonError> {
let stdout = &mut self.io.as_mut().unwrap().stdout;
if let Some(line) = stdout.next_line().await? {
Ok(DaemonResponse::try_from_str(line.as_str())?)
if let Some(io) = self.io.as_mut() {
if let Some(line) = io.recv().await {
Ok(DaemonResponse::try_from_str(line.as_str())?)
} else {
self.close().await?;
Err(DaemonError::Closed)
}
} else {
self.close().await?;
Err(DaemonError::Closed)
}
}
@ -292,7 +320,7 @@ impl Pep517Daemon {
hook_name: &str,
mut args: Vec<&str>,
) -> Result<String, DaemonError> {
let mut io = self.ensure_started().await?;
let io = self.ensure_started().await?;
// Always send run and the backend name
let mut commands = vec!["run", backend.backend.as_str()];
@ -434,7 +462,7 @@ impl Pep517Daemon {
// If there's an error on close, we should raise that instead of complaining it was never called
self.closed = true;
if let Some(mut io) = self.io.take() {
if let Some(io) = self.io.take() {
io.close().await
} else {
Ok(None)

View file

@ -366,7 +366,6 @@ impl SourceBuild {
if let Some(pep517_backend) = &pep517_backend {
if pep517_backend != &default_backend {
let environment = create_pep517_build_environment(
&source_tree,
&venv,
pep517_backend,
&mut pep517_daemon,
@ -537,12 +536,11 @@ impl SourceBuildTrait for SourceBuild {
}
/// Not a method because we call it before the builder is completely initialized
async fn create_pep517_build_environment(
source_tree: &Path,
venv: &Virtualenv,
pep517_backend: &Pep517Backend,
pep517_daemon: &mut Pep517Daemon,
build_context: &impl BuildContext,
package_id: &str,
_package_id: &str,
build_kind: BuildKind,
) -> Result<(), Error> {
let extra_requires = pep517_daemon