use ticks as the main simulator driver, handle disconnects correctly, add multi-connection setup

This commit is contained in:
alpaylan 2024-12-19 23:40:04 -05:00
parent cb20ca7e40
commit b355568023
2 changed files with 136 additions and 48 deletions

View file

@ -9,7 +9,7 @@ use crate::{
query::{Create, Insert, Predicate, Query, Select},
table::Value,
},
SimulatorEnv, SimulatorOpts,
SimConnection, SimulatorEnv, SimulatorOpts,
};
use crate::generation::{frequency, Arbitrary, ArbitraryFrom};
@ -21,6 +21,7 @@ pub(crate) type ResultSet = Vec<Vec<Value>>;
pub(crate) struct InteractionPlan {
pub(crate) plan: Vec<Interaction>,
pub(crate) stack: Vec<ResultSet>,
pub(crate) interaction_pointer: usize,
}
impl Display for InteractionPlan {
@ -31,6 +32,7 @@ impl Display for InteractionPlan {
Interaction::Assertion(assertion) => {
write!(f, "-- ASSERT: {};\n", assertion.message)?
}
Interaction::Fault(fault) => write!(f, "-- FAULT: {};\n", fault)?,
}
}
@ -58,6 +60,7 @@ impl Display for InteractionStats {
pub(crate) enum Interaction {
Query(Query),
Assertion(Assertion),
Fault(Fault),
}
impl Display for Interaction {
@ -65,6 +68,7 @@ impl Display for Interaction {
match self {
Interaction::Query(query) => write!(f, "{}", query),
Interaction::Assertion(assertion) => write!(f, "ASSERT: {}", assertion.message),
Interaction::Fault(fault) => write!(f, "FAULT: {}", fault),
}
}
}
@ -74,6 +78,18 @@ pub(crate) struct Assertion {
pub(crate) message: String,
}
pub(crate) enum Fault {
Disconnect,
}
impl Display for Fault {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Fault::Disconnect => write!(f, "DISCONNECT"),
}
}
}
pub(crate) struct Interactions(Vec<Interaction>);
impl Interactions {
@ -96,6 +112,7 @@ impl Interactions {
Query::Select(_) => {}
},
Interaction::Assertion(_) => {}
Interaction::Fault(_) => {}
}
}
}
@ -106,6 +123,7 @@ impl InteractionPlan {
InteractionPlan {
plan: Vec::new(),
stack: Vec::new(),
interaction_pointer: 0,
}
}
@ -127,6 +145,7 @@ impl InteractionPlan {
Query::Create(_) => {}
},
Interaction::Assertion(_) => {}
Interaction::Fault(_) => {}
}
}
@ -223,6 +242,9 @@ impl Interaction {
Interaction::Assertion(_) => {
unreachable!("unexpected: this function should only be called on queries")
}
Interaction::Fault(fault) => {
unreachable!("unexpected: this function should only be called on queries")
}
}
}
@ -239,6 +261,38 @@ impl Interaction {
}
Ok(())
}
Interaction::Fault(_) => {
unreachable!("unexpected: this function should only be called on assertions")
}
}
}
pub(crate) fn execute_fault(&self, env: &mut SimulatorEnv, conn_index: usize) -> Result<()> {
match self {
Interaction::Query(_) => {
unreachable!("unexpected: this function should only be called on faults")
}
Interaction::Assertion(_) => {
unreachable!("unexpected: this function should only be called on faults")
}
Interaction::Fault(fault) => {
match fault {
Fault::Disconnect => {
match env.connections[conn_index] {
SimConnection::Connected(ref mut conn) => {
conn.close()?;
}
SimConnection::Disconnected => {
return Err(limbo_core::LimboError::InternalError(
"Tried to disconnect a disconnected connection".to_string(),
));
}
}
env.connections[conn_index] = SimConnection::Disconnected;
}
}
Ok(())
}
}
}
}
@ -307,6 +361,11 @@ fn random_write<R: rand::Rng>(rng: &mut R, env: &SimulatorEnv) -> Interactions {
Interactions(vec![insert_query])
}
fn random_fault<R: rand::Rng>(rng: &mut R, env: &SimulatorEnv) -> Interactions {
let fault = Interaction::Fault(Fault::Disconnect);
Interactions(vec![fault])
}
impl ArbitraryFrom<(&SimulatorEnv, InteractionStats)> for Interactions {
fn arbitrary_from<R: rand::Rng>(
rng: &mut R,
@ -334,6 +393,7 @@ impl ArbitraryFrom<(&SimulatorEnv, InteractionStats)> for Interactions {
Box::new(|rng: &mut R| random_write(rng, env)),
),
(1, Box::new(|rng: &mut R| create_table(rng, env))),
(1, Box::new(|rng: &mut R| random_fault(rng, env))),
],
rng,
)

View file

@ -1,4 +1,4 @@
use generation::plan::{Interaction, ResultSet};
use generation::plan::{Interaction, InteractionPlan, ResultSet};
use generation::{pick, pick_index, Arbitrary, ArbitraryFrom};
use limbo_core::{Connection, Database, File, OpenFlags, PlatformIO, Result, RowResult, IO};
use model::query::{Create, Insert, Predicate, Query, Select};
@ -66,7 +66,7 @@ fn main() {
};
let opts = SimulatorOpts {
ticks: rng.gen_range(0..4096),
ticks: rng.gen_range(0..1024),
max_connections: 1, // TODO: for now let's use one connection as we didn't implement
// correct transactions procesing
max_tables: rng.gen_range(0..128),
@ -74,7 +74,7 @@ fn main() {
write_percent,
delete_percent,
page_size: 4096, // TODO: randomize this too
max_interactions: rng.gen_range(0..10000),
max_interactions: rng.gen_range(0..1024),
};
let io = Arc::new(SimulatorIO::new(seed, opts.page_size).unwrap());
@ -101,63 +101,87 @@ fn main() {
println!("Initial opts {:?}", env.opts);
log::info!("Generating database interaction plan...");
let mut plan = generation::plan::InteractionPlan::arbitrary_from(&mut env.rng.clone(), &env);
let mut plans = (1..=env.opts.max_connections)
.map(|_| InteractionPlan::arbitrary_from(&mut env.rng.clone(), &env))
.collect::<Vec<_>>();
log::info!("{}", plan.stats());
log::info!("{}", plans[0].stats());
for interaction in &plan.plan {
let connection_index = pick_index(env.connections.len(), &mut env.rng);
let mut connection = env.connections[connection_index].clone();
log::info!("Executing database interaction plan...");
let result = execute_plans(&mut env, &mut plans);
if matches!(connection, SimConnection::Disconnected) {
connection = SimConnection::Connected(env.db.connect());
env.connections[connection_index] = connection.clone();
}
match &mut connection {
SimConnection::Connected(conn) => {
let disconnect = env.rng.gen_ratio(1, 100);
if disconnect {
log::info!("disconnecting {}", connection_index);
let _ = conn.close();
env.connections[connection_index] = SimConnection::Disconnected;
} else {
match process_connection(conn, interaction, &mut plan.stack) {
Ok(_) => {
log::info!("connection {} processed", connection_index);
}
Err(err) => {
log::error!("error {}", err);
log::debug!("db is at {:?}", path);
// save the interaction plan
let mut path = TempDir::new().unwrap().into_path();
path.push("simulator.plan");
let mut f = std::fs::File::create(path.clone()).unwrap();
f.write(plan.to_string().as_bytes()).unwrap();
log::debug!("plan saved at {:?}", path);
log::debug!("seed was {}", seed);
break;
}
}
}
}
SimConnection::Disconnected => {
log::info!("disconnecting {}", connection_index);
env.connections[connection_index] = SimConnection::Connected(env.db.connect());
}
}
if result.is_err() {
log::error!("error executing plans: {:?}", result.err());
}
log::info!("db is at {:?}", path);
let mut path = TempDir::new().unwrap().into_path();
path.push("simulator.plan");
let mut f = std::fs::File::create(path.clone()).unwrap();
f.write(plans[0].to_string().as_bytes()).unwrap();
log::info!("plan saved at {:?}", path);
log::info!("seed was {}", seed);
env.io.print_stats();
}
fn process_connection(
conn: &mut Rc<Connection>,
fn execute_plans(env: &mut SimulatorEnv, plans: &mut Vec<InteractionPlan>) -> Result<()> {
// todo: add history here by recording which interaction was executed at which tick
for _tick in 0..env.opts.ticks {
// Pick the connection to interact with
let connection_index = pick_index(env.connections.len(), &mut env.rng);
// Execute the interaction for the selected connection
execute_plan(env, connection_index, plans)?;
}
Ok(())
}
fn execute_plan(
env: &mut SimulatorEnv,
connection_index: usize,
plans: &mut Vec<InteractionPlan>,
) -> Result<()> {
let connection = &env.connections[connection_index];
let plan = &mut plans[connection_index];
if plan.interaction_pointer >= plan.plan.len() {
return Ok(());
}
let interaction = &plan.plan[plan.interaction_pointer];
if let SimConnection::Disconnected = connection {
log::info!("connecting {}", connection_index);
env.connections[connection_index] = SimConnection::Connected(env.db.connect());
} else {
match execute_interaction(env, connection_index, interaction, &mut plan.stack) {
Ok(_) => {
log::debug!("connection {} processed", connection_index);
plan.interaction_pointer += 1;
}
Err(err) => {
log::error!("error {}", err);
return Err(err);
}
}
}
Ok(())
}
fn execute_interaction(
env: &mut SimulatorEnv,
connection_index: usize,
interaction: &Interaction,
stack: &mut Vec<ResultSet>,
) -> Result<()> {
match interaction {
generation::plan::Interaction::Query(_) => {
let conn = match &mut env.connections[connection_index] {
SimConnection::Connected(conn) => conn,
SimConnection::Disconnected => unreachable!(),
};
log::debug!("{}", interaction);
let results = interaction.execute_query(conn)?;
log::debug!("{:?}", results);
@ -165,6 +189,10 @@ fn process_connection(
}
generation::plan::Interaction::Assertion(_) => {
interaction.execute_assertion(stack)?;
stack.clear();
}
Interaction::Fault(_) => {
interaction.execute_fault(env, connection_index)?;
}
}