Mirror of https://github.com/tursodatabase/limbo.git (synced 2025-08-04 18:18:03 +00:00)

Merge 'Write database header via normal pager route' from meteorgan

Closes #1613, closes #1634.

Commit 2087393d22: 5 changed files with 41 additions and 79 deletions.
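The diff below replaces the old direct-to-file header write (`begin_write_database_header`, removed further down) with a write through the pager: page 1 is read into the page cache, patched in memory, and marked dirty so the normal flush path persists it. A minimal, self-contained sketch of that write-through pattern, using simplified stand-in types rather than limbo's actual `Pager`/`Page` API:

use std::collections::HashMap;

const DATABASE_HEADER_PAGE_ID: usize = 1;

struct Page {
    data: Vec<u8>,
    dirty: bool,
}

struct Pager {
    pages: HashMap<usize, Page>, // simplified page cache
    dirty: Vec<usize>,           // page ids queued for the next flush
}

impl Pager {
    // Patch the serialized header into the cached page 1 and mark it
    // dirty; the regular flush path persists it later, instead of a raw
    // write straight to byte offset 0 of the database file.
    fn write_database_header(&mut self, header_bytes: &[u8]) -> Result<(), String> {
        let page = self
            .pages
            .get_mut(&DATABASE_HEADER_PAGE_ID)
            .ok_or("page 1 not in cache")?;
        page.data[..header_bytes.len()].copy_from_slice(header_bytes);
        page.dirty = true;
        self.dirty.push(DATABASE_HEADER_PAGE_ID);
        Ok(())
    }
}

fn main() {
    let mut pager = Pager {
        pages: HashMap::from([(
            DATABASE_HEADER_PAGE_ID,
            Page { data: vec![0; 4096], dirty: false },
        )]),
        dirty: Vec::new(),
    };
    pager.write_database_header(b"SQLite format 3\0").unwrap();
    assert!(pager.pages[&DATABASE_HEADER_PAGE_ID].dirty);
}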
@@ -176,9 +176,9 @@ impl Limbo {
         self
     }

-    fn first_run(&mut self, sql: Option<String>, quiet: bool) -> io::Result<()> {
+    fn first_run(&mut self, sql: Option<String>, quiet: bool) -> Result<(), LimboError> {
         if let Some(sql) = sql {
-            self.handle_first_input(&sql);
+            self.handle_first_input(&sql)?;
         }
         if !quiet {
             self.write_fmt(format_args!("Limbo v{}", env!("CARGO_PKG_VERSION")))?;
@@ -188,12 +188,13 @@ impl Limbo {
         Ok(())
     }

-    fn handle_first_input(&mut self, cmd: &str) {
+    fn handle_first_input(&mut self, cmd: &str) -> Result<(), LimboError> {
         if cmd.trim().starts_with('.') {
             self.handle_dot_command(&cmd[1..]);
         } else {
             self.run_query(cmd);
         }
+        self.close_conn()?;
         std::process::exit(0);
     }

@@ -3,7 +3,9 @@ use crate::result::LimboResult;
 use crate::storage::btree::BTreePageInner;
 use crate::storage::buffer_pool::BufferPool;
 use crate::storage::database::DatabaseStorage;
-use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType};
+use crate::storage::sqlite3_ondisk::{
+    self, DatabaseHeader, PageContent, PageType, DATABASE_HEADER_PAGE_ID,
+};
 use crate::storage::wal::{CheckpointResult, Wal, WalFsyncStatus};
 use crate::Completion;
 use crate::{Buffer, LimboError, Result};
@@ -382,8 +384,19 @@ impl Pager {
     }

     /// Writes the database header.
-    pub fn write_database_header(&self, header: &DatabaseHeader) {
-        sqlite3_ondisk::begin_write_database_header(header, self).expect("failed to write header");
+    pub fn write_database_header(&self, header: &DatabaseHeader) -> Result<()> {
+        let header_page = self.read_page(DATABASE_HEADER_PAGE_ID)?;
+        while header_page.is_locked() {
+            // FIXME: we should never run io here!
+            self.io.run_once()?;
+        }
+        header_page.set_dirty();
+        self.add_dirty(DATABASE_HEADER_PAGE_ID);
+
+        let contents = header_page.get().contents.as_ref().unwrap();
+        contents.write_database_header(&header);
+
+        Ok(())
     }

     /// Changes the size of the page cache.
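The `while header_page.is_locked()` loop above pumps the I/O event loop until the page read completes; the FIXME flags this synchronous spin inside the pager as a known layering smell. A minimal sketch of that pattern with stand-in types (`Io` and `Page` here are simplified placeholders, not limbo's real API):

use std::cell::Cell;
use std::rc::Rc;

// Stand-ins for limbo's page lock flag and I/O event loop.
struct Page {
    locked: Cell<bool>,
}

struct Io {
    pending: Cell<u32>, // completions still outstanding
    page: Rc<Page>,
}

impl Io {
    // Process one completion; unlock the page once its read lands.
    fn run_once(&self) -> Result<(), String> {
        let left = self.pending.get();
        if left > 0 {
            self.pending.set(left - 1);
            if left == 1 {
                self.page.locked.set(false);
            }
        }
        Ok(())
    }
}

fn main() -> Result<(), String> {
    let page = Rc::new(Page { locked: Cell::new(true) });
    let io = Io { pending: Cell::new(3), page: page.clone() };
    // Same shape as the pager code: spin the event loop until the read
    // completes and the page can be patched and marked dirty.
    while page.locked.get() {
        io.run_once()?;
    }
    println!("page 1 ready");
    Ok(())
}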
@@ -665,24 +678,8 @@ impl Pager {
         let header = &self.db_header;
         let mut header = header.lock();
         header.database_size += 1;
-        {
-            // update database size
-            // read sync for now
-            loop {
-                let first_page_ref = self.read_page(1)?;
-                if first_page_ref.is_locked() {
-                    // FIXME: we should never run io here!
-                    self.io.run_once()?;
-                    continue;
-                }
-                first_page_ref.set_dirty();
-                self.add_dirty(1);
-
-                let contents = first_page_ref.get().contents.as_ref().unwrap();
-                contents.write_database_header(&header);
-                break;
-            }
-        }
+        // update database size
+        self.write_database_header(&mut header)?;

         // FIXME: should reserve page cache entry before modifying the database
         let page = allocate_page(header.database_size as usize, &self.buffer_pool, 0);
@@ -694,13 +691,11 @@ impl Pager {
         let page_key = PageCacheKey::new(page.get().id);
         let mut cache = self.page_cache.write();
         match cache.insert(page_key, page.clone()) {
-            Err(CacheError::Full) => return Err(LimboError::CacheFull),
-            Err(_) => {
-                return Err(LimboError::InternalError(
-                    "Unknown error inserting page to cache".into(),
-                ))
-            }
-            Ok(_) => return Ok(page),
+            Err(CacheError::Full) => Err(LimboError::CacheFull),
+            Err(_) => Err(LimboError::InternalError(
+                "Unknown error inserting page to cache".into(),
+            )),
+            Ok(_) => Ok(page),
         }
     }
 }
@@ -83,6 +83,8 @@ const MAX_PAGE_SIZE: u32 = 65536;
 /// The default page size in bytes.
 const DEFAULT_PAGE_SIZE: u16 = 4096;

+pub const DATABASE_HEADER_PAGE_ID: usize = 1;
+
 /// The database header.
 /// The first 100 bytes of the database file comprise the database file header.
 /// The database file header is divided into fields as shown by the table below.
@@ -298,7 +300,7 @@ pub fn begin_read_database_header(
     });
     let c = Completion::Read(ReadCompletion::new(buf, complete));
     #[allow(clippy::arc_with_non_send_sync)]
-    db_file.read_page(1, Arc::new(c))?;
+    db_file.read_page(DATABASE_HEADER_PAGE_ID, Arc::new(c))?;
     Ok(result)
 }
@@ -338,48 +340,6 @@ fn finish_read_database_header(
     Ok(())
 }

-pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Result<()> {
-    let page_source = pager.db_file.clone();
-    let header = Rc::new(header.clone());
-
-    let drop_fn = Rc::new(|_buf| {});
-    #[allow(clippy::arc_with_non_send_sync)]
-    let buffer_to_copy = Arc::new(RefCell::new(Buffer::allocate(512, drop_fn)));
-    let buffer_to_copy_in_cb = buffer_to_copy.clone();
-
-    let read_complete = Box::new(move |buffer: Arc<RefCell<Buffer>>| {
-        let buffer = buffer.borrow().clone();
-        let buffer = Rc::new(RefCell::new(buffer));
-        let mut buf_mut = buffer.borrow_mut();
-        write_header_to_buf(buf_mut.as_mut_slice(), &header);
-        let mut dest_buf = buffer_to_copy_in_cb.borrow_mut();
-        dest_buf.as_mut_slice().copy_from_slice(buf_mut.as_slice());
-    });
-
-    let drop_fn = Rc::new(|_buf| {});
-    #[allow(clippy::arc_with_non_send_sync)]
-    let buf = Arc::new(RefCell::new(Buffer::allocate(512, drop_fn)));
-    let c = Completion::Read(ReadCompletion::new(buf, read_complete));
-    #[allow(clippy::arc_with_non_send_sync)]
-    page_source.read_page(1, Arc::new(c))?;
-    // run get header block
-    pager.io.run_once()?;
-
-    let buffer_to_copy_in_cb = buffer_to_copy.clone();
-    let write_complete = Box::new(move |bytes_written: i32| {
-        let buf_len = buffer_to_copy_in_cb.borrow().len();
-        if bytes_written < buf_len as i32 {
-            tracing::error!("wrote({bytes_written}) less than expected({buf_len})");
-        }
-        // finish_read_database_header(buf, header).unwrap();
-    });
-
-    let c = Completion::Write(WriteCompletion::new(write_complete));
-    page_source.write_page(1, buffer_to_copy, Arc::new(c))?;
-
-    Ok(())
-}
-
 pub fn write_header_to_buf(buf: &mut [u8], header: &DatabaseHeader) {
     buf[0..16].copy_from_slice(&header.magic);
     buf[16..18].copy_from_slice(&header.page_size.to_be_bytes());
@@ -835,7 +795,7 @@ pub fn finish_read_page(
     page: PageRef,
 ) -> Result<()> {
     trace!("finish_read_btree_page(page_idx = {})", page_idx);
-    let pos = if page_idx == 1 {
+    let pos = if page_idx == DATABASE_HEADER_PAGE_ID {
         DATABASE_HEADER_SIZE
     } else {
         0
@@ -125,7 +125,7 @@ fn update_pragma(
         },
         _ => bail_parse_error!("Not a valid value"),
     };
-    update_cache_size(cache_size, header, pager);
+    update_cache_size(cache_size, header, pager)?;
     Ok(())
 }
 PragmaName::JournalMode => {
@@ -283,7 +283,11 @@ fn query_pragma(
     Ok(())
 }

-fn update_cache_size(value: i64, header: Arc<SpinLock<DatabaseHeader>>, pager: Rc<Pager>) {
+fn update_cache_size(
+    value: i64,
+    header: Arc<SpinLock<DatabaseHeader>>,
+    pager: Rc<Pager>,
+) -> crate::Result<()> {
     let mut cache_size_unformatted: i64 = value;
     let mut cache_size = if cache_size_unformatted < 0 {
         let kb = cache_size_unformatted.abs() * 1024;
@@ -306,10 +310,12 @@ fn update_cache_size(value: i64, header: Arc<SpinLock<DatabaseHeader>>, pager: Rc<Pager>)
         .unwrap_or_else(|_| panic!("invalid value, too big for a i32 {}", value));

     // update in disk
-    pager.write_database_header(&header_guard);
+    pager.write_database_header(&header_guard)?;

     // update cache size
     pager
         .change_page_cache_size(cache_size)
         .expect("couldn't update page cache size");
+
+    Ok(())
 }
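The negative-value branch in `update_cache_size` follows SQLite's `PRAGMA cache_size` convention: a negative value -N requests roughly N KiB of cache, so the page count is derived from the page size. A minimal, self-contained sketch of that conversion (a simplification for illustration; limbo's actual function also clamps the value and persists it via the header write shown above):

/// Convert a `PRAGMA cache_size` value to a page count, following the
/// SQLite convention: a non-negative value is already a page count,
/// while a negative value -N asks for about N KiB of cache.
fn cache_size_to_pages(value: i64, page_size: u32) -> i64 {
    if value >= 0 {
        value
    } else {
        let bytes = value.unsigned_abs() as i64 * 1024;
        bytes / page_size as i64
    }
}

fn main() {
    // 2000 pages requested directly.
    assert_eq!(cache_size_to_pages(2000, 4096), 2000);
    // -2000 => ~2000 KiB of cache => 500 pages of 4 KiB each.
    assert_eq!(cache_size_to_pages(-2000, 4096), 500);
}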
@@ -4536,7 +4536,7 @@ pub fn op_set_cookie(
         Cookie::UserVersion => {
             let mut header_guard = pager.db_header.lock();
             header_guard.user_version = *value;
-            pager.write_database_header(&*header_guard);
+            pager.write_database_header(&*header_guard)?;
         }
         cookie => todo!("{cookie:?} is not yet implement for SetCookie"),
     }