From f85fcba8721a19c6a9d03683727fb2a5149daaa5 Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Wed, 21 Jan 2026 15:18:44 +0100 Subject: [PATCH] feat(trie): add V2 account proof computation and refactor proof types (#21214) Co-authored-by: Amp --- crates/engine/primitives/src/config.rs | 16 + .../tree/src/tree/payload_processor/mod.rs | 19 +- .../src/tree/payload_processor/multiproof.rs | 517 +++++++++++++----- .../src/tree/payload_processor/prewarm.rs | 89 ++- .../src/tree/payload_processor/sparse_trie.rs | 15 +- crates/trie/parallel/Cargo.toml | 2 +- crates/trie/parallel/src/proof.rs | 9 +- crates/trie/parallel/src/proof_task.rs | 444 +++++++++++---- crates/trie/parallel/src/stats.rs | 5 - crates/trie/parallel/src/value_encoder.rs | 2 - 10 files changed, 844 insertions(+), 274 deletions(-) diff --git a/crates/engine/primitives/src/config.rs b/crates/engine/primitives/src/config.rs index 2870d3dccc..0b72e1d624 100644 --- a/crates/engine/primitives/src/config.rs +++ b/crates/engine/primitives/src/config.rs @@ -34,6 +34,11 @@ fn default_account_worker_count() -> usize { /// The size of proof targets chunk to spawn in one multiproof calculation. pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 60; +/// The size of proof targets chunk to spawn in one multiproof calculation when V2 proofs are +/// enabled. This is 4x the default chunk size to take advantage of more efficient V2 proof +/// computation. +pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2: usize = DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE * 4; + /// Default number of reserved CPU cores for non-reth processes. /// /// This will be deducted from the thread count of main reth global threadpool. @@ -267,6 +272,17 @@ impl TreeConfig { self.multiproof_chunk_size } + /// Return the multiproof task chunk size, using the V2 default if V2 proofs are enabled + /// and the chunk size is at the default value. 
+ pub const fn effective_multiproof_chunk_size(&self) -> usize { + if self.enable_proof_v2 && self.multiproof_chunk_size == DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE + { + DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2 + } else { + self.multiproof_chunk_size + } + } + /// Return the number of reserved CPU cores for non-reth processes pub const fn reserved_cpu_cores(&self) -> usize { self.reserved_cpu_cores diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 1803929c89..1fa4232b0e 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -245,6 +245,9 @@ where let (to_sparse_trie, sparse_trie_rx) = channel(); let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded(); + // Extract V2 proofs flag early so we can pass it to prewarm + let v2_proofs_enabled = config.enable_proof_v2(); + // Handle BAL-based optimization if available let prewarm_handle = if let Some(bal) = bal { // When BAL is present, use BAL prewarming and send BAL to multiproof @@ -261,6 +264,7 @@ where provider_builder.clone(), None, // Don't send proof targets when BAL is present Some(bal), + v2_proofs_enabled, ) } else { // Normal path: spawn with transaction prewarming @@ -271,6 +275,7 @@ where provider_builder.clone(), Some(to_multi_proof.clone()), None, + v2_proofs_enabled, ) }; @@ -278,7 +283,6 @@ where let task_ctx = ProofTaskCtx::new(multiproof_provider_factory); let storage_worker_count = config.storage_worker_count(); let account_worker_count = config.account_worker_count(); - let v2_proofs_enabled = config.enable_proof_v2(); let proof_handle = ProofWorkerHandle::new( self.executor.handle().clone(), task_ctx, @@ -290,10 +294,13 @@ where let multi_proof_task = MultiProofTask::new( proof_handle.clone(), to_sparse_trie, - config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()), + config + .multiproof_chunking_enabled() + 
.then_some(config.effective_multiproof_chunk_size()), to_multi_proof.clone(), from_multi_proof, - ); + ) + .with_v2_proofs_enabled(v2_proofs_enabled); // spawn multi-proof task let parent_span = span.clone(); @@ -342,8 +349,9 @@ where P: BlockReader + StateProviderFactory + StateReader + Clone + 'static, { let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions); + // This path doesn't use multiproof, so V2 proofs flag doesn't matter let prewarm_handle = - self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal); + self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal, false); PayloadHandle { to_multi_proof: None, prewarm_handle, @@ -410,6 +418,7 @@ where } /// Spawn prewarming optionally wired to the multiproof task for target updates. + #[expect(clippy::too_many_arguments)] fn spawn_caching_with

( &self, env: ExecutionEnv, @@ -418,6 +427,7 @@ where provider_builder: StateProviderBuilder, to_multi_proof: Option>, bal: Option>, + v2_proofs_enabled: bool, ) -> CacheTaskHandle where P: BlockReader + StateProviderFactory + StateReader + Clone + 'static, @@ -440,6 +450,7 @@ where terminate_execution: Arc::new(AtomicBool::new(false)), precompile_cache_disabled: self.precompile_cache_disabled, precompile_cache_map: self.precompile_cache_map.clone(), + v2_proofs_enabled, }; let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new( diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index b5f1272b67..823c3e54e9 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -11,14 +11,18 @@ use reth_metrics::Metrics; use reth_provider::AccountReader; use reth_revm::state::EvmState; use reth_trie::{ - added_removed_keys::MultiAddedRemovedKeys, DecodedMultiProof, HashedPostState, HashedStorage, + added_removed_keys::MultiAddedRemovedKeys, proof_v2, HashedPostState, HashedStorage, MultiProofTargets, }; +#[cfg(test)] +use reth_trie_parallel::stats::ParallelTrieTracker; use reth_trie_parallel::{ proof::ParallelProof, proof_task::{ - AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle, + AccountMultiproofInput, ProofResult, ProofResultContext, ProofResultMessage, + ProofWorkerHandle, }, + targets_v2::{ChunkedMultiProofTargetsV2, MultiProofTargetsV2}, }; use revm_primitives::map::{hash_map, B256Map}; use std::{collections::BTreeMap, sync::Arc, time::Instant}; @@ -63,12 +67,12 @@ const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300; /// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the /// state. 
-#[derive(Default, Debug)] +#[derive(Debug)] pub struct SparseTrieUpdate { /// The state update that was used to calculate the proof pub(crate) state: HashedPostState, /// The calculated multiproof - pub(crate) multiproof: DecodedMultiProof, + pub(crate) multiproof: ProofResult, } impl SparseTrieUpdate { @@ -80,7 +84,11 @@ impl SparseTrieUpdate { /// Construct update from multiproof. #[cfg(test)] pub(super) fn from_multiproof(multiproof: reth_trie::MultiProof) -> alloy_rlp::Result { - Ok(Self { multiproof: multiproof.try_into()?, ..Default::default() }) + let stats = ParallelTrieTracker::default().finish(); + Ok(Self { + state: HashedPostState::default(), + multiproof: ProofResult::Legacy(multiproof.try_into()?, stats), + }) } /// Extend update with contents of the other. @@ -94,7 +102,7 @@ impl SparseTrieUpdate { #[derive(Debug)] pub(super) enum MultiProofMessage { /// Prefetch proof targets - PrefetchProofs(MultiProofTargets), + PrefetchProofs(VersionedMultiProofTargets), /// New state update from transaction execution with its source StateUpdate(Source, EvmState), /// State update that can be applied to the sparse trie without any new proofs. @@ -223,12 +231,155 @@ pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostStat hashed_state } +/// Extends a `MultiProofTargets` with the contents of a `VersionedMultiProofTargets`, +/// regardless of which variant the latter is. 
+fn extend_multiproof_targets(dest: &mut MultiProofTargets, src: &VersionedMultiProofTargets) { + match src { + VersionedMultiProofTargets::Legacy(targets) => { + dest.extend_ref(targets); + } + VersionedMultiProofTargets::V2(targets) => { + // Add all account targets + for target in &targets.account_targets { + dest.entry(target.key()).or_default(); + } + + // Add all storage targets + for (hashed_address, slots) in &targets.storage_targets { + let slot_set = dest.entry(*hashed_address).or_default(); + for slot in slots { + slot_set.insert(slot.key()); + } + } + } + } +} + +/// A set of multiproof targets which can be either in the legacy or V2 representations. +#[derive(Debug)] +pub(super) enum VersionedMultiProofTargets { + /// Legacy targets + Legacy(MultiProofTargets), + /// V2 targets + V2(MultiProofTargetsV2), +} + +impl VersionedMultiProofTargets { + /// Returns true if there are no account or storage targets. + fn is_empty(&self) -> bool { + match self { + Self::Legacy(targets) => targets.is_empty(), + Self::V2(targets) => targets.is_empty(), + } + } + + /// Returns the number of account targets in the multiproof target + fn account_targets_len(&self) -> usize { + match self { + Self::Legacy(targets) => targets.len(), + Self::V2(targets) => targets.account_targets.len(), + } + } + + /// Returns the number of storage targets in the multiproof target + fn storage_targets_len(&self) -> usize { + match self { + Self::Legacy(targets) => targets.values().map(|slots| slots.len()).sum::(), + Self::V2(targets) => { + targets.storage_targets.values().map(|slots| slots.len()).sum::() + } + } + } + + /// Returns the number of accounts in the multiproof targets. + fn len(&self) -> usize { + match self { + Self::Legacy(targets) => targets.len(), + Self::V2(targets) => targets.account_targets.len(), + } + } + + /// Returns the total storage slot count across all accounts. 
+ fn storage_count(&self) -> usize { + match self { + Self::Legacy(targets) => targets.values().map(|slots| slots.len()).sum(), + Self::V2(targets) => targets.storage_targets.values().map(|slots| slots.len()).sum(), + } + } + + /// Returns the number of items that will be considered during chunking. + fn chunking_length(&self) -> usize { + match self { + Self::Legacy(targets) => targets.chunking_length(), + Self::V2(targets) => { + // For V2, count accounts + storage slots + targets.account_targets.len() + + targets.storage_targets.values().map(|slots| slots.len()).sum::() + } + } + } + + /// Retains the targets representing the difference with another `MultiProofTargets`. + /// Removes all targets that are already present in `other`. + fn retain_difference(&mut self, other: &MultiProofTargets) { + match self { + Self::Legacy(targets) => { + targets.retain_difference(other); + } + Self::V2(targets) => { + // Remove account targets that exist in other + targets.account_targets.retain(|target| !other.contains_key(&target.key())); + + // For each account in storage_targets, remove slots that exist in other + targets.storage_targets.retain(|hashed_address, slots| { + if let Some(other_slots) = other.get(hashed_address) { + slots.retain(|slot| !other_slots.contains(&slot.key())); + !slots.is_empty() + } else { + true + } + }); + } + } + } + + /// Extends this `VersionedMultiProofTargets` with the contents of another. + /// + /// Panics if the variants do not match. 
+ fn extend(&mut self, other: Self) { + match (self, other) { + (Self::Legacy(dest), Self::Legacy(src)) => { + dest.extend(src); + } + (Self::V2(dest), Self::V2(src)) => { + dest.account_targets.extend(src.account_targets); + for (addr, slots) in src.storage_targets { + dest.storage_targets.entry(addr).or_default().extend(slots); + } + } + _ => panic!("Cannot extend VersionedMultiProofTargets with mismatched variants"), + } + } + + /// Chunks this `VersionedMultiProofTargets` into smaller chunks of the given size. + fn chunks(self, chunk_size: usize) -> Box> { + match self { + Self::Legacy(targets) => { + Box::new(MultiProofTargets::chunks(targets, chunk_size).map(Self::Legacy)) + } + Self::V2(targets) => { + Box::new(ChunkedMultiProofTargetsV2::new(targets, chunk_size).map(Self::V2)) + } + } + } +} + /// Input parameters for dispatching a multiproof calculation. #[derive(Debug)] struct MultiproofInput { source: Option, hashed_state_update: HashedPostState, - proof_targets: MultiProofTargets, + proof_targets: VersionedMultiProofTargets, proof_sequence_number: u64, state_root_message_sender: CrossbeamSender, multi_added_removed_keys: Option>, @@ -263,8 +414,6 @@ pub struct MultiproofManager { proof_result_tx: CrossbeamSender, /// Metrics metrics: MultiProofTaskMetrics, - /// Whether to use V2 storage proofs - v2_proofs_enabled: bool, } impl MultiproofManager { @@ -278,9 +427,7 @@ impl MultiproofManager { metrics.max_storage_workers.set(proof_worker_handle.total_storage_workers() as f64); metrics.max_account_workers.set(proof_worker_handle.total_account_workers() as f64); - let v2_proofs_enabled = proof_worker_handle.v2_proofs_enabled(); - - Self { metrics, proof_worker_handle, proof_result_tx, v2_proofs_enabled } + Self { metrics, proof_worker_handle, proof_result_tx } } /// Dispatches a new multiproof calculation to worker pools. 
@@ -325,41 +472,48 @@ impl MultiproofManager { multi_added_removed_keys, } = multiproof_input; - let account_targets = proof_targets.len(); - let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::(); - trace!( target: "engine::tree::payload_processor::multiproof", proof_sequence_number, ?proof_targets, - account_targets, - storage_targets, + account_targets = proof_targets.account_targets_len(), + storage_targets = proof_targets.storage_targets_len(), ?source, "Dispatching multiproof to workers" ); let start = Instant::now(); - // Extend prefix sets with targets - let frozen_prefix_sets = - ParallelProof::extend_prefix_sets_with_targets(&Default::default(), &proof_targets); + // Workers will send ProofResultMessage directly to proof_result_rx + let proof_result_sender = ProofResultContext::new( + self.proof_result_tx.clone(), + proof_sequence_number, + hashed_state_update, + start, + ); - // Dispatch account multiproof to worker pool with result sender - let input = AccountMultiproofInput { - targets: proof_targets, - prefix_sets: frozen_prefix_sets, - collect_branch_node_masks: true, - multi_added_removed_keys, - // Workers will send ProofResultMessage directly to proof_result_rx - proof_result_sender: ProofResultContext::new( - self.proof_result_tx.clone(), - proof_sequence_number, - hashed_state_update, - start, - ), - v2_proofs_enabled: self.v2_proofs_enabled, + let input = match proof_targets { + VersionedMultiProofTargets::Legacy(proof_targets) => { + // Extend prefix sets with targets + let frozen_prefix_sets = ParallelProof::extend_prefix_sets_with_targets( + &Default::default(), + &proof_targets, + ); + + AccountMultiproofInput::Legacy { + targets: proof_targets, + prefix_sets: frozen_prefix_sets, + collect_branch_node_masks: true, + multi_added_removed_keys, + proof_result_sender, + } + } + VersionedMultiProofTargets::V2(proof_targets) => { + AccountMultiproofInput::V2 { targets: proof_targets, proof_result_sender } + } }; + // 
Dispatch account multiproof to worker pool with result sender if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(input) { error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch account multiproof"); return; @@ -561,6 +715,9 @@ pub(super) struct MultiProofTask { /// there are any active workers and force chunking across workers. This is to prevent tasks /// which are very long from hitting a single worker. max_targets_for_chunking: usize, + /// Whether or not V2 proof calculation is enabled. If enabled then [`MultiProofTargetsV2`] + /// will be produced by state updates. + v2_proofs_enabled: bool, } impl MultiProofTask { @@ -592,9 +749,16 @@ impl MultiProofTask { ), metrics, max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING, + v2_proofs_enabled: false, } } + /// Enables V2 proof target generation on state updates. + pub(super) const fn with_v2_proofs_enabled(mut self, v2_proofs_enabled: bool) -> Self { + self.v2_proofs_enabled = v2_proofs_enabled; + self + } + /// Handles request for proof prefetch. /// /// Returns how many multiproof tasks were dispatched for the prefetch request. @@ -602,37 +766,29 @@ impl MultiProofTask { level = "debug", target = "engine::tree::payload_processor::multiproof", skip_all, - fields(accounts = targets.len(), chunks = 0) + fields(accounts = targets.account_targets_len(), chunks = 0) )] - fn on_prefetch_proof(&mut self, mut targets: MultiProofTargets) -> u64 { + fn on_prefetch_proof(&mut self, mut targets: VersionedMultiProofTargets) -> u64 { // Remove already fetched proof targets to avoid redundant work. targets.retain_difference(&self.fetched_proof_targets); - self.fetched_proof_targets.extend_ref(&targets); + extend_multiproof_targets(&mut self.fetched_proof_targets, &targets); - // Make sure all target accounts have an `AddedRemovedKeySet` in the + // For Legacy multiproofs, make sure all target accounts have an `AddedRemovedKeySet` in the // [`MultiAddedRemovedKeys`]. 
Even if there are not any known removed keys for the account, // we still want to optimistically fetch extension children for the leaf addition case. - self.multi_added_removed_keys.touch_accounts(targets.keys().copied()); - - // Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks - let multi_added_removed_keys = Arc::new(MultiAddedRemovedKeys { - account: self.multi_added_removed_keys.account.clone(), - storages: targets - .keys() - .filter_map(|account| { - self.multi_added_removed_keys - .storages - .get(account) - .cloned() - .map(|keys| (*account, keys)) - }) - .collect(), - }); + // V2 multiproofs don't need this. + let multi_added_removed_keys = + if let VersionedMultiProofTargets::Legacy(legacy_targets) = &targets { + self.multi_added_removed_keys.touch_accounts(legacy_targets.keys().copied()); + Some(Arc::new(self.multi_added_removed_keys.clone())) + } else { + None + }; self.metrics.prefetch_proof_targets_accounts_histogram.record(targets.len() as f64); self.metrics .prefetch_proof_targets_storages_histogram - .record(targets.values().map(|slots| slots.len()).sum::() as f64); + .record(targets.storage_count() as f64); let chunking_len = targets.chunking_length(); let available_account_workers = @@ -646,7 +802,7 @@ impl MultiProofTask { self.max_targets_for_chunking, available_account_workers, available_storage_workers, - MultiProofTargets::chunks, + VersionedMultiProofTargets::chunks, |proof_targets| { self.multiproof_manager.dispatch(MultiproofInput { source: None, @@ -654,7 +810,7 @@ impl MultiProofTask { proof_targets, proof_sequence_number: self.proof_sequencer.next_sequence(), state_root_message_sender: self.tx.clone(), - multi_added_removed_keys: Some(multi_added_removed_keys.clone()), + multi_added_removed_keys: multi_added_removed_keys.clone(), }); }, ); @@ -757,6 +913,7 @@ impl MultiProofTask { self.multiproof_manager.proof_worker_handle.available_account_workers(); let available_storage_workers = 
self.multiproof_manager.proof_worker_handle.available_storage_workers(); + let num_chunks = dispatch_with_chunking( not_fetched_state_update, chunking_len, @@ -770,8 +927,9 @@ impl MultiProofTask { &hashed_state_update, &self.fetched_proof_targets, &multi_added_removed_keys, + self.v2_proofs_enabled, ); - spawned_proof_targets.extend_ref(&proof_targets); + extend_multiproof_targets(&mut spawned_proof_targets, &proof_targets); self.multiproof_manager.dispatch(MultiproofInput { source: Some(source), @@ -871,7 +1029,10 @@ impl MultiProofTask { batch_metrics.proofs_processed += 1; if let Some(combined_update) = self.on_proof( sequence_number, - SparseTrieUpdate { state, multiproof: Default::default() }, + SparseTrieUpdate { + state, + multiproof: ProofResult::empty(self.v2_proofs_enabled), + }, ) { let _ = self.to_sparse_trie.send(combined_update); } @@ -898,8 +1059,7 @@ impl MultiProofTask { } let account_targets = merged_targets.len(); - let storage_targets = - merged_targets.values().map(|slots| slots.len()).sum::(); + let storage_targets = merged_targets.storage_count(); batch_metrics.prefetch_proofs_requested += self.on_prefetch_proof(merged_targets); trace!( target: "engine::tree::payload_processor::multiproof", @@ -1003,7 +1163,10 @@ impl MultiProofTask { if let Some(combined_update) = self.on_proof( sequence_number, - SparseTrieUpdate { state, multiproof: Default::default() }, + SparseTrieUpdate { + state, + multiproof: ProofResult::empty(self.v2_proofs_enabled), + }, ) { let _ = self.to_sparse_trie.send(combined_update); } @@ -1106,7 +1269,7 @@ impl MultiProofTask { let update = SparseTrieUpdate { state: proof_result.state, - multiproof: proof_result_data.proof, + multiproof: proof_result_data, }; if let Some(combined_update) = @@ -1196,7 +1359,7 @@ struct MultiproofBatchCtx { /// received. updates_finished_time: Option, /// Reusable buffer for accumulating prefetch targets during batching. 
- accumulated_prefetch_targets: Vec, + accumulated_prefetch_targets: Vec, } impl MultiproofBatchCtx { @@ -1242,40 +1405,77 @@ fn get_proof_targets( state_update: &HashedPostState, fetched_proof_targets: &MultiProofTargets, multi_added_removed_keys: &MultiAddedRemovedKeys, -) -> MultiProofTargets { - let mut targets = MultiProofTargets::default(); + v2_enabled: bool, +) -> VersionedMultiProofTargets { + if v2_enabled { + let mut targets = MultiProofTargetsV2::default(); - // first collect all new accounts (not previously fetched) - for hashed_address in state_update.accounts.keys() { - if !fetched_proof_targets.contains_key(hashed_address) { - targets.insert(*hashed_address, HashSet::default()); + // first collect all new accounts (not previously fetched) + for &hashed_address in state_update.accounts.keys() { + if !fetched_proof_targets.contains_key(&hashed_address) { + targets.account_targets.push(hashed_address.into()); + } } + + // then process storage slots for all accounts in the state update + for (hashed_address, storage) in &state_update.storages { + let fetched = fetched_proof_targets.get(hashed_address); + + // If the storage is wiped, we still need to fetch the account proof. 
+ if storage.wiped && fetched.is_none() { + targets.account_targets.push(Into::::into(*hashed_address)); + continue + } + + let changed_slots = storage + .storage + .keys() + .filter(|slot| !fetched.is_some_and(|f| f.contains(*slot))) + .map(|slot| Into::::into(*slot)) + .collect::>(); + + if !changed_slots.is_empty() { + targets.account_targets.push((*hashed_address).into()); + targets.storage_targets.insert(*hashed_address, changed_slots); + } + } + + VersionedMultiProofTargets::V2(targets) + } else { + let mut targets = MultiProofTargets::default(); + + // first collect all new accounts (not previously fetched) + for hashed_address in state_update.accounts.keys() { + if !fetched_proof_targets.contains_key(hashed_address) { + targets.insert(*hashed_address, HashSet::default()); + } + } + + // then process storage slots for all accounts in the state update + for (hashed_address, storage) in &state_update.storages { + let fetched = fetched_proof_targets.get(hashed_address); + let storage_added_removed_keys = multi_added_removed_keys.get_storage(hashed_address); + let mut changed_slots = storage + .storage + .keys() + .filter(|slot| { + !fetched.is_some_and(|f| f.contains(*slot)) || + storage_added_removed_keys.is_some_and(|k| k.is_removed(slot)) + }) + .peekable(); + + // If the storage is wiped, we still need to fetch the account proof. 
+ if storage.wiped && fetched.is_none() { + targets.entry(*hashed_address).or_default(); + } + + if changed_slots.peek().is_some() { + targets.entry(*hashed_address).or_default().extend(changed_slots); + } + } + + VersionedMultiProofTargets::Legacy(targets) } - - // then process storage slots for all accounts in the state update - for (hashed_address, storage) in &state_update.storages { - let fetched = fetched_proof_targets.get(hashed_address); - let storage_added_removed_keys = multi_added_removed_keys.get_storage(hashed_address); - let mut changed_slots = storage - .storage - .keys() - .filter(|slot| { - !fetched.is_some_and(|f| f.contains(*slot)) || - storage_added_removed_keys.is_some_and(|k| k.is_removed(slot)) - }) - .peekable(); - - // If the storage is wiped, we still need to fetch the account proof. - if storage.wiped && fetched.is_none() { - targets.entry(*hashed_address).or_default(); - } - - if changed_slots.peek().is_some() { - targets.entry(*hashed_address).or_default().extend(changed_slots); - } - } - - targets } /// Dispatches work items as a single unit or in chunks based on target size and worker @@ -1481,12 +1681,24 @@ mod tests { state } + fn unwrap_legacy_targets(targets: VersionedMultiProofTargets) -> MultiProofTargets { + match targets { + VersionedMultiProofTargets::Legacy(targets) => targets, + VersionedMultiProofTargets::V2(_) => panic!("Expected Legacy targets"), + } + } + #[test] fn test_get_proof_targets_new_account_targets() { let state = create_get_proof_targets_state(); let fetched = MultiProofTargets::default(); - let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new()); + let targets = unwrap_legacy_targets(get_proof_targets( + &state, + &fetched, + &MultiAddedRemovedKeys::new(), + false, + )); // should return all accounts as targets since nothing was fetched before assert_eq!(targets.len(), state.accounts.len()); @@ -1500,7 +1712,12 @@ mod tests { let state = create_get_proof_targets_state(); let fetched 
= MultiProofTargets::default(); - let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new()); + let targets = unwrap_legacy_targets(get_proof_targets( + &state, + &fetched, + &MultiAddedRemovedKeys::new(), + false, + )); // verify storage slots are included for accounts with storage for (addr, storage) in &state.storages { @@ -1528,7 +1745,12 @@ mod tests { // mark the account as already fetched fetched.insert(*fetched_addr, HashSet::default()); - let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new()); + let targets = unwrap_legacy_targets(get_proof_targets( + &state, + &fetched, + &MultiAddedRemovedKeys::new(), + false, + )); // should not include the already fetched account since it has no storage updates assert!(!targets.contains_key(fetched_addr)); @@ -1548,7 +1770,12 @@ mod tests { fetched_slots.insert(fetched_slot); fetched.insert(*addr, fetched_slots); - let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new()); + let targets = unwrap_legacy_targets(get_proof_targets( + &state, + &fetched, + &MultiAddedRemovedKeys::new(), + false, + )); // should not include the already fetched storage slot let target_slots = &targets[addr]; @@ -1561,7 +1788,12 @@ mod tests { let state = HashedPostState::default(); let fetched = MultiProofTargets::default(); - let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new()); + let targets = unwrap_legacy_targets(get_proof_targets( + &state, + &fetched, + &MultiAddedRemovedKeys::new(), + false, + )); assert!(targets.is_empty()); } @@ -1588,7 +1820,12 @@ mod tests { fetched_slots.insert(slot1); fetched.insert(addr1, fetched_slots); - let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new()); + let targets = unwrap_legacy_targets(get_proof_targets( + &state, + &fetched, + &MultiAddedRemovedKeys::new(), + false, + )); assert!(targets.contains_key(&addr2)); assert!(!targets[&addr1].contains(&slot1)); @@ -1614,7 +1851,12 
@@ mod tests { assert!(!state.accounts.contains_key(&addr)); assert!(!fetched.contains_key(&addr)); - let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new()); + let targets = unwrap_legacy_targets(get_proof_targets( + &state, + &fetched, + &MultiAddedRemovedKeys::new(), + false, + )); // verify that we still get the storage slots for the unmodified account assert!(targets.contains_key(&addr)); @@ -1656,7 +1898,12 @@ mod tests { removed_state.storages.insert(addr, removed_storage); multi_added_removed_keys.update_with_state(&removed_state); - let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys); + let targets = unwrap_legacy_targets(get_proof_targets( + &state, + &fetched, + &multi_added_removed_keys, + false, + )); // slot1 should be included despite being fetched, because it's marked as removed assert!(targets.contains_key(&addr)); @@ -1683,7 +1930,12 @@ mod tests { storage.storage.insert(slot1, U256::from(100)); state.storages.insert(addr, storage); - let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys); + let targets = unwrap_legacy_targets(get_proof_targets( + &state, + &fetched, + &multi_added_removed_keys, + false, + )); // account should be included because storage is wiped and account wasn't fetched assert!(targets.contains_key(&addr)); @@ -1726,7 +1978,12 @@ mod tests { removed_state.storages.insert(addr, removed_storage); multi_added_removed_keys.update_with_state(&removed_state); - let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys); + let targets = unwrap_legacy_targets(get_proof_targets( + &state, + &fetched, + &multi_added_removed_keys, + false, + )); // only slots in the state update can be included, so slot3 should not appear assert!(!targets.contains_key(&addr)); @@ -1753,9 +2010,12 @@ mod tests { targets3.insert(addr3, HashSet::default()); let tx = task.tx.clone(); - tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap(); - 
tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap(); - tx.send(MultiProofMessage::PrefetchProofs(targets3)).unwrap(); + tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets1))) + .unwrap(); + tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets2))) + .unwrap(); + tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets3))) + .unwrap(); let proofs_requested = if let Ok(MultiProofMessage::PrefetchProofs(targets)) = task.rx.recv() { @@ -1769,11 +2029,12 @@ mod tests { assert_eq!(num_batched, 3); assert_eq!(merged_targets.len(), 3); - assert!(merged_targets.contains_key(&addr1)); - assert!(merged_targets.contains_key(&addr2)); - assert!(merged_targets.contains_key(&addr3)); + let legacy_targets = unwrap_legacy_targets(merged_targets); + assert!(legacy_targets.contains_key(&addr1)); + assert!(legacy_targets.contains_key(&addr2)); + assert!(legacy_targets.contains_key(&addr3)); - task.on_prefetch_proof(merged_targets) + task.on_prefetch_proof(VersionedMultiProofTargets::Legacy(legacy_targets)) } else { panic!("Expected PrefetchProofs message"); }; @@ -1848,11 +2109,16 @@ mod tests { // Queue: [PrefetchProofs1, PrefetchProofs2, StateUpdate1, StateUpdate2, PrefetchProofs3] let tx = task.tx.clone(); - tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap(); - tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap(); + tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets1))) + .unwrap(); + tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets2))) + .unwrap(); tx.send(MultiProofMessage::StateUpdate(source.into(), state_update1)).unwrap(); tx.send(MultiProofMessage::StateUpdate(source.into(), state_update2)).unwrap(); - tx.send(MultiProofMessage::PrefetchProofs(targets3.clone())).unwrap(); + tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy( + targets3.clone(), + ))) + 
.unwrap(); // Step 1: Receive and batch PrefetchProofs (should get targets1 + targets2) let mut pending_msg: Option = None; @@ -1878,9 +2144,10 @@ mod tests { // Should have batched exactly 2 PrefetchProofs (not 3!) assert_eq!(num_batched, 2, "Should batch only until different message type"); assert_eq!(merged_targets.len(), 2); - assert!(merged_targets.contains_key(&addr1)); - assert!(merged_targets.contains_key(&addr2)); - assert!(!merged_targets.contains_key(&addr3), "addr3 should NOT be in first batch"); + let legacy_targets = unwrap_legacy_targets(merged_targets); + assert!(legacy_targets.contains_key(&addr1)); + assert!(legacy_targets.contains_key(&addr2)); + assert!(!legacy_targets.contains_key(&addr3), "addr3 should NOT be in first batch"); } else { panic!("Expected PrefetchProofs message"); } @@ -1905,7 +2172,8 @@ mod tests { match task.rx.try_recv() { Ok(MultiProofMessage::PrefetchProofs(targets)) => { assert_eq!(targets.len(), 1); - assert!(targets.contains_key(&addr3)); + let legacy_targets = unwrap_legacy_targets(targets); + assert!(legacy_targets.contains_key(&addr3)); } _ => panic!("PrefetchProofs3 was lost!"), } @@ -1951,9 +2219,13 @@ mod tests { let source = StateChangeSource::Transaction(99); let tx = task.tx.clone(); - tx.send(MultiProofMessage::PrefetchProofs(prefetch1)).unwrap(); + tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(prefetch1))) + .unwrap(); tx.send(MultiProofMessage::StateUpdate(source.into(), state_update)).unwrap(); - tx.send(MultiProofMessage::PrefetchProofs(prefetch2.clone())).unwrap(); + tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy( + prefetch2.clone(), + ))) + .unwrap(); let mut ctx = MultiproofBatchCtx::new(Instant::now()); let mut batch_metrics = MultiproofBatchMetrics::default(); @@ -1986,7 +2258,8 @@ mod tests { match task.rx.try_recv() { Ok(MultiProofMessage::PrefetchProofs(targets)) => { assert_eq!(targets.len(), 1); - 
assert!(targets.contains_key(&prefetch_addr2)); + let legacy_targets = unwrap_legacy_targets(targets); + assert!(legacy_targets.contains_key(&prefetch_addr2)); } other => panic!("Expected PrefetchProofs2 in channel, got {:?}", other), } diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index 6021098627..1083450549 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -16,7 +16,7 @@ use crate::tree::{ payload_processor::{ bal::{total_slots, BALSlotIter}, executor::WorkloadExecutor, - multiproof::MultiProofMessage, + multiproof::{MultiProofMessage, VersionedMultiProofTargets}, ExecutionCache as PayloadExecutionCache, }, precompile_cache::{CachedPrecompile, PrecompileCacheMap}, @@ -237,7 +237,7 @@ where } /// If configured and the tx returned proof targets, emit the targets the transaction produced - fn send_multi_proof_targets(&self, targets: Option) { + fn send_multi_proof_targets(&self, targets: Option) { if self.is_execution_terminated() { // if execution is already terminated then we dont need to send more proof fetch // messages @@ -479,6 +479,8 @@ where pub(super) terminate_execution: Arc, pub(super) precompile_cache_disabled: bool, pub(super) precompile_cache_map: PrecompileCacheMap>, + /// Whether V2 proof calculation is enabled. + pub(super) v2_proofs_enabled: bool, } impl PrewarmContext @@ -487,10 +489,12 @@ where P: BlockReader + StateProviderFactory + StateReader + Clone + 'static, Evm: ConfigureEvm + 'static, { - /// Splits this context into an evm, an evm config, metrics, and the atomic bool for terminating - /// execution. + /// Splits this context into an evm, an evm config, metrics, the atomic bool for terminating + /// execution, and whether V2 proofs are enabled. 
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)] - fn evm_for_ctx(self) -> Option<(EvmFor, PrewarmMetrics, Arc)> { + fn evm_for_ctx( + self, + ) -> Option<(EvmFor, PrewarmMetrics, Arc, bool)> { let Self { env, evm_config, @@ -500,6 +504,7 @@ where terminate_execution, precompile_cache_disabled, precompile_cache_map, + v2_proofs_enabled, } = self; let mut state_provider = match provider.build() { @@ -549,7 +554,7 @@ where }); } - Some((evm, metrics, terminate_execution)) + Some((evm, metrics, terminate_execution, v2_proofs_enabled)) } /// Accepts an [`mpsc::Receiver`] of transactions and a handle to prewarm task. Executes @@ -570,7 +575,10 @@ where ) where Tx: ExecutableTxFor, { - let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return }; + let Some((mut evm, metrics, terminate_execution, v2_proofs_enabled)) = self.evm_for_ctx() + else { + return + }; while let Ok(IndexedTransaction { index, tx }) = { let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx") @@ -633,7 +641,8 @@ where let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash()) .entered(); - let (targets, storage_targets) = multiproof_targets_from_state(res.state); + let (targets, storage_targets) = + multiproof_targets_from_state(res.state, v2_proofs_enabled); metrics.prefetch_storage_targets.record(storage_targets as f64); let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) }); drop(_enter); @@ -778,9 +787,22 @@ where } } -/// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the +/// Returns a set of [`VersionedMultiProofTargets`] and the total amount of storage targets, based +/// on the given state. 
+fn multiproof_targets_from_state( + state: EvmState, + v2_enabled: bool, +) -> (VersionedMultiProofTargets, usize) { + if v2_enabled { + multiproof_targets_v2_from_state(state) + } else { + multiproof_targets_legacy_from_state(state) + } +} + +/// Returns legacy [`MultiProofTargets`] and the total amount of storage targets, based on the /// given state. -fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) { +fn multiproof_targets_legacy_from_state(state: EvmState) -> (VersionedMultiProofTargets, usize) { let mut targets = MultiProofTargets::with_capacity(state.len()); let mut storage_targets = 0; for (addr, account) in state { @@ -810,7 +832,50 @@ fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) targets.insert(keccak256(addr), storage_set); } - (targets, storage_targets) + (VersionedMultiProofTargets::Legacy(targets), storage_targets) +} + +/// Returns V2 [`reth_trie_parallel::targets_v2::MultiProofTargetsV2`] and the total amount of +/// storage targets, based on the given state. 
+fn multiproof_targets_v2_from_state(state: EvmState) -> (VersionedMultiProofTargets, usize) { + use reth_trie::proof_v2; + use reth_trie_parallel::targets_v2::MultiProofTargetsV2; + + let mut targets = MultiProofTargetsV2::default(); + let mut storage_target_count = 0; + for (addr, account) in state { + // if the account was not touched, or if the account was selfdestructed, do not + // fetch proofs for it + // + // Since selfdestruct can only happen in the same transaction, we can skip + // prefetching proofs for selfdestructed accounts + // + // See: https://eips.ethereum.org/EIPS/eip-6780 + if !account.is_touched() || account.is_selfdestructed() { + continue + } + + let hashed_address = keccak256(addr); + targets.account_targets.push(hashed_address.into()); + + let mut storage_slots = Vec::with_capacity(account.storage.len()); + for (key, slot) in account.storage { + // do nothing if unchanged + if !slot.is_changed() { + continue + } + + let hashed_slot = keccak256(B256::new(key.to_be_bytes())); + storage_slots.push(proof_v2::Target::from(hashed_slot)); + } + + storage_target_count += storage_slots.len(); + if !storage_slots.is_empty() { + targets.storage_targets.insert(hashed_address, storage_slots); + } + } + + (VersionedMultiProofTargets::V2(targets), storage_target_count) } /// The events the pre-warm task can handle. 
@@ -835,7 +900,7 @@ pub(super) enum PrewarmTaskEvent { /// The outcome of a pre-warm task Outcome { /// The prepared proof targets based on the evm state outcome - proof_targets: Option, + proof_targets: Option, }, /// Finished executing all transactions FinishedTxExecution { diff --git a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs index b4c150cfa9..052fd8672b 100644 --- a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs +++ b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs @@ -4,7 +4,7 @@ use crate::tree::payload_processor::multiproof::{MultiProofTaskMetrics, SparseTr use alloy_primitives::B256; use rayon::iter::{ParallelBridge, ParallelIterator}; use reth_trie::{updates::TrieUpdates, Nibbles}; -use reth_trie_parallel::root::ParallelStateRootError; +use reth_trie_parallel::{proof_task::ProofResult, root::ParallelStateRootError}; use reth_trie_sparse::{ errors::{SparseStateTrieResult, SparseTrieErrorKind}, provider::{TrieNodeProvider, TrieNodeProviderFactory}, @@ -97,8 +97,8 @@ where debug!( target: "engine::root", num_updates, - account_proofs = update.multiproof.account_subtree.len(), - storage_proofs = update.multiproof.storages.len(), + account_proofs = update.multiproof.account_proofs_len(), + storage_proofs = update.multiproof.storage_proofs_len(), "Updating sparse trie" ); @@ -157,7 +157,14 @@ where let started_at = Instant::now(); // Reveal new accounts and storage slots. 
- trie.reveal_decoded_multiproof(multiproof)?; + match multiproof { + ProofResult::Legacy(decoded, _) => { + trie.reveal_decoded_multiproof(decoded)?; + } + ProofResult::V2(decoded_v2) => { + trie.reveal_decoded_multiproof_v2(decoded_v2)?; + } + } let reveal_multiproof_elapsed = started_at.elapsed(); trace!( target: "engine::root::sparse", diff --git a/crates/trie/parallel/Cargo.toml b/crates/trie/parallel/Cargo.toml index d64f2dfb51..812dd2b85b 100644 --- a/crates/trie/parallel/Cargo.toml +++ b/crates/trie/parallel/Cargo.toml @@ -13,8 +13,8 @@ workspace = true [dependencies] # reth -reth-execution-errors.workspace = true reth-primitives-traits.workspace = true +reth-execution-errors.workspace = true reth-provider.workspace = true reth-storage-errors.workspace = true reth-trie-common.workspace = true diff --git a/crates/trie/parallel/src/proof.rs b/crates/trie/parallel/src/proof.rs index 7bf936bad3..d42534c271 100644 --- a/crates/trie/parallel/src/proof.rs +++ b/crates/trie/parallel/src/proof.rs @@ -197,7 +197,7 @@ impl ParallelProof { let (result_tx, result_rx) = crossbeam_unbounded(); let account_multiproof_start_time = Instant::now(); - let input = AccountMultiproofInput { + let input = AccountMultiproofInput::Legacy { targets, prefix_sets, collect_branch_node_masks: self.collect_branch_node_masks, @@ -208,7 +208,6 @@ impl ParallelProof { HashedPostState::default(), account_multiproof_start_time, ), - v2_proofs_enabled: self.v2_proofs_enabled, }; self.proof_worker_handle @@ -222,7 +221,9 @@ impl ParallelProof { ) })?; - let ProofResult { proof: multiproof, stats } = proof_result_msg.result?; + let ProofResult::Legacy(multiproof, stats) = proof_result_msg.result? 
else { + panic!("AccountMultiproofInput::Legacy was submitted, expected legacy result") + }; #[cfg(feature = "metrics")] self.metrics.record(stats); @@ -235,7 +236,7 @@ impl ParallelProof { leaves_added = stats.leaves_added(), missed_leaves = stats.missed_leaves(), precomputed_storage_roots = stats.precomputed_storage_roots(), - "Calculated decoded proof" + "Calculated decoded proof", ); Ok(multiproof) diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index eb6f892346..076931f48c 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -32,6 +32,8 @@ use crate::{ root::ParallelStateRootError, stats::{ParallelTrieStats, ParallelTrieTracker}, + targets_v2::MultiProofTargetsV2, + value_encoder::AsyncAccountValueEncoder, StorageRootTargets, }; use alloy_primitives::{ @@ -49,11 +51,11 @@ use reth_trie::{ node_iter::{TrieElement, TrieNodeIter}, prefix_set::TriePrefixSets, proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider, StorageProof}, - proof_v2::{self, StorageProofCalculator}, + proof_v2, trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieCursorMetricsCache}, walker::TrieWalker, - DecodedMultiProof, DecodedStorageMultiProof, HashBuilder, HashedPostState, MultiProofTargets, - Nibbles, ProofTrieNode, TRIE_ACCOUNT_RLP_MAX_SIZE, + DecodedMultiProof, DecodedMultiProofV2, DecodedStorageMultiProof, HashBuilder, HashedPostState, + MultiProofTargets, Nibbles, ProofTrieNode, TRIE_ACCOUNT_RLP_MAX_SIZE, }; use reth_trie_common::{ added_removed_keys::MultiAddedRemovedKeys, @@ -220,7 +222,8 @@ impl ProofWorkerHandle { metrics, #[cfg(feature = "metrics")] cursor_metrics, - ); + ) + .with_v2_proofs(v2_proofs_enabled); if let Err(error) = worker.run() { error!( target: "trie::proof_task", @@ -333,16 +336,12 @@ impl ProofWorkerHandle { ProviderError::other(std::io::Error::other("account workers unavailable")); if let AccountWorkerJob::AccountMultiproof { input } = err.0 { - 
let AccountMultiproofInput { - proof_result_sender: - ProofResultContext { - sender: result_tx, - sequence_number: seq, - state, - start_time: start, - }, - .. - } = *input; + let ProofResultContext { + sender: result_tx, + sequence_number: seq, + state, + start_time: start, + } = input.into_proof_result_sender(); let _ = result_tx.send(ProofResultMessage { sequence_number: seq, @@ -605,11 +604,65 @@ impl TrieNodeProvider for ProofTaskTrieNodeProvider { /// Result of a multiproof calculation. #[derive(Debug)] -pub struct ProofResult { - /// The account multiproof - pub proof: DecodedMultiProof, - /// Statistics collected during proof computation - pub stats: ParallelTrieStats, +pub enum ProofResult { + /// Legacy multiproof calculation result. + Legacy(DecodedMultiProof, ParallelTrieStats), + /// V2 multiproof calculation result. + V2(DecodedMultiProofV2), +} + +impl ProofResult { + /// Creates an empty [`ProofResult`] of the appropriate variant based on `v2_enabled`. + /// + /// Use this when constructing empty proofs (e.g., for state updates where all targets + /// were already fetched) to ensure consistency with the proof version being used. + pub fn empty(v2_enabled: bool) -> Self { + if v2_enabled { + Self::V2(DecodedMultiProofV2::default()) + } else { + let stats = ParallelTrieTracker::default().finish(); + Self::Legacy(DecodedMultiProof::default(), stats) + } + } + + /// Returns true if the result contains no proofs + pub fn is_empty(&self) -> bool { + match self { + Self::Legacy(proof, _) => proof.is_empty(), + Self::V2(proof) => proof.is_empty(), + } + } + + /// Extends the receiver with the value of the given results. + /// + /// # Panics + /// + /// This method panics if the two [`ProofResult`]s are not the same variant. 
+ pub fn extend(&mut self, other: Self) { + match (self, other) { + (Self::Legacy(proof, _), Self::Legacy(other, _)) => proof.extend(other), + (Self::V2(proof), Self::V2(other)) => proof.extend(other), + _ => panic!("mismatched ProofResults, cannot extend one with the other"), + } + } + + /// Returns the number of account proofs. + pub fn account_proofs_len(&self) -> usize { + match self { + Self::Legacy(proof, _) => proof.account_subtree.len(), + Self::V2(proof) => proof.account_proofs.len(), + } + } + + /// Returns the total number of storage proofs + pub fn storage_proofs_len(&self) -> usize { + match self { + Self::Legacy(proof, _) => { + proof.storages.values().map(|p| p.subtree.len()).sum::() + } + Self::V2(proof) => proof.storage_proofs.values().map(|p| p.len()).sum::(), + } + } } /// Channel used by worker threads to deliver `ProofResultMessage` items back to @@ -889,7 +942,7 @@ where &self, proof_tx: &ProofTaskTx, v2_calculator: Option< - &mut StorageProofCalculator< + &mut proof_v2::StorageProofCalculator< ::StorageTrieCursor<'_>, ::StorageCursor<'_>, >, @@ -1053,6 +1106,8 @@ struct AccountProofWorker { /// Cursor metrics for this worker #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics, + /// Set to true if V2 proofs are enabled. + v2_enabled: bool, } impl AccountProofWorker @@ -1082,9 +1137,16 @@ where metrics, #[cfg(feature = "metrics")] cursor_metrics, + v2_enabled: false, } } + /// Changes whether or not V2 proofs are enabled. + const fn with_v2_proofs(mut self, v2_enabled: bool) -> Self { + self.v2_enabled = v2_enabled; + self + } + /// Runs the worker loop, processing jobs until the channel closes. 
/// /// # Lifecycle @@ -1117,6 +1179,17 @@ where let mut account_nodes_processed = 0u64; let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default(); + let mut v2_calculator = if self.v2_enabled { + let trie_cursor = proof_tx.provider.account_trie_cursor()?; + let hashed_cursor = proof_tx.provider.hashed_account_cursor()?; + Some(proof_v2::ProofCalculator::<_, _, AsyncAccountValueEncoder>::new( + trie_cursor, + hashed_cursor, + )) + } else { + None + }; + // Count this worker as available only after successful initialization. self.available_workers.fetch_add(1, Ordering::Relaxed); @@ -1128,6 +1201,7 @@ where AccountWorkerJob::AccountMultiproof { input } => { self.process_account_multiproof( &proof_tx, + v2_calculator.as_mut(), *input, &mut account_proofs_processed, &mut cursor_metrics_cache, @@ -1166,26 +1240,18 @@ where Ok(()) } - /// Processes an account multiproof request. - fn process_account_multiproof( + fn compute_legacy_account_multiproof( &self, proof_tx: &ProofTaskTx, - input: AccountMultiproofInput, - account_proofs_processed: &mut u64, - cursor_metrics_cache: &mut ProofTaskCursorMetricsCache, - ) where + targets: MultiProofTargets, + mut prefix_sets: TriePrefixSets, + collect_branch_node_masks: bool, + multi_added_removed_keys: Option>, + proof_cursor_metrics: &mut ProofTaskCursorMetricsCache, + ) -> Result + where Provider: TrieCursorFactory + HashedCursorFactory, { - let AccountMultiproofInput { - targets, - mut prefix_sets, - collect_branch_node_masks, - multi_added_removed_keys, - proof_result_sender: - ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start }, - v2_proofs_enabled, - } = input; - let span = debug_span!( target: "trie::proof_task", "Account multiproof calculation", @@ -1199,8 +1265,6 @@ where "Processing account multiproof" ); - let proof_start = Instant::now(); - let mut tracker = ParallelTrieTracker::default(); let mut storage_prefix_sets = std::mem::take(&mut 
prefix_sets.storage_prefix_sets); @@ -1210,29 +1274,14 @@ where tracker.set_precomputed_storage_roots(storage_root_targets_len as u64); - let storage_proof_receivers = match dispatch_storage_proofs( + let storage_proof_receivers = dispatch_storage_proofs( &self.storage_work_tx, &targets, &mut storage_prefix_sets, collect_branch_node_masks, multi_added_removed_keys.as_ref(), - v2_proofs_enabled, - ) { - Ok(receivers) => receivers, - Err(error) => { - // Send error through result channel - error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}"); - let _ = result_tx.send(ProofResultMessage { - sequence_number: seq, - result: Err(error), - elapsed: start.elapsed(), - state, - }); - return; - } - }; + )?; - // Use the missed leaves cache passed from the multiproof manager let account_prefix_set = std::mem::take(&mut prefix_sets.account_prefix_set); let ctx = AccountMultiproofParams { @@ -1244,17 +1293,115 @@ where cached_storage_roots: &self.cached_storage_roots, }; - let result = - build_account_multiproof_with_storage_roots(&proof_tx.provider, ctx, &mut tracker); - - let now = Instant::now(); - let proof_elapsed = now.duration_since(proof_start); - let total_elapsed = now.duration_since(start); - let proof_cursor_metrics = tracker.cursor_metrics; - proof_cursor_metrics.record_spans(); + let result = build_account_multiproof_with_storage_roots( + &proof_tx.provider, + ctx, + &mut tracker, + proof_cursor_metrics, + ); let stats = tracker.finish(); - let result = result.map(|proof| ProofResult { proof, stats }); + result.map(|proof| ProofResult::Legacy(proof, stats)) + } + + fn compute_v2_account_multiproof( + &self, + v2_calculator: &mut proof_v2::ProofCalculator< + ::AccountTrieCursor<'_>, + ::AccountCursor<'_>, + AsyncAccountValueEncoder, + >, + targets: MultiProofTargetsV2, + ) -> Result + where + Provider: TrieCursorFactory + HashedCursorFactory, + { + let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets; + + let span = 
debug_span!( + target: "trie::proof_task", + "Account V2 multiproof calculation", + account_targets = account_targets.len(), + storage_targets = storage_targets.values().map(|t| t.len()).sum::(), + worker_id = self.worker_id, + ); + let _span_guard = span.enter(); + + trace!(target: "trie::proof_task", "Processing V2 account multiproof"); + + let storage_proof_receivers = + dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?; + + let mut value_encoder = AsyncAccountValueEncoder::new( + self.storage_work_tx.clone(), + storage_proof_receivers, + self.cached_storage_roots.clone(), + ); + + let proof = DecodedMultiProofV2 { + account_proofs: v2_calculator.proof(&mut value_encoder, &mut account_targets)?, + storage_proofs: value_encoder.into_storage_proofs()?, + }; + + Ok(ProofResult::V2(proof)) + } + + /// Processes an account multiproof request. + fn process_account_multiproof( + &self, + proof_tx: &ProofTaskTx, + v2_calculator: Option< + &mut proof_v2::ProofCalculator< + ::AccountTrieCursor<'_>, + ::AccountCursor<'_>, + AsyncAccountValueEncoder, + >, + >, + input: AccountMultiproofInput, + account_proofs_processed: &mut u64, + cursor_metrics_cache: &mut ProofTaskCursorMetricsCache, + ) where + Provider: TrieCursorFactory + HashedCursorFactory, + { + let mut proof_cursor_metrics = ProofTaskCursorMetricsCache::default(); + let proof_start = Instant::now(); + + let (proof_result_sender, result) = match input { + AccountMultiproofInput::Legacy { + targets, + prefix_sets, + collect_branch_node_masks, + multi_added_removed_keys, + proof_result_sender, + } => ( + proof_result_sender, + self.compute_legacy_account_multiproof( + proof_tx, + targets, + prefix_sets, + collect_branch_node_masks, + multi_added_removed_keys, + &mut proof_cursor_metrics, + ), + ), + AccountMultiproofInput::V2 { targets, proof_result_sender } => ( + proof_result_sender, + self.compute_v2_account_multiproof::( + v2_calculator.expect("v2 calculator provided"), + 
targets, + ), + ), + }; + + let ProofResultContext { + sender: result_tx, + sequence_number: seq, + state, + start_time: start, + } = proof_result_sender; + + let proof_elapsed = proof_start.elapsed(); + let total_elapsed = start.elapsed(); *account_proofs_processed += 1; // Send result to MultiProofTask @@ -1275,6 +1422,8 @@ where ); } + proof_cursor_metrics.record_spans(); + trace!( target: "trie::proof_task", proof_time_us = proof_elapsed.as_micros(), @@ -1355,6 +1504,7 @@ fn build_account_multiproof_with_storage_roots

( provider: &P, ctx: AccountMultiproofParams<'_>, tracker: &mut ParallelTrieTracker, + proof_cursor_metrics: &mut ProofTaskCursorMetricsCache, ) -> Result where P: TrieCursorFactory + HashedCursorFactory, @@ -1362,15 +1512,12 @@ where let accounts_added_removed_keys = ctx.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts()); - // Create local metrics caches for account cursors. We can't directly use the metrics caches in - // the tracker due to the call to `inc_missed_leaves` which occurs on it. - let mut account_trie_cursor_metrics = TrieCursorMetricsCache::default(); - let mut account_hashed_cursor_metrics = HashedCursorMetricsCache::default(); - // Wrap account trie cursor with instrumented cursor let account_trie_cursor = provider.account_trie_cursor().map_err(ProviderError::Database)?; - let account_trie_cursor = - InstrumentedTrieCursor::new(account_trie_cursor, &mut account_trie_cursor_metrics); + let account_trie_cursor = InstrumentedTrieCursor::new( + account_trie_cursor, + &mut proof_cursor_metrics.account_trie_cursor, + ); // Create the walker. 
let walker = TrieWalker::<_>::state_trie(account_trie_cursor, ctx.prefix_set) @@ -1397,8 +1544,10 @@ where // Wrap account hashed cursor with instrumented cursor let account_hashed_cursor = provider.hashed_account_cursor().map_err(ProviderError::Database)?; - let account_hashed_cursor = - InstrumentedHashedCursor::new(account_hashed_cursor, &mut account_hashed_cursor_metrics); + let account_hashed_cursor = InstrumentedHashedCursor::new( + account_hashed_cursor, + &mut proof_cursor_metrics.account_hashed_cursor, + ); let mut account_node_iter = TrieNodeIter::state_trie(walker, account_hashed_cursor); @@ -1462,10 +1611,10 @@ where StorageProof::new_hashed(provider, provider, hashed_address) .with_prefix_set_mut(Default::default()) .with_trie_cursor_metrics( - &mut tracker.cursor_metrics.storage_trie_cursor, + &mut proof_cursor_metrics.storage_trie_cursor, ) .with_hashed_cursor_metrics( - &mut tracker.cursor_metrics.storage_hashed_cursor, + &mut proof_cursor_metrics.storage_hashed_cursor, ) .storage_multiproof( ctx.targets @@ -1516,21 +1665,6 @@ where BranchNodeMasksMap::default() }; - // Extend tracker with accumulated metrics from account cursors - tracker.cursor_metrics.account_trie_cursor.extend(&account_trie_cursor_metrics); - tracker.cursor_metrics.account_hashed_cursor.extend(&account_hashed_cursor_metrics); - - // Consume remaining storage proof receivers for accounts not encountered during trie walk. - // Done last to allow storage workers more time to complete while we finalized the account trie. 
- for (hashed_address, receiver) in storage_proof_receivers {
- if let Ok(proof_msg) = receiver.recv() {
- let proof_result = proof_msg.result?;
- let proof = Into::>::into(proof_result)
- .expect("Partial proofs are not yet supported");
- collected_decoded_storages.insert(hashed_address, proof);
- }
- }
-
 Ok(DecodedMultiProof {
 account_subtree: decoded_account_subtree,
 branch_node_masks,
@@ -1550,7 +1684,6 @@ fn dispatch_storage_proofs(
 storage_prefix_sets: &mut B256Map,
 with_branch_node_masks: bool,
 multi_added_removed_keys: Option<&Arc>,
- use_v2_proofs: bool,
 ) -> Result>, ParallelStateRootError> {
 let mut storage_proof_receivers =
 B256Map::with_capacity_and_hasher(targets.len(), Default::default());
@@ -1564,20 +1697,14 @@ fn dispatch_storage_proofs(
 let (result_tx, result_rx) = crossbeam_channel::unbounded();
 
- // Create computation input based on V2 flag
+ // Build the legacy computation input; V2 proofs are dispatched via `dispatch_v2_storage_proofs`
- let input = if use_v2_proofs {
- // Convert target slots to V2 targets
- let v2_targets = target_slots.iter().copied().map(Into::into).collect();
- StorageProofInput::new(*hashed_address, v2_targets)
- } else {
- let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default();
- StorageProofInput::legacy(
- *hashed_address,
- prefix_set,
- target_slots.clone(),
- with_branch_node_masks,
- multi_added_removed_keys.cloned(),
- )
- };
+ let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default();
+ let input = StorageProofInput::legacy(
+ *hashed_address,
+ prefix_set,
+ target_slots.clone(),
+ with_branch_node_masks,
+ multi_added_removed_keys.cloned(),
+ );
 
 // Always dispatch a storage proof so we obtain the storage root even when no slots are
 // requested.
@@ -1595,6 +1722,64 @@ fn dispatch_storage_proofs(
 Ok(storage_proof_receivers)
 }
 
+
+/// Queues V2 storage proofs for all accounts in the targets and returns receivers.
+/// +/// This function queues all storage proof tasks to the worker pool but returns immediately +/// with receivers, allowing the account trie walk to proceed in parallel with storage proof +/// computation. This enables interleaved parallelism for better performance. +/// +/// Propagates errors up if queuing fails. Receivers must be consumed by the caller. +fn dispatch_v2_storage_proofs( + storage_work_tx: &CrossbeamSender, + account_targets: &Vec, + storage_targets: B256Map>, +) -> Result>, ParallelStateRootError> { + let mut storage_proof_receivers = + B256Map::with_capacity_and_hasher(account_targets.len(), Default::default()); + + // Dispatch all proofs for targeted storage slots + for (hashed_address, targets) in storage_targets { + // Create channel for receiving StorageProofResultMessage + let (result_tx, result_rx) = crossbeam_channel::unbounded(); + let input = StorageProofInput::new(hashed_address, targets); + + storage_work_tx + .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx }) + .map_err(|_| { + ParallelStateRootError::Other(format!( + "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable", + )) + })?; + + storage_proof_receivers.insert(hashed_address, result_rx); + } + + // If there are any targeted accounts which did not have storage targets then we generate a + // single proof target for them so that we get their root. 
+ for target in account_targets {
+ let hashed_address = target.key();
+ if storage_proof_receivers.contains_key(&hashed_address) {
+ continue
+ }
+
+ let (result_tx, result_rx) = crossbeam_channel::unbounded();
+ let input = StorageProofInput::new(hashed_address, vec![proof_v2::Target::new(B256::ZERO)]);
+
+ storage_work_tx
+ .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
+ .map_err(|_| {
+ ParallelStateRootError::Other(format!(
+ "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
+ ))
+ })?;
+
+ storage_proof_receivers.insert(hashed_address, result_rx);
+ }
+
+ Ok(storage_proof_receivers)
+}
+
 /// Input parameters for storage proof computation.
 #[derive(Debug)]
 pub enum StorageProofInput {
@@ -1639,7 +1824,7 @@ impl StorageProofInput {
 }
 }
 
 /// Creates a new [`StorageProofInput`] with the given hashed address and target slots.
 pub const fn new(hashed_address: B256, targets: Vec) -> Self {
 Self::V2 { hashed_address, targets }
 }
@@ -1655,20 +1840,39 @@ impl StorageProofInput {
 }
 
 /// Input parameters for account multiproof computation.
-#[derive(Debug, Clone)]
-pub struct AccountMultiproofInput {
- /// The targets for which to compute the multiproof.
- pub targets: MultiProofTargets,
- /// The prefix sets for the proof calculation.
- pub prefix_sets: TriePrefixSets,
- /// Whether or not to collect branch node masks.
- pub collect_branch_node_masks: bool,
- /// Provided by the user to give the necessary context to retain extra proofs.
- pub multi_added_removed_keys: Option>,
- /// Context for sending the proof result.
- pub proof_result_sender: ProofResultContext,
- /// Whether to use V2 storage proofs.
- pub v2_proofs_enabled: bool,
+#[derive(Debug)]
+pub enum AccountMultiproofInput {
+ /// Legacy account multiproof variant
+ Legacy {
+ /// The targets for which to compute the multiproof.
+ targets: MultiProofTargets, + /// The prefix sets for the proof calculation. + prefix_sets: TriePrefixSets, + /// Whether or not to collect branch node masks. + collect_branch_node_masks: bool, + /// Provided by the user to give the necessary context to retain extra proofs. + multi_added_removed_keys: Option>, + /// Context for sending the proof result. + proof_result_sender: ProofResultContext, + }, + /// V2 account multiproof variant + V2 { + /// The targets for which to compute the multiproof. + targets: MultiProofTargetsV2, + /// Context for sending the proof result. + proof_result_sender: ProofResultContext, + }, +} + +impl AccountMultiproofInput { + /// Returns the [`ProofResultContext`] for this input, consuming the input. + fn into_proof_result_sender(self) -> ProofResultContext { + match self { + Self::Legacy { proof_result_sender, .. } | Self::V2 { proof_result_sender, .. } => { + proof_result_sender + } + } + } } /// Parameters for building an account multiproof with pre-computed storage roots. 
diff --git a/crates/trie/parallel/src/stats.rs b/crates/trie/parallel/src/stats.rs index 088b95c970..de5b0a628e 100644 --- a/crates/trie/parallel/src/stats.rs +++ b/crates/trie/parallel/src/stats.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "metrics")] -use crate::proof_task_metrics::ProofTaskCursorMetricsCache; use derive_more::Deref; use reth_trie::stats::{TrieStats, TrieTracker}; @@ -36,9 +34,6 @@ pub struct ParallelTrieTracker { trie: TrieTracker, precomputed_storage_roots: u64, missed_leaves: u64, - #[cfg(feature = "metrics")] - /// Local tracking of cursor-related metrics - pub cursor_metrics: ProofTaskCursorMetricsCache, } impl ParallelTrieTracker { diff --git a/crates/trie/parallel/src/value_encoder.rs b/crates/trie/parallel/src/value_encoder.rs index 13c611922d..7b08d3e1b5 100644 --- a/crates/trie/parallel/src/value_encoder.rs +++ b/crates/trie/parallel/src/value_encoder.rs @@ -86,7 +86,6 @@ pub(crate) struct AsyncAccountValueEncoder { impl AsyncAccountValueEncoder { /// Initializes a [`Self`] using a `ProofWorkerHandle` which will be used to calculate storage /// roots asynchronously. - #[expect(dead_code)] pub(crate) fn new( storage_work_tx: CrossbeamSender, dispatched: B256Map>, @@ -106,7 +105,6 @@ impl AsyncAccountValueEncoder { /// /// This method panics if any deferred encoders produced by [`Self::deferred_encoder`] have not /// been dropped. - #[expect(dead_code)] pub(crate) fn into_storage_proofs( self, ) -> Result>, StateProofError> {