refactor(trie): improve account multiproof handling with crossbeam channel integration

This commit is contained in:
Yong Kang
2025-10-22 07:08:34 +00:00
parent ad3131cd55
commit 275eea94fd

View File

@@ -5,16 +5,21 @@ use crate::{
StorageRootTargets,
};
use alloy_primitives::{map::B256Set, B256};
use crossbeam_channel::unbounded as crossbeam_unbounded;
use dashmap::DashMap;
use reth_execution_errors::StorageRootError;
use reth_storage_errors::db::DatabaseError;
use reth_trie::{
prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets, TriePrefixSetsMut},
updates::TrieUpdatesSorted,
DecodedMultiProof, DecodedStorageMultiProof, HashedPostStateSorted, MultiProofTargets, Nibbles,
DecodedMultiProof, DecodedStorageMultiProof, HashedPostState, HashedPostStateSorted,
MultiProofTargets, Nibbles,
};
use reth_trie_common::added_removed_keys::MultiAddedRemovedKeys;
use std::sync::{mpsc::Receiver, Arc};
use std::{
sync::{mpsc::Receiver, Arc},
time::Instant,
};
use tracing::trace;
/// Parallel proof calculator.
@@ -182,6 +187,9 @@ impl ParallelProof {
);
// Queue account multiproof request to account worker pool
// Create channel for receiving ProofResultMessage
let (result_tx, result_rx) = crossbeam_unbounded();
let account_multiproof_start_time = Instant::now();
let input = AccountMultiproofInput {
targets,
@@ -189,19 +197,26 @@ impl ParallelProof {
collect_branch_node_masks: self.collect_branch_node_masks,
multi_added_removed_keys: self.multi_added_removed_keys.clone(),
missed_leaves_storage_roots: self.missed_leaves_storage_roots.clone(),
proof_result_sender: (
result_tx,
0,
HashedPostState::default(),
account_multiproof_start_time,
),
};
let receiver = self
.proof_worker_handle
self.proof_worker_handle
.dispatch_account_multiproof(input)
.map_err(|e| ParallelStateRootError::Other(e.to_string()))?;
// Wait for account multiproof result from worker
let (multiproof, stats) = receiver.recv().map_err(|_| {
// Wait for account multiproof result from worker via ProofResultMessage
let proof_result_msg = result_rx.recv().map_err(|_| {
ParallelStateRootError::Other(
"Account multiproof channel dropped: worker died or pool shutdown".to_string(),
)
})??;
})?;
let (multiproof, stats) = proof_result_msg.result?;
#[cfg(feature = "metrics")]
self.metrics.record(stats);