make find_cell and process_overflow_page reentrant

This commit is contained in:
pedrocarlo 2025-05-31 23:24:27 -03:00
parent e97227ccb9
commit d688cfd547
2 changed files with 130 additions and 72 deletions

View file

@ -1,3 +1,5 @@
use tracing::{instrument, Level};
use crate::{
schema::Index,
storage::{
@ -23,6 +25,7 @@ use std::collections::HashSet;
use std::{
cell::{Cell, Ref, RefCell},
cmp::Ordering,
fmt::Debug,
pin::Pin,
rc::Rc,
sync::Arc,
@ -31,8 +34,7 @@ use std::{
use super::{
pager::PageRef,
sqlite3_ondisk::{
read_record, write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell,
DATABASE_HEADER_SIZE,
write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, DATABASE_HEADER_SIZE,
},
};
@ -200,13 +202,11 @@ enum WriteState {
Finish,
}
enum ReadPayloadOverflow {
ProcessPage {
payload: Vec<u8>,
next_page: u32,
remaining_to_read: usize,
page: BTreePage,
},
/// In-progress state for reading a record whose payload continues onto overflow pages.
///
/// The cursor keeps this in its own `read_overflow_state` slot (rather than in `CursorState`)
/// so an overflow read can be suspended on IO and resumed while `state` is used by another
/// operation.
struct ReadPayloadOverflow {
/// Payload buffer; initialized from the payload bytes handed in when the read starts.
/// NOTE(review): presumably extended with bytes from each overflow page — confirm.
payload: Vec<u8>,
/// Overflow page number; initialized to the first overflow page of the record.
next_page: u32,
/// Number of payload bytes still to be read; initialized to the total payload size minus
/// the length of the initial payload bytes.
remaining_to_read: usize,
/// Page handle for the overflow page being read.
/// NOTE(review): presumably the page returned by `read_page` for `next_page` — confirm.
page: BTreePage,
}
enum PayloadOverflowWithOffset {
@ -322,7 +322,6 @@ enum CursorHasRecord {
/// was suspended due to IO.
enum CursorState {
None,
Read(ReadPayloadOverflow),
ReadWritePayload(PayloadOverflowWithOffset),
Write(WriteInfo),
Destroy(DestroyInfo),
@ -371,6 +370,18 @@ impl CursorState {
}
}
// Debug formatting for `CursorState`: emits only the variant name and never the
// data carried by the variant.
impl Debug for CursorState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Resolve the label first, then write it with a single call.
        let variant_name = match self {
            Self::None => "None",
            Self::ReadWritePayload(..) => "ReadWritePayload",
            Self::Write(..) => "Write",
            Self::Destroy(..) => "Destroy",
            Self::Delete(..) => "Delete",
        };
        f.write_str(variant_name)
    }
}
enum OverflowState {
Start,
ProcessPage { next_page: u32 },
@ -475,6 +486,26 @@ impl CursorSeekState {
type CursorMoveToState = CursorSeekState;
/// Resumable position for `find_cell`.
///
/// Holds the index of the cell currently being examined so that a search which has to
/// yield can continue from the same cell when it is called again. `None` means no search
/// is in progress.
#[derive(Debug)]
struct FindCellState(Option<usize>);

impl FindCellState {
    /// Records `cell_idx` as the current position of the search.
    #[inline]
    fn set(&mut self, cell_idx: usize) {
        self.0 = Some(cell_idx)
    }

    /// Returns the current cell index.
    ///
    /// Only reads the state, so a shared borrow is sufficient.
    ///
    /// # Panics
    ///
    /// Panics if no index has been stored with [`FindCellState::set`] since construction
    /// or since the last [`FindCellState::reset`].
    #[inline]
    fn get_cell_idx(&self) -> usize {
        self.0.expect("get can only be called after a set")
    }

    /// Clears the stored position so the next search starts from scratch.
    #[inline]
    fn reset(&mut self) {
        self.0 = None;
    }
}
pub struct BTreeCursor {
/// The multi-version cursor that is used to read and write to the database file.
mv_cursor: Option<Rc<RefCell<MvCursor>>>,
@ -512,6 +543,11 @@ pub struct BTreeCursor {
collations: Vec<CollationSeq>,
seek_state: CursorSeekState,
move_to_state: CursorMoveToState,
/// Separate state to read a record with overflow pages. This separation from `state` is necessary as
/// we can be in a function that relies on `state`, but also needs to process overflow pages
read_overflow_state: Option<ReadPayloadOverflow>,
/// Contains the current cell_idx for `find_cell`
find_cell_state: FindCellState,
}
impl BTreeCursor {
@ -543,6 +579,8 @@ impl BTreeCursor {
collations,
seek_state: CursorSeekState::Start,
move_to_state: CursorMoveToState::Start,
read_overflow_state: None,
find_cell_state: FindCellState(None),
}
}
@ -834,11 +872,11 @@ impl BTreeCursor {
start_next_page: u32,
payload_size: u64,
) -> Result<CursorResult<()>> {
let res = match &mut self.state {
CursorState::None => {
let res = match &mut self.read_overflow_state {
None => {
tracing::debug!("start reading overflow page payload_size={}", payload_size);
let page = self.read_page(start_next_page as usize)?;
self.state = CursorState::Read(ReadPayloadOverflow::ProcessPage {
self.read_overflow_state = Some(ReadPayloadOverflow {
payload: payload.to_vec(),
next_page: start_next_page,
remaining_to_read: payload_size as usize - payload.len(),
@ -846,7 +884,7 @@ impl BTreeCursor {
});
CursorResult::IO
}
CursorState::Read(ReadPayloadOverflow::ProcessPage {
Some(ReadPayloadOverflow {
payload,
next_page,
remaining_to_read,
@ -884,7 +922,6 @@ impl BTreeCursor {
CursorResult::IO
}
}
_ => unreachable!(),
};
match res {
CursorResult::Ok(payload) => {
@ -895,7 +932,7 @@ impl BTreeCursor {
reuse_immutable.as_mut().unwrap(),
)?;
}
self.state = CursorState::None;
self.read_overflow_state = None;
Ok(CursorResult::Ok(()))
}
CursorResult::IO => Ok(CursorResult::IO),
@ -1493,6 +1530,7 @@ impl BTreeCursor {
}
/// Move the cursor to the root page of the btree.
#[instrument(skip_all, level = Level::TRACE)]
fn move_to_root(&mut self) {
tracing::trace!("move_to_root({})", self.root_page);
let mem_page = self.read_page(self.root_page).unwrap();
@ -1534,6 +1572,7 @@ impl BTreeCursor {
}
/// Specialized version of move_to() for table btrees.
#[instrument(skip(self), level = Level::TRACE)]
fn tablebtree_move_to(&mut self, rowid: i64, seek_op: SeekOp) -> Result<CursorResult<()>> {
let iter_dir = seek_op.iteration_direction();
'outer: loop {
@ -1546,7 +1585,7 @@ impl BTreeCursor {
}
let cell_count = contents.cell_count();
{
if matches!(self.move_to_state, CursorSeekState::Start) {
let min: isize = 0;
let max: isize = cell_count as isize - 1;
let leftmost_matching_cell = None;
@ -1639,6 +1678,7 @@ impl BTreeCursor {
}
/// Specialized version of move_to() for index btrees.
#[instrument(skip(self, index_key), level = Level::TRACE)]
fn indexbtree_move_to(
&mut self,
index_key: &ImmutableRecord,
@ -1654,7 +1694,7 @@ impl BTreeCursor {
return Ok(CursorResult::Ok(()));
}
{
if matches!(self.move_to_state, CursorSeekState::Start) {
let cell_count = contents.cell_count();
let min: isize = 0;
let max: isize = cell_count as isize - 1;
@ -2175,6 +2215,7 @@ impl BTreeCursor {
}
}
#[instrument(skip_all, level = Level::TRACE)]
pub fn move_to(&mut self, key: SeekKey<'_>, cmp: SeekOp) -> Result<CursorResult<()>> {
assert!(self.mv_cursor.is_none());
tracing::trace!("move_to(key={:?} cmp={:?})", key, cmp);
@ -2250,58 +2291,55 @@ impl BTreeCursor {
));
// find cell
(self.find_cell(page, bkey), page.page_type())
(return_if_io!(self.find_cell(page, bkey)), page.page_type())
};
tracing::debug!("insert_into_page(cell_idx={})", cell_idx);
// if the cell index is less than the total cells, check: if its an existing
// rowid, we are going to update / overwrite the cell
if cell_idx < page.get().get_contents().cell_count() {
match page.get().get_contents().cell_get(
let cell = page.get().get_contents().cell_get(
cell_idx,
payload_overflow_threshold_max(page_type, self.usable_space() as u16),
payload_overflow_threshold_min(page_type, self.usable_space() as u16),
self.usable_space(),
)? {
BTreeCell::TableLeafCell(tbl_leaf) => {
if tbl_leaf._rowid == bkey.to_rowid() {
tracing::debug!("insert_into_page: found exact match with cell_idx={cell_idx}, overwriting");
self.overwrite_cell(page.clone(), cell_idx, record)?;
self.state
.mut_write_info()
.expect("expected write info")
.state = WriteState::Finish;
continue;
)?;
match cell {
BTreeCell::TableLeafCell(tbl_leaf) => {
if tbl_leaf._rowid == bkey.to_rowid() {
tracing::debug!("insert_into_page: found exact match with cell_idx={cell_idx}, overwriting");
self.overwrite_cell(page.clone(), cell_idx, record)?;
self.state
.mut_write_info()
.expect("expected write info")
.state = WriteState::Finish;
continue;
}
}
BTreeCell::IndexLeafCell(..) => {
// Not necessary to read record again here, as find_cell already does that for us
let cmp = compare_immutable(
record.get_values(),
self.get_immutable_record()
.as_ref()
.unwrap()
.get_values(),
self.key_sort_order(),
&self.collations,
);
if cmp == Ordering::Equal {
tracing::debug!("insert_into_page: found exact match with cell_idx={cell_idx}, overwriting");
self.has_record.set(CursorHasRecord::Yes { rowid: self.get_index_rowid_from_record() });
self.overwrite_cell(page.clone(), cell_idx, record)?;
self.state
.mut_write_info()
.expect("expected write info")
.state = WriteState::Finish;
continue;
}
}
other => panic!("unexpected cell type, expected TableLeaf or IndexLeaf, found: {:?}", other),
}
BTreeCell::IndexLeafCell(idx_leaf) => {
read_record(
idx_leaf.payload,
self.get_immutable_record_or_create().as_mut().unwrap(),
)
.expect("failed to read record");
if compare_immutable(
record.get_values(),
self.get_immutable_record()
.as_ref()
.unwrap()
.get_values(),
self.key_sort_order(),
&self.collations,
) == Ordering::Equal {
tracing::debug!("insert_into_page: found exact match with cell_idx={cell_idx}, overwriting");
self.has_record.set(CursorHasRecord::Yes { rowid: self.get_index_rowid_from_record() });
self.overwrite_cell(page.clone(), cell_idx, record)?;
self.state
.mut_write_info()
.expect("expected write info")
.state = WriteState::Finish;
continue;
}
}
other => panic!("unexpected cell type, expected TableLeaf or IndexLeaf, found: {:?}", other),
}
}
// insert cell
let mut cell_payload: Vec<u8> = Vec::with_capacity(record.len() + 4);
@ -3936,13 +3974,15 @@ impl BTreeCursor {
}
/// Find the index of the cell in the page that contains the given rowid.
fn find_cell(&self, page: &PageContent, key: &BTreeKey) -> usize {
let mut cell_idx = 0;
fn find_cell(&mut self, page: &PageContent, key: &BTreeKey) -> Result<CursorResult<usize>> {
if self.find_cell_state.0.is_none() {
self.find_cell_state.set(0);
}
let cell_count = page.cell_count();
while cell_idx < cell_count {
while self.find_cell_state.get_cell_idx() < cell_count {
match page
.cell_get(
cell_idx,
self.find_cell_state.get_cell_idx(),
payload_overflow_threshold_max(page.page_type(), self.usable_space() as u16),
payload_overflow_threshold_min(page.page_type(), self.usable_space() as u16),
self.usable_space(),
@ -3959,15 +3999,24 @@ impl BTreeCursor {
break;
}
}
BTreeCell::IndexInteriorCell(IndexInteriorCell { payload, .. })
| BTreeCell::IndexLeafCell(IndexLeafCell { payload, .. }) => {
BTreeCell::IndexInteriorCell(IndexInteriorCell {
payload,
first_overflow_page,
payload_size,
..
})
| BTreeCell::IndexLeafCell(IndexLeafCell {
payload,
first_overflow_page,
payload_size,
}) => {
// TODO: implement efficient comparison of records
// e.g. https://github.com/sqlite/sqlite/blob/master/src/vdbeaux.c#L4719
read_record(
return_if_io!(self.read_record_w_possible_overflow(
payload,
self.get_immutable_record_or_create().as_mut().unwrap(),
)
.expect("failed to read record");
first_overflow_page,
payload_size,
));
let order = compare_immutable(
key.to_index_key_values(),
self.get_immutable_record().as_ref().unwrap().get_values(),
@ -3982,10 +4031,13 @@ impl BTreeCursor {
}
}
}
cell_idx += 1;
let cell_idx = self.find_cell_state.get_cell_idx();
self.find_cell_state.set(cell_idx + 1);
}
let cell_idx = self.find_cell_state.get_cell_idx();
assert!(cell_idx <= cell_count);
cell_idx
self.find_cell_state.reset();
Ok(CursorResult::Ok(cell_idx))
}
pub fn seek_end(&mut self) -> Result<CursorResult<()>> {
@ -4106,6 +4158,7 @@ impl BTreeCursor {
self.reusable_immutable_record.borrow()
}
#[instrument(skip_all, level = Level::TRACE)]
pub fn insert(
&mut self,
key: &BTreeKey,
@ -4509,7 +4562,8 @@ impl BTreeCursor {
Value::Integer(i) => *i,
_ => unreachable!("btree tables are indexed by integers!"),
};
let cell_idx = self.find_cell(contents, &BTreeKey::new_table_rowid(int_key, None));
let cell_idx =
return_if_io!(self.find_cell(contents, &BTreeKey::new_table_rowid(int_key, None)));
if cell_idx >= contents.cell_count() {
Ok(CursorResult::Ok(false))
} else {

View file

@ -17,10 +17,14 @@ def stub_memory_test(
vals: int = 100,
blobs: bool = True,
):
raise
# zero_blob_size = 1024 **2
zero_blob = "0" * blob_size * 2
# vals = 100
big_stmt = ["CREATE TABLE temp (t1 BLOB, t2 INTEGER);", "CREATE INDEX temp_index ON temp(t1);"]
big_stmt = [
"CREATE TABLE temp (t1 BLOB, t2 INTEGER);",
"CREATE INDEX temp_index ON temp(t1);",
]
big_stmt = big_stmt + [
f"INSERT INTO temp (t1) VALUES (zeroblob({blob_size}));"
if i % 2 == 0 and blobs