diff --git a/crates/ruff_server/src/server/schedule/task.rs b/crates/ruff_server/src/server/schedule/task.rs index 2e768c2b18..8e2e712efc 100644 --- a/crates/ruff_server/src/server/schedule/task.rs +++ b/crates/ruff_server/src/server/schedule/task.rs @@ -33,6 +33,7 @@ pub(in crate::server) enum BackgroundSchedule { /// while local tasks have exclusive access and can modify it as they please. Keep in mind that /// local tasks will **block** the main event loop, so only use local tasks if you **need** /// mutable state access or you need the absolute lowest latency possible. +#[must_use] pub(in crate::server) enum Task { Background(BackgroundTaskBuilder), Sync(SyncTask), diff --git a/crates/ty_server/src/server/api.rs b/crates/ty_server/src/server/api.rs index 4b7300373a..51a7d30505 100644 --- a/crates/ty_server/src/server/api.rs +++ b/crates/ty_server/src/server/api.rs @@ -219,13 +219,11 @@ where return; } - let result = ruff_db::panic::catch_unwind(|| { + if let Err(error) = ruff_db::panic::catch_unwind(|| { let snapshot = snapshot; - R::run(snapshot.0, client, params) - }); - - if let Some(response) = request_result_to_response::(&id, client, result, retry) { - respond::(&id, response, client); + R::handle_request(&id, snapshot.0, client, params); + }) { + panic_response::(&id, client, &error, retry); } }) })) @@ -288,58 +286,50 @@ where return; } - let result = ruff_db::panic::catch_unwind(|| { - R::run_with_snapshot(&db, snapshot, client, params) - }); - - if let Some(response) = request_result_to_response::(&id, client, result, retry) { - respond::(&id, response, client); + if let Err(error) = ruff_db::panic::catch_unwind(|| { + R::handle_request(&id, &db, snapshot, client, params); + }) { + panic_response::(&id, client, &error, retry); } }) })) } -fn request_result_to_response( +fn panic_response( id: &RequestId, client: &Client, - result: std::result::Result< - Result<<::RequestType as Request>::Result>, - PanicError, - >, + error: &PanicError, request: Option, -) -> Option::RequestType as Request>::Result>> -where +) where R: traits::RetriableRequestHandler, { - match result { - Ok(response) => Some(response), - Err(error) => { - // Check if the request was canceled due to some modifications to the salsa database. - if error.payload.downcast_ref::().is_some() { - // If the query supports retry, re-queue the request. - // The query is still likely to succeed if the user modified any other document. - if let Some(request) = request { - tracing::trace!( - "request id={} method={} was cancelled by salsa, re-queueing for retry", - request.id, - request.method - ); - client.retry(request); - } else { - tracing::trace!( - "request id={} was cancelled by salsa, sending content modified", - id - ); - respond_silent_error(id.clone(), client, R::salsa_cancellation_error()); - } - None - } else { - Some(Err(Error { - code: lsp_server::ErrorCode::InternalError, - error: anyhow!("request handler {error}"), - })) - } + // Check if the request was canceled due to some modifications to the salsa database. + if error.payload.downcast_ref::().is_some() { + // If the query supports retry, re-queue the request. + // The query is still likely to succeed if the user modified any other document. + if let Some(request) = request { + tracing::trace!( + "request id={} method={} was cancelled by salsa, re-queueing for retry", + request.id, + request.method + ); + client.retry(request); + } else { + tracing::trace!( + "request id={} was cancelled by salsa, sending content modified", + id + ); + respond_silent_error(id.clone(), client, R::salsa_cancellation_error()); } + } else { + respond::( + id, + Err(Error { + code: lsp_server::ErrorCode::InternalError, + error: anyhow!("request handler {error}"), + }), + client, + ); } } @@ -352,7 +342,13 @@ fn sync_notification_task( if let Err(err) = N::run(session, client, params) { tracing::error!("An error occurred while running {id}: {err}"); client.show_error_message("ty encountered a problem. Check the logs for more details."); + + return; } + + // If there's a pending workspace diagnostic long-polling request, + // resume it, but only if the session revision changed (e.g. because some document changed). + session.resume_suspended_workspace_diagnostic_request(client); })) } diff --git a/crates/ty_server/src/server/api/diagnostics.rs b/crates/ty_server/src/server/api/diagnostics.rs index 10229b1d0e..d50edaeaf1 100644 --- a/crates/ty_server/src/server/api/diagnostics.rs +++ b/crates/ty_server/src/server/api/diagnostics.rs @@ -26,17 +26,29 @@ pub(super) struct Diagnostics<'a> { } impl Diagnostics<'_> { - pub(super) fn result_id_from_hash(diagnostics: &[ruff_db::diagnostic::Diagnostic]) -> String { + /// Computes the result ID for `diagnostics`. + /// + /// Returns `None` if there are no diagnostics. + pub(super) fn result_id_from_hash( + diagnostics: &[ruff_db::diagnostic::Diagnostic], + ) -> Option { + if diagnostics.is_empty() { + return None; + } + // Generate result ID based on raw diagnostic content only let mut hasher = DefaultHasher::new(); // Hash the length first to ensure different numbers of diagnostics produce different hashes diagnostics.hash(&mut hasher); - format!("{:x}", hasher.finish()) + Some(format!("{:x}", hasher.finish())) } - pub(super) fn result_id(&self) -> String { + /// Computes the result ID for the diagnostics. + /// + /// Returns `None` if there are no diagnostics. + pub(super) fn result_id(&self) -> Option { Self::result_id_from_hash(&self.items) } diff --git a/crates/ty_server/src/server/api/requests/completion.rs b/crates/ty_server/src/server/api/requests/completion.rs index 5f7b922b68..f97c2b3f3a 100644 --- a/crates/ty_server/src/server/api/requests/completion.rs +++ b/crates/ty_server/src/server/api/requests/completion.rs @@ -28,7 +28,7 @@ impl BackgroundDocumentRequestHandler for CompletionRequestHandler { fn run_with_snapshot( db: &ProjectDatabase, - snapshot: DocumentSnapshot, + snapshot: &DocumentSnapshot, _client: &Client, params: CompletionParams, ) -> crate::server::Result> { diff --git a/crates/ty_server/src/server/api/requests/diagnostic.rs b/crates/ty_server/src/server/api/requests/diagnostic.rs index 6638890416..fccf26ec20 100644 --- a/crates/ty_server/src/server/api/requests/diagnostic.rs +++ b/crates/ty_server/src/server/api/requests/diagnostic.rs @@ -29,11 +29,11 @@ impl BackgroundDocumentRequestHandler for DocumentDiagnosticRequestHandler { fn run_with_snapshot( db: &ProjectDatabase, - snapshot: DocumentSnapshot, + snapshot: &DocumentSnapshot, _client: &Client, params: DocumentDiagnosticParams, ) -> Result { - let diagnostics = compute_diagnostics(db, &snapshot); + let diagnostics = compute_diagnostics(db, snapshot); let Some(diagnostics) = diagnostics else { return Ok(DocumentDiagnosticReportResult::Report( @@ -43,23 +43,26 @@ impl BackgroundDocumentRequestHandler for DocumentDiagnosticRequestHandler { let result_id = diagnostics.result_id(); - let report = if params.previous_result_id.as_deref() == Some(&result_id) { - DocumentDiagnosticReport::Unchanged(RelatedUnchangedDocumentDiagnosticReport { - related_documents: None, - unchanged_document_diagnostic_report: UnchangedDocumentDiagnosticReport { - result_id, - }, - }) - } else { - DocumentDiagnosticReport::Full(RelatedFullDocumentDiagnosticReport { - related_documents: None, - full_document_diagnostic_report: FullDocumentDiagnosticReport { - result_id: Some(result_id), - // SAFETY: Pull diagnostic requests are only called for text documents, not for - // notebook documents. - items: diagnostics.to_lsp_diagnostics(db).expect_text_document(), - }, - }) + let report = match result_id { + Some(new_id) if Some(&new_id) == params.previous_result_id.as_ref() => { + DocumentDiagnosticReport::Unchanged(RelatedUnchangedDocumentDiagnosticReport { + related_documents: None, + unchanged_document_diagnostic_report: UnchangedDocumentDiagnosticReport { + result_id: new_id, + }, + }) + } + new_id => { + DocumentDiagnosticReport::Full(RelatedFullDocumentDiagnosticReport { + related_documents: None, + full_document_diagnostic_report: FullDocumentDiagnosticReport { + result_id: new_id, + // SAFETY: Pull diagnostic requests are only called for text documents, not for + // notebook documents. + items: diagnostics.to_lsp_diagnostics(db).expect_text_document(), + }, + }) + } }; Ok(DocumentDiagnosticReportResult::Report(report)) diff --git a/crates/ty_server/src/server/api/requests/doc_highlights.rs b/crates/ty_server/src/server/api/requests/doc_highlights.rs index c9bb2a5e32..22bea4609e 100644 --- a/crates/ty_server/src/server/api/requests/doc_highlights.rs +++ b/crates/ty_server/src/server/api/requests/doc_highlights.rs @@ -26,7 +26,7 @@ impl BackgroundDocumentRequestHandler for DocumentHighlightRequestHandler { fn run_with_snapshot( db: &ProjectDatabase, - snapshot: DocumentSnapshot, + snapshot: &DocumentSnapshot, _client: &Client, params: DocumentHighlightParams, ) -> crate::server::Result>> { diff --git a/crates/ty_server/src/server/api/requests/document_symbols.rs b/crates/ty_server/src/server/api/requests/document_symbols.rs index 4d9bdc7dee..9e82247531 100644 --- a/crates/ty_server/src/server/api/requests/document_symbols.rs +++ b/crates/ty_server/src/server/api/requests/document_symbols.rs @@ -28,7 +28,7 @@ impl BackgroundDocumentRequestHandler for DocumentSymbolRequestHandler { fn run_with_snapshot( db: &ProjectDatabase, - snapshot: DocumentSnapshot, + snapshot: &DocumentSnapshot, _client: &Client, params: DocumentSymbolParams, ) -> crate::server::Result> { diff --git a/crates/ty_server/src/server/api/requests/goto_declaration.rs b/crates/ty_server/src/server/api/requests/goto_declaration.rs index 27297afef5..5509bc1d93 100644 --- a/crates/ty_server/src/server/api/requests/goto_declaration.rs +++ b/crates/ty_server/src/server/api/requests/goto_declaration.rs @@ -26,7 +26,7 @@ impl BackgroundDocumentRequestHandler for GotoDeclarationRequestHandler { fn run_with_snapshot( db: &ProjectDatabase, - snapshot: DocumentSnapshot, + snapshot: &DocumentSnapshot, _client: &Client, params: GotoDeclarationParams, ) -> crate::server::Result> { diff --git a/crates/ty_server/src/server/api/requests/goto_definition.rs b/crates/ty_server/src/server/api/requests/goto_definition.rs index cc41f1dee6..76c7ff2fd9 100644 --- a/crates/ty_server/src/server/api/requests/goto_definition.rs +++ b/crates/ty_server/src/server/api/requests/goto_definition.rs @@ -26,7 +26,7 @@ impl BackgroundDocumentRequestHandler for GotoDefinitionRequestHandler { fn run_with_snapshot( db: &ProjectDatabase, - snapshot: DocumentSnapshot, + snapshot: &DocumentSnapshot, _client: &Client, params: GotoDefinitionParams, ) -> crate::server::Result> { diff --git a/crates/ty_server/src/server/api/requests/goto_references.rs b/crates/ty_server/src/server/api/requests/goto_references.rs index f630d7b815..4d18b0c366 100644 --- a/crates/ty_server/src/server/api/requests/goto_references.rs +++ b/crates/ty_server/src/server/api/requests/goto_references.rs @@ -26,7 +26,7 @@ impl BackgroundDocumentRequestHandler for ReferencesRequestHandler { fn run_with_snapshot( db: &ProjectDatabase, - snapshot: DocumentSnapshot, + snapshot: &DocumentSnapshot, _client: &Client, params: ReferenceParams, ) -> crate::server::Result>> { diff --git a/crates/ty_server/src/server/api/requests/goto_type_definition.rs b/crates/ty_server/src/server/api/requests/goto_type_definition.rs index 6fb901b6d9..b514064616 100644 --- a/crates/ty_server/src/server/api/requests/goto_type_definition.rs +++ b/crates/ty_server/src/server/api/requests/goto_type_definition.rs @@ -26,7 +26,7 @@ impl BackgroundDocumentRequestHandler for GotoTypeDefinitionRequestHandler { fn run_with_snapshot( db: &ProjectDatabase, - snapshot: DocumentSnapshot, + snapshot: &DocumentSnapshot, _client: &Client, params: GotoTypeDefinitionParams, ) -> crate::server::Result> { diff --git a/crates/ty_server/src/server/api/requests/hover.rs b/crates/ty_server/src/server/api/requests/hover.rs index cc4a9e15a4..508c86eb2c 100644 --- a/crates/ty_server/src/server/api/requests/hover.rs +++ b/crates/ty_server/src/server/api/requests/hover.rs @@ -26,7 +26,7 @@ impl BackgroundDocumentRequestHandler for HoverRequestHandler { fn run_with_snapshot( db: &ProjectDatabase, - snapshot: DocumentSnapshot, + snapshot: &DocumentSnapshot, _client: &Client, params: HoverParams, ) -> crate::server::Result> { diff --git a/crates/ty_server/src/server/api/requests/inlay_hints.rs b/crates/ty_server/src/server/api/requests/inlay_hints.rs index ed17f24464..2c59c9fcfd 100644 --- a/crates/ty_server/src/server/api/requests/inlay_hints.rs +++ b/crates/ty_server/src/server/api/requests/inlay_hints.rs @@ -25,7 +25,7 @@ impl BackgroundDocumentRequestHandler for InlayHintRequestHandler { fn run_with_snapshot( db: &ProjectDatabase, - snapshot: DocumentSnapshot, + snapshot: &DocumentSnapshot, _client: &Client, params: InlayHintParams, ) -> crate::server::Result>> { diff --git a/crates/ty_server/src/server/api/requests/selection_range.rs b/crates/ty_server/src/server/api/requests/selection_range.rs index 7397459da8..6903b055f6 100644 --- a/crates/ty_server/src/server/api/requests/selection_range.rs +++ b/crates/ty_server/src/server/api/requests/selection_range.rs @@ -26,7 +26,7 @@ impl BackgroundDocumentRequestHandler for SelectionRangeRequestHandler { fn run_with_snapshot( db: &ProjectDatabase, - snapshot: DocumentSnapshot, + snapshot: &DocumentSnapshot, _client: &Client, params: SelectionRangeParams, ) -> crate::server::Result>> { diff --git a/crates/ty_server/src/server/api/requests/semantic_tokens.rs b/crates/ty_server/src/server/api/requests/semantic_tokens.rs index 8d23ecd054..b2a96a0c91 100644 --- a/crates/ty_server/src/server/api/requests/semantic_tokens.rs +++ b/crates/ty_server/src/server/api/requests/semantic_tokens.rs @@ -22,7 +22,7 @@ impl BackgroundDocumentRequestHandler for SemanticTokensRequestHandler { fn run_with_snapshot( db: &ProjectDatabase, - snapshot: DocumentSnapshot, + snapshot: &DocumentSnapshot, _client: &Client, _params: SemanticTokensParams, ) -> crate::server::Result> { diff --git a/crates/ty_server/src/server/api/requests/semantic_tokens_range.rs b/crates/ty_server/src/server/api/requests/semantic_tokens_range.rs index e14aedb925..b3e56eca70 100644 --- a/crates/ty_server/src/server/api/requests/semantic_tokens_range.rs +++ b/crates/ty_server/src/server/api/requests/semantic_tokens_range.rs @@ -24,7 +24,7 @@ impl BackgroundDocumentRequestHandler for SemanticTokensRangeRequestHandler { fn run_with_snapshot( db: &ProjectDatabase, - snapshot: DocumentSnapshot, + snapshot: &DocumentSnapshot, _client: &Client, params: SemanticTokensRangeParams, ) -> crate::server::Result> { diff --git a/crates/ty_server/src/server/api/requests/shutdown.rs b/crates/ty_server/src/server/api/requests/shutdown.rs index 73b956168c..221ff82e09 100644 --- a/crates/ty_server/src/server/api/requests/shutdown.rs +++ b/crates/ty_server/src/server/api/requests/shutdown.rs @@ -2,6 +2,8 @@ use crate::Session; use crate::server::api::traits::{RequestHandler, SyncRequestHandler}; use crate::session::client::Client; +use lsp_types::{WorkspaceDiagnosticReport, WorkspaceDiagnosticReportResult}; + pub(crate) struct ShutdownHandler; impl RequestHandler for ShutdownHandler { @@ -9,9 +11,23 @@ impl RequestHandler for ShutdownHandler { } impl SyncRequestHandler for ShutdownHandler { - fn run(session: &mut Session, _client: &Client, _params: ()) -> crate::server::Result<()> { - tracing::debug!("Received shutdown request, waiting for shutdown notification"); + fn run(session: &mut Session, client: &Client, _params: ()) -> crate::server::Result<()> { + tracing::debug!("Received shutdown request, waiting for exit notification"); + + // Respond to any pending workspace diagnostic requests + if let Some(suspended_workspace_request) = + session.take_suspended_workspace_diagnostic_request() + { + client.respond( + &suspended_workspace_request.id, + Ok(WorkspaceDiagnosticReportResult::Report( + WorkspaceDiagnosticReport::default(), + )), + ); + } + session.set_shutdown_requested(true); + Ok(()) } } diff --git a/crates/ty_server/src/server/api/requests/signature_help.rs b/crates/ty_server/src/server/api/requests/signature_help.rs index 71363fceed..4a92d49b55 100644 --- a/crates/ty_server/src/server/api/requests/signature_help.rs +++ b/crates/ty_server/src/server/api/requests/signature_help.rs @@ -28,7 +28,7 @@ impl BackgroundDocumentRequestHandler for SignatureHelpRequestHandler { fn run_with_snapshot( db: &ProjectDatabase, - snapshot: DocumentSnapshot, + snapshot: &DocumentSnapshot, _client: &Client, params: SignatureHelpParams, ) -> crate::server::Result> { diff --git a/crates/ty_server/src/server/api/requests/workspace_diagnostic.rs b/crates/ty_server/src/server/api/requests/workspace_diagnostic.rs index b0349bbc0a..b7a7fa3dc0 100644 --- a/crates/ty_server/src/server/api/requests/workspace_diagnostic.rs +++ b/crates/ty_server/src/server/api/requests/workspace_diagnostic.rs @@ -1,14 +1,15 @@ use crate::PositionEncoding; -use crate::server::Result; use crate::server::api::diagnostics::{Diagnostics, to_lsp_diagnostic}; use crate::server::api::traits::{ BackgroundRequestHandler, RequestHandler, RetriableRequestHandler, }; use crate::server::lazy_work_done_progress::LazyWorkDoneProgress; -use crate::session::SessionSnapshot; +use crate::server::{Action, Result}; use crate::session::client::Client; use crate::session::index::Index; +use crate::session::{SessionSnapshot, SuspendedWorkspaceDiagnosticRequest}; use crate::system::file_to_url; +use lsp_server::RequestId; use lsp_types::request::WorkspaceDiagnosticRequest; use lsp_types::{ FullDocumentDiagnosticReport, PreviousResultId, ProgressToken, @@ -25,6 +26,70 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Instant; use ty_project::{Db, ProgressReporter}; +/// Handler for [Workspace diagnostics](workspace-diagnostics) +/// +/// Workspace diagnostics are special in many ways compared to other request handlers. +/// This is mostly due to the fact that computing them is expensive. Because of that, +/// the LSP supports multiple optimizations of which we all make use: +/// +/// ## Partial results +/// +/// Many clients support partial results. They allow a server +/// to send multiple responses (in the form of `$/progress` notifications) for +/// the same request. We use partial results to stream the results for +/// changed files. This has the obvious benefit is that users +/// don't need to wait for the entire check to complete before they see any diagnostics. +/// The other benefit of "chunking" the work also helps client to incrementally +/// update (and repaint) the diagnostics instead of all at once. +/// We did see lags in VS code for projects with 10k+ diagnostics before implementing +/// this improvement. +/// +/// ## Result IDs +/// +/// The server can compute a result id for every file which the client +/// sends back in the next pull or workspace diagnostic request. The way we use +/// the result id is that we compute a fingerprint of the file's diagnostics (a hash) +/// and compare it with the result id sent by the server. We know that +/// the diagnostics for a file are unchanged (the client still has the most recent review) +/// if the ids compare equal. +/// +/// Result IDs are also useful to identify files for which ty no longer emits +/// any diagnostics. For example, file A contained a syntax error that has now been fixed +/// by the user. The client will send us a result id for file A but we won't match it with +/// any new diagnostics because all errors in the file were fixed. The fact that we can't +/// match up the result ID tells us that we need to clear the diagnostics on the client +/// side by sending an empty diagnostic report (report without any diagnostics). We'll set the +/// result id to `None` so that the client stops sending us a result id for this file. +/// +/// Sending unchanged instead of the full diagnostics for files that haven't changed +/// helps reduce the data that's sent from the server to the client and it also enables long-polling +/// (see the next section). +/// +/// ## Long polling +/// +/// As of today (1st of August 2025), VS code's LSP client automatically schedules a +/// workspace diagnostic request every two seconds because it doesn't know *when* to pull +/// for new workspace diagnostics (it doesn't know what actions invalidate the diagnostics). +/// However, running the workspace diagnostics every two seconds is wasting a lot of CPU cycles (and battery life as a result) +/// if the user's only browsing the project (it requires ty to iterate over all files). +/// That's why we implement long polling (as recommended in the LSP) for workspace diagnostics. +/// +/// The basic idea of long-polling is that the server doesn't respond if there are no diagnostics +/// or all diagnostics are unchanged. Instead, the server keeps the request open (it doesn't respond) +/// and only responses when the diagnostics change. This puts the server in full control of when +/// to recheck a workspace and a client can simply wait for the response to come in. +/// +/// One challenge with long polling for ty's server architecture is that we can't just keep +/// the background thread running because holding on to the [`ProjectDatabase`] references +/// prevents notifications from acquiring the exclusive db lock (or the long polling background thread +/// panics if a notification tries to do so). What we do instead is that this request handler +/// doesn't send a response if there are no diagnostics or all are unchanged and it +/// sets a "[snapshot](SuspendedWorkspaceDiagnosticRequest)" of the workspace diagnostic request on the [`Session`]. +/// The second part to this is in the notification request handling. ty retries the +/// suspended workspace diagnostic request (if any) after every notification if the notification +/// changed the [`Session`]'s state. +/// +/// [workspace-diagnostics](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#workspace_diagnostic) pub(crate) struct WorkspaceDiagnosticRequestHandler; impl RequestHandler for WorkspaceDiagnosticRequestHandler { @@ -33,7 +98,7 @@ impl RequestHandler for WorkspaceDiagnosticRequestHandler { impl BackgroundRequestHandler for WorkspaceDiagnosticRequestHandler { fn run( - snapshot: SessionSnapshot, + snapshot: &SessionSnapshot, client: &Client, params: WorkspaceDiagnosticParams, ) -> Result { @@ -49,7 +114,7 @@ impl BackgroundRequestHandler for WorkspaceDiagnosticRequestHandler { let writer = ResponseWriter::new( params.partial_result_params.partial_result_token, params.previous_result_ids, - &snapshot, + snapshot, client, ); @@ -71,6 +136,50 @@ impl BackgroundRequestHandler for WorkspaceDiagnosticRequestHandler { Ok(reporter.into_final_report()) } + + fn handle_request( + id: &RequestId, + snapshot: SessionSnapshot, + client: &Client, + params: WorkspaceDiagnosticParams, + ) { + let result = Self::run(&snapshot, client, params.clone()); + + // Test if this is a no-op result, in which case we should long-poll the request and + // only respond once some diagnostics have changed to get the latest result ids. + // + // Bulk response: This the simple case. Simply test if all diagnostics are unchanged (or empty) + // Streaming: This trickier but follows the same principle. + // * If the server sent any partial results, then `result` is a `Partial` result (in which + // case we shouldn't do any long polling because some diagnostics changed). + // * If this is a full report, then check if all items are unchanged (or empty), the same as for + // the non-streaming case. + if let Ok(WorkspaceDiagnosticReportResult::Report(full)) = &result { + let all_unchanged = full + .items + .iter() + .all(|item| matches!(item, WorkspaceDocumentDiagnosticReport::Unchanged(_))); + + if all_unchanged { + tracing::debug!( + "Suspending workspace diagnostic request, all diagnostics are unchanged or the project has no diagnostics" + ); + + client.queue_action(Action::SuspendWorkspaceDiagnostics(Box::new( + SuspendedWorkspaceDiagnosticRequest { + id: id.clone(), + params: serde_json::to_value(¶ms).unwrap(), + revision: snapshot.revision(), + }, + ))); + + // Don't respond, keep the request open (long polling). + return; + } + } + + client.respond(id, result); + } } impl RetriableRequestHandler for WorkspaceDiagnosticRequestHandler { @@ -147,7 +256,12 @@ impl ProgressReporter for WorkspaceDiagnosticsProgressReporter<'_> { self.report_progress(); } - let mut response = self.response.lock().unwrap(); + // Another thread might have panicked at this point because of a salsa cancellation which + // poisoned the result. If the response is poisoned, just don't report and wait for our thread + // to unwind with a salsa cancellation next. + let Ok(mut response) = self.response.lock() else { + return; + }; // Don't report empty diagnostics. We clear previous diagnostics in `into_response` // which also handles the case where a file no longer has diagnostics because @@ -207,7 +321,7 @@ impl<'a> ResponseWriter<'a> { token, is_test: snapshot.in_test(), last_flush: Instant::now(), - batched: Vec::new(), + changed: Vec::new(), unchanged: Vec::with_capacity(previous_result_ids.len()), }) } else { @@ -242,35 +356,33 @@ impl<'a> ResponseWriter<'a> { let result_id = Diagnostics::result_id_from_hash(diagnostics); - let is_unchanged = self - .previous_result_ids - .remove(&url) - .is_some_and(|previous_result_id| previous_result_id == result_id); + let report = match result_id { + Some(new_id) if Some(&new_id) == self.previous_result_ids.remove(&url).as_ref() => { + WorkspaceDocumentDiagnosticReport::Unchanged( + WorkspaceUnchangedDocumentDiagnosticReport { + uri: url, + version, + unchanged_document_diagnostic_report: UnchangedDocumentDiagnosticReport { + result_id: new_id, + }, + }, + ) + } + new_id => { + let lsp_diagnostics = diagnostics + .iter() + .map(|diagnostic| to_lsp_diagnostic(db, diagnostic, self.position_encoding)) + .collect::>(); - let report = if is_unchanged { - WorkspaceDocumentDiagnosticReport::Unchanged( - WorkspaceUnchangedDocumentDiagnosticReport { + WorkspaceDocumentDiagnosticReport::Full(WorkspaceFullDocumentDiagnosticReport { uri: url, version, - unchanged_document_diagnostic_report: UnchangedDocumentDiagnosticReport { - result_id, + full_document_diagnostic_report: FullDocumentDiagnosticReport { + result_id: new_id, + items: lsp_diagnostics, }, - }, - ) - } else { - let lsp_diagnostics = diagnostics - .iter() - .map(|diagnostic| to_lsp_diagnostic(db, diagnostic, self.position_encoding)) - .collect::>(); - - WorkspaceDocumentDiagnosticReport::Full(WorkspaceFullDocumentDiagnosticReport { - uri: url, - version, - full_document_diagnostic_report: FullDocumentDiagnosticReport { - result_id: Some(result_id), - items: lsp_diagnostics, - }, - }) + }) + } }; self.write_report(report); @@ -306,7 +418,7 @@ impl<'a> ResponseWriter<'a> { // Handle files that had diagnostics in previous request but no longer have any // Any remaining entries in previous_results are files that were fixed - for previous_url in self.previous_result_ids.into_keys() { + for (previous_url, previous_result_id) in self.previous_result_ids { // This file had diagnostics before but doesn't now, so we need to report it as having no diagnostics let version = self .index @@ -315,22 +427,38 @@ impl<'a> ResponseWriter<'a> { .and_then(|key| self.index.make_document_ref(key).ok()) .map(|doc| i64::from(doc.version())); - items.push(WorkspaceDocumentDiagnosticReport::Full( - WorkspaceFullDocumentDiagnosticReport { - uri: previous_url, - version, - full_document_diagnostic_report: FullDocumentDiagnosticReport { - result_id: None, // No result ID needed for empty diagnostics - items: vec![], // No diagnostics - }, - }, - )); + let new_result_id = Diagnostics::result_id_from_hash(&[]); + + let report = match new_result_id { + Some(new_id) if new_id == previous_result_id => { + WorkspaceDocumentDiagnosticReport::Unchanged( + WorkspaceUnchangedDocumentDiagnosticReport { + uri: previous_url, + version, + unchanged_document_diagnostic_report: + UnchangedDocumentDiagnosticReport { result_id: new_id }, + }, + ) + } + new_id => { + WorkspaceDocumentDiagnosticReport::Full(WorkspaceFullDocumentDiagnosticReport { + uri: previous_url, + version, + full_document_diagnostic_report: FullDocumentDiagnosticReport { + result_id: new_id, + items: vec![], // No diagnostics + }, + }) + } + }; + + items.push(report); } match &mut self.mode { ReportingMode::Streaming(streaming) => { items.extend( - std::mem::take(&mut streaming.batched) + std::mem::take(&mut streaming.changed) .into_iter() .map(WorkspaceDocumentDiagnosticReport::Full), ); @@ -388,7 +516,7 @@ struct Streaming { /// The implementation uses batching to avoid too many /// requests for large projects (can slow down the entire /// analysis). - batched: Vec, + changed: Vec, /// All the unchanged reports. Don't stream them, /// since nothing has changed. unchanged: Vec, @@ -398,7 +526,7 @@ impl Streaming { fn write_report(&mut self, report: WorkspaceDocumentDiagnosticReport) { match report { WorkspaceDocumentDiagnosticReport::Full(full) => { - self.batched.push(full); + self.changed.push(full); } WorkspaceDocumentDiagnosticReport::Unchanged(unchanged) => { self.unchanged.push(unchanged); @@ -407,13 +535,13 @@ impl Streaming { } fn maybe_flush(&mut self) { - if self.batched.is_empty() { + if self.changed.is_empty() { return; } // Flush every ~50ms or whenever we have two items and this is a test run. let should_flush = if self.is_test { - self.batched.len() >= 2 + self.changed.len() >= 2 } else { self.last_flush.elapsed().as_millis() >= 50 }; @@ -422,7 +550,7 @@ impl Streaming { } let items = self - .batched + .changed .drain(..) .map(WorkspaceDocumentDiagnosticReport::Full) .collect(); diff --git a/crates/ty_server/src/server/api/requests/workspace_symbols.rs b/crates/ty_server/src/server/api/requests/workspace_symbols.rs index ac1611f7c4..213d920ce6 100644 --- a/crates/ty_server/src/server/api/requests/workspace_symbols.rs +++ b/crates/ty_server/src/server/api/requests/workspace_symbols.rs @@ -19,7 +19,7 @@ impl RequestHandler for WorkspaceSymbolRequestHandler { impl BackgroundRequestHandler for WorkspaceSymbolRequestHandler { fn run( - snapshot: SessionSnapshot, + snapshot: &SessionSnapshot, _client: &Client, params: WorkspaceSymbolParams, ) -> crate::server::Result> { diff --git a/crates/ty_server/src/server/api/traits.rs b/crates/ty_server/src/server/api/traits.rs index aa297ba08e..5a738c5375 100644 --- a/crates/ty_server/src/server/api/traits.rs +++ b/crates/ty_server/src/server/api/traits.rs @@ -33,10 +33,10 @@ //! See the `./requests` and `./notifications` directories for concrete implementations of these //! traits in action. -use std::borrow::Cow; - use crate::session::client::Client; use crate::session::{DocumentSnapshot, Session, SessionSnapshot}; +use lsp_server::RequestId; +use std::borrow::Cow; use lsp_types::Url; use lsp_types::notification::Notification; @@ -91,12 +91,36 @@ pub(super) trait BackgroundDocumentRequestHandler: RetriableRequestHandler { params: &<::RequestType as Request>::Params, ) -> Cow; + /// Processes the request parameters and returns the LSP request result. + /// + /// This is the main method that handlers implement. It takes the request parameters + /// from the client and computes the appropriate response data for the LSP request. fn run_with_snapshot( + db: &ProjectDatabase, + snapshot: &DocumentSnapshot, + client: &Client, + params: <::RequestType as Request>::Params, + ) -> super::Result<<::RequestType as Request>::Result>; + + /// Handles the entire request lifecycle and sends the response to the client. + /// + /// It allows handlers to customize how the server sends the response to the client. + fn handle_request( + id: &RequestId, db: &ProjectDatabase, snapshot: DocumentSnapshot, client: &Client, params: <::RequestType as Request>::Params, - ) -> super::Result<<::RequestType as Request>::Result>; + ) { + let result = Self::run_with_snapshot(db, &snapshot, client, params); + + if let Err(err) = &result { + tracing::error!("An error occurred with request ID {id}: {err}"); + client.show_error_message("ty encountered a problem. Check the logs for more details."); + } + + client.respond(id, result); + } } /// A request handler that can be run on a background thread. @@ -106,11 +130,34 @@ pub(super) trait BackgroundDocumentRequestHandler: RetriableRequestHandler { /// operations that require access to the entire session state, such as fetching workspace /// diagnostics. pub(super) trait BackgroundRequestHandler: RetriableRequestHandler { + /// Processes the request parameters and returns the LSP request result. + /// + /// This is the main method that handlers implement. It takes the request parameters + /// from the client and computes the appropriate response data for the LSP request. fn run( - snapshot: SessionSnapshot, + snapshot: &SessionSnapshot, client: &Client, params: <::RequestType as Request>::Params, ) -> super::Result<<::RequestType as Request>::Result>; + + /// Handles the request lifecycle and sends the response to the client. + /// + /// It allows handlers to customize how the server sends the response to the client. + fn handle_request( + id: &RequestId, + snapshot: SessionSnapshot, + client: &Client, + params: <::RequestType as Request>::Params, + ) { + let result = Self::run(&snapshot, client, params); + + if let Err(err) = &result { + tracing::error!("An error occurred with request ID {id}: {err}"); + client.show_error_message("ty encountered a problem. Check the logs for more details."); + } + + client.respond(id, result); + } } /// A supertrait for any server notification handler. diff --git a/crates/ty_server/src/server/main_loop.rs b/crates/ty_server/src/server/main_loop.rs index 321cbb95e5..4363147a1c 100644 --- a/crates/ty_server/src/server/main_loop.rs +++ b/crates/ty_server/src/server/main_loop.rs @@ -1,7 +1,7 @@ use crate::server::schedule::Scheduler; use crate::server::{Server, api}; -use crate::session::ClientOptions; use crate::session::client::{Client, ClientResponseHandler}; +use crate::session::{ClientOptions, SuspendedWorkspaceDiagnosticRequest}; use anyhow::anyhow; use crossbeam::select; use lsp_server::Message; @@ -49,7 +49,8 @@ impl Server { if self.session.is_shutdown_requested() { tracing::warn!( - "Received request after server shutdown was requested, discarding" + "Received request `{}` after server shutdown was requested, discarding", + &req.method ); client.respond_err( req.id, @@ -130,7 +131,8 @@ impl Server { .incoming() .is_pending(&request.id) { - api::request(request); + let task = api::request(request); + scheduler.dispatch(task, &mut self.session, client); } else { tracing::debug!( "Request {}/{} was cancelled, not retrying", @@ -142,6 +144,13 @@ impl Server { Action::SendRequest(request) => client.send_request_raw(&self.session, request), + Action::SuspendWorkspaceDiagnostics(suspended_request) => { + self.session.set_suspended_workspace_diagnostics_request( + *suspended_request, + &client, + ); + } + Action::InitializeWorkspaces(workspaces_with_options) => { self.session .initialize_workspaces(workspaces_with_options, &client); @@ -304,6 +313,8 @@ pub(crate) enum Action { /// Send a request from the server to the client. SendRequest(SendRequest), + SuspendWorkspaceDiagnostics(Box), + /// Initialize the workspace after the server received /// the options from the client. InitializeWorkspaces(Vec<(Url, ClientOptions)>), diff --git a/crates/ty_server/src/server/schedule/task.rs b/crates/ty_server/src/server/schedule/task.rs index 3d4ddb3123..220e1f222a 100644 --- a/crates/ty_server/src/server/schedule/task.rs +++ b/crates/ty_server/src/server/schedule/task.rs @@ -34,6 +34,7 @@ pub(in crate::server) enum BackgroundSchedule { /// while local tasks have exclusive access and can modify it as they please. Keep in mind that /// local tasks will **block** the main event loop, so only use local tasks if you **need** /// mutable state access or you need the absolute lowest latency possible. +#[must_use] pub(in crate::server) enum Task { Background(BackgroundTaskBuilder), Sync(SyncTask), diff --git a/crates/ty_server/src/session.rs b/crates/ty_server/src/session.rs index 239cba83b3..443d71e836 100644 --- a/crates/ty_server/src/session.rs +++ b/crates/ty_server/src/session.rs @@ -2,9 +2,9 @@ use anyhow::{Context, anyhow}; use index::DocumentQueryError; -use lsp_server::Message; +use lsp_server::{Message, RequestId}; use lsp_types::notification::{Exit, Notification}; -use lsp_types::request::{Request, Shutdown}; +use lsp_types::request::{Request, Shutdown, WorkspaceDiagnosticRequest}; use lsp_types::{ClientCapabilities, TextDocumentContentChangeEvent, Url}; use options::GlobalOptions; use ruff_db::Db; @@ -24,7 +24,7 @@ pub(crate) use self::options::AllOptions; pub use self::options::{ClientOptions, DiagnosticMode}; pub(crate) use self::settings::ClientSettings; use crate::document::{DocumentKey, DocumentVersion, NotebookDocument}; -use crate::server::publish_settings_diagnostics; +use crate::server::{Action, publish_settings_diagnostics}; use crate::session::client::Client; use crate::session::request_queue::RequestQueue; use crate::system::{AnySystemPath, LSPSystem}; @@ -81,6 +81,16 @@ pub(crate) struct Session { in_test: bool, deferred_messages: VecDeque, + + /// A revision counter. It gets incremented on every change to `Session` that + /// could result in different workspace diagnostics. + revision: u64, + + /// A pending workspace diagnostics request because there were no diagnostics + /// or no changes when when the request ran last time. + /// We'll re-run the request after every change to `Session` (see `revision`) + /// to see if there are now changes and, if so, respond to the client. + suspended_workspace_diagnostics_request: Option, } /// LSP State for a Project @@ -137,6 +147,8 @@ impl Session { request_queue: RequestQueue::new(), shutdown_requested: false, in_test, + suspended_workspace_diagnostics_request: None, + revision: 0, }) } @@ -155,6 +167,56 @@ impl Session { self.shutdown_requested = requested; } + pub(crate) fn set_suspended_workspace_diagnostics_request( + &mut self, + request: SuspendedWorkspaceDiagnosticRequest, + client: &Client, + ) { + self.suspended_workspace_diagnostics_request = Some(request); + // Run the suspended workspace diagnostic request immediately in case there + // were changes since the workspace diagnostics background thread queued + // the action to suspend the workspace diagnostic request. + self.resume_suspended_workspace_diagnostic_request(client); + } + + pub(crate) fn take_suspended_workspace_diagnostic_request( + &mut self, + ) -> Option { + self.suspended_workspace_diagnostics_request.take() + } + + /// Resumes (retries) the workspace diagnostic request if there + /// were any changes to the [`Session`] (the revision got bumped) + /// since the workspace diagnostic request ran last time. + /// + /// The workspace diagnostic requests is ignored if the request + /// was cancelled in the meantime. + pub(crate) fn resume_suspended_workspace_diagnostic_request(&mut self, client: &Client) { + self.suspended_workspace_diagnostics_request = self + .suspended_workspace_diagnostics_request + .take() + .and_then(|request| { + if !self.request_queue.incoming().is_pending(&request.id) { + // Clear out the suspended request if the request has been cancelled. + tracing::debug!("Skipping suspended workspace diagnostics request `{}` because it was cancelled", request.id); + return None; + } + + request.resume_if_revision_changed(self.revision, client) + }); + } + + /// Bumps the revision. + /// + /// The revision is used to track when workspace diagnostics may have changed and need to be re-run. + /// It's okay if a bump doesn't necessarily result in new workspace diagnostics. + /// + /// In general, any change to a project database should bump the revision and so should + /// any change to the document states (but also when the open workspaces change etc.). + fn bump_revision(&mut self) { + self.revision += 1; + } + /// The LSP specification doesn't allow configuration requests during initialization, /// but we need access to the configuration to resolve the settings in turn to create the /// project databases. This will become more important in the future when we support @@ -318,6 +380,8 @@ impl Session { .cloned() }); + self.bump_revision(); + self.project_db_mut(path) .apply_changes(changes, overrides.as_ref()) } @@ -465,6 +529,7 @@ impl Session { position_encoding: self.position_encoding, in_test: self.in_test, resolved_client_capabilities: self.resolved_client_capabilities, + revision: self.revision, } } @@ -483,12 +548,14 @@ impl Session { document: NotebookDocument, ) { self.index_mut().open_notebook_document(path, document); + self.bump_revision(); } /// Registers a text document at the provided `path`. /// If a document is already open here, it will be overwritten. pub(crate) fn open_text_document(&mut self, path: &AnySystemPath, document: TextDocument) { self.index_mut().open_text_document(path, document); + self.bump_revision(); } /// Updates a text document at the associated `key`. @@ -501,8 +568,14 @@ impl Session { new_version: DocumentVersion, ) -> crate::Result<()> { let position_encoding = self.position_encoding; - self.index_mut() - .update_text_document(key, content_changes, new_version, position_encoding) + self.index_mut().update_text_document( + key, + content_changes, + new_version, + position_encoding, + )?; + self.bump_revision(); + Ok(()) } /// De-registers a document, specified by its key. @@ -656,6 +729,7 @@ pub(crate) struct SessionSnapshot { position_encoding: PositionEncoding, resolved_client_capabilities: ResolvedClientCapabilities, in_test: bool, + revision: u64, /// IMPORTANT: It's important that the databases come last, or at least, /// after any `Arc` that we try to extract or mutate in-place using `Arc::into_inner` @@ -689,6 +763,10 @@ impl SessionSnapshot { pub(crate) const fn in_test(&self) -> bool { self.in_test } + + pub(crate) fn revision(&self) -> u64 { + self.revision + } } #[derive(Debug, Default)] @@ -847,3 +925,43 @@ impl DefaultProject { self.0.get_mut() } } + +/// A workspace diagnostic request that didn't yield any changes or diagnostic +/// when it ran the last time. +#[derive(Debug)] +pub(crate) struct SuspendedWorkspaceDiagnosticRequest { + /// The LSP request id + pub(crate) id: RequestId, + + /// The params passed to the `workspace/diagnostic` request. + pub(crate) params: serde_json::Value, + + /// The session's revision when the request ran the last time. + /// + /// This is to prevent races between: + /// * The background thread completes + /// * A did change notification coming in + /// * storing this struct on `Session` + /// + /// The revision helps us detect that a did change notification + /// happened in the meantime, so that we can reschedule the + /// workspace diagnostic request immediately. + pub(crate) revision: u64, +} + +impl SuspendedWorkspaceDiagnosticRequest { + fn resume_if_revision_changed(self, current_revision: u64, client: &Client) -> Option { + if self.revision == current_revision { + return Some(self); + } + + tracing::debug!("Resuming workspace diagnostics request after revision bump"); + client.queue_action(Action::RetryRequest(lsp_server::Request { + id: self.id, + method: WorkspaceDiagnosticRequest::METHOD.to_string(), + params: self.params, + })); + + None + } +} diff --git a/crates/ty_server/tests/e2e/main.rs b/crates/ty_server/tests/e2e/main.rs index c2a6fc7c82..50bced9b1a 100644 --- a/crates/ty_server/tests/e2e/main.rs +++ b/crates/ty_server/tests/e2e/main.rs @@ -56,7 +56,7 @@ use lsp_types::{ DidChangeTextDocumentParams, DidChangeWatchedFilesClientCapabilities, DidChangeWatchedFilesParams, DidCloseTextDocumentParams, DidOpenTextDocumentParams, DocumentDiagnosticParams, DocumentDiagnosticReportResult, FileEvent, InitializeParams, - InitializeResult, InitializedParams, PartialResultParams, PreviousResultId, + InitializeResult, InitializedParams, NumberOrString, PartialResultParams, PreviousResultId, PublishDiagnosticsClientCapabilities, TextDocumentClientCapabilities, TextDocumentContentChangeEvent, TextDocumentIdentifier, TextDocumentItem, Url, VersionedTextDocumentIdentifier, WorkDoneProgressParams, WorkspaceClientCapabilities, @@ -151,6 +151,10 @@ pub(crate) struct TestServer { /// Capabilities registered by the server registered_capabilities: Vec, + + /// Whether a Shutdown request has been sent by the test + /// and the exit sequence should be skipped during `Drop` + shutdown_requested: bool, } impl TestServer { @@ -207,6 +211,7 @@ impl TestServer { initialize_response: None, workspace_configurations, registered_capabilities: Vec::new(), + shutdown_requested: false, } .initialize(workspace_folders, capabilities, initialization_options) } @@ -229,7 +234,7 @@ impl TestServer { }; let init_request_id = self.send_request::(init_params); - self.initialize_response = Some(self.await_response::(init_request_id)?); + self.initialize_response = Some(self.await_response::(&init_request_id)?); self.send_notification::(InitializedParams {}); Ok(self) @@ -330,6 +335,11 @@ impl TestServer { where R: Request, { + // Track if an Exit notification is being sent + if R::METHOD == lsp_types::request::Shutdown::METHOD { + self.shutdown_requested = true; + } + let id = self.next_request_id(); let request = lsp_server::Request::new(id.clone(), R::METHOD.to_string(), params); self.send(Message::Request(request)); @@ -354,9 +364,9 @@ impl TestServer { /// called once per request ID. /// /// [`send_request`]: TestServer::send_request - pub(crate) fn await_response(&mut self, id: RequestId) -> Result { + pub(crate) fn await_response(&mut self, id: &RequestId) -> Result { loop { - if let Some(response) = self.responses.remove(&id) { + if let Some(response) = self.responses.remove(id) { match response { Response { error: None, @@ -373,7 +383,11 @@ impl TestServer { return Err(TestServerError::ResponseError(err).into()); } response => { - return Err(TestServerError::InvalidResponse(id, Box::new(response)).into()); + return Err(TestServerError::InvalidResponse( + id.clone(), + Box::new(response), + ) + .into()); } } } @@ -531,6 +545,16 @@ impl TestServer { panic!("Server dropped client receiver while still running"); } + pub(crate) fn cancel(&mut self, request_id: &RequestId) { + let id_string = request_id.to_string(); + self.send_notification::(lsp_types::CancelParams { + id: match id_string.parse() { + Ok(id) => NumberOrString::Number(id), + Err(_) => NumberOrString::String(id_string), + }, + }); + } + /// Handle workspace configuration requests from the server. /// /// Use the [`get_request`] method to wait for the server to send this request. @@ -652,7 +676,7 @@ impl TestServer { partial_result_params: PartialResultParams::default(), }; let id = self.send_request::(params); - self.await_response::(id) + self.await_response::(&id) } /// Send a `workspace/diagnostic` request with optional previous result IDs. @@ -672,7 +696,7 @@ impl TestServer { }; let id = self.send_request::(params); - self.await_response::(id) + self.await_response::(&id) } } @@ -699,9 +723,9 @@ impl Drop for TestServer { // // The `server_thread` could be `None` if the server exited unexpectedly or panicked or if // it dropped the client connection. - let shutdown_error = if self.server_thread.is_some() { + let shutdown_error = if self.server_thread.is_some() && !self.shutdown_requested { let shutdown_id = self.send_request::(()); - match self.await_response::<()>(shutdown_id) { + match self.await_response::<()>(&shutdown_id) { Ok(()) => { self.send_notification::(()); None diff --git a/crates/ty_server/tests/e2e/pull_diagnostics.rs b/crates/ty_server/tests/e2e/pull_diagnostics.rs index 3cc9fb5844..8e18e9fea0 100644 --- a/crates/ty_server/tests/e2e/pull_diagnostics.rs +++ b/crates/ty_server/tests/e2e/pull_diagnostics.rs @@ -8,7 +8,7 @@ use lsp_types::{ use ruff_db::system::SystemPath; use ty_server::{ClientOptions, DiagnosticMode, PartialWorkspaceProgress}; -use crate::{TestServer, TestServerBuilder}; +use crate::{TestServer, TestServerBuilder, TestServerError}; #[test] fn on_did_open() -> Result<()> { @@ -31,7 +31,7 @@ def foo() -> str: server.open_text_document(foo, &foo_content, 1); let diagnostics = server.document_diagnostic_request(foo, None)?; - insta::assert_debug_snapshot!(diagnostics); + assert_debug_snapshot!(diagnostics); Ok(()) } @@ -239,32 +239,13 @@ def foo() -> str: let mut first_response = server.workspace_diagnostic_request(None)?; sort_workspace_diagnostic_response(&mut first_response); - insta::assert_debug_snapshot!("workspace_diagnostic_initial_state", first_response); + assert_debug_snapshot!("workspace_diagnostic_initial_state", first_response); // Consume all progress notifications sent during workspace diagnostics consume_all_progress_notifications(&mut server)?; // Extract result IDs from the first response - let previous_result_ids = match first_response { - WorkspaceDiagnosticReportResult::Report(report) => { - report.items.into_iter().filter_map(|item| match item { - WorkspaceDocumentDiagnosticReport::Full(full_report) => { - let result_id = full_report.full_document_diagnostic_report.result_id?; - Some(PreviousResultId { - uri: full_report.uri, - value: result_id, - }) - } - WorkspaceDocumentDiagnosticReport::Unchanged(_) => { - panic!("The first response must be a full report, not unchanged"); - } - }) - } - WorkspaceDiagnosticReportResult::Partial(_) => { - panic!("The first response must be a full report"); - } - } - .collect(); + let previous_result_ids = extract_result_ids_from_response(&first_response); // Make changes to files B, C, D, and E (leave A unchanged) // Need to open files before changing them @@ -330,7 +311,7 @@ def foo() -> str: // Consume all progress notifications sent during the second workspace diagnostics consume_all_progress_notifications(&mut server)?; - insta::assert_debug_snapshot!("workspace_diagnostic_after_changes", second_response); + assert_debug_snapshot!("workspace_diagnostic_after_changes", second_response); Ok(()) } @@ -426,7 +407,7 @@ def foo() -> str: // First, read the response of the workspace diagnostic request. // Note: This response comes after the progress notifications but it simplifies the test to read it first. - let final_response = server.await_response::(request_id)?; + let final_response = server.await_response::(&request_id)?; // Process the final report. // This should always be a partial report. However, the type definition in the LSP specification @@ -509,27 +490,7 @@ fn workspace_diagnostic_streaming_with_caching() -> Result<()> { // Consume progress notifications from first request consume_all_progress_notifications(&mut server)?; - let result_ids = match first_response { - WorkspaceDiagnosticReportResult::Report(report) => report - .items - .into_iter() - .filter_map(|item| { - if let WorkspaceDocumentDiagnosticReport::Full(full) = item { - full.full_document_diagnostic_report - .result_id - .map(|id| PreviousResultId { - uri: full.uri, - value: id, - }) - } else { - panic!("Expected Full report in initial response"); - } - }) - .collect::>(), - WorkspaceDiagnosticReportResult::Partial(_) => { - panic!("Request without a partial response token should not use streaming") - } - }; + let result_ids = extract_result_ids_from_response(&first_response); assert_eq!(result_ids.len(), NUM_FILES); @@ -578,7 +539,7 @@ fn workspace_diagnostic_streaming_with_caching() -> Result<()> { }, }); - let final_response2 = server.await_response::(request2_id)?; + let final_response2 = server.await_response::(&request2_id)?; let mut all_items = Vec::new(); @@ -641,3 +602,344 @@ fn sort_workspace_report_items(items: &mut [WorkspaceDocumentDiagnosticReport]) items.sort_unstable_by(|a, b| item_uri(a).cmp(item_uri(b))); } + +/// The LSP specification requires that the server sends a response for every request. +/// +/// This test verifies that the server responds to a long-polling workspace diagnostic request +/// when the server is shut down. +#[test] +fn workspace_diagnostic_long_polling_responds_on_shutdown() -> Result<()> { + let _filter = filter_result_id(); + + let workspace_root = SystemPath::new("src"); + let file_path = SystemPath::new("src/test.py"); + let file_content = "\ +def hello() -> str: + return \"world\" +"; + + // Create a project with one file but no diagnostics + let mut server = create_workspace_server_with_file(workspace_root, file_path, file_content)?; + + // Make a workspace diagnostic request to a project with one file but no diagnostics + // This should trigger long-polling since the project has no diagnostics + let request_id = send_workspace_diagnostic_request(&mut server); + + assert_workspace_diagnostics_suspends_for_long_polling(&mut server, &request_id); + + // Send shutdown request - this should cause the suspended workspace diagnostic request to respond + let shutdown_id = server.send_request::(()); + + // The workspace diagnostic request should now respond with an empty report + let workspace_response = + server.await_response::(&request_id)?; + + // Complete shutdown sequence + server.await_response::<()>(&shutdown_id)?; + server.send_notification::(()); + + // Verify we got an empty report (default response during shutdown) + assert_debug_snapshot!( + "workspace_diagnostic_long_polling_shutdown_response", + workspace_response + ); + + Ok(()) +} + +/// Tests that the server responds to a long-polling workspace diagnostic request +/// after a change introduced a new diagnostic. +#[test] +fn workspace_diagnostic_long_polling_responds_on_change() -> Result<()> { + let _filter = filter_result_id(); + + let workspace_root = SystemPath::new("src"); + let file_path = SystemPath::new("src/test.py"); + let file_content_no_error = "\ +def hello() -> str: + return \"world\" +"; + let file_content_with_error = "\ +def hello() -> str: + return 42 # Type error: expected str, got int +"; + + // Create a project with one file but no diagnostics + let mut server = + create_workspace_server_with_file(workspace_root, file_path, file_content_no_error)?; + + // Open the file first + server.open_text_document(file_path, &file_content_no_error, 1); + + // Make a workspace diagnostic request to a project with one file but no diagnostics + // This should trigger long-polling since the project has no diagnostics + let request_id = send_workspace_diagnostic_request(&mut server); + + // Verify the request doesn't complete immediately (should be long-polling) + assert_workspace_diagnostics_suspends_for_long_polling(&mut server, &request_id); + + // Now introduce an error to the file - this should trigger the long-polling request to complete + server.change_text_document( + file_path, + vec![lsp_types::TextDocumentContentChangeEvent { + range: None, + range_length: None, + text: file_content_with_error.to_string(), + }], + 2, + ); + + // The workspace diagnostic request should now complete with the new diagnostic + let workspace_response = + server.await_response::(&request_id)?; + + // Verify we got a report with one file containing the new diagnostic + assert_debug_snapshot!( + "workspace_diagnostic_long_polling_change_response", + workspace_response + ); + + Ok(()) +} + +/// The LSP specification requires that the server responds to each request with exactly one response. +/// +/// This test verifies that the server sends one response (and not two) if a long polling workspace diagnostic request +/// is cancelled. +#[test] +fn workspace_diagnostic_long_polling_responds_on_cancellation() -> Result<()> { + let _filter = filter_result_id(); + + let workspace_root = SystemPath::new("src"); + let file_path = SystemPath::new("src/test.py"); + let file_content = "\ +def hello() -> str: + return \"world\" +"; + + // Create a project with one file but no diagnostics + let mut server = create_workspace_server_with_file(workspace_root, file_path, file_content)?; + + // Make a workspace diagnostic request to a project with one file but no diagnostics + // This should trigger long-polling since the project has no diagnostics + let request_id = send_workspace_diagnostic_request(&mut server); + + // Verify the request doesn't complete immediately (should be long-polling) + assert_workspace_diagnostics_suspends_for_long_polling(&mut server, &request_id); + + // Send a cancel request notification for the suspended request + // The request_id from send_request should match the ID that the server expects + // Based on logs, the server shows request id=2, so let's try using that directly + server.cancel(&request_id); + + // The workspace diagnostic request should now respond with a cancellation response (Err). + let result = server.await_response::(&request_id); + assert_debug_snapshot!( + "workspace_diagnostic_long_polling_cancellation_result", + result + ); + + // The test server's drop implementation asserts that we aren't sending the response twice. + + Ok(()) +} + +/// This test verifies an entire workspace diagnostic cycle with long-polling: +/// * Initial suspend with no diagnostics +/// * Change the file to introduce a diagnostic, server should respond with the new diagnostics +/// * Send a second workspace diagnostic request, which should suspend again because the diagnostics haven't changed +/// * Change the file again to fix the diagnostic, server should respond with no diagnostics +#[test] +fn workspace_diagnostic_long_polling_suspend_change_suspend_cycle() -> Result<()> { + let _filter = filter_result_id(); + + let workspace_root = SystemPath::new("src"); + let file_path = SystemPath::new("src/test.py"); + let file_content_no_error = "\ +def hello() -> str: + return \"world\" +"; + let file_content_with_error = "\ +def hello() -> str: + return 42 # Type error: expected str, got int +"; + let file_content_fixed = "\ +def hello() -> str: + return \"fixed\" +"; + + // Create a project with one file but no diagnostics + let mut server = + create_workspace_server_with_file(workspace_root, file_path, file_content_no_error)?; + + // Open the file first + server.open_text_document(file_path, &file_content_no_error, 1); + + // PHASE 1: Initial suspend (no diagnostics) + let request_id_1 = send_workspace_diagnostic_request(&mut server); + assert_workspace_diagnostics_suspends_for_long_polling(&mut server, &request_id_1); + + // PHASE 2: Introduce error to trigger response + server.change_text_document( + file_path, + vec![lsp_types::TextDocumentContentChangeEvent { + range: None, + range_length: None, + text: file_content_with_error.to_string(), + }], + 2, + ); + + // First request should complete with diagnostics + let first_response = server.await_response::(&request_id_1)?; + + // Extract result IDs from the first response for the second request + let previous_result_ids = extract_result_ids_from_response(&first_response); + + // PHASE 3: Second request with previous result IDs - should suspend again since diagnostics unchanged + let request_id_2 = + server.send_request::(WorkspaceDiagnosticParams { + identifier: None, + previous_result_ids, + work_done_progress_params: WorkDoneProgressParams { + work_done_token: None, + }, + partial_result_params: PartialResultParams { + partial_result_token: None, + }, + }); + + // Second request should suspend since diagnostics haven't changed + assert_workspace_diagnostics_suspends_for_long_polling(&mut server, &request_id_2); + + // PHASE 4: Fix the error to trigger the second response + server.change_text_document( + file_path, + vec![lsp_types::TextDocumentContentChangeEvent { + range: None, + range_length: None, + text: file_content_fixed.to_string(), + }], + 3, + ); + + // Second request should complete with the fix (no diagnostics) + let second_response = + server.await_response::(&request_id_2)?; + + // Snapshot both responses to verify the full cycle + assert_debug_snapshot!( + "workspace_diagnostic_suspend_change_suspend_first_response", + first_response + ); + assert_debug_snapshot!( + "workspace_diagnostic_suspend_change_suspend_second_response", + second_response + ); + + Ok(()) +} + +// Helper functions for long-polling tests +fn create_workspace_server_with_file( + workspace_root: &SystemPath, + file_path: &SystemPath, + file_content: &str, +) -> Result { + let global_options = ClientOptions::default().with_diagnostic_mode(DiagnosticMode::Workspace); + + TestServerBuilder::new()? + .with_workspace(workspace_root, global_options.clone())? + .with_file(file_path, file_content)? + .with_initialization_options(global_options) + .enable_pull_diagnostics(true) + .build()? + .wait_until_workspaces_are_initialized() +} + +/// Sends a workspace diagnostic request to the server. +/// +/// Unlike [`TestServer::workspace_diagnostic_request`], this function does not wait for the response. +fn send_workspace_diagnostic_request(server: &mut TestServer) -> lsp_server::RequestId { + server.send_request::(WorkspaceDiagnosticParams { + identifier: None, + previous_result_ids: Vec::new(), + work_done_progress_params: WorkDoneProgressParams { + work_done_token: None, + }, + partial_result_params: PartialResultParams { + partial_result_token: None, + }, + }) +} + +#[track_caller] +fn assert_workspace_diagnostics_suspends_for_long_polling( + server: &mut TestServer, + request_id: &lsp_server::RequestId, +) { + match server.await_response::(request_id) { + Ok(_) => { + panic!("Expected workspace diagnostic request to suspend for long-polling."); + } + Err(error) => { + if let Some(test_error) = error.downcast_ref::() { + assert!( + matches!(test_error, TestServerError::RecvTimeoutError(_)), + "Response should time out because the request is suspended for long-polling" + ); + } else { + panic!("Unexpected error type: {error:?}"); + } + } + } +} + +fn extract_result_ids_from_response( + response: &WorkspaceDiagnosticReportResult, +) -> Vec { + match response { + WorkspaceDiagnosticReportResult::Report(report) => { + report + .items + .iter() + .filter_map(|item| match item { + WorkspaceDocumentDiagnosticReport::Full(full_report) => { + let result_id = full_report + .full_document_diagnostic_report + .result_id + .as_ref()?; + Some(PreviousResultId { + uri: full_report.uri.clone(), + value: result_id.clone(), + }) + } + WorkspaceDocumentDiagnosticReport::Unchanged(_) => { + // Unchanged reports don't provide new result IDs + None + } + }) + .collect() + } + WorkspaceDiagnosticReportResult::Partial(partial) => { + // For partial results, extract from items the same way + partial + .items + .iter() + .filter_map(|item| match item { + WorkspaceDocumentDiagnosticReport::Full(full_report) => { + let result_id = full_report + .full_document_diagnostic_report + .result_id + .as_ref()?; + Some(PreviousResultId { + uri: full_report.uri.clone(), + value: result_id.clone(), + }) + } + WorkspaceDocumentDiagnosticReport::Unchanged(_) => None, + }) + .collect() + } + } +} diff --git a/crates/ty_server/tests/e2e/snapshots/e2e__pull_diagnostics__workspace_diagnostic_long_polling_cancellation_result.snap b/crates/ty_server/tests/e2e/snapshots/e2e__pull_diagnostics__workspace_diagnostic_long_polling_cancellation_result.snap new file mode 100644 index 0000000000..a40dba6e86 --- /dev/null +++ b/crates/ty_server/tests/e2e/snapshots/e2e__pull_diagnostics__workspace_diagnostic_long_polling_cancellation_result.snap @@ -0,0 +1,13 @@ +--- +source: crates/ty_server/tests/e2e/pull_diagnostics.rs +expression: result +--- +Err( + ResponseError( + ResponseError { + code: -32800, + message: "request was cancelled by client", + data: None, + }, + ), +) diff --git a/crates/ty_server/tests/e2e/snapshots/e2e__pull_diagnostics__workspace_diagnostic_long_polling_change_response.snap b/crates/ty_server/tests/e2e/snapshots/e2e__pull_diagnostics__workspace_diagnostic_long_polling_change_response.snap new file mode 100644 index 0000000000..e2f12e261c --- /dev/null +++ b/crates/ty_server/tests/e2e/snapshots/e2e__pull_diagnostics__workspace_diagnostic_long_polling_change_response.snap @@ -0,0 +1,112 @@ +--- +source: crates/ty_server/tests/e2e/pull_diagnostics.rs +expression: workspace_response +--- +Report( + WorkspaceDiagnosticReport { + items: [ + Full( + WorkspaceFullDocumentDiagnosticReport { + uri: Url { + scheme: "file", + cannot_be_a_base: false, + username: "", + password: None, + host: None, + port: None, + path: "/src/test.py", + query: None, + fragment: None, + }, + version: Some( + 2, + ), + full_document_diagnostic_report: FullDocumentDiagnosticReport { + result_id: Some( + "[RESULT_ID]", + ), + items: [ + Diagnostic { + range: Range { + start: Position { + line: 1, + character: 11, + }, + end: Position { + line: 1, + character: 13, + }, + }, + severity: Some( + Error, + ), + code: Some( + String( + "invalid-return-type", + ), + ), + code_description: Some( + CodeDescription { + href: Url { + scheme: "https", + cannot_be_a_base: false, + username: "", + password: None, + host: Some( + Domain( + "ty.dev", + ), + ), + port: None, + path: "/rules", + query: None, + fragment: Some( + "invalid-return-type", + ), + }, + }, + ), + source: Some( + "ty", + ), + message: "Return type does not match returned value: expected `str`, found `Literal[42]`", + related_information: Some( + [ + DiagnosticRelatedInformation { + location: Location { + uri: Url { + scheme: "file", + cannot_be_a_base: false, + username: "", + password: None, + host: None, + port: None, + path: "/src/test.py", + query: None, + fragment: None, + }, + range: Range { + start: Position { + line: 0, + character: 15, + }, + end: Position { + line: 0, + character: 18, + }, + }, + }, + message: "Expected `str` because of return type", + }, + ], + ), + tags: None, + data: None, + }, + ], + }, + }, + ), + ], + }, +) diff --git a/crates/ty_server/tests/e2e/snapshots/e2e__pull_diagnostics__workspace_diagnostic_long_polling_shutdown_response.snap b/crates/ty_server/tests/e2e/snapshots/e2e__pull_diagnostics__workspace_diagnostic_long_polling_shutdown_response.snap new file mode 100644 index 0000000000..fb97ba434a --- /dev/null +++ b/crates/ty_server/tests/e2e/snapshots/e2e__pull_diagnostics__workspace_diagnostic_long_polling_shutdown_response.snap @@ -0,0 +1,9 @@ +--- +source: crates/ty_server/tests/e2e/pull_diagnostics.rs +expression: workspace_response +--- +Report( + WorkspaceDiagnosticReport { + items: [], + }, +) diff --git a/crates/ty_server/tests/e2e/snapshots/e2e__pull_diagnostics__workspace_diagnostic_suspend_change_suspend_first_response.snap b/crates/ty_server/tests/e2e/snapshots/e2e__pull_diagnostics__workspace_diagnostic_suspend_change_suspend_first_response.snap new file mode 100644 index 0000000000..6f38e6bc19 --- /dev/null +++ b/crates/ty_server/tests/e2e/snapshots/e2e__pull_diagnostics__workspace_diagnostic_suspend_change_suspend_first_response.snap @@ -0,0 +1,112 @@ +--- +source: crates/ty_server/tests/e2e/pull_diagnostics.rs +expression: first_response +--- +Report( + WorkspaceDiagnosticReport { + items: [ + Full( + WorkspaceFullDocumentDiagnosticReport { + uri: Url { + scheme: "file", + cannot_be_a_base: false, + username: "", + password: None, + host: None, + port: None, + path: "/src/test.py", + query: None, + fragment: None, + }, + version: Some( + 2, + ), + full_document_diagnostic_report: FullDocumentDiagnosticReport { + result_id: Some( + "[RESULT_ID]", + ), + items: [ + Diagnostic { + range: Range { + start: Position { + line: 1, + character: 11, + }, + end: Position { + line: 1, + character: 13, + }, + }, + severity: Some( + Error, + ), + code: Some( + String( + "invalid-return-type", + ), + ), + code_description: Some( + CodeDescription { + href: Url { + scheme: "https", + cannot_be_a_base: false, + username: "", + password: None, + host: Some( + Domain( + "ty.dev", + ), + ), + port: None, + path: "/rules", + query: None, + fragment: Some( + "invalid-return-type", + ), + }, + }, + ), + source: Some( + "ty", + ), + message: "Return type does not match returned value: expected `str`, found `Literal[42]`", + related_information: Some( + [ + DiagnosticRelatedInformation { + location: Location { + uri: Url { + scheme: "file", + cannot_be_a_base: false, + username: "", + password: None, + host: None, + port: None, + path: "/src/test.py", + query: None, + fragment: None, + }, + range: Range { + start: Position { + line: 0, + character: 15, + }, + end: Position { + line: 0, + character: 18, + }, + }, + }, + message: "Expected `str` because of return type", + }, + ], + ), + tags: None, + data: None, + }, + ], + }, + }, + ), + ], + }, +) diff --git a/crates/ty_server/tests/e2e/snapshots/e2e__pull_diagnostics__workspace_diagnostic_suspend_change_suspend_second_response.snap b/crates/ty_server/tests/e2e/snapshots/e2e__pull_diagnostics__workspace_diagnostic_suspend_change_suspend_second_response.snap new file mode 100644 index 0000000000..1276eabc94 --- /dev/null +++ b/crates/ty_server/tests/e2e/snapshots/e2e__pull_diagnostics__workspace_diagnostic_suspend_change_suspend_second_response.snap @@ -0,0 +1,32 @@ +--- +source: crates/ty_server/tests/e2e/pull_diagnostics.rs +expression: second_response +--- +Report( + WorkspaceDiagnosticReport { + items: [ + Full( + WorkspaceFullDocumentDiagnosticReport { + uri: Url { + scheme: "file", + cannot_be_a_base: false, + username: "", + password: None, + host: None, + port: None, + path: "/src/test.py", + query: None, + fragment: None, + }, + version: Some( + 3, + ), + full_document_diagnostic_report: FullDocumentDiagnosticReport { + result_id: None, + items: [], + }, + }, + ), + ], + }, +)