diff --git a/crates/engine/primitives/src/config.rs b/crates/engine/primitives/src/config.rs index 0b72e1d624..2870d3dccc 100644 --- a/crates/engine/primitives/src/config.rs +++ b/crates/engine/primitives/src/config.rs @@ -34,11 +34,6 @@ fn default_account_worker_count() -> usize { /// The size of proof targets chunk to spawn in one multiproof calculation. pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 60; -/// The size of proof targets chunk to spawn in one multiproof calculation when V2 proofs are -/// enabled. This is 4x the default chunk size to take advantage of more efficient V2 proof -/// computation. -pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2: usize = DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE * 4; - /// Default number of reserved CPU cores for non-reth processes. /// /// This will be deducted from the thread count of main reth global threadpool. @@ -272,17 +267,6 @@ impl TreeConfig { self.multiproof_chunk_size } - /// Return the multiproof task chunk size, using the V2 default if V2 proofs are enabled - /// and the chunk size is at the default value. - pub const fn effective_multiproof_chunk_size(&self) -> usize { - if self.enable_proof_v2 && self.multiproof_chunk_size == DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE - { - DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2 - } else { - self.multiproof_chunk_size - } - } - /// Return the number of reserved CPU cores for non-reth processes pub const fn reserved_cpu_cores(&self) -> usize { self.reserved_cpu_cores diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 6d61578f63..f606fb1091 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -247,9 +247,6 @@ where let (to_sparse_trie, sparse_trie_rx) = channel(); let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded(); - // Extract V2 proofs flag early so we can pass it to prewarm - let v2_proofs_enabled = config.enable_proof_v2(); - // Handle BAL-based optimization if available let prewarm_handle = if let Some(bal) = bal { // When BAL is present, use BAL prewarming and send BAL to multiproof @@ -266,7 +263,6 @@ where provider_builder.clone(), None, // Don't send proof targets when BAL is present Some(bal), - v2_proofs_enabled, ) } else { // Normal path: spawn with transaction prewarming @@ -277,7 +273,6 @@ where provider_builder.clone(), Some(to_multi_proof.clone()), None, - v2_proofs_enabled, ) }; @@ -285,6 +280,7 @@ where let task_ctx = ProofTaskCtx::new(multiproof_provider_factory); let storage_worker_count = config.storage_worker_count(); let account_worker_count = config.account_worker_count(); + let v2_proofs_enabled = config.enable_proof_v2(); let proof_handle = ProofWorkerHandle::new( self.executor.handle().clone(), task_ctx, @@ -296,13 +292,10 @@ where let multi_proof_task = MultiProofTask::new( proof_handle.clone(), to_sparse_trie, - config - .multiproof_chunking_enabled() - .then_some(config.effective_multiproof_chunk_size()), + config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()), to_multi_proof.clone(), from_multi_proof, - ) - .with_v2_proofs_enabled(v2_proofs_enabled); + ); // spawn multi-proof task let parent_span = span.clone(); @@ -351,9 +344,8 @@ where P: BlockReader + StateProviderFactory + StateReader + Clone + 'static, { let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions); - // This path doesn't use multiproof, so V2 proofs flag doesn't matter let prewarm_handle = - 
self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal, false); + self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal); PayloadHandle { to_multi_proof: None, prewarm_handle, @@ -420,7 +412,6 @@ where } /// Spawn prewarming optionally wired to the multiproof task for target updates. - #[expect(clippy::too_many_arguments)] fn spawn_caching_with
<P>
( &self, env: ExecutionEnv, @@ -429,7 +420,6 @@ where provider_builder: StateProviderBuilder, to_multi_proof: Option>, bal: Option>, - v2_proofs_enabled: bool, ) -> CacheTaskHandle where P: BlockReader + StateProviderFactory + StateReader + Clone + 'static, @@ -452,7 +442,6 @@ where terminate_execution: Arc::new(AtomicBool::new(false)), precompile_cache_disabled: self.precompile_cache_disabled, precompile_cache_map: self.precompile_cache_map.clone(), - v2_proofs_enabled, }; let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new( diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 823c3e54e9..b5f1272b67 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -11,18 +11,14 @@ use reth_metrics::Metrics; use reth_provider::AccountReader; use reth_revm::state::EvmState; use reth_trie::{ - added_removed_keys::MultiAddedRemovedKeys, proof_v2, HashedPostState, HashedStorage, + added_removed_keys::MultiAddedRemovedKeys, DecodedMultiProof, HashedPostState, HashedStorage, MultiProofTargets, }; -#[cfg(test)] -use reth_trie_parallel::stats::ParallelTrieTracker; use reth_trie_parallel::{ proof::ParallelProof, proof_task::{ - AccountMultiproofInput, ProofResult, ProofResultContext, ProofResultMessage, - ProofWorkerHandle, + AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle, }, - targets_v2::{ChunkedMultiProofTargetsV2, MultiProofTargetsV2}, }; use revm_primitives::map::{hash_map, B256Map}; use std::{collections::BTreeMap, sync::Arc, time::Instant}; @@ -67,12 +63,12 @@ const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300; /// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the /// state. -#[derive(Debug)] +#[derive(Default, Debug)] pub struct SparseTrieUpdate { /// The state update that was used to calculate the proof pub(crate) state: HashedPostState, /// The calculated multiproof - pub(crate) multiproof: ProofResult, + pub(crate) multiproof: DecodedMultiProof, } impl SparseTrieUpdate { @@ -84,11 +80,7 @@ impl SparseTrieUpdate { /// Construct update from multiproof. #[cfg(test)] pub(super) fn from_multiproof(multiproof: reth_trie::MultiProof) -> alloy_rlp::Result { - let stats = ParallelTrieTracker::default().finish(); - Ok(Self { - state: HashedPostState::default(), - multiproof: ProofResult::Legacy(multiproof.try_into()?, stats), - }) + Ok(Self { multiproof: multiproof.try_into()?, ..Default::default() }) } /// Extend update with contents of the other. @@ -102,7 +94,7 @@ impl SparseTrieUpdate { #[derive(Debug)] pub(super) enum MultiProofMessage { /// Prefetch proof targets - PrefetchProofs(VersionedMultiProofTargets), + PrefetchProofs(MultiProofTargets), /// New state update from transaction execution with its source StateUpdate(Source, EvmState), /// State update that can be applied to the sparse trie without any new proofs. @@ -231,155 +223,12 @@ pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostStat hashed_state } -/// Extends a `MultiProofTargets` with the contents of a `VersionedMultiProofTargets`, -/// regardless of which variant the latter is. 
-fn extend_multiproof_targets(dest: &mut MultiProofTargets, src: &VersionedMultiProofTargets) { - match src { - VersionedMultiProofTargets::Legacy(targets) => { - dest.extend_ref(targets); - } - VersionedMultiProofTargets::V2(targets) => { - // Add all account targets - for target in &targets.account_targets { - dest.entry(target.key()).or_default(); - } - - // Add all storage targets - for (hashed_address, slots) in &targets.storage_targets { - let slot_set = dest.entry(*hashed_address).or_default(); - for slot in slots { - slot_set.insert(slot.key()); - } - } - } - } -} - -/// A set of multiproof targets which can be either in the legacy or V2 representations. -#[derive(Debug)] -pub(super) enum VersionedMultiProofTargets { - /// Legacy targets - Legacy(MultiProofTargets), - /// V2 targets - V2(MultiProofTargetsV2), -} - -impl VersionedMultiProofTargets { - /// Returns true if there are no account or storage targets. - fn is_empty(&self) -> bool { - match self { - Self::Legacy(targets) => targets.is_empty(), - Self::V2(targets) => targets.is_empty(), - } - } - - /// Returns the number of account targets in the multiproof target - fn account_targets_len(&self) -> usize { - match self { - Self::Legacy(targets) => targets.len(), - Self::V2(targets) => targets.account_targets.len(), - } - } - - /// Returns the number of storage targets in the multiproof target - fn storage_targets_len(&self) -> usize { - match self { - Self::Legacy(targets) => targets.values().map(|slots| slots.len()).sum::(), - Self::V2(targets) => { - targets.storage_targets.values().map(|slots| slots.len()).sum::() - } - } - } - - /// Returns the number of accounts in the multiproof targets. - fn len(&self) -> usize { - match self { - Self::Legacy(targets) => targets.len(), - Self::V2(targets) => targets.account_targets.len(), - } - } - - /// Returns the total storage slot count across all accounts. - fn storage_count(&self) -> usize { - match self { - Self::Legacy(targets) => targets.values().map(|slots| slots.len()).sum(), - Self::V2(targets) => targets.storage_targets.values().map(|slots| slots.len()).sum(), - } - } - - /// Returns the number of items that will be considered during chunking. - fn chunking_length(&self) -> usize { - match self { - Self::Legacy(targets) => targets.chunking_length(), - Self::V2(targets) => { - // For V2, count accounts + storage slots - targets.account_targets.len() + - targets.storage_targets.values().map(|slots| slots.len()).sum::() - } - } - } - - /// Retains the targets representing the difference with another `MultiProofTargets`. - /// Removes all targets that are already present in `other`. - fn retain_difference(&mut self, other: &MultiProofTargets) { - match self { - Self::Legacy(targets) => { - targets.retain_difference(other); - } - Self::V2(targets) => { - // Remove account targets that exist in other - targets.account_targets.retain(|target| !other.contains_key(&target.key())); - - // For each account in storage_targets, remove slots that exist in other - targets.storage_targets.retain(|hashed_address, slots| { - if let Some(other_slots) = other.get(hashed_address) { - slots.retain(|slot| !other_slots.contains(&slot.key())); - !slots.is_empty() - } else { - true - } - }); - } - } - } - - /// Extends this `VersionedMultiProofTargets` with the contents of another. - /// - /// Panics if the variants do not match. 
- fn extend(&mut self, other: Self) { - match (self, other) { - (Self::Legacy(dest), Self::Legacy(src)) => { - dest.extend(src); - } - (Self::V2(dest), Self::V2(src)) => { - dest.account_targets.extend(src.account_targets); - for (addr, slots) in src.storage_targets { - dest.storage_targets.entry(addr).or_default().extend(slots); - } - } - _ => panic!("Cannot extend VersionedMultiProofTargets with mismatched variants"), - } - } - - /// Chunks this `VersionedMultiProofTargets` into smaller chunks of the given size. - fn chunks(self, chunk_size: usize) -> Box> { - match self { - Self::Legacy(targets) => { - Box::new(MultiProofTargets::chunks(targets, chunk_size).map(Self::Legacy)) - } - Self::V2(targets) => { - Box::new(ChunkedMultiProofTargetsV2::new(targets, chunk_size).map(Self::V2)) - } - } - } -} - /// Input parameters for dispatching a multiproof calculation. #[derive(Debug)] struct MultiproofInput { source: Option, hashed_state_update: HashedPostState, - proof_targets: VersionedMultiProofTargets, + proof_targets: MultiProofTargets, proof_sequence_number: u64, state_root_message_sender: CrossbeamSender, multi_added_removed_keys: Option>, @@ -414,6 +263,8 @@ pub struct MultiproofManager { proof_result_tx: CrossbeamSender, /// Metrics metrics: MultiProofTaskMetrics, + /// Whether to use V2 storage proofs + v2_proofs_enabled: bool, } impl MultiproofManager { @@ -427,7 +278,9 @@ impl MultiproofManager { metrics.max_storage_workers.set(proof_worker_handle.total_storage_workers() as f64); metrics.max_account_workers.set(proof_worker_handle.total_account_workers() as f64); - Self { metrics, proof_worker_handle, proof_result_tx } + let v2_proofs_enabled = proof_worker_handle.v2_proofs_enabled(); + + Self { metrics, proof_worker_handle, proof_result_tx, v2_proofs_enabled } } /// Dispatches a new multiproof calculation to worker pools. 
@@ -472,48 +325,41 @@ impl MultiproofManager { multi_added_removed_keys, } = multiproof_input; + let account_targets = proof_targets.len(); + let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::(); + trace!( target: "engine::tree::payload_processor::multiproof", proof_sequence_number, ?proof_targets, - account_targets = proof_targets.account_targets_len(), - storage_targets = proof_targets.storage_targets_len(), + account_targets, + storage_targets, ?source, "Dispatching multiproof to workers" ); let start = Instant::now(); - // Workers will send ProofResultMessage directly to proof_result_rx - let proof_result_sender = ProofResultContext::new( - self.proof_result_tx.clone(), - proof_sequence_number, - hashed_state_update, - start, - ); - - let input = match proof_targets { - VersionedMultiProofTargets::Legacy(proof_targets) => { - // Extend prefix sets with targets - let frozen_prefix_sets = ParallelProof::extend_prefix_sets_with_targets( - &Default::default(), - &proof_targets, - ); - - AccountMultiproofInput::Legacy { - targets: proof_targets, - prefix_sets: frozen_prefix_sets, - collect_branch_node_masks: true, - multi_added_removed_keys, - proof_result_sender, - } - } - VersionedMultiProofTargets::V2(proof_targets) => { - AccountMultiproofInput::V2 { targets: proof_targets, proof_result_sender } - } - }; + // Extend prefix sets with targets + let frozen_prefix_sets = + ParallelProof::extend_prefix_sets_with_targets(&Default::default(), &proof_targets); // Dispatch account multiproof to worker pool with result sender + let input = AccountMultiproofInput { + targets: proof_targets, + prefix_sets: frozen_prefix_sets, + collect_branch_node_masks: true, + multi_added_removed_keys, + // Workers will send ProofResultMessage directly to proof_result_rx + proof_result_sender: ProofResultContext::new( + self.proof_result_tx.clone(), + proof_sequence_number, + hashed_state_update, + start, + ), + v2_proofs_enabled: self.v2_proofs_enabled, + }; + if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(input) { error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch account multiproof"); return; @@ -715,9 +561,6 @@ pub(super) struct MultiProofTask { /// there are any active workers and force chunking across workers. This is to prevent tasks /// which are very long from hitting a single worker. max_targets_for_chunking: usize, - /// Whether or not V2 proof calculation is enabled. If enabled then [`MultiProofTargetsV2`] - /// will be produced by state updates. - v2_proofs_enabled: bool, } impl MultiProofTask { @@ -749,16 +592,9 @@ impl MultiProofTask { ), metrics, max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING, - v2_proofs_enabled: false, } } - /// Enables V2 proof target generation on state updates. - pub(super) const fn with_v2_proofs_enabled(mut self, v2_proofs_enabled: bool) -> Self { - self.v2_proofs_enabled = v2_proofs_enabled; - self - } - /// Handles request for proof prefetch. /// /// Returns how many multiproof tasks were dispatched for the prefetch request. @@ -766,29 +602,37 @@ impl MultiProofTask { level = "debug", target = "engine::tree::payload_processor::multiproof", skip_all, - fields(accounts = targets.account_targets_len(), chunks = 0) + fields(accounts = targets.len(), chunks = 0) )] - fn on_prefetch_proof(&mut self, mut targets: VersionedMultiProofTargets) -> u64 { + fn on_prefetch_proof(&mut self, mut targets: MultiProofTargets) -> u64 { // Remove already fetched proof targets to avoid redundant work. 
targets.retain_difference(&self.fetched_proof_targets); - extend_multiproof_targets(&mut self.fetched_proof_targets, &targets); + self.fetched_proof_targets.extend_ref(&targets); - // For Legacy multiproofs, make sure all target accounts have an `AddedRemovedKeySet` in the + // Make sure all target accounts have an `AddedRemovedKeySet` in the // [`MultiAddedRemovedKeys`]. Even if there are not any known removed keys for the account, // we still want to optimistically fetch extension children for the leaf addition case. - // V2 multiproofs don't need this. - let multi_added_removed_keys = - if let VersionedMultiProofTargets::Legacy(legacy_targets) = &targets { - self.multi_added_removed_keys.touch_accounts(legacy_targets.keys().copied()); - Some(Arc::new(self.multi_added_removed_keys.clone())) - } else { - None - }; + self.multi_added_removed_keys.touch_accounts(targets.keys().copied()); + + // Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks + let multi_added_removed_keys = Arc::new(MultiAddedRemovedKeys { + account: self.multi_added_removed_keys.account.clone(), + storages: targets + .keys() + .filter_map(|account| { + self.multi_added_removed_keys + .storages + .get(account) + .cloned() + .map(|keys| (*account, keys)) + }) + .collect(), + }); self.metrics.prefetch_proof_targets_accounts_histogram.record(targets.len() as f64); self.metrics .prefetch_proof_targets_storages_histogram - .record(targets.storage_count() as f64); + .record(targets.values().map(|slots| slots.len()).sum::() as f64); let chunking_len = targets.chunking_length(); let available_account_workers = @@ -802,7 +646,7 @@ impl MultiProofTask { self.max_targets_for_chunking, available_account_workers, available_storage_workers, - VersionedMultiProofTargets::chunks, + MultiProofTargets::chunks, |proof_targets| { self.multiproof_manager.dispatch(MultiproofInput { source: None, @@ -810,7 +654,7 @@ impl MultiProofTask { proof_targets, proof_sequence_number: self.proof_sequencer.next_sequence(), state_root_message_sender: self.tx.clone(), - multi_added_removed_keys: multi_added_removed_keys.clone(), + multi_added_removed_keys: Some(multi_added_removed_keys.clone()), }); }, ); @@ -913,7 +757,6 @@ impl MultiProofTask { self.multiproof_manager.proof_worker_handle.available_account_workers(); let available_storage_workers = self.multiproof_manager.proof_worker_handle.available_storage_workers(); - let num_chunks = dispatch_with_chunking( not_fetched_state_update, chunking_len, @@ -927,9 +770,8 @@ impl MultiProofTask { &hashed_state_update, &self.fetched_proof_targets, &multi_added_removed_keys, - self.v2_proofs_enabled, ); - extend_multiproof_targets(&mut spawned_proof_targets, &proof_targets); + spawned_proof_targets.extend_ref(&proof_targets); self.multiproof_manager.dispatch(MultiproofInput { source: Some(source), @@ -1029,10 +871,7 @@ impl MultiProofTask { batch_metrics.proofs_processed += 1; if let Some(combined_update) = self.on_proof( sequence_number, - SparseTrieUpdate { - state, - multiproof: ProofResult::empty(self.v2_proofs_enabled), - }, + SparseTrieUpdate { state, multiproof: Default::default() }, ) { let _ = self.to_sparse_trie.send(combined_update); } @@ -1059,7 +898,8 @@ impl MultiProofTask { } let account_targets = merged_targets.len(); - let storage_targets = merged_targets.storage_count(); + let storage_targets = + merged_targets.values().map(|slots| slots.len()).sum::(); batch_metrics.prefetch_proofs_requested += self.on_prefetch_proof(merged_targets); trace!( target: 
"engine::tree::payload_processor::multiproof", @@ -1163,10 +1003,7 @@ impl MultiProofTask { if let Some(combined_update) = self.on_proof( sequence_number, - SparseTrieUpdate { - state, - multiproof: ProofResult::empty(self.v2_proofs_enabled), - }, + SparseTrieUpdate { state, multiproof: Default::default() }, ) { let _ = self.to_sparse_trie.send(combined_update); } @@ -1269,7 +1106,7 @@ impl MultiProofTask { let update = SparseTrieUpdate { state: proof_result.state, - multiproof: proof_result_data, + multiproof: proof_result_data.proof, }; if let Some(combined_update) = @@ -1359,7 +1196,7 @@ struct MultiproofBatchCtx { /// received. updates_finished_time: Option, /// Reusable buffer for accumulating prefetch targets during batching. - accumulated_prefetch_targets: Vec, + accumulated_prefetch_targets: Vec, } impl MultiproofBatchCtx { @@ -1405,77 +1242,40 @@ fn get_proof_targets( state_update: &HashedPostState, fetched_proof_targets: &MultiProofTargets, multi_added_removed_keys: &MultiAddedRemovedKeys, - v2_enabled: bool, -) -> VersionedMultiProofTargets { - if v2_enabled { - let mut targets = MultiProofTargetsV2::default(); +) -> MultiProofTargets { + let mut targets = MultiProofTargets::default(); - // first collect all new accounts (not previously fetched) - for &hashed_address in state_update.accounts.keys() { - if !fetched_proof_targets.contains_key(&hashed_address) { - targets.account_targets.push(hashed_address.into()); - } + // first collect all new accounts (not previously fetched) + for hashed_address in state_update.accounts.keys() { + if !fetched_proof_targets.contains_key(hashed_address) { + targets.insert(*hashed_address, HashSet::default()); } - - // then process storage slots for all accounts in the state update - for (hashed_address, storage) in &state_update.storages { - let fetched = fetched_proof_targets.get(hashed_address); - - // If the storage is wiped, we still need to fetch the account proof. - if storage.wiped && fetched.is_none() { - targets.account_targets.push(Into::::into(*hashed_address)); - continue - } - - let changed_slots = storage - .storage - .keys() - .filter(|slot| !fetched.is_some_and(|f| f.contains(*slot))) - .map(|slot| Into::::into(*slot)) - .collect::>(); - - if !changed_slots.is_empty() { - targets.account_targets.push((*hashed_address).into()); - targets.storage_targets.insert(*hashed_address, changed_slots); - } - } - - VersionedMultiProofTargets::V2(targets) - } else { - let mut targets = MultiProofTargets::default(); - - // first collect all new accounts (not previously fetched) - for hashed_address in state_update.accounts.keys() { - if !fetched_proof_targets.contains_key(hashed_address) { - targets.insert(*hashed_address, HashSet::default()); - } - } - - // then process storage slots for all accounts in the state update - for (hashed_address, storage) in &state_update.storages { - let fetched = fetched_proof_targets.get(hashed_address); - let storage_added_removed_keys = multi_added_removed_keys.get_storage(hashed_address); - let mut changed_slots = storage - .storage - .keys() - .filter(|slot| { - !fetched.is_some_and(|f| f.contains(*slot)) || - storage_added_removed_keys.is_some_and(|k| k.is_removed(slot)) - }) - .peekable(); - - // If the storage is wiped, we still need to fetch the account proof. 
- if storage.wiped && fetched.is_none() { - targets.entry(*hashed_address).or_default(); - } - - if changed_slots.peek().is_some() { - targets.entry(*hashed_address).or_default().extend(changed_slots); - } - } - - VersionedMultiProofTargets::Legacy(targets) } + + // then process storage slots for all accounts in the state update + for (hashed_address, storage) in &state_update.storages { + let fetched = fetched_proof_targets.get(hashed_address); + let storage_added_removed_keys = multi_added_removed_keys.get_storage(hashed_address); + let mut changed_slots = storage + .storage + .keys() + .filter(|slot| { + !fetched.is_some_and(|f| f.contains(*slot)) || + storage_added_removed_keys.is_some_and(|k| k.is_removed(slot)) + }) + .peekable(); + + // If the storage is wiped, we still need to fetch the account proof. + if storage.wiped && fetched.is_none() { + targets.entry(*hashed_address).or_default(); + } + + if changed_slots.peek().is_some() { + targets.entry(*hashed_address).or_default().extend(changed_slots); + } + } + + targets } /// Dispatches work items as a single unit or in chunks based on target size and worker @@ -1681,24 +1481,12 @@ mod tests { state } - fn unwrap_legacy_targets(targets: VersionedMultiProofTargets) -> MultiProofTargets { - match targets { - VersionedMultiProofTargets::Legacy(targets) => targets, - VersionedMultiProofTargets::V2(_) => panic!("Expected Legacy targets"), - } - } - #[test] fn test_get_proof_targets_new_account_targets() { let state = create_get_proof_targets_state(); let fetched = MultiProofTargets::default(); - let targets = unwrap_legacy_targets(get_proof_targets( - &state, - &fetched, - &MultiAddedRemovedKeys::new(), - false, - )); + let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new()); // should return all accounts as targets since nothing was fetched before assert_eq!(targets.len(), state.accounts.len()); @@ -1712,12 +1500,7 @@ mod tests { let state = create_get_proof_targets_state(); let fetched = MultiProofTargets::default(); - let targets = unwrap_legacy_targets(get_proof_targets( - &state, - &fetched, - &MultiAddedRemovedKeys::new(), - false, - )); + let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new()); // verify storage slots are included for accounts with storage for (addr, storage) in &state.storages { @@ -1745,12 +1528,7 @@ mod tests { // mark the account as already fetched fetched.insert(*fetched_addr, HashSet::default()); - let targets = unwrap_legacy_targets(get_proof_targets( - &state, - &fetched, - &MultiAddedRemovedKeys::new(), - false, - )); + let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new()); // should not include the already fetched account since it has no storage updates assert!(!targets.contains_key(fetched_addr)); @@ -1770,12 +1548,7 @@ mod tests { fetched_slots.insert(fetched_slot); fetched.insert(*addr, fetched_slots); - let targets = unwrap_legacy_targets(get_proof_targets( - &state, - &fetched, - &MultiAddedRemovedKeys::new(), - false, - )); + let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new()); // should not include the already fetched storage slot let target_slots = &targets[addr]; @@ -1788,12 +1561,7 @@ mod tests { let state = HashedPostState::default(); let fetched = MultiProofTargets::default(); - let targets = unwrap_legacy_targets(get_proof_targets( - &state, - &fetched, - &MultiAddedRemovedKeys::new(), - false, - )); + let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new()); 
assert!(targets.is_empty()); } @@ -1820,12 +1588,7 @@ mod tests { fetched_slots.insert(slot1); fetched.insert(addr1, fetched_slots); - let targets = unwrap_legacy_targets(get_proof_targets( - &state, - &fetched, - &MultiAddedRemovedKeys::new(), - false, - )); + let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new()); assert!(targets.contains_key(&addr2)); assert!(!targets[&addr1].contains(&slot1)); @@ -1851,12 +1614,7 @@ mod tests { assert!(!state.accounts.contains_key(&addr)); assert!(!fetched.contains_key(&addr)); - let targets = unwrap_legacy_targets(get_proof_targets( - &state, - &fetched, - &MultiAddedRemovedKeys::new(), - false, - )); + let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new()); // verify that we still get the storage slots for the unmodified account assert!(targets.contains_key(&addr)); @@ -1898,12 +1656,7 @@ mod tests { removed_state.storages.insert(addr, removed_storage); multi_added_removed_keys.update_with_state(&removed_state); - let targets = unwrap_legacy_targets(get_proof_targets( - &state, - &fetched, - &multi_added_removed_keys, - false, - )); + let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys); // slot1 should be included despite being fetched, because it's marked as removed assert!(targets.contains_key(&addr)); @@ -1930,12 +1683,7 @@ mod tests { storage.storage.insert(slot1, U256::from(100)); state.storages.insert(addr, storage); - let targets = unwrap_legacy_targets(get_proof_targets( - &state, - &fetched, - &multi_added_removed_keys, - false, - )); + let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys); // account should be included because storage is wiped and account wasn't fetched assert!(targets.contains_key(&addr)); @@ -1978,12 +1726,7 @@ mod tests { removed_state.storages.insert(addr, removed_storage); multi_added_removed_keys.update_with_state(&removed_state); - let targets = unwrap_legacy_targets(get_proof_targets( - &state, - &fetched, - &multi_added_removed_keys, - false, - )); + let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys); // only slots in the state update can be included, so slot3 should not appear assert!(!targets.contains_key(&addr)); @@ -2010,12 +1753,9 @@ mod tests { targets3.insert(addr3, HashSet::default()); let tx = task.tx.clone(); - tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets1))) - .unwrap(); - tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets2))) - .unwrap(); - tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets3))) - .unwrap(); + tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap(); + tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap(); + tx.send(MultiProofMessage::PrefetchProofs(targets3)).unwrap(); let proofs_requested = if let Ok(MultiProofMessage::PrefetchProofs(targets)) = task.rx.recv() { @@ -2029,12 +1769,11 @@ mod tests { assert_eq!(num_batched, 3); assert_eq!(merged_targets.len(), 3); - let legacy_targets = unwrap_legacy_targets(merged_targets); - assert!(legacy_targets.contains_key(&addr1)); - assert!(legacy_targets.contains_key(&addr2)); - assert!(legacy_targets.contains_key(&addr3)); + assert!(merged_targets.contains_key(&addr1)); + assert!(merged_targets.contains_key(&addr2)); + assert!(merged_targets.contains_key(&addr3)); - task.on_prefetch_proof(VersionedMultiProofTargets::Legacy(legacy_targets)) + task.on_prefetch_proof(merged_targets) } else { 
panic!("Expected PrefetchProofs message"); }; @@ -2109,16 +1848,11 @@ mod tests { // Queue: [PrefetchProofs1, PrefetchProofs2, StateUpdate1, StateUpdate2, PrefetchProofs3] let tx = task.tx.clone(); - tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets1))) - .unwrap(); - tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets2))) - .unwrap(); + tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap(); + tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap(); tx.send(MultiProofMessage::StateUpdate(source.into(), state_update1)).unwrap(); tx.send(MultiProofMessage::StateUpdate(source.into(), state_update2)).unwrap(); - tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy( - targets3.clone(), - ))) - .unwrap(); + tx.send(MultiProofMessage::PrefetchProofs(targets3.clone())).unwrap(); // Step 1: Receive and batch PrefetchProofs (should get targets1 + targets2) let mut pending_msg: Option = None; @@ -2144,10 +1878,9 @@ mod tests { // Should have batched exactly 2 PrefetchProofs (not 3!) assert_eq!(num_batched, 2, "Should batch only until different message type"); assert_eq!(merged_targets.len(), 2); - let legacy_targets = unwrap_legacy_targets(merged_targets); - assert!(legacy_targets.contains_key(&addr1)); - assert!(legacy_targets.contains_key(&addr2)); - assert!(!legacy_targets.contains_key(&addr3), "addr3 should NOT be in first batch"); + assert!(merged_targets.contains_key(&addr1)); + assert!(merged_targets.contains_key(&addr2)); + assert!(!merged_targets.contains_key(&addr3), "addr3 should NOT be in first batch"); } else { panic!("Expected PrefetchProofs message"); } @@ -2172,8 +1905,7 @@ mod tests { match task.rx.try_recv() { Ok(MultiProofMessage::PrefetchProofs(targets)) => { assert_eq!(targets.len(), 1); - let legacy_targets = unwrap_legacy_targets(targets); - assert!(legacy_targets.contains_key(&addr3)); + assert!(targets.contains_key(&addr3)); } _ => panic!("PrefetchProofs3 was lost!"), } @@ -2219,13 +1951,9 @@ mod tests { let source = StateChangeSource::Transaction(99); let tx = task.tx.clone(); - tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(prefetch1))) - .unwrap(); + tx.send(MultiProofMessage::PrefetchProofs(prefetch1)).unwrap(); tx.send(MultiProofMessage::StateUpdate(source.into(), state_update)).unwrap(); - tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy( - prefetch2.clone(), - ))) - .unwrap(); + tx.send(MultiProofMessage::PrefetchProofs(prefetch2.clone())).unwrap(); let mut ctx = MultiproofBatchCtx::new(Instant::now()); let mut batch_metrics = MultiproofBatchMetrics::default(); @@ -2258,8 +1986,7 @@ mod tests { match task.rx.try_recv() { Ok(MultiProofMessage::PrefetchProofs(targets)) => { assert_eq!(targets.len(), 1); - let legacy_targets = unwrap_legacy_targets(targets); - assert!(legacy_targets.contains_key(&prefetch_addr2)); + assert!(targets.contains_key(&prefetch_addr2)); } other => panic!("Expected PrefetchProofs2 in channel, got {:?}", other), } diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index 1083450549..6021098627 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -16,7 +16,7 @@ use crate::tree::{ payload_processor::{ bal::{total_slots, BALSlotIter}, executor::WorkloadExecutor, - multiproof::{MultiProofMessage, VersionedMultiProofTargets}, + 
multiproof::MultiProofMessage, ExecutionCache as PayloadExecutionCache, }, precompile_cache::{CachedPrecompile, PrecompileCacheMap}, @@ -237,7 +237,7 @@ where } /// If configured and the tx returned proof targets, emit the targets the transaction produced - fn send_multi_proof_targets(&self, targets: Option) { + fn send_multi_proof_targets(&self, targets: Option) { if self.is_execution_terminated() { // if execution is already terminated then we dont need to send more proof fetch // messages @@ -479,8 +479,6 @@ where pub(super) terminate_execution: Arc, pub(super) precompile_cache_disabled: bool, pub(super) precompile_cache_map: PrecompileCacheMap>, - /// Whether V2 proof calculation is enabled. - pub(super) v2_proofs_enabled: bool, } impl PrewarmContext @@ -489,12 +487,10 @@ where P: BlockReader + StateProviderFactory + StateReader + Clone + 'static, Evm: ConfigureEvm + 'static, { - /// Splits this context into an evm, an evm config, metrics, the atomic bool for terminating - /// execution, and whether V2 proofs are enabled. + /// Splits this context into an evm, an evm config, metrics, and the atomic bool for terminating + /// execution. #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)] - fn evm_for_ctx( - self, - ) -> Option<(EvmFor, PrewarmMetrics, Arc, bool)> { + fn evm_for_ctx(self) -> Option<(EvmFor, PrewarmMetrics, Arc)> { let Self { env, evm_config, @@ -504,7 +500,6 @@ where terminate_execution, precompile_cache_disabled, precompile_cache_map, - v2_proofs_enabled, } = self; let mut state_provider = match provider.build() { @@ -554,7 +549,7 @@ where }); } - Some((evm, metrics, terminate_execution, v2_proofs_enabled)) + Some((evm, metrics, terminate_execution)) } /// Accepts an [`mpsc::Receiver`] of transactions and a handle to prewarm task. Executes @@ -575,10 +570,7 @@ where ) where Tx: ExecutableTxFor, { - let Some((mut evm, metrics, terminate_execution, v2_proofs_enabled)) = self.evm_for_ctx() - else { - return - }; + let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return }; while let Ok(IndexedTransaction { index, tx }) = { let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx") @@ -641,8 +633,7 @@ where let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash()) .entered(); - let (targets, storage_targets) = - multiproof_targets_from_state(res.state, v2_proofs_enabled); + let (targets, storage_targets) = multiproof_targets_from_state(res.state); metrics.prefetch_storage_targets.record(storage_targets as f64); let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) }); drop(_enter); @@ -787,22 +778,9 @@ where } } -/// Returns a set of [`VersionedMultiProofTargets`] and the total amount of storage targets, based -/// on the given state. -fn multiproof_targets_from_state( - state: EvmState, - v2_enabled: bool, -) -> (VersionedMultiProofTargets, usize) { - if v2_enabled { - multiproof_targets_v2_from_state(state) - } else { - multiproof_targets_legacy_from_state(state) - } -} - -/// Returns legacy [`MultiProofTargets`] and the total amount of storage targets, based on the +/// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the /// given state. 
-fn multiproof_targets_legacy_from_state(state: EvmState) -> (VersionedMultiProofTargets, usize) { +fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) { let mut targets = MultiProofTargets::with_capacity(state.len()); let mut storage_targets = 0; for (addr, account) in state { @@ -832,50 +810,7 @@ fn multiproof_targets_legacy_from_state(state: EvmState) -> (VersionedMultiProof targets.insert(keccak256(addr), storage_set); } - (VersionedMultiProofTargets::Legacy(targets), storage_targets) -} - -/// Returns V2 [`reth_trie_parallel::targets_v2::MultiProofTargetsV2`] and the total amount of -/// storage targets, based on the given state. -fn multiproof_targets_v2_from_state(state: EvmState) -> (VersionedMultiProofTargets, usize) { - use reth_trie::proof_v2; - use reth_trie_parallel::targets_v2::MultiProofTargetsV2; - - let mut targets = MultiProofTargetsV2::default(); - let mut storage_target_count = 0; - for (addr, account) in state { - // if the account was not touched, or if the account was selfdestructed, do not - // fetch proofs for it - // - // Since selfdestruct can only happen in the same transaction, we can skip - // prefetching proofs for selfdestructed accounts - // - // See: https://eips.ethereum.org/EIPS/eip-6780 - if !account.is_touched() || account.is_selfdestructed() { - continue - } - - let hashed_address = keccak256(addr); - targets.account_targets.push(hashed_address.into()); - - let mut storage_slots = Vec::with_capacity(account.storage.len()); - for (key, slot) in account.storage { - // do nothing if unchanged - if !slot.is_changed() { - continue - } - - let hashed_slot = keccak256(B256::new(key.to_be_bytes())); - storage_slots.push(proof_v2::Target::from(hashed_slot)); - } - - storage_target_count += storage_slots.len(); - if !storage_slots.is_empty() { - targets.storage_targets.insert(hashed_address, storage_slots); - } - } - - (VersionedMultiProofTargets::V2(targets), storage_target_count) + (targets, storage_targets) } /// The events the pre-warm task can handle. 
@@ -900,7 +835,7 @@ pub(super) enum PrewarmTaskEvent { /// The outcome of a pre-warm task Outcome { /// The prepared proof targets based on the evm state outcome - proof_targets: Option, + proof_targets: Option, }, /// Finished executing all transactions FinishedTxExecution { diff --git a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs index 052fd8672b..b4c150cfa9 100644 --- a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs +++ b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs @@ -4,7 +4,7 @@ use crate::tree::payload_processor::multiproof::{MultiProofTaskMetrics, SparseTr use alloy_primitives::B256; use rayon::iter::{ParallelBridge, ParallelIterator}; use reth_trie::{updates::TrieUpdates, Nibbles}; -use reth_trie_parallel::{proof_task::ProofResult, root::ParallelStateRootError}; +use reth_trie_parallel::root::ParallelStateRootError; use reth_trie_sparse::{ errors::{SparseStateTrieResult, SparseTrieErrorKind}, provider::{TrieNodeProvider, TrieNodeProviderFactory}, @@ -97,8 +97,8 @@ where debug!( target: "engine::root", num_updates, - account_proofs = update.multiproof.account_proofs_len(), - storage_proofs = update.multiproof.storage_proofs_len(), + account_proofs = update.multiproof.account_subtree.len(), + storage_proofs = update.multiproof.storages.len(), "Updating sparse trie" ); @@ -157,14 +157,7 @@ where let started_at = Instant::now(); // Reveal new accounts and storage slots. - match multiproof { - ProofResult::Legacy(decoded, _) => { - trie.reveal_decoded_multiproof(decoded)?; - } - ProofResult::V2(decoded_v2) => { - trie.reveal_decoded_multiproof_v2(decoded_v2)?; - } - } + trie.reveal_decoded_multiproof(multiproof)?; let reveal_multiproof_elapsed = started_at.elapsed(); trace!( target: "engine::root::sparse", diff --git a/crates/trie/parallel/Cargo.toml b/crates/trie/parallel/Cargo.toml index 812dd2b85b..d64f2dfb51 100644 --- a/crates/trie/parallel/Cargo.toml +++ b/crates/trie/parallel/Cargo.toml @@ -13,8 +13,8 @@ workspace = true [dependencies] # reth -reth-primitives-traits.workspace = true reth-execution-errors.workspace = true +reth-primitives-traits.workspace = true reth-provider.workspace = true reth-storage-errors.workspace = true reth-trie-common.workspace = true diff --git a/crates/trie/parallel/src/proof.rs b/crates/trie/parallel/src/proof.rs index d42534c271..7bf936bad3 100644 --- a/crates/trie/parallel/src/proof.rs +++ b/crates/trie/parallel/src/proof.rs @@ -197,7 +197,7 @@ impl ParallelProof { let (result_tx, result_rx) = crossbeam_unbounded(); let account_multiproof_start_time = Instant::now(); - let input = AccountMultiproofInput::Legacy { + let input = AccountMultiproofInput { targets, prefix_sets, collect_branch_node_masks: self.collect_branch_node_masks, @@ -208,6 +208,7 @@ impl ParallelProof { HashedPostState::default(), account_multiproof_start_time, ), + v2_proofs_enabled: self.v2_proofs_enabled, }; self.proof_worker_handle @@ -221,9 +222,7 @@ impl ParallelProof { ) })?; - let ProofResult::Legacy(multiproof, stats) = proof_result_msg.result? 
else { - panic!("AccountMultiproofInput::Legacy was submitted, expected legacy result") - }; + let ProofResult { proof: multiproof, stats } = proof_result_msg.result?; #[cfg(feature = "metrics")] self.metrics.record(stats); @@ -236,7 +235,7 @@ impl ParallelProof { leaves_added = stats.leaves_added(), missed_leaves = stats.missed_leaves(), precomputed_storage_roots = stats.precomputed_storage_roots(), - "Calculated decoded proof", + "Calculated decoded proof" ); Ok(multiproof) diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index 076931f48c..eb6f892346 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -32,8 +32,6 @@ use crate::{ root::ParallelStateRootError, stats::{ParallelTrieStats, ParallelTrieTracker}, - targets_v2::MultiProofTargetsV2, - value_encoder::AsyncAccountValueEncoder, StorageRootTargets, }; use alloy_primitives::{ @@ -51,11 +49,11 @@ use reth_trie::{ node_iter::{TrieElement, TrieNodeIter}, prefix_set::TriePrefixSets, proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider, StorageProof}, - proof_v2, + proof_v2::{self, StorageProofCalculator}, trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieCursorMetricsCache}, walker::TrieWalker, - DecodedMultiProof, DecodedMultiProofV2, DecodedStorageMultiProof, HashBuilder, HashedPostState, - MultiProofTargets, Nibbles, ProofTrieNode, TRIE_ACCOUNT_RLP_MAX_SIZE, + DecodedMultiProof, DecodedStorageMultiProof, HashBuilder, HashedPostState, MultiProofTargets, + Nibbles, ProofTrieNode, TRIE_ACCOUNT_RLP_MAX_SIZE, }; use reth_trie_common::{ added_removed_keys::MultiAddedRemovedKeys, @@ -222,8 +220,7 @@ impl ProofWorkerHandle { metrics, #[cfg(feature = "metrics")] cursor_metrics, - ) - .with_v2_proofs(v2_proofs_enabled); + ); if let Err(error) = worker.run() { error!( target: "trie::proof_task", @@ -336,12 +333,16 @@ impl ProofWorkerHandle { ProviderError::other(std::io::Error::other("account workers unavailable")); if let AccountWorkerJob::AccountMultiproof { input } = err.0 { - let ProofResultContext { - sender: result_tx, - sequence_number: seq, - state, - start_time: start, - } = input.into_proof_result_sender(); + let AccountMultiproofInput { + proof_result_sender: + ProofResultContext { + sender: result_tx, + sequence_number: seq, + state, + start_time: start, + }, + .. + } = *input; let _ = result_tx.send(ProofResultMessage { sequence_number: seq, @@ -604,65 +605,11 @@ impl TrieNodeProvider for ProofTaskTrieNodeProvider { /// Result of a multiproof calculation. #[derive(Debug)] -pub enum ProofResult { - /// Legacy multiproof calculation result. - Legacy(DecodedMultiProof, ParallelTrieStats), - /// V2 multiproof calculation result. - V2(DecodedMultiProofV2), -} - -impl ProofResult { - /// Creates an empty [`ProofResult`] of the appropriate variant based on `v2_enabled`. - /// - /// Use this when constructing empty proofs (e.g., for state updates where all targets - /// were already fetched) to ensure consistency with the proof version being used. 
- pub fn empty(v2_enabled: bool) -> Self { - if v2_enabled { - Self::V2(DecodedMultiProofV2::default()) - } else { - let stats = ParallelTrieTracker::default().finish(); - Self::Legacy(DecodedMultiProof::default(), stats) - } - } - - /// Returns true if the result contains no proofs - pub fn is_empty(&self) -> bool { - match self { - Self::Legacy(proof, _) => proof.is_empty(), - Self::V2(proof) => proof.is_empty(), - } - } - - /// Extends the receiver with the value of the given results. - /// - /// # Panics - /// - /// This method panics if the two [`ProofResult`]s are not the same variant. - pub fn extend(&mut self, other: Self) { - match (self, other) { - (Self::Legacy(proof, _), Self::Legacy(other, _)) => proof.extend(other), - (Self::V2(proof), Self::V2(other)) => proof.extend(other), - _ => panic!("mismatched ProofResults, cannot extend one with the other"), - } - } - - /// Returns the number of account proofs. - pub fn account_proofs_len(&self) -> usize { - match self { - Self::Legacy(proof, _) => proof.account_subtree.len(), - Self::V2(proof) => proof.account_proofs.len(), - } - } - - /// Returns the total number of storage proofs - pub fn storage_proofs_len(&self) -> usize { - match self { - Self::Legacy(proof, _) => { - proof.storages.values().map(|p| p.subtree.len()).sum::() - } - Self::V2(proof) => proof.storage_proofs.values().map(|p| p.len()).sum::(), - } - } +pub struct ProofResult { + /// The account multiproof + pub proof: DecodedMultiProof, + /// Statistics collected during proof computation + pub stats: ParallelTrieStats, } /// Channel used by worker threads to deliver `ProofResultMessage` items back to @@ -942,7 +889,7 @@ where &self, proof_tx: &ProofTaskTx, v2_calculator: Option< - &mut proof_v2::StorageProofCalculator< + &mut StorageProofCalculator< ::StorageTrieCursor<'_>, ::StorageCursor<'_>, >, @@ -1106,8 +1053,6 @@ struct AccountProofWorker { /// Cursor metrics for this worker #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics, - /// Set to true if V2 proofs are enabled. - v2_enabled: bool, } impl AccountProofWorker @@ -1137,16 +1082,9 @@ where metrics, #[cfg(feature = "metrics")] cursor_metrics, - v2_enabled: false, } } - /// Changes whether or not V2 proofs are enabled. - const fn with_v2_proofs(mut self, v2_enabled: bool) -> Self { - self.v2_enabled = v2_enabled; - self - } - /// Runs the worker loop, processing jobs until the channel closes. /// /// # Lifecycle @@ -1179,17 +1117,6 @@ where let mut account_nodes_processed = 0u64; let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default(); - let mut v2_calculator = if self.v2_enabled { - let trie_cursor = proof_tx.provider.account_trie_cursor()?; - let hashed_cursor = proof_tx.provider.hashed_account_cursor()?; - Some(proof_v2::ProofCalculator::<_, _, AsyncAccountValueEncoder>::new( - trie_cursor, - hashed_cursor, - )) - } else { - None - }; - // Count this worker as available only after successful initialization. self.available_workers.fetch_add(1, Ordering::Relaxed); @@ -1201,7 +1128,6 @@ where AccountWorkerJob::AccountMultiproof { input } => { self.process_account_multiproof( &proof_tx, - v2_calculator.as_mut(), *input, &mut account_proofs_processed, &mut cursor_metrics_cache, @@ -1240,18 +1166,26 @@ where Ok(()) } - fn compute_legacy_account_multiproof( + /// Processes an account multiproof request. 
+ fn process_account_multiproof( &self, proof_tx: &ProofTaskTx, - targets: MultiProofTargets, - mut prefix_sets: TriePrefixSets, - collect_branch_node_masks: bool, - multi_added_removed_keys: Option>, - proof_cursor_metrics: &mut ProofTaskCursorMetricsCache, - ) -> Result - where + input: AccountMultiproofInput, + account_proofs_processed: &mut u64, + cursor_metrics_cache: &mut ProofTaskCursorMetricsCache, + ) where Provider: TrieCursorFactory + HashedCursorFactory, { + let AccountMultiproofInput { + targets, + mut prefix_sets, + collect_branch_node_masks, + multi_added_removed_keys, + proof_result_sender: + ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start }, + v2_proofs_enabled, + } = input; + let span = debug_span!( target: "trie::proof_task", "Account multiproof calculation", @@ -1265,6 +1199,8 @@ where "Processing account multiproof" ); + let proof_start = Instant::now(); + let mut tracker = ParallelTrieTracker::default(); let mut storage_prefix_sets = std::mem::take(&mut prefix_sets.storage_prefix_sets); @@ -1274,14 +1210,29 @@ where tracker.set_precomputed_storage_roots(storage_root_targets_len as u64); - let storage_proof_receivers = dispatch_storage_proofs( + let storage_proof_receivers = match dispatch_storage_proofs( &self.storage_work_tx, &targets, &mut storage_prefix_sets, collect_branch_node_masks, multi_added_removed_keys.as_ref(), - )?; + v2_proofs_enabled, + ) { + Ok(receivers) => receivers, + Err(error) => { + // Send error through result channel + error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}"); + let _ = result_tx.send(ProofResultMessage { + sequence_number: seq, + result: Err(error), + elapsed: start.elapsed(), + state, + }); + return; + } + }; + // Use the missed leaves cache passed from the multiproof manager let account_prefix_set = std::mem::take(&mut prefix_sets.account_prefix_set); let ctx = AccountMultiproofParams { @@ -1293,115 +1244,17 @@ where cached_storage_roots: &self.cached_storage_roots, }; - let result = build_account_multiproof_with_storage_roots( - &proof_tx.provider, - ctx, - &mut tracker, - proof_cursor_metrics, - ); + let result = + build_account_multiproof_with_storage_roots(&proof_tx.provider, ctx, &mut tracker); + + let now = Instant::now(); + let proof_elapsed = now.duration_since(proof_start); + let total_elapsed = now.duration_since(start); + let proof_cursor_metrics = tracker.cursor_metrics; + proof_cursor_metrics.record_spans(); let stats = tracker.finish(); - result.map(|proof| ProofResult::Legacy(proof, stats)) - } - - fn compute_v2_account_multiproof( - &self, - v2_calculator: &mut proof_v2::ProofCalculator< - ::AccountTrieCursor<'_>, - ::AccountCursor<'_>, - AsyncAccountValueEncoder, - >, - targets: MultiProofTargetsV2, - ) -> Result - where - Provider: TrieCursorFactory + HashedCursorFactory, - { - let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets; - - let span = debug_span!( - target: "trie::proof_task", - "Account V2 multiproof calculation", - account_targets = account_targets.len(), - storage_targets = storage_targets.values().map(|t| t.len()).sum::(), - worker_id = self.worker_id, - ); - let _span_guard = span.enter(); - - trace!(target: "trie::proof_task", "Processing V2 account multiproof"); - - let storage_proof_receivers = - dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?; - - let mut value_encoder = AsyncAccountValueEncoder::new( - self.storage_work_tx.clone(), - storage_proof_receivers, - 
self.cached_storage_roots.clone(), - ); - - let proof = DecodedMultiProofV2 { - account_proofs: v2_calculator.proof(&mut value_encoder, &mut account_targets)?, - storage_proofs: value_encoder.into_storage_proofs()?, - }; - - Ok(ProofResult::V2(proof)) - } - - /// Processes an account multiproof request. - fn process_account_multiproof( - &self, - proof_tx: &ProofTaskTx, - v2_calculator: Option< - &mut proof_v2::ProofCalculator< - ::AccountTrieCursor<'_>, - ::AccountCursor<'_>, - AsyncAccountValueEncoder, - >, - >, - input: AccountMultiproofInput, - account_proofs_processed: &mut u64, - cursor_metrics_cache: &mut ProofTaskCursorMetricsCache, - ) where - Provider: TrieCursorFactory + HashedCursorFactory, - { - let mut proof_cursor_metrics = ProofTaskCursorMetricsCache::default(); - let proof_start = Instant::now(); - - let (proof_result_sender, result) = match input { - AccountMultiproofInput::Legacy { - targets, - prefix_sets, - collect_branch_node_masks, - multi_added_removed_keys, - proof_result_sender, - } => ( - proof_result_sender, - self.compute_legacy_account_multiproof( - proof_tx, - targets, - prefix_sets, - collect_branch_node_masks, - multi_added_removed_keys, - &mut proof_cursor_metrics, - ), - ), - AccountMultiproofInput::V2 { targets, proof_result_sender } => ( - proof_result_sender, - self.compute_v2_account_multiproof::( - v2_calculator.expect("v2 calculator provided"), - targets, - ), - ), - }; - - let ProofResultContext { - sender: result_tx, - sequence_number: seq, - state, - start_time: start, - } = proof_result_sender; - - let proof_elapsed = proof_start.elapsed(); - let total_elapsed = start.elapsed(); + let result = result.map(|proof| ProofResult { proof, stats }); *account_proofs_processed += 1; // Send result to MultiProofTask @@ -1422,8 +1275,6 @@ where ); } - proof_cursor_metrics.record_spans(); - trace!( target: "trie::proof_task", proof_time_us = proof_elapsed.as_micros(), @@ -1504,7 +1355,6 @@ fn build_account_multiproof_with_storage_roots
<P>
( provider: &P, ctx: AccountMultiproofParams<'_>, tracker: &mut ParallelTrieTracker, - proof_cursor_metrics: &mut ProofTaskCursorMetricsCache, ) -> Result where P: TrieCursorFactory + HashedCursorFactory, @@ -1512,12 +1362,15 @@ where let accounts_added_removed_keys = ctx.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts()); + // Create local metrics caches for account cursors. We can't directly use the metrics caches in + // the tracker due to the call to `inc_missed_leaves` which occurs on it. + let mut account_trie_cursor_metrics = TrieCursorMetricsCache::default(); + let mut account_hashed_cursor_metrics = HashedCursorMetricsCache::default(); + // Wrap account trie cursor with instrumented cursor let account_trie_cursor = provider.account_trie_cursor().map_err(ProviderError::Database)?; - let account_trie_cursor = InstrumentedTrieCursor::new( - account_trie_cursor, - &mut proof_cursor_metrics.account_trie_cursor, - ); + let account_trie_cursor = + InstrumentedTrieCursor::new(account_trie_cursor, &mut account_trie_cursor_metrics); // Create the walker. let walker = TrieWalker::<_>::state_trie(account_trie_cursor, ctx.prefix_set) @@ -1544,10 +1397,8 @@ where // Wrap account hashed cursor with instrumented cursor let account_hashed_cursor = provider.hashed_account_cursor().map_err(ProviderError::Database)?; - let account_hashed_cursor = InstrumentedHashedCursor::new( - account_hashed_cursor, - &mut proof_cursor_metrics.account_hashed_cursor, - ); + let account_hashed_cursor = + InstrumentedHashedCursor::new(account_hashed_cursor, &mut account_hashed_cursor_metrics); let mut account_node_iter = TrieNodeIter::state_trie(walker, account_hashed_cursor); @@ -1611,10 +1462,10 @@ where StorageProof::new_hashed(provider, provider, hashed_address) .with_prefix_set_mut(Default::default()) .with_trie_cursor_metrics( - &mut proof_cursor_metrics.storage_trie_cursor, + &mut tracker.cursor_metrics.storage_trie_cursor, ) .with_hashed_cursor_metrics( - &mut proof_cursor_metrics.storage_hashed_cursor, + &mut tracker.cursor_metrics.storage_hashed_cursor, ) .storage_multiproof( ctx.targets @@ -1665,6 +1516,21 @@ where BranchNodeMasksMap::default() }; + // Extend tracker with accumulated metrics from account cursors + tracker.cursor_metrics.account_trie_cursor.extend(&account_trie_cursor_metrics); + tracker.cursor_metrics.account_hashed_cursor.extend(&account_hashed_cursor_metrics); + + // Consume remaining storage proof receivers for accounts not encountered during trie walk. + // Done last to allow storage workers more time to complete while we finalized the account trie. 
+ for (hashed_address, receiver) in storage_proof_receivers { + if let Ok(proof_msg) = receiver.recv() { + let proof_result = proof_msg.result?; + let proof = Into::>::into(proof_result) + .expect("Partial proofs are not yet supported"); + collected_decoded_storages.insert(hashed_address, proof); + } + } + Ok(DecodedMultiProof { account_subtree: decoded_account_subtree, branch_node_masks, @@ -1684,6 +1550,7 @@ fn dispatch_storage_proofs( storage_prefix_sets: &mut B256Map, with_branch_node_masks: bool, multi_added_removed_keys: Option<&Arc>, + use_v2_proofs: bool, ) -> Result>, ParallelStateRootError> { let mut storage_proof_receivers = B256Map::with_capacity_and_hasher(targets.len(), Default::default()); @@ -1697,14 +1564,20 @@ fn dispatch_storage_proofs( let (result_tx, result_rx) = crossbeam_channel::unbounded(); // Create computation input based on V2 flag - let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default(); - let input = StorageProofInput::legacy( - *hashed_address, - prefix_set, - target_slots.clone(), - with_branch_node_masks, - multi_added_removed_keys.cloned(), - ); + let input = if use_v2_proofs { + // Convert target slots to V2 targets + let v2_targets = target_slots.iter().copied().map(Into::into).collect(); + StorageProofInput::new(*hashed_address, v2_targets) + } else { + let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default(); + StorageProofInput::legacy( + *hashed_address, + prefix_set, + target_slots.clone(), + with_branch_node_masks, + multi_added_removed_keys.cloned(), + ) + }; // Always dispatch a storage proof so we obtain the storage root even when no slots are // requested. @@ -1722,64 +1595,6 @@ fn dispatch_storage_proofs( Ok(storage_proof_receivers) } - -/// Queues V2 storage proofs for all accounts in the targets and returns receivers. -/// -/// This function queues all storage proof tasks to the worker pool but returns immediately -/// with receivers, allowing the account trie walk to proceed in parallel with storage proof -/// computation. This enables interleaved parallelism for better performance. -/// -/// Propagates errors up if queuing fails. Receivers must be consumed by the caller. -fn dispatch_v2_storage_proofs( - storage_work_tx: &CrossbeamSender, - account_targets: &Vec, - storage_targets: B256Map>, -) -> Result>, ParallelStateRootError> { - let mut storage_proof_receivers = - B256Map::with_capacity_and_hasher(account_targets.len(), Default::default()); - - // Dispatch all proofs for targeted storage slots - for (hashed_address, targets) in storage_targets { - // Create channel for receiving StorageProofResultMessage - let (result_tx, result_rx) = crossbeam_channel::unbounded(); - let input = StorageProofInput::new(hashed_address, targets); - - storage_work_tx - .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx }) - .map_err(|_| { - ParallelStateRootError::Other(format!( - "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable", - )) - })?; - - storage_proof_receivers.insert(hashed_address, result_rx); - } - - // If there are any targeted accounts which did not have storage targets then we generate a - // single proof target for them so that we get their root. 
-    for target in account_targets {
-        let hashed_address = target.key();
-        if storage_proof_receivers.contains_key(&hashed_address) {
-            continue
-        }
-
-        let (result_tx, result_rx) = crossbeam_channel::unbounded();
-        let input = StorageProofInput::new(hashed_address, vec![proof_v2::Target::new(B256::ZERO)]);
-
-        storage_work_tx
-            .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
-            .map_err(|_| {
-                ParallelStateRootError::Other(format!(
-                    "Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
-                ))
-            })?;
-
-        storage_proof_receivers.insert(hashed_address, result_rx);
-    }
-
-    Ok(storage_proof_receivers)
-}
-
 /// Input parameters for storage proof computation.
 #[derive(Debug)]
 pub enum StorageProofInput {
@@ -1824,7 +1639,7 @@ impl StorageProofInput {
         }
     }

-    /// Creates a new [`StorageProofInput`] with the given hashed address and target slots.
+    /// Creates a new [`StorageProofInput`] with the given hashed address and target slots.
     pub const fn new(hashed_address: B256, targets: Vec) -> Self {
         Self::V2 { hashed_address, targets }
     }
@@ -1840,39 +1655,20 @@ impl StorageProofInput {
 }

 /// Input parameters for account multiproof computation.
-#[derive(Debug)]
-pub enum AccountMultiproofInput {
-    /// Legacy account multiproof proof variant
-    Legacy {
-        /// The targets for which to compute the multiproof.
-        targets: MultiProofTargets,
-        /// The prefix sets for the proof calculation.
-        prefix_sets: TriePrefixSets,
-        /// Whether or not to collect branch node masks.
-        collect_branch_node_masks: bool,
-        /// Provided by the user to give the necessary context to retain extra proofs.
-        multi_added_removed_keys: Option>,
-        /// Context for sending the proof result.
-        proof_result_sender: ProofResultContext,
-    },
-    /// V2 account multiproof variant
-    V2 {
-        /// The targets for which to compute the multiproof.
-        targets: MultiProofTargetsV2,
-        /// Context for sending the proof result.
-        proof_result_sender: ProofResultContext,
-    },
-}
-
-impl AccountMultiproofInput {
-    /// Returns the [`ProofResultContext`] for this input, consuming the input.
-    fn into_proof_result_sender(self) -> ProofResultContext {
-        match self {
-            Self::Legacy { proof_result_sender, .. } | Self::V2 { proof_result_sender, .. } => {
-                proof_result_sender
-            }
-        }
-    }
+#[derive(Debug, Clone)]
+pub struct AccountMultiproofInput {
+    /// The targets for which to compute the multiproof.
+    pub targets: MultiProofTargets,
+    /// The prefix sets for the proof calculation.
+    pub prefix_sets: TriePrefixSets,
+    /// Whether or not to collect branch node masks.
+    pub collect_branch_node_masks: bool,
+    /// Provided by the user to give the necessary context to retain extra proofs.
+    pub multi_added_removed_keys: Option>,
+    /// Context for sending the proof result.
+    pub proof_result_sender: ProofResultContext,
+    /// Whether to use V2 storage proofs.
+    pub v2_proofs_enabled: bool,
 }

 /// Parameters for building an account multiproof with pre-computed storage roots.
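// The hunks above fold the old `dispatch_v2_storage_proofs` into `dispatch_storage_proofs` behind
// a `use_v2_proofs` flag, and the account proof path now drains any storage-proof receivers it did
// not consume during the trie walk only at the very end. A minimal, self-contained sketch of that
// dispatch-then-drain-last pattern with `crossbeam_channel` follows; the doubling worker and the
// u64 keys are hypothetical stand-ins for the storage workers and hashed addresses, not reth APIs.
use std::{collections::HashMap, thread};

fn dispatch_then_drain_last() {
    // Hypothetical worker pool: doubles each value and reports it on a per-job result channel.
    let (work_tx, work_rx) =
        crossbeam_channel::unbounded::<(u64, crossbeam_channel::Sender<u64>)>();
    thread::spawn(move || {
        for (value, result_tx) in work_rx {
            let _ = result_tx.send(value * 2);
        }
    });

    // Dispatch every job up front and keep one receiver per key, mirroring
    // `storage_proof_receivers` keyed by hashed address.
    let mut receivers = HashMap::new();
    for key in 0u64..4 {
        let (result_tx, result_rx) = crossbeam_channel::unbounded();
        work_tx.send((key, result_tx)).expect("worker pool unavailable");
        receivers.insert(key, result_rx);
    }

    // ... the real code walks the account trie here, consuming some receivers as the
    // corresponding accounts are encountered ...

    // Drain whatever is left only at the end, giving workers more time to finish in parallel.
    for (key, rx) in receivers {
        let doubled = rx.recv().expect("worker dropped the result sender");
        println!("{key} -> {doubled}");
    }
}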
diff --git a/crates/trie/parallel/src/stats.rs b/crates/trie/parallel/src/stats.rs
index de5b0a628e..088b95c970 100644
--- a/crates/trie/parallel/src/stats.rs
+++ b/crates/trie/parallel/src/stats.rs
@@ -1,3 +1,5 @@
+#[cfg(feature = "metrics")]
+use crate::proof_task_metrics::ProofTaskCursorMetricsCache;
 use derive_more::Deref;
 use reth_trie::stats::{TrieStats, TrieTracker};

@@ -34,6 +36,9 @@ pub struct ParallelTrieTracker {
     trie: TrieTracker,
     precomputed_storage_roots: u64,
     missed_leaves: u64,
+    #[cfg(feature = "metrics")]
+    /// Local tracking of cursor-related metrics
+    pub cursor_metrics: ProofTaskCursorMetricsCache,
 }

 impl ParallelTrieTracker {
diff --git a/crates/trie/parallel/src/value_encoder.rs b/crates/trie/parallel/src/value_encoder.rs
index 7b08d3e1b5..13c611922d 100644
--- a/crates/trie/parallel/src/value_encoder.rs
+++ b/crates/trie/parallel/src/value_encoder.rs
@@ -86,6 +86,7 @@ pub(crate) struct AsyncAccountValueEncoder {
 impl AsyncAccountValueEncoder {
     /// Initializes a [`Self`] using a `ProofWorkerHandle` which will be used to calculate storage
     /// roots asynchronously.
+    #[expect(dead_code)]
    pub(crate) fn new(
         storage_work_tx: CrossbeamSender,
         dispatched: B256Map>,
@@ -105,6 +106,7 @@ impl AsyncAccountValueEncoder {
     ///
     /// This method panics if any deferred encoders produced by [`Self::deferred_encoder`] have not
     /// been dropped.
+    #[expect(dead_code)]
    pub(crate) fn into_storage_proofs(
         self,
     ) -> Result>, StateProofError> {
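// With this change, `ParallelTrieTracker` owns the cursor metrics cache (behind the `metrics`
// feature), and the account proof code accumulates into local caches before folding them into the
// tracker via `extend`. A minimal sketch of that accumulate-locally-then-extend pattern, using
// hypothetical `CursorMetrics` / `Tracker` types rather than the reth definitions (the real field
// is `#[cfg(feature = "metrics")]`-gated; the gate is omitted here so the sketch compiles as-is):
#[derive(Default, Debug)]
struct CursorMetrics {
    seeks: u64,
    next_calls: u64,
}

impl CursorMetrics {
    /// Folds another cache's counters into this one.
    fn extend(&mut self, other: &CursorMetrics) {
        self.seeks += other.seeks;
        self.next_calls += other.next_calls;
    }
}

#[derive(Default, Debug)]
struct Tracker {
    /// Stands in for `ParallelTrieTracker::cursor_metrics`.
    cursor_metrics: CursorMetrics,
}

fn walk_and_record(tracker: &mut Tracker) {
    // Accumulate into a local cache so the hot loop never borrows the tracker mutably, which the
    // real code also needs for calls such as `inc_missed_leaves`.
    let mut local = CursorMetrics::default();
    for _ in 0..10 {
        local.seeks += 1;
        local.next_calls += 2;
    }
    // Fold the local counters into the tracker once, after the walk completes.
    tracker.cursor_metrics.extend(&local);
}

fn example_usage() {
    let mut tracker = Tracker::default();
    walk_and_record(&mut tracker);
    println!("{:?}", tracker.cursor_metrics);
}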