diff --git a/bindings/python/src/turso.rs b/bindings/python/src/turso.rs index b6e89dcec..d1be19e2b 100644 --- a/bindings/python/src/turso.rs +++ b/bindings/python/src/turso.rs @@ -241,7 +241,7 @@ impl PyTursoStatement { /// The caller must always either use [Self::step] or [Self::execute] methods for single statement - but never mix them together pub fn step(&mut self) -> PyResult { Ok(turso_status_to_py( - self.statement.step().map_err(turso_error_to_py_err)?, + self.statement.step(None).map_err(turso_error_to_py_err)?, )) } @@ -253,7 +253,10 @@ impl PyTursoStatement { /// /// The caller must always either use [Self::step] or [Self::execute] methods for single statement - but never mix them together pub fn execute(&mut self) -> PyResult { - let result = self.statement.execute().map_err(turso_error_to_py_err)?; + let result = self + .statement + .execute(None) + .map_err(turso_error_to_py_err)?; Ok(PyTursoExecutionResult { status: turso_status_to_py(result.status), rows_changed: result.rows_changed, @@ -299,7 +302,9 @@ impl PyTursoStatement { /// Note, that if statement wasn't started (no step / execute methods was called) - finalize will not execute the statement pub fn finalize(&mut self) -> PyResult { Ok(turso_status_to_py( - self.statement.finalize().map_err(turso_error_to_py_err)?, + self.statement + .finalize(None) + .map_err(turso_error_to_py_err)?, )) } /// Reset the statement by clearing bindings and reclaiming memory of the program from previous run diff --git a/sdk-kit/src/capi.rs b/sdk-kit/src/capi.rs index 601ccc5d9..c184cf154 100644 --- a/sdk-kit/src/capi.rs +++ b/sdk-kit/src/capi.rs @@ -218,7 +218,7 @@ pub extern "C" fn turso_statement_execute( Ok(statement) => statement, Err(err) => return unsafe { err.to_capi(error_opt_out) }, }; - match statement.execute() { + match statement.execute(None) { Ok(result) => { if !rows_changed.is_null() { unsafe { *rows_changed = result.rows_changed }; @@ -239,7 +239,7 @@ pub extern "C" fn turso_statement_step( Ok(statement) => statement, Err(err) => return unsafe { err.to_capi(error_opt_out) }, }; - match statement.step() { + match statement.step(None) { Ok(status) => status.to_capi(), Err(err) => unsafe { err.to_capi(error_opt_out) }, } @@ -271,7 +271,7 @@ pub extern "C" fn turso_statement_finalize( Ok(statement) => statement, Err(err) => return unsafe { err.to_capi(error_opt_out) }, }; - match statement.finalize() { + match statement.finalize(None) { Ok(status) => status.to_capi(), Err(err) => unsafe { err.to_capi(error_opt_out) }, } diff --git a/sdk-kit/src/rsapi.rs b/sdk-kit/src/rsapi.rs index 459425670..ee90d9b26 100644 --- a/sdk-kit/src/rsapi.rs +++ b/sdk-kit/src/rsapi.rs @@ -2,6 +2,7 @@ use std::{ borrow::Cow, fmt::Display, sync::{Arc, Mutex, Once, RwLock}, + task::Waker, }; use tracing::level_filters::LevelFilter; @@ -730,16 +731,21 @@ impl TursoStatement { /// method returns [TursoStatusCode::Done] if execution is finished /// method returns [TursoStatusCode::Row] if execution generated a row /// method returns [TursoStatusCode::Io] if async_io was set and execution needs IO in order to make progress - pub fn step(&mut self) -> Result { + pub fn step(&mut self, waker: Option<&Waker>) -> Result { let guard = self.concurrent_guard.clone(); let _guard = guard.try_use()?; - self.step_no_guard() + self.step_no_guard(waker) } - fn step_no_guard(&mut self) -> Result { + fn step_no_guard(&mut self, waker: Option<&Waker>) -> Result { let async_io = self.async_io; loop { - return match self.statement.step() { + let result = if let Some(waker) = waker { + self.statement.step_with_waker(waker) + } else { + self.statement.step() + }; + return match result { Ok(StepResult::Done) => Ok(TursoStatusCode::Done), Ok(StepResult::Row) => Ok(TursoStatusCode::Row), Ok(StepResult::Busy) => Err(TursoError { @@ -765,12 +771,12 @@ impl TursoStatement { /// execute statement to completion /// method returns [TursoStatusCode::Done] if execution completed /// method returns [TursoStatusCode::Io] if async_io was set and execution needs IO in order to make progress - pub fn execute(&mut self) -> Result { + pub fn execute(&mut self, waker: Option<&Waker>) -> Result { let guard = self.concurrent_guard.clone(); let _guard = guard.try_use()?; loop { - let status = self.step_no_guard()?; + let status = self.step_no_guard(waker)?; if status == TursoStatusCode::Row { continue; } else if status == TursoStatusCode::Io { @@ -832,12 +838,12 @@ impl TursoStatement { } /// finalize statement execution /// this method must be called in the end of statement execution (either successfull or not) - pub fn finalize(&mut self) -> Result { + pub fn finalize(&mut self, waker: Option<&Waker>) -> Result { let guard = self.concurrent_guard.clone(); let _guard = guard.try_use()?; while self.statement.execution_state().is_running() { - let status = self.step_no_guard()?; + let status = self.step_no_guard(waker)?; if status == TursoStatusCode::Io { return Ok(status); } @@ -909,7 +915,7 @@ mod tests { let mut threads = Vec::new(); for mut stmt in [stmt1, stmt2] { - let thread = std::thread::spawn(move || stmt.execute()); + let thread = std::thread::spawn(move || stmt.execute(None)); threads.push(thread); } let mut results = Vec::new(); @@ -935,6 +941,6 @@ mod tests { let mut stmt = conn .prepare_single("SELECT * FROM generate_series(1, 10000)") .unwrap(); - assert_eq!(stmt.execute().unwrap().status, TursoStatusCode::Done); + assert_eq!(stmt.execute(None).unwrap().status, TursoStatusCode::Done); } }