mirror of
https://github.com/tursodatabase/limbo.git
synced 2025-12-23 08:21:09 +00:00
## Background In the simulator, we do our best to replicate the effects of an interactive transaction into the simulator's shadow state whenever that transaction commits. ## Problem this didn't work: BEGIN; ALTER TABLE t ADD COLUMN foo; DELETE FROM t WHERE bar != 5; COMMIT; None of the rows where bar != 5 were deleted because apply_snapshot() was checking that the rows in the committed table were exactly equal to the rows that were recorded, but since the recorded deletes contained a NULL `foo` column, they never matched. This meant that the sim thought it should still have all the rows that were deleted. ## Fix: like all the other operations, record add column / drop column too so that they are applied in sequential order in apply_snapshot() No explicit test for this - I ran into this in another branch of mine whose seed doesn't reproduce on main (because I changed the simulator in that branch). Reviewed-by: Pedro Muniz (@pedrocarlo) Closes #4264
947 lines
33 KiB
Rust
947 lines
33 KiB
Rust
use std::fmt::Display;
|
|
use std::mem;
|
|
use std::ops::{Deref, DerefMut};
|
|
use std::panic::UnwindSafe;
|
|
use std::path::{Path, PathBuf};
|
|
use std::sync::Arc;
|
|
|
|
use bitmaps::Bitmap;
|
|
use garde::Validate;
|
|
use rand::{Rng, SeedableRng};
|
|
use rand_chacha::ChaCha8Rng;
|
|
use sql_generation::generation::GenerationContext;
|
|
use sql_generation::model::query::transaction::Rollback;
|
|
use sql_generation::model::table::{SimValue, Table};
|
|
use tracing::trace;
|
|
use turso_core::Database;
|
|
|
|
use crate::generation::Shadow;
|
|
use crate::model::Query;
|
|
use crate::profiles::Profile;
|
|
use crate::runner::SimIO;
|
|
use crate::runner::cli::IoBackend;
|
|
use crate::runner::io::SimulatorIO;
|
|
use crate::runner::memory::io::MemorySimIO;
|
|
const DEFAULT_CACHE_SIZE: usize = 2000;
|
|
use super::cli::SimulatorCLI;
|
|
|
|
#[derive(Debug, Copy, Clone)]
|
|
pub(crate) enum SimulationType {
|
|
Default,
|
|
Doublecheck,
|
|
Differential,
|
|
}
|
|
|
|
#[derive(Debug, Copy, Clone)]
|
|
pub(crate) enum SimulationPhase {
|
|
Test,
|
|
Shrink,
|
|
}
|
|
|
|
/// Represents a single operation during a transaction, applied in order.
|
|
#[derive(Debug, Clone)]
|
|
pub enum RowOperation {
|
|
Insert {
|
|
table_name: String,
|
|
row: Vec<SimValue>,
|
|
},
|
|
Delete {
|
|
table_name: String,
|
|
row: Vec<SimValue>,
|
|
},
|
|
DropTable {
|
|
table_name: String,
|
|
},
|
|
RenameTable {
|
|
old_name: String,
|
|
new_name: String,
|
|
},
|
|
AddColumn {
|
|
table_name: String,
|
|
column: sql_generation::model::table::Column,
|
|
},
|
|
DropColumn {
|
|
table_name: String,
|
|
column_index: usize,
|
|
},
|
|
RenameColumn {
|
|
table_name: String,
|
|
old_name: String,
|
|
new_name: String,
|
|
},
|
|
AlterColumn {
|
|
table_name: String,
|
|
old_name: String,
|
|
new_column: sql_generation::model::table::Column,
|
|
},
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub struct TransactionTables {
|
|
/// The current state after applying transaction's changes (used for reads within the transaction)
|
|
current_tables: Vec<Table>,
|
|
/// Operations recorded during this transaction, in order
|
|
operations: Vec<RowOperation>,
|
|
}
|
|
|
|
impl TransactionTables {
|
|
pub fn new(tables: Vec<Table>) -> Self {
|
|
Self {
|
|
current_tables: tables,
|
|
operations: Vec::new(),
|
|
}
|
|
}
|
|
|
|
pub fn record_insert(&mut self, table_name: String, row: Vec<SimValue>) {
|
|
self.operations
|
|
.push(RowOperation::Insert { table_name, row });
|
|
}
|
|
|
|
pub fn record_delete(&mut self, table_name: String, row: Vec<SimValue>) {
|
|
self.operations
|
|
.push(RowOperation::Delete { table_name, row });
|
|
}
|
|
|
|
pub fn record_drop_table(&mut self, table_name: String) {
|
|
self.operations.push(RowOperation::DropTable { table_name });
|
|
}
|
|
|
|
pub fn record_rename_table(&mut self, old_name: String, new_name: String) {
|
|
self.operations
|
|
.push(RowOperation::RenameTable { old_name, new_name });
|
|
}
|
|
|
|
pub fn record_add_column(
|
|
&mut self,
|
|
table_name: String,
|
|
column: sql_generation::model::table::Column,
|
|
) {
|
|
self.operations
|
|
.push(RowOperation::AddColumn { table_name, column });
|
|
}
|
|
|
|
pub fn record_drop_column(&mut self, table_name: String, column_index: usize) {
|
|
self.operations.push(RowOperation::DropColumn {
|
|
table_name,
|
|
column_index,
|
|
});
|
|
}
|
|
|
|
pub fn record_rename_column(&mut self, table_name: String, old_name: String, new_name: String) {
|
|
self.operations.push(RowOperation::RenameColumn {
|
|
table_name,
|
|
old_name,
|
|
new_name,
|
|
});
|
|
}
|
|
|
|
pub fn record_alter_column(
|
|
&mut self,
|
|
table_name: String,
|
|
old_name: String,
|
|
new_column: sql_generation::model::table::Column,
|
|
) {
|
|
self.operations.push(RowOperation::AlterColumn {
|
|
table_name,
|
|
old_name,
|
|
new_column,
|
|
});
|
|
}
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct ShadowTables<'a> {
|
|
commited_tables: &'a Vec<Table>,
|
|
transaction_tables: Option<&'a TransactionTables>,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct ShadowTablesMut<'a> {
|
|
commited_tables: &'a mut Vec<Table>,
|
|
transaction_tables: &'a mut Option<TransactionTables>,
|
|
}
|
|
|
|
impl<'a> ShadowTables<'a> {
|
|
fn tables(&self) -> &'a Vec<Table> {
|
|
self.transaction_tables
|
|
.map_or(self.commited_tables, |v| &v.current_tables)
|
|
}
|
|
}
|
|
|
|
impl<'a> Deref for ShadowTables<'a> {
|
|
type Target = Vec<Table>;
|
|
|
|
fn deref(&self) -> &Self::Target {
|
|
self.tables()
|
|
}
|
|
}
|
|
|
|
impl<'a, 'b> ShadowTablesMut<'a>
|
|
where
|
|
'a: 'b,
|
|
{
|
|
fn tables(&'a self) -> &'a Vec<Table> {
|
|
self.transaction_tables
|
|
.as_ref()
|
|
.map(|t| &t.current_tables)
|
|
.unwrap_or(self.commited_tables)
|
|
}
|
|
|
|
fn tables_mut(&'b mut self) -> &'b mut Vec<Table> {
|
|
self.transaction_tables
|
|
.as_mut()
|
|
.map(|t| &mut t.current_tables)
|
|
.unwrap_or(self.commited_tables)
|
|
}
|
|
|
|
/// Record that a row was inserted during the current transaction
|
|
pub fn record_insert(&mut self, table_name: String, row: Vec<SimValue>) {
|
|
if let Some(txn) = &mut *self.transaction_tables {
|
|
txn.record_insert(table_name, row);
|
|
}
|
|
}
|
|
|
|
/// Record that a row was deleted during the current transaction
|
|
pub fn record_delete(&mut self, table_name: String, row: Vec<SimValue>) {
|
|
if let Some(txn) = &mut *self.transaction_tables {
|
|
txn.record_delete(table_name, row);
|
|
}
|
|
}
|
|
|
|
/// Record that a table was dropped during the current transaction
|
|
pub fn record_drop_table(&mut self, table_name: String) {
|
|
if let Some(txn) = &mut *self.transaction_tables {
|
|
txn.record_drop_table(table_name);
|
|
}
|
|
}
|
|
|
|
/// Record that a table was renamed during the current transaction
|
|
pub fn record_rename_table(&mut self, old_name: String, new_name: String) {
|
|
if let Some(txn) = &mut *self.transaction_tables {
|
|
txn.record_rename_table(old_name, new_name);
|
|
}
|
|
}
|
|
|
|
/// Record that a column was added during the current transaction
|
|
pub fn record_add_column(
|
|
&mut self,
|
|
table_name: String,
|
|
column: sql_generation::model::table::Column,
|
|
) {
|
|
if let Some(txn) = &mut *self.transaction_tables {
|
|
txn.record_add_column(table_name, column);
|
|
}
|
|
}
|
|
|
|
/// Record that a column was dropped during the current transaction
|
|
pub fn record_drop_column(&mut self, table_name: String, column_index: usize) {
|
|
if let Some(txn) = &mut *self.transaction_tables {
|
|
txn.record_drop_column(table_name, column_index);
|
|
}
|
|
}
|
|
|
|
/// Record that a column was renamed during the current transaction
|
|
pub fn record_rename_column(&mut self, table_name: String, old_name: String, new_name: String) {
|
|
if let Some(txn) = &mut *self.transaction_tables {
|
|
txn.record_rename_column(table_name, old_name, new_name);
|
|
}
|
|
}
|
|
|
|
/// Record that a column was altered during the current transaction
|
|
pub fn record_alter_column(
|
|
&mut self,
|
|
table_name: String,
|
|
old_name: String,
|
|
new_column: sql_generation::model::table::Column,
|
|
) {
|
|
if let Some(txn) = &mut *self.transaction_tables {
|
|
txn.record_alter_column(table_name, old_name, new_column);
|
|
}
|
|
}
|
|
|
|
pub fn create_snapshot(&mut self) {
|
|
*self.transaction_tables = Some(TransactionTables::new(self.commited_tables.clone()));
|
|
}
|
|
|
|
pub fn apply_snapshot(&mut self) {
|
|
if let Some(transaction_tables) = self.transaction_tables.take() {
|
|
// Apply all operations in recorded order.
|
|
// This ensures operations like ADD COLUMN, DELETE are applied correctly
|
|
// where DELETE sees rows with the same shape as when it was recorded.
|
|
for op in &transaction_tables.operations {
|
|
match op {
|
|
RowOperation::Insert { table_name, row } => {
|
|
let committed = self
|
|
.commited_tables
|
|
.iter_mut()
|
|
.find(|t| &t.name == table_name)
|
|
.expect("Table should exist in committed tables");
|
|
committed.rows.push(row.clone());
|
|
}
|
|
RowOperation::Delete { table_name, row } => {
|
|
let committed = self
|
|
.commited_tables
|
|
.iter_mut()
|
|
.find(|t| &t.name == table_name)
|
|
.expect("Table should exist in committed tables");
|
|
if let Some(pos) = committed.rows.iter().position(|r| r == row) {
|
|
committed.rows.remove(pos);
|
|
}
|
|
}
|
|
RowOperation::DropTable { table_name } => {
|
|
self.commited_tables.retain(|t| &t.name != table_name);
|
|
}
|
|
RowOperation::RenameTable { old_name, new_name } => {
|
|
let committed = self
|
|
.commited_tables
|
|
.iter_mut()
|
|
.find(|t| &t.name == old_name)
|
|
.expect("Table should exist in committed tables");
|
|
committed.name = new_name.clone();
|
|
}
|
|
RowOperation::AddColumn { table_name, column } => {
|
|
let committed = self
|
|
.commited_tables
|
|
.iter_mut()
|
|
.find(|t| &t.name == table_name)
|
|
.expect("Table should exist in committed tables");
|
|
let txn_table = transaction_tables
|
|
.current_tables
|
|
.iter()
|
|
.find(|t| &t.name == table_name)
|
|
.expect("Transaction table should exist");
|
|
assert!(
|
|
txn_table.columns.len() > committed.columns.len(),
|
|
"Transaction table should have more columns than committed table"
|
|
);
|
|
committed.columns.push(column.clone());
|
|
let new_col_count = committed.columns.len();
|
|
// Add NULL only for rows that need it.
|
|
// Rows inserted after ADD COLUMN in the same transaction
|
|
// already have the correct number of values.
|
|
for row in &mut committed.rows {
|
|
while row.len() < new_col_count {
|
|
row.push(SimValue::NULL);
|
|
}
|
|
}
|
|
}
|
|
RowOperation::DropColumn {
|
|
table_name,
|
|
column_index,
|
|
} => {
|
|
let committed = self
|
|
.commited_tables
|
|
.iter_mut()
|
|
.find(|t| &t.name == table_name)
|
|
.expect("Table should exist in committed tables");
|
|
let txn_table = transaction_tables
|
|
.current_tables
|
|
.iter()
|
|
.find(|t| &t.name == table_name)
|
|
.expect("Transaction table should exist");
|
|
assert!(
|
|
txn_table.columns.len() < committed.columns.len(),
|
|
"Transaction table should have fewer columns than committed table"
|
|
);
|
|
let old_col_count = committed.columns.len();
|
|
committed.columns.remove(*column_index);
|
|
// Only remove from rows that have the old column count.
|
|
// Rows inserted after DROP COLUMN in the same transaction
|
|
// already have the correct (new) number of values.
|
|
for row in &mut committed.rows {
|
|
if row.len() == old_col_count {
|
|
row.remove(*column_index);
|
|
}
|
|
}
|
|
}
|
|
RowOperation::RenameColumn {
|
|
table_name,
|
|
old_name,
|
|
new_name,
|
|
} => {
|
|
let committed = self
|
|
.commited_tables
|
|
.iter_mut()
|
|
.find(|t| &t.name == table_name)
|
|
.expect("Table should exist in committed tables");
|
|
let col = committed
|
|
.columns
|
|
.iter_mut()
|
|
.find(|c| &c.name == old_name)
|
|
.expect("Column should exist");
|
|
col.name = new_name.clone();
|
|
// Update index column names
|
|
for index in &mut committed.indexes {
|
|
for (col_name, _) in &mut index.columns {
|
|
if col_name == old_name {
|
|
*col_name = new_name.clone();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
RowOperation::AlterColumn {
|
|
table_name,
|
|
old_name,
|
|
new_column,
|
|
} => {
|
|
if let Some(committed) = self
|
|
.commited_tables
|
|
.iter_mut()
|
|
.find(|t| &t.name == table_name)
|
|
{
|
|
if let Some(col) =
|
|
committed.columns.iter_mut().find(|c| &c.name == old_name)
|
|
{
|
|
*col = new_column.clone();
|
|
}
|
|
// Update index column names if the column was renamed
|
|
for index in &mut committed.indexes {
|
|
for (col_name, _) in &mut index.columns {
|
|
if col_name == old_name {
|
|
*col_name = new_column.name.clone();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sync indexes and add new tables created in the transaction.
|
|
for current_table in &transaction_tables.current_tables {
|
|
if let Some(committed) = self
|
|
.commited_tables
|
|
.iter_mut()
|
|
.find(|t| t.name == current_table.name)
|
|
{
|
|
// Sync indexes (CreateIndex/DropIndex don't have RowOperations yet)
|
|
committed.indexes = current_table.indexes.clone();
|
|
} else {
|
|
// New table created in transaction - copy with rows intact.
|
|
self.commited_tables.push(current_table.clone());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn delete_snapshot(&mut self) {
|
|
*self.transaction_tables = None;
|
|
}
|
|
}
|
|
|
|
impl<'a> Deref for ShadowTablesMut<'a> {
|
|
type Target = Vec<Table>;
|
|
|
|
fn deref(&self) -> &Self::Target {
|
|
self.tables()
|
|
}
|
|
}
|
|
|
|
impl<'a> DerefMut for ShadowTablesMut<'a> {
|
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
self.tables_mut()
|
|
}
|
|
}
|
|
|
|
pub(crate) struct SimulatorEnv {
|
|
pub(crate) opts: SimulatorOpts,
|
|
pub profile: Profile,
|
|
pub(crate) connections: Vec<SimConnection>,
|
|
pub(crate) io: Arc<dyn SimIO>,
|
|
pub(crate) db: Option<Arc<Database>>,
|
|
pub(crate) rng: ChaCha8Rng,
|
|
|
|
seed: u64,
|
|
pub(crate) paths: Paths,
|
|
pub(crate) type_: SimulationType,
|
|
pub(crate) phase: SimulationPhase,
|
|
pub io_backend: IoBackend,
|
|
|
|
/// If connection state is None, means we are not in a transaction
|
|
pub connection_tables: Vec<Option<TransactionTables>>,
|
|
/// Bit map indicating whether a connection has executed a query that is not transaction related
|
|
///
|
|
/// E.g Select, Insert, Create
|
|
/// and not Begin, Commit, Rollback \
|
|
/// Has max size of 64 to accomodate 64 connections
|
|
connection_last_query: Bitmap<64>,
|
|
// Table data that is committed into the database or wal
|
|
pub committed_tables: Vec<Table>,
|
|
}
|
|
|
|
impl UnwindSafe for SimulatorEnv {}
|
|
|
|
impl SimulatorEnv {
|
|
pub(crate) fn clone_without_connections(&self) -> Self {
|
|
SimulatorEnv {
|
|
opts: self.opts.clone(),
|
|
io: self.io.clone(),
|
|
db: self.db.clone(),
|
|
rng: self.rng.clone(),
|
|
seed: self.seed,
|
|
paths: self.paths.clone(),
|
|
type_: self.type_,
|
|
phase: self.phase,
|
|
io_backend: self.io_backend,
|
|
profile: self.profile.clone(),
|
|
connections: (0..self.connections.len())
|
|
.map(|_| SimConnection::Disconnected)
|
|
.collect(),
|
|
// TODO: not sure if connection_tables should be recreated instead
|
|
connection_tables: self.connection_tables.clone(),
|
|
connection_last_query: self.connection_last_query,
|
|
committed_tables: self.committed_tables.clone(),
|
|
}
|
|
}
|
|
|
|
pub(crate) fn clear(&mut self) {
|
|
self.clear_tables();
|
|
self.connections.iter_mut().for_each(|c| c.disconnect());
|
|
self.rng = ChaCha8Rng::seed_from_u64(self.opts.seed);
|
|
|
|
let latency_prof = &self.profile.io.latency;
|
|
|
|
let io: Arc<dyn SimIO> = match self.io_backend {
|
|
IoBackend::Memory => Arc::new(MemorySimIO::new(
|
|
self.opts.seed,
|
|
self.opts.page_size,
|
|
latency_prof.latency_probability,
|
|
latency_prof.min_tick,
|
|
latency_prof.max_tick,
|
|
)),
|
|
_ => Arc::new(
|
|
SimulatorIO::new(
|
|
self.opts.seed,
|
|
self.opts.page_size,
|
|
latency_prof.latency_probability,
|
|
latency_prof.min_tick,
|
|
latency_prof.max_tick,
|
|
self.io_backend,
|
|
)
|
|
.unwrap(),
|
|
),
|
|
};
|
|
|
|
// Remove existing database file
|
|
let db_path = self.get_db_path();
|
|
if db_path.exists() {
|
|
std::fs::remove_file(&db_path).unwrap();
|
|
}
|
|
|
|
let wal_path = db_path.with_extension("db-wal");
|
|
if wal_path.exists() {
|
|
std::fs::remove_file(&wal_path).unwrap();
|
|
}
|
|
self.db = None;
|
|
|
|
let db = match Database::open_file_with_flags(
|
|
io.clone(),
|
|
db_path.to_str().unwrap(),
|
|
turso_core::OpenFlags::default(),
|
|
turso_core::DatabaseOpts::new()
|
|
.with_mvcc(self.profile.experimental_mvcc)
|
|
.with_autovacuum(true),
|
|
None,
|
|
) {
|
|
Ok(db) => db,
|
|
Err(e) => {
|
|
tracing::error!(%e);
|
|
panic!("error opening simulator test file {db_path:?}: {e:?}");
|
|
}
|
|
};
|
|
self.io = io;
|
|
self.db = Some(db);
|
|
}
|
|
|
|
pub(crate) fn get_db_path(&self) -> PathBuf {
|
|
self.paths.db(&self.type_, &self.phase)
|
|
}
|
|
|
|
pub(crate) fn get_plan_path(&self) -> PathBuf {
|
|
self.paths.plan(&self.type_, &self.phase)
|
|
}
|
|
|
|
pub(crate) fn clone_as(&self, simulation_type: SimulationType) -> Self {
|
|
let mut env = self.clone_without_connections();
|
|
env.type_ = simulation_type;
|
|
env.clear();
|
|
env
|
|
}
|
|
|
|
pub(crate) fn clone_at_phase(&self, phase: SimulationPhase) -> Self {
|
|
let mut env = self.clone_without_connections();
|
|
env.phase = phase;
|
|
env.clear();
|
|
env
|
|
}
|
|
|
|
pub fn choose_conn(&self, rng: &mut impl Rng) -> usize {
|
|
rng.random_range(0..self.connections.len())
|
|
}
|
|
|
|
/// Rng only used for generating interactions. By having a separate Rng we can guarantee that a particular seed
|
|
/// will always create the same interactions plan, regardless of the changes that happen in the Database code
|
|
pub fn gen_rng(&self) -> ChaCha8Rng {
|
|
// Seed + 1 so that there is no relation with the original seed, and so we have no way to accidently generate
|
|
// the first Create statement twice in a row
|
|
ChaCha8Rng::seed_from_u64(self.seed + 1)
|
|
}
|
|
}
|
|
|
|
impl SimulatorEnv {
|
|
pub(crate) fn new(
|
|
seed: u64,
|
|
cli_opts: &SimulatorCLI,
|
|
paths: Paths,
|
|
simulation_type: SimulationType,
|
|
profile: &Profile,
|
|
) -> Self {
|
|
let mut rng = ChaCha8Rng::seed_from_u64(seed);
|
|
|
|
let mut opts = SimulatorOpts {
|
|
seed,
|
|
ticks: usize::MAX,
|
|
disable_select_optimizer: cli_opts.disable_select_optimizer,
|
|
disable_insert_values_select: cli_opts.disable_insert_values_select,
|
|
disable_double_create_failure: cli_opts.disable_double_create_failure,
|
|
disable_select_limit: cli_opts.disable_select_limit,
|
|
disable_delete_select: cli_opts.disable_delete_select,
|
|
disable_drop_select: cli_opts.disable_drop_select,
|
|
disable_where_true_false_null: cli_opts.disable_where_true_false_null,
|
|
disable_union_all_preserves_cardinality: cli_opts
|
|
.disable_union_all_preserves_cardinality,
|
|
disable_fsync_no_wait: cli_opts.disable_fsync_no_wait,
|
|
disable_faulty_query: cli_opts.disable_faulty_query,
|
|
page_size: 4096, // TODO: randomize this too
|
|
max_interactions: rng.random_range(cli_opts.minimum_tests..=cli_opts.maximum_tests),
|
|
max_time_simulation: cli_opts.maximum_time,
|
|
disable_reopen_database: cli_opts.disable_reopen_database,
|
|
disable_integrity_check: cli_opts.disable_integrity_check,
|
|
cache_size: profile.cache_size_pages.unwrap_or(DEFAULT_CACHE_SIZE),
|
|
};
|
|
|
|
// Remove existing database file if it exists
|
|
let db_path = paths.db(&simulation_type, &SimulationPhase::Test);
|
|
|
|
if db_path.exists() {
|
|
std::fs::remove_file(&db_path).unwrap();
|
|
}
|
|
|
|
let wal_path = db_path.with_extension("db-wal");
|
|
if wal_path.exists() {
|
|
std::fs::remove_file(&wal_path).unwrap();
|
|
}
|
|
|
|
let mut profile = profile.clone();
|
|
// Conditionals here so that we can override some profile options from the CLI
|
|
if let Some(mvcc) = cli_opts.experimental_mvcc {
|
|
profile.experimental_mvcc = mvcc;
|
|
}
|
|
if let Some(latency_prob) = cli_opts.latency_probability {
|
|
profile.io.latency.latency_probability = latency_prob;
|
|
}
|
|
if let Some(max_tick) = cli_opts.max_tick {
|
|
profile.io.latency.max_tick = max_tick;
|
|
}
|
|
if let Some(min_tick) = cli_opts.min_tick {
|
|
profile.io.latency.min_tick = min_tick;
|
|
}
|
|
if cli_opts.differential {
|
|
// Disable faults when running against sqlite as we cannot control faults on it
|
|
profile.io.enable = false;
|
|
// Disable limits due to differences in return order from turso and rusqlite
|
|
opts.disable_select_limit = true;
|
|
|
|
// There is no `ALTER COLUMN` in SQLite
|
|
profile.query.gen_opts.query.alter_table.alter_column = false;
|
|
}
|
|
|
|
profile.validate().unwrap();
|
|
|
|
let latency_prof = &profile.io.latency;
|
|
|
|
let io_backend = cli_opts.io_backend;
|
|
let io: Arc<dyn SimIO> = match io_backend {
|
|
IoBackend::Memory => Arc::new(MemorySimIO::new(
|
|
opts.seed,
|
|
opts.page_size,
|
|
latency_prof.latency_probability,
|
|
latency_prof.min_tick,
|
|
latency_prof.max_tick,
|
|
)),
|
|
_ => Arc::new(
|
|
SimulatorIO::new(
|
|
opts.seed,
|
|
opts.page_size,
|
|
latency_prof.latency_probability,
|
|
latency_prof.min_tick,
|
|
latency_prof.max_tick,
|
|
io_backend,
|
|
)
|
|
.unwrap(),
|
|
),
|
|
};
|
|
|
|
let db = match Database::open_file_with_flags(
|
|
io.clone(),
|
|
db_path.to_str().unwrap(),
|
|
turso_core::OpenFlags::default(),
|
|
turso_core::DatabaseOpts::new()
|
|
.with_mvcc(profile.experimental_mvcc)
|
|
.with_autovacuum(true),
|
|
None,
|
|
) {
|
|
Ok(db) => db,
|
|
Err(e) => {
|
|
panic!("error opening simulator test file {db_path:?}: {e:?}");
|
|
}
|
|
};
|
|
|
|
let connections = (0..profile.max_connections)
|
|
.map(|_| SimConnection::Disconnected)
|
|
.collect::<Vec<_>>();
|
|
|
|
SimulatorEnv {
|
|
opts,
|
|
connections,
|
|
paths,
|
|
rng,
|
|
seed,
|
|
io,
|
|
db: Some(db),
|
|
type_: simulation_type,
|
|
phase: SimulationPhase::Test,
|
|
io_backend,
|
|
profile: profile.clone(),
|
|
committed_tables: Vec::new(),
|
|
connection_tables: vec![None; profile.max_connections],
|
|
connection_last_query: Bitmap::new(),
|
|
}
|
|
}
|
|
|
|
pub(crate) fn connect(&mut self, connection_index: usize) {
|
|
if connection_index >= self.connections.len() {
|
|
panic!("connection index out of bounds");
|
|
}
|
|
|
|
if self.connections[connection_index].is_connected() {
|
|
trace!("Connection {connection_index} is already connected, skipping reconnection");
|
|
return;
|
|
}
|
|
|
|
match self.type_ {
|
|
SimulationType::Default | SimulationType::Doublecheck => {
|
|
let conn = self
|
|
.db
|
|
.as_ref()
|
|
.expect("db to be Some")
|
|
.connect()
|
|
.expect("Failed to connect to Limbo database");
|
|
if self.opts.cache_size != DEFAULT_CACHE_SIZE {
|
|
conn.execute(format!("PRAGMA cache_size = {}", self.opts.cache_size))
|
|
.expect("set pragma cache_size");
|
|
}
|
|
self.connections[connection_index] = SimConnection::LimboConnection(conn);
|
|
}
|
|
SimulationType::Differential => {
|
|
self.connections[connection_index] = SimConnection::SQLiteConnection(
|
|
rusqlite::Connection::open(self.get_db_path())
|
|
.expect("Failed to open SQLite connection"),
|
|
);
|
|
}
|
|
};
|
|
}
|
|
|
|
/// Clears the commited tables and the connection tables
|
|
pub fn clear_tables(&mut self) {
|
|
self.committed_tables.clear();
|
|
self.connection_tables.iter_mut().for_each(|t| *t = None);
|
|
self.connection_last_query = Bitmap::new();
|
|
}
|
|
|
|
// TODO: does not yet create the appropriate context to avoid WriteWriteConflitcs
|
|
pub fn connection_context(&self, conn_index: usize) -> impl GenerationContext {
|
|
struct ConnectionGenContext<'a> {
|
|
tables: &'a Vec<sql_generation::model::table::Table>,
|
|
opts: &'a sql_generation::generation::Opts,
|
|
}
|
|
|
|
impl<'a> GenerationContext for ConnectionGenContext<'a> {
|
|
fn tables(&self) -> &Vec<sql_generation::model::table::Table> {
|
|
self.tables
|
|
}
|
|
|
|
fn opts(&self) -> &sql_generation::generation::Opts {
|
|
self.opts
|
|
}
|
|
}
|
|
|
|
let tables = self.get_conn_tables(conn_index).tables();
|
|
|
|
ConnectionGenContext {
|
|
opts: &self.profile.query.gen_opts,
|
|
tables,
|
|
}
|
|
}
|
|
|
|
pub fn conn_in_transaction(&self, conn_index: usize) -> bool {
|
|
self.connection_tables
|
|
.get(conn_index)
|
|
.is_some_and(|t| t.is_some())
|
|
}
|
|
|
|
pub fn has_conn_executed_query_after_transaction(&self, conn_index: usize) -> bool {
|
|
self.connection_last_query.get(conn_index)
|
|
}
|
|
|
|
pub fn update_conn_last_interaction(&mut self, conn_index: usize, query: Option<&Query>) {
|
|
// If the conn will execute a transaction statement then we set the bitmap to false
|
|
// to indicate we have not executed any queries yet after the transaction begun
|
|
let value = query.is_some_and(|query| {
|
|
matches!(
|
|
query,
|
|
Query::Begin(..) | Query::Commit(..) | Query::Rollback(..)
|
|
)
|
|
});
|
|
self.connection_last_query.set(conn_index, value);
|
|
}
|
|
|
|
pub fn rollback_conn(&mut self, conn_index: usize) {
|
|
Rollback.shadow(&mut self.get_conn_tables_mut(conn_index));
|
|
self.update_conn_last_interaction(conn_index, Some(&Query::Rollback(Rollback)));
|
|
}
|
|
|
|
pub fn get_conn_tables(&self, conn_index: usize) -> ShadowTables<'_> {
|
|
ShadowTables {
|
|
transaction_tables: self.connection_tables.get(conn_index).unwrap().as_ref(),
|
|
commited_tables: &self.committed_tables,
|
|
}
|
|
}
|
|
|
|
pub fn get_conn_tables_mut(&mut self, conn_index: usize) -> ShadowTablesMut<'_> {
|
|
ShadowTablesMut {
|
|
transaction_tables: self.connection_tables.get_mut(conn_index).unwrap(),
|
|
commited_tables: &mut self.committed_tables,
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(crate) enum SimConnection {
|
|
LimboConnection(Arc<turso_core::Connection>),
|
|
SQLiteConnection(rusqlite::Connection),
|
|
Disconnected,
|
|
}
|
|
|
|
impl SimConnection {
|
|
pub(crate) fn is_connected(&self) -> bool {
|
|
match self {
|
|
SimConnection::LimboConnection(_) | SimConnection::SQLiteConnection(_) => true,
|
|
SimConnection::Disconnected => false,
|
|
}
|
|
}
|
|
pub(crate) fn disconnect(&mut self) {
|
|
let conn = mem::replace(self, SimConnection::Disconnected);
|
|
|
|
match conn {
|
|
SimConnection::LimboConnection(conn) => {
|
|
conn.close().unwrap();
|
|
}
|
|
SimConnection::SQLiteConnection(conn) => {
|
|
conn.close().unwrap();
|
|
}
|
|
SimConnection::Disconnected => {}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Display for SimConnection {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
match self {
|
|
SimConnection::LimboConnection(_) => {
|
|
write!(f, "LimboConnection")
|
|
}
|
|
SimConnection::SQLiteConnection(_) => {
|
|
write!(f, "SQLiteConnection")
|
|
}
|
|
SimConnection::Disconnected => {
|
|
write!(f, "Disconnected")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub(crate) struct SimulatorOpts {
|
|
pub(crate) seed: u64,
|
|
pub(crate) ticks: usize,
|
|
|
|
pub(crate) disable_select_optimizer: bool,
|
|
pub(crate) disable_insert_values_select: bool,
|
|
pub(crate) disable_double_create_failure: bool,
|
|
pub(crate) disable_select_limit: bool,
|
|
pub(crate) disable_delete_select: bool,
|
|
pub(crate) disable_drop_select: bool,
|
|
pub(crate) disable_where_true_false_null: bool,
|
|
pub(crate) disable_union_all_preserves_cardinality: bool,
|
|
pub(crate) disable_fsync_no_wait: bool,
|
|
pub(crate) disable_faulty_query: bool,
|
|
pub(crate) disable_reopen_database: bool,
|
|
pub(crate) disable_integrity_check: bool,
|
|
|
|
pub(crate) max_interactions: u32,
|
|
pub(crate) page_size: usize,
|
|
pub(crate) cache_size: usize,
|
|
pub(crate) max_time_simulation: usize,
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub(crate) struct Paths {
|
|
pub(crate) base: PathBuf,
|
|
pub(crate) history: PathBuf,
|
|
}
|
|
|
|
impl Paths {
|
|
pub(crate) fn new(output_dir: &Path) -> Self {
|
|
Paths {
|
|
base: output_dir.to_path_buf(),
|
|
history: PathBuf::from(output_dir).join("history.txt"),
|
|
}
|
|
}
|
|
|
|
fn path_(&self, type_: &SimulationType, phase: &SimulationPhase) -> PathBuf {
|
|
match (type_, phase) {
|
|
(SimulationType::Default, SimulationPhase::Test) => self.base.join(Path::new("test")),
|
|
(SimulationType::Default, SimulationPhase::Shrink) => {
|
|
self.base.join(Path::new("shrink"))
|
|
}
|
|
(SimulationType::Differential, SimulationPhase::Test) => {
|
|
self.base.join(Path::new("diff"))
|
|
}
|
|
(SimulationType::Differential, SimulationPhase::Shrink) => {
|
|
self.base.join(Path::new("diff_shrink"))
|
|
}
|
|
(SimulationType::Doublecheck, SimulationPhase::Test) => {
|
|
self.base.join(Path::new("doublecheck"))
|
|
}
|
|
(SimulationType::Doublecheck, SimulationPhase::Shrink) => {
|
|
self.base.join(Path::new("doublecheck_shrink"))
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(crate) fn db(&self, type_: &SimulationType, phase: &SimulationPhase) -> PathBuf {
|
|
self.path_(type_, phase).with_extension("db")
|
|
}
|
|
pub(crate) fn plan(&self, type_: &SimulationType, phase: &SimulationPhase) -> PathBuf {
|
|
self.path_(type_, phase).with_extension("sql")
|
|
}
|
|
|
|
pub fn delete_all_files(&self) {
|
|
if self.base.exists() {
|
|
let res = std::fs::remove_dir_all(&self.base);
|
|
if res.is_err() {
|
|
tracing::error!(error = %res.unwrap_err(),"failed to remove directory");
|
|
}
|
|
}
|
|
}
|
|
}
|