diff --git a/crates/engine/primitives/src/config.rs b/crates/engine/primitives/src/config.rs
index 2870d3dccc..0b72e1d624 100644
--- a/crates/engine/primitives/src/config.rs
+++ b/crates/engine/primitives/src/config.rs
@@ -34,6 +34,11 @@ fn default_account_worker_count() -> usize {
/// The size of proof targets chunk to spawn in one multiproof calculation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 60;
+/// The size of proof targets chunk to spawn in one multiproof calculation when V2 proofs are
+/// enabled. This is 4x the default chunk size to take advantage of more efficient V2 proof
+/// computation.
+pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2: usize = DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE * 4;
+
/// Default number of reserved CPU cores for non-reth processes.
///
/// This will be deducted from the thread count of main reth global threadpool.
@@ -267,6 +272,17 @@ impl TreeConfig {
self.multiproof_chunk_size
}
+ /// Return the multiproof task chunk size, using the V2 default if V2 proofs are enabled
+ /// and the chunk size is at the default value.
+ pub const fn effective_multiproof_chunk_size(&self) -> usize {
+ if self.enable_proof_v2 && self.multiproof_chunk_size == DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE
+ {
+ DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2
+ } else {
+ self.multiproof_chunk_size
+ }
+ }
+
/// Return the number of reserved CPU cores for non-reth processes
pub const fn reserved_cpu_cores(&self) -> usize {
self.reserved_cpu_cores
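As a reading aid, here is a minimal standalone sketch (not part of the patch) of the chunk-size selection added above. `Config` is a stand-in for `TreeConfig`; the constants mirror the ones defined in this file. The V2 default only applies when the operator left the chunk size at the legacy default.

```rust
/// Stand-in for the relevant `TreeConfig` fields (an assumption; only these two matter here).
struct Config {
    enable_proof_v2: bool,
    multiproof_chunk_size: usize,
}

const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 60;
const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2: usize = DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE * 4;

/// Mirrors the logic of `effective_multiproof_chunk_size` above.
const fn effective_multiproof_chunk_size(cfg: &Config) -> usize {
    if cfg.enable_proof_v2 && cfg.multiproof_chunk_size == DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE {
        DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2
    } else {
        cfg.multiproof_chunk_size
    }
}

fn main() {
    // V2 enabled and chunk size left at the default: bumped to 240.
    let v2_default = Config { enable_proof_v2: true, multiproof_chunk_size: 60 };
    assert_eq!(effective_multiproof_chunk_size(&v2_default), 240);

    // An explicit operator override is respected even with V2 enabled.
    let overridden = Config { enable_proof_v2: true, multiproof_chunk_size: 100 };
    assert_eq!(effective_multiproof_chunk_size(&overridden), 100);

    // Legacy proofs keep whatever chunk size is configured.
    let legacy = Config { enable_proof_v2: false, multiproof_chunk_size: 60 };
    assert_eq!(effective_multiproof_chunk_size(&legacy), 60);
}
```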
diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs
index 1803929c89..1fa4232b0e 100644
--- a/crates/engine/tree/src/tree/payload_processor/mod.rs
+++ b/crates/engine/tree/src/tree/payload_processor/mod.rs
@@ -245,6 +245,9 @@ where
let (to_sparse_trie, sparse_trie_rx) = channel();
let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
+ // Extract V2 proofs flag early so we can pass it to prewarm
+ let v2_proofs_enabled = config.enable_proof_v2();
+
// Handle BAL-based optimization if available
let prewarm_handle = if let Some(bal) = bal {
// When BAL is present, use BAL prewarming and send BAL to multiproof
@@ -261,6 +264,7 @@ where
provider_builder.clone(),
None, // Don't send proof targets when BAL is present
Some(bal),
+ v2_proofs_enabled,
)
} else {
// Normal path: spawn with transaction prewarming
@@ -271,6 +275,7 @@ where
provider_builder.clone(),
Some(to_multi_proof.clone()),
None,
+ v2_proofs_enabled,
)
};
@@ -278,7 +283,6 @@ where
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let storage_worker_count = config.storage_worker_count();
let account_worker_count = config.account_worker_count();
- let v2_proofs_enabled = config.enable_proof_v2();
let proof_handle = ProofWorkerHandle::new(
self.executor.handle().clone(),
task_ctx,
@@ -290,10 +294,13 @@ where
let multi_proof_task = MultiProofTask::new(
proof_handle.clone(),
to_sparse_trie,
- config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
+ config
+ .multiproof_chunking_enabled()
+ .then_some(config.effective_multiproof_chunk_size()),
to_multi_proof.clone(),
from_multi_proof,
- );
+ )
+ .with_v2_proofs_enabled(v2_proofs_enabled);
// spawn multi-proof task
let parent_span = span.clone();
@@ -342,8 +349,9 @@ where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
+ // This path doesn't use multiproof, so the V2 proofs flag doesn't matter
let prewarm_handle =
- self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal);
+ self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal, false);
PayloadHandle {
to_multi_proof: None,
prewarm_handle,
@@ -410,6 +418,7 @@ where
}
/// Spawn prewarming optionally wired to the multiproof task for target updates.
+ #[expect(clippy::too_many_arguments)]
fn spawn_caching_with
(
&self,
env: ExecutionEnv,
@@ -418,6 +427,7 @@ where
provider_builder: StateProviderBuilder,
to_multi_proof: Option>,
bal: Option>,
+ v2_proofs_enabled: bool,
) -> CacheTaskHandle
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
@@ -440,6 +450,7 @@ where
terminate_execution: Arc::new(AtomicBool::new(false)),
precompile_cache_disabled: self.precompile_cache_disabled,
precompile_cache_map: self.precompile_cache_map.clone(),
+ v2_proofs_enabled,
};
let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs
index b5f1272b67..823c3e54e9 100644
--- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs
+++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs
@@ -11,14 +11,18 @@ use reth_metrics::Metrics;
use reth_provider::AccountReader;
use reth_revm::state::EvmState;
use reth_trie::{
- added_removed_keys::MultiAddedRemovedKeys, DecodedMultiProof, HashedPostState, HashedStorage,
+ added_removed_keys::MultiAddedRemovedKeys, proof_v2, HashedPostState, HashedStorage,
MultiProofTargets,
};
+#[cfg(test)]
+use reth_trie_parallel::stats::ParallelTrieTracker;
use reth_trie_parallel::{
proof::ParallelProof,
proof_task::{
- AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
+ AccountMultiproofInput, ProofResult, ProofResultContext, ProofResultMessage,
+ ProofWorkerHandle,
},
+ targets_v2::{ChunkedMultiProofTargetsV2, MultiProofTargetsV2},
};
use revm_primitives::map::{hash_map, B256Map};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
@@ -63,12 +67,12 @@ const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
/// state.
-#[derive(Default, Debug)]
+#[derive(Debug)]
pub struct SparseTrieUpdate {
/// The state update that was used to calculate the proof
pub(crate) state: HashedPostState,
/// The calculated multiproof
- pub(crate) multiproof: DecodedMultiProof,
+ pub(crate) multiproof: ProofResult,
}
impl SparseTrieUpdate {
@@ -80,7 +84,11 @@ impl SparseTrieUpdate {
/// Construct update from multiproof.
#[cfg(test)]
pub(super) fn from_multiproof(multiproof: reth_trie::MultiProof) -> alloy_rlp::Result<Self> {
- Ok(Self { multiproof: multiproof.try_into()?, ..Default::default() })
+ let stats = ParallelTrieTracker::default().finish();
+ Ok(Self {
+ state: HashedPostState::default(),
+ multiproof: ProofResult::Legacy(multiproof.try_into()?, stats),
+ })
}
/// Extend update with contents of the other.
@@ -94,7 +102,7 @@ impl SparseTrieUpdate {
#[derive(Debug)]
pub(super) enum MultiProofMessage {
/// Prefetch proof targets
- PrefetchProofs(MultiProofTargets),
+ PrefetchProofs(VersionedMultiProofTargets),
/// New state update from transaction execution with its source
StateUpdate(Source, EvmState),
/// State update that can be applied to the sparse trie without any new proofs.
@@ -223,12 +231,155 @@ pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostStat
hashed_state
}
+/// Extends a `MultiProofTargets` with the contents of a `VersionedMultiProofTargets`,
+/// regardless of which variant the latter is.
+fn extend_multiproof_targets(dest: &mut MultiProofTargets, src: &VersionedMultiProofTargets) {
+ match src {
+ VersionedMultiProofTargets::Legacy(targets) => {
+ dest.extend_ref(targets);
+ }
+ VersionedMultiProofTargets::V2(targets) => {
+ // Add all account targets
+ for target in &targets.account_targets {
+ dest.entry(target.key()).or_default();
+ }
+
+ // Add all storage targets
+ for (hashed_address, slots) in &targets.storage_targets {
+ let slot_set = dest.entry(*hashed_address).or_default();
+ for slot in slots {
+ slot_set.insert(slot.key());
+ }
+ }
+ }
+ }
+}
+
+/// A set of multiproof targets which can be either in the legacy or V2 representations.
+#[derive(Debug)]
+pub(super) enum VersionedMultiProofTargets {
+ /// Legacy targets
+ Legacy(MultiProofTargets),
+ /// V2 targets
+ V2(MultiProofTargetsV2),
+}
+
+impl VersionedMultiProofTargets {
+ /// Returns true if there are no account or storage targets.
+ fn is_empty(&self) -> bool {
+ match self {
+ Self::Legacy(targets) => targets.is_empty(),
+ Self::V2(targets) => targets.is_empty(),
+ }
+ }
+
+ /// Returns the number of account targets in the multiproof targets.
+ fn account_targets_len(&self) -> usize {
+ match self {
+ Self::Legacy(targets) => targets.len(),
+ Self::V2(targets) => targets.account_targets.len(),
+ }
+ }
+
+ /// Returns the number of storage targets in the multiproof targets.
+ fn storage_targets_len(&self) -> usize {
+ match self {
+ Self::Legacy(targets) => targets.values().map(|slots| slots.len()).sum::<usize>(),
+ Self::V2(targets) => {
+ targets.storage_targets.values().map(|slots| slots.len()).sum::<usize>()
+ }
+ }
+ }
+
+ /// Returns the number of accounts in the multiproof targets.
+ fn len(&self) -> usize {
+ match self {
+ Self::Legacy(targets) => targets.len(),
+ Self::V2(targets) => targets.account_targets.len(),
+ }
+ }
+
+ /// Returns the total storage slot count across all accounts.
+ fn storage_count(&self) -> usize {
+ match self {
+ Self::Legacy(targets) => targets.values().map(|slots| slots.len()).sum(),
+ Self::V2(targets) => targets.storage_targets.values().map(|slots| slots.len()).sum(),
+ }
+ }
+
+ /// Returns the number of items that will be considered during chunking.
+ fn chunking_length(&self) -> usize {
+ match self {
+ Self::Legacy(targets) => targets.chunking_length(),
+ Self::V2(targets) => {
+ // For V2, count accounts + storage slots
+ targets.account_targets.len() +
+ targets.storage_targets.values().map(|slots| slots.len()).sum::<usize>()
+ }
+ }
+ }
+
+ /// Retains the targets representing the difference with another `MultiProofTargets`.
+ /// Removes all targets that are already present in `other`.
+ fn retain_difference(&mut self, other: &MultiProofTargets) {
+ match self {
+ Self::Legacy(targets) => {
+ targets.retain_difference(other);
+ }
+ Self::V2(targets) => {
+ // Remove account targets that exist in other
+ targets.account_targets.retain(|target| !other.contains_key(&target.key()));
+
+ // For each account in storage_targets, remove slots that exist in other
+ targets.storage_targets.retain(|hashed_address, slots| {
+ if let Some(other_slots) = other.get(hashed_address) {
+ slots.retain(|slot| !other_slots.contains(&slot.key()));
+ !slots.is_empty()
+ } else {
+ true
+ }
+ });
+ }
+ }
+ }
+
+ /// Extends this `VersionedMultiProofTargets` with the contents of another.
+ ///
+ /// Panics if the variants do not match.
+ fn extend(&mut self, other: Self) {
+ match (self, other) {
+ (Self::Legacy(dest), Self::Legacy(src)) => {
+ dest.extend(src);
+ }
+ (Self::V2(dest), Self::V2(src)) => {
+ dest.account_targets.extend(src.account_targets);
+ for (addr, slots) in src.storage_targets {
+ dest.storage_targets.entry(addr).or_default().extend(slots);
+ }
+ }
+ _ => panic!("Cannot extend VersionedMultiProofTargets with mismatched variants"),
+ }
+ }
+
+ /// Chunks this `VersionedMultiProofTargets` into smaller chunks of the given size.
+ fn chunks(self, chunk_size: usize) -> Box<dyn Iterator<Item = Self>> {
+ match self {
+ Self::Legacy(targets) => {
+ Box::new(MultiProofTargets::chunks(targets, chunk_size).map(Self::Legacy))
+ }
+ Self::V2(targets) => {
+ Box::new(ChunkedMultiProofTargetsV2::new(targets, chunk_size).map(Self::V2))
+ }
+ }
+ }
+}
+
/// Input parameters for dispatching a multiproof calculation.
#[derive(Debug)]
struct MultiproofInput {
source: Option,
hashed_state_update: HashedPostState,
- proof_targets: MultiProofTargets,
+ proof_targets: VersionedMultiProofTargets,
proof_sequence_number: u64,
state_root_message_sender: CrossbeamSender<MultiProofMessage>,
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
@@ -263,8 +414,6 @@ pub struct MultiproofManager {
proof_result_tx: CrossbeamSender<ProofResultMessage>,
/// Metrics
metrics: MultiProofTaskMetrics,
- /// Whether to use V2 storage proofs
- v2_proofs_enabled: bool,
}
impl MultiproofManager {
@@ -278,9 +427,7 @@ impl MultiproofManager {
metrics.max_storage_workers.set(proof_worker_handle.total_storage_workers() as f64);
metrics.max_account_workers.set(proof_worker_handle.total_account_workers() as f64);
- let v2_proofs_enabled = proof_worker_handle.v2_proofs_enabled();
-
- Self { metrics, proof_worker_handle, proof_result_tx, v2_proofs_enabled }
+ Self { metrics, proof_worker_handle, proof_result_tx }
}
/// Dispatches a new multiproof calculation to worker pools.
@@ -325,41 +472,48 @@ impl MultiproofManager {
multi_added_removed_keys,
} = multiproof_input;
- let account_targets = proof_targets.len();
- let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
-
trace!(
target: "engine::tree::payload_processor::multiproof",
proof_sequence_number,
?proof_targets,
- account_targets,
- storage_targets,
+ account_targets = proof_targets.account_targets_len(),
+ storage_targets = proof_targets.storage_targets_len(),
?source,
"Dispatching multiproof to workers"
);
let start = Instant::now();
- // Extend prefix sets with targets
- let frozen_prefix_sets =
- ParallelProof::extend_prefix_sets_with_targets(&Default::default(), &proof_targets);
+ // Workers will send ProofResultMessage directly to proof_result_rx
+ let proof_result_sender = ProofResultContext::new(
+ self.proof_result_tx.clone(),
+ proof_sequence_number,
+ hashed_state_update,
+ start,
+ );
- // Dispatch account multiproof to worker pool with result sender
- let input = AccountMultiproofInput {
- targets: proof_targets,
- prefix_sets: frozen_prefix_sets,
- collect_branch_node_masks: true,
- multi_added_removed_keys,
- // Workers will send ProofResultMessage directly to proof_result_rx
- proof_result_sender: ProofResultContext::new(
- self.proof_result_tx.clone(),
- proof_sequence_number,
- hashed_state_update,
- start,
- ),
- v2_proofs_enabled: self.v2_proofs_enabled,
+ let input = match proof_targets {
+ VersionedMultiProofTargets::Legacy(proof_targets) => {
+ // Extend prefix sets with targets
+ let frozen_prefix_sets = ParallelProof::extend_prefix_sets_with_targets(
+ &Default::default(),
+ &proof_targets,
+ );
+
+ AccountMultiproofInput::Legacy {
+ targets: proof_targets,
+ prefix_sets: frozen_prefix_sets,
+ collect_branch_node_masks: true,
+ multi_added_removed_keys,
+ proof_result_sender,
+ }
+ }
+ VersionedMultiProofTargets::V2(proof_targets) => {
+ AccountMultiproofInput::V2 { targets: proof_targets, proof_result_sender }
+ }
};
+ // Dispatch account multiproof to worker pool with result sender
if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(input) {
error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch account multiproof");
return;
@@ -561,6 +715,9 @@ pub(super) struct MultiProofTask {
/// there are any active workers and force chunking across workers. This is to prevent tasks
/// which are very long from hitting a single worker.
max_targets_for_chunking: usize,
+ /// Whether V2 proof calculation is enabled. If enabled, [`MultiProofTargetsV2`]
+ /// will be produced from state updates.
+ v2_proofs_enabled: bool,
}
impl MultiProofTask {
@@ -592,9 +749,16 @@ impl MultiProofTask {
),
metrics,
max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
+ v2_proofs_enabled: false,
}
}
+ /// Sets whether V2 proof targets are generated from state updates.
+ pub(super) const fn with_v2_proofs_enabled(mut self, v2_proofs_enabled: bool) -> Self {
+ self.v2_proofs_enabled = v2_proofs_enabled;
+ self
+ }
+
/// Handles request for proof prefetch.
///
/// Returns how many multiproof tasks were dispatched for the prefetch request.
@@ -602,37 +766,29 @@ impl MultiProofTask {
level = "debug",
target = "engine::tree::payload_processor::multiproof",
skip_all,
- fields(accounts = targets.len(), chunks = 0)
+ fields(accounts = targets.account_targets_len(), chunks = 0)
)]
- fn on_prefetch_proof(&mut self, mut targets: MultiProofTargets) -> u64 {
+ fn on_prefetch_proof(&mut self, mut targets: VersionedMultiProofTargets) -> u64 {
// Remove already fetched proof targets to avoid redundant work.
targets.retain_difference(&self.fetched_proof_targets);
- self.fetched_proof_targets.extend_ref(&targets);
+ extend_multiproof_targets(&mut self.fetched_proof_targets, &targets);
- // Make sure all target accounts have an `AddedRemovedKeySet` in the
+ // For Legacy multiproofs, make sure all target accounts have an `AddedRemovedKeySet` in the
// [`MultiAddedRemovedKeys`]. Even if there are not any known removed keys for the account,
// we still want to optimistically fetch extension children for the leaf addition case.
- self.multi_added_removed_keys.touch_accounts(targets.keys().copied());
-
- // Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
- let multi_added_removed_keys = Arc::new(MultiAddedRemovedKeys {
- account: self.multi_added_removed_keys.account.clone(),
- storages: targets
- .keys()
- .filter_map(|account| {
- self.multi_added_removed_keys
- .storages
- .get(account)
- .cloned()
- .map(|keys| (*account, keys))
- })
- .collect(),
- });
+ // V2 multiproofs don't need this.
+ let multi_added_removed_keys =
+ if let VersionedMultiProofTargets::Legacy(legacy_targets) = &targets {
+ self.multi_added_removed_keys.touch_accounts(legacy_targets.keys().copied());
+ Some(Arc::new(self.multi_added_removed_keys.clone()))
+ } else {
+ None
+ };
self.metrics.prefetch_proof_targets_accounts_histogram.record(targets.len() as f64);
self.metrics
.prefetch_proof_targets_storages_histogram
- .record(targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
+ .record(targets.storage_count() as f64);
let chunking_len = targets.chunking_length();
let available_account_workers =
@@ -646,7 +802,7 @@ impl MultiProofTask {
self.max_targets_for_chunking,
available_account_workers,
available_storage_workers,
- MultiProofTargets::chunks,
+ VersionedMultiProofTargets::chunks,
|proof_targets| {
self.multiproof_manager.dispatch(MultiproofInput {
source: None,
@@ -654,7 +810,7 @@ impl MultiProofTask {
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
- multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
+ multi_added_removed_keys: multi_added_removed_keys.clone(),
});
},
);
@@ -757,6 +913,7 @@ impl MultiProofTask {
self.multiproof_manager.proof_worker_handle.available_account_workers();
let available_storage_workers =
self.multiproof_manager.proof_worker_handle.available_storage_workers();
+
let num_chunks = dispatch_with_chunking(
not_fetched_state_update,
chunking_len,
@@ -770,8 +927,9 @@ impl MultiProofTask {
&hashed_state_update,
&self.fetched_proof_targets,
&multi_added_removed_keys,
+ self.v2_proofs_enabled,
);
- spawned_proof_targets.extend_ref(&proof_targets);
+ extend_multiproof_targets(&mut spawned_proof_targets, &proof_targets);
self.multiproof_manager.dispatch(MultiproofInput {
source: Some(source),
@@ -871,7 +1029,10 @@ impl MultiProofTask {
batch_metrics.proofs_processed += 1;
if let Some(combined_update) = self.on_proof(
sequence_number,
- SparseTrieUpdate { state, multiproof: Default::default() },
+ SparseTrieUpdate {
+ state,
+ multiproof: ProofResult::empty(self.v2_proofs_enabled),
+ },
) {
let _ = self.to_sparse_trie.send(combined_update);
}
@@ -898,8 +1059,7 @@ impl MultiProofTask {
}
let account_targets = merged_targets.len();
- let storage_targets =
- merged_targets.values().map(|slots| slots.len()).sum::();
+ let storage_targets = merged_targets.storage_count();
batch_metrics.prefetch_proofs_requested += self.on_prefetch_proof(merged_targets);
trace!(
target: "engine::tree::payload_processor::multiproof",
@@ -1003,7 +1163,10 @@ impl MultiProofTask {
if let Some(combined_update) = self.on_proof(
sequence_number,
- SparseTrieUpdate { state, multiproof: Default::default() },
+ SparseTrieUpdate {
+ state,
+ multiproof: ProofResult::empty(self.v2_proofs_enabled),
+ },
) {
let _ = self.to_sparse_trie.send(combined_update);
}
@@ -1106,7 +1269,7 @@ impl MultiProofTask {
let update = SparseTrieUpdate {
state: proof_result.state,
- multiproof: proof_result_data.proof,
+ multiproof: proof_result_data,
};
if let Some(combined_update) =
@@ -1196,7 +1359,7 @@ struct MultiproofBatchCtx {
/// received.
updates_finished_time: Option,
/// Reusable buffer for accumulating prefetch targets during batching.
- accumulated_prefetch_targets: Vec<MultiProofTargets>,
+ accumulated_prefetch_targets: Vec<VersionedMultiProofTargets>,
}
impl MultiproofBatchCtx {
@@ -1242,40 +1405,77 @@ fn get_proof_targets(
state_update: &HashedPostState,
fetched_proof_targets: &MultiProofTargets,
multi_added_removed_keys: &MultiAddedRemovedKeys,
-) -> MultiProofTargets {
- let mut targets = MultiProofTargets::default();
+ v2_enabled: bool,
+) -> VersionedMultiProofTargets {
+ if v2_enabled {
+ let mut targets = MultiProofTargetsV2::default();
- // first collect all new accounts (not previously fetched)
- for hashed_address in state_update.accounts.keys() {
- if !fetched_proof_targets.contains_key(hashed_address) {
- targets.insert(*hashed_address, HashSet::default());
+ // first collect all new accounts (not previously fetched)
+ for &hashed_address in state_update.accounts.keys() {
+ if !fetched_proof_targets.contains_key(&hashed_address) {
+ targets.account_targets.push(hashed_address.into());
+ }
}
+
+ // then process storage slots for all accounts in the state update
+ for (hashed_address, storage) in &state_update.storages {
+ let fetched = fetched_proof_targets.get(hashed_address);
+
+ // If the storage is wiped, we still need to fetch the account proof.
+ if storage.wiped && fetched.is_none() {
+ targets.account_targets.push(Into::<proof_v2::Target>::into(*hashed_address));
+ continue
+ }
+
+ let changed_slots = storage
+ .storage
+ .keys()
+ .filter(|slot| !fetched.is_some_and(|f| f.contains(*slot)))
+ .map(|slot| Into::<proof_v2::Target>::into(*slot))
+ .collect::<Vec<_>>();
+
+ if !changed_slots.is_empty() {
+ targets.account_targets.push((*hashed_address).into());
+ targets.storage_targets.insert(*hashed_address, changed_slots);
+ }
+ }
+
+ VersionedMultiProofTargets::V2(targets)
+ } else {
+ let mut targets = MultiProofTargets::default();
+
+ // first collect all new accounts (not previously fetched)
+ for hashed_address in state_update.accounts.keys() {
+ if !fetched_proof_targets.contains_key(hashed_address) {
+ targets.insert(*hashed_address, HashSet::default());
+ }
+ }
+
+ // then process storage slots for all accounts in the state update
+ for (hashed_address, storage) in &state_update.storages {
+ let fetched = fetched_proof_targets.get(hashed_address);
+ let storage_added_removed_keys = multi_added_removed_keys.get_storage(hashed_address);
+ let mut changed_slots = storage
+ .storage
+ .keys()
+ .filter(|slot| {
+ !fetched.is_some_and(|f| f.contains(*slot)) ||
+ storage_added_removed_keys.is_some_and(|k| k.is_removed(slot))
+ })
+ .peekable();
+
+ // If the storage is wiped, we still need to fetch the account proof.
+ if storage.wiped && fetched.is_none() {
+ targets.entry(*hashed_address).or_default();
+ }
+
+ if changed_slots.peek().is_some() {
+ targets.entry(*hashed_address).or_default().extend(changed_slots);
+ }
+ }
+
+ VersionedMultiProofTargets::Legacy(targets)
}
-
- // then process storage slots for all accounts in the state update
- for (hashed_address, storage) in &state_update.storages {
- let fetched = fetched_proof_targets.get(hashed_address);
- let storage_added_removed_keys = multi_added_removed_keys.get_storage(hashed_address);
- let mut changed_slots = storage
- .storage
- .keys()
- .filter(|slot| {
- !fetched.is_some_and(|f| f.contains(*slot)) ||
- storage_added_removed_keys.is_some_and(|k| k.is_removed(slot))
- })
- .peekable();
-
- // If the storage is wiped, we still need to fetch the account proof.
- if storage.wiped && fetched.is_none() {
- targets.entry(*hashed_address).or_default();
- }
-
- if changed_slots.peek().is_some() {
- targets.entry(*hashed_address).or_default().extend(changed_slots);
- }
- }
-
- targets
}
/// Dispatches work items as a single unit or in chunks based on target size and worker
@@ -1481,12 +1681,24 @@ mod tests {
state
}
+ fn unwrap_legacy_targets(targets: VersionedMultiProofTargets) -> MultiProofTargets {
+ match targets {
+ VersionedMultiProofTargets::Legacy(targets) => targets,
+ VersionedMultiProofTargets::V2(_) => panic!("Expected Legacy targets"),
+ }
+ }
+
#[test]
fn test_get_proof_targets_new_account_targets() {
let state = create_get_proof_targets_state();
let fetched = MultiProofTargets::default();
- let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
+ let targets = unwrap_legacy_targets(get_proof_targets(
+ &state,
+ &fetched,
+ &MultiAddedRemovedKeys::new(),
+ false,
+ ));
// should return all accounts as targets since nothing was fetched before
assert_eq!(targets.len(), state.accounts.len());
@@ -1500,7 +1712,12 @@ mod tests {
let state = create_get_proof_targets_state();
let fetched = MultiProofTargets::default();
- let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
+ let targets = unwrap_legacy_targets(get_proof_targets(
+ &state,
+ &fetched,
+ &MultiAddedRemovedKeys::new(),
+ false,
+ ));
// verify storage slots are included for accounts with storage
for (addr, storage) in &state.storages {
@@ -1528,7 +1745,12 @@ mod tests {
// mark the account as already fetched
fetched.insert(*fetched_addr, HashSet::default());
- let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
+ let targets = unwrap_legacy_targets(get_proof_targets(
+ &state,
+ &fetched,
+ &MultiAddedRemovedKeys::new(),
+ false,
+ ));
// should not include the already fetched account since it has no storage updates
assert!(!targets.contains_key(fetched_addr));
@@ -1548,7 +1770,12 @@ mod tests {
fetched_slots.insert(fetched_slot);
fetched.insert(*addr, fetched_slots);
- let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
+ let targets = unwrap_legacy_targets(get_proof_targets(
+ &state,
+ &fetched,
+ &MultiAddedRemovedKeys::new(),
+ false,
+ ));
// should not include the already fetched storage slot
let target_slots = &targets[addr];
@@ -1561,7 +1788,12 @@ mod tests {
let state = HashedPostState::default();
let fetched = MultiProofTargets::default();
- let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
+ let targets = unwrap_legacy_targets(get_proof_targets(
+ &state,
+ &fetched,
+ &MultiAddedRemovedKeys::new(),
+ false,
+ ));
assert!(targets.is_empty());
}
@@ -1588,7 +1820,12 @@ mod tests {
fetched_slots.insert(slot1);
fetched.insert(addr1, fetched_slots);
- let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
+ let targets = unwrap_legacy_targets(get_proof_targets(
+ &state,
+ &fetched,
+ &MultiAddedRemovedKeys::new(),
+ false,
+ ));
assert!(targets.contains_key(&addr2));
assert!(!targets[&addr1].contains(&slot1));
@@ -1614,7 +1851,12 @@ mod tests {
assert!(!state.accounts.contains_key(&addr));
assert!(!fetched.contains_key(&addr));
- let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
+ let targets = unwrap_legacy_targets(get_proof_targets(
+ &state,
+ &fetched,
+ &MultiAddedRemovedKeys::new(),
+ false,
+ ));
// verify that we still get the storage slots for the unmodified account
assert!(targets.contains_key(&addr));
@@ -1656,7 +1898,12 @@ mod tests {
removed_state.storages.insert(addr, removed_storage);
multi_added_removed_keys.update_with_state(&removed_state);
- let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
+ let targets = unwrap_legacy_targets(get_proof_targets(
+ &state,
+ &fetched,
+ &multi_added_removed_keys,
+ false,
+ ));
// slot1 should be included despite being fetched, because it's marked as removed
assert!(targets.contains_key(&addr));
@@ -1683,7 +1930,12 @@ mod tests {
storage.storage.insert(slot1, U256::from(100));
state.storages.insert(addr, storage);
- let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
+ let targets = unwrap_legacy_targets(get_proof_targets(
+ &state,
+ &fetched,
+ &multi_added_removed_keys,
+ false,
+ ));
// account should be included because storage is wiped and account wasn't fetched
assert!(targets.contains_key(&addr));
@@ -1726,7 +1978,12 @@ mod tests {
removed_state.storages.insert(addr, removed_storage);
multi_added_removed_keys.update_with_state(&removed_state);
- let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
+ let targets = unwrap_legacy_targets(get_proof_targets(
+ &state,
+ &fetched,
+ &multi_added_removed_keys,
+ false,
+ ));
// only slots in the state update can be included, so slot3 should not appear
assert!(!targets.contains_key(&addr));
@@ -1753,9 +2010,12 @@ mod tests {
targets3.insert(addr3, HashSet::default());
let tx = task.tx.clone();
- tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap();
- tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap();
- tx.send(MultiProofMessage::PrefetchProofs(targets3)).unwrap();
+ tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets1)))
+ .unwrap();
+ tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets2)))
+ .unwrap();
+ tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets3)))
+ .unwrap();
let proofs_requested =
if let Ok(MultiProofMessage::PrefetchProofs(targets)) = task.rx.recv() {
@@ -1769,11 +2029,12 @@ mod tests {
assert_eq!(num_batched, 3);
assert_eq!(merged_targets.len(), 3);
- assert!(merged_targets.contains_key(&addr1));
- assert!(merged_targets.contains_key(&addr2));
- assert!(merged_targets.contains_key(&addr3));
+ let legacy_targets = unwrap_legacy_targets(merged_targets);
+ assert!(legacy_targets.contains_key(&addr1));
+ assert!(legacy_targets.contains_key(&addr2));
+ assert!(legacy_targets.contains_key(&addr3));
- task.on_prefetch_proof(merged_targets)
+ task.on_prefetch_proof(VersionedMultiProofTargets::Legacy(legacy_targets))
} else {
panic!("Expected PrefetchProofs message");
};
@@ -1848,11 +2109,16 @@ mod tests {
// Queue: [PrefetchProofs1, PrefetchProofs2, StateUpdate1, StateUpdate2, PrefetchProofs3]
let tx = task.tx.clone();
- tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap();
- tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap();
+ tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets1)))
+ .unwrap();
+ tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets2)))
+ .unwrap();
tx.send(MultiProofMessage::StateUpdate(source.into(), state_update1)).unwrap();
tx.send(MultiProofMessage::StateUpdate(source.into(), state_update2)).unwrap();
- tx.send(MultiProofMessage::PrefetchProofs(targets3.clone())).unwrap();
+ tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(
+ targets3.clone(),
+ )))
+ .unwrap();
// Step 1: Receive and batch PrefetchProofs (should get targets1 + targets2)
let mut pending_msg: Option<MultiProofMessage> = None;
@@ -1878,9 +2144,10 @@ mod tests {
// Should have batched exactly 2 PrefetchProofs (not 3!)
assert_eq!(num_batched, 2, "Should batch only until different message type");
assert_eq!(merged_targets.len(), 2);
- assert!(merged_targets.contains_key(&addr1));
- assert!(merged_targets.contains_key(&addr2));
- assert!(!merged_targets.contains_key(&addr3), "addr3 should NOT be in first batch");
+ let legacy_targets = unwrap_legacy_targets(merged_targets);
+ assert!(legacy_targets.contains_key(&addr1));
+ assert!(legacy_targets.contains_key(&addr2));
+ assert!(!legacy_targets.contains_key(&addr3), "addr3 should NOT be in first batch");
} else {
panic!("Expected PrefetchProofs message");
}
@@ -1905,7 +2172,8 @@ mod tests {
match task.rx.try_recv() {
Ok(MultiProofMessage::PrefetchProofs(targets)) => {
assert_eq!(targets.len(), 1);
- assert!(targets.contains_key(&addr3));
+ let legacy_targets = unwrap_legacy_targets(targets);
+ assert!(legacy_targets.contains_key(&addr3));
}
_ => panic!("PrefetchProofs3 was lost!"),
}
@@ -1951,9 +2219,13 @@ mod tests {
let source = StateChangeSource::Transaction(99);
let tx = task.tx.clone();
- tx.send(MultiProofMessage::PrefetchProofs(prefetch1)).unwrap();
+ tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(prefetch1)))
+ .unwrap();
tx.send(MultiProofMessage::StateUpdate(source.into(), state_update)).unwrap();
- tx.send(MultiProofMessage::PrefetchProofs(prefetch2.clone())).unwrap();
+ tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(
+ prefetch2.clone(),
+ )))
+ .unwrap();
let mut ctx = MultiproofBatchCtx::new(Instant::now());
let mut batch_metrics = MultiproofBatchMetrics::default();
@@ -1986,7 +2258,8 @@ mod tests {
match task.rx.try_recv() {
Ok(MultiProofMessage::PrefetchProofs(targets)) => {
assert_eq!(targets.len(), 1);
- assert!(targets.contains_key(&prefetch_addr2));
+ let legacy_targets = unwrap_legacy_targets(targets);
+ assert!(legacy_targets.contains_key(&prefetch_addr2));
}
other => panic!("Expected PrefetchProofs2 in channel, got {:?}", other),
}
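To make the shape of the new dispatch easier to follow, here is a self-contained sketch (not part of the patch) of the pattern `VersionedMultiProofTargets` introduces: one enum wraps both target representations so batching, extending, and chunking stay version-agnostic, and mixing variants is treated as a logic error. `Address`, `Slot`, and the two variants below are toy stand-ins rather than the real reth types, and the legacy chunking count is a simplification.

```rust
use std::collections::BTreeMap;

type Address = u64;
type Slot = u64;

#[derive(Debug)]
enum VersionedTargets {
    /// Legacy shape: address -> storage slots.
    Legacy(BTreeMap<Address, Vec<Slot>>),
    /// V2 shape: flat account list plus per-account slot lists.
    V2 { accounts: Vec<Address>, storage: BTreeMap<Address, Vec<Slot>> },
}

impl VersionedTargets {
    /// Extends with another value of the *same* variant, mirroring
    /// `VersionedMultiProofTargets::extend`; mixing variants is a logic error.
    fn extend(&mut self, other: Self) {
        match (self, other) {
            (Self::Legacy(dest), Self::Legacy(src)) => {
                for (addr, slots) in src {
                    dest.entry(addr).or_default().extend(slots);
                }
            }
            (Self::V2 { accounts, storage }, Self::V2 { accounts: a, storage: s }) => {
                accounts.extend(a);
                for (addr, slots) in s {
                    storage.entry(addr).or_default().extend(slots);
                }
            }
            _ => panic!("cannot extend mismatched variants"),
        }
    }

    /// Number of items considered for chunking. For V2 the patch counts accounts plus
    /// storage slots; the legacy count shown here is a simplification.
    fn chunking_length(&self) -> usize {
        match self {
            Self::Legacy(t) => t.len() + t.values().map(|slots| slots.len()).sum::<usize>(),
            Self::V2 { accounts, storage } => {
                accounts.len() + storage.values().map(|slots| slots.len()).sum::<usize>()
            }
        }
    }
}

fn main() {
    let mut targets = VersionedTargets::V2 { accounts: vec![1], storage: BTreeMap::new() };
    targets.extend(VersionedTargets::V2 {
        accounts: vec![2],
        storage: BTreeMap::from([(2, vec![7, 8])]),
    });
    assert_eq!(targets.chunking_length(), 4);
}
```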
diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs
index 6021098627..1083450549 100644
--- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs
+++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs
@@ -16,7 +16,7 @@ use crate::tree::{
payload_processor::{
bal::{total_slots, BALSlotIter},
executor::WorkloadExecutor,
- multiproof::MultiProofMessage,
+ multiproof::{MultiProofMessage, VersionedMultiProofTargets},
ExecutionCache as PayloadExecutionCache,
},
precompile_cache::{CachedPrecompile, PrecompileCacheMap},
@@ -237,7 +237,7 @@ where
}
/// If configured and the tx returned proof targets, emit the targets the transaction produced
- fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
+ fn send_multi_proof_targets(&self, targets: Option<VersionedMultiProofTargets>) {
if self.is_execution_terminated() {
// if execution is already terminated then we dont need to send more proof fetch
// messages
@@ -479,6 +479,8 @@ where
pub(super) terminate_execution: Arc<AtomicBool>,
pub(super) precompile_cache_disabled: bool,
pub(super) precompile_cache_map: PrecompileCacheMap>,
+ /// Whether V2 proof calculation is enabled.
+ pub(super) v2_proofs_enabled: bool,
}
impl PrewarmContext
@@ -487,10 +489,12 @@ where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
Evm: ConfigureEvm + 'static,
{
- /// Splits this context into an evm, an evm config, metrics, and the atomic bool for terminating
- /// execution.
+ /// Splits this context into an evm, metrics, the atomic bool for terminating execution, and
+ /// whether V2 proofs are enabled.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
- fn evm_for_ctx(self) -> Option<(EvmFor, PrewarmMetrics, Arc)> {
+ fn evm_for_ctx(
+ self,
+ ) -> Option<(EvmFor, PrewarmMetrics, Arc, bool)> {
let Self {
env,
evm_config,
@@ -500,6 +504,7 @@ where
terminate_execution,
precompile_cache_disabled,
precompile_cache_map,
+ v2_proofs_enabled,
} = self;
let mut state_provider = match provider.build() {
@@ -549,7 +554,7 @@ where
});
}
- Some((evm, metrics, terminate_execution))
+ Some((evm, metrics, terminate_execution, v2_proofs_enabled))
}
/// Accepts an [`mpsc::Receiver`] of transactions and a handle to prewarm task. Executes
@@ -570,7 +575,10 @@ where
) where
Tx: ExecutableTxFor,
{
- let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
+ let Some((mut evm, metrics, terminate_execution, v2_proofs_enabled)) = self.evm_for_ctx()
+ else {
+ return
+ };
while let Ok(IndexedTransaction { index, tx }) = {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
@@ -633,7 +641,8 @@ where
let _enter =
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
.entered();
- let (targets, storage_targets) = multiproof_targets_from_state(res.state);
+ let (targets, storage_targets) =
+ multiproof_targets_from_state(res.state, v2_proofs_enabled);
metrics.prefetch_storage_targets.record(storage_targets as f64);
let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
drop(_enter);
@@ -778,9 +787,22 @@ where
}
}
-/// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the
+/// Returns a set of [`VersionedMultiProofTargets`] and the total amount of storage targets, based
+/// on the given state.
+fn multiproof_targets_from_state(
+ state: EvmState,
+ v2_enabled: bool,
+) -> (VersionedMultiProofTargets, usize) {
+ if v2_enabled {
+ multiproof_targets_v2_from_state(state)
+ } else {
+ multiproof_targets_legacy_from_state(state)
+ }
+}
+
+/// Returns legacy [`MultiProofTargets`] and the total amount of storage targets, based on the
/// given state.
-fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
+fn multiproof_targets_legacy_from_state(state: EvmState) -> (VersionedMultiProofTargets, usize) {
let mut targets = MultiProofTargets::with_capacity(state.len());
let mut storage_targets = 0;
for (addr, account) in state {
@@ -810,7 +832,50 @@ fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize)
targets.insert(keccak256(addr), storage_set);
}
- (targets, storage_targets)
+ (VersionedMultiProofTargets::Legacy(targets), storage_targets)
+}
+
+/// Returns V2 [`reth_trie_parallel::targets_v2::MultiProofTargetsV2`] and the total amount of
+/// storage targets, based on the given state.
+fn multiproof_targets_v2_from_state(state: EvmState) -> (VersionedMultiProofTargets, usize) {
+ use reth_trie::proof_v2;
+ use reth_trie_parallel::targets_v2::MultiProofTargetsV2;
+
+ let mut targets = MultiProofTargetsV2::default();
+ let mut storage_target_count = 0;
+ for (addr, account) in state {
+ // if the account was not touched, or if the account was selfdestructed, do not
+ // fetch proofs for it
+ //
+ // Since selfdestruct can only happen in the same transaction, we can skip
+ // prefetching proofs for selfdestructed accounts
+ //
+ // See: https://eips.ethereum.org/EIPS/eip-6780
+ if !account.is_touched() || account.is_selfdestructed() {
+ continue
+ }
+
+ let hashed_address = keccak256(addr);
+ targets.account_targets.push(hashed_address.into());
+
+ let mut storage_slots = Vec::with_capacity(account.storage.len());
+ for (key, slot) in account.storage {
+ // do nothing if unchanged
+ if !slot.is_changed() {
+ continue
+ }
+
+ let hashed_slot = keccak256(B256::new(key.to_be_bytes()));
+ storage_slots.push(proof_v2::Target::from(hashed_slot));
+ }
+
+ storage_target_count += storage_slots.len();
+ if !storage_slots.is_empty() {
+ targets.storage_targets.insert(hashed_address, storage_slots);
+ }
+ }
+
+ (VersionedMultiProofTargets::V2(targets), storage_target_count)
}
/// The events the pre-warm task can handle.
@@ -835,7 +900,7 @@ pub(super) enum PrewarmTaskEvent {
/// The outcome of a pre-warm task
Outcome {
/// The prepared proof targets based on the evm state outcome
- proof_targets: Option<MultiProofTargets>,
+ proof_targets: Option<VersionedMultiProofTargets>,
},
/// Finished executing all transactions
FinishedTxExecution {
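For reference, a simplified, runnable sketch (not part of the patch) of the selection rule `multiproof_targets_v2_from_state` applies: untouched and selfdestructed accounts are skipped (per EIP-6780, selfdestruct only takes effect in the creating transaction) and only changed slots become storage targets. `Account` and `Slot` are toy stand-ins for the revm state types, and plain `u64` keys replace hashed addresses and slots.

```rust
/// Toy stand-ins for the revm `EvmState` entries (assumptions, not the real types).
struct Slot {
    changed: bool,
}
struct Account {
    touched: bool,
    selfdestructed: bool,
    storage: Vec<(u64, Slot)>,
}

/// Collects (account targets, per-account storage targets) from a toy state.
fn collect_v2_targets(state: Vec<(u64, Account)>) -> (Vec<u64>, Vec<(u64, Vec<u64>)>) {
    let mut account_targets = Vec::new();
    let mut storage_targets = Vec::new();
    for (addr, account) in state {
        // Skip accounts that were never touched or were selfdestructed in this block.
        if !account.touched || account.selfdestructed {
            continue;
        }
        account_targets.push(addr);
        // Only changed slots become storage targets.
        let slots: Vec<u64> = account
            .storage
            .into_iter()
            .filter(|(_, slot)| slot.changed)
            .map(|(key, _)| key)
            .collect();
        if !slots.is_empty() {
            storage_targets.push((addr, slots));
        }
    }
    (account_targets, storage_targets)
}

fn main() {
    let state = vec![
        (1, Account { touched: true, selfdestructed: false, storage: vec![(7, Slot { changed: true }), (8, Slot { changed: false })] }),
        (2, Account { touched: false, selfdestructed: false, storage: vec![] }),
        (3, Account { touched: true, selfdestructed: true, storage: vec![(9, Slot { changed: true })] }),
    ];
    let (accounts, storage) = collect_v2_targets(state);
    assert_eq!(accounts, vec![1]);
    assert_eq!(storage, vec![(1, vec![7])]);
}
```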
diff --git a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs
index b4c150cfa9..052fd8672b 100644
--- a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs
+++ b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs
@@ -4,7 +4,7 @@ use crate::tree::payload_processor::multiproof::{MultiProofTaskMetrics, SparseTr
use alloy_primitives::B256;
use rayon::iter::{ParallelBridge, ParallelIterator};
use reth_trie::{updates::TrieUpdates, Nibbles};
-use reth_trie_parallel::root::ParallelStateRootError;
+use reth_trie_parallel::{proof_task::ProofResult, root::ParallelStateRootError};
use reth_trie_sparse::{
errors::{SparseStateTrieResult, SparseTrieErrorKind},
provider::{TrieNodeProvider, TrieNodeProviderFactory},
@@ -97,8 +97,8 @@ where
debug!(
target: "engine::root",
num_updates,
- account_proofs = update.multiproof.account_subtree.len(),
- storage_proofs = update.multiproof.storages.len(),
+ account_proofs = update.multiproof.account_proofs_len(),
+ storage_proofs = update.multiproof.storage_proofs_len(),
"Updating sparse trie"
);
@@ -157,7 +157,14 @@ where
let started_at = Instant::now();
// Reveal new accounts and storage slots.
- trie.reveal_decoded_multiproof(multiproof)?;
+ match multiproof {
+ ProofResult::Legacy(decoded, _) => {
+ trie.reveal_decoded_multiproof(decoded)?;
+ }
+ ProofResult::V2(decoded_v2) => {
+ trie.reveal_decoded_multiproof_v2(decoded_v2)?;
+ }
+ }
let reveal_multiproof_elapsed = started_at.elapsed();
trace!(
target: "engine::root::sparse",
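Below is a self-contained sketch (not part of the patch) of the `ProofResult` shape this series threads into the sparse-trie task: empty results are constructed according to the enabled proof version so later merges never mix variants, and the trie side matches once to pick the reveal routine. The proof types are toy stand-ins, and the real `Legacy` variant also carries `ParallelTrieStats`, omitted here.

```rust
/// Toy stand-ins for `DecodedMultiProof` / `DecodedMultiProofV2`.
#[derive(Default)]
struct LegacyProof(Vec<&'static str>);
#[derive(Default)]
struct V2Proof(Vec<&'static str>);

enum ProofResult {
    Legacy(LegacyProof),
    V2(V2Proof),
}

impl ProofResult {
    /// Like `ProofResult::empty(v2_enabled)`: the empty value must match the proof
    /// version in use so combining results never mixes variants.
    fn empty(v2_enabled: bool) -> Self {
        if v2_enabled {
            Self::V2(V2Proof::default())
        } else {
            Self::Legacy(LegacyProof::default())
        }
    }

    /// Like the patch's `extend`: merging mismatched variants is a logic error.
    fn extend(&mut self, other: Self) {
        match (self, other) {
            (Self::Legacy(dest), Self::Legacy(src)) => dest.0.extend(src.0),
            (Self::V2(dest), Self::V2(src)) => dest.0.extend(src.0),
            _ => panic!("mismatched ProofResult variants"),
        }
    }
}

/// The sparse-trie side matches once to pick the reveal routine, as in the hunk above.
fn reveal(result: ProofResult) -> usize {
    match result {
        ProofResult::Legacy(proof) => proof.0.len(), // would call reveal_decoded_multiproof
        ProofResult::V2(proof) => proof.0.len(),     // would call reveal_decoded_multiproof_v2
    }
}

fn main() {
    let mut combined = ProofResult::empty(true);
    combined.extend(ProofResult::V2(V2Proof(vec!["node-a", "node-b"])));
    assert_eq!(reveal(combined), 2);
}
```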
diff --git a/crates/trie/parallel/Cargo.toml b/crates/trie/parallel/Cargo.toml
index d64f2dfb51..812dd2b85b 100644
--- a/crates/trie/parallel/Cargo.toml
+++ b/crates/trie/parallel/Cargo.toml
@@ -13,8 +13,8 @@ workspace = true
[dependencies]
# reth
-reth-execution-errors.workspace = true
reth-primitives-traits.workspace = true
+reth-execution-errors.workspace = true
reth-provider.workspace = true
reth-storage-errors.workspace = true
reth-trie-common.workspace = true
diff --git a/crates/trie/parallel/src/proof.rs b/crates/trie/parallel/src/proof.rs
index 7bf936bad3..d42534c271 100644
--- a/crates/trie/parallel/src/proof.rs
+++ b/crates/trie/parallel/src/proof.rs
@@ -197,7 +197,7 @@ impl ParallelProof {
let (result_tx, result_rx) = crossbeam_unbounded();
let account_multiproof_start_time = Instant::now();
- let input = AccountMultiproofInput {
+ let input = AccountMultiproofInput::Legacy {
targets,
prefix_sets,
collect_branch_node_masks: self.collect_branch_node_masks,
@@ -208,7 +208,6 @@ impl ParallelProof {
HashedPostState::default(),
account_multiproof_start_time,
),
- v2_proofs_enabled: self.v2_proofs_enabled,
};
self.proof_worker_handle
@@ -222,7 +221,9 @@ impl ParallelProof {
)
})?;
- let ProofResult { proof: multiproof, stats } = proof_result_msg.result?;
+ let ProofResult::Legacy(multiproof, stats) = proof_result_msg.result? else {
+ panic!("AccountMultiproofInput::Legacy was submitted, expected legacy result")
+ };
#[cfg(feature = "metrics")]
self.metrics.record(stats);
@@ -235,7 +236,7 @@ impl ParallelProof {
leaves_added = stats.leaves_added(),
missed_leaves = stats.missed_leaves(),
precomputed_storage_roots = stats.precomputed_storage_roots(),
- "Calculated decoded proof"
+ "Calculated decoded proof",
);
Ok(multiproof)
diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs
index eb6f892346..076931f48c 100644
--- a/crates/trie/parallel/src/proof_task.rs
+++ b/crates/trie/parallel/src/proof_task.rs
@@ -32,6 +32,8 @@
use crate::{
root::ParallelStateRootError,
stats::{ParallelTrieStats, ParallelTrieTracker},
+ targets_v2::MultiProofTargetsV2,
+ value_encoder::AsyncAccountValueEncoder,
StorageRootTargets,
};
use alloy_primitives::{
@@ -49,11 +51,11 @@ use reth_trie::{
node_iter::{TrieElement, TrieNodeIter},
prefix_set::TriePrefixSets,
proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider, StorageProof},
- proof_v2::{self, StorageProofCalculator},
+ proof_v2,
trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieCursorMetricsCache},
walker::TrieWalker,
- DecodedMultiProof, DecodedStorageMultiProof, HashBuilder, HashedPostState, MultiProofTargets,
- Nibbles, ProofTrieNode, TRIE_ACCOUNT_RLP_MAX_SIZE,
+ DecodedMultiProof, DecodedMultiProofV2, DecodedStorageMultiProof, HashBuilder, HashedPostState,
+ MultiProofTargets, Nibbles, ProofTrieNode, TRIE_ACCOUNT_RLP_MAX_SIZE,
};
use reth_trie_common::{
added_removed_keys::MultiAddedRemovedKeys,
@@ -220,7 +222,8 @@ impl ProofWorkerHandle {
metrics,
#[cfg(feature = "metrics")]
cursor_metrics,
- );
+ )
+ .with_v2_proofs(v2_proofs_enabled);
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
@@ -333,16 +336,12 @@ impl ProofWorkerHandle {
ProviderError::other(std::io::Error::other("account workers unavailable"));
if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
- let AccountMultiproofInput {
- proof_result_sender:
- ProofResultContext {
- sender: result_tx,
- sequence_number: seq,
- state,
- start_time: start,
- },
- ..
- } = *input;
+ let ProofResultContext {
+ sender: result_tx,
+ sequence_number: seq,
+ state,
+ start_time: start,
+ } = input.into_proof_result_sender();
let _ = result_tx.send(ProofResultMessage {
sequence_number: seq,
@@ -605,11 +604,65 @@ impl TrieNodeProvider for ProofTaskTrieNodeProvider {
/// Result of a multiproof calculation.
#[derive(Debug)]
-pub struct ProofResult {
- /// The account multiproof
- pub proof: DecodedMultiProof,
- /// Statistics collected during proof computation
- pub stats: ParallelTrieStats,
+pub enum ProofResult {
+ /// Legacy multiproof calculation result.
+ Legacy(DecodedMultiProof, ParallelTrieStats),
+ /// V2 multiproof calculation result.
+ V2(DecodedMultiProofV2),
+}
+
+impl ProofResult {
+ /// Creates an empty [`ProofResult`] of the appropriate variant based on `v2_enabled`.
+ ///
+ /// Use this when constructing empty proofs (e.g., for state updates where all targets
+ /// were already fetched) to ensure consistency with the proof version being used.
+ pub fn empty(v2_enabled: bool) -> Self {
+ if v2_enabled {
+ Self::V2(DecodedMultiProofV2::default())
+ } else {
+ let stats = ParallelTrieTracker::default().finish();
+ Self::Legacy(DecodedMultiProof::default(), stats)
+ }
+ }
+
+ /// Returns true if the result contains no proofs
+ pub fn is_empty(&self) -> bool {
+ match self {
+ Self::Legacy(proof, _) => proof.is_empty(),
+ Self::V2(proof) => proof.is_empty(),
+ }
+ }
+
+ /// Extends this proof result with the contents of the given result.
+ ///
+ /// # Panics
+ ///
+ /// This method panics if the two [`ProofResult`]s are not the same variant.
+ pub fn extend(&mut self, other: Self) {
+ match (self, other) {
+ (Self::Legacy(proof, _), Self::Legacy(other, _)) => proof.extend(other),
+ (Self::V2(proof), Self::V2(other)) => proof.extend(other),
+ _ => panic!("mismatched ProofResults, cannot extend one with the other"),
+ }
+ }
+
+ /// Returns the number of account proofs.
+ pub fn account_proofs_len(&self) -> usize {
+ match self {
+ Self::Legacy(proof, _) => proof.account_subtree.len(),
+ Self::V2(proof) => proof.account_proofs.len(),
+ }
+ }
+
+ /// Returns the total number of storage proofs
+ pub fn storage_proofs_len(&self) -> usize {
+ match self {
+ Self::Legacy(proof, _) => {
+ proof.storages.values().map(|p| p.subtree.len()).sum::<usize>()
+ }
+ Self::V2(proof) => proof.storage_proofs.values().map(|p| p.len()).sum::<usize>(),
+ }
+ }
}
/// Channel used by worker threads to deliver `ProofResultMessage` items back to
@@ -889,7 +942,7 @@ where
&self,
proof_tx: &ProofTaskTx,
v2_calculator: Option<
- &mut StorageProofCalculator<
+ &mut proof_v2::StorageProofCalculator<
::StorageTrieCursor<'_>,
::StorageCursor<'_>,
>,
@@ -1053,6 +1106,8 @@ struct AccountProofWorker {
/// Cursor metrics for this worker
#[cfg(feature = "metrics")]
cursor_metrics: ProofTaskCursorMetrics,
+ /// Set to true if V2 proofs are enabled.
+ v2_enabled: bool,
}
impl AccountProofWorker
@@ -1082,9 +1137,16 @@ where
metrics,
#[cfg(feature = "metrics")]
cursor_metrics,
+ v2_enabled: false,
}
}
+ /// Sets whether V2 proofs are enabled.
+ const fn with_v2_proofs(mut self, v2_enabled: bool) -> Self {
+ self.v2_enabled = v2_enabled;
+ self
+ }
+
/// Runs the worker loop, processing jobs until the channel closes.
///
/// # Lifecycle
@@ -1117,6 +1179,17 @@ where
let mut account_nodes_processed = 0u64;
let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
+ let mut v2_calculator = if self.v2_enabled {
+ let trie_cursor = proof_tx.provider.account_trie_cursor()?;
+ let hashed_cursor = proof_tx.provider.hashed_account_cursor()?;
+ Some(proof_v2::ProofCalculator::<_, _, AsyncAccountValueEncoder>::new(
+ trie_cursor,
+ hashed_cursor,
+ ))
+ } else {
+ None
+ };
+
// Count this worker as available only after successful initialization.
self.available_workers.fetch_add(1, Ordering::Relaxed);
@@ -1128,6 +1201,7 @@ where
AccountWorkerJob::AccountMultiproof { input } => {
self.process_account_multiproof(
&proof_tx,
+ v2_calculator.as_mut(),
*input,
&mut account_proofs_processed,
&mut cursor_metrics_cache,
@@ -1166,26 +1240,18 @@ where
Ok(())
}
- /// Processes an account multiproof request.
- fn process_account_multiproof(
+ fn compute_legacy_account_multiproof(
&self,
proof_tx: &ProofTaskTx,
- input: AccountMultiproofInput,
- account_proofs_processed: &mut u64,
- cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
- ) where
+ targets: MultiProofTargets,
+ mut prefix_sets: TriePrefixSets,
+ collect_branch_node_masks: bool,
+ multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
+ proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
+ ) -> Result
+ where
Provider: TrieCursorFactory + HashedCursorFactory,
{
- let AccountMultiproofInput {
- targets,
- mut prefix_sets,
- collect_branch_node_masks,
- multi_added_removed_keys,
- proof_result_sender:
- ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start },
- v2_proofs_enabled,
- } = input;
-
let span = debug_span!(
target: "trie::proof_task",
"Account multiproof calculation",
@@ -1199,8 +1265,6 @@ where
"Processing account multiproof"
);
- let proof_start = Instant::now();
-
let mut tracker = ParallelTrieTracker::default();
let mut storage_prefix_sets = std::mem::take(&mut prefix_sets.storage_prefix_sets);
@@ -1210,29 +1274,14 @@ where
tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);
- let storage_proof_receivers = match dispatch_storage_proofs(
+ let storage_proof_receivers = dispatch_storage_proofs(
&self.storage_work_tx,
&targets,
&mut storage_prefix_sets,
collect_branch_node_masks,
multi_added_removed_keys.as_ref(),
- v2_proofs_enabled,
- ) {
- Ok(receivers) => receivers,
- Err(error) => {
- // Send error through result channel
- error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}");
- let _ = result_tx.send(ProofResultMessage {
- sequence_number: seq,
- result: Err(error),
- elapsed: start.elapsed(),
- state,
- });
- return;
- }
- };
+ )?;
- // Use the missed leaves cache passed from the multiproof manager
let account_prefix_set = std::mem::take(&mut prefix_sets.account_prefix_set);
let ctx = AccountMultiproofParams {
@@ -1244,17 +1293,115 @@ where
cached_storage_roots: &self.cached_storage_roots,
};
- let result =
- build_account_multiproof_with_storage_roots(&proof_tx.provider, ctx, &mut tracker);
-
- let now = Instant::now();
- let proof_elapsed = now.duration_since(proof_start);
- let total_elapsed = now.duration_since(start);
- let proof_cursor_metrics = tracker.cursor_metrics;
- proof_cursor_metrics.record_spans();
+ let result = build_account_multiproof_with_storage_roots(
+ &proof_tx.provider,
+ ctx,
+ &mut tracker,
+ proof_cursor_metrics,
+ );
let stats = tracker.finish();
- let result = result.map(|proof| ProofResult { proof, stats });
+ result.map(|proof| ProofResult::Legacy(proof, stats))
+ }
+
+ fn compute_v2_account_multiproof(
+ &self,
+ v2_calculator: &mut proof_v2::ProofCalculator<
+ <Provider as TrieCursorFactory>::AccountTrieCursor<'_>,
+ <Provider as HashedCursorFactory>::AccountCursor<'_>,
+ AsyncAccountValueEncoder,
+ >,
+ targets: MultiProofTargetsV2,
+ ) -> Result
+ where
+ Provider: TrieCursorFactory + HashedCursorFactory,
+ {
+ let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets;
+
+ let span = debug_span!(
+ target: "trie::proof_task",
+ "Account V2 multiproof calculation",
+ account_targets = account_targets.len(),
+ storage_targets = storage_targets.values().map(|t| t.len()).sum::<usize>(),
+ worker_id = self.worker_id,
+ );
+ let _span_guard = span.enter();
+
+ trace!(target: "trie::proof_task", "Processing V2 account multiproof");
+
+ let storage_proof_receivers =
+ dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?;
+
+ let mut value_encoder = AsyncAccountValueEncoder::new(
+ self.storage_work_tx.clone(),
+ storage_proof_receivers,
+ self.cached_storage_roots.clone(),
+ );
+
+ let proof = DecodedMultiProofV2 {
+ account_proofs: v2_calculator.proof(&mut value_encoder, &mut account_targets)?,
+ storage_proofs: value_encoder.into_storage_proofs()?,
+ };
+
+ Ok(ProofResult::V2(proof))
+ }
+
+ /// Processes an account multiproof request.
+ fn process_account_multiproof(
+ &self,
+ proof_tx: &ProofTaskTx,
+ v2_calculator: Option<
+ &mut proof_v2::ProofCalculator<
+ <Provider as TrieCursorFactory>::AccountTrieCursor<'_>,
+ <Provider as HashedCursorFactory>::AccountCursor<'_>,
+ AsyncAccountValueEncoder,
+ >,
+ >,
+ input: AccountMultiproofInput,
+ account_proofs_processed: &mut u64,
+ cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
+ ) where
+ Provider: TrieCursorFactory + HashedCursorFactory,
+ {
+ let mut proof_cursor_metrics = ProofTaskCursorMetricsCache::default();
+ let proof_start = Instant::now();
+
+ let (proof_result_sender, result) = match input {
+ AccountMultiproofInput::Legacy {
+ targets,
+ prefix_sets,
+ collect_branch_node_masks,
+ multi_added_removed_keys,
+ proof_result_sender,
+ } => (
+ proof_result_sender,
+ self.compute_legacy_account_multiproof(
+ proof_tx,
+ targets,
+ prefix_sets,
+ collect_branch_node_masks,
+ multi_added_removed_keys,
+ &mut proof_cursor_metrics,
+ ),
+ ),
+ AccountMultiproofInput::V2 { targets, proof_result_sender } => (
+ proof_result_sender,
+ self.compute_v2_account_multiproof::<Provider>(
+ v2_calculator.expect("v2 calculator provided"),
+ targets,
+ ),
+ ),
+ };
+
+ let ProofResultContext {
+ sender: result_tx,
+ sequence_number: seq,
+ state,
+ start_time: start,
+ } = proof_result_sender;
+
+ let proof_elapsed = proof_start.elapsed();
+ let total_elapsed = start.elapsed();
*account_proofs_processed += 1;
// Send result to MultiProofTask
@@ -1275,6 +1422,8 @@ where
);
}
+ proof_cursor_metrics.record_spans();
+
trace!(
target: "trie::proof_task",
proof_time_us = proof_elapsed.as_micros(),
@@ -1355,6 +1504,7 @@ fn build_account_multiproof_with_storage_roots<P>(
provider: &P,
ctx: AccountMultiproofParams<'_>,
tracker: &mut ParallelTrieTracker,
+ proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
) -> Result
where
P: TrieCursorFactory + HashedCursorFactory,
@@ -1362,15 +1512,12 @@ where
let accounts_added_removed_keys =
ctx.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts());
- // Create local metrics caches for account cursors. We can't directly use the metrics caches in
- // the tracker due to the call to `inc_missed_leaves` which occurs on it.
- let mut account_trie_cursor_metrics = TrieCursorMetricsCache::default();
- let mut account_hashed_cursor_metrics = HashedCursorMetricsCache::default();
-
// Wrap account trie cursor with instrumented cursor
let account_trie_cursor = provider.account_trie_cursor().map_err(ProviderError::Database)?;
- let account_trie_cursor =
- InstrumentedTrieCursor::new(account_trie_cursor, &mut account_trie_cursor_metrics);
+ let account_trie_cursor = InstrumentedTrieCursor::new(
+ account_trie_cursor,
+ &mut proof_cursor_metrics.account_trie_cursor,
+ );
// Create the walker.
let walker = TrieWalker::<_>::state_trie(account_trie_cursor, ctx.prefix_set)
@@ -1397,8 +1544,10 @@ where
// Wrap account hashed cursor with instrumented cursor
let account_hashed_cursor =
provider.hashed_account_cursor().map_err(ProviderError::Database)?;
- let account_hashed_cursor =
- InstrumentedHashedCursor::new(account_hashed_cursor, &mut account_hashed_cursor_metrics);
+ let account_hashed_cursor = InstrumentedHashedCursor::new(
+ account_hashed_cursor,
+ &mut proof_cursor_metrics.account_hashed_cursor,
+ );
let mut account_node_iter = TrieNodeIter::state_trie(walker, account_hashed_cursor);
@@ -1462,10 +1611,10 @@ where
StorageProof::new_hashed(provider, provider, hashed_address)
.with_prefix_set_mut(Default::default())
.with_trie_cursor_metrics(
- &mut tracker.cursor_metrics.storage_trie_cursor,
+ &mut proof_cursor_metrics.storage_trie_cursor,
)
.with_hashed_cursor_metrics(
- &mut tracker.cursor_metrics.storage_hashed_cursor,
+ &mut proof_cursor_metrics.storage_hashed_cursor,
)
.storage_multiproof(
ctx.targets
@@ -1516,21 +1665,6 @@ where
BranchNodeMasksMap::default()
};
- // Extend tracker with accumulated metrics from account cursors
- tracker.cursor_metrics.account_trie_cursor.extend(&account_trie_cursor_metrics);
- tracker.cursor_metrics.account_hashed_cursor.extend(&account_hashed_cursor_metrics);
-
- // Consume remaining storage proof receivers for accounts not encountered during trie walk.
- // Done last to allow storage workers more time to complete while we finalized the account trie.
- for (hashed_address, receiver) in storage_proof_receivers {
- if let Ok(proof_msg) = receiver.recv() {
- let proof_result = proof_msg.result?;
- let proof = Into::