# Mirror of https://github.com/emmett-framework/granian.git
# (synced 2025-07-08 03:45:41 +00:00; 100 lines, 2 KiB, Python)
import asyncio
|
|
import pathlib
|
|
import sys
|
|
|
|
|
|
# Headers sent with every plain-text response in this module.
HEADERS = [('content-type', 'text/plain; charset=utf-8')]
# Headers sent with the media file response; the content-length is hard-coded
# (presumably the byte size of assets/media.jpg -- verify against the asset).
HEADERS_MEDIA = [('content-type', 'image/jpeg'), ('content-length', '50486')]

# Pre-built bytes payloads keyed by a nominal size label. Only the ``10`` entry
# is exactly as long as its label; the others are multiples of 1024 bytes.
BODY_BYTES = {
    label: b'x' * length
    for label, length in (
        (10, 10),
        (1000, 1024),
        (10_000, 10 * 1024),
        (100_000, 100 * 1024),
    )
}
# Text payloads with the same labels and lengths as BODY_BYTES.
BODY_STR = {label: 'x' * len(payload) for label, payload in BODY_BYTES.items()}

# Path (as a plain string) of assets/media.jpg located next to this file.
MEDIA_PATH = str(pathlib.Path(__file__).parent.joinpath('assets', 'media.jpg'))
|
|
|
|
|
|
def b_builder(size):
    """Build a handler that answers with the pre-built bytes body for ``size``.

    ``size`` must be a key of ``BODY_BYTES``; an unknown key raises
    ``KeyError`` here, at build time, not when a request is served.
    Returns an ``async`` callable taking ``(scope, proto)``.
    """
    # Resolve the payload once so each request only sends it.
    payload = BODY_BYTES[size]

    async def route(scope, proto):
        proto.response_bytes(200, HEADERS, payload)

    return route
|
|
|
|
|
|
def s_builder(size):
    """Build a handler that answers with the pre-built text body for ``size``.

    ``size`` must be a key of ``BODY_STR``; an unknown key raises
    ``KeyError`` here, at build time, not when a request is served.
    Returns an ``async`` callable taking ``(scope, proto)``.
    """
    # Resolve the payload once so each request only sends it.
    text = BODY_STR[size]

    async def route(scope, proto):
        proto.response_str(200, HEADERS, text)

    return route
|
|
|
|
|
|
async def echo(scope, proto):
    """Await ``proto()`` and send its result back as a 200 bytes response."""
    received = await proto()
    proto.response_bytes(200, HEADERS, received)
|
|
|
|
|
|
async def echo_iter(scope, proto):
    """Open a 200 response stream and forward every chunk iterated from ``proto``."""
    stream = proto.response_stream(200, HEADERS)
    # Each chunk is written out as soon as it is yielded by the protocol.
    async for piece in proto:
        await stream.send_bytes(piece)
|
|
|
|
|
|
async def file(scope, proto):
    """Respond with the file at ``MEDIA_PATH`` using the media headers."""
    send_file = proto.response_file
    send_file(200, HEADERS_MEDIA, MEDIA_PATH)
|
|
|
|
|
|
def io_builder(wait):
    """Build a handler that sleeps before answering with the 10-byte body.

    ``wait`` is given in milliseconds; it is converted once to the seconds
    value that ``asyncio.sleep`` expects. Returns an ``async`` callable
    taking ``(scope, proto)``.
    """
    delay_seconds = wait / 1000

    async def io(scope, proto):
        # Simulated I/O latency before the response is sent.
        await asyncio.sleep(delay_seconds)
        proto.response_bytes(200, HEADERS, BODY_BYTES[10])

    return io
|
|
|
|
|
|
async def handle_404(scope, proto):
    """Send a plain-text ``not found`` response with status 404."""
    status = 404
    proto.response_str(status, HEADERS, 'not found')
|
|
|
|
|
|
# Request path -> handler table used by ``app``. Entries are added group by
# group, in the same order as a single literal would define them.
routes = {}

# Fixed-size bytes bodies.
routes.update(
    {
        '/b10': b_builder(10),
        '/b1k': b_builder(1000),
        '/b10k': b_builder(10_000),
        '/b100k': b_builder(100_000),
    }
)

# Fixed-size text bodies.
routes.update(
    {
        '/s10': s_builder(10),
        '/s1k': s_builder(1000),
        '/s10k': s_builder(10_000),
        '/s100k': s_builder(100_000),
    }
)

# Request-body echo (whole body / chunk by chunk) and the media file.
routes.update(
    {
        '/echo': echo,
        '/echoi': echo_iter,
        '/fp': file,
    }
)

# Handlers that sleep for the given number of milliseconds first.
routes.update(
    {
        '/io10': io_builder(10),
        '/io100': io_builder(100),
    }
)
|
|
|
|
|
|
def app(scope, proto):
    """Dispatch a request to the handler registered for ``scope.path``.

    Paths missing from ``routes`` fall back to ``handle_404``. This is a
    plain function: it returns whatever the selected handler call returns
    (a coroutine for the ``async`` handlers) without awaiting it.
    """
    return routes.get(scope.path, handle_404)(scope, proto)
|
|
|
|
|
|
def granian(wrk, thr):
    """Serve ``rsgi:app`` with Granian using the given worker/thread counts.

    ``wrk`` and ``thr`` may be strings (e.g. taken from ``sys.argv``); both
    are converted with ``int`` before being passed to the server.
    """
    # Imported lazily so the module can be loaded without this import running.
    from granian import Granian

    # NOTE(review): the ``threads`` keyword may have been renamed in newer
    # Granian releases -- confirm against the installed version.
    server = Granian('rsgi:app', workers=int(wrk), threads=int(thr), interface='rsgi')
    server.serve()
|
|
|
|
|
|
if __name__ == '__main__':
    # Script usage: <script> WORKERS THREADS
    workers_arg = sys.argv[1]
    threads_arg = sys.argv[2]
    granian(workers_arg, threads_arg)
|