state machine cacheflush

This commit is contained in:
Pere Diaz Bou 2024-11-05 15:27:07 +01:00
parent da7717edfb
commit a85d599c65
4 changed files with 90 additions and 59 deletions

View file

@ -14,6 +14,7 @@ pub trait DatabaseStorage {
buffer: Rc<RefCell<Buffer>>,
c: Rc<Completion>,
) -> Result<()>;
/// Requests a sync of this storage, handing the completion `c` to the implementation.
///
/// NOTE(review): presumably `c` is invoked once the sync has finished — confirm
/// against the implementors.
fn sync(&self, c: Rc<Completion>) -> Result<()>;
}
#[cfg(feature = "fs")]
@ -52,6 +53,10 @@ impl DatabaseStorage for FileStorage {
self.file.pwrite(pos, buffer, c)?;
Ok(())
}
/// Forwards the sync request to the underlying file, passing the completion `c`
/// along unchanged, and returns whatever `self.file.sync` returns.
fn sync(&self, c: Rc<Completion>) -> Result<()> {
self.file.sync(c)
}
}
#[cfg(feature = "fs")]

View file

@ -268,9 +268,11 @@ impl<K: Eq + Hash + Clone, V> PageCache<K, V> {
// NOTE(review): this listing appears to combine the variants from before and after the
// change. `FramesDone` and `Syncing` are only referenced by the `if matches!` chain in
// `cacheflush`; the others are handled by its `loop`/`match` form. Confirm which set is
// current before relying on it.
/// Phases of `Pager::cacheflush`. The current phase is stored in `flush_info.state`
/// and read back at the start of each step.
#[derive(Clone)]
enum FlushState {
/// Dirty pages are appended to the WAL with `append_frame`.
Start,
/// Set by the `if`-chain form once all frames were appended; a checkpoint is then
/// attempted if `should_checkpoint()` returns true.
FramesDone,
/// `wal.sync()` is called; afterwards the state moves to `Checkpoint` when
/// `should_checkpoint()` returns true, otherwise back to `Start`.
SyncWal,
/// `wal.checkpoint(..)` is called; on `CheckpointStatus::Done` the state moves to
/// `CheckpointDone`.
Checkpoint,
/// Waits until `in_flight_writes` has dropped to zero.
CheckpointDone,
/// Used by the `if`-chain form: `wal.sync()` is called, then the state returns to
/// `Start`.
Syncing,
/// `begin_sync` is issued on `page_io`.
SyncDbFile,
/// Waits until the `syncing` flag is false again, then returns to `Start`.
WaitSyncDbFile,
}
/// This will keep track of the state of current cache flush in order to not repeat work
@ -298,6 +300,7 @@ pub struct Pager {
db_header: Rc<RefCell<DatabaseHeader>>,
flush_info: RefCell<FlushInfo>,
syncing: Rc<RefCell<bool>>,
}
impl Pager {
@ -329,6 +332,7 @@ impl Pager {
state: FlushState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
}),
syncing: Rc::new(RefCell::new(false)),
})
}
@ -396,58 +400,72 @@ impl Pager {
}
/// Drives the cache flush: appends dirty pages to the WAL, syncs the WAL and, if
/// `should_checkpoint()` returns true, checkpoints and then syncs `page_io`.
///
/// The current phase is kept in `flush_info.state`. Whenever a step cannot finish
/// yet the function returns `CheckpointStatus::IO` with the phase left in place, so
/// a later call reads the same state again. `CheckpointStatus::Done` is returned
/// at the end of the function.
///
/// # Errors
/// Propagates errors from `append_frame`, `checkpoint`, `wal.sync()` and `begin_sync`.
///
/// # Panics
/// Panics if a page id in `dirty_pages` is missing from the page cache.
// NOTE(review): the body below looks like a diff rendering that interleaves two
// implementations: an `if matches!(..)` chain (apparently the removed one) and a
// `loop`/`match` state machine (apparently the added one). The markers below label
// the stretches; confirm against the repository before editing the logic.
pub fn cacheflush(&self) -> Result<CheckpointStatus> {
// NOTE(review): `if`-chain form of the Start phase.
if matches!(self.flush_info.borrow().state.clone(), FlushState::Start) {
let db_size = self.db_header.borrow().database_size;
for page_id in self.dirty_pages.borrow().iter() {
let mut cache = self.page_cache.borrow_mut();
let page = cache.get(&page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
self.wal.borrow_mut().append_frame(
page.clone(),
db_size,
self,
self.flush_info.borrow().in_flight_writes.clone(),
)?;
}
self.dirty_pages.borrow_mut().clear();
self.flush_info.borrow_mut().state = FlushState::FramesDone;
}
// `loop`/`match` form: every iteration handles the state currently stored in
// `flush_info` and either advances it, returns `IO`, or breaks out.
loop {
let state = self.flush_info.borrow().state.clone();
match state {
FlushState::Start => {
let db_size = self.db_header.borrow().database_size;
for page_id in self.dirty_pages.borrow().iter() {
let mut cache = self.page_cache.borrow_mut();
let page = cache.get(&page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
self.wal.borrow_mut().append_frame(
page.clone(),
db_size,
self,
self.flush_info.borrow().in_flight_writes.clone(),
)?;
}
self.dirty_pages.borrow_mut().clear();
self.flush_info.borrow_mut().state = FlushState::SyncWal;
}
FlushState::Checkpoint => {
let in_flight = self.flush_info.borrow().in_flight_writes.clone();
// NOTE(review): `dbg!` output here and in the arms below looks like leftover
// debugging; consider removing it.
dbg!("checkpoint");
match self.wal.borrow_mut().checkpoint(self, in_flight)? {
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
CheckpointStatus::Done => {
self.flush_info.borrow_mut().state = FlushState::CheckpointDone;
}
};
}
FlushState::CheckpointDone => {
dbg!("checkpoint done");
// Only move on once no writes tracked by `in_flight_writes` remain.
let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
if in_flight == 0 {
self.flush_info.borrow_mut().state = FlushState::SyncDbFile;
} else {
return Ok(CheckpointStatus::IO);
}
}
FlushState::SyncWal => {
match self.wal.borrow_mut().sync() {
Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO),
Ok(CheckpointStatus::Done) => {}
Err(e) => return Err(e),
}
// NOTE(review): `if`-chain form of the checkpoint step, interleaved here by the
// diff view.
if matches!(
self.flush_info.borrow().state.clone(),
FlushState::FramesDone
) {
let should_checkpoint = self.wal.borrow().should_checkpoint();
if should_checkpoint {
match self
.wal
.borrow_mut()
.checkpoint(self, self.flush_info.borrow().in_flight_writes.clone())
{
Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO),
Ok(CheckpointStatus::Done) => {}
Err(e) => return Err(e),
};
// Continuation of the `SyncWal` arm of the `loop`/`match` form.
let should_checkpoint = self.wal.borrow().should_checkpoint();
if should_checkpoint {
self.flush_info.borrow_mut().state = FlushState::Checkpoint;
} else {
self.flush_info.borrow_mut().state = FlushState::Start;
break;
}
}
FlushState::SyncDbFile => {
dbg!("sync db");
sqlite3_ondisk::begin_sync(self.page_io.clone(), self.syncing.clone())?;
self.flush_info.borrow_mut().state = FlushState::WaitSyncDbFile;
}
FlushState::WaitSyncDbFile => {
// `syncing` is cleared by the completion created in `begin_sync`.
if *self.syncing.borrow() {
return Ok(CheckpointStatus::IO);
} else {
self.flush_info.borrow_mut().state = FlushState::Start;
break;
}
}
}
// NOTE(review): remainder of the `if`-chain form (checkpoint done, then WAL sync).
self.flush_info.borrow_mut().state = FlushState::CheckpointDone;
}
if matches!(
self.flush_info.borrow().state.clone(),
FlushState::CheckpointDone
) {
let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
if in_flight == 0 {
self.flush_info.borrow_mut().state = FlushState::Syncing;
}
}
if matches!(self.flush_info.borrow().state.clone(), FlushState::Syncing) {
match self.wal.borrow_mut().sync() {
Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO),
Ok(CheckpointStatus::Done) => {}
Err(e) => return Err(e),
}
self.flush_info.borrow_mut().state = FlushState::Start;
}
Ok(CheckpointStatus::Done)
}

View file

@ -42,7 +42,7 @@
//! https://www.sqlite.org/fileformat.html
use crate::error::LimboError;
use crate::io::{Buffer, Completion, ReadCompletion, WriteCompletion};
use crate::io::{Buffer, Completion, ReadCompletion, SyncCompletion, WriteCompletion};
use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::DatabaseStorage;
use crate::storage::pager::{Page, Pager};
@ -563,6 +563,18 @@ pub fn begin_write_btree_page(
Ok(())
}
/// Submits a sync request to `page_io` and tracks it through the shared `syncing` flag.
///
/// `*syncing` is set to `true` before the request is submitted. The completion handed
/// to `page_io.sync` sets it back to `false` when it is invoked, so callers can watch
/// the flag to learn when the request has completed.
///
/// # Errors
/// Returns the error reported by `page_io.sync`. The flag is cleared again in that
/// case: otherwise it would stay `true`, and the next call would trip the assertion
/// below.
///
/// # Panics
/// Panics if `*syncing` is already `true` when the function is called.
pub fn begin_sync(page_io: Rc<dyn DatabaseStorage>, syncing: Rc<RefCell<bool>>) -> Result<()> {
    assert!(!*syncing.borrow());
    *syncing.borrow_mut() = true;

    // The completion owns its own handle to the flag; `syncing` stays usable here.
    let flag = syncing.clone();
    let completion = Completion::Sync(SyncCompletion {
        complete: Box::new(move |_| {
            *flag.borrow_mut() = false;
        }),
    });

    let submitted = page_io.sync(Rc::new(completion));
    if submitted.is_err() {
        // Submission failed: undo the flag so a retry does not panic.
        *syncing.borrow_mut() = false;
    }
    submitted?;
    Ok(())
}
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Clone)]
pub enum BTreeCell {

View file

@ -170,11 +170,7 @@ impl Wal for WalFile {
fn should_checkpoint(&self) -> bool {
let frame_id = *self.max_frame.borrow() as usize;
if frame_id < self.checkpoint_threshold {
true
} else {
false
}
frame_id >= self.checkpoint_threshold
}
fn checkpoint(
@ -195,7 +191,7 @@ impl Wal for WalFile {
return Ok(CheckpointStatus::IO);
}
begin_write_btree_page(pager, &page, write_counter.clone());
begin_write_btree_page(pager, &page, write_counter.clone())?;
self.ongoing_checkpoint.insert(page_id);
}