feat(trie): Add flag to enable proof v2 for storage proof workers (#20617)

Co-authored-by: YK <chiayongkang@hotmail.com>
Brian Picciano authored on 2026-01-08 11:53:24 +01:00, committed by GitHub
parent 4412a501eb · commit 7ceca70353
11 changed files with 440 additions and 228 deletions

View File

@@ -135,6 +135,8 @@ pub struct TreeConfig {
storage_worker_count: usize,
/// Number of account proof worker threads.
account_worker_count: usize,
/// Whether to enable V2 storage proofs.
enable_proof_v2: bool,
}
impl Default for TreeConfig {
@@ -163,6 +165,7 @@ impl Default for TreeConfig {
allow_unwind_canonical_header: false,
storage_worker_count: default_storage_worker_count(),
account_worker_count: default_account_worker_count(),
enable_proof_v2: false,
}
}
}
@@ -194,6 +197,7 @@ impl TreeConfig {
allow_unwind_canonical_header: bool,
storage_worker_count: usize,
account_worker_count: usize,
enable_proof_v2: bool,
) -> Self {
Self {
persistence_threshold,
@@ -219,6 +223,7 @@ impl TreeConfig {
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
enable_proof_v2,
}
}
@@ -500,4 +505,15 @@ impl TreeConfig {
self.account_worker_count = account_worker_count.max(MIN_WORKER_COUNT);
self
}
/// Returns whether V2 storage proofs are enabled.
pub const fn enable_proof_v2(&self) -> bool {
self.enable_proof_v2
}
/// Setter for whether to enable V2 storage proofs.
pub const fn with_enable_proof_v2(mut self, enable_proof_v2: bool) -> Self {
self.enable_proof_v2 = enable_proof_v2;
self
}
}
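For reference, a minimal sketch of the new accessor pair in use, assuming the `TreeConfig` import from this crate; the default value comes from the `Default` impl above:

// The flag defaults to false and round-trips through the builder-style setter.
let config = TreeConfig::default().with_enable_proof_v2(true);
assert!(config.enable_proof_v2());
assert!(!TreeConfig::default().enable_proof_v2());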

View File

@@ -274,24 +274,23 @@ where
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let storage_worker_count = config.storage_worker_count();
let account_worker_count = config.account_worker_count();
let v2_proofs_enabled = config.enable_proof_v2();
let proof_handle = ProofWorkerHandle::new(
self.executor.handle().clone(),
task_ctx,
storage_worker_count,
account_worker_count,
v2_proofs_enabled,
);
let multi_proof_task = MultiProofTask::new(
proof_handle.clone(),
to_sparse_trie,
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
to_multi_proof,
to_multi_proof.clone(),
from_multi_proof,
);
// wire the multiproof task to the prewarm task
let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
// spawn multi-proof task
let parent_span = span.clone();
let saved_cache = prewarm_handle.saved_cache.clone();
@@ -316,7 +315,7 @@ where
self.spawn_sparse_trie_task(sparse_trie_rx, proof_handle, state_root_tx);
PayloadHandle {
to_multi_proof,
to_multi_proof: Some(to_multi_proof),
prewarm_handle,
state_root: Some(state_root_rx),
transactions: execution_rx,

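Note the ownership inversion above: the caller now creates the multiproof channel and hands the task a clone of the sender, rather than asking the spawned task for it via `state_root_message_sender()`. A condensed, hypothetical sketch of the new wiring:

let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
let multi_proof_task = MultiProofTask::new(
    proof_handle.clone(),
    to_sparse_trie,
    chunk_size,             // Option<usize>, as computed from the config above
    to_multi_proof.clone(), // the task keeps a clone for self-sends
    from_multi_proof,
);
// `PayloadHandle` then wraps the original sender as `Some(to_multi_proof)`.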
View File

@@ -283,6 +283,8 @@ pub struct MultiproofManager {
proof_result_tx: CrossbeamSender<ProofResultMessage>,
/// Metrics
metrics: MultiProofTaskMetrics,
/// Whether to use V2 storage proofs
v2_proofs_enabled: bool,
}
impl MultiproofManager {
@@ -296,11 +298,14 @@ impl MultiproofManager {
metrics.max_storage_workers.set(proof_worker_handle.total_storage_workers() as f64);
metrics.max_account_workers.set(proof_worker_handle.total_account_workers() as f64);
let v2_proofs_enabled = proof_worker_handle.v2_proofs_enabled();
Self {
metrics,
proof_worker_handle,
missed_leaves_storage_roots: Default::default(),
proof_result_tx,
v2_proofs_enabled,
}
}
@@ -380,6 +385,7 @@ impl MultiproofManager {
hashed_state_update,
start,
),
v2_proofs_enabled: self.v2_proofs_enabled,
};
if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(input) {
@@ -619,11 +625,6 @@ impl MultiProofTask {
}
}
/// Returns a sender that can be used to send arbitrary [`MultiProofMessage`]s to this task.
pub(super) fn state_root_message_sender(&self) -> CrossbeamSender<MultiProofMessage> {
self.tx.clone()
}
/// Handles request for proof prefetch.
///
/// Returns how many multiproof tasks were dispatched for the prefetch request.
@@ -1223,7 +1224,7 @@ impl MultiProofTask {
let update = SparseTrieUpdate {
state: proof_result.state,
multiproof: proof_result_data.into_multiproof(),
multiproof: proof_result_data.proof,
};
if let Some(combined_update) =
@@ -1531,7 +1532,7 @@ mod tests {
let rt_handle = get_test_runtime_handle();
let overlay_factory = OverlayStateProviderFactory::new(factory);
let task_ctx = ProofTaskCtx::new(overlay_factory);
let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1);
let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1, false);
let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
let (tx, rx) = crossbeam_channel::unbounded();
@@ -2005,7 +2006,7 @@ mod tests {
let mut targets3 = MultiProofTargets::default();
targets3.insert(addr3, HashSet::default());
let tx = task.state_root_message_sender();
let tx = task.tx.clone();
tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap();
tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap();
tx.send(MultiProofMessage::PrefetchProofs(targets3)).unwrap();
@@ -2081,7 +2082,7 @@ mod tests {
let source = StateChangeSource::Transaction(0);
let tx = task.state_root_message_sender();
let tx = task.tx.clone();
tx.send(MultiProofMessage::StateUpdate(source.into(), update1.clone())).unwrap();
tx.send(MultiProofMessage::StateUpdate(source.into(), update2.clone())).unwrap();
@@ -2145,7 +2146,7 @@ mod tests {
let source_b = StateChangeSource::Transaction(2);
// Queue: A1 (immediate dispatch), B1 (batched), A2 (should become pending)
let tx = task.state_root_message_sender();
let tx = task.tx.clone();
tx.send(MultiProofMessage::StateUpdate(source_a.into(), create_state_update(addr_a1, 100)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source_b.into(), create_state_update(addr_b1, 200)))
@@ -2267,7 +2268,7 @@ mod tests {
let source = StateChangeSource::PreBlock(StateChangePreBlockSource::BeaconRootContract);
// Queue: first update dispatched immediately, next two should not merge
let tx = task.state_root_message_sender();
let tx = task.tx.clone();
tx.send(MultiProofMessage::StateUpdate(source.into(), create_state_update(addr1, 100)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source.into(), create_state_update(addr2, 200)))
@@ -2410,7 +2411,7 @@ mod tests {
let source = StateChangeSource::Transaction(42);
// Queue: [PrefetchProofs1, PrefetchProofs2, StateUpdate1, StateUpdate2, PrefetchProofs3]
let tx = task.state_root_message_sender();
let tx = task.tx.clone();
tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap();
tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap();
tx.send(MultiProofMessage::StateUpdate(source.into(), state_update1)).unwrap();
@@ -2511,7 +2512,7 @@ mod tests {
let source = StateChangeSource::Transaction(99);
let tx = task.state_root_message_sender();
let tx = task.tx.clone();
tx.send(MultiProofMessage::PrefetchProofs(prefetch1)).unwrap();
tx.send(MultiProofMessage::StateUpdate(source.into(), state_update)).unwrap();
tx.send(MultiProofMessage::PrefetchProofs(prefetch2.clone())).unwrap();
@@ -2607,7 +2608,7 @@ mod tests {
let source = StateChangeSource::Transaction(42);
// Queue: [Prefetch1, State1, State2, State3, Prefetch2]
let tx = task.state_root_message_sender();
let tx = task.tx.clone();
tx.send(MultiProofMessage::PrefetchProofs(prefetch1.clone())).unwrap();
tx.send(MultiProofMessage::StateUpdate(
source.into(),

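Two patterns above are worth calling out: `MultiproofManager` caches `v2_proofs_enabled` from the worker handle at construction and stamps it onto every account multiproof dispatch, and the tests replace the removed `state_root_message_sender()` getter with same-module access to `task.tx`. A condensed sketch of the dispatch side, where `existing_input` is a placeholder for the unchanged fields:

let v2_proofs_enabled = proof_worker_handle.v2_proofs_enabled();
let input = AccountMultiproofInput { v2_proofs_enabled, ..existing_input };
proof_worker_handle.dispatch_account_multiproof(input)?;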
View File

@@ -36,6 +36,7 @@ pub struct DefaultEngineValues {
allow_unwind_canonical_header: bool,
storage_worker_count: Option<usize>,
account_worker_count: Option<usize>,
enable_proof_v2: bool,
}
impl DefaultEngineValues {
@@ -165,6 +166,12 @@ impl DefaultEngineValues {
self.account_worker_count = v;
self
}
/// Set whether to enable V2 storage proofs by default.
pub const fn with_enable_proof_v2(mut self, v: bool) -> Self {
self.enable_proof_v2 = v;
self
}
}
impl Default for DefaultEngineValues {
@@ -189,6 +196,7 @@ impl Default for DefaultEngineValues {
allow_unwind_canonical_header: false,
storage_worker_count: None,
account_worker_count: None,
enable_proof_v2: false,
}
}
}
@@ -308,6 +316,10 @@ pub struct EngineArgs {
/// If not specified, defaults to the same count as storage workers.
#[arg(long = "engine.account-worker-count", default_value = Resettable::from(DefaultEngineValues::get_global().account_worker_count.map(|v| v.to_string().into())))]
pub account_worker_count: Option<usize>,
/// Enable V2 storage proofs for state root calculations
#[arg(long = "engine.enable-proof-v2", default_value_t = DefaultEngineValues::get_global().enable_proof_v2)]
pub enable_proof_v2: bool,
}
#[allow(deprecated)]
@@ -333,6 +345,7 @@ impl Default for EngineArgs {
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
enable_proof_v2,
} = DefaultEngineValues::get_global().clone();
Self {
persistence_threshold,
@@ -357,6 +370,7 @@ impl Default for EngineArgs {
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
enable_proof_v2,
}
}
}
@@ -392,6 +406,8 @@ impl EngineArgs {
config = config.with_account_worker_count(count);
}
config = config.with_enable_proof_v2(self.enable_proof_v2);
config
}
}
@@ -441,6 +457,7 @@ mod tests {
allow_unwind_canonical_header: true,
storage_worker_count: Some(16),
account_worker_count: Some(8),
enable_proof_v2: false,
};
let parsed_args = CommandParser::<EngineArgs>::parse_from([

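A sketch of exercising the new switch through the parser, mirroring the `CommandParser`-based test above (the `.args` accessor is assumed from that test helper):

let args = CommandParser::<EngineArgs>::parse_from(["reth", "--engine.enable-proof-v2"]).args;
assert!(args.enable_proof_v2);
// Without the flag, the DefaultEngineValues default (false) applies.
assert!(!CommandParser::<EngineArgs>::parse_from(["reth"]).args.enable_proof_v2);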
View File

@@ -1,8 +1,8 @@
use crate::{
metrics::ParallelTrieMetrics,
proof_task::{
AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
StorageProofInput,
AccountMultiproofInput, ProofResult, ProofResultContext, ProofWorkerHandle,
StorageProofInput, StorageProofResultMessage,
},
root::ParallelStateRootError,
StorageRootTargets,
@@ -37,6 +37,8 @@ pub struct ParallelProof {
/// Cached storage proof roots for missed leaves; this maps
/// hashed (missed) addresses to their storage proof roots.
missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
/// Whether to use V2 storage proofs.
v2_proofs_enabled: bool,
#[cfg(feature = "metrics")]
metrics: ParallelTrieMetrics,
}
@@ -54,11 +56,18 @@ impl ParallelProof {
collect_branch_node_masks: false,
multi_added_removed_keys: None,
proof_worker_handle,
v2_proofs_enabled: false,
#[cfg(feature = "metrics")]
metrics: ParallelTrieMetrics::new_with_labels(&[("type", "proof")]),
}
}
/// Set whether to use V2 storage proofs.
pub const fn with_v2_proofs_enabled(mut self, v2_proofs_enabled: bool) -> Self {
self.v2_proofs_enabled = v2_proofs_enabled;
self
}
/// Set the flag indicating whether to include branch node masks in the proof.
pub const fn with_branch_node_masks(mut self, branch_node_masks: bool) -> Self {
self.collect_branch_node_masks = branch_node_masks;
@@ -80,23 +89,26 @@ impl ParallelProof {
hashed_address: B256,
prefix_set: PrefixSet,
target_slots: B256Set,
) -> Result<CrossbeamReceiver<ProofResultMessage>, ParallelStateRootError> {
) -> Result<CrossbeamReceiver<StorageProofResultMessage>, ParallelStateRootError> {
let (result_tx, result_rx) = crossbeam_channel::unbounded();
let start = Instant::now();
let input = StorageProofInput::new(
hashed_address,
prefix_set,
target_slots,
self.collect_branch_node_masks,
self.multi_added_removed_keys.clone(),
);
let input = if self.v2_proofs_enabled {
StorageProofInput::new(
hashed_address,
target_slots.into_iter().map(Into::into).collect(),
)
} else {
StorageProofInput::legacy(
hashed_address,
prefix_set,
target_slots,
self.collect_branch_node_masks,
self.multi_added_removed_keys.clone(),
)
};
self.proof_worker_handle
.dispatch_storage_proof(
input,
ProofResultContext::new(result_tx, 0, HashedPostState::default(), start),
)
.dispatch_storage_proof(input, result_tx)
.map_err(|e| ParallelStateRootError::Other(e.to_string()))?;
Ok(result_rx)
@@ -127,19 +139,9 @@ impl ParallelProof {
})?;
// Extract storage proof directly from the result
let storage_proof = match proof_msg.result? {
crate::proof_task::ProofResult::StorageProof { hashed_address: addr, proof } => {
debug_assert_eq!(
addr,
hashed_address,
"storage worker must return same address: expected {hashed_address}, got {addr}"
);
proof
}
crate::proof_task::ProofResult::AccountMultiproof { .. } => {
unreachable!("storage worker only sends StorageProof variant")
}
};
let proof_result = proof_msg.result?;
let storage_proof = Into::<Option<DecodedStorageMultiProof>>::into(proof_result)
.expect("Partial proofs are not yet supported");
trace!(
target: "trie::parallel_proof",
@@ -210,6 +212,7 @@ impl ParallelProof {
HashedPostState::default(),
account_multiproof_start_time,
),
v2_proofs_enabled: self.v2_proofs_enabled,
};
self.proof_worker_handle
@@ -223,12 +226,7 @@ impl ParallelProof {
)
})?;
let (multiproof, stats) = match proof_result_msg.result? {
crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => (proof, stats),
crate::proof_task::ProofResult::StorageProof { .. } => {
unreachable!("account worker only sends AccountMultiproof variant")
}
};
let ProofResult { proof: multiproof, stats } = proof_result_msg.result?;
#[cfg(feature = "metrics")]
self.metrics.record(stats);
@@ -330,7 +328,8 @@ mod tests {
let factory = reth_provider::providers::OverlayStateProviderFactory::new(factory);
let task_ctx = ProofTaskCtx::new(factory);
let proof_worker_handle = ProofWorkerHandle::new(rt.handle().clone(), task_ctx, 1, 1);
let proof_worker_handle =
ProofWorkerHandle::new(rt.handle().clone(), task_ctx, 1, 1, false);
let parallel_result =
ParallelProof::new(Default::default(), Default::default(), proof_worker_handle.clone())

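A sketch of opting a `ParallelProof` into V2 storage proofs via the new builder method; the constructor arguments mirror the test above. Note that branch node masks only affect the legacy input path, while the V2 path derives masks from the returned proof nodes:

let parallel_proof =
    ParallelProof::new(Default::default(), Default::default(), proof_worker_handle)
        .with_branch_node_masks(true)
        .with_v2_proofs_enabled(true);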
View File

@@ -49,10 +49,11 @@ use reth_trie::{
node_iter::{TrieElement, TrieNodeIter},
prefix_set::TriePrefixSets,
proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider, StorageProof},
proof_v2::{self, StorageProofCalculator},
trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieCursorMetricsCache},
walker::TrieWalker,
DecodedMultiProof, DecodedStorageMultiProof, HashBuilder, HashedPostState, MultiProofTargets,
Nibbles, TRIE_ACCOUNT_RLP_MAX_SIZE,
Nibbles, ProofTrieNode, TRIE_ACCOUNT_RLP_MAX_SIZE,
};
use reth_trie_common::{
added_removed_keys::MultiAddedRemovedKeys,
@@ -77,7 +78,6 @@ use crate::proof_task_metrics::{
ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
};
type StorageProofResult = Result<DecodedStorageMultiProof, ParallelStateRootError>;
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;
/// A handle that provides type-safe access to proof worker pools.
@@ -101,6 +101,8 @@ pub struct ProofWorkerHandle {
storage_worker_count: usize,
/// Total number of account workers spawned
account_worker_count: usize,
/// Whether V2 storage proofs are enabled
v2_proofs_enabled: bool,
}
impl ProofWorkerHandle {
@@ -114,11 +116,13 @@ impl ProofWorkerHandle {
/// - `task_ctx`: Shared context with database view and prefix sets
/// - `storage_worker_count`: Number of storage workers to spawn
/// - `account_worker_count`: Number of account workers to spawn
/// - `v2_proofs_enabled`: Whether to enable V2 storage proofs
pub fn new<Factory>(
executor: Handle,
task_ctx: ProofTaskCtx<Factory>,
storage_worker_count: usize,
account_worker_count: usize,
v2_proofs_enabled: bool,
) -> Self
where
Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
@@ -138,6 +142,7 @@ impl ProofWorkerHandle {
target: "trie::proof_task",
storage_worker_count,
account_worker_count,
?v2_proofs_enabled,
"Spawning proof worker pools"
);
@@ -167,7 +172,8 @@ impl ProofWorkerHandle {
metrics,
#[cfg(feature = "metrics")]
cursor_metrics,
);
)
.with_v2_proofs(v2_proofs_enabled);
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
@@ -228,9 +234,15 @@ impl ProofWorkerHandle {
account_available_workers,
storage_worker_count,
account_worker_count,
v2_proofs_enabled,
}
}
/// Returns whether V2 storage proofs are enabled for this worker pool.
pub const fn v2_proofs_enabled(&self) -> bool {
self.v2_proofs_enabled
}
/// Returns how many storage workers are currently available/idle.
pub fn available_storage_workers(&self) -> usize {
self.storage_available_workers.load(Ordering::Relaxed)
@@ -281,8 +293,9 @@ impl ProofWorkerHandle {
pub fn dispatch_storage_proof(
&self,
input: StorageProofInput,
proof_result_sender: ProofResultContext,
proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
) -> Result<(), ProviderError> {
let hashed_address = input.hashed_address();
self.storage_work_tx
.send(StorageWorkerJob::StorageProof { input, proof_result_sender })
.map_err(|err| {
@@ -290,18 +303,9 @@ impl ProofWorkerHandle {
ProviderError::other(std::io::Error::other("storage workers unavailable"));
if let StorageWorkerJob::StorageProof { proof_result_sender, .. } = err.0 {
let ProofResultContext {
sender: result_tx,
sequence_number: seq,
state,
start_time: start,
} = proof_result_sender;
let _ = result_tx.send(ProofResultMessage {
sequence_number: seq,
let _ = proof_result_sender.send(StorageProofResultMessage {
hashed_address,
result: Err(ParallelStateRootError::Provider(error.clone())),
elapsed: start.elapsed(),
state,
});
}
@@ -378,7 +382,7 @@ impl ProofWorkerHandle {
}
}
/// Data used for initializing cursor factories that is shared across all storage proof instances.
/// Data used for initializing cursor factories that is shared across all proof worker instances.
#[derive(Clone, Debug)]
pub struct ProofTaskCtx<Factory> {
/// The factory for creating state providers.
@@ -392,7 +396,7 @@ impl<Factory> ProofTaskCtx<Factory> {
}
}
/// This contains all information shared between all storage proof instances.
/// This contains all information shared between account proof worker instances.
#[derive(Debug)]
pub struct ProofTaskTx<Provider> {
/// The provider that implements `TrieCursorFactory` and `HashedCursorFactory`.
@@ -417,20 +421,23 @@ where
///
/// Used by storage workers in the worker pool to compute storage proofs.
#[inline]
fn compute_storage_proof(
fn compute_legacy_storage_proof(
&self,
input: StorageProofInput,
trie_cursor_metrics: &mut TrieCursorMetricsCache,
hashed_cursor_metrics: &mut HashedCursorMetricsCache,
) -> StorageProofResult {
) -> Result<StorageProofResult, ParallelStateRootError> {
// Consume the input so we can move large collections (e.g. target slots) without cloning.
let StorageProofInput {
let StorageProofInput::Legacy {
hashed_address,
prefix_set,
target_slots,
with_branch_node_masks,
multi_added_removed_keys,
} = input;
} = input
else {
panic!("compute_legacy_storage_proof only accepts StorageProofInput::Legacy")
};
// Get or create added/removed keys context
let multi_added_removed_keys =
@@ -469,7 +476,7 @@ where
hashed_address, e
))
})
});
})?;
trace!(
target: "trie::proof_task",
@@ -479,7 +486,51 @@ where
"Completed storage proof calculation"
);
decoded_result
Ok(StorageProofResult::Legacy { proof: decoded_result })
}
fn compute_v2_storage_proof(
&self,
input: StorageProofInput,
calculator: &mut proof_v2::StorageProofCalculator<
<Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
<Provider as HashedCursorFactory>::StorageCursor<'_>,
>,
) -> Result<StorageProofResult, ParallelStateRootError> {
let StorageProofInput::V2 { hashed_address, mut targets } = input else {
panic!("compute_v2_storage_proof only accepts StorageProofInput::V2")
};
// If `targets` is empty, the caller only wants the root hash. The V2 proof calculator
// does nothing when given no targets, so we insert a dummy target to ensure it always
// returns at least the root node.
if targets.is_empty() {
targets.push(proof_v2::Target::new(B256::ZERO));
}
let span = debug_span!(
target: "trie::proof_task",
"V2 Storage proof calculation",
?hashed_address,
targets = ?targets.len(),
worker_id = self.id,
);
let _span_guard = span.enter();
let proof_start = Instant::now();
let proof = calculator.storage_proof(hashed_address, &mut targets)?;
let root = calculator.compute_root_hash(&proof)?;
trace!(
target: "trie::proof_task",
hashed_address = ?hashed_address,
proof_time_us = proof_start.elapsed().as_micros(),
?root,
worker_id = self.id,
"Completed V2 storage proof calculation"
);
Ok(StorageProofResult::V2 { proof, root })
}
/// Process a blinded storage node request.
@@ -552,39 +603,16 @@ impl TrieNodeProvider for ProofTaskTrieNodeProvider {
}
}
}
/// Result of a proof calculation, which can be either an account multiproof or a storage proof.
/// Result of a multiproof calculation.
#[derive(Debug)]
pub enum ProofResult {
/// Account multiproof with statistics
AccountMultiproof {
/// The account multiproof
proof: DecodedMultiProof,
/// Statistics collected during proof computation
stats: ParallelTrieStats,
},
/// Storage proof for a specific account
StorageProof {
/// The hashed address this storage proof belongs to
hashed_address: B256,
/// The storage multiproof
proof: DecodedStorageMultiProof,
},
pub struct ProofResult {
/// The account multiproof
pub proof: DecodedMultiProof,
/// Statistics collected during proof computation
pub stats: ParallelTrieStats,
}
impl ProofResult {
/// Convert this proof result into a `DecodedMultiProof`.
///
/// For account multiproofs, returns the multiproof directly (discarding stats).
/// For storage proofs, wraps the storage proof into a minimal multiproof.
pub fn into_multiproof(self) -> DecodedMultiProof {
match self {
Self::AccountMultiproof { proof, stats: _ } => proof,
Self::StorageProof { hashed_address, proof } => {
DecodedMultiProof::from_storage_proof(hashed_address, proof)
}
}
}
}
/// Channel used by worker threads to deliver `ProofResultMessage` items back to
/// `MultiProofTask`.
///
@@ -634,6 +662,58 @@ impl ProofResultContext {
Self { sender, sequence_number, state, start_time }
}
}
/// The results of a storage proof calculation.
#[derive(Debug)]
pub(crate) enum StorageProofResult {
Legacy {
/// The storage multiproof
proof: DecodedStorageMultiProof,
},
V2 {
/// The calculated V2 proof nodes
proof: Vec<ProofTrieNode>,
/// The storage root calculated by the V2 proof
root: Option<B256>,
},
}
impl StorageProofResult {
/// Returns the calculated root of the trie, if one can be calculated from the proof.
const fn root(&self) -> Option<B256> {
match self {
Self::Legacy { proof } => Some(proof.root),
Self::V2 { root, .. } => *root,
}
}
}
impl From<StorageProofResult> for Option<DecodedStorageMultiProof> {
/// Returns None if the V2 proof result doesn't have a calculated root hash.
fn from(proof_result: StorageProofResult) -> Self {
match proof_result {
StorageProofResult::Legacy { proof } => Some(proof),
StorageProofResult::V2 { proof, root } => root.map(|root| {
let branch_node_masks = proof
.iter()
.filter_map(|node| node.masks.map(|masks| (node.path, masks)))
.collect();
let subtree = proof.into_iter().map(|node| (node.path, node.node)).collect();
DecodedStorageMultiProof { root, subtree, branch_node_masks }
}),
}
}
}
/// Message containing a completed storage proof result with metadata.
#[derive(Debug)]
pub struct StorageProofResultMessage {
/// The hashed address this storage proof belongs to
pub(crate) hashed_address: B256,
/// The storage proof calculation result
pub(crate) result: Result<StorageProofResult, ParallelStateRootError>,
}
/// Internal message for storage workers.
#[derive(Debug)]
enum StorageWorkerJob {
@@ -642,7 +722,7 @@ enum StorageWorkerJob {
/// Storage proof input parameters
input: StorageProofInput,
/// Context for sending the proof result.
proof_result_sender: ProofResultContext,
proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
},
/// Blinded storage node retrieval request
BlindedStorageNode {
@@ -674,6 +754,8 @@ struct StorageProofWorker<Factory> {
/// Cursor metrics for this worker
#[cfg(feature = "metrics")]
cursor_metrics: ProofTaskCursorMetrics,
/// Set to true if V2 proofs are enabled.
v2_enabled: bool,
}
impl<Factory> StorageProofWorker<Factory>
@@ -698,9 +780,16 @@ where
metrics,
#[cfg(feature = "metrics")]
cursor_metrics,
v2_enabled: false,
}
}
/// Changes whether or not V2 proofs are enabled.
const fn with_v2_proofs(mut self, v2_enabled: bool) -> Self {
self.v2_enabled = v2_enabled;
self
}
/// Runs the worker loop, processing jobs until the channel closes.
///
/// # Lifecycle
@@ -728,6 +817,7 @@ where
metrics,
#[cfg(feature = "metrics")]
ref mut cursor_metrics,
v2_enabled: _,
} = self;
// Create provider from factory
@@ -743,6 +833,13 @@ where
let mut storage_proofs_processed = 0u64;
let mut storage_nodes_processed = 0u64;
let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
let mut v2_calculator = if self.v2_enabled {
let trie_cursor = proof_tx.provider.storage_trie_cursor(B256::ZERO)?;
let hashed_cursor = proof_tx.provider.hashed_storage_cursor(B256::ZERO)?;
Some(proof_v2::StorageProofCalculator::new_storage(trie_cursor, hashed_cursor))
} else {
None
};
// Initially mark this worker as available.
available_workers.fetch_add(1, Ordering::Relaxed);
@@ -756,6 +853,7 @@ where
Self::process_storage_proof(
worker_id,
&proof_tx,
v2_calculator.as_mut(),
input,
proof_result_sender,
&mut storage_proofs_processed,
@@ -800,53 +898,58 @@ where
fn process_storage_proof<Provider>(
worker_id: usize,
proof_tx: &ProofTaskTx<Provider>,
v2_calculator: Option<
&mut StorageProofCalculator<
<Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
<Provider as HashedCursorFactory>::StorageCursor<'_>,
>,
>,
input: StorageProofInput,
proof_result_sender: ProofResultContext,
proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
storage_proofs_processed: &mut u64,
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
) where
Provider: TrieCursorFactory + HashedCursorFactory,
{
let hashed_address = input.hashed_address;
let ProofResultContext { sender, sequence_number: seq, state, start_time } =
proof_result_sender;
let mut trie_cursor_metrics = TrieCursorMetricsCache::default();
let mut hashed_cursor_metrics = HashedCursorMetricsCache::default();
trace!(
target: "trie::proof_task",
worker_id,
hashed_address = ?hashed_address,
prefix_set_len = input.prefix_set.len(),
target_slots_len = input.target_slots.len(),
"Processing storage proof"
);
let hashed_address = input.hashed_address();
let proof_start = Instant::now();
let result = proof_tx.compute_storage_proof(
input,
&mut trie_cursor_metrics,
&mut hashed_cursor_metrics,
);
let result = match &input {
StorageProofInput::Legacy { hashed_address, prefix_set, target_slots, .. } => {
trace!(
target: "trie::proof_task",
worker_id,
hashed_address = ?hashed_address,
prefix_set_len = prefix_set.len(),
target_slots_len = target_slots.len(),
"Processing storage proof"
);
proof_tx.compute_legacy_storage_proof(
input,
&mut trie_cursor_metrics,
&mut hashed_cursor_metrics,
)
}
StorageProofInput::V2 { hashed_address, targets } => {
trace!(
target: "trie::proof_task",
worker_id,
hashed_address = ?hashed_address,
targets_len = targets.len(),
"Processing V2 storage proof"
);
proof_tx
.compute_v2_storage_proof(input, v2_calculator.expect("v2 calculator provided"))
}
};
let proof_elapsed = proof_start.elapsed();
*storage_proofs_processed += 1;
let result_msg = result.map(|storage_proof| ProofResult::StorageProof {
hashed_address,
proof: storage_proof,
});
if sender
.send(ProofResultMessage {
sequence_number: seq,
result: result_msg,
elapsed: start_time.elapsed(),
state,
})
.is_err()
{
if proof_result_sender.send(StorageProofResultMessage { hashed_address, result }).is_err() {
trace!(
target: "trie::proof_task",
worker_id,
@@ -1094,6 +1197,7 @@ where
missed_leaves_storage_roots,
proof_result_sender:
ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start },
v2_proofs_enabled,
} = input;
let span = debug_span!(
@@ -1126,6 +1230,7 @@ where
&mut storage_prefix_sets,
collect_branch_node_masks,
multi_added_removed_keys.as_ref(),
v2_proofs_enabled,
) {
Ok(receivers) => receivers,
Err(error) => {
@@ -1162,7 +1267,7 @@ where
proof_cursor_metrics.record_spans();
let stats = tracker.finish();
let result = result.map(|proof| ProofResult::AccountMultiproof { proof, stats });
let result = result.map(|proof| ProofResult { proof, stats });
*account_proofs_processed += 1;
// Send result to MultiProofTask
@@ -1340,21 +1445,21 @@ where
drop(_guard);
// Extract storage proof from the result
let proof = match proof_msg.result? {
ProofResult::StorageProof { hashed_address: addr, proof } => {
debug_assert_eq!(
addr,
hashed_address,
"storage worker must return same address: expected {hashed_address}, got {addr}"
);
proof
}
ProofResult::AccountMultiproof { .. } => {
unreachable!("storage worker only sends StorageProof variant")
}
debug_assert_eq!(
proof_msg.hashed_address, hashed_address,
"storage worker must return same address"
);
let proof_result = proof_msg.result?;
let Some(root) = proof_result.root() else {
trace!(
target: "trie::proof_task",
?proof_result,
"Received proof_result without root",
);
panic!("Partial proofs are not yet supported");
};
let root = proof.root;
let proof = Into::<Option<DecodedStorageMultiProof>>::into(proof_result)
.expect("Partial proofs are not yet supported (into)");
collected_decoded_storages.insert(hashed_address, proof);
root
}
@@ -1411,9 +1516,10 @@ where
for (hashed_address, receiver) in storage_proof_receivers {
if let Ok(proof_msg) = receiver.recv() {
// Extract storage proof from the result
if let Ok(ProofResult::StorageProof { proof, .. }) = proof_msg.result {
collected_decoded_storages.insert(hashed_address, proof);
}
let proof_result = proof_msg.result?;
let proof = Into::<Option<DecodedStorageMultiProof>>::into(proof_result)
.expect("Partial proofs are not yet supported");
collected_decoded_storages.insert(hashed_address, proof);
}
}
@@ -1457,39 +1563,36 @@ fn dispatch_storage_proofs(
storage_prefix_sets: &mut B256Map<PrefixSet>,
with_branch_node_masks: bool,
multi_added_removed_keys: Option<&Arc<MultiAddedRemovedKeys>>,
) -> Result<B256Map<CrossbeamReceiver<ProofResultMessage>>, ParallelStateRootError> {
use_v2_proofs: bool,
) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
let mut storage_proof_receivers =
B256Map::with_capacity_and_hasher(targets.len(), Default::default());
// Dispatch all storage proofs to worker pool
for (hashed_address, target_slots) in targets.iter() {
let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default();
// Create channel for receiving ProofResultMessage
let (result_tx, result_rx) = crossbeam_channel::unbounded();
let start = Instant::now();
// Create computation input (data only, no communication channel)
let input = StorageProofInput::new(
*hashed_address,
prefix_set,
target_slots.clone(),
with_branch_node_masks,
multi_added_removed_keys.cloned(),
);
// Create computation input based on V2 flag
let input = if use_v2_proofs {
// Convert target slots to V2 targets
let v2_targets = target_slots.iter().copied().map(Into::into).collect();
StorageProofInput::new(*hashed_address, v2_targets)
} else {
let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default();
StorageProofInput::legacy(
*hashed_address,
prefix_set,
target_slots.clone(),
with_branch_node_masks,
multi_added_removed_keys.cloned(),
)
};
// Always dispatch a storage proof so we obtain the storage root even when no slots are
// requested.
storage_work_tx
.send(StorageWorkerJob::StorageProof {
input,
proof_result_sender: ProofResultContext::new(
result_tx,
0,
HashedPostState::default(),
start,
),
})
.send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
.map_err(|_| {
ParallelStateRootError::Other(format!(
"Failed to queue storage proof for {}: storage worker pool unavailable",
@@ -1504,30 +1607,40 @@ fn dispatch_storage_proofs(
}
/// Input parameters for storage proof computation.
#[derive(Debug)]
pub struct StorageProofInput {
/// The hashed address for which the proof is calculated.
hashed_address: B256,
/// The prefix set for the proof calculation.
prefix_set: PrefixSet,
/// The target slots for the proof calculation.
target_slots: B256Set,
/// Whether or not to collect branch node masks
with_branch_node_masks: bool,
/// Provided by the user to give the necessary context to retain extra proofs.
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
pub enum StorageProofInput {
/// Legacy storage proof variant
Legacy {
/// The hashed address for which the proof is calculated.
hashed_address: B256,
/// The prefix set for the proof calculation.
prefix_set: PrefixSet,
/// The target slots for the proof calculation.
target_slots: B256Set,
/// Whether or not to collect branch node masks
with_branch_node_masks: bool,
/// Provided by the user to give the necessary context to retain extra proofs.
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
},
/// V2 storage proof variant
V2 {
/// The hashed address for which the proof is calculated.
hashed_address: B256,
/// The set of proof targets
targets: Vec<proof_v2::Target>,
},
}
impl StorageProofInput {
/// Creates a new [`StorageProofInput`] with the given hashed address, prefix set, and target
/// Creates a legacy [`StorageProofInput`] with the given hashed address, prefix set, and target
/// slots.
pub const fn new(
pub const fn legacy(
hashed_address: B256,
prefix_set: PrefixSet,
target_slots: B256Set,
with_branch_node_masks: bool,
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
) -> Self {
Self {
Self::Legacy {
hashed_address,
prefix_set,
target_slots,
@@ -1535,7 +1648,22 @@ impl StorageProofInput {
multi_added_removed_keys,
}
}
/// Creates a new [`StorageProofInput`] with the given hashed address and target slots.
pub const fn new(hashed_address: B256, targets: Vec<proof_v2::Target>) -> Self {
Self::V2 { hashed_address, targets }
}
/// Returns the targeted hashed address.
pub const fn hashed_address(&self) -> B256 {
match self {
Self::Legacy { hashed_address, .. } | Self::V2 { hashed_address, .. } => {
*hashed_address
}
}
}
}
/// Input parameters for account multiproof computation.
#[derive(Debug, Clone)]
pub struct AccountMultiproofInput {
@@ -1551,6 +1679,8 @@ pub struct AccountMultiproofInput {
pub missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
/// Context for sending the proof result.
pub proof_result_sender: ProofResultContext,
/// Whether to use V2 storage proofs.
pub v2_proofs_enabled: bool,
}
/// Parameters for building an account multiproof with pre-computed storage roots.
@@ -1564,7 +1694,7 @@ struct AccountMultiproofParams<'a> {
/// Provided by the user to give the necessary context to retain extra proofs.
multi_added_removed_keys: Option<&'a Arc<MultiAddedRemovedKeys>>,
/// Receivers for storage proofs being computed in parallel.
storage_proof_receivers: B256Map<CrossbeamReceiver<ProofResultMessage>>,
storage_proof_receivers: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
/// Cached storage proof roots for missed leaves encountered during account trie walk.
missed_leaves_storage_roots: &'a DashMap<B256, B256>,
}
@@ -1607,7 +1737,7 @@ mod tests {
reth_provider::providers::OverlayStateProviderFactory::new(provider_factory);
let ctx = test_ctx(factory);
let proof_handle = ProofWorkerHandle::new(handle.clone(), ctx, 5, 3);
let proof_handle = ProofWorkerHandle::new(handle.clone(), ctx, 5, 3, false);
// Verify handle can be cloned
let _cloned_handle = proof_handle.clone();
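On the consumer side, storage proof callers now receive a `StorageProofResultMessage` and convert its `StorageProofResult` into a `DecodedStorageMultiProof`; per the `From` impl above, the conversion yields `None` for a V2 result that carries no root. A condensed sketch of that pattern, as used here and in `parallel_proof.rs`:

let msg = result_rx.recv().map_err(|e| ParallelStateRootError::Other(e.to_string()))?;
debug_assert_eq!(msg.hashed_address, hashed_address, "worker must echo the requested address");
let proof = Into::<Option<DecodedStorageMultiProof>>::into(msg.result?)
    .expect("Partial proofs are not yet supported");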

View File

@@ -4,7 +4,7 @@ use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets
use alloy_primitives::B256;
use alloy_rlp::{BufMut, Encodable};
use itertools::Itertools;
use reth_execution_errors::StorageRootError;
use reth_execution_errors::{StateProofError, StorageRootError};
use reth_provider::{DatabaseProviderROFactory, ProviderError};
use reth_storage_errors::db::DatabaseError;
use reth_trie::{
@@ -255,6 +255,15 @@ impl From<alloy_rlp::Error> for ParallelStateRootError {
}
}
impl From<StateProofError> for ParallelStateRootError {
fn from(error: StateProofError) -> Self {
match error {
StateProofError::Database(err) => Self::Provider(ProviderError::Database(err)),
StateProofError::Rlp(err) => Self::Provider(ProviderError::Rlp(err)),
}
}
}
/// Gets or creates a tokio runtime handle for spawning blocking tasks.
/// This ensures we always have a runtime available for I/O operations.
fn get_runtime_handle() -> Handle {

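The new `From` impl exists so that V2 calculator errors can propagate with `?` inside functions returning `ParallelStateRootError`, as `compute_v2_storage_proof` does above. A minimal, hypothetical illustration:

fn lift(res: Result<Option<B256>, StateProofError>) -> Result<Option<B256>, ParallelStateRootError> {
    // `?` applies the From<StateProofError> conversion added above.
    Ok(res?)
}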
View File

@@ -11,7 +11,7 @@ use crate::{
hashed_cursor::{HashedCursor, HashedStorageCursor},
trie_cursor::{depth_first, TrieCursor, TrieStorageCursor},
};
use alloy_primitives::{B256, U256};
use alloy_primitives::{keccak256, B256, U256};
use alloy_rlp::Encodable;
use alloy_trie::{BranchNodeCompact, TrieMask};
use reth_execution_errors::trie::StateProofError;
@@ -1354,6 +1354,31 @@ where
// Use the static StorageValueEncoder and pass it to proof_inner
self.proof_inner(&STORAGE_VALUE_ENCODER, targets)
}
/// Computes the root hash from a set of proof nodes.
///
/// Returns `None` if there is no root node (partial proof), otherwise returns the hash of the
/// root node.
///
/// This method reuses the internal RLP encode buffer for efficiency.
pub fn compute_root_hash(
&mut self,
proof_nodes: &[ProofTrieNode],
) -> Result<Option<B256>, StateProofError> {
// Find the root node (node at empty path)
let root_node = proof_nodes.iter().find(|node| node.path.is_empty());
let Some(root) = root_node else {
return Ok(None);
};
// Compute the hash of the root node
self.rlp_encode_buf.clear();
root.node.encode(&mut self.rlp_encode_buf);
let root_hash = keccak256(&self.rlp_encode_buf);
Ok(Some(root_hash))
}
}
/// Helper type wrapping a slice of [`Target`]s, primarily used to iterate through targets in

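Usage follows `compute_v2_storage_proof` in the worker file above: compute the proof nodes first, then derive the root from the node at the empty path, treating `None` as a partial proof. A hedged sketch:

let proof = calculator.storage_proof(hashed_address, &mut targets)?;
match calculator.compute_root_hash(&proof)? {
    Some(root) => trace!(?root, "computed storage root from V2 proof"),
    None => trace!("partial proof: no node at the empty path"),
}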
View File

@@ -1,4 +1,4 @@
#!/usr/bin/env -S cargo +nightly -Zscript
#!/usr/bin/env -S cargo -Zscript
---
[package]
edition = "2021"
@@ -260,12 +260,17 @@ fn update_root_summary(root_dir: &Path, root_summary: &str) -> io::Result<()> {
}
/// Generates TypeScript sidebar files for each command.
fn generate_sidebar_files(vocs_dir: &Path, output: &[(Cmd, String)], verbose: bool) -> io::Result<()> {
fn generate_sidebar_files(
vocs_dir: &Path,
output: &[(Cmd, String)],
verbose: bool,
) -> io::Result<()> {
// Group commands by their root command name (reth or op-reth)
// Also create a map of commands to their help output
let mut commands_by_root: std::collections::HashMap<String, Vec<&Cmd>> = std::collections::HashMap::new();
let mut commands_by_root: std::collections::HashMap<String, Vec<&Cmd>> =
std::collections::HashMap::new();
let mut help_map: std::collections::HashMap<String, String> = std::collections::HashMap::new();
for (cmd, help_output) in output {
let root_name = cmd.command_name().to_string();
commands_by_root.entry(root_name.clone()).or_insert_with(Vec::new).push(cmd);
@@ -288,7 +293,7 @@ fn generate_sidebar_files(vocs_dir: &Path, output: &[(Cmd, String)], verbose: bo
continue;
}
};
let sidebar_file = vocs_dir.join(file_name);
if verbose {
println!("Writing sidebar file: {}", sidebar_file.display());
@@ -307,19 +312,16 @@ fn generate_sidebar_ts(
help_map: &std::collections::HashMap<String, String>,
) -> io::Result<String> {
// Find all top-level commands (commands with exactly one subcommand)
let mut top_level_commands: Vec<&Cmd> = commands
.iter()
.copied()
.filter(|cmd| cmd.subcommands.len() == 1)
.collect();
let mut top_level_commands: Vec<&Cmd> =
commands.iter().copied().filter(|cmd| cmd.subcommands.len() == 1).collect();
// Remove duplicates using a set
let mut seen = std::collections::HashSet::new();
top_level_commands.retain(|cmd| {
let key = &cmd.subcommands[0];
seen.insert(key.clone())
});
// Sort by the order they appear in help output, not alphabetically
if let Some(help) = root_help {
let help_order = parse_sub_commands(help);
@@ -345,14 +347,15 @@ fn generate_sidebar_ts(
ts_code.push_str(&format!(" link: \"/cli/{}\",\n", root_name));
ts_code.push_str(" collapsed: false,\n");
ts_code.push_str(" items: [\n");
for (idx, cmd) in top_level_commands.iter().enumerate() {
let is_last = idx == top_level_commands.len() - 1;
if let Some(item_str) = build_sidebar_item(root_name, cmd, &commands, 1, help_map, is_last) {
if let Some(item_str) = build_sidebar_item(root_name, cmd, &commands, 1, help_map, is_last)
{
ts_code.push_str(&item_str);
}
}
ts_code.push_str(" ]\n");
ts_code.push_str("};\n\n");
@@ -371,17 +374,18 @@ fn build_sidebar_item(
) -> Option<String> {
let full_cmd_name = cmd.to_string();
let link_path = format!("/cli/{}", full_cmd_name.replace(" ", "/"));
// Find all direct child commands (commands whose subcommands start with this command's subcommands)
// Find all direct child commands (commands whose subcommands start with this command's
// subcommands)
let mut children: Vec<&Cmd> = all_commands
.iter()
.copied()
.filter(|other_cmd| {
other_cmd.subcommands.len() == cmd.subcommands.len() + 1
&& other_cmd.subcommands[..cmd.subcommands.len()] == cmd.subcommands[..]
other_cmd.subcommands.len() == cmd.subcommands.len() + 1 &&
other_cmd.subcommands[..cmd.subcommands.len()] == cmd.subcommands[..]
})
.collect();
// Sort children by the order they appear in help output, not alphabetically
if children.len() > 1 {
// Get help output for this command to determine subcommand order
@@ -396,31 +400,37 @@ fn build_sidebar_item(
});
} else {
// Fall back to alphabetical if we can't get help
children.sort_by(|a, b| {
a.subcommands.last().unwrap().cmp(b.subcommands.last().unwrap())
});
children
.sort_by(|a, b| a.subcommands.last().unwrap().cmp(b.subcommands.last().unwrap()));
}
}
let indent = " ".repeat(depth);
let mut item_str = String::new();
item_str.push_str(&format!("{}{{\n", indent));
item_str.push_str(&format!("{} text: \"{}\",\n", indent, full_cmd_name));
item_str.push_str(&format!("{} link: \"{}\"", indent, link_path));
if !children.is_empty() {
item_str.push_str(",\n");
item_str.push_str(&format!("{} collapsed: true,\n", indent));
item_str.push_str(&format!("{} items: [\n", indent));
for (idx, child_cmd) in children.iter().enumerate() {
let child_is_last = idx == children.len() - 1;
if let Some(child_str) = build_sidebar_item(root_name, child_cmd, all_commands, depth + 1, help_map, child_is_last) {
if let Some(child_str) = build_sidebar_item(
root_name,
child_cmd,
all_commands,
depth + 1,
help_map,
child_is_last,
) {
item_str.push_str(&child_str);
}
}
item_str.push_str(&format!("{} ]\n", indent));
if is_last {
item_str.push_str(&format!("{}}}\n", indent));

View File

@@ -956,6 +956,9 @@ Engine:
--engine.account-worker-count <ACCOUNT_WORKER_COUNT>
Configure the number of account proof workers in the Tokio blocking pool. If not specified, defaults to the same count as storage workers
--engine.enable-proof-v2
Enable V2 storage proofs for state root calculations
ERA:
--era.enable
Enable import from ERA1 files

View File

@@ -956,6 +956,9 @@ Engine:
--engine.account-worker-count <ACCOUNT_WORKER_COUNT>
Configure the number of account proof workers in the Tokio blocking pool. If not specified, defaults to the same count as storage workers
--engine.enable-proof-v2
Enable V2 storage proofs for state root calculations
ERA:
--era.enable
Enable import from ERA1 files