Merge 'Emit IdxDelete instruction and some fixes on seek after deletion' from Pere Diaz Bou

Previously, `DELETE FROM ...` only emitted delete instructions for the main
table. This is incorrect: the matching entries must be removed from the
table's indexes as well, otherwise the indexes keep pointing at deleted rows.

Closes #1383
This commit is contained in:
Pere Diaz Bou 2025-04-28 09:13:54 +03:00
commit 63a94e7c62
8 changed files with 335 additions and 8 deletions

View file

@ -5238,8 +5238,10 @@ mod tests {
fast_lock::SpinLock,
io::{Buffer, Completion, MemoryIO, OpenFlags, IO},
storage::{
database::DatabaseFile, page_cache::DumbLruPageCache, sqlite3_ondisk,
sqlite3_ondisk::DatabaseHeader,
database::DatabaseFile,
page_cache::DumbLruPageCache,
pager::CreateBTreeFlags,
sqlite3_ondisk::{self, DatabaseHeader},
},
types::Text,
vdbe::Register,
@ -5757,6 +5759,81 @@ mod tests {
}
}
}
/// Fuzz-tests insertion into an index B-tree.
///
/// For each of `attempts` rounds a fresh index B-tree is created and `inserts`
/// unique keys are inserted, each key being 10 random integers in `0..2^30`.
/// Afterwards the cursor is rewound to the root and advanced with `next()`,
/// and every record it yields must equal the corresponding key in sorted order.
///
/// The RNG is seeded from the `SEED` environment variable when it is set
/// (it must parse as a `u64`), otherwise from `rng_from_time()`. The seed is
/// logged so that a failing run can be reproduced.
fn btree_index_insert_fuzz_run(attempts: usize, inserts: usize) {
    let (mut rng, seed) = if let Ok(seed) = std::env::var("SEED") {
        let seed = seed.parse::<u64>().unwrap();
        (ChaCha8Rng::seed_from_u64(seed), seed)
    } else {
        rng_from_time()
    };
    // Shared across attempts so the same key is never generated twice in a run.
    let mut seen = HashSet::new();
    tracing::info!("super seed: {}", seed);
    for _ in 0..attempts {
        let (pager, _) = empty_btree();
        let index_root_page = pager.btree_create(&CreateBTreeFlags::new_index());
        let index_root_page = index_root_page as usize;
        let mut cursor = BTreeCursor::new(None, pager.clone(), index_root_page);
        let mut keys = Vec::new();
        tracing::info!("seed: {}", seed);
        for _ in 0..inserts {
            // Draw random keys until we get one that has not been used yet.
            let key = loop {
                let cols = (0..10)
                    .map(|_| (rng.next_u64() % (1 << 30)) as i64)
                    .collect::<Vec<_>>();
                if seen.insert(cols.clone()) {
                    break cols;
                }
            };
            let value = ImmutableRecord::from_registers(
                &key.iter()
                    .map(|col| Register::OwnedValue(OwnedValue::Integer(*col)))
                    .collect::<Vec<_>>(),
            );
            run_until_done(
                || {
                    cursor.insert(
                        &BTreeKey::new_index_key(&value),
                        cursor.is_write_in_progress(),
                    )
                },
                pager.deref(),
            )
            .unwrap();
            keys.push(key);
            // `keys` is only read after all inserts are done, so it is sorted
            // once below instead of after every single insert.
            cursor.move_to_root();
        }
        keys.sort();
        cursor.move_to_root();
        // Walk the index from the start; it must yield the keys in sorted order.
        for key in keys.iter() {
            tracing::trace!("seeking key: {:?}", key);
            run_until_done(|| cursor.next(), pager.deref()).unwrap();
            let record = cursor.record();
            let record = record.as_ref().unwrap();
            let cursor_key = record.get_values();
            assert_eq!(
                cursor_key,
                &key.iter()
                    .map(|col| RefValue::Integer(*col))
                    .collect::<Vec<_>>(),
                "key {:?} is not found",
                key
            );
        }
    }
}
#[test]
pub fn test_drop_odd() {
let db = get_database();
@ -5810,6 +5887,11 @@ mod tests {
}
}
/// Runs the index insert fuzzer for two rounds of 32Ki inserts each.
#[test]
pub fn btree_index_insert_fuzz_run_equal_size() {
    let attempts = 2;
    let inserts_per_attempt = 1024 * 32;
    btree_index_insert_fuzz_run(attempts, inserts_per_attempt);
}
#[test]
pub fn btree_insert_fuzz_run_random() {
btree_insert_fuzz_run(128, 16, |rng| (rng.next_u32() % 4096) as usize);

View file

@ -50,6 +50,11 @@ pub fn prepare_delete_plan(
crate::bail_corrupt_error!("Table is neither a virtual table nor a btree table");
};
let name = tbl_name.name.0.as_str().to_string();
let indexes = schema
.get_indices(table.get_name())
.iter()
.cloned()
.collect();
let mut table_references = vec![TableReference {
table,
identifier: name,
@ -82,6 +87,7 @@ pub fn prepare_delete_plan(
limit: resolved_limit,
offset: resolved_offset,
contains_constant_false_condition: false,
indexes,
};
Ok(Plan::Delete(plan))

View file

@ -2,13 +2,16 @@
// It handles translating high-level SQL operations into low-level bytecode that can be executed by the virtual machine.
use std::rc::Rc;
use std::sync::Arc;
use limbo_sqlite3_parser::ast::{self};
use crate::function::Func;
use crate::schema::Index;
use crate::translate::plan::{DeletePlan, Plan, Search};
use crate::util::exprs_are_equivalent;
use crate::vdbe::builder::ProgramBuilder;
use crate::vdbe::insn::RegisterOrLiteral;
use crate::vdbe::{insn::Insn, BranchOffset};
use crate::{Result, SymbolTable};
@ -373,7 +376,13 @@ fn emit_program_for_delete(
&plan.table_references,
&plan.where_clause,
)?;
emit_delete_insns(program, &mut t_ctx, &plan.table_references, &plan.limit)?;
emit_delete_insns(
program,
&mut t_ctx,
&plan.table_references,
&plan.indexes,
&plan.limit,
)?;
// Clean up and close the main execution loop
close_loop(program, &mut t_ctx, &plan.table_references)?;
@ -390,6 +399,7 @@ fn emit_delete_insns(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx,
table_references: &[TableReference],
index_references: &[Arc<Index>],
limit: &Option<isize>,
) -> Result<()> {
let table_reference = table_references.first().unwrap();
@ -405,11 +415,12 @@ fn emit_delete_insns(
},
_ => return Ok(()),
};
let main_table_cursor_id = program.resolve_cursor_id(table_reference.table.get_name());
// Emit the instructions to delete the row
let key_reg = program.alloc_register();
program.emit_insn(Insn::RowId {
cursor_id,
cursor_id: main_table_cursor_id,
dest: key_reg,
});
@ -430,7 +441,43 @@ fn emit_delete_insns(
conflict_action,
});
} else {
program.emit_insn(Insn::Delete { cursor_id });
for index in index_references {
let index_cursor_id = program.alloc_cursor_id(
Some(index.name.clone()),
crate::vdbe::builder::CursorType::BTreeIndex(index.clone()),
);
program.emit_insn(Insn::OpenWrite {
cursor_id: index_cursor_id,
root_page: RegisterOrLiteral::Literal(index.root_page),
});
let num_regs = index.columns.len() + 1;
let start_reg = program.alloc_registers(num_regs);
// Emit columns that are part of the index
index
.columns
.iter()
.enumerate()
.for_each(|(reg_offset, column_index)| {
program.emit_insn(Insn::Column {
cursor_id: main_table_cursor_id,
column: column_index.pos_in_table,
dest: start_reg + reg_offset,
});
});
program.emit_insn(Insn::RowId {
cursor_id: main_table_cursor_id,
dest: start_reg + num_regs - 1,
});
program.emit_insn(Insn::IdxDelete {
start_reg,
num_regs,
cursor_id: index_cursor_id,
});
}
program.emit_insn(Insn::Delete {
cursor_id: main_table_cursor_id,
});
}
if let Some(limit) = limit {
let limit_reg = program.alloc_register();

View file

@ -297,6 +297,8 @@ pub struct DeletePlan {
pub offset: Option<isize>,
/// query contains a constant condition that is always false
pub contains_constant_false_condition: bool,
/// Indexes that must be updated by the delete operation.
pub indexes: Vec<Arc<Index>>,
}
#[derive(Debug, Clone)]

View file

@ -1810,11 +1810,17 @@ pub fn op_row_id(
let rowid = {
let mut index_cursor = state.get_cursor(index_cursor_id);
let index_cursor = index_cursor.as_btree_mut();
index_cursor.rowid()?
let record = index_cursor.record();
let record = record.as_ref().unwrap();
let rowid = record.get_values().last().unwrap();
match rowid {
RefValue::Integer(rowid) => *rowid as u64,
_ => unreachable!(),
}
};
let mut table_cursor = state.get_cursor(table_cursor_id);
let table_cursor = table_cursor.as_btree_mut();
match table_cursor.seek(SeekKey::TableRowId(rowid.unwrap()), SeekOp::EQ)? {
match table_cursor.seek(SeekKey::TableRowId(rowid), SeekOp::EQ)? {
CursorResult::Ok(_) => None,
CursorResult::IO => Some((index_cursor_id, table_cursor_id)),
}
@ -2069,7 +2075,6 @@ pub fn op_idx_ge(
let idx_values = idx_record.get_values();
let idx_values = &idx_values[..record_from_regs.len()];
let record_values = record_from_regs.get_values();
let record_values = &record_values[..idx_values.len()];
let ord = compare_immutable(&idx_values, &record_values, cursor.index_key_sort_order);
if ord.is_ge() {
target_pc.to_offset_int()
@ -3759,6 +3764,46 @@ pub fn op_delete(
Ok(InsnFunctionStepResult::Step)
}
/// Executes `Insn::IdxDelete`: removes one entry from an index B-tree.
///
/// The key to delete is built from `num_regs` registers starting at
/// `start_reg`. The index cursor `cursor_id` is positioned on that key with an
/// equality seek and the entry under the cursor is deleted.
///
/// Returns `LimboError::Corrupt` when the seek does not land on an entry.
/// On success the program's change counter is incremented and the program
/// counter advances to the next instruction. `pager` and `mv_store` are not
/// used by this instruction.
pub fn op_idx_delete(
    program: &Program,
    state: &mut ProgramState,
    insn: &Insn,
    pager: &Rc<Pager>,
    mv_store: Option<&Rc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
    let (cursor_id, start_reg, num_regs) = match insn {
        Insn::IdxDelete {
            cursor_id,
            start_reg,
            num_regs,
        } => (cursor_id, start_reg, num_regs),
        _ => unreachable!("unexpected Insn {:?}", insn),
    };
    let index_key = make_record(&state.registers, start_reg, num_regs);
    {
        let mut cursor_ref = state.get_cursor(*cursor_id);
        let index_cursor = cursor_ref.as_btree_mut();
        // NOTE(review): if either step below yields for I/O, the instruction is
        // presumably re-run from the top (seek included) — confirm that is safe
        // once the delete has already started.
        return_if_io!(index_cursor.seek(SeekKey::IndexKey(&index_key), SeekOp::EQ));
        let entry_found = index_cursor.rowid()?.is_some();
        if !entry_found {
            // SQLite only raises SQLITE_CORRUPT_INDEX here when P5 is non-zero
            // (UPDATE/DELETE); for uses such as the EXCEPT operator P5 is zero
            // and a missing entry is tolerated, as it is in writable_schema
            // mode. This instruction carries no P5 operand, so a missing entry
            // is always reported as corruption.
            return Err(LimboError::Corrupt(format!(
                "IdxDelete: no matching index entry found for record {:?}",
                index_key
            )));
        }
        return_if_io!(index_cursor.delete());
    }
    // NOTE(review): this counts the index entry as a change; confirm that it
    // does not double count together with the table row's own delete.
    program.n_change.set(program.n_change.get() + 1);
    state.pc += 1;
    Ok(InsnFunctionStepResult::Step)
}
pub fn op_idx_insert(
program: &Program,
state: &mut ProgramState,

View file

@ -1037,6 +1037,19 @@ pub fn insn_to_str(
0,
"".to_string(),
),
Insn::IdxDelete {
cursor_id,
start_reg,
num_regs,
} => (
"IdxDelete",
*cursor_id as i32,
*start_reg as i32,
*num_regs as i32,
OwnedValue::build_text(""),
0,
"".to_string(),
),
Insn::NewRowid {
cursor,
rowid_reg,

View file

@ -650,6 +650,12 @@ pub enum Insn {
cursor_id: CursorID,
},
IdxDelete {
start_reg: usize,
num_regs: usize,
cursor_id: CursorID,
},
NewRowid {
cursor: CursorID, // P1
rowid_reg: usize, // P2 Destination register to store the new rowid
@ -961,6 +967,7 @@ impl Insn {
Insn::Once { .. } => execute::op_once,
Insn::NotFound { .. } => execute::op_not_found,
Insn::Affinity { .. } => execute::op_affinity,
Insn::IdxDelete { .. } => execute::op_idx_delete,
}
}
}

View file

@ -461,3 +461,128 @@ fn test_insert_after_big_blob() -> anyhow::Result<()> {
Ok(())
}
#[test_log::test]
#[ignore = "this takes too long :)"]
fn test_write_delete_with_index() -> anyhow::Result<()> {
let _ = env_logger::try_init();
maybe_setup_tracing();
let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE test (x PRIMARY KEY);");
let conn = tmp_db.connect_limbo();
let list_query = "SELECT * FROM test";
let max_iterations = 1000;
for i in 0..max_iterations {
println!("inserting {} ", i);
if (i % 100) == 0 {
let progress = (i as f64 / max_iterations as f64) * 100.0;
println!("progress {:.1}%", progress);
}
let insert_query = format!("INSERT INTO test VALUES ({})", i);
match conn.query(insert_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
}
for i in 0..max_iterations {
println!("deleting {} ", i);
if (i % 100) == 0 {
let progress = (i as f64 / max_iterations as f64) * 100.0;
println!("progress {:.1}%", progress);
}
let delete_query = format!("delete from test where x={}", i);
match conn.query(delete_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
println!("listing after deleting {} ", i);
let mut current_read_index = i + 1;
match conn.query(list_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
limbo_core::OwnedValue::Integer(i) => *i as i32,
limbo_core::OwnedValue::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(current_read_index, id);
current_read_index += 1;
}
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
StepResult::Busy => {
panic!("Database is busy");
}
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
}
for i in i + 1..max_iterations {
// now test with seek
match conn.query(format!("select * from test where x = {}", i)) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
limbo_core::OwnedValue::Integer(i) => *i as i32,
limbo_core::OwnedValue::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(i, id);
break;
}
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
StepResult::Busy => {
panic!("Database is busy");
}
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
}
}
}
Ok(())
}