Merge 'Add manual WAL sync before checkpoint in con.close, fix async bug in checkpoint' from Preston Thorpe

In the normal path, we properly fsync the WAL file before beginning a
checkpoint. However, when we forcefully checkpoint upon closing the
connection, we miss this step.

Closes #1695
This commit is contained in:
Jussi Saurio 2025-06-13 11:15:39 +03:00
commit 0e5abb41c6
3 changed files with 43 additions and 36 deletions

View file

@ -566,17 +566,7 @@ impl Connection {
/// Close a connection and checkpoint.
pub fn close(&self) -> Result<()> {
loop {
// TODO: make this async?
match self.pager.checkpoint()? {
CheckpointStatus::Done(_) => {
return Ok(());
}
CheckpointStatus::IO => {
self.pager.io.run_once()?;
}
};
}
self.pager.checkpoint_shutdown()
}
pub fn last_insert_rowid(&self) -> i64 {

View file

@ -821,6 +821,25 @@ impl Pager {
.expect("Failed to clear page cache");
}
/// Fsync the WAL synchronously and then run a final checkpoint.
///
/// The WAL sync is driven to completion by repeatedly calling
/// `self.io.run_once()` while `wal.sync()` reports `WalFsyncStatus::IO`.
/// Once the sync is no longer pending, `wal_checkpoint` is invoked and its
/// `CheckpointResult` is discarded.
///
/// # Errors
///
/// * Propagates any error returned by `wal.sync()`, so a failed fsync aborts
///   the shutdown checkpoint instead of being silently ignored.
/// * Propagates any error returned by `self.io.run_once()`.
/// * Returns `LimboError::InternalError` if the fsync is still pending after
///   `MAX_FSYNC_ATTEMPTS` rounds of IO.
pub fn checkpoint_shutdown(&self) -> Result<()> {
    /// Upper bound on IO rounds spent waiting for the WAL fsync to finish.
    const MAX_FSYNC_ATTEMPTS: u32 = 10;

    let mut attempts: u32 = 0;
    {
        // Keep the mutable WAL borrow confined to this scope so it is released
        // before `wal_checkpoint` runs (presumably that call borrows the WAL
        // itself; verify against its body).
        let mut wal = self.wal.borrow_mut();
        // fsync the wal synchronously before beginning checkpoint
        while let WalFsyncStatus::IO = wal.sync()? {
            if attempts >= MAX_FSYNC_ATTEMPTS {
                return Err(LimboError::InternalError(
                    "Failed to fsync WAL before final checkpoint, fd likely closed".into(),
                ));
            }
            self.io.run_once()?;
            attempts += 1;
        }
    }
    self.wal_checkpoint();
    Ok(())
}
pub fn wal_checkpoint(&self) -> CheckpointResult {
let checkpoint_result: CheckpointResult;
loop {

View file

@ -72,7 +72,7 @@ pub enum CheckpointMode {
Truncate,
}
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct LimboRwLock {
lock: AtomicU32,
nreads: AtomicU32,
@ -390,8 +390,8 @@ pub struct WalFile {
io: Arc<dyn IO>,
buffer_pool: Rc<BufferPool>,
sync_state: RefCell<SyncState>,
syncing: Rc<RefCell<bool>>,
syncing: Rc<Cell<bool>>,
sync_state: Cell<SyncState>,
page_size: u32,
shared: Arc<UnsafeCell<WalFileShared>>,
@ -410,8 +410,8 @@ pub struct WalFile {
impl fmt::Debug for WalFile {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WalFile")
.field("syncing", &self.syncing.get())
.field("sync_state", &self.sync_state)
.field("syncing", &self.syncing)
.field("page_size", &self.page_size)
.field("shared", &self.shared)
.field("ongoing_checkpoint", &self.ongoing_checkpoint)
@ -748,7 +748,6 @@ impl Wal for WalFile {
self.buffer_pool.clone(),
)?;
self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame;
self.ongoing_checkpoint.current_page += 1;
continue 'checkpoint_loop;
}
}
@ -778,6 +777,7 @@ impl Wal for WalFile {
if (self.ongoing_checkpoint.current_page as usize)
< shared.pages_in_frames.lock().len()
{
self.ongoing_checkpoint.current_page += 1;
self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
} else {
self.ongoing_checkpoint.state = CheckpointState::Done;
@ -828,31 +828,29 @@ impl Wal for WalFile {
#[instrument(skip_all, level = Level::DEBUG)]
fn sync(&mut self) -> Result<WalFsyncStatus> {
let state = *self.sync_state.borrow();
match state {
match self.sync_state.get() {
SyncState::NotSyncing => {
let shared = self.get_shared();
tracing::debug!("wal_sync");
{
let syncing = self.syncing.clone();
*syncing.borrow_mut() = true;
let completion = Completion::Sync(SyncCompletion {
complete: Box::new(move |_| {
tracing::debug!("wal_sync finish");
*syncing.borrow_mut() = false;
}),
is_completed: Cell::new(false),
});
shared.file.sync(Arc::new(completion))?;
}
self.sync_state.replace(SyncState::Syncing);
let syncing = self.syncing.clone();
self.syncing.set(true);
let completion = Completion::Sync(SyncCompletion {
complete: Box::new(move |_| {
tracing::debug!("wal_sync finish");
syncing.set(false);
}),
is_completed: Cell::new(false),
});
let shared = self.get_shared();
shared.file.sync(Arc::new(completion))?;
self.sync_state.set(SyncState::Syncing);
Ok(WalFsyncStatus::IO)
}
SyncState::Syncing => {
if *self.syncing.borrow() {
if self.syncing.get() {
tracing::debug!("wal_sync is already syncing");
Ok(WalFsyncStatus::IO)
} else {
self.sync_state.replace(SyncState::NotSyncing);
self.sync_state.set(SyncState::NotSyncing);
Ok(WalFsyncStatus::Done)
}
}
@ -901,11 +899,11 @@ impl WalFile {
max_frame: 0,
current_page: 0,
},
syncing: Rc::new(RefCell::new(false)),
checkpoint_threshold: 1000,
page_size,
buffer_pool,
sync_state: RefCell::new(SyncState::NotSyncing),
syncing: Rc::new(Cell::new(false)),
sync_state: Cell::new(SyncState::NotSyncing),
max_frame: 0,
min_frame: 0,
max_frame_read_lock_index: 0,