Compare commits

...

103 Commits

Author SHA1 Message Date
Arsenii Kulikov
fe2297639c trace_span 2026-02-19 20:57:33 +04:00
Arsenii Kulikov
4e49b5dea2 optimize update_leaf 2026-02-19 20:55:57 +04:00
Arsenii Kulikov
bb963e634e optimize find_leaf 2026-02-19 20:55:51 +04:00
Alexey Shekhirin
c225132b81 ci(bench): drop root privileges for reth-bench (#22380) 2026-02-19 14:46:52 +00:00
radik878
dcc5d9ec30 fix(events): handle PipelineEvent::Unwound to clean up current_stage (#22340) 2026-02-19 13:48:57 +00:00
Alexey Shekhirin
6cd56b645b ci(bench): support running benchmarks on closed/merged PRs (#22378) 2026-02-19 13:16:03 +00:00
Emma Jamieson-Hoare
794dbff26e ci(hive): remove EIP-6110 deposit tests from expected failures (now passing) (#22377)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-19 12:56:36 +00:00
Emma Jamieson-Hoare
fcfbed0bbc ci(hive): ignore flaky reorg and sync timeout tests (#22376)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-19 12:55:20 +00:00
Alexey Shekhirin
70bcd475fe ci(bench): ABBA run order (#22335) 2026-02-19 12:40:44 +00:00
Emma Jamieson-Hoare
cd6e895a97 fix(rpc): return -32602 for PayloadAttributes structure validation errors (#22374)
Co-authored-by: yongkangc <chiayongkang@hotmail.com>
2026-02-19 12:32:31 +00:00
Emma Jamieson-Hoare
6552a3a9ab ci(hive): fix eels runner OOM crashes (#22373)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-19 12:04:22 +00:00
Derek Cofausper
6a91089542 ci(bench): fix cleanup to use sudo pkill and lazy unmount (#22372)
Co-authored-by: Alexey Shekhirin <github@shekhirin.com>
2026-02-19 11:23:03 +00:00
YK
a9a1e504b4 refactor(trie): simplify encode_account_leaf_value (#22366) 2026-02-19 10:36:44 +00:00
YK
e280f25885 feat(trie): expose storage_wait_time as dedicated Prometheus metric (#22359) 2026-02-19 10:36:26 +00:00
Arsenii Kulikov
37c4f908fa perf: store blinded node hashes on SparseNode::Branch (#22290)
Co-authored-by: Alexey Shekhirin <github@shekhirin.com>
2026-02-19 09:34:42 +00:00
Georgios Konstantopoulos
a157be3f3b perf(tasks): add LazyHandle<T>, use for hash post state (#22347)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-19 08:48:24 +00:00
Georgios Konstantopoulos
e0eb306b2b chore(engine): rename finish span to BlockExecutor::finish (#22356)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-19 07:29:18 +00:00
Micke
7f4f3f1eb9 fix(prune): correct deleted entries count when skip_filter is used (#22312) 2026-02-19 06:19:02 +00:00
Georgios Konstantopoulos
8970f82aaf perf(engine): prefetch first txs sequentially to avoid rayon scheduling stall (#22305)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
2026-02-19 05:53:52 +00:00
Georgios Konstantopoulos
8529da976f fix(cli): store extradata as Bytes, decode hex in parser (#22344)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-19 04:38:16 +00:00
stevencartavia
8fa539225b refactor: remove duplicate apply_pre_execution_changes from Trace trait (#22333) 2026-02-19 04:32:42 +00:00
Doohyun Cho
93d546a36d perf(trie): preserve allocations in sparse trie wipe() (#21089) 2026-02-19 04:02:20 +00:00
zhygis
5c83eb0b06 feat(log): disable file logging by default for non-node commands (#21521)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
2026-02-19 03:16:47 +00:00
Georgios Konstantopoulos
cd32e3cc05 feat(reth-bench): add prometheus metrics scraper (#22244)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
2026-02-19 03:13:40 +00:00
MergeBot
26470cadfc perf(trie): remove redundant HashMap lookup in sparse trie account state query (#22328)
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-19 03:09:04 +00:00
Brian Picciano
506ab806e4 fix: propagate trie update diff result to trigger debug recorder writes (#22331)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-19 02:52:43 +00:00
Forostovec
c2e846093e fix(net): use continue instead of return in buffer_hashes loop (#22337) 2026-02-19 02:46:33 +00:00
dependabot[bot]
5df22b12d8 chore(deps): bump actions/upload-artifact from 4 to 6 (#22338)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-02-19 02:33:00 +00:00
dependabot[bot]
ff9700bb3b chore(deps): bump actions/github-script from 7 to 8 (#22339)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-02-19 02:32:57 +00:00
Georgios Konstantopoulos
85d35fa6c0 feat(tasks): add WorkerMap for named single-thread workers (#22262)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-19 02:27:02 +00:00
Georgios Konstantopoulos
47544d9a7e fix(txpool): ensure transactions are added to pending subpool in nonce order (#22308)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-18 20:31:04 +00:00
Alexey Shekhirin
ef33961aff ci(bench): download snapshot in parallel with builds (#22332) 2026-02-18 17:40:17 +00:00
Georgios Konstantopoulos
0e01a694a7 fix(storage): clarify storage settings mismatch warning (#22330)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-18 16:35:28 +00:00
Alexey Shekhirin
ee19320ee8 ci(bench): use ABBA run order to reduce variance (#22321) 2026-02-18 15:33:31 +00:00
Alexey Shekhirin
9251997c1f ci(bench): build baseline and feature binaries in parallel (#22323) 2026-02-18 14:30:58 +00:00
Brian Picciano
302993b45a feat(trie-debug): record SetRoot op in ParallelSparseTrie::set_root (#22324)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-18 14:03:42 +00:00
Brian Picciano
8d97ab63c6 perf: use stack-allocated [u8; 65] for StoredNibblesSubKey encoding (#22314)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-18 13:35:20 +00:00
Matthias Seitz
251f83ab0b refactor: replace TryFrom*Response traits with unified RpcResponseConverter (#22320)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-18 13:32:47 +00:00
Alexey Shekhirin
e6e0dde903 ci(bench): queue reth-bench jobs and report queue position in PR comment (#22318) 2026-02-18 12:53:12 +00:00
Georgios Konstantopoulos
b1b51261af feat(ci): granular status updates for reth-bench workflow (#22297)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-18 12:05:59 +00:00
Georgios Konstantopoulos
2ae5ef475e feat(ci): add workflow_dispatch trigger for reth-bench (#22298)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-18 11:22:22 +00:00
drhgencer
8861e2724f fix(txpool): notify subscribers when set_block_info promotes transaction (#22243)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-18 09:08:16 +00:00
Georgios Konstantopoulos
734ec4ffe6 feat(engine): add tracing spans to execute_block setup (#22304)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-18 06:23:24 +00:00
Georgios Konstantopoulos
cbcdf8dac0 chore(tracing): use underscores instead of spaces in span names (#22307)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-18 06:21:04 +00:00
Georgios Konstantopoulos
826e387c87 refactor(rpc): use ..Default::default() for SimCallResult initialization (#22309)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-18 05:42:22 +00:00
Forostovec
1c40188993 fix: correct message ID in NodeData version error (#22291) 2026-02-18 05:02:33 +00:00
Matthias Seitz
49a2df0d7a chore: bump alloy deps 1.7.1 -> 1.7.3 (#22296)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-18 06:02:04 +01:00
DaniPopes
a1d1b6def6 fix: prevent ANSI escape codes leaking into Tracy zone text (#22306) 2026-02-18 03:49:34 +00:00
Georgios Konstantopoulos
56bbb3ce2c feat(cli): add reth db prune-checkpoints command (#22288)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-18 01:25:53 +00:00
Georgios Konstantopoulos
5b1010322c docs: clarify StateWriteConfig is about database (MDBX) writes vs static files (#22299)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-18 01:13:31 +00:00
Georgios Konstantopoulos
a195b777eb perf(storage): skip plain state conversion in write_state for storage v2 (#22294)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-02-18 00:48:06 +00:00
Georgios Konstantopoulos
5045e6ef8b feat(bench): add wait time breakdown tables to CI report (#22293) 2026-02-17 23:44:03 +00:00
Alexey Shekhirin
b49cadb346 ci(bench): rename main/branch to baseline/feature, add ref args (#22284)
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-17 23:00:01 +00:00
Georgios Konstantopoulos
aeb2c6e731 chore(primitives): remove legacy transaction roundtrip tests (#22292)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-17 21:15:14 +00:00
stevencartavia
477fed7a11 refactor(primitives): use alloy's EthereumReceipt type (#22254)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-17 20:30:52 +00:00
MergeBot
59993b974a fix(rpc): resolve AtBlockHash to single block in eth_getFilterChanges (#22283)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-17 20:15:33 +00:00
Georgios Konstantopoulos
9ecef47aff fix(provider): skip sender pruning during reorg when sender_recovery is full (#22271)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-17 19:15:03 +00:00
DaniPopes
0ba685386d refactor: dedup runtime initializations (#22263)
Co-authored-by: Alexey Shekhirin <github@shekhirin.com>
2026-02-17 17:35:31 +00:00
Georgios Konstantopoulos
6ff4f947c8 fix(trie): propagate parent span to proof workers (#22279)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-17 16:48:19 +00:00
Alexey Shekhirin
719bbc2543 ci: reth-bench (#22134) 2026-02-17 16:47:47 +00:00
Emma Jamieson-Hoare
a9a6044bc5 chore: fix the rust version for docker (#22278) 2026-02-17 15:19:03 +00:00
Brian Picciano
6f9a3242ef chore: remove legacy proof code paths and simplify to V2-only (#22270)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-17 14:42:13 +00:00
Matthias Seitz
e89bf483bc feat(rpc): add query methods to ActiveFilters (#22275)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-17 14:37:04 +00:00
Georgios Konstantopoulos
61038449c8 fix(rpc): update eth_simulateV1 revert error code to 3 (#22272)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-17 14:36:34 +00:00
Georgios Konstantopoulos
48b2cd970f docs: fix default jwt.hex path in cli args (#22269)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-17 14:35:32 +00:00
Brian Picciano
fb90051010 fix(trie): subtrie root node too small to have hash (#22114)
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-17 13:30:47 +00:00
Georgios Konstantopoulos
a0a622a155 ci: use normal Docker builds for fork PRs instead of Depot (#22268)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-17 13:27:38 +00:00
Brian Picciano
8db352dfd2 feat(trie): add trie-debug feature for recording sparse trie mutations (#22234)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-17 11:59:11 +00:00
Brian Picciano
117b212e2e feat(trie): Combine extension and branch nodes in output from proof v2 (#22021)
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
2026-02-17 11:12:48 +00:00
Brian Picciano
df9e3669aa chore: Update nix flake (#22237) 2026-02-17 11:06:11 +00:00
Georgios Konstantopoulos
0464cddfb0 ci: fall back to GitHub-hosted runners for forks (#22266)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-17 10:34:43 +00:00
Tomass
e21a174737 refactor(net): use VecDeque::pop_front_if from Rust 1.93 (#22260) 2026-02-17 08:46:56 +00:00
DaniPopes
e972d9d8c7 chore: rm transact_batch span (#22258) 2026-02-17 04:45:05 +00:00
Georgios Konstantopoulos
7f00ebfafe chore: elide lifetimes in iter_sub_trie_targets (#22256)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-17 02:11:09 +00:00
Matthias Seitz
883e9ae8cc feat(node-core): add with_dev_block_time helper to NodeConfig (#22251)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-17 02:40:46 +01:00
DaniPopes
a1e4132c2d chore: reduce size of common spans (#22253) 2026-02-17 01:29:32 +00:00
DaniPopes
4ecb0d5680 perf: use mutex in for_each_ordered (#22252) 2026-02-17 01:19:56 +00:00
Georgios Konstantopoulos
5b8808e5fd feat(engine): add trigger-based MiningMode variant (#22250)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-17 00:28:25 +00:00
Georgios Konstantopoulos
2eec519bf9 feat(tasks): add WorkerPool with per-thread Worker state (#22154)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 23:46:51 +00:00
Georgios Konstantopoulos
02513ecf3b perf(engine): overlap block conversion with execution in payload validation (#21957)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
2026-02-16 23:46:45 +00:00
Alexey Shekhirin
10c6bdb5ff fix(engine): wait for persistence to complete in reth_newPayload (#22239) 2026-02-16 14:08:36 +00:00
Matthias Seitz
20ae9ac405 docs: add type ordering style guide to CLAUDE.md (#22236)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 13:38:53 +01:00
Alexey Shekhirin
881500e592 feat(rpc, reth-bench): reth_newPayload methods for reth-bench (#22133)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
2026-02-16 11:11:13 +00:00
pepes
8db125daff fix(engine-primitives): delegate block_to_payload to T (#22180) 2026-02-16 10:09:58 +00:00
James Niken
bf2071f773 fix(primitives-traits): handle KECCAK_EMPTY in From<TrieAccount> (#22200) 2026-02-16 10:02:56 +00:00
Alvarez
ee5ec069cd refactor(tracing): use Option::transpose() for file_guard (#22181) 2026-02-16 11:08:59 +01:00
YK
8722277d6e perf: adaptive multiproof chunk size based on block gas usage (#22233) 2026-02-16 09:49:56 +00:00
Georgios Konstantopoulos
57148eac9f refactor(tasks): remove TaskSpawner trait in favor of concrete Runtime (#22052)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 08:51:10 +00:00
YK
74abad29ad perf: reduce update_leaves key cloning (#22228) 2026-02-16 08:34:21 +00:00
drhgencer
997af404a5 fix(rpc): trim spaces in CORS domain parsing (#22192)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
2026-02-16 06:51:34 +00:00
bobtajson
314a92e93c refactor(cli): deduplicate download finalization logic (#22164) 2026-02-16 06:41:47 +00:00
Georgios Konstantopoulos
f0c4be108b fix(engine): send correct transaction index in prewarm task (#22223)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 06:21:02 +00:00
Georgios Konstantopoulos
9265e8e46c chore: remove reserved_cpu_cores from rayon thread pools (#22221)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 06:13:24 +00:00
Georgios Konstantopoulos
7594e1513a perf: replace some std::time::Instant with quanta::Instant (#22211)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
2026-02-16 05:15:06 +00:00
Georgios Konstantopoulos
7f5acc2723 fix(net): use test backoff durations in Testnet PeerConfig (#22222)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 04:45:47 +00:00
DaniPopes
60d0430c2b chore(trie): add level=debug to sparse trie state spans (#22220) 2026-02-16 04:31:26 +00:00
Georgios Konstantopoulos
d49f828998 test: speed up slow integration tests (#22216)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 03:53:15 +00:00
Georgios Konstantopoulos
2f78bcd7b5 fix(test): activate prague for sparse trie reuse e2e test (#22215)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 03:50:20 +00:00
Georgios Konstantopoulos
f60febfa62 chore(ci): reduce default test timeout to 60s (#22212)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 03:43:58 +00:00
Georgios Konstantopoulos
317f858bd4 feat(engine): add gas-bucketed sub-phase metrics for new_payload (#22210)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: YK <chiayongkang@hotmail.com>
2026-02-16 03:35:59 +00:00
Georgios Konstantopoulos
11acd97982 chore: use --locked for all cargo install invocations (#22214)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 03:35:17 +00:00
Georgios Konstantopoulos
f5cf90227b fix(net): fix flaky test_trusted_peer_only test (#22213)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 03:30:37 +00:00
DaniPopes
0dd47af250 perf: add dedicated prewarming rayon pool (#22108) 2026-02-16 03:05:36 +00:00
351 changed files with 9808 additions and 10284 deletions

View File

@@ -0,0 +1,10 @@
---
reth-engine-primitives: patch
reth-engine-tree: patch
reth-node-core: patch
reth-trie-parallel: minor
---
Removed legacy proof calculation system and V2-specific configuration flags.
Removed the legacy (non-V2) proof calculation code paths, simplified multiproof task architecture by removing the dual-mode system, and cleaned up V2-specific CLI flags (`--engine.disable-proof-v2`, `--engine.disable-trie-cache`) that are no longer needed. The codebase now exclusively uses V2 proofs with the sparse trie cache.

View File

@@ -0,0 +1,5 @@
---
reth-trie-sparse: patch
---
Refactored sparse trie node state tracking to use RLP nodes instead of hashes. Replaced `Option<B256>` hash fields with `SparseNodeState` enum that tracks either dirty nodes or cached RLP nodes with optional database storage flags. Added debug assertions to validate leaf path lengths and improved pruning logic to use node paths directly instead of path-hash tuples.

View File

@@ -0,0 +1,5 @@
---
reth-trie-sparse: patch
---
Added recording of `SetRoot` operation in `ParallelSparseTrie::set_root` when the `trie-debug` feature is enabled.

View File

@@ -0,0 +1,6 @@
---
reth-rpc-convert: minor
reth-storage-rpc-provider: minor
---
Replaced the separate `TryFromBlockResponse`, `TryFromReceiptResponse`, and `TryFromTransactionResponse` traits with a unified `RpcResponseConverter` trait and default `EthRpcConverter` implementation. Removed the `op-alloy-network` dependency and refactored `RpcBlockchainProvider` to store a dynamic converter instance instead of relying on per-type trait bounds.

View File

@@ -0,0 +1,5 @@
---
reth-rpc-eth-types: patch
---
Updated `eth_simulateV1` revert error code from `-32000` to `3` to be consistent with `eth_call`, per [execution-apis#748](https://github.com/ethereum/execution-apis/pull/748).

View File

@@ -0,0 +1,5 @@
---
reth-engine-tree: patch
---
Fixed `compare_trie_updates` to return `bool` indicating whether differences were found, and updated the caller to properly use the return value instead of treating all successful comparisons as having no differences.

View File

@@ -0,0 +1,5 @@
---
reth-node-core: minor
---
Added `with_dev_block_time` helper method to `NodeConfig` for configuring dev miner block production interval.

View File

@@ -0,0 +1,5 @@
---
reth-db-api: patch
---
Changed `StoredNibblesSubKey` encoding to use a stack-allocated `[u8; 65]` array instead of a heap-allocated `Vec<u8>`, avoiding unnecessary heap allocation.

View File

@@ -0,0 +1,8 @@
---
reth: patch
reth-engine-tree: patch
reth-node-builder: patch
reth-trie-sparse: minor
---
Added `trie-debug` feature for recording sparse trie mutations to aid in debugging state root mismatches.

View File

@@ -0,0 +1,5 @@
---
reth-provider: patch
---
Fixed sender pruning during block reorg to skip when sender_recovery is fully pruned, preventing a fatal crash when no sender data exists in static files.

View File

@@ -0,0 +1,6 @@
---
reth-engine-local: minor
reth-node-builder: minor
---
Added trigger-based `MiningMode` variant that allows blocks to be built on-demand via custom streams, and exposed `with_mining_mode` method on `DebugNodeLauncherFuture` to override default mining configuration.

View File

@@ -0,0 +1,5 @@
---
reth-transaction-pool: patch
---
Fixed a bug where transactions from the same sender were added to the pending subpool out of nonce order. Ensured `process_updates` runs before `add_new_transaction` so that lower-nonce promotions are enqueued before the newly inserted higher-nonce transaction, preserving correct ordering for live `BestTransactions` iterators.

View File

@@ -1,6 +1,6 @@
[profile.default]
retries = { backoff = "exponential", count = 2, delay = "2s", jitter = true }
slow-timeout = { period = "30s", terminate-after = 4 }
slow-timeout = { period = "30s", terminate-after = 2 }
[[profile.default.overrides]]
filter = "test(general_state_tests)"

67
.github/scripts/bench-reth-build.sh vendored Executable file
View File

@@ -0,0 +1,67 @@
#!/usr/bin/env bash
#
# Builds (or fetches from cache) reth binaries for benchmarking.
#
# Usage: bench-reth-build.sh <baseline|feature> <source-dir> <commit> [branch-sha]
#
# baseline — build/fetch the baseline binary at <commit> (merge-base)
# source-dir must be checked out at <commit>
# feature — build/fetch the candidate binary + reth-bench at <commit>
# source-dir must be checked out at <commit>
# optional branch-sha is the PR head commit for cache key
#
# Outputs:
# baseline: <source-dir>/target/profiling/reth
# feature: <source-dir>/target/profiling/reth, reth-bench installed to cargo bin
#
# Required: mc (MinIO client) configured at /home/ubuntu/.mc
set -euo pipefail
MC="mc --config-dir /home/ubuntu/.mc"
MODE="$1"
SOURCE_DIR="$2"
COMMIT="$3"
case "$MODE" in
baseline|main)
BUCKET="minio/reth-binaries/${COMMIT}"
mkdir -p "${SOURCE_DIR}/target/profiling"
if $MC stat "${BUCKET}/reth" &>/dev/null; then
echo "Cache hit for baseline (${COMMIT}), downloading binary..."
$MC cp "${BUCKET}/reth" "${SOURCE_DIR}/target/profiling/reth"
chmod +x "${SOURCE_DIR}/target/profiling/reth"
else
echo "Cache miss for baseline (${COMMIT}), building from source..."
cd "${SOURCE_DIR}"
cargo build --profile profiling --bin reth
$MC cp target/profiling/reth "${BUCKET}/reth"
fi
;;
feature|branch)
BRANCH_SHA="${4:-$COMMIT}"
BUCKET="minio/reth-binaries/${BRANCH_SHA}"
if $MC stat "${BUCKET}/reth" &>/dev/null && $MC stat "${BUCKET}/reth-bench" &>/dev/null; then
echo "Cache hit for ${BRANCH_SHA}, downloading binaries..."
mkdir -p "${SOURCE_DIR}/target/profiling"
$MC cp "${BUCKET}/reth" "${SOURCE_DIR}/target/profiling/reth"
$MC cp "${BUCKET}/reth-bench" /home/ubuntu/.cargo/bin/reth-bench
chmod +x "${SOURCE_DIR}/target/profiling/reth" /home/ubuntu/.cargo/bin/reth-bench
else
echo "Cache miss for ${BRANCH_SHA}, building from source..."
cd "${SOURCE_DIR}"
rustup show active-toolchain || rustup default stable
make profiling
make install-reth-bench
$MC cp target/profiling/reth "${BUCKET}/reth"
$MC cp "$(which reth-bench)" "${BUCKET}/reth-bench"
fi
;;
*)
echo "Usage: $0 <baseline|feature> <source-dir> <commit> [branch-sha]"
exit 1
;;
esac

258
.github/scripts/bench-reth-charts.py vendored Normal file
View File

@@ -0,0 +1,258 @@
#!/usr/bin/env python3
"""Generate benchmark charts from reth-bench CSV output.
Usage:
bench-engine-charts.py <combined_csv> --output-dir <dir> [--baseline <baseline_csv>]
Generates three PNG charts:
1. newPayload latency + Ggas/s per block (+ latency diff when baseline present)
2. Wait breakdown (persistence, execution cache, sparse trie) per block
3. Scatter plot of gas used vs latency
When --baseline is provided, charts overlay both datasets for comparison.
"""
import argparse
import csv
import sys
from pathlib import Path
import numpy as np
try:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
except ImportError:
print("matplotlib is required: pip install matplotlib", file=sys.stderr)
sys.exit(1)
GIGAGAS = 1_000_000_000
def parse_combined_csv(path: str) -> list[dict]:
rows = []
with open(path) as f:
reader = csv.DictReader(f)
for row in reader:
rows.append(
{
"block_number": int(row["block_number"]),
"gas_used": int(row["gas_used"]),
"new_payload_latency_us": int(row["new_payload_latency"]),
"persistence_wait_us": int(row["persistence_wait"])
if row.get("persistence_wait")
else None,
"execution_cache_wait_us": int(row.get("execution_cache_wait", 0)),
"sparse_trie_wait_us": int(row.get("sparse_trie_wait", 0)),
}
)
return rows
def plot_latency_and_throughput(
feature: list[dict], baseline: list[dict] | None, out: Path,
baseline_name: str = "baseline", feature_name: str = "feature",
):
num_plots = 3 if baseline else 2
fig, axes = plt.subplots(num_plots, 1, figsize=(12, 4 * num_plots), sharex=True)
ax1, ax2 = axes[0], axes[1]
feat_x = [r["block_number"] for r in feature]
feat_lat = [r["new_payload_latency_us"] / 1_000 for r in feature]
feat_ggas = []
for r in feature:
lat_s = r["new_payload_latency_us"] / 1_000_000
feat_ggas.append(r["gas_used"] / lat_s / GIGAGAS if lat_s > 0 else 0)
if baseline:
base_x = [r["block_number"] for r in baseline]
base_lat = [r["new_payload_latency_us"] / 1_000 for r in baseline]
base_ggas = []
for r in baseline:
lat_s = r["new_payload_latency_us"] / 1_000_000
base_ggas.append(r["gas_used"] / lat_s / GIGAGAS if lat_s > 0 else 0)
ax1.plot(base_x, base_lat, linewidth=0.8, label=baseline_name, alpha=0.7)
ax2.plot(base_x, base_ggas, linewidth=0.8, label=baseline_name, alpha=0.7)
ax1.plot(feat_x, feat_lat, linewidth=0.8, label=feature_name)
ax1.set_ylabel("Latency (ms)")
ax1.set_title("newPayload Latency per Block")
ax1.grid(True, alpha=0.3)
if baseline:
ax1.legend()
ax2.plot(feat_x, feat_ggas, linewidth=0.8, label=feature_name)
ax2.set_ylabel("Ggas/s")
ax2.set_title("Execution Throughput per Block")
ax2.grid(True, alpha=0.3)
if baseline:
ax2.legend()
if baseline:
ax3 = axes[2]
base_by_block = {r["block_number"]: r["new_payload_latency_us"] for r in baseline}
blocks, diffs = [], []
for r in feature:
bn = r["block_number"]
if bn in base_by_block and base_by_block[bn] > 0:
pct = (r["new_payload_latency_us"] - base_by_block[bn]) / base_by_block[bn] * 100
blocks.append(bn)
diffs.append(pct)
if blocks:
colors = ["green" if d <= 0 else "red" for d in diffs]
ax3.bar(blocks, diffs, width=1.0, color=colors, alpha=0.7, edgecolor="none")
ax3.axhline(0, color="black", linewidth=0.5)
ax3.set_ylabel("Δ Latency (%)")
ax3.set_title("Per-Block newPayload Latency Change (feature vs baseline)")
ax3.grid(True, alpha=0.3, axis="y")
axes[-1].set_xlabel("Block Number")
fig.tight_layout()
fig.savefig(out, dpi=150)
plt.close(fig)
def plot_wait_breakdown(
feature: list[dict], baseline: list[dict] | None, out: Path,
baseline_name: str = "baseline", feature_name: str = "feature",
):
series = [
("Persistence Wait", "persistence_wait_us"),
("State Cache Wait", "execution_cache_wait_us"),
("Trie Cache Wait", "sparse_trie_wait_us"),
]
fig, axes = plt.subplots(len(series), 1, figsize=(12, 3 * len(series)), sharex=True)
for ax, (label, key) in zip(axes, series):
if baseline:
bx = [r["block_number"] for r in baseline if r[key] is not None]
by = [r[key] / 1_000 for r in baseline if r[key] is not None]
if bx:
ax.plot(bx, by, linewidth=0.8, label=baseline_name, alpha=0.7)
fx = [r["block_number"] for r in feature if r[key] is not None]
fy = [r[key] / 1_000 for r in feature if r[key] is not None]
if fx:
ax.plot(fx, fy, linewidth=0.8, label=feature_name)
ax.set_ylabel("ms")
ax.set_title(label)
ax.grid(True, alpha=0.3)
if baseline:
ax.legend()
axes[-1].set_xlabel("Block Number")
fig.suptitle("Wait Time Breakdown per Block", fontsize=14, y=1.01)
fig.tight_layout()
fig.savefig(out, dpi=150, bbox_inches="tight")
plt.close(fig)
def _add_regression(ax, x, y, color, label):
"""Add a linear regression line to the axes."""
if len(x) < 2:
return
xa, ya = np.array(x), np.array(y)
m, b = np.polyfit(xa, ya, 1)
x_range = np.linspace(xa.min(), xa.max(), 100)
ax.plot(x_range, m * x_range + b, color=color, linewidth=1.5, alpha=0.8,
label=label)
def plot_gas_vs_latency(
feature: list[dict], baseline: list[dict] | None, out: Path,
baseline_name: str = "baseline", feature_name: str = "feature",
):
fig, ax = plt.subplots(figsize=(8, 6))
if baseline:
bgas = [r["gas_used"] / 1_000_000 for r in baseline]
blat = [r["new_payload_latency_us"] / 1_000 for r in baseline]
ax.scatter(bgas, blat, s=8, alpha=0.5)
_add_regression(ax, bgas, blat, "tab:blue", baseline_name)
fgas = [r["gas_used"] / 1_000_000 for r in feature]
flat = [r["new_payload_latency_us"] / 1_000 for r in feature]
ax.scatter(fgas, flat, s=8, alpha=0.6)
_add_regression(ax, fgas, flat, "tab:orange", feature_name)
ax.set_xlabel("Gas Used (Mgas)")
ax.set_ylabel("newPayload Latency (ms)")
ax.set_title("Gas Used vs Latency")
ax.grid(True, alpha=0.3)
ax.legend()
fig.tight_layout()
fig.savefig(out, dpi=150)
plt.close(fig)
def merge_csvs(paths: list[str]) -> list[dict]:
"""Parse and merge multiple CSVs, averaging values for duplicate blocks."""
by_block: dict[int, list[dict]] = {}
for path in paths:
for row in parse_combined_csv(path):
by_block.setdefault(row["block_number"], []).append(row)
merged = []
for bn in sorted(by_block):
rows = by_block[bn]
if len(rows) == 1:
merged.append(rows[0])
else:
avg = {"block_number": bn}
for key in ("gas_used", "new_payload_latency_us"):
avg[key] = int(sum(r[key] for r in rows) / len(rows))
for key in ("persistence_wait_us", "execution_cache_wait_us", "sparse_trie_wait_us"):
vals = [r[key] for r in rows if r[key] is not None]
avg[key] = int(sum(vals) / len(vals)) if vals else None
merged.append(avg)
return merged
def main():
parser = argparse.ArgumentParser(description="Generate benchmark charts")
parser.add_argument(
"--feature", nargs="+", required=True,
help="Path(s) to feature combined_latency.csv",
)
parser.add_argument(
"--output-dir", required=True, help="Output directory for PNG charts"
)
parser.add_argument(
"--baseline", nargs="+", help="Path(s) to baseline combined_latency.csv"
)
parser.add_argument("--baseline-name", default="baseline", help="Label for baseline")
parser.add_argument("--feature-name", "--branch-name", default="feature", help="Label for feature")
args = parser.parse_args()
feature = merge_csvs(args.feature)
if not feature:
print("No results found in feature CSV(s)", file=sys.stderr)
sys.exit(1)
baseline = None
if args.baseline:
baseline = merge_csvs(args.baseline)
if not baseline:
print(
"Warning: no results in baseline CSV(s), skipping comparison",
file=sys.stderr,
)
baseline = None
out_dir = Path(args.output_dir)
out_dir.mkdir(parents=True, exist_ok=True)
bname = args.baseline_name
fname = args.feature_name
plot_latency_and_throughput(feature, baseline, out_dir / "latency_throughput.png", bname, fname)
plot_wait_breakdown(feature, baseline, out_dir / "wait_breakdown.png", bname, fname)
plot_gas_vs_latency(feature, baseline, out_dir / "gas_vs_latency.png", bname, fname)
print(f"Charts written to {out_dir}")
if __name__ == "__main__":
main()

103
.github/scripts/bench-reth-run.sh vendored Executable file
View File

@@ -0,0 +1,103 @@
#!/usr/bin/env bash
#
# Runs a single reth-bench cycle: mount snapshot → start node → warmup →
# benchmark → stop node → recover snapshot.
#
# Usage: bench-reth-run.sh <label> <binary> <output-dir>
#
# Required env: SCHELK_MOUNT, BENCH_RPC_URL, BENCH_BLOCKS, BENCH_WARMUP_BLOCKS
set -euo pipefail
LABEL="$1"
BINARY="$2"
OUTPUT_DIR="$3"
DATADIR="$SCHELK_MOUNT/datadir"
mkdir -p "$OUTPUT_DIR"
LOG="${OUTPUT_DIR}/node.log"
cleanup() {
kill "$TAIL_PID" 2>/dev/null || true
if [ -n "${RETH_PID:-}" ] && sudo kill -0 "$RETH_PID" 2>/dev/null; then
sudo kill "$RETH_PID"
for i in $(seq 1 30); do
sudo kill -0 "$RETH_PID" 2>/dev/null || break
sleep 1
done
sudo kill -9 "$RETH_PID" 2>/dev/null || true
sleep 1
fi
if mountpoint -q "$SCHELK_MOUNT"; then
sudo umount -l "$SCHELK_MOUNT" || true
sudo schelk recover -y || true
fi
}
TAIL_PID=
trap cleanup EXIT
# Mount
sudo schelk mount -y
sync
sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches'
echo "=== Cache state after drop ==="
free -h
grep Cached /proc/meminfo
# Start reth
# CPU layout: core 0 = OS/IRQs/reth-bench/aux, cores 1+ = reth node
RETH_BENCH="$(which reth-bench)"
ONLINE=$(nproc --all)
RETH_CPUS="1-$(( ONLINE - 1 ))"
sudo taskset -c "$RETH_CPUS" nice -n -20 "$BINARY" node \
--datadir "$DATADIR" \
--engine.accept-execution-requests-hash \
--http \
--http.port 8545 \
--ws \
--ws.api all \
--authrpc.port 8551 \
--disable-discovery \
--no-persist-peers \
> "$LOG" 2>&1 &
RETH_PID=$!
stdbuf -oL tail -f "$LOG" | sed -u "s/^/[reth] /" &
TAIL_PID=$!
for i in $(seq 1 60); do
if curl -sf http://127.0.0.1:8545 -X POST \
-H 'Content-Type: application/json' \
-d '{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}' \
> /dev/null 2>&1; then
echo "reth (${LABEL}) is ready after ${i}s"
break
fi
if [ "$i" -eq 60 ]; then
echo "::error::reth (${LABEL}) failed to start within 60s"
cat "$LOG"
exit 1
fi
sleep 1
done
# Run reth-bench with high priority but as the current user so output
# files are not root-owned (avoids EACCES on next checkout).
BENCH_NICE="sudo nice -n -20 sudo -u $(id -un)"
# Warmup
$BENCH_NICE "$RETH_BENCH" new-payload-fcu \
--rpc-url "$BENCH_RPC_URL" \
--engine-rpc-url http://127.0.0.1:8551 \
--jwt-secret "$DATADIR/jwt.hex" \
--advance "${BENCH_WARMUP_BLOCKS:-50}" \
--reth-new-payload 2>&1 | sed -u "s/^/[bench] /"
# Benchmark
$BENCH_NICE "$RETH_BENCH" new-payload-fcu \
--rpc-url "$BENCH_RPC_URL" \
--engine-rpc-url http://127.0.0.1:8551 \
--jwt-secret "$DATADIR/jwt.hex" \
--advance "$BENCH_BLOCKS" \
--reth-new-payload \
--output "$OUTPUT_DIR" 2>&1 | sed -u "s/^/[bench] /"
# cleanup runs via trap

125
.github/scripts/bench-reth-snapshot.sh vendored Executable file
View File

@@ -0,0 +1,125 @@
#!/usr/bin/env bash
#
# Downloads the latest nightly snapshot into the schelk volume with
# progress reporting to the GitHub PR comment.
#
# Skips the download if the local ETag marker matches the remote one.
#
# Usage: bench-reth-snapshot.sh [--check]
# --check Only check if a download is needed; exits 0 if up-to-date, 1 if not.
#
# Required env:
# SCHELK_MOUNT schelk mount point (e.g. /reth-bench)
# GITHUB_TOKEN token for GitHub API calls (only for download)
# BENCH_COMMENT_ID PR comment ID to update (optional)
# BENCH_REPO owner/repo (e.g. paradigmxyz/reth)
# BENCH_JOB_URL link to the Actions job
# BENCH_ACTOR user who triggered the benchmark
# BENCH_CONFIG config summary line
set -euo pipefail
BUCKET="minio/reth-snapshots/reth-1-minimal-nightly-previous.tar.zst"
DATADIR="$SCHELK_MOUNT/datadir"
ETAG_FILE="$HOME/.reth-bench-snapshot-etag"
# Get remote metadata via JSON for reliable parsing
MC_STAT=$(mc stat --json "$BUCKET" 2>/dev/null || true)
REMOTE_ETAG=$(echo "$MC_STAT" | jq -r '.etag // empty')
if [ -z "$REMOTE_ETAG" ]; then
echo "::warning::Failed to get ETag from mc stat, will re-download"
REMOTE_ETAG="unknown-$(date +%s)"
fi
LOCAL_ETAG=""
[ -f "$ETAG_FILE" ] && LOCAL_ETAG=$(cat "$ETAG_FILE")
if [ "$REMOTE_ETAG" = "$LOCAL_ETAG" ]; then
echo "Snapshot is up-to-date (ETag: ${REMOTE_ETAG})"
if [ "${1:-}" = "--check" ]; then
exit 0
fi
exit 0
fi
echo "Snapshot needs update (local: ${LOCAL_ETAG:-<none>}, remote: ${REMOTE_ETAG})"
if [ "${1:-}" = "--check" ]; then
exit 1
fi
# Get compressed size for progress tracking
TOTAL_BYTES=$(echo "$MC_STAT" | jq -r '.size // empty')
if [ -z "$TOTAL_BYTES" ] || [ "$TOTAL_BYTES" = "0" ]; then
echo "::error::Failed to get snapshot size from mc stat"
exit 1
fi
echo "Snapshot size: $TOTAL_BYTES bytes ($(numfmt --to=iec "$TOTAL_BYTES"))"
# Prepare mount
mountpoint -q "$SCHELK_MOUNT" && sudo schelk recover -y || true
sudo schelk mount -y
sudo rm -rf "$DATADIR"
sudo mkdir -p "$DATADIR"
update_comment() {
local pct="$1"
[ -z "${BENCH_COMMENT_ID:-}" ] && return 0
local status="Building binaries & downloading snapshot… ${pct}%"
local body
body="$(printf 'cc @%s\n\n🚀 Benchmark started! [View job](%s)\n\n⏳ **Status:** %s\n\n%s' \
"$BENCH_ACTOR" "$BENCH_JOB_URL" "$status" "$BENCH_CONFIG")"
curl -sf -X PATCH \
-H "Authorization: token ${GITHUB_TOKEN}" \
-H "Accept: application/vnd.github.v3+json" \
"https://api.github.com/repos/${BENCH_REPO}/issues/comments/${BENCH_COMMENT_ID}" \
-d "$(jq -nc --arg body "$body" '{body: $body}')" \
> /dev/null 2>&1 || true
}
# Track compressed bytes flowing through the pipe
DL_BYTES_FILE=$(mktemp)
echo 0 > "$DL_BYTES_FILE"
# Start progress reporter in background
(
while true; do
sleep 10
CURRENT=$(cat "$DL_BYTES_FILE" 2>/dev/null || echo 0)
if [ "$TOTAL_BYTES" -gt 0 ]; then
PCT=$(( CURRENT * 100 / TOTAL_BYTES ))
[ "$PCT" -gt 100 ] && PCT=100
echo "Snapshot download: $(numfmt --to=iec "$CURRENT") / $(numfmt --to=iec "$TOTAL_BYTES") (${PCT}%)"
update_comment "$PCT"
fi
done
) &
PROGRESS_PID=$!
trap 'kill $PROGRESS_PID 2>/dev/null || true; rm -f "$DL_BYTES_FILE"' EXIT
# Download and extract; python byte counter tracks compressed bytes received
mc cat "$BUCKET" | python3 -c "
import sys
count = 0
while True:
data = sys.stdin.buffer.read(1048576)
if not data:
break
count += len(data)
sys.stdout.buffer.write(data)
with open('$DL_BYTES_FILE', 'w') as f:
f.write(str(count))
" | pzstd -d -p 6 | sudo tar -xf - -C "$DATADIR"
# Stop progress reporter
kill $PROGRESS_PID 2>/dev/null || true
wait $PROGRESS_PID 2>/dev/null || true
update_comment "100"
echo "Snapshot download complete"
# Sync the new snapshot as the schelk baseline
sync
sudo schelk recover -y
# Save ETag marker
echo "$REMOTE_ETAG" > "$ETAG_FILE"
echo "Snapshot synced to schelk (ETag: ${REMOTE_ETAG})"

504
.github/scripts/bench-reth-summary.py vendored Executable file
View File

@@ -0,0 +1,504 @@
#!/usr/bin/env python3
"""Parse reth-bench CSV output and generate a summary JSON + markdown comparison.
Usage:
bench-reth-summary.py <combined_csv> <gas_csv> \
--output-summary <summary.json> \
--output-markdown <comment.md> \
--baseline-csv <baseline_combined.csv> \
[--repo <owner/repo>] \
[--baseline-ref <sha>] \
[--feature-name <name>] \
[--feature-sha <sha>]
Generates a paired statistical comparison between baseline and feature.
Matches blocks by number and computes per-block diffs to cancel out gas
variance. Fails if baseline or feature CSV is missing or empty.
"""
import argparse
import csv
import json
import math
import random
import sys
GIGAGAS = 1_000_000_000
T_CRITICAL = 1.96 # two-tailed 95% confidence
BOOTSTRAP_ITERATIONS = 10_000
def _opt_int(row: dict, key: str) -> int | None:
"""Return int value for a CSV field, or None if missing/empty."""
v = row.get(key)
if v is None or v == "":
return None
return int(v)
def parse_combined_csv(path: str) -> list[dict]:
"""Parse combined_latency.csv into a list of per-block dicts."""
rows = []
with open(path) as f:
reader = csv.DictReader(f)
for row in reader:
rows.append(
{
"block_number": int(row["block_number"]),
"gas_used": int(row["gas_used"]),
"gas_limit": int(row["gas_limit"]),
"transaction_count": int(row["transaction_count"]),
"new_payload_latency_us": int(row["new_payload_latency"]),
"fcu_latency_us": int(row["fcu_latency"]),
"total_latency_us": int(row["total_latency"]),
"persistence_wait_us": _opt_int(row, "persistence_wait"),
"execution_cache_wait_us": _opt_int(row, "execution_cache_wait"),
"sparse_trie_wait_us": _opt_int(row, "sparse_trie_wait"),
}
)
return rows
def parse_gas_csv(path: str) -> list[dict]:
"""Parse total_gas.csv into a list of per-block dicts."""
rows = []
with open(path) as f:
reader = csv.DictReader(f)
for row in reader:
rows.append(
{
"block_number": int(row["block_number"]),
"gas_used": int(row["gas_used"]),
"time_us": int(row["time"]),
}
)
return rows
def stddev(values: list[float], mean: float) -> float:
if len(values) < 2:
return 0.0
return math.sqrt(sum((v - mean) ** 2 for v in values) / (len(values) - 1))
def percentile(sorted_vals: list[float], pct: int) -> float:
if not sorted_vals:
return 0.0
idx = int(len(sorted_vals) * pct / 100)
idx = min(idx, len(sorted_vals) - 1)
return sorted_vals[idx]
def compute_stats(combined: list[dict]) -> dict:
"""Compute per-run statistics from parsed CSV data."""
n = len(combined)
if n == 0:
return {}
latencies_ms = [r["new_payload_latency_us"] / 1_000 for r in combined]
sorted_lat = sorted(latencies_ms)
mean_lat = sum(latencies_ms) / n
std_lat = stddev(latencies_ms, mean_lat)
mgas_s_values = []
for r in combined:
lat_s = r["new_payload_latency_us"] / 1_000_000
if lat_s > 0:
mgas_s_values.append(r["gas_used"] / lat_s / 1_000_000)
mean_mgas_s = sum(mgas_s_values) / len(mgas_s_values) if mgas_s_values else 0
return {
"n": n,
"mean_ms": mean_lat,
"stddev_ms": std_lat,
"p50_ms": percentile(sorted_lat, 50),
"p90_ms": percentile(sorted_lat, 90),
"p99_ms": percentile(sorted_lat, 99),
"mean_mgas_s": mean_mgas_s,
}
def compute_wait_stats(combined: list[dict], field: str) -> dict:
"""Compute mean/p50/p95 for a wait time field (in ms)."""
values_ms = []
for r in combined:
v = r.get(field)
if v is not None:
values_ms.append(v / 1_000)
if not values_ms:
return {}
n = len(values_ms)
mean_val = sum(values_ms) / n
sorted_vals = sorted(values_ms)
return {
"mean_ms": mean_val,
"p50_ms": percentile(sorted_vals, 50),
"p95_ms": percentile(sorted_vals, 95),
}
def _paired_data(
baseline: list[dict], feature: list[dict]
) -> tuple[list[tuple[float, float]], list[float], list[float]]:
"""Match blocks and return paired latencies and per-block diffs.
Returns:
pairs: list of (baseline_ms, feature_ms) tuples
lat_diffs_ms: list of feature baseline latency diffs in ms
mgas_diffs: list of feature baseline Mgas/s diffs
"""
baseline_by_block = {r["block_number"]: r for r in baseline}
feature_by_block = {r["block_number"]: r for r in feature}
common_blocks = sorted(set(baseline_by_block) & set(feature_by_block))
pairs = []
lat_diffs_ms = []
mgas_diffs = []
for bn in common_blocks:
b = baseline_by_block[bn]
f = feature_by_block[bn]
b_ms = b["new_payload_latency_us"] / 1_000
f_ms = f["new_payload_latency_us"] / 1_000
pairs.append((b_ms, f_ms))
lat_diffs_ms.append(f_ms - b_ms)
b_lat_s = b["new_payload_latency_us"] / 1_000_000
f_lat_s = f["new_payload_latency_us"] / 1_000_000
if b_lat_s > 0 and f_lat_s > 0:
mgas_diffs.append(
f["gas_used"] / f_lat_s / 1_000_000
- b["gas_used"] / b_lat_s / 1_000_000
)
return pairs, lat_diffs_ms, mgas_diffs
def compute_paired_stats(
baseline_runs: list[list[dict]],
feature_runs: list[list[dict]],
) -> dict:
"""Compute paired statistics between baseline and feature runs.
Each pair (baseline_runs[i], feature_runs[i]) produces per-block diffs.
All diffs are pooled for the final CI.
"""
all_pairs = []
all_lat_diffs = []
all_mgas_diffs = []
blocks_per_pair = []
for baseline, feature in zip(baseline_runs, feature_runs):
pairs, lat_diffs, mgas_diffs = _paired_data(baseline, feature)
all_pairs.extend(pairs)
all_lat_diffs.extend(lat_diffs)
all_mgas_diffs.extend(mgas_diffs)
blocks_per_pair.append(len(pairs))
if not all_lat_diffs:
return {}
n = len(all_lat_diffs)
mean_diff = sum(all_lat_diffs) / n
std_diff = stddev(all_lat_diffs, mean_diff)
se = std_diff / math.sqrt(n) if n > 0 else 0.0
ci = T_CRITICAL * se
# Bootstrap CI on difference-of-percentiles (resample paired blocks)
base_lats = sorted([p[0] for p in all_pairs])
feature_lats = sorted([p[1] for p in all_pairs])
p50_diff = percentile(feature_lats, 50) - percentile(base_lats, 50)
p90_diff = percentile(feature_lats, 90) - percentile(base_lats, 90)
p99_diff = percentile(feature_lats, 99) - percentile(base_lats, 99)
rng = random.Random(42)
p50_boot, p90_boot, p99_boot = [], [], []
for _ in range(BOOTSTRAP_ITERATIONS):
sample = rng.choices(all_pairs, k=n)
b_sorted = sorted(p[0] for p in sample)
f_sorted = sorted(p[1] for p in sample)
p50_boot.append(percentile(f_sorted, 50) - percentile(b_sorted, 50))
p90_boot.append(percentile(f_sorted, 90) - percentile(b_sorted, 90))
p99_boot.append(percentile(f_sorted, 99) - percentile(b_sorted, 99))
p50_boot.sort()
p90_boot.sort()
p99_boot.sort()
lo = int(BOOTSTRAP_ITERATIONS * 0.025)
hi = int(BOOTSTRAP_ITERATIONS * 0.975)
mean_mgas_diff = sum(all_mgas_diffs) / len(all_mgas_diffs) if all_mgas_diffs else 0.0
std_mgas_diff = stddev(all_mgas_diffs, mean_mgas_diff) if len(all_mgas_diffs) > 1 else 0.0
mgas_se = std_mgas_diff / math.sqrt(len(all_mgas_diffs)) if all_mgas_diffs else 0.0
mgas_ci = T_CRITICAL * mgas_se
return {
"n": n,
"mean_diff_ms": mean_diff,
"ci_ms": ci,
"p50_diff_ms": p50_diff,
"p50_ci_ms": (p50_boot[hi] - p50_boot[lo]) / 2,
"p90_diff_ms": p90_diff,
"p90_ci_ms": (p90_boot[hi] - p90_boot[lo]) / 2,
"p99_diff_ms": p99_diff,
"p99_ci_ms": (p99_boot[hi] - p99_boot[lo]) / 2,
"mean_mgas_diff": mean_mgas_diff,
"mgas_ci": mgas_ci,
"blocks": max(blocks_per_pair),
}
def compute_summary(combined: list[dict], gas: list[dict]) -> dict:
"""Compute aggregate metrics from parsed CSV data."""
blocks = len(combined)
return {
"blocks": blocks,
}
def format_duration(seconds: float) -> str:
if seconds >= 60:
return f"{seconds / 60:.1f}min"
return f"{seconds}s"
def format_gas(gas: int) -> str:
if gas >= GIGAGAS:
return f"{gas / GIGAGAS:.1f}G"
if gas >= 1_000_000:
return f"{gas / 1_000_000:.1f}M"
return f"{gas:,}"
def fmt_ms(v: float) -> str:
return f"{v:.2f}ms"
def fmt_mgas(v: float) -> str:
return f"{v:.2f}"
def change_str(pct: float, ci_pct: float, lower_is_better: bool) -> str:
"""Format change% with paired CI significance.
Significant if the CI doesn't cross zero (i.e. |pct| > ci_pct).
"""
significant = abs(pct) > ci_pct
if not significant:
emoji = ""
elif (pct < 0) == lower_is_better:
emoji = ""
else:
emoji = ""
return f"{pct:+.2f}% {emoji}{ci_pct:.2f}%)"
def generate_comparison_table(
run1: dict,
run2: dict,
paired: dict,
repo: str,
baseline_ref: str,
baseline_name: str,
feature_name: str,
feature_sha: str,
) -> str:
"""Generate a markdown comparison table between baseline and feature."""
n = paired["blocks"]
def pct(base: float, feat: float) -> float:
return (feat - base) / base * 100.0 if base > 0 else 0.0
mean_pct = pct(run1["mean_ms"], run2["mean_ms"])
gas_pct = pct(run1["mean_mgas_s"], run2["mean_mgas_s"])
p50_pct = pct(run1["p50_ms"], run2["p50_ms"])
p90_pct = pct(run1["p90_ms"], run2["p90_ms"])
p99_pct = pct(run1["p99_ms"], run2["p99_ms"])
# Bootstrap CIs as % of baseline percentile
p50_ci_pct = paired["p50_ci_ms"] / run1["p50_ms"] * 100.0 if run1["p50_ms"] > 0 else 0.0
p90_ci_pct = paired["p90_ci_ms"] / run1["p90_ms"] * 100.0 if run1["p90_ms"] > 0 else 0.0
p99_ci_pct = paired["p99_ci_ms"] / run1["p99_ms"] * 100.0 if run1["p99_ms"] > 0 else 0.0
# CI as a percentage of baseline mean
lat_ci_pct = paired["ci_ms"] / run1["mean_ms"] * 100.0 if run1["mean_ms"] > 0 else 0.0
mgas_ci_pct = paired["mgas_ci"] / run1["mean_mgas_s"] * 100.0 if run1["mean_mgas_s"] > 0 else 0.0
base_url = f"https://github.com/{repo}/commit"
baseline_label = f"[`{baseline_name}`]({base_url}/{baseline_ref})"
feature_label = f"[`{feature_name}`]({base_url}/{feature_sha})"
lines = [
f"| Metric | {baseline_label} | {feature_label} | Change |",
"|--------|------|--------|--------|",
f"| Mean | {fmt_ms(run1['mean_ms'])} | {fmt_ms(run2['mean_ms'])} | {change_str(mean_pct, lat_ci_pct, lower_is_better=True)} |",
f"| StdDev | {fmt_ms(run1['stddev_ms'])} | {fmt_ms(run2['stddev_ms'])} | |",
f"| P50 | {fmt_ms(run1['p50_ms'])} | {fmt_ms(run2['p50_ms'])} | {change_str(p50_pct, p50_ci_pct, lower_is_better=True)} |",
f"| P90 | {fmt_ms(run1['p90_ms'])} | {fmt_ms(run2['p90_ms'])} | {change_str(p90_pct, p90_ci_pct, lower_is_better=True)} |",
f"| P99 | {fmt_ms(run1['p99_ms'])} | {fmt_ms(run2['p99_ms'])} | {change_str(p99_pct, p99_ci_pct, lower_is_better=True)} |",
f"| Mgas/s | {fmt_mgas(run1['mean_mgas_s'])} | {fmt_mgas(run2['mean_mgas_s'])} | {change_str(gas_pct, mgas_ci_pct, lower_is_better=False)} |",
"",
f"*{n} blocks*",
]
return "\n".join(lines)
def generate_wait_time_table(
title: str,
baseline_stats: dict,
feature_stats: dict,
baseline_label: str,
feature_label: str,
) -> str:
"""Generate a markdown table for a wait time metric."""
if not baseline_stats or not feature_stats:
return ""
lines = [
f"### {title}",
"",
f"| Metric | {baseline_label} | {feature_label} |",
"|--------|------|--------|",
f"| Mean | {fmt_ms(baseline_stats['mean_ms'])} | {fmt_ms(feature_stats['mean_ms'])} |",
f"| P50 | {fmt_ms(baseline_stats['p50_ms'])} | {fmt_ms(feature_stats['p50_ms'])} |",
f"| P95 | {fmt_ms(baseline_stats['p95_ms'])} | {fmt_ms(feature_stats['p95_ms'])} |",
]
return "\n".join(lines)
def generate_markdown(
summary: dict, comparison_table: str,
wait_time_tables: list[str] | None = None,
behind_baseline: int = 0, repo: str = "", baseline_ref: str = "", baseline_name: str = "",
) -> str:
"""Generate a markdown comment body."""
lines = ["## Benchmark Results", ""]
if behind_baseline > 0:
s = "s" if behind_baseline > 1 else ""
diff_link = f"https://github.com/{repo}/compare/{baseline_ref[:12]}...{baseline_name}"
lines.append(f"> ⚠️ Feature is [**{behind_baseline} commit{s} behind `{baseline_name}`**]({diff_link}). Consider rebasing for accurate results.")
lines.append("")
lines.append(comparison_table)
if wait_time_tables:
lines.append("")
lines.append("<details>")
lines.append("<summary>Wait Time Breakdown</summary>")
lines.append("")
for table in wait_time_tables:
if table:
lines.append(table)
lines.append("")
lines.append("</details>")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Parse reth-bench ABBA results")
parser.add_argument(
"--baseline-csv", nargs="+", required=True,
help="Baseline combined_latency.csv files (A1, A2)",
)
parser.add_argument(
"--feature-csv", "--branch-csv", nargs="+", required=True,
help="Feature combined_latency.csv files (B1, B2)",
)
parser.add_argument("--gas-csv", required=True, help="Path to total_gas.csv")
parser.add_argument(
"--output-summary", required=True, help="Output JSON summary path"
)
parser.add_argument("--output-markdown", required=True, help="Output markdown path")
parser.add_argument(
"--repo", default="paradigmxyz/reth", help="GitHub repo (owner/name)"
)
parser.add_argument("--baseline-ref", default=None, help="Baseline commit SHA")
parser.add_argument("--baseline-name", default=None, help="Baseline display name")
parser.add_argument("--feature-name", "--branch-name", default=None, help="Feature branch name")
parser.add_argument("--feature-ref", "--branch-sha", "--feature-sha", default=None, help="Feature commit SHA")
parser.add_argument("--behind-baseline", "--behind-main", type=int, default=0, help="Commits behind baseline")
args = parser.parse_args()
if len(args.baseline_csv) != len(args.feature_csv):
print("Must provide equal number of baseline and feature CSVs", file=sys.stderr)
sys.exit(1)
baseline_runs = []
feature_runs = []
for path in args.baseline_csv:
data = parse_combined_csv(path)
if not data:
print(f"No results in {path}", file=sys.stderr)
sys.exit(1)
baseline_runs.append(data)
for path in args.feature_csv:
data = parse_combined_csv(path)
if not data:
print(f"No results in {path}", file=sys.stderr)
sys.exit(1)
feature_runs.append(data)
gas = parse_gas_csv(args.gas_csv)
all_baseline = [r for run in baseline_runs for r in run]
all_feature = [r for run in feature_runs for r in run]
summary = compute_summary(all_feature, gas)
with open(args.output_summary, "w") as f:
json.dump(summary, f, indent=2)
print(f"Summary written to {args.output_summary}")
baseline_stats = compute_stats(all_baseline)
feature_stats = compute_stats(all_feature)
paired_stats = compute_paired_stats(baseline_runs, feature_runs)
if not paired_stats:
print("No common blocks between baseline and feature runs", file=sys.stderr)
sys.exit(1)
baseline_ref = args.baseline_ref or "main"
baseline_name = args.baseline_name or "baseline"
feature_name = args.feature_name or "feature"
feature_sha = args.feature_ref or "unknown"
comparison_table = generate_comparison_table(
baseline_stats,
feature_stats,
paired_stats,
repo=args.repo,
baseline_ref=baseline_ref,
baseline_name=baseline_name,
feature_name=feature_name,
feature_sha=feature_sha,
)
print(f"Generated comparison ({paired_stats['n']} paired blocks, "
f"mean diff {paired_stats['mean_diff_ms']:+.3f}ms ± {paired_stats['ci_ms']:.3f}ms)")
base_url = f"https://github.com/{args.repo}/commit"
baseline_label = f"[`{baseline_name}`]({base_url}/{baseline_ref})"
feature_label = f"[`{feature_name}`]({base_url}/{feature_sha})"
wait_fields = [
("persistence_wait_us", "Persistence Wait"),
("sparse_trie_wait_us", "Trie Cache Update Wait"),
("execution_cache_wait_us", "Execution Cache Update Wait"),
]
wait_time_tables = []
for field, title in wait_fields:
b_stats = compute_wait_stats(all_baseline, field)
f_stats = compute_wait_stats(all_feature, field)
table = generate_wait_time_table(title, b_stats, f_stats, baseline_label, feature_label)
if table:
wait_time_tables.append(table)
markdown = generate_markdown(
summary, comparison_table,
wait_time_tables=wait_time_tables,
behind_baseline=args.behind_baseline,
repo=args.repo,
baseline_ref=baseline_ref,
baseline_name=baseline_name,
)
with open(args.output_markdown, "w") as f:
f.write(markdown)
print(f"Markdown written to {args.output_markdown}")
if __name__ == "__main__":
main()

27
.github/scripts/bench-update-status.js vendored Normal file
View File

@@ -0,0 +1,27 @@
// Updates the reth-bench PR comment with current status.
//
// Reads from environment:
// BENCH_COMMENT_ID GitHub comment ID to update
// BENCH_JOB_URL URL to the Actions job page
// BENCH_CONFIG Config line (blocks, warmup, refs)
// BENCH_ACTOR User who triggered the benchmark
//
// Usage from actions/github-script:
// const s = require('./.github/scripts/bench-update-status.js');
// await s({github, context, status: 'Building baseline binary...'});
function buildBody(status) {
return `cc @${process.env.BENCH_ACTOR}\n\n🚀 Benchmark started! [View job](${process.env.BENCH_JOB_URL})\n\n⏳ **Status:** ${status}\n\n${process.env.BENCH_CONFIG}`;
}
async function updateStatus({ github, context, status }) {
await github.rest.issues.updateComment({
owner: context.repo.owner,
repo: context.repo.repo,
comment_id: parseInt(process.env.BENCH_COMMENT_ID),
body: buildBody(status),
});
}
updateStatus.buildBody = buildBody;
module.exports = updateStatus;

View File

@@ -59,10 +59,6 @@ engine-auth: [ ]
#
# System contract tests (already fixed and deployed):
#
# tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout and test_invalid_log_length
# System contract is already fixed and deployed; tests cover scenarios where contract is
# malformed which can't happen retroactively. No point in adding checks.
#
# tests/prague/eip7002_el_triggerable_withdrawals/test_contract_deployment.py::test_system_contract_deployment
# tests/prague/eip7251_consolidations/test_contract_deployment.py::test_system_contract_deployment
# Post-fork system contract deployment tests. Should fix for spec compliance but not realistic
@@ -71,32 +67,8 @@ eels/consume-engine:
- tests/prague/eip7702_set_code_tx/test_set_code_txs.py::test_set_code_to_non_empty_storage[fork_Prague-blockchain_test_engine-zero_nonce]-reth
- tests/prague/eip7251_consolidations/test_contract_deployment.py::test_system_contract_deployment[fork_CancunToPragueAtTime15k-blockchain_test_engine-deploy_after_fork-nonzero_balance]-reth
- tests/prague/eip7251_consolidations/test_contract_deployment.py::test_system_contract_deployment[fork_CancunToPragueAtTime15k-blockchain_test_engine-deploy_after_fork-zero_balance]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_amount_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_amount_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_index_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_index_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_pubkey_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_pubkey_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_signature_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_signature_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_withdrawal_credentials_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_withdrawal_credentials_size-value_zero]-reth
- tests/prague/eip7002_el_triggerable_withdrawals/test_contract_deployment.py::test_system_contract_deployment[fork_CancunToPragueAtTime15k-blockchain_test_engine-deploy_after_fork-nonzero_balance]-reth
- tests/prague/eip7002_el_triggerable_withdrawals/test_contract_deployment.py::test_system_contract_deployment[fork_CancunToPragueAtTime15k-blockchain_test_engine-deploy_after_fork-zero_balance]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_log_length[fork_Prague-blockchain_test_engine-slice_bytes_False]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_log_length[fork_Prague-blockchain_test_engine-slice_bytes_True]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_amount_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_amount_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_index_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_index_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_pubkey_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_pubkey_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_signature_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_signature_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_withdrawal_credentials_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_withdrawal_credentials_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_log_length[fork_Osaka-blockchain_test_engine-slice_bytes_False]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_log_length[fork_Osaka-blockchain_test_engine-slice_bytes_True]-reth
- tests/paris/eip7610_create_collision/test_initcollision.py::test_init_collision_create_tx[fork_Osaka-tx_type_0-blockchain_test_engine_from_state_test-non-empty-balance-revert-initcode]-reth
- tests/paris/eip7610_create_collision/test_initcollision.py::test_init_collision_create_tx[fork_Prague-tx_type_0-blockchain_test_engine_from_state_test-non-empty-balance-correct-initcode]-reth
- tests/paris/eip7610_create_collision/test_initcollision.py::test_init_collision_create_tx[fork_Paris-tx_type_1-blockchain_test_engine_from_state_test-non-empty-balance-correct-initcode]-reth

View File

@@ -16,12 +16,16 @@ engine-withdrawals:
- Withdrawals Fork on Block 1 - 8 Block Re-Org NewPayload (Paris) (reth)
- Withdrawals Fork on Block 8 - 10 Block Re-Org NewPayload (Paris) (reth)
- Withdrawals Fork on Canonical Block 8 / Side Block 7 - 10 Block Re-Org (Paris) (reth)
- Sync after 2 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts (Paris) (reth)
- Sync after 2 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts - No Transactions (Paris) (reth)
- Sync after 128 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts (Paris) (reth)
engine-cancun:
- Transaction Re-Org, New Payload on Revert Back (Cancun) (reth)
- Transaction Re-Org, Re-Org to Different Block (Cancun) (reth)
- Transaction Re-Org, Re-Org Out (Cancun) (reth)
- Invalid Missing Ancestor ReOrg, StateRoot, EmptyTxs=False, Invalid P9 (Cancun) (reth)
- Invalid Missing Ancestor Syncing ReOrg, Timestamp, EmptyTxs=False, CanonicalReOrg=False, Invalid P8 (Cancun) (reth)
- Invalid Missing Ancestor Syncing ReOrg, Timestamp, EmptyTxs=False, CanonicalReOrg=True, Invalid P8 (Cancun) (reth)
- Multiple New Payloads Extending Canonical Chain, Wait for Canonical Payload (Cancun) (reth)
engine-api:
- Transaction Re-Org, Re-Org Out (Paris) (reth)

View File

@@ -6,8 +6,14 @@ cd hivetests/
sim="${1}"
limit="${2}"
# Use lower parallelism for eels tests to avoid OOM-killing the runner
parallelism=16
if [[ "${sim}" == *"eels"* ]]; then
parallelism=4
fi
run_hive() {
hive --sim "${sim}" --sim.limit "${limit}" --sim.parallelism 16 --client reth 2>&1 | tee /tmp/log || true
hive --sim "${sim}" --sim.limit "${limit}" --sim.parallelism "${parallelism}" --client reth 2>&1 | tee /tmp/log || true
}
check_log() {

View File

@@ -1,11 +1,40 @@
# Runs benchmarks.
#
# The reth-bench job replays real blocks via the Engine API against a reth node
# backed by a local snapshot managed with schelk.
#
# It runs the baseline binary and the feature (candidate) binary on the
# same block range (snapshot recovered between runs) to compare performance.
on:
pull_request:
# TODO: Disabled temporarily for https://github.com/CodSpeedHQ/runner/issues/55
# merge_group:
push:
branches: [main]
issue_comment:
types: [created, edited]
workflow_dispatch:
inputs:
blocks:
description: "Number of blocks to benchmark"
required: false
default: "500"
type: string
warmup:
description: "Number of warmup blocks"
required: false
default: "100"
type: string
baseline:
description: "Baseline git ref (default: merge-base)"
required: false
default: ""
type: string
feature:
description: "Feature git ref (default: branch head)"
required: false
default: ""
type: string
env:
CARGO_TERM_COLOR: always
@@ -14,9 +43,18 @@ env:
RUSTC_WRAPPER: "sccache"
name: bench
permissions:
contents: write
pull-requests: write
jobs:
codspeed:
if: github.event_name == 'push'
runs-on: depot-ubuntu-latest
concurrency:
group: bench-codspeed-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
strategy:
matrix:
partition: [1, 2]
@@ -31,6 +69,7 @@ jobs:
- uses: actions/checkout@v6
with:
submodules: true
ref: ${{ github.event_name == 'issue_comment' && format('refs/pull/{0}/merge', github.event.issue.number) || '' }}
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: mozilla-actions/sccache-action@v0.0.9
@@ -49,3 +88,707 @@ jobs:
run: cargo codspeed run ${{ matrix.crates }}
mode: instrumentation
token: ${{ secrets.CODSPEED_TOKEN }}
reth-bench-ack:
if: |
(github.event_name == 'issue_comment' && github.event.issue.pull_request && startsWith(github.event.comment.body, 'derek bench')) ||
github.event_name == 'workflow_dispatch'
name: reth-bench-ack
runs-on: ubuntu-latest
outputs:
pr: ${{ steps.args.outputs.pr }}
actor: ${{ steps.args.outputs.actor }}
blocks: ${{ steps.args.outputs.blocks }}
warmup: ${{ steps.args.outputs.warmup }}
baseline: ${{ steps.args.outputs.baseline }}
feature: ${{ steps.args.outputs.feature }}
baseline-name: ${{ steps.args.outputs.baseline-name }}
feature-name: ${{ steps.args.outputs.feature-name }}
comment-id: ${{ steps.ack.outputs.comment-id }}
steps:
- name: Check org membership
if: github.event_name == 'issue_comment'
uses: actions/github-script@v8
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
const user = context.payload.comment.user.login;
try {
const { status } = await github.rest.orgs.checkMembershipForUser({
org: 'paradigmxyz',
username: user,
});
if (status !== 204 && status !== 302) {
core.setFailed(`@${user} is not a member of paradigmxyz`);
}
} catch (e) {
core.setFailed(`@${user} is not a member of paradigmxyz`);
}
- name: Parse arguments
id: args
uses: actions/github-script@v8
with:
script: |
let pr, actor, blocks, warmup, baseline, feature;
if (context.eventName === 'workflow_dispatch') {
actor = '${{ github.actor }}';
blocks = '${{ github.event.inputs.blocks }}' || '500';
warmup = '${{ github.event.inputs.warmup }}' || '100';
baseline = '${{ github.event.inputs.baseline }}';
feature = '${{ github.event.inputs.feature }}';
// Find PR for the selected branch
const branch = '${{ github.ref_name }}';
const { data: prs } = await github.rest.pulls.list({
owner: context.repo.owner,
repo: context.repo.repo,
head: `${context.repo.owner}:${branch}`,
state: 'open',
per_page: 1,
});
pr = prs.length ? String(prs[0].number) : '';
if (!pr) {
core.info(`No open PR found for branch '${branch}', results will be in job summary`);
}
} else {
pr = String(context.issue.number);
actor = context.payload.comment.user.login;
const body = context.payload.comment.body.trim();
const intArgs = new Set(['blocks', 'warmup']);
const refArgs = new Set(['baseline', 'feature']);
const defaults = { blocks: '500', warmup: '100', baseline: '', feature: '' };
const unknown = [];
const invalid = [];
const args = body.replace(/^derek bench\s*/, '');
for (const part of args.split(/\s+/).filter(Boolean)) {
const eq = part.indexOf('=');
if (eq === -1) {
unknown.push(part);
continue;
}
const key = part.slice(0, eq);
const value = part.slice(eq + 1);
if (intArgs.has(key)) {
if (!/^\d+$/.test(value)) {
invalid.push(`\`${key}=${value}\` (must be a positive integer)`);
} else {
defaults[key] = value;
}
} else if (refArgs.has(key)) {
if (!value) {
invalid.push(`\`${key}=\` (must be a git ref)`);
} else {
defaults[key] = value;
}
} else {
unknown.push(key);
}
}
const errors = [];
if (unknown.length) errors.push(`Unknown argument(s): \`${unknown.join('`, `')}\``);
if (invalid.length) errors.push(`Invalid value(s): ${invalid.join(', ')}`);
if (errors.length) {
const msg = `❌ **Invalid bench command**\n\n${errors.join('\n')}\n\n**Usage:** \`derek bench [blocks=N] [warmup=N] [baseline=REF] [feature=REF]\``;
await github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: context.issue.number,
body: msg,
});
core.setFailed(msg);
return;
}
blocks = defaults.blocks;
warmup = defaults.warmup;
baseline = defaults.baseline;
feature = defaults.feature;
}
// Resolve display names for baseline/feature
let baselineName = baseline || 'main';
let featureName = feature;
if (!featureName) {
if (pr) {
const { data: prData } = await github.rest.pulls.get({
owner: context.repo.owner,
repo: context.repo.repo,
pull_number: parseInt(pr),
});
featureName = prData.head.ref;
} else {
featureName = '${{ github.ref_name }}';
}
}
core.setOutput('pr', pr || '');
core.setOutput('actor', actor);
core.setOutput('blocks', blocks);
core.setOutput('warmup', warmup);
core.setOutput('baseline', baseline);
core.setOutput('feature', feature);
core.setOutput('baseline-name', baselineName);
core.setOutput('feature-name', featureName);
- name: Acknowledge request
id: ack
uses: actions/github-script@v8
with:
script: |
if (context.eventName === 'issue_comment') {
await github.rest.reactions.createForIssueComment({
owner: context.repo.owner,
repo: context.repo.repo,
comment_id: context.payload.comment.id,
content: 'eyes',
});
}
const pr = '${{ steps.args.outputs.pr }}';
if (!pr) return;
const runUrl = `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
// Count queued/waiting bench runs ahead of this one
let queueMsg = '';
let ahead = 0;
try {
const statuses = ['queued', 'in_progress', 'waiting', 'requested', 'pending'];
const allRuns = [];
for (const status of statuses) {
const { data: { workflow_runs: r } } = await github.rest.actions.listWorkflowRuns({
owner: context.repo.owner,
repo: context.repo.repo,
workflow_id: 'bench.yml',
status,
per_page: 100,
});
allRuns.push(...r);
}
// Only count runs that trigger reth-bench (not push-triggered codspeed runs)
const benchRuns = allRuns.filter(r => r.event === 'issue_comment' || r.event === 'workflow_dispatch');
const thisRun = benchRuns.find(r => r.id === context.runId);
const thisCreatedAt = thisRun ? new Date(thisRun.created_at) : new Date();
ahead = benchRuns.filter(r => r.id !== context.runId && new Date(r.created_at) <= thisCreatedAt).length;
if (ahead > 0) {
queueMsg = `\n🔢 **Queue position:** \`#${ahead + 1}\` (${ahead} job(s) ahead)`;
}
} catch (e) {
// Non-fatal — queue info is best-effort
core.info(`Could not fetch queue info: ${e.message}`);
}
const actor = '${{ steps.args.outputs.actor }}';
const blocks = '${{ steps.args.outputs.blocks }}';
const warmup = '${{ steps.args.outputs.warmup }}';
const baseline = '${{ steps.args.outputs.baseline-name }}';
const feature = '${{ steps.args.outputs.feature-name }}';
const config = `**Config:** ${blocks} blocks, ${warmup} warmup blocks, baseline: \`${baseline}\`, feature: \`${feature}\``;
const { data: comment } = await github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: parseInt(pr),
body: `cc @${actor}\n\n🚀 Benchmark queued! [View run](${runUrl})\n\n⏳ **Status:** Waiting for runner...${queueMsg}\n\n${config}`,
});
core.setOutput('comment-id', String(comment.id));
core.setOutput('queue-position', String(ahead || 0));
- name: Poll queue position
if: steps.ack.outputs.comment-id && steps.ack.outputs.queue-position != '0'
uses: actions/github-script@v8
with:
script: |
const pr = '${{ steps.args.outputs.pr }}';
const commentId = parseInt('${{ steps.ack.outputs.comment-id }}');
const actor = '${{ steps.args.outputs.actor }}';
const blocks = '${{ steps.args.outputs.blocks }}';
const warmup = '${{ steps.args.outputs.warmup }}';
const baseline = '${{ steps.args.outputs.baseline-name }}';
const feature = '${{ steps.args.outputs.feature-name }}';
const config = `**Config:** ${blocks} blocks, ${warmup} warmup blocks, baseline: \`${baseline}\`, feature: \`${feature}\``;
const runUrl = `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
async function getQueuePosition() {
const statuses = ['queued', 'in_progress', 'waiting', 'requested', 'pending'];
const allRuns = [];
for (const status of statuses) {
const { data: { workflow_runs: r } } = await github.rest.actions.listWorkflowRuns({
owner: context.repo.owner,
repo: context.repo.repo,
workflow_id: 'bench.yml',
status,
per_page: 100,
});
allRuns.push(...r);
}
const benchRuns = allRuns.filter(r => r.event === 'issue_comment' || r.event === 'workflow_dispatch');
const thisRun = benchRuns.find(r => r.id === context.runId);
const thisCreatedAt = thisRun ? new Date(thisRun.created_at) : new Date();
return benchRuns.filter(r => r.id !== context.runId && new Date(r.created_at) <= thisCreatedAt).length;
}
let lastPosition = parseInt('${{ steps.ack.outputs.queue-position }}');
const sleep = ms => new Promise(r => setTimeout(r, ms));
while (true) {
await sleep(10_000);
try {
const ahead = await getQueuePosition();
if (ahead !== lastPosition) {
lastPosition = ahead;
const queueMsg = ahead > 0
? `\n🔢 **Queue position:** \`#${ahead + 1}\` (${ahead} job(s) ahead)`
: '';
await github.rest.issues.updateComment({
owner: context.repo.owner,
repo: context.repo.repo,
comment_id: commentId,
body: `cc @${actor}\n\n🚀 Benchmark queued! [View run](${runUrl})\n\n⏳ **Status:** Waiting for runner...${queueMsg}\n\n${config}`,
});
}
if (ahead === 0) break;
} catch (e) {
core.info(`Queue poll error: ${e.message}`);
}
}
reth-bench:
needs: reth-bench-ack
name: reth-bench
runs-on: [self-hosted, Linux, X64]
timeout-minutes: 120
concurrency:
group: reth-bench-queue
cancel-in-progress: false
env:
BENCH_RPC_URL: https://ethereum.reth.rs/rpc
SCHELK_MOUNT: /reth-bench
BENCH_WORK_DIR: ${{ github.workspace }}/bench-work
BENCH_PR: ${{ needs.reth-bench-ack.outputs.pr }}
BENCH_ACTOR: ${{ needs.reth-bench-ack.outputs.actor }}
BENCH_BLOCKS: ${{ needs.reth-bench-ack.outputs.blocks }}
BENCH_WARMUP_BLOCKS: ${{ needs.reth-bench-ack.outputs.warmup }}
BENCH_COMMENT_ID: ${{ needs.reth-bench-ack.outputs.comment-id }}
steps:
- name: Resolve checkout ref
id: checkout-ref
uses: actions/github-script@v8
with:
script: |
if (!process.env.BENCH_PR) {
core.setOutput('ref', '${{ github.ref }}');
return;
}
const { data: pr } = await github.rest.pulls.get({
owner: context.repo.owner,
repo: context.repo.repo,
pull_number: parseInt(process.env.BENCH_PR),
});
// For closed/merged PRs, the merge ref doesn't exist — use head SHA
if (pr.state !== 'open') {
core.info(`PR #${process.env.BENCH_PR} is ${pr.state}, using head SHA ${pr.head.sha}`);
core.setOutput('ref', pr.head.sha);
} else {
core.setOutput('ref', `refs/pull/${process.env.BENCH_PR}/merge`);
}
- uses: actions/checkout@v6
with:
submodules: true
fetch-depth: 0
ref: ${{ steps.checkout-ref.outputs.ref }}
- name: Resolve job URL and update status
if: env.BENCH_COMMENT_ID
uses: actions/github-script@v8
with:
script: |
const { data: jobs } = await github.rest.actions.listJobsForWorkflowRun({
owner: context.repo.owner,
repo: context.repo.repo,
run_id: context.runId,
});
const job = jobs.jobs.find(j => j.name === 'reth-bench');
const jobUrl = job ? job.html_url : `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
core.exportVariable('BENCH_JOB_URL', jobUrl);
const blocks = process.env.BENCH_BLOCKS;
const warmup = process.env.BENCH_WARMUP_BLOCKS;
const baseline = '${{ needs.reth-bench-ack.outputs.baseline-name }}';
const feature = '${{ needs.reth-bench-ack.outputs.feature-name }}';
core.exportVariable('BENCH_CONFIG', `**Config:** ${blocks} blocks, ${warmup} warmup blocks, baseline: \`${baseline}\`, feature: \`${feature}\``);
const { buildBody } = require('./.github/scripts/bench-update-status.js');
await github.rest.issues.updateComment({
owner: context.repo.owner,
repo: context.repo.repo,
comment_id: parseInt(process.env.BENCH_COMMENT_ID),
body: buildBody('Building binaries...'),
});
- uses: dtolnay/rust-toolchain@stable
- uses: mozilla-actions/sccache-action@v0.0.9
continue-on-error: true
# Verify all required tools are available
- name: Check dependencies
run: |
export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
echo "$HOME/.local/bin" >> "$GITHUB_PATH"
echo "$HOME/.cargo/bin" >> "$GITHUB_PATH"
missing=()
for cmd in mc schelk cpupower taskset stdbuf python3 curl make uv pzstd jq; do
command -v "$cmd" &>/dev/null || missing+=("$cmd")
done
if [ ${#missing[@]} -gt 0 ]; then
echo "::error::Missing required tools: ${missing[*]}"
exit 1
fi
echo "All dependencies found"
# Build binaries
- name: Resolve PR head branch
id: pr-info
uses: actions/github-script@v8
with:
script: |
if (process.env.BENCH_PR) {
const { data: pr } = await github.rest.pulls.get({
owner: context.repo.owner,
repo: context.repo.repo,
pull_number: parseInt(process.env.BENCH_PR),
});
core.setOutput('head-ref', pr.head.ref);
core.setOutput('head-sha', pr.head.sha);
} else {
core.setOutput('head-ref', '${{ github.ref_name }}');
core.setOutput('head-sha', '${{ github.sha }}');
}
- name: Resolve refs
id: refs
run: |
BASELINE_ARG="${{ needs.reth-bench-ack.outputs.baseline }}"
FEATURE_ARG="${{ needs.reth-bench-ack.outputs.feature }}"
if [ -n "$BASELINE_ARG" ]; then
git fetch origin "$BASELINE_ARG" --quiet 2>/dev/null || true
BASELINE_REF=$(git rev-parse "$BASELINE_ARG" 2>/dev/null || git rev-parse "origin/$BASELINE_ARG" 2>/dev/null)
BASELINE_NAME="$BASELINE_ARG"
else
BASELINE_REF=$(git merge-base HEAD origin/main 2>/dev/null || echo "${{ github.sha }}")
BASELINE_NAME="main"
fi
if [ -n "$FEATURE_ARG" ]; then
git fetch origin "$FEATURE_ARG" --quiet 2>/dev/null || true
FEATURE_REF=$(git rev-parse "$FEATURE_ARG" 2>/dev/null || git rev-parse "origin/$FEATURE_ARG" 2>/dev/null)
FEATURE_NAME="$FEATURE_ARG"
else
FEATURE_REF="${{ steps.pr-info.outputs.head-sha }}"
FEATURE_NAME="${{ steps.pr-info.outputs.head-ref }}"
fi
echo "baseline-ref=$BASELINE_REF" >> "$GITHUB_OUTPUT"
echo "baseline-name=$BASELINE_NAME" >> "$GITHUB_OUTPUT"
echo "feature-ref=$FEATURE_REF" >> "$GITHUB_OUTPUT"
echo "feature-name=$FEATURE_NAME" >> "$GITHUB_OUTPUT"
- name: Check if snapshot needs update
id: snapshot-check
run: |
if .github/scripts/bench-reth-snapshot.sh --check; then
echo "needed=false" >> "$GITHUB_OUTPUT"
else
echo "needed=true" >> "$GITHUB_OUTPUT"
fi
- name: Update status (snapshot needed)
if: env.BENCH_COMMENT_ID && steps.snapshot-check.outputs.needed == 'true'
uses: actions/github-script@v8
with:
script: |
const s = require('./.github/scripts/bench-update-status.js');
await s({github, context, status: 'Building binaries & downloading snapshot...'});
- name: Prepare source dirs
run: |
BASELINE_REF="${{ steps.refs.outputs.baseline-ref }}"
if [ -d ../reth-baseline ]; then
git -C ../reth-baseline fetch origin "$BASELINE_REF"
else
git clone . ../reth-baseline
fi
git -C ../reth-baseline checkout "$BASELINE_REF"
ln -sfn "$(pwd)" ../reth-feature
- name: Build binaries and download snapshot in parallel
id: build
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
BENCH_REPO: ${{ github.repository }}
SNAPSHOT_NEEDED: ${{ steps.snapshot-check.outputs.needed }}
run: |
BASELINE_DIR="$(cd ../reth-baseline && pwd)"
FEATURE_DIR="$(cd ../reth-feature && pwd)"
.github/scripts/bench-reth-build.sh baseline "${BASELINE_DIR}" "${{ steps.refs.outputs.baseline-ref }}" &
PID_BASELINE=$!
.github/scripts/bench-reth-build.sh feature "${FEATURE_DIR}" "${{ steps.refs.outputs.feature-ref }}" &
PID_FEATURE=$!
PID_SNAPSHOT=
if [ "$SNAPSHOT_NEEDED" = "true" ]; then
.github/scripts/bench-reth-snapshot.sh &
PID_SNAPSHOT=$!
fi
FAIL=0
wait $PID_BASELINE || FAIL=1
wait $PID_FEATURE || FAIL=1
[ -n "$PID_SNAPSHOT" ] && { wait $PID_SNAPSHOT || FAIL=1; }
if [ $FAIL -ne 0 ]; then
echo "::error::One or more parallel tasks failed (builds / snapshot download)"
exit 1
fi
# System tuning for reproducible benchmarks
- name: System setup
run: |
sudo cpupower frequency-set -g performance || true
# Disable turbo boost (Intel and AMD paths)
echo 1 | sudo tee /sys/devices/system/cpu/intel_pstate/no_turbo 2>/dev/null || true
echo 0 | sudo tee /sys/devices/system/cpu/cpufreq/boost 2>/dev/null || true
sudo swapoff -a || true
echo 0 | sudo tee /proc/sys/kernel/randomize_va_space || true
# Disable SMT (hyperthreading)
for cpu in /sys/devices/system/cpu/cpu*/topology/thread_siblings_list; do
first=$(cut -d, -f1 < "$cpu" | cut -d- -f1)
current=$(echo "$cpu" | grep -o 'cpu[0-9]*' | grep -o '[0-9]*')
if [ "$current" != "$first" ]; then
echo 0 | sudo tee "/sys/devices/system/cpu/cpu${current}/online" || true
fi
done
echo "Online CPUs: $(nproc)"
# Disable transparent huge pages (compaction causes latency spikes)
for p in /sys/kernel/mm/transparent_hugepage /sys/kernel/mm/transparent_hugepages; do
[ -d "$p" ] && echo never | sudo tee "$p/enabled" && echo never | sudo tee "$p/defrag" && break
done || true
# Prevent deep C-states (avoids wake-up latency jitter)
sudo sh -c 'exec 3<>/dev/cpu_dma_latency; echo -ne "\x00\x00\x00\x00" >&3; sleep infinity' &
# Move all IRQs to core 0 (housekeeping core)
for irq in /proc/irq/*/smp_affinity_list; do
echo 0 | sudo tee "$irq" 2>/dev/null || true
done
# Stop noisy background services
sudo systemctl stop irqbalance cron atd unattended-upgrades snapd 2>/dev/null || true
# Log environment for reproducibility
echo "=== Benchmark environment ==="
uname -r
lscpu | grep -E 'Model name|CPU\(s\)|MHz|NUMA'
cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_governor
cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq
cat /sys/kernel/mm/transparent_hugepage/enabled 2>/dev/null || cat /sys/kernel/mm/transparent_hugepages/enabled 2>/dev/null || echo "THP: unknown"
free -h
# Clean up any leftover state
- name: Pre-flight cleanup
run: |
sudo pkill -9 reth || true
sleep 1
if mountpoint -q "$SCHELK_MOUNT"; then
sudo umount -l "$SCHELK_MOUNT" || true
sudo schelk recover -y || true
fi
rm -rf "$BENCH_WORK_DIR"
mkdir -p "$BENCH_WORK_DIR"
- name: Update status (running benchmarks)
if: success() && env.BENCH_COMMENT_ID
uses: actions/github-script@v8
with:
script: |
const s = require('./.github/scripts/bench-update-status.js');
await s({github, context, status: 'Running benchmarks...'});
# Interleaved run order (B-F-F-B) to reduce systematic bias from
# thermal drift and cache warming.
- name: "Run benchmark: baseline (1/2)"
id: run-baseline-1
run: taskset -c 0 .github/scripts/bench-reth-run.sh baseline ../reth-baseline/target/profiling/reth "$BENCH_WORK_DIR/baseline-1"
- name: "Run benchmark: feature (1/2)"
id: run-feature-1
run: taskset -c 0 .github/scripts/bench-reth-run.sh feature ../reth-feature/target/profiling/reth "$BENCH_WORK_DIR/feature-1"
- name: "Run benchmark: feature (2/2)"
id: run-feature-2
run: taskset -c 0 .github/scripts/bench-reth-run.sh feature ../reth-feature/target/profiling/reth "$BENCH_WORK_DIR/feature-2"
- name: "Run benchmark: baseline (2/2)"
id: run-baseline-2
run: taskset -c 0 .github/scripts/bench-reth-run.sh baseline ../reth-baseline/target/profiling/reth "$BENCH_WORK_DIR/baseline-2"
# Results & charts
- name: Parse results
id: results
if: success()
env:
BASELINE_REF: ${{ steps.refs.outputs.baseline-ref }}
BASELINE_NAME: ${{ steps.refs.outputs.baseline-name }}
FEATURE_NAME: ${{ steps.refs.outputs.feature-name }}
FEATURE_REF: ${{ steps.refs.outputs.feature-ref }}
run: |
git fetch origin "${BASELINE_NAME}" --quiet 2>/dev/null || true
BASELINE_HEAD=$(git rev-parse "origin/${BASELINE_NAME}" 2>/dev/null || echo "")
BEHIND_BASELINE=0
if [ -n "$BASELINE_HEAD" ] && [ "$BASELINE_REF" != "$BASELINE_HEAD" ]; then
BEHIND_BASELINE=$(git rev-list --count "${BASELINE_REF}..${BASELINE_HEAD}" 2>/dev/null || echo "0")
fi
SUMMARY_ARGS="--output-summary $BENCH_WORK_DIR/summary.json"
SUMMARY_ARGS="$SUMMARY_ARGS --output-markdown $BENCH_WORK_DIR/comment.md"
SUMMARY_ARGS="$SUMMARY_ARGS --repo ${{ github.repository }}"
SUMMARY_ARGS="$SUMMARY_ARGS --baseline-ref ${BASELINE_REF}"
SUMMARY_ARGS="$SUMMARY_ARGS --baseline-name ${BASELINE_NAME}"
SUMMARY_ARGS="$SUMMARY_ARGS --feature-name ${FEATURE_NAME}"
SUMMARY_ARGS="$SUMMARY_ARGS --feature-ref ${FEATURE_REF}"
SUMMARY_ARGS="$SUMMARY_ARGS --baseline-csv $BENCH_WORK_DIR/baseline-1/combined_latency.csv $BENCH_WORK_DIR/baseline-2/combined_latency.csv"
SUMMARY_ARGS="$SUMMARY_ARGS --feature-csv $BENCH_WORK_DIR/feature-1/combined_latency.csv $BENCH_WORK_DIR/feature-2/combined_latency.csv"
SUMMARY_ARGS="$SUMMARY_ARGS --gas-csv $BENCH_WORK_DIR/feature-1/total_gas.csv"
if [ "$BEHIND_BASELINE" -gt 0 ]; then
SUMMARY_ARGS="$SUMMARY_ARGS --behind-baseline $BEHIND_BASELINE"
fi
# shellcheck disable=SC2086
python3 .github/scripts/bench-reth-summary.py $SUMMARY_ARGS
- name: Generate charts
if: success()
env:
BASELINE_NAME: ${{ steps.refs.outputs.baseline-name }}
FEATURE_NAME: ${{ steps.refs.outputs.feature-name }}
run: |
CHART_ARGS="--output-dir $BENCH_WORK_DIR/charts"
CHART_ARGS="$CHART_ARGS --feature $BENCH_WORK_DIR/feature-1/combined_latency.csv $BENCH_WORK_DIR/feature-2/combined_latency.csv"
CHART_ARGS="$CHART_ARGS --baseline $BENCH_WORK_DIR/baseline-1/combined_latency.csv $BENCH_WORK_DIR/baseline-2/combined_latency.csv"
CHART_ARGS="$CHART_ARGS --baseline-name ${BASELINE_NAME}"
CHART_ARGS="$CHART_ARGS --feature-name ${FEATURE_NAME}"
# shellcheck disable=SC2086
uv run --with matplotlib python3 .github/scripts/bench-reth-charts.py $CHART_ARGS
- name: Upload results
if: success()
uses: actions/upload-artifact@v6
with:
name: bench-reth-results
path: ${{ env.BENCH_WORK_DIR }}
- name: Push charts
id: push-charts
if: success() && env.BENCH_PR
run: |
PR_NUMBER=${{ env.BENCH_PR }}
RUN_ID=${{ github.run_id }}
CHART_DIR="pr/${PR_NUMBER}/${RUN_ID}"
if git fetch origin bench-charts 2>/dev/null; then
git checkout bench-charts
else
git checkout --orphan bench-charts
git rm -rf . 2>/dev/null || true
fi
mkdir -p "${CHART_DIR}"
cp "$BENCH_WORK_DIR"/charts/*.png "${CHART_DIR}/"
git add "${CHART_DIR}"
git -c user.name="github-actions" -c user.email="github-actions@github.com" \
commit -m "bench charts for PR #${PR_NUMBER} run ${RUN_ID}"
git push origin bench-charts
echo "sha=$(git rev-parse HEAD)" >> "$GITHUB_OUTPUT"
- name: Compare & comment
if: success()
uses: actions/github-script@v8
with:
script: |
const fs = require('fs');
let comment = '';
try {
comment = fs.readFileSync(process.env.BENCH_WORK_DIR + '/comment.md', 'utf8');
} catch (e) {
comment = '⚠️ Engine benchmark completed but failed to generate comparison.';
}
const sha = '${{ steps.push-charts.outputs.sha }}';
const prNumber = process.env.BENCH_PR;
const runId = '${{ github.run_id }}';
if (sha && prNumber) {
const baseUrl = `https://raw.githubusercontent.com/${context.repo.owner}/${context.repo.repo}/${sha}/pr/${prNumber}/${runId}`;
const charts = [
{ file: 'latency_throughput.png', label: 'Latency, Throughput & Diff' },
{ file: 'wait_breakdown.png', label: 'Wait Time Breakdown' },
{ file: 'gas_vs_latency.png', label: 'Gas vs Latency' },
];
let chartMarkdown = '\n\n### Charts\n\n';
for (const chart of charts) {
chartMarkdown += `<details><summary>${chart.label}</summary>\n\n`;
chartMarkdown += `![${chart.label}](${baseUrl}/${chart.file})\n\n`;
chartMarkdown += `</details>\n\n`;
}
comment += chartMarkdown;
}
const jobUrl = process.env.BENCH_JOB_URL || `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
const body = `cc @${process.env.BENCH_ACTOR}\n\n✅ Benchmark complete! [View job](${jobUrl})\n\n${comment}`;
const ackCommentId = process.env.BENCH_COMMENT_ID;
if (ackCommentId) {
await github.rest.issues.updateComment({
owner: context.repo.owner,
repo: context.repo.repo,
comment_id: parseInt(ackCommentId),
body,
});
} else {
// No PR — write results to job summary
await core.summary.addRaw(body).write();
}
- name: Update status (failed)
if: failure() && env.BENCH_COMMENT_ID
uses: actions/github-script@v8
with:
script: |
const steps_status = [
['building binaries${{ steps.snapshot-check.outputs.needed == 'true' && ' & downloading snapshot' || '' }}', '${{ steps.build.outcome }}'],
['running baseline benchmark (1/2)', '${{ steps.run-baseline-1.outcome }}'],
['running feature benchmark (1/2)', '${{ steps.run-feature-1.outcome }}'],
['running feature benchmark (2/2)', '${{ steps.run-feature-2.outcome }}'],
['running baseline benchmark (2/2)', '${{ steps.run-baseline-2.outcome }}'],
];
const failed = steps_status.find(([, o]) => o === 'failure');
const failedStep = failed ? failed[0] : 'unknown step';
await github.rest.issues.updateComment({
owner: context.repo.owner, repo: context.repo.repo,
comment_id: parseInt(process.env.BENCH_COMMENT_ID),
body: `cc @${process.env.BENCH_ACTOR}\n\n❌ Benchmark failed while ${failedStep}. [View logs](${process.env.BENCH_JOB_URL})`,
});
- name: Upload node log
if: failure()
uses: actions/upload-artifact@v6
with:
name: reth-node-log
path: |
${{ env.BENCH_WORK_DIR }}/*/node.log
- name: Restore system settings
if: always()
run: |
sudo systemctl start irqbalance cron atd 2>/dev/null || true

View File

@@ -15,7 +15,7 @@ env:
jobs:
build:
runs-on: depot-ubuntu-latest-8
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-8' || 'ubuntu-latest' }}
timeout-minutes: 90
steps:
- name: Checkout

View File

@@ -25,7 +25,7 @@ env:
jobs:
check:
name: Check compilation with patched alloy
runs-on: depot-ubuntu-latest-16
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-16' || 'ubuntu-latest' }}
timeout-minutes: 60
steps:
- uses: actions/checkout@v4

View File

@@ -18,7 +18,7 @@ env:
name: compact-codec
jobs:
compact-codec:
runs-on: depot-ubuntu-latest
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
strategy:
matrix:
bin:

View File

@@ -15,7 +15,6 @@ on:
jobs:
build:
if: github.repository == 'paradigmxyz/reth'
timeout-minutes: 45
runs-on: ubuntu-latest
permissions:
@@ -31,10 +30,22 @@ jobs:
echo "sha=${{ github.sha }}" >> "$GITHUB_OUTPUT"
echo "describe=$(git describe --always --tags)" >> "$GITHUB_OUTPUT"
- name: Detect fork
id: fork
run: |
if [ "${{ github.event_name }}" = "pull_request" ] && [ "${{ github.event.pull_request.head.repo.full_name }}" != "${{ github.repository }}" ]; then
echo "is_fork=true" >> "$GITHUB_OUTPUT"
else
echo "is_fork=false" >> "$GITHUB_OUTPUT"
fi
# Depot build (upstream only)
- name: Set up Depot CLI
if: steps.fork.outputs.is_fork == 'false'
uses: depot/setup-action@v1
- name: Build reth image
- name: Build reth image (Depot)
if: steps.fork.outputs.is_fork == 'false'
uses: depot/bake-action@v1
env:
DEPOT_TOKEN: ${{ secrets.DEPOT_TOKEN }}
@@ -46,6 +57,24 @@ jobs:
targets: ${{ inputs.hive_target }}
push: false
# Docker build (forks)
- name: Set up Docker Buildx
if: steps.fork.outputs.is_fork == 'true'
uses: docker/setup-buildx-action@v3
- name: Build reth image (Docker)
if: steps.fork.outputs.is_fork == 'true'
uses: docker/bake-action@v6
env:
VERGEN_GIT_SHA: ${{ steps.git.outputs.sha }}
VERGEN_GIT_DESCRIBE: ${{ steps.git.outputs.describe }}
with:
files: docker-bake.hcl
targets: ${{ inputs.hive_target }}
push: false
set: |
*.dockerfile=Dockerfile
- name: Upload reth image
uses: actions/upload-artifact@v6
with:

View File

@@ -20,7 +20,7 @@ concurrency:
jobs:
test:
name: e2e-testsuite
runs-on: depot-ubuntu-latest-4
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
env:
RUST_BACKTRACE: 1
timeout-minutes: 90
@@ -47,7 +47,7 @@ jobs:
rocksdb:
name: e2e-rocksdb
runs-on: depot-ubuntu-latest-4
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
env:
RUST_BACKTRACE: 1
timeout-minutes: 60

View File

@@ -32,7 +32,7 @@ jobs:
prepare-hive:
if: github.repository == 'paradigmxyz/reth'
timeout-minutes: 45
runs-on: depot-ubuntu-latest-4
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
steps:
- uses: actions/checkout@v6
- name: Checkout hive tests
@@ -188,7 +188,8 @@ jobs:
- build-reth-edge
- prepare-hive
name: ${{ matrix.storage }} / ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
runs-on: depot-ubuntu-latest-4
# Use larger runners for eels tests to avoid OOM runner crashes
runs-on: ${{ github.repository == 'paradigmxyz/reth' && (contains(matrix.scenario.sim, 'eels') && 'depot-ubuntu-latest-8' || 'depot-ubuntu-latest-4') || 'ubuntu-latest' }}
permissions:
issues: write
steps:

View File

@@ -24,7 +24,7 @@ jobs:
test:
name: test / ${{ matrix.network }} / ${{ matrix.storage }}
if: github.event_name != 'schedule'
runs-on: depot-ubuntu-latest-4
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
env:
RUST_BACKTRACE: 1
strategy:

View File

@@ -31,7 +31,7 @@ jobs:
strategy:
fail-fast: false
name: run kurtosis
runs-on: depot-ubuntu-latest
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
needs:
- build-reth
steps:

View File

@@ -13,7 +13,7 @@ env:
jobs:
clippy-binaries:
name: clippy binaries / ${{ matrix.type }}
runs-on: depot-ubuntu-latest
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
timeout-minutes: 30
strategy:
matrix:
@@ -42,7 +42,7 @@ jobs:
clippy:
name: clippy
runs-on: depot-ubuntu-latest
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -59,7 +59,7 @@ jobs:
RUSTFLAGS: -D warnings
wasm:
runs-on: depot-ubuntu-latest
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -79,7 +79,7 @@ jobs:
.github/scripts/check_wasm.sh
riscv:
runs-on: depot-ubuntu-latest
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
timeout-minutes: 60
steps:
- uses: actions/checkout@v6
@@ -98,7 +98,7 @@ jobs:
crate-checks:
name: crate-checks (${{ matrix.partition }}/${{ matrix.total_partitions }})
runs-on: depot-ubuntu-latest-4
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
strategy:
matrix:
partition: [1, 2, 3]
@@ -117,7 +117,7 @@ jobs:
msrv:
name: MSRV
runs-on: depot-ubuntu-latest
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -135,7 +135,7 @@ jobs:
docs:
name: docs
runs-on: depot-ubuntu-latest-4
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -153,7 +153,7 @@ jobs:
fmt:
name: fmt
runs-on: depot-ubuntu-latest
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -167,7 +167,7 @@ jobs:
udeps:
name: udeps
runs-on: depot-ubuntu-latest
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -182,7 +182,7 @@ jobs:
book:
name: book
runs-on: depot-ubuntu-latest
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -240,7 +240,7 @@ jobs:
# Checks that selected crates can compile with power set of features
features:
name: features
runs-on: depot-ubuntu-latest
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -264,7 +264,7 @@ jobs:
# Check crates correctly propagate features
feature-propagation:
runs-on: depot-ubuntu-latest
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
timeout-minutes: 20
steps:
- uses: actions/checkout@v6

View File

@@ -102,7 +102,7 @@ jobs:
- name: Install cross main
id: cross_main
run: |
cargo install cross --git https://github.com/cross-rs/cross
cargo install cross --locked --git https://github.com/cross-rs/cross
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

View File

@@ -23,7 +23,7 @@ jobs:
name: stage-run-test
# Only run stage commands test in merge groups
if: github.event_name == 'merge_group'
runs-on: depot-ubuntu-latest
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1
@@ -38,7 +38,7 @@ jobs:
cache-on-failure: true
- name: Build reth
run: |
cargo install --path bin/reth
cargo install --locked --path bin/reth
- name: Run headers stage
run: |
reth stage run headers --from ${{ env.FROM_BLOCK }} --to ${{ env.TO_BLOCK }} --commit --checkpoints

View File

@@ -19,7 +19,7 @@ jobs:
sync:
if: github.repository == 'paradigmxyz/reth'
name: sync (${{ matrix.chain.bin }})
runs-on: depot-ubuntu-latest
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1

View File

@@ -19,7 +19,7 @@ jobs:
sync:
if: github.repository == 'paradigmxyz/reth'
name: sync (${{ matrix.chain.bin }})
runs-on: depot-ubuntu-latest
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1

View File

@@ -20,7 +20,7 @@ concurrency:
jobs:
test:
name: test / ${{ matrix.type }} / ${{ matrix.storage }}
runs-on: depot-ubuntu-latest-4
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
env:
RUST_BACKTRACE: 1
EDGE_FEATURES: ${{ matrix.storage == 'edge' && 'edge' || '' }}
@@ -57,7 +57,7 @@ jobs:
state:
name: Ethereum state tests
runs-on: depot-ubuntu-latest-4
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1
@@ -92,7 +92,7 @@ jobs:
doc:
name: doc tests
runs-on: depot-ubuntu-latest
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
env:
RUST_BACKTRACE: 1
timeout-minutes: 30

View File

@@ -313,6 +313,74 @@ GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst);
Before adding a comment, ask: Would someone reading just the current code (no PR, no history) find this helpful?
#### Rust Style Guides
##### Type Ordering in Files
When defining structs, traits, and functions in a file, follow this ordering convention. The file's primary type (matching the file name) comes first, followed by supporting public types, then private types and helpers.
```rust
use ...;
/// The primary type of this file (matches filename).
pub struct PayloadProcessor { ... }
impl PayloadProcessor { ... }
// Followed by public auxiliary types that support the primary type
/// Configuration for the processor.
pub struct PayloadProcessorConfig { ... }
/// Result type returned by processor operations.
pub struct ProcessorResult { ... }
// Followed by public traits related to the primary type
pub trait ProcessorExt { ... }
// Followed by private helper types
struct InternalState { ... }
// Followed by private helper functions
fn validate_input() { ... }
```
❌ **Bad**: Adding new traits and auxiliary types **above** the file's primary type (see [#22133](https://github.com/paradigmxyz/reth/pull/22133)):
```rust
use ...;
// ❌ BAD - new auxiliary struct added before the file's main type
pub struct CacheWaitDurations { ... }
// ❌ BAD - new trait added before the file's main type
pub trait WaitForCaches { ... }
// The file's primary type is buried below unrelated additions
pub struct PayloadProcessor { ... }
```
✅ **Good**: New types go **after** the primary type:
```rust
use ...;
// ✅ The file's primary type stays at the top
pub struct PayloadProcessor { ... }
impl PayloadProcessor { ... }
// ✅ Auxiliary types follow the primary type
pub struct CacheWaitDurations { ... }
pub trait WaitForCaches { ... }
impl WaitForCaches for PayloadProcessor { ... }
```
### Example Contribution Workflow
Let's say you want to fix a bug where external IP resolution fails on startup:

381
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -459,33 +459,33 @@ alloy-trie = { version = "0.9.4", default-features = false }
alloy-hardforks = "0.4.5"
alloy-consensus = { version = "1.6.3", default-features = false }
alloy-contract = { version = "1.6.3", default-features = false }
alloy-eips = { version = "1.6.3", default-features = false }
alloy-genesis = { version = "1.6.3", default-features = false }
alloy-json-rpc = { version = "1.6.3", default-features = false }
alloy-network = { version = "1.6.3", default-features = false }
alloy-network-primitives = { version = "1.6.3", default-features = false }
alloy-provider = { version = "1.6.3", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.6.3", default-features = false }
alloy-rpc-client = { version = "1.6.3", default-features = false }
alloy-rpc-types = { version = "1.6.3", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.6.3", default-features = false }
alloy-rpc-types-anvil = { version = "1.6.3", default-features = false }
alloy-rpc-types-beacon = { version = "1.6.3", default-features = false }
alloy-rpc-types-debug = { version = "1.6.3", default-features = false }
alloy-rpc-types-engine = { version = "1.6.3", default-features = false }
alloy-rpc-types-eth = { version = "1.6.3", default-features = false }
alloy-rpc-types-mev = { version = "1.6.3", default-features = false }
alloy-rpc-types-trace = { version = "1.6.3", default-features = false }
alloy-rpc-types-txpool = { version = "1.6.3", default-features = false }
alloy-serde = { version = "1.6.3", default-features = false }
alloy-signer = { version = "1.6.3", default-features = false }
alloy-signer-local = { version = "1.6.3", default-features = false }
alloy-transport = { version = "1.6.3" }
alloy-transport-http = { version = "1.6.3", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.6.3", default-features = false }
alloy-transport-ws = { version = "1.6.3", default-features = false }
alloy-consensus = { version = "1.7.3", default-features = false }
alloy-contract = { version = "1.7.3", default-features = false }
alloy-eips = { version = "1.7.3", default-features = false }
alloy-genesis = { version = "1.7.3", default-features = false }
alloy-json-rpc = { version = "1.7.3", default-features = false }
alloy-network = { version = "1.7.3", default-features = false }
alloy-network-primitives = { version = "1.7.3", default-features = false }
alloy-provider = { version = "1.7.3", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.7.3", default-features = false }
alloy-rpc-client = { version = "1.7.3", default-features = false }
alloy-rpc-types = { version = "1.7.3", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.7.3", default-features = false }
alloy-rpc-types-anvil = { version = "1.7.3", default-features = false }
alloy-rpc-types-beacon = { version = "1.7.3", default-features = false }
alloy-rpc-types-debug = { version = "1.7.3", default-features = false }
alloy-rpc-types-engine = { version = "1.7.3", default-features = false }
alloy-rpc-types-eth = { version = "1.7.3", default-features = false }
alloy-rpc-types-mev = { version = "1.7.3", default-features = false }
alloy-rpc-types-trace = { version = "1.7.3", default-features = false }
alloy-rpc-types-txpool = { version = "1.7.3", default-features = false }
alloy-serde = { version = "1.7.3", default-features = false }
alloy-signer = { version = "1.7.3", default-features = false }
alloy-signer-local = { version = "1.7.3", default-features = false }
alloy-transport = { version = "1.7.3" }
alloy-transport-http = { version = "1.7.3", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.7.3", default-features = false }
alloy-transport-ws = { version = "1.7.3", default-features = false }
# op
alloy-op-evm = { version = "0.27.2", default-features = false }
@@ -528,6 +528,7 @@ notify = { version = "8.0.0", default-features = false, features = ["macos_fseve
nybbles = { version = "0.4.8", default-features = false }
once_cell = { version = "1.19", default-features = false, features = ["critical-section"] }
parking_lot = "0.12"
quanta = "0.12"
paste = "1.0"
rand = "0.9"
rayon = "1.7"
@@ -650,7 +651,7 @@ ethereum_ssz_derive = "0.10.1"
jemalloc_pprof = { version = "0.8", default-features = false }
tikv-jemalloc-ctl = "0.6"
tikv-jemallocator = "0.6"
tracy-client = "0.18.0"
tracy-client = { version = "0.18.0", features = ["demangle"] }
snmalloc-rs = { version = "0.3.7", features = ["build_cc"] }
aes = "0.8.1"

View File

@@ -1,6 +1,6 @@
# syntax=docker.io/docker/dockerfile:1.7-labs
FROM lukemathwalker/cargo-chef:latest-rust-1 AS chef
FROM lukemathwalker/cargo-chef:latest-rust-1.93 AS chef
WORKDIR /app
LABEL org.opencontainers.image.source=https://github.com/paradigmxyz/reth

View File

@@ -4,7 +4,7 @@
# Usage:
# reth: --build-arg BINARY=reth
FROM rust:1 AS builder
FROM rust:1.93 AS builder
WORKDIR /app
LABEL org.opencontainers.image.source=https://github.com/paradigmxyz/reth

View File

@@ -80,7 +80,7 @@ build-native-%:
#
# These commands require that:
#
# - `cross` is installed (`cargo install cross`).
# - `cross` is installed (`cargo install --locked cross`).
# - Docker is running.
# - The current user is in the `docker` group.
#
@@ -261,7 +261,7 @@ lint-typos: ensure-typos
ensure-typos:
@if ! command -v typos &> /dev/null; then \
echo "typos not found. Please install it by running the command 'cargo install typos-cli' or refer to the following link for more information: https://github.com/crate-ci/typos"; \
echo "typos not found. Please install it by running the command 'cargo install --locked typos-cli' or refer to the following link for more information: https://github.com/crate-ci/typos"; \
exit 1; \
fi

View File

@@ -29,6 +29,8 @@ pub(crate) struct BenchContext {
pub(crate) next_block: u64,
/// Whether the chain is an OP rollup.
pub(crate) is_optimism: bool,
/// Whether to use `reth_newPayload` endpoint instead of `engine_newPayload*`.
pub(crate) use_reth_namespace: bool,
}
impl BenchContext {
@@ -140,6 +142,14 @@ impl BenchContext {
};
let next_block = first_block.header.number + 1;
Ok(Self { auth_provider, block_provider, benchmark_mode, next_block, is_optimism })
let use_reth_namespace = bench_args.reth_new_payload;
Ok(Self {
auth_provider,
block_provider,
benchmark_mode,
next_block,
is_optimism,
use_reth_namespace,
})
}
}

View File

@@ -6,7 +6,7 @@ use crate::{
helpers::{build_payload, parse_gas_limit, prepare_payload_request, rpc_block_to_header},
output::GasRampPayloadFile,
},
valid_payload::{call_forkchoice_updated, call_new_payload, payload_to_new_payload},
valid_payload::{call_forkchoice_updated, call_new_payload_with_reth, payload_to_new_payload},
};
use alloy_eips::BlockNumberOrTag;
use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
@@ -47,6 +47,14 @@ pub struct Command {
/// Output directory for benchmark results and generated payloads.
#[arg(long, value_name = "OUTPUT")]
output: PathBuf,
/// Use `reth_newPayload` endpoint instead of `engine_newPayload*`.
///
/// The `reth_newPayload` endpoint is a reth-specific extension that takes `ExecutionData`
/// directly, waits for persistence and cache updates to complete before processing,
/// and returns server-side timing breakdowns (latency, persistence wait, cache wait).
#[arg(long, default_value = "false", verbatim_doc_comment)]
reth_new_payload: bool,
}
/// Mode for determining when to stop ramping.
@@ -138,6 +146,9 @@ impl Command {
);
}
}
if self.reth_new_payload {
info!("Using reth_newPayload endpoint");
}
let mut blocks_processed = 0u64;
let total_benchmark_duration = Instant::now();
@@ -163,7 +174,7 @@ impl Command {
// Regenerate the payload from the modified block, but keep the original sidecar
// which contains the actual execution requests data (not just the hash)
let (payload, _) = ExecutionPayload::from_block_unchecked(block_hash, &block);
let (version, params) = payload_to_new_payload(
let (version, params, execution_data) = payload_to_new_payload(
payload,
sidecar,
false,
@@ -174,13 +185,18 @@ impl Command {
// Save payload to file with version info for replay
let payload_path =
self.output.join(format!("payload_block_{}.json", block.header.number));
let file =
GasRampPayloadFile { version: version as u8, block_hash, params: params.clone() };
let file = GasRampPayloadFile {
version: version as u8,
block_hash,
params: params.clone(),
execution_data: Some(execution_data.clone()),
};
let payload_json = serde_json::to_string_pretty(&file)?;
std::fs::write(&payload_path, &payload_json)?;
info!(target: "reth-bench", block_number = block.header.number, path = %payload_path.display(), "Saved payload");
call_new_payload(&provider, version, params).await?;
let reth_data = self.reth_new_payload.then_some(execution_data);
let _ = call_new_payload_with_reth(&provider, version, params, reth_data).await?;
let forkchoice_state = ForkchoiceState {
head_block_hash: block_hash,

View File

@@ -0,0 +1,156 @@
//! Prometheus metrics scraper for reth-bench.
//!
//! Scrapes a node's Prometheus metrics endpoint after each block to record
//! execution and state root durations with block-level granularity.
use csv::Writer;
use eyre::Context;
use reqwest::Client;
use serde::Serialize;
use std::{path::Path, time::Duration};
use tracing::info;
/// Suffix for the metrics CSV output file.
pub(crate) const METRICS_OUTPUT_SUFFIX: &str = "metrics.csv";
/// A single row of scraped prometheus metrics for one block.
#[derive(Debug, Clone, Serialize)]
pub(crate) struct MetricsRow {
/// The block number.
pub(crate) block_number: u64,
/// EVM execution duration in seconds (from `sync_execution_execution_duration` gauge).
pub(crate) execution_duration_secs: Option<f64>,
/// State root computation duration in seconds (from
/// `sync_block_validation_state_root_duration` gauge).
pub(crate) state_root_duration_secs: Option<f64>,
}
/// Scrapes a Prometheus metrics endpoint after each block to collect
/// execution and state root durations.
pub(crate) struct MetricsScraper {
/// The full URL of the Prometheus metrics endpoint.
url: String,
/// Reusable HTTP client.
client: Client,
/// Collected metrics rows, one per block.
rows: Vec<MetricsRow>,
}
impl MetricsScraper {
/// Creates a new scraper if a URL is provided.
pub(crate) fn maybe_new(url: Option<String>) -> Option<Self> {
url.map(|url| {
info!(target: "reth-bench", %url, "Prometheus metrics scraping enabled");
let client = Client::builder()
.timeout(Duration::from_secs(5))
.build()
.expect("failed to build reqwest client");
Self { url, client, rows: Vec::new() }
})
}
/// Scrapes the metrics endpoint and records values for the given block.
pub(crate) async fn scrape_after_block(&mut self, block_number: u64) -> eyre::Result<()> {
let text = self
.client
.get(&self.url)
.send()
.await
.wrap_err("failed to fetch metrics endpoint")?
.error_for_status()
.wrap_err("metrics endpoint returned error status")?
.text()
.await
.wrap_err("failed to read metrics response body")?;
let execution = parse_gauge(&text, "sync_execution_execution_duration");
let state_root = parse_gauge(&text, "sync_block_validation_state_root_duration");
self.rows.push(MetricsRow {
block_number,
execution_duration_secs: execution,
state_root_duration_secs: state_root,
});
Ok(())
}
/// Writes collected metrics to a CSV file in the output directory.
pub(crate) fn write_csv(&self, output_dir: &Path) -> eyre::Result<()> {
let path = output_dir.join(METRICS_OUTPUT_SUFFIX);
info!(target: "reth-bench", "Writing scraped metrics to file: {:?}", path);
let mut writer = Writer::from_path(&path)?;
for row in &self.rows {
writer.serialize(row)?;
}
writer.flush()?;
Ok(())
}
}
/// Parses a Prometheus gauge value from exposition-format text.
///
/// Searches for lines starting with `name` followed by either a space or `{`
/// (for labeled metrics), then parses the numeric value. Returns the last
/// matching sample to handle metrics emitted with multiple label sets.
fn parse_gauge(text: &str, name: &str) -> Option<f64> {
let mut result = None;
for line in text.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
if !line.starts_with(name) {
continue;
}
// Ensure we match the full metric name, not a prefix of another metric.
let rest = &line[name.len()..];
if !rest.starts_with(' ') && !rest.starts_with('{') {
continue;
}
// Format: `metric_name{labels} value [timestamp]` or `metric_name value [timestamp]`
// Value is always the second whitespace-separated token.
let mut parts = line.split_whitespace();
if let Some(value_str) = parts.nth(1) &&
let Ok(v) = value_str.parse::<f64>()
{
result = Some(v);
}
}
result
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_gauge_simple() {
let text = r#"# HELP sync_execution_execution_duration Duration of execution
# TYPE sync_execution_execution_duration gauge
sync_execution_execution_duration 0.123456
"#;
assert_eq!(parse_gauge(text, "sync_execution_execution_duration"), Some(0.123456));
}
#[test]
fn test_parse_gauge_missing() {
let text = "some_other_metric 1.0\n";
assert_eq!(parse_gauge(text, "sync_execution_execution_duration"), None);
}
#[test]
fn test_parse_gauge_with_labels() {
let text = "sync_block_validation_state_root_duration{instance=\"node1\"} 0.5\n";
assert_eq!(parse_gauge(text, "sync_block_validation_state_root_duration"), Some(0.5));
}
#[test]
fn test_parse_gauge_prefix_no_false_match() {
let text =
"sync_execution_execution_duration_total 99.0\nsync_execution_execution_duration 0.5\n";
assert_eq!(parse_gauge(text, "sync_execution_execution_duration"), Some(0.5));
}
}

View File

@@ -12,6 +12,7 @@ pub(crate) mod helpers;
pub use generate_big_block::{
RawTransaction, RpcTransactionSource, TransactionCollector, TransactionSource,
};
pub(crate) mod metrics_scraper;
mod new_payload_fcu;
mod new_payload_only;
mod output;

View File

@@ -13,6 +13,7 @@ use crate::{
bench::{
context::BenchContext,
helpers::parse_duration,
metrics_scraper::MetricsScraper,
output::{
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
},
@@ -20,7 +21,7 @@ use crate::{
derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
},
},
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload_with_reth},
};
use alloy_provider::Provider;
use alloy_rpc_types_engine::ForkchoiceState;
@@ -30,7 +31,7 @@ use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_core::args::BenchmarkArgs;
use std::time::{Duration, Instant};
use tracing::{debug, info};
use tracing::{debug, info, warn};
/// `reth benchmark new-payload-fcu` command
#[derive(Debug, Parser)]
@@ -150,10 +151,17 @@ impl Command {
auth_provider,
mut next_block,
is_optimism,
..
use_reth_namespace,
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone());
if use_reth_namespace {
info!("Using reth_newPayload endpoint");
}
let buffer_size = self.rpc_block_buffer_size;
// Use a oneshot channel to propagate errors from the spawned task
@@ -230,16 +238,40 @@ impl Command {
finalized_block_hash: finalized,
};
let (version, params) = block_to_new_payload(block, is_optimism)?;
let (version, params, execution_data) = block_to_new_payload(block, is_optimism)?;
let start = Instant::now();
call_new_payload(&auth_provider, version, params).await?;
let reth_data = use_reth_namespace.then_some(execution_data);
let server_timings =
call_new_payload_with_reth(&auth_provider, version, params, reth_data).await?;
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
let np_latency =
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
let new_payload_result = NewPayloadResult {
gas_used,
latency: np_latency,
persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
execution_cache_wait: server_timings
.as_ref()
.map(|t| t.execution_cache_wait)
.unwrap_or_default(),
sparse_trie_wait: server_timings
.as_ref()
.map(|t| t.sparse_trie_wait)
.unwrap_or_default(),
};
let fcu_start = Instant::now();
call_forkchoice_updated(&auth_provider, version, forkchoice_state, None).await?;
let fcu_latency = fcu_start.elapsed();
let total_latency = start.elapsed();
let fcu_latency = total_latency - new_payload_result.latency;
let total_latency = if server_timings.is_some() {
// When using server-side latency for newPayload, derive total from the
// independently measured components to avoid mixing server-side and
// client-side (network-inclusive) timings.
np_latency + fcu_latency
} else {
start.elapsed()
};
let combined_result = CombinedResult {
block_number,
gas_limit,
@@ -259,6 +291,12 @@ impl Command {
};
info!(target: "reth-bench", progress, %combined_result);
if let Some(scraper) = metrics_scraper.as_mut() &&
let Err(err) = scraper.scrape_after_block(block_number).await
{
warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
}
if let Some(w) = &mut waiter {
w.on_block(block_number).await?;
}
@@ -284,6 +322,10 @@ impl Command {
write_benchmark_results(path, &gas_output_results, &combined_results)?;
}
if let (Some(path), Some(scraper)) = (&self.benchmark.output, &metrics_scraper) {
scraper.write_csv(path)?;
}
let gas_output =
TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;

View File

@@ -3,12 +3,13 @@
use crate::{
bench::{
context::BenchContext,
metrics_scraper::MetricsScraper,
output::{
NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX,
NEW_PAYLOAD_OUTPUT_SUFFIX,
},
},
valid_payload::{block_to_new_payload, call_new_payload},
valid_payload::{block_to_new_payload, call_new_payload_with_reth},
};
use alloy_provider::Provider;
use clap::Parser;
@@ -49,10 +50,17 @@ impl Command {
auth_provider,
mut next_block,
is_optimism,
..
use_reth_namespace,
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone());
if use_reth_namespace {
info!("Using reth_newPayload endpoint");
}
let buffer_size = self.rpc_block_buffer_size;
// Use a oneshot channel to propagate errors from the spawned task
@@ -100,12 +108,28 @@ impl Command {
debug!(target: "reth-bench", number=?block.header.number, "Sending payload to engine");
let (version, params) = block_to_new_payload(block, is_optimism)?;
let (version, params, execution_data) = block_to_new_payload(block, is_optimism)?;
let start = Instant::now();
call_new_payload(&auth_provider, version, params).await?;
let reth_data = use_reth_namespace.then_some(execution_data);
let server_timings =
call_new_payload_with_reth(&auth_provider, version, params, reth_data).await?;
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
let latency =
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
let new_payload_result = NewPayloadResult {
gas_used,
latency,
persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
execution_cache_wait: server_timings
.as_ref()
.map(|t| t.execution_cache_wait)
.unwrap_or_default(),
sparse_trie_wait: server_timings
.as_ref()
.map(|t| t.sparse_trie_wait)
.unwrap_or_default(),
};
blocks_processed += 1;
let progress = match total_blocks {
Some(total) => format!("{blocks_processed}/{total}"),
@@ -121,6 +145,12 @@ impl Command {
let row =
TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
results.push((row, new_payload_result));
if let Some(scraper) = metrics_scraper.as_mut() &&
let Err(err) = scraper.scrape_after_block(block_number).await
{
tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
}
}
// Check if the spawned task encountered an error
@@ -151,6 +181,10 @@ impl Command {
}
writer.flush()?;
if let Some(scraper) = &metrics_scraper {
scraper.write_csv(&path)?;
}
info!(target: "reth-bench", "Finished writing benchmark output files to {:?}.", path);
}

View File

@@ -27,6 +27,9 @@ pub(crate) struct GasRampPayloadFile {
pub(crate) block_hash: B256,
/// The params to pass to newPayload.
pub(crate) params: serde_json::Value,
/// The execution data for `reth_newPayload`.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) execution_data: Option<alloy_rpc_types_engine::ExecutionData>,
}
/// This represents the results of a single `newPayload` call in the benchmark, containing the gas
@@ -37,6 +40,12 @@ pub(crate) struct NewPayloadResult {
pub(crate) gas_used: u64,
/// The latency of the `newPayload` call.
pub(crate) latency: Duration,
/// Time spent waiting for persistence. `None` when no persistence was in-flight.
pub(crate) persistence_wait: Option<Duration>,
/// Time spent waiting for execution cache lock.
pub(crate) execution_cache_wait: Duration,
/// Time spent waiting for sparse trie lock.
pub(crate) sparse_trie_wait: Duration,
}
impl NewPayloadResult {
@@ -67,9 +76,12 @@ impl Serialize for NewPayloadResult {
{
// convert the time to microseconds
let time = self.latency.as_micros();
let mut state = serializer.serialize_struct("NewPayloadResult", 2)?;
let mut state = serializer.serialize_struct("NewPayloadResult", 5)?;
state.serialize_field("gas_used", &self.gas_used)?;
state.serialize_field("latency", &time)?;
state.serialize_field("persistence_wait", &self.persistence_wait.map(|d| d.as_micros()))?;
state.serialize_field("execution_cache_wait", &self.execution_cache_wait.as_micros())?;
state.serialize_field("sparse_trie_wait", &self.sparse_trie_wait.as_micros())?;
state.end()
}
}
@@ -126,7 +138,7 @@ impl Serialize for CombinedResult {
let fcu_latency = self.fcu_latency.as_micros();
let new_payload_latency = self.new_payload_result.latency.as_micros();
let total_latency = self.total_latency.as_micros();
let mut state = serializer.serialize_struct("CombinedResult", 7)?;
let mut state = serializer.serialize_struct("CombinedResult", 10)?;
// flatten the new payload result because this is meant for CSV writing
state.serialize_field("block_number", &self.block_number)?;
@@ -136,6 +148,18 @@ impl Serialize for CombinedResult {
state.serialize_field("new_payload_latency", &new_payload_latency)?;
state.serialize_field("fcu_latency", &fcu_latency)?;
state.serialize_field("total_latency", &total_latency)?;
state.serialize_field(
"persistence_wait",
&self.new_payload_result.persistence_wait.map(|d| d.as_micros()),
)?;
state.serialize_field(
"execution_cache_wait",
&self.new_payload_result.execution_cache_wait.as_micros(),
)?;
state.serialize_field(
"sparse_trie_wait",
&self.new_payload_result.sparse_trie_wait.as_micros(),
)?;
state.end()
}
}

View File

@@ -15,6 +15,7 @@ use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::{
helpers::parse_duration,
metrics_scraper::MetricsScraper,
output::{
write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
TotalGasOutput, TotalGasRow,
@@ -23,12 +24,15 @@ use crate::{
derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
},
},
valid_payload::{call_forkchoice_updated, call_new_payload},
valid_payload::{call_forkchoice_updated, call_new_payload_with_reth},
};
use alloy_primitives::B256;
use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
use alloy_rpc_types_engine::{
CancunPayloadFields, ExecutionData, ExecutionPayloadEnvelopeV4, ExecutionPayloadSidecar,
ForkchoiceState, JwtSecret, PraguePayloadFields,
};
use clap::Parser;
use eyre::Context;
use reth_cli_runner::CliContext;
@@ -124,6 +128,22 @@ pub struct Command {
/// If not provided, derives from engine RPC URL by changing scheme to ws and port to 8546.
#[arg(long, value_name = "WS_RPC_URL", verbatim_doc_comment)]
ws_rpc_url: Option<String>,
/// Use `reth_newPayload` endpoint instead of `engine_newPayload*`.
///
/// The `reth_newPayload` endpoint is a reth-specific extension that takes `ExecutionData`
/// directly, waits for persistence and cache updates to complete before processing,
/// and returns server-side timing breakdowns (latency, persistence wait, cache wait).
#[arg(long, default_value = "false", verbatim_doc_comment)]
reth_new_payload: bool,
/// Optional Prometheus metrics endpoint to scrape after each block.
///
/// When provided, reth-bench will fetch metrics from this URL after each
/// payload, recording per-block execution and state root durations.
/// Results are written to `metrics.csv` in the output directory.
#[arg(long = "metrics-url", value_name = "URL", verbatim_doc_comment)]
metrics_url: Option<String>,
}
/// A loaded payload ready for execution.
@@ -163,6 +183,9 @@ impl Command {
self.persistence_threshold
);
}
if self.reth_new_payload {
info!("Using reth_newPayload endpoint");
}
// Set up waiter based on configured options
// When both are set: wait at least wait_time, and also wait for persistence if needed
@@ -190,6 +213,8 @@ impl Command {
(None, false) => None,
};
let mut metrics_scraper = MetricsScraper::maybe_new(self.metrics_url.clone());
// Set up authenticated engine provider
let jwt =
std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
@@ -248,7 +273,15 @@ impl Command {
"Executing gas ramp payload (newPayload + FCU)"
);
call_new_payload(&auth_provider, payload.version, payload.file.params.clone()).await?;
let reth_data =
if self.reth_new_payload { payload.file.execution_data.clone() } else { None };
let _ = call_new_payload_with_reth(
&auth_provider,
payload.version,
payload.file.params.clone(),
reth_data,
)
.await?;
let fcu_state = ForkchoiceState {
head_block_hash: payload.file.block_hash,
@@ -303,20 +336,47 @@ impl Command {
"Sending newPayload"
);
let status = auth_provider
.new_payload_v4(
execution_payload.clone(),
vec![],
B256::ZERO,
envelope.execution_requests.to_vec(),
)
.await?;
let params = serde_json::to_value((
execution_payload.clone(),
Vec::<B256>::new(),
B256::ZERO,
envelope.execution_requests.to_vec(),
))?;
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
let reth_data = self.reth_new_payload.then(|| ExecutionData {
payload: execution_payload.clone().into(),
sidecar: ExecutionPayloadSidecar::v4(
CancunPayloadFields {
versioned_hashes: Vec::new(),
parent_beacon_block_root: B256::ZERO,
},
PraguePayloadFields { requests: envelope.execution_requests.clone().into() },
),
});
if !status.is_valid() {
return Err(eyre::eyre!("Payload rejected: {:?}", status));
}
let server_timings = call_new_payload_with_reth(
&auth_provider,
EngineApiMessageVersion::V4,
params,
reth_data,
)
.await?;
let np_latency =
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
let new_payload_result = NewPayloadResult {
gas_used,
latency: np_latency,
persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
execution_cache_wait: server_timings
.as_ref()
.map(|t| t.execution_cache_wait)
.unwrap_or_default(),
sparse_trie_wait: server_timings
.as_ref()
.map(|t| t.sparse_trie_wait)
.unwrap_or_default(),
};
let fcu_state = ForkchoiceState {
head_block_hash: block_hash,
@@ -326,10 +386,12 @@ impl Command {
debug!(target: "reth-bench", method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
let fcu_start = Instant::now();
let fcu_result = auth_provider.fork_choice_updated_v3(fcu_state, None).await?;
let fcu_latency = fcu_start.elapsed();
let total_latency = start.elapsed();
let fcu_latency = total_latency - new_payload_result.latency;
let total_latency =
if server_timings.is_some() { np_latency + fcu_latency } else { start.elapsed() };
let combined_result = CombinedResult {
block_number,
@@ -344,6 +406,12 @@ impl Command {
let progress = format!("{}/{}", i + 1, payloads.len());
info!(target: "reth-bench", progress, %combined_result);
if let Some(scraper) = metrics_scraper.as_mut() &&
let Err(err) = scraper.scrape_after_block(block_number).await
{
tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
}
if let Some(w) = &mut waiter {
w.on_block(block_number).await?;
}
@@ -352,7 +420,7 @@ impl Command {
TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
results.push((gas_row, combined_result));
debug!(target: "reth-bench", ?status, ?fcu_result, "Payload executed successfully");
debug!(target: "reth-bench", ?fcu_result, "Payload executed successfully");
parent_hash = block_hash;
}
@@ -367,6 +435,10 @@ impl Command {
write_benchmark_results(path, &gas_output_results, &combined_results)?;
}
if let (Some(path), Some(scraper)) = (&self.output, &metrics_scraper) {
scraper.write_csv(path)?;
}
let gas_output =
TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
info!(

View File

@@ -6,12 +6,14 @@ use alloy_eips::eip7685::Requests;
use alloy_primitives::B256;
use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ForkchoiceState,
ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
ExecutionData, ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar,
ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
};
use alloy_transport::TransportResult;
use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
use reth_node_api::EngineApiMessageVersion;
use serde::Deserialize;
use std::time::Duration;
use tracing::{debug, error};
/// An extension trait for providers that implement the engine API, to wait for a VALID response.
@@ -161,10 +163,13 @@ where
}
}
/// Converts an RPC block into versioned engine API params and an [`ExecutionData`].
///
/// Returns `(version, versioned_params, execution_data)`.
pub(crate) fn block_to_new_payload(
block: AnyRpcBlock,
is_optimism: bool,
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
let block = block
.into_inner()
.map_header(|header| header.map(|h| h.into_header_with_defaults()))
@@ -179,13 +184,19 @@ pub(crate) fn block_to_new_payload(
payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)
}
/// Converts an execution payload and sidecar into versioned engine API params and an
/// [`ExecutionData`].
///
/// Returns `(version, versioned_params, execution_data)`.
pub(crate) fn payload_to_new_payload(
payload: ExecutionPayload,
sidecar: ExecutionPayloadSidecar,
is_optimism: bool,
withdrawals_root: Option<B256>,
target_version: Option<EngineApiMessageVersion>,
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
let execution_data = ExecutionData { payload: payload.clone(), sidecar: sidecar.clone() };
let (version, params) = match payload {
ExecutionPayload::V3(payload) => {
let cancun = sidecar.cancun().unwrap();
@@ -244,7 +255,7 @@ pub(crate) fn payload_to_new_payload(
}
};
Ok((version, params))
Ok((version, params, execution_data))
}
/// Calls the correct `engine_newPayload` method depending on the given [`ExecutionPayload`] and its
@@ -252,32 +263,109 @@ pub(crate) fn payload_to_new_payload(
///
/// # Panics
/// If the given payload is a V3 payload, but a parent beacon block root is provided as `None`.
#[allow(dead_code)]
pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
provider: P,
version: EngineApiMessageVersion,
params: serde_json::Value,
) -> TransportResult<()> {
let method = version.method_name();
) -> TransportResult<Option<NewPayloadTimingBreakdown>> {
call_new_payload_with_reth(provider, version, params, None).await
}
debug!(target: "reth-bench", method, "Sending newPayload");
/// Response from `reth_newPayload` endpoint, which includes server-measured latency.
#[derive(Debug, Deserialize)]
struct RethPayloadStatus {
#[serde(flatten)]
status: PayloadStatus,
latency_us: u64,
#[serde(default)]
persistence_wait_us: Option<u64>,
#[serde(default)]
execution_cache_wait_us: u64,
#[serde(default)]
sparse_trie_wait_us: u64,
}
let mut status: PayloadStatus = provider.client().request(method, &params).await?;
/// Server-side timing breakdown from `reth_newPayload` endpoint.
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct NewPayloadTimingBreakdown {
/// Server-side execution latency.
pub(crate) latency: Duration,
/// Time spent waiting for persistence. `None` when no persistence was in-flight.
pub(crate) persistence_wait: Option<Duration>,
/// Time spent waiting for execution cache lock.
pub(crate) execution_cache_wait: Duration,
/// Time spent waiting for sparse trie lock.
pub(crate) sparse_trie_wait: Duration,
}
while !status.is_valid() {
if status.is_invalid() {
error!(target: "reth-bench", ?status, ?params, "Invalid {method}",);
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(std::io::Error::other(
format!("Invalid {method}: {status:?}"),
))))
/// Calls either `engine_newPayload*` or `reth_newPayload` depending on whether
/// `reth_execution_data` is provided.
///
/// When `reth_execution_data` is `Some`, uses the `reth_newPayload` endpoint which takes
/// `ExecutionData` directly and waits for persistence and cache updates to complete.
///
/// Returns the server-reported timing breakdown when using the reth namespace, or `None` for
/// the standard engine namespace.
pub(crate) async fn call_new_payload_with_reth<N: Network, P: Provider<N>>(
provider: P,
version: EngineApiMessageVersion,
params: serde_json::Value,
reth_execution_data: Option<ExecutionData>,
) -> TransportResult<Option<NewPayloadTimingBreakdown>> {
if let Some(execution_data) = reth_execution_data {
let method = "reth_newPayload";
let reth_params = serde_json::to_value((execution_data.clone(),))
.expect("ExecutionData serialization cannot fail");
debug!(target: "reth-bench", method, "Sending newPayload");
let mut resp: RethPayloadStatus = provider.client().request(method, &reth_params).await?;
while !resp.status.is_valid() {
if resp.status.is_invalid() {
error!(target: "reth-bench", status=?resp.status, "Invalid {method}");
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
std::io::Error::other(format!("Invalid {method}: {:?}", resp.status)),
)))
}
if resp.status.is_syncing() {
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
"invalid range: no canonical state found for parent of requested block",
))
}
resp = provider.client().request(method, &reth_params).await?;
}
if status.is_syncing() {
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
"invalid range: no canonical state found for parent of requested block",
))
Ok(Some(NewPayloadTimingBreakdown {
latency: Duration::from_micros(resp.latency_us),
persistence_wait: resp.persistence_wait_us.map(Duration::from_micros),
execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
}))
} else {
let method = version.method_name();
debug!(target: "reth-bench", method, "Sending newPayload");
let mut status: PayloadStatus = provider.client().request(method, &params).await?;
while !status.is_valid() {
if status.is_invalid() {
error!(target: "reth-bench", ?status, ?params, "Invalid {method}",);
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
std::io::Error::other(format!("Invalid {method}: {status:?}")),
)))
}
if status.is_syncing() {
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
"invalid range: no canonical state found for parent of requested block",
))
}
status = provider.client().request(method, &params).await?;
}
status = provider.client().request(method, &params).await?;
Ok(None)
}
Ok(())
}
/// Calls the correct `engine_forkchoiceUpdated` method depending on the given

View File

@@ -190,6 +190,7 @@ min-trace-logs = [
"reth-node-core/min-trace-logs",
]
trie-debug = ["reth-node-builder/trie-debug", "reth-node-core/trie-debug"]
rocksdb = ["reth-ethereum-cli/rocksdb", "reth-node-core/rocksdb"]
edge = ["rocksdb"]

View File

@@ -43,7 +43,7 @@ reth-node-metrics.workspace = true
reth-ethereum-primitives = { workspace = true, optional = true }
reth-provider.workspace = true
reth-prune.workspace = true
reth-prune-types = { workspace = true, optional = true }
reth-prune-types.workspace = true
reth-revm.workspace = true
reth-stages.workspace = true
reth-stages-types = { workspace = true, optional = true }
@@ -125,7 +125,7 @@ arbitrary = [
"reth-stages-types/test-utils",
"reth-trie-common/test-utils",
"reth-codecs/arbitrary",
"reth-prune-types?/arbitrary",
"reth-prune-types/arbitrary",
"reth-stages-types?/arbitrary",
"reth-trie-common/arbitrary",
"alloy-consensus/arbitrary",

View File

@@ -89,13 +89,15 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
/// Initializes environment according to [`AccessRights`] and returns an instance of
/// [`Environment`].
///
/// Internally builds a [`reth_tasks::Runtime`] attached to the current tokio handle for
/// parallel storage I/O.
pub fn init<N: CliNodeTypes>(&self, access: AccessRights) -> eyre::Result<Environment<N>>
/// The provided `runtime` is used for parallel storage I/O.
pub fn init<N: CliNodeTypes>(
&self,
access: AccessRights,
runtime: reth_tasks::Runtime,
) -> eyre::Result<Environment<N>>
where
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
{
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
let data_dir = self.datadir.clone().resolve_datadir(self.chain.chain());
let db_path = data_dir.db();
let sf_path = data_dir.static_files();

View File

@@ -16,6 +16,7 @@ mod copy;
mod diff;
mod get;
mod list;
mod prune_checkpoints;
mod repair_trie;
mod settings;
mod state;
@@ -67,6 +68,8 @@ pub enum Subcommands {
Path,
/// Manage storage settings
Settings(settings::Command),
/// View or set prune checkpoints
PruneCheckpoints(prune_checkpoints::Command),
/// Gets storage size information for an account
AccountStorage(account_storage::Command),
/// Gets account state and storage at a specific block
@@ -83,7 +86,8 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
/// provided command.
macro_rules! db_exec {
($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => {
let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?;
let Environment { provider_factory, .. } =
$env.init::<$N>($access_rights, ctx.task_executor.clone())?;
let $tool = DbTool::new(provider_factory)?;
$command;
@@ -204,6 +208,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
command.execute(&tool)?;
});
}
Subcommands::PruneCheckpoints(command) => {
db_exec!(self.env, tool, N, command.access_rights(), {
command.execute(&tool)?;
});
}
Subcommands::AccountStorage(command) => {
db_exec!(self.env, tool, N, AccessRights::RO, {
command.execute(&tool)?;

View File

@@ -0,0 +1,221 @@
//! `reth db prune-checkpoints` command for viewing and setting prune checkpoint values.
use clap::{Args, Parser, Subcommand, ValueEnum};
use reth_db_common::DbTool;
use reth_provider::{providers::ProviderNodeTypes, DBProvider, DatabaseProviderFactory};
use reth_prune_types::{PruneCheckpoint, PruneMode, PruneSegment};
use reth_storage_api::{PruneCheckpointReader, PruneCheckpointWriter};
use crate::common::AccessRights;
/// `reth db prune-checkpoints` subcommand
#[derive(Debug, Parser)]
pub struct Command {
#[command(subcommand)]
command: Subcommands,
}
impl Command {
/// Returns database access rights required for the command.
pub fn access_rights(&self) -> AccessRights {
match &self.command {
Subcommands::Get { .. } => AccessRights::RO,
Subcommands::Set(_) => AccessRights::RW,
}
}
}
#[derive(Debug, Subcommand)]
enum Subcommands {
/// Get prune checkpoint(s) from database.
///
/// Shows the current prune progress for each segment, including the highest
/// pruned block/tx number and the active prune mode.
Get {
/// Specific segment to query. If omitted, shows all segments.
#[arg(long, value_enum)]
segment: Option<SegmentArg>,
},
/// Set a prune checkpoint for a segment.
///
/// WARNING: Manually setting checkpoints can cause data inconsistencies.
/// Only use this if you know what you're doing (e.g., recovering from a
/// corrupted checkpoint or forcing a re-prune from a specific block).
Set(SetArgs),
}
/// Arguments for the `set` subcommand
#[derive(Debug, Args)]
pub struct SetArgs {
/// The prune segment to update
#[arg(long, value_enum)]
segment: SegmentArg,
/// Highest pruned block number
#[arg(long)]
block_number: Option<u64>,
/// Highest pruned transaction number
#[arg(long)]
tx_number: Option<u64>,
/// Prune mode to write: full, distance, or before
#[arg(long, value_enum)]
mode: PruneModeArg,
/// Value for distance or before mode (required unless mode is full)
#[arg(long, required_if_eq_any([("mode", "distance"), ("mode", "before")]))]
mode_value: Option<u64>,
}
/// CLI-friendly prune segment names (excludes deprecated variants)
#[derive(Debug, Clone, Copy, ValueEnum)]
#[clap(rename_all = "kebab-case")]
pub enum SegmentArg {
SenderRecovery,
TransactionLookup,
Receipts,
ContractLogs,
AccountHistory,
StorageHistory,
Bodies,
}
impl From<SegmentArg> for PruneSegment {
fn from(arg: SegmentArg) -> Self {
match arg {
SegmentArg::SenderRecovery => Self::SenderRecovery,
SegmentArg::TransactionLookup => Self::TransactionLookup,
SegmentArg::Receipts => Self::Receipts,
SegmentArg::ContractLogs => Self::ContractLogs,
SegmentArg::AccountHistory => Self::AccountHistory,
SegmentArg::StorageHistory => Self::StorageHistory,
SegmentArg::Bodies => Self::Bodies,
}
}
}
/// CLI-friendly prune mode
#[derive(Debug, Clone, Copy, ValueEnum)]
#[clap(rename_all = "kebab-case")]
pub enum PruneModeArg {
/// Prune all blocks
Full,
/// Keep the last N blocks (requires --mode-value)
Distance,
/// Prune blocks before a specific block number (requires --mode-value)
Before,
}
impl Command {
/// Execute the command
pub fn execute<N: ProviderNodeTypes>(self, tool: &DbTool<N>) -> eyre::Result<()> {
match self.command {
Subcommands::Get { segment } => Self::get(tool, segment),
Subcommands::Set(args) => Self::set(tool, args),
}
}
fn get<N: ProviderNodeTypes>(
tool: &DbTool<N>,
segment: Option<SegmentArg>,
) -> eyre::Result<()> {
let provider = tool.provider_factory.provider()?;
match segment {
Some(seg) => {
let segment: PruneSegment = seg.into();
match provider.get_prune_checkpoint(segment)? {
Some(checkpoint) => print_checkpoint(segment, &checkpoint),
None => println!("No checkpoint found for {segment}"),
}
}
None => {
let mut checkpoints = provider.get_prune_checkpoints()?;
checkpoints.sort_by_key(|(seg, _)| *seg);
if checkpoints.is_empty() {
println!("No prune checkpoints found.");
} else {
println!(
"{:<25} {:>15} {:>15} {:>20}",
"Segment", "Block Number", "Tx Number", "Prune Mode"
);
println!("{}", "-".repeat(80));
for (segment, checkpoint) in &checkpoints {
println!(
"{:<25} {:>15} {:>15} {:>20}",
segment.to_string(),
fmt_opt(checkpoint.block_number),
fmt_opt(checkpoint.tx_number),
fmt_mode(&checkpoint.prune_mode),
);
}
}
}
}
Ok(())
}
fn set<N: ProviderNodeTypes>(tool: &DbTool<N>, args: SetArgs) -> eyre::Result<()> {
eyre::ensure!(
args.block_number.is_some() || args.tx_number.is_some(),
"at least one of --block-number or --tx-number must be provided"
);
let prune_mode = match args.mode {
PruneModeArg::Full => PruneMode::Full,
PruneModeArg::Distance => PruneMode::Distance(
args.mode_value
.ok_or_else(|| eyre::eyre!("--mode-value is required for distance mode"))?,
),
PruneModeArg::Before => PruneMode::Before(
args.mode_value
.ok_or_else(|| eyre::eyre!("--mode-value is required for before mode"))?,
),
};
let segment: PruneSegment = args.segment.into();
let checkpoint = PruneCheckpoint {
block_number: args.block_number,
tx_number: args.tx_number,
prune_mode,
};
let provider_rw = tool.provider_factory.database_provider_rw()?;
// Show previous value if any
if let Some(prev) = provider_rw.get_prune_checkpoint(segment)? {
println!("Previous checkpoint for {segment}:");
print_checkpoint(segment, &prev);
println!();
}
provider_rw.save_prune_checkpoint(segment, checkpoint)?;
provider_rw.commit()?;
println!("Updated checkpoint for {segment}:");
print_checkpoint(segment, &checkpoint);
Ok(())
}
}
fn print_checkpoint(segment: PruneSegment, checkpoint: &PruneCheckpoint) {
println!(" Segment: {segment}");
println!(" Block Number: {}", fmt_opt(checkpoint.block_number));
println!(" Tx Number: {}", fmt_opt(checkpoint.tx_number));
println!(" Prune Mode: {}", fmt_mode(&checkpoint.prune_mode));
}
fn fmt_opt(val: Option<u64>) -> String {
val.map_or("-".to_string(), |n| n.to_string())
}
fn fmt_mode(mode: &PruneMode) -> String {
match mode {
PruneMode::Full => "Full".to_string(),
PruneMode::Distance(d) => format!("Distance({d})"),
PruneMode::Before(b) => format!("Before({b})"),
}
}

View File

@@ -384,15 +384,19 @@ fn resumable_download(url: &str, target_dir: &Path) -> Result<(PathBuf, u64)> {
let mut total_size: Option<u64> = None;
let mut last_error: Option<eyre::Error> = None;
let finalize_download = |size: u64| -> Result<(PathBuf, u64)> {
fs::rename(&part_path, &final_path)?;
info!(target: "reth::cli", "Download complete: {}", final_path.display());
Ok((final_path.clone(), size))
};
for attempt in 1..=MAX_DOWNLOAD_RETRIES {
let existing_size = fs::metadata(&part_path).map(|m| m.len()).unwrap_or(0);
if let Some(total) = total_size &&
existing_size >= total
{
fs::rename(&part_path, &final_path)?;
info!(target: "reth::cli", "Download complete: {}", final_path.display());
return Ok((final_path, total));
return finalize_download(total);
}
if attempt > 1 {
@@ -476,9 +480,7 @@ fn resumable_download(url: &str, target_dir: &Path) -> Result<(PathBuf, u64)> {
continue;
}
fs::rename(&part_path, &final_path)?;
info!(target: "reth::cli", "Download complete: {}", final_path.display());
return Ok((final_path, current_total));
return finalize_download(current_total);
}
Err(last_error

View File

@@ -44,11 +44,11 @@ pub struct ExportArgs {
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ExportEraCommand<C> {
/// Execute `export-era` command
pub async fn execute<N>(self) -> eyre::Result<()>
pub async fn execute<N>(self, runtime: reth_tasks::Runtime) -> eyre::Result<()>
where
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
{
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO)?;
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO, runtime)?;
// Either specified path or default to `<data-dir>/<chain>/era1-export/`
let data_dir = match &self.export.path {

View File

@@ -47,6 +47,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
pub async fn execute<N, Comp>(
self,
components: impl FnOnce(Arc<N::ChainSpec>) -> Comp,
runtime: reth_tasks::Runtime,
) -> eyre::Result<()>
where
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
@@ -54,7 +55,8 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
{
info!(target: "reth::cli", "reth {} starting", version_metadata().short_version);
let Environment { provider_factory, config, .. } = self.env.init::<N>(AccessRights::RW)?;
let Environment { provider_factory, config, .. } =
self.env.init::<N>(AccessRights::RW, runtime.clone())?;
let components = components(provider_factory.chain_spec());
@@ -85,6 +87,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
&config,
executor.clone(),
consensus.clone(),
runtime.clone(),
)
.await?;

View File

@@ -87,6 +87,7 @@ pub async fn import_blocks_from_file<N>(
config: &Config,
executor: impl ConfigureEvm<Primitives = N::Primitives> + 'static,
consensus: Arc<impl FullConsensus<N::Primitives> + 'static>,
runtime: reth_tasks::Runtime,
) -> eyre::Result<ImportResult>
where
N: ProviderNodeTypes,
@@ -147,6 +148,7 @@ where
static_file_producer.clone(),
import_config.no_state,
executor.clone(),
runtime.clone(),
)?;
// override the tip
@@ -257,6 +259,7 @@ where
///
/// If configured to execute, all stages will run. Otherwise, only stages that don't require state
/// will run.
#[expect(clippy::too_many_arguments)]
pub fn build_import_pipeline_impl<N, C, E>(
config: &Config,
provider_factory: ProviderFactory<N>,
@@ -265,6 +268,7 @@ pub fn build_import_pipeline_impl<N, C, E>(
static_file_producer: StaticFileProducer<ProviderFactory<N>>,
disable_exec: bool,
evm_config: E,
runtime: reth_tasks::Runtime,
) -> eyre::Result<(Pipeline<N>, impl futures::Stream<Item = NodeEvent<N::Primitives>> + use<N, C, E>)>
where
N: ProviderNodeTypes,
@@ -283,7 +287,7 @@ where
let mut header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers)
.build(file_client.clone(), consensus.clone())
.into_task();
.into_task_with(&runtime);
// TODO: The pipeline should correctly configure the downloader on its own.
// Find the possibility to remove unnecessary pre-configuration.
header_downloader.update_local_head(local_head);
@@ -291,7 +295,7 @@ where
let mut body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies)
.build(file_client.clone(), consensus.clone(), provider_factory.clone())
.into_task();
.into_task_with(&runtime);
// TODO: The pipeline should correctly configure the downloader on its own.
// Find the possibility to remove unnecessary pre-configuration.
body_downloader

View File

@@ -64,13 +64,14 @@ impl TryFromChain for ChainKind {
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportEraCommand<C> {
/// Execute `import-era` command
pub async fn execute<N>(self) -> eyre::Result<()>
pub async fn execute<N>(self, runtime: reth_tasks::Runtime) -> eyre::Result<()>
where
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
{
info!(target: "reth::cli", "reth {} starting", version_metadata().short_version);
let Environment { provider_factory, config, .. } = self.env.init::<N>(AccessRights::RW)?;
let Environment { provider_factory, config, .. } =
self.env.init::<N>(AccessRights::RW, runtime)?;
let mut hash_collector = Collector::new(config.stages.etl.file_size, config.stages.etl.dir);

View File

@@ -18,10 +18,13 @@ pub struct InitCommand<C: ChainSpecParser> {
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> InitCommand<C> {
/// Execute the `init` command
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(self) -> eyre::Result<()> {
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
self,
runtime: reth_tasks::Runtime,
) -> eyre::Result<()> {
info!(target: "reth::cli", "reth init starting");
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW)?;
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW, runtime)?;
let genesis_block_number = provider_factory.chain_spec().genesis_header().number();
let hash = provider_factory

View File

@@ -65,7 +65,7 @@ pub struct InitStateCommand<C: ChainSpecParser> {
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> InitStateCommand<C> {
/// Execute the `init` command
pub async fn execute<N>(self) -> eyre::Result<()>
pub async fn execute<N>(self, runtime: reth_tasks::Runtime) -> eyre::Result<()>
where
N: CliNodeTypes<
ChainSpec = C::ChainSpec,
@@ -74,7 +74,8 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> InitStateC
{
info!(target: "reth::cli", "Reth init-state starting");
let Environment { config, provider_factory, .. } = self.env.init::<N>(AccessRights::RW)?;
let Environment { config, provider_factory, .. } =
self.env.init::<N>(AccessRights::RW, runtime)?;
let static_file_provider = provider_factory.static_file_provider();
let provider_rw = provider_factory.database_provider_rw()?;

View File

@@ -16,6 +16,7 @@ use reth_node_core::{
args::{DatadirArgs, NetworkArgs},
utils::get_single_header,
};
use reth_tasks::Runtime;
pub mod bootnode;
pub mod enode;
@@ -194,17 +195,18 @@ impl<C: ChainSpecParser> DownloadArgs<C> {
let rlpx_socket = (self.network.addr, self.network.port).into();
let boot_nodes = self.chain.bootnodes().unwrap_or_default();
let net = NetworkConfigBuilder::<N::NetworkPrimitives>::new(p2p_secret_key)
.peer_config(config.peers_config_with_basic_nodes_from_file(None))
.external_ip_resolver(self.network.nat.clone())
.network_id(self.network.network_id)
.boot_nodes(boot_nodes.clone())
.apply(|builder| {
self.network.discovery.apply_to_builder(builder, rlpx_socket, boot_nodes)
})
.build_with_noop_provider(self.chain.clone())
.manager()
.await?;
let net =
NetworkConfigBuilder::<N::NetworkPrimitives>::new(p2p_secret_key, Runtime::test())
.peer_config(config.peers_config_with_basic_nodes_from_file(None))
.external_ip_resolver(self.network.nat.clone())
.network_id(self.network.network_id)
.boot_nodes(boot_nodes.clone())
.apply(|builder| {
self.network.discovery.apply_to_builder(builder, rlpx_socket, boot_nodes)
})
.build_with_noop_provider(self.chain.clone())
.manager()
.await?;
let handle = net.handle().clone();
tokio::task::spawn(net);

View File

@@ -36,7 +36,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
self,
ctx: CliContext,
) -> eyre::Result<()> {
let env = self.env.init::<N>(AccessRights::RW)?;
let env = self.env.init::<N>(AccessRights::RW, ctx.task_executor.clone())?;
let provider_factory = env.provider_factory;
let config = env.config.prune;
let data_dir = env.data_dir;

View File

@@ -60,11 +60,15 @@ impl<C: ChainSpecParser> Command<C> {
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
/// Execute `re-execute` command
pub async fn execute<N>(self, components: impl CliComponentsBuilder<N>) -> eyre::Result<()>
pub async fn execute<N>(
self,
components: impl CliComponentsBuilder<N>,
runtime: reth_tasks::Runtime,
) -> eyre::Result<()>
where
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
{
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO)?;
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO, runtime)?;
let components = components(provider_factory.chain_spec());

View File

@@ -37,11 +37,11 @@ pub struct Command<C: ChainSpecParser> {
impl<C: ChainSpecParser> Command<C> {
/// Execute `db` command
pub async fn execute<N: CliNodeTypes>(self) -> eyre::Result<()>
pub async fn execute<N: CliNodeTypes>(self, runtime: reth_tasks::Runtime) -> eyre::Result<()>
where
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
{
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW)?;
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW, runtime)?;
let tool = DbTool::new(provider_factory)?;

View File

@@ -16,6 +16,7 @@ use reth_stages::{stages::ExecutionStage, Stage, StageCheckpoint, UnwindInput};
use std::sync::Arc;
use tracing::info;
#[expect(clippy::too_many_arguments)]
pub(crate) async fn dump_execution_stage<N, E, C>(
db_tool: &DbTool<N>,
from: u64,
@@ -24,6 +25,7 @@ pub(crate) async fn dump_execution_stage<N, E, C>(
should_run: bool,
evm_config: E,
consensus: C,
runtime: reth_tasks::Runtime,
) -> eyre::Result<()>
where
N: ProviderNodeTypes<DB = DatabaseEnv>,
@@ -37,7 +39,6 @@ where
unwind_and_copy(db_tool, from, tip_block_number, &output_db, evm_config.clone())?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,

View File

@@ -18,6 +18,7 @@ pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Databas
to: BlockNumber,
output_datadir: ChainPath<DataDirPath>,
should_run: bool,
runtime: reth_tasks::Runtime,
) -> Result<()> {
let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
@@ -33,7 +34,6 @@ pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Databas
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,

View File

@@ -17,13 +17,13 @@ pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = Databas
to: u64,
output_datadir: ChainPath<DataDirPath>,
should_run: bool,
runtime: reth_tasks::Runtime,
) -> Result<()> {
let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,

View File

@@ -24,6 +24,7 @@ use reth_stages::{
};
use tracing::info;
#[expect(clippy::too_many_arguments)]
pub(crate) async fn dump_merkle_stage<N>(
db_tool: &DbTool<N>,
from: BlockNumber,
@@ -32,6 +33,7 @@ pub(crate) async fn dump_merkle_stage<N>(
should_run: bool,
evm_config: impl ConfigureEvm<Primitives = N::Primitives>,
consensus: impl FullConsensus<N::Primitives> + 'static,
runtime: reth_tasks::Runtime,
) -> Result<()>
where
N: ProviderNodeTypes<DB = DatabaseEnv>,
@@ -57,7 +59,6 @@ where
unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db, evm_config, consensus)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,

View File

@@ -72,30 +72,36 @@ pub struct StageCommand {
}
macro_rules! handle_stage {
($stage_fn:ident, $tool:expr, $command:expr) => {{
($stage_fn:ident, $tool:expr, $command:expr, $runtime:expr) => {{
let StageCommand { output_datadir, from, to, dry_run, .. } = $command;
let output_datadir =
output_datadir.with_chain($tool.chain().chain(), DatadirArgs::default());
$stage_fn($tool, *from, *to, output_datadir, *dry_run).await?
$stage_fn($tool, *from, *to, output_datadir, *dry_run, $runtime).await?
}};
($stage_fn:ident, $tool:expr, $command:expr, $executor:expr, $consensus:expr) => {{
($stage_fn:ident, $tool:expr, $command:expr, $executor:expr, $consensus:expr, $runtime:expr) => {{
let StageCommand { output_datadir, from, to, dry_run, .. } = $command;
let output_datadir =
output_datadir.with_chain($tool.chain().chain(), DatadirArgs::default());
$stage_fn($tool, *from, *to, output_datadir, *dry_run, $executor, $consensus).await?
$stage_fn($tool, *from, *to, output_datadir, *dry_run, $executor, $consensus, $runtime)
.await?
}};
}
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
/// Execute `dump-stage` command
pub async fn execute<N, Comp, F>(self, components: F) -> eyre::Result<()>
pub async fn execute<N, Comp, F>(
self,
components: F,
runtime: reth_tasks::Runtime,
) -> eyre::Result<()>
where
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
Comp: CliNodeComponents<N>,
F: FnOnce(Arc<C::ChainSpec>) -> Comp,
{
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO)?;
let Environment { provider_factory, .. } =
self.env.init::<N>(AccessRights::RO, runtime.clone())?;
let tool = DbTool::new(provider_factory)?;
let components = components(tool.chain());
let evm_config = components.evm_config().clone();
@@ -103,12 +109,23 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
match &self.command {
Stages::Execution(cmd) => {
handle_stage!(dump_execution_stage, &tool, cmd, evm_config, consensus)
handle_stage!(
dump_execution_stage,
&tool,
cmd,
evm_config,
consensus,
runtime.clone()
)
}
Stages::StorageHashing(cmd) => {
handle_stage!(dump_hashing_storage_stage, &tool, cmd, runtime.clone())
}
Stages::AccountHashing(cmd) => {
handle_stage!(dump_hashing_account_stage, &tool, cmd, runtime.clone())
}
Stages::StorageHashing(cmd) => handle_stage!(dump_hashing_storage_stage, &tool, cmd),
Stages::AccountHashing(cmd) => handle_stage!(dump_hashing_account_stage, &tool, cmd),
Stages::Merkle(cmd) => {
handle_stage!(dump_merkle_stage, &tool, cmd, evm_config, consensus)
handle_stage!(dump_merkle_stage, &tool, cmd, evm_config, consensus, runtime.clone())
}
}

View File

@@ -49,11 +49,12 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
Comp: CliNodeComponents<N>,
{
let executor = ctx.task_executor.clone();
match self.command {
Subcommands::Run(command) => command.execute::<N, _, _>(ctx, components).await,
Subcommands::Drop(command) => command.execute::<N>().await,
Subcommands::Dump(command) => command.execute::<N, _, _>(components).await,
Subcommands::Unwind(command) => command.execute::<N, _, _>(components).await,
Subcommands::Drop(command) => command.execute::<N>(executor).await,
Subcommands::Dump(command) => command.execute::<N, _, _>(components, executor).await,
Subcommands::Unwind(command) => command.execute::<N, _, _>(components, executor).await,
}
}
}

View File

@@ -119,8 +119,9 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
// Does not do anything on windows.
let _ = fdlimit::raise_fd_limit();
let runtime = ctx.task_executor.clone();
let Environment { provider_factory, config, data_dir } =
self.env.init::<N>(AccessRights::RW)?;
self.env.init::<N>(AccessRights::RW, ctx.task_executor.clone())?;
let mut provider_rw = provider_factory.database_provider_rw()?;
let components = components(provider_factory.chain_spec());
@@ -171,6 +172,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
provider_factory.chain_spec(),
p2p_secret_key,
default_peers_path,
runtime.clone(),
)
.build(provider_factory.clone())
.start_network()
@@ -226,6 +228,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
provider_factory.chain_spec(),
p2p_secret_key,
default_peers_path,
runtime.clone(),
)
.build(provider_factory.clone())
.start_network()

View File

@@ -46,12 +46,14 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>, F, Comp>(
self,
components: F,
runtime: reth_tasks::Runtime,
) -> eyre::Result<()>
where
Comp: CliNodeComponents<N>,
F: FnOnce(Arc<C::ChainSpec>) -> Comp,
{
let Environment { provider_factory, config, .. } = self.env.init::<N>(AccessRights::RW)?;
let Environment { provider_factory, config, .. } =
self.env.init::<N>(AccessRights::RW, runtime)?;
let target = self.command.unwind_target(provider_factory.clone())?;

View File

@@ -47,6 +47,11 @@ impl CliRunner {
self
}
/// Returns a clone of the underlying [`Runtime`](reth_tasks::Runtime).
pub fn runtime(&self) -> reth_tasks::Runtime {
self.runtime.clone()
}
/// Executes an async block on the runtime and blocks until completion.
pub fn block_on<F, T>(&self, fut: F) -> T
where

View File

@@ -54,12 +54,20 @@ impl<T: PayloadTypes> PayloadTestContext<T> {
Ok(())
}
/// Wait until the best built payload is ready
/// Wait until the best built payload is ready.
///
/// Panics if the payload builder does not produce a non-empty payload within 30 seconds.
pub async fn wait_for_built_payload(&self, payload_id: PayloadId) {
let start = std::time::Instant::now();
loop {
let payload =
self.payload_builder.best_payload(payload_id).await.transpose().ok().flatten();
if payload.is_none_or(|p| p.block().body().transactions().is_empty()) {
assert!(
start.elapsed() < std::time::Duration::from_secs(30),
"timed out waiting for a non-empty payload for {payload_id} — \
check that the chain spec supports all generated tx types"
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
continue
}

View File

@@ -112,7 +112,7 @@ where
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
Wallet,
)> {
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
let runtime = Runtime::test();
let network_config = NetworkArgs {
discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },

View File

@@ -15,7 +15,6 @@ use reth_provider::{
};
use reth_rpc_server_types::RpcModuleSelection;
use reth_stages_types::StageId;
use reth_tasks::Runtime;
use std::{path::Path, sync::Arc};
use tempfile::TempDir;
use tracing::{debug, info, span, Level};
@@ -66,7 +65,7 @@ pub async fn setup_engine_with_chain_import(
+ Copy
+ 'static,
) -> eyre::Result<ChainImportResult> {
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
let runtime = reth_tasks::Runtime::test();
let network_config = NetworkArgs {
discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
@@ -149,6 +148,7 @@ pub async fn setup_engine_with_chain_import(
&config,
evm_config,
consensus,
runtime.clone(),
)
.await?;
@@ -343,6 +343,7 @@ mod tests {
let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
// Use NoopConsensus to skip gas limit validation for test imports
let consensus = reth_consensus::noop::NoopConsensus::arc();
let runtime = reth_tasks::Runtime::test();
let result = import_blocks_from_file(
&rlp_path,
@@ -351,6 +352,7 @@ mod tests {
&config,
evm_config,
consensus,
runtime,
)
.await
.unwrap();
@@ -509,6 +511,7 @@ mod tests {
let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone());
// Use NoopConsensus to skip gas limit validation for test imports
let consensus = reth_consensus::noop::NoopConsensus::arc();
let runtime = reth_tasks::Runtime::test();
let result = import_blocks_from_file(
&rlp_path,
@@ -517,6 +520,7 @@ mod tests {
&config,
evm_config,
consensus,
runtime,
)
.await
.unwrap();

View File

@@ -3,7 +3,7 @@
use alloy_primitives::{TxHash, B256};
use alloy_rpc_types_engine::ForkchoiceState;
use eyre::OptionExt;
use futures_util::{stream::Fuse, StreamExt};
use futures_util::{stream::Fuse, Stream, StreamExt};
use reth_engine_primitives::ConsensusEngineHandle;
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{
@@ -14,6 +14,7 @@ use reth_storage_api::BlockReader;
use reth_transaction_pool::TransactionPool;
use std::{
collections::VecDeque,
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
@@ -24,7 +25,6 @@ use tokio_stream::wrappers::ReceiverStream;
use tracing::error;
/// A mining mode for the local dev engine.
#[derive(Debug)]
pub enum MiningMode<Pool: TransactionPool + Unpin> {
/// In this mode a block is built as soon as
/// a valid transaction reaches the pool.
@@ -43,6 +43,25 @@ pub enum MiningMode<Pool: TransactionPool + Unpin> {
},
/// In this mode a block is built at a fixed interval.
Interval(Interval),
/// In this mode a block is built when the trigger stream yields a value.
///
/// This is a general-purpose trigger that can be fired on demand, for example via a channel
/// or any other [`Stream`] implementation.
Trigger(Pin<Box<dyn Stream<Item = ()> + Send + Sync>>),
}
impl<Pool: TransactionPool + Unpin> fmt::Debug for MiningMode<Pool> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Instant { max_transactions, accumulated, .. } => f
.debug_struct("Instant")
.field("max_transactions", max_transactions)
.field("accumulated", accumulated)
.finish(),
Self::Interval(interval) => f.debug_tuple("Interval").field(interval).finish(),
Self::Trigger(_) => f.debug_tuple("Trigger").finish(),
}
}
}
impl<Pool: TransactionPool + Unpin> MiningMode<Pool> {
@@ -57,6 +76,14 @@ impl<Pool: TransactionPool + Unpin> MiningMode<Pool> {
let start = tokio::time::Instant::now() + duration;
Self::Interval(tokio::time::interval_at(start, duration))
}
/// Constructor for a [`MiningMode::Trigger`]
///
/// Accepts any stream that yields `()` values, each of which triggers a new block to be
/// mined. This can be backed by a channel, a custom stream, or any other async source.
pub fn trigger(trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
Self::Trigger(Box::pin(trigger))
}
}
impl<Pool: TransactionPool + Unpin> Future for MiningMode<Pool> {
@@ -91,6 +118,12 @@ impl<Pool: TransactionPool + Unpin> Future for MiningMode<Pool> {
}
Poll::Pending
}
Self::Trigger(trigger) => {
if trigger.poll_next_unpin(cx).is_ready() {
return Poll::Ready(())
}
Poll::Pending
}
}
}
}

View File

@@ -32,28 +32,18 @@ fn default_account_worker_count() -> usize {
/// The size of proof targets chunk to spawn in one multiproof calculation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 60;
/// The size of proof targets chunk to spawn in one multiproof calculation when V2 proofs are
/// enabled. This is 4x the default chunk size to take advantage of more efficient V2 proof
/// computation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2: usize = DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE * 4;
/// The size of proof targets chunk optimized for small blocks (≤20M gas used).
/// Benchmarks: <https://gist.github.com/yongkangc/fda9c24846f0ba891376bcf81b002008>
pub const SMALL_BLOCK_MULTIPROOF_CHUNK_SIZE: usize = 30;
/// Gas threshold below which the small block chunk size is used.
pub const SMALL_BLOCK_GAS_THRESHOLD: u64 = 20_000_000;
/// Default number of reserved CPU cores for non-reth processes.
///
/// This will be deducted from the thread count of main reth global threadpool.
pub const DEFAULT_RESERVED_CPU_CORES: usize = 1;
/// Returns the default maximum concurrency for prewarm task based on available parallelism.
fn default_prewarm_max_concurrency() -> usize {
#[cfg(feature = "std")]
{
std::thread::available_parallelism().map_or(16, |n| n.get())
}
#[cfg(not(feature = "std"))]
{
16
}
}
/// Default depth for sparse trie pruning.
///
/// Nodes at this depth and below are converted to hash stubs to reduce memory.
@@ -161,20 +151,14 @@ pub struct TreeConfig {
/// where immediate payload regeneration is desired despite the head not changing or moving to
/// an ancestor.
always_process_payload_attributes_on_canonical_head: bool,
/// Maximum concurrency for the prewarm task.
prewarm_max_concurrency: usize,
/// Whether to unwind canonical header to ancestor during forkchoice updates.
allow_unwind_canonical_header: bool,
/// Number of storage proof worker threads.
storage_worker_count: usize,
/// Number of account proof worker threads.
account_worker_count: usize,
/// Whether to disable V2 storage proofs.
disable_proof_v2: bool,
/// Whether to disable cache metrics recording (can be expensive with large cached state).
disable_cache_metrics: bool,
/// Whether to disable sparse trie cache.
disable_trie_cache: bool,
/// Depth for sparse trie pruning after state root computation.
sparse_trie_prune_depth: usize,
/// Maximum number of storage tries to retain after pruning.
@@ -209,13 +193,10 @@ impl Default for TreeConfig {
precompile_cache_disabled: false,
state_root_fallback: false,
always_process_payload_attributes_on_canonical_head: false,
prewarm_max_concurrency: default_prewarm_max_concurrency(),
allow_unwind_canonical_header: false,
storage_worker_count: default_storage_worker_count(),
account_worker_count: default_account_worker_count(),
disable_proof_v2: false,
disable_cache_metrics: false,
disable_trie_cache: false,
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
disable_sparse_trie_cache_pruning: false,
@@ -246,11 +227,9 @@ impl TreeConfig {
precompile_cache_disabled: bool,
state_root_fallback: bool,
always_process_payload_attributes_on_canonical_head: bool,
prewarm_max_concurrency: usize,
allow_unwind_canonical_header: bool,
storage_worker_count: usize,
account_worker_count: usize,
disable_proof_v2: bool,
disable_cache_metrics: bool,
sparse_trie_prune_depth: usize,
sparse_trie_max_storage_tries: usize,
@@ -275,13 +254,10 @@ impl TreeConfig {
precompile_cache_disabled,
state_root_fallback,
always_process_payload_attributes_on_canonical_head,
prewarm_max_concurrency,
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
disable_proof_v2,
disable_cache_metrics,
disable_trie_cache: false,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
disable_sparse_trie_cache_pruning: false,
@@ -324,16 +300,9 @@ impl TreeConfig {
self.multiproof_chunk_size
}
/// Return the multiproof task chunk size, using the V2 default if V2 proofs are enabled
/// and the chunk size is at the default value.
/// Return the effective multiproof task chunk size.
pub const fn effective_multiproof_chunk_size(&self) -> usize {
if !self.disable_proof_v2 &&
self.multiproof_chunk_size == DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE
{
DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2
} else {
self.multiproof_chunk_size
}
self.multiproof_chunk_size
}
/// Return the number of reserved CPU cores for non-reth processes
@@ -533,17 +502,6 @@ impl TreeConfig {
self.has_enough_parallelism && !self.legacy_state_root
}
/// Setter for prewarm max concurrency.
pub const fn with_prewarm_max_concurrency(mut self, prewarm_max_concurrency: usize) -> Self {
self.prewarm_max_concurrency = prewarm_max_concurrency;
self
}
/// Return the prewarm max concurrency.
pub const fn prewarm_max_concurrency(&self) -> usize {
self.prewarm_max_concurrency
}
/// Return the number of storage proof worker threads.
pub const fn storage_worker_count(&self) -> usize {
self.storage_worker_count
@@ -580,17 +538,6 @@ impl TreeConfig {
self
}
/// Return whether V2 storage proofs are disabled.
pub const fn disable_proof_v2(&self) -> bool {
self.disable_proof_v2
}
/// Setter for whether to disable V2 storage proofs.
pub const fn with_disable_proof_v2(mut self, disable_proof_v2: bool) -> Self {
self.disable_proof_v2 = disable_proof_v2;
self
}
/// Returns whether cache metrics recording is disabled.
pub const fn disable_cache_metrics(&self) -> bool {
self.disable_cache_metrics
@@ -602,17 +549,6 @@ impl TreeConfig {
self
}
/// Returns whether sparse trie cache is disabled.
pub const fn disable_trie_cache(&self) -> bool {
self.disable_trie_cache
}
/// Setter for whether to disable sparse trie cache.
pub const fn with_disable_trie_cache(mut self, value: bool) -> Self {
self.disable_trie_cache = value;
self
}
/// Returns the sparse trie prune depth.
pub const fn sparse_trie_prune_depth(&self) -> usize {
self.sparse_trie_prune_depth

View File

@@ -15,6 +15,7 @@ use futures::{future::Either, FutureExt, TryFutureExt};
use reth_errors::RethResult;
use reth_payload_builder_primitives::PayloadBuilderError;
use reth_payload_primitives::{EngineApiMessageVersion, PayloadTypes};
use std::time::Duration;
use tokio::sync::{mpsc::UnboundedSender, oneshot};
/// Type alias for backwards compat
@@ -142,6 +143,20 @@ impl Future for PendingPayloadId {
}
}
/// Timing breakdown for `reth_newPayload` responses.
#[derive(Debug, Clone, Copy)]
pub struct NewPayloadTimings {
/// Server-side execution latency.
pub latency: Duration,
/// Time spent waiting for persistence to complete.
/// `None` when no persistence was in-flight.
pub persistence_wait: Option<Duration>,
/// Time spent waiting for the execution cache lock.
pub execution_cache_wait: Duration,
/// Time spent waiting for the sparse trie lock.
pub sparse_trie_wait: Duration,
}
/// A message for the beacon engine from other components of the node (engine RPC API invoked by the
/// consensus layer).
#[derive(Debug)]
@@ -153,6 +168,16 @@ pub enum BeaconEngineMessage<Payload: PayloadTypes> {
/// The sender for returning payload status result.
tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
},
/// Message with new payload used by `reth_newPayload` endpoint.
///
/// Waits for persistence, execution cache, and sparse trie locks before processing,
/// and returns detailed timing breakdown alongside the payload status.
RethNewPayload {
/// The execution payload received by Engine API.
payload: Payload::ExecutionData,
/// The sender for returning payload status result and timing breakdown.
tx: oneshot::Sender<Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError>>,
},
/// Message with updated forkchoice state.
ForkchoiceUpdated {
/// The updated forkchoice state.
@@ -178,6 +203,15 @@ impl<Payload: PayloadTypes> Display for BeaconEngineMessage<Payload> {
payload.block_hash()
)
}
Self::RethNewPayload { payload, .. } => {
write!(
f,
"RethNewPayload(parent: {}, number: {}, hash: {})",
payload.parent_hash(),
payload.block_number(),
payload.block_hash()
)
}
Self::ForkchoiceUpdated { state, payload_attrs, .. } => {
// we don't want to print the entire payload attributes, because for OP this
// includes all txs
@@ -223,6 +257,19 @@ where
rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
}
/// Sends a new payload message used by `reth_newPayload` endpoint.
///
/// Waits for persistence, execution cache, and sparse trie locks before processing,
/// and returns detailed timing breakdown alongside the payload status.
pub async fn reth_new_payload(
&self,
payload: Payload::ExecutionData,
) -> Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::RethNewPayload { payload, tx });
rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
}
/// Sends a forkchoice update message to the beacon consensus engine and waits for a response.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>

View File

@@ -54,7 +54,6 @@ thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "sync", "macros"] }
fixed-cache.workspace = true
moka = { workspace = true, features = ["sync"] }
smallvec.workspace = true
# metrics
metrics.workspace = true
@@ -73,6 +72,7 @@ reth-prune-types = { workspace = true, optional = true }
reth-stages = { workspace = true, optional = true }
reth-static-file = { workspace = true, optional = true }
reth-tracing = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
[dev-dependencies]
# reth
@@ -143,6 +143,7 @@ test-utils = [
"reth-evm-ethereum/test-utils",
"reth-tasks/test-utils",
]
trie-debug = ["reth-trie-sparse/trie-debug", "dep:serde_json"]
rocksdb = [
"reth-provider/rocksdb",
"reth-prune/rocksdb",

View File

@@ -10,7 +10,7 @@
use futures::FutureExt;
use reth_provider::providers::ProviderNodeTypes;
use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
use reth_tasks::TaskSpawner;
use reth_tasks::Runtime;
use std::task::{ready, Context, Poll};
use tokio::sync::oneshot;
use tracing::trace;
@@ -80,7 +80,7 @@ pub enum BackfillEvent {
#[derive(Debug)]
pub struct PipelineSync<N: ProviderNodeTypes> {
/// The type that can spawn the pipeline task.
pipeline_task_spawner: Box<dyn TaskSpawner>,
pipeline_task_spawner: Runtime,
/// The current state of the pipeline.
/// The pipeline is used for large ranges.
pipeline_state: PipelineState<N>,
@@ -90,7 +90,7 @@ pub struct PipelineSync<N: ProviderNodeTypes> {
impl<N: ProviderNodeTypes> PipelineSync<N> {
/// Create a new instance.
pub fn new(pipeline: Pipeline<N>, pipeline_task_spawner: Box<dyn TaskSpawner>) -> Self {
pub fn new(pipeline: Pipeline<N>, pipeline_task_spawner: Runtime) -> Self {
Self {
pipeline_task_spawner,
pipeline_state: PipelineState::Idle(Some(Box::new(pipeline))),
@@ -140,10 +140,10 @@ impl<N: ProviderNodeTypes> PipelineSync<N> {
let pipeline = pipeline.take().expect("exists");
self.pipeline_task_spawner.spawn_critical_blocking_task(
"pipeline task",
Box::pin(async move {
async move {
let result = pipeline.run_as_fut(Some(target)).await;
let _ = tx.send(result);
}),
},
);
self.pipeline_state = PipelineState::Running(rx);
@@ -241,7 +241,7 @@ mod tests {
use reth_provider::test_utils::MockNodeTypesWithDB;
use reth_stages::ExecOutput;
use reth_stages_api::StageCheckpoint;
use reth_tasks::TokioTaskExecutor;
use reth_tasks::Runtime;
use std::{collections::VecDeque, future::poll_fn, sync::Arc};
struct TestHarness {
@@ -267,7 +267,7 @@ mod tests {
})]))
.build(chain_spec);
let pipeline_sync = PipelineSync::new(pipeline, Box::<TokioTaskExecutor>::default());
let pipeline_sync = PipelineSync::new(pipeline, Runtime::test());
let client = TestFullBlockClient::default();
let header = Header {
base_fee_per_gas: Some(7),

View File

@@ -10,7 +10,7 @@ use crate::{
download::BasicBlockDownloader,
engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
persistence::PersistenceHandle,
tree::{EngineApiTreeHandler, EngineValidator, TreeConfig},
tree::{EngineApiTreeHandler, EngineValidator, TreeConfig, WaitForCaches},
};
use futures::Stream;
use reth_consensus::FullConsensus;
@@ -25,7 +25,7 @@ use reth_provider::{
};
use reth_prune::PrunerWithFactory;
use reth_stages_api::{MetricEventsSender, Pipeline};
use reth_tasks::TaskSpawner;
use reth_tasks::Runtime;
use reth_trie_db::ChangesetCache;
use std::sync::Arc;
@@ -54,7 +54,7 @@ pub fn build_engine_orchestrator<N, Client, S, V, C>(
client: Client,
incoming_requests: S,
pipeline: Pipeline<N>,
pipeline_task_spawner: Box<dyn TaskSpawner>,
pipeline_task_spawner: Runtime,
provider: ProviderFactory<N>,
blockchain_db: BlockchainProvider<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
@@ -76,7 +76,7 @@ where
N: ProviderNodeTypes,
Client: BlockClient<Block = <N::Primitives as NodePrimitives>::Block> + 'static,
S: Stream<Item = BeaconEngineMessage<N::Payload>> + Send + Sync + Unpin + 'static,
V: EngineValidator<N::Payload>,
V: EngineValidator<N::Payload> + WaitForCaches,
C: ConfigureEvm<Primitives = N::Primitives> + 'static,
{
let downloader = BasicBlockDownloader::new(client, consensus.clone());

View File

@@ -4,7 +4,7 @@ use crossbeam_channel::Sender as CrossbeamSender;
use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::NodePrimitives;
use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
@@ -18,7 +18,6 @@ use std::{
Arc,
},
thread::JoinHandle,
time::Instant,
};
use thiserror::Error;
use tracing::{debug, error, instrument};

View File

@@ -3,7 +3,7 @@ use alloy_primitives::{Address, StorageKey, StorageValue, B256};
use metrics::{Gauge, Histogram};
use reth_errors::ProviderResult;
use reth_metrics::Metrics;
use reth_primitives_traits::{Account, Bytecode};
use reth_primitives_traits::{Account, Bytecode, FastInstant as Instant};
use reth_provider::{
AccountReader, BlockHashReader, BytecodeReader, HashedPostStateProvider, StateProofProvider,
StateProvider, StateRootProvider, StorageRootProvider,
@@ -14,7 +14,7 @@ use reth_trie::{
};
use std::{
sync::atomic::{AtomicU64, Ordering},
time::{Duration, Instant},
time::Duration,
};
/// Nanoseconds per second

View File

@@ -8,9 +8,9 @@ use reth_metrics::{
metrics::{Counter, Gauge, Histogram},
Metrics,
};
use reth_primitives_traits::constants::gas_units::MEGAGAS;
use reth_primitives_traits::{constants::gas_units::MEGAGAS, FastInstant as Instant};
use reth_trie::updates::TrieUpdates;
use std::time::{Duration, Instant};
use std::time::Duration;
/// Upper bounds for each gas bucket. The last bucket is a catch-all for
/// everything above the final threshold: <5M, 5-10M, 10-20M, 20-30M, 30-40M, >40M.
@@ -34,6 +34,10 @@ pub struct EngineApiMetrics {
/// Metrics for EIP-7928 Block-Level Access Lists (BAL).
#[allow(dead_code)]
pub(crate) bal: BalMetrics,
/// Gas-bucketed execution sub-phase metrics.
pub(crate) execution_gas_buckets: ExecutionGasBucketMetrics,
/// Gas-bucketed block validation sub-phase metrics.
pub(crate) block_validation_gas_buckets: BlockValidationGasBucketMetrics,
}
impl EngineApiMetrics {
@@ -82,6 +86,22 @@ impl EngineApiMetrics {
self.executor.post_execution_histogram.record(elapsed);
}
/// Records execution duration into the gas-bucketed execution histogram.
pub fn record_block_execution_gas_bucket(&self, gas_used: u64, elapsed: Duration) {
let idx = GasBucketMetrics::bucket_index(gas_used);
self.execution_gas_buckets.buckets[idx]
.execution_gas_bucket_histogram
.record(elapsed.as_secs_f64());
}
/// Records state root duration into the gas-bucketed block validation histogram.
pub fn record_state_root_gas_bucket(&self, gas_used: u64, elapsed_secs: f64) {
let idx = GasBucketMetrics::bucket_index(gas_used);
self.block_validation_gas_buckets.buckets[idx]
.state_root_gas_bucket_histogram
.record(elapsed_secs);
}
/// Records the time spent waiting for the next transaction from the iterator.
pub fn record_transaction_wait(&self, elapsed: Duration) {
self.executor.transaction_wait_histogram.record(elapsed);
@@ -280,7 +300,8 @@ impl GasBucketMetrics {
.record(gas_used as f64 / elapsed.as_secs_f64());
}
fn bucket_index(gas_used: u64) -> usize {
/// Returns the bucket index for a given gas value.
pub(crate) fn bucket_index(gas_used: u64) -> usize {
GAS_BUCKET_THRESHOLDS
.iter()
.position(|&threshold| gas_used < threshold)
@@ -288,7 +309,7 @@ impl GasBucketMetrics {
}
/// Returns a human-readable label like `<5M`, `5-10M`, … `>40M`.
fn bucket_label(index: usize) -> String {
pub(crate) fn bucket_label(index: usize) -> String {
if index == 0 {
let hi = GAS_BUCKET_THRESHOLDS[0] / MEGAGAS;
format!("<{hi}M")
@@ -303,6 +324,56 @@ impl GasBucketMetrics {
}
}
/// Per-gas-bucket execution duration metric.
#[derive(Clone, Metrics)]
#[metrics(scope = "sync.execution")]
pub(crate) struct ExecutionGasBucketSeries {
/// Gas-bucketed EVM execution duration.
pub(crate) execution_gas_bucket_histogram: Histogram,
}
/// Holds pre-initialized [`ExecutionGasBucketSeries`] instances, one per gas bucket.
#[derive(Debug)]
pub(crate) struct ExecutionGasBucketMetrics {
buckets: [ExecutionGasBucketSeries; NUM_GAS_BUCKETS],
}
impl Default for ExecutionGasBucketMetrics {
fn default() -> Self {
Self {
buckets: std::array::from_fn(|i| {
let label = GasBucketMetrics::bucket_label(i);
ExecutionGasBucketSeries::new_with_labels(&[("gas_bucket", label)])
}),
}
}
}
/// Per-gas-bucket block validation metrics (state root).
#[derive(Clone, Metrics)]
#[metrics(scope = "sync.block_validation")]
pub(crate) struct BlockValidationGasBucketSeries {
/// Gas-bucketed state root computation duration.
pub(crate) state_root_gas_bucket_histogram: Histogram,
}
/// Holds pre-initialized [`BlockValidationGasBucketSeries`] instances, one per gas bucket.
#[derive(Debug)]
pub(crate) struct BlockValidationGasBucketMetrics {
buckets: [BlockValidationGasBucketSeries; NUM_GAS_BUCKETS],
}
impl Default for BlockValidationGasBucketMetrics {
fn default() -> Self {
Self {
buckets: std::array::from_fn(|i| {
let label = GasBucketMetrics::bucket_label(i);
BlockValidationGasBucketSeries::new_with_labels(&[("gas_bucket", label)])
}),
}
}
}
/// Metrics for engine newPayload responses.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]

View File

@@ -19,7 +19,7 @@ use reth_chain_state::{
use reth_consensus::{Consensus, FullConsensus};
use reth_engine_primitives::{
BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
ForkchoiceStateTracker, OnForkChoiceUpdated,
ForkchoiceStateTracker, NewPayloadTimings, OnForkChoiceUpdated,
};
use reth_errors::{ConsensusError, ProviderResult};
use reth_evm::ConfigureEvm;
@@ -27,7 +27,9 @@ use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{
BuiltPayload, EngineApiMessageVersion, NewPayloadError, PayloadBuilderAttributes, PayloadTypes,
};
use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
use reth_primitives_traits::{
FastInstant as Instant, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
};
use reth_provider::{
BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
@@ -40,7 +42,7 @@ use reth_tasks::spawn_os_thread;
use reth_trie_db::ChangesetCache;
use revm::interpreter::debug_unreachable;
use state::TreeState;
use std::{fmt::Debug, ops, sync::Arc, time::Instant};
use std::{fmt::Debug, ops, sync::Arc, time::Duration};
use crossbeam_channel::{Receiver, Sender};
use tokio::sync::{
@@ -321,7 +323,7 @@ where
+ StorageSettingsCache,
C: ConfigureEvm<Primitives = N> + 'static,
T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
V: EngineValidator<T>,
V: EngineValidator<T> + WaitForCaches,
{
/// Creates a new [`EngineApiTreeHandler`].
#[expect(clippy::too_many_arguments)]
@@ -1553,6 +1555,94 @@ where
// handle the event if any
self.on_maybe_tree_event(maybe_event)?;
}
BeaconEngineMessage::RethNewPayload { payload, tx } => {
// Before processing the new payload, we wait for persistence and
// cache updates to complete. We do it in parallel, spawning
// persistence and cache update wait tasks with Tokio, so that we
// can get an unbiased breakdown on how long did every step take.
//
// If we first wait for persistence, and only then for cache
// updates, we will offset the cache update waits by the duration of
// persistence, which is incorrect.
debug!(target: "engine::tree", "Waiting for persistence and caches in parallel before processing reth_newPayload");
let pending_persistence = self.persistence_state.rx.take();
let persistence_rx = if let Some((rx, start_time, _action)) =
pending_persistence
{
let (persistence_tx, persistence_rx) =
std::sync::mpsc::channel();
tokio::task::spawn_blocking(move || {
let start = Instant::now();
let result =
rx.recv().expect("persistence state channel closed");
let _ = persistence_tx.send((
result,
start_time,
start.elapsed(),
));
});
Some(persistence_rx)
} else {
None
};
let cache_wait = self.payload_validator.wait_for_caches();
let persistence_wait = if let Some(persistence_rx) = persistence_rx
{
let (result, start_time, wait_duration) = persistence_rx
.recv()
.expect("persistence result channel closed");
let _ = self.on_persistence_complete(result, start_time);
Some(wait_duration)
} else {
None
};
debug!(
target: "engine::tree",
?persistence_wait,
execution_cache_wait = ?cache_wait.execution_cache,
sparse_trie_wait = ?cache_wait.sparse_trie,
"Persistence finished and caches updated for reth_newPayload"
);
let start = Instant::now();
let gas_used = payload.gas_used();
let num_hash = payload.num_hash();
let mut output = self.on_new_payload(payload);
let latency = start.elapsed();
self.metrics.engine.new_payload.update_response_metrics(
start,
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
&output,
gas_used,
);
let maybe_event =
output.as_mut().ok().and_then(|out| out.event.take());
let timings = NewPayloadTimings {
latency,
persistence_wait,
execution_cache_wait: cache_wait.execution_cache,
sparse_trie_wait: cache_wait.sparse_trie,
};
if let Err(err) =
tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
BeaconOnNewPayloadError::Internal(Box::new(e))
}))
{
error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
self.metrics
.engine
.failed_new_payload_response_deliveries
.increment(1);
}
self.on_maybe_tree_event(maybe_event)?;
}
}
}
}
@@ -3046,3 +3136,23 @@ enum PersistTarget {
/// Persist all blocks up to and including the canonical head.
Head,
}
/// Result of waiting for caches to become available.
#[derive(Debug, Clone, Copy, Default)]
pub struct CacheWaitDurations {
/// Time spent waiting for the execution cache lock.
pub execution_cache: Duration,
/// Time spent waiting for the sparse trie lock.
pub sparse_trie: Duration,
}
/// Trait for types that can wait for caches to become available.
///
/// This is used by `reth_newPayload` endpoint to ensure that payload processing
/// waits for any ongoing operations to complete before starting.
pub trait WaitForCaches {
/// Waits for cache updates to complete.
///
/// Returns the time spent waiting for each cache separately.
fn wait_for_caches(&self) -> CacheWaitDurations;
}

View File

@@ -2,13 +2,13 @@
use super::precompile_cache::PrecompileCacheMap;
use crate::tree::{
cached_state::{CachedStateMetrics, CachedStateProvider, ExecutionCache, SavedCache},
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
payload_processor::{
prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
sparse_trie::StateRootComputeOutcome,
},
sparse_trie::{SparseTrieCacheTask, SparseTrieTask, SpawnedSparseTrieTask},
StateProviderBuilder, TreeConfig,
sparse_trie::SparseTrieCacheTask,
CacheWaitDurations, StateProviderBuilder, TreeConfig, WaitForCaches,
};
use alloy_eip7928::BlockAccessList;
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
@@ -16,10 +16,11 @@ use alloy_evm::block::StateChangeSource;
use alloy_primitives::B256;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use metrics::{Counter, Histogram};
use multiproof::{SparseTrieUpdate, *};
use multiproof::*;
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
use rayon::prelude::*;
use reth_engine_primitives::{SMALL_BLOCK_GAS_THRESHOLD, SMALL_BLOCK_MULTIPROOF_CHUNK_SIZE};
use reth_evm::{
block::ExecutableTxParts,
execute::{ExecutableTxFor, WithTxEnv},
@@ -27,10 +28,9 @@ use reth_evm::{
SpecFor, TxEnvFor,
};
use reth_metrics::Metrics;
use reth_primitives_traits::NodePrimitives;
use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
use reth_provider::{
BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProvider,
StateProviderFactory, StateReader,
BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader,
};
use reth_revm::{db::BundleState, state::EvmState};
use reth_tasks::{ForEachOrdered, Runtime};
@@ -49,7 +49,7 @@ use std::{
mpsc::{self, channel},
Arc,
},
time::Instant,
time::Duration,
};
use tracing::{debug, debug_span, instrument, warn, Span};
@@ -132,8 +132,6 @@ where
/// re-use allocated memory. Stored with the block hash it was computed for to enable trie
/// preservation across sequential payload validations.
sparse_state_trie: SharedPreservedSparseTrie,
/// Maximum concurrency for prewarm task.
prewarm_max_concurrency: usize,
/// Sparse trie prune depth.
sparse_trie_prune_depth: usize,
/// Maximum storage tries to retain after pruning.
@@ -172,7 +170,6 @@ where
precompile_cache_disabled: config.precompile_cache_disabled(),
precompile_cache_map,
sparse_state_trie: SharedPreservedSparseTrie::default(),
prewarm_max_concurrency: config.prewarm_max_concurrency(),
sparse_trie_prune_depth: config.sparse_trie_prune_depth(),
sparse_trie_max_storage_tries: config.sparse_trie_max_storage_tries(),
disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
@@ -181,6 +178,46 @@ where
}
}
impl<Evm> WaitForCaches for PayloadProcessor<Evm>
where
Evm: ConfigureEvm,
{
fn wait_for_caches(&self) -> CacheWaitDurations {
debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
// Wait for both caches in parallel using std threads
let execution_cache = self.execution_cache.clone();
let sparse_trie = self.sparse_state_trie.clone();
// Use channels and spawn_blocking instead of std::thread::spawn
let (execution_tx, execution_rx) = std::sync::mpsc::channel();
let (sparse_trie_tx, sparse_trie_rx) = std::sync::mpsc::channel();
self.executor.spawn_blocking(move || {
let _ = execution_tx.send(execution_cache.wait_for_availability());
});
self.executor.spawn_blocking(move || {
let _ = sparse_trie_tx.send(sparse_trie.wait_for_availability());
});
let execution_cache_duration =
execution_rx.recv().expect("execution cache wait task failed to send result");
let sparse_trie_duration =
sparse_trie_rx.recv().expect("sparse trie wait task failed to send result");
debug!(
target: "engine::tree::payload_processor",
?execution_cache_duration,
?sparse_trie_duration,
"Execution cache and sparse trie locks acquired"
);
CacheWaitDurations {
execution_cache: execution_cache_duration,
sparse_trie: sparse_trie_duration,
}
}
}
impl<N, Evm> PayloadProcessor<Evm>
where
N: NodePrimitives,
@@ -211,7 +248,7 @@ where
///
/// ## Sparse trie task
///
/// Responsible for calculating the state root based on the received [`SparseTrieUpdate`].
/// Responsible for calculating the state root.
///
/// This task runs until there are no further updates to process.
///
@@ -238,6 +275,7 @@ where
F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
+ Clone
+ Send
+ Sync
+ 'static,
{
// start preparing transactions immediately
@@ -245,66 +283,34 @@ where
self.spawn_tx_iterator(transactions, env.transaction_count);
let span = Span::current();
let (to_sparse_trie, sparse_trie_rx) = channel();
let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
let v2_proofs_enabled = !config.disable_proof_v2();
let parent_state_root = env.parent_state_root;
let transaction_count = env.transaction_count;
let chunk_size = Self::adaptive_chunk_size(config, env.gas_used);
let prewarm_handle = self.spawn_caching_with(
env,
prewarm_rx,
provider_builder.clone(),
provider_builder,
Some(to_multi_proof.clone()),
bal,
v2_proofs_enabled,
);
// Create and spawn the storage proof task.
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let halve_workers = transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
let proof_handle =
ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers, v2_proofs_enabled);
if config.disable_trie_cache() {
let multi_proof_task = MultiProofTask::new(
proof_handle.clone(),
to_sparse_trie,
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
to_multi_proof.clone(),
from_multi_proof.clone(),
)
.with_v2_proofs_enabled(v2_proofs_enabled);
// spawn multi-proof task
let parent_span = span.clone();
let saved_cache = prewarm_handle.saved_cache.clone();
self.executor.spawn_blocking(move || {
let _enter = parent_span.entered();
// Build a state provider for the multiproof task
let provider = provider_builder.build().expect("failed to build provider");
let provider = if let Some(saved_cache) = saved_cache {
let (cache, metrics, _disable_metrics) = saved_cache.split();
Box::new(CachedStateProvider::new(provider, cache, metrics))
as Box<dyn StateProvider>
} else {
Box::new(provider)
};
multi_proof_task.run(provider);
});
}
let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);
// wire the sparse trie to the state root response receiver
let (state_root_tx, state_root_rx) = channel();
// Spawn the sparse trie task using any stored trie and parallel trie configuration.
self.spawn_sparse_trie_task(
sparse_trie_rx,
proof_handle,
state_root_tx,
from_multi_proof,
config,
parent_state_root,
chunk_size,
);
PayloadHandle {
@@ -332,9 +338,7 @@ where
{
let (prewarm_rx, execution_rx) =
self.spawn_tx_iterator(transactions, env.transaction_count);
// This path doesn't use multiproof, so V2 proofs flag doesn't matter
let prewarm_handle =
self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal);
PayloadHandle {
to_multi_proof: None,
prewarm_handle,
@@ -348,19 +352,46 @@ where
/// produce fewer state changes and most workers would be idle overhead.
const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;
/// Transaction count threshold below which sequential signature recovery is used.
/// Transaction count threshold below which sequential conversion is used.
///
/// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
/// (work-stealing setup, channel-based reorder) exceeds the cost of sequential ECDSA
/// recovery. Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach`
/// for small blocks.
/// (work-stealing setup, channel-based reorder) exceeds the cost of sequential conversion.
/// Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach` for small
/// blocks.
const SMALL_BLOCK_TX_THRESHOLD: usize = 30;
/// Number of leading transactions to convert sequentially before entering the rayon
/// parallel path.
///
/// Rayon's work-stealing does not guarantee that index 0 is processed first, so the
/// ordered consumer can block for up to ~1ms waiting for the first slot. By converting
/// a small head sequentially and sending it immediately, execution can start without
/// waiting for rayon scheduling.
const PARALLEL_PREFETCH_COUNT: usize = 4;
/// Returns the multiproof chunk size adapted to the block's gas usage.
///
/// For blocks with ≤20M gas used, a smaller chunk size (30) yields better throughput.
/// For larger blocks, the configured default chunk size is used.
const fn adaptive_chunk_size(config: &TreeConfig, gas_used: u64) -> Option<usize> {
if !config.multiproof_chunking_enabled() {
return None;
}
let size = if gas_used > 0 && gas_used <= SMALL_BLOCK_GAS_THRESHOLD {
SMALL_BLOCK_MULTIPROOF_CHUNK_SIZE
} else {
config.multiproof_chunk_size()
};
Some(size)
}
/// Spawns a task advancing transaction env iterator and streaming updates through a channel.
///
/// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
/// sequential iteration to avoid rayon overhead. For larger blocks, uses rayon parallel
/// iteration with [`ForEachOrdered`] to recover signatures in parallel while streaming
/// iteration with [`ForEachOrdered`] to convert transactions in parallel while streaming
/// results to execution in the original transaction order.
#[expect(clippy::type_complexity)]
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
@@ -369,7 +400,7 @@ where
transactions: I,
transaction_count: usize,
) -> (
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
mpsc::Receiver<(usize, WithTxEnv<TxEnvFor<Evm>, I::Recovered>)>,
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
) {
let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
@@ -379,40 +410,47 @@ where
// Empty block — nothing to do.
} else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
// Sequential path for small blocks — avoids rayon work-stealing setup and
// channel-based reorder overhead when it costs more than the ECDSA recovery itself.
// channel-based reorder overhead when it costs more than sequential conversion.
debug!(
target: "engine::tree::payload_processor",
transaction_count,
"using sequential sig recovery for small block"
);
self.executor.spawn_blocking(move || {
let _enter =
debug_span!(target: "engine::tree::payload_processor", "tx_iterator").entered();
let (transactions, convert) = transactions.into_parts();
for tx in transactions {
let tx = convert.convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = execute_tx.send(tx);
}
convert_serial(transactions.into_iter(), &convert, &prewarm_tx, &execute_tx);
});
} else {
// Parallel path — recover signatures in parallel on rayon, stream results
// to execution in order via `for_each_ordered`.
rayon::spawn(move || {
//
// To avoid a ~1ms stall waiting for rayon to schedule index 0, the first
// few transactions are recovered sequentially and sent immediately before
// entering the parallel iterator for the remainder.
let prefetch = Self::PARALLEL_PREFETCH_COUNT.min(transaction_count);
self.executor.spawn_blocking(move || {
let _enter =
debug_span!(target: "engine::tree::payload_processor", "tx_iterator").entered();
let (transactions, convert) = transactions.into_parts();
transactions
.into_par_iter()
.map(|tx| {
let mut all: Vec<_> = transactions.into_iter().collect();
let rest = all.split_off(prefetch.min(all.len()));
// Convert the first few transactions sequentially so execution can
// start immediately without waiting for rayon work-stealing.
convert_serial(all.into_iter(), &convert, &prewarm_tx, &execute_tx);
// Convert the remaining transactions in parallel.
rest.into_par_iter()
.enumerate()
.map(|(i, tx)| {
let idx = i + prefetch;
let tx = convert.convert(tx);
tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
let tx = WithTxEnv { tx_env, tx: Arc::new(tx) };
// Send to prewarming out of order — order doesn't matter there.
let _ = prewarm_tx.send(tx.clone());
let _ = prewarm_tx.send((idx, tx.clone()));
tx
})
})
@@ -430,16 +468,15 @@ where
level = "debug",
target = "engine::tree::payload_processor",
skip_all,
fields(bal=%bal.is_some(), %v2_proofs_enabled)
fields(bal=%bal.is_some())
)]
fn spawn_caching_with<P>(
&self,
env: ExecutionEnv<Evm>,
transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
provider_builder: StateProviderBuilder<N, P>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
bal: Option<Arc<BlockAccessList>>,
v2_proofs_enabled: bool,
) -> CacheTaskHandle<N::Receipt>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
@@ -459,7 +496,6 @@ where
terminate_execution: Arc::new(AtomicBool::new(false)),
precompile_cache_disabled: self.precompile_cache_disabled,
precompile_cache_map: self.precompile_cache_map.clone(),
v2_proofs_enabled,
};
let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
@@ -467,12 +503,11 @@ where
self.execution_cache.clone(),
prewarm_ctx,
to_multi_proof,
self.prewarm_max_concurrency,
);
{
let to_prewarm_task = to_prewarm_task.clone();
self.executor.spawn_blocking(move || {
self.executor.spawn_blocking_named("prewarm", move || {
let mode = if skip_prewarm {
PrewarmMode::Skipped
} else if let Some(bal) = bal {
@@ -507,30 +542,26 @@ where
}
}
/// Spawns the [`SparseTrieTask`] for this payload processor.
/// Spawns the [`SparseTrieCacheTask`] for this payload processor.
///
/// The trie is preserved when the new payload is a child of the previous one.
fn spawn_sparse_trie_task(
&self,
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
proof_worker_handle: ProofWorkerHandle,
state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
from_multi_proof: CrossbeamReceiver<MultiProofMessage>,
config: &TreeConfig,
parent_state_root: B256,
chunk_size: Option<usize>,
) {
let preserved_sparse_trie = self.sparse_state_trie.clone();
let trie_metrics = self.trie_metrics.clone();
let disable_trie_cache = config.disable_trie_cache();
let prune_depth = self.sparse_trie_prune_depth;
let max_storage_tries = self.sparse_trie_max_storage_tries;
let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
let chunk_size =
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size());
let executor = self.executor.clone();
let parent_span = Span::current();
self.executor.spawn_blocking(move || {
self.executor.spawn_blocking_named("sparse-trie", move || {
let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
.entered();
@@ -562,23 +593,14 @@ where
.with_updates(true)
});
let mut task = if disable_trie_cache {
SpawnedSparseTrieTask::Cleared(SparseTrieTask::new(
sparse_trie_rx,
proof_worker_handle,
trie_metrics.clone(),
sparse_state_trie,
))
} else {
SpawnedSparseTrieTask::Cached(SparseTrieCacheTask::new_with_trie(
&executor,
from_multi_proof,
proof_worker_handle,
trie_metrics.clone(),
sparse_state_trie.with_skip_proof_node_filtering(true),
chunk_size,
))
};
let mut task = SparseTrieCacheTask::new_with_trie(
&executor,
from_multi_proof,
proof_worker_handle,
trie_metrics.clone(),
sparse_state_trie.with_skip_proof_node_filtering(true),
chunk_size,
);
let result = task.run();
// Capture the computed state_root before sending the result
@@ -615,7 +637,7 @@ where
let _enter =
debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
let deferred = if let Some(state_root) = computed_state_root {
let start = std::time::Instant::now();
let start = Instant::now();
let (trie, deferred) = task.into_trie_for_reuse(
prune_depth,
max_storage_tries,
@@ -697,6 +719,30 @@ where
}
}
/// Converts transactions sequentially and sends them to the prewarm and execute channels.
fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
iter: impl Iterator<Item = RawTx>,
convert: &C,
prewarm_tx: &mpsc::SyncSender<(usize, WithTxEnv<TxEnv, Recovered>)>,
execute_tx: &mpsc::SyncSender<Result<WithTxEnv<TxEnv, Recovered>, Err>>,
) where
Tx: ExecutableTxParts<TxEnv, InnerTx, Recovered = Recovered>,
TxEnv: Clone,
C: ConvertTx<RawTx, Tx = Tx, Error = Err>,
{
for (idx, raw_tx) in iter.enumerate() {
let tx = convert.convert(raw_tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
if let Ok(tx) = &tx {
let _ = prewarm_tx.send((idx, tx.clone()));
}
let _ = execute_tx.send(tx);
}
}
/// Handle to all the spawned tasks.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
@@ -949,6 +995,27 @@ impl PayloadExecutionCache {
self.inner.write().take();
}
/// Waits until the execution cache becomes available for use.
///
/// This acquires a write lock to ensure exclusive access, then immediately releases it.
/// This is useful for synchronization before starting payload processing.
///
/// Returns the time spent waiting for the lock.
pub fn wait_for_availability(&self) -> Duration {
let start = Instant::now();
// Acquire write lock to wait for any current holders to finish
let _guard = self.inner.write();
let elapsed = start.elapsed();
if elapsed.as_millis() > 5 {
debug!(
target: "engine::tree::payload_processor",
blocked_for=?elapsed,
"Waited for execution cache to become available"
);
}
elapsed
}
/// Updates the cache with a closure that has exclusive access to the guard.
/// This ensures that all cache operations happen atomically.
///
@@ -999,6 +1066,9 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
/// Used to determine parallel worker count for prewarming.
/// A value of 0 indicates the count is unknown.
pub transaction_count: usize,
/// Total gas used by all transactions in the block.
/// Used to adaptively select multiproof chunk size for optimal throughput.
pub gas_used: u64,
/// Withdrawals included in the block.
/// Used to generate prefetch targets for withdrawal addresses.
pub withdrawals: Option<Vec<Withdrawal>>,
@@ -1015,6 +1085,7 @@ where
parent_hash: Default::default(),
parent_state_root: Default::default(),
transaction_count: 0,
gas_used: 0,
withdrawals: None,
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -3,7 +3,7 @@
use alloy_primitives::B256;
use parking_lot::Mutex;
use reth_trie_sparse::SparseStateTrie;
use std::sync::Arc;
use std::{sync::Arc, time::Instant};
use tracing::debug;
/// Type alias for the sparse trie type used in preservation.
@@ -12,7 +12,7 @@ pub(super) type SparseTrie = SparseStateTrie;
/// Shared handle to a preserved sparse trie that can be reused across payload validations.
///
/// This is stored in [`PayloadProcessor`](super::PayloadProcessor) and cloned to pass to
/// [`SparseTrieTask`](super::sparse_trie::SparseTrieTask) for trie reuse.
/// [`SparseTrieCacheTask`](super::sparse_trie::SparseTrieCacheTask) for trie reuse.
#[derive(Debug, Default, Clone)]
pub(super) struct SharedPreservedSparseTrie(Arc<Mutex<Option<PreservedSparseTrie>>>);
@@ -28,6 +28,27 @@ impl SharedPreservedSparseTrie {
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
PreservedTrieGuard(self.0.lock())
}
/// Waits until the sparse trie lock becomes available.
///
/// This acquires and immediately releases the lock, ensuring that any
/// ongoing operations complete before returning. Useful for synchronization
/// before starting payload processing.
///
/// Returns the time spent waiting for the lock.
pub(super) fn wait_for_availability(&self) -> std::time::Duration {
let start = Instant::now();
let _guard = self.0.lock();
let elapsed = start.elapsed();
if elapsed.as_millis() > 5 {
debug!(
target: "engine::tree::payload_processor",
blocked_for=?elapsed,
"Waited for preserved sparse trie to become available"
);
}
elapsed
}
}
/// Guard that holds the lock on the preserved trie.

View File

@@ -13,11 +13,7 @@
use crate::tree::{
cached_state::{CachedStateProvider, SavedCache},
payload_processor::{
bal::{self, total_slots, BALSlotIter},
multiproof::{MultiProofMessage, VersionedMultiProofTargets},
PayloadExecutionCache,
},
payload_processor::{bal, multiproof::MultiProofMessage, PayloadExecutionCache},
precompile_cache::{CachedPrecompile, PrecompileCacheMap},
ExecutionEnv, StateProviderBuilder,
};
@@ -25,35 +21,32 @@ use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_eips::eip4895::Withdrawal;
use alloy_evm::Database;
use alloy_primitives::{keccak256, map::B256Set, B256};
use alloy_primitives::{keccak256, StorageKey, B256};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use metrics::{Counter, Gauge, Histogram};
use rayon::prelude::*;
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
use reth_metrics::Metrics;
use reth_primitives_traits::NodePrimitives;
use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
use reth_provider::{
AccountReader, BlockExecutionOutput, BlockReader, StateProvider, StateProviderFactory,
StateReader,
};
use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_tasks::Runtime;
use reth_trie::MultiProofTargets;
use std::{
ops::Range,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, channel, Receiver, Sender, SyncSender},
Arc,
},
time::Instant,
use reth_trie_parallel::targets_v2::MultiProofTargetsV2;
use std::sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, channel, Receiver, Sender, SyncSender},
Arc,
};
use tracing::{debug, debug_span, instrument, trace, warn, Span};
/// Determines the prewarming mode: transaction-based, BAL-based, or skipped.
#[derive(Debug)]
pub enum PrewarmMode<Tx> {
/// Prewarm by executing transactions from a stream.
Transactions(Receiver<Tx>),
/// Prewarm by executing transactions from a stream, each paired with its block index.
Transactions(Receiver<(usize, Tx)>),
/// Prewarm by prefetching slots from a Block Access List.
BlockAccessList(Arc<BlockAccessList>),
/// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
@@ -86,8 +79,6 @@ where
execution_cache: PayloadExecutionCache,
/// Context provided to execution tasks
ctx: PrewarmContext<N, P, Evm>,
/// How many transactions should be executed in parallel
max_concurrency: usize,
/// Sender to emit evm state outcome messages, if any.
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
/// Receiver for events produced by tx execution
@@ -108,13 +99,12 @@ where
execution_cache: PayloadExecutionCache,
ctx: PrewarmContext<N, P, Evm>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
max_concurrency: usize,
) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
let (actions_tx, actions_rx) = channel();
trace!(
target: "engine::tree::payload_processor::prewarm",
max_concurrency,
prewarming_threads = executor.prewarming_pool().current_num_threads(),
transaction_count = ctx.env.transaction_count,
"Initialized prewarm task"
);
@@ -124,7 +114,6 @@ where
executor,
execution_cache,
ctx,
max_concurrency,
to_multi_proof,
actions_rx,
parent_span: Span::current(),
@@ -140,7 +129,7 @@ where
/// subsequent transactions in the block.
fn spawn_all<Tx>(
&self,
pending: mpsc::Receiver<Tx>,
pending: mpsc::Receiver<(usize, Tx)>,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
) where
@@ -148,20 +137,18 @@ where
{
let executor = self.executor.clone();
let ctx = self.ctx.clone();
let max_concurrency = self.max_concurrency;
let span = Span::current();
self.executor.spawn_blocking(move || {
self.executor.spawn_blocking_named("prewarm-spawn", move || {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
// When transaction_count is 0, it means the count is unknown. In this case, spawn
// max workers to handle potentially many transactions in parallel rather
// than bottlenecking on a single worker.
let transaction_count = ctx.env.transaction_count;
let workers_needed = if transaction_count == 0 {
max_concurrency
let pool_threads = executor.prewarming_pool().current_num_threads();
// Don't spawn more workers than transactions. When transaction_count is 0
// (unknown), use all pool threads.
let workers_needed = if ctx.env.transaction_count > 0 {
ctx.env.transaction_count.min(pool_threads)
} else {
transaction_count.min(max_concurrency)
pool_threads
};
let (done_tx, done_rx) = mpsc::sync_channel(workers_needed);
@@ -170,8 +157,8 @@ where
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, to_multi_proof.clone(), done_tx.clone());
// Distribute transactions to workers
let mut tx_index = 0usize;
while let Ok(tx) = pending.recv() {
let mut tx_count = 0usize;
while let Ok((tx_index, tx)) = pending.recv() {
// Stop distributing if termination was requested
if ctx.terminate_execution.load(Ordering::Relaxed) {
trace!(
@@ -188,7 +175,7 @@ where
// exit early when signaled.
let _ = tx_sender.send(indexed_tx);
tx_index += 1;
tx_count += 1;
}
// Send withdrawal prefetch targets after all transactions have been distributed
@@ -196,9 +183,9 @@ where
&& let Some(withdrawals) = &ctx.env.withdrawals
&& !withdrawals.is_empty()
{
let targets =
multiproof_targets_from_withdrawals(withdrawals, ctx.v2_proofs_enabled);
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
let targets = multiproof_targets_from_withdrawals(withdrawals);
let _ = to_multi_proof
.send(MultiProofMessage::PrefetchProofs(targets));
}
// drop sender and wait for all tasks to finish
@@ -207,7 +194,7 @@ where
while done_rx.recv().is_ok() {}
let _ = actions_tx
.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_index });
.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_count });
});
}
@@ -274,10 +261,8 @@ where
}
}
/// Runs BAL-based prewarming by spawning workers to prefetch storage slots.
///
/// Divides the total slots across `max_concurrency` workers, each responsible for
/// prefetching a range of slots from the BAL.
/// Runs BAL-based prewarming by using the prewarming pool's parallel iterator to prefetch
/// accounts and storage slots.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn run_bal_prewarm(
&self,
@@ -296,59 +281,35 @@ where
return;
}
let total_slots = total_slots(&bal);
trace!(
target: "engine::tree::payload_processor::prewarm",
total_slots,
max_concurrency = self.max_concurrency,
"Starting BAL prewarm"
);
if total_slots == 0 {
if bal.is_empty() {
self.send_bal_hashed_state(&bal);
let _ =
actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
return;
}
// Calculate number of workers needed (at most max_concurrency)
let workers_needed = total_slots.min(self.max_concurrency);
trace!(
target: "engine::tree::payload_processor::prewarm",
accounts = bal.len(),
"Starting BAL prewarm"
);
let (done_tx, done_rx) = mpsc::sync_channel(workers_needed);
// Calculate slots per worker
let slots_per_worker = total_slots / workers_needed;
let remainder = total_slots % workers_needed;
// Spawn workers with their assigned ranges
for i in 0..workers_needed {
let start = i * slots_per_worker + i.min(remainder);
let extra = if i < remainder { 1 } else { 0 };
let end = start + slots_per_worker + extra;
self.ctx.spawn_bal_worker(
i,
&self.executor,
Arc::clone(&bal),
start..end,
done_tx.clone(),
let ctx = self.ctx.clone();
self.executor.prewarming_pool().install_fn(|| {
bal.par_iter().for_each_init(
|| (ctx.clone(), None::<CachedStateProvider<reth_provider::StateProviderBox>>),
|(ctx, provider), account| {
if ctx.terminate_execution.load(Ordering::Relaxed) {
return;
}
ctx.prefetch_bal_account(provider, account);
},
);
}
// Drop our handle to done_tx so we can detect completion
drop(done_tx);
// Wait for all workers to complete
let mut completed_workers = 0;
while done_rx.recv().is_ok() {
completed_workers += 1;
}
});
trace!(
target: "engine::tree::payload_processor::prewarm",
completed_workers,
"All BAL prewarm workers completed"
"All BAL prewarm accounts completed"
);
// Convert BAL to HashedPostState and send to multiproof task
@@ -491,8 +452,6 @@ where
pub precompile_cache_disabled: bool,
/// The precompile cache map.
pub precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
/// Whether V2 proof calculation is enabled.
pub v2_proofs_enabled: bool,
}
impl<N, P, Evm> PrewarmContext<N, P, Evm>
@@ -501,12 +460,9 @@ where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
Evm: ConfigureEvm<Primitives = N> + 'static,
{
/// Splits this context into an evm, an evm config, metrics, the atomic bool for terminating
/// execution, and whether V2 proofs are enabled.
/// Splits this context into an evm, metrics, and the atomic bool for terminating execution.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn evm_for_ctx(
self,
) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>, bool)> {
fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
let Self {
env,
evm_config,
@@ -516,7 +472,6 @@ where
terminate_execution,
precompile_cache_disabled,
precompile_cache_map,
v2_proofs_enabled,
} = self;
let mut state_provider = match provider.build() {
@@ -567,7 +522,7 @@ where
});
}
Some((evm, metrics, terminate_execution, v2_proofs_enabled))
Some((evm, metrics, terminate_execution))
}
/// Accepts a [`CrossbeamReceiver`] of transactions and a handle to prewarm task. Executes
@@ -580,7 +535,6 @@ where
///
/// Note: There are no ordering guarantees; this does not reflect the state produced by
/// sequential execution.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn transact_batch<Tx>(
self,
txs: CrossbeamReceiver<IndexedTransaction<Tx>>,
@@ -589,16 +543,13 @@ where
) where
Tx: ExecutableTxFor<Evm>,
{
let Some((mut evm, metrics, terminate_execution, v2_proofs_enabled)) = self.evm_for_ctx()
else {
return
};
let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
let _enter = debug_span!(
target: "engine::tree::payload_processor::prewarm",
"prewarm tx",
index,
i=index,
)
.entered();
@@ -637,8 +588,7 @@ where
// Only send outcome for transactions after the first txn
// as the main execution will be just as fast
if index > 0 {
let (targets, storage_targets) =
multiproof_targets_from_state(res.state, v2_proofs_enabled);
let (targets, storage_targets) = multiproof_targets_from_state(res.state);
metrics.prefetch_storage_targets.record(storage_targets as f64);
if let Some(to_multi_proof) = &to_multi_proof {
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
@@ -668,171 +618,72 @@ where
let (tx_sender, tx_receiver) = crossbeam_channel::unbounded();
// Spawn workers that all pull from the shared receiver
let executor = task_executor.clone();
let span = Span::current();
task_executor.spawn_blocking(move || {
let _enter = span.entered();
for idx in 0..workers_needed {
let ctx = self.clone();
let to_multi_proof = to_multi_proof.clone();
let done_tx = done_tx.clone();
let rx = tx_receiver.clone();
let span = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
executor.spawn_blocking(move || {
let _enter = span.entered();
ctx.transact_batch(rx, to_multi_proof, done_tx);
});
}
});
for idx in 0..workers_needed {
let ctx = self.clone();
let to_multi_proof = to_multi_proof.clone();
let done_tx = done_tx.clone();
let rx = tx_receiver.clone();
let span = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: &span, "prewarm_worker", idx);
task_executor.prewarming_pool().spawn(move || {
let _enter = span.entered();
ctx.transact_batch(rx, to_multi_proof, done_tx);
});
}
tx_sender
}
/// Spawns a worker task for BAL slot prefetching.
/// Prefetches a single account and all its storage slots from the BAL into the cache.
///
/// The worker iterates over the specified range of slots in the BAL and ensures
/// each slot is loaded into the cache by accessing it through the state provider.
fn spawn_bal_worker(
/// The `provider` is lazily initialized on first call and reused across accounts on the same
/// thread.
fn prefetch_bal_account(
&self,
idx: usize,
executor: &Runtime,
bal: Arc<BlockAccessList>,
range: Range<usize>,
done_tx: SyncSender<()>,
provider: &mut Option<CachedStateProvider<reth_provider::StateProviderBox>>,
account: &alloy_eip7928::AccountChanges,
) {
let ctx = self.clone();
let span = debug_span!(
target: "engine::tree::payload_processor::prewarm",
"bal prewarm worker",
idx,
range_start = range.start,
range_end = range.end
);
executor.spawn_blocking(move || {
let _enter = span.entered();
ctx.prefetch_bal_slots(bal, range, done_tx);
});
}
/// Prefetches storage slots from a BAL range into the cache.
///
/// This iterates through the specified range of slots and accesses them via the state
/// provider to populate the cache.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn prefetch_bal_slots(
self,
bal: Arc<BlockAccessList>,
range: Range<usize>,
done_tx: SyncSender<()>,
) {
let Self { saved_cache, provider, metrics, .. } = self;
// Build state provider
let state_provider = match provider.build() {
Ok(provider) => provider,
Err(err) => {
trace!(
target: "engine::tree::payload_processor::prewarm",
%err,
"Failed to build state provider in BAL prewarm thread"
);
let _ = done_tx.send(());
return;
let state_provider = match provider {
Some(p) => p,
slot @ None => {
let built = match self.provider.build() {
Ok(p) => p,
Err(err) => {
trace!(
target: "engine::tree::payload_processor::prewarm",
%err,
"Failed to build state provider in BAL prewarm thread"
);
return;
}
};
let saved_cache =
self.saved_cache.as_ref().expect("BAL prewarm should only run with cache");
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
slot.insert(CachedStateProvider::new(built, caches, cache_metrics))
}
};
// Wrap with cache (guaranteed to be Some since run_bal_prewarm checks)
let saved_cache = saved_cache.expect("BAL prewarm should only run with cache");
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
let state_provider = CachedStateProvider::new(state_provider, caches, cache_metrics);
let start = Instant::now();
// Track last seen address to avoid fetching the same account multiple times.
let mut last_address = None;
let _ = state_provider.basic_account(&account.address);
// Iterate through the assigned range of slots
for (address, slot) in BALSlotIter::new(&bal, range.clone()) {
// Fetch the account if this is a different address than the last one
if last_address != Some(address) {
let _ = state_provider.basic_account(&address);
last_address = Some(address);
}
// Access the slot to populate the cache
let _ = state_provider.storage(address, slot);
for slot in &account.storage_changes {
let _ = state_provider.storage(account.address, StorageKey::from(slot.slot));
}
for &slot in &account.storage_reads {
let _ = state_provider.storage(account.address, StorageKey::from(slot));
}
let elapsed = start.elapsed();
trace!(
target: "engine::tree::payload_processor::prewarm",
?range,
elapsed_ms = elapsed.as_millis(),
"BAL prewarm worker completed"
);
// Signal completion
let _ = done_tx.send(());
metrics.bal_slot_iteration_duration.record(elapsed.as_secs_f64());
self.metrics.bal_slot_iteration_duration.record(start.elapsed().as_secs_f64());
}
}
/// Returns a set of [`VersionedMultiProofTargets`] and the total amount of storage targets, based
/// on the given state.
fn multiproof_targets_from_state(
state: EvmState,
v2_enabled: bool,
) -> (VersionedMultiProofTargets, usize) {
if v2_enabled {
multiproof_targets_v2_from_state(state)
} else {
multiproof_targets_legacy_from_state(state)
}
}
/// Returns legacy [`MultiProofTargets`] and the total amount of storage targets, based on the
/// Returns a set of [`MultiProofTargetsV2`] and the total amount of storage targets, based on the
/// given state.
fn multiproof_targets_legacy_from_state(state: EvmState) -> (VersionedMultiProofTargets, usize) {
let mut targets = MultiProofTargets::with_capacity(state.len());
let mut storage_targets = 0;
for (addr, account) in state {
// if the account was not touched, or if the account was selfdestructed, do not
// fetch proofs for it
//
// Since selfdestruct can only happen in the same transaction, we can skip
// prefetching proofs for selfdestructed accounts
//
// See: https://eips.ethereum.org/EIPS/eip-6780
if !account.is_touched() || account.is_selfdestructed() {
continue
}
let mut storage_set =
B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
for (key, slot) in account.storage {
// do nothing if unchanged
if !slot.is_changed() {
continue
}
storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
}
storage_targets += storage_set.len();
targets.insert(keccak256(addr), storage_set);
}
(VersionedMultiProofTargets::Legacy(targets), storage_targets)
}
/// Returns V2 [`reth_trie_parallel::targets_v2::MultiProofTargetsV2`] and the total amount of
/// storage targets, based on the given state.
fn multiproof_targets_v2_from_state(state: EvmState) -> (VersionedMultiProofTargets, usize) {
fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargetsV2, usize) {
use reth_trie::proof_v2;
use reth_trie_parallel::targets_v2::MultiProofTargetsV2;
let mut targets = MultiProofTargetsV2::default();
let mut storage_target_count = 0;
@@ -868,27 +719,17 @@ fn multiproof_targets_v2_from_state(state: EvmState) -> (VersionedMultiProofTarg
}
}
(VersionedMultiProofTargets::V2(targets), storage_target_count)
(targets, storage_target_count)
}
/// Returns [`VersionedMultiProofTargets`] for withdrawal addresses.
/// Returns [`MultiProofTargetsV2`] for withdrawal addresses.
///
/// Withdrawals only modify account balances (no storage), so the targets contain
/// only account-level entries with empty storage sets.
fn multiproof_targets_from_withdrawals(
withdrawals: &[Withdrawal],
v2_enabled: bool,
) -> VersionedMultiProofTargets {
use reth_trie_parallel::targets_v2::MultiProofTargetsV2;
if v2_enabled {
VersionedMultiProofTargets::V2(MultiProofTargetsV2 {
account_targets: withdrawals.iter().map(|w| keccak256(w.address).into()).collect(),
..Default::default()
})
} else {
VersionedMultiProofTargets::Legacy(
withdrawals.iter().map(|w| (keccak256(w.address), Default::default())).collect(),
)
fn multiproof_targets_from_withdrawals(withdrawals: &[Withdrawal]) -> MultiProofTargetsV2 {
MultiProofTargetsV2 {
account_targets: withdrawals.iter().map(|w| keccak256(w.address).into()).collect(),
..Default::default()
}
}

View File

@@ -3,212 +3,35 @@
use crate::tree::{
multiproof::{
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
VersionedMultiProofTargets, DEFAULT_MAX_TARGETS_FOR_CHUNKING,
DEFAULT_MAX_TARGETS_FOR_CHUNKING,
},
payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate},
payload_processor::multiproof::MultiProofTaskMetrics,
};
use alloy_primitives::B256;
use alloy_rlp::{Decodable, Encodable};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use rayon::iter::ParallelIterator;
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
use reth_primitives_traits::{Account, FastInstant as Instant, ParallelBridgeBuffered};
use reth_tasks::Runtime;
use reth_trie::{
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
TrieAccount, EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount,
EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
};
use reth_trie_parallel::{
proof_task::{
AccountMultiproofInput, ProofResult, ProofResultContext, ProofResultMessage,
ProofWorkerHandle,
AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
},
root::ParallelStateRootError,
targets_v2::MultiProofTargetsV2,
};
#[cfg(feature = "trie-debug")]
use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
use reth_trie_sparse::{
errors::{SparseStateTrieResult, SparseTrieErrorKind, SparseTrieResult},
provider::{TrieNodeProvider, TrieNodeProviderFactory},
DeferredDrops, LeafUpdate, ParallelSparseTrie, SparseStateTrie, SparseTrie,
errors::SparseTrieResult, DeferredDrops, LeafUpdate, ParallelSparseTrie, SparseStateTrie,
SparseTrie,
};
use revm_primitives::{hash_map::Entry, B256Map};
use smallvec::SmallVec;
use std::{
sync::mpsc,
time::{Duration, Instant},
};
use tracing::{debug, debug_span, error, instrument, trace};
#[expect(clippy::large_enum_variant)]
pub(super) enum SpawnedSparseTrieTask<BPF, A, S>
where
BPF: TrieNodeProviderFactory + Send + Sync,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
Cleared(SparseTrieTask<BPF, A, S>),
Cached(SparseTrieCacheTask<A, S>),
}
impl<BPF, A, S> SpawnedSparseTrieTask<BPF, A, S>
where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
match self {
Self::Cleared(task) => task.run(),
Self::Cached(task) => task.run(),
}
}
pub(super) fn into_trie_for_reuse(
self,
prune_depth: usize,
max_storage_tries: usize,
max_nodes_capacity: usize,
max_values_capacity: usize,
disable_pruning: bool,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
match self {
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
Self::Cached(task) => task.into_trie_for_reuse(
prune_depth,
max_storage_tries,
max_nodes_capacity,
max_values_capacity,
disable_pruning,
),
}
}
pub(super) fn into_cleared_trie(
self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
match self {
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
Self::Cached(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
}
}
}
/// A task responsible for populating the sparse trie.
pub(super) struct SparseTrieTask<BPF, A = ParallelSparseTrie, S = ParallelSparseTrie>
where
BPF: TrieNodeProviderFactory + Send + Sync,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
{
/// Receives updates from the state root task.
pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
/// `SparseStateTrie` used for computing the state root.
pub(super) trie: SparseStateTrie<A, S>,
pub(super) metrics: MultiProofTaskMetrics,
/// Trie node provider factory.
blinded_provider_factory: BPF,
}
impl<BPF, A, S> SparseTrieTask<BPF, A, S>
where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
/// Creates a new sparse trie task with the given trie.
pub(super) const fn new(
updates: mpsc::Receiver<SparseTrieUpdate>,
blinded_provider_factory: BPF,
metrics: MultiProofTaskMetrics,
trie: SparseStateTrie<A, S>,
) -> Self {
Self { updates, metrics, trie, blinded_provider_factory }
}
/// Runs the sparse trie task to completion, computing the state root.
///
/// Receives [`SparseTrieUpdate`]s until the channel is closed, applying each update
/// to the trie. Once all updates are processed, computes and returns the final state root.
#[instrument(
name = "SparseTrieTask::run",
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
let now = Instant::now();
let mut num_iterations = 0;
while let Ok(mut update) = self.updates.recv() {
num_iterations += 1;
let mut num_updates = 1;
let _enter =
debug_span!(target: "engine::tree::payload_processor::sparse_trie", "drain updates")
.entered();
while let Ok(next) = self.updates.try_recv() {
update.extend(next);
num_updates += 1;
}
drop(_enter);
debug!(
target: "engine::root",
num_updates,
account_proofs = update.multiproof.account_proofs_len(),
storage_proofs = update.multiproof.storage_proofs_len(),
"Updating sparse trie"
);
let elapsed =
update_sparse_trie(&mut self.trie, update, &self.blinded_provider_factory)
.map_err(|e| {
ParallelStateRootError::Other(format!(
"could not calculate state root: {e:?}"
))
})?;
self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
}
debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
let start = Instant::now();
let (state_root, trie_updates) =
self.trie.root_with_updates(&self.blinded_provider_factory).map_err(|e| {
ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
})?;
let end = Instant::now();
self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
Ok(StateRootComputeOutcome { state_root, trie_updates })
}
/// Clears and shrinks the trie, discarding all state.
///
/// Use this when the payload was invalid or cancelled - we don't want to preserve
/// potentially invalid trie state, but we keep the allocations for reuse.
pub(super) fn into_cleared_trie(
self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
let Self { mut trie, .. } = self;
trie.clear();
trie.shrink_to(max_nodes_capacity, max_values_capacity);
let deferred = trie.take_deferred_drops();
(trie, deferred)
}
}
use tracing::{debug, debug_span, error, instrument, trace_span};
/// Maximum number of pending/prewarm updates that we accumulate in memory before actually applying.
const MAX_PENDING_UPDATES: usize = 100;
@@ -295,7 +118,7 @@ where
let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
let parent_span = tracing::Span::current();
executor.spawn_blocking(move || {
executor.spawn_blocking_named("trie-hashing", move || {
let _span = debug_span!(parent: parent_span, "run_hashing_task").entered();
Self::run_hashing_task(updates, hashed_state_tx)
});
@@ -335,7 +158,7 @@ where
SparseTrieTaskMessage::PrefetchProofs(targets)
}
MultiProofMessage::StateUpdate(_, state) => {
let _span = debug_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing state update", update_len = state.len()).entered();
let _span = debug_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing_state_update", n = state.len()).entered();
let hashed = evm_state_to_hashed_post_state(state);
SparseTrieTaskMessage::HashedState(hashed)
}
@@ -429,14 +252,10 @@ where
let Ok(result) = message else {
unreachable!("we own the sender half")
};
let ProofResult::V2(mut result) = result.result? else {
unreachable!("sparse trie as cache must only be used with multiproof v2");
};
let mut result = result.result?;
while let Ok(next) = self.proof_result_rx.try_recv() {
let ProofResult::V2(res) = next.result? else {
unreachable!("sparse trie as cache must only be used with multiproof v2");
};
let res = next.result?;
result.extend(res);
}
@@ -478,11 +297,19 @@ where
ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
})?;
#[cfg(feature = "trie-debug")]
let debug_recorders = self.trie.take_debug_recorders();
let end = Instant::now();
self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
Ok(StateRootComputeOutcome { state_root, trie_updates })
Ok(StateRootComputeOutcome {
state_root,
trie_updates,
#[cfg(feature = "trie-debug")]
debug_recorders,
})
}
/// Processes a [`SparseTrieTaskMessage`] from the hashing task.
@@ -501,11 +328,7 @@ where
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn on_prewarm_targets(&mut self, targets: VersionedMultiProofTargets) {
let VersionedMultiProofTargets::V2(targets) = targets else {
unreachable!("sparse trie as cache must only be used with V2 multiproof targets");
};
fn on_prewarm_targets(&mut self, targets: MultiProofTargetsV2) {
for target in targets.account_targets {
// Only touch accounts that are not yet present in the updates set.
self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
@@ -655,7 +478,7 @@ where
})
.par_bridge_buffered()
.map(|(address, updates, mut fetched, mut trie)| {
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage trie leaf updates", ?address).entered();
let _enter = trace_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
let mut targets = Vec::new();
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
@@ -754,7 +577,7 @@ where
})
.par_bridge_buffered()
.for_each(|(address, trie)| {
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage root", ?address).entered();
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_root", ?address).entered();
trie.root().expect("updates are drained, trie should be revealed by now");
});
drop(span);
@@ -771,18 +594,7 @@ where
return true;
} else if let Some(account) = account.take() {
let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
let encoded = if account.is_none_or(|account| account.is_empty()) &&
storage_root == EMPTY_ROOT_HASH
{
Vec::new()
} else {
account_rlp_buf.clear();
account
.unwrap_or_default()
.into_trie_account(storage_root)
.encode(account_rlp_buf);
account_rlp_buf.clone()
};
let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
num_promoted += 1;
return false;
@@ -790,13 +602,13 @@ where
}
// Get the current account state either from the trie or from latest account update.
let trie_account = if let Some(LeafUpdate::Changed(encoded)) = self.account_updates.get(addr) {
Some(encoded).filter(|encoded| !encoded.is_empty())
} else if !self.account_updates.contains_key(addr) {
self.trie.get_account_value(addr)
} else {
let trie_account = match self.account_updates.get(addr) {
Some(LeafUpdate::Changed(encoded)) => {
Some(encoded).filter(|encoded| !encoded.is_empty())
}
// Needs to be revealed first
return true;
Some(LeafUpdate::Touched) => return true,
None => self.trie.get_account_value(addr),
};
let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
@@ -813,13 +625,7 @@ where
(trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
};
let encoded = if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
Vec::new()
} else {
account_rlp_buf.clear();
account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
account_rlp_buf.clone()
};
let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
num_promoted += 1;
@@ -858,7 +664,7 @@ where
MultiProofTargetsV2::chunks,
|proof_targets| {
if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(
AccountMultiproofInput::V2 {
AccountMultiproofInput {
targets: proof_targets,
proof_result_sender: ProofResultContext::new(
self.proof_result_tx.clone(),
@@ -876,12 +682,27 @@ where
}
}
/// RLP-encodes the account as a [`TrieAccount`] leaf value, or returns empty for deletions.
fn encode_account_leaf_value(
account: Option<Account>,
storage_root: B256,
account_rlp_buf: &mut Vec<u8>,
) -> Vec<u8> {
if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
return Vec::new();
}
account_rlp_buf.clear();
account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
account_rlp_buf.clone()
}
/// Message type for the sparse trie task.
enum SparseTrieTaskMessage {
/// A hashed state update ready to be processed.
HashedState(HashedPostState),
/// Prefetch proof targets (passed through directly).
PrefetchProofs(VersionedMultiProofTargets),
PrefetchProofs(MultiProofTargetsV2),
/// Signals that all state updates have been received.
FinishedStateUpdates,
}
@@ -894,166 +715,16 @@ pub struct StateRootComputeOutcome {
pub state_root: B256,
/// The trie updates.
pub trie_updates: TrieUpdates,
}
/// Updates the sparse trie with the given proofs and state, and returns the elapsed time.
#[instrument(level = "debug", target = "engine::tree::payload_processor::sparse_trie", skip_all)]
pub(crate) fn update_sparse_trie<BPF, A, S>(
trie: &mut SparseStateTrie<A, S>,
SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
blinded_provider_factory: &BPF,
) -> SparseStateTrieResult<Duration>
where
BPF: TrieNodeProviderFactory + Send + Sync,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
trace!(target: "engine::root::sparse", "Updating sparse trie");
let started_at = Instant::now();
// Reveal new accounts and storage slots.
match multiproof {
ProofResult::Legacy(decoded, _) => {
trie.reveal_decoded_multiproof(decoded)?;
}
ProofResult::V2(decoded_v2) => {
trie.reveal_decoded_multiproof_v2(decoded_v2)?;
}
}
let reveal_multiproof_elapsed = started_at.elapsed();
trace!(
target: "engine::root::sparse",
?reveal_multiproof_elapsed,
"Done revealing multiproof"
);
// Update storage slots with new values and calculate storage roots.
let span = tracing::Span::current();
let results: Vec<_> = state
.storages
.into_iter()
.map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
.par_bridge_buffered()
.map(|(address, storage, storage_trie)| {
let _enter =
debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage trie", ?address)
.entered();
trace!(target: "engine::tree::payload_processor::sparse_trie", "Updating storage");
let storage_provider = blinded_provider_factory.storage_node_provider(address);
let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;
if storage.wiped {
trace!(target: "engine::tree::payload_processor::sparse_trie", "Wiping storage");
storage_trie.wipe()?;
}
// Defer leaf removals until after updates/additions, so that we don't delete an
// intermediate branch node during a removal and then re-add that branch back during a
// later leaf addition. This is an optimization, but also a requirement inherited from
// multiproof generating, which can't know the order that leaf operations happen in.
let mut removed_slots = SmallVec::<[Nibbles; 8]>::new();
for (slot, value) in storage.storage {
let slot_nibbles = Nibbles::unpack(slot);
if value.is_zero() {
removed_slots.push(slot_nibbles);
continue;
}
trace!(target: "engine::tree::payload_processor::sparse_trie", ?slot_nibbles, "Updating storage slot");
storage_trie.update_leaf(
slot_nibbles,
alloy_rlp::encode_fixed_size(&value).to_vec(),
&storage_provider,
)?;
}
for slot_nibbles in removed_slots {
trace!(target: "engine::root::sparse", ?slot_nibbles, "Removing storage slot");
storage_trie.remove_leaf(&slot_nibbles, &storage_provider)?;
}
storage_trie.root();
SparseStateTrieResult::Ok((address, storage_trie))
})
.collect();
// Defer leaf removals until after updates/additions, so that we don't delete an intermediate
// branch node during a removal and then re-add that branch back during a later leaf addition.
// This is an optimization, but also a requirement inherited from multiproof generating, which
// can't know the order that leaf operations happen in.
let mut removed_accounts = Vec::new();
// Update account storage roots
let _enter =
tracing::debug_span!(target: "engine::tree::payload_processor::sparse_trie", "account trie")
.entered();
for result in results {
let (address, storage_trie) = result?;
trie.insert_storage_trie(address, storage_trie);
if let Some(account) = state.accounts.remove(&address) {
// If the account itself has an update, remove it from the state update and update in
// one go instead of doing it down below.
trace!(target: "engine::root::sparse", ?address, "Updating account and its storage root");
if !trie.update_account(
address,
account.unwrap_or_default(),
blinded_provider_factory,
)? {
removed_accounts.push(address);
}
} else if trie.is_account_revealed(address) {
// Otherwise, if the account is revealed, only update its storage root.
trace!(target: "engine::root::sparse", ?address, "Updating account storage root");
if !trie.update_account_storage_root(address, blinded_provider_factory)? {
removed_accounts.push(address);
}
}
}
// Update accounts
for (address, account) in state.accounts {
trace!(target: "engine::root::sparse", ?address, "Updating account");
if !trie.update_account(address, account.unwrap_or_default(), blinded_provider_factory)? {
removed_accounts.push(address);
}
}
// Remove accounts
for address in removed_accounts {
trace!(target: "engine::root::sparse", ?address, "Removing account");
let nibbles = Nibbles::unpack(address);
trie.remove_account_leaf(&nibbles, blinded_provider_factory)?;
}
let elapsed_before = started_at.elapsed();
trace!(
target: "engine::root::sparse",
"Calculating subtries"
);
trie.calculate_subtries();
let elapsed = started_at.elapsed();
let below_level_elapsed = elapsed - elapsed_before;
trace!(
target: "engine::root::sparse",
?below_level_elapsed,
"Intermediate nodes calculated"
);
Ok(elapsed)
/// Debug recorders taken from the sparse tries, keyed by `None` for account trie
/// and `Some(address)` for storage tries.
#[cfg(feature = "trie-debug")]
pub debug_recorders: Vec<(Option<B256>, TrieDebugRecorder)>,
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::{keccak256, Address, U256};
use alloy_primitives::{keccak256, Address, B256, U256};
use reth_trie_sparse::ParallelSparseTrie;
#[test]
@@ -1104,4 +775,33 @@ mod tests {
assert!(hashed_state_rx.recv().is_err());
handle.join().unwrap();
}
#[test]
fn test_encode_account_leaf_value_empty_account_and_empty_root_is_empty() {
let mut account_rlp_buf = vec![0xAB];
let encoded = encode_account_leaf_value(None, EMPTY_ROOT_HASH, &mut account_rlp_buf);
assert!(encoded.is_empty());
// Early return should not touch the caller's buffer.
assert_eq!(account_rlp_buf, vec![0xAB]);
}
#[test]
fn test_encode_account_leaf_value_non_empty_account_is_rlp() {
let storage_root = B256::from([0x99; 32]);
let account = Some(Account {
nonce: 7,
balance: U256::from(42),
bytecode_hash: Some(B256::from([0xAA; 32])),
});
let mut account_rlp_buf = vec![0x00, 0x01];
let encoded = encode_account_leaf_value(account, storage_root, &mut account_rlp_buf);
let decoded = TrieAccount::decode(&mut &encoded[..]).expect("valid account RLP");
assert_eq!(decoded.nonce, 7);
assert_eq!(decoded.balance, U256::from(42));
assert_eq!(decoded.storage_root, storage_root);
assert_eq!(account_rlp_buf, encoded);
}
}

View File

@@ -7,14 +7,16 @@ use crate::tree::{
payload_processor::PayloadProcessor,
precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
sparse_trie::StateRootComputeOutcome,
EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, StateProviderBuilder,
StateProviderDatabase, TreeConfig,
CacheWaitDurations, EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle,
StateProviderBuilder, StateProviderDatabase, TreeConfig, WaitForCaches,
};
use alloy_consensus::transaction::{Either, TxHashRef};
use alloy_eip7928::BlockAccessList;
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
use alloy_evm::Evm;
use alloy_primitives::B256;
#[cfg(feature = "trie-debug")]
use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, LazyOverlay};
@@ -31,8 +33,8 @@ use reth_payload_primitives::{
BuiltPayload, InvalidPayloadAttributesError, NewPayloadError, PayloadTypes,
};
use reth_primitives_traits::{
AlloyBlockHeader, BlockBody, BlockTy, GotExpected, NodePrimitives, RecoveredBlock, SealedBlock,
SealedHeader, SignerRecoverable,
AlloyBlockHeader, BlockBody, BlockTy, FastInstant as Instant, GotExpected, NodePrimitives,
RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable,
};
use reth_provider::{
providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockNumReader, BlockReader,
@@ -49,10 +51,12 @@ use std::{
collections::HashMap,
panic::{self, AssertUnwindSafe},
sync::{mpsc::RecvTimeoutError, Arc},
time::Instant,
};
use tracing::{debug, debug_span, error, info, instrument, trace, warn};
/// Handle to a [`HashedPostState`] computed on a background thread.
type LazyHashedPostState = reth_tasks::LazyHandle<HashedPostState>;
/// Context providing access to tree state during validation.
///
/// This context is provided to the [`EngineValidator`] and includes the state of the tree's
@@ -327,9 +331,43 @@ where
mut ctx: TreeCtx<'_, N>,
) -> ValidationOutcome<N, InsertPayloadError<N::Block>>
where
V: PayloadValidator<T, Block = N::Block>,
V: PayloadValidator<T, Block = N::Block> + Clone,
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
{
// Spawn block conversion on a background thread so it runs concurrently with the
// rest of the function (setup + execution). For payloads this overlaps the cost of
// RLP decoding + header hashing; for already-converted blocks this is a no-op.
let convert_to_block = match &input {
BlockOrPayload::Payload(_) => {
let payload_clone = input.clone();
let validator = self.validator.clone();
let handle = self.payload_processor.executor().spawn_blocking_named(
"payload-convert",
move || {
let BlockOrPayload::Payload(payload) = payload_clone else {
unreachable!()
};
validator.convert_payload_to_block(payload)
},
);
Either::Left(handle)
}
BlockOrPayload::Block(_) => Either::Right(()),
};
// Returns the sealed block, either by awaiting the background conversion task (for
// payloads) or by extracting the already-converted block directly.
let convert_to_block =
move |input: BlockOrPayload<T>| -> Result<SealedBlock<N::Block>, NewPayloadError> {
match convert_to_block {
Either::Left(handle) => handle.try_into_inner().expect("sole handle"),
Either::Right(()) => {
let BlockOrPayload::Block(block) = input else { unreachable!() };
Ok(block)
}
}
};
/// A helper macro that returns the block in case there was an error
/// This macro is used for early returns before block conversion
macro_rules! ensure_ok {
@@ -337,7 +375,7 @@ where
match $expr {
Ok(val) => val,
Err(e) => {
let block = self.convert_to_block(input)?;
let block = convert_to_block(input)?;
return Err(InsertBlockError::new(block, e.into()).into())
}
}
@@ -362,13 +400,13 @@ where
trace!(target: "engine::tree::payload_validator", "Fetching block state provider");
let _enter =
debug_span!(target: "engine::tree::payload_validator", "state provider").entered();
debug_span!(target: "engine::tree::payload_validator", "state_provider").entered();
let Some(provider_builder) =
ensure_ok!(self.state_provider_builder(parent_hash, ctx.state()))
else {
// this is pre-validated in the tree
return Err(InsertBlockError::new(
self.convert_to_block(input)?,
convert_to_block(input)?,
ProviderError::HeaderNotFound(parent_hash.into()).into(),
)
.into())
@@ -381,13 +419,13 @@ where
let Some(parent_block) = ensure_ok!(self.sealed_header_by_hash(parent_hash, ctx.state()))
else {
return Err(InsertBlockError::new(
self.convert_to_block(input)?,
convert_to_block(input)?,
ProviderError::HeaderNotFound(parent_hash.into()).into(),
)
.into())
};
let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm env")
let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm_env")
.in_scope(|| self.evm_env_for(&input))
.map_err(NewPayloadError::other)?;
@@ -397,6 +435,7 @@ where
parent_hash: input.parent_hash(),
parent_state_root: parent_block.state_root(),
transaction_count: input.transaction_count(),
gas_used: input.gas_used(),
withdrawals: input.withdrawals().map(|w| w.to_vec()),
};
@@ -473,7 +512,22 @@ where
// needed. This frees up resources while state root computation continues.
let valid_block_tx = handle.terminate_caching(Some(output.clone()));
let block = self.convert_to_block(input)?.with_senders(senders);
// Spawn hashed post state computation in background so it runs concurrently with
// block conversion and receipt root computation. This is a pure CPU-bound task
// (keccak256 hashing of all changed addresses and storage slots).
let hashed_state_output = output.clone();
let hashed_state_provider = self.provider.clone();
let hashed_state: LazyHashedPostState =
self.payload_processor.executor().spawn_blocking_named("hash-post-state", move || {
let _span = debug_span!(
target: "engine::tree::payload_validator",
"hashed_post_state",
)
.entered();
hashed_state_provider.hashed_post_state(&hashed_state_output.state)
});
let block = convert_to_block(input)?.with_senders(senders);
// Wait for the receipt root computation to complete.
let receipt_root_bloom = receipt_root_rx
@@ -492,7 +546,8 @@ where
&parent_block,
&output,
&mut ctx,
receipt_root_bloom
receipt_root_bloom,
hashed_state,
),
block
);
@@ -500,6 +555,8 @@ where
let root_time = Instant::now();
let mut maybe_state_root = None;
let mut state_root_task_failed = false;
#[cfg(feature = "trie-debug")]
let mut trie_debug_recorders = Vec::new();
match strategy {
StateRootStrategy::StateRootTask => {
@@ -515,17 +572,34 @@ where
);
match task_result {
Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
Ok(StateRootComputeOutcome {
state_root,
trie_updates,
#[cfg(feature = "trie-debug")]
debug_recorders,
}) => {
let elapsed = root_time.elapsed();
info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished");
#[cfg(feature = "trie-debug")]
{
trie_debug_recorders = debug_recorders;
}
// Compare trie updates with serial computation if configured
if self.config.always_compare_trie_updates() {
self.compare_trie_updates_with_serial(
let _has_diff = self.compare_trie_updates_with_serial(
overlay_factory.clone(),
&hashed_state,
trie_updates.clone(),
);
#[cfg(feature = "trie-debug")]
if _has_diff {
Self::write_trie_debug_recorders(
block.header().number(),
&trie_debug_recorders,
);
}
}
// we double check the state root here for good measure
@@ -538,6 +612,11 @@ where
block_state_root = ?block.header().state_root(),
"State root task returned incorrect state root"
);
#[cfg(feature = "trie-debug")]
Self::write_trie_debug_recorders(
block.header().number(),
&trie_debug_recorders,
);
state_root_task_failed = true;
}
}
@@ -597,10 +676,15 @@ where
};
self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
self.metrics
.record_state_root_gas_bucket(block.header().gas_used(), root_elapsed.as_secs_f64());
debug!(target: "engine::tree::payload_validator", ?root_elapsed, "Calculated state root");
// ensure state root matches
if state_root != block.header().state_root() {
#[cfg(feature = "trie-debug")]
Self::write_trie_debug_recorders(block.header().number(), &trie_debug_recorders);
// call post-block hook
self.on_invalid_block(
&parent_block,
@@ -699,20 +783,27 @@ where
{
debug!(target: "engine::tree::payload_validator", "Executing block");
let mut db = State::builder()
.with_database(StateProviderDatabase::new(state_provider))
.with_bundle_update()
.without_state_clear()
.build();
let mut db = debug_span!(target: "engine::tree", "build_state_db").in_scope(|| {
State::builder()
.with_database(StateProviderDatabase::new(state_provider))
.with_bundle_update()
.without_state_clear()
.build()
});
let spec_id = *env.evm_env.spec_id();
let evm = self.evm_config.evm_with_env(&mut db, env.evm_env);
let ctx =
self.execution_ctx_for(input).map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
let mut executor = self.evm_config.create_executor(evm, ctx);
let (spec_id, mut executor) = {
let _span = debug_span!(target: "engine::tree", "create_evm").entered();
let spec_id = *env.evm_env.spec_id();
let evm = self.evm_config.evm_with_env(&mut db, env.evm_env);
let ctx = self
.execution_ctx_for(input)
.map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
let executor = self.evm_config.create_executor(evm, ctx);
(spec_id, executor)
};
if !self.config.precompile_cache_disabled() {
// Only cache pure precompiles to avoid issues with stateful precompiles
let _span = debug_span!(target: "engine::tree", "setup_precompile_cache").entered();
executor.evm_mut().precompiles_mut().map_pure_precompiles(|address, precompile| {
let metrics = self
.precompile_cache_metrics
@@ -734,7 +825,9 @@ where
let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
self.payload_processor.executor().spawn_blocking(move || task_handle.run(receipts_len));
self.payload_processor
.executor()
.spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len));
let transaction_count = input.transaction_count();
let executor = executor.with_state_hook(Some(Box::new(handle.state_hook())));
@@ -752,19 +845,20 @@ where
// Finish execution and get the result
let post_exec_start = Instant::now();
let (_evm, result) = debug_span!(target: "engine::tree", "finish")
let (_evm, result) = debug_span!(target: "engine::tree", "BlockExecutor::finish")
.in_scope(|| executor.finish())
.map(|(evm, result)| (evm.into_db(), result))?;
self.metrics.record_post_execution(post_exec_start.elapsed());
// Merge transitions into bundle state
debug_span!(target: "engine::tree", "merge transitions")
debug_span!(target: "engine::tree", "merge_transitions")
.in_scope(|| db.merge_transitions(BundleRetention::Reverts));
let output = BlockExecutionOutput { result, state: db.take_bundle() };
let execution_duration = execution_start.elapsed();
self.metrics.record_block_execution(&output, execution_duration);
self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration);
debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block");
Ok((output, senders, result_rx))
@@ -796,7 +890,7 @@ where
// Apply pre-execution changes (e.g., beacon root update)
let pre_exec_start = Instant::now();
debug_span!(target: "engine::tree", "pre execution")
debug_span!(target: "engine::tree", "pre_execution")
.in_scope(|| executor.apply_pre_execution_changes())?;
self.metrics.record_pre_execution(pre_exec_start.elapsed());
@@ -860,8 +954,9 @@ where
fn compute_state_root_parallel(
&self,
overlay_factory: OverlayStateProviderFactory<P>,
hashed_state: &HashedPostState,
hashed_state: &LazyHashedPostState,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
let hashed_state = hashed_state.get();
// The `hashed_state` argument will be taken into account as part of the overlay, but we
// need to use the prefix sets which were generated from it to indicate to the
// ParallelStateRoot which parts of the trie need to be recomputed.
@@ -879,8 +974,9 @@ where
/// trie updates for this block.
fn compute_state_root_serial(
overlay_factory: OverlayStateProviderFactory<P>,
hashed_state: &HashedPostState,
hashed_state: &LazyHashedPostState,
) -> ProviderResult<(B256, TrieUpdates)> {
let hashed_state = hashed_state.get();
// The `hashed_state` argument will be taken into account as part of the overlay, but we
// need to use the prefix sets which were generated from it to indicate to the
// StateRoot which parts of the trie need to be recomputed.
@@ -918,7 +1014,7 @@ where
&self,
handle: &mut PayloadHandle<Tx, Err, R>,
overlay_factory: OverlayStateProviderFactory<P>,
hashed_state: &HashedPostState,
hashed_state: &LazyHashedPostState,
) -> ProviderResult<Result<StateRootComputeOutcome, ParallelStateRootError>> {
let Some(timeout) = self.config.state_root_task_timeout() else {
return Ok(handle.state_root());
@@ -944,7 +1040,7 @@ where
let seq_overlay = overlay_factory;
let seq_hashed_state = hashed_state.clone();
self.payload_processor.executor().spawn_blocking(move || {
self.payload_processor.executor().spawn_blocking_named("serial-root", move || {
let result = Self::compute_state_root_serial(seq_overlay, &seq_hashed_state);
let _ = seq_tx.send(result);
});
@@ -972,7 +1068,12 @@ where
))
})?;
let (state_root, trie_updates) = result?;
return Ok(Ok(StateRootComputeOutcome { state_root, trie_updates }));
return Ok(Ok(StateRootComputeOutcome {
state_root,
trie_updates,
#[cfg(feature = "trie-debug")]
debug_recorders: Vec::new(),
}));
}
Err(RecvTimeoutError::Timeout) => {}
}
@@ -984,7 +1085,12 @@ where
"State root timeout race won"
);
let (state_root, trie_updates) = result?;
return Ok(Ok(StateRootComputeOutcome { state_root, trie_updates }));
return Ok(Ok(StateRootComputeOutcome {
state_root,
trie_updates,
#[cfg(feature = "trie-debug")]
debug_recorders: Vec::new(),
}));
}
}
}
@@ -1000,9 +1106,9 @@ where
fn compare_trie_updates_with_serial(
&self,
overlay_factory: OverlayStateProviderFactory<P>,
hashed_state: &HashedPostState,
hashed_state: &LazyHashedPostState,
task_trie_updates: TrieUpdates,
) {
) -> bool {
debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");
match Self::compute_state_root_serial(overlay_factory.clone(), hashed_state) {
@@ -1016,16 +1122,20 @@ where
// Get a database provider to use as trie cursor factory
match overlay_factory.database_provider_ro() {
Ok(provider) => {
if let Err(err) = super::trie_updates::compare_trie_updates(
match super::trie_updates::compare_trie_updates(
&provider,
task_trie_updates,
serial_trie_updates,
) {
warn!(
target: "engine::tree::payload_validator",
%err,
"Error comparing trie updates"
);
Ok(has_diff) => return has_diff,
Err(err) => {
warn!(
target: "engine::tree::payload_validator",
%err,
"Error comparing trie updates"
);
return true;
}
}
}
Err(err) => {
@@ -1045,6 +1155,45 @@ where
);
}
}
false
}
/// Writes trie debug recorders to a JSON file for the given block number.
///
/// The file is written to the current working directory as
/// `trie_debug_block_{block_number}.json`.
#[cfg(feature = "trie-debug")]
fn write_trie_debug_recorders(
block_number: u64,
recorders: &[(Option<B256>, TrieDebugRecorder)],
) {
let path = format!("trie_debug_block_{block_number}.json");
match serde_json::to_string_pretty(recorders) {
Ok(json) => match std::fs::write(&path, json) {
Ok(()) => {
warn!(
target: "engine::tree::payload_validator",
%path,
"Wrote trie debug recorders to file"
);
}
Err(err) => {
warn!(
target: "engine::tree::payload_validator",
%err,
%path,
"Failed to write trie debug recorders"
);
}
},
Err(err) => {
warn!(
target: "engine::tree::payload_validator",
%err,
"Failed to serialize trie debug recorders"
);
}
}
}
/// Validates the block after execution.
@@ -1056,6 +1205,8 @@ where
///
/// If `receipt_root_bloom` is provided, it will be used instead of computing the receipt root
/// and logs bloom from the receipts.
///
/// The `hashed_state` handle wraps the background hashed post state computation.
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
fn validate_post_execution<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
&self,
@@ -1064,7 +1215,8 @@ where
output: &BlockExecutionOutput<N::Receipt>,
ctx: &mut TreeCtx<'_, N>,
receipt_root_bloom: Option<ReceiptRootBloom>,
) -> Result<HashedPostState, InsertBlockErrorKind>
hashed_state: LazyHashedPostState,
) -> Result<LazyHashedPostState, InsertBlockErrorKind>
where
V: PayloadValidator<T, Block = N::Block>,
{
@@ -1099,14 +1251,10 @@ where
}
drop(_enter);
let _enter =
debug_span!(target: "engine::tree::payload_validator", "hashed_post_state").entered();
let hashed_state = self.provider.hashed_post_state(&output.state);
drop(_enter);
let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution_with_hashed_state").entered();
if let Err(err) =
self.validator.validate_block_post_execution_with_hashed_state(&hashed_state, block)
if let Err(err) = self
.validator
.validate_block_post_execution_with_hashed_state(hashed_state.get(), block)
{
// call post-block hook
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
@@ -1330,7 +1478,7 @@ where
block: RecoveredBlock<N::Block>,
execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
ctx: &TreeCtx<'_, N>,
hashed_state: HashedPostState,
hashed_state: LazyHashedPostState,
trie_output: TrieUpdates,
overlay_factory: OverlayStateProviderFactory<P>,
) -> ExecutedBlock<N> {
@@ -1347,12 +1495,14 @@ where
overlay_blocks.iter().rev().map(|b| b.trie_data_handle()).collect();
// Create deferred handle with fallback inputs in case the background task hasn't completed.
let deferred_trie_data = DeferredTrieData::pending(
Arc::new(hashed_state),
Arc::new(trie_output),
anchor_hash,
ancestors,
);
// Resolve the lazy handle into Arc<HashedPostState>. By this point the hashed state has
// already been computed and used for state root verification, so .get() returns instantly.
let hashed_state = match hashed_state.try_into_inner() {
Ok(state) => Arc::new(state),
Err(handle) => Arc::new(handle.get().clone()),
};
let deferred_trie_data =
DeferredTrieData::pending(hashed_state, Arc::new(trie_output), anchor_hash, ancestors);
let deferred_handle_task = deferred_trie_data.clone();
let block_validation_metrics = self.metrics.block_validation.clone();
@@ -1438,7 +1588,9 @@ where
};
// Spawn task that computes trie data asynchronously.
self.payload_processor.executor().spawn_blocking(compute_trie_input_task);
self.payload_processor
.executor()
.spawn_blocking_named("trie-input", compute_trie_input_task);
ExecutedBlock::with_deferred_trie_data(
Arc::new(block),
@@ -1538,7 +1690,7 @@ where
+ Clone
+ 'static,
N: NodePrimitives,
V: PayloadValidator<Types, Block = N::Block>,
V: PayloadValidator<Types, Block = N::Block> + Clone,
Evm: ConfigureEngineEvm<Types::ExecutionData, Primitives = N> + 'static,
Types: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
{
@@ -1582,8 +1734,17 @@ where
}
}
impl<P, Evm, V> WaitForCaches for BasicEngineValidator<P, Evm, V>
where
Evm: ConfigureEvm,
{
fn wait_for_caches(&self) -> CacheWaitDurations {
self.payload_processor.wait_for_caches()
}
}
/// Enum representing either block or payload being validated.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum BlockOrPayload<T: PayloadTypes> {
/// Payload.
Payload(T::ExecutionData),
@@ -1659,4 +1820,15 @@ impl<T: PayloadTypes> BlockOrPayload<T> {
Self::Block(block) => block.body().withdrawals().map(|w| w.as_slice()),
}
}
/// Returns the total gas used by the block.
pub fn gas_used(&self) -> u64
where
T::ExecutionData: ExecutionPayload,
{
match self {
Self::Payload(payload) => payload.gas_used(),
Self::Block(block) => block.gas_used(),
}
}
}

View File

@@ -23,7 +23,7 @@
use alloy_eips::BlockNumHash;
use alloy_primitives::B256;
use crossbeam_channel::Receiver as CrossbeamReceiver;
use std::time::Instant;
use reth_primitives_traits::FastInstant as Instant;
use tracing::trace;
/// The state of the persistence task.

View File

@@ -101,11 +101,13 @@ impl StorageTrieUpdatesDiff {
/// Compares the trie updates from state root task, regular state root calculation and database,
/// and logs the differences if there's any.
///
/// Returns `true` if there are differences.
pub(crate) fn compare_trie_updates(
trie_cursor_factory: impl TrieCursorFactory,
task: TrieUpdates,
regular: TrieUpdates,
) -> Result<(), DatabaseError> {
) -> Result<bool, DatabaseError> {
let mut task = adjust_trie_updates(task);
let mut regular = adjust_trie_updates(regular);
@@ -179,9 +181,10 @@ pub(crate) fn compare_trie_updates(
}
// log differences
let has_differences = diff.has_differences();
diff.log_differences();
Ok(())
Ok(has_differences)
}
fn compare_storage_trie_updates<C: TrieCursor>(

View File

@@ -77,7 +77,8 @@ impl EngineMessageStore {
})?,
)?;
}
BeaconEngineMessage::NewPayload { payload, tx: _tx } => {
BeaconEngineMessage::NewPayload { payload, .. } |
BeaconEngineMessage::RethNewPayload { payload, .. } => {
let filename = format!("{}-new_payload-{}.json", timestamp, payload.block_hash());
fs::write(
self.path.join(filename),

View File

@@ -106,6 +106,11 @@ where
self.cli.logs.log_file_directory.join(chain_spec.chain().to_string());
}
// Apply node-specific log defaults before initializing tracing
if matches!(self.cli.command, Commands::Node(_)) {
self.cli.logs.apply_node_defaults();
}
self.init_tracing(&runner)?;
// Install the prometheus recorder to be sure to record all metrics
@@ -144,6 +149,8 @@ where
N: CliNodeTypes<Primitives: NodePrimitives<BlockHeader: HeaderMut>, ChainSpec: Hardforks>,
SubCmd: ExtendedCommand + Subcommand + fmt::Debug,
{
let rt = runner.runtime();
match cli.command {
Commands::Node(command) => {
// Validate RPC modules using the configured validator
@@ -158,6 +165,7 @@ where
reserved_cpu_cores: command.engine.reserved_cpu_cores,
proof_storage_worker_threads: command.engine.storage_worker_count,
proof_account_worker_threads: command.engine.account_worker_count,
prewarming_threads: command.engine.prewarming_threads,
..Default::default()
};
let runner = CliRunner::try_with_runtime_config(
@@ -168,13 +176,13 @@ where
command.execute(ctx, FnLauncher::new::<C, Ext>(launcher))
})
}
Commands::Init(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
Commands::InitState(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
Commands::Init(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>(rt)),
Commands::InitState(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>(rt)),
Commands::Import(command) => {
runner.run_blocking_until_ctrl_c(command.execute::<N, _>(components))
runner.run_blocking_until_ctrl_c(command.execute::<N, _>(components, rt))
}
Commands::ImportEra(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
Commands::ExportEra(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
Commands::ImportEra(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>(rt)),
Commands::ExportEra(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>(rt)),
Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::Db(command) => {
runner.run_blocking_command_until_exit(|ctx| command.execute::<N>(ctx))
@@ -188,7 +196,9 @@ where
Commands::Prune(command) => runner.run_command_until_exit(|ctx| command.execute::<N>(ctx)),
#[cfg(feature = "dev")]
Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()),
Commands::ReExecute(command) => runner.run_until_ctrl_c(command.execute::<N>(components)),
Commands::ReExecute(command) => {
runner.run_until_ctrl_c(command.execute::<N>(components, rt))
}
Commands::Ext(command) => command.execute(runner),
}
}

Some files were not shown because too many files have changed in this diff Show More