feat(trie): add V2 account proof computation and refactor proof types (#21214)

Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Brian Picciano
2026-01-21 15:18:44 +01:00
committed by GitHub
parent ebaa4bda3a
commit f85fcba872
10 changed files with 844 additions and 274 deletions

View File

@@ -34,6 +34,11 @@ fn default_account_worker_count() -> usize {
/// The size of proof targets chunk to spawn in one multiproof calculation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 60;
/// The size of proof targets chunk to spawn in one multiproof calculation when V2 proofs are
/// enabled. This is 4x the default chunk size to take advantage of more efficient V2 proof
/// computation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2: usize = DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE * 4;
/// Default number of reserved CPU cores for non-reth processes.
///
/// This will be deducted from the thread count of main reth global threadpool.
@@ -267,6 +272,17 @@ impl TreeConfig {
self.multiproof_chunk_size
}
/// Return the multiproof task chunk size, using the V2 default if V2 proofs are enabled
/// and the chunk size is at the default value.
pub const fn effective_multiproof_chunk_size(&self) -> usize {
if self.enable_proof_v2 && self.multiproof_chunk_size == DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE
{
DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2
} else {
self.multiproof_chunk_size
}
}
/// Return the number of reserved CPU cores for non-reth processes
pub const fn reserved_cpu_cores(&self) -> usize {
self.reserved_cpu_cores

View File

@@ -245,6 +245,9 @@ where
let (to_sparse_trie, sparse_trie_rx) = channel();
let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
// Extract V2 proofs flag early so we can pass it to prewarm
let v2_proofs_enabled = config.enable_proof_v2();
// Handle BAL-based optimization if available
let prewarm_handle = if let Some(bal) = bal {
// When BAL is present, use BAL prewarming and send BAL to multiproof
@@ -261,6 +264,7 @@ where
provider_builder.clone(),
None, // Don't send proof targets when BAL is present
Some(bal),
v2_proofs_enabled,
)
} else {
// Normal path: spawn with transaction prewarming
@@ -271,6 +275,7 @@ where
provider_builder.clone(),
Some(to_multi_proof.clone()),
None,
v2_proofs_enabled,
)
};
@@ -278,7 +283,6 @@ where
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let storage_worker_count = config.storage_worker_count();
let account_worker_count = config.account_worker_count();
let v2_proofs_enabled = config.enable_proof_v2();
let proof_handle = ProofWorkerHandle::new(
self.executor.handle().clone(),
task_ctx,
@@ -290,10 +294,13 @@ where
let multi_proof_task = MultiProofTask::new(
proof_handle.clone(),
to_sparse_trie,
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
config
.multiproof_chunking_enabled()
.then_some(config.effective_multiproof_chunk_size()),
to_multi_proof.clone(),
from_multi_proof,
);
)
.with_v2_proofs_enabled(v2_proofs_enabled);
// spawn multi-proof task
let parent_span = span.clone();
@@ -342,8 +349,9 @@ where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
// This path doesn't use multiproof, so V2 proofs flag doesn't matter
let prewarm_handle =
self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal);
self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal, false);
PayloadHandle {
to_multi_proof: None,
prewarm_handle,
@@ -410,6 +418,7 @@ where
}
/// Spawn prewarming optionally wired to the multiproof task for target updates.
#[expect(clippy::too_many_arguments)]
fn spawn_caching_with<P>(
&self,
env: ExecutionEnv<Evm>,
@@ -418,6 +427,7 @@ where
provider_builder: StateProviderBuilder<N, P>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
bal: Option<Arc<BlockAccessList>>,
v2_proofs_enabled: bool,
) -> CacheTaskHandle<N::Receipt>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
@@ -440,6 +450,7 @@ where
terminate_execution: Arc::new(AtomicBool::new(false)),
precompile_cache_disabled: self.precompile_cache_disabled,
precompile_cache_map: self.precompile_cache_map.clone(),
v2_proofs_enabled,
};
let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(

View File

@@ -11,14 +11,18 @@ use reth_metrics::Metrics;
use reth_provider::AccountReader;
use reth_revm::state::EvmState;
use reth_trie::{
added_removed_keys::MultiAddedRemovedKeys, DecodedMultiProof, HashedPostState, HashedStorage,
added_removed_keys::MultiAddedRemovedKeys, proof_v2, HashedPostState, HashedStorage,
MultiProofTargets,
};
#[cfg(test)]
use reth_trie_parallel::stats::ParallelTrieTracker;
use reth_trie_parallel::{
proof::ParallelProof,
proof_task::{
AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
AccountMultiproofInput, ProofResult, ProofResultContext, ProofResultMessage,
ProofWorkerHandle,
},
targets_v2::{ChunkedMultiProofTargetsV2, MultiProofTargetsV2},
};
use revm_primitives::map::{hash_map, B256Map};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
@@ -63,12 +67,12 @@ const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
/// state.
#[derive(Default, Debug)]
#[derive(Debug)]
pub struct SparseTrieUpdate {
/// The state update that was used to calculate the proof
pub(crate) state: HashedPostState,
/// The calculated multiproof
pub(crate) multiproof: DecodedMultiProof,
pub(crate) multiproof: ProofResult,
}
impl SparseTrieUpdate {
@@ -80,7 +84,11 @@ impl SparseTrieUpdate {
/// Construct update from multiproof.
#[cfg(test)]
pub(super) fn from_multiproof(multiproof: reth_trie::MultiProof) -> alloy_rlp::Result<Self> {
Ok(Self { multiproof: multiproof.try_into()?, ..Default::default() })
let stats = ParallelTrieTracker::default().finish();
Ok(Self {
state: HashedPostState::default(),
multiproof: ProofResult::Legacy(multiproof.try_into()?, stats),
})
}
/// Extend update with contents of the other.
@@ -94,7 +102,7 @@ impl SparseTrieUpdate {
#[derive(Debug)]
pub(super) enum MultiProofMessage {
/// Prefetch proof targets
PrefetchProofs(MultiProofTargets),
PrefetchProofs(VersionedMultiProofTargets),
/// New state update from transaction execution with its source
StateUpdate(Source, EvmState),
/// State update that can be applied to the sparse trie without any new proofs.
@@ -223,12 +231,155 @@ pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostStat
hashed_state
}
/// Extends a `MultiProofTargets` with the contents of a `VersionedMultiProofTargets`,
/// regardless of which variant the latter is.
fn extend_multiproof_targets(dest: &mut MultiProofTargets, src: &VersionedMultiProofTargets) {
match src {
VersionedMultiProofTargets::Legacy(targets) => {
dest.extend_ref(targets);
}
VersionedMultiProofTargets::V2(targets) => {
// Add all account targets
for target in &targets.account_targets {
dest.entry(target.key()).or_default();
}
// Add all storage targets
for (hashed_address, slots) in &targets.storage_targets {
let slot_set = dest.entry(*hashed_address).or_default();
for slot in slots {
slot_set.insert(slot.key());
}
}
}
}
}
/// A set of multiproof targets which can be either in the legacy or V2 representations.
#[derive(Debug)]
pub(super) enum VersionedMultiProofTargets {
/// Legacy targets
Legacy(MultiProofTargets),
/// V2 targets
V2(MultiProofTargetsV2),
}
impl VersionedMultiProofTargets {
/// Returns true if there are no account or storage targets.
fn is_empty(&self) -> bool {
match self {
Self::Legacy(targets) => targets.is_empty(),
Self::V2(targets) => targets.is_empty(),
}
}
/// Returns the number of account targets in the multiproof target
fn account_targets_len(&self) -> usize {
match self {
Self::Legacy(targets) => targets.len(),
Self::V2(targets) => targets.account_targets.len(),
}
}
/// Returns the number of storage targets in the multiproof target
fn storage_targets_len(&self) -> usize {
match self {
Self::Legacy(targets) => targets.values().map(|slots| slots.len()).sum::<usize>(),
Self::V2(targets) => {
targets.storage_targets.values().map(|slots| slots.len()).sum::<usize>()
}
}
}
/// Returns the number of accounts in the multiproof targets.
fn len(&self) -> usize {
match self {
Self::Legacy(targets) => targets.len(),
Self::V2(targets) => targets.account_targets.len(),
}
}
/// Returns the total storage slot count across all accounts.
fn storage_count(&self) -> usize {
match self {
Self::Legacy(targets) => targets.values().map(|slots| slots.len()).sum(),
Self::V2(targets) => targets.storage_targets.values().map(|slots| slots.len()).sum(),
}
}
/// Returns the number of items that will be considered during chunking.
fn chunking_length(&self) -> usize {
match self {
Self::Legacy(targets) => targets.chunking_length(),
Self::V2(targets) => {
// For V2, count accounts + storage slots
targets.account_targets.len() +
targets.storage_targets.values().map(|slots| slots.len()).sum::<usize>()
}
}
}
/// Retains the targets representing the difference with another `MultiProofTargets`.
/// Removes all targets that are already present in `other`.
fn retain_difference(&mut self, other: &MultiProofTargets) {
match self {
Self::Legacy(targets) => {
targets.retain_difference(other);
}
Self::V2(targets) => {
// Remove account targets that exist in other
targets.account_targets.retain(|target| !other.contains_key(&target.key()));
// For each account in storage_targets, remove slots that exist in other
targets.storage_targets.retain(|hashed_address, slots| {
if let Some(other_slots) = other.get(hashed_address) {
slots.retain(|slot| !other_slots.contains(&slot.key()));
!slots.is_empty()
} else {
true
}
});
}
}
}
/// Extends this `VersionedMultiProofTargets` with the contents of another.
///
/// Panics if the variants do not match.
fn extend(&mut self, other: Self) {
match (self, other) {
(Self::Legacy(dest), Self::Legacy(src)) => {
dest.extend(src);
}
(Self::V2(dest), Self::V2(src)) => {
dest.account_targets.extend(src.account_targets);
for (addr, slots) in src.storage_targets {
dest.storage_targets.entry(addr).or_default().extend(slots);
}
}
_ => panic!("Cannot extend VersionedMultiProofTargets with mismatched variants"),
}
}
/// Chunks this `VersionedMultiProofTargets` into smaller chunks of the given size.
fn chunks(self, chunk_size: usize) -> Box<dyn Iterator<Item = Self>> {
match self {
Self::Legacy(targets) => {
Box::new(MultiProofTargets::chunks(targets, chunk_size).map(Self::Legacy))
}
Self::V2(targets) => {
Box::new(ChunkedMultiProofTargetsV2::new(targets, chunk_size).map(Self::V2))
}
}
}
}
/// Input parameters for dispatching a multiproof calculation.
#[derive(Debug)]
struct MultiproofInput {
source: Option<Source>,
hashed_state_update: HashedPostState,
proof_targets: MultiProofTargets,
proof_targets: VersionedMultiProofTargets,
proof_sequence_number: u64,
state_root_message_sender: CrossbeamSender<MultiProofMessage>,
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
@@ -263,8 +414,6 @@ pub struct MultiproofManager {
proof_result_tx: CrossbeamSender<ProofResultMessage>,
/// Metrics
metrics: MultiProofTaskMetrics,
/// Whether to use V2 storage proofs
v2_proofs_enabled: bool,
}
impl MultiproofManager {
@@ -278,9 +427,7 @@ impl MultiproofManager {
metrics.max_storage_workers.set(proof_worker_handle.total_storage_workers() as f64);
metrics.max_account_workers.set(proof_worker_handle.total_account_workers() as f64);
let v2_proofs_enabled = proof_worker_handle.v2_proofs_enabled();
Self { metrics, proof_worker_handle, proof_result_tx, v2_proofs_enabled }
Self { metrics, proof_worker_handle, proof_result_tx }
}
/// Dispatches a new multiproof calculation to worker pools.
@@ -325,41 +472,48 @@ impl MultiproofManager {
multi_added_removed_keys,
} = multiproof_input;
let account_targets = proof_targets.len();
let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
trace!(
target: "engine::tree::payload_processor::multiproof",
proof_sequence_number,
?proof_targets,
account_targets,
storage_targets,
account_targets = proof_targets.account_targets_len(),
storage_targets = proof_targets.storage_targets_len(),
?source,
"Dispatching multiproof to workers"
);
let start = Instant::now();
// Extend prefix sets with targets
let frozen_prefix_sets =
ParallelProof::extend_prefix_sets_with_targets(&Default::default(), &proof_targets);
// Workers will send ProofResultMessage directly to proof_result_rx
let proof_result_sender = ProofResultContext::new(
self.proof_result_tx.clone(),
proof_sequence_number,
hashed_state_update,
start,
);
// Dispatch account multiproof to worker pool with result sender
let input = AccountMultiproofInput {
targets: proof_targets,
prefix_sets: frozen_prefix_sets,
collect_branch_node_masks: true,
multi_added_removed_keys,
// Workers will send ProofResultMessage directly to proof_result_rx
proof_result_sender: ProofResultContext::new(
self.proof_result_tx.clone(),
proof_sequence_number,
hashed_state_update,
start,
),
v2_proofs_enabled: self.v2_proofs_enabled,
let input = match proof_targets {
VersionedMultiProofTargets::Legacy(proof_targets) => {
// Extend prefix sets with targets
let frozen_prefix_sets = ParallelProof::extend_prefix_sets_with_targets(
&Default::default(),
&proof_targets,
);
AccountMultiproofInput::Legacy {
targets: proof_targets,
prefix_sets: frozen_prefix_sets,
collect_branch_node_masks: true,
multi_added_removed_keys,
proof_result_sender,
}
}
VersionedMultiProofTargets::V2(proof_targets) => {
AccountMultiproofInput::V2 { targets: proof_targets, proof_result_sender }
}
};
// Dispatch account multiproof to worker pool with result sender
if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(input) {
error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch account multiproof");
return;
@@ -561,6 +715,9 @@ pub(super) struct MultiProofTask {
/// there are any active workers and force chunking across workers. This is to prevent tasks
/// which are very long from hitting a single worker.
max_targets_for_chunking: usize,
/// Whether or not V2 proof calculation is enabled. If enabled then [`MultiProofTargetsV2`]
/// will be produced by state updates.
v2_proofs_enabled: bool,
}
impl MultiProofTask {
@@ -592,9 +749,16 @@ impl MultiProofTask {
),
metrics,
max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
v2_proofs_enabled: false,
}
}
/// Enables V2 proof target generation on state updates.
pub(super) const fn with_v2_proofs_enabled(mut self, v2_proofs_enabled: bool) -> Self {
self.v2_proofs_enabled = v2_proofs_enabled;
self
}
/// Handles request for proof prefetch.
///
/// Returns how many multiproof tasks were dispatched for the prefetch request.
@@ -602,37 +766,29 @@ impl MultiProofTask {
level = "debug",
target = "engine::tree::payload_processor::multiproof",
skip_all,
fields(accounts = targets.len(), chunks = 0)
fields(accounts = targets.account_targets_len(), chunks = 0)
)]
fn on_prefetch_proof(&mut self, mut targets: MultiProofTargets) -> u64 {
fn on_prefetch_proof(&mut self, mut targets: VersionedMultiProofTargets) -> u64 {
// Remove already fetched proof targets to avoid redundant work.
targets.retain_difference(&self.fetched_proof_targets);
self.fetched_proof_targets.extend_ref(&targets);
extend_multiproof_targets(&mut self.fetched_proof_targets, &targets);
// Make sure all target accounts have an `AddedRemovedKeySet` in the
// For Legacy multiproofs, make sure all target accounts have an `AddedRemovedKeySet` in the
// [`MultiAddedRemovedKeys`]. Even if there are not any known removed keys for the account,
// we still want to optimistically fetch extension children for the leaf addition case.
self.multi_added_removed_keys.touch_accounts(targets.keys().copied());
// Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
let multi_added_removed_keys = Arc::new(MultiAddedRemovedKeys {
account: self.multi_added_removed_keys.account.clone(),
storages: targets
.keys()
.filter_map(|account| {
self.multi_added_removed_keys
.storages
.get(account)
.cloned()
.map(|keys| (*account, keys))
})
.collect(),
});
// V2 multiproofs don't need this.
let multi_added_removed_keys =
if let VersionedMultiProofTargets::Legacy(legacy_targets) = &targets {
self.multi_added_removed_keys.touch_accounts(legacy_targets.keys().copied());
Some(Arc::new(self.multi_added_removed_keys.clone()))
} else {
None
};
self.metrics.prefetch_proof_targets_accounts_histogram.record(targets.len() as f64);
self.metrics
.prefetch_proof_targets_storages_histogram
.record(targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
.record(targets.storage_count() as f64);
let chunking_len = targets.chunking_length();
let available_account_workers =
@@ -646,7 +802,7 @@ impl MultiProofTask {
self.max_targets_for_chunking,
available_account_workers,
available_storage_workers,
MultiProofTargets::chunks,
VersionedMultiProofTargets::chunks,
|proof_targets| {
self.multiproof_manager.dispatch(MultiproofInput {
source: None,
@@ -654,7 +810,7 @@ impl MultiProofTask {
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
multi_added_removed_keys: multi_added_removed_keys.clone(),
});
},
);
@@ -757,6 +913,7 @@ impl MultiProofTask {
self.multiproof_manager.proof_worker_handle.available_account_workers();
let available_storage_workers =
self.multiproof_manager.proof_worker_handle.available_storage_workers();
let num_chunks = dispatch_with_chunking(
not_fetched_state_update,
chunking_len,
@@ -770,8 +927,9 @@ impl MultiProofTask {
&hashed_state_update,
&self.fetched_proof_targets,
&multi_added_removed_keys,
self.v2_proofs_enabled,
);
spawned_proof_targets.extend_ref(&proof_targets);
extend_multiproof_targets(&mut spawned_proof_targets, &proof_targets);
self.multiproof_manager.dispatch(MultiproofInput {
source: Some(source),
@@ -871,7 +1029,10 @@ impl MultiProofTask {
batch_metrics.proofs_processed += 1;
if let Some(combined_update) = self.on_proof(
sequence_number,
SparseTrieUpdate { state, multiproof: Default::default() },
SparseTrieUpdate {
state,
multiproof: ProofResult::empty(self.v2_proofs_enabled),
},
) {
let _ = self.to_sparse_trie.send(combined_update);
}
@@ -898,8 +1059,7 @@ impl MultiProofTask {
}
let account_targets = merged_targets.len();
let storage_targets =
merged_targets.values().map(|slots| slots.len()).sum::<usize>();
let storage_targets = merged_targets.storage_count();
batch_metrics.prefetch_proofs_requested += self.on_prefetch_proof(merged_targets);
trace!(
target: "engine::tree::payload_processor::multiproof",
@@ -1003,7 +1163,10 @@ impl MultiProofTask {
if let Some(combined_update) = self.on_proof(
sequence_number,
SparseTrieUpdate { state, multiproof: Default::default() },
SparseTrieUpdate {
state,
multiproof: ProofResult::empty(self.v2_proofs_enabled),
},
) {
let _ = self.to_sparse_trie.send(combined_update);
}
@@ -1106,7 +1269,7 @@ impl MultiProofTask {
let update = SparseTrieUpdate {
state: proof_result.state,
multiproof: proof_result_data.proof,
multiproof: proof_result_data,
};
if let Some(combined_update) =
@@ -1196,7 +1359,7 @@ struct MultiproofBatchCtx {
/// received.
updates_finished_time: Option<Instant>,
/// Reusable buffer for accumulating prefetch targets during batching.
accumulated_prefetch_targets: Vec<MultiProofTargets>,
accumulated_prefetch_targets: Vec<VersionedMultiProofTargets>,
}
impl MultiproofBatchCtx {
@@ -1242,40 +1405,77 @@ fn get_proof_targets(
state_update: &HashedPostState,
fetched_proof_targets: &MultiProofTargets,
multi_added_removed_keys: &MultiAddedRemovedKeys,
) -> MultiProofTargets {
let mut targets = MultiProofTargets::default();
v2_enabled: bool,
) -> VersionedMultiProofTargets {
if v2_enabled {
let mut targets = MultiProofTargetsV2::default();
// first collect all new accounts (not previously fetched)
for hashed_address in state_update.accounts.keys() {
if !fetched_proof_targets.contains_key(hashed_address) {
targets.insert(*hashed_address, HashSet::default());
// first collect all new accounts (not previously fetched)
for &hashed_address in state_update.accounts.keys() {
if !fetched_proof_targets.contains_key(&hashed_address) {
targets.account_targets.push(hashed_address.into());
}
}
// then process storage slots for all accounts in the state update
for (hashed_address, storage) in &state_update.storages {
let fetched = fetched_proof_targets.get(hashed_address);
// If the storage is wiped, we still need to fetch the account proof.
if storage.wiped && fetched.is_none() {
targets.account_targets.push(Into::<proof_v2::Target>::into(*hashed_address));
continue
}
let changed_slots = storage
.storage
.keys()
.filter(|slot| !fetched.is_some_and(|f| f.contains(*slot)))
.map(|slot| Into::<proof_v2::Target>::into(*slot))
.collect::<Vec<_>>();
if !changed_slots.is_empty() {
targets.account_targets.push((*hashed_address).into());
targets.storage_targets.insert(*hashed_address, changed_slots);
}
}
VersionedMultiProofTargets::V2(targets)
} else {
let mut targets = MultiProofTargets::default();
// first collect all new accounts (not previously fetched)
for hashed_address in state_update.accounts.keys() {
if !fetched_proof_targets.contains_key(hashed_address) {
targets.insert(*hashed_address, HashSet::default());
}
}
// then process storage slots for all accounts in the state update
for (hashed_address, storage) in &state_update.storages {
let fetched = fetched_proof_targets.get(hashed_address);
let storage_added_removed_keys = multi_added_removed_keys.get_storage(hashed_address);
let mut changed_slots = storage
.storage
.keys()
.filter(|slot| {
!fetched.is_some_and(|f| f.contains(*slot)) ||
storage_added_removed_keys.is_some_and(|k| k.is_removed(slot))
})
.peekable();
// If the storage is wiped, we still need to fetch the account proof.
if storage.wiped && fetched.is_none() {
targets.entry(*hashed_address).or_default();
}
if changed_slots.peek().is_some() {
targets.entry(*hashed_address).or_default().extend(changed_slots);
}
}
VersionedMultiProofTargets::Legacy(targets)
}
// then process storage slots for all accounts in the state update
for (hashed_address, storage) in &state_update.storages {
let fetched = fetched_proof_targets.get(hashed_address);
let storage_added_removed_keys = multi_added_removed_keys.get_storage(hashed_address);
let mut changed_slots = storage
.storage
.keys()
.filter(|slot| {
!fetched.is_some_and(|f| f.contains(*slot)) ||
storage_added_removed_keys.is_some_and(|k| k.is_removed(slot))
})
.peekable();
// If the storage is wiped, we still need to fetch the account proof.
if storage.wiped && fetched.is_none() {
targets.entry(*hashed_address).or_default();
}
if changed_slots.peek().is_some() {
targets.entry(*hashed_address).or_default().extend(changed_slots);
}
}
targets
}
/// Dispatches work items as a single unit or in chunks based on target size and worker
@@ -1481,12 +1681,24 @@ mod tests {
state
}
fn unwrap_legacy_targets(targets: VersionedMultiProofTargets) -> MultiProofTargets {
match targets {
VersionedMultiProofTargets::Legacy(targets) => targets,
VersionedMultiProofTargets::V2(_) => panic!("Expected Legacy targets"),
}
}
#[test]
fn test_get_proof_targets_new_account_targets() {
let state = create_get_proof_targets_state();
let fetched = MultiProofTargets::default();
let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
let targets = unwrap_legacy_targets(get_proof_targets(
&state,
&fetched,
&MultiAddedRemovedKeys::new(),
false,
));
// should return all accounts as targets since nothing was fetched before
assert_eq!(targets.len(), state.accounts.len());
@@ -1500,7 +1712,12 @@ mod tests {
let state = create_get_proof_targets_state();
let fetched = MultiProofTargets::default();
let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
let targets = unwrap_legacy_targets(get_proof_targets(
&state,
&fetched,
&MultiAddedRemovedKeys::new(),
false,
));
// verify storage slots are included for accounts with storage
for (addr, storage) in &state.storages {
@@ -1528,7 +1745,12 @@ mod tests {
// mark the account as already fetched
fetched.insert(*fetched_addr, HashSet::default());
let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
let targets = unwrap_legacy_targets(get_proof_targets(
&state,
&fetched,
&MultiAddedRemovedKeys::new(),
false,
));
// should not include the already fetched account since it has no storage updates
assert!(!targets.contains_key(fetched_addr));
@@ -1548,7 +1770,12 @@ mod tests {
fetched_slots.insert(fetched_slot);
fetched.insert(*addr, fetched_slots);
let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
let targets = unwrap_legacy_targets(get_proof_targets(
&state,
&fetched,
&MultiAddedRemovedKeys::new(),
false,
));
// should not include the already fetched storage slot
let target_slots = &targets[addr];
@@ -1561,7 +1788,12 @@ mod tests {
let state = HashedPostState::default();
let fetched = MultiProofTargets::default();
let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
let targets = unwrap_legacy_targets(get_proof_targets(
&state,
&fetched,
&MultiAddedRemovedKeys::new(),
false,
));
assert!(targets.is_empty());
}
@@ -1588,7 +1820,12 @@ mod tests {
fetched_slots.insert(slot1);
fetched.insert(addr1, fetched_slots);
let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
let targets = unwrap_legacy_targets(get_proof_targets(
&state,
&fetched,
&MultiAddedRemovedKeys::new(),
false,
));
assert!(targets.contains_key(&addr2));
assert!(!targets[&addr1].contains(&slot1));
@@ -1614,7 +1851,12 @@ mod tests {
assert!(!state.accounts.contains_key(&addr));
assert!(!fetched.contains_key(&addr));
let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
let targets = unwrap_legacy_targets(get_proof_targets(
&state,
&fetched,
&MultiAddedRemovedKeys::new(),
false,
));
// verify that we still get the storage slots for the unmodified account
assert!(targets.contains_key(&addr));
@@ -1656,7 +1898,12 @@ mod tests {
removed_state.storages.insert(addr, removed_storage);
multi_added_removed_keys.update_with_state(&removed_state);
let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
let targets = unwrap_legacy_targets(get_proof_targets(
&state,
&fetched,
&multi_added_removed_keys,
false,
));
// slot1 should be included despite being fetched, because it's marked as removed
assert!(targets.contains_key(&addr));
@@ -1683,7 +1930,12 @@ mod tests {
storage.storage.insert(slot1, U256::from(100));
state.storages.insert(addr, storage);
let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
let targets = unwrap_legacy_targets(get_proof_targets(
&state,
&fetched,
&multi_added_removed_keys,
false,
));
// account should be included because storage is wiped and account wasn't fetched
assert!(targets.contains_key(&addr));
@@ -1726,7 +1978,12 @@ mod tests {
removed_state.storages.insert(addr, removed_storage);
multi_added_removed_keys.update_with_state(&removed_state);
let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
let targets = unwrap_legacy_targets(get_proof_targets(
&state,
&fetched,
&multi_added_removed_keys,
false,
));
// only slots in the state update can be included, so slot3 should not appear
assert!(!targets.contains_key(&addr));
@@ -1753,9 +2010,12 @@ mod tests {
targets3.insert(addr3, HashSet::default());
let tx = task.tx.clone();
tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap();
tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap();
tx.send(MultiProofMessage::PrefetchProofs(targets3)).unwrap();
tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets1)))
.unwrap();
tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets2)))
.unwrap();
tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets3)))
.unwrap();
let proofs_requested =
if let Ok(MultiProofMessage::PrefetchProofs(targets)) = task.rx.recv() {
@@ -1769,11 +2029,12 @@ mod tests {
assert_eq!(num_batched, 3);
assert_eq!(merged_targets.len(), 3);
assert!(merged_targets.contains_key(&addr1));
assert!(merged_targets.contains_key(&addr2));
assert!(merged_targets.contains_key(&addr3));
let legacy_targets = unwrap_legacy_targets(merged_targets);
assert!(legacy_targets.contains_key(&addr1));
assert!(legacy_targets.contains_key(&addr2));
assert!(legacy_targets.contains_key(&addr3));
task.on_prefetch_proof(merged_targets)
task.on_prefetch_proof(VersionedMultiProofTargets::Legacy(legacy_targets))
} else {
panic!("Expected PrefetchProofs message");
};
@@ -1848,11 +2109,16 @@ mod tests {
// Queue: [PrefetchProofs1, PrefetchProofs2, StateUpdate1, StateUpdate2, PrefetchProofs3]
let tx = task.tx.clone();
tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap();
tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap();
tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets1)))
.unwrap();
tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets2)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source.into(), state_update1)).unwrap();
tx.send(MultiProofMessage::StateUpdate(source.into(), state_update2)).unwrap();
tx.send(MultiProofMessage::PrefetchProofs(targets3.clone())).unwrap();
tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(
targets3.clone(),
)))
.unwrap();
// Step 1: Receive and batch PrefetchProofs (should get targets1 + targets2)
let mut pending_msg: Option<MultiProofMessage> = None;
@@ -1878,9 +2144,10 @@ mod tests {
// Should have batched exactly 2 PrefetchProofs (not 3!)
assert_eq!(num_batched, 2, "Should batch only until different message type");
assert_eq!(merged_targets.len(), 2);
assert!(merged_targets.contains_key(&addr1));
assert!(merged_targets.contains_key(&addr2));
assert!(!merged_targets.contains_key(&addr3), "addr3 should NOT be in first batch");
let legacy_targets = unwrap_legacy_targets(merged_targets);
assert!(legacy_targets.contains_key(&addr1));
assert!(legacy_targets.contains_key(&addr2));
assert!(!legacy_targets.contains_key(&addr3), "addr3 should NOT be in first batch");
} else {
panic!("Expected PrefetchProofs message");
}
@@ -1905,7 +2172,8 @@ mod tests {
match task.rx.try_recv() {
Ok(MultiProofMessage::PrefetchProofs(targets)) => {
assert_eq!(targets.len(), 1);
assert!(targets.contains_key(&addr3));
let legacy_targets = unwrap_legacy_targets(targets);
assert!(legacy_targets.contains_key(&addr3));
}
_ => panic!("PrefetchProofs3 was lost!"),
}
@@ -1951,9 +2219,13 @@ mod tests {
let source = StateChangeSource::Transaction(99);
let tx = task.tx.clone();
tx.send(MultiProofMessage::PrefetchProofs(prefetch1)).unwrap();
tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(prefetch1)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source.into(), state_update)).unwrap();
tx.send(MultiProofMessage::PrefetchProofs(prefetch2.clone())).unwrap();
tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(
prefetch2.clone(),
)))
.unwrap();
let mut ctx = MultiproofBatchCtx::new(Instant::now());
let mut batch_metrics = MultiproofBatchMetrics::default();
@@ -1986,7 +2258,8 @@ mod tests {
match task.rx.try_recv() {
Ok(MultiProofMessage::PrefetchProofs(targets)) => {
assert_eq!(targets.len(), 1);
assert!(targets.contains_key(&prefetch_addr2));
let legacy_targets = unwrap_legacy_targets(targets);
assert!(legacy_targets.contains_key(&prefetch_addr2));
}
other => panic!("Expected PrefetchProofs2 in channel, got {:?}", other),
}

View File

@@ -16,7 +16,7 @@ use crate::tree::{
payload_processor::{
bal::{total_slots, BALSlotIter},
executor::WorkloadExecutor,
multiproof::MultiProofMessage,
multiproof::{MultiProofMessage, VersionedMultiProofTargets},
ExecutionCache as PayloadExecutionCache,
},
precompile_cache::{CachedPrecompile, PrecompileCacheMap},
@@ -237,7 +237,7 @@ where
}
/// If configured and the tx returned proof targets, emit the targets the transaction produced
fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
fn send_multi_proof_targets(&self, targets: Option<VersionedMultiProofTargets>) {
if self.is_execution_terminated() {
// if execution is already terminated then we dont need to send more proof fetch
// messages
@@ -479,6 +479,8 @@ where
pub(super) terminate_execution: Arc<AtomicBool>,
pub(super) precompile_cache_disabled: bool,
pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
/// Whether V2 proof calculation is enabled.
pub(super) v2_proofs_enabled: bool,
}
impl<N, P, Evm> PrewarmContext<N, P, Evm>
@@ -487,10 +489,12 @@ where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
Evm: ConfigureEvm<Primitives = N> + 'static,
{
/// Splits this context into an evm, an evm config, metrics, and the atomic bool for terminating
/// execution.
/// Splits this context into an evm, an evm config, metrics, the atomic bool for terminating
/// execution, and whether V2 proofs are enabled.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
fn evm_for_ctx(
self,
) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>, bool)> {
let Self {
env,
evm_config,
@@ -500,6 +504,7 @@ where
terminate_execution,
precompile_cache_disabled,
precompile_cache_map,
v2_proofs_enabled,
} = self;
let mut state_provider = match provider.build() {
@@ -549,7 +554,7 @@ where
});
}
Some((evm, metrics, terminate_execution))
Some((evm, metrics, terminate_execution, v2_proofs_enabled))
}
/// Accepts an [`mpsc::Receiver`] of transactions and a handle to prewarm task. Executes
@@ -570,7 +575,10 @@ where
) where
Tx: ExecutableTxFor<Evm>,
{
let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
let Some((mut evm, metrics, terminate_execution, v2_proofs_enabled)) = self.evm_for_ctx()
else {
return
};
while let Ok(IndexedTransaction { index, tx }) = {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
@@ -633,7 +641,8 @@ where
let _enter =
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
.entered();
let (targets, storage_targets) = multiproof_targets_from_state(res.state);
let (targets, storage_targets) =
multiproof_targets_from_state(res.state, v2_proofs_enabled);
metrics.prefetch_storage_targets.record(storage_targets as f64);
let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
drop(_enter);
@@ -778,9 +787,22 @@ where
}
}
/// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the
/// Returns a set of [`VersionedMultiProofTargets`] and the total amount of storage targets, based
/// on the given state.
fn multiproof_targets_from_state(
state: EvmState,
v2_enabled: bool,
) -> (VersionedMultiProofTargets, usize) {
if v2_enabled {
multiproof_targets_v2_from_state(state)
} else {
multiproof_targets_legacy_from_state(state)
}
}
/// Returns legacy [`MultiProofTargets`] and the total amount of storage targets, based on the
/// given state.
fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
fn multiproof_targets_legacy_from_state(state: EvmState) -> (VersionedMultiProofTargets, usize) {
let mut targets = MultiProofTargets::with_capacity(state.len());
let mut storage_targets = 0;
for (addr, account) in state {
@@ -810,7 +832,50 @@ fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize)
targets.insert(keccak256(addr), storage_set);
}
(targets, storage_targets)
(VersionedMultiProofTargets::Legacy(targets), storage_targets)
}
/// Returns V2 [`reth_trie_parallel::targets_v2::MultiProofTargetsV2`] and the total amount of
/// storage targets, based on the given state.
fn multiproof_targets_v2_from_state(state: EvmState) -> (VersionedMultiProofTargets, usize) {
use reth_trie::proof_v2;
use reth_trie_parallel::targets_v2::MultiProofTargetsV2;
let mut targets = MultiProofTargetsV2::default();
let mut storage_target_count = 0;
for (addr, account) in state {
// if the account was not touched, or if the account was selfdestructed, do not
// fetch proofs for it
//
// Since selfdestruct can only happen in the same transaction, we can skip
// prefetching proofs for selfdestructed accounts
//
// See: https://eips.ethereum.org/EIPS/eip-6780
if !account.is_touched() || account.is_selfdestructed() {
continue
}
let hashed_address = keccak256(addr);
targets.account_targets.push(hashed_address.into());
let mut storage_slots = Vec::with_capacity(account.storage.len());
for (key, slot) in account.storage {
// do nothing if unchanged
if !slot.is_changed() {
continue
}
let hashed_slot = keccak256(B256::new(key.to_be_bytes()));
storage_slots.push(proof_v2::Target::from(hashed_slot));
}
storage_target_count += storage_slots.len();
if !storage_slots.is_empty() {
targets.storage_targets.insert(hashed_address, storage_slots);
}
}
(VersionedMultiProofTargets::V2(targets), storage_target_count)
}
/// The events the pre-warm task can handle.
@@ -835,7 +900,7 @@ pub(super) enum PrewarmTaskEvent<R> {
/// The outcome of a pre-warm task
Outcome {
/// The prepared proof targets based on the evm state outcome
proof_targets: Option<MultiProofTargets>,
proof_targets: Option<VersionedMultiProofTargets>,
},
/// Finished executing all transactions
FinishedTxExecution {

View File

@@ -4,7 +4,7 @@ use crate::tree::payload_processor::multiproof::{MultiProofTaskMetrics, SparseTr
use alloy_primitives::B256;
use rayon::iter::{ParallelBridge, ParallelIterator};
use reth_trie::{updates::TrieUpdates, Nibbles};
use reth_trie_parallel::root::ParallelStateRootError;
use reth_trie_parallel::{proof_task::ProofResult, root::ParallelStateRootError};
use reth_trie_sparse::{
errors::{SparseStateTrieResult, SparseTrieErrorKind},
provider::{TrieNodeProvider, TrieNodeProviderFactory},
@@ -97,8 +97,8 @@ where
debug!(
target: "engine::root",
num_updates,
account_proofs = update.multiproof.account_subtree.len(),
storage_proofs = update.multiproof.storages.len(),
account_proofs = update.multiproof.account_proofs_len(),
storage_proofs = update.multiproof.storage_proofs_len(),
"Updating sparse trie"
);
@@ -157,7 +157,14 @@ where
let started_at = Instant::now();
// Reveal new accounts and storage slots.
trie.reveal_decoded_multiproof(multiproof)?;
match multiproof {
ProofResult::Legacy(decoded, _) => {
trie.reveal_decoded_multiproof(decoded)?;
}
ProofResult::V2(decoded_v2) => {
trie.reveal_decoded_multiproof_v2(decoded_v2)?;
}
}
let reveal_multiproof_elapsed = started_at.elapsed();
trace!(
target: "engine::root::sparse",

View File

@@ -13,8 +13,8 @@ workspace = true
[dependencies]
# reth
reth-execution-errors.workspace = true
reth-primitives-traits.workspace = true
reth-execution-errors.workspace = true
reth-provider.workspace = true
reth-storage-errors.workspace = true
reth-trie-common.workspace = true

View File

@@ -197,7 +197,7 @@ impl ParallelProof {
let (result_tx, result_rx) = crossbeam_unbounded();
let account_multiproof_start_time = Instant::now();
let input = AccountMultiproofInput {
let input = AccountMultiproofInput::Legacy {
targets,
prefix_sets,
collect_branch_node_masks: self.collect_branch_node_masks,
@@ -208,7 +208,6 @@ impl ParallelProof {
HashedPostState::default(),
account_multiproof_start_time,
),
v2_proofs_enabled: self.v2_proofs_enabled,
};
self.proof_worker_handle
@@ -222,7 +221,9 @@ impl ParallelProof {
)
})?;
let ProofResult { proof: multiproof, stats } = proof_result_msg.result?;
let ProofResult::Legacy(multiproof, stats) = proof_result_msg.result? else {
panic!("AccountMultiproofInput::Legacy was submitted, expected legacy result")
};
#[cfg(feature = "metrics")]
self.metrics.record(stats);
@@ -235,7 +236,7 @@ impl ParallelProof {
leaves_added = stats.leaves_added(),
missed_leaves = stats.missed_leaves(),
precomputed_storage_roots = stats.precomputed_storage_roots(),
"Calculated decoded proof"
"Calculated decoded proof",
);
Ok(multiproof)

View File

@@ -32,6 +32,8 @@
use crate::{
root::ParallelStateRootError,
stats::{ParallelTrieStats, ParallelTrieTracker},
targets_v2::MultiProofTargetsV2,
value_encoder::AsyncAccountValueEncoder,
StorageRootTargets,
};
use alloy_primitives::{
@@ -49,11 +51,11 @@ use reth_trie::{
node_iter::{TrieElement, TrieNodeIter},
prefix_set::TriePrefixSets,
proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider, StorageProof},
proof_v2::{self, StorageProofCalculator},
proof_v2,
trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieCursorMetricsCache},
walker::TrieWalker,
DecodedMultiProof, DecodedStorageMultiProof, HashBuilder, HashedPostState, MultiProofTargets,
Nibbles, ProofTrieNode, TRIE_ACCOUNT_RLP_MAX_SIZE,
DecodedMultiProof, DecodedMultiProofV2, DecodedStorageMultiProof, HashBuilder, HashedPostState,
MultiProofTargets, Nibbles, ProofTrieNode, TRIE_ACCOUNT_RLP_MAX_SIZE,
};
use reth_trie_common::{
added_removed_keys::MultiAddedRemovedKeys,
@@ -220,7 +222,8 @@ impl ProofWorkerHandle {
metrics,
#[cfg(feature = "metrics")]
cursor_metrics,
);
)
.with_v2_proofs(v2_proofs_enabled);
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
@@ -333,16 +336,12 @@ impl ProofWorkerHandle {
ProviderError::other(std::io::Error::other("account workers unavailable"));
if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
let AccountMultiproofInput {
proof_result_sender:
ProofResultContext {
sender: result_tx,
sequence_number: seq,
state,
start_time: start,
},
..
} = *input;
let ProofResultContext {
sender: result_tx,
sequence_number: seq,
state,
start_time: start,
} = input.into_proof_result_sender();
let _ = result_tx.send(ProofResultMessage {
sequence_number: seq,
@@ -605,11 +604,65 @@ impl TrieNodeProvider for ProofTaskTrieNodeProvider {
/// Result of a multiproof calculation.
#[derive(Debug)]
pub struct ProofResult {
/// The account multiproof
pub proof: DecodedMultiProof,
/// Statistics collected during proof computation
pub stats: ParallelTrieStats,
pub enum ProofResult {
/// Legacy multiproof calculation result.
Legacy(DecodedMultiProof, ParallelTrieStats),
/// V2 multiproof calculation result.
V2(DecodedMultiProofV2),
}
impl ProofResult {
/// Creates an empty [`ProofResult`] of the appropriate variant based on `v2_enabled`.
///
/// Use this when constructing empty proofs (e.g., for state updates where all targets
/// were already fetched) to ensure consistency with the proof version being used.
pub fn empty(v2_enabled: bool) -> Self {
if v2_enabled {
Self::V2(DecodedMultiProofV2::default())
} else {
let stats = ParallelTrieTracker::default().finish();
Self::Legacy(DecodedMultiProof::default(), stats)
}
}
/// Returns true if the result contains no proofs
pub fn is_empty(&self) -> bool {
match self {
Self::Legacy(proof, _) => proof.is_empty(),
Self::V2(proof) => proof.is_empty(),
}
}
/// Extends the receiver with the value of the given results.
///
/// # Panics
///
/// This method panics if the two [`ProofResult`]s are not the same variant.
pub fn extend(&mut self, other: Self) {
match (self, other) {
(Self::Legacy(proof, _), Self::Legacy(other, _)) => proof.extend(other),
(Self::V2(proof), Self::V2(other)) => proof.extend(other),
_ => panic!("mismatched ProofResults, cannot extend one with the other"),
}
}
/// Returns the number of account proofs.
pub fn account_proofs_len(&self) -> usize {
match self {
Self::Legacy(proof, _) => proof.account_subtree.len(),
Self::V2(proof) => proof.account_proofs.len(),
}
}
/// Returns the total number of storage proofs
pub fn storage_proofs_len(&self) -> usize {
match self {
Self::Legacy(proof, _) => {
proof.storages.values().map(|p| p.subtree.len()).sum::<usize>()
}
Self::V2(proof) => proof.storage_proofs.values().map(|p| p.len()).sum::<usize>(),
}
}
}
/// Channel used by worker threads to deliver `ProofResultMessage` items back to
@@ -889,7 +942,7 @@ where
&self,
proof_tx: &ProofTaskTx<Provider>,
v2_calculator: Option<
&mut StorageProofCalculator<
&mut proof_v2::StorageProofCalculator<
<Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
<Provider as HashedCursorFactory>::StorageCursor<'_>,
>,
@@ -1053,6 +1106,8 @@ struct AccountProofWorker<Factory> {
/// Cursor metrics for this worker
#[cfg(feature = "metrics")]
cursor_metrics: ProofTaskCursorMetrics,
/// Set to true if V2 proofs are enabled.
v2_enabled: bool,
}
impl<Factory> AccountProofWorker<Factory>
@@ -1082,9 +1137,16 @@ where
metrics,
#[cfg(feature = "metrics")]
cursor_metrics,
v2_enabled: false,
}
}
/// Changes whether or not V2 proofs are enabled.
const fn with_v2_proofs(mut self, v2_enabled: bool) -> Self {
self.v2_enabled = v2_enabled;
self
}
/// Runs the worker loop, processing jobs until the channel closes.
///
/// # Lifecycle
@@ -1117,6 +1179,17 @@ where
let mut account_nodes_processed = 0u64;
let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
let mut v2_calculator = if self.v2_enabled {
let trie_cursor = proof_tx.provider.account_trie_cursor()?;
let hashed_cursor = proof_tx.provider.hashed_account_cursor()?;
Some(proof_v2::ProofCalculator::<_, _, AsyncAccountValueEncoder>::new(
trie_cursor,
hashed_cursor,
))
} else {
None
};
// Count this worker as available only after successful initialization.
self.available_workers.fetch_add(1, Ordering::Relaxed);
@@ -1128,6 +1201,7 @@ where
AccountWorkerJob::AccountMultiproof { input } => {
self.process_account_multiproof(
&proof_tx,
v2_calculator.as_mut(),
*input,
&mut account_proofs_processed,
&mut cursor_metrics_cache,
@@ -1166,26 +1240,18 @@ where
Ok(())
}
/// Processes an account multiproof request.
fn process_account_multiproof<Provider>(
fn compute_legacy_account_multiproof<Provider>(
&self,
proof_tx: &ProofTaskTx<Provider>,
input: AccountMultiproofInput,
account_proofs_processed: &mut u64,
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
) where
targets: MultiProofTargets,
mut prefix_sets: TriePrefixSets,
collect_branch_node_masks: bool,
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
) -> Result<ProofResult, ParallelStateRootError>
where
Provider: TrieCursorFactory + HashedCursorFactory,
{
let AccountMultiproofInput {
targets,
mut prefix_sets,
collect_branch_node_masks,
multi_added_removed_keys,
proof_result_sender:
ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start },
v2_proofs_enabled,
} = input;
let span = debug_span!(
target: "trie::proof_task",
"Account multiproof calculation",
@@ -1199,8 +1265,6 @@ where
"Processing account multiproof"
);
let proof_start = Instant::now();
let mut tracker = ParallelTrieTracker::default();
let mut storage_prefix_sets = std::mem::take(&mut prefix_sets.storage_prefix_sets);
@@ -1210,29 +1274,14 @@ where
tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);
let storage_proof_receivers = match dispatch_storage_proofs(
let storage_proof_receivers = dispatch_storage_proofs(
&self.storage_work_tx,
&targets,
&mut storage_prefix_sets,
collect_branch_node_masks,
multi_added_removed_keys.as_ref(),
v2_proofs_enabled,
) {
Ok(receivers) => receivers,
Err(error) => {
// Send error through result channel
error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}");
let _ = result_tx.send(ProofResultMessage {
sequence_number: seq,
result: Err(error),
elapsed: start.elapsed(),
state,
});
return;
}
};
)?;
// Use the missed leaves cache passed from the multiproof manager
let account_prefix_set = std::mem::take(&mut prefix_sets.account_prefix_set);
let ctx = AccountMultiproofParams {
@@ -1244,17 +1293,115 @@ where
cached_storage_roots: &self.cached_storage_roots,
};
let result =
build_account_multiproof_with_storage_roots(&proof_tx.provider, ctx, &mut tracker);
let now = Instant::now();
let proof_elapsed = now.duration_since(proof_start);
let total_elapsed = now.duration_since(start);
let proof_cursor_metrics = tracker.cursor_metrics;
proof_cursor_metrics.record_spans();
let result = build_account_multiproof_with_storage_roots(
&proof_tx.provider,
ctx,
&mut tracker,
proof_cursor_metrics,
);
let stats = tracker.finish();
let result = result.map(|proof| ProofResult { proof, stats });
result.map(|proof| ProofResult::Legacy(proof, stats))
}
fn compute_v2_account_multiproof<Provider>(
&self,
v2_calculator: &mut proof_v2::ProofCalculator<
<Provider as TrieCursorFactory>::AccountTrieCursor<'_>,
<Provider as HashedCursorFactory>::AccountCursor<'_>,
AsyncAccountValueEncoder,
>,
targets: MultiProofTargetsV2,
) -> Result<ProofResult, ParallelStateRootError>
where
Provider: TrieCursorFactory + HashedCursorFactory,
{
let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets;
let span = debug_span!(
target: "trie::proof_task",
"Account V2 multiproof calculation",
account_targets = account_targets.len(),
storage_targets = storage_targets.values().map(|t| t.len()).sum::<usize>(),
worker_id = self.worker_id,
);
let _span_guard = span.enter();
trace!(target: "trie::proof_task", "Processing V2 account multiproof");
let storage_proof_receivers =
dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?;
let mut value_encoder = AsyncAccountValueEncoder::new(
self.storage_work_tx.clone(),
storage_proof_receivers,
self.cached_storage_roots.clone(),
);
let proof = DecodedMultiProofV2 {
account_proofs: v2_calculator.proof(&mut value_encoder, &mut account_targets)?,
storage_proofs: value_encoder.into_storage_proofs()?,
};
Ok(ProofResult::V2(proof))
}
/// Processes an account multiproof request.
fn process_account_multiproof<Provider>(
&self,
proof_tx: &ProofTaskTx<Provider>,
v2_calculator: Option<
&mut proof_v2::ProofCalculator<
<Provider as TrieCursorFactory>::AccountTrieCursor<'_>,
<Provider as HashedCursorFactory>::AccountCursor<'_>,
AsyncAccountValueEncoder,
>,
>,
input: AccountMultiproofInput,
account_proofs_processed: &mut u64,
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
) where
Provider: TrieCursorFactory + HashedCursorFactory,
{
let mut proof_cursor_metrics = ProofTaskCursorMetricsCache::default();
let proof_start = Instant::now();
let (proof_result_sender, result) = match input {
AccountMultiproofInput::Legacy {
targets,
prefix_sets,
collect_branch_node_masks,
multi_added_removed_keys,
proof_result_sender,
} => (
proof_result_sender,
self.compute_legacy_account_multiproof(
proof_tx,
targets,
prefix_sets,
collect_branch_node_masks,
multi_added_removed_keys,
&mut proof_cursor_metrics,
),
),
AccountMultiproofInput::V2 { targets, proof_result_sender } => (
proof_result_sender,
self.compute_v2_account_multiproof::<Provider>(
v2_calculator.expect("v2 calculator provided"),
targets,
),
),
};
let ProofResultContext {
sender: result_tx,
sequence_number: seq,
state,
start_time: start,
} = proof_result_sender;
let proof_elapsed = proof_start.elapsed();
let total_elapsed = start.elapsed();
*account_proofs_processed += 1;
// Send result to MultiProofTask
@@ -1275,6 +1422,8 @@ where
);
}
proof_cursor_metrics.record_spans();
trace!(
target: "trie::proof_task",
proof_time_us = proof_elapsed.as_micros(),
@@ -1355,6 +1504,7 @@ fn build_account_multiproof_with_storage_roots<P>(
provider: &P,
ctx: AccountMultiproofParams<'_>,
tracker: &mut ParallelTrieTracker,
proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
) -> Result<DecodedMultiProof, ParallelStateRootError>
where
P: TrieCursorFactory + HashedCursorFactory,
@@ -1362,15 +1512,12 @@ where
let accounts_added_removed_keys =
ctx.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts());
// Create local metrics caches for account cursors. We can't directly use the metrics caches in
// the tracker due to the call to `inc_missed_leaves` which occurs on it.
let mut account_trie_cursor_metrics = TrieCursorMetricsCache::default();
let mut account_hashed_cursor_metrics = HashedCursorMetricsCache::default();
// Wrap account trie cursor with instrumented cursor
let account_trie_cursor = provider.account_trie_cursor().map_err(ProviderError::Database)?;
let account_trie_cursor =
InstrumentedTrieCursor::new(account_trie_cursor, &mut account_trie_cursor_metrics);
let account_trie_cursor = InstrumentedTrieCursor::new(
account_trie_cursor,
&mut proof_cursor_metrics.account_trie_cursor,
);
// Create the walker.
let walker = TrieWalker::<_>::state_trie(account_trie_cursor, ctx.prefix_set)
@@ -1397,8 +1544,10 @@ where
// Wrap account hashed cursor with instrumented cursor
let account_hashed_cursor =
provider.hashed_account_cursor().map_err(ProviderError::Database)?;
let account_hashed_cursor =
InstrumentedHashedCursor::new(account_hashed_cursor, &mut account_hashed_cursor_metrics);
let account_hashed_cursor = InstrumentedHashedCursor::new(
account_hashed_cursor,
&mut proof_cursor_metrics.account_hashed_cursor,
);
let mut account_node_iter = TrieNodeIter::state_trie(walker, account_hashed_cursor);
@@ -1462,10 +1611,10 @@ where
StorageProof::new_hashed(provider, provider, hashed_address)
.with_prefix_set_mut(Default::default())
.with_trie_cursor_metrics(
&mut tracker.cursor_metrics.storage_trie_cursor,
&mut proof_cursor_metrics.storage_trie_cursor,
)
.with_hashed_cursor_metrics(
&mut tracker.cursor_metrics.storage_hashed_cursor,
&mut proof_cursor_metrics.storage_hashed_cursor,
)
.storage_multiproof(
ctx.targets
@@ -1516,21 +1665,6 @@ where
BranchNodeMasksMap::default()
};
// Extend tracker with accumulated metrics from account cursors
tracker.cursor_metrics.account_trie_cursor.extend(&account_trie_cursor_metrics);
tracker.cursor_metrics.account_hashed_cursor.extend(&account_hashed_cursor_metrics);
// Consume remaining storage proof receivers for accounts not encountered during trie walk.
// Done last to allow storage workers more time to complete while we finalized the account trie.
for (hashed_address, receiver) in storage_proof_receivers {
if let Ok(proof_msg) = receiver.recv() {
let proof_result = proof_msg.result?;
let proof = Into::<Option<DecodedStorageMultiProof>>::into(proof_result)
.expect("Partial proofs are not yet supported");
collected_decoded_storages.insert(hashed_address, proof);
}
}
Ok(DecodedMultiProof {
account_subtree: decoded_account_subtree,
branch_node_masks,
@@ -1550,7 +1684,6 @@ fn dispatch_storage_proofs(
storage_prefix_sets: &mut B256Map<PrefixSet>,
with_branch_node_masks: bool,
multi_added_removed_keys: Option<&Arc<MultiAddedRemovedKeys>>,
use_v2_proofs: bool,
) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
let mut storage_proof_receivers =
B256Map::with_capacity_and_hasher(targets.len(), Default::default());
@@ -1564,20 +1697,14 @@ fn dispatch_storage_proofs(
let (result_tx, result_rx) = crossbeam_channel::unbounded();
// Create computation input based on V2 flag
let input = if use_v2_proofs {
// Convert target slots to V2 targets
let v2_targets = target_slots.iter().copied().map(Into::into).collect();
StorageProofInput::new(*hashed_address, v2_targets)
} else {
let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default();
StorageProofInput::legacy(
*hashed_address,
prefix_set,
target_slots.clone(),
with_branch_node_masks,
multi_added_removed_keys.cloned(),
)
};
let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default();
let input = StorageProofInput::legacy(
*hashed_address,
prefix_set,
target_slots.clone(),
with_branch_node_masks,
multi_added_removed_keys.cloned(),
);
// Always dispatch a storage proof so we obtain the storage root even when no slots are
// requested.
@@ -1595,6 +1722,64 @@ fn dispatch_storage_proofs(
Ok(storage_proof_receivers)
}
/// Queues V2 storage proofs for all accounts in the targets and returns receivers.
///
/// This function queues all storage proof tasks to the worker pool but returns immediately
/// with receivers, allowing the account trie walk to proceed in parallel with storage proof
/// computation. This enables interleaved parallelism for better performance.
///
/// Propagates errors up if queuing fails. Receivers must be consumed by the caller.
fn dispatch_v2_storage_proofs(
storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
account_targets: &Vec<proof_v2::Target>,
storage_targets: B256Map<Vec<proof_v2::Target>>,
) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
let mut storage_proof_receivers =
B256Map::with_capacity_and_hasher(account_targets.len(), Default::default());
// Dispatch all proofs for targeted storage slots
for (hashed_address, targets) in storage_targets {
// Create channel for receiving StorageProofResultMessage
let (result_tx, result_rx) = crossbeam_channel::unbounded();
let input = StorageProofInput::new(hashed_address, targets);
storage_work_tx
.send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
.map_err(|_| {
ParallelStateRootError::Other(format!(
"Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
))
})?;
storage_proof_receivers.insert(hashed_address, result_rx);
}
// If there are any targeted accounts which did not have storage targets then we generate a
// single proof target for them so that we get their root.
for target in account_targets {
let hashed_address = target.key();
if storage_proof_receivers.contains_key(&hashed_address) {
continue
}
let (result_tx, result_rx) = crossbeam_channel::unbounded();
let input = StorageProofInput::new(hashed_address, vec![proof_v2::Target::new(B256::ZERO)]);
storage_work_tx
.send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
.map_err(|_| {
ParallelStateRootError::Other(format!(
"Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
))
})?;
storage_proof_receivers.insert(hashed_address, result_rx);
}
Ok(storage_proof_receivers)
}
/// Input parameters for storage proof computation.
#[derive(Debug)]
pub enum StorageProofInput {
@@ -1639,7 +1824,7 @@ impl StorageProofInput {
}
}
/// Creates a new [`StorageProofInput`] with the given hashed address and target slots.
/// Creates a new [`StorageProofInput`] with the given hashed address and target slots.
pub const fn new(hashed_address: B256, targets: Vec<proof_v2::Target>) -> Self {
Self::V2 { hashed_address, targets }
}
@@ -1655,20 +1840,39 @@ impl StorageProofInput {
}
/// Input parameters for account multiproof computation.
#[derive(Debug, Clone)]
pub struct AccountMultiproofInput {
/// The targets for which to compute the multiproof.
pub targets: MultiProofTargets,
/// The prefix sets for the proof calculation.
pub prefix_sets: TriePrefixSets,
/// Whether or not to collect branch node masks.
pub collect_branch_node_masks: bool,
/// Provided by the user to give the necessary context to retain extra proofs.
pub multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
/// Context for sending the proof result.
pub proof_result_sender: ProofResultContext,
/// Whether to use V2 storage proofs.
pub v2_proofs_enabled: bool,
#[derive(Debug)]
pub enum AccountMultiproofInput {
/// Legacy account multiproof proof variant
Legacy {
/// The targets for which to compute the multiproof.
targets: MultiProofTargets,
/// The prefix sets for the proof calculation.
prefix_sets: TriePrefixSets,
/// Whether or not to collect branch node masks.
collect_branch_node_masks: bool,
/// Provided by the user to give the necessary context to retain extra proofs.
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
/// Context for sending the proof result.
proof_result_sender: ProofResultContext,
},
/// V2 account multiproof variant
V2 {
/// The targets for which to compute the multiproof.
targets: MultiProofTargetsV2,
/// Context for sending the proof result.
proof_result_sender: ProofResultContext,
},
}
impl AccountMultiproofInput {
/// Returns the [`ProofResultContext`] for this input, consuming the input.
fn into_proof_result_sender(self) -> ProofResultContext {
match self {
Self::Legacy { proof_result_sender, .. } | Self::V2 { proof_result_sender, .. } => {
proof_result_sender
}
}
}
}
/// Parameters for building an account multiproof with pre-computed storage roots.

View File

@@ -1,5 +1,3 @@
#[cfg(feature = "metrics")]
use crate::proof_task_metrics::ProofTaskCursorMetricsCache;
use derive_more::Deref;
use reth_trie::stats::{TrieStats, TrieTracker};
@@ -36,9 +34,6 @@ pub struct ParallelTrieTracker {
trie: TrieTracker,
precomputed_storage_roots: u64,
missed_leaves: u64,
#[cfg(feature = "metrics")]
/// Local tracking of cursor-related metrics
pub cursor_metrics: ProofTaskCursorMetricsCache,
}
impl ParallelTrieTracker {

View File

@@ -86,7 +86,6 @@ pub(crate) struct AsyncAccountValueEncoder {
impl AsyncAccountValueEncoder {
/// Initializes a [`Self`] using a `ProofWorkerHandle` which will be used to calculate storage
/// roots asynchronously.
#[expect(dead_code)]
pub(crate) fn new(
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
@@ -106,7 +105,6 @@ impl AsyncAccountValueEncoder {
///
/// This method panics if any deferred encoders produced by [`Self::deferred_encoder`] have not
/// been dropped.
#[expect(dead_code)]
pub(crate) fn into_storage_proofs(
self,
) -> Result<B256Map<Vec<ProofTrieNode>>, StateProofError> {