refactor queue to use closures and move project init to queue (#107)

This commit is contained in:
Josh Thomas 2025-04-21 23:04:24 -05:00 committed by GitHub
parent c2e27d43c3
commit 980983e4f3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 365 additions and 129 deletions

View file

@ -1,58 +1,111 @@
use anyhow::Result;
use anyhow::{anyhow, Result};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
pub trait Task: Send + 'static {
type Output: Send + 'static;
fn run(&self) -> Result<Self::Output>;
}
/// Type alias for a type-erased, pinned, heap-allocated, Send-able future
/// that resolves to `Result<()>`.
///
/// This allows storing different concrete `Future` types (resulting from
/// various `async` blocks) in a uniform way, suitable for sending over a channel
/// or storing in collections, as long as they meet the `Send` bound and
/// produce the expected `Result<()>`.
///
/// - `Pin`: Ensures the `Future`'s memory location is stable, which is often
/// required for self-referential `async` blocks.
/// - `Box`: Allocates the `Future` on the heap.
/// - `dyn Future`: Type erasure - hides the specific concrete `Future` type.
/// - `+ Send`: Ensures the `Future` can be safely sent across threads.
type TaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
trait TaskTrait: Send {
fn run_boxed(self: Box<Self>);
}
impl<T: Task> TaskTrait for T {
fn run_boxed(self: Box<Self>) {
match self.run() {
Ok(_) => { /* Task succeeded, do nothing */ }
Err(e) => {
// Log the error if the task failed.
// Consider adding a proper logging mechanism later.
eprintln!("Task failed: {}", e);
}
}
}
}
/// Type alias for a type-erased, heap-allocated, Send-able closure that,
/// when called, returns a `TaskFuture`.
///
/// This represents a unit of work that can be sent to the queue's worker task.
/// The closure captures any necessary data and contains the logic to start
/// the asynchronous operation, returning it as a `TaskFuture`.
///
/// - `Box`: Allocates the closure on the heap.
/// - `dyn FnOnce()`: Type erasure - hides the specific closure type. It takes no
/// arguments.
/// - `-> TaskFuture`: Specifies that calling the closure produces the type-erased future.
/// - `+ Send + 'static`: Ensures the closure itself can be safely sent across
/// threads and has a static lifetime (doesn't borrow short-lived data).
type TaskClosure = Box<dyn FnOnce() -> TaskFuture + Send + 'static>;
/// A simple asynchronous task queue for sequential execution.
///
/// Tasks are submitted as closures that return futures. These closures are sent
/// to a dedicated worker task which executes them one at a time in the order
/// they were received. This ensures sequential processing of background tasks.
///
/// The queue is cloneable (`Arc`-based internally), allowing multiple producers
/// to submit tasks concurrently.
///
/// Shutdown is handled gracefully when the last `Queue` instance is dropped.
#[derive(Clone)]
pub struct Queue {
inner: Arc<QueueInner>,
}
/// Internal state of the queue, managed by an Arc for shared ownership.
struct QueueInner {
sender: mpsc::Sender<Box<dyn TaskTrait>>,
/// The sender half of the MPSC channel used to send tasks (as closures)
/// to the worker task.
sender: mpsc::Sender<TaskClosure>,
/// The sender half of a oneshot channel used to signal the worker task
/// to shut down when the `QueueInner` is dropped.
shutdown_sender: Option<oneshot::Sender<()>>,
}
impl Queue {
/// Creates a new `Queue` and spawns its background worker task.
///
/// The worker task runs indefinitely, waiting for tasks on the MPSC channel
/// or a shutdown signal. Received tasks (closures) are executed sequentially.
/// If a task's future resolves to an `Err`, the error is printed to stderr.
pub fn new() -> Self {
let (sender, mut receiver) = mpsc::channel::<Box<dyn TaskTrait>>(32); // Channel for tasks
// Create the channel for sending task closures. Bounded to 32 pending tasks.
let (sender, mut receiver) = mpsc::channel::<TaskClosure>(32);
// Create the channel for signaling shutdown.
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
// Spawn the worker task.
tokio::spawn(async move {
loop {
tokio::select! {
Some(task) = receiver.recv() => {
task.run_boxed();
}
// Bias selection towards shutdown signal if available
// (though default select! behavior is unspecified randomization)
// biased; // Uncomment if strict shutdown priority is needed
// Wait for the shutdown signal.
_ = &mut shutdown_rx => {
// Drain the channel before shutting down? Optional.
// For now, just break.
// Shutdown signal received, break the loop.
// Consider draining the receiver here if pending tasks
// should be completed before shutdown.
break;
}
else => break,
// Wait for a task closure from the channel.
maybe_task_closure = receiver.recv() => {
if let Some(task_closure) = maybe_task_closure {
// Received a task closure. Execute it to get the future.
let task_future: TaskFuture = task_closure();
// Await the future's completion.
if let Err(e) = task_future.await {
// Log the error if the task failed.
// TODO: Integrate with a proper logging framework.
eprintln!("Task failed: {}", e);
}
} else {
// Channel closed, implies all senders (Queue instances)
// are dropped. Break the loop.
break;
}
}
}
}
eprintln!("Queue worker task shutting down.");
});
Self {
@ -63,17 +116,59 @@ impl Queue {
}
}
/// Submits a task to the queue asynchronously, waiting if the channel is full.
/// The task is executed in the background, and its result is ignored.
pub async fn submit<T>(&self, task: T) -> Result<()>
/// Submits an asynchronous task to the queue.
///
/// The task is provided as a closure (`task_fn`) that returns a `Future`.
/// This function wraps the provided closure and future for type erasure
/// and sends it to the background worker task for sequential execution.
///
/// The `await` on this method only waits for the task to be *sent* to the
/// queue's channel, not for the task to be *executed*. If the queue's
/// channel is full, this method will wait until space becomes available.
///
/// # Usage
///
/// The `task_fn` must be a closure that takes no arguments and returns
/// a `Future` which resolves to `Result<()>`. Typically, this is written
/// using the `|| async move { ... }` syntax:
///
/// ```rust,ignore
/// let data_to_capture = 42;
/// queue.submit(move || async move {
/// // ... perform async work using data_to_capture ...
/// println!("Processing data: {}", data_to_capture);
/// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// Ok(()) // Indicate success
/// }).await?;
/// ```
///
/// # Type Parameters
///
/// - `F`: The type of the closure (`FnOnce() -> Fut`). Must be `Send + 'static`.
/// - `Fut`: The type of the `Future` returned by the closure. Must resolve
/// to `Result<()>` and be `Send + 'static`.
pub async fn submit<F, Fut>(&self, task_fn: F) -> Result<()>
where
T: Task + 'static,
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<()>> + Send + 'static,
{
// Create the inner closure that matches `TaskClosure`'s signature.
// This closure, when called by the worker:
// 1. Calls the user-provided `task_fn()`.
// 2. Gets the user's concrete `Future` (`Fut`).
// 3. Pins it and Boxes it, implicitly converting it to `TaskFuture`.
let boxed_task_closure: TaskClosure = Box::new(move || Box::pin(task_fn()));
// Send the boxed, type-erased closure to the worker task.
// This will wait if the channel buffer is full.
self.inner
.sender
.send(Box::new(task))
.send(boxed_task_closure)
.await
.map_err(|e| anyhow::anyhow!("Failed to submit task: {}", e))
.map_err(|e| {
// Error likely means the receiver (worker task) has panicked or shut down.
anyhow!("Failed to submit task: queue receiver closed ({})", e)
})
}
}
@ -83,10 +178,16 @@ impl Default for Queue {
}
}
/// Handles cleanup when the last `Queue` reference is dropped.
impl Drop for QueueInner {
fn drop(&mut self) {
// Take the shutdown sender (if it hasn't already been taken or failed).
if let Some(sender) = self.shutdown_sender.take() {
// Send the shutdown signal.
// `.ok()` ignores the result, as the receiver might have already
// terminated if the channel closed naturally or panicked.
sender.send(()).ok();
eprintln!("Sent shutdown signal to queue worker.");
}
}
}
@ -95,126 +196,197 @@ impl Drop for QueueInner {
mod tests {
use super::*;
use anyhow::anyhow;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio::time::sleep;
struct TestTask(i32);
impl Task for TestTask {
type Output = i32;
fn run(&self) -> Result<Self::Output> {
std::thread::sleep(Duration::from_millis(10));
Ok(self.0 * 2)
}
}
struct ErrorTask;
impl Task for ErrorTask {
type Output = ();
fn run(&self) -> Result<Self::Output> {
Err(anyhow!("Task failed intentionally"))
}
}
#[tokio::test]
async fn test_submit_and_process() {
let queue = Queue::new();
let counter = Arc::new(AtomicUsize::new(0));
// Submit a few tasks
for i in 0..5 {
queue.submit(TestTask(i)).await.unwrap();
let counter_clone = Arc::clone(&counter);
queue
.submit(move || async move {
sleep(Duration::from_millis(5)).await;
println!("Processing task {}", i);
counter_clone.fetch_add(1, Ordering::SeqCst);
Ok(())
})
.await
.unwrap();
}
// Submit a task that will fail
queue.submit(ErrorTask).await.unwrap();
queue
.submit(|| async {
println!("Submitting failing task");
Err(anyhow!("Task failed intentionally"))
})
.await
.unwrap();
// Allow some time for tasks to be processed by the background worker.
// In a real scenario, you might not wait like this, but for testing,
// we need to ensure the background task has a chance to run.
sleep(Duration::from_millis(100)).await;
assert_eq!(counter.load(Ordering::SeqCst), 5);
// We can't directly assert results here, but we can check the queue still works.
queue.submit(TestTask(10)).await.unwrap();
sleep(Duration::from_millis(50)).await; // Allow time for the last task
// Submit another task
let counter_clone = Arc::clone(&counter);
queue
.submit(|| async move {
println!("Processing task after error");
counter_clone.fetch_add(1, Ordering::SeqCst);
Ok(())
})
.await
.unwrap();
sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 6);
}
#[tokio::test]
async fn test_channel_backpressure_submit() {
let queue = Queue::new();
let counter = Arc::new(AtomicUsize::new(0));
// Fill the channel (channel size is 32) using submit
let mut tasks = Vec::new();
for i in 0..32 {
let queue_clone = queue.clone();
// Spawn tasks to submit concurrently, as submit waits
let counter_clone = Arc::clone(&counter);
tasks.push(tokio::spawn(async move {
queue_clone
.submit(TestTask(i))
.submit(|| async move {
counter_clone.fetch_add(1, Ordering::Relaxed);
sleep(Duration::from_millis(2)).await;
Ok(())
})
.await
.expect("Submit should succeed");
println!("Submitted task {}", i);
}));
}
// Wait for all initial submissions to likely be sent (though not necessarily processed)
for task in tasks {
task.await.unwrap();
}
println!("Finished submitting initial 32 tasks");
// Try submitting one more task. This should wait until a slot is free.
// We'll use a timeout to ensure it doesn't block forever if something is wrong.
let submit_task = queue.submit(TestTask(33));
match tokio::time::timeout(Duration::from_millis(200), submit_task).await {
Ok(Ok(_)) => { /* Successfully submitted after waiting */ }
let counter_clone = Arc::clone(&counter);
let submit_task = queue.submit(|| async move {
println!("Processing the 33rd task");
counter_clone.fetch_add(1, Ordering::Relaxed);
Ok(())
});
match tokio::time::timeout(Duration::from_millis(500), submit_task).await {
Ok(Ok(_)) => {
println!("Successfully submitted 33rd task");
}
Ok(Err(e)) => panic!("Submit failed unexpectedly: {}", e),
Err(_) => panic!("Submit timed out, likely blocked due to backpressure not resolving"),
}
// Allow time for processing
sleep(Duration::from_millis(100)).await;
sleep(Duration::from_millis(200)).await;
assert_eq!(counter.load(Ordering::Relaxed), 33);
}
#[tokio::test]
async fn test_shutdown() {
let queue = Queue::new();
queue.submit(TestTask(1)).await.unwrap();
queue.submit(TestTask(2)).await.unwrap();
// Queue is dropped here, triggering shutdown
drop(queue);
let counter = Arc::new(AtomicUsize::new(0));
// Allow time for shutdown signal to be processed and potentially
// for the background task to finish ongoing work (though not guaranteed here).
sleep(Duration::from_millis(100)).await;
// No direct assertion, just checking it doesn't panic/hang.
let counter_clone1 = Arc::clone(&counter);
queue
.submit(|| async move {
sleep(Duration::from_millis(50)).await;
counter_clone1.fetch_add(1, Ordering::SeqCst);
Ok(())
})
.await
.unwrap();
let counter_clone2 = Arc::clone(&counter);
queue
.submit(|| async move {
sleep(Duration::from_millis(50)).await;
counter_clone2.fetch_add(1, Ordering::SeqCst);
Ok(())
})
.await
.unwrap();
drop(queue);
sleep(Duration::from_millis(200)).await;
let final_count = counter.load(Ordering::SeqCst);
println!("Final count after shutdown: {}", final_count);
assert!(final_count <= 2);
}
#[tokio::test]
async fn test_queue_cloning() {
let queue1 = Queue::new();
let queue2 = queue1.clone();
let counter = Arc::new(AtomicUsize::new(0));
// Submit tasks via both clones
let task1 = queue1.submit(TestTask(10));
let task2 = queue2.submit(TestTask(20));
let counter_clone1 = Arc::clone(&counter);
let task1 = queue1.submit(|| async move {
counter_clone1.fetch_add(1, Ordering::SeqCst);
Ok(())
});
let counter_clone2 = Arc::clone(&counter);
let task2 = queue2.submit(|| async move {
counter_clone2.fetch_add(1, Ordering::SeqCst);
Ok(())
});
// Wait for submissions to complete
tokio::try_join!(task1, task2).unwrap();
// Allow time for processing
sleep(Duration::from_millis(100)).await;
assert_eq!(counter.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_error_task_does_not_stop_queue() {
let queue = Queue::new();
let counter = Arc::new(AtomicUsize::new(0));
queue.submit(TestTask(1)).await.unwrap();
queue.submit(ErrorTask).await.unwrap(); // Submit the failing task
queue.submit(TestTask(2)).await.unwrap();
let counter_clone1 = Arc::clone(&counter);
queue
.submit(|| async move {
counter_clone1.fetch_add(1, Ordering::SeqCst);
Ok(())
})
.await
.unwrap();
queue
.submit(|| async { Err(anyhow!("Intentional failure")) })
.await
.unwrap();
let counter_clone2 = Arc::clone(&counter);
queue
.submit(|| async move {
counter_clone2.fetch_add(1, Ordering::SeqCst);
Ok(())
})
.await
.unwrap();
// Allow time for tasks to process
sleep(Duration::from_millis(100)).await;
// Submit another task to ensure the queue is still running after the error
queue.submit(TestTask(3)).await.unwrap();
let counter_clone3 = Arc::clone(&counter);
queue
.submit(|| async move {
counter_clone3.fetch_add(1, Ordering::SeqCst);
Ok(())
})
.await
.unwrap();
sleep(Duration::from_millis(50)).await;
// If we reach here without panic, the queue continued after the error.
// We expect an error message "Task failed: Task failed intentionally"
// to be printed to stderr during the test run.
assert_eq!(counter.load(Ordering::SeqCst), 3);
}
}

View file

@ -1,7 +1,6 @@
use crate::documents::Store;
use crate::queue::Queue;
use crate::workspace::get_project_path;
use anyhow::Result;
use djls_project::DjangoProject;
use std::sync::Arc;
use tokio::sync::RwLock;
@ -29,38 +28,41 @@ impl DjangoLanguageServer {
}
}
async fn log_message(&self, type_: MessageType, message: &str) -> Result<()> {
async fn log_message(&self, type_: MessageType, message: &str) {
self.client.log_message(type_, message).await;
Ok(())
}
}
impl LanguageServer for DjangoLanguageServer {
async fn initialize(&self, params: InitializeParams) -> LspResult<InitializeResult> {
self.log_message(MessageType::INFO, "Initializing server...")
.await;
let project_path = get_project_path(&params);
if let Some(path) = project_path {
let mut project = DjangoProject::new(path);
match project.initialize() {
Ok(()) => {
self.log_message(
MessageType::INFO,
&format!("Using project path: {}", project.path().display()),
)
.await
.ok();
*self.project.write().await = Some(project);
}
Err(e) => {
self.log_message(
MessageType::ERROR,
&format!("Failed to initialize Django project: {}", e),
)
.await
.ok();
}
{
// Scope for write lock
let mut project_guard = self.project.write().await;
if let Some(ref path) = project_path {
self.log_message(
MessageType::INFO,
&format!(
"Project root identified: {}. Creating project instance.",
path.display()
),
)
.await;
*project_guard = Some(DjangoProject::new(path.clone()));
} else {
self.log_message(
MessageType::WARNING,
"Could not determine project root. Project features will be unavailable.",
)
.await;
// Ensure it's None if no path
*project_guard = None;
}
}
} // Lock released
Ok(InitializeResult {
capabilities: ServerCapabilities {
@ -92,10 +94,76 @@ impl LanguageServer for DjangoLanguageServer {
})
}
async fn initialized(&self, _: InitializedParams) {
self.log_message(MessageType::INFO, "server initialized!")
.await
.ok();
async fn initialized(&self, _params: InitializedParams) {
self.log_message(
MessageType::INFO,
"Server received initialized notification.",
)
.await;
let project_arc = Arc::clone(&self.project);
let client = self.client.clone();
let task = move || async move {
let mut project_guard = project_arc.write().await;
if let Some(project) = project_guard.as_mut() {
let path_display = project.path().display().to_string();
client
.log_message(
MessageType::INFO,
&format!(
"Task: Starting initialization for project at: {}",
path_display
),
)
.await;
match project.initialize() {
Ok(()) => {
client
.log_message(
MessageType::INFO,
&format!(
"Task: Successfully initialized project: {}",
path_display
),
)
.await;
}
Err(e) => {
client
.log_message(
MessageType::ERROR,
&format!(
"Task: Failed to initialize Django project at {}: {}",
path_display, e
),
)
.await;
*project_guard = None;
}
}
} else {
client
.log_message(
MessageType::INFO,
"Task: No project instance found to initialize.",
)
.await;
}
Ok(())
};
if let Err(e) = self.queue.submit(task).await {
self.log_message(
MessageType::ERROR,
&format!("Failed to submit project initialization task: {}", e),
)
.await;
} else {
self.log_message(MessageType::INFO, "Scheduled project initialization task.")
.await;
}
}
async fn shutdown(&self) -> LspResult<()> {
@ -112,8 +180,7 @@ impl LanguageServer for DjangoLanguageServer {
MessageType::INFO,
&format!("Opened document: {:?}", params.text_document.uri),
)
.await
.ok();
.await;
}
async fn did_change(&self, params: DidChangeTextDocumentParams) {
@ -131,8 +198,7 @@ impl LanguageServer for DjangoLanguageServer {
MessageType::INFO,
&format!("Changed document: {:?}", params.text_document.uri),
)
.await
.ok();
.await;
}
async fn did_close(&self, params: DidCloseTextDocumentParams) {
@ -150,8 +216,7 @@ impl LanguageServer for DjangoLanguageServer {
MessageType::INFO,
&format!("Closed document: {:?}", params.text_document.uri),
)
.await
.ok();
.await;
}
async fn completion(&self, params: CompletionParams) -> LspResult<Option<CompletionResponse>> {