mirror of
https://github.com/tursodatabase/limbo.git
synced 2025-12-23 08:21:09 +00:00
Merge 'fix/mvcc: seek() must seek from both mv store and btree' from Jussi Saurio
Closes #4093. For example, upon opening an existing database, all the rows are in the btree, so if we seek only in the MV store, we won't find anything. Ergo, we must seek in both the MV store and the btree. If we are iterating forwards, the smaller of the two results is where we land; if we are iterating backwards, the larger of the two is. --- This PR was almost entirely written using Claude Code + Opus 4.5, and here is Claude's gangsta rap about the changes: > Can you write a gangsta rap that summarizes everything you changed in this session? I.e. what was fixed and how Here's the audio version generated using Suno: https://suno.com/song/2209deaf-4c59-43c5-b0e9-d807b9805dc0 ⏺ "Dual Seek Chronicles" (verse 1) Yo, the bug was real, seek was half-baked, Only checked MVCC, btree forsaked, User said "look, when both stores got data, Seek gotta check both—fix it now, not later." (pre-chorus) See the cursor got two sources of truth, Btree on disk, MVCC in the booth, After checkpoint, data splits in two, Seek was blind to half—that just won't do. (chorus) Now we seek in MVCC, seek in btree, Pick the winner based on which direction we be, Forwards? Smallest key wins the race, Backwards? Biggest key takes first place. Dual iteration, that's the game we play, seek_btree_and_set_peek saves the day! (verse 2) But wait—SeekResult got three states to check, Found means we good, NotFound means we wreck, TryAdvance though? Cursor's positioned but stuck, Gotta call next() or prev() for luck. Added helper methods, kept the code tight, advance_btree_forward_from_current done right, But then we noticed duplication in the mix, Refactored with a bool—that's the slickest fix. (bridge) _advance_btree_forward(initialize: bool), One function to rule them, that's the protocol, true means rewind if uninitialized state, false means we seeked—just iterate. 
(verse 3) Dropped six tests in test_transactions.rs, Checkpoint, insert, seek—put it to the test, Interleaved rows, odd in btree, even in MV, Updates and deletes, shadow what you see. Range operations: GT, LT, GE, LE, All forty-three tests pass, we're bug-free, From Uninitialized to Row or Exhausted state, Turso's dual cursor finally running straight. (outro) That's the story of the dual-seek fix, MVCC and btree no longer in conflicts, Ship it to main, let the users feast, Cursor seeking both stores—east to west, west to east. 🎤✨ EDIT: After the merge of #4095, I refactored this PR to use state machines instead of blocking IO as well. The initial implementation was all Claude Code + Opus 4.5 there too, but the state machine code was FAR too complex, so I manually cleaned up and simplified it. Closes #4094
This commit is contained in:
commit
6da0895813
4 changed files with 707 additions and 114 deletions
|
|
@ -69,12 +69,31 @@ enum PrevState {
|
|||
Advance,
|
||||
}
|
||||
|
||||
/// Sub-state of the btree half of a seek, driven by `seek_btree_and_set_peek`.
///
/// Transitions performed there:
/// - `SeekBtree` -> `CheckRow` when the btree seek reports `Found`
/// - `SeekBtree` -> `AdvanceBTree` when the btree seek reports `TryAdvance`
/// - `AdvanceBTree` -> `CheckRow` once the advance has completed
/// - `CheckRow` -> `AdvanceBTree` when the current row is shadowed by MVCC
#[derive(Debug, Clone)]
enum SeekBtreeState {
    /// Seeking in btree (MVCC seek already done)
    SeekBtree,
    /// Advance to next key in btree (if we got [SeekResult::TryAdvance], or the current row is shadowed by MVCC)
    AdvanceBTree,
    /// Check if current row is visible (not shadowed by MVCC)
    CheckRow,
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum SeekState {
|
||||
/// Seeking in btree (MVCC seek already done)
|
||||
SeekBtree(SeekBtreeState),
|
||||
/// Pick winner and finalize
|
||||
PickWinner,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum MvccLazyCursorState {
|
||||
Next(NextState),
|
||||
Prev(PrevState),
|
||||
Rewind(RewindState),
|
||||
Exists(ExistsState),
|
||||
Seek(SeekState, IterationDirection),
|
||||
}
|
||||
|
||||
/// We read rows from MVCC index or BTree in a dual-cursor approach.
|
||||
|
|
@ -404,6 +423,15 @@ impl<Clock: LogicalClock + 'static> MvccLazyCursor<Clock> {
|
|||
|
||||
/// Advance btree cursor forward and set btree peek to the first valid row key (skipping rows shadowed by MVCC)
|
||||
fn advance_btree_forward(&mut self) -> Result<IOResult<()>> {
|
||||
self._advance_btree_forward(true)
|
||||
}
|
||||
|
||||
/// Advance btree cursor forward from current position (cursor already positioned by seek)
|
||||
fn advance_btree_forward_from_current(&mut self) -> Result<IOResult<()>> {
|
||||
self._advance_btree_forward(false)
|
||||
}
|
||||
|
||||
fn _advance_btree_forward(&mut self, initialize: bool) -> Result<IOResult<()>> {
|
||||
loop {
|
||||
let mut state = self.btree_advance_state.borrow_mut();
|
||||
match state.as_mut() {
|
||||
|
|
@ -414,9 +442,8 @@ impl<Clock: LogicalClock + 'static> MvccLazyCursor<Clock> {
|
|||
*state = None;
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
let peek = self.dual_peek.borrow();
|
||||
// If the btree is uninitialized, do the equivalent of rewind() to find the first valid row
|
||||
if peek.btree_uninitialized() {
|
||||
// If the btree is uninitialized AND we should initialize, do the equivalent of rewind() to find the first valid row
|
||||
if initialize && self.dual_peek.borrow().btree_uninitialized() {
|
||||
return_if_io!(self.btree_cursor.rewind());
|
||||
*state = Some(AdvanceBtreeState::RewindCheckBtreeKey);
|
||||
} else {
|
||||
|
|
@ -477,6 +504,15 @@ impl<Clock: LogicalClock + 'static> MvccLazyCursor<Clock> {
|
|||
|
||||
/// Advance btree cursor backward and set btree peek to the first valid row key (skipping rows shadowed by MVCC)
|
||||
fn advance_btree_backward(&mut self) -> Result<IOResult<()>> {
|
||||
self._advance_btree_backward(true)
|
||||
}
|
||||
|
||||
/// Advance btree cursor backward from current position (cursor already positioned by seek)
|
||||
fn advance_btree_backward_from_current(&mut self) -> Result<IOResult<()>> {
|
||||
self._advance_btree_backward(false)
|
||||
}
|
||||
|
||||
fn _advance_btree_backward(&mut self, initialize: bool) -> Result<IOResult<()>> {
|
||||
loop {
|
||||
let mut state = self.btree_advance_state.borrow_mut();
|
||||
match state.as_mut() {
|
||||
|
|
@ -488,8 +524,9 @@ impl<Clock: LogicalClock + 'static> MvccLazyCursor<Clock> {
|
|||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
let peek = self.dual_peek.borrow();
|
||||
// If the btree is uninitialized, do the equivalent of last() to find the last valid row
|
||||
if peek.btree_uninitialized() {
|
||||
// If the btree is uninitialized AND we should initialize, do the equivalent of last() to find the last valid row
|
||||
if initialize && peek.btree_uninitialized() {
|
||||
drop(peek);
|
||||
return_if_io!(self.btree_cursor.last());
|
||||
*state = Some(AdvanceBtreeState::RewindCheckBtreeKey);
|
||||
} else {
|
||||
|
|
@ -602,6 +639,96 @@ impl<Clock: LogicalClock + 'static> MvccLazyCursor<Clock> {
|
|||
});
|
||||
}
|
||||
|
||||
/// Seek btree cursor and set btree_peek to the result.
|
||||
/// Skips rows that are shadowed by MVCC.
|
||||
/// Returns IOResult indicating if we need to yield for IO or are done.
|
||||
fn seek_btree_and_set_peek(
|
||||
&mut self,
|
||||
seek_key: SeekKey<'_>,
|
||||
op: SeekOp,
|
||||
) -> Result<IOResult<()>> {
|
||||
// Fast path: btree not allocated
|
||||
if !self.is_btree_allocated() {
|
||||
let mut peek = self.dual_peek.borrow_mut();
|
||||
peek.btree_peek = CursorPeek::Exhausted;
|
||||
self.state.replace(None);
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
|
||||
loop {
|
||||
let Some(MvccLazyCursorState::Seek(SeekState::SeekBtree(btree_seek_state), direction)) =
|
||||
self.state.borrow().clone()
|
||||
else {
|
||||
panic!(
|
||||
"Invalid btree seek state in seek_btree_and_set_peek: {:?}",
|
||||
self.state.borrow()
|
||||
);
|
||||
};
|
||||
match btree_seek_state {
|
||||
SeekBtreeState::SeekBtree => {
|
||||
let seek_result = return_if_io!(self.btree_cursor.seek(seek_key.clone(), op));
|
||||
|
||||
match seek_result {
|
||||
SeekResult::NotFound => {
|
||||
let mut peek = self.dual_peek.borrow_mut();
|
||||
peek.btree_peek = CursorPeek::Exhausted;
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
SeekResult::TryAdvance => {
|
||||
// Need to advance to find actual matching entry
|
||||
self.state.replace(Some(MvccLazyCursorState::Seek(
|
||||
SeekState::SeekBtree(SeekBtreeState::AdvanceBTree),
|
||||
direction,
|
||||
)));
|
||||
}
|
||||
SeekResult::Found => {
|
||||
self.state.replace(Some(MvccLazyCursorState::Seek(
|
||||
SeekState::SeekBtree(SeekBtreeState::CheckRow),
|
||||
direction,
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
SeekBtreeState::AdvanceBTree => {
|
||||
return_if_io!(match direction {
|
||||
IterationDirection::Forwards => {
|
||||
self.advance_btree_forward_from_current()
|
||||
}
|
||||
IterationDirection::Backwards => {
|
||||
self.advance_btree_backward_from_current()
|
||||
}
|
||||
});
|
||||
self.state.replace(Some(MvccLazyCursorState::Seek(
|
||||
SeekState::SeekBtree(SeekBtreeState::CheckRow),
|
||||
direction,
|
||||
)));
|
||||
}
|
||||
SeekBtreeState::CheckRow => {
|
||||
let key = self.get_btree_current_key()?;
|
||||
match key {
|
||||
Some(k) if self.query_btree_version_is_valid(&k) => {
|
||||
let mut peek = self.dual_peek.borrow_mut();
|
||||
peek.btree_peek = CursorPeek::Row(k);
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
Some(_) => {
|
||||
// shadowed by MVCC, continue to next
|
||||
self.state.replace(Some(MvccLazyCursorState::Seek(
|
||||
SeekState::SeekBtree(SeekBtreeState::AdvanceBTree),
|
||||
direction,
|
||||
)));
|
||||
}
|
||||
None => {
|
||||
let mut peek = self.dual_peek.borrow_mut();
|
||||
peek.btree_peek = CursorPeek::Exhausted;
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialize MVCC iterator for forward iteration (used when next() is called without rewind())
|
||||
fn init_mvcc_iterator_forward(&mut self) {
|
||||
if self.table_iterator.is_some() || self.index_iterator.is_some() {
|
||||
|
|
@ -932,113 +1059,157 @@ impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
|
|||
// ge -> lower_bound bound included, we want first row equal to row_id or first row after row_id
|
||||
// lt -> upper_bound bound excluded, we want last row before row_id
|
||||
// le -> upper_bound bound included, we want last row equal to row_id or first row before row_id
|
||||
match seek_key {
|
||||
SeekKey::TableRowId(row_id) => {
|
||||
let rowid = RowID {
|
||||
table_id: self.table_id,
|
||||
row_id: RowKey::Int(row_id),
|
||||
};
|
||||
let inclusive = match op {
|
||||
SeekOp::GT => false,
|
||||
SeekOp::GE { eq_only: _ } => true,
|
||||
SeekOp::LT => false,
|
||||
SeekOp::LE { eq_only: _ } => true,
|
||||
};
|
||||
self.invalidate_record();
|
||||
let found_rowid = self.db.seek_rowid(
|
||||
rowid.clone(),
|
||||
inclusive,
|
||||
op.iteration_direction(),
|
||||
self.tx_id,
|
||||
&mut self.table_iterator,
|
||||
);
|
||||
if let Some(found_rowid) = found_rowid {
|
||||
self.current_pos.replace(CursorPosition::Loaded {
|
||||
row_id: found_rowid.clone(),
|
||||
in_btree: false,
|
||||
});
|
||||
if op.eq_only() {
|
||||
if found_rowid.row_id == rowid.row_id {
|
||||
Ok(IOResult::Done(SeekResult::Found))
|
||||
} else {
|
||||
Ok(IOResult::Done(SeekResult::NotFound))
|
||||
|
||||
loop {
|
||||
let state = self.state.borrow().clone();
|
||||
match state {
|
||||
None => {
|
||||
// Initial state: Reset and do MVCC seek
|
||||
let _ = self.table_iterator.take();
|
||||
let _ = self.index_iterator.take();
|
||||
self.reset_dual_peek();
|
||||
self.invalidate_record();
|
||||
|
||||
let direction = op.iteration_direction();
|
||||
let inclusive = matches!(op, SeekOp::GE { .. } | SeekOp::LE { .. });
|
||||
|
||||
match &seek_key {
|
||||
SeekKey::TableRowId(row_id) => {
|
||||
let rowid = RowID {
|
||||
table_id: self.table_id,
|
||||
row_id: RowKey::Int(*row_id),
|
||||
};
|
||||
|
||||
// Seek in MVCC (synchronous)
|
||||
let mvcc_rowid = self.db.seek_rowid(
|
||||
rowid.clone(),
|
||||
inclusive,
|
||||
direction,
|
||||
self.tx_id,
|
||||
&mut self.table_iterator,
|
||||
);
|
||||
|
||||
// Set MVCC peek
|
||||
{
|
||||
let mut peek = self.dual_peek.borrow_mut();
|
||||
peek.mvcc_peek = match &mvcc_rowid {
|
||||
Some(rid) => CursorPeek::Row(rid.row_id.clone()),
|
||||
None => CursorPeek::Exhausted,
|
||||
};
|
||||
}
|
||||
}
|
||||
SeekKey::IndexKey(index_key) => {
|
||||
let index_info = {
|
||||
let MvccCursorType::Index(index_info) = &self.mv_cursor_type else {
|
||||
panic!("SeekKey::IndexKey requires Index cursor type");
|
||||
};
|
||||
Arc::new(IndexInfo {
|
||||
key_info: index_info.key_info.clone(),
|
||||
has_rowid: index_info.has_rowid,
|
||||
num_cols: index_key.column_count(),
|
||||
})
|
||||
};
|
||||
let sortable_key =
|
||||
SortableIndexKey::new_from_record((*index_key).clone(), index_info);
|
||||
|
||||
// Seek in MVCC (synchronous)
|
||||
let mvcc_rowid = self.db.seek_index(
|
||||
self.table_id,
|
||||
sortable_key.clone(),
|
||||
inclusive,
|
||||
direction,
|
||||
self.tx_id,
|
||||
&mut self.index_iterator,
|
||||
);
|
||||
|
||||
// Set MVCC peek
|
||||
{
|
||||
let mut peek = self.dual_peek.borrow_mut();
|
||||
peek.mvcc_peek = match &mvcc_rowid {
|
||||
Some(rid) => CursorPeek::Row(rid.row_id.clone()),
|
||||
None => CursorPeek::Exhausted,
|
||||
};
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Ok(IOResult::Done(SeekResult::Found))
|
||||
}
|
||||
} else {
|
||||
let forwards = matches!(op, SeekOp::GE { eq_only: _ } | SeekOp::GT);
|
||||
if forwards {
|
||||
let _ = self.last()?;
|
||||
} else {
|
||||
let _ = self.rewind()?;
|
||||
}
|
||||
Ok(IOResult::Done(SeekResult::NotFound))
|
||||
|
||||
// Move to btree seek state
|
||||
self.state.replace(Some(MvccLazyCursorState::Seek(
|
||||
SeekState::SeekBtree(SeekBtreeState::SeekBtree),
|
||||
direction,
|
||||
)));
|
||||
}
|
||||
}
|
||||
SeekKey::IndexKey(index_key) => {
|
||||
// TODO: we should seek in both btree and mvcc
|
||||
let index_info = {
|
||||
let MvccCursorType::Index(index_info) = &self.mv_cursor_type else {
|
||||
panic!("SeekKey::IndexKey requires Index cursor type");
|
||||
};
|
||||
Arc::new(IndexInfo {
|
||||
key_info: index_info.key_info.clone(),
|
||||
has_rowid: index_info.has_rowid,
|
||||
num_cols: index_key.column_count(),
|
||||
})
|
||||
};
|
||||
let key_info = index_info
|
||||
.key_info
|
||||
.iter()
|
||||
.take(index_key.column_count())
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
let sortable_key = SortableIndexKey::new_from_record(index_key.clone(), index_info);
|
||||
let inclusive = match op {
|
||||
SeekOp::GT => false,
|
||||
SeekOp::GE { eq_only: _ } => true,
|
||||
SeekOp::LT => false,
|
||||
SeekOp::LE { eq_only: _ } => true,
|
||||
};
|
||||
let found_rowid = self.db.seek_index(
|
||||
self.table_id,
|
||||
sortable_key.clone(),
|
||||
inclusive,
|
||||
op.iteration_direction(),
|
||||
self.tx_id,
|
||||
&mut self.index_iterator,
|
||||
);
|
||||
if let Some(found_rowid) = found_rowid {
|
||||
self.current_pos.replace(CursorPosition::Loaded {
|
||||
row_id: found_rowid.clone(),
|
||||
in_btree: false,
|
||||
});
|
||||
if op.eq_only() {
|
||||
let RowKey::Record(found_rowid_key) = found_rowid.row_id else {
|
||||
panic!("Found rowid is not a record");
|
||||
};
|
||||
let cmp = compare_immutable(
|
||||
index_key.get_values(),
|
||||
found_rowid_key.key.get_values(),
|
||||
&key_info,
|
||||
);
|
||||
if cmp.is_eq() {
|
||||
Ok(IOResult::Done(SeekResult::Found))
|
||||
Some(MvccLazyCursorState::Seek(SeekState::SeekBtree(_), direction)) => {
|
||||
return_if_io!(self.seek_btree_and_set_peek(seek_key.clone(), op));
|
||||
self.state.replace(Some(MvccLazyCursorState::Seek(
|
||||
SeekState::PickWinner,
|
||||
direction,
|
||||
)));
|
||||
}
|
||||
Some(MvccLazyCursorState::Seek(SeekState::PickWinner, direction)) => {
|
||||
// Pick winner and return result
|
||||
// Now pick the winner based on direction
|
||||
let winner = self.dual_peek.borrow().get_next(direction);
|
||||
|
||||
// Clear seek state
|
||||
self.state.replace(None);
|
||||
|
||||
if let Some((winner_key, in_btree)) = winner {
|
||||
self.current_pos.replace(CursorPosition::Loaded {
|
||||
row_id: RowID {
|
||||
table_id: self.table_id,
|
||||
row_id: winner_key.clone(),
|
||||
},
|
||||
in_btree,
|
||||
});
|
||||
|
||||
if op.eq_only() {
|
||||
// Check if the winner matches the seek key
|
||||
let found = match &seek_key {
|
||||
SeekKey::TableRowId(row_id) => winner_key == RowKey::Int(*row_id),
|
||||
SeekKey::IndexKey(index_key) => {
|
||||
let RowKey::Record(found_key) = &winner_key else {
|
||||
panic!("Found rowid is not a record");
|
||||
};
|
||||
let MvccCursorType::Index(index_info) = &self.mv_cursor_type
|
||||
else {
|
||||
panic!("Index cursor expected");
|
||||
};
|
||||
let key_info: Vec<_> = index_info
|
||||
.key_info
|
||||
.iter()
|
||||
.take(index_key.column_count())
|
||||
.cloned()
|
||||
.collect();
|
||||
let cmp = compare_immutable(
|
||||
index_key.get_values(),
|
||||
found_key.key.get_values(),
|
||||
&key_info,
|
||||
);
|
||||
cmp.is_eq()
|
||||
}
|
||||
};
|
||||
if found {
|
||||
return Ok(IOResult::Done(SeekResult::Found));
|
||||
} else {
|
||||
return Ok(IOResult::Done(SeekResult::NotFound));
|
||||
}
|
||||
} else {
|
||||
Ok(IOResult::Done(SeekResult::NotFound))
|
||||
return Ok(IOResult::Done(SeekResult::Found));
|
||||
}
|
||||
} else {
|
||||
Ok(IOResult::Done(SeekResult::Found))
|
||||
// Nothing found in either cursor
|
||||
let forwards = matches!(op, SeekOp::GE { .. } | SeekOp::GT);
|
||||
if forwards {
|
||||
self.current_pos.replace(CursorPosition::End);
|
||||
} else {
|
||||
self.current_pos.replace(CursorPosition::BeforeFirst);
|
||||
}
|
||||
return Ok(IOResult::Done(SeekResult::NotFound));
|
||||
}
|
||||
} else {
|
||||
let forwards = matches!(op, SeekOp::GE { eq_only: _ } | SeekOp::GT);
|
||||
if forwards {
|
||||
let _ = self.last()?;
|
||||
} else {
|
||||
let _ = self.rewind()?;
|
||||
}
|
||||
Ok(IOResult::Done(SeekResult::NotFound))
|
||||
}
|
||||
_ => {
|
||||
panic!("Invalid state in seek: {:?}", self.state.borrow());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2632,7 +2632,7 @@ macro_rules! return_and_restore_if_io {
|
|||
};
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
#[derive(Debug, PartialEq, Clone, Copy)]
|
||||
pub enum SeekResult {
|
||||
/// Record matching the [SeekOp] found in the B-tree and cursor was positioned to point onto that record
|
||||
Found,
|
||||
|
|
|
|||
|
|
@ -51,10 +51,13 @@ mod fuzz_tests {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: mvcc fuzz failure
|
||||
// INTEGER PRIMARY KEY is a rowid alias, so an index is not created
|
||||
#[turso_macros::test(init_sql = "CREATE TABLE t (x INTEGER PRIMARY KEY autoincrement)")]
|
||||
#[turso_macros::test(
|
||||
mvcc,
|
||||
init_sql = "CREATE TABLE t (x INTEGER PRIMARY KEY autoincrement)"
|
||||
)]
|
||||
pub fn rowid_seek_fuzz(db: TempDatabase) {
|
||||
let _ = tracing_subscriber::fmt::try_init();
|
||||
let sqlite_conn = rusqlite::Connection::open(db.path.clone()).unwrap();
|
||||
|
||||
let (mut rng, _seed) = rng_from_time_or_env();
|
||||
|
|
@ -91,7 +94,7 @@ mod fuzz_tests {
|
|||
tracing::info!("rowid_seek_fuzz seed: {}", seed);
|
||||
|
||||
for iteration in 0..2 {
|
||||
tracing::trace!("rowid_seek_fuzz iteration: {}", iteration);
|
||||
tracing::info!("rowid_seek_fuzz iteration: {}", iteration);
|
||||
|
||||
for comp in COMPARISONS.iter() {
|
||||
for order_by in ORDER_BY.iter() {
|
||||
|
|
@ -105,7 +108,7 @@ mod fuzz_tests {
|
|||
order_by.unwrap_or("")
|
||||
);
|
||||
|
||||
log::trace!("query: {query}");
|
||||
tracing::info!("query: {query}");
|
||||
let limbo_result = limbo_exec_rows(&db, &limbo_conn, &query);
|
||||
let sqlite_result = sqlite_exec_rows(&sqlite_conn, &query);
|
||||
assert_eq!(
|
||||
|
|
@ -166,8 +169,7 @@ mod fuzz_tests {
|
|||
values
|
||||
}
|
||||
|
||||
// TODO: mvcc indexes
|
||||
#[turso_macros::test(init_sql = "CREATE TABLE t (x PRIMARY KEY)")]
|
||||
#[turso_macros::test(mvcc, init_sql = "CREATE TABLE t (x PRIMARY KEY)")]
|
||||
pub fn index_scan_fuzz(db: TempDatabase) {
|
||||
let sqlite_conn = rusqlite::Connection::open(db.path.clone()).unwrap();
|
||||
|
||||
|
|
@ -212,8 +214,7 @@ mod fuzz_tests {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: mvcc indexes
|
||||
#[turso_macros::test()]
|
||||
#[turso_macros::test(mvcc)]
|
||||
/// A test for verifying that index seek+scan works correctly for compound keys
|
||||
/// on indexes with various column orderings.
|
||||
pub fn index_scan_compound_key_fuzz(db: TempDatabase) {
|
||||
|
|
|
|||
|
|
@ -1220,3 +1220,424 @@ fn test_mvcc_checkpoint_before_insert_delete_after_checkpoint() {
|
|||
|
||||
verify_table_contents(&conn, vec![1, 4]);
|
||||
}
|
||||
|
||||
// Tests for dual-iteration seek: verifying that seek works correctly
|
||||
// when there are rows in both btree (after checkpoint) and MVCC store.
|
||||
|
||||
/// Test table seek (WHERE rowid = x) with rows in both btree and MVCC.
/// After checkpoint, rows 1-5 are in btree. Then we insert 6-10 into MVCC.
/// Seeking for various rowids should find them in the correct location.
#[test]
fn test_mvcc_dual_seek_table_rowid_basic() {
    let tmp_db = TempDatabase::new_with_opts(
        "test_mvcc_dual_seek_table_rowid_basic.db",
        turso_core::DatabaseOpts::new().with_mvcc(true),
    );
    let conn = tmp_db.connect_limbo();

    // Create table and insert initial rows
    execute_and_log(&conn, "CREATE TABLE t (x INTEGER PRIMARY KEY, v TEXT)").unwrap();
    for i in 1..=5 {
        execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'btree_{i}')")).unwrap();
    }

    // Checkpoint to move rows to btree
    execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap();

    let path = tmp_db.path.clone();
    drop(conn);
    drop(tmp_db);

    // Reopen to ensure btree is populated and MV store is empty
    let tmp_db = TempDatabase::new_with_existent_with_opts(
        &path,
        turso_core::DatabaseOpts::new().with_mvcc(true),
    );
    let conn = tmp_db.connect_limbo();

    // Insert more rows into MVCC
    for i in 6..=10 {
        execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'mvcc_{i}')")).unwrap();
    }

    // Seek for a row in btree
    let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 3")
        .unwrap()
        .unwrap();
    let row = helper_read_single_row(stmt);
    assert_eq!(row, vec![Value::build_text("btree_3")]);

    // Seek for a row in MVCC
    let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 8")
        .unwrap()
        .unwrap();
    let row = helper_read_single_row(stmt);
    assert_eq!(row, vec![Value::build_text("mvcc_8")]);

    // Seek for first row (btree)
    let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 1")
        .unwrap()
        .unwrap();
    let row = helper_read_single_row(stmt);
    assert_eq!(row, vec![Value::build_text("btree_1")]);

    // Seek for last row (MVCC)
    let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 10")
        .unwrap()
        .unwrap();
    let row = helper_read_single_row(stmt);
    assert_eq!(row, vec![Value::build_text("mvcc_10")]);

    // Seek for non-existent row
    let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 100")
        .unwrap()
        .unwrap();
    let rows = helper_read_all_rows(stmt);
    assert!(rows.is_empty());
}
|
||||
|
||||
/// Test seek with interleaved rows in btree and MVCC.
/// Btree has odd numbers (1,3,5,7,9), MVCC has even numbers (2,4,6,8,10).
#[test]
fn test_mvcc_dual_seek_interleaved_rows() {
    let tmp_db = TempDatabase::new_with_opts(
        "test_mvcc_dual_seek_interleaved_rows.db",
        turso_core::DatabaseOpts::new().with_mvcc(true),
    );
    let conn = tmp_db.connect_limbo();

    // Create table and insert odd rows
    execute_and_log(&conn, "CREATE TABLE t (x INTEGER PRIMARY KEY, v TEXT)").unwrap();
    for i in [1, 3, 5, 7, 9] {
        execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'btree_{i}')")).unwrap();
    }

    // Checkpoint to move rows to btree
    execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap();

    let path = tmp_db.path.clone();
    drop(conn);
    drop(tmp_db);

    // Reopen
    let tmp_db = TempDatabase::new_with_existent_with_opts(
        &path,
        turso_core::DatabaseOpts::new().with_mvcc(true),
    );
    let conn = tmp_db.connect_limbo();

    // Insert even rows into MVCC
    for i in [2, 4, 6, 8, 10] {
        execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'mvcc_{i}')")).unwrap();
    }

    // Full table scan should return all rows in order
    let stmt = query_and_log(&conn, "SELECT x, v FROM t ORDER BY x")
        .unwrap()
        .unwrap();
    let rows = helper_read_all_rows(stmt);
    assert_eq!(rows.len(), 10);
    for (i, row) in rows.iter().enumerate() {
        let expected_x = (i + 1) as i64;
        assert_eq!(row[0], Value::Integer(expected_x));
        // Odd keys were inserted before the checkpoint, even keys after the reopen.
        let expected_source = if expected_x % 2 == 1 { "btree" } else { "mvcc" };
        assert_eq!(
            row[1],
            Value::build_text(format!("{expected_source}_{expected_x}"))
        );
    }

    // Seek for btree row
    let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 5")
        .unwrap()
        .unwrap();
    let row = helper_read_single_row(stmt);
    assert_eq!(row, vec![Value::build_text("btree_5")]);

    // Seek for MVCC row
    let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 6")
        .unwrap()
        .unwrap();
    let row = helper_read_single_row(stmt);
    assert_eq!(row, vec![Value::build_text("mvcc_6")]);
}
|
||||
|
||||
/// Test index seek with rows in both btree and MVCC.
/// Rows 1-5 are inserted before the checkpoint and rows 6-10 after the reopen;
/// lookups go through the `idx_x` index on `t(x)`.
#[test]
fn test_mvcc_dual_seek_index_basic() {
    let tmp_db = TempDatabase::new_with_opts(
        "test_mvcc_dual_seek_index_basic.db",
        turso_core::DatabaseOpts::new()
            .with_mvcc(true)
            .with_indexes(true),
    );
    let conn = tmp_db.connect_limbo();

    // Create table with index
    execute_and_log(&conn, "CREATE TABLE t (x INTEGER, v TEXT)").unwrap();
    execute_and_log(&conn, "CREATE INDEX idx_x ON t(x)").unwrap();

    // Insert initial rows
    for i in 1..=5 {
        execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'btree_{i}')")).unwrap();
    }

    // Checkpoint
    execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap();

    let path = tmp_db.path.clone();
    drop(conn);
    drop(tmp_db);

    // Reopen
    let tmp_db = TempDatabase::new_with_existent_with_opts(
        &path,
        turso_core::DatabaseOpts::new()
            .with_mvcc(true)
            .with_indexes(true),
    );
    let conn = tmp_db.connect_limbo();

    // Insert more rows into MVCC
    for i in 6..=10 {
        execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'mvcc_{i}')")).unwrap();
    }

    // Index seek for btree row
    let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 3")
        .unwrap()
        .unwrap();
    let row = helper_read_single_row(stmt);
    assert_eq!(row, vec![Value::build_text("btree_3")]);

    // Index seek for MVCC row
    let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 8")
        .unwrap()
        .unwrap();
    let row = helper_read_single_row(stmt);
    assert_eq!(row, vec![Value::build_text("mvcc_8")]);

    // Range scan should return all matching rows in order
    let stmt = query_and_log(&conn, "SELECT x FROM t WHERE x >= 4 AND x <= 7 ORDER BY x")
        .unwrap()
        .unwrap();
    let rows = helper_read_all_rows(stmt);
    assert_eq!(
        rows,
        vec![
            vec![Value::Integer(4)],
            vec![Value::Integer(5)],
            vec![Value::Integer(6)],
            vec![Value::Integer(7)],
        ]
    );
}
|
||||
|
||||
/// Test seek with updates: row exists in btree but is updated in MVCC.
/// The seek should find the MVCC version (which shadows btree).
#[test]
fn test_mvcc_dual_seek_with_update() {
    let tmp_db = TempDatabase::new_with_opts(
        "test_mvcc_dual_seek_with_update.db",
        turso_core::DatabaseOpts::new().with_mvcc(true),
    );
    let conn = tmp_db.connect_limbo();

    // Create table and insert rows
    execute_and_log(&conn, "CREATE TABLE t (x INTEGER PRIMARY KEY, v TEXT)").unwrap();
    for i in 1..=5 {
        execute_and_log(
            &conn,
            &format!("INSERT INTO t VALUES ({i}, 'original_{i}')"),
        )
        .unwrap();
    }

    // Checkpoint
    execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap();

    let path = tmp_db.path.clone();
    drop(conn);
    drop(tmp_db);

    // Reopen
    let tmp_db = TempDatabase::new_with_existent_with_opts(
        &path,
        turso_core::DatabaseOpts::new().with_mvcc(true),
    );
    let conn = tmp_db.connect_limbo();

    // Update row 3 (creates MVCC version that shadows btree)
    execute_and_log(&conn, "UPDATE t SET v = 'updated_3' WHERE x = 3").unwrap();

    // Seek for updated row should return MVCC version
    let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 3")
        .unwrap()
        .unwrap();
    let row = helper_read_single_row(stmt);
    assert_eq!(row, vec![Value::build_text("updated_3")]);

    // Seek for non-updated row should still work
    let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 2")
        .unwrap()
        .unwrap();
    let row = helper_read_single_row(stmt);
    assert_eq!(row, vec![Value::build_text("original_2")]);

    // Full scan should show correct values
    let stmt = query_and_log(&conn, "SELECT x, v FROM t ORDER BY x")
        .unwrap()
        .unwrap();
    let rows = helper_read_all_rows(stmt);
    assert_eq!(
        rows,
        vec![
            vec![Value::Integer(1), Value::build_text("original_1")],
            vec![Value::Integer(2), Value::build_text("original_2")],
            vec![Value::Integer(3), Value::build_text("updated_3")],
            vec![Value::Integer(4), Value::build_text("original_4")],
            vec![Value::Integer(5), Value::build_text("original_5")],
        ]
    );
}
|
||||
|
||||
/// Test seek with delete: row exists in btree but is deleted in MVCC.
/// The seek should NOT find the deleted row.
#[test]
fn test_mvcc_dual_seek_with_delete() {
    let tmp_db = TempDatabase::new_with_opts(
        "test_mvcc_dual_seek_with_delete.db",
        turso_core::DatabaseOpts::new().with_mvcc(true),
    );
    let conn = tmp_db.connect_limbo();

    // Create table and insert rows
    execute_and_log(&conn, "CREATE TABLE t (x INTEGER PRIMARY KEY, v TEXT)").unwrap();
    for i in 1..=5 {
        execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'value_{i}')")).unwrap();
    }

    // Checkpoint
    execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap();

    let path = tmp_db.path.clone();
    drop(conn);
    drop(tmp_db);

    // Reopen
    let tmp_db = TempDatabase::new_with_existent_with_opts(
        &path,
        turso_core::DatabaseOpts::new().with_mvcc(true),
    );
    let conn = tmp_db.connect_limbo();

    // Delete row 3
    execute_and_log(&conn, "DELETE FROM t WHERE x = 3").unwrap();

    // Seek for deleted row should return nothing
    let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 3")
        .unwrap()
        .unwrap();
    let rows = helper_read_all_rows(stmt);
    assert!(rows.is_empty());

    // Seek for non-deleted row should still work
    let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 2")
        .unwrap()
        .unwrap();
    let row = helper_read_single_row(stmt);
    assert_eq!(row, vec![Value::build_text("value_2")]);

    // Full scan should not include deleted row
    let stmt = query_and_log(&conn, "SELECT x FROM t ORDER BY x")
        .unwrap()
        .unwrap();
    let rows = helper_read_all_rows(stmt);
    assert_eq!(
        rows,
        vec![
            vec![Value::Integer(1)],
            vec![Value::Integer(2)],
            vec![Value::Integer(4)],
            vec![Value::Integer(5)],
        ]
    );
}
|
||||
|
||||
/// Test range seek with dual iteration: `x > 2`, `x < 4`, and the bounded
/// range `x >= 3 AND x <= 5`.
/// Rows 1, 3, 5 are inserted before the checkpoint; rows 2, 4, 6 after the reopen.
#[test]
fn test_mvcc_dual_seek_range_operations() {
    let tmp_db = TempDatabase::new_with_opts(
        "test_mvcc_dual_seek_range_operations.db",
        turso_core::DatabaseOpts::new().with_mvcc(true),
    );
    let conn = tmp_db.connect_limbo();

    // Create table and insert rows
    execute_and_log(&conn, "CREATE TABLE t (x INTEGER PRIMARY KEY)").unwrap();
    for i in [1, 3, 5] {
        execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i})")).unwrap();
    }

    // Checkpoint
    execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap();

    let path = tmp_db.path.clone();
    drop(conn);
    drop(tmp_db);

    // Reopen
    let tmp_db = TempDatabase::new_with_existent_with_opts(
        &path,
        turso_core::DatabaseOpts::new().with_mvcc(true),
    );
    let conn = tmp_db.connect_limbo();

    // Insert more rows into MVCC
    for i in [2, 4, 6] {
        execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i})")).unwrap();
    }

    // Range: x > 2 (should include 3,4,5,6)
    let stmt = query_and_log(&conn, "SELECT x FROM t WHERE x > 2 ORDER BY x")
        .unwrap()
        .unwrap();
    let rows = helper_read_all_rows(stmt);
    assert_eq!(
        rows,
        vec![
            vec![Value::Integer(3)],
            vec![Value::Integer(4)],
            vec![Value::Integer(5)],
            vec![Value::Integer(6)],
        ]
    );

    // Range: x < 4 (should include 1,2,3)
    let stmt = query_and_log(&conn, "SELECT x FROM t WHERE x < 4 ORDER BY x")
        .unwrap()
        .unwrap();
    let rows = helper_read_all_rows(stmt);
    assert_eq!(
        rows,
        vec![
            vec![Value::Integer(1)],
            vec![Value::Integer(2)],
            vec![Value::Integer(3)],
        ]
    );

    // Range: x >= 3 AND x <= 5 (should include 3,4,5)
    let stmt = query_and_log(&conn, "SELECT x FROM t WHERE x >= 3 AND x <= 5 ORDER BY x")
        .unwrap()
        .unwrap();
    let rows = helper_read_all_rows(stmt);
    assert_eq!(
        rows,
        vec![
            vec![Value::Integer(3)],
            vec![Value::Integer(4)],
            vec![Value::Integer(5)],
        ]
    );
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue