Ensure http.disconnect event in ASGI protocol (#174)

This commit is contained in:
Giovanni Barillari 2024-01-17 14:15:50 +01:00 committed by GitHub
parent da6be3024d
commit 571d443d2c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -44,6 +44,8 @@ pub(crate) struct ASGIHTTPProtocol {
response_status: Option<i16>,
response_headers: Option<HeaderMap>,
body_tx: Option<mpsc::Sender<Result<body::Bytes, anyhow::Error>>>,
flow_rx_exhausted: Arc<std::sync::RwLock<bool>>,
flow_tx_waiter: Arc<tokio::sync::Notify>,
}
impl ASGIHTTPProtocol {
@ -57,6 +59,8 @@ impl ASGIHTTPProtocol {
response_status: None,
response_headers: None,
body_tx: None,
flow_rx_exhausted: Arc::new(std::sync::RwLock::new(false)),
flow_tx_waiter: Arc::new(tokio::sync::Notify::new()),
}
}
@ -96,7 +100,20 @@ impl ASGIHTTPProtocol {
#[pymethods]
impl ASGIHTTPProtocol {
fn receive<'p>(&mut self, py: Python<'p>) -> PyResult<&'p PyAny> {
if *self.flow_rx_exhausted.read().unwrap() {
let holder = self.flow_tx_waiter.clone();
return future_into_py_futlike(self.rt.clone(), py, async move {
let () = holder.notified().await;
Python::with_gil(|py| {
let dict = PyDict::new(py);
dict.set_item(pyo3::intern!(py, "type"), pyo3::intern!(py, "http.disconnect"))?;
Ok(dict.to_object(py))
})
});
}
let body_ref = self.request_body.clone();
let flow_ref = self.flow_rx_exhausted.clone();
future_into_py_iter(self.rt.clone(), py, async move {
let mut bodym = body_ref.lock().await;
let body = &mut *bodym;
@ -110,6 +127,11 @@ impl ASGIHTTPProtocol {
}
_ => body::Bytes::new(),
};
if !more_body {
let mut flow = flow_ref.write().unwrap();
*flow = true;
}
Python::with_gil(|py| {
let dict = PyDict::new(py);
dict.set_item(pyo3::intern!(py, "type"), pyo3::intern!(py, "http.request"))?;
@ -143,6 +165,7 @@ impl ASGIHTTPProtocol {
.map_err(|e| match e {})
.boxed(),
);
self.flow_tx_waiter.notify_one();
empty_future_into_py(py)
}
(true, true, false) => {
@ -164,10 +187,13 @@ impl ASGIHTTPProtocol {
_ => error_flow!(),
},
(true, false, true) => match self.body_tx.take() {
Some(tx) => match body.is_empty() {
false => self.send_body(py, tx, body),
true => empty_future_into_py(py),
},
Some(tx) => {
self.flow_tx_waiter.notify_one();
match body.is_empty() {
false => self.send_body(py, tx, body),
true => empty_future_into_py(py),
}
}
_ => error_flow!(),
},
_ => error_flow!(),