mirror of
https://github.com/tursodatabase/limbo.git
synced 2025-12-23 08:21:09 +00:00
43 lines
1.5 KiB
Python
43 lines
1.5 KiB
Python
import asyncio
|
|
from queue import SimpleQueue
|
|
from threading import Thread
|
|
from typing import Any, Callable
|
|
|
|
STOP_RUNNING_SENTINEL = object()
|
|
|
|
class Worker(Thread):
|
|
"""
|
|
Dedicated worker thread executing database operations sequentially.
|
|
|
|
The worker consumes (future, callable) items from the unbounded SimpleQueue.
|
|
It executes the callable, then sets result or mapped exception on the future
|
|
using loop.call_soon_threadsafe to synchronize with the event loop thread.
|
|
|
|
If work item return STOP_RUNNING_SENTINEL value - it stops the execution
|
|
(e.g. this can be used to stop worker when connection is about to close)
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
queue: SimpleQueue[tuple[asyncio.Future, Callable[[], Any]] | None],
|
|
loop: asyncio.AbstractEventLoop,
|
|
) -> None:
|
|
super().__init__(name="turso-async-worker", daemon=True)
|
|
self._queue = queue
|
|
self._loop = loop
|
|
|
|
def run(self) -> None:
|
|
while True:
|
|
item = self._queue.get()
|
|
fut, func = item
|
|
if fut.cancelled():
|
|
# Still consume but skip execution if already cancelled
|
|
continue
|
|
try:
|
|
result = func()
|
|
if result is STOP_RUNNING_SENTINEL:
|
|
break
|
|
except Exception as e:
|
|
self._loop.call_soon_threadsafe(fut.set_exception, e)
|
|
else:
|
|
self._loop.call_soon_threadsafe(fut.set_result, result)
|