1. Added frangmentation handling to find_free_cell

2. Added asserts and corruption error to `allocate_cell_space`
This commit is contained in:
krishvishal 2025-01-26 17:35:10 +05:30
parent 8263bd0482
commit 7d67895306

View file

@ -798,7 +798,10 @@ impl BTreeCursor {
}
// TODO: insert into cell payload in internal page
let new_cell_data_pointer = self.allocate_cell_space(page, payload.len() as u16);
// TODO: handle the unwrap
let new_cell_data_pointer = self
.allocate_cell_space(page, payload.len() as u16)
.expect("Failed to allocate cell space");
let buf = page.as_ptr();
// copy data
@ -979,6 +982,67 @@ impl BTreeCursor {
}
}
fn find_free_cell(
&self,
page_ref: &PageContent,
amount: usize,
db_header: Ref<DatabaseHeader>,
) -> Result<usize> {
// NOTE: freelist is in ascending order of keys and pc
// unused_space is reserved bytes at the end of page, therefore we must substract from maxpc
let mut free_list_pointer_addr = 1;
let mut pc = page_ref.first_freeblock() as usize;
let usable_space = (db_header.page_size - db_header.reserved_space as u16) as usize;
let maxpc = usable_space - amount;
if pc == 0 {
return Ok(0);
}
while pc <= maxpc {
let size = page_ref.read_u16(pc + 2) as usize;
if let Some(x) = size.checked_sub(amount) {
if x < 4 {
if page_ref.read_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT) > 57 {
return Ok(0);
}
let next_ptr = page_ref.read_u16(pc);
page_ref.write_u16(free_list_pointer_addr, next_ptr);
let frag_count = page_ref.read_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT);
page_ref.write_u8(
PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT,
frag_count + x as u8,
);
return Ok(pc);
} else if x + pc > maxpc {
return Err(LimboError::Corrupt("Free block extends beyond page".into()));
} else {
page_ref.write_u16(pc + 2, x as u16);
return Ok(pc + x);
}
}
free_list_pointer_addr = pc;
pc = page_ref.read_u16(pc) as usize;
if pc <= free_list_pointer_addr && pc != 0 {
return Err(LimboError::Corrupt(
"Free list not in ascending order".into(),
));
}
}
if pc > maxpc + amount - 4 {
return Err(LimboError::Corrupt(
"Free block chain extends beyond page end".into(),
));
}
Ok(0)
}
/// Balance a leaf page.
/// Balancing is done when a page overflows.
/// see e.g. https://en.wikipedia.org/wiki/B-tree
@ -1430,38 +1494,41 @@ impl BTreeCursor {
}
/// Allocate space for a cell on a page.
fn allocate_cell_space(&self, page_ref: &PageContent, amount: u16) -> u16 {
fn allocate_cell_space(&self, page_ref: &mut PageContent, amount: u16) -> Result<u16> {
let amount = amount as usize;
let (cell_offset, _) = page_ref.cell_pointer_array_offset_and_size();
let gap = cell_offset + 2 * page_ref.cell_count();
let mut top = page_ref.cell_content_area() as usize;
// there are free blocks and enough space
if page_ref.first_freeblock() != 0 && gap + 2 <= top {
// find slot
let db_header = RefCell::borrow(&self.pager.db_header);
let pc = find_free_cell(page_ref, db_header, amount);
let db_header = RefCell::borrow(&self.database_header);
let pc = self.find_free_cell(page_ref, amount, db_header)?;
if pc != 0 {
return pc as u16;
// Corruption check
if pc <= gap {
return Err(LimboError::Corrupt(
"Corrupted page: free block overlaps cell pointer array".into(),
));
}
return Ok(pc as u16);
}
/* fall through, we might need to defragment */
}
if gap + 2 + amount > top {
// defragment
self.defragment_page(page_ref, RefCell::borrow(&self.pager.db_header));
top = page_ref.read_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA) as usize;
assert!(gap + 2 + amount <= top);
}
let db_header = RefCell::borrow(&self.pager.db_header);
top -= amount;
page_ref.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, top as u16);
let db_header = RefCell::borrow(&self.database_header);
let usable_space = (db_header.page_size - db_header.reserved_space as u16) as usize;
assert!(top + amount <= usable_space);
top as u16
Ok(top as u16)
}
/// Defragment a page. This means packing all the cells to the end of the page.
@ -2269,32 +2336,6 @@ impl PageStack {
}
}
fn find_free_cell(page_ref: &PageContent, db_header: Ref<DatabaseHeader>, amount: usize) -> usize {
// NOTE: freelist is in ascending order of keys and pc
// unuse_space is reserved bytes at the end of page, therefore we must substract from maxpc
let mut pc = page_ref.first_freeblock() as usize;
let buf = page_ref.as_ptr();
let usable_space = (db_header.page_size - db_header.reserved_space as u16) as usize;
let maxpc = usable_space - amount;
let mut found = false;
while pc <= maxpc {
let next = u16::from_be_bytes(buf[pc..pc + 2].try_into().unwrap());
let size = u16::from_be_bytes(buf[pc + 2..pc + 4].try_into().unwrap());
if amount <= size as usize {
found = true;
break;
}
pc = next as usize;
}
if !found {
0
} else {
pc
}
}
pub fn btree_init_page(
page: &PageRef,
page_type: PageType,