core: Clean up integrity_check()

Suggested by Jussi.
This commit is contained in:
Pekka Enberg 2025-06-16 13:58:12 +03:00
parent 882c5ca168
commit 4496a0d08a
2 changed files with 131 additions and 67 deletions

View file

@ -26,7 +26,7 @@ use std::{
cell::{Cell, Ref, RefCell},
cmp::{Ordering, Reverse},
collections::BinaryHeap,
fmt::{Debug, Write},
fmt::Debug,
pin::Pin,
rc::Rc,
sync::Arc,
@ -5106,6 +5106,62 @@ impl BTreeCursor {
}
}
#[derive(Debug, thiserror::Error)]
pub enum IntegrityCheckError {
#[error("Cell {cell_idx} in page {page_id} is out of range. cell_range={cell_start}..{cell_end}, content_area={content_area}, usable_space={usable_space}")]
CellOutOfRange {
cell_idx: usize,
page_id: usize,
cell_start: usize,
cell_end: usize,
content_area: usize,
usable_space: usize,
},
#[error("Cell {cell_idx} in page {page_id} extends out of page. cell_range={cell_start}..{cell_end}, content_area={content_area}, usable_space={usable_space}")]
CellOverflowsPage {
cell_idx: usize,
page_id: usize,
cell_start: usize,
cell_end: usize,
content_area: usize,
usable_space: usize,
},
#[error("Page {page_id} cell {cell_idx} has rowid={rowid} in wrong order. Parent cell has parent_rowid={max_intkey} and next_rowid={next_rowid}")]
CellRowidOutOfRange {
page_id: usize,
cell_idx: usize,
rowid: i64,
max_intkey: i64,
next_rowid: i64,
},
#[error("Page {page_id} is at different depth from another leaf page this_page_depth={this_page_depth}, other_page_depth={other_page_depth} ")]
LeafDepthMismatch {
page_id: usize,
this_page_depth: usize,
other_page_depth: usize,
},
#[error("Page {page_id} detected freeblock that extends page start={start} end={end}")]
FreeBlockOutOfRange {
page_id: usize,
start: usize,
end: usize,
},
#[error("Page {page_id} cell overlap detected at position={start} with previous_end={prev_end}. content_area={content_area}, is_free_block={is_free_block}")]
CellOverlap {
page_id: usize,
start: usize,
prev_end: usize,
content_area: usize,
is_free_block: bool,
},
#[error("Page {page_id} unexpected fragmentation got={got}, expected={expected}")]
UnexpectedFragmentation {
page_id: usize,
got: usize,
expected: usize,
},
}
#[derive(Clone)]
struct IntegrityCheckPageEntry {
page_idx: usize,
@ -5152,8 +5208,7 @@ impl std::fmt::Debug for IntegrityCheckState {
/// depth.
pub fn integrity_check(
state: &mut IntegrityCheckState,
error_count: &mut usize,
message: &mut String,
errors: &mut Vec<IntegrityCheckError>,
pager: &Rc<Pager>,
) -> Result<CursorResult<()>> {
let Some(IntegrityCheckPageEntry {
@ -5194,14 +5249,24 @@ pub fn integrity_check(
if cell_start < contents.cell_content_area() as usize
|| cell_start > usable_space as usize - 4
{
let error_msg = format!("Cell {} in page {} is out of range. cell_range={}..{}, content_area={}, usable_space={}\n", cell_idx, page.get().id, cell_start, cell_start+cell_length, contents.cell_content_area(), usable_space);
message.write_str(&error_msg).unwrap();
*error_count += 1;
errors.push(IntegrityCheckError::CellOutOfRange {
cell_idx,
page_id: page.get().id,
cell_start,
cell_end: cell_start + cell_length,
content_area: contents.cell_content_area() as usize,
usable_space: usable_space as usize,
});
}
if cell_start + cell_length > usable_space as usize {
let error_msg = format!("Cell {} in page {} extends out of page. cell_range={}..{}, content_area={}, usable_space={}\n", cell_idx, page.get().id, cell_start, cell_start+cell_length, contents.cell_content_area(), usable_space);
message.write_str(&error_msg).unwrap();
*error_count += 1;
errors.push(IntegrityCheckError::CellOverflowsPage {
cell_idx,
page_id: page.get().id,
cell_start,
cell_end: cell_start + cell_length,
content_area: contents.cell_content_area() as usize,
usable_space: usable_space as usize,
});
}
coverage_checker.add_cell(cell_start, cell_start + cell_length);
let cell = contents.cell_get(
@ -5219,9 +5284,13 @@ pub fn integrity_check(
});
let rowid = table_interior_cell._rowid;
if rowid > max_intkey || rowid > next_rowid {
let error_msg = format!("Page {} cell {} has rowid={} in wrong order. Parent cell has parent_rowid={} and next_rowid={}", page.get().id, cell_idx, rowid, max_intkey, next_rowid);
message.write_str(&error_msg).unwrap();
*error_count += 1;
errors.push(IntegrityCheckError::CellRowidOutOfRange {
page_id: page.get().id,
cell_idx,
rowid,
max_intkey,
next_rowid,
});
}
next_rowid = rowid;
}
@ -5229,18 +5298,24 @@ pub fn integrity_check(
// check depth of leaf pages are equal
if let Some(expected_leaf_level) = state.first_leaf_level {
if expected_leaf_level != level {
let error_msg = format!("Page {} is at different depth from another leaf page this_page_depth={}, other_page_depth={} ", page.get().id, level, expected_leaf_level);
message.write_str(&error_msg).unwrap();
*error_count += 1;
errors.push(IntegrityCheckError::LeafDepthMismatch {
page_id: page.get().id,
this_page_depth: level,
other_page_depth: expected_leaf_level,
});
}
} else {
state.first_leaf_level = Some(level);
}
let rowid = table_leaf_cell._rowid;
if rowid > max_intkey || rowid > next_rowid {
let error_msg = format!("Page {} cell {} has rowid={} in wrong order. Parent cell has parent_rowid={} and next_rowid={}", page.get().id, cell_idx, rowid, max_intkey, next_rowid);
message.write_str(&error_msg).unwrap();
*error_count += 1;
errors.push(IntegrityCheckError::CellRowidOutOfRange {
page_id: page.get().id,
cell_idx,
rowid,
max_intkey,
next_rowid,
});
}
next_rowid = rowid;
}
@ -5255,9 +5330,11 @@ pub fn integrity_check(
// check depth of leaf pages are equal
if let Some(expected_leaf_level) = state.first_leaf_level {
if expected_leaf_level != level {
let error_msg = format!("Page {} is at different depth from another leaf page this_page_depth={}, other_page_depth={} ", page.get().id, level, expected_leaf_level);
message.write_str(&error_msg).unwrap();
*error_count += 1;
errors.push(IntegrityCheckError::LeafDepthMismatch {
page_id: page.get().id,
this_page_depth: level,
other_page_depth: expected_leaf_level,
});
}
} else {
state.first_leaf_level = Some(level);
@ -5275,14 +5352,11 @@ pub fn integrity_check(
let size = contents.read_u16_no_offset(pc as usize + 2) as usize;
// check it doesn't go out of range
if pc > usable_space - 4 {
let error_msg = format!(
"Page {} detected freeblock that extends page start={} end={}",
page.get().id,
pc,
pc as usize + size
);
message.write_str(&error_msg).unwrap();
*error_count += 1;
errors.push(IntegrityCheckError::FreeBlockOutOfRange {
page_id: page.get().id,
start: pc as usize,
end: pc as usize + size,
});
break;
}
coverage_checker.add_free_block(pc as usize, pc as usize + size);
@ -5294,8 +5368,7 @@ pub fn integrity_check(
coverage_checker.analyze(
usable_space,
contents.cell_content_area() as usize,
message,
error_count,
errors,
contents.num_frag_free_bytes() as usize,
);
@ -5375,8 +5448,7 @@ impl CoverageChecker {
&mut self,
usable_space: u16,
content_area: usize,
message: &mut String,
error_count: &mut usize,
errors: &mut Vec<IntegrityCheckError>,
expected_fragmentation: usize,
) {
let mut fragmentation = 0;
@ -5384,12 +5456,13 @@ impl CoverageChecker {
while let Some(cell) = self.heap.pop() {
let start = cell.0.start;
if prev_end > start {
let error_msg = format!(
"Page {} cell overlap detected at position={} with previous_end={}. content_area={}, is_free_block={}",
self.page_idx, start, prev_end, content_area, cell.0.is_free_block
);
message.push_str(&error_msg);
*error_count += 1;
errors.push(IntegrityCheckError::CellOverlap {
page_id: self.page_idx,
start,
prev_end,
content_area,
is_free_block: cell.0.is_free_block,
});
break;
} else {
fragmentation += start - prev_end;
@ -5398,12 +5471,11 @@ impl CoverageChecker {
}
fragmentation += usable_space as usize - prev_end;
if fragmentation != expected_fragmentation {
let error_msg = format!(
"Page {} unexpected fragmentation got={}, expected={}",
self.page_idx, fragmentation, expected_fragmentation
);
message.push_str(&error_msg);
*error_count += 1;
errors.push(IntegrityCheckError::UnexpectedFragmentation {
page_id: self.page_idx,
got: fragmentation,
expected: expected_fragmentation,
});
}
}
}

View file

@ -2,7 +2,7 @@
use crate::function::AlterTableFunc;
use crate::numeric::{NullableInteger, Numeric};
use crate::schema::Schema;
use crate::storage::btree::{integrity_check, IntegrityCheckState};
use crate::storage::btree::{integrity_check, IntegrityCheckError, IntegrityCheckState};
use crate::storage::database::FileMemoryStorage;
use crate::storage::page_cache::DumbLruPageCache;
use crate::storage::pager::CreateBTreeFlags;
@ -24,7 +24,6 @@ use crate::{
},
types::compare_immutable,
};
use std::fmt::Write;
use std::{borrow::BorrowMut, rc::Rc, sync::Arc};
use crate::{pseudo::PseudoCursor, result::LimboResult};
@ -5408,8 +5407,7 @@ pub fn op_count(
pub enum OpIntegrityCheckState {
Start,
Checking {
error_count: usize,
message: String,
errors: Vec<IntegrityCheckError>,
current_root_idx: usize,
state: IntegrityCheckState,
},
@ -5432,37 +5430,31 @@ pub fn op_integrity_check(
match &mut state.op_integrity_check_state {
OpIntegrityCheckState::Start => {
state.op_integrity_check_state = OpIntegrityCheckState::Checking {
error_count: 0,
message: String::new(),
errors: Vec::new(),
current_root_idx: 0,
state: IntegrityCheckState::new(roots[0]),
};
}
OpIntegrityCheckState::Checking {
error_count,
message,
errors,
current_root_idx,
state: integrity_check_state,
} => {
return_if_io!(integrity_check(
integrity_check_state,
error_count,
message,
pager
));
return_if_io!(integrity_check(integrity_check_state, errors, pager));
*current_root_idx += 1;
if *current_root_idx < roots.len() {
*integrity_check_state = IntegrityCheckState::new(roots[*current_root_idx]);
return Ok(InsnFunctionStepResult::Step);
} else {
if *error_count == 0 {
message.write_str("ok").map_err(|err| {
LimboError::InternalError(format!(
"error appending message to integrity check {:?}",
err
))
})?;
}
let message = if errors.is_empty() {
"ok".to_string()
} else {
errors
.iter()
.map(|e| e.to_string())
.collect::<Vec<String>>()
.join("\n")
};
state.registers[*message_register] = Register::Value(Value::build_text(message));
state.op_integrity_check_state = OpIntegrityCheckState::Start;
state.pc += 1;