mirror of
https://github.com/tursodatabase/limbo.git
synced 2025-08-04 18:18:03 +00:00
Merge 'Page cache fixes' from Pere Diaz Bou
This PR builds on top of https://github.com/tursodatabase/limbo/pull/1368 and adds a few things: it allows inserting pages with the same page key, fixes the fuzz tests by adding transactions, and makes some minor improvements to cacheflush. Closes #1523
This commit is contained in:
commit
c7f984c5c8
7 changed files with 1430 additions and 354 deletions
|
@ -8,6 +8,8 @@ pub enum LimboError {
|
|||
NotADB,
|
||||
#[error("Internal error: {0}")]
|
||||
InternalError(String),
|
||||
#[error("Page cache is full")]
|
||||
CacheFull,
|
||||
#[error("Parse error: {0}")]
|
||||
ParseError(String),
|
||||
#[error(transparent)]
|
||||
|
|
12
core/lib.rs
12
core/lib.rs
|
@ -57,7 +57,7 @@ use std::{
|
|||
rc::Rc,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
use storage::btree::btree_init_page;
|
||||
use storage::btree::{btree_init_page, BTreePageInner};
|
||||
#[cfg(feature = "fs")]
|
||||
use storage::database::DatabaseFile;
|
||||
pub use storage::{
|
||||
|
@ -80,8 +80,6 @@ use vdbe::{builder::QueryMode, VTabOpaqueCursor};
|
|||
pub type Result<T, E = LimboError> = std::result::Result<T, E>;
|
||||
pub static DATABASE_VERSION: OnceLock<String> = OnceLock::new();
|
||||
|
||||
const DEFAULT_PAGE_CACHE_SIZE_IN_PAGES: usize = 2000;
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq)]
|
||||
enum TransactionState {
|
||||
Write,
|
||||
|
@ -170,9 +168,7 @@ impl Database {
|
|||
None
|
||||
};
|
||||
|
||||
let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(
|
||||
DEFAULT_PAGE_CACHE_SIZE_IN_PAGES,
|
||||
)));
|
||||
let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::default()));
|
||||
let schema = Arc::new(RwLock::new(Schema::new()));
|
||||
let db = Database {
|
||||
mv_store,
|
||||
|
@ -276,6 +272,9 @@ pub fn maybe_init_database_file(file: &Arc<dyn File>, io: &Arc<dyn IO>) -> Resul
|
|||
&Rc::new(BufferPool::new(db_header.get_page_size() as usize)),
|
||||
DATABASE_HEADER_SIZE,
|
||||
);
|
||||
let page1 = Arc::new(BTreePageInner {
|
||||
page: RefCell::new(page1),
|
||||
});
|
||||
{
|
||||
// Create the sqlite_schema table, for this we just need to create the btree page
|
||||
// for the first page of the database which is basically like any other btree page
|
||||
|
@ -288,6 +287,7 @@ pub fn maybe_init_database_file(file: &Arc<dyn File>, io: &Arc<dyn IO>) -> Resul
|
|||
(db_header.get_page_size() - db_header.reserved_space as u32) as u16,
|
||||
);
|
||||
|
||||
let page1 = page1.get();
|
||||
let contents = page1.get().contents.as_mut().unwrap();
|
||||
contents.write_database_header(&db_header);
|
||||
// write the first page to disk synchronously
|
||||
|
|
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
|
@ -1,5 +1,6 @@
|
|||
use crate::fast_lock::SpinLock;
|
||||
use crate::result::LimboResult;
|
||||
use crate::storage::btree::BTreePageInner;
|
||||
use crate::storage::buffer_pool::BufferPool;
|
||||
use crate::storage::database::DatabaseStorage;
|
||||
use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType};
|
||||
|
@ -13,7 +14,8 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
|||
use std::sync::Arc;
|
||||
use tracing::trace;
|
||||
|
||||
use super::page_cache::{DumbLruPageCache, PageCacheKey};
|
||||
use super::btree::BTreePage;
|
||||
use super::page_cache::{CacheError, CacheResizeResult, DumbLruPageCache, PageCacheKey};
|
||||
use super::wal::{CheckpointMode, CheckpointStatus};
|
||||
|
||||
pub struct PageInner {
|
||||
|
@ -22,6 +24,7 @@ pub struct PageInner {
|
|||
pub id: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Page {
|
||||
pub inner: UnsafeCell<PageInner>,
|
||||
}
|
||||
|
@ -213,6 +216,7 @@ impl Pager {
|
|||
})
|
||||
}
|
||||
|
||||
// FIXME: handle no room in page cache
|
||||
pub fn btree_create(&self, flags: &CreateBTreeFlags) -> u32 {
|
||||
let page_type = match flags {
|
||||
_ if flags.is_table() => PageType::TableLeaf,
|
||||
|
@ -220,12 +224,13 @@ impl Pager {
|
|||
_ => unreachable!("Invalid flags state"),
|
||||
};
|
||||
let page = self.do_allocate_page(page_type, 0);
|
||||
let id = page.get().id;
|
||||
let id = page.get().get().id;
|
||||
id as u32
|
||||
}
|
||||
|
||||
/// Allocate a new overflow page.
|
||||
/// This is done when a cell overflows and new space is needed.
|
||||
// FIXME: handle no room in page cache
|
||||
pub fn allocate_overflow_page(&self) -> PageRef {
|
||||
let page = self.allocate_page().unwrap();
|
||||
tracing::debug!("Pager::allocate_overflow_page(id={})", page.get().id);
|
||||
|
@ -240,13 +245,17 @@ impl Pager {
|
|||
|
||||
/// Allocate a new page to the btree via the pager.
|
||||
/// This marks the page as dirty and writes the page header.
|
||||
pub fn do_allocate_page(&self, page_type: PageType, offset: usize) -> PageRef {
|
||||
// FIXME: handle no room in page cache
|
||||
pub fn do_allocate_page(&self, page_type: PageType, offset: usize) -> BTreePage {
|
||||
let page = self.allocate_page().unwrap();
|
||||
let page = Arc::new(BTreePageInner {
|
||||
page: RefCell::new(page),
|
||||
});
|
||||
crate::btree_init_page(&page, page_type, offset, self.usable_space() as u16);
|
||||
tracing::debug!(
|
||||
"do_allocate_page(id={}, page_type={:?})",
|
||||
page.get().id,
|
||||
page.get_contents().page_type()
|
||||
page.get().get().id,
|
||||
page.get().get_contents().page_type()
|
||||
);
|
||||
page
|
||||
}
|
||||
|
@ -302,7 +311,7 @@ impl Pager {
|
|||
}
|
||||
|
||||
/// Reads a page from the database.
|
||||
pub fn read_page(&self, page_idx: usize) -> Result<PageRef> {
|
||||
pub fn read_page(&self, page_idx: usize) -> Result<PageRef, LimboError> {
|
||||
tracing::trace!("read_page(page_idx = {})", page_idx);
|
||||
let mut page_cache = self.page_cache.write();
|
||||
let max_frame = match &self.wal {
|
||||
|
@ -324,9 +333,21 @@ impl Pager {
|
|||
{
|
||||
page.set_uptodate();
|
||||
}
|
||||
// TODO(pere) ensure page is inserted, we should probably first insert to page cache
|
||||
// and if successful, read frame or page
|
||||
page_cache.insert(page_key, page.clone());
|
||||
// TODO(pere) should probably first insert to page cache, and if successful,
|
||||
// read frame or page
|
||||
match page_cache.insert(page_key, page.clone()) {
|
||||
Ok(_) => {}
|
||||
Err(CacheError::Full) => return Err(LimboError::CacheFull),
|
||||
Err(CacheError::KeyExists) => {
|
||||
unreachable!("Page should not exist in cache after get() miss")
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Failed to insert page into cache: {:?}",
|
||||
e
|
||||
)))
|
||||
}
|
||||
}
|
||||
return Ok(page);
|
||||
}
|
||||
}
|
||||
|
@ -336,49 +357,20 @@ impl Pager {
|
|||
page.clone(),
|
||||
page_idx,
|
||||
)?;
|
||||
// TODO(pere) ensure page is inserted
|
||||
page_cache.insert(page_key, page.clone());
|
||||
Ok(page)
|
||||
}
|
||||
|
||||
/// Loads pages if not loaded
|
||||
pub fn load_page(&self, page: PageRef) -> Result<()> {
|
||||
let id = page.get().id;
|
||||
trace!("load_page(page_idx = {})", id);
|
||||
let mut page_cache = self.page_cache.write();
|
||||
page.set_locked();
|
||||
let max_frame = match &self.wal {
|
||||
Some(wal) => wal.borrow().get_max_frame(),
|
||||
None => 0,
|
||||
};
|
||||
let page_key = PageCacheKey::new(id, Some(max_frame));
|
||||
if let Some(wal) = &self.wal {
|
||||
if let Some(frame_id) = wal.borrow().find_frame(id as u64)? {
|
||||
wal.borrow()
|
||||
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
|
||||
{
|
||||
page.set_uptodate();
|
||||
}
|
||||
// TODO(pere) ensure page is inserted
|
||||
if !page_cache.contains_key(&page_key) {
|
||||
page_cache.insert(page_key, page.clone());
|
||||
}
|
||||
return Ok(());
|
||||
match page_cache.insert(page_key, page.clone()) {
|
||||
Ok(_) => {}
|
||||
Err(CacheError::Full) => return Err(LimboError::CacheFull),
|
||||
Err(CacheError::KeyExists) => {
|
||||
unreachable!("Page should not exist in cache after get() miss")
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Failed to insert page into cache: {:?}",
|
||||
e
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(pere) ensure page is inserted
|
||||
if !page_cache.contains_key(&page_key) {
|
||||
page_cache.insert(page_key, page.clone());
|
||||
}
|
||||
sqlite3_ondisk::begin_read_page(
|
||||
self.db_file.clone(),
|
||||
self.buffer_pool.clone(),
|
||||
page.clone(),
|
||||
id,
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
Ok(page)
|
||||
}
|
||||
|
||||
/// Writes the database header.
|
||||
|
@ -387,9 +379,9 @@ impl Pager {
|
|||
}
|
||||
|
||||
/// Changes the size of the page cache.
|
||||
pub fn change_page_cache_size(&self, capacity: usize) {
|
||||
pub fn change_page_cache_size(&self, capacity: usize) -> Result<CacheResizeResult> {
|
||||
let mut page_cache = self.page_cache.write();
|
||||
page_cache.resize(capacity);
|
||||
Ok(page_cache.resize(capacity))
|
||||
}
|
||||
|
||||
pub fn add_dirty(&self, page_id: usize) {
|
||||
|
@ -422,8 +414,8 @@ impl Pager {
|
|||
for page_id in self.dirty_pages.borrow().iter() {
|
||||
let mut cache = self.page_cache.write();
|
||||
let page_key = PageCacheKey::new(*page_id, Some(max_frame));
|
||||
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
|
||||
if let Some(wal) = &self.wal {
|
||||
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
|
||||
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
|
||||
trace!("cacheflush(page={}, page_type={:?}", page_id, page_type);
|
||||
wal.borrow_mut().append_frame(
|
||||
|
@ -432,10 +424,12 @@ impl Pager {
|
|||
self.flush_info.borrow().in_flight_writes.clone(),
|
||||
)?;
|
||||
}
|
||||
// This page is no longer valid.
|
||||
// For example:
|
||||
// We took page with key (page_num, max_frame) -- this page is no longer valid for that max_frame so it must be invalidated.
|
||||
cache.delete(page_key);
|
||||
page.clear_dirty();
|
||||
}
|
||||
// This is okay assuming we use shared cache by default.
|
||||
{
|
||||
let mut cache = self.page_cache.write();
|
||||
cache.clear().unwrap();
|
||||
}
|
||||
self.dirty_pages.borrow_mut().clear();
|
||||
self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames;
|
||||
|
@ -560,7 +554,10 @@ impl Pager {
|
|||
}
|
||||
}
|
||||
// TODO: only clear cache of things that are really invalidated
|
||||
self.page_cache.write().clear();
|
||||
self.page_cache
|
||||
.write()
|
||||
.clear()
|
||||
.expect("Failed to clear page cache");
|
||||
checkpoint_result
|
||||
}
|
||||
|
||||
|
@ -640,6 +637,7 @@ impl Pager {
|
|||
Gets a new page that increasing the size of the page or uses a free page.
|
||||
Currently free list pages are not yet supported.
|
||||
*/
|
||||
// FIXME: handle no room in page cache
|
||||
#[allow(clippy::readonly_write_lock)]
|
||||
pub fn allocate_page(&self) -> Result<PageRef> {
|
||||
let header = &self.db_header;
|
||||
|
@ -663,33 +661,55 @@ impl Pager {
|
|||
}
|
||||
}
|
||||
|
||||
// FIXME: should reserve page cache entry before modifying the database
|
||||
let page = allocate_page(header.database_size as usize, &self.buffer_pool, 0);
|
||||
{
|
||||
// setup page and add to cache
|
||||
page.set_dirty();
|
||||
self.add_dirty(page.get().id);
|
||||
let mut cache = self.page_cache.write();
|
||||
let max_frame = match &self.wal {
|
||||
Some(wal) => wal.borrow().get_max_frame(),
|
||||
None => 0,
|
||||
};
|
||||
|
||||
let page_key = PageCacheKey::new(page.get().id, Some(max_frame));
|
||||
cache.insert(page_key, page.clone());
|
||||
let mut cache = self.page_cache.write();
|
||||
match cache.insert(page_key, page.clone()) {
|
||||
Err(CacheError::Full) => return Err(LimboError::CacheFull),
|
||||
Err(_) => {
|
||||
return Err(LimboError::InternalError(
|
||||
"Unknown error inserting page to cache".into(),
|
||||
))
|
||||
}
|
||||
Ok(_) => return Ok(page),
|
||||
}
|
||||
}
|
||||
Ok(page)
|
||||
}
|
||||
|
||||
pub fn put_loaded_page(&self, id: usize, page: PageRef) {
|
||||
pub fn update_dirty_loaded_page_in_cache(
|
||||
&self,
|
||||
id: usize,
|
||||
page: PageRef,
|
||||
) -> Result<(), LimboError> {
|
||||
let mut cache = self.page_cache.write();
|
||||
// cache insert invalidates previous page
|
||||
let max_frame = match &self.wal {
|
||||
Some(wal) => wal.borrow().get_max_frame(),
|
||||
None => 0,
|
||||
};
|
||||
let page_key = PageCacheKey::new(id, Some(max_frame));
|
||||
cache.insert(page_key, page.clone());
|
||||
|
||||
// FIXME: use specific page key for writer instead of max frame, this will make readers not conflict
|
||||
assert!(page.is_dirty());
|
||||
cache
|
||||
.insert_ignore_existing(page_key, page.clone())
|
||||
.map_err(|e| {
|
||||
LimboError::InternalError(format!(
|
||||
"Failed to insert loaded page {} into cache: {:?}",
|
||||
id, e
|
||||
))
|
||||
})?;
|
||||
page.set_loaded();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn usable_size(&self) -> usize {
|
||||
|
@ -760,7 +780,7 @@ mod tests {
|
|||
std::thread::spawn(move || {
|
||||
let mut cache = cache.write();
|
||||
let page_key = PageCacheKey::new(1, None);
|
||||
cache.insert(page_key, Arc::new(Page::new(1)));
|
||||
cache.insert(page_key, Arc::new(Page::new(1))).unwrap();
|
||||
})
|
||||
};
|
||||
let _ = thread.join();
|
||||
|
|
|
@ -305,5 +305,7 @@ fn update_cache_size(value: i64, header: Arc<SpinLock<DatabaseHeader>>, pager: R
|
|||
pager.write_database_header(&header_copy);
|
||||
|
||||
// update cache size
|
||||
pager.change_page_cache_size(cache_size);
|
||||
pager
|
||||
.change_page_cache_size(cache_size)
|
||||
.expect("couldn't update page cache size");
|
||||
}
|
||||
|
|
|
@ -4323,6 +4323,7 @@ pub fn op_create_btree(
|
|||
// TODO: implement temp databases
|
||||
todo!("temp databases not implemented yet");
|
||||
}
|
||||
// FIXME: handle page cache is full
|
||||
let root_page = pager.btree_create(flags);
|
||||
state.registers[*root] = Register::Value(Value::Integer(root_page as i64));
|
||||
state.pc += 1;
|
||||
|
@ -4691,7 +4692,7 @@ pub fn op_open_ephemeral(
|
|||
|
||||
let db_header = Pager::begin_open(db_file.clone())?;
|
||||
let buffer_pool = Rc::new(BufferPool::new(db_header.lock().get_page_size() as usize));
|
||||
let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10)));
|
||||
let page_cache = Arc::new(RwLock::new(DumbLruPageCache::default()));
|
||||
|
||||
let pager = Rc::new(Pager::finish_open(
|
||||
db_header,
|
||||
|
@ -4708,6 +4709,7 @@ pub fn op_open_ephemeral(
|
|||
&CreateBTreeFlags::new_index()
|
||||
};
|
||||
|
||||
// FIXME: handle page cache is full
|
||||
let root_page = pager.btree_create(flag);
|
||||
|
||||
let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue