mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
refactor(multiproof): streamline storage proof request handling
- Reorganized the logic for queuing storage proof requests during trie traversal to ensure all accounts in the extended prefix set are processed, even those with no storage changes. - Removed redundant code for storage proof request queuing, enhancing clarity and maintainability. - Updated the handling of destroyed accounts to avoid unnecessary cloning, improving performance and memory efficiency.
This commit is contained in:
@@ -70,40 +70,6 @@ fn execute_account_multiproof_worker<Tx: DbTx>(
|
||||
storage_proof_handle: ProofTaskManagerHandle<Tx>,
|
||||
proof_tx: &ProofTaskTx<Tx>,
|
||||
) -> Result<DecodedMultiProof, ParallelStateRootError> {
|
||||
// Queue storage proof requests to storage manager
|
||||
let mut storage_receivers: B256Map<
|
||||
CrossbeamReceiver<Result<DecodedStorageMultiProof, ParallelStateRootError>>,
|
||||
> = B256Map::default();
|
||||
for (address, slots) in targets.iter() {
|
||||
if slots.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let (sender, receiver) = crossbeam_channel::unbounded();
|
||||
let prefix_set = prefix_sets.storage_prefix_sets.get(address).cloned().unwrap_or_default();
|
||||
let storage_input = StorageProofInput::new(
|
||||
*address,
|
||||
prefix_set,
|
||||
Arc::new(slots.clone()), // Arc clone is cheap (reference count only)
|
||||
collect_branch_node_masks,
|
||||
multi_added_removed_keys.clone(),
|
||||
);
|
||||
|
||||
if storage_proof_handle
|
||||
.queue_task(ProofTaskKind::StorageProof(storage_input, sender))
|
||||
.is_err()
|
||||
{
|
||||
debug!(
|
||||
target: "trie::proof_task",
|
||||
?address,
|
||||
"Storage manager closed, cannot queue proof"
|
||||
);
|
||||
return Err(storage_manager_closed_error());
|
||||
}
|
||||
|
||||
storage_receivers.insert(*address, receiver);
|
||||
}
|
||||
|
||||
// Build account multiproof, fetching storage proofs on-demand during trie traversal
|
||||
let (trie_cursor_factory, hashed_cursor_factory) = proof_tx.create_factories();
|
||||
|
||||
@@ -118,7 +84,7 @@ fn execute_account_multiproof_worker<Tx: DbTx>(
|
||||
} else {
|
||||
// Otherwise, start with existing keys
|
||||
reth_trie::prefix_set::PrefixSetMut::from(
|
||||
prefix_sets.account_prefix_set.iter().cloned(),
|
||||
prefix_sets.account_prefix_set.iter().copied(),
|
||||
)
|
||||
}
|
||||
},
|
||||
@@ -128,13 +94,13 @@ fn execute_account_multiproof_worker<Tx: DbTx>(
|
||||
let mutable_set = if prefix_set.all() {
|
||||
reth_trie::prefix_set::PrefixSetMut::all()
|
||||
} else {
|
||||
reth_trie::prefix_set::PrefixSetMut::from(prefix_set.iter().cloned())
|
||||
reth_trie::prefix_set::PrefixSetMut::from(prefix_set.iter().copied())
|
||||
};
|
||||
storage_sets.insert(*address, mutable_set);
|
||||
}
|
||||
storage_sets
|
||||
},
|
||||
destroyed_accounts: prefix_sets.destroyed_accounts.clone(),
|
||||
destroyed_accounts: prefix_sets.destroyed_accounts,
|
||||
};
|
||||
|
||||
// Now extend with targets. Using .extend() properly preserves the `all` flag.
|
||||
@@ -157,6 +123,45 @@ fn execute_account_multiproof_worker<Tx: DbTx>(
|
||||
|
||||
let extended_prefix_sets = extended_prefix_sets.freeze();
|
||||
|
||||
// Queue storage proof requests for ALL accounts we'll visit during trie traversal.
|
||||
// The walker visits all accounts in extended_prefix_sets.account_prefix_set, so we need
|
||||
// to queue storage proofs for all of them, even if they have no storage changes or empty slots.
|
||||
let mut storage_receivers: B256Map<
|
||||
CrossbeamReceiver<Result<DecodedStorageMultiProof, ParallelStateRootError>>,
|
||||
> = B256Map::default();
|
||||
|
||||
for account_nibbles in &extended_prefix_sets.account_prefix_set {
|
||||
// Convert nibbles back to B256 address
|
||||
let address = B256::from_slice(&account_nibbles.pack());
|
||||
|
||||
let (sender, receiver) = crossbeam_channel::unbounded();
|
||||
let prefix_set =
|
||||
extended_prefix_sets.storage_prefix_sets.get(&address).cloned().unwrap_or_default();
|
||||
let target_slots = targets.get(&address).cloned().unwrap_or_default();
|
||||
|
||||
let storage_input = StorageProofInput::new(
|
||||
address,
|
||||
prefix_set,
|
||||
Arc::new(target_slots),
|
||||
collect_branch_node_masks,
|
||||
multi_added_removed_keys.clone(),
|
||||
);
|
||||
|
||||
if storage_proof_handle
|
||||
.queue_task(ProofTaskKind::StorageProof(storage_input, sender))
|
||||
.is_err()
|
||||
{
|
||||
debug!(
|
||||
target: "trie::proof_task",
|
||||
?address,
|
||||
"Storage manager closed, cannot queue proof"
|
||||
);
|
||||
return Err(storage_manager_closed_error());
|
||||
}
|
||||
|
||||
storage_receivers.insert(address, receiver);
|
||||
}
|
||||
|
||||
let result = crate::proof::build_account_multiproof_with_storage(
|
||||
trie_cursor_factory,
|
||||
hashed_cursor_factory,
|
||||
|
||||
Reference in New Issue
Block a user