mirror of
https://git.deuxfleurs.fr/Deuxfleurs/garage.git
synced 2025-12-23 11:37:28 +00:00
[SKIP CI] WIP merkle todo encapsulation
This commit is contained in:
parent
46ebfdba66
commit
85aca61860
2 changed files with 67 additions and 19 deletions
|
|
@ -3,6 +3,7 @@ use std::convert::TryInto;
|
|||
use std::sync::Arc;
|
||||
|
||||
use serde_bytes::ByteBuf;
|
||||
use tokio::sync::SemaphorePermit;
|
||||
use tokio::sync::{Notify, Semaphore};
|
||||
|
||||
use garage_db as db;
|
||||
|
|
@ -21,6 +22,67 @@ use crate::replication::*;
|
|||
use crate::schema::*;
|
||||
use crate::util::*;
|
||||
|
||||
pub(crate) struct MerkleTodo {
|
||||
merkle_todo: db::Tree,
|
||||
merkle_todo_notify: Notify,
|
||||
merkle_todo_bounded_queue: Option<Arc<Semaphore>>,
|
||||
}
|
||||
impl Clone for MerkleTodo {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
merkle_todo: self.merkle_todo.clone(),
|
||||
merkle_todo_notify: Notify::new(),
|
||||
merkle_todo_bounded_queue: self.merkle_todo_bounded_queue.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl MerkleTodo {
|
||||
fn new<F: TableSchema>(db: &db::Db, config: &MerkleBackpressureEnum) -> Self {
|
||||
let merkle_todo = db
|
||||
.open_tree(format!("{}:merkle_todo", F::TABLE_NAME))
|
||||
.expect("Unable to open DB Merkle TODO tree");
|
||||
|
||||
let merkle_todo_bounded_queue = match config {
|
||||
MerkleBackpressureEnum::None => None,
|
||||
MerkleBackpressureEnum::FixedQueue(p) => {
|
||||
Some(Arc::new(Semaphore::new(p.max_queue_size)))
|
||||
}
|
||||
};
|
||||
|
||||
Self {
|
||||
merkle_todo,
|
||||
merkle_todo_notify: Notify::new(),
|
||||
merkle_todo_bounded_queue,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn len(&self) -> Result<usize, db::Error> {
|
||||
self.merkle_todo.len()
|
||||
}
|
||||
|
||||
pub(crate) async fn with_db<F: FnOnce(&db::Tree, SemaphorePermit)>(&self, f: F) {
|
||||
let bounded = self
|
||||
.merkle_todo_bounded_queue
|
||||
.clone()
|
||||
.unwrap_or(Arc::new(Semaphore::new(1)));
|
||||
let permit = bounded.acquire().await.unwrap();
|
||||
f(&self.merkle_todo, permit);
|
||||
}
|
||||
|
||||
pub(crate) fn appended(&self, permit: SemaphorePermit) {
|
||||
permit.forget();
|
||||
self.merkle_todo_notify.notify_one();
|
||||
}
|
||||
|
||||
pub(crate) fn processed(&self) {
|
||||
let bounded = self
|
||||
.merkle_todo_bounded_queue
|
||||
.clone()
|
||||
.unwrap_or(Arc::new(Semaphore::new(1)));
|
||||
bounded.add_permits(1);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TableData<F: TableSchema, R: TableReplication> {
|
||||
system: Arc<System>,
|
||||
|
||||
|
|
@ -30,9 +92,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
|
|||
pub store: db::Tree,
|
||||
|
||||
pub(crate) merkle_tree: db::Tree,
|
||||
pub(crate) merkle_todo: db::Tree,
|
||||
pub(crate) merkle_todo_notify: Notify,
|
||||
pub(crate) merkle_todo_bounded_queue: Option<Arc<Semaphore>>,
|
||||
pub(crate) merkle_todo: MerkleTodo,
|
||||
|
||||
pub(crate) insert_queue: db::Tree,
|
||||
pub(crate) insert_queue_notify: Arc<Notify>,
|
||||
|
|
@ -59,16 +119,8 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
|||
let merkle_tree = db
|
||||
.open_tree(format!("{}:merkle_tree", F::TABLE_NAME))
|
||||
.expect("Unable to open DB Merkle tree tree");
|
||||
let merkle_todo = db
|
||||
.open_tree(format!("{}:merkle_todo", F::TABLE_NAME))
|
||||
.expect("Unable to open DB Merkle TODO tree");
|
||||
|
||||
let merkle_todo_bounded_queue = match config {
|
||||
MerkleBackpressureEnum::None => None,
|
||||
MerkleBackpressureEnum::FixedQueue(p) => {
|
||||
Some(Arc::new(Semaphore::new(p.max_queue_size)))
|
||||
}
|
||||
};
|
||||
let merkle_todo = MerkleTodo::new::<F>(db, config);
|
||||
|
||||
let insert_queue = db
|
||||
.open_tree(format!("{}:insert_queue", F::TABLE_NAME))
|
||||
|
|
@ -83,7 +135,6 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
|||
store.clone(),
|
||||
merkle_tree.clone(),
|
||||
merkle_todo.clone(),
|
||||
merkle_todo_bounded_queue.clone(),
|
||||
gc_todo.clone(),
|
||||
);
|
||||
|
||||
|
|
@ -94,8 +145,6 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
|||
store,
|
||||
merkle_tree,
|
||||
merkle_todo,
|
||||
merkle_todo_notify: Notify::new(),
|
||||
merkle_todo_bounded_queue,
|
||||
insert_queue,
|
||||
insert_queue_notify: Arc::new(Notify::new()),
|
||||
gc_todo,
|
||||
|
|
|
|||
|
|
@ -1,10 +1,10 @@
|
|||
use opentelemetry::{global, metrics::*, KeyValue};
|
||||
use std::convert::TryInto;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use garage_db as db;
|
||||
|
||||
use crate::data::MerkleTodo;
|
||||
|
||||
/// TableMetrics reference all counter used for metrics
|
||||
pub struct TableMetrics {
|
||||
pub(crate) _table_size: ValueObserver<u64>,
|
||||
|
|
@ -29,8 +29,7 @@ impl TableMetrics {
|
|||
table_name: &'static str,
|
||||
store: db::Tree,
|
||||
merkle_tree: db::Tree,
|
||||
merkle_todo: db::Tree,
|
||||
merkle_todo_bounded_queue: Option<Arc<Semaphore>>,
|
||||
merkle_todo: MerkleTodo,
|
||||
gc_todo: db::Tree,
|
||||
) -> Self {
|
||||
let meter = global::meter(table_name);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue