From f0e22c8d1e98ce1a6bcf10a6f48d4631ecb91d0b Mon Sep 17 00:00:00 2001 From: skoupidi Date: Fri, 3 Apr 2026 17:41:20 +0300 Subject: [PATCH] darkfid/task/garbage_collect: refactored to a standalone background task --- bin/darkfid/darkfid_config.toml | 9 - bin/darkfid/src/lib.rs | 59 +++-- bin/darkfid/src/main.rs | 14 +- bin/darkfid/src/registry/mod.rs | 4 +- bin/darkfid/src/task/consensus.rs | 66 ++---- bin/darkfid/src/task/garbage_collect.rs | 272 ++++++++++++------------ bin/darkfid/src/tests/harness.rs | 2 +- bin/darkfid/src/tests/mod.rs | 1 - 8 files changed, 196 insertions(+), 231 deletions(-) diff --git a/bin/darkfid/darkfid_config.toml b/bin/darkfid/darkfid_config.toml index 6960231e7..54a1bbafb 100644 --- a/bin/darkfid/darkfid_config.toml +++ b/bin/darkfid/darkfid_config.toml @@ -38,9 +38,6 @@ skip_fees = false # Optional sync checkpoint hash #checkpoint = "" -# Garbage collection task transactions batch size -txs_batch_size = 50 - ## Testnet JSON-RPC settings [network_config."testnet".rpc] # JSON-RPC listen URL @@ -236,9 +233,6 @@ skip_fees = false # Optional sync checkpoint hash #checkpoint = "" -# Garbage collection task transactions batch size -txs_batch_size = 50 - ## Mainnet JSON-RPC settings [network_config."mainnet".rpc] # JSON-RPC listen URL @@ -431,9 +425,6 @@ skip_fees = false # Optional sync checkpoint hash #checkpoint = "" -# Garbage collection task transactions batch size -txs_batch_size = 50 - ## Localnet JSON-RPC settings [network_config."localnet".rpc] # JSON-RPC listen URL diff --git a/bin/darkfid/src/lib.rs b/bin/darkfid/src/lib.rs index 21f757b61..b4b24fd32 100644 --- a/bin/darkfid/src/lib.rs +++ b/bin/darkfid/src/lib.rs @@ -49,7 +49,7 @@ use rpc::{management::ManagementRpcHandler, DefaultRpcHandler}; /// Validator async tasks pub mod task; -use task::{consensus::ConsensusInitTaskConfig, consensus_init_task}; +use task::{consensus::ConsensusInitTaskConfig, consensus_init_task, garbage_collect_task}; /// P2P net protocols mod proto; @@ -70,8 +70,6 @@ pub struct DarkfiNode { p2p_handler: DarkfidP2pHandlerPtr, /// Node miners registry pointer registry: DarkfiMinersRegistryPtr, - /// Garbage collection task transactions batch size - txs_batch_size: usize, /// A map of various subscribers exporting live info from the blockchain subscribers: HashMap<&'static str, JsonSubscriber>, /// Main JSON-RPC connection tracker @@ -85,14 +83,12 @@ impl DarkfiNode { validator: ValidatorPtr, p2p_handler: DarkfidP2pHandlerPtr, registry: DarkfiMinersRegistryPtr, - txs_batch_size: usize, subscribers: HashMap<&'static str, JsonSubscriber>, ) -> Result { Ok(Arc::new(Self { validator, p2p_handler, registry, - txs_batch_size, subscribers, rpc_connections: Mutex::new(HashSet::new()), management_rpc_connections: Mutex::new(HashSet::new()), @@ -115,6 +111,8 @@ pub struct Darkfid { management_rpc_task: StoppableTaskPtr, /// Consensus protocol background task consensus_task: StoppableTaskPtr, + /// Node garbage collection background task + gc_task: StoppableTaskPtr, } impl Darkfid { @@ -127,7 +125,6 @@ impl Darkfid { sled_db: &sled_overlay::sled::Db, config: &ValidatorConfig, net_settings: &Settings, - txs_batch_size: &Option, ex: &ExecutorPtr, ) -> Result { info!(target: "darkfid::Darkfid::init", "Initializing a Darkfi daemon..."); @@ -140,18 +137,6 @@ impl Darkfid { // Initialize the miners registry let registry = DarkfiMinersRegistry::init(network, &validator).await?; - // Grab blockchain network configured transactions batch size for garbage collection - let txs_batch_size = match txs_batch_size { - Some(b) => { - if *b > 0 { - *b - } else { - 50 - } - } - None => 50, - }; - // Here we initialize various subscribers that can export live blockchain/consensus data. let mut subscribers = HashMap::new(); subscribers.insert("blocks", JsonSubscriber::new("blockchain.subscribe_blocks")); @@ -160,18 +145,25 @@ impl Darkfid { subscribers.insert("dnet", JsonSubscriber::new("dnet.subscribe_events")); // Initialize node - let node = - DarkfiNode::new(validator, p2p_handler, registry, txs_batch_size, subscribers).await?; + let node = DarkfiNode::new(validator, p2p_handler, registry, subscribers).await?; // Generate the background tasks let dnet_task = StoppableTask::new(); let rpc_task = StoppableTask::new(); let management_rpc_task = StoppableTask::new(); let consensus_task = StoppableTask::new(); + let gc_task = StoppableTask::new(); info!(target: "darkfid::Darkfid::init", "Darkfi daemon initialized successfully!"); - Ok(Arc::new(Self { node, dnet_task, rpc_task, management_rpc_task, consensus_task })) + Ok(Arc::new(Self { + node, + dnet_task, + rpc_task, + management_rpc_task, + consensus_task, + gc_task, + })) } /// Start the DarkFi daemon in the given executor, using the @@ -249,13 +241,16 @@ impl Darkfid { info!(target: "darkfid::Darkfid::start", "Starting P2P network"); self.node.p2p_handler.start(executor, &self.node).await?; + // Generate the signal queue smol channel + let (sender, receiver) = smol::channel::unbounded::<()>(); + // Start the consensus protocol info!(target: "darkfid::Darkfid::start", "Starting consensus protocol task"); self.consensus_task.clone().start( consensus_init_task( self.node.clone(), config.clone(), - executor.clone(), + sender, ), |res| async move { match res { @@ -267,6 +262,22 @@ impl Darkfid { executor.clone(), ); + // Start the garbage collection task + info!(target: "darkfid::Darkfid::start", "Starting garbage collection task"); + self.gc_task.clone().start( + garbage_collect_task(receiver, self.node.clone()), + |res| async { + match res { + Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { /* Do nothing */ } + Err(e) => { + error!(target: "darkfid", "Failed starting garbage collection task: {e}") + } + } + }, + Error::GarbageCollectionTaskStopped, + executor.clone(), + ); + info!(target: "darkfid::Darkfid::start", "Darkfi daemon started successfully!"); Ok(()) } @@ -295,6 +306,10 @@ impl Darkfid { info!(target: "darkfid::Darkfid::stop", "Stopping P2P network protocols handler..."); self.node.p2p_handler.stop().await; + // Stop the garbage collection task + info!(target: "darkfid::Darkfid::stop", "Stopping garbage collection task..."); + self.gc_task.stop().await; + // Stop the consensus task info!(target: "darkfid::Darkfid::stop", "Stopping consensus task..."); self.consensus_task.stop().await; diff --git a/bin/darkfid/src/main.rs b/bin/darkfid/src/main.rs index 4706723c3..d76417bdc 100644 --- a/bin/darkfid/src/main.rs +++ b/bin/darkfid/src/main.rs @@ -127,10 +127,6 @@ pub struct BlockchainNetwork { /// Optional sync checkpoint hash checkpoint: Option, - #[structopt(long)] - /// Garbage collection task transactions batch size - txs_batch_size: Option, - #[structopt(flatten)] /// P2P network settings net: SettingsOpt, @@ -246,15 +242,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { (env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"), blockchain_config.net).try_into()?; // Generate the daemon - let daemon = Darkfid::init( - network, - &sled_db, - &config, - &p2p_settings, - &blockchain_config.txs_batch_size, - &ex, - ) - .await?; + let daemon = Darkfid::init(network, &sled_db, &config, &p2p_settings, &ex).await?; // Start the daemon let config = ConsensusInitTaskConfig { diff --git a/bin/darkfid/src/registry/mod.rs b/bin/darkfid/src/registry/mod.rs index ed88edb94..ef91c54af 100644 --- a/bin/darkfid/src/registry/mod.rs +++ b/bin/darkfid/src/registry/mod.rs @@ -327,7 +327,7 @@ impl DarkfiMinersRegistryState { Ok(()) } - /// Auxilliary function to retrieve all current block templates + /// Auxiliary function to retrieve all current block templates /// newly opened trees. pub fn new_trees(&self) -> BTreeSet { let mut new_trees = BTreeSet::new(); @@ -339,7 +339,7 @@ impl DarkfiMinersRegistryState { new_trees } - /// Auxilliary function to retrieve all current block templates + /// Auxiliary function to retrieve all current block templates /// transactions hashes. pub fn proposed_transactions(&self) -> HashSet { let mut proposed_txs = HashSet::new(); diff --git a/bin/darkfid/src/task/consensus.rs b/bin/darkfid/src/task/consensus.rs index c5edc8a36..83695978c 100644 --- a/bin/darkfid/src/task/consensus.rs +++ b/bin/darkfid/src/task/consensus.rs @@ -21,17 +21,15 @@ use std::str::FromStr; use darkfi::{ blockchain::HeaderHash, rpc::{jsonrpc::JsonNotification, util::JsonValue}, - system::{sleep, ExecutorPtr, StoppableTask, Subscription}, + system::{sleep, Subscription}, util::{encoding::base64, time::Timestamp}, Error, Result, }; use darkfi_serial::serialize_async; +use smol::channel::Sender; use tracing::{error, info}; -use crate::{ - task::{garbage_collect::garbage_collect_task, sync_task}, - DarkfiNodePtr, -}; +use crate::{task::sync_task, DarkfiNodePtr}; /// Auxiliary structure representing node consensus init task configuration. #[derive(Clone)] @@ -48,7 +46,7 @@ pub struct ConsensusInitTaskConfig { pub async fn consensus_init_task( node: DarkfiNodePtr, config: ConsensusInitTaskConfig, - ex: ExecutorPtr, + sender: Sender<()>, ) -> Result<()> { // Check current canonical blockchain for curruption // TODO: create a restore method reverting each block backwards @@ -104,7 +102,7 @@ pub async fn consensus_init_task( // Gracefully handle network disconnections loop { - match listen_to_network(&node, &ex).await { + match listen_to_network(&node, &sender).await { Ok(_) => return Ok(()), Err(Error::NetworkNotConnected) => { // Sync node again @@ -130,7 +128,7 @@ pub async fn consensus_init_task( } /// Async task to start the consensus task, while monitoring for a network disconnections. -async fn listen_to_network(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()> { +async fn listen_to_network(node: &DarkfiNodePtr, sender: &Sender<()>) -> Result<()> { // Grab proposals subscriber and subscribe to it let proposals_sub = node.subscribers.get("proposals").unwrap(); let prop_subscription = proposals_sub.publisher.clone().subscribe().await; @@ -140,7 +138,7 @@ async fn listen_to_network(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()> let result = smol::future::or( monitor_network(&net_subscription), - consensus_task(node, &prop_subscription, ex), + consensus_task(node, &prop_subscription, sender), ) .await; @@ -160,23 +158,15 @@ async fn monitor_network(subscription: &Subscription) -> Result<()> { async fn consensus_task( node: &DarkfiNodePtr, subscription: &Subscription, - ex: &ExecutorPtr, + sender: &Sender<()>, ) -> Result<()> { info!(target: "darkfid::task::consensus_task", "Starting consensus task..."); // Grab blocks subscriber let block_sub = node.subscribers.get("blocks").unwrap(); - // Create the garbage collection task using a dummy task - let gc_task = StoppableTask::new(); - gc_task.clone().start( - async { Ok(()) }, - |_| async { /* Do nothing */ }, - Error::GarbageCollectionTaskStopped, - ex.clone(), - ); - loop { + // Wait for a new proposal subscription.receive().await; // Check if we can confirm anything and broadcast them @@ -193,42 +183,28 @@ async fn consensus_task( }; // Refresh mining registry - let mut registry = node.registry.state.write().await; - if let Err(e) = registry.refresh(&validator).await { - error!(target: "darkfid", "Failed refreshing mining block templates: {e}") + if let Err(e) = node.registry.state.write().await.refresh(&validator).await { + error!(target: "darkfid::task::consensus_task", "Failed refreshing mining block templates: {e}") } + // Notify the garbage collection task + if let Err(e) = sender.send(()).await { + error!( + target: "darkfid::task::consensus_task", + "Garbage collection channel send fail: {e}" + ); + }; + + // Check if something was confirmed if confirmed.is_empty() { continue } - // Purge all unreferenced contract trees from the database - if let Err(e) = - validator.consensus.purge_unreferenced_trees(&mut registry.new_trees()).await - { - error!(target: "darkfid::task::garbage_collect::purge_unreferenced_trees", "Purging unreferenced contract trees from the database failed: {e}"); - } - + // Broadcast confirmed blocks to subscribers let mut notif_blocks = Vec::with_capacity(confirmed.len()); for block in confirmed { notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await))); } block_sub.notify(JsonValue::Array(notif_blocks)).await; - - // Invoke the detached garbage collection task - gc_task.clone().stop().await; - gc_task.clone().start( - garbage_collect_task(node.clone()), - |res| async { - match res { - Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { /* Do nothing */ } - Err(e) => { - error!(target: "darkfid", "Failed starting garbage collection task: {e}") - } - } - }, - Error::GarbageCollectionTaskStopped, - ex.clone(), - ); } } diff --git a/bin/darkfid/src/task/garbage_collect.rs b/bin/darkfid/src/task/garbage_collect.rs index fe2d32499..bdbeafe1e 100644 --- a/bin/darkfid/src/task/garbage_collect.rs +++ b/bin/darkfid/src/task/garbage_collect.rs @@ -16,158 +16,154 @@ * along with this program. If not, see . */ -use darkfi::{error::TxVerifyFailed, validator::verification::verify_transactions, Error, Result}; +use std::collections::HashMap; + +use darkfi::{ + blockchain::parse_record, tx::Transaction, validator::verification::verify_transaction, + zk::VerifyingKey, Result, +}; use darkfi_sdk::{crypto::MerkleTree, tx::TransactionHash}; +use smol::channel::Receiver; use tracing::{debug, error, info}; use crate::DarkfiNodePtr; -/// Async task used for purging erroneous pending transactions from the nodes mempool. -pub async fn garbage_collect_task(node: DarkfiNodePtr) -> Result<()> { +/// Auxiliary macro to check if channel receiver is empty so we can +/// abort current iteration. +macro_rules! trigger_queue_check { + ($receiver:ident, $label:tt) => { + if !$receiver.is_empty() { + continue $label + } + }; +} + +/// Async task used for purging unreferenced trees and erroneous +/// pending transactions from the nodes mempool. +pub async fn garbage_collect_task(receiver: Receiver<()>, node: DarkfiNodePtr) -> Result<()> { info!(target: "darkfid::task::garbage_collect_task", "Starting garbage collection task..."); - // Grab all current unproposed transactions. We verify them in batches, - // to not load them all in memory. - let validator = node.validator.read().await; - let mut last_checked = TransactionHash::none(); - let mut txs = match validator - .blockchain - .transactions - .get_after_pending(&last_checked, node.txs_batch_size) - { - Ok(txs) => txs, - Err(e) => { - error!( - target: "darkfid::task::garbage_collect_task", - "Uproposed transactions retrieval failed: {e}" - ); - return Ok(()) - } - }; + 'outer: loop { + // Wait for a new trigger + if let Err(e) = receiver.recv().await { + error!(target: "darkfid::task::garbage_collect_task", "recv fail: {e}"); + continue + }; - // Check if we have transactions to process - if txs.is_empty() { - info!(target: "darkfid::task::garbage_collect_task", "Garbage collection finished successfully!"); - return Ok(()) - } - - // Grab configured target - let target = validator.consensus.module.target; - drop(validator); - - while !txs.is_empty() { - // Verify each one against current forks - for tx in txs { - last_checked = tx.hash(); - let tx_vec = [tx.clone()]; - let mut valid = false; - - // Grab a lock over current consensus forks state - let mut validator = node.validator.write().await; - - // Iterate over them to verify transaction validity in their overlays - for fork in validator.consensus.forks.iter_mut() { - // Clone forks' overlay - let overlay = match fork.overlay.lock().unwrap().full_clone() { - Ok(o) => o, - Err(e) => { - error!( - target: "darkfid::task::garbage_collect_task", - "Overlay full clone creation failed: {e}" - ); - return Err(e) - } - }; - - // Grab all current proposals transactions hashes - let proposals_txs = - match overlay.lock().unwrap().get_blocks_txs_hashes(&fork.proposals) { - Ok(txs) => txs, - Err(e) => { - error!( - target: "darkfid::task::garbage_collect_task", - "Proposal transactions retrieval failed: {e}" - ); - return Err(e) - } - }; - - // If the hash is contained in the proposals transactions vec, skip it - if proposals_txs.contains(&last_checked) { - continue - } - - // Grab forks' next block height - let next_block_height = match fork.get_next_block_height() { - Ok(h) => h, - Err(e) => { - error!( - target: "darkfid::task::garbage_collect_task", - "Next fork block height retrieval failed: {e}" - ); - return Err(e) - } - }; - - // Verify transaction - let result = verify_transactions( - &overlay, - next_block_height, - target, - &tx_vec, - &mut MerkleTree::new(1), - false, - ) - .await; - - // Check result - match result { - Ok(_) => valid = true, - Err(Error::TxVerifyFailed(TxVerifyFailed::ErroneousTxs(_))) => { - /* Do nothing */ - } - Err(e) => { - error!( - target: "darkfid::task::garbage_collect_task", - "Verifying transaction {last_checked} failed: {e}" - ); - return Err(e) - } - } - } - - // Remove transaction if its invalid for all the forks - if !valid { - debug!(target: "darkfid::task::garbage_collect_task", "Removing invalid transaction: {last_checked}"); - if let Err(e) = validator.blockchain.remove_pending_txs_hashes(&[last_checked]) { - error!( - target: "darkfid::task::garbage_collect_task", - "Removing invalid transaction {last_checked} failed: {e}" - ); - }; - } - } - - // Grab next batch - txs = match node + // Purge all unreferenced contract trees from the database + trigger_queue_check!(receiver, 'outer); + debug!(target: "darkfid::task::garbage_collect_task", "Starting garbage collection iteration..."); + if let Err(e) = node .validator .read() .await - .blockchain - .transactions - .get_after_pending(&last_checked, node.txs_batch_size) + .consensus + .purge_unreferenced_trees(&mut node.registry.state.read().await.new_trees()) + .await { - Ok(txs) => txs, + error!(target: "darkfid::task::garbage_collect_task", "Purging unreferenced contract trees from the database failed: {e}"); + continue + } + debug!(target: "darkfid::task::garbage_collect_task", "Unreferenced trees purged successfully, retrieving pending transactions..."); + + // Check if our mempool is empty + trigger_queue_check!(receiver, 'outer); + let validator = node.validator.read().await; + if validator.blockchain.transactions.pending.is_empty() { + debug!(target: "darkfid::task::garbage_collect_task", "No pending transactions to process"); + continue + } + + // Grab validator current best fork and an iterator over its + // pending transactions so we don't hold the validator lock. + let pending = validator.blockchain.transactions.pending.iter(); + let fork = match validator.best_current_fork().await { + Ok(f) => f, Err(e) => { - error!( - target: "darkfid::task::garbage_collect_task", - "Uproposed transactions next batch retrieval failed: {e}" - ); - break + error!(target: "darkfid::task::garbage_collect_task", "Retrieving validator current best fork failed: {e}"); + continue } }; - } + let verify_fees = validator.verify_fees; + drop(validator); - info!(target: "darkfid::task::garbage_collect_task", "Garbage collection finished successfully!"); - Ok(()) + // Transactions Merkle tree + trigger_queue_check!(receiver, 'outer); + let mut tree = MerkleTree::new(1); + + // Map of ZK proof verifying keys for the current transactions + // batch. + let mut vks: HashMap<[u8; 32], HashMap> = HashMap::new(); + + // Grab forks' next block height + let next_block_height = match fork.get_next_block_height() { + Ok(h) => h, + Err(e) => { + error!( + target: "darkfid::task::garbage_collect_task", + "Next fork block height retrieval failed: {e}" + ); + continue + } + }; + + // Iterate over all pending transactions + for record in pending { + trigger_queue_check!(receiver, 'outer); + let record = match record { + Ok(r) => r, + Err(e) => { + error!(target: "darkfid::task::garbage_collect_task", "Failed retrieving pending tx: {e}"); + continue 'outer + } + }; + let (tx_hash, tx) = match parse_record::(record) { + Ok((h, t)) => (h, t), + Err(e) => { + error!(target: "darkfid::task::garbage_collect_task", "Failed parsing pending tx: {e}"); + continue + } + }; + + // If the transaction has already been proposed, remove it + trigger_queue_check!(receiver, 'outer); + debug!(target: "darkfid::task::garbage_collect_task", "Checking transaction: {tx_hash}"); + if fork.overlay.lock().unwrap().transactions.contains(&tx_hash)? { + debug!(target: "darkfid::task::garbage_collect_task", "Transaction {tx_hash} has already been proposed, removing..."); + if let Err(e) = fork.blockchain.remove_pending_txs_hashes(&[tx_hash]) { + error!(target: "darkfid::task::garbage_collect_task", "Failed removing pending tx: {e}"); + }; + continue + } + + // Update the verifying keys map + trigger_queue_check!(receiver, 'outer); + for call in &tx.calls { + vks.entry(call.data.contract_id.to_bytes()).or_default(); + } + + // Verify the transaction against current state + trigger_queue_check!(receiver, 'outer); + fork.overlay.lock().unwrap().checkpoint(); + let result = verify_transaction( + &fork.overlay, + next_block_height, + fork.module.target, + &tx, + &mut tree, + &mut vks, + verify_fees, + ) + .await; + fork.overlay.lock().unwrap().revert_to_checkpoint(); + if let Err(e) = result { + debug!(target: "darkfid::task::garbage_collect_task", "Pending transaction {tx_hash} verification failed: {e}"); + if let Err(e) = fork.blockchain.remove_pending_txs_hashes(&[tx_hash]) { + error!(target: "darkfid::task::garbage_collect_task", "Failed removing pending tx: {e}"); + }; + continue + } + debug!(target: "darkfid::task::garbage_collect_task", "Pending transaction {tx_hash} verification successfully."); + } + } } diff --git a/bin/darkfid/src/tests/harness.rs b/bin/darkfid/src/tests/harness.rs index f8f32a15a..d9391c808 100644 --- a/bin/darkfid/src/tests/harness.rs +++ b/bin/darkfid/src/tests/harness.rs @@ -302,7 +302,7 @@ pub async fn generate_node( let p2p_handler = DarkfidP2pHandler::init(settings, ex).await?; let registry = DarkfiMinersRegistry::init(Network::Mainnet, &validator).await?; let node = - DarkfiNode::new(validator.clone(), p2p_handler.clone(), registry, 50, subscribers.clone()) + DarkfiNode::new(validator.clone(), p2p_handler.clone(), registry, subscribers.clone()) .await?; p2p_handler.start(ex, &node).await?; diff --git a/bin/darkfid/src/tests/mod.rs b/bin/darkfid/src/tests/mod.rs index 5ab19c512..b773a89c6 100644 --- a/bin/darkfid/src/tests/mod.rs +++ b/bin/darkfid/src/tests/mod.rs @@ -289,7 +289,6 @@ fn darkfid_programmatic_control() -> Result<()> { &sled_db, &config, &darkfi::net::Settings::default(), - &None, &ex, ) .await