Merge 'Fix infinite loops, rollback problems, and other bugs found by I/O fault injection' from Pedro Muniz

Was running the sim with I/O faults enabled and fixed some nasty bugs.
Now, there are some more nasty bugs to fix as well. This is the command
that I use to run the simulator `cargo run -p limbo_sim -- --minimum-
tests 10 --maximum-tests 1000`
This PR mainly fixes the following bugs:
- Not decrementing in flight write counter when `pwrite` fails
- not rolling back the transaction on `step` error
- not rolling back the transaction on `run_once` error
- some functions were just being unwrapped when they could suffer io
errors
- Only change max_frame after wal sync's

Reviewed-by: Pere Diaz Bou <pere-altea@homail.com>
Reviewed-by: Pere Diaz Bou <pere-altea@homail.com>

Closes #1946
This commit is contained in:
Pekka Enberg 2025-07-07 21:31:26 +03:00
commit 341f963a8e
48 changed files with 475 additions and 368 deletions

View file

@ -73,7 +73,7 @@ jobs:
with:
prefix-key: "v1-rust" # can be updated if we need to reset caches due to non-trivial change in the dependencies (for example, custom env var were set for single workspace project)
- name: Install the project
run: ./scripts/run-sim --iterations 50
run: ./scripts/run-sim --maximum-tests 2000 loop -n 50 -s
test-limbo:
runs-on: blacksmith-4vcpu-ubuntu-2404

View file

@ -6,6 +6,7 @@ UNAME_S := $(shell uname -s)
# Executable used to execute the compatibility tests.
SQLITE_EXEC ?= scripts/limbo-sqlite3
RUST_LOG := off
all: check-rust-version check-wasm-target limbo limbo-wasm
.PHONY: all
@ -55,23 +56,23 @@ test: limbo uv-sync test-compat test-vector test-sqlite3 test-shell test-extensi
.PHONY: test
test-extensions: limbo uv-sync
uv run --project limbo_test test-extensions
RUST_LOG=$(RUST_LOG) uv run --project limbo_test test-extensions
.PHONY: test-extensions
test-shell: limbo uv-sync
SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-shell
RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-shell
.PHONY: test-shell
test-compat:
SQLITE_EXEC=$(SQLITE_EXEC) ./testing/all.test
RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) ./testing/all.test
.PHONY: test-compat
test-vector:
SQLITE_EXEC=$(SQLITE_EXEC) ./testing/vector.test
RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) ./testing/vector.test
.PHONY: test-vector
test-time:
SQLITE_EXEC=$(SQLITE_EXEC) ./testing/time.test
RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) ./testing/time.test
.PHONY: test-time
reset-db:
@ -85,16 +86,16 @@ test-sqlite3: reset-db
.PHONY: test-sqlite3
test-json:
SQLITE_EXEC=$(SQLITE_EXEC) ./testing/json.test
RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) ./testing/json.test
.PHONY: test-json
test-memory: limbo uv-sync
SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-memory
RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-memory
.PHONY: test-memory
test-write: limbo uv-sync
@if [ "$(SQLITE_EXEC)" != "scripts/limbo-sqlite3" ]; then \
SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-write; \
RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-write; \
else \
echo "Skipping test-write: SQLITE_EXEC does not have indexes scripts/limbo-sqlite3"; \
fi
@ -102,7 +103,7 @@ test-write: limbo uv-sync
test-update: limbo uv-sync
@if [ "$(SQLITE_EXEC)" != "scripts/limbo-sqlite3" ]; then \
SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-update; \
RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-update; \
else \
echo "Skipping test-update: SQLITE_EXEC does not have indexes scripts/limbo-sqlite3"; \
fi
@ -110,7 +111,7 @@ test-update: limbo uv-sync
test-collate: limbo uv-sync
@if [ "$(SQLITE_EXEC)" != "scripts/limbo-sqlite3" ]; then \
SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-collate; \
RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-collate; \
else \
echo "Skipping test-collate: SQLITE_EXEC does not have indexes scripts/limbo-sqlite3"; \
fi
@ -118,7 +119,7 @@ test-collate: limbo uv-sync
test-constraint: limbo uv-sync
@if [ "$(SQLITE_EXEC)" != "scripts/limbo-sqlite3" ]; then \
SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-constraint; \
RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-constraint; \
else \
echo "Skipping test-constraint: SQLITE_EXEC does not have indexes scripts/limbo-sqlite3"; \
fi
@ -126,7 +127,7 @@ test-constraint: limbo uv-sync
bench-vfs: uv-sync
cargo build --release
uv run --project limbo_test bench-vfs "$(SQL)" "$(N)"
RUST_LOG=$(RUST_LOG) uv run --project limbo_test bench-vfs "$(SQL)" "$(N)"
clickbench:
./perf/clickbench/benchmark.sh

View file

@ -7,7 +7,7 @@ use turso_core::{LimboError, Statement, StepResult, Value};
pub struct LimboRows<'conn> {
stmt: Box<Statement>,
conn: &'conn mut LimboConn,
_conn: &'conn mut LimboConn,
err: Option<LimboError>,
}
@ -15,7 +15,7 @@ impl<'conn> LimboRows<'conn> {
pub fn new(stmt: Statement, conn: &'conn mut LimboConn) -> Self {
LimboRows {
stmt: Box::new(stmt),
conn,
_conn: conn,
err: None,
}
}
@ -55,8 +55,12 @@ pub extern "C" fn rows_next(ctx: *mut c_void) -> ResultCode {
Ok(StepResult::Row) => ResultCode::Row,
Ok(StepResult::Done) => ResultCode::Done,
Ok(StepResult::IO) => {
let _ = ctx.conn.io.run_once();
ResultCode::Io
let res = ctx.stmt.run_once();
if res.is_err() {
ResultCode::Error
} else {
ResultCode::Io
}
}
Ok(StepResult::Busy) => ResultCode::Busy,
Ok(StepResult::Interrupt) => ResultCode::Interrupt,

View file

@ -64,7 +64,10 @@ pub extern "C" fn stmt_execute(
return ResultCode::Done;
}
Ok(StepResult::IO) => {
let _ = stmt.conn.io.run_once();
let res = statement.run_once();
if res.is_err() {
return ResultCode::Error;
}
}
Ok(StepResult::Busy) => {
return ResultCode::Busy;

View file

@ -13,12 +13,12 @@ use turso_core::Connection;
#[derive(Clone)]
pub struct TursoConnection {
pub(crate) conn: Arc<Connection>,
pub(crate) io: Arc<dyn turso_core::IO>,
pub(crate) _io: Arc<dyn turso_core::IO>,
}
impl TursoConnection {
pub fn new(conn: Arc<Connection>, io: Arc<dyn turso_core::IO>) -> Self {
TursoConnection { conn, io }
TursoConnection { conn, _io: io }
}
#[allow(clippy::wrong_self_convention)]

View file

@ -76,7 +76,7 @@ pub extern "system" fn Java_tech_turso_core_TursoStatement_step<'local>(
};
}
StepResult::IO => {
if let Err(e) = stmt.connection.io.run_once() {
if let Err(e) = stmt.stmt.run_once() {
set_err_msg_and_throw_exception(&mut env, obj, TURSO_ETC, e.to_string());
return to_turso_step_result(&mut env, STEP_RESULT_ID_ERROR, None);
}

View file

@ -41,7 +41,7 @@ pub struct Database {
pub name: String,
_db: Arc<turso_core::Database>,
conn: Arc<turso_core::Connection>,
io: Arc<dyn turso_core::IO>,
_io: Arc<dyn turso_core::IO>,
}
impl ObjectFinalize for Database {
@ -82,7 +82,7 @@ impl Database {
conn,
open: true,
name: path,
io,
_io: io,
})
}
@ -114,7 +114,7 @@ impl Database {
return Ok(env.get_undefined()?.into_unknown())
}
turso_core::StepResult::IO => {
self.io.run_once().map_err(into_napi_error)?;
stmt.run_once().map_err(into_napi_error)?;
continue;
}
step @ turso_core::StepResult::Interrupt
@ -185,7 +185,7 @@ impl Database {
Ok(Some(mut stmt)) => loop {
match stmt.step() {
Ok(StepResult::Row) => continue,
Ok(StepResult::IO) => self.io.run_once().map_err(into_napi_error)?,
Ok(StepResult::IO) => stmt.run_once().map_err(into_napi_error)?,
Ok(StepResult::Done) => break,
Ok(StepResult::Interrupt | StepResult::Busy) => {
return Err(napi::Error::new(
@ -308,7 +308,7 @@ impl Statement {
}
turso_core::StepResult::Done => return Ok(env.get_undefined()?.into_unknown()),
turso_core::StepResult::IO => {
self.database.io.run_once().map_err(into_napi_error)?;
stmt.run_once().map_err(into_napi_error)?;
continue;
}
turso_core::StepResult::Interrupt | turso_core::StepResult::Busy => {
@ -338,7 +338,7 @@ impl Statement {
self.check_and_bind(args)?;
Ok(IteratorStatement {
stmt: Rc::clone(&self.inner),
database: self.database.clone(),
_database: self.database.clone(),
env,
presentation_mode: self.presentation_mode.clone(),
})
@ -401,7 +401,7 @@ impl Statement {
break;
}
turso_core::StepResult::IO => {
self.database.io.run_once().map_err(into_napi_error)?;
stmt.run_once().map_err(into_napi_error)?;
}
turso_core::StepResult::Interrupt | turso_core::StepResult::Busy => {
return Err(napi::Error::new(
@ -480,7 +480,7 @@ impl Statement {
#[napi(iterator)]
pub struct IteratorStatement {
stmt: Rc<RefCell<turso_core::Statement>>,
database: Database,
_database: Database,
env: Env,
presentation_mode: PresentationMode,
}
@ -528,7 +528,7 @@ impl Generator for IteratorStatement {
}
turso_core::StepResult::Done => return None,
turso_core::StepResult::IO => {
self.database.io.run_once().ok()?;
stmt.run_once().ok()?;
continue;
}
turso_core::StepResult::Interrupt | turso_core::StepResult::Busy => return None,

View file

@ -96,14 +96,12 @@ impl Cursor {
// For DDL and DML statements,
// we need to execute the statement immediately
if stmt_is_ddl || stmt_is_dml || stmt_is_tx {
let mut stmt = stmt.borrow_mut();
while let turso_core::StepResult::IO = stmt
.borrow_mut()
.step()
.map_err(|e| PyErr::new::<OperationalError, _>(format!("Step error: {:?}", e)))?
{
self.conn
.io
.run_once()
stmt.run_once()
.map_err(|e| PyErr::new::<OperationalError, _>(format!("IO error: {:?}", e)))?;
}
}
@ -132,7 +130,7 @@ impl Cursor {
return Ok(Some(py_row));
}
turso_core::StepResult::IO => {
self.conn.io.run_once().map_err(|e| {
stmt.run_once().map_err(|e| {
PyErr::new::<OperationalError, _>(format!("IO error: {:?}", e))
})?;
}
@ -168,7 +166,7 @@ impl Cursor {
results.push(py_row);
}
turso_core::StepResult::IO => {
self.conn.io.run_once().map_err(|e| {
stmt.run_once().map_err(|e| {
PyErr::new::<OperationalError, _>(format!("IO error: {:?}", e))
})?;
}
@ -233,7 +231,7 @@ fn stmt_is_tx(sql: &str) -> bool {
#[derive(Clone)]
pub struct Connection {
conn: Arc<turso_core::Connection>,
io: Arc<dyn turso_core::IO>,
_io: Arc<dyn turso_core::IO>,
}
#[pymethods]
@ -308,7 +306,7 @@ impl Drop for Connection {
#[pyfunction]
pub fn connect(path: &str) -> Result<Connection> {
match turso_core::Connection::from_uri(path, false, false) {
Ok((io, conn)) => Ok(Connection { conn, io }),
Ok((io, conn)) => Ok(Connection { conn, _io: io }),
Err(e) => Err(PyErr::new::<ProgrammingError, _>(format!(
"Failed to create connection: {:?}",
e

View file

@ -24,6 +24,7 @@ use std::{
},
time::{Duration, Instant},
};
use tracing::level_filters::LevelFilter;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use turso_core::{Connection, Database, LimboError, OpenFlags, Statement, StepResult, Value};
@ -95,7 +96,7 @@ macro_rules! query_internal {
$body(row)?;
}
StepResult::IO => {
$self.io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
@ -175,7 +176,6 @@ impl Limbo {
pub fn with_readline(mut self, mut rl: Editor<LimboHelper, DefaultHistory>) -> Self {
let h = LimboHelper::new(
self.conn.clone(),
self.io.clone(),
self.config.as_ref().map(|c| c.highlight.clone()),
);
rl.set_helper(Some(h));
@ -644,8 +644,7 @@ impl Limbo {
let _ = self.show_info();
}
Command::Import(args) => {
let mut import_file =
ImportFile::new(self.conn.clone(), self.io.clone(), &mut self.writer);
let mut import_file = ImportFile::new(self.conn.clone(), &mut self.writer);
import_file.import(args)
}
Command::LoadExtension(args) => {
@ -740,7 +739,7 @@ impl Limbo {
}
Ok(StepResult::IO) => {
let start = Instant::now();
self.io.run_once()?;
rows.run_once()?;
if let Some(ref mut stats) = statistics {
stats.io_time_elapsed_samples.push(start.elapsed());
}
@ -833,7 +832,7 @@ impl Limbo {
}
Ok(StepResult::IO) => {
let start = Instant::now();
self.io.run_once()?;
rows.run_once()?;
if let Some(ref mut stats) = statistics {
stats.io_time_elapsed_samples.push(start.elapsed());
}
@ -908,7 +907,12 @@ impl Limbo {
.with_thread_ids(true)
.with_ansi(should_emit_ansi),
)
.with(EnvFilter::from_default_env().add_directive("rustyline=off".parse().unwrap()))
.with(
EnvFilter::builder()
.with_default_directive(LevelFilter::OFF.into())
.from_env_lossy()
.add_directive("rustyline=off".parse().unwrap()),
)
.try_init()
{
println!("Unable to setup tracing appender: {:?}", e);
@ -940,7 +944,7 @@ impl Limbo {
}
}
StepResult::IO => {
self.io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
@ -996,7 +1000,7 @@ impl Limbo {
}
}
StepResult::IO => {
self.io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
@ -1047,7 +1051,7 @@ impl Limbo {
}
}
StepResult::IO => {
self.io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,

View file

@ -21,17 +21,12 @@ pub struct ImportArgs {
pub struct ImportFile<'a> {
conn: Arc<Connection>,
io: Arc<dyn turso_core::IO>,
writer: &'a mut dyn Write,
}
impl<'a> ImportFile<'a> {
pub fn new(
conn: Arc<Connection>,
io: Arc<dyn turso_core::IO>,
writer: &'a mut dyn Write,
) -> Self {
Self { conn, io, writer }
pub fn new(conn: Arc<Connection>, writer: &'a mut dyn Write) -> Self {
Self { conn, writer }
}
pub fn import(&mut self, args: ImportArgs) {
@ -79,7 +74,7 @@ impl<'a> ImportFile<'a> {
while let Ok(x) = rows.step() {
match x {
turso_core::StepResult::IO => {
self.io.run_once().unwrap();
rows.run_once().unwrap();
}
turso_core::StepResult::Done => break,
turso_core::StepResult::Interrupt => break,

View file

@ -40,11 +40,7 @@ pub struct LimboHelper {
}
impl LimboHelper {
pub fn new(
conn: Arc<Connection>,
io: Arc<dyn turso_core::IO>,
syntax_config: Option<HighlightConfig>,
) -> Self {
pub fn new(conn: Arc<Connection>, syntax_config: Option<HighlightConfig>) -> Self {
// Load only predefined syntax
let ps = from_uncompressed_data(include_bytes!(concat!(
env!("OUT_DIR"),
@ -59,7 +55,7 @@ impl LimboHelper {
}
}
LimboHelper {
completer: SqlCompleter::new(conn, io),
completer: SqlCompleter::new(conn),
syntax_set: ps,
theme_set: ts,
syntax_config: syntax_config.unwrap_or_default(),
@ -141,7 +137,6 @@ impl Highlighter for LimboHelper {
pub struct SqlCompleter<C: Parser + Send + Sync + 'static> {
conn: Arc<Connection>,
io: Arc<dyn turso_core::IO>,
// Has to be a ref cell as Rustyline takes immutable reference to self
// This problem would be solved with Reedline as it uses &mut self for completions
cmd: RefCell<clap::Command>,
@ -149,10 +144,9 @@ pub struct SqlCompleter<C: Parser + Send + Sync + 'static> {
}
impl<C: Parser + Send + Sync + 'static> SqlCompleter<C> {
pub fn new(conn: Arc<Connection>, io: Arc<dyn turso_core::IO>) -> Self {
pub fn new(conn: Arc<Connection>) -> Self {
Self {
conn,
io,
cmd: C::command().into(),
_cmd_phantom: PhantomData,
}
@ -228,7 +222,7 @@ impl<C: Parser + Send + Sync + 'static> SqlCompleter<C> {
candidates.push(pair);
}
StepResult::IO => {
try_result!(self.io.run_once(), (prefix_pos, candidates));
try_result!(rows.run_once(), (prefix_pos, candidates));
}
StepResult::Interrupt => break,
StepResult::Done => break,

View file

@ -1,7 +1,7 @@
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use pprof::criterion::{Output, PProfProfiler};
use std::sync::Arc;
use turso_core::{Database, PlatformIO, IO};
use turso_core::{Database, PlatformIO};
fn rusqlite_open() -> rusqlite::Connection {
let sqlite_conn = rusqlite::Connection::open("../testing/testing.db").unwrap();
@ -79,7 +79,6 @@ fn bench_execute_select_rows(criterion: &mut Criterion) {
let mut stmt = limbo_conn
.prepare(format!("SELECT * FROM users LIMIT {}", *i))
.unwrap();
let io = io.clone();
b.iter(|| {
loop {
match stmt.step().unwrap() {
@ -87,7 +86,7 @@ fn bench_execute_select_rows(criterion: &mut Criterion) {
black_box(stmt.row());
}
turso_core::StepResult::IO => {
let _ = io.run_once();
stmt.run_once().unwrap();
}
turso_core::StepResult::Done => {
break;
@ -141,7 +140,6 @@ fn bench_execute_select_1(criterion: &mut Criterion) {
group.bench_function("limbo_execute_select_1", |b| {
let mut stmt = limbo_conn.prepare("SELECT 1").unwrap();
let io = io.clone();
b.iter(|| {
loop {
match stmt.step().unwrap() {
@ -149,7 +147,7 @@ fn bench_execute_select_1(criterion: &mut Criterion) {
black_box(stmt.row());
}
turso_core::StepResult::IO => {
let _ = io.run_once();
stmt.run_once().unwrap();
}
turso_core::StepResult::Done => {
break;
@ -194,7 +192,6 @@ fn bench_execute_select_count(criterion: &mut Criterion) {
group.bench_function("limbo_execute_select_count", |b| {
let mut stmt = limbo_conn.prepare("SELECT count() FROM users").unwrap();
let io = io.clone();
b.iter(|| {
loop {
match stmt.step().unwrap() {
@ -202,7 +199,7 @@ fn bench_execute_select_count(criterion: &mut Criterion) {
black_box(stmt.row());
}
turso_core::StepResult::IO => {
let _ = io.run_once();
stmt.run_once().unwrap();
}
turso_core::StepResult::Done => {
break;

View file

@ -4,7 +4,7 @@ use pprof::{
flamegraph::Options,
};
use std::sync::Arc;
use turso_core::{Database, PlatformIO, IO};
use turso_core::{Database, PlatformIO};
// Title: JSONB Function Benchmarking
@ -447,13 +447,12 @@ fn bench(criterion: &mut Criterion) {
group.bench_function("Limbo", |b| {
let mut stmt = limbo_conn.prepare(&query).unwrap();
let io = io.clone();
b.iter(|| {
loop {
match stmt.step().unwrap() {
turso_core::StepResult::Row => {}
turso_core::StepResult::IO => {
let _ = io.run_once();
stmt.run_once().unwrap();
}
turso_core::StepResult::Done => {
break;
@ -606,13 +605,12 @@ fn bench_sequential_jsonb(criterion: &mut Criterion) {
group.bench_function("Limbo - Sequential", |b| {
let mut stmt = limbo_conn.prepare(&query).unwrap();
let io = io.clone();
b.iter(|| {
loop {
match stmt.step().unwrap() {
turso_core::StepResult::Row => {}
turso_core::StepResult::IO => {
let _ = io.run_once();
stmt.run_once().unwrap();
}
turso_core::StepResult::Done => {
break;
@ -899,13 +897,12 @@ fn bench_json_patch(criterion: &mut Criterion) {
group.bench_function("Limbo", |b| {
let mut stmt = limbo_conn.prepare(&query).unwrap();
let io = io.clone();
b.iter(|| {
loop {
match stmt.step().unwrap() {
turso_core::StepResult::Row => {}
turso_core::StepResult::IO => {
let _ = io.run_once();
stmt.run_once().unwrap();
}
turso_core::StepResult::Done => {
break;

View file

@ -2,7 +2,7 @@ use std::sync::Arc;
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, SamplingMode};
use pprof::criterion::{Output, PProfProfiler};
use turso_core::{Database, PlatformIO, IO as _};
use turso_core::{Database, PlatformIO};
const TPC_H_PATH: &str = "../perf/tpc-h/TPC-H.db";
@ -97,7 +97,7 @@ fn bench_tpc_h_queries(criterion: &mut Criterion) {
black_box(stmt.row());
}
turso_core::StepResult::IO => {
let _ = io.run_once();
stmt.run_once().unwrap();
}
turso_core::StepResult::Done => {
break;

View file

@ -65,7 +65,10 @@ pub unsafe extern "C" fn execute(
return ResultCode::OK;
}
Ok(StepResult::IO) => {
let _ = conn.pager.io.run_once();
let res = stmt.run_once();
if res.is_err() {
return ResultCode::Error;
}
continue;
}
Ok(StepResult::Interrupt) => return ResultCode::Interrupt,
@ -154,7 +157,6 @@ pub unsafe extern "C" fn stmt_step(stmt: *mut Stmt) -> ResultCode {
tracing::error!("stmt_step: null connection or context");
return ResultCode::Error;
}
let conn: &Connection = unsafe { &*(stmt._conn as *const Connection) };
let stmt_ctx: &mut Statement = unsafe { &mut *(stmt._ctx as *mut Statement) };
while let Ok(res) = stmt_ctx.step() {
match res {
@ -162,7 +164,10 @@ pub unsafe extern "C" fn stmt_step(stmt: *mut Stmt) -> ResultCode {
StepResult::Done => return ResultCode::EOF,
StepResult::IO => {
// always handle IO step result internally.
let _ = conn.pager.io.run_once();
let res = stmt_ctx.run_once();
if res.is_err() {
return ResultCode::Error;
}
continue;
}
StepResult::Interrupt => return ResultCode::Interrupt,

View file

@ -18,7 +18,7 @@ use std::{
io::{ErrorKind, Read, Seek, Write},
sync::Arc,
};
use tracing::{debug, trace};
use tracing::{debug, instrument, trace, Level};
struct OwnedCallbacks(UnsafeCell<Callbacks>);
// We assume we locking on IO level is done by user.
@ -219,6 +219,7 @@ impl IO for UnixIO {
Ok(unix_file)
}
#[instrument(err, skip_all, level = Level::INFO)]
fn run_once(&self) -> Result<()> {
if self.callbacks.is_empty() {
return Ok(());
@ -333,6 +334,7 @@ impl File for UnixFile<'_> {
Ok(())
}
#[instrument(err, skip_all, level = Level::INFO)]
fn pread(&self, pos: usize, c: Completion) -> Result<Arc<Completion>> {
let file = self.file.borrow();
let result = {
@ -366,6 +368,7 @@ impl File for UnixFile<'_> {
}
}
#[instrument(err, skip_all, level = Level::INFO)]
fn pwrite(
&self,
pos: usize,
@ -401,6 +404,7 @@ impl File for UnixFile<'_> {
}
}
#[instrument(err, skip_all, level = Level::INFO)]
fn sync(&self, c: Completion) -> Result<Arc<Completion>> {
let file = self.file.borrow();
let result = fs::fsync(file.as_fd());
@ -415,6 +419,7 @@ impl File for UnixFile<'_> {
}
}
#[instrument(err, skip_all, level = Level::INFO)]
fn size(&self) -> Result<u64> {
let file = self.file.borrow();
Ok(file.metadata()?.len())

View file

@ -98,7 +98,7 @@ pub type Result<T, E = LimboError> = std::result::Result<T, E>;
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum TransactionState {
Write { change_schema: bool },
Write { schema_did_change: bool },
Read,
None,
}
@ -218,7 +218,7 @@ impl Database {
if is_empty == 2 {
// parse schema
let conn = db.connect()?;
let schema_version = get_schema_version(&conn, &io)?;
let schema_version = get_schema_version(&conn)?;
schema.write().schema_version = schema_version;
let rows = conn.query("SELECT * FROM sqlite_schema")?;
let mut schema = schema
@ -226,7 +226,7 @@ impl Database {
.expect("lock on schema should succeed first try");
let syms = conn.syms.borrow();
if let Err(LimboError::ExtensionError(e)) =
parse_schema_rows(rows, &mut schema, io, &syms, None)
parse_schema_rows(rows, &mut schema, &syms, None)
{
// this means that a vtab exists and we no longer have the module loaded. we print
// a warning to the user to load the module
@ -393,7 +393,7 @@ impl Database {
}
}
fn get_schema_version(conn: &Arc<Connection>, io: &Arc<dyn IO>) -> Result<u32> {
fn get_schema_version(conn: &Arc<Connection>) -> Result<u32> {
let mut rows = conn
.query("PRAGMA schema_version")?
.ok_or(LimboError::InternalError(
@ -412,7 +412,7 @@ fn get_schema_version(conn: &Arc<Connection>, io: &Arc<dyn IO>) -> Result<u32> {
schema_version = Some(row.get::<i64>(0)? as u32);
}
StepResult::IO => {
io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => {
return Err(LimboError::InternalError(
@ -490,7 +490,7 @@ pub struct Connection {
}
impl Connection {
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
pub fn prepare(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Statement> {
if sql.as_ref().is_empty() {
return Err(LimboError::InvalidArgument(
@ -531,7 +531,7 @@ impl Connection {
}
}
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
pub fn query(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Option<Statement>> {
let sql = sql.as_ref();
tracing::trace!("Querying: {}", sql);
@ -547,7 +547,7 @@ impl Connection {
}
}
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
pub(crate) fn run_cmd(
self: &Arc<Connection>,
cmd: Cmd,
@ -600,7 +600,7 @@ impl Connection {
/// Execute will run a query from start to finish taking ownership of I/O because it will run pending I/Os if it didn't finish.
/// TODO: make this api async
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
pub fn execute(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<()> {
let sql = sql.as_ref();
let mut parser = Parser::new(sql.as_bytes());
@ -647,7 +647,7 @@ impl Connection {
if matches!(res, StepResult::Done) {
break;
}
self._db.io.run_once()?;
self.run_once()?;
}
}
}
@ -655,6 +655,17 @@ impl Connection {
Ok(())
}
fn run_once(&self) -> Result<()> {
let res = self._db.io.run_once();
if res.is_err() {
let state = self.transaction_state.get();
if let TransactionState::Write { schema_did_change } = state {
self.pager.rollback(schema_did_change, self)?
}
}
res
}
#[cfg(feature = "fs")]
pub fn from_uri(
uri: &str,
@ -800,7 +811,7 @@ impl Connection {
{
let syms = self.syms.borrow();
if let Err(LimboError::ExtensionError(e)) =
parse_schema_rows(rows, &mut schema, self.pager.io.clone(), &syms, None)
parse_schema_rows(rows, &mut schema, &syms, None)
{
// this means that a vtab exists and we no longer have the module loaded. we print
// a warning to the user to load the module
@ -927,7 +938,15 @@ impl Statement {
}
pub fn run_once(&self) -> Result<()> {
self.pager.io.run_once()
let res = self.pager.io.run_once();
if res.is_err() {
let state = self.program.connection.transaction_state.get();
if let TransactionState::Write { schema_did_change } = state {
self.pager
.rollback(schema_did_change, &self.program.connection)?
}
}
res
}
pub fn num_columns(&self) -> usize {

View file

@ -592,6 +592,7 @@ impl BTreeCursor {
/// Check if the table is empty.
/// This is done by checking if the root page has no cells.
#[instrument(skip_all, level = Level::INFO)]
fn is_empty_table(&self) -> Result<CursorResult<bool>> {
if let Some(mv_cursor) = &self.mv_cursor {
let mv_cursor = mv_cursor.borrow();
@ -606,7 +607,7 @@ impl BTreeCursor {
/// Move the cursor to the previous record and return it.
/// Used in backwards iteration.
#[instrument(skip(self), level = Level::TRACE, name = "prev")]
#[instrument(skip(self), level = Level::INFO, name = "prev")]
fn get_prev_record(&mut self) -> Result<CursorResult<bool>> {
loop {
let page = self.stack.top();
@ -717,7 +718,7 @@ impl BTreeCursor {
/// Reads the record of a cell that has overflow pages. This is a state machine that requires to be called until completion so everything
/// that calls this function should be reentrant.
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
fn process_overflow_read(
&self,
payload: &'static [u8],
@ -835,6 +836,7 @@ impl BTreeCursor {
///
/// If the cell has overflow pages, it will skip till the overflow page which
/// is at the offset given.
#[instrument(skip_all, level = Level::INFO)]
pub fn read_write_payload_with_offset(
&mut self,
mut offset: u32,
@ -945,6 +947,7 @@ impl BTreeCursor {
Ok(CursorResult::Ok(()))
}
#[instrument(skip_all, level = Level::INFO)]
pub fn continue_payload_overflow_with_offset(
&mut self,
buffer: &mut Vec<u8>,
@ -1121,7 +1124,7 @@ impl BTreeCursor {
/// Move the cursor to the next record and return it.
/// Used in forwards iteration, which is the default.
#[instrument(skip(self), level = Level::TRACE, name = "next")]
#[instrument(skip(self), level = Level::INFO, name = "next")]
fn get_next_record(&mut self) -> Result<CursorResult<bool>> {
if let Some(mv_cursor) = &self.mv_cursor {
let mut mv_cursor = mv_cursor.borrow_mut();
@ -1261,20 +1264,21 @@ impl BTreeCursor {
}
/// Move the cursor to the root page of the btree.
#[instrument(skip_all, level = Level::TRACE)]
fn move_to_root(&mut self) {
#[instrument(skip_all, level = Level::INFO)]
fn move_to_root(&mut self) -> Result<()> {
self.seek_state = CursorSeekState::Start;
self.going_upwards = false;
tracing::trace!(root_page = self.root_page);
let mem_page = self.read_page(self.root_page).unwrap();
let mem_page = self.read_page(self.root_page)?;
self.stack.clear();
self.stack.push(mem_page);
Ok(())
}
/// Move the cursor to the rightmost record in the btree.
#[instrument(skip(self), level = Level::TRACE)]
#[instrument(skip(self), level = Level::INFO)]
fn move_to_rightmost(&mut self) -> Result<CursorResult<bool>> {
self.move_to_root();
self.move_to_root()?;
loop {
let mem_page = self.stack.top();
@ -1307,7 +1311,7 @@ impl BTreeCursor {
}
/// Specialized version of move_to() for table btrees.
#[instrument(skip(self), level = Level::TRACE)]
#[instrument(skip(self), level = Level::INFO)]
fn tablebtree_move_to(&mut self, rowid: i64, seek_op: SeekOp) -> Result<CursorResult<()>> {
'outer: loop {
let page = self.stack.top();
@ -1425,7 +1429,7 @@ impl BTreeCursor {
}
/// Specialized version of move_to() for index btrees.
#[instrument(skip(self, index_key), level = Level::TRACE)]
#[instrument(skip(self, index_key), level = Level::INFO)]
fn indexbtree_move_to(
&mut self,
index_key: &ImmutableRecord,
@ -1641,7 +1645,7 @@ impl BTreeCursor {
/// Specialized version of do_seek() for table btrees that uses binary search instead
/// of iterating cells in order.
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
fn tablebtree_seek(&mut self, rowid: i64, seek_op: SeekOp) -> Result<CursorResult<bool>> {
turso_assert!(
self.mv_cursor.is_none(),
@ -1761,7 +1765,7 @@ impl BTreeCursor {
}
}
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
fn indexbtree_seek(
&mut self,
key: &ImmutableRecord,
@ -2032,7 +2036,7 @@ impl BTreeCursor {
}
}
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
pub fn move_to(&mut self, key: SeekKey<'_>, cmp: SeekOp) -> Result<CursorResult<()>> {
turso_assert!(
self.mv_cursor.is_none(),
@ -2072,7 +2076,7 @@ impl BTreeCursor {
self.seek_state = CursorSeekState::Start;
}
if matches!(self.seek_state, CursorSeekState::Start) {
self.move_to_root();
self.move_to_root()?;
}
let ret = match key {
@ -2085,7 +2089,7 @@ impl BTreeCursor {
/// Insert a record into the btree.
/// If the insert operation overflows the page, it will be split and the btree will be balanced.
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
fn insert_into_page(&mut self, bkey: &BTreeKey) -> Result<CursorResult<()>> {
let record = bkey
.get_record()
@ -2266,7 +2270,7 @@ impl BTreeCursor {
/// This is a naive algorithm that doesn't try to distribute cells evenly by content.
/// It will try to split the page in half by keys not by content.
/// Sqlite tries to have a page at least 40% full.
#[instrument(skip(self), level = Level::TRACE)]
#[instrument(skip(self), level = Level::INFO)]
fn balance(&mut self) -> Result<CursorResult<()>> {
turso_assert!(
matches!(self.state, CursorState::Write(_)),
@ -2310,7 +2314,7 @@ impl BTreeCursor {
}
if !self.stack.has_parent() {
self.balance_root();
self.balance_root()?;
}
let write_info = self.state.mut_write_info().unwrap();
@ -2328,6 +2332,7 @@ impl BTreeCursor {
}
/// Balance a non root page by trying to balance cells between a maximum of 3 siblings that should be neighboring the page that overflowed/underflowed.
#[instrument(skip_all, level = Level::INFO)]
fn balance_non_root(&mut self) -> Result<CursorResult<()>> {
turso_assert!(
matches!(self.state, CursorState::Write(_)),
@ -2885,7 +2890,7 @@ impl BTreeCursor {
pages_to_balance_new[i].replace(page.clone());
} else {
// FIXME: handle page cache is full
let page = self.allocate_page(page_type, 0);
let page = self.allocate_page(page_type, 0)?;
pages_to_balance_new[i].replace(page);
// Since this page didn't exist before, we can set it to cells length as it
// marks them as empty since it is a prefix sum of cells.
@ -3780,7 +3785,7 @@ impl BTreeCursor {
/// Balance the root page.
/// This is done when the root page overflows, and we need to create a new root page.
/// See e.g. https://en.wikipedia.org/wiki/B-tree
fn balance_root(&mut self) {
fn balance_root(&mut self) -> Result<()> {
/* todo: balance deeper, create child and copy contents of root there. Then split root */
/* if we are in root page then we just need to create a new root and push key there */
@ -3797,7 +3802,7 @@ impl BTreeCursor {
// FIXME: handle page cache is full
let child_btree =
self.pager
.do_allocate_page(root_contents.page_type(), 0, BtreePageAllocMode::Any);
.do_allocate_page(root_contents.page_type(), 0, BtreePageAllocMode::Any)?;
tracing::debug!(
"balance_root(root={}, rightmost={}, page_type={:?})",
@ -3855,6 +3860,7 @@ impl BTreeCursor {
self.stack.push(root_btree.clone());
self.stack.set_cell_index(0); // leave parent pointing at the rightmost pointer (in this case 0, as there are no cells), since we will be balancing the rightmost child page.
self.stack.push(child_btree.clone());
Ok(())
}
fn usable_space(&self) -> usize {
@ -3862,6 +3868,7 @@ impl BTreeCursor {
}
/// Find the index of the cell in the page that contains the given rowid.
#[instrument( skip_all, level = Level::INFO)]
fn find_cell(&mut self, page: &PageContent, key: &BTreeKey) -> Result<CursorResult<usize>> {
if self.find_cell_state.0.is_none() {
self.find_cell_state.set(0);
@ -3936,9 +3943,10 @@ impl BTreeCursor {
Ok(CursorResult::Ok(cell_idx))
}
#[instrument(skip_all, level = Level::INFO)]
pub fn seek_end(&mut self) -> Result<CursorResult<()>> {
assert!(self.mv_cursor.is_none()); // unsure about this -_-
self.move_to_root();
self.move_to_root()?;
loop {
let mem_page = self.stack.top();
let page_id = mem_page.get().get().id;
@ -3964,6 +3972,7 @@ impl BTreeCursor {
}
}
#[instrument(skip_all, level = Level::INFO)]
pub fn seek_to_last(&mut self) -> Result<CursorResult<()>> {
let has_record = return_if_io!(self.move_to_rightmost());
self.invalidate_record();
@ -3984,13 +3993,14 @@ impl BTreeCursor {
self.root_page
}
#[instrument(skip_all, level = Level::INFO)]
pub fn rewind(&mut self) -> Result<CursorResult<()>> {
if self.mv_cursor.is_some() {
let cursor_has_record = return_if_io!(self.get_next_record());
self.invalidate_record();
self.has_record.replace(cursor_has_record);
} else {
self.move_to_root();
self.move_to_root()?;
let cursor_has_record = return_if_io!(self.get_next_record());
self.invalidate_record();
@ -3999,6 +4009,7 @@ impl BTreeCursor {
Ok(CursorResult::Ok(()))
}
#[instrument(skip_all, level = Level::INFO)]
pub fn last(&mut self) -> Result<CursorResult<()>> {
assert!(self.mv_cursor.is_none());
let cursor_has_record = return_if_io!(self.move_to_rightmost());
@ -4007,6 +4018,7 @@ impl BTreeCursor {
Ok(CursorResult::Ok(()))
}
#[instrument(skip_all, level = Level::INFO)]
pub fn next(&mut self) -> Result<CursorResult<bool>> {
return_if_io!(self.restore_context());
let cursor_has_record = return_if_io!(self.get_next_record());
@ -4022,6 +4034,7 @@ impl BTreeCursor {
.invalidate();
}
#[instrument(skip_all, level = Level::INFO)]
pub fn prev(&mut self) -> Result<CursorResult<bool>> {
assert!(self.mv_cursor.is_none());
return_if_io!(self.restore_context());
@ -4031,7 +4044,7 @@ impl BTreeCursor {
Ok(CursorResult::Ok(cursor_has_record))
}
#[instrument(skip(self), level = Level::TRACE)]
#[instrument(skip(self), level = Level::INFO)]
pub fn rowid(&mut self) -> Result<CursorResult<Option<i64>>> {
if let Some(mv_cursor) = &self.mv_cursor {
let mv_cursor = mv_cursor.borrow();
@ -4073,7 +4086,7 @@ impl BTreeCursor {
}
}
#[instrument(skip(self), level = Level::TRACE)]
#[instrument(skip(self), level = Level::INFO)]
pub fn seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result<CursorResult<bool>> {
assert!(self.mv_cursor.is_none());
// Empty trace to capture the span information
@ -4094,7 +4107,7 @@ impl BTreeCursor {
/// Return a reference to the record the cursor is currently pointing to.
/// If record was not parsed yet, then we have to parse it and in case of I/O we yield control
/// back.
#[instrument(skip(self), level = Level::TRACE)]
#[instrument(skip(self), level = Level::INFO)]
pub fn record(&self) -> Result<CursorResult<Option<Ref<ImmutableRecord>>>> {
if !self.has_record.get() {
return Ok(CursorResult::Ok(None));
@ -4162,7 +4175,7 @@ impl BTreeCursor {
Ok(CursorResult::Ok(Some(record_ref)))
}
#[instrument(skip(self), level = Level::TRACE)]
#[instrument(skip(self), level = Level::INFO)]
pub fn insert(
&mut self,
key: &BTreeKey,
@ -4232,7 +4245,7 @@ impl BTreeCursor {
/// 7. WaitForBalancingToComplete -> perform balancing
/// 8. SeekAfterBalancing -> adjust the cursor to a node that is closer to the deleted value. go to Finish
/// 9. Finish -> Delete operation is done. Return CursorResult(Ok())
#[instrument(skip(self), level = Level::TRACE)]
#[instrument(skip(self), level = Level::INFO)]
pub fn delete(&mut self) -> Result<CursorResult<()>> {
assert!(self.mv_cursor.is_none());
@ -4609,6 +4622,7 @@ impl BTreeCursor {
}
/// Search for a key in an Index Btree. Looking up indexes that need to be unique, we cannot compare the rowid
#[instrument(skip_all, level = Level::INFO)]
pub fn key_exists_in_index(&mut self, key: &ImmutableRecord) -> Result<CursorResult<bool>> {
return_if_io!(self.seek(SeekKey::IndexKey(key), SeekOp::GE { eq_only: true }));
@ -4638,6 +4652,7 @@ impl BTreeCursor {
}
}
#[instrument(skip_all, level = Level::INFO)]
pub fn exists(&mut self, key: &Value) -> Result<CursorResult<bool>> {
assert!(self.mv_cursor.is_none());
let int_key = match key {
@ -4654,6 +4669,7 @@ impl BTreeCursor {
/// Clear the overflow pages linked to a specific page provided by the leaf cell
/// Uses a state machine to keep track of it's operations so that traversal can be
/// resumed from last point after IO interruption
#[instrument(skip_all, level = Level::INFO)]
fn clear_overflow_pages(&mut self, cell: &BTreeCell) -> Result<CursorResult<()>> {
loop {
let state = self.overflow_state.take().unwrap_or(OverflowState::Start);
@ -4722,10 +4738,10 @@ impl BTreeCursor {
/// ```
///
/// The destruction order would be: [4',4,5,2,6,7,3,1]
#[instrument(skip(self), level = Level::TRACE)]
#[instrument(skip(self), level = Level::INFO)]
pub fn btree_destroy(&mut self) -> Result<CursorResult<Option<usize>>> {
if let CursorState::None = &self.state {
self.move_to_root();
self.move_to_root()?;
self.state = CursorState::Destroy(DestroyInfo {
state: DestroyState::Start,
});
@ -4998,10 +5014,10 @@ impl BTreeCursor {
/// Count the number of entries in the b-tree
///
/// Only supposed to be used in the context of a simple Count Select Statement
#[instrument(skip(self), level = Level::TRACE)]
#[instrument(skip(self), level = Level::INFO)]
pub fn count(&mut self) -> Result<CursorResult<usize>> {
if self.count == 0 {
self.move_to_root();
self.move_to_root()?;
}
if let Some(_mv_cursor) = &self.mv_cursor {
@ -5034,7 +5050,7 @@ impl BTreeCursor {
loop {
if !self.stack.has_parent() {
// All pages of the b-tree have been visited. Return successfully
self.move_to_root();
self.move_to_root()?;
return Ok(CursorResult::Ok(self.count));
}
@ -5107,6 +5123,7 @@ impl BTreeCursor {
}
/// If context is defined, restore it and set it None on success
#[instrument(skip_all, level = Level::INFO)]
fn restore_context(&mut self) -> Result<CursorResult<()>> {
if self.context.is_none() || !matches!(self.valid_state, CursorValidState::RequireSeek) {
return Ok(CursorResult::Ok(()));
@ -5137,7 +5154,7 @@ impl BTreeCursor {
btree_read_page(&self.pager, page_idx)
}
pub fn allocate_page(&self, page_type: PageType, offset: usize) -> BTreePage {
pub fn allocate_page(&self, page_type: PageType, offset: usize) -> Result<BTreePage> {
self.pager
.do_allocate_page(page_type, offset, BtreePageAllocMode::Any)
}
@ -5545,7 +5562,7 @@ impl PageStack {
}
/// Push a new page onto the stack.
/// This effectively means traversing to a child page.
#[instrument(skip_all, level = Level::TRACE, name = "pagestack::push")]
#[instrument(skip_all, level = Level::INFO, name = "pagestack::push")]
fn _push(&self, page: BTreePage, starting_cell_idx: i32) {
tracing::trace!(
current = self.current_page.get(),
@ -5572,7 +5589,7 @@ impl PageStack {
/// Pop a page off the stack.
/// This effectively means traversing back up to a parent page.
#[instrument(skip_all, level = Level::TRACE, name = "pagestack::pop")]
#[instrument(skip_all, level = Level::INFO, name = "pagestack::pop")]
fn pop(&self) {
let current = self.current_page.get();
assert!(current >= 0);
@ -5584,7 +5601,7 @@ impl PageStack {
/// Get the top page on the stack.
/// This is the page that is currently being traversed.
#[instrument(skip(self), level = Level::TRACE, name = "pagestack::top", )]
#[instrument(skip(self), level = Level::INFO, name = "pagestack::top", )]
fn top(&self) -> BTreePage {
let page = self.stack.borrow()[self.current()]
.as_ref()
@ -5616,7 +5633,7 @@ impl PageStack {
/// Advance the current cell index of the current page to the next cell.
/// We usually advance after going traversing a new page
#[instrument(skip(self), level = Level::TRACE, name = "pagestack::advance",)]
#[instrument(skip(self), level = Level::INFO, name = "pagestack::advance",)]
fn advance(&self) {
let current = self.current();
tracing::trace!(
@ -5626,7 +5643,7 @@ impl PageStack {
self.cell_indices.borrow_mut()[current] += 1;
}
#[instrument(skip(self), level = Level::TRACE, name = "pagestack::retreat")]
#[instrument(skip(self), level = Level::INFO, name = "pagestack::retreat")]
fn retreat(&self) {
let current = self.current();
tracing::trace!(
@ -7066,10 +7083,10 @@ mod tests {
}
run_until_done(|| pager.begin_read_tx(), &pager).unwrap();
// FIXME: add sorted vector instead, should be okay for small amounts of keys for now :P, too lazy to fix right now
cursor.move_to_root();
cursor.move_to_root().unwrap();
let mut valid = true;
if do_validate {
cursor.move_to_root();
cursor.move_to_root().unwrap();
for key in keys.iter() {
tracing::trace!("seeking key: {}", key);
run_until_done(|| cursor.next(), pager.deref()).unwrap();
@ -7102,7 +7119,7 @@ mod tests {
if matches!(validate_btree(pager.clone(), root_page), (_, false)) {
panic!("invalid btree");
}
cursor.move_to_root();
cursor.move_to_root().unwrap();
for key in keys.iter() {
tracing::trace!("seeking key: {}", key);
run_until_done(|| cursor.next(), pager.deref()).unwrap();
@ -7181,7 +7198,7 @@ mod tests {
pager.deref(),
)
.unwrap();
cursor.move_to_root();
cursor.move_to_root().unwrap();
loop {
match pager.end_tx(false, false, &conn, false).unwrap() {
crate::PagerCacheflushStatus::Done(_) => break,
@ -7194,7 +7211,7 @@ mod tests {
// Check that all keys can be found by seeking
pager.begin_read_tx().unwrap();
cursor.move_to_root();
cursor.move_to_root().unwrap();
for (i, key) in keys.iter().enumerate() {
tracing::info!("seeking key {}/{}: {:?}", i + 1, keys.len(), key);
let exists = run_until_done(
@ -7214,7 +7231,7 @@ mod tests {
assert!(exists, "key {:?} is not found", key);
}
// Check that key count is right
cursor.move_to_root();
cursor.move_to_root().unwrap();
let mut count = 0;
while run_until_done(|| cursor.next(), pager.deref()).unwrap() {
count += 1;
@ -7227,7 +7244,7 @@ mod tests {
keys.len()
);
// Check that all keys can be found in-order, by iterating the btree
cursor.move_to_root();
cursor.move_to_root().unwrap();
let mut prev = None;
for (i, key) in keys.iter().enumerate() {
tracing::info!("iterating key {}/{}: {:?}", i + 1, keys.len(), key);
@ -7571,11 +7588,11 @@ mod tests {
let mut cursor = BTreeCursor::new_table(None, pager.clone(), 2);
// Initialize page 2 as a root page (interior)
let root_page = cursor.allocate_page(PageType::TableInterior, 0);
let root_page = cursor.allocate_page(PageType::TableInterior, 0)?;
// Allocate two leaf pages
let page3 = cursor.allocate_page(PageType::TableLeaf, 0);
let page4 = cursor.allocate_page(PageType::TableLeaf, 0);
let page3 = cursor.allocate_page(PageType::TableLeaf, 0)?;
let page4 = cursor.allocate_page(PageType::TableLeaf, 0)?;
// Configure the root page to point to the two leaf pages
{
@ -8429,7 +8446,7 @@ mod tests {
);
}
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page);
cursor.move_to_root();
cursor.move_to_root().unwrap();
for i in 0..iterations {
let has_next = run_until_done(|| cursor.next(), pager.deref()).unwrap();
if !has_next {

View file

@ -2,6 +2,7 @@ use crate::error::LimboError;
use crate::io::CompletionType;
use crate::{io::Completion, Buffer, Result};
use std::{cell::RefCell, sync::Arc};
use tracing::{instrument, Level};
/// DatabaseStorage is an interface a database file that consists of pages.
///
@ -32,6 +33,7 @@ unsafe impl Sync for DatabaseFile {}
#[cfg(feature = "fs")]
impl DatabaseStorage for DatabaseFile {
#[instrument(skip_all, level = Level::INFO)]
fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> {
let r = c.as_read();
let size = r.buf().len();
@ -44,6 +46,7 @@ impl DatabaseStorage for DatabaseFile {
Ok(())
}
#[instrument(skip_all, level = Level::INFO)]
fn write_page(
&self,
page_idx: usize,
@ -60,11 +63,13 @@ impl DatabaseStorage for DatabaseFile {
Ok(())
}
#[instrument(skip_all, level = Level::INFO)]
fn sync(&self, c: Completion) -> Result<()> {
let _ = self.file.sync(c)?;
Ok(())
}
#[instrument(skip_all, level = Level::INFO)]
fn size(&self) -> Result<u64> {
self.file.size()
}
@ -85,6 +90,7 @@ unsafe impl Send for FileMemoryStorage {}
unsafe impl Sync for FileMemoryStorage {}
impl DatabaseStorage for FileMemoryStorage {
#[instrument(skip_all, level = Level::INFO)]
fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> {
let r = match c.completion_type {
CompletionType::Read(ref r) => r,
@ -100,6 +106,7 @@ impl DatabaseStorage for FileMemoryStorage {
Ok(())
}
#[instrument(skip_all, level = Level::INFO)]
fn write_page(
&self,
page_idx: usize,
@ -115,11 +122,13 @@ impl DatabaseStorage for FileMemoryStorage {
Ok(())
}
#[instrument(skip_all, level = Level::INFO)]
fn sync(&self, c: Completion) -> Result<()> {
let _ = self.file.sync(c)?;
Ok(())
}
#[instrument(skip_all, level = Level::INFO)]
fn size(&self) -> Result<u64> {
self.file.size()
}

View file

@ -14,7 +14,7 @@ use std::collections::HashSet;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use tracing::{trace, Level};
use tracing::{instrument, trace, Level};
use super::btree::{btree_init_page, BTreePage};
use super::page_cache::{CacheError, CacheResizeResult, DumbLruPageCache, PageCacheKey};
@ -471,6 +471,7 @@ impl Pager {
/// This method is used to allocate a new root page for a btree, both for tables and indexes
/// FIXME: handle no room in page cache
#[instrument(skip_all, level = Level::INFO)]
pub fn btree_create(&self, flags: &CreateBTreeFlags) -> Result<CursorResult<u32>> {
let page_type = match flags {
_ if flags.is_table() => PageType::TableLeaf,
@ -479,7 +480,7 @@ impl Pager {
};
#[cfg(feature = "omit_autovacuum")]
{
let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any);
let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?;
let page_id = page.get().get().id;
Ok(CursorResult::Ok(page_id as u32))
}
@ -490,7 +491,7 @@ impl Pager {
let auto_vacuum_mode = self.auto_vacuum_mode.borrow();
match *auto_vacuum_mode {
AutoVacuumMode::None => {
let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any);
let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?;
let page_id = page.get().get().id;
Ok(CursorResult::Ok(page_id as u32))
}
@ -514,7 +515,7 @@ impl Pager {
page_type,
0,
BtreePageAllocMode::Exact(root_page_num),
);
)?;
let allocated_page_id = page.get().get().id as u32;
if allocated_page_id != root_page_num {
// TODO(Zaid): Handle swapping the allocated page with the desired root page
@ -558,8 +559,8 @@ impl Pager {
page_type: PageType,
offset: usize,
_alloc_mode: BtreePageAllocMode,
) -> BTreePage {
let page = self.allocate_page().unwrap();
) -> Result<BTreePage> {
let page = self.allocate_page()?;
let page = Arc::new(BTreePageInner {
page: RefCell::new(page),
});
@ -569,7 +570,7 @@ impl Pager {
page.get().get().id,
page.get().get_contents().page_type()
);
page
Ok(page)
}
/// The "usable size" of a database page is the page size specified by the 2-byte integer at offset 16
@ -589,6 +590,7 @@ impl Pager {
}
#[inline(always)]
#[instrument(skip_all, level = Level::INFO)]
pub fn begin_read_tx(&self) -> Result<CursorResult<LimboResult>> {
// We allocate the first page lazily in the first transaction
match self.maybe_allocate_page1()? {
@ -598,6 +600,7 @@ impl Pager {
Ok(CursorResult::Ok(self.wal.borrow_mut().begin_read_tx()?))
}
#[instrument(skip_all, level = Level::INFO)]
fn maybe_allocate_page1(&self) -> Result<CursorResult<()>> {
if self.is_empty.load(Ordering::SeqCst) < DB_STATE_INITIALIZED {
if let Ok(_lock) = self.init_lock.try_lock() {
@ -621,6 +624,7 @@ impl Pager {
}
#[inline(always)]
#[instrument(skip_all, level = Level::INFO)]
pub fn begin_write_tx(&self) -> Result<CursorResult<LimboResult>> {
// TODO(Diego): The only possibly allocate page1 here is because OpenEphemeral needs a write transaction
// we should have a unique API to begin transactions, something like sqlite3BtreeBeginTrans
@ -631,10 +635,11 @@ impl Pager {
Ok(CursorResult::Ok(self.wal.borrow_mut().begin_write_tx()?))
}
#[instrument(skip_all, level = Level::INFO)]
pub fn end_tx(
&self,
rollback: bool,
change_schema: bool,
schema_did_change: bool,
connection: &Connection,
wal_checkpoint_disabled: bool,
) -> Result<PagerCacheflushStatus> {
@ -648,7 +653,7 @@ impl Pager {
match cacheflush_status {
PagerCacheflushStatus::IO => Ok(PagerCacheflushStatus::IO),
PagerCacheflushStatus::Done(_) => {
let maybe_schema_pair = if change_schema {
let maybe_schema_pair = if schema_did_change {
let schema = connection.schema.borrow().clone();
// Lock first before writing to the database schema in case someone tries to read the schema before it's updated
let db_schema = connection._db.schema.write();
@ -666,13 +671,14 @@ impl Pager {
}
}
#[instrument(skip_all, level = Level::INFO)]
pub fn end_read_tx(&self) -> Result<()> {
self.wal.borrow().end_read_tx()?;
Ok(())
}
/// Reads a page from the database.
#[tracing::instrument(skip_all, level = Level::DEBUG)]
#[tracing::instrument(skip_all, level = Level::INFO)]
pub fn read_page(&self, page_idx: usize) -> Result<PageRef, LimboError> {
tracing::trace!("read_page(page_idx = {})", page_idx);
let mut page_cache = self.page_cache.write();
@ -759,11 +765,12 @@ impl Pager {
/// In the base case, it will write the dirty pages to the WAL and then fsync the WAL.
/// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
/// the database file and then fsync the database file.
#[instrument(skip_all, level = Level::INFO)]
pub fn cacheflush(&self, wal_checkpoint_disabled: bool) -> Result<PagerCacheflushStatus> {
let mut checkpoint_result = CheckpointResult::default();
loop {
let res = loop {
let state = self.flush_info.borrow().state;
trace!("cacheflush {:?}", state);
trace!(?state);
match state {
FlushState::Start => {
let db_size = header_accessor::get_database_size(self)?;
@ -795,7 +802,6 @@ impl Pager {
let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
if in_flight == 0 {
self.flush_info.borrow_mut().state = FlushState::SyncWal;
self.wal.borrow_mut().finish_append_frames_commit()?;
} else {
return Ok(PagerCacheflushStatus::IO);
}
@ -807,9 +813,7 @@ impl Pager {
if wal_checkpoint_disabled || !self.wal.borrow().should_checkpoint() {
self.flush_info.borrow_mut().state = FlushState::Start;
return Ok(PagerCacheflushStatus::Done(
PagerCacheflushResult::WalWritten,
));
break PagerCacheflushResult::WalWritten;
}
self.flush_info.borrow_mut().state = FlushState::Checkpoint;
}
@ -831,16 +835,17 @@ impl Pager {
return Ok(PagerCacheflushStatus::IO);
} else {
self.flush_info.borrow_mut().state = FlushState::Start;
break;
break PagerCacheflushResult::Checkpointed(checkpoint_result);
}
}
}
}
Ok(PagerCacheflushStatus::Done(
PagerCacheflushResult::Checkpointed(checkpoint_result),
))
};
// We should only signal that we finished appenind frames after wal sync to avoid inconsistencies when sync fails
self.wal.borrow_mut().finish_append_frames_commit()?;
Ok(PagerCacheflushStatus::Done(res))
}
#[instrument(skip_all, level = Level::INFO)]
pub fn wal_get_frame(
&self,
frame_no: u32,
@ -856,11 +861,12 @@ impl Pager {
)
}
#[instrument(skip_all, level = Level::INFO, target = "pager_checkpoint",)]
pub fn checkpoint(&self) -> Result<CheckpointStatus> {
let mut checkpoint_result = CheckpointResult::default();
loop {
let state = *self.checkpoint_state.borrow();
trace!("pager_checkpoint(state={:?})", state);
trace!(?state);
match state {
CheckpointState::Checkpoint => {
let in_flight = self.checkpoint_inflight.clone();
@ -932,6 +938,7 @@ impl Pager {
Ok(())
}
#[instrument(skip_all, level = Level::INFO)]
pub fn wal_checkpoint(&self, wal_checkpoint_disabled: bool) -> Result<CheckpointResult> {
if wal_checkpoint_disabled {
return Ok(CheckpointResult {
@ -947,7 +954,7 @@ impl Pager {
CheckpointMode::Passive,
) {
Ok(CheckpointStatus::IO) => {
let _ = self.io.run_once();
self.io.run_once()?;
}
Ok(CheckpointStatus::Done(res)) => {
checkpoint_result = res;
@ -965,6 +972,7 @@ impl Pager {
// Providing a page is optional, if provided it will be used to avoid reading the page from disk.
// This is implemented in accordance with sqlite freepage2() function.
#[instrument(skip_all, level = Level::INFO)]
pub fn free_page(&self, page: Option<PageRef>, page_id: usize) -> Result<()> {
tracing::trace!("free_page(page_id={})", page_id);
const TRUNK_PAGE_HEADER_SIZE: usize = 8;
@ -1036,6 +1044,7 @@ impl Pager {
Ok(())
}
#[instrument(skip_all, level = Level::INFO)]
pub fn allocate_page1(&self) -> Result<CursorResult<PageRef>> {
let state = self.allocate_page1_state.borrow().clone();
match state {
@ -1111,6 +1120,7 @@ impl Pager {
*/
// FIXME: handle no room in page cache
#[allow(clippy::readonly_write_lock)]
#[instrument(skip_all, level = Level::INFO)]
pub fn allocate_page(&self) -> Result<PageRef> {
let old_db_size = header_accessor::get_database_size(self)?;
#[allow(unused_mut)]
@ -1195,12 +1205,18 @@ impl Pager {
(page_size - reserved_space) as usize
}
pub fn rollback(&self, change_schema: bool, connection: &Connection) -> Result<(), LimboError> {
#[instrument(skip_all, level = Level::INFO)]
pub fn rollback(
&self,
schema_did_change: bool,
connection: &Connection,
) -> Result<(), LimboError> {
tracing::debug!(schema_did_change);
self.dirty_pages.borrow_mut().clear();
let mut cache = self.page_cache.write();
cache.unset_dirty_all_pages();
cache.clear().expect("failed to clear page cache");
if change_schema {
if schema_did_change {
let prev_schema = connection._db.schema.read().clone();
connection.schema.replace(prev_schema);
}

View file

@ -727,6 +727,7 @@ impl PageContent {
}
}
#[instrument(skip_all, level = Level::INFO)]
pub fn begin_read_page(
db_file: Arc<dyn DatabaseStorage>,
buffer_pool: Arc<BufferPool>,
@ -773,6 +774,7 @@ pub fn finish_read_page(
Ok(())
}
#[instrument(skip_all, level = Level::INFO)]
pub fn begin_write_btree_page(
pager: &Pager,
page: &PageRef,
@ -791,13 +793,14 @@ pub fn begin_write_btree_page(
};
*write_counter.borrow_mut() += 1;
let clone_counter = write_counter.clone();
let write_complete = {
let buf_copy = buffer.clone();
Box::new(move |bytes_written: i32| {
tracing::trace!("finish_write_btree_page");
let buf_copy = buf_copy.clone();
let buf_len = buf_copy.borrow().len();
*write_counter.borrow_mut() -= 1;
*clone_counter.borrow_mut() -= 1;
page_finish.clear_dirty();
if bytes_written < buf_len as i32 {
@ -806,10 +809,15 @@ pub fn begin_write_btree_page(
})
};
let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete)));
page_source.write_page(page_id, buffer.clone(), c)?;
Ok(())
let res = page_source.write_page(page_id, buffer.clone(), c);
if res.is_err() {
// Avoid infinite loop if write page fails
*write_counter.borrow_mut() -= 1;
}
res
}
#[instrument(skip_all, level = Level::INFO)]
pub fn begin_sync(db_file: Arc<dyn DatabaseStorage>, syncing: Rc<RefCell<bool>>) -> Result<()> {
assert!(!*syncing.borrow());
*syncing.borrow_mut() = true;
@ -1481,7 +1489,7 @@ pub fn begin_read_wal_frame(
Ok(c)
}
#[instrument(skip(io, page, write_counter, wal_header, checksums), level = Level::TRACE)]
#[instrument(err,skip(io, page, write_counter, wal_header, checksums), level = Level::INFO)]
#[allow(clippy::too_many_arguments)]
pub fn begin_write_wal_frame(
io: &Arc<dyn File>,
@ -1548,13 +1556,14 @@ pub fn begin_write_wal_frame(
(Arc::new(RefCell::new(buffer)), final_checksum)
};
let clone_counter = write_counter.clone();
*write_counter.borrow_mut() += 1;
let write_complete = {
let buf_copy = buffer.clone();
Box::new(move |bytes_written: i32| {
let buf_copy = buf_copy.clone();
let buf_len = buf_copy.borrow().len();
*write_counter.borrow_mut() -= 1;
*clone_counter.borrow_mut() -= 1;
page_finish.clear_dirty();
if bytes_written < buf_len as i32 {
@ -1564,7 +1573,12 @@ pub fn begin_write_wal_frame(
};
#[allow(clippy::arc_with_non_send_sync)]
let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete)));
io.pwrite(offset, buffer.clone(), c)?;
let res = io.pwrite(offset, buffer.clone(), c);
if res.is_err() {
// If we do not reduce the counter here on error, we incur an infinite loop when cacheflushing
*write_counter.borrow_mut() -= 1;
}
res?;
tracing::trace!("Frame written and synced");
Ok(checksums)
}

View file

@ -499,6 +499,7 @@ impl fmt::Debug for WalFileShared {
impl Wal for WalFile {
/// Begin a read transaction.
#[instrument(skip_all, level = Level::INFO)]
fn begin_read_tx(&mut self) -> Result<LimboResult> {
let max_frame_in_wal = self.get_shared().max_frame.load(Ordering::SeqCst);
@ -564,6 +565,7 @@ impl Wal for WalFile {
/// End a read transaction.
#[inline(always)]
#[instrument(skip_all, level = Level::INFO)]
fn end_read_tx(&self) -> Result<LimboResult> {
tracing::debug!("end_read_tx(lock={})", self.max_frame_read_lock_index);
let read_lock = &mut self.get_shared().read_locks[self.max_frame_read_lock_index];
@ -572,6 +574,7 @@ impl Wal for WalFile {
}
/// Begin a write transaction
#[instrument(skip_all, level = Level::INFO)]
fn begin_write_tx(&mut self) -> Result<LimboResult> {
let busy = !self.get_shared().write_lock.write();
tracing::debug!("begin_write_transaction(busy={})", busy);
@ -582,6 +585,7 @@ impl Wal for WalFile {
}
/// End a write transaction
#[instrument(skip_all, level = Level::INFO)]
fn end_write_tx(&self) -> Result<LimboResult> {
tracing::debug!("end_write_txn");
self.get_shared().write_lock.unlock();
@ -589,6 +593,7 @@ impl Wal for WalFile {
}
/// Find the latest frame containing a page.
#[instrument(skip_all, level = Level::INFO)]
fn find_frame(&self, page_id: u64) -> Result<Option<u64>> {
let shared = self.get_shared();
let frames = shared.frame_cache.lock();
@ -606,6 +611,7 @@ impl Wal for WalFile {
}
/// Read a frame from the WAL.
#[instrument(skip_all, level = Level::INFO)]
fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Arc<BufferPool>) -> Result<()> {
tracing::debug!("read_frame({})", frame_id);
let offset = self.frame_offset(frame_id);
@ -624,6 +630,7 @@ impl Wal for WalFile {
Ok(())
}
#[instrument(skip_all, level = Level::INFO)]
fn read_frame_raw(
&self,
frame_id: u64,
@ -650,6 +657,7 @@ impl Wal for WalFile {
}
/// Write a frame to the WAL.
#[instrument(skip_all, level = Level::INFO)]
fn append_frame(
&mut self,
page: PageRef,
@ -660,12 +668,7 @@ impl Wal for WalFile {
let max_frame = self.max_frame;
let frame_id = if max_frame == 0 { 1 } else { max_frame + 1 };
let offset = self.frame_offset(frame_id);
tracing::debug!(
"append_frame(frame={}, offset={}, page_id={})",
frame_id,
offset,
page_id
);
tracing::debug!(frame_id, offset, page_id);
let checksums = {
let shared = self.get_shared();
let header = shared.wal_header.clone();
@ -699,13 +702,14 @@ impl Wal for WalFile {
Ok(())
}
#[instrument(skip_all, level = Level::INFO)]
fn should_checkpoint(&self) -> bool {
let shared = self.get_shared();
let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize;
frame_id >= self.checkpoint_threshold
}
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
fn checkpoint(
&mut self,
pager: &Pager,
@ -869,7 +873,7 @@ impl Wal for WalFile {
}
}
#[instrument(skip_all, level = Level::DEBUG)]
#[instrument(err, skip_all, level = Level::INFO)]
fn sync(&mut self) -> Result<WalFsyncStatus> {
match self.sync_state.get() {
SyncState::NotSyncing => {
@ -911,6 +915,7 @@ impl Wal for WalFile {
self.min_frame
}
#[instrument(err, skip_all, level = Level::INFO)]
fn rollback(&mut self) -> Result<()> {
// TODO(pere): have to remove things from frame_cache because they are no longer valid.
// TODO(pere): clear page cache in pager.
@ -918,7 +923,7 @@ impl Wal for WalFile {
// TODO(pere): implement proper hashmap, this sucks :).
let shared = self.get_shared();
let max_frame = shared.max_frame.load(Ordering::SeqCst);
tracing::trace!("rollback(to_max_frame={})", max_frame);
tracing::debug!(to_max_frame = max_frame);
let mut frame_cache = shared.frame_cache.lock();
for (_, frames) in frame_cache.iter_mut() {
let mut last_valid_frame = frames.len();
@ -936,14 +941,11 @@ impl Wal for WalFile {
Ok(())
}
#[instrument(skip_all, level = Level::INFO)]
fn finish_append_frames_commit(&mut self) -> Result<()> {
let shared = self.get_shared();
shared.max_frame.store(self.max_frame, Ordering::SeqCst);
tracing::trace!(
"finish_append_frames_commit(max_frame={}, last_checksum={:?})",
self.max_frame,
self.last_checksum
);
tracing::trace!(self.max_frame, ?self.last_checksum);
shared.last_checksum = self.last_checksum;
Ok(())
}

View file

@ -11,7 +11,7 @@ use turso_sqlite3_parser::ast::{CompoundOperator, SortOrder};
use tracing::Level;
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
pub fn emit_program_for_compound_select(
program: &mut ProgramBuilder,
plan: Plan,

View file

@ -201,7 +201,7 @@ pub enum TransactionMode {
/// Main entry point for emitting bytecode for a SQL query
/// Takes a query plan and generates the corresponding bytecode program
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
pub fn emit_program(
program: &mut ProgramBuilder,
plan: Plan,
@ -219,7 +219,7 @@ pub fn emit_program(
}
}
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
fn emit_program_for_select(
program: &mut ProgramBuilder,
mut plan: SelectPlan,
@ -258,7 +258,7 @@ fn emit_program_for_select(
Ok(())
}
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
pub fn emit_query<'a>(
program: &mut ProgramBuilder,
plan: &'a mut SelectPlan,
@ -398,7 +398,7 @@ pub fn emit_query<'a>(
Ok(t_ctx.reg_result_cols_start.unwrap())
}
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
fn emit_program_for_delete(
program: &mut ProgramBuilder,
plan: DeletePlan,
@ -599,7 +599,7 @@ fn emit_delete_insns(
Ok(())
}
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
fn emit_program_for_update(
program: &mut ProgramBuilder,
mut plan: UpdatePlan,
@ -718,7 +718,7 @@ fn emit_program_for_update(
Ok(())
}
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
fn emit_update_insns(
plan: &UpdatePlan,
t_ctx: &TranslateCtx,

View file

@ -27,7 +27,7 @@ pub struct ConditionMetadata {
pub jump_target_when_false: BranchOffset,
}
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
fn emit_cond_jump(program: &mut ProgramBuilder, cond_meta: ConditionMetadata, reg: usize) {
if cond_meta.jump_if_condition_is_true {
program.emit_insn(Insn::If {
@ -131,7 +131,7 @@ macro_rules! expect_arguments_even {
}};
}
#[instrument(skip(program, referenced_tables, expr, resolver), level = Level::TRACE)]
#[instrument(skip(program, referenced_tables, expr, resolver), level = Level::INFO)]
pub fn translate_condition_expr(
program: &mut ProgramBuilder,
referenced_tables: &TableReferences,

View file

@ -53,7 +53,7 @@ use transaction::{translate_tx_begin, translate_tx_commit};
use turso_sqlite3_parser::ast::{self, Delete, Insert};
use update::translate_update;
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
#[allow(clippy::too_many_arguments)]
pub fn translate(
schema: &Schema,

View file

@ -3,7 +3,7 @@ use crate::{
schema::{self, Column, Schema, Type},
translate::{collate::CollationSeq, expr::walk_expr, plan::JoinOrderMember},
types::{Value, ValueType},
LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable, IO,
LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable,
};
use std::{rc::Rc, sync::Arc};
use turso_sqlite3_parser::ast::{
@ -51,7 +51,6 @@ struct UnparsedFromSqlIndex {
pub fn parse_schema_rows(
rows: Option<Statement>,
schema: &mut Schema,
io: Arc<dyn IO>,
syms: &SymbolTable,
mv_tx_id: Option<u64>,
) -> Result<()> {
@ -130,7 +129,7 @@ pub fn parse_schema_rows(
StepResult::IO => {
// TODO: How do we ensure that the I/O we submitted to
// read the schema is actually complete?
io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,

View file

@ -301,7 +301,7 @@ impl ProgramBuilder {
});
}
#[instrument(skip(self), level = Level::TRACE)]
#[instrument(skip(self), level = Level::INFO)]
pub fn emit_insn(&mut self, insn: Insn) {
let function = insn.to_function();
// This seemingly empty trace here is needed so that a function span is emmited with it

View file

@ -1699,22 +1699,22 @@ pub fn op_transaction(
} else {
let current_state = conn.transaction_state.get();
let (new_transaction_state, updated) = match (current_state, write) {
(TransactionState::Write { change_schema }, true) => {
(TransactionState::Write { change_schema }, false)
(TransactionState::Write { schema_did_change }, true) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Write { change_schema }, false) => {
(TransactionState::Write { change_schema }, false)
(TransactionState::Write { schema_did_change }, false) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Read, true) => (
TransactionState::Write {
change_schema: false,
schema_did_change: false,
},
true,
),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (
TransactionState::Write {
change_schema: false,
schema_did_change: false,
},
true,
),
@ -1766,9 +1766,9 @@ pub fn op_auto_commit(
super::StepResult::Busy => Ok(InsnFunctionStepResult::Busy),
};
}
let change_schema =
if let TransactionState::Write { change_schema } = conn.transaction_state.get() {
change_schema
let schema_did_change =
if let TransactionState::Write { schema_did_change } = conn.transaction_state.get() {
schema_did_change
} else {
false
};
@ -1776,7 +1776,7 @@ pub fn op_auto_commit(
if *auto_commit != conn.auto_commit.get() {
if *rollback {
// TODO(pere): add rollback I/O logic once we implement rollback journal
pager.rollback(change_schema, &conn)?;
pager.rollback(schema_did_change, &conn)?;
conn.auto_commit.replace(true);
} else {
conn.auto_commit.replace(*auto_commit);
@ -4978,7 +4978,6 @@ pub fn op_parse_schema(
parse_schema_rows(
Some(stmt),
&mut new_schema,
conn.pager.io.clone(),
&conn.syms.borrow(),
state.mv_tx_id,
)?;
@ -4993,7 +4992,6 @@ pub fn op_parse_schema(
parse_schema_rows(
Some(stmt),
&mut new_schema,
conn.pager.io.clone(),
&conn.syms.borrow(),
state.mv_tx_id,
)?;
@ -5065,8 +5063,8 @@ pub fn op_set_cookie(
Cookie::SchemaVersion => {
// we update transaction state to indicate that the schema has changed
match program.connection.transaction_state.get() {
TransactionState::Write { change_schema } => {
program.connection.transaction_state.set(TransactionState::Write { change_schema: true });
TransactionState::Write { schema_did_change } => {
program.connection.transaction_state.set(TransactionState::Write { schema_did_change: true });
},
TransactionState::Read => unreachable!("invalid transaction state for SetCookie: TransactionState::Read, should be write"),
TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"),

View file

@ -368,6 +368,7 @@ pub struct Program {
}
impl Program {
#[instrument(skip_all, level = Level::INFO)]
pub fn step(
&self,
state: &mut ProgramState,
@ -382,8 +383,14 @@ impl Program {
let _ = state.result_row.take();
let (insn, insn_function) = &self.insns[state.pc as usize];
trace_insn(self, state.pc as InsnReference, insn);
let res = insn_function(self, state, insn, &pager, mv_store.as_ref())?;
match res {
let res = insn_function(self, state, insn, &pager, mv_store.as_ref());
if res.is_err() {
let state = self.connection.transaction_state.get();
if let TransactionState::Write { schema_did_change } = state {
pager.rollback(schema_did_change, &self.connection)?
}
}
match res? {
InsnFunctionStepResult::Step => {}
InsnFunctionStepResult::Done => return Ok(StepResult::Done),
InsnFunctionStepResult::IO => return Ok(StepResult::IO),
@ -394,7 +401,7 @@ impl Program {
}
}
#[instrument(skip_all, level = Level::TRACE)]
#[instrument(skip_all, level = Level::INFO)]
pub fn commit_txn(
&self,
pager: Rc<Pager>,
@ -422,7 +429,8 @@ impl Program {
program_state.commit_state
);
if program_state.commit_state == CommitState::Committing {
let TransactionState::Write { change_schema } = connection.transaction_state.get()
let TransactionState::Write { schema_did_change } =
connection.transaction_state.get()
else {
unreachable!("invalid state for write commit step")
};
@ -431,18 +439,18 @@ impl Program {
&mut program_state.commit_state,
&connection,
rollback,
change_schema,
schema_did_change,
)
} else if auto_commit {
let current_state = connection.transaction_state.get();
tracing::trace!("Auto-commit state: {:?}", current_state);
match current_state {
TransactionState::Write { change_schema } => self.step_end_write_txn(
TransactionState::Write { schema_did_change } => self.step_end_write_txn(
&pager,
&mut program_state.commit_state,
&connection,
rollback,
change_schema,
schema_did_change,
),
TransactionState::Read => {
connection.transaction_state.replace(TransactionState::None);
@ -460,26 +468,32 @@ impl Program {
}
}
#[instrument(skip(self, pager, connection), level = Level::TRACE)]
#[instrument(skip(self, pager, connection), level = Level::INFO)]
fn step_end_write_txn(
&self,
pager: &Rc<Pager>,
commit_state: &mut CommitState,
connection: &Connection,
rollback: bool,
change_schema: bool,
schema_did_change: bool,
) -> Result<StepResult> {
let cacheflush_status = pager.end_tx(
rollback,
change_schema,
schema_did_change,
connection,
connection.wal_checkpoint_disabled.get(),
)?;
match cacheflush_status {
PagerCacheflushStatus::Done(_) => {
PagerCacheflushStatus::Done(status) => {
if self.change_cnt_on {
self.connection.set_changes(self.n_change.get());
}
if matches!(
status,
crate::storage::pager::PagerCacheflushResult::Rollback
) {
pager.rollback(schema_did_change, connection)?;
}
connection.transaction_state.replace(TransactionState::None);
*commit_state = CommitState::Ready;
}
@ -553,7 +567,7 @@ fn make_record(registers: &[Register], start_reg: &usize, count: &usize) -> Immu
ImmutableRecord::from_registers(regs, regs.len())
}
#[instrument(skip(program), level = Level::TRACE)]
#[instrument(skip(program), level = Level::INFO)]
fn trace_insn(program: &Program, addr: InsnReference, insn: &Insn) {
if !tracing::enabled!(tracing::Level::TRACE) {
return;

View file

@ -194,7 +194,7 @@ fn do_fuzz(expr: Expr) -> Result<Corpus, Box<dyn Error>> {
loop {
use turso_core::StepResult;
match stmt.step()? {
StepResult::IO => io.run_once()?,
StepResult::IO => stmt.run_once()?,
StepResult::Row => {
let row = stmt.row().unwrap();
assert_eq!(row.len(), 1, "expr: {:?}", expr);

View file

@ -2,28 +2,8 @@
set -e
iterations=""
while [[ $# -gt 0 ]]; do
case $1 in
--iterations)
iterations="$2"
shift 2
;;
*)
echo "Unknown option: $1"
echo "Usage: $0 [--max-iterations N]"
exit 1
;;
esac
done
if [[ -n "$iterations" ]]; then
echo "Running limbo_sim for $iterations iterations..."
for ((i=1; i<=iterations; i++)); do
echo "Iteration $i of $iterations"
cargo run -p limbo_sim
done
echo "Completed $iterations iterations"
if [[ -n "$@" ]]; then
cargo run -p limbo_sim -- "$@"
else
echo "Running limbo_sim in infinite loop..."
while true; do

View file

@ -7,14 +7,14 @@ use std::{
};
use serde::{Deserialize, Serialize};
use turso_core::{Connection, Result, StepResult, IO};
use turso_core::{Connection, Result, StepResult};
use crate::{
model::{
query::{update::Update, Create, CreateIndex, Delete, Drop, Insert, Query, Select},
table::SimValue,
},
runner::{env::SimConnection, io::SimulatorIO},
runner::env::SimConnection,
SimulatorEnv,
};
@ -266,7 +266,7 @@ impl Display for Interaction {
}
}
type AssertionFunc = dyn Fn(&Vec<ResultSet>, &SimulatorEnv) -> Result<bool>;
type AssertionFunc = dyn Fn(&Vec<ResultSet>, &mut SimulatorEnv) -> Result<bool>;
enum AssertionAST {
Pick(),
@ -411,7 +411,7 @@ impl Interaction {
}
}
}
pub(crate) fn execute_query(&self, conn: &mut Arc<Connection>, io: &SimulatorIO) -> ResultSet {
pub(crate) fn execute_query(&self, conn: &mut Arc<Connection>) -> ResultSet {
if let Self::Query(query) = self {
let query_str = query.to_string();
let rows = conn.query(&query_str);
@ -440,7 +440,7 @@ impl Interaction {
out.push(r);
}
StepResult::IO => {
io.run_once().unwrap();
rows.run_once().unwrap();
}
StepResult::Interrupt => {}
StepResult::Done => {
@ -459,7 +459,7 @@ impl Interaction {
pub(crate) fn execute_assertion(
&self,
stack: &Vec<ResultSet>,
env: &SimulatorEnv,
env: &mut SimulatorEnv,
) -> Result<()> {
match self {
Self::Assertion(assertion) => {
@ -484,7 +484,7 @@ impl Interaction {
pub(crate) fn execute_assumption(
&self,
stack: &Vec<ResultSet>,
env: &SimulatorEnv,
env: &mut SimulatorEnv,
) -> Result<()> {
match self {
Self::Assumption(assumption) => {
@ -664,6 +664,7 @@ fn reopen_database(env: &mut SimulatorEnv) {
env.connections.clear();
// Clear all open files
// TODO: for correct reporting of faults we should get all the recorded numbers and transfer to the new file
env.io.files.borrow_mut().clear();
// 2. Re-open database

View file

@ -11,6 +11,7 @@ use crate::{
Create, Delete, Drop, Insert, Query, Select,
},
table::SimValue,
FAULT_ERROR_MSG,
},
runner::env::SimulatorEnv,
};
@ -200,7 +201,7 @@ impl Property {
message: format!("table {} exists", insert.table()),
func: Box::new({
let table_name = table.clone();
move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
move |_: &Vec<ResultSet>, env: &mut SimulatorEnv| {
Ok(env.tables.iter().any(|t| t.name == table_name))
}
}),
@ -221,7 +222,7 @@ impl Property {
.map(|i| !i.end_with_commit)
.unwrap_or(false),
),
func: Box::new(move |stack: &Vec<ResultSet>, _: &SimulatorEnv| {
func: Box::new(move |stack: &Vec<ResultSet>, _| {
let rows = stack.last().unwrap();
match rows {
Ok(rows) => {
@ -248,7 +249,7 @@ impl Property {
let assumption = Interaction::Assumption(Assertion {
message: "Double-Create-Failure should not be called on an existing table"
.to_string(),
func: Box::new(move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
func: Box::new(move |_: &Vec<ResultSet>, env: &mut SimulatorEnv| {
Ok(!env.tables.iter().any(|t| t.name == table_name))
}),
});
@ -262,7 +263,7 @@ impl Property {
message:
"creating two tables with the name should result in a failure for the second query"
.to_string(),
func: Box::new(move |stack: &Vec<ResultSet>, _: &SimulatorEnv| {
func: Box::new(move |stack: &Vec<ResultSet>, _| {
let last = stack.last().unwrap();
match last {
Ok(_) => Ok(false),
@ -287,7 +288,7 @@ impl Property {
message: format!("table {} exists", table_name),
func: Box::new({
let table_name = table_name.clone();
move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
move |_, env: &mut SimulatorEnv| {
Ok(env.tables.iter().any(|t| t.name == table_name))
}
}),
@ -299,7 +300,7 @@ impl Property {
let assertion = Interaction::Assertion(Assertion {
message: "select query should respect the limit clause".to_string(),
func: Box::new(move |stack: &Vec<ResultSet>, _: &SimulatorEnv| {
func: Box::new(move |stack: &Vec<ResultSet>, _| {
let last = stack.last().unwrap();
match last {
Ok(rows) => Ok(limit >= rows.len()),
@ -323,7 +324,7 @@ impl Property {
message: format!("table {} exists", table),
func: Box::new({
let table = table.clone();
move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
move |_: &Vec<ResultSet>, env: &mut SimulatorEnv| {
Ok(env.tables.iter().any(|t| t.name == table))
}
}),
@ -344,7 +345,7 @@ impl Property {
let assertion = Interaction::Assertion(Assertion {
message: format!("`{}` should return no values for table `{}`", select, table,),
func: Box::new(move |stack: &Vec<ResultSet>, _: &SimulatorEnv| {
func: Box::new(move |stack: &Vec<ResultSet>, _| {
let rows = stack.last().unwrap();
match rows {
Ok(rows) => Ok(rows.is_empty()),
@ -371,7 +372,7 @@ impl Property {
message: format!("table {} exists", table),
func: Box::new({
let table = table.clone();
move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
move |_, env: &mut SimulatorEnv| {
Ok(env.tables.iter().any(|t| t.name == table))
}
}),
@ -384,7 +385,7 @@ impl Property {
"select query should result in an error for table '{}'",
table
),
func: Box::new(move |stack: &Vec<ResultSet>, _: &SimulatorEnv| {
func: Box::new(move |stack: &Vec<ResultSet>, _| {
let last = stack.last().unwrap();
match last {
Ok(_) => Ok(false),
@ -416,7 +417,7 @@ impl Property {
message: format!("table {} exists", table),
func: Box::new({
let table = table.clone();
move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
move |_: &Vec<ResultSet>, env: &mut SimulatorEnv| {
Ok(env.tables.iter().any(|t| t.name == table))
}
}),
@ -440,7 +441,7 @@ impl Property {
let assertion = Interaction::Assertion(Assertion {
message: "select queries should return the same amount of results".to_string(),
func: Box::new(move |stack: &Vec<ResultSet>, _: &SimulatorEnv| {
func: Box::new(move |stack: &Vec<ResultSet>, _| {
let select_star = stack.last().unwrap();
let select_predicate = stack.get(stack.len() - 2).unwrap();
match (select_predicate, select_star) {
@ -476,7 +477,35 @@ impl Property {
}
Property::FaultyQuery { query, tables } => {
let checks = assert_all_table_values(tables);
let first = std::iter::once(Interaction::FaultyQuery(query.clone()));
let query_clone = query.clone();
let assumption = Assertion {
// A fault may not occur as we first signal we want a fault injected,
// then when IO is called the fault triggers. It may happen that a fault is injected
// but no IO happens right after it
message: "fault occured".to_string(),
func: Box::new(move |stack, env| {
let last = stack.last().unwrap();
match last {
Ok(_) => {
query_clone.shadow(env);
Ok(true)
}
Err(err) => {
let msg = format!("{}", err);
if msg.contains(FAULT_ERROR_MSG) {
Ok(true)
} else {
Err(LimboError::InternalError(msg))
}
}
}
}),
};
let first = [
Interaction::FaultyQuery(query.clone()),
Interaction::Assumption(assumption),
]
.into_iter();
Vec::from_iter(first.chain(checks))
}
}
@ -493,15 +522,15 @@ fn assert_all_table_values(tables: &[String]) -> impl Iterator<Item = Interactio
distinct: Distinctness::All,
}));
let assertion = Interaction::Assertion(Assertion {
message: format!(
"table {} should contain all of its values after the wal reopened",
table
),
message: format!("table {} should contain all of its values", table),
func: Box::new({
let table = table.clone();
move |stack: &Vec<ResultSet>, env: &SimulatorEnv| {
move |stack: &Vec<ResultSet>, env: &mut SimulatorEnv| {
let table = env.tables.iter().find(|t| t.name == table).ok_or_else(|| {
LimboError::InternalError(format!("table {} should exist", table))
LimboError::InternalError(format!(
"table {} should exist in simulator env",
table
))
})?;
let last = stack.last().unwrap();
match last {

View file

@ -1,2 +1,4 @@
pub mod query;
pub mod table;
pub(crate) const FAULT_ERROR_MSG: &str = "Injected fault";

View file

@ -16,19 +16,24 @@ pub(crate) struct Commit;
pub(crate) struct Rollback;
impl Begin {
pub(crate) fn shadow(&self, _env: &mut SimulatorEnv) -> Vec<Vec<SimValue>> {
pub(crate) fn shadow(&self, env: &mut SimulatorEnv) -> Vec<Vec<SimValue>> {
env.tables_snapshot = Some(env.tables.clone());
vec![]
}
}
impl Commit {
pub(crate) fn shadow(&self, _env: &mut SimulatorEnv) -> Vec<Vec<SimValue>> {
pub(crate) fn shadow(&self, env: &mut SimulatorEnv) -> Vec<Vec<SimValue>> {
env.tables_snapshot = None;
vec![]
}
}
impl Rollback {
pub(crate) fn shadow(&self, _env: &mut SimulatorEnv) -> Vec<Vec<SimValue>> {
pub(crate) fn shadow(&self, env: &mut SimulatorEnv) -> Vec<Vec<SimValue>> {
if let Some(tables) = env.tables_snapshot.take() {
env.tables = tables;
}
vec![]
}
}

View file

@ -54,7 +54,7 @@ pub struct SimulatorCLI {
pub disable_delete: bool,
#[clap(long, help = "disable CREATE Statement", default_value_t = false)]
pub disable_create: bool,
#[clap(long, help = "disable CREATE INDEX Statement", default_value_t = false)]
#[clap(long, help = "disable CREATE INDEX Statement", default_value_t = true)]
pub disable_create_index: bool,
#[clap(long, help = "disable DROP Statement", default_value_t = false)]
pub disable_drop: bool,
@ -84,7 +84,7 @@ pub struct SimulatorCLI {
pub disable_select_optimizer: bool,
#[clap(long, help = "disable FsyncNoWait Property", default_value_t = true)]
pub disable_fsync_no_wait: bool,
#[clap(long, help = "disable FaultyQuery Property", default_value_t = true)]
#[clap(long, help = "disable FaultyQuery Property", default_value_t = false)]
pub disable_faulty_query: bool,
#[clap(long, help = "disable Reopen-Database fault", default_value_t = false)]
pub disable_reopen_database: bool,

View file

@ -21,6 +21,7 @@ pub(crate) struct SimulatorEnv {
pub(crate) db: Arc<Database>,
pub(crate) rng: ChaCha8Rng,
pub(crate) db_path: String,
pub tables_snapshot: Option<Vec<Table>>,
}
impl SimulatorEnv {
@ -35,6 +36,7 @@ impl SimulatorEnv {
db: self.db.clone(),
rng: self.rng.clone(),
db_path: self.db_path.clone(),
tables_snapshot: None,
}
}
}
@ -164,6 +166,7 @@ impl SimulatorEnv {
io,
db,
db_path: db_path.to_str().unwrap().to_string(),
tables_snapshot: None,
}
}
}

View file

@ -191,7 +191,7 @@ pub(crate) fn execute_interaction(
SimConnection::Disconnected => unreachable!(),
};
let results = interaction.execute_query(conn, &env.io);
let results = interaction.execute_query(conn);
tracing::debug!(?results);
stack.push(results);
limbo_integrity_check(conn)?;

View file

@ -7,6 +7,8 @@ use rand::Rng as _;
use rand_chacha::ChaCha8Rng;
use tracing::{instrument, Level};
use turso_core::{CompletionType, File, Result};
use crate::model::FAULT_ERROR_MSG;
pub(crate) struct SimulatorFile {
pub(crate) inner: Arc<dyn File>,
pub(crate) fault: Cell<bool>,
@ -88,7 +90,7 @@ impl File for SimulatorFile {
fn lock_file(&self, exclusive: bool) -> Result<()> {
if self.fault.get() {
return Err(turso_core::LimboError::InternalError(
"Injected fault".into(),
FAULT_ERROR_MSG.into(),
));
}
self.inner.lock_file(exclusive)
@ -97,7 +99,7 @@ impl File for SimulatorFile {
fn unlock_file(&self) -> Result<()> {
if self.fault.get() {
return Err(turso_core::LimboError::InternalError(
"Injected fault".into(),
FAULT_ERROR_MSG.into(),
));
}
self.inner.unlock_file()
@ -113,7 +115,7 @@ impl File for SimulatorFile {
tracing::debug!("pread fault");
self.nr_pread_faults.set(self.nr_pread_faults.get() + 1);
return Err(turso_core::LimboError::InternalError(
"Injected fault".into(),
FAULT_ERROR_MSG.into(),
));
}
if let Some(latency) = self.generate_latency_duration() {
@ -148,7 +150,7 @@ impl File for SimulatorFile {
tracing::debug!("pwrite fault");
self.nr_pwrite_faults.set(self.nr_pwrite_faults.get() + 1);
return Err(turso_core::LimboError::InternalError(
"Injected fault".into(),
FAULT_ERROR_MSG.into(),
));
}
if let Some(latency) = self.generate_latency_duration() {
@ -178,7 +180,7 @@ impl File for SimulatorFile {
tracing::debug!("sync fault");
self.nr_sync_faults.set(self.nr_sync_faults.get() + 1);
return Err(turso_core::LimboError::InternalError(
"Injected fault".into(),
FAULT_ERROR_MSG.into(),
));
}
if let Some(latency) = self.generate_latency_duration() {

View file

@ -7,7 +7,7 @@ use rand::{RngCore, SeedableRng};
use rand_chacha::ChaCha8Rng;
use turso_core::{Clock, Instant, OpenFlags, PlatformIO, Result, IO};
use crate::runner::file::SimulatorFile;
use crate::{model::FAULT_ERROR_MSG, runner::file::SimulatorFile};
pub(crate) struct SimulatorIO {
pub(crate) inner: Box<dyn IO>,
@ -104,7 +104,7 @@ impl IO for SimulatorIO {
self.nr_run_once_faults
.replace(self.nr_run_once_faults.get() + 1);
return Err(turso_core::LimboError::InternalError(
"Injected fault".into(),
FAULT_ERROR_MSG.into(),
));
}
self.inner.run_once()?;

View file

@ -60,6 +60,7 @@ impl InteractionPlan {
.uses()
.iter()
.any(|t| depending_tables.contains(t));
if has_table {
// Remove the extensional parts of the properties
if let Interactions::Property(p) = interactions {
@ -82,13 +83,15 @@ impl InteractionPlan {
.iter()
.any(|t| depending_tables.contains(t));
}
has_table
&& !matches!(
interactions,
Interactions::Query(Query::Select(_))
| Interactions::Property(Property::SelectLimit { .. })
| Interactions::Property(Property::SelectSelectOptimizer { .. })
)
let is_fault = matches!(interactions, Interactions::Fault(..));
is_fault
|| (has_table
&& !matches!(
interactions,
Interactions::Query(Query::Select(_))
| Interactions::Property(Property::SelectLimit { .. })
| Interactions::Property(Property::SelectSelectOptimizer { .. })
))
};
idx += 1;
retain

View file

@ -247,12 +247,11 @@ pub unsafe extern "C" fn sqlite3_step(stmt: *mut sqlite3_stmt) -> ffi::c_int {
let stmt = &mut *stmt;
let db = &mut *stmt.db;
loop {
let db = db.inner.lock().unwrap();
let _db = db.inner.lock().unwrap();
if let Ok(result) = stmt.stmt.step() {
match result {
turso_core::StepResult::IO => {
let io = db.io.clone();
io.run_once().unwrap();
stmt.stmt.run_once().unwrap();
continue;
}
turso_core::StepResult::Done => return SQLITE_DONE,

View file

@ -183,7 +183,7 @@ pub(crate) fn sqlite_exec_rows(
}
pub(crate) fn limbo_exec_rows(
db: &TempDatabase,
_db: &TempDatabase,
conn: &Arc<turso_core::Connection>,
query: &str,
) -> Vec<Vec<rusqlite::types::Value>> {
@ -198,7 +198,7 @@ pub(crate) fn limbo_exec_rows(
break row;
}
turso_core::StepResult::IO => {
db.io.run_once().unwrap();
stmt.run_once().unwrap();
continue;
}
turso_core::StepResult::Done => break 'outer,
@ -221,7 +221,7 @@ pub(crate) fn limbo_exec_rows(
}
pub(crate) fn limbo_exec_rows_error(
db: &TempDatabase,
_db: &TempDatabase,
conn: &Arc<turso_core::Connection>,
query: &str,
) -> turso_core::Result<()> {
@ -230,7 +230,7 @@ pub(crate) fn limbo_exec_rows_error(
let result = stmt.step()?;
match result {
turso_core::StepResult::IO => {
db.io.run_once()?;
stmt.run_once()?;
continue;
}
turso_core::StepResult::Done => return Ok(()),

View file

@ -16,7 +16,7 @@ fn test_last_insert_rowid_basic() -> anyhow::Result<()> {
loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
@ -36,7 +36,7 @@ fn test_last_insert_rowid_basic() -> anyhow::Result<()> {
}
}
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
@ -50,7 +50,7 @@ fn test_last_insert_rowid_basic() -> anyhow::Result<()> {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
@ -72,7 +72,7 @@ fn test_last_insert_rowid_basic() -> anyhow::Result<()> {
}
}
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
@ -101,7 +101,7 @@ fn test_integer_primary_key() -> anyhow::Result<()> {
let mut insert_query = conn.query(query)?.unwrap();
loop {
match insert_query.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => insert_query.run_once()?,
StepResult::Done => break,
_ => unreachable!(),
}
@ -117,7 +117,7 @@ fn test_integer_primary_key() -> anyhow::Result<()> {
rowids.push(*id);
}
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => select_query.run_once()?,
StepResult::Interrupt | StepResult::Done => break,
StepResult::Busy => panic!("Database is busy"),
}

View file

@ -19,7 +19,7 @@ fn test_statement_reset_bind() -> anyhow::Result<()> {
turso_core::Value::Integer(1)
);
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => stmt.run_once()?,
_ => break,
}
}
@ -37,7 +37,7 @@ fn test_statement_reset_bind() -> anyhow::Result<()> {
turso_core::Value::Integer(2)
);
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => stmt.run_once()?,
_ => break,
}
}
@ -88,7 +88,7 @@ fn test_statement_bind() -> anyhow::Result<()> {
}
}
StepResult::IO => {
tmp_db.io.run_once()?;
stmt.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
@ -125,7 +125,7 @@ fn test_insert_parameter_remap() -> anyhow::Result<()> {
}
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@ -150,7 +150,7 @@ fn test_insert_parameter_remap() -> anyhow::Result<()> {
// D = 22
assert_eq!(row.get::<&Value>(3).unwrap(), &Value::Integer(22));
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@ -196,7 +196,7 @@ fn test_insert_parameter_remap_all_params() -> anyhow::Result<()> {
// execute the insert (no rows returned)
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@ -222,7 +222,7 @@ fn test_insert_parameter_remap_all_params() -> anyhow::Result<()> {
// D = 999
assert_eq!(row.get::<&Value>(3).unwrap(), &Value::Integer(999));
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@ -264,7 +264,7 @@ fn test_insert_parameter_multiple_remap_backwards() -> anyhow::Result<()> {
// execute the insert (no rows returned)
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@ -290,7 +290,7 @@ fn test_insert_parameter_multiple_remap_backwards() -> anyhow::Result<()> {
// D = 999
assert_eq!(row.get::<&Value>(3).unwrap(), &Value::Integer(444));
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@ -331,7 +331,7 @@ fn test_insert_parameter_multiple_no_remap() -> anyhow::Result<()> {
// execute the insert (no rows returned)
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@ -357,7 +357,7 @@ fn test_insert_parameter_multiple_no_remap() -> anyhow::Result<()> {
// D = 999
assert_eq!(row.get::<&Value>(3).unwrap(), &Value::Integer(444));
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@ -402,7 +402,7 @@ fn test_insert_parameter_multiple_row() -> anyhow::Result<()> {
// execute the insert (no rows returned)
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@ -434,7 +434,7 @@ fn test_insert_parameter_multiple_row() -> anyhow::Result<()> {
);
i += 1;
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@ -450,7 +450,7 @@ fn test_bind_parameters_update_query() -> anyhow::Result<()> {
let mut ins = conn.prepare("insert into test (a, b) values (3, 'test1');")?;
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@ -461,7 +461,7 @@ fn test_bind_parameters_update_query() -> anyhow::Result<()> {
ins.bind_at(2.try_into()?, Value::build_text("test1"));
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@ -476,7 +476,7 @@ fn test_bind_parameters_update_query() -> anyhow::Result<()> {
assert_eq!(row.get::<&Value>(0).unwrap(), &Value::Integer(222));
assert_eq!(row.get::<&Value>(1).unwrap(), &Value::build_text("test1"),);
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@ -495,7 +495,7 @@ fn test_bind_parameters_update_query_multiple_where() -> anyhow::Result<()> {
let mut ins = conn.prepare("insert into test (a, b, c, d) values (3, 'test1', 4, 5);")?;
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@ -507,7 +507,7 @@ fn test_bind_parameters_update_query_multiple_where() -> anyhow::Result<()> {
ins.bind_at(3.try_into()?, Value::Integer(5));
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@ -524,7 +524,7 @@ fn test_bind_parameters_update_query_multiple_where() -> anyhow::Result<()> {
assert_eq!(row.get::<&Value>(2).unwrap(), &Value::Integer(4));
assert_eq!(row.get::<&Value>(3).unwrap(), &Value::Integer(5));
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@ -543,7 +543,7 @@ fn test_bind_parameters_update_rowid_alias() -> anyhow::Result<()> {
let mut ins = conn.prepare("insert into test (id, name) values (1, 'test');")?;
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@ -558,7 +558,7 @@ fn test_bind_parameters_update_rowid_alias() -> anyhow::Result<()> {
assert_eq!(row.get::<&Value>(0).unwrap(), &Value::Integer(1));
assert_eq!(row.get::<&Value>(1).unwrap(), &Value::build_text("test"),);
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@ -568,7 +568,7 @@ fn test_bind_parameters_update_rowid_alias() -> anyhow::Result<()> {
ins.bind_at(2.try_into()?, Value::Integer(1));
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@ -583,7 +583,7 @@ fn test_bind_parameters_update_rowid_alias() -> anyhow::Result<()> {
assert_eq!(row.get::<&Value>(0).unwrap(), &Value::Integer(1));
assert_eq!(row.get::<&Value>(1).unwrap(), &Value::build_text("updated"),);
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@ -618,7 +618,7 @@ fn test_bind_parameters_update_rowid_alias_seek_rowid() -> anyhow::Result<()> {
&Value::Integer(if i == 0 { 4 } else { 11 })
);
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@ -631,7 +631,7 @@ fn test_bind_parameters_update_rowid_alias_seek_rowid() -> anyhow::Result<()> {
ins.bind_at(4.try_into()?, Value::Integer(5));
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@ -649,7 +649,7 @@ fn test_bind_parameters_update_rowid_alias_seek_rowid() -> anyhow::Result<()> {
&Value::build_text(if i == 0 { "updated" } else { "test" }),
);
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@ -678,7 +678,7 @@ fn test_bind_parameters_delete_rowid_alias_seek_out_of_order() -> anyhow::Result
ins.bind_at(4.try_into()?, Value::build_text("test"));
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@ -693,7 +693,7 @@ fn test_bind_parameters_delete_rowid_alias_seek_out_of_order() -> anyhow::Result
let row = sel.row().unwrap();
assert_eq!(row.get::<&Value>(0).unwrap(), &Value::build_text("correct"),);
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}

View file

@ -42,7 +42,7 @@ fn test_simple_overflow_page() -> anyhow::Result<()> {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
@ -68,7 +68,7 @@ fn test_simple_overflow_page() -> anyhow::Result<()> {
compare_string(&huge_text, text);
}
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
@ -110,7 +110,7 @@ fn test_sequential_overflow_page() -> anyhow::Result<()> {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
@ -138,7 +138,7 @@ fn test_sequential_overflow_page() -> anyhow::Result<()> {
current_index += 1;
}
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
@ -247,7 +247,7 @@ fn test_statement_reset() -> anyhow::Result<()> {
);
break;
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => stmt.run_once()?,
_ => break,
}
}
@ -264,7 +264,7 @@ fn test_statement_reset() -> anyhow::Result<()> {
);
break;
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => stmt.run_once()?,
_ => break,
}
}
@ -748,7 +748,7 @@ fn run_query_on_row(
}
fn run_query_core(
tmp_db: &TempDatabase,
_tmp_db: &TempDatabase,
conn: &Arc<Connection>,
query: &str,
mut on_row: Option<impl FnMut(&Row)>,
@ -757,7 +757,7 @@ fn run_query_core(
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Done => break,
StepResult::Row => {

View file

@ -13,7 +13,7 @@ fn test_wal_checkpoint_result() -> Result<()> {
let conn = tmp_db.connect_limbo();
conn.execute("CREATE TABLE t1 (id text);")?;
let res = execute_and_get_strings(&tmp_db, &conn, "pragma journal_mode;")?;
let res = execute_and_get_strings(&conn, "pragma journal_mode;")?;
assert_eq!(res, vec!["wal"]);
conn.execute("insert into t1(id) values (1), (2);")?;
@ -22,7 +22,7 @@ fn test_wal_checkpoint_result() -> Result<()> {
do_flush(&conn, &tmp_db).unwrap();
// checkpoint result should return > 0 num pages now as database has data
let res = execute_and_get_ints(&tmp_db, &conn, "pragma wal_checkpoint;")?;
let res = execute_and_get_ints(&conn, "pragma wal_checkpoint;")?;
println!("'pragma wal_checkpoint;' returns: {res:?}");
assert_eq!(res.len(), 3);
assert_eq!(res[0], 0); // checkpoint successfully
@ -46,7 +46,7 @@ fn test_wal_1_writer_1_reader() -> Result<()> {
match rows.step().unwrap() {
StepResult::Row => {}
StepResult::IO => {
tmp_db.lock().unwrap().io.run_once().unwrap();
rows.run_once().unwrap();
}
StepResult::Interrupt => break,
StepResult::Done => break,
@ -86,7 +86,7 @@ fn test_wal_1_writer_1_reader() -> Result<()> {
i += 1;
}
StepResult::IO => {
tmp_db.lock().unwrap().io.run_once().unwrap();
rows.run_once().unwrap();
}
StepResult::Interrupt => break,
StepResult::Done => break,
@ -110,11 +110,7 @@ fn test_wal_1_writer_1_reader() -> Result<()> {
}
/// Execute a statement and get strings result
pub(crate) fn execute_and_get_strings(
tmp_db: &TempDatabase,
conn: &Arc<Connection>,
sql: &str,
) -> Result<Vec<String>> {
pub(crate) fn execute_and_get_strings(conn: &Arc<Connection>, sql: &str) -> Result<Vec<String>> {
let statement = conn.prepare(sql)?;
let stmt = Rc::new(RefCell::new(statement));
let mut result = Vec::new();
@ -130,19 +126,15 @@ pub(crate) fn execute_and_get_strings(
}
StepResult::Done => break,
StepResult::Interrupt => break,
StepResult::IO => tmp_db.io.run_once()?,
StepResult::Busy => tmp_db.io.run_once()?,
StepResult::IO => stmt.run_once()?,
StepResult::Busy => stmt.run_once()?,
}
}
Ok(result)
}
/// Execute a statement and get integers
pub(crate) fn execute_and_get_ints(
tmp_db: &TempDatabase,
conn: &Arc<Connection>,
sql: &str,
) -> Result<Vec<i64>> {
pub(crate) fn execute_and_get_ints(conn: &Arc<Connection>, sql: &str) -> Result<Vec<i64>> {
let statement = conn.prepare(sql)?;
let stmt = Rc::new(RefCell::new(statement));
let mut result = Vec::new();
@ -166,8 +158,8 @@ pub(crate) fn execute_and_get_ints(
}
StepResult::Done => break,
StepResult::Interrupt => break,
StepResult::IO => tmp_db.io.run_once()?,
StepResult::Busy => tmp_db.io.run_once()?,
StepResult::IO => stmt.run_once()?,
StepResult::Busy => stmt.run_once()?,
}
}
Ok(result)