Add backpressure (#301)

This commit is contained in:
Giovanni Barillari 2024-05-26 16:02:44 +02:00 committed by GitHub
parent 1401261f36
commit 4718c12012
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 384 additions and 309 deletions

16
Cargo.lock generated
View file

@ -104,6 +104,21 @@ dependencies = [
"libc",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
[[package]]
name = "crypto-common"
version = "0.1.6"
@ -278,6 +293,7 @@ name = "granian"
version = "1.4.0"
dependencies = [
"anyhow",
"crossbeam-channel",
"futures",
"http-body-util",
"hyper",

View file

@ -32,6 +32,7 @@ crate-type = ["cdylib"]
[dependencies]
anyhow = "=1.0"
crossbeam-channel = "0.5"
futures = "0.3"
http-body-util = { version = "=0.1" }
hyper = { version = "=1.3", features = ["http1", "http2", "server"] }

View file

@ -94,7 +94,7 @@ Options:
GRANIAN_THREADS; default: 1; x>=1]
--blocking-threads INTEGER RANGE
Number of blocking threads [env var:
GRANIAN_BLOCKING_THREADS; default: 1; x>=1]
GRANIAN_BLOCKING_THREADS; x>=1]
--threading-mode [runtime|workers]
Threading mode to use [env var:
GRANIAN_THREADING_MODE; default: (workers)]
@ -105,6 +105,9 @@ Options:
--backlog INTEGER RANGE Maximum number of connections to hold in
backlog [env var: GRANIAN_BACKLOG; default:
1024; x>=128]
--backpressure INTEGER RANGE Maximum number of requests to process
concurrently [env var:
GRANIAN_BACKPRESSURE; x>=1]
--http1-buffer-size INTEGER RANGE
Set the maximum buffer size for HTTP/1
connections [env var:
@ -215,6 +218,22 @@ The following atoms are available for use:
| scheme | Request scheme |
| protocol | HTTP protocol version |
### Processes and threads
Granian offers different options to configure the number of processes and threads to be run, in particular:
- **workers**: the total number of processes holding a dedicated Python interpreter that will run the application
- **threads**: the number of Rust threads per worker that will perform network I/O
- **blocking threads**: the number of Rust threads per worker involved in blocking operations. The main role of these threads is to deal with blocking I/O  like opening files  but on synchronous protocols like WSGI these threads will also be responsible of interacting with the application code.
In general, Granian will try its best to automatically pick proper values for the threading configuration, leaving to you the responsibility to choose the number of workers you need.
There is no *golden rule* here, as these numbers will vastly depend both on your application behavior and the deployment target, but we can list some suggestions:
- matching the amount of CPU cores for the workers is generally the best starting point; on containerized environments like docker or k8s is best to have 1 worker per container though and scale your containers using the relevant orchestrator;
- the default number of threads is fine for the vast majority of applications out there; you might want to increase this number for applications dealing with several concurrently opened websockets;
- the default number of blocking threads should work properly with the majority of applications; in synchronous protocols like WSGI this will also impact the number of concurrent requests you can handle, but you should use the `backpressure` configuration parameter to control it and set a lower number of blocking threads only if your application has a very low (1ms order) average response time;
Also, you should generally avoid to configure workers and threads based on numbers of other servers, as Granian architecture is quite different from projects like Gunicorn or Uvicorn.
### Threading mode
Granian offers two different threading paradigms, due to the fact the inner Rust runtime can be multi-threaded in opposition to what happens in Python event-loop which can only run as a single thread.

View file

@ -10,25 +10,25 @@ import time
from contextlib import contextmanager
CPU = multiprocessing.cpu_count()
WRK_CONCURRENCIES = [CPU * 2**i for i in range(3, 7)]
WRK_CONCURRENCIES = [64, 128, 256, 512]
APPS = {
"asgi": (
"granian --interface asgi --log-level warning --backlog 2048 "
"--no-ws --http {http} "
"--workers {procs} --threads {threads} --blocking-threads {bthreads} "
"--no-ws --http {http} --backpressure 512 "
"--workers {procs} --threads {threads}{bthreads} "
"--threading-mode {thmode} app.asgi:app"
),
"rsgi": (
"granian --interface rsgi --log-level warning --backlog 2048 "
"--no-ws --http {http} "
"--workers {procs} --threads {threads} --blocking-threads {bthreads} "
"--no-ws --http {http} --backpressure 512 "
"--workers {procs} --threads {threads}{bthreads} "
"--threading-mode {thmode} app.rsgi:app"
),
"wsgi": (
"granian --interface wsgi --log-level warning --backlog 2048 "
"--no-ws --http {http} "
"--workers {procs} --threads {threads} --blocking-threads {bthreads} "
"--no-ws --http {http} --backpressure 512 "
"--workers {procs} --threads {threads}{bthreads} "
"--threading-mode {thmode} app.wsgi:app"
),
"uvicorn_h11": (
@ -59,7 +59,7 @@ APPS = {
def app(name, procs=None, threads=None, bthreads=None, thmode=None, http="1"):
procs = procs or 1
threads = threads or 1
bthreads = bthreads or 1
bthreads = f" --blocking-threads {bthreads}" if bthreads else ""
thmode = thmode or "workers"
proc_cmd = APPS[name].format(
procs=procs,
@ -119,16 +119,17 @@ def wrk(duration, concurrency, endpoint, post=False, h2=False):
}
def benchmark(endpoint, post=False, h2=False):
def benchmark(endpoint, post=False, h2=False, concurrencies=None):
concurrencies = concurrencies or WRK_CONCURRENCIES
results = {}
# primer
wrk(5, 8, endpoint, post=post, h2=h2)
time.sleep(2)
# warm up
wrk(5, max(WRK_CONCURRENCIES), endpoint, post=post, h2=h2)
wrk(5, max(concurrencies), endpoint, post=post, h2=h2)
time.sleep(3)
# bench
for concurrency in WRK_CONCURRENCIES:
for concurrency in concurrencies:
res = wrk(15, concurrency, endpoint, post=post, h2=h2)
results[concurrency] = res
time.sleep(3)
@ -137,36 +138,21 @@ def benchmark(endpoint, post=False, h2=False):
def concurrencies():
nperm = sorted(set([1, 2, round(CPU / 2.5), round(CPU / 2), CPU]))
nperm = sorted(set([1, 2, round(CPU / 5), round(CPU / 2.5), round(CPU / 2), CPU]))
results = {"wsgi": {}}
for interface in ["asgi", "rsgi"]:
for interface in ["asgi", "rsgi", "wsgi"]:
results[interface] = {}
for np in nperm:
for nt in [1, 2, 4]:
for threading_mode in ["workers", "runtime"]:
key = f"P{np} T{nt} {threading_mode[0]}th"
with app(interface, np, nt, 1, threading_mode):
with app(interface, np, nt, thmode=threading_mode):
print(f"Bench concurrencies - [{interface}] {threading_mode} {np}:{nt}")
results[interface][key] = {
"m": threading_mode,
"p": np,
"t": nt,
"b": 1,
"res": benchmark("b")
}
for np in nperm:
for nt, nbtl in [(1, [1, 2, 4]), (2, [1]), (4, [1])]:
for nbt in nbtl:
for threading_mode in ["workers", "runtime"]:
key = f"P{np} T{nt} B{nbt} {threading_mode[0]}th"
with app("wsgi", np, nt, nbt, threading_mode):
print(f"Bench concurrencies - [wsgi] {threading_mode} {np}:{nt}:{nbt}")
results["wsgi"][key] = {
"m": threading_mode,
"p": np,
"t": nt,
"b": nbt,
"res": benchmark("b")
"res": benchmark("b", concurrencies=[128, 256, 512, 1024])
}
return results
@ -186,7 +172,7 @@ def interfaces():
for interface in ["rsgi", "asgi", "wsgi"]:
for key, bench_data in benches.items():
route, opts = bench_data
with app(interface):
with app(interface, bthreads=1):
results[f"{interface.upper()} {key}"] = benchmark(route, **opts)
return results
@ -205,9 +191,9 @@ def http2():
def files():
results = {}
with app("rsgi"):
with app("rsgi", bthreads=1):
results["RSGI"] = benchmark("fp")
with app("asgi"):
with app("asgi", bthreads=1):
results["ASGI"] = benchmark("fb")
results["ASGI pathsend"] = benchmark("fp")
return results
@ -234,7 +220,7 @@ def vs_wsgi():
route, opts = bench_data
fw_app = fw.split("_")[1] if fw.startswith("granian") else fw
title = " ".join(item.title() for item in fw.split("_"))
with app(fw_app):
with app(fw_app, bthreads=1):
results[f"{title} {key}"] = benchmark(route, **opts)
return results
@ -254,7 +240,7 @@ def vs_http2():
def vs_files():
results = {}
with app("asgi"):
with app("asgi", bthreads=1):
results["Granian (pathsend)"] = benchmark("fp")
for fw in ["uvicorn_h11", "uvicorn_httptools", "hypercorn"]:
title = " ".join(item.title() for item in fw.split("_"))

View file

@ -18,14 +18,14 @@ Granian version: {{ =data.granian }}
{{ pass }}
{{ pass }}
| Mode | Processes | Threads | Blocking Threads | Total requests | RPS | avg latency | max latency |
| --- | --- | --- | --- | --- | --- | --- | --- |
| Mode | Processes | Threads | Total requests | RPS | avg latency | max latency |
| --- | --- | --- | --- | --- | --- | --- |
{{ for runs in data.results["concurrencies"][interface].values(): }}
{{ max_c, run = get_max_concurrency_run(runs["res"]) }}
{{ if int(run["requests"]["rps"]) == max_rps[runs["m"]]: }}
| **{{ =runs["m"] }} (c{{ =max_c }})** | **{{ =runs["p"] }}** | **{{ =runs["t"] }}** | **{{ =runs["b"] }}** | **{{ =run["requests"]["total"] }}** | **{{ =run["requests"]["rps"] }}** | **{{ =fmt_ms(run["latency"]["avg"]) }}** | **{{ =fmt_ms(run["latency"]["max"]) }}** |
| **{{ =runs["m"] }} (c{{ =max_c }})** | **{{ =runs["p"] }}** | **{{ =runs["t"] }}** | **{{ =run["requests"]["total"] }}** | **{{ =run["requests"]["rps"] }}** | **{{ =fmt_ms(run["latency"]["avg"]) }}** | **{{ =fmt_ms(run["latency"]["max"]) }}** |
{{ else: }}
| {{ =runs["m"] }} (c{{ =max_c }}) | {{ =runs["p"] }} | {{ =runs["t"] }} | {{ =runs["b"] }} | {{ =run["requests"]["total"] }} | {{ =run["requests"]["rps"] }} | {{ =fmt_ms(run["latency"]["avg"]) }} | {{ =fmt_ms(run["latency"]["max"]) }} |
| {{ =runs["m"] }} (c{{ =max_c }}) | {{ =runs["p"] }} | {{ =runs["t"] }} | {{ =run["requests"]["total"] }} | {{ =run["requests"]["rps"] }} | {{ =fmt_ms(run["latency"]["avg"]) }} | {{ =fmt_ms(run["latency"]["max"]) }} |
{{ pass }}
{{ pass }}

View file

@ -49,7 +49,8 @@ class ASGIWorker:
worker_id: int,
socket_fd: int,
threads: int,
pthreads: int,
blocking_threads: int,
backpressure: int,
http_mode: str,
http1_opts: Optional[HTTP1Settings],
http2_opts: Optional[HTTP2Settings],
@ -66,7 +67,8 @@ class WSGIWorker:
worker_id: int,
socket_fd: int,
threads: int,
pthreads: int,
blocking_threads: int,
backpressure: int,
http_mode: str,
http1_opts: Optional[HTTP1Settings],
http2_opts: Optional[HTTP2Settings],
@ -81,7 +83,8 @@ class RSGIWorker:
worker_id: int,
socket_fd: int,
threads: int,
pthreads: int,
blocking_threads: int,
backpressure: int,
http_mode: str,
http1_opts: Optional[HTTP1Settings],
http2_opts: Optional[HTTP2Settings],

View file

@ -6,6 +6,7 @@ from typing import Any, Callable, Optional, Type, TypeVar, Union
import click
from .constants import HTTPModes, Interfaces, Loops, ThreadModes
from .errors import ConfigurationError
from .http import HTTP1Settings, HTTP2Settings
from .log import LogLevels
from .server import Granian
@ -67,7 +68,6 @@ def option(*param_decls: str, cls: Optional[Type[click.Option]] = None, **attrs:
@option(
'--blocking-threads',
type=click.IntRange(1),
default=1,
help='Number of blocking threads',
)
@option(
@ -84,6 +84,11 @@ def option(*param_decls: str, cls: Optional[Type[click.Option]] = None, **attrs:
default=1024,
help='Maximum number of connections to hold in backlog',
)
@option(
'--backpressure',
type=click.IntRange(1),
help='Maximum number of requests to process concurrently',
)
@option(
'--http1-buffer-size',
type=click.IntRange(8192),
@ -202,11 +207,12 @@ def cli(
websockets: bool,
workers: int,
threads: int,
blocking_threads: int,
blocking_threads: Optional[int],
threading_mode: ThreadModes,
loop: Loops,
loop_opt: bool,
backlog: int,
backpressure: Optional[int],
http1_buffer_size: int,
http1_keep_alive: bool,
http1_pipeline_flush: bool,
@ -241,20 +247,21 @@ def cli(
print('Unable to parse provided logging config.')
raise click.exceptions.Exit(1)
Granian(
server = Granian(
app,
address=host,
port=port,
interface=interface,
workers=workers,
threads=threads,
pthreads=blocking_threads,
blocking_threads=blocking_threads,
threading_mode=threading_mode,
loop=loop,
loop_opt=loop_opt,
http=http,
websockets=websockets,
backlog=backlog,
backpressure=backpressure,
http1_settings=HTTP1Settings(
keep_alive=http1_keep_alive, max_buffer_size=http1_buffer_size, pipeline_flush=http1_pipeline_flush
),
@ -281,7 +288,12 @@ def cli(
respawn_interval=respawn_interval,
reload=reload,
process_name=process_name,
).serve()
)
try:
server.serve()
except ConfigurationError:
raise click.exceptions.Exit(1)
def entrypoint():

2
granian/errors.py Normal file
View file

@ -0,0 +1,2 @@
class ConfigurationError(Exception):
...

View file

@ -19,6 +19,7 @@ from ._imports import setproctitle, watchfiles
from ._internal import load_target
from .asgi import LifespanProtocol, _callback_wrapper as _asgi_call_wrap
from .constants import HTTPModes, Interfaces, Loops, ThreadModes
from .errors import ConfigurationError
from .http import HTTP1Settings, HTTP2Settings
from .log import DEFAULT_ACCESSLOG_FMT, LogLevels, configure_logging, logger
from .net import SocketHolder
@ -72,13 +73,14 @@ class Granian:
interface: Interfaces = Interfaces.RSGI,
workers: int = 1,
threads: int = 1,
pthreads: int = 1,
blocking_threads: Optional[int] = None,
threading_mode: ThreadModes = ThreadModes.workers,
loop: Loops = Loops.auto,
loop_opt: bool = False,
http: HTTPModes = HTTPModes.auto,
websockets: bool = True,
backlog: int = 1024,
backpressure: Optional[int] = None,
http1_settings: Optional[HTTP1Settings] = None,
http2_settings: Optional[HTTP2Settings] = None,
log_enabled: bool = True,
@ -100,13 +102,20 @@ class Granian:
self.interface = interface
self.workers = max(1, workers)
self.threads = max(1, threads)
self.pthreads = max(1, pthreads)
self.threading_mode = threading_mode
self.loop = loop
self.loop_opt = loop_opt
self.http = http
self.websockets = websockets
self.backlog = max(128, backlog)
self.backpressure = max(1, backpressure or self.backlog // self.workers)
self.blocking_threads = (
blocking_threads
if blocking_threads is not None
else max(
1, (self.backpressure if self.interface == Interfaces.WSGI else min(2, multiprocessing.cpu_count()))
)
)
self.http1_settings = http1_settings
self.http2_settings = http2_settings
self.log_enabled = log_enabled
@ -152,7 +161,8 @@ class Granian:
socket: socket.socket,
loop_impl: Loops,
threads: int,
pthreads: int,
blocking_threads: int,
backpressure: int,
threading_mode: ThreadModes,
http_mode: HTTPModes,
http1_settings: Optional[HTTP1Settings],
@ -183,7 +193,17 @@ class Granian:
wcallback = future_watcher_wrapper(wcallback)
worker = ASGIWorker(
worker_id, sfd, threads, pthreads, http_mode, http1_settings, http2_settings, websockets, loop_opt, *ssl_ctx
worker_id,
sfd,
threads,
blocking_threads,
backpressure,
http_mode,
http1_settings,
http2_settings,
websockets,
loop_opt,
*ssl_ctx,
)
serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode])
serve(wcallback, loop, contextvars.copy_context(), shutdown_event)
@ -196,7 +216,8 @@ class Granian:
socket: socket.socket,
loop_impl: Loops,
threads: int,
pthreads: int,
blocking_threads: int,
backpressure: int,
threading_mode: ThreadModes,
http_mode: HTTPModes,
http1_settings: Optional[HTTP1Settings],
@ -233,7 +254,17 @@ class Granian:
wcallback = future_watcher_wrapper(wcallback)
worker = ASGIWorker(
worker_id, sfd, threads, pthreads, http_mode, http1_settings, http2_settings, websockets, loop_opt, *ssl_ctx
worker_id,
sfd,
threads,
blocking_threads,
backpressure,
http_mode,
http1_settings,
http2_settings,
websockets,
loop_opt,
*ssl_ctx,
)
serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode])
serve(wcallback, loop, contextvars.copy_context(), shutdown_event)
@ -247,7 +278,8 @@ class Granian:
socket: socket.socket,
loop_impl: Loops,
threads: int,
pthreads: int,
blocking_threads: int,
backpressure: int,
threading_mode: ThreadModes,
http_mode: HTTPModes,
http1_settings: Optional[HTTP1Settings],
@ -280,7 +312,17 @@ class Granian:
callback_init(loop)
worker = RSGIWorker(
worker_id, sfd, threads, pthreads, http_mode, http1_settings, http2_settings, websockets, loop_opt, *ssl_ctx
worker_id,
sfd,
threads,
blocking_threads,
backpressure,
http_mode,
http1_settings,
http2_settings,
websockets,
loop_opt,
*ssl_ctx,
)
serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode])
serve(
@ -298,7 +340,8 @@ class Granian:
socket: socket.socket,
loop_impl: Loops,
threads: int,
pthreads: int,
blocking_threads: int,
backpressure: int,
threading_mode: ThreadModes,
http_mode: HTTPModes,
http1_settings: Optional[HTTP1Settings],
@ -324,7 +367,9 @@ class Granian:
shutdown_event = set_loop_signals(loop, [signal.SIGTERM, signal.SIGINT])
worker = WSGIWorker(worker_id, sfd, threads, pthreads, http_mode, http1_settings, http2_settings, *ssl_ctx)
worker = WSGIWorker(
worker_id, sfd, threads, blocking_threads, backpressure, http_mode, http1_settings, http2_settings, *ssl_ctx
)
serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode])
serve(_wsgi_call_wrap(callback, scope_opts, log_access_fmt), loop, contextvars.copy_context(), shutdown_event)
@ -352,7 +397,8 @@ class Granian:
socket_loader(),
self.loop,
self.threads,
self.pthreads,
self.blocking_threads,
self.backpressure,
self.threading_mode,
self.http,
self.http1_settings,
@ -529,7 +575,7 @@ class Granian:
setproctitle.setproctitle(self.process_name)
elif self.process_name is not None:
logger.error('Setting process name requires the granian[pname] extra')
sys.exit(1)
raise ConfigurationError('process_name')
serve_method = self._serve_with_reloader if self.reload_on_changes else self._serve
serve_method(spawn_target, target_loader)

View file

@ -1,7 +1,6 @@
use std::{net::SocketAddr, sync::Arc};
use pyo3::prelude::*;
use pyo3::types::PyDict;
use std::{net::SocketAddr, sync::Arc};
use tokio::sync::oneshot;
use super::{
@ -285,42 +284,7 @@ impl CallbackWrappedRunnerWebsocket {
// }
// }
macro_rules! call_impl_rtb_http {
($func_name:ident, $runner:ident) => {
#[inline]
pub(crate) fn $func_name(
cb: CallbackWrapper,
rt: RuntimeRef,
server_addr: SocketAddr,
client_addr: SocketAddr,
scheme: &str,
req: hyper::http::request::Parts,
body: hyper::body::Incoming,
) -> oneshot::Receiver<HTTPResponse> {
let (tx, rx) = oneshot::channel();
let protocol = HTTPProtocol::new(rt, body, tx);
scope_native_parts!(
req,
server_addr,
client_addr,
path,
query_string,
version,
server,
client
);
Python::with_gil(|py| {
let scope = build_scope_http(py, &req, version, server, client, scheme, &path, query_string).unwrap();
let _ = $runner::new(py, cb, protocol, scope).run(py);
});
rx
}
};
}
macro_rules! call_impl_rtt_http {
macro_rules! call_impl_http {
($func_name:ident, $runner:ident) => {
#[inline]
pub(crate) fn $func_name(
@ -332,11 +296,12 @@ macro_rules! call_impl_rtt_http {
req: hyper::http::request::Parts,
body: hyper::body::Incoming,
) -> oneshot::Receiver<HTTPResponse> {
let brt = rt.innerb.clone();
let (tx, rx) = oneshot::channel();
let protocol = HTTPProtocol::new(rt, body, tx);
let scheme: Arc<str> = scheme.into();
tokio::task::spawn_blocking(move || {
let _ = brt.run(move || {
scope_native_parts!(
req,
server_addr,
@ -359,43 +324,7 @@ macro_rules! call_impl_rtt_http {
};
}
macro_rules! call_impl_rtb_ws {
($func_name:ident, $runner:ident) => {
#[inline]
pub(crate) fn $func_name(
cb: CallbackWrapper,
rt: RuntimeRef,
server_addr: SocketAddr,
client_addr: SocketAddr,
scheme: &str,
ws: HyperWebsocket,
req: hyper::http::request::Parts,
upgrade: UpgradeData,
) -> oneshot::Receiver<WebsocketDetachedTransport> {
let (tx, rx) = oneshot::channel();
let protocol = WebsocketProtocol::new(rt, tx, ws, upgrade);
scope_native_parts!(
req,
server_addr,
client_addr,
path,
query_string,
version,
server,
client
);
Python::with_gil(|py| {
let scope = build_scope_ws(py, &req, version, server, client, scheme, &path, query_string).unwrap();
let _ = $runner::new(py, cb, protocol, scope).run(py);
});
rx
}
};
}
macro_rules! call_impl_rtt_ws {
macro_rules! call_impl_ws {
($func_name:ident, $runner:ident) => {
#[inline]
pub(crate) fn $func_name(
@ -408,11 +337,12 @@ macro_rules! call_impl_rtt_ws {
req: hyper::http::request::Parts,
upgrade: UpgradeData,
) -> oneshot::Receiver<WebsocketDetachedTransport> {
let brt = rt.innerb.clone();
let (tx, rx) = oneshot::channel();
let protocol = WebsocketProtocol::new(rt, tx, ws, upgrade);
let scheme: Arc<str> = scheme.into();
tokio::task::spawn_blocking(move || {
let _ = brt.run(move || {
scope_native_parts!(
req,
server_addr,
@ -435,11 +365,7 @@ macro_rules! call_impl_rtt_ws {
};
}
call_impl_rtb_http!(call_rtb_http, CallbackRunnerHTTP);
call_impl_rtb_http!(call_rtb_http_pyw, CallbackWrappedRunnerHTTP);
call_impl_rtt_http!(call_rtt_http, CallbackRunnerHTTP);
call_impl_rtt_http!(call_rtt_http_pyw, CallbackWrappedRunnerHTTP);
call_impl_rtb_ws!(call_rtb_ws, CallbackRunnerWebsocket);
call_impl_rtb_ws!(call_rtb_ws_pyw, CallbackWrappedRunnerWebsocket);
call_impl_rtt_ws!(call_rtt_ws, CallbackRunnerWebsocket);
call_impl_rtt_ws!(call_rtt_ws_pyw, CallbackWrappedRunnerWebsocket);
call_impl_http!(call_http, CallbackRunnerHTTP);
call_impl_http!(call_http_pyw, CallbackWrappedRunnerHTTP);
call_impl_ws!(call_ws, CallbackRunnerWebsocket);
call_impl_ws!(call_ws_pyw, CallbackWrappedRunnerWebsocket);

View file

@ -3,10 +3,7 @@ use hyper::{header::SERVER as HK_SERVER, http::response::Builder as ResponseBuil
use std::net::SocketAddr;
use tokio::sync::mpsc;
use super::callbacks::{
call_rtb_http, call_rtb_http_pyw, call_rtb_ws, call_rtb_ws_pyw, call_rtt_http, call_rtt_http_pyw, call_rtt_ws,
call_rtt_ws_pyw,
};
use super::callbacks::{call_http, call_http_pyw, call_ws, call_ws_pyw};
use crate::{
callbacks::CallbackWrapper,
http::{empty_body, response_500, HTTPRequest, HTTPResponse, HV_SERVER},
@ -156,11 +153,11 @@ macro_rules! handle_request_with_ws {
};
}
handle_request!(handle_rtt, call_rtt_http);
handle_request!(handle_rtb, call_rtb_http);
handle_request!(handle_rtt_pyw, call_rtt_http_pyw);
handle_request!(handle_rtb_pyw, call_rtb_http_pyw);
handle_request_with_ws!(handle_rtt_ws, call_rtt_http, call_rtt_ws);
handle_request_with_ws!(handle_rtb_ws, call_rtb_http, call_rtb_ws);
handle_request_with_ws!(handle_rtt_ws_pyw, call_rtt_http_pyw, call_rtt_ws_pyw);
handle_request_with_ws!(handle_rtb_ws_pyw, call_rtb_http_pyw, call_rtb_ws_pyw);
handle_request!(handle, call_http);
// handle_request!(handle_rtb, call_rtb_http);
handle_request!(handle_pyw, call_http_pyw);
// handle_request!(handle_rtb_pyw, call_rtb_http_pyw);
handle_request_with_ws!(handle_ws, call_http, call_ws);
// handle_request_with_ws!(handle_rtb_ws, call_rtb_http, call_rtb_ws);
handle_request_with_ws!(handle_ws_pyw, call_http_pyw, call_ws_pyw);
// handle_request_with_ws!(handle_rtb_ws_pyw, call_rtb_http_pyw, call_rtb_ws_pyw);

View file

@ -1,9 +1,6 @@
use pyo3::prelude::*;
use super::http::{
handle_rtb, handle_rtb_pyw, handle_rtb_ws, handle_rtb_ws_pyw, handle_rtt, handle_rtt_pyw, handle_rtt_ws,
handle_rtt_ws_pyw,
};
use super::http::{handle, handle_pyw, handle_ws, handle_ws_pyw};
use crate::conversion::{worker_http1_config_from_py, worker_http2_config_from_py};
use crate::workers::{serve_rth, serve_rth_ssl, serve_wth, serve_wth_ssl, WorkerConfig, WorkerSignal};
@ -14,22 +11,22 @@ pub struct ASGIWorker {
}
impl ASGIWorker {
serve_rth!(_serve_rth, handle_rtb);
serve_rth!(_serve_rth_pyw, handle_rtb_pyw);
serve_rth!(_serve_rth_ws, handle_rtb_ws);
serve_rth!(_serve_rth_ws_pyw, handle_rtb_ws_pyw);
serve_wth!(_serve_wth, handle_rtt);
serve_wth!(_serve_wth_pyw, handle_rtt_pyw);
serve_wth!(_serve_wth_ws, handle_rtt_ws);
serve_wth!(_serve_wth_ws_pyw, handle_rtt_ws_pyw);
serve_rth_ssl!(_serve_rth_ssl, handle_rtb);
serve_rth_ssl!(_serve_rth_ssl_pyw, handle_rtb_pyw);
serve_rth_ssl!(_serve_rth_ssl_ws, handle_rtb_ws);
serve_rth_ssl!(_serve_rth_ssl_ws_pyw, handle_rtb_ws_pyw);
serve_wth_ssl!(_serve_wth_ssl, handle_rtt);
serve_wth_ssl!(_serve_wth_ssl_pyw, handle_rtt_pyw);
serve_wth_ssl!(_serve_wth_ssl_ws, handle_rtt_ws);
serve_wth_ssl!(_serve_wth_ssl_ws_pyw, handle_rtt_ws_pyw);
serve_rth!(_serve_rth, handle);
serve_rth!(_serve_rth_pyw, handle_pyw);
serve_rth!(_serve_rth_ws, handle_ws);
serve_rth!(_serve_rth_ws_pyw, handle_ws_pyw);
serve_wth!(_serve_wth, handle);
serve_wth!(_serve_wth_pyw, handle_pyw);
serve_wth!(_serve_wth_ws, handle_ws);
serve_wth!(_serve_wth_ws_pyw, handle_ws_pyw);
serve_rth_ssl!(_serve_rth_ssl, handle);
serve_rth_ssl!(_serve_rth_ssl_pyw, handle_pyw);
serve_rth_ssl!(_serve_rth_ssl_ws, handle_ws);
serve_rth_ssl!(_serve_rth_ssl_ws_pyw, handle_ws_pyw);
serve_wth_ssl!(_serve_wth_ssl, handle);
serve_wth_ssl!(_serve_wth_ssl_pyw, handle_pyw);
serve_wth_ssl!(_serve_wth_ssl_ws, handle_ws);
serve_wth_ssl!(_serve_wth_ssl_ws_pyw, handle_ws_pyw);
}
#[pymethods]
@ -40,7 +37,8 @@ impl ASGIWorker {
worker_id,
socket_fd,
threads=1,
pthreads=1,
blocking_threads=512,
backpressure=256,
http_mode="1",
http1_opts=None,
http2_opts=None,
@ -56,7 +54,8 @@ impl ASGIWorker {
worker_id: i32,
socket_fd: i32,
threads: usize,
pthreads: usize,
blocking_threads: usize,
backpressure: usize,
http_mode: &str,
http1_opts: Option<PyObject>,
http2_opts: Option<PyObject>,
@ -71,7 +70,8 @@ impl ASGIWorker {
worker_id,
socket_fd,
threads,
pthreads,
blocking_threads,
backpressure,
http_mode,
worker_http1_config_from_py(py, http1_opts)?,
worker_http2_config_from_py(py, http2_opts)?,

50
src/blocking.rs Normal file
View file

@ -0,0 +1,50 @@
use crossbeam_channel as channel;
use std::thread;
pub(crate) struct BlockingTask {
inner: Box<dyn FnOnce() + Send + 'static>,
}
impl BlockingTask {
pub fn new<T>(inner: T) -> BlockingTask
where
T: FnOnce() + Send + 'static,
{
Self { inner: Box::new(inner) }
}
pub fn run(self) {
(self.inner)();
}
}
#[derive(Clone)]
pub(crate) struct BlockingRunner {
queue: channel::Sender<BlockingTask>,
}
impl BlockingRunner {
pub fn new() -> Self {
let queue = blocking_thread();
Self { queue }
}
pub fn run<T>(&self, task: T) -> Result<(), channel::SendError<BlockingTask>>
where
T: FnOnce() + Send + 'static,
{
self.queue.send(BlockingTask::new(task))
}
}
fn bloking_loop(queue: channel::Receiver<BlockingTask>) {
while let Ok(task) = queue.recv() {
task.run();
}
}
fn blocking_thread() -> channel::Sender<BlockingTask> {
let (qtx, qrx) = channel::unbounded();
thread::spawn(|| bloking_loop(qrx));
qtx
}

View file

@ -11,6 +11,7 @@ use std::sync::OnceLock;
mod asgi;
mod asyncio;
mod blocking;
mod callbacks;
mod conversion;
mod http;

View file

@ -256,7 +256,7 @@ impl CallbackWrappedRunnerWebsocket {
}
}
macro_rules! call_impl_rtb_http {
macro_rules! call_impl_http {
($func_name:ident, $runner:ident) => {
#[inline]
pub(crate) fn $func_name(
@ -265,31 +265,11 @@ macro_rules! call_impl_rtb_http {
body: hyper::body::Incoming,
scope: HTTPScope,
) -> oneshot::Receiver<PyResponse> {
let brt = rt.innerb.clone();
let (tx, rx) = oneshot::channel();
let protocol = HTTPProtocol::new(rt, tx, body);
Python::with_gil(|py| {
let _ = $runner::new(py, cb, protocol, scope).run(py);
});
rx
}
};
}
macro_rules! call_impl_rtt_http {
($func_name:ident, $runner:ident) => {
#[inline]
pub(crate) fn $func_name(
cb: CallbackWrapper,
rt: RuntimeRef,
body: hyper::body::Incoming,
scope: HTTPScope,
) -> oneshot::Receiver<PyResponse> {
let (tx, rx) = oneshot::channel();
let protocol = HTTPProtocol::new(rt, tx, body);
tokio::task::spawn_blocking(move || {
let _ = brt.run(|| {
Python::with_gil(|py| {
let _ = $runner::new(py, cb, protocol, scope).run(py);
});
@ -300,7 +280,7 @@ macro_rules! call_impl_rtt_http {
};
}
macro_rules! call_impl_rtb_ws {
macro_rules! call_impl_ws {
($func_name:ident, $runner:ident) => {
#[inline]
pub(crate) fn $func_name(
@ -310,32 +290,11 @@ macro_rules! call_impl_rtb_ws {
upgrade: UpgradeData,
scope: WebsocketScope,
) -> oneshot::Receiver<WebsocketDetachedTransport> {
let brt = rt.innerb.clone();
let (tx, rx) = oneshot::channel();
let protocol = WebsocketProtocol::new(rt, tx, ws, upgrade);
Python::with_gil(|py| {
let _ = $runner::new(py, cb, protocol, scope).run(py);
});
rx
}
};
}
macro_rules! call_impl_rtt_ws {
($func_name:ident, $runner:ident) => {
#[inline]
pub(crate) fn $func_name(
cb: CallbackWrapper,
rt: RuntimeRef,
ws: HyperWebsocket,
upgrade: UpgradeData,
scope: WebsocketScope,
) -> oneshot::Receiver<WebsocketDetachedTransport> {
let (tx, rx) = oneshot::channel();
let protocol = WebsocketProtocol::new(rt, tx, ws, upgrade);
tokio::task::spawn_blocking(move || {
let _ = brt.run(|| {
Python::with_gil(|py| {
let _ = $runner::new(py, cb, protocol, scope).run(py);
});
@ -346,11 +305,7 @@ macro_rules! call_impl_rtt_ws {
};
}
call_impl_rtb_http!(call_rtb_http, CallbackRunnerHTTP);
call_impl_rtb_http!(call_rtb_http_pyw, CallbackWrappedRunnerHTTP);
call_impl_rtt_http!(call_rtt_http, CallbackRunnerHTTP);
call_impl_rtt_http!(call_rtt_http_pyw, CallbackWrappedRunnerHTTP);
call_impl_rtb_ws!(call_rtb_ws, CallbackRunnerWebsocket);
call_impl_rtb_ws!(call_rtb_ws_pyw, CallbackWrappedRunnerWebsocket);
call_impl_rtt_ws!(call_rtt_ws, CallbackRunnerWebsocket);
call_impl_rtt_ws!(call_rtt_ws_pyw, CallbackWrappedRunnerWebsocket);
call_impl_http!(call_http, CallbackRunnerHTTP);
call_impl_http!(call_http_pyw, CallbackWrappedRunnerHTTP);
call_impl_ws!(call_ws, CallbackRunnerWebsocket);
call_impl_ws!(call_ws_pyw, CallbackWrappedRunnerWebsocket);

View file

@ -4,10 +4,7 @@ use std::net::SocketAddr;
use tokio::sync::mpsc;
use super::{
callbacks::{
call_rtb_http, call_rtb_http_pyw, call_rtb_ws, call_rtb_ws_pyw, call_rtt_http, call_rtt_http_pyw, call_rtt_ws,
call_rtt_ws_pyw,
},
callbacks::{call_http, call_http_pyw, call_ws, call_ws_pyw},
types::{PyResponse, RSGIHTTPScope as HTTPScope, RSGIWebsocketScope as WebsocketScope},
};
use crate::{
@ -138,11 +135,7 @@ macro_rules! handle_request_with_ws {
};
}
handle_request!(handle_rtt, call_rtt_http);
handle_request!(handle_rtb, call_rtb_http);
handle_request!(handle_rtt_pyw, call_rtt_http_pyw);
handle_request!(handle_rtb_pyw, call_rtb_http_pyw);
handle_request_with_ws!(handle_rtt_ws, call_rtt_http, call_rtt_ws);
handle_request_with_ws!(handle_rtb_ws, call_rtb_http, call_rtb_ws);
handle_request_with_ws!(handle_rtt_ws_pyw, call_rtt_http_pyw, call_rtt_ws_pyw);
handle_request_with_ws!(handle_rtb_ws_pyw, call_rtb_http_pyw, call_rtb_ws_pyw);
handle_request!(handle, call_http);
handle_request!(handle_pyw, call_http_pyw);
handle_request_with_ws!(handle_ws, call_http, call_ws);
handle_request_with_ws!(handle_ws_pyw, call_http_pyw, call_ws_pyw);

View file

@ -1,9 +1,6 @@
use pyo3::prelude::*;
use super::http::{
handle_rtb, handle_rtb_pyw, handle_rtb_ws, handle_rtb_ws_pyw, handle_rtt, handle_rtt_pyw, handle_rtt_ws,
handle_rtt_ws_pyw,
};
use super::http::{handle, handle_pyw, handle_ws, handle_ws_pyw};
use crate::conversion::{worker_http1_config_from_py, worker_http2_config_from_py};
use crate::workers::{serve_rth, serve_rth_ssl, serve_wth, serve_wth_ssl, WorkerConfig, WorkerSignal};
@ -14,22 +11,22 @@ pub struct RSGIWorker {
}
impl RSGIWorker {
serve_rth!(_serve_rth, handle_rtb);
serve_rth!(_serve_rth_pyw, handle_rtb_pyw);
serve_rth!(_serve_rth_ws, handle_rtb_ws);
serve_rth!(_serve_rth_ws_pyw, handle_rtb_ws_pyw);
serve_wth!(_serve_wth, handle_rtt);
serve_wth!(_serve_wth_pyw, handle_rtt_pyw);
serve_wth!(_serve_wth_ws, handle_rtt_ws);
serve_wth!(_serve_wth_ws_pyw, handle_rtt_ws_pyw);
serve_rth_ssl!(_serve_rth_ssl, handle_rtb);
serve_rth_ssl!(_serve_rth_ssl_pyw, handle_rtb_pyw);
serve_rth_ssl!(_serve_rth_ssl_ws, handle_rtb_ws);
serve_rth_ssl!(_serve_rth_ssl_ws_pyw, handle_rtb_ws_pyw);
serve_wth_ssl!(_serve_wth_ssl, handle_rtt);
serve_wth_ssl!(_serve_wth_ssl_pyw, handle_rtt_pyw);
serve_wth_ssl!(_serve_wth_ssl_ws, handle_rtt_ws);
serve_wth_ssl!(_serve_wth_ssl_ws_pyw, handle_rtt_ws_pyw);
serve_rth!(_serve_rth, handle);
serve_rth!(_serve_rth_pyw, handle_pyw);
serve_rth!(_serve_rth_ws, handle_ws);
serve_rth!(_serve_rth_ws_pyw, handle_ws_pyw);
serve_wth!(_serve_wth, handle);
serve_wth!(_serve_wth_pyw, handle_pyw);
serve_wth!(_serve_wth_ws, handle_ws);
serve_wth!(_serve_wth_ws_pyw, handle_ws_pyw);
serve_rth_ssl!(_serve_rth_ssl, handle);
serve_rth_ssl!(_serve_rth_ssl_pyw, handle_pyw);
serve_rth_ssl!(_serve_rth_ssl_ws, handle_ws);
serve_rth_ssl!(_serve_rth_ssl_ws_pyw, handle_ws_pyw);
serve_wth_ssl!(_serve_wth_ssl, handle);
serve_wth_ssl!(_serve_wth_ssl_pyw, handle_pyw);
serve_wth_ssl!(_serve_wth_ssl_ws, handle_ws);
serve_wth_ssl!(_serve_wth_ssl_ws_pyw, handle_ws_pyw);
}
#[pymethods]
@ -40,7 +37,8 @@ impl RSGIWorker {
worker_id,
socket_fd,
threads=1,
pthreads=1,
blocking_threads=512,
backpressure=256,
http_mode="1",
http1_opts=None,
http2_opts=None,
@ -56,7 +54,8 @@ impl RSGIWorker {
worker_id: i32,
socket_fd: i32,
threads: usize,
pthreads: usize,
blocking_threads: usize,
backpressure: usize,
http_mode: &str,
http1_opts: Option<PyObject>,
http2_opts: Option<PyObject>,
@ -71,7 +70,8 @@ impl RSGIWorker {
worker_id,
socket_fd,
threads,
pthreads,
blocking_threads,
backpressure,
http_mode,
worker_http1_config_from_py(py, http1_opts)?,
worker_http2_config_from_py(py, http2_opts)?,

View file

@ -11,6 +11,7 @@ use tokio::{
};
use super::asyncio::{copy_context, get_running_loop};
use super::blocking::BlockingRunner;
use super::callbacks::PyEmptyAwaitable;
#[cfg(unix)]
use super::callbacks::PyFutureAwaitable;
@ -76,6 +77,8 @@ pub trait Runtime: Send + 'static {
F: Future<Output = ()> + Send + 'static;
fn handler(&self) -> RuntimeRef;
fn blocking(&self) -> BlockingRunner;
}
pub trait ContextExt: Runtime {
@ -102,32 +105,38 @@ pub trait LocalContextExt: Runtime {
pub(crate) struct RuntimeWrapper {
rt: tokio::runtime::Runtime,
br: BlockingRunner,
}
impl RuntimeWrapper {
pub fn new(blocking_threads: usize) -> Self {
Self {
rt: default_runtime(blocking_threads),
br: BlockingRunner::new(),
}
}
pub fn with_runtime(rt: tokio::runtime::Runtime) -> Self {
Self { rt }
Self {
rt,
br: BlockingRunner::new(),
}
}
pub fn handler(&self) -> RuntimeRef {
RuntimeRef::new(self.rt.handle().clone())
RuntimeRef::new(self.rt.handle().clone(), self.br.clone())
}
}
#[derive(Clone)]
pub struct RuntimeRef {
pub inner: tokio::runtime::Handle,
pub innerb: BlockingRunner,
}
impl RuntimeRef {
pub fn new(rt: tokio::runtime::Handle) -> Self {
Self { inner: rt }
pub fn new(rt: tokio::runtime::Handle, br: BlockingRunner) -> Self {
Self { inner: rt, innerb: br }
}
}
@ -149,7 +158,11 @@ impl Runtime for RuntimeRef {
}
fn handler(&self) -> RuntimeRef {
RuntimeRef::new(self.inner.clone())
RuntimeRef::new(self.inner.clone(), self.innerb.clone())
}
fn blocking(&self) -> BlockingRunner {
self.innerb.clone()
}
}
@ -243,14 +256,17 @@ pub(crate) fn future_into_py_iter<R, F, T>(rt: R, py: Python, fut: F) -> PyResul
where
R: Runtime + ContextExt + Clone,
F: Future<Output = PyResult<T>> + Send + 'static,
T: IntoPy<PyObject>,
T: IntoPy<PyObject> + Send + 'static,
{
let aw = Py::new(py, PyIterAwaitable::new())?;
let py_fut = aw.clone_ref(py);
let rb = rt.blocking();
rt.spawn(async move {
let result = fut.await;
aw.get().set_result(result);
let _ = rb.run(move || {
aw.get().set_result(result);
});
});
Ok(py_fut.into_any().into_bound(py))
@ -266,7 +282,7 @@ pub(crate) fn future_into_py_iter<R, F, T>(rt: R, py: Python, fut: F) -> PyResul
where
R: Runtime + ContextExt + Clone,
F: Future<Output = PyResult<T>> + Send + 'static,
T: IntoPy<PyObject>,
T: IntoPy<PyObject> + Send + 'static,
{
future_into_py_futlike(rt, py, fut)
}
@ -282,17 +298,22 @@ pub(crate) fn future_into_py_futlike<R, F, T>(rt: R, py: Python, fut: F) -> PyRe
where
R: Runtime + ContextExt + Clone,
F: Future<Output = PyResult<T>> + Send + 'static,
T: IntoPy<PyObject>,
T: IntoPy<PyObject> + Send + 'static,
{
let task_locals = get_current_locals::<R>(py)?;
let event_loop = task_locals.event_loop(py).to_object(py);
let (aw, cancel_tx) = PyFutureAwaitable::new(event_loop).to_spawn(py)?;
let aw_ref = aw.clone_ref(py);
let py_fut = aw.clone_ref(py);
let rb = rt.blocking();
rt.spawn(async move {
tokio::select! {
result = fut => aw.get().set_result(result, aw_ref),
result = fut => {
let _ = rb.run(move || {
aw.get().set_result(result, aw_ref);
});
},
() = cancel_tx.notified() => {}
}
});
@ -306,12 +327,13 @@ pub(crate) fn future_into_py_futlike<R, F, T>(rt: R, py: Python, fut: F) -> PyRe
where
R: Runtime + ContextExt + Clone,
F: Future<Output = PyResult<T>> + Send + 'static,
T: IntoPy<PyObject>,
T: IntoPy<PyObject> + Send + 'static,
{
let task_locals = get_current_locals::<R>(py)?;
let event_loop = task_locals.event_loop(py);
let event_loop_ref = event_loop.to_object(py);
let cancel_tx = Arc::new(tokio::sync::Notify::new());
let rb = rt.blocking();
let py_fut = event_loop.call_method0(pyo3::intern!(py, "create_future"))?;
py_fut.call_method1(
@ -325,12 +347,14 @@ where
rt.spawn(async move {
tokio::select! {
result = fut => {
Python::with_gil(|py| {
let (cb, value) = match result {
Ok(val) => (fut_ref.getattr(py, pyo3::intern!(py, "set_result")).unwrap(), val.into_py(py)),
Err(err) => (fut_ref.getattr(py, pyo3::intern!(py, "set_exception")).unwrap(), err.into_py(py))
};
let _ = event_loop_ref.call_method1(py, pyo3::intern!(py, "call_soon_threadsafe"), (PyFutureResultSetter, cb, value));
let _ = rb.run(move || {
Python::with_gil(|py| {
let (cb, value) = match result {
Ok(val) => (fut_ref.getattr(py, pyo3::intern!(py, "set_result")).unwrap(), val.into_py(py)),
Err(err) => (fut_ref.getattr(py, pyo3::intern!(py, "set_exception")).unwrap(), err.into_py(py))
};
let _ = event_loop_ref.call_method1(py, pyo3::intern!(py, "call_soon_threadsafe"), (PyFutureResultSetter, cb, value));
});
});
},
() = cancel_tx.notified() => {}

View file

@ -58,7 +58,8 @@ pub(crate) struct WorkerConfig {
pub id: i32,
socket_fd: i32,
pub threads: usize,
pub pthreads: usize,
pub blocking_threads: usize,
pub backpressure: usize,
pub http_mode: String,
pub http1_opts: HTTP1Config,
pub http2_opts: HTTP2Config,
@ -74,7 +75,8 @@ impl WorkerConfig {
id: i32,
socket_fd: i32,
threads: usize,
pthreads: usize,
blocking_threads: usize,
backpressure: usize,
http_mode: &str,
http1_opts: HTTP1Config,
http2_opts: HTTP2Config,
@ -88,7 +90,8 @@ impl WorkerConfig {
id,
socket_fd,
threads,
pthreads,
blocking_threads,
backpressure,
http_mode: http_mode.into(),
http1_opts,
http2_opts,
@ -174,15 +177,20 @@ macro_rules! build_service {
}
macro_rules! handle_connection_loop {
($tcp_listener:expr, $quit_signal:expr, $inner:expr) => {
($tcp_listener:expr, $quit_signal:expr, $backpressure:expr, $inner:expr) => {
let tcp_listener = tokio::net::TcpListener::from_std($tcp_listener).unwrap();
let local_addr = tcp_listener.local_addr().unwrap();
let mut accept_loop = true;
let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new($backpressure));
while accept_loop {
let semaphore = semaphore.clone();
tokio::select! {
Ok((stream, remote_addr)) = tcp_listener.accept() => {
$inner(local_addr, remote_addr, stream)
(permit, Ok((stream, remote_addr))) = async {
let permit = semaphore.acquire_owned().await.unwrap();
(permit, tcp_listener.accept().await)
} => {
$inner(local_addr, remote_addr, stream, permit)
},
_ = $quit_signal => {
accept_loop = false;
@ -193,16 +201,21 @@ macro_rules! handle_connection_loop {
}
macro_rules! handle_tls_loop {
($tcp_listener:expr, $tls_config:expr, $quit_signal:expr, $inner:expr) => {
($tcp_listener:expr, $tls_config:expr, $quit_signal:expr, $backpressure:expr, $inner:expr) => {
let (mut tls_listener, local_addr) = crate::tls::tls_listener($tls_config.into(), $tcp_listener).unwrap();
let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new($backpressure));
let mut accept_loop = true;
while accept_loop {
let semaphore = semaphore.clone();
tokio::select! {
accept = tls_listener.accept() => {
(permit, accept) = async {
let permit = semaphore.acquire_owned().await.unwrap();
(permit, tls_listener.accept().await)
} => {
match accept {
Ok((stream, remote_addr)) => {
$inner(local_addr, remote_addr, stream)
$inner(local_addr, remote_addr, stream, permit)
},
Err(err) => {
log::info!("TLS handshake failed with {:?}", err);
@ -219,7 +232,7 @@ macro_rules! handle_tls_loop {
macro_rules! handle_connection_http1 {
($rth:expr, $callback:expr, $spawner:expr, $stream_wrapper:expr, $proto:expr, $http_opts:expr, $target:expr) => {
|local_addr, remote_addr, stream| {
|local_addr, remote_addr, stream, permit| {
let rth = $rth.clone();
let callback_wrapper = $callback.clone();
$spawner(async move {
@ -230,6 +243,7 @@ macro_rules! handle_connection_http1 {
conn.max_buf_size($http_opts.max_buffer_size);
conn.pipeline_flush($http_opts.pipeline_flush);
let _ = conn.serve_connection($stream_wrapper(stream), svc).await;
drop(permit);
});
}
};
@ -237,7 +251,7 @@ macro_rules! handle_connection_http1 {
macro_rules! handle_connection_http1_upgrades {
($rth:expr, $callback:expr, $spawner:expr, $stream_wrapper:expr, $proto:expr, $http_opts:expr, $target:expr) => {
|local_addr, remote_addr, stream| {
|local_addr, remote_addr, stream, permit| {
let rth = $rth.clone();
let callback_wrapper = $callback.clone();
$spawner(async move {
@ -251,6 +265,7 @@ macro_rules! handle_connection_http1_upgrades {
.serve_connection($stream_wrapper(stream), svc)
.with_upgrades()
.await;
drop(permit);
});
}
};
@ -258,7 +273,7 @@ macro_rules! handle_connection_http1_upgrades {
macro_rules! handle_connection_http2 {
($rth:expr, $callback:expr, $spawner:expr, $executor_builder:expr, $stream_wrapper:expr, $proto:expr, $http_opts:expr, $target:expr) => {
|local_addr, remote_addr, stream| {
|local_addr, remote_addr, stream, permit| {
let rth = $rth.clone();
let callback_wrapper = $callback.clone();
$spawner(async move {
@ -275,6 +290,7 @@ macro_rules! handle_connection_http2 {
conn.max_header_list_size($http_opts.max_headers_size);
conn.max_send_buf_size($http_opts.max_send_buffer_size);
let _ = conn.serve_connection($stream_wrapper(stream), svc).await;
drop(permit);
});
}
};
@ -282,7 +298,7 @@ macro_rules! handle_connection_http2 {
macro_rules! handle_connection_httpa {
($rth:expr, $callback:expr, $spawner:expr, $executor_builder:expr, $conn_method:ident, $stream_wrapper:expr, $proto:expr, $http1_opts:expr, $http2_opts:expr, $target:expr) => {
|local_addr, remote_addr, stream| {
|local_addr, remote_addr, stream, permit| {
let rth = $rth.clone();
let callback_wrapper = $callback.clone();
$spawner(async move {
@ -304,6 +320,7 @@ macro_rules! handle_connection_httpa {
conn.http2().max_header_list_size($http2_opts.max_headers_size);
conn.http2().max_send_buf_size($http2_opts.max_send_buffer_size);
let _ = conn.$conn_method($stream_wrapper(stream), svc).await;
drop(permit);
});
}
};
@ -319,7 +336,7 @@ macro_rules! serve_rth {
signal: Py<crate::workers::WorkerSignal>,
) {
pyo3_log::init();
let rt = crate::runtime::init_runtime_mt(self.config.threads, self.config.pthreads);
let rt = crate::runtime::init_runtime_mt(self.config.threads, self.config.blocking_threads);
let rth = rt.handler();
let tcp_listener = self.config.tcp_listener();
@ -327,6 +344,7 @@ macro_rules! serve_rth {
let http_upgrades = self.config.websockets_enabled;
let http1_opts = self.config.http1_opts.clone();
let http2_opts = self.config.http2_opts.clone();
let backpressure = self.config.backpressure.clone();
let callback_wrapper = crate::callbacks::CallbackWrapper::new(callback, event_loop.clone(), context);
let mut pyrx = signal.get().rx.lock().unwrap().take().unwrap();
@ -339,6 +357,7 @@ macro_rules! serve_rth {
crate::workers::handle_connection_loop!(
tcp_listener,
pyrx.changed(),
backpressure,
crate::workers::handle_connection_httpa!(
rth,
callback_wrapper,
@ -357,6 +376,7 @@ macro_rules! serve_rth {
crate::workers::handle_connection_loop!(
tcp_listener,
pyrx.changed(),
backpressure,
crate::workers::handle_connection_httpa!(
rth,
callback_wrapper,
@ -375,6 +395,7 @@ macro_rules! serve_rth {
crate::workers::handle_connection_loop!(
tcp_listener,
pyrx.changed(),
backpressure,
crate::workers::handle_connection_http1_upgrades!(
rth,
callback_wrapper,
@ -390,6 +411,7 @@ macro_rules! serve_rth {
crate::workers::handle_connection_loop!(
tcp_listener,
pyrx.changed(),
backpressure,
crate::workers::handle_connection_http1!(
rth,
callback_wrapper,
@ -405,6 +427,7 @@ macro_rules! serve_rth {
crate::workers::handle_connection_loop!(
tcp_listener,
pyrx.changed(),
backpressure,
crate::workers::handle_connection_http2!(
rth,
callback_wrapper,
@ -445,7 +468,7 @@ macro_rules! serve_rth_ssl {
signal: Py<crate::workers::WorkerSignal>,
) {
pyo3_log::init();
let rt = crate::runtime::init_runtime_mt(self.config.threads, self.config.pthreads);
let rt = crate::runtime::init_runtime_mt(self.config.threads, self.config.blocking_threads);
let rth = rt.handler();
let tcp_listener = self.config.tcp_listener();
@ -453,6 +476,7 @@ macro_rules! serve_rth_ssl {
let http_upgrades = self.config.websockets_enabled;
let http1_opts = self.config.http1_opts.clone();
let http2_opts = self.config.http2_opts.clone();
let backpressure = self.config.backpressure.clone();
let tls_cfg = self.config.tls_cfg();
let callback_wrapper = crate::callbacks::CallbackWrapper::new(callback, event_loop.clone(), context);
let mut pyrx = signal.get().rx.lock().unwrap().take().unwrap();
@ -467,6 +491,7 @@ macro_rules! serve_rth_ssl {
tcp_listener,
tls_cfg,
pyrx.changed(),
backpressure,
crate::workers::handle_connection_httpa!(
rth,
callback_wrapper,
@ -486,6 +511,7 @@ macro_rules! serve_rth_ssl {
tcp_listener,
tls_cfg,
pyrx.changed(),
backpressure,
crate::workers::handle_connection_httpa!(
rth,
callback_wrapper,
@ -505,6 +531,7 @@ macro_rules! serve_rth_ssl {
tcp_listener,
tls_cfg,
pyrx.changed(),
backpressure,
crate::workers::handle_connection_http1_upgrades!(
rth,
callback_wrapper,
@ -521,6 +548,7 @@ macro_rules! serve_rth_ssl {
tcp_listener,
tls_cfg,
pyrx.changed(),
backpressure,
crate::workers::handle_connection_http1!(
rth,
callback_wrapper,
@ -537,6 +565,7 @@ macro_rules! serve_rth_ssl {
tcp_listener,
tls_cfg,
pyrx.changed(),
backpressure,
crate::workers::handle_connection_http2!(
rth,
callback_wrapper,
@ -595,12 +624,13 @@ macro_rules! serve_wth {
let http_upgrades = self.config.websockets_enabled;
let http1_opts = self.config.http1_opts.clone();
let http2_opts = self.config.http2_opts.clone();
let pthreads = self.config.pthreads.clone();
let blocking_threads = self.config.blocking_threads.clone();
let backpressure = self.config.backpressure.clone();
let callback_wrapper = callback_wrapper.clone();
let mut srx = srx.clone();
workers.push(std::thread::spawn(move || {
let rt = crate::runtime::init_runtime_st(pthreads);
let rt = crate::runtime::init_runtime_st(blocking_threads);
let rth = rt.handler();
let local = tokio::task::LocalSet::new();
@ -610,6 +640,7 @@ macro_rules! serve_wth {
crate::workers::handle_connection_loop!(
tcp_listener,
srx.changed(),
backpressure,
crate::workers::handle_connection_httpa!(
rth,
callback_wrapper,
@ -628,6 +659,7 @@ macro_rules! serve_wth {
crate::workers::handle_connection_loop!(
tcp_listener,
srx.changed(),
backpressure,
crate::workers::handle_connection_httpa!(
rth,
callback_wrapper,
@ -646,6 +678,7 @@ macro_rules! serve_wth {
crate::workers::handle_connection_loop!(
tcp_listener,
srx.changed(),
backpressure,
crate::workers::handle_connection_http1_upgrades!(
rth,
callback_wrapper,
@ -661,6 +694,7 @@ macro_rules! serve_wth {
crate::workers::handle_connection_loop!(
tcp_listener,
srx.changed(),
backpressure,
crate::workers::handle_connection_http1!(
rth,
callback_wrapper,
@ -676,6 +710,7 @@ macro_rules! serve_wth {
crate::workers::handle_connection_loop!(
tcp_listener,
srx.changed(),
backpressure,
crate::workers::handle_connection_http2!(
rth,
callback_wrapper,
@ -748,12 +783,13 @@ macro_rules! serve_wth_ssl {
let http1_opts = self.config.http1_opts.clone();
let http2_opts = self.config.http2_opts.clone();
let tls_cfg = self.config.tls_cfg();
let pthreads = self.config.pthreads.clone();
let blocking_threads = self.config.blocking_threads.clone();
let backpressure = self.config.backpressure.clone();
let callback_wrapper = callback_wrapper.clone();
let mut srx = srx.clone();
workers.push(std::thread::spawn(move || {
let rt = crate::runtime::init_runtime_st(pthreads);
let rt = crate::runtime::init_runtime_st(blocking_threads);
let rth = rt.handler();
let local = tokio::task::LocalSet::new();
@ -764,6 +800,7 @@ macro_rules! serve_wth_ssl {
tcp_listener,
tls_cfg,
srx.changed(),
backpressure,
crate::workers::handle_connection_httpa!(
rth,
callback_wrapper,
@ -783,6 +820,7 @@ macro_rules! serve_wth_ssl {
tcp_listener,
tls_cfg,
srx.changed(),
backpressure,
crate::workers::handle_connection_httpa!(
rth,
callback_wrapper,
@ -802,6 +840,7 @@ macro_rules! serve_wth_ssl {
tcp_listener,
tls_cfg,
srx.changed(),
backpressure,
crate::workers::handle_connection_http1_upgrades!(
rth,
callback_wrapper,
@ -818,6 +857,7 @@ macro_rules! serve_wth_ssl {
tcp_listener,
tls_cfg,
srx.changed(),
backpressure,
crate::workers::handle_connection_http1!(
rth,
callback_wrapper,
@ -834,6 +874,7 @@ macro_rules! serve_wth_ssl {
tcp_listener,
tls_cfg,
srx.changed(),
backpressure,
crate::workers::handle_connection_http2!(
rth,
callback_wrapper,

View file

@ -25,7 +25,8 @@ impl WSGIWorker {
worker_id,
socket_fd,
threads=1,
pthreads=1,
blocking_threads=512,
backpressure=128,
http_mode="1",
http1_opts=None,
http2_opts=None,
@ -39,7 +40,8 @@ impl WSGIWorker {
worker_id: i32,
socket_fd: i32,
threads: usize,
pthreads: usize,
blocking_threads: usize,
backpressure: usize,
http_mode: &str,
http1_opts: Option<PyObject>,
http2_opts: Option<PyObject>,
@ -52,7 +54,8 @@ impl WSGIWorker {
worker_id,
socket_fd,
threads,
pthreads,
blocking_threads,
backpressure,
http_mode,
worker_http1_config_from_py(py, http1_opts)?,
worker_http2_config_from_py(py, http2_opts)?,