database: add dropping unused row versions

When a row version is not visible to any transaction,
whether active or future, it should be dropped.
This commit is contained in:
Piotr Sarna 2023-04-20 15:34:17 +02:00
parent 2a018ea9a3
commit fb6ce70993

View file

@ -2,7 +2,7 @@ use crate::clock::LogicalClock;
use crate::errors::DatabaseError;
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
@ -135,6 +135,7 @@ impl<
let inner = DatabaseInner {
rows: RefCell::new(HashMap::new()),
txs: RefCell::new(HashMap::new()),
tx_timestamps: RefCell::new(BTreeMap::new()),
tx_ids: AtomicU64::new(0),
clock,
storage,
@ -260,16 +261,20 @@ impl<
inner.rollback_tx(tx_id).await;
}
/// Garbage-collects row versions that are no longer needed.
///
/// Acquires the lock on the inner database state and runs its
/// `drop_unused_row_versions` pass while the lock is held. Row versions
/// that carry no end marker are current and are never dropped by that pass.
pub async fn drop_unused_row_versions(&self) {
    self.inner.lock().await.drop_unused_row_versions();
}
/// Test-only helper: scans the storage backend and collects every
/// `Mutation` produced by the scan into a `Vec`.
///
/// An error returned by `storage.scan()` is propagated to the caller via `?`.
#[cfg(test)]
pub(crate) async fn scan_storage(&self) -> Result<Vec<Mutation>> {
// Presumably brings the `collect` adaptor used below into scope for the
// value returned by `scan()` -- confirm against the storage trait.
use futures::StreamExt;
// The lock on the inner state is held for the duration of the scan.
let inner = self.inner.lock().await;
// NOTE(review): the two `Ok(...)` expressions below are the same statement
// in its pre- and post-reformat layout, as left behind by the diff; only
// one of them belongs in the final function body.
Ok(inner
.storage
.scan()
.await?
.collect::<Vec<Mutation>>()
.await)
Ok(inner.storage.scan().await?.collect::<Vec<Mutation>>().await)
}
}
@ -277,6 +282,7 @@ impl<
pub struct DatabaseInner<Clock: LogicalClock, Storage: crate::persistent_storage::Storage> {
rows: RefCell<HashMap<u64, Vec<RowVersion>>>,
txs: RefCell<HashMap<TxID, Transaction>>,
tx_timestamps: RefCell<BTreeMap<u64, usize>>,
tx_ids: AtomicU64,
clock: Clock,
storage: Storage,
@ -356,7 +362,9 @@ impl<Clock: LogicalClock, Storage: crate::persistent_storage::Storage>
let tx = Transaction::new(tx_id, begin_ts);
tracing::trace!("BEGIN {tx}");
let mut txs = self.txs.borrow_mut();
let mut tx_timestamps = self.tx_timestamps.borrow_mut();
txs.insert(tx_id, tx);
*tx_timestamps.entry(begin_ts).or_insert(0) += 1;
tx_id
}
@ -401,6 +409,13 @@ impl<Clock: LogicalClock, Storage: crate::persistent_storage::Storage>
// invariant doesn't necessarily hold anymore because another thread
// might have speculatively read a version that we want to remove.
// But that's a problem for another day.
let mut tx_timestamps = self.tx_timestamps.borrow_mut();
if let Some(timestamp_entry) = tx_timestamps.get_mut(&tx.begin_ts) {
*timestamp_entry -= 1;
if timestamp_entry == &0 {
tx_timestamps.remove(&tx.begin_ts);
}
}
txs.remove(&tx_id);
drop(rows);
drop(txs);
@ -436,6 +451,54 @@ impl<Clock: LogicalClock, Storage: crate::persistent_storage::Storage>
/// Returns a timestamp obtained from the database's logical clock.
fn get_timestamp(&mut self) -> u64 {
// Pure delegation: the value comes straight from `clock.get_timestamp()`.
self.clock.get_timestamp()
}
/// Drops all row versions that are not visible to any transaction.
///
/// The decision is made per row version, based on its end marker:
///
/// * no end marker: the version is current and always stays;
/// * the end marker is a timestamp `T_END`: the version stays while the
///   oldest tracked transaction began at or before `T_END`; it is dropped
///   once every tracked transaction has a begin timestamp greater than
///   `T_END`, or when no transaction is tracked at all;
/// * the end marker is a transaction ID: the version stays while that
///   transaction is still tracked in `txs` (it will be reconsidered once the
///   transaction is gone), and is dropped only when the transaction is no
///   longer tracked.
///
/// Rows that end up with no versions at all are removed from the row map.
///
/// FIXME: this function is a full scan over all rows and row versions.
/// We can do better by keeping an index of row versions ordered
/// by their end timestamps.
fn drop_unused_row_versions(&self) {
    let txs = self.txs.borrow();
    let tx_timestamps = self.tx_timestamps.borrow();
    let mut rows = self.rows.borrow_mut();
    // `tx_timestamps` is keyed by begin timestamp, so the first key of the
    // ordered map is the begin timestamp of the oldest tracked transaction.
    // It cannot change during the scan, so it is looked up only once.
    let oldest_begin_ts = tx_timestamps
        .first_key_value()
        .map(|(begin_ts, _)| *begin_ts);
    rows.retain(|id, row_versions| {
        row_versions.retain(|rv| {
            let should_stay = match rv.end {
                Some(TxTimestampOrID::Timestamp(version_end_ts)) => match oldest_begin_ts {
                    // a transaction started before this row version ended,
                    // ergo row version is needed
                    Some(begin_ts) => version_end_ts >= begin_ts,
                    // no transaction => row version is not needed
                    None => false,
                },
                // Let's skip potentially complex logic if the transaction is still
                // active/tracked: such a version stays for now. Only a version whose
                // ending transaction is no longer tracked is dropped here.
                Some(TxTimestampOrID::TxID(tx_id)) => txs.contains_key(&tx_id),
                // this row version is current, ergo visible
                None => true,
            };
            if !should_stay {
                tracing::debug!("Dropping row version {} {:?}-{:?}", id, rv.begin, rv.end);
            }
            should_stay
        });
        // A row with no remaining versions is removed altogether.
        !row_versions.is_empty()
    });
}
}
/// A write-write conflict happens when transaction T_m attempts to update a
@ -599,6 +662,7 @@ mod tests {
let row = db.read(tx2, 1).await.unwrap().unwrap();
db.commit_tx(tx2).await.unwrap();
assert_eq!(tx1_updated_row, row);
db.drop_unused_row_versions().await;
}
#[traced_test]
@ -885,19 +949,12 @@ mod tests {
.await
.unwrap();
let mutation = db
.scan_storage()
.await
.unwrap();
let mutation = db.scan_storage().await.unwrap();
println!("{:?}", mutation);
db.commit_tx(tx4).await.unwrap();
let mutation = db
.scan_storage()
.await
.unwrap();
let mutation = db.scan_storage().await.unwrap();
println!("{:?}", mutation);
}
}