check order of rowids

This commit is contained in:
Pere Diaz Bou 2025-06-11 17:56:19 +02:00
parent d3c646378a
commit a24e1b775c

View file

@ -5098,22 +5098,28 @@ impl BTreeCursor {
}
}
#[derive(Clone)]
struct IntegrityCheckPageEntry {
page_idx: usize,
level: usize,
max_intkey: i64,
}
pub struct IntegrityCheckState {
pub current_page: usize,
page_stack: PageStack,
start: bool,
page_stack: Vec<IntegrityCheckPageEntry>,
first_leaf_level: Option<usize>,
}
impl IntegrityCheckState {
pub fn new(page_idx: usize) -> Self {
Self {
current_page: page_idx,
page_stack: PageStack {
current_page: Cell::new(-1),
cell_indices: RefCell::new([0; BTCURSOR_MAX_DEPTH + 1]),
stack: RefCell::new([const { None }; BTCURSOR_MAX_DEPTH + 1]),
},
start: true,
page_stack: vec![IntegrityCheckPageEntry {
page_idx,
level: 0,
max_intkey: i64::MAX,
}],
first_leaf_level: None,
}
}
}
@ -5121,32 +5127,51 @@ impl std::fmt::Debug for IntegrityCheckState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IntegrityCheckState")
.field("current_page", &self.current_page)
.field("start", &self.start)
.field("first_leaf_level", &self.first_leaf_level)
.finish()
}
}
/// Perform integrity check on a whole table/index. We check for:
/// 1. Correct order of keys in case of rowids.
/// 2. There are no overlap between cells.
/// 3. Cells do not scape outside expected range.
/// 4. Depth of leaf pages are equal.
/// 5. Overflow pages are correct (TODO)
///
/// In order to keep this reentrant, we keep a stack of pages we need to check. Ideally, like in
/// SQLlite, we would have implemented a recursive solution which would make it easier to check the
/// depth.
pub fn integrity_check(
state: &mut IntegrityCheckState,
error_count: &mut usize,
message: &mut String,
pager: &Rc<Pager>,
) -> Result<CursorResult<()>> {
if state.start {
let page = btree_read_page(pager, state.current_page)?;
return_if_locked_maybe_load!(pager, page);
state.page_stack.push(page);
state.start = false;
}
let page = state.page_stack.top();
let Some(IntegrityCheckPageEntry { page_idx, level, max_intkey }) = state.page_stack.last().cloned() else {
return Ok(CursorResult::Ok(()));
};
let page = btree_read_page(pager, page_idx)?;
return_if_locked_maybe_load!(pager, page);
state.page_stack.pop();
let page = page.get();
let contents = page.get_contents();
let usable_space = pager.usable_space() as u16;
let mut coverage_checker = CoverageChecker::new(page.get().id);
for cell_idx in 0..contents.cell_count() {
// Now we check every cell for few things:
// 1. Check cell is in correct range. Not exceeds page and not starts before we have marked
// (cell content area).
// 2. We add the cell to coverage checker in order to check if cells do not overlap.
// 3. We check order of rowids in case of table pages. We iterate backwards in order to check
// if current cell's rowid is less than the next cell. We also check rowid is less than the
// parent's divider cell. In case of this page being root page max rowid will be i64::MAX.
// 4. We append pages to the stack to check later.
// 5. In case of leaf page, check if the current level(depth) is equal to other leaf pages we
// have seen.
let mut next_rowid = max_intkey;
for cell_idx in (0..contents.cell_count()).rev() {
let (cell_start, cell_length) = contents.cell_get_raw_region(
cell_idx,
payload_overflow_threshold_max(contents.page_type(), usable_space),
@ -5166,10 +5191,69 @@ pub fn integrity_check(
*error_count += 1;
}
coverage_checker.add_cell(cell_start, cell_start + cell_length);
// TODO: check order
// TODO: check overflow
let cell = contents.cell_get(
cell_idx,
payload_overflow_threshold_max(contents.page_type(), usable_space),
payload_overflow_threshold_min(contents.page_type(), usable_space),
usable_space as usize,
)?;
match cell {
BTreeCell::TableInteriorCell(table_interior_cell) => {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: table_interior_cell._left_child_page as usize,
level: level + 1,
max_intkey: table_interior_cell._rowid,
});
let rowid = table_interior_cell._rowid;
if rowid > max_intkey || rowid > next_rowid {
let error_msg = format!("Page {} cell {} has rowid={} in wrong order. Parent cell has parent_rowid={} and next_rowid={}", page.get().id, cell_idx, rowid, max_intkey, next_rowid);
message.write_str(&error_msg).unwrap();
*error_count += 1;
}
next_rowid = rowid;
}
BTreeCell::TableLeafCell(table_leaf_cell) => {
// check depth of leaf pages are equal
if let Some(expected_leaf_level) = state.first_leaf_level {
if expected_leaf_level != level {
let error_msg = format!("Page {} is at different depth from another leaf page this_page_depth={}, other_page_depth={} ", page.get().id, level, expected_leaf_level);
message.write_str(&error_msg).unwrap();
*error_count += 1;
}
} else {
state.first_leaf_level = Some(level);
}
let rowid = table_leaf_cell._rowid;
if rowid > max_intkey || rowid > next_rowid {
let error_msg = format!("Page {} cell {} has rowid={} in wrong order. Parent cell has parent_rowid={} and next_rowid={}", page.get().id, cell_idx, rowid, max_intkey, next_rowid);
message.write_str(&error_msg).unwrap();
*error_count += 1;
}
next_rowid = rowid;
}
BTreeCell::IndexInteriorCell(index_interior_cell) => {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: index_interior_cell.left_child_page as usize,
level: level + 1,
max_intkey, // we don't care about intkey in non-table pages
});
}
BTreeCell::IndexLeafCell(_) => {
// check depth of leaf pages are equal
if let Some(expected_leaf_level) = state.first_leaf_level {
if expected_leaf_level != level {
let error_msg = format!("Page {} is at different depth from another leaf page this_page_depth={}, other_page_depth={} ", page.get().id, level, expected_leaf_level);
message.write_str(&error_msg).unwrap();
*error_count += 1;
}
} else {
state.first_leaf_level = Some(level);
}
}
}
}
// Now we add free blocks to the coverage checker
let first_freeblock = contents.first_freeblock();
if first_freeblock > 0 {
let mut pc = first_freeblock;
@ -5193,6 +5277,7 @@ pub fn integrity_check(
}
}
// Let's check the overlap of freeblocks and cells now that we have collected them all.
coverage_checker.analyze(
usable_space,
contents.cell_content_area() as usize,
@ -5201,7 +5286,7 @@ pub fn integrity_check(
contents.num_frag_free_bytes() as usize,
);
Ok(CursorResult::Ok(()))
Ok(CursorResult::IO)
}
pub fn btree_read_page(pager: &Rc<Pager>, page_idx: usize) -> Result<BTreePage> {