Ensure http.disconnect event in ASGI protocol (#174)

This commit is contained in:
Giovanni Barillari 2024-01-17 14:15:50 +01:00 committed by GitHub
parent da6be3024d
commit 571d443d2c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -44,6 +44,8 @@ pub(crate) struct ASGIHTTPProtocol {
response_status: Option<i16>, response_status: Option<i16>,
response_headers: Option<HeaderMap>, response_headers: Option<HeaderMap>,
body_tx: Option<mpsc::Sender<Result<body::Bytes, anyhow::Error>>>, body_tx: Option<mpsc::Sender<Result<body::Bytes, anyhow::Error>>>,
flow_rx_exhausted: Arc<std::sync::RwLock<bool>>,
flow_tx_waiter: Arc<tokio::sync::Notify>,
} }
impl ASGIHTTPProtocol { impl ASGIHTTPProtocol {
@ -57,6 +59,8 @@ impl ASGIHTTPProtocol {
response_status: None, response_status: None,
response_headers: None, response_headers: None,
body_tx: None, body_tx: None,
flow_rx_exhausted: Arc::new(std::sync::RwLock::new(false)),
flow_tx_waiter: Arc::new(tokio::sync::Notify::new()),
} }
} }
@ -96,7 +100,20 @@ impl ASGIHTTPProtocol {
#[pymethods] #[pymethods]
impl ASGIHTTPProtocol { impl ASGIHTTPProtocol {
fn receive<'p>(&mut self, py: Python<'p>) -> PyResult<&'p PyAny> { fn receive<'p>(&mut self, py: Python<'p>) -> PyResult<&'p PyAny> {
if *self.flow_rx_exhausted.read().unwrap() {
let holder = self.flow_tx_waiter.clone();
return future_into_py_futlike(self.rt.clone(), py, async move {
let () = holder.notified().await;
Python::with_gil(|py| {
let dict = PyDict::new(py);
dict.set_item(pyo3::intern!(py, "type"), pyo3::intern!(py, "http.disconnect"))?;
Ok(dict.to_object(py))
})
});
}
let body_ref = self.request_body.clone(); let body_ref = self.request_body.clone();
let flow_ref = self.flow_rx_exhausted.clone();
future_into_py_iter(self.rt.clone(), py, async move { future_into_py_iter(self.rt.clone(), py, async move {
let mut bodym = body_ref.lock().await; let mut bodym = body_ref.lock().await;
let body = &mut *bodym; let body = &mut *bodym;
@ -110,6 +127,11 @@ impl ASGIHTTPProtocol {
} }
_ => body::Bytes::new(), _ => body::Bytes::new(),
}; };
if !more_body {
let mut flow = flow_ref.write().unwrap();
*flow = true;
}
Python::with_gil(|py| { Python::with_gil(|py| {
let dict = PyDict::new(py); let dict = PyDict::new(py);
dict.set_item(pyo3::intern!(py, "type"), pyo3::intern!(py, "http.request"))?; dict.set_item(pyo3::intern!(py, "type"), pyo3::intern!(py, "http.request"))?;
@ -143,6 +165,7 @@ impl ASGIHTTPProtocol {
.map_err(|e| match e {}) .map_err(|e| match e {})
.boxed(), .boxed(),
); );
self.flow_tx_waiter.notify_one();
empty_future_into_py(py) empty_future_into_py(py)
} }
(true, true, false) => { (true, true, false) => {
@ -164,10 +187,13 @@ impl ASGIHTTPProtocol {
_ => error_flow!(), _ => error_flow!(),
}, },
(true, false, true) => match self.body_tx.take() { (true, false, true) => match self.body_tx.take() {
Some(tx) => match body.is_empty() { Some(tx) => {
false => self.send_body(py, tx, body), self.flow_tx_waiter.notify_one();
true => empty_future_into_py(py), match body.is_empty() {
}, false => self.send_body(py, tx, body),
true => empty_future_into_py(py),
}
}
_ => error_flow!(), _ => error_flow!(),
}, },
_ => error_flow!(), _ => error_flow!(),