Merge 'Rust binding improvements' from Pedro Muniz
Some checks are pending
Build and push limbo-sim image / deploy (push) Waiting to run
Go Tests / test (push) Waiting to run
Java Tests / test (push) Waiting to run
JavaScript / stable - aarch64-apple-darwin - node@20 (push) Waiting to run
JavaScript / stable - x86_64-apple-darwin - node@20 (push) Waiting to run
JavaScript / stable - x86_64-pc-windows-msvc - node@20 (push) Waiting to run
JavaScript / stable - x86_64-unknown-linux-gnu - node@20 (push) Waiting to run
JavaScript / Test bindings on x86_64-apple-darwin - node@20 (push) Blocked by required conditions
JavaScript / Test bindings on Linux-x64-gnu - node@20 (push) Blocked by required conditions
JavaScript / Build universal macOS binary (push) Blocked by required conditions
JavaScript / Publish (push) Blocked by required conditions
Python / configure-strategy (push) Waiting to run
Python / test (push) Blocked by required conditions
Python / lint (push) Waiting to run
Python / linux (x86_64) (push) Waiting to run
Python / macos-x86_64 (x86_64) (push) Waiting to run
Python / macos-arm64 (aarch64) (push) Waiting to run
Python / sdist (push) Waiting to run
Python / Release (push) Blocked by required conditions
Rust / cargo-fmt-check (push) Waiting to run
Rust / build-native (blacksmith-4vcpu-ubuntu-2404) (push) Waiting to run
Rust / build-native (macos-latest) (push) Waiting to run
Rust / build-native (windows-latest) (push) Waiting to run
Rust / clippy (push) Waiting to run
Rust / build-wasm (push) Waiting to run
Rust / simulator (push) Waiting to run
Rust / test-limbo (push) Waiting to run
Rust / test-sqlite (push) Waiting to run
Rust Benchmarks+Nyrkiö / bench (push) Waiting to run
Rust Benchmarks+Nyrkiö / clickbench (push) Waiting to run
Rust Benchmarks+Nyrkiö / tpc-h-criterion (push) Waiting to run
Rust Benchmarks+Nyrkiö / tpc-h (push) Waiting to run

This PR aims to add some more comments and documentation to the Rust
binding. It also fixes an issue where you could not reuse the statement
after it ran to completion or errored. Now the statement resets in those
cases. Also implemented `Stream` for the Rows struct allowing users to
leverage the built-in iterator like methods for better ergonomics.

Closes #1905
This commit is contained in:
Pekka Enberg 2025-07-01 00:08:18 +03:00
commit bd60cd214c
10 changed files with 402 additions and 236 deletions

1
Cargo.lock generated
View file

@ -3620,6 +3620,7 @@ dependencies = [
name = "turso"
version = "0.1.1"
dependencies = [
"futures-util",
"tempfile",
"thiserror 2.0.12",
"tokio",

View file

@ -10,13 +10,15 @@ repository.workspace = true
description = "Turso Rust API"
[features]
default = []
default = ["futures"]
experimental_indexes = []
antithesis = ["turso_core/antithesis"]
futures = ["dep:futures-util"]
[dependencies]
turso_core = { workspace = true, features = ["io_uring"] }
thiserror = "2.0.9"
futures-util = { version = "0.3.31", optional = true, default-features = false, features = ["std", "async-await"] }
[dev-dependencies]
tempfile = "3.20.0"

View file

@ -1,3 +1,6 @@
#![cfg(feature = "futures")]
use futures_util::stream::TryStreamExt;
use turso::Builder;
#[tokio::main]
@ -6,8 +9,15 @@ async fn main() {
let conn = db.connect().unwrap();
conn.query("select 1; select 1;", ()).await.unwrap();
// `query` and other methods, only parse the first query given.
let mut rows = conn.query("select 1; select 1;", ()).await.unwrap();
// Iterate over the rows with the Stream iterator syntax
while let Some(row) = rows.try_next().await.unwrap() {
let val = row.get_value(0).unwrap();
println!("{:?}", val);
}
// Contrary to `prepare` and `query`, `execute` is not lazy and will execute the query to completion
conn.execute("CREATE TABLE IF NOT EXISTS users (email TEXT)", ())
.await
.unwrap();
@ -32,9 +42,20 @@ async fn main() {
let mut rows = stmt.query(["foo@example.com"]).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
while let Some(row) = rows.try_next().await.unwrap() {
let value = row.get_value(0).unwrap();
println!("Row: {:?}", value);
}
let value = row.get_value(0).unwrap();
let rows = stmt.query(["foo@example.com"]).await.unwrap();
println!("Row: {:?}", value);
// As `Rows` implement streams you can easily map over it and apply other transformations you see fit
println!("Using Stream map");
rows.map_ok(|row| row.get_value(0).unwrap())
.try_for_each(|val| {
println!("Row: {:?}", val.as_text().unwrap());
futures_util::future::ready(Ok(()))
})
.await
.unwrap();
}

View file

@ -20,28 +20,33 @@
//! You can also prepare statements with the [`Connection`] object and then execute the [`Statement`] objects:
//!
//! ```rust,no_run
//! # use crate::turso::futures_util::TryStreamExt;
//! # async fn run() {
//! # use turso::Builder;
//! # let db = Builder::new_local(":memory:").build().await.unwrap();
//! # let conn = db.connect().unwrap();
//! let mut stmt = conn.prepare("SELECT * FROM users WHERE email = ?1").await.unwrap();
//! let mut rows = stmt.query(["foo@example.com"]).await.unwrap();
//! let row = rows.next().await.unwrap().unwrap();
//! let row = rows.try_next().await.unwrap().unwrap();
//! let value = row.get_value(0).unwrap();
//! println!("Row: {:?}", value);
//! # }
//! ```
pub mod params;
pub mod row;
pub mod statement;
pub mod value;
#[cfg(feature = "futures")]
pub use futures_util;
pub use params::params_from_iter;
pub use value::Value;
pub use params::params_from_iter;
use crate::params::*;
use crate::row::{Row, Rows};
use crate::statement::Statement;
use std::fmt::Debug;
use std::num::NonZero;
use std::sync::{Arc, Mutex};
#[derive(Debug, thiserror::Error)]
@ -62,7 +67,7 @@ impl From<turso_core::LimboError> for Error {
pub(crate) type BoxError = Box<dyn std::error::Error + Send + Sync>;
pub type Result<T> = std::result::Result<T, Error>;
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A builder for `Database`.
pub struct Builder {
@ -213,119 +218,6 @@ impl Debug for Connection {
}
}
/// A prepared statement.
pub struct Statement {
inner: Arc<Mutex<turso_core::Statement>>,
}
impl Clone for Statement {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
unsafe impl Send for Statement {}
unsafe impl Sync for Statement {}
impl Statement {
/// Query the database with this prepared statement.
pub async fn query(&mut self, params: impl IntoParams) -> Result<Rows> {
let params = params.into_params()?;
match params {
params::Params::None => (),
params::Params::Positional(values) => {
for (i, value) in values.into_iter().enumerate() {
let mut stmt = self.inner.lock().unwrap();
stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
}
}
params::Params::Named(values) => {
for (name, value) in values.into_iter() {
let mut stmt = self.inner.lock().unwrap();
let i = stmt.parameters().index(name).unwrap();
stmt.bind_at(i, value.into());
}
}
}
#[allow(clippy::arc_with_non_send_sync)]
let rows = Rows {
inner: Arc::clone(&self.inner),
};
Ok(rows)
}
/// Execute this prepared statement.
pub async fn execute(&mut self, params: impl IntoParams) -> Result<u64> {
{
// Reset the statement before executing
self.inner.lock().unwrap().reset();
}
let params = params.into_params()?;
match params {
params::Params::None => (),
params::Params::Positional(values) => {
for (i, value) in values.into_iter().enumerate() {
let mut stmt = self.inner.lock().unwrap();
stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
}
}
params::Params::Named(values) => {
for (name, value) in values.into_iter() {
let mut stmt = self.inner.lock().unwrap();
let i = stmt.parameters().index(name).unwrap();
stmt.bind_at(i, value.into());
}
}
}
loop {
let mut stmt = self.inner.lock().unwrap();
match stmt.step() {
Ok(turso_core::StepResult::Row) => {
// unexpected row during execution, error out.
return Ok(2);
}
Ok(turso_core::StepResult::Done) => {
return Ok(0);
}
Ok(turso_core::StepResult::IO) => {
let _ = stmt.run_once();
//return Ok(1);
}
Ok(turso_core::StepResult::Busy) => {
return Ok(4);
}
Ok(turso_core::StepResult::Interrupt) => {
return Ok(3);
}
Err(err) => {
return Err(err.into());
}
}
}
}
/// Returns columns of the result of this prepared statement.
pub fn columns(&self) -> Vec<Column> {
let stmt = self.inner.lock().unwrap();
let n = stmt.num_columns();
let mut cols = Vec::with_capacity(n);
for i in 0..n {
let name = stmt.get_column_name(i).into_owned();
cols.push(Column {
name,
decl_type: None, // TODO
});
}
cols
}
}
/// Column information.
pub struct Column {
name: String,
@ -357,100 +249,27 @@ pub enum Params {
pub struct Transaction {}
/// Results of a prepared statement query.
pub struct Rows {
inner: Arc<Mutex<turso_core::Statement>>,
}
impl Clone for Rows {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
unsafe impl Send for Rows {}
unsafe impl Sync for Rows {}
impl Rows {
/// Fetch the next row of this result set.
pub async fn next(&mut self) -> Result<Option<Row>> {
loop {
let mut stmt = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()))?;
match stmt.step() {
Ok(turso_core::StepResult::Row) => {
let row = stmt.row().unwrap();
return Ok(Some(Row {
values: row.get_values().map(|v| v.to_owned()).collect(),
}));
}
Ok(turso_core::StepResult::Done) => return Ok(None),
Ok(turso_core::StepResult::IO) => {
if let Err(e) = stmt.run_once() {
return Err(e.into());
}
continue;
}
Ok(turso_core::StepResult::Busy) => return Ok(None),
Ok(turso_core::StepResult::Interrupt) => return Ok(None),
_ => return Ok(None),
}
}
}
}
/// Query result row.
#[derive(Debug)]
pub struct Row {
values: Vec<turso_core::Value>,
}
unsafe impl Send for Row {}
unsafe impl Sync for Row {}
impl Row {
pub fn get_value(&self, index: usize) -> Result<Value> {
let value = &self.values[index];
match value {
turso_core::Value::Integer(i) => Ok(Value::Integer(*i)),
turso_core::Value::Null => Ok(Value::Null),
turso_core::Value::Float(f) => Ok(Value::Real(*f)),
turso_core::Value::Text(text) => Ok(Value::Text(text.to_string())),
turso_core::Value::Blob(items) => Ok(Value::Blob(items.to_vec())),
}
}
pub fn column_count(&self) -> usize {
self.values.len()
}
}
impl<'a> FromIterator<&'a turso_core::Value> for Row {
fn from_iter<T: IntoIterator<Item = &'a turso_core::Value>>(iter: T) -> Self {
let values = iter
.into_iter()
.map(|v| match v {
turso_core::Value::Integer(i) => turso_core::Value::Integer(*i),
turso_core::Value::Null => turso_core::Value::Null,
turso_core::Value::Float(f) => turso_core::Value::Float(*f),
turso_core::Value::Text(s) => turso_core::Value::Text(s.clone()),
turso_core::Value::Blob(b) => turso_core::Value::Blob(b.clone()),
})
.collect();
Row { values }
}
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(feature = "futures")]
use futures_util::TryStreamExt;
use tempfile::NamedTempFile;
#[cfg(not(feature = "futures"))]
macro_rules! rows_next {
($rows:expr) => {
$rows.next()
};
}
#[cfg(feature = "futures")]
macro_rules! rows_next {
($rows:expr) => {
$rows.try_next()
};
}
#[tokio::test]
async fn test_database_persistence() -> Result<()> {
let temp_file = NamedTempFile::new().unwrap();
@ -479,13 +298,13 @@ mod tests {
.query("SELECT name FROM test_persistence ORDER BY id;", ())
.await?;
let row1 = rows.next().await?.expect("Expected first row");
let row1 = rows_next!(rows).await?.expect("Expected first row");
assert_eq!(row1.get_value(0)?, Value::Text("Alice".to_string()));
let row2 = rows.next().await?.expect("Expected second row");
let row2 = rows_next!(rows).await?.expect("Expected second row");
assert_eq!(row2.get_value(0)?, Value::Text("Bob".to_string()));
assert!(rows.next().await?.is_none(), "Expected no more rows");
assert!(rows_next!(rows).await?.is_none(), "Expected no more rows");
Ok(())
}
@ -534,8 +353,7 @@ mod tests {
.await?;
for (i, value) in original_data.iter().enumerate().take(NUM_INSERTS) {
let row = rows
.next()
let row = rows_next!(rows)
.await?
.unwrap_or_else(|| panic!("Expected row {} but found None", i));
assert_eq!(
@ -547,7 +365,7 @@ mod tests {
}
assert!(
rows.next().await?.is_none(),
rows_next!(rows).await?.is_none(),
"Expected no more rows after retrieving all inserted data"
);
@ -604,9 +422,9 @@ mod tests {
let mut rows_iter = conn
.query("SELECT count(*) FROM test_persistence;", ())
.await?;
let rows = rows_iter.next().await?.unwrap();
let rows = rows_next!(rows_iter).await?.unwrap();
assert_eq!(rows.get_value(0)?, Value::Integer(i as i64 + 1));
assert!(rows_iter.next().await?.is_none());
assert!(rows_next!(rows_iter).await?.is_none());
}
}

View file

@ -9,14 +9,14 @@ mod sealed {
use sealed::Sealed;
/// Converts some type into parameters that can be passed
/// to libsql.
/// to Turso.
///
/// The trait is sealed and not designed to be implemented by hand
/// but instead provides a few ways to use it.
///
/// # Passing parameters to libsql
/// # Passing parameters to Turso
///
/// Many functions in this library let you pass parameters to libsql. Doing this
/// Many functions in this library let you pass parameters to Turso. Doing this
/// lets you avoid any risk of SQL injection, and is simpler than escaping
/// things manually. These functions generally contain some parameter that generically
/// accepts some implementation this trait.
@ -27,7 +27,7 @@ use sealed::Sealed;
///
/// - For heterogeneous parameter lists of 16 or less items a tuple syntax is supported
/// by doing `(1, "foo")`.
/// - For hetergeneous parameter lists of 16 or greater, the [`turso::params!`] is supported
/// - For hetergeneous parameter lists of 16 or greater, the [`crate::params!`] is supported
/// by doing `turso::params![1, "foo"]`.
/// - For homogeneous parameter types (where they are all the same type), const arrays are
/// supported by doing `[1, 2, 3]`.
@ -62,8 +62,8 @@ use sealed::Sealed;
///
/// - For heterogeneous parameter lists of 16 or less items a tuple syntax is supported
/// by doing `(("key1", 1), ("key2", "foo"))`.
/// - For hetergeneous parameter lists of 16 or greater, the [`turso::params!`] is supported
/// by doing `turso::named_params!["key1": 1, "key2": "foo"]`.
/// - For hetergeneous parameter lists of 16 or greater, the [`crate::named_params!`] is supported
/// by doing `turso::named_params!{"key1": 1, "key2": "foo"}`.
/// - For homogeneous parameter types (where they are all the same type), const arrays are
/// supported by doing `[("key1", 1), ("key2, 2), ("key3", 3)]`.
///
@ -77,7 +77,7 @@ use sealed::Sealed;
/// // Using a tuple:
/// stmt.execute(((":key1", 0), (":key2", "foobar"))).await?;
///
/// // Using `turso::named_params!`:
/// // Using `named_params!`:
/// stmt.execute(named_params! {":key1": 1i32, ":key2": "blah" }).await?;
///
/// // const array:
@ -106,7 +106,7 @@ pub enum Params {
/// # Example
///
/// ```rust
/// # use turso::{Connection, params_from_iter, Rows};
/// # use turso::{Connection, params_from_iter};
/// # async fn run(conn: &Connection) {
///
/// let iter = vec![1, 2, 3];

158
bindings/rust/src/row.rs Normal file
View file

@ -0,0 +1,158 @@
use std::sync::{Arc, Mutex};
use crate::Error;
use crate::Value;
/// Results of a prepared statement query.
#[must_use = "Rows is lazy and will do nothing unless consumed"]
pub struct Rows {
pub(crate) inner: Arc<Mutex<turso_core::Statement>>,
}
impl Clone for Rows {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
unsafe impl Send for Rows {}
unsafe impl Sync for Rows {}
#[cfg(not(feature = "futures"))]
impl Rows {
/// Fetch the next row of this result set.
pub async fn next(&mut self) -> crate::Result<Option<Row>> {
loop {
let mut stmt = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()))?;
match stmt.step().inspect_err(|_| stmt.reset())? {
turso_core::StepResult::Row => {
let row = stmt.row().unwrap();
return Ok(Some(Row {
values: row.get_values().map(|v| v.to_owned()).collect(),
}));
}
turso_core::StepResult::Done => {
stmt.reset();
return Ok(None);
}
turso_core::StepResult::IO => {
if let Err(e) = stmt.run_once() {
return Err(e.into());
}
continue;
}
// TODO: Busy should probably be an error
turso_core::StepResult::Busy | turso_core::StepResult::Interrupt => {
stmt.reset();
return Ok(None);
}
}
}
}
}
#[cfg(feature = "futures")]
impl futures_util::Stream for Rows {
type Item = crate::Result<Row>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
use std::task::Poll;
let stmt = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()));
if let Err(err) = stmt {
return Poll::Ready(Some(Err(err)));
}
let mut stmt = stmt.unwrap();
match stmt.step() {
Ok(step_result) => match step_result {
turso_core::StepResult::Row => {
let row = stmt.row().unwrap();
Poll::Ready(Some(Ok(Row {
values: row.get_values().map(|v| v.to_owned()).collect(),
})))
}
turso_core::StepResult::Done => {
stmt.reset();
Poll::Ready(None)
}
turso_core::StepResult::IO => {
if let Err(e) = stmt.run_once() {
return Poll::Ready(Some(Err(e.into())));
}
// TODO: see correct way to signal for this task to wake up
Poll::Pending
}
// TODO: Busy and Interrupt should probably return errors
turso_core::StepResult::Busy | turso_core::StepResult::Interrupt => {
stmt.reset();
Poll::Ready(None)
}
},
Err(err) => {
stmt.reset();
Poll::Ready(Some(Err(Error::from(err))))
}
}
}
}
/// Query result row.
#[derive(Debug)]
pub struct Row {
values: Vec<turso_core::Value>,
}
unsafe impl Send for Row {}
unsafe impl Sync for Row {}
impl Row {
pub fn get_value(&self, index: usize) -> crate::Result<Value> {
let value = &self.values[index];
match value {
turso_core::Value::Integer(i) => Ok(Value::Integer(*i)),
turso_core::Value::Null => Ok(Value::Null),
turso_core::Value::Float(f) => Ok(Value::Real(*f)),
turso_core::Value::Text(text) => Ok(Value::Text(text.to_string())),
turso_core::Value::Blob(items) => Ok(Value::Blob(items.to_vec())),
}
}
pub fn column_count(&self) -> usize {
self.values.len()
}
}
impl<'a> FromIterator<&'a turso_core::Value> for Row {
fn from_iter<T: IntoIterator<Item = &'a turso_core::Value>>(iter: T) -> Self {
let values = iter
.into_iter()
.map(|v| match v {
turso_core::Value::Integer(i) => turso_core::Value::Integer(*i),
turso_core::Value::Null => turso_core::Value::Null,
turso_core::Value::Float(f) => turso_core::Value::Float(*f),
turso_core::Value::Text(s) => turso_core::Value::Text(s.clone()),
turso_core::Value::Blob(b) => turso_core::Value::Blob(b.clone()),
})
.collect();
Row { values }
}
}
#[cfg(test)]
mod tests {
#[tokio::test]
async fn test_row() {}
}

View file

@ -0,0 +1,124 @@
use std::{
num::NonZero,
sync::{Arc, Mutex},
};
use crate::{
params::{self, IntoParams},
Column, Rows,
};
/// A prepared statement.
///
/// Statements when executed or queried are reset after they encounter an error or run to completion
pub struct Statement {
pub(crate) inner: Arc<Mutex<turso_core::Statement>>,
}
impl Clone for Statement {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
unsafe impl Send for Statement {}
unsafe impl Sync for Statement {}
impl Statement {
/// Query the database with this prepared statement.
pub async fn query(&mut self, params: impl IntoParams) -> crate::Result<Rows> {
let params = params.into_params()?;
match params {
params::Params::None => (),
params::Params::Positional(values) => {
for (i, value) in values.into_iter().enumerate() {
let mut stmt = self.inner.lock().unwrap();
stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
}
}
params::Params::Named(values) => {
for (name, value) in values.into_iter() {
let mut stmt = self.inner.lock().unwrap();
let i = stmt.parameters().index(name).unwrap();
stmt.bind_at(i, value.into());
}
}
}
#[allow(clippy::arc_with_non_send_sync)]
let rows = Rows {
inner: Arc::clone(&self.inner),
};
Ok(rows)
}
/// Execute this prepared statement.
pub async fn execute(&mut self, params: impl IntoParams) -> crate::Result<u64> {
{
// Reset the statement before executing
self.inner.lock().unwrap().reset();
}
let params = params.into_params()?;
match params {
params::Params::None => (),
params::Params::Positional(values) => {
for (i, value) in values.into_iter().enumerate() {
let mut stmt = self.inner.lock().unwrap();
stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
}
}
params::Params::Named(values) => {
for (name, value) in values.into_iter() {
let mut stmt = self.inner.lock().unwrap();
let i = stmt.parameters().index(name).unwrap();
stmt.bind_at(i, value.into());
}
}
}
loop {
let mut stmt = self.inner.lock().unwrap();
match stmt.step() {
Ok(turso_core::StepResult::Row) => {}
Ok(turso_core::StepResult::Done) => {
stmt.reset();
return Ok(0);
}
Ok(turso_core::StepResult::IO) => {
stmt.run_once()?;
}
Ok(turso_core::StepResult::Busy) => {
stmt.reset();
return Ok(4);
}
Ok(turso_core::StepResult::Interrupt) => {
stmt.reset();
return Ok(3);
}
Err(err) => {
stmt.reset();
return Err(err.into());
}
}
}
}
/// Returns columns of the result of this prepared statement.
pub fn columns(&self) -> Vec<Column> {
let stmt = self.inner.lock().unwrap();
let n = stmt.num_columns();
let mut cols = Vec::with_capacity(n);
for i in 0..n {
let name = stmt.get_column_name(i).into_owned();
cols.push(Column {
name,
decl_type: None, // TODO
});
}
cols
}
}

View file

@ -11,7 +11,7 @@ pub enum Value {
Blob(Vec<u8>),
}
/// The possible types a column can be in libsql.
/// The possible types a column can be in Turso.
#[derive(Debug, Copy, Clone)]
pub enum ValueType {
Integer = 1,

View file

@ -1,5 +1,21 @@
#[cfg(feature = "futures")]
use futures_util::TryStreamExt;
use turso::{Builder, Value};
#[cfg(not(feature = "futures"))]
macro_rules! rows_next {
($rows:expr) => {
$rows.next()
};
}
#[cfg(feature = "futures")]
macro_rules! rows_next {
($rows:expr) => {
$rows.try_next()
};
}
#[tokio::test]
async fn test_rows_next() {
let builder = Builder::new_local(":memory:");
@ -34,24 +50,49 @@ async fn test_rows_next() {
.unwrap();
let mut res = conn.query("SELECT * FROM test", ()).await.unwrap();
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
rows_next!(res)
.await
.unwrap()
.unwrap()
.get_value(0)
.unwrap(),
1.into()
);
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
rows_next!(res)
.await
.unwrap()
.unwrap()
.get_value(0)
.unwrap(),
2.into()
);
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
rows_next!(res)
.await
.unwrap()
.unwrap()
.get_value(0)
.unwrap(),
3.into()
);
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
rows_next!(res)
.await
.unwrap()
.unwrap()
.get_value(0)
.unwrap(),
4.into()
);
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
rows_next!(res)
.await
.unwrap()
.unwrap()
.get_value(0)
.unwrap(),
5.into()
);
assert!(res.next().await.unwrap().is_none());
assert!(rows_next!(res).await.unwrap().is_none());
}

View file

@ -15,6 +15,7 @@ use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
use turso::futures_util::TryStreamExt;
use turso::Builder;
pub struct Plan {
@ -522,7 +523,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
const INTEGRITY_CHECK_INTERVAL: usize = 100;
if query_index % INTEGRITY_CHECK_INTERVAL == 0 {
let mut res = conn.query("PRAGMA integrity_check", ()).await.unwrap();
if let Some(row) = res.next().await? {
if let Some(row) = res.try_next().await? {
let value = row.get_value(0).unwrap();
if value != "ok".into() {
panic!("integrity check failed: {:?}", value);