Compare commits


85 Commits

Author SHA1 Message Date
Arsenii Kulikov
1549e93eac fix 2026-02-01 19:00:59 +04:00
Arsenii Kulikov
48e1270dec wip 2026-02-01 18:19:52 +04:00
Georgios Konstantopoulos
821974a6a6 fix(trie): use full_key in proof_required_fn when path is prefix
When calling proof_required_fn in update_leaves, use the original full_key
instead of the padded path when path is a prefix of full_path. This ensures
proof targets are correctly associated with the original key for cache lookups.

The min_len is always derived from path.len() regardless of which key is used.

Introduces proof_target_for_path helper to DRY up the logic across all three
call sites (removal, update/insert, and touched).
2026-01-31 11:53:26 +00:00
Arsenii Kulikov
6183adb2db Merge branch 'klkvr/sparse-trie-cache' into klkvr/sparse-trie-cache-preserve 2026-01-30 23:40:00 +04:00
Arsenii Kulikov
1b0c54a0a9 wip 2026-01-30 22:50:49 +04:00
Arsenii Kulikov
3aaa8daefc wip 2026-01-30 22:40:27 +04:00
Arsenii Kulikov
bb97b80116 wip 2026-01-30 22:25:33 +04:00
Arsenii Kulikov
81d1aa1eb4 Merge branch 'mattsse/sparse-trie-preservation' into klkvr/sparse-trie-cache-preserve 2026-01-30 21:55:32 +04:00
Arsenii Kulikov
d6d1a090e5 fix 2026-01-30 21:51:55 +04:00
Arsenii Kulikov
b52700dd95 Merge branch 'main' into klkvr/sparse-trie-cache 2026-01-30 21:50:32 +04:00
Matthias Seitz
65201a1abf fix docs 2026-01-30 18:49:43 +01:00
Matthias Seitz
b7cb06b88f update take updates 2026-01-30 18:42:11 +01:00
Arsenii Kulikov
5b86a17331 fix 2026-01-30 21:25:30 +04:00
Arsenii Kulikov
914ab7d5e6 fix 2026-01-30 21:22:53 +04:00
Matthias Seitz
7e38827df5 docs: fix broken intra-doc link in StorageSettings
Amp-Thread-ID: https://ampcode.com/threads/T-019c0fdd-3e07-75c3-aa3d-b5a22cdb19bc
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 18:18:59 +01:00
Arsenii Kulikov
dd2c5b7c53 wip 2026-01-30 21:18:53 +04:00
Matthias Seitz
ed528fb975 fix(trie): update prune test expectations for hash stub counting
Amp-Thread-ID: https://ampcode.com/threads/T-019c0fdd-3e07-75c3-aa3d-b5a22cdb19bc
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 18:12:29 +01:00
Arsenii Kulikov
25a97f0be4 wip 2026-01-30 20:59:13 +04:00
Arsenii Kulikov
844e531a3f wip 2026-01-30 20:55:49 +04:00
Matthias Seitz
ef860c20fb refactor: PreservedSparseTrie as enum, anchor by state root
- Make PreservedSparseTrie an enum with Anchored and Cleared variants
- Anchor trie by computed state_root instead of block_hash
- Add parent_state_root to ExecutionEnv for continuation checks
- Feature-gate prune functions with cfg(feature = "std")
- Add reveal-on-first-try hitrate tracking to update_leaves
- Add prune stats (nodes_converted, hot_subtries_skipped)

Amp-Thread-ID: https://ampcode.com/threads/T-019c0f54-ed9a-7599-a6a2-827a04d388c2
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 17:46:13 +01:00
Arsenii Kulikov
755d1e5f61 wip 2026-01-30 20:45:34 +04:00
Arsenii Kulikov
7d1bd7c9f8 wip 2026-01-30 20:41:19 +04:00
Arsenii Kulikov
93c2455cef wip 2026-01-30 20:39:51 +04:00
Arsenii Kulikov
6c661eb868 instrument 2026-01-30 20:36:25 +04:00
Matthias Seitz
c8b84404ae fix: typo and update revealed_node_count to size_hint in test
Amp-Thread-ID: https://ampcode.com/threads/T-019c0f54-ed9a-7599-a6a2-827a04d388c2
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 16:41:50 +01:00
Arsenii Kulikov
b722a7f360 wip 2026-01-30 19:21:31 +04:00
Matthias Seitz
f0eb2aad7c refactor(trie): rename Heat structs to Modified and cleanup
- Rename HotSubtries to ModifiedSubtries
- Rename SubtrieHeat to SubtrieModifications
- Rename TrieHeatState to TrieModificationState
- Rename StorageTrieHeat to StorageTrieModifications
- Merge run/run_inner into single run method in sparse_trie.rs
- Feature-gate rayon::join with cfg(feature = "std")

Amp-Thread-ID: https://ampcode.com/threads/T-019c0f54-ed9a-7599-a6a2-827a04d388c2
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 16:14:41 +01:00
Arsenii Kulikov
ea6da11cf4 wip 2026-01-30 18:55:13 +04:00
Arsenii Kulikov
20029d3c1a wip 2026-01-30 18:52:21 +04:00
Arsenii Kulikov
40eb3bb7b4 wip 2026-01-30 18:42:24 +04:00
Matthias Seitz
8875c8da25 Merge branch 'main' into mattsse/sparse-trie-preservation 2026-01-30 15:35:11 +01:00
Arsenii Kulikov
d87b48c9fc wip 2026-01-30 18:31:52 +04:00
Arsenii Kulikov
95778a0cc1 wip 2026-01-30 18:26:01 +04:00
Arsenii Kulikov
17359dadd9 wip 2026-01-30 18:19:23 +04:00
Arsenii Kulikov
bab01a7bba wip 2026-01-30 17:27:51 +04:00
Arsenii Kulikov
82960045c9 wip 2026-01-30 17:25:15 +04:00
Brian Picciano
513fca16e9 always clear Cleared variant 2026-01-30 14:25:03 +01:00
Brian Picciano
ad242a2002 Restore conditional spawn of SparseTrieCacheTask 2026-01-30 14:21:15 +01:00
Arsenii Kulikov
840d6066a6 wip 2026-01-30 16:15:19 +04:00
Brian Picciano
1f315b4f3d Merge remote-tracking branch 'origin/main' into mattsse/sparse-trie-preservation 2026-01-30 13:11:36 +01:00
Arsenii Kulikov
fa604aa3dc wip 2026-01-30 16:02:28 +04:00
Arsenii Kulikov
4b851acfdb wip 2026-01-30 15:53:15 +04:00
Arsenii Kulikov
785933a8ef wip 2026-01-30 15:38:08 +04:00
Arsenii Kulikov
eb5455de68 wip 2026-01-30 14:38:58 +04:00
Arsenii Kulikov
cb9bf1fe3a wip 2026-01-30 14:34:38 +04:00
Matthias Seitz
99973c7781 chore: add debug println statements for size hints and prune stats
Amp-Thread-ID: https://ampcode.com/threads/T-019c0cef-835a-722d-91d8-5bed02d86747
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 04:44:40 +01:00
Matthias Seitz
9d89d4cb5e feat(trie): add heat tracking to ParallelSparseTrie pruning
- Add SubtrieHeat to track modification frequency of lower subtries
- Hot subtries (heat > 0) are skipped during pruning to avoid expensive re-reveals
- Heat increments by 2 on modification, decays by 1 each prune cycle
- Add subtrie_for_path_mut_untracked for internal operations that shouldn't affect heat
- Update tests to account for heat decay cycles

Amp-Thread-ID: https://ampcode.com/threads/T-019c0cef-835a-722d-91d8-5bed02d86747
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 04:32:25 +01:00
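The heat rules in the commit message above (add 2 on modification, subtract 1 per prune cycle, skip subtries with heat > 0) can be sketched as follows. This is a minimal illustration, not the code from the commit: `Heat` and `prune_cycle` are hypothetical names, and decaying a hot subtrie in the same cycle that skips it is an assumption.

```rust
/// Hypothetical stand-in for the per-subtrie heat counter described above.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct Heat(u8);

impl Heat {
    /// Amount added on each modification (taken from the commit message).
    const INCREMENT: u8 = 2;
    /// Amount removed on each prune cycle (taken from the commit message).
    const DECAY: u8 = 1;

    /// Records a modification; saturates instead of overflowing.
    pub fn mark_modified(&mut self) {
        self.0 = self.0.saturating_add(Self::INCREMENT);
    }

    /// Applies one prune cycle of decay, never going below zero.
    pub fn decay(&mut self) {
        self.0 = self.0.saturating_sub(Self::DECAY);
    }

    /// A subtrie counts as hot, and is skipped by pruning, while heat > 0.
    pub fn is_hot(&self) -> bool {
        self.0 > 0
    }
}

/// Runs one prune cycle and returns the indices of the cold subtries that
/// would be pruned. Hot subtries are skipped and cooled by one step
/// (assumed ordering: check first, then decay).
pub fn prune_cycle(heats: &mut [Heat]) -> Vec<usize> {
    let mut cold = Vec::new();
    for (idx, heat) in heats.iter_mut().enumerate() {
        if heat.is_hot() {
            heat.decay();
        } else {
            cold.push(idx);
        }
    }
    cold
}
```

Under these assumptions, a subtrie modified once survives two prune cycles and becomes eligible for pruning on the third.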
Matthias Seitz
f07b37f029 fix: restore shrink_to logic to distribute capacity among all tries
Distribute capacity equally among account trie and all storage tries
(both active and cleared) for proper memory management.

Amp-Thread-ID: https://ampcode.com/threads/T-019c0c7a-dd98-7419-a379-2aff05f2375d
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 04:20:23 +01:00
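As a rough illustration of the equal split described above, the per-trie budget could be computed like this. `per_trie_capacity` is a hypothetical helper, and rounding down via integer division is an assumption rather than something the commit states.

```rust
/// Splits a total capacity budget equally across the account trie and every
/// storage trie, counting both active and cleared storage tries.
/// Hypothetical helper; the remainder of the division is simply dropped.
pub fn per_trie_capacity(total: usize, active_storage: usize, cleared_storage: usize) -> usize {
    // One account trie plus all storage tries share the budget.
    let trie_count = 1 + active_storage + cleared_storage;
    total / trie_count
}
```

With a budget of 1,000,000 nodes, 7 active and 2 cleared storage tries, each of the 10 tries would be shrunk to 100,000.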
Matthias Seitz
10b87c8a2e feat(trie): add heat-based tracking with prune backlog for storage tries
- Replace generation-based recency tracking with heat-based tracking
- Track heat (incremented on access) and prune_backlog (cycles since last prune)
- Prune every other cycle using prune_backlog threshold
- Add StorageTriesPruneStats for debug logging of prune operations
- Use debug! logging instead of println for timing information

Amp-Thread-ID: https://ampcode.com/threads/T-019c0c7a-dd98-7419-a379-2aff05f2375d
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 03:05:36 +01:00
Matthias Seitz
095d021969 fix: track heat on storage reveal instead of leaf update
Heat should be tracked when storage proofs are revealed (actual access),
not on leaf updates. This is more accurate because:
- Reveals indicate the storage was actually needed for this block
- Multiple updates to the same storage in one block count as one access
- The modified_this_cycle deduplication handles repeated reveals

Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 02:09:28 +01:00
Matthias Seitz
0c192ef514 refactor: move StorageTrieHeat after StorageTries impl blocks
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 02:01:47 +01:00
Matthias Seitz
946c74a538 feat: replace generation-based recency with heat-based tracking for storage tries
- Add StorageTrieHeat struct with decay mechanism similar to SubtrieHeat
- Heat incremented by 2 on actual leaf updates (not insert/remove operations)
- Heat decays by 1 each prune cycle for unmodified tries
- Score = size * heat_multiplier for smarter eviction decisions
- Only prune hot tries (already pruned cold ones in previous cycles)

Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 02:00:18 +01:00
Matthias Seitz
2455fa4ff5 chore: restore SparseTrieUpdates::with_capacity and take_updates pre-allocation
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 01:17:10 +01:00
Matthias Seitz
0585dc1180 chore: add detailed timing debug for StorageTries::prune
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 01:08:32 +01:00
Matthias Seitz
8b985c102a chore: add timing debug for prune branches
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 01:07:19 +01:00
Matthias Seitz
6fefbfc65f chore: remove unused requested_proof_targets field, add timing debug
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 01:01:35 +01:00
Matthias Seitz
2012602909 chore: restore SparseTrieUpdates::with_capacity and take_updates pre-allocation
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 00:44:21 +01:00
Matthias Seitz
c84b3475a4 chore: simplify take_updates and remove SparseTrieUpdates::with_capacity
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 00:40:58 +01:00
Arsenii Kulikov
f3107565f3 clippy 2026-01-30 03:25:42 +04:00
Arsenii Kulikov
ac6a0fa372 wip 2026-01-30 03:14:57 +04:00
Arsenii Kulikov
5899cd4188 fix 2026-01-30 03:14:42 +04:00
Arsenii Kulikov
a3b76591b7 wip 2026-01-30 03:08:49 +04:00
Matthias Seitz
ab84d61f91 chore: simplify take_updates implementation
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 22:59:44 +01:00
Matthias Seitz
8c1724af13 chore: remove parallel iteration from shrink_to
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 22:58:05 +01:00
Georgios Konstantopoulos
3f6354c2a4 chore: fmt
Amp-Thread-ID: https://ampcode.com/threads/T-019c0b00-7c38-74f9-9349-4ca17922c279
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 21:52:38 +00:00
Matthias Seitz
ba64eb5fc7 merge: origin/main into mattsse/sparse-trie-preservation
Merged upstream changes with the sparse trie preservation feature:
- Resolved conflicts in TreeConfig, payload processor, and sparse trie modules
- Kept simplified generation-based recency tracking for storage tries
- Preserved score-based pruning strategy (size * recency)
- Retained size_hint() O(1) optimization for SparseTrieExt

Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 22:50:50 +01:00
Arsenii Kulikov
1b2935763b wip 2026-01-30 01:16:06 +04:00
Matthias Seitz
cf3eef1874 chore: remove debug timing prints
Amp-Thread-ID: https://ampcode.com/threads/T-019c0b54-f8f0-7608-851b-e3d22922f2ff
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 21:55:49 +01:00
Matthias Seitz
cc3f3d7062 perf(trie): simplify sparse trie pruning and shrinking
- Add size_hint() to SparseTrieExt trait for O(1) size estimation
- Simplify storage trie eviction: keep top N by score (size * recency_multiplier)
- Only prune recently modified tries (age <= 2) above size threshold (1000 nodes)
- Fix shrink_to: don't shrink cleared tries, simpler capacity distribution
- Remove LRU dirty tracking complexity in favor of generation-based recency

Amp-Thread-ID: https://ampcode.com/threads/T-019c0b54-f8f0-7608-851b-e3d22922f2ff
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 21:55:04 +01:00
Matthias Seitz
92ce4f3af0 wip 2026-01-29 20:54:57 +01:00
Georgios Konstantopoulos
28e5911482 Revert "fix(trie): address clippy lints for sparse trie pruning"
This reverts commit 257ccd41e9.
2026-01-29 18:35:03 +00:00
Georgios Konstantopoulos
257ccd41e9 fix(trie): address clippy lints for sparse trie pruning
Amp-Thread-ID: https://ampcode.com/threads/T-019c0b00-7c38-74f9-9349-4ca17922c279
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 18:32:18 +00:00
Matthias Seitz
a53ef64a54 perf(trie): optimize prune with LRU eviction and dirty tracking
- Add generation-based LRU tracking for storage tries
- Only prune dirty tries (modified since last prune)
- Evict non-dirty tries first when over limit, then LRU evict oldest
- Use O(n) select_nth_unstable for eviction instead of O(n log n) sort
- Parallelize shrink_to for storage tries
- Track trie modifications via insert_storage_trie

Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c0a25-8a84-7739-b5eb-ccf486a76629
2026-01-29 18:10:35 +01:00
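The O(n) eviction mentioned above relies on partial selection rather than a full sort. A minimal sketch, assuming a hypothetical `TrieEntry` record with a generation counter where a higher value means more recently used; only the standard library's `select_nth_unstable_by_key` is taken as given.

```rust
use std::cmp::Reverse;

/// Hypothetical record for a storage trie: an identifier plus the generation
/// in which it was last used (higher means more recent).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TrieEntry {
    pub id: u32,
    pub last_used_generation: u64,
}

/// Keeps the `max_keep` most recently used entries in `entries` and returns
/// the evicted ones. Runs in O(n) on average; neither half ends up sorted.
pub fn evict_oldest(entries: &mut Vec<TrieEntry>, max_keep: usize) -> Vec<TrieEntry> {
    if entries.len() <= max_keep {
        return Vec::new();
    }
    // Partition around index `max_keep`: everything before it has a
    // generation at least as high as everything from it onwards.
    entries.select_nth_unstable_by_key(max_keep, |e| Reverse(e.last_used_generation));
    // The tail holds the oldest entries; detach and return it.
    entries.split_off(max_keep)
}
```

Because only the partition boundary matters for eviction, the full ordering that a sort would produce is never needed.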
Brian Picciano
74f9e386ed add histogram for reuse timing 2026-01-29 15:20:12 +01:00
Brian Picciano
45ff349b00 refactor: move sparse trie constants to reth-engine-primitives
Remove DEFAULT_SPARSE_TRIE_PRUNE_DEPTH and DEFAULT_MAX_PRESERVED_STORAGE_TRIES
from reth-trie-sparse and define them in reth-engine-primitives instead.
2026-01-29 13:47:46 +00:00
Brian Picciano
d057c4e9e6 refactor: also reuse DEFAULT_SPARSE_TRIE_PRUNE_DEPTH from reth-trie-sparse 2026-01-29 13:46:36 +00:00
Brian Picciano
405b40f02f refactor: address review comments
- Reuse DEFAULT_MAX_PRESERVED_STORAGE_TRIES from reth-trie-sparse instead of duplicating constant
- Apply take_updates with_capacity pattern to sparse/src/trie.rs to match sparse-parallel
2026-01-29 13:45:23 +00:00
Brian Picciano
868fffe0dd nitpick comment 2026-01-29 14:29:10 +01:00
Brian Picciano
b288c4e259 Fix PST::take_updates not replacing with Some if was previously enabled 2026-01-29 14:27:15 +01:00
Brian Picciano
4e26a6edc9 Merge remote-tracking branch 'origin/main' into mattsse/sparse-trie-preservation 2026-01-29 12:48:51 +01:00
Matthias Seitz
26f788d5ca feat(engine): enable sparse trie structure reuse for continuation payloads
When a new payload's parent matches the preserved trie's block hash,
the pruned trie structure is now reused directly instead of being cleared.
This enables incremental trie updates without rebuilding from scratch.

For non-continuation payloads, the trie is still cleared but allocations
are preserved for memory efficiency.

Amp-Thread-ID: https://ampcode.com/threads/T-019c06cb-b73a-756c-963b-c1ef3db6a6bb
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 00:07:30 +01:00
Matthias Seitz
9433ac5666 fix(engine): prevent race condition in sparse trie preservation
Acquire guard before sending state root result to ensure the next block's
take() blocks until we've stored the trie for reuse.

Also always clear the trie (keeping allocations) because the pruned trie
cannot be directly reused - prune() deletes values needed for state root
computation and clears revealed_account_paths which breaks proof filtering.

Amp-Thread-ID: https://ampcode.com/threads/T-019c060d-2327-741f-b8d2-6d246e5d521e
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 00:07:26 +01:00
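The ordering fix described above (lock first, then send, then store) can be sketched in isolation. This is a simplified model: it uses `std::sync::Mutex` where the PR uses `parking_lot`, a `String` plays the role of the trie, and `finish_block`, `start_next_block`, and `demo` are hypothetical names.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical shared slot standing in for the preserved trie.
type Slot = Arc<Mutex<Option<String>>>;

/// Producer side: take the lock *before* sending the result, so that a
/// consumer woken by the result cannot observe an empty slot.
pub fn finish_block(slot: &Slot, result_tx: &mpsc::Sender<u64>, root: u64) {
    let mut guard = slot.lock().unwrap();
    // The receiver may start the next block now, but its take() blocks
    // on the mutex until the guard is dropped.
    let _ = result_tx.send(root);
    // Simulated post-processing (pruning, shrinking) done under the lock.
    thread::sleep(Duration::from_millis(20));
    *guard = Some(format!("trie@{root}"));
}

/// Consumer side: takes whatever is stored, leaving `None` behind.
pub fn start_next_block(slot: &Slot) -> Option<String> {
    slot.lock().unwrap().take()
}

/// Runs one producer/consumer round trip and returns what the consumer saw.
pub fn demo() -> (u64, Option<String>) {
    let slot: Slot = Arc::default();
    let (tx, rx) = mpsc::channel();
    let producer_slot = Arc::clone(&slot);
    let producer = thread::spawn(move || finish_block(&producer_slot, &tx, 42));
    // The result arrives while the producer still holds the lock.
    let root = rx.recv().unwrap();
    let taken = start_next_block(&slot);
    producer.join().unwrap();
    (root, taken)
}
```

If the send happened before the lock was acquired, `start_next_block` could win the race and return `None`, which is the failure mode the commit describes.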
Matthias Seitz
0796a20f24 fix(engine): clear trie on failed state root computation
A failed computation may leave the trie in a partially updated state.
Only preserve the trie for reuse when computation succeeds.

Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c0566-6cc1-7415-a083-e82fa071cdc7
2026-01-28 18:20:09 +01:00
Matthias Seitz
2633d3e513 refactor(engine): improve sparse trie task API and error handling
- Change run() to take &mut self instead of consuming self
- Add into_cleared_trie() for invalid/cancelled payloads (skips pruning)
- Clear trie when state root receiver is dropped
- Improve documentation with cross-references

Amp-Thread-ID: https://ampcode.com/threads/T-019c0566-6cc1-7415-a083-e82fa071cdc7
Co-authored-by: Amp <amp@ampcode.com>
2026-01-28 17:55:16 +01:00
Matthias Seitz
1f8fb7e58f feat(engine): preserve sparse trie across payload validations
Implement sparse trie preservation to reduce memory allocations when
validating consecutive blocks (parent-child relationship).

Changes:
- Add PreservedSparseTrie to store trie with its computed block hash
- Add SharedPreservedSparseTrie newtype for cleaner handle passing
- Add prune_depth and max_storage_tries config to TreeConfig
- Add SparseStateTrie::clear() and shrink_to() methods
- Prune trie after sending state root result (non-blocking)
- Reuse pruned trie when new payload's parent matches stored block hash
- Clear trie (keeping allocations) when not a continuation

RETH-183

Amp-Thread-ID: https://ampcode.com/threads/T-019c0566-6cc1-7415-a083-e82fa071cdc7
Co-authored-by: Amp <amp@ampcode.com>
2026-01-28 17:44:55 +01:00
15 changed files with 1213 additions and 385 deletions

View File

@@ -47,6 +47,17 @@ pub const DEFAULT_RESERVED_CPU_CORES: usize = 1;
/// Default maximum concurrency for prewarm task.
pub const DEFAULT_PREWARM_MAX_CONCURRENCY: usize = 16;
/// Default depth for sparse trie pruning.
///
/// Nodes at this depth and below are converted to hash stubs to reduce memory.
/// Depth 4 means we keep roughly 16^4 = 65536 potential branch paths at most.
pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
/// Default maximum number of storage tries to keep after pruning.
///
/// Storage tries beyond this limit are cleared (but allocations preserved).
pub const DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES: usize = 100;
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
@@ -154,6 +165,10 @@ pub struct TreeConfig {
disable_cache_metrics: bool,
/// Whether to enable sparse trie as cache.
enable_sparse_trie_as_cache: bool,
/// Depth for sparse trie pruning after state root computation.
sparse_trie_prune_depth: usize,
/// Maximum number of storage tries to retain after pruning.
sparse_trie_max_storage_tries: usize,
}
impl Default for TreeConfig {
@@ -184,6 +199,8 @@ impl Default for TreeConfig {
disable_proof_v2: false,
disable_cache_metrics: false,
enable_sparse_trie_as_cache: false,
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
}
}
}
@@ -216,6 +233,8 @@ impl TreeConfig {
account_worker_count: usize,
disable_proof_v2: bool,
disable_cache_metrics: bool,
sparse_trie_prune_depth: usize,
sparse_trie_max_storage_tries: usize,
) -> Self {
Self {
persistence_threshold,
@@ -243,6 +262,8 @@ impl TreeConfig {
disable_proof_v2,
disable_cache_metrics,
enable_sparse_trie_as_cache: false,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
}
}
@@ -555,4 +576,26 @@ impl TreeConfig {
self.enable_sparse_trie_as_cache = value;
self
}
/// Returns the sparse trie prune depth.
pub const fn sparse_trie_prune_depth(&self) -> usize {
self.sparse_trie_prune_depth
}
/// Setter for sparse trie prune depth.
pub const fn with_sparse_trie_prune_depth(mut self, depth: usize) -> Self {
self.sparse_trie_prune_depth = depth;
self
}
/// Returns the maximum number of storage tries to retain after pruning.
pub const fn sparse_trie_max_storage_tries(&self) -> usize {
self.sparse_trie_max_storage_tries
}
/// Setter for maximum storage tries to retain.
pub const fn with_sparse_trie_max_storage_tries(mut self, max_tries: usize) -> Self {
self.sparse_trie_max_storage_tries = max_tries;
self
}
}
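For readers skimming the hunks above, here is a standalone sketch of how the two new settings compose through the builder-style setters. `MiniTreeConfig` is a hypothetical, trimmed-down mirror of `TreeConfig` that keeps only the two new fields; the constants and method names are copied from the diff.

```rust
/// Default depth for sparse trie pruning (value from the diff above).
pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
/// Default maximum number of storage tries to keep (value from the diff above).
pub const DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES: usize = 100;

/// Hypothetical, reduced stand-in for `TreeConfig` with only the new fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MiniTreeConfig {
    sparse_trie_prune_depth: usize,
    sparse_trie_max_storage_tries: usize,
}

impl Default for MiniTreeConfig {
    fn default() -> Self {
        Self {
            sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
            sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
        }
    }
}

impl MiniTreeConfig {
    /// Returns the sparse trie prune depth.
    pub const fn sparse_trie_prune_depth(&self) -> usize {
        self.sparse_trie_prune_depth
    }

    /// Setter for sparse trie prune depth.
    pub const fn with_sparse_trie_prune_depth(mut self, depth: usize) -> Self {
        self.sparse_trie_prune_depth = depth;
        self
    }

    /// Returns the maximum number of storage tries to retain after pruning.
    pub const fn sparse_trie_max_storage_tries(&self) -> usize {
        self.sparse_trie_max_storage_tries
    }

    /// Setter for maximum storage tries to retain.
    pub const fn with_sparse_trie_max_storage_tries(mut self, max_tries: usize) -> Self {
        self.sparse_trie_max_storage_tries = max_tries;
        self
    }

    /// Upper bound on nibble paths above the prune depth: 16^depth.
    /// Hypothetical helper illustrating the "16^4 = 65536" doc comment.
    pub const fn max_paths_above_prune_depth(&self) -> usize {
        16usize.pow(self.sparse_trie_prune_depth as u32)
    }
}
```

A caller would typically start from `MiniTreeConfig::default()` and chain `with_sparse_trie_prune_depth(..)` and `with_sparse_trie_max_storage_tries(..)` to override either value.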

View File

@@ -7,7 +7,7 @@ use crate::tree::{
prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
sparse_trie::StateRootComputeOutcome,
},
sparse_trie::{SparseTrieCacheTask, SparseTrieTask},
sparse_trie::{SparseTrieCacheTask, SparseTrieTask, SpawnedSparseTrieTask},
StateProviderBuilder, TreeConfig,
};
use alloy_eip7928::BlockAccessList;
@@ -39,7 +39,7 @@ use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofWorkerHandle},
root::ParallelStateRootError,
};
use reth_trie_sparse::{ClearedSparseStateTrie, RevealableSparseTrie, SparseStateTrie};
use reth_trie_sparse::{RevealableSparseTrie, SparseStateTrie};
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
use std::{
collections::BTreeMap,
@@ -56,10 +56,13 @@ use tracing::{debug, debug_span, instrument, warn, Span};
pub mod bal;
pub mod executor;
pub mod multiproof;
mod preserved_sparse_trie;
pub mod prewarm;
pub mod receipt_root_task;
pub mod sparse_trie;
use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by performing benchmarks using gradually increasing values to judge
@@ -122,13 +125,16 @@ where
precompile_cache_disabled: bool,
/// Precompile cache map.
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
/// A cleared `SparseStateTrie`, kept around to be reused for the state root computation so
/// that allocations can be minimized.
sparse_state_trie: Arc<
parking_lot::Mutex<Option<ClearedSparseStateTrie<ParallelSparseTrie, ParallelSparseTrie>>>,
>,
/// A pruned `SparseStateTrie`, kept around as a cache of already revealed trie nodes and to
/// re-use allocated memory. Stored with the state root it was computed for, so that it can be
/// reused when the next payload's parent state root matches.
sparse_state_trie: SharedPreservedSparseTrie,
/// Maximum concurrency for prewarm task.
prewarm_max_concurrency: usize,
/// Sparse trie prune depth.
sparse_trie_prune_depth: usize,
/// Maximum storage tries to retain after pruning.
sparse_trie_max_storage_tries: usize,
/// Whether to disable cache metrics recording.
disable_cache_metrics: bool,
}
@@ -160,8 +166,10 @@ where
disable_state_cache: config.disable_state_cache(),
precompile_cache_disabled: config.precompile_cache_disabled(),
precompile_cache_map,
sparse_state_trie: Arc::default(),
sparse_state_trie: SharedPreservedSparseTrie::default(),
prewarm_max_concurrency: config.prewarm_max_concurrency(),
sparse_trie_prune_depth: config.sparse_trie_prune_depth(),
sparse_trie_max_storage_tries: config.sparse_trie_max_storage_tries(),
disable_cache_metrics: config.disable_cache_metrics(),
}
}
@@ -237,6 +245,9 @@ where
// Extract V2 proofs flag early so we can pass it to prewarm
let v2_proofs_enabled = !config.disable_proof_v2();
// Capture parent_state_root before env is moved into spawn_caching_with
let parent_state_root = env.parent_state_root;
// Handle BAL-based optimization if available
let prewarm_handle = if let Some(bal) = bal {
// When BAL is present, use BAL prewarming and send BAL to multiproof
@@ -318,6 +329,7 @@ where
state_root_tx,
from_multi_proof,
config,
parent_state_root,
);
PayloadHandle {
@@ -497,6 +509,8 @@ where
}
/// Spawns the [`SparseTrieTask`] for this payload processor.
///
/// The trie is preserved when the new payload is a child of the previous one.
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
fn spawn_sparse_trie_task(
&self,
@@ -505,64 +519,114 @@ where
state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
from_multi_proof: CrossbeamReceiver<MultiProofMessage>,
config: &TreeConfig,
parent_state_root: B256,
) {
let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
let preserved_sparse_trie = self.sparse_state_trie.clone();
let trie_metrics = self.trie_metrics.clone();
let span = Span::current();
let disable_sparse_trie_as_cache = !config.enable_sparse_trie_as_cache();
let prune_depth = self.sparse_trie_prune_depth;
let max_storage_tries = self.sparse_trie_max_storage_tries;
self.executor.spawn_blocking(move || {
let _enter = span.entered();
// Reuse a stored SparseStateTrie, or create a new one using the desired configuration
// if there's none to reuse.
let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
let default_trie = RevealableSparseTrie::blind_from(
ParallelSparseTrie::default()
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
);
ClearedSparseStateTrie::from_state_trie(
// Reuse a stored SparseStateTrie if available, applying continuation logic.
// If this payload's parent state root matches the preserved trie's anchor,
// we can reuse the pruned trie structure. Otherwise, we clear the trie but
// keep allocations.
let sparse_state_trie = preserved_sparse_trie
.take()
.map(|preserved| preserved.into_trie_for(parent_state_root))
.unwrap_or_else(|| {
debug!(
target: "engine::tree::payload_processor",
"Creating new sparse trie - no preserved trie available"
);
let default_trie = RevealableSparseTrie::blind_from(
ParallelSparseTrie::default().with_parallelism_thresholds(
PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS,
),
);
SparseStateTrie::new()
.with_accounts_trie(default_trie.clone())
.with_default_storage_trie(default_trie)
.with_updates(true),
)
});
.with_updates(true)
});
let (result, trie) = if disable_sparse_trie_as_cache {
SparseTrieTask::new_with_cleared_trie(
let mut task = if disable_sparse_trie_as_cache {
SpawnedSparseTrieTask::Cleared(SparseTrieTask::new(
sparse_trie_rx,
proof_worker_handle,
trie_metrics,
trie_metrics.clone(),
sparse_state_trie,
)
.run()
))
} else {
SparseTrieCacheTask::new_with_cleared_trie(
SpawnedSparseTrieTask::Cached(SparseTrieCacheTask::new(
from_multi_proof,
proof_worker_handle,
trie_metrics,
trie_metrics.clone(),
sparse_state_trie,
)
.run()
))
};
// Send state root computation result
let _ = state_root_tx.send(result);
let result = task.run();
if let Err(e) = &result {
tracing::error!(target: "engine::tree::payload_processor", "State root computation failed: {e:?}");
}
// Capture the computed state_root before sending the result
let computed_state_root = result.as_ref().ok().map(|outcome| outcome.state_root);
// Clear the SparseStateTrie, shrink, and replace it back into the mutex _after_ sending
// results to the next step, so that time spent clearing doesn't block the step after
// this one.
let _enter = debug_span!(target: "engine::tree::payload_processor", "clear").entered();
let mut cleared_trie = ClearedSparseStateTrie::from_state_trie(trie);
// Acquire the guard before sending the result to prevent a race condition:
// Without this, the next block could start after send() but before store(),
// causing take() to return None and forcing it to create a new empty trie
// instead of reusing the preserved one. Holding the guard ensures the next
// block's take() blocks until we've stored the trie for reuse.
let mut guard = preserved_sparse_trie.lock();
// Shrink the sparse trie so that we don't have ever increasing memory.
cleared_trie.shrink_to(
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
// Send state root computation result - next block may start but will block on take()
if state_root_tx.send(result).is_err() {
// Receiver dropped - payload was likely invalid or cancelled.
// Clear the trie instead of preserving potentially invalid state.
debug!(
target: "engine::tree::payload_processor",
"State root receiver dropped, clearing trie"
);
let trie = task.into_cleared_trie(
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
guard.store(PreservedSparseTrie::cleared(trie));
return;
}
cleared_sparse_trie.lock().replace(cleared_trie);
// Only preserve the trie as anchored if computation succeeded.
// A failed computation may have left the trie in a partially updated state.
let _enter =
debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
if let Some(state_root) = computed_state_root {
let start = std::time::Instant::now();
let trie = task.into_trie_for_reuse(
prune_depth,
max_storage_tries,
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
trie_metrics
.into_trie_for_reuse_duration_histogram
.record(start.elapsed().as_secs_f64());
guard.store(PreservedSparseTrie::anchored(trie, state_root));
} else {
debug!(
target: "engine::tree::payload_processor",
"State root computation failed, clearing trie"
);
let trie = task.into_cleared_trie(
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
guard.store(PreservedSparseTrie::cleared(trie));
}
});
}
@@ -896,6 +960,10 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
pub hash: B256,
/// Hash of the parent block.
pub parent_hash: B256,
/// State root of the parent block.
/// Used for sparse trie continuation: if the preserved trie's anchor matches this,
/// the trie can be reused directly.
pub parent_state_root: B256,
}
impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
@@ -907,6 +975,7 @@ where
evm_env: Default::default(),
hash: Default::default(),
parent_hash: Default::default(),
parent_state_root: Default::default(),
}
}
}

View File

@@ -311,11 +311,7 @@ impl VersionedMultiProofTargets {
fn chunking_length(&self) -> usize {
match self {
Self::Legacy(targets) => targets.chunking_length(),
Self::V2(targets) => {
// For V2, count accounts + storage slots
targets.account_targets.len() +
targets.storage_targets.values().map(|slots| slots.len()).sum::<usize>()
}
Self::V2(targets) => targets.chunking_length(),
}
}
@@ -587,6 +583,8 @@ pub(crate) struct MultiProofTaskMetrics {
pub first_update_wait_time_histogram: Histogram,
/// Total time spent waiting for the last proof result.
pub last_proof_wait_time_histogram: Histogram,
/// Time spent preparing the sparse trie for reuse after state root computation.
pub into_trie_for_reuse_duration_histogram: Histogram,
}
/// Standalone task that receives a transaction state stream and updates relevant
@@ -1492,7 +1490,7 @@ fn get_proof_targets(
/// Dispatches work items as a single unit or in chunks based on target size and worker
/// availability.
#[allow(clippy::too_many_arguments)]
fn dispatch_with_chunking<T, I>(
pub(crate) fn dispatch_with_chunking<T, I>(
items: T,
chunking_len: usize,
chunk_size: Option<usize>,

View File

@@ -0,0 +1,117 @@
//! Preserved sparse trie for reuse across payload validations.
use alloy_primitives::B256;
use parking_lot::Mutex;
use reth_trie_sparse::SparseStateTrie;
use reth_trie_sparse_parallel::ParallelSparseTrie;
use std::sync::Arc;
use tracing::debug;
/// Type alias for the sparse trie type used in preservation.
pub(super) type SparseTrie = SparseStateTrie<ParallelSparseTrie, ParallelSparseTrie>;
/// Shared handle to a preserved sparse trie that can be reused across payload validations.
///
/// This is stored in [`PayloadProcessor`](super::PayloadProcessor) and cloned to pass to
/// [`SparseTrieTask`](super::sparse_trie::SparseTrieTask) for trie reuse.
#[derive(Debug, Default, Clone)]
pub(super) struct SharedPreservedSparseTrie(Arc<Mutex<Option<PreservedSparseTrie>>>);
impl SharedPreservedSparseTrie {
/// Takes the preserved trie if present, leaving `None` in its place.
pub(super) fn take(&self) -> Option<PreservedSparseTrie> {
self.0.lock().take()
}
/// Acquires a guard that blocks `take()` until dropped.
/// Use this before sending the state root result to ensure the next block
/// waits for the trie to be stored.
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
PreservedTrieGuard(self.0.lock())
}
}
/// Guard that holds the lock on the preserved trie.
/// While held, `take()` will block. Call `store()` to save the trie before dropping.
pub(super) struct PreservedTrieGuard<'a>(parking_lot::MutexGuard<'a, Option<PreservedSparseTrie>>);
impl PreservedTrieGuard<'_> {
/// Stores a preserved trie for later reuse.
pub(super) fn store(&mut self, trie: PreservedSparseTrie) {
self.0.replace(trie);
}
}
/// A preserved sparse trie that can be reused across payload validations.
///
/// The trie exists in one of two states:
/// - **Anchored**: Has a computed state root and can be reused for payloads whose parent state root
/// matches the anchor.
/// - **Cleared**: Trie data has been cleared but allocations are preserved for reuse.
#[derive(Debug)]
pub(super) enum PreservedSparseTrie {
/// Trie with a computed state root that can be reused for continuation payloads.
Anchored {
/// The sparse state trie (pruned after root computation).
trie: SparseTrie,
/// The state root this trie represents (computed from the previous block).
/// Used to verify continuity: new payload's `parent_state_root` must match this.
state_root: B256,
},
/// Cleared trie with preserved allocations, ready for fresh use.
Cleared {
/// The sparse state trie with cleared data but preserved allocations.
trie: SparseTrie,
},
}
impl PreservedSparseTrie {
/// Creates a new anchored preserved trie.
///
/// The `state_root` is the computed state root from the trie, which becomes the
/// anchor for determining if subsequent payloads can reuse this trie.
pub(super) const fn anchored(trie: SparseTrie, state_root: B256) -> Self {
Self::Anchored { trie, state_root }
}
/// Creates a cleared preserved trie (allocations preserved, data cleared).
pub(super) const fn cleared(trie: SparseTrie) -> Self {
Self::Cleared { trie }
}
/// Consumes self and returns the trie for reuse.
///
/// If the preserved trie is anchored and the parent state root matches, the pruned
/// trie structure is reused directly. Otherwise, the trie is cleared but allocations
/// are preserved to reduce memory overhead.
pub(super) fn into_trie_for(self, parent_state_root: B256) -> SparseTrie {
match self {
Self::Anchored { trie, state_root } if state_root == parent_state_root => {
debug!(
target: "engine::tree::payload_processor",
%state_root,
"Reusing anchored sparse trie for continuation payload"
);
trie
}
Self::Anchored { mut trie, state_root } => {
debug!(
target: "engine::tree::payload_processor",
anchor_root = %state_root,
%parent_state_root,
"Clearing anchored sparse trie - parent state root mismatch"
);
trie.clear();
trie
}
Self::Cleared { trie } => {
debug!(
target: "engine::tree::payload_processor",
%parent_state_root,
"Using cleared sparse trie with preserved allocations"
);
trie
}
}
}
}
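
The handoff in this file can be sketched with standard-library types only. This is a minimal illustration, not the code from this change: `ToyTrie`, `Preserved`, `Shared`, and `roundtrip` are hypothetical names, a `u64` stands in for `B256`, and `std::sync::Mutex` stands in for `parking_lot::Mutex`. It shows the rule that `into_trie_for` applies: an anchored trie is reused unchanged only when its anchor equals the parent state root, and is otherwise cleared while its allocation is kept.

```rust
use std::sync::{Arc, Mutex, MutexGuard};

/// Hypothetical stand-in for the real sparse trie: a list of revealed node labels.
#[derive(Debug, Default, PartialEq)]
struct ToyTrie {
    nodes: Vec<u32>,
}

/// Mirrors the two states described above, with `u64` in place of `B256`.
#[allow(dead_code)]
#[derive(Debug)]
enum Preserved {
    Anchored { trie: ToyTrie, state_root: u64 },
    Cleared { trie: ToyTrie },
}

impl Preserved {
    /// Reuses the trie as-is only when the anchor matches the parent state root.
    fn into_trie_for(self, parent_state_root: u64) -> ToyTrie {
        match self {
            Self::Anchored { trie, state_root } if state_root == parent_state_root => trie,
            Self::Anchored { mut trie, .. } => {
                // Mismatch: drop the data but keep the allocation.
                trie.nodes.clear();
                trie
            }
            Self::Cleared { trie } => trie,
        }
    }
}

/// Shared slot; holding the guard returned by `lock` blocks `take` until it is dropped.
#[derive(Clone, Default)]
struct Shared(Arc<Mutex<Option<Preserved>>>);

impl Shared {
    fn take(&self) -> Option<Preserved> {
        self.0.lock().unwrap().take()
    }

    fn lock(&self) -> MutexGuard<'_, Option<Preserved>> {
        self.0.lock().unwrap()
    }
}

/// Stores an anchored trie, then asks for it back for the given parent root.
fn roundtrip(anchor: u64, parent: u64) -> ToyTrie {
    let shared = Shared::default();
    {
        let mut guard = shared.lock();
        *guard = Some(Preserved::Anchored {
            trie: ToyTrie { nodes: vec![1, 2, 3] },
            state_root: anchor,
        });
    } // guard dropped here, so `take` can proceed
    shared.take().map(|p| p.into_trie_for(parent)).unwrap_or_default()
}
```

With a matching root the node list survives the roundtrip; with a mismatching root it comes back empty.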


@@ -1,18 +1,21 @@
//! Sparse Trie task related functionality.
use crate::tree::{
multiproof::{evm_state_to_hashed_post_state, MultiProofMessage, VersionedMultiProofTargets},
multiproof::{
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
VersionedMultiProofTargets,
},
payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate},
};
use alloy_primitives::B256;
use alloy_rlp::Decodable;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use rayon::iter::{ParallelBridge, ParallelIterator};
use reth_errors::ProviderError;
use rayon::iter::{IntoParallelRefMutIterator, ParallelBridge, ParallelIterator};
use reth_primitives_traits::Account;
use reth_revm::state::EvmState;
use reth_trie::{
proof_v2::Target, updates::TrieUpdates, HashedPostState, Nibbles, TrieAccount, EMPTY_ROOT_HASH,
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
TrieAccount, EMPTY_ROOT_HASH,
};
use reth_trie_parallel::{
proof_task::{
@@ -23,10 +26,9 @@ use reth_trie_parallel::{
targets_v2::MultiProofTargetsV2,
};
use reth_trie_sparse::{
errors::{SparseStateTrieResult, SparseTrieErrorKind},
errors::{SparseStateTrieResult, SparseTrieErrorKind, SparseTrieResult},
provider::{TrieNodeProvider, TrieNodeProviderFactory},
ClearedSparseStateTrie, LeafUpdate, SerialSparseTrie, SparseStateTrie, SparseTrie,
SparseTrieExt,
LeafUpdate, SerialSparseTrie, SparseStateTrie, SparseTrie, SparseTrieExt,
};
use revm_primitives::{hash_map::Entry, B256Map};
use smallvec::SmallVec;
@@ -34,7 +36,65 @@ use std::{
sync::mpsc,
time::{Duration, Instant},
};
use tracing::{debug, debug_span, instrument, trace};
use tracing::{debug, debug_span, error, instrument, trace};
#[expect(clippy::large_enum_variant)]
pub(super) enum SpawnedSparseTrieTask<BPF, A, S>
where
BPF: TrieNodeProviderFactory + Send + Sync,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
{
Cleared(SparseTrieTask<BPF, A, S>),
Cached(SparseTrieCacheTask<A, S>),
}
impl<BPF, A, S> SpawnedSparseTrieTask<BPF, A, S>
where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
{
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
match self {
Self::Cleared(task) => task.run(),
Self::Cached(task) => task.run(),
}
}
pub(super) fn into_trie_for_reuse(
self,
prune_depth: usize,
max_storage_tries: usize,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> SparseStateTrie<A, S> {
match self {
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
Self::Cached(task) => task.into_trie_for_reuse(
prune_depth,
max_storage_tries,
max_nodes_capacity,
max_values_capacity,
),
}
}
pub(super) fn into_cleared_trie(
self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> SparseStateTrie<A, S> {
match self {
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
Self::Cached(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
}
}
}
/// A task responsible for populating the sparse trie.
pub(super) struct SparseTrieTask<BPF, A = SerialSparseTrie, S = SerialSparseTrie>
@@ -57,46 +117,29 @@ where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
{
/// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`].
pub(super) fn new_with_cleared_trie(
/// Creates a new sparse trie task with the given trie.
pub(super) const fn new(
updates: mpsc::Receiver<SparseTrieUpdate>,
blinded_provider_factory: BPF,
metrics: MultiProofTaskMetrics,
sparse_state_trie: ClearedSparseStateTrie<A, S>,
trie: SparseStateTrie<A, S>,
) -> Self {
Self { updates, metrics, trie: sparse_state_trie.into_inner(), blinded_provider_factory }
Self { updates, metrics, trie, blinded_provider_factory }
}
/// Runs the sparse trie task to completion.
/// Runs the sparse trie task to completion, computing the state root.
///
/// This waits for new incoming [`SparseTrieUpdate`].
///
/// This concludes once the last trie update has been received.
///
/// # Returns
///
/// - State root computation outcome.
/// - `SparseStateTrie` that needs to be cleared and reused to avoid reallocations.
/// Receives [`SparseTrieUpdate`]s until the channel is closed, applying each update
/// to the trie. Once all updates are processed, computes and returns the final state root.
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
pub(super) fn run(
mut self,
) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseStateTrie<A, S>) {
// run the main loop to completion
let result = self.run_inner();
(result, self.trie)
}
/// Inner function to run the sparse trie task to completion.
///
/// See [`Self::run`] for more information.
fn run_inner(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
let now = Instant::now();
let mut num_iterations = 0;
@@ -146,6 +189,20 @@ where
Ok(StateRootComputeOutcome { state_root, trie_updates })
}
/// Clears and shrinks the trie, discarding all state.
///
/// Use this when the payload was invalid or cancelled - we don't want to preserve
/// potentially invalid trie state, but we keep the allocations for reuse.
pub(super) fn into_cleared_trie(
mut self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> SparseStateTrie<A, S> {
self.trie.clear();
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
self.trie
}
}
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
@@ -184,6 +241,12 @@ pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie
/// Cache of storage proof targets that have already been fetched/requested from the proof
/// workers. account -> slot -> lowest `min_len` requested.
fetched_storage_targets: B256Map<B256Map<u8>>,
/// Whether the last state update has been received.
finished_state_updates: bool,
/// Pending targets to be dispatched to the proof workers.
pending_targets: MultiProofTargetsV2,
/// Number of pending updates that were received but not yet processed.
pending_updates: usize,
/// Metrics for the sparse trie.
metrics: MultiProofTaskMetrics,
}
@@ -194,11 +257,11 @@ where
S: SparseTrieExt + Default + Clone,
{
/// Creates a new sparse trie cache task with the given trie.
pub(super) fn new_with_cleared_trie(
pub(super) fn new(
updates: CrossbeamReceiver<MultiProofMessage>,
proof_worker_handle: ProofWorkerHandle,
metrics: MultiProofTaskMetrics,
sparse_state_trie: ClearedSparseStateTrie<A, S>,
trie: SparseStateTrie<A, S>,
) -> Self {
let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
Self {
@@ -206,55 +269,64 @@ where
proof_result_rx,
updates,
proof_worker_handle,
trie: sparse_state_trie.into_inner(),
trie,
account_updates: Default::default(),
storage_updates: Default::default(),
pending_account_updates: Default::default(),
fetched_account_targets: Default::default(),
fetched_storage_targets: Default::default(),
finished_state_updates: Default::default(),
pending_targets: Default::default(),
pending_updates: Default::default(),
metrics,
}
}
/// Prunes and shrinks the trie for reuse in the next payload built on top of this one.
///
/// Should be called after the state root result has been sent.
pub(super) fn into_trie_for_reuse(
mut self,
prune_depth: usize,
max_storage_tries: usize,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> SparseStateTrie<A, S> {
self.trie.prune(prune_depth, max_storage_tries);
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
self.trie
}
/// Clears and shrinks the trie, discarding all state.
///
/// Use this when the payload was invalid or cancelled - we don't want to preserve
/// potentially invalid trie state, but we keep the allocations for reuse.
pub(super) fn into_cleared_trie(
mut self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> SparseStateTrie<A, S> {
self.trie.clear();
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
self.trie
}
/// Runs the sparse trie task to completion.
///
/// This waits for new incoming [`MultiProofMessage`]s, applies updates to the trie and
/// schedules proof fetching when needed.
///
/// This concludes once the last state update has been received and processed.
///
/// # Returns
///
/// - State root computation outcome.
/// - `SparseStateTrie` that needs to be cleared and reused to avoid reallocations.
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
pub(super) fn run(
mut self,
) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseStateTrie<A, S>) {
// run the main loop to completion
let result = self.run_inner();
(result, self.trie)
}
/// Inner function to run the sparse trie task to completion.
///
/// See [`Self::run`] for more information.
fn run_inner(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
let now = Instant::now();
let mut finished_state_updates = false;
loop {
crossbeam_channel::select_biased! {
recv(self.proof_result_rx) -> message => {
let Ok(result) = message else {
unreachable!("we own the sender half")
};
self.on_proof_result(result)?;
},
recv(self.updates) -> message => {
let update = match message {
Ok(m) => m,
@@ -263,27 +335,39 @@ where
}
};
match update {
MultiProofMessage::PrefetchProofs(targets) => {
self.on_prewarm_targets(targets);
}
MultiProofMessage::StateUpdate(_, state) => {
self.on_state_update(state);
}
MultiProofMessage::EmptyProof { sequence_number: _, state } => {
self.on_hashed_state_update(state);
}
MultiProofMessage::BlockAccessList(_) => todo!(),
MultiProofMessage::FinishedStateUpdates => {
finished_state_updates = true;
}
}
self.on_multiproof_message(update);
self.pending_updates += 1;
}
recv(self.proof_result_rx) -> message => {
let Ok(result) = message else {
unreachable!("we own the sender half")
};
let ProofResult::V2(mut result) = result.result? else {
unreachable!("sparse trie as cache must only be used with multiproof v2");
};
while let Ok(res) = self.proof_result_rx.try_recv() {
let ProofResult::V2(res) = res.result? else {
unreachable!("sparse trie as cache must only be used with multiproof v2");
};
result.extend(res);
}
self.on_proof_result(result)?;
},
}
self.process_updates()?;
if self.updates.is_empty() && self.proof_result_rx.is_empty() {
self.dispatch_pending_targets();
self.process_updates()?;
self.dispatch_pending_targets();
} else if self.updates.is_empty() || self.pending_updates > 100 {
self.process_leaf_updates()?;
self.dispatch_pending_targets();
} else if self.updates.is_empty() || self.pending_targets.chunking_length() > 100 {
self.dispatch_pending_targets();
}
if finished_state_updates &&
if self.finished_state_updates &&
self.account_updates.is_empty() &&
self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
{
@@ -291,11 +375,6 @@ where
}
}
// Process any remaining pending account updates.
if !self.pending_account_updates.is_empty() {
self.process_updates()?;
}
debug!(target: "engine::root", "All proofs processed, ending calculation");
let start = Instant::now();
@@ -311,6 +390,21 @@ where
Ok(StateRootComputeOutcome { state_root, trie_updates })
}
fn on_multiproof_message(&mut self, message: MultiProofMessage) {
match message {
MultiProofMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
MultiProofMessage::StateUpdate(_, state) => self.on_state_update(state),
MultiProofMessage::EmptyProof { .. } => unreachable!(),
MultiProofMessage::BlockAccessList(_) => todo!(),
MultiProofMessage::FinishedStateUpdates => self.finished_state_updates = true,
}
}
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn on_prewarm_targets(&mut self, targets: VersionedMultiProofTargets) {
let VersionedMultiProofTargets::V2(targets) = targets else {
unreachable!("sparse trie as cache must only be used with V2 multiproof targets");
@@ -346,11 +440,7 @@ where
)]
fn on_state_update(&mut self, update: EvmState) {
let hashed_state_update = evm_state_to_hashed_post_state(update);
self.on_hashed_state_update(hashed_state_update)
}
/// Processes a hashed state update and encodes all state changes as trie updates.
fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
for (address, storage) in hashed_state_update.storages {
for (slot, value) in storage.storage {
let encoded = if value.is_zero() {
@@ -388,74 +478,138 @@ where
fn on_proof_result(
&mut self,
result: ProofResultMessage,
result: DecodedMultiProofV2,
) -> Result<(), ParallelStateRootError> {
let ProofResult::V2(result) = result.result? else {
unreachable!("sparse trie as cache must only be used with multiproof v2");
};
self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
})
}
/// Applies updates to the sparse trie and dispatches requested multiproof targets.
fn process_updates(&mut self) -> Result<(), ProviderError> {
let mut targets = MultiProofTargetsV2::default();
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn process_leaf_updates(&mut self) -> SparseTrieResult<()> {
self.pending_updates = 0;
for (addr, updates) in &mut self.storage_updates {
let trie = self.trie.get_or_create_storage_trie_mut(*addr);
let fetched_storage = self.fetched_storage_targets.entry(*addr).or_default();
// Make sure that tries exist for all addresses that have updates.
for address in self.storage_updates.keys() {
self.trie.get_or_create_storage_trie_mut(*address);
}
trie.update_leaves(updates, |path, min_len| match fetched_storage.entry(path) {
let storage_results: Vec<_> = self
.trie
.storage_tries()
.iter_mut()
.filter_map(|(address, trie)| {
let updates = self.storage_updates.remove(address)?;
let fetched = self.fetched_storage_targets.remove(address).unwrap_or_default();
Some((address, updates, fetched, trie))
})
.par_bridge()
.map(|(address, mut updates, mut fetched, trie)| {
let mut targets = Vec::new();
trie.update_leaves(&mut updates, |path, min_len| match fetched.entry(path) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
}
Entry::Vacant(entry) => {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
})?;
SparseTrieResult::Ok((address, targets, fetched, updates))
})
.collect::<Result<Vec<_>, _>>()?;
for (address, targets, fetched, updates) in storage_results {
self.fetched_storage_targets.insert(*address, fetched);
self.storage_updates.insert(*address, updates);
if !targets.is_empty() {
self.pending_targets.storage_targets.entry(*address).or_default().extend(targets);
}
}
// Process account trie updates and fill the account targets.
self.trie.trie_mut().update_leaves(
&mut self.account_updates,
|target, min_len| match self.fetched_account_targets.entry(target) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
targets
.storage_targets
.entry(*addr)
.or_default()
.push(Target::new(path).with_min_len(min_len));
self.pending_targets
.account_targets
.push(Target::new(target).with_min_len(min_len));
}
}
Entry::Vacant(entry) => {
entry.insert(min_len);
targets
.storage_targets
.entry(*addr)
.or_default()
.push(Target::new(path).with_min_len(min_len));
self.pending_targets
.account_targets
.push(Target::new(target).with_min_len(min_len));
}
})
.map_err(ProviderError::other)?;
},
)?;
// If all storage updates were processed, we can now compute the new storage root.
if updates.is_empty() {
let storage_root =
Ok(())
}
/// Applies updates to the sparse trie and dispatches requested multiproof targets.
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn process_updates(&mut self) -> SparseTrieResult<()> {
self.process_leaf_updates()?;
if self.pending_account_updates.is_empty() {
return Ok(());
}
let roots = self
.trie
.storage_tries()
.par_iter_mut()
.filter(|(address, _)| {
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty())
})
.map(|(address, trie)| {
let root =
trie.root().expect("updates are drained, trie should be revealed by now");
// If there is a pending account update for this address with known info, we can
// encode it into proper update right away.
if let Entry::Occupied(entry) = self.pending_account_updates.entry(*addr) &&
entry.get().is_some()
(address, root)
})
.collect::<Vec<_>>();
for (address, storage_root) in roots {
// If the storage root is known and we have a pending update for this account, encode it
// into a proper update.
if let Entry::Occupied(entry) = self.pending_account_updates.entry(*address) &&
entry.get().is_some()
{
let account = entry.remove().expect("just checked, should be Some");
let encoded = if account.is_none_or(|account| account.is_empty()) &&
storage_root == EMPTY_ROOT_HASH
{
let account = entry.remove().expect("just checked, should be Some");
let encoded = if account.is_none_or(|account| account.is_empty()) &&
storage_root == EMPTY_ROOT_HASH
{
Vec::new()
} else {
// TODO: optimize allocation
alloy_rlp::encode(
account.unwrap_or_default().into_trie_account(storage_root),
)
};
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
}
Vec::new()
} else {
// TODO: optimize allocation
alloy_rlp::encode(account.unwrap_or_default().into_trie_account(storage_root))
};
self.account_updates.insert(*address, LeafUpdate::Changed(encoded));
}
}
// Now handle pending account updates that can be upgraded to a proper update.
// Now promote pending account updates if possible.
self.pending_account_updates.retain(|addr, account| {
// If account has pending storage updates, it is still pending.
if self.storage_updates.get(addr).is_some_and(|updates| !updates.is_empty()) {
@@ -475,7 +629,7 @@ where
let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
let (account, storage_root) = if let Some(account) = account.take() {
// If account is Some(_) here it means it didn't have any storage updates
// If account is Some(_) here it means it didn't have any storage updates
// and we can fetch the storage root directly from the account trie.
//
// If it did have storage updates, we would have processed it above when iterating over storage tries.
@@ -500,38 +654,63 @@ where
});
// Process account trie updates and fill the account targets.
self.trie
.trie_mut()
.update_leaves(&mut self.account_updates, |target, min_len| {
match self.fetched_account_targets.entry(target) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
targets.account_targets.push(Target::new(target).with_min_len(min_len));
}
}
Entry::Vacant(entry) => {
self.trie.trie_mut().update_leaves(
&mut self.account_updates,
|target, min_len| match self.fetched_account_targets.entry(target) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
targets.account_targets.push(Target::new(target).with_min_len(min_len));
self.pending_targets
.account_targets
.push(Target::new(target).with_min_len(min_len));
}
}
})
.map_err(ProviderError::other)?;
if !targets.is_empty() {
self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput::V2 {
targets,
proof_result_sender: ProofResultContext::new(
self.proof_result_tx.clone(),
0,
HashedPostState::default(),
Instant::now(),
),
})?;
}
Entry::Vacant(entry) => {
entry.insert(min_len);
self.pending_targets
.account_targets
.push(Target::new(target).with_min_len(min_len));
}
},
)?;
Ok(())
}
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn dispatch_pending_targets(&mut self) {
if !self.pending_targets.is_empty() {
let chunking_length = self.pending_targets.chunking_length();
dispatch_with_chunking(
std::mem::take(&mut self.pending_targets),
chunking_length,
Some(60),
300,
self.proof_worker_handle.available_account_workers(),
self.proof_worker_handle.available_storage_workers(),
MultiProofTargetsV2::chunks,
|proof_targets| {
if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(
AccountMultiproofInput::V2 {
targets: proof_targets,
proof_result_sender: ProofResultContext::new(
self.proof_result_tx.clone(),
0,
HashedPostState::default(),
Instant::now(),
),
},
) {
error!("failed to dispatch account multiproof: {e:?}");
}
},
);
}
}
}
/// Outcome of the state root computation, including the state root itself with
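
The `Entry::Occupied` / `Entry::Vacant` closures passed to `update_leaves` above all follow one rule: a proof target is queued only if the path has never been requested, or if the new request has a strictly lower `min_len` than the lowest one recorded. A minimal sketch of that rule, assuming a `u64` path in place of `B256` and the hypothetical helpers `should_request` and `dispatched_requests`:

```rust
use std::collections::{hash_map::Entry, HashMap};

/// Records a proof request for `path` and reports whether it needs to be dispatched.
/// `fetched` maps each path to the lowest `min_len` requested so far.
fn should_request(fetched: &mut HashMap<u64, u8>, path: u64, min_len: u8) -> bool {
    match fetched.entry(path) {
        Entry::Occupied(mut entry) => {
            if min_len < *entry.get() {
                // A lower `min_len` than anything requested before: request again.
                entry.insert(min_len);
                true
            } else {
                // Already covered by an earlier request.
                false
            }
        }
        Entry::Vacant(entry) => {
            entry.insert(min_len);
            true
        }
    }
}

/// Filters a sequence of `(path, min_len)` requests down to those that would be dispatched.
fn dispatched_requests(requests: &[(u64, u8)]) -> Vec<(u64, u8)> {
    let mut fetched = HashMap::new();
    requests
        .iter()
        .copied()
        .filter(|&(path, min_len)| should_request(&mut fetched, path, min_len))
        .collect()
}
```

Repeated requests with the same or a higher `min_len` are dropped, which is what keeps the cached task from asking the proof workers for the same target twice.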


@@ -402,7 +402,12 @@ where
.in_scope(|| self.evm_env_for(&input))
.map_err(NewPayloadError::other)?;
let env = ExecutionEnv { evm_env, hash: input.hash(), parent_hash: input.parent_hash() };
let env = ExecutionEnv {
evm_env,
hash: input.hash(),
parent_hash: input.parent_hash(),
parent_state_root: parent_block.state_root(),
};
// Plan the strategy used for state root computation.
let strategy = self.plan_state_root_computation();


@@ -1,4 +1,4 @@
use crate::utils::eth_payload_attributes;
use crate::utils::{advance_with_random_transactions, eth_payload_attributes};
use alloy_eips::eip7685::RequestsOrHash;
use alloy_genesis::Genesis;
use alloy_primitives::{Address, B256};
@@ -6,8 +6,9 @@ use alloy_rpc_types_engine::{PayloadAttributes, PayloadStatusEnum};
use jsonrpsee_core::client::ClientT;
use reth_chainspec::{ChainSpecBuilder, EthChainSpec, MAINNET};
use reth_e2e_test_utils::{
node::NodeTestContext, setup, transaction::TransactionTestContext, wallet::Wallet,
node::NodeTestContext, setup, setup_engine, transaction::TransactionTestContext, wallet::Wallet,
};
use reth_node_api::TreeConfig;
use reth_node_builder::{NodeBuilder, NodeHandle};
use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
use reth_node_ethereum::EthereumNode;
@@ -256,3 +257,56 @@ async fn test_testing_build_block_v1_osaka() -> eyre::Result<()> {
Ok(())
}
/// Tests that sparse trie allocation reuse works correctly across consecutive blocks.
///
/// This test exercises the sparse trie allocation reuse path by:
/// 1. Starting a node with parallel state root computation enabled
/// 2. Advancing multiple consecutive blocks with random transactions
/// 3. Verifying that all blocks are successfully validated (state roots match)
///
/// Note: Trie structure reuse is currently disabled due to pruning creating blinded
/// nodes. The preserved trie's allocations are still reused to reduce memory overhead,
/// but the trie is cleared between blocks.
#[tokio::test]
async fn test_sparse_trie_reuse_across_blocks() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
// Use parallel state root (non-legacy) with pruning enabled
let tree_config = TreeConfig::default()
.with_legacy_state_root(false)
.with_sparse_trie_prune_depth(2)
.with_sparse_trie_max_storage_tries(100);
let (mut nodes, _tasks, _wallet) = setup_engine::<EthereumNode>(
1,
Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(serde_json::from_str(include_str!("../assets/genesis.json")).unwrap())
.cancun_activated()
.build(),
),
false,
tree_config,
eth_payload_attributes,
)
.await?;
let mut node = nodes.pop().unwrap();
// Use the thread-local RNG. Note: this is not seeded, so the generated transactions differ between runs.
let mut rng = rand::rng();
// Advance multiple consecutive blocks with random transactions.
// This exercises the sparse trie reuse path where each block's pruned trie
// is reused for the next block's state root computation.
let num_blocks = 5;
advance_with_random_transactions(&mut node, num_blocks, &mut rng, true).await?;
// Verify the chain advanced correctly
let best_block = node.inner.provider.best_block_number()?;
assert_eq!(best_block, num_blocks as u64, "Expected {} blocks, got {}", num_blocks, best_block);
Ok(())
}


@@ -39,7 +39,7 @@ pub struct StorageSettings {
impl StorageSettings {
/// Returns the default base `StorageSettings` for this build.
///
/// When the `edge` feature is enabled, returns [`Self::edge()`].
/// When the `edge` feature is enabled, returns `Self::edge()`.
/// Otherwise, returns [`Self::legacy()`].
pub const fn base() -> Self {
#[cfg(feature = "edge")]


@@ -4,7 +4,7 @@ use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets
use alloy_primitives::B256;
use alloy_rlp::{BufMut, Encodable};
use itertools::Itertools;
use reth_execution_errors::{StateProofError, StorageRootError};
use reth_execution_errors::{SparseTrieError, StateProofError, StorageRootError};
use reth_provider::{DatabaseProviderROFactory, ProviderError};
use reth_storage_errors::db::DatabaseError;
use reth_trie::{
@@ -235,6 +235,9 @@ pub enum ParallelStateRootError {
/// Other unspecified error.
#[error("{_0}")]
Other(String),
/// Sparse trie error.
#[error(transparent)]
SparseTrie(#[from] SparseTrieError),
}
impl From<ParallelStateRootError> for ProviderError {
@@ -245,6 +248,7 @@ impl From<ParallelStateRootError> for ProviderError {
Self::Database(error)
}
ParallelStateRootError::Other(other) => Self::Database(DatabaseError::Other(other)),
ParallelStateRootError::SparseTrie(error) => Self::other(error),
}
}
}


@@ -18,6 +18,17 @@ impl MultiProofTargetsV2 {
pub fn is_empty(&self) -> bool {
self.account_targets.is_empty() && self.storage_targets.is_empty()
}
/// Returns the number of items that will be considered during chunking.
pub fn chunking_length(&self) -> usize {
self.account_targets.len() +
self.storage_targets.values().map(|slots| slots.len()).sum::<usize>()
}
/// Returns an iterator that yields chunks of the specified size.
pub fn chunks(self, chunk_size: usize) -> impl Iterator<Item = Self> {
ChunkedMultiProofTargetsV2::new(self, chunk_size)
}
}
/// An iterator that yields chunks of V2 proof targets of at most `size` account and storage
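
As a rough illustration of what `chunking_length` counts, the sketch below reproduces the same arithmetic on a simplified type. `ToyTargets` and `sample_targets` are hypothetical, with `u64` keys and values in place of the real target types. One consequence worth noting: an account entry with an empty slot list adds nothing to the length, yet still makes `is_empty` return `false`.

```rust
use std::collections::HashMap;

/// Simplified stand-in for `MultiProofTargetsV2`.
#[derive(Default)]
struct ToyTargets {
    account_targets: Vec<u64>,
    storage_targets: HashMap<u64, Vec<u64>>,
}

impl ToyTargets {
    fn is_empty(&self) -> bool {
        self.account_targets.is_empty() && self.storage_targets.is_empty()
    }

    /// Number of account targets plus the number of storage slots across all accounts.
    fn chunking_length(&self) -> usize {
        self.account_targets.len() +
            self.storage_targets.values().map(|slots| slots.len()).sum::<usize>()
    }
}

/// Two account targets, three storage slots, and one account with no slots.
fn sample_targets() -> ToyTargets {
    let mut targets = ToyTargets::default();
    targets.account_targets.extend([10, 11]);
    targets.storage_targets.insert(10, vec![1, 2]);
    targets.storage_targets.insert(11, vec![3]);
    targets.storage_targets.insert(12, Vec::new());
    targets
}
```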


@@ -123,6 +123,9 @@ pub struct ParallelSparseTrie {
update_actions_buffers: Vec<Vec<SparseTrieUpdatesAction>>,
/// Thresholds controlling when parallelism is enabled for different operations.
parallelism_thresholds: ParallelismThresholds,
/// Tracks heat of lower subtries for smart pruning decisions.
/// Hot subtries are skipped during pruning to keep frequently-used data revealed.
subtrie_heat: SubtrieModifications,
/// Metrics for the parallel sparse trie.
#[cfg(feature = "metrics")]
metrics: crate::metrics::ParallelSparseTrieMetrics,
@@ -141,6 +144,7 @@ impl Default for ParallelSparseTrie {
branch_node_masks: BranchNodeMasksMap::default(),
update_actions_buffers: Vec::default(),
parallelism_thresholds: Default::default(),
subtrie_heat: SubtrieModifications::default(),
#[cfg(feature = "metrics")]
metrics: Default::default(),
}
@@ -909,7 +913,17 @@ impl SparseTrie for ParallelSparseTrie {
}
fn take_updates(&mut self) -> SparseTrieUpdates {
self.updates.take().unwrap_or_default()
match self.updates.take() {
Some(updates) => {
// NOTE: we need to preserve Some case
self.updates = Some(SparseTrieUpdates::with_capacity(
updates.updated_nodes.len(),
updates.removed_nodes.len(),
));
updates
}
None => SparseTrieUpdates::default(),
}
}
fn wipe(&mut self) {
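
The `take_updates` change above keeps update tracking enabled after the updates are taken, instead of leaving `None` behind. A minimal sketch of that pattern, assuming the hypothetical types `ToyUpdates` and `ToyTrie` (plain `Vec`s in place of the real node collections) and the hypothetical helper `take_twice`:

```rust
/// Simplified stand-in for `SparseTrieUpdates`.
#[derive(Debug, Default)]
struct ToyUpdates {
    updated: Vec<u32>,
    removed: Vec<u32>,
}

/// `updates` is `Some` only while update tracking is enabled.
struct ToyTrie {
    updates: Option<ToyUpdates>,
}

impl ToyTrie {
    /// Returns the accumulated updates. If tracking was enabled, it stays enabled:
    /// a fresh, empty container sized after the previous one is left in place.
    fn take_updates(&mut self) -> ToyUpdates {
        match self.updates.take() {
            Some(updates) => {
                self.updates = Some(ToyUpdates {
                    updated: Vec::with_capacity(updates.updated.len()),
                    removed: Vec::with_capacity(updates.removed.len()),
                });
                updates
            }
            None => ToyUpdates::default(),
        }
    }
}

/// Returns how many updates were taken and whether tracking is still enabled afterwards.
fn take_twice(tracking: bool) -> (usize, bool) {
    let mut trie = ToyTrie {
        updates: tracking.then(|| ToyUpdates { updated: vec![1, 2], removed: vec![3] }),
    };
    let first = trie.take_updates();
    (first.updated.len() + first.removed.len(), trie.updates.is_some())
}
```

This matters for a trie that is reused across blocks: with the earlier `take().unwrap_or_default()` form, the first call would silently switch tracking off for every later block.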
@@ -917,6 +931,7 @@ impl SparseTrie for ParallelSparseTrie {
self.lower_subtries = [const { LowerSparseSubtrie::Blind(None) }; NUM_LOWER_SUBTRIES];
self.prefix_set = PrefixSetMut::all();
self.updates = self.updates.is_some().then(SparseTrieUpdates::wiped);
self.subtrie_heat.clear();
}
fn clear(&mut self) {
@@ -928,6 +943,7 @@ impl SparseTrie for ParallelSparseTrie {
self.prefix_set.clear();
self.updates = None;
self.branch_node_masks.clear();
self.subtrie_heat.clear();
// `update_actions_buffers` doesn't need to be cleared; we want to reuse the Vecs it has
// buffered, and all of those are already inherently cleared when they get used.
}
@@ -1032,21 +1048,22 @@ impl SparseTrie for ParallelSparseTrie {
}
impl SparseTrieExt for ParallelSparseTrie {
/// Returns the count of revealed (non-hash) nodes across all subtries.
fn revealed_node_count(&self) -> usize {
let upper_count = self.upper_subtrie.nodes.values().filter(|n| !n.is_hash()).count();
/// O(1) size hint based on total node count (including hash stubs).
fn size_hint(&self) -> usize {
let upper_count = self.upper_subtrie.nodes.len();
let lower_count: usize = self
.lower_subtries
.iter()
.filter_map(|s| s.as_revealed_ref())
.map(|s| s.nodes.values().filter(|n| !n.is_hash()).count())
.map(|s| s.nodes.len())
.sum();
upper_count + lower_count
}
fn prune(&mut self, max_depth: usize) -> usize {
// Decay heat for subtries not modified this cycle
self.subtrie_heat.decay_and_reset();
// DFS traversal to find nodes at max_depth that can be pruned.
// Collects "effective pruned roots" - children of nodes at max_depth with computed hashes.
// We replace nodes with Hash stubs inline during traversal.
@@ -1056,6 +1073,16 @@ impl SparseTrieExt for ParallelSparseTrie {
// DFS traversal: pop path and depth, skip if subtrie or node not found.
while let Some((path, depth)) = stack.pop() {
// Skip traversal into hot lower subtries beyond max_depth.
// At max_depth, we still need to process the node to convert children to hashes.
// This keeps frequently-modified subtries revealed to avoid expensive re-reveals.
if depth > max_depth &&
let SparseSubtrieType::Lower(idx) = SparseSubtrieType::from_path(&path) &&
self.subtrie_heat.is_hot(idx)
{
continue;
}
// Get children to visit from current node (immutable access)
let children: SmallVec<[Nibbles; 16]> = {
let Some(subtrie) = self.subtrie_for_path(&path) else { continue };
@@ -1096,10 +1123,11 @@ impl SparseTrieExt for ParallelSparseTrie {
.and_then(|n| n.hash());
if let Some(hash) = hash {
self.subtrie_for_path_mut(&child)
.nodes
.insert(child, SparseNode::Hash(hash));
effective_pruned_roots.push((child, hash));
// Use untracked access to avoid marking subtrie as modified during pruning
if let Some(subtrie) = self.subtrie_for_path_mut_untracked(&child) {
subtrie.nodes.insert(child, SparseNode::Hash(hash));
effective_pruned_roots.push((child, hash));
}
}
} else {
stack.push((child, depth + 1));
@@ -1211,8 +1239,8 @@ impl SparseTrieExt for ParallelSparseTrie {
Ok(()) => {}
Err(e) => {
if let Some(path) = Self::get_retriable_path(&e) {
let target_key = Self::nibbles_to_padded_b256(&path);
let min_len = (path.len() as u8).min(64);
let (target_key, min_len) =
Self::proof_target_for_path(key, &full_path, &path);
proof_required_fn(target_key, min_len);
updates.insert(key, LeafUpdate::Changed(value));
} else {
@@ -1225,8 +1253,8 @@ impl SparseTrieExt for ParallelSparseTrie {
if let Err(e) = self.update_leaf(full_path, value.clone(), NoRevealProvider)
{
if let Some(path) = Self::get_retriable_path(&e) {
let target_key = Self::nibbles_to_padded_b256(&path);
let min_len = (path.len() as u8).min(64);
let (target_key, min_len) =
Self::proof_target_for_path(key, &full_path, &path);
proof_required_fn(target_key, min_len);
updates.insert(key, LeafUpdate::Changed(value));
} else {
@@ -1239,8 +1267,8 @@ impl SparseTrieExt for ParallelSparseTrie {
// Touched is read-only: check if path is accessible, request proof if blinded.
match self.find_leaf(&full_path, None) {
Err(LeafLookupError::BlindedNode { path, .. }) => {
let target_key = Self::nibbles_to_padded_b256(&path);
let min_len = (path.len() as u8).min(64);
let (target_key, min_len) =
Self::proof_target_for_path(key, &full_path, &path);
proof_required_fn(target_key, min_len);
updates.insert(key, LeafUpdate::Touched);
}
@@ -1307,6 +1335,18 @@ impl ParallelSparseTrie {
B256::from(bytes)
}
/// Computes the proof target key and `min_len` for a blinded node error.
///
/// Returns `(target_key, min_len)` where:
/// - `target_key` is `full_key` if `path` is a prefix of `full_path`, otherwise the padded path
/// - `min_len` is always based on `path.len()`
fn proof_target_for_path(full_key: B256, full_path: &Nibbles, path: &Nibbles) -> (B256, u8) {
let min_len = path.len().min(64) as u8;
let target_key =
if full_path.starts_with(path) { full_key } else { Self::nibbles_to_padded_b256(path) };
(target_key, min_len)
}
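
The key-selection rule in `proof_target_for_path` can be tried in isolation. The following is a minimal sketch, not the crate's code: it assumes nibble paths are plain `&[u8]` slices and keys are `[u8; 32]`, and `pad_nibbles`, `unpack_nibbles`, and `proof_target` are hypothetical stand-ins (the real `nibbles_to_padded_b256` body is not fully visible in this diff, so zero padding is an assumption).

```rust
/// Packs up to 64 nibbles into 32 bytes, zero-padding the tail.
/// Hypothetical stand-in for `nibbles_to_padded_b256` (zero padding is assumed).
fn pad_nibbles(path: &[u8]) -> [u8; 32] {
    let mut bytes = [0u8; 32];
    for (i, &nibble) in path.iter().take(64).enumerate() {
        let nibble = nibble & 0x0f;
        // Even positions fill the high half of a byte, odd positions the low half.
        bytes[i / 2] |= if i % 2 == 0 { nibble << 4 } else { nibble };
    }
    bytes
}

/// Expands a 32-byte key into its 64 nibbles, high nibble first.
fn unpack_nibbles(key: &[u8; 32]) -> Vec<u8> {
    key.iter().flat_map(|&b| [b >> 4, b & 0x0f]).collect()
}

/// Returns `(target_key, min_len)`: the original key when `path` is a prefix
/// of the key's nibble path, otherwise the zero-padded `path`.
fn proof_target(full_key: [u8; 32], path: &[u8]) -> ([u8; 32], u8) {
    let full_path = unpack_nibbles(&full_key);
    // Clamp before narrowing so an oversized length cannot wrap around.
    let min_len = path.len().min(64) as u8;
    let target = if full_path.starts_with(path) { full_key } else { pad_nibbles(path) };
    (target, min_len)
}
```

Returning the original key in the prefix case keeps the proof target associated with the key used for cache lookups, as the commit message describes; `min_len` depends only on the blinded path in both cases.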
/// Rolls back a partial update by removing the value, removing any inserted nodes,
/// removing any inserted branch masks, and restoring any modified original node.
/// This ensures `update_leaf` is atomic - either it succeeds completely or leaves the trie
@@ -1381,6 +1421,7 @@ impl ParallelSparseTrie {
SparseSubtrieType::Upper => None,
SparseSubtrieType::Lower(idx) => {
self.lower_subtries[idx].reveal(path);
self.subtrie_heat.mark_modified(idx);
Some(self.lower_subtries[idx].as_revealed_mut().expect("just revealed"))
}
}
@@ -1416,6 +1457,19 @@ impl ParallelSparseTrie {
}
}
/// Returns a mutable reference to a subtrie without marking it as modified.
/// Used for internal operations like pruning that shouldn't affect heat tracking.
fn subtrie_for_path_mut_untracked(&mut self, path: &Nibbles) -> Option<&mut SparseSubtrie> {
if SparseSubtrieType::path_len_is_upper(path.len()) {
Some(&mut self.upper_subtrie)
} else {
match SparseSubtrieType::from_path(path) {
SparseSubtrieType::Upper => None,
SparseSubtrieType::Lower(idx) => self.lower_subtries[idx].as_revealed_mut(),
}
}
}
/// Returns the next node in the traversal path from the given path towards the leaf for the
/// given full leaf path, or an error if any node along the traversal path is not revealed.
///
@@ -2052,10 +2106,93 @@ impl ParallelSparseTrie {
}
self.lower_subtries[index] = LowerSparseSubtrie::Revealed(subtrie);
self.subtrie_heat.mark_modified(index);
}
}
}
/// Bitset tracking which of the 256 lower subtries were modified in the current cycle.
#[derive(Clone, Default, PartialEq, Eq, Debug)]
struct ModifiedSubtries([u64; 4]);
impl ModifiedSubtries {
/// Marks a subtrie index as modified.
#[inline]
fn set(&mut self, idx: usize) {
debug_assert!(idx < NUM_LOWER_SUBTRIES);
self.0[idx >> 6] |= 1 << (idx & 63);
}
/// Returns whether a subtrie index is marked as modified.
#[inline]
fn get(&self, idx: usize) -> bool {
debug_assert!(idx < NUM_LOWER_SUBTRIES);
(self.0[idx >> 6] & (1 << (idx & 63))) != 0
}
/// Clears all modification flags.
#[inline]
const fn clear(&mut self) {
self.0 = [0; 4];
}
}
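
The word/bit indexing used by `ModifiedSubtries` can be checked with a standalone copy. This is a sketch under assumptions: `Bitset256`, `NUM_SLOTS`, and `count` are hypothetical names introduced for illustration, with `NUM_SLOTS` standing in for `NUM_LOWER_SUBTRIES`.

```rust
/// Number of tracked slots; stands in for `NUM_LOWER_SUBTRIES` (assumed 256).
const NUM_SLOTS: usize = 256;

/// 256-bit set packed into four 64-bit words (hypothetical standalone copy).
#[derive(Clone, Default, Debug, PartialEq, Eq)]
struct Bitset256([u64; 4]);

impl Bitset256 {
    /// Sets bit `idx`: word `idx / 64` (`idx >> 6`), bit `idx % 64` (`idx & 63`).
    fn set(&mut self, idx: usize) {
        assert!(idx < NUM_SLOTS);
        self.0[idx >> 6] |= 1u64 << (idx & 63);
    }

    /// Returns whether bit `idx` is set.
    fn get(&self, idx: usize) -> bool {
        assert!(idx < NUM_SLOTS);
        (self.0[idx >> 6] & (1u64 << (idx & 63))) != 0
    }

    /// Clears every bit.
    fn clear(&mut self) {
        self.0 = [0; 4];
    }

    /// Number of set bits (helper added for the example only).
    fn count(&self) -> u32 {
        self.0.iter().map(|word| word.count_ones()).sum()
    }
}
```

Four words cover exactly 256 indices, so the whole set is 32 bytes and clearing it is a single array assignment.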
/// Tracks heat (modification frequency) for each of the 256 lower subtries.
///
/// Heat is used to avoid pruning frequently-modified subtries, which would cause
/// expensive re-reveal operations on subsequent updates.
///
/// - Heat is incremented by 1 when a subtrie is modified
/// - Heat decays by 1 each prune cycle for subtries not modified that cycle
/// - Subtries with heat > 0 are considered "hot" and skipped during pruning
#[derive(Clone, PartialEq, Eq, Debug)]
struct SubtrieModifications {
/// Heat level (0-255) for each of the 256 lower subtries.
heat: [u8; NUM_LOWER_SUBTRIES],
/// Tracks which subtries were modified in the current cycle.
modified: ModifiedSubtries,
}
impl Default for SubtrieModifications {
fn default() -> Self {
Self { heat: [0; NUM_LOWER_SUBTRIES], modified: ModifiedSubtries::default() }
}
}
impl SubtrieModifications {
/// Marks a subtrie as modified, incrementing its heat by 1.
#[inline]
fn mark_modified(&mut self, idx: usize) {
debug_assert!(idx < NUM_LOWER_SUBTRIES);
self.modified.set(idx);
self.heat[idx] = self.heat[idx].saturating_add(1);
}
/// Returns whether a subtrie is currently hot (heat > 0).
#[inline]
fn is_hot(&self, idx: usize) -> bool {
debug_assert!(idx < NUM_LOWER_SUBTRIES);
self.heat[idx] > 0
}
/// Decays heat for subtries not modified this cycle and resets modification tracking.
/// Called at the start of each prune cycle.
fn decay_and_reset(&mut self) {
for (idx, heat) in self.heat.iter_mut().enumerate() {
if !self.modified.get(idx) {
*heat = heat.saturating_sub(1);
}
}
self.modified.clear();
}
/// Clears all heat tracking state.
const fn clear(&mut self) {
self.heat = [0; NUM_LOWER_SUBTRIES];
self.modified.clear();
}
}
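
The interaction between `mark_modified` and `decay_and_reset` explains why the updated tests call `prune` twice. Below is a simplified sketch, assuming a `[bool; 256]` array in place of the bitset; `HeatTracker` and `NUM_SLOTS` are hypothetical names, not part of the diff.

```rust
/// Stands in for `NUM_LOWER_SUBTRIES` (assumed 256).
const NUM_SLOTS: usize = 256;

/// Simplified heat tracker (hypothetical copy of `SubtrieModifications`).
struct HeatTracker {
    heat: [u8; NUM_SLOTS],
    modified: [bool; NUM_SLOTS],
}

impl HeatTracker {
    fn new() -> Self {
        Self { heat: [0; NUM_SLOTS], modified: [false; NUM_SLOTS] }
    }

    /// Records a modification and bumps heat by 1, saturating at 255.
    fn mark_modified(&mut self, idx: usize) {
        self.modified[idx] = true;
        self.heat[idx] = self.heat[idx].saturating_add(1);
    }

    /// A slot is hot while its heat is above zero.
    fn is_hot(&self, idx: usize) -> bool {
        self.heat[idx] > 0
    }

    /// Decays heat only for slots untouched this cycle, then starts a new cycle.
    fn decay_and_reset(&mut self) {
        for (heat, modified) in self.heat.iter_mut().zip(self.modified.iter_mut()) {
            if !*modified {
                *heat = heat.saturating_sub(1);
            }
            *modified = false;
        }
    }
}
```

A slot modified once keeps its heat through the first `decay_and_reset` (it was modified in that cycle) and only cools to zero on the second, provided it is not touched in between.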
/// This is a subtrie of the [`ParallelSparseTrie`] that contains a map from path to sparse trie
/// nodes.
#[derive(Clone, PartialEq, Eq, Debug, Default)]
@@ -7744,7 +7881,11 @@ mod tests {
#[test]
fn test_prune_at_various_depths() {
for max_depth in [0, 1, 2] {
// Test depths 0 and 1, which are in the Upper subtrie (no heat tracking).
// Depth 2 is the boundary where Lower subtries start (UPPER_TRIE_MAX_DEPTH=2),
// and with `depth >= max_depth` heat check, hot Lower subtries at depth 2
// are protected from pruning traversal.
for max_depth in [0, 1] {
let provider = DefaultTrieNodeProvider;
let mut trie = ParallelSparseTrie::default();
@@ -7764,21 +7905,27 @@ mod tests {
}
let root_before = trie.root();
let nodes_before = trie.revealed_node_count();
let nodes_before = trie.size_hint();
trie.prune(max_depth);
// Prune multiple times to allow heat to fully decay.
// Heat starts at 1 and decays by 1 each cycle for unmodified subtries,
// so we need 2 prune cycles: 1→0, then actual prune.
for _ in 0..2 {
trie.prune(max_depth);
}
let root_after = trie.root();
assert_eq!(root_before, root_after, "root hash should be preserved after prune");
let nodes_after = trie.revealed_node_count();
let nodes_after = trie.size_hint();
assert!(
nodes_after < nodes_before,
"node count should decrease after prune at depth {max_depth}"
);
if max_depth == 0 {
assert_eq!(nodes_after, 1, "only root should be revealed after prune(0)");
// Root + 4 hash stubs for children at [0], [1], [2], [3]
assert_eq!(nodes_after, 5, "root + 4 hash stubs after prune(0)");
}
}
}
@@ -7824,13 +7971,13 @@ mod tests {
trie.update_leaf(Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]), value, &provider).unwrap();
let root_before = trie.root();
let nodes_before = trie.revealed_node_count();
let nodes_before = trie.size_hint();
trie.prune(0);
let root_after = trie.root();
assert_eq!(root_before, root_after, "root hash should be preserved");
assert_eq!(trie.revealed_node_count(), nodes_before, "single leaf trie should not change");
assert_eq!(trie.size_hint(), nodes_before, "single leaf trie should not change");
}
#[test]
@@ -7846,11 +7993,11 @@ mod tests {
}
trie.root();
let nodes_before = trie.revealed_node_count();
let nodes_before = trie.size_hint();
trie.prune(100);
assert_eq!(nodes_before, trie.revealed_node_count(), "deep prune should have no effect");
assert_eq!(nodes_before, trie.size_hint(), "deep prune should have no effect");
}
#[test]
@@ -7866,10 +8013,16 @@ mod tests {
.unwrap();
let root_before = trie.root();
trie.prune(1);
// Prune multiple times to allow heat to fully decay.
// Heat starts at 1 and decays by 1 each cycle for unmodified subtries,
// so we need 2 prune cycles: 1→0, then actual prune.
for _ in 0..2 {
trie.prune(1);
}
assert_eq!(root_before, trie.root(), "root hash should be preserved");
assert_eq!(trie.revealed_node_count(), 2, "should have root + extension after prune(1)");
// Root + extension + 2 hash stubs (for the two leaves' parent branches)
assert_eq!(trie.size_hint(), 4, "root + extension + hash stubs after prune(1)");
}
#[test]
@@ -7882,13 +8035,13 @@ mod tests {
trie.update_leaf(Nibbles::from_nibbles([0x1]), small_value, &provider).unwrap();
let root_before = trie.root();
let nodes_before = trie.revealed_node_count();
let nodes_before = trie.size_hint();
trie.prune(0);
assert_eq!(root_before, trie.root(), "root hash must be preserved");
if trie.revealed_node_count() == nodes_before {
if trie.size_hint() == nodes_before {
assert!(trie.get_leaf_value(&Nibbles::from_nibbles([0x0])).is_some());
assert!(trie.get_leaf_value(&Nibbles::from_nibbles([0x1])).is_some());
}
@@ -7932,9 +8085,15 @@ mod tests {
}
let root_before = trie.root();
let pruned = trie.prune(1);
assert!(pruned > 0, "should have pruned some nodes");
// Prune multiple times to allow heat to fully decay.
// Heat starts at 1 and decays by 1 each cycle for unmodified subtries.
let mut total_pruned = 0;
for _ in 0..2 {
total_pruned += trie.prune(1);
}
assert!(total_pruned > 0, "should have pruned some nodes");
assert_eq!(root_before, trie.root(), "root hash should be preserved");
for key in &keys {
@@ -7956,14 +8115,14 @@ mod tests {
}
trie.root();
let nodes_before = trie.revealed_node_count();
let nodes_before = trie.size_hint();
// If depth were truncated to u8, 300 would become 44 and might prune something
trie.prune(300);
assert_eq!(
nodes_before,
trie.revealed_node_count(),
trie.size_hint(),
"prune(300) should have no effect on a shallow trie"
);
}

@@ -5,12 +5,6 @@
extern crate alloc;
/// Default depth to prune sparse tries to for cross-payload caching.
pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
/// Default number of storage tries to preserve across payload validations.
pub const DEFAULT_MAX_PRESERVED_STORAGE_TRIES: usize = 100;
mod state;
pub use state::*;

@@ -19,6 +19,8 @@ use reth_trie_common::{
Nibbles, ProofTrieNode, RlpNode, StorageMultiProof, TrieAccount, TrieNode, EMPTY_ROOT_HASH,
TRIE_ACCOUNT_RLP_MAX_SIZE,
};
#[cfg(feature = "std")]
use tracing::debug;
use tracing::{instrument, trace};
/// Provides type-safe re-use of cleared [`SparseStateTrie`]s, which helps to save allocations
@@ -44,38 +46,25 @@ where
Self(trie)
}
/// Shrink the cleared sparse trie's capacity to the given node and value size.
/// This helps reduce memory usage when the trie has excess capacity.
/// The capacity is distributed equally across the account trie and all storage tries.
pub fn shrink_to(&mut self, node_size: usize, value_size: usize) {
// Count total number of storage tries (active + cleared + default)
let storage_tries_count = self.0.storage.tries.len() + self.0.storage.cleared_tries.len();
// Total tries = 1 account trie + all storage tries
let total_tries = 1 + storage_tries_count;
// Distribute capacity equally among all tries
let node_size_per_trie = node_size / total_tries;
let value_size_per_trie = value_size / total_tries;
// Shrink the account trie
self.0.state.shrink_nodes_to(node_size_per_trie);
self.0.state.shrink_values_to(value_size_per_trie);
// Give storage tries the remaining capacity after account trie allocation
let storage_node_size = node_size.saturating_sub(node_size_per_trie);
let storage_value_size = value_size.saturating_sub(value_size_per_trie);
// Shrink all storage tries (they will redistribute internally)
self.0.storage.shrink_to(storage_node_size, storage_value_size);
}
/// Returns the cleared [`SparseStateTrie`], consuming this instance.
pub fn into_inner(self) -> SparseStateTrie<A, S> {
self.0
}
}
impl<A, S> ClearedSparseStateTrie<A, S>
where
A: SparseTrieTrait + SparseTrieExt + Default,
S: SparseTrieTrait + SparseTrieExt + Default + Clone,
{
/// Shrink the cleared sparse trie's capacity to the given node and value size.
///
/// Delegates to the inner `SparseStateTrie::shrink_to`.
pub fn shrink_to(&mut self, max_nodes: usize, max_values: usize) {
self.0.shrink_to(max_nodes, max_values);
}
}
#[derive(Debug)]
/// Sparse state trie representing lazy-loaded Ethereum state trie.
pub struct SparseStateTrie<
@@ -224,11 +213,23 @@ where
self.storage.tries.remove(address)
}
/// Takes the storage trie for the provided address, creating a blind one if it doesn't exist.
pub fn take_or_create_storage_trie(&mut self, address: &B256) -> RevealableSparseTrie<S> {
self.storage.tries.remove(address).unwrap_or_else(|| {
self.storage.cleared_tries.pop().unwrap_or_else(|| self.storage.default_trie.clone())
})
}
/// Inserts storage trie for the provided address.
pub fn insert_storage_trie(&mut self, address: B256, storage_trie: RevealableSparseTrie<S>) {
self.storage.tries.insert(address, storage_trie);
}
/// Returns mutable reference to storage tries.
pub const fn storage_tries(&mut self) -> &mut B256Map<RevealableSparseTrie<S>> {
&mut self.storage.tries
}
/// Returns mutable reference to storage sparse trie, creating a blind one if it doesn't exist.
pub fn get_or_create_storage_trie_mut(
&mut self,
@@ -271,6 +272,8 @@ where
{
for (account, storage_subtree) in storages {
self.reveal_decoded_storage_multiproof(account, storage_subtree)?;
// Record that this storage trie was accessed this cycle; heat is updated on the next prune
self.storage.modifications.mark_accessed(account);
}
Ok(())
@@ -313,6 +316,8 @@ where
for (account, revealed_nodes, trie, result) in results {
self.storage.revealed_paths.insert(account, revealed_nodes);
self.storage.tries.insert(account, trie);
// Record that this storage trie was accessed this cycle; heat is updated on the next prune
self.storage.modifications.mark_accessed(account);
if let Ok(_metric_values) = result {
#[cfg(feature = "metrics")]
{
@@ -353,6 +358,8 @@ where
{
for (account, storage_proofs) in multiproof.storage_proofs {
self.reveal_storage_v2_proof_nodes(account, storage_proofs)?;
// Record that this storage trie was accessed this cycle; heat is updated on the next prune
self.storage.modifications.mark_accessed(account);
}
Ok(())
@@ -393,6 +400,8 @@ where
for (account, result, revealed_nodes, trie) in results {
self.storage.revealed_paths.insert(account, revealed_nodes);
self.storage.tries.insert(account, trie);
// Record that this storage trie was accessed this cycle; heat is updated on the next prune
self.storage.modifications.mark_accessed(account);
if let Ok(_metric_values) = result {
#[cfg(feature = "metrics")]
{
@@ -993,23 +1002,42 @@ where
A: SparseTrieTrait + SparseTrieExt + Default,
S: SparseTrieTrait + SparseTrieExt + Default + Clone,
{
/// Minimum number of storage tries before parallel pruning is enabled.
#[cfg(feature = "std")]
const PARALLEL_PRUNE_THRESHOLD: usize = 16;
/// Clears all trie data while preserving allocations for reuse.
///
/// This resets the trie to an empty state but keeps the underlying memory allocations,
/// which can significantly reduce allocation overhead when the trie is reused.
pub fn clear(&mut self) {
self.state = core::mem::take(&mut self.state).clear();
self.revealed_account_paths.clear();
self.storage.clear();
self.account_rlp_buf.clear();
}
/// Returns true if parallelism should be enabled for pruning the given number of tries.
/// Will always return false in `no_std` builds.
const fn is_prune_parallelism_enabled(num_tries: usize) -> bool {
#[cfg(not(feature = "std"))]
{
let _ = num_tries;
return false;
}
/// Shrinks the capacity of the sparse trie to the given node and value sizes.
///
/// This helps reduce memory usage when the trie has excess capacity.
/// Distributes capacity equally among all tries (account + storage).
pub fn shrink_to(&mut self, max_nodes: usize, max_values: usize) {
// Count total number of storage tries (active + cleared)
let storage_tries_count = self.storage.tries.len() + self.storage.cleared_tries.len();
#[cfg(feature = "std")]
{
num_tries >= Self::PARALLEL_PRUNE_THRESHOLD
}
// Total tries = 1 account trie + all storage tries
let total_tries = 1 + storage_tries_count;
// Distribute capacity equally among all tries
let nodes_per_trie = max_nodes / total_tries;
let values_per_trie = max_values / total_tries;
// Shrink the account trie
self.state.shrink_nodes_to(nodes_per_trie);
self.state.shrink_values_to(values_per_trie);
// Give storage tries the remaining capacity after account trie allocation
let storage_nodes = max_nodes.saturating_sub(nodes_per_trie);
let storage_values = max_values.saturating_sub(values_per_trie);
// Shrink all storage tries (they will redistribute internally)
self.storage.shrink_to(storage_nodes, storage_values);
}
/// Prunes the account trie and selected storage tries to reduce memory usage.
@@ -1025,84 +1053,21 @@ where
/// # Effects
///
/// - Clears `revealed_account_paths` and `revealed_paths` for all storage tries
#[cfg(feature = "std")]
#[instrument(target = "trie::sparse", skip_all, fields(max_depth, max_storage_tries))]
pub fn prune(&mut self, max_depth: usize, max_storage_tries: usize) {
if let Some(trie) = self.state.as_revealed_mut() {
trie.prune(max_depth);
}
self.revealed_account_paths.clear();
let mut storage_trie_counts: Vec<(B256, usize)> = self
.storage
.tries
.iter()
.map(|(hash, trie)| {
let count = match trie {
RevealableSparseTrie::Revealed(t) => t.revealed_node_count(),
RevealableSparseTrie::Blind(_) => 0,
};
(*hash, count)
})
.collect();
// Use O(n) selection instead of O(n log n) sort
let tries_to_keep: HashSet<B256> = if storage_trie_counts.len() <= max_storage_tries {
storage_trie_counts.iter().map(|(hash, _)| *hash).collect()
} else {
storage_trie_counts
.select_nth_unstable_by(max_storage_tries.saturating_sub(1), |a, b| b.1.cmp(&a.1));
storage_trie_counts[..max_storage_tries].iter().map(|(hash, _)| *hash).collect()
};
// Collect keys to avoid borrow conflict
let tries_to_clear: Vec<B256> = self
.storage
.tries
.keys()
.filter(|hash| !tries_to_keep.contains(*hash))
.copied()
.collect();
// Evict storage tries that exceeded limit, saving cleared allocations for reuse
for hash in tries_to_clear {
if let Some(trie) = self.storage.tries.remove(&hash) {
self.storage.cleared_tries.push(trie.clear());
}
if let Some(mut paths) = self.storage.revealed_paths.remove(&hash) {
paths.clear();
self.storage.cleared_revealed_paths.push(paths);
}
}
// Prune storage tries that are kept
if Self::is_prune_parallelism_enabled(tries_to_keep.len()) {
#[cfg(feature = "std")]
{
use rayon::prelude::*;
self.storage.tries.par_iter_mut().for_each(|(hash, trie)| {
if tries_to_keep.contains(hash) &&
let Some(t) = trie.as_revealed_mut()
{
t.prune(max_depth);
}
});
}
} else {
for hash in &tries_to_keep {
if let Some(trie) =
self.storage.tries.get_mut(hash).and_then(|t| t.as_revealed_mut())
{
// Prune state and storage tries in parallel
rayon::join(
|| {
if let Some(trie) = self.state.as_revealed_mut() {
trie.prune(max_depth);
}
}
}
// Clear revealed_paths for kept tries
for hash in &tries_to_keep {
if let Some(paths) = self.storage.revealed_paths.get_mut(hash) {
paths.clear();
}
}
self.revealed_account_paths.clear();
},
|| {
self.storage.prune(max_depth, max_storage_tries);
},
);
}
}
@@ -1121,6 +1086,119 @@ struct StorageTries<S = SerialSparseTrie> {
cleared_revealed_paths: Vec<HashSet<Nibbles>>,
/// A default cleared trie instance, which will be cloned when creating new tries.
default_trie: RevealableSparseTrie<S>,
/// Tracks access patterns and modification state of storage tries for smart pruning decisions.
modifications: StorageTrieModifications,
}
#[cfg(feature = "std")]
impl<S: SparseTrieTrait + SparseTrieExt> StorageTries<S> {
/// Prunes and evicts storage tries.
///
/// Keeps the top `max_storage_tries` by a score combining size and heat.
/// Evicts lower-scored tries entirely, prunes kept tries to `max_depth`.
fn prune(&mut self, max_depth: usize, max_storage_tries: usize) {
let fn_start = std::time::Instant::now();
let mut stats =
StorageTriesPruneStats { total_tries_before: self.tries.len(), ..Default::default() };
// Update heat for accessed tries
self.modifications.update_and_reset();
// Collect (address, size, score) for all tries
// Score = size * heat_multiplier
// Hot tries (high heat) get boosted weight
let mut trie_info: Vec<(B256, usize, usize)> = self
.tries
.iter()
.map(|(address, trie)| {
let size = match trie {
RevealableSparseTrie::Blind(_) => return (*address, 0, 0),
RevealableSparseTrie::Revealed(t) => t.size_hint(),
};
let heat = self.modifications.heat(address);
// Heat multiplier: 1 (cold) to 3 (very hot, heat >= 4)
let heat_multiplier = 1 + (heat.min(4) / 2) as usize;
(*address, size, size * heat_multiplier)
})
.collect();
// Use O(n) selection to find top max_storage_tries by score
if trie_info.len() > max_storage_tries {
trie_info
.select_nth_unstable_by(max_storage_tries.saturating_sub(1), |a, b| b.2.cmp(&a.2));
trie_info.truncate(max_storage_tries);
}
let tries_to_keep: B256Map<usize> =
trie_info.iter().map(|(address, size, _)| (*address, *size)).collect();
stats.tries_to_keep = tries_to_keep.len();
// Collect keys to evict
let tries_to_clear: Vec<B256> =
self.tries.keys().filter(|addr| !tries_to_keep.contains_key(*addr)).copied().collect();
stats.tries_to_evict = tries_to_clear.len();
// Evict storage tries that exceeded limit, saving cleared allocations for reuse
for address in &tries_to_clear {
if let Some(trie) = self.tries.remove(address) {
self.cleared_tries.push(trie.clear());
}
if let Some(mut paths) = self.revealed_paths.remove(address) {
paths.clear();
self.cleared_revealed_paths.push(paths);
}
self.modifications.remove(address);
}
// Prune storage tries that are kept, but only if:
// - They're large enough to be worth pruning
// - Their prune backlog is at least 2, i.e. they were accessed in two or more cycles
//   since their last prune
const MIN_SIZE_TO_PRUNE: usize = 1000;
let prune_start = std::time::Instant::now();
for (address, size) in &tries_to_keep {
if *size < MIN_SIZE_TO_PRUNE {
stats.skipped_small += 1;
continue; // Small tries aren't worth the DFS cost
}
let Some(heat_state) = self.modifications.get_mut(address) else {
continue; // No heat state = not tracked
};
// Only prune if backlog >= 2 (for a trie accessed every cycle, this prunes every other cycle)
if heat_state.prune_backlog < 2 {
stats.skipped_recently_pruned += 1;
continue; // Recently pruned, skip this cycle
}
if let Some(trie) = self.tries.get_mut(address).and_then(|t| t.as_revealed_mut()) {
trie.prune(max_depth);
heat_state.prune_backlog = 0; // Reset backlog after prune
stats.pruned_count += 1;
}
}
stats.prune_elapsed = prune_start.elapsed();
// Clear revealed_paths for kept tries
for hash in tries_to_keep.keys() {
if let Some(paths) = self.revealed_paths.get_mut(hash) {
paths.clear();
}
}
stats.total_tries_after = self.tries.len();
stats.total_elapsed = fn_start.elapsed();
debug!(
target: "trie::sparse",
before = stats.total_tries_before,
after = stats.total_tries_after,
kept = stats.tries_to_keep,
evicted = stats.tries_to_evict,
pruned = stats.pruned_count,
skipped_small = stats.skipped_small,
skipped_recent = stats.skipped_recently_pruned,
?stats.prune_elapsed,
?stats.total_elapsed,
"StorageTries::prune completed"
);
}
}
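
The keep/evict decision in `StorageTries::prune` combines a heat-weighted score with an O(n) partial selection. The sketch below isolates that step under assumptions: entries are `(id, size, heat)` tuples with `u32` ids instead of `B256` addresses, and `heat_multiplier` and `keep_top_k` are hypothetical helper names.

```rust
/// Maps heat to a weight: 1 for heat 0-1, 2 for heat 2-3, 3 for heat >= 4.
fn heat_multiplier(heat: u8) -> usize {
    1 + (heat.min(4) / 2) as usize
}

/// Returns the ids of the `k` highest-scoring entries, sorted by id.
/// Each entry is `(id, size, heat)` and its score is `size * heat_multiplier(heat)`.
fn keep_top_k(mut entries: Vec<(u32, usize, u8)>, k: usize) -> Vec<u32> {
    let score = |e: &(u32, usize, u8)| e.1 * heat_multiplier(e.2);
    if entries.len() > k {
        if k == 0 {
            return Vec::new();
        }
        // Reversed comparison orders by descending score, so after the call the
        // first `k` slots hold the `k` best entries (in unspecified order).
        entries.select_nth_unstable_by(k - 1, |a, b| score(b).cmp(&score(a)));
        entries.truncate(k);
    }
    let mut ids: Vec<u32> = entries.into_iter().map(|e| e.0).collect();
    ids.sort_unstable();
    ids
}
```

With this weighting, a trie of size 60 and heat 4 (score 180) outranks a cold trie of size 100 (score 100), which is the intended bias toward frequently accessed tries. Ties at the cut-off are broken arbitrarily by the selection.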
impl<S: SparseTrieTrait> StorageTries<S> {
@@ -1132,30 +1210,32 @@ impl<S: SparseTrieTrait> StorageTries<S> {
set.clear();
set
}));
self.modifications.clear();
}
/// Shrinks the capacity of all storage tries (active, cleared, and default) to the given sizes.
/// The capacity is distributed equally among all tries that have allocations.
fn shrink_to(&mut self, node_size: usize, value_size: usize) {
// Count total number of tries with capacity (active + cleared + default)
let active_count = self.tries.len();
let cleared_count = self.cleared_tries.len();
let total_tries = 1 + active_count + cleared_count;
/// Shrinks the capacity of all storage tries to the given total sizes.
///
/// Distributes capacity equally among all tries (active + cleared).
fn shrink_to(&mut self, max_nodes: usize, max_values: usize) {
let total_tries = self.tries.len() + self.cleared_tries.len();
if total_tries == 0 {
return;
}
// Distribute capacity equally among all tries
let node_size_per_trie = node_size / total_tries;
let value_size_per_trie = value_size / total_tries;
let nodes_per_trie = max_nodes / total_tries;
let values_per_trie = max_values / total_tries;
// Shrink active storage tries
for trie in self.tries.values_mut() {
trie.shrink_nodes_to(node_size_per_trie);
trie.shrink_values_to(value_size_per_trie);
trie.shrink_nodes_to(nodes_per_trie);
trie.shrink_values_to(values_per_trie);
}
// Shrink cleared storage tries
for trie in &mut self.cleared_tries {
trie.shrink_nodes_to(node_size_per_trie);
trie.shrink_values_to(value_size_per_trie);
trie.shrink_nodes_to(nodes_per_trie);
trie.shrink_values_to(values_per_trie);
}
}
}
@@ -1213,6 +1293,96 @@ impl<S: SparseTrieTrait + Clone> StorageTries<S> {
}
}
/// Statistics from a storage tries prune operation.
#[derive(Debug, Default)]
#[allow(dead_code)]
struct StorageTriesPruneStats {
total_tries_before: usize,
total_tries_after: usize,
tries_to_keep: usize,
tries_to_evict: usize,
pruned_count: usize,
skipped_small: usize,
skipped_recently_pruned: usize,
prune_elapsed: core::time::Duration,
total_elapsed: core::time::Duration,
}
/// Per-trie access tracking and prune state.
///
/// Tracks how frequently a storage trie is accessed and when it was last pruned,
/// enabling smart pruning decisions that preserve frequently-used tries.
#[derive(Debug, Clone, Copy, Default)]
#[allow(dead_code)]
struct TrieModificationState {
/// Access frequency level (0-255). Incremented each cycle the trie is accessed.
/// Used for prioritizing which tries to keep during pruning.
heat: u8,
/// Prune backlog: number of cycles in which the trie was accessed since its last prune.
/// Incremented in each such cycle, reset to 0 when pruned. Used to decide when pruning is needed.
prune_backlog: u8,
}
/// Tracks access patterns and modification state of storage tries for smart pruning decisions.
///
/// Access-based tracking is more accurate than simple generation counting because it tracks
/// actual access patterns rather than administrative operations (take/insert).
///
/// - Access frequency is incremented when a storage proof is revealed (accessed)
/// - Access frequency is not decayed by [`Self::update_and_reset`]; it only grows (saturating
///   at 255) until the trie is evicted or tracking is cleared
/// - Tries with higher access frequency are prioritized for preservation during pruning
#[derive(Debug, Default)]
struct StorageTrieModifications {
/// Access frequency and prune state per storage trie address.
state: B256Map<TrieModificationState>,
/// Tracks which tries were accessed in the current cycle (between prune calls).
accessed_this_cycle: HashSet<B256>,
}
#[allow(dead_code)]
impl StorageTrieModifications {
/// Marks a storage trie as accessed this cycle.
/// Heat and `prune_backlog` are updated in [`Self::update_and_reset`].
#[inline]
fn mark_accessed(&mut self, address: B256) {
self.accessed_this_cycle.insert(address);
}
/// Returns mutable reference to the heat state for a storage trie.
#[inline]
fn get_mut(&mut self, address: &B256) -> Option<&mut TrieModificationState> {
self.state.get_mut(address)
}
/// Returns the heat level for a storage trie (0 if not tracked).
#[inline]
fn heat(&self, address: &B256) -> u8 {
self.state.get(address).map_or(0, |s| s.heat)
}
/// Updates heat and prune backlog for accessed tries.
/// Called at the start of each prune cycle.
fn update_and_reset(&mut self) {
for address in self.accessed_this_cycle.drain() {
let entry = self.state.entry(address).or_default();
entry.heat = entry.heat.saturating_add(1);
entry.prune_backlog = entry.prune_backlog.saturating_add(1);
}
}
/// Removes tracking for a specific address (when trie is evicted).
fn remove(&mut self, address: &B256) {
self.state.remove(address);
self.accessed_this_cycle.remove(address);
}
/// Clears all heat tracking state.
fn clear(&mut self) {
self.state.clear();
self.accessed_this_cycle.clear();
}
}
#[derive(Debug, PartialEq, Eq, Default)]
struct ProofNodesMetricValues {
/// Number of nodes in the proof.

@@ -249,8 +249,12 @@ pub trait SparseTrie: Sized + Debug + Send + Sync {
/// converting nodes beyond a certain depth into hash stubs. This is useful for reducing
/// memory usage when caching tries across payload validations.
pub trait SparseTrieExt: SparseTrie {
/// Returns the number of revealed (non-Hash) nodes in the trie.
fn revealed_node_count(&self) -> usize;
/// Returns a cheap size hint for the trie based on the total number of stored nodes.
/// Implementations may count hash stubs as well as revealed nodes.
///
/// This is used as a heuristic for prioritizing which storage tries to keep
/// during pruning. Larger values indicate larger tries that are more valuable to preserve.
fn size_hint(&self) -> usize;
/// Replaces nodes beyond `max_depth` with hash stubs and removes their descendants.
///
@@ -310,6 +314,17 @@ pub struct SparseTrieUpdates {
pub wiped: bool,
}
impl SparseTrieUpdates {
/// Initialize a [`Self`] with given capacities.
pub fn with_capacity(num_updated_nodes: usize, num_removed_nodes: usize) -> Self {
Self {
updated_nodes: HashMap::with_capacity_and_hasher(num_updated_nodes, Default::default()),
removed_nodes: HashSet::with_capacity_and_hasher(num_removed_nodes, Default::default()),
wiped: false,
}
}
}
/// Error type for a leaf lookup operation
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LeafLookupError {

@@ -979,7 +979,17 @@ impl SparseTrieTrait for SerialSparseTrie {
}
fn take_updates(&mut self) -> SparseTrieUpdates {
self.updates.take().unwrap_or_default()
match self.updates.take() {
Some(updates) => {
// NOTE: keep `self.updates` as `Some` so update tracking stays enabled after taking
self.updates = Some(SparseTrieUpdates::with_capacity(
updates.updated_nodes.len(),
updates.removed_nodes.len(),
));
updates
}
None => SparseTrieUpdates::default(),
}
}
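
The new `take_updates` behaviour (hand out the accumulated updates, but leave tracking enabled with pre-sized buffers) can be sketched on simplified types. `Updates` and `Trie` here are hypothetical stand-ins for `SparseTrieUpdates` and `SerialSparseTrie`, with `u32` keys in place of the real node types.

```rust
use std::collections::{HashMap, HashSet};

/// Simplified stand-in for `SparseTrieUpdates`.
#[derive(Debug, Default)]
struct Updates {
    updated: HashMap<u32, u32>,
    removed: HashSet<u32>,
}

impl Updates {
    /// Pre-sizes both collections, mirroring `SparseTrieUpdates::with_capacity`.
    fn with_capacity(num_updated: usize, num_removed: usize) -> Self {
        Self {
            updated: HashMap::with_capacity(num_updated),
            removed: HashSet::with_capacity(num_removed),
        }
    }
}

/// Simplified trie: `Some` means update tracking is enabled.
struct Trie {
    updates: Option<Updates>,
}

impl Trie {
    /// Takes the accumulated updates while keeping tracking enabled.
    fn take_updates(&mut self) -> Updates {
        match self.updates.take() {
            Some(taken) => {
                // Re-arm tracking, sized after the batch that was just taken.
                self.updates =
                    Some(Updates::with_capacity(taken.updated.len(), taken.removed.len()));
                taken
            }
            // Tracking was disabled: return an empty set and stay disabled.
            None => Updates::default(),
        }
    }
}
```

Compared with `take().unwrap_or_default()`, this keeps a trie that is reused across payloads from silently dropping updates after the first take.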
fn wipe(&mut self) {