From 7ceca70353eab35000ddc70de10017cedd7a0ba2 Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Thu, 8 Jan 2026 11:53:24 +0100 Subject: [PATCH] feat(trie): Add flag to enable proof v2 for storage proof workers (#20617) Co-authored-by: YK --- crates/engine/primitives/src/config.rs | 16 + .../tree/src/tree/payload_processor/mod.rs | 9 +- .../src/tree/payload_processor/multiproof.rs | 29 +- crates/node/core/src/args/engine.rs | 17 + crates/trie/parallel/src/proof.rs | 69 ++- crates/trie/parallel/src/proof_task.rs | 416 ++++++++++++------ crates/trie/parallel/src/root.rs | 11 +- crates/trie/trie/src/proof_v2/mod.rs | 27 +- docs/cli/help.rs | 68 +-- docs/vocs/docs/pages/cli/op-reth/node.mdx | 3 + docs/vocs/docs/pages/cli/reth/node.mdx | 3 + 11 files changed, 440 insertions(+), 228 deletions(-) diff --git a/crates/engine/primitives/src/config.rs b/crates/engine/primitives/src/config.rs index 10265b9a43..1eacfef6c1 100644 --- a/crates/engine/primitives/src/config.rs +++ b/crates/engine/primitives/src/config.rs @@ -135,6 +135,8 @@ pub struct TreeConfig { storage_worker_count: usize, /// Number of account proof worker threads. account_worker_count: usize, + /// Whether to enable V2 storage proofs. + enable_proof_v2: bool, } impl Default for TreeConfig { @@ -163,6 +165,7 @@ impl Default for TreeConfig { allow_unwind_canonical_header: false, storage_worker_count: default_storage_worker_count(), account_worker_count: default_account_worker_count(), + enable_proof_v2: false, } } } @@ -194,6 +197,7 @@ impl TreeConfig { allow_unwind_canonical_header: bool, storage_worker_count: usize, account_worker_count: usize, + enable_proof_v2: bool, ) -> Self { Self { persistence_threshold, @@ -219,6 +223,7 @@ impl TreeConfig { allow_unwind_canonical_header, storage_worker_count, account_worker_count, + enable_proof_v2, } } @@ -500,4 +505,15 @@ impl TreeConfig { self.account_worker_count = account_worker_count.max(MIN_WORKER_COUNT); self } + + /// Return whether V2 storage proofs are enabled. + pub const fn enable_proof_v2(&self) -> bool { + self.enable_proof_v2 + } + + /// Setter for whether to enable V2 storage proofs. + pub const fn with_enable_proof_v2(mut self, enable_proof_v2: bool) -> Self { + self.enable_proof_v2 = enable_proof_v2; + self + } } diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 20a77b1602..1e4a2e1440 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -274,24 +274,23 @@ where let task_ctx = ProofTaskCtx::new(multiproof_provider_factory); let storage_worker_count = config.storage_worker_count(); let account_worker_count = config.account_worker_count(); + let v2_proofs_enabled = config.enable_proof_v2(); let proof_handle = ProofWorkerHandle::new( self.executor.handle().clone(), task_ctx, storage_worker_count, account_worker_count, + v2_proofs_enabled, ); let multi_proof_task = MultiProofTask::new( proof_handle.clone(), to_sparse_trie, config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()), - to_multi_proof, + to_multi_proof.clone(), from_multi_proof, ); - // wire the multiproof task to the prewarm task - let to_multi_proof = Some(multi_proof_task.state_root_message_sender()); - // spawn multi-proof task let parent_span = span.clone(); let saved_cache = prewarm_handle.saved_cache.clone(); @@ -316,7 +315,7 @@ where self.spawn_sparse_trie_task(sparse_trie_rx, proof_handle, state_root_tx); PayloadHandle { - to_multi_proof, + to_multi_proof: Some(to_multi_proof), prewarm_handle, state_root: Some(state_root_rx), transactions: execution_rx, diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index e4264f48bd..f30592dd12 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -283,6 +283,8 @@ pub struct MultiproofManager { proof_result_tx: CrossbeamSender, /// Metrics metrics: MultiProofTaskMetrics, + /// Whether to use V2 storage proofs + v2_proofs_enabled: bool, } impl MultiproofManager { @@ -296,11 +298,14 @@ impl MultiproofManager { metrics.max_storage_workers.set(proof_worker_handle.total_storage_workers() as f64); metrics.max_account_workers.set(proof_worker_handle.total_account_workers() as f64); + let v2_proofs_enabled = proof_worker_handle.v2_proofs_enabled(); + Self { metrics, proof_worker_handle, missed_leaves_storage_roots: Default::default(), proof_result_tx, + v2_proofs_enabled, } } @@ -380,6 +385,7 @@ impl MultiproofManager { hashed_state_update, start, ), + v2_proofs_enabled: self.v2_proofs_enabled, }; if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(input) { @@ -619,11 +625,6 @@ impl MultiProofTask { } } - /// Returns a sender that can be used to send arbitrary [`MultiProofMessage`]s to this task. - pub(super) fn state_root_message_sender(&self) -> CrossbeamSender { - self.tx.clone() - } - /// Handles request for proof prefetch. /// /// Returns how many multiproof tasks were dispatched for the prefetch request. @@ -1223,7 +1224,7 @@ impl MultiProofTask { let update = SparseTrieUpdate { state: proof_result.state, - multiproof: proof_result_data.into_multiproof(), + multiproof: proof_result_data.proof, }; if let Some(combined_update) = @@ -1531,7 +1532,7 @@ mod tests { let rt_handle = get_test_runtime_handle(); let overlay_factory = OverlayStateProviderFactory::new(factory); let task_ctx = ProofTaskCtx::new(overlay_factory); - let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1); + let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1, false); let (to_sparse_trie, _receiver) = std::sync::mpsc::channel(); let (tx, rx) = crossbeam_channel::unbounded(); @@ -2005,7 +2006,7 @@ mod tests { let mut targets3 = MultiProofTargets::default(); targets3.insert(addr3, HashSet::default()); - let tx = task.state_root_message_sender(); + let tx = task.tx.clone(); tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap(); tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap(); tx.send(MultiProofMessage::PrefetchProofs(targets3)).unwrap(); @@ -2081,7 +2082,7 @@ mod tests { let source = StateChangeSource::Transaction(0); - let tx = task.state_root_message_sender(); + let tx = task.tx.clone(); tx.send(MultiProofMessage::StateUpdate(source.into(), update1.clone())).unwrap(); tx.send(MultiProofMessage::StateUpdate(source.into(), update2.clone())).unwrap(); @@ -2145,7 +2146,7 @@ mod tests { let source_b = StateChangeSource::Transaction(2); // Queue: A1 (immediate dispatch), B1 (batched), A2 (should become pending) - let tx = task.state_root_message_sender(); + let tx = task.tx.clone(); tx.send(MultiProofMessage::StateUpdate(source_a.into(), create_state_update(addr_a1, 100))) .unwrap(); tx.send(MultiProofMessage::StateUpdate(source_b.into(), create_state_update(addr_b1, 200))) @@ -2267,7 +2268,7 @@ mod tests { let source = StateChangeSource::PreBlock(StateChangePreBlockSource::BeaconRootContract); // Queue: first update dispatched immediately, next two should not merge - let tx = task.state_root_message_sender(); + let tx = task.tx.clone(); tx.send(MultiProofMessage::StateUpdate(source.into(), create_state_update(addr1, 100))) .unwrap(); tx.send(MultiProofMessage::StateUpdate(source.into(), create_state_update(addr2, 200))) @@ -2410,7 +2411,7 @@ mod tests { let source = StateChangeSource::Transaction(42); // Queue: [PrefetchProofs1, PrefetchProofs2, StateUpdate1, StateUpdate2, PrefetchProofs3] - let tx = task.state_root_message_sender(); + let tx = task.tx.clone(); tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap(); tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap(); tx.send(MultiProofMessage::StateUpdate(source.into(), state_update1)).unwrap(); @@ -2511,7 +2512,7 @@ mod tests { let source = StateChangeSource::Transaction(99); - let tx = task.state_root_message_sender(); + let tx = task.tx.clone(); tx.send(MultiProofMessage::PrefetchProofs(prefetch1)).unwrap(); tx.send(MultiProofMessage::StateUpdate(source.into(), state_update)).unwrap(); tx.send(MultiProofMessage::PrefetchProofs(prefetch2.clone())).unwrap(); @@ -2607,7 +2608,7 @@ mod tests { let source = StateChangeSource::Transaction(42); // Queue: [Prefetch1, State1, State2, State3, Prefetch2] - let tx = task.state_root_message_sender(); + let tx = task.tx.clone(); tx.send(MultiProofMessage::PrefetchProofs(prefetch1.clone())).unwrap(); tx.send(MultiProofMessage::StateUpdate( source.into(), diff --git a/crates/node/core/src/args/engine.rs b/crates/node/core/src/args/engine.rs index a860e62e45..8662f797c7 100644 --- a/crates/node/core/src/args/engine.rs +++ b/crates/node/core/src/args/engine.rs @@ -36,6 +36,7 @@ pub struct DefaultEngineValues { allow_unwind_canonical_header: bool, storage_worker_count: Option, account_worker_count: Option, + enable_proof_v2: bool, } impl DefaultEngineValues { @@ -165,6 +166,12 @@ impl DefaultEngineValues { self.account_worker_count = v; self } + + /// Set whether to enable proof V2 by default + pub const fn with_enable_proof_v2(mut self, v: bool) -> Self { + self.enable_proof_v2 = v; + self + } } impl Default for DefaultEngineValues { @@ -189,6 +196,7 @@ impl Default for DefaultEngineValues { allow_unwind_canonical_header: false, storage_worker_count: None, account_worker_count: None, + enable_proof_v2: false, } } } @@ -308,6 +316,10 @@ pub struct EngineArgs { /// If not specified, defaults to the same count as storage workers. #[arg(long = "engine.account-worker-count", default_value = Resettable::from(DefaultEngineValues::get_global().account_worker_count.map(|v| v.to_string().into())))] pub account_worker_count: Option, + + /// Enable V2 storage proofs for state root calculations + #[arg(long = "engine.enable-proof-v2", default_value_t = DefaultEngineValues::get_global().enable_proof_v2)] + pub enable_proof_v2: bool, } #[allow(deprecated)] @@ -333,6 +345,7 @@ impl Default for EngineArgs { allow_unwind_canonical_header, storage_worker_count, account_worker_count, + enable_proof_v2, } = DefaultEngineValues::get_global().clone(); Self { persistence_threshold, @@ -357,6 +370,7 @@ impl Default for EngineArgs { allow_unwind_canonical_header, storage_worker_count, account_worker_count, + enable_proof_v2, } } } @@ -392,6 +406,8 @@ impl EngineArgs { config = config.with_account_worker_count(count); } + config = config.with_enable_proof_v2(self.enable_proof_v2); + config } } @@ -441,6 +457,7 @@ mod tests { allow_unwind_canonical_header: true, storage_worker_count: Some(16), account_worker_count: Some(8), + enable_proof_v2: false, }; let parsed_args = CommandParser::::parse_from([ diff --git a/crates/trie/parallel/src/proof.rs b/crates/trie/parallel/src/proof.rs index 433c13fb08..adb26a265b 100644 --- a/crates/trie/parallel/src/proof.rs +++ b/crates/trie/parallel/src/proof.rs @@ -1,8 +1,8 @@ use crate::{ metrics::ParallelTrieMetrics, proof_task::{ - AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle, - StorageProofInput, + AccountMultiproofInput, ProofResult, ProofResultContext, ProofWorkerHandle, + StorageProofInput, StorageProofResultMessage, }, root::ParallelStateRootError, StorageRootTargets, @@ -37,6 +37,8 @@ pub struct ParallelProof { /// Cached storage proof roots for missed leaves; this maps /// hashed (missed) addresses to their storage proof roots. missed_leaves_storage_roots: Arc>, + /// Whether to use V2 storage proofs. + v2_proofs_enabled: bool, #[cfg(feature = "metrics")] metrics: ParallelTrieMetrics, } @@ -54,11 +56,18 @@ impl ParallelProof { collect_branch_node_masks: false, multi_added_removed_keys: None, proof_worker_handle, + v2_proofs_enabled: false, #[cfg(feature = "metrics")] metrics: ParallelTrieMetrics::new_with_labels(&[("type", "proof")]), } } + /// Set whether to use V2 storage proofs. + pub const fn with_v2_proofs_enabled(mut self, v2_proofs_enabled: bool) -> Self { + self.v2_proofs_enabled = v2_proofs_enabled; + self + } + /// Set the flag indicating whether to include branch node masks in the proof. pub const fn with_branch_node_masks(mut self, branch_node_masks: bool) -> Self { self.collect_branch_node_masks = branch_node_masks; @@ -80,23 +89,26 @@ impl ParallelProof { hashed_address: B256, prefix_set: PrefixSet, target_slots: B256Set, - ) -> Result, ParallelStateRootError> { + ) -> Result, ParallelStateRootError> { let (result_tx, result_rx) = crossbeam_channel::unbounded(); - let start = Instant::now(); - let input = StorageProofInput::new( - hashed_address, - prefix_set, - target_slots, - self.collect_branch_node_masks, - self.multi_added_removed_keys.clone(), - ); + let input = if self.v2_proofs_enabled { + StorageProofInput::new( + hashed_address, + target_slots.into_iter().map(Into::into).collect(), + ) + } else { + StorageProofInput::legacy( + hashed_address, + prefix_set, + target_slots, + self.collect_branch_node_masks, + self.multi_added_removed_keys.clone(), + ) + }; self.proof_worker_handle - .dispatch_storage_proof( - input, - ProofResultContext::new(result_tx, 0, HashedPostState::default(), start), - ) + .dispatch_storage_proof(input, result_tx) .map_err(|e| ParallelStateRootError::Other(e.to_string()))?; Ok(result_rx) @@ -127,19 +139,9 @@ impl ParallelProof { })?; // Extract storage proof directly from the result - let storage_proof = match proof_msg.result? { - crate::proof_task::ProofResult::StorageProof { hashed_address: addr, proof } => { - debug_assert_eq!( - addr, - hashed_address, - "storage worker must return same address: expected {hashed_address}, got {addr}" - ); - proof - } - crate::proof_task::ProofResult::AccountMultiproof { .. } => { - unreachable!("storage worker only sends StorageProof variant") - } - }; + let proof_result = proof_msg.result?; + let storage_proof = Into::>::into(proof_result) + .expect("Partial proofs are not yet supported"); trace!( target: "trie::parallel_proof", @@ -210,6 +212,7 @@ impl ParallelProof { HashedPostState::default(), account_multiproof_start_time, ), + v2_proofs_enabled: self.v2_proofs_enabled, }; self.proof_worker_handle @@ -223,12 +226,7 @@ impl ParallelProof { ) })?; - let (multiproof, stats) = match proof_result_msg.result? { - crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => (proof, stats), - crate::proof_task::ProofResult::StorageProof { .. } => { - unreachable!("account worker only sends AccountMultiproof variant") - } - }; + let ProofResult { proof: multiproof, stats } = proof_result_msg.result?; #[cfg(feature = "metrics")] self.metrics.record(stats); @@ -330,7 +328,8 @@ mod tests { let factory = reth_provider::providers::OverlayStateProviderFactory::new(factory); let task_ctx = ProofTaskCtx::new(factory); - let proof_worker_handle = ProofWorkerHandle::new(rt.handle().clone(), task_ctx, 1, 1); + let proof_worker_handle = + ProofWorkerHandle::new(rt.handle().clone(), task_ctx, 1, 1, false); let parallel_result = ParallelProof::new(Default::default(), Default::default(), proof_worker_handle.clone()) diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index 73e91e98fc..2f64be4513 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -49,10 +49,11 @@ use reth_trie::{ node_iter::{TrieElement, TrieNodeIter}, prefix_set::TriePrefixSets, proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider, StorageProof}, + proof_v2::{self, StorageProofCalculator}, trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieCursorMetricsCache}, walker::TrieWalker, DecodedMultiProof, DecodedStorageMultiProof, HashBuilder, HashedPostState, MultiProofTargets, - Nibbles, TRIE_ACCOUNT_RLP_MAX_SIZE, + Nibbles, ProofTrieNode, TRIE_ACCOUNT_RLP_MAX_SIZE, }; use reth_trie_common::{ added_removed_keys::MultiAddedRemovedKeys, @@ -77,7 +78,6 @@ use crate::proof_task_metrics::{ ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics, }; -type StorageProofResult = Result; type TrieNodeProviderResult = Result, SparseTrieError>; /// A handle that provides type-safe access to proof worker pools. @@ -101,6 +101,8 @@ pub struct ProofWorkerHandle { storage_worker_count: usize, /// Total number of account workers spawned account_worker_count: usize, + /// Whether V2 storage proofs are enabled + v2_proofs_enabled: bool, } impl ProofWorkerHandle { @@ -114,11 +116,13 @@ impl ProofWorkerHandle { /// - `task_ctx`: Shared context with database view and prefix sets /// - `storage_worker_count`: Number of storage workers to spawn /// - `account_worker_count`: Number of account workers to spawn + /// - `v2_proofs_enabled`: Whether to enable V2 storage proofs pub fn new( executor: Handle, task_ctx: ProofTaskCtx, storage_worker_count: usize, account_worker_count: usize, + v2_proofs_enabled: bool, ) -> Self where Factory: DatabaseProviderROFactory @@ -138,6 +142,7 @@ impl ProofWorkerHandle { target: "trie::proof_task", storage_worker_count, account_worker_count, + ?v2_proofs_enabled, "Spawning proof worker pools" ); @@ -167,7 +172,8 @@ impl ProofWorkerHandle { metrics, #[cfg(feature = "metrics")] cursor_metrics, - ); + ) + .with_v2_proofs(v2_proofs_enabled); if let Err(error) = worker.run() { error!( target: "trie::proof_task", @@ -228,9 +234,15 @@ impl ProofWorkerHandle { account_available_workers, storage_worker_count, account_worker_count, + v2_proofs_enabled, } } + /// Returns whether V2 storage proofs are enabled for this worker pool. + pub const fn v2_proofs_enabled(&self) -> bool { + self.v2_proofs_enabled + } + /// Returns how many storage workers are currently available/idle. pub fn available_storage_workers(&self) -> usize { self.storage_available_workers.load(Ordering::Relaxed) @@ -281,8 +293,9 @@ impl ProofWorkerHandle { pub fn dispatch_storage_proof( &self, input: StorageProofInput, - proof_result_sender: ProofResultContext, + proof_result_sender: CrossbeamSender, ) -> Result<(), ProviderError> { + let hashed_address = input.hashed_address(); self.storage_work_tx .send(StorageWorkerJob::StorageProof { input, proof_result_sender }) .map_err(|err| { @@ -290,18 +303,9 @@ impl ProofWorkerHandle { ProviderError::other(std::io::Error::other("storage workers unavailable")); if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 { - let ProofResultContext { - sender: result_tx, - sequence_number: seq, - state, - start_time: start, - } = proof_result_sender; - - let _ = result_tx.send(ProofResultMessage { - sequence_number: seq, + let _ = proof_result_sender.send(StorageProofResultMessage { + hashed_address, result: Err(ParallelStateRootError::Provider(error.clone())), - elapsed: start.elapsed(), - state, }); } @@ -378,7 +382,7 @@ impl ProofWorkerHandle { } } -/// Data used for initializing cursor factories that is shared across all storage proof instances. +/// Data used for initializing cursor factories that is shared across all proof worker instances. #[derive(Clone, Debug)] pub struct ProofTaskCtx { /// The factory for creating state providers. @@ -392,7 +396,7 @@ impl ProofTaskCtx { } } -/// This contains all information shared between all storage proof instances. +/// This contains all information shared between account proof worker instances. #[derive(Debug)] pub struct ProofTaskTx { /// The provider that implements `TrieCursorFactory` and `HashedCursorFactory`. @@ -417,20 +421,23 @@ where /// /// Used by storage workers in the worker pool to compute storage proofs. #[inline] - fn compute_storage_proof( + fn compute_legacy_storage_proof( &self, input: StorageProofInput, trie_cursor_metrics: &mut TrieCursorMetricsCache, hashed_cursor_metrics: &mut HashedCursorMetricsCache, - ) -> StorageProofResult { + ) -> Result { // Consume the input so we can move large collections (e.g. target slots) without cloning. - let StorageProofInput { + let StorageProofInput::Legacy { hashed_address, prefix_set, target_slots, with_branch_node_masks, multi_added_removed_keys, - } = input; + } = input + else { + panic!("compute_legacy_storage_proof only accepts StorageProofInput::Legacy") + }; // Get or create added/removed keys context let multi_added_removed_keys = @@ -469,7 +476,7 @@ where hashed_address, e )) }) - }); + })?; trace!( target: "trie::proof_task", @@ -479,7 +486,51 @@ where "Completed storage proof calculation" ); - decoded_result + Ok(StorageProofResult::Legacy { proof: decoded_result }) + } + + fn compute_v2_storage_proof( + &self, + input: StorageProofInput, + calculator: &mut proof_v2::StorageProofCalculator< + ::StorageTrieCursor<'_>, + ::StorageCursor<'_>, + >, + ) -> Result { + let StorageProofInput::V2 { hashed_address, mut targets } = input else { + panic!("compute_v2_storage_proof only accepts StorageProofInput::V2") + }; + + // If targets is empty it means the caller only wants the root hash. The V2 proof calculator + // will do nothing given no targets, so instead we give it a fake target so it always + // returns at least the root. + if targets.is_empty() { + targets.push(proof_v2::Target::new(B256::ZERO)); + } + + let span = debug_span!( + target: "trie::proof_task", + "V2 Storage proof calculation", + ?hashed_address, + targets = ?targets.len(), + worker_id = self.id, + ); + let _span_guard = span.enter(); + + let proof_start = Instant::now(); + let proof = calculator.storage_proof(hashed_address, &mut targets)?; + let root = calculator.compute_root_hash(&proof)?; + + trace!( + target: "trie::proof_task", + hashed_address = ?hashed_address, + proof_time_us = proof_start.elapsed().as_micros(), + ?root, + worker_id = self.id, + "Completed V2 storage proof calculation" + ); + + Ok(StorageProofResult::V2 { proof, root }) } /// Process a blinded storage node request. @@ -552,39 +603,16 @@ impl TrieNodeProvider for ProofTaskTrieNodeProvider { } } } -/// Result of a proof calculation, which can be either an account multiproof or a storage proof. + +/// Result of a multiproof calculation. #[derive(Debug)] -pub enum ProofResult { - /// Account multiproof with statistics - AccountMultiproof { - /// The account multiproof - proof: DecodedMultiProof, - /// Statistics collected during proof computation - stats: ParallelTrieStats, - }, - /// Storage proof for a specific account - StorageProof { - /// The hashed address this storage proof belongs to - hashed_address: B256, - /// The storage multiproof - proof: DecodedStorageMultiProof, - }, +pub struct ProofResult { + /// The account multiproof + pub proof: DecodedMultiProof, + /// Statistics collected during proof computation + pub stats: ParallelTrieStats, } -impl ProofResult { - /// Convert this proof result into a `DecodedMultiProof`. - /// - /// For account multiproofs, returns the multiproof directly (discarding stats). - /// For storage proofs, wraps the storage proof into a minimal multiproof. - pub fn into_multiproof(self) -> DecodedMultiProof { - match self { - Self::AccountMultiproof { proof, stats: _ } => proof, - Self::StorageProof { hashed_address, proof } => { - DecodedMultiProof::from_storage_proof(hashed_address, proof) - } - } - } -} /// Channel used by worker threads to deliver `ProofResultMessage` items back to /// `MultiProofTask`. /// @@ -634,6 +662,58 @@ impl ProofResultContext { Self { sender, sequence_number, state, start_time } } } + +/// The results of a storage proof calculation. +#[derive(Debug)] +pub(crate) enum StorageProofResult { + Legacy { + /// The storage multiproof + proof: DecodedStorageMultiProof, + }, + V2 { + /// The calculated V2 proof nodes + proof: Vec, + /// The storage root calculated by the V2 proof + root: Option, + }, +} + +impl StorageProofResult { + /// Returns the calculated root of the trie, if one can be calculated from the proof. + const fn root(&self) -> Option { + match self { + Self::Legacy { proof } => Some(proof.root), + Self::V2 { root, .. } => *root, + } + } +} + +impl From for Option { + /// Returns None if the V2 proof result doesn't have a calculated root hash. + fn from(proof_result: StorageProofResult) -> Self { + match proof_result { + StorageProofResult::Legacy { proof } => Some(proof), + StorageProofResult::V2 { proof, root } => root.map(|root| { + let branch_node_masks = proof + .iter() + .filter_map(|node| node.masks.map(|masks| (node.path, masks))) + .collect(); + let subtree = proof.into_iter().map(|node| (node.path, node.node)).collect(); + DecodedStorageMultiProof { root, subtree, branch_node_masks } + }), + } + } +} + +/// Message containing a completed storage proof result with metadata. +#[derive(Debug)] +pub struct StorageProofResultMessage { + /// The hashed address this storage proof belongs to + pub(crate) hashed_address: B256, + /// The storage proof calculation result + pub(crate) result: Result, +} + /// Internal message for storage workers. #[derive(Debug)] enum StorageWorkerJob { @@ -642,7 +722,7 @@ enum StorageWorkerJob { /// Storage proof input parameters input: StorageProofInput, /// Context for sending the proof result. - proof_result_sender: ProofResultContext, + proof_result_sender: CrossbeamSender, }, /// Blinded storage node retrieval request BlindedStorageNode { @@ -674,6 +754,8 @@ struct StorageProofWorker { /// Cursor metrics for this worker #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics, + /// Set to true if V2 proofs are enabled. + v2_enabled: bool, } impl StorageProofWorker @@ -698,9 +780,16 @@ where metrics, #[cfg(feature = "metrics")] cursor_metrics, + v2_enabled: false, } } + /// Changes whether or not V2 proofs are enabled. + const fn with_v2_proofs(mut self, v2_enabled: bool) -> Self { + self.v2_enabled = v2_enabled; + self + } + /// Runs the worker loop, processing jobs until the channel closes. /// /// # Lifecycle @@ -728,6 +817,7 @@ where metrics, #[cfg(feature = "metrics")] ref mut cursor_metrics, + v2_enabled: _, } = self; // Create provider from factory @@ -743,6 +833,13 @@ where let mut storage_proofs_processed = 0u64; let mut storage_nodes_processed = 0u64; let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default(); + let mut v2_calculator = if self.v2_enabled { + let trie_cursor = proof_tx.provider.storage_trie_cursor(B256::ZERO)?; + let hashed_cursor = proof_tx.provider.hashed_storage_cursor(B256::ZERO)?; + Some(proof_v2::StorageProofCalculator::new_storage(trie_cursor, hashed_cursor)) + } else { + None + }; // Initially mark this worker as available. available_workers.fetch_add(1, Ordering::Relaxed); @@ -756,6 +853,7 @@ where Self::process_storage_proof( worker_id, &proof_tx, + v2_calculator.as_mut(), input, proof_result_sender, &mut storage_proofs_processed, @@ -800,53 +898,58 @@ where fn process_storage_proof( worker_id: usize, proof_tx: &ProofTaskTx, + v2_calculator: Option< + &mut StorageProofCalculator< + ::StorageTrieCursor<'_>, + ::StorageCursor<'_>, + >, + >, input: StorageProofInput, - proof_result_sender: ProofResultContext, + proof_result_sender: CrossbeamSender, storage_proofs_processed: &mut u64, cursor_metrics_cache: &mut ProofTaskCursorMetricsCache, ) where Provider: TrieCursorFactory + HashedCursorFactory, { - let hashed_address = input.hashed_address; - let ProofResultContext { sender, sequence_number: seq, state, start_time } = - proof_result_sender; - let mut trie_cursor_metrics = TrieCursorMetricsCache::default(); let mut hashed_cursor_metrics = HashedCursorMetricsCache::default(); - - trace!( - target: "trie::proof_task", - worker_id, - hashed_address = ?hashed_address, - prefix_set_len = input.prefix_set.len(), - target_slots_len = input.target_slots.len(), - "Processing storage proof" - ); - + let hashed_address = input.hashed_address(); let proof_start = Instant::now(); - let result = proof_tx.compute_storage_proof( - input, - &mut trie_cursor_metrics, - &mut hashed_cursor_metrics, - ); + + let result = match &input { + StorageProofInput::Legacy { hashed_address, prefix_set, target_slots, .. } => { + trace!( + target: "trie::proof_task", + worker_id, + hashed_address = ?hashed_address, + prefix_set_len = prefix_set.len(), + target_slots_len = target_slots.len(), + "Processing storage proof" + ); + + proof_tx.compute_legacy_storage_proof( + input, + &mut trie_cursor_metrics, + &mut hashed_cursor_metrics, + ) + } + StorageProofInput::V2 { hashed_address, targets } => { + trace!( + target: "trie::proof_task", + worker_id, + hashed_address = ?hashed_address, + targets_len = targets.len(), + "Processing V2 storage proof" + ); + proof_tx + .compute_v2_storage_proof(input, v2_calculator.expect("v2 calculator provided")) + } + }; let proof_elapsed = proof_start.elapsed(); *storage_proofs_processed += 1; - let result_msg = result.map(|storage_proof| ProofResult::StorageProof { - hashed_address, - proof: storage_proof, - }); - - if sender - .send(ProofResultMessage { - sequence_number: seq, - result: result_msg, - elapsed: start_time.elapsed(), - state, - }) - .is_err() - { + if proof_result_sender.send(StorageProofResultMessage { hashed_address, result }).is_err() { trace!( target: "trie::proof_task", worker_id, @@ -1094,6 +1197,7 @@ where missed_leaves_storage_roots, proof_result_sender: ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start }, + v2_proofs_enabled, } = input; let span = debug_span!( @@ -1126,6 +1230,7 @@ where &mut storage_prefix_sets, collect_branch_node_masks, multi_added_removed_keys.as_ref(), + v2_proofs_enabled, ) { Ok(receivers) => receivers, Err(error) => { @@ -1162,7 +1267,7 @@ where proof_cursor_metrics.record_spans(); let stats = tracker.finish(); - let result = result.map(|proof| ProofResult::AccountMultiproof { proof, stats }); + let result = result.map(|proof| ProofResult { proof, stats }); *account_proofs_processed += 1; // Send result to MultiProofTask @@ -1340,21 +1445,21 @@ where drop(_guard); // Extract storage proof from the result - let proof = match proof_msg.result? { - ProofResult::StorageProof { hashed_address: addr, proof } => { - debug_assert_eq!( - addr, - hashed_address, - "storage worker must return same address: expected {hashed_address}, got {addr}" - ); - proof - } - ProofResult::AccountMultiproof { .. } => { - unreachable!("storage worker only sends StorageProof variant") - } + debug_assert_eq!( + proof_msg.hashed_address, hashed_address, + "storage worker must return same address" + ); + let proof_result = proof_msg.result?; + let Some(root) = proof_result.root() else { + trace!( + target: "trie::proof_task", + ?proof_result, + "Received proof_result without root", + ); + panic!("Partial proofs are not yet supported"); }; - - let root = proof.root; + let proof = Into::>::into(proof_result) + .expect("Partial proofs are not yet supported (into)"); collected_decoded_storages.insert(hashed_address, proof); root } @@ -1411,9 +1516,10 @@ where for (hashed_address, receiver) in storage_proof_receivers { if let Ok(proof_msg) = receiver.recv() { // Extract storage proof from the result - if let Ok(ProofResult::StorageProof { proof, .. }) = proof_msg.result { - collected_decoded_storages.insert(hashed_address, proof); - } + let proof_result = proof_msg.result?; + let proof = Into::>::into(proof_result) + .expect("Partial proofs are not yet supported"); + collected_decoded_storages.insert(hashed_address, proof); } } @@ -1457,39 +1563,36 @@ fn dispatch_storage_proofs( storage_prefix_sets: &mut B256Map, with_branch_node_masks: bool, multi_added_removed_keys: Option<&Arc>, -) -> Result>, ParallelStateRootError> { + use_v2_proofs: bool, +) -> Result>, ParallelStateRootError> { let mut storage_proof_receivers = B256Map::with_capacity_and_hasher(targets.len(), Default::default()); // Dispatch all storage proofs to worker pool for (hashed_address, target_slots) in targets.iter() { - let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default(); - // Create channel for receiving ProofResultMessage let (result_tx, result_rx) = crossbeam_channel::unbounded(); - let start = Instant::now(); - // Create computation input (data only, no communication channel) - let input = StorageProofInput::new( - *hashed_address, - prefix_set, - target_slots.clone(), - with_branch_node_masks, - multi_added_removed_keys.cloned(), - ); + // Create computation input based on V2 flag + let input = if use_v2_proofs { + // Convert target slots to V2 targets + let v2_targets = target_slots.iter().copied().map(Into::into).collect(); + StorageProofInput::new(*hashed_address, v2_targets) + } else { + let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default(); + StorageProofInput::legacy( + *hashed_address, + prefix_set, + target_slots.clone(), + with_branch_node_masks, + multi_added_removed_keys.cloned(), + ) + }; // Always dispatch a storage proof so we obtain the storage root even when no slots are // requested. storage_work_tx - .send(StorageWorkerJob::StorageProof { - input, - proof_result_sender: ProofResultContext::new( - result_tx, - 0, - HashedPostState::default(), - start, - ), - }) + .send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx }) .map_err(|_| { ParallelStateRootError::Other(format!( "Failed to queue storage proof for {}: storage worker pool unavailable", @@ -1504,30 +1607,40 @@ fn dispatch_storage_proofs( } /// Input parameters for storage proof computation. #[derive(Debug)] -pub struct StorageProofInput { - /// The hashed address for which the proof is calculated. - hashed_address: B256, - /// The prefix set for the proof calculation. - prefix_set: PrefixSet, - /// The target slots for the proof calculation. - target_slots: B256Set, - /// Whether or not to collect branch node masks - with_branch_node_masks: bool, - /// Provided by the user to give the necessary context to retain extra proofs. - multi_added_removed_keys: Option>, +pub enum StorageProofInput { + /// Legacy storage proof variant + Legacy { + /// The hashed address for which the proof is calculated. + hashed_address: B256, + /// The prefix set for the proof calculation. + prefix_set: PrefixSet, + /// The target slots for the proof calculation. + target_slots: B256Set, + /// Whether or not to collect branch node masks + with_branch_node_masks: bool, + /// Provided by the user to give the necessary context to retain extra proofs. + multi_added_removed_keys: Option>, + }, + /// V2 storage proof variant + V2 { + /// The hashed address for which the proof is calculated. + hashed_address: B256, + /// The set of proof targets + targets: Vec, + }, } impl StorageProofInput { - /// Creates a new [`StorageProofInput`] with the given hashed address, prefix set, and target + /// Creates a legacy [`StorageProofInput`] with the given hashed address, prefix set, and target /// slots. - pub const fn new( + pub const fn legacy( hashed_address: B256, prefix_set: PrefixSet, target_slots: B256Set, with_branch_node_masks: bool, multi_added_removed_keys: Option>, ) -> Self { - Self { + Self::Legacy { hashed_address, prefix_set, target_slots, @@ -1535,7 +1648,22 @@ impl StorageProofInput { multi_added_removed_keys, } } + + /// Creates a new [`StorageProofInput`] with the given hashed address and target slots. + pub const fn new(hashed_address: B256, targets: Vec) -> Self { + Self::V2 { hashed_address, targets } + } + + /// Returns the targeted hashed address. + pub const fn hashed_address(&self) -> B256 { + match self { + Self::Legacy { hashed_address, .. } | Self::V2 { hashed_address, .. } => { + *hashed_address + } + } + } } + /// Input parameters for account multiproof computation. #[derive(Debug, Clone)] pub struct AccountMultiproofInput { @@ -1551,6 +1679,8 @@ pub struct AccountMultiproofInput { pub missed_leaves_storage_roots: Arc>, /// Context for sending the proof result. pub proof_result_sender: ProofResultContext, + /// Whether to use V2 storage proofs. + pub v2_proofs_enabled: bool, } /// Parameters for building an account multiproof with pre-computed storage roots. @@ -1564,7 +1694,7 @@ struct AccountMultiproofParams<'a> { /// Provided by the user to give the necessary context to retain extra proofs. multi_added_removed_keys: Option<&'a Arc>, /// Receivers for storage proofs being computed in parallel. - storage_proof_receivers: B256Map>, + storage_proof_receivers: B256Map>, /// Cached storage proof roots for missed leaves encountered during account trie walk. missed_leaves_storage_roots: &'a DashMap, } @@ -1607,7 +1737,7 @@ mod tests { reth_provider::providers::OverlayStateProviderFactory::new(provider_factory); let ctx = test_ctx(factory); - let proof_handle = ProofWorkerHandle::new(handle.clone(), ctx, 5, 3); + let proof_handle = ProofWorkerHandle::new(handle.clone(), ctx, 5, 3, false); // Verify handle can be cloned let _cloned_handle = proof_handle.clone(); diff --git a/crates/trie/parallel/src/root.rs b/crates/trie/parallel/src/root.rs index 5c9294e8f9..74f1295e46 100644 --- a/crates/trie/parallel/src/root.rs +++ b/crates/trie/parallel/src/root.rs @@ -4,7 +4,7 @@ use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets use alloy_primitives::B256; use alloy_rlp::{BufMut, Encodable}; use itertools::Itertools; -use reth_execution_errors::StorageRootError; +use reth_execution_errors::{StateProofError, StorageRootError}; use reth_provider::{DatabaseProviderROFactory, ProviderError}; use reth_storage_errors::db::DatabaseError; use reth_trie::{ @@ -255,6 +255,15 @@ impl From for ParallelStateRootError { } } +impl From for ParallelStateRootError { + fn from(error: StateProofError) -> Self { + match error { + StateProofError::Database(err) => Self::Provider(ProviderError::Database(err)), + StateProofError::Rlp(err) => Self::Provider(ProviderError::Rlp(err)), + } + } +} + /// Gets or creates a tokio runtime handle for spawning blocking tasks. /// This ensures we always have a runtime available for I/O operations. fn get_runtime_handle() -> Handle { diff --git a/crates/trie/trie/src/proof_v2/mod.rs b/crates/trie/trie/src/proof_v2/mod.rs index 028cf75d19..2abc55686f 100644 --- a/crates/trie/trie/src/proof_v2/mod.rs +++ b/crates/trie/trie/src/proof_v2/mod.rs @@ -11,7 +11,7 @@ use crate::{ hashed_cursor::{HashedCursor, HashedStorageCursor}, trie_cursor::{depth_first, TrieCursor, TrieStorageCursor}, }; -use alloy_primitives::{B256, U256}; +use alloy_primitives::{keccak256, B256, U256}; use alloy_rlp::Encodable; use alloy_trie::{BranchNodeCompact, TrieMask}; use reth_execution_errors::trie::StateProofError; @@ -1354,6 +1354,31 @@ where // Use the static StorageValueEncoder and pass it to proof_inner self.proof_inner(&STORAGE_VALUE_ENCODER, targets) } + + /// Computes the root hash from a set of proof nodes. + /// + /// Returns `None` if there is no root node (partial proof), otherwise returns the hash of the + /// root node. + /// + /// This method reuses the internal RLP encode buffer for efficiency. + pub fn compute_root_hash( + &mut self, + proof_nodes: &[ProofTrieNode], + ) -> Result, StateProofError> { + // Find the root node (node at empty path) + let root_node = proof_nodes.iter().find(|node| node.path.is_empty()); + + let Some(root) = root_node else { + return Ok(None); + }; + + // Compute the hash of the root node + self.rlp_encode_buf.clear(); + root.node.encode(&mut self.rlp_encode_buf); + let root_hash = keccak256(&self.rlp_encode_buf); + + Ok(Some(root_hash)) + } } /// Helper type wrapping a slice of [`Target`]s, primarily used to iterate through targets in diff --git a/docs/cli/help.rs b/docs/cli/help.rs index 30c769b9ff..c96eaf63a1 100755 --- a/docs/cli/help.rs +++ b/docs/cli/help.rs @@ -1,4 +1,4 @@ -#!/usr/bin/env -S cargo +nightly -Zscript +#!/usr/bin/env -S cargo -Zscript --- [package] edition = "2021" @@ -260,12 +260,17 @@ fn update_root_summary(root_dir: &Path, root_summary: &str) -> io::Result<()> { } /// Generates TypeScript sidebar files for each command. -fn generate_sidebar_files(vocs_dir: &Path, output: &[(Cmd, String)], verbose: bool) -> io::Result<()> { +fn generate_sidebar_files( + vocs_dir: &Path, + output: &[(Cmd, String)], + verbose: bool, +) -> io::Result<()> { // Group commands by their root command name (reth or op-reth) // Also create a map of commands to their help output - let mut commands_by_root: std::collections::HashMap> = std::collections::HashMap::new(); + let mut commands_by_root: std::collections::HashMap> = + std::collections::HashMap::new(); let mut help_map: std::collections::HashMap = std::collections::HashMap::new(); - + for (cmd, help_output) in output { let root_name = cmd.command_name().to_string(); commands_by_root.entry(root_name.clone()).or_insert_with(Vec::new).push(cmd); @@ -288,7 +293,7 @@ fn generate_sidebar_files(vocs_dir: &Path, output: &[(Cmd, String)], verbose: bo continue; } }; - + let sidebar_file = vocs_dir.join(file_name); if verbose { println!("Writing sidebar file: {}", sidebar_file.display()); @@ -307,19 +312,16 @@ fn generate_sidebar_ts( help_map: &std::collections::HashMap, ) -> io::Result { // Find all top-level commands (commands with exactly one subcommand) - let mut top_level_commands: Vec<&Cmd> = commands - .iter() - .copied() - .filter(|cmd| cmd.subcommands.len() == 1) - .collect(); - + let mut top_level_commands: Vec<&Cmd> = + commands.iter().copied().filter(|cmd| cmd.subcommands.len() == 1).collect(); + // Remove duplicates using a set let mut seen = std::collections::HashSet::new(); top_level_commands.retain(|cmd| { let key = &cmd.subcommands[0]; seen.insert(key.clone()) }); - + // Sort by the order they appear in help output, not alphabetically if let Some(help) = root_help { let help_order = parse_sub_commands(help); @@ -345,14 +347,15 @@ fn generate_sidebar_ts( ts_code.push_str(&format!(" link: \"/cli/{}\",\n", root_name)); ts_code.push_str(" collapsed: false,\n"); ts_code.push_str(" items: [\n"); - + for (idx, cmd) in top_level_commands.iter().enumerate() { let is_last = idx == top_level_commands.len() - 1; - if let Some(item_str) = build_sidebar_item(root_name, cmd, &commands, 1, help_map, is_last) { + if let Some(item_str) = build_sidebar_item(root_name, cmd, &commands, 1, help_map, is_last) + { ts_code.push_str(&item_str); } } - + ts_code.push_str(" ]\n"); ts_code.push_str("};\n\n"); @@ -371,17 +374,18 @@ fn build_sidebar_item( ) -> Option { let full_cmd_name = cmd.to_string(); let link_path = format!("/cli/{}", full_cmd_name.replace(" ", "/")); - - // Find all direct child commands (commands whose subcommands start with this command's subcommands) + + // Find all direct child commands (commands whose subcommands start with this command's + // subcommands) let mut children: Vec<&Cmd> = all_commands .iter() .copied() .filter(|other_cmd| { - other_cmd.subcommands.len() == cmd.subcommands.len() + 1 - && other_cmd.subcommands[..cmd.subcommands.len()] == cmd.subcommands[..] + other_cmd.subcommands.len() == cmd.subcommands.len() + 1 && + other_cmd.subcommands[..cmd.subcommands.len()] == cmd.subcommands[..] }) .collect(); - + // Sort children by the order they appear in help output, not alphabetically if children.len() > 1 { // Get help output for this command to determine subcommand order @@ -396,31 +400,37 @@ fn build_sidebar_item( }); } else { // Fall back to alphabetical if we can't get help - children.sort_by(|a, b| { - a.subcommands.last().unwrap().cmp(b.subcommands.last().unwrap()) - }); + children + .sort_by(|a, b| a.subcommands.last().unwrap().cmp(b.subcommands.last().unwrap())); } } - + let indent = " ".repeat(depth); let mut item_str = String::new(); - + item_str.push_str(&format!("{}{{\n", indent)); item_str.push_str(&format!("{} text: \"{}\",\n", indent, full_cmd_name)); item_str.push_str(&format!("{} link: \"{}\"", indent, link_path)); - + if !children.is_empty() { item_str.push_str(",\n"); item_str.push_str(&format!("{} collapsed: true,\n", indent)); item_str.push_str(&format!("{} items: [\n", indent)); - + for (idx, child_cmd) in children.iter().enumerate() { let child_is_last = idx == children.len() - 1; - if let Some(child_str) = build_sidebar_item(root_name, child_cmd, all_commands, depth + 1, help_map, child_is_last) { + if let Some(child_str) = build_sidebar_item( + root_name, + child_cmd, + all_commands, + depth + 1, + help_map, + child_is_last, + ) { item_str.push_str(&child_str); } } - + item_str.push_str(&format!("{} ]\n", indent)); if is_last { item_str.push_str(&format!("{}}}\n", indent)); diff --git a/docs/vocs/docs/pages/cli/op-reth/node.mdx b/docs/vocs/docs/pages/cli/op-reth/node.mdx index ee1adfee78..ad9efd44a5 100644 --- a/docs/vocs/docs/pages/cli/op-reth/node.mdx +++ b/docs/vocs/docs/pages/cli/op-reth/node.mdx @@ -956,6 +956,9 @@ Engine: --engine.account-worker-count Configure the number of account proof workers in the Tokio blocking pool. If not specified, defaults to the same count as storage workers + --engine.enable-proof-v2 + Enable V2 storage proofs for state root calculations + ERA: --era.enable Enable import from ERA1 files diff --git a/docs/vocs/docs/pages/cli/reth/node.mdx b/docs/vocs/docs/pages/cli/reth/node.mdx index 38b59183e4..11c94575d4 100644 --- a/docs/vocs/docs/pages/cli/reth/node.mdx +++ b/docs/vocs/docs/pages/cli/reth/node.mdx @@ -956,6 +956,9 @@ Engine: --engine.account-worker-count Configure the number of account proof workers in the Tokio blocking pool. If not specified, defaults to the same count as storage workers + --engine.enable-proof-v2 + Enable V2 storage proofs for state root calculations + ERA: --era.enable Enable import from ERA1 files