Merge 'btree: implement offset-based payload access with overflow support' from Krishna Vishal

This PR adds a new function, `read_write_payload_with_offset`, that supports
reading and writing payload data at a specific offset, handling both local
content and overflow pages. It is a port of SQLite's `accessPayload` function
from `btree.c` and will be essential for supporting incremental blob I/O in
upcoming PRs (see the usage sketch after the testing notes).
- Added a `PayloadOverflowWithOffset` state machine to make the procedure
reentrant across I/O.
- Handles both local payload data and payload stored in overflow pages.

Testing:
- Reading and writing to a column with no overflow pages.
- Reading and writing at an offset into a payload spanning 10 overflow
pages.
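
Roughly, callers drive it like any other reentrant cursor operation. A
minimal sketch using the `run_until_done` test helper from this diff
(`offset` and `n` are illustrative placeholders, not part of the API):

    let mut buf = Vec::new();
    run_until_done(
        || cursor.read_write_payload_with_offset(offset, &mut buf, n, false),
        pager.deref(),
    )
    .unwrap();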

Reviewed-by: Pere Diaz Bou <pere-altea@homail.com>

Closes #1476
commit 7c6a4410d2
Jussi Saurio, 2025-05-18 22:58:10 +03:00
@@ -198,6 +198,25 @@ enum ReadPayloadOverflow {
},
}
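/// Reentrant state for `read_write_payload_with_offset`: `SkipOverflowPages`
/// walks the overflow chain up to the page containing the requested offset,
/// then `ProcessPage` reads or writes page by page until `amount` bytes are done.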
enum PayloadOverflowWithOffset {
SkipOverflowPages {
next_page: u32,
pages_left_to_skip: u32,
page_offset: u32,
amount: u32,
buffer_offset: usize,
is_write: bool,
},
ProcessPage {
next_page: u32,
remaining_to_read: u32,
page: PageRef,
current_offset: usize,
buffer_offset: usize,
is_write: bool,
},
}
#[derive(Clone, Debug)]
pub enum BTreeKey<'a> {
TableRowId((u64, Option<&'a ImmutableRecord>)),
@@ -284,6 +303,7 @@ impl WriteInfo {
enum CursorState {
None,
Read(ReadPayloadOverflow),
ReadWritePayload(PayloadOverflowWithOffset),
Write(WriteInfo),
Destroy(DestroyInfo),
Delete(DeleteInfo),
@@ -743,6 +763,330 @@ impl BTreeCursor {
}
}
/// Calculates how much of a cell's payload should be stored locally vs in overflow pages.
///
/// Parameters:
/// - payload_len: Total length of the payload data
/// - page_type: Type of the B-tree page (affects local storage thresholds)
/// - usable_size: Usable bytes per page (page size minus reserved space)
///
/// Returns:
/// - A tuple of (n_local, payload_len) where:
/// - n_local: Amount of payload to store locally on the page
/// - payload_len: Total payload length (unchanged from input)
pub fn parse_cell_info(
&self,
payload_len: usize,
page_type: PageType,
usable_size: usize,
) -> Result<(usize, usize)> {
let max_local = payload_overflow_threshold_max(page_type, usable_size as u16);
let min_local = payload_overflow_threshold_min(page_type, usable_size as u16);
// This matches btreeParseCellAdjustSizeForOverflow logic
let n_local = if payload_len <= max_local {
// Common case - everything fits locally
payload_len
} else {
// For payloads that need overflow pages:
// Calculate how much should be stored locally using the following formula:
// surplus = min_local + (payload_len - min_local) % (usable_space - 4)
//
// This tries to minimize unused space on overflow pages while keeping
// the local storage between min_local and max_local thresholds.
// The (usable_space - 4) factor accounts for overhead in overflow pages.
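// Worked example (assuming a table-leaf page with usable_size = 4096,
// which gives max_local = 4061 and min_local = 489): for
// payload_len = 5000, surplus = 489 + (5000 - 489) % 4092 = 908,
// and since 908 <= 4061, 908 bytes stay local.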
let surplus = min_local + (payload_len - min_local) % (usable_size - 4);
if surplus <= max_local {
surplus
} else {
min_local
}
};
Ok((n_local, payload_len))
}
/// Reads from or writes into the payload of the cell that the cursor is
/// currently pointing to.
/// Parameters:
/// - offset: offset in the payload at which to start reading/writing
/// - buffer: buffer to read into / write from
/// - amount: number of bytes to read/write
/// - is_write: true if writing, false if reading
///
/// If the cell has overflow pages, this skips ahead to the overflow page
/// that contains the given offset.
pub fn read_write_payload_with_offset(
&mut self,
mut offset: u32,
buffer: &mut Vec<u8>,
mut amount: u32,
is_write: bool,
) -> Result<CursorResult<()>> {
if let CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages {
..
})
| CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { .. }) =
&self.state
{
return self.continue_payload_overflow_with_offset(buffer, self.usable_space());
}
let page = self.stack.top();
return_if_locked_maybe_load!(self.pager, page);
let contents = page.get().contents.as_ref().unwrap();
let cell_idx = self.stack.current_cell_index() as usize - 1;
if cell_idx >= contents.cell_count() {
return Err(LimboError::Corrupt("Invalid cell index".into()));
}
let usable_size = self.usable_space();
let cell = contents
.cell_get(
cell_idx,
payload_overflow_threshold_max(contents.page_type(), usable_size as u16),
payload_overflow_threshold_min(contents.page_type(), usable_size as u16),
usable_size,
)
.unwrap();
let (payload, payload_size, first_overflow_page) = match cell {
BTreeCell::TableLeafCell(cell) => {
(cell._payload, cell.payload_size, cell.first_overflow_page)
}
BTreeCell::IndexLeafCell(cell) => {
(cell.payload, cell.payload_size, cell.first_overflow_page)
}
BTreeCell::IndexInteriorCell(cell) => {
(cell.payload, cell.payload_size, cell.first_overflow_page)
}
BTreeCell::TableInteriorCell(_) => {
return Err(LimboError::Corrupt(
"Cannot access payload of table interior cell".into(),
));
}
};
assert!(offset + amount <= payload_size as u32);
let (local_size, _) =
self.parse_cell_info(payload_size as usize, contents.page_type(), usable_size)?;
let mut bytes_processed: u32 = 0;
if offset < local_size as u32 {
let mut local_amount: u32 = amount;
if local_amount + offset > local_size as u32 {
local_amount = local_size as u32 - offset;
}
if is_write {
self.write_payload_to_page(offset, local_amount, payload, buffer, page.clone());
} else {
self.read_payload_from_page(offset, local_amount, payload, buffer);
}
offset = 0;
amount -= local_amount;
bytes_processed += local_amount;
} else {
offset -= local_size as u32;
}
if amount > 0 {
if first_overflow_page.is_none() {
return Err(LimboError::Corrupt(
"Expected overflow page but none found".into(),
));
}
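// Each overflow page holds usable_size - 4 payload bytes after its 4-byte
// next-page pointer. Worked example (assuming usable_size = 4096, i.e. 4092
// payload bytes per page): a remaining offset of 10000 skips
// 10000 / 4092 = 2 full pages and starts at 10000 % 4092 = 1816 within the
// third page.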
let overflow_size = usable_size - 4;
let pages_to_skip = offset / overflow_size as u32;
let page_offset = offset % overflow_size as u32;
self.state =
CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages {
next_page: first_overflow_page.unwrap(),
pages_left_to_skip: pages_to_skip,
page_offset,
amount,
buffer_offset: bytes_processed as usize,
is_write,
});
return Ok(CursorResult::IO);
}
Ok(CursorResult::Ok(()))
}
pub fn continue_payload_overflow_with_offset(
&mut self,
buffer: &mut Vec<u8>,
usable_space: usize,
) -> Result<CursorResult<()>> {
loop {
let mut state = std::mem::replace(&mut self.state, CursorState::None);
match &mut state {
CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages {
next_page,
pages_left_to_skip,
page_offset,
amount,
buffer_offset,
is_write,
}) => {
if *pages_left_to_skip == 0 {
let page = self.pager.read_page(*next_page as usize)?;
return_if_locked_maybe_load!(self.pager, page);
self.state =
CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage {
next_page: *next_page,
remaining_to_read: *amount,
page,
current_offset: *page_offset as usize,
buffer_offset: *buffer_offset,
is_write: *is_write,
});
continue;
}
let page = self.pager.read_page(*next_page as usize)?;
return_if_locked_maybe_load!(self.pager, page);
let contents = page.get_contents();
let next = contents.read_u32_no_offset(0);
if next == 0 {
return Err(LimboError::Corrupt(
"Overflow chain ends prematurely".into(),
));
}
*next_page = next;
*pages_left_to_skip -= 1;
self.state = CursorState::ReadWritePayload(
PayloadOverflowWithOffset::SkipOverflowPages {
next_page: next,
pages_left_to_skip: *pages_left_to_skip,
page_offset: *page_offset,
amount: *amount,
buffer_offset: *buffer_offset,
is_write: *is_write,
},
);
return Ok(CursorResult::IO);
}
CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage {
next_page,
remaining_to_read,
page,
current_offset,
buffer_offset,
is_write,
}) => {
if page.is_locked() {
self.state =
CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage {
next_page: *next_page,
remaining_to_read: *remaining_to_read,
page: page.clone(),
current_offset: *current_offset,
buffer_offset: *buffer_offset,
is_write: *is_write,
});
return Ok(CursorResult::IO);
}
let contents = page.get_contents();
let overflow_size = usable_space - 4;
let page_offset = *current_offset;
let bytes_to_process = std::cmp::min(
*remaining_to_read,
overflow_size as u32 - page_offset as u32,
);
let payload_offset = 4 + page_offset;
let page_payload = contents.as_ptr();
if *is_write {
self.write_payload_to_page(
payload_offset as u32,
bytes_to_process,
page_payload,
buffer,
page.clone(),
);
} else {
self.read_payload_from_page(
payload_offset as u32,
bytes_to_process,
page_payload,
buffer,
);
}
*remaining_to_read -= bytes_to_process;
*buffer_offset += bytes_to_process as usize;
if *remaining_to_read == 0 {
self.state = CursorState::None;
return Ok(CursorResult::Ok(()));
}
let next = contents.read_u32_no_offset(0);
if next == 0 {
return Err(LimboError::Corrupt(
"Overflow chain ends prematurely".into(),
));
}
// Advance to the next page in the overflow chain
*next_page = next;
*current_offset = 0; // Reset offset for new page
*page = self.pager.read_page(next as usize)?;
// Save progress before yielding, so the next call resumes here
self.state = state;
return Ok(CursorResult::IO);
}
_ => {
return Err(LimboError::InternalError(
"Invalid state for continue_payload_overflow_with_offset".into(),
))
}
}
}
}
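/// Appends `num_bytes` of `payload`, starting at `payload_offset`, to `buffer`.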
fn read_payload_from_page(
&self,
payload_offset: u32,
num_bytes: u32,
payload: &[u8],
buffer: &mut Vec<u8>,
) {
buffer.extend_from_slice(
&payload[payload_offset as usize..(payload_offset + num_bytes) as usize],
);
}
/// This function writes from a buffer into a page. Note that it copies the
/// first `num_bytes` bytes of `buffer`, i.e. it always copies from the start.
/// SAFETY: This function uses unsafe in the write path to write to the page payload directly.
/// - Make sure the page is pointing to valid data, i.e. the page has not been evicted from the page cache.
fn write_payload_to_page(
&mut self,
payload_offset: u32,
num_bytes: u32,
payload: &[u8],
buffer: &mut Vec<u8>,
page: PageRef,
) {
page.set_dirty();
self.pager.add_dirty(page.get().id);
// SAFETY: This is safe as long as the page is not evicted from the cache.
let payload_mut =
unsafe { std::slice::from_raw_parts_mut(payload.as_ptr() as *mut u8, payload.len()) };
payload_mut[payload_offset as usize..payload_offset as usize + num_bytes as usize]
.copy_from_slice(&buffer[..num_bytes as usize]);
}
/// Move the cursor to the next record and return it.
/// Used in forwards iteration, which is the default.
fn get_next_record(
@@ -7215,6 +7559,168 @@ mod tests {
}
}
#[test]
pub fn test_read_write_payload_with_offset() {
let (pager, root_page) = empty_btree();
let mut cursor = BTreeCursor::new(None, pager.clone(), root_page);
let offset = 2; // the record header is 2 bytes here, so blob data starts at offset 2
let initial_text = "hello world";
let initial_blob = initial_text.as_bytes().to_vec();
let value = ImmutableRecord::from_registers(&[Register::OwnedValue(OwnedValue::Blob(
initial_blob.clone(),
))]);
run_until_done(
|| {
let key = SeekKey::TableRowId(1);
cursor.move_to(key, SeekOp::EQ)
},
pager.deref(),
)
.unwrap();
run_until_done(
|| cursor.insert(&BTreeKey::new_table_rowid(1, Some(&value)), true),
pager.deref(),
)
.unwrap();
cursor
.stack
.set_cell_index(cursor.stack.current_cell_index() + 1);
let mut read_buffer = Vec::new();
run_until_done(
|| {
cursor.read_write_payload_with_offset(
offset,
&mut read_buffer,
initial_blob.len() as u32,
false,
)
},
pager.deref(),
)
.unwrap();
assert_eq!(
std::str::from_utf8(&read_buffer).unwrap(),
initial_text,
"Read data doesn't match expected data"
);
let mut modified_hello = "olleh".as_bytes().to_vec();
run_until_done(
|| cursor.read_write_payload_with_offset(offset, &mut modified_hello, 5, true),
pager.deref(),
)
.unwrap();
let mut verification_buffer = Vec::new();
run_until_done(
|| {
cursor.read_write_payload_with_offset(
offset,
&mut verification_buffer,
initial_blob.len() as u32,
false,
)
},
pager.deref(),
)
.unwrap();
assert_eq!(
std::str::from_utf8(&verification_buffer).unwrap(),
"olleh world",
"Modified data doesn't match expected result"
);
}
#[test]
pub fn test_read_write_payload_with_overflow_page() {
let (pager, root_page) = empty_btree();
let mut cursor = BTreeCursor::new(None, pager.clone(), root_page);
let mut large_blob = vec![b'A'; 40960 - 11]; // large blob: 40960 bytes total = 10 pages of 4096 bytes
let hello_world = b"hello world";
large_blob.extend_from_slice(hello_world);
let value = ImmutableRecord::from_registers(&[Register::OwnedValue(OwnedValue::Blob(
large_blob.clone(),
))]);
run_until_done(
|| {
let key = SeekKey::TableRowId(1);
cursor.move_to(key, SeekOp::EQ)
},
pager.deref(),
)
.unwrap();
run_until_done(
|| cursor.insert(&BTreeKey::new_table_rowid(1, Some(&value)), true),
pager.deref(),
)
.unwrap();
cursor
.stack
.set_cell_index(cursor.stack.current_cell_index() + 1);
let offset_to_hello_world = 4 + (large_blob.len() - 11) as u32; // the record header is 4 bytes here; its size depends on the record's serial types
let mut read_buffer = Vec::new();
run_until_done(
|| {
cursor.read_write_payload_with_offset(
offset_to_hello_world,
&mut read_buffer,
11,
false,
)
},
pager.deref(),
)
.unwrap();
assert_eq!(
std::str::from_utf8(&read_buffer).unwrap(),
"hello world",
"Failed to read 'hello world' from overflow page"
);
let mut modified_hello = "olleh".as_bytes().to_vec();
run_until_done(
|| {
cursor.read_write_payload_with_offset(
offset_to_hello_world,
&mut modified_hello,
5,
true,
)
},
pager.deref(),
)
.unwrap();
let mut verification_buffer = Vec::new();
run_until_done(
|| {
cursor.read_write_payload_with_offset(
offset_to_hello_world,
&mut verification_buffer,
hello_world.len() as u32,
false,
)
},
pager.deref(),
)
.unwrap();
assert_eq!(
std::str::from_utf8(&verification_buffer).unwrap(),
"olleh world",
"Modified data doesn't match expected result"
);
}
fn run_until_done<T>(
mut action: impl FnMut() -> Result<CursorResult<T>>,
pager: &Pager,