Merge 'Switch Connection to use Arc instead of Rc' from Pekka Enberg

Connection needs to be Arc so that bindings can wrap it with `Mutex` for
multi-threading.

Closes #1749
This commit is contained in:
Pekka Enberg 2025-06-16 11:02:32 +03:00
commit 2b51fd69e8
32 changed files with 124 additions and 187 deletions

View file

@ -5,7 +5,6 @@ mod types;
use limbo_core::{Connection, Database, LimboError, IO};
use std::{
ffi::{c_char, c_void},
rc::Rc,
sync::Arc,
};
@ -40,13 +39,13 @@ pub unsafe extern "C" fn db_open(path: *const c_char) -> *mut c_void {
#[allow(dead_code)]
struct LimboConn {
conn: Rc<Connection>,
conn: Arc<Connection>,
io: Arc<dyn limbo_core::IO>,
err: Option<LimboError>,
}
impl LimboConn {
fn new(conn: Rc<Connection>, io: Arc<dyn limbo_core::IO>) -> Self {
fn new(conn: Arc<Connection>, io: Arc<dyn limbo_core::IO>) -> Self {
LimboConn {
conn,
io,

View file

@ -8,19 +8,16 @@ use jni::objects::{JByteArray, JObject};
use jni::sys::jlong;
use jni::JNIEnv;
use limbo_core::Connection;
use std::rc::Rc;
use std::sync::Arc;
#[derive(Clone)]
pub struct LimboConnection {
// Because java's LimboConnection is 1:1 mapped to limbo connection, we can use Rc
pub(crate) conn: Rc<Connection>,
// Because io is shared across multiple `LimboConnection`s, wrap it with Arc
pub(crate) conn: Arc<Connection>,
pub(crate) io: Arc<dyn limbo_core::IO>,
}
impl LimboConnection {
pub fn new(conn: Rc<Connection>, io: Arc<dyn limbo_core::IO>) -> Self {
pub fn new(conn: Arc<Connection>, io: Arc<dyn limbo_core::IO>) -> Self {
LimboConnection { conn, io }
}

View file

@ -41,7 +41,7 @@ pub struct Database {
#[napi(writable = false)]
pub name: String,
_db: Arc<limbo_core::Database>,
conn: Rc<limbo_core::Connection>,
conn: Arc<limbo_core::Connection>,
io: Arc<dyn limbo_core::IO>,
}

View file

@ -228,7 +228,7 @@ fn stmt_is_ddl(sql: &str) -> bool {
#[pyclass(unsendable)]
#[derive(Clone)]
pub struct Connection {
conn: Rc<limbo_core::Connection>,
conn: Arc<limbo_core::Connection>,
io: Arc<dyn limbo_core::IO>,
}
@ -310,13 +310,13 @@ pub fn connect(path: &str) -> Result<Connection> {
":memory:" => {
let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::MemoryIO::new());
let db = open_or(io.clone(), path)?;
let conn: Rc<limbo_core::Connection> = db.connect().unwrap();
let conn: Arc<limbo_core::Connection> = db.connect().unwrap();
Ok(Connection { conn, io })
}
path => {
let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::PlatformIO::new()?);
let db = open_or(io.clone(), path)?;
let conn: Rc<limbo_core::Connection> = db.connect().unwrap();
let conn: Arc<limbo_core::Connection> = db.connect().unwrap();
Ok(Connection { conn, io })
}
}

View file

@ -8,7 +8,6 @@ pub use params::params_from_iter;
use crate::params::*;
use std::fmt::Debug;
use std::num::NonZero;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
#[derive(Debug, thiserror::Error)]
@ -84,7 +83,7 @@ impl Database {
}
pub struct Connection {
inner: Arc<Mutex<Rc<limbo_core::Connection>>>,
inner: Arc<Mutex<Arc<limbo_core::Connection>>>,
}
impl Clone for Connection {

View file

@ -1,14 +1,13 @@
use js_sys::{Array, Object};
use limbo_core::{maybe_init_database_file, Clock, Instant, OpenFlags, Result};
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use wasm_bindgen::prelude::*;
#[allow(dead_code)]
#[wasm_bindgen]
pub struct Database {
db: Arc<limbo_core::Database>,
conn: Rc<limbo_core::Connection>,
conn: Arc<limbo_core::Connection>,
}
#[allow(clippy::arc_with_non_send_sync)]

View file

@ -21,7 +21,6 @@ use std::{
fmt,
io::{self, BufRead as _, Write},
path::PathBuf,
rc::Rc,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
@ -68,7 +67,7 @@ pub struct Limbo {
pub prompt: String,
io: Arc<dyn limbo_core::IO>,
writer: Box<dyn Write>,
conn: Rc<limbo_core::Connection>,
conn: Arc<limbo_core::Connection>,
pub interrupt_count: Arc<AtomicUsize>,
input_buff: String,
opts: Settings,

View file

@ -1,7 +1,7 @@
use clap::Args;
use clap_complete::{ArgValueCompleter, PathCompleter};
use limbo_core::Connection;
use std::{fs::File, io::Write, path::PathBuf, rc::Rc, sync::Arc};
use std::{fs::File, io::Write, path::PathBuf, sync::Arc};
#[derive(Debug, Clone, Args)]
pub struct ImportArgs {
@ -20,14 +20,14 @@ pub struct ImportArgs {
}
pub struct ImportFile<'a> {
conn: Rc<Connection>,
conn: Arc<Connection>,
io: Arc<dyn limbo_core::IO>,
writer: &'a mut dyn Write,
}
impl<'a> ImportFile<'a> {
pub fn new(
conn: Rc<Connection>,
conn: Arc<Connection>,
io: Arc<dyn limbo_core::IO>,
writer: &'a mut dyn Write,
) -> Self {

View file

@ -8,7 +8,6 @@ use rustyline::{Completer, Helper, Hinter, Validator};
use shlex::Shlex;
use std::cell::RefCell;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc;
use std::{ffi::OsString, path::PathBuf, str::FromStr as _};
use syntect::dumps::from_uncompressed_data;
@ -42,7 +41,7 @@ pub struct LimboHelper {
impl LimboHelper {
pub fn new(
conn: Rc<Connection>,
conn: Arc<Connection>,
io: Arc<dyn limbo_core::IO>,
syntax_config: Option<HighlightConfig>,
) -> Self {
@ -141,7 +140,7 @@ impl Highlighter for LimboHelper {
}
pub struct SqlCompleter<C: Parser + Send + Sync + 'static> {
conn: Rc<Connection>,
conn: Arc<Connection>,
io: Arc<dyn limbo_core::IO>,
// Has to be a ref cell as Rustyline takes immutable reference to self
// This problem would be solved with Reedline as it uses &mut self for completions
@ -150,7 +149,7 @@ pub struct SqlCompleter<C: Parser + Send + Sync + 'static> {
}
impl<C: Parser + Send + Sync + 'static> SqlCompleter<C> {
pub fn new(conn: Rc<Connection>, io: Arc<dyn limbo_core::IO>) -> Self {
pub fn new(conn: Arc<Connection>, io: Arc<dyn limbo_core::IO>) -> Self {
Self {
conn,
io,

View file

@ -6,7 +6,6 @@ use libloading::{Library, Symbol};
use limbo_ext::{ExtensionApi, ExtensionApiRef, ExtensionEntryPoint, ResultCode, VfsImpl};
use std::{
ffi::{c_char, CString},
rc::Rc,
sync::{Arc, Mutex, OnceLock},
};
@ -31,7 +30,7 @@ unsafe impl Sync for VfsMod {}
impl Connection {
pub fn load_extension<P: AsRef<std::ffi::OsStr>>(
self: &Rc<Connection>,
self: &Arc<Connection>,
path: P,
) -> crate::Result<()> {
use limbo_ext::ExtensionApiRef;

View file

@ -5,7 +5,7 @@ use std::{
ffi::{c_char, c_void, CStr, CString},
num::NonZeroUsize,
ptr,
rc::Weak,
sync::Arc,
};
/// Free memory for the internal context of the connection.
@ -17,7 +17,7 @@ pub unsafe extern "C" fn close(ctx: *mut c_void) {
}
// only free the memory for the boxed connection, we don't upgrade
// or actually close the core connection, as we were 'sharing' it.
let _ = Box::from_raw(ctx as *mut Weak<Connection>);
let _ = Box::from_raw(ctx as *mut Arc<Connection>);
}
/// Wrapper around core Connection::execute with optional arguments to bind
@ -41,12 +41,8 @@ pub unsafe extern "C" fn execute(
tracing::error!("query: null connection");
return ResultCode::Error;
};
let weak_ptr = extcon._ctx as *const Weak<Connection>;
let weak = &*weak_ptr;
let Some(conn) = weak.upgrade() else {
tracing::error!("prepare_stmt: failed to upgrade weak pointer in prepare stmt");
return ResultCode::Error;
};
let conn_ptr = extcon._ctx as *const Arc<Connection>;
let conn = &*conn_ptr;
match conn.query(&sql_str) {
Ok(Some(mut stmt)) => {
if arg_count > 0 {
@ -102,12 +98,8 @@ pub unsafe extern "C" fn prepare_stmt(ctx: *mut ExtConn, sql: *const c_char) ->
tracing::error!("prepare_stmt: null connection");
return ptr::null_mut();
};
let weak_ptr = extcon._ctx as *const Weak<Connection>;
let weak = &*weak_ptr;
let Some(conn) = weak.upgrade() else {
tracing::error!("prepare_stmt: failed to upgrade weak pointer in prepare stmt");
return ptr::null_mut();
};
let db_ptr = extcon._ctx as *const Arc<Connection>;
let conn = &*db_ptr;
match conn.prepare(&sql_str) {
Ok(stmt) => {
let raw_stmt = Box::into_raw(Box::new(stmt)) as *mut c_void;

View file

@ -209,7 +209,7 @@ impl Database {
Ok(db)
}
pub fn connect(self: &Arc<Database>) -> Result<Rc<Connection>> {
pub fn connect(self: &Arc<Database>) -> Result<Arc<Connection>> {
let buffer_pool = Rc::new(BufferPool::new(self.page_size as usize));
let wal = Rc::new(RefCell::new(WalFile::new(
@ -227,7 +227,7 @@ impl Database {
Arc::new(RwLock::new(DumbLruPageCache::default())),
buffer_pool,
)?);
let conn = Rc::new(Connection {
let conn = Arc::new(Connection {
_db: self.clone(),
pager: pager.clone(),
schema: self.schema.clone(),
@ -345,7 +345,7 @@ pub struct Connection {
impl Connection {
#[instrument(skip_all, level = Level::TRACE)]
pub fn prepare(self: &Rc<Connection>, sql: impl AsRef<str>) -> Result<Statement> {
pub fn prepare(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Statement> {
if sql.as_ref().is_empty() {
return Err(LimboError::InvalidArgument(
"The supplied SQL string contains no statements".to_string(),
@ -372,7 +372,7 @@ impl Connection {
stmt,
self.header.clone(),
self.pager.clone(),
Rc::downgrade(self),
self.clone(),
&syms,
QueryMode::Normal,
&input,
@ -389,7 +389,7 @@ impl Connection {
}
#[instrument(skip_all, level = Level::TRACE)]
pub fn query(self: &Rc<Connection>, sql: impl AsRef<str>) -> Result<Option<Statement>> {
pub fn query(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Option<Statement>> {
let sql = sql.as_ref();
tracing::trace!("Querying: {}", sql);
let mut parser = Parser::new(sql.as_bytes());
@ -406,7 +406,7 @@ impl Connection {
#[instrument(skip_all, level = Level::TRACE)]
pub(crate) fn run_cmd(
self: &Rc<Connection>,
self: &Arc<Connection>,
cmd: Cmd,
input: &str,
) -> Result<Option<Statement>> {
@ -421,7 +421,7 @@ impl Connection {
stmt.clone(),
self.header.clone(),
self.pager.clone(),
Rc::downgrade(self),
self.clone(),
&syms,
cmd.into(),
input,
@ -464,14 +464,14 @@ impl Connection {
}
}
pub fn query_runner<'a>(self: &'a Rc<Connection>, sql: &'a [u8]) -> QueryRunner<'a> {
pub fn query_runner<'a>(self: &'a Arc<Connection>, sql: &'a [u8]) -> QueryRunner<'a> {
QueryRunner::new(self, sql)
}
/// Execute will run a query from start to finish taking ownership of I/O because it will run pending I/Os if it didn't finish.
/// TODO: make this api async
#[instrument(skip_all, level = Level::TRACE)]
pub fn execute(self: &Rc<Connection>, sql: impl AsRef<str>) -> Result<()> {
pub fn execute(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<()> {
let sql = sql.as_ref();
let mut parser = Parser::new(sql.as_bytes());
let cmd = parser.next()?;
@ -491,7 +491,7 @@ impl Connection {
stmt,
self.header.clone(),
self.pager.clone(),
Rc::downgrade(self),
self.clone(),
&syms,
QueryMode::Explain,
&input,
@ -508,7 +508,7 @@ impl Connection {
stmt,
self.header.clone(),
self.pager.clone(),
Rc::downgrade(self),
self.clone(),
&syms,
QueryMode::Normal,
&input,
@ -620,7 +620,7 @@ impl Connection {
self.auto_commit.get()
}
pub fn parse_schema_rows(self: &Rc<Connection>) -> Result<()> {
pub fn parse_schema_rows(self: &Arc<Connection>) -> Result<()> {
let rows = self.query("SELECT * FROM sqlite_schema")?;
let mut schema = self
.schema
@ -641,7 +641,7 @@ impl Connection {
// Clearly there is something to improve here, Vec<Vec<Value>> isn't a couple of tea
/// Query the current rows/values of `pragma_name`.
pub fn pragma_query(self: &Rc<Connection>, pragma_name: &str) -> Result<Vec<Vec<Value>>> {
pub fn pragma_query(self: &Arc<Connection>, pragma_name: &str) -> Result<Vec<Vec<Value>>> {
let pragma = format!("PRAGMA {}", pragma_name);
let mut stmt = self.prepare(pragma)?;
let mut results = Vec::new();
@ -671,7 +671,7 @@ impl Connection {
/// Some pragmas will return the updated value which cannot be retrieved
/// with this method.
pub fn pragma_update<V: Display>(
self: &Rc<Connection>,
self: &Arc<Connection>,
pragma_name: &str,
pragma_value: V,
) -> Result<Vec<Vec<Value>>> {
@ -706,7 +706,7 @@ impl Connection {
/// (e.g. `table_info('one_tbl')`) or pragmas which returns value(s)
/// (e.g. `integrity_check`).
pub fn pragma<V: Display>(
self: &Rc<Connection>,
self: &Arc<Connection>,
pragma_name: &str,
pragma_value: V,
) -> Result<Vec<Vec<Value>>> {
@ -876,13 +876,13 @@ impl SymbolTable {
pub struct QueryRunner<'a> {
parser: Parser<'a>,
conn: &'a Rc<Connection>,
conn: &'a Arc<Connection>,
statements: &'a [u8],
last_offset: usize,
}
impl<'a> QueryRunner<'a> {
pub(crate) fn new(conn: &'a Rc<Connection>, statements: &'a [u8]) -> Self {
pub(crate) fn new(conn: &'a Arc<Connection>, statements: &'a [u8]) -> Self {
Self {
parser: Parser::new(statements),
conn,

View file

@ -1,8 +1,8 @@
use crate::{Connection, LimboError, Statement, StepResult, Value};
use bitflags::bitflags;
use limbo_sqlite3_parser::ast::PragmaName;
use std::rc::{Rc, Weak};
use std::str::FromStr;
use std::sync::Arc;
bitflags! {
// Flag names match those used in SQLite:
@ -141,13 +141,11 @@ impl PragmaVirtualTable {
))
}
pub(crate) fn open(&self, conn: Weak<Connection>) -> crate::Result<PragmaVirtualTableCursor> {
pub(crate) fn open(&self, conn: Arc<Connection>) -> crate::Result<PragmaVirtualTableCursor> {
Ok(PragmaVirtualTableCursor {
pragma_name: self.pragma_name.clone(),
pos: 0,
conn: conn
.upgrade()
.ok_or_else(|| LimboError::InternalError("Connection was dropped".into()))?,
conn,
stmt: None,
arg: None,
visible_column_count: self.visible_column_count,
@ -160,7 +158,7 @@ impl PragmaVirtualTable {
pub struct PragmaVirtualTableCursor {
pragma_name: String,
pos: usize,
conn: Rc<Connection>,
conn: Arc<Connection>,
stmt: Option<Statement>,
arg: Option<String>,
visible_column_count: usize,

View file

@ -6227,7 +6227,7 @@ mod tests {
pos: usize,
page: &mut PageContent,
record: ImmutableRecord,
conn: &Rc<Connection>,
conn: &Arc<Connection>,
) -> Vec<u8> {
let mut payload: Vec<u8> = Vec::new();
fill_cell_payload(

View file

@ -46,7 +46,7 @@ use insert::translate_insert;
use limbo_sqlite3_parser::ast::{self, Delete, Insert};
use schema::{translate_create_table, translate_create_virtual_table, translate_drop_table};
use select::translate_select;
use std::rc::{Rc, Weak};
use std::rc::Rc;
use std::sync::Arc;
use tracing::{instrument, Level};
use transaction::{translate_tx_begin, translate_tx_commit};
@ -58,7 +58,7 @@ pub fn translate(
stmt: ast::Stmt,
database_header: Arc<SpinLock<DatabaseHeader>>,
pager: Rc<Pager>,
connection: Weak<Connection>,
connection: Arc<Connection>,
syms: &SymbolTable,
query_mode: QueryMode,
_input: &str, // TODO: going to be used for CREATE VIEW

View file

@ -3,7 +3,7 @@
use limbo_sqlite3_parser::ast::PragmaName;
use limbo_sqlite3_parser::ast::{self, Expr};
use std::rc::{Rc, Weak};
use std::rc::Rc;
use std::sync::Arc;
use crate::fast_lock::SpinLock;
@ -34,7 +34,7 @@ pub fn translate_pragma(
body: Option<ast::PragmaBody>,
database_header: Arc<SpinLock<DatabaseHeader>>,
pager: Rc<Pager>,
connection: Weak<crate::Connection>,
connection: Arc<crate::Connection>,
mut program: ProgramBuilder,
) -> crate::Result<ProgramBuilder> {
let opts = ProgramBuilderOpts {
@ -124,7 +124,7 @@ fn update_pragma(
value: ast::Expr,
header: Arc<SpinLock<DatabaseHeader>>,
pager: Rc<Pager>,
connection: Weak<crate::Connection>,
connection: Arc<crate::Connection>,
program: &mut ProgramBuilder,
) -> crate::Result<()> {
match pragma {
@ -268,16 +268,13 @@ fn query_pragma(
value: Option<ast::Expr>,
database_header: Arc<SpinLock<DatabaseHeader>>,
pager: Rc<Pager>,
connection: Weak<crate::Connection>,
connection: Arc<crate::Connection>,
program: &mut ProgramBuilder,
) -> crate::Result<()> {
let register = program.alloc_register();
match pragma {
PragmaName::CacheSize => {
program.emit_int(
connection.upgrade().unwrap().get_cache_size() as i64,
register,
);
program.emit_int(connection.get_cache_size() as i64, register);
program.emit_result_row(register, 1);
program.add_pragma_result_column(pragma.to_string());
}
@ -417,7 +414,7 @@ fn update_cache_size(
value: i64,
header: Arc<SpinLock<DatabaseHeader>>,
pager: Rc<Pager>,
connection: Weak<crate::Connection>,
connection: Arc<crate::Connection>,
) -> crate::Result<()> {
let mut cache_size_unformatted: i64 = value;
let mut cache_size = if cache_size_unformatted < 0 {
@ -432,10 +429,7 @@ fn update_cache_size(
cache_size = MIN_PAGE_CACHE_SIZE;
cache_size_unformatted = MIN_PAGE_CACHE_SIZE as i64;
}
connection
.upgrade()
.unwrap()
.set_cache_size(cache_size_unformatted as i32);
connection.set_cache_size(cache_size_unformatted as i32);
// update cache size
pager

View file

@ -1,9 +1,4 @@
use std::{
cell::Cell,
cmp::Ordering,
rc::{Rc, Weak},
sync::Arc,
};
use std::{cell::Cell, cmp::Ordering, rc::Rc, sync::Arc};
use limbo_sqlite3_parser::ast::{self, TableInternalId};
use tracing::{instrument, Level};
@ -856,7 +851,7 @@ impl ProgramBuilder {
pub fn build(
mut self,
database_header: Arc<SpinLock<DatabaseHeader>>,
connection: Weak<Connection>,
connection: Arc<Connection>,
change_cnt_on: bool,
) -> Program {
self.resolve_labels();

View file

@ -214,10 +214,8 @@ pub fn op_drop_index(
let Insn::DropIndex { index, db: _ } = insn else {
unreachable!("unexpected Insn {:?}", insn)
};
if let Some(conn) = program.connection.upgrade() {
let mut schema = conn.schema.write();
schema.remove_index(&index);
}
let mut schema = program.connection.schema.write();
schema.remove_index(&index);
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}
@ -310,7 +308,7 @@ pub fn op_checkpoint(
else {
unreachable!("unexpected Insn {:?}", insn)
};
let result = program.connection.upgrade().unwrap().checkpoint();
let result = program.connection.checkpoint();
match result {
Ok(CheckpointResult {
num_wal_frames: num_wal_pages,
@ -900,7 +898,7 @@ pub fn op_open_read(
.replace(Cursor::new_btree(cursor));
}
CursorType::BTreeIndex(index) => {
let conn = program.connection.upgrade().unwrap();
let conn = program.connection.clone();
let schema = conn.schema.try_read().ok_or(LimboError::SchemaLocked)?;
let table = schema
.get_table(&index.table_name)
@ -998,11 +996,7 @@ pub fn op_vcreate(
} else {
vec![]
};
let Some(conn) = program.connection.upgrade() else {
return Err(crate::LimboError::ExtensionError(
"Failed to upgrade Connection".to_string(),
));
};
let conn = program.connection.clone();
let table =
crate::VirtualTable::table(Some(&table_name), &module_name, args, &conn.syms.borrow())?;
{
@ -1123,9 +1117,7 @@ pub fn op_vupdate(
Ok(Some(new_rowid)) => {
if *conflict_action == 5 {
// ResolveType::Replace
if let Some(conn) = program.connection.upgrade() {
conn.update_last_rowid(new_rowid);
}
program.connection.update_last_rowid(new_rowid);
}
state.pc += 1;
}
@ -1181,12 +1173,7 @@ pub fn op_vdestroy(
let Insn::VDestroy { db, table_name } = insn else {
unreachable!("unexpected Insn {:?}", insn)
};
let Some(conn) = program.connection.upgrade() else {
return Err(crate::LimboError::ExtensionError(
"Failed to upgrade Connection".to_string(),
));
};
let conn = program.connection.clone();
{
let Some(vtab) = conn.syms.borrow_mut().vtabs.remove(table_name) else {
return Err(crate::LimboError::InternalError(
@ -1691,7 +1678,7 @@ pub fn op_transaction(
let Insn::Transaction { write } = insn else {
unreachable!("unexpected Insn {:?}", insn)
};
let connection = program.connection.upgrade().unwrap();
let connection = program.connection.clone();
if *write && connection._db.open_flags.contains(OpenFlags::ReadOnly) {
return Err(LimboError::ReadOnly);
}
@ -1746,7 +1733,7 @@ pub fn op_auto_commit(
else {
unreachable!("unexpected Insn {:?}", insn)
};
let conn = program.connection.upgrade().unwrap();
let conn = program.connection.clone();
if state.commit_state == CommitState::Committing {
return match program.commit_txn(pager.clone(), state, mv_store)? {
super::StepResult::Done => Ok(InsnFunctionStepResult::Done),
@ -3387,7 +3374,7 @@ pub fn op_function(
state.registers[*dest] = Register::Value(result);
}
ScalarFunc::Changes => {
let res = &program.connection.upgrade().unwrap().last_change;
let res = &program.connection.last_change;
let changes = res.get();
state.registers[*dest] = Register::Value(Value::Integer(changes));
}
@ -3435,12 +3422,8 @@ pub fn op_function(
state.registers[*dest] = Register::Value(result);
}
ScalarFunc::LastInsertRowid => {
if let Some(conn) = program.connection.upgrade() {
state.registers[*dest] =
Register::Value(Value::Integer(conn.last_insert_rowid() as i64));
} else {
state.registers[*dest] = Register::Value(Value::Null);
}
state.registers[*dest] =
Register::Value(Value::Integer(program.connection.last_insert_rowid() as i64));
}
ScalarFunc::Like => {
let pattern = &state.registers[*start_reg];
@ -3645,7 +3628,7 @@ pub fn op_function(
}
}
ScalarFunc::TotalChanges => {
let res = &program.connection.upgrade().unwrap().total_changes;
let res = &program.connection.total_changes;
let total_changes = res.get();
state.registers[*dest] = Register::Value(Value::Integer(total_changes));
}
@ -3706,9 +3689,7 @@ pub fn op_function(
ScalarFunc::LoadExtension => {
let extension = &state.registers[*start_reg];
let ext = resolve_ext_path(&extension.get_owned_value().to_string())?;
if let Some(conn) = program.connection.upgrade() {
conn.load_extension(ext)?;
}
program.connection.load_extension(ext)?;
}
ScalarFunc::StrfTime => {
let result = exec_strftime(&state.registers[*start_reg..*start_reg + arg_count]);
@ -4234,9 +4215,7 @@ pub fn op_insert(
// Only update last_insert_rowid for regular table inserts, not schema modifications
if cursor.root_page() != 1 {
if let Some(rowid) = return_if_io!(cursor.rowid()) {
if let Some(conn) = program.connection.upgrade() {
conn.update_last_rowid(rowid);
}
program.connection.update_last_rowid(rowid);
let prev_changes = program.n_change.get();
program.n_change.set(prev_changes + 1);
}
@ -4697,7 +4676,7 @@ pub fn op_open_write(
None => None,
};
if let Some(index) = maybe_index {
let conn = program.connection.upgrade().unwrap();
let conn = program.connection.clone();
let schema = conn.schema.try_read().ok_or(LimboError::SchemaLocked)?;
let table = schema
.get_table(&index.table_name)
@ -4823,7 +4802,8 @@ pub fn op_drop_table(
if *db > 0 {
todo!("temp databases not implemented yet");
}
if let Some(conn) = program.connection.upgrade() {
let conn = program.connection.clone();
{
let mut schema = conn.schema.write();
schema.remove_indices_for_table(table_name);
schema.remove_table(table_name);
@ -4900,8 +4880,7 @@ pub fn op_parse_schema(
else {
unreachable!("unexpected Insn {:?}", insn)
};
let conn = program.connection.upgrade();
let conn = conn.as_ref().unwrap();
let conn = program.connection.clone();
if let Some(where_clause) = where_clause {
let stmt = conn.prepare(format!(
@ -5187,7 +5166,7 @@ pub fn op_open_ephemeral(
_ => unreachable!("unexpected Insn {:?}", insn),
};
let conn = program.connection.upgrade().unwrap();
let conn = program.connection.clone();
let io = conn.pager.io.get_memory_io();
let file = io.open_file("", OpenFlags::Create, true)?;

View file

@ -54,8 +54,7 @@ use std::{
cell::{Cell, RefCell},
collections::HashMap,
num::NonZero,
ops::Deref,
rc::{Rc, Weak},
rc::Rc,
sync::Arc,
};
use tracing::{instrument, Level};
@ -351,7 +350,6 @@ macro_rules! must_be_btree_cursor {
}};
}
#[derive(Debug)]
pub struct Program {
pub max_registers: usize,
pub insns: Vec<(Insn, InsnFunction)>,
@ -359,7 +357,7 @@ pub struct Program {
pub database_header: Arc<SpinLock<DatabaseHeader>>,
pub comments: Option<Vec<(InsnReference, &'static str)>>,
pub parameters: crate::parameters::Parameters,
pub connection: Weak<Connection>,
pub connection: Arc<Connection>,
pub n_change: Cell<i64>,
pub change_cnt_on: bool,
pub result_columns: Vec<ResultSetColumn>,
@ -401,7 +399,7 @@ impl Program {
mv_store: Option<&Rc<MvStore>>,
) -> Result<StepResult> {
if let Some(mv_store) = mv_store {
let conn = self.connection.upgrade().unwrap();
let conn = self.connection.clone();
let auto_commit = conn.auto_commit.get();
if auto_commit {
let mut mv_transactions = conn.mv_transactions.borrow_mut();
@ -412,21 +410,18 @@ impl Program {
}
Ok(StepResult::Done)
} else {
let connection = self
.connection
.upgrade()
.expect("only weak ref to connection?");
let connection = self.connection.clone();
let auto_commit = connection.auto_commit.get();
tracing::trace!("Halt auto_commit {}", auto_commit);
if program_state.commit_state == CommitState::Committing {
self.step_end_write_txn(&pager, &mut program_state.commit_state, connection.deref())
self.step_end_write_txn(&pager, &mut program_state.commit_state, &connection)
} else if auto_commit {
let current_state = connection.transaction_state.get();
match current_state {
TransactionState::Write => self.step_end_write_txn(
&pager,
&mut program_state.commit_state,
connection.deref(),
&connection,
),
TransactionState::Read => {
connection.transaction_state.replace(TransactionState::None);
@ -437,9 +432,7 @@ impl Program {
}
} else {
if self.change_cnt_on {
if let Some(conn) = self.connection.upgrade() {
conn.set_changes(self.n_change.get());
}
self.connection.set_changes(self.n_change.get());
}
Ok(StepResult::Done)
}
@ -457,9 +450,7 @@ impl Program {
match cacheflush_status {
PagerCacheflushStatus::Done(_) => {
if self.change_cnt_on {
if let Some(conn) = self.connection.upgrade() {
conn.set_changes(self.n_change.get());
}
self.connection.set_changes(self.n_change.get());
}
connection.transaction_state.replace(TransactionState::None);
*commit_state = CommitState::Ready;

View file

@ -7,7 +7,8 @@ use limbo_ext::{ConstraintInfo, IndexInfo, OrderByInfo, ResultCode, VTabKind, VT
use limbo_sqlite3_parser::{ast, lexer::sql::Parser};
use std::cell::RefCell;
use std::ffi::c_void;
use std::rc::{Rc, Weak};
use std::rc::Rc;
use std::sync::Arc;
#[derive(Debug, Clone)]
enum VirtualTableType {
@ -90,7 +91,7 @@ impl VirtualTable {
}
}
pub(crate) fn open(&self, conn: Weak<Connection>) -> crate::Result<VirtualTableCursor> {
pub(crate) fn open(&self, conn: Arc<Connection>) -> crate::Result<VirtualTableCursor> {
match &self.vtab_type {
VirtualTableType::Pragma(table) => Ok(VirtualTableCursor::Pragma(table.open(conn)?)),
VirtualTableType::External(table) => {
@ -237,11 +238,11 @@ impl ExtVirtualTable {
Ok((vtab, schema))
}
/// Accepts a Weak pointer to the connection that owns the VTable, that the module
/// Accepts a pointer connection that owns the VTable, that the module
/// can optionally use to query the other tables.
fn open(&self, conn: Weak<Connection>) -> crate::Result<ExtVirtualTableCursor> {
fn open(&self, conn: Arc<Connection>) -> crate::Result<ExtVirtualTableCursor> {
// we need a Weak<Connection> to upgrade and call from the extension.
let weak_box: *mut Weak<Connection> = Box::into_raw(Box::new(conn));
let weak_box: *mut Arc<Connection> = Box::into_raw(Box::new(conn));
let conn = limbo_ext::Conn::new(
weak_box.cast(),
crate::ext::prepare_stmt,

View file

@ -3,7 +3,7 @@
mod keywords;
use std::rc::Rc;
use std::sync::Arc;
use keywords::KEYWORDS;
use limbo_ext::{
@ -87,7 +87,7 @@ impl VTable for CompletionTable {
type Cursor = CompletionCursor;
type Error = ResultCode;
fn open(&self, _conn: Option<Rc<Connection>>) -> Result<Self::Cursor, Self::Error> {
fn open(&self, _conn: Option<Arc<Connection>>) -> Result<Self::Cursor, Self::Error> {
Ok(CompletionCursor::default())
}
}

View file

@ -2,7 +2,7 @@ use crate::{types::StepResult, ExtResult, ResultCode, Value};
use std::{
ffi::{c_char, c_void, CStr, CString},
num::NonZeroUsize,
rc::Rc,
sync::Arc,
};
pub type RegisterModuleFn = unsafe extern "C" fn(
@ -131,7 +131,7 @@ pub trait VTable {
/// 'conn' is an Option to allow for testing. Otherwise a valid connection to the core database
/// that created the virtual table will be available to use in your extension here.
fn open(&self, _conn: Option<Rc<Connection>>) -> Result<Self::Cursor, Self::Error>;
fn open(&self, _conn: Option<Arc<Connection>>) -> Result<Self::Cursor, Self::Error>;
fn update(&mut self, _rowid: i64, _args: &[Value]) -> Result<(), Self::Error> {
Ok(())
}
@ -463,7 +463,7 @@ impl Connection {
}
/// From the included SQL string, prepare a statement for execution.
pub fn prepare(self: &Rc<Self>, sql: &str) -> ExtResult<Statement> {
pub fn prepare(self: &Arc<Self>, sql: &str) -> ExtResult<Statement> {
let stmt = unsafe { (*self.0).prepare_stmt(sql) };
if stmt.is_null() {
return Err(ResultCode::Error);
@ -473,7 +473,7 @@ impl Connection {
/// Execute a SQL statement with the given arguments.
/// Optionally returns the last inserted rowid for the query.
pub fn execute(self: &Rc<Self>, sql: &str, args: &[Value]) -> crate::ExtResult<Option<usize>> {
pub fn execute(self: &Arc<Self>, sql: &str, args: &[Value]) -> crate::ExtResult<Option<usize>> {
if self.0.is_null() {
return Err(ResultCode::Error);
}

View file

@ -26,7 +26,7 @@ use limbo_ext::{
};
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::rc::Rc;
use std::sync::Arc;
register_extension! {
vtabs: { CsvVTabModule }
@ -260,7 +260,7 @@ impl VTable for CsvTable {
type Cursor = CsvCursor;
type Error = ResultCode;
fn open(&self, _conn: Option<Rc<Connection>>) -> Result<Self::Cursor, Self::Error> {
fn open(&self, _conn: Option<Arc<Connection>>) -> Result<Self::Cursor, Self::Error> {
match self.new_reader() {
Ok(reader) => Ok(CsvCursor::new(reader, self)),
Err(_) => Err(ResultCode::Error),

View file

@ -1,4 +1,4 @@
use std::rc::Rc;
use std::sync::Arc;
use limbo_ext::{
register_extension, Connection, ResultCode, VTabCursor, VTabKind, VTabModule, VTabModuleDerive,
@ -45,7 +45,7 @@ impl VTable for GenerateSeriesTable {
type Cursor = GenerateSeriesCursor;
type Error = ResultCode;
fn open(&self, _conn: Option<Rc<Connection>>) -> Result<Self::Cursor, Self::Error> {
fn open(&self, _conn: Option<Arc<Connection>>) -> Result<Self::Cursor, Self::Error> {
Ok(GenerateSeriesCursor {
start: 0,
stop: 0,

View file

@ -10,8 +10,7 @@ use std::collections::BTreeMap;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::num::NonZeroUsize;
use std::rc::Rc;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
register_extension! {
vtabs: { KVStoreVTabModule, TableStatsVtabModule },
@ -139,7 +138,7 @@ impl VTable for KVStoreTable {
type Cursor = KVStoreCursor;
type Error = String;
fn open(&self, _conn: Option<Rc<Connection>>) -> Result<Self::Cursor, Self::Error> {
fn open(&self, _conn: Option<Arc<Connection>>) -> Result<Self::Cursor, Self::Error> {
let _ = env_logger::try_init();
Ok(KVStoreCursor {
rows: Vec::new(),
@ -303,7 +302,7 @@ pub struct TableStatsVtabModule;
pub struct StatsCursor {
pos: usize,
rows: Vec<(String, i64)>,
conn: Option<Rc<Connection>>,
conn: Option<Arc<Connection>>,
}
pub struct StatsTable {}
@ -322,7 +321,7 @@ impl VTable for StatsTable {
type Cursor = StatsCursor;
type Error = String;
fn open(&self, conn: Option<Rc<Connection>>) -> Result<Self::Cursor, Self::Error> {
fn open(&self, conn: Option<Arc<Connection>>) -> Result<Self::Cursor, Self::Error> {
Ok(StatsCursor {
pos: 0,
rows: Vec::new(),

View file

@ -55,7 +55,7 @@ pub fn derive_vtab_module(input: TokenStream) -> TokenStream {
}
let table = table as *const <#struct_name as ::limbo_ext::VTabModule>::Table;
let table: &<#struct_name as ::limbo_ext::VTabModule>::Table = &*table;
let conn = if conn.is_null() { None } else { Some(::std::rc::Rc::new(::limbo_ext::Connection::new(conn)))};
let conn = if conn.is_null() { None } else { Some(::std::sync::Arc::new(::limbo_ext::Connection::new(conn)))};
if let Ok(cursor) = <#struct_name as ::limbo_ext::VTabModule>::Table::open(table, conn) {
return ::std::boxed::Box::into_raw(::std::boxed::Box::new(cursor)) as *const ::std::ffi::c_void;
} else {

View file

@ -2,7 +2,7 @@ use std::{
collections::HashSet,
fmt::{Debug, Display},
path::Path,
rc::Rc,
sync::Arc,
vec,
};
@ -504,7 +504,7 @@ impl Interaction {
Self::Assumption(_) | Self::Assertion(_) | Self::Fault(_) => vec![],
}
}
pub(crate) fn execute_query(&self, conn: &mut Rc<Connection>, io: &SimulatorIO) -> ResultSet {
pub(crate) fn execute_query(&self, conn: &mut Arc<Connection>, io: &SimulatorIO) -> ResultSet {
if let Self::Query(query) = self {
let query_str = query.to_string();
let rows = conn.query(&query_str);

View file

@ -1,7 +1,6 @@
use std::fmt::Display;
use std::mem;
use std::path::Path;
use std::rc::Rc;
use std::sync::Arc;
use limbo_core::Database;
@ -164,7 +163,7 @@ where
}
pub(crate) enum SimConnection {
LimboConnection(Rc<limbo_core::Connection>),
LimboConnection(Arc<limbo_core::Connection>),
SQLiteConnection(rusqlite::Connection),
Disconnected,
}

View file

@ -5,7 +5,6 @@ use limbo_core::Value;
use std::ffi::{self, CStr, CString};
use tracing::trace;
use std::rc::Rc;
use std::sync::Arc;
macro_rules! stub {
@ -42,7 +41,7 @@ use util::sqlite3_safety_check_sick_or_ok;
pub struct sqlite3 {
pub(crate) io: Arc<dyn limbo_core::IO>,
pub(crate) _db: Arc<limbo_core::Database>,
pub(crate) conn: Rc<limbo_core::Connection>,
pub(crate) conn: Arc<limbo_core::Connection>,
pub(crate) err_code: ffi::c_int,
pub(crate) err_mask: ffi::c_int,
pub(crate) malloc_failed: bool,
@ -54,7 +53,7 @@ impl sqlite3 {
pub fn new(
io: Arc<dyn limbo_core::IO>,
db: Arc<limbo_core::Database>,
conn: Rc<limbo_core::Connection>,
conn: Arc<limbo_core::Connection>,
) -> Self {
Self {
io,

View file

@ -2,7 +2,6 @@ use limbo_core::{Connection, Database, PagerCacheflushStatus, IO};
use rand::{rng, RngCore};
use rusqlite::params;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::Arc;
use tempfile::TempDir;
use tracing_subscriber::layer::SubscriberExt;
@ -55,14 +54,14 @@ impl TempDatabase {
Self { path, io }
}
pub fn connect_limbo(&self) -> Rc<limbo_core::Connection> {
pub fn connect_limbo(&self) -> Arc<limbo_core::Connection> {
Self::connect_limbo_with_flags(&self, limbo_core::OpenFlags::default())
}
pub fn connect_limbo_with_flags(
&self,
flags: limbo_core::OpenFlags,
) -> Rc<limbo_core::Connection> {
) -> Arc<limbo_core::Connection> {
log::debug!("conneting to limbo");
let db = Database::open_file_with_flags(
self.io.clone(),
@ -83,7 +82,7 @@ impl TempDatabase {
}
}
pub(crate) fn do_flush(conn: &Rc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<()> {
pub(crate) fn do_flush(conn: &Arc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<()> {
loop {
match conn.cacheflush()? {
PagerCacheflushStatus::Done(_) => {
@ -155,7 +154,7 @@ pub(crate) fn sqlite_exec_rows(
pub(crate) fn limbo_exec_rows(
db: &TempDatabase,
conn: &Rc<limbo_core::Connection>,
conn: &Arc<limbo_core::Connection>,
query: &str,
) -> Vec<Vec<rusqlite::types::Value>> {
let mut stmt = conn.prepare(query).unwrap();
@ -193,7 +192,7 @@ pub(crate) fn limbo_exec_rows(
pub(crate) fn limbo_exec_rows_error(
db: &TempDatabase,
conn: &Rc<limbo_core::Connection>,
conn: &Arc<limbo_core::Connection>,
query: &str,
) -> limbo_core::Result<()> {
let mut stmt = conn.prepare(query)?;

View file

@ -2,7 +2,7 @@ use crate::common::{self, maybe_setup_tracing};
use crate::common::{compare_string, do_flush, TempDatabase};
use limbo_core::{Connection, Row, StepResult, Value};
use log::debug;
use std::rc::Rc;
use std::sync::Arc;
#[test]
#[ignore]
@ -286,7 +286,7 @@ fn test_wal_restart() -> anyhow::Result<()> {
let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE test (x INTEGER PRIMARY KEY);");
// threshold is 1000 by default
fn insert(i: usize, conn: &Rc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<()> {
fn insert(i: usize, conn: &Arc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<()> {
debug!("inserting {}", i);
let insert_query = format!("INSERT INTO test VALUES ({})", i);
run_query(tmp_db, conn, &insert_query)?;
@ -295,7 +295,7 @@ fn test_wal_restart() -> anyhow::Result<()> {
Ok(())
}
fn count(conn: &Rc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<usize> {
fn count(conn: &Arc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<usize> {
debug!("counting");
let list_query = "SELECT count(x) FROM test";
let mut count = None;
@ -447,13 +447,13 @@ fn test_delete_with_index() -> anyhow::Result<()> {
Ok(())
}
fn run_query(tmp_db: &TempDatabase, conn: &Rc<Connection>, query: &str) -> anyhow::Result<()> {
fn run_query(tmp_db: &TempDatabase, conn: &Arc<Connection>, query: &str) -> anyhow::Result<()> {
run_query_core(tmp_db, conn, query, None::<fn(&Row)>)
}
fn run_query_on_row(
tmp_db: &TempDatabase,
conn: &Rc<Connection>,
conn: &Arc<Connection>,
query: &str,
on_row: impl FnMut(&Row),
) -> anyhow::Result<()> {
@ -462,7 +462,7 @@ fn run_query_on_row(
fn run_query_core(
tmp_db: &TempDatabase,
conn: &Rc<Connection>,
conn: &Arc<Connection>,
query: &str,
mut on_row: Option<impl FnMut(&Row)>,
) -> anyhow::Result<()> {

View file

@ -112,7 +112,7 @@ fn test_wal_1_writer_1_reader() -> Result<()> {
/// Execute a statement and get strings result
pub(crate) fn execute_and_get_strings(
tmp_db: &TempDatabase,
conn: &Rc<Connection>,
conn: &Arc<Connection>,
sql: &str,
) -> Result<Vec<String>> {
let statement = conn.prepare(sql)?;
@ -140,7 +140,7 @@ pub(crate) fn execute_and_get_strings(
/// Execute a statement and get integers
pub(crate) fn execute_and_get_ints(
tmp_db: &TempDatabase,
conn: &Rc<Connection>,
conn: &Arc<Connection>,
sql: &str,
) -> Result<Vec<i64>> {
let statement = conn.prepare(sql)?;