Add client disconnection handling in ASGI and RSGI (#524)

* Handle client disconnection in ASGI

* Add `RSGIHTTPProtocol.client_disconnect`
This commit is contained in:
Giovanni Barillari 2025-03-18 19:58:07 +01:00 committed by GitHub
parent 74020ed9d2
commit e804c9edf7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 231 additions and 58 deletions

View file

@ -1,6 +1,6 @@
# RSGI Specification
**Version:** 1.4
**Version:** 1.5
## Abstract
@ -165,10 +165,11 @@ And here are descriptions for the upper attributes:
#### HTTP protocol interface
HTTP protocol object implements two awaitable methods to receive the request body, and five different methods to send data, in particular:
HTTP protocol object implements two awaitable methods to receive the request body, five different methods to send data, and one awaitable method to wait for client disconnection, in particular:
- `__call__` to receive the entire body in `bytes` format
- `__aiter__` to receive the body in `bytes` chunks
- `client_disconnect` to watch for client disconnection
- `response_empty` to send back an empty response
- `response_str` to send back a response with a `str` body
- `response_bytes` to send back a response with `bytes` body
@ -180,6 +181,7 @@ All the upper-mentioned response methods accepts an integer `status` parameter,
```
coroutine __call__() -> body
asynciterator __aiter__() -> body chunks
coroutine client_disconnect()
function response_empty(status, headers)
function response_str(status, headers, body)
function response_bytes(status, headers, body)
@ -197,6 +199,10 @@ coroutine send_bytes(bytes)
coroutine send_str(str)
```
The `client_disconnect` method will return a future that resolve ones the client has disconnected.
> **Note:** as HTTP supports keep-alived connections, the lifecycle of the client connection might not be the same of the single request. This is why the RSGI specification doesn't imply `client_disconnect` should resolve in case a client sends multiple requests within the same connection, and thus the protocol delegates to the application the responsibility to cancel the disconnection watcher once the response is sent.
### Websocket protocol
WebSockets share some HTTP details - they have a path and headers - but also have more state. Again, most of that state is in the scope, which will live as long as the socket does.

View file

@ -21,6 +21,7 @@ class RSGIHTTPStreamTransport:
class RSGIHTTPProtocol:
async def __call__(self) -> bytes: ...
def __aiter__(self) -> Any: ...
async def client_disconnect(self) -> None: ...
def response_empty(self, status: int, headers: List[Tuple[str, str]]): ...
def response_str(self, status: int, headers: List[Tuple[str, str]], body: str): ...
def response_bytes(self, status: int, headers: List[Tuple[str, str]], body: bytes): ...

View file

@ -1,7 +1,10 @@
use pyo3::prelude::*;
use pyo3::types::PyDict;
use std::{net::SocketAddr, sync::OnceLock};
use tokio::sync::oneshot;
use std::{
net::SocketAddr,
sync::{Arc, OnceLock},
};
use tokio::sync::{oneshot, Notify};
use super::{
io::{ASGIHTTPProtocol as HTTPProtocol, ASGIWebsocketProtocol as WebsocketProtocol, WebsocketDetachedTransport},
@ -145,6 +148,7 @@ impl CallbackWatcherWebsocket {
pub(crate) fn call_http(
cb: ArcCBScheduler,
rt: RuntimeRef,
disconnect_guard: Arc<Notify>,
server_addr: SocketAddr,
client_addr: SocketAddr,
scheme: &str,
@ -152,7 +156,7 @@ pub(crate) fn call_http(
body: hyper::body::Incoming,
) -> oneshot::Receiver<HTTPResponse> {
let (tx, rx) = oneshot::channel();
let protocol = HTTPProtocol::new(rt.clone(), body, tx);
let protocol = HTTPProtocol::new(rt.clone(), body, tx, disconnect_guard);
let scheme: Box<str> = scheme.into();
rt.spawn_blocking(move |py| {

View file

@ -1,7 +1,7 @@
use http_body_util::BodyExt;
use hyper::{header::SERVER as HK_SERVER, http::response::Builder as ResponseBuilder, StatusCode};
use std::net::SocketAddr;
use tokio::sync::mpsc;
use std::{net::SocketAddr, sync::Arc};
use tokio::sync::{mpsc, Notify};
use super::callbacks::{call_http, call_ws};
use crate::{
@ -16,8 +16,19 @@ const SCHEME_WS: &str = "ws";
const SCHEME_WSS: &str = "wss";
macro_rules! handle_http_response {
($handler:expr, $rt:expr, $callback:expr, $server_addr:expr, $client_addr:expr, $scheme:expr, $req:expr, $body:expr) => {
match $handler($callback, $rt, $server_addr, $client_addr, $req, $scheme, $body).await {
($handler:expr, $rt:expr, $disconnect_guard:expr, $callback:expr, $server_addr:expr, $client_addr:expr, $scheme:expr, $req:expr, $body:expr) => {
match $handler(
$callback,
$rt,
$disconnect_guard,
$server_addr,
$client_addr,
$req,
$scheme,
$body,
)
.await
{
Ok(res) => res,
_ => {
log::error!("ASGI protocol failure");
@ -32,6 +43,7 @@ macro_rules! handle_request {
#[inline]
pub(crate) async fn $func_name(
rt: RuntimeRef,
disconnect_guard: Arc<Notify>,
callback: ArcCBScheduler,
server_addr: SocketAddr,
client_addr: SocketAddr,
@ -42,6 +54,7 @@ macro_rules! handle_request {
handle_http_response!(
$handler,
rt,
disconnect_guard,
callback,
server_addr,
client_addr,
@ -58,6 +71,7 @@ macro_rules! handle_request_with_ws {
#[inline]
pub(crate) async fn $func_name(
rt: RuntimeRef,
disconnect_guard: Arc<Notify>,
callback: ArcCBScheduler,
server_addr: SocketAddr,
client_addr: SocketAddr,
@ -142,6 +156,7 @@ macro_rules! handle_request_with_ws {
handle_http_response!(
$handler_req,
rt,
disconnect_guard,
callback,
server_addr,
client_addr,

View file

@ -13,7 +13,7 @@ use std::{
};
use tokio::{
fs::File,
sync::{mpsc, oneshot, Mutex as AsyncMutex},
sync::{mpsc, oneshot, Mutex as AsyncMutex, Notify},
};
use tokio_tungstenite::tungstenite::Message;
use tokio_util::io::ReaderStream;
@ -25,7 +25,9 @@ use super::{
use crate::{
conversion::FutureResultToPy,
http::{response_404, HTTPResponse, HTTPResponseBody, HV_SERVER},
runtime::{empty_future_into_py, err_future_into_py, future_into_py_futlike, Runtime, RuntimeRef},
runtime::{
done_future_into_py, empty_future_into_py, err_future_into_py, future_into_py_futlike, Runtime, RuntimeRef,
},
ws::{HyperWebsocket, UpgradeData, WSRxStream, WSTxStream},
};
@ -37,27 +39,36 @@ static WS_SUBPROTO_HNAME: &str = "Sec-WebSocket-Protocol";
pub(crate) struct ASGIHTTPProtocol {
rt: RuntimeRef,
tx: Mutex<Option<oneshot::Sender<HTTPResponse>>>,
disconnect_guard: Arc<Notify>,
request_body: Arc<AsyncMutex<http_body_util::BodyStream<body::Incoming>>>,
response_started: atomic::AtomicBool,
response_chunked: atomic::AtomicBool,
response_intent: Mutex<Option<(u16, HeaderMap)>>,
body_tx: Mutex<Option<mpsc::Sender<Result<body::Bytes, anyhow::Error>>>>,
flow_rx_exhausted: Arc<atomic::AtomicBool>,
flow_rx_closed: Arc<atomic::AtomicBool>,
flow_tx_waiter: Arc<tokio::sync::Notify>,
sent_response_code: Arc<atomic::AtomicU16>,
}
impl ASGIHTTPProtocol {
pub fn new(rt: RuntimeRef, body: hyper::body::Incoming, tx: oneshot::Sender<HTTPResponse>) -> Self {
pub fn new(
rt: RuntimeRef,
body: hyper::body::Incoming,
tx: oneshot::Sender<HTTPResponse>,
disconnect_guard: Arc<Notify>,
) -> Self {
Self {
rt,
tx: Mutex::new(Some(tx)),
disconnect_guard,
request_body: Arc::new(AsyncMutex::new(http_body_util::BodyStream::new(body))),
response_started: false.into(),
response_chunked: false.into(),
response_intent: Mutex::new(None),
body_tx: Mutex::new(None),
flow_rx_exhausted: Arc::new(atomic::AtomicBool::new(false)),
flow_rx_closed: Arc::new(atomic::AtomicBool::new(false)),
flow_tx_waiter: Arc::new(tokio::sync::Notify::new()),
sent_response_code: Arc::new(atomic::AtomicU16::new(500)),
}
@ -108,10 +119,22 @@ impl ASGIHTTPProtocol {
#[pymethods]
impl ASGIHTTPProtocol {
fn receive<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
if self.flow_rx_closed.load(atomic::Ordering::Acquire) {
return done_future_into_py(
py,
super::conversion::message_into_py(py, ASGIMessageType::HTTPDisconnect).map(Bound::unbind),
);
}
if self.flow_rx_exhausted.load(atomic::Ordering::Acquire) {
let flow_hld = self.flow_tx_waiter.clone();
let flow_dgr = self.disconnect_guard.clone();
let flow_dsr = self.flow_rx_closed.clone();
return future_into_py_futlike(self.rt.clone(), py, async move {
let () = flow_hld.notified().await;
tokio::select! {
() = flow_hld.notified() => {},
() = flow_dgr.notified() => flow_dsr.store(true, atomic::Ordering::Release),
}
FutureResultToPy::ASGIMessage(ASGIMessageType::HTTPDisconnect)
});
}
@ -119,24 +142,33 @@ impl ASGIHTTPProtocol {
let body_ref = self.request_body.clone();
let flow_ref = self.flow_rx_exhausted.clone();
let flow_hld = self.flow_tx_waiter.clone();
let flow_dgr = self.disconnect_guard.clone();
let flow_dsr = self.flow_rx_closed.clone();
future_into_py_futlike(self.rt.clone(), py, async move {
let mut bodym = body_ref.lock().await;
let body = &mut *bodym;
let mut more_body = false;
let chunk = match body.next().await {
Some(Ok(buf)) => {
more_body = true;
Ok(buf.into_data().unwrap_or_default())
let chunk = tokio::select! {
frame = body.next() => match frame {
Some(Ok(buf)) => {
more_body = true;
Some(buf.into_data().unwrap_or_default())
}
Some(Err(_)) => None,
_ => Some(body::Bytes::new()),
},
() = flow_dgr.notified() => {
flow_dsr.store(true, atomic::Ordering::Release);
None
}
Some(Err(err)) => Err(err),
_ => Ok(body::Bytes::new()),
};
if !more_body {
flow_ref.store(true, atomic::Ordering::Release);
}
match chunk {
Ok(data) => FutureResultToPy::ASGIMessage(ASGIMessageType::HTTPRequestBody((data, more_body))),
Some(data) => FutureResultToPy::ASGIMessage(ASGIMessageType::HTTPRequestBody((data, more_body))),
_ => {
flow_hld.notify_one();
FutureResultToPy::ASGIMessage(ASGIMessageType::HTTPDisconnect)

View file

@ -333,6 +333,35 @@ impl PyEmptyAwaitable {
}
}
#[pyclass(frozen, module = "granian._granian")]
pub(crate) struct PyDoneAwaitable {
result: PyResult<PyObject>,
}
impl PyDoneAwaitable {
pub(crate) fn new(result: PyResult<PyObject>) -> Self {
Self { result }
}
}
#[pymethods]
impl PyDoneAwaitable {
fn __await__(pyself: PyRef<'_, Self>) -> PyRef<'_, Self> {
pyself
}
fn __iter__(pyself: PyRef<'_, Self>) -> PyRef<'_, Self> {
pyself
}
fn __next__(&self, py: Python) -> PyResult<PyObject> {
self.result
.as_ref()
.map(|v| v.clone_ref(py))
.map_err(|v| v.clone_ref(py))
}
}
#[pyclass(frozen, module = "granian._granian")]
pub(crate) struct PyErrAwaitable {
result: PyResult<()>,

View file

@ -1,6 +1,6 @@
use pyo3::prelude::*;
use std::sync::OnceLock;
use tokio::sync::oneshot;
use std::sync::{Arc, OnceLock};
use tokio::sync::{oneshot, Notify};
use super::{
io::{RSGIHTTPProtocol as HTTPProtocol, RSGIWebsocketProtocol as WebsocketProtocol, WebsocketDetachedTransport},
@ -118,11 +118,12 @@ impl CallbackWatcherWebsocket {
pub(crate) fn call_http(
cb: ArcCBScheduler,
rt: RuntimeRef,
disconnect_guard: Arc<Notify>,
body: hyper::body::Incoming,
scope: HTTPScope,
) -> oneshot::Receiver<PyResponse> {
let (tx, rx) = oneshot::channel();
let protocol = HTTPProtocol::new(rt.clone(), tx, body);
let protocol = HTTPProtocol::new(rt.clone(), tx, body, disconnect_guard);
rt.spawn_blocking(move |py| {
if let Ok(watcher) = CallbackWatcherHTTP::new(py, protocol, scope) {

View file

@ -1,8 +1,8 @@
use futures::sink::SinkExt;
use http_body_util::BodyExt;
use hyper::{header::SERVER as HK_SERVER, http::response::Builder as ResponseBuilder, StatusCode};
use std::net::SocketAddr;
use tokio::sync::mpsc;
use std::{net::SocketAddr, sync::Arc};
use tokio::sync::{mpsc, Notify};
use super::{
callbacks::{call_http, call_ws},
@ -30,8 +30,8 @@ macro_rules! build_scope {
}
macro_rules! handle_http_response {
($handler:expr, $rt:expr, $callback:expr, $body:expr, $scope:expr) => {
match $handler($callback, $rt, $body, $scope).await {
($handler:expr, $rt:expr, $disconnect_guard:expr, $callback:expr, $body:expr, $scope:expr) => {
match $handler($callback, $rt, $disconnect_guard, $body, $scope).await {
Ok(PyResponse::Body(pyres)) => pyres.to_response(),
Ok(PyResponse::File(pyres)) => pyres.to_response().await,
_ => {
@ -47,6 +47,7 @@ macro_rules! handle_request {
#[inline]
pub(crate) async fn $func_name(
rt: RuntimeRef,
disconnect_guard: Arc<Notify>,
callback: ArcCBScheduler,
server_addr: SocketAddr,
client_addr: SocketAddr,
@ -55,7 +56,7 @@ macro_rules! handle_request {
) -> HTTPResponse {
let (parts, body) = req.into_parts();
let scope = build_scope!(HTTPScope, server_addr, client_addr, parts, scheme);
handle_http_response!($handler, rt, callback, body, scope)
handle_http_response!($handler, rt, disconnect_guard, callback, body, scope)
}
};
}
@ -65,6 +66,7 @@ macro_rules! handle_request_with_ws {
#[inline]
pub(crate) async fn $func_name(
rt: RuntimeRef,
disconnect_guard: Arc<Notify>,
callback: ArcCBScheduler,
server_addr: SocketAddr,
client_addr: SocketAddr,
@ -131,7 +133,7 @@ macro_rules! handle_request_with_ws {
let (parts, body) = req.into_parts();
let scope = build_scope!(HTTPScope, server_addr, client_addr, parts, scheme);
handle_http_response!($handler_req, rt, callback, body, scope)
handle_http_response!($handler_req, rt, disconnect_guard, callback, body, scope)
}
};
}

View file

@ -4,9 +4,9 @@ use hyper::body;
use pyo3::{prelude::*, pybacked::PyBackedStr};
use std::{
borrow::Cow,
sync::{Arc, Mutex, RwLock},
sync::{atomic, Arc, Mutex, RwLock},
};
use tokio::sync::{mpsc, oneshot, Mutex as AsyncMutex};
use tokio::sync::{mpsc, oneshot, Mutex as AsyncMutex, Notify};
use tokio_tungstenite::tungstenite::Message;
use super::{
@ -15,7 +15,7 @@ use super::{
};
use crate::{
conversion::FutureResultToPy,
runtime::{future_into_py_futlike, RuntimeRef},
runtime::{empty_future_into_py, future_into_py_futlike, RuntimeRef},
ws::{HyperWebsocket, UpgradeData, WSRxStream, WSTxStream},
};
@ -63,17 +63,26 @@ impl RSGIHTTPStreamTransport {
pub(crate) struct RSGIHTTPProtocol {
rt: RuntimeRef,
tx: Mutex<Option<oneshot::Sender<PyResponse>>>,
disconnect_guard: Arc<Notify>,
body: Mutex<Option<body::Incoming>>,
body_stream: Arc<AsyncMutex<Option<http_body_util::BodyStream<body::Incoming>>>>,
disconnected: Arc<atomic::AtomicBool>,
}
impl RSGIHTTPProtocol {
pub fn new(rt: RuntimeRef, tx: oneshot::Sender<PyResponse>, body: body::Incoming) -> Self {
pub fn new(
rt: RuntimeRef,
tx: oneshot::Sender<PyResponse>,
body: body::Incoming,
disconnect_guard: Arc<Notify>,
) -> Self {
Self {
rt,
tx: Mutex::new(Some(tx)),
disconnect_guard,
body: Mutex::new(Some(body)),
body_stream: Arc::new(AsyncMutex::new(None)),
disconnected: Arc::new(atomic::AtomicBool::new(false)),
}
}
@ -110,6 +119,7 @@ impl RSGIHTTPProtocol {
if self.body_stream.blocking_lock().is_none() {
return Err(pyo3::exceptions::PyStopAsyncIteration::new_err("stream exhausted"));
}
let body_stream = self.body_stream.clone();
future_into_py_futlike(self.rt.clone(), py, async move {
let guard = &mut *body_stream.lock().await;
@ -121,38 +131,52 @@ impl RSGIHTTPProtocol {
FutureResultToPy::Bytes(chunk)
}
_ => {
let _ = guard.take();
_ = guard.take();
FutureResultToPy::Bytes(body::Bytes::new())
}
}
})
}
fn client_disconnect<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
if self.disconnected.load(atomic::Ordering::Acquire) {
return empty_future_into_py(py);
}
let guard = self.disconnect_guard.clone();
let state = self.disconnected.clone();
future_into_py_futlike(self.rt.clone(), py, async move {
guard.notified().await;
state.store(true, atomic::Ordering::Release);
FutureResultToPy::None
})
}
#[pyo3(signature = (status=200, headers=vec![]))]
fn response_empty(&self, status: u16, headers: Vec<(PyBackedStr, PyBackedStr)>) {
if let Some(tx) = self.tx.lock().unwrap().take() {
let _ = tx.send(PyResponse::Body(PyResponseBody::empty(status, headers)));
_ = tx.send(PyResponse::Body(PyResponseBody::empty(status, headers)));
}
}
#[pyo3(signature = (status=200, headers=vec![], body=vec![].into()))]
fn response_bytes(&self, status: u16, headers: Vec<(PyBackedStr, PyBackedStr)>, body: Cow<[u8]>) {
if let Some(tx) = self.tx.lock().unwrap().take() {
let _ = tx.send(PyResponse::Body(PyResponseBody::from_bytes(status, headers, body)));
_ = tx.send(PyResponse::Body(PyResponseBody::from_bytes(status, headers, body)));
}
}
#[pyo3(signature = (status=200, headers=vec![], body=String::new()))]
fn response_str(&self, status: u16, headers: Vec<(PyBackedStr, PyBackedStr)>, body: String) {
if let Some(tx) = self.tx.lock().unwrap().take() {
let _ = tx.send(PyResponse::Body(PyResponseBody::from_string(status, headers, body)));
_ = tx.send(PyResponse::Body(PyResponseBody::from_string(status, headers, body)));
}
}
#[pyo3(signature = (status, headers, file))]
fn response_file(&self, status: u16, headers: Vec<(PyBackedStr, PyBackedStr)>, file: String) {
if let Some(tx) = self.tx.lock().unwrap().take() {
let _ = tx.send(PyResponse::File(PyResponseFile::new(status, headers, file)));
_ = tx.send(PyResponse::File(PyResponseFile::new(status, headers, file)));
}
}
@ -168,7 +192,7 @@ impl RSGIHTTPProtocol {
let body_stream = http_body_util::StreamBody::new(
tokio_stream::wrappers::ReceiverStream::new(body_rx).map_ok(body::Frame::data),
);
let _ = tx.send(PyResponse::Body(PyResponseBody::new(
_ = tx.send(PyResponse::Body(PyResponseBody::new(
status,
headers,
BodyExt::boxed(BodyExt::map_err(body_stream, std::convert::Into::into)),

View file

@ -16,7 +16,7 @@ use tokio_util::io::ReaderStream;
use crate::http::{empty_body, response_404, HTTPResponseBody, HV_SERVER};
const RSGI_PROTO_VERSION: &str = "1.4";
const RSGI_PROTO_VERSION: &str = "1.5";
#[pyclass(frozen, module = "granian._granian")]
#[derive(Clone)]

View file

@ -14,7 +14,7 @@ use super::callbacks::PyFutureAwaitable;
use super::callbacks::{PyFutureDoneCallback, PyFutureResultSetter};
use super::blocking::BlockingRunner;
use super::callbacks::{PyEmptyAwaitable, PyErrAwaitable, PyIterAwaitable};
use super::callbacks::{PyDoneAwaitable, PyEmptyAwaitable, PyErrAwaitable, PyIterAwaitable};
use super::conversion::FutureResultToPy;
pub trait JoinError {
@ -168,6 +168,11 @@ pub(crate) fn empty_future_into_py(py: Python) -> PyResult<Bound<PyAny>> {
PyEmptyAwaitable.into_bound_py_any(py)
}
#[inline(always)]
pub(crate) fn done_future_into_py(py: Python, result: PyResult<PyObject>) -> PyResult<Bound<PyAny>> {
PyDoneAwaitable::new(result).into_bound_py_any(py)
}
#[inline(always)]
pub(crate) fn err_future_into_py(py: Python, err: PyResult<()>) -> PyResult<Bound<PyAny>> {
PyErrAwaitable::new(err).into_bound_py_any(py)

View file

@ -202,13 +202,25 @@ where
}
macro_rules! build_service {
($local_addr:expr, $remote_addr:expr, $callback_wrapper:expr, $rt:expr, $target:expr, $proto:expr) => {
($local_addr:expr, $remote_addr:expr, $callback_wrapper:expr, $rt:expr, $disconnect_guard:expr, $target:expr, $proto:expr) => {
hyper::service::service_fn(move |request: crate::http::HTTPRequest| {
let callback_wrapper = $callback_wrapper.clone();
let rth = $rt.clone();
let disconnect_guard = $disconnect_guard.clone();
async move {
Ok::<_, anyhow::Error>($target(rth, callback_wrapper, $local_addr, $remote_addr, request, $proto).await)
Ok::<_, anyhow::Error>(
$target(
rth,
disconnect_guard,
callback_wrapper,
$local_addr,
$remote_addr,
request,
$proto,
)
.await,
)
}
})
};
@ -274,9 +286,18 @@ macro_rules! handle_connection_http1 {
let rth = $rth.clone();
let callback_wrapper = $callback.clone();
$spawner(async move {
let svc =
crate::workers::build_service!(local_addr, remote_addr, callback_wrapper, rth, $target, $proto);
let _ = hyper::server::conn::http1::Builder::new()
let disconnect_guard = std::sync::Arc::new(tokio::sync::Notify::new());
let disconnect_tx = disconnect_guard.clone();
let svc = crate::workers::build_service!(
local_addr,
remote_addr,
callback_wrapper,
rth,
disconnect_guard,
$target,
$proto
);
_ = hyper::server::conn::http1::Builder::new()
.timer(crate::io::TokioTimer::new())
.header_read_timeout($http_opts.header_read_timeout)
.keep_alive($http_opts.keep_alive)
@ -284,6 +305,7 @@ macro_rules! handle_connection_http1 {
.pipeline_flush($http_opts.pipeline_flush)
.serve_connection($stream_wrapper(stream), svc)
.await;
disconnect_tx.notify_one();
drop(permit);
});
}
@ -296,9 +318,18 @@ macro_rules! handle_connection_http1_upgrades {
let rth = $rth.clone();
let callback_wrapper = $callback.clone();
$spawner(async move {
let svc =
crate::workers::build_service!(local_addr, remote_addr, callback_wrapper, rth, $target, $proto);
let _ = hyper::server::conn::http1::Builder::new()
let disconnect_guard = std::sync::Arc::new(tokio::sync::Notify::new());
let disconnect_tx = disconnect_guard.clone();
let svc = crate::workers::build_service!(
local_addr,
remote_addr,
callback_wrapper,
rth,
disconnect_guard,
$target,
$proto
);
_ = hyper::server::conn::http1::Builder::new()
.timer(crate::io::TokioTimer::new())
.header_read_timeout($http_opts.header_read_timeout)
.keep_alive($http_opts.keep_alive)
@ -307,6 +338,7 @@ macro_rules! handle_connection_http1_upgrades {
.serve_connection($stream_wrapper(stream), svc)
.with_upgrades()
.await;
disconnect_tx.notify_one();
drop(permit);
});
}
@ -319,9 +351,18 @@ macro_rules! handle_connection_http2 {
let rth = $rth.clone();
let callback_wrapper = $callback.clone();
$spawner(async move {
let svc =
crate::workers::build_service!(local_addr, remote_addr, callback_wrapper, rth, $target, $proto);
let _ = hyper::server::conn::http2::Builder::new($executor_builder())
let disconnect_guard = std::sync::Arc::new(tokio::sync::Notify::new());
let disconnect_tx = disconnect_guard.clone();
let svc = crate::workers::build_service!(
local_addr,
remote_addr,
callback_wrapper,
rth,
disconnect_guard,
$target,
$proto
);
_ = hyper::server::conn::http2::Builder::new($executor_builder())
.timer(crate::io::TokioTimer::new())
.adaptive_window($http_opts.adaptive_window)
.initial_connection_window_size($http_opts.initial_connection_window_size)
@ -334,6 +375,7 @@ macro_rules! handle_connection_http2 {
.max_send_buf_size($http_opts.max_send_buffer_size)
.serve_connection($stream_wrapper(stream), svc)
.await;
disconnect_tx.notify_one();
drop(permit);
});
}
@ -346,8 +388,17 @@ macro_rules! handle_connection_httpa {
let rth = $rth.clone();
let callback_wrapper = $callback.clone();
$spawner(async move {
let svc =
crate::workers::build_service!(local_addr, remote_addr, callback_wrapper, rth, $target, $proto);
let disconnect_guard = std::sync::Arc::new(tokio::sync::Notify::new());
let disconnect_tx = disconnect_guard.clone();
let svc = crate::workers::build_service!(
local_addr,
remote_addr,
callback_wrapper,
rth,
disconnect_guard,
$target,
$proto
);
let mut conn = hyper_util::server::conn::auto::Builder::new($executor_builder());
conn.http1()
.timer(crate::io::TokioTimer::new())
@ -366,7 +417,8 @@ macro_rules! handle_connection_httpa {
.max_frame_size($http2_opts.max_frame_size)
.max_header_list_size($http2_opts.max_headers_size)
.max_send_buf_size($http2_opts.max_send_buffer_size);
let _ = conn.$conn_method($stream_wrapper(stream), svc).await;
_ = conn.$conn_method($stream_wrapper(stream), svc).await;
disconnect_tx.notify_one();
drop(permit);
});
}

View file

@ -1,5 +1,6 @@
use hyper::Response;
use std::net::SocketAddr;
use std::{net::SocketAddr, sync::Arc};
use tokio::sync::Notify;
use super::callbacks::call_http;
use crate::{
@ -19,6 +20,7 @@ fn build_response(status: u16, pyheaders: hyper::HeaderMap, body: HTTPResponseBo
#[inline]
pub(crate) async fn handle(
rt: RuntimeRef,
_disconnect_guard: Arc<Notify>,
callback: ArcCBScheduler,
server_addr: SocketAddr,
client_addr: SocketAddr,

View file

@ -17,7 +17,7 @@ async def test_scope(rsgi_server, runtime_mode):
data = res.json()
assert data['proto'] == 'http'
assert data['http_version'] == '1.1'
assert data['rsgi_version'] == '1.4'
assert data['rsgi_version'] == '1.5'
assert data['scheme'] == 'http'
assert data['method'] == 'GET'
assert data['path'] == '/info'

View file

@ -62,7 +62,7 @@ async def test_rsgi_scope(rsgi_server, runtime_mode):
data = json.loads(res)
assert data['proto'] == 'ws'
assert data['http_version'] == '1.1'
assert data['rsgi_version'] == '1.4'
assert data['rsgi_version'] == '1.5'
assert data['scheme'] == 'http'
assert data['method'] == 'GET'
assert data['path'] == '/ws_info'