add FaultyQuery Property to inject fault in a query and then assert that it did not modify the database

This commit is contained in:
pedrocarlo 2025-06-29 15:54:27 -03:00
parent b578f2249a
commit 119c537334
6 changed files with 101 additions and 57 deletions

View file

@ -592,9 +592,21 @@ impl Interaction {
let mut rows = rows.unwrap().unwrap();
let mut out = Vec::new();
let mut current_prob = 0.05;
let mut incr = 0.01;
while let Ok(row) = rows.step() {
match row {
let mut incr = 0.001;
loop {
let syncing = {
let files = env.io.files.borrow();
// TODO: currently assuming we only have 1 file that is syncing
files
.iter()
.any(|file| file.sync_completion.borrow().is_some())
};
let inject_fault = env.rng.gen_bool(current_prob);
if inject_fault || syncing {
env.io.inject_fault(true);
}
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let mut r = Vec::new();
@ -605,24 +617,12 @@ impl Interaction {
out.push(r);
}
StepResult::IO => {
let syncing = {
let files = env.io.files.borrow();
// TODO: currently assuming we only have 1 file that is syncing
files
.iter()
.any(|file| file.sync_completion.borrow().is_some())
};
let inject_fault = env.rng.gen_bool(current_prob);
if inject_fault || syncing {
env.io.inject_fault(true);
}
rows.run_once()?;
current_prob += incr;
if current_prob > 1.0 {
current_prob = 1.0;
} else {
incr += 0.01;
incr *= 1.01;
}
}
StepResult::Done => {

View file

@ -123,7 +123,10 @@ pub(crate) enum Property {
/// tends to optimize `where` statements while keeping the result column expressions
/// unoptimized. This property is used to test the optimizer. The property is successful
/// if the two queries return the same number of rows.
SelectSelectOptimizer { table: String, predicate: Predicate },
SelectSelectOptimizer {
table: String,
predicate: Predicate,
},
/// FsyncNoWait is a property which tests if we do not loose any data after not waiting for fsync.
///
/// # Interactions
@ -132,7 +135,14 @@ pub(crate) enum Property {
/// - Execute the `query` again
/// - Query tables to assert that the values were inserted
///
FsyncNoWait { query: Query, tables: Vec<String> },
FsyncNoWait {
query: Query,
tables: Vec<String>,
},
FaultyQuery {
query: Query,
tables: Vec<String>,
},
}
impl Property {
@ -145,6 +155,7 @@ impl Property {
Property::DropSelect { .. } => "Drop-Select",
Property::SelectSelectOptimizer { .. } => "Select-Select-Optimizer",
Property::FsyncNoWait { .. } => "FsyncNoWait",
Property::FaultyQuery { .. } => "FaultyQuery",
}
}
/// interactions construct a list of interactions, which is an executable representation of the property.
@ -436,49 +447,53 @@ impl Property {
vec![assumption, select1, select2, assertion]
}
Property::FsyncNoWait { query, tables } => {
let checks = tables.iter().flat_map(|table| {
let select = Interaction::Query(Query::Select(Select {
table: table.clone(),
result_columns: vec![ResultColumn::Star],
predicate: Predicate::true_(),
limit: None,
distinct: Distinctness::All,
}));
let assertion = Interaction::Assertion(Assertion {
message: format!(
"table {} should contain all of its values after the wal reopened",
table
),
func: Box::new({
let table = table.clone();
move |stack: &Vec<ResultSet>, env: &SimulatorEnv| {
let table =
env.tables.iter().find(|t| t.name == table).ok_or_else(
|| {
LimboError::InternalError(format!(
"table {} should exist",
table
))
},
)?;
let last = stack.last().unwrap();
match last {
Ok(vals) => Ok(*vals == table.rows),
Err(err) => Err(LimboError::InternalError(format!("{}", err))),
}
}
}),
});
[select, assertion].into_iter()
});
let checks = assert_all_table_values(tables);
Vec::from_iter(
std::iter::once(Interaction::FsyncQuery(query.clone())).chain(checks),
)
}
Property::FaultyQuery { query, tables } => {
let checks = assert_all_table_values(tables);
let first = std::iter::once(Interaction::FaultyQuery(query.clone()));
Vec::from_iter(first.chain(checks))
}
}
}
}
fn assert_all_table_values(tables: &[String]) -> impl Iterator<Item = Interaction> + use<'_> {
let checks = tables.iter().flat_map(|table| {
let select = Interaction::Query(Query::Select(Select {
table: table.clone(),
result_columns: vec![ResultColumn::Star],
predicate: Predicate::true_(),
limit: None,
distinct: Distinctness::All,
}));
let assertion = Interaction::Assertion(Assertion {
message: format!(
"table {} should contain all of its values after the wal reopened",
table
),
func: Box::new({
let table = table.clone();
move |stack: &Vec<ResultSet>, env: &SimulatorEnv| {
let table = env.tables.iter().find(|t| t.name == table).ok_or_else(|| {
LimboError::InternalError(format!("table {} should exist", table))
})?;
let last = stack.last().unwrap();
match last {
Ok(vals) => Ok(*vals == table.rows),
Err(err) => Err(LimboError::InternalError(format!("{}", err))),
}
}
}),
});
[select, assertion].into_iter()
});
checks
}
#[derive(Debug)]
pub(crate) struct Remaining {
pub(crate) read: f64,
@ -755,6 +770,17 @@ fn property_fsync_no_wait<R: rand::Rng>(
}
}
fn property_faulty_query<R: rand::Rng>(
rng: &mut R,
env: &SimulatorEnv,
remaining: &Remaining,
) -> Property {
Property::FaultyQuery {
query: Query::arbitrary_from(rng, (env, remaining)),
tables: env.tables.iter().map(|t| t.name.clone()).collect(),
}
}
impl ArbitraryFrom<(&SimulatorEnv, &InteractionStats)> for Property {
fn arbitrary_from<R: rand::Rng>(
rng: &mut R,
@ -820,6 +846,10 @@ impl ArbitraryFrom<(&SimulatorEnv, &InteractionStats)> for Property {
},
Box::new(|rng: &mut R| property_fsync_no_wait(rng, env, &remaining_)),
),
(
20.0,
Box::new(|rng: &mut R| property_faulty_query(rng, env, &remaining_)),
),
],
rng,
)

View file

@ -192,7 +192,7 @@ pub(crate) fn execute_interaction(
};
let results = interaction.execute_query(conn, &env.io);
tracing::debug!("{:?}", results);
tracing::debug!(?results);
stack.push(results);
limbo_integrity_check(conn)?;
}
@ -204,7 +204,7 @@ pub(crate) fn execute_interaction(
};
let results = interaction.execute_fsync_query(conn.clone(), env);
tracing::debug!("{:?}", results);
tracing::debug!(?results);
stack.push(results);
let query_interaction = Interaction::Query(query.clone());
@ -235,7 +235,7 @@ pub(crate) fn execute_interaction(
};
let results = interaction.execute_faulty_query(&conn, env);
tracing::debug!("{:?}", results);
tracing::debug!(?results);
stack.push(results);
// Reset fault injection
env.io.inject_fault(false);

View file

@ -26,6 +26,9 @@ pub(crate) struct SimulatorFile {
/// Number of `sync` function calls (both success and failures).
pub(crate) nr_sync_calls: Cell<usize>,
/// Number of `sync` function calls with injected fault.
pub(crate) nr_sync_faults: Cell<usize>,
pub(crate) page_size: usize,
pub(crate) rng: RefCell<ChaCha8Rng>,
@ -107,6 +110,7 @@ impl File for SimulatorFile {
) -> Result<Arc<turso_core::Completion>> {
self.nr_pread_calls.set(self.nr_pread_calls.get() + 1);
if self.fault.get() {
tracing::debug!("pread fault");
self.nr_pread_faults.set(self.nr_pread_faults.get() + 1);
return Err(turso_core::LimboError::InternalError(
"Injected fault".into(),
@ -141,6 +145,7 @@ impl File for SimulatorFile {
) -> Result<Arc<turso_core::Completion>> {
self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1);
if self.fault.get() {
tracing::debug!("pwrite fault");
self.nr_pwrite_faults.set(self.nr_pwrite_faults.get() + 1);
return Err(turso_core::LimboError::InternalError(
"Injected fault".into(),
@ -169,6 +174,13 @@ impl File for SimulatorFile {
fn sync(&self, mut c: turso_core::Completion) -> Result<Arc<turso_core::Completion>> {
self.nr_sync_calls.set(self.nr_sync_calls.get() + 1);
if self.fault.get() {
tracing::debug!("sync fault");
self.nr_sync_faults.set(self.nr_sync_faults.get() + 1);
return Err(turso_core::LimboError::InternalError(
"Injected fault".into(),
));
}
if let Some(latency) = self.generate_latency_duration() {
let CompletionType::Sync(sync_completion) = &mut c.completion_type else {
unreachable!();

View file

@ -79,6 +79,7 @@ impl IO for SimulatorIO {
fault: Cell::new(false),
nr_pread_faults: Cell::new(0),
nr_pwrite_faults: Cell::new(0),
nr_sync_faults: Cell::new(0),
nr_pread_calls: Cell::new(0),
nr_pwrite_calls: Cell::new(0),
nr_sync_calls: Cell::new(0),

View file

@ -72,7 +72,8 @@ impl InteractionPlan {
}
Property::SelectLimit { .. }
| Property::SelectSelectOptimizer { .. }
| Property::FsyncNoWait { .. } => {}
| Property::FsyncNoWait { .. }
| Property::FaultyQuery { .. } => {}
}
}
// Check again after query clear if the interactions still uses the failing table