chore: remove legacy proof code paths and simplify to V2-only (#22270)

Co-authored-by: Amp <amp@ampcode.com>
Author: Brian Picciano
Date: 2026-02-17 15:42:13 +01:00
Committed by: GitHub
Parent: e89bf483bc
Commit: 6f9a3242ef
16 changed files with 169 additions and 3776 deletions


@@ -0,0 +1,10 @@
---
reth-engine-primitives: patch
reth-engine-tree: patch
reth-node-core: patch
reth-trie-parallel: minor
---
Removed the legacy proof calculation system and its opt-out configuration flags.
Removed the legacy (non-V2) proof calculation code paths, simplified the multiproof task architecture by dropping the dual-mode system, and cleaned up the now-redundant V2 opt-out CLI flags (`--engine.disable-proof-v2`, `--engine.disable-trie-cache`). The codebase now exclusively uses V2 proofs with the sparse trie cache.
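For downstream consumers of `TreeConfig`, the change looks roughly like this hedged sketch (the import path is assumed; only builder methods that appear in this diff are referenced):

```rust
// Sketch only: `TreeConfig` is reth's engine tree configuration; the exact
// import path is an assumption here.
use reth_engine_primitives::TreeConfig;

fn configure() -> TreeConfig {
    // Before this commit, callers could opt out of the V2 machinery:
    //
    //     TreeConfig::default()
    //         .with_disable_proof_v2(true)    // removed
    //         .with_disable_trie_cache(true); // removed
    //
    // Now V2 proofs and the sparse trie cache are always on:
    TreeConfig::default()
}
```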

Cargo.lock (generated)

@@ -8389,7 +8389,6 @@ dependencies = [
"revm-state",
"schnellru",
"serde_json",
"smallvec",
"thiserror 2.0.18",
"tokio",
"tracing",
@@ -10596,7 +10595,6 @@ dependencies = [
"reth-storage-errors",
"reth-tasks",
"reth-trie",
"reth-trie-common",
"reth-trie-db",
"reth-trie-sparse",
"thiserror 2.0.18",


@@ -39,11 +39,6 @@ pub const SMALL_BLOCK_MULTIPROOF_CHUNK_SIZE: usize = 30;
/// Gas threshold below which the small block chunk size is used.
pub const SMALL_BLOCK_GAS_THRESHOLD: u64 = 20_000_000;
/// The size of proof targets chunk to spawn in one multiproof calculation when V2 proofs are
/// enabled. This is 4x the default chunk size to take advantage of more efficient V2 proof
/// computation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2: usize = DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE * 4;
/// Default number of reserved CPU cores for non-reth processes.
///
/// This will be deducted from the thread count of main reth global threadpool.
@@ -162,12 +157,8 @@ pub struct TreeConfig {
storage_worker_count: usize,
/// Number of account proof worker threads.
account_worker_count: usize,
/// Whether to disable V2 storage proofs.
disable_proof_v2: bool,
/// Whether to disable cache metrics recording (can be expensive with large cached state).
disable_cache_metrics: bool,
/// Whether to disable sparse trie cache.
disable_trie_cache: bool,
/// Depth for sparse trie pruning after state root computation.
sparse_trie_prune_depth: usize,
/// Maximum number of storage tries to retain after pruning.
@@ -205,9 +196,7 @@ impl Default for TreeConfig {
allow_unwind_canonical_header: false,
storage_worker_count: default_storage_worker_count(),
account_worker_count: default_account_worker_count(),
disable_proof_v2: false,
disable_cache_metrics: false,
disable_trie_cache: false,
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
disable_sparse_trie_cache_pruning: false,
@@ -241,7 +230,6 @@ impl TreeConfig {
allow_unwind_canonical_header: bool,
storage_worker_count: usize,
account_worker_count: usize,
disable_proof_v2: bool,
disable_cache_metrics: bool,
sparse_trie_prune_depth: usize,
sparse_trie_max_storage_tries: usize,
@@ -269,9 +257,7 @@ impl TreeConfig {
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
disable_proof_v2,
disable_cache_metrics,
disable_trie_cache: false,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
disable_sparse_trie_cache_pruning: false,
@@ -314,16 +300,9 @@ impl TreeConfig {
self.multiproof_chunk_size
}
/// Return the multiproof task chunk size, using the V2 default if V2 proofs are enabled
/// and the chunk size is at the default value.
/// Return the effective multiproof task chunk size.
pub const fn effective_multiproof_chunk_size(&self) -> usize {
if !self.disable_proof_v2 &&
self.multiproof_chunk_size == DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE
{
DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2
} else {
self.multiproof_chunk_size
}
self.multiproof_chunk_size
}
/// Return the number of reserved CPU cores for non-reth processes
@@ -559,17 +538,6 @@ impl TreeConfig {
self
}
/// Return whether V2 storage proofs are disabled.
pub const fn disable_proof_v2(&self) -> bool {
self.disable_proof_v2
}
/// Setter for whether to disable V2 storage proofs.
pub const fn with_disable_proof_v2(mut self, disable_proof_v2: bool) -> Self {
self.disable_proof_v2 = disable_proof_v2;
self
}
/// Returns whether cache metrics recording is disabled.
pub const fn disable_cache_metrics(&self) -> bool {
self.disable_cache_metrics
@@ -581,17 +549,6 @@ impl TreeConfig {
self
}
/// Returns whether sparse trie cache is disabled.
pub const fn disable_trie_cache(&self) -> bool {
self.disable_trie_cache
}
/// Setter for whether to disable sparse trie cache.
pub const fn with_disable_trie_cache(mut self, value: bool) -> Self {
self.disable_trie_cache = value;
self
}
/// Returns the sparse trie prune depth.
pub const fn sparse_trie_prune_depth(&self) -> usize {
self.sparse_trie_prune_depth
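
The `effective_multiproof_chunk_size` hunk above drops the V2-specific 4x bump. A standalone reconstruction of the removed selection logic (placeholder default value; the real constant is `DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE` from this file):

```rust
// Placeholder default; the real value is DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE.
const DEFAULT_CHUNK_SIZE: usize = 10;
const DEFAULT_CHUNK_SIZE_V2: usize = DEFAULT_CHUNK_SIZE * 4;

// Removed: bump the chunk size 4x when V2 proofs were enabled and the user
// had left the chunk size at its default.
const fn old_effective_chunk_size(disable_proof_v2: bool, configured: usize) -> usize {
    if !disable_proof_v2 && configured == DEFAULT_CHUNK_SIZE {
        DEFAULT_CHUNK_SIZE_V2
    } else {
        configured
    }
}

// Kept: the configured value is now used as-is.
const fn effective_chunk_size(configured: usize) -> usize {
    configured
}
```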


@@ -54,7 +54,6 @@ thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "sync", "macros"] }
fixed-cache.workspace = true
moka = { workspace = true, features = ["sync"] }
smallvec.workspace = true
# metrics
metrics.workspace = true


@@ -2,12 +2,12 @@
use super::precompile_cache::PrecompileCacheMap;
use crate::tree::{
cached_state::{CachedStateMetrics, CachedStateProvider, ExecutionCache, SavedCache},
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
payload_processor::{
prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
sparse_trie::StateRootComputeOutcome,
},
sparse_trie::{SparseTrieCacheTask, SparseTrieTask, SpawnedSparseTrieTask},
sparse_trie::SparseTrieCacheTask,
CacheWaitDurations, StateProviderBuilder, TreeConfig, WaitForCaches,
};
use alloy_eip7928::BlockAccessList;
@@ -16,7 +16,7 @@ use alloy_evm::block::StateChangeSource;
use alloy_primitives::B256;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use metrics::{Counter, Histogram};
use multiproof::{SparseTrieUpdate, *};
use multiproof::*;
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
use rayon::prelude::*;
@@ -30,8 +30,7 @@ use reth_evm::{
use reth_metrics::Metrics;
use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
use reth_provider::{
BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProvider,
StateProviderFactory, StateReader,
BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader,
};
use reth_revm::{db::BundleState, state::EvmState};
use reth_tasks::{ForEachOrdered, Runtime};
@@ -249,7 +248,7 @@ where
///
/// ## Sparse trie task
///
/// Responsible for calculating the state root based on the received [`SparseTrieUpdate`].
/// Responsible for calculating the state root.
///
/// This task runs until there are no further updates to process.
///
@@ -284,66 +283,32 @@ where
self.spawn_tx_iterator(transactions, env.transaction_count);
let span = Span::current();
let (to_sparse_trie, sparse_trie_rx) = channel();
let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
let v2_proofs_enabled = !config.disable_proof_v2();
let parent_state_root = env.parent_state_root;
let transaction_count = env.transaction_count;
let chunk_size = Self::adaptive_chunk_size(config, env.gas_used);
let prewarm_handle = self.spawn_caching_with(
env,
prewarm_rx,
provider_builder.clone(),
provider_builder,
Some(to_multi_proof.clone()),
bal,
v2_proofs_enabled,
);
// Create and spawn the storage proof task.
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let halve_workers = transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
let proof_handle =
ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers, v2_proofs_enabled);
if config.disable_trie_cache() {
let multi_proof_task = MultiProofTask::new(
proof_handle.clone(),
to_sparse_trie,
chunk_size,
to_multi_proof.clone(),
from_multi_proof.clone(),
)
.with_v2_proofs_enabled(v2_proofs_enabled);
// spawn multi-proof task
let parent_span = span.clone();
let saved_cache = prewarm_handle.saved_cache.clone();
self.executor.spawn_blocking(move || {
let _enter = parent_span.entered();
// Build a state provider for the multiproof task
let provider = provider_builder.build().expect("failed to build provider");
let provider = if let Some(saved_cache) = saved_cache {
let (cache, metrics, _disable_metrics) = saved_cache.split();
Box::new(CachedStateProvider::new(provider, cache, metrics))
as Box<dyn StateProvider>
} else {
Box::new(provider)
};
multi_proof_task.run(provider);
});
}
let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);
// wire the sparse trie to the state root response receiver
let (state_root_tx, state_root_rx) = channel();
// Spawn the sparse trie task using any stored trie and parallel trie configuration.
self.spawn_sparse_trie_task(
sparse_trie_rx,
proof_handle,
state_root_tx,
from_multi_proof,
config,
parent_state_root,
chunk_size,
);
@@ -373,9 +338,7 @@ where
{
let (prewarm_rx, execution_rx) =
self.spawn_tx_iterator(transactions, env.transaction_count);
// This path doesn't use multiproof, so V2 proofs flag doesn't matter
let prewarm_handle =
self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal);
PayloadHandle {
to_multi_proof: None,
prewarm_handle,
@@ -494,7 +457,7 @@ where
level = "debug",
target = "engine::tree::payload_processor",
skip_all,
fields(bal=%bal.is_some(), %v2_proofs_enabled)
fields(bal=%bal.is_some())
)]
fn spawn_caching_with<P>(
&self,
@@ -503,7 +466,6 @@ where
provider_builder: StateProviderBuilder<N, P>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
bal: Option<Arc<BlockAccessList>>,
v2_proofs_enabled: bool,
) -> CacheTaskHandle<N::Receipt>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
@@ -523,7 +485,6 @@ where
terminate_execution: Arc::new(AtomicBool::new(false)),
precompile_cache_disabled: self.precompile_cache_disabled,
precompile_cache_map: self.precompile_cache_map.clone(),
v2_proofs_enabled,
};
let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
@@ -570,23 +531,19 @@ where
}
}
/// Spawns the [`SparseTrieTask`] for this payload processor.
/// Spawns the [`SparseTrieCacheTask`] for this payload processor.
///
/// The trie is preserved when the new payload is a child of the previous one.
#[expect(clippy::too_many_arguments)]
fn spawn_sparse_trie_task(
&self,
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
proof_worker_handle: ProofWorkerHandle,
state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
from_multi_proof: CrossbeamReceiver<MultiProofMessage>,
config: &TreeConfig,
parent_state_root: B256,
chunk_size: Option<usize>,
) {
let preserved_sparse_trie = self.sparse_state_trie.clone();
let trie_metrics = self.trie_metrics.clone();
let disable_trie_cache = config.disable_trie_cache();
let prune_depth = self.sparse_trie_prune_depth;
let max_storage_tries = self.sparse_trie_max_storage_tries;
let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
@@ -625,23 +582,14 @@ where
.with_updates(true)
});
let mut task = if disable_trie_cache {
SpawnedSparseTrieTask::Cleared(SparseTrieTask::new(
sparse_trie_rx,
proof_worker_handle,
trie_metrics.clone(),
sparse_state_trie,
))
} else {
SpawnedSparseTrieTask::Cached(SparseTrieCacheTask::new_with_trie(
&executor,
from_multi_proof,
proof_worker_handle,
trie_metrics.clone(),
sparse_state_trie.with_skip_proof_node_filtering(true),
chunk_size,
))
};
let mut task = SparseTrieCacheTask::new_with_trie(
&executor,
from_multi_proof,
proof_worker_handle,
trie_metrics.clone(),
sparse_state_trie.with_skip_proof_node_filtering(true),
chunk_size,
);
let result = task.run();
// Capture the computed state_root before sending the result
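
For reference, the removed `SparseTrieTask::run` (deleted later in this diff) used a batch-and-drain receive loop; a minimal stand-in of that pattern, with a `u64` accumulator in place of the real trie:

```rust
use std::sync::mpsc;

fn run_until_drained(updates: mpsc::Receiver<u64>) -> u64 {
    let mut root = 0u64;
    // Block for the next update...
    while let Ok(mut batch) = updates.recv() {
        // ...then opportunistically drain anything already queued, so the
        // expensive apply step runs once per batch rather than per message.
        while let Ok(next) = updates.try_recv() {
            batch = batch.wrapping_add(next);
        }
        root = root.wrapping_add(batch); // apply the batched update
    }
    // Channel closed: all senders dropped, so finalize and return.
    root
}
```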

File diff suppressed because it is too large.


@@ -12,7 +12,7 @@ pub(super) type SparseTrie = SparseStateTrie;
/// Shared handle to a preserved sparse trie that can be reused across payload validations.
///
/// This is stored in [`PayloadProcessor`](super::PayloadProcessor) and cloned to pass to
/// [`SparseTrieTask`](super::sparse_trie::SparseTrieTask) for trie reuse.
/// [`SparseTrieCacheTask`](super::sparse_trie::SparseTrieCacheTask) for trie reuse.
#[derive(Debug, Default, Clone)]
pub(super) struct SharedPreservedSparseTrie(Arc<Mutex<Option<PreservedSparseTrie>>>);
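
The shared-handle pattern documented here is simple enough to sketch standalone; a hedged stand-in (with a placeholder for the preserved-trie state) showing the take/put lifecycle implied by "reused across payload validations":

```rust
use std::sync::{Arc, Mutex};

// Placeholder for the real preserved-trie state.
struct PreservedSparseTrie;

#[derive(Default, Clone)]
struct SharedPreservedSparseTrie(Arc<Mutex<Option<PreservedSparseTrie>>>);

impl SharedPreservedSparseTrie {
    // Take the preserved trie, leaving None behind so only one payload
    // validation reuses it.
    fn take(&self) -> Option<PreservedSparseTrie> {
        self.0.lock().unwrap().take()
    }

    // Store the trie back after a successful run so the next child payload
    // can reuse it.
    fn put(&self, trie: PreservedSparseTrie) {
        *self.0.lock().unwrap() = Some(trie);
    }
}
```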


@@ -13,11 +13,7 @@
use crate::tree::{
cached_state::{CachedStateProvider, SavedCache},
payload_processor::{
bal,
multiproof::{MultiProofMessage, VersionedMultiProofTargets},
PayloadExecutionCache,
},
payload_processor::{bal, multiproof::MultiProofMessage, PayloadExecutionCache},
precompile_cache::{CachedPrecompile, PrecompileCacheMap},
ExecutionEnv, StateProviderBuilder,
};
@@ -25,7 +21,7 @@ use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_eips::eip4895::Withdrawal;
use alloy_evm::Database;
use alloy_primitives::{keccak256, map::B256Set, StorageKey, B256};
use alloy_primitives::{keccak256, StorageKey, B256};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use metrics::{Counter, Gauge, Histogram};
use rayon::prelude::*;
@@ -38,7 +34,7 @@ use reth_provider::{
};
use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_tasks::Runtime;
use reth_trie::MultiProofTargets;
use reth_trie_parallel::targets_v2::MultiProofTargetsV2;
use std::sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, channel, Receiver, Sender, SyncSender},
@@ -187,9 +183,9 @@ where
&& let Some(withdrawals) = &ctx.env.withdrawals
&& !withdrawals.is_empty()
{
let targets =
multiproof_targets_from_withdrawals(withdrawals, ctx.v2_proofs_enabled);
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
let targets = multiproof_targets_from_withdrawals(withdrawals);
let _ = to_multi_proof
.send(MultiProofMessage::PrefetchProofs(targets));
}
// drop sender and wait for all tasks to finish
@@ -456,8 +452,6 @@ where
pub precompile_cache_disabled: bool,
/// The precompile cache map.
pub precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
/// Whether V2 proof calculation is enabled.
pub v2_proofs_enabled: bool,
}
impl<N, P, Evm> PrewarmContext<N, P, Evm>
@@ -466,12 +460,9 @@ where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
Evm: ConfigureEvm<Primitives = N> + 'static,
{
/// Splits this context into an evm, an evm config, metrics, the atomic bool for terminating
/// execution, and whether V2 proofs are enabled.
/// Splits this context into an evm, metrics, and the atomic bool for terminating execution.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn evm_for_ctx(
self,
) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>, bool)> {
fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
let Self {
env,
evm_config,
@@ -481,7 +472,6 @@ where
terminate_execution,
precompile_cache_disabled,
precompile_cache_map,
v2_proofs_enabled,
} = self;
let mut state_provider = match provider.build() {
@@ -532,7 +522,7 @@ where
});
}
Some((evm, metrics, terminate_execution, v2_proofs_enabled))
Some((evm, metrics, terminate_execution))
}
/// Accepts a [`CrossbeamReceiver`] of transactions and a handle to prewarm task. Executes
@@ -553,10 +543,7 @@ where
) where
Tx: ExecutableTxFor<Evm>,
{
let Some((mut evm, metrics, terminate_execution, v2_proofs_enabled)) = self.evm_for_ctx()
else {
return
};
let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
let _enter = debug_span!(
@@ -601,8 +588,7 @@ where
// Only send outcome for transactions after the first txn
// as the main execution will be just as fast
if index > 0 {
let (targets, storage_targets) =
multiproof_targets_from_state(res.state, v2_proofs_enabled);
let (targets, storage_targets) = multiproof_targets_from_state(res.state);
metrics.prefetch_storage_targets.record(storage_targets as f64);
if let Some(to_multi_proof) = &to_multi_proof {
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
@@ -694,59 +680,10 @@ where
}
}
/// Returns a set of [`VersionedMultiProofTargets`] and the total amount of storage targets, based
/// on the given state.
fn multiproof_targets_from_state(
state: EvmState,
v2_enabled: bool,
) -> (VersionedMultiProofTargets, usize) {
if v2_enabled {
multiproof_targets_v2_from_state(state)
} else {
multiproof_targets_legacy_from_state(state)
}
}
/// Returns legacy [`MultiProofTargets`] and the total amount of storage targets, based on the
/// Returns a set of [`MultiProofTargetsV2`] and the total amount of storage targets, based on the
/// given state.
fn multiproof_targets_legacy_from_state(state: EvmState) -> (VersionedMultiProofTargets, usize) {
let mut targets = MultiProofTargets::with_capacity(state.len());
let mut storage_targets = 0;
for (addr, account) in state {
// if the account was not touched, or if the account was selfdestructed, do not
// fetch proofs for it
//
// Since selfdestruct can only happen in the same transaction, we can skip
// prefetching proofs for selfdestructed accounts
//
// See: https://eips.ethereum.org/EIPS/eip-6780
if !account.is_touched() || account.is_selfdestructed() {
continue
}
let mut storage_set =
B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
for (key, slot) in account.storage {
// do nothing if unchanged
if !slot.is_changed() {
continue
}
storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
}
storage_targets += storage_set.len();
targets.insert(keccak256(addr), storage_set);
}
(VersionedMultiProofTargets::Legacy(targets), storage_targets)
}
/// Returns V2 [`reth_trie_parallel::targets_v2::MultiProofTargetsV2`] and the total amount of
/// storage targets, based on the given state.
fn multiproof_targets_v2_from_state(state: EvmState) -> (VersionedMultiProofTargets, usize) {
fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargetsV2, usize) {
use reth_trie::proof_v2;
use reth_trie_parallel::targets_v2::MultiProofTargetsV2;
let mut targets = MultiProofTargetsV2::default();
let mut storage_target_count = 0;
@@ -782,27 +719,17 @@ fn multiproof_targets_v2_from_state(state: EvmState) -> (VersionedMultiProofTarg
}
}
(VersionedMultiProofTargets::V2(targets), storage_target_count)
(targets, storage_target_count)
}
/// Returns [`VersionedMultiProofTargets`] for withdrawal addresses.
/// Returns [`MultiProofTargetsV2`] for withdrawal addresses.
///
/// Withdrawals only modify account balances (no storage), so the targets contain
/// only account-level entries with empty storage sets.
fn multiproof_targets_from_withdrawals(
withdrawals: &[Withdrawal],
v2_enabled: bool,
) -> VersionedMultiProofTargets {
use reth_trie_parallel::targets_v2::MultiProofTargetsV2;
if v2_enabled {
VersionedMultiProofTargets::V2(MultiProofTargetsV2 {
account_targets: withdrawals.iter().map(|w| keccak256(w.address).into()).collect(),
..Default::default()
})
} else {
VersionedMultiProofTargets::Legacy(
withdrawals.iter().map(|w| (keccak256(w.address), Default::default())).collect(),
)
fn multiproof_targets_from_withdrawals(withdrawals: &[Withdrawal]) -> MultiProofTargetsV2 {
MultiProofTargetsV2 {
account_targets: withdrawals.iter().map(|w| keccak256(w.address).into()).collect(),
..Default::default()
}
}
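
For reference, the removed legacy helper above skipped untouched and selfdestructed accounts (per EIP-6780) and hashed only changed storage slots into targets. A self-contained sketch of that slot filter, with a simplified stand-in for revm's storage slot type:

```rust
use alloy_primitives::{keccak256, B256, U256};
use std::collections::HashSet;

// Simplified stand-in for revm's storage slot (the original uses `is_changed()`).
struct Slot {
    previous: U256,
    present: U256,
}

fn changed_slot_targets(slots: &[(U256, Slot)]) -> HashSet<B256> {
    slots
        .iter()
        // Unchanged slots need no proof prefetch.
        .filter(|(_, slot)| slot.previous != slot.present)
        // Targets are keyed by keccak256 of the 32-byte big-endian slot key.
        .map(|(key, _)| keccak256(B256::from(key.to_be_bytes::<32>())))
        .collect()
}
```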


@@ -3,9 +3,9 @@
use crate::tree::{
multiproof::{
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
VersionedMultiProofTargets, DEFAULT_MAX_TARGETS_FOR_CHUNKING,
DEFAULT_MAX_TARGETS_FOR_CHUNKING,
},
payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate},
payload_processor::multiproof::MultiProofTaskMetrics,
};
use alloy_primitives::B256;
use alloy_rlp::{Decodable, Encodable};
@@ -14,13 +14,12 @@ use rayon::iter::ParallelIterator;
use reth_primitives_traits::{Account, FastInstant as Instant, ParallelBridgeBuffered};
use reth_tasks::Runtime;
use reth_trie::{
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
TrieAccount, EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount,
EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
};
use reth_trie_parallel::{
proof_task::{
AccountMultiproofInput, ProofResult, ProofResultContext, ProofResultMessage,
ProofWorkerHandle,
AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
},
root::ParallelStateRootError,
targets_v2::MultiProofTargetsV2,
@@ -28,194 +27,11 @@ use reth_trie_parallel::{
#[cfg(feature = "trie-debug")]
use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
use reth_trie_sparse::{
errors::{SparseStateTrieResult, SparseTrieErrorKind, SparseTrieResult},
provider::{TrieNodeProvider, TrieNodeProviderFactory},
DeferredDrops, LeafUpdate, ParallelSparseTrie, SparseStateTrie, SparseTrie,
errors::SparseTrieResult, DeferredDrops, LeafUpdate, ParallelSparseTrie, SparseStateTrie,
SparseTrie,
};
use revm_primitives::{hash_map::Entry, B256Map};
use smallvec::SmallVec;
use std::{sync::mpsc, time::Duration};
use tracing::{debug, debug_span, error, instrument, trace};
#[expect(clippy::large_enum_variant)]
pub(super) enum SpawnedSparseTrieTask<BPF, A, S>
where
BPF: TrieNodeProviderFactory + Send + Sync,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
Cleared(SparseTrieTask<BPF, A, S>),
Cached(SparseTrieCacheTask<A, S>),
}
impl<BPF, A, S> SpawnedSparseTrieTask<BPF, A, S>
where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
match self {
Self::Cleared(task) => task.run(),
Self::Cached(task) => task.run(),
}
}
pub(super) fn into_trie_for_reuse(
self,
prune_depth: usize,
max_storage_tries: usize,
max_nodes_capacity: usize,
max_values_capacity: usize,
disable_pruning: bool,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
match self {
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
Self::Cached(task) => task.into_trie_for_reuse(
prune_depth,
max_storage_tries,
max_nodes_capacity,
max_values_capacity,
disable_pruning,
),
}
}
pub(super) fn into_cleared_trie(
self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
match self {
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
Self::Cached(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
}
}
}
/// A task responsible for populating the sparse trie.
pub(super) struct SparseTrieTask<BPF, A = ParallelSparseTrie, S = ParallelSparseTrie>
where
BPF: TrieNodeProviderFactory + Send + Sync,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
{
/// Receives updates from the state root task.
pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
/// `SparseStateTrie` used for computing the state root.
pub(super) trie: SparseStateTrie<A, S>,
pub(super) metrics: MultiProofTaskMetrics,
/// Trie node provider factory.
blinded_provider_factory: BPF,
}
impl<BPF, A, S> SparseTrieTask<BPF, A, S>
where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
/// Creates a new sparse trie task with the given trie.
pub(super) const fn new(
updates: mpsc::Receiver<SparseTrieUpdate>,
blinded_provider_factory: BPF,
metrics: MultiProofTaskMetrics,
trie: SparseStateTrie<A, S>,
) -> Self {
Self { updates, metrics, trie, blinded_provider_factory }
}
/// Runs the sparse trie task to completion, computing the state root.
///
/// Receives [`SparseTrieUpdate`]s until the channel is closed, applying each update
/// to the trie. Once all updates are processed, computes and returns the final state root.
#[instrument(
name = "SparseTrieTask::run",
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
let now = Instant::now();
let mut num_iterations = 0;
while let Ok(mut update) = self.updates.recv() {
num_iterations += 1;
let mut num_updates = 1;
let _enter =
debug_span!(target: "engine::tree::payload_processor::sparse_trie", "drain updates")
.entered();
while let Ok(next) = self.updates.try_recv() {
update.extend(next);
num_updates += 1;
}
drop(_enter);
debug!(
target: "engine::root",
num_updates,
account_proofs = update.multiproof.account_proofs_len(),
storage_proofs = update.multiproof.storage_proofs_len(),
"Updating sparse trie"
);
let elapsed =
update_sparse_trie(&mut self.trie, update, &self.blinded_provider_factory)
.map_err(|e| {
ParallelStateRootError::Other(format!(
"could not calculate state root: {e:?}"
))
})?;
self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
}
debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
let start = Instant::now();
let (state_root, trie_updates) =
self.trie.root_with_updates(&self.blinded_provider_factory).map_err(|e| {
ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
})?;
#[cfg(feature = "trie-debug")]
let debug_recorders = self.trie.take_debug_recorders();
let end = Instant::now();
self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
Ok(StateRootComputeOutcome {
state_root,
trie_updates,
#[cfg(feature = "trie-debug")]
debug_recorders,
})
}
/// Clears and shrinks the trie, discarding all state.
///
/// Use this when the payload was invalid or cancelled - we don't want to preserve
/// potentially invalid trie state, but we keep the allocations for reuse.
pub(super) fn into_cleared_trie(
self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
let Self { mut trie, .. } = self;
trie.clear();
trie.shrink_to(max_nodes_capacity, max_values_capacity);
let deferred = trie.take_deferred_drops();
(trie, deferred)
}
}
use tracing::{debug, debug_span, error, instrument};
/// Maximum number of pending/prewarm updates that we accumulate in memory before actually applying.
const MAX_PENDING_UPDATES: usize = 100;
@@ -436,14 +252,10 @@ where
let Ok(result) = message else {
unreachable!("we own the sender half")
};
let ProofResult::V2(mut result) = result.result? else {
unreachable!("sparse trie as cache must only be used with multiproof v2");
};
let mut result = result.result?;
while let Ok(next) = self.proof_result_rx.try_recv() {
let ProofResult::V2(res) = next.result? else {
unreachable!("sparse trie as cache must only be used with multiproof v2");
};
let res = next.result?;
result.extend(res);
}
@@ -516,11 +328,7 @@ where
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn on_prewarm_targets(&mut self, targets: VersionedMultiProofTargets) {
let VersionedMultiProofTargets::V2(targets) = targets else {
unreachable!("sparse trie as cache must only be used with V2 multiproof targets");
};
fn on_prewarm_targets(&mut self, targets: MultiProofTargetsV2) {
for target in targets.account_targets {
// Only touch accounts that are not yet present in the updates set.
self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
@@ -873,7 +681,7 @@ where
MultiProofTargetsV2::chunks,
|proof_targets| {
if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(
AccountMultiproofInput::V2 {
AccountMultiproofInput {
targets: proof_targets,
proof_result_sender: ProofResultContext::new(
self.proof_result_tx.clone(),
@@ -896,7 +704,7 @@ enum SparseTrieTaskMessage {
/// A hashed state update ready to be processed.
HashedState(HashedPostState),
/// Prefetch proof targets (passed through directly).
PrefetchProofs(VersionedMultiProofTargets),
PrefetchProofs(MultiProofTargetsV2),
/// Signals that all state updates have been received.
FinishedStateUpdates,
}
@@ -915,160 +723,6 @@ pub struct StateRootComputeOutcome {
pub debug_recorders: Vec<(Option<B256>, TrieDebugRecorder)>,
}
/// Updates the sparse trie with the given proofs and state, and returns the elapsed time.
#[instrument(level = "debug", target = "engine::tree::payload_processor::sparse_trie", skip_all)]
pub(crate) fn update_sparse_trie<BPF, A, S>(
trie: &mut SparseStateTrie<A, S>,
SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
blinded_provider_factory: &BPF,
) -> SparseStateTrieResult<Duration>
where
BPF: TrieNodeProviderFactory + Send + Sync,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
trace!(target: "engine::root::sparse", "Updating sparse trie");
let started_at = Instant::now();
// Reveal new accounts and storage slots.
match multiproof {
ProofResult::Legacy(decoded, _) => {
trie.reveal_decoded_multiproof(decoded)?;
}
ProofResult::V2(decoded_v2) => {
trie.reveal_decoded_multiproof_v2(decoded_v2)?;
}
}
let reveal_multiproof_elapsed = started_at.elapsed();
trace!(
target: "engine::root::sparse",
?reveal_multiproof_elapsed,
"Done revealing multiproof"
);
// Update storage slots with new values and calculate storage roots.
let span = tracing::Span::current();
let results: Vec<_> = state
.storages
.into_iter()
.map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
.par_bridge_buffered()
.map(|(address, storage, storage_trie)| {
let _enter =
debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage trie", ?address)
.entered();
trace!(target: "engine::tree::payload_processor::sparse_trie", "Updating storage");
let storage_provider = blinded_provider_factory.storage_node_provider(address);
let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;
if storage.wiped {
trace!(target: "engine::tree::payload_processor::sparse_trie", "Wiping storage");
storage_trie.wipe()?;
}
// Defer leaf removals until after updates/additions, so that we don't delete an
// intermediate branch node during a removal and then re-add that branch back during a
// later leaf addition. This is an optimization, but also a requirement inherited from
// multiproof generating, which can't know the order that leaf operations happen in.
let mut removed_slots = SmallVec::<[Nibbles; 8]>::new();
for (slot, value) in storage.storage {
let slot_nibbles = Nibbles::unpack(slot);
if value.is_zero() {
removed_slots.push(slot_nibbles);
continue;
}
trace!(target: "engine::tree::payload_processor::sparse_trie", ?slot_nibbles, "Updating storage slot");
storage_trie.update_leaf(
slot_nibbles,
alloy_rlp::encode_fixed_size(&value).to_vec(),
&storage_provider,
)?;
}
for slot_nibbles in removed_slots {
trace!(target: "engine::root::sparse", ?slot_nibbles, "Removing storage slot");
storage_trie.remove_leaf(&slot_nibbles, &storage_provider)?;
}
storage_trie.root();
SparseStateTrieResult::Ok((address, storage_trie))
})
.collect();
// Defer leaf removals until after updates/additions, so that we don't delete an intermediate
// branch node during a removal and then re-add that branch back during a later leaf addition.
// This is an optimization, but also a requirement inherited from multiproof generating, which
// can't know the order that leaf operations happen in.
let mut removed_accounts = Vec::new();
// Update account storage roots
let _enter =
tracing::debug_span!(target: "engine::tree::payload_processor::sparse_trie", "account trie")
.entered();
for result in results {
let (address, storage_trie) = result?;
trie.insert_storage_trie(address, storage_trie);
if let Some(account) = state.accounts.remove(&address) {
// If the account itself has an update, remove it from the state update and update in
// one go instead of doing it down below.
trace!(target: "engine::root::sparse", ?address, "Updating account and its storage root");
if !trie.update_account(
address,
account.unwrap_or_default(),
blinded_provider_factory,
)? {
removed_accounts.push(address);
}
} else if trie.is_account_revealed(address) {
// Otherwise, if the account is revealed, only update its storage root.
trace!(target: "engine::root::sparse", ?address, "Updating account storage root");
if !trie.update_account_storage_root(address, blinded_provider_factory)? {
removed_accounts.push(address);
}
}
}
// Update accounts
for (address, account) in state.accounts {
trace!(target: "engine::root::sparse", ?address, "Updating account");
if !trie.update_account(address, account.unwrap_or_default(), blinded_provider_factory)? {
removed_accounts.push(address);
}
}
// Remove accounts
for address in removed_accounts {
trace!(target: "engine::root::sparse", ?address, "Removing account");
let nibbles = Nibbles::unpack(address);
trie.remove_account_leaf(&nibbles, blinded_provider_factory)?;
}
let elapsed_before = started_at.elapsed();
trace!(
target: "engine::root::sparse",
"Calculating subtries"
);
trie.calculate_subtries();
let elapsed = started_at.elapsed();
let below_level_elapsed = elapsed - elapsed_before;
trace!(
target: "engine::root::sparse",
?below_level_elapsed,
"Intermediate nodes calculated"
);
Ok(elapsed)
}
#[cfg(test)]
mod tests {
use super::*;


@@ -39,9 +39,7 @@ pub struct DefaultEngineValues {
storage_worker_count: Option<usize>,
account_worker_count: Option<usize>,
prewarming_threads: Option<usize>,
disable_proof_v2: bool,
cache_metrics_disabled: bool,
disable_trie_cache: bool,
sparse_trie_prune_depth: usize,
sparse_trie_max_storage_tries: usize,
disable_sparse_trie_cache_pruning: bool,
@@ -176,24 +174,12 @@ impl DefaultEngineValues {
self
}
/// Set whether to disable proof V2 by default
pub const fn with_disable_proof_v2(mut self, v: bool) -> Self {
self.disable_proof_v2 = v;
self
}
/// Set whether to disable cache metrics by default
pub const fn with_cache_metrics_disabled(mut self, v: bool) -> Self {
self.cache_metrics_disabled = v;
self
}
/// Set whether to disable sparse trie cache by default
pub const fn with_disable_trie_cache(mut self, v: bool) -> Self {
self.disable_trie_cache = v;
self
}
/// Set the sparse trie prune depth by default
pub const fn with_sparse_trie_prune_depth(mut self, v: usize) -> Self {
self.sparse_trie_prune_depth = v;
@@ -241,9 +227,7 @@ impl Default for DefaultEngineValues {
storage_worker_count: None,
account_worker_count: None,
prewarming_threads: None,
disable_proof_v2: false,
cache_metrics_disabled: false,
disable_trie_cache: false,
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
disable_sparse_trie_cache_pruning: false,
@@ -373,18 +357,10 @@ pub struct EngineArgs {
#[arg(long = "engine.prewarming-threads", default_value = Resettable::from(DefaultEngineValues::get_global().prewarming_threads.map(|v| v.to_string().into())))]
pub prewarming_threads: Option<usize>,
/// Disable V2 storage proofs for state root calculations
#[arg(long = "engine.disable-proof-v2", default_value_t = DefaultEngineValues::get_global().disable_proof_v2)]
pub disable_proof_v2: bool,
/// Disable cache metrics recording, which can take up to 50ms with large cached state.
#[arg(long = "engine.disable-cache-metrics", default_value_t = DefaultEngineValues::get_global().cache_metrics_disabled)]
pub cache_metrics_disabled: bool,
/// Disable sparse trie cache.
#[arg(long = "engine.disable-trie-cache", default_value_t = DefaultEngineValues::get_global().disable_trie_cache, conflicts_with = "disable_proof_v2")]
pub disable_trie_cache: bool,
/// Sparse trie prune depth.
#[arg(long = "engine.sparse-trie-prune-depth", default_value_t = DefaultEngineValues::get_global().sparse_trie_prune_depth)]
pub sparse_trie_prune_depth: usize,
@@ -438,9 +414,7 @@ impl Default for EngineArgs {
storage_worker_count,
account_worker_count,
prewarming_threads,
disable_proof_v2,
cache_metrics_disabled,
disable_trie_cache,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
disable_sparse_trie_cache_pruning,
@@ -470,9 +444,7 @@ impl Default for EngineArgs {
storage_worker_count,
account_worker_count,
prewarming_threads,
disable_proof_v2,
cache_metrics_disabled,
disable_trie_cache,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
disable_sparse_trie_cache_pruning,
@@ -506,9 +478,7 @@ impl EngineArgs {
.with_unwind_canonical_header(self.allow_unwind_canonical_header)
.with_storage_worker_count_opt(self.storage_worker_count)
.with_account_worker_count_opt(self.account_worker_count)
.with_disable_proof_v2(self.disable_proof_v2)
.without_cache_metrics(self.cache_metrics_disabled)
.with_disable_trie_cache(self.disable_trie_cache)
.with_sparse_trie_prune_depth(self.sparse_trie_prune_depth)
.with_sparse_trie_max_storage_tries(self.sparse_trie_max_storage_tries)
.with_disable_sparse_trie_cache_pruning(self.disable_sparse_trie_cache_pruning)
@@ -562,9 +532,7 @@ mod tests {
storage_worker_count: Some(16),
account_worker_count: Some(8),
prewarming_threads: Some(4),
disable_proof_v2: false,
cache_metrics_disabled: true,
disable_trie_cache: true,
sparse_trie_prune_depth: 10,
sparse_trie_max_storage_tries: 100,
disable_sparse_trie_cache_pruning: true,
@@ -601,7 +569,6 @@ mod tests {
"--engine.prewarming-threads",
"4",
"--engine.disable-cache-metrics",
"--engine.disable-trie-cache",
"--engine.sparse-trie-prune-depth",
"10",
"--engine.sparse-trie-max-storage-tries",


@@ -17,7 +17,6 @@ reth-primitives-traits = { workspace = true, features = ["dashmap", "std"] }
reth-execution-errors.workspace = true
reth-provider.workspace = true
reth-storage-errors.workspace = true
reth-trie-common.workspace = true
reth-trie-sparse = { workspace = true, features = ["std"] }
reth-tasks = { workspace = true, features = ["rayon"] }
reth-trie.workspace = true
@@ -60,7 +59,6 @@ metrics = ["reth-metrics", "dep:metrics", "reth-trie/metrics", "reth-trie-sparse
test-utils = [
"reth-primitives-traits/test-utils",
"reth-provider/test-utils",
"reth-trie-common/test-utils",
"reth-trie-db/test-utils",
"reth-trie-sparse/test-utils",
"reth-trie/test-utils",


@@ -18,8 +18,6 @@ pub mod stats;
pub mod root;
/// Implementation of parallel proof computation.
pub mod proof;
pub mod proof_task;
/// Async value encoder for V2 proofs.


@@ -1,283 +0,0 @@
use crate::{
metrics::ParallelTrieMetrics,
proof_task::{AccountMultiproofInput, ProofResult, ProofResultContext, ProofWorkerHandle},
root::ParallelStateRootError,
StorageRootTargets,
};
use crossbeam_channel::unbounded as crossbeam_unbounded;
use reth_primitives_traits::FastInstant as Instant;
use reth_trie::{
prefix_set::{PrefixSetMut, TriePrefixSets, TriePrefixSetsMut},
DecodedMultiProof, HashedPostState, MultiProofTargets, Nibbles,
};
use reth_trie_common::added_removed_keys::MultiAddedRemovedKeys;
use std::sync::Arc;
use tracing::trace;
/// Parallel proof calculator.
///
/// This can collect proof for many targets in parallel, spawning a task for each hashed address
/// that has proof targets.
#[derive(Debug)]
pub struct ParallelProof {
/// The collection of prefix sets for the computation.
pub prefix_sets: Arc<TriePrefixSetsMut>,
/// Flag indicating whether to include branch node masks in the proof.
collect_branch_node_masks: bool,
/// Provided by the user to give the necessary context to retain extra proofs.
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
/// Handle to the proof worker pools.
proof_worker_handle: ProofWorkerHandle,
/// Whether to use V2 storage proofs.
v2_proofs_enabled: bool,
#[cfg(feature = "metrics")]
metrics: ParallelTrieMetrics,
}
impl ParallelProof {
/// Create new state proof generator.
pub fn new(
prefix_sets: Arc<TriePrefixSetsMut>,
proof_worker_handle: ProofWorkerHandle,
) -> Self {
Self {
prefix_sets,
collect_branch_node_masks: false,
multi_added_removed_keys: None,
proof_worker_handle,
v2_proofs_enabled: false,
#[cfg(feature = "metrics")]
metrics: ParallelTrieMetrics::new_with_labels(&[("type", "proof")]),
}
}
/// Set whether to use V2 storage proofs.
pub const fn with_v2_proofs_enabled(mut self, v2_proofs_enabled: bool) -> Self {
self.v2_proofs_enabled = v2_proofs_enabled;
self
}
/// Set the flag indicating whether to include branch node masks in the proof.
pub const fn with_branch_node_masks(mut self, branch_node_masks: bool) -> Self {
self.collect_branch_node_masks = branch_node_masks;
self
}
/// Configure the `ParallelProof` with a [`MultiAddedRemovedKeys`], allowing for retaining
/// extra proofs needed to add and remove leaf nodes from the tries.
pub fn with_multi_added_removed_keys(
mut self,
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
) -> Self {
self.multi_added_removed_keys = multi_added_removed_keys;
self
}
/// Extends prefix sets with the given multiproof targets and returns the frozen result.
///
/// This is a helper function used to prepare prefix sets before computing multiproofs.
/// Returns frozen (immutable) prefix sets ready for use in proof computation.
pub fn extend_prefix_sets_with_targets(
base_prefix_sets: &TriePrefixSetsMut,
targets: &MultiProofTargets,
) -> TriePrefixSets {
let mut extended = base_prefix_sets.clone();
extended.extend(TriePrefixSetsMut {
account_prefix_set: PrefixSetMut::from(targets.keys().copied().map(Nibbles::unpack)),
storage_prefix_sets: targets
.iter()
.filter(|&(_hashed_address, slots)| !slots.is_empty())
.map(|(hashed_address, slots)| {
(*hashed_address, PrefixSetMut::from(slots.iter().map(Nibbles::unpack)))
})
.collect(),
destroyed_accounts: Default::default(),
});
extended.freeze()
}
/// Generate a state multiproof according to specified targets.
pub fn decoded_multiproof(
self,
targets: MultiProofTargets,
) -> Result<DecodedMultiProof, ParallelStateRootError> {
// Extend prefix sets with targets
let prefix_sets = Self::extend_prefix_sets_with_targets(&self.prefix_sets, &targets);
let storage_root_targets_len = StorageRootTargets::count(
&prefix_sets.account_prefix_set,
&prefix_sets.storage_prefix_sets,
);
trace!(
target: "trie::parallel_proof",
total_targets = storage_root_targets_len,
"Starting parallel proof generation"
);
// Queue account multiproof request to account worker pool
// Create channel for receiving ProofResultMessage
let (result_tx, result_rx) = crossbeam_unbounded();
let account_multiproof_start_time = Instant::now();
let input = AccountMultiproofInput::Legacy {
targets,
prefix_sets,
collect_branch_node_masks: self.collect_branch_node_masks,
multi_added_removed_keys: self.multi_added_removed_keys.clone(),
proof_result_sender: ProofResultContext::new(
result_tx,
0,
HashedPostState::default(),
account_multiproof_start_time,
),
};
self.proof_worker_handle
.dispatch_account_multiproof(input)
.map_err(|e| ParallelStateRootError::Other(e.to_string()))?;
// Wait for account multiproof result from worker
let proof_result_msg = result_rx.recv().map_err(|_| {
ParallelStateRootError::Other(
"Account multiproof channel dropped: worker died or pool shutdown".to_string(),
)
})?;
let ProofResult::Legacy(multiproof, stats) = proof_result_msg.result? else {
panic!("AccountMultiproofInput::Legacy was submitted, expected legacy result")
};
#[cfg(feature = "metrics")]
self.metrics.record(stats);
trace!(
target: "trie::parallel_proof",
total_targets = storage_root_targets_len,
duration = ?stats.duration(),
branches_added = stats.branches_added(),
leaves_added = stats.leaves_added(),
missed_leaves = stats.missed_leaves(),
precomputed_storage_roots = stats.precomputed_storage_roots(),
"Calculated decoded proof",
);
Ok(multiproof)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::proof_task::{ProofTaskCtx, ProofWorkerHandle};
use alloy_primitives::{
keccak256,
map::{B256Set, DefaultHashBuilder, HashMap},
Address, B256, U256,
};
use rand::Rng;
use reth_primitives_traits::{Account, StorageEntry};
use reth_provider::{test_utils::create_test_provider_factory, HashingWriter};
use reth_trie::proof::Proof;
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
#[test]
fn random_parallel_proof() {
let factory = create_test_provider_factory();
let mut rng = rand::rng();
let state = (0..100)
.map(|_| {
let address = Address::random();
let account =
Account { balance: U256::from(rng.random::<u64>()), ..Default::default() };
let mut storage = HashMap::<B256, U256, DefaultHashBuilder>::default();
let has_storage = rng.random_bool(0.7);
if has_storage {
for _ in 0..100 {
storage.insert(
B256::from(U256::from(rng.random::<u64>())),
U256::from(rng.random::<u64>()),
);
}
}
(address, (account, storage))
})
.collect::<HashMap<_, _, DefaultHashBuilder>>();
{
let provider_rw = factory.provider_rw().unwrap();
provider_rw
.insert_account_for_hashing(
state.iter().map(|(address, (account, _))| (*address, Some(*account))),
)
.unwrap();
provider_rw
.insert_storage_for_hashing(state.iter().map(|(address, (_, storage))| {
(
*address,
storage
.iter()
.map(|(slot, value)| StorageEntry { key: *slot, value: *value }),
)
}))
.unwrap();
provider_rw.commit().unwrap();
}
let mut targets = MultiProofTargets::default();
for (address, (_, storage)) in state.iter().take(10) {
let hashed_address = keccak256(*address);
let mut target_slots = B256Set::default();
for (slot, _) in storage.iter().take(5) {
target_slots.insert(*slot);
}
if !target_slots.is_empty() {
targets.insert(hashed_address, target_slots);
}
}
let provider_rw = factory.provider_rw().unwrap();
let trie_cursor_factory = DatabaseTrieCursorFactory::new(provider_rw.tx_ref());
let hashed_cursor_factory = DatabaseHashedCursorFactory::new(provider_rw.tx_ref());
let changeset_cache = reth_trie_db::ChangesetCache::new();
let factory =
reth_provider::providers::OverlayStateProviderFactory::new(factory, changeset_cache);
let task_ctx = ProofTaskCtx::new(factory);
let runtime = reth_tasks::Runtime::test();
let proof_worker_handle = ProofWorkerHandle::new(&runtime, task_ctx, false, false);
let parallel_result = ParallelProof::new(Default::default(), proof_worker_handle.clone())
.decoded_multiproof(targets.clone())
.unwrap();
let sequential_result_raw = Proof::new(trie_cursor_factory, hashed_cursor_factory)
.multiproof(targets.clone())
.unwrap(); // targets might be consumed by parallel_result
let sequential_result_decoded: DecodedMultiProof = sequential_result_raw
.try_into()
.expect("Failed to decode sequential_result for test comparison");
// to help narrow down what is wrong - first compare account subtries
assert_eq!(parallel_result.account_subtree, sequential_result_decoded.account_subtree);
// then compare length of all storage subtries
assert_eq!(parallel_result.storages.len(), sequential_result_decoded.storages.len());
// then compare each storage subtrie
for (hashed_address, storage_proof) in &parallel_result.storages {
let sequential_storage_proof =
sequential_result_decoded.storages.get(hashed_address).unwrap();
assert_eq!(storage_proof, sequential_storage_proof);
}
// then compare the entire thing for any mask differences
assert_eq!(parallel_result, sequential_result_decoded);
// Workers shut down automatically when handle is dropped
drop(proof_worker_handle);
}
}

File diff suppressed because it is too large.


@@ -1,4 +1,4 @@
use crate::proof_task::{StorageProofResult, StorageProofResultMessage};
use crate::proof_task::StorageProofResultMessage;
use alloy_primitives::{map::B256Map, B256};
use alloy_rlp::Encodable;
use core::cell::RefCell;
@@ -109,11 +109,7 @@ impl<TC, HC> Drop for AsyncAccountDeferredValueEncoder<TC, HC> {
stats.borrow_mut().storage_wait_time += wait_start.elapsed();
let StorageProofResult::V2 { proof, .. } = result else {
panic!("StorageProofResult is not V2: {result:?}")
};
storage_proof_results.borrow_mut().insert(*hashed_address, proof);
storage_proof_results.borrow_mut().insert(*hashed_address, result.proof);
Ok(())
})()
} else {
@@ -159,13 +155,9 @@ where
.result?;
stats.borrow_mut().storage_wait_time += wait_start.elapsed();
let StorageProofResult::V2 { root, proof } = result else {
panic!("StorageProofResult is not V2: {result:?}")
};
storage_proof_results.borrow_mut().insert(hashed_address, result.proof);
storage_proof_results.borrow_mut().insert(hashed_address, proof);
let root = match root {
let root = match result.root {
Some(root) => root,
None => {
// In `compute_v2_account_multiproof` we ensure that all dispatched storage
@@ -290,11 +282,7 @@ impl<TC, HC> AsyncAccountValueEncoder<TC, HC> {
.result?;
stats.storage_wait_time += wait_start.elapsed();
let StorageProofResult::V2 { proof, .. } = result else {
panic!("StorageProofResult is not V2: {result:?}")
};
storage_proof_results.insert(*hashed_address, proof);
storage_proof_results.insert(*hashed_address, result.proof);
}
Ok((storage_proof_results, stats))


@@ -984,15 +984,9 @@ Engine:
--engine.prewarming-threads <PREWARMING_THREADS>
Configure the number of prewarming threads. If not specified, defaults to available parallelism
--engine.disable-proof-v2
Disable V2 storage proofs for state root calculations
--engine.disable-cache-metrics
Disable cache metrics recording, which can take up to 50ms with large cached state
--engine.disable-trie-cache
Disable sparse trie cache
--engine.sparse-trie-prune-depth <SPARSE_TRIE_PRUNE_DEPTH>
Sparse trie prune depth