mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
perf(trie): use Arc for proof sharing and add batch validation
- Wrap DecodedMultiProof and DecodedStorageMultiProof in Arc within ProofResult for O(1) cloning when sending batched results to multiple receivers - Add debug assertions in BatchedStorageProof::merge and BatchedAccountProof::merge to validate that all batched jobs share the same Arc for multi_added_removed_keys and missed_leaves_storage_roots (critical invariants for correctness) - Unwrap Arc at extraction sites using try_unwrap for zero-cost when sole owner
This commit is contained in:
@@ -126,7 +126,8 @@ impl ParallelProof {
|
||||
)))
|
||||
})?;
|
||||
|
||||
// Extract storage proof directly from the result
|
||||
// Extract storage proof directly from the result.
|
||||
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
|
||||
let storage_proof = match proof_msg.result? {
|
||||
crate::proof_task::ProofResult::StorageProof { hashed_address: addr, proof } => {
|
||||
debug_assert_eq!(
|
||||
@@ -134,7 +135,8 @@ impl ParallelProof {
|
||||
hashed_address,
|
||||
"storage worker must return same address: expected {hashed_address}, got {addr}"
|
||||
);
|
||||
proof
|
||||
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
|
||||
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
|
||||
}
|
||||
crate::proof_task::ProofResult::AccountMultiproof { .. } => {
|
||||
unreachable!("storage worker only sends StorageProof variant")
|
||||
@@ -223,8 +225,12 @@ impl ParallelProof {
|
||||
)
|
||||
})?;
|
||||
|
||||
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
|
||||
let (multiproof, stats) = match proof_result_msg.result? {
|
||||
crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => (proof, stats),
|
||||
crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => {
|
||||
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
|
||||
(Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone()), stats)
|
||||
}
|
||||
crate::proof_task::ProofResult::StorageProof { .. } => {
|
||||
unreachable!("account worker only sends AccountMultiproof variant")
|
||||
}
|
||||
|
||||
@@ -116,7 +116,22 @@ impl BatchedStorageProof {
|
||||
}
|
||||
|
||||
/// Merges another storage proof job into this batch.
|
||||
///
|
||||
/// # Panics (debug builds only)
|
||||
/// Panics if `input.multi_added_removed_keys` does not point to the same Arc as the batch's.
|
||||
fn merge(&mut self, input: StorageProofInput, sender: ProofResultContext) {
|
||||
// Validate that all batched jobs share the same multi_added_removed_keys Arc.
|
||||
// This is a critical invariant: if jobs have different keys, the merged proof
|
||||
// would be computed with only the first job's keys, producing incorrect results.
|
||||
debug_assert!(
|
||||
match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
|
||||
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
|
||||
(None, None) => true,
|
||||
_ => false,
|
||||
},
|
||||
"All batched storage proof jobs must share the same multi_added_removed_keys Arc"
|
||||
);
|
||||
|
||||
self.prefix_set.extend_keys(input.prefix_set.iter().copied());
|
||||
self.target_slots.extend(input.target_slots);
|
||||
self.with_branch_node_masks |= input.with_branch_node_masks;
|
||||
@@ -221,7 +236,31 @@ impl BatchedAccountProof {
|
||||
}
|
||||
|
||||
/// Merges another account multiproof job into this batch.
|
||||
///
|
||||
/// # Panics (debug builds only)
|
||||
/// Panics if `input.multi_added_removed_keys` or `input.missed_leaves_storage_roots`
|
||||
/// do not point to the same Arc as the batch's.
|
||||
fn merge(&mut self, input: AccountMultiproofInput) {
|
||||
// Validate that all batched jobs share the same multi_added_removed_keys Arc.
|
||||
// This is a critical invariant: if jobs have different keys, the merged proof
|
||||
// would be computed with only the first job's keys, producing incorrect results.
|
||||
debug_assert!(
|
||||
match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
|
||||
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
|
||||
(None, None) => true,
|
||||
_ => false,
|
||||
},
|
||||
"All batched account proof jobs must share the same multi_added_removed_keys Arc"
|
||||
);
|
||||
|
||||
// Validate that all batched jobs share the same missed_leaves_storage_roots cache.
|
||||
// This is critical because workers may add entries to this cache during proof computation,
|
||||
// and all receivers expect to see those entries in their shared cache.
|
||||
debug_assert!(
|
||||
Arc::ptr_eq(&self.missed_leaves_storage_roots, &input.missed_leaves_storage_roots),
|
||||
"All batched account proof jobs must share the same missed_leaves_storage_roots Arc"
|
||||
);
|
||||
|
||||
// Merge targets.
|
||||
self.targets.extend(input.targets);
|
||||
|
||||
@@ -780,12 +819,16 @@ impl TrieNodeProvider for ProofTaskTrieNodeProvider {
|
||||
}
|
||||
}
|
||||
/// Result of a proof calculation, which can be either an account multiproof or a storage proof.
|
||||
///
|
||||
/// The proof data is wrapped in `Arc` to enable efficient sharing when batching multiple
|
||||
/// proof requests. This avoids expensive cloning of the underlying proof structures
|
||||
/// when sending results to multiple receivers.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ProofResult {
|
||||
/// Account multiproof with statistics
|
||||
AccountMultiproof {
|
||||
/// The account multiproof
|
||||
proof: DecodedMultiProof,
|
||||
/// The account multiproof (Arc-wrapped for efficient sharing in batches)
|
||||
proof: Arc<DecodedMultiProof>,
|
||||
/// Statistics collected during proof computation
|
||||
stats: ParallelTrieStats,
|
||||
},
|
||||
@@ -793,8 +836,8 @@ pub enum ProofResult {
|
||||
StorageProof {
|
||||
/// The hashed address this storage proof belongs to
|
||||
hashed_address: B256,
|
||||
/// The storage multiproof
|
||||
proof: DecodedStorageMultiProof,
|
||||
/// The storage multiproof (Arc-wrapped for efficient sharing in batches)
|
||||
proof: Arc<DecodedStorageMultiProof>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -803,11 +846,17 @@ impl ProofResult {
|
||||
///
|
||||
/// For account multiproofs, returns the multiproof directly (discarding stats).
|
||||
/// For storage proofs, wraps the storage proof into a minimal multiproof.
|
||||
///
|
||||
/// Note: This method clones the inner proof data. If you need to avoid the clone
|
||||
/// when you're the sole owner, consider using `Arc::try_unwrap` first.
|
||||
pub fn into_multiproof(self) -> DecodedMultiProof {
|
||||
match self {
|
||||
Self::AccountMultiproof { proof, stats: _ } => proof,
|
||||
Self::AccountMultiproof { proof, stats: _ } => {
|
||||
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
|
||||
}
|
||||
Self::StorageProof { hashed_address, proof } => {
|
||||
DecodedMultiProof::from_storage_proof(hashed_address, proof)
|
||||
let storage_proof = Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone());
|
||||
DecodedMultiProof::from_storage_proof(hashed_address, storage_proof)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1139,9 +1188,9 @@ where
|
||||
let num_senders = senders.len();
|
||||
match result {
|
||||
Ok(storage_proof) => {
|
||||
// Success case: clone the proof for each sender.
|
||||
// Success case: wrap proof in Arc for efficient sharing across all senders.
|
||||
let proof_result =
|
||||
ProofResult::StorageProof { hashed_address, proof: storage_proof };
|
||||
ProofResult::StorageProof { hashed_address, proof: Arc::new(storage_proof) };
|
||||
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*storage_proofs_processed += 1;
|
||||
@@ -1562,8 +1611,8 @@ where
|
||||
let num_senders = senders.len();
|
||||
match result {
|
||||
Ok(proof) => {
|
||||
// Success case: clone the proof for each sender.
|
||||
let proof_result = ProofResult::AccountMultiproof { proof, stats };
|
||||
// Success case: wrap proof in Arc for efficient sharing across all senders.
|
||||
let proof_result = ProofResult::AccountMultiproof { proof: Arc::new(proof), stats };
|
||||
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*account_proofs_processed += 1;
|
||||
@@ -1764,7 +1813,9 @@ where
|
||||
|
||||
drop(_guard);
|
||||
|
||||
// Extract storage proof from the result
|
||||
// Extract storage proof from the result.
|
||||
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it
|
||||
// here.
|
||||
let proof = match proof_msg.result? {
|
||||
ProofResult::StorageProof { hashed_address: addr, proof } => {
|
||||
debug_assert_eq!(
|
||||
@@ -1772,7 +1823,9 @@ where
|
||||
hashed_address,
|
||||
"storage worker must return same address: expected {hashed_address}, got {addr}"
|
||||
);
|
||||
proof
|
||||
// Efficiently unwrap Arc: returns inner value if sole owner, clones
|
||||
// otherwise.
|
||||
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
|
||||
}
|
||||
ProofResult::AccountMultiproof { .. } => {
|
||||
unreachable!("storage worker only sends StorageProof variant")
|
||||
@@ -1835,8 +1888,11 @@ where
|
||||
// Consume remaining storage proof receivers for accounts not encountered during trie walk.
|
||||
for (hashed_address, receiver) in storage_proof_receivers {
|
||||
if let Ok(proof_msg) = receiver.recv() {
|
||||
// Extract storage proof from the result
|
||||
// Extract storage proof from the result.
|
||||
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
|
||||
if let Ok(ProofResult::StorageProof { proof, .. }) = proof_msg.result {
|
||||
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
|
||||
let proof = Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone());
|
||||
collected_decoded_storages.insert(hashed_address, proof);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user