limbo/core/mvcc/mod.rs
Pere Diaz Bou da4190a23e Convert u64 rowid to i64
Rowids can be negative, therefore let's swap to i64
2025-05-30 13:07:31 +02:00

163 lines
6 KiB
Rust

//! Multiversion concurrency control (MVCC) for Rust.
//!
//! This module implements the main memory MVCC method outlined in the paper
//! "High-Performance Concurrency Control Mechanisms for Main-Memory Databases"
//! by Per-Åke Larson et al. (VLDB, 2011).
//!
//! ## Data anomalies
//!
//! * A *dirty write* occurs when transaction T_m updates a value that was written by
//! transaction T_n but not yet committed. The MVCC algorithm prevents dirty
//! writes by validating that a row version is visible to transaction T_m before
//! allowing an update to it.
//!
//! * A *dirty read* occurs when transaction T_m reads a value that was written by
//! transaction T_n but not yet committed. The MVCC algorithm prevents dirty
//! reads by validating that a row version is visible to transaction T_m.
//!
//! * A *fuzzy read* (non-repeatable read) occurs when transaction T_m reads the
//! same value more than once in the course of the transaction and sees
//! different results because another transaction T_n has updated the value
//! in between.
//!
//! * A *lost update* occurs when transactions T_m and T_n both attempt to update
//! the same value, resulting in one of the updates being lost. The MVCC algorithm
//! prevents lost updates by detecting the write-write conflict and letting the
//! first writer win by aborting the later transaction.
//!
//! TODO: phantom reads, cursor lost updates, read skew, write skew.
//!
//! ## TODO
//!
//! * Optimistic reads and writes
//! * Garbage collection
pub mod clock;
pub mod cursor;
pub mod database;
pub mod errors;
pub mod persistent_storage;
pub use clock::LocalClock;
pub use database::MvStore;
#[cfg(test)]
mod tests {
use crate::mvcc::clock::LocalClock;
use crate::mvcc::database::{MvStore, Row, RowID};
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
// Shared row-ID counter for the tests in this module. It starts at 1 and is
// advanced atomically with `fetch_add` by the writer threads of
// `test_non_overlapping_concurrent_inserts`, so each insert gets its own ID.
static IDS: AtomicI64 = AtomicI64::new(1);
/// Two threads insert into the same store concurrently, each row under a
/// distinct ID taken from the shared `IDS` counter. After every committed
/// insert, a fresh transaction must read the row back unchanged.
#[test]
fn test_non_overlapping_concurrent_inserts() {
    let db = Arc::new(MvStore::new(
        LocalClock::default(),
        crate::mvcc::persistent_storage::Storage::new_noop(),
    ));
    let iterations = 100000;

    // Builds one writer thread; `payload` is the text stored in every row
    // that thread inserts.
    let spawn_writer = |payload: &'static str| {
        let store = Arc::clone(&db);
        std::thread::spawn(move || {
            for _ in 0..iterations {
                // Insert the row and commit it in its own transaction.
                let write_tx = store.begin_tx();
                let row_id = RowID {
                    table_id: 1,
                    row_id: IDS.fetch_add(1, Ordering::SeqCst),
                };
                let expected = Row {
                    id: row_id,
                    data: payload.to_string().into_bytes(),
                };
                store.insert(write_tx, expected.clone()).unwrap();
                store.commit_tx(write_tx).unwrap();

                // A second transaction must observe the committed row.
                let read_tx = store.begin_tx();
                let observed = store.read(read_tx, row_id).unwrap();
                store.commit_tx(read_tx).unwrap();
                assert_eq!(observed, Some(expected));
            }
        })
    };

    // Start both writers before waiting on either of them.
    let writers = [spawn_writer("Hello"), spawn_writer("World")];
    for writer in writers {
        writer.join().unwrap();
    }
}
// FIXME: This test fails sporadically.
/// Four threads ("A" through "D") repeatedly upsert into the same 16 row IDs
/// of table 1. Whenever an upsert succeeds, the writing transaction must read
/// back its own row before it commits. Failed upserts are counted and the
/// iteration is skipped; the failure rate is logged when the thread finishes.
#[test]
#[ignore]
fn test_overlapping_concurrent_inserts_read_your_writes() {
    let iterations = 100000;
    let db = Arc::new(MvStore::new(
        LocalClock::default(),
        crate::mvcc::persistent_storage::Storage::new_noop(),
    ));

    // Builds one worker thread; `prefix` tags both its log lines and row data.
    let spawn_worker = |prefix: &'static str| {
        let store = Arc::clone(&db);
        std::thread::spawn(move || {
            let mut failed_upserts = 0;
            for i in 0..iterations {
                // Periodic progress report.
                if i % 1000 == 0 {
                    tracing::debug!("{prefix}: {i}");
                }
                // Periodically drop row versions that are no longer used.
                if i % 10000 == 0 {
                    let dropped = store.drop_unused_row_versions();
                    tracing::debug!("garbage collected {dropped} versions");
                }

                let tx = store.begin_tx();
                // Only 16 distinct row IDs are used, so the workers overlap.
                let row_id = RowID {
                    table_id: 1,
                    row_id: i % 16,
                };
                let expected = Row {
                    id: row_id,
                    data: format!("{prefix} @{tx}").into_bytes(),
                };

                match store.upsert(tx, expected.clone()) {
                    Err(e) => {
                        // Count the failure and move on to the next iteration.
                        tracing::trace!("upsert failed: {e}");
                        failed_upserts += 1;
                    }
                    Ok(_) => {
                        // Read inside the writing transaction, then commit.
                        let observed = store.read(tx, row_id).unwrap();
                        store.commit_tx(tx).unwrap();
                        assert_eq!(observed, Some(expected));
                    }
                }
            }
            tracing::info!(
                "{prefix}'s failed upserts: {failed_upserts}/{iterations} {:.2}%",
                (failed_upserts * 100) as f64 / iterations as f64
            );
        })
    };

    // Spawn all four workers first, then wait for each in spawn order.
    let workers = vec![
        spawn_worker("A"),
        spawn_worker("B"),
        spawn_worker("C"),
        spawn_worker("D"),
    ];
    workers
        .into_iter()
        .for_each(|worker| worker.join().unwrap());
}
}