[ty] Implement long-polling for workspace diagnsotics (#19670)
Some checks are pending
CI / Determine changes (push) Waiting to run
CI / cargo fmt (push) Waiting to run
CI / cargo clippy (push) Blocked by required conditions
CI / cargo test (linux) (push) Blocked by required conditions
CI / cargo test (linux, release) (push) Blocked by required conditions
CI / cargo test (windows) (push) Blocked by required conditions
CI / cargo test (wasm) (push) Blocked by required conditions
CI / cargo build (release) (push) Waiting to run
CI / cargo build (msrv) (push) Blocked by required conditions
CI / cargo fuzz build (push) Blocked by required conditions
CI / fuzz parser (push) Blocked by required conditions
CI / test scripts (push) Blocked by required conditions
CI / mkdocs (push) Waiting to run
CI / ecosystem (push) Blocked by required conditions
CI / Fuzz for new ty panics (push) Blocked by required conditions
CI / cargo shear (push) Blocked by required conditions
CI / python package (push) Waiting to run
CI / pre-commit (push) Waiting to run
CI / formatter instabilities and black similarity (push) Blocked by required conditions
CI / test ruff-lsp (push) Blocked by required conditions
CI / check playground (push) Blocked by required conditions
CI / benchmarks-instrumented (push) Blocked by required conditions
CI / benchmarks-walltime (push) Blocked by required conditions
[ty Playground] Release / publish (push) Waiting to run

This commit is contained in:
Micha Reiser 2025-08-04 12:26:38 +02:00 committed by GitHub
parent 736c4ab05a
commit f473f6b6e5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 1138 additions and 201 deletions

View file

@ -33,6 +33,7 @@ pub(in crate::server) enum BackgroundSchedule {
/// while local tasks have exclusive access and can modify it as they please. Keep in mind that
/// local tasks will **block** the main event loop, so only use local tasks if you **need**
/// mutable state access or you need the absolute lowest latency possible.
#[must_use]
pub(in crate::server) enum Task {
Background(BackgroundTaskBuilder),
Sync(SyncTask),

View file

@ -219,13 +219,11 @@ where
return;
}
let result = ruff_db::panic::catch_unwind(|| {
if let Err(error) = ruff_db::panic::catch_unwind(|| {
let snapshot = snapshot;
R::run(snapshot.0, client, params)
});
if let Some(response) = request_result_to_response::<R>(&id, client, result, retry) {
respond::<R>(&id, response, client);
R::handle_request(&id, snapshot.0, client, params);
}) {
panic_response::<R>(&id, client, &error, retry);
}
})
}))
@ -288,58 +286,50 @@ where
return;
}
let result = ruff_db::panic::catch_unwind(|| {
R::run_with_snapshot(&db, snapshot, client, params)
});
if let Some(response) = request_result_to_response::<R>(&id, client, result, retry) {
respond::<R>(&id, response, client);
if let Err(error) = ruff_db::panic::catch_unwind(|| {
R::handle_request(&id, &db, snapshot, client, params);
}) {
panic_response::<R>(&id, client, &error, retry);
}
})
}))
}
fn request_result_to_response<R>(
fn panic_response<R>(
id: &RequestId,
client: &Client,
result: std::result::Result<
Result<<<R as RequestHandler>::RequestType as Request>::Result>,
PanicError,
>,
error: &PanicError,
request: Option<lsp_server::Request>,
) -> Option<Result<<<R as RequestHandler>::RequestType as Request>::Result>>
where
) where
R: traits::RetriableRequestHandler,
{
match result {
Ok(response) => Some(response),
Err(error) => {
// Check if the request was canceled due to some modifications to the salsa database.
if error.payload.downcast_ref::<salsa::Cancelled>().is_some() {
// If the query supports retry, re-queue the request.
// The query is still likely to succeed if the user modified any other document.
if let Some(request) = request {
tracing::trace!(
"request id={} method={} was cancelled by salsa, re-queueing for retry",
request.id,
request.method
);
client.retry(request);
} else {
tracing::trace!(
"request id={} was cancelled by salsa, sending content modified",
id
);
respond_silent_error(id.clone(), client, R::salsa_cancellation_error());
}
None
} else {
Some(Err(Error {
code: lsp_server::ErrorCode::InternalError,
error: anyhow!("request handler {error}"),
}))
}
// Check if the request was canceled due to some modifications to the salsa database.
if error.payload.downcast_ref::<salsa::Cancelled>().is_some() {
// If the query supports retry, re-queue the request.
// The query is still likely to succeed if the user modified any other document.
if let Some(request) = request {
tracing::trace!(
"request id={} method={} was cancelled by salsa, re-queueing for retry",
request.id,
request.method
);
client.retry(request);
} else {
tracing::trace!(
"request id={} was cancelled by salsa, sending content modified",
id
);
respond_silent_error(id.clone(), client, R::salsa_cancellation_error());
}
} else {
respond::<R>(
id,
Err(Error {
code: lsp_server::ErrorCode::InternalError,
error: anyhow!("request handler {error}"),
}),
client,
);
}
}
@ -352,7 +342,13 @@ fn sync_notification_task<N: traits::SyncNotificationHandler>(
if let Err(err) = N::run(session, client, params) {
tracing::error!("An error occurred while running {id}: {err}");
client.show_error_message("ty encountered a problem. Check the logs for more details.");
return;
}
// If there's a pending workspace diagnostic long-polling request,
// resume it, but only if the session revision changed (e.g. because some document changed).
session.resume_suspended_workspace_diagnostic_request(client);
}))
}

View file

@ -26,17 +26,29 @@ pub(super) struct Diagnostics<'a> {
}
impl Diagnostics<'_> {
pub(super) fn result_id_from_hash(diagnostics: &[ruff_db::diagnostic::Diagnostic]) -> String {
/// Computes the result ID for `diagnostics`.
///
/// Returns `None` if there are no diagnostics.
pub(super) fn result_id_from_hash(
diagnostics: &[ruff_db::diagnostic::Diagnostic],
) -> Option<String> {
if diagnostics.is_empty() {
return None;
}
// Generate result ID based on raw diagnostic content only
let mut hasher = DefaultHasher::new();
// Hash the length first to ensure different numbers of diagnostics produce different hashes
diagnostics.hash(&mut hasher);
format!("{:x}", hasher.finish())
Some(format!("{:x}", hasher.finish()))
}
pub(super) fn result_id(&self) -> String {
/// Computes the result ID for the diagnostics.
///
/// Returns `None` if there are no diagnostics.
pub(super) fn result_id(&self) -> Option<String> {
Self::result_id_from_hash(&self.items)
}

View file

@ -28,7 +28,7 @@ impl BackgroundDocumentRequestHandler for CompletionRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
snapshot: &DocumentSnapshot,
_client: &Client,
params: CompletionParams,
) -> crate::server::Result<Option<CompletionResponse>> {

View file

@ -29,11 +29,11 @@ impl BackgroundDocumentRequestHandler for DocumentDiagnosticRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
snapshot: &DocumentSnapshot,
_client: &Client,
params: DocumentDiagnosticParams,
) -> Result<DocumentDiagnosticReportResult> {
let diagnostics = compute_diagnostics(db, &snapshot);
let diagnostics = compute_diagnostics(db, snapshot);
let Some(diagnostics) = diagnostics else {
return Ok(DocumentDiagnosticReportResult::Report(
@ -43,23 +43,26 @@ impl BackgroundDocumentRequestHandler for DocumentDiagnosticRequestHandler {
let result_id = diagnostics.result_id();
let report = if params.previous_result_id.as_deref() == Some(&result_id) {
DocumentDiagnosticReport::Unchanged(RelatedUnchangedDocumentDiagnosticReport {
related_documents: None,
unchanged_document_diagnostic_report: UnchangedDocumentDiagnosticReport {
result_id,
},
})
} else {
DocumentDiagnosticReport::Full(RelatedFullDocumentDiagnosticReport {
related_documents: None,
full_document_diagnostic_report: FullDocumentDiagnosticReport {
result_id: Some(result_id),
// SAFETY: Pull diagnostic requests are only called for text documents, not for
// notebook documents.
items: diagnostics.to_lsp_diagnostics(db).expect_text_document(),
},
})
let report = match result_id {
Some(new_id) if Some(&new_id) == params.previous_result_id.as_ref() => {
DocumentDiagnosticReport::Unchanged(RelatedUnchangedDocumentDiagnosticReport {
related_documents: None,
unchanged_document_diagnostic_report: UnchangedDocumentDiagnosticReport {
result_id: new_id,
},
})
}
new_id => {
DocumentDiagnosticReport::Full(RelatedFullDocumentDiagnosticReport {
related_documents: None,
full_document_diagnostic_report: FullDocumentDiagnosticReport {
result_id: new_id,
// SAFETY: Pull diagnostic requests are only called for text documents, not for
// notebook documents.
items: diagnostics.to_lsp_diagnostics(db).expect_text_document(),
},
})
}
};
Ok(DocumentDiagnosticReportResult::Report(report))

View file

@ -26,7 +26,7 @@ impl BackgroundDocumentRequestHandler for DocumentHighlightRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
snapshot: &DocumentSnapshot,
_client: &Client,
params: DocumentHighlightParams,
) -> crate::server::Result<Option<Vec<DocumentHighlight>>> {

View file

@ -28,7 +28,7 @@ impl BackgroundDocumentRequestHandler for DocumentSymbolRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
snapshot: &DocumentSnapshot,
_client: &Client,
params: DocumentSymbolParams,
) -> crate::server::Result<Option<lsp_types::DocumentSymbolResponse>> {

View file

@ -26,7 +26,7 @@ impl BackgroundDocumentRequestHandler for GotoDeclarationRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
snapshot: &DocumentSnapshot,
_client: &Client,
params: GotoDeclarationParams,
) -> crate::server::Result<Option<GotoDefinitionResponse>> {

View file

@ -26,7 +26,7 @@ impl BackgroundDocumentRequestHandler for GotoDefinitionRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
snapshot: &DocumentSnapshot,
_client: &Client,
params: GotoDefinitionParams,
) -> crate::server::Result<Option<GotoDefinitionResponse>> {

View file

@ -26,7 +26,7 @@ impl BackgroundDocumentRequestHandler for ReferencesRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
snapshot: &DocumentSnapshot,
_client: &Client,
params: ReferenceParams,
) -> crate::server::Result<Option<Vec<Location>>> {

View file

@ -26,7 +26,7 @@ impl BackgroundDocumentRequestHandler for GotoTypeDefinitionRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
snapshot: &DocumentSnapshot,
_client: &Client,
params: GotoTypeDefinitionParams,
) -> crate::server::Result<Option<GotoDefinitionResponse>> {

View file

@ -26,7 +26,7 @@ impl BackgroundDocumentRequestHandler for HoverRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
snapshot: &DocumentSnapshot,
_client: &Client,
params: HoverParams,
) -> crate::server::Result<Option<lsp_types::Hover>> {

View file

@ -25,7 +25,7 @@ impl BackgroundDocumentRequestHandler for InlayHintRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
snapshot: &DocumentSnapshot,
_client: &Client,
params: InlayHintParams,
) -> crate::server::Result<Option<Vec<lsp_types::InlayHint>>> {

View file

@ -26,7 +26,7 @@ impl BackgroundDocumentRequestHandler for SelectionRangeRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
snapshot: &DocumentSnapshot,
_client: &Client,
params: SelectionRangeParams,
) -> crate::server::Result<Option<Vec<LspSelectionRange>>> {

View file

@ -22,7 +22,7 @@ impl BackgroundDocumentRequestHandler for SemanticTokensRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
snapshot: &DocumentSnapshot,
_client: &Client,
_params: SemanticTokensParams,
) -> crate::server::Result<Option<SemanticTokensResult>> {

View file

@ -24,7 +24,7 @@ impl BackgroundDocumentRequestHandler for SemanticTokensRangeRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
snapshot: &DocumentSnapshot,
_client: &Client,
params: SemanticTokensRangeParams,
) -> crate::server::Result<Option<SemanticTokensRangeResult>> {

View file

@ -2,6 +2,8 @@ use crate::Session;
use crate::server::api::traits::{RequestHandler, SyncRequestHandler};
use crate::session::client::Client;
use lsp_types::{WorkspaceDiagnosticReport, WorkspaceDiagnosticReportResult};
pub(crate) struct ShutdownHandler;
impl RequestHandler for ShutdownHandler {
@ -9,9 +11,23 @@ impl RequestHandler for ShutdownHandler {
}
impl SyncRequestHandler for ShutdownHandler {
fn run(session: &mut Session, _client: &Client, _params: ()) -> crate::server::Result<()> {
tracing::debug!("Received shutdown request, waiting for shutdown notification");
fn run(session: &mut Session, client: &Client, _params: ()) -> crate::server::Result<()> {
tracing::debug!("Received shutdown request, waiting for exit notification");
// Respond to any pending workspace diagnostic requests
if let Some(suspended_workspace_request) =
session.take_suspended_workspace_diagnostic_request()
{
client.respond(
&suspended_workspace_request.id,
Ok(WorkspaceDiagnosticReportResult::Report(
WorkspaceDiagnosticReport::default(),
)),
);
}
session.set_shutdown_requested(true);
Ok(())
}
}

View file

@ -28,7 +28,7 @@ impl BackgroundDocumentRequestHandler for SignatureHelpRequestHandler {
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
snapshot: &DocumentSnapshot,
_client: &Client,
params: SignatureHelpParams,
) -> crate::server::Result<Option<SignatureHelp>> {

View file

@ -1,14 +1,15 @@
use crate::PositionEncoding;
use crate::server::Result;
use crate::server::api::diagnostics::{Diagnostics, to_lsp_diagnostic};
use crate::server::api::traits::{
BackgroundRequestHandler, RequestHandler, RetriableRequestHandler,
};
use crate::server::lazy_work_done_progress::LazyWorkDoneProgress;
use crate::session::SessionSnapshot;
use crate::server::{Action, Result};
use crate::session::client::Client;
use crate::session::index::Index;
use crate::session::{SessionSnapshot, SuspendedWorkspaceDiagnosticRequest};
use crate::system::file_to_url;
use lsp_server::RequestId;
use lsp_types::request::WorkspaceDiagnosticRequest;
use lsp_types::{
FullDocumentDiagnosticReport, PreviousResultId, ProgressToken,
@ -25,6 +26,70 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use ty_project::{Db, ProgressReporter};
/// Handler for [Workspace diagnostics](workspace-diagnostics)
///
/// Workspace diagnostics are special in many ways compared to other request handlers.
/// This is mostly due to the fact that computing them is expensive. Because of that,
/// the LSP supports multiple optimizations of which we all make use:
///
/// ## Partial results
///
/// Many clients support partial results. They allow a server
/// to send multiple responses (in the form of `$/progress` notifications) for
/// the same request. We use partial results to stream the results for
/// changed files. This has the obvious benefit is that users
/// don't need to wait for the entire check to complete before they see any diagnostics.
/// The other benefit of "chunking" the work also helps client to incrementally
/// update (and repaint) the diagnostics instead of all at once.
/// We did see lags in VS code for projects with 10k+ diagnostics before implementing
/// this improvement.
///
/// ## Result IDs
///
/// The server can compute a result id for every file which the client
/// sends back in the next pull or workspace diagnostic request. The way we use
/// the result id is that we compute a fingerprint of the file's diagnostics (a hash)
/// and compare it with the result id sent by the server. We know that
/// the diagnostics for a file are unchanged (the client still has the most recent review)
/// if the ids compare equal.
///
/// Result IDs are also useful to identify files for which ty no longer emits
/// any diagnostics. For example, file A contained a syntax error that has now been fixed
/// by the user. The client will send us a result id for file A but we won't match it with
/// any new diagnostics because all errors in the file were fixed. The fact that we can't
/// match up the result ID tells us that we need to clear the diagnostics on the client
/// side by sending an empty diagnostic report (report without any diagnostics). We'll set the
/// result id to `None` so that the client stops sending us a result id for this file.
///
/// Sending unchanged instead of the full diagnostics for files that haven't changed
/// helps reduce the data that's sent from the server to the client and it also enables long-polling
/// (see the next section).
///
/// ## Long polling
///
/// As of today (1st of August 2025), VS code's LSP client automatically schedules a
/// workspace diagnostic request every two seconds because it doesn't know *when* to pull
/// for new workspace diagnostics (it doesn't know what actions invalidate the diagnostics).
/// However, running the workspace diagnostics every two seconds is wasting a lot of CPU cycles (and battery life as a result)
/// if the user's only browsing the project (it requires ty to iterate over all files).
/// That's why we implement long polling (as recommended in the LSP) for workspace diagnostics.
///
/// The basic idea of long-polling is that the server doesn't respond if there are no diagnostics
/// or all diagnostics are unchanged. Instead, the server keeps the request open (it doesn't respond)
/// and only responses when the diagnostics change. This puts the server in full control of when
/// to recheck a workspace and a client can simply wait for the response to come in.
///
/// One challenge with long polling for ty's server architecture is that we can't just keep
/// the background thread running because holding on to the [`ProjectDatabase`] references
/// prevents notifications from acquiring the exclusive db lock (or the long polling background thread
/// panics if a notification tries to do so). What we do instead is that this request handler
/// doesn't send a response if there are no diagnostics or all are unchanged and it
/// sets a "[snapshot](SuspendedWorkspaceDiagnosticRequest)" of the workspace diagnostic request on the [`Session`].
/// The second part to this is in the notification request handling. ty retries the
/// suspended workspace diagnostic request (if any) after every notification if the notification
/// changed the [`Session`]'s state.
///
/// [workspace-diagnostics](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#workspace_diagnostic)
pub(crate) struct WorkspaceDiagnosticRequestHandler;
impl RequestHandler for WorkspaceDiagnosticRequestHandler {
@ -33,7 +98,7 @@ impl RequestHandler for WorkspaceDiagnosticRequestHandler {
impl BackgroundRequestHandler for WorkspaceDiagnosticRequestHandler {
fn run(
snapshot: SessionSnapshot,
snapshot: &SessionSnapshot,
client: &Client,
params: WorkspaceDiagnosticParams,
) -> Result<WorkspaceDiagnosticReportResult> {
@ -49,7 +114,7 @@ impl BackgroundRequestHandler for WorkspaceDiagnosticRequestHandler {
let writer = ResponseWriter::new(
params.partial_result_params.partial_result_token,
params.previous_result_ids,
&snapshot,
snapshot,
client,
);
@ -71,6 +136,50 @@ impl BackgroundRequestHandler for WorkspaceDiagnosticRequestHandler {
Ok(reporter.into_final_report())
}
fn handle_request(
id: &RequestId,
snapshot: SessionSnapshot,
client: &Client,
params: WorkspaceDiagnosticParams,
) {
let result = Self::run(&snapshot, client, params.clone());
// Test if this is a no-op result, in which case we should long-poll the request and
// only respond once some diagnostics have changed to get the latest result ids.
//
// Bulk response: This the simple case. Simply test if all diagnostics are unchanged (or empty)
// Streaming: This trickier but follows the same principle.
// * If the server sent any partial results, then `result` is a `Partial` result (in which
// case we shouldn't do any long polling because some diagnostics changed).
// * If this is a full report, then check if all items are unchanged (or empty), the same as for
// the non-streaming case.
if let Ok(WorkspaceDiagnosticReportResult::Report(full)) = &result {
let all_unchanged = full
.items
.iter()
.all(|item| matches!(item, WorkspaceDocumentDiagnosticReport::Unchanged(_)));
if all_unchanged {
tracing::debug!(
"Suspending workspace diagnostic request, all diagnostics are unchanged or the project has no diagnostics"
);
client.queue_action(Action::SuspendWorkspaceDiagnostics(Box::new(
SuspendedWorkspaceDiagnosticRequest {
id: id.clone(),
params: serde_json::to_value(&params).unwrap(),
revision: snapshot.revision(),
},
)));
// Don't respond, keep the request open (long polling).
return;
}
}
client.respond(id, result);
}
}
impl RetriableRequestHandler for WorkspaceDiagnosticRequestHandler {
@ -147,7 +256,12 @@ impl ProgressReporter for WorkspaceDiagnosticsProgressReporter<'_> {
self.report_progress();
}
let mut response = self.response.lock().unwrap();
// Another thread might have panicked at this point because of a salsa cancellation which
// poisoned the result. If the response is poisoned, just don't report and wait for our thread
// to unwind with a salsa cancellation next.
let Ok(mut response) = self.response.lock() else {
return;
};
// Don't report empty diagnostics. We clear previous diagnostics in `into_response`
// which also handles the case where a file no longer has diagnostics because
@ -207,7 +321,7 @@ impl<'a> ResponseWriter<'a> {
token,
is_test: snapshot.in_test(),
last_flush: Instant::now(),
batched: Vec::new(),
changed: Vec::new(),
unchanged: Vec::with_capacity(previous_result_ids.len()),
})
} else {
@ -242,35 +356,33 @@ impl<'a> ResponseWriter<'a> {
let result_id = Diagnostics::result_id_from_hash(diagnostics);
let is_unchanged = self
.previous_result_ids
.remove(&url)
.is_some_and(|previous_result_id| previous_result_id == result_id);
let report = match result_id {
Some(new_id) if Some(&new_id) == self.previous_result_ids.remove(&url).as_ref() => {
WorkspaceDocumentDiagnosticReport::Unchanged(
WorkspaceUnchangedDocumentDiagnosticReport {
uri: url,
version,
unchanged_document_diagnostic_report: UnchangedDocumentDiagnosticReport {
result_id: new_id,
},
},
)
}
new_id => {
let lsp_diagnostics = diagnostics
.iter()
.map(|diagnostic| to_lsp_diagnostic(db, diagnostic, self.position_encoding))
.collect::<Vec<_>>();
let report = if is_unchanged {
WorkspaceDocumentDiagnosticReport::Unchanged(
WorkspaceUnchangedDocumentDiagnosticReport {
WorkspaceDocumentDiagnosticReport::Full(WorkspaceFullDocumentDiagnosticReport {
uri: url,
version,
unchanged_document_diagnostic_report: UnchangedDocumentDiagnosticReport {
result_id,
full_document_diagnostic_report: FullDocumentDiagnosticReport {
result_id: new_id,
items: lsp_diagnostics,
},
},
)
} else {
let lsp_diagnostics = diagnostics
.iter()
.map(|diagnostic| to_lsp_diagnostic(db, diagnostic, self.position_encoding))
.collect::<Vec<_>>();
WorkspaceDocumentDiagnosticReport::Full(WorkspaceFullDocumentDiagnosticReport {
uri: url,
version,
full_document_diagnostic_report: FullDocumentDiagnosticReport {
result_id: Some(result_id),
items: lsp_diagnostics,
},
})
})
}
};
self.write_report(report);
@ -306,7 +418,7 @@ impl<'a> ResponseWriter<'a> {
// Handle files that had diagnostics in previous request but no longer have any
// Any remaining entries in previous_results are files that were fixed
for previous_url in self.previous_result_ids.into_keys() {
for (previous_url, previous_result_id) in self.previous_result_ids {
// This file had diagnostics before but doesn't now, so we need to report it as having no diagnostics
let version = self
.index
@ -315,22 +427,38 @@ impl<'a> ResponseWriter<'a> {
.and_then(|key| self.index.make_document_ref(key).ok())
.map(|doc| i64::from(doc.version()));
items.push(WorkspaceDocumentDiagnosticReport::Full(
WorkspaceFullDocumentDiagnosticReport {
uri: previous_url,
version,
full_document_diagnostic_report: FullDocumentDiagnosticReport {
result_id: None, // No result ID needed for empty diagnostics
items: vec![], // No diagnostics
},
},
));
let new_result_id = Diagnostics::result_id_from_hash(&[]);
let report = match new_result_id {
Some(new_id) if new_id == previous_result_id => {
WorkspaceDocumentDiagnosticReport::Unchanged(
WorkspaceUnchangedDocumentDiagnosticReport {
uri: previous_url,
version,
unchanged_document_diagnostic_report:
UnchangedDocumentDiagnosticReport { result_id: new_id },
},
)
}
new_id => {
WorkspaceDocumentDiagnosticReport::Full(WorkspaceFullDocumentDiagnosticReport {
uri: previous_url,
version,
full_document_diagnostic_report: FullDocumentDiagnosticReport {
result_id: new_id,
items: vec![], // No diagnostics
},
})
}
};
items.push(report);
}
match &mut self.mode {
ReportingMode::Streaming(streaming) => {
items.extend(
std::mem::take(&mut streaming.batched)
std::mem::take(&mut streaming.changed)
.into_iter()
.map(WorkspaceDocumentDiagnosticReport::Full),
);
@ -388,7 +516,7 @@ struct Streaming {
/// The implementation uses batching to avoid too many
/// requests for large projects (can slow down the entire
/// analysis).
batched: Vec<WorkspaceFullDocumentDiagnosticReport>,
changed: Vec<WorkspaceFullDocumentDiagnosticReport>,
/// All the unchanged reports. Don't stream them,
/// since nothing has changed.
unchanged: Vec<WorkspaceUnchangedDocumentDiagnosticReport>,
@ -398,7 +526,7 @@ impl Streaming {
fn write_report(&mut self, report: WorkspaceDocumentDiagnosticReport) {
match report {
WorkspaceDocumentDiagnosticReport::Full(full) => {
self.batched.push(full);
self.changed.push(full);
}
WorkspaceDocumentDiagnosticReport::Unchanged(unchanged) => {
self.unchanged.push(unchanged);
@ -407,13 +535,13 @@ impl Streaming {
}
fn maybe_flush(&mut self) {
if self.batched.is_empty() {
if self.changed.is_empty() {
return;
}
// Flush every ~50ms or whenever we have two items and this is a test run.
let should_flush = if self.is_test {
self.batched.len() >= 2
self.changed.len() >= 2
} else {
self.last_flush.elapsed().as_millis() >= 50
};
@ -422,7 +550,7 @@ impl Streaming {
}
let items = self
.batched
.changed
.drain(..)
.map(WorkspaceDocumentDiagnosticReport::Full)
.collect();

View file

@ -19,7 +19,7 @@ impl RequestHandler for WorkspaceSymbolRequestHandler {
impl BackgroundRequestHandler for WorkspaceSymbolRequestHandler {
fn run(
snapshot: SessionSnapshot,
snapshot: &SessionSnapshot,
_client: &Client,
params: WorkspaceSymbolParams,
) -> crate::server::Result<Option<WorkspaceSymbolResponse>> {

View file

@ -33,10 +33,10 @@
//! See the `./requests` and `./notifications` directories for concrete implementations of these
//! traits in action.
use std::borrow::Cow;
use crate::session::client::Client;
use crate::session::{DocumentSnapshot, Session, SessionSnapshot};
use lsp_server::RequestId;
use std::borrow::Cow;
use lsp_types::Url;
use lsp_types::notification::Notification;
@ -91,12 +91,36 @@ pub(super) trait BackgroundDocumentRequestHandler: RetriableRequestHandler {
params: &<<Self as RequestHandler>::RequestType as Request>::Params,
) -> Cow<Url>;
/// Processes the request parameters and returns the LSP request result.
///
/// This is the main method that handlers implement. It takes the request parameters
/// from the client and computes the appropriate response data for the LSP request.
fn run_with_snapshot(
db: &ProjectDatabase,
snapshot: &DocumentSnapshot,
client: &Client,
params: <<Self as RequestHandler>::RequestType as Request>::Params,
) -> super::Result<<<Self as RequestHandler>::RequestType as Request>::Result>;
/// Handles the entire request lifecycle and sends the response to the client.
///
/// It allows handlers to customize how the server sends the response to the client.
fn handle_request(
id: &RequestId,
db: &ProjectDatabase,
snapshot: DocumentSnapshot,
client: &Client,
params: <<Self as RequestHandler>::RequestType as Request>::Params,
) -> super::Result<<<Self as RequestHandler>::RequestType as Request>::Result>;
) {
let result = Self::run_with_snapshot(db, &snapshot, client, params);
if let Err(err) = &result {
tracing::error!("An error occurred with request ID {id}: {err}");
client.show_error_message("ty encountered a problem. Check the logs for more details.");
}
client.respond(id, result);
}
}
/// A request handler that can be run on a background thread.
@ -106,11 +130,34 @@ pub(super) trait BackgroundDocumentRequestHandler: RetriableRequestHandler {
/// operations that require access to the entire session state, such as fetching workspace
/// diagnostics.
pub(super) trait BackgroundRequestHandler: RetriableRequestHandler {
/// Processes the request parameters and returns the LSP request result.
///
/// This is the main method that handlers implement. It takes the request parameters
/// from the client and computes the appropriate response data for the LSP request.
fn run(
snapshot: SessionSnapshot,
snapshot: &SessionSnapshot,
client: &Client,
params: <<Self as RequestHandler>::RequestType as Request>::Params,
) -> super::Result<<<Self as RequestHandler>::RequestType as Request>::Result>;
/// Handles the request lifecycle and sends the response to the client.
///
/// It allows handlers to customize how the server sends the response to the client.
fn handle_request(
id: &RequestId,
snapshot: SessionSnapshot,
client: &Client,
params: <<Self as RequestHandler>::RequestType as Request>::Params,
) {
let result = Self::run(&snapshot, client, params);
if let Err(err) = &result {
tracing::error!("An error occurred with request ID {id}: {err}");
client.show_error_message("ty encountered a problem. Check the logs for more details.");
}
client.respond(id, result);
}
}
/// A supertrait for any server notification handler.

View file

@ -1,7 +1,7 @@
use crate::server::schedule::Scheduler;
use crate::server::{Server, api};
use crate::session::ClientOptions;
use crate::session::client::{Client, ClientResponseHandler};
use crate::session::{ClientOptions, SuspendedWorkspaceDiagnosticRequest};
use anyhow::anyhow;
use crossbeam::select;
use lsp_server::Message;
@ -49,7 +49,8 @@ impl Server {
if self.session.is_shutdown_requested() {
tracing::warn!(
"Received request after server shutdown was requested, discarding"
"Received request `{}` after server shutdown was requested, discarding",
&req.method
);
client.respond_err(
req.id,
@ -130,7 +131,8 @@ impl Server {
.incoming()
.is_pending(&request.id)
{
api::request(request);
let task = api::request(request);
scheduler.dispatch(task, &mut self.session, client);
} else {
tracing::debug!(
"Request {}/{} was cancelled, not retrying",
@ -142,6 +144,13 @@ impl Server {
Action::SendRequest(request) => client.send_request_raw(&self.session, request),
Action::SuspendWorkspaceDiagnostics(suspended_request) => {
self.session.set_suspended_workspace_diagnostics_request(
*suspended_request,
&client,
);
}
Action::InitializeWorkspaces(workspaces_with_options) => {
self.session
.initialize_workspaces(workspaces_with_options, &client);
@ -304,6 +313,8 @@ pub(crate) enum Action {
/// Send a request from the server to the client.
SendRequest(SendRequest),
SuspendWorkspaceDiagnostics(Box<SuspendedWorkspaceDiagnosticRequest>),
/// Initialize the workspace after the server received
/// the options from the client.
InitializeWorkspaces(Vec<(Url, ClientOptions)>),

View file

@ -34,6 +34,7 @@ pub(in crate::server) enum BackgroundSchedule {
/// while local tasks have exclusive access and can modify it as they please. Keep in mind that
/// local tasks will **block** the main event loop, so only use local tasks if you **need**
/// mutable state access or you need the absolute lowest latency possible.
#[must_use]
pub(in crate::server) enum Task {
Background(BackgroundTaskBuilder),
Sync(SyncTask),

View file

@ -2,9 +2,9 @@
use anyhow::{Context, anyhow};
use index::DocumentQueryError;
use lsp_server::Message;
use lsp_server::{Message, RequestId};
use lsp_types::notification::{Exit, Notification};
use lsp_types::request::{Request, Shutdown};
use lsp_types::request::{Request, Shutdown, WorkspaceDiagnosticRequest};
use lsp_types::{ClientCapabilities, TextDocumentContentChangeEvent, Url};
use options::GlobalOptions;
use ruff_db::Db;
@ -24,7 +24,7 @@ pub(crate) use self::options::AllOptions;
pub use self::options::{ClientOptions, DiagnosticMode};
pub(crate) use self::settings::ClientSettings;
use crate::document::{DocumentKey, DocumentVersion, NotebookDocument};
use crate::server::publish_settings_diagnostics;
use crate::server::{Action, publish_settings_diagnostics};
use crate::session::client::Client;
use crate::session::request_queue::RequestQueue;
use crate::system::{AnySystemPath, LSPSystem};
@ -81,6 +81,16 @@ pub(crate) struct Session {
in_test: bool,
deferred_messages: VecDeque<Message>,
/// A revision counter. It gets incremented on every change to `Session` that
/// could result in different workspace diagnostics.
revision: u64,
/// A pending workspace diagnostics request because there were no diagnostics
/// or no changes when when the request ran last time.
/// We'll re-run the request after every change to `Session` (see `revision`)
/// to see if there are now changes and, if so, respond to the client.
suspended_workspace_diagnostics_request: Option<SuspendedWorkspaceDiagnosticRequest>,
}
/// LSP State for a Project
@ -137,6 +147,8 @@ impl Session {
request_queue: RequestQueue::new(),
shutdown_requested: false,
in_test,
suspended_workspace_diagnostics_request: None,
revision: 0,
})
}
@ -155,6 +167,56 @@ impl Session {
self.shutdown_requested = requested;
}
pub(crate) fn set_suspended_workspace_diagnostics_request(
&mut self,
request: SuspendedWorkspaceDiagnosticRequest,
client: &Client,
) {
self.suspended_workspace_diagnostics_request = Some(request);
// Run the suspended workspace diagnostic request immediately in case there
// were changes since the workspace diagnostics background thread queued
// the action to suspend the workspace diagnostic request.
self.resume_suspended_workspace_diagnostic_request(client);
}
pub(crate) fn take_suspended_workspace_diagnostic_request(
&mut self,
) -> Option<SuspendedWorkspaceDiagnosticRequest> {
self.suspended_workspace_diagnostics_request.take()
}
/// Resumes (retries) the workspace diagnostic request if there
/// were any changes to the [`Session`] (the revision got bumped)
/// since the workspace diagnostic request ran last time.
///
/// The workspace diagnostic requests is ignored if the request
/// was cancelled in the meantime.
pub(crate) fn resume_suspended_workspace_diagnostic_request(&mut self, client: &Client) {
self.suspended_workspace_diagnostics_request = self
.suspended_workspace_diagnostics_request
.take()
.and_then(|request| {
if !self.request_queue.incoming().is_pending(&request.id) {
// Clear out the suspended request if the request has been cancelled.
tracing::debug!("Skipping suspended workspace diagnostics request `{}` because it was cancelled", request.id);
return None;
}
request.resume_if_revision_changed(self.revision, client)
});
}
/// Bumps the revision.
///
/// The revision is used to track when workspace diagnostics may have changed and need to be re-run.
/// It's okay if a bump doesn't necessarily result in new workspace diagnostics.
///
/// In general, any change to a project database should bump the revision and so should
/// any change to the document states (but also when the open workspaces change etc.).
fn bump_revision(&mut self) {
self.revision += 1;
}
/// The LSP specification doesn't allow configuration requests during initialization,
/// but we need access to the configuration to resolve the settings in turn to create the
/// project databases. This will become more important in the future when we support
@ -318,6 +380,8 @@ impl Session {
.cloned()
});
self.bump_revision();
self.project_db_mut(path)
.apply_changes(changes, overrides.as_ref())
}
@ -465,6 +529,7 @@ impl Session {
position_encoding: self.position_encoding,
in_test: self.in_test,
resolved_client_capabilities: self.resolved_client_capabilities,
revision: self.revision,
}
}
@ -483,12 +548,14 @@ impl Session {
document: NotebookDocument,
) {
self.index_mut().open_notebook_document(path, document);
self.bump_revision();
}
/// Registers a text document at the provided `path`.
/// If a document is already open here, it will be overwritten.
pub(crate) fn open_text_document(&mut self, path: &AnySystemPath, document: TextDocument) {
self.index_mut().open_text_document(path, document);
self.bump_revision();
}
/// Updates a text document at the associated `key`.
@ -501,8 +568,14 @@ impl Session {
new_version: DocumentVersion,
) -> crate::Result<()> {
let position_encoding = self.position_encoding;
self.index_mut()
.update_text_document(key, content_changes, new_version, position_encoding)
self.index_mut().update_text_document(
key,
content_changes,
new_version,
position_encoding,
)?;
self.bump_revision();
Ok(())
}
/// De-registers a document, specified by its key.
@ -656,6 +729,7 @@ pub(crate) struct SessionSnapshot {
position_encoding: PositionEncoding,
resolved_client_capabilities: ResolvedClientCapabilities,
in_test: bool,
revision: u64,
/// IMPORTANT: It's important that the databases come last, or at least,
/// after any `Arc` that we try to extract or mutate in-place using `Arc::into_inner`
@ -689,6 +763,10 @@ impl SessionSnapshot {
pub(crate) const fn in_test(&self) -> bool {
self.in_test
}
pub(crate) fn revision(&self) -> u64 {
self.revision
}
}
#[derive(Debug, Default)]
@ -847,3 +925,43 @@ impl DefaultProject {
self.0.get_mut()
}
}
/// A workspace diagnostic request that didn't yield any changes or diagnostic
/// when it ran the last time.
#[derive(Debug)]
pub(crate) struct SuspendedWorkspaceDiagnosticRequest {
/// The LSP request id
pub(crate) id: RequestId,
/// The params passed to the `workspace/diagnostic` request.
pub(crate) params: serde_json::Value,
/// The session's revision when the request ran the last time.
///
/// This is to prevent races between:
/// * The background thread completes
/// * A did change notification coming in
/// * storing this struct on `Session`
///
/// The revision helps us detect that a did change notification
/// happened in the meantime, so that we can reschedule the
/// workspace diagnostic request immediately.
pub(crate) revision: u64,
}
impl SuspendedWorkspaceDiagnosticRequest {
fn resume_if_revision_changed(self, current_revision: u64, client: &Client) -> Option<Self> {
if self.revision == current_revision {
return Some(self);
}
tracing::debug!("Resuming workspace diagnostics request after revision bump");
client.queue_action(Action::RetryRequest(lsp_server::Request {
id: self.id,
method: WorkspaceDiagnosticRequest::METHOD.to_string(),
params: self.params,
}));
None
}
}

View file

@ -56,7 +56,7 @@ use lsp_types::{
DidChangeTextDocumentParams, DidChangeWatchedFilesClientCapabilities,
DidChangeWatchedFilesParams, DidCloseTextDocumentParams, DidOpenTextDocumentParams,
DocumentDiagnosticParams, DocumentDiagnosticReportResult, FileEvent, InitializeParams,
InitializeResult, InitializedParams, PartialResultParams, PreviousResultId,
InitializeResult, InitializedParams, NumberOrString, PartialResultParams, PreviousResultId,
PublishDiagnosticsClientCapabilities, TextDocumentClientCapabilities,
TextDocumentContentChangeEvent, TextDocumentIdentifier, TextDocumentItem, Url,
VersionedTextDocumentIdentifier, WorkDoneProgressParams, WorkspaceClientCapabilities,
@ -151,6 +151,10 @@ pub(crate) struct TestServer {
/// Capabilities registered by the server
registered_capabilities: Vec<String>,
/// Whether a Shutdown request has been sent by the test
/// and the exit sequence should be skipped during `Drop`
shutdown_requested: bool,
}
impl TestServer {
@ -207,6 +211,7 @@ impl TestServer {
initialize_response: None,
workspace_configurations,
registered_capabilities: Vec::new(),
shutdown_requested: false,
}
.initialize(workspace_folders, capabilities, initialization_options)
}
@ -229,7 +234,7 @@ impl TestServer {
};
let init_request_id = self.send_request::<Initialize>(init_params);
self.initialize_response = Some(self.await_response::<InitializeResult>(init_request_id)?);
self.initialize_response = Some(self.await_response::<InitializeResult>(&init_request_id)?);
self.send_notification::<Initialized>(InitializedParams {});
Ok(self)
@ -330,6 +335,11 @@ impl TestServer {
where
R: Request,
{
// Track if an Exit notification is being sent
if R::METHOD == lsp_types::request::Shutdown::METHOD {
self.shutdown_requested = true;
}
let id = self.next_request_id();
let request = lsp_server::Request::new(id.clone(), R::METHOD.to_string(), params);
self.send(Message::Request(request));
@ -354,9 +364,9 @@ impl TestServer {
/// called once per request ID.
///
/// [`send_request`]: TestServer::send_request
pub(crate) fn await_response<T: DeserializeOwned>(&mut self, id: RequestId) -> Result<T> {
pub(crate) fn await_response<T: DeserializeOwned>(&mut self, id: &RequestId) -> Result<T> {
loop {
if let Some(response) = self.responses.remove(&id) {
if let Some(response) = self.responses.remove(id) {
match response {
Response {
error: None,
@ -373,7 +383,11 @@ impl TestServer {
return Err(TestServerError::ResponseError(err).into());
}
response => {
return Err(TestServerError::InvalidResponse(id, Box::new(response)).into());
return Err(TestServerError::InvalidResponse(
id.clone(),
Box::new(response),
)
.into());
}
}
}
@ -531,6 +545,16 @@ impl TestServer {
panic!("Server dropped client receiver while still running");
}
pub(crate) fn cancel(&mut self, request_id: &RequestId) {
let id_string = request_id.to_string();
self.send_notification::<lsp_types::notification::Cancel>(lsp_types::CancelParams {
id: match id_string.parse() {
Ok(id) => NumberOrString::Number(id),
Err(_) => NumberOrString::String(id_string),
},
});
}
/// Handle workspace configuration requests from the server.
///
/// Use the [`get_request`] method to wait for the server to send this request.
@ -652,7 +676,7 @@ impl TestServer {
partial_result_params: PartialResultParams::default(),
};
let id = self.send_request::<DocumentDiagnosticRequest>(params);
self.await_response::<DocumentDiagnosticReportResult>(id)
self.await_response::<DocumentDiagnosticReportResult>(&id)
}
/// Send a `workspace/diagnostic` request with optional previous result IDs.
@ -672,7 +696,7 @@ impl TestServer {
};
let id = self.send_request::<WorkspaceDiagnosticRequest>(params);
self.await_response::<WorkspaceDiagnosticReportResult>(id)
self.await_response::<WorkspaceDiagnosticReportResult>(&id)
}
}
@ -699,9 +723,9 @@ impl Drop for TestServer {
//
// The `server_thread` could be `None` if the server exited unexpectedly or panicked or if
// it dropped the client connection.
let shutdown_error = if self.server_thread.is_some() {
let shutdown_error = if self.server_thread.is_some() && !self.shutdown_requested {
let shutdown_id = self.send_request::<Shutdown>(());
match self.await_response::<()>(shutdown_id) {
match self.await_response::<()>(&shutdown_id) {
Ok(()) => {
self.send_notification::<Exit>(());
None

View file

@ -8,7 +8,7 @@ use lsp_types::{
use ruff_db::system::SystemPath;
use ty_server::{ClientOptions, DiagnosticMode, PartialWorkspaceProgress};
use crate::{TestServer, TestServerBuilder};
use crate::{TestServer, TestServerBuilder, TestServerError};
#[test]
fn on_did_open() -> Result<()> {
@ -31,7 +31,7 @@ def foo() -> str:
server.open_text_document(foo, &foo_content, 1);
let diagnostics = server.document_diagnostic_request(foo, None)?;
insta::assert_debug_snapshot!(diagnostics);
assert_debug_snapshot!(diagnostics);
Ok(())
}
@ -239,32 +239,13 @@ def foo() -> str:
let mut first_response = server.workspace_diagnostic_request(None)?;
sort_workspace_diagnostic_response(&mut first_response);
insta::assert_debug_snapshot!("workspace_diagnostic_initial_state", first_response);
assert_debug_snapshot!("workspace_diagnostic_initial_state", first_response);
// Consume all progress notifications sent during workspace diagnostics
consume_all_progress_notifications(&mut server)?;
// Extract result IDs from the first response
let previous_result_ids = match first_response {
WorkspaceDiagnosticReportResult::Report(report) => {
report.items.into_iter().filter_map(|item| match item {
WorkspaceDocumentDiagnosticReport::Full(full_report) => {
let result_id = full_report.full_document_diagnostic_report.result_id?;
Some(PreviousResultId {
uri: full_report.uri,
value: result_id,
})
}
WorkspaceDocumentDiagnosticReport::Unchanged(_) => {
panic!("The first response must be a full report, not unchanged");
}
})
}
WorkspaceDiagnosticReportResult::Partial(_) => {
panic!("The first response must be a full report");
}
}
.collect();
let previous_result_ids = extract_result_ids_from_response(&first_response);
// Make changes to files B, C, D, and E (leave A unchanged)
// Need to open files before changing them
@ -330,7 +311,7 @@ def foo() -> str:
// Consume all progress notifications sent during the second workspace diagnostics
consume_all_progress_notifications(&mut server)?;
insta::assert_debug_snapshot!("workspace_diagnostic_after_changes", second_response);
assert_debug_snapshot!("workspace_diagnostic_after_changes", second_response);
Ok(())
}
@ -426,7 +407,7 @@ def foo() -> str:
// First, read the response of the workspace diagnostic request.
// Note: This response comes after the progress notifications but it simplifies the test to read it first.
let final_response = server.await_response::<WorkspaceDiagnosticReportResult>(request_id)?;
let final_response = server.await_response::<WorkspaceDiagnosticReportResult>(&request_id)?;
// Process the final report.
// This should always be a partial report. However, the type definition in the LSP specification
@ -509,27 +490,7 @@ fn workspace_diagnostic_streaming_with_caching() -> Result<()> {
// Consume progress notifications from first request
consume_all_progress_notifications(&mut server)?;
let result_ids = match first_response {
WorkspaceDiagnosticReportResult::Report(report) => report
.items
.into_iter()
.filter_map(|item| {
if let WorkspaceDocumentDiagnosticReport::Full(full) = item {
full.full_document_diagnostic_report
.result_id
.map(|id| PreviousResultId {
uri: full.uri,
value: id,
})
} else {
panic!("Expected Full report in initial response");
}
})
.collect::<Vec<_>>(),
WorkspaceDiagnosticReportResult::Partial(_) => {
panic!("Request without a partial response token should not use streaming")
}
};
let result_ids = extract_result_ids_from_response(&first_response);
assert_eq!(result_ids.len(), NUM_FILES);
@ -578,7 +539,7 @@ fn workspace_diagnostic_streaming_with_caching() -> Result<()> {
},
});
let final_response2 = server.await_response::<WorkspaceDiagnosticReportResult>(request2_id)?;
let final_response2 = server.await_response::<WorkspaceDiagnosticReportResult>(&request2_id)?;
let mut all_items = Vec::new();
@ -641,3 +602,344 @@ fn sort_workspace_report_items(items: &mut [WorkspaceDocumentDiagnosticReport])
items.sort_unstable_by(|a, b| item_uri(a).cmp(item_uri(b)));
}
/// The LSP specification requires that the server sends a response for every request.
///
/// This test verifies that the server responds to a long-polling workspace diagnostic request
/// when the server is shut down.
#[test]
fn workspace_diagnostic_long_polling_responds_on_shutdown() -> Result<()> {
let _filter = filter_result_id();
let workspace_root = SystemPath::new("src");
let file_path = SystemPath::new("src/test.py");
let file_content = "\
def hello() -> str:
return \"world\"
";
// Create a project with one file but no diagnostics
let mut server = create_workspace_server_with_file(workspace_root, file_path, file_content)?;
// Make a workspace diagnostic request to a project with one file but no diagnostics
// This should trigger long-polling since the project has no diagnostics
let request_id = send_workspace_diagnostic_request(&mut server);
assert_workspace_diagnostics_suspends_for_long_polling(&mut server, &request_id);
// Send shutdown request - this should cause the suspended workspace diagnostic request to respond
let shutdown_id = server.send_request::<lsp_types::request::Shutdown>(());
// The workspace diagnostic request should now respond with an empty report
let workspace_response =
server.await_response::<WorkspaceDiagnosticReportResult>(&request_id)?;
// Complete shutdown sequence
server.await_response::<()>(&shutdown_id)?;
server.send_notification::<lsp_types::notification::Exit>(());
// Verify we got an empty report (default response during shutdown)
assert_debug_snapshot!(
"workspace_diagnostic_long_polling_shutdown_response",
workspace_response
);
Ok(())
}
/// Tests that the server responds to a long-polling workspace diagnostic request
/// after a change introduced a new diagnostic.
#[test]
fn workspace_diagnostic_long_polling_responds_on_change() -> Result<()> {
let _filter = filter_result_id();
let workspace_root = SystemPath::new("src");
let file_path = SystemPath::new("src/test.py");
let file_content_no_error = "\
def hello() -> str:
return \"world\"
";
let file_content_with_error = "\
def hello() -> str:
return 42 # Type error: expected str, got int
";
// Create a project with one file but no diagnostics
let mut server =
create_workspace_server_with_file(workspace_root, file_path, file_content_no_error)?;
// Open the file first
server.open_text_document(file_path, &file_content_no_error, 1);
// Make a workspace diagnostic request to a project with one file but no diagnostics
// This should trigger long-polling since the project has no diagnostics
let request_id = send_workspace_diagnostic_request(&mut server);
// Verify the request doesn't complete immediately (should be long-polling)
assert_workspace_diagnostics_suspends_for_long_polling(&mut server, &request_id);
// Now introduce an error to the file - this should trigger the long-polling request to complete
server.change_text_document(
file_path,
vec![lsp_types::TextDocumentContentChangeEvent {
range: None,
range_length: None,
text: file_content_with_error.to_string(),
}],
2,
);
// The workspace diagnostic request should now complete with the new diagnostic
let workspace_response =
server.await_response::<WorkspaceDiagnosticReportResult>(&request_id)?;
// Verify we got a report with one file containing the new diagnostic
assert_debug_snapshot!(
"workspace_diagnostic_long_polling_change_response",
workspace_response
);
Ok(())
}
/// The LSP specification requires that the server responds to each request with exactly one response.
///
/// This test verifies that the server sends one response (and not two) if a long polling workspace diagnostic request
/// is cancelled.
#[test]
fn workspace_diagnostic_long_polling_responds_on_cancellation() -> Result<()> {
let _filter = filter_result_id();
let workspace_root = SystemPath::new("src");
let file_path = SystemPath::new("src/test.py");
let file_content = "\
def hello() -> str:
return \"world\"
";
// Create a project with one file but no diagnostics
let mut server = create_workspace_server_with_file(workspace_root, file_path, file_content)?;
// Make a workspace diagnostic request to a project with one file but no diagnostics
// This should trigger long-polling since the project has no diagnostics
let request_id = send_workspace_diagnostic_request(&mut server);
// Verify the request doesn't complete immediately (should be long-polling)
assert_workspace_diagnostics_suspends_for_long_polling(&mut server, &request_id);
// Send a cancel request notification for the suspended request
// The request_id from send_request should match the ID that the server expects
// Based on logs, the server shows request id=2, so let's try using that directly
server.cancel(&request_id);
// The workspace diagnostic request should now respond with a cancellation response (Err).
let result = server.await_response::<WorkspaceDiagnosticReportResult>(&request_id);
assert_debug_snapshot!(
"workspace_diagnostic_long_polling_cancellation_result",
result
);
// The test server's drop implementation asserts that we aren't sending the response twice.
Ok(())
}
/// This test verifies an entire workspace diagnostic cycle with long-polling:
/// * Initial suspend with no diagnostics
/// * Change the file to introduce a diagnostic, server should respond with the new diagnostics
/// * Send a second workspace diagnostic request, which should suspend again because the diagnostics haven't changed
/// * Change the file again to fix the diagnostic, server should respond with no diagnostics
#[test]
fn workspace_diagnostic_long_polling_suspend_change_suspend_cycle() -> Result<()> {
let _filter = filter_result_id();
let workspace_root = SystemPath::new("src");
let file_path = SystemPath::new("src/test.py");
let file_content_no_error = "\
def hello() -> str:
return \"world\"
";
let file_content_with_error = "\
def hello() -> str:
return 42 # Type error: expected str, got int
";
let file_content_fixed = "\
def hello() -> str:
return \"fixed\"
";
// Create a project with one file but no diagnostics
let mut server =
create_workspace_server_with_file(workspace_root, file_path, file_content_no_error)?;
// Open the file first
server.open_text_document(file_path, &file_content_no_error, 1);
// PHASE 1: Initial suspend (no diagnostics)
let request_id_1 = send_workspace_diagnostic_request(&mut server);
assert_workspace_diagnostics_suspends_for_long_polling(&mut server, &request_id_1);
// PHASE 2: Introduce error to trigger response
server.change_text_document(
file_path,
vec![lsp_types::TextDocumentContentChangeEvent {
range: None,
range_length: None,
text: file_content_with_error.to_string(),
}],
2,
);
// First request should complete with diagnostics
let first_response = server.await_response::<WorkspaceDiagnosticReportResult>(&request_id_1)?;
// Extract result IDs from the first response for the second request
let previous_result_ids = extract_result_ids_from_response(&first_response);
// PHASE 3: Second request with previous result IDs - should suspend again since diagnostics unchanged
let request_id_2 =
server.send_request::<WorkspaceDiagnosticRequest>(WorkspaceDiagnosticParams {
identifier: None,
previous_result_ids,
work_done_progress_params: WorkDoneProgressParams {
work_done_token: None,
},
partial_result_params: PartialResultParams {
partial_result_token: None,
},
});
// Second request should suspend since diagnostics haven't changed
assert_workspace_diagnostics_suspends_for_long_polling(&mut server, &request_id_2);
// PHASE 4: Fix the error to trigger the second response
server.change_text_document(
file_path,
vec![lsp_types::TextDocumentContentChangeEvent {
range: None,
range_length: None,
text: file_content_fixed.to_string(),
}],
3,
);
// Second request should complete with the fix (no diagnostics)
let second_response =
server.await_response::<WorkspaceDiagnosticReportResult>(&request_id_2)?;
// Snapshot both responses to verify the full cycle
assert_debug_snapshot!(
"workspace_diagnostic_suspend_change_suspend_first_response",
first_response
);
assert_debug_snapshot!(
"workspace_diagnostic_suspend_change_suspend_second_response",
second_response
);
Ok(())
}
// Helper functions for long-polling tests
fn create_workspace_server_with_file(
workspace_root: &SystemPath,
file_path: &SystemPath,
file_content: &str,
) -> Result<TestServer> {
let global_options = ClientOptions::default().with_diagnostic_mode(DiagnosticMode::Workspace);
TestServerBuilder::new()?
.with_workspace(workspace_root, global_options.clone())?
.with_file(file_path, file_content)?
.with_initialization_options(global_options)
.enable_pull_diagnostics(true)
.build()?
.wait_until_workspaces_are_initialized()
}
/// Sends a workspace diagnostic request to the server.
///
/// Unlike [`TestServer::workspace_diagnostic_request`], this function does not wait for the response.
fn send_workspace_diagnostic_request(server: &mut TestServer) -> lsp_server::RequestId {
server.send_request::<WorkspaceDiagnosticRequest>(WorkspaceDiagnosticParams {
identifier: None,
previous_result_ids: Vec::new(),
work_done_progress_params: WorkDoneProgressParams {
work_done_token: None,
},
partial_result_params: PartialResultParams {
partial_result_token: None,
},
})
}
#[track_caller]
fn assert_workspace_diagnostics_suspends_for_long_polling(
server: &mut TestServer,
request_id: &lsp_server::RequestId,
) {
match server.await_response::<WorkspaceDiagnosticReportResult>(request_id) {
Ok(_) => {
panic!("Expected workspace diagnostic request to suspend for long-polling.");
}
Err(error) => {
if let Some(test_error) = error.downcast_ref::<TestServerError>() {
assert!(
matches!(test_error, TestServerError::RecvTimeoutError(_)),
"Response should time out because the request is suspended for long-polling"
);
} else {
panic!("Unexpected error type: {error:?}");
}
}
}
}
fn extract_result_ids_from_response(
response: &WorkspaceDiagnosticReportResult,
) -> Vec<PreviousResultId> {
match response {
WorkspaceDiagnosticReportResult::Report(report) => {
report
.items
.iter()
.filter_map(|item| match item {
WorkspaceDocumentDiagnosticReport::Full(full_report) => {
let result_id = full_report
.full_document_diagnostic_report
.result_id
.as_ref()?;
Some(PreviousResultId {
uri: full_report.uri.clone(),
value: result_id.clone(),
})
}
WorkspaceDocumentDiagnosticReport::Unchanged(_) => {
// Unchanged reports don't provide new result IDs
None
}
})
.collect()
}
WorkspaceDiagnosticReportResult::Partial(partial) => {
// For partial results, extract from items the same way
partial
.items
.iter()
.filter_map(|item| match item {
WorkspaceDocumentDiagnosticReport::Full(full_report) => {
let result_id = full_report
.full_document_diagnostic_report
.result_id
.as_ref()?;
Some(PreviousResultId {
uri: full_report.uri.clone(),
value: result_id.clone(),
})
}
WorkspaceDocumentDiagnosticReport::Unchanged(_) => None,
})
.collect()
}
}
}

View file

@ -0,0 +1,13 @@
---
source: crates/ty_server/tests/e2e/pull_diagnostics.rs
expression: result
---
Err(
ResponseError(
ResponseError {
code: -32800,
message: "request was cancelled by client",
data: None,
},
),
)

View file

@ -0,0 +1,112 @@
---
source: crates/ty_server/tests/e2e/pull_diagnostics.rs
expression: workspace_response
---
Report(
WorkspaceDiagnosticReport {
items: [
Full(
WorkspaceFullDocumentDiagnosticReport {
uri: Url {
scheme: "file",
cannot_be_a_base: false,
username: "",
password: None,
host: None,
port: None,
path: "<temp_dir>/src/test.py",
query: None,
fragment: None,
},
version: Some(
2,
),
full_document_diagnostic_report: FullDocumentDiagnosticReport {
result_id: Some(
"[RESULT_ID]",
),
items: [
Diagnostic {
range: Range {
start: Position {
line: 1,
character: 11,
},
end: Position {
line: 1,
character: 13,
},
},
severity: Some(
Error,
),
code: Some(
String(
"invalid-return-type",
),
),
code_description: Some(
CodeDescription {
href: Url {
scheme: "https",
cannot_be_a_base: false,
username: "",
password: None,
host: Some(
Domain(
"ty.dev",
),
),
port: None,
path: "/rules",
query: None,
fragment: Some(
"invalid-return-type",
),
},
},
),
source: Some(
"ty",
),
message: "Return type does not match returned value: expected `str`, found `Literal[42]`",
related_information: Some(
[
DiagnosticRelatedInformation {
location: Location {
uri: Url {
scheme: "file",
cannot_be_a_base: false,
username: "",
password: None,
host: None,
port: None,
path: "<temp_dir>/src/test.py",
query: None,
fragment: None,
},
range: Range {
start: Position {
line: 0,
character: 15,
},
end: Position {
line: 0,
character: 18,
},
},
},
message: "Expected `str` because of return type",
},
],
),
tags: None,
data: None,
},
],
},
},
),
],
},
)

View file

@ -0,0 +1,9 @@
---
source: crates/ty_server/tests/e2e/pull_diagnostics.rs
expression: workspace_response
---
Report(
WorkspaceDiagnosticReport {
items: [],
},
)

View file

@ -0,0 +1,112 @@
---
source: crates/ty_server/tests/e2e/pull_diagnostics.rs
expression: first_response
---
Report(
WorkspaceDiagnosticReport {
items: [
Full(
WorkspaceFullDocumentDiagnosticReport {
uri: Url {
scheme: "file",
cannot_be_a_base: false,
username: "",
password: None,
host: None,
port: None,
path: "<temp_dir>/src/test.py",
query: None,
fragment: None,
},
version: Some(
2,
),
full_document_diagnostic_report: FullDocumentDiagnosticReport {
result_id: Some(
"[RESULT_ID]",
),
items: [
Diagnostic {
range: Range {
start: Position {
line: 1,
character: 11,
},
end: Position {
line: 1,
character: 13,
},
},
severity: Some(
Error,
),
code: Some(
String(
"invalid-return-type",
),
),
code_description: Some(
CodeDescription {
href: Url {
scheme: "https",
cannot_be_a_base: false,
username: "",
password: None,
host: Some(
Domain(
"ty.dev",
),
),
port: None,
path: "/rules",
query: None,
fragment: Some(
"invalid-return-type",
),
},
},
),
source: Some(
"ty",
),
message: "Return type does not match returned value: expected `str`, found `Literal[42]`",
related_information: Some(
[
DiagnosticRelatedInformation {
location: Location {
uri: Url {
scheme: "file",
cannot_be_a_base: false,
username: "",
password: None,
host: None,
port: None,
path: "<temp_dir>/src/test.py",
query: None,
fragment: None,
},
range: Range {
start: Position {
line: 0,
character: 15,
},
end: Position {
line: 0,
character: 18,
},
},
},
message: "Expected `str` because of return type",
},
],
),
tags: None,
data: None,
},
],
},
},
),
],
},
)

View file

@ -0,0 +1,32 @@
---
source: crates/ty_server/tests/e2e/pull_diagnostics.rs
expression: second_response
---
Report(
WorkspaceDiagnosticReport {
items: [
Full(
WorkspaceFullDocumentDiagnosticReport {
uri: Url {
scheme: "file",
cannot_be_a_base: false,
username: "",
password: None,
host: None,
port: None,
path: "<temp_dir>/src/test.py",
query: None,
fragment: None,
},
version: Some(
3,
),
full_document_diagnostic_report: FullDocumentDiagnosticReport {
result_id: None,
items: [],
},
},
),
],
},
)