limbo/core/statement.rs

522 lines
17 KiB
Rust

use std::{
borrow::Cow,
num::NonZero,
ops::Deref,
sync::{atomic::Ordering, Arc},
task::Waker,
time::Duration,
};
use tracing::{instrument, Level};
use turso_parser::{
ast::{fmt::ToTokens, Cmd},
parser::Parser,
};
use crate::{
parameters,
schema::Trigger,
stats::refresh_analyze_stats,
translate::{self, display::PlanContext, emitter::TransactionMode},
vdbe::{
self,
explain::{EXPLAIN_COLUMNS_TYPE, EXPLAIN_QUERY_PLAN_COLUMNS_TYPE},
},
Instant, LimboError, MvStore, Pager, QueryMode, Result, Value, EXPLAIN_COLUMNS,
EXPLAIN_QUERY_PLAN_COLUMNS,
};
type ProgramExecutionState = vdbe::ProgramExecutionState;
type Row = vdbe::Row;
type StepResult = vdbe::StepResult;
#[derive(Debug)]
struct BusyTimeout {
/// Busy timeout instant
timeout: Instant,
/// Next iteration index for DELAYS
iteration: usize,
}
impl BusyTimeout {
const DELAYS: [std::time::Duration; 12] = [
Duration::from_millis(1),
Duration::from_millis(2),
Duration::from_millis(5),
Duration::from_millis(10),
Duration::from_millis(15),
Duration::from_millis(20),
Duration::from_millis(25),
Duration::from_millis(25),
Duration::from_millis(25),
Duration::from_millis(50),
Duration::from_millis(50),
Duration::from_millis(100),
];
const TOTALS: [std::time::Duration; 12] = [
Duration::from_millis(0),
Duration::from_millis(1),
Duration::from_millis(3),
Duration::from_millis(8),
Duration::from_millis(18),
Duration::from_millis(33),
Duration::from_millis(53),
Duration::from_millis(78),
Duration::from_millis(103),
Duration::from_millis(128),
Duration::from_millis(178),
Duration::from_millis(228),
];
pub fn new(now: Instant) -> Self {
Self {
timeout: now,
iteration: 0,
}
}
// implementation of sqliteDefaultBusyCallback
pub fn busy_callback(&mut self, now: Instant, max_duration: Duration) {
let idx = self.iteration.min(11);
let mut delay = Self::DELAYS[idx];
let mut prior = Self::TOTALS[idx];
if self.iteration >= 12 {
prior += delay * (self.iteration as u32 - 11);
}
if prior + delay > max_duration {
delay = max_duration.saturating_sub(prior);
// no more waiting after this
if delay.is_zero() {
return;
}
}
self.iteration = self.iteration.saturating_add(1);
self.timeout = now + delay;
}
}
pub struct Statement {
pub(crate) program: vdbe::Program,
state: vdbe::ProgramState,
pager: Arc<Pager>,
/// Whether the statement accesses the database.
/// Used to determine whether we need to check for schema changes when
/// starting a transaction.
accesses_db: bool,
/// indicates if the statement is a NORMAL/EXPLAIN/EXPLAIN QUERY PLAN
query_mode: QueryMode,
/// Flag to show if the statement was busy
busy: bool,
/// Busy timeout instant
/// We need Option here because `io.now()` is not a cheap call
busy_timeout: Option<BusyTimeout>,
}
impl std::fmt::Debug for Statement {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Statement").finish()
}
}
impl Drop for Statement {
fn drop(&mut self) {
self.reset();
}
}
impl Statement {
pub fn new(program: vdbe::Program, pager: Arc<Pager>, query_mode: QueryMode) -> Self {
let accesses_db = program.accesses_db;
let (max_registers, cursor_count) = match query_mode {
QueryMode::Normal => (program.max_registers, program.cursor_ref.len()),
QueryMode::Explain => (EXPLAIN_COLUMNS.len(), 0),
QueryMode::ExplainQueryPlan => (EXPLAIN_QUERY_PLAN_COLUMNS.len(), 0),
};
let state = vdbe::ProgramState::new(max_registers, cursor_count);
Self {
program,
state,
pager,
accesses_db,
query_mode,
busy: false,
busy_timeout: None,
}
}
pub fn get_trigger(&self) -> Option<Arc<Trigger>> {
self.program.trigger.clone()
}
pub fn get_query_mode(&self) -> QueryMode {
self.query_mode
}
pub fn n_change(&self) -> i64 {
self.program
.n_change
.load(std::sync::atomic::Ordering::SeqCst)
}
pub fn set_mv_tx(&mut self, mv_tx: Option<(u64, TransactionMode)>) {
*self.program.connection.mv_tx.write() = mv_tx;
}
pub fn interrupt(&mut self) {
self.state.interrupt();
}
pub fn execution_state(&self) -> ProgramExecutionState {
self.state.execution_state
}
pub fn mv_store(&self) -> impl Deref<Target = Option<Arc<MvStore>>> {
self.program.connection.mv_store()
}
fn _step(&mut self, waker: Option<&Waker>) -> Result<StepResult> {
if let Some(busy_timeout) = self.busy_timeout.as_ref() {
if self.pager.io.now() < busy_timeout.timeout {
// Yield the query as the timeout has not been reached yet
if let Some(waker) = waker {
waker.wake_by_ref();
}
return Ok(StepResult::IO);
}
}
let mut res = if !self.accesses_db {
self.program
.step(&mut self.state, self.pager.clone(), self.query_mode, waker)
} else {
const MAX_SCHEMA_RETRY: usize = 50;
let mut res =
self.program
.step(&mut self.state, self.pager.clone(), self.query_mode, waker);
for attempt in 0..MAX_SCHEMA_RETRY {
// Only reprepare if we still need to update schema
if !matches!(res, Err(LimboError::SchemaUpdated)) {
break;
}
tracing::debug!("reprepare: attempt={}", attempt);
self.reprepare()?;
res =
self.program
.step(&mut self.state, self.pager.clone(), self.query_mode, waker);
}
res
};
// Aggregate metrics when statement completes
if matches!(res, Ok(StepResult::Done)) {
let mut conn_metrics = self.program.connection.metrics.write();
conn_metrics.record_statement(self.state.metrics.clone());
self.busy = false;
drop(conn_metrics);
// After ANALYZE completes, refresh in-memory stats so planners can use them.
let sql = self.program.sql.trim_start();
if sql.to_ascii_uppercase().starts_with("ANALYZE") {
refresh_analyze_stats(&self.program.connection);
}
} else {
self.busy = true;
}
if matches!(res, Ok(StepResult::Busy)) {
let now = self.pager.io.now();
let max_duration = *self.program.connection.busy_timeout.read();
self.busy_timeout = match self.busy_timeout.take() {
None => {
let mut result = BusyTimeout::new(now);
result.busy_callback(now, max_duration);
Some(result)
}
Some(mut bt) => {
bt.busy_callback(now, max_duration);
Some(bt)
}
};
if now < self.busy_timeout.as_ref().unwrap().timeout {
if let Some(waker) = waker {
waker.wake_by_ref();
}
res = Ok(StepResult::IO);
}
}
res
}
pub fn step(&mut self) -> Result<StepResult> {
self._step(None)
}
pub fn step_with_waker(&mut self, waker: &Waker) -> Result<StepResult> {
self._step(Some(waker))
}
pub fn run_ignore_rows(&mut self) -> Result<()> {
loop {
match self.step()? {
vdbe::StepResult::Done => return Ok(()),
vdbe::StepResult::IO => self.pager.io.step()?,
vdbe::StepResult::Row => continue,
vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => {
return Err(LimboError::Busy)
}
}
}
}
pub fn run_collect_rows(&mut self) -> Result<Vec<Vec<Value>>> {
let mut values = Vec::new();
loop {
match self.step()? {
vdbe::StepResult::Done => return Ok(values),
vdbe::StepResult::IO => self.pager.io.step()?,
vdbe::StepResult::Row => {
values.push(self.row().unwrap().get_values().cloned().collect());
continue;
}
vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => {
return Err(LimboError::Busy)
}
}
}
}
/// Blocks execution, advances IO, and runs to completion of the statement
pub fn run_with_row_callback(
&mut self,
mut func: impl FnMut(&Row) -> Result<()>,
) -> Result<()> {
loop {
match self.step()? {
vdbe::StepResult::Done => break,
vdbe::StepResult::IO => self.pager.io.step()?,
vdbe::StepResult::Row => {
func(self.row().expect("row should be present"))?;
}
vdbe::StepResult::Interrupt => return Err(LimboError::Interrupt),
vdbe::StepResult::Busy => return Err(LimboError::Busy),
}
}
Ok(())
}
/// Blocks execution, advances IO, and stops at any StepResult except IO
/// You can optionally pass a handler to run after IO is advanced
pub fn run_one_step_blocking(
&mut self,
mut pre_io_func: impl FnMut() -> Result<()>,
mut post_io_func: impl FnMut() -> Result<()>,
) -> Result<Option<&Row>> {
let result = loop {
match self.step()? {
vdbe::StepResult::Done => break None,
vdbe::StepResult::IO => {
pre_io_func()?;
self.pager.io.step()?;
post_io_func()?;
}
vdbe::StepResult::Row => break Some(self.row().expect("row should be present")),
vdbe::StepResult::Interrupt => return Err(LimboError::Interrupt),
vdbe::StepResult::Busy => return Err(LimboError::Busy),
}
};
Ok(result)
}
#[instrument(skip_all, level = Level::DEBUG)]
fn reprepare(&mut self) -> Result<()> {
tracing::trace!("repreparing statement");
let conn = self.program.connection.clone();
*conn.schema.write() = conn.db.clone_schema();
self.program = {
let mut parser = Parser::new(self.program.sql.as_bytes());
let cmd = parser.next_cmd()?;
let cmd = cmd.expect("Same SQL string should be able to be parsed");
let syms = conn.syms.read();
let mode = self.query_mode;
debug_assert_eq!(QueryMode::new(&cmd), mode,);
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
translate::translate(
conn.schema.read().deref(),
stmt,
self.pager.clone(),
conn.clone(),
&syms,
mode,
&self.program.sql,
)?
};
// Save parameters before they are reset
let parameters = std::mem::take(&mut self.state.parameters);
let (max_registers, cursor_count) = match self.query_mode {
QueryMode::Normal => (self.program.max_registers, self.program.cursor_ref.len()),
QueryMode::Explain => (EXPLAIN_COLUMNS.len(), 0),
QueryMode::ExplainQueryPlan => (EXPLAIN_QUERY_PLAN_COLUMNS.len(), 0),
};
self.reset_internal(Some(max_registers), Some(cursor_count));
// Load the parameters back into the state
self.state.parameters = parameters;
Ok(())
}
pub fn num_columns(&self) -> usize {
match self.query_mode {
QueryMode::Normal => self.program.result_columns.len(),
QueryMode::Explain => EXPLAIN_COLUMNS.len(),
QueryMode::ExplainQueryPlan => EXPLAIN_QUERY_PLAN_COLUMNS.len(),
}
}
pub fn get_column_name(&self, idx: usize) -> Cow<'_, str> {
if self.query_mode == QueryMode::Explain {
return Cow::Owned(EXPLAIN_COLUMNS.get(idx).expect("No column").to_string());
}
if self.query_mode == QueryMode::ExplainQueryPlan {
return Cow::Owned(
EXPLAIN_QUERY_PLAN_COLUMNS
.get(idx)
.expect("No column")
.to_string(),
);
}
match self.query_mode {
QueryMode::Normal => {
let column = &self.program.result_columns.get(idx).expect("No column");
match column.name(&self.program.table_references) {
Some(name) => Cow::Borrowed(name),
None => {
let tables = [&self.program.table_references];
let ctx = PlanContext(&tables);
Cow::Owned(column.expr.displayer(&ctx).to_string())
}
}
}
QueryMode::Explain => Cow::Borrowed(EXPLAIN_COLUMNS[idx]),
QueryMode::ExplainQueryPlan => Cow::Borrowed(EXPLAIN_QUERY_PLAN_COLUMNS[idx]),
}
}
pub fn get_column_table_name(&self, idx: usize) -> Option<Cow<'_, str>> {
if self.query_mode == QueryMode::Explain || self.query_mode == QueryMode::ExplainQueryPlan {
return None;
}
let column = &self.program.result_columns.get(idx).expect("No column");
match &column.expr {
turso_parser::ast::Expr::Column { table, .. } => self
.program
.table_references
.find_table_by_internal_id(*table)
.map(|(_, table_ref)| Cow::Borrowed(table_ref.get_name())),
_ => None,
}
}
pub fn get_column_type(&self, idx: usize) -> Option<String> {
if self.query_mode == QueryMode::Explain {
return Some(
EXPLAIN_COLUMNS_TYPE
.get(idx)
.expect("No column")
.to_string(),
);
}
if self.query_mode == QueryMode::ExplainQueryPlan {
return Some(
EXPLAIN_QUERY_PLAN_COLUMNS_TYPE
.get(idx)
.expect("No column")
.to_string(),
);
}
let column = &self.program.result_columns.get(idx).expect("No column");
match &column.expr {
turso_parser::ast::Expr::Column {
table,
column: column_idx,
..
} => {
let (_, table_ref) = self
.program
.table_references
.find_table_by_internal_id(*table)?;
let table_column = table_ref.get_column_at(*column_idx)?;
match &table_column.ty() {
crate::schema::Type::Integer => Some("INTEGER".to_string()),
crate::schema::Type::Real => Some("REAL".to_string()),
crate::schema::Type::Text => Some("TEXT".to_string()),
crate::schema::Type::Blob => Some("BLOB".to_string()),
crate::schema::Type::Numeric => Some("NUMERIC".to_string()),
crate::schema::Type::Null => None,
}
}
_ => None,
}
}
pub fn parameters(&self) -> &parameters::Parameters {
&self.program.parameters
}
pub fn parameters_count(&self) -> usize {
self.program.parameters.count()
}
pub fn parameter_index(&self, name: &str) -> Option<NonZero<usize>> {
self.program.parameters.index(name)
}
pub fn bind_at(&mut self, index: NonZero<usize>, value: Value) {
self.state.bind_at(index, value);
}
pub fn clear_bindings(&mut self) {
self.state.clear_bindings();
}
pub fn reset(&mut self) {
self.reset_internal(None, None);
}
fn reset_internal(&mut self, max_registers: Option<usize>, max_cursors: Option<usize>) {
// as abort uses auto_txn_cleanup value - it needs to be called before state.reset
self.program.abort(&self.pager, None, &mut self.state);
self.state.reset(max_registers, max_cursors);
self.program.n_change.store(0, Ordering::SeqCst);
self.busy = false;
self.busy_timeout = None;
}
pub fn row(&self) -> Option<&Row> {
self.state.result_row.as_ref()
}
pub fn get_sql(&self) -> &str {
&self.program.sql
}
pub fn is_busy(&self) -> bool {
self.busy
}
/// Internal method to get IO from a statement.
/// Used by select internal crate
///
/// Avoid using this method for advancing IO while iteration over `step`.
/// Prefer to use helper methods instead such as [Self::run_with_row_callback]
pub fn _io(&self) -> &dyn crate::IO {
self.pager.io.as_ref()
}
}