Merge 'feature: INSERT INTO <table> SELECT' from Pedro Muniz

Closes #1528 .
- Modified `translate_select` so that the caller can define if the
statement is top-level statement or a subquery.
- Refactored `translate_insert` to offload the translation of multi-row
VALUES and SELECT statements to `translate_select`
- I did not try to change much of `populate_column_registers` as I did
not want to break `translate_virtual_table_insert`. Ideally, I would
want to unite this remaining logic folding `populate_column_registers`
into `populate_columns_multiple_rows` and the
`translate_virtual_table_insert` into `translate_insert`. But, I think
this may be best suited for a separate PR.
## TODO
- ~Tests~ - *Done*
- ~Need to emit a temp table when we are selecting and inserting into
the Same Table -
https://github.com/sqlite/sqlite/blob/master/src/insert.c#L1369~ -
*Done*
- Optimization when table have the exact same schema - open an Issue
about it
- Virtual Tables do not benefit yet from this feature - open an Issue
about it

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #1566
This commit is contained in:
Jussi Saurio 2025-05-27 10:50:26 +03:00
commit b72b99c973
10 changed files with 628 additions and 174 deletions

View file

@ -421,6 +421,7 @@ impl Connection {
&syms,
None,
&mut table_ref_counter,
translate::plan::QueryDestination::ResultRows,
)?;
optimize_plan(
&mut plan,

View file

@ -261,6 +261,16 @@ fn emit_program_for_compound_select(
first_t_ctx.limit_ctx = limit_ctx;
}
let mut registers_subqery = None;
let yield_reg = match first.query_destination {
QueryDestination::CoroutineYield { yield_reg, .. } => {
registers_subqery = Some(program.alloc_registers(first.result_columns.len()));
first_t_ctx.reg_result_cols_start = registers_subqery.clone();
Some(yield_reg)
}
_ => None,
};
let mut union_dedupe_index = if requires_union_deduplication {
let dedupe_index = get_union_dedupe_index(program, &first);
first.query_destination = QueryDestination::EphemeralIndex {
@ -320,8 +330,16 @@ fn emit_program_for_compound_select(
dedupe_index.as_ref(),
limit_ctx,
label_next_select,
yield_reg.clone(),
);
}
if matches!(
select.query_destination,
crate::translate::plan::QueryDestination::CoroutineYield { .. }
) {
// Need to reuse the same registers when you are yielding
t_ctx.reg_result_cols_start = registers_subqery.clone();
}
emit_query(program, &mut select, &mut t_ctx)?;
program.preassign_label_to_next_insn(label_next_select);
}
@ -334,6 +352,7 @@ fn emit_program_for_compound_select(
dedupe_index.as_ref(),
limit_ctx,
label_jump_over_dedupe,
yield_reg,
);
program.preassign_label_to_next_insn(label_jump_over_dedupe);
}
@ -390,6 +409,7 @@ fn read_deduplicated_union_rows(
dedupe_index: &Index,
limit_ctx: Option<LimitCtx>,
label_limit_reached: BranchOffset,
yield_reg: Option<usize>,
) {
let label_dedupe_next = program.allocate_label();
let label_dedupe_loop_start = program.allocate_label();
@ -400,16 +420,30 @@ fn read_deduplicated_union_rows(
});
program.preassign_label_to_next_insn(label_dedupe_loop_start);
for col_idx in 0..dedupe_index.columns.len() {
let start_reg = if let Some(yield_reg) = yield_reg {
// Need to reuse the yield_reg for the column being emitted
yield_reg + 1
} else {
dedupe_cols_start_reg
};
program.emit_insn(Insn::Column {
cursor_id: dedupe_cursor_id,
column: col_idx,
dest: dedupe_cols_start_reg + col_idx,
dest: start_reg + col_idx,
});
}
program.emit_insn(Insn::ResultRow {
start_reg: dedupe_cols_start_reg,
count: dedupe_index.columns.len(),
});
if let Some(yield_reg) = yield_reg {
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: BranchOffset::Offset(0),
});
} else {
program.emit_insn(Insn::ResultRow {
start_reg: dedupe_cols_start_reg,
count: dedupe_index.columns.len(),
});
}
if let Some(limit_ctx) = limit_ctx {
program.emit_insn(Insn::DecrJumpZero {
reg: limit_ctx.reg_limit,
@ -512,7 +546,9 @@ pub fn emit_query<'a>(
}
// Allocate registers for result columns
t_ctx.reg_result_cols_start = Some(program.alloc_registers(plan.result_columns.len()));
if t_ctx.reg_result_cols_start.is_none() {
t_ctx.reg_result_cols_start = Some(program.alloc_registers(plan.result_columns.len()));
}
// Initialize cursors and other resources needed for query execution
if let Some(ref mut order_by) = plan.order_by {

View file

@ -1,4 +1,3 @@
use std::ops::Deref;
use std::rc::Rc;
use limbo_sqlite3_parser::ast::{
@ -10,6 +9,7 @@ use crate::schema::{IndexColumn, Table};
use crate::util::normalize_ident;
use crate::vdbe::builder::{ProgramBuilderOpts, QueryMode};
use crate::vdbe::insn::{IdxInsertFlags, RegisterOrLiteral};
use crate::vdbe::BranchOffset;
use crate::{
schema::{Column, Schema},
vdbe::{
@ -20,19 +20,27 @@ use crate::{
use crate::{Result, SymbolTable, VirtualTable};
use super::emitter::Resolver;
use super::expr::{translate_expr_no_constant_opt, NoConstantOptReason};
use super::expr::{translate_expr, translate_expr_no_constant_opt, NoConstantOptReason};
use super::optimizer::rewrite_expr;
use super::plan::QueryDestination;
use super::select::translate_select;
struct TempTableCtx {
cursor_id: usize,
loop_start_label: BranchOffset,
loop_end_label: BranchOffset,
}
#[allow(clippy::too_many_arguments)]
pub fn translate_insert(
query_mode: QueryMode,
schema: &Schema,
with: &Option<With>,
on_conflict: &Option<ResolveType>,
tbl_name: &QualifiedName,
columns: &Option<DistinctNames>,
body: &mut InsertBody,
_returning: &Option<Vec<ResultColumn>>,
with: Option<With>,
on_conflict: Option<ResolveType>,
tbl_name: QualifiedName,
columns: Option<DistinctNames>,
mut body: InsertBody,
_returning: Option<Vec<ResultColumn>>,
syms: &SymbolTable,
mut program: ProgramBuilder,
) -> Result<ProgramBuilder> {
@ -59,8 +67,8 @@ pub fn translate_insert(
let resolver = Resolver::new(syms);
if let Some(virtual_table) = &table.virtual_table() {
translate_virtual_table_insert(
&mut program,
program = translate_virtual_table_insert(
program,
virtual_table.clone(),
columns,
body,
@ -78,10 +86,187 @@ pub fn translate_insert(
crate::bail_parse_error!("INSERT into WITHOUT ROWID table is not supported");
}
let cursor_id = program.alloc_cursor_id(
Some(table_name.0.clone()),
CursorType::BTreeTable(btree_table.clone()),
);
let root_page = btree_table.root_page;
let mut values: Option<Vec<Expr>> = None;
let inserting_multiple_rows = match &mut body {
InsertBody::Select(select, _) => match select.body.select.as_mut() {
// TODO see how to avoid clone
OneSelect::Values(values_expr) if values_expr.len() <= 1 => {
if values_expr.is_empty() {
crate::bail_parse_error!("no values to insert");
}
let mut param_idx = 1;
for expr in values_expr.iter_mut().flat_map(|v| v.iter_mut()) {
rewrite_expr(expr, &mut param_idx)?;
}
values = values_expr.pop();
false
}
_ => true,
},
InsertBody::DefaultValues => false,
};
let halt_label = program.allocate_label();
let loop_start_label = program.allocate_label();
let mut yield_reg_opt = None;
let mut temp_table_ctx = None;
let (num_values, cursor_id) = match body {
// TODO: upsert
InsertBody::Select(select, _) => {
// Simple Common case of INSERT INTO <table> VALUES (...)
if matches!(select.body.select.as_ref(), OneSelect::Values(values) if values.len() <= 1)
{
(
values.as_ref().unwrap().len(),
program.alloc_cursor_id(
Some(table_name.0.clone()),
CursorType::BTreeTable(btree_table.clone()),
),
)
} else {
// Multiple rows - use coroutine for value population
let yield_reg = program.alloc_register();
let jump_on_definition_label = program.allocate_label();
let start_offset_label = program.allocate_label();
program.emit_insn(Insn::InitCoroutine {
yield_reg,
jump_on_definition: jump_on_definition_label,
start_offset: start_offset_label,
});
program.preassign_label_to_next_insn(start_offset_label);
let query_destination = QueryDestination::CoroutineYield {
yield_reg,
coroutine_implementation_start: halt_label,
};
program.incr_nesting();
let result = translate_select(
query_mode,
schema,
*select,
syms,
program,
query_destination,
)?;
program = result.program;
program.decr_nesting();
program.emit_insn(Insn::EndCoroutine { yield_reg });
program.preassign_label_to_next_insn(jump_on_definition_label);
// Have to allocate the cursor here to avoid having `init_loop` inside `translate_select` selecting the incorrect
// cursor_id
let cursor_id = program.alloc_cursor_id(
Some(table_name.0.clone()),
CursorType::BTreeTable(btree_table.clone()),
);
// From SQLite
/* Set useTempTable to TRUE if the result of the SELECT statement
** should be written into a temporary table (template 4). Set to
** FALSE if each output row of the SELECT can be written directly into
** the destination table (template 3).
**
** A temp table must be used if the table being updated is also one
** of the tables being read by the SELECT statement. Also use a
** temp table in the case of row triggers.
*/
if program.is_table_open(&table, schema) {
let temp_cursor_id = program.alloc_cursor_id(
Some("temp table".to_string()),
CursorType::BTreeTable(btree_table.clone()),
);
temp_table_ctx = Some(TempTableCtx {
cursor_id: temp_cursor_id,
loop_start_label: program.allocate_label(),
loop_end_label: program.allocate_label(),
});
program.emit_insn(Insn::OpenEphemeral {
cursor_id: temp_cursor_id,
is_table: true,
});
// Main loop
// FIXME: rollback is not implemented. E.g. if you insert 2 rows and one fails to unique constraint violation,
// the other row will still be inserted.
program.preassign_label_to_next_insn(loop_start_label);
let yield_label = program.allocate_label();
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: yield_label,
});
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: yield_reg + 1,
count: result.num_result_cols,
dest_reg: record_reg,
index_name: None,
});
let rowid_reg = program.alloc_register();
program.emit_insn(Insn::NewRowid {
cursor: temp_cursor_id,
rowid_reg,
prev_largest_reg: 0,
});
program.emit_insn(Insn::Insert {
cursor: temp_cursor_id,
key_reg: rowid_reg,
record_reg,
flag: 0,
table_name: "".to_string(),
});
// loop back
program.emit_insn(Insn::Goto {
target_pc: loop_start_label,
});
program.preassign_label_to_next_insn(yield_label);
program.emit_insn(Insn::OpenWrite {
cursor_id,
root_page: RegisterOrLiteral::Literal(root_page),
name: table_name.0.clone(),
});
} else {
program.emit_insn(Insn::OpenWrite {
cursor_id,
root_page: RegisterOrLiteral::Literal(root_page),
name: table_name.0.clone(),
});
// Main loop
// FIXME: rollback is not implemented. E.g. if you insert 2 rows and one fails to unique constraint violation,
// the other row will still be inserted.
program.preassign_label_to_next_insn(loop_start_label);
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: halt_label,
});
}
yield_reg_opt = Some(yield_reg);
(result.num_result_cols, cursor_id)
}
}
InsertBody::DefaultValues => (
0,
program.alloc_cursor_id(
Some(table_name.0.clone()),
CursorType::BTreeTable(btree_table.clone()),
),
),
};
// allocate cursor id's for each btree index cursor we'll need to populate the indexes
// (idx name, root_page, idx cursor id)
let idx_cursors = schema
@ -98,21 +283,8 @@ pub fn translate_insert(
)
})
.collect::<Vec<(&String, usize, usize)>>();
let root_page = btree_table.root_page;
let values = match body {
InsertBody::Select(ref mut select, _) => match select.body.select.as_mut() {
OneSelect::Values(ref mut values) => values,
_ => todo!(),
},
InsertBody::DefaultValues => &mut vec![vec![]],
};
let mut param_idx = 1;
for expr in values.iter_mut().flat_map(|v| v.iter_mut()) {
rewrite_expr(expr, &mut param_idx)?;
}
let column_mappings = resolve_columns_for_insert(&table, columns, values)?;
let index_col_mappings = resolve_indicies_for_insert(schema, table.as_ref(), &column_mappings)?;
let column_mappings = resolve_columns_for_insert(&table, &columns, num_values)?;
// Check if rowid was provided (through INTEGER PRIMARY KEY as a rowid alias)
let rowid_alias_index = btree_table.columns.iter().position(|c| c.is_rowid_alias);
let has_user_provided_rowid = {
@ -138,56 +310,24 @@ pub fn translate_insert(
};
let record_register = program.alloc_register();
let halt_label = program.allocate_label();
let loop_start_label = program.allocate_label();
let inserting_multiple_rows = values.len() > 1;
// Multiple rows - use coroutine for value population
if inserting_multiple_rows {
let yield_reg = program.alloc_register();
let jump_on_definition_label = program.allocate_label();
let start_offset_label = program.allocate_label();
program.emit_insn(Insn::InitCoroutine {
yield_reg,
jump_on_definition: jump_on_definition_label,
start_offset: start_offset_label,
});
program.preassign_label_to_next_insn(start_offset_label);
for value in values.iter() {
populate_column_registers(
&mut program,
value,
&column_mappings,
column_registers_start,
true,
rowid_reg,
&resolver,
)?;
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: halt_label,
if let Some(ref temp_table_ctx) = temp_table_ctx {
// Rewind loop to read from ephemeral table
program.emit_insn(Insn::Rewind {
cursor_id: temp_table_ctx.cursor_id,
pc_if_empty: temp_table_ctx.loop_end_label,
});
program.preassign_label_to_next_insn(temp_table_ctx.loop_start_label);
}
program.emit_insn(Insn::EndCoroutine { yield_reg });
program.preassign_label_to_next_insn(jump_on_definition_label);
program.emit_insn(Insn::OpenWrite {
cursor_id,
root_page: RegisterOrLiteral::Literal(root_page),
name: table_name.0.clone(),
});
// Main loop
// FIXME: rollback is not implemented. E.g. if you insert 2 rows and one fails to unique constraint violation,
// the other row will still be inserted.
program.resolve_label(loop_start_label, program.offset());
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: halt_label,
});
populate_columns_multiple_rows(
&mut program,
&column_mappings,
column_registers_start,
yield_reg_opt.unwrap() + 1,
&resolver,
&temp_table_ctx,
)?;
} else {
// Single row - populate registers directly
program.emit_insn(Insn::OpenWrite {
@ -198,10 +338,9 @@ pub fn translate_insert(
populate_column_registers(
&mut program,
&values[0],
&values.unwrap(),
&column_mappings,
column_registers_start,
false,
rowid_reg,
&resolver,
)?;
@ -292,6 +431,7 @@ pub fn translate_insert(
_ => (),
}
let index_col_mappings = resolve_indicies_for_insert(schema, table.as_ref(), &column_mappings)?;
for index_col_mapping in index_col_mappings {
// find which cursor we opened earlier for this index
let idx_cursor_id = idx_cursors
@ -400,10 +540,22 @@ pub fn translate_insert(
});
if inserting_multiple_rows {
// For multiple rows, loop back
program.emit_insn(Insn::Goto {
target_pc: loop_start_label,
});
if let Some(temp_table_ctx) = temp_table_ctx {
program.emit_insn(Insn::Next {
cursor_id: temp_table_ctx.cursor_id,
pc_if_next: temp_table_ctx.loop_start_label,
});
program.preassign_label_to_next_insn(temp_table_ctx.loop_end_label);
program.emit_insn(Insn::Close {
cursor_id: temp_table_ctx.cursor_id,
});
} else {
// For multiple rows which not require a temp table, loop back
program.emit_insn(Insn::Goto {
target_pc: loop_start_label,
});
}
}
program.resolve_label(halt_label, program.offset());
@ -444,17 +596,12 @@ struct ColumnMapping<'a> {
fn resolve_columns_for_insert<'a>(
table: &'a Table,
columns: &Option<DistinctNames>,
values: &[Vec<Expr>],
num_values: usize,
) -> Result<Vec<ColumnMapping<'a>>> {
if values.is_empty() {
crate::bail_parse_error!("no values to insert");
}
let table_columns = &table.columns();
let table_columns = table.columns();
// Case 1: No columns specified - map values to columns in order
if columns.is_none() {
let num_values = values[0].len();
if num_values > table_columns.len() {
if num_values != table_columns.len() {
crate::bail_parse_error!(
"table {} has {} columns but {} values were supplied",
&table.get_name(),
@ -463,13 +610,6 @@ fn resolve_columns_for_insert<'a>(
);
}
// Verify all value tuples have same length
for value in values.iter().skip(1) {
if value.len() != num_values {
crate::bail_parse_error!("all VALUES must have the same number of terms");
}
}
// Map each column to either its corresponding value index or None
return Ok(table_columns
.iter()
@ -578,6 +718,62 @@ fn resolve_indicies_for_insert(
Ok(index_col_mappings)
}
fn populate_columns_multiple_rows(
program: &mut ProgramBuilder,
column_mappings: &[ColumnMapping],
column_registers_start: usize,
yield_reg: usize,
resolver: &Resolver,
temp_table_ctx: &Option<TempTableCtx>,
) -> Result<()> {
let mut value_index_seen = 0;
let mut other_values_seen = 0;
for (i, mapping) in column_mappings.iter().enumerate() {
let target_reg = column_registers_start + i;
other_values_seen += 1;
if let Some(value_index) = mapping.value_index {
// Decrement as we have now seen a value index instead
other_values_seen -= 1;
if let Some(temp_table_ctx) = temp_table_ctx {
program.emit_insn(Insn::Column {
cursor_id: temp_table_ctx.cursor_id,
column: value_index_seen,
dest: column_registers_start + i,
});
} else {
program.emit_insn(Insn::Copy {
src_reg: yield_reg + value_index_seen,
dst_reg: column_registers_start + value_index + other_values_seen,
amount: 0,
});
}
value_index_seen += 1;
} else if mapping.column.is_rowid_alias {
program.emit_insn(Insn::SoftNull { reg: target_reg });
} else if let Some(default_expr) = mapping.default_value {
translate_expr(program, None, default_expr, target_reg, resolver)?;
} else {
// Column was not specified as has no DEFAULT - use NULL if it is nullable, otherwise error
// Rowid alias columns can be NULL because we will autogenerate a rowid in that case.
let is_nullable = !mapping.column.primary_key || mapping.column.is_rowid_alias;
if is_nullable {
program.emit_insn(Insn::Null {
dest: target_reg,
dest_end: None,
});
} else {
crate::bail_parse_error!(
"column {} is not nullable",
mapping.column.name.as_ref().expect("column name is None")
);
}
}
}
Ok(())
}
/// Populates the column registers with values for a single row
#[allow(clippy::too_many_arguments)]
fn populate_column_registers(
@ -585,7 +781,6 @@ fn populate_column_registers(
value: &[Expr],
column_mappings: &[ColumnMapping],
column_registers_start: usize,
inserting_multiple_rows: bool,
rowid_reg: usize,
resolver: &Resolver,
) -> Result<()> {
@ -595,10 +790,8 @@ fn populate_column_registers(
// Column has a value in the VALUES tuple
if let Some(value_index) = mapping.value_index {
// When inserting a single row, SQLite writes the value provided for the rowid alias column (INTEGER PRIMARY KEY)
// directly into the rowid register and writes a NULL into the rowid alias column. Not sure why this only happens
// in the single row case, but let's copy it.
let write_directly_to_rowid_reg =
mapping.column.is_rowid_alias && !inserting_multiple_rows;
// directly into the rowid register and writes a NULL into the rowid alias column.
let write_directly_to_rowid_reg = mapping.column.is_rowid_alias;
let reg = if write_directly_to_rowid_reg {
rowid_reg
} else {
@ -645,24 +838,25 @@ fn populate_column_registers(
Ok(())
}
// TODO: comeback here later to apply the same improvements on select
fn translate_virtual_table_insert(
program: &mut ProgramBuilder,
mut program: ProgramBuilder,
virtual_table: Rc<VirtualTable>,
columns: &Option<DistinctNames>,
body: &InsertBody,
on_conflict: &Option<ResolveType>,
columns: Option<DistinctNames>,
mut body: InsertBody,
on_conflict: Option<ResolveType>,
resolver: &Resolver,
) -> Result<()> {
let values = match body {
InsertBody::Select(select, None) => match &select.body.select.deref() {
OneSelect::Values(values) => values,
) -> Result<ProgramBuilder> {
let (num_values, value) = match &mut body {
InsertBody::Select(select, None) => match select.body.select.as_mut() {
OneSelect::Values(values) => (values[0].len(), values.pop().unwrap()),
_ => crate::bail_parse_error!("Virtual tables only support VALUES clause in INSERT"),
},
InsertBody::DefaultValues => &vec![],
InsertBody::DefaultValues => (0, vec![]),
_ => crate::bail_parse_error!("Unsupported INSERT body for virtual tables"),
};
let table = Table::Virtual(virtual_table.clone());
let column_mappings = resolve_columns_for_insert(&table, columns, values)?;
let column_mappings = resolve_columns_for_insert(&table, &columns, num_values)?;
let registers_start = program.alloc_registers(2);
/* *
@ -679,11 +873,10 @@ fn translate_virtual_table_insert(
let values_reg = program.alloc_registers(column_mappings.len());
populate_column_registers(
program,
&values[0],
&mut program,
&value,
&column_mappings,
values_reg,
false,
registers_start,
resolver,
)?;
@ -704,5 +897,5 @@ fn translate_virtual_table_insert(
let halt_label = program.allocate_label();
program.resolve_label(halt_label, program.offset());
Ok(())
Ok(program)
}

View file

@ -232,7 +232,17 @@ pub fn translate_inner(
ast::Stmt::Release(_) => bail_parse_error!("RELEASE not supported yet"),
ast::Stmt::Rollback { .. } => bail_parse_error!("ROLLBACK not supported yet"),
ast::Stmt::Savepoint(_) => bail_parse_error!("SAVEPOINT not supported yet"),
ast::Stmt::Select(select) => translate_select(query_mode, schema, *select, syms, program)?,
ast::Stmt::Select(select) => {
translate_select(
query_mode,
schema,
*select,
syms,
program,
plan::QueryDestination::ResultRows,
)?
.program
}
ast::Stmt::Update(mut update) => translate_update(
query_mode,
schema,
@ -248,18 +258,18 @@ pub fn translate_inner(
or_conflict,
tbl_name,
columns,
mut body,
body,
returning,
} = *insert;
translate_insert(
query_mode,
schema,
&with,
&or_conflict,
&tbl_name,
&columns,
&mut body,
&returning,
with,
or_conflict,
tbl_name,
columns,
body,
returning,
syms,
program,
)?

View file

@ -293,15 +293,21 @@ fn parse_from_clause_table<'a>(
crate::bail_parse_error!("Table {} not found", normalized_qualified_name);
}
ast::SelectTable::Select(subselect, maybe_alias) => {
let Plan::Select(mut subplan) =
prepare_select_plan(schema, *subselect, syms, Some(scope), table_ref_counter)?
else {
crate::bail_parse_error!("Only non-compound SELECT queries are currently supported in FROM clause subqueries");
};
subplan.query_destination = QueryDestination::CoroutineYield {
let query_destination = QueryDestination::CoroutineYield {
yield_reg: usize::MAX, // will be set later in bytecode emission
coroutine_implementation_start: BranchOffset::Placeholder, // will be set later in bytecode emission
};
let Plan::Select(subplan) = prepare_select_plan(
schema,
*subselect,
syms,
Some(scope),
table_ref_counter,
query_destination,
)?
else {
crate::bail_parse_error!("Only non-compound SELECT queries are currently supported in FROM clause subqueries");
};
let cur_table_index = scope.tables.len();
let identifier = maybe_alias
.map(|a| match a {
@ -449,16 +455,23 @@ pub fn parse_from<'a>(
crate::bail_parse_error!("duplicate WITH table name {}", cte.tbl_name.0);
}
// CTE can refer to other CTEs that came before it, plus any schema tables or tables in the outer scope.
let cte_plan =
prepare_select_plan(schema, *cte.select, syms, Some(&scope), table_ref_counter)?;
let Plan::Select(mut cte_plan) = cte_plan else {
crate::bail_parse_error!("Only SELECT queries are currently supported in CTEs");
};
cte_plan.query_destination = QueryDestination::CoroutineYield {
// CTE can be rewritten as a subquery.
let query_destination = QueryDestination::CoroutineYield {
yield_reg: usize::MAX, // will be set later in bytecode emission
coroutine_implementation_start: BranchOffset::Placeholder, // will be set later in bytecode emission
};
// CTE can refer to other CTEs that came before it, plus any schema tables or tables in the outer scope.
let cte_plan = prepare_select_plan(
schema,
*cte.select,
syms,
Some(&scope),
table_ref_counter,
query_destination,
)?;
let Plan::Select(cte_plan) = cte_plan else {
crate::bail_parse_error!("Only SELECT queries are currently supported in CTEs");
};
scope.ctes.push(Cte {
name: cte_name_normalized,
plan: cte_plan,

View file

@ -19,52 +19,71 @@ use crate::{schema::Schema, vdbe::builder::ProgramBuilder, Result};
use limbo_sqlite3_parser::ast::{self, CompoundSelect, SortOrder};
use limbo_sqlite3_parser::ast::{ResultColumn, SelectInner};
pub struct TranslateSelectResult {
pub program: ProgramBuilder,
pub num_result_cols: usize,
}
pub fn translate_select(
query_mode: QueryMode,
schema: &Schema,
select: ast::Select,
syms: &SymbolTable,
mut program: ProgramBuilder,
) -> Result<ProgramBuilder> {
query_destination: QueryDestination,
) -> Result<TranslateSelectResult> {
let mut select_plan = prepare_select_plan(
schema,
select,
syms,
None,
&mut program.table_reference_counter,
query_destination,
)?;
optimize_plan(&mut select_plan, schema)?;
let num_result_cols;
let opts = match &select_plan {
Plan::Select(select) => ProgramBuilderOpts {
query_mode,
num_cursors: count_plan_required_cursors(select),
approx_num_insns: estimate_num_instructions(select),
approx_num_labels: estimate_num_labels(select),
},
Plan::CompoundSelect { first, rest, .. } => ProgramBuilderOpts {
query_mode,
num_cursors: count_plan_required_cursors(first)
+ rest
.iter()
.map(|(plan, _)| count_plan_required_cursors(plan))
.sum::<usize>(),
approx_num_insns: estimate_num_instructions(first)
+ rest
.iter()
.map(|(plan, _)| estimate_num_instructions(plan))
.sum::<usize>(),
approx_num_labels: estimate_num_labels(first)
+ rest
.iter()
.map(|(plan, _)| estimate_num_labels(plan))
.sum::<usize>(),
},
Plan::Select(select) => {
num_result_cols = select.result_columns.len();
ProgramBuilderOpts {
query_mode,
num_cursors: count_plan_required_cursors(select),
approx_num_insns: estimate_num_instructions(select),
approx_num_labels: estimate_num_labels(select),
}
}
Plan::CompoundSelect { first, rest, .. } => {
// Compound Selects must return the same number of columns
num_result_cols = first.result_columns.len();
ProgramBuilderOpts {
query_mode,
num_cursors: count_plan_required_cursors(first)
+ rest
.iter()
.map(|(plan, _)| count_plan_required_cursors(plan))
.sum::<usize>(),
approx_num_insns: estimate_num_instructions(first)
+ rest
.iter()
.map(|(plan, _)| estimate_num_instructions(plan))
.sum::<usize>(),
approx_num_labels: estimate_num_labels(first)
+ rest
.iter()
.map(|(plan, _)| estimate_num_labels(plan))
.sum::<usize>(),
}
}
other => panic!("plan is not a SelectPlan: {:?}", other),
};
program.extend(&opts);
emit_program(&mut program, select_plan, syms)?;
Ok(program)
Ok(TranslateSelectResult {
program,
num_result_cols,
})
}
pub fn prepare_select_plan<'a>(
@ -73,6 +92,7 @@ pub fn prepare_select_plan<'a>(
syms: &SymbolTable,
outer_scope: Option<&'a Scope<'a>>,
table_ref_counter: &mut TableRefIdCounter,
query_destination: QueryDestination,
) -> Result<Plan> {
let compounds = select.body.compounds.take();
match compounds {
@ -87,6 +107,7 @@ pub fn prepare_select_plan<'a>(
syms,
outer_scope,
table_ref_counter,
query_destination,
)?))
}
Some(compounds) => {
@ -99,6 +120,7 @@ pub fn prepare_select_plan<'a>(
syms,
outer_scope,
table_ref_counter,
query_destination.clone(),
)?;
let mut rest = Vec::with_capacity(compounds.len());
for CompoundSelect { select, operator } in compounds {
@ -119,6 +141,7 @@ pub fn prepare_select_plan<'a>(
syms,
outer_scope,
table_ref_counter,
query_destination.clone(),
)?;
rest.push((plan, operator));
}
@ -168,6 +191,7 @@ fn prepare_one_select_plan<'a>(
syms: &SymbolTable,
outer_scope: Option<&'a Scope<'a>>,
table_ref_counter: &mut TableRefIdCounter,
query_destination: QueryDestination,
) -> Result<SelectPlan> {
match select {
ast::OneSelect::Select(select_inner) => {
@ -237,7 +261,7 @@ fn prepare_one_select_plan<'a>(
limit: None,
offset: None,
contains_constant_false_condition: false,
query_destination: QueryDestination::ResultRows,
query_destination,
distinctness: Distinctness::from_ast(distinctness.as_ref()),
values: vec![],
};
@ -551,7 +575,7 @@ fn prepare_one_select_plan<'a>(
limit: None,
offset: None,
contains_constant_false_condition: false,
query_destination: QueryDestination::ResultRows,
query_destination,
distinctness: Distinctness::NonDistinct,
values,
};

View file

@ -10,7 +10,7 @@ use limbo_sqlite3_parser::ast::{self, TableInternalId};
use crate::{
fast_lock::SpinLock,
parameters::Parameters,
schema::{BTreeTable, Index, PseudoTable},
schema::{BTreeTable, Index, PseudoTable, Schema, Table},
storage::sqlite3_ondisk::DatabaseHeader,
translate::{
collate::CollationSeq,
@ -37,7 +37,10 @@ impl TableRefIdCounter {
}
}
use super::{BranchOffset, CursorID, Insn, InsnFunction, InsnReference, JumpTarget, Program};
use super::{
insn::RegisterOrLiteral, BranchOffset, CursorID, Insn, InsnFunction, InsnReference, JumpTarget,
Program,
};
#[allow(dead_code)]
pub struct ProgramBuilder {
pub table_reference_counter: TableRefIdCounter,
@ -682,6 +685,66 @@ impl ProgramBuilder {
}
}
/// Checks whether `table` or any of its indices has been opened in the program
pub fn is_table_open(&self, table: &Table, schema: &Schema) -> bool {
let btree = table.btree();
let vtab = table.virtual_table();
for (insn, ..) in self.insns.iter() {
match insn {
Insn::OpenRead {
cursor_id,
root_page,
..
} => {
if let Some(btree) = &btree {
if btree.root_page == *root_page {
return true;
}
}
let name = self.cursor_ref[*cursor_id].0.as_ref();
if name.is_none() {
continue;
}
let name = name.unwrap();
let indices = schema.get_indices(name);
for index in indices {
if index.root_page == *root_page {
return true;
}
}
}
Insn::OpenWrite {
root_page, name, ..
} => {
let RegisterOrLiteral::Literal(root_page) = root_page else {
unreachable!("root page can only be a literal");
};
if let Some(btree) = &btree {
if btree.root_page == *root_page {
return true;
}
}
let indices = schema.get_indices(name);
for index in indices {
if index.root_page == *root_page {
return true;
}
}
}
Insn::VOpen { cursor_id, .. } => {
if let Some(vtab) = &vtab {
let name = self.cursor_ref[*cursor_id].0.as_ref().unwrap();
if vtab.name == *name {
return true;
}
}
}
_ => {}
}
}
false
}
pub fn build(
mut self,
database_header: Arc<SpinLock<DatabaseHeader>>,

View file

@ -370,7 +370,11 @@ pub fn insn_to_str(
0,
Value::build_text(""),
0,
"".to_string(),
program.cursor_ref[*cursor_id]
.0
.as_ref()
.unwrap()
.to_string(),
),
Insn::VCreate {
table_name,

View file

@ -196,3 +196,113 @@ do_execsql_test_on_specific_db {:memory:} unique_insert_with_pkey {
SELECT * FROM t2;
} {1|1
2|6}
do_execsql_test_on_specific_db {:memory:} insert_from_select_1 {
CREATE TABLE t(a);
CREATE TABLE t2(b, c);
INSERT INTO t2 values (1, 2), (10, 20);
INSERT INTO t SELECT b FROM t2;
SELECT * FROM t;
INSERT INTO t SELECT c FROM t2;
SELECT * FROM t;
} {1
10
1
10
2
20}
do_execsql_test_on_specific_db {:memory:} insert_from_select_where {
CREATE TABLE t(a);
CREATE TABLE t2(b, c);
INSERT INTO t2 values (1, 2), (10, 20);
INSERT INTO t SELECT b FROM t2 WHERE b < 10;
SELECT * FROM t;
INSERT INTO t SELECT c FROM t2 WHERE c > 2;
SELECT * FROM t;
} {1
1
20}
do_execsql_test_on_specific_db {:memory:} insert_from_select_union_all {
CREATE TABLE t1(x INTEGER);
CREATE TABLE t2(x INTEGER);
CREATE TABLE t3(x INTEGER);
INSERT INTO t1 VALUES(1),(2),(3);
INSERT INTO t2 VALUES(4),(5),(6);
INSERT INTO t3 VALUES(7),(8),(9);
INSERT INTO t1 SELECT x FROM t2 UNION ALL SELECT x FROM t3;
SELECT * FROM t1;
} {1
2
3
4
5
6
7
8
9}
do_execsql_test_on_specific_db {:memory:} insert_from_select_union_all_where {
CREATE TABLE t1(x INTEGER);
CREATE TABLE t2(x INTEGER);
CREATE TABLE t3(x INTEGER);
INSERT INTO t1 VALUES(1),(2),(3);
INSERT INTO t2 VALUES(4),(5),(6);
INSERT INTO t3 VALUES(7),(8),(9);
INSERT INTO t1 SELECT x FROM t2 WHERE x != 4 UNION ALL SELECT x FROM t3 WHERE x == 8;
SELECT * FROM t1;
} {1
2
3
5
6
8}
do_execsql_test_on_specific_db {:memory:} insert_from_select_same_table {
CREATE TABLE t(a INTEGER PRIMARY KEY, b);
INSERT INTO t(b) VALUES (1),(2),(3);
SELECT * FROM t;
INSERT INTO t(b) SELECT b FROM t;
SELECT * FROM t;
} {1|1
2|2
3|3
1|1
2|2
3|3
4|1
5|2
6|3}
do_execsql_test_on_specific_db {:memory:} insert_from_select_same_table_2 {
CREATE TABLE t(a INTEGER PRIMARY KEY, b, c);
INSERT INTO t(b, c) VALUES (1, 100),(2, 200),(3, 300);
SELECT * FROM t;
INSERT INTO t(b, c) SELECT b,c FROM t;
SELECT * FROM t;
} {1|1|100
2|2|200
3|3|300
1|1|100
2|2|200
3|3|300
4|1|100
5|2|200
6|3|300}
do_execsql_test_on_specific_db {:memory:} insert_from_select_union {
CREATE TABLE t(a, b);
CREATE TABLE t2(b, c);
INSERT INTO t2 VALUES (1, 100), (2, 200);
INSERT INTO t SELECT * FROM t UNION SELECT * FROM t2;
SELECT * FROM t;
} {1|100
2|200}

View file

@ -332,8 +332,8 @@ fn test_insert_after_big_blob() -> anyhow::Result<()> {
let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE temp (t1 BLOB, t2 INTEGER)");
let conn = tmp_db.connect_limbo();
conn.execute("insert into temp values (zeroblob (262144))")?;
conn.execute("insert into temp values (1)")?;
conn.execute("insert into temp(t1) values (zeroblob (262144))")?;
conn.execute("insert into temp(t2) values (1)")?;
Ok(())
}