This commit is contained in:
Lukas Wirth 2024-02-07 16:30:00 +01:00
parent 159a03ad7b
commit 0a6197df97
21 changed files with 128 additions and 365 deletions

View file

@ -103,21 +103,14 @@ impl DependencyGraph {
// load up the next thread (i.e., we start at B/QB2,
// and then load up the dependency on C/QC2).
let edge = self.edges.get_mut(&id).unwrap();
let prefix = edge
.stack
.iter_mut()
.take_while(|p| p.database_key_index != key)
.count();
let prefix = edge.stack.iter_mut().take_while(|p| p.database_key_index != key).count();
closure(&mut edge.stack[prefix..]);
id = edge.blocked_on_id;
key = edge.blocked_on_key;
}
// Finally, we copy in the results from `from_stack`.
let prefix = from_stack
.iter_mut()
.take_while(|p| p.database_key_index != key)
.count();
let prefix = from_stack.iter_mut().take_while(|p| p.database_key_index != key).count();
closure(&mut from_stack[prefix..]);
}
@ -141,24 +134,13 @@ impl DependencyGraph {
let mut others_unblocked = false;
while id != from_id {
let edge = self.edges.get(&id).unwrap();
let prefix = edge
.stack
.iter()
.take_while(|p| p.database_key_index != key)
.count();
let prefix = edge.stack.iter().take_while(|p| p.database_key_index != key).count();
let next_id = edge.blocked_on_id;
let next_key = edge.blocked_on_key;
if let Some(cycle) = edge.stack[prefix..]
.iter()
.rev()
.find_map(|aq| aq.cycle.clone())
{
if let Some(cycle) = edge.stack[prefix..].iter().rev().find_map(|aq| aq.cycle.clone()) {
// Remove `id` from the list of runtimes blocked on `next_key`:
self.query_dependents
.get_mut(&next_key)
.unwrap()
.retain(|r| *r != id);
self.query_dependents.get_mut(&next_key).unwrap().retain(|r| *r != id);
// Unblock runtime so that it can resume execution once lock is released:
self.unblock_runtime(id, WaitResult::Cycle(cycle));
@ -170,10 +152,7 @@ impl DependencyGraph {
key = next_key;
}
let prefix = from_stack
.iter()
.take_while(|p| p.database_key_index != key)
.count();
let prefix = from_stack.iter().take_while(|p| p.database_key_index != key).count();
let this_unblocked = from_stack[prefix..].iter().any(|aq| aq.cycle.is_some());
(this_unblocked, others_unblocked)
@ -239,10 +218,7 @@ impl DependencyGraph {
condvar: condvar.clone(),
},
);
self.query_dependents
.entry(database_key)
.or_default()
.push(from_id);
self.query_dependents.entry(database_key).or_default().push(from_id);
condvar
}
@ -253,10 +229,7 @@ impl DependencyGraph {
database_key: DatabaseKeyIndex,
wait_result: WaitResult,
) {
let dependents = self
.query_dependents
.remove(&database_key)
.unwrap_or_default();
let dependents = self.query_dependents.remove(&database_key).unwrap_or_default();
for from_id in dependents {
self.unblock_runtime(from_id, wait_result.clone());

View file

@ -53,9 +53,7 @@ pub(crate) enum QueryInputs {
impl Default for LocalState {
fn default() -> Self {
LocalState {
query_stack: RefCell::new(Some(Vec::new())),
}
LocalState { query_stack: RefCell::new(Some(Vec::new())) }
}
}
@ -65,19 +63,11 @@ impl LocalState {
let mut query_stack = self.query_stack.borrow_mut();
let query_stack = query_stack.as_mut().expect("local stack taken");
query_stack.push(ActiveQuery::new(database_key_index));
ActiveQueryGuard {
local_state: self,
database_key_index,
push_len: query_stack.len(),
}
ActiveQueryGuard { local_state: self, database_key_index, push_len: query_stack.len() }
}
fn with_query_stack<R>(&self, c: impl FnOnce(&mut Vec<ActiveQuery>) -> R) -> R {
c(self
.query_stack
.borrow_mut()
.as_mut()
.expect("query stack taken"))
c(self.query_stack.borrow_mut().as_mut().expect("query stack taken"))
}
pub(super) fn query_in_progress(&self) -> bool {
@ -86,9 +76,7 @@ impl LocalState {
pub(super) fn active_query(&self) -> Option<DatabaseKeyIndex> {
self.with_query_stack(|stack| {
stack
.last()
.map(|active_query| active_query.database_key_index)
stack.last().map(|active_query| active_query.database_key_index)
})
}
@ -156,10 +144,7 @@ impl LocalState {
/// the current thread is blocking. The stack must be restored
/// with [`Self::restore_query_stack`] when the thread unblocks.
pub(super) fn take_query_stack(&self) -> Vec<ActiveQuery> {
assert!(
self.query_stack.borrow().is_some(),
"query stack already taken"
);
assert!(self.query_stack.borrow().is_some(), "query stack already taken");
self.query_stack.take().unwrap()
}
@ -188,10 +173,7 @@ impl ActiveQueryGuard<'_> {
self.local_state.with_query_stack(|stack| {
// Sanity check: pushes and pops should be balanced.
assert_eq!(stack.len(), self.push_len);
debug_assert_eq!(
stack.last().unwrap().database_key_index,
self.database_key_index
);
debug_assert_eq!(stack.last().unwrap().database_key_index, self.database_key_index);
stack.pop().unwrap()
})
}
@ -220,8 +202,7 @@ impl ActiveQueryGuard<'_> {
/// If the active query is registered as a cycle participant, remove and
/// return that cycle.
pub(crate) fn take_cycle(&self) -> Option<Cycle> {
self.local_state
.with_query_stack(|stack| stack.last_mut()?.cycle.take())
self.local_state.with_query_stack(|stack| stack.last_mut()?.cycle.take())
}
}