diff --git a/core/mvcc/cursor.rs b/core/mvcc/cursor.rs index 463dd9cc3..30b3bf756 100644 --- a/core/mvcc/cursor.rs +++ b/core/mvcc/cursor.rs @@ -69,12 +69,31 @@ enum PrevState { Advance, } +#[derive(Debug, Clone)] +enum SeekBtreeState { + /// Seeking in btree (MVCC seek already done) + SeekBtree, + /// Advance to next key in btree (if we got [SeekResult::TryAdvance], or the current row is shadowed by MVCC) + AdvanceBTree, + /// Check if current row is visible (not shadowed by MVCC) + CheckRow, +} + +#[derive(Debug, Clone)] +enum SeekState { + /// Seeking in btree (MVCC seek already done) + SeekBtree(SeekBtreeState), + /// Pick winner and finalize + PickWinner, +} + #[derive(Debug, Clone)] enum MvccLazyCursorState { Next(NextState), Prev(PrevState), Rewind(RewindState), Exists(ExistsState), + Seek(SeekState, IterationDirection), } /// We read rows from MVCC index or BTree in a dual-cursor approach. @@ -404,6 +423,15 @@ impl MvccLazyCursor { /// Advance btree cursor forward and set btree peek to the first valid row key (skipping rows shadowed by MVCC) fn advance_btree_forward(&mut self) -> Result> { + self._advance_btree_forward(true) + } + + /// Advance btree cursor forward from current position (cursor already positioned by seek) + fn advance_btree_forward_from_current(&mut self) -> Result> { + self._advance_btree_forward(false) + } + + fn _advance_btree_forward(&mut self, initialize: bool) -> Result> { loop { let mut state = self.btree_advance_state.borrow_mut(); match state.as_mut() { @@ -414,9 +442,8 @@ impl MvccLazyCursor { *state = None; return Ok(IOResult::Done(())); } - let peek = self.dual_peek.borrow(); - // If the btree is uninitialized, do the equivalent of rewind() to find the first valid row - if peek.btree_uninitialized() { + // If the btree is uninitialized AND we should initialize, do the equivalent of rewind() to find the first valid row + if initialize && self.dual_peek.borrow().btree_uninitialized() { return_if_io!(self.btree_cursor.rewind()); *state = Some(AdvanceBtreeState::RewindCheckBtreeKey); } else { @@ -477,6 +504,15 @@ impl MvccLazyCursor { /// Advance btree cursor backward and set btree peek to the first valid row key (skipping rows shadowed by MVCC) fn advance_btree_backward(&mut self) -> Result> { + self._advance_btree_backward(true) + } + + /// Advance btree cursor backward from current position (cursor already positioned by seek) + fn advance_btree_backward_from_current(&mut self) -> Result> { + self._advance_btree_backward(false) + } + + fn _advance_btree_backward(&mut self, initialize: bool) -> Result> { loop { let mut state = self.btree_advance_state.borrow_mut(); match state.as_mut() { @@ -488,8 +524,9 @@ impl MvccLazyCursor { return Ok(IOResult::Done(())); } let peek = self.dual_peek.borrow(); - // If the btree is uninitialized, do the equivalent of last() to find the last valid row - if peek.btree_uninitialized() { + // If the btree is uninitialized AND we should initialize, do the equivalent of last() to find the last valid row + if initialize && peek.btree_uninitialized() { + drop(peek); return_if_io!(self.btree_cursor.last()); *state = Some(AdvanceBtreeState::RewindCheckBtreeKey); } else { @@ -602,6 +639,96 @@ impl MvccLazyCursor { }); } + /// Seek btree cursor and set btree_peek to the result. + /// Skips rows that are shadowed by MVCC. + /// Returns IOResult indicating if we need to yield for IO or are done. + fn seek_btree_and_set_peek( + &mut self, + seek_key: SeekKey<'_>, + op: SeekOp, + ) -> Result> { + // Fast path: btree not allocated + if !self.is_btree_allocated() { + let mut peek = self.dual_peek.borrow_mut(); + peek.btree_peek = CursorPeek::Exhausted; + self.state.replace(None); + return Ok(IOResult::Done(())); + } + + loop { + let Some(MvccLazyCursorState::Seek(SeekState::SeekBtree(btree_seek_state), direction)) = + self.state.borrow().clone() + else { + panic!( + "Invalid btree seek state in seek_btree_and_set_peek: {:?}", + self.state.borrow() + ); + }; + match btree_seek_state { + SeekBtreeState::SeekBtree => { + let seek_result = return_if_io!(self.btree_cursor.seek(seek_key.clone(), op)); + + match seek_result { + SeekResult::NotFound => { + let mut peek = self.dual_peek.borrow_mut(); + peek.btree_peek = CursorPeek::Exhausted; + return Ok(IOResult::Done(())); + } + SeekResult::TryAdvance => { + // Need to advance to find actual matching entry + self.state.replace(Some(MvccLazyCursorState::Seek( + SeekState::SeekBtree(SeekBtreeState::AdvanceBTree), + direction, + ))); + } + SeekResult::Found => { + self.state.replace(Some(MvccLazyCursorState::Seek( + SeekState::SeekBtree(SeekBtreeState::CheckRow), + direction, + ))); + } + } + } + SeekBtreeState::AdvanceBTree => { + return_if_io!(match direction { + IterationDirection::Forwards => { + self.advance_btree_forward_from_current() + } + IterationDirection::Backwards => { + self.advance_btree_backward_from_current() + } + }); + self.state.replace(Some(MvccLazyCursorState::Seek( + SeekState::SeekBtree(SeekBtreeState::CheckRow), + direction, + ))); + } + SeekBtreeState::CheckRow => { + let key = self.get_btree_current_key()?; + match key { + Some(k) if self.query_btree_version_is_valid(&k) => { + let mut peek = self.dual_peek.borrow_mut(); + peek.btree_peek = CursorPeek::Row(k); + return Ok(IOResult::Done(())); + } + Some(_) => { + // shadowed by MVCC, continue to next + self.state.replace(Some(MvccLazyCursorState::Seek( + SeekState::SeekBtree(SeekBtreeState::AdvanceBTree), + direction, + ))); + } + None => { + let mut peek = self.dual_peek.borrow_mut(); + peek.btree_peek = CursorPeek::Exhausted; + return Ok(IOResult::Done(())); + } + } + } + } + } + } + /// Initialize MVCC iterator for forward iteration (used when next() is called without rewind()) fn init_mvcc_iterator_forward(&mut self) { if self.table_iterator.is_some() || self.index_iterator.is_some() { @@ -932,113 +1059,157 @@ impl CursorTrait for MvccLazyCursor { // ge -> lower_bound bound included, we want first row equal to row_id or first row after row_id // lt -> upper_bound bound excluded, we want last row before row_id // le -> upper_bound bound included, we want last row equal to row_id or first row before row_id - match seek_key { - SeekKey::TableRowId(row_id) => { - let rowid = RowID { - table_id: self.table_id, - row_id: RowKey::Int(row_id), - }; - let inclusive = match op { - SeekOp::GT => false, - SeekOp::GE { eq_only: _ } => true, - SeekOp::LT => false, - SeekOp::LE { eq_only: _ } => true, - }; - self.invalidate_record(); - let found_rowid = self.db.seek_rowid( - rowid.clone(), - inclusive, - op.iteration_direction(), - self.tx_id, - &mut self.table_iterator, - ); - if let Some(found_rowid) = found_rowid { - self.current_pos.replace(CursorPosition::Loaded { - row_id: found_rowid.clone(), - in_btree: false, - }); - if op.eq_only() { - if found_rowid.row_id == rowid.row_id { - Ok(IOResult::Done(SeekResult::Found)) - } else { - Ok(IOResult::Done(SeekResult::NotFound)) + + loop { + let state = self.state.borrow().clone(); + match state { + None => { + // Initial state: Reset and do MVCC seek + let _ = self.table_iterator.take(); + let _ = self.index_iterator.take(); + self.reset_dual_peek(); + self.invalidate_record(); + + let direction = op.iteration_direction(); + let inclusive = matches!(op, SeekOp::GE { .. } | SeekOp::LE { .. }); + + match &seek_key { + SeekKey::TableRowId(row_id) => { + let rowid = RowID { + table_id: self.table_id, + row_id: RowKey::Int(*row_id), + }; + + // Seek in MVCC (synchronous) + let mvcc_rowid = self.db.seek_rowid( + rowid.clone(), + inclusive, + direction, + self.tx_id, + &mut self.table_iterator, + ); + + // Set MVCC peek + { + let mut peek = self.dual_peek.borrow_mut(); + peek.mvcc_peek = match &mvcc_rowid { + Some(rid) => CursorPeek::Row(rid.row_id.clone()), + None => CursorPeek::Exhausted, + }; + } + } + SeekKey::IndexKey(index_key) => { + let index_info = { + let MvccCursorType::Index(index_info) = &self.mv_cursor_type else { + panic!("SeekKey::IndexKey requires Index cursor type"); + }; + Arc::new(IndexInfo { + key_info: index_info.key_info.clone(), + has_rowid: index_info.has_rowid, + num_cols: index_key.column_count(), + }) + }; + let sortable_key = + SortableIndexKey::new_from_record((*index_key).clone(), index_info); + + // Seek in MVCC (synchronous) + let mvcc_rowid = self.db.seek_index( + self.table_id, + sortable_key.clone(), + inclusive, + direction, + self.tx_id, + &mut self.index_iterator, + ); + + // Set MVCC peek + { + let mut peek = self.dual_peek.borrow_mut(); + peek.mvcc_peek = match &mvcc_rowid { + Some(rid) => CursorPeek::Row(rid.row_id.clone()), + None => CursorPeek::Exhausted, + }; + } } - } else { - Ok(IOResult::Done(SeekResult::Found)) } - } else { - let forwards = matches!(op, SeekOp::GE { eq_only: _ } | SeekOp::GT); - if forwards { - let _ = self.last()?; - } else { - let _ = self.rewind()?; - } - Ok(IOResult::Done(SeekResult::NotFound)) + + // Move to btree seek state + self.state.replace(Some(MvccLazyCursorState::Seek( + SeekState::SeekBtree(SeekBtreeState::SeekBtree), + direction, + ))); } - } - SeekKey::IndexKey(index_key) => { - // TODO: we should seek in both btree and mvcc - let index_info = { - let MvccCursorType::Index(index_info) = &self.mv_cursor_type else { - panic!("SeekKey::IndexKey requires Index cursor type"); - }; - Arc::new(IndexInfo { - key_info: index_info.key_info.clone(), - has_rowid: index_info.has_rowid, - num_cols: index_key.column_count(), - }) - }; - let key_info = index_info - .key_info - .iter() - .take(index_key.column_count()) - .cloned() - .collect::>(); - let sortable_key = SortableIndexKey::new_from_record(index_key.clone(), index_info); - let inclusive = match op { - SeekOp::GT => false, - SeekOp::GE { eq_only: _ } => true, - SeekOp::LT => false, - SeekOp::LE { eq_only: _ } => true, - }; - let found_rowid = self.db.seek_index( - self.table_id, - sortable_key.clone(), - inclusive, - op.iteration_direction(), - self.tx_id, - &mut self.index_iterator, - ); - if let Some(found_rowid) = found_rowid { - self.current_pos.replace(CursorPosition::Loaded { - row_id: found_rowid.clone(), - in_btree: false, - }); - if op.eq_only() { - let RowKey::Record(found_rowid_key) = found_rowid.row_id else { - panic!("Found rowid is not a record"); - }; - let cmp = compare_immutable( - index_key.get_values(), - found_rowid_key.key.get_values(), - &key_info, - ); - if cmp.is_eq() { - Ok(IOResult::Done(SeekResult::Found)) + Some(MvccLazyCursorState::Seek(SeekState::SeekBtree(_), direction)) => { + return_if_io!(self.seek_btree_and_set_peek(seek_key.clone(), op)); + self.state.replace(Some(MvccLazyCursorState::Seek( + SeekState::PickWinner, + direction, + ))); + } + Some(MvccLazyCursorState::Seek(SeekState::PickWinner, direction)) => { + // Pick winner and return result + // Now pick the winner based on direction + let winner = self.dual_peek.borrow().get_next(direction); + + // Clear seek state + self.state.replace(None); + + if let Some((winner_key, in_btree)) = winner { + self.current_pos.replace(CursorPosition::Loaded { + row_id: RowID { + table_id: self.table_id, + row_id: winner_key.clone(), + }, + in_btree, + }); + + if op.eq_only() { + // Check if the winner matches the seek key + let found = match &seek_key { + SeekKey::TableRowId(row_id) => winner_key == RowKey::Int(*row_id), + SeekKey::IndexKey(index_key) => { + let RowKey::Record(found_key) = &winner_key else { + panic!("Found rowid is not a record"); + }; + let MvccCursorType::Index(index_info) = &self.mv_cursor_type + else { + panic!("Index cursor expected"); + }; + let key_info: Vec<_> = index_info + .key_info + .iter() + .take(index_key.column_count()) + .cloned() + .collect(); + let cmp = compare_immutable( + index_key.get_values(), + found_key.key.get_values(), + &key_info, + ); + cmp.is_eq() + } + }; + if found { + return Ok(IOResult::Done(SeekResult::Found)); + } else { + return Ok(IOResult::Done(SeekResult::NotFound)); + } } else { - Ok(IOResult::Done(SeekResult::NotFound)) + return Ok(IOResult::Done(SeekResult::Found)); } } else { - Ok(IOResult::Done(SeekResult::Found)) + // Nothing found in either cursor + let forwards = matches!(op, SeekOp::GE { .. } | SeekOp::GT); + if forwards { + self.current_pos.replace(CursorPosition::End); + } else { + self.current_pos.replace(CursorPosition::BeforeFirst); + } + return Ok(IOResult::Done(SeekResult::NotFound)); } - } else { - let forwards = matches!(op, SeekOp::GE { eq_only: _ } | SeekOp::GT); - if forwards { - let _ = self.last()?; - } else { - let _ = self.rewind()?; - } - Ok(IOResult::Done(SeekResult::NotFound)) + } + _ => { + panic!("Invalid state in seek: {:?}", self.state.borrow()); } } } diff --git a/core/types.rs b/core/types.rs index 89696da35..e1e04357b 100644 --- a/core/types.rs +++ b/core/types.rs @@ -2632,7 +2632,7 @@ macro_rules! return_and_restore_if_io { }; } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone, Copy)] pub enum SeekResult { /// Record matching the [SeekOp] found in the B-tree and cursor was positioned to point onto that record Found, diff --git a/tests/fuzz/mod.rs b/tests/fuzz/mod.rs index df416442a..3e1738941 100644 --- a/tests/fuzz/mod.rs +++ b/tests/fuzz/mod.rs @@ -51,10 +51,13 @@ mod fuzz_tests { } } - // TODO: mvcc fuzz failure // INTEGER PRIMARY KEY is a rowid alias, so an index is not created - #[turso_macros::test(init_sql = "CREATE TABLE t (x INTEGER PRIMARY KEY autoincrement)")] + #[turso_macros::test( + mvcc, + init_sql = "CREATE TABLE t (x INTEGER PRIMARY KEY autoincrement)" + )] pub fn rowid_seek_fuzz(db: TempDatabase) { + let _ = tracing_subscriber::fmt::try_init(); let sqlite_conn = rusqlite::Connection::open(db.path.clone()).unwrap(); let (mut rng, _seed) = rng_from_time_or_env(); @@ -91,7 +94,7 @@ mod fuzz_tests { tracing::info!("rowid_seek_fuzz seed: {}", seed); for iteration in 0..2 { - tracing::trace!("rowid_seek_fuzz iteration: {}", iteration); + tracing::info!("rowid_seek_fuzz iteration: {}", iteration); for comp in COMPARISONS.iter() { for order_by in ORDER_BY.iter() { @@ -105,7 +108,7 @@ mod fuzz_tests { order_by.unwrap_or("") ); - log::trace!("query: {query}"); + tracing::info!("query: {query}"); let limbo_result = limbo_exec_rows(&db, &limbo_conn, &query); let sqlite_result = sqlite_exec_rows(&sqlite_conn, &query); assert_eq!( @@ -166,8 +169,7 @@ mod fuzz_tests { values } - // TODO: mvcc indexes - #[turso_macros::test(init_sql = "CREATE TABLE t (x PRIMARY KEY)")] + #[turso_macros::test(mvcc, init_sql = "CREATE TABLE t (x PRIMARY KEY)")] pub fn index_scan_fuzz(db: TempDatabase) { let sqlite_conn = rusqlite::Connection::open(db.path.clone()).unwrap(); @@ -212,8 +214,7 @@ mod fuzz_tests { } } - // TODO: mvcc indexes - #[turso_macros::test()] + #[turso_macros::test(mvcc)] /// A test for verifying that index seek+scan works correctly for compound keys /// on indexes with various column orderings. pub fn index_scan_compound_key_fuzz(db: TempDatabase) { diff --git a/tests/integration/query_processing/test_transactions.rs b/tests/integration/query_processing/test_transactions.rs index b5b2a7ef9..ae8e88c98 100644 --- a/tests/integration/query_processing/test_transactions.rs +++ b/tests/integration/query_processing/test_transactions.rs @@ -1220,3 +1220,424 @@ fn test_mvcc_checkpoint_before_insert_delete_after_checkpoint() { verify_table_contents(&conn, vec![1, 4]); } + +// Tests for dual-iteration seek: verifying that seek works correctly +// when there are rows in both btree (after checkpoint) and MVCC store. + +/// Test table seek (WHERE rowid = x) with rows in both btree and MVCC. +/// After checkpoint, rows 1-5 are in btree. Then we insert 6-10 into MVCC. +/// Seeking for various rowids should find them in the correct location. +#[test] +fn test_mvcc_dual_seek_table_rowid_basic() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_dual_seek_table_rowid_basic.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Create table and insert initial rows + execute_and_log(&conn, "CREATE TABLE t (x INTEGER PRIMARY KEY, v TEXT)").unwrap(); + for i in 1..=5 { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'btree_{i}')")).unwrap(); + } + + // Checkpoint to move rows to btree + execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap(); + + let path = tmp_db.path.clone(); + drop(conn); + drop(tmp_db); + + // Reopen to ensure btree is populated and MV store is empty + let tmp_db = TempDatabase::new_with_existent_with_opts( + &path, + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Insert more rows into MVCC + for i in 6..=10 { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'mvcc_{i}')")).unwrap(); + } + + // Seek for a row in btree + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 3") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("btree_3")]); + + // Seek for a row in MVCC + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 8") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("mvcc_8")]); + + // Seek for first row (btree) + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 1") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("btree_1")]); + + // Seek for last row (MVCC) + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 10") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("mvcc_10")]); + + // Seek for non-existent row + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 100") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert!(rows.is_empty()); +} + +/// Test seek with interleaved rows in btree and MVCC. +/// Btree has odd numbers (1,3,5,7,9), MVCC has even numbers (2,4,6,8,10). +#[test] +fn test_mvcc_dual_seek_interleaved_rows() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_dual_seek_interleaved_rows.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Create table and insert odd rows + execute_and_log(&conn, "CREATE TABLE t (x INTEGER PRIMARY KEY, v TEXT)").unwrap(); + for i in [1, 3, 5, 7, 9] { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'btree_{i}')")).unwrap(); + } + + // Checkpoint to move rows to btree + execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap(); + + let path = tmp_db.path.clone(); + drop(conn); + drop(tmp_db); + + // Reopen + let tmp_db = TempDatabase::new_with_existent_with_opts( + &path, + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Insert even rows into MVCC + for i in [2, 4, 6, 8, 10] { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'mvcc_{i}')")).unwrap(); + } + + // Full table scan should return all rows in order + let stmt = query_and_log(&conn, "SELECT x, v FROM t ORDER BY x") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert_eq!(rows.len(), 10); + for (i, row) in rows.iter().enumerate() { + let expected_x = (i + 1) as i64; + assert_eq!(row[0], Value::Integer(expected_x)); + let expected_source = if expected_x % 2 == 1 { "btree" } else { "mvcc" }; + assert_eq!( + row[1], + Value::build_text(format!("{expected_source}_{expected_x}")) + ); + } + + // Seek for btree row + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 5") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("btree_5")]); + + // Seek for MVCC row + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 6") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("mvcc_6")]); +} + +/// Test index seek with rows in both btree and MVCC. +#[test] +fn test_mvcc_dual_seek_index_basic() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_dual_seek_index_basic.db", + turso_core::DatabaseOpts::new() + .with_mvcc(true) + .with_indexes(true), + ); + let conn = tmp_db.connect_limbo(); + + // Create table with index + execute_and_log(&conn, "CREATE TABLE t (x INTEGER, v TEXT)").unwrap(); + execute_and_log(&conn, "CREATE INDEX idx_x ON t(x)").unwrap(); + + // Insert initial rows + for i in 1..=5 { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'btree_{i}')")).unwrap(); + } + + // Checkpoint + execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap(); + + let path = tmp_db.path.clone(); + drop(conn); + drop(tmp_db); + + // Reopen + let tmp_db = TempDatabase::new_with_existent_with_opts( + &path, + turso_core::DatabaseOpts::new() + .with_mvcc(true) + .with_indexes(true), + ); + let conn = tmp_db.connect_limbo(); + + // Insert more rows into MVCC + for i in 6..=10 { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'mvcc_{i}')")).unwrap(); + } + + // Index seek for btree row + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 3") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("btree_3")]); + + // Index seek for MVCC row + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 8") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("mvcc_8")]); + + // Range scan should return all matching rows in order + let stmt = query_and_log(&conn, "SELECT x FROM t WHERE x >= 4 AND x <= 7 ORDER BY x") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert_eq!( + rows, + vec![ + vec![Value::Integer(4)], + vec![Value::Integer(5)], + vec![Value::Integer(6)], + vec![Value::Integer(7)], + ] + ); +} + +/// Test seek with updates: row exists in btree but is updated in MVCC. +/// The seek should find the MVCC version (which shadows btree). +#[test] +fn test_mvcc_dual_seek_with_update() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_dual_seek_with_update.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Create table and insert rows + execute_and_log(&conn, "CREATE TABLE t (x INTEGER PRIMARY KEY, v TEXT)").unwrap(); + for i in 1..=5 { + execute_and_log( + &conn, + &format!("INSERT INTO t VALUES ({i}, 'original_{i}')"), + ) + .unwrap(); + } + + // Checkpoint + execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap(); + + let path = tmp_db.path.clone(); + drop(conn); + drop(tmp_db); + + // Reopen + let tmp_db = TempDatabase::new_with_existent_with_opts( + &path, + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Update row 3 (creates MVCC version that shadows btree) + execute_and_log(&conn, "UPDATE t SET v = 'updated_3' WHERE x = 3").unwrap(); + + // Seek for updated row should return MVCC version + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 3") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("updated_3")]); + + // Seek for non-updated row should still work + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 2") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("original_2")]); + + // Full scan should show correct values + let stmt = query_and_log(&conn, "SELECT x, v FROM t ORDER BY x") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::build_text("original_1")], + vec![Value::Integer(2), Value::build_text("original_2")], + vec![Value::Integer(3), Value::build_text("updated_3")], + vec![Value::Integer(4), Value::build_text("original_4")], + vec![Value::Integer(5), Value::build_text("original_5")], + ] + ); +} + +/// Test seek with delete: row exists in btree but is deleted in MVCC. +/// The seek should NOT find the deleted row. +#[test] +fn test_mvcc_dual_seek_with_delete() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_dual_seek_with_delete.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Create table and insert rows + execute_and_log(&conn, "CREATE TABLE t (x INTEGER PRIMARY KEY, v TEXT)").unwrap(); + for i in 1..=5 { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i}, 'value_{i}')")).unwrap(); + } + + // Checkpoint + execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap(); + + let path = tmp_db.path.clone(); + drop(conn); + drop(tmp_db); + + // Reopen + let tmp_db = TempDatabase::new_with_existent_with_opts( + &path, + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Delete row 3 + execute_and_log(&conn, "DELETE FROM t WHERE x = 3").unwrap(); + + // Seek for deleted row should return nothing + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 3") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert!(rows.is_empty()); + + // Seek for non-deleted row should still work + let stmt = query_and_log(&conn, "SELECT v FROM t WHERE x = 2") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("value_2")]); + + // Full scan should not include deleted row + let stmt = query_and_log(&conn, "SELECT x FROM t ORDER BY x") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1)], + vec![Value::Integer(2)], + vec![Value::Integer(4)], + vec![Value::Integer(5)], + ] + ); +} + +/// Test range seek (GT, LT operations) with dual iteration. +#[test] +fn test_mvcc_dual_seek_range_operations() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_dual_seek_range_operations.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Create table and insert rows + execute_and_log(&conn, "CREATE TABLE t (x INTEGER PRIMARY KEY)").unwrap(); + for i in [1, 3, 5] { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i})")).unwrap(); + } + + // Checkpoint + execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap(); + + let path = tmp_db.path.clone(); + drop(conn); + drop(tmp_db); + + // Reopen + let tmp_db = TempDatabase::new_with_existent_with_opts( + &path, + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Insert more rows into MVCC + for i in [2, 4, 6] { + execute_and_log(&conn, &format!("INSERT INTO t VALUES ({i})")).unwrap(); + } + + // Range: x > 2 (should include 3,4,5,6) + let stmt = query_and_log(&conn, "SELECT x FROM t WHERE x > 2 ORDER BY x") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert_eq!( + rows, + vec![ + vec![Value::Integer(3)], + vec![Value::Integer(4)], + vec![Value::Integer(5)], + vec![Value::Integer(6)], + ] + ); + + // Range: x < 4 (should include 1,2,3) + let stmt = query_and_log(&conn, "SELECT x FROM t WHERE x < 4 ORDER BY x") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1)], + vec![Value::Integer(2)], + vec![Value::Integer(3)], + ] + ); + + // Range: x >= 3 AND x <= 5 (should include 3,4,5) + let stmt = query_and_log(&conn, "SELECT x FROM t WHERE x >= 3 AND x <= 5 ORDER BY x") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + assert_eq!( + rows, + vec![ + vec![Value::Integer(3)], + vec![Value::Integer(4)], + vec![Value::Integer(5)], + ] + ); +}