Compare commits

..

134 Commits

Author SHA1 Message Date
Arsenii Kulikov
1549e93eac fix 2026-02-01 19:00:59 +04:00
Arsenii Kulikov
48e1270dec wip 2026-02-01 18:19:52 +04:00
Georgios Konstantopoulos
821974a6a6 fix(trie): use full_key in proof_required_fn when path is prefix
When calling proof_required_fn in update_leaves, use the original full_key
instead of the padded path when path is a prefix of full_path. This ensures
proof targets are correctly associated with the original key for cache lookups.

The min_len is always derived from path.len() regardless of which key is used.

Introduces proof_target_for_path helper to DRY up the logic across all three
call sites (removal, update/insert, and touched).
2026-01-31 11:53:26 +00:00
Arsenii Kulikov
6183adb2db Merge branch 'klkvr/sparse-trie-cache' into klkvr/sparse-trie-cache-preserve 2026-01-30 23:40:00 +04:00
Arsenii Kulikov
1b0c54a0a9 wip 2026-01-30 22:50:49 +04:00
Arsenii Kulikov
3aaa8daefc wip 2026-01-30 22:40:27 +04:00
Arsenii Kulikov
bb97b80116 wip 2026-01-30 22:25:33 +04:00
Arsenii Kulikov
81d1aa1eb4 Merge branch 'mattsse/sparse-trie-preservation' into klkvr/sparse-trie-cache-preserve 2026-01-30 21:55:32 +04:00
Arsenii Kulikov
d6d1a090e5 fix 2026-01-30 21:51:55 +04:00
Arsenii Kulikov
b52700dd95 Merge branch 'main' into klkvr/sparse-trie-cache 2026-01-30 21:50:32 +04:00
Matthias Seitz
65201a1abf fix docs 2026-01-30 18:49:43 +01:00
Matthias Seitz
b7cb06b88f update take updates 2026-01-30 18:42:11 +01:00
Arsenii Kulikov
5b86a17331 fix 2026-01-30 21:25:30 +04:00
Arsenii Kulikov
914ab7d5e6 fix 2026-01-30 21:22:53 +04:00
Matthias Seitz
7e38827df5 docs: fix broken intra-doc link in StorageSettings
Amp-Thread-ID: https://ampcode.com/threads/T-019c0fdd-3e07-75c3-aa3d-b5a22cdb19bc
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 18:18:59 +01:00
Arsenii Kulikov
dd2c5b7c53 wip 2026-01-30 21:18:53 +04:00
Matthias Seitz
ed528fb975 fix(trie): update prune test expectations for hash stub counting
Amp-Thread-ID: https://ampcode.com/threads/T-019c0fdd-3e07-75c3-aa3d-b5a22cdb19bc
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 18:12:29 +01:00
Arsenii Kulikov
25a97f0be4 wip 2026-01-30 20:59:13 +04:00
Arsenii Kulikov
844e531a3f wip 2026-01-30 20:55:49 +04:00
Matthias Seitz
ef860c20fb refactor: PreservedSparseTrie as enum, anchor by state root
- Make PreservedSparseTrie an enum with Anchored and Cleared variants
- Anchor trie by computed state_root instead of block_hash
- Add parent_state_root to ExecutionEnv for continuation checks
- Feature-gate prune functions with cfg(feature = "std")
- Add reveal-on-first-try hitrate tracking to update_leaves
- Add prune stats (nodes_converted, hot_subtries_skipped)

Amp-Thread-ID: https://ampcode.com/threads/T-019c0f54-ed9a-7599-a6a2-827a04d388c2
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 17:46:13 +01:00
Arsenii Kulikov
755d1e5f61 wip 2026-01-30 20:45:34 +04:00
Arsenii Kulikov
7d1bd7c9f8 wip 2026-01-30 20:41:19 +04:00
Arsenii Kulikov
93c2455cef wip 2026-01-30 20:39:51 +04:00
Arsenii Kulikov
6c661eb868 instrument 2026-01-30 20:36:25 +04:00
Brian Picciano
f90b5c8a7f fix(trie): cleanup modified branch masks in update_leaf on reveal failure (#21629) 2026-01-30 16:06:28 +00:00
Matthias Seitz
c8b84404ae fix: typo and update revealed_node_count to size_hint in test
Amp-Thread-ID: https://ampcode.com/threads/T-019c0f54-ed9a-7599-a6a2-827a04d388c2
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 16:41:50 +01:00
Arsenii Kulikov
b722a7f360 wip 2026-01-30 19:21:31 +04:00
Chase Wright
d4fa6806b7 fix(ethstats): WSS Handling (#21595)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 15:15:41 +00:00
Matthias Seitz
63742ab4ae fix(debug-client): fix off-by-one in block hash buffer lookup (#21628)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 15:15:26 +00:00
Matthias Seitz
f0eb2aad7c refactor(trie): rename Heat structs to Modified and cleanup
- Rename HotSubtries to ModifiedSubtries
- Rename SubtrieHeat to SubtrieModifications
- Rename TrieHeatState to TrieModificationState
- Rename StorageTrieHeat to StorageTrieModifications
- Merge run/run_inner into single run method in sparse_trie.rs
- Feature-gate rayon::join with cfg(feature = "std")

Amp-Thread-ID: https://ampcode.com/threads/T-019c0f54-ed9a-7599-a6a2-827a04d388c2
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 16:14:41 +01:00
Arsenii Kulikov
ea6da11cf4 wip 2026-01-30 18:55:13 +04:00
Arsenii Kulikov
20029d3c1a wip 2026-01-30 18:52:21 +04:00
Arsenii Kulikov
40eb3bb7b4 wip 2026-01-30 18:42:24 +04:00
Matthias Seitz
8875c8da25 Merge branch 'main' into mattsse/sparse-trie-preservation 2026-01-30 15:35:11 +01:00
Arsenii Kulikov
d87b48c9fc wip 2026-01-30 18:31:52 +04:00
Matthias Seitz
08122bc1ea perf: use biased select and prioritize engine events (#21556)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 14:31:27 +00:00
Arsenii Kulikov
95778a0cc1 wip 2026-01-30 18:26:01 +04:00
Arsenii Kulikov
17359dadd9 wip 2026-01-30 18:19:23 +04:00
Georgios Konstantopoulos
83afaf1aa7 feat(grafana): add gauge panels for save_blocks _last metrics (#21604)
Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
2026-01-30 14:08:32 +00:00
Alexey Shekhirin
d72300c685 fix(net): include disconnect reason in P2PStreamError display (#21626) 2026-01-30 14:04:58 +00:00
Matthias Seitz
faf64c712e feat(cli): add reth db state command for historical contract storage (#21570)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 14:03:19 +00:00
theo
b3d532ce9d chore(op-reth): move op-dependent examples into crates/optimism/examples/ (#21495)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-30 14:02:12 +00:00
Georgios Konstantopoulos
9d064be77e feat(rpc): add EIP-7934 block size validation to testing_buildBlockV1 (#21623)
Co-authored-by: Alexey <alexey@tempo.xyz>
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 13:57:51 +00:00
Arsenii Kulikov
bab01a7bba wip 2026-01-30 17:27:51 +04:00
Arsenii Kulikov
82960045c9 wip 2026-01-30 17:25:15 +04:00
Brian Picciano
513fca16e9 always clear Cleared variant 2026-01-30 14:25:03 +01:00
Brian Picciano
ad242a2002 Restore conditional spawn of SparseTrieCacheTask 2026-01-30 14:21:15 +01:00
Matus Kysel
e3c256340e feat(txpool): add EIP-7594 blob sidecar toggle (#21622) 2026-01-30 12:27:06 +00:00
Arsenii Kulikov
840d6066a6 wip 2026-01-30 16:15:19 +04:00
Brian Picciano
1f315b4f3d Merge remote-tracking branch 'origin/main' into mattsse/sparse-trie-preservation 2026-01-30 13:11:36 +01:00
Arsenii Kulikov
fa604aa3dc wip 2026-01-30 16:02:28 +04:00
ligt
d0df549ddb chore(engine-tree): simplify impl trait bound (#21621) 2026-01-30 11:55:23 +00:00
Arsenii Kulikov
4b851acfdb wip 2026-01-30 15:53:15 +04:00
Arsenii Kulikov
7ccb43ea13 perf: cache fetched proof targets in SparseTrieCacheTask (#21612)
Co-authored-by: Brian Picciano <me@mediocregopher.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 11:44:36 +00:00
Arsenii Kulikov
785933a8ef wip 2026-01-30 15:38:08 +04:00
Arsenii Kulikov
20f48b1e50 fix(proof_v2): make sure that all storage proofs are delivered (#21611)
Co-authored-by: Brian Picciano <me@mediocregopher.com>
2026-01-30 11:21:17 +00:00
Arsenii Kulikov
eb5455de68 wip 2026-01-30 14:38:58 +04:00
Arsenii Kulikov
cb9bf1fe3a wip 2026-01-30 14:34:38 +04:00
Matthias Seitz
99973c7781 chore: add debug println statements for size hints and prune stats
Amp-Thread-ID: https://ampcode.com/threads/T-019c0cef-835a-722d-91d8-5bed02d86747
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 04:44:40 +01:00
Matthias Seitz
9d89d4cb5e feat(trie): add heat tracking to ParallelSparseTrie pruning
- Add SubtrieHeat to track modification frequency of lower subtries
- Hot subtries (heat > 0) are skipped during pruning to avoid expensive re-reveals
- Heat increments by 2 on modification, decays by 1 each prune cycle
- Add subtrie_for_path_mut_untracked for internal operations that shouldn't affect heat
- Update tests to account for heat decay cycles

Amp-Thread-ID: https://ampcode.com/threads/T-019c0cef-835a-722d-91d8-5bed02d86747
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 04:32:25 +01:00
Dan Cline
0470c65e6c feat(cli): add --metrics param to reth prune (#21613)
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 03:24:25 +00:00
Matthias Seitz
f07b37f029 fix: restore shrink_to logic to distribute capacity among all tries
Distribute capacity equally among account trie and all storage tries
(both active and cleared) for proper memory management.

Amp-Thread-ID: https://ampcode.com/threads/T-019c0c7a-dd98-7419-a379-2aff05f2375d
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 04:20:23 +01:00
Matthias Seitz
10b87c8a2e feat(trie): add heat-based tracking with prune backlog for storage tries
- Replace generation-based recency tracking with heat-based tracking
- Track heat (incremented on access) and prune_backlog (cycles since last prune)
- Prune every other cycle using prune_backlog threshold
- Add StorageTriesPruneStats for debug logging of prune operations
- Use debug! logging instead of println for timing information

Amp-Thread-ID: https://ampcode.com/threads/T-019c0c7a-dd98-7419-a379-2aff05f2375d
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 03:05:36 +01:00
Georgios Konstantopoulos
9de1f0905e feat(prune): add static file pruning support for sender recovery (#21598) 2026-01-30 01:09:38 +00:00
Matthias Seitz
095d021969 fix: track heat on storage reveal instead of leaf update
Heat should be tracked when storage proofs are revealed (actual access),
not on leaf updates. This is more accurate because:
- Reveals indicate the storage was actually needed for this block
- Multiple updates to the same storage in one block count as one access
- The modified_this_cycle deduplication handles repeated reveals

Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 02:09:28 +01:00
Matthias Seitz
0c192ef514 refactor: move StorageTrieHeat after StorageTries impl blocks
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 02:01:47 +01:00
Matthias Seitz
946c74a538 feat: replace generation-based recency with heat-based tracking for storage tries
- Add StorageTrieHeat struct with decay mechanism similar to SubtrieHeat
- Heat incremented by 2 on actual leaf updates (not insert/remove operations)
- Heat decays by 1 each prune cycle for unmodified tries
- Score = size * heat_multiplier for smarter eviction decisions
- Only prune hot tries (already pruned cold ones in previous cycles)

Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 02:00:18 +01:00
joshieDo
327a1a6681 test(stages): add pipeline forward sync and unwind test (#21602)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 00:49:28 +00:00
Matthias Seitz
2455fa4ff5 chore: restore SparseTrieUpdates::with_capacity and take_updates pre-allocation
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 01:17:10 +01:00
Matthias Seitz
0585dc1180 chore: add detailed timing debug for StorageTries::prune
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 01:08:32 +01:00
Matthias Seitz
8b985c102a chore: add timing debug for prune branches
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 01:07:19 +01:00
Matthias Seitz
6fefbfc65f chore: remove unused requested_proof_targets field, add timing debug
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 01:01:35 +01:00
Matthias Seitz
2012602909 chore: restore SparseTrieUpdates::with_capacity and take_updates pre-allocation
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 00:44:21 +01:00
Matthias Seitz
c84b3475a4 chore: simplify take_updates and remove SparseTrieUpdates::with_capacity
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 00:40:58 +01:00
Arsenii Kulikov
f3107565f3 clippy 2026-01-30 03:25:42 +04:00
Arsenii Kulikov
ac6a0fa372 wip 2026-01-30 03:14:57 +04:00
Arsenii Kulikov
5899cd4188 fix 2026-01-30 03:14:42 +04:00
Arsenii Kulikov
a3b76591b7 wip 2026-01-30 03:08:49 +04:00
Dan Cline
b8f27b73ad chore: fix unused parallel trie const without std (#21610) 2026-01-29 23:05:32 +00:00
かりんとう
7ec5ff6483 refactor(reth-bench): dedupe derive_ws_rpc_url helper (#21585) 2026-01-29 22:50:22 +00:00
Georgios Konstantopoulos
f98af4ad9f feat(rpc): default --testing.skip-invalid-transactions to true (#21603)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 22:03:19 +00:00
Matthias Seitz
ab84d61f91 chore: simplify take_updates implementation
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 22:59:44 +01:00
Matthias Seitz
8c1724af13 chore: remove parallel iteration from shrink_to
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 22:58:05 +01:00
joshieDo
d8e912f66b fix(provider): prune account changesets from static files in remove_state_above (#21605) 2026-01-29 21:57:28 +00:00
Georgios Konstantopoulos
3f6354c2a4 chore: fmt
Amp-Thread-ID: https://ampcode.com/threads/T-019c0b00-7c38-74f9-9349-4ca17922c279
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 21:52:38 +00:00
Matthias Seitz
ba64eb5fc7 merge: origin/main into mattsse/sparse-trie-preservation
Merged upstream changes with the sparse trie preservation feature:
- Resolved conflicts in TreeConfig, payload processor, and sparse trie modules
- Kept simplified generation-based recency tracking for storage tries
- Preserved score-based pruning strategy (size * recency)
- Retained size_hint() O(1) optimization for SparseTrieExt

Amp-Thread-ID: https://ampcode.com/threads/T-019c0bbc-3a64-7447-a4be-59edb249fc5f
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 22:50:50 +01:00
joshieDo
67a7a1c2d1 chore: revert "test(stages): add pipeline forward sync and unwind test" (#21601) 2026-01-29 22:36:47 +01:00
Georgios Konstantopoulos
0572c4e0ca feat(metrics): add _last gauge metrics for save_blocks timings (#21597) 2026-01-29 21:34:27 +00:00
Arsenii Kulikov
1b2935763b wip 2026-01-30 01:16:06 +04:00
joshieDo
2b1833576b test(stages): add pipeline forward sync and unwind test (#21553)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 21:13:07 +00:00
Dan Cline
5592c362d4 feat(grafana): add reth-persistence dashboard (#21594) 2026-01-29 21:05:07 +00:00
Matthias Seitz
cf3eef1874 chore: remove debug timing prints
Amp-Thread-ID: https://ampcode.com/threads/T-019c0b54-f8f0-7608-851b-e3d22922f2ff
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 21:55:49 +01:00
Matthias Seitz
cc3f3d7062 perf(trie): simplify sparse trie pruning and shrinking
- Add size_hint() to SparseTrieExt trait for O(1) size estimation
- Simplify storage trie eviction: keep top N by score (size * recency_multiplier)
- Only prune recently modified tries (age <= 2) above size threshold (1000 nodes)
- Fix shrink_to: don't shrink cleared tries, simpler capacity distribution
- Remove LRU dirty tracking complexity in favor of generation-based recency

Amp-Thread-ID: https://ampcode.com/threads/T-019c0b54-f8f0-7608-851b-e3d22922f2ff
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 21:55:04 +01:00
Georgios Konstantopoulos
6beec25f43 fix(grafana): order MerkleChangeSets checkpoint after MerkleExecute (#21581) 2026-01-29 20:40:26 +00:00
Matthias Seitz
92ce4f3af0 wip 2026-01-29 20:54:57 +01:00
Arsenii Kulikov
19bf580f93 feat: sparse trie as cache (#21583)
Co-authored-by: yongkangc <chiayongkang@hotmail.com>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Co-authored-by: Brian Picciano <me@mediocregopher.com>
2026-01-29 19:11:48 +00:00
Georgios Konstantopoulos
28e5911482 Revert "fix(trie): address clippy lints for sparse trie pruning"
This reverts commit 257ccd41e9.
2026-01-29 18:35:03 +00:00
Georgios Konstantopoulos
257ccd41e9 fix(trie): address clippy lints for sparse trie pruning
Amp-Thread-ID: https://ampcode.com/threads/T-019c0b00-7c38-74f9-9349-4ca17922c279
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 18:32:18 +00:00
joshieDo
796ba6d5dc chore(trie): remove unused direct MDBX changeset readers (#21580) 2026-01-29 17:50:19 +00:00
Matthias Seitz
a53ef64a54 perf(trie): optimize prune with LRU eviction and dirty tracking
- Add generation-based LRU tracking for storage tries
- Only prune dirty tries (modified since last prune)
- Evict non-dirty tries first when over limit, then LRU evict oldest
- Use O(n) select_nth_unstable for eviction instead of O(n log n) sort
- Parallelize shrink_to for storage tries
- Track trie modifications via insert_storage_trie

Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c0a25-8a84-7739-b5eb-ccf486a76629
2026-01-29 18:10:35 +01:00
Georgios Konstantopoulos
5307dfc22b chore: update RPC URLs from ithaca.xyz to reth.rs (#21574)
Co-authored-by: Tim Beiko <tim@ethereum.org>
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 17:06:13 +00:00
Brian Picciano
f380ed1581 fix(engine): Try to always compute storage root in V2 proofs when account proof is present, fallback if not (#21579) 2026-01-29 16:58:59 +00:00
DaniPopes
f7313c755c chore(deps): bump codspeed (#21578) 2026-01-29 16:50:09 +00:00
Georgios Konstantopoulos
3bc2191590 chore: remove cargo-chef from Dockerfile.depot (#21577) 2026-01-29 16:28:44 +00:00
Brian Picciano
320f2a6015 fix(trie): PST: Fix update_leaf atomicity, remove update_leaves revealed tracking, fix callback calling (#21573) 2026-01-29 16:18:42 +00:00
Brian Picciano
74f9e386ed add histogram for reuse timing 2026-01-29 15:20:12 +01:00
Georgios Konstantopoulos
70bfdafd26 fix(provider): check executed block before returning historical state (#21571)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-01-29 13:54:50 +00:00
Brian Picciano
45ff349b00 refactor: move sparse trie constants to reth-engine-primitives
Remove DEFAULT_SPARSE_TRIE_PRUNE_DEPTH and DEFAULT_MAX_PRESERVED_STORAGE_TRIES
from reth-trie-sparse and define them in reth-engine-primitives instead.
2026-01-29 13:47:46 +00:00
Brian Picciano
d057c4e9e6 refactor: also reuse DEFAULT_SPARSE_TRIE_PRUNE_DEPTH from reth-trie-sparse 2026-01-29 13:46:36 +00:00
Brian Picciano
405b40f02f refactor: address review comments
- Reuse DEFAULT_MAX_PRESERVED_STORAGE_TRIES from reth-trie-sparse instead of duplicating constant
- Apply take_updates with_capacity pattern to sparse/src/trie.rs to match sparse-parallel
2026-01-29 13:45:23 +00:00
Brian Picciano
868fffe0dd nitpick comment 2026-01-29 14:29:10 +01:00
Brian Picciano
b288c4e259 Fix PST::take_updates not replacing with Some if was previously enabled 2026-01-29 14:27:15 +01:00
YK
e9fe0283a9 fix(provider): use storage-aware methods in unwind_trie_state_from (#21561) 2026-01-29 11:54:12 +00:00
Brian Picciano
4e26a6edc9 Merge remote-tracking branch 'origin/main' into mattsse/sparse-trie-preservation 2026-01-29 12:48:51 +01:00
Alexey Shekhirin
92b8857625 fix(reth-bench): stop fetcher when reaching chain tip (#21568) 2026-01-29 11:34:15 +00:00
YK
2d71243cf6 feat(trie): add update_leaves method to SparseTrieExt (#21525)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
2026-01-29 11:25:08 +00:00
かりんとう
732bf712aa refactor(reth-bench): dedupe read_input and load_jwt_secret helpers (#21555) 2026-01-29 10:17:51 +00:00
Dan Cline
0901c2ca8b fix(reth-bench): retry testing_buildBlockV1 when payload gas < target (#21547)
Co-authored-by: Alexey Shekhirin <github@shekhirin.com>
2026-01-29 10:08:54 +00:00
Matthias Seitz
2352158b3d fix(reth-bench): return error instead of panic on invalid payload (#21557)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 00:35:19 +00:00
Matthias Seitz
26f788d5ca feat(engine): enable sparse trie structure reuse for continuation payloads
When a new payload's parent matches the preserved trie's block hash,
the pruned trie structure is now reused directly instead of being cleared.
This enables incremental trie updates without rebuilding from scratch.

For non-continuation payloads, the trie is still cleared but allocations
are preserved for memory efficiency.

Amp-Thread-ID: https://ampcode.com/threads/T-019c06cb-b73a-756c-963b-c1ef3db6a6bb
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 00:07:30 +01:00
Matthias Seitz
9433ac5666 fix(engine): prevent race condition in sparse trie preservation
Acquire guard before sending state root result to ensure the next block's
take() blocks until we've stored the trie for reuse.

Also always clear the trie (keeping allocations) because the pruned trie
cannot be directly reused - prune() deletes values needed for state root
computation and clears revealed_account_paths which breaks proof filtering.

Amp-Thread-ID: https://ampcode.com/threads/T-019c060d-2327-741f-b8d2-6d246e5d521e
Co-authored-by: Amp <amp@ampcode.com>
2026-01-29 00:07:26 +01:00
Georgios Konstantopoulos
1a98605ce6 chore(net): downgrade fork id mismatch log to trace (#21554)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-28 22:41:42 +00:00
DaniPopes
8d37f76d23 chore: move scripts from .github/assets to .github/scripts (#21539)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-28 22:14:37 +00:00
Dan Cline
2d9cf4c989 chore: fix unused warns in sparse trie (#21546) 2026-01-28 21:48:59 +00:00
DaniPopes
f5ca71d2fb chore(deps): cargo update (#21538)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-28 19:49:15 +00:00
Alexey Shekhirin
8d58c98034 feat(reth-bench): add reporting and wait options to replay-payloads (#21537) 2026-01-28 19:13:19 +00:00
Matthias Seitz
50e0591540 perf(tree): optimistically prepare canonical overlay (#21475)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-28 18:16:04 +00:00
joshieDo
013dfdf8c8 fix(prune): add minimum 64 block retention for receipts and bodies (#21520) 2026-01-28 18:10:07 +00:00
Matthias Seitz
0796a20f24 fix(engine): clear trie on failed state root computation
A failed computation may leave the trie in a partially updated state.
Only preserve the trie for reuse when computation succeeds.

Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c0566-6cc1-7415-a083-e82fa071cdc7
2026-01-28 18:20:09 +01:00
Matthias Seitz
2633d3e513 refactor(engine): improve sparse trie task API and error handling
- Change run() to take &mut self instead of consuming self
- Add into_cleared_trie() for invalid/cancelled payloads (skips pruning)
- Clear trie when state root receiver is dropped
- Improve documentation with cross-references

Amp-Thread-ID: https://ampcode.com/threads/T-019c0566-6cc1-7415-a083-e82fa071cdc7
Co-authored-by: Amp <amp@ampcode.com>
2026-01-28 17:55:16 +01:00
Matthias Seitz
1f8fb7e58f feat(engine): preserve sparse trie across payload validations
Implement sparse trie preservation to reduce memory allocations when
validating consecutive blocks (parent-child relationship).

Changes:
- Add PreservedSparseTrie to store trie with its computed block hash
- Add SharedPreservedSparseTrie newtype for cleaner handle passing
- Add prune_depth and max_storage_tries config to TreeConfig
- Add SparseStateTrie::clear() and shrink_to() methods
- Prune trie after sending state root result (non-blocking)
- Reuse pruned trie when new payload's parent matches stored block hash
- Clear trie (keeping allocations) when not a continuation

RETH-183

Amp-Thread-ID: https://ampcode.com/threads/T-019c0566-6cc1-7415-a083-e82fa071cdc7
Co-authored-by: Amp <amp@ampcode.com>
2026-01-28 17:44:55 +01:00
joshieDo
effa0ab4c7 fix(provider): read changesets from static files during unwind (#21528)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-28 15:52:08 +00:00
SS
543a85e9f3 fix: simplify UTF-8 decoding in StreamCodec by using Result::ok (#21524) 2026-01-28 15:12:55 +00:00
theo
88eb0beeb2 chore(op-reth): remove op-reth dependencies from core reth library crates (#21492) 2026-01-28 14:53:17 +00:00
159 changed files with 7597 additions and 1439 deletions

View File

@@ -38,6 +38,6 @@ for pid in "${saving_pids[@]}"; do
done
# Make sure we don't rebuild images on the CI jobs
git apply ../.github/assets/hive/no_sim_build.diff
git apply ../.github/scripts/hive/no_sim_build.diff
go build .
mv ./hive ../hive_assets/

View File

@@ -58,11 +58,11 @@ jobs:
uses: actions/cache@v5
with:
path: ./hive_assets
key: hive-assets-${{ steps.hive-commit.outputs.hash }}-${{ hashFiles('.github/assets/hive/build_simulators.sh') }}
key: hive-assets-${{ steps.hive-commit.outputs.hash }}-${{ hashFiles('.github/scripts/hive/build_simulators.sh') }}
- name: Build hive assets
if: steps.cache-hive.outputs.cache-hit != 'true'
run: .github/assets/hive/build_simulators.sh
run: .github/scripts/hive/build_simulators.sh
- name: Load cached Docker images
if: steps.cache-hive.outputs.cache-hit == 'true'
@@ -213,7 +213,7 @@ jobs:
path: /tmp
- name: Load Docker images
run: .github/assets/hive/load_images.sh
run: .github/scripts/hive/load_images.sh
- name: Move hive binary
run: |
@@ -241,11 +241,11 @@ jobs:
FILTER="/"
fi
echo "filter: $FILTER"
.github/assets/hive/run_simulator.sh "${{ matrix.scenario.sim }}" "$FILTER"
.github/scripts/hive/run_simulator.sh "${{ matrix.scenario.sim }}" "$FILTER"
- name: Parse hive output
run: |
find hivetests/workspace/logs -type f -name "*.json" ! -name "hive.json" | xargs -I {} python .github/assets/hive/parse.py {} --exclusion .github/assets/hive/expected_failures.yaml --ignored .github/assets/hive/ignored_tests.yaml
find hivetests/workspace/logs -type f -name "*.json" ! -name "hive.json" | xargs -I {} python .github/scripts/hive/parse.py {} --exclusion .github/scripts/hive/expected_failures.yaml --ignored .github/scripts/hive/ignored_tests.yaml
- name: Print simulator output
if: ${{ failure() }}

View File

@@ -22,7 +22,7 @@ concurrency:
jobs:
test:
name: test / ${{ matrix.network }}
name: test / ${{ matrix.network }} / ${{ matrix.storage }}
if: github.event_name != 'schedule'
runs-on: depot-ubuntu-latest-4
env:
@@ -30,13 +30,17 @@ jobs:
strategy:
matrix:
network: ["ethereum", "optimism"]
storage: ["stable", "edge"]
exclude:
- network: optimism
storage: edge
timeout-minutes: 60
steps:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- name: Install Geth
run: .github/assets/install_geth.sh
run: .github/scripts/install_geth.sh
- uses: taiki-e/install-action@nextest
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
@@ -46,7 +50,7 @@ jobs:
name: Run tests
run: |
cargo nextest run \
--locked --features "asm-keccak ${{ matrix.network }}" \
--locked --features "asm-keccak ${{ matrix.network }} ${{ matrix.storage == 'edge' && 'edge' || '' }}" \
--workspace --exclude ef-tests \
-E "kind(test) and not binary(e2e_testsuite)"
- if: matrix.network == 'optimism'

View File

@@ -19,5 +19,5 @@ jobs:
uses: actions/github-script@v8
with:
script: |
const label_pr = require('./.github/assets/label_pr.js')
const label_pr = require('./.github/scripts/label_pr.js')
await label_pr({github, context})

View File

@@ -76,7 +76,7 @@ jobs:
- name: Run Wasm checks
run: |
sudo apt update && sudo apt install gcc-multilib
.github/assets/check_wasm.sh
.github/scripts/check_wasm.sh
riscv:
runs-on: depot-ubuntu-latest
@@ -94,7 +94,7 @@ jobs:
cache-on-failure: true
- uses: dcarbone/install-jq-action@v3
- name: Run RISC-V checks
run: .github/assets/check_rv32imac.sh
run: .github/scripts/check_rv32imac.sh
crate-checks:
name: crate-checks (${{ matrix.partition }}/${{ matrix.total_partitions }})

View File

@@ -43,7 +43,7 @@ jobs:
uses: docker/build-push-action@v6
with:
context: .
file: .github/assets/hive/Dockerfile
file: .github/scripts/hive/Dockerfile
tags: ${{ inputs.image_tag }}
outputs: type=docker,dest=./artifacts/reth_image.tar
build-args: |

157
Cargo.lock generated
View File

@@ -186,9 +186,9 @@ dependencies = [
[[package]]
name = "alloy-dyn-abi"
version = "1.5.2"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "369f5707b958927176265e8a58627fc6195e5dfa5c55689396e68b241b3a72e6"
checksum = "14ff5ee5f27aa305bda825c735f686ad71bb65508158f059f513895abe69b8c3"
dependencies = [
"alloy-json-abi",
"alloy-primitives",
@@ -340,9 +340,9 @@ dependencies = [
[[package]]
name = "alloy-json-abi"
version = "1.5.2"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84e3cf01219c966f95a460c95f1d4c30e12f6c18150c21a30b768af2a2a29142"
checksum = "8708475665cc00e081c085886e68eada2f64cfa08fc668213a9231655093d4de"
dependencies = [
"alloy-primitives",
"alloy-sol-type-parser",
@@ -437,9 +437,9 @@ dependencies = [
[[package]]
name = "alloy-primitives"
version = "1.5.2"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6a0fb18dd5fb43ec5f0f6a20be1ce0287c79825827de5744afaa6c957737c33"
checksum = "3b88cf92ed20685979ed1d8472422f0c6c2d010cec77caf63aaa7669cc1a7bc2"
dependencies = [
"alloy-rlp",
"arbitrary",
@@ -464,7 +464,6 @@ dependencies = [
"rustc-hash",
"serde",
"sha3",
"tiny-keccak",
]
[[package]]
@@ -794,9 +793,9 @@ dependencies = [
[[package]]
name = "alloy-sol-macro"
version = "1.5.2"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09eb18ce0df92b4277291bbaa0ed70545d78b02948df756bbd3d6214bf39a218"
checksum = "f5fa1ca7e617c634d2bd9fa71f9ec8e47c07106e248b9fcbd3eaddc13cabd625"
dependencies = [
"alloy-sol-macro-expander",
"alloy-sol-macro-input",
@@ -808,9 +807,9 @@ dependencies = [
[[package]]
name = "alloy-sol-macro-expander"
version = "1.5.2"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95d9fa2daf21f59aa546d549943f10b5cce1ae59986774019fbedae834ffe01b"
checksum = "27c00c0c3a75150a9dc7c8c679ca21853a137888b4e1c5569f92d7e2b15b5102"
dependencies = [
"alloy-sol-macro-input",
"const-hex",
@@ -819,16 +818,16 @@ dependencies = [
"proc-macro-error2",
"proc-macro2",
"quote",
"sha3",
"syn 2.0.114",
"syn-solidity",
"tiny-keccak",
]
[[package]]
name = "alloy-sol-macro-input"
version = "1.5.2"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9396007fe69c26ee118a19f4dee1f5d1d6be186ea75b3881adf16d87f8444686"
checksum = "297db260eb4d67c105f68d6ba11b8874eec681caec5505eab8fbebee97f790bc"
dependencies = [
"const-hex",
"dunce",
@@ -842,9 +841,9 @@ dependencies = [
[[package]]
name = "alloy-sol-type-parser"
version = "1.5.2"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af67a0b0dcebe14244fc92002cd8d96ecbf65db4639d479f5fcd5805755a4c27"
checksum = "94b91b13181d3bcd23680fd29d7bc861d1f33fbe90fdd0af67162434aeba902d"
dependencies = [
"serde",
"winnow",
@@ -852,9 +851,9 @@ dependencies = [
[[package]]
name = "alloy-sol-types"
version = "1.5.2"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09aeea64f09a7483bdcd4193634c7e5cf9fd7775ee767585270cd8ce2d69dc95"
checksum = "fc442cc2a75207b708d481314098a0f8b6f7b58e3148dd8d8cc7407b0d6f9385"
dependencies = [
"alloy-json-abi",
"alloy-primitives",
@@ -1040,6 +1039,15 @@ version = "1.0.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61"
[[package]]
name = "approx"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cab112f0a86d568ea0e627cc1d6be74a1e9cd55214684db5561995f6dad897c6"
dependencies = [
"num-traits",
]
[[package]]
name = "aquamarine"
version = "0.6.0"
@@ -2209,9 +2217,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.5.54"
version = "4.5.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6e6ff9dcd79cff5cd969a17a545d79e84ab086e444102a591e288a8aa3ce394"
checksum = "3e34525d5bbbd55da2bb745d34b36121baac88d07619a9a09cfcf4a6c0832785"
dependencies = [
"clap_builder",
"clap_derive",
@@ -2219,9 +2227,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.5.54"
version = "4.5.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa42cf4d2b7a41bc8f663a7cab4031ebafa1bf3875705bfaf8466dc60ab52c00"
checksum = "59a20016a20a3da95bef50ec7238dbd09baeef4311dcdd38ec15aba69812fb61"
dependencies = [
"anstream",
"anstyle",
@@ -2231,9 +2239,9 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "4.5.49"
version = "4.5.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a0b5487afeab2deb2ff4e03a807ad1a03ac532ff5a2cee5d86884440c7f7671"
checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5"
dependencies = [
"heck",
"proc-macro2",
@@ -2258,35 +2266,42 @@ dependencies = [
[[package]]
name = "codspeed"
version = "2.10.1"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93f4cce9c27c49c4f101fffeebb1826f41a9df2e7498b7cd4d95c0658b796c6c"
checksum = "38c2eb3388ebe26b5a0ab6bf4969d9c4840143d7f6df07caa3cc851b0606cef6"
dependencies = [
"anyhow",
"cc",
"colored",
"getrandom 0.2.17",
"glob",
"libc",
"nix 0.30.1",
"serde",
"serde_json",
"uuid",
"statrs",
]
[[package]]
name = "codspeed-criterion-compat"
version = "2.10.1"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3c23d880a28a2aab52d38ca8481dd7a3187157d0a952196b6db1db3c8499725"
checksum = "e1e270597a1d1e183f86d1cc9f94f0133654ee3daf201c17903ee29363555dd7"
dependencies = [
"clap",
"codspeed",
"codspeed-criterion-compat-walltime",
"colored",
"futures",
"regex",
"tokio",
]
[[package]]
name = "codspeed-criterion-compat-walltime"
version = "2.10.1"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b0a2f7365e347f4f22a67e9ea689bf7bc89900a354e22e26cf8a531a42c8fbb"
checksum = "e6c2613d2fac930fe34456be76f9124ee0800bb9db2e7fd2d6c65b9ebe98a292"
dependencies = [
"anes",
"cast",
@@ -3590,7 +3605,6 @@ version = "0.0.0"
dependencies = [
"alloy-eips",
"alloy-evm",
"alloy-sol-macro",
"alloy-sol-types",
"eyre",
"reth-ethereum",
@@ -4819,9 +4833,9 @@ dependencies = [
[[package]]
name = "iana-time-zone"
version = "0.1.64"
version = "0.1.65"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33e57f83510bb73707521ebaffa789ec8caf86f9657cad665b092b581d40e9fb"
checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470"
dependencies = [
"android_system_properties",
"core-foundation-sys",
@@ -5489,9 +5503,9 @@ dependencies = [
[[package]]
name = "keccak-asm"
version = "0.1.4"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "505d1856a39b200489082f90d897c3f07c455563880bc5952e38eabf731c83b6"
checksum = "b646a74e746cd25045aa0fd42f4f7f78aa6d119380182c7e63a5593c4ab8df6f"
dependencies = [
"digest 0.10.7",
"sha3-asm",
@@ -5979,9 +5993,9 @@ dependencies = [
[[package]]
name = "moka"
version = "0.12.12"
version = "0.12.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3dec6bd31b08944e08b58fd99373893a6c17054d6f3ea5006cc894f4f4eee2a"
checksum = "b4ac832c50ced444ef6be0767a008b02c106a909ba79d1d830501e94b96f6b7e"
dependencies = [
"crossbeam-channel",
"crossbeam-epoch",
@@ -6095,9 +6109,12 @@ dependencies = [
[[package]]
name = "notify-types"
version = "2.0.0"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d"
checksum = "42b8cfee0e339a0337359f3c88165702ac6e600dc01c0cc9579a92d62b08477a"
dependencies = [
"bitflags 2.10.0",
]
[[package]]
name = "ntapi"
@@ -7797,6 +7814,7 @@ dependencies = [
"itertools 0.14.0",
"lz4",
"metrics",
"parking_lot",
"proptest",
"proptest-arbitrary-interop",
"ratatui",
@@ -8041,6 +8059,7 @@ dependencies = [
"derive_more",
"metrics",
"modular-bitfield",
"op-alloy-consensus",
"parity-scale-codec",
"proptest",
"proptest-arbitrary-interop",
@@ -8048,7 +8067,6 @@ dependencies = [
"reth-codecs",
"reth-db-models",
"reth-ethereum-primitives",
"reth-optimism-primitives",
"reth-primitives-traits",
"reth-prune-types",
"reth-stages-types",
@@ -8319,7 +8337,6 @@ dependencies = [
"reth-chainspec",
"reth-engine-primitives",
"reth-ethereum-engine-primitives",
"reth-optimism-chainspec",
"reth-payload-builder",
"reth-payload-primitives",
"reth-primitives-traits",
@@ -10526,9 +10543,7 @@ dependencies = [
"op-alloy-rpc-types",
"reth-ethereum-primitives",
"reth-evm",
"reth-optimism-primitives",
"reth-primitives-traits",
"reth-storage-api",
"serde_json",
"thiserror 2.0.18",
]
@@ -10718,6 +10733,7 @@ version = "1.10.2"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-genesis",
"alloy-primitives",
"alloy-rlp",
"assert_matches",
@@ -10737,6 +10753,7 @@ dependencies = [
"reth-consensus",
"reth-db",
"reth-db-api",
"reth-db-common",
"reth-downloaders",
"reth-era",
"reth-era-downloader",
@@ -10762,6 +10779,7 @@ dependencies = [
"reth-storage-api",
"reth-storage-errors",
"reth-testing-utils",
"reth-tracing",
"reth-trie",
"reth-trie-db",
"tempfile",
@@ -12204,9 +12222,9 @@ dependencies = [
[[package]]
name = "sha3-asm"
version = "0.1.4"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28efc5e327c837aa837c59eae585fc250715ef939ac32881bcc11677cd02d46"
checksum = "b31139435f327c93c6038ed350ae4588e2c70a13d50599509fee6349967ba35a"
dependencies = [
"cc",
"cfg-if",
@@ -12318,9 +12336,9 @@ dependencies = [
[[package]]
name = "siphasher"
version = "1.0.1"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e"
[[package]]
name = "sketches-ddsketch"
@@ -12436,6 +12454,16 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "statrs"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a3fe7c28c6512e766b0874335db33c94ad7b8f9054228ae1c2abd47ce7d335e"
dependencies = [
"approx",
"num-traits",
]
[[package]]
name = "strsim"
version = "0.11.1"
@@ -12515,9 +12543,9 @@ dependencies = [
[[package]]
name = "syn-solidity"
version = "1.5.2"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f92d01b5de07eaf324f7fca61cc6bd3d82bbc1de5b6c963e6fe79e86f36580d"
checksum = "2379beea9476b89d0237078be761cf8e012d92d5ae4ae0c9a329f974838870fc"
dependencies = [
"paste",
"proc-macro2",
@@ -12825,15 +12853,6 @@ dependencies = [
"time-core",
]
[[package]]
name = "tiny-keccak"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237"
dependencies = [
"crunchy",
]
[[package]]
name = "tinystr"
version = "0.8.2"
@@ -13025,9 +13044,9 @@ checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801"
[[package]]
name = "tonic"
version = "0.14.2"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203"
checksum = "a286e33f82f8a1ee2df63f4fa35c0becf4a85a0cb03091a15fd7bf0b402dc94a"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -13051,9 +13070,9 @@ dependencies = [
[[package]]
name = "tonic-prost"
version = "0.14.2"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67"
checksum = "d6c55a2d6a14174563de34409c9f92ff981d006f56da9c6ecd40d9d4a31500b0"
dependencies = [
"bytes",
"prost 0.14.3",
@@ -14440,18 +14459,18 @@ dependencies = [
[[package]]
name = "zerocopy"
version = "0.8.33"
version = "0.8.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "668f5168d10b9ee831de31933dc111a459c97ec93225beb307aed970d1372dfd"
checksum = "fdea86ddd5568519879b8187e1cf04e24fce28f7fe046ceecbce472ff19a2572"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.33"
version = "0.8.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c7962b26b0a8685668b671ee4b54d007a67d4eaf05fda79ac0ecf41e32270f1"
checksum = "0c15e1b46eff7c6c91195752e0eeed8ef040e391cdece7c25376957d5f15df22"
dependencies = [
"proc-macro2",
"quote",
@@ -14535,9 +14554,9 @@ dependencies = [
[[package]]
name = "zmij"
version = "1.0.16"
version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfcd145825aace48cff44a8844de64bf75feec3080e0aa5cdbde72961ae51a65"
checksum = "02aae0f83f69aafc94776e879363e9771d7ecbffe2c7fbb6c14c5e00dfe88439"
[[package]]
name = "zstd"

View File

@@ -77,6 +77,10 @@ members = [
"crates/optimism/cli",
"crates/optimism/consensus",
"crates/optimism/evm/",
"crates/optimism/examples/custom-node",
"crates/optimism/examples/engine-api-access",
"crates/optimism/examples/exex-hello-world",
"crates/optimism/examples/op-db-access",
"crates/optimism/flashblocks/",
"crates/optimism/hardforks/",
"crates/optimism/node/",
@@ -145,7 +149,6 @@ members = [
"examples/beacon-api-sse/",
"examples/bsc-p2p",
"examples/custom-dev-node/",
"examples/custom-node/",
"examples/custom-engine-types/",
"examples/custom-evm/",
"examples/custom-hardforks/",
@@ -154,10 +157,7 @@ members = [
"examples/custom-payload-builder/",
"examples/custom-rlpx-subprotocol",
"examples/custom-rpc-middleware",
"examples/custom-node",
"examples/db-access",
"examples/engine-api-access",
"examples/exex-hello-world",
"examples/exex-subscription",
"examples/exex-test",
"examples/full-contract-state",
@@ -168,7 +168,6 @@ members = [
"examples/node-builder-api/",
"examples/node-custom-rpc/",
"examples/node-event-hooks/",
"examples/op-db-access/",
"examples/polygon-p2p/",
"examples/rpc-db/",
"examples/precompile-cache/",
@@ -484,15 +483,15 @@ op-revm = { version = "15.0.0", default-features = false }
revm-inspectors = "0.34.1"
# eth
alloy-dyn-abi = "1.5.4"
alloy-primitives = { version = "1.5.4", default-features = false, features = ["map-foldhash"] }
alloy-sol-types = { version = "1.5.4", default-features = false }
alloy-chains = { version = "0.2.5", default-features = false }
alloy-dyn-abi = "1.5.2"
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.3.0", default-features = false }
alloy-evm = { version = "0.27.0", default-features = false }
alloy-primitives = { version = "1.5.0", default-features = false, features = ["map-foldhash"] }
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
alloy-sol-macro = "1.5.0"
alloy-sol-types = { version = "1.5.0", default-features = false }
alloy-trie = { version = "0.9.1", default-features = false }
alloy-hardforks = "0.4.5"
@@ -671,7 +670,7 @@ tracing-opentelemetry = "0.32"
# misc-testing
arbitrary = "1.3"
assert_matches = "1.5.0"
criterion = { package = "codspeed-criterion-compat", version = "2.7" }
criterion = { package = "codspeed-criterion-compat", version = "4.3" }
insta = "1.41"
proptest = "1.7"
proptest-derive = "0.5"
@@ -733,7 +732,7 @@ snap = "1.1.1"
socket2 = { version = "0.5", default-features = false }
sysinfo = { version = "0.33", default-features = false }
tracing-journald = "0.3"
tracing-logfmt = "0.3.3"
tracing-logfmt = "=0.3.5"
tracing-samply = "0.1"
tracing-subscriber = { version = "0.3", default-features = false }
tracing-tracy = "0.11"

View File

@@ -5,7 +5,7 @@
# reth: --build-arg BINARY=reth
# op-reth: --build-arg BINARY=op-reth --build-arg MANIFEST_PATH=crates/optimism/bin
FROM lukemathwalker/cargo-chef:latest-rust-1 AS chef
FROM rust:1 AS builder
WORKDIR /app
LABEL org.opencontainers.image.source=https://github.com/paradigmxyz/reth
@@ -19,14 +19,6 @@ ENV RUSTC_WRAPPER=sccache
ENV SCCACHE_DIR=/sccache
ENV SCCACHE_WEBDAV_ENDPOINT=https://cache.depot.dev
# Builds a cargo-chef plan
FROM chef AS planner
COPY --exclude=.git . .
RUN cargo chef prepare --recipe-path recipe.json
FROM chef AS builder
COPY --from=planner /app/recipe.json recipe.json
# Binary to build (reth or op-reth)
ARG BINARY=reth
@@ -53,13 +45,6 @@ ENV VERGEN_GIT_SHA=$VERGEN_GIT_SHA
ENV VERGEN_GIT_DESCRIBE=$VERGEN_GIT_DESCRIBE
ENV VERGEN_GIT_DIRTY=$VERGEN_GIT_DIRTY
# Build dependencies
RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
cargo chef cook --profile $BUILD_PROFILE --features "$FEATURES" --locked --recipe-path recipe.json --manifest-path $MANIFEST_PATH/Cargo.toml
# Build application
COPY --exclude=.git . .
RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \

View File

@@ -274,10 +274,10 @@ impl Args {
/// Get the default RPC URL for a given chain
const fn get_default_rpc_url(chain: &Chain) -> &'static str {
match chain.id() {
8453 => "https://base-mainnet.rpc.ithaca.xyz", // base
8453 => "https://base.reth.rs/rpc", // base
84532 => "https://base-sepolia.rpc.ithaca.xyz", // base-sepolia
27082 => "https://rpc.hoodi.ethpandaops.io", // hoodi
_ => "https://reth-ethereum.ithaca.xyz/rpc", // mainnet and fallback
_ => "https://ethereum.reth.rs/rpc", // mainnet and fallback
}
}

View File

@@ -132,13 +132,24 @@ impl<S: TransactionSource> TransactionCollector<S> {
/// Collect transactions starting from the given block number.
///
/// Skips blob transactions (type 3) and collects until target gas is reached.
/// Returns the collected raw transaction bytes, total gas used, and the next block number.
pub async fn collect(&self, start_block: u64) -> eyre::Result<(Vec<Bytes>, u64, u64)> {
let mut transactions: Vec<Bytes> = Vec::new();
/// Returns a `CollectionResult` with transactions, gas info, and next block.
pub async fn collect(&self, start_block: u64) -> eyre::Result<CollectionResult> {
self.collect_gas(start_block, self.target_gas).await
}
/// Collect transactions up to a specific gas target.
///
/// This is used both for initial collection and for retry top-ups.
pub async fn collect_gas(
&self,
start_block: u64,
gas_target: u64,
) -> eyre::Result<CollectionResult> {
let mut transactions: Vec<RawTransaction> = Vec::new();
let mut total_gas: u64 = 0;
let mut current_block = start_block;
while total_gas < self.target_gas {
while total_gas < gas_target {
let Some((block_txs, _)) = self.source.fetch_block_transactions(current_block).await?
else {
warn!(block = current_block, "Block not found, stopping");
@@ -151,12 +162,12 @@ impl<S: TransactionSource> TransactionCollector<S> {
continue;
}
if total_gas + tx.gas_used <= self.target_gas {
transactions.push(tx.raw);
if total_gas + tx.gas_used <= gas_target {
total_gas += tx.gas_used;
transactions.push(tx);
}
if total_gas >= self.target_gas {
if total_gas >= gas_target {
break;
}
}
@@ -164,7 +175,7 @@ impl<S: TransactionSource> TransactionCollector<S> {
current_block += 1;
// Stop early if remaining gas is under 1M (close enough to target)
let remaining_gas = self.target_gas.saturating_sub(total_gas);
let remaining_gas = gas_target.saturating_sub(total_gas);
if remaining_gas < 1_000_000 {
break;
}
@@ -172,12 +183,12 @@ impl<S: TransactionSource> TransactionCollector<S> {
info!(
total_txs = transactions.len(),
total_gas,
gas_sent = total_gas,
next_block = current_block,
"Finished collecting transactions"
);
Ok((transactions, total_gas, current_block))
Ok(CollectionResult { transactions, gas_sent: total_gas, next_block: current_block })
}
}
@@ -252,6 +263,80 @@ struct BuiltPayload {
envelope: ExecutionPayloadEnvelopeV4,
block_hash: B256,
timestamp: u64,
/// The actual gas used in the built block.
gas_used: u64,
}
/// Result of collecting transactions from blocks.
#[derive(Debug)]
pub struct CollectionResult {
/// Collected transactions with their gas info.
pub transactions: Vec<RawTransaction>,
/// Total gas sent (sum of historical `gas_used` for all collected txs).
pub gas_sent: u64,
/// Next block number to continue collecting from.
pub next_block: u64,
}
/// Constants for retry logic.
const MAX_BUILD_RETRIES: u32 = 5;
/// Maximum retries for fetching a transaction batch.
const MAX_FETCH_RETRIES: u32 = 5;
/// Tolerance: if `gas_used` is within 1M of target, don't retry.
const MIN_TARGET_SLACK: u64 = 1_000_000;
/// Maximum gas to request in retries (10x target as safety cap).
const MAX_ADDITIONAL_GAS_MULTIPLIER: u64 = 10;
/// Fetches a batch of transactions with retry logic.
///
/// Returns `None` if all retries are exhausted.
async fn fetch_batch_with_retry<S: TransactionSource>(
collector: &TransactionCollector<S>,
block: u64,
) -> Option<CollectionResult> {
for attempt in 1..=MAX_FETCH_RETRIES {
match collector.collect(block).await {
Ok(result) => return Some(result),
Err(e) => {
if attempt == MAX_FETCH_RETRIES {
warn!(attempt, error = %e, "Failed to fetch transactions after max retries");
return None;
}
warn!(attempt, error = %e, "Failed to fetch transactions, retrying...");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
}
None
}
/// Outcome of a build attempt check.
enum RetryOutcome {
/// Payload is close enough to target gas.
Success,
/// Max retries reached, accept what we have.
MaxRetries,
/// Need more transactions with the specified gas amount.
NeedMore(u64),
}
/// Buffer for receiving transaction batches from the fetcher.
///
/// This abstracts over the channel to allow the main loop to request
/// batches on demand, including for retries.
struct TxBuffer {
receiver: mpsc::Receiver<CollectionResult>,
}
impl TxBuffer {
const fn new(receiver: mpsc::Receiver<CollectionResult>) -> Self {
Self { receiver }
}
/// Take the next available batch from the fetcher.
async fn take_batch(&mut self) -> Option<CollectionResult> {
self.receiver.recv().await
}
}
impl Command {
@@ -312,19 +397,20 @@ impl Command {
)
.await?;
} else {
// Single payload - collect transactions and build
// Single payload - collect transactions and build with retry
let tx_source = RpcTransactionSource::from_url(&self.rpc_url)?;
let collector = TransactionCollector::new(tx_source, self.target_gas);
let (transactions, _total_gas, _next_block) = collector.collect(start_block).await?;
let result = collector.collect(start_block).await?;
if transactions.is_empty() {
if result.transactions.is_empty() {
return Err(eyre::eyre!("No transactions collected"));
}
self.execute_sequential(
self.execute_sequential_with_retry(
&auth_provider,
&testing_provider,
transactions,
&collector,
result,
parent_hash,
parent_timestamp,
)
@@ -335,32 +421,34 @@ impl Command {
Ok(())
}
/// Sequential execution path for single payload or no-execute mode.
async fn execute_sequential(
/// Sequential execution path with retry logic for underfilled payloads.
async fn execute_sequential_with_retry<S: TransactionSource>(
&self,
auth_provider: &RootProvider<AnyNetwork>,
testing_provider: &RootProvider<AnyNetwork>,
transactions: Vec<Bytes>,
collector: &TransactionCollector<S>,
initial_result: CollectionResult,
mut parent_hash: B256,
mut parent_timestamp: u64,
) -> eyre::Result<()> {
for i in 0..self.count {
info!(
payload = i + 1,
total = self.count,
parent_hash = %parent_hash,
parent_timestamp = parent_timestamp,
"Building payload via testing_buildBlockV1"
);
let mut current_result = initial_result;
for i in 0..self.count {
let built = self
.build_payload(testing_provider, &transactions, i, parent_hash, parent_timestamp)
.build_with_retry(
testing_provider,
collector,
&mut current_result,
i,
parent_hash,
parent_timestamp,
)
.await?;
self.save_payload(&built)?;
if self.execute || self.count > 1 {
info!(payload = i + 1, block_hash = %built.block_hash, "Executing payload (newPayload + FCU)");
info!(payload = i + 1, block_hash = %built.block_hash, gas_used = built.gas_used, "Executing payload (newPayload + FCU)");
self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?;
info!(payload = i + 1, "Payload executed successfully");
}
@@ -371,7 +459,62 @@ impl Command {
Ok(())
}
/// Pipelined execution - fetches transactions and builds payloads in background.
/// Build a payload with retry logic when `gas_used` is below target.
///
/// Uses the ratio of `gas_used/gas_sent` to estimate how many more transactions
/// are needed to hit the target gas.
async fn build_with_retry<S: TransactionSource>(
&self,
testing_provider: &RootProvider<AnyNetwork>,
collector: &TransactionCollector<S>,
result: &mut CollectionResult,
index: u64,
parent_hash: B256,
parent_timestamp: u64,
) -> eyre::Result<BuiltPayload> {
for attempt in 1..=MAX_BUILD_RETRIES {
let tx_bytes: Vec<Bytes> = result.transactions.iter().map(|t| t.raw.clone()).collect();
let gas_sent = result.gas_sent;
info!(
payload = index + 1,
attempt,
tx_count = tx_bytes.len(),
gas_sent,
parent_hash = %parent_hash,
"Building payload via testing_buildBlockV1"
);
let built = Self::build_payload_static(
testing_provider,
&tx_bytes,
index,
parent_hash,
parent_timestamp,
)
.await?;
match self.check_retry_outcome(&built, index, attempt, gas_sent) {
RetryOutcome::Success | RetryOutcome::MaxRetries => return Ok(built),
RetryOutcome::NeedMore(additional_gas) => {
let additional =
collector.collect_gas(result.next_block, additional_gas).await?;
result.transactions.extend(additional.transactions);
result.gas_sent = result.gas_sent.saturating_add(additional.gas_sent);
result.next_block = additional.next_block;
}
}
}
warn!(payload = index + 1, "Retry loop exited without returning a payload");
Err(eyre::eyre!("build_with_retry exhausted retries without result"))
}
/// Pipelined execution - fetches transactions in background, builds with retry.
///
/// The fetcher continuously produces transaction batches. The main loop consumes them,
/// builds payloads with retry logic (requesting more transactions if underfilled),
/// and executes each payload before moving to the next.
async fn execute_pipelined(
&self,
auth_provider: &RootProvider<AnyNetwork>,
@@ -380,180 +523,220 @@ impl Command {
initial_parent_hash: B256,
initial_parent_timestamp: u64,
) -> eyre::Result<()> {
// Create channel for transaction batches (one batch per payload)
let (tx_sender, mut tx_receiver) = mpsc::channel::<Vec<Bytes>>(self.prefetch_buffer);
// Create channel for transaction batches - fetcher sends CollectionResult
let (tx_sender, tx_receiver) = mpsc::channel::<CollectionResult>(self.prefetch_buffer);
// Spawn background task to continuously fetch transaction batches
let rpc_url = self.rpc_url.clone();
let target_gas = self.target_gas;
let count = self.count;
let fetcher_handle = tokio::spawn(async move {
let tx_source = match RpcTransactionSource::from_url(&rpc_url) {
Ok(source) => source,
Err(e) => {
warn!(error = %e, "Failed to create transaction source");
return;
return None;
}
};
let collector = TransactionCollector::new(tx_source, target_gas);
let mut current_block = start_block;
for payload_idx in 0..count {
const MAX_RETRIES: u32 = 5;
let mut attempts = 0;
let result = loop {
attempts += 1;
match collector.collect(current_block).await {
Ok(res) => break Some(res),
Err(e) => {
if attempts >= MAX_RETRIES {
warn!(payload = payload_idx + 1, attempts, error = %e, "Failed to fetch transactions after max retries");
break None;
}
warn!(payload = payload_idx + 1, attempts, error = %e, "Failed to fetch transactions, retrying...");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
};
let Some((transactions, total_gas, next_block)) = result else {
while let Some(batch) = fetch_batch_with_retry(&collector, current_block).await {
if batch.transactions.is_empty() {
info!(block = current_block, "Reached chain tip, stopping fetcher");
break;
};
}
info!(
payload = payload_idx + 1,
tx_count = transactions.len(),
total_gas,
blocks = format!("{}..{}", current_block, next_block),
"Fetched transactions"
tx_count = batch.transactions.len(),
gas_sent = batch.gas_sent,
blocks = format!("{}..{}", current_block, batch.next_block),
"Fetched transaction batch"
);
current_block = next_block;
current_block = batch.next_block;
if tx_sender.send(transactions).await.is_err() {
if tx_sender.send(batch).await.is_err() {
break;
}
}
Some(current_block)
});
// Transaction buffer: holds transactions from batches + any extras from retries
let mut tx_buffer = TxBuffer::new(tx_receiver);
let mut parent_hash = initial_parent_hash;
let mut parent_timestamp = initial_parent_timestamp;
let mut pending_build: Option<tokio::task::JoinHandle<eyre::Result<BuiltPayload>>> = None;
for i in 0..self.count {
let is_last = i == self.count - 1;
// Get initial batch of transactions for this payload
let mut result = tx_buffer
.take_batch()
.await
.ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
// Get current payload (either from pending build or build now)
let current_payload = if let Some(handle) = pending_build.take() {
handle.await??
} else {
// First payload - wait for transactions and build synchronously
let transactions = tx_receiver
.recv()
.await
.ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
if result.transactions.is_empty() {
return Err(eyre::eyre!("No transactions collected for payload {}", i + 1));
}
if transactions.is_empty() {
return Err(eyre::eyre!("No transactions collected for payload {}", i + 1));
}
info!(
payload = i + 1,
total = self.count,
parent_hash = %parent_hash,
parent_timestamp = parent_timestamp,
tx_count = transactions.len(),
"Building payload via testing_buildBlockV1"
);
self.build_payload(
// Build with retry - may need to request more transactions
let built = self
.build_with_retry_buffered(
testing_provider,
&transactions,
&mut tx_buffer,
&mut result,
i,
parent_hash,
parent_timestamp,
)
.await?
};
.await?;
self.save_payload(&current_payload)?;
self.save_payload(&built)?;
let current_block_hash = current_payload.block_hash;
let current_timestamp = current_payload.timestamp;
let current_block_hash = built.block_hash;
let current_timestamp = built.timestamp;
// Execute current payload first
info!(payload = i + 1, block_hash = %current_block_hash, "Executing payload (newPayload + FCU)");
self.execute_payload_v4(auth_provider, current_payload.envelope, parent_hash).await?;
// Execute payload
info!(payload = i + 1, block_hash = %current_block_hash, gas_used = built.gas_used, "Executing payload (newPayload + FCU)");
self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?;
info!(payload = i + 1, "Payload executed successfully");
// Start building next payload in background (if not last) - AFTER execution
if !is_last {
// Get transactions for next payload (should already be fetched or fetching)
let next_transactions = tx_receiver
.recv()
.await
.ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
if next_transactions.is_empty() {
return Err(eyre::eyre!("No transactions collected for payload {}", i + 2));
}
let testing_provider = testing_provider.clone();
let next_index = i + 1;
let total = self.count;
pending_build = Some(tokio::spawn(async move {
info!(
payload = next_index + 1,
total = total,
parent_hash = %current_block_hash,
parent_timestamp = current_timestamp,
tx_count = next_transactions.len(),
"Building payload via testing_buildBlockV1"
);
Self::build_payload_static(
&testing_provider,
&next_transactions,
next_index,
current_block_hash,
current_timestamp,
)
.await
}));
}
parent_hash = current_block_hash;
parent_timestamp = current_timestamp;
}
// Clean up the fetcher task
drop(tx_receiver);
drop(tx_buffer);
let _ = fetcher_handle.await;
Ok(())
}
/// Build a single payload via `testing_buildBlockV1`.
async fn build_payload(
/// Build a payload with retry logic, using the buffered transaction source.
async fn build_with_retry_buffered(
&self,
testing_provider: &RootProvider<AnyNetwork>,
transactions: &[Bytes],
tx_buffer: &mut TxBuffer,
result: &mut CollectionResult,
index: u64,
parent_hash: B256,
parent_timestamp: u64,
) -> eyre::Result<BuiltPayload> {
Self::build_payload_static(
testing_provider,
transactions,
index,
parent_hash,
parent_timestamp,
)
.await
for attempt in 1..=MAX_BUILD_RETRIES {
let tx_bytes: Vec<Bytes> = result.transactions.iter().map(|t| t.raw.clone()).collect();
let gas_sent = result.gas_sent;
info!(
payload = index + 1,
attempt,
tx_count = tx_bytes.len(),
gas_sent,
parent_hash = %parent_hash,
"Building payload via testing_buildBlockV1"
);
let built = Self::build_payload_static(
testing_provider,
&tx_bytes,
index,
parent_hash,
parent_timestamp,
)
.await?;
match self.check_retry_outcome(&built, index, attempt, gas_sent) {
RetryOutcome::Success | RetryOutcome::MaxRetries => return Ok(built),
RetryOutcome::NeedMore(additional_gas) => {
let mut collected_gas = 0u64;
while collected_gas < additional_gas {
if let Some(batch) = tx_buffer.take_batch().await {
collected_gas += batch.gas_sent;
result.transactions.extend(batch.transactions);
result.gas_sent = result.gas_sent.saturating_add(batch.gas_sent);
result.next_block = batch.next_block;
} else {
warn!("Transaction fetcher exhausted, proceeding with available transactions");
break;
}
}
}
}
}
warn!(payload = index + 1, "Retry loop exited without returning a payload");
Err(eyre::eyre!("build_with_retry_buffered exhausted retries without result"))
}
/// Static version for use in spawned tasks.
/// Determines the outcome of a build attempt.
fn check_retry_outcome(
&self,
built: &BuiltPayload,
index: u64,
attempt: u32,
gas_sent: u64,
) -> RetryOutcome {
let gas_used = built.gas_used;
if gas_used + MIN_TARGET_SLACK >= self.target_gas {
info!(
payload = index + 1,
gas_used,
target_gas = self.target_gas,
attempts = attempt,
"Payload built successfully"
);
return RetryOutcome::Success;
}
if attempt == MAX_BUILD_RETRIES {
warn!(
payload = index + 1,
gas_used,
target_gas = self.target_gas,
gas_sent,
"Underfilled after max retries, accepting payload"
);
return RetryOutcome::MaxRetries;
}
if gas_used == 0 {
warn!(
payload = index + 1,
"Zero gas used in payload, requesting fixed chunk of additional transactions"
);
return RetryOutcome::NeedMore(self.target_gas);
}
let gas_sent_needed_total =
(self.target_gas as u128 * gas_sent as u128).div_ceil(gas_used as u128) as u64;
let additional = gas_sent_needed_total.saturating_sub(gas_sent);
let additional = additional.min(self.target_gas * MAX_ADDITIONAL_GAS_MULTIPLIER);
if additional == 0 {
info!(
payload = index + 1,
gas_used,
target_gas = self.target_gas,
"No additional transactions needed based on ratio"
);
return RetryOutcome::Success;
}
let ratio = gas_used as f64 / gas_sent as f64;
info!(
payload = index + 1,
gas_used,
gas_sent,
ratio = format!("{:.4}", ratio),
additional_gas = additional,
"Underfilled, collecting more transactions for retry"
);
RetryOutcome::NeedMore(additional)
}
/// Build a single payload via `testing_buildBlockV1`.
async fn build_payload_static(
testing_provider: &RootProvider<AnyNetwork>,
transactions: &[Bytes],
@@ -591,8 +774,9 @@ impl Command {
let block_hash = inner.block_hash;
let block_number = inner.block_number;
let timestamp = inner.timestamp;
let gas_used = inner.gas_used;
Ok(BuiltPayload { block_number, envelope: v4_envelope, block_hash, timestamp })
Ok(BuiltPayload { block_number, envelope: v4_envelope, block_hash, timestamp, gas_used })
}
/// Save a payload to disk.

View File

@@ -1,6 +1,33 @@
//! Common helpers for reth-bench commands.
use crate::valid_payload::call_forkchoice_updated;
use eyre::Result;
use std::io::{BufReader, Read};
/// Read input from either a file path or stdin.
pub(crate) fn read_input(path: Option<&str>) -> Result<String> {
Ok(match path {
Some(path) => reth_fs_util::read_to_string(path)?,
None => String::from_utf8(
BufReader::new(std::io::stdin()).bytes().collect::<Result<Vec<_>, _>>()?,
)?,
})
}
/// Load JWT secret from either a file or use the provided string directly.
pub(crate) fn load_jwt_secret(jwt_secret: Option<&str>) -> Result<Option<String>> {
match jwt_secret {
Some(secret) => {
// Try to read as file first
match std::fs::read_to_string(secret) {
Ok(contents) => Ok(Some(contents.trim().to_string())),
// If file read fails, use the string directly
Err(_) => Ok(Some(secret.to_string())),
}
}
None => Ok(None),
}
}
/// Parses a gas limit value with optional suffix: K for thousand, M for million, G for billion.
///

View File

@@ -15,6 +15,7 @@ pub use generate_big_block::{
mod new_payload_fcu;
mod new_payload_only;
mod output;
mod persistence_waiter;
mod replay_payloads;
mod send_invalid_payload;
mod send_payload;

View File

@@ -15,28 +15,23 @@ use crate::{
output::{
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
},
persistence_waiter::{
derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
PERSISTENCE_CHECKPOINT_TIMEOUT,
},
},
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
};
use alloy_eips::BlockNumHash;
use alloy_network::Ethereum;
use alloy_provider::{Provider, RootProvider};
use alloy_pubsub::SubscriptionStream;
use alloy_rpc_client::RpcClient;
use alloy_provider::Provider;
use alloy_rpc_types_engine::ForkchoiceState;
use alloy_transport_ws::WsConnect;
use clap::Parser;
use eyre::{Context, OptionExt};
use futures::StreamExt;
use humantime::parse_duration;
use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_core::args::BenchmarkArgs;
use std::time::{Duration, Instant};
use tracing::{debug, info};
use url::Url;
const PERSISTENCE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(60);
/// `reth benchmark new-payload-fcu` command
#[derive(Debug, Parser)]
@@ -105,7 +100,11 @@ impl Command {
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
(Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)),
(None, true) => {
let sub = self.setup_persistence_subscription().await?;
let ws_url = derive_ws_rpc_url(
self.benchmark.ws_rpc_url.as_deref(),
&self.benchmark.engine_rpc_url,
)?;
let sub = setup_persistence_subscription(ws_url).await?;
Some(PersistenceWaiter::with_subscription(
sub,
self.persistence_threshold,
@@ -245,293 +244,22 @@ impl Command {
results.into_iter().unzip();
if let Some(ref path) = self.benchmark.output {
write_benchmark_results(path, &gas_output_results, combined_results)?;
write_benchmark_results(path, &gas_output_results, &combined_results)?;
}
let gas_output = TotalGasOutput::new(gas_output_results)?;
let gas_output =
TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
info!(
total_duration=?gas_output.total_duration,
total_gas_used=?gas_output.total_gas_used,
blocks_processed=?gas_output.blocks_processed,
"Total Ggas/s: {:.4}",
gas_output.total_gigagas_per_second()
total_gas_used = gas_output.total_gas_used,
total_duration = ?gas_output.total_duration,
execution_duration = ?gas_output.execution_duration,
blocks_processed = gas_output.blocks_processed,
wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
"Benchmark complete"
);
Ok(())
}
/// Returns the websocket RPC URL used for the persistence subscription.
///
/// Preference:
/// - If `--ws-rpc-url` is provided, use it directly.
/// - Otherwise, derive a WS RPC URL from `--engine-rpc-url`.
///
/// The persistence subscription endpoint (`reth_subscribePersistedBlock`) is exposed on
/// the regular RPC server (WS port, usually 8546), not on the engine API port (usually 8551).
/// Since `BenchmarkArgs` only has the engine URL by default, we convert the scheme
/// (http→ws, https→wss) and force the port to 8546.
fn derive_ws_rpc_url(&self) -> eyre::Result<Url> {
if let Some(ref ws_url) = self.benchmark.ws_rpc_url {
let parsed: Url = ws_url
.parse()
.wrap_err_with(|| format!("Failed to parse WebSocket RPC URL: {ws_url}"))?;
info!(target: "reth-bench", ws_url = %parsed, "Using provided WebSocket RPC URL");
Ok(parsed)
} else {
let derived = engine_url_to_ws_url(&self.benchmark.engine_rpc_url)?;
debug!(
target: "reth-bench",
engine_url = %self.benchmark.engine_rpc_url,
%derived,
"Derived WebSocket RPC URL from engine RPC URL"
);
Ok(derived)
}
}
/// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`.
async fn setup_persistence_subscription(&self) -> eyre::Result<PersistenceSubscription> {
let ws_url = self.derive_ws_rpc_url()?;
info!("Connecting to WebSocket at {} for persistence subscription", ws_url);
let ws_connect = WsConnect::new(ws_url.to_string());
let client = RpcClient::connect_pubsub(ws_connect)
.await
.wrap_err("Failed to connect to WebSocket RPC endpoint")?;
let provider: RootProvider<Ethereum> = RootProvider::new(client);
let subscription = provider
.subscribe_to::<BlockNumHash>("reth_subscribePersistedBlock")
.await
.wrap_err("Failed to subscribe to persistence notifications")?;
info!("Subscribed to persistence notifications");
Ok(PersistenceSubscription::new(provider, subscription.into_stream()))
}
}
/// Converts an engine API URL to the default RPC websocket URL.
///
/// Transformations:
/// - `http` → `ws`
/// - `https` → `wss`
/// - `ws` / `wss` keep their scheme
/// - Port is always set to `8546`, reth's default RPC websocket port.
///
/// This is used when we only know the engine API URL (typically `:8551`) but
/// need to connect to the node's WS RPC endpoint for persistence events.
fn engine_url_to_ws_url(engine_url: &str) -> eyre::Result<Url> {
let url: Url = engine_url
.parse()
.wrap_err_with(|| format!("Failed to parse engine RPC URL: {engine_url}"))?;
let mut ws_url = url.clone();
match ws_url.scheme() {
"http" => ws_url
.set_scheme("ws")
.map_err(|_| eyre::eyre!("Failed to set WS scheme for URL: {url}"))?,
"https" => ws_url
.set_scheme("wss")
.map_err(|_| eyre::eyre!("Failed to set WSS scheme for URL: {url}"))?,
"ws" | "wss" => {}
scheme => {
return Err(eyre::eyre!(
"Unsupported URL scheme '{scheme}' for URL: {url}. Expected http, https, ws, or wss."
))
}
}
ws_url.set_port(Some(8546)).map_err(|_| eyre::eyre!("Failed to set port for URL: {url}"))?;
Ok(ws_url)
}
/// Waits until the persistence subscription reports that `target` has been persisted.
///
/// Consumes subscription events until `last_persisted >= target`, or returns an error if:
/// - the subscription stream ends unexpectedly, or
/// - `timeout` elapses before `target` is observed.
async fn wait_for_persistence(
stream: &mut SubscriptionStream<BlockNumHash>,
target: u64,
last_persisted: &mut u64,
timeout: Duration,
) -> eyre::Result<()> {
tokio::time::timeout(timeout, async {
while *last_persisted < target {
match stream.next().await {
Some(persisted) => {
*last_persisted = persisted.number;
debug!(
target: "reth-bench",
persisted_block = ?last_persisted,
"Received persistence notification"
);
}
None => {
return Err(eyre::eyre!("Persistence subscription closed unexpectedly"));
}
}
}
Ok(())
})
.await
.map_err(|_| {
eyre::eyre!(
"Persistence timeout: target block {} not persisted within {:?}. Last persisted: {}",
target,
timeout,
last_persisted
)
})?
}
/// Wrapper that keeps both the subscription stream and the underlying provider alive.
/// The provider must be kept alive for the subscription to continue receiving events.
struct PersistenceSubscription {
_provider: RootProvider<Ethereum>,
stream: SubscriptionStream<BlockNumHash>,
}
impl PersistenceSubscription {
const fn new(
provider: RootProvider<Ethereum>,
stream: SubscriptionStream<BlockNumHash>,
) -> Self {
Self { _provider: provider, stream }
}
const fn stream_mut(&mut self) -> &mut SubscriptionStream<BlockNumHash> {
&mut self.stream
}
}
/// Encapsulates the block waiting logic.
///
/// Provides a simple `on_block()` interface that handles both:
/// - Fixed duration waits (when `wait_time` is set)
/// - Persistence-based waits (when `subscription` is set)
///
/// For persistence mode, waits after every `(threshold + 1)` blocks.
struct PersistenceWaiter {
wait_time: Option<Duration>,
subscription: Option<PersistenceSubscription>,
blocks_sent: u64,
last_persisted: u64,
threshold: u64,
timeout: Duration,
}
impl PersistenceWaiter {
const fn with_duration(wait_time: Duration) -> Self {
Self {
wait_time: Some(wait_time),
subscription: None,
blocks_sent: 0,
last_persisted: 0,
threshold: 0,
timeout: Duration::ZERO,
}
}
const fn with_subscription(
subscription: PersistenceSubscription,
threshold: u64,
timeout: Duration,
) -> Self {
Self {
wait_time: None,
subscription: Some(subscription),
blocks_sent: 0,
last_persisted: 0,
threshold,
timeout,
}
}
/// Called once per block. Waits based on the configured mode.
#[allow(clippy::manual_is_multiple_of)]
async fn on_block(&mut self, block_number: u64) -> eyre::Result<()> {
if let Some(wait_time) = self.wait_time {
tokio::time::sleep(wait_time).await;
return Ok(());
}
let Some(ref mut subscription) = self.subscription else {
return Ok(());
};
self.blocks_sent += 1;
if self.blocks_sent % (self.threshold + 1) == 0 {
debug!(
target: "reth-bench",
target_block = ?block_number,
last_persisted = self.last_persisted,
blocks_sent = self.blocks_sent,
"Waiting for persistence"
);
wait_for_persistence(
subscription.stream_mut(),
block_number,
&mut self.last_persisted,
self.timeout,
)
.await?;
debug!(
target: "reth-bench",
persisted = self.last_persisted,
"Persistence caught up"
);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_engine_url_to_ws_url() {
// http -> ws, always uses port 8546
let result = engine_url_to_ws_url("http://localhost:8551").unwrap();
assert_eq!(result.as_str(), "ws://localhost:8546/");
// https -> wss
let result = engine_url_to_ws_url("https://localhost:8551").unwrap();
assert_eq!(result.as_str(), "wss://localhost:8546/");
// Custom engine port still maps to 8546
let result = engine_url_to_ws_url("http://localhost:9551").unwrap();
assert_eq!(result.port(), Some(8546));
// Already ws passthrough
let result = engine_url_to_ws_url("ws://localhost:8546").unwrap();
assert_eq!(result.scheme(), "ws");
// Invalid inputs
assert!(engine_url_to_ws_url("ftp://localhost:8551").is_err());
assert!(engine_url_to_ws_url("not a valid url").is_err());
}
#[tokio::test]
async fn test_waiter_with_duration() {
let mut waiter = PersistenceWaiter::with_duration(Duration::from_millis(1));
let start = Instant::now();
waiter.on_block(1).await.unwrap();
waiter.on_block(2).await.unwrap();
waiter.on_block(3).await.unwrap();
// Should have waited ~3ms total
assert!(start.elapsed() >= Duration::from_millis(3));
}
}

View File

@@ -6,7 +6,7 @@ use csv::Writer;
use eyre::OptionExt;
use reth_primitives_traits::constants::GIGAGAS;
use serde::{ser::SerializeStruct, Deserialize, Serialize};
use std::{path::Path, time::Duration};
use std::{fs, path::Path, time::Duration};
use tracing::info;
/// This is the suffix for gas output csv files.
@@ -158,29 +158,58 @@ pub(crate) struct TotalGasRow {
pub(crate) struct TotalGasOutput {
/// The total gas used in the benchmark.
pub(crate) total_gas_used: u64,
/// The total duration of the benchmark.
/// The total wall-clock duration of the benchmark (includes wait times).
pub(crate) total_duration: Duration,
/// The total gas used per second.
pub(crate) total_gas_per_second: f64,
/// The total execution-only duration (excludes wait times).
pub(crate) execution_duration: Duration,
/// The number of blocks processed.
pub(crate) blocks_processed: u64,
}
impl TotalGasOutput {
/// Create a new [`TotalGasOutput`] from a list of [`TotalGasRow`].
/// Create a new [`TotalGasOutput`] from gas rows only.
///
/// Use this when execution-only timing is not available (e.g., `new_payload_only`).
/// `execution_duration` will equal `total_duration`.
pub(crate) fn new(rows: Vec<TotalGasRow>) -> eyre::Result<Self> {
// the duration is obtained from the last row
let total_duration = rows.last().map(|row| row.time).ok_or_eyre("empty results")?;
let blocks_processed = rows.len() as u64;
let total_gas_used: u64 = rows.into_iter().map(|row| row.gas_used).sum();
let total_gas_per_second = total_gas_used as f64 / total_duration.as_secs_f64();
Ok(Self { total_gas_used, total_duration, total_gas_per_second, blocks_processed })
Ok(Self {
total_gas_used,
total_duration,
execution_duration: total_duration,
blocks_processed,
})
}
/// Return the total gigagas per second.
/// Create a new [`TotalGasOutput`] from gas rows and combined results.
///
/// - `rows`: Used for total gas and wall-clock duration
/// - `combined_results`: Used for execution-only duration (sum of `total_latency`)
pub(crate) fn with_combined_results(
rows: Vec<TotalGasRow>,
combined_results: &[CombinedResult],
) -> eyre::Result<Self> {
let total_duration = rows.last().map(|row| row.time).ok_or_eyre("empty results")?;
let blocks_processed = rows.len() as u64;
let total_gas_used: u64 = rows.into_iter().map(|row| row.gas_used).sum();
// Sum execution-only time from combined results
let execution_duration: Duration = combined_results.iter().map(|r| r.total_latency).sum();
Ok(Self { total_gas_used, total_duration, execution_duration, blocks_processed })
}
/// Return the total gigagas per second based on wall-clock time.
pub(crate) fn total_gigagas_per_second(&self) -> f64 {
self.total_gas_per_second / GIGAGAS as f64
self.total_gas_used as f64 / self.total_duration.as_secs_f64() / GIGAGAS as f64
}
/// Return the execution-only gigagas per second (excludes wait times).
pub(crate) fn execution_gigagas_per_second(&self) -> f64 {
self.total_gas_used as f64 / self.execution_duration.as_secs_f64() / GIGAGAS as f64
}
}
@@ -192,8 +221,10 @@ impl TotalGasOutput {
pub(crate) fn write_benchmark_results(
output_dir: &Path,
gas_results: &[TotalGasRow],
combined_results: Vec<CombinedResult>,
combined_results: &[CombinedResult],
) -> eyre::Result<()> {
fs::create_dir_all(output_dir)?;
let output_path = output_dir.join(COMBINED_OUTPUT_SUFFIX);
info!("Writing engine api call latency output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;

View File

@@ -0,0 +1,304 @@
//! Persistence waiting utilities for benchmarks.
//!
//! Provides waiting behavior to control benchmark pacing:
//! - **Fixed duration waits**: Sleep for a fixed time between blocks
//! - **Persistence-based waits**: Wait for blocks to be persisted using
//! `reth_subscribePersistedBlock` subscription
use alloy_eips::BlockNumHash;
use alloy_network::Ethereum;
use alloy_provider::{Provider, RootProvider};
use alloy_pubsub::SubscriptionStream;
use alloy_rpc_client::RpcClient;
use alloy_transport_ws::WsConnect;
use eyre::Context;
use futures::StreamExt;
use std::time::Duration;
use tracing::{debug, info};
/// Default `WebSocket` RPC port for reth.
const DEFAULT_WS_RPC_PORT: u16 = 8546;
use url::Url;
/// Default timeout for waiting on persistence.
pub(crate) const PERSISTENCE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(60);
/// Returns the websocket RPC URL used for the persistence subscription.
///
/// Preference:
/// - If `ws_rpc_url` is provided, use it directly.
/// - Otherwise, derive a WS RPC URL from `engine_rpc_url`.
///
/// The persistence subscription endpoint (`reth_subscribePersistedBlock`) is exposed on
/// the regular RPC server (WS port, usually 8546), not on the engine API port (usually 8551).
/// Since we may only have the engine URL by default, we convert the scheme
/// (http→ws, https→wss) and force the port to 8546.
pub(crate) fn derive_ws_rpc_url(
ws_rpc_url: Option<&str>,
engine_rpc_url: &str,
) -> eyre::Result<Url> {
if let Some(ws_url) = ws_rpc_url {
let parsed: Url = ws_url
.parse()
.wrap_err_with(|| format!("Failed to parse WebSocket RPC URL: {ws_url}"))?;
info!(target: "reth-bench", ws_url = %parsed, "Using provided WebSocket RPC URL");
Ok(parsed)
} else {
let derived = engine_url_to_ws_url(engine_rpc_url)?;
debug!(
target: "reth-bench",
engine_url = %engine_rpc_url,
%derived,
"Derived WebSocket RPC URL from engine RPC URL"
);
Ok(derived)
}
}
/// Converts an engine API URL to the default RPC websocket URL.
///
/// Transformations:
/// - `http` → `ws`
/// - `https` → `wss`
/// - `ws` / `wss` keep their scheme
/// - Port is always set to `8546`, reth's default RPC websocket port.
///
/// This is used when we only know the engine API URL (typically `:8551`) but
/// need to connect to the node's WS RPC endpoint for persistence events.
fn engine_url_to_ws_url(engine_url: &str) -> eyre::Result<Url> {
let url: Url = engine_url
.parse()
.wrap_err_with(|| format!("Failed to parse engine RPC URL: {engine_url}"))?;
let mut ws_url = url.clone();
match ws_url.scheme() {
"http" => ws_url
.set_scheme("ws")
.map_err(|_| eyre::eyre!("Failed to set WS scheme for URL: {url}"))?,
"https" => ws_url
.set_scheme("wss")
.map_err(|_| eyre::eyre!("Failed to set WSS scheme for URL: {url}"))?,
"ws" | "wss" => {}
scheme => {
return Err(eyre::eyre!(
"Unsupported URL scheme '{scheme}' for URL: {url}. Expected http, https, ws, or wss."
))
}
}
ws_url
.set_port(Some(DEFAULT_WS_RPC_PORT))
.map_err(|_| eyre::eyre!("Failed to set port for URL: {url}"))?;
Ok(ws_url)
}
/// Waits until the persistence subscription reports that `target` has been persisted.
///
/// Consumes subscription events until `last_persisted >= target`, or returns an error if:
/// - the subscription stream ends unexpectedly, or
/// - `timeout` elapses before `target` is observed.
async fn wait_for_persistence(
stream: &mut SubscriptionStream<BlockNumHash>,
target: u64,
last_persisted: &mut u64,
timeout: Duration,
) -> eyre::Result<()> {
tokio::time::timeout(timeout, async {
while *last_persisted < target {
match stream.next().await {
Some(persisted) => {
*last_persisted = persisted.number;
debug!(
target: "reth-bench",
persisted_block = ?last_persisted,
"Received persistence notification"
);
}
None => {
return Err(eyre::eyre!("Persistence subscription closed unexpectedly"));
}
}
}
Ok(())
})
.await
.map_err(|_| {
eyre::eyre!(
"Persistence timeout: target block {} not persisted within {:?}. Last persisted: {}",
target,
timeout,
last_persisted
)
})?
}
/// Wrapper that keeps both the subscription stream and the underlying provider alive.
/// The provider must be kept alive for the subscription to continue receiving events.
pub(crate) struct PersistenceSubscription {
_provider: RootProvider<Ethereum>,
stream: SubscriptionStream<BlockNumHash>,
}
impl PersistenceSubscription {
const fn new(
provider: RootProvider<Ethereum>,
stream: SubscriptionStream<BlockNumHash>,
) -> Self {
Self { _provider: provider, stream }
}
const fn stream_mut(&mut self) -> &mut SubscriptionStream<BlockNumHash> {
&mut self.stream
}
}
/// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`.
pub(crate) async fn setup_persistence_subscription(
ws_url: Url,
) -> eyre::Result<PersistenceSubscription> {
info!("Connecting to WebSocket at {} for persistence subscription", ws_url);
let ws_connect = WsConnect::new(ws_url.to_string());
let client = RpcClient::connect_pubsub(ws_connect)
.await
.wrap_err("Failed to connect to WebSocket RPC endpoint")?;
let provider: RootProvider<Ethereum> = RootProvider::new(client);
let subscription = provider
.subscribe_to::<BlockNumHash>("reth_subscribePersistedBlock")
.await
.wrap_err("Failed to subscribe to persistence notifications")?;
info!("Subscribed to persistence notifications");
Ok(PersistenceSubscription::new(provider, subscription.into_stream()))
}
/// Encapsulates the block waiting logic.
///
/// Provides a simple `on_block()` interface that handles both:
/// - Fixed duration waits (when `wait_time` is set)
/// - Persistence-based waits (when `subscription` is set)
///
/// For persistence mode, waits after every `(threshold + 1)` blocks.
pub(crate) struct PersistenceWaiter {
wait_time: Option<Duration>,
subscription: Option<PersistenceSubscription>,
blocks_sent: u64,
last_persisted: u64,
threshold: u64,
timeout: Duration,
}
impl PersistenceWaiter {
pub(crate) const fn with_duration(wait_time: Duration) -> Self {
Self {
wait_time: Some(wait_time),
subscription: None,
blocks_sent: 0,
last_persisted: 0,
threshold: 0,
timeout: Duration::ZERO,
}
}
pub(crate) const fn with_subscription(
subscription: PersistenceSubscription,
threshold: u64,
timeout: Duration,
) -> Self {
Self {
wait_time: None,
subscription: Some(subscription),
blocks_sent: 0,
last_persisted: 0,
threshold,
timeout,
}
}
/// Called once per block. Waits based on the configured mode.
#[allow(clippy::manual_is_multiple_of)]
pub(crate) async fn on_block(&mut self, block_number: u64) -> eyre::Result<()> {
if let Some(wait_time) = self.wait_time {
tokio::time::sleep(wait_time).await;
return Ok(());
}
let Some(ref mut subscription) = self.subscription else {
return Ok(());
};
self.blocks_sent += 1;
if self.blocks_sent % (self.threshold + 1) == 0 {
debug!(
target: "reth-bench",
target_block = ?block_number,
last_persisted = self.last_persisted,
blocks_sent = self.blocks_sent,
"Waiting for persistence"
);
wait_for_persistence(
subscription.stream_mut(),
block_number,
&mut self.last_persisted,
self.timeout,
)
.await?;
debug!(
target: "reth-bench",
persisted = self.last_persisted,
"Persistence caught up"
);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Instant;
#[test]
fn test_engine_url_to_ws_url() {
// http -> ws, always uses port 8546
let result = engine_url_to_ws_url("http://localhost:8551").unwrap();
assert_eq!(result.as_str(), "ws://localhost:8546/");
// https -> wss
let result = engine_url_to_ws_url("https://localhost:8551").unwrap();
assert_eq!(result.as_str(), "wss://localhost:8546/");
// Custom engine port still maps to 8546
let result = engine_url_to_ws_url("http://localhost:9551").unwrap();
assert_eq!(result.port(), Some(8546));
// Already ws passthrough
let result = engine_url_to_ws_url("ws://localhost:8546").unwrap();
assert_eq!(result.scheme(), "ws");
// Invalid inputs
assert!(engine_url_to_ws_url("ftp://localhost:8551").is_err());
assert!(engine_url_to_ws_url("not a valid url").is_err());
}
#[tokio::test]
async fn test_waiter_with_duration() {
let mut waiter = PersistenceWaiter::with_duration(Duration::from_millis(1));
let start = Instant::now();
waiter.on_block(1).await.unwrap();
waiter.on_block(2).await.unwrap();
waiter.on_block(3).await.unwrap();
// Should have waited ~3ms total
assert!(start.elapsed() >= Duration::from_millis(3));
}
}

View File

@@ -2,10 +2,27 @@
//!
//! This command reads `ExecutionPayloadEnvelopeV4` files from a directory and replays them
//! in sequence using `newPayload` followed by `forkchoiceUpdated`.
//!
//! Supports configurable waiting behavior:
//! - **`--wait-time`**: Fixed sleep interval between blocks.
//! - **`--wait-for-persistence`**: Waits for every Nth block to be persisted using the
//! `reth_subscribePersistedBlock` subscription, where N matches the engine's persistence
//! threshold. This ensures the benchmark doesn't outpace persistence.
//!
//! Both options can be used together or independently.
use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::output::GasRampPayloadFile,
bench::{
output::{
write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
TotalGasOutput, TotalGasRow,
},
persistence_waiter::{
derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
PERSISTENCE_CHECKPOINT_TIMEOUT,
},
},
valid_payload::{call_forkchoice_updated, call_new_payload},
};
use alloy_primitives::B256;
@@ -14,11 +31,16 @@ use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
use clap::Parser;
use eyre::Context;
use reqwest::Url;
use humantime::parse_duration;
use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_api::EngineApiMessageVersion;
use std::path::PathBuf;
use std::{
path::PathBuf,
time::{Duration, Instant},
};
use tracing::{debug, info};
use url::Url;
/// `reth bench replay-payloads` command
///
@@ -51,6 +73,42 @@ pub struct Command {
/// These are replayed before the main payloads to warm up the gas limit.
#[arg(long, value_name = "GAS_RAMP_DIR")]
gas_ramp_dir: Option<PathBuf>,
/// Optional output directory for benchmark results (CSV files).
#[arg(long, value_name = "OUTPUT")]
output: Option<PathBuf>,
/// How long to wait after a forkchoice update before sending the next payload.
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
wait_time: Option<Duration>,
/// Wait for blocks to be persisted before sending the next batch.
///
/// When enabled, waits for every Nth block to be persisted using the
/// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
/// doesn't outpace persistence.
///
/// The subscription uses the regular RPC websocket endpoint (no JWT required).
#[arg(long, default_value = "false", verbatim_doc_comment)]
wait_for_persistence: bool,
/// Engine persistence threshold used for deciding when to wait for persistence.
///
/// The benchmark waits after every `(threshold + 1)` blocks. By default this
/// matches the engine's `DEFAULT_PERSISTENCE_THRESHOLD` (2), so waits occur
/// at blocks 3, 6, 9, etc.
#[arg(
long = "persistence-threshold",
value_name = "PERSISTENCE_THRESHOLD",
default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
verbatim_doc_comment
)]
persistence_threshold: u64,
/// Optional `WebSocket` RPC URL for persistence subscription.
/// If not provided, derives from engine RPC URL by changing scheme to ws and port to 8546.
#[arg(long, value_name = "WS_RPC_URL", verbatim_doc_comment)]
ws_rpc_url: Option<String>,
}
/// A loaded payload ready for execution.
@@ -78,6 +136,33 @@ impl Command {
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
info!(payload_dir = %self.payload_dir.display(), "Replaying payloads");
// Log mode configuration
if let Some(duration) = self.wait_time {
info!("Using wait-time mode with {}ms delay between blocks", duration.as_millis());
}
if self.wait_for_persistence {
info!(
"Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
self.persistence_threshold + 1,
self.persistence_threshold
);
}
// Set up waiter based on configured options (duration takes precedence)
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
(Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)),
(None, true) => {
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
let sub = setup_persistence_subscription(ws_url).await?;
Some(PersistenceWaiter::with_subscription(
sub,
self.persistence_threshold,
PERSISTENCE_CHECKPOINT_TIMEOUT,
))
}
(None, false) => None,
};
// Set up authenticated engine provider
let jwt =
std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
@@ -144,6 +229,11 @@ impl Command {
call_forkchoice_updated(&auth_provider, payload.version, fcu_state, None).await?;
info!(gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
if let Some(w) = &mut waiter {
w.on_block(payload.block_number).await?;
}
parent_hash = payload.file.block_hash;
}
@@ -151,22 +241,112 @@ impl Command {
info!(count = gas_ramp_payloads.len(), "All gas ramp payloads replayed");
}
let mut results = Vec::new();
let total_benchmark_duration = Instant::now();
for (i, payload) in payloads.iter().enumerate() {
info!(
let envelope = &payload.envelope;
let block_hash = payload.block_hash;
let execution_payload = &envelope.envelope_inner.execution_payload;
let inner_payload = &execution_payload.payload_inner.payload_inner;
let gas_used = inner_payload.gas_used;
let gas_limit = inner_payload.gas_limit;
let block_number = inner_payload.block_number;
let transaction_count =
execution_payload.payload_inner.payload_inner.transactions.len() as u64;
debug!(
payload = i + 1,
total = payloads.len(),
index = payload.index,
block_hash = %payload.block_hash,
block_hash = %block_hash,
"Executing payload (newPayload + FCU)"
);
self.execute_payload_v4(&auth_provider, &payload.envelope, parent_hash).await?;
let start = Instant::now();
info!(payload = i + 1, "Payload executed successfully");
parent_hash = payload.block_hash;
debug!(
method = "engine_newPayloadV4",
block_hash = %block_hash,
"Sending newPayload"
);
let status = auth_provider
.new_payload_v4(
execution_payload.clone(),
vec![],
B256::ZERO,
envelope.execution_requests.to_vec(),
)
.await?;
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
if !status.is_valid() {
return Err(eyre::eyre!("Payload rejected: {:?}", status));
}
let fcu_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
debug!(method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
let fcu_result = auth_provider.fork_choice_updated_v3(fcu_state, None).await?;
let total_latency = start.elapsed();
let fcu_latency = total_latency - new_payload_result.latency;
let combined_result = CombinedResult {
block_number,
gas_limit,
transaction_count,
new_payload_result,
fcu_latency,
total_latency,
};
let current_duration = total_benchmark_duration.elapsed();
info!(%combined_result);
if let Some(w) = &mut waiter {
w.on_block(block_number).await?;
}
let gas_row =
TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
results.push((gas_row, combined_result));
debug!(?status, ?fcu_result, "Payload executed successfully");
parent_hash = block_hash;
}
info!(count = payloads.len(), "All payloads replayed successfully");
// Drop waiter - we don't need to wait for final blocks to persist
// since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence.
drop(waiter);
let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
results.into_iter().unzip();
if let Some(ref path) = self.output {
write_benchmark_results(path, &gas_output_results, &combined_results)?;
}
let gas_output =
TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
info!(
total_gas_used = gas_output.total_gas_used,
total_duration = ?gas_output.total_duration,
execution_duration = ?gas_output.execution_duration,
blocks_processed = gas_output.blocks_processed,
wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
"Benchmark complete"
);
Ok(())
}
@@ -284,49 +464,4 @@ impl Command {
Ok(payloads)
}
async fn execute_payload_v4(
&self,
provider: &RootProvider<AnyNetwork>,
envelope: &ExecutionPayloadEnvelopeV4,
parent_hash: B256,
) -> eyre::Result<()> {
let block_hash =
envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
debug!(
method = "engine_newPayloadV4",
block_hash = %block_hash,
"Sending newPayload"
);
let status = provider
.new_payload_v4(
envelope.envelope_inner.execution_payload.clone(),
vec![],
B256::ZERO,
envelope.execution_requests.to_vec(),
)
.await?;
info!(?status, "newPayloadV4 response");
if !status.is_valid() {
return Err(eyre::eyre!("Payload rejected: {:?}", status));
}
let fcu_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
debug!(method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?;
info!(?fcu_result, "forkchoiceUpdatedV3 response");
Ok(())
}
}

View File

@@ -3,6 +3,7 @@
mod invalidation;
use invalidation::InvalidationConfig;
use super::helpers::{load_jwt_secret, read_input};
use alloy_primitives::{Address, B256};
use alloy_provider::network::AnyRpcBlock;
use alloy_rpc_types_engine::ExecutionPayload;
@@ -10,7 +11,7 @@ use clap::Parser;
use eyre::{OptionExt, Result};
use op_alloy_consensus::OpTxEnvelope;
use reth_cli_runner::CliContext;
use std::io::{BufReader, Read, Write};
use std::io::Write;
/// Command for generating and sending an invalid `engine_newPayload` request.
///
@@ -180,27 +181,6 @@ enum Mode {
}
impl Command {
/// Read input from either a file or stdin
fn read_input(&self) -> Result<String> {
Ok(match &self.path {
Some(path) => reth_fs_util::read_to_string(path)?,
None => String::from_utf8(
BufReader::new(std::io::stdin()).bytes().collect::<Result<Vec<_>, _>>()?,
)?,
})
}
/// Load JWT secret from either a file or use the provided string directly
fn load_jwt_secret(&self) -> Result<Option<String>> {
match &self.jwt_secret {
Some(secret) => match std::fs::read_to_string(secret) {
Ok(contents) => Ok(Some(contents.trim().to_string())),
Err(_) => Ok(Some(secret.clone())),
},
None => Ok(None),
}
}
/// Build `InvalidationConfig` from command flags
const fn build_invalidation_config(&self) -> InvalidationConfig {
InvalidationConfig {
@@ -236,8 +216,8 @@ impl Command {
/// Execute the command
pub async fn execute(self, _ctx: CliContext) -> Result<()> {
let block_json = self.read_input()?;
let jwt_secret = self.load_jwt_secret()?;
let block_json = read_input(self.path.as_deref())?;
let jwt_secret = load_jwt_secret(self.jwt_secret.as_deref())?;
let block = serde_json::from_str::<AnyRpcBlock>(&block_json)?
.into_inner()

View File

@@ -1,10 +1,11 @@
use super::helpers::{load_jwt_secret, read_input};
use alloy_provider::network::AnyRpcBlock;
use alloy_rpc_types_engine::ExecutionPayload;
use clap::Parser;
use eyre::{OptionExt, Result};
use op_alloy_consensus::OpTxEnvelope;
use reth_cli_runner::CliContext;
use std::io::{BufReader, Read, Write};
use std::io::Write;
/// Command for generating and sending an `engine_newPayload` request constructed from an RPC
/// block.
@@ -51,38 +52,13 @@ enum Mode {
}
impl Command {
/// Read input from either a file or stdin
fn read_input(&self) -> Result<String> {
Ok(match &self.path {
Some(path) => reth_fs_util::read_to_string(path)?,
None => String::from_utf8(
BufReader::new(std::io::stdin()).bytes().collect::<Result<Vec<_>, _>>()?,
)?,
})
}
/// Load JWT secret from either a file or use the provided string directly
fn load_jwt_secret(&self) -> Result<Option<String>> {
match &self.jwt_secret {
Some(secret) => {
// Try to read as file first
match std::fs::read_to_string(secret) {
Ok(contents) => Ok(Some(contents.trim().to_string())),
// If file read fails, use the string directly
Err(_) => Ok(Some(secret.clone())),
}
}
None => Ok(None),
}
}
/// Execute the generate payload command
pub async fn execute(self, _ctx: CliContext) -> Result<()> {
// Load block
let block_json = self.read_input()?;
let block_json = read_input(self.path.as_deref())?;
// Load JWT secret
let jwt_secret = self.load_jwt_secret()?;
let jwt_secret = load_jwt_secret(self.jwt_secret.as_deref())?;
// Parse the block
let block = serde_json::from_str::<AnyRpcBlock>(&block_json)?

View File

@@ -260,7 +260,9 @@ pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
while !status.is_valid() {
if status.is_invalid() {
error!(?status, ?params, "Invalid {method}",);
panic!("Invalid {method}: {status:?}");
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(std::io::Error::other(
format!("Invalid {method}: {status:?}"),
))))
}
if status.is_syncing() {
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(

View File

@@ -78,6 +78,7 @@ lz4.workspace = true
zstd.workspace = true
serde.workspace = true
serde_json.workspace = true
parking_lot.workspace = true
tar.workspace = true
tracing.workspace = true
backon.workspace = true

View File

@@ -17,6 +17,7 @@ mod get;
mod list;
mod repair_trie;
mod settings;
mod state;
mod static_file_header;
mod stats;
/// DB List TUI
@@ -65,6 +66,8 @@ pub enum Subcommands {
Settings(settings::Command),
/// Gets storage size information for an account
AccountStorage(account_storage::Command),
/// Gets account state and storage at a specific block
State(state::Command),
}
/// Initializes a provider factory with specified access rights, and then execute with the provided
@@ -198,6 +201,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
command.execute(&tool)?;
});
}
Subcommands::State(command) => {
db_exec!(self.env, tool, N, AccessRights::RO, {
command.execute(&tool)?;
});
}
}
Ok(())

View File

@@ -0,0 +1,412 @@
use alloy_primitives::{Address, BlockNumber, B256, U256};
use clap::Parser;
use parking_lot::Mutex;
use reth_db_api::{
cursor::{DbCursorRO, DbDupCursorRO},
database::Database,
tables,
transaction::DbTx,
};
use reth_db_common::DbTool;
use reth_node_builder::NodeTypesWithDB;
use reth_provider::providers::ProviderNodeTypes;
use reth_storage_api::{BlockNumReader, StateProvider, StorageSettingsCache};
use std::{
collections::BTreeSet,
thread,
time::{Duration, Instant},
};
use tracing::{error, info};
/// Log progress every 5 seconds
const LOG_INTERVAL: Duration = Duration::from_secs(30);
/// The arguments for the `reth db state` command
#[derive(Parser, Debug)]
pub struct Command {
/// The account address to get state for
address: Address,
/// Block number to query state at (uses current state if not provided)
#[arg(long, short)]
block: Option<BlockNumber>,
/// Maximum number of storage slots to display
#[arg(long, short, default_value = "100")]
limit: usize,
/// Output format (table, json, csv)
#[arg(long, short, default_value = "table")]
format: OutputFormat,
}
impl Command {
/// Execute `db state` command
pub fn execute<N: NodeTypesWithDB + ProviderNodeTypes>(
self,
tool: &DbTool<N>,
) -> eyre::Result<()> {
let address = self.address;
let limit = self.limit;
if let Some(block) = self.block {
self.execute_historical(tool, address, block, limit)
} else {
self.execute_current(tool, address, limit)
}
}
fn execute_current<N: NodeTypesWithDB + ProviderNodeTypes>(
&self,
tool: &DbTool<N>,
address: Address,
limit: usize,
) -> eyre::Result<()> {
let entries = tool.provider_factory.db_ref().view(|tx| {
// Get account info
let account = tx.get::<tables::PlainAccountState>(address)?;
// Get storage entries
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let mut entries = Vec::new();
let mut last_log = Instant::now();
let walker = cursor.walk_dup(Some(address), None)?;
for (idx, entry) in walker.enumerate() {
let (_, storage_entry) = entry?;
if storage_entry.value != U256::ZERO {
entries.push((storage_entry.key, storage_entry.value));
}
if entries.len() >= limit {
break;
}
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots_scanned = idx,
"Scanning storage slots"
);
last_log = Instant::now();
}
}
Ok::<_, eyre::Report>((account, entries))
})??;
let (account, storage_entries) = entries;
self.print_results(address, None, account, &storage_entries);
Ok(())
}
fn execute_historical<N: NodeTypesWithDB + ProviderNodeTypes>(
&self,
tool: &DbTool<N>,
address: Address,
block: BlockNumber,
limit: usize,
) -> eyre::Result<()> {
let provider = tool.provider_factory.history_by_block_number(block)?;
// Get account info at that block
let account = provider.basic_account(&address)?;
// Check storage settings to determine where history is stored
let storage_settings = tool.provider_factory.cached_storage_settings();
let history_in_rocksdb = storage_settings.storages_history_in_rocksdb;
// For historical queries, enumerate keys from history indices only
// (not PlainStorageState, which reflects current state)
let mut storage_keys = BTreeSet::new();
if history_in_rocksdb {
error!(
target: "reth::cli",
"Historical storage queries with RocksDB backend are not yet supported. \
Use MDBX for storage history or query current state without --block."
);
return Ok(());
}
// Collect keys from MDBX StorageChangeSets using parallel scanning
self.collect_mdbx_storage_keys_parallel(tool, address, &mut storage_keys)?;
info!(
target: "reth::cli",
address = %address,
block = block,
total_keys = storage_keys.len(),
"Found storage keys to query"
);
// Now query each key at the historical block using the StateProvider
// This handles both MDBX and RocksDB backends transparently
let mut entries = Vec::new();
let mut last_log = Instant::now();
for (idx, key) in storage_keys.iter().enumerate() {
match provider.storage(address, *key) {
Ok(Some(value)) if value != U256::ZERO => {
entries.push((*key, value));
}
_ => {}
}
if entries.len() >= limit {
break;
}
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
block = block,
keys_total = storage_keys.len(),
slots_scanned = idx,
slots_found = entries.len(),
"Scanning historical storage slots"
);
last_log = Instant::now();
}
}
self.print_results(address, Some(block), account, &entries);
Ok(())
}
/// Collects storage keys from MDBX StorageChangeSets using parallel block range scanning.
fn collect_mdbx_storage_keys_parallel<N: NodeTypesWithDB + ProviderNodeTypes>(
&self,
tool: &DbTool<N>,
address: Address,
keys: &mut BTreeSet<B256>,
) -> eyre::Result<()> {
const CHUNK_SIZE: u64 = 500_000; // 500k blocks per thread
let num_threads = std::thread::available_parallelism()
.map(|p| p.get().saturating_sub(1).max(1))
.unwrap_or(4);
// Get the current tip block
let tip = tool.provider_factory.provider()?.best_block_number()?;
if tip == 0 {
return Ok(());
}
info!(
target: "reth::cli",
address = %address,
tip,
chunk_size = CHUNK_SIZE,
num_threads,
"Starting parallel MDBX changeset scan"
);
// Shared state for collecting keys
let collected_keys: Mutex<BTreeSet<B256>> = Mutex::new(BTreeSet::new());
let total_entries_scanned = Mutex::new(0usize);
// Create chunk ranges
let mut chunks: Vec<(u64, u64)> = Vec::new();
let mut start = 0u64;
while start <= tip {
let end = (start + CHUNK_SIZE - 1).min(tip);
chunks.push((start, end));
start = end + 1;
}
let chunks_ref = &chunks;
let next_chunk = Mutex::new(0usize);
let next_chunk_ref = &next_chunk;
let collected_keys_ref = &collected_keys;
let total_entries_ref = &total_entries_scanned;
thread::scope(|s| {
let handles: Vec<_> = (0..num_threads)
.map(|thread_id| {
s.spawn(move || {
loop {
// Get next chunk to process
let chunk_idx = {
let mut idx = next_chunk_ref.lock();
if *idx >= chunks_ref.len() {
return Ok::<_, eyre::Report>(());
}
let current = *idx;
*idx += 1;
current
};
let (chunk_start, chunk_end) = chunks_ref[chunk_idx];
// Open a new read transaction for this chunk
tool.provider_factory.db_ref().view(|tx| {
tx.disable_long_read_transaction_safety();
let mut changeset_cursor =
tx.cursor_read::<tables::StorageChangeSets>()?;
let start_key =
reth_db_api::models::BlockNumberAddress((chunk_start, address));
let end_key =
reth_db_api::models::BlockNumberAddress((chunk_end, address));
let mut local_keys = BTreeSet::new();
let mut entries_in_chunk = 0usize;
if let Ok(walker) = changeset_cursor.walk_range(start_key..=end_key)
{
for (block_addr, storage_entry) in walker.flatten() {
if block_addr.address() == address {
local_keys.insert(storage_entry.key);
}
entries_in_chunk += 1;
}
}
// Merge into global state
collected_keys_ref.lock().extend(local_keys);
*total_entries_ref.lock() += entries_in_chunk;
info!(
target: "reth::cli",
thread_id,
chunk_start,
chunk_end,
entries_in_chunk,
"Thread completed chunk"
);
Ok::<_, eyre::Report>(())
})??;
}
})
})
.collect();
for handle in handles {
handle.join().map_err(|_| eyre::eyre!("Thread panicked"))??;
}
Ok::<_, eyre::Report>(())
})?;
let final_keys = collected_keys.into_inner();
let total = *total_entries_scanned.lock();
info!(
target: "reth::cli",
address = %address,
total_entries = total,
unique_keys = final_keys.len(),
"Finished parallel MDBX changeset scan"
);
keys.extend(final_keys);
Ok(())
}
fn print_results(
&self,
address: Address,
block: Option<BlockNumber>,
account: Option<reth_primitives_traits::Account>,
storage: &[(alloy_primitives::B256, U256)],
) {
match self.format {
OutputFormat::Table => {
println!("Account: {address}");
if let Some(b) = block {
println!("Block: {b}");
} else {
println!("Block: latest");
}
println!();
if let Some(acc) = account {
println!("Nonce: {}", acc.nonce);
println!("Balance: {} wei", acc.balance);
if let Some(code_hash) = acc.bytecode_hash {
println!("Code hash: {code_hash}");
}
} else {
println!("Account not found");
}
println!();
println!("Storage ({} slots):", storage.len());
println!("{:-<130}", "");
println!("{:<66} | {:<64}", "Slot", "Value");
println!("{:-<130}", "");
for (key, value) in storage {
println!("{key} | {value:#066x}");
}
}
OutputFormat::Json => {
let output = serde_json::json!({
"address": address.to_string(),
"block": block,
"account": account.map(|a| serde_json::json!({
"nonce": a.nonce,
"balance": a.balance.to_string(),
"code_hash": a.bytecode_hash.map(|h| h.to_string()),
})),
"storage": storage.iter().map(|(k, v)| {
serde_json::json!({
"key": k.to_string(),
"value": format!("{v:#066x}"),
})
}).collect::<Vec<_>>(),
});
println!("{}", serde_json::to_string_pretty(&output).unwrap());
}
OutputFormat::Csv => {
println!("slot,value");
for (key, value) in storage {
println!("{key},{value:#066x}");
}
}
}
}
}
#[derive(Debug, Clone, Default, clap::ValueEnum)]
pub enum OutputFormat {
#[default]
Table,
Json,
Csv,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_state_args() {
let cmd = Command::try_parse_from([
"state",
"0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045",
"--block",
"1000000",
])
.unwrap();
assert_eq!(
cmd.address,
"0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045".parse::<Address>().unwrap()
);
assert_eq!(cmd.block, Some(1000000));
}
#[test]
fn parse_state_args_no_block() {
let cmd = Command::try_parse_from(["state", "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"])
.unwrap();
assert_eq!(cmd.block, None);
}
}

View File

@@ -1,8 +1,16 @@
//! Command that runs pruning without any limits.
use crate::common::{AccessRights, CliNodeTypes, EnvironmentArgs};
use clap::Parser;
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
use reth_cli::chainspec::ChainSpecParser;
use reth_cli_runner::CliContext;
use reth_node_builder::common::metrics_hooks;
use reth_node_core::{args::MetricArgs, version::version_metadata};
use reth_node_metrics::{
chain::ChainSpecInfo,
server::{MetricServer, MetricServerConfig},
version::VersionInfo,
};
use reth_prune::PrunerBuilder;
use reth_static_file::StaticFileProducer;
use std::sync::Arc;
@@ -13,14 +21,42 @@ use tracing::info;
pub struct PruneCommand<C: ChainSpecParser> {
#[command(flatten)]
env: EnvironmentArgs<C>,
/// Prometheus metrics configuration.
#[command(flatten)]
metrics: MetricArgs,
}
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneCommand<C> {
/// Execute the `prune` command
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(self) -> eyre::Result<()> {
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
self,
ctx: CliContext,
) -> eyre::Result<()> {
let env = self.env.init::<N>(AccessRights::RW)?;
let provider_factory = env.provider_factory;
let config = env.config.prune;
let data_dir = env.data_dir;
if let Some(listen_addr) = self.metrics.prometheus {
let config = MetricServerConfig::new(
listen_addr,
VersionInfo {
version: version_metadata().cargo_pkg_version.as_ref(),
build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
cargo_features: version_metadata().vergen_cargo_features.as_ref(),
git_sha: version_metadata().vergen_git_sha.as_ref(),
target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
build_profile: version_metadata().build_profile_name.as_ref(),
},
ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
ctx.task_executor,
metrics_hooks(&provider_factory),
data_dir.pprof_dumps(),
);
MetricServer::new(config).serve().await?;
}
// Copy data from database to static files
info!(target: "reth::cli", "Copying data from database to static files...");

View File

@@ -38,12 +38,8 @@ pub trait BlockProvider: Send + Sync + 'static {
offset: usize,
) -> impl Future<Output = eyre::Result<B256>> + Send {
async move {
let stored_hash = previous_block_hashes
.len()
.checked_sub(offset)
.and_then(|index| previous_block_hashes.get(index));
if let Some(hash) = stored_hash {
return Ok(*hash);
if let Some(hash) = get_hash_at_offset(previous_block_hashes, offset) {
return Ok(hash);
}
// Return zero hash if the chain isn't long enough to have the block at the offset.
@@ -83,7 +79,7 @@ where
/// Spawn the client to start sending FCUs and new payloads by periodically fetching recent
/// blocks.
pub async fn run(self) {
let mut previous_block_hashes = AllocRingBuffer::new(64);
let mut previous_block_hashes = AllocRingBuffer::new(65);
let mut block_stream = {
let (tx, rx) = mpsc::channel::<P::Block>(64);
let block_provider = self.block_provider.clone();
@@ -142,3 +138,60 @@ where
}
}
}
/// Looks up a block hash from the ring buffer at the given offset from the most recent entry.
///
/// Returns `None` if the buffer doesn't have enough entries to satisfy the offset.
fn get_hash_at_offset(buffer: &AllocRingBuffer<B256>, offset: usize) -> Option<B256> {
buffer.len().checked_sub(offset + 1).and_then(|index| buffer.get(index).copied())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_hash_at_offset() {
let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
// Empty buffer returns None for any offset
assert_eq!(get_hash_at_offset(&buffer, 0), None);
assert_eq!(get_hash_at_offset(&buffer, 1), None);
// Push hashes 0..65
for i in 0..65u8 {
buffer.push(B256::with_last_byte(i));
}
// offset=0 should return the most recent (64)
assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(64)));
// offset=32 (safe block) should return hash 32
assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(32)));
// offset=64 (finalized block) should return hash 0 (the oldest)
assert_eq!(get_hash_at_offset(&buffer, 64), Some(B256::with_last_byte(0)));
// offset=65 exceeds buffer, should return None
assert_eq!(get_hash_at_offset(&buffer, 65), None);
}
#[test]
fn test_get_hash_at_offset_insufficient_entries() {
let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
// With only 1 entry, only offset=0 works
buffer.push(B256::with_last_byte(1));
assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(1)));
assert_eq!(get_hash_at_offset(&buffer, 1), None);
assert_eq!(get_hash_at_offset(&buffer, 32), None);
assert_eq!(get_hash_at_offset(&buffer, 64), None);
// With 33 entries, offset=32 works but offset=64 doesn't
for i in 2..=33u8 {
buffer.push(B256::with_last_byte(i));
}
assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(1)));
assert_eq!(get_hash_at_offset(&buffer, 64), None);
}
}

View File

@@ -469,3 +469,123 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
Ok(())
}
/// Reorg with `RocksDB`: verifies that unwind correctly reads changesets from
/// storage-aware locations (static files vs MDBX) rather than directly from MDBX.
///
/// This test exercises `unwind_trie_state_from` which previously failed with
/// `UnsortedInput` errors because it read changesets directly from MDBX tables
/// instead of using storage-aware methods that check `storage_changesets_in_static_files`.
#[tokio::test]
async fn test_rocksdb_reorg_unwind() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
assert_eq!(nodes.len(), 1);
// Use two separate wallets to avoid nonce conflicts during reorg
let wallets = wallet::Wallet::new(2).with_chain_id(chain_id).wallet_gen();
let signer1 = wallets[0].clone();
let signer2 = wallets[1].clone();
let client = nodes[0].rpc_client().expect("RPC client");
// Mine block 1 with a transaction from signer1
let raw_tx1 =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer1.clone(), 0).await;
let tx_hash1 = nodes[0].rpc.inject_tx(raw_tx1).await?;
wait_for_pending_tx(&client, tx_hash1).await;
let payload1 = nodes[0].advance_block().await?;
let block1_hash = payload1.block().hash();
assert_eq!(payload1.block().number(), 1);
// Poll until tx1 appears in RocksDB (ensures persistence happened)
let tx_number1 = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash1).await;
assert_eq!(tx_number1, 0, "First tx should have tx_number 0");
// Mine block 2 with transaction from signer1 (nonce 1)
let raw_tx2 =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer1.clone(), 1).await;
let tx_hash2 = nodes[0].rpc.inject_tx(raw_tx2).await?;
wait_for_pending_tx(&client, tx_hash2).await;
let payload2 = nodes[0].advance_block().await?;
assert_eq!(payload2.block().number(), 2);
// Poll until tx2 appears in RocksDB
let tx_number2 = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash2).await;
assert_eq!(tx_number2, 1, "Second tx should have tx_number 1");
// Mine block 3 with transaction from signer1 (nonce 2)
let raw_tx3 =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer1.clone(), 2).await;
let tx_hash3 = nodes[0].rpc.inject_tx(raw_tx3).await?;
wait_for_pending_tx(&client, tx_hash3).await;
let payload3 = nodes[0].advance_block().await?;
assert_eq!(payload3.block().number(), 3);
// Poll until tx3 appears in RocksDB
let tx_number3 = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash3).await;
assert_eq!(tx_number3, 2, "Third tx should have tx_number 2");
// Now create an alternate block 2 using signer2 (different wallet, avoids nonce conflict)
// Inject a tx from signer2 (nonce 0) before building the alternate block
let raw_alt_tx =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer2.clone(), 0).await;
let alt_tx_hash = nodes[0].rpc.inject_tx(raw_alt_tx).await?;
wait_for_pending_tx(&client, alt_tx_hash).await;
// Build an alternate payload (this builds on top of the current head, i.e., block 3)
// But we want to reorg back to block 1, so we'll use the payload and then FCU to it
let alt_payload = nodes[0].new_payload().await?;
let alt_block_hash = nodes[0].submit_payload(alt_payload.clone()).await?;
// Trigger reorg: make the alternate chain canonical by sending FCU pointing to block 1's hash
// as finalized, which should trigger an unwind of blocks 2 and 3
// The alt block becomes the new head
nodes[0].update_forkchoice(block1_hash, alt_block_hash).await?;
// Give time for the reorg to complete
tokio::time::sleep(Duration::from_millis(500)).await;
// Verify we can still query transactions and the chain is consistent
// If unwind_trie_state_from failed, this would have errored during reorg
let latest: Option<alloy_rpc_types_eth::Block> =
client.request("eth_getBlockByNumber", ("latest", false)).await?;
let latest = latest.expect("Latest block should exist");
// The alt block is at height 4 (on top of block 3)
assert!(latest.header.number >= 3, "Should be at height >= 3 after operation");
// tx1 from block 1 should still be there
let tx1: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash1]).await?;
assert!(tx1.is_some(), "tx1 from block 1 should still be queryable");
assert_eq!(tx1.unwrap().block_number, Some(1));
// Mine another block to verify the chain can continue
let raw_tx_final =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer2.clone(), 1).await;
let tx_hash_final = nodes[0].rpc.inject_tx(raw_tx_final).await?;
wait_for_pending_tx(&client, tx_hash_final).await;
let final_payload = nodes[0].advance_block().await?;
assert!(final_payload.block().number() > 3, "Should be able to mine block after reorg");
// Verify tx_final is included
let tx_final: Option<Transaction> =
client.request("eth_getTransactionByHash", [tx_hash_final]).await?;
assert!(tx_final.is_some(), "final tx should be in latest block");
Ok(())
}

View File

@@ -32,9 +32,7 @@ futures-util.workspace = true
# misc
eyre.workspace = true
tracing.workspace = true
op-alloy-rpc-types-engine = { workspace = true, optional = true }
reth-optimism-chainspec = { workspace = true, optional = true }
[lints]
workspace = true
@@ -42,7 +40,6 @@ workspace = true
[features]
op = [
"dep:op-alloy-rpc-types-engine",
"dep:reth-optimism-chainspec",
"reth-payload-primitives/op",
"reth-primitives-traits/op",
]

View File

@@ -72,13 +72,18 @@ where
&self,
parent: &SealedHeader<ChainSpec::Header>,
) -> op_alloy_rpc_types_engine::OpPayloadAttributes {
/// Dummy system transaction for dev mode.
/// OP Mainnet transaction at index 0 in block 124665056.
///
/// <https://optimistic.etherscan.io/tx/0x312e290cf36df704a2217b015d6455396830b0ce678b860ebfcc30f41403d7b1>
const TX_SET_L1_BLOCK_OP_MAINNET_BLOCK_124665056: [u8; 251] = alloy_primitives::hex!(
"7ef8f8a0683079df94aa5b9cf86687d739a60a9b4f0835e520ec4d664e2e415dca17a6df94deaddeaddeaddeaddeaddeaddeaddeaddead00019442000000000000000000000000000000000000158080830f424080b8a4440a5e200000146b000f79c500000000000000040000000066d052e700000000013ad8a3000000000000000000000000000000000000000000000000000000003ef1278700000000000000000000000000000000000000000000000000000000000000012fdf87b89884a61e74b322bbcf60386f543bfae7827725efaaf0ab1de2294a590000000000000000000000006887246668a3b87f54deb3b94ba47a6f63f32985"
);
op_alloy_rpc_types_engine::OpPayloadAttributes {
payload_attributes: self.build(parent),
// Add dummy system transaction
transactions: Some(vec![
reth_optimism_chainspec::constants::TX_SET_L1_BLOCK_OP_MAINNET_BLOCK_124665056
.into(),
]),
transactions: Some(vec![TX_SET_L1_BLOCK_OP_MAINNET_BLOCK_124665056.into()]),
no_tx_pool: None,
gas_limit: None,
eip_1559_params: None,

View File

@@ -47,6 +47,17 @@ pub const DEFAULT_RESERVED_CPU_CORES: usize = 1;
/// Default maximum concurrency for prewarm task.
pub const DEFAULT_PREWARM_MAX_CONCURRENCY: usize = 16;
/// Default depth for sparse trie pruning.
///
/// Nodes at this depth and below are converted to hash stubs to reduce memory.
/// Depth 4 means we keep roughly 16^4 = 65536 potential branch paths at most.
pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
/// Default maximum number of storage tries to keep after pruning.
///
/// Storage tries beyond this limit are cleared (but allocations preserved).
pub const DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES: usize = 100;
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
@@ -152,6 +163,12 @@ pub struct TreeConfig {
disable_proof_v2: bool,
/// Whether to disable cache metrics recording (can be expensive with large cached state).
disable_cache_metrics: bool,
/// Whether to enable sparse trie as cache.
enable_sparse_trie_as_cache: bool,
/// Depth for sparse trie pruning after state root computation.
sparse_trie_prune_depth: usize,
/// Maximum number of storage tries to retain after pruning.
sparse_trie_max_storage_tries: usize,
}
impl Default for TreeConfig {
@@ -181,6 +198,9 @@ impl Default for TreeConfig {
account_worker_count: default_account_worker_count(),
disable_proof_v2: false,
disable_cache_metrics: false,
enable_sparse_trie_as_cache: false,
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
}
}
}
@@ -213,6 +233,8 @@ impl TreeConfig {
account_worker_count: usize,
disable_proof_v2: bool,
disable_cache_metrics: bool,
sparse_trie_prune_depth: usize,
sparse_trie_max_storage_tries: usize,
) -> Self {
Self {
persistence_threshold,
@@ -239,6 +261,9 @@ impl TreeConfig {
account_worker_count,
disable_proof_v2,
disable_cache_metrics,
enable_sparse_trie_as_cache: false,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
}
}
@@ -540,4 +565,37 @@ impl TreeConfig {
self.disable_cache_metrics = disable_cache_metrics;
self
}
/// Returns whether sparse trie as cache is enabled.
pub const fn enable_sparse_trie_as_cache(&self) -> bool {
self.enable_sparse_trie_as_cache
}
/// Setter for whether to enable sparse trie as cache.
pub const fn with_enable_sparse_trie_as_cache(mut self, value: bool) -> Self {
self.enable_sparse_trie_as_cache = value;
self
}
/// Returns the sparse trie prune depth.
pub const fn sparse_trie_prune_depth(&self) -> usize {
self.sparse_trie_prune_depth
}
/// Setter for sparse trie prune depth.
pub const fn with_sparse_trie_prune_depth(mut self, depth: usize) -> Self {
self.sparse_trie_prune_depth = depth;
self
}
/// Returns the maximum number of storage tries to retain after pruning.
pub const fn sparse_trie_max_storage_tries(&self) -> usize {
self.sparse_trie_max_storage_tries
}
/// Setter for maximum storage tries to retain.
pub const fn with_sparse_trie_max_storage_tries(mut self, max_tries: usize) -> Self {
self.sparse_trie_max_storage_tries = max_tries;
self
}
}

View File

@@ -30,7 +30,7 @@ use reth_payload_primitives::{
};
use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
use reth_provider::{
BlockExecutionOutput, BlockExecutionResult, BlockNumReader, BlockReader, ChangeSetReader,
BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
TransactionVariant,
@@ -321,11 +321,10 @@ where
+ HashedPostStateProvider
+ Clone
+ 'static,
<P as DatabaseProviderFactory>::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
+ StageCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ BlockNumReader,
+ StorageChangeSetReader,
C: ConfigureEvm<Primitives = N> + 'static,
T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
V: EngineValidator<T>,
@@ -1406,7 +1405,20 @@ where
);
self.changeset_cache.evict(eviction_threshold);
// Invalidate cached overlay since the anchor has changed
self.state.tree_state.invalidate_cached_overlay();
self.on_new_persisted_block()?;
// Re-prepare overlay for the current canonical head with the new anchor.
// Spawn a background task to trigger computation so it's ready when the next payload
// arrives.
if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
rayon::spawn(move || {
let _ = overlay.get();
});
}
Ok(())
}

View File

@@ -7,14 +7,14 @@ use crate::tree::{
prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
sparse_trie::StateRootComputeOutcome,
},
sparse_trie::SparseTrieTask,
sparse_trie::{SparseTrieCacheTask, SparseTrieTask, SpawnedSparseTrieTask},
StateProviderBuilder, TreeConfig,
};
use alloy_eip7928::BlockAccessList;
use alloy_eips::eip1898::BlockWithParent;
use alloy_evm::block::StateChangeSource;
use alloy_primitives::B256;
use crossbeam_channel::Sender as CrossbeamSender;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use executor::WorkloadExecutor;
use metrics::Counter;
use multiproof::{SparseTrieUpdate, *};
@@ -39,10 +39,7 @@ use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofWorkerHandle},
root::ParallelStateRootError,
};
use reth_trie_sparse::{
provider::{TrieNodeProvider, TrieNodeProviderFactory},
ClearedSparseStateTrie, RevealableSparseTrie, SparseStateTrie,
};
use reth_trie_sparse::{RevealableSparseTrie, SparseStateTrie};
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
use std::{
collections::BTreeMap,
@@ -59,10 +56,13 @@ use tracing::{debug, debug_span, instrument, warn, Span};
pub mod bal;
pub mod executor;
pub mod multiproof;
mod preserved_sparse_trie;
pub mod prewarm;
pub mod receipt_root_task;
pub mod sparse_trie;
use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by performing benchmarks using gradually increasing values to judge
@@ -125,13 +125,16 @@ where
precompile_cache_disabled: bool,
/// Precompile cache map.
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
/// A cleared `SparseStateTrie`, kept around to be reused for the state root computation so
/// that allocations can be minimized.
sparse_state_trie: Arc<
parking_lot::Mutex<Option<ClearedSparseStateTrie<ParallelSparseTrie, ParallelSparseTrie>>>,
>,
/// A pruned `SparseStateTrie`, kept around as a cache of already revealed trie nodes and to
/// re-use allocated memory. Stored with the block hash it was computed for to enable trie
/// preservation across sequential payload validations.
sparse_state_trie: SharedPreservedSparseTrie,
/// Maximum concurrency for prewarm task.
prewarm_max_concurrency: usize,
/// Sparse trie prune depth.
sparse_trie_prune_depth: usize,
/// Maximum storage tries to retain after pruning.
sparse_trie_max_storage_tries: usize,
/// Whether to disable cache metrics recording.
disable_cache_metrics: bool,
}
@@ -163,8 +166,10 @@ where
disable_state_cache: config.disable_state_cache(),
precompile_cache_disabled: config.precompile_cache_disabled(),
precompile_cache_map,
sparse_state_trie: Arc::default(),
sparse_state_trie: SharedPreservedSparseTrie::default(),
prewarm_max_concurrency: config.prewarm_max_concurrency(),
sparse_trie_prune_depth: config.sparse_trie_prune_depth(),
sparse_trie_max_storage_tries: config.sparse_trie_max_storage_tries(),
disable_cache_metrics: config.disable_cache_metrics(),
}
}
@@ -240,6 +245,9 @@ where
// Extract V2 proofs flag early so we can pass it to prewarm
let v2_proofs_enabled = !config.disable_proof_v2();
// Capture parent_state_root before env is moved into spawn_caching_with
let parent_state_root = env.parent_state_root;
// Handle BAL-based optimization if available
let prewarm_handle = if let Some(bal) = bal {
// When BAL is present, use BAL prewarming and send BAL to multiproof
@@ -283,37 +291,46 @@ where
v2_proofs_enabled,
);
let multi_proof_task = MultiProofTask::new(
proof_handle.clone(),
to_sparse_trie,
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
to_multi_proof.clone(),
from_multi_proof,
)
.with_v2_proofs_enabled(v2_proofs_enabled);
if !config.enable_sparse_trie_as_cache() {
let multi_proof_task = MultiProofTask::new(
proof_handle.clone(),
to_sparse_trie,
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
to_multi_proof.clone(),
from_multi_proof.clone(),
)
.with_v2_proofs_enabled(v2_proofs_enabled);
// spawn multi-proof task
let parent_span = span.clone();
let saved_cache = prewarm_handle.saved_cache.clone();
self.executor.spawn_blocking(move || {
let _enter = parent_span.entered();
// Build a state provider for the multiproof task
let provider = provider_builder.build().expect("failed to build provider");
let provider = if let Some(saved_cache) = saved_cache {
let (cache, metrics, _disable_metrics) = saved_cache.split();
Box::new(CachedStateProvider::new(provider, cache, metrics))
as Box<dyn StateProvider>
} else {
Box::new(provider)
};
multi_proof_task.run(provider);
});
// spawn multi-proof task
let parent_span = span.clone();
let saved_cache = prewarm_handle.saved_cache.clone();
self.executor.spawn_blocking(move || {
let _enter = parent_span.entered();
// Build a state provider for the multiproof task
let provider = provider_builder.build().expect("failed to build provider");
let provider = if let Some(saved_cache) = saved_cache {
let (cache, metrics, _disable_metrics) = saved_cache.split();
Box::new(CachedStateProvider::new(provider, cache, metrics))
as Box<dyn StateProvider>
} else {
Box::new(provider)
};
multi_proof_task.run(provider);
});
}
// wire the sparse trie to the state root response receiver
let (state_root_tx, state_root_rx) = channel();
// Spawn the sparse trie task using any stored trie and parallel trie configuration.
self.spawn_sparse_trie_task(sparse_trie_rx, proof_handle, state_root_tx);
self.spawn_sparse_trie_task(
sparse_trie_rx,
proof_handle,
state_root_tx,
from_multi_proof,
config,
parent_state_root,
);
PayloadHandle {
to_multi_proof: Some(to_multi_proof),
@@ -492,64 +509,124 @@ where
}
/// Spawns the [`SparseTrieTask`] for this payload processor.
///
/// The trie is preserved when the new payload is a child of the previous one.
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
fn spawn_sparse_trie_task<BPF>(
fn spawn_sparse_trie_task(
&self,
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
proof_worker_handle: BPF,
proof_worker_handle: ProofWorkerHandle,
state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
) where
BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
{
let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
from_multi_proof: CrossbeamReceiver<MultiProofMessage>,
config: &TreeConfig,
parent_state_root: B256,
) {
let preserved_sparse_trie = self.sparse_state_trie.clone();
let trie_metrics = self.trie_metrics.clone();
let span = Span::current();
let disable_sparse_trie_as_cache = !config.enable_sparse_trie_as_cache();
let prune_depth = self.sparse_trie_prune_depth;
let max_storage_tries = self.sparse_trie_max_storage_tries;
self.executor.spawn_blocking(move || {
let _enter = span.entered();
// Reuse a stored SparseStateTrie, or create a new one using the desired configuration
// if there's none to reuse.
let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
let default_trie = RevealableSparseTrie::blind_from(
ParallelSparseTrie::default()
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
);
ClearedSparseStateTrie::from_state_trie(
// Reuse a stored SparseStateTrie if available, applying continuation logic.
// If this payload's parent state root matches the preserved trie's anchor,
// we can reuse the pruned trie structure. Otherwise, we clear the trie but
// keep allocations.
let sparse_state_trie = preserved_sparse_trie
.take()
.map(|preserved| preserved.into_trie_for(parent_state_root))
.unwrap_or_else(|| {
debug!(
target: "engine::tree::payload_processor",
"Creating new sparse trie - no preserved trie available"
);
let default_trie = RevealableSparseTrie::blind_from(
ParallelSparseTrie::default().with_parallelism_thresholds(
PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS,
),
);
SparseStateTrie::new()
.with_accounts_trie(default_trie.clone())
.with_default_storage_trie(default_trie)
.with_updates(true),
)
});
.with_updates(true)
});
let task =
SparseTrieTask::<_, ParallelSparseTrie, ParallelSparseTrie>::new_with_cleared_trie(
let mut task = if disable_sparse_trie_as_cache {
SpawnedSparseTrieTask::Cleared(SparseTrieTask::new(
sparse_trie_rx,
proof_worker_handle,
trie_metrics,
trie_metrics.clone(),
sparse_state_trie,
))
} else {
SpawnedSparseTrieTask::Cached(SparseTrieCacheTask::new(
from_multi_proof,
proof_worker_handle,
trie_metrics.clone(),
sparse_state_trie,
))
};
let result = task.run();
if let Err(e) = &result {
tracing::error!(target: "engine::tree::payload_processor", "State root computation failed: {e:?}");
}
// Capture the computed state_root before sending the result
let computed_state_root = result.as_ref().ok().map(|outcome| outcome.state_root);
// Acquire the guard before sending the result to prevent a race condition:
// Without this, the next block could start after send() but before store(),
// causing take() to return None and forcing it to create a new empty trie
// instead of reusing the preserved one. Holding the guard ensures the next
// block's take() blocks until we've stored the trie for reuse.
let mut guard = preserved_sparse_trie.lock();
// Send state root computation result - next block may start but will block on take()
if state_root_tx.send(result).is_err() {
// Receiver dropped - payload was likely invalid or cancelled.
// Clear the trie instead of preserving potentially invalid state.
debug!(
target: "engine::tree::payload_processor",
"State root receiver dropped, clearing trie"
);
let trie = task.into_cleared_trie(
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
guard.store(PreservedSparseTrie::cleared(trie));
return;
}
let (result, trie) = task.run();
// Send state root computation result
let _ = state_root_tx.send(result);
// Clear the SparseStateTrie, shrink, and replace it back into the mutex _after_ sending
// results to the next step, so that time spent clearing doesn't block the step after
// this one.
let _enter = debug_span!(target: "engine::tree::payload_processor", "clear").entered();
let mut cleared_trie = ClearedSparseStateTrie::from_state_trie(trie);
// Shrink the sparse trie so that we don't have ever increasing memory.
cleared_trie.shrink_to(
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
cleared_sparse_trie.lock().replace(cleared_trie);
// Only preserve the trie as anchored if computation succeeded.
// A failed computation may have left the trie in a partially updated state.
let _enter =
debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
if let Some(state_root) = computed_state_root {
let start = std::time::Instant::now();
let trie = task.into_trie_for_reuse(
prune_depth,
max_storage_tries,
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
trie_metrics
.into_trie_for_reuse_duration_histogram
.record(start.elapsed().as_secs_f64());
guard.store(PreservedSparseTrie::anchored(trie, state_root));
} else {
debug!(
target: "engine::tree::payload_processor",
"State root computation failed, clearing trie"
);
let trie = task.into_cleared_trie(
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
guard.store(PreservedSparseTrie::cleared(trie));
}
});
}
@@ -883,6 +960,10 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
pub hash: B256,
/// Hash of the parent block.
pub parent_hash: B256,
/// State root of the parent block.
/// Used for sparse trie continuation: if the preserved trie's anchor matches this,
/// the trie can be reused directly.
pub parent_state_root: B256,
}
impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
@@ -894,6 +975,7 @@ where
evm_env: Default::default(),
hash: Default::default(),
parent_hash: Default::default(),
parent_state_root: Default::default(),
}
}
}

View File

@@ -311,11 +311,7 @@ impl VersionedMultiProofTargets {
fn chunking_length(&self) -> usize {
match self {
Self::Legacy(targets) => targets.chunking_length(),
Self::V2(targets) => {
// For V2, count accounts + storage slots
targets.account_targets.len() +
targets.storage_targets.values().map(|slots| slots.len()).sum::<usize>()
}
Self::V2(targets) => targets.chunking_length(),
}
}
@@ -587,6 +583,8 @@ pub(crate) struct MultiProofTaskMetrics {
pub first_update_wait_time_histogram: Histogram,
/// Total time spent waiting for the last proof result.
pub last_proof_wait_time_histogram: Histogram,
/// Time spent preparing the sparse trie for reuse after state root computation.
pub into_trie_for_reuse_duration_histogram: Histogram,
}
/// Standalone task that receives a transaction state stream and updates relevant
@@ -1492,7 +1490,7 @@ fn get_proof_targets(
/// Dispatches work items as a single unit or in chunks based on target size and worker
/// availability.
#[allow(clippy::too_many_arguments)]
fn dispatch_with_chunking<T, I>(
pub(crate) fn dispatch_with_chunking<T, I>(
items: T,
chunking_len: usize,
chunk_size: Option<usize>,

View File

@@ -0,0 +1,117 @@
//! Preserved sparse trie for reuse across payload validations.
use alloy_primitives::B256;
use parking_lot::Mutex;
use reth_trie_sparse::SparseStateTrie;
use reth_trie_sparse_parallel::ParallelSparseTrie;
use std::sync::Arc;
use tracing::debug;
/// Type alias for the sparse trie type used in preservation.
pub(super) type SparseTrie = SparseStateTrie<ParallelSparseTrie, ParallelSparseTrie>;
/// Shared handle to a preserved sparse trie that can be reused across payload validations.
///
/// This is stored in [`PayloadProcessor`](super::PayloadProcessor) and cloned to pass to
/// [`SparseTrieTask`](super::sparse_trie::SparseTrieTask) for trie reuse.
#[derive(Debug, Default, Clone)]
pub(super) struct SharedPreservedSparseTrie(Arc<Mutex<Option<PreservedSparseTrie>>>);
impl SharedPreservedSparseTrie {
/// Takes the preserved trie if present, leaving `None` in its place.
pub(super) fn take(&self) -> Option<PreservedSparseTrie> {
self.0.lock().take()
}
/// Acquires a guard that blocks `take()` until dropped.
/// Use this before sending the state root result to ensure the next block
/// waits for the trie to be stored.
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
PreservedTrieGuard(self.0.lock())
}
}
/// Guard that holds the lock on the preserved trie.
/// While held, `take()` will block. Call `store()` to save the trie before dropping.
pub(super) struct PreservedTrieGuard<'a>(parking_lot::MutexGuard<'a, Option<PreservedSparseTrie>>);
impl PreservedTrieGuard<'_> {
/// Stores a preserved trie for later reuse.
pub(super) fn store(&mut self, trie: PreservedSparseTrie) {
self.0.replace(trie);
}
}
/// A preserved sparse trie that can be reused across payload validations.
///
/// The trie exists in one of two states:
/// - **Anchored**: Has a computed state root and can be reused for payloads whose parent state root
/// matches the anchor.
/// - **Cleared**: Trie data has been cleared but allocations are preserved for reuse.
#[derive(Debug)]
pub(super) enum PreservedSparseTrie {
/// Trie with a computed state root that can be reused for continuation payloads.
Anchored {
/// The sparse state trie (pruned after root computation).
trie: SparseTrie,
/// The state root this trie represents (computed from the previous block).
/// Used to verify continuity: new payload's `parent_state_root` must match this.
state_root: B256,
},
/// Cleared trie with preserved allocations, ready for fresh use.
Cleared {
/// The sparse state trie with cleared data but preserved allocations.
trie: SparseTrie,
},
}
impl PreservedSparseTrie {
/// Creates a new anchored preserved trie.
///
/// The `state_root` is the computed state root from the trie, which becomes the
/// anchor for determining if subsequent payloads can reuse this trie.
pub(super) const fn anchored(trie: SparseTrie, state_root: B256) -> Self {
Self::Anchored { trie, state_root }
}
/// Creates a cleared preserved trie (allocations preserved, data cleared).
pub(super) const fn cleared(trie: SparseTrie) -> Self {
Self::Cleared { trie }
}
/// Consumes self and returns the trie for reuse.
///
/// If the preserved trie is anchored and the parent state root matches, the pruned
/// trie structure is reused directly. Otherwise, the trie is cleared but allocations
/// are preserved to reduce memory overhead.
pub(super) fn into_trie_for(self, parent_state_root: B256) -> SparseTrie {
match self {
Self::Anchored { trie, state_root } if state_root == parent_state_root => {
debug!(
target: "engine::tree::payload_processor",
%state_root,
"Reusing anchored sparse trie for continuation payload"
);
trie
}
Self::Anchored { mut trie, state_root } => {
debug!(
target: "engine::tree::payload_processor",
anchor_root = %state_root,
%parent_state_root,
"Clearing anchored sparse trie - parent state root mismatch"
);
trie.clear();
trie
}
Self::Cleared { trie } => {
debug!(
target: "engine::tree::payload_processor",
%parent_state_root,
"Using cleared sparse trie with preserved allocations"
);
trie
}
}
}
}

View File

@@ -1,21 +1,100 @@
//! Sparse Trie task related functionality.
use crate::tree::payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate};
use alloy_primitives::B256;
use rayon::iter::{ParallelBridge, ParallelIterator};
use reth_trie::{updates::TrieUpdates, Nibbles};
use reth_trie_parallel::{proof_task::ProofResult, root::ParallelStateRootError};
use reth_trie_sparse::{
errors::{SparseStateTrieResult, SparseTrieErrorKind},
provider::{TrieNodeProvider, TrieNodeProviderFactory},
ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrie,
use crate::tree::{
multiproof::{
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
VersionedMultiProofTargets,
},
payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate},
};
use alloy_primitives::B256;
use alloy_rlp::Decodable;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use rayon::iter::{IntoParallelRefMutIterator, ParallelBridge, ParallelIterator};
use reth_primitives_traits::Account;
use reth_revm::state::EvmState;
use reth_trie::{
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
TrieAccount, EMPTY_ROOT_HASH,
};
use reth_trie_parallel::{
proof_task::{
AccountMultiproofInput, ProofResult, ProofResultContext, ProofResultMessage,
ProofWorkerHandle,
},
root::ParallelStateRootError,
targets_v2::MultiProofTargetsV2,
};
use reth_trie_sparse::{
errors::{SparseStateTrieResult, SparseTrieErrorKind, SparseTrieResult},
provider::{TrieNodeProvider, TrieNodeProviderFactory},
LeafUpdate, SerialSparseTrie, SparseStateTrie, SparseTrie, SparseTrieExt,
};
use revm_primitives::{hash_map::Entry, B256Map};
use smallvec::SmallVec;
use std::{
sync::mpsc,
time::{Duration, Instant},
};
use tracing::{debug, debug_span, instrument, trace};
use tracing::{debug, debug_span, error, instrument, trace};
#[expect(clippy::large_enum_variant)]
pub(super) enum SpawnedSparseTrieTask<BPF, A, S>
where
BPF: TrieNodeProviderFactory + Send + Sync,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
{
Cleared(SparseTrieTask<BPF, A, S>),
Cached(SparseTrieCacheTask<A, S>),
}
impl<BPF, A, S> SpawnedSparseTrieTask<BPF, A, S>
where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
{
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
match self {
Self::Cleared(task) => task.run(),
Self::Cached(task) => task.run(),
}
}
pub(super) fn into_trie_for_reuse(
self,
prune_depth: usize,
max_storage_tries: usize,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> SparseStateTrie<A, S> {
match self {
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
Self::Cached(task) => task.into_trie_for_reuse(
prune_depth,
max_storage_tries,
max_nodes_capacity,
max_values_capacity,
),
}
}
pub(super) fn into_cleared_trie(
self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> SparseStateTrie<A, S> {
match self {
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
Self::Cached(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
}
}
}
/// A task responsible for populating the sparse trie.
pub(super) struct SparseTrieTask<BPF, A = SerialSparseTrie, S = SerialSparseTrie>
@@ -38,46 +117,29 @@ where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
{
/// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`].
pub(super) fn new_with_cleared_trie(
/// Creates a new sparse trie task with the given trie.
pub(super) const fn new(
updates: mpsc::Receiver<SparseTrieUpdate>,
blinded_provider_factory: BPF,
metrics: MultiProofTaskMetrics,
sparse_state_trie: ClearedSparseStateTrie<A, S>,
trie: SparseStateTrie<A, S>,
) -> Self {
Self { updates, metrics, trie: sparse_state_trie.into_inner(), blinded_provider_factory }
Self { updates, metrics, trie, blinded_provider_factory }
}
/// Runs the sparse trie task to completion.
/// Runs the sparse trie task to completion, computing the state root.
///
/// This waits for new incoming [`SparseTrieUpdate`].
///
/// This concludes once the last trie update has been received.
///
/// # Returns
///
/// - State root computation outcome.
/// - `SparseStateTrie` that needs to be cleared and reused to avoid reallocations.
/// Receives [`SparseTrieUpdate`]s until the channel is closed, applying each update
/// to the trie. Once all updates are processed, computes and returns the final state root.
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
pub(super) fn run(
mut self,
) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseStateTrie<A, S>) {
// run the main loop to completion
let result = self.run_inner();
(result, self.trie)
}
/// Inner function to run the sparse trie task to completion.
///
/// See [`Self::run`] for more information.
fn run_inner(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
let now = Instant::now();
let mut num_iterations = 0;
@@ -127,6 +189,528 @@ where
Ok(StateRootComputeOutcome { state_root, trie_updates })
}
/// Clears and shrinks the trie, discarding all state.
///
/// Use this when the payload was invalid or cancelled - we don't want to preserve
/// potentially invalid trie state, but we keep the allocations for reuse.
pub(super) fn into_cleared_trie(
mut self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> SparseStateTrie<A, S> {
self.trie.clear();
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
self.trie
}
}
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie> {
/// Sender for proof results.
proof_result_tx: CrossbeamSender<ProofResultMessage>,
/// Receiver for proof results directly from workers.
proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
/// Receives updates from execution and prewarming.
updates: CrossbeamReceiver<MultiProofMessage>,
/// `SparseStateTrie` used for computing the state root.
trie: SparseStateTrie<A, S>,
/// Handle to the proof worker pools (storage and account).
proof_worker_handle: ProofWorkerHandle,
/// Account trie updates.
account_updates: B256Map<LeafUpdate>,
/// Storage trie updates. hashed address -> slot -> update.
storage_updates: B256Map<B256Map<LeafUpdate>>,
/// Account updates that are blocked by storage root calculation or account reveal.
///
/// Those are being moved into `account_updates` once storage roots
/// are revealed and/or calculated.
///
/// Invariant: for each entry in `pending_account_updates` account must either be already
/// revealed in the trie or have an entry in `account_updates`.
///
/// Values can be either of:
/// - None: account had a storage update and is awaiting storage root calculation and/or
/// account node reveal to complete.
/// - Some(_): account was changed/destroyed and is awaiting storage root calculation/reveal
/// to complete.
pending_account_updates: B256Map<Option<Option<Account>>>,
/// Cache of account proof targets that were already fetched/requested from the proof workers.
/// account -> lowest `min_len` requested.
fetched_account_targets: B256Map<u8>,
/// Cache of storage proof targets that have already been fetched/requested from the proof
/// workers. account -> slot -> lowest `min_len` requested.
fetched_storage_targets: B256Map<B256Map<u8>>,
/// Whether the last state update has been received.
finished_state_updates: bool,
/// Pending targets to be dispatched to the proof workers.
pending_targets: MultiProofTargetsV2,
/// Number of pending updates that were received but not yet processed.
pending_updates: usize,
/// Metrics for the sparse trie.
metrics: MultiProofTaskMetrics,
}
impl<A, S> SparseTrieCacheTask<A, S>
where
A: SparseTrieExt + Default,
S: SparseTrieExt + Default + Clone,
{
/// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`].
pub(super) fn new(
updates: CrossbeamReceiver<MultiProofMessage>,
proof_worker_handle: ProofWorkerHandle,
metrics: MultiProofTaskMetrics,
trie: SparseStateTrie<A, S>,
) -> Self {
let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
Self {
proof_result_tx,
proof_result_rx,
updates,
proof_worker_handle,
trie,
account_updates: Default::default(),
storage_updates: Default::default(),
pending_account_updates: Default::default(),
fetched_account_targets: Default::default(),
fetched_storage_targets: Default::default(),
finished_state_updates: Default::default(),
pending_targets: Default::default(),
pending_updates: Default::default(),
metrics,
}
}
/// Prunes and shrinks the trie for reuse in the next payload built on top of this one.
///
/// Should be called after the state root result has been sent.
pub(super) fn into_trie_for_reuse(
mut self,
prune_depth: usize,
max_storage_tries: usize,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> SparseStateTrie<A, S> {
self.trie.prune(prune_depth, max_storage_tries);
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
self.trie
}
/// Clears and shrinks the trie, discarding all state.
///
/// Use this when the payload was invalid or cancelled - we don't want to preserve
/// potentially invalid trie state, but we keep the allocations for reuse.
pub(super) fn into_cleared_trie(
mut self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> SparseStateTrie<A, S> {
self.trie.clear();
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
self.trie
}
/// Runs the sparse trie task to completion.
///
/// This waits for new incoming [`MultiProofMessage`]s, applies updates to the trie and
/// schedules proof fetching when needed.
///
/// This concludes once the last state update has been received and processed.
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
let now = Instant::now();
loop {
crossbeam_channel::select_biased! {
recv(self.updates) -> message => {
let update = match message {
Ok(m) => m,
Err(_) => {
break
}
};
self.on_multiproof_message(update);
self.pending_updates += 1;
}
recv(self.proof_result_rx) -> message => {
let Ok(result) = message else {
unreachable!("we own the sender half")
};
let ProofResult::V2(mut result) = result.result? else {
unreachable!("sparse trie as cache must only be used with multiproof v2");
};
while let Ok(res) = self.proof_result_rx.try_recv() {
let ProofResult::V2(res) = res.result? else {
unreachable!("sparse trie as cache must only be used with multiproof v2");
};
result.extend(res);
}
self.on_proof_result(result)?;
},
}
if self.updates.is_empty() && self.proof_result_rx.is_empty() {
self.dispatch_pending_targets();
self.process_updates()?;
self.dispatch_pending_targets();
} else if self.updates.is_empty() || self.pending_updates > 100 {
self.process_leaf_updates()?;
self.dispatch_pending_targets();
} else if self.updates.is_empty() || self.pending_targets.chunking_length() > 100 {
self.dispatch_pending_targets();
}
if self.finished_state_updates &&
self.account_updates.is_empty() &&
self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
{
break;
}
}
debug!(target: "engine::root", "All proofs processed, ending calculation");
let start = Instant::now();
let (state_root, trie_updates) =
self.trie.root_with_updates(&self.proof_worker_handle).map_err(|e| {
ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
})?;
let end = Instant::now();
self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
Ok(StateRootComputeOutcome { state_root, trie_updates })
}
fn on_multiproof_message(&mut self, message: MultiProofMessage) {
match message {
MultiProofMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
MultiProofMessage::StateUpdate(_, state) => self.on_state_update(state),
MultiProofMessage::EmptyProof { .. } => unreachable!(),
MultiProofMessage::BlockAccessList(_) => todo!(),
MultiProofMessage::FinishedStateUpdates => self.finished_state_updates = true,
}
}
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn on_prewarm_targets(&mut self, targets: VersionedMultiProofTargets) {
let VersionedMultiProofTargets::V2(targets) = targets else {
unreachable!("sparse trie as cache must only be used with V2 multiproof targets");
};
for target in targets.account_targets {
// Only touch accounts that are not yet present in the updates set.
self.account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
}
for (address, slots) in targets.storage_targets {
for slot in slots {
// Only touch storages that are not yet present in the updates set.
self.storage_updates
.entry(address)
.or_default()
.entry(slot.key())
.or_insert(LeafUpdate::Touched);
}
// Touch corresponding account leaf to make sure its revealed in accounts trie for
// storage root update.
self.account_updates.entry(address).or_insert(LeafUpdate::Touched);
}
}
/// Processes a state update and encodes all state changes as trie updates.
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all,
fields(accounts = update.len())
)]
fn on_state_update(&mut self, update: EvmState) {
let hashed_state_update = evm_state_to_hashed_post_state(update);
for (address, storage) in hashed_state_update.storages {
for (slot, value) in storage.storage {
let encoded = if value.is_zero() {
Vec::new()
} else {
alloy_rlp::encode_fixed_size(&value).to_vec()
};
self.storage_updates
.entry(address)
.or_default()
.insert(slot, LeafUpdate::Changed(encoded));
}
// Make sure account is tracked in `account_updates` so that it is revealed in accounts
// trie for storage root update.
self.account_updates.entry(address).or_insert(LeafUpdate::Touched);
// Make sure account is tracked in `pending_account_updates` so that once storage root
// is computed, it will be updated in the accounts trie.
self.pending_account_updates.entry(address).or_insert(None);
}
for (address, account) in hashed_state_update.accounts {
// Track account as touched.
//
// This might overwrite an existing update, which is fine, because storage root from it
// is already tracked in the trie and can be easily fetched again.
self.account_updates.insert(address, LeafUpdate::Touched);
// Track account in `pending_account_updates` so that once storage root is computed,
// it will be updated in the accounts trie.
self.pending_account_updates.insert(address, Some(account));
}
}
fn on_proof_result(
&mut self,
result: DecodedMultiProofV2,
) -> Result<(), ParallelStateRootError> {
self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
})
}
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn process_leaf_updates(&mut self) -> SparseTrieResult<()> {
self.pending_updates = 0;
// Make sure that tries exist for all addresses that have updates.
for address in self.storage_updates.keys() {
self.trie.get_or_create_storage_trie_mut(*address);
}
let storage_results: Vec<_> = self
.trie
.storage_tries()
.iter_mut()
.filter_map(|(address, trie)| {
let updates = self.storage_updates.remove(address)?;
let fetched = self.fetched_storage_targets.remove(address).unwrap_or_default();
Some((address, updates, fetched, trie))
})
.par_bridge()
.map(|(address, mut updates, mut fetched, trie)| {
let mut targets = Vec::new();
trie.update_leaves(&mut updates, |path, min_len| match fetched.entry(path) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
}
Entry::Vacant(entry) => {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
})?;
SparseTrieResult::Ok((address, targets, fetched, updates))
})
.collect::<Result<Vec<_>, _>>()?;
for (address, targets, fetched, updates) in storage_results {
self.fetched_storage_targets.insert(*address, fetched);
self.storage_updates.insert(*address, updates);
if !targets.is_empty() {
self.pending_targets.storage_targets.entry(*address).or_default().extend(targets);
}
}
// Process account trie updates and fill the account targets.
self.trie.trie_mut().update_leaves(
&mut self.account_updates,
|target, min_len| match self.fetched_account_targets.entry(target) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
self.pending_targets
.account_targets
.push(Target::new(target).with_min_len(min_len));
}
}
Entry::Vacant(entry) => {
entry.insert(min_len);
self.pending_targets
.account_targets
.push(Target::new(target).with_min_len(min_len));
}
},
)?;
Ok(())
}
/// Applies updates to the sparse trie and dispatches requested multiproof targets.
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn process_updates(&mut self) -> SparseTrieResult<()> {
self.process_leaf_updates()?;
if self.pending_account_updates.is_empty() {
return Ok(());
}
let roots = self
.trie
.storage_tries()
.par_iter_mut()
.filter(|(address, _)| {
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty())
})
.map(|(address, trie)| {
let root =
trie.root().expect("updates are drained, trie should be revealed by now");
(address, root)
})
.collect::<Vec<_>>();
for (address, storage_root) in roots {
// If the storage root is known and we have a pending update for this account, encode it
// into a proper update.
if let Entry::Occupied(entry) = self.pending_account_updates.entry(*address) &&
entry.get().is_some()
{
let account = entry.remove().expect("just checked, should be Some");
let encoded = if account.is_none_or(|account| account.is_empty()) &&
storage_root == EMPTY_ROOT_HASH
{
Vec::new()
} else {
// TODO: optimize allocation
alloy_rlp::encode(account.unwrap_or_default().into_trie_account(storage_root))
};
self.account_updates.insert(*address, LeafUpdate::Changed(encoded));
}
}
// Now promote pending account updates if possible.
self.pending_account_updates.retain(|addr, account| {
// If account has pending storage updates, it is still pending.
if self.storage_updates.get(addr).is_some_and(|updates| !updates.is_empty()) {
return true;
}
// Get the current account state either from the trie or from latest account update.
let trie_account = if let Some(LeafUpdate::Changed(encoded)) = self.account_updates.get(addr) {
Some(encoded).filter(|encoded| !encoded.is_empty())
} else if !self.account_updates.contains_key(addr) {
self.trie.get_account_value(addr)
} else {
// Needs to be revealed first
return true;
};
let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
let (account, storage_root) = if let Some(account) = account.take() {
// If account is Some(_) here it means it didn't have any storage updates
// and we can fetch the storage root directly from the account trie.
//
// If it did have storage updates, we would've had processed it above when iterating over storage tries.
let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
(account, storage_root)
} else {
(trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
};
let encoded = if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
Vec::new()
} else {
let account = account.unwrap_or_default().into_trie_account(storage_root);
// TODO: optimize allocation
alloy_rlp::encode(account)
};
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
false
});
// Process account trie updates and fill the account targets.
self.trie.trie_mut().update_leaves(
&mut self.account_updates,
|target, min_len| match self.fetched_account_targets.entry(target) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
self.pending_targets
.account_targets
.push(Target::new(target).with_min_len(min_len));
}
}
Entry::Vacant(entry) => {
entry.insert(min_len);
self.pending_targets
.account_targets
.push(Target::new(target).with_min_len(min_len));
}
},
)?;
Ok(())
}
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn dispatch_pending_targets(&mut self) {
if !self.pending_targets.is_empty() {
let chunking_length = self.pending_targets.chunking_length();
dispatch_with_chunking(
std::mem::take(&mut self.pending_targets),
chunking_length,
Some(60),
300,
self.proof_worker_handle.available_account_workers(),
self.proof_worker_handle.available_storage_workers(),
MultiProofTargetsV2::chunks,
|proof_targets| {
if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(
AccountMultiproofInput::V2 {
targets: proof_targets,
proof_result_sender: ProofResultContext::new(
self.proof_result_tx.clone(),
0,
HashedPostState::default(),
Instant::now(),
),
},
) {
error!("failed to dispatch account multiproof: {e:?}");
}
},
);
}
}
}
/// Outcome of the state root computation, including the state root itself with

View File

@@ -402,7 +402,12 @@ where
.in_scope(|| self.evm_env_for(&input))
.map_err(NewPayloadError::other)?;
let env = ExecutionEnv { evm_env, hash: input.hash(), parent_hash: input.parent_hash() };
let env = ExecutionEnv {
evm_env,
hash: input.hash(),
parent_hash: input.parent_hash(),
parent_state_root: parent_block.state_root(),
};
// Plan the strategy used for state root computation.
let strategy = self.plan_state_root_computation();
@@ -1112,10 +1117,13 @@ where
/// while the trie input computation is deferred until the overlay is actually needed.
///
/// If parent is on disk (no in-memory blocks), returns `None` for the lazy overlay.
///
/// Uses a cached overlay if available for the canonical head (the common case).
fn get_parent_lazy_overlay(
parent_hash: B256,
state: &EngineApiTreeState<N>,
) -> (Option<LazyOverlay>, B256) {
// Get blocks leading to the parent to determine the anchor
let (anchor_hash, blocks) =
state.tree_state.blocks_by_hash(parent_hash).unwrap_or_else(|| (parent_hash, vec![]));
@@ -1124,6 +1132,17 @@ where
return (None, anchor_hash);
}
// Try to use the cached overlay if it matches both parent hash and anchor
if let Some(cached) = state.tree_state.get_cached_overlay(parent_hash, anchor_hash) {
debug!(
target: "engine::tree::payload_validator",
%parent_hash,
%anchor_hash,
"Using cached canonical overlay"
);
return (Some(cached.overlay.clone()), cached.anchor_hash);
}
debug!(
target: "engine::tree::payload_validator",
%anchor_hash,

View File

@@ -6,7 +6,7 @@ use alloy_primitives::{
map::{HashMap, HashSet},
BlockNumber, B256,
};
use reth_chain_state::{EthPrimitives, ExecutedBlock};
use reth_chain_state::{DeferredTrieData, EthPrimitives, ExecutedBlock, LazyOverlay};
use reth_primitives_traits::{AlloyBlockHeader, NodePrimitives, SealedHeader};
use std::{
collections::{btree_map, hash_map, BTreeMap, VecDeque},
@@ -38,6 +38,12 @@ pub struct TreeState<N: NodePrimitives = EthPrimitives> {
pub(crate) current_canonical_head: BlockNumHash,
/// The engine API variant of this handler
pub(crate) engine_kind: EngineApiKind,
/// Pre-computed lazy overlay for the canonical head.
///
/// This is optimistically prepared after the canonical head changes, so that
/// the next payload building on the canonical head can use it immediately
/// without recomputing.
pub(crate) cached_canonical_overlay: Option<PreparedCanonicalOverlay>,
}
impl<N: NodePrimitives> TreeState<N> {
@@ -49,6 +55,7 @@ impl<N: NodePrimitives> TreeState<N> {
current_canonical_head,
parent_to_child: HashMap::default(),
engine_kind,
cached_canonical_overlay: None,
}
}
@@ -92,6 +99,66 @@ impl<N: NodePrimitives> TreeState<N> {
Some((parent_hash, blocks))
}
/// Prepares a cached lazy overlay for the current canonical head.
///
/// This should be called after the canonical head changes to optimistically
/// prepare the overlay for the next payload that will likely build on it.
///
/// Returns a clone of the [`LazyOverlay`] so the caller can spawn a background
/// task to trigger computation via [`LazyOverlay::get`]. This ensures the overlay
/// is actually computed before the next payload arrives.
pub(crate) fn prepare_canonical_overlay(&mut self) -> Option<LazyOverlay> {
let canonical_hash = self.current_canonical_head.hash;
// Get blocks leading to the canonical head
let Some((anchor_hash, blocks)) = self.blocks_by_hash(canonical_hash) else {
// Canonical head not in memory (persisted), no overlay needed
self.cached_canonical_overlay = None;
return None;
};
// Extract deferred trie data handles from blocks (newest to oldest)
let handles: Vec<DeferredTrieData> = blocks.iter().map(|b| b.trie_data_handle()).collect();
let overlay = LazyOverlay::new(anchor_hash, handles);
self.cached_canonical_overlay = Some(PreparedCanonicalOverlay {
parent_hash: canonical_hash,
overlay: overlay.clone(),
anchor_hash,
});
debug!(
target: "engine::tree",
%canonical_hash,
%anchor_hash,
num_blocks = blocks.len(),
"Prepared cached canonical overlay"
);
Some(overlay)
}
/// Returns the cached overlay if it matches the requested parent hash and anchor.
///
/// Both parent hash and anchor hash must match to ensure the overlay is valid.
/// This prevents using a stale overlay after persistence has advanced the anchor.
pub(crate) fn get_cached_overlay(
&self,
parent_hash: B256,
expected_anchor: B256,
) -> Option<&PreparedCanonicalOverlay> {
self.cached_canonical_overlay.as_ref().filter(|cached| {
cached.parent_hash == parent_hash && cached.anchor_hash == expected_anchor
})
}
/// Invalidates the cached overlay.
///
/// Should be called when the anchor changes (e.g., after persistence).
pub(crate) fn invalidate_cached_overlay(&mut self) {
self.cached_canonical_overlay = None;
}
/// Insert executed block into the state.
pub(crate) fn insert_executed(&mut self, executed: ExecutedBlock<N>) {
let hash = executed.recovered_block().hash();
@@ -288,6 +355,9 @@ impl<N: NodePrimitives> TreeState<N> {
if let Some(finalized_num_hash) = finalized_num_hash {
self.prune_finalized_sidechains(finalized_num_hash);
}
// Invalidate the cached overlay since blocks were removed and the anchor may have changed
self.invalidate_cached_overlay();
}
/// Updates the canonical head to the given block.
@@ -355,6 +425,39 @@ impl<N: NodePrimitives> TreeState<N> {
}
}
/// Pre-computed lazy overlay for the canonical head block.
///
/// This is prepared **optimistically** when the canonical head changes, allowing
/// the next payload (which typically builds on the canonical head) to reuse
/// the pre-computed overlay immediately without re-traversing in-memory blocks.
///
/// The overlay captures deferred trie data handles from all in-memory blocks
/// between the canonical head and the persisted anchor. When a new payload
/// arrives building on the canonical head, this cached overlay can be used
/// directly instead of calling `blocks_by_hash` and collecting handles again.
///
/// # Invalidation
///
/// The cached overlay is invalidated when:
/// - Persistence completes (anchor changes)
/// - The canonical head changes to a different block
#[derive(Debug, Clone)]
pub struct PreparedCanonicalOverlay {
/// The block hash for which this overlay is prepared as a parent.
///
/// When a payload arrives with this parent hash, the overlay can be reused.
pub parent_hash: B256,
/// The pre-computed lazy overlay containing deferred trie data handles.
///
/// This is computed optimistically after `set_canonical_head` so subsequent
/// payloads don't need to re-collect the handles.
pub overlay: LazyOverlay,
/// The anchor hash (persisted ancestor) this overlay is based on.
///
/// Used to verify the overlay is still valid (anchor hasn't changed due to persistence).
pub anchor_hash: B256,
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -259,6 +259,7 @@ impl TestHarness {
current_canonical_head: blocks.last().unwrap().recovered_block().num_hash(),
parent_to_child,
engine_kind: EngineApiKind::Ethereum,
cached_canonical_overlay: None,
};
let last_executed_block = blocks.last().unwrap().clone();

View File

@@ -174,7 +174,7 @@ where
}
Commands::P2P(command) => runner.run_until_ctrl_c(command.execute::<N>()),
Commands::Config(command) => runner.run_until_ctrl_c(command.execute()),
Commands::Prune(command) => runner.run_until_ctrl_c(command.execute::<N>()),
Commands::Prune(command) => runner.run_command_until_exit(|ctx| command.execute::<N>(ctx)),
#[cfg(feature = "dev")]
Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()),
Commands::ReExecute(command) => runner.run_until_ctrl_c(command.execute::<N>(components)),

View File

@@ -1,4 +1,4 @@
use crate::utils::eth_payload_attributes;
use crate::utils::{advance_with_random_transactions, eth_payload_attributes};
use alloy_eips::eip7685::RequestsOrHash;
use alloy_genesis::Genesis;
use alloy_primitives::{Address, B256};
@@ -6,8 +6,9 @@ use alloy_rpc_types_engine::{PayloadAttributes, PayloadStatusEnum};
use jsonrpsee_core::client::ClientT;
use reth_chainspec::{ChainSpecBuilder, EthChainSpec, MAINNET};
use reth_e2e_test_utils::{
node::NodeTestContext, setup, transaction::TransactionTestContext, wallet::Wallet,
node::NodeTestContext, setup, setup_engine, transaction::TransactionTestContext, wallet::Wallet,
};
use reth_node_api::TreeConfig;
use reth_node_builder::{NodeBuilder, NodeHandle};
use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
use reth_node_ethereum::EthereumNode;
@@ -256,3 +257,56 @@ async fn test_testing_build_block_v1_osaka() -> eyre::Result<()> {
Ok(())
}
/// Tests that sparse trie allocation reuse works correctly across consecutive blocks.
///
/// This test exercises the sparse trie allocation reuse path by:
/// 1. Starting a node with parallel state root computation enabled
/// 2. Advancing multiple consecutive blocks with random transactions
/// 3. Verifying that all blocks are successfully validated (state roots match)
///
/// Note: Trie structure reuse is currently disabled due to pruning creating blinded
/// nodes. The preserved trie's allocations are still reused to reduce memory overhead,
/// but the trie is cleared between blocks.
#[tokio::test]
async fn test_sparse_trie_reuse_across_blocks() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
// Use parallel state root (non-legacy) with pruning enabled
let tree_config = TreeConfig::default()
.with_legacy_state_root(false)
.with_sparse_trie_prune_depth(2)
.with_sparse_trie_max_storage_tries(100);
let (mut nodes, _tasks, _wallet) = setup_engine::<EthereumNode>(
1,
Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(serde_json::from_str(include_str!("../assets/genesis.json")).unwrap())
.cancun_activated()
.build(),
),
false,
tree_config,
eth_payload_attributes,
)
.await?;
let mut node = nodes.pop().unwrap();
// Use a seeded RNG for reproducibility
let mut rng = rand::rng();
// Advance multiple consecutive blocks with random transactions.
// This exercises the sparse trie reuse path where each block's pruned trie
// is reused for the next block's state root computation.
let num_blocks = 5;
advance_with_random_transactions(&mut node, num_blocks, &mut rng, true).await?;
// Verify the chain advanced correctly
let best_block = node.inner.provider.best_block_number()?;
assert_eq!(best_block, num_blocks as u64, "Expected {} blocks, got {}", num_blocks, best_block);
Ok(())
}

View File

@@ -171,7 +171,7 @@ pub enum SparseTrieErrorKind {
/// Path to the node.
path: Nibbles,
/// Node that was at the path when revealing.
node: Box<dyn core::fmt::Debug + Send>,
node: Box<dyn core::fmt::Debug + Send + Sync>,
},
/// RLP error.
#[error(transparent)]
@@ -184,7 +184,7 @@ pub enum SparseTrieErrorKind {
},
/// Other.
#[error(transparent)]
Other(#[from] Box<dyn core::error::Error + Send>),
Other(#[from] Box<dyn core::error::Error + Send + Sync>),
}
/// Trie witness errors.

View File

@@ -83,6 +83,28 @@ impl From<&'static str> for FileClientError {
}
impl<B: FullBlock> FileClient<B> {
/// Create a new file client from a slice of sealed blocks.
pub fn from_blocks(blocks: impl IntoIterator<Item = SealedBlock<B>>) -> Self {
let blocks: Vec<_> = blocks.into_iter().collect();
let capacity = blocks.len();
let mut headers = HashMap::with_capacity(capacity);
let mut hash_to_number = HashMap::with_capacity(capacity);
let mut bodies = HashMap::with_capacity(capacity);
for block in blocks {
let number = block.number();
let hash = block.hash();
let (header, body) = block.split_sealed_header_body();
headers.insert(number, header.into_header());
hash_to_number.insert(hash, number);
bodies.insert(hash, body);
}
Self { headers, hash_to_number, bodies }
}
/// Create a new file client from a file path.
pub async fn new<P: AsRef<Path>>(
path: P,

View File

@@ -68,7 +68,7 @@ pub enum P2PStreamError {
SendBufferFull,
/// Disconnected error.
#[error("disconnected")]
#[error("disconnected: {0}")]
Disconnected(DisconnectReason),
/// Unknown disconnect reason error.

View File

@@ -20,7 +20,7 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use tracing::{debug, trace};
use tracing::trace;
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Contains the connectivity related state of the network.
@@ -259,7 +259,7 @@ impl<N: NetworkPrimitives> Swarm<N> {
if self.sessions.is_valid_fork_id(fork_id) {
self.state_mut().peers_mut().set_discovered_fork_id(peer_id, fork_id);
} else {
debug!(target: "net", ?peer_id, remote_fork_id=?fork_id, our_fork_id=?self.sessions.fork_id(), "fork id mismatch, removing peer");
trace!(target: "net", ?peer_id, remote_fork_id=?fork_id, our_fork_id=?self.sessions.fork_id(), "fork id mismatch, removing peer");
self.state_mut().peers_mut().remove_peer(peer_id);
}
}

View File

@@ -303,20 +303,8 @@ impl EngineNodeLauncher {
// the CL
loop {
tokio::select! {
shutdown_req = &mut shutdown_rx => {
if let Ok(req) = shutdown_req {
debug!(target: "reth::cli", "received engine shutdown request");
engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(
FromOrchestrator::Terminate { tx: req.done_tx }.into()
);
}
}
payload = built_payloads.select_next_some() => {
if let Some(executed_block) = payload.executed_block() {
debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());
}
}
biased;
event = engine_service.next() => {
let Some(event) = event else { break };
debug!(target: "reth::cli", "Event: {event}");
@@ -364,6 +352,20 @@ impl EngineNodeLauncher {
}
}
}
payload = built_payloads.select_next_some() => {
if let Some(executed_block) = payload.executed_block() {
debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());
}
}
shutdown_req = &mut shutdown_rx => {
if let Ok(req) = shutdown_req {
debug!(target: "reth::cli", "received engine shutdown request");
engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(
FromOrchestrator::Terminate { tx: req.done_tx }.into()
);
}
}
}
}

View File

@@ -37,6 +37,7 @@ pub struct DefaultEngineValues {
account_worker_count: Option<usize>,
disable_proof_v2: bool,
cache_metrics_disabled: bool,
enable_sparse_trie_as_cache: bool,
}
impl DefaultEngineValues {
@@ -172,6 +173,12 @@ impl DefaultEngineValues {
self.cache_metrics_disabled = v;
self
}
/// Set whether to enable sparse trie as cache by default
pub const fn with_enable_sparse_trie_as_cache(mut self, v: bool) -> Self {
self.enable_sparse_trie_as_cache = v;
self
}
}
impl Default for DefaultEngineValues {
@@ -197,6 +204,7 @@ impl Default for DefaultEngineValues {
account_worker_count: None,
disable_proof_v2: false,
cache_metrics_disabled: false,
enable_sparse_trie_as_cache: false,
}
}
}
@@ -324,6 +332,10 @@ pub struct EngineArgs {
/// Disable cache metrics recording, which can take up to 50ms with large cached state.
#[arg(long = "engine.disable-cache-metrics", default_value_t = DefaultEngineValues::get_global().cache_metrics_disabled)]
pub cache_metrics_disabled: bool,
/// Enable sparse trie as cache.
#[arg(long = "engine.enable-sparse-trie-as-cache", default_value_t = DefaultEngineValues::get_global().enable_sparse_trie_as_cache, conflicts_with = "disable_proof_v2")]
pub enable_sparse_trie_as_cache: bool,
}
#[allow(deprecated)]
@@ -350,6 +362,7 @@ impl Default for EngineArgs {
account_worker_count,
disable_proof_v2,
cache_metrics_disabled,
enable_sparse_trie_as_cache,
} = DefaultEngineValues::get_global().clone();
Self {
persistence_threshold,
@@ -376,6 +389,7 @@ impl Default for EngineArgs {
account_worker_count,
disable_proof_v2,
cache_metrics_disabled,
enable_sparse_trie_as_cache,
}
}
}
@@ -412,6 +426,7 @@ impl EngineArgs {
config = config.with_disable_proof_v2(self.disable_proof_v2);
config = config.without_cache_metrics(self.cache_metrics_disabled);
config = config.with_enable_sparse_trie_as_cache(self.enable_sparse_trie_as_cache);
config
}
@@ -464,6 +479,7 @@ mod tests {
account_worker_count: Some(8),
disable_proof_v2: false,
cache_metrics_disabled: true,
enable_sparse_trie_as_cache: false,
};
let parsed_args = CommandParser::<EngineArgs>::parse_from([

View File

@@ -5,7 +5,9 @@ use alloy_primitives::{Address, BlockNumber};
use clap::{builder::RangedU64ValueParser, Args};
use reth_chainspec::EthereumHardforks;
use reth_config::config::PruneConfig;
use reth_prune_types::{PruneMode, PruneModes, ReceiptsLogPruneConfig, MINIMUM_PRUNING_DISTANCE};
use reth_prune_types::{
PruneMode, PruneModes, ReceiptsLogPruneConfig, MINIMUM_UNWIND_SAFE_DISTANCE,
};
use std::{collections::BTreeMap, ops::Not, sync::OnceLock};
/// Global static pruning defaults
@@ -68,9 +70,9 @@ impl Default for DefaultPruningValues {
full_prune_modes: PruneModes {
sender_recovery: Some(PruneMode::Full),
transaction_lookup: None,
receipts: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
account_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
storage_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
receipts: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
account_history: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
storage_history: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
// This field is ignored when full_bodies_history_use_pre_merge is true
bodies_history: None,
receipts_log_filter: Default::default(),
@@ -80,9 +82,9 @@ impl Default for DefaultPruningValues {
sender_recovery: Some(PruneMode::Full),
transaction_lookup: Some(PruneMode::Full),
receipts: Some(PruneMode::Full),
account_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
storage_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
bodies_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
account_history: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
storage_history: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
bodies_history: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
receipts_log_filter: Default::default(),
},
}
@@ -93,7 +95,8 @@ impl Default for DefaultPruningValues {
#[derive(Debug, Clone, Args, PartialEq, Eq, Default)]
#[command(next_help_heading = "Pruning")]
pub struct PruningArgs {
/// Run full node. Only the most recent [`MINIMUM_PRUNING_DISTANCE`] block states are stored.
/// Run full node. Only the most recent [`MINIMUM_UNWIND_SAFE_DISTANCE`] block states are
/// stored.
#[arg(long, default_value_t = false, conflicts_with = "minimal")]
pub full: bool,

View File

@@ -645,7 +645,7 @@ pub struct RpcServerArgs {
///
/// When enabled, transactions that fail execution will be skipped, and all subsequent
/// transactions from the same sender will also be skipped.
#[arg(long = "testing.skip-invalid-transactions", default_value_t = false)]
#[arg(long = "testing.skip-invalid-transactions", default_value_t = true)]
pub testing_skip_invalid_transactions: bool,
}
@@ -859,7 +859,7 @@ impl Default for RpcServerArgs {
rpc_state_cache,
gas_price_oracle,
rpc_send_raw_transaction_sync_timeout,
testing_skip_invalid_transactions: false,
testing_skip_invalid_transactions: true,
}
}
}

View File

@@ -13,15 +13,23 @@ pub(crate) struct EthstatsCredentials {
pub secret: String,
/// Host address of the `EthStats` server
pub host: String,
/// Whether to use secure `WebSocket` (`WSS`) connection
pub use_tls: bool,
}
impl FromStr for EthstatsCredentials {
type Err = EthStatsError;
/// Parse credentials from a string in the format "`node_id:secret@host`"
/// Parse credentials from a string in the format "`node_id:secret@host`" or
/// "`node_id:secret@wss://host`"
///
/// Supports the following formats:
/// - `node_id:secret@host` - Uses plain `WebSocket` (`ws://`)
/// - `node_id:secret@ws://host` - Explicitly use plain `WebSocket`
/// - `node_id:secret@wss://host` - Use secure `WebSocket` (`WSS`)
///
/// # Arguments
/// * `s` - String containing credentials in the format "`node_id:secret@host`"
/// * `s` - String containing credentials
///
/// # Returns
/// * `Ok(EthstatsCredentials)` - Successfully parsed credentials
@@ -32,7 +40,7 @@ impl FromStr for EthstatsCredentials {
return Err(EthStatsError::InvalidUrl("Missing '@' separator".to_string()));
}
let creds = parts[0];
let host = parts[1].to_string();
let mut host = parts[1].to_string();
let creds_parts: Vec<&str> = creds.split(':').collect();
if creds_parts.len() != 2 {
return Err(EthStatsError::InvalidUrl(
@@ -42,6 +50,16 @@ impl FromStr for EthstatsCredentials {
let node_id = creds_parts[0].to_string();
let secret = creds_parts[1].to_string();
Ok(Self { node_id, secret, host })
// Detect and strip protocol prefix if present
let mut use_tls = false;
if let Some(stripped) = host.strip_prefix("wss://") {
use_tls = true;
host = stripped.to_string();
} else if let Some(stripped) = host.strip_prefix("ws://") {
use_tls = false;
host = stripped.to_string();
}
Ok(Self { node_id, secret, host, use_tls })
}
}

View File

@@ -102,13 +102,15 @@ where
/// Establish `WebSocket` connection to the `EthStats` server
///
/// Attempts to connect to the server using the credentials and handles
/// connection timeouts and errors.
/// connection timeouts and errors. Uses either `ws://` or `wss://` based
/// on the credentials configuration.
async fn connect(&self) -> Result<(), EthStatsError> {
debug!(
target: "ethstats",
"Attempting to connect to EthStats server at {}", self.credentials.host
);
let full_url = format!("ws://{}/api", self.credentials.host);
let protocol = if self.credentials.use_tls { "wss" } else { "ws" };
let full_url = format!("{}://{}/api", protocol, self.credentials.host);
let url = Url::parse(&full_url).map_err(EthStatsError::Url)?;
match timeout(CONNECT_TIMEOUT, connect_async(url.as_str())).await {

View File

@@ -1,7 +1,5 @@
//! OP stack variation of chain spec constants.
use alloy_primitives::hex;
//------------------------------- BASE MAINNET -------------------------------//
/// Max gas limit on Base: <https://basescan.org/block/17208876>
@@ -11,13 +9,3 @@ pub const BASE_MAINNET_MAX_GAS_LIMIT: u64 = 105_000_000;
/// Max gas limit on Base Sepolia: <https://sepolia.basescan.org/block/12506483>
pub const BASE_SEPOLIA_MAX_GAS_LIMIT: u64 = 45_000_000;
//----------------------------------- DEV ------------------------------------//
/// Dummy system transaction for dev mode
/// OP Mainnet transaction at index 0 in block 124665056.
///
/// <https://optimistic.etherscan.io/tx/0x312e290cf36df704a2217b015d6455396830b0ce678b860ebfcc30f41403d7b1>
pub const TX_SET_L1_BLOCK_OP_MAINNET_BLOCK_124665056: [u8; 251] = hex!(
"7ef8f8a0683079df94aa5b9cf86687d739a60a9b4f0835e520ec4d664e2e415dca17a6df94deaddeaddeaddeaddeaddeaddeaddeaddead00019442000000000000000000000000000000000000158080830f424080b8a4440a5e200000146b000f79c500000000000000040000000066d052e700000000013ad8a3000000000000000000000000000000000000000000000000000000003ef1278700000000000000000000000000000000000000000000000000000000000000012fdf87b89884a61e74b322bbcf60386f543bfae7827725efaaf0ab1de2294a590000000000000000000000006887246668a3b87f54deb3b94ba47a6f63f32985"
);

View File

@@ -106,7 +106,9 @@ where
}
Commands::P2P(command) => runner.run_until_ctrl_c(command.execute::<OpNode>()),
Commands::Config(command) => runner.run_until_ctrl_c(command.execute()),
Commands::Prune(command) => runner.run_until_ctrl_c(command.execute::<OpNode>()),
Commands::Prune(command) => {
runner.run_command_until_exit(|ctx| command.execute::<OpNode>(ctx))
}
#[cfg(feature = "dev")]
Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()),
Commands::ReExecute(command) => {

View File

@@ -4,14 +4,17 @@ use crate::{OpEthApi, OpEthApiError, SequencerClient};
use alloy_primitives::{Bytes, B256};
use alloy_rpc_types_eth::TransactionInfo;
use futures::StreamExt;
use op_alloy_consensus::{transaction::OpTransactionInfo, OpTransaction};
use op_alloy_consensus::{
transaction::{OpDepositInfo, OpTransactionInfo},
OpTransaction,
};
use reth_chain_state::CanonStateSubscriptions;
use reth_optimism_primitives::DepositReceipt;
use reth_primitives_traits::{Recovered, SignedTransaction, SignerRecoverable, WithEncoded};
use reth_rpc_eth_api::{
helpers::{spec::SignersForRpc, EthTransactions, LoadReceipt, LoadTransaction, SpawnBlocking},
try_into_op_tx_info, EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore,
RpcReceipt, TxInfoMapper,
EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore, RpcReceipt,
TxInfoMapper,
};
use reth_rpc_eth_types::{block::convert_transaction_receipt, EthApiError, TransactionSource};
use reth_storage_api::{errors::ProviderError, ProviderTx, ReceiptProvider, TransactionsProvider};
@@ -282,6 +285,18 @@ where
type Err = ProviderError;
fn try_map(&self, tx: &T, tx_info: TransactionInfo) -> Result<Self::Out, ProviderError> {
try_into_op_tx_info(&self.provider, tx, tx_info)
let deposit_meta = if tx.is_deposit() {
self.provider.receipt_by_hash(*tx.tx_hash())?.and_then(|receipt| {
receipt.as_deposit_receipt().map(|receipt| OpDepositInfo {
deposit_receipt_version: receipt.deposit_receipt_version,
deposit_nonce: receipt.deposit_nonce,
})
})
} else {
None
}
.unwrap_or_default();
Ok(OpTransactionInfo::new(tx_info, deposit_meta))
}
}

View File

@@ -89,6 +89,12 @@ impl From<revm_state::Account> for Account {
}
}
impl From<TrieAccount> for Account {
fn from(value: TrieAccount) -> Self {
Self { balance: value.balance, nonce: value.nonce, bytecode_hash: Some(value.code_hash) }
}
}
impl InMemorySize for Account {
fn size(&self) -> usize {
size_of::<Self>()

View File

@@ -11,7 +11,7 @@ use reth_provider::{
};
use reth_prune_types::{
PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig, SegmentOutput,
MINIMUM_PRUNING_DISTANCE,
MINIMUM_UNWIND_SAFE_DISTANCE,
};
use tracing::{instrument, trace};
#[derive(Debug)]
@@ -49,8 +49,8 @@ where
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
// Contract log filtering removes every receipt possible except the ones in the list. So,
// for the other receipts it's as if they had a `PruneMode::Distance()` of
// `MINIMUM_PRUNING_DISTANCE`.
let to_block = PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)
// `MINIMUM_UNWIND_SAFE_DISTANCE`.
let to_block = PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)
.prune_target_block(input.to_block, PruneSegment::ContractLogs, PrunePurpose::User)?
.map(|(bn, _)| bn)
.unwrap_or_default();

View File

@@ -1,14 +1,18 @@
use crate::{
db_ext::DbTxPruneExt,
segments::{PruneInput, Segment},
segments::{self, PruneInput, Segment},
PrunerError,
};
use reth_db_api::{tables, transaction::DbTxMut};
use reth_provider::{BlockReader, DBProvider, TransactionsProvider};
use reth_provider::{
BlockReader, DBProvider, EitherWriterDestination, StaticFileProviderFactory,
StorageSettingsCache, TransactionsProvider,
};
use reth_prune_types::{
PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use tracing::{instrument, trace};
use reth_static_file_types::StaticFileSegment;
use tracing::{debug, instrument, trace};
#[derive(Debug)]
pub struct SenderRecovery {
@@ -23,7 +27,11 @@ impl SenderRecovery {
impl<Provider> Segment<Provider> for SenderRecovery
where
Provider: DBProvider<Tx: DbTxMut> + TransactionsProvider + BlockReader,
Provider: DBProvider<Tx: DbTxMut>
+ TransactionsProvider
+ BlockReader
+ StorageSettingsCache
+ StaticFileProviderFactory,
{
fn segment(&self) -> PruneSegment {
PruneSegment::SenderRecovery
@@ -39,6 +47,16 @@ where
#[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
if EitherWriterDestination::senders(provider).is_static_file() {
debug!(target: "pruner", "Pruning transaction senders from static files.");
return segments::prune_static_files(
provider,
input,
StaticFileSegment::TransactionSenders,
)
}
debug!(target: "pruner", "Pruning transaction senders from database.");
let tx_range = match input.get_next_tx_num_range(provider)? {
Some(range) => range,
None => {

View File

@@ -30,7 +30,9 @@ pub use pruner::{
SegmentOutputCheckpoint,
};
pub use segment::{PrunePurpose, PruneSegment, PruneSegmentError};
pub use target::{PruneModes, UnwindTargetPrunedError, MINIMUM_PRUNING_DISTANCE};
pub use target::{
PruneModes, UnwindTargetPrunedError, MINIMUM_DISTANCE, MINIMUM_UNWIND_SAFE_DISTANCE,
};
/// Configuration for pruning receipts not associated with logs emitted by the specified contracts.
#[derive(Debug, Clone, PartialEq, Eq, Default)]

View File

@@ -41,8 +41,12 @@ impl PruneMode {
segment: PruneSegment,
purpose: PrunePurpose,
) -> Result<Option<(BlockNumber, Self)>, PruneSegmentError> {
let min_blocks = segment.min_blocks();
let result = match self {
Self::Full if segment.min_blocks() == 0 => Some((tip, *self)),
Self::Full if min_blocks == 0 => Some((tip, *self)),
// For segments with min_blocks > 0, Full mode behaves like Distance(min_blocks)
Self::Full if min_blocks <= tip => Some((tip - min_blocks, *self)),
Self::Full => None, // Nothing to prune yet
Self::Distance(distance) if *distance > tip => None, // Nothing to prune yet
Self::Distance(distance) if *distance >= segment.min_blocks() => {
Some((tip - distance, *self))
@@ -84,9 +88,7 @@ impl PruneMode {
#[cfg(test)]
mod tests {
use crate::{
PruneMode, PrunePurpose, PruneSegment, PruneSegmentError, MINIMUM_PRUNING_DISTANCE,
};
use crate::{PruneMode, PrunePurpose, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE};
use assert_matches::assert_matches;
use serde::Deserialize;
@@ -96,8 +98,8 @@ mod tests {
let segment = PruneSegment::AccountHistory;
let tests = vec![
// MINIMUM_PRUNING_DISTANCE makes this impossible
(PruneMode::Full, Err(PruneSegmentError::Configuration(segment))),
// Full mode with min_blocks > 0 behaves like Distance(min_blocks)
(PruneMode::Full, Ok(Some(tip - segment.min_blocks()))),
// Nothing to prune
(PruneMode::Distance(tip + 1), Ok(None)),
(
@@ -107,12 +109,12 @@ mod tests {
// Nothing to prune
(PruneMode::Before(tip + 1), Ok(None)),
(
PruneMode::Before(tip - MINIMUM_PRUNING_DISTANCE),
Ok(Some(tip - MINIMUM_PRUNING_DISTANCE - 1)),
PruneMode::Before(tip - MINIMUM_UNWIND_SAFE_DISTANCE),
Ok(Some(tip - MINIMUM_UNWIND_SAFE_DISTANCE - 1)),
),
(
PruneMode::Before(tip - MINIMUM_PRUNING_DISTANCE - 1),
Ok(Some(tip - MINIMUM_PRUNING_DISTANCE - 2)),
PruneMode::Before(tip - MINIMUM_UNWIND_SAFE_DISTANCE - 1),
Ok(Some(tip - MINIMUM_UNWIND_SAFE_DISTANCE - 2)),
),
// Nothing to prune
(PruneMode::Before(tip - 1), Ok(None)),
@@ -146,13 +148,13 @@ mod tests {
let tests = vec![
(PruneMode::Distance(tip + 1), 1, !should_prune),
(
PruneMode::Distance(MINIMUM_PRUNING_DISTANCE + 1),
tip - MINIMUM_PRUNING_DISTANCE - 1,
PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE + 1),
tip - MINIMUM_UNWIND_SAFE_DISTANCE - 1,
!should_prune,
),
(
PruneMode::Distance(MINIMUM_PRUNING_DISTANCE + 1),
tip - MINIMUM_PRUNING_DISTANCE - 2,
PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE + 1),
tip - MINIMUM_UNWIND_SAFE_DISTANCE - 2,
should_prune,
),
(PruneMode::Before(tip + 1), 1, should_prune),

View File

@@ -1,6 +1,6 @@
#![allow(deprecated)] // necessary to all defining deprecated `PruneSegment` variants
use crate::MINIMUM_PRUNING_DISTANCE;
use crate::{MINIMUM_DISTANCE, MINIMUM_UNWIND_SAFE_DISTANCE};
use derive_more::Display;
use strum::{EnumIter, IntoEnumIterator};
use thiserror::Error;
@@ -65,9 +65,10 @@ impl PruneSegment {
/// Returns minimum number of blocks to keep in the database for this segment.
pub const fn min_blocks(&self) -> u64 {
match self {
Self::SenderRecovery | Self::TransactionLookup | Self::Receipts | Self::Bodies => 0,
Self::SenderRecovery | Self::TransactionLookup => 0,
Self::Receipts | Self::Bodies => MINIMUM_DISTANCE,
Self::ContractLogs | Self::AccountHistory | Self::StorageHistory => {
MINIMUM_PRUNING_DISTANCE
MINIMUM_UNWIND_SAFE_DISTANCE
}
#[expect(deprecated)]
#[expect(clippy::match_same_arms)]

View File

@@ -9,7 +9,12 @@ use crate::{PruneCheckpoint, PruneMode, PruneSegment, ReceiptsLogPruneConfig};
/// consensus protocol.
/// 2. Another 10k blocks to have a room for maneuver in case when things go wrong and a manual
/// unwind is required.
pub const MINIMUM_PRUNING_DISTANCE: u64 = 32 * 2 + 10_000;
pub const MINIMUM_UNWIND_SAFE_DISTANCE: u64 = 32 * 2 + 10_000;
/// Minimum blocks to retain for receipts and bodies to ensure reorg safety.
/// This prevents pruning data that may be needed when handling chain reorganizations,
/// specifically when `canonical_block_by_hash` needs to reconstruct `ExecutedBlock` from disk.
pub const MINIMUM_DISTANCE: u64 = 64;
/// Type of history that can be pruned
#[derive(Debug, Error, PartialEq, Eq, Clone)]
@@ -56,7 +61,7 @@ pub struct PruneModes {
any(test, feature = "serde"),
serde(
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<MINIMUM_PRUNING_DISTANCE, _>"
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<MINIMUM_UNWIND_SAFE_DISTANCE, _>"
)
)]
pub account_history: Option<PruneMode>,
@@ -65,7 +70,7 @@ pub struct PruneModes {
any(test, feature = "serde"),
serde(
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<MINIMUM_PRUNING_DISTANCE, _>"
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<MINIMUM_UNWIND_SAFE_DISTANCE, _>"
)
)]
pub storage_history: Option<PruneMode>,

View File

@@ -117,10 +117,7 @@ impl tokio_util::codec::Decoder for StreamCodec {
buf.advance(start_idx);
}
let bts = buf.split_to(idx + 1 - start_idx);
return match String::from_utf8(bts.into()) {
Ok(val) => Ok(Some(val)),
Err(_) => Ok(None),
}
return Ok(String::from_utf8(bts.into()).ok())
}
}
Ok(None)

View File

@@ -14,7 +14,6 @@ workspace = true
[dependencies]
# reth
reth-primitives-traits.workspace = true
reth-storage-api = { workspace = true, optional = true }
reth-evm.workspace = true
reth-ethereum-primitives.workspace = true
@@ -31,7 +30,6 @@ alloy-evm = { workspace = true, features = ["rpc"] }
op-alloy-consensus = { workspace = true, optional = true }
op-alloy-rpc-types = { workspace = true, optional = true }
op-alloy-network = { workspace = true, optional = true }
reth-optimism-primitives = { workspace = true, optional = true }
# io
jsonrpsee-types.workspace = true
@@ -51,8 +49,6 @@ op = [
"dep:op-alloy-consensus",
"dep:op-alloy-rpc-types",
"dep:op-alloy-network",
"dep:reth-optimism-primitives",
"dep:reth-storage-api",
"reth-evm/op",
"reth-primitives-traits/op",
"alloy-evm/op",

View File

@@ -24,6 +24,3 @@ pub use transaction::{
};
pub use alloy_evm::rpc::{CallFees, CallFeesError, EthTxEnvError, TryIntoTxEnv};
#[cfg(feature = "op")]
pub use transaction::op::*;

View File

@@ -29,7 +29,7 @@ impl TryFromReceiptResponse<alloy_network::Ethereum> for reth_ethereum_primitive
}
#[cfg(feature = "op")]
impl TryFromReceiptResponse<op_alloy_network::Optimism> for reth_optimism_primitives::OpReceipt {
impl TryFromReceiptResponse<op_alloy_network::Optimism> for op_alloy_consensus::OpReceipt {
type Error = Infallible;
fn from_receipt_response(

Some files were not shown because too many files have changed in this diff Show More