From 85aca61860afad947be4effdc53c8ef267db3984 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 2 May 2025 08:18:46 +0200 Subject: [PATCH] [SKIP CI] WIP merkle todo encapsulation --- src/table/data.rs | 79 +++++++++++++++++++++++++++++++++++--------- src/table/metrics.rs | 7 ++-- 2 files changed, 67 insertions(+), 19 deletions(-) diff --git a/src/table/data.rs b/src/table/data.rs index 0b65c46b..af001748 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -3,6 +3,7 @@ use std::convert::TryInto; use std::sync::Arc; use serde_bytes::ByteBuf; +use tokio::sync::SemaphorePermit; use tokio::sync::{Notify, Semaphore}; use garage_db as db; @@ -21,6 +22,67 @@ use crate::replication::*; use crate::schema::*; use crate::util::*; +pub(crate) struct MerkleTodo { + merkle_todo: db::Tree, + merkle_todo_notify: Notify, + merkle_todo_bounded_queue: Option>, +} +impl Clone for MerkleTodo { + fn clone(&self) -> Self { + Self { + merkle_todo: self.merkle_todo.clone(), + merkle_todo_notify: Notify::new(), + merkle_todo_bounded_queue: self.merkle_todo_bounded_queue.clone(), + } + } +} +impl MerkleTodo { + fn new(db: &db::Db, config: &MerkleBackpressureEnum) -> Self { + let merkle_todo = db + .open_tree(format!("{}:merkle_todo", F::TABLE_NAME)) + .expect("Unable to open DB Merkle TODO tree"); + + let merkle_todo_bounded_queue = match config { + MerkleBackpressureEnum::None => None, + MerkleBackpressureEnum::FixedQueue(p) => { + Some(Arc::new(Semaphore::new(p.max_queue_size))) + } + }; + + Self { + merkle_todo, + merkle_todo_notify: Notify::new(), + merkle_todo_bounded_queue, + } + } + + pub(crate) fn len(&self) -> Result { + self.merkle_todo.len() + } + + pub(crate) async fn with_db(&self, f: F) { + let bounded = self + .merkle_todo_bounded_queue + .clone() + .unwrap_or(Arc::new(Semaphore::new(1))); + let permit = bounded.acquire().await.unwrap(); + f(&self.merkle_todo, permit); + } + + pub(crate) fn appended(&self, permit: SemaphorePermit) { + permit.forget(); + self.merkle_todo_notify.notify_one(); + } + + pub(crate) fn processed(&self) { + let bounded = self + .merkle_todo_bounded_queue + .clone() + .unwrap_or(Arc::new(Semaphore::new(1))); + bounded.add_permits(1); + } +} + pub struct TableData { system: Arc, @@ -30,9 +92,7 @@ pub struct TableData { pub store: db::Tree, pub(crate) merkle_tree: db::Tree, - pub(crate) merkle_todo: db::Tree, - pub(crate) merkle_todo_notify: Notify, - pub(crate) merkle_todo_bounded_queue: Option>, + pub(crate) merkle_todo: MerkleTodo, pub(crate) insert_queue: db::Tree, pub(crate) insert_queue_notify: Arc, @@ -59,16 +119,8 @@ impl TableData { let merkle_tree = db .open_tree(format!("{}:merkle_tree", F::TABLE_NAME)) .expect("Unable to open DB Merkle tree tree"); - let merkle_todo = db - .open_tree(format!("{}:merkle_todo", F::TABLE_NAME)) - .expect("Unable to open DB Merkle TODO tree"); - let merkle_todo_bounded_queue = match config { - MerkleBackpressureEnum::None => None, - MerkleBackpressureEnum::FixedQueue(p) => { - Some(Arc::new(Semaphore::new(p.max_queue_size))) - } - }; + let merkle_todo = MerkleTodo::new::(db, config); let insert_queue = db .open_tree(format!("{}:insert_queue", F::TABLE_NAME)) @@ -83,7 +135,6 @@ impl TableData { store.clone(), merkle_tree.clone(), merkle_todo.clone(), - merkle_todo_bounded_queue.clone(), gc_todo.clone(), ); @@ -94,8 +145,6 @@ impl TableData { store, merkle_tree, merkle_todo, - merkle_todo_notify: Notify::new(), - merkle_todo_bounded_queue, insert_queue, insert_queue_notify: Arc::new(Notify::new()), gc_todo, diff --git a/src/table/metrics.rs b/src/table/metrics.rs index f5c029d9..6b05a21c 100644 --- a/src/table/metrics.rs +++ b/src/table/metrics.rs @@ -1,10 +1,10 @@ use opentelemetry::{global, metrics::*, KeyValue}; use std::convert::TryInto; -use std::sync::Arc; -use tokio::sync::Semaphore; use garage_db as db; +use crate::data::MerkleTodo; + /// TableMetrics reference all counter used for metrics pub struct TableMetrics { pub(crate) _table_size: ValueObserver, @@ -29,8 +29,7 @@ impl TableMetrics { table_name: &'static str, store: db::Tree, merkle_tree: db::Tree, - merkle_todo: db::Tree, - merkle_todo_bounded_queue: Option>, + merkle_todo: MerkleTodo, gc_todo: db::Tree, ) -> Self { let meter = global::meter(table_name);