core/io: Add wait_for_completion() to I/O dispatcher

This commit is contained in:
Pekka Enberg 2025-05-22 09:43:28 +03:00
parent eca9a5b703
commit 05df548b10
11 changed files with 88 additions and 10 deletions

View file

@ -283,6 +283,13 @@ impl limbo_core::IO for PlatformIO {
}))
}
fn wait_for_completion(&self, c: Arc<limbo_core::Completion>) -> Result<()> {
while !c.is_completed() {
self.run_once()?;
}
Ok(())
}
fn run_once(&self) -> Result<()> {
Ok(())
}

View file

@ -35,6 +35,13 @@ impl IO for GenericIO {
}))
}
fn wait_for_completion(&self, c: Arc<Completion>) -> Result<()> {
while !c.is_completed() {
self.run_once()?;
}
Ok(())
}
fn run_once(&self) -> Result<()> {
Ok(())
}

View file

@ -45,7 +45,7 @@ unsafe impl Sync for UringIO {}
struct WrappedIOUring {
ring: io_uring::IoUring,
pending_ops: usize,
pub pending: [Option<Completion>; MAX_IOVECS as usize + 1],
pub pending: [Option<Arc<Completion>>; MAX_IOVECS as usize + 1],
key: u64,
}
@ -169,6 +169,13 @@ impl IO for UringIO {
Ok(uring_file)
}
fn wait_for_completion(&self, c: Arc<Completion>) -> Result<()> {
while !c.is_completed() {
self.run_once()?;
}
Ok(())
}
fn run_once(&self) -> Result<()> {
trace!("run_once()");
let mut inner = self.inner.borrow_mut();
@ -298,16 +305,16 @@ impl File for UringFile {
};
io.ring.submit_entry(
&write,
Completion::Write(WriteCompletion::new(Box::new(move |result| {
Arc::new(Completion::Write(WriteCompletion::new(Box::new(move |result| {
c.complete(result);
// NOTE: Explicitly reference buffer to ensure it lives until here
let _ = buffer.borrow();
}))),
})))),
);
Ok(())
}
fn sync(&self, c: Completion) -> Result<()> {
fn sync(&self, c: Arc<Completion>) -> Result<()> {
let fd = io_uring::types::Fd(self.file.as_raw_fd());
let mut io = self.io.borrow_mut();
trace!("sync()");

View file

@ -53,6 +53,10 @@ impl IO for MemoryIO {
Ok(())
}
fn wait_for_completion(&self, _c: Arc<Completion>) -> Result<()> {
todo!();
}
fn generate_random_number(&self) -> i64 {
let mut buf = [0u8; 8];
getrandom::getrandom(&mut buf).unwrap();

View file

@ -42,6 +42,8 @@ pub trait IO: Clock + Send + Sync {
fn run_once(&self) -> Result<()>;
fn wait_for_completion(&self, c: Arc<Completion>) -> Result<()>;
fn generate_random_number(&self) -> i64;
fn get_memory_io(&self) -> Arc<MemoryIO>;
@ -60,9 +62,18 @@ pub enum Completion {
pub struct ReadCompletion {
pub buf: Arc<RefCell<Buffer>>,
pub complete: Box<Complete>,
pub is_completed: RefCell<bool>,
}
impl Completion {
pub fn is_completed(&self) -> bool {
match self {
Self::Read(r) => *r.is_completed.borrow(),
Self::Write(w) => *w.is_completed.borrow(),
Self::Sync(s) => *s.is_completed.borrow(),
}
}
pub fn complete(&self, result: i32) {
match self {
Self::Read(r) => r.complete(),
@ -83,15 +94,21 @@ impl Completion {
pub struct WriteCompletion {
pub complete: Box<WriteComplete>,
pub is_completed: RefCell<bool>,
}
pub struct SyncCompletion {
pub complete: Box<SyncComplete>,
pub is_completed: RefCell<bool>,
}
impl ReadCompletion {
pub fn new(buf: Arc<RefCell<Buffer>>, complete: Box<Complete>) -> Self {
Self { buf, complete }
Self {
buf,
complete,
is_completed: RefCell::new(false),
}
}
pub fn buf(&self) -> Ref<'_, Buffer> {
@ -104,26 +121,35 @@ impl ReadCompletion {
pub fn complete(&self) {
(self.complete)(self.buf.clone());
*self.is_completed.borrow_mut() = true;
}
}
impl WriteCompletion {
pub fn new(complete: Box<WriteComplete>) -> Self {
Self { complete }
Self {
complete,
is_completed: RefCell::new(false),
}
}
pub fn complete(&self, bytes_written: i32) {
(self.complete)(bytes_written);
*self.is_completed.borrow_mut() = true;
}
}
impl SyncCompletion {
pub fn new(complete: Box<SyncComplete>) -> Self {
Self { complete }
Self {
complete,
is_completed: RefCell::new(false),
}
}
pub fn complete(&self, res: i32) {
(self.complete)(res);
*self.is_completed.borrow_mut() = true;
}
}

View file

@ -256,6 +256,13 @@ impl IO for UnixIO {
Ok(())
}
fn wait_for_completion(&self, c: Arc<Completion>) -> Result<()> {
while !c.is_completed() {
self.run_once()?;
}
Ok(())
}
fn generate_random_number(&self) -> i64 {
let mut buf = [0u8; 8];
getrandom::getrandom(&mut buf).unwrap();

View file

@ -43,6 +43,10 @@ impl IO for VfsMod {
Ok(())
}
fn wait_for_completion(&self, _c: Arc<Completion>) -> Result<()> {
todo!();
}
fn generate_random_number(&self) -> i64 {
if self.ctx.is_null() {
return -1;

View file

@ -33,6 +33,13 @@ impl IO for WindowsIO {
}))
}
fn wait_for_completion(&self, c: Arc<Completion>) -> Result<()> {
while !c.is_completed() {
self.run_once()?;
}
Ok(())
}
fn run_once(&self) -> Result<()> {
Ok(())
}
@ -74,7 +81,7 @@ impl File for WindowsFile {
unimplemented!()
}
fn pread(&self, pos: usize, c: Completion) -> Result<()> {
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<()> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
{
@ -87,7 +94,7 @@ impl File for WindowsFile {
Ok(())
}
fn pwrite(&self, pos: usize, buffer: Arc<RefCell<crate::Buffer>>, c: Completion) -> Result<()> {
fn pwrite(&self, pos: usize, buffer: Arc<RefCell<crate::Buffer>>, c: Arc<Completion>) -> Result<()> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
let buf = buffer.borrow();
@ -97,7 +104,7 @@ impl File for WindowsFile {
Ok(())
}
fn sync(&self, c: Completion) -> Result<()> {
fn sync(&self, c: Arc<Completion>) -> Result<()> {
let file = self.file.borrow_mut();
file.sync_all().map_err(LimboError::IOError)?;
c.complete(0);

View file

@ -892,6 +892,7 @@ pub fn begin_sync(db_file: Arc<dyn DatabaseStorage>, syncing: Rc<RefCell<bool>>)
complete: Box::new(move |_| {
*syncing.borrow_mut() = false;
}),
is_completed: RefCell::new(false),
});
#[allow(clippy::arc_with_non_send_sync)]
db_file.sync(Arc::new(completion))?;

View file

@ -751,6 +751,7 @@ impl Wal for WalFile {
debug!("wal_sync finish");
*syncing.borrow_mut() = false;
}),
is_completed: RefCell::new(false),
});
shared.file.sync(Arc::new(completion))?;
}

View file

@ -83,6 +83,13 @@ impl IO for SimulatorIO {
Ok(file)
}
fn wait_for_completion(&self, c: Arc<limbo_core::Completion>) -> Result<()> {
while !c.is_completed() {
self.run_once()?;
}
Ok(())
}
fn run_once(&self) -> Result<()> {
if *self.fault.borrow() {
*self.nr_run_once_faults.borrow_mut() += 1;