use limbo_core::{Connection, Database, PagerCacheflushStatus, IO}; use rand::{rng, RngCore}; use rusqlite::params; use std::path::{Path, PathBuf}; use std::sync::Arc; use tempfile::TempDir; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; #[allow(dead_code)] pub struct TempDatabase { pub path: PathBuf, pub io: Arc, } unsafe impl Send for TempDatabase {} #[allow(dead_code, clippy::arc_with_non_send_sync)] impl TempDatabase { pub fn new_empty() -> Self { Self::new(&format!("test-{}.db", rng().next_u32())) } pub fn new(db_name: &str) -> Self { let mut path = TempDir::new().unwrap().keep(); path.push(db_name); let io: Arc = Arc::new(limbo_core::PlatformIO::new().unwrap()); Self { path, io } } pub fn new_with_existent(db_path: &Path) -> Self { let io: Arc = Arc::new(limbo_core::PlatformIO::new().unwrap()); Self { path: db_path.to_path_buf(), io, } } pub fn new_with_rusqlite(table_sql: &str) -> Self { let _ = tracing_subscriber::fmt() .with_max_level(tracing::Level::TRACE) .finish(); let mut path = TempDir::new().unwrap().keep(); path.push("test.db"); { let connection = rusqlite::Connection::open(&path).unwrap(); connection .pragma_update(None, "journal_mode", "wal") .unwrap(); connection.execute(table_sql, ()).unwrap(); } let io: Arc = Arc::new(limbo_core::PlatformIO::new().unwrap()); Self { path, io } } pub fn connect_limbo(&self) -> Arc { Self::connect_limbo_with_flags(&self, limbo_core::OpenFlags::default()) } pub fn connect_limbo_with_flags( &self, flags: limbo_core::OpenFlags, ) -> Arc { log::debug!("conneting to limbo"); let db = Database::open_file_with_flags( self.io.clone(), self.path.to_str().unwrap(), flags, false, ) .unwrap(); let conn = db.connect().unwrap(); log::debug!("connected to limbo"); conn } pub fn limbo_database(&self) -> Arc { log::debug!("conneting to limbo"); Database::open_file(self.io.clone(), self.path.to_str().unwrap(), false).unwrap() } } pub(crate) fn do_flush(conn: &Arc, tmp_db: &TempDatabase) -> anyhow::Result<()> { loop { match conn.cacheflush()? { PagerCacheflushStatus::Done(_) => { break; } PagerCacheflushStatus::IO => { tmp_db.io.run_once()?; } } } Ok(()) } pub(crate) fn compare_string(a: impl AsRef, b: impl AsRef) { let a = a.as_ref(); let b = b.as_ref(); assert_eq!(a.len(), b.len(), "Strings are not equal in size!"); let a = a.as_bytes(); let b = b.as_bytes(); let len = a.len(); for i in 0..len { if a[i] != b[i] { println!( "Bytes differ \n\t at index: dec -> {} hex -> {:#02x} \n\t values dec -> {}!={} hex -> {:#02x}!={:#02x}", i, i, a[i], b[i], a[i], b[i] ); break; } } } pub fn maybe_setup_tracing() { let _ = tracing_subscriber::registry() .with( tracing_subscriber::fmt::layer() .with_ansi(false) .with_line_number(true) .with_thread_ids(true), ) .with(EnvFilter::from_default_env()) .try_init(); } pub(crate) fn sqlite_exec_rows( conn: &rusqlite::Connection, query: &str, ) -> Vec> { let mut stmt = conn.prepare(&query).unwrap(); let mut rows = stmt.query(params![]).unwrap(); let mut results = Vec::new(); while let Some(row) = rows.next().unwrap() { let mut result = Vec::new(); for i in 0.. { let column: rusqlite::types::Value = match row.get(i) { Ok(column) => column, Err(rusqlite::Error::InvalidColumnIndex(_)) => break, Err(err) => panic!("unexpected rusqlite error: {}", err), }; result.push(column); } results.push(result) } results } pub(crate) fn limbo_exec_rows( db: &TempDatabase, conn: &Arc, query: &str, ) -> Vec> { let mut stmt = conn.prepare(query).unwrap(); let mut rows = Vec::new(); 'outer: loop { let row = loop { let result = stmt.step().unwrap(); match result { limbo_core::StepResult::Row => { let row = stmt.row().unwrap(); break row; } limbo_core::StepResult::IO => { db.io.run_once().unwrap(); continue; } limbo_core::StepResult::Done => break 'outer, r => panic!("unexpected result {:?}: expecting single row", r), } }; let row = row .get_values() .map(|x| match x { limbo_core::Value::Null => rusqlite::types::Value::Null, limbo_core::Value::Integer(x) => rusqlite::types::Value::Integer(*x), limbo_core::Value::Float(x) => rusqlite::types::Value::Real(*x), limbo_core::Value::Text(x) => rusqlite::types::Value::Text(x.as_str().to_string()), limbo_core::Value::Blob(x) => rusqlite::types::Value::Blob(x.to_vec()), }) .collect(); rows.push(row); } rows } pub(crate) fn limbo_exec_rows_error( db: &TempDatabase, conn: &Arc, query: &str, ) -> limbo_core::Result<()> { let mut stmt = conn.prepare(query)?; loop { let result = stmt.step()?; match result { limbo_core::StepResult::IO => { db.io.run_once()?; continue; } limbo_core::StepResult::Done => return Ok(()), r => panic!("unexpected result {:?}: expecting single row", r), } } } #[cfg(test)] mod tests { use std::vec; use rand::Rng; use tempfile::TempDir; use super::{limbo_exec_rows, limbo_exec_rows_error, TempDatabase}; use rusqlite::types::Value; #[test] fn test_statement_columns() -> anyhow::Result<()> { let _ = env_logger::try_init(); let tmp_db = TempDatabase::new_with_rusqlite( "create table test (foo integer, bar integer, baz integer);", ); let conn = tmp_db.connect_limbo(); let stmt = conn.prepare("select * from test;")?; let columns = stmt.num_columns(); assert_eq!(columns, 3); assert_eq!(stmt.get_column_name(0), "foo"); assert_eq!(stmt.get_column_name(1), "bar"); assert_eq!(stmt.get_column_name(2), "baz"); let stmt = conn.prepare("select foo, bar from test;")?; let columns = stmt.num_columns(); assert_eq!(columns, 2); assert_eq!(stmt.get_column_name(0), "foo"); assert_eq!(stmt.get_column_name(1), "bar"); let stmt = conn.prepare("delete from test;")?; let columns = stmt.num_columns(); assert_eq!(columns, 0); let stmt = conn.prepare("insert into test (foo, bar, baz) values (1, 2, 3);")?; let columns = stmt.num_columns(); assert_eq!(columns, 0); let stmt = conn.prepare("delete from test where foo = 1")?; let columns = stmt.num_columns(); assert_eq!(columns, 0); Ok(()) } #[test] fn test_limbo_open_read_only() -> anyhow::Result<()> { let path = TempDir::new().unwrap().keep().join("temp_read_only"); let db = TempDatabase::new_with_existent(&path); { let conn = db.connect_limbo(); let ret = limbo_exec_rows(&db, &conn, "CREATE table t(a)"); assert!(ret.is_empty(), "{:?}", ret); limbo_exec_rows(&db, &conn, "INSERT INTO t values (1)"); conn.close().unwrap() } { let conn = db.connect_limbo_with_flags( limbo_core::OpenFlags::default() | limbo_core::OpenFlags::ReadOnly, ); let ret = limbo_exec_rows(&db, &conn, "SELECT * from t"); assert_eq!(ret, vec![vec![Value::Integer(1)]]); let err = limbo_exec_rows_error(&db, &conn, "INSERT INTO t values (1)").unwrap_err(); assert!(matches!(err, limbo_core::LimboError::ReadOnly), "{:?}", err); } Ok(()) } #[test] fn test_unique_index_ordering() -> anyhow::Result<()> { let db = TempDatabase::new_empty(); let conn = db.connect_limbo(); let _ = limbo_exec_rows(&db, &conn, "CREATE TABLE t(x INTEGER UNIQUE)"); // Insert 100 random integers between -1000 and 1000 let mut expected = Vec::new(); let mut rng = rand::rng(); let mut i = 0; while i < 100 { let val = rng.random_range(-1000..1000); if expected.contains(&val) { continue; } i += 1; expected.push(val); let ret = limbo_exec_rows(&db, &conn, &format!("INSERT INTO t VALUES ({})", val)); assert!(ret.is_empty(), "Insert failed for value {}: {:?}", val, ret); } // Sort expected values to match index order expected.sort(); // Query all values and verify they come back in sorted order let ret = limbo_exec_rows(&db, &conn, "SELECT x FROM t"); let actual: Vec = ret .into_iter() .map(|row| match &row[0] { Value::Integer(i) => *i, _ => panic!("Expected integer value"), }) .collect(); assert_eq!(actual, expected, "Values not returned in sorted order"); Ok(()) } #[test] fn test_large_unique_blobs() -> anyhow::Result<()> { let path = TempDir::new().unwrap().keep().join("temp_read_only"); let db = TempDatabase::new_with_existent(&path); let conn = db.connect_limbo(); let _ = limbo_exec_rows(&db, &conn, "CREATE TABLE t(x BLOB UNIQUE)"); // Insert 11 unique 1MB blobs for i in 0..11 { println!("Inserting blob #{}", i); let ret = limbo_exec_rows(&db, &conn, "INSERT INTO t VALUES (randomblob(1024*1024))"); assert!(ret.is_empty(), "Insert #{} failed: {:?}", i, ret); } // Verify we have 11 rows let ret = limbo_exec_rows(&db, &conn, "SELECT count(*) FROM t"); assert_eq!( ret, vec![vec![Value::Integer(11)]], "Expected 11 rows but got {:?}", ret ); Ok(()) } }