diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..7a7b197 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "pyo3_disable_reference_pool"] diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index cf5fe7c..869c39c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -3,8 +3,8 @@ name: build on: workflow_dispatch env: - MATURIN_VERSION: 1.5.1 - PY_ALL: 3.8 3.9 3.10 3.11 3.12 pypy3.8 pypy3.9 pypy3.10 + MATURIN_VERSION: 1.6.0 + PY_ALL: 3.8 3.9 3.10 3.11 3.12 3.13 pypy3.8 pypy3.9 pypy3.10 jobs: wheels: @@ -80,16 +80,10 @@ jobs: matrix: os: [ubuntu-latest, macos-latest, macos-14, windows-latest] manylinux: [auto] - # interpreter: ["3.8", "3.9", "3.10", "3.11", "3.12", "pypy3.8", "pypy3.9", "pypy3.10"] - interpreter: ["3.8", "3.9", "3.10", "3.11", "3.12"] + interpreter: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"] include: - os: ubuntu-latest platform: linux - exclude: - - os: macos-14 - interpreter: '3.8' - - os: macos-14 - interpreter: '3.9' runs-on: ${{ matrix.os }} steps: @@ -97,6 +91,7 @@ jobs: - uses: actions/setup-python@v5 with: python-version: ${{ matrix.interpreter }} + allow-prereleases: true - uses: dtolnay/rust-toolchain@stable with: components: llvm-tools diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 5d3fdad..d216c48 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -7,7 +7,7 @@ on: - master env: - MATURIN_VERSION: 1.5.1 + MATURIN_VERSION: 1.6.0 PYTHON_VERSION: 3.12 jobs: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 73c6fc9..026e0b2 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -8,8 +8,8 @@ on: # types: [published] env: - MATURIN_VERSION: 1.5.1 - PY_ALL: 3.8 3.9 3.10 3.11 3.12 pypy3.8 pypy3.9 pypy3.10 + MATURIN_VERSION: 1.6.0 + PY_ALL: 3.8 3.9 3.10 3.11 3.12 3.13 pypy3.8 pypy3.9 pypy3.10 jobs: sdist: @@ -107,8 +107,7 @@ jobs: matrix: os: [ubuntu-latest, macos-13, macos-14, windows-latest] manylinux: [auto] - # interpreter: ["3.8", "3.9", "3.10", "3.11", "3.12", "pypy3.8", "pypy3.9", "pypy3.10"] - interpreter: ["3.8", "3.9", "3.10", "3.11", "3.12"] + interpreter: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"] include: - os: ubuntu-latest platform: linux diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ce7028b..16395f6 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -10,7 +10,7 @@ on: - master env: - MATURIN_VERSION: 1.5.1 + MATURIN_VERSION: 1.6.0 jobs: linux: @@ -18,7 +18,13 @@ jobs: strategy: fail-fast: false matrix: - python-version: [3.8, 3.9, '3.10', '3.11', '3.12'] + python-version: + - '3.8' + - '3.9' + - '3.10' + - '3.11' + - '3.12' + - '3.13' steps: - uses: actions/checkout@v4 @@ -26,6 +32,7 @@ jobs: uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + allow-prereleases: true - name: Install run: | python -m venv .venv @@ -42,7 +49,13 @@ jobs: strategy: fail-fast: false matrix: - python-version: [3.8, 3.9, '3.10', '3.11', '3.12'] + python-version: + - '3.8' + - '3.9' + - '3.10' + - '3.11' + - '3.12' + - '3.13' steps: - uses: actions/checkout@v4 @@ -50,6 +63,7 @@ jobs: uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + allow-prereleases: true - name: Install run: | python -m venv .venv @@ -66,7 +80,13 @@ jobs: strategy: fail-fast: false matrix: - python-version: [3.8, 3.9, '3.10', '3.11', '3.12'] + python-version: + - '3.8' + - '3.9' + - '3.10' + - '3.11' + - '3.12' + - '3.13' steps: - uses: actions/checkout@v4 @@ -74,6 +94,7 @@ jobs: uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + allow-prereleases: true - name: Install run: | python -m venv venv diff --git a/Cargo.lock b/Cargo.lock index b9d105d..dcde3f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -64,9 +64,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "bitflags" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" +checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" [[package]] name = "block-buffer" @@ -91,9 +91,9 @@ checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" [[package]] name = "cc" -version = "1.0.99" +version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96c51067fd44124faa7f870b4b1c969379ad32b2ba805aa959430ceaa384f695" +checksum = "c891175c3fb232128f48de6590095e59198bbeb8620c310be349bfc3afd12c7b" [[package]] name = "cfg-if" @@ -348,9 +348,9 @@ checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" [[package]] name = "heck" -version = "0.4.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermit-abi" @@ -488,9 +488,9 @@ checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" [[package]] name = "libmimalloc-sys" -version = "0.1.38" +version = "0.1.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7bb23d733dfcc8af652a78b7bf232f0e967710d044732185e561e47c0336b6" +checksum = "23aa6811d3bd4deb8a84dde645f943476d13b248d818edcf8ce0b2f37f036b44" dependencies = [ "cc", "libc", @@ -529,9 +529,9 @@ dependencies = [ [[package]] name = "mimalloc" -version = "0.1.42" +version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9186d86b79b52f4a77af65604b51225e8db1d6ee7e3f41aec1e40829c71a176" +checksum = "68914350ae34959d83f732418d51e2427a794055d0b9529f48259ac07af65633" dependencies = [ "libmimalloc-sys", ] @@ -656,25 +656,25 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.85" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" dependencies = [ "unicode-ident", ] [[package]] name = "pyo3" -version = "0.21.2" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5e00b96a521718e08e03b1a622f01c8a8deb50719335de3f60b3b3950f069d8" +checksum = "1962a33ed2a201c637fc14a4e0fd4e06e6edfdeee6a5fede0dab55507ad74cf7" dependencies = [ "anyhow", "cfg-if", "indoc", "libc", "memoffset", - "parking_lot", + "once_cell", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -684,9 +684,9 @@ dependencies = [ [[package]] name = "pyo3-build-config" -version = "0.21.2" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7883df5835fafdad87c0d888b266c8ec0f4c9ca48a5bed6bbb592e8dedee1b50" +checksum = "ab7164b2202753bd33afc7f90a10355a719aa973d1f94502c50d06f3488bc420" dependencies = [ "once_cell", "python3-dll-a", @@ -695,9 +695,9 @@ dependencies = [ [[package]] name = "pyo3-ffi" -version = "0.21.2" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01be5843dc60b916ab4dad1dca6d20b9b4e6ddc8e15f50c47fe6d85f1fb97403" +checksum = "c6424906ca49013c0829c5c1ed405e20e2da2dc78b82d198564880a704e6a7b7" dependencies = [ "libc", "pyo3-build-config", @@ -706,8 +706,7 @@ dependencies = [ [[package]] name = "pyo3-log" version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2af49834b8d2ecd555177e63b273b708dea75150abc6f5341d0a6e1a9623976c" +source = "git+https://github.com/a1phyr/pyo3-log?rev=09e4ed97a8#09e4ed97a8e9ed4d745f4ca97e39515bee6bd943" dependencies = [ "arc-swap", "log", @@ -716,9 +715,9 @@ dependencies = [ [[package]] name = "pyo3-macros" -version = "0.21.2" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77b34069fc0682e11b31dbd10321cbf94808394c56fd996796ce45217dfac53c" +checksum = "82b2f19e153122d64afd8ce7aaa72f06a00f52e34e1d1e74b6d71baea396460a" dependencies = [ "proc-macro2", "pyo3-macros-backend", @@ -728,9 +727,9 @@ dependencies = [ [[package]] name = "pyo3-macros-backend" -version = "0.21.2" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08260721f32db5e1a5beae69a55553f56b99bd0e1c3e6e0a5e8851a9d0f5a85c" +checksum = "dd698c04cac17cf0fe63d47790ab311b8b25542f5cb976b65c374035c50f1eef" dependencies = [ "heck", "proc-macro2", @@ -741,9 +740,9 @@ dependencies = [ [[package]] name = "python3-dll-a" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5f07cd4412be8fa09a721d40007c483981bbe072cd6a21f2e83e04ec8f8343f" +checksum = "bd0b78171a90d808b319acfad166c4790d9e9759bbc14ac8273fe133673dd41b" dependencies = [ "cc", ] @@ -917,15 +916,15 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "subtle" -version = "2.5.0" +version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.66" +version = "2.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c42f3f41a2de00b01c0aaad383c5a45241efc8b2d1eda5661812fda5f3cdcff5" +checksum = "901fa70d88b9d6c98022e23b4136f9f3e54e4662c3bc1bd1d84a42a9a0f0c1e9" dependencies = [ "proc-macro2", "quote", @@ -980,9 +979,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.6.0" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +checksum = "c55115c6fbe2d2bef26eb09ad74bde02d8255476fc0c7b515ef09fbb35742d82" dependencies = [ "tinyvec_macros", ] diff --git a/Cargo.toml b/Cargo.toml index 19022e9..2bc21b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,8 +41,8 @@ itertools = "0.13" log = "0.4" percent-encoding = "=2.3" pin-project = "1.1" -pyo3 = { version = "=0.21", features = ["anyhow", "extension-module", "generate-import-lib"] } -pyo3-log = "=0.10" +pyo3 = { version = "=0.22", features = ["anyhow", "extension-module", "generate-import-lib"] } +pyo3-log = { version = "=0.10", git = "https://github.com/a1phyr/pyo3-log", rev = "09e4ed97a8" } rustls-pemfile = "2.1" socket2 = { version = "0.5", features = ["all"] } tls-listener = { version = "=0.9", features = ["rustls"] } diff --git a/pyproject.toml b/pyproject.toml index 5c24c8f..c6c92d5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ classifiers = [ 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', 'Programming Language :: Python :: 3.12', + 'Programming Language :: Python :: 3.13', 'Programming Language :: Python :: Implementation :: CPython', 'Programming Language :: Python :: Implementation :: PyPy', 'Programming Language :: Python', @@ -34,7 +35,7 @@ dynamic = [ requires-python = '>=3.8' dependencies = [ 'click>=8.0.0', - 'uvloop>=0.18.0; sys_platform != "win32" and platform_python_implementation == "CPython"', + 'uvloop>=0.18.0; sys_platform != "win32" and platform_python_implementation == "CPython" and python_version != "3.13"', ] [project.optional-dependencies] diff --git a/src/asgi/callbacks.rs b/src/asgi/callbacks.rs index 311956b..125db3d 100644 --- a/src/asgi/callbacks.rs +++ b/src/asgi/callbacks.rs @@ -8,12 +8,13 @@ use super::{ utils::{build_scope_http, build_scope_ws, scope_native_parts}, }; use crate::{ + asyncio::PyContext, callbacks::{ callback_impl_loop_err, callback_impl_loop_pytask, callback_impl_loop_run, callback_impl_loop_step, callback_impl_loop_wake, callback_impl_run, callback_impl_run_pytask, CallbackWrapper, }, http::{response_500, HTTPResponse}, - runtime::{RuntimeRef, TaskLocals}, + runtime::RuntimeRef, utils::log_application_callable_exception, ws::{HyperWebsocket, UpgradeData}, }; @@ -21,7 +22,7 @@ use crate::{ #[pyclass(frozen)] pub(crate) struct CallbackRunnerHTTP { proto: Py, - context: TaskLocals, + context: PyContext, cb: PyObject, } @@ -69,13 +70,13 @@ macro_rules! callback_impl_done_err { #[pyclass(frozen)] pub(crate) struct CallbackTaskHTTP { proto: Py, - context: TaskLocals, + context: PyContext, pycontext: PyObject, cb: PyObject, } impl CallbackTaskHTTP { - pub fn new(py: Python, cb: PyObject, proto: Py, context: TaskLocals) -> PyResult { + pub fn new(py: Python, cb: PyObject, proto: Py, context: PyContext) -> PyResult { let pyctx = context.context(py); Ok(Self { proto, @@ -112,7 +113,7 @@ impl CallbackTaskHTTP { pub(crate) struct CallbackWrappedRunnerHTTP { #[pyo3(get)] proto: Py, - context: TaskLocals, + context: PyContext, cb: PyObject, #[pyo3(get)] scope: PyObject, @@ -123,7 +124,7 @@ impl CallbackWrappedRunnerHTTP { Self { proto: Py::new(py, proto).unwrap(), context: cb.context, - cb: cb.callback, + cb: cb.callback.clone_ref(py), scope: scope.into_py(py), } } @@ -149,7 +150,7 @@ impl CallbackWrappedRunnerHTTP { #[pyclass(frozen)] pub(crate) struct CallbackRunnerWebsocket { proto: Py, - context: TaskLocals, + context: PyContext, cb: PyObject, } @@ -157,7 +158,7 @@ impl CallbackRunnerWebsocket { pub fn new(py: Python, cb: CallbackWrapper, proto: WebsocketProtocol, scope: Bound) -> Self { let pyproto = Py::new(py, proto).unwrap(); Self { - proto: pyproto.clone(), + proto: pyproto.clone_ref(py), context: cb.context, cb: cb.callback.call1(py, (scope, pyproto)).unwrap(), } @@ -169,7 +170,13 @@ impl CallbackRunnerWebsocket { #[pymethods] impl CallbackRunnerWebsocket { fn _loop_task<'p>(&self, py: Python<'p>) -> PyResult> { - CallbackTaskWebsocket::new(py, self.cb.clone(), self.proto.clone(), self.context.clone())?.run(py) + CallbackTaskWebsocket::new( + py, + self.cb.clone_ref(py), + self.proto.clone_ref(py), + self.context.clone(), + )? + .run(py) } } @@ -184,13 +191,13 @@ macro_rules! callback_impl_done_ws { #[pyclass(frozen)] pub(crate) struct CallbackTaskWebsocket { proto: Py, - context: TaskLocals, + context: PyContext, pycontext: PyObject, cb: PyObject, } impl CallbackTaskWebsocket { - pub fn new(py: Python, cb: PyObject, proto: Py, context: TaskLocals) -> PyResult { + pub fn new(py: Python, cb: PyObject, proto: Py, context: PyContext) -> PyResult { let pyctx = context.context(py); Ok(Self { proto, @@ -227,7 +234,7 @@ impl CallbackTaskWebsocket { pub(crate) struct CallbackWrappedRunnerWebsocket { #[pyo3(get)] proto: Py, - context: TaskLocals, + context: PyContext, cb: PyObject, #[pyo3(get)] scope: PyObject, @@ -238,7 +245,7 @@ impl CallbackWrappedRunnerWebsocket { Self { proto: Py::new(py, proto).unwrap(), context: cb.context, - cb: cb.callback, + cb: cb.callback.clone_ref(py), scope: scope.into_py(py), } } diff --git a/src/asyncio.rs b/src/asyncio.rs index dfb7b99..7767ed9 100644 --- a/src/asyncio.rs +++ b/src/asyncio.rs @@ -1,25 +1,38 @@ use pyo3::{prelude::*, sync::GILOnceCell}; -use std::convert::Into; +use std::{convert::Into, sync::Arc}; -static ASYNCIO: GILOnceCell = GILOnceCell::new(); -static ASYNCIO_LOOP: GILOnceCell = GILOnceCell::new(); static CONTEXTVARS: GILOnceCell = GILOnceCell::new(); +static CONTEXT: GILOnceCell = GILOnceCell::new(); -fn asyncio(py: Python) -> PyResult<&Bound> { - ASYNCIO - .get_or_try_init(py, || Ok(py.import_bound("asyncio")?.into())) - .map(|asyncio| asyncio.bind(py)) +#[derive(Clone, Debug)] +pub struct PyContext { + event_loop: Arc, + context: Arc, } -pub(crate) fn get_running_loop(py: Python) -> PyResult> { - ASYNCIO_LOOP - .get_or_try_init(py, || -> PyResult { - let asyncio = asyncio(py)?; +impl PyContext { + pub fn new(event_loop: Bound) -> Self { + let pynone = event_loop.py().None(); + Self { + event_loop: Arc::new(event_loop.unbind()), + context: Arc::new(pynone), + } + } - Ok(asyncio.getattr("get_running_loop")?.into()) - })? - .bind(py) - .call0() + pub fn with_context(self, context: Bound) -> Self { + Self { + context: Arc::new(context.unbind()), + ..self + } + } + + pub fn event_loop<'p>(&self, py: Python<'p>) -> Bound<'p, PyAny> { + self.event_loop.clone_ref(py).into_bound(py) + } + + pub fn context<'p>(&self, py: Python<'p>) -> Bound<'p, PyAny> { + self.context.clone_ref(py).into_bound(py) + } } fn contextvars(py: Python) -> PyResult<&Bound> { @@ -28,6 +41,18 @@ fn contextvars(py: Python) -> PyResult<&Bound> { .bind(py)) } +pub(crate) fn empty_context(py: Python) -> PyResult<&Bound> { + Ok(CONTEXT + .get_or_try_init(py, || { + contextvars(py)? + .getattr("Context")? + .call0() + .map(std::convert::Into::into) + })? + .bind(py)) +} + +#[allow(dead_code)] pub(crate) fn copy_context(py: Python) -> PyResult> { contextvars(py)?.call_method0("copy_context") } diff --git a/src/callbacks.rs b/src/callbacks.rs index 4ef257d..f045c71 100644 --- a/src/callbacks.rs +++ b/src/callbacks.rs @@ -1,24 +1,21 @@ -use pyo3::{exceptions::PyStopIteration, prelude::*, sync::GILOnceCell}; +use pyo3::{exceptions::PyStopIteration, prelude::*}; use std::sync::{atomic, Arc, RwLock}; use tokio::sync::Notify; -use super::runtime::TaskLocals; - -static CONTEXTVARS: GILOnceCell = GILOnceCell::new(); -static CONTEXT: GILOnceCell = GILOnceCell::new(); +use super::asyncio::PyContext; #[derive(Clone)] pub(crate) struct CallbackWrapper { - pub callback: PyObject, - pub context: TaskLocals, + pub callback: Arc, + pub context: PyContext, } impl CallbackWrapper { pub(crate) fn new(callback: PyObject, event_loop: Bound, context: Bound) -> Self { Self { - callback, - context: TaskLocals::new(event_loop).with_context(context), + callback: Arc::new(callback), + context: PyContext::new(event_loop).with_context(context), } } } @@ -288,33 +285,13 @@ impl PyFutureResultSetter { } } -fn contextvars(py: Python) -> PyResult<&Bound> { - Ok(CONTEXTVARS - .get_or_try_init(py, || py.import_bound("contextvars").map(std::convert::Into::into))? - .bind(py)) -} - -pub fn empty_pycontext(py: Python) -> PyResult<&Bound> { - Ok(CONTEXT - .get_or_try_init(py, || { - contextvars(py)? - .getattr("Context")? - .call0() - .map(std::convert::Into::into) - })? - .bind(py)) -} - macro_rules! callback_impl_run { () => { pub fn run(self, py: Python<'_>) -> PyResult> { let event_loop = self.context.event_loop(py); let target = self.into_py(py).getattr(py, pyo3::intern!(py, "_loop_task"))?; let kwctx = pyo3::types::PyDict::new_bound(py); - kwctx.set_item( - pyo3::intern!(py, "context"), - crate::callbacks::empty_pycontext(py)?, - )?; + kwctx.set_item(pyo3::intern!(py, "context"), crate::asyncio::empty_context(py)?)?; event_loop.call_method(pyo3::intern!(py, "call_soon_threadsafe"), (target,), Some(&kwctx)) } }; diff --git a/src/rsgi/callbacks.rs b/src/rsgi/callbacks.rs index b49f089..4dc3928 100644 --- a/src/rsgi/callbacks.rs +++ b/src/rsgi/callbacks.rs @@ -6,11 +6,12 @@ use super::{ types::{PyResponse, PyResponseBody, RSGIHTTPScope as HTTPScope, RSGIWebsocketScope as WebsocketScope}, }; use crate::{ + asyncio::PyContext, callbacks::{ callback_impl_loop_err, callback_impl_loop_pytask, callback_impl_loop_run, callback_impl_loop_step, callback_impl_loop_wake, callback_impl_run, callback_impl_run_pytask, CallbackWrapper, }, - runtime::{RuntimeRef, TaskLocals}, + runtime::RuntimeRef, utils::log_application_callable_exception, ws::{HyperWebsocket, UpgradeData}, }; @@ -18,7 +19,7 @@ use crate::{ #[pyclass(frozen)] pub(crate) struct CallbackRunnerHTTP { proto: Py, - context: TaskLocals, + context: PyContext, cb: PyObject, } @@ -66,13 +67,13 @@ macro_rules! callback_impl_done_err { #[pyclass(frozen)] pub(crate) struct CallbackTaskHTTP { proto: Py, - context: TaskLocals, + context: PyContext, pycontext: PyObject, cb: PyObject, } impl CallbackTaskHTTP { - pub fn new(py: Python, cb: PyObject, proto: Py, context: TaskLocals) -> PyResult { + pub fn new(py: Python, cb: PyObject, proto: Py, context: PyContext) -> PyResult { let pyctx = context.context(py); Ok(Self { proto, @@ -109,7 +110,7 @@ impl CallbackTaskHTTP { pub(crate) struct CallbackWrappedRunnerHTTP { #[pyo3(get)] proto: Py, - context: TaskLocals, + context: PyContext, cb: PyObject, #[pyo3(get)] scope: PyObject, @@ -120,7 +121,7 @@ impl CallbackWrappedRunnerHTTP { Self { proto: Py::new(py, proto).unwrap(), context: cb.context, - cb: cb.callback, + cb: cb.callback.clone_ref(py), scope: scope.into_py(py), } } @@ -146,7 +147,7 @@ impl CallbackWrappedRunnerHTTP { #[pyclass(frozen)] pub(crate) struct CallbackRunnerWebsocket { proto: Py, - context: TaskLocals, + context: PyContext, cb: PyObject, } @@ -154,7 +155,7 @@ impl CallbackRunnerWebsocket { pub fn new(py: Python, cb: CallbackWrapper, proto: WebsocketProtocol, scope: WebsocketScope) -> Self { let pyproto = Py::new(py, proto).unwrap(); Self { - proto: pyproto.clone(), + proto: pyproto.clone_ref(py), context: cb.context, cb: cb.callback.call1(py, (scope, pyproto)).unwrap(), } @@ -166,7 +167,13 @@ impl CallbackRunnerWebsocket { #[pymethods] impl CallbackRunnerWebsocket { fn _loop_task<'p>(&self, py: Python<'p>) -> PyResult> { - CallbackTaskWebsocket::new(py, self.cb.clone(), self.proto.clone(), self.context.clone())?.run(py) + CallbackTaskWebsocket::new( + py, + self.cb.clone_ref(py), + self.proto.clone_ref(py), + self.context.clone(), + )? + .run(py) } } @@ -179,13 +186,13 @@ macro_rules! callback_impl_done_ws { #[pyclass(frozen)] pub(crate) struct CallbackTaskWebsocket { proto: Py, - context: TaskLocals, + context: PyContext, pycontext: PyObject, cb: PyObject, } impl CallbackTaskWebsocket { - pub fn new(py: Python, cb: PyObject, proto: Py, context: TaskLocals) -> PyResult { + pub fn new(py: Python, cb: PyObject, proto: Py, context: PyContext) -> PyResult { let pyctx = context.context(py); Ok(Self { proto, @@ -222,7 +229,7 @@ impl CallbackTaskWebsocket { pub(crate) struct CallbackWrappedRunnerWebsocket { #[pyo3(get)] proto: Py, - context: TaskLocals, + context: PyContext, cb: PyObject, #[pyo3(get)] scope: PyObject, @@ -233,7 +240,7 @@ impl CallbackWrappedRunnerWebsocket { Self { proto: Py::new(py, proto).unwrap(), context: cb.context, - cb: cb.callback, + cb: cb.callback.clone_ref(py), scope: scope.into_py(py), } } diff --git a/src/rsgi/io.rs b/src/rsgi/io.rs index 3271198..e2a602a 100644 --- a/src/rsgi/io.rs +++ b/src/rsgi/io.rs @@ -370,7 +370,7 @@ impl RSGIWebsocketProtocol { let mut trx = itransport.lock().unwrap(); Ok(Python::with_gil(|py| { let pytransport = Py::new(py, RSGIWebsocketTransport::new(rth, stream)).unwrap(); - *trx = Some(pytransport.clone()); + *trx = Some(pytransport.clone_ref(py)); pytransport })) } diff --git a/src/runtime.rs b/src/runtime.rs index 59bcf48..8758685 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,8 +1,6 @@ use pyo3::prelude::*; use std::{ - cell::OnceCell, future::Future, - pin::Pin, sync::{Arc, Mutex}, }; use tokio::{ @@ -10,7 +8,6 @@ use tokio::{ task::{JoinHandle, LocalSet}, }; -use super::asyncio::{copy_context, get_running_loop}; use super::blocking::BlockingRunner; use super::callbacks::PyEmptyAwaitable; #[cfg(unix)] @@ -20,49 +17,6 @@ use super::callbacks::PyIterAwaitable; #[cfg(windows)] use super::callbacks::{PyFutureDoneCallback, PyFutureResultSetter}; -tokio::task_local! { - static TASK_LOCALS: OnceCell; -} - -#[derive(Debug, Clone)] -pub struct TaskLocals { - event_loop: PyObject, - context: PyObject, -} - -impl TaskLocals { - pub fn new(event_loop: Bound) -> Self { - let pynone = event_loop.py().None(); - Self { - event_loop: event_loop.into(), - context: pynone, - } - } - - pub fn with_running_loop(py: Python) -> PyResult { - Ok(Self::new(get_running_loop(py)?)) - } - - pub fn with_context(self, context: Bound) -> Self { - Self { - context: context.into(), - ..self - } - } - - pub fn copy_context(self, py: Python) -> PyResult { - Ok(self.with_context(copy_context(py)?)) - } - - pub fn event_loop<'p>(&self, py: Python<'p>) -> Bound<'p, PyAny> { - self.event_loop.clone().into_bound(py) - } - - pub fn context<'p>(&self, py: Python<'p>) -> Bound<'p, PyAny> { - self.context.clone().into_bound(py) - } -} - pub trait JoinError { #[allow(dead_code)] fn is_panic(&self) -> bool; @@ -76,55 +30,38 @@ pub trait Runtime: Send + 'static { where F: Future + Send + 'static; - fn handler(&self) -> RuntimeRef; - fn blocking(&self) -> BlockingRunner; } pub trait ContextExt: Runtime { - fn scope(&self, locals: TaskLocals, fut: F) -> Pin + Send>> - where - F: Future + Send + 'static; - - fn get_task_locals() -> Option; -} - -pub trait SpawnLocalExt: Runtime { - #[allow(dead_code)] - fn spawn_local(&self, fut: F) -> Self::JoinHandle - where - F: Future + 'static; -} - -pub trait LocalContextExt: Runtime { - #[allow(dead_code)] - fn scope_local(&self, locals: TaskLocals, fut: F) -> Pin>> - where - F: Future + 'static; + fn py_event_loop(&self, py: Python) -> PyObject; } pub(crate) struct RuntimeWrapper { rt: tokio::runtime::Runtime, br: BlockingRunner, + pr: Arc, } impl RuntimeWrapper { - pub fn new(blocking_threads: usize) -> Self { + pub fn new(blocking_threads: usize, py_loop: Arc) -> Self { Self { rt: default_runtime(blocking_threads), br: BlockingRunner::new(), + pr: py_loop, } } - pub fn with_runtime(rt: tokio::runtime::Runtime) -> Self { + pub fn with_runtime(rt: tokio::runtime::Runtime, py_loop: Arc) -> Self { Self { rt, br: BlockingRunner::new(), + pr: py_loop, } } pub fn handler(&self) -> RuntimeRef { - RuntimeRef::new(self.rt.handle().clone(), self.br.clone()) + RuntimeRef::new(self.rt.handle().clone(), self.br.clone(), self.pr.clone()) } } @@ -132,11 +69,16 @@ impl RuntimeWrapper { pub struct RuntimeRef { pub inner: tokio::runtime::Handle, pub innerb: BlockingRunner, + innerp: Arc, } impl RuntimeRef { - pub fn new(rt: tokio::runtime::Handle, br: BlockingRunner) -> Self { - Self { inner: rt, innerb: br } + pub fn new(rt: tokio::runtime::Handle, br: BlockingRunner, pyloop: Arc) -> Self { + Self { + inner: rt, + innerb: br, + innerp: pyloop, + } } } @@ -157,52 +99,14 @@ impl Runtime for RuntimeRef { self.inner.spawn(fut) } - fn handler(&self) -> RuntimeRef { - RuntimeRef::new(self.inner.clone(), self.innerb.clone()) - } - fn blocking(&self) -> BlockingRunner { self.innerb.clone() } } impl ContextExt for RuntimeRef { - fn scope(&self, locals: TaskLocals, fut: F) -> Pin + Send>> - where - F: Future + Send + 'static, - { - let cell = OnceCell::new(); - cell.set(locals).unwrap(); - - Box::pin(TASK_LOCALS.scope(cell, fut)) - } - - fn get_task_locals() -> Option { - match TASK_LOCALS.try_with(|c| c.get().cloned()) { - Ok(locals) => locals, - Err(_) => None, - } - } -} - -impl SpawnLocalExt for RuntimeRef { - fn spawn_local(&self, fut: F) -> Self::JoinHandle - where - F: Future + 'static, - { - tokio::task::spawn_local(fut) - } -} - -impl LocalContextExt for RuntimeRef { - fn scope_local(&self, locals: TaskLocals, fut: F) -> Pin>> - where - F: Future + 'static, - { - let cell = OnceCell::new(); - cell.set(locals).unwrap(); - - Box::pin(TASK_LOCALS.scope(cell, fut)) + fn py_event_loop(&self, py: Python) -> PyObject { + self.innerp.clone_ref(py) } } @@ -214,7 +118,7 @@ fn default_runtime(blocking_threads: usize) -> tokio::runtime::Runtime { .unwrap() } -pub(crate) fn init_runtime_mt(threads: usize, blocking_threads: usize) -> RuntimeWrapper { +pub(crate) fn init_runtime_mt(threads: usize, blocking_threads: usize, py_loop: Arc) -> RuntimeWrapper { RuntimeWrapper::with_runtime( RuntimeBuilder::new_multi_thread() .worker_threads(threads) @@ -222,27 +126,12 @@ pub(crate) fn init_runtime_mt(threads: usize, blocking_threads: usize) -> Runtim .enable_all() .build() .unwrap(), + py_loop, ) } -pub(crate) fn init_runtime_st(blocking_threads: usize) -> RuntimeWrapper { - RuntimeWrapper::new(blocking_threads) -} - -// pub(crate) fn into_future(awaitable: &PyAny) -> PyResult> + Send> { -// pyo3_asyncio::into_future_with_locals(&get_current_locals::(awaitable.py())?, awaitable) -// } - -#[inline] -fn get_current_locals(py: Python) -> PyResult -where - R: ContextExt, -{ - if let Some(locals) = R::get_task_locals() { - Ok(locals) - } else { - Ok(TaskLocals::with_running_loop(py)?.copy_context(py)?) - } +pub(crate) fn init_runtime_st(blocking_threads: usize, py_loop: Arc) -> RuntimeWrapper { + RuntimeWrapper::new(blocking_threads, py_loop) } // NOTE: @@ -266,6 +155,7 @@ where let result = fut.await; let _ = rb.run(move || { aw.get().set_result(result); + Python::with_gil(|_| drop(aw)); }); }); @@ -300,8 +190,7 @@ where F: Future> + Send + 'static, T: IntoPy + Send + 'static, { - let task_locals = get_current_locals::(py)?; - let event_loop = task_locals.event_loop(py).to_object(py); + let event_loop = rt.py_event_loop(py); let (aw, cancel_tx) = PyFutureAwaitable::new(event_loop).to_spawn(py)?; let aw_ref = aw.clone_ref(py); let py_fut = aw.clone_ref(py); @@ -312,6 +201,7 @@ where result = fut => { let _ = rb.run(move || { aw.get().set_result(result, aw_ref); + Python::with_gil(|_| drop(aw)); }); }, () = cancel_tx.notified() => {} @@ -329,20 +219,20 @@ where F: Future> + Send + 'static, T: IntoPy + Send + 'static, { - let task_locals = get_current_locals::(py)?; - let event_loop = task_locals.event_loop(py); - let event_loop_ref = event_loop.to_object(py); + let event_loop = rt.py_event_loop(py); + let event_loop_ref = event_loop.clone_ref(py); let cancel_tx = Arc::new(tokio::sync::Notify::new()); let rb = rt.blocking(); - let py_fut = event_loop.call_method0(pyo3::intern!(py, "create_future"))?; + let py_fut = event_loop.call_method0(py, pyo3::intern!(py, "create_future"))?; py_fut.call_method1( + py, pyo3::intern!(py, "add_done_callback"), (PyFutureDoneCallback { cancel_tx: cancel_tx.clone(), },), )?; - let fut_ref = PyObject::from(py_fut.clone()); + let fut_ref = py_fut.clone_ref(py); rt.spawn(async move { tokio::select! { @@ -354,6 +244,8 @@ where Err(err) => (fut_ref.getattr(py, pyo3::intern!(py, "set_exception")).unwrap(), err.into_py(py)) }; let _ = event_loop_ref.call_method1(py, pyo3::intern!(py, "call_soon_threadsafe"), (PyFutureResultSetter, cb, value)); + drop(fut_ref); + drop(event_loop_ref); }); }); }, @@ -361,7 +253,7 @@ where } }); - Ok(py_fut) + Ok(py_fut.into_bound(py)) } #[allow(clippy::unnecessary_wraps)] @@ -381,22 +273,23 @@ where let result_tx = Arc::new(Mutex::new(None)); let result_rx = Arc::clone(&result_tx); - let task_locals = TaskLocals::new(event_loop.clone()).copy_context(py)?; let py_fut = event_loop.call_method0("create_future")?; let loop_tx = event_loop.clone().into_py(py); let future_tx = py_fut.clone().into_py(py); - let rth = rt.handler(); - rt.spawn(async move { - let val = rth.scope(task_locals.clone(), fut).await; + let val = fut.await; if let Ok(mut result) = result_tx.lock() { *result = Some(val.unwrap()); } + // NOTE: we don't care if we block the runtime. + // `run_until_complete` is used only for the workers main loop. Python::with_gil(move |py| { let res_method = future_tx.getattr(py, "set_result").unwrap(); let _ = loop_tx.call_method_bound(py, "call_soon_threadsafe", (res_method, py.None()), None); + drop(future_tx); + drop(loop_tx); }); }); diff --git a/src/workers.rs b/src/workers.rs index 09d97d5..9c30980 100644 --- a/src/workers.rs +++ b/src/workers.rs @@ -336,7 +336,11 @@ macro_rules! serve_rth { signal: Py, ) { pyo3_log::init(); - let rt = crate::runtime::init_runtime_mt(self.config.threads, self.config.blocking_threads); + let rt = crate::runtime::init_runtime_mt( + self.config.threads, + self.config.blocking_threads, + std::sync::Arc::new(event_loop.clone().unbind()), + ); let rth = rt.handler(); let tcp_listener = self.config.tcp_listener(); @@ -443,6 +447,8 @@ macro_rules! serve_rth { _ => unreachable!(), } + Python::with_gil(|_| drop(callback_wrapper)); + log::info!("Stopping worker-{}", worker_id); Ok(()) }); @@ -468,7 +474,11 @@ macro_rules! serve_rth_ssl { signal: Py, ) { pyo3_log::init(); - let rt = crate::runtime::init_runtime_mt(self.config.threads, self.config.blocking_threads); + let rt = crate::runtime::init_runtime_mt( + self.config.threads, + self.config.blocking_threads, + std::sync::Arc::new(event_loop.clone().unbind()), + ); let rth = rt.handler(); let tcp_listener = self.config.tcp_listener(); @@ -581,6 +591,8 @@ macro_rules! serve_rth_ssl { _ => unreachable!(), } + Python::with_gil(|_| drop(callback_wrapper)); + log::info!("Stopping worker-{}", worker_id); Ok(()) }); @@ -606,12 +618,13 @@ macro_rules! serve_wth { signal: Py, ) { pyo3_log::init(); - let rtm = crate::runtime::init_runtime_mt(1, 1); + let rtm = crate::runtime::init_runtime_mt(1, 1, std::sync::Arc::new(event_loop.clone().unbind())); let worker_id = self.config.id; log::info!("Started worker-{}", worker_id); let callback_wrapper = crate::callbacks::CallbackWrapper::new(callback, event_loop.clone(), context); + let py_loop = std::sync::Arc::new(event_loop.clone().unbind()); let mut pyrx = signal.get().rx.lock().unwrap().take().unwrap(); let (stx, srx) = tokio::sync::watch::channel(false); let mut workers = vec![]; @@ -627,10 +640,11 @@ macro_rules! serve_wth { let blocking_threads = self.config.blocking_threads.clone(); let backpressure = self.config.backpressure.clone(); let callback_wrapper = callback_wrapper.clone(); + let py_loop = py_loop.clone(); let mut srx = srx.clone(); workers.push(std::thread::spawn(move || { - let rt = crate::runtime::init_runtime_st(blocking_threads); + let rt = crate::runtime::init_runtime_st(blocking_threads, py_loop); let rth = rt.handler(); let local = tokio::task::LocalSet::new(); @@ -764,12 +778,13 @@ macro_rules! serve_wth_ssl { signal: Py, ) { pyo3_log::init(); - let rtm = crate::runtime::init_runtime_mt(1, 1); + let rtm = crate::runtime::init_runtime_mt(1, 1, std::sync::Arc::new(event_loop.clone().unbind())); let worker_id = self.config.id; log::info!("Started worker-{}", worker_id); let callback_wrapper = crate::callbacks::CallbackWrapper::new(callback, event_loop.clone(), context); + let py_loop = std::sync::Arc::new(event_loop.clone().unbind()); let mut pyrx = signal.get().rx.lock().unwrap().take().unwrap(); let (stx, srx) = tokio::sync::watch::channel(false); let mut workers = vec![]; @@ -786,10 +801,11 @@ macro_rules! serve_wth_ssl { let blocking_threads = self.config.blocking_threads.clone(); let backpressure = self.config.backpressure.clone(); let callback_wrapper = callback_wrapper.clone(); + let py_loop = py_loop.clone(); let mut srx = srx.clone(); workers.push(std::thread::spawn(move || { - let rt = crate::runtime::init_runtime_st(blocking_threads); + let rt = crate::runtime::init_runtime_st(blocking_threads, py_loop); let rth = rt.handler(); let local = tokio::task::LocalSet::new(); diff --git a/src/wsgi/callbacks.rs b/src/wsgi/callbacks.rs index 7b7c3ad..79a8529 100644 --- a/src/wsgi/callbacks.rs +++ b/src/wsgi/callbacks.rs @@ -1,5 +1,4 @@ -use http_body_util::combinators::BoxBody; -use http_body_util::BodyExt; +use http_body_util::{combinators::BoxBody, BodyExt}; use hyper::{ body::{self, Bytes}, header, @@ -12,8 +11,7 @@ use pyo3::{ prelude::*, types::{IntoPyDict, PyBytes, PyDict}, }; -use std::borrow::Cow; -use std::net::SocketAddr; +use std::{borrow::Cow, net::SocketAddr, sync::Arc}; use tokio::task::JoinHandle; use super::types::{WSGIBody, WSGIResponseBodyIter}; @@ -27,7 +25,7 @@ const WSGI_ITER_RESPONSE_BODY: i32 = 1; #[inline] fn run_callback( rt: RuntimeRef, - callback: PyObject, + callback: Arc, mut parts: request::Parts, server_addr: SocketAddr, client_addr: SocketAddr, @@ -68,6 +66,7 @@ fn run_callback( } Python::with_gil(|py| { + let callback = callback.clone_ref(py); let environ = PyDict::new_bound(py); environ.set_item(pyo3::intern!(py, "SERVER_PROTOCOL"), version)?; environ.set_item(pyo3::intern!(py, "SERVER_NAME"), server.0)?; diff --git a/src/wsgi/http.rs b/src/wsgi/http.rs index 4845726..95a3996 100644 --- a/src/wsgi/http.rs +++ b/src/wsgi/http.rs @@ -42,8 +42,9 @@ pub(crate) async fn handle( Ok((status, headers, body)) => { return build_response(status, headers, body); } - Err(ref err) => { - log_application_callable_exception(err); + Err(err) => { + log_application_callable_exception(&err); + pyo3::Python::with_gil(|_| drop(err)); } } } else { diff --git a/src/wsgi/types.rs b/src/wsgi/types.rs index a796f62..a5877b1 100644 --- a/src/wsgi/types.rs +++ b/src/wsgi/types.rs @@ -167,21 +167,25 @@ impl WSGIBody { } pub(crate) struct WSGIResponseBodyIter { - inner: PyObject, + inner: Option, closed: bool, } impl WSGIResponseBodyIter { pub fn new(body: PyObject) -> Self { Self { - inner: body, + inner: Some(body), closed: false, } } #[inline] fn close_inner(&mut self, py: Python) { - let _ = self.inner.call_method0(py, pyo3::intern!(py, "close")); + let _ = self + .inner + .as_ref() + .unwrap() + .call_method0(py, pyo3::intern!(py, "close")); self.closed = true; } } @@ -191,6 +195,8 @@ impl Drop for WSGIResponseBodyIter { if !self.closed { Python::with_gil(|py| self.close_inner(py)); } + let inner = self.inner.take().unwrap(); + Python::with_gil(|_| drop(inner)); } } @@ -198,23 +204,30 @@ impl Iterator for WSGIResponseBodyIter { type Item = Result, anyhow::Error>; fn next(&mut self) -> Option { - Python::with_gil(|py| match self.inner.call_method0(py, pyo3::intern!(py, "__next__")) { - Ok(chunk_obj) => match chunk_obj.extract::>(py) { - Ok(chunk) => { - let chunk: Box<[u8]> = chunk.into(); - Some(Ok(body::Frame::data(Bytes::from(chunk)))) - } - _ => { + Python::with_gil(|py| { + match self + .inner + .as_ref() + .unwrap() + .call_method0(py, pyo3::intern!(py, "__next__")) + { + Ok(chunk_obj) => match chunk_obj.extract::>(py) { + Ok(chunk) => { + let chunk: Box<[u8]> = chunk.into(); + Some(Ok(body::Frame::data(Bytes::from(chunk)))) + } + _ => { + self.close_inner(py); + None + } + }, + Err(err) => { + if !err.is_instance_of::(py) { + log_application_callable_exception(&err); + } self.close_inner(py); None } - }, - Err(err) => { - if !err.is_instance_of::(py) { - log_application_callable_exception(&err); - } - self.close_inner(py); - None } }) }