mirror of
https://github.com/emmett-framework/granian.git
synced 2025-07-09 20:35:35 +00:00
Ensure http.disconnect
event in ASGI protocol (#174)
This commit is contained in:
parent
da6be3024d
commit
571d443d2c
1 changed files with 30 additions and 4 deletions
|
@ -44,6 +44,8 @@ pub(crate) struct ASGIHTTPProtocol {
|
|||
response_status: Option<i16>,
|
||||
response_headers: Option<HeaderMap>,
|
||||
body_tx: Option<mpsc::Sender<Result<body::Bytes, anyhow::Error>>>,
|
||||
flow_rx_exhausted: Arc<std::sync::RwLock<bool>>,
|
||||
flow_tx_waiter: Arc<tokio::sync::Notify>,
|
||||
}
|
||||
|
||||
impl ASGIHTTPProtocol {
|
||||
|
@ -57,6 +59,8 @@ impl ASGIHTTPProtocol {
|
|||
response_status: None,
|
||||
response_headers: None,
|
||||
body_tx: None,
|
||||
flow_rx_exhausted: Arc::new(std::sync::RwLock::new(false)),
|
||||
flow_tx_waiter: Arc::new(tokio::sync::Notify::new()),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -96,7 +100,20 @@ impl ASGIHTTPProtocol {
|
|||
#[pymethods]
|
||||
impl ASGIHTTPProtocol {
|
||||
fn receive<'p>(&mut self, py: Python<'p>) -> PyResult<&'p PyAny> {
|
||||
if *self.flow_rx_exhausted.read().unwrap() {
|
||||
let holder = self.flow_tx_waiter.clone();
|
||||
return future_into_py_futlike(self.rt.clone(), py, async move {
|
||||
let () = holder.notified().await;
|
||||
Python::with_gil(|py| {
|
||||
let dict = PyDict::new(py);
|
||||
dict.set_item(pyo3::intern!(py, "type"), pyo3::intern!(py, "http.disconnect"))?;
|
||||
Ok(dict.to_object(py))
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
let body_ref = self.request_body.clone();
|
||||
let flow_ref = self.flow_rx_exhausted.clone();
|
||||
future_into_py_iter(self.rt.clone(), py, async move {
|
||||
let mut bodym = body_ref.lock().await;
|
||||
let body = &mut *bodym;
|
||||
|
@ -110,6 +127,11 @@ impl ASGIHTTPProtocol {
|
|||
}
|
||||
_ => body::Bytes::new(),
|
||||
};
|
||||
if !more_body {
|
||||
let mut flow = flow_ref.write().unwrap();
|
||||
*flow = true;
|
||||
}
|
||||
|
||||
Python::with_gil(|py| {
|
||||
let dict = PyDict::new(py);
|
||||
dict.set_item(pyo3::intern!(py, "type"), pyo3::intern!(py, "http.request"))?;
|
||||
|
@ -143,6 +165,7 @@ impl ASGIHTTPProtocol {
|
|||
.map_err(|e| match e {})
|
||||
.boxed(),
|
||||
);
|
||||
self.flow_tx_waiter.notify_one();
|
||||
empty_future_into_py(py)
|
||||
}
|
||||
(true, true, false) => {
|
||||
|
@ -164,10 +187,13 @@ impl ASGIHTTPProtocol {
|
|||
_ => error_flow!(),
|
||||
},
|
||||
(true, false, true) => match self.body_tx.take() {
|
||||
Some(tx) => match body.is_empty() {
|
||||
false => self.send_body(py, tx, body),
|
||||
true => empty_future_into_py(py),
|
||||
},
|
||||
Some(tx) => {
|
||||
self.flow_tx_waiter.notify_one();
|
||||
match body.is_empty() {
|
||||
false => self.send_body(py, tx, body),
|
||||
true => empty_future_into_py(py),
|
||||
}
|
||||
}
|
||||
_ => error_flow!(),
|
||||
},
|
||||
_ => error_flow!(),
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue