extract page cache to be multi threaded

This commit is contained in:
Pere Diaz Bou 2024-11-21 14:00:28 +01:00
parent da3765d061
commit a4297702bd
6 changed files with 28 additions and 25 deletions

View file

@ -6,7 +6,7 @@ use wasm_bindgen::prelude::*;
#[wasm_bindgen]
pub struct Database {
db: Rc<limbo_core::Database>,
db: Arc<limbo_core::Database>,
conn: Rc<limbo_core::Connection>,
}

View file

@ -21,13 +21,13 @@ use schema::Schema;
use sqlite3_parser::ast;
use sqlite3_parser::{ast::Cmd, lexer::sql::Parser};
use std::cell::Cell;
use std::rc::Weak;
use std::sync::{Arc, OnceLock};
use std::sync::Weak;
use std::sync::{Arc, OnceLock, RwLock};
use std::{cell::RefCell, rc::Rc};
use storage::btree::btree_init_page;
#[cfg(feature = "fs")]
use storage::database::FileStorage;
use storage::pager::allocate_page;
use storage::pager::{allocate_page, DumbLruPageCache};
use storage::sqlite3_ondisk::{DatabaseHeader, DATABASE_HEADER_SIZE};
pub use storage::wal::WalFile;
use util::parse_schema_rows;
@ -64,11 +64,12 @@ pub struct Database {
schema: Rc<RefCell<Schema>>,
header: Rc<RefCell<DatabaseHeader>>,
transaction_state: RefCell<TransactionState>,
shared_page_cache: Arc<RwLock<DumbLruPageCache>>,
}
impl Database {
#[cfg(feature = "fs")]
pub fn open_file(io: Arc<dyn IO>, path: &str) -> Result<Rc<Database>> {
pub fn open_file(io: Arc<dyn IO>, path: &str) -> Result<Arc<Database>> {
let file = io.open_file(path, io::OpenFlags::Create, true)?;
maybe_init_database_file(&file, &io)?;
let page_io = Rc::new(FileStorage::new(file));
@ -87,18 +88,20 @@ impl Database {
io: Arc<dyn IO>,
page_io: Rc<dyn DatabaseStorage>,
wal: Rc<RefCell<dyn Wal>>,
) -> Result<Rc<Database>> {
) -> Result<Arc<Database>> {
let db_header = Pager::begin_open(page_io.clone())?;
io.run_once()?;
DATABASE_VERSION.get_or_init(|| {
let version = db_header.borrow().version_number;
version.to_string()
});
let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10)));
let pager = Rc::new(Pager::finish_open(
db_header.clone(),
page_io,
wal,
io.clone(),
shared_page_cache.clone(),
)?);
let bootstrap_schema = Rc::new(RefCell::new(Schema::new()));
let conn = Rc::new(Connection {
@ -113,21 +116,22 @@ impl Database {
parse_schema_rows(rows, &mut schema, io)?;
let schema = Rc::new(RefCell::new(schema));
let header = db_header;
Ok(Rc::new(Database {
Ok(Arc::new(Database {
pager,
schema,
header,
transaction_state: RefCell::new(TransactionState::None),
shared_page_cache,
}))
}
pub fn connect(self: &Rc<Database>) -> Rc<Connection> {
pub fn connect(self: &Arc<Database>) -> Rc<Connection> {
Rc::new(Connection {
pager: self.pager.clone(),
schema: self.schema.clone(),
header: self.header.clone(),
db: Rc::downgrade(self),
last_insert_rowid: Cell::new(0),
db: Arc::downgrade(self),
})
}
}

View file

@ -11,7 +11,7 @@ use std::hash::Hash;
use std::ptr::{drop_in_place, NonNull};
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use super::wal::CheckpointStatus;
@ -117,7 +117,7 @@ impl PageCacheEntry {
}
}
struct DumbLruPageCache {
pub struct DumbLruPageCache {
capacity: usize,
map: RefCell<HashMap<usize, NonNull<PageCacheEntry>>>,
head: RefCell<Option<NonNull<PageCacheEntry>>>,
@ -325,7 +325,7 @@ pub struct Pager {
/// The write-ahead log (WAL) for the database.
wal: Rc<RefCell<dyn Wal>>,
/// A page cache for the database.
page_cache: RefCell<DumbLruPageCache>,
page_cache: Arc<RwLock<DumbLruPageCache>>,
/// Buffer pool for temporary data storage.
buffer_pool: Rc<BufferPool>,
/// I/O interface for input/output operations.
@ -351,11 +351,11 @@ impl Pager {
page_io: Rc<dyn DatabaseStorage>,
wal: Rc<RefCell<dyn Wal>>,
io: Arc<dyn crate::io::IO>,
page_cache: Arc<RwLock<DumbLruPageCache>>,
) -> Result<Self> {
let db_header = RefCell::borrow(&db_header_ref);
let page_size = db_header.page_size as usize;
let buffer_pool = Rc::new(BufferPool::new(page_size));
let page_cache = RefCell::new(DumbLruPageCache::new(10));
Ok(Self {
page_io,
wal,
@ -396,7 +396,7 @@ impl Pager {
/// Reads a page from the database.
pub fn read_page(&self, page_idx: usize) -> crate::Result<Rc<RefCell<Page>>> {
trace!("read_page(page_idx = {})", page_idx);
let mut page_cache = self.page_cache.borrow_mut();
let mut page_cache = self.page_cache.write().unwrap();
if let Some(page) = page_cache.get(&page_idx) {
trace!("read_page(page_idx = {}) = cached", page_idx);
return Ok(page.clone());
@ -431,7 +431,7 @@ impl Pager {
pub fn load_page(&self, page: Rc<RefCell<Page>>) -> Result<()> {
let id = page.borrow().id;
trace!("load_page(page_idx = {})", id);
let mut page_cache = self.page_cache.borrow_mut();
let mut page_cache = self.page_cache.write().unwrap();
page.borrow_mut().set_locked();
if let Some(frame_id) = self.wal.borrow().find_frame(id as u64)? {
self.wal
@ -467,7 +467,8 @@ impl Pager {
/// Changes the size of the page cache.
pub fn change_page_cache_size(&self, capacity: usize) {
self.page_cache.borrow_mut().resize(capacity);
let mut page_cache = self.page_cache.write().unwrap();
page_cache.resize(capacity);
}
pub fn add_dirty(&self, page_id: usize) {
@ -483,7 +484,7 @@ impl Pager {
FlushState::Start => {
let db_size = self.db_header.borrow().database_size;
for page_id in self.dirty_pages.borrow().iter() {
let mut cache = self.page_cache.borrow_mut();
let mut cache = self.page_cache.write().unwrap();
let page = cache.get(page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
let page_type = page.borrow().contents.as_ref().unwrap().maybe_page_type();
debug!("appending frame {} {:?}", page_id, page_type);
@ -589,7 +590,7 @@ impl Pager {
Err(err) => panic!("error while clearing cache {}", err),
}
}
self.page_cache.borrow_mut().clear();
self.page_cache.write().unwrap().clear();
}
/*
@ -627,14 +628,14 @@ impl Pager {
let page = page_ref.borrow_mut();
page.set_dirty();
self.add_dirty(page.id);
let mut cache = self.page_cache.borrow_mut();
let mut cache = self.page_cache.write().unwrap();
cache.insert(page.id, page_ref.clone());
}
Ok(page_ref)
}
pub fn put_loaded_page(&self, id: usize, page: Rc<RefCell<Page>>) {
let mut cache = RefCell::borrow_mut(&self.page_cache);
let mut cache = self.page_cache.write().unwrap();
// cache insert invalidates previous page
cache.insert(id, page.clone());
page.borrow_mut().set_loaded();

View file

@ -277,8 +277,6 @@ impl WalFile {
self.io.run_once()?;
self.wal_header.replace(Some(wal_header));
} else {
// magic is a single number represented as WAL_MAGIC_LE but the big endian
// counterpart is just the same number with LSB set to 1.
let magic = if cfg!(target_endian = "big") {
WAL_MAGIC_BE
} else {

View file

@ -13,7 +13,7 @@ struct SimulatorEnv {
tables: Vec<Table>,
connections: Vec<SimConnection>,
io: Arc<SimulatorIO>,
db: Rc<Database>,
db: Arc<Database>,
rng: ChaCha8Rng,
}

View file

@ -33,7 +33,7 @@ pub mod util;
use util::sqlite3_safety_check_sick_or_ok;
pub struct sqlite3 {
pub(crate) _db: Rc<limbo_core::Database>,
pub(crate) _db: Arc<limbo_core::Database>,
pub(crate) conn: Rc<limbo_core::Connection>,
pub(crate) err_code: ffi::c_int,
pub(crate) err_mask: ffi::c_int,
@ -43,7 +43,7 @@ pub struct sqlite3 {
}
impl sqlite3 {
pub fn new(db: Rc<limbo_core::Database>, conn: Rc<limbo_core::Connection>) -> Self {
pub fn new(db: Arc<limbo_core::Database>, conn: Rc<limbo_core::Connection>) -> Self {
Self {
_db: db,
conn,