mirror of
https://github.com/emmett-framework/granian.git
synced 2025-12-23 05:36:49 +00:00
Improve errors handling
This commit is contained in:
parent
dd3f50d0c0
commit
c4e6bfd477
10 changed files with 163 additions and 20 deletions
|
|
@ -141,7 +141,8 @@ def future_handler(watcher):
|
|||
def handler(task):
|
||||
try:
|
||||
task.result()
|
||||
watcher.done(True)
|
||||
except Exception:
|
||||
watcher.done(False)
|
||||
raise
|
||||
watcher.done(True)
|
||||
return handler
|
||||
|
|
|
|||
|
|
@ -95,6 +95,7 @@ def future_handler(watcher):
|
|||
try:
|
||||
res = task.result()
|
||||
except Exception:
|
||||
res = None
|
||||
watcher.err()
|
||||
raise
|
||||
watcher.done(res)
|
||||
return handler
|
||||
|
|
|
|||
|
|
@ -55,6 +55,10 @@ pub(crate) async fn call(
|
|||
|
||||
match rx.await {
|
||||
Ok(true) => Ok(()),
|
||||
Ok(false) => {
|
||||
log::warn!("Application callable raised an exception");
|
||||
error_flow!()
|
||||
},
|
||||
_ => error_flow!()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,13 +3,13 @@ use tokio::sync::oneshot;
|
|||
|
||||
use crate::callbacks::CallbackWrapper;
|
||||
use super::{
|
||||
errors::ApplicationError,
|
||||
errors::{error_proto, error_app},
|
||||
io::{RSGIHTTPProtocol as HTTPProtocol, RSGIWebsocketProtocol as WebsocketProtocol},
|
||||
types::RSGIScope as Scope
|
||||
};
|
||||
|
||||
|
||||
#[derive(FromPyObject)]
|
||||
#[derive(FromPyObject, Debug)]
|
||||
pub(crate) struct CallbackResponse {
|
||||
pub mode: u32,
|
||||
pub status: i32,
|
||||
|
|
@ -21,7 +21,7 @@ pub(crate) struct CallbackResponse {
|
|||
|
||||
#[pyclass]
|
||||
pub(crate) struct CallbackResponseWatcher {
|
||||
tx: Option<oneshot::Sender<CallbackResponse>>,
|
||||
tx: Option<oneshot::Sender<Option<CallbackResponse>>>,
|
||||
#[pyo3(get)]
|
||||
event_loop: PyObject,
|
||||
#[pyo3(get)]
|
||||
|
|
@ -32,7 +32,7 @@ impl CallbackResponseWatcher {
|
|||
pub fn new(
|
||||
py: Python,
|
||||
cb: CallbackWrapper,
|
||||
tx: oneshot::Sender<CallbackResponse>
|
||||
tx: oneshot::Sender<Option<CallbackResponse>>
|
||||
) -> Self {
|
||||
Self {
|
||||
tx: Some(tx),
|
||||
|
|
@ -46,8 +46,22 @@ impl CallbackResponseWatcher {
|
|||
impl CallbackResponseWatcher {
|
||||
fn done(&mut self, py: Python, result: PyObject) -> PyResult<()> {
|
||||
if let Some(tx) = self.tx.take() {
|
||||
// FIXME: handle failure
|
||||
let _ = tx.send(result.extract(py)?);
|
||||
match result.extract(py) {
|
||||
Ok(res) => {
|
||||
let _ = tx.send(res);
|
||||
return Ok(())
|
||||
},
|
||||
_ => {
|
||||
let _ = tx.send(None);
|
||||
}
|
||||
}
|
||||
};
|
||||
error_proto!()
|
||||
}
|
||||
|
||||
fn err(&mut self) -> PyResult<()> {
|
||||
if let Some(tx) = self.tx.take() {
|
||||
let _ = tx.send(None);
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -55,7 +69,7 @@ impl CallbackResponseWatcher {
|
|||
|
||||
#[pyclass]
|
||||
pub(crate) struct CallbackProtocolWatcher {
|
||||
tx: Option<oneshot::Sender<(i32, bool)>>,
|
||||
tx: Option<oneshot::Sender<Option<(i32, bool)>>>,
|
||||
#[pyo3(get)]
|
||||
event_loop: PyObject,
|
||||
#[pyo3(get)]
|
||||
|
|
@ -66,7 +80,7 @@ impl CallbackProtocolWatcher {
|
|||
pub fn new(
|
||||
py: Python,
|
||||
cb: CallbackWrapper,
|
||||
tx: oneshot::Sender<(i32, bool)>
|
||||
tx: oneshot::Sender<Option<(i32, bool)>>
|
||||
) -> Self {
|
||||
Self {
|
||||
tx: Some(tx),
|
||||
|
|
@ -80,8 +94,22 @@ impl CallbackProtocolWatcher {
|
|||
impl CallbackProtocolWatcher {
|
||||
fn done(&mut self, py: Python, result: PyObject) -> PyResult<()> {
|
||||
if let Some(tx) = self.tx.take() {
|
||||
// FIXME: handle failure
|
||||
let _ = tx.send(result.extract(py)?);
|
||||
match result.extract(py) {
|
||||
Ok(res) => {
|
||||
let _ = tx.send(res);
|
||||
return Ok(())
|
||||
},
|
||||
_ => {
|
||||
let _ = tx.send(None);
|
||||
}
|
||||
}
|
||||
};
|
||||
error_proto!()
|
||||
}
|
||||
|
||||
fn err(&mut self) -> PyResult<()> {
|
||||
if let Some(tx) = self.tx.take() {
|
||||
let _ = tx.send(None);
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -99,8 +127,19 @@ pub(crate) async fn call_response(
|
|||
})?;
|
||||
|
||||
match rx.await {
|
||||
Ok(v) => Ok(v),
|
||||
_ => Err(ApplicationError.into())
|
||||
Ok(res) => {
|
||||
match res {
|
||||
Some(res) => Ok(res),
|
||||
_ => {
|
||||
log::warn!("Application failed to return a response");
|
||||
error_app!()
|
||||
}
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
log::error!("RSGI protocol failure");
|
||||
error_proto!()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -116,7 +155,18 @@ pub(crate) async fn call_protocol(
|
|||
})?;
|
||||
|
||||
match rx.await {
|
||||
Ok(v) => Ok(v),
|
||||
_ => Err(ApplicationError.into())
|
||||
Ok(res) => {
|
||||
match res {
|
||||
Some(res) => Ok(res),
|
||||
_ => {
|
||||
log::warn!("Application failed to close protocol");
|
||||
error_app!()
|
||||
}
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
log::error!("RSGI protocol failure");
|
||||
error_proto!()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -62,8 +62,15 @@ impl std::convert::From<ApplicationError> for PyErr {
|
|||
|
||||
macro_rules! error_proto {
|
||||
() => {
|
||||
Err(RSGIProtocolError.into())
|
||||
Err(super::errors::RSGIProtocolError.into())
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! error_app {
|
||||
() => {
|
||||
Err(super::errors::ApplicationError.into())
|
||||
};
|
||||
}
|
||||
|
||||
pub(crate) use error_proto;
|
||||
pub(crate) use error_app;
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ use crate::{
|
|||
runtime::{RuntimeRef, future_into_py},
|
||||
ws::{HyperWebsocket, UpgradeData}
|
||||
};
|
||||
use super::errors::{RSGIProtocolError, error_proto};
|
||||
use super::errors::error_proto;
|
||||
|
||||
|
||||
#[pyclass(module="granian._granian")]
|
||||
|
|
|
|||
|
|
@ -85,11 +85,21 @@ async def ws_echo(scope, receive, send):
|
|||
})
|
||||
|
||||
|
||||
async def err_app(scope, receive, send):
|
||||
1 / 0
|
||||
|
||||
|
||||
async def err_proto(scope, receive, send):
|
||||
await send({'type': 'wrong.msg'})
|
||||
|
||||
|
||||
def app(scope, receive, send):
|
||||
return {
|
||||
"/info": info,
|
||||
"/echo": echo,
|
||||
"/ws_reject": ws_reject,
|
||||
"/ws_info": ws_info,
|
||||
"/ws_echo": ws_echo
|
||||
"/ws_echo": ws_echo,
|
||||
"/err_app": err_app,
|
||||
"/err_proto": err_proto
|
||||
}[scope['path']](scope, receive, send)
|
||||
|
|
|
|||
|
|
@ -73,11 +73,21 @@ async def ws_echo(_, protocol: WebsocketProtocol):
|
|||
return protocol.close()
|
||||
|
||||
|
||||
async def err_app(scope: Scope, protocol: HTTPProtocol):
|
||||
1 / 0
|
||||
|
||||
|
||||
async def err_proto(scope: Scope, protocol: HTTPProtocol):
|
||||
return "bad"
|
||||
|
||||
|
||||
def app(scope, protocol):
|
||||
return {
|
||||
"/info": info,
|
||||
"/echo": echo,
|
||||
"/ws_reject": ws_reject,
|
||||
"/ws_info": ws_info,
|
||||
"/ws_echo": ws_echo
|
||||
"/ws_echo": ws_echo,
|
||||
"/err_app": err_app,
|
||||
"/err_proto": err_proto
|
||||
}[scope.path](scope, protocol)
|
||||
|
|
|
|||
|
|
@ -45,3 +45,33 @@ async def test_body(asgi_server, threading_mode):
|
|||
|
||||
assert res.status_code == 200
|
||||
assert res.text == "test"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"threading_mode",
|
||||
[
|
||||
"runtime",
|
||||
"workers"
|
||||
]
|
||||
)
|
||||
async def test_app_error(asgi_server, threading_mode):
|
||||
async with asgi_server(threading_mode) as port:
|
||||
res = httpx.get(f"http://localhost:{port}/err_app")
|
||||
|
||||
assert res.status_code == 500
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"threading_mode",
|
||||
[
|
||||
"runtime",
|
||||
"workers"
|
||||
]
|
||||
)
|
||||
async def test_protocol_error(asgi_server, threading_mode):
|
||||
async with asgi_server(threading_mode) as port:
|
||||
res = httpx.get(f"http://localhost:{port}/err_proto")
|
||||
|
||||
assert res.status_code == 500
|
||||
|
|
|
|||
|
|
@ -42,3 +42,33 @@ async def test_body(rsgi_server, threading_mode):
|
|||
|
||||
assert res.status_code == 200
|
||||
assert res.text == "test"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"threading_mode",
|
||||
[
|
||||
"runtime",
|
||||
"workers"
|
||||
]
|
||||
)
|
||||
async def test_app_error(rsgi_server, threading_mode):
|
||||
async with rsgi_server(threading_mode) as port:
|
||||
res = httpx.get(f"http://localhost:{port}/err_app")
|
||||
|
||||
assert res.status_code == 500
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"threading_mode",
|
||||
[
|
||||
"runtime",
|
||||
"workers"
|
||||
]
|
||||
)
|
||||
async def test_protocol_error(rsgi_server, threading_mode):
|
||||
async with rsgi_server(threading_mode) as port:
|
||||
res = httpx.get(f"http://localhost:{port}/err_proto")
|
||||
|
||||
assert res.status_code == 500
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue