From dc44b13396c57d0ac425de44f2bf3e248246cf4a Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Fri, 15 Apr 2022 18:45:01 +0200 Subject: [PATCH] first implementation --- .gitignore | 10 + Cargo.lock | 732 ++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 55 ++++ LICENSE | 28 ++ granian/__init__.py | 7 + granian/_futures.py | 6 + granian/_internal.py | 70 ++++ granian/_loops.py | 88 +++++ granian/asgi.py | 72 +++++ granian/asgi.pyi | 20 ++ granian/constants.py | 6 + granian/net.py | 9 + granian/rsgi.py | 47 +++ granian/rsgi.pyi | 26 ++ granian/server.py | 191 +++++++++++ pyproject.toml | 16 + src/asgi/callbacks.rs | 80 +++++ src/asgi/errors.rs | 54 ++++ src/asgi/http.rs | 46 +++ src/asgi/io.rs | 186 +++++++++++ src/asgi/mod.rs | 18 ++ src/asgi/serve.rs | 81 +++++ src/asgi/types.rs | 80 +++++ src/callbacks.rs | 16 + src/lib.rs | 22 ++ src/rsgi/callbacks.rs | 83 +++++ src/rsgi/http.rs | 183 +++++++++++ src/rsgi/io.rs | 37 +++ src/rsgi/mod.rs | 16 + src/rsgi/serve.rs | 76 +++++ src/rsgi/types.rs | 128 ++++++++ src/tcp.rs | 181 +++++++++++ src/workers.rs | 56 ++++ 33 files changed, 2726 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 LICENSE create mode 100644 granian/__init__.py create mode 100644 granian/_futures.py create mode 100644 granian/_internal.py create mode 100644 granian/_loops.py create mode 100644 granian/asgi.py create mode 100644 granian/asgi.pyi create mode 100644 granian/constants.py create mode 100644 granian/net.py create mode 100644 granian/rsgi.py create mode 100644 granian/rsgi.pyi create mode 100644 granian/server.py create mode 100644 pyproject.toml create mode 100644 src/asgi/callbacks.rs create mode 100644 src/asgi/errors.rs create mode 100644 src/asgi/http.rs create mode 100644 src/asgi/io.rs create mode 100644 src/asgi/mod.rs create mode 100644 src/asgi/serve.rs create mode 100644 src/asgi/types.rs create mode 100644 src/callbacks.rs create mode 100644 src/lib.rs create mode 100644 src/rsgi/callbacks.rs create mode 100644 src/rsgi/http.rs create mode 100644 src/rsgi/io.rs create mode 100644 src/rsgi/mod.rs create mode 100644 src/rsgi/serve.rs create mode 100644 src/rsgi/types.rs create mode 100644 src/tcp.rs create mode 100644 src/workers.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b30452d --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +.DS_Store +*.pyc +__pycache__ + +*.sublime-* +.venv +.vscode + +granian/*.so +target/* diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..9192e75 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,732 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bytes" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" + +[[package]] +name = "cc" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "futures" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" + +[[package]] +name = "futures-executor" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" + +[[package]] +name = "futures-macro" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" + +[[package]] +name = "futures-task" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" + +[[package]] +name = "futures-util" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "granian" +version = "0.1.0" +dependencies = [ + "bytes", + "hyper", + "mimalloc", + "pyo3", + "pyo3-asyncio", + "socket2", + "tokio", + "tokio-util", +] + +[[package]] +name = "h2" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37a82c6d637fc9515a4694bbf1cb2457b79d81ce52b3108bdeea58b07dd34a57" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "http" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6330e8a36bd8c859f3fa6d9382911fbb7147ec39807f63b923933a247240b9ba" + +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + +[[package]] +name = "hyper" +version = "0.14.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "indexmap" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f647032dfaa1f8b6dc29bd3edb7bbef4861b8b8007ebb118d6db284fd59f6ee" +dependencies = [ + "autocfg", + "hashbrown", +] + +[[package]] +name = "indoc" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7906a9fababaeacb774f72410e497a1d18de916322e33797bb2cd29baa23c9e" +dependencies = [ + "unindent", +] + +[[package]] +name = "itoa" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.123" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb691a747a7ab48abc15c5b42066eaafde10dc427e3b6ee2a1cf43db04c763bd" + +[[package]] +name = "libmimalloc-sys" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7705fc40f6ed493f73584abbb324e74f96b358ff60dfe5659a0f8fc12c590a69" +dependencies = [ + "cc", +] + +[[package]] +name = "lock_api" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "memchr" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" + +[[package]] +name = "mimalloc" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0dfa131390c2f6bdb3242f65ff271fcdaca5ff7b6c08f28398be7f2280e3926" +dependencies = [ + "libmimalloc-sys", +] + +[[package]] +name = "mio" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "wasi", + "winapi", +] + +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi", +] + +[[package]] +name = "ntapi" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f" +dependencies = [ + "winapi", +] + +[[package]] +name = "num_cpus" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "once_cell" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" + +[[package]] +name = "parking_lot" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "995f667a6c822200b0433ac218e05582f0e2efa1b922a3fd2fbaadc5f87bab37" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro2" +version = "1.0.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1" +dependencies = [ + "unicode-xid", +] + +[[package]] +name = "pyo3" +version = "0.16.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd86513975ed69bf3fb5d4a286cdcda66dbc56f84bdf4832b6c82b459f4417b2" +dependencies = [ + "cfg-if", + "indoc", + "libc", + "parking_lot", + "pyo3-build-config", + "pyo3-ffi", + "pyo3-macros", + "unindent", +] + +[[package]] +name = "pyo3-asyncio" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb097197d88c0d24df47fcf43c1f7eadde27b17a558cf8875123d60cee2cbba8" +dependencies = [ + "futures", + "once_cell", + "pin-project-lite", + "pyo3", + "tokio", +] + +[[package]] +name = "pyo3-build-config" +version = "0.16.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "450e2e56cbfa67bbe224cef93312b7a76d81c471d4e0c459d24d4bfaf3d75b53" +dependencies = [ + "once_cell", + "target-lexicon", +] + +[[package]] +name = "pyo3-ffi" +version = "0.16.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36e653782972eba2fe86e8319ade54b97822c65fb1ccc1e116368372faa6ebc9" +dependencies = [ + "libc", + "pyo3-build-config", +] + +[[package]] +name = "pyo3-macros" +version = "0.16.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "317ce641f29f4e10e75765630bf4d28b2008612226fcc80b27f334fee8184d0f" +dependencies = [ + "proc-macro2", + "pyo3-macros-backend", + "quote", + "syn", +] + +[[package]] +name = "pyo3-macros-backend" +version = "0.16.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59342fce58a05983688e8d81209d06f67f0fcb1597253ef63b390b2da2417522" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "quote" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62f25bc4c7e55e0b0b7a1d43fb893f4fa1361d0abe38b9ce4f323c2adfe6ef42" +dependencies = [ + "bitflags", +] + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" + +[[package]] +name = "smallvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" + +[[package]] +name = "socket2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "syn" +version = "1.0.91" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + +[[package]] +name = "target-lexicon" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7fa7e55043acb85fca6b3c01485a2eeb6b69c5d21002e273c79e465f43b7ac1" + +[[package]] +name = "tokio" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" +dependencies = [ + "bytes", + "libc", + "memchr", + "mio", + "num_cpus", + "once_cell", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "winapi", +] + +[[package]] +name = "tokio-macros" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-util" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0edfdeb067411dba2044da6d1cb2df793dd35add7888d73c16e3381ded401764" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", + "tracing", +] + +[[package]] +name = "tower-service" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" + +[[package]] +name = "tracing" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e65ce065b4b5c53e73bb28912318cb8c9e9ad3921f1d669eb0e68b4c8143a2b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + +[[package]] +name = "unicode-xid" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" + +[[package]] +name = "unindent" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "514672a55d7380da379785a4d70ca8386c8883ff7eaae877be4d2081cebe73d8" + +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5acdd78cb4ba54c0045ac14f62d8f94a03d10047904ae2a40afa1e99d8f70825" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17cffbe740121affb56fad0fc0e421804adf0ae00891205213b5cecd30db881d" + +[[package]] +name = "windows_i686_gnu" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2564fde759adb79129d9b4f54be42b32c89970c18ebf93124ca8870a498688ed" + +[[package]] +name = "windows_i686_msvc" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cd9d32ba70453522332c14d38814bceeb747d80b3958676007acadd7e166956" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfce6deae227ee8d356d19effc141a509cc503dfd1f850622ec4b0f84428e1f4" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d19538ccc21819d01deaf88d6a17eae6596a12e9aafdbb97916fb49896d89de9" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..67a19c7 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,55 @@ +[package] +name = "granian" +version = "0.1.0" +description = "" +authors = ["Giovanni Barillari "] +license = "BSD-3-Clause" +edition = "2018" + +keywords = ["emmett"] + +readme = "README.md" +homepage = "https://github.com/emmett-framework/granian" +repository = "https://github.com/emmett-framework/granian" + +include = [ + "Cargo.toml", + "LICENSE", + "pyproject.toml", + "README.md", + "src/*" +] + +[lib] +name = "granian" +crate-type = ["cdylib"] + +[dependencies] +bytes = "1" +hyper = { version = "0.14", features = ["http1", "http2", "server", "stream", "runtime", "tcp"] } +mimalloc = { version = "0.1.28" } +pyo3 = { version = "0.16", features = ["extension-module"] } +pyo3-asyncio = { version = "0.16", features = ["tokio-runtime"] } +socket2 = { version = "0.4", features = ["all"] } +tokio = { version = "1.17", features = ["full"] } +tokio-util = { version = "0.7", features = ["codec"] } + +[package.metadata.maturin] +requires-python = ">=3.7" + +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: BSD License", + "Operating System :: MacOS", + "Operating System :: Microsoft :: Windows", + "Operating System :: POSIX :: Linux", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python", + "Programming Language :: Rust" +] diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..46098b7 --- /dev/null +++ b/LICENSE @@ -0,0 +1,28 @@ +Copyright 2021 Giovanni Barillari + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + +1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A +PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED +TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/granian/__init__.py b/granian/__init__.py new file mode 100644 index 0000000..83bd0e3 --- /dev/null +++ b/granian/__init__.py @@ -0,0 +1,7 @@ +from .granian import ( + asgi as _asgi, + rsgi as _rsgi, + tcp as _tcp, + workers as _workers +) +from .server import Granian diff --git a/granian/_futures.py b/granian/_futures.py new file mode 100644 index 0000000..4a8df20 --- /dev/null +++ b/granian/_futures.py @@ -0,0 +1,6 @@ +import asyncio + + +def future_wrapper(watcher, coro, handler): + fut = asyncio.ensure_future(coro) + fut.add_done_callback(handler(watcher)) diff --git a/granian/_internal.py b/granian/_internal.py new file mode 100644 index 0000000..64af4af --- /dev/null +++ b/granian/_internal.py @@ -0,0 +1,70 @@ +import os +import re +import sys +import threading +import traceback + +from types import ModuleType +from typing import Callable, List, Optional + +CTX = threading.local() +CTX.socks = {} + + +def get_import_components(path: str) -> List[Optional[str]]: + return (re.split(r":(?![\\/])", path, 1) + [None])[:2] + + +def prepare_import(path: str) -> str: + path = os.path.realpath(path) + + fname, ext = os.path.splitext(path) + if ext == ".py": + path = fname + if os.path.basename(path) == "__init__": + path = os.path.dirname(path) + + module_name = [] + + #: move up untile outside package + while True: + path, name = os.path.split(path) + module_name.append(name) + + if not os.path.exists(os.path.join(path, "__init__.py")): + break + + if sys.path[0] != path: + sys.path.insert(0, path) + + return ".".join(module_name[::-1]) + + +def load_module( + module_name: str, + raise_on_failure: bool = True +) -> Optional[ModuleType]: + try: + __import__(module_name) + except ImportError: + if sys.exc_info()[-1].tb_next: + raise RuntimeError( + f"While importing '{module_name}', an ImportError was raised:" + f"\n\n{traceback.format_exc()}" + ) + elif raise_on_failure: + raise RuntimeError(f"Could not import '{module_name}'.") + else: + return + return sys.modules[module_name] + + +def load_target(target: str) -> Callable[..., None]: + path, name = get_import_components(target) + path = prepare_import(path) if path else None + name = name or "app" + module = load_module(path) + rv = module + for element in name.split("."): + rv = getattr(rv, element) + return rv diff --git a/granian/_loops.py b/granian/_loops.py new file mode 100644 index 0000000..1dc5d59 --- /dev/null +++ b/granian/_loops.py @@ -0,0 +1,88 @@ +import asyncio +import sys + +from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple + + +class Registry: + __slots__ = ["_data"] + + def __init__(self): + self._data: Dict[str, Callable[..., Any]] = {} + + def __contains__(self, key: str) -> bool: + return key in self._data + + def keys(self) -> Iterable[str]: + return self._data.keys() + + def register(self, key: str) -> Callable[[], Callable[..., Any]]: + def wrap(builder: Callable[..., Any]) -> Callable[..., Any]: + self._data[key] = builder + return builder + return wrap + + def get(self, key: str) -> Callable[..., Any]: + try: + return self._data[key] + except KeyError: + raise RuntimeError(f"'{key}' implementation not available.") + + + +class BuilderRegistry(Registry): + __slots__ = [] + + def __init__(self): + self._data: Dict[str, Tuple[Callable[..., Any], List[str]]] = {} + + def register( + self, + key: str, + packages: Optional[List[str]] = None + ) -> Callable[[], Callable[..., Any]]: + packages = packages or [] + + def wrap(builder: Callable[..., Any]) -> Callable[..., Any]: + loaded_packages, implemented = {}, True + try: + for package in packages: + __import__(package) + loaded_packages[package] = sys.modules[package] + except ImportError: + implemented = False + if implemented: + self._data[key] = (builder, loaded_packages) + return builder + return wrap + + def get(self, key: str) -> Callable[..., Any]: + try: + builder, packages = self._data[key] + except KeyError: + raise RuntimeError(f"'{key}' implementation not available.") + return builder(**packages) + + +loops = BuilderRegistry() + + +@loops.register('asyncio') +def build_asyncio_loop(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + return loop + + +@loops.register('uvloop', packages=['uvloop']) +def build_uv_loop(uvloop): + asyncio.get_event_loop().close() + asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) + return asyncio.get_event_loop() + + +@loops.register('auto') +def build_auto_loop(): + if 'uvloop' in loops: + return loops.get('uvloop') + return loops.get('asyncio') diff --git a/granian/asgi.py b/granian/asgi.py new file mode 100644 index 0000000..dba3fe4 --- /dev/null +++ b/granian/asgi.py @@ -0,0 +1,72 @@ +from functools import wraps + +from . import _asgi +from ._futures import future_wrapper + +Receiver = _asgi.Receiver +Sender = _asgi.Sender +Scope = _asgi.Scope + + +def receiver_wrapper(receiver): + @wraps(receiver) + async def wrapper(): + return { + "type": "http.request", + "body": await receiver, + "more_body": False + } + return wrapper + + +def sender_wrapper(sender): + @wraps(sender) + async def wrapper(data): + sender(data) + return wrapper + + +def callback_wrapper(callback): + @wraps(callback) + def wrapper(watcher, scope, receiver, sender): + client_addr, client_port = (scope.client.split(":") + ["0"])[:2] + coro = callback( + { + "type": scope.proto, + "asgi": { + "version": "3.0", + "spec_version": "2.3" + }, + "http_version": scope.http_version, + "server": ("127.0.0.1", 8000), + "client": (client_addr, int(client_port)), + "scheme": scope.scheme, + "method": scope.method, + "root_path": "", + "path": scope.path, + "raw_path": scope.path.encode("ascii"), + "query_string": scope.query_string, + "headers": scope.headers, + "extensions": {} + }, + receiver_wrapper(receiver), + sender_wrapper(sender) + ) + watcher.event_loop.call_soon_threadsafe( + future_wrapper, + watcher, + coro, + future_handler, + context=watcher.context + ) + return wrapper + + +def future_handler(watcher): + def handler(task): + try: + task.result() + watcher.done(True) + except Exception: + watcher.done(False) + return handler diff --git a/granian/asgi.pyi b/granian/asgi.pyi new file mode 100644 index 0000000..9236a84 --- /dev/null +++ b/granian/asgi.pyi @@ -0,0 +1,20 @@ +from typing import Any, Dict + + +class Receiver: + async def __call__(self) -> bytes: ... + + +class Sender: + def __call__(self, message: Dict[str, Any]) -> None: ... + + +class Scope: + client: str + headers: Dict[bytes, bytes] + http_version: str + method: str + path: str + proto: str + query_string: str + scheme: str diff --git a/granian/constants.py b/granian/constants.py new file mode 100644 index 0000000..92712f1 --- /dev/null +++ b/granian/constants.py @@ -0,0 +1,6 @@ +from enum import Enum + + +class Interfaces(str, Enum): + ASGI = "asgi" + RSGI = "rsgi" diff --git a/granian/net.py b/granian/net.py new file mode 100644 index 0000000..1e68c8b --- /dev/null +++ b/granian/net.py @@ -0,0 +1,9 @@ +import copyreg + +from . import _tcp + +SocketHolder = _tcp.ListenerHolder +copyreg.pickle( + SocketHolder, + lambda v: (SocketHolder, v.__getstate__()) +) diff --git a/granian/rsgi.py b/granian/rsgi.py new file mode 100644 index 0000000..9420364 --- /dev/null +++ b/granian/rsgi.py @@ -0,0 +1,47 @@ +from collections import namedtuple +from enum import Enum +from functools import wraps + +from . import _rsgi +from ._futures import future_wrapper + +Scope = _rsgi.Scope +Receiver = _rsgi.Receiver + + +class ResponseType(int, Enum): + empty = 0 + bytes = 1 + string = 2 + file_path = 10 + chunks = 20 + + +Response = namedtuple( + "Response", + ["mode", "status", "headers", "bytes_data", "str_data", "file_path"], + defaults=[ResponseType.empty, 200, {}, None, None, None] +) + + +def callback_wrapper(callback): + @wraps(callback) + def wrapper(watcher, scope, receiver): + watcher.event_loop.call_soon_threadsafe( + future_wrapper, + watcher, + callback(scope, receiver), + future_handler, + context=watcher.context + ) + return wrapper + + +def future_handler(watcher): + def handler(task): + try: + res = task.result() + watcher.done(res) + except Exception: + watcher.done(None) + return handler diff --git a/granian/rsgi.pyi b/granian/rsgi.pyi new file mode 100644 index 0000000..dd89fdc --- /dev/null +++ b/granian/rsgi.pyi @@ -0,0 +1,26 @@ +from typing import Any, Dict, List + + +class Headers: + def __contains__(self, key: str) -> bool: ... + def keys(self) -> List[str]: ... + def values(self) -> List[str]: ... + def items(self) -> List[str]: ... + def get(self, key: str, default: Any = None) -> Any: ... + + +class Scope: + proto: str + http_version: str + client: str + scheme: str + method: str + path: str + query_string: str + + @property + def headers(self) -> Headers: ... + + +class Receiver: + async def __call__(self) -> bytes: ... diff --git a/granian/server.py b/granian/server.py new file mode 100644 index 0000000..a023d94 --- /dev/null +++ b/granian/server.py @@ -0,0 +1,191 @@ +import contextvars +import copyreg +import os +import multiprocessing +import signal +import socket +import threading + +from functools import partial +from typing import List, Optional + +from . import _workers +from ._internal import CTX, load_target +from .asgi import callback_wrapper as _asgi_call_wrap +from .constants import Interfaces +from .net import SocketHolder +from .rsgi import callback_wrapper as _rsgi_call_wrap + +multiprocessing.allow_connection_pickling() + +ASGIWorker = _workers.ASGIWorker +RSGIWorker = _workers.RSGIWorker + + +class Granian: + SIGNALS = { + signal.SIGINT, + signal.SIGTERM + } + + def __init__( + self, + target: str, + address: str = "127.0.0.1", + port: int = 8000, + workers: int = 1, + backlog: int = 512, + threads: Optional[int] = None, + interface: Interfaces = Interfaces.RSGI + ): + self.target = target + self.bind_addr = address + self.bind_port = port + self.workers = max(1, workers) + self.backlog = max(128, backlog) + self.threads = ( + max(1, threads) if threads is not None else + max(2, multiprocessing.cpu_count() // workers) + ) + self.interface = interface + self._sfd = None + self.procs: List[multiprocessing.Process] = [] + self.exit_event = threading.Event() + + @staticmethod + def _target_load(target: str): + return load_target(target) + + @staticmethod + def _spawn_asgi_worker(worker_id, callback_loader, socket, threads): + from granian._loops import loops + + callback = callback_loader() + loop = loops.get("auto") + sfd = socket.fileno() + + worker = ASGIWorker(worker_id, sfd, threads) + worker.serve(_asgi_call_wrap(callback), loop, contextvars.copy_context()) + + # worker._serve_ret_st(callback, loop, contextvars.copy_context()) + # print("infinite loop") + # loop.run_forever() + + @staticmethod + def _spawn_rsgi_worker(worker_id, callback_loader, socket, threads): + from granian._loops import loops + + callback = callback_loader() + loop = loops.get("auto") + sfd = socket.fileno() + + print("spawning wrk", sfd, threads) + worker = RSGIWorker(worker_id, sfd, threads) + worker.serve(_rsgi_call_wrap(callback), loop, contextvars.copy_context()) + + # worker._serve_ret_st(callback, loop, contextvars.copy_context()) + # print("infinite loop") + # loop.run_forever() + + @staticmethod + def _shared_socket_loader(pid): + return CTX.socks[pid] + + @staticmethod + def _local_socket_builder(addr, port, backlog): + return SocketHolder.from_address(addr, port, backlog) + + def _init_shared_socket(self, pid): + # if self.workers > 1: + CTX.socks[pid] = SocketHolder.from_address( + self.bind_addr, + self.bind_port, + self.backlog * self.workers + ) + self._sfd = CTX.socks[pid].get_fd() + + def _build_socket_loader(self, pid): + if self.workers > 1: + return partial( + self._shared_socket_loader, pid + ) + return partial( + self._local_socket_builder, + self.bind_addr, + self.bind_port, + self.backlog + ) + + def signal_handler(self, *args, **kwargs): + self.exit_event.set() + + def _spawn_proc( + self, + id, + target, + callback_loader, + socket_loader + ) -> multiprocessing.Process: + return multiprocessing.get_context().Process( + target=target, + args=(id, callback_loader, socket_loader(), self.threads) + ) + + def startup(self, spawn_target, target_loader): + for sig in self.SIGNALS: + signal.signal(sig, self.signal_handler) + + pid = os.getpid() + self._init_shared_socket(pid) + # socket_loader = self._build_socket_loader(pid) + + sock = socket.socket(fileno=self._sfd) + sock.set_inheritable(True) + + def socket_loader(): + return sock + + for idx in range(self.workers): + proc = self._spawn_proc( + id=idx, + target=spawn_target, + callback_loader=target_loader, + socket_loader=socket_loader + ) + proc.start() + self.procs.append(proc) + + def shutdown(self): + print("send term") + for proc in self.procs: + # proc.terminate() + proc.kill() + print("joining") + for proc in self.procs: + proc.join() + + def serve(self, spawn_target = None, target_loader = None): + default_spawners = { + Interfaces.ASGI: self._spawn_asgi_worker, + Interfaces.RSGI: self._spawn_rsgi_worker + } + target_loader = target_loader or self._target_load + spawn_target = spawn_target or default_spawners[self.mode] + + # if self.workers > 1 and "fork" not in multiprocessing.get_all_start_methods(): + # raise RuntimeError("Multiple workers are not supported on current platform") + + self.startup(spawn_target, partial(target_loader, self.target)) + print("started", self.procs) + try: + self.exit_event.wait() + except KeyboardInterrupt: + print("keyb. interr") + print("exit event received") + self.shutdown() + + +copyreg.pickle( + SocketHolder, + lambda v: (SocketHolder, v.__getstate__()) +) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..68229ff --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,16 @@ +[project] +name = "granian" +repository = "https://github.com/emmett-framework/granian" +dependencies = ["uvloop~=0.16.0; sys_platform != 'win32'"] + +[build-system] +requires = ["maturin>=0.12,<0.13"] +build-backend = "maturin" + +[tool.maturin] +sdist-include = [ + "Cargo.lock", + "granian/*.py", + "tests/*.py" +] +strip = true diff --git a/src/asgi/callbacks.rs b/src/asgi/callbacks.rs new file mode 100644 index 0000000..e5a49a0 --- /dev/null +++ b/src/asgi/callbacks.rs @@ -0,0 +1,80 @@ +use hyper::{Response, Body}; +use pyo3::prelude::*; +use tokio::sync::oneshot; + +use super::super::callbacks::CallbackWrapper; +use super::io::{Receiver, Sender}; +use super::types::Scope; + +#[pyclass] +pub(crate) struct CallbackWatcher { + tx: Option>, + #[pyo3(get)] + event_loop: PyObject, + #[pyo3(get)] + context: PyObject +} + +impl CallbackWatcher { + pub fn new( + py: Python, + cb: CallbackWrapper, + tx: Option> + ) -> Self { + Self { + tx: tx, + event_loop: cb.context.event_loop(py).into(), + context: cb.context.context(py).into(), + } + } +} + +#[pymethods] +impl CallbackWatcher { + fn done(&mut self, success: bool) -> PyResult<()> { + if let Some(tx) = self.tx.take() { + let _ = tx.send(success); + }; + Ok(()) + } +} + +// pub(crate) async fn acall( +// cb: CallbackWrapper, +// receiver: Receiver, +// sender: Sender, +// scope: Scope +// ) -> PyResult<()> { +// Python::with_gil(|py| { +// let coro = cb.callback.call1(py, (scope, receiver, sender))?; +// pyo3_asyncio::into_future_with_locals( +// &cb.context, +// coro.as_ref(py) +// ) +// })? +// .await?; +// Ok(()) +// } + +pub(crate) async fn call( + cb: CallbackWrapper, + receiver: Receiver, + scope: Scope +) -> Result>, super::errors::ASGIFlowError> { + let (tx, rx) = oneshot::channel(); + let (stx, srx) = oneshot::channel(); + + let callback = cb.callback.clone(); + let sender = Sender::new(Some(stx)); + Python::with_gil(|py| { + callback.call1( + py, + (CallbackWatcher::new(py, cb, Some(tx)), scope, receiver, sender) + ) + })?; + + match rx.await { + Ok(true) => Ok(srx), + _ => Err(super::errors::ASGIFlowError) + } +} diff --git a/src/asgi/errors.rs b/src/asgi/errors.rs new file mode 100644 index 0000000..5383e01 --- /dev/null +++ b/src/asgi/errors.rs @@ -0,0 +1,54 @@ +use pyo3::exceptions::PyRuntimeError; +use pyo3::prelude::*; +use std::{error, fmt}; + +#[derive(Debug)] +pub(crate) struct UnsupportedASGIMessage; + +#[derive(Debug)] +pub(crate) struct ASGIFlowError; + +impl error::Error for UnsupportedASGIMessage {} +impl error::Error for ASGIFlowError {} + +impl fmt::Display for UnsupportedASGIMessage { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Unsupported ASGI message") + } +} + +impl fmt::Display for ASGIFlowError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "ASGI flow error") + } +} + +impl From for ASGIFlowError { + fn from(err: std::convert::Infallible) -> ASGIFlowError { + match err {} + } +} + +impl std::convert::From for UnsupportedASGIMessage { + fn from(_pyerr: PyErr) -> UnsupportedASGIMessage { + UnsupportedASGIMessage + } +} + +impl std::convert::From for ASGIFlowError { + fn from(_pyerr: PyErr) -> ASGIFlowError { + ASGIFlowError + } +} + +impl std::convert::From for PyErr { + fn from(err: UnsupportedASGIMessage) -> PyErr { + PyRuntimeError::new_err(err.to_string()) + } +} + +impl std::convert::From for PyErr { + fn from(err: ASGIFlowError) -> PyErr { + PyRuntimeError::new_err(err.to_string()) + } +} diff --git a/src/asgi/http.rs b/src/asgi/http.rs new file mode 100644 index 0000000..6ee796b --- /dev/null +++ b/src/asgi/http.rs @@ -0,0 +1,46 @@ +use hyper::{Body, Request, Response}; +use std::net::SocketAddr; + +use super::super::callbacks::CallbackWrapper; +use super::callbacks::call as callback_caller; +use super::io::{Receiver}; +use super::types::Scope; + +// pub(crate) async fn handle_request( +// cb_wrapper: CallbackWrapper, +// client_addr: SocketAddr, +// req: Request, +// sender: Sender +// ) -> PyResult<()> { +// let scope = Scope::new( +// "http", +// req.version(), +// req.uri().clone(), +// req.method().as_ref(), +// client_addr, +// req.headers() +// ); +// let receiver = Receiver::new(req); + +// callback_caller(cb_wrapper, receiver, sender, scope).await?; +// Ok(()) +// } + +pub(crate) async fn handle_request( + cb_wrapper: CallbackWrapper, + client_addr: SocketAddr, + req: Request, +) -> Result, Box> { + let scope = Scope::new( + "http", + req.version(), + req.uri().clone(), + req.method().as_ref(), + client_addr, + req.headers() + ); + let receiver = Receiver::new(req); + + let rx = callback_caller(cb_wrapper, receiver, scope).await?; + Ok(rx.await?) +} diff --git a/src/asgi/io.rs b/src/asgi/io.rs new file mode 100644 index 0000000..e95c464 --- /dev/null +++ b/src/asgi/io.rs @@ -0,0 +1,186 @@ +use bytes::Buf; +use hyper::{ + Body, + Request, + Response, + header::{HeaderName, HeaderValue, HeaderMap} +}; +use pyo3::prelude::*; +use pyo3::types::{PyBytes, PyDict}; +use std::sync::{Arc}; +use tokio::sync::{Mutex, oneshot}; + +use super::errors::{ASGIFlowError, UnsupportedASGIMessage}; +use super::types::ASGIMessageType; + +#[pyclass(module="granian.asgi")] +pub(crate) struct Receiver { + request: Arc>> +} + +impl Receiver { + pub fn new(request: Request) -> Self { + Self { + request: Arc::new(Mutex::new(request)) + } + } +} + +#[pymethods] +impl Receiver { + fn __call__<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> { + let req_ref = self.request.clone(); + pyo3_asyncio::tokio::future_into_py(py, async move { + let mut req = req_ref.lock().await; + let mut body = hyper::body::to_bytes(&mut *req).await.unwrap(); + Ok(Python::with_gil(|py| { + PyBytes::new_with(py, body.len(), |bytes: &mut [u8]| { + body.copy_to_slice(bytes); + Ok(()) + }).unwrap().as_ref().to_object(py) + })) + }) + } +} + +#[pyclass(module="granian.asgi")] +pub(crate) struct Sender { + inited: bool, + consumed: bool, + status: i16, + headers: HeaderMap, + body: Vec, + tx: Option>> +} + +impl Sender { + pub fn new(tx: Option>>) -> Self { + Self { + inited: false, + consumed: false, + status: 0, + headers: HeaderMap::new(), + body: Vec::new(), + tx: tx + } + } + + fn adapt_message_type( + &self, + message: &PyDict + ) -> Result { + match message.get_item("type") { + Some(item) => { + let message_type: &str = item.extract()?; + match message_type { + "http.response.start" => Ok(ASGIMessageType::Start), + "http.response.body" => Ok(ASGIMessageType::Body), + _ => Err(UnsupportedASGIMessage) + } + }, + _ => Err(UnsupportedASGIMessage) + } + } + + fn adapt_status_code( + &self, + message: &PyDict + ) -> Result { + match message.get_item("status") { + Some(item) => { + Ok(item.extract()?) + }, + _ => Err(UnsupportedASGIMessage) + } + } + + fn adapt_headers(&self, message: &PyDict) -> HeaderMap { + let mut ret = HeaderMap::new(); + match message.get_item("headers") { + Some(item) => { + let accum: Vec> = item.extract().unwrap_or(Vec::new()); + for tup in accum.iter() { + match ( + HeaderName::from_bytes(tup[0]), + HeaderValue::from_bytes(tup[1]) + ) { + (Ok(key), Ok(val)) => { ret.insert(key, val); }, + _ => {} + } + }; + ret + }, + _ => ret + } + } + + fn adapt_body(&self, message: &PyDict) -> (Vec, bool) { + let default_body = b"".to_vec(); + let default_more = false; + let body = match message.get_item("body") { + Some(item) => { + item.extract().unwrap_or(default_body) + }, + _ => default_body + }; + let more = match message.get_item("more_body") { + Some(item) => { + item.extract().unwrap_or(default_more) + }, + _ => default_more + }; + (body, more) + } + + fn init_response(&mut self, status_code: i16, headers: HeaderMap) { + self.status = status_code; + self.headers = headers; + self.inited = true; + } + + fn send_body(&mut self, body: &[u8], finish: bool) { + self.body.extend_from_slice(body); + if finish { + if let Some(tx) = self.tx.take() { + let mut res = Response::new(self.body.to_owned().into()); + *res.status_mut() = hyper::StatusCode::from_u16( + self.status as u16 + ).unwrap(); + *res.headers_mut() = self.headers.to_owned(); + let _ = tx.send(res); + } + self.consumed = true + } + } +} + +#[pymethods] +impl Sender { + fn __call__<'p>(&mut self, data: &PyDict) -> PyResult<()> { + match self.adapt_message_type(data) { + Ok(ASGIMessageType::Start) => { + match self.inited { + false => { + self.init_response( + self.adapt_status_code(data).unwrap(), + self.adapt_headers(data) + ); + Ok(()) + }, + _ => Err(ASGIFlowError.into()) + } + }, + Ok(ASGIMessageType::Body) => { + match (self.inited, self.consumed) { + (true, false) => { + let body_data = self.adapt_body(data); + self.send_body(&body_data.0[..], !body_data.1); + Ok(()) + }, + _ => Err(ASGIFlowError.into()) + } + }, + Err(err) => Err(err.into()) + } + } +} diff --git a/src/asgi/mod.rs b/src/asgi/mod.rs new file mode 100644 index 0000000..cc7fbe3 --- /dev/null +++ b/src/asgi/mod.rs @@ -0,0 +1,18 @@ +use pyo3::prelude::*; + +mod callbacks; +mod errors; +mod http; +mod io; +pub(crate) mod serve; +mod types; + +pub(crate) fn build_pymodule(py: Python) -> PyResult<&PyModule> { + let module = PyModule::new(py, "asgi")?; + + module.add_class::()?; + module.add_class::()?; + module.add_class::()?; + + Ok(module) +} diff --git a/src/asgi/serve.rs b/src/asgi/serve.rs new file mode 100644 index 0000000..0885ef6 --- /dev/null +++ b/src/asgi/serve.rs @@ -0,0 +1,81 @@ +use hyper::{ + Server, + server::conn::AddrStream, + service::{make_service_fn, service_fn} +}; +use pyo3::prelude::*; +use std::{convert::Infallible, process}; + +use super::super::callbacks::CallbackWrapper; +use super::super::workers::{WorkerConfig, worker_rt}; +use super::http::handle_request; + +#[pyclass(module="granian.workers")] +pub struct ASGIWorker { + config: WorkerConfig +} + +#[pymethods] +impl ASGIWorker { + #[new] + #[args(socket_fd, threads="1", http1_buffer_max="65535")] + fn new( + worker_id: i32, + socket_fd: i32, + threads: usize, + http1_buffer_max: usize + ) -> PyResult { + Ok(Self { + config: WorkerConfig::new( + worker_id, + socket_fd, + threads, + http1_buffer_max + ) + }) + } + + fn serve(&self, callback: PyObject, event_loop: &PyAny, context: &PyAny) { + let tcp_listener = worker_rt(&self.config); + let http1_buffer_max = self.config.http1_buffer_max; + let callback_wrapper = CallbackWrapper::new(callback, event_loop, context); + + let worker_id = self.config.id; + println!("Listener spawned: {}", worker_id); + + let svc_loop = pyo3_asyncio::tokio::run_until_complete(event_loop, async move { + println!("Service fut"); + let service = make_service_fn(|socket: &AddrStream| { + let remote_addr = socket.remote_addr(); + let callback_wrapper = callback_wrapper.clone(); + + async move { + Ok::<_, Infallible>(service_fn(move |req| { + let callback_wrapper = callback_wrapper.clone(); + + async move { + Ok::<_, Infallible>(handle_request( + callback_wrapper, remote_addr, req + ).await.unwrap()) + } + })) + } + }); + + println!("Starting server"); + let server = Server::from_tcp(tcp_listener).unwrap() + .http1_max_buf_size(http1_buffer_max) + .serve(service); + server.await.unwrap(); + Ok(()) + }); + + match svc_loop { + Ok(_) => {} + Err(err) => { + println!("err: {}", err); + process::exit(1); + } + }; + } +} diff --git a/src/asgi/types.rs b/src/asgi/types.rs new file mode 100644 index 0000000..45cacd7 --- /dev/null +++ b/src/asgi/types.rs @@ -0,0 +1,80 @@ +use hyper::{Uri, Version, header::{HeaderMap}}; +use pyo3::prelude::*; +use std::collections::HashMap; +use std::net::SocketAddr; + +pub(crate) enum ASGIMessageType { + Start = 0, + Body = 1 +} + +#[pyclass(module="granian.asgi")] +pub(crate) struct Scope { + #[pyo3(get)] + proto: String, + http_version: Version, + #[pyo3(get)] + method: String, + uri: Uri, + #[pyo3(get)] + client: String, + headers: HeaderMap +} + +// TODO: server address +impl Scope { + pub fn new( + proto: &str, + http_version: Version, + uri: Uri, + method: &str, + client: SocketAddr, + headers: &HeaderMap + ) -> Self { + Self { + proto: proto.to_string(), + http_version: http_version, + method: method.to_string(), + uri: uri, + client: client.to_string(), + headers: headers.to_owned() + } + } +} + +#[pymethods] +impl Scope { + #[getter(headers)] + fn get_headers(&self) -> HashMap<&[u8], &[u8]> { + let mut ret = HashMap::new(); + for (key, value) in self.headers.iter() { + ret.insert(key.as_str().as_bytes(), value.as_bytes()); + } + ret + } + + #[getter(http_version)] + fn get_http_version(&self) -> &str { + match self.http_version { + Version::HTTP_10 => "1", + Version::HTTP_11 => "1.1", + Version::HTTP_2 => "2", + _ => "1" + } + } + + #[getter(scheme)] + fn get_scheme(&self) -> &str { + self.uri.scheme_str().unwrap_or("http") + } + + #[getter(path)] + fn get_path(&self) -> &str { + self.uri.path() + } + + #[getter(query_string)] + fn get_query_string(&self) -> &str { + self.uri.query().unwrap_or("") + } +} diff --git a/src/callbacks.rs b/src/callbacks.rs new file mode 100644 index 0000000..fb829f0 --- /dev/null +++ b/src/callbacks.rs @@ -0,0 +1,16 @@ +use pyo3::prelude::*; + +#[derive(Clone)] +pub(crate) struct CallbackWrapper { + pub callback: PyObject, + pub context: pyo3_asyncio::TaskLocals +} + +impl CallbackWrapper { + pub(crate) fn new(callback: PyObject, event_loop: &PyAny, context: &PyAny) -> Self { + Self { + callback: callback, + context: pyo3_asyncio::TaskLocals::new(event_loop).with_context(context) + } + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..fd1fb23 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,22 @@ +#[global_allocator] +static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; + +use pyo3::prelude::*; + +mod asgi; +mod callbacks; +mod rsgi; +mod tcp; +mod workers; + +#[pymodule] +fn granian(py: Python, module: &PyModule) -> PyResult<()> { + module.add_submodule(asgi::build_pymodule(py)?)?; + module.add_submodule(rsgi::build_pymodule(py)?)?; + module.add_submodule(tcp::build_pymodule(py)?)?; + module.add_submodule(workers::build_pymodule(py)?)?; + + pyo3::prepare_freethreaded_python(); + + Ok(()) +} diff --git a/src/rsgi/callbacks.rs b/src/rsgi/callbacks.rs new file mode 100644 index 0000000..3eddb0b --- /dev/null +++ b/src/rsgi/callbacks.rs @@ -0,0 +1,83 @@ +use pyo3::prelude::*; +use std::collections::HashMap; +use tokio::sync::oneshot; + +use super::super::callbacks::CallbackWrapper; +use super::io::Receiver; +use super::types::Scope; + +#[derive(FromPyObject)] +pub(crate) struct CallbackRet { + pub mode: u32, + pub status: i32, + pub headers: HashMap, + pub bytes_data: Option>, + pub str_data: Option, + pub file_path: Option +} + +#[pyclass] +pub(crate) struct CallbackWatcher { + tx: Option>, + #[pyo3(get)] + event_loop: PyObject, + #[pyo3(get)] + context: PyObject +} + +impl CallbackWatcher { + pub fn new( + py: Python, + cb: CallbackWrapper, + tx: Option> + ) -> Self { + Self { + tx: tx, + event_loop: cb.context.event_loop(py).into(), + context: cb.context.context(py).into(), + } + } +} + +#[pymethods] +impl CallbackWatcher { + fn done(&mut self, py: Python, result: PyObject) -> PyResult<()> { + if let Some(tx) = self.tx.take() { + let _ = tx.send(result.extract(py)?); + }; + Ok(()) + } +} + +// pub(crate) async fn acall( +// cb: CallbackWrapper, +// receiver: Receiver, +// scope: Scope +// ) -> PyResult { +// let res = Python::with_gil(|py| { +// let coro = cb.callback.call1(py, (scope, receiver))?; +// pyo3_asyncio::into_future_with_locals( +// &cb.context, +// coro.as_ref(py) +// ) +// })? +// .await?; +// Ok(Python::with_gil(|py| { res.extract(py) })?) +// } + +pub(crate) async fn call( + cb: CallbackWrapper, + receiver: Receiver, + scope: Scope +) -> PyResult { + let (tx, rx) = oneshot::channel(); + let callback = cb.callback.clone(); + Python::with_gil(|py| { + callback.call1(py, (CallbackWatcher::new(py, cb, Some(tx)), scope, receiver)) + })?; + + match rx.await { + Ok(v) => Ok(v), + _ => Python::with_gil(|py| Err(PyErr::from_value(py.None().as_ref(py)))) + } +} diff --git a/src/rsgi/http.rs b/src/rsgi/http.rs new file mode 100644 index 0000000..1707acf --- /dev/null +++ b/src/rsgi/http.rs @@ -0,0 +1,183 @@ +use hyper::{ + Body, + Request, + Response, + header::{HeaderName, HeaderValue}, + http::response::{Builder as ResponseBuilder} +}; +use std::collections::HashMap; +use std::net::SocketAddr; +use tokio::fs::File; +use tokio_util::codec::{BytesCodec, FramedRead}; + +use super::super::callbacks::CallbackWrapper; +use super::callbacks::call as callback_caller; +use super::io::Receiver; +use super::types::{ResponseType, Scope}; + +const RESPONSE_BYTES: u32 = ResponseType::Bytes as u32; +const RESPONSE_FILEPATH: u32 = ResponseType::FilePath as u32; +const RESPONSE_STR: u32 = ResponseType::String as u32; + +const EMPTY_BODY: &[u8] = b""; + +pub trait HTTPResponseData {} + +pub struct HTTPResponse { + status: i32, + headers: HashMap, + response_data: R +} + +impl HTTPResponse { + pub fn response(&self) -> ResponseBuilder { + let mut builder = Response::builder().status(self.status as u16); + let headers = builder.headers_mut().unwrap(); + for (key, value) in self.headers.iter() { + headers.insert( + HeaderName::from_bytes(&key.clone().into_bytes()).unwrap(), + HeaderValue::from_str(&value.clone().as_str()).unwrap() + ); + }; + builder + } + + // pub fn apply(&self, builder: ResponseBuilder) -> ResponseBuilder { + // let mut mbuilder = builder.status(self.status as u16); + // let headers = mbuilder.headers_mut().unwrap(); + // for (key, value) in self.headers.iter() { + // headers.insert( + // HeaderName::from_bytes(&key.clone().into_bytes()).unwrap(), + // HeaderValue::from_str(&value.clone().as_str()).unwrap() + // ); + // }; + // mbuilder + // } +} + +pub struct HTTPEmptyResponse {} + +impl HTTPResponseData for HTTPEmptyResponse {} + +impl HTTPResponse { + pub fn new(status: i32, headers: HashMap) -> Self { + Self { + status: status, + headers: headers, + response_data: HTTPEmptyResponse{} + } + } + + pub fn get_body(&mut self) -> Body { + Body::from(EMPTY_BODY) + } +} + +// pub struct HTTPBodyResponse { +// body: Vec +// } + +// impl HTTPBodyResponse { +// fn new() -> Self { +// Self { body: EMPTY_BODY.to_owned() } +// } +// } + +// impl HTTPResponseData for HTTPBodyResponse {} + +// impl HTTPResponse { +// pub fn new(status: i32, headers: HashMap) -> Self { +// Self { +// status: status, +// headers: headers, +// response_data: HTTPBodyResponse::new() +// } +// } + +// pub fn get_body(&mut self) -> Body { +// // let stream = futures_util::stream::iter(self.response_data.body); +// // Body::wrap_stream(stream) +// // Body::from(std::mem::take(&mut self.response_data.body)) +// Body::from(self.response_data.body.to_owned()) +// } +// } + +pub(crate) struct HTTPFileResponse { + file_path: String +} + +impl HTTPFileResponse { + fn new(file_path: String) -> Self { + Self { file_path: file_path } + } +} + +impl HTTPResponseData for HTTPFileResponse {} + +impl HTTPResponse { + pub fn new(status: i32, headers: HashMap, file_path: String) -> Self { + Self { + status: status, + headers: headers, + response_data: HTTPFileResponse::new(file_path) + } + } + + pub async fn get_body(&self) -> Body { + // if let Ok(file) = File::open(&self.file_path.as_str()).await { + // let stream = FramedRead::new(file, BytesCodec::new()); + // return Ok(Body::wrap_stream(stream)); + // } + // Ok(Body::empty()) + let file = File::open(&self.response_data.file_path.as_str()).await.unwrap(); + let stream = FramedRead::new(file, BytesCodec::new()); + Body::wrap_stream(stream) + } +} + +pub(crate) async fn handle_request( + callback: CallbackWrapper, + client_addr: SocketAddr, + req: Request, +) -> Result, hyper::Error> { + let scope = Scope::new( + "http", + req.version(), + req.uri().clone(), + req.method().as_ref(), + client_addr, + req.headers() + ); + let receiver = Receiver::new(req); + let pyres = callback_caller(callback, receiver, scope).await.unwrap(); + + Ok(match pyres.mode { + RESPONSE_BYTES => { + HTTPResponse::::new( + pyres.status, + pyres.headers + ).response().body(pyres.bytes_data.unwrap().into()).unwrap() + }, + RESPONSE_STR => { + HTTPResponse::::new( + pyres.status, + pyres.headers + ).response().body(pyres.str_data.unwrap().into()).unwrap() + }, + RESPONSE_FILEPATH => { + let http_obj = HTTPResponse::::new( + pyres.status, + pyres.headers, + pyres.file_path.unwrap().to_owned() + ); + http_obj.response().body(http_obj.get_body().await).unwrap() + }, + _ => { + let mut http_obj = HTTPResponse::::new( + pyres.status, + pyres.headers + ); + http_obj.response().body(http_obj.get_body()).unwrap() + } + }) +} diff --git a/src/rsgi/io.rs b/src/rsgi/io.rs new file mode 100644 index 0000000..6b042f4 --- /dev/null +++ b/src/rsgi/io.rs @@ -0,0 +1,37 @@ +use bytes::Buf; +use hyper::{Body, Request}; +use pyo3::prelude::*; +use pyo3::types::{PyBytes}; +use std::sync::{Arc}; +use tokio::sync::{Mutex}; + +#[pyclass(module="granian.rsgi")] +pub(crate) struct Receiver { + request: Arc>> +} + +impl Receiver { + pub fn new(request: Request) -> Self { + Self { + request: Arc::new(Mutex::new(request)) + } + } +} + +#[pymethods] +impl Receiver { + fn __call__<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> { + let req_ref = self.request.clone(); + pyo3_asyncio::tokio::future_into_py(py, async move { + let mut req = req_ref.lock().await; + let mut body = hyper::body::to_bytes(&mut *req).await.unwrap(); + Ok(Python::with_gil(|py| { + // PyBytes::new(py, &body.to_vec()); + PyBytes::new_with(py, body.len(), |bytes: &mut [u8]| { + body.copy_to_slice(bytes); + Ok(()) + }).unwrap().as_ref().to_object(py) + })) + }) + } +} diff --git a/src/rsgi/mod.rs b/src/rsgi/mod.rs new file mode 100644 index 0000000..a447a96 --- /dev/null +++ b/src/rsgi/mod.rs @@ -0,0 +1,16 @@ +use pyo3::prelude::*; + +mod callbacks; +mod http; +mod io; +pub(crate) mod serve; +mod types; + +pub(crate) fn build_pymodule(py: Python) -> PyResult<&PyModule> { + let module = PyModule::new(py, "rsgi")?; + + module.add_class::()?; + module.add_class::()?; + + Ok(module) +} diff --git a/src/rsgi/serve.rs b/src/rsgi/serve.rs new file mode 100644 index 0000000..d6a4871 --- /dev/null +++ b/src/rsgi/serve.rs @@ -0,0 +1,76 @@ +use hyper::{ + Server, + server::conn::AddrStream, + service::{make_service_fn, service_fn} +}; +use pyo3::prelude::*; +use std::{convert::Infallible, process}; + +use super::super::callbacks::CallbackWrapper; +use super::super::workers::{WorkerConfig, worker_rt}; +use super::http::handle_request; + +#[pyclass(module="granian.workers")] +pub struct RSGIWorker { + config: WorkerConfig +} + +#[pymethods] +impl RSGIWorker { + #[new] + #[args(socket_fd, threads="1", http1_buffer_max="65535")] + fn new( + worker_id: i32, + socket_fd: i32, + threads: usize, + http1_buffer_max: usize + ) -> PyResult { + Ok(Self { + config: WorkerConfig::new( + worker_id, + socket_fd, + threads, + http1_buffer_max + ) + }) + } + + fn serve(&self, callback: PyObject, event_loop: &PyAny, context: &PyAny) { + let tcp_listener = worker_rt(&self.config); + let http1_buffer_max = self.config.http1_buffer_max; + let callback_wrapper = CallbackWrapper::new(callback, event_loop, context); + + let svc_loop = pyo3_asyncio::tokio::run_until_complete(event_loop, async move { + let service = make_service_fn(|socket: &AddrStream| { + let remote_addr = socket.remote_addr(); + let callback_wrapper = callback_wrapper.clone(); + + async move { + Ok::<_, Infallible>(service_fn(move |req| { + let callback_wrapper = callback_wrapper.clone(); + + async move { + Ok::<_, Infallible>(handle_request( + callback_wrapper, remote_addr, req + ).await.unwrap()) + } + })) + } + }); + + let server = Server::from_tcp(tcp_listener).unwrap() + .http1_max_buf_size(http1_buffer_max) + .serve(service); + server.await.unwrap(); + Ok(()) + }); + + match svc_loop { + Ok(_) => {} + Err(err) => { + println!("err: {}", err); + process::exit(1); + } + }; + } +} diff --git a/src/rsgi/types.rs b/src/rsgi/types.rs new file mode 100644 index 0000000..70da2ac --- /dev/null +++ b/src/rsgi/types.rs @@ -0,0 +1,128 @@ +use hyper::{Uri, Version, header::{HeaderMap}}; +use pyo3::prelude::*; +use pyo3::types::{PyString}; +use std::net::SocketAddr; + +#[pyclass(module="granian.rsgi")] +#[derive(Clone)] +pub(crate) struct Headers { + inner: HeaderMap +} + +impl Headers { + pub fn new(map: &HeaderMap) -> Self { + Self { inner: map.clone() } + } +} + +#[pymethods] +impl Headers { + fn keys(&self) -> Vec<&str> { + let mut ret = Vec::with_capacity(self.inner.keys_len()); + for key in self.inner.keys() { + ret.push(key.as_str()); + }; + ret + } + + fn values(&self) -> PyResult> { + let mut ret = Vec::with_capacity(self.inner.keys_len()); + for val in self.inner.values() { + ret.push(val.to_str().unwrap()); + }; + Ok(ret) + } + + fn items(&self) -> PyResult> { + let mut ret = Vec::with_capacity(self.inner.keys_len()); + for (key, val) in self.inner.iter() { + ret.push((key.as_str(), val.to_str().unwrap())); + }; + Ok(ret) + } + + fn __contains__(&self, key: &str) -> bool { + self.inner.contains_key(key) + } + + #[args(key, default="None")] + fn get(&self, py: Python, key: &str, default: Option) -> Option { + self.inner.get(key).and_then(|val| { + match val.to_str() { + Ok(string) => Some(PyString::new(py, string).into()), + _ => default + } + }) + } +} + +#[pyclass(module="granian.rsgi")] +pub(crate) struct Scope { + #[pyo3(get)] + proto: String, + http_version: Version, + #[pyo3(get)] + method: String, + uri: Uri, + #[pyo3(get)] + client: String, + #[pyo3(get)] + headers: Headers +} + +impl Scope { + pub fn new( + proto: &str, + http_version: Version, + uri: Uri, + method: &str, + client: SocketAddr, + headers: &HeaderMap + ) -> Self { + Self { + proto: proto.to_string(), + http_version: http_version, + method: method.to_string(), + uri: uri, + client: client.to_string(), + headers: Headers::new(headers) + } + } +} + +#[pymethods] +impl Scope { + #[getter(http_version)] + fn get_http_version(&self) -> &str { + match self.http_version { + Version::HTTP_10 => "1", + Version::HTTP_11 => "1.1", + Version::HTTP_2 => "2", + Version::HTTP_3 => "3", + _ => "1" + } + } + + #[getter(scheme)] + fn get_scheme(&self) -> &str { + self.uri.scheme_str().unwrap_or("http") + } + + #[getter(path)] + fn get_path(&self) -> &str { + self.uri.path() + } + + #[getter(query_string)] + fn get_query_string(&self) -> &str { + self.uri.query().unwrap_or("") + } +} + +pub(crate) enum ResponseType { + Bytes = 1, + String = 2, + FilePath = 10, + // Chunks = 20, + // AsyncIter = 30 +} diff --git a/src/tcp.rs b/src/tcp.rs new file mode 100644 index 0000000..8319543 --- /dev/null +++ b/src/tcp.rs @@ -0,0 +1,181 @@ +use pyo3::prelude::*; +use pyo3::types::PyType; + +use std::net::{IpAddr, SocketAddr, TcpListener}; +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, FromRawFd}; +#[cfg(windows)] +use std::os::windows::io::{AsRawSocket, FromRawSocket}; +use std::time::Duration; + +use socket2::{Domain, Protocol, Socket, TcpKeepalive, Type}; + + +#[pyclass(module="granian.tcp")] +pub struct SocketHolder { + socket: Socket +} + +#[pymethods] +impl SocketHolder { + #[cfg(unix)] + #[new] + pub fn new(fd: i32) -> PyResult { + println!("{}", fd); + let socket = unsafe { + Socket::from_raw_fd(fd) + }; + // println!("{}", socket.type().unwrap()); + Ok(Self { socket: socket }) + } + + #[cfg(windows)] + #[new] + pub fn new(fd: u64) -> PyResult { + let socket = unsafe { + Socket::from_raw_socket(fd) + }; + Ok(Self { socket: socket }) + } + + #[classmethod] + pub fn from_address(_cls: &PyType, address: &str, port: u16, backlog: i32) -> PyResult { + println!("{}", address); + let address: SocketAddr = (address.parse::()?, port).into(); + let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?; + socket.set_reuse_address(true)?; + socket.set_tcp_keepalive(&TcpKeepalive::new().with_time(Duration::from_secs(0)))?; + socket.set_nodelay(true)?; + socket.bind(&address.into())?; + socket.listen(backlog)?; + Ok(Self { socket: socket }) + } + + #[cfg(unix)] + pub fn __getstate__(&self, py: Python) -> PyObject { + // let fd = self.socket.try_clone().unwrap().as_raw_fd(); + let fd = self.socket.as_raw_fd(); + println!("{}", fd); + ( + fd.into_py(py), + ).to_object(py) + } + + #[cfg(windows)] + pub fn __getstate__(&self, py: Python) -> PyObject { + // let fd = self.socket.try_clone().unwrap().as_raw_socket(); + let fd = self.socket.as_raw_socket(); + ( + fd.into_py(py), + ).to_object(py) + } + + #[cfg(unix)] + pub fn get_fd(&self, py: Python) -> PyObject { + self.socket.as_raw_fd().into_py(py).to_object(py) + } + + #[cfg(windows)] + pub fn get_fd(&self, py: Python) -> PyObject { + self.socket.as_raw_socket().into_py(py).to_object(py) + } +} + +impl SocketHolder { + // pub fn try_clone(&self) -> PyResult { + // let copied = self.socket.try_clone()?; + // Ok(Self { socket: copied }) + // } + + pub fn get_socket(&self) -> Socket { + self.socket.try_clone().unwrap() + } + + pub fn get_listener(&self) -> TcpListener { + self.socket.try_clone().unwrap().into() + } +} + +#[pyclass(module="granian.tcp")] +pub struct ListenerHolder { + socket: TcpListener +} + +#[pymethods] +impl ListenerHolder { + #[cfg(unix)] + #[new] + pub fn new(fd: i32) -> PyResult { + println!("{}", fd); + let socket = unsafe { + TcpListener::from_raw_fd(fd) + }; + // println!("{}", socket.type().unwrap()); + println!("{}", socket.local_addr().unwrap()); + Ok(Self { socket: socket }) + } + + #[cfg(windows)] + #[new] + pub fn new(fd: u64) -> PyResult { + let socket = unsafe { + TcpListener::from_raw_socket(fd) + }; + Ok(Self { socket: socket }) + } + + #[classmethod] + pub fn from_address(_cls: &PyType, address: &str, port: u16, backlog: i32) -> PyResult { + println!("{}", address); + let address: SocketAddr = (address.parse::()?, port).into(); + let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?; + socket.set_reuse_address(true)?; + socket.bind(&address.into())?; + socket.listen(backlog)?; + let listener: TcpListener = socket.into(); + Ok(Self { socket: listener }) + } + + #[cfg(unix)] + pub fn __getstate__(&self, py: Python) -> PyObject { + let fd = self.socket.as_raw_fd(); + println!("{}", fd); + ( + fd.into_py(py), + ).to_object(py) + } + + #[cfg(windows)] + pub fn __getstate__(&self, py: Python) -> PyObject { + let fd = self.socket.as_raw_socket(); + ( + fd.into_py(py), + ).to_object(py) + } + + #[cfg(unix)] + pub fn get_fd(&self, py: Python) -> PyObject { + self.socket.as_raw_fd().into_py(py).to_object(py) + } + + #[cfg(windows)] + pub fn get_fd(&self, py: Python) -> PyObject { + self.socket.as_raw_socket().into_py(py).to_object(py) + } +} + +impl ListenerHolder { + pub fn get_clone(&self) -> TcpListener { + self.socket.try_clone().unwrap() + } +} + + +pub(crate) fn build_pymodule(py: Python) -> PyResult<&PyModule> { + let module = PyModule::new(py, "tcp")?; + + module.add_class::()?; + module.add_class::()?; + + Ok(module) +} diff --git a/src/workers.rs b/src/workers.rs new file mode 100644 index 0000000..40be543 --- /dev/null +++ b/src/workers.rs @@ -0,0 +1,56 @@ +use pyo3::prelude::*; +use std::net::TcpListener; +#[cfg(unix)] +use std::os::unix::io::FromRawFd; +#[cfg(windows)] +use std::os::windows::io::{FromRawSocket, RawSocket}; + +use super::asgi::serve::ASGIWorker; +use super::rsgi::serve::RSGIWorker; + +pub(crate) struct WorkerConfig { + pub id: i32, + socket_fd: i32, + pub threads: usize, + pub http1_buffer_max: usize +} + +impl WorkerConfig { + pub fn new( + id: i32, + socket_fd: i32, + threads: usize, + http1_buffer_max: usize + ) -> Self { + Self { + id, + socket_fd, + threads, + http1_buffer_max + } + } + + pub fn tcp_listener(&self) -> TcpListener { + unsafe { + TcpListener::from_raw_fd(self.socket_fd) + } + } +} + +pub(crate) fn worker_rt(config: &WorkerConfig) -> TcpListener { + let mut tokio_builder = tokio::runtime::Builder::new_multi_thread(); + tokio_builder.worker_threads(config.threads); + tokio_builder.enable_all(); + pyo3_asyncio::tokio::init(tokio_builder); + + config.tcp_listener() +} + +pub(crate) fn build_pymodule(py: Python) -> PyResult<&PyModule> { + let module = PyModule::new(py, "workers")?; + + module.add_class::()?; + module.add_class::()?; + + Ok(module) +}