rewrite cycle handling to support fixed-point iteration (#603)

This commit is contained in:
Carl Meyer 2025-03-10 15:25:47 -07:00 committed by GitHub
parent 9ebc8a3913
commit 095d8b2b81
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
62 changed files with 3584 additions and 1637 deletions

View file

@ -1,5 +1,8 @@
use crate::{
zalsa::ZalsaDatabase, zalsa_local::ActiveQueryGuard, Cycle, Database, Event, EventKind,
cycle::{CycleRecoveryStrategy, MAX_ITERATIONS},
zalsa::ZalsaDatabase,
zalsa_local::ActiveQueryGuard,
Database, Event, EventKind,
};
use super::{memo::Memo, Configuration, IngredientImpl};
@ -20,12 +23,13 @@ where
pub(super) fn execute<'db>(
&'db self,
db: &'db C::DbView,
active_query: ActiveQueryGuard<'_>,
mut active_query: ActiveQueryGuard<'db>,
opt_old_memo: Option<&Memo<C::Output<'_>>>,
) -> &'db Memo<C::Output<'db>> {
let zalsa = db.zalsa();
let (zalsa, zalsa_local) = db.zalsas();
let revision_now = zalsa.current_revision();
let database_key_index = active_query.database_key_index;
let id = database_key_index.key_index;
tracing::info!("{:?}: executing query", database_key_index);
@ -35,59 +39,148 @@ where
})
});
// If we already executed this query once, then use the tracked-struct ids from the
// previous execution as the starting point for the new one.
if let Some(old_memo) = opt_old_memo {
active_query.seed_tracked_struct_ids(&old_memo.revisions.tracked_struct_ids);
}
let memo_ingredient_index = self.memo_ingredient_index(zalsa, id);
// Query was not previously executed, or value is potentially
// stale, or value is absent. Let's execute!
let database_key_index = active_query.database_key_index;
let id = database_key_index.key_index;
let value = match Cycle::catch(|| C::execute(db, C::id_to_input(db, id))) {
Ok(v) => v,
Err(cycle) => {
let mut iteration_count: u32 = 0;
let mut fell_back = false;
// Our provisional value from the previous iteration, when doing fixpoint iteration.
// Initially it's set to None, because the initial provisional value is created lazily,
// only when a cycle is actually encountered.
let mut opt_last_provisional: Option<&Memo<<C as Configuration>::Output<'db>>> = None;
loop {
// If we already executed this query once, then use the tracked-struct ids from the
// previous execution as the starting point for the new one.
if let Some(old_memo) = opt_old_memo {
active_query.seed_tracked_struct_ids(&old_memo.revisions.tracked_struct_ids);
}
// Query was not previously executed, or value is potentially
// stale, or value is absent. Let's execute!
let mut new_value = C::execute(db, C::id_to_input(db, id));
let mut revisions = active_query.pop();
// Did the new result we got depend on our own provisional value, in a cycle?
if C::CYCLE_STRATEGY == CycleRecoveryStrategy::Fixpoint
&& revisions.cycle_heads.contains(&database_key_index)
{
let opt_owned_last_provisional;
let last_provisional_value = if let Some(last_provisional) = opt_last_provisional {
// We have a last provisional value from our previous time around the loop.
last_provisional
.value
.as_ref()
.expect("provisional value should not be evicted by LRU")
} else {
// This is our first time around the loop; a provisional value must have been
// inserted into the memo table when the cycle was hit, so let's pull our
// initial provisional value from there.
opt_owned_last_provisional =
self.get_memo_from_table_for(zalsa, id, memo_ingredient_index);
debug_assert!(opt_owned_last_provisional
.as_ref()
.unwrap()
.may_be_provisional());
opt_owned_last_provisional
.expect(
"{database_key_index:#?} is a cycle head, \
but no provisional memo found",
)
.value
.as_ref()
.expect("provisional value should not be evicted by LRU")
};
tracing::debug!(
"{database_key_index:?}: caught cycle {cycle:?}, have strategy {:?}",
C::CYCLE_STRATEGY
"{database_key_index:?}: execute: \
I am a cycle head, comparing last provisional value with new value"
);
match C::CYCLE_STRATEGY {
crate::cycle::CycleRecoveryStrategy::Panic => cycle.throw(),
crate::cycle::CycleRecoveryStrategy::Fallback => {
if let Some(c) = active_query.take_cycle() {
assert!(c.is(&cycle));
C::recover_from_cycle(db, &cycle, C::id_to_input(db, id))
} else {
// we are not a participant in this cycle
debug_assert!(!cycle
.participant_keys()
.any(|k| k == database_key_index));
cycle.throw()
// If the new result is equal to the last provisional result, the cycle has
// converged and we are done.
if !C::values_equal(&new_value, last_provisional_value) {
if fell_back {
// We fell back to a value last iteration, but the fallback didn't result
// in convergence. We only have bad options here: continue iterating
// (ignoring the request to fall back), or forcibly use the fallback and
// leave the cycle in an inconsistent state (we'll be using a value for
// this query that it doesn't evaluate to, given its inputs). Maybe we'll
// have to go with the latter, but for now let's panic and see if real use
// cases need non-converging fallbacks.
panic!("{database_key_index:?}: execute: fallback did not converge");
}
// We are in a cycle that hasn't converged; ask the user's
// cycle-recovery function what to do:
match C::recover_from_cycle(
db,
&new_value,
iteration_count,
C::id_to_input(db, id),
) {
crate::CycleRecoveryAction::Iterate => {
tracing::debug!("{database_key_index:?}: execute: iterate again");
}
crate::CycleRecoveryAction::Fallback(fallback_value) => {
tracing::debug!(
"{database_key_index:?}: execute: user cycle_fn says to fall back"
);
new_value = fallback_value;
// We have to insert the fallback value for this query and then iterate
// one more time to fill in correct values for everything else in the
// cycle based on it; then we'll re-insert it as final value.
fell_back = true;
}
}
iteration_count = iteration_count
.checked_add(1)
.expect("fixpoint iteration should converge before u32::MAX iterations");
if iteration_count > MAX_ITERATIONS {
panic!("{database_key_index:?}: execute: too many cycle iterations");
}
opt_last_provisional = Some(self.insert_memo(
zalsa,
id,
Memo::new(Some(new_value), revision_now, revisions),
memo_ingredient_index,
));
active_query = zalsa_local.push_query(database_key_index);
continue;
}
tracing::debug!(
"{database_key_index:?}: execute: fixpoint iteration has a final value"
);
revisions.cycle_heads.remove(&database_key_index);
}
};
let mut revisions = active_query.pop();
// If the new value is equal to the old one, then it didn't
// really change, even if some of its inputs have. So we can
// "backdate" its `changed_at` revision to be the same as the
// old value.
if let Some(old_memo) = opt_old_memo {
self.backdate_if_appropriate(old_memo, &mut revisions, &value);
self.diff_outputs(zalsa, db, database_key_index, old_memo, &mut revisions);
tracing::debug!("{database_key_index:?}: execute: result.revisions = {revisions:#?}");
if let Some(old_memo) = opt_old_memo {
// If the new value is equal to the old one, then it didn't
// really change, even if some of its inputs have. So we can
// "backdate" its `changed_at` revision to be the same as the
// old value.
self.backdate_if_appropriate(old_memo, &mut revisions, &new_value);
// Diff the new outputs with the old, to discard any no-longer-emitted
// outputs and update the tracked struct IDs for seeding the next revision.
let provisional = !revisions.cycle_heads.is_empty();
self.diff_outputs(
zalsa,
db,
database_key_index,
old_memo,
&mut revisions,
provisional,
);
}
return self.insert_memo(
zalsa,
id,
Memo::new(Some(new_value), revision_now, revisions),
memo_ingredient_index,
);
}
tracing::debug!("{database_key_index:?}: read_upgrade: result.revisions = {revisions:#?}");
let memo_ingredient_index = self.memo_ingredient_index(zalsa, id);
self.insert_memo(
zalsa,
id,
Memo::new(Some(value), revision_now, revisions),
memo_ingredient_index,
)
}
}