diff --git a/crates/djls-server/src/queue.rs b/crates/djls-server/src/queue.rs index c6dc546..8348a2a 100644 --- a/crates/djls-server/src/queue.rs +++ b/crates/djls-server/src/queue.rs @@ -118,9 +118,8 @@ impl Queue { /// Submits an asynchronous task to the queue. /// - /// The task is provided as a closure (`task_fn`) that returns a `Future`. - /// This function wraps the provided closure and future for type erasure - /// and sends it to the background worker task for sequential execution. + /// This method accepts a `Future` directly and sends it to the background worker + /// task for sequential execution. The future should resolve to `Result<()>`. /// /// The `await` on this method only waits for the task to be *sent* to the /// queue's channel, not for the task to be *executed*. If the queue's @@ -128,13 +127,12 @@ impl Queue { /// /// # Usage /// - /// The `task_fn` must be a closure that takes no arguments and returns - /// a `Future` which resolves to `Result<()>`. Typically, this is written - /// using the `|| async move { ... }` syntax: + /// The `future` must be a `Future` which resolves to `Result<()>`. Typically, + /// this is provided using an `async move` block: /// /// ```rust,ignore /// let data_to_capture = 42; - /// queue.submit(move || async move { + /// queue.submit(async move { /// // ... perform async work using data_to_capture ... /// println!("Processing data: {}", data_to_capture); /// tokio::time::sleep(std::time::Duration::from_millis(10)).await; @@ -144,20 +142,14 @@ impl Queue { /// /// # Type Parameters /// - /// - `F`: The type of the closure (`FnOnce() -> Fut`). Must be `Send + 'static`. - /// - `Fut`: The type of the `Future` returned by the closure. Must resolve - /// to `Result<()>` and be `Send + 'static`. - pub async fn submit(&self, task_fn: F) -> Result<()> + /// - `F`: The type of the `Future`. Must resolve to `Result<()>` and be `Send + 'static`. + pub async fn submit(&self, future: F) -> Result<()> where - F: FnOnce() -> Fut + Send + 'static, - Fut: Future> + Send + 'static, + F: Future> + Send + 'static, { // Create the inner closure that matches `TaskClosure`'s signature. - // This closure, when called by the worker: - // 1. Calls the user-provided `task_fn()`. - // 2. Gets the user's concrete `Future` (`Fut`). - // 3. Pins it and Boxes it, implicitly converting it to `TaskFuture`. - let boxed_task_closure: TaskClosure = Box::new(move || Box::pin(task_fn())); + // This closure wraps the future in a way that TaskClosure expects. + let boxed_task_closure: TaskClosure = Box::new(move || Box::pin(future)); // Send the boxed, type-erased closure to the worker task. // This will wait if the channel buffer is full. @@ -209,7 +201,7 @@ mod tests { for i in 0..5 { let counter_clone = Arc::clone(&counter); queue - .submit(move || async move { + .submit(async move { sleep(Duration::from_millis(5)).await; println!("Processing task {}", i); counter_clone.fetch_add(1, Ordering::SeqCst); @@ -221,7 +213,7 @@ mod tests { // Submit a task that will fail queue - .submit(|| async { + .submit(async { println!("Submitting failing task"); Err(anyhow!("Task failed intentionally")) }) @@ -234,7 +226,7 @@ mod tests { // Submit another task let counter_clone = Arc::clone(&counter); queue - .submit(|| async move { + .submit(async move { println!("Processing task after error"); counter_clone.fetch_add(1, Ordering::SeqCst); Ok(()) @@ -257,7 +249,7 @@ mod tests { let counter_clone = Arc::clone(&counter); tasks.push(tokio::spawn(async move { queue_clone - .submit(|| async move { + .submit(async move { counter_clone.fetch_add(1, Ordering::Relaxed); sleep(Duration::from_millis(2)).await; Ok(()) @@ -273,7 +265,7 @@ mod tests { println!("Finished submitting initial 32 tasks"); let counter_clone = Arc::clone(&counter); - let submit_task = queue.submit(|| async move { + let submit_task = queue.submit(async move { println!("Processing the 33rd task"); counter_clone.fetch_add(1, Ordering::Relaxed); Ok(()) @@ -298,7 +290,7 @@ mod tests { let counter_clone1 = Arc::clone(&counter); queue - .submit(|| async move { + .submit(async move { sleep(Duration::from_millis(50)).await; counter_clone1.fetch_add(1, Ordering::SeqCst); Ok(()) @@ -308,7 +300,7 @@ mod tests { let counter_clone2 = Arc::clone(&counter); queue - .submit(|| async move { + .submit(async move { sleep(Duration::from_millis(50)).await; counter_clone2.fetch_add(1, Ordering::SeqCst); Ok(()) @@ -331,13 +323,13 @@ mod tests { let counter = Arc::new(AtomicUsize::new(0)); let counter_clone1 = Arc::clone(&counter); - let task1 = queue1.submit(|| async move { + let task1 = queue1.submit(async move { counter_clone1.fetch_add(1, Ordering::SeqCst); Ok(()) }); let counter_clone2 = Arc::clone(&counter); - let task2 = queue2.submit(|| async move { + let task2 = queue2.submit(async move { counter_clone2.fetch_add(1, Ordering::SeqCst); Ok(()) }); @@ -354,7 +346,7 @@ mod tests { let counter_clone1 = Arc::clone(&counter); queue - .submit(|| async move { + .submit(async move { counter_clone1.fetch_add(1, Ordering::SeqCst); Ok(()) }) @@ -362,13 +354,13 @@ mod tests { .unwrap(); queue - .submit(|| async { Err(anyhow!("Intentional failure")) }) + .submit(async { Err(anyhow!("Intentional failure")) }) .await .unwrap(); let counter_clone2 = Arc::clone(&counter); queue - .submit(|| async move { + .submit(async move { counter_clone2.fetch_add(1, Ordering::SeqCst); Ok(()) }) @@ -379,7 +371,7 @@ mod tests { let counter_clone3 = Arc::clone(&counter); queue - .submit(|| async move { + .submit(async move { counter_clone3.fetch_add(1, Ordering::SeqCst); Ok(()) }) diff --git a/crates/djls-server/src/server.rs b/crates/djls-server/src/server.rs index fcd14dd..a832184 100644 --- a/crates/djls-server/src/server.rs +++ b/crates/djls-server/src/server.rs @@ -158,73 +158,75 @@ impl LanguageServer for DjangoLanguageServer { let project_arc = Arc::clone(&self.project); let client = self.client.clone(); - let settings_arc = Arc::clone(&self.settings); - let task = move || async move { - let mut project_guard = project_arc.write().await; - if let Some(project) = project_guard.as_mut() { - let path_display = project.path().display().to_string(); - client - .log_message( - MessageType::INFO, - &format!( - "Task: Starting initialization for project at: {}", - path_display - ), - ) - .await; - let venv_path = { - let settings = settings_arc.read().await; - settings.venv_path().map(|s| s.to_string()) - }; - - if let Some(ref path) = venv_path { + if let Err(e) = self + .queue + .submit(async move { + let mut project_guard = project_arc.write().await; + if let Some(project) = project_guard.as_mut() { + let path_display = project.path().display().to_string(); client .log_message( MessageType::INFO, - &format!("Using virtual environment from config: {}", path), + &format!( + "Task: Starting initialization for project at: {}", + path_display + ), ) .await; - } - match project.initialize(venv_path.as_deref()) { - Ok(()) => { + let venv_path = { + let settings = settings_arc.read().await; + settings.venv_path().map(|s| s.to_string()) + }; + + if let Some(ref path) = venv_path { client .log_message( MessageType::INFO, - &format!( - "Task: Successfully initialized project: {}", - path_display - ), + &format!("Using virtual environment from config: {}", path), ) .await; } - Err(e) => { - client - .log_message( - MessageType::ERROR, - &format!( - "Task: Failed to initialize Django project at {}: {}", - path_display, e - ), - ) - .await; - *project_guard = None; - } - } - } else { - client - .log_message( - MessageType::INFO, - "Task: No project instance found to initialize.", - ) - .await; - } - Ok(()) - }; - if let Err(e) = self.queue.submit(task).await { + match project.initialize(venv_path.as_deref()) { + Ok(()) => { + client + .log_message( + MessageType::INFO, + &format!( + "Task: Successfully initialized project: {}", + path_display + ), + ) + .await; + } + Err(e) => { + client + .log_message( + MessageType::ERROR, + &format!( + "Task: Failed to initialize Django project at {}: {}", + path_display, e + ), + ) + .await; + *project_guard = None; + } + } + } else { + client + .log_message( + MessageType::INFO, + "Task: No project instance found to initialize.", + ) + .await; + } + Ok(()) + }) + .await + { self.log_message( MessageType::ERROR, &format!("Failed to submit project initialization task: {}", e),