rename, relocate, and simplify the task queue (#106)

This commit is contained in:
Josh Thomas 2025-04-21 21:59:00 -05:00 committed by GitHub
parent 3fb6fa995d
commit c2e27d43c3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 229 additions and 333 deletions

View file

@ -18,6 +18,10 @@ and this project attempts to adhere to [Semantic Versioning](https://semver.org/
## [Unreleased]
### Changed
- **Internal**: Moved task queueing functionality to `djls-server` crate, renamed from `Worker` to `Queue`, and simplified API.
## [5.2.0a0]
### Added
@ -79,4 +83,4 @@ and this project attempts to adhere to [Semantic Versioning](https://semver.org/
[5.1.0a1]: https://github.com/joshuadavidthomas/django-language-server/releases/tag/v5.1.0a1
[5.1.0a2]: https://github.com/joshuadavidthomas/django-language-server/releases/tag/v5.1.0a2
[5.2.0a0]: https://github.com/joshuadavidthomas/django-language-server/releases/tag/v5.2.0a0
[5.2.0a0]: https://github.com/joshuadavidthomas/django-language-server/releases/tag/v5.2.0a0

View file

@ -7,7 +7,6 @@ djls = { path = "crates/djls" }
djls-project = { path = "crates/djls-project" }
djls-server = { path = "crates/djls-server" }
djls-templates = { path = "crates/djls-templates" }
djls-worker = { path = "crates/djls-worker" }
anyhow = "1.0"
async-trait = "0.1"

View file

@ -172,7 +172,6 @@ The project is written in Rust using PyO3 for Python integration. Here is a high
- Django and Python project introspection ([`crates/djls-project/`](./crates/djls-project/))
- LSP server implementation ([`crates/djls-server/`](./crates/djls-server/))
- Template parsing ([`crates/djls-templates/`](./crates/djls-templates/))
- Tokio-based background task management ([`crates/djls-worker/`](./crates/djls-worker/))
Code contributions are welcome from developers of all backgrounds. Rust expertise is valuable for the LSP server and core components, but Python and Django developers should not be deterred by the Rust codebase - Django expertise is just as valuable. Understanding Django's internals and common development patterns helps inform what features would be most valuable.

View file

@ -6,7 +6,6 @@ edition = "2021"
[dependencies]
djls-project = { workspace = true }
djls-templates = { workspace = true }
djls-worker = { workspace = true }
anyhow = { workspace = true }
pyo3 = { workspace = true }

View file

@ -1,6 +1,6 @@
mod documents;
mod queue;
mod server;
mod tasks;
mod workspace;
use crate::server::DjangoLanguageServer;

View file

@ -0,0 +1,220 @@
use anyhow::Result;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
pub trait Task: Send + 'static {
type Output: Send + 'static;
fn run(&self) -> Result<Self::Output>;
}
trait TaskTrait: Send {
fn run_boxed(self: Box<Self>);
}
impl<T: Task> TaskTrait for T {
fn run_boxed(self: Box<Self>) {
match self.run() {
Ok(_) => { /* Task succeeded, do nothing */ }
Err(e) => {
// Log the error if the task failed.
// Consider adding a proper logging mechanism later.
eprintln!("Task failed: {}", e);
}
}
}
}
#[derive(Clone)]
pub struct Queue {
inner: Arc<QueueInner>,
}
struct QueueInner {
sender: mpsc::Sender<Box<dyn TaskTrait>>,
shutdown_sender: Option<oneshot::Sender<()>>,
}
impl Queue {
pub fn new() -> Self {
let (sender, mut receiver) = mpsc::channel::<Box<dyn TaskTrait>>(32); // Channel for tasks
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
tokio::spawn(async move {
loop {
tokio::select! {
Some(task) = receiver.recv() => {
task.run_boxed();
}
_ = &mut shutdown_rx => {
// Drain the channel before shutting down? Optional.
// For now, just break.
break;
}
else => break,
}
}
});
Self {
inner: Arc::new(QueueInner {
sender,
shutdown_sender: Some(shutdown_tx),
}),
}
}
/// Submits a task to the queue asynchronously, waiting if the channel is full.
/// The task is executed in the background, and its result is ignored.
pub async fn submit<T>(&self, task: T) -> Result<()>
where
T: Task + 'static,
{
self.inner
.sender
.send(Box::new(task))
.await
.map_err(|e| anyhow::anyhow!("Failed to submit task: {}", e))
}
}
impl Default for Queue {
fn default() -> Self {
Self::new()
}
}
impl Drop for QueueInner {
fn drop(&mut self) {
if let Some(sender) = self.shutdown_sender.take() {
sender.send(()).ok();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use anyhow::anyhow;
use std::time::Duration;
use tokio::time::sleep;
struct TestTask(i32);
impl Task for TestTask {
type Output = i32;
fn run(&self) -> Result<Self::Output> {
std::thread::sleep(Duration::from_millis(10));
Ok(self.0 * 2)
}
}
struct ErrorTask;
impl Task for ErrorTask {
type Output = ();
fn run(&self) -> Result<Self::Output> {
Err(anyhow!("Task failed intentionally"))
}
}
#[tokio::test]
async fn test_submit_and_process() {
let queue = Queue::new();
// Submit a few tasks
for i in 0..5 {
queue.submit(TestTask(i)).await.unwrap();
}
// Submit a task that will fail
queue.submit(ErrorTask).await.unwrap();
// Allow some time for tasks to be processed by the background worker.
// In a real scenario, you might not wait like this, but for testing,
// we need to ensure the background task has a chance to run.
sleep(Duration::from_millis(100)).await;
// We can't directly assert results here, but we can check the queue still works.
queue.submit(TestTask(10)).await.unwrap();
sleep(Duration::from_millis(50)).await; // Allow time for the last task
}
#[tokio::test]
async fn test_channel_backpressure_submit() {
let queue = Queue::new();
// Fill the channel (channel size is 32) using submit
let mut tasks = Vec::new();
for i in 0..32 {
let queue_clone = queue.clone();
// Spawn tasks to submit concurrently, as submit waits
tasks.push(tokio::spawn(async move {
queue_clone
.submit(TestTask(i))
.await
.expect("Submit should succeed");
}));
}
// Wait for all initial submissions to likely be sent (though not necessarily processed)
for task in tasks {
task.await.unwrap();
}
// Try submitting one more task. This should wait until a slot is free.
// We'll use a timeout to ensure it doesn't block forever if something is wrong.
let submit_task = queue.submit(TestTask(33));
match tokio::time::timeout(Duration::from_millis(200), submit_task).await {
Ok(Ok(_)) => { /* Successfully submitted after waiting */ }
Ok(Err(e)) => panic!("Submit failed unexpectedly: {}", e),
Err(_) => panic!("Submit timed out, likely blocked due to backpressure not resolving"),
}
// Allow time for processing
sleep(Duration::from_millis(100)).await;
}
#[tokio::test]
async fn test_shutdown() {
let queue = Queue::new();
queue.submit(TestTask(1)).await.unwrap();
queue.submit(TestTask(2)).await.unwrap();
// Queue is dropped here, triggering shutdown
drop(queue);
// Allow time for shutdown signal to be processed and potentially
// for the background task to finish ongoing work (though not guaranteed here).
sleep(Duration::from_millis(100)).await;
// No direct assertion, just checking it doesn't panic/hang.
}
#[tokio::test]
async fn test_queue_cloning() {
let queue1 = Queue::new();
let queue2 = queue1.clone();
// Submit tasks via both clones
let task1 = queue1.submit(TestTask(10));
let task2 = queue2.submit(TestTask(20));
// Wait for submissions to complete
tokio::try_join!(task1, task2).unwrap();
// Allow time for processing
sleep(Duration::from_millis(100)).await;
}
#[tokio::test]
async fn test_error_task_does_not_stop_queue() {
let queue = Queue::new();
queue.submit(TestTask(1)).await.unwrap();
queue.submit(ErrorTask).await.unwrap(); // Submit the failing task
queue.submit(TestTask(2)).await.unwrap();
// Allow time for tasks to process
sleep(Duration::from_millis(100)).await;
// Submit another task to ensure the queue is still running after the error
queue.submit(TestTask(3)).await.unwrap();
sleep(Duration::from_millis(50)).await;
// If we reach here without panic, the queue continued after the error.
// We expect an error message "Task failed: Task failed intentionally"
// to be printed to stderr during the test run.
}
}

View file

@ -1,8 +1,8 @@
use crate::documents::Store;
use crate::queue::Queue;
use crate::workspace::get_project_path;
use anyhow::Result;
use djls_project::DjangoProject;
use djls_worker::Worker;
use std::sync::Arc;
use tokio::sync::RwLock;
use tower_lsp_server::jsonrpc::Result as LspResult;
@ -16,7 +16,7 @@ pub struct DjangoLanguageServer {
client: Client,
project: Arc<RwLock<Option<DjangoProject>>>,
documents: Arc<RwLock<Store>>,
worker: Worker,
queue: Queue,
}
impl DjangoLanguageServer {
@ -25,7 +25,7 @@ impl DjangoLanguageServer {
client,
project: Arc::new(RwLock::new(None)),
documents: Arc::new(RwLock::new(Store::new())),
worker: Worker::new(),
queue: Queue::new(),
}
}

View file

@ -1,25 +0,0 @@
use anyhow::Result;
use djls_worker::Task;
use std::time::Duration;
pub struct DebugTask {
pub message: String,
pub delay: Duration,
}
impl DebugTask {
pub fn new(message: String, delay: Duration) -> Self {
Self { message, delay }
}
}
impl Task for DebugTask {
type Output = String;
fn run(&self) -> Result<Self::Output> {
std::thread::sleep(self.delay);
let result = format!("Debug task completed: {}", self.message);
Ok(result)
}
}

View file

@ -1,9 +0,0 @@
[package]
name = "djls-worker"
version = "0.0.0"
edition = "2021"
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
tokio = { workspace = true }

View file

@ -1,291 +0,0 @@
use anyhow::Result;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
pub trait Task: Send + 'static {
type Output: Send + 'static;
fn run(&self) -> Result<Self::Output>;
}
struct WorkerInner {
sender: mpsc::Sender<TaskMessage>,
shutdown_sender: Option<oneshot::Sender<()>>,
}
#[derive(Clone)]
pub struct Worker {
inner: Arc<WorkerInner>,
}
enum TaskMessage {
Execute(Box<dyn TaskTrait>),
WithResult(
Box<dyn TaskTrait>,
oneshot::Sender<Result<Box<dyn std::any::Any + Send>>>,
),
}
trait TaskTrait: Send {
fn run_boxed(self: Box<Self>) -> Result<Box<dyn std::any::Any + Send>>;
}
impl<T: Task> TaskTrait for T {
fn run_boxed(self: Box<Self>) -> Result<Box<dyn std::any::Any + Send>> {
self.run()
.map(|output| Box::new(output) as Box<dyn std::any::Any + Send>)
}
}
impl Worker {
pub fn new() -> Self {
let (sender, mut receiver) = mpsc::channel(32);
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
tokio::spawn(async move {
loop {
tokio::select! {
Some(msg) = receiver.recv() => {
match msg {
TaskMessage::Execute(task) => {
let _ = task.run_boxed();
}
TaskMessage::WithResult(task, sender) => {
let result = task.run_boxed();
let _ = sender.send(result);
}
}
}
_ = &mut shutdown_rx => break,
}
}
});
Self {
inner: Arc::new(WorkerInner {
sender,
shutdown_sender: Some(shutdown_tx),
}),
}
}
/// Attempts to execute a task immediately without waiting.
/// Returns an error if the worker's channel is full.
///
/// Best for non-critical tasks where backpressure is desired.
pub fn execute<T>(&self, task: T) -> Result<()>
where
T: Task + 'static,
{
self.inner
.sender
.try_send(TaskMessage::Execute(Box::new(task)))
.map_err(|e| anyhow::anyhow!("Failed to execute task: {}", e))
}
/// Submits a task asynchronously, waiting if the channel is full.
///
/// Good for tasks that must be processed but where you don't need
/// the result immediately.
pub async fn submit<T>(&self, task: T) -> Result<()>
where
T: Task + 'static,
{
self.inner
.sender
.send(TaskMessage::Execute(Box::new(task)))
.await
.map_err(|e| anyhow::anyhow!("Failed to submit task: {}", e))
}
/// Submits a task and waits for its result.
///
/// Best when you need the output of the task. This method will
/// wait both for space to submit the task and for its completion.
pub async fn wait_for<T>(&self, task: T) -> Result<T::Output>
where
T: Task + 'static,
{
let (tx, rx) = oneshot::channel();
self.inner
.sender
.send(TaskMessage::WithResult(Box::new(task), tx))
.await
.map_err(|e| anyhow::anyhow!("Failed to send task: {}", e))?;
let result = rx
.await
.map_err(|e| anyhow::anyhow!("Failed to receive result: {}", e))??;
result
.downcast()
.map(|b| *b)
.map_err(|_| anyhow::anyhow!("Failed to downcast result"))
}
}
impl Default for Worker {
fn default() -> Self {
Self::new()
}
}
impl Drop for WorkerInner {
fn drop(&mut self) {
if let Some(sender) = self.shutdown_sender.take() {
sender.send(()).ok();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use anyhow::anyhow;
use std::time::Duration;
use tokio::time::sleep;
struct TestTask(i32);
impl Task for TestTask {
type Output = i32;
fn run(&self) -> Result<Self::Output> {
Ok(self.0 * 2)
}
}
// Basic functionality tests
#[tokio::test]
async fn test_wait_for() {
let worker = Worker::new();
let result = worker.wait_for(TestTask(21)).await.unwrap();
assert_eq!(result, 42);
}
#[tokio::test]
async fn test_submit() {
let worker = Worker::new();
for i in 0..32 {
assert!(worker.execute(TestTask(i)).is_ok());
}
assert!(worker.execute(TestTask(33)).is_err());
assert!(worker.submit(TestTask(33)).await.is_ok());
sleep(Duration::from_millis(50)).await;
}
#[tokio::test]
async fn test_execute() {
let worker = Worker::new();
assert!(worker.execute(TestTask(21)).is_ok());
sleep(Duration::from_millis(50)).await;
}
// Test channel backpressure
#[tokio::test]
async fn test_channel_backpressure() {
let worker = Worker::new();
// Fill the channel (channel size is 32)
for i in 0..32 {
assert!(worker.execute(TestTask(i)).is_ok());
}
// Next execute should fail
assert!(worker.execute(TestTask(33)).is_err());
// But wait_for should eventually succeed
let result = worker.wait_for(TestTask(33)).await.unwrap();
assert_eq!(result, 66);
}
// Test concurrent tasks
#[tokio::test]
async fn test_concurrent_tasks() {
let worker = Worker::new();
let mut handles = Vec::new();
// Spawn multiple concurrent tasks
for i in 0..10 {
let worker = worker.clone();
let handle = tokio::spawn(async move {
let result = worker.wait_for(TestTask(i)).await.unwrap();
assert_eq!(result, i * 2);
});
handles.push(handle);
}
// Wait for all tasks to complete
for handle in handles {
handle.await.unwrap();
}
}
// Test shutdown behavior
#[tokio::test]
async fn test_shutdown() {
{
let worker = Worker::new();
worker.execute(TestTask(1)).unwrap();
worker.wait_for(TestTask(2)).await.unwrap();
// Worker will be dropped here, triggering shutdown
}
sleep(Duration::from_millis(50)).await;
}
// Test error handling
struct ErrorTask;
impl Task for ErrorTask {
type Output = (); // Unit type for error test
fn run(&self) -> Result<Self::Output> {
Err(anyhow!("Task failed"))
}
}
#[tokio::test]
async fn test_error_handling() {
let worker = Worker::new();
// Test error propagation
assert!(worker.wait_for(ErrorTask).await.is_err());
// Test that worker continues to function after error
let result = worker.wait_for(TestTask(21)).await.unwrap();
assert_eq!(result, 42);
}
#[tokio::test]
async fn test_worker_cloning() {
let worker = Worker::new();
let worker2 = worker.clone();
let (result1, result2) = tokio::join!(
worker.wait_for(TestTask(21)),
worker2.wait_for(TestTask(42))
);
assert_eq!(result1.unwrap(), 42);
assert_eq!(result2.unwrap(), 84);
}
#[tokio::test]
async fn test_multiple_workers() {
let worker = Worker::new();
let mut handles = Vec::new();
for i in 0..10 {
let worker = worker.clone();
let handle = tokio::spawn(async move {
let result = worker.wait_for(TestTask(i)).await.unwrap();
assert_eq!(result, i * 2);
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}
}