import asyncio from queue import SimpleQueue from threading import Thread from typing import Any, Callable STOP_RUNNING_SENTINEL = object() class Worker(Thread): """ Dedicated worker thread executing database operations sequentially. The worker consumes (future, callable) items from the unbounded SimpleQueue. It executes the callable, then sets result or mapped exception on the future using loop.call_soon_threadsafe to synchronize with the event loop thread. If work item return STOP_RUNNING_SENTINEL value - it stops the execution (e.g. this can be used to stop worker when connection is about to close) """ def __init__( self, queue: SimpleQueue[tuple[asyncio.Future, Callable[[], Any]] | None], loop: asyncio.AbstractEventLoop, ) -> None: super().__init__(name="turso-async-worker", daemon=True) self._queue = queue self._loop = loop def run(self) -> None: while True: item = self._queue.get() fut, func = item if fut.cancelled(): # Still consume but skip execution if already cancelled continue try: result = func() if result is STOP_RUNNING_SENTINEL: break except Exception as e: self._loop.call_soon_threadsafe(fut.set_exception, e) else: self._loop.call_soon_threadsafe(fut.set_result, result)