Implement create virtual table and VUpdate opcode

This commit is contained in:
PThorpe92 2025-02-14 09:34:30 -05:00
parent d775b3ea5a
commit 9c8083231c
No known key found for this signature in database
GPG key ID: 66DB3FBACBDD05CC
22 changed files with 940 additions and 214 deletions

View file

@ -27,7 +27,7 @@ use fallible_iterator::FallibleIterator;
use libloading::{Library, Symbol};
#[cfg(not(target_family = "wasm"))]
use limbo_ext::{ExtensionApi, ExtensionEntryPoint};
use limbo_ext::{ResultCode, VTabModuleImpl, Value as ExtValue};
use limbo_ext::{ResultCode, VTabKind, VTabModuleImpl, Value as ExtValue};
use limbo_sqlite3_parser::{ast, ast::Cmd, lexer::sql::Parser};
use parking_lot::RwLock;
use schema::{Column, Schema};
@ -49,7 +49,7 @@ pub use storage::wal::WalFile;
pub use storage::wal::WalFileShared;
use types::OwnedValue;
pub use types::Value;
use util::parse_schema_rows;
use util::{columns_from_create_table_body, parse_schema_rows};
use vdbe::builder::QueryMode;
use vdbe::VTabOpaqueCursor;
@ -87,7 +87,6 @@ pub struct Database {
schema: Rc<RefCell<Schema>>,
header: Rc<RefCell<DatabaseHeader>>,
syms: Rc<RefCell<SymbolTable>>,
vtab_modules: HashMap<String, Rc<VTabModuleImpl>>,
// Shared structures of a Database are the parts that are common to multiple threads that might
// create DB connections.
_shared_page_cache: Arc<RwLock<DumbLruPageCache>>,
@ -149,8 +148,7 @@ impl Database {
header: header.clone(),
_shared_page_cache: _shared_page_cache.clone(),
_shared_wal: shared_wal.clone(),
syms,
vtab_modules: HashMap::new(),
syms: syms.clone(),
};
if let Err(e) = db.register_builtins() {
return Err(LimboError::ExtensionError(e));
@ -169,7 +167,7 @@ impl Database {
});
let rows = conn.query("SELECT * FROM sqlite_schema")?;
let mut schema = schema.borrow_mut();
parse_schema_rows(rows, &mut schema, io)?;
parse_schema_rows(rows, &mut schema, io, &syms.borrow())?;
Ok(db)
}
@ -276,10 +274,9 @@ impl Connection {
pub fn prepare(self: &Rc<Connection>, sql: impl AsRef<str>) -> Result<Statement> {
let sql = sql.as_ref();
tracing::trace!("Preparing: {}", sql);
let db = &self.db;
let mut parser = Parser::new(sql.as_bytes());
let syms = &db.syms.borrow();
let cmd = parser.next()?;
let syms = self.db.syms.borrow();
if let Some(cmd) = cmd {
match cmd {
Cmd::Stmt(stmt) => {
@ -289,7 +286,7 @@ impl Connection {
self.header.clone(),
self.pager.clone(),
Rc::downgrade(self),
syms,
&syms,
QueryMode::Normal,
)?);
Ok(Statement::new(program, self.pager.clone()))
@ -315,7 +312,7 @@ impl Connection {
pub(crate) fn run_cmd(self: &Rc<Connection>, cmd: Cmd) -> Result<Option<Statement>> {
let db = self.db.clone();
let syms: &SymbolTable = &db.syms.borrow();
let syms = db.syms.borrow();
match cmd {
Cmd::Stmt(stmt) => {
let program = Rc::new(translate::translate(
@ -324,7 +321,7 @@ impl Connection {
self.header.clone(),
self.pager.clone(),
Rc::downgrade(self),
syms,
&syms,
QueryMode::Normal,
)?);
let stmt = Statement::new(program, self.pager.clone());
@ -337,7 +334,7 @@ impl Connection {
self.header.clone(),
self.pager.clone(),
Rc::downgrade(self),
syms,
&syms,
QueryMode::Explain,
)?;
program.explain();
@ -346,12 +343,8 @@ impl Connection {
Cmd::ExplainQueryPlan(stmt) => {
match stmt {
ast::Stmt::Select(select) => {
let mut plan = prepare_select_plan(
&self.schema.borrow(),
*select,
&self.db.syms.borrow(),
None,
)?;
let mut plan =
prepare_select_plan(&self.schema.borrow(), *select, &syms, None)?;
optimize_plan(&mut plan, &self.schema.borrow())?;
println!("{}", plan);
}
@ -368,10 +361,9 @@ impl Connection {
pub fn execute(self: &Rc<Connection>, sql: impl AsRef<str>) -> Result<()> {
let sql = sql.as_ref();
let db = &self.db;
let syms: &SymbolTable = &db.syms.borrow();
let mut parser = Parser::new(sql.as_bytes());
let cmd = parser.next()?;
let syms = self.db.syms.borrow();
if let Some(cmd) = cmd {
match cmd {
Cmd::Explain(stmt) => {
@ -381,7 +373,7 @@ impl Connection {
self.header.clone(),
self.pager.clone(),
Rc::downgrade(self),
syms,
&syms,
QueryMode::Explain,
)?;
program.explain();
@ -394,7 +386,7 @@ impl Connection {
self.header.clone(),
self.pager.clone(),
Rc::downgrade(self),
syms,
&syms,
QueryMode::Normal,
)?;
@ -524,14 +516,73 @@ pub type StepResult = vdbe::StepResult;
#[derive(Clone, Debug)]
pub struct VirtualTable {
name: String,
args: Option<Vec<ast::Expr>>,
args: Option<Vec<String>>,
pub implementation: Rc<VTabModuleImpl>,
columns: Vec<Column>,
}
impl VirtualTable {
pub(crate) fn from_args(
tbl_name: Option<&str>,
module_name: &str,
args: &[String],
syms: &SymbolTable,
kind: VTabKind,
) -> Result<Rc<Self>> {
let module = syms
.vtab_modules
.get(module_name)
.ok_or(LimboError::ExtensionError(format!(
"Virtual table module not found: {}",
module_name
)))?;
if let VTabKind::VirtualTable = kind {
if module.module_type != VTabKind::VirtualTable {
return Err(LimboError::ExtensionError(format!(
"Virtual table module {} is not a virtual table",
module_name
)));
}
};
let schema = module.implementation.as_ref().init_schema(args)?;
let mut parser = Parser::new(schema.as_bytes());
parser.reset(schema.as_bytes());
println!("Schema: {}", schema);
if let ast::Cmd::Stmt(ast::Stmt::CreateTable { body, .. }) = parser.next()?.ok_or(
LimboError::ParseError("Failed to parse schema from virtual table module".to_string()),
)? {
let columns = columns_from_create_table_body(&body)?;
let vtab = Rc::new(VirtualTable {
name: tbl_name.unwrap_or(module_name).to_owned(),
args: Some(args.to_vec()),
implementation: module.implementation.clone(),
columns,
});
return Ok(vtab);
}
Err(crate::LimboError::ParseError(
"Failed to parse schema from virtual table module".to_string(),
))
}
pub fn open(&self) -> VTabOpaqueCursor {
let cursor = unsafe { (self.implementation.open)() };
let args = if let Some(args) = &self.args {
args.iter()
.map(|e| std::ffi::CString::new(e.to_string()).unwrap().into_raw())
.collect()
} else {
Vec::new()
};
let cursor =
unsafe { (self.implementation.open)(args.as_slice().as_ptr(), args.len() as i32) };
// free the CString pointers
for arg in args {
unsafe {
if !arg.is_null() {
let _ = std::ffi::CString::from_raw(arg);
}
}
}
VTabOpaqueCursor::new(cursor)
}
@ -580,13 +631,51 @@ impl VirtualTable {
_ => Err(LimboError::ExtensionError("Next failed".to_string())),
}
}
pub fn update(&self, args: &[OwnedValue], rowid: Option<i64>) -> Result<Option<i64>> {
let arg_count = args.len();
let mut ext_args = Vec::with_capacity(arg_count);
for i in 0..arg_count {
let ownedvalue_arg = args.get(i).unwrap();
let extvalue_arg: ExtValue = match ownedvalue_arg {
OwnedValue::Null => Ok(ExtValue::null()),
OwnedValue::Integer(i) => Ok(ExtValue::from_integer(*i)),
OwnedValue::Float(f) => Ok(ExtValue::from_float(*f)),
OwnedValue::Text(t) => Ok(ExtValue::from_text(t.as_str().to_string())),
OwnedValue::Blob(b) => Ok(ExtValue::from_blob((**b).clone())),
other => Err(LimboError::ExtensionError(format!(
"Unsupported value type: {:?}",
other
))),
}?;
ext_args.push(extvalue_arg);
}
let rowid = rowid.unwrap_or(-1);
let newrowid = 0i64;
let implementation = self.implementation.as_ref();
let rc = unsafe {
(self.implementation.update)(
implementation as *const VTabModuleImpl as *mut std::ffi::c_void,
arg_count as i32,
ext_args.as_ptr(),
rowid,
&newrowid as *const _ as *mut i64,
)
};
match rc {
ResultCode::OK => Ok(None),
ResultCode::RowID => Ok(Some(newrowid)),
_ => Err(LimboError::ExtensionError(rc.to_string())),
}
}
}
pub(crate) struct SymbolTable {
pub functions: HashMap<String, Rc<function::ExternalFunc>>,
#[cfg(not(target_family = "wasm"))]
extensions: Vec<(Library, *const ExtensionApi)>,
pub vtabs: HashMap<String, VirtualTable>,
pub vtabs: HashMap<String, Rc<VirtualTable>>,
pub vtab_modules: HashMap<String, Rc<crate::ext::VTabImpl>>,
}
impl std::fmt::Debug for SymbolTable {
@ -631,6 +720,7 @@ impl SymbolTable {
vtabs: HashMap::new(),
#[cfg(not(target_family = "wasm"))]
extensions: Vec::new(),
vtab_modules: HashMap::new(),
}
}