diff --git a/simulator/generation/plan.rs b/simulator/generation/plan.rs index fe170e878..12ab2e454 100644 --- a/simulator/generation/plan.rs +++ b/simulator/generation/plan.rs @@ -592,9 +592,21 @@ impl Interaction { let mut rows = rows.unwrap().unwrap(); let mut out = Vec::new(); let mut current_prob = 0.05; - let mut incr = 0.01; - while let Ok(row) = rows.step() { - match row { + let mut incr = 0.001; + loop { + let syncing = { + let files = env.io.files.borrow(); + // TODO: currently assuming we only have 1 file that is syncing + files + .iter() + .any(|file| file.sync_completion.borrow().is_some()) + }; + let inject_fault = env.rng.gen_bool(current_prob); + if inject_fault || syncing { + env.io.inject_fault(true); + } + + match rows.step()? { StepResult::Row => { let row = rows.row().unwrap(); let mut r = Vec::new(); @@ -605,24 +617,12 @@ impl Interaction { out.push(r); } StepResult::IO => { - let syncing = { - let files = env.io.files.borrow(); - // TODO: currently assuming we only have 1 file that is syncing - files - .iter() - .any(|file| file.sync_completion.borrow().is_some()) - }; - let inject_fault = env.rng.gen_bool(current_prob); - if inject_fault || syncing { - env.io.inject_fault(true); - } - rows.run_once()?; current_prob += incr; if current_prob > 1.0 { current_prob = 1.0; } else { - incr += 0.01; + incr *= 1.01; } } StepResult::Done => { diff --git a/simulator/generation/property.rs b/simulator/generation/property.rs index b15e55da5..e74338d60 100644 --- a/simulator/generation/property.rs +++ b/simulator/generation/property.rs @@ -123,7 +123,10 @@ pub(crate) enum Property { /// tends to optimize `where` statements while keeping the result column expressions /// unoptimized. This property is used to test the optimizer. The property is successful /// if the two queries return the same number of rows. - SelectSelectOptimizer { table: String, predicate: Predicate }, + SelectSelectOptimizer { + table: String, + predicate: Predicate, + }, /// FsyncNoWait is a property which tests if we do not loose any data after not waiting for fsync. /// /// # Interactions @@ -132,7 +135,14 @@ pub(crate) enum Property { /// - Execute the `query` again /// - Query tables to assert that the values were inserted /// - FsyncNoWait { query: Query, tables: Vec }, + FsyncNoWait { + query: Query, + tables: Vec, + }, + FaultyQuery { + query: Query, + tables: Vec, + }, } impl Property { @@ -145,6 +155,7 @@ impl Property { Property::DropSelect { .. } => "Drop-Select", Property::SelectSelectOptimizer { .. } => "Select-Select-Optimizer", Property::FsyncNoWait { .. } => "FsyncNoWait", + Property::FaultyQuery { .. } => "FaultyQuery", } } /// interactions construct a list of interactions, which is an executable representation of the property. @@ -436,49 +447,53 @@ impl Property { vec![assumption, select1, select2, assertion] } Property::FsyncNoWait { query, tables } => { - let checks = tables.iter().flat_map(|table| { - let select = Interaction::Query(Query::Select(Select { - table: table.clone(), - result_columns: vec![ResultColumn::Star], - predicate: Predicate::true_(), - limit: None, - distinct: Distinctness::All, - })); - let assertion = Interaction::Assertion(Assertion { - message: format!( - "table {} should contain all of its values after the wal reopened", - table - ), - func: Box::new({ - let table = table.clone(); - move |stack: &Vec, env: &SimulatorEnv| { - let table = - env.tables.iter().find(|t| t.name == table).ok_or_else( - || { - LimboError::InternalError(format!( - "table {} should exist", - table - )) - }, - )?; - let last = stack.last().unwrap(); - match last { - Ok(vals) => Ok(*vals == table.rows), - Err(err) => Err(LimboError::InternalError(format!("{}", err))), - } - } - }), - }); - [select, assertion].into_iter() - }); + let checks = assert_all_table_values(tables); Vec::from_iter( std::iter::once(Interaction::FsyncQuery(query.clone())).chain(checks), ) } + Property::FaultyQuery { query, tables } => { + let checks = assert_all_table_values(tables); + let first = std::iter::once(Interaction::FaultyQuery(query.clone())); + Vec::from_iter(first.chain(checks)) + } } } } +fn assert_all_table_values(tables: &[String]) -> impl Iterator + use<'_> { + let checks = tables.iter().flat_map(|table| { + let select = Interaction::Query(Query::Select(Select { + table: table.clone(), + result_columns: vec![ResultColumn::Star], + predicate: Predicate::true_(), + limit: None, + distinct: Distinctness::All, + })); + let assertion = Interaction::Assertion(Assertion { + message: format!( + "table {} should contain all of its values after the wal reopened", + table + ), + func: Box::new({ + let table = table.clone(); + move |stack: &Vec, env: &SimulatorEnv| { + let table = env.tables.iter().find(|t| t.name == table).ok_or_else(|| { + LimboError::InternalError(format!("table {} should exist", table)) + })?; + let last = stack.last().unwrap(); + match last { + Ok(vals) => Ok(*vals == table.rows), + Err(err) => Err(LimboError::InternalError(format!("{}", err))), + } + } + }), + }); + [select, assertion].into_iter() + }); + checks +} + #[derive(Debug)] pub(crate) struct Remaining { pub(crate) read: f64, @@ -755,6 +770,17 @@ fn property_fsync_no_wait( } } +fn property_faulty_query( + rng: &mut R, + env: &SimulatorEnv, + remaining: &Remaining, +) -> Property { + Property::FaultyQuery { + query: Query::arbitrary_from(rng, (env, remaining)), + tables: env.tables.iter().map(|t| t.name.clone()).collect(), + } +} + impl ArbitraryFrom<(&SimulatorEnv, &InteractionStats)> for Property { fn arbitrary_from( rng: &mut R, @@ -820,6 +846,10 @@ impl ArbitraryFrom<(&SimulatorEnv, &InteractionStats)> for Property { }, Box::new(|rng: &mut R| property_fsync_no_wait(rng, env, &remaining_)), ), + ( + 20.0, + Box::new(|rng: &mut R| property_faulty_query(rng, env, &remaining_)), + ), ], rng, ) diff --git a/simulator/runner/execution.rs b/simulator/runner/execution.rs index c2c972a38..7f8de7ddb 100644 --- a/simulator/runner/execution.rs +++ b/simulator/runner/execution.rs @@ -192,7 +192,7 @@ pub(crate) fn execute_interaction( }; let results = interaction.execute_query(conn, &env.io); - tracing::debug!("{:?}", results); + tracing::debug!(?results); stack.push(results); limbo_integrity_check(conn)?; } @@ -204,7 +204,7 @@ pub(crate) fn execute_interaction( }; let results = interaction.execute_fsync_query(conn.clone(), env); - tracing::debug!("{:?}", results); + tracing::debug!(?results); stack.push(results); let query_interaction = Interaction::Query(query.clone()); @@ -235,7 +235,7 @@ pub(crate) fn execute_interaction( }; let results = interaction.execute_faulty_query(&conn, env); - tracing::debug!("{:?}", results); + tracing::debug!(?results); stack.push(results); // Reset fault injection env.io.inject_fault(false); diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index 6b885fae1..12eb10f30 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -26,6 +26,9 @@ pub(crate) struct SimulatorFile { /// Number of `sync` function calls (both success and failures). pub(crate) nr_sync_calls: Cell, + /// Number of `sync` function calls with injected fault. + pub(crate) nr_sync_faults: Cell, + pub(crate) page_size: usize, pub(crate) rng: RefCell, @@ -107,6 +110,7 @@ impl File for SimulatorFile { ) -> Result> { self.nr_pread_calls.set(self.nr_pread_calls.get() + 1); if self.fault.get() { + tracing::debug!("pread fault"); self.nr_pread_faults.set(self.nr_pread_faults.get() + 1); return Err(turso_core::LimboError::InternalError( "Injected fault".into(), @@ -141,6 +145,7 @@ impl File for SimulatorFile { ) -> Result> { self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1); if self.fault.get() { + tracing::debug!("pwrite fault"); self.nr_pwrite_faults.set(self.nr_pwrite_faults.get() + 1); return Err(turso_core::LimboError::InternalError( "Injected fault".into(), @@ -169,6 +174,13 @@ impl File for SimulatorFile { fn sync(&self, mut c: turso_core::Completion) -> Result> { self.nr_sync_calls.set(self.nr_sync_calls.get() + 1); + if self.fault.get() { + tracing::debug!("sync fault"); + self.nr_sync_faults.set(self.nr_sync_faults.get() + 1); + return Err(turso_core::LimboError::InternalError( + "Injected fault".into(), + )); + } if let Some(latency) = self.generate_latency_duration() { let CompletionType::Sync(sync_completion) = &mut c.completion_type else { unreachable!(); diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index 30649b3a0..b3c823125 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -79,6 +79,7 @@ impl IO for SimulatorIO { fault: Cell::new(false), nr_pread_faults: Cell::new(0), nr_pwrite_faults: Cell::new(0), + nr_sync_faults: Cell::new(0), nr_pread_calls: Cell::new(0), nr_pwrite_calls: Cell::new(0), nr_sync_calls: Cell::new(0), diff --git a/simulator/shrink/plan.rs b/simulator/shrink/plan.rs index 521d93810..d2d548b3b 100644 --- a/simulator/shrink/plan.rs +++ b/simulator/shrink/plan.rs @@ -72,7 +72,8 @@ impl InteractionPlan { } Property::SelectLimit { .. } | Property::SelectSelectOptimizer { .. } - | Property::FsyncNoWait { .. } => {} + | Property::FsyncNoWait { .. } + | Property::FaultyQuery { .. } => {} } } // Check again after query clear if the interactions still uses the failing table