mirror of
https://github.com/tursodatabase/limbo.git
synced 2025-08-04 18:18:03 +00:00
Wrap IoUring to ensure pending_ops is always correctly updated
Adds a struct WrappedIOUring which contains a IoUring and a pending_ops field. Entry submission and popping from the queue is done through functions operating on it, which also maintains pending_ops count NOTE: This is a bit weird/hacky since in get_completion we create a CompletionQueue and just call its next(). If it were a normal iterator it would always return the same first item. However it is a queue posing as an iterator which makes this work.
This commit is contained in:
parent
a7d735d368
commit
b7debabd81
1 changed files with 47 additions and 26 deletions
|
@ -3,7 +3,7 @@ use crate::{LimboError, Result};
|
|||
use libc::{c_short, fcntl, flock, iovec, F_SETLK};
|
||||
use log::{debug, trace};
|
||||
use nix::fcntl::{FcntlArg, OFlag};
|
||||
use std::cell::{RefCell, RefMut};
|
||||
use std::cell::RefCell;
|
||||
use std::fmt;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::rc::Rc;
|
||||
|
@ -33,24 +33,27 @@ pub struct LinuxIO {
|
|||
inner: Rc<RefCell<InnerLinuxIO>>,
|
||||
}
|
||||
|
||||
pub struct InnerLinuxIO {
|
||||
struct WrappedIOUring {
|
||||
ring: io_uring::IoUring,
|
||||
pending_ops: usize,
|
||||
}
|
||||
|
||||
struct InnerLinuxIO {
|
||||
ring: WrappedIOUring,
|
||||
iovecs: [iovec; MAX_IOVECS],
|
||||
next_iovec: usize,
|
||||
pending_ops: usize,
|
||||
}
|
||||
|
||||
impl LinuxIO {
|
||||
pub fn new() -> Result<Self> {
|
||||
let ring = io_uring::IoUring::new(MAX_IOVECS as u32)?;
|
||||
let inner = InnerLinuxIO {
|
||||
ring: ring,
|
||||
ring: WrappedIOUring{ring, pending_ops: 0},
|
||||
iovecs: [iovec {
|
||||
iov_base: std::ptr::null_mut(),
|
||||
iov_len: 0,
|
||||
}; MAX_IOVECS],
|
||||
next_iovec: 0,
|
||||
pending_ops: 0,
|
||||
};
|
||||
Ok(Self {
|
||||
inner: Rc::new(RefCell::new(inner)),
|
||||
|
@ -68,6 +71,36 @@ impl InnerLinuxIO {
|
|||
}
|
||||
}
|
||||
|
||||
impl WrappedIOUring {
|
||||
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) {
|
||||
unsafe {
|
||||
self.ring.submission()
|
||||
.push(entry)
|
||||
.expect("submission queue is full");
|
||||
}
|
||||
self.pending_ops += 1;
|
||||
}
|
||||
|
||||
fn wait_for_completion(&mut self) -> Result<()> {
|
||||
self.ring.submit_and_wait(1)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_completion(&mut self) -> Option<io_uring::cqueue::Entry> {
|
||||
// NOTE: This works because CompletionQueue's next function pops the head of the queue. This is not normal behaviour of iterators
|
||||
let entry = self.ring.completion().next();
|
||||
if entry.is_some() {
|
||||
// consumed an entry from completion queue, update pending_ops
|
||||
self.pending_ops -= 1;
|
||||
}
|
||||
entry
|
||||
}
|
||||
|
||||
fn empty(&self) -> bool {
|
||||
self.pending_ops == 0
|
||||
}
|
||||
}
|
||||
|
||||
impl IO for LinuxIO {
|
||||
fn open_file(&self, path: &str) -> Result<Rc<dyn File>> {
|
||||
trace!("open_file(path = {})", path);
|
||||
|
@ -91,15 +124,15 @@ impl IO for LinuxIO {
|
|||
|
||||
fn run_once(&self) -> Result<()> {
|
||||
trace!("run_once()");
|
||||
let inner = self.inner.borrow_mut();
|
||||
let (mut pending_ops, mut ring) = RefMut::map_split(inner, |inner_ref: &mut InnerLinuxIO| (&mut inner_ref.pending_ops, &mut inner_ref.ring));
|
||||
if *pending_ops == 0 {
|
||||
return Ok(());
|
||||
let mut inner = self.inner.borrow_mut();
|
||||
let ring = &mut inner.ring;
|
||||
|
||||
if ring.empty() {
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
ring.submit_and_wait(1)?;
|
||||
while let Some(cqe) = ring.completion().next() {
|
||||
*pending_ops -= 1;
|
||||
ring.wait_for_completion()?;
|
||||
while let Some(cqe) = ring.get_completion() {
|
||||
let result = cqe.result();
|
||||
if result < 0 {
|
||||
return Err(LimboError::LinuxIOError(format!(
|
||||
|
@ -199,13 +232,7 @@ impl File for LinuxFile {
|
|||
.build()
|
||||
.user_data(ptr as u64)
|
||||
};
|
||||
let ring = &mut io.ring;
|
||||
unsafe {
|
||||
ring.submission()
|
||||
.push(&read_e)
|
||||
.expect("submission queue is full");
|
||||
}
|
||||
io.pending_ops += 1;
|
||||
io.ring.submit_entry(&read_e);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -226,13 +253,7 @@ impl File for LinuxFile {
|
|||
.build()
|
||||
.user_data(ptr as u64)
|
||||
};
|
||||
let ring = &mut io.ring;
|
||||
unsafe {
|
||||
ring.submission()
|
||||
.push(&write)
|
||||
.expect("submission queue is full");
|
||||
}
|
||||
io.pending_ops += 1;
|
||||
io.ring.submit_entry(&write);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue