This commit is contained in:
Zaid Humayun 2025-07-07 18:38:26 +05:30 committed by GitHub
commit ce39f341a6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 74 additions and 29 deletions

View file

@ -4,6 +4,7 @@ use crate::{
pager::{PageRef, Pager},
sqlite3_ondisk::DATABASE_HEADER_PAGE_ID,
},
types::CursorResult,
LimboError, Result,
};
use std::sync::atomic::Ordering;
@ -33,22 +34,21 @@ const HEADER_OFFSET_VERSION_VALID_FOR: usize = 92;
const HEADER_OFFSET_VERSION_NUMBER: usize = 96;
// Helper to get a read-only reference to the header page.
fn get_header_page(pager: &Pager) -> Result<PageRef> {
fn get_header_page(pager: &Pager) -> Result<CursorResult<PageRef>> {
if pager.is_empty.load(Ordering::SeqCst) < 2 {
return Err(LimboError::InternalError(
"Database is empty, header does not exist - page 1 should've been allocated before this".to_string(),
));
}
let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?;
while !page.is_loaded() || page.is_locked() {
// FIXME: LETS STOP DOING THESE SYNCHRONOUS IO HACKS
pager.io.run_once()?;
if !page.is_loaded() || page.is_locked() {
return Ok(CursorResult::IO);
}
Ok(page)
Ok(CursorResult::Ok(page))
}
// Helper to get a writable reference to the header page and mark it dirty.
fn get_header_page_for_write(pager: &Pager) -> Result<PageRef> {
fn get_header_page_for_write(pager: &Pager) -> Result<CursorResult<PageRef>> {
if pager.is_empty.load(Ordering::SeqCst) < 2 {
// This should not be called on an empty DB for writing, as page 1 is allocated on first transaction.
return Err(LimboError::InternalError(
@ -56,20 +56,36 @@ fn get_header_page_for_write(pager: &Pager) -> Result<PageRef> {
));
}
let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?;
while !page.is_loaded() || page.is_locked() {
// FIXME: LETS STOP DOING THESE SYNCHRONOUS IO HACKS
pager.io.run_once()?;
if !page.is_loaded() || page.is_locked() {
return Ok(CursorResult::IO);
}
page.set_dirty();
pager.add_dirty(DATABASE_HEADER_PAGE_ID);
Ok(page)
Ok(CursorResult::Ok(page))
}
/// Helper function to run async header accessors until completion
fn run_header_accessor_until_done<T, F>(pager: &Pager, mut accessor: F) -> Result<T>
where
F: FnMut() -> Result<CursorResult<T>>,
{
loop {
match accessor()? {
CursorResult::Ok(value) => return Ok(value),
CursorResult::IO => {
pager.io.run_once()?;
}
}
}
}
/// Helper macro to implement getters and setters for header fields.
/// For example, `impl_header_field_accessor!(page_size, u16, HEADER_OFFSET_PAGE_SIZE);`
/// will generate the following functions:
/// - `pub fn get_page_size(pager: &Pager) -> Result<u16>`
/// - `pub fn set_page_size(pager: &Pager, value: u16) -> Result<()>`
/// - `pub fn get_page_size(pager: &Pager) -> Result<u16>` (sync)
/// - `pub fn get_page_size_async(pager: &Pager) -> Result<CursorResult<u16>>` (async)
/// - `pub fn set_page_size(pager: &Pager, value: u16) -> Result<()>` (sync)
/// - `pub fn set_page_size_async(pager: &Pager, value: u16) -> Result<CursorResult<()>>` (async)
///
/// The macro takes three required arguments:
/// - `$field_name`: The name of the field to implement.
@ -79,19 +95,21 @@ fn get_header_page_for_write(pager: &Pager) -> Result<PageRef> {
/// And a fourth optional argument:
/// - `$ifzero`: A value to return if the field is 0.
///
/// The macro will generate the following functions:
/// - `pub fn get_<field_name>(pager: &Pager) -> Result<T>`
/// - `pub fn set_<field_name>(pager: &Pager, value: T) -> Result<()>`
/// The macro will generate both sync and async versions of the functions.
///
macro_rules! impl_header_field_accessor {
($field_name:ident, $type:ty, $offset:expr $(, $ifzero:expr)?) => {
paste::paste! {
// Async version
#[allow(dead_code)]
pub fn [<get_ $field_name>](pager: &Pager) -> Result<$type> {
pub fn [<get_ $field_name _async>](pager: &Pager) -> Result<CursorResult<$type>> {
if pager.is_empty.load(Ordering::SeqCst) < 2 {
return Err(LimboError::InternalError(format!("Database is empty, header does not exist - page 1 should've been allocated before this")));
}
let page = get_header_page(pager)?;
let page = match get_header_page(pager)? {
CursorResult::Ok(page) => page,
CursorResult::IO => return Ok(CursorResult::IO),
};
let page_inner = page.get();
let page_content = page_inner.contents.as_ref().unwrap();
let buf = page_content.buffer.borrow();
@ -101,15 +119,25 @@ macro_rules! impl_header_field_accessor {
let value = <$type>::from_be_bytes(bytes);
$(
if value == 0 {
return Ok($ifzero);
return Ok(CursorResult::Ok($ifzero));
}
)?
Ok(value)
Ok(CursorResult::Ok(value))
}
// Sync version
#[allow(dead_code)]
pub fn [<set_ $field_name>](pager: &Pager, value: $type) -> Result<()> {
let page = get_header_page_for_write(pager)?;
pub fn [<get_ $field_name>](pager: &Pager) -> Result<$type> {
run_header_accessor_until_done(pager, || [<get_ $field_name _async>](pager))
}
// Async setter
#[allow(dead_code)]
pub fn [<set_ $field_name _async>](pager: &Pager, value: $type) -> Result<CursorResult<()>> {
let page = match get_header_page_for_write(pager)? {
CursorResult::Ok(page) => page,
CursorResult::IO => return Ok(CursorResult::IO),
};
let page_inner = page.get();
let page_content = page_inner.contents.as_ref().unwrap();
let mut buf = page_content.buffer.borrow_mut();
@ -117,7 +145,13 @@ macro_rules! impl_header_field_accessor {
buf_slice[$offset..$offset + std::mem::size_of::<$type>()].copy_from_slice(&value.to_be_bytes());
page.set_dirty();
pager.add_dirty(1);
Ok(())
Ok(CursorResult::Ok(()))
}
// Sync setter
#[allow(dead_code)]
pub fn [<set_ $field_name>](pager: &Pager, value: $type) -> Result<()> {
run_header_accessor_until_done(pager, || [<set_ $field_name _async>](pager, value))
}
}
};

View file

@ -314,7 +314,10 @@ impl Pager {
#[cfg(not(feature = "omit_autovacuum"))]
pub fn ptrmap_get(&self, target_page_num: u32) -> Result<CursorResult<Option<PtrmapEntry>>> {
tracing::trace!("ptrmap_get(page_idx = {})", target_page_num);
let configured_page_size = header_accessor::get_page_size(self)? as usize;
let configured_page_size = match header_accessor::get_page_size_async(self)? {
CursorResult::Ok(size) => size as usize,
CursorResult::IO => return Ok(CursorResult::IO),
};
if target_page_num < FIRST_PTRMAP_PAGE_NO
|| is_ptrmap_page(target_page_num, configured_page_size)
@ -400,7 +403,10 @@ impl Pager {
parent_page_no
);
let page_size = header_accessor::get_page_size(self)? as usize;
let page_size = match header_accessor::get_page_size_async(self)? {
CursorResult::Ok(size) => size as usize,
CursorResult::IO => return Ok(CursorResult::IO),
};
if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO
|| is_ptrmap_page(db_page_no_to_update, page_size)
@ -496,15 +502,20 @@ impl Pager {
}
AutoVacuumMode::Full => {
let mut root_page_num =
header_accessor::get_vacuum_mode_largest_root_page(self)?;
match header_accessor::get_vacuum_mode_largest_root_page_async(self)? {
CursorResult::Ok(value) => value,
CursorResult::IO => return Ok(CursorResult::IO),
};
assert!(root_page_num > 0); // Largest root page number cannot be 0 because that is set to 1 when creating the database with autovacuum enabled
root_page_num += 1;
assert!(root_page_num >= FIRST_PTRMAP_PAGE_NO); // can never be less than 2 because we have already incremented
while is_ptrmap_page(
root_page_num,
header_accessor::get_page_size(self)? as usize,
) {
let page_size = match header_accessor::get_page_size_async(self)? {
CursorResult::Ok(size) => size as usize,
CursorResult::IO => return Ok(CursorResult::IO),
};
while is_ptrmap_page(root_page_num, page_size) {
root_page_num += 1;
}
assert!(root_page_num >= 3); // the very first root page is page 3