From 0d6045ad878e76bc2ac033cb5ad97e94de889638 Mon Sep 17 00:00:00 2001 From: skoupidi Date: Tue, 8 Jul 2025 16:31:04 +0300 Subject: [PATCH] drk: use merkle tree checkpoints instead of sled-overlay diffs to manage them --- bin/drk/src/cache.rs | 20 ++++++----- bin/drk/src/dao.rs | 45 +++++++++-------------- bin/drk/src/interactive.rs | 4 +-- bin/drk/src/main.rs | 2 +- bin/drk/src/money.rs | 24 ++++++------- bin/drk/src/rpc.rs | 68 +++++++++++------------------------ bin/drk/src/scanned_blocks.rs | 37 ++++++++++++++++++- 7 files changed, 98 insertions(+), 102 deletions(-) diff --git a/bin/drk/src/cache.rs b/bin/drk/src/cache.rs index de1c89ced..16622215a 100644 --- a/bin/drk/src/cache.rs +++ b/bin/drk/src/cache.rs @@ -80,6 +80,18 @@ impl Cache { }) } + /// Execute an atomic sled batch corresponding to inserts to the + /// merkle trees tree. For each record, the bytes slice is used as + /// the key, and the serialized merkle tree is used as value. + pub fn insert_merkle_trees(&self, trees: &[(&[u8], &MerkleTree)]) -> Result<()> { + let mut batch = sled::Batch::default(); + for (key, tree) in trees { + batch.insert(*key, serialize(*tree)); + } + self.merkle_trees.apply_batch(batch)?; + Ok(()) + } + /// Insert a `u32` and a block inverse diff into store's inverse /// diffs tree. The block height is used as the key, and the /// serialized database inverse diff is used as value. @@ -138,14 +150,6 @@ impl CacheOverlay { )?; Ok(()) } - - /// Insert a bytes slice and a merkle tree into overlay's merkle - /// trees tree. The provided bytes slice is used as the key, and - /// the serialized merkle tree is used as value. - pub fn insert_merkle_tree(&mut self, key: &[u8], tree: &MerkleTree) -> Result<()> { - self.0.insert(SLED_MERKLE_TREES_TREE, key, &serialize(tree))?; - Ok(()) - } } pub type CacheSmt = SparseMerkleTree< diff --git a/bin/drk/src/dao.rs b/bin/drk/src/dao.rs index 2d1378c12..bac9a5b1e 100644 --- a/bin/drk/src/dao.rs +++ b/bin/drk/src/dao.rs @@ -927,11 +927,11 @@ impl Drk { pub async fn get_dao_trees(&self) -> Result<(MerkleTree, MerkleTree)> { let daos_tree = match self.cache.merkle_trees.get(SLED_MERKLE_TREES_DAO_DAOS)? { Some(tree_bytes) => deserialize_async(&tree_bytes).await?, - None => MerkleTree::new(1), + None => MerkleTree::new(u32::MAX as usize), }; let proposals_tree = match self.cache.merkle_trees.get(SLED_MERKLE_TREES_DAO_PROPOSALS)? { Some(tree_bytes) => deserialize_async(&tree_bytes).await?, - None => MerkleTree::new(1), + None => MerkleTree::new(u32::MAX as usize), }; Ok((daos_tree, proposals_tree)) } @@ -1406,9 +1406,7 @@ impl Drk { /// Append data related to DAO contract transactions into the /// wallet database and update the provided scan cache. - /// Returns a flag indicating if the daos tree should be updated, - /// one indicating if the proposals tree should be updated and - /// another one indicating if provided data refer to our own + /// Returns a flag indicating if provided data refer to our own /// wallet. pub async fn apply_tx_dao_data( &self, @@ -1417,51 +1415,42 @@ impl Drk { tx_hash: &TransactionHash, call_idx: &u8, block_height: &u32, - ) -> Result<(bool, bool, bool)> { + ) -> Result { // Run through the transaction call data and see what we got: match DaoFunction::try_from(data[0])? { DaoFunction::Mint => { scan_cache.log(String::from("[apply_tx_dao_data] Found Dao::Mint call")); let params: DaoMintParams = deserialize_async(&data[1..]).await?; - let own_tx = self - .apply_dao_mint_data( - scan_cache, - ¶ms.dao_bulla, - tx_hash, - call_idx, - block_height, - ) - .await?; - Ok((true, false, own_tx)) + self.apply_dao_mint_data( + scan_cache, + ¶ms.dao_bulla, + tx_hash, + call_idx, + block_height, + ) + .await } DaoFunction::Propose => { scan_cache.log(String::from("[apply_tx_dao_data] Found Dao::Propose call")); let params: DaoProposeParams = deserialize_async(&data[1..]).await?; - let own_tx = self - .apply_dao_propose_data(scan_cache, ¶ms, tx_hash, call_idx, block_height) - .await?; - Ok((false, true, own_tx)) + self.apply_dao_propose_data(scan_cache, ¶ms, tx_hash, call_idx, block_height) + .await } DaoFunction::Vote => { scan_cache.log(String::from("[apply_tx_dao_data] Found Dao::Vote call")); let params: DaoVoteParams = deserialize_async(&data[1..]).await?; - let own_tx = self - .apply_dao_vote_data(scan_cache, ¶ms, tx_hash, call_idx, block_height) - .await?; - Ok((false, false, own_tx)) + self.apply_dao_vote_data(scan_cache, ¶ms, tx_hash, call_idx, block_height).await } DaoFunction::Exec => { scan_cache.log(String::from("[apply_tx_dao_data] Found Dao::Exec call")); let params: DaoExecParams = deserialize_async(&data[1..]).await?; - let own_tx = - self.apply_dao_exec_data(scan_cache, ¶ms, tx_hash, block_height).await?; - Ok((false, false, own_tx)) + self.apply_dao_exec_data(scan_cache, ¶ms, tx_hash, block_height).await } DaoFunction::AuthMoneyTransfer => { scan_cache .log(String::from("[apply_tx_dao_data] Found Dao::AuthMoneyTransfer call")); // Does nothing, just verifies the other calls are correct - Ok((false, false, false)) + Ok(false) } } } diff --git a/bin/drk/src/interactive.rs b/bin/drk/src/interactive.rs index 260ee758a..36c532af8 100644 --- a/bin/drk/src/interactive.rs +++ b/bin/drk/src/interactive.rs @@ -2446,7 +2446,7 @@ async fn handle_scan( }; let mut buf = vec![]; - if let Err(e) = lock.reset_to_height(height, &mut buf) { + if let Err(e) = lock.reset_to_height(height, &mut buf).await { buf.push(format!("Failed during wallet reset: {e}")); append_or_print(output, None, print, buf).await; return @@ -2995,7 +2995,7 @@ async fn handle_token_mint(drk: &DrkPtr, parts: &[&str], output: &mut Vec Some(s), diff --git a/bin/drk/src/main.rs b/bin/drk/src/main.rs index a6d4bdf82..bdef710d8 100644 --- a/bin/drk/src/main.rs +++ b/bin/drk/src/main.rs @@ -2019,7 +2019,7 @@ async fn realmain(args: Args, ex: ExecutorPtr) -> Result<()> { if let Some(height) = reset { let mut buf = vec![]; - if let Err(e) = drk.reset_to_height(height, &mut buf) { + if let Err(e) = drk.reset_to_height(height, &mut buf).await { print_output(&buf); eprintln!("Failed during wallet reset: {e}"); exit(2); diff --git a/bin/drk/src/money.rs b/bin/drk/src/money.rs index 99ed83b2f..ef294434f 100644 --- a/bin/drk/src/money.rs +++ b/bin/drk/src/money.rs @@ -679,7 +679,7 @@ impl Drk { match self.cache.merkle_trees.get(SLED_MERKLE_TREES_MONEY)? { Some(tree_bytes) => Ok(deserialize_async(&tree_bytes).await?), None => { - let mut tree = MerkleTree::new(1); + let mut tree = MerkleTree::new(u32::MAX as usize); tree.append(MerkleNode::from(pallas::Base::ZERO)); let _ = tree.mark().unwrap(); Ok(tree) @@ -772,21 +772,20 @@ impl Drk { /// Auxiliary function to handle coins with their notes from a /// transaction money call. - /// Returns a flag indicating if the money tree should be updated, - /// along with found own coins. + /// Returns our found own coins. fn handle_money_call_coins( &self, tree: &mut MerkleTree, secrets: &[SecretKey], messages_buffer: &mut Vec, coins: &[(Coin, AeadEncryptedNote)], - ) -> (bool, Vec) { + ) -> Vec { // Keep track of our own coins found in the vec let mut owncoins = vec![]; // Check if provided coins vec is empty if coins.is_empty() { - return (false, owncoins) + return owncoins } // Handle provided coins vector and grab our own @@ -809,7 +808,7 @@ impl Drk { } } - (true, owncoins) + owncoins } /// Auxiliary function to handle own coins from a transaction money @@ -943,8 +942,8 @@ impl Drk { /// Append data related to Money contract transactions into the /// wallet database and update the provided scan cache. - /// Returns a flag indicating if the money tree should be updated - /// and one indicating if provided data refer to our own wallet. + /// Returns a flag indicating if provided data refer to our own + /// wallet. pub async fn apply_tx_money_data( &self, scan_cache: &mut ScanCache, @@ -952,13 +951,13 @@ impl Drk { calls: &[DarkLeaf], tx_hash: &String, block_height: &u32, - ) -> Result<(bool, bool)> { + ) -> Result { // Parse the call let (nullifiers, coins, freezes) = self.parse_money_call(scan_cache, call_idx, calls).await?; // Parse call coins and grab our own - let (update_tree, owncoins) = self.handle_money_call_coins( + let owncoins = self.handle_money_call_coins( &mut scan_cache.money_tree, &scan_cache.notes_secrets, &mut scan_cache.messages_buffer, @@ -988,10 +987,7 @@ impl Drk { kaching().await; } - Ok(( - update_tree || wallet_spent_coins, - wallet_spent_coins || !owncoins.is_empty() || wallet_freezes, - )) + Ok(wallet_spent_coins || !owncoins.is_empty() || wallet_freezes) } /// Auxiliary function to grab all the nullifiers from a transaction money call. diff --git a/bin/drk/src/rpc.rs b/bin/drk/src/rpc.rs index d1045a0bd..7d6b5de8d 100644 --- a/bin/drk/src/rpc.rs +++ b/bin/drk/src/rpc.rs @@ -147,11 +147,7 @@ impl Drk { /// `scan_block` will go over over transactions in a block and handle their calls /// based on the called contract. async fn scan_block(&self, scan_cache: &mut ScanCache, block: &BlockInfo) -> Result<()> { - // Keep track of the trees we need to update and our wallet - // transactions. - let mut update_money_tree = false; - let mut update_dao_daos_tree = false; - let mut update_dao_proposals_tree = false; + // Keep track of our wallet transactions. let mut wallet_txs = vec![]; // Scan the block @@ -167,7 +163,7 @@ impl Drk { for (i, call) in tx.calls.iter().enumerate() { if call.data.contract_id == *MONEY_CONTRACT_ID { scan_cache.log(format!("[scan_block] Found Money contract in call {i}")); - let (update_tree, own_tx) = self + if self .apply_tx_money_data( scan_cache, &i, @@ -175,11 +171,8 @@ impl Drk { &tx_hash_string, &block.header.height, ) - .await?; - if update_tree { - update_money_tree = true; - } - if own_tx { + .await? + { wallet_tx = true; } continue @@ -187,7 +180,7 @@ impl Drk { if call.data.contract_id == *DAO_CONTRACT_ID { scan_cache.log(format!("[scan_block] Found DAO contract in call {i}")); - let (update_daos_tree, update_proposals_tree, own_tx) = self + if self .apply_tx_dao_data( scan_cache, &call.data.data, @@ -195,14 +188,8 @@ impl Drk { &(i as u8), &block.header.height, ) - .await?; - if update_daos_tree { - update_dao_daos_tree = true; - } - if update_proposals_tree { - update_dao_proposals_tree = true; - } - if own_tx { + .await? + { wallet_tx = true; } continue @@ -225,32 +212,6 @@ impl Drk { } } - // Update money merkle tree, if needed - if update_money_tree { - scan_cache - .money_smt - .store - .overlay - .insert_merkle_tree(SLED_MERKLE_TREES_MONEY, &scan_cache.money_tree)?; - } - - // Update dao daos merkle tree, if needed - if update_dao_daos_tree { - scan_cache - .money_smt - .store - .overlay - .insert_merkle_tree(SLED_MERKLE_TREES_DAO_DAOS, &scan_cache.dao_daos_tree)?; - } - - // Update dao proposals merkle tree, if needed - if update_dao_proposals_tree { - scan_cache.money_smt.store.overlay.insert_merkle_tree( - SLED_MERKLE_TREES_DAO_PROPOSALS, - &scan_cache.dao_proposals_tree, - )?; - } - // Insert the block record scan_cache .money_smt @@ -267,6 +228,16 @@ impl Drk { // Insert the state inverse diff record self.cache.insert_state_inverse_diff(&block.header.height, &diff.inverse())?; + // Checkpoint and update the merkle trees + scan_cache.money_tree.checkpoint(block.header.height as usize); + scan_cache.dao_daos_tree.checkpoint(block.header.height as usize); + scan_cache.dao_proposals_tree.checkpoint(block.header.height as usize); + self.cache.insert_merkle_trees(&[ + (SLED_MERKLE_TREES_MONEY, &scan_cache.money_tree), + (SLED_MERKLE_TREES_DAO_DAOS, &scan_cache.dao_daos_tree), + (SLED_MERKLE_TREES_DAO_PROPOSALS, &scan_cache.dao_proposals_tree), + ])?; + // Flush sled self.cache.sled_db.flush()?; @@ -341,7 +312,7 @@ impl Drk { // Reset to its height buf.push(format!("Last common block found: {height} - {scanned_block_hash}")); - self.reset_to_height(height, &mut buf)?; + self.reset_to_height(height, &mut buf).await?; append_or_print(output, sender, print, buf).await; break } @@ -720,7 +691,8 @@ pub async fn subscribe_blocks( let lock = drk.read().await; if block.header.height <= last_scanned_height { let reset_height = block.header.height.saturating_sub(1); - if let Err(e) = lock.reset_to_height(reset_height, &mut shell_message) { + if let Err(e) = lock.reset_to_height(reset_height, &mut shell_message).await + { shell_sender.send(shell_message).await?; break 'outer Error::Custom(format!( "[subscribe_blocks] Wallet state reset failed: {e}" diff --git a/bin/drk/src/scanned_blocks.rs b/bin/drk/src/scanned_blocks.rs index 94a29c98d..f75b8bf9b 100644 --- a/bin/drk/src/scanned_blocks.rs +++ b/bin/drk/src/scanned_blocks.rs @@ -20,7 +20,9 @@ use darkfi_serial::deserialize; use crate::{ cache::CacheOverlay, + dao::{SLED_MERKLE_TREES_DAO_DAOS, SLED_MERKLE_TREES_DAO_PROPOSALS}, error::{WalletDbError, WalletDbResult}, + money::SLED_MERKLE_TREES_MONEY, Drk, }; @@ -98,7 +100,11 @@ impl Drk { /// Reset state to provided block height. /// If genesis block height(0) was provided, perform a full reset. - pub fn reset_to_height(&self, height: u32, output: &mut Vec) -> WalletDbResult<()> { + pub async fn reset_to_height( + &self, + height: u32, + output: &mut Vec, + ) -> WalletDbResult<()> { output.push(format!("Resetting wallet state to block: {height}")); // If genesis block height(0) was provided, @@ -118,6 +124,22 @@ impl Drk { return Ok(()) } + // Grab our current merkle trees + let mut money_tree = match self.get_money_tree().await { + Ok(t) => t, + Err(e) => { + output.push(format!("[reset_to_height] Money merkle tree retrieval failed: {e}")); + return Err(WalletDbError::GenericError) + } + }; + let (mut dao_daos_tree, mut dao_proposals_tree) = match self.get_dao_trees().await { + Ok(p) => p, + Err(e) => { + output.push(format!("[reset_to_height] DAO merkle trees retrieval failed: {e}")); + return Err(WalletDbError::GenericError) + } + }; + // Create an overlay to apply the reverse diffs let mut overlay = match CacheOverlay::new(&self.cache) { Ok(o) => o, @@ -160,6 +182,19 @@ impl Drk { return Err(WalletDbError::GenericError) } + // Rewind and update the merkle trees + money_tree.checkpoint(height as usize); + dao_daos_tree.checkpoint(height as usize); + dao_proposals_tree.checkpoint(height as usize); + if let Err(e) = self.cache.insert_merkle_trees(&[ + (SLED_MERKLE_TREES_MONEY, &money_tree), + (SLED_MERKLE_TREES_DAO_DAOS, &dao_daos_tree), + (SLED_MERKLE_TREES_DAO_PROPOSALS, &dao_proposals_tree), + ]) { + output.push(format!("[reset_to_height] Updating merkle trees failed: {e}")); + return Err(WalletDbError::GenericError) + }; + // Flush sled if let Err(e) = self.cache.sled_db.flush() { output.push(format!("[reset_to_height] Flushing cache sled database failed: {e}"));