granian/benchmarks/app/rsgi.py
Giovanni Barillari 7d48aa7c06
Upgrade benchmark suite (#551)
* Review benchmarks suite

* Update benchmarks
2025-04-07 22:34:29 +02:00

100 lines
2 KiB
Python

import asyncio
import pathlib
import sys
HEADERS = [('content-type', 'text/plain; charset=utf-8')]
HEADERS_MEDIA = [('content-type', 'image/jpeg'), ('content-length', '50486')]
BODY_BYTES = {
10: b'x' * 10,
1000: b'x' * 1024,
10_000: b'x' * 1024 * 10,
100_000: b'x' * 1024 * 100,
}
BODY_STR = {
10: 'x' * 10,
1000: 'x' * 1024,
10_000: 'x' * 1024 * 10,
100_000: 'x' * 1024 * 100,
}
MEDIA_PATH = str(pathlib.Path(__file__).parent / 'assets' / 'media.jpg')
def b_builder(size):
body = BODY_BYTES[size]
async def route(scope, proto):
proto.response_bytes(200, HEADERS, body)
return route
def s_builder(size):
body = BODY_STR[size]
async def route(scope, proto):
proto.response_str(200, HEADERS, body)
return route
async def echo(scope, proto):
proto.response_bytes(200, HEADERS, await proto())
async def echo_iter(scope, proto):
trx = proto.response_stream(200, HEADERS)
async for chunk in proto:
await trx.send_bytes(chunk)
async def file(scope, proto):
proto.response_file(200, HEADERS_MEDIA, MEDIA_PATH)
def io_builder(wait):
wait = wait / 1000
async def io(scope, proto):
await asyncio.sleep(wait)
proto.response_bytes(200, HEADERS, BODY_BYTES[10])
return io
async def handle_404(scope, proto):
proto.response_str(404, HEADERS, 'not found')
routes = {
'/b10': b_builder(10),
'/b1k': b_builder(1000),
'/b10k': b_builder(10_000),
'/b100k': b_builder(100_000),
'/s10': s_builder(10),
'/s1k': s_builder(1000),
'/s10k': s_builder(10_000),
'/s100k': s_builder(100_000),
'/echo': echo,
'/echoi': echo_iter,
'/fp': file,
'/io10': io_builder(10),
'/io100': io_builder(100),
}
def app(scope, proto):
handler = routes.get(scope.path, handle_404)
return handler(scope, proto)
def granian(wrk, thr):
from granian import Granian
Granian('rsgi:app', workers=int(wrk), threads=int(thr), interface='rsgi').serve()
if __name__ == '__main__':
granian(sys.argv[1], sys.argv[2])