core/io: Switch to Arc<Completion>

This commit is contained in:
Pekka Enberg 2025-05-22 09:32:16 +03:00
parent 21535018aa
commit eca9a5b703
14 changed files with 78 additions and 63 deletions

View file

@ -444,8 +444,8 @@ impl DatabaseFile {
}
impl limbo_core::DatabaseStorage for DatabaseFile {
fn read_page(&self, page_idx: usize, c: limbo_core::Completion) -> limbo_core::Result<()> {
let r = match c {
fn read_page(&self, page_idx: usize, c: Arc<limbo_core::Completion>) -> limbo_core::Result<()> {
let r = match *c {
limbo_core::Completion::Read(ref r) => r,
_ => unreachable!(),
};
@ -463,7 +463,7 @@ impl limbo_core::DatabaseStorage for DatabaseFile {
&self,
page_idx: usize,
buffer: Arc<std::cell::RefCell<limbo_core::Buffer>>,
c: limbo_core::Completion,
c: Arc<limbo_core::Completion>,
) -> limbo_core::Result<()> {
let size = buffer.borrow().len();
let pos = (page_idx - 1) * size;
@ -471,7 +471,7 @@ impl limbo_core::DatabaseStorage for DatabaseFile {
Ok(())
}
fn sync(&self, c: limbo_core::Completion) -> limbo_core::Result<()> {
fn sync(&self, c: Arc<limbo_core::Completion>) -> limbo_core::Result<()> {
self.file.sync(c)
}
}

View file

@ -208,8 +208,8 @@ impl limbo_core::File for File {
Ok(())
}
fn pread(&self, pos: usize, c: limbo_core::Completion) -> Result<()> {
let r = match &c {
fn pread(&self, pos: usize, c: Arc<limbo_core::Completion>) -> Result<()> {
let r = match *c {
limbo_core::Completion::Read(ref r) => r,
_ => unreachable!(),
};
@ -227,9 +227,9 @@ impl limbo_core::File for File {
&self,
pos: usize,
buffer: Arc<std::cell::RefCell<limbo_core::Buffer>>,
c: limbo_core::Completion,
c: Arc<limbo_core::Completion>,
) -> Result<()> {
let w = match &c {
let w = match *c {
limbo_core::Completion::Write(ref w) => w,
_ => unreachable!(),
};
@ -240,7 +240,7 @@ impl limbo_core::File for File {
Ok(())
}
fn sync(&self, c: limbo_core::Completion) -> Result<()> {
fn sync(&self, c: Arc<limbo_core::Completion>) -> Result<()> {
self.vfs.sync(self.fd);
c.complete(0);
Ok(())
@ -326,8 +326,8 @@ impl DatabaseFile {
}
impl limbo_core::DatabaseStorage for DatabaseFile {
fn read_page(&self, page_idx: usize, c: limbo_core::Completion) -> Result<()> {
let r = match c {
fn read_page(&self, page_idx: usize, c: Arc<limbo_core::Completion>) -> Result<()> {
let r = match *c {
limbo_core::Completion::Read(ref r) => r,
_ => unreachable!(),
};
@ -345,7 +345,7 @@ impl limbo_core::DatabaseStorage for DatabaseFile {
&self,
page_idx: usize,
buffer: Arc<std::cell::RefCell<limbo_core::Buffer>>,
c: limbo_core::Completion,
c: Arc<limbo_core::Completion>,
) -> Result<()> {
let size = buffer.borrow().len();
let pos = (page_idx - 1) * size;
@ -353,7 +353,7 @@ impl limbo_core::DatabaseStorage for DatabaseFile {
Ok(())
}
fn sync(&self, _c: limbo_core::Completion) -> Result<()> {
fn sync(&self, _c: Arc<limbo_core::Completion>) -> Result<()> {
todo!()
}
}

View file

@ -79,11 +79,11 @@ impl File for GenericFile {
Ok(())
}
fn pread(&self, pos: usize, c: Completion) -> Result<()> {
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<()> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
{
let r = match c {
let r = match *c {
Completion::Read(ref r) => r,
_ => unreachable!(),
};
@ -95,7 +95,7 @@ impl File for GenericFile {
Ok(())
}
fn pwrite(&self, pos: usize, buffer: Arc<RefCell<crate::Buffer>>, c: Completion) -> Result<()> {
fn pwrite(&self, pos: usize, buffer: Arc<RefCell<crate::Buffer>>, c: Arc<Completion>) -> Result<()> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
let buf = buffer.borrow();
@ -105,7 +105,7 @@ impl File for GenericFile {
Ok(())
}
fn sync(&self, c: Completion) -> Result<()> {
fn sync(&self, c: Arc<Completion>) -> Result<()> {
let mut file = self.file.borrow_mut();
file.sync_all().map_err(|err| LimboError::IOError(err))?;
c.complete(0);

View file

@ -1,3 +1,5 @@
#![allow(clippy::arc_with_non_send_sync)]
use super::{common, Completion, File, OpenFlags, WriteCompletion, IO};
use crate::io::clock::{Clock, Instant};
use crate::{LimboError, MemoryIO, Result};
@ -93,7 +95,7 @@ impl InnerUringIO {
}
impl WrappedIOUring {
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Completion) {
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Arc<Completion>) {
trace!("submit_entry({:?})", entry);
self.pending[entry.get_user_data() as usize] = Some(c);
unsafe {
@ -263,7 +265,7 @@ impl File for UringFile {
Ok(())
}
fn pread(&self, pos: usize, c: Completion) -> Result<()> {
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<()> {
let r = c.as_read();
trace!("pread(pos = {}, length = {})", pos, r.buf().len());
let fd = io_uring::types::Fd(self.file.as_raw_fd());
@ -282,7 +284,7 @@ impl File for UringFile {
Ok(())
}
fn pwrite(&self, pos: usize, buffer: Arc<RefCell<crate::Buffer>>, c: Completion) -> Result<()> {
fn pwrite(&self, pos: usize, buffer: Arc<RefCell<crate::Buffer>>, c: Arc<Completion>) -> Result<()> {
let mut io = self.io.borrow_mut();
let fd = io_uring::types::Fd(self.file.as_raw_fd());
let write = {

View file

@ -79,7 +79,7 @@ impl File for MemoryFile {
Ok(())
}
fn pread(&self, pos: usize, c: Completion) -> Result<()> {
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<()> {
let r = c.as_read();
let buf_len = r.buf().len();
if buf_len == 0 {
@ -120,7 +120,7 @@ impl File for MemoryFile {
Ok(())
}
fn pwrite(&self, pos: usize, buffer: Arc<RefCell<Buffer>>, c: Completion) -> Result<()> {
fn pwrite(&self, pos: usize, buffer: Arc<RefCell<Buffer>>, c: Arc<Completion>) -> Result<()> {
let buf = buffer.borrow();
let buf_len = buf.len();
if buf_len == 0 {
@ -156,7 +156,7 @@ impl File for MemoryFile {
Ok(())
}
fn sync(&self, c: Completion) -> Result<()> {
fn sync(&self, c: Arc<Completion>) -> Result<()> {
// no-op
c.complete(0);
Ok(())

View file

@ -14,9 +14,9 @@ use std::{
pub trait File: Send + Sync {
fn lock_file(&self, exclusive: bool) -> Result<()>;
fn unlock_file(&self) -> Result<()>;
fn pread(&self, pos: usize, c: Completion) -> Result<()>;
fn pwrite(&self, pos: usize, buffer: Arc<RefCell<Buffer>>, c: Completion) -> Result<()>;
fn sync(&self, c: Completion) -> Result<()>;
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<()>;
fn pwrite(&self, pos: usize, buffer: Arc<RefCell<Buffer>>, c: Arc<Completion>) -> Result<()>;
fn sync(&self, c: Arc<Completion>) -> Result<()>;
fn size(&self) -> Result<u64>;
}

View file

@ -268,10 +268,10 @@ impl IO for UnixIO {
}
enum CompletionCallback {
Read(Arc<RefCell<std::fs::File>>, Completion, usize),
Read(Arc<RefCell<std::fs::File>>, Arc<Completion>, usize),
Write(
Arc<RefCell<std::fs::File>>,
Completion,
Arc<Completion>,
Arc<RefCell<crate::Buffer>>,
usize,
),
@ -326,7 +326,7 @@ impl File for UnixFile<'_> {
Ok(())
}
fn pread(&self, pos: usize, c: Completion) -> Result<()> {
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<()> {
let file = self.file.borrow();
let result = {
let r = c.as_read();
@ -358,7 +358,7 @@ impl File for UnixFile<'_> {
}
}
fn pwrite(&self, pos: usize, buffer: Arc<RefCell<crate::Buffer>>, c: Completion) -> Result<()> {
fn pwrite(&self, pos: usize, buffer: Arc<RefCell<crate::Buffer>>, c: Arc<Completion>) -> Result<()> {
let file = self.file.borrow();
let result = {
let buf = buffer.borrow();
@ -387,7 +387,7 @@ impl File for UnixFile<'_> {
}
}
fn sync(&self, c: Completion) -> Result<()> {
fn sync(&self, c: Arc<Completion>) -> Result<()> {
let file = self.file.borrow();
let result = fs::fsync(file.as_fd());
match result {

View file

@ -93,8 +93,8 @@ impl File for VfsFileImpl {
Ok(())
}
fn pread(&self, pos: usize, c: Completion) -> Result<()> {
let r = match &c {
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<()> {
let r = match &*c {
Completion::Read(ref r) => r,
_ => unreachable!(),
};
@ -112,7 +112,7 @@ impl File for VfsFileImpl {
}
}
fn pwrite(&self, pos: usize, buffer: Arc<RefCell<Buffer>>, c: Completion) -> Result<()> {
fn pwrite(&self, pos: usize, buffer: Arc<RefCell<Buffer>>, c: Arc<Completion>) -> Result<()> {
let buf = buffer.borrow();
let count = buf.as_slice().len();
if self.vfs.is_null() {
@ -136,7 +136,7 @@ impl File for VfsFileImpl {
}
}
fn sync(&self, c: Completion) -> Result<()> {
fn sync(&self, c: Arc<Completion>) -> Result<()> {
let vfs = unsafe { &*self.vfs };
let result = unsafe { (vfs.sync)(self.file) };
if result < 0 {

View file

@ -1,3 +1,5 @@
#![allow(clippy::arc_with_non_send_sync)]
mod error;
mod ext;
mod fast_lock;
@ -302,7 +304,8 @@ pub fn maybe_init_database_file(file: &Arc<dyn File>, io: &Arc<dyn IO>) -> Resul
let completion = Completion::Write(WriteCompletion::new(Box::new(move |_| {
*flag_complete.borrow_mut() = true;
})));
file.pwrite(0, contents.buffer.clone(), completion)?;
#[allow(clippy::arc_with_non_send_sync)]
file.pwrite(0, contents.buffer.clone(), Arc::new(completion))?;
}
let mut limit = 100;
loop {

View file

@ -6776,7 +6776,7 @@ mod tests {
let write_complete = Box::new(|_| {});
let c = Completion::Write(WriteCompletion::new(write_complete));
db_file.write_page(1, buf.clone(), c).unwrap();
db_file.write_page(1, buf.clone(), Arc::new(c)).unwrap();
let wal_shared = WalFileShared::open_shared(&io, "test.wal", page_size).unwrap();
let wal = Rc::new(RefCell::new(WalFile::new(
@ -6826,9 +6826,10 @@ mod tests {
)));
let write_complete = Box::new(|_| {});
let c = Completion::Write(WriteCompletion::new(write_complete));
#[allow(clippy::arc_with_non_send_sync)]
pager
.db_file
.write_page(current_page as usize, buf.clone(), c)?;
.write_page(current_page as usize, buf.clone(), Arc::new(c))?;
pager.io.run_once()?;
let page = cursor.read_page(current_page as usize)?;

View file

@ -8,14 +8,14 @@ use std::{cell::RefCell, sync::Arc};
/// the storage medium. A database can either be a file on disk, like in SQLite,
/// or something like a remote page server service.
pub trait DatabaseStorage: Send + Sync {
fn read_page(&self, page_idx: usize, c: Completion) -> Result<()>;
fn read_page(&self, page_idx: usize, c: Arc<Completion>) -> Result<()>;
fn write_page(
&self,
page_idx: usize,
buffer: Arc<RefCell<Buffer>>,
c: Completion,
c: Arc<Completion>,
) -> Result<()>;
fn sync(&self, c: Completion) -> Result<()>;
fn sync(&self, c: Arc<Completion>) -> Result<()>;
}
#[cfg(feature = "fs")]
@ -30,7 +30,7 @@ unsafe impl Sync for DatabaseFile {}
#[cfg(feature = "fs")]
impl DatabaseStorage for DatabaseFile {
fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> {
fn read_page(&self, page_idx: usize, c: Arc<Completion>) -> Result<()> {
let r = c.as_read();
let size = r.buf().len();
assert!(page_idx > 0);
@ -46,7 +46,7 @@ impl DatabaseStorage for DatabaseFile {
&self,
page_idx: usize,
buffer: Arc<RefCell<Buffer>>,
c: Completion,
c: Arc<Completion>,
) -> Result<()> {
let buffer_size = buffer.borrow().len();
assert!(page_idx > 0);
@ -58,7 +58,7 @@ impl DatabaseStorage for DatabaseFile {
Ok(())
}
fn sync(&self, c: Completion) -> Result<()> {
fn sync(&self, c: Arc<Completion>) -> Result<()> {
self.file.sync(c)
}
}
@ -78,8 +78,8 @@ unsafe impl Send for FileMemoryStorage {}
unsafe impl Sync for FileMemoryStorage {}
impl DatabaseStorage for FileMemoryStorage {
fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> {
let r = match c {
fn read_page(&self, page_idx: usize, c: Arc<Completion>) -> Result<()> {
let r = match *c {
Completion::Read(ref r) => r,
_ => unreachable!(),
};
@ -97,7 +97,7 @@ impl DatabaseStorage for FileMemoryStorage {
&self,
page_idx: usize,
buffer: Arc<RefCell<Buffer>>,
c: Completion,
c: Arc<Completion>,
) -> Result<()> {
let buffer_size = buffer.borrow().len();
assert!(buffer_size >= 512);
@ -108,7 +108,7 @@ impl DatabaseStorage for FileMemoryStorage {
Ok(())
}
fn sync(&self, c: Completion) -> Result<()> {
fn sync(&self, c: Arc<Completion>) -> Result<()> {
self.file.sync(c)
}
}

View file

@ -41,6 +41,8 @@
//!
//! https://www.sqlite.org/fileformat.html
#![allow(clippy::arc_with_non_send_sync)]
use crate::error::LimboError;
use crate::fast_lock::SpinLock;
use crate::io::{Buffer, Complete, Completion, ReadCompletion, SyncCompletion, WriteCompletion};
@ -293,7 +295,8 @@ pub fn begin_read_database_header(
finish_read_database_header(buf, header).unwrap();
});
let c = Completion::Read(ReadCompletion::new(buf, complete));
db_file.read_page(1, c)?;
#[allow(clippy::arc_with_non_send_sync)]
db_file.read_page(1, Arc::new(c))?;
Ok(result)
}
@ -355,7 +358,8 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re
#[allow(clippy::arc_with_non_send_sync)]
let buf = Arc::new(RefCell::new(Buffer::allocate(512, drop_fn)));
let c = Completion::Read(ReadCompletion::new(buf, read_complete));
page_source.read_page(1, c)?;
#[allow(clippy::arc_with_non_send_sync)]
page_source.read_page(1, Arc::new(c))?;
// run get header block
pager.io.run_once()?;
@ -369,7 +373,7 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re
});
let c = Completion::Write(WriteCompletion::new(write_complete));
page_source.write_page(1, buffer_to_copy, c)?;
page_source.write_page(1, buffer_to_copy, Arc::new(c))?;
Ok(())
}
@ -819,7 +823,7 @@ pub fn begin_read_page(
}
});
let c = Completion::Read(ReadCompletion::new(buf, complete));
db_file.read_page(page_idx, c)?;
db_file.read_page(page_idx, Arc::new(c))?;
Ok(())
}
@ -877,7 +881,7 @@ pub fn begin_write_btree_page(
})
};
let c = Completion::Write(WriteCompletion::new(write_complete));
page_source.write_page(page_id, buffer.clone(), c)?;
page_source.write_page(page_id, buffer.clone(), Arc::new(c))?;
Ok(())
}
@ -889,7 +893,8 @@ pub fn begin_sync(db_file: Arc<dyn DatabaseStorage>, syncing: Rc<RefCell<bool>>)
*syncing.borrow_mut() = false;
}),
});
db_file.sync(completion)?;
#[allow(clippy::arc_with_non_send_sync)]
db_file.sync(Arc::new(completion))?;
Ok(())
}
@ -1519,7 +1524,7 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
wfs_data.loaded.store(true, Ordering::SeqCst);
});
let c = Completion::Read(ReadCompletion::new(buf_for_pread, complete));
file.pread(0, c)?;
file.pread(0, Arc::new(c))?;
Ok(wal_file_shared_ret)
}
@ -1536,9 +1541,9 @@ pub fn begin_read_wal_frame(
let buffer_pool = buffer_pool.clone();
buffer_pool.put(buf);
});
#[allow(clippy::arc_with_non_send_sync)]
let buf = Arc::new(RefCell::new(Buffer::new(buf, drop_fn)));
let c = Completion::Read(ReadCompletion::new(buf, complete));
#[allow(clippy::arc_with_non_send_sync)]
let c = Arc::new(Completion::Read(ReadCompletion::new(buf, complete)));
io.pread(offset, c)?;
Ok(())
}
@ -1622,7 +1627,8 @@ pub fn begin_write_wal_frame(
}
})
};
let c = Completion::Write(WriteCompletion::new(write_complete));
#[allow(clippy::arc_with_non_send_sync)]
let c = Arc::new(Completion::Write(WriteCompletion::new(write_complete)));
io.pwrite(offset, buffer.clone(), c)?;
trace!("Frame written and synced at offset={offset}");
Ok(checksums)
@ -1657,7 +1663,8 @@ pub fn begin_write_wal_header(io: &Arc<dyn File>, header: &WalHeader) -> Result<
}
})
};
let c = Completion::Write(WriteCompletion::new(write_complete));
#[allow(clippy::arc_with_non_send_sync)]
let c = Arc::new(Completion::Write(WriteCompletion::new(write_complete)));
io.pwrite(0, buffer.clone(), c)?;
Ok(())
}

View file

@ -1,3 +1,5 @@
#![allow(clippy::arc_with_non_send_sync)]
use std::cell::UnsafeCell;
use std::collections::HashMap;
use tracing::{debug, trace};
@ -750,7 +752,7 @@ impl Wal for WalFile {
*syncing.borrow_mut() = false;
}),
});
shared.file.sync(completion)?;
shared.file.sync(Arc::new(completion))?;
}
self.sync_state.replace(SyncState::Syncing);
Ok(WalFsyncStatus::IO)

View file

@ -77,7 +77,7 @@ impl File for SimulatorFile {
self.inner.unlock_file()
}
fn pread(&self, pos: usize, c: limbo_core::Completion) -> Result<()> {
fn pread(&self, pos: usize, c: Arc<limbo_core::Completion>) -> Result<()> {
*self.nr_pread_calls.borrow_mut() += 1;
if *self.fault.borrow() {
*self.nr_pread_faults.borrow_mut() += 1;
@ -92,7 +92,7 @@ impl File for SimulatorFile {
&self,
pos: usize,
buffer: Arc<RefCell<limbo_core::Buffer>>,
c: limbo_core::Completion,
c: Arc<limbo_core::Completion>,
) -> Result<()> {
*self.nr_pwrite_calls.borrow_mut() += 1;
if *self.fault.borrow() {
@ -104,7 +104,7 @@ impl File for SimulatorFile {
self.inner.pwrite(pos, buffer, c)
}
fn sync(&self, c: limbo_core::Completion) -> Result<()> {
fn sync(&self, c: Arc<limbo_core::Completion>) -> Result<()> {
*self.nr_sync_calls.borrow_mut() += 1;
self.inner.sync(c)
}