various fixes in btree

* read_u8 now takes self.offset into account
* shift cell pointers left on balance_root with offset > 0
* fix wrong writes to page in degragment_page
This commit is contained in:
Pere Diaz Bou 2024-11-19 17:15:19 +01:00
parent db343ac5ea
commit f5a1f7c800
6 changed files with 56 additions and 28 deletions

View file

@ -93,16 +93,18 @@ fn main() -> anyhow::Result<()> {
// At prompt, increment interrupt count
if interrupt_count.fetch_add(1, Ordering::SeqCst) >= 1 {
eprintln!("Interrupted. Exiting...");
conn.close();
conn.close()?;
break;
}
println!("Use .quit to exit or press Ctrl-C again to force quit.");
continue;
}
Err(ReadlineError::Eof) => {
conn.close()?;
break;
}
Err(err) => {
conn.close()?;
anyhow::bail!(err)
}
}

View file

@ -2,10 +2,7 @@ use log::debug;
use nix::NixPath;
use crate::storage::pager::{Page, Pager};
use crate::storage::sqlite3_ondisk::{
read_btree_cell, read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType,
TableInteriorCell, TableLeafCell,
};
use crate::storage::sqlite3_ondisk::{read_btree_cell, read_record, read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType, TableInteriorCell, TableLeafCell};
use crate::types::{Cursor, CursorResult, OwnedRecord, OwnedValue, SeekKey, SeekOp};
use crate::Result;
@ -745,7 +742,6 @@ impl BTreeCursor {
// NOTE: since we are doing a simple split we only finding the pointer we want to update (right pointer).
// Right pointer means cell that points to the last page, as we don't really want to drop this one. This one
// can be a "rightmost pointer" or a "cell".
// TODO(pere): simplify locking...
// we always asumme there is a parent
let current_page = self.stack.top();
let mut page_rc = current_page.borrow_mut();
@ -823,7 +819,7 @@ impl BTreeCursor {
debug!(
"splitting left={} right={}",
self.stack.current(),
page_rc.id,
right_page_id
);
@ -913,7 +909,9 @@ impl BTreeCursor {
contents.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, cell_content_area_start);
contents.write_u8(BTREE_HEADER_OFFSET_FRAGMENTED, 0);
contents.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, 0);
if !contents.is_leaf() {
contents.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, 0);
}
}
// distribute cells
@ -922,8 +920,11 @@ impl BTreeCursor {
let mut current_cell_index = 0_usize;
let mut divider_cells_index = Vec::new(); /* index to scratch cells that will be used as dividers in order */
debug!("balance_leaf::distribute(cells={}, cells_per_page={})", scratch_cells.len(), cells_per_page);
for (i, page) in new_pages.iter_mut().enumerate() {
let mut page = page.borrow_mut();
let page_id = page.id;
let contents = page.contents.as_mut().unwrap();
let last_page = i == new_pages_len - 1;
@ -933,9 +934,12 @@ impl BTreeCursor {
} else {
cells_per_page
};
debug!("balance_leaf::distribute(page={}, cells_to_copy={})", page_id, cells_to_copy);
let cell_index_range = current_cell_index..current_cell_index + cells_to_copy;
for (j, cell_idx) in cell_index_range.enumerate() {
debug!("balance_leaf::distribute_in_page(page={}, cells_to_copy={}, j={}, cell_idx={})", page_id, cells_to_copy, j, cell_idx);
let cell = scratch_cells[cell_idx];
self.insert_into_cell(contents, cell, j);
}
@ -1050,19 +1054,24 @@ impl BTreeCursor {
/* todo: balance deeper, create child and copy contents of root there. Then split root */
/* if we are in root page then we just need to create a new root and push key there */
let new_root_page_ref = self.allocate_page(PageType::TableInterior, DATABASE_HEADER_SIZE);
let is_page_1 = {
let current_root = self.stack.top();
let current_root_ref = current_root.borrow();
current_root_ref.id == 1
};
let offset = if is_page_1 { DATABASE_HEADER_SIZE } else { 0 };
let new_root_page_ref = self.allocate_page(PageType::TableInterior, offset);
{
let current_root = self.stack.top();
let current_root_ref = current_root.borrow();
let current_root_contents = current_root_ref.contents.as_ref().unwrap();
let is_page_1 = current_root_contents.offset == DATABASE_HEADER_SIZE;
let mut new_root_page = new_root_page_ref.borrow_mut();
let new_root_page_id = new_root_page.id;
let new_root_page_contents = new_root_page.contents.as_mut().unwrap();
if is_page_1 {
new_root_page_contents
.write_u8(BTREE_HEADER_OFFSET_TYPE, PageType::TableInterior as u8);
// Copy header
let current_root_buf = current_root_contents.as_ptr();
let new_root_buf = new_root_page_contents.as_ptr();
new_root_buf[0..DATABASE_HEADER_SIZE]
@ -1074,8 +1083,7 @@ impl BTreeCursor {
new_root_page_contents.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0);
// TODO:: this page should have offset
// copy header bytes to here
is_page_1
};
}
/* swap splitted page buffer with new root buffer so we don't have to update page idx */
{
@ -1092,11 +1100,19 @@ impl BTreeCursor {
// Also change the offset of page
//
if is_page_1 {
// Remove heaader from child and set offset to 0
// Remove header from child and set offset to 0
let contents = child_rc.contents.as_mut().unwrap();
contents.offset = 0;
let (cell_pointer_offset, _ ) = contents.cell_get_raw_pointer_region();
// change cell pointers
for cell_idx in 0..contents.cell_count() {
let cell_pointer_offset = cell_pointer_offset + (2 * cell_idx) - offset;
let pc = contents.read_u16(cell_pointer_offset);
contents.write_u16(cell_pointer_offset, pc - offset as u16);
}
contents.offset = 0;
let buf = contents.as_ptr();
buf.copy_within(DATABASE_HEADER_SIZE.., 0);
buf.copy_within(DATABASE_HEADER_SIZE.., 0);
}
self.pager.add_dirty(new_root_page.id);
@ -1175,7 +1191,9 @@ impl BTreeCursor {
}
fn defragment_page(&self, page: &PageContent, db_header: Ref<DatabaseHeader>) {
log::debug!("defragment_page");
let cloned_page = page.clone();
// TODO(pere): usable space should include offset probably
let usable_space = (db_header.page_size - db_header.unused_space as u16) as u64;
let mut cbrk = usable_space;
@ -1253,10 +1271,9 @@ impl BTreeCursor {
let write_buf = page.as_ptr();
// set new first byte of cell content
write_buf[5..7].copy_from_slice(&(cbrk as u16).to_be_bytes());
page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, cbrk as u16);
// set free block to 0, unused spaced can be retrieved from gap between cell pointer end and content start
write_buf[1] = 0;
write_buf[2] = 0;
page.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, 0);
// set unused space to 0
let first_cell = cloned_page.cell_content_area() as u64;
assert!(first_cell <= cbrk);
@ -1267,6 +1284,7 @@ impl BTreeCursor {
// and end of cell pointer area.
#[allow(unused_assignments)]
fn compute_free_space(&self, page: &PageContent, db_header: Ref<DatabaseHeader>) -> u16 {
// TODO(pere): maybe free space is not calculated correctly with offset
let buf = page.as_ptr();
let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize;
@ -1280,7 +1298,8 @@ impl BTreeCursor {
let ncell = page.cell_count();
// 8 + 4 == header end
let first_cell = (page.offset + 8 + 4 + (2 * ncell)) as u16;
let child_pointer_size = if page.is_leaf() { 0 } else { 4 };
let first_cell = (page.offset + 8 + child_pointer_size + (2 * ncell)) as u16;
let mut nfree = fragmented_free_bytes as usize + first_byte_in_cell_content as usize;
@ -1317,7 +1336,7 @@ impl BTreeCursor {
// return SQLITE_CORRUPT_PAGE(pPage);
// }
// don't count header and cell pointers?
nfree -= first_cell as usize;
nfree = nfree - first_cell as usize;
nfree as u16
}

View file

@ -139,8 +139,8 @@ impl DumbLruPageCache {
}
pub fn insert(&mut self, key: usize, value: Rc<RefCell<Page>>) {
debug!("cache_insert(key={})", key);
self._delete(key, false);
debug!("cache_insert(key={})", key);
let mut entry = Box::new(PageCacheEntry {
key,
next: None,
@ -660,6 +660,7 @@ pub fn allocate_page(
bp.put(buf);
});
let buffer = Rc::new(RefCell::new(Buffer::new(buffer, drop_fn)));
page.set_loaded();
page.contents = Some(PageContent {
offset,
buffer,

View file

@ -320,11 +320,11 @@ impl Clone for PageContent {
impl PageContent {
pub fn page_type(&self) -> PageType {
self.read_u8(self.offset).try_into().unwrap()
self.read_u8(0).try_into().unwrap()
}
pub fn maybe_page_type(&self) -> Option<PageType> {
match self.read_u8(self.offset).try_into() {
match self.read_u8(0).try_into() {
Ok(v) => Some(v),
Err(_) => None, // this could be an overflow page
}
@ -342,7 +342,7 @@ impl PageContent {
fn read_u8(&self, pos: usize) -> u8 {
let buf = self.as_ptr();
buf[pos]
buf[self.offset + pos]
}
pub fn read_u16(&self, pos: usize) -> u16 {
@ -361,16 +361,19 @@ impl PageContent {
}
pub fn write_u8(&self, pos: usize, value: u8) {
log::debug!("write_u8(pos={}, value={})", pos, value);
let buf = self.as_ptr();
buf[self.offset + pos] = value;
}
pub fn write_u16(&self, pos: usize, value: u16) {
log::debug!("write_u16(pos={}, value={})", pos, value);
let buf = self.as_ptr();
buf[self.offset + pos..self.offset + pos + 2].copy_from_slice(&value.to_be_bytes());
}
pub fn write_u32(&self, pos: usize, value: u32) {
log::debug!("write_u32(pos={}, value={})", pos, value);
let buf = self.as_ptr();
buf[self.offset + pos..self.offset + pos + 4].copy_from_slice(&value.to_be_bytes());
}
@ -433,6 +436,7 @@ impl PageContent {
)
}
/// When using this fu
pub fn cell_get_raw_pointer_region(&self) -> (usize, usize) {
let cell_start = match self.page_type() {
PageType::IndexInterior => 12,

View file

@ -222,10 +222,10 @@ fn do_write(env: &mut SimulatorEnv, conn: &mut Rc<Connection>) -> Result<()> {
}
fn compare_equal_rows(a: &Vec<Vec<Value>>, b: &Vec<Vec<Value>>) {
assert_eq!(a.len(), b.len());
assert_eq!(a.len(), b.len(), "lengths are different");
for (r1, r2) in a.iter().zip(b) {
for (v1, v2) in r1.iter().zip(r2) {
assert_eq!(v1, v2);
assert_eq!(v1, v2, "values are different");
}
}
}

View file

@ -367,6 +367,7 @@ mod tests {
let conn = tmp_db.connect_limbo();
insert(1, &conn, &tmp_db).unwrap();
assert_eq!(count(&conn, &tmp_db).unwrap(), 1);
conn.close()?;
}
{
let conn = tmp_db.connect_limbo();
@ -375,6 +376,7 @@ mod tests {
1,
"failed to read from wal from another connection"
);
conn.close()?;
}
Ok(())
}