feat(engine): preserve sparse trie across payload validations (#21534)

Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Brian Picciano <me@mediocregopher.com>
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Matthias Seitz
2026-01-30 19:34:13 +01:00
committed by GitHub
parent 29072639d6
commit e1bc6d0f08
13 changed files with 936 additions and 257 deletions

View File

@@ -47,6 +47,17 @@ pub const DEFAULT_RESERVED_CPU_CORES: usize = 1;
/// Default maximum concurrency for prewarm task.
pub const DEFAULT_PREWARM_MAX_CONCURRENCY: usize = 16;
/// Default depth for sparse trie pruning.
///
/// Nodes at this depth and below are converted to hash stubs to reduce memory.
/// Depth 4 means we keep roughly 16^4 = 65536 potential branch paths at most.
pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
/// Default maximum number of storage tries to keep after pruning.
///
/// Storage tries beyond this limit are cleared (but allocations preserved).
pub const DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES: usize = 100;
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
@@ -154,6 +165,10 @@ pub struct TreeConfig {
disable_cache_metrics: bool,
/// Whether to enable sparse trie as cache.
enable_sparse_trie_as_cache: bool,
/// Depth for sparse trie pruning after state root computation.
sparse_trie_prune_depth: usize,
/// Maximum number of storage tries to retain after pruning.
sparse_trie_max_storage_tries: usize,
}
impl Default for TreeConfig {
@@ -184,6 +199,8 @@ impl Default for TreeConfig {
disable_proof_v2: false,
disable_cache_metrics: false,
enable_sparse_trie_as_cache: false,
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
}
}
}
@@ -216,6 +233,8 @@ impl TreeConfig {
account_worker_count: usize,
disable_proof_v2: bool,
disable_cache_metrics: bool,
sparse_trie_prune_depth: usize,
sparse_trie_max_storage_tries: usize,
) -> Self {
Self {
persistence_threshold,
@@ -243,6 +262,8 @@ impl TreeConfig {
disable_proof_v2,
disable_cache_metrics,
enable_sparse_trie_as_cache: false,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
}
}
@@ -555,4 +576,26 @@ impl TreeConfig {
self.enable_sparse_trie_as_cache = value;
self
}
/// Returns the sparse trie prune depth.
pub const fn sparse_trie_prune_depth(&self) -> usize {
self.sparse_trie_prune_depth
}
/// Setter for sparse trie prune depth.
pub const fn with_sparse_trie_prune_depth(mut self, depth: usize) -> Self {
self.sparse_trie_prune_depth = depth;
self
}
/// Returns the maximum number of storage tries to retain after pruning.
pub const fn sparse_trie_max_storage_tries(&self) -> usize {
self.sparse_trie_max_storage_tries
}
/// Setter for maximum storage tries to retain.
pub const fn with_sparse_trie_max_storage_tries(mut self, max_tries: usize) -> Self {
self.sparse_trie_max_storage_tries = max_tries;
self
}
}
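A minimal usage sketch of the new knobs (illustrative only; the getters and setters are the ones added above):

    // Tune cross-payload trie preservation: depth 4 keeps at most 16^4 = 65536 branch paths,
    // and up to 100 storage tries survive pruning.
    let config = TreeConfig::default()
        .with_sparse_trie_prune_depth(DEFAULT_SPARSE_TRIE_PRUNE_DEPTH)
        .with_sparse_trie_max_storage_tries(DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES);
    assert_eq!(config.sparse_trie_prune_depth(), 4);
    assert_eq!(config.sparse_trie_max_storage_tries(), 100);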

View File

@@ -7,7 +7,7 @@ use crate::tree::{
prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
sparse_trie::StateRootComputeOutcome,
},
sparse_trie::{SparseTrieCacheTask, SparseTrieTask},
sparse_trie::{SparseTrieCacheTask, SparseTrieTask, SpawnedSparseTrieTask},
StateProviderBuilder, TreeConfig,
};
use alloy_eip7928::BlockAccessList;
@@ -56,10 +56,13 @@ use tracing::{debug, debug_span, instrument, warn, Span};
pub mod bal;
pub mod executor;
pub mod multiproof;
mod preserved_sparse_trie;
pub mod prewarm;
pub mod receipt_root_task;
pub mod sparse_trie;
use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by performing benchmarks using gradually increasing values to judge
@@ -122,13 +125,16 @@ where
precompile_cache_disabled: bool,
/// Precompile cache map.
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
/// A cleared `SparseStateTrie`, kept around to be reused for the state root computation so
/// that allocations can be minimized.
sparse_state_trie: Arc<
parking_lot::Mutex<Option<ClearedSparseStateTrie<ParallelSparseTrie, ParallelSparseTrie>>>,
>,
/// A pruned `SparseStateTrie`, kept around as a cache of already revealed trie nodes and to
/// re-use allocated memory. Stored with the block hash it was computed for to enable trie
/// preservation across sequential payload validations.
sparse_state_trie: SharedPreservedSparseTrie,
/// Maximum concurrency for prewarm task.
prewarm_max_concurrency: usize,
/// Sparse trie prune depth.
sparse_trie_prune_depth: usize,
/// Maximum storage tries to retain after pruning.
sparse_trie_max_storage_tries: usize,
/// Whether to disable cache metrics recording.
disable_cache_metrics: bool,
}
@@ -160,8 +166,10 @@ where
disable_state_cache: config.disable_state_cache(),
precompile_cache_disabled: config.precompile_cache_disabled(),
precompile_cache_map,
sparse_state_trie: Arc::default(),
sparse_state_trie: SharedPreservedSparseTrie::default(),
prewarm_max_concurrency: config.prewarm_max_concurrency(),
sparse_trie_prune_depth: config.sparse_trie_prune_depth(),
sparse_trie_max_storage_tries: config.sparse_trie_max_storage_tries(),
disable_cache_metrics: config.disable_cache_metrics(),
}
}
@@ -237,6 +245,9 @@ where
// Extract V2 proofs flag early so we can pass it to prewarm
let v2_proofs_enabled = !config.disable_proof_v2();
// Capture parent_state_root before env is moved into spawn_caching_with
let parent_state_root = env.parent_state_root;
// Handle BAL-based optimization if available
let prewarm_handle = if let Some(bal) = bal {
// When BAL is present, use BAL prewarming and send BAL to multiproof
@@ -318,6 +329,7 @@ where
state_root_tx,
from_multi_proof,
config,
parent_state_root,
);
PayloadHandle {
@@ -497,6 +509,8 @@ where
}
/// Spawns the [`SparseTrieTask`] for this payload processor.
///
/// The trie is preserved when the new payload is a child of the previous one.
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
fn spawn_sparse_trie_task(
&self,
@@ -505,64 +519,111 @@ where
state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
from_multi_proof: CrossbeamReceiver<MultiProofMessage>,
config: &TreeConfig,
parent_state_root: B256,
) {
let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
let preserved_sparse_trie = self.sparse_state_trie.clone();
let trie_metrics = self.trie_metrics.clone();
let span = Span::current();
let disable_sparse_trie_as_cache = !config.enable_sparse_trie_as_cache();
let prune_depth = self.sparse_trie_prune_depth;
let max_storage_tries = self.sparse_trie_max_storage_tries;
self.executor.spawn_blocking(move || {
let _enter = span.entered();
// Reuse a stored SparseStateTrie, or create a new one using the desired configuration
// if there's none to reuse.
let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
let default_trie = RevealableSparseTrie::blind_from(
ParallelSparseTrie::default()
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
);
ClearedSparseStateTrie::from_state_trie(
// Reuse a stored SparseStateTrie if available, applying continuation logic.
// If this payload's parent state root matches the preserved trie's anchor,
// we can reuse the pruned trie structure. Otherwise, we clear the trie but
// keep allocations.
let sparse_state_trie = preserved_sparse_trie
.take()
.map(|preserved| preserved.into_trie_for(parent_state_root))
.unwrap_or_else(|| {
debug!(
target: "engine::tree::payload_processor",
"Creating new sparse trie - no preserved trie available"
);
let default_trie = RevealableSparseTrie::blind_from(
ParallelSparseTrie::default().with_parallelism_thresholds(
PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS,
),
);
SparseStateTrie::new()
.with_accounts_trie(default_trie.clone())
.with_default_storage_trie(default_trie)
.with_updates(true),
)
});
.with_updates(true)
});
let (result, trie) = if disable_sparse_trie_as_cache {
SparseTrieTask::new_with_cleared_trie(
let mut task = if disable_sparse_trie_as_cache {
SpawnedSparseTrieTask::Cleared(SparseTrieTask::new(
sparse_trie_rx,
proof_worker_handle,
trie_metrics,
trie_metrics.clone(),
sparse_state_trie,
)
.run()
))
} else {
SparseTrieCacheTask::new_with_cleared_trie(
SpawnedSparseTrieTask::Cached(SparseTrieCacheTask::new_with_cleared_trie(
from_multi_proof,
proof_worker_handle,
trie_metrics,
sparse_state_trie,
)
.run()
trie_metrics.clone(),
ClearedSparseStateTrie::from_state_trie(sparse_state_trie),
))
};
// Send state root computation result
let _ = state_root_tx.send(result);
let result = task.run();
// Capture the computed state_root before sending the result
let computed_state_root = result.as_ref().ok().map(|outcome| outcome.state_root);
// Clear the SparseStateTrie, shrink, and replace it back into the mutex _after_ sending
// results to the next step, so that time spent clearing doesn't block the step after
// this one.
let _enter = debug_span!(target: "engine::tree::payload_processor", "clear").entered();
let mut cleared_trie = ClearedSparseStateTrie::from_state_trie(trie);
// Acquire the guard before sending the result to prevent a race condition:
// Without this, the next block could start after send() but before store(),
// causing take() to return None and forcing it to create a new empty trie
// instead of reusing the preserved one. Holding the guard ensures the next
// block's take() blocks until we've stored the trie for reuse.
let mut guard = preserved_sparse_trie.lock();
// Shrink the sparse trie so that we don't have ever increasing memory.
cleared_trie.shrink_to(
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
// Send state root computation result - next block may start but will block on take()
if state_root_tx.send(result).is_err() {
// Receiver dropped - payload was likely invalid or cancelled.
// Clear the trie instead of preserving potentially invalid state.
debug!(
target: "engine::tree::payload_processor",
"State root receiver dropped, clearing trie"
);
let trie = task.into_cleared_trie(
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
guard.store(PreservedSparseTrie::cleared(trie));
return;
}
cleared_sparse_trie.lock().replace(cleared_trie);
// Only preserve the trie as anchored if computation succeeded.
// A failed computation may have left the trie in a partially updated state.
let _enter =
debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
if let Some(state_root) = computed_state_root {
let start = std::time::Instant::now();
let trie = task.into_trie_for_reuse(
prune_depth,
max_storage_tries,
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
trie_metrics
.into_trie_for_reuse_duration_histogram
.record(start.elapsed().as_secs_f64());
guard.store(PreservedSparseTrie::anchored(trie, state_root));
} else {
debug!(
target: "engine::tree::payload_processor",
"State root computation failed, clearing trie"
);
let trie = task.into_cleared_trie(
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
guard.store(PreservedSparseTrie::cleared(trie));
}
});
}
@@ -896,6 +957,10 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
pub hash: B256,
/// Hash of the parent block.
pub parent_hash: B256,
/// State root of the parent block.
/// Used for sparse trie continuation: if the preserved trie's anchor matches this,
/// the trie can be reused directly.
pub parent_state_root: B256,
}
impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
@@ -907,6 +972,7 @@ where
evm_env: Default::default(),
hash: Default::default(),
parent_hash: Default::default(),
parent_state_root: Default::default(),
}
}
}

View File

@@ -587,6 +587,8 @@ pub(crate) struct MultiProofTaskMetrics {
pub first_update_wait_time_histogram: Histogram,
/// Total time spent waiting for the last proof result.
pub last_proof_wait_time_histogram: Histogram,
/// Time spent preparing the sparse trie for reuse after state root computation.
pub into_trie_for_reuse_duration_histogram: Histogram,
}
/// Standalone task that receives a transaction state stream and updates relevant

View File

@@ -0,0 +1,117 @@
//! Preserved sparse trie for reuse across payload validations.
use alloy_primitives::B256;
use parking_lot::Mutex;
use reth_trie_sparse::SparseStateTrie;
use reth_trie_sparse_parallel::ParallelSparseTrie;
use std::sync::Arc;
use tracing::debug;
/// Type alias for the sparse trie type used in preservation.
pub(super) type SparseTrie = SparseStateTrie<ParallelSparseTrie, ParallelSparseTrie>;
/// Shared handle to a preserved sparse trie that can be reused across payload validations.
///
/// This is stored in [`PayloadProcessor`](super::PayloadProcessor) and cloned to pass to
/// [`SparseTrieTask`](super::sparse_trie::SparseTrieTask) for trie reuse.
#[derive(Debug, Default, Clone)]
pub(super) struct SharedPreservedSparseTrie(Arc<Mutex<Option<PreservedSparseTrie>>>);
impl SharedPreservedSparseTrie {
/// Takes the preserved trie if present, leaving `None` in its place.
pub(super) fn take(&self) -> Option<PreservedSparseTrie> {
self.0.lock().take()
}
/// Acquires a guard that blocks `take()` until dropped.
/// Use this before sending the state root result to ensure the next block
/// waits for the trie to be stored.
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
PreservedTrieGuard(self.0.lock())
}
}
/// Guard that holds the lock on the preserved trie.
/// While held, `take()` will block. Call `store()` to save the trie before dropping.
pub(super) struct PreservedTrieGuard<'a>(parking_lot::MutexGuard<'a, Option<PreservedSparseTrie>>);
impl PreservedTrieGuard<'_> {
/// Stores a preserved trie for later reuse.
pub(super) fn store(&mut self, trie: PreservedSparseTrie) {
self.0.replace(trie);
}
}
/// A preserved sparse trie that can be reused across payload validations.
///
/// The trie exists in one of two states:
/// - **Anchored**: Has a computed state root and can be reused for payloads whose parent state root
/// matches the anchor.
/// - **Cleared**: Trie data has been cleared but allocations are preserved for reuse.
#[derive(Debug)]
pub(super) enum PreservedSparseTrie {
/// Trie with a computed state root that can be reused for continuation payloads.
Anchored {
/// The sparse state trie (pruned after root computation).
trie: SparseTrie,
/// The state root this trie represents (computed from the previous block).
/// Used to verify continuity: new payload's `parent_state_root` must match this.
state_root: B256,
},
/// Cleared trie with preserved allocations, ready for fresh use.
Cleared {
/// The sparse state trie with cleared data but preserved allocations.
trie: SparseTrie,
},
}
impl PreservedSparseTrie {
/// Creates a new anchored preserved trie.
///
/// The `state_root` is the computed state root from the trie, which becomes the
/// anchor for determining if subsequent payloads can reuse this trie.
pub(super) const fn anchored(trie: SparseTrie, state_root: B256) -> Self {
Self::Anchored { trie, state_root }
}
/// Creates a cleared preserved trie (allocations preserved, data cleared).
pub(super) const fn cleared(trie: SparseTrie) -> Self {
Self::Cleared { trie }
}
/// Consumes self and returns the trie for reuse.
///
/// If the preserved trie is anchored and the parent state root matches, the pruned
/// trie structure is reused directly. Otherwise, the trie is cleared but allocations
/// are preserved to reduce memory overhead.
pub(super) fn into_trie_for(self, parent_state_root: B256) -> SparseTrie {
match self {
Self::Anchored { trie, state_root } if state_root == parent_state_root => {
debug!(
target: "engine::tree::payload_processor",
%state_root,
"Reusing anchored sparse trie for continuation payload"
);
trie
}
Self::Anchored { mut trie, state_root } => {
debug!(
target: "engine::tree::payload_processor",
anchor_root = %state_root,
%parent_state_root,
"Clearing anchored sparse trie - parent state root mismatch"
);
trie.clear();
trie
}
Self::Cleared { trie } => {
debug!(
target: "engine::tree::payload_processor",
%parent_state_root,
"Using cleared sparse trie with preserved allocations"
);
trie
}
}
}
}
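For illustration, the intended handshake around these types (a sketch assuming the surrounding task code; `shared_preserved_trie`, `parent_state_root`, and `computed_state_root` are placeholders):

    // Reuse the preserved trie when the new payload extends the block it was anchored at.
    let trie = shared_preserved_trie
        .take()
        .map(|preserved| preserved.into_trie_for(parent_state_root))
        .unwrap_or_else(|| SparseStateTrie::new().with_updates(true));
    // ... compute the state root using `trie` ...
    // Hold the guard so the next payload's take() waits until the trie is stored back.
    let mut guard = shared_preserved_trie.lock();
    guard.store(PreservedSparseTrie::anchored(trie, computed_state_root));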

View File

@@ -36,6 +36,64 @@ use std::{
};
use tracing::{debug, debug_span, instrument, trace};
#[expect(clippy::large_enum_variant)]
pub(super) enum SpawnedSparseTrieTask<BPF, A, S>
where
BPF: TrieNodeProviderFactory + Send + Sync,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
{
Cleared(SparseTrieTask<BPF, A, S>),
Cached(SparseTrieCacheTask<A, S>),
}
impl<BPF, A, S> SpawnedSparseTrieTask<BPF, A, S>
where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
{
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
match self {
Self::Cleared(task) => task.run(),
Self::Cached(task) => task.run(),
}
}
pub(super) fn into_trie_for_reuse(
self,
prune_depth: usize,
max_storage_tries: usize,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> SparseStateTrie<A, S> {
match self {
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
Self::Cached(task) => task.into_trie_for_reuse(
prune_depth,
max_storage_tries,
max_nodes_capacity,
max_values_capacity,
),
}
}
pub(super) fn into_cleared_trie(
self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> SparseStateTrie<A, S> {
match self {
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
Self::Cached(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
}
}
}
/// A task responsible for populating the sparse trie.
pub(super) struct SparseTrieTask<BPF, A = SerialSparseTrie, S = SerialSparseTrie>
where
@@ -57,46 +115,29 @@ where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
{
/// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`].
pub(super) fn new_with_cleared_trie(
/// Creates a new sparse trie task with the given trie.
pub(super) const fn new(
updates: mpsc::Receiver<SparseTrieUpdate>,
blinded_provider_factory: BPF,
metrics: MultiProofTaskMetrics,
sparse_state_trie: ClearedSparseStateTrie<A, S>,
trie: SparseStateTrie<A, S>,
) -> Self {
Self { updates, metrics, trie: sparse_state_trie.into_inner(), blinded_provider_factory }
Self { updates, metrics, trie, blinded_provider_factory }
}
/// Runs the sparse trie task to completion.
/// Runs the sparse trie task to completion, computing the state root.
///
/// This waits for new incoming [`SparseTrieUpdate`].
///
/// This concludes once the last trie update has been received.
///
/// # Returns
///
/// - State root computation outcome.
/// - `SparseStateTrie` that needs to be cleared and reused to avoid reallocations.
/// Receives [`SparseTrieUpdate`]s until the channel is closed, applying each update
/// to the trie. Once all updates are processed, computes and returns the final state root.
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
pub(super) fn run(
mut self,
) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseStateTrie<A, S>) {
// run the main loop to completion
let result = self.run_inner();
(result, self.trie)
}
/// Inner function to run the sparse trie task to completion.
///
/// See [`Self::run`] for more information.
fn run_inner(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
let now = Instant::now();
let mut num_iterations = 0;
@@ -146,6 +187,20 @@ where
Ok(StateRootComputeOutcome { state_root, trie_updates })
}
/// Clears and shrinks the trie, discarding all state.
///
/// Use this when the payload was invalid or cancelled - we don't want to preserve
/// potentially invalid trie state, but we keep the allocations for reuse.
pub(super) fn into_cleared_trie(
mut self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> SparseStateTrie<A, S> {
self.trie.clear();
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
self.trie
}
}
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
@@ -216,34 +271,47 @@ where
}
}
/// Prunes and shrinks the trie for reuse in the next payload built on top of this one.
///
/// Should be called after the state root result has been sent.
pub(super) fn into_trie_for_reuse(
mut self,
prune_depth: usize,
max_storage_tries: usize,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> SparseStateTrie<A, S> {
self.trie.prune(prune_depth, max_storage_tries);
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
self.trie
}
/// Clears and shrinks the trie, discarding all state.
///
/// Use this when the payload was invalid or cancelled - we don't want to preserve
/// potentially invalid trie state, but we keep the allocations for reuse.
pub(super) fn into_cleared_trie(
mut self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> SparseStateTrie<A, S> {
self.trie.clear();
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
self.trie
}
/// Runs the sparse trie task to completion.
///
/// This waits for new incoming [`MultiProofMessage`]s, applies updates to the trie and
/// schedules proof fetching when needed.
///
/// This concludes once the last state update has been received and processed.
///
/// # Returns
///
/// - State root computation outcome.
/// - `SparseStateTrie` that needs to be cleared and reused to avoid reallocations.
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
pub(super) fn run(
mut self,
) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseStateTrie<A, S>) {
// run the main loop to completion
let result = self.run_inner();
(result, self.trie)
}
/// Inner function to run the sparse trie task to completion.
///
/// See [`Self::run`] for more information.
fn run_inner(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
let now = Instant::now();
let mut finished_state_updates = false;
@@ -475,7 +543,7 @@ where
let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
let (account, storage_root) = if let Some(account) = account.take() {
// If account is Some(_) here it means it didn't have any storage updates
// and we can fetch the storage root directly from the account trie.
//
// If it did have storage updates, we would've had processed it above when iterating over storage tries.

View File

@@ -402,7 +402,12 @@ where
.in_scope(|| self.evm_env_for(&input))
.map_err(NewPayloadError::other)?;
let env = ExecutionEnv { evm_env, hash: input.hash(), parent_hash: input.parent_hash() };
let env = ExecutionEnv {
evm_env,
hash: input.hash(),
parent_hash: input.parent_hash(),
parent_state_root: parent_block.state_root(),
};
// Plan the strategy used for state root computation.
let strategy = self.plan_state_root_computation();

View File

@@ -1,4 +1,4 @@
use crate::utils::eth_payload_attributes;
use crate::utils::{advance_with_random_transactions, eth_payload_attributes};
use alloy_eips::eip7685::RequestsOrHash;
use alloy_genesis::Genesis;
use alloy_primitives::{Address, B256};
@@ -6,8 +6,9 @@ use alloy_rpc_types_engine::{PayloadAttributes, PayloadStatusEnum};
use jsonrpsee_core::client::ClientT;
use reth_chainspec::{ChainSpecBuilder, EthChainSpec, MAINNET};
use reth_e2e_test_utils::{
node::NodeTestContext, setup, transaction::TransactionTestContext, wallet::Wallet,
node::NodeTestContext, setup, setup_engine, transaction::TransactionTestContext, wallet::Wallet,
};
use reth_node_api::TreeConfig;
use reth_node_builder::{NodeBuilder, NodeHandle};
use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
use reth_node_ethereum::EthereumNode;
@@ -256,3 +257,56 @@ async fn test_testing_build_block_v1_osaka() -> eyre::Result<()> {
Ok(())
}
/// Tests that sparse trie allocation reuse works correctly across consecutive blocks.
///
/// This test exercises the sparse trie allocation reuse path by:
/// 1. Starting a node with parallel state root computation enabled
/// 2. Advancing multiple consecutive blocks with random transactions
/// 3. Verifying that all blocks are successfully validated (state roots match)
///
/// Note: Trie structure reuse is currently disabled due to pruning creating blinded
/// nodes. The preserved trie's allocations are still reused to reduce memory overhead,
/// but the trie is cleared between blocks.
#[tokio::test]
async fn test_sparse_trie_reuse_across_blocks() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
// Use parallel state root (non-legacy) with pruning enabled
let tree_config = TreeConfig::default()
.with_legacy_state_root(false)
.with_sparse_trie_prune_depth(2)
.with_sparse_trie_max_storage_tries(100);
let (mut nodes, _tasks, _wallet) = setup_engine::<EthereumNode>(
1,
Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(serde_json::from_str(include_str!("../assets/genesis.json")).unwrap())
.cancun_activated()
.build(),
),
false,
tree_config,
eth_payload_attributes,
)
.await?;
let mut node = nodes.pop().unwrap();
// Use a seeded RNG for reproducibility
let mut rng = rand::rng();
// Advance multiple consecutive blocks with random transactions.
// This exercises the sparse trie reuse path where each block's pruned trie
// is reused for the next block's state root computation.
let num_blocks = 5;
advance_with_random_transactions(&mut node, num_blocks, &mut rng, true).await?;
// Verify the chain advanced correctly
let best_block = node.inner.provider.best_block_number()?;
assert_eq!(best_block, num_blocks as u64, "Expected {} blocks, got {}", num_blocks, best_block);
Ok(())
}

View File

@@ -39,7 +39,7 @@ pub struct StorageSettings {
impl StorageSettings {
/// Returns the default base `StorageSettings` for this build.
///
/// When the `edge` feature is enabled, returns [`Self::edge()`].
/// When the `edge` feature is enabled, returns `Self::edge()`.
/// Otherwise, returns [`Self::legacy()`].
pub const fn base() -> Self {
#[cfg(feature = "edge")]

View File

@@ -123,6 +123,9 @@ pub struct ParallelSparseTrie {
update_actions_buffers: Vec<Vec<SparseTrieUpdatesAction>>,
/// Thresholds controlling when parallelism is enabled for different operations.
parallelism_thresholds: ParallelismThresholds,
/// Tracks heat of lower subtries for smart pruning decisions.
/// Hot subtries are skipped during pruning to keep frequently-used data revealed.
subtrie_heat: SubtrieModifications,
/// Metrics for the parallel sparse trie.
#[cfg(feature = "metrics")]
metrics: crate::metrics::ParallelSparseTrieMetrics,
@@ -141,6 +144,7 @@ impl Default for ParallelSparseTrie {
branch_node_masks: BranchNodeMasksMap::default(),
update_actions_buffers: Vec::default(),
parallelism_thresholds: Default::default(),
subtrie_heat: SubtrieModifications::default(),
#[cfg(feature = "metrics")]
metrics: Default::default(),
}
@@ -909,7 +913,17 @@ impl SparseTrie for ParallelSparseTrie {
}
fn take_updates(&mut self) -> SparseTrieUpdates {
self.updates.take().unwrap_or_default()
match self.updates.take() {
Some(updates) => {
// NOTE: we need to preserve Some case
self.updates = Some(SparseTrieUpdates::with_capacity(
updates.updated_nodes.len(),
updates.removed_nodes.len(),
));
updates
}
None => SparseTrieUpdates::default(),
}
}
fn wipe(&mut self) {
@@ -917,6 +931,7 @@ impl SparseTrie for ParallelSparseTrie {
self.lower_subtries = [const { LowerSparseSubtrie::Blind(None) }; NUM_LOWER_SUBTRIES];
self.prefix_set = PrefixSetMut::all();
self.updates = self.updates.is_some().then(SparseTrieUpdates::wiped);
self.subtrie_heat.clear();
}
fn clear(&mut self) {
@@ -928,6 +943,7 @@ impl SparseTrie for ParallelSparseTrie {
self.prefix_set.clear();
self.updates = None;
self.branch_node_masks.clear();
self.subtrie_heat.clear();
// `update_actions_buffers` doesn't need to be cleared; we want to reuse the Vecs it has
// buffered, and all of those are already inherently cleared when they get used.
}
@@ -1032,21 +1048,22 @@ impl SparseTrie for ParallelSparseTrie {
}
impl SparseTrieExt for ParallelSparseTrie {
/// Returns the count of revealed (non-hash) nodes across all subtries.
fn revealed_node_count(&self) -> usize {
let upper_count = self.upper_subtrie.nodes.values().filter(|n| !n.is_hash()).count();
/// O(1) size hint based on total node count (including hash stubs).
fn size_hint(&self) -> usize {
let upper_count = self.upper_subtrie.nodes.len();
let lower_count: usize = self
.lower_subtries
.iter()
.filter_map(|s| s.as_revealed_ref())
.map(|s| s.nodes.values().filter(|n| !n.is_hash()).count())
.map(|s| s.nodes.len())
.sum();
upper_count + lower_count
}
fn prune(&mut self, max_depth: usize) -> usize {
// Decay heat for subtries not modified this cycle
self.subtrie_heat.decay_and_reset();
// DFS traversal to find nodes at max_depth that can be pruned.
// Collects "effective pruned roots" - children of nodes at max_depth with computed hashes.
// We replace nodes with Hash stubs inline during traversal.
@@ -1056,6 +1073,16 @@ impl SparseTrieExt for ParallelSparseTrie {
// DFS traversal: pop path and depth, skip if subtrie or node not found.
while let Some((path, depth)) = stack.pop() {
// Skip traversal into hot lower subtries beyond max_depth.
// At max_depth, we still need to process the node to convert children to hashes.
// This keeps frequently-modified subtries revealed to avoid expensive re-reveals.
if depth > max_depth &&
let SparseSubtrieType::Lower(idx) = SparseSubtrieType::from_path(&path) &&
self.subtrie_heat.is_hot(idx)
{
continue;
}
// Get children to visit from current node (immutable access)
let children: SmallVec<[Nibbles; 16]> = {
let Some(subtrie) = self.subtrie_for_path(&path) else { continue };
@@ -1096,10 +1123,11 @@ impl SparseTrieExt for ParallelSparseTrie {
.and_then(|n| n.hash());
if let Some(hash) = hash {
self.subtrie_for_path_mut(&child)
.nodes
.insert(child, SparseNode::Hash(hash));
effective_pruned_roots.push((child, hash));
// Use untracked access to avoid marking subtrie as modified during pruning
if let Some(subtrie) = self.subtrie_for_path_mut_untracked(&child) {
subtrie.nodes.insert(child, SparseNode::Hash(hash));
effective_pruned_roots.push((child, hash));
}
}
} else {
stack.push((child, depth + 1));
@@ -1381,6 +1409,7 @@ impl ParallelSparseTrie {
SparseSubtrieType::Upper => None,
SparseSubtrieType::Lower(idx) => {
self.lower_subtries[idx].reveal(path);
self.subtrie_heat.mark_modified(idx);
Some(self.lower_subtries[idx].as_revealed_mut().expect("just revealed"))
}
}
@@ -1416,6 +1445,19 @@ impl ParallelSparseTrie {
}
}
/// Returns a mutable reference to a subtrie without marking it as modified.
/// Used for internal operations like pruning that shouldn't affect heat tracking.
fn subtrie_for_path_mut_untracked(&mut self, path: &Nibbles) -> Option<&mut SparseSubtrie> {
if SparseSubtrieType::path_len_is_upper(path.len()) {
Some(&mut self.upper_subtrie)
} else {
match SparseSubtrieType::from_path(path) {
SparseSubtrieType::Upper => None,
SparseSubtrieType::Lower(idx) => self.lower_subtries[idx].as_revealed_mut(),
}
}
}
/// Returns the next node in the traversal path from the given path towards the leaf for the
/// given full leaf path, or an error if any node along the traversal path is not revealed.
///
@@ -2052,10 +2094,93 @@ impl ParallelSparseTrie {
}
self.lower_subtries[index] = LowerSparseSubtrie::Revealed(subtrie);
self.subtrie_heat.mark_modified(index);
}
}
}
/// Bitset tracking which of the 256 lower subtries were modified in the current cycle.
#[derive(Clone, Default, PartialEq, Eq, Debug)]
struct ModifiedSubtries([u64; 4]);
impl ModifiedSubtries {
/// Marks a subtrie index as modified.
#[inline]
fn set(&mut self, idx: usize) {
debug_assert!(idx < NUM_LOWER_SUBTRIES);
self.0[idx >> 6] |= 1 << (idx & 63);
}
/// Returns whether a subtrie index is marked as modified.
#[inline]
fn get(&self, idx: usize) -> bool {
debug_assert!(idx < NUM_LOWER_SUBTRIES);
(self.0[idx >> 6] & (1 << (idx & 63))) != 0
}
/// Clears all modification flags.
#[inline]
const fn clear(&mut self) {
self.0 = [0; 4];
}
}
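A small sketch of the bit addressing (illustrative, not part of this diff): index 130 lands in word 130 >> 6 = 2 at bit 130 & 63 = 2.

    let mut modified = ModifiedSubtries::default();
    modified.set(130);
    assert!(modified.get(130));
    assert!(!modified.get(129));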
/// Tracks heat (modification frequency) for each of the 256 lower subtries.
///
/// Heat is used to avoid pruning frequently-modified subtries, which would cause
/// expensive re-reveal operations on subsequent updates.
///
/// - Heat is incremented by 1 when a subtrie is modified
/// - Heat decays by 1 each prune cycle for subtries not modified that cycle
/// - Subtries with heat > 0 are considered "hot" and skipped during pruning
#[derive(Clone, PartialEq, Eq, Debug)]
struct SubtrieModifications {
/// Heat level (0-255) for each of the 256 lower subtries.
heat: [u8; NUM_LOWER_SUBTRIES],
/// Tracks which subtries were modified in the current cycle.
modified: ModifiedSubtries,
}
impl Default for SubtrieModifications {
fn default() -> Self {
Self { heat: [0; NUM_LOWER_SUBTRIES], modified: ModifiedSubtries::default() }
}
}
impl SubtrieModifications {
/// Marks a subtrie as modified, incrementing its heat by 1.
#[inline]
fn mark_modified(&mut self, idx: usize) {
debug_assert!(idx < NUM_LOWER_SUBTRIES);
self.modified.set(idx);
self.heat[idx] = self.heat[idx].saturating_add(1);
}
/// Returns whether a subtrie is currently hot (heat > 0).
#[inline]
fn is_hot(&self, idx: usize) -> bool {
debug_assert!(idx < NUM_LOWER_SUBTRIES);
self.heat[idx] > 0
}
/// Decays heat for subtries not modified this cycle and resets modification tracking.
/// Called at the start of each prune cycle.
fn decay_and_reset(&mut self) {
for (idx, heat) in self.heat.iter_mut().enumerate() {
if !self.modified.get(idx) {
*heat = heat.saturating_sub(1);
}
}
self.modified.clear();
}
/// Clears all heat tracking state.
const fn clear(&mut self) {
self.heat = [0; NUM_LOWER_SUBTRIES];
self.modified.clear();
}
}
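Heat lifecycle sketch (illustrative): a single modification protects a subtrie for exactly one prune cycle.

    let mut heat = SubtrieModifications::default();
    heat.mark_modified(7);    // heat[7] = 1, marked as modified this cycle
    heat.decay_and_reset();   // modified this cycle -> no decay, heat stays at 1
    assert!(heat.is_hot(7));  // prune() skips this subtrie
    heat.decay_and_reset();   // not modified this cycle -> heat decays to 0
    assert!(!heat.is_hot(7)); // now eligible for pruning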
/// This is a subtrie of the [`ParallelSparseTrie`] that contains a map from path to sparse trie
/// nodes.
#[derive(Clone, PartialEq, Eq, Debug, Default)]
@@ -7735,7 +7860,11 @@ mod tests {
#[test]
fn test_prune_at_various_depths() {
for max_depth in [0, 1, 2] {
// Test depths 0 and 1, which are in the Upper subtrie (no heat tracking).
// Depth 2 is the boundary where Lower subtries start (UPPER_TRIE_MAX_DEPTH=2),
// and with `depth >= max_depth` heat check, hot Lower subtries at depth 2
// are protected from pruning traversal.
for max_depth in [0, 1] {
let provider = DefaultTrieNodeProvider;
let mut trie = ParallelSparseTrie::default();
@@ -7755,21 +7884,27 @@ mod tests {
}
let root_before = trie.root();
let nodes_before = trie.revealed_node_count();
let nodes_before = trie.size_hint();
trie.prune(max_depth);
// Prune multiple times to allow heat to fully decay.
// Heat starts at 1 and decays by 1 each cycle for unmodified subtries,
// so we need 2 prune cycles: 1→0, then actual prune.
for _ in 0..2 {
trie.prune(max_depth);
}
let root_after = trie.root();
assert_eq!(root_before, root_after, "root hash should be preserved after prune");
let nodes_after = trie.revealed_node_count();
let nodes_after = trie.size_hint();
assert!(
nodes_after < nodes_before,
"node count should decrease after prune at depth {max_depth}"
);
if max_depth == 0 {
assert_eq!(nodes_after, 1, "only root should be revealed after prune(0)");
// Root + 4 hash stubs for children at [0], [1], [2], [3]
assert_eq!(nodes_after, 5, "root + 4 hash stubs after prune(0)");
}
}
}
@@ -7815,13 +7950,13 @@ mod tests {
trie.update_leaf(Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]), value, &provider).unwrap();
let root_before = trie.root();
let nodes_before = trie.revealed_node_count();
let nodes_before = trie.size_hint();
trie.prune(0);
let root_after = trie.root();
assert_eq!(root_before, root_after, "root hash should be preserved");
assert_eq!(trie.revealed_node_count(), nodes_before, "single leaf trie should not change");
assert_eq!(trie.size_hint(), nodes_before, "single leaf trie should not change");
}
#[test]
@@ -7837,11 +7972,11 @@ mod tests {
}
trie.root();
let nodes_before = trie.revealed_node_count();
let nodes_before = trie.size_hint();
trie.prune(100);
assert_eq!(nodes_before, trie.revealed_node_count(), "deep prune should have no effect");
assert_eq!(nodes_before, trie.size_hint(), "deep prune should have no effect");
}
#[test]
@@ -7857,10 +7992,16 @@ mod tests {
.unwrap();
let root_before = trie.root();
trie.prune(1);
// Prune multiple times to allow heat to fully decay.
// Heat starts at 1 and decays by 1 each cycle for unmodified subtries,
// so we need 2 prune cycles: 1→0, then actual prune.
for _ in 0..2 {
trie.prune(1);
}
assert_eq!(root_before, trie.root(), "root hash should be preserved");
assert_eq!(trie.revealed_node_count(), 2, "should have root + extension after prune(1)");
// Root + extension + 2 hash stubs (for the two leaves' parent branches)
assert_eq!(trie.size_hint(), 4, "root + extension + hash stubs after prune(1)");
}
#[test]
@@ -7873,13 +8014,13 @@ mod tests {
trie.update_leaf(Nibbles::from_nibbles([0x1]), small_value, &provider).unwrap();
let root_before = trie.root();
let nodes_before = trie.revealed_node_count();
let nodes_before = trie.size_hint();
trie.prune(0);
assert_eq!(root_before, trie.root(), "root hash must be preserved");
if trie.revealed_node_count() == nodes_before {
if trie.size_hint() == nodes_before {
assert!(trie.get_leaf_value(&Nibbles::from_nibbles([0x0])).is_some());
assert!(trie.get_leaf_value(&Nibbles::from_nibbles([0x1])).is_some());
}
@@ -7923,9 +8064,15 @@ mod tests {
}
let root_before = trie.root();
let pruned = trie.prune(1);
assert!(pruned > 0, "should have pruned some nodes");
// Prune multiple times to allow heat to fully decay.
// Heat starts at 1 and decays by 1 each cycle for unmodified subtries.
let mut total_pruned = 0;
for _ in 0..2 {
total_pruned += trie.prune(1);
}
assert!(total_pruned > 0, "should have pruned some nodes");
assert_eq!(root_before, trie.root(), "root hash should be preserved");
for key in &keys {
@@ -7947,14 +8094,14 @@ mod tests {
}
trie.root();
let nodes_before = trie.revealed_node_count();
let nodes_before = trie.size_hint();
// If depth were truncated to u8, 300 would become 44 and might prune something
trie.prune(300);
assert_eq!(
nodes_before,
trie.revealed_node_count(),
trie.size_hint(),
"prune(300) should have no effect on a shallow trie"
);
}

View File

@@ -5,12 +5,6 @@
extern crate alloc;
/// Default depth to prune sparse tries to for cross-payload caching.
pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
/// Default number of storage tries to preserve across payload validations.
pub const DEFAULT_MAX_PRESERVED_STORAGE_TRIES: usize = 100;
mod state;
pub use state::*;

View File

@@ -19,6 +19,8 @@ use reth_trie_common::{
Nibbles, ProofTrieNode, RlpNode, StorageMultiProof, TrieAccount, TrieNode, EMPTY_ROOT_HASH,
TRIE_ACCOUNT_RLP_MAX_SIZE,
};
#[cfg(feature = "std")]
use tracing::debug;
use tracing::{instrument, trace};
/// Provides type-safe re-use of cleared [`SparseStateTrie`]s, which helps to save allocations
@@ -44,38 +46,25 @@ where
Self(trie)
}
/// Shrink the cleared sparse trie's capacity to the given node and value size.
/// This helps reduce memory usage when the trie has excess capacity.
/// The capacity is distributed equally across the account trie and all storage tries.
pub fn shrink_to(&mut self, node_size: usize, value_size: usize) {
// Count total number of storage tries (active + cleared + default)
let storage_tries_count = self.0.storage.tries.len() + self.0.storage.cleared_tries.len();
// Total tries = 1 account trie + all storage tries
let total_tries = 1 + storage_tries_count;
// Distribute capacity equally among all tries
let node_size_per_trie = node_size / total_tries;
let value_size_per_trie = value_size / total_tries;
// Shrink the account trie
self.0.state.shrink_nodes_to(node_size_per_trie);
self.0.state.shrink_values_to(value_size_per_trie);
// Give storage tries the remaining capacity after account trie allocation
let storage_node_size = node_size.saturating_sub(node_size_per_trie);
let storage_value_size = value_size.saturating_sub(value_size_per_trie);
// Shrink all storage tries (they will redistribute internally)
self.0.storage.shrink_to(storage_node_size, storage_value_size);
}
/// Returns the cleared [`SparseStateTrie`], consuming this instance.
pub fn into_inner(self) -> SparseStateTrie<A, S> {
self.0
}
}
impl<A, S> ClearedSparseStateTrie<A, S>
where
A: SparseTrieTrait + SparseTrieExt + Default,
S: SparseTrieTrait + SparseTrieExt + Default + Clone,
{
/// Shrink the cleared sparse trie's capacity to the given node and value size.
///
/// Delegates to the inner `SparseStateTrie::shrink_to`.
pub fn shrink_to(&mut self, max_nodes: usize, max_values: usize) {
self.0.shrink_to(max_nodes, max_values);
}
}
#[derive(Debug)]
/// Sparse state trie representing lazy-loaded Ethereum state trie.
pub struct SparseStateTrie<
@@ -271,6 +260,8 @@ where
{
for (account, storage_subtree) in storages {
self.reveal_decoded_storage_multiproof(account, storage_subtree)?;
// Mark this storage trie as hot (accessed this tick)
self.storage.modifications.mark_accessed(account);
}
Ok(())
@@ -313,6 +304,8 @@ where
for (account, revealed_nodes, trie, result) in results {
self.storage.revealed_paths.insert(account, revealed_nodes);
self.storage.tries.insert(account, trie);
// Mark this storage trie as hot (accessed this tick)
self.storage.modifications.mark_accessed(account);
if let Ok(_metric_values) = result {
#[cfg(feature = "metrics")]
{
@@ -353,6 +346,8 @@ where
{
for (account, storage_proofs) in multiproof.storage_proofs {
self.reveal_storage_v2_proof_nodes(account, storage_proofs)?;
// Mark this storage trie as hot (accessed this tick)
self.storage.modifications.mark_accessed(account);
}
Ok(())
@@ -393,6 +388,8 @@ where
for (account, result, revealed_nodes, trie) in results {
self.storage.revealed_paths.insert(account, revealed_nodes);
self.storage.tries.insert(account, trie);
// Mark this storage trie as hot (accessed this tick)
self.storage.modifications.mark_accessed(account);
if let Ok(_metric_values) = result {
#[cfg(feature = "metrics")]
{
@@ -993,23 +990,42 @@ where
A: SparseTrieTrait + SparseTrieExt + Default,
S: SparseTrieTrait + SparseTrieExt + Default + Clone,
{
/// Minimum number of storage tries before parallel pruning is enabled.
#[cfg(feature = "std")]
const PARALLEL_PRUNE_THRESHOLD: usize = 16;
/// Clears all trie data while preserving allocations for reuse.
///
/// This resets the trie to an empty state but keeps the underlying memory allocations,
/// which can significantly reduce allocation overhead when the trie is reused.
pub fn clear(&mut self) {
self.state = core::mem::take(&mut self.state).clear();
self.revealed_account_paths.clear();
self.storage.clear();
self.account_rlp_buf.clear();
}
/// Returns true if parallelism should be enabled for pruning the given number of tries.
/// Will always return false in `no_std` builds.
const fn is_prune_parallelism_enabled(num_tries: usize) -> bool {
#[cfg(not(feature = "std"))]
{
let _ = num_tries;
return false;
}
#[cfg(feature = "std")]
{
num_tries >= Self::PARALLEL_PRUNE_THRESHOLD
}
/// Shrinks the capacity of the sparse trie to the given node and value sizes.
///
/// This helps reduce memory usage when the trie has excess capacity.
/// Distributes capacity equally among all tries (account + storage).
pub fn shrink_to(&mut self, max_nodes: usize, max_values: usize) {
// Count total number of storage tries (active + cleared)
let storage_tries_count = self.storage.tries.len() + self.storage.cleared_tries.len();
// Total tries = 1 account trie + all storage tries
let total_tries = 1 + storage_tries_count;
// Distribute capacity equally among all tries
let nodes_per_trie = max_nodes / total_tries;
let values_per_trie = max_values / total_tries;
// Shrink the account trie
self.state.shrink_nodes_to(nodes_per_trie);
self.state.shrink_values_to(values_per_trie);
// Give storage tries the remaining capacity after account trie allocation
let storage_nodes = max_nodes.saturating_sub(nodes_per_trie);
let storage_values = max_values.saturating_sub(values_per_trie);
// Shrink all storage tries (they will redistribute internally)
self.storage.shrink_to(storage_nodes, storage_values);
}
/// Prunes the account trie and selected storage tries to reduce memory usage.
@@ -1025,84 +1041,21 @@ where
/// # Effects
///
/// - Clears `revealed_account_paths` and `revealed_paths` for all storage tries
#[cfg(feature = "std")]
#[instrument(target = "trie::sparse", skip_all, fields(max_depth, max_storage_tries))]
pub fn prune(&mut self, max_depth: usize, max_storage_tries: usize) {
if let Some(trie) = self.state.as_revealed_mut() {
trie.prune(max_depth);
}
self.revealed_account_paths.clear();
let mut storage_trie_counts: Vec<(B256, usize)> = self
.storage
.tries
.iter()
.map(|(hash, trie)| {
let count = match trie {
RevealableSparseTrie::Revealed(t) => t.revealed_node_count(),
RevealableSparseTrie::Blind(_) => 0,
};
(*hash, count)
})
.collect();
// Use O(n) selection instead of O(n log n) sort
let tries_to_keep: HashSet<B256> = if storage_trie_counts.len() <= max_storage_tries {
storage_trie_counts.iter().map(|(hash, _)| *hash).collect()
} else {
storage_trie_counts
.select_nth_unstable_by(max_storage_tries.saturating_sub(1), |a, b| b.1.cmp(&a.1));
storage_trie_counts[..max_storage_tries].iter().map(|(hash, _)| *hash).collect()
};
// Collect keys to avoid borrow conflict
let tries_to_clear: Vec<B256> = self
.storage
.tries
.keys()
.filter(|hash| !tries_to_keep.contains(*hash))
.copied()
.collect();
// Evict storage tries that exceeded limit, saving cleared allocations for reuse
for hash in tries_to_clear {
if let Some(trie) = self.storage.tries.remove(&hash) {
self.storage.cleared_tries.push(trie.clear());
}
if let Some(mut paths) = self.storage.revealed_paths.remove(&hash) {
paths.clear();
self.storage.cleared_revealed_paths.push(paths);
}
}
// Prune storage tries that are kept
if Self::is_prune_parallelism_enabled(tries_to_keep.len()) {
#[cfg(feature = "std")]
{
use rayon::prelude::*;
self.storage.tries.par_iter_mut().for_each(|(hash, trie)| {
if tries_to_keep.contains(hash) &&
let Some(t) = trie.as_revealed_mut()
{
t.prune(max_depth);
}
});
}
} else {
for hash in &tries_to_keep {
if let Some(trie) =
self.storage.tries.get_mut(hash).and_then(|t| t.as_revealed_mut())
{
// Prune state and storage tries in parallel
rayon::join(
|| {
if let Some(trie) = self.state.as_revealed_mut() {
trie.prune(max_depth);
}
}
}
// Clear revealed_paths for kept tries
for hash in &tries_to_keep {
if let Some(paths) = self.storage.revealed_paths.get_mut(hash) {
paths.clear();
}
}
self.revealed_account_paths.clear();
},
|| {
self.storage.prune(max_depth, max_storage_tries);
},
);
}
}
@@ -1121,6 +1074,119 @@ struct StorageTries<S = SerialSparseTrie> {
cleared_revealed_paths: Vec<HashSet<Nibbles>>,
/// A default cleared trie instance, which will be cloned when creating new tries.
default_trie: RevealableSparseTrie<S>,
/// Tracks access patterns and modification state of storage tries for smart pruning decisions.
modifications: StorageTrieModifications,
}
#[cfg(feature = "std")]
impl<S: SparseTrieTrait + SparseTrieExt> StorageTries<S> {
/// Prunes and evicts storage tries.
///
/// Keeps the top `max_storage_tries` by a score combining size and heat.
/// Evicts lower-scored tries entirely, prunes kept tries to `max_depth`.
fn prune(&mut self, max_depth: usize, max_storage_tries: usize) {
let fn_start = std::time::Instant::now();
let mut stats =
StorageTriesPruneStats { total_tries_before: self.tries.len(), ..Default::default() };
// Update heat for accessed tries
self.modifications.update_and_reset();
// Collect (address, size, score) for all tries
// Score = size * heat_multiplier
// Hot tries (high heat) get boosted weight
let mut trie_info: Vec<(B256, usize, usize)> = self
.tries
.iter()
.map(|(address, trie)| {
let size = match trie {
RevealableSparseTrie::Blind(_) => return (*address, 0, 0),
RevealableSparseTrie::Revealed(t) => t.size_hint(),
};
let heat = self.modifications.heat(address);
// Heat multiplier: 1 (cold) to 3 (very hot, heat >= 4)
let heat_multiplier = 1 + (heat.min(4) / 2) as usize;
(*address, size, size * heat_multiplier)
})
.collect();
// Use O(n) selection to find top max_storage_tries by score
if trie_info.len() > max_storage_tries {
trie_info
.select_nth_unstable_by(max_storage_tries.saturating_sub(1), |a, b| b.2.cmp(&a.2));
trie_info.truncate(max_storage_tries);
}
let tries_to_keep: B256Map<usize> =
trie_info.iter().map(|(address, size, _)| (*address, *size)).collect();
stats.tries_to_keep = tries_to_keep.len();
// Collect keys to evict
let tries_to_clear: Vec<B256> =
self.tries.keys().filter(|addr| !tries_to_keep.contains_key(*addr)).copied().collect();
stats.tries_to_evict = tries_to_clear.len();
// Evict storage tries that exceeded limit, saving cleared allocations for reuse
for address in &tries_to_clear {
if let Some(trie) = self.tries.remove(address) {
self.cleared_tries.push(trie.clear());
}
if let Some(mut paths) = self.revealed_paths.remove(address) {
paths.clear();
self.cleared_revealed_paths.push(paths);
}
self.modifications.remove(address);
}
// Prune storage tries that are kept, but only if:
// - They haven't been pruned since last access
// - They're large enough to be worth pruning
const MIN_SIZE_TO_PRUNE: usize = 1000;
let prune_start = std::time::Instant::now();
for (address, size) in &tries_to_keep {
if *size < MIN_SIZE_TO_PRUNE {
stats.skipped_small += 1;
continue; // Small tries aren't worth the DFS cost
}
let Some(heat_state) = self.modifications.get_mut(address) else {
continue; // No heat state = not tracked
};
// Only prune if backlog >= 2 (skip every other cycle)
if heat_state.prune_backlog < 2 {
stats.skipped_recently_pruned += 1;
continue; // Recently pruned, skip this cycle
}
if let Some(trie) = self.tries.get_mut(address).and_then(|t| t.as_revealed_mut()) {
trie.prune(max_depth);
heat_state.prune_backlog = 0; // Reset backlog after prune
stats.pruned_count += 1;
}
}
stats.prune_elapsed = prune_start.elapsed();
// Clear revealed_paths for kept tries
for hash in tries_to_keep.keys() {
if let Some(paths) = self.revealed_paths.get_mut(hash) {
paths.clear();
}
}
stats.total_tries_after = self.tries.len();
stats.total_elapsed = fn_start.elapsed();
debug!(
target: "trie::sparse",
before = stats.total_tries_before,
after = stats.total_tries_after,
kept = stats.tries_to_keep,
evicted = stats.tries_to_evict,
pruned = stats.pruned_count,
skipped_small = stats.skipped_small,
skipped_recent = stats.skipped_recently_pruned,
?stats.prune_elapsed,
?stats.total_elapsed,
"StorageTries::prune completed"
);
}
}
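Scoring sketch for the keep/evict decision (illustrative; the multiplier formula is the one used above):

    // heat_multiplier = 1 + (heat.min(4) / 2): heat 0-1 -> 1x, 2-3 -> 2x, 4+ -> 3x
    let score = |size: usize, heat: u8| size * (1 + (heat.min(4) / 2) as usize);
    assert_eq!(score(1_000, 0), 1_000); // cold trie scores its plain size
    assert_eq!(score(1_000, 3), 2_000); // warm trie is twice as likely to be kept
    assert_eq!(score(500, 4), 1_500);   // a small but very hot trie outranks a cold 1_000-node trie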
impl<S: SparseTrieTrait> StorageTries<S> {
@@ -1132,30 +1198,32 @@ impl<S: SparseTrieTrait> StorageTries<S> {
set.clear();
set
}));
self.modifications.clear();
}
/// Shrinks the capacity of all storage tries (active, cleared, and default) to the given sizes.
/// The capacity is distributed equally among all tries that have allocations.
fn shrink_to(&mut self, node_size: usize, value_size: usize) {
// Count total number of tries with capacity (active + cleared + default)
let active_count = self.tries.len();
let cleared_count = self.cleared_tries.len();
let total_tries = 1 + active_count + cleared_count;
/// Shrinks the capacity of all storage tries to the given total sizes.
///
/// Distributes capacity equally among all tries (active + cleared).
fn shrink_to(&mut self, max_nodes: usize, max_values: usize) {
let total_tries = self.tries.len() + self.cleared_tries.len();
if total_tries == 0 {
return;
}
// Distribute capacity equally among all tries
let node_size_per_trie = node_size / total_tries;
let value_size_per_trie = value_size / total_tries;
let nodes_per_trie = max_nodes / total_tries;
let values_per_trie = max_values / total_tries;
// Shrink active storage tries
for trie in self.tries.values_mut() {
trie.shrink_nodes_to(node_size_per_trie);
trie.shrink_values_to(value_size_per_trie);
trie.shrink_nodes_to(nodes_per_trie);
trie.shrink_values_to(values_per_trie);
}
// Shrink cleared storage tries
for trie in &mut self.cleared_tries {
trie.shrink_nodes_to(node_size_per_trie);
trie.shrink_values_to(value_size_per_trie);
trie.shrink_nodes_to(nodes_per_trie);
trie.shrink_values_to(values_per_trie);
}
}
}
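Worked example of the equal capacity split (illustrative numbers; mirrors the arithmetic in `SparseStateTrie::shrink_to` and `StorageTries::shrink_to` above):

    let (max_nodes, max_values, storage_tries_count) = (110_000usize, 11_000usize, 10usize);
    let total_tries = 1 + storage_tries_count;      // account trie + storage tries
    let nodes_per_trie = max_nodes / total_tries;   // 10_000 nodes for the account trie
    let values_per_trie = max_values / total_tries; // 1_000 values for the account trie
    let storage_nodes = max_nodes - nodes_per_trie; // 100_000 nodes shared by the storage tries
    assert_eq!((nodes_per_trie, values_per_trie, storage_nodes), (10_000, 1_000, 100_000));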
@@ -1213,6 +1281,96 @@ impl<S: SparseTrieTrait + Clone> StorageTries<S> {
}
}
/// Statistics from a storage tries prune operation.
#[derive(Debug, Default)]
#[allow(dead_code)]
struct StorageTriesPruneStats {
total_tries_before: usize,
total_tries_after: usize,
tries_to_keep: usize,
tries_to_evict: usize,
pruned_count: usize,
skipped_small: usize,
skipped_recently_pruned: usize,
prune_elapsed: core::time::Duration,
total_elapsed: core::time::Duration,
}
/// Per-trie access tracking and prune state.
///
/// Tracks how frequently a storage trie is accessed and when it was last pruned,
/// enabling smart pruning decisions that preserve frequently-used tries.
#[derive(Debug, Clone, Copy, Default)]
#[allow(dead_code)]
struct TrieModificationState {
/// Access frequency level (0-255). Incremented each cycle the trie is accessed.
/// Used for prioritizing which tries to keep during pruning.
heat: u8,
/// Prune backlog - accessed cycles since the last prune. Incremented each cycle the trie
/// is accessed, reset to 0 when pruned. Used to decide when pruning is needed.
prune_backlog: u8,
}
/// Tracks access patterns and modification state of storage tries for smart pruning decisions.
///
/// Access-based tracking is more accurate than simple generation counting because it tracks
/// actual access patterns rather than administrative operations (take/insert).
///
/// - Access frequency is incremented when a storage proof is revealed (accessed)
/// - Heat and prune backlog are updated at the start of each prune cycle for tries accessed since the previous cycle
/// - Tries with higher access frequency are prioritized for preservation during pruning
#[derive(Debug, Default)]
struct StorageTrieModifications {
/// Access frequency and prune state per storage trie address.
state: B256Map<TrieModificationState>,
/// Tracks which tries were accessed in the current cycle (between prune calls).
accessed_this_cycle: HashSet<B256>,
}
#[allow(dead_code)]
impl StorageTrieModifications {
/// Marks a storage trie as accessed this cycle.
/// Heat and `prune_backlog` are updated in [`Self::update_and_reset`].
#[inline]
fn mark_accessed(&mut self, address: B256) {
self.accessed_this_cycle.insert(address);
}
/// Returns mutable reference to the heat state for a storage trie.
#[inline]
fn get_mut(&mut self, address: &B256) -> Option<&mut TrieModificationState> {
self.state.get_mut(address)
}
/// Returns the heat level for a storage trie (0 if not tracked).
#[inline]
fn heat(&self, address: &B256) -> u8 {
self.state.get(address).map_or(0, |s| s.heat)
}
/// Updates heat and prune backlog for accessed tries.
/// Called at the start of each prune cycle.
fn update_and_reset(&mut self) {
for address in self.accessed_this_cycle.drain() {
let entry = self.state.entry(address).or_default();
entry.heat = entry.heat.saturating_add(1);
entry.prune_backlog = entry.prune_backlog.saturating_add(1);
}
}
/// Removes tracking for a specific address (when trie is evicted).
fn remove(&mut self, address: &B256) {
self.state.remove(address);
self.accessed_this_cycle.remove(address);
}
/// Clears all heat tracking state.
fn clear(&mut self) {
self.state.clear();
self.accessed_this_cycle.clear();
}
}
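Access-tracking sketch (illustrative): the prune backlog gates per-trie pruning to roughly every other cycle.

    let mut mods = StorageTrieModifications::default();
    let addr = B256::ZERO;
    mods.mark_accessed(addr);
    mods.update_and_reset(); // heat = 1, prune_backlog = 1 -> skipped this prune cycle
    mods.mark_accessed(addr);
    mods.update_and_reset(); // heat = 2, prune_backlog = 2 -> eligible for pruning
    assert_eq!(mods.heat(&addr), 2);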
#[derive(Debug, PartialEq, Eq, Default)]
struct ProofNodesMetricValues {
/// Number of nodes in the proof.

View File

@@ -249,8 +249,12 @@ pub trait SparseTrie: Sized + Debug + Send + Sync {
/// converting nodes beyond a certain depth into hash stubs. This is useful for reducing
/// memory usage when caching tries across payload validations.
pub trait SparseTrieExt: SparseTrie {
/// Returns the number of revealed (non-Hash) nodes in the trie.
fn revealed_node_count(&self) -> usize;
/// Returns a cheap O(1) size hint approximating the number of nodes in the trie
/// (implementations may count hash stubs as well as revealed nodes).
///
/// This is used as a heuristic for prioritizing which storage tries to keep
/// during pruning. Larger values indicate larger tries that are more valuable to preserve.
fn size_hint(&self) -> usize;
/// Replaces nodes beyond `max_depth` with hash stubs and removes their descendants.
///
@@ -310,6 +314,17 @@ pub struct SparseTrieUpdates {
pub wiped: bool,
}
impl SparseTrieUpdates {
/// Initialize a [`Self`] with given capacities.
pub fn with_capacity(num_updated_nodes: usize, num_removed_nodes: usize) -> Self {
Self {
updated_nodes: HashMap::with_capacity_and_hasher(num_updated_nodes, Default::default()),
removed_nodes: HashSet::with_capacity_and_hasher(num_removed_nodes, Default::default()),
wiped: false,
}
}
}
/// Error type for a leaf lookup operation
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LeafLookupError {

View File

@@ -979,7 +979,17 @@ impl SparseTrieTrait for SerialSparseTrie {
}
fn take_updates(&mut self) -> SparseTrieUpdates {
self.updates.take().unwrap_or_default()
match self.updates.take() {
Some(updates) => {
// NOTE: we need to preserve Some case
self.updates = Some(SparseTrieUpdates::with_capacity(
updates.updated_nodes.len(),
updates.removed_nodes.len(),
));
updates
}
None => SparseTrieUpdates::default(),
}
}
fn wipe(&mut self) {