mirror of
https://github.com/tursodatabase/limbo.git
synced 2025-08-04 18:18:03 +00:00
Merge 'Fix infinite loops, rollback problems, and other bugs found by I/O fault injection' from Pedro Muniz
Was running the sim with I/O faults enabled and fixed some nasty bugs; there are more nasty bugs still to fix. This is the command I use to run the simulator: `cargo run -p limbo_sim -- --minimum-tests 10 --maximum-tests 1000`

This PR mainly fixes the following bugs:
- The in-flight write counter was not decremented when `pwrite` fails.
- The transaction was not rolled back on a `step` error.
- The transaction was not rolled back on a `run_once` error.
- Some functions simply unwrapped results even though they can hit I/O errors.
- `max_frame` is now only changed after the WAL syncs.

Reviewed-by: Pere Diaz Bou <pere-altea@homail.com>
Closes #1946
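The recurring fix across the bindings and the CLI is the same: when `step()` returns `StepResult::IO`, drive the pending I/O through the statement's own `run_once()` and propagate its error, instead of calling `io.run_once()` directly and unwrapping or discarding the result. A minimal sketch of that driver loop, using only the `turso_core` items that appear in this diff (the helper name `drive_to_completion` is illustrative, not part of the codebase):

use turso_core::{LimboError, Statement, StepResult};

// Illustrative helper (not part of this PR): run a prepared statement to
// completion, surfacing I/O failures instead of swallowing them.
fn drive_to_completion(stmt: &mut Statement) -> Result<(), LimboError> {
    loop {
        match stmt.step()? {
            StepResult::Row => {
                // A real caller would read the current row here via stmt.row().
            }
            StepResult::IO => {
                // Previously many call sites did `let _ = io.run_once();` and
                // dropped the error; the fix is to drive I/O through the
                // statement and propagate failures, which core now also uses
                // to roll back the active transaction.
                stmt.run_once()?;
            }
            // Terminal states; real callers map Busy/Interrupt to their own
            // error or retry handling.
            StepResult::Done | StepResult::Interrupt | StepResult::Busy => break,
        }
    }
    Ok(())
}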
This commit is contained in: commit c206c56847

45 changed files with 448 additions and 329 deletions

Makefile (25 lines changed)
@ -6,6 +6,7 @@ UNAME_S := $(shell uname -s)
# Executable used to execute the compatibility tests.
SQLITE_EXEC ?= scripts/limbo-sqlite3
+ RUST_LOG := off

all: check-rust-version check-wasm-target limbo limbo-wasm
.PHONY: all

@ -55,23 +56,23 @@ test: limbo uv-sync test-compat test-vector test-sqlite3 test-shell test-extensi
.PHONY: test

test-extensions: limbo uv-sync
-	uv run --project limbo_test test-extensions
+	RUST_LOG=$(RUST_LOG) uv run --project limbo_test test-extensions
.PHONY: test-extensions

test-shell: limbo uv-sync
-	SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-shell
+	RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-shell
.PHONY: test-shell

test-compat:
-	SQLITE_EXEC=$(SQLITE_EXEC) ./testing/all.test
+	RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) ./testing/all.test
.PHONY: test-compat

test-vector:
-	SQLITE_EXEC=$(SQLITE_EXEC) ./testing/vector.test
+	RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) ./testing/vector.test
.PHONY: test-vector

test-time:
-	SQLITE_EXEC=$(SQLITE_EXEC) ./testing/time.test
+	RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) ./testing/time.test
.PHONY: test-time

reset-db:

@ -85,16 +86,16 @@ test-sqlite3: reset-db
.PHONY: test-sqlite3

test-json:
-	SQLITE_EXEC=$(SQLITE_EXEC) ./testing/json.test
+	RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) ./testing/json.test
.PHONY: test-json

test-memory: limbo uv-sync
-	SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-memory
+	RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-memory
.PHONY: test-memory

test-write: limbo uv-sync
	@if [ "$(SQLITE_EXEC)" != "scripts/limbo-sqlite3" ]; then \
-		SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-write; \
+		RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-write; \
	else \
		echo "Skipping test-write: SQLITE_EXEC does not have indexes scripts/limbo-sqlite3"; \
	fi

@ -102,7 +103,7 @@ test-write: limbo uv-sync

test-update: limbo uv-sync
	@if [ "$(SQLITE_EXEC)" != "scripts/limbo-sqlite3" ]; then \
-		SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-update; \
+		RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-update; \
	else \
		echo "Skipping test-update: SQLITE_EXEC does not have indexes scripts/limbo-sqlite3"; \
	fi

@ -110,7 +111,7 @@ test-update: limbo uv-sync

test-collate: limbo uv-sync
	@if [ "$(SQLITE_EXEC)" != "scripts/limbo-sqlite3" ]; then \
-		SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-collate; \
+		RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-collate; \
	else \
		echo "Skipping test-collate: SQLITE_EXEC does not have indexes scripts/limbo-sqlite3"; \
	fi

@ -118,7 +119,7 @@ test-collate: limbo uv-sync

test-constraint: limbo uv-sync
	@if [ "$(SQLITE_EXEC)" != "scripts/limbo-sqlite3" ]; then \
-		SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-constraint; \
+		RUST_LOG=$(RUST_LOG) SQLITE_EXEC=$(SQLITE_EXEC) uv run --project limbo_test test-constraint; \
	else \
		echo "Skipping test-constraint: SQLITE_EXEC does not have indexes scripts/limbo-sqlite3"; \
	fi

@ -126,7 +127,7 @@ test-constraint: limbo uv-sync

bench-vfs: uv-sync
	cargo build --release
-	uv run --project limbo_test bench-vfs "$(SQL)" "$(N)"
+	RUST_LOG=$(RUST_LOG) uv run --project limbo_test bench-vfs "$(SQL)" "$(N)"

clickbench:
	./perf/clickbench/benchmark.sh

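With the `RUST_LOG` plumbing above, a log level can be forwarded into the test harnesses from the make invocation, e.g. `make test-compat RUST_LOG=info` (a command-line assignment overrides the `RUST_LOG := off` default), while a plain `make test-compat` stays quiet.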
@ -7,7 +7,7 @@ use turso_core::{LimboError, Statement, StepResult, Value};
|
|||
|
||||
pub struct LimboRows<'conn> {
|
||||
stmt: Box<Statement>,
|
||||
conn: &'conn mut LimboConn,
|
||||
_conn: &'conn mut LimboConn,
|
||||
err: Option<LimboError>,
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,7 @@ impl<'conn> LimboRows<'conn> {
|
|||
pub fn new(stmt: Statement, conn: &'conn mut LimboConn) -> Self {
|
||||
LimboRows {
|
||||
stmt: Box::new(stmt),
|
||||
conn,
|
||||
_conn: conn,
|
||||
err: None,
|
||||
}
|
||||
}
|
||||
|
@ -55,8 +55,12 @@ pub extern "C" fn rows_next(ctx: *mut c_void) -> ResultCode {
|
|||
Ok(StepResult::Row) => ResultCode::Row,
|
||||
Ok(StepResult::Done) => ResultCode::Done,
|
||||
Ok(StepResult::IO) => {
|
||||
let _ = ctx.conn.io.run_once();
|
||||
ResultCode::Io
|
||||
let res = ctx.stmt.run_once();
|
||||
if res.is_err() {
|
||||
ResultCode::Error
|
||||
} else {
|
||||
ResultCode::Io
|
||||
}
|
||||
}
|
||||
Ok(StepResult::Busy) => ResultCode::Busy,
|
||||
Ok(StepResult::Interrupt) => ResultCode::Interrupt,
|
||||
|
|
|
@ -64,7 +64,10 @@ pub extern "C" fn stmt_execute(
|
|||
return ResultCode::Done;
|
||||
}
|
||||
Ok(StepResult::IO) => {
|
||||
let _ = stmt.conn.io.run_once();
|
||||
let res = statement.run_once();
|
||||
if res.is_err() {
|
||||
return ResultCode::Error;
|
||||
}
|
||||
}
|
||||
Ok(StepResult::Busy) => {
|
||||
return ResultCode::Busy;
|
||||
|
|
|
@ -13,12 +13,12 @@ use turso_core::Connection;
|
|||
#[derive(Clone)]
|
||||
pub struct TursoConnection {
|
||||
pub(crate) conn: Arc<Connection>,
|
||||
pub(crate) io: Arc<dyn turso_core::IO>,
|
||||
pub(crate) _io: Arc<dyn turso_core::IO>,
|
||||
}
|
||||
|
||||
impl TursoConnection {
|
||||
pub fn new(conn: Arc<Connection>, io: Arc<dyn turso_core::IO>) -> Self {
|
||||
TursoConnection { conn, io }
|
||||
TursoConnection { conn, _io: io }
|
||||
}
|
||||
|
||||
#[allow(clippy::wrong_self_convention)]
|
||||
|
|
|
@ -76,7 +76,7 @@ pub extern "system" fn Java_tech_turso_core_TursoStatement_step<'local>(
|
|||
};
|
||||
}
|
||||
StepResult::IO => {
|
||||
if let Err(e) = stmt.connection.io.run_once() {
|
||||
if let Err(e) = stmt.stmt.run_once() {
|
||||
set_err_msg_and_throw_exception(&mut env, obj, TURSO_ETC, e.to_string());
|
||||
return to_turso_step_result(&mut env, STEP_RESULT_ID_ERROR, None);
|
||||
}
|
||||
|
|
|
@ -41,7 +41,7 @@ pub struct Database {
|
|||
pub name: String,
|
||||
_db: Arc<turso_core::Database>,
|
||||
conn: Arc<turso_core::Connection>,
|
||||
io: Arc<dyn turso_core::IO>,
|
||||
_io: Arc<dyn turso_core::IO>,
|
||||
}
|
||||
|
||||
impl ObjectFinalize for Database {
|
||||
|
@ -82,7 +82,7 @@ impl Database {
|
|||
conn,
|
||||
open: true,
|
||||
name: path,
|
||||
io,
|
||||
_io: io,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -114,7 +114,7 @@ impl Database {
|
|||
return Ok(env.get_undefined()?.into_unknown())
|
||||
}
|
||||
turso_core::StepResult::IO => {
|
||||
self.io.run_once().map_err(into_napi_error)?;
|
||||
stmt.run_once().map_err(into_napi_error)?;
|
||||
continue;
|
||||
}
|
||||
step @ turso_core::StepResult::Interrupt
|
||||
|
@ -185,7 +185,7 @@ impl Database {
|
|||
Ok(Some(mut stmt)) => loop {
|
||||
match stmt.step() {
|
||||
Ok(StepResult::Row) => continue,
|
||||
Ok(StepResult::IO) => self.io.run_once().map_err(into_napi_error)?,
|
||||
Ok(StepResult::IO) => stmt.run_once().map_err(into_napi_error)?,
|
||||
Ok(StepResult::Done) => break,
|
||||
Ok(StepResult::Interrupt | StepResult::Busy) => {
|
||||
return Err(napi::Error::new(
|
||||
|
@ -308,7 +308,7 @@ impl Statement {
|
|||
}
|
||||
turso_core::StepResult::Done => return Ok(env.get_undefined()?.into_unknown()),
|
||||
turso_core::StepResult::IO => {
|
||||
self.database.io.run_once().map_err(into_napi_error)?;
|
||||
stmt.run_once().map_err(into_napi_error)?;
|
||||
continue;
|
||||
}
|
||||
turso_core::StepResult::Interrupt | turso_core::StepResult::Busy => {
|
||||
|
@ -338,7 +338,7 @@ impl Statement {
|
|||
self.check_and_bind(args)?;
|
||||
Ok(IteratorStatement {
|
||||
stmt: Rc::clone(&self.inner),
|
||||
database: self.database.clone(),
|
||||
_database: self.database.clone(),
|
||||
env,
|
||||
presentation_mode: self.presentation_mode.clone(),
|
||||
})
|
||||
|
@ -401,7 +401,7 @@ impl Statement {
|
|||
break;
|
||||
}
|
||||
turso_core::StepResult::IO => {
|
||||
self.database.io.run_once().map_err(into_napi_error)?;
|
||||
stmt.run_once().map_err(into_napi_error)?;
|
||||
}
|
||||
turso_core::StepResult::Interrupt | turso_core::StepResult::Busy => {
|
||||
return Err(napi::Error::new(
|
||||
|
@ -480,7 +480,7 @@ impl Statement {
|
|||
#[napi(iterator)]
|
||||
pub struct IteratorStatement {
|
||||
stmt: Rc<RefCell<turso_core::Statement>>,
|
||||
database: Database,
|
||||
_database: Database,
|
||||
env: Env,
|
||||
presentation_mode: PresentationMode,
|
||||
}
|
||||
|
@ -528,7 +528,7 @@ impl Generator for IteratorStatement {
|
|||
}
|
||||
turso_core::StepResult::Done => return None,
|
||||
turso_core::StepResult::IO => {
|
||||
self.database.io.run_once().ok()?;
|
||||
stmt.run_once().ok()?;
|
||||
continue;
|
||||
}
|
||||
turso_core::StepResult::Interrupt | turso_core::StepResult::Busy => return None,
|
||||
|
|
|
@ -96,14 +96,12 @@ impl Cursor {
|
|||
// For DDL and DML statements,
|
||||
// we need to execute the statement immediately
|
||||
if stmt_is_ddl || stmt_is_dml || stmt_is_tx {
|
||||
let mut stmt = stmt.borrow_mut();
|
||||
while let turso_core::StepResult::IO = stmt
|
||||
.borrow_mut()
|
||||
.step()
|
||||
.map_err(|e| PyErr::new::<OperationalError, _>(format!("Step error: {:?}", e)))?
|
||||
{
|
||||
self.conn
|
||||
.io
|
||||
.run_once()
|
||||
stmt.run_once()
|
||||
.map_err(|e| PyErr::new::<OperationalError, _>(format!("IO error: {:?}", e)))?;
|
||||
}
|
||||
}
|
||||
|
@ -132,7 +130,7 @@ impl Cursor {
|
|||
return Ok(Some(py_row));
|
||||
}
|
||||
turso_core::StepResult::IO => {
|
||||
self.conn.io.run_once().map_err(|e| {
|
||||
stmt.run_once().map_err(|e| {
|
||||
PyErr::new::<OperationalError, _>(format!("IO error: {:?}", e))
|
||||
})?;
|
||||
}
|
||||
|
@ -168,7 +166,7 @@ impl Cursor {
|
|||
results.push(py_row);
|
||||
}
|
||||
turso_core::StepResult::IO => {
|
||||
self.conn.io.run_once().map_err(|e| {
|
||||
stmt.run_once().map_err(|e| {
|
||||
PyErr::new::<OperationalError, _>(format!("IO error: {:?}", e))
|
||||
})?;
|
||||
}
|
||||
|
@ -233,7 +231,7 @@ fn stmt_is_tx(sql: &str) -> bool {
|
|||
#[derive(Clone)]
|
||||
pub struct Connection {
|
||||
conn: Arc<turso_core::Connection>,
|
||||
io: Arc<dyn turso_core::IO>,
|
||||
_io: Arc<dyn turso_core::IO>,
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
|
@ -308,7 +306,7 @@ impl Drop for Connection {
|
|||
#[pyfunction]
|
||||
pub fn connect(path: &str) -> Result<Connection> {
|
||||
match turso_core::Connection::from_uri(path, false, false) {
|
||||
Ok((io, conn)) => Ok(Connection { conn, io }),
|
||||
Ok((io, conn)) => Ok(Connection { conn, _io: io }),
|
||||
Err(e) => Err(PyErr::new::<ProgrammingError, _>(format!(
|
||||
"Failed to create connection: {:?}",
|
||||
e
|
||||
|
|
cli/app.rs (22 lines changed)
|
@ -24,6 +24,7 @@ use std::{
|
|||
},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::level_filters::LevelFilter;
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
|
||||
use turso_core::{Connection, Database, LimboError, OpenFlags, Statement, StepResult, Value};
|
||||
|
@ -95,7 +96,7 @@ macro_rules! query_internal {
|
|||
$body(row)?;
|
||||
}
|
||||
StepResult::IO => {
|
||||
$self.io.run_once()?;
|
||||
rows.run_once()?;
|
||||
}
|
||||
StepResult::Interrupt => break,
|
||||
StepResult::Done => break,
|
||||
|
@ -175,7 +176,6 @@ impl Limbo {
|
|||
pub fn with_readline(mut self, mut rl: Editor<LimboHelper, DefaultHistory>) -> Self {
|
||||
let h = LimboHelper::new(
|
||||
self.conn.clone(),
|
||||
self.io.clone(),
|
||||
self.config.as_ref().map(|c| c.highlight.clone()),
|
||||
);
|
||||
rl.set_helper(Some(h));
|
||||
|
@ -644,8 +644,7 @@ impl Limbo {
|
|||
let _ = self.show_info();
|
||||
}
|
||||
Command::Import(args) => {
|
||||
let mut import_file =
|
||||
ImportFile::new(self.conn.clone(), self.io.clone(), &mut self.writer);
|
||||
let mut import_file = ImportFile::new(self.conn.clone(), &mut self.writer);
|
||||
import_file.import(args)
|
||||
}
|
||||
Command::LoadExtension(args) => {
|
||||
|
@ -833,7 +832,7 @@ impl Limbo {
|
|||
}
|
||||
Ok(StepResult::IO) => {
|
||||
let start = Instant::now();
|
||||
self.io.run_once()?;
|
||||
rows.run_once()?;
|
||||
if let Some(ref mut stats) = statistics {
|
||||
stats.io_time_elapsed_samples.push(start.elapsed());
|
||||
}
|
||||
|
@ -908,7 +907,12 @@ impl Limbo {
|
|||
.with_thread_ids(true)
|
||||
.with_ansi(should_emit_ansi),
|
||||
)
|
||||
.with(EnvFilter::from_default_env().add_directive("rustyline=off".parse().unwrap()))
|
||||
.with(
|
||||
EnvFilter::builder()
|
||||
.with_default_directive(LevelFilter::OFF.into())
|
||||
.from_env_lossy()
|
||||
.add_directive("rustyline=off".parse().unwrap()),
|
||||
)
|
||||
.try_init()
|
||||
{
|
||||
println!("Unable to setup tracing appender: {:?}", e);
|
||||
|
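The hunk above replaces `EnvFilter::from_default_env()` with the builder form so the CLI's tracing defaults to `LevelFilter::OFF` unless `RUST_LOG` overrides it, and a malformed `RUST_LOG` is tolerated via `from_env_lossy()`. A standalone sketch of that subscriber setup (the CLI's formatting-layer details are omitted here):

use tracing::level_filters::LevelFilter;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

fn init_tracing() {
    // Default to OFF when RUST_LOG is unset; rustyline's own logging stays off.
    let filter = EnvFilter::builder()
        .with_default_directive(LevelFilter::OFF.into())
        .from_env_lossy()
        .add_directive("rustyline=off".parse().unwrap());

    if let Err(e) = tracing_subscriber::registry().with(filter).try_init() {
        println!("Unable to set up tracing: {e:?}");
    }
}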
@ -940,7 +944,7 @@ impl Limbo {
|
|||
}
|
||||
}
|
||||
StepResult::IO => {
|
||||
self.io.run_once()?;
|
||||
rows.run_once()?;
|
||||
}
|
||||
StepResult::Interrupt => break,
|
||||
StepResult::Done => break,
|
||||
|
@ -996,7 +1000,7 @@ impl Limbo {
|
|||
}
|
||||
}
|
||||
StepResult::IO => {
|
||||
self.io.run_once()?;
|
||||
rows.run_once()?;
|
||||
}
|
||||
StepResult::Interrupt => break,
|
||||
StepResult::Done => break,
|
||||
|
@ -1047,7 +1051,7 @@ impl Limbo {
|
|||
}
|
||||
}
|
||||
StepResult::IO => {
|
||||
self.io.run_once()?;
|
||||
rows.run_once()?;
|
||||
}
|
||||
StepResult::Interrupt => break,
|
||||
StepResult::Done => break,
|
||||
|
|
|
@ -21,17 +21,12 @@ pub struct ImportArgs {
|
|||
|
||||
pub struct ImportFile<'a> {
|
||||
conn: Arc<Connection>,
|
||||
io: Arc<dyn turso_core::IO>,
|
||||
writer: &'a mut dyn Write,
|
||||
}
|
||||
|
||||
impl<'a> ImportFile<'a> {
|
||||
pub fn new(
|
||||
conn: Arc<Connection>,
|
||||
io: Arc<dyn turso_core::IO>,
|
||||
writer: &'a mut dyn Write,
|
||||
) -> Self {
|
||||
Self { conn, io, writer }
|
||||
pub fn new(conn: Arc<Connection>, writer: &'a mut dyn Write) -> Self {
|
||||
Self { conn, writer }
|
||||
}
|
||||
|
||||
pub fn import(&mut self, args: ImportArgs) {
|
||||
|
@ -79,7 +74,7 @@ impl<'a> ImportFile<'a> {
|
|||
while let Ok(x) = rows.step() {
|
||||
match x {
|
||||
turso_core::StepResult::IO => {
|
||||
self.io.run_once().unwrap();
|
||||
rows.run_once().unwrap();
|
||||
}
|
||||
turso_core::StepResult::Done => break,
|
||||
turso_core::StepResult::Interrupt => break,
|
||||
|
|
|
@ -40,11 +40,7 @@ pub struct LimboHelper {
|
|||
}
|
||||
|
||||
impl LimboHelper {
|
||||
pub fn new(
|
||||
conn: Arc<Connection>,
|
||||
io: Arc<dyn turso_core::IO>,
|
||||
syntax_config: Option<HighlightConfig>,
|
||||
) -> Self {
|
||||
pub fn new(conn: Arc<Connection>, syntax_config: Option<HighlightConfig>) -> Self {
|
||||
// Load only predefined syntax
|
||||
let ps = from_uncompressed_data(include_bytes!(concat!(
|
||||
env!("OUT_DIR"),
|
||||
|
@ -59,7 +55,7 @@ impl LimboHelper {
|
|||
}
|
||||
}
|
||||
LimboHelper {
|
||||
completer: SqlCompleter::new(conn, io),
|
||||
completer: SqlCompleter::new(conn),
|
||||
syntax_set: ps,
|
||||
theme_set: ts,
|
||||
syntax_config: syntax_config.unwrap_or_default(),
|
||||
|
@ -141,7 +137,6 @@ impl Highlighter for LimboHelper {
|
|||
|
||||
pub struct SqlCompleter<C: Parser + Send + Sync + 'static> {
|
||||
conn: Arc<Connection>,
|
||||
io: Arc<dyn turso_core::IO>,
|
||||
// Has to be a ref cell as Rustyline takes immutable reference to self
|
||||
// This problem would be solved with Reedline as it uses &mut self for completions
|
||||
cmd: RefCell<clap::Command>,
|
||||
|
@ -149,10 +144,9 @@ pub struct SqlCompleter<C: Parser + Send + Sync + 'static> {
|
|||
}
|
||||
|
||||
impl<C: Parser + Send + Sync + 'static> SqlCompleter<C> {
|
||||
pub fn new(conn: Arc<Connection>, io: Arc<dyn turso_core::IO>) -> Self {
|
||||
pub fn new(conn: Arc<Connection>) -> Self {
|
||||
Self {
|
||||
conn,
|
||||
io,
|
||||
cmd: C::command().into(),
|
||||
_cmd_phantom: PhantomData,
|
||||
}
|
||||
|
@ -228,7 +222,7 @@ impl<C: Parser + Send + Sync + 'static> SqlCompleter<C> {
|
|||
candidates.push(pair);
|
||||
}
|
||||
StepResult::IO => {
|
||||
try_result!(self.io.run_once(), (prefix_pos, candidates));
|
||||
try_result!(rows.run_once(), (prefix_pos, candidates));
|
||||
}
|
||||
StepResult::Interrupt => break,
|
||||
StepResult::Done => break,
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
|
||||
use pprof::criterion::{Output, PProfProfiler};
|
||||
use std::sync::Arc;
|
||||
use turso_core::{Database, PlatformIO, IO};
|
||||
use turso_core::{Database, PlatformIO};
|
||||
|
||||
fn rusqlite_open() -> rusqlite::Connection {
|
||||
let sqlite_conn = rusqlite::Connection::open("../testing/testing.db").unwrap();
|
||||
|
@ -79,7 +79,6 @@ fn bench_execute_select_rows(criterion: &mut Criterion) {
|
|||
let mut stmt = limbo_conn
|
||||
.prepare(format!("SELECT * FROM users LIMIT {}", *i))
|
||||
.unwrap();
|
||||
let io = io.clone();
|
||||
b.iter(|| {
|
||||
loop {
|
||||
match stmt.step().unwrap() {
|
||||
|
@ -87,7 +86,7 @@ fn bench_execute_select_rows(criterion: &mut Criterion) {
|
|||
black_box(stmt.row());
|
||||
}
|
||||
turso_core::StepResult::IO => {
|
||||
let _ = io.run_once();
|
||||
stmt.run_once().unwrap();
|
||||
}
|
||||
turso_core::StepResult::Done => {
|
||||
break;
|
||||
|
@ -141,7 +140,6 @@ fn bench_execute_select_1(criterion: &mut Criterion) {
|
|||
|
||||
group.bench_function("limbo_execute_select_1", |b| {
|
||||
let mut stmt = limbo_conn.prepare("SELECT 1").unwrap();
|
||||
let io = io.clone();
|
||||
b.iter(|| {
|
||||
loop {
|
||||
match stmt.step().unwrap() {
|
||||
|
@ -149,7 +147,7 @@ fn bench_execute_select_1(criterion: &mut Criterion) {
|
|||
black_box(stmt.row());
|
||||
}
|
||||
turso_core::StepResult::IO => {
|
||||
let _ = io.run_once();
|
||||
stmt.run_once().unwrap();
|
||||
}
|
||||
turso_core::StepResult::Done => {
|
||||
break;
|
||||
|
@ -194,7 +192,6 @@ fn bench_execute_select_count(criterion: &mut Criterion) {
|
|||
|
||||
group.bench_function("limbo_execute_select_count", |b| {
|
||||
let mut stmt = limbo_conn.prepare("SELECT count() FROM users").unwrap();
|
||||
let io = io.clone();
|
||||
b.iter(|| {
|
||||
loop {
|
||||
match stmt.step().unwrap() {
|
||||
|
@ -202,7 +199,7 @@ fn bench_execute_select_count(criterion: &mut Criterion) {
|
|||
black_box(stmt.row());
|
||||
}
|
||||
turso_core::StepResult::IO => {
|
||||
let _ = io.run_once();
|
||||
stmt.run_once().unwrap();
|
||||
}
|
||||
turso_core::StepResult::Done => {
|
||||
break;
|
||||
|
|
|
@ -4,7 +4,7 @@ use pprof::{
|
|||
flamegraph::Options,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use turso_core::{Database, PlatformIO, IO};
|
||||
use turso_core::{Database, PlatformIO};
|
||||
|
||||
// Title: JSONB Function Benchmarking
|
||||
|
||||
|
@ -447,13 +447,12 @@ fn bench(criterion: &mut Criterion) {
|
|||
|
||||
group.bench_function("Limbo", |b| {
|
||||
let mut stmt = limbo_conn.prepare(&query).unwrap();
|
||||
let io = io.clone();
|
||||
b.iter(|| {
|
||||
loop {
|
||||
match stmt.step().unwrap() {
|
||||
turso_core::StepResult::Row => {}
|
||||
turso_core::StepResult::IO => {
|
||||
let _ = io.run_once();
|
||||
stmt.run_once().unwrap();
|
||||
}
|
||||
turso_core::StepResult::Done => {
|
||||
break;
|
||||
|
@ -606,13 +605,12 @@ fn bench_sequential_jsonb(criterion: &mut Criterion) {
|
|||
|
||||
group.bench_function("Limbo - Sequential", |b| {
|
||||
let mut stmt = limbo_conn.prepare(&query).unwrap();
|
||||
let io = io.clone();
|
||||
b.iter(|| {
|
||||
loop {
|
||||
match stmt.step().unwrap() {
|
||||
turso_core::StepResult::Row => {}
|
||||
turso_core::StepResult::IO => {
|
||||
let _ = io.run_once();
|
||||
stmt.run_once().unwrap();
|
||||
}
|
||||
turso_core::StepResult::Done => {
|
||||
break;
|
||||
|
@ -899,13 +897,12 @@ fn bench_json_patch(criterion: &mut Criterion) {
|
|||
|
||||
group.bench_function("Limbo", |b| {
|
||||
let mut stmt = limbo_conn.prepare(&query).unwrap();
|
||||
let io = io.clone();
|
||||
b.iter(|| {
|
||||
loop {
|
||||
match stmt.step().unwrap() {
|
||||
turso_core::StepResult::Row => {}
|
||||
turso_core::StepResult::IO => {
|
||||
let _ = io.run_once();
|
||||
stmt.run_once().unwrap();
|
||||
}
|
||||
turso_core::StepResult::Done => {
|
||||
break;
|
||||
|
|
|
@ -2,7 +2,7 @@ use std::sync::Arc;
|
|||
|
||||
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, SamplingMode};
|
||||
use pprof::criterion::{Output, PProfProfiler};
|
||||
use turso_core::{Database, PlatformIO, IO as _};
|
||||
use turso_core::{Database, PlatformIO};
|
||||
|
||||
const TPC_H_PATH: &str = "../perf/tpc-h/TPC-H.db";
|
||||
|
||||
|
@ -97,7 +97,7 @@ fn bench_tpc_h_queries(criterion: &mut Criterion) {
|
|||
black_box(stmt.row());
|
||||
}
|
||||
turso_core::StepResult::IO => {
|
||||
let _ = io.run_once();
|
||||
stmt.run_once().unwrap();
|
||||
}
|
||||
turso_core::StepResult::Done => {
|
||||
break;
|
||||
|
|
|
@ -65,7 +65,10 @@ pub unsafe extern "C" fn execute(
|
|||
return ResultCode::OK;
|
||||
}
|
||||
Ok(StepResult::IO) => {
|
||||
let _ = conn.pager.io.run_once();
|
||||
let res = stmt.run_once();
|
||||
if res.is_err() {
|
||||
return ResultCode::Error;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Ok(StepResult::Interrupt) => return ResultCode::Interrupt,
|
||||
|
@ -154,7 +157,6 @@ pub unsafe extern "C" fn stmt_step(stmt: *mut Stmt) -> ResultCode {
|
|||
tracing::error!("stmt_step: null connection or context");
|
||||
return ResultCode::Error;
|
||||
}
|
||||
let conn: &Connection = unsafe { &*(stmt._conn as *const Connection) };
|
||||
let stmt_ctx: &mut Statement = unsafe { &mut *(stmt._ctx as *mut Statement) };
|
||||
while let Ok(res) = stmt_ctx.step() {
|
||||
match res {
|
||||
|
@ -162,7 +164,10 @@ pub unsafe extern "C" fn stmt_step(stmt: *mut Stmt) -> ResultCode {
|
|||
StepResult::Done => return ResultCode::EOF,
|
||||
StepResult::IO => {
|
||||
// always handle IO step result internally.
|
||||
let _ = conn.pager.io.run_once();
|
||||
let res = stmt_ctx.run_once();
|
||||
if res.is_err() {
|
||||
return ResultCode::Error;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
StepResult::Interrupt => return ResultCode::Interrupt,
|
||||
|
|
|
@ -18,7 +18,7 @@ use std::{
|
|||
io::{ErrorKind, Read, Seek, Write},
|
||||
sync::Arc,
|
||||
};
|
||||
use tracing::{debug, trace};
|
||||
use tracing::{debug, instrument, trace, Level};
|
||||
|
||||
struct OwnedCallbacks(UnsafeCell<Callbacks>);
|
||||
// We assume we locking on IO level is done by user.
|
||||
|
@ -219,6 +219,7 @@ impl IO for UnixIO {
|
|||
Ok(unix_file)
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::INFO)]
|
||||
fn run_once(&self) -> Result<()> {
|
||||
if self.callbacks.is_empty() {
|
||||
return Ok(());
|
||||
|
@ -333,6 +334,7 @@ impl File for UnixFile<'_> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::INFO)]
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Arc<Completion>> {
|
||||
let file = self.file.borrow();
|
||||
let result = {
|
||||
|
@ -366,6 +368,7 @@ impl File for UnixFile<'_> {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::INFO)]
|
||||
fn pwrite(
|
||||
&self,
|
||||
pos: usize,
|
||||
|
@ -401,6 +404,7 @@ impl File for UnixFile<'_> {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::INFO)]
|
||||
fn sync(&self, c: Completion) -> Result<Arc<Completion>> {
|
||||
let file = self.file.borrow();
|
||||
let result = fs::fsync(file.as_fd());
|
||||
|
@ -415,6 +419,7 @@ impl File for UnixFile<'_> {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::INFO)]
|
||||
fn size(&self) -> Result<u64> {
|
||||
let file = self.file.borrow();
|
||||
Ok(file.metadata()?.len())
|
||||
|
|
core/lib.rs (48 lines changed)
|
@ -102,6 +102,17 @@ enum TransactionState {
|
|||
None,
|
||||
}
|
||||
|
||||
impl TransactionState {
|
||||
fn change_schema(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
TransactionState::Write {
|
||||
change_schema: true
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) type MvStore = mvcc::MvStore<mvcc::LocalClock>;
|
||||
|
||||
pub(crate) type MvCursor = mvcc::cursor::ScanCursor<mvcc::LocalClock>;
|
||||
|
@ -217,7 +228,7 @@ impl Database {
|
|||
if is_empty == 2 {
|
||||
// parse schema
|
||||
let conn = db.connect()?;
|
||||
let schema_version = get_schema_version(&conn, &io)?;
|
||||
let schema_version = get_schema_version(&conn)?;
|
||||
schema.write().schema_version = schema_version;
|
||||
let rows = conn.query("SELECT * FROM sqlite_schema")?;
|
||||
let mut schema = schema
|
||||
|
@ -225,7 +236,7 @@ impl Database {
|
|||
.expect("lock on schema should succeed first try");
|
||||
let syms = conn.syms.borrow();
|
||||
if let Err(LimboError::ExtensionError(e)) =
|
||||
parse_schema_rows(rows, &mut schema, io, &syms, None)
|
||||
parse_schema_rows(rows, &mut schema, &syms, None)
|
||||
{
|
||||
// this means that a vtab exists and we no longer have the module loaded. we print
|
||||
// a warning to the user to load the module
|
||||
|
@ -390,7 +401,7 @@ impl Database {
|
|||
}
|
||||
}
|
||||
|
||||
fn get_schema_version(conn: &Arc<Connection>, io: &Arc<dyn IO>) -> Result<u32> {
|
||||
fn get_schema_version(conn: &Arc<Connection>) -> Result<u32> {
|
||||
let mut rows = conn
|
||||
.query("PRAGMA schema_version")?
|
||||
.ok_or(LimboError::InternalError(
|
||||
|
@ -409,7 +420,7 @@ fn get_schema_version(conn: &Arc<Connection>, io: &Arc<dyn IO>) -> Result<u32> {
|
|||
schema_version = Some(row.get::<i64>(0)? as u32);
|
||||
}
|
||||
StepResult::IO => {
|
||||
io.run_once()?;
|
||||
rows.run_once()?;
|
||||
}
|
||||
StepResult::Interrupt => {
|
||||
return Err(LimboError::InternalError(
|
||||
|
@ -453,7 +464,7 @@ pub struct Connection {
|
|||
}
|
||||
|
||||
impl Connection {
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn prepare(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Statement> {
|
||||
if sql.as_ref().is_empty() {
|
||||
return Err(LimboError::InvalidArgument(
|
||||
|
@ -494,7 +505,7 @@ impl Connection {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn query(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Option<Statement>> {
|
||||
let sql = sql.as_ref();
|
||||
tracing::trace!("Querying: {}", sql);
|
||||
|
@ -510,7 +521,7 @@ impl Connection {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub(crate) fn run_cmd(
|
||||
self: &Arc<Connection>,
|
||||
cmd: Cmd,
|
||||
|
@ -563,7 +574,7 @@ impl Connection {
|
|||
|
||||
/// Execute will run a query from start to finish taking ownership of I/O because it will run pending I/Os if it didn't finish.
|
||||
/// TODO: make this api async
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn execute(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<()> {
|
||||
let sql = sql.as_ref();
|
||||
let mut parser = Parser::new(sql.as_bytes());
|
||||
|
@ -610,7 +621,7 @@ impl Connection {
|
|||
if matches!(res, StepResult::Done) {
|
||||
break;
|
||||
}
|
||||
self._db.io.run_once()?;
|
||||
self.run_once()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -618,6 +629,15 @@ impl Connection {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn run_once(&self) -> Result<()> {
|
||||
let res = self._db.io.run_once();
|
||||
if res.is_err() {
|
||||
let state = self.transaction_state.get();
|
||||
self.pager.rollback(state.change_schema(), self)?;
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
#[cfg(feature = "fs")]
|
||||
pub fn from_uri(
|
||||
uri: &str,
|
||||
|
@ -756,7 +776,7 @@ impl Connection {
|
|||
{
|
||||
let syms = self.syms.borrow();
|
||||
if let Err(LimboError::ExtensionError(e)) =
|
||||
parse_schema_rows(rows, &mut schema, self.pager.io.clone(), &syms, None)
|
||||
parse_schema_rows(rows, &mut schema, &syms, None)
|
||||
{
|
||||
// this means that a vtab exists and we no longer have the module loaded. we print
|
||||
// a warning to the user to load the module
|
||||
|
@ -883,7 +903,13 @@ impl Statement {
|
|||
}
|
||||
|
||||
pub fn run_once(&self) -> Result<()> {
|
||||
self.pager.io.run_once()
|
||||
let res = self.pager.io.run_once();
|
||||
if res.is_err() {
|
||||
let state = self.program.connection.transaction_state.get();
|
||||
self.pager
|
||||
.rollback(state.change_schema(), &self.program.connection)?;
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
pub fn num_columns(&self) -> usize {
|
||||
|
|
|
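The core/lib.rs changes above centralize the error path: both `Connection::run_once` and `Statement::run_once` now roll the active transaction back (undoing schema changes only when the write transaction touched the schema, via the new `TransactionState::change_schema()` helper) before returning the I/O error. A reduced, self-contained sketch of that shape with stand-in types (these are not turso_core's real signatures):

// Stand-in types for illustration only.
enum TransactionState {
    None,
    Write { change_schema: bool },
}

impl TransactionState {
    // Mirrors the helper added in core/lib.rs: only a write transaction that
    // changed the schema needs its schema state rolled back.
    fn change_schema(&self) -> bool {
        matches!(self, TransactionState::Write { change_schema: true })
    }
}

struct Pager;

impl Pager {
    fn run_io(&self) -> Result<(), String> { Ok(()) }                       // stand-in for io.run_once()
    fn rollback(&self, _undo_schema: bool) -> Result<(), String> { Ok(()) } // stand-in for pager.rollback()
}

// The pattern: attempt the pending I/O; if it fails, roll back first, then
// surface the original error to the caller.
fn run_once(pager: &Pager, state: &TransactionState) -> Result<(), String> {
    let res = pager.run_io();
    if res.is_err() {
        pager.rollback(state.change_schema())?;
    }
    res
}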
@ -592,6 +592,7 @@ impl BTreeCursor {
|
|||
|
||||
/// Check if the table is empty.
|
||||
/// This is done by checking if the root page has no cells.
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn is_empty_table(&self) -> Result<CursorResult<bool>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mv_cursor = mv_cursor.borrow();
|
||||
|
@ -606,7 +607,7 @@ impl BTreeCursor {
|
|||
|
||||
/// Move the cursor to the previous record and return it.
|
||||
/// Used in backwards iteration.
|
||||
#[instrument(skip(self), level = Level::TRACE, name = "prev")]
|
||||
#[instrument(skip(self), level = Level::INFO, name = "prev")]
|
||||
fn get_prev_record(&mut self) -> Result<CursorResult<bool>> {
|
||||
loop {
|
||||
let page = self.stack.top();
|
||||
|
@ -717,7 +718,7 @@ impl BTreeCursor {
|
|||
|
||||
/// Reads the record of a cell that has overflow pages. This is a state machine that requires to be called until completion so everything
|
||||
/// that calls this function should be reentrant.
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn process_overflow_read(
|
||||
&self,
|
||||
payload: &'static [u8],
|
||||
|
@ -835,6 +836,7 @@ impl BTreeCursor {
|
|||
///
|
||||
/// If the cell has overflow pages, it will skip till the overflow page which
|
||||
/// is at the offset given.
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn read_write_payload_with_offset(
|
||||
&mut self,
|
||||
mut offset: u32,
|
||||
|
@ -945,6 +947,7 @@ impl BTreeCursor {
|
|||
Ok(CursorResult::Ok(()))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn continue_payload_overflow_with_offset(
|
||||
&mut self,
|
||||
buffer: &mut Vec<u8>,
|
||||
|
@ -1121,7 +1124,7 @@ impl BTreeCursor {
|
|||
|
||||
/// Move the cursor to the next record and return it.
|
||||
/// Used in forwards iteration, which is the default.
|
||||
#[instrument(skip(self), level = Level::TRACE, name = "next")]
|
||||
#[instrument(skip(self), level = Level::INFO, name = "next")]
|
||||
fn get_next_record(&mut self) -> Result<CursorResult<bool>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mut mv_cursor = mv_cursor.borrow_mut();
|
||||
|
@ -1261,20 +1264,21 @@ impl BTreeCursor {
|
|||
}
|
||||
|
||||
/// Move the cursor to the root page of the btree.
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn move_to_root(&mut self) {
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn move_to_root(&mut self) -> Result<()> {
|
||||
self.seek_state = CursorSeekState::Start;
|
||||
self.going_upwards = false;
|
||||
tracing::trace!(root_page = self.root_page);
|
||||
let mem_page = self.read_page(self.root_page).unwrap();
|
||||
let mem_page = self.read_page(self.root_page)?;
|
||||
self.stack.clear();
|
||||
self.stack.push(mem_page);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Move the cursor to the rightmost record in the btree.
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::INFO)]
|
||||
fn move_to_rightmost(&mut self) -> Result<CursorResult<bool>> {
|
||||
self.move_to_root();
|
||||
self.move_to_root()?;
|
||||
|
||||
loop {
|
||||
let mem_page = self.stack.top();
|
||||
|
@ -1307,7 +1311,7 @@ impl BTreeCursor {
|
|||
}
|
||||
|
||||
/// Specialized version of move_to() for table btrees.
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::INFO)]
|
||||
fn tablebtree_move_to(&mut self, rowid: i64, seek_op: SeekOp) -> Result<CursorResult<()>> {
|
||||
'outer: loop {
|
||||
let page = self.stack.top();
|
||||
|
@ -1425,7 +1429,7 @@ impl BTreeCursor {
|
|||
}
|
||||
|
||||
/// Specialized version of move_to() for index btrees.
|
||||
#[instrument(skip(self, index_key), level = Level::TRACE)]
|
||||
#[instrument(skip(self, index_key), level = Level::INFO)]
|
||||
fn indexbtree_move_to(
|
||||
&mut self,
|
||||
index_key: &ImmutableRecord,
|
||||
|
@ -1641,7 +1645,7 @@ impl BTreeCursor {
|
|||
|
||||
/// Specialized version of do_seek() for table btrees that uses binary search instead
|
||||
/// of iterating cells in order.
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn tablebtree_seek(&mut self, rowid: i64, seek_op: SeekOp) -> Result<CursorResult<bool>> {
|
||||
turso_assert!(
|
||||
self.mv_cursor.is_none(),
|
||||
|
@ -1761,7 +1765,7 @@ impl BTreeCursor {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn indexbtree_seek(
|
||||
&mut self,
|
||||
key: &ImmutableRecord,
|
||||
|
@ -2032,7 +2036,7 @@ impl BTreeCursor {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn move_to(&mut self, key: SeekKey<'_>, cmp: SeekOp) -> Result<CursorResult<()>> {
|
||||
turso_assert!(
|
||||
self.mv_cursor.is_none(),
|
||||
|
@ -2072,7 +2076,7 @@ impl BTreeCursor {
|
|||
self.seek_state = CursorSeekState::Start;
|
||||
}
|
||||
if matches!(self.seek_state, CursorSeekState::Start) {
|
||||
self.move_to_root();
|
||||
self.move_to_root()?;
|
||||
}
|
||||
|
||||
let ret = match key {
|
||||
|
@ -2085,7 +2089,7 @@ impl BTreeCursor {
|
|||
|
||||
/// Insert a record into the btree.
|
||||
/// If the insert operation overflows the page, it will be split and the btree will be balanced.
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn insert_into_page(&mut self, bkey: &BTreeKey) -> Result<CursorResult<()>> {
|
||||
let record = bkey
|
||||
.get_record()
|
||||
|
@ -2266,7 +2270,7 @@ impl BTreeCursor {
|
|||
/// This is a naive algorithm that doesn't try to distribute cells evenly by content.
|
||||
/// It will try to split the page in half by keys not by content.
|
||||
/// Sqlite tries to have a page at least 40% full.
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::INFO)]
|
||||
fn balance(&mut self) -> Result<CursorResult<()>> {
|
||||
turso_assert!(
|
||||
matches!(self.state, CursorState::Write(_)),
|
||||
|
@ -2310,7 +2314,7 @@ impl BTreeCursor {
|
|||
}
|
||||
|
||||
if !self.stack.has_parent() {
|
||||
self.balance_root();
|
||||
self.balance_root()?;
|
||||
}
|
||||
|
||||
let write_info = self.state.mut_write_info().unwrap();
|
||||
|
@ -2328,6 +2332,7 @@ impl BTreeCursor {
|
|||
}
|
||||
|
||||
/// Balance a non root page by trying to balance cells between a maximum of 3 siblings that should be neighboring the page that overflowed/underflowed.
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn balance_non_root(&mut self) -> Result<CursorResult<()>> {
|
||||
turso_assert!(
|
||||
matches!(self.state, CursorState::Write(_)),
|
||||
|
@ -2885,7 +2890,7 @@ impl BTreeCursor {
|
|||
pages_to_balance_new[i].replace(page.clone());
|
||||
} else {
|
||||
// FIXME: handle page cache is full
|
||||
let page = self.allocate_page(page_type, 0);
|
||||
let page = self.allocate_page(page_type, 0)?;
|
||||
pages_to_balance_new[i].replace(page);
|
||||
// Since this page didn't exist before, we can set it to cells length as it
|
||||
// marks them as empty since it is a prefix sum of cells.
|
||||
|
@ -3780,7 +3785,7 @@ impl BTreeCursor {
|
|||
/// Balance the root page.
|
||||
/// This is done when the root page overflows, and we need to create a new root page.
|
||||
/// See e.g. https://en.wikipedia.org/wiki/B-tree
|
||||
fn balance_root(&mut self) {
|
||||
fn balance_root(&mut self) -> Result<()> {
|
||||
/* todo: balance deeper, create child and copy contents of root there. Then split root */
|
||||
/* if we are in root page then we just need to create a new root and push key there */
|
||||
|
||||
|
@ -3797,7 +3802,7 @@ impl BTreeCursor {
|
|||
// FIXME: handle page cache is full
|
||||
let child_btree =
|
||||
self.pager
|
||||
.do_allocate_page(root_contents.page_type(), 0, BtreePageAllocMode::Any);
|
||||
.do_allocate_page(root_contents.page_type(), 0, BtreePageAllocMode::Any)?;
|
||||
|
||||
tracing::debug!(
|
||||
"balance_root(root={}, rightmost={}, page_type={:?})",
|
||||
|
@ -3855,6 +3860,7 @@ impl BTreeCursor {
|
|||
self.stack.push(root_btree.clone());
|
||||
self.stack.set_cell_index(0); // leave parent pointing at the rightmost pointer (in this case 0, as there are no cells), since we will be balancing the rightmost child page.
|
||||
self.stack.push(child_btree.clone());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn usable_space(&self) -> usize {
|
||||
|
@ -3862,6 +3868,7 @@ impl BTreeCursor {
|
|||
}
|
||||
|
||||
/// Find the index of the cell in the page that contains the given rowid.
|
||||
#[instrument( skip_all, level = Level::INFO)]
|
||||
fn find_cell(&mut self, page: &PageContent, key: &BTreeKey) -> Result<CursorResult<usize>> {
|
||||
if self.find_cell_state.0.is_none() {
|
||||
self.find_cell_state.set(0);
|
||||
|
@ -3936,9 +3943,10 @@ impl BTreeCursor {
|
|||
Ok(CursorResult::Ok(cell_idx))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn seek_end(&mut self) -> Result<CursorResult<()>> {
|
||||
assert!(self.mv_cursor.is_none()); // unsure about this -_-
|
||||
self.move_to_root();
|
||||
self.move_to_root()?;
|
||||
loop {
|
||||
let mem_page = self.stack.top();
|
||||
let page_id = mem_page.get().get().id;
|
||||
|
@ -3964,6 +3972,7 @@ impl BTreeCursor {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn seek_to_last(&mut self) -> Result<CursorResult<()>> {
|
||||
let has_record = return_if_io!(self.move_to_rightmost());
|
||||
self.invalidate_record();
|
||||
|
@ -3984,13 +3993,14 @@ impl BTreeCursor {
|
|||
self.root_page
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn rewind(&mut self) -> Result<CursorResult<()>> {
|
||||
if self.mv_cursor.is_some() {
|
||||
let cursor_has_record = return_if_io!(self.get_next_record());
|
||||
self.invalidate_record();
|
||||
self.has_record.replace(cursor_has_record);
|
||||
} else {
|
||||
self.move_to_root();
|
||||
self.move_to_root()?;
|
||||
|
||||
let cursor_has_record = return_if_io!(self.get_next_record());
|
||||
self.invalidate_record();
|
||||
|
@ -3999,6 +4009,7 @@ impl BTreeCursor {
|
|||
Ok(CursorResult::Ok(()))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn last(&mut self) -> Result<CursorResult<()>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
let cursor_has_record = return_if_io!(self.move_to_rightmost());
|
||||
|
@ -4007,6 +4018,7 @@ impl BTreeCursor {
|
|||
Ok(CursorResult::Ok(()))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn next(&mut self) -> Result<CursorResult<bool>> {
|
||||
return_if_io!(self.restore_context());
|
||||
let cursor_has_record = return_if_io!(self.get_next_record());
|
||||
|
@ -4022,6 +4034,7 @@ impl BTreeCursor {
|
|||
.invalidate();
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn prev(&mut self) -> Result<CursorResult<bool>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
return_if_io!(self.restore_context());
|
||||
|
@ -4031,7 +4044,7 @@ impl BTreeCursor {
|
|||
Ok(CursorResult::Ok(cursor_has_record))
|
||||
}
|
||||
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::INFO)]
|
||||
pub fn rowid(&mut self) -> Result<CursorResult<Option<i64>>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mv_cursor = mv_cursor.borrow();
|
||||
|
@ -4073,7 +4086,7 @@ impl BTreeCursor {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::INFO)]
|
||||
pub fn seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result<CursorResult<bool>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
// Empty trace to capture the span information
|
||||
|
@ -4094,7 +4107,7 @@ impl BTreeCursor {
|
|||
/// Return a reference to the record the cursor is currently pointing to.
|
||||
/// If record was not parsed yet, then we have to parse it and in case of I/O we yield control
|
||||
/// back.
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::INFO)]
|
||||
pub fn record(&self) -> Result<CursorResult<Option<Ref<ImmutableRecord>>>> {
|
||||
if !self.has_record.get() {
|
||||
return Ok(CursorResult::Ok(None));
|
||||
|
@ -4162,7 +4175,7 @@ impl BTreeCursor {
|
|||
Ok(CursorResult::Ok(Some(record_ref)))
|
||||
}
|
||||
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::INFO)]
|
||||
pub fn insert(
|
||||
&mut self,
|
||||
key: &BTreeKey,
|
||||
|
@ -4232,7 +4245,7 @@ impl BTreeCursor {
|
|||
/// 7. WaitForBalancingToComplete -> perform balancing
|
||||
/// 8. SeekAfterBalancing -> adjust the cursor to a node that is closer to the deleted value. go to Finish
|
||||
/// 9. Finish -> Delete operation is done. Return CursorResult(Ok())
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::INFO)]
|
||||
pub fn delete(&mut self) -> Result<CursorResult<()>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
|
||||
|
@ -4609,6 +4622,7 @@ impl BTreeCursor {
|
|||
}
|
||||
|
||||
/// Search for a key in an Index Btree. Looking up indexes that need to be unique, we cannot compare the rowid
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn key_exists_in_index(&mut self, key: &ImmutableRecord) -> Result<CursorResult<bool>> {
|
||||
return_if_io!(self.seek(SeekKey::IndexKey(key), SeekOp::GE { eq_only: true }));
|
||||
|
||||
|
@ -4638,6 +4652,7 @@ impl BTreeCursor {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn exists(&mut self, key: &Value) -> Result<CursorResult<bool>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
let int_key = match key {
|
||||
|
@ -4654,6 +4669,7 @@ impl BTreeCursor {
|
|||
/// Clear the overflow pages linked to a specific page provided by the leaf cell
|
||||
/// Uses a state machine to keep track of it's operations so that traversal can be
|
||||
/// resumed from last point after IO interruption
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn clear_overflow_pages(&mut self, cell: &BTreeCell) -> Result<CursorResult<()>> {
|
||||
loop {
|
||||
let state = self.overflow_state.take().unwrap_or(OverflowState::Start);
|
||||
|
@ -4722,10 +4738,10 @@ impl BTreeCursor {
|
|||
/// ```
|
||||
///
|
||||
/// The destruction order would be: [4',4,5,2,6,7,3,1]
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::INFO)]
|
||||
pub fn btree_destroy(&mut self) -> Result<CursorResult<Option<usize>>> {
|
||||
if let CursorState::None = &self.state {
|
||||
self.move_to_root();
|
||||
self.move_to_root()?;
|
||||
self.state = CursorState::Destroy(DestroyInfo {
|
||||
state: DestroyState::Start,
|
||||
});
|
||||
|
@ -4998,10 +5014,10 @@ impl BTreeCursor {
|
|||
/// Count the number of entries in the b-tree
|
||||
///
|
||||
/// Only supposed to be used in the context of a simple Count Select Statement
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::INFO)]
|
||||
pub fn count(&mut self) -> Result<CursorResult<usize>> {
|
||||
if self.count == 0 {
|
||||
self.move_to_root();
|
||||
self.move_to_root()?;
|
||||
}
|
||||
|
||||
if let Some(_mv_cursor) = &self.mv_cursor {
|
||||
|
@ -5034,7 +5050,7 @@ impl BTreeCursor {
|
|||
loop {
|
||||
if !self.stack.has_parent() {
|
||||
// All pages of the b-tree have been visited. Return successfully
|
||||
self.move_to_root();
|
||||
self.move_to_root()?;
|
||||
|
||||
return Ok(CursorResult::Ok(self.count));
|
||||
}
|
||||
|
@ -5107,6 +5123,7 @@ impl BTreeCursor {
|
|||
}
|
||||
|
||||
/// If context is defined, restore it and set it None on success
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn restore_context(&mut self) -> Result<CursorResult<()>> {
|
||||
if self.context.is_none() || !matches!(self.valid_state, CursorValidState::RequireSeek) {
|
||||
return Ok(CursorResult::Ok(()));
|
||||
|
@ -5137,7 +5154,7 @@ impl BTreeCursor {
|
|||
btree_read_page(&self.pager, page_idx)
|
||||
}
|
||||
|
||||
pub fn allocate_page(&self, page_type: PageType, offset: usize) -> BTreePage {
|
||||
pub fn allocate_page(&self, page_type: PageType, offset: usize) -> Result<BTreePage> {
|
||||
self.pager
|
||||
.do_allocate_page(page_type, offset, BtreePageAllocMode::Any)
|
||||
}
|
||||
|
@ -5545,7 +5562,7 @@ impl PageStack {
|
|||
}
|
||||
/// Push a new page onto the stack.
|
||||
/// This effectively means traversing to a child page.
|
||||
#[instrument(skip_all, level = Level::TRACE, name = "pagestack::push")]
|
||||
#[instrument(skip_all, level = Level::INFO, name = "pagestack::push")]
|
||||
fn _push(&self, page: BTreePage, starting_cell_idx: i32) {
|
||||
tracing::trace!(
|
||||
current = self.current_page.get(),
|
||||
|
@ -5572,7 +5589,7 @@ impl PageStack {
|
|||
|
||||
/// Pop a page off the stack.
|
||||
/// This effectively means traversing back up to a parent page.
|
||||
#[instrument(skip_all, level = Level::TRACE, name = "pagestack::pop")]
|
||||
#[instrument(skip_all, level = Level::INFO, name = "pagestack::pop")]
|
||||
fn pop(&self) {
|
||||
let current = self.current_page.get();
|
||||
assert!(current >= 0);
|
||||
|
@ -5584,7 +5601,7 @@ impl PageStack {
|
|||
|
||||
/// Get the top page on the stack.
|
||||
/// This is the page that is currently being traversed.
|
||||
#[instrument(skip(self), level = Level::TRACE, name = "pagestack::top", )]
|
||||
#[instrument(skip(self), level = Level::INFO, name = "pagestack::top", )]
|
||||
fn top(&self) -> BTreePage {
|
||||
let page = self.stack.borrow()[self.current()]
|
||||
.as_ref()
|
||||
|
@ -5616,7 +5633,7 @@ impl PageStack {
|
|||
|
||||
/// Advance the current cell index of the current page to the next cell.
|
||||
/// We usually advance after going traversing a new page
|
||||
#[instrument(skip(self), level = Level::TRACE, name = "pagestack::advance",)]
|
||||
#[instrument(skip(self), level = Level::INFO, name = "pagestack::advance",)]
|
||||
fn advance(&self) {
|
||||
let current = self.current();
|
||||
tracing::trace!(
|
||||
|
@ -5626,7 +5643,7 @@ impl PageStack {
|
|||
self.cell_indices.borrow_mut()[current] += 1;
|
||||
}
|
||||
|
||||
#[instrument(skip(self), level = Level::TRACE, name = "pagestack::retreat")]
|
||||
#[instrument(skip(self), level = Level::INFO, name = "pagestack::retreat")]
|
||||
fn retreat(&self) {
|
||||
let current = self.current();
|
||||
tracing::trace!(
|
||||
|
@ -7066,10 +7083,10 @@ mod tests {
|
|||
}
|
||||
run_until_done(|| pager.begin_read_tx(), &pager).unwrap();
|
||||
// FIXME: add sorted vector instead, should be okay for small amounts of keys for now :P, too lazy to fix right now
|
||||
cursor.move_to_root();
|
||||
cursor.move_to_root().unwrap();
|
||||
let mut valid = true;
|
||||
if do_validate {
|
||||
cursor.move_to_root();
|
||||
cursor.move_to_root().unwrap();
|
||||
for key in keys.iter() {
|
||||
tracing::trace!("seeking key: {}", key);
|
||||
run_until_done(|| cursor.next(), pager.deref()).unwrap();
|
||||
|
@ -7102,7 +7119,7 @@ mod tests {
|
|||
if matches!(validate_btree(pager.clone(), root_page), (_, false)) {
|
||||
panic!("invalid btree");
|
||||
}
|
||||
cursor.move_to_root();
|
||||
cursor.move_to_root().unwrap();
|
||||
for key in keys.iter() {
|
||||
tracing::trace!("seeking key: {}", key);
|
||||
run_until_done(|| cursor.next(), pager.deref()).unwrap();
|
||||
|
@ -7181,7 +7198,7 @@ mod tests {
|
|||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
cursor.move_to_root();
|
||||
cursor.move_to_root().unwrap();
|
||||
loop {
|
||||
match pager.end_tx(false, false, &conn, false).unwrap() {
|
||||
crate::PagerCacheflushStatus::Done(_) => break,
|
||||
|
@ -7194,7 +7211,7 @@ mod tests {
|
|||
|
||||
// Check that all keys can be found by seeking
|
||||
pager.begin_read_tx().unwrap();
|
||||
cursor.move_to_root();
|
||||
cursor.move_to_root().unwrap();
|
||||
for (i, key) in keys.iter().enumerate() {
|
||||
tracing::info!("seeking key {}/{}: {:?}", i + 1, keys.len(), key);
|
||||
let exists = run_until_done(
|
||||
|
@ -7214,7 +7231,7 @@ mod tests {
|
|||
assert!(exists, "key {:?} is not found", key);
|
||||
}
|
||||
// Check that key count is right
|
||||
cursor.move_to_root();
|
||||
cursor.move_to_root().unwrap();
|
||||
let mut count = 0;
|
||||
while run_until_done(|| cursor.next(), pager.deref()).unwrap() {
|
||||
count += 1;
|
||||
|
@ -7227,7 +7244,7 @@ mod tests {
|
|||
keys.len()
|
||||
);
|
||||
// Check that all keys can be found in-order, by iterating the btree
|
||||
cursor.move_to_root();
|
||||
cursor.move_to_root().unwrap();
|
||||
let mut prev = None;
|
||||
for (i, key) in keys.iter().enumerate() {
|
||||
tracing::info!("iterating key {}/{}: {:?}", i + 1, keys.len(), key);
|
||||
|
@ -7571,11 +7588,11 @@ mod tests {
|
|||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), 2);
|
||||
|
||||
// Initialize page 2 as a root page (interior)
|
||||
let root_page = cursor.allocate_page(PageType::TableInterior, 0);
|
||||
let root_page = cursor.allocate_page(PageType::TableInterior, 0)?;
|
||||
|
||||
// Allocate two leaf pages
|
||||
let page3 = cursor.allocate_page(PageType::TableLeaf, 0);
|
||||
let page4 = cursor.allocate_page(PageType::TableLeaf, 0);
|
||||
let page3 = cursor.allocate_page(PageType::TableLeaf, 0)?;
|
||||
let page4 = cursor.allocate_page(PageType::TableLeaf, 0)?;
|
||||
|
||||
// Configure the root page to point to the two leaf pages
|
||||
{
|
||||
|
@ -8429,7 +8446,7 @@ mod tests {
|
|||
);
|
||||
}
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page);
|
||||
cursor.move_to_root();
|
||||
cursor.move_to_root().unwrap();
|
||||
for i in 0..iterations {
|
||||
let has_next = run_until_done(|| cursor.next(), pager.deref()).unwrap();
|
||||
if !has_next {
|
||||
|
|
|
@ -2,6 +2,7 @@ use crate::error::LimboError;
|
|||
use crate::io::CompletionType;
|
||||
use crate::{io::Completion, Buffer, Result};
|
||||
use std::{cell::RefCell, sync::Arc};
|
||||
use tracing::{instrument, Level};
|
||||
|
||||
/// DatabaseStorage is an interface a database file that consists of pages.
|
||||
///
|
||||
|
@ -32,6 +33,7 @@ unsafe impl Sync for DatabaseFile {}
|
|||
|
||||
#[cfg(feature = "fs")]
|
||||
impl DatabaseStorage for DatabaseFile {
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> {
|
||||
let r = c.as_read();
|
||||
let size = r.buf().len();
|
||||
|
@ -44,6 +46,7 @@ impl DatabaseStorage for DatabaseFile {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn write_page(
|
||||
&self,
|
||||
page_idx: usize,
|
||||
|
@ -60,11 +63,13 @@ impl DatabaseStorage for DatabaseFile {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn sync(&self, c: Completion) -> Result<()> {
|
||||
let _ = self.file.sync(c)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn size(&self) -> Result<u64> {
|
||||
self.file.size()
|
||||
}
|
||||
|
@ -85,6 +90,7 @@ unsafe impl Send for FileMemoryStorage {}
|
|||
unsafe impl Sync for FileMemoryStorage {}
|
||||
|
||||
impl DatabaseStorage for FileMemoryStorage {
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> {
|
||||
let r = match c.completion_type {
|
||||
CompletionType::Read(ref r) => r,
|
||||
|
@ -100,6 +106,7 @@ impl DatabaseStorage for FileMemoryStorage {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn write_page(
|
||||
&self,
|
||||
page_idx: usize,
|
||||
|
@ -115,11 +122,13 @@ impl DatabaseStorage for FileMemoryStorage {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn sync(&self, c: Completion) -> Result<()> {
|
||||
let _ = self.file.sync(c)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn size(&self) -> Result<u64> {
|
||||
self.file.size()
|
||||
}
|
||||
|
|
|
@@ -14,7 +14,7 @@ use std::collections::HashSet;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use tracing::{trace, Level};
use tracing::{instrument, trace, Level};

use super::btree::{btree_init_page, BTreePage};
use super::page_cache::{CacheError, CacheResizeResult, DumbLruPageCache, PageCacheKey};
@@ -471,6 +477,7 @@ impl Pager {

    /// This method is used to allocate a new root page for a btree, both for tables and indexes
    /// FIXME: handle no room in page cache
    #[instrument(skip_all, level = Level::INFO)]
    pub fn btree_create(&self, flags: &CreateBTreeFlags) -> Result<CursorResult<u32>> {
        let page_type = match flags {
            _ if flags.is_table() => PageType::TableLeaf,
@@ -479,7 +480,7 @@ impl Pager {
        };
        #[cfg(feature = "omit_autovacuum")]
        {
            let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any);
            let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?;
            let page_id = page.get().get().id;
            Ok(CursorResult::Ok(page_id as u32))
        }
@@ -490,7 +491,7 @@ impl Pager {
        let auto_vacuum_mode = self.auto_vacuum_mode.borrow();
        match *auto_vacuum_mode {
            AutoVacuumMode::None => {
                let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any);
                let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?;
                let page_id = page.get().get().id;
                Ok(CursorResult::Ok(page_id as u32))
            }
@@ -514,7 +515,7 @@ impl Pager {
            page_type,
            0,
            BtreePageAllocMode::Exact(root_page_num),
        );
        )?;
        let allocated_page_id = page.get().get().id as u32;
        if allocated_page_id != root_page_num {
            // TODO(Zaid): Handle swapping the allocated page with the desired root page
@@ -558,8 +559,8 @@ impl Pager {
        page_type: PageType,
        offset: usize,
        _alloc_mode: BtreePageAllocMode,
    ) -> BTreePage {
        let page = self.allocate_page().unwrap();
    ) -> Result<BTreePage> {
        let page = self.allocate_page()?;
        let page = Arc::new(BTreePageInner {
            page: RefCell::new(page),
        });
@@ -569,7 +570,7 @@ impl Pager {
            page.get().get().id,
            page.get().get_contents().page_type()
        );
        page
        Ok(page)
    }

    /// The "usable size" of a database page is the page size specified by the 2-byte integer at offset 16
@ -589,6 +590,7 @@ impl Pager {
|
|||
}
|
||||
|
||||
#[inline(always)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn begin_read_tx(&self) -> Result<CursorResult<LimboResult>> {
|
||||
// We allocate the first page lazily in the first transaction
|
||||
match self.maybe_allocate_page1()? {
|
||||
|
@ -598,6 +600,7 @@ impl Pager {
|
|||
Ok(CursorResult::Ok(self.wal.borrow_mut().begin_read_tx()?))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn maybe_allocate_page1(&self) -> Result<CursorResult<()>> {
|
||||
if self.is_empty.load(Ordering::SeqCst) < DB_STATE_INITIALIZED {
|
||||
if let Ok(_lock) = self.init_lock.try_lock() {
|
||||
|
@ -621,6 +624,7 @@ impl Pager {
|
|||
}
|
||||
|
||||
#[inline(always)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn begin_write_tx(&self) -> Result<CursorResult<LimboResult>> {
|
||||
// TODO(Diego): The only possibly allocate page1 here is because OpenEphemeral needs a write transaction
|
||||
// we should have a unique API to begin transactions, something like sqlite3BtreeBeginTrans
|
||||
|
@ -631,6 +635,7 @@ impl Pager {
|
|||
Ok(CursorResult::Ok(self.wal.borrow_mut().begin_write_tx()?))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn end_tx(
|
||||
&self,
|
||||
rollback: bool,
|
||||
|
@ -666,13 +671,14 @@ impl Pager {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn end_read_tx(&self) -> Result<()> {
|
||||
self.wal.borrow().end_read_tx()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reads a page from the database.
|
||||
#[tracing::instrument(skip_all, level = Level::DEBUG)]
|
||||
#[tracing::instrument(skip_all, level = Level::INFO)]
|
||||
pub fn read_page(&self, page_idx: usize) -> Result<PageRef, LimboError> {
|
||||
tracing::trace!("read_page(page_idx = {})", page_idx);
|
||||
let mut page_cache = self.page_cache.write();
|
||||
|
@@ -759,11 +765,12 @@ impl Pager {
    /// In the base case, it will write the dirty pages to the WAL and then fsync the WAL.
    /// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
    /// the database file and then fsync the database file.
    #[instrument(skip_all, level = Level::INFO)]
    pub fn cacheflush(&self, wal_checkpoint_disabled: bool) -> Result<PagerCacheflushStatus> {
        let mut checkpoint_result = CheckpointResult::default();
        loop {
        let res = loop {
            let state = self.flush_info.borrow().state;
            trace!("cacheflush {:?}", state);
            trace!(?state);
            match state {
                FlushState::Start => {
                    let db_size = header_accessor::get_database_size(self)?;
@@ -795,7 +802,6 @@ impl Pager {
                    let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
                    if in_flight == 0 {
                        self.flush_info.borrow_mut().state = FlushState::SyncWal;
                        self.wal.borrow_mut().finish_append_frames_commit()?;
                    } else {
                        return Ok(PagerCacheflushStatus::IO);
                    }
@@ -807,9 +813,7 @@ impl Pager {

                    if wal_checkpoint_disabled || !self.wal.borrow().should_checkpoint() {
                        self.flush_info.borrow_mut().state = FlushState::Start;
                        return Ok(PagerCacheflushStatus::Done(
                            PagerCacheflushResult::WalWritten,
                        ));
                        break PagerCacheflushResult::WalWritten;
                    }
                    self.flush_info.borrow_mut().state = FlushState::Checkpoint;
                }
@@ -831,16 +835,17 @@ impl Pager {
                        return Ok(PagerCacheflushStatus::IO);
                    } else {
                        self.flush_info.borrow_mut().state = FlushState::Start;
                        break;
                        break PagerCacheflushResult::Checkpointed(checkpoint_result);
                    }
                }
            }
        }
        Ok(PagerCacheflushStatus::Done(
            PagerCacheflushResult::Checkpointed(checkpoint_result),
        ))
        };
        // We should only signal that we finished appending frames after wal sync to avoid inconsistencies when sync fails
        self.wal.borrow_mut().finish_append_frames_commit()?;
        Ok(PagerCacheflushStatus::Done(res))
    }

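The shape of this fix is easy to miss in the diff: the inner `loop` now only computes a `PagerCacheflushResult`, and `finish_append_frames_commit()` (which publishes the new `max_frame`) runs exactly once, after the WAL sync path has succeeded. A minimal standalone sketch of the same ordering, with hypothetical `Wal`/`FlushOutcome` types that are not the real limbo API:

```rust
// Sketch only: toy types standing in for the real WAL and pager state.
#[derive(Debug)]
enum FlushOutcome {
    WalWritten,
    Checkpointed,
}

#[derive(Default)]
struct Wal {
    pending_max_frame: u64,
    committed_max_frame: u64,
    sync_should_fail: bool,
}

impl Wal {
    fn sync(&self) -> Result<(), String> {
        if self.sync_should_fail {
            Err("fsync failed".into())
        } else {
            Ok(())
        }
    }
    // Publishing the new max_frame is deferred until after a successful sync.
    fn finish_append_frames_commit(&mut self) {
        self.committed_max_frame = self.pending_max_frame;
    }
}

fn cacheflush(wal: &mut Wal, should_checkpoint: bool) -> Result<FlushOutcome, String> {
    // The inner "state machine" only decides the outcome; it publishes nothing.
    let outcome = loop {
        wal.sync()?; // on error we return before committing max_frame
        if !should_checkpoint {
            break FlushOutcome::WalWritten;
        }
        break FlushOutcome::Checkpointed;
    };
    // Only now is the commit made visible to readers.
    wal.finish_append_frames_commit();
    Ok(outcome)
}

fn main() {
    let mut wal = Wal { pending_max_frame: 42, ..Default::default() };
    println!("{:?}", cacheflush(&mut wal, false)); // Ok(WalWritten)
    assert_eq!(wal.committed_max_frame, 42);

    let mut bad = Wal { pending_max_frame: 7, sync_should_fail: true, ..Default::default() };
    assert!(cacheflush(&mut bad, false).is_err());
    assert_eq!(bad.committed_max_frame, 0); // a failed sync leaves the WAL uncommitted
}
```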
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn wal_get_frame(
|
||||
&self,
|
||||
frame_no: u32,
|
||||
|
@ -856,11 +861,12 @@ impl Pager {
|
|||
)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO, target = "pager_checkpoint",)]
|
||||
pub fn checkpoint(&self) -> Result<CheckpointStatus> {
|
||||
let mut checkpoint_result = CheckpointResult::default();
|
||||
loop {
|
||||
let state = *self.checkpoint_state.borrow();
|
||||
trace!("pager_checkpoint(state={:?})", state);
|
||||
trace!(?state);
|
||||
match state {
|
||||
CheckpointState::Checkpoint => {
|
||||
let in_flight = self.checkpoint_inflight.clone();
|
||||
|
@ -932,6 +938,7 @@ impl Pager {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
    #[instrument(skip_all, level = Level::INFO)]
    pub fn wal_checkpoint(&self, wal_checkpoint_disabled: bool) -> Result<CheckpointResult> {
        if wal_checkpoint_disabled {
            return Ok(CheckpointResult {
@@ -947,7 +954,7 @@ impl Pager {
            CheckpointMode::Passive,
        ) {
            Ok(CheckpointStatus::IO) => {
                let _ = self.io.run_once();
                self.io.run_once()?;
            }
            Ok(CheckpointStatus::Done(res)) => {
                checkpoint_result = res;
@ -965,6 +972,7 @@ impl Pager {
|
|||
|
||||
// Providing a page is optional, if provided it will be used to avoid reading the page from disk.
|
||||
// This is implemented in accordance with sqlite freepage2() function.
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn free_page(&self, page: Option<PageRef>, page_id: usize) -> Result<()> {
|
||||
tracing::trace!("free_page(page_id={})", page_id);
|
||||
const TRUNK_PAGE_HEADER_SIZE: usize = 8;
|
||||
|
@ -1036,6 +1044,7 @@ impl Pager {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn allocate_page1(&self) -> Result<CursorResult<PageRef>> {
|
||||
let state = self.allocate_page1_state.borrow().clone();
|
||||
match state {
|
||||
|
@ -1111,6 +1120,7 @@ impl Pager {
|
|||
*/
|
||||
// FIXME: handle no room in page cache
|
||||
#[allow(clippy::readonly_write_lock)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn allocate_page(&self) -> Result<PageRef> {
|
||||
let old_db_size = header_accessor::get_database_size(self)?;
|
||||
#[allow(unused_mut)]
|
||||
|
@ -1195,7 +1205,9 @@ impl Pager {
|
|||
(page_size - reserved_space) as usize
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn rollback(&self, change_schema: bool, connection: &Connection) -> Result<(), LimboError> {
|
||||
tracing::debug!(change_schema);
|
||||
self.dirty_pages.borrow_mut().clear();
|
||||
let mut cache = self.page_cache.write();
|
||||
cache.unset_dirty_all_pages();
|
||||
|
|
|
@ -727,6 +727,7 @@ impl PageContent {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn begin_read_page(
|
||||
db_file: Arc<dyn DatabaseStorage>,
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
|
@ -773,6 +774,7 @@ pub fn finish_read_page(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn begin_write_btree_page(
|
||||
pager: &Pager,
|
||||
page: &PageRef,
|
||||
|
@@ -791,13 +793,14 @@ pub fn begin_write_btree_page(
    };

    *write_counter.borrow_mut() += 1;
    let clone_counter = write_counter.clone();
    let write_complete = {
        let buf_copy = buffer.clone();
        Box::new(move |bytes_written: i32| {
            tracing::trace!("finish_write_btree_page");
            let buf_copy = buf_copy.clone();
            let buf_len = buf_copy.borrow().len();
            *write_counter.borrow_mut() -= 1;
            *clone_counter.borrow_mut() -= 1;

            page_finish.clear_dirty();
            if bytes_written < buf_len as i32 {
@@ -806,10 +809,15 @@ pub fn begin_write_btree_page(
        })
    };
    let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete)));
    page_source.write_page(page_id, buffer.clone(), c)?;
    Ok(())
    let res = page_source.write_page(page_id, buffer.clone(), c);
    if res.is_err() {
        // Avoid infinite loop if write page fails
        *write_counter.borrow_mut() -= 1;
    }
    res
}

#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn begin_sync(db_file: Arc<dyn DatabaseStorage>, syncing: Rc<RefCell<bool>>) -> Result<()> {
|
||||
assert!(!*syncing.borrow());
|
||||
*syncing.borrow_mut() = true;
|
||||
|
@@ -1481,7 +1489,7 @@ pub fn begin_read_wal_frame(
    Ok(c)
}

#[instrument(skip(io, page, write_counter, wal_header, checksums), level = Level::TRACE)]
#[instrument(err,skip(io, page, write_counter, wal_header, checksums), level = Level::INFO)]
#[allow(clippy::too_many_arguments)]
pub fn begin_write_wal_frame(
    io: &Arc<dyn File>,
@@ -1548,13 +1556,14 @@ pub fn begin_write_wal_frame(
        (Arc::new(RefCell::new(buffer)), final_checksum)
    };

    let clone_counter = write_counter.clone();
    *write_counter.borrow_mut() += 1;
    let write_complete = {
        let buf_copy = buffer.clone();
        Box::new(move |bytes_written: i32| {
            let buf_copy = buf_copy.clone();
            let buf_len = buf_copy.borrow().len();
            *write_counter.borrow_mut() -= 1;
            *clone_counter.borrow_mut() -= 1;

            page_finish.clear_dirty();
            if bytes_written < buf_len as i32 {
@@ -1564,7 +1573,12 @@ pub fn begin_write_wal_frame(
    };
    #[allow(clippy::arc_with_non_send_sync)]
    let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete)));
    io.pwrite(offset, buffer.clone(), c)?;
    let res = io.pwrite(offset, buffer.clone(), c);
    if res.is_err() {
        // If we do not reduce the counter here on error, we incur an infinite loop when cacheflushing
        *write_counter.borrow_mut() -= 1;
    }
    res?;
    tracing::trace!("Frame written and synced");
    Ok(checksums)
}

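Both write paths above follow the same discipline: bump the in-flight counter before submitting the write, decrement it in the completion callback, and also decrement it when submission itself fails, otherwise the flush loop that waits for the counter to reach zero spins forever. A self-contained sketch of that pattern (the `submit_write` function and its failure mode are made up for illustration, not the real I/O API):

```rust
use std::cell::RefCell;
use std::rc::Rc;

type WriteCounter = Rc<RefCell<usize>>;

// Hypothetical submission function: on success it "completes" immediately by
// invoking the callback; on failure the callback will never run.
fn submit_write(fail: bool, on_complete: Box<dyn FnOnce(i32)>) -> Result<(), String> {
    if fail {
        return Err("pwrite failed".into());
    }
    on_complete(4096);
    Ok(())
}

fn begin_write(counter: &WriteCounter, fail: bool) -> Result<(), String> {
    *counter.borrow_mut() += 1;
    let counter_for_cb = counter.clone();
    let on_complete = Box::new(move |_bytes_written: i32| {
        // Normal path: the completion callback is the one that decrements.
        *counter_for_cb.borrow_mut() -= 1;
    });

    let res = submit_write(fail, on_complete);
    if res.is_err() {
        // Error path: the callback will never fire, so undo the increment here,
        // otherwise `while in_flight > 0 {}` style loops never terminate.
        *counter.borrow_mut() -= 1;
    }
    res
}

fn main() {
    let in_flight: WriteCounter = Rc::new(RefCell::new(0));
    begin_write(&in_flight, false).unwrap();
    assert_eq!(*in_flight.borrow(), 0);

    assert!(begin_write(&in_flight, true).is_err());
    assert_eq!(*in_flight.borrow(), 0); // counter still balanced after the failure
}
```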
@ -499,6 +499,7 @@ impl fmt::Debug for WalFileShared {
|
|||
|
||||
impl Wal for WalFile {
|
||||
/// Begin a read transaction.
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn begin_read_tx(&mut self) -> Result<LimboResult> {
|
||||
let max_frame_in_wal = self.get_shared().max_frame.load(Ordering::SeqCst);
|
||||
|
||||
|
@ -564,6 +565,7 @@ impl Wal for WalFile {
|
|||
|
||||
/// End a read transaction.
|
||||
#[inline(always)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn end_read_tx(&self) -> Result<LimboResult> {
|
||||
tracing::debug!("end_read_tx(lock={})", self.max_frame_read_lock_index);
|
||||
let read_lock = &mut self.get_shared().read_locks[self.max_frame_read_lock_index];
|
||||
|
@ -572,6 +574,7 @@ impl Wal for WalFile {
|
|||
}
|
||||
|
||||
/// Begin a write transaction
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn begin_write_tx(&mut self) -> Result<LimboResult> {
|
||||
let busy = !self.get_shared().write_lock.write();
|
||||
tracing::debug!("begin_write_transaction(busy={})", busy);
|
||||
|
@ -582,6 +585,7 @@ impl Wal for WalFile {
|
|||
}
|
||||
|
||||
/// End a write transaction
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn end_write_tx(&self) -> Result<LimboResult> {
|
||||
tracing::debug!("end_write_txn");
|
||||
self.get_shared().write_lock.unlock();
|
||||
|
@ -589,6 +593,7 @@ impl Wal for WalFile {
|
|||
}
|
||||
|
||||
/// Find the latest frame containing a page.
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn find_frame(&self, page_id: u64) -> Result<Option<u64>> {
|
||||
let shared = self.get_shared();
|
||||
let frames = shared.frame_cache.lock();
|
||||
|
@ -606,6 +611,7 @@ impl Wal for WalFile {
|
|||
}
|
||||
|
||||
/// Read a frame from the WAL.
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Arc<BufferPool>) -> Result<()> {
|
||||
tracing::debug!("read_frame({})", frame_id);
|
||||
let offset = self.frame_offset(frame_id);
|
||||
|
@ -624,6 +630,7 @@ impl Wal for WalFile {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn read_frame_raw(
|
||||
&self,
|
||||
frame_id: u64,
|
||||
|
@ -650,6 +657,7 @@ impl Wal for WalFile {
|
|||
}
|
||||
|
||||
/// Write a frame to the WAL.
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn append_frame(
|
||||
&mut self,
|
||||
page: PageRef,
|
||||
|
@ -660,12 +668,7 @@ impl Wal for WalFile {
|
|||
let max_frame = self.max_frame;
|
||||
let frame_id = if max_frame == 0 { 1 } else { max_frame + 1 };
|
||||
let offset = self.frame_offset(frame_id);
|
||||
tracing::debug!(
|
||||
"append_frame(frame={}, offset={}, page_id={})",
|
||||
frame_id,
|
||||
offset,
|
||||
page_id
|
||||
);
|
||||
tracing::debug!(frame_id, offset, page_id);
|
||||
let checksums = {
|
||||
let shared = self.get_shared();
|
||||
let header = shared.wal_header.clone();
|
||||
|
@ -699,13 +702,14 @@ impl Wal for WalFile {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn should_checkpoint(&self) -> bool {
|
||||
let shared = self.get_shared();
|
||||
let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize;
|
||||
frame_id >= self.checkpoint_threshold
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn checkpoint(
|
||||
&mut self,
|
||||
pager: &Pager,
|
||||
|
@ -869,7 +873,7 @@ impl Wal for WalFile {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
#[instrument(err, skip_all, level = Level::INFO)]
|
||||
fn sync(&mut self) -> Result<WalFsyncStatus> {
|
||||
match self.sync_state.get() {
|
||||
SyncState::NotSyncing => {
|
||||
|
@ -911,6 +915,7 @@ impl Wal for WalFile {
|
|||
self.min_frame
|
||||
}
|
||||
|
||||
    #[instrument(err, skip_all, level = Level::INFO)]
    fn rollback(&mut self) -> Result<()> {
        // TODO(pere): have to remove things from frame_cache because they are no longer valid.
        // TODO(pere): clear page cache in pager.
@@ -918,7 +923,7 @@ impl Wal for WalFile {
        // TODO(pere): implement proper hashmap, this sucks :).
        let shared = self.get_shared();
        let max_frame = shared.max_frame.load(Ordering::SeqCst);
        tracing::trace!("rollback(to_max_frame={})", max_frame);
        tracing::debug!(to_max_frame = max_frame);
        let mut frame_cache = shared.frame_cache.lock();
        for (_, frames) in frame_cache.iter_mut() {
            let mut last_valid_frame = frames.len();
@@ -936,14 +941,11 @@ impl Wal for WalFile {
        Ok(())
    }

    #[instrument(skip_all, level = Level::INFO)]
    fn finish_append_frames_commit(&mut self) -> Result<()> {
        let shared = self.get_shared();
        shared.max_frame.store(self.max_frame, Ordering::SeqCst);
        tracing::trace!(
            "finish_append_frames_commit(max_frame={}, last_checksum={:?})",
            self.max_frame,
            self.last_checksum
        );
        tracing::trace!(self.max_frame, ?self.last_checksum);
        shared.last_checksum = self.last_checksum;
        Ok(())
    }

@ -11,7 +11,7 @@ use turso_sqlite3_parser::ast::{CompoundOperator, SortOrder};
|
|||
|
||||
use tracing::Level;
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn emit_program_for_compound_select(
|
||||
program: &mut ProgramBuilder,
|
||||
plan: Plan,
|
||||
|
|
|
@ -198,7 +198,7 @@ pub enum TransactionMode {
|
|||
|
||||
/// Main entry point for emitting bytecode for a SQL query
|
||||
/// Takes a query plan and generates the corresponding bytecode program
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn emit_program(
|
||||
program: &mut ProgramBuilder,
|
||||
plan: Plan,
|
||||
|
@ -216,7 +216,7 @@ pub fn emit_program(
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn emit_program_for_select(
|
||||
program: &mut ProgramBuilder,
|
||||
mut plan: SelectPlan,
|
||||
|
@ -255,7 +255,7 @@ fn emit_program_for_select(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn emit_query<'a>(
|
||||
program: &mut ProgramBuilder,
|
||||
plan: &'a mut SelectPlan,
|
||||
|
@ -395,7 +395,7 @@ pub fn emit_query<'a>(
|
|||
Ok(t_ctx.reg_result_cols_start.unwrap())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn emit_program_for_delete(
|
||||
program: &mut ProgramBuilder,
|
||||
plan: DeletePlan,
|
||||
|
@ -580,7 +580,7 @@ fn emit_delete_insns(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn emit_program_for_update(
|
||||
program: &mut ProgramBuilder,
|
||||
mut plan: UpdatePlan,
|
||||
|
@ -699,7 +699,7 @@ fn emit_program_for_update(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn emit_update_insns(
|
||||
plan: &UpdatePlan,
|
||||
t_ctx: &TranslateCtx,
|
||||
|
|
|
@ -27,7 +27,7 @@ pub struct ConditionMetadata {
|
|||
pub jump_target_when_false: BranchOffset,
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn emit_cond_jump(program: &mut ProgramBuilder, cond_meta: ConditionMetadata, reg: usize) {
|
||||
if cond_meta.jump_if_condition_is_true {
|
||||
program.emit_insn(Insn::If {
|
||||
|
@ -131,7 +131,7 @@ macro_rules! expect_arguments_even {
|
|||
}};
|
||||
}
|
||||
|
||||
#[instrument(skip(program, referenced_tables, expr, resolver), level = Level::TRACE)]
|
||||
#[instrument(skip(program, referenced_tables, expr, resolver), level = Level::INFO)]
|
||||
pub fn translate_condition_expr(
|
||||
program: &mut ProgramBuilder,
|
||||
referenced_tables: &TableReferences,
|
||||
|
|
|
@ -53,7 +53,7 @@ use transaction::{translate_tx_begin, translate_tx_commit};
|
|||
use turso_sqlite3_parser::ast::{self, Delete, Insert};
|
||||
use update::translate_update;
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn translate(
|
||||
schema: &Schema,
|
||||
|
|
|
@ -3,7 +3,7 @@ use crate::{
|
|||
schema::{self, Column, Schema, Type},
|
||||
translate::{collate::CollationSeq, expr::walk_expr, plan::JoinOrderMember},
|
||||
types::{Value, ValueType},
|
||||
LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable, IO,
|
||||
LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable,
|
||||
};
|
||||
use std::{rc::Rc, sync::Arc};
|
||||
use turso_sqlite3_parser::ast::{
|
||||
|
@ -51,7 +51,6 @@ struct UnparsedFromSqlIndex {
|
|||
pub fn parse_schema_rows(
|
||||
rows: Option<Statement>,
|
||||
schema: &mut Schema,
|
||||
io: Arc<dyn IO>,
|
||||
syms: &SymbolTable,
|
||||
mv_tx_id: Option<u64>,
|
||||
) -> Result<()> {
|
||||
|
@ -130,7 +129,7 @@ pub fn parse_schema_rows(
|
|||
StepResult::IO => {
|
||||
// TODO: How do we ensure that the I/O we submitted to
|
||||
// read the schema is actually complete?
|
||||
io.run_once()?;
|
||||
rows.run_once()?;
|
||||
}
|
||||
StepResult::Interrupt => break,
|
||||
StepResult::Done => break,
|
||||
|
|
|
@@ -291,7 +291,7 @@ impl ProgramBuilder {
        });
    }

    #[instrument(skip(self), level = Level::TRACE)]
    #[instrument(skip(self), level = Level::INFO)]
    pub fn emit_insn(&mut self, insn: Insn) {
        let function = insn.to_function();
        // This seemingly empty trace here is needed so that a function span is emitted with it
@ -4978,7 +4978,6 @@ pub fn op_parse_schema(
|
|||
parse_schema_rows(
|
||||
Some(stmt),
|
||||
&mut new_schema,
|
||||
conn.pager.io.clone(),
|
||||
&conn.syms.borrow(),
|
||||
state.mv_tx_id,
|
||||
)?;
|
||||
|
@ -4993,7 +4992,6 @@ pub fn op_parse_schema(
|
|||
parse_schema_rows(
|
||||
Some(stmt),
|
||||
&mut new_schema,
|
||||
conn.pager.io.clone(),
|
||||
&conn.syms.borrow(),
|
||||
state.mv_tx_id,
|
||||
)?;
|
||||
|
|
|
@@ -368,6 +368,7 @@ pub struct Program {
}

impl Program {
    #[instrument(skip_all, level = Level::INFO)]
    pub fn step(
        &self,
        state: &mut ProgramState,
@@ -382,8 +383,12 @@ impl Program {
        let _ = state.result_row.take();
        let (insn, insn_function) = &self.insns[state.pc as usize];
        trace_insn(self, state.pc as InsnReference, insn);
        let res = insn_function(self, state, insn, &pager, mv_store.as_ref())?;
        match res {
        let res = insn_function(self, state, insn, &pager, mv_store.as_ref());
        if res.is_err() {
            let state = self.connection.transaction_state.get();
            pager.rollback(state.change_schema(), &self.connection)?
        }
        match res? {
            InsnFunctionStepResult::Step => {}
            InsnFunctionStepResult::Done => return Ok(StepResult::Done),
            InsnFunctionStepResult::IO => return Ok(StepResult::IO),
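The `?` on `insn_function(...)` used to propagate the error straight out of `step`, leaving a half-applied write transaction behind; the new code inspects the result first and rolls back before re-raising. A tiny hedged sketch of that order of operations, with stand-in `Pager`/instruction types rather than the real ones:

```rust
struct Pager {
    dirty_pages: usize,
}

impl Pager {
    fn rollback(&mut self) {
        // Throw away everything the failed statement dirtied.
        self.dirty_pages = 0;
    }
}

fn execute_insn(pager: &mut Pager, should_fail: bool) -> Result<(), String> {
    pager.dirty_pages += 1;
    if should_fail {
        Err("step error".into())
    } else {
        Ok(())
    }
}

fn step(pager: &mut Pager, should_fail: bool) -> Result<(), String> {
    // Do not use `?` directly: check the result, roll back, then propagate.
    let res = execute_insn(pager, should_fail);
    if res.is_err() {
        pager.rollback();
    }
    res
}

fn main() {
    let mut pager = Pager { dirty_pages: 0 };
    assert!(step(&mut pager, true).is_err());
    assert_eq!(pager.dirty_pages, 0); // the failed step left no dirty state behind
}
```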
@ -394,7 +399,7 @@ impl Program {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn commit_txn(
|
||||
&self,
|
||||
pager: Rc<Pager>,
|
||||
|
@@ -460,7 +465,7 @@ impl Program {
        }
    }

    #[instrument(skip(self, pager, connection), level = Level::TRACE)]
    #[instrument(skip(self, pager, connection), level = Level::INFO)]
    fn step_end_write_txn(
        &self,
        pager: &Rc<Pager>,
@@ -476,10 +481,16 @@ impl Program {
            connection.wal_checkpoint_disabled.get(),
        )?;
        match cacheflush_status {
            PagerCacheflushStatus::Done(_) => {
            PagerCacheflushStatus::Done(status) => {
                if self.change_cnt_on {
                    self.connection.set_changes(self.n_change.get());
                }
                if matches!(
                    status,
                    crate::storage::pager::PagerCacheflushResult::Rollback
                ) {
                    pager.rollback(change_schema, connection)?;
                }
                connection.transaction_state.replace(TransactionState::None);
                *commit_state = CommitState::Ready;
            }
@ -553,7 +564,7 @@ fn make_record(registers: &[Register], start_reg: &usize, count: &usize) -> Immu
|
|||
ImmutableRecord::from_registers(regs, regs.len())
|
||||
}
|
||||
|
||||
#[instrument(skip(program), level = Level::TRACE)]
|
||||
#[instrument(skip(program), level = Level::INFO)]
|
||||
fn trace_insn(program: &Program, addr: InsnReference, insn: &Insn) {
|
||||
if !tracing::enabled!(tracing::Level::TRACE) {
|
||||
return;
|
||||
|
|
|
@ -194,7 +194,7 @@ fn do_fuzz(expr: Expr) -> Result<Corpus, Box<dyn Error>> {
|
|||
loop {
|
||||
use turso_core::StepResult;
|
||||
match stmt.step()? {
|
||||
StepResult::IO => io.run_once()?,
|
||||
StepResult::IO => stmt.run_once()?,
|
||||
StepResult::Row => {
|
||||
let row = stmt.row().unwrap();
|
||||
assert_eq!(row.len(), 1, "expr: {:?}", expr);
|
||||
|
|
|
@ -21,7 +21,7 @@ if [[ -n "$iterations" ]]; then
|
|||
echo "Running limbo_sim for $iterations iterations..."
|
||||
for ((i=1; i<=iterations; i++)); do
|
||||
echo "Iteration $i of $iterations"
|
||||
cargo run -p limbo_sim
|
||||
cargo run -p limbo_sim -- --maximum-tests 2000
|
||||
done
|
||||
echo "Completed $iterations iterations"
|
||||
else
|
||||
|
|
|
@ -7,14 +7,14 @@ use std::{
|
|||
};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use turso_core::{Connection, Result, StepResult, IO};
|
||||
use turso_core::{Connection, Result, StepResult};
|
||||
|
||||
use crate::{
|
||||
model::{
|
||||
query::{update::Update, Create, CreateIndex, Delete, Drop, Insert, Query, Select},
|
||||
table::SimValue,
|
||||
},
|
||||
runner::{env::SimConnection, io::SimulatorIO},
|
||||
runner::env::SimConnection,
|
||||
SimulatorEnv,
|
||||
};
|
||||
|
||||
|
@ -266,7 +266,7 @@ impl Display for Interaction {
|
|||
}
|
||||
}
|
||||
|
||||
type AssertionFunc = dyn Fn(&Vec<ResultSet>, &SimulatorEnv) -> Result<bool>;
|
||||
type AssertionFunc = dyn Fn(&Vec<ResultSet>, &mut SimulatorEnv) -> Result<bool>;
|
||||
|
||||
enum AssertionAST {
|
||||
Pick(),
|
||||
|
@ -411,7 +411,7 @@ impl Interaction {
|
|||
}
|
||||
}
|
||||
}
|
||||
pub(crate) fn execute_query(&self, conn: &mut Arc<Connection>, io: &SimulatorIO) -> ResultSet {
|
||||
pub(crate) fn execute_query(&self, conn: &mut Arc<Connection>) -> ResultSet {
|
||||
if let Self::Query(query) = self {
|
||||
let query_str = query.to_string();
|
||||
let rows = conn.query(&query_str);
|
||||
|
@ -440,7 +440,7 @@ impl Interaction {
|
|||
out.push(r);
|
||||
}
|
||||
StepResult::IO => {
|
||||
io.run_once().unwrap();
|
||||
rows.run_once().unwrap();
|
||||
}
|
||||
StepResult::Interrupt => {}
|
||||
StepResult::Done => {
|
||||
|
@ -459,7 +459,7 @@ impl Interaction {
|
|||
pub(crate) fn execute_assertion(
|
||||
&self,
|
||||
stack: &Vec<ResultSet>,
|
||||
env: &SimulatorEnv,
|
||||
env: &mut SimulatorEnv,
|
||||
) -> Result<()> {
|
||||
match self {
|
||||
Self::Assertion(assertion) => {
|
||||
|
@ -484,7 +484,7 @@ impl Interaction {
|
|||
pub(crate) fn execute_assumption(
|
||||
&self,
|
||||
stack: &Vec<ResultSet>,
|
||||
env: &SimulatorEnv,
|
||||
env: &mut SimulatorEnv,
|
||||
) -> Result<()> {
|
||||
match self {
|
||||
Self::Assumption(assumption) => {
|
||||
|
@ -664,6 +664,7 @@ fn reopen_database(env: &mut SimulatorEnv) {
|
|||
env.connections.clear();
|
||||
|
||||
// Clear all open files
|
||||
// TODO: for correct reporting of faults we should get all the recorded numbers and transfer to the new file
|
||||
env.io.files.borrow_mut().clear();
|
||||
|
||||
// 2. Re-open database
|
||||
|
|
|
@ -11,6 +11,7 @@ use crate::{
|
|||
Create, Delete, Drop, Insert, Query, Select,
|
||||
},
|
||||
table::SimValue,
|
||||
FAULT_ERROR_MSG,
|
||||
},
|
||||
runner::env::SimulatorEnv,
|
||||
};
|
||||
|
@ -200,7 +201,7 @@ impl Property {
|
|||
message: format!("table {} exists", insert.table()),
|
||||
func: Box::new({
|
||||
let table_name = table.clone();
|
||||
move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
|
||||
move |_: &Vec<ResultSet>, env: &mut SimulatorEnv| {
|
||||
Ok(env.tables.iter().any(|t| t.name == table_name))
|
||||
}
|
||||
}),
|
||||
|
@ -221,7 +222,7 @@ impl Property {
|
|||
.map(|i| !i.end_with_commit)
|
||||
.unwrap_or(false),
|
||||
),
|
||||
func: Box::new(move |stack: &Vec<ResultSet>, _: &SimulatorEnv| {
|
||||
func: Box::new(move |stack: &Vec<ResultSet>, _| {
|
||||
let rows = stack.last().unwrap();
|
||||
match rows {
|
||||
Ok(rows) => {
|
||||
|
@ -248,7 +249,7 @@ impl Property {
|
|||
let assumption = Interaction::Assumption(Assertion {
|
||||
message: "Double-Create-Failure should not be called on an existing table"
|
||||
.to_string(),
|
||||
func: Box::new(move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
|
||||
func: Box::new(move |_: &Vec<ResultSet>, env: &mut SimulatorEnv| {
|
||||
Ok(!env.tables.iter().any(|t| t.name == table_name))
|
||||
}),
|
||||
});
|
||||
|
@ -259,17 +260,17 @@ impl Property {
|
|||
let table_name = create.table.name.clone();
|
||||
|
||||
let assertion = Interaction::Assertion(Assertion {
|
||||
message:
|
||||
"creating two tables with the name should result in a failure for the second query"
|
||||
.to_string(),
|
||||
func: Box::new(move |stack: &Vec<ResultSet>, _: &SimulatorEnv| {
|
||||
let last = stack.last().unwrap();
|
||||
match last {
|
||||
Ok(_) => Ok(false),
|
||||
Err(e) => Ok(e.to_string().to_lowercase().contains(&format!("table {table_name} already exists"))),
|
||||
}
|
||||
}),
|
||||
});
|
||||
message:
|
||||
"creating two tables with the name should result in a failure for the second query"
|
||||
.to_string(),
|
||||
func: Box::new(move |stack: &Vec<ResultSet>, _| {
|
||||
let last = stack.last().unwrap();
|
||||
match last {
|
||||
Ok(_) => Ok(false),
|
||||
Err(e) => Ok(e.to_string().to_lowercase().contains(&format!("table {table_name} already exists"))),
|
||||
}
|
||||
}),
|
||||
});
|
||||
|
||||
let mut interactions = Vec::new();
|
||||
interactions.push(assumption);
|
||||
|
@ -287,7 +288,7 @@ impl Property {
|
|||
message: format!("table {} exists", table_name),
|
||||
func: Box::new({
|
||||
let table_name = table_name.clone();
|
||||
move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
|
||||
move |_, env: &mut SimulatorEnv| {
|
||||
Ok(env.tables.iter().any(|t| t.name == table_name))
|
||||
}
|
||||
}),
|
||||
|
@ -299,7 +300,7 @@ impl Property {
|
|||
|
||||
let assertion = Interaction::Assertion(Assertion {
|
||||
message: "select query should respect the limit clause".to_string(),
|
||||
func: Box::new(move |stack: &Vec<ResultSet>, _: &SimulatorEnv| {
|
||||
func: Box::new(move |stack: &Vec<ResultSet>, _| {
|
||||
let last = stack.last().unwrap();
|
||||
match last {
|
||||
Ok(rows) => Ok(limit >= rows.len()),
|
||||
|
@ -323,7 +324,7 @@ impl Property {
|
|||
message: format!("table {} exists", table),
|
||||
func: Box::new({
|
||||
let table = table.clone();
|
||||
move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
|
||||
move |_: &Vec<ResultSet>, env: &mut SimulatorEnv| {
|
||||
Ok(env.tables.iter().any(|t| t.name == table))
|
||||
}
|
||||
}),
|
||||
|
@ -344,7 +345,7 @@ impl Property {
|
|||
|
||||
let assertion = Interaction::Assertion(Assertion {
|
||||
message: format!("`{}` should return no values for table `{}`", select, table,),
|
||||
func: Box::new(move |stack: &Vec<ResultSet>, _: &SimulatorEnv| {
|
||||
func: Box::new(move |stack: &Vec<ResultSet>, _| {
|
||||
let rows = stack.last().unwrap();
|
||||
match rows {
|
||||
Ok(rows) => Ok(rows.is_empty()),
|
||||
|
@ -371,7 +372,7 @@ impl Property {
|
|||
message: format!("table {} exists", table),
|
||||
func: Box::new({
|
||||
let table = table.clone();
|
||||
move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
|
||||
move |_, env: &mut SimulatorEnv| {
|
||||
Ok(env.tables.iter().any(|t| t.name == table))
|
||||
}
|
||||
}),
|
||||
|
@ -384,7 +385,7 @@ impl Property {
|
|||
"select query should result in an error for table '{}'",
|
||||
table
|
||||
),
|
||||
func: Box::new(move |stack: &Vec<ResultSet>, _: &SimulatorEnv| {
|
||||
func: Box::new(move |stack: &Vec<ResultSet>, _| {
|
||||
let last = stack.last().unwrap();
|
||||
match last {
|
||||
Ok(_) => Ok(false),
|
||||
|
@ -416,7 +417,7 @@ impl Property {
|
|||
message: format!("table {} exists", table),
|
||||
func: Box::new({
|
||||
let table = table.clone();
|
||||
move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
|
||||
move |_: &Vec<ResultSet>, env: &mut SimulatorEnv| {
|
||||
Ok(env.tables.iter().any(|t| t.name == table))
|
||||
}
|
||||
}),
|
||||
|
@ -440,7 +441,7 @@ impl Property {
|
|||
|
||||
let assertion = Interaction::Assertion(Assertion {
|
||||
message: "select queries should return the same amount of results".to_string(),
|
||||
func: Box::new(move |stack: &Vec<ResultSet>, _: &SimulatorEnv| {
|
||||
func: Box::new(move |stack: &Vec<ResultSet>, _| {
|
||||
let select_star = stack.last().unwrap();
|
||||
let select_predicate = stack.get(stack.len() - 2).unwrap();
|
||||
match (select_predicate, select_star) {
|
||||
|
@@ -476,7 +477,35 @@ impl Property {
            }
            Property::FaultyQuery { query, tables } => {
                let checks = assert_all_table_values(tables);
                let first = std::iter::once(Interaction::FaultyQuery(query.clone()));
                let query_clone = query.clone();
                let assumption = Assertion {
                    // A fault may not occur, as we first signal that we want a fault injected,
                    // and the fault only triggers when IO is called. It may happen that a fault
                    // is injected but no IO happens right after it.
                    message: "fault occurred".to_string(),
                    func: Box::new(move |stack, env| {
                        let last = stack.last().unwrap();
                        match last {
                            Ok(_) => {
                                query_clone.shadow(env);
                                Ok(true)
                            }
                            Err(err) => {
                                let msg = format!("{}", err);
                                if msg.contains(FAULT_ERROR_MSG) {
                                    Ok(true)
                                } else {
                                    Err(LimboError::InternalError(msg))
                                }
                            }
                        }
                    }),
                };
                let first = [
                    Interaction::FaultyQuery(query.clone()),
                    Interaction::Assumption(assumption),
                ]
                .into_iter();
                Vec::from_iter(first.chain(checks))
            }
        }
@ -493,15 +522,15 @@ fn assert_all_table_values(tables: &[String]) -> impl Iterator<Item = Interactio
|
|||
distinct: Distinctness::All,
|
||||
}));
|
||||
let assertion = Interaction::Assertion(Assertion {
|
||||
message: format!(
|
||||
"table {} should contain all of its values after the wal reopened",
|
||||
table
|
||||
),
|
||||
message: format!("table {} should contain all of its values", table),
|
||||
func: Box::new({
|
||||
let table = table.clone();
|
||||
move |stack: &Vec<ResultSet>, env: &SimulatorEnv| {
|
||||
move |stack: &Vec<ResultSet>, env: &mut SimulatorEnv| {
|
||||
let table = env.tables.iter().find(|t| t.name == table).ok_or_else(|| {
|
||||
LimboError::InternalError(format!("table {} should exist", table))
|
||||
LimboError::InternalError(format!(
|
||||
"table {} should exist in simulator env",
|
||||
table
|
||||
))
|
||||
})?;
|
||||
let last = stack.last().unwrap();
|
||||
match last {
|
||||
|
|
|
@@ -1,2 +1,4 @@
pub mod query;
pub mod table;

pub(crate) const FAULT_ERROR_MSG: &str = "Injected fault";
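The simulator distinguishes deliberately injected faults from real bugs purely by the error message, so every fault site now uses this shared constant instead of a string literal. A rough sketch of how an assertion can whitelist injected faults while still surfacing anything else (the error type here is a plain String, unlike the real `LimboError`):

```rust
const FAULT_ERROR_MSG: &str = "Injected fault";

fn classify(result: Result<(), String>) -> Result<bool, String> {
    match result {
        // Query succeeded: fine, the fault simply never triggered.
        Ok(()) => Ok(true),
        // Query failed with the injected-fault marker: also fine.
        Err(msg) if msg.contains(FAULT_ERROR_MSG) => Ok(true),
        // Any other failure is a genuine bug and must surface.
        Err(msg) => Err(msg),
    }
}

fn main() {
    assert_eq!(classify(Ok(())), Ok(true));
    assert_eq!(classify(Err(format!("{}: pwrite", FAULT_ERROR_MSG))), Ok(true));
    assert!(classify(Err("corrupt page header".into())).is_err());
}
```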
@ -54,7 +54,7 @@ pub struct SimulatorCLI {
|
|||
pub disable_delete: bool,
|
||||
#[clap(long, help = "disable CREATE Statement", default_value_t = false)]
|
||||
pub disable_create: bool,
|
||||
#[clap(long, help = "disable CREATE INDEX Statement", default_value_t = false)]
|
||||
#[clap(long, help = "disable CREATE INDEX Statement", default_value_t = true)]
|
||||
pub disable_create_index: bool,
|
||||
#[clap(long, help = "disable DROP Statement", default_value_t = false)]
|
||||
pub disable_drop: bool,
|
||||
|
@ -84,7 +84,7 @@ pub struct SimulatorCLI {
|
|||
pub disable_select_optimizer: bool,
|
||||
#[clap(long, help = "disable FsyncNoWait Property", default_value_t = true)]
|
||||
pub disable_fsync_no_wait: bool,
|
||||
#[clap(long, help = "disable FaultyQuery Property", default_value_t = true)]
|
||||
#[clap(long, help = "disable FaultyQuery Property", default_value_t = false)]
|
||||
pub disable_faulty_query: bool,
|
||||
#[clap(long, help = "disable Reopen-Database fault", default_value_t = false)]
|
||||
pub disable_reopen_database: bool,
|
||||
|
|
|
@ -191,7 +191,7 @@ pub(crate) fn execute_interaction(
|
|||
SimConnection::Disconnected => unreachable!(),
|
||||
};
|
||||
|
||||
let results = interaction.execute_query(conn, &env.io);
|
||||
let results = interaction.execute_query(conn);
|
||||
tracing::debug!(?results);
|
||||
stack.push(results);
|
||||
limbo_integrity_check(conn)?;
|
||||
|
|
|
@ -7,6 +7,8 @@ use rand::Rng as _;
|
|||
use rand_chacha::ChaCha8Rng;
|
||||
use tracing::{instrument, Level};
|
||||
use turso_core::{CompletionType, File, Result};
|
||||
|
||||
use crate::model::FAULT_ERROR_MSG;
|
||||
pub(crate) struct SimulatorFile {
|
||||
pub(crate) inner: Arc<dyn File>,
|
||||
pub(crate) fault: Cell<bool>,
|
||||
|
@ -88,7 +90,7 @@ impl File for SimulatorFile {
|
|||
fn lock_file(&self, exclusive: bool) -> Result<()> {
|
||||
if self.fault.get() {
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"Injected fault".into(),
|
||||
FAULT_ERROR_MSG.into(),
|
||||
));
|
||||
}
|
||||
self.inner.lock_file(exclusive)
|
||||
|
@ -97,7 +99,7 @@ impl File for SimulatorFile {
|
|||
fn unlock_file(&self) -> Result<()> {
|
||||
if self.fault.get() {
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"Injected fault".into(),
|
||||
FAULT_ERROR_MSG.into(),
|
||||
));
|
||||
}
|
||||
self.inner.unlock_file()
|
||||
|
@ -113,7 +115,7 @@ impl File for SimulatorFile {
|
|||
tracing::debug!("pread fault");
|
||||
self.nr_pread_faults.set(self.nr_pread_faults.get() + 1);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"Injected fault".into(),
|
||||
FAULT_ERROR_MSG.into(),
|
||||
));
|
||||
}
|
||||
if let Some(latency) = self.generate_latency_duration() {
|
||||
|
@ -148,7 +150,7 @@ impl File for SimulatorFile {
|
|||
tracing::debug!("pwrite fault");
|
||||
self.nr_pwrite_faults.set(self.nr_pwrite_faults.get() + 1);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"Injected fault".into(),
|
||||
FAULT_ERROR_MSG.into(),
|
||||
));
|
||||
}
|
||||
if let Some(latency) = self.generate_latency_duration() {
|
||||
|
@ -178,7 +180,7 @@ impl File for SimulatorFile {
|
|||
tracing::debug!("sync fault");
|
||||
self.nr_sync_faults.set(self.nr_sync_faults.get() + 1);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"Injected fault".into(),
|
||||
FAULT_ERROR_MSG.into(),
|
||||
));
|
||||
}
|
||||
if let Some(latency) = self.generate_latency_duration() {
|
||||
|
|
|
@ -7,7 +7,7 @@ use rand::{RngCore, SeedableRng};
|
|||
use rand_chacha::ChaCha8Rng;
|
||||
use turso_core::{Clock, Instant, OpenFlags, PlatformIO, Result, IO};
|
||||
|
||||
use crate::runner::file::SimulatorFile;
|
||||
use crate::{model::FAULT_ERROR_MSG, runner::file::SimulatorFile};
|
||||
|
||||
pub(crate) struct SimulatorIO {
|
||||
pub(crate) inner: Box<dyn IO>,
|
||||
|
@ -104,7 +104,7 @@ impl IO for SimulatorIO {
|
|||
self.nr_run_once_faults
|
||||
.replace(self.nr_run_once_faults.get() + 1);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"Injected fault".into(),
|
||||
FAULT_ERROR_MSG.into(),
|
||||
));
|
||||
}
|
||||
self.inner.run_once()?;
|
||||
|
|
|
@ -60,6 +60,7 @@ impl InteractionPlan {
|
|||
.uses()
|
||||
.iter()
|
||||
.any(|t| depending_tables.contains(t));
|
||||
|
||||
if has_table {
|
||||
// Remove the extensional parts of the properties
|
||||
if let Interactions::Property(p) = interactions {
|
||||
|
@ -82,13 +83,15 @@ impl InteractionPlan {
|
|||
.iter()
|
||||
.any(|t| depending_tables.contains(t));
|
||||
}
|
||||
has_table
|
||||
&& !matches!(
|
||||
interactions,
|
||||
Interactions::Query(Query::Select(_))
|
||||
| Interactions::Property(Property::SelectLimit { .. })
|
||||
| Interactions::Property(Property::SelectSelectOptimizer { .. })
|
||||
)
|
||||
let is_fault = matches!(interactions, Interactions::Fault(..));
|
||||
is_fault
|
||||
|| (has_table
|
||||
&& !matches!(
|
||||
interactions,
|
||||
Interactions::Query(Query::Select(_))
|
||||
| Interactions::Property(Property::SelectLimit { .. })
|
||||
| Interactions::Property(Property::SelectSelectOptimizer { .. })
|
||||
))
|
||||
};
|
||||
idx += 1;
|
||||
retain
|
||||
|
|
|
@ -247,12 +247,11 @@ pub unsafe extern "C" fn sqlite3_step(stmt: *mut sqlite3_stmt) -> ffi::c_int {
|
|||
let stmt = &mut *stmt;
|
||||
let db = &mut *stmt.db;
|
||||
loop {
|
||||
let db = db.inner.lock().unwrap();
|
||||
let _db = db.inner.lock().unwrap();
|
||||
if let Ok(result) = stmt.stmt.step() {
|
||||
match result {
|
||||
turso_core::StepResult::IO => {
|
||||
let io = db.io.clone();
|
||||
io.run_once().unwrap();
|
||||
stmt.stmt.run_once().unwrap();
|
||||
continue;
|
||||
}
|
||||
turso_core::StepResult::Done => return SQLITE_DONE,
|
||||
|
|
|
@ -183,7 +183,7 @@ pub(crate) fn sqlite_exec_rows(
|
|||
}
|
||||
|
||||
pub(crate) fn limbo_exec_rows(
|
||||
db: &TempDatabase,
|
||||
_db: &TempDatabase,
|
||||
conn: &Arc<turso_core::Connection>,
|
||||
query: &str,
|
||||
) -> Vec<Vec<rusqlite::types::Value>> {
|
||||
|
@ -198,7 +198,7 @@ pub(crate) fn limbo_exec_rows(
|
|||
break row;
|
||||
}
|
||||
turso_core::StepResult::IO => {
|
||||
db.io.run_once().unwrap();
|
||||
stmt.run_once().unwrap();
|
||||
continue;
|
||||
}
|
||||
turso_core::StepResult::Done => break 'outer,
|
||||
|
@ -221,7 +221,7 @@ pub(crate) fn limbo_exec_rows(
|
|||
}
|
||||
|
||||
pub(crate) fn limbo_exec_rows_error(
|
||||
db: &TempDatabase,
|
||||
_db: &TempDatabase,
|
||||
conn: &Arc<turso_core::Connection>,
|
||||
query: &str,
|
||||
) -> turso_core::Result<()> {
|
||||
|
@ -230,7 +230,7 @@ pub(crate) fn limbo_exec_rows_error(
|
|||
let result = stmt.step()?;
|
||||
match result {
|
||||
turso_core::StepResult::IO => {
|
||||
db.io.run_once()?;
|
||||
stmt.run_once()?;
|
||||
continue;
|
||||
}
|
||||
turso_core::StepResult::Done => return Ok(()),
|
||||
|
|
|
@ -16,7 +16,7 @@ fn test_last_insert_rowid_basic() -> anyhow::Result<()> {
|
|||
loop {
|
||||
match rows.step()? {
|
||||
StepResult::IO => {
|
||||
tmp_db.io.run_once()?;
|
||||
rows.run_once()?;
|
||||
}
|
||||
StepResult::Done => break,
|
||||
_ => unreachable!(),
|
||||
|
@ -36,7 +36,7 @@ fn test_last_insert_rowid_basic() -> anyhow::Result<()> {
|
|||
}
|
||||
}
|
||||
StepResult::IO => {
|
||||
tmp_db.io.run_once()?;
|
||||
rows.run_once()?;
|
||||
}
|
||||
StepResult::Interrupt => break,
|
||||
StepResult::Done => break,
|
||||
|
@ -50,7 +50,7 @@ fn test_last_insert_rowid_basic() -> anyhow::Result<()> {
|
|||
Ok(Some(ref mut rows)) => loop {
|
||||
match rows.step()? {
|
||||
StepResult::IO => {
|
||||
tmp_db.io.run_once()?;
|
||||
rows.run_once()?;
|
||||
}
|
||||
StepResult::Done => break,
|
||||
_ => unreachable!(),
|
||||
|
@ -72,7 +72,7 @@ fn test_last_insert_rowid_basic() -> anyhow::Result<()> {
|
|||
}
|
||||
}
|
||||
StepResult::IO => {
|
||||
tmp_db.io.run_once()?;
|
||||
rows.run_once()?;
|
||||
}
|
||||
StepResult::Interrupt => break,
|
||||
StepResult::Done => break,
|
||||
|
@ -101,7 +101,7 @@ fn test_integer_primary_key() -> anyhow::Result<()> {
|
|||
let mut insert_query = conn.query(query)?.unwrap();
|
||||
loop {
|
||||
match insert_query.step()? {
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => insert_query.run_once()?,
|
||||
StepResult::Done => break,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
|
@ -117,7 +117,7 @@ fn test_integer_primary_key() -> anyhow::Result<()> {
|
|||
rowids.push(*id);
|
||||
}
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => select_query.run_once()?,
|
||||
StepResult::Interrupt | StepResult::Done => break,
|
||||
StepResult::Busy => panic!("Database is busy"),
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ fn test_statement_reset_bind() -> anyhow::Result<()> {
|
|||
turso_core::Value::Integer(1)
|
||||
);
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => stmt.run_once()?,
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ fn test_statement_reset_bind() -> anyhow::Result<()> {
|
|||
turso_core::Value::Integer(2)
|
||||
);
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => stmt.run_once()?,
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
|
@ -88,7 +88,7 @@ fn test_statement_bind() -> anyhow::Result<()> {
|
|||
}
|
||||
}
|
||||
StepResult::IO => {
|
||||
tmp_db.io.run_once()?;
|
||||
stmt.run_once()?;
|
||||
}
|
||||
StepResult::Interrupt => break,
|
||||
StepResult::Done => break,
|
||||
|
@ -125,7 +125,7 @@ fn test_insert_parameter_remap() -> anyhow::Result<()> {
|
|||
}
|
||||
loop {
|
||||
match ins.step()? {
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => ins.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
_ => {}
|
||||
|
@ -150,7 +150,7 @@ fn test_insert_parameter_remap() -> anyhow::Result<()> {
|
|||
// D = 22
|
||||
assert_eq!(row.get::<&Value>(3).unwrap(), &Value::Integer(22));
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => sel.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
}
|
||||
|
@ -196,7 +196,7 @@ fn test_insert_parameter_remap_all_params() -> anyhow::Result<()> {
|
|||
// execute the insert (no rows returned)
|
||||
loop {
|
||||
match ins.step()? {
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => ins.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
_ => {}
|
||||
|
@ -222,7 +222,7 @@ fn test_insert_parameter_remap_all_params() -> anyhow::Result<()> {
|
|||
// D = 999
|
||||
assert_eq!(row.get::<&Value>(3).unwrap(), &Value::Integer(999));
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => sel.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
}
|
||||
|
@ -264,7 +264,7 @@ fn test_insert_parameter_multiple_remap_backwards() -> anyhow::Result<()> {
|
|||
// execute the insert (no rows returned)
|
||||
loop {
|
||||
match ins.step()? {
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => ins.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
_ => {}
|
||||
|
@ -290,7 +290,7 @@ fn test_insert_parameter_multiple_remap_backwards() -> anyhow::Result<()> {
|
|||
// D = 999
|
||||
assert_eq!(row.get::<&Value>(3).unwrap(), &Value::Integer(444));
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => sel.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
}
|
||||
|
@ -331,7 +331,7 @@ fn test_insert_parameter_multiple_no_remap() -> anyhow::Result<()> {
|
|||
// execute the insert (no rows returned)
|
||||
loop {
|
||||
match ins.step()? {
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => ins.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
_ => {}
|
||||
|
@ -357,7 +357,7 @@ fn test_insert_parameter_multiple_no_remap() -> anyhow::Result<()> {
|
|||
// D = 999
|
||||
assert_eq!(row.get::<&Value>(3).unwrap(), &Value::Integer(444));
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => sel.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
}
|
||||
|
@ -402,7 +402,7 @@ fn test_insert_parameter_multiple_row() -> anyhow::Result<()> {
|
|||
// execute the insert (no rows returned)
|
||||
loop {
|
||||
match ins.step()? {
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => ins.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
_ => {}
|
||||
|
@ -434,7 +434,7 @@ fn test_insert_parameter_multiple_row() -> anyhow::Result<()> {
|
|||
);
|
||||
i += 1;
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => sel.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
}
|
||||
|
@ -450,7 +450,7 @@ fn test_bind_parameters_update_query() -> anyhow::Result<()> {
|
|||
let mut ins = conn.prepare("insert into test (a, b) values (3, 'test1');")?;
|
||||
loop {
|
||||
match ins.step()? {
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => ins.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
_ => {}
|
||||
|
@ -461,7 +461,7 @@ fn test_bind_parameters_update_query() -> anyhow::Result<()> {
|
|||
ins.bind_at(2.try_into()?, Value::build_text("test1"));
|
||||
loop {
|
||||
match ins.step()? {
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => ins.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
_ => {}
|
||||
|
@ -476,7 +476,7 @@ fn test_bind_parameters_update_query() -> anyhow::Result<()> {
|
|||
assert_eq!(row.get::<&Value>(0).unwrap(), &Value::Integer(222));
|
||||
assert_eq!(row.get::<&Value>(1).unwrap(), &Value::build_text("test1"),);
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => sel.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
}
|
||||
|
@ -495,7 +495,7 @@ fn test_bind_parameters_update_query_multiple_where() -> anyhow::Result<()> {
|
|||
let mut ins = conn.prepare("insert into test (a, b, c, d) values (3, 'test1', 4, 5);")?;
|
||||
loop {
|
||||
match ins.step()? {
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => ins.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
_ => {}
|
||||
|
@ -507,7 +507,7 @@ fn test_bind_parameters_update_query_multiple_where() -> anyhow::Result<()> {
|
|||
ins.bind_at(3.try_into()?, Value::Integer(5));
|
||||
loop {
|
||||
match ins.step()? {
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => ins.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
_ => {}
|
||||
|
@ -524,7 +524,7 @@ fn test_bind_parameters_update_query_multiple_where() -> anyhow::Result<()> {
|
|||
assert_eq!(row.get::<&Value>(2).unwrap(), &Value::Integer(4));
|
||||
assert_eq!(row.get::<&Value>(3).unwrap(), &Value::Integer(5));
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => sel.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
}
|
||||
|
@ -543,7 +543,7 @@ fn test_bind_parameters_update_rowid_alias() -> anyhow::Result<()> {
|
|||
let mut ins = conn.prepare("insert into test (id, name) values (1, 'test');")?;
|
||||
loop {
|
||||
match ins.step()? {
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => ins.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
_ => {}
|
||||
|
@ -558,7 +558,7 @@ fn test_bind_parameters_update_rowid_alias() -> anyhow::Result<()> {
|
|||
assert_eq!(row.get::<&Value>(0).unwrap(), &Value::Integer(1));
|
||||
assert_eq!(row.get::<&Value>(1).unwrap(), &Value::build_text("test"),);
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => sel.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
}
|
||||
|
@ -568,7 +568,7 @@ fn test_bind_parameters_update_rowid_alias() -> anyhow::Result<()> {
|
|||
ins.bind_at(2.try_into()?, Value::Integer(1));
|
||||
loop {
|
||||
match ins.step()? {
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => ins.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
_ => {}
|
||||
|
@ -583,7 +583,7 @@ fn test_bind_parameters_update_rowid_alias() -> anyhow::Result<()> {
|
|||
assert_eq!(row.get::<&Value>(0).unwrap(), &Value::Integer(1));
|
||||
assert_eq!(row.get::<&Value>(1).unwrap(), &Value::build_text("updated"),);
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => sel.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
}
|
||||
|
@ -618,7 +618,7 @@ fn test_bind_parameters_update_rowid_alias_seek_rowid() -> anyhow::Result<()> {
|
|||
&Value::Integer(if i == 0 { 4 } else { 11 })
|
||||
);
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => sel.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
}
|
||||
|
@ -631,7 +631,7 @@ fn test_bind_parameters_update_rowid_alias_seek_rowid() -> anyhow::Result<()> {
|
|||
ins.bind_at(4.try_into()?, Value::Integer(5));
|
||||
loop {
|
||||
match ins.step()? {
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => ins.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
_ => {}
|
||||
|
@ -649,7 +649,7 @@ fn test_bind_parameters_update_rowid_alias_seek_rowid() -> anyhow::Result<()> {
|
|||
&Value::build_text(if i == 0 { "updated" } else { "test" }),
|
||||
);
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => sel.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
}
|
||||
|
@ -678,7 +678,7 @@ fn test_bind_parameters_delete_rowid_alias_seek_out_of_order() -> anyhow::Result
|
|||
ins.bind_at(4.try_into()?, Value::build_text("test"));
|
||||
loop {
|
||||
match ins.step()? {
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => ins.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
_ => {}
|
||||
|
@ -693,7 +693,7 @@ fn test_bind_parameters_delete_rowid_alias_seek_out_of_order() -> anyhow::Result
|
|||
let row = sel.row().unwrap();
|
||||
assert_eq!(row.get::<&Value>(0).unwrap(), &Value::build_text("correct"),);
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => sel.run_once()?,
|
||||
StepResult::Done | StepResult::Interrupt => break,
|
||||
StepResult::Busy => panic!("database busy"),
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ fn test_simple_overflow_page() -> anyhow::Result<()> {
|
|||
Ok(Some(ref mut rows)) => loop {
|
||||
match rows.step()? {
|
||||
StepResult::IO => {
|
||||
tmp_db.io.run_once()?;
|
||||
rows.run_once()?;
|
||||
}
|
||||
StepResult::Done => break,
|
||||
_ => unreachable!(),
|
||||
|
@ -68,7 +68,7 @@ fn test_simple_overflow_page() -> anyhow::Result<()> {
|
|||
compare_string(&huge_text, text);
|
||||
}
|
||||
StepResult::IO => {
|
||||
tmp_db.io.run_once()?;
|
||||
rows.run_once()?;
|
||||
}
|
||||
StepResult::Interrupt => break,
|
||||
StepResult::Done => break,
|
||||
|
@ -110,7 +110,7 @@ fn test_sequential_overflow_page() -> anyhow::Result<()> {
|
|||
Ok(Some(ref mut rows)) => loop {
|
||||
match rows.step()? {
|
||||
StepResult::IO => {
|
||||
tmp_db.io.run_once()?;
|
||||
rows.run_once()?;
|
||||
}
|
||||
StepResult::Done => break,
|
||||
_ => unreachable!(),
|
||||
|
@ -138,7 +138,7 @@ fn test_sequential_overflow_page() -> anyhow::Result<()> {
|
|||
current_index += 1;
|
||||
}
|
||||
StepResult::IO => {
|
||||
tmp_db.io.run_once()?;
|
||||
rows.run_once()?;
|
||||
}
|
||||
StepResult::Interrupt => break,
|
||||
StepResult::Done => break,
|
||||
|
@ -247,7 +247,7 @@ fn test_statement_reset() -> anyhow::Result<()> {
|
|||
);
|
||||
break;
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => stmt.run_once()?,
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
|
@ -264,7 +264,7 @@ fn test_statement_reset() -> anyhow::Result<()> {
|
|||
);
|
||||
break;
|
||||
}
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::IO => stmt.run_once()?,
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
|
@ -748,7 +748,7 @@ fn run_query_on_row(
|
|||
}
|
||||
|
||||
fn run_query_core(
|
||||
tmp_db: &TempDatabase,
|
||||
_tmp_db: &TempDatabase,
|
||||
conn: &Arc<Connection>,
|
||||
query: &str,
|
||||
mut on_row: Option<impl FnMut(&Row)>,
|
||||
|
@ -757,7 +757,7 @@ fn run_query_core(
|
|||
Ok(Some(ref mut rows)) => loop {
|
||||
match rows.step()? {
|
||||
StepResult::IO => {
|
||||
tmp_db.io.run_once()?;
|
||||
rows.run_once()?;
|
||||
}
|
||||
StepResult::Done => break,
|
||||
StepResult::Row => {
|
||||
|
|
|
@ -13,7 +13,7 @@ fn test_wal_checkpoint_result() -> Result<()> {
|
|||
let conn = tmp_db.connect_limbo();
|
||||
conn.execute("CREATE TABLE t1 (id text);")?;
|
||||
|
||||
let res = execute_and_get_strings(&tmp_db, &conn, "pragma journal_mode;")?;
|
||||
let res = execute_and_get_strings(&conn, "pragma journal_mode;")?;
|
||||
assert_eq!(res, vec!["wal"]);
|
||||
|
||||
conn.execute("insert into t1(id) values (1), (2);")?;
|
||||
|
@ -22,7 +22,7 @@ fn test_wal_checkpoint_result() -> Result<()> {
|
|||
do_flush(&conn, &tmp_db).unwrap();
|
||||
|
||||
// checkpoint result should return > 0 num pages now as database has data
|
||||
let res = execute_and_get_ints(&tmp_db, &conn, "pragma wal_checkpoint;")?;
|
||||
let res = execute_and_get_ints(&conn, "pragma wal_checkpoint;")?;
|
||||
println!("'pragma wal_checkpoint;' returns: {res:?}");
|
||||
assert_eq!(res.len(), 3);
|
||||
assert_eq!(res[0], 0); // checkpoint successfully
|
||||
|
@ -46,7 +46,7 @@ fn test_wal_1_writer_1_reader() -> Result<()> {
|
|||
match rows.step().unwrap() {
|
||||
StepResult::Row => {}
|
||||
StepResult::IO => {
|
||||
tmp_db.lock().unwrap().io.run_once().unwrap();
|
||||
rows.run_once().unwrap();
|
||||
}
|
||||
StepResult::Interrupt => break,
|
||||
StepResult::Done => break,
|
||||
|
@ -86,7 +86,7 @@ fn test_wal_1_writer_1_reader() -> Result<()> {
|
|||
i += 1;
|
||||
}
|
||||
StepResult::IO => {
|
||||
tmp_db.lock().unwrap().io.run_once().unwrap();
|
||||
rows.run_once().unwrap();
|
||||
}
|
||||
StepResult::Interrupt => break,
|
||||
StepResult::Done => break,
|
||||
|
@ -110,11 +110,7 @@ fn test_wal_1_writer_1_reader() -> Result<()> {
|
|||
}
|
||||
|
||||
/// Execute a statement and get strings result
|
||||
pub(crate) fn execute_and_get_strings(
|
||||
tmp_db: &TempDatabase,
|
||||
conn: &Arc<Connection>,
|
||||
sql: &str,
|
||||
) -> Result<Vec<String>> {
|
||||
pub(crate) fn execute_and_get_strings(conn: &Arc<Connection>, sql: &str) -> Result<Vec<String>> {
|
||||
let statement = conn.prepare(sql)?;
|
||||
let stmt = Rc::new(RefCell::new(statement));
|
||||
let mut result = Vec::new();
|
||||
|
@ -130,19 +126,15 @@ pub(crate) fn execute_and_get_strings(
|
|||
}
|
||||
StepResult::Done => break,
|
||||
StepResult::Interrupt => break,
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::Busy => tmp_db.io.run_once()?,
|
||||
StepResult::IO => stmt.run_once()?,
|
||||
StepResult::Busy => stmt.run_once()?,
|
||||
}
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Execute a statement and get integers
|
||||
pub(crate) fn execute_and_get_ints(
|
||||
tmp_db: &TempDatabase,
|
||||
conn: &Arc<Connection>,
|
||||
sql: &str,
|
||||
) -> Result<Vec<i64>> {
|
||||
pub(crate) fn execute_and_get_ints(conn: &Arc<Connection>, sql: &str) -> Result<Vec<i64>> {
|
||||
let statement = conn.prepare(sql)?;
|
||||
let stmt = Rc::new(RefCell::new(statement));
|
||||
let mut result = Vec::new();
|
||||
|
@ -166,8 +158,8 @@ pub(crate) fn execute_and_get_ints(
|
|||
}
|
||||
StepResult::Done => break,
|
||||
StepResult::Interrupt => break,
|
||||
StepResult::IO => tmp_db.io.run_once()?,
|
||||
StepResult::Busy => tmp_db.io.run_once()?,
|
||||
StepResult::IO => stmt.run_once()?,
|
||||
StepResult::Busy => stmt.run_once()?,
|
||||
}
|
||||
}
|
||||
Ok(result)
|
||||
|
|