Persisting database header and pointer map page to cache

This commit ensures that the metadata in the database header and the pointer map pages allocated are correctly persisted to the page cache. This was not being done earlier.
This commit is contained in:
Zaid Humayun 2025-06-03 23:18:40 +05:30
parent e7d09edf09
commit e994adfb40
4 changed files with 53 additions and 24 deletions

View file

@ -69,6 +69,10 @@ fn pragma_for(pragma: PragmaName) -> Pragma {
&["user_version"],
),
WalCheckpoint => Pragma::new(PragmaFlags::NeedSchema, &["busy", "log", "checkpointed"]),
AutoVacuum => Pragma::new(
PragmaFlags::NoColumns1 | PragmaFlags::Result0,
&["auto_vacuum"],
),
}
}

View file

@ -484,8 +484,11 @@ impl Pager {
);
let allocated_page_id = page.get().get().id as u32;
if allocated_page_id != root_page_num {
// TODO: Handle swapping the allocated page with the desired root page
// TODO(Zaid): Handle swapping the allocated page with the desired root page
}
// TODO(Zaid): Update the header metadata to reflect the new root page number
// For now map allocated_page_id since we are not swapping it with root_page_num
match self.ptrmap_put(allocated_page_id, PtrmapType::RootPage, 0)? {
CursorResult::Ok(_) => Ok(CursorResult::Ok(allocated_page_id as u32)),
@ -926,20 +929,37 @@ impl Pager {
let header = &self.db_header;
let mut header = header.lock();
header.database_size += 1;
// update database size
self.write_database_header(&mut header)?;
#[cfg(not(feature = "omit_autovacuum"))]
{
// Check if the last page is a ptrmap page, if so allocate it and drop it immediately
// If the following conditions are met, allocate a pointer map page, add to cache and increment the database size
// - autovacuum is enabled
// - the last page is a pointer map page
if matches!(*self.auto_vacuum_mode.borrow(), AutoVacuumMode::Full)
&& is_ptrmap_page(header.database_size, header.get_page_size() as usize)
{
allocate_page(header.database_size as usize, &self.buffer_pool, 0);
let page = allocate_page(header.database_size as usize, &self.buffer_pool, 0);
page.set_dirty();
self.add_dirty(page.get().id);
let page_key = PageCacheKey::new(page.get().id);
let mut cache = self.page_cache.write();
match cache.insert(page_key, page.clone()) {
Ok(_) => (),
Err(CacheError::Full) => return Err(LimboError::CacheFull),
Err(_) => {
return Err(LimboError::InternalError(
"Unknown error inserting page to cache".into(),
))
}
}
header.database_size += 1;
}
}
// update database size
self.write_database_header(&mut header)?;
// FIXME: should reserve page cache entry before modifying the database
let page = allocate_page(header.database_size as usize, &self.buffer_pool, 0);
{
@ -1256,7 +1276,7 @@ mod ptrmap_tests {
// Helper to create a Pager for testing
fn test_pager_setup(page_size: u32, initial_db_pages: u32) -> Pager {
let io: Arc<dyn IO> = Arc::new(MemoryIO::new()); // In-memory IO for tests
let io: Arc<dyn IO> = Arc::new(MemoryIO::new());
let db_file_raw = io.open_file("test.db", OpenFlags::Create, true).unwrap();
let db_storage: Arc<dyn DatabaseStorage> = Arc::new(DatabaseFile::new(db_file_raw));
@ -1281,14 +1301,12 @@ mod ptrmap_tests {
let pager = Pager::finish_open(db_header_arc, db_storage, wal, io, page_cache, buffer_pool)
.unwrap();
pager.set_auto_vacuum_mode(AutoVacuumMode::Full); // set autovacuum mode to full
pager.set_auto_vacuum_mode(AutoVacuumMode::Full);
// Allocate all the pages as btree root pages
for _ in 0..initial_db_pages {
match pager.btree_create(&CreateBTreeFlags::new_table()) {
Ok(CursorResult::Ok(_root_page_id)) => {
// Successfully created, do nothing further in this loop context
}
Ok(CursorResult::Ok(_root_page_id)) => (),
Ok(CursorResult::IO) => {
panic!("test_pager_setup: btree_create returned CursorResult::IO unexpectedly");
}
@ -1317,6 +1335,9 @@ mod ptrmap_tests {
let ptrmap_page_ref = pager.read_page(expected_ptrmap_pg_no as usize);
assert!(ptrmap_page_ref.is_ok());
// Ensure that the database header size is correctly reflected
assert_eq!(pager.db_header.lock().database_size, initial_db_pages + 2); // (1+1) -> (header + ptrmap)
// Read the entry from the ptrmap page and verify it
let entry = pager.ptrmap_get(db_page_to_update).unwrap();
assert!(matches!(entry, CursorResult::Ok(Some(_))));

View file

@ -225,18 +225,9 @@ fn update_pragma(
}
};
match auto_vacuum_mode {
0 => {
pager.set_auto_vacuum_mode(AutoVacuumMode::None);
pager.db_header.lock().vacuum_mode_largest_root_page = 0;
}
1 => {
pager.set_auto_vacuum_mode(AutoVacuumMode::Full);
pager.db_header.lock().vacuum_mode_largest_root_page = 1;
}
2 => {
pager.set_auto_vacuum_mode(AutoVacuumMode::Incremental);
pager.db_header.lock().vacuum_mode_largest_root_page = 1;
}
0 => update_auto_vacuum_mode(AutoVacuumMode::None, 0, header, pager)?,
1 => update_auto_vacuum_mode(AutoVacuumMode::Full, 1, header, pager)?,
2 => update_auto_vacuum_mode(AutoVacuumMode::Incremental, 1, header, pager)?,
_ => {
return Err(LimboError::InvalidArgument(
"invalid auto vacuum mode".to_string(),
@ -399,6 +390,19 @@ fn query_pragma(
Ok(())
}
fn update_auto_vacuum_mode(
auto_vacuum_mode: AutoVacuumMode,
largest_root_page_number: u32,
header: Arc<SpinLock<DatabaseHeader>>,
pager: Rc<Pager>,
) -> crate::Result<()> {
let mut header_guard = header.lock();
header_guard.vacuum_mode_largest_root_page = largest_root_page_number;
pager.set_auto_vacuum_mode(auto_vacuum_mode);
pager.write_database_header(&header_guard)?;
Ok(())
}
fn update_cache_size(
value: i64,
header: Arc<SpinLock<DatabaseHeader>>,

View file

@ -4565,12 +4565,12 @@ pub fn op_set_cookie(
Cookie::LargestRootPageNumber => {
let mut header_guard = pager.db_header.lock();
header_guard.vacuum_mode_largest_root_page = *value as u32;
pager.write_database_header(&*header_guard);
pager.write_database_header(&*header_guard)?;
}
Cookie::IncrementalVacuum => {
let mut header_guard = pager.db_header.lock();
header_guard.incremental_vacuum_enabled = *value as u32;
pager.write_database_header(&*header_guard);
pager.write_database_header(&*header_guard)?;
}
cookie => todo!("{cookie:?} is not yet implement for SetCookie"),
}