mirror of
https://github.com/tursodatabase/limbo.git
synced 2025-08-04 18:18:03 +00:00
introduce fsync interaction + property
This commit is contained in:
parent
f806d97d0f
commit
0288f4aac6
5 changed files with 185 additions and 51 deletions
|
@ -188,6 +188,10 @@ impl Display for InteractionPlan {
|
|||
writeln!(f, "-- ASSERT {};", assertion.message)?
|
||||
}
|
||||
Interaction::Fault(fault) => writeln!(f, "-- FAULT '{}';", fault)?,
|
||||
Interaction::FsyncQuery(query) => {
|
||||
writeln!(f, "-- FSYNC QUERY;")?;
|
||||
writeln!(f, "{};", query)?
|
||||
}
|
||||
}
|
||||
}
|
||||
writeln!(f, "-- end testing '{}'", name)?;
|
||||
|
@ -238,6 +242,9 @@ pub(crate) enum Interaction {
|
|||
Assumption(Assertion),
|
||||
Assertion(Assertion),
|
||||
Fault(Fault),
|
||||
/// Will attempt to run any random query. However, when the connection tries to sync it will
|
||||
/// close all connections and reopen the database and assert that no data was lost
|
||||
FsyncQuery(Query),
|
||||
}
|
||||
|
||||
impl Display for Interaction {
|
||||
|
@ -247,6 +254,7 @@ impl Display for Interaction {
|
|||
Self::Assumption(assumption) => write!(f, "ASSUME {}", assumption.message),
|
||||
Self::Assertion(assertion) => write!(f, "ASSERT {}", assertion.message),
|
||||
Self::Fault(fault) => write!(f, "FAULT '{}'", fault),
|
||||
Self::FsyncQuery(query) => write!(f, "{}", query),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -373,6 +381,8 @@ impl Interactions {
|
|||
select1.shadow(env);
|
||||
select2.shadow(env);
|
||||
}
|
||||
// Nothing should change
|
||||
Property::FsyncNoWait { .. } => {}
|
||||
}
|
||||
for interaction in property.interactions() {
|
||||
match interaction {
|
||||
|
@ -402,6 +412,8 @@ impl Interactions {
|
|||
Interaction::Assertion(_) => {}
|
||||
Interaction::Assumption(_) => {}
|
||||
Interaction::Fault(_) => {}
|
||||
// FsyncQuery should not shadow as we are not going to run it to completion
|
||||
Interaction::FsyncQuery(_) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -503,7 +515,9 @@ impl Interaction {
|
|||
pub(crate) fn shadow(&self, env: &mut SimulatorEnv) -> Vec<Vec<SimValue>> {
|
||||
match self {
|
||||
Self::Query(query) => query.shadow(env),
|
||||
Self::Assumption(_) | Self::Assertion(_) | Self::Fault(_) => vec![],
|
||||
Self::Assumption(_) | Self::Assertion(_) | Self::Fault(_) | Self::FsyncQuery(_) => {
|
||||
vec![]
|
||||
}
|
||||
}
|
||||
}
|
||||
pub(crate) fn execute_query(&self, conn: &mut Arc<Connection>, io: &SimulatorIO) -> ResultSet {
|
||||
|
@ -557,9 +571,6 @@ impl Interaction {
|
|||
env: &SimulatorEnv,
|
||||
) -> Result<()> {
|
||||
match self {
|
||||
Self::Query(_) => {
|
||||
unreachable!("unexpected: this function should only be called on assertions")
|
||||
}
|
||||
Self::Assertion(assertion) => {
|
||||
let result = assertion.func.as_ref()(stack, env);
|
||||
match result {
|
||||
|
@ -573,10 +584,7 @@ impl Interaction {
|
|||
))),
|
||||
}
|
||||
}
|
||||
Self::Assumption(_) => {
|
||||
unreachable!("unexpected: this function should only be called on assertions")
|
||||
}
|
||||
Self::Fault(_) => {
|
||||
_ => {
|
||||
unreachable!("unexpected: this function should only be called on assertions")
|
||||
}
|
||||
}
|
||||
|
@ -588,12 +596,6 @@ impl Interaction {
|
|||
env: &SimulatorEnv,
|
||||
) -> Result<()> {
|
||||
match self {
|
||||
Self::Query(_) => {
|
||||
unreachable!("unexpected: this function should only be called on assumptions")
|
||||
}
|
||||
Self::Assertion(_) => {
|
||||
unreachable!("unexpected: this function should only be called on assumptions")
|
||||
}
|
||||
Self::Assumption(assumption) => {
|
||||
let result = assumption.func.as_ref()(stack, env);
|
||||
match result {
|
||||
|
@ -607,7 +609,7 @@ impl Interaction {
|
|||
))),
|
||||
}
|
||||
}
|
||||
Self::Fault(_) => {
|
||||
_ => {
|
||||
unreachable!("unexpected: this function should only be called on assumptions")
|
||||
}
|
||||
}
|
||||
|
@ -615,15 +617,6 @@ impl Interaction {
|
|||
|
||||
pub(crate) fn execute_fault(&self, env: &mut SimulatorEnv, conn_index: usize) -> Result<()> {
|
||||
match self {
|
||||
Self::Query(_) => {
|
||||
unreachable!("unexpected: this function should only be called on faults")
|
||||
}
|
||||
Self::Assertion(_) => {
|
||||
unreachable!("unexpected: this function should only be called on faults")
|
||||
}
|
||||
Self::Assumption(_) => {
|
||||
unreachable!("unexpected: this function should only be called on faults")
|
||||
}
|
||||
Self::Fault(fault) => {
|
||||
match fault {
|
||||
Fault::Disconnect => {
|
||||
|
@ -637,36 +630,90 @@ impl Interaction {
|
|||
env.connections[conn_index] = SimConnection::Disconnected;
|
||||
}
|
||||
Fault::ReopenDatabase => {
|
||||
// 1. Close all connections without default checkpoint-on-close behavior
|
||||
// to expose bugs related to how we handle WAL
|
||||
let num_conns = env.connections.len();
|
||||
env.connections.clear();
|
||||
|
||||
// 2. Re-open database
|
||||
let db_path = env.db_path.clone();
|
||||
let db = match turso_core::Database::open_file(
|
||||
env.io.clone(),
|
||||
&db_path,
|
||||
false,
|
||||
false,
|
||||
) {
|
||||
Ok(db) => db,
|
||||
Err(e) => {
|
||||
panic!("error opening simulator test file {:?}: {:?}", db_path, e);
|
||||
}
|
||||
};
|
||||
env.db = db;
|
||||
|
||||
for _ in 0..num_conns {
|
||||
env.connections
|
||||
.push(SimConnection::LimboConnection(env.db.connect().unwrap()));
|
||||
}
|
||||
reopen_database(env);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
_ => {
|
||||
unreachable!("unexpected: this function should only be called on faults")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn execute_fsync_query(
|
||||
&self,
|
||||
conn: Arc<Connection>,
|
||||
env: &mut SimulatorEnv,
|
||||
) -> Result<()> {
|
||||
if let Self::FsyncQuery(query) = self {
|
||||
let query_str = query.to_string();
|
||||
let rows = conn.query(&query_str);
|
||||
if rows.is_err() {
|
||||
let err = rows.err();
|
||||
tracing::debug!(
|
||||
"Error running query '{}': {:?}",
|
||||
&query_str[0..query_str.len().min(4096)],
|
||||
err
|
||||
);
|
||||
return Err(err.unwrap());
|
||||
}
|
||||
let rows = rows?;
|
||||
assert!(rows.is_some());
|
||||
let mut rows = rows.unwrap();
|
||||
while let Ok(row) = rows.step() {
|
||||
match row {
|
||||
StepResult::IO => {
|
||||
let syncing = {
|
||||
let files = env.io.files.borrow();
|
||||
// TODO: currently assuming we only have 1 file that is syncing
|
||||
files
|
||||
.iter()
|
||||
.any(|file| file.sync_completion.borrow().is_some())
|
||||
};
|
||||
if syncing {
|
||||
reopen_database(env);
|
||||
} else {
|
||||
env.io.run_once().unwrap();
|
||||
}
|
||||
}
|
||||
StepResult::Done => {
|
||||
break;
|
||||
}
|
||||
StepResult::Row | StepResult::Interrupt | StepResult::Busy => {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
unreachable!("unexpected: this function should only be called on queries")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn reopen_database(env: &mut SimulatorEnv) {
|
||||
// 1. Close all connections without default checkpoint-on-close behavior
|
||||
// to expose bugs related to how we handle WAL
|
||||
let num_conns = env.connections.len();
|
||||
env.connections.clear();
|
||||
|
||||
// Clear all open files
|
||||
env.io.files.borrow_mut().clear();
|
||||
|
||||
// 2. Re-open database
|
||||
let db_path = env.db_path.clone();
|
||||
let db = match turso_core::Database::open_file(env.io.clone(), &db_path, false, false) {
|
||||
Ok(db) => db,
|
||||
Err(e) => {
|
||||
panic!("error opening simulator test file {:?}: {:?}", db_path, e);
|
||||
}
|
||||
};
|
||||
env.db = db;
|
||||
|
||||
for _ in 0..num_conns {
|
||||
env.connections
|
||||
.push(SimConnection::LimboConnection(env.db.connect().unwrap()));
|
||||
}
|
||||
}
|
||||
|
||||
fn random_create<R: rand::Rng>(rng: &mut R, _env: &SimulatorEnv) -> Interactions {
|
||||
|
|
|
@ -9,7 +9,7 @@ use crate::{
|
|||
select::{Distinctness, ResultColumn},
|
||||
Create, Delete, Drop, Insert, Query, Select,
|
||||
},
|
||||
table::SimValue,
|
||||
table::{SimValue, Table},
|
||||
},
|
||||
runner::env::SimulatorEnv,
|
||||
};
|
||||
|
@ -127,6 +127,10 @@ pub(crate) enum Property {
|
|||
table: String,
|
||||
predicate: Predicate,
|
||||
},
|
||||
FsyncNoWait {
|
||||
query: Query,
|
||||
tables: Vec<Table>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Property {
|
||||
|
@ -138,6 +142,7 @@ impl Property {
|
|||
Property::DeleteSelect { .. } => "Delete-Select",
|
||||
Property::DropSelect { .. } => "Drop-Select",
|
||||
Property::SelectSelectOptimizer { .. } => "Select-Select-Optimizer",
|
||||
Property::FsyncNoWait { .. } => "FsyncNoWait",
|
||||
}
|
||||
}
|
||||
/// interactions construct a list of interactions, which is an executable representation of the property.
|
||||
|
@ -428,6 +433,47 @@ impl Property {
|
|||
|
||||
vec![assumption, select1, select2, assertion]
|
||||
}
|
||||
Property::FsyncNoWait { query, tables } => {
|
||||
let checks = tables
|
||||
.iter()
|
||||
.map(|table| {
|
||||
let select = Interaction::Query(Query::Select(Select {
|
||||
table: table.name.clone(),
|
||||
result_columns: vec![ResultColumn::Star],
|
||||
predicate: Predicate::true_(),
|
||||
limit: None,
|
||||
distinct: Distinctness::All,
|
||||
}));
|
||||
let assertion = Interaction::Assertion(Assertion {
|
||||
message: format!(
|
||||
"table {} contains all of its values after the wal reopened",
|
||||
table.name
|
||||
),
|
||||
func: Box::new({
|
||||
let table = table.clone();
|
||||
move |stack: &Vec<ResultSet>, _: &SimulatorEnv| {
|
||||
let last = stack.last().unwrap();
|
||||
match last {
|
||||
Ok(vals) => {
|
||||
if *vals != table.rows {
|
||||
tracing::error!(table.name, ?vals, ?table.rows, "values mismatch after wal reopen");
|
||||
}
|
||||
Ok(*vals == table.rows)
|
||||
}
|
||||
Err(err) => {
|
||||
Err(LimboError::InternalError(format!("{}", err)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
});
|
||||
[select, assertion].into_iter()
|
||||
})
|
||||
.flatten();
|
||||
Vec::from_iter(
|
||||
std::iter::once(Interaction::FsyncQuery(query.clone())).chain(checks),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -697,6 +743,17 @@ fn property_select_select_optimizer<R: rand::Rng>(rng: &mut R, env: &SimulatorEn
|
|||
}
|
||||
}
|
||||
|
||||
fn property_fsync_no_wait<R: rand::Rng>(
|
||||
rng: &mut R,
|
||||
env: &SimulatorEnv,
|
||||
remaining: &Remaining,
|
||||
) -> Property {
|
||||
Property::FsyncNoWait {
|
||||
query: Query::arbitrary_from(rng, (env, remaining)),
|
||||
tables: env.tables.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
impl ArbitraryFrom<(&SimulatorEnv, &InteractionStats)> for Property {
|
||||
fn arbitrary_from<R: rand::Rng>(
|
||||
rng: &mut R,
|
||||
|
@ -754,6 +811,10 @@ impl ArbitraryFrom<(&SimulatorEnv, &InteractionStats)> for Property {
|
|||
},
|
||||
Box::new(|rng: &mut R| property_select_select_optimizer(rng, env)),
|
||||
),
|
||||
(
|
||||
50.0, // Freestyle number
|
||||
Box::new(|rng: &mut R| property_fsync_no_wait(rng, env, &remaining_)),
|
||||
),
|
||||
],
|
||||
rng,
|
||||
)
|
||||
|
|
|
@ -337,6 +337,15 @@ fn execute_interaction_rusqlite(
|
|||
tracing::debug!("{:?}", results);
|
||||
stack.push(results);
|
||||
}
|
||||
Interaction::FsyncQuery(_) => {
|
||||
let conn = match &env.connections[connection_index] {
|
||||
SimConnection::LimboConnection(conn) => conn.clone(),
|
||||
SimConnection::SQLiteConnection(_) => unreachable!(),
|
||||
SimConnection::Disconnected => unreachable!(),
|
||||
};
|
||||
|
||||
interaction.execute_fsync_query(conn.clone(), env)?;
|
||||
}
|
||||
Interaction::Assertion(_) => {
|
||||
interaction.execute_assertion(stack, env)?;
|
||||
stack.clear();
|
||||
|
|
|
@ -196,6 +196,22 @@ pub(crate) fn execute_interaction(
|
|||
stack.push(results);
|
||||
limbo_integrity_check(conn)?;
|
||||
}
|
||||
Interaction::FsyncQuery(_) => {
|
||||
let conn = match &env.connections[connection_index] {
|
||||
SimConnection::LimboConnection(conn) => conn.clone(),
|
||||
SimConnection::SQLiteConnection(_) => unreachable!(),
|
||||
SimConnection::Disconnected => unreachable!(),
|
||||
};
|
||||
|
||||
interaction.execute_fsync_query(conn.clone(), env)?;
|
||||
|
||||
let conn = match &env.connections[connection_index] {
|
||||
SimConnection::LimboConnection(conn) => conn,
|
||||
SimConnection::SQLiteConnection(_) => unreachable!(),
|
||||
SimConnection::Disconnected => unreachable!(),
|
||||
};
|
||||
limbo_integrity_check(conn)?;
|
||||
}
|
||||
Interaction::Assertion(_) => {
|
||||
interaction.execute_assertion(stack, env)?;
|
||||
stack.clear();
|
||||
|
@ -217,7 +233,7 @@ pub(crate) fn execute_interaction(
|
|||
Ok(ExecutionContinuation::NextInteraction)
|
||||
}
|
||||
|
||||
fn limbo_integrity_check(conn: &mut Arc<Connection>) -> Result<()> {
|
||||
fn limbo_integrity_check(conn: &Arc<Connection>) -> Result<()> {
|
||||
let mut rows = conn.query("PRAGMA integrity_check;")?.unwrap();
|
||||
let mut result = Vec::new();
|
||||
|
||||
|
|
|
@ -71,7 +71,8 @@ impl InteractionPlan {
|
|||
queries.clear();
|
||||
}
|
||||
Property::SelectLimit { .. }
|
||||
| Property::SelectSelectOptimizer { .. } => {}
|
||||
| Property::SelectSelectOptimizer { .. }
|
||||
| Property::FsyncNoWait { .. } => {}
|
||||
}
|
||||
}
|
||||
// Check again after query clear if the interactions still uses the failing table
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue