mirror of https://github.com/rust-lang/rust-analyzer.git
synced 2025-11-01 12:24:29 +00:00
Better manage parallel prime caches
To make the best use of available cores and avoid wasting time waiting on other tasks. See the comments in the code for an explanation.
parent d7e977a8f1
commit 27dc8ad21b

2 changed files with 173 additions and 251 deletions
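The heart of the change is the worker loop in the diff below: one shared queue of (crate, phase) work items becomes three queues, drained with `crossbeam_channel::select_biased!` so that def-map work is always preferred when more than one queue has items. Here is a minimal, self-contained sketch of that pattern (not the rust-analyzer code itself; plain integers stand in for crate ids):

use crossbeam_channel::{select_biased, unbounded};

fn main() {
    // Three work queues, mirroring the def-map / import-map / symbols split.
    let (def_map_tx, def_map_rx) = unbounded::<u32>();
    let (import_map_tx, import_map_rx) = unbounded::<u32>();
    let (symbols_tx, symbols_rx) = unbounded::<u32>();

    // Enqueue hypothetical work items (crate ids), lowest-priority first.
    for id in 0..3 {
        symbols_tx.send(id).unwrap();
        import_map_tx.send(id).unwrap();
        def_map_tx.send(id).unwrap();
    }

    // `select_biased!` checks its arms in textual order, so def-map items are
    // drained first even though they were enqueued last.
    let mut done = 0;
    while done < 9 {
        select_biased! {
            recv(def_map_rx) -> work => {
                if let Ok(id) = work { println!("def map for crate {id}"); done += 1; }
            }
            recv(import_map_rx) -> work => {
                if let Ok(id) = work { println!("import map for crate {id}"); done += 1; }
            }
            recv(symbols_rx) -> work => {
                if let Ok(id) = work { println!("symbols for crate {id}"); done += 1; }
            }
        }
    }
}

In the real code each arm additionally makes the worker return when its channel disconnects, and the work items carry crate ids, names, and modules rather than integers.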
@@ -2,12 +2,10 @@
 //! sometimes is counter productive when, for example, the first goto definition
 //! request takes longer to compute. This module implements prepopulation of
 //! various caches, it's not really advanced at the moment.
-mod topologic_sort;
-
-use std::time::Duration;
+use std::panic::AssertUnwindSafe;
 
 use hir::{Symbol, db::DefDatabase};
-use itertools::Itertools;
+use rustc_hash::FxHashMap;
 use salsa::{Cancelled, Database};
 
 use crate::{
@@ -35,58 +33,112 @@ pub fn parallel_prime_caches(
 ) {
     let _p = tracing::info_span!("parallel_prime_caches").entered();
 
-    let mut crates_to_prime = {
-        // FIXME: We already have the crate list topologically sorted (but without the things
-        // `TopologicalSortIter` gives us). Maybe there is a way to avoid using it and rip it out
-        // of the codebase?
-        let mut builder = topologic_sort::TopologicalSortIter::builder();
-
-        for &crate_id in db.all_crates().iter() {
-            builder.add(crate_id, crate_id.data(db).dependencies.iter().map(|d| d.crate_id));
-        }
-
-        builder.build()
-    };
-
     enum ParallelPrimeCacheWorkerProgress {
-        BeginCrate { crate_id: Crate, crate_name: Symbol },
-        EndCrate { crate_id: Crate },
+        BeginCrateDefMap { crate_id: Crate, crate_name: Symbol },
+        EndCrateDefMap { crate_id: Crate },
+        EndCrateImportMap,
+        EndModuleSymbols,
         Cancelled(Cancelled),
     }
 
-    // We split off def map computation from other work,
-    // as the def map is the relevant one. Once the defmaps are computed
-    // the project is ready to go, the other indices are just nice to have for some IDE features.
-    #[derive(PartialOrd, Ord, PartialEq, Eq, Copy, Clone)]
-    enum PrimingPhase {
-        DefMap,
-        ImportMap,
-        CrateSymbols,
-    }
+    // The setup here is a bit complicated. We try to make best use of compute resources.
+    // The idea is that if we have a def map available to compute, we should do that first.
+    // This is because def map is a dependency of both import map and symbols. So if we have
+    // e.g. a def map and a symbols, if we compute the def map we can, after it completes,
+    // compute the def maps of dependencies, the existing symbols and the symbols of the
+    // new crate, all in parallel. But if we compute the symbols, after that we will only
+    // have the def map to compute, and the rest of the CPU cores will rest, which is not
+    // good.
+    // However, it's better to compute symbols/import map than to compute a def map that
+    // isn't ready yet, because one of its dependencies hasn't yet completed its def map.
+    // Such def map will just block on the dependency, which is just wasted time. So better
+    // to compute the symbols/import map of an already computed def map in that time.
 
-    let (work_sender, progress_receiver) = {
+    let (reverse_deps, mut to_be_done_deps) = {
+        let all_crates = db.all_crates();
+        let mut reverse_deps =
+            all_crates.iter().map(|&krate| (krate, Vec::new())).collect::<FxHashMap<_, _>>();
+        let mut to_be_done_deps =
+            all_crates.iter().map(|&krate| (krate, 0u32)).collect::<FxHashMap<_, _>>();
+        for &krate in &*all_crates {
+            for dep in &krate.data(db).dependencies {
+                reverse_deps.get_mut(&dep.crate_id).unwrap().push(krate);
+                *to_be_done_deps.get_mut(&krate).unwrap() += 1;
+            }
+        }
+        (reverse_deps, to_be_done_deps)
+    };
+
+    let (def_map_work_sender, import_map_work_sender, symbols_work_sender, progress_receiver) = {
         let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
-        let (work_sender, work_receiver) = crossbeam_channel::unbounded();
+        let (def_map_work_sender, def_map_work_receiver) = crossbeam_channel::unbounded();
+        let (import_map_work_sender, import_map_work_receiver) = crossbeam_channel::unbounded();
+        let (symbols_work_sender, symbols_work_receiver) = crossbeam_channel::unbounded();
        let prime_caches_worker = move |db: RootDatabase| {
-            while let Ok((crate_id, crate_name, kind)) = work_receiver.recv() {
-                progress_sender
-                    .send(ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name })?;
+            let handle_def_map = |crate_id, crate_name| {
+                progress_sender.send(ParallelPrimeCacheWorkerProgress::BeginCrateDefMap {
+                    crate_id,
+                    crate_name,
+                })?;
 
-                let cancelled = Cancelled::catch(|| match kind {
-                    PrimingPhase::DefMap => _ = hir::crate_def_map(&db, crate_id),
-                    PrimingPhase::ImportMap => _ = db.import_map(crate_id),
-                    PrimingPhase::CrateSymbols => _ = db.crate_symbols(crate_id.into()),
-                });
+                let cancelled = Cancelled::catch(|| _ = hir::crate_def_map(&db, crate_id));
 
                 match cancelled {
                     Ok(()) => progress_sender
-                        .send(ParallelPrimeCacheWorkerProgress::EndCrate { crate_id })?,
+                        .send(ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id })?,
                     Err(cancelled) => progress_sender
                         .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
                 }
-            }
 
-            Ok::<_, crossbeam_channel::SendError<_>>(())
-        };
+                Ok::<_, crossbeam_channel::SendError<_>>(())
+            };
+            let handle_import_map = |crate_id| {
+                let cancelled = Cancelled::catch(|| _ = db.import_map(crate_id));
+
+                match cancelled {
+                    Ok(()) => {
+                        progress_sender.send(ParallelPrimeCacheWorkerProgress::EndCrateImportMap)?
+                    }
+                    Err(cancelled) => progress_sender
+                        .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
+                }
+
+                Ok::<_, crossbeam_channel::SendError<_>>(())
+            };
+            let handle_symbols = |module| {
+                let cancelled =
+                    Cancelled::catch(AssertUnwindSafe(|| _ = db.module_symbols(module)));
+
+                match cancelled {
+                    Ok(()) => {
+                        progress_sender.send(ParallelPrimeCacheWorkerProgress::EndModuleSymbols)?
+                    }
+                    Err(cancelled) => progress_sender
+                        .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
+                }
+
+                Ok::<_, crossbeam_channel::SendError<_>>(())
+            };
+
+            loop {
+                db.unwind_if_revision_cancelled();
+
+                // Biased because we want to prefer def maps.
+                crossbeam_channel::select_biased! {
+                    recv(def_map_work_receiver) -> work => {
+                        let Ok((crate_id, crate_name)) = work else { return Ok::<_, crossbeam_channel::SendError<_>>(()) };
+                        handle_def_map(crate_id, crate_name)?;
+                    }
+                    recv(import_map_work_receiver) -> work => {
+                        let Ok(crate_id) = work else { return Ok(()) };
+                        handle_import_map(crate_id)?;
+                    }
+                    recv(symbols_work_receiver) -> work => {
+                        let Ok(module) = work else { return Ok(()) };
+                        handle_symbols(module)?;
+                    }
+                }
+            }
+        };
 
         for id in 0..num_worker_threads {
@@ -103,138 +155,112 @@ pub fn parallel_prime_caches(
                 .expect("failed to spawn thread");
         }
 
-        (work_sender, progress_receiver)
+        (def_map_work_sender, import_map_work_sender, symbols_work_sender, progress_receiver)
     };
 
-    let crates_total = crates_to_prime.pending();
-    let mut crates_done = 0;
+    let crate_def_maps_total = db.all_crates().len();
+    let mut crate_def_maps_done = 0;
+    let (mut crate_import_maps_total, mut crate_import_maps_done) = (0usize, 0usize);
+    let (mut module_symbols_total, mut module_symbols_done) = (0usize, 0usize);
 
     // an index map is used to preserve ordering so we can sort the progress report in order of
     // "longest crate to index" first
     let mut crates_currently_indexing =
         FxIndexMap::with_capacity_and_hasher(num_worker_threads, Default::default());
 
-    let mut additional_phases = vec![];
+    for (&krate, &to_be_done_deps) in &to_be_done_deps {
+        if to_be_done_deps != 0 {
+            continue;
+        }
 
-    while crates_done < crates_total {
-        db.unwind_if_revision_cancelled();
+        let name = crate_name(db, krate);
+        def_map_work_sender.send((krate, name)).ok();
+    }
 
-        for krate in &mut crates_to_prime {
-            let name = krate.extra_data(db).display_name.as_deref().cloned().unwrap_or_else(|| {
-                Symbol::integer(salsa::plumbing::AsId::as_id(&krate).as_u32() as usize)
-            });
-            let origin = &krate.data(db).origin;
-            if origin.is_lang() {
-                additional_phases.push((krate, name.clone(), PrimingPhase::ImportMap));
-            } else if origin.is_local() {
-                // Compute the symbol search index.
-                // This primes the cache for `ide_db::symbol_index::world_symbols()`.
-                //
-                // We do this for workspace crates only (members of local_roots), because doing it
-                // for all dependencies could be *very* unnecessarily slow in a large project.
-                //
-                // FIXME: We should do it unconditionally if the configuration is set to default to
-                // searching dependencies (rust-analyzer.workspace.symbol.search.scope), but we
-                // would need to pipe that configuration information down here.
-                additional_phases.push((krate, name.clone(), PrimingPhase::CrateSymbols));
-            }
+    while crate_def_maps_done < crate_def_maps_total
+        || crate_import_maps_done < crate_import_maps_total
+        || module_symbols_done < module_symbols_total
+    {
+        db.unwind_if_revision_cancelled();
 
-            work_sender.send((krate, name, PrimingPhase::DefMap)).ok();
-        }
+        let progress = ParallelPrimeCachesProgress {
+            crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
+            crates_done: crate_def_maps_done,
+            crates_total: crate_def_maps_total,
+            work_type: "Indexing",
+        };
 
-        // recv_timeout is somewhat a hack, we need a way to from this thread check to see if the current salsa revision
-        // is cancelled on a regular basis. workers will only exit if they are processing a task that is cancelled, or
-        // if this thread exits, and closes the work channel.
-        let worker_progress = match progress_receiver.recv_timeout(Duration::from_millis(10)) {
+        cb(progress);
+
+        // Biased to prefer progress updates (and because it's faster).
+        let progress = match progress_receiver.recv() {
             Ok(p) => p,
-            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
-                continue;
-            }
-            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
+            Err(crossbeam_channel::RecvError) => {
                 // all our workers have exited, mark us as finished and exit
                 cb(ParallelPrimeCachesProgress {
                     crates_currently_indexing: vec![],
-                    crates_done,
-                    crates_total: crates_done,
+                    crates_done: crate_def_maps_done,
+                    crates_total: crate_def_maps_done,
                     work_type: "Indexing",
                 });
                 return;
             }
         };
-        match worker_progress {
-            ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
+
+        match progress {
+            ParallelPrimeCacheWorkerProgress::BeginCrateDefMap { crate_id, crate_name } => {
                 crates_currently_indexing.insert(crate_id, crate_name);
             }
-            ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
+            ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id } => {
                 crates_currently_indexing.swap_remove(&crate_id);
-                crates_to_prime.mark_done(crate_id);
-                crates_done += 1;
+                crate_def_maps_done += 1;
+
+                // Fire ready dependencies.
+                for &dep in &reverse_deps[&crate_id] {
+                    let to_be_done = to_be_done_deps.get_mut(&dep).unwrap();
+                    *to_be_done -= 1;
+                    if *to_be_done == 0 {
+                        let dep_name = crate_name(db, dep);
+                        def_map_work_sender.send((dep, dep_name)).ok();
+                    }
+                }
+
+                let origin = &crate_id.data(db).origin;
+                if origin.is_lang() {
+                    crate_import_maps_total += 1;
+                    import_map_work_sender.send(crate_id).ok();
+                } else if origin.is_local() {
+                    // Compute the symbol search index.
+                    // This primes the cache for `ide_db::symbol_index::world_symbols()`.
+                    //
+                    // We do this for workspace crates only (members of local_roots), because doing it
+                    // for all dependencies could be *very* unnecessarily slow in a large project.
+                    //
+                    // FIXME: We should do it unconditionally if the configuration is set to default to
+                    // searching dependencies (rust-analyzer.workspace.symbol.search.scope), but we
+                    // would need to pipe that configuration information down here.
+                    let modules = hir::Crate::from(crate_id).modules(db);
+                    module_symbols_total += modules.len();
+                    for module in modules {
+                        symbols_work_sender.send(module).ok();
+                    }
+                }
             }
+            ParallelPrimeCacheWorkerProgress::EndCrateImportMap => crate_import_maps_done += 1,
+            ParallelPrimeCacheWorkerProgress::EndModuleSymbols => module_symbols_done += 1,
             ParallelPrimeCacheWorkerProgress::Cancelled(cancelled) => {
                 // Cancelled::throw should probably be public
                 std::panic::resume_unwind(Box::new(cancelled));
             }
         };
-
-        let progress = ParallelPrimeCachesProgress {
-            crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
-            crates_done,
-            crates_total,
-            work_type: "Indexing",
-        };
-
-        cb(progress);
     }
-
-    let mut crates_done = 0;
-    let crates_total = additional_phases.len();
-    for w in additional_phases.into_iter().sorted_by_key(|&(_, _, phase)| phase) {
-        work_sender.send(w).ok();
-    }
-
-    while crates_done < crates_total {
-        db.unwind_if_revision_cancelled();
-
-        // recv_timeout is somewhat a hack, we need a way to from this thread check to see if the current salsa revision
-        // is cancelled on a regular basis. workers will only exit if they are processing a task that is cancelled, or
-        // if this thread exits, and closes the work channel.
-        let worker_progress = match progress_receiver.recv_timeout(Duration::from_millis(10)) {
-            Ok(p) => p,
-            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
-                continue;
-            }
-            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
-                // all our workers have exited, mark us as finished and exit
-                cb(ParallelPrimeCachesProgress {
-                    crates_currently_indexing: vec![],
-                    crates_done,
-                    crates_total: crates_done,
-                    work_type: "Populating symbols",
-                });
-                return;
-            }
-        };
-        match worker_progress {
-            ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
-                crates_currently_indexing.insert(crate_id, crate_name);
-            }
-            ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
-                crates_currently_indexing.swap_remove(&crate_id);
-                crates_done += 1;
-            }
-            ParallelPrimeCacheWorkerProgress::Cancelled(cancelled) => {
-                // Cancelled::throw should probably be public
-                std::panic::resume_unwind(Box::new(cancelled));
-            }
-        };
-
-        let progress = ParallelPrimeCachesProgress {
-            crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
-            crates_done,
-            crates_total,
-            work_type: "Populating symbols",
-        };
-
-        cb(progress);
-    }
 }
+
+fn crate_name(db: &RootDatabase, krate: Crate) -> Symbol {
+    krate
+        .extra_data(db)
+        .display_name
+        .as_deref()
+        .cloned()
+        .unwrap_or_else(|| Symbol::integer(salsa::plumbing::AsId::as_id(&krate).as_u32() as usize))
+}
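The `reverse_deps`/`to_be_done_deps` bookkeeping above is an incremental form of Kahn's algorithm: every crate tracks how many of its dependencies still lack a def map, and finishing a crate decrements the counter of each dependent, which is dispatched the moment its counter reaches zero. A standalone sketch of just that bookkeeping (integers in place of crates, a Vec in place of the work channel):

use std::collections::HashMap;

fn main() {
    // (dependent, dependency) edges: 1 depends on 0; 2 depends on 0 and 1.
    let deps: &[(u32, u32)] = &[(1, 0), (2, 0), (2, 1)];
    let nodes = [0u32, 1, 2];

    let mut reverse_deps: HashMap<u32, Vec<u32>> =
        nodes.iter().map(|&n| (n, Vec::new())).collect();
    let mut to_be_done: HashMap<u32, u32> = nodes.iter().map(|&n| (n, 0)).collect();
    for &(krate, dep) in deps {
        reverse_deps.get_mut(&dep).unwrap().push(krate);
        *to_be_done.get_mut(&krate).unwrap() += 1;
    }

    // Seed with nodes whose dependencies are all done (counter == 0).
    let mut ready: Vec<u32> = nodes.iter().copied().filter(|n| to_be_done[n] == 0).collect();

    // "Completing" a node may unblock its dependents; prints 0, 1, 2.
    while let Some(finished) = ready.pop() {
        println!("def map computed for crate {finished}");
        for &dependent in &reverse_deps[&finished] {
            let counter = to_be_done.get_mut(&dependent).unwrap();
            *counter -= 1;
            if *counter == 0 {
                ready.push(dependent);
            }
        }
    }
}

This is what makes the dedicated `topologic_sort` module below removable: readiness falls out of the counters, and the ready set lives directly in the def-map work channel.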
@@ -1,104 +0,0 @@
-//! helper data structure to schedule work for parallel prime caches.
-use std::{collections::VecDeque, hash::Hash};
-
-use crate::FxHashMap;
-
-pub(crate) struct TopologicSortIterBuilder<T> {
-    nodes: FxHashMap<T, Entry<T>>,
-}
-
-// this implementation has different bounds on T than would be implied by #[derive(Default)]
-impl<T> Default for TopologicSortIterBuilder<T>
-where
-    T: Copy + Eq + PartialEq + Hash,
-{
-    fn default() -> Self {
-        Self { nodes: Default::default() }
-    }
-}
-
-impl<T> TopologicSortIterBuilder<T>
-where
-    T: Copy + Eq + PartialEq + Hash,
-{
-    fn get_or_create_entry(&mut self, item: T) -> &mut Entry<T> {
-        self.nodes.entry(item).or_default()
-    }
-
-    pub(crate) fn add(&mut self, item: T, predecessors: impl IntoIterator<Item = T>) {
-        let mut num_predecessors = 0;
-
-        for predecessor in predecessors.into_iter() {
-            self.get_or_create_entry(predecessor).successors.push(item);
-            num_predecessors += 1;
-        }
-
-        let entry = self.get_or_create_entry(item);
-        entry.num_predecessors += num_predecessors;
-    }
-
-    pub(crate) fn build(self) -> TopologicalSortIter<T> {
-        let ready = self
-            .nodes
-            .iter()
-            .filter_map(
-                |(item, entry)| if entry.num_predecessors == 0 { Some(*item) } else { None },
-            )
-            .collect();
-
-        TopologicalSortIter { nodes: self.nodes, ready }
-    }
-}
-
-pub(crate) struct TopologicalSortIter<T> {
-    ready: VecDeque<T>,
-    nodes: FxHashMap<T, Entry<T>>,
-}
-
-impl<T> TopologicalSortIter<T>
-where
-    T: Copy + Eq + PartialEq + Hash,
-{
-    pub(crate) fn builder() -> TopologicSortIterBuilder<T> {
-        TopologicSortIterBuilder::default()
-    }
-
-    pub(crate) fn pending(&self) -> usize {
-        self.nodes.len()
-    }
-
-    pub(crate) fn mark_done(&mut self, item: T) {
-        let entry = self.nodes.remove(&item).expect("invariant: unknown item marked as done");
-
-        for successor in entry.successors {
-            let succ_entry = self
-                .nodes
-                .get_mut(&successor)
-                .expect("invariant: unknown successor referenced by entry");
-
-            succ_entry.num_predecessors -= 1;
-            if succ_entry.num_predecessors == 0 {
-                self.ready.push_back(successor);
-            }
-        }
-    }
-}
-
-impl<T> Iterator for TopologicalSortIter<T> {
-    type Item = T;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.ready.pop_front()
-    }
-}
-
-struct Entry<T> {
-    successors: Vec<T>,
-    num_predecessors: usize,
-}
-
-impl<T> Default for Entry<T> {
-    fn default() -> Self {
-        Self { successors: Default::default(), num_predecessors: 0 }
-    }
-}
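For reference, the deleted iterator was driven roughly like this (a hypothetical in-crate snippet, since the types are `pub(crate)`; integers again stand in for crates):

fn topo_order_example() -> Vec<u32> {
    // `add(item, predecessors)`: 0 has no deps, 1 depends on 0, 2 on 0 and 1.
    let mut builder = TopologicalSortIter::builder();
    builder.add(0u32, []);
    builder.add(1, [0]);
    builder.add(2, [0, 1]);

    let mut iter = builder.build();
    let mut order = Vec::new();
    // An item is yielded only once `mark_done` has run for all of its
    // predecessors; this is how the old code fed workers in dependency order.
    while let Some(item) = iter.next() {
        order.push(item);
        iter.mark_done(item);
    }
    order // [0, 1, 2]
}

The new code preserves the same invariant (a crate is dispatched only after all of its dependencies are done) with the counter scheme shown earlier, so the commit deletes this module outright.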