Merge 'Add statement interruption support' from Pekka Enberg

This adds an interrupt() method to Statement that allows apps to
interrupt a running statement. Please note that this is different from
`sqlite3_interrupt()` which interrupts all ongoing operations in a
database. Although we want to support that too, per statement interrupt
is much more useful to apps.

Reviewed-by: Pere Diaz Bou <pere-altea@homail.com>

Closes #512
This commit is contained in:
Pere Diaz Bou 2024-12-19 12:59:19 +01:00
commit dbe6b8d899
10 changed files with 56 additions and 1 deletions

View file

@ -134,6 +134,9 @@ impl Cursor {
PyErr::new::<OperationalError, _>(format!("IO error: {:?}", e))
})?;
}
limbo_core::RowResult::Interrupt => {
return Ok(None);
}
limbo_core::RowResult::Done => {
return Ok(None);
}
@ -165,6 +168,9 @@ impl Cursor {
PyErr::new::<OperationalError, _>(format!("IO error: {:?}", e))
})?;
}
limbo_core::RowResult::Interrupt => {
return Ok(results);
}
limbo_core::RowResult::Done => {
return Ok(results);
}

View file

@ -83,7 +83,9 @@ impl Statement {
}
JsValue::from(row_array)
}
Ok(limbo_core::RowResult::IO) | Ok(limbo_core::RowResult::Done) => JsValue::UNDEFINED,
Ok(limbo_core::RowResult::IO)
| Ok(limbo_core::RowResult::Done)
| Ok(limbo_core::RowResult::Interrupt) => JsValue::UNDEFINED,
Err(e) => panic!("Error: {:?}", e),
}
}
@ -101,6 +103,7 @@ impl Statement {
array.push(&row_array);
}
Ok(limbo_core::RowResult::IO) => {}
Ok(limbo_core::RowResult::Interrupt) => break,
Ok(limbo_core::RowResult::Done) => break,
Err(e) => panic!("Error: {:?}", e),
}

View file

@ -521,6 +521,7 @@ impl Limbo {
Ok(RowResult::IO) => {
self.io.run_once()?;
}
Ok(RowResult::Interrupt) => break,
Ok(RowResult::Done) => {
break;
}
@ -557,6 +558,7 @@ impl Limbo {
Ok(RowResult::IO) => {
self.io.run_once()?;
}
Ok(RowResult::Interrupt) => break,
Ok(RowResult::Done) => break,
Err(err) => {
let _ = self.write_fmt(format_args!("{}", err));
@ -606,6 +608,7 @@ impl Limbo {
RowResult::IO => {
self.io.run_once()?;
}
RowResult::Interrupt => break,
RowResult::Done => break,
}
}
@ -658,6 +661,7 @@ impl Limbo {
RowResult::IO => {
self.io.run_once()?;
}
RowResult::Interrupt => break,
RowResult::Done => break,
}
}

View file

@ -46,6 +46,9 @@ fn limbo_bench(criterion: &mut Criterion) {
limbo_core::RowResult::IO => {
io.run_once().unwrap();
}
limbo_core::RowResult::Interrupt => {
unreachable!();
}
limbo_core::RowResult::Done => {
unreachable!();
}
@ -68,6 +71,9 @@ fn limbo_bench(criterion: &mut Criterion) {
limbo_core::RowResult::IO => {
io.run_once().unwrap();
}
limbo_core::RowResult::Interrupt => {
unreachable!();
}
limbo_core::RowResult::Done => {
unreachable!();
}
@ -91,6 +97,9 @@ fn limbo_bench(criterion: &mut Criterion) {
limbo_core::RowResult::IO => {
io.run_once().unwrap();
}
limbo_core::RowResult::Interrupt => {
unreachable!();
}
limbo_core::RowResult::Done => {
unreachable!();
}

View file

@ -367,12 +367,17 @@ impl Statement {
}
}
pub fn interrupt(&mut self) {
self.state.interrupt();
}
pub fn step(&mut self) -> Result<RowResult<'_>> {
let result = self.program.step(&mut self.state, self.pager.clone())?;
match result {
vdbe::StepResult::Row(row) => Ok(RowResult::Row(Row { values: row.values })),
vdbe::StepResult::IO => Ok(RowResult::IO),
vdbe::StepResult::Done => Ok(RowResult::Done),
vdbe::StepResult::Interrupt => Ok(RowResult::Interrupt),
}
}
@ -388,6 +393,7 @@ pub enum RowResult<'a> {
Row(Row<'a>),
IO,
Done,
Interrupt,
}
pub struct Row<'a> {

View file

@ -51,6 +51,7 @@ pub fn parse_schema_rows(rows: Option<Rows>, schema: &mut Schema, io: Arc<dyn IO
// read the schema is actually complete?
io.run_once()?;
}
RowResult::Interrupt => break,
RowResult::Done => break,
}
}

View file

@ -554,6 +554,7 @@ pub enum StepResult<'a> {
Done,
IO,
Row(Record<'a>),
Interrupt,
}
/// If there is I/O, the instruction is restarted.
@ -589,6 +590,7 @@ pub struct ProgramState {
deferred_seek: Option<(CursorID, CursorID)>,
ended_coroutine: bool, // flag to notify yield coroutine finished
regex_cache: RegexCache,
interrupted: bool,
}
impl ProgramState {
@ -604,6 +606,7 @@ impl ProgramState {
deferred_seek: None,
ended_coroutine: false,
regex_cache: RegexCache::new(),
interrupted: false,
}
}
@ -614,6 +617,14 @@ impl ProgramState {
pub fn column(&self, i: usize) -> Option<String> {
Some(format!("{:?}", self.registers[i]))
}
pub fn interrupt(&mut self) {
self.interrupted = true;
}
pub fn is_interrupted(&self) -> bool {
self.interrupted
}
}
#[derive(Debug)]
@ -652,6 +663,9 @@ impl Program {
pager: Rc<Pager>,
) -> Result<StepResult<'a>> {
loop {
if state.is_interrupted() {
return Ok(StepResult::Interrupt);
}
let insn = &self.insns[state.pc as usize];
trace_insn(self, state.pc as InsnReference, insn);
let mut cursors = state.cursors.borrow_mut();

View file

@ -355,6 +355,9 @@ fn get_all_rows(
break 'rows_loop;
}
}
RowResult::Interrupt => {
break;
}
RowResult::Done => {
break;
}

View file

@ -19,6 +19,7 @@ pub const SQLITE_ERROR: ffi::c_int = 1;
pub const SQLITE_ABORT: ffi::c_int = 4;
pub const SQLITE_BUSY: ffi::c_int = 5;
pub const SQLITE_NOMEM: ffi::c_int = 7;
pub const SQLITE_INTERRUPT: ffi::c_int = 9;
pub const SQLITE_NOTFOUND: ffi::c_int = 14;
pub const SQLITE_MISUSE: ffi::c_int = 21;
pub const SQLITE_ROW: ffi::c_int = 100;
@ -235,6 +236,7 @@ pub unsafe extern "C" fn sqlite3_step(stmt: *mut sqlite3_stmt) -> std::ffi::c_in
match result {
limbo_core::RowResult::IO => SQLITE_BUSY,
limbo_core::RowResult::Done => SQLITE_DONE,
limbo_core::RowResult::Interrupt => SQLITE_INTERRUPT,
limbo_core::RowResult::Row(row) => {
stmt.row.replace(Some(row));
SQLITE_ROW

View file

@ -93,6 +93,7 @@ mod tests {
RowResult::IO => {
tmp_db.io.run_once()?;
}
RowResult::Interrupt => break,
RowResult::Done => break,
}
},
@ -160,6 +161,7 @@ mod tests {
RowResult::IO => {
tmp_db.io.run_once()?;
}
RowResult::Interrupt => break,
RowResult::Done => break,
}
},
@ -233,6 +235,7 @@ mod tests {
RowResult::IO => {
tmp_db.io.run_once()?;
}
RowResult::Interrupt => break,
RowResult::Done => break,
}
},
@ -295,6 +298,7 @@ mod tests {
RowResult::IO => {
tmp_db.io.run_once()?;
}
RowResult::Interrupt => break,
RowResult::Done => break,
}
},
@ -355,6 +359,7 @@ mod tests {
RowResult::IO => {
tmp_db.io.run_once()?;
}
RowResult::Interrupt => break,
RowResult::Done => break,
}
}
@ -446,6 +451,7 @@ mod tests {
RowResult::IO => {
tmp_db.io.run_once()?;
}
RowResult::Interrupt => break,
RowResult::Done => break,
}
}
@ -479,6 +485,7 @@ mod tests {
RowResult::IO => {
tmp_db.io.run_once()?;
}
RowResult::Interrupt => break,
RowResult::Done => break,
}
},