refactor(trie): implement on-demand storage proof fetching

- Updated `build_account_multiproof_with_storage` to accept storage proof receivers, allowing for lazy fetching of storage proofs during trie traversal.
- Removed pre-computed storage proof handling in favor of real-time proof retrieval, enhancing performance and efficiency.
- Introduced standardized error handling for closed storage proof channels.
- Adjusted related functions and structures to support the new fetching mechanism.
This commit is contained in:
Yong Kang
2025-10-02 08:31:55 +00:00
parent 85f0ff18bd
commit 20770f41c1
2 changed files with 52 additions and 32 deletions

View File

@@ -443,6 +443,7 @@ where
Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
{
// Calculate worker counts and concurrency limits
// TODO: We gottta experiment with this + metrics
let storage_workers = config.storage_proof_workers();
let account_workers = config.account_proof_workers();
let max_storage_concurrency = config.max_proof_task_concurrency() as usize;

View File

@@ -1,6 +1,6 @@
use crate::{
metrics::ParallelTrieMetrics,
proof_task::{ProofTaskKind, ProofTaskManagerHandle, StorageProofInput},
proof_task::{ProofTaskKind, ProofTaskManagerHandle, StorageProofInput, StorageProofResult},
root::ParallelStateRootError,
stats::{ParallelTrieStats, ParallelTrieTracker},
StorageRootTargets,
@@ -10,7 +10,7 @@ use alloy_primitives::{
B256,
};
use alloy_rlp::{BufMut, Encodable};
use dashmap::DashMap;
use crossbeam_channel::Receiver;
use itertools::Itertools;
use reth_execution_errors::StorageRootError;
use reth_provider::{
@@ -34,13 +34,22 @@ use reth_trie_common::{
proof::{DecodedProofNodes, ProofRetainer},
};
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use std::sync::{mpsc::Receiver, Arc};
use tracing::trace;
use std::sync::Arc;
use tracing::{debug, trace, warn};
/// Builds an account multiproof with pre-computed storage proofs.
/// Creates a standardized error for when a storage proof channel closes unexpectedly.
#[inline]
fn storage_channel_closed_error(address: &B256) -> ParallelStateRootError {
ParallelStateRootError::StorageRoot(StorageRootError::Database(DatabaseError::Other(format!(
"storage proof channel closed unexpectedly for {address}"
))))
}
/// Builds an account multiproof with storage proof receivers.
///
/// This function accepts pre-computed storage proofs as input and builds the account multiproof
/// by walking the account trie and assembling the final proof.
/// This function accepts a map of storage proof receivers and fetches proofs on-demand
/// during account trie traversal, allowing account trie walking to interleave with
/// storage proof computation for better performance.
///
/// Returns a tuple containing the decoded multiproof and stats for metrics recording.
pub fn build_account_multiproof_with_storage<TCF, HCF>(
@@ -48,7 +57,7 @@ pub fn build_account_multiproof_with_storage<TCF, HCF>(
hashed_cursor_factory: HCF,
targets: MultiProofTargets,
prefix_sets: TriePrefixSets,
storage_proofs: B256Map<DecodedStorageMultiProof>,
mut storage_receivers: B256Map<Receiver<StorageProofResult>>,
collect_branch_node_masks: bool,
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
) -> Result<(DecodedMultiProof, ParallelTrieStats), ParallelStateRootError>
@@ -57,8 +66,6 @@ where
HCF: HashedCursorFactory + Clone,
{
let mut tracker = ParallelTrieTracker::default();
// Track the number of pre-computed storage proofs
tracker.set_precomputed_storage_roots(storage_proofs.len() as u64);
let accounts_added_removed_keys =
multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts());
@@ -85,7 +92,6 @@ where
// Storage multiproofs for non empty tries will be overwritten if necessary.
let mut collected_decoded_storages: B256Map<DecodedStorageMultiProof> =
targets.keys().map(|key| (*key, DecodedStorageMultiProof::empty())).collect();
let mut storage_proofs = storage_proofs;
let mut account_rlp = Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE);
let mut account_node_iter = TrieNodeIter::state_trie(
walker,
@@ -97,13 +103,27 @@ where
hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
}
TrieElement::Leaf(hashed_address, account) => {
let decoded_storage_multiproof = match storage_proofs.remove(&hashed_address) {
Some(proof) => proof,
// Fetch storage proof on-demand (blocks if not yet ready)
let decoded_storage_multiproof = match storage_receivers.remove(&hashed_address) {
Some(receiver) => {
// Block waiting for the storage proof result
match receiver.recv() {
Ok(Ok(proof)) => proof,
Ok(Err(e)) => return Err(e),
Err(_) => return Err(storage_channel_closed_error(&hashed_address)),
}
}
// Since we do not store all intermediate nodes in the database, there might
// be a possibility of re-adding a non-modified leaf to the hash builder.
None => {
tracker.inc_missed_leaves();
warn!(
target: "trie::proof",
?hashed_address,
"No storage proof receiver found - using fallback synchronous computation"
);
let raw_fallback_proof = StorageProof::new_hashed(
trie_cursor_factory.clone(),
hashed_cursor_factory.clone(),
@@ -259,14 +279,23 @@ where
let input = StorageProofInput::new(
hashed_address,
prefix_set,
target_slots,
Arc::new(target_slots),
self.collect_branch_node_masks,
self.multi_added_removed_keys.clone(),
);
let (sender, receiver) = std::sync::mpsc::channel();
let _ =
self.storage_proof_task_handle.queue_task(ProofTaskKind::StorageProof(input, sender));
let (sender, receiver) = crossbeam_channel::unbounded();
if self
.storage_proof_task_handle
.queue_task(ProofTaskKind::StorageProof(input, sender))
.is_err()
{
debug!(
target: "trie::parallel_proof",
?hashed_address,
"Storage proof task queue failed - manager closed"
);
}
receiver
}
@@ -343,8 +372,9 @@ where
// stores the receiver for the storage proof outcome for the hashed addresses
// this way we can lazily await the outcome when we iterate over the map
let mut storage_proof_receivers =
B256Map::with_capacity_and_hasher(storage_root_targets.len(), Default::default());
let mut storage_proof_receivers: B256Map<
crossbeam_channel::Receiver<Result<DecodedStorageMultiProof, ParallelStateRootError>>,
> = B256Map::with_capacity_and_hasher(storage_root_targets.len(), Default::default());
for (hashed_address, prefix_set) in
storage_root_targets.into_iter().sorted_unstable_by_key(|(address, _)| *address)
@@ -357,18 +387,6 @@ where
storage_proof_receivers.insert(hashed_address, receiver);
}
// Wait for all storage proofs to complete
let mut storage_proofs =
B256Map::with_capacity_and_hasher(storage_proof_receivers.len(), Default::default());
for (hashed_address, receiver) in storage_proof_receivers {
let proof = receiver.recv().map_err(|e| {
ParallelStateRootError::StorageRoot(StorageRootError::Database(
DatabaseError::Other(format!("channel closed for {hashed_address}: {e}")),
))
})??;
storage_proofs.insert(hashed_address, proof);
}
let provider_ro = self.view.provider_ro()?;
let trie_cursor_factory = InMemoryTrieCursorFactory::new(
DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
@@ -379,12 +397,13 @@ where
&self.state_sorted,
);
// Build account multiproof with on-demand storage proof fetching
let (multiproof, stats) = build_account_multiproof_with_storage(
trie_cursor_factory,
hashed_cursor_factory,
targets,
frozen_prefix_sets,
storage_proofs,
storage_proof_receivers, // ← Pass receivers directly for on-demand fetching
self.collect_branch_node_masks,
self.multi_added_removed_keys,
)?;