use crate::debug::TableEntry; use crate::durability::Durability; use crate::hash::FxIndexMap; use crate::plumbing::CycleRecoveryStrategy; use crate::plumbing::InputQueryStorageOps; use crate::plumbing::QueryStorageMassOps; use crate::plumbing::QueryStorageOps; use crate::revision::Revision; use crate::runtime::StampedValue; use crate::Database; use crate::Query; use crate::Runtime; use crate::{DatabaseKeyIndex, QueryDb}; use indexmap::map::Entry; use parking_lot::RwLock; use std::iter; use tracing::debug; /// Input queries store the result plus a list of the other queries /// that they invoked. This means we can avoid recomputing them when /// none of those inputs have changed. pub struct InputStorage where Q: Query, { group_index: u16, slots: RwLock>>, } struct Slot { key_index: u32, stamped_value: RwLock>, } impl std::panic::RefUnwindSafe for InputStorage where Q: Query, Q::Key: std::panic::RefUnwindSafe, Q::Value: std::panic::RefUnwindSafe, { } impl QueryStorageOps for InputStorage where Q: Query, { const CYCLE_STRATEGY: crate::plumbing::CycleRecoveryStrategy = CycleRecoveryStrategy::Panic; fn new(group_index: u16) -> Self { InputStorage { group_index, slots: Default::default() } } fn fmt_index( &self, _db: &>::DynDb, index: u32, fmt: &mut std::fmt::Formatter<'_>, ) -> std::fmt::Result { let slot_map = self.slots.read(); let key = slot_map.get_index(index as usize).unwrap().0; write!(fmt, "{}({:?})", Q::QUERY_NAME, key) } fn maybe_changed_after( &self, db: &>::DynDb, index: u32, revision: Revision, ) -> bool { debug_assert!(revision < db.salsa_runtime().current_revision()); let slots = &self.slots.read(); let Some((_, slot)) = slots.get_index(index as usize) else { return true; }; debug!("maybe_changed_after(slot={:?}, revision={:?})", Q::default(), revision,); let changed_at = slot.stamped_value.read().changed_at; debug!("maybe_changed_after: changed_at = {:?}", changed_at); changed_at > revision } fn fetch(&self, db: &>::DynDb, key: &Q::Key) -> Q::Value { db.unwind_if_cancelled(); let slots = &self.slots.read(); let slot = slots .get(key) .unwrap_or_else(|| panic!("no value set for {:?}({:?})", Q::default(), key)); let StampedValue { value, durability, changed_at } = slot.stamped_value.read().clone(); db.salsa_runtime().report_query_read_and_unwind_if_cycle_resulted( DatabaseKeyIndex { group_index: self.group_index, query_index: Q::QUERY_INDEX, key_index: slot.key_index, }, durability, changed_at, ); value } fn durability(&self, _db: &>::DynDb, key: &Q::Key) -> Durability { match self.slots.read().get(key) { Some(slot) => slot.stamped_value.read().durability, None => panic!("no value set for {:?}({:?})", Q::default(), key), } } fn entries(&self, _db: &>::DynDb) -> C where C: std::iter::FromIterator>, { let slots = self.slots.read(); slots .iter() .map(|(key, slot)| { TableEntry::new(key.clone(), Some(slot.stamped_value.read().value.clone())) }) .collect() } } impl QueryStorageMassOps for InputStorage where Q: Query, { fn purge(&self) { *self.slots.write() = Default::default(); } } impl InputQueryStorageOps for InputStorage where Q: Query, { fn set(&self, runtime: &mut Runtime, key: &Q::Key, value: Q::Value, durability: Durability) { tracing::debug!("{:?}({:?}) = {:?} ({:?})", Q::default(), key, value, durability); // The value is changing, so we need a new revision (*). We also // need to update the 'last changed' revision by invoking // `guard.mark_durability_as_changed`. // // CAREFUL: This will block until the global revision lock can // be acquired. If there are still queries executing, they may // need to read from this input. Therefore, we wait to acquire // the lock on `map` until we also hold the global query write // lock. // // (*) Technically, since you can't presently access an input // for a non-existent key, and you can't enumerate the set of // keys, we only need a new revision if the key used to // exist. But we may add such methods in the future and this // case doesn't generally seem worth optimizing for. runtime.with_incremented_revision(|next_revision| { let mut slots = self.slots.write(); // Do this *after* we acquire the lock, so that we are not // racing with somebody else to modify this same cell. // (Otherwise, someone else might write a *newer* revision // into the same cell while we block on the lock.) let stamped_value = StampedValue { value, durability, changed_at: next_revision }; match slots.entry(key.clone()) { Entry::Occupied(entry) => { let mut slot_stamped_value = entry.get().stamped_value.write(); let old_durability = slot_stamped_value.durability; *slot_stamped_value = stamped_value; Some(old_durability) } Entry::Vacant(entry) => { let key_index = entry.index() as u32; entry.insert(Slot { key_index, stamped_value: RwLock::new(stamped_value) }); None } } }); } } /// Same as `InputStorage`, but optimized for queries that take no inputs. pub struct UnitInputStorage where Q: Query, { slot: UnitSlot, } struct UnitSlot { database_key_index: DatabaseKeyIndex, stamped_value: RwLock>>, } impl std::panic::RefUnwindSafe for UnitInputStorage where Q: Query, Q::Key: std::panic::RefUnwindSafe, Q::Value: std::panic::RefUnwindSafe, { } impl QueryStorageOps for UnitInputStorage where Q: Query, { const CYCLE_STRATEGY: crate::plumbing::CycleRecoveryStrategy = CycleRecoveryStrategy::Panic; fn new(group_index: u16) -> Self { let database_key_index = DatabaseKeyIndex { group_index, query_index: Q::QUERY_INDEX, key_index: 0 }; UnitInputStorage { slot: UnitSlot { database_key_index, stamped_value: RwLock::new(None) } } } fn fmt_index( &self, _db: &>::DynDb, _index: u32, fmt: &mut std::fmt::Formatter<'_>, ) -> std::fmt::Result { write!(fmt, "{}", Q::QUERY_NAME) } fn maybe_changed_after( &self, db: &>::DynDb, _index: u32, revision: Revision, ) -> bool { debug_assert!(revision < db.salsa_runtime().current_revision()); debug!("maybe_changed_after(slot={:?}, revision={:?})", Q::default(), revision,); let Some(value) = &*self.slot.stamped_value.read() else { return true; }; let changed_at = value.changed_at; debug!("maybe_changed_after: changed_at = {:?}", changed_at); changed_at > revision } fn fetch(&self, db: &>::DynDb, &(): &Q::Key) -> Q::Value { db.unwind_if_cancelled(); let StampedValue { value, durability, changed_at } = self .slot .stamped_value .read() .clone() .unwrap_or_else(|| panic!("no value set for {:?}", Q::default())); db.salsa_runtime().report_query_read_and_unwind_if_cycle_resulted( self.slot.database_key_index, durability, changed_at, ); value } fn durability(&self, _db: &>::DynDb, &(): &Q::Key) -> Durability { match &*self.slot.stamped_value.read() { Some(stamped_value) => stamped_value.durability, None => panic!("no value set for {:?}", Q::default(),), } } fn entries(&self, _db: &>::DynDb) -> C where C: std::iter::FromIterator>, { iter::once(TableEntry::new( (), self.slot.stamped_value.read().as_ref().map(|it| it.value.clone()), )) .collect() } } impl QueryStorageMassOps for UnitInputStorage where Q: Query, { fn purge(&self) { *self.slot.stamped_value.write() = Default::default(); } } impl InputQueryStorageOps for UnitInputStorage where Q: Query, { fn set(&self, runtime: &mut Runtime, (): &Q::Key, value: Q::Value, durability: Durability) { tracing::debug!("{:?} = {:?} ({:?})", Q::default(), value, durability); // The value is changing, so we need a new revision (*). We also // need to update the 'last changed' revision by invoking // `guard.mark_durability_as_changed`. // // CAREFUL: This will block until the global revision lock can // be acquired. If there are still queries executing, they may // need to read from this input. Therefore, we wait to acquire // the lock on `map` until we also hold the global query write // lock. // // (*) Technically, since you can't presently access an input // for a non-existent key, and you can't enumerate the set of // keys, we only need a new revision if the key used to // exist. But we may add such methods in the future and this // case doesn't generally seem worth optimizing for. runtime.with_incremented_revision(|next_revision| { let mut stamped_value_slot = self.slot.stamped_value.write(); // Do this *after* we acquire the lock, so that we are not // racing with somebody else to modify this same cell. // (Otherwise, someone else might write a *newer* revision // into the same cell while we block on the lock.) let stamped_value = StampedValue { value, durability, changed_at: next_revision }; match &mut *stamped_value_slot { Some(slot_stamped_value) => { let old_durability = slot_stamped_value.durability; *slot_stamped_value = stamped_value; Some(old_durability) } stamped_value_slot @ None => { *stamped_value_slot = Some(stamped_value); None } } }); } } /// Check that `Slot: Send + Sync` as long as /// `DB::DatabaseData: Send + Sync`, which in turn implies that /// `Q::Key: Send + Sync`, `Q::Value: Send + Sync`. #[allow(dead_code)] fn check_send_sync() where Q: Query, Q::Key: Send + Sync, Q::Value: Send + Sync, { fn is_send_sync() {} is_send_sync::>(); is_send_sync::>(); } /// Check that `Slot: 'static` as long as /// `DB::DatabaseData: 'static`, which in turn implies that /// `Q::Key: 'static`, `Q::Value: 'static`. #[allow(dead_code)] fn check_static() where Q: Query + 'static, Q::Key: 'static, Q::Value: 'static, { fn is_static() {} is_static::>(); is_static::>(); }