darkfid/task/garbage_collect: refactored to a standalone background task

This commit is contained in:
skoupidi
2026-04-03 17:41:20 +03:00
parent e33283d647
commit f0e22c8d1e
8 changed files with 196 additions and 231 deletions

View File

@@ -38,9 +38,6 @@ skip_fees = false
# Optional sync checkpoint hash
#checkpoint = ""
# Garbage collection task transactions batch size
txs_batch_size = 50
## Testnet JSON-RPC settings
[network_config."testnet".rpc]
# JSON-RPC listen URL
@@ -236,9 +233,6 @@ skip_fees = false
# Optional sync checkpoint hash
#checkpoint = ""
# Garbage collection task transactions batch size
txs_batch_size = 50
## Mainnet JSON-RPC settings
[network_config."mainnet".rpc]
# JSON-RPC listen URL
@@ -431,9 +425,6 @@ skip_fees = false
# Optional sync checkpoint hash
#checkpoint = ""
# Garbage collection task transactions batch size
txs_batch_size = 50
## Localnet JSON-RPC settings
[network_config."localnet".rpc]
# JSON-RPC listen URL

View File

@@ -49,7 +49,7 @@ use rpc::{management::ManagementRpcHandler, DefaultRpcHandler};
/// Validator async tasks
pub mod task;
use task::{consensus::ConsensusInitTaskConfig, consensus_init_task};
use task::{consensus::ConsensusInitTaskConfig, consensus_init_task, garbage_collect_task};
/// P2P net protocols
mod proto;
@@ -70,8 +70,6 @@ pub struct DarkfiNode {
p2p_handler: DarkfidP2pHandlerPtr,
/// Node miners registry pointer
registry: DarkfiMinersRegistryPtr,
/// Garbage collection task transactions batch size
txs_batch_size: usize,
/// A map of various subscribers exporting live info from the blockchain
subscribers: HashMap<&'static str, JsonSubscriber>,
/// Main JSON-RPC connection tracker
@@ -85,14 +83,12 @@ impl DarkfiNode {
validator: ValidatorPtr,
p2p_handler: DarkfidP2pHandlerPtr,
registry: DarkfiMinersRegistryPtr,
txs_batch_size: usize,
subscribers: HashMap<&'static str, JsonSubscriber>,
) -> Result<DarkfiNodePtr> {
Ok(Arc::new(Self {
validator,
p2p_handler,
registry,
txs_batch_size,
subscribers,
rpc_connections: Mutex::new(HashSet::new()),
management_rpc_connections: Mutex::new(HashSet::new()),
@@ -115,6 +111,8 @@ pub struct Darkfid {
management_rpc_task: StoppableTaskPtr,
/// Consensus protocol background task
consensus_task: StoppableTaskPtr,
/// Node garbage collection background task
gc_task: StoppableTaskPtr,
}
impl Darkfid {
@@ -127,7 +125,6 @@ impl Darkfid {
sled_db: &sled_overlay::sled::Db,
config: &ValidatorConfig,
net_settings: &Settings,
txs_batch_size: &Option<usize>,
ex: &ExecutorPtr,
) -> Result<DarkfidPtr> {
info!(target: "darkfid::Darkfid::init", "Initializing a Darkfi daemon...");
@@ -140,18 +137,6 @@ impl Darkfid {
// Initialize the miners registry
let registry = DarkfiMinersRegistry::init(network, &validator).await?;
// Grab blockchain network configured transactions batch size for garbage collection
let txs_batch_size = match txs_batch_size {
Some(b) => {
if *b > 0 {
*b
} else {
50
}
}
None => 50,
};
// Here we initialize various subscribers that can export live blockchain/consensus data.
let mut subscribers = HashMap::new();
subscribers.insert("blocks", JsonSubscriber::new("blockchain.subscribe_blocks"));
@@ -160,18 +145,25 @@ impl Darkfid {
subscribers.insert("dnet", JsonSubscriber::new("dnet.subscribe_events"));
// Initialize node
let node =
DarkfiNode::new(validator, p2p_handler, registry, txs_batch_size, subscribers).await?;
let node = DarkfiNode::new(validator, p2p_handler, registry, subscribers).await?;
// Generate the background tasks
let dnet_task = StoppableTask::new();
let rpc_task = StoppableTask::new();
let management_rpc_task = StoppableTask::new();
let consensus_task = StoppableTask::new();
let gc_task = StoppableTask::new();
info!(target: "darkfid::Darkfid::init", "Darkfi daemon initialized successfully!");
Ok(Arc::new(Self { node, dnet_task, rpc_task, management_rpc_task, consensus_task }))
Ok(Arc::new(Self {
node,
dnet_task,
rpc_task,
management_rpc_task,
consensus_task,
gc_task,
}))
}
/// Start the DarkFi daemon in the given executor, using the
@@ -249,13 +241,16 @@ impl Darkfid {
info!(target: "darkfid::Darkfid::start", "Starting P2P network");
self.node.p2p_handler.start(executor, &self.node).await?;
// Generate the signal queue smol channel
let (sender, receiver) = smol::channel::unbounded::<()>();
// Start the consensus protocol
info!(target: "darkfid::Darkfid::start", "Starting consensus protocol task");
self.consensus_task.clone().start(
consensus_init_task(
self.node.clone(),
config.clone(),
executor.clone(),
sender,
),
|res| async move {
match res {
@@ -267,6 +262,22 @@ impl Darkfid {
executor.clone(),
);
// Start the garbage collection task
info!(target: "darkfid::Darkfid::start", "Starting garbage collection task");
self.gc_task.clone().start(
garbage_collect_task(receiver, self.node.clone()),
|res| async {
match res {
Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { /* Do nothing */ }
Err(e) => {
error!(target: "darkfid", "Failed starting garbage collection task: {e}")
}
}
},
Error::GarbageCollectionTaskStopped,
executor.clone(),
);
info!(target: "darkfid::Darkfid::start", "Darkfi daemon started successfully!");
Ok(())
}
@@ -295,6 +306,10 @@ impl Darkfid {
info!(target: "darkfid::Darkfid::stop", "Stopping P2P network protocols handler...");
self.node.p2p_handler.stop().await;
// Stop the garbage collection task
info!(target: "darkfid::Darkfid::stop", "Stopping garbage collection task...");
self.gc_task.stop().await;
// Stop the consensus task
info!(target: "darkfid::Darkfid::stop", "Stopping consensus task...");
self.consensus_task.stop().await;

View File

@@ -127,10 +127,6 @@ pub struct BlockchainNetwork {
/// Optional sync checkpoint hash
checkpoint: Option<String>,
#[structopt(long)]
/// Garbage collection task transactions batch size
txs_batch_size: Option<usize>,
#[structopt(flatten)]
/// P2P network settings
net: SettingsOpt,
@@ -246,15 +242,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
(env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"), blockchain_config.net).try_into()?;
// Generate the daemon
let daemon = Darkfid::init(
network,
&sled_db,
&config,
&p2p_settings,
&blockchain_config.txs_batch_size,
&ex,
)
.await?;
let daemon = Darkfid::init(network, &sled_db, &config, &p2p_settings, &ex).await?;
// Start the daemon
let config = ConsensusInitTaskConfig {

View File

@@ -327,7 +327,7 @@ impl DarkfiMinersRegistryState {
Ok(())
}
/// Auxilliary function to retrieve all current block templates
/// Auxiliary function to retrieve all current block templates
/// newly opened trees.
pub fn new_trees(&self) -> BTreeSet<IVec> {
let mut new_trees = BTreeSet::new();
@@ -339,7 +339,7 @@ impl DarkfiMinersRegistryState {
new_trees
}
/// Auxilliary function to retrieve all current block templates
/// Auxiliary function to retrieve all current block templates
/// transactions hashes.
pub fn proposed_transactions(&self) -> HashSet<TransactionHash> {
let mut proposed_txs = HashSet::new();

View File

@@ -21,17 +21,15 @@ use std::str::FromStr;
use darkfi::{
blockchain::HeaderHash,
rpc::{jsonrpc::JsonNotification, util::JsonValue},
system::{sleep, ExecutorPtr, StoppableTask, Subscription},
system::{sleep, Subscription},
util::{encoding::base64, time::Timestamp},
Error, Result,
};
use darkfi_serial::serialize_async;
use smol::channel::Sender;
use tracing::{error, info};
use crate::{
task::{garbage_collect::garbage_collect_task, sync_task},
DarkfiNodePtr,
};
use crate::{task::sync_task, DarkfiNodePtr};
/// Auxiliary structure representing node consensus init task configuration.
#[derive(Clone)]
@@ -48,7 +46,7 @@ pub struct ConsensusInitTaskConfig {
pub async fn consensus_init_task(
node: DarkfiNodePtr,
config: ConsensusInitTaskConfig,
ex: ExecutorPtr,
sender: Sender<()>,
) -> Result<()> {
// Check current canonical blockchain for corruption
// TODO: create a restore method reverting each block backwards
@@ -104,7 +102,7 @@ pub async fn consensus_init_task(
// Gracefully handle network disconnections
loop {
match listen_to_network(&node, &ex).await {
match listen_to_network(&node, &sender).await {
Ok(_) => return Ok(()),
Err(Error::NetworkNotConnected) => {
// Sync node again
@@ -130,7 +128,7 @@ pub async fn consensus_init_task(
}
/// Async task to start the consensus task, while monitoring for network disconnections.
async fn listen_to_network(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()> {
async fn listen_to_network(node: &DarkfiNodePtr, sender: &Sender<()>) -> Result<()> {
// Grab proposals subscriber and subscribe to it
let proposals_sub = node.subscribers.get("proposals").unwrap();
let prop_subscription = proposals_sub.publisher.clone().subscribe().await;
@@ -140,7 +138,7 @@ async fn listen_to_network(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()>
let result = smol::future::or(
monitor_network(&net_subscription),
consensus_task(node, &prop_subscription, ex),
consensus_task(node, &prop_subscription, sender),
)
.await;
@@ -160,23 +158,15 @@ async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> {
async fn consensus_task(
node: &DarkfiNodePtr,
subscription: &Subscription<JsonNotification>,
ex: &ExecutorPtr,
sender: &Sender<()>,
) -> Result<()> {
info!(target: "darkfid::task::consensus_task", "Starting consensus task...");
// Grab blocks subscriber
let block_sub = node.subscribers.get("blocks").unwrap();
// Create the garbage collection task using a dummy task
let gc_task = StoppableTask::new();
gc_task.clone().start(
async { Ok(()) },
|_| async { /* Do nothing */ },
Error::GarbageCollectionTaskStopped,
ex.clone(),
);
loop {
// Wait for a new proposal
subscription.receive().await;
// Check if we can confirm anything and broadcast them
@@ -193,42 +183,28 @@ async fn consensus_task(
};
// Refresh mining registry
let mut registry = node.registry.state.write().await;
if let Err(e) = registry.refresh(&validator).await {
error!(target: "darkfid", "Failed refreshing mining block templates: {e}")
if let Err(e) = node.registry.state.write().await.refresh(&validator).await {
error!(target: "darkfid::task::consensus_task", "Failed refreshing mining block templates: {e}")
}
// Notify the garbage collection task
if let Err(e) = sender.send(()).await {
error!(
target: "darkfid::task::consensus_task",
"Garbage collection channel send fail: {e}"
);
};
// Check if something was confirmed
if confirmed.is_empty() {
continue
}
// Purge all unreferenced contract trees from the database
if let Err(e) =
validator.consensus.purge_unreferenced_trees(&mut registry.new_trees()).await
{
error!(target: "darkfid::task::garbage_collect::purge_unreferenced_trees", "Purging unreferenced contract trees from the database failed: {e}");
}
// Broadcast confirmed blocks to subscribers
let mut notif_blocks = Vec::with_capacity(confirmed.len());
for block in confirmed {
notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
}
block_sub.notify(JsonValue::Array(notif_blocks)).await;
// Invoke the detached garbage collection task
gc_task.clone().stop().await;
gc_task.clone().start(
garbage_collect_task(node.clone()),
|res| async {
match res {
Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { /* Do nothing */ }
Err(e) => {
error!(target: "darkfid", "Failed starting garbage collection task: {e}")
}
}
},
Error::GarbageCollectionTaskStopped,
ex.clone(),
);
}
}

View File

@@ -16,158 +16,154 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use darkfi::{error::TxVerifyFailed, validator::verification::verify_transactions, Error, Result};
use std::collections::HashMap;
use darkfi::{
blockchain::parse_record, tx::Transaction, validator::verification::verify_transaction,
zk::VerifyingKey, Result,
};
use darkfi_sdk::{crypto::MerkleTree, tx::TransactionHash};
use smol::channel::Receiver;
use tracing::{debug, error, info};
use crate::DarkfiNodePtr;
/// Async task used for purging erroneous pending transactions from the node's mempool.
pub async fn garbage_collect_task(node: DarkfiNodePtr) -> Result<()> {
/// Auxiliary macro to check if channel receiver is empty so we can
/// abort current iteration.
macro_rules! trigger_queue_check {
($receiver:ident, $label:tt) => {
if !$receiver.is_empty() {
continue $label
}
};
}
/// Async task used for purging unreferenced trees and erroneous
/// pending transactions from the node's mempool.
pub async fn garbage_collect_task(receiver: Receiver<()>, node: DarkfiNodePtr) -> Result<()> {
info!(target: "darkfid::task::garbage_collect_task", "Starting garbage collection task...");
// Grab all current unproposed transactions. We verify them in batches,
// to not load them all in memory.
let validator = node.validator.read().await;
let mut last_checked = TransactionHash::none();
let mut txs = match validator
.blockchain
.transactions
.get_after_pending(&last_checked, node.txs_batch_size)
{
Ok(txs) => txs,
Err(e) => {
error!(
target: "darkfid::task::garbage_collect_task",
"Unproposed transactions retrieval failed: {e}"
);
return Ok(())
}
};
'outer: loop {
// Wait for a new trigger
if let Err(e) = receiver.recv().await {
error!(target: "darkfid::task::garbage_collect_task", "recv fail: {e}");
continue
};
// Check if we have transactions to process
if txs.is_empty() {
info!(target: "darkfid::task::garbage_collect_task", "Garbage collection finished successfully!");
return Ok(())
}
// Grab configured target
let target = validator.consensus.module.target;
drop(validator);
while !txs.is_empty() {
// Verify each one against current forks
for tx in txs {
last_checked = tx.hash();
let tx_vec = [tx.clone()];
let mut valid = false;
// Grab a lock over current consensus forks state
let mut validator = node.validator.write().await;
// Iterate over them to verify transaction validity in their overlays
for fork in validator.consensus.forks.iter_mut() {
// Clone forks' overlay
let overlay = match fork.overlay.lock().unwrap().full_clone() {
Ok(o) => o,
Err(e) => {
error!(
target: "darkfid::task::garbage_collect_task",
"Overlay full clone creation failed: {e}"
);
return Err(e)
}
};
// Grab all current proposals transactions hashes
let proposals_txs =
match overlay.lock().unwrap().get_blocks_txs_hashes(&fork.proposals) {
Ok(txs) => txs,
Err(e) => {
error!(
target: "darkfid::task::garbage_collect_task",
"Proposal transactions retrieval failed: {e}"
);
return Err(e)
}
};
// If the hash is contained in the proposals transactions vec, skip it
if proposals_txs.contains(&last_checked) {
continue
}
// Grab forks' next block height
let next_block_height = match fork.get_next_block_height() {
Ok(h) => h,
Err(e) => {
error!(
target: "darkfid::task::garbage_collect_task",
"Next fork block height retrieval failed: {e}"
);
return Err(e)
}
};
// Verify transaction
let result = verify_transactions(
&overlay,
next_block_height,
target,
&tx_vec,
&mut MerkleTree::new(1),
false,
)
.await;
// Check result
match result {
Ok(_) => valid = true,
Err(Error::TxVerifyFailed(TxVerifyFailed::ErroneousTxs(_))) => {
/* Do nothing */
}
Err(e) => {
error!(
target: "darkfid::task::garbage_collect_task",
"Verifying transaction {last_checked} failed: {e}"
);
return Err(e)
}
}
}
// Remove transaction if its invalid for all the forks
if !valid {
debug!(target: "darkfid::task::garbage_collect_task", "Removing invalid transaction: {last_checked}");
if let Err(e) = validator.blockchain.remove_pending_txs_hashes(&[last_checked]) {
error!(
target: "darkfid::task::garbage_collect_task",
"Removing invalid transaction {last_checked} failed: {e}"
);
};
}
}
// Grab next batch
txs = match node
// Purge all unreferenced contract trees from the database
trigger_queue_check!(receiver, 'outer);
debug!(target: "darkfid::task::garbage_collect_task", "Starting garbage collection iteration...");
if let Err(e) = node
.validator
.read()
.await
.blockchain
.transactions
.get_after_pending(&last_checked, node.txs_batch_size)
.consensus
.purge_unreferenced_trees(&mut node.registry.state.read().await.new_trees())
.await
{
Ok(txs) => txs,
error!(target: "darkfid::task::garbage_collect_task", "Purging unreferenced contract trees from the database failed: {e}");
continue
}
debug!(target: "darkfid::task::garbage_collect_task", "Unreferenced trees purged successfully, retrieving pending transactions...");
// Check if our mempool is empty
trigger_queue_check!(receiver, 'outer);
let validator = node.validator.read().await;
if validator.blockchain.transactions.pending.is_empty() {
debug!(target: "darkfid::task::garbage_collect_task", "No pending transactions to process");
continue
}
// Grab validator current best fork and an iterator over its
// pending transactions so we don't hold the validator lock.
let pending = validator.blockchain.transactions.pending.iter();
let fork = match validator.best_current_fork().await {
Ok(f) => f,
Err(e) => {
error!(
target: "darkfid::task::garbage_collect_task",
"Unproposed transactions next batch retrieval failed: {e}"
);
break
error!(target: "darkfid::task::garbage_collect_task", "Retrieving validator current best fork failed: {e}");
continue
}
};
}
let verify_fees = validator.verify_fees;
drop(validator);
info!(target: "darkfid::task::garbage_collect_task", "Garbage collection finished successfully!");
Ok(())
// Transactions Merkle tree
trigger_queue_check!(receiver, 'outer);
let mut tree = MerkleTree::new(1);
// Map of ZK proof verifying keys for the current transactions
// batch.
let mut vks: HashMap<[u8; 32], HashMap<String, VerifyingKey>> = HashMap::new();
// Grab forks' next block height
let next_block_height = match fork.get_next_block_height() {
Ok(h) => h,
Err(e) => {
error!(
target: "darkfid::task::garbage_collect_task",
"Next fork block height retrieval failed: {e}"
);
continue
}
};
// Iterate over all pending transactions
for record in pending {
trigger_queue_check!(receiver, 'outer);
let record = match record {
Ok(r) => r,
Err(e) => {
error!(target: "darkfid::task::garbage_collect_task", "Failed retrieving pending tx: {e}");
continue 'outer
}
};
let (tx_hash, tx) = match parse_record::<TransactionHash, Transaction>(record) {
Ok((h, t)) => (h, t),
Err(e) => {
error!(target: "darkfid::task::garbage_collect_task", "Failed parsing pending tx: {e}");
continue
}
};
// If the transaction has already been proposed, remove it
trigger_queue_check!(receiver, 'outer);
debug!(target: "darkfid::task::garbage_collect_task", "Checking transaction: {tx_hash}");
if fork.overlay.lock().unwrap().transactions.contains(&tx_hash)? {
debug!(target: "darkfid::task::garbage_collect_task", "Transaction {tx_hash} has already been proposed, removing...");
if let Err(e) = fork.blockchain.remove_pending_txs_hashes(&[tx_hash]) {
error!(target: "darkfid::task::garbage_collect_task", "Failed removing pending tx: {e}");
};
continue
}
// Update the verifying keys map
trigger_queue_check!(receiver, 'outer);
for call in &tx.calls {
vks.entry(call.data.contract_id.to_bytes()).or_default();
}
// Verify the transaction against current state
trigger_queue_check!(receiver, 'outer);
fork.overlay.lock().unwrap().checkpoint();
let result = verify_transaction(
&fork.overlay,
next_block_height,
fork.module.target,
&tx,
&mut tree,
&mut vks,
verify_fees,
)
.await;
fork.overlay.lock().unwrap().revert_to_checkpoint();
if let Err(e) = result {
debug!(target: "darkfid::task::garbage_collect_task", "Pending transaction {tx_hash} verification failed: {e}");
if let Err(e) = fork.blockchain.remove_pending_txs_hashes(&[tx_hash]) {
error!(target: "darkfid::task::garbage_collect_task", "Failed removing pending tx: {e}");
};
continue
}
debug!(target: "darkfid::task::garbage_collect_task", "Pending transaction {tx_hash} verified successfully.");
}
}
}

View File

@@ -302,7 +302,7 @@ pub async fn generate_node(
let p2p_handler = DarkfidP2pHandler::init(settings, ex).await?;
let registry = DarkfiMinersRegistry::init(Network::Mainnet, &validator).await?;
let node =
DarkfiNode::new(validator.clone(), p2p_handler.clone(), registry, 50, subscribers.clone())
DarkfiNode::new(validator.clone(), p2p_handler.clone(), registry, subscribers.clone())
.await?;
p2p_handler.start(ex, &node).await?;

View File

@@ -289,7 +289,6 @@ fn darkfid_programmatic_control() -> Result<()> {
&sled_db,
&config,
&darkfi::net::Settings::default(),
&None,
&ex,
)
.await