mirror of
https://github.com/tursodatabase/limbo.git
synced 2025-08-04 10:08:20 +00:00
Added Async Header Accessor API's
This commit introduces async header accessor API's in addition to the sync ones. Allows gradual porting instead of one big shot PR.
This commit is contained in:
parent
c28c83fa2c
commit
f44aeeef47
2 changed files with 74 additions and 29 deletions
|
@ -4,6 +4,7 @@ use crate::{
|
|||
pager::{PageRef, Pager},
|
||||
sqlite3_ondisk::DATABASE_HEADER_PAGE_ID,
|
||||
},
|
||||
types::CursorResult,
|
||||
LimboError, Result,
|
||||
};
|
||||
use std::sync::atomic::Ordering;
|
||||
|
@ -33,22 +34,21 @@ const HEADER_OFFSET_VERSION_VALID_FOR: usize = 92;
|
|||
const HEADER_OFFSET_VERSION_NUMBER: usize = 96;
|
||||
|
||||
// Helper to get a read-only reference to the header page.
|
||||
fn get_header_page(pager: &Pager) -> Result<PageRef> {
|
||||
fn get_header_page(pager: &Pager) -> Result<CursorResult<PageRef>> {
|
||||
if pager.is_empty.load(Ordering::SeqCst) < 2 {
|
||||
return Err(LimboError::InternalError(
|
||||
"Database is empty, header does not exist - page 1 should've been allocated before this".to_string(),
|
||||
));
|
||||
}
|
||||
let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?;
|
||||
while !page.is_loaded() || page.is_locked() {
|
||||
// FIXME: LETS STOP DOING THESE SYNCHRONOUS IO HACKS
|
||||
pager.io.run_once()?;
|
||||
if !page.is_loaded() || page.is_locked() {
|
||||
return Ok(CursorResult::IO);
|
||||
}
|
||||
Ok(page)
|
||||
Ok(CursorResult::Ok(page))
|
||||
}
|
||||
|
||||
// Helper to get a writable reference to the header page and mark it dirty.
|
||||
fn get_header_page_for_write(pager: &Pager) -> Result<PageRef> {
|
||||
fn get_header_page_for_write(pager: &Pager) -> Result<CursorResult<PageRef>> {
|
||||
if pager.is_empty.load(Ordering::SeqCst) < 2 {
|
||||
// This should not be called on an empty DB for writing, as page 1 is allocated on first transaction.
|
||||
return Err(LimboError::InternalError(
|
||||
|
@ -56,20 +56,36 @@ fn get_header_page_for_write(pager: &Pager) -> Result<PageRef> {
|
|||
));
|
||||
}
|
||||
let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?;
|
||||
while !page.is_loaded() || page.is_locked() {
|
||||
// FIXME: LETS STOP DOING THESE SYNCHRONOUS IO HACKS
|
||||
pager.io.run_once()?;
|
||||
if !page.is_loaded() || page.is_locked() {
|
||||
return Ok(CursorResult::IO);
|
||||
}
|
||||
page.set_dirty();
|
||||
pager.add_dirty(DATABASE_HEADER_PAGE_ID);
|
||||
Ok(page)
|
||||
Ok(CursorResult::Ok(page))
|
||||
}
|
||||
|
||||
/// Helper function to run async header accessors until completion
|
||||
fn run_header_accessor_until_done<T, F>(pager: &Pager, mut accessor: F) -> Result<T>
|
||||
where
|
||||
F: FnMut() -> Result<CursorResult<T>>,
|
||||
{
|
||||
loop {
|
||||
match accessor()? {
|
||||
CursorResult::Ok(value) => return Ok(value),
|
||||
CursorResult::IO => {
|
||||
pager.io.run_once()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper macro to implement getters and setters for header fields.
|
||||
/// For example, `impl_header_field_accessor!(page_size, u16, HEADER_OFFSET_PAGE_SIZE);`
|
||||
/// will generate the following functions:
|
||||
/// - `pub fn get_page_size(pager: &Pager) -> Result<u16>`
|
||||
/// - `pub fn set_page_size(pager: &Pager, value: u16) -> Result<()>`
|
||||
/// - `pub fn get_page_size(pager: &Pager) -> Result<u16>` (sync)
|
||||
/// - `pub fn get_page_size_async(pager: &Pager) -> Result<CursorResult<u16>>` (async)
|
||||
/// - `pub fn set_page_size(pager: &Pager, value: u16) -> Result<()>` (sync)
|
||||
/// - `pub fn set_page_size_async(pager: &Pager, value: u16) -> Result<CursorResult<()>>` (async)
|
||||
///
|
||||
/// The macro takes three required arguments:
|
||||
/// - `$field_name`: The name of the field to implement.
|
||||
|
@ -79,19 +95,21 @@ fn get_header_page_for_write(pager: &Pager) -> Result<PageRef> {
|
|||
/// And a fourth optional argument:
|
||||
/// - `$ifzero`: A value to return if the field is 0.
|
||||
///
|
||||
/// The macro will generate the following functions:
|
||||
/// - `pub fn get_<field_name>(pager: &Pager) -> Result<T>`
|
||||
/// - `pub fn set_<field_name>(pager: &Pager, value: T) -> Result<()>`
|
||||
/// The macro will generate both sync and async versions of the functions.
|
||||
///
|
||||
macro_rules! impl_header_field_accessor {
|
||||
($field_name:ident, $type:ty, $offset:expr $(, $ifzero:expr)?) => {
|
||||
paste::paste! {
|
||||
// Async version
|
||||
#[allow(dead_code)]
|
||||
pub fn [<get_ $field_name>](pager: &Pager) -> Result<$type> {
|
||||
pub fn [<get_ $field_name _async>](pager: &Pager) -> Result<CursorResult<$type>> {
|
||||
if pager.is_empty.load(Ordering::SeqCst) < 2 {
|
||||
return Err(LimboError::InternalError(format!("Database is empty, header does not exist - page 1 should've been allocated before this")));
|
||||
}
|
||||
let page = get_header_page(pager)?;
|
||||
let page = match get_header_page(pager)? {
|
||||
CursorResult::Ok(page) => page,
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
};
|
||||
let page_inner = page.get();
|
||||
let page_content = page_inner.contents.as_ref().unwrap();
|
||||
let buf = page_content.buffer.borrow();
|
||||
|
@ -101,15 +119,25 @@ macro_rules! impl_header_field_accessor {
|
|||
let value = <$type>::from_be_bytes(bytes);
|
||||
$(
|
||||
if value == 0 {
|
||||
return Ok($ifzero);
|
||||
return Ok(CursorResult::Ok($ifzero));
|
||||
}
|
||||
)?
|
||||
Ok(value)
|
||||
Ok(CursorResult::Ok(value))
|
||||
}
|
||||
|
||||
// Sync version
|
||||
#[allow(dead_code)]
|
||||
pub fn [<set_ $field_name>](pager: &Pager, value: $type) -> Result<()> {
|
||||
let page = get_header_page_for_write(pager)?;
|
||||
pub fn [<get_ $field_name>](pager: &Pager) -> Result<$type> {
|
||||
run_header_accessor_until_done(pager, || [<get_ $field_name _async>](pager))
|
||||
}
|
||||
|
||||
// Async setter
|
||||
#[allow(dead_code)]
|
||||
pub fn [<set_ $field_name _async>](pager: &Pager, value: $type) -> Result<CursorResult<()>> {
|
||||
let page = match get_header_page_for_write(pager)? {
|
||||
CursorResult::Ok(page) => page,
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
};
|
||||
let page_inner = page.get();
|
||||
let page_content = page_inner.contents.as_ref().unwrap();
|
||||
let mut buf = page_content.buffer.borrow_mut();
|
||||
|
@ -117,7 +145,13 @@ macro_rules! impl_header_field_accessor {
|
|||
buf_slice[$offset..$offset + std::mem::size_of::<$type>()].copy_from_slice(&value.to_be_bytes());
|
||||
page.set_dirty();
|
||||
pager.add_dirty(1);
|
||||
Ok(())
|
||||
Ok(CursorResult::Ok(()))
|
||||
}
|
||||
|
||||
// Sync setter
|
||||
#[allow(dead_code)]
|
||||
pub fn [<set_ $field_name>](pager: &Pager, value: $type) -> Result<()> {
|
||||
run_header_accessor_until_done(pager, || [<set_ $field_name _async>](pager, value))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
|
@ -314,7 +314,10 @@ impl Pager {
|
|||
#[cfg(not(feature = "omit_autovacuum"))]
|
||||
pub fn ptrmap_get(&self, target_page_num: u32) -> Result<CursorResult<Option<PtrmapEntry>>> {
|
||||
tracing::trace!("ptrmap_get(page_idx = {})", target_page_num);
|
||||
let configured_page_size = header_accessor::get_page_size(self)? as usize;
|
||||
let configured_page_size = match header_accessor::get_page_size_async(self)? {
|
||||
CursorResult::Ok(size) => size as usize,
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
};
|
||||
|
||||
if target_page_num < FIRST_PTRMAP_PAGE_NO
|
||||
|| is_ptrmap_page(target_page_num, configured_page_size)
|
||||
|
@ -400,7 +403,10 @@ impl Pager {
|
|||
parent_page_no
|
||||
);
|
||||
|
||||
let page_size = header_accessor::get_page_size(self)? as usize;
|
||||
let page_size = match header_accessor::get_page_size_async(self)? {
|
||||
CursorResult::Ok(size) => size as usize,
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
};
|
||||
|
||||
if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO
|
||||
|| is_ptrmap_page(db_page_no_to_update, page_size)
|
||||
|
@ -496,15 +502,20 @@ impl Pager {
|
|||
}
|
||||
AutoVacuumMode::Full => {
|
||||
let mut root_page_num =
|
||||
header_accessor::get_vacuum_mode_largest_root_page(self)?;
|
||||
match header_accessor::get_vacuum_mode_largest_root_page_async(self)? {
|
||||
CursorResult::Ok(value) => value,
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
};
|
||||
assert!(root_page_num > 0); // Largest root page number cannot be 0 because that is set to 1 when creating the database with autovacuum enabled
|
||||
root_page_num += 1;
|
||||
assert!(root_page_num >= FIRST_PTRMAP_PAGE_NO); // can never be less than 2 because we have already incremented
|
||||
|
||||
while is_ptrmap_page(
|
||||
root_page_num,
|
||||
header_accessor::get_page_size(self)? as usize,
|
||||
) {
|
||||
let page_size = match header_accessor::get_page_size_async(self)? {
|
||||
CursorResult::Ok(size) => size as usize,
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
};
|
||||
|
||||
while is_ptrmap_page(root_page_num, page_size) {
|
||||
root_page_num += 1;
|
||||
}
|
||||
assert!(root_page_num >= 3); // the very first root page is page 3
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue