Initial implementation of read_write_payload_with_offset

This is a port of `accessPayload` in `btree.c`. This port gives
us the ability to read and write a payload at a given offset and
amount. It will be used in the upcoming PRs that add incremental
I/O for blobs.
krishvishal 2025-05-12 15:31:22 +05:30
parent d592e8ec8a
commit f6cc2c3cd8


@@ -200,6 +200,18 @@ enum ReadPayloadOverflow {
    },
}
enum PayloadOverflowWithOffset {
    ProcessPage {
        payload: Vec<u8>,
        next_page: u32,
        remaining_to_read: usize,
        page: PageRef,
        current_offset: usize,
        buffer_offset: usize,
        is_write: bool,
    },
}
#[derive(Clone, Debug)]
pub enum BTreeKey<'a> {
    TableRowId((u64, Option<&'a ImmutableRecord>)),
@@ -718,6 +730,134 @@ impl BTreeCursor {
        }
    }
    /// Calculates how much of a cell's payload should be stored locally vs in overflow pages.
    ///
    /// Parameters:
    /// - payload_len: Total length of the payload data
    /// - page_type: Type of the B-tree page (affects local storage thresholds)
    /// - usable_size: Usable bytes per page (page size minus the reserved region)
    ///
    /// Returns:
    /// - A tuple of (n_local, payload_len) where:
    ///   - n_local: Amount of payload to store locally on the page
    ///   - payload_len: Total payload length (unchanged from input)
    pub fn parse_cell_info(
        &self,
        payload_len: usize,
        page_type: PageType,
        usable_size: usize,
    ) -> Result<(usize, usize)> {
        let max_local = payload_overflow_threshold_max(page_type, usable_size as u16);
        let min_local = payload_overflow_threshold_min(page_type, usable_size as u16);
        // This matches btreeParseCellAdjustSizeForOverflow logic
        let n_local = if payload_len <= max_local {
            // Common case - everything fits locally
            payload_len
        } else {
            // For payloads that need overflow pages, calculate how much should
            // be stored locally using the following formula:
            //   surplus = min_local + (payload_len - min_local) % (usable_size - 4)
            //
            // This tries to minimize unused space on overflow pages while keeping
            // the local storage between the min_local and max_local thresholds.
            // The (usable_size - 4) factor is the payload capacity of an overflow
            // page: its first 4 bytes hold the next-overflow-page pointer.
            let surplus = min_local + (payload_len - min_local) % (usable_size - 4);
            if surplus <= max_local {
                surplus
            } else {
                min_local
            }
        };
        Ok((n_local, payload_len))
    }
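To make the formula concrete, here is a minimal standalone sketch of the same calculation, assuming SQLite's table-leaf thresholds for a 4096-byte usable size (max_local = usable_size - 35 = 4061, min_local = (usable_size - 12) * 32 / 255 - 23 = 489); `local_payload_size` is an illustrative name, not part of this codebase:

fn local_payload_size(payload_len: usize, min_local: usize, max_local: usize, usable_size: usize) -> usize {
    if payload_len <= max_local {
        return payload_len; // everything fits on the b-tree page
    }
    let surplus = min_local + (payload_len - min_local) % (usable_size - 4);
    if surplus <= max_local { surplus } else { min_local }
}

fn main() {
    // A 5000-byte payload on a 4096-byte page:
    // surplus = 489 + (5000 - 489) % 4092 = 489 + 419 = 908 <= 4061,
    // so 908 bytes stay local and the remaining 4092 bytes fill exactly
    // one overflow page (4092 bytes of payload capacity each).
    assert_eq!(local_payload_size(5000, 489, 4061, 4096), 908);
}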
    pub fn read_write_payload_with_offset(
        &mut self,
        mut offset: u32,
        buffer: &mut Vec<u8>,
        mut amount: u32,
        is_write: bool,
    ) -> Result<CursorResult<()>> {
        let page = self.stack.top();
        return_if_locked_maybe_load!(self.pager, page);
        let contents = page.get().contents.as_ref().unwrap();
        let cell_idx = self.stack.current_cell_index() as usize - 1;
        if cell_idx >= contents.cell_count() {
            return Err(LimboError::Corrupt("Invalid cell index".into()));
        }
        let usable_size = self.usable_space();
        let cell = contents
            .cell_get(
                cell_idx,
                payload_overflow_threshold_max(contents.page_type(), usable_size as u16),
                payload_overflow_threshold_min(contents.page_type(), usable_size as u16),
                usable_size,
            )
            .unwrap();
        let (payload, payload_size, first_overflow_page) = match cell {
            BTreeCell::TableLeafCell(cell) => {
                (cell._payload, cell.payload_size, cell.first_overflow_page)
            }
            BTreeCell::IndexLeafCell(cell) => {
                (cell.payload, cell.payload_size, cell.first_overflow_page)
            }
            BTreeCell::IndexInteriorCell(cell) => {
                (cell.payload, cell.payload_size, cell.first_overflow_page)
            }
            BTreeCell::TableInteriorCell(_) => {
                return Err(LimboError::Corrupt(
                    "Cannot access payload of table interior cell".into(),
                ));
            }
        };
        assert!(offset + amount <= payload_size as u32);
        // NOTE: `payload` and `first_overflow_page` are not consumed yet; the
        // overflow-chain walk (driven by `PayloadOverflowWithOffset`) is slated
        // for the upcoming incremental-blob-I/O PRs.
        let (local_size, _) =
            self.parse_cell_info(payload_size as usize, contents.page_type(), usable_size)?;
        if offset < local_size as u32 {
            // Part (or all) of the requested range overlaps the local payload.
            // Clamp the transfer to the local portion, copy it, and adjust
            // offset/amount so any remainder can be served from overflow pages.
            let mut a = amount;
            if a + offset > local_size as u32 {
                a = local_size as u32 - offset;
            }
            self.read_write_payload_to_page(offset, page, buffer, a, is_write);
            offset = 0;
            amount -= a;
        } else {
            // The requested range lies entirely in the overflow chain.
            offset -= local_size as u32;
        }
        Ok(CursorResult::Ok(()))
    }
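So far only the local portion is handled; the rest of the payload lives in a chain of overflow pages, each of which spends its first 4 bytes on a next-page pointer. As a sketch of the full offset walk that `accessPayload` performs in SQLite (and which the `PayloadOverflowWithOffset` state above is presumably meant to drive once the chain traversal lands), consider this self-contained model; all names here are illustrative, and it assumes offset + amount does not exceed the total payload size:

/// `local` is the payload stored on the b-tree page; `overflow` holds the
/// payload bytes of each overflow page in chain order (usable_size - 4 each).
fn read_at_offset(local: &[u8], overflow: &[Vec<u8>], mut offset: usize, mut amount: usize) -> Vec<u8> {
    let mut out = Vec::with_capacity(amount);
    if offset < local.len() {
        // The range starts in the local portion: clamp and copy.
        let a = amount.min(local.len() - offset);
        out.extend_from_slice(&local[offset..offset + a]);
        offset = 0;
        amount -= a;
    } else {
        offset -= local.len();
    }
    for page in overflow {
        if amount == 0 { break; }
        if offset >= page.len() {
            // Skip whole pages until the remaining offset lands inside one.
            offset -= page.len();
            continue;
        }
        let a = amount.min(page.len() - offset);
        out.extend_from_slice(&page[offset..offset + a]);
        offset = 0;
        amount -= a;
    }
    out
}

The invariant mirrored from `accessPayload` is that `offset` only shrinks: once the walk reaches the first page overlapping the requested range, `offset` drops to zero and every later page is consumed from its start.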
    /// Copies `num_bytes` between `buffer` and the page at `payload_offset`.
    /// When `copy_to_page` is true this is a write (buffer -> page) and the
    /// page is marked dirty; otherwise it is a read (page -> buffer).
    fn read_write_payload_to_page(
        &mut self,
        payload_offset: u32,
        page: PageRef,
        buffer: &mut Vec<u8>,
        num_bytes: u32,
        copy_to_page: bool,
    ) {
        if copy_to_page {
            page.set_dirty();
            self.pager.add_dirty(page.get().id);
            let page_contents = page.get().contents.as_mut().unwrap();
            let page_buf = page_contents.as_ptr();
            page_buf[payload_offset as usize..payload_offset as usize + num_bytes as usize]
                .copy_from_slice(&buffer[..num_bytes as usize]);
        } else {
            buffer.extend_from_slice(
                &page.get().contents.as_ref().unwrap().as_ptr()
                    [payload_offset as usize..payload_offset as usize + num_bytes as usize],
            );
        }
    }
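For illustration, a hypothetical caller, assuming the cursor is already positioned on the target row, might look like this (following the `CursorResult` retry convention used elsewhere in this file):

let mut buf = Vec::new();
// Read 16 bytes starting 1024 bytes into the current row's payload.
match cursor.read_write_payload_with_offset(1024, &mut buf, 16, false)? {
    CursorResult::Ok(()) => { /* buf now holds the locally stored bytes of that range */ }
    CursorResult::IO => { /* page load pending; call again once the I/O completes */ }
}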
    /// Move the cursor to the next record and return it.
    /// Used in forwards iteration, which is the default.
    fn get_next_record(
@@ -4842,6 +4982,7 @@ fn insert_into_cell(
    cell_idx: usize,
    usable_space: u16,
) -> Result<()> {
    tracing::debug!("usable_space: {}", usable_space);
    debug_validate_cells!(page, usable_space);
    assert!(
        cell_idx <= page.cell_count() + page.overflow_cells.len(),