#![allow(clippy::arc_with_non_send_sync)] mod error; mod ext; mod fast_lock; mod function; mod functions; mod info; mod io; #[cfg(feature = "json")] mod json; pub mod mvcc; mod parameters; mod pragma; mod pseudo; pub mod result; mod schema; mod storage; mod translate; pub mod types; #[allow(dead_code)] mod util; mod vdbe; mod vector; mod vtab; #[cfg(feature = "fuzz")] pub mod numeric; #[cfg(not(feature = "fuzz"))] mod numeric; #[cfg(not(target_family = "wasm"))] #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use crate::vtab::VirtualTable; use crate::{fast_lock::SpinLock, translate::optimizer::optimize_plan}; use core::str; pub use error::LimboError; use fallible_iterator::FallibleIterator; pub use io::clock::{Clock, Instant}; #[cfg(all(feature = "fs", target_family = "unix"))] pub use io::UnixIO; #[cfg(all(feature = "fs", target_os = "linux", feature = "io_uring"))] pub use io::UringIO; pub use io::{ Buffer, Completion, File, MemoryIO, OpenFlags, PlatformIO, SyscallIO, WriteCompletion, IO, }; use limbo_sqlite3_parser::{ast, ast::Cmd, lexer::sql::Parser}; use parking_lot::RwLock; use schema::Schema; use std::{ borrow::Cow, cell::{Cell, RefCell, UnsafeCell}, collections::HashMap, fmt::Display, io::Write, num::NonZero, ops::Deref, rc::Rc, sync::{Arc, OnceLock}, }; use storage::btree::{btree_init_page, BTreePageInner}; #[cfg(feature = "fs")] use storage::database::DatabaseFile; pub use storage::pager::PagerCacheflushStatus; pub use storage::{ buffer_pool::BufferPool, database::DatabaseStorage, pager::PageRef, pager::{Page, Pager}, wal::{CheckpointMode, CheckpointResult, CheckpointStatus, Wal, WalFile, WalFileShared}, }; use storage::{ page_cache::DumbLruPageCache, pager::allocate_page, sqlite3_ondisk::{DatabaseHeader, DATABASE_HEADER_SIZE}, }; use tracing::{instrument, Level}; use translate::select::prepare_select_plan; pub use types::RefValue; pub use types::Value; use util::parse_schema_rows; use vdbe::builder::QueryMode; use vdbe::builder::TableRefIdCounter; pub type Result = std::result::Result; pub static DATABASE_VERSION: OnceLock = OnceLock::new(); #[derive(Clone, Copy, PartialEq, Eq)] enum TransactionState { Write, Read, None, } pub(crate) type MvStore = mvcc::MvStore; pub(crate) type MvCursor = mvcc::cursor::ScanCursor; pub struct Database { mv_store: Option>, schema: Arc>, // TODO: make header work without lock header: Arc>, db_file: Arc, io: Arc, page_size: u32, // Shared structures of a Database are the parts that are common to multiple threads that might // create DB connections. _shared_page_cache: Arc>, shared_wal: Arc>, open_flags: OpenFlags, } unsafe impl Send for Database {} unsafe impl Sync for Database {} impl Database { #[cfg(feature = "fs")] pub fn open_file(io: Arc, path: &str, enable_mvcc: bool) -> Result> { Self::open_file_with_flags(io, path, OpenFlags::default(), enable_mvcc) } #[cfg(feature = "fs")] pub fn open_file_with_flags( io: Arc, path: &str, flags: OpenFlags, enable_mvcc: bool, ) -> Result> { let file = io.open_file(path, flags, true)?; maybe_init_database_file(&file, &io)?; let db_file = Arc::new(DatabaseFile::new(file)); Self::open_with_flags(io, path, db_file, flags, enable_mvcc) } #[allow(clippy::arc_with_non_send_sync)] pub fn open( io: Arc, path: &str, db_file: Arc, enable_mvcc: bool, ) -> Result> { Self::open_with_flags(io, path, db_file, OpenFlags::default(), enable_mvcc) } #[allow(clippy::arc_with_non_send_sync)] pub fn open_with_flags( io: Arc, path: &str, db_file: Arc, flags: OpenFlags, enable_mvcc: bool, ) -> Result> { let db_header = Pager::begin_open(db_file.clone())?; // ensure db header is there io.run_once()?; let page_size = db_header.lock().get_page_size(); let wal_path = format!("{}-wal", path); let shared_wal = WalFileShared::open_shared(&io, wal_path.as_str(), page_size)?; DATABASE_VERSION.get_or_init(|| { let version = db_header.lock().version_number; version.to_string() }); let mv_store = if enable_mvcc { Some(Rc::new(MvStore::new( mvcc::LocalClock::new(), mvcc::persistent_storage::Storage::new_noop(), ))) } else { None }; let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::default())); let schema = Arc::new(RwLock::new(Schema::new())); let db = Database { mv_store, schema: schema.clone(), header: db_header.clone(), _shared_page_cache: shared_page_cache.clone(), shared_wal: shared_wal.clone(), db_file, io: io.clone(), page_size, open_flags: flags, }; let db = Arc::new(db); { // parse schema let conn = db.connect()?; let rows = conn.query("SELECT * FROM sqlite_schema")?; let mut schema = schema .try_write() .expect("lock on schema should succeed first try"); let syms = conn.syms.borrow(); if let Err(LimboError::ExtensionError(e)) = parse_schema_rows(rows, &mut schema, io, &syms, None) { // this means that a vtab exists and we no longer have the module loaded. we print // a warning to the user to load the module eprintln!("Warning: {}", e); } } Ok(db) } pub fn connect(self: &Arc) -> Result> { let buffer_pool = Rc::new(BufferPool::new(self.page_size as usize)); let wal = Rc::new(RefCell::new(WalFile::new( self.io.clone(), self.page_size, self.shared_wal.clone(), buffer_pool.clone(), ))); // For now let's open database without shared cache by default. let pager = Rc::new(Pager::finish_open( self.header.clone(), self.db_file.clone(), wal, self.io.clone(), Arc::new(RwLock::new(DumbLruPageCache::default())), buffer_pool, )?); let conn = Arc::new(Connection { _db: self.clone(), pager: pager.clone(), schema: self.schema.clone(), header: self.header.clone(), last_insert_rowid: Cell::new(0), auto_commit: Cell::new(true), mv_transactions: RefCell::new(Vec::new()), transaction_state: Cell::new(TransactionState::None), last_change: Cell::new(0), syms: RefCell::new(SymbolTable::new()), total_changes: Cell::new(0), _shared_cache: false, cache_size: Cell::new(self.header.lock().default_page_cache_size), }); if let Err(e) = conn.register_builtins() { return Err(LimboError::ExtensionError(e)); } Ok(conn) } /// Open a new database file with a specified VFS without an existing database /// connection and symbol table to register extensions. #[cfg(feature = "fs")] #[allow(clippy::arc_with_non_send_sync)] pub fn open_new(path: &str, vfs: &str) -> Result<(Arc, Arc)> { let vfsmods = ext::add_builtin_vfs_extensions(None)?; let io: Arc = match vfsmods.iter().find(|v| v.0 == vfs).map(|v| v.1.clone()) { Some(vfs) => vfs, None => match vfs.trim() { "memory" => Arc::new(MemoryIO::new()), "syscall" => Arc::new(SyscallIO::new()?), #[cfg(all(target_os = "linux", feature = "io_uring"))] "io_uring" => Arc::new(UringIO::new()?), other => { return Err(LimboError::InvalidArgument(format!( "no such VFS: {}", other ))); } }, }; let db = Self::open_file(io.clone(), path, false)?; Ok((io, db)) } } pub fn maybe_init_database_file(file: &Arc, io: &Arc) -> Result<()> { if file.size()? == 0 { // init db let db_header = DatabaseHeader::default(); let page1 = allocate_page( 1, &Rc::new(BufferPool::new(db_header.get_page_size() as usize)), DATABASE_HEADER_SIZE, ); let page1 = Arc::new(BTreePageInner { page: RefCell::new(page1), }); { // Create the sqlite_schema table, for this we just need to create the btree page // for the first page of the database which is basically like any other btree page // but with a 100 byte offset, so we just init the page so that sqlite understands // this is a correct page. btree_init_page( &page1, storage::sqlite3_ondisk::PageType::TableLeaf, DATABASE_HEADER_SIZE, (db_header.get_page_size() - db_header.reserved_space as u32) as u16, ); let page1 = page1.get(); let contents = page1.get().contents.as_mut().unwrap(); contents.write_database_header(&db_header); // write the first page to disk synchronously let flag_complete = Rc::new(RefCell::new(false)); { let flag_complete = flag_complete.clone(); let completion = Completion::Write(WriteCompletion::new(Box::new(move |_| { *flag_complete.borrow_mut() = true; }))); #[allow(clippy::arc_with_non_send_sync)] file.pwrite(0, contents.buffer.clone(), Arc::new(completion))?; } let mut limit = 100; loop { io.run_once()?; if *flag_complete.borrow() { break; } limit -= 1; if limit == 0 { panic!("Database file couldn't be initialized, io loop run for {} iterations and write didn't finish", limit); } } } }; Ok(()) } pub struct Connection { _db: Arc, pager: Rc, schema: Arc>, header: Arc>, auto_commit: Cell, mv_transactions: RefCell>, transaction_state: Cell, last_insert_rowid: Cell, last_change: Cell, total_changes: Cell, syms: RefCell, _shared_cache: bool, cache_size: Cell, } impl Connection { #[instrument(skip_all, level = Level::TRACE)] pub fn prepare(self: &Arc, sql: impl AsRef) -> Result { if sql.as_ref().is_empty() { return Err(LimboError::InvalidArgument( "The supplied SQL string contains no statements".to_string(), )); } let sql = sql.as_ref(); tracing::trace!("Preparing: {}", sql); let mut parser = Parser::new(sql.as_bytes()); let cmd = parser.next()?; let syms = self.syms.borrow(); let cmd = cmd.expect("Successful parse on nonempty input string should produce a command"); let byte_offset_end = parser.offset(); let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end]) .unwrap() .trim(); match cmd { Cmd::Stmt(stmt) => { let program = Rc::new(translate::translate( self.schema .try_read() .ok_or(LimboError::SchemaLocked)? .deref(), stmt, self.header.clone(), self.pager.clone(), self.clone(), &syms, QueryMode::Normal, &input, )?); Ok(Statement::new( program, self._db.mv_store.clone(), self.pager.clone(), )) } Cmd::Explain(_stmt) => todo!(), Cmd::ExplainQueryPlan(_stmt) => todo!(), } } #[instrument(skip_all, level = Level::TRACE)] pub fn query(self: &Arc, sql: impl AsRef) -> Result> { let sql = sql.as_ref(); tracing::trace!("Querying: {}", sql); let mut parser = Parser::new(sql.as_bytes()); let cmd = parser.next()?; let byte_offset_end = parser.offset(); let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end]) .unwrap() .trim(); match cmd { Some(cmd) => self.run_cmd(cmd, input), None => Ok(None), } } #[instrument(skip_all, level = Level::TRACE)] pub(crate) fn run_cmd( self: &Arc, cmd: Cmd, input: &str, ) -> Result> { let syms = self.syms.borrow(); match cmd { Cmd::Stmt(ref stmt) | Cmd::Explain(ref stmt) => { let program = translate::translate( self.schema .try_read() .ok_or(LimboError::SchemaLocked)? .deref(), stmt.clone(), self.header.clone(), self.pager.clone(), self.clone(), &syms, cmd.into(), input, )?; let stmt = Statement::new( program.into(), self._db.mv_store.clone(), self.pager.clone(), ); Ok(Some(stmt)) } Cmd::ExplainQueryPlan(stmt) => { let mut table_ref_counter = TableRefIdCounter::new(); match stmt { ast::Stmt::Select(select) => { let mut plan = prepare_select_plan( self.schema .try_read() .ok_or(LimboError::SchemaLocked)? .deref(), *select, &syms, &[], &mut table_ref_counter, translate::plan::QueryDestination::ResultRows, )?; optimize_plan( &mut plan, self.schema .try_read() .ok_or(LimboError::SchemaLocked)? .deref(), )?; let _ = std::io::stdout().write_all(plan.to_string().as_bytes()); } _ => todo!(), } Ok(None) } } } pub fn query_runner<'a>(self: &'a Arc, sql: &'a [u8]) -> QueryRunner<'a> { QueryRunner::new(self, sql) } /// Execute will run a query from start to finish taking ownership of I/O because it will run pending I/Os if it didn't finish. /// TODO: make this api async #[instrument(skip_all, level = Level::TRACE)] pub fn execute(self: &Arc, sql: impl AsRef) -> Result<()> { let sql = sql.as_ref(); let mut parser = Parser::new(sql.as_bytes()); let cmd = parser.next()?; let syms = self.syms.borrow(); let byte_offset_end = parser.offset(); let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end]) .unwrap() .trim(); if let Some(cmd) = cmd { match cmd { Cmd::Explain(stmt) => { let program = translate::translate( self.schema .try_read() .ok_or(LimboError::SchemaLocked)? .deref(), stmt, self.header.clone(), self.pager.clone(), self.clone(), &syms, QueryMode::Explain, &input, )?; let _ = std::io::stdout().write_all(program.explain().as_bytes()); } Cmd::ExplainQueryPlan(_stmt) => todo!(), Cmd::Stmt(stmt) => { let program = translate::translate( self.schema .try_read() .ok_or(LimboError::SchemaLocked)? .deref(), stmt, self.header.clone(), self.pager.clone(), self.clone(), &syms, QueryMode::Normal, &input, )?; let mut state = vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len()); loop { let res = program.step( &mut state, self._db.mv_store.clone(), self.pager.clone(), )?; if matches!(res, StepResult::Done) { break; } self._db.io.run_once()?; } } } } Ok(()) } pub fn wal_frame_count(&self) -> Result { self.pager.wal_frame_count() } pub fn wal_get_frame( &self, frame_no: u32, p_frame: *mut u8, frame_len: u32, ) -> Result> { self.pager.wal_get_frame(frame_no, p_frame, frame_len) } /// Flush dirty pages to disk. /// This will write the dirty pages to the WAL and then fsync the WAL. /// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to /// the database file and then fsync the database file. pub fn cacheflush(&self) -> Result { self.pager.cacheflush() } pub fn clear_page_cache(&self) -> Result<()> { self.pager.clear_page_cache(); Ok(()) } pub fn checkpoint(&self) -> Result { let checkpoint_result = self.pager.wal_checkpoint(); Ok(checkpoint_result) } /// Close a connection and checkpoint. pub fn close(&self) -> Result<()> { self.pager.checkpoint_shutdown() } pub fn last_insert_rowid(&self) -> i64 { self.last_insert_rowid.get() } fn update_last_rowid(&self, rowid: i64) { self.last_insert_rowid.set(rowid); } pub fn set_changes(&self, nchange: i64) { self.last_change.set(nchange); let prev_total_changes = self.total_changes.get(); self.total_changes.set(prev_total_changes + nchange); } pub fn total_changes(&self) -> i64 { self.total_changes.get() } pub fn get_cache_size(&self) -> i32 { self.cache_size.get() } pub fn set_cache_size(&self, size: i32) { self.cache_size.set(size); } #[cfg(feature = "fs")] pub fn open_new(&self, path: &str, vfs: &str) -> Result<(Arc, Arc)> { Database::open_with_vfs(&self._db, path, vfs) } pub fn list_vfs(&self) -> Vec { let mut all_vfs = vec![String::from("memory")]; #[cfg(feature = "fs")] { #[cfg(target_family = "unix")] { all_vfs.push("syscall".to_string()); } #[cfg(all(target_os = "linux", feature = "io_uring"))] { all_vfs.push("io_uring".to_string()); } all_vfs.extend(crate::ext::list_vfs_modules()); } all_vfs } pub fn get_auto_commit(&self) -> bool { self.auto_commit.get() } pub fn parse_schema_rows(self: &Arc) -> Result<()> { let rows = self.query("SELECT * FROM sqlite_schema")?; let mut schema = self .schema .try_write() .expect("lock on schema should succeed first try"); { let syms = self.syms.borrow(); if let Err(LimboError::ExtensionError(e)) = parse_schema_rows(rows, &mut schema, self.pager.io.clone(), &syms, None) { // this means that a vtab exists and we no longer have the module loaded. we print // a warning to the user to load the module eprintln!("Warning: {}", e); } } Ok(()) } // Clearly there is something to improve here, Vec> isn't a couple of tea /// Query the current rows/values of `pragma_name`. pub fn pragma_query(self: &Arc, pragma_name: &str) -> Result>> { let pragma = format!("PRAGMA {}", pragma_name); let mut stmt = self.prepare(pragma)?; let mut results = Vec::new(); loop { match stmt.step()? { vdbe::StepResult::Row => { let row: Vec = stmt .row() .unwrap() .get_values() .map(|v| v.clone()) .collect(); results.push(row); } vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => { return Err(LimboError::Busy); } _ => break, } } Ok(results) } /// Set a new value to `pragma_name`. /// /// Some pragmas will return the updated value which cannot be retrieved /// with this method. pub fn pragma_update( self: &Arc, pragma_name: &str, pragma_value: V, ) -> Result>> { let pragma = format!("PRAGMA {} = {}", pragma_name, pragma_value); let mut stmt = self.prepare(pragma)?; let mut results = Vec::new(); loop { match stmt.step()? { vdbe::StepResult::Row => { let row: Vec = stmt .row() .unwrap() .get_values() .map(|v| v.clone()) .collect(); results.push(row); } vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => { return Err(LimboError::Busy); } _ => break, } } Ok(results) } /// Query the current value(s) of `pragma_name` associated to /// `pragma_value`. /// /// This method can be used with query-only pragmas which need an argument /// (e.g. `table_info('one_tbl')`) or pragmas which returns value(s) /// (e.g. `integrity_check`). pub fn pragma( self: &Arc, pragma_name: &str, pragma_value: V, ) -> Result>> { let pragma = format!("PRAGMA {}({})", pragma_name, pragma_value); let mut stmt = self.prepare(pragma)?; let mut results = Vec::new(); loop { match stmt.step()? { vdbe::StepResult::Row => { let row: Vec = stmt .row() .unwrap() .get_values() .map(|v| v.clone()) .collect(); results.push(row); } vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => { return Err(LimboError::Busy); } _ => break, } } Ok(results) } } pub struct Statement { program: Rc, state: vdbe::ProgramState, mv_store: Option>, pager: Rc, } impl Statement { pub fn new( program: Rc, mv_store: Option>, pager: Rc, ) -> Self { let state = vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len()); Self { program, state, mv_store, pager, } } pub fn set_mv_tx_id(&mut self, mv_tx_id: Option) { self.state.mv_tx_id = mv_tx_id; } pub fn interrupt(&mut self) { self.state.interrupt(); } pub fn step(&mut self) -> Result { self.program .step(&mut self.state, self.mv_store.clone(), self.pager.clone()) } pub fn run_once(&self) -> Result<()> { self.pager.io.run_once() } pub fn num_columns(&self) -> usize { self.program.result_columns.len() } pub fn get_column_name(&self, idx: usize) -> Cow { let column = &self.program.result_columns.get(idx).expect("No column"); match column.name(&self.program.table_references) { Some(name) => Cow::Borrowed(name), None => Cow::Owned(column.expr.to_string()), } } pub fn parameters(&self) -> ¶meters::Parameters { &self.program.parameters } pub fn parameters_count(&self) -> usize { self.program.parameters.count() } pub fn bind_at(&mut self, index: NonZero, value: Value) { self.state.bind_at(index, value); } pub fn reset(&mut self) { self.state.reset(); } pub fn row(&self) -> Option<&Row> { self.state.result_row.as_ref() } pub fn explain(&self) -> String { self.program.explain() } } pub type Row = vdbe::Row; pub type StepResult = vdbe::StepResult; pub struct SymbolTable { pub functions: HashMap>, pub vtabs: HashMap>, pub vtab_modules: HashMap>, } impl std::fmt::Debug for SymbolTable { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SymbolTable") .field("functions", &self.functions) .finish() } } fn is_shared_library(path: &std::path::Path) -> bool { path.extension() .map_or(false, |ext| ext == "so" || ext == "dylib" || ext == "dll") } pub fn resolve_ext_path(extpath: &str) -> Result { let path = std::path::Path::new(extpath); if !path.exists() { if is_shared_library(path) { return Err(LimboError::ExtensionError(format!( "Extension file not found: {}", extpath ))); }; let maybe = path.with_extension(std::env::consts::DLL_EXTENSION); maybe .exists() .then_some(maybe) .ok_or(LimboError::ExtensionError(format!( "Extension file not found: {}", extpath ))) } else { Ok(path.to_path_buf()) } } impl SymbolTable { pub fn new() -> Self { Self { functions: HashMap::new(), vtabs: HashMap::new(), vtab_modules: HashMap::new(), } } pub fn resolve_function( &self, name: &str, _arg_count: usize, ) -> Option> { self.functions.get(name).cloned() } } pub struct QueryRunner<'a> { parser: Parser<'a>, conn: &'a Arc, statements: &'a [u8], last_offset: usize, } impl<'a> QueryRunner<'a> { pub(crate) fn new(conn: &'a Arc, statements: &'a [u8]) -> Self { Self { parser: Parser::new(statements), conn, statements, last_offset: 0, } } } impl Iterator for QueryRunner<'_> { type Item = Result>; fn next(&mut self) -> Option { match self.parser.next() { Ok(Some(cmd)) => { let byte_offset_end = self.parser.offset(); let input = str::from_utf8(&self.statements[self.last_offset..byte_offset_end]) .unwrap() .trim(); self.last_offset = byte_offset_end; Some(self.conn.run_cmd(cmd, &input)) } Ok(None) => None, Err(err) => { self.parser.finalize(); Some(Result::Err(LimboError::from(err))) } } } }