allow updating a page id in page cache

This commit is contained in:
Pere Diaz Bou 2025-05-19 15:02:38 +02:00
parent 35e2088b7e
commit adf72f2bf8
3 changed files with 59 additions and 42 deletions

View file

@ -2814,8 +2814,8 @@ impl BTreeCursor {
let page = page.as_ref().unwrap();
if *new_id != page.get().id {
page.get().id = *new_id;
// FIXME: why would there be another page at this id/frame?
self.pager.put_loaded_page(*new_id, page.clone())?;
self.pager
.update_dirty_loaded_page_in_cache(*new_id, page.clone())?;
}
}

View file

@ -71,14 +71,33 @@ impl DumbLruPageCache {
}
pub fn insert(&mut self, key: PageCacheKey, value: PageRef) -> Result<(), CacheError> {
self._insert(key, value, false)
}
pub fn insert_ignore_existing(
&mut self,
key: PageCacheKey,
value: PageRef,
) -> Result<(), CacheError> {
self._insert(key, value, true)
}
pub fn _insert(
&mut self,
key: PageCacheKey,
value: PageRef,
ignore_exists: bool,
) -> Result<(), CacheError> {
trace!("insert(key={:?})", key);
// Check first if page already exists in cache
if let Some(existing_page_ref) = self.get(&key) {
assert!(
Arc::ptr_eq(&value, &existing_page_ref),
"Attempted to insert different page with same key"
);
return Err(CacheError::KeyExists);
if !ignore_exists {
if let Some(existing_page_ref) = self.get(&key) {
assert!(
Arc::ptr_eq(&value, &existing_page_ref),
"Attempted to insert different page with same key"
);
return Err(CacheError::KeyExists);
}
}
self.make_room_for(1)?;
let entry = Box::new(PageCacheEntry {
@ -268,15 +287,13 @@ impl DumbLruPageCache {
}
pub fn print(&mut self) {
println!("page_cache_len={}", self.map.borrow().len());
tracing::debug!("page_cache_len={}", self.map.borrow().len());
let head_ptr = *self.head.borrow();
let mut current = head_ptr;
let mut last_ptr: Option<NonNull<PageCacheEntry>> = None;
while let Some(node) = current {
unsafe {
println!("page={:?}", node.as_ref().key);
tracing::debug!("page={:?}", node.as_ref().key);
let node_ref = node.as_ref();
last_ptr = Some(node);
current = node_ref.next;
}
}
@ -287,12 +304,10 @@ impl DumbLruPageCache {
let mut this_keys = Vec::new();
let head_ptr = *self.head.borrow();
let mut current = head_ptr;
let mut last_ptr: Option<NonNull<PageCacheEntry>> = None;
while let Some(node) = current {
unsafe {
this_keys.push(node.as_ref().key.clone());
let node_ref = node.as_ref();
last_ptr = Some(node);
current = node_ref.next;
}
}
@ -428,6 +443,17 @@ impl DumbLruPageCache {
"Head pointer mismatch after backward traversal"
);
}
#[cfg(test)]
/// For testing purposes, in case we use cursor api directly, we might want to unmark pages as dirty because we bypass the WAL transaction layer
pub fn unset_dirty_all_pages(&mut self) {
for (_, page) in self.map.borrow_mut().iter_mut() {
unsafe {
let entry = page.as_mut();
entry.page.clear_dirty()
};
}
}
}
#[cfg(test)]
@ -850,10 +876,9 @@ mod tests {
let mut lru = LruCache::new(NonZeroUsize::new(10).unwrap());
for _ in 0..10000 {
dbg!(&lru);
cache.print();
for (key, page) in &lru {
println!("lru_page={:?}", key);
for (key, _) in &lru {
tracing::debug!("lru_page={:?}", key);
}
match rng.next_u64() % 2 {
0 => {
@ -866,7 +891,7 @@ mod tests {
if let Some(_) = cache.peek(&key, false) {
continue; // skip duplicate page ids
}
println!("inserting page {:?}", key);
tracing::debug!("inserting page {:?}", key);
match cache.insert(key.clone(), page.clone()) {
Err(CacheError::Full | CacheError::ActiveRefs) => {} // Ignore
Err(err) => {
@ -892,7 +917,7 @@ mod tests {
let key: PageCacheKey = lru.iter().skip(i).next().unwrap().0.clone();
key
};
println!("removing page {:?}", key);
tracing::debug!("removing page {:?}", key);
lru.pop(&key);
assert!(cache.delete(key).is_ok());
}
@ -900,8 +925,8 @@ mod tests {
}
compare_to_lru(&mut cache, &lru);
cache.print();
for (key, page) in &lru {
println!("lru_page={:?}", key);
for (key, _) in &lru {
tracing::debug!("lru_page={:?}", key);
}
cache.verify_list_integrity();
for (key, page) in &lru {
@ -914,8 +939,6 @@ mod tests {
}
pub fn compare_to_lru(cache: &mut DumbLruPageCache, lru: &LruCache<PageCacheKey, PageRef>) {
use lru::LruCache;
let this_keys = cache.keys();
let mut lru_keys = Vec::new();
for (lru_key, _) in lru {
@ -924,7 +947,7 @@ mod tests {
if this_keys != lru_keys {
cache.print();
for (lru_key, _) in lru {
println!("lru_page={:?}", lru_key);
tracing::debug!("lru_page={:?}", lru_key);
}
assert_eq!(&this_keys, &lru_keys)
}

View file

@ -750,7 +750,11 @@ impl Pager {
}
}
pub fn put_loaded_page(&self, id: usize, page: PageRef) -> Result<(), LimboError> {
pub fn update_dirty_loaded_page_in_cache(
&self,
id: usize,
page: PageRef,
) -> Result<(), LimboError> {
let mut cache = self.page_cache.write();
let max_frame = match &self.wal {
Some(wal) => wal.borrow().get_max_frame(),
@ -758,26 +762,16 @@ impl Pager {
};
let page_key = PageCacheKey::new(id, Some(max_frame));
// FIXME: is this correct? Why would there be another version of the page?
// Check if there's an existing page at this key and remove it
// This can happen when a page is being updated during a transaction
// or when WAL frames are being managed
if let Some(_existing_page_ref) = cache.get(&page_key) {
cache.delete(page_key.clone()).map_err(|e| {
// FIXME: use specific page key for writer instead of max frame, this will make readers not conflict
assert!(page.is_dirty());
cache
.insert_ignore_existing(page_key, page.clone())
.map_err(|e| {
LimboError::InternalError(format!(
"put_loaded_page failed to remove old version of page {:?}: {:?}.",
page_key, e
"Failed to insert loaded page {} into cache: {:?}",
id, e
))
})?;
}
cache.insert(page_key, page.clone()).map_err(|e| {
LimboError::InternalError(format!(
"Failed to insert loaded page {} into cache: {:?}",
id, e
))
})?;
page.set_loaded();
Ok(())
}