From eb782ce2d4868b2ad9f19cfcd5bf55a51d823226 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Thu, 4 Dec 2025 13:00:15 +0200 Subject: [PATCH] fix/mvcc: seek() must seek from both mv store and btree For example, upon opening an existing database, all the rows are in the btree, so if we seek only in the MV store, we won't find anything. Therefore, we must seek in both the MV store and the btree. When iterating forwards, the smaller of the two results is where we land; when iterating backwards, the larger of the two is. Initially this implementation used blocking IO, but it was refactored to use state machines after the rest of the Cursor methods in the MVCC cursor module were refactored to do that too. --- This PR was initially almost entirely written using Claude Code + Opus 4.5, but was heavily cleaned up by hand because the AI made the state machine refactor far too complicated. --- core/mvcc/cursor.rs | 381 +++++++++++----- core/types.rs | 2 +- tests/fuzz/mod.rs | 17 +- .../query_processing/test_transactions.rs | 421 ++++++++++++++++++ 4 files changed, 707 insertions(+), 114 deletions(-) diff --git a/core/mvcc/cursor.rs b/core/mvcc/cursor.rs index 463dd9cc3..30b3bf756 100644 --- a/core/mvcc/cursor.rs +++ b/core/mvcc/cursor.rs @@ -69,12 +69,31 @@ enum PrevState { Advance, } +#[derive(Debug, Clone)] +enum SeekBtreeState { + /// Seeking in btree (MVCC seek already done) + SeekBtree, + /// Advance to next key in btree (if we got [SeekResult::TryAdvance], or the current row is shadowed by MVCC) + AdvanceBTree, + /// Check if current row is visible (not shadowed by MVCC) + CheckRow, +} + +#[derive(Debug, Clone)] +enum SeekState { + /// Seeking in btree (MVCC seek already done) + SeekBtree(SeekBtreeState), + /// Pick winner and finalize + PickWinner, +} + #[derive(Debug, Clone)] enum MvccLazyCursorState { Next(NextState), Prev(PrevState), Rewind(RewindState), Exists(ExistsState), + Seek(SeekState, IterationDirection), } /// We read rows from MVCC index or BTree in a dual-cursor approach. 
@@ -404,6 +423,15 @@ impl MvccLazyCursor { /// Advance btree cursor forward and set btree peek to the first valid row key (skipping rows shadowed by MVCC) fn advance_btree_forward(&mut self) -> Result> { + self._advance_btree_forward(true) + } + + /// Advance btree cursor forward from current position (cursor already positioned by seek) + fn advance_btree_forward_from_current(&mut self) -> Result> { + self._advance_btree_forward(false) + } + + fn _advance_btree_forward(&mut self, initialize: bool) -> Result> { loop { let mut state = self.btree_advance_state.borrow_mut(); match state.as_mut() { @@ -414,9 +442,8 @@ impl MvccLazyCursor { *state = None; return Ok(IOResult::Done(())); } - let peek = self.dual_peek.borrow(); - // If the btree is uninitialized, do the equivalent of rewind() to find the first valid row - if peek.btree_uninitialized() { + // If the btree is uninitialized AND we should initialize, do the equivalent of rewind() to find the first valid row + if initialize && self.dual_peek.borrow().btree_uninitialized() { return_if_io!(self.btree_cursor.rewind()); *state = Some(AdvanceBtreeState::RewindCheckBtreeKey); } else { @@ -477,6 +504,15 @@ impl MvccLazyCursor { /// Advance btree cursor backward and set btree peek to the first valid row key (skipping rows shadowed by MVCC) fn advance_btree_backward(&mut self) -> Result> { + self._advance_btree_backward(true) + } + + /// Advance btree cursor backward from current position (cursor already positioned by seek) + fn advance_btree_backward_from_current(&mut self) -> Result> { + self._advance_btree_backward(false) + } + + fn _advance_btree_backward(&mut self, initialize: bool) -> Result> { loop { let mut state = self.btree_advance_state.borrow_mut(); match state.as_mut() { @@ -488,8 +524,9 @@ impl MvccLazyCursor { return Ok(IOResult::Done(())); } let peek = self.dual_peek.borrow(); - // If the btree is uninitialized, do the equivalent of last() to find the last valid row - if peek.btree_uninitialized() { + // 
If the btree is uninitialized AND we should initialize, do the equivalent of last() to find the last valid row + if initialize && peek.btree_uninitialized() { + drop(peek); return_if_io!(self.btree_cursor.last()); *state = Some(AdvanceBtreeState::RewindCheckBtreeKey); } else { @@ -602,6 +639,96 @@ impl MvccLazyCursor { }); } + /// Seek btree cursor and set btree_peek to the result. + /// Skips rows that are shadowed by MVCC. + /// Returns IOResult indicating if we need to yield for IO or are done. + fn seek_btree_and_set_peek( + &mut self, + seek_key: SeekKey<'_>, + op: SeekOp, + ) -> Result> { + // Fast path: btree not allocated + if !self.is_btree_allocated() { + let mut peek = self.dual_peek.borrow_mut(); + peek.btree_peek = CursorPeek::Exhausted; + self.state.replace(None); + return Ok(IOResult::Done(())); + } + + loop { + let Some(MvccLazyCursorState::Seek(SeekState::SeekBtree(btree_seek_state), direction)) = + self.state.borrow().clone() + else { + panic!( + "Invalid btree seek state in seek_btree_and_set_peek: {:?}", + self.state.borrow() + ); + }; + match btree_seek_state { + SeekBtreeState::SeekBtree => { + let seek_result = return_if_io!(self.btree_cursor.seek(seek_key.clone(), op)); + + match seek_result { + SeekResult::NotFound => { + let mut peek = self.dual_peek.borrow_mut(); + peek.btree_peek = CursorPeek::Exhausted; + return Ok(IOResult::Done(())); + } + SeekResult::TryAdvance => { + // Need to advance to find actual matching entry + self.state.replace(Some(MvccLazyCursorState::Seek( + SeekState::SeekBtree(SeekBtreeState::AdvanceBTree), + direction, + ))); + } + SeekResult::Found => { + self.state.replace(Some(MvccLazyCursorState::Seek( + SeekState::SeekBtree(SeekBtreeState::CheckRow), + direction, + ))); + } + } + } + SeekBtreeState::AdvanceBTree => { + return_if_io!(match direction { + IterationDirection::Forwards => { + self.advance_btree_forward_from_current() + } + IterationDirection::Backwards => { + self.advance_btree_backward_from_current() 
+ } + }); + self.state.replace(Some(MvccLazyCursorState::Seek( + SeekState::SeekBtree(SeekBtreeState::CheckRow), + direction, + ))); + } + SeekBtreeState::CheckRow => { + let key = self.get_btree_current_key()?; + match key { + Some(k) if self.query_btree_version_is_valid(&k) => { + let mut peek = self.dual_peek.borrow_mut(); + peek.btree_peek = CursorPeek::Row(k); + return Ok(IOResult::Done(())); + } + Some(_) => { + // shadowed by MVCC, continue to next + self.state.replace(Some(MvccLazyCursorState::Seek( + SeekState::SeekBtree(SeekBtreeState::AdvanceBTree), + direction, + ))); + } + None => { + let mut peek = self.dual_peek.borrow_mut(); + peek.btree_peek = CursorPeek::Exhausted; + return Ok(IOResult::Done(())); + } + } + } + } + } + } + /// Initialize MVCC iterator for forward iteration (used when next() is called without rewind()) fn init_mvcc_iterator_forward(&mut self) { if self.table_iterator.is_some() || self.index_iterator.is_some() { @@ -932,113 +1059,157 @@ impl CursorTrait for MvccLazyCursor { // ge -> lower_bound bound included, we want first row equal to row_id or first row after row_id // lt -> upper_bound bound excluded, we want last row before row_id // le -> upper_bound bound included, we want last row equal to row_id or first row before row_id - match seek_key { - SeekKey::TableRowId(row_id) => { - let rowid = RowID { - table_id: self.table_id, - row_id: RowKey::Int(row_id), - }; - let inclusive = match op { - SeekOp::GT => false, - SeekOp::GE { eq_only: _ } => true, - SeekOp::LT => false, - SeekOp::LE { eq_only: _ } => true, - }; - self.invalidate_record(); - let found_rowid = self.db.seek_rowid( - rowid.clone(), - inclusive, - op.iteration_direction(), - self.tx_id, - &mut self.table_iterator, - ); - if let Some(found_rowid) = found_rowid { - self.current_pos.replace(CursorPosition::Loaded { - row_id: found_rowid.clone(), - in_btree: false, - }); - if op.eq_only() { - if found_rowid.row_id == rowid.row_id { - 
Ok(IOResult::Done(SeekResult::Found)) - } else { - Ok(IOResult::Done(SeekResult::NotFound)) + + loop { + let state = self.state.borrow().clone(); + match state { + None => { + // Initial state: Reset and do MVCC seek + let _ = self.table_iterator.take(); + let _ = self.index_iterator.take(); + self.reset_dual_peek(); + self.invalidate_record(); + + let direction = op.iteration_direction(); + let inclusive = matches!(op, SeekOp::GE { .. } | SeekOp::LE { .. }); + + match &seek_key { + SeekKey::TableRowId(row_id) => { + let rowid = RowID { + table_id: self.table_id, + row_id: RowKey::Int(*row_id), + }; + + // Seek in MVCC (synchronous) + let mvcc_rowid = self.db.seek_rowid( + rowid.clone(), + inclusive, + direction, + self.tx_id, + &mut self.table_iterator, + ); + + // Set MVCC peek + { + let mut peek = self.dual_peek.borrow_mut(); + peek.mvcc_peek = match &mvcc_rowid { + Some(rid) => CursorPeek::Row(rid.row_id.clone()), + None => CursorPeek::Exhausted, + }; + } + } + SeekKey::IndexKey(index_key) => { + let index_info = { + let MvccCursorType::Index(index_info) = &self.mv_cursor_type else { + panic!("SeekKey::IndexKey requires Index cursor type"); + }; + Arc::new(IndexInfo { + key_info: index_info.key_info.clone(), + has_rowid: index_info.has_rowid, + num_cols: index_key.column_count(), + }) + }; + let sortable_key = + SortableIndexKey::new_from_record((*index_key).clone(), index_info); + + // Seek in MVCC (synchronous) + let mvcc_rowid = self.db.seek_index( + self.table_id, + sortable_key.clone(), + inclusive, + direction, + self.tx_id, + &mut self.index_iterator, + ); + + // Set MVCC peek + { + let mut peek = self.dual_peek.borrow_mut(); + peek.mvcc_peek = match &mvcc_rowid { + Some(rid) => CursorPeek::Row(rid.row_id.clone()), + None => CursorPeek::Exhausted, + }; + } } - } else { - Ok(IOResult::Done(SeekResult::Found)) } - } else { - let forwards = matches!(op, SeekOp::GE { eq_only: _ } | SeekOp::GT); - if forwards { - let _ = self.last()?; - } else { - let _ = 
self.rewind()?; - } - Ok(IOResult::Done(SeekResult::NotFound)) + + // Move to btree seek state + self.state.replace(Some(MvccLazyCursorState::Seek( + SeekState::SeekBtree(SeekBtreeState::SeekBtree), + direction, + ))); } - } - SeekKey::IndexKey(index_key) => { - // TODO: we should seek in both btree and mvcc - let index_info = { - let MvccCursorType::Index(index_info) = &self.mv_cursor_type else { - panic!("SeekKey::IndexKey requires Index cursor type"); - }; - Arc::new(IndexInfo { - key_info: index_info.key_info.clone(), - has_rowid: index_info.has_rowid, - num_cols: index_key.column_count(), - }) - }; - let key_info = index_info - .key_info - .iter() - .take(index_key.column_count()) - .cloned() - .collect::>(); - let sortable_key = SortableIndexKey::new_from_record(index_key.clone(), index_info); - let inclusive = match op { - SeekOp::GT => false, - SeekOp::GE { eq_only: _ } => true, - SeekOp::LT => false, - SeekOp::LE { eq_only: _ } => true, - }; - let found_rowid = self.db.seek_index( - self.table_id, - sortable_key.clone(), - inclusive, - op.iteration_direction(), - self.tx_id, - &mut self.index_iterator, - ); - if let Some(found_rowid) = found_rowid { - self.current_pos.replace(CursorPosition::Loaded { - row_id: found_rowid.clone(), - in_btree: false, - }); - if op.eq_only() { - let RowKey::Record(found_rowid_key) = found_rowid.row_id else { - panic!("Found rowid is not a record"); - }; - let cmp = compare_immutable( - index_key.get_values(), - found_rowid_key.key.get_values(), - &key_info, - ); - if cmp.is_eq() { - Ok(IOResult::Done(SeekResult::Found)) + Some(MvccLazyCursorState::Seek(SeekState::SeekBtree(_), direction)) => { + return_if_io!(self.seek_btree_and_set_peek(seek_key.clone(), op)); + self.state.replace(Some(MvccLazyCursorState::Seek( + SeekState::PickWinner, + direction, + ))); + } + Some(MvccLazyCursorState::Seek(SeekState::PickWinner, direction)) => { + // Pick winner and return result + // Now pick the winner based on direction + let winner = 
self.dual_peek.borrow().get_next(direction); + + // Clear seek state + self.state.replace(None); + + if let Some((winner_key, in_btree)) = winner { + self.current_pos.replace(CursorPosition::Loaded { + row_id: RowID { + table_id: self.table_id, + row_id: winner_key.clone(), + }, + in_btree, + }); + + if op.eq_only() { + // Check if the winner matches the seek key + let found = match &seek_key { + SeekKey::TableRowId(row_id) => winner_key == RowKey::Int(*row_id), + SeekKey::IndexKey(index_key) => { + let RowKey::Record(found_key) = &winner_key else { + panic!("Found rowid is not a record"); + }; + let MvccCursorType::Index(index_info) = &self.mv_cursor_type + else { + panic!("Index cursor expected"); + }; + let key_info: Vec<_> = index_info + .key_info + .iter() + .take(index_key.column_count()) + .cloned() + .collect(); + let cmp = compare_immutable( + index_key.get_values(), + found_key.key.get_values(), + &key_info, + ); + cmp.is_eq() + } + }; + if found { + return Ok(IOResult::Done(SeekResult::Found)); + } else { + return Ok(IOResult::Done(SeekResult::NotFound)); + } } else { - Ok(IOResult::Done(SeekResult::NotFound)) + return Ok(IOResult::Done(SeekResult::Found)); } } else { - Ok(IOResult::Done(SeekResult::Found)) + // Nothing found in either cursor + let forwards = matches!(op, SeekOp::GE { .. } | SeekOp::GT); + if forwards { + self.current_pos.replace(CursorPosition::End); + } else { + self.current_pos.replace(CursorPosition::BeforeFirst); + } + return Ok(IOResult::Done(SeekResult::NotFound)); } - } else { - let forwards = matches!(op, SeekOp::GE { eq_only: _ } | SeekOp::GT); - if forwards { - let _ = self.last()?; - } else { - let _ = self.rewind()?; - } - Ok(IOResult::Done(SeekResult::NotFound)) + } + _ => { + panic!("Invalid state in seek: {:?}", self.state.borrow()); } } } diff --git a/core/types.rs b/core/types.rs index 89696da35..e1e04357b 100644 --- a/core/types.rs +++ b/core/types.rs @@ -2632,7 +2632,7 @@ macro_rules! 
return_and_restore_if_io { }; } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone, Copy)] pub enum SeekResult { /// Record matching the [SeekOp] found in the B-tree and cursor was positioned to point onto that record Found, diff --git a/tests/fuzz/mod.rs b/tests/fuzz/mod.rs index df416442a..3e1738941 100644 --- a/tests/fuzz/mod.rs +++ b/tests/fuzz/mod.rs @@ -51,10 +51,13 @@ mod fuzz_tests { } } - // TODO: mvcc fuzz failure // INTEGER PRIMARY KEY is a rowid alias, so an index is not created - #[turso_macros::test(init_sql = "CREATE TABLE t (x INTEGER PRIMARY KEY autoincrement)")] + #[turso_macros::test( + mvcc, + init_sql = "CREATE TABLE t (x INTEGER PRIMARY KEY autoincrement)" + )] pub fn rowid_seek_fuzz(db: TempDatabase) { + let _ = tracing_subscriber::fmt::try_init(); let sqlite_conn = rusqlite::Connection::open(db.path.clone()).unwrap(); let (mut rng, _seed) = rng_from_time_or_env(); @@ -91,7 +94,7 @@ mod fuzz_tests { tracing::info!("rowid_seek_fuzz seed: {}", seed); for iteration in 0..2 { - tracing::trace!("rowid_seek_fuzz iteration: {}", iteration); + tracing::info!("rowid_seek_fuzz iteration: {}", iteration); for comp in COMPARISONS.iter() { for order_by in ORDER_BY.iter() { @@ -105,7 +108,7 @@ mod fuzz_tests { order_by.unwrap_or("") ); - log::trace!("query: {query}"); + tracing::info!("query: {query}"); let limbo_result = limbo_exec_rows(&db, &limbo_conn, &query); let sqlite_result = sqlite_exec_rows(&sqlite_conn, &query); assert_eq!( @@ -166,8 +169,7 @@ mod fuzz_tests { values } - // TODO: mvcc indexes - #[turso_macros::test(init_sql = "CREATE TABLE t (x PRIMARY KEY)")] + #[turso_macros::test(mvcc, init_sql = "CREATE TABLE t (x PRIMARY KEY)")] pub fn index_scan_fuzz(db: TempDatabase) { let sqlite_conn = rusqlite::Connection::open(db.path.clone()).unwrap(); @@ -212,8 +214,7 @@ mod fuzz_tests { } } - // TODO: mvcc indexes - #[turso_macros::test()] + #[turso_macros::test(mvcc)] /// A test for verifying that index seek+scan works correctly for 
compound keys /// on indexes with various column orderings. pub fn index_scan_compound_key_fuzz(db: TempDatabase) { diff --git a/tests/integration/query_processing/test_transactions.rs b/tests/integration/query_processing/test_transactions.rs index b5b2a7ef9..ae8e88c98 100644 --- a/tests/integration/query_processing/test_transactions.rs +++ b/tests/integration/query_processing/test_transactions.rs @@ -1220,3 +1220,424 @@ fn test_mvcc_checkpoint_before_insert_delete_after_checkpoint() { verify_table_contents(&conn, vec![1, 4]); } + +// Tests for dual-iteration seek: verifying that seek works correctly +// when there are rows in both btree (after checkpoint) and MVCC store. + +/// Test table seek (WHERE rowid = x) with rows in both btree and MVCC. +/// After checkpoint, rows 1-5 are in btree. Then we insert 6-10 into MVCC. +/// Seeking for various rowids should find them in the correct location. +#[test] +fn test_mvcc_dual_seek_table_rowid_basic() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_dual_seek_table_rowid_basic.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Create table and insert initial rows + execute_and_log(&conn, "CREATE TABLE t (x INTEGER PRIMARY KEY, v TEXT)").unwrap(); + for i in 1..=5 { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'btree_{i}')")).unwrap(); + } + + // Checkpoint to move rows to btree + execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap(); + + let path = tmp_db.path.clone(); + drop(conn); + drop(tmp_db); + + // Reopen to ensure btree is populated and MV store is empty + let tmp_db = TempDatabase::new_with_existent_with_opts( + &path, + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Insert more rows into MVCC + for i in 6..=10 { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'mvcc_{i}')")).unwrap(); + } + + // Seek for a row in btree + let stmt = query_and_log(&conn, 
"SELECT v FROM t WHERE x = 3") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("btree_3")]); + + // Seek for a row in MVCC + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 8") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("mvcc_8")]); + + // Seek for first row (btree) + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 1") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("btree_1")]); + + // Seek for last row (MVCC) + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 10") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("mvcc_10")]); + + // Seek for non-existent row + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 100") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert!(rows.is_empty()); +} + +/// Test seek with interleaved rows in btree and MVCC. +/// Btree has odd numbers (1,3,5,7,9), MVCC has even numbers (2,4,6,8,10). 
+#[test] +fn test_mvcc_dual_seek_interleaved_rows() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_dual_seek_interleaved_rows.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Create table and insert odd rows + execute_and_log(&conn, "CREATE TABLE t (x INTEGER PRIMARY KEY, v TEXT)").unwrap(); + for i in [1, 3, 5, 7, 9] { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'btree_{i}')")).unwrap(); + } + + // Checkpoint to move rows to btree + execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap(); + + let path = tmp_db.path.clone(); + drop(conn); + drop(tmp_db); + + // Reopen + let tmp_db = TempDatabase::new_with_existent_with_opts( + &path, + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Insert even rows into MVCC + for i in [2, 4, 6, 8, 10] { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'mvcc_{i}')")).unwrap(); + } + + // Full table scan should return all rows in order + let stmt = query_and_log(&conn, "SELECT x, v FROM t ORDER BY x") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert_eq!(rows.len(), 10); + for (i, row) in rows.iter().enumerate() { + let expected_x = (i + 1) as i64; + assert_eq!(row[0], Value::Integer(expected_x)); + let expected_source = if expected_x % 2 == 1 { "btree" } else { "mvcc" }; + assert_eq!( + row[1], + Value::build_text(format!("{expected_source}_{expected_x}")) + ); + } + + // Seek for btree row + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 5") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("btree_5")]); + + // Seek for MVCC row + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 6") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("mvcc_6")]); +} + +/// Test index seek with rows in both btree and MVCC. 
+#[test] +fn test_mvcc_dual_seek_index_basic() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_dual_seek_index_basic.db", + turso_core::DatabaseOpts::new() + .with_mvcc(true) + .with_indexes(true), + ); + let conn = tmp_db.connect_limbo(); + + // Create table with index + execute_and_log(&conn, "CREATE TABLE t (x INTEGER, v TEXT)").unwrap(); + execute_and_log(&conn, "CREATE INDEX idx_x ON t(x)").unwrap(); + + // Insert initial rows + for i in 1..=5 { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'btree_{i}')")).unwrap(); + } + + // Checkpoint + execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap(); + + let path = tmp_db.path.clone(); + drop(conn); + drop(tmp_db); + + // Reopen + let tmp_db = TempDatabase::new_with_existent_with_opts( + &path, + turso_core::DatabaseOpts::new() + .with_mvcc(true) + .with_indexes(true), + ); + let conn = tmp_db.connect_limbo(); + + // Insert more rows into MVCC + for i in 6..=10 { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'mvcc_{i}')")).unwrap(); + } + + // Index seek for btree row + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 3") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("btree_3")]); + + // Index seek for MVCC row + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 8") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("mvcc_8")]); + + // Range scan should return all matching rows in order + let stmt = query_and_log(&conn, "SELECT x FROM t WHERE x >= 4 AND x <= 7 ORDER BY x") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert_eq!( + rows, + vec![ + vec![Value::Integer(4)], + vec![Value::Integer(5)], + vec![Value::Integer(6)], + vec![Value::Integer(7)], + ] + ); +} + +/// Test seek with updates: row exists in btree but is updated in MVCC. +/// The seek should find the MVCC version (which shadows btree). 
+#[test] +fn test_mvcc_dual_seek_with_update() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_dual_seek_with_update.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Create table and insert rows + execute_and_log(&conn, "CREATE TABLE t (x INTEGER PRIMARY KEY, v TEXT)").unwrap(); + for i in 1..=5 { + execute_and_log( + &conn, + &format!("INSERT INTO t VALUES ({i}, 'original_{i}')"), + ) + .unwrap(); + } + + // Checkpoint + execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap(); + + let path = tmp_db.path.clone(); + drop(conn); + drop(tmp_db); + + // Reopen + let tmp_db = TempDatabase::new_with_existent_with_opts( + &path, + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Update row 3 (creates MVCC version that shadows btree) + execute_and_log(&conn, "UPDATE t SET v = 'updated_3' WHERE x = 3").unwrap(); + + // Seek for updated row should return MVCC version + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 3") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("updated_3")]); + + // Seek for non-updated row should still work + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 2") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("original_2")]); + + // Full scan should show correct values + let stmt = query_and_log(&conn, "SELECT x, v FROM t ORDER BY x") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::build_text("original_1")], + vec![Value::Integer(2), Value::build_text("original_2")], + vec![Value::Integer(3), Value::build_text("updated_3")], + vec![Value::Integer(4), Value::build_text("original_4")], + vec![Value::Integer(5), Value::build_text("original_5")], + ] + ); +} + +/// Test seek with delete: row exists in btree but 
is deleted in MVCC. +/// The seek should NOT find the deleted row. +#[test] +fn test_mvcc_dual_seek_with_delete() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_dual_seek_with_delete.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Create table and insert rows + execute_and_log(&conn, "CREATE TABLE t (x INTEGER PRIMARY KEY, v TEXT)").unwrap(); + for i in 1..=5 { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'value_{i}')")).unwrap(); + } + + // Checkpoint + execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap(); + + let path = tmp_db.path.clone(); + drop(conn); + drop(tmp_db); + + // Reopen + let tmp_db = TempDatabase::new_with_existent_with_opts( + &path, + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Delete row 3 + execute_and_log(&conn, "DELETE FROM t WHERE x = 3").unwrap(); + + // Seek for deleted row should return nothing + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 3") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert!(rows.is_empty()); + + // Seek for non-deleted row should still work + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 2") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("value_2")]); + + // Full scan should not include deleted row + let stmt = query_and_log(&conn, "SELECT x FROM t ORDER BY x") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1)], + vec![Value::Integer(2)], + vec![Value::Integer(4)], + vec![Value::Integer(5)], + ] + ); +} + +/// Test range seek (GT, LT operations) with dual iteration. 
+#[test] +fn test_mvcc_dual_seek_range_operations() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_dual_seek_range_operations.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Create table and insert rows + execute_and_log(&conn, "CREATE TABLE t (x INTEGER PRIMARY KEY)").unwrap(); + for i in [1, 3, 5] { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i})")).unwrap(); + } + + // Checkpoint + execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap(); + + let path = tmp_db.path.clone(); + drop(conn); + drop(tmp_db); + + // Reopen + let tmp_db = TempDatabase::new_with_existent_with_opts( + &path, + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Insert more rows into MVCC + for i in [2, 4, 6] { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i})")).unwrap(); + } + + // Range: x > 2 (should include 3,4,5,6) + let stmt = query_and_log(&conn, "SELECT x FROM t WHERE x > 2 ORDER BY x") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert_eq!( + rows, + vec![ + vec![Value::Integer(3)], + vec![Value::Integer(4)], + vec![Value::Integer(5)], + vec![Value::Integer(6)], + ] + ); + + // Range: x < 4 (should include 1,2,3) + let stmt = query_and_log(&conn, "SELECT x FROM t WHERE x < 4 ORDER BY x") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1)], + vec![Value::Integer(2)], + vec![Value::Integer(3)], + ] + ); + + // Range: x >= 3 AND x <= 5 (should include 3,4,5) + let stmt = query_and_log(&conn, "SELECT x FROM t WHERE x >= 3 AND x <= 5 ORDER BY x") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert_eq!( + rows, + vec![ + vec![Value::Integer(3)], + vec![Value::Integer(4)], + vec![Value::Integer(5)], + ] + ); +}