mirror of
https://github.com/tursodatabase/limbo.git
synced 2025-08-04 01:58:16 +00:00
core: Switch to parking_lot for RwLock
We really need to make the WAL lock less expensive, but switching to `parking_lot` is anyway something we should do. Before: ``` Execute `SELECT 1`/Limbo time: [56.230 ns 56.463 ns 56.688 ns] ``` After: ``` Execute `SELECT 1`/Limbo time: [52.003 ns 52.132 ns 52.287 ns] ```
This commit is contained in:
parent
750164fb85
commit
e4d7474372
6 changed files with 39 additions and 31 deletions
3
Cargo.lock
generated
3
Cargo.lock
generated
|
@ -1590,6 +1590,7 @@ dependencies = [
|
|||
"miette",
|
||||
"mimalloc",
|
||||
"mockall",
|
||||
"parking_lot",
|
||||
"pest",
|
||||
"pest_derive",
|
||||
"polling",
|
||||
|
@ -1681,7 +1682,7 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "limbo_time"
|
||||
version = "0.0.13"
|
||||
version = "0.0.14"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"limbo_ext",
|
||||
|
|
|
@ -69,6 +69,7 @@ limbo_percentile = { path = "../extensions/percentile", optional = true, feature
|
|||
limbo_time = { path = "../extensions/time", optional = true, features = ["static"] }
|
||||
miette = "7.4.0"
|
||||
strum = "0.26"
|
||||
parking_lot = "0.12.3"
|
||||
|
||||
[build-dependencies]
|
||||
chrono = "0.4.38"
|
||||
|
|
|
@ -24,13 +24,14 @@ use fallible_iterator::FallibleIterator;
|
|||
use libloading::{Library, Symbol};
|
||||
use limbo_ext::{ExtensionApi, ExtensionEntryPoint};
|
||||
use log::trace;
|
||||
use parking_lot::RwLock;
|
||||
use schema::Schema;
|
||||
use sqlite3_parser::ast;
|
||||
use sqlite3_parser::{ast::Cmd, lexer::sql::Parser};
|
||||
use std::cell::Cell;
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZero;
|
||||
use std::sync::{Arc, OnceLock, RwLock};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::{cell::RefCell, rc::Rc};
|
||||
use storage::btree::btree_init_page;
|
||||
#[cfg(feature = "fs")]
|
||||
|
|
|
@ -5,11 +5,12 @@ use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent};
|
|||
use crate::storage::wal::{CheckpointResult, Wal};
|
||||
use crate::{Buffer, Result};
|
||||
use log::trace;
|
||||
use parking_lot::RwLock;
|
||||
use std::cell::{RefCell, UnsafeCell};
|
||||
use std::collections::HashSet;
|
||||
use std::rc::Rc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::page_cache::{DumbLruPageCache, PageCacheKey};
|
||||
use super::wal::{CheckpointMode, CheckpointStatus};
|
||||
|
@ -225,7 +226,7 @@ impl Pager {
|
|||
/// Reads a page from the database.
|
||||
pub fn read_page(&self, page_idx: usize) -> Result<PageRef> {
|
||||
trace!("read_page(page_idx = {})", page_idx);
|
||||
let mut page_cache = self.page_cache.write().unwrap();
|
||||
let mut page_cache = self.page_cache.write();
|
||||
let page_key = PageCacheKey::new(page_idx, Some(self.wal.borrow().get_max_frame()));
|
||||
if let Some(page) = page_cache.get(&page_key) {
|
||||
trace!("read_page(page_idx = {}) = cached", page_idx);
|
||||
|
@ -261,7 +262,7 @@ impl Pager {
|
|||
pub fn load_page(&self, page: PageRef) -> Result<()> {
|
||||
let id = page.get().id;
|
||||
trace!("load_page(page_idx = {})", id);
|
||||
let mut page_cache = self.page_cache.write().unwrap();
|
||||
let mut page_cache = self.page_cache.write();
|
||||
page.set_locked();
|
||||
let page_key = PageCacheKey::new(id, Some(self.wal.borrow().get_max_frame()));
|
||||
if let Some(frame_id) = self.wal.borrow().find_frame(id as u64)? {
|
||||
|
@ -297,7 +298,7 @@ impl Pager {
|
|||
|
||||
/// Changes the size of the page cache.
|
||||
pub fn change_page_cache_size(&self, capacity: usize) {
|
||||
let mut page_cache = self.page_cache.write().unwrap();
|
||||
let mut page_cache = self.page_cache.write();
|
||||
page_cache.resize(capacity);
|
||||
}
|
||||
|
||||
|
@ -315,7 +316,7 @@ impl Pager {
|
|||
FlushState::Start => {
|
||||
let db_size = self.db_header.borrow().database_size;
|
||||
for page_id in self.dirty_pages.borrow().iter() {
|
||||
let mut cache = self.page_cache.write().unwrap();
|
||||
let mut cache = self.page_cache.write();
|
||||
let page_key =
|
||||
PageCacheKey::new(*page_id, Some(self.wal.borrow().get_max_frame()));
|
||||
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
|
||||
|
@ -446,7 +447,7 @@ impl Pager {
|
|||
}
|
||||
}
|
||||
// TODO: only clear cache of things that are really invalidated
|
||||
self.page_cache.write().unwrap().clear();
|
||||
self.page_cache.write().clear();
|
||||
checkpoint_result
|
||||
}
|
||||
|
||||
|
@ -482,7 +483,7 @@ impl Pager {
|
|||
// setup page and add to cache
|
||||
page.set_dirty();
|
||||
self.add_dirty(page.get().id);
|
||||
let mut cache = self.page_cache.write().unwrap();
|
||||
let mut cache = self.page_cache.write();
|
||||
let page_key =
|
||||
PageCacheKey::new(page.get().id, Some(self.wal.borrow().get_max_frame()));
|
||||
cache.insert(page_key, page.clone());
|
||||
|
@ -491,7 +492,7 @@ impl Pager {
|
|||
}
|
||||
|
||||
pub fn put_loaded_page(&self, id: usize, page: PageRef) {
|
||||
let mut cache = self.page_cache.write().unwrap();
|
||||
let mut cache = self.page_cache.write();
|
||||
// cache insert invalidates previous page
|
||||
let page_key = PageCacheKey::new(id, Some(self.wal.borrow().get_max_frame()));
|
||||
cache.insert(page_key, page.clone());
|
||||
|
@ -525,7 +526,9 @@ pub fn allocate_page(page_id: usize, buffer_pool: &Rc<BufferPool>, offset: usize
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::Arc;
|
||||
|
||||
use parking_lot::RwLock;
|
||||
|
||||
use crate::storage::page_cache::{DumbLruPageCache, PageCacheKey};
|
||||
|
||||
|
@ -539,13 +542,13 @@ mod tests {
|
|||
let thread = {
|
||||
let cache = cache.clone();
|
||||
std::thread::spawn(move || {
|
||||
let mut cache = cache.write().unwrap();
|
||||
let mut cache = cache.write();
|
||||
let page_key = PageCacheKey::new(1, None);
|
||||
cache.insert(page_key, Arc::new(Page::new(1)));
|
||||
})
|
||||
};
|
||||
let _ = thread.join();
|
||||
let mut cache = cache.write().unwrap();
|
||||
let mut cache = cache.write();
|
||||
let page_key = PageCacheKey::new(1, None);
|
||||
let page = cache.get(&page_key);
|
||||
assert_eq!(page.unwrap().get().id, 1);
|
||||
|
|
|
@ -49,10 +49,11 @@ use crate::storage::pager::Pager;
|
|||
use crate::types::{OwnedRecord, OwnedValue};
|
||||
use crate::{File, Result};
|
||||
use log::trace;
|
||||
use parking_lot::RwLock;
|
||||
use std::cell::RefCell;
|
||||
use std::pin::Pin;
|
||||
use std::rc::Rc;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::pager::PageRef;
|
||||
|
||||
|
@ -1147,7 +1148,7 @@ pub fn begin_read_wal_header(io: &Rc<dyn File>) -> Result<Arc<RwLock<WalHeader>>
|
|||
fn finish_read_wal_header(buf: Rc<RefCell<Buffer>>, header: Arc<RwLock<WalHeader>>) -> Result<()> {
|
||||
let buf = buf.borrow();
|
||||
let buf = buf.as_slice();
|
||||
let mut header = header.write().unwrap();
|
||||
let mut header = header.write();
|
||||
header.magic = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
|
||||
header.file_format = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
|
||||
header.page_size = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
use log::{debug, trace};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use parking_lot::RwLock;
|
||||
use std::fmt::Formatter;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::RwLock;
|
||||
use std::{cell::RefCell, fmt, rc::Rc, sync::Arc};
|
||||
|
||||
use crate::io::{File, SyncCompletion, IO};
|
||||
|
@ -311,7 +312,7 @@ impl fmt::Debug for WalFileShared {
|
|||
impl Wal for WalFile {
|
||||
/// Begin a read transaction.
|
||||
fn begin_read_tx(&mut self) -> Result<LimboResult> {
|
||||
let mut shared = self.shared.write().unwrap();
|
||||
let mut shared = self.shared.write();
|
||||
let max_frame_in_wal = shared.max_frame;
|
||||
self.min_frame = shared.nbackfills + 1;
|
||||
|
||||
|
@ -365,7 +366,7 @@ impl Wal for WalFile {
|
|||
|
||||
/// End a read transaction.
|
||||
fn end_read_tx(&self) -> Result<LimboResult> {
|
||||
let mut shared = self.shared.write().unwrap();
|
||||
let mut shared = self.shared.write();
|
||||
let read_lock = &mut shared.read_locks[self.max_frame_read_lock_index];
|
||||
read_lock.unlock();
|
||||
Ok(LimboResult::Ok)
|
||||
|
@ -373,7 +374,7 @@ impl Wal for WalFile {
|
|||
|
||||
/// Begin a write transaction
|
||||
fn begin_write_tx(&mut self) -> Result<LimboResult> {
|
||||
let mut shared = self.shared.write().unwrap();
|
||||
let mut shared = self.shared.write();
|
||||
let busy = !shared.write_lock.write();
|
||||
if busy {
|
||||
return Ok(LimboResult::Busy);
|
||||
|
@ -383,14 +384,14 @@ impl Wal for WalFile {
|
|||
|
||||
/// End a write transaction
|
||||
fn end_write_tx(&self) -> Result<LimboResult> {
|
||||
let mut shared = self.shared.write().unwrap();
|
||||
let mut shared = self.shared.write();
|
||||
shared.write_lock.unlock();
|
||||
Ok(LimboResult::Ok)
|
||||
}
|
||||
|
||||
/// Find the latest frame containing a page.
|
||||
fn find_frame(&self, page_id: u64) -> Result<Option<u64>> {
|
||||
let shared = self.shared.read().unwrap();
|
||||
let shared = self.shared.read();
|
||||
let frames = shared.frame_cache.get(&page_id);
|
||||
if frames.is_none() {
|
||||
return Ok(None);
|
||||
|
@ -408,7 +409,7 @@ impl Wal for WalFile {
|
|||
fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Rc<BufferPool>) -> Result<()> {
|
||||
debug!("read_frame({})", frame_id);
|
||||
let offset = self.frame_offset(frame_id);
|
||||
let shared = self.shared.read().unwrap();
|
||||
let shared = self.shared.read();
|
||||
page.set_locked();
|
||||
begin_read_wal_frame(
|
||||
&shared.file,
|
||||
|
@ -427,7 +428,7 @@ impl Wal for WalFile {
|
|||
write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<()> {
|
||||
let page_id = page.get().id;
|
||||
let mut shared = self.shared.write().unwrap();
|
||||
let mut shared = self.shared.write();
|
||||
let frame_id = if shared.max_frame == 0 {
|
||||
1
|
||||
} else {
|
||||
|
@ -441,7 +442,7 @@ impl Wal for WalFile {
|
|||
page_id
|
||||
);
|
||||
let header = shared.wal_header.clone();
|
||||
let header = header.read().unwrap();
|
||||
let header = header.read();
|
||||
let checksums = shared.last_checksum;
|
||||
let checksums = begin_write_wal_frame(
|
||||
&shared.file,
|
||||
|
@ -468,7 +469,7 @@ impl Wal for WalFile {
|
|||
}
|
||||
|
||||
fn should_checkpoint(&self) -> bool {
|
||||
let shared = self.shared.read().unwrap();
|
||||
let shared = self.shared.read();
|
||||
let frame_id = shared.max_frame as usize;
|
||||
frame_id >= self.checkpoint_threshold
|
||||
}
|
||||
|
@ -490,7 +491,7 @@ impl Wal for WalFile {
|
|||
CheckpointState::Start => {
|
||||
// TODO(pere): check what frames are safe to checkpoint between many readers!
|
||||
self.ongoing_checkpoint.min_frame = self.min_frame;
|
||||
let mut shared = self.shared.write().unwrap();
|
||||
let mut shared = self.shared.write();
|
||||
let max_frame_in_wal = shared.max_frame as u32;
|
||||
let mut max_safe_frame = shared.max_frame;
|
||||
for read_lock in shared.read_locks.iter_mut() {
|
||||
|
@ -515,7 +516,7 @@ impl Wal for WalFile {
|
|||
);
|
||||
}
|
||||
CheckpointState::ReadFrame => {
|
||||
let shared = self.shared.read().unwrap();
|
||||
let shared = self.shared.read();
|
||||
assert!(
|
||||
self.ongoing_checkpoint.current_page as usize
|
||||
<= shared.pages_in_frames.len()
|
||||
|
@ -574,7 +575,7 @@ impl Wal for WalFile {
|
|||
if *write_counter.borrow() > 0 {
|
||||
return Ok(CheckpointStatus::IO);
|
||||
}
|
||||
let shared = self.shared.read().unwrap();
|
||||
let shared = self.shared.read();
|
||||
if (self.ongoing_checkpoint.current_page as usize)
|
||||
< shared.pages_in_frames.len()
|
||||
{
|
||||
|
@ -587,7 +588,7 @@ impl Wal for WalFile {
|
|||
if *write_counter.borrow() > 0 {
|
||||
return Ok(CheckpointStatus::IO);
|
||||
}
|
||||
let mut shared = self.shared.write().unwrap();
|
||||
let mut shared = self.shared.write();
|
||||
|
||||
// Record two num pages fields to return as checkpoint result to caller.
|
||||
// Ref: pnLog, pnCkpt on https://www.sqlite.org/c3ref/wal_checkpoint_v2.html
|
||||
|
@ -618,7 +619,7 @@ impl Wal for WalFile {
|
|||
let state = *self.sync_state.borrow();
|
||||
match state {
|
||||
SyncState::NotSyncing => {
|
||||
let shared = self.shared.write().unwrap();
|
||||
let shared = self.shared.write();
|
||||
debug!("wal_sync");
|
||||
{
|
||||
let syncing = self.syncing.clone();
|
||||
|
@ -756,7 +757,7 @@ impl WalFileShared {
|
|||
Arc::new(RwLock::new(wal_header))
|
||||
};
|
||||
let checksum = {
|
||||
let checksum = header.read().unwrap();
|
||||
let checksum = header.read();
|
||||
(checksum.checksum_1, checksum.checksum_2)
|
||||
};
|
||||
let shared = WalFileShared {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue