propagate waker to sdk-kit

This commit is contained in:
Nikita Sivukhin 2025-12-18 12:17:57 +04:00
parent 0d3032149e
commit 028e13f283
3 changed files with 27 additions and 16 deletions

View file

@ -241,7 +241,7 @@ impl PyTursoStatement {
/// The caller must always either use [Self::step] or [Self::execute] methods for single statement - but never mix them together
pub fn step(&mut self) -> PyResult<PyTursoStatusCode> {
Ok(turso_status_to_py(
self.statement.step().map_err(turso_error_to_py_err)?,
self.statement.step(None).map_err(turso_error_to_py_err)?,
))
}
@ -253,7 +253,10 @@ impl PyTursoStatement {
///
/// The caller must always either use [Self::step] or [Self::execute] methods for single statement - but never mix them together
pub fn execute(&mut self) -> PyResult<PyTursoExecutionResult> {
let result = self.statement.execute().map_err(turso_error_to_py_err)?;
let result = self
.statement
.execute(None)
.map_err(turso_error_to_py_err)?;
Ok(PyTursoExecutionResult {
status: turso_status_to_py(result.status),
rows_changed: result.rows_changed,
@ -299,7 +302,9 @@ impl PyTursoStatement {
/// Note, that if statement wasn't started (no step / execute methods was called) - finalize will not execute the statement
pub fn finalize(&mut self) -> PyResult<PyTursoStatusCode> {
Ok(turso_status_to_py(
self.statement.finalize().map_err(turso_error_to_py_err)?,
self.statement
.finalize(None)
.map_err(turso_error_to_py_err)?,
))
}
/// Reset the statement by clearing bindings and reclaiming memory of the program from previous run

View file

@ -218,7 +218,7 @@ pub extern "C" fn turso_statement_execute(
Ok(statement) => statement,
Err(err) => return unsafe { err.to_capi(error_opt_out) },
};
match statement.execute() {
match statement.execute(None) {
Ok(result) => {
if !rows_changed.is_null() {
unsafe { *rows_changed = result.rows_changed };
@ -239,7 +239,7 @@ pub extern "C" fn turso_statement_step(
Ok(statement) => statement,
Err(err) => return unsafe { err.to_capi(error_opt_out) },
};
match statement.step() {
match statement.step(None) {
Ok(status) => status.to_capi(),
Err(err) => unsafe { err.to_capi(error_opt_out) },
}
@ -271,7 +271,7 @@ pub extern "C" fn turso_statement_finalize(
Ok(statement) => statement,
Err(err) => return unsafe { err.to_capi(error_opt_out) },
};
match statement.finalize() {
match statement.finalize(None) {
Ok(status) => status.to_capi(),
Err(err) => unsafe { err.to_capi(error_opt_out) },
}

View file

@ -2,6 +2,7 @@ use std::{
borrow::Cow,
fmt::Display,
sync::{Arc, Mutex, Once, RwLock},
task::Waker,
};
use tracing::level_filters::LevelFilter;
@ -730,16 +731,21 @@ impl TursoStatement {
/// method returns [TursoStatusCode::Done] if execution is finished
/// method returns [TursoStatusCode::Row] if execution generated a row
/// method returns [TursoStatusCode::Io] if async_io was set and execution needs IO in order to make progress
pub fn step(&mut self) -> Result<TursoStatusCode, TursoError> {
pub fn step(&mut self, waker: Option<&Waker>) -> Result<TursoStatusCode, TursoError> {
let guard = self.concurrent_guard.clone();
let _guard = guard.try_use()?;
self.step_no_guard()
self.step_no_guard(waker)
}
fn step_no_guard(&mut self) -> Result<TursoStatusCode, TursoError> {
fn step_no_guard(&mut self, waker: Option<&Waker>) -> Result<TursoStatusCode, TursoError> {
let async_io = self.async_io;
loop {
return match self.statement.step() {
let result = if let Some(waker) = waker {
self.statement.step_with_waker(waker)
} else {
self.statement.step()
};
return match result {
Ok(StepResult::Done) => Ok(TursoStatusCode::Done),
Ok(StepResult::Row) => Ok(TursoStatusCode::Row),
Ok(StepResult::Busy) => Err(TursoError {
@ -765,12 +771,12 @@ impl TursoStatement {
/// execute statement to completion
/// method returns [TursoStatusCode::Done] if execution completed
/// method returns [TursoStatusCode::Io] if async_io was set and execution needs IO in order to make progress
pub fn execute(&mut self) -> Result<TursoExecutionResult, TursoError> {
pub fn execute(&mut self, waker: Option<&Waker>) -> Result<TursoExecutionResult, TursoError> {
let guard = self.concurrent_guard.clone();
let _guard = guard.try_use()?;
loop {
let status = self.step_no_guard()?;
let status = self.step_no_guard(waker)?;
if status == TursoStatusCode::Row {
continue;
} else if status == TursoStatusCode::Io {
@ -832,12 +838,12 @@ impl TursoStatement {
}
/// finalize statement execution
/// this method must be called in the end of statement execution (either successfull or not)
pub fn finalize(&mut self) -> Result<TursoStatusCode, TursoError> {
pub fn finalize(&mut self, waker: Option<&Waker>) -> Result<TursoStatusCode, TursoError> {
let guard = self.concurrent_guard.clone();
let _guard = guard.try_use()?;
while self.statement.execution_state().is_running() {
let status = self.step_no_guard()?;
let status = self.step_no_guard(waker)?;
if status == TursoStatusCode::Io {
return Ok(status);
}
@ -909,7 +915,7 @@ mod tests {
let mut threads = Vec::new();
for mut stmt in [stmt1, stmt2] {
let thread = std::thread::spawn(move || stmt.execute());
let thread = std::thread::spawn(move || stmt.execute(None));
threads.push(thread);
}
let mut results = Vec::new();
@ -935,6 +941,6 @@ mod tests {
let mut stmt = conn
.prepare_single("SELECT * FROM generate_series(1, 10000)")
.unwrap();
assert_eq!(stmt.execute().unwrap().status, TursoStatusCode::Done);
assert_eq!(stmt.execute(None).unwrap().status, TursoStatusCode::Done);
}
}