Compare commits

..

99 Commits

Author SHA1 Message Date
Brian Picciano
d6324d63e2 chore: release 1.11.3 2026-03-12 12:34:39 +01:00
Brian Picciano
5f3ade1bfe fix(trie): Reset proof v2 calculator on error (#22781)
Co-authored-by: Amp <amp@ampcode.com>
2026-03-12 10:09:18 +00:00
Derek Cofausper
b053f6fafe cherry-pick: fix don't produce both updates and removals for trie nodes (#22507)
Co-Authored-By: Arsenii Kulikov <62447812+klkvr@users.noreply.github.com>
2026-03-12 02:30:25 +00:00
Derek Cofausper
2a58e7a077 cherry-pick: install rayon panic handler (37f5b3a)
Co-Authored-By: Arsenii Kulikov <62447812+klkvr@users.noreply.github.com>
2026-03-12 02:30:17 +00:00
Emma Jamieson-Hoare
793a3d5fb3 fix missing import 2026-03-10 11:44:07 +00:00
Emma Jamieson-Hoare
89ae1af694 chore: upgrade to 1.11.2 2026-03-10 10:48:03 +00:00
Alexey Shekhirin
9c33fb5d45 fix(engine): reset execution cache hash on clear (#22895)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 10:46:09 +00:00
Alexey Shekhirin
bef3d7b4d1 fix lockfile 2026-02-23 18:36:44 +00:00
Emma Jamieson-Hoare
e918c17af9 chore: release 1.11.1
Amp-Thread-ID: https://ampcode.com/threads/T-019c8ba4-fd85-736b-9d2d-e878d350a91b
Co-authored-by: Amp <amp@ampcode.com>
2026-02-23 18:02:14 +00:00
Arsenii Kulikov
fcc170d53c fix: properly reveal trie nodes (#22415) 2026-02-23 17:58:13 +00:00
Arsenii Kulikov
c685842ba2 fix: overlay preparation on tokio (#22492) 2026-02-23 17:57:51 +00:00
Georgios Konstantopoulos
564ffa5868 fix(ci): pass docker tags as separate set entries in bake action (#22151)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 22:10:35 +00:00
Dan Cline
12891dd171 chore: allow invalid storage metadata (#22150) 2026-02-12 22:02:26 +00:00
Emma Jamieson-Hoare
c1015022f5 chore: release reth v1.11.0 (#22148) 2026-02-12 21:39:30 +00:00
Dan Cline
e3fe6326bc chore(storage): rm storage settings, use only one (#22042)
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-02-12 21:17:05 +00:00
Dan Cline
e3d520b24f feat(network): add inbound / outbound scopes for disconnect reasons (#22070) 2026-02-12 20:54:03 +00:00
Dan Cline
9f29939ea1 feat: bundle mdbx_copy as reth db copy subcommand (#22061)
Co-authored-by: Emma Jamieson-Hoare <emmajam@users.noreply.github.com>
2026-02-12 20:39:56 +00:00
Matthias Seitz
10881d1c73 chore: fix book (#22142) 2026-02-12 21:44:53 +01:00
John Letey
408593467b feat(download): optional chain-aware snapshot url (#22119) 2026-02-12 21:42:19 +01:00
Emma Jamieson-Hoare
8caf8cdf11 docs: improve reth.rs/overview page (#22131)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 20:10:34 +00:00
Georgios Konstantopoulos
1e8030ef28 fix(engine): return error on updates channel disconnect in sparse trie task (#22139)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 20:00:36 +00:00
YK
f72c503d6f feat(metrics): use 5M first gas bucket for finer-grained newPayload metrics (#22136)
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
2026-02-12 19:03:21 +00:00
Emma Jamieson-Hoare
42890e6e7f fix: improve nightly Docker build failure Slack notification (#22130)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 18:58:55 +00:00
Dan Cline
e30e441ada fix: stage drop prunes account/storage changeset static files (#22062) 2026-02-12 18:34:46 +00:00
Georgios Konstantopoulos
121160d248 refactor(db): use hashed state as canonical state representation (#21115)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-02-12 18:02:02 +00:00
Georgios Konstantopoulos
7ff78ca082 perf(engine): use transaction count threshold for prewarm skip (#22094)
Co-authored-by: yk <yongkang@tempo.xyz>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
Co-authored-by: YK <chiayongkang@hotmail.com>
2026-02-12 17:07:52 +00:00
Georgios Konstantopoulos
d7f56d509c chore: add DaniPopes as codeowner for tasks crate (#22128)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 12:08:02 -05:00
Georgios Konstantopoulos
3300e404cf feat(engine): add --engine.disable-sparse-trie-cache-pruning flag (#21967)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: mattsse <19890894+mattsse@users.noreply.github.com>
Co-authored-by: alexey <17802178+shekhirin@users.noreply.github.com>
Co-authored-by: Alexey Shekhirin <github@shekhirin.com>
2026-02-12 16:36:31 +00:00
Georgios Konstantopoulos
77cb99fc78 chore(node): update misleading consensus engine log message (#22124)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Emma Jamieson-Hoare <emmajam@users.noreply.github.com>
2026-02-12 16:14:03 +00:00
Georgios Konstantopoulos
66169c7e7c feat(reth-bench): add progress field to per-block benchmark logs (#22016)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 16:03:32 +00:00
Georgios Konstantopoulos
4f5fafc8f3 fix(net): correct EthMessageID::max for eth70 and later versions (#22076)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 15:53:11 +00:00
Georgios Konstantopoulos
0b8e6c6ed3 feat(net): enforce EIP-868 fork ID for discovered peers (#22013)
Co-authored-by: Emma <emma@tempo.xyz>
Co-authored-by: Matthias Seitz <mattsse@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Emma Jamieson-Hoare <ejamieson19@gmail.com>
Co-authored-by: Emma Jamieson-Hoare <emmajam@users.noreply.github.com>
2026-02-12 15:29:37 +00:00
Georgios Konstantopoulos
4a62d38af2 perf(engine): use sequential sig recovery for blocks with small blocks (#22077)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
Co-authored-by: YK <chiayongkang@hotmail.com>
2026-02-12 15:06:21 +00:00
Georgios Konstantopoulos
dc4f249f09 chore: zero-pad thread indices in thread names (#22113)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 12:45:49 +00:00
Brian Picciano
c915841a45 chore(stateless): Remove reth-stateless crate (#22115) 2026-02-12 11:20:49 +00:00
Georgios Konstantopoulos
217a337d8c chore(engine): remove biased select in engine service loop (#21961)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-02-12 05:45:45 +00:00
Georgios Konstantopoulos
74d57008b6 chore(engine): downgrade failed response delivery logs to warn (#22055)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 05:44:09 +00:00
Georgios Konstantopoulos
f8767bc678 fix(engine): add await_state_root span to timeout path (#22111)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 05:14:39 +00:00
Georgios Konstantopoulos
81c83bba68 refactor(engine): remove unnecessary turbofish on CachedStateProvider, add new_prewarm (#22107)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 02:48:57 +00:00
Georgios Konstantopoulos
cd8ec58703 refactor(engine): move CachedStateProvider prewarm to const generic (#22106)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 01:30:24 +00:00
DaniPopes
931b17c3fd chore: bump alloy-core deps (#22104) 2026-02-12 01:15:56 +00:00
Emma Jamieson-Hoare
807d328cf0 fix: move alloy-primitives to regular dependency in bin/reth (#22105)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 01:15:12 +00:00
Georgios Konstantopoulos
8a6bbd29fe fix(tracing): return error instead of panicking on log directory creation failure (#22100)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 00:40:39 +00:00
Georgios Konstantopoulos
8bedaaee71 feat(docker): include debug symbols in maxperf images (#22003)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 00:34:41 +00:00
Emma Jamieson-Hoare
09cd105671 fix(primitives): move feature-referenced deps from dev-dependencies to optional dependencies (#22103)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 23:50:56 +00:00
Georgios Konstantopoulos
a0b60b7e64 feat(evm): impl ExecutableTxTuple for Either via EitherTxIterator (#22102)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 23:48:17 +00:00
DaniPopes
90e15d096d perf: reduce tracing span noise in prewarm and proof workers (#22101) 2026-02-11 23:32:50 +00:00
Emma Jamieson-Hoare
a161ca294f feat(net): add reason label to backed_off_peers metric (#22009)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 23:00:20 +00:00
Emma Jamieson-Hoare
3a5c41e3da test: add WebSocket subscription integration tests for eth_subscribe (#22065)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 22:56:47 +00:00
Georgios Konstantopoulos
968d3c9534 revert: skip transaction prewarming for small blocks (#22059) (#22097) 2026-02-11 14:38:08 -08:00
DaniPopes
fc6666f6a7 perf: treat hashes as bytes in BranchNodeCompact (#22089) 2026-02-11 22:11:49 +00:00
DaniPopes
ff3a854326 perf: use dedicated trie rayon pool for proof workers (#22051) 2026-02-11 22:10:17 +00:00
DaniPopes
04543ed16b chore: add span and log to runtime build (#22064) 2026-02-11 22:06:14 +00:00
Emma Jamieson-Hoare
ae3f0d4d1a test: expand CLI integration tests (#22086)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 21:43:28 +00:00
Georgios Konstantopoulos
5bccdc4a5d feat(engine): add state root task timeout with sequential fallback (#22004)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 20:45:45 +00:00
Georgios Konstantopoulos
0b7cd60668 perf(engine): skip transaction prewarming for small blocks (#22059)
Co-authored-by: yk <yongkang@tempo.xyz>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 20:37:04 +00:00
YK
aa983b49af perf(engine): add PrewarmMode::Skipped to avoid spawning idle workers (#22066)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
2026-02-11 19:48:48 +00:00
Georgios Konstantopoulos
2aff617767 feat(cli): split account-history and storage-history stage drops (#22083)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 19:21:55 +00:00
Georgios Konstantopoulos
2c5d00ffb5 feat(engine): add gas bucket label to newPayload metrics (#22067)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 19:00:07 +00:00
Georgios Konstantopoulos
e2a3527414 test: add CLI integration tests for reth binary (#22069)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 17:56:16 +00:00
Georgios Konstantopoulos
e4cb3d3aed chore(cli): log received signals at info level (#22071)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 17:55:37 +00:00
DaniPopes
079b7b9d57 fix: don't drop node (#22063) 2026-02-11 16:43:55 +00:00
Georgios Konstantopoulos
8a25d7d3cf chore: remove ress crates from workspace (#22057)
Co-authored-by: mattsse <matt@paradigm.xyz>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-11 13:39:56 +00:00
Minhyuk Kim
a5ced84098 feat(node/builder): add build_with_ordering_and_spawn_maintenance_task to TxPoolBuilder (#21979) 2026-02-11 12:58:29 +00:00
Emma Jamieson-Hoare
59760a2fe3 feat(net): add direction labels to closed_sessions and pending_session_failures metrics (#22014)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:59:06 +00:00
Matthias Seitz
b9d21f293e refactor: remove TypesAnd1-5 staging types from ProviderFactoryBuilder (#22049)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:57:05 +00:00
Georgios Konstantopoulos
dec1cad318 refactor(trie): merge SparseTrieExt into SparseTrie trait (#22035)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:39:56 +00:00
Georgios Konstantopoulos
165b94c3fa chore(docker): pass RUSTC_WRAPPER to cargo build in Dockerfile.depot (#22048)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:37:43 +00:00
Georgios Konstantopoulos
69e4c06ae7 chore(log): simplify default profiler tracing filter (#22050)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:33:20 +00:00
Georgios Konstantopoulos
1406a984a7 ci: pass --no-fail-fast to all cargo nextest runs (#22046)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:21:38 +00:00
Andrey Kolishchak
93d6b9782c fix(node): ethstats conn/last_ping deadlock (#21463) 2026-02-11 03:48:54 +00:00
DaniPopes
68e4ff1f7d feat: global runtime (#21934) 2026-02-11 03:45:09 +00:00
Georgios Konstantopoulos
33467ea6dd fix(reth-bench): increase WS keepalive interval to match persistence timeout (#22039) 2026-02-11 02:45:54 +00:00
Georgios Konstantopoulos
3bf9280b3c refactor(storage): add with_*_opt builder methods to StorageSettings (#21998)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 02:19:33 +00:00
Georgios Konstantopoulos
5c93986e6d feat(reth-bench): accept bare integers as milliseconds for --wait-time (#22038)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 01:57:42 +00:00
Georgios Konstantopoulos
779e0eb8bb perf: downgrade on_hashed_state_update and on_prewarm_targets spans to trace (#22044)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 22:45:05 +00:00
Emma Jamieson-Hoare
5c4163c177 feat(exex): make backfill thresholds configurable (#22037)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
2026-02-10 21:30:18 +00:00
Emma Jamieson-Hoare
c5d1f70dd3 fix(txpool): correct swapped args in blob_tx_priority calls (#22030)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 21:17:34 +00:00
YK
a8ec78fc87 perf(engine): implement BAL handler for SparseTrieCacheTask (#21990)
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 20:50:16 +00:00
Georgios Konstantopoulos
1ecbb0b9d6 chore: move jemalloc, asm-keccak, min-debug-logs to default features (#22034)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 20:46:59 +00:00
Georgios Konstantopoulos
a40647e651 fix(docker): fix sccache stats in Dockerfile.depot (#22033)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 19:50:30 +00:00
Georgios Konstantopoulos
b25b8c00ee feat(engine): add getPayloadBodiesV2 endpoints for EIP-7928 BAL support (#21774)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 15:10:05 +00:00
John Chase
b20a99e1c9 ci: skip scheduled workflows on forks (#22022) 2026-02-10 14:36:20 +00:00
DaniPopes
9ec0e3cd51 chore: rm random log file (#22023) 2026-02-10 15:43:56 +01:00
Georgios Konstantopoulos
09837bbdb4 chore: remove base.reth.rs public endpoint references (#22019)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 14:18:46 +00:00
Matthias Seitz
198e457a12 feat(rpc): add subscribeFinalizedChainNotifications endpoint (#22011)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 15:07:31 +01:00
DaniPopes
c727c61101 feat(trie): remove SerialSparseTrie (#21808)
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Brian Picciano <me@mediocregopher.com>
2026-02-10 13:50:54 +00:00
Georgios Konstantopoulos
366857559b fix(rocksdb): set max_open_files to prevent fd exhaustion (#22005)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-09 22:32:48 +00:00
Matthias Seitz
ccd15e8a25 refactor(txpool): rename and document validation methods (#22008)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-09 22:17:19 +00:00
Georgios Konstantopoulos
67f89fa4b2 feat(engine): prefetch withdrawal addresses in pre-warming (#21966)
Co-authored-by: mattsse <matt@paradigm.xyz>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-09 22:14:41 +00:00
Georgios Konstantopoulos
a87510069d refactor(pool): add IntoIter: Send bounds to avoid unnecessary Vec collect (#22001)
Co-authored-by: klkvr <klkvr@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Emma Jamieson-Hoare <ejamieson19@gmail.com>
Co-authored-by: Emma Jamieson-Hoare <emmajam@users.noreply.github.com>
2026-02-09 21:45:56 +00:00
Emma Jamieson-Hoare
b3fe168548 fix(rpc): enforce blockHash constraint in append_matching_block_logs (#22007)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-09 21:45:53 +00:00
Emma Jamieson-Hoare
8d7583b6fb chore: move Kurtosis failures to the hive slack channel (#21983)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-09 21:39:04 +00:00
Georgios Konstantopoulos
32466fe223 feat(rpc): propagate TransactionOrigin through send_transaction and batcher (#21969)
Co-authored-by: klkvr <klkvr@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-09 20:34:23 +00:00
Alexey Shekhirin
f2061991c5 feat(engine): reorg depth commitment metric (#21992) 2026-02-09 20:25:54 +00:00
Dan Cline
a549b4d66d feat(storage): add use_hashed_state storage setting (#21997) 2026-02-09 20:15:13 +00:00
Arsenii Kulikov
cdcea2bd33 perf: better scheduling for storage roots computation (#21987)
Co-authored-by: Brian Picciano <me@mediocregopher.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-09 18:10:45 +00:00
Matthias Seitz
3898cc5c3d chore(deps): bump alloy 1.6.2 -> 1.6.3 (#21986)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-09 19:00:13 +01:00
Dan Cline
c558c1d10f fix(stages): skip sender unwind when fully pruned (#21988) 2026-02-09 17:36:20 +00:00
372 changed files with 10679 additions and 21217 deletions

View File

@@ -0,0 +1,5 @@
---
reth-transaction-pool: patch
---
Renamed and documented validation methods for clarity. `validate_one_no_state` and `validate_one_against_state` are now public methods `validate_stateless` and `validate_stateful` with improved documentation explaining their respective validation phases.

View File

@@ -0,0 +1,5 @@
---
reth-network: minor
---
Added reason label to backed_off_peers metric. The metric now tracks backed off peers by reason (too_many_peers, graceful_close, connection_error) to improve observability.

View File

@@ -0,0 +1,5 @@
---
ef-tests: patch
---
Removed reth-stateless crate and stateless validation from ef-tests.

View File

@@ -0,0 +1,6 @@
---
reth-exex: patch
reth-exex-types: patch
---
Added configurable backfill thresholds to ExEx notifications stream and added regression tests for state provider parity between pipeline and backfill execution paths.

View File

@@ -0,0 +1,4 @@
---
---
Added WebSocket subscription integration tests for eth_subscribe.

View File

@@ -0,0 +1,4 @@
---
---
Improved nightly Docker build failure Slack notification with more detailed formatting and context.

View File

@@ -0,0 +1,7 @@
---
reth: patch
reth-cli-commands: patch
reth-node-core: patch
---
Removed experimental ress protocol support for stateless Ethereum nodes.

View File

@@ -0,0 +1,5 @@
---
reth-node-builder: patch
---
Removed biased select in engine service loop to allow fair scheduling of shutdown requests alongside event processing.

View File

@@ -0,0 +1,5 @@
---
reth-transaction-pool: patch
---
Fixed swapped arguments in `blob_tx_priority` function calls, correcting the parameter order to match the function signature.

View File

@@ -0,0 +1,4 @@
---
---
Improved documentation overview page with better structure and clarity.

View File

@@ -0,0 +1,5 @@
---
reth-node-events: patch
---
Updated consensus engine log message to be more accurate about received updates.

View File

@@ -0,0 +1,9 @@
---
reth-network-api: minor
reth-network-types: minor
reth-network: minor
reth-node-core: minor
reth: minor
---
Added optional ENR fork ID enforcement to filter out peers from incompatible networks during peer discovery, controlled by the `--enforce-enr-fork-id` CLI flag.

View File

@@ -0,0 +1,5 @@
---
reth-primitives: patch
---
Moved feature-referenced dependencies from dev-dependencies to optional dependencies to ensure they are available when their corresponding features are enabled.

View File

@@ -0,0 +1,5 @@
---
reth-transaction-pool: minor
---
Added `IntoIter: Send` bounds to `validate_transactions` and `validate_transactions_with_origin` in the `TransactionValidator` trait, avoiding unnecessary `Vec` collects. Simplified default `validate_transactions_with_origin` to delegate to `validate_transactions`.

View File

@@ -0,0 +1,5 @@
---
reth-provider: patch
---
Removed unused staging types from ProviderFactoryBuilder.

View File

@@ -0,0 +1,5 @@
---
reth-trie-sparse: minor
---
Removed `SerialSparseTrie` from the workspace, consolidating on `ParallelSparseTrie` as the single sparse trie implementation in `reth-trie-sparse`.

View File

@@ -0,0 +1,5 @@
---
reth-trie-sparse: patch
---
Fixed a bug where trie nodes could appear in both `updated_nodes` and `removed_nodes` simultaneously by removing entries from `removed_nodes` when a node is inserted as updated.

View File

@@ -0,0 +1,4 @@
---
---
Expanded CLI integration tests with subcommand help coverage, config TOML validation, genesis JSON validation, and send transaction round-trip test for dev mode.

View File

@@ -0,0 +1,5 @@
---
reth-network: minor
---
Added direction labels to `closed_sessions` and `pending_session_failures` metrics. Operators can now distinguish session closures and failures by direction (`active`, `incoming_pending`, `outgoing_pending` for closed sessions; `inbound`, `outbound` for pending session failures).

View File

@@ -0,0 +1,4 @@
---
---
Moved Kurtosis CI failure notifications to the hive Slack channel.

View File

@@ -0,0 +1,7 @@
---
reth-rpc-api: minor
reth-rpc-builder: patch
reth-rpc: minor
---
Added `subscribeFinalizedChainNotifications` RPC endpoint that buffers committed chain notifications and emits them once a new finalized block is received.

View File

@@ -0,0 +1,5 @@
---
reth-trie: patch
---
Fixed a potential panic in `ProofCalculator` by clearing internal computation state (`branch_stack`, `child_stack`, `branch_path`, etc.) after errors, preventing stale state from causing `usize` underflow panics when the calculator is reused. Added a test verifying correct behavior after simulated mid-computation errors.

2
.github/CODEOWNERS vendored
View File

@@ -38,7 +38,7 @@ crates/storage/libmdbx-rs/ @shekhirin
crates/storage/nippy-jar/ @joshieDo @shekhirin
crates/storage/provider/ @joshieDo @shekhirin @yongkangc
crates/storage/storage-api/ @joshieDo
crates/tasks/ @mattsse
crates/tasks/ @mattsse @DaniPopes
crates/tokio-util/ @mattsse
crates/tracing/ @mattsse @shekhirin
crates/tracing-otlp/ @mattsse @Rjected

View File

@@ -27,7 +27,6 @@ crates_to_check=(
reth-ethereum-forks
reth-ethereum-primitives
reth-ethereum-consensus
reth-stateless
)
any_failed=0

View File

@@ -63,6 +63,7 @@ exclude_crates=(
reth-provider # tokio
reth-prune # tokio
reth-prune-static-files # reth-provider
reth-tasks # tokio rt-multi-thread
reth-stages-api # reth-provider, reth-prune
reth-static-file # tokio
reth-transaction-pool # c-kzg

View File

@@ -15,6 +15,7 @@ permissions:
jobs:
update:
if: github.repository == 'paradigmxyz/reth'
uses: tempoxyz/ci/.github/workflows/cargo-update-pr.yml@main
secrets:
token: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -31,6 +31,7 @@ on:
jobs:
build:
if: github.repository == 'paradigmxyz/reth'
name: Build Docker images
runs-on: ubuntu-24.04
permissions:
@@ -69,18 +70,27 @@ jobs:
# Add 'latest' tag for non-RC releases
if [[ ! "$VERSION" =~ -rc ]]; then
echo "ethereum_tags=${REGISTRY}/reth:${VERSION},${REGISTRY}/reth:latest" >> "$GITHUB_OUTPUT"
{
echo "ethereum_set<<EOF"
echo "ethereum.tags=${REGISTRY}/reth:${VERSION}"
echo "ethereum.tags=${REGISTRY}/reth:latest"
echo "EOF"
} >> "$GITHUB_OUTPUT"
else
echo "ethereum_tags=${REGISTRY}/reth:${VERSION}" >> "$GITHUB_OUTPUT"
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:${VERSION}" >> "$GITHUB_OUTPUT"
fi
elif [[ "${{ github.event_name }}" == "schedule" ]] || [[ "${{ inputs.build_type }}" == "nightly" ]]; then
echo "targets=nightly" >> "$GITHUB_OUTPUT"
echo "ethereum_tags=${REGISTRY}/reth:nightly" >> "$GITHUB_OUTPUT"
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:nightly" >> "$GITHUB_OUTPUT"
else
# git-sha build
echo "targets=ethereum" >> "$GITHUB_OUTPUT"
echo "ethereum_tags=${REGISTRY}/reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
fi
- name: Build and push images
@@ -96,7 +106,7 @@ jobs:
targets: ${{ steps.params.outputs.targets }}
push: ${{ !(github.event_name == 'workflow_dispatch' && inputs.dry_run) }}
set: |
ethereum.tags=${{ steps.params.outputs.ethereum_tags }}
${{ steps.params.outputs.ethereum_set }}
- name: Verify image architectures
env:
@@ -116,6 +126,18 @@ jobs:
- name: Slack Webhook Action
uses: rtCamp/action-slack-notify@v2
env:
SLACK_COLOR: ${{ job.status }}
SLACK_MESSAGE: "Failed run: https://github.com/paradigmxyz/reth/actions/runs/${{ github.run_id }}"
SLACK_COLOR: danger
SLACK_ICON_EMOJI: ":rotating_light:"
SLACK_USERNAME: "GitHub Actions"
SLACK_TITLE: ":rotating_light: Nightly Docker Build Failed"
SLACK_MESSAGE: |
The scheduled nightly Docker build failed.
*Commit:* `${{ github.sha }}`
*Branch:* `${{ github.ref_name }}`
*Run:* <https://github.com/paradigmxyz/reth/actions/runs/${{ github.run_id }}|View logs>
*Action required:* Re-run the workflow or investigate the build failure.
SLACK_FOOTER: "paradigmxyz/reth · docker.yml"
MSG_MINIMAL: true
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_URL }}

View File

@@ -35,6 +35,7 @@ jobs:
- name: Run e2e tests
run: |
cargo nextest run \
--no-fail-fast \
--locked --features "asm-keccak" \
--workspace \
--exclude 'example-*' \
@@ -61,6 +62,7 @@ jobs:
- name: Run RocksDB e2e tests
run: |
cargo nextest run \
--no-fail-fast \
--locked --features "edge" \
-p reth-e2e-test-utils \
-E 'binary(rocksdb)'

View File

@@ -46,6 +46,7 @@ jobs:
- name: Run tests
run: |
cargo nextest run \
--no-fail-fast \
--locked --features "asm-keccak ${{ matrix.network }} ${{ matrix.storage == 'edge' && 'edge' || '' }}" \
--workspace --exclude ef-tests \
-E "kind(test) and not binary(e2e_testsuite)"
@@ -64,7 +65,7 @@ jobs:
era-files:
name: era1 file integration tests once a day
if: github.event_name == 'schedule'
if: github.event_name == 'schedule' && github.repository == 'paradigmxyz/reth'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
@@ -76,4 +77,4 @@ jobs:
with:
cache-on-failure: true
- name: run era1 files integration tests
run: cargo nextest run --release --package reth-era --test it -- --ignored
run: cargo nextest run --no-fail-fast --release --package reth-era --test it -- --ignored

View File

@@ -20,6 +20,7 @@ concurrency:
jobs:
build-reth:
if: github.repository == 'paradigmxyz/reth'
uses: ./.github/workflows/docker-test.yml
with:
hive_target: kurtosis
@@ -65,4 +66,4 @@ jobs:
env:
SLACK_COLOR: ${{ job.status }}
SLACK_MESSAGE: "Failed run: https://github.com/paradigmxyz/reth/actions/runs/${{ github.run_id }}"
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_URL }}
SLACK_WEBHOOK: ${{ secrets.SLACK_HIVE_WEBHOOK_URL }}

View File

@@ -7,6 +7,7 @@ on:
jobs:
build:
if: github.repository == 'paradigmxyz/reth'
name: build reproducible binaries
runs-on: ${{ matrix.runner }}
strategy:

View File

@@ -38,7 +38,7 @@ jobs:
cache-on-failure: true
- name: Build reth
run: |
cargo install --features asm-keccak,jemalloc --path bin/reth
cargo install --path bin/reth
- name: Run headers stage
run: |
reth stage run headers --from ${{ env.FROM_BLOCK }} --to ${{ env.TO_BLOCK }} --commit --checkpoints

View File

@@ -9,6 +9,7 @@ on:
jobs:
close-issues:
if: github.repository == 'paradigmxyz/reth'
runs-on: ubuntu-latest
permissions:
issues: write

View File

@@ -17,6 +17,7 @@ concurrency:
jobs:
sync:
if: github.repository == 'paradigmxyz/reth'
name: sync (${{ matrix.chain.bin }})
runs-on: depot-ubuntu-latest
env:

View File

@@ -17,6 +17,7 @@ concurrency:
jobs:
sync:
if: github.repository == 'paradigmxyz/reth'
name: sync (${{ matrix.chain.bin }})
runs-on: depot-ubuntu-latest
env:

View File

@@ -49,6 +49,7 @@ jobs:
- name: Run tests
run: |
cargo nextest run \
--no-fail-fast \
--features "${{ matrix.features }} $EDGE_FEATURES" --locked \
${{ matrix.exclude_args }} --workspace \
--exclude ef-tests --no-tests=warn \
@@ -87,7 +88,7 @@ jobs:
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- run: cargo nextest run --cargo-profile hivetests -p ef-tests --features "asm-keccak ef-tests"
- run: cargo nextest run --no-fail-fast --cargo-profile hivetests -p ef-tests --features "asm-keccak ef-tests"
doc:
name: doc tests

View File

@@ -381,7 +381,7 @@ cargo nextest run --workspace
cargo bench --bench bench_name
# Build optimized binary
cargo build --release --features "jemalloc asm-keccak"
cargo build --release
# Check compilation for all features
cargo check --workspace --all-features

829
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
[workspace.package]
version = "1.10.2"
version = "1.11.3"
edition = "2024"
rust-version = "1.88"
license = "MIT OR Apache-2.0"
@@ -83,8 +83,6 @@ members = [
"crates/prune/db",
"crates/prune/prune",
"crates/prune/types",
"crates/ress/protocol",
"crates/ress/provider",
"crates/revm/",
"crates/rpc/ipc/",
"crates/rpc/rpc-api/",
@@ -101,7 +99,6 @@ members = [
"crates/stages/api/",
"crates/stages/stages/",
"crates/stages/types/",
"crates/stateless",
"crates/static-file/static-file",
"crates/static-file/types/",
"crates/storage/codecs/",
@@ -125,7 +122,6 @@ members = [
"crates/trie/db",
"crates/trie/parallel/",
"crates/trie/sparse",
"crates/trie/sparse-parallel/",
"crates/trie/trie",
"examples/beacon-api-sidecar-fetcher/",
"examples/beacon-api-sse/",
@@ -310,6 +306,11 @@ inherits = "release"
lto = "fat"
codegen-units = 1
[profile.maxperf-symbols]
inherits = "maxperf"
debug = "full"
strip = "none"
[profile.reproducible]
inherits = "release"
panic = "abort"
@@ -418,7 +419,6 @@ reth-rpc-convert = { path = "crates/rpc/rpc-convert" }
reth-stages = { path = "crates/stages/stages" }
reth-stages-api = { path = "crates/stages/api" }
reth-stages-types = { path = "crates/stages/types", default-features = false }
reth-stateless = { path = "crates/stateless", default-features = false }
reth-static-file = { path = "crates/static-file/static-file" }
reth-static-file-types = { path = "crates/static-file/types", default-features = false }
reth-storage-api = { path = "crates/storage/storage-api", default-features = false }
@@ -434,10 +434,7 @@ reth-trie-common = { path = "crates/trie/common", default-features = false }
reth-trie-db = { path = "crates/trie/db" }
reth-trie-parallel = { path = "crates/trie/parallel" }
reth-trie-sparse = { path = "crates/trie/sparse", default-features = false }
reth-trie-sparse-parallel = { path = "crates/trie/sparse-parallel" }
reth-zstd-compressors = { path = "crates/storage/zstd-compressors", default-features = false }
reth-ress-protocol = { path = "crates/ress/protocol" }
reth-ress-provider = { path = "crates/ress/provider" }
# revm
revm = { version = "34.0.0", default-features = false }
@@ -451,46 +448,57 @@ op-revm = { version = "15.0.0", default-features = false }
revm-inspectors = "0.34.2"
# eth
alloy-dyn-abi = "1.5.4"
alloy-primitives = { version = "1.5.4", default-features = false, features = ["map-foldhash"] }
alloy-sol-types = { version = "1.5.4", default-features = false }
alloy-dyn-abi = "1.5.6"
alloy-primitives = { version = "1.5.6", default-features = false, features = [
"map-foldhash",
] }
alloy-sol-types = { version = "1.5.6", default-features = false }
alloy-chains = { version = "0.2.5", default-features = false }
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.3.0", default-features = false }
alloy-evm = { version = "0.27.2", default-features = false }
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
alloy-rlp = { version = "0.3.13", default-features = false, features = [
"core-net",
] }
alloy-trie = { version = "0.9.4", default-features = false }
alloy-hardforks = "0.4.5"
alloy-consensus = { version = "1.6.2", default-features = false }
alloy-contract = { version = "1.6.2", default-features = false }
alloy-eips = { version = "1.6.2", default-features = false }
alloy-genesis = { version = "1.6.2", default-features = false }
alloy-json-rpc = { version = "1.6.2", default-features = false }
alloy-network = { version = "1.6.2", default-features = false }
alloy-network-primitives = { version = "1.6.2", default-features = false }
alloy-provider = { version = "1.6.2", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.6.2", default-features = false }
alloy-rpc-client = { version = "1.6.2", default-features = false }
alloy-rpc-types = { version = "1.6.2", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.6.2", default-features = false }
alloy-rpc-types-anvil = { version = "1.6.2", default-features = false }
alloy-rpc-types-beacon = { version = "1.6.2", default-features = false }
alloy-rpc-types-debug = { version = "1.6.2", default-features = false }
alloy-rpc-types-engine = { version = "1.6.2", default-features = false }
alloy-rpc-types-eth = { version = "1.6.2", default-features = false }
alloy-rpc-types-mev = { version = "1.6.2", default-features = false }
alloy-rpc-types-trace = { version = "1.6.2", default-features = false }
alloy-rpc-types-txpool = { version = "1.6.2", default-features = false }
alloy-serde = { version = "1.6.2", default-features = false }
alloy-signer = { version = "1.6.2", default-features = false }
alloy-signer-local = { version = "1.6.2", default-features = false }
alloy-transport = { version = "1.6.2" }
alloy-transport-http = { version = "1.6.2", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.6.2", default-features = false }
alloy-transport-ws = { version = "1.6.2", default-features = false }
alloy-consensus = { version = "1.6.3", default-features = false }
alloy-contract = { version = "1.6.3", default-features = false }
alloy-eips = { version = "1.6.3", default-features = false }
alloy-genesis = { version = "1.6.3", default-features = false }
alloy-json-rpc = { version = "1.6.3", default-features = false }
alloy-network = { version = "1.6.3", default-features = false }
alloy-network-primitives = { version = "1.6.3", default-features = false }
alloy-provider = { version = "1.6.3", features = [
"reqwest",
"debug-api",
], default-features = false }
alloy-pubsub = { version = "1.6.3", default-features = false }
alloy-rpc-client = { version = "1.6.3", default-features = false }
alloy-rpc-types = { version = "1.6.3", features = [
"eth",
], default-features = false }
alloy-rpc-types-admin = { version = "1.6.3", default-features = false }
alloy-rpc-types-anvil = { version = "1.6.3", default-features = false }
alloy-rpc-types-beacon = { version = "1.6.3", default-features = false }
alloy-rpc-types-debug = { version = "1.6.3", default-features = false }
alloy-rpc-types-engine = { version = "1.6.3", default-features = false }
alloy-rpc-types-eth = { version = "1.6.3", default-features = false }
alloy-rpc-types-mev = { version = "1.6.3", default-features = false }
alloy-rpc-types-trace = { version = "1.6.3", default-features = false }
alloy-rpc-types-txpool = { version = "1.6.3", default-features = false }
alloy-serde = { version = "1.6.3", default-features = false }
alloy-signer = { version = "1.6.3", default-features = false }
alloy-signer-local = { version = "1.6.3", default-features = false }
alloy-transport = { version = "1.6.3" }
alloy-transport-http = { version = "1.6.3", features = [
"reqwest-rustls-tls",
], default-features = false }
alloy-transport-ipc = { version = "1.6.3", default-features = false }
alloy-transport-ws = { version = "1.6.3", default-features = false }
# op
alloy-op-evm = { version = "0.27.2", default-features = false }
@@ -507,7 +515,10 @@ either = { version = "1.15.0", default-features = false }
arrayvec = { version = "0.7.6", default-features = false }
aquamarine = "0.6"
auto_impl = "1"
backon = { version = "1.2", default-features = false, features = ["std-blocking-sleep", "tokio-sleep"] }
backon = { version = "1.2", default-features = false, features = [
"std-blocking-sleep",
"tokio-sleep",
] }
bincode = "1.3"
bitflags = "2.4"
boyer-moore-magiclen = "0.2.16"
@@ -529,9 +540,13 @@ itertools = { version = "0.14", default-features = false }
linked_hash_set = "0.1"
lz4 = "1.28.1"
modular-bitfield = "0.13.1"
notify = { version = "8.0.0", default-features = false, features = ["macos_fsevent"] }
notify = { version = "8.0.0", default-features = false, features = [
"macos_fsevent",
] }
nybbles = { version = "0.4.8", default-features = false }
once_cell = { version = "1.19", default-features = false, features = ["critical-section"] }
once_cell = { version = "1.19", default-features = false, features = [
"critical-section",
] }
parking_lot = "0.12"
paste = "1.0"
rand = "0.9"
@@ -550,7 +565,9 @@ strum_macros = "0.27"
syn = "2.0"
thiserror = { version = "2.0.0", default-features = false }
tar = "0.4.44"
tracing = { version = "0.1.0", default-features = false }
tracing = { version = "0.1.0", default-features = false, features = [
"attributes",
] }
tracing-appender = "0.2"
url = { version = "2.3", default-features = false }
zstd = "0.13"
@@ -588,7 +605,11 @@ futures-util = { version = "0.3", default-features = false }
hyper = "1.3"
hyper-util = "0.1.5"
pin-project = "1.0.12"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "rustls-tls-native-roots", "stream"] }
reqwest = { version = "0.12", default-features = false, features = [
"rustls-tls",
"rustls-tls-native-roots",
"stream",
] }
tracing-futures = "0.2"
tower = "0.5"
tower-http = "0.6"
@@ -613,7 +634,10 @@ proptest-arbitrary-interop = "0.1.0"
# crypto
enr = { version = "0.13", default-features = false }
k256 = { version = "0.13", default-features = false, features = ["ecdsa"] }
secp256k1 = { version = "0.30", default-features = false, features = ["global-context", "recovery"] }
secp256k1 = { version = "0.30", default-features = false, features = [
"global-context",
"recovery",
] }
# rand 8 for secp256k1
rand_08 = { package = "rand", version = "0.8" }

View File

@@ -51,14 +51,15 @@ RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
export RUSTC_WRAPPER=sccache SCCACHE_WEBDAV_ENDPOINT=https://cache.depot.dev SCCACHE_DIR=/sccache && \
sccache --start-server && \
if [ -n "$RUSTFLAGS" ]; then \
export RUSTFLAGS="$RUSTFLAGS"; \
elif [ "$TARGETPLATFORM" = "linux/amd64" ]; then \
export RUSTFLAGS="-C target-cpu=x86-64-v3 -C target-feature=+pclmulqdq"; \
fi && \
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml
RUN sccache --show-stats || true
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml && \
sccache --show-stats
# Copy binary to a known location (ARG not resolved in COPY)
# Note: Custom profiles like maxperf/profiling output to target/<profile>/, not target/release/

View File

@@ -12,12 +12,7 @@ FULL_DB_TOOLS_DIR := $(shell pwd)/$(DB_TOOLS_DIR)/
CARGO_TARGET_DIR ?= target
# List of features to use when building. Can be overridden via the environment.
# No jemalloc on Windows
ifeq ($(OS),Windows_NT)
FEATURES ?= asm-keccak min-debug-logs
else
FEATURES ?= jemalloc asm-keccak min-debug-logs
endif
FEATURES ?=
# Cargo profile for builds. Default is for local builds, CI uses an override.
PROFILE ?= release
@@ -158,7 +153,7 @@ COV_FILE := lcov.info
.PHONY: test-unit
test-unit: ## Run unit tests.
cargo install cargo-nextest --locked
cargo nextest run $(UNIT_TEST_ARGS)
cargo nextest run --no-fail-fast $(UNIT_TEST_ARGS)
.PHONY: cov-unit
@@ -191,7 +186,7 @@ $(EEST_TESTS_DIR):
.PHONY: ef-tests
ef-tests: $(EF_TESTS_DIR) $(EEST_TESTS_DIR) ## Runs Legacy and EEST tests.
cargo nextest run -p ef-tests --release --features ef-tests
cargo nextest run --no-fail-fast -p ef-tests --release --features ef-tests
##@ reth-bench
@@ -238,16 +233,15 @@ update-book-cli: build-debug ## Update book cli documentation.
.PHONY: profiling
profiling: ## Builds `reth` with optimisations, but also symbols.
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --features jemalloc,asm-keccak
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling
.PHONY: maxperf
maxperf: ## Builds `reth` with the most aggressive optimisations.
RUSTFLAGS="-C target-cpu=native" cargo build --profile maxperf --features jemalloc,asm-keccak
RUSTFLAGS="-C target-cpu=native" cargo build --profile maxperf
.PHONY: maxperf-no-asm
maxperf-no-asm: ## Builds `reth` with the most aggressive optimisations, minus the "asm-keccak" feature.
RUSTFLAGS="-C target-cpu=native" cargo build --profile maxperf --features jemalloc
RUSTFLAGS="-C target-cpu=native" cargo build --profile maxperf --no-default-features --features jemalloc,min-debug-logs,otlp,otlp-logs,reth-revm/portable,js-tracer,keccak-cache-global,rocksdb
fmt:
cargo +nightly fmt

View File

@@ -30,7 +30,7 @@ reth-bench-compare \
| `--draw` | Generate charts (needs Python/uv) | `false` | No |
| `--profile` | Enable CPU profiling (needs samply) | `false` | No |
| `-vvvv` | Debug logging | Info | No |
| `--features <FEATURES>` | Rust features for both builds | `jemalloc,asm-keccak` | No |
| `--features <FEATURES>` | Extra Rust features for both builds | - | No |
| `--rustflags <FLAGS>` | RUSTFLAGS for both builds | `-C target-cpu=native` | No |
| `--baseline-features <FEATURES>` | Features for baseline only | Inherits `--features` | No |
| `--feature-features <FEATURES>` | Features for feature only | Inherits `--features` | No |

View File

@@ -191,10 +191,9 @@ pub(crate) struct Args {
#[arg(trailing_var_arg = true, allow_hyphen_values = true)]
pub reth_args: Vec<String>,
/// Comma-separated list of features to enable during reth compilation (applied to both builds)
///
/// Example: `jemalloc,asm-keccak`
#[arg(long, value_name = "FEATURES", default_value = "jemalloc,asm-keccak")]
/// Comma-separated list of extra features to enable during reth compilation (applied to both
/// builds)
#[arg(long, value_name = "FEATURES", default_value = "")]
pub features: String,
/// Comma-separated list of features to enable only for baseline build (overrides --features)
@@ -205,7 +204,7 @@ pub(crate) struct Args {
/// Comma-separated list of features to enable only for feature build (overrides --features)
///
/// Example: `--feature-features jemalloc,asm-keccak`
/// Example: `--feature-features jemalloc-prof`
#[arg(long, value_name = "FEATURES")]
pub feature_features: Option<String>,
@@ -277,10 +276,8 @@ impl Args {
/// Get the default RPC URL for a given chain
const fn get_default_rpc_url(chain: &Chain) -> &'static str {
match chain.id() {
8453 => "https://base.reth.rs/rpc", // base
84532 => "https://base-sepolia.rpc.ithaca.xyz", // base-sepolia
27082 => "https://rpc.hoodi.ethpandaops.io", // hoodi
_ => "https://ethereum.reth.rs/rpc", // mainnet and fallback
27082 => "https://rpc.hoodi.ethpandaops.io", // hoodi
_ => "https://ethereum.reth.rs/rpc", // mainnet and fallback
}
}

View File

@@ -32,7 +32,7 @@ Otherwise, running `make maxperf` at the root of the repo should be sufficient f
The `reth-bench new-payload-fcu` command is the most representative of ethereum mainnet live sync, alternating between sending `engine_newPayload` calls and `engine_forkchoiceUpdated` calls.
The `new-payload-fcu` command supports two optional waiting modes that can be used together or independently:
- `--wait-time <duration>`: Fixed sleep interval between blocks (e.g., `--wait-time 100ms`)
- `--wait-time <duration>`: Fixed sleep interval between blocks (e.g., `--wait-time 100ms` or `--wait-time 400` for 400ms)
- `--wait-for-persistence`: Waits for blocks to be persisted using the `reth_subscribePersistedBlock` subscription
When using `--wait-for-persistence`, the benchmark waits after every `(threshold + 1)` blocks, where the threshold defaults to the engine's persistence threshold (2). This can be customized with `--persistence-threshold <N>`.
@@ -73,7 +73,7 @@ make profiling
If the purpose of the benchmark is to obtain `jemalloc` memory profiles that can then be analyzed by `jeprof`, it should be compiled with the `profiling` profile and the `jemalloc-prof` feature:
```bash
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --features "jemalloc-prof,asm-keccak"
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --features "jemalloc-prof"
```
> [!NOTE]
@@ -82,7 +82,7 @@ RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --features "jem
Finally, if the purpose of the benchmark is to profile the node when `snmalloc` is configured as the default allocator, it would be built with the following
command:
```bash
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --no-default-features --features "snmalloc-native,asm-keccak"
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --no-default-features --features "snmalloc-native,asm-keccak,min-debug-logs"
```
### Run the Benchmark:

View File

@@ -192,6 +192,15 @@ impl Command {
parent_header = block.header;
parent_hash = block_hash;
blocks_processed += 1;
let progress = match mode {
RampMode::Blocks(total) => format!("{blocks_processed}/{total}"),
RampMode::TargetGasLimit(target) => {
let pct = (parent_header.gas_limit as f64 / target as f64 * 100.0).min(100.0);
format!("{pct:.1}%")
}
};
info!(target: "reth-bench", progress, block_number = parent_header.number, gas_limit = parent_header.gas_limit, "Block processed");
}
let final_gas_limit = parent_header.gas_limit;

View File

@@ -2,7 +2,10 @@
use crate::valid_payload::call_forkchoice_updated;
use eyre::Result;
use std::io::{BufReader, Read};
use std::{
io::{BufReader, Read},
time::Duration,
};
/// Read input from either a file path or stdin.
pub(crate) fn read_input(path: Option<&str>) -> Result<String> {
@@ -51,6 +54,22 @@ pub(crate) fn parse_gas_limit(s: &str) -> eyre::Result<u64> {
let base: u64 = num_str.trim().parse()?;
base.checked_mul(multiplier).ok_or_else(|| eyre::eyre!("value overflow"))
}
/// Parses a duration string, treating bare integers as milliseconds.
///
/// Accepts either a `humantime` duration string (e.g. `"100ms"`, `"2s"`) or a plain
/// integer which is interpreted as milliseconds (e.g. `"400"` → 400ms).
pub(crate) fn parse_duration(s: &str) -> eyre::Result<Duration> {
match humantime::parse_duration(s) {
Ok(d) => Ok(d),
Err(_) => {
let millis: u64 =
s.trim().parse().map_err(|_| eyre::eyre!("invalid duration: {s:?}"))?;
Ok(Duration::from_millis(millis))
}
}
}
use alloy_consensus::Header;
use alloy_eips::eip4844::kzg_to_versioned_hash;
use alloy_primitives::{Address, B256};
@@ -270,4 +289,24 @@ mod tests {
assert!(parse_gas_limit("G").is_err());
assert!(parse_gas_limit("-1G").is_err());
}
#[test]
fn test_parse_duration_with_unit() {
assert_eq!(parse_duration("100ms").unwrap(), Duration::from_millis(100));
assert_eq!(parse_duration("2s").unwrap(), Duration::from_secs(2));
assert_eq!(parse_duration("1m").unwrap(), Duration::from_secs(60));
}
#[test]
fn test_parse_duration_bare_millis() {
assert_eq!(parse_duration("400").unwrap(), Duration::from_millis(400));
assert_eq!(parse_duration("0").unwrap(), Duration::from_millis(0));
assert_eq!(parse_duration("1000").unwrap(), Duration::from_millis(1000));
}
#[test]
fn test_parse_duration_errors() {
assert!(parse_duration("abc").is_err());
assert!(parse_duration("").is_err());
}
}

View File

@@ -12,6 +12,7 @@
use crate::{
bench::{
context::BenchContext,
helpers::parse_duration,
output::{
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
},
@@ -25,7 +26,6 @@ use alloy_provider::Provider;
use alloy_rpc_types_engine::ForkchoiceState;
use clap::Parser;
use eyre::{Context, OptionExt};
use humantime::parse_duration;
use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_core::args::BenchmarkArgs;
@@ -40,6 +40,9 @@ pub struct Command {
rpc_url: String,
/// How long to wait after a forkchoice update before sending the next payload.
///
/// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
/// milliseconds (e.g. `400`).
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
wait_time: Option<Duration>,
@@ -117,7 +120,7 @@ impl Command {
self.benchmark.ws_rpc_url.as_deref(),
&self.benchmark.engine_rpc_url,
)?;
let sub = setup_persistence_subscription(ws_url).await?;
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
Some(PersistenceWaiter::with_duration_and_subscription(
duration,
sub,
@@ -131,7 +134,7 @@ impl Command {
self.benchmark.ws_rpc_url.as_deref(),
&self.benchmark.engine_rpc_url,
)?;
let sub = setup_persistence_subscription(ws_url).await?;
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
Some(PersistenceWaiter::with_subscription(
sub,
self.persistence_threshold,
@@ -150,6 +153,7 @@ impl Command {
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
let buffer_size = self.rpc_block_buffer_size;
// Use a oneshot channel to propagate errors from the spawned task
@@ -203,6 +207,7 @@ impl Command {
});
let mut results = Vec::new();
let mut blocks_processed = 0u64;
let total_benchmark_duration = Instant::now();
let mut total_wait_time = Duration::ZERO;
@@ -246,8 +251,13 @@ impl Command {
// Exclude time spent waiting on the block prefetch channel from the benchmark duration.
// We want to measure engine throughput, not RPC fetch latency.
blocks_processed += 1;
let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
info!(target: "reth-bench", %combined_result);
let progress = match total_blocks {
Some(total) => format!("{blocks_processed}/{total}"),
None => format!("{blocks_processed}"),
};
info!(target: "reth-bench", progress, %combined_result);
if let Some(w) = &mut waiter {
w.on_block(block_number).await?;

View File

@@ -52,6 +52,7 @@ impl Command {
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
let buffer_size = self.rpc_block_buffer_size;
// Use a oneshot channel to propagate errors from the spawned task
@@ -82,8 +83,8 @@ impl Command {
}
});
// put results in a summary vec so they can be printed at the end
let mut results = Vec::new();
let mut blocks_processed = 0u64;
let total_benchmark_duration = Instant::now();
let mut total_wait_time = Duration::ZERO;
@@ -105,7 +106,12 @@ impl Command {
call_new_payload(&auth_provider, version, params).await?;
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
info!(target: "reth-bench", %new_payload_result);
blocks_processed += 1;
let progress = match total_blocks {
Some(total) => format!("{blocks_processed}/{total}"),
None => format!("{blocks_processed}"),
};
info!(target: "reth-bench", progress, %new_payload_result);
// current duration since the start of the benchmark minus the time
// waiting for blocks

View File

@@ -154,12 +154,18 @@ impl PersistenceSubscription {
}
/// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`.
///
/// The `keepalive_interval` is set to match `persistence_timeout` so that the `WebSocket`
/// connection is not dropped during long MDBX commits that block the server from responding
/// to pings.
pub(crate) async fn setup_persistence_subscription(
ws_url: Url,
persistence_timeout: Duration,
) -> eyre::Result<PersistenceSubscription> {
info!(target: "reth-bench", "Connecting to WebSocket at {} for persistence subscription", ws_url);
let ws_connect = WsConnect::new(ws_url.to_string());
let ws_connect =
WsConnect::new(ws_url.to_string()).with_keepalive_interval(persistence_timeout);
let client = RpcClient::connect_pubsub(ws_connect)
.await
.wrap_err("Failed to connect to WebSocket RPC endpoint")?;

View File

@@ -14,6 +14,7 @@
use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::{
helpers::parse_duration,
output::{
write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
TotalGasOutput, TotalGasRow,
@@ -30,7 +31,6 @@ use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
use clap::Parser;
use eyre::Context;
use humantime::parse_duration;
use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_api::EngineApiMessageVersion;
@@ -78,6 +78,9 @@ pub struct Command {
output: Option<PathBuf>,
/// How long to wait after a forkchoice update before sending the next payload.
///
/// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
/// milliseconds (e.g. `400`).
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
wait_time: Option<Duration>,
@@ -166,7 +169,7 @@ impl Command {
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
(Some(duration), true) => {
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
let sub = setup_persistence_subscription(ws_url).await?;
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
Some(PersistenceWaiter::with_duration_and_subscription(
duration,
sub,
@@ -177,7 +180,7 @@ impl Command {
(Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
(None, true) => {
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
let sub = setup_persistence_subscription(ws_url).await?;
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
Some(PersistenceWaiter::with_subscription(
sub,
self.persistence_threshold,
@@ -338,7 +341,8 @@ impl Command {
};
let current_duration = total_benchmark_duration.elapsed();
info!(target: "reth-bench", %combined_result);
let progress = format!("{}/{}", i + 1, payloads.len());
info!(target: "reth-bench", progress, %combined_result);
if let Some(w) = &mut waiter {
w.on_block(block_number).await?;

View File

@@ -20,6 +20,19 @@ impl BenchMode {
}
}
/// Returns the total number of blocks in the benchmark, if known.
///
/// For [`BenchMode::Range`] this is the length of the range.
/// For [`BenchMode::Continuous`] the total is unbounded, so `None` is returned.
pub const fn total_blocks(&self) -> Option<u64> {
match self {
Self::Continuous(_) => None,
Self::Range(range) => {
Some(range.end().saturating_sub(*range.start()).saturating_add(1))
}
}
}
/// Create a [`BenchMode`] from optional `from` and `to` fields.
pub fn new(from: Option<u64>, to: Option<u64>, latest_block: u64) -> Result<Self, eyre::Error> {
// If neither `--from` nor `--to` are provided, we will run the benchmark continuously,

View File

@@ -33,7 +33,6 @@ reth-chainspec.workspace = true
reth-primitives.workspace = true
reth-db = { workspace = true, features = ["mdbx"] }
reth-provider.workspace = true
reth-evm.workspace = true
reth-revm.workspace = true
reth-transaction-pool.workspace = true
reth-cli-runner.workspace = true
@@ -53,32 +52,31 @@ reth-payload-primitives.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
reth-ethereum-payload-builder.workspace = true
reth-ethereum-primitives.workspace = true
reth-node-ethereum.workspace = true
reth-node-builder.workspace = true
reth-node-metrics.workspace = true
reth-consensus.workspace = true
reth-tokio-util.workspace = true
reth-ress-protocol.workspace = true
reth-ress-provider.workspace = true
# alloy
alloy-primitives.workspace = true
alloy-rpc-types = { workspace = true, features = ["engine"] }
# tracing
tracing.workspace = true
# async
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
# misc
aquamarine.workspace = true
clap = { workspace = true, features = ["derive", "env"] }
eyre.workspace = true
[dev-dependencies]
alloy-node-bindings = "1.6.3"
alloy-provider = { workspace = true, features = ["reqwest"] }
alloy-rpc-types-eth.workspace = true
backon.workspace = true
serde_json.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
toml.workspace = true
[features]
default = [
@@ -89,6 +87,7 @@ default = [
"js-tracer",
"keccak-cache-global",
"asm-keccak",
"min-debug-logs",
"rocksdb",
]
@@ -114,10 +113,12 @@ asm-keccak = [
"reth-primitives/asm-keccak",
"reth-ethereum-cli/asm-keccak",
"reth-node-ethereum/asm-keccak",
"alloy-primitives/asm-keccak",
]
keccak-cache-global = [
"reth-node-core/keccak-cache-global",
"reth-node-ethereum/keccak-cache-global",
"alloy-primitives/keccak-cache-global",
]
jemalloc = [
"reth-cli-util/jemalloc",

View File

@@ -51,6 +51,9 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]
// Used in feature flags only (`asm-keccak`, `keccak-cache-global`)
use alloy_primitives as _;
pub mod cli;
/// Re-exported utils.
@@ -205,12 +208,9 @@ pub mod rpc {
}
}
/// Ress subprotocol installation.
pub mod ress;
// re-export for convenience
#[doc(inline)]
pub use reth_cli_runner::{tokio_runtime, CliContext, CliRunner};
pub use reth_cli_runner::{CliContext, CliRunner};
// for rendering diagrams
use aquamarine as _;
@@ -218,3 +218,4 @@ use aquamarine as _;
// used in main
use clap as _;
use reth_cli_util as _;
use tracing as _;

View File

@@ -8,9 +8,8 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne
static MALLOC_CONF: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
use clap::Parser;
use reth::{args::RessArgs, cli::Cli, ress::install_ress_subprotocol};
use reth::cli::Cli;
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_node_builder::NodeHandle;
use reth_node_ethereum::EthereumNode;
use tracing::info;
@@ -22,27 +21,12 @@ fn main() {
unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
}
if let Err(err) =
Cli::<EthereumChainSpecParser, RessArgs>::parse().run(async move |builder, ress_args| {
info!(target: "reth::cli", "Launching node");
let NodeHandle { node, node_exit_future } =
builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
if let Err(err) = Cli::<EthereumChainSpecParser>::parse().run(async move |builder, _| {
info!(target: "reth::cli", "Launching node");
let handle = builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
// Install ress subprotocol.
if ress_args.enabled {
install_ress_subprotocol(
ress_args,
node.provider,
node.evm_config,
node.network,
node.task_executor,
node.add_ons_handle.engine_events.new_listener(),
)?;
}
node_exit_future.await
})
{
handle.wait_for_node_exit().await
}) {
eprintln!("Error: {err:?}");
std::process::exit(1);
}

View File

@@ -1,67 +0,0 @@
use reth_ethereum_primitives::EthPrimitives;
use reth_evm::ConfigureEvm;
use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
use reth_network_api::FullNetwork;
use reth_node_api::ConsensusEngineEvent;
use reth_node_core::args::RessArgs;
use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes};
use reth_ress_protocol::{NodeType, ProtocolState, RessProtocolHandler};
use reth_ress_provider::{maintain_pending_state, PendingState, RethRessProtocolProvider};
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventStream;
use tokio::sync::mpsc;
use tracing::*;
/// Install `ress` subprotocol if it's enabled.
pub fn install_ress_subprotocol<P, E, N>(
args: RessArgs,
provider: BlockchainProvider<P>,
evm_config: E,
network: N,
task_executor: TaskExecutor,
engine_events: EventStream<ConsensusEngineEvent<EthPrimitives>>,
) -> eyre::Result<()>
where
P: ProviderNodeTypes<Primitives = EthPrimitives>,
E: ConfigureEvm<Primitives = EthPrimitives> + Clone + 'static,
N: FullNetwork + NetworkProtocols,
{
info!(target: "reth::cli", "Installing ress subprotocol");
let pending_state = PendingState::default();
// Spawn maintenance task for pending state.
task_executor.spawn(maintain_pending_state(
engine_events,
provider.clone(),
pending_state.clone(),
));
let (tx, mut rx) = mpsc::unbounded_channel();
let provider = RethRessProtocolProvider::new(
provider,
evm_config,
Box::new(task_executor.clone()),
args.max_witness_window,
args.witness_max_parallel,
args.witness_cache_size,
pending_state,
)?;
network.add_rlpx_sub_protocol(
RessProtocolHandler {
provider,
node_type: NodeType::Stateful,
peers_handle: network.peers_handle().clone(),
max_active_connections: args.max_active_connections,
state: ProtocolState::new(tx),
}
.into_rlpx_sub_protocol(),
);
info!(target: "reth::cli", "Ress subprotocol support enabled");
task_executor.spawn(async move {
while let Some(event) = rx.recv().await {
trace!(target: "reth::ress", ?event, "Received ress event");
}
});
Ok(())
}

255
bin/reth/tests/it/main.rs Normal file
View File

@@ -0,0 +1,255 @@
#![allow(missing_docs)]
use std::process::Command;
const RETH: &str = env!("CARGO_BIN_EXE_reth");
// ── Helpers ──────────────────────────────────────────────────────────────────
/// Runs `reth <args>` and returns stdout, asserting exit code 0.
///
/// Tracing is suppressed via `RUST_LOG=off` so that log lines emitted during
/// binary startup don't pollute stdout-based assertions.
#[track_caller]
fn reth_ok(args: &[&str]) -> String {
let output = Command::new(RETH).env("RUST_LOG", "off").args(args).output().unwrap();
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(output.status.success(), "args {args:?} failed.\nstdout: {stdout}\nstderr: {stderr}");
stdout.into_owned()
}
/// Spawns an isolated dev-mode reth node.
///
/// Discovery is disabled and peer limits are zeroed so the node is fully
/// isolated. Each call gets a unique temporary data directory so that
/// concurrent test runs never collide on the default `reth/dev/` path.
fn spawn_dev() -> (alloy_node_bindings::RethInstance, tempfile::TempDir) {
use alloy_node_bindings::Reth;
let datadir = tempfile::tempdir().expect("failed to create temp dir");
let instance = Reth::at(RETH)
.dev()
.disable_discovery()
.data_dir(datadir.path())
.args(["--max-outbound-peers", "0", "--max-inbound-peers", "0"])
.spawn();
// Return the TempDir alongside the instance so it lives as long as the node.
(instance, datadir)
}
// ── Original tests (from PR #22069) ──────────────────────────────────────────
#[test]
fn help() {
let stdout = reth_ok(&["--help"]);
assert!(stdout.contains("Usage"), "stdout: {stdout}");
assert!(stdout.contains("node"), "stdout: {stdout}");
}
#[test]
fn version() {
let stdout = reth_ok(&["--version"]);
assert!(stdout.to_lowercase().contains("reth"), "stdout: {stdout}");
}
#[test]
fn node_help() {
let stdout = reth_ok(&["node", "--help"]);
assert!(stdout.contains("--dev"), "stdout: {stdout}");
assert!(stdout.contains("--http"), "stdout: {stdout}");
}
#[test]
fn unknown_subcommand() {
let output = Command::new(RETH).arg("definitely-not-a-cmd").output().unwrap();
assert!(!output.status.success());
}
#[test]
fn unknown_flag() {
let output = Command::new(RETH).args(["node", "--no-such-flag"]).output().unwrap();
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(!output.status.success());
assert!(stderr.contains("--no-such-flag"), "stderr: {stderr}");
}
#[tokio::test]
async fn dev_node_eth_syncing() {
use alloy_provider::{Provider, ProviderBuilder};
let (reth, _datadir) = spawn_dev();
let provider = ProviderBuilder::new().connect_http(reth.endpoint().parse().unwrap());
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let _syncing = provider.syncing().await.expect("eth_syncing failed");
}
// ── Subcommand --help coverage ───────────────────────────────────────────────
//
// Every registered subcommand must produce valid --help output. This catches
// clap wiring regressions (e.g. a missing field, a conflicting arg name, or a
// broken `help_message()` call) that would otherwise only surface when a user
// runs the command.
#[test]
fn init_help() {
let stdout = reth_ok(&["init", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn init_state_help() {
let stdout = reth_ok(&["init-state", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn import_help() {
let stdout = reth_ok(&["import", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn import_era_help() {
let stdout = reth_ok(&["import-era", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn export_era_help() {
let stdout = reth_ok(&["export-era", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn dump_genesis_help() {
let stdout = reth_ok(&["dump-genesis", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn db_help() {
let stdout = reth_ok(&["db", "--help"]);
assert!(stdout.contains("stats"), "stdout: {stdout}");
}
#[test]
fn stage_help() {
let stdout = reth_ok(&["stage", "--help"]);
assert!(stdout.contains("run"), "stdout: {stdout}");
}
#[test]
fn p2p_help() {
let stdout = reth_ok(&["p2p", "--help"]);
assert!(stdout.contains("header"), "stdout: {stdout}");
}
#[test]
fn config_help() {
let stdout = reth_ok(&["config", "--help"]);
assert!(stdout.contains("--default"), "stdout: {stdout}");
}
#[test]
fn prune_help() {
let stdout = reth_ok(&["prune", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn download_help() {
let stdout = reth_ok(&["download", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn re_execute_help() {
let stdout = reth_ok(&["re-execute", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
// ── `config --default` outputs valid TOML ────────────────────────────────────
#[test]
fn config_default_valid_toml() {
let stdout = reth_ok(&["config", "--default"]);
let parsed: toml::Value =
toml::from_str(&stdout).expect("config --default did not produce valid TOML");
// The default config must contain the [stages] table — this is the heart of
// the pipeline configuration and its absence would indicate a serialization
// regression.
assert!(parsed.get("stages").is_some(), "missing [stages] in config output");
}
// ── `dump-genesis` outputs valid JSON ────────────────────────────────────────
#[test]
fn dump_genesis_mainnet_valid_json() {
let stdout = reth_ok(&["dump-genesis"]);
let genesis: serde_json::Value =
serde_json::from_str(&stdout).expect("dump-genesis did not produce valid JSON");
assert!(genesis.get("nonce").is_some(), "missing nonce in genesis JSON");
assert!(genesis.get("alloc").is_some(), "missing alloc in genesis JSON");
}
#[test]
fn dump_genesis_sepolia_valid_json() {
let stdout = reth_ok(&["dump-genesis", "--chain", "sepolia"]);
let genesis: serde_json::Value = serde_json::from_str(&stdout)
.expect("dump-genesis --chain sepolia did not produce valid JSON");
assert!(genesis.get("alloc").is_some(), "missing alloc in sepolia genesis JSON");
}
// ── Dev node: send transaction round-trip ────────────────────────────────────
//
// Exercises the full pipeline: RPC submission → mempool → sealing → execution →
// receipt retrieval. Uses the pre-funded dev account so no genesis customization
// is required.
#[tokio::test]
async fn dev_node_send_tx_and_mine() {
use alloy_primitives::{Address, U256};
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_types_eth::TransactionRequest;
let (reth, _datadir) = spawn_dev();
let provider = ProviderBuilder::new().connect_http(reth.endpoint().parse().unwrap());
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// Dev mode pre-funds the first dev account.
let accounts = provider.get_accounts().await.expect("eth_accounts failed");
assert!(!accounts.is_empty(), "dev node should expose at least one account");
let sender = accounts[0];
let recipient = Address::with_last_byte(0x42);
let tx = TransactionRequest::default().from(sender).to(recipient).value(U256::from(1_000_000));
let tx_hash = provider.send_transaction(tx).await.expect("eth_sendTransaction failed");
// In dev/instant-mine mode the node seals a block for each transaction, so
// the receipt becomes available almost immediately.
let receipt = tx_hash.get_receipt().await.expect("failed to get receipt");
assert!(receipt.status(), "transaction should have succeeded");
assert_eq!(receipt.to, Some(recipient));
assert!(receipt.block_number.unwrap() > 0, "receipt should be in a mined block");
// Verify the transfer actually mutated state.
let balance = provider.get_balance(recipient).await.expect("eth_getBalance failed");
assert_eq!(balance, U256::from(1_000_000));
}
const fn main() {}

View File

@@ -312,6 +312,11 @@ impl DeferredTrieData {
/// Given that invariant, circular wait dependencies are impossible.
#[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
pub fn wait_cloned(&self) -> ComputedTrieData {
#[cfg(feature = "rayon")]
debug_assert!(
rayon::current_thread_index().is_none(),
"wait_cloned must not be called from a rayon worker thread"
);
let mut state = self.state.lock();
match &mut *state {
// If the deferred trie data is ready, return the cached result.

View File

@@ -1061,6 +1061,14 @@ mod tests {
) -> ProviderResult<Option<StorageValue>> {
Ok(None)
}
fn storage_by_hashed_key(
&self,
_address: Address,
_hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
Ok(None)
}
}
impl BytecodeReader for MockStateProvider {

View File

@@ -223,6 +223,26 @@ impl<N: NodePrimitives> StateProvider for MemoryOverlayStateProviderRef<'_, N> {
self.historical.storage(address, storage_key)
}
fn storage_by_hashed_key(
&self,
address: Address,
hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
let hashed_address = keccak256(address);
let state = &self.trie_input().state;
if let Some(hs) = state.storages.get(&hashed_address) {
if let Some(value) = hs.storage.get(&hashed_storage_key) {
return Ok(Some(*value));
}
if hs.wiped {
return Ok(Some(StorageValue::ZERO));
}
}
self.historical.storage_by_hashed_key(address, hashed_storage_key)
}
}
impl<N: NodePrimitives> BytecodeReader for MemoryOverlayStateProviderRef<'_, N> {

View File

@@ -66,7 +66,8 @@ pub trait RethCli: Sized {
F: FnOnce(Self, CliRunner) -> R,
{
let cli = Self::parse_args()?;
let runner = CliRunner::try_default_runtime()?;
let runner = CliRunner::try_default_runtime()
.map_err(|e| Error::raw(clap::error::ErrorKind::Io, e))?;
Ok(cli.with_runner(f, runner))
}

View File

@@ -134,4 +134,4 @@ arbitrary = [
]
rocksdb = ["reth-db-common/rocksdb", "reth-stages/rocksdb", "reth-provider/rocksdb", "reth-prune/rocksdb"]
edge = ["rocksdb", "reth-db-common/edge", "reth-provider/edge"]
edge = ["rocksdb"]

View File

@@ -19,7 +19,7 @@ use reth_node_builder::{
Node, NodeComponents, NodeComponentsBuilder, NodeTypes, NodeTypesWithDBAdapter,
};
use reth_node_core::{
args::{DatabaseArgs, DatadirArgs, RocksDbArgs, StaticFilesArgs, StorageArgs},
args::{DatabaseArgs, DatadirArgs, StaticFilesArgs, StorageArgs},
dirs::{ChainPath, DataDirPath},
};
use reth_provider::{
@@ -67,70 +67,35 @@ pub struct EnvironmentArgs<C: ChainSpecParser> {
#[command(flatten)]
pub static_files: StaticFilesArgs,
/// All `RocksDB` related arguments
#[command(flatten)]
pub rocksdb: RocksDbArgs,
/// Storage mode configuration (v2 vs v1/legacy)
#[command(flatten)]
pub storage: StorageArgs,
}
impl<C: ChainSpecParser> EnvironmentArgs<C> {
/// Returns the effective storage settings derived from `--storage.v2`, static-file, and
/// `RocksDB` CLI args.
/// Returns the effective storage settings derived from `--storage.v2`.
///
/// The base storage mode is determined by `--storage.v2`:
/// - When `--storage.v2` is set: uses [`StorageSettings::v2()`] defaults
/// - Otherwise: uses [`StorageSettings::v1()`] defaults
///
/// Individual `--static-files.*` and `--rocksdb.*` flags override the base when explicitly set.
/// - Otherwise: uses [`StorageSettings::base()`] defaults
pub fn storage_settings(&self) -> StorageSettings {
let mut s = if self.storage.v2 { StorageSettings::v2() } else { StorageSettings::base() };
// Apply static files overrides (only when explicitly set)
if let Some(v) = self.static_files.receipts {
s = s.with_receipts_in_static_files(v);
if self.storage.v2 {
StorageSettings::v2()
} else {
StorageSettings::base()
}
if let Some(v) = self.static_files.transaction_senders {
s = s.with_transaction_senders_in_static_files(v);
}
if let Some(v) = self.static_files.account_changesets {
s = s.with_account_changesets_in_static_files(v);
}
if let Some(v) = self.static_files.storage_changesets {
s = s.with_storage_changesets_in_static_files(v);
}
// Apply rocksdb overrides
// --rocksdb.all sets all rocksdb flags to true
if self.rocksdb.all {
s = s
.with_transaction_hash_numbers_in_rocksdb(true)
.with_storages_history_in_rocksdb(true)
.with_account_history_in_rocksdb(true);
}
// Individual rocksdb flags override --rocksdb.all when explicitly set
if let Some(v) = self.rocksdb.tx_hash {
s = s.with_transaction_hash_numbers_in_rocksdb(v);
}
if let Some(v) = self.rocksdb.storages_history {
s = s.with_storages_history_in_rocksdb(v);
}
if let Some(v) = self.rocksdb.account_history {
s = s.with_account_history_in_rocksdb(v);
}
s
}
/// Initializes environment according to [`AccessRights`] and returns an instance of
/// [`Environment`].
///
/// Internally builds a [`reth_tasks::Runtime`] attached to the current tokio handle for
/// parallel storage I/O.
pub fn init<N: CliNodeTypes>(&self, access: AccessRights) -> eyre::Result<Environment<N>>
where
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
{
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
let data_dir = self.datadir.clone().resolve_datadir(self.chain.chain());
let db_path = data_dir.db();
let sf_path = data_dir.static_files();
@@ -186,7 +151,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
.build()?;
let provider_factory =
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access)?;
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access, runtime)?;
if access.is_read_write() {
debug!(target: "reth::cli", chain=%self.chain.chain(), genesis=?self.chain.genesis_hash(), "Initializing genesis");
init_genesis_with_settings(&provider_factory, self.storage_settings())?;
@@ -207,6 +172,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
static_file_provider: StaticFileProvider<N::Primitives>,
rocksdb_provider: RocksDBProvider,
access: AccessRights,
runtime: reth_tasks::Runtime,
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>>>
where
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
@@ -217,6 +183,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
self.chain.clone(),
static_file_provider,
rocksdb_provider,
runtime,
)?
.with_prune_modes(prune_modes.clone());

View File

@@ -5,6 +5,7 @@ use reth_codecs::Compact;
use reth_db_api::{cursor::DbDupCursorRO, database::Database, tables, transaction::DbTx};
use reth_db_common::DbTool;
use reth_node_builder::NodeTypesWithDB;
use reth_storage_api::StorageSettingsCache;
use std::time::{Duration, Instant};
use tracing::info;
@@ -16,26 +17,22 @@ const LOG_INTERVAL: Duration = Duration::from_secs(5);
pub struct Command {
/// The account address to check storage for
address: Address,
/// Use hashed state tables (HashedStorages) instead of plain state
#[arg(long)]
hashed: bool,
}
impl Command {
/// Execute `db account-storage` command
pub fn execute<N: NodeTypesWithDB>(self, tool: &DbTool<N>) -> eyre::Result<()> {
let address = self.address;
let use_hashed = self.hashed;
let hashed_address = keccak256(address);
let use_hashed_state = tool.provider_factory.cached_storage_settings().use_hashed_state();
let (slot_count, storage_size) = tool.provider_factory.db_ref().view(|tx| {
let mut count = 0usize;
let mut total_value_bytes = 0usize;
let mut last_log = Instant::now();
if use_hashed {
let (slot_count, storage_size) = if use_hashed_state {
let hashed_address = keccak256(address);
tool.provider_factory.db_ref().view(|tx| {
let mut cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
let mut count = 0usize;
let mut total_value_bytes = 0usize;
let mut last_log = Instant::now();
let walker = cursor.walk_dup(Some(hashed_address), None)?;
for entry in walker {
let (_, storage_entry) = entry?;
@@ -47,7 +44,7 @@ impl Command {
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
hashed_address = %hashed_address,
address = %address,
slots = count,
key = %storage_entry.key,
"Processing hashed storage slots"
@@ -55,16 +52,25 @@ impl Command {
last_log = Instant::now();
}
}
// HashedStorages uses 32-byte B256 key
let total_size = if count > 0 { 32 + total_value_bytes } else { 0 };
Ok::<_, eyre::Report>((count, total_size))
} else {
})??
} else {
tool.provider_factory.db_ref().view(|tx| {
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let mut count = 0usize;
let mut total_value_bytes = 0usize;
let mut last_log = Instant::now();
// Walk all storage entries for this address
let walker = cursor.walk_dup(Some(address), None)?;
for entry in walker {
let (_, storage_entry) = entry?;
count += 1;
let mut buf = Vec::new();
// StorageEntry encodes as: 32 bytes (key/subkey uncompressed) + compressed U256
let entry_len = storage_entry.to_compact(&mut buf);
total_value_bytes += entry_len;
@@ -79,24 +85,26 @@ impl Command {
last_log = Instant::now();
}
}
// PlainStorageState uses 20-byte Address key
let total_size = if count > 0 { 20 + total_value_bytes } else { 0 };
Ok::<_, eyre::Report>((count, total_size))
}
})??;
let state_source = if use_hashed { "hashed" } else { "plain" };
// Add 20 bytes for the Address key (stored once per account in dupsort)
let total_size = if count > 0 { 20 + total_value_bytes } else { 0 };
Ok::<_, eyre::Report>((count, total_size))
})??
};
let hashed_address = keccak256(address);
println!("Account: {address}");
println!("Hashed address: {hashed_address}");
println!("State source: {state_source}");
println!("Storage slots: {slot_count}");
println!("Storage size: {} (estimated)", human_bytes(storage_size as f64));
if !use_hashed {
// When querying plain state, also estimate what hashed would be
if use_hashed_state {
println!("Hashed storage size: {} (estimated)", human_bytes(storage_size as f64));
} else {
// Estimate hashed storage size: 32-byte B256 key instead of 20-byte Address
let hashed_size_estimate = if slot_count > 0 { storage_size + 12 } else { 0 };
let total_estimate = storage_size + hashed_size_estimate;
println!("Plain storage size: {} (estimated)", human_bytes(storage_size as f64));
println!(
"Hashed storage size: {} (estimated)",
human_bytes(hashed_size_estimate as f64)
@@ -123,17 +131,5 @@ mod tests {
cmd.address,
"0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045".parse::<Address>().unwrap()
);
assert!(!cmd.hashed);
}
#[test]
fn parse_hashed_flag() {
let cmd = Command::try_parse_from([
"account-storage",
"0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045",
"--hashed",
])
.unwrap();
assert!(cmd.hashed);
}
}

View File

@@ -0,0 +1,61 @@
use clap::Parser;
use reth_db::mdbx::{self, ffi};
use std::path::PathBuf;
/// Copies the MDBX database to a new location.
///
/// Equivalent to the standalone `mdbx_copy` tool but bundled into reth.
#[derive(Parser, Debug)]
pub struct Command {
/// Destination path for the database copy.
dest: PathBuf,
/// Compact the database while copying (reclaims free space).
#[arg(short, long)]
compact: bool,
/// Force dynamic size for the destination database.
#[arg(short = 'd', long)]
force_dynamic_size: bool,
/// Throttle to avoid MVCC pressure on writers.
#[arg(short = 'p', long)]
throttle_mvcc: bool,
}
impl Command {
/// Execute `db copy` command
pub fn execute(self, db: &mdbx::DatabaseEnv) -> eyre::Result<()> {
let mut flags: ffi::MDBX_copy_flags_t = ffi::MDBX_CP_DEFAULTS;
if self.compact {
flags |= ffi::MDBX_CP_COMPACT;
}
if self.force_dynamic_size {
flags |= ffi::MDBX_CP_FORCE_DYNAMIC_SIZE;
}
if self.throttle_mvcc {
flags |= ffi::MDBX_CP_THROTTLE_MVCC;
}
let dest = self
.dest
.to_str()
.ok_or_else(|| eyre::eyre!("destination path must be valid UTF-8"))?;
let dest_cstr = std::ffi::CString::new(dest)?;
println!("Copying database to {} ...", self.dest.display());
let rc = db.with_raw_env_ptr(|env_ptr| unsafe {
ffi::mdbx_env_copy(env_ptr, dest_cstr.as_ptr(), flags)
});
if rc != 0 {
eyre::bail!("mdbx_env_copy failed with error code {rc}: {}", unsafe {
std::ffi::CStr::from_ptr(ffi::mdbx_strerror(rc)).to_string_lossy()
});
}
println!("Done.");
Ok(())
}
}

View File

@@ -98,7 +98,8 @@ impl Command {
)?;
if let Some(entry) = entry {
println!("{}", serde_json::to_string_pretty(&entry)?);
let se: reth_primitives_traits::StorageEntry = entry.into();
println!("{}", serde_json::to_string_pretty(&se)?);
} else {
error!(target: "reth::cli", "No content for the given table key.");
}
@@ -106,7 +107,14 @@ impl Command {
}
let changesets = provider.storage_changeset(key.block_number())?;
println!("{}", serde_json::to_string_pretty(&changesets)?);
let serializable: Vec<_> = changesets
.into_iter()
.map(|(addr, entry)| {
let se: reth_primitives_traits::StorageEntry = entry.into();
(addr, se)
})
.collect();
println!("{}", serde_json::to_string_pretty(&serializable)?);
return Ok(());
}

View File

@@ -12,6 +12,7 @@ use std::{
mod account_storage;
mod checksum;
mod clear;
mod copy;
mod diff;
mod get;
mod list;
@@ -42,6 +43,8 @@ pub enum Subcommands {
List(list::Command),
/// Calculates the content checksum of a table or static file segment
Checksum(checksum::Command),
/// Copies the MDBX database to a new location (bundled mdbx_copy)
Copy(copy::Command),
/// Create a diff between two database tables or two entire databases.
Diff(diff::Command),
/// Gets the content of a table for the given key
@@ -70,23 +73,23 @@ pub enum Subcommands {
State(state::Command),
}
/// Initializes a provider factory with specified access rights, and then execute with the provided
/// command
macro_rules! db_exec {
($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => {
let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?;
let $tool = DbTool::new(provider_factory)?;
$command;
};
}
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
/// Execute `db` command
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
self,
ctx: CliContext,
) -> eyre::Result<()> {
/// Initializes a provider factory with specified access rights, and then executes the
/// provided command.
macro_rules! db_exec {
($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => {
let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?;
let $tool = DbTool::new(provider_factory)?;
$command;
};
}
let data_dir = self.env.datadir.clone().resolve_datadir(self.env.chain.chain());
let db_path = data_dir.db();
let static_files_path = data_dir.static_files();
@@ -124,6 +127,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
command.execute(&tool)?;
});
}
Subcommands::Copy(command) => {
db_exec!(self.env, tool, N, AccessRights::RO, {
command.execute(tool.provider_factory.db_ref())?;
});
}
Subcommands::Diff(command) => {
db_exec!(self.env, tool, N, AccessRights::RO, {
command.execute(&tool)?;

View File

@@ -64,7 +64,7 @@ impl Command {
let executor = task_executor.clone();
let pprof_dump_dir = data_dir.pprof_dumps();
let handle = task_executor.spawn_critical("metrics server", async move {
let handle = task_executor.spawn_critical_task("metrics server", async move {
let config = MetricServerConfig::new(
listen_addr,
VersionInfo {

View File

@@ -39,38 +39,12 @@ enum Subcommands {
#[derive(Debug, Clone, Copy, Subcommand)]
#[clap(rename_all = "snake_case")]
pub enum SetCommand {
/// Store receipts in static files instead of the database
Receipts {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store transaction senders in static files instead of the database
TransactionSenders {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store account changesets in static files instead of the database
AccountChangesets {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store storage history in rocksdb instead of MDBX
StoragesHistory {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store transaction hash to number mapping in rocksdb instead of MDBX
TransactionHashNumbers {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store account history in rocksdb instead of MDBX
AccountHistory {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store storage changesets in static files instead of the database
StorageChangesets {
/// Enable or disable v2 storage layout
///
/// When enabled, uses static files for receipts/senders/changesets and RocksDB for
/// history indices and transaction hashes. When disabled, uses v1/legacy layout (everything in
/// MDBX).
V2 {
#[clap(action(ArgAction::Set))]
value: bool,
},
@@ -113,74 +87,18 @@ impl Command {
println!("No storage settings found, creating new settings.");
}
let mut settings @ StorageSettings {
receipts_in_static_files: _,
transaction_senders_in_static_files: _,
storages_history_in_rocksdb: _,
transaction_hash_numbers_in_rocksdb: _,
account_history_in_rocksdb: _,
account_changesets_in_static_files: _,
storage_changesets_in_static_files: _,
use_hashed_state: _,
} = settings.unwrap_or_else(StorageSettings::v1);
let mut settings @ StorageSettings { storage_v2: _ } =
settings.unwrap_or_else(StorageSettings::v1);
// Update the setting based on the key
match cmd {
SetCommand::Receipts { value } => {
if settings.receipts_in_static_files == value {
println!("receipts_in_static_files is already set to {}", value);
SetCommand::V2 { value } => {
if settings.storage_v2 == value {
println!("storage_v2 is already set to {}", value);
return Ok(());
}
settings.receipts_in_static_files = value;
println!("Set receipts_in_static_files = {}", value);
}
SetCommand::TransactionSenders { value } => {
if settings.transaction_senders_in_static_files == value {
println!("transaction_senders_in_static_files is already set to {}", value);
return Ok(());
}
settings.transaction_senders_in_static_files = value;
println!("Set transaction_senders_in_static_files = {}", value);
}
SetCommand::AccountChangesets { value } => {
if settings.account_changesets_in_static_files == value {
println!("account_changesets_in_static_files is already set to {}", value);
return Ok(());
}
settings.account_changesets_in_static_files = value;
println!("Set account_changesets_in_static_files = {}", value);
}
SetCommand::StoragesHistory { value } => {
if settings.storages_history_in_rocksdb == value {
println!("storages_history_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.storages_history_in_rocksdb = value;
println!("Set storages_history_in_rocksdb = {}", value);
}
SetCommand::TransactionHashNumbers { value } => {
if settings.transaction_hash_numbers_in_rocksdb == value {
println!("transaction_hash_numbers_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.transaction_hash_numbers_in_rocksdb = value;
println!("Set transaction_hash_numbers_in_rocksdb = {}", value);
}
SetCommand::AccountHistory { value } => {
if settings.account_history_in_rocksdb == value {
println!("account_history_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.account_history_in_rocksdb = value;
println!("Set account_history_in_rocksdb = {}", value);
}
SetCommand::StorageChangesets { value } => {
if settings.storage_changesets_in_static_files == value {
println!("storage_changesets_in_static_files is already set to {}", value);
return Ok(());
}
settings.storage_changesets_in_static_files = value;
println!("Set storage_changesets_in_static_files = {}", value);
settings.storage_v2 = value;
println!("Set storage_v2 = {}", value);
}
}

View File

@@ -39,10 +39,6 @@ pub struct Command {
/// Output format (table, json, csv)
#[arg(long, short, default_value = "table")]
format: OutputFormat,
/// Use hashed state tables (HashedStorages) instead of plain state
#[arg(long)]
hashed: bool,
}
impl Command {
@@ -67,57 +63,24 @@ impl Command {
address: Address,
limit: usize,
) -> eyre::Result<()> {
let use_hashed = self.hashed;
let hashed_address = keccak256(address);
let use_hashed_state = tool.provider_factory.cached_storage_settings().use_hashed_state();
let entries = tool.provider_factory.db_ref().view(|tx| {
let account = if use_hashed {
tx.get::<tables::HashedAccounts>(hashed_address)?
} else {
tx.get::<tables::PlainAccountState>(address)?
};
let mut entries = Vec::new();
let mut last_log = Instant::now();
if use_hashed {
let (account, walker_entries) = if use_hashed_state {
let hashed_address = keccak256(address);
let account = tx.get::<tables::HashedAccounts>(hashed_address)?;
let mut cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
let walker = cursor.walk_dup(Some(hashed_address), None)?;
let mut entries = Vec::new();
let mut last_log = Instant::now();
for (idx, entry) in walker.enumerate() {
let (_, storage_entry) = entry?;
if storage_entry.value != U256::ZERO {
entries.push((storage_entry.key, storage_entry.value));
}
if entries.len() >= limit {
break;
}
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
hashed_address = %hashed_address,
slots_scanned = idx,
"Scanning hashed storage slots"
);
last_log = Instant::now();
}
}
} else {
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let walker = cursor.walk_dup(Some(address), None)?;
for (idx, entry) in walker.enumerate() {
let (_, storage_entry) = entry?;
if storage_entry.value != U256::ZERO {
entries.push((storage_entry.key, storage_entry.value));
}
if entries.len() >= limit {
break;
}
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
@@ -128,14 +91,42 @@ impl Command {
last_log = Instant::now();
}
}
}
(account, entries)
} else {
// Get account info
let account = tx.get::<tables::PlainAccountState>(address)?;
// Get storage entries
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let walker = cursor.walk_dup(Some(address), None)?;
let mut entries = Vec::new();
let mut last_log = Instant::now();
for (idx, entry) in walker.enumerate() {
let (_, storage_entry) = entry?;
if storage_entry.value != U256::ZERO {
entries.push((storage_entry.key, storage_entry.value));
}
if entries.len() >= limit {
break;
}
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots_scanned = idx,
"Scanning storage slots"
);
last_log = Instant::now();
}
}
(account, entries)
};
Ok::<_, eyre::Report>((account, entries))
Ok::<_, eyre::Report>((account, walker_entries))
})??;
let (account, storage_entries) = entries;
self.print_results(address, None, account, &storage_entries, use_hashed);
self.print_results(address, None, account, &storage_entries);
Ok(())
}
@@ -154,7 +145,7 @@ impl Command {
// Check storage settings to determine where history is stored
let storage_settings = tool.provider_factory.cached_storage_settings();
let history_in_rocksdb = storage_settings.storages_history_in_rocksdb;
let history_in_rocksdb = storage_settings.storage_v2;
// For historical queries, enumerate keys from history indices only
// (not PlainStorageState, which reflects current state)
@@ -211,7 +202,7 @@ impl Command {
}
}
self.print_results(address, Some(block), account, &entries, false);
self.print_results(address, Some(block), account, &entries);
Ok(())
}
@@ -353,22 +344,15 @@ impl Command {
block: Option<BlockNumber>,
account: Option<reth_primitives_traits::Account>,
storage: &[(alloy_primitives::B256, U256)],
use_hashed: bool,
) {
let state_source = if use_hashed { "hashed" } else { "plain" };
match self.format {
OutputFormat::Table => {
println!("Account: {address}");
if use_hashed {
println!("Hashed address: {}", keccak256(address));
}
if let Some(b) = block {
println!("Block: {b}");
} else {
println!("Block: latest");
}
println!("State source: {state_source}");
println!();
if let Some(acc) = account {
@@ -382,10 +366,9 @@ impl Command {
}
println!();
let slot_header = if use_hashed { "Hashed Slot" } else { "Slot" };
println!("Storage ({} slots):", storage.len());
println!("{:-<130}", "");
println!("{:<66} | {:<64}", slot_header, "Value");
println!("{:<66} | {:<64}", "Slot", "Value");
println!("{:-<130}", "");
for (key, value) in storage {
println!("{key} | {value:#066x}");
@@ -394,9 +377,7 @@ impl Command {
OutputFormat::Json => {
let output = serde_json::json!({
"address": address.to_string(),
"hashed_address": if use_hashed { Some(keccak256(address).to_string()) } else { None },
"block": block,
"state_source": state_source,
"account": account.map(|a| serde_json::json!({
"nonce": a.nonce,
"balance": a.balance.to_string(),
@@ -454,17 +435,5 @@ mod tests {
let cmd = Command::try_parse_from(["state", "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"])
.unwrap();
assert_eq!(cmd.block, None);
assert!(!cmd.hashed);
}
#[test]
fn parse_state_args_hashed() {
let cmd = Command::try_parse_from([
"state",
"0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045",
"--hashed",
])
.unwrap();
assert!(cmd.hashed);
}
}

View File

@@ -37,6 +37,14 @@ pub struct DownloadDefaults {
pub available_snapshots: Vec<Cow<'static, str>>,
/// Default base URL for snapshots
pub default_base_url: Cow<'static, str>,
/// Default base URL for chain-aware snapshots.
///
/// When set, the chain ID is appended to form the full URL: `{base_url}/{chain_id}`.
/// For example, given a base URL of `https://snapshots.example.com` and chain ID `1`,
/// the resulting URL would be `https://snapshots.example.com/1`.
///
/// Falls back to [`default_base_url`](Self::default_base_url) when `None`.
pub default_chain_aware_base_url: Option<Cow<'static, str>>,
/// Optional custom long help text that overrides the generated help
pub long_help: Option<String>,
}
@@ -60,6 +68,7 @@ impl DownloadDefaults {
Cow::Borrowed("https://publicnode.com/snapshots (full nodes & testnets)"),
],
default_base_url: Cow::Borrowed(MERKLE_BASE_URL),
default_chain_aware_base_url: None,
long_help: None,
}
}
@@ -84,9 +93,11 @@ impl DownloadDefaults {
}
help.push_str(
"\nIf no URL is provided, the latest mainnet archive snapshot\nwill be proposed for download from ",
"\nIf no URL is provided, the latest archive snapshot for the selected chain\nwill be proposed for download from ",
);
help.push_str(
self.default_chain_aware_base_url.as_deref().unwrap_or(&self.default_base_url),
);
help.push_str(self.default_base_url.as_ref());
help.push_str(
".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
);
@@ -111,6 +122,12 @@ impl DownloadDefaults {
self
}
/// Set the default chain-aware base URL.
pub fn with_chain_aware_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
self.default_chain_aware_base_url = Some(url.into());
self
}
/// Builder: Set custom long help text, overriding the generated help
pub fn with_long_help(mut self, help: impl Into<String>) -> Self {
self.long_help = Some(help.into());
@@ -142,7 +159,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
let url = match self.url {
Some(url) => url,
None => {
let url = get_latest_snapshot_url().await?;
let url = get_latest_snapshot_url(self.env.chain.chain().id()).await?;
info!(target: "reth::cli", "Using default snapshot URL: {}", url);
url
}
@@ -509,8 +526,12 @@ async fn stream_and_extract(url: &str, target_dir: &Path) -> Result<()> {
}
// Builds default URL for latest mainnet archive snapshot using configured defaults
async fn get_latest_snapshot_url() -> Result<String> {
let base_url = &DownloadDefaults::get_global().default_base_url;
async fn get_latest_snapshot_url(chain_id: u64) -> Result<String> {
let defaults = DownloadDefaults::get_global();
let base_url = match &defaults.default_chain_aware_base_url {
Some(url) => format!("{url}/{chain_id}"),
None => defaults.default_base_url.to_string(),
};
let latest_url = format!("{base_url}/latest.txt");
let filename = Client::new()
.get(latest_url)

View File

@@ -10,8 +10,8 @@ use reth_node_builder::NodeBuilder;
use reth_node_core::{
args::{
DatabaseArgs, DatadirArgs, DebugArgs, DevArgs, EngineArgs, EraArgs, MetricArgs,
NetworkArgs, PayloadBuilderArgs, PruningArgs, RocksDbArgs, RpcServerArgs, StaticFilesArgs,
StorageArgs, TxPoolArgs,
NetworkArgs, PayloadBuilderArgs, PruningArgs, RpcServerArgs, StaticFilesArgs, StorageArgs,
TxPoolArgs,
},
node_config::NodeConfig,
version,
@@ -103,10 +103,6 @@ pub struct NodeCommand<C: ChainSpecParser, Ext: clap::Args + fmt::Debug = NoArgs
#[command(flatten)]
pub pruning: PruningArgs,
/// All `RocksDB` table routing arguments
#[command(flatten)]
pub rocksdb: RocksDbArgs,
/// Engine cli arguments
#[command(flatten, next_help_heading = "Engine")]
pub engine: EngineArgs,
@@ -119,8 +115,8 @@ pub struct NodeCommand<C: ChainSpecParser, Ext: clap::Args + fmt::Debug = NoArgs
#[command(flatten, next_help_heading = "Static Files")]
pub static_files: StaticFilesArgs,
/// Storage mode configuration (v2 vs v1/legacy)
#[command(flatten)]
/// All storage related arguments with --storage prefix
#[command(flatten, next_help_heading = "Storage")]
pub storage: StorageArgs,
/// Additional cli arguments
@@ -175,7 +171,6 @@ where
db,
dev,
pruning,
rocksdb,
engine,
era,
static_files,
@@ -183,9 +178,6 @@ where
ext,
} = self;
// Validate RocksDB arguments
rocksdb.validate()?;
// set up node config
let mut node_config = NodeConfig {
datadir,
@@ -201,7 +193,6 @@ where
db,
dev,
pruning,
rocksdb,
engine,
era,
static_files,

View File

@@ -76,7 +76,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
// Set up cancellation token for graceful shutdown on Ctrl+C
let cancellation = CancellationToken::new();
let cancellation_clone = cancellation.clone();
ctx.task_executor.spawn_critical("prune-ctrl-c", async move {
ctx.task_executor.spawn_critical_task("prune-ctrl-c", async move {
tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c");
cancellation_clone.cancel();
});

View File

@@ -9,7 +9,10 @@ use reth_db_api::{
transaction::{DbTx, DbTxMut},
};
use reth_db_common::{
init::{insert_genesis_header, insert_genesis_history, insert_genesis_state},
init::{
insert_genesis_account_history, insert_genesis_header, insert_genesis_state,
insert_genesis_storage_history,
},
DbTool,
};
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
@@ -42,12 +45,16 @@ impl<C: ChainSpecParser> Command<C> {
let tool = DbTool::new(provider_factory)?;
let static_file_segment = match self.stage {
StageEnum::Headers => Some(StaticFileSegment::Headers),
StageEnum::Bodies => Some(StaticFileSegment::Transactions),
StageEnum::Execution => Some(StaticFileSegment::Receipts),
StageEnum::Senders => Some(StaticFileSegment::TransactionSenders),
_ => None,
let static_file_segments = match self.stage {
StageEnum::Headers => vec![StaticFileSegment::Headers],
StageEnum::Bodies => vec![StaticFileSegment::Transactions],
StageEnum::Execution => vec![
StaticFileSegment::Receipts,
StaticFileSegment::AccountChangeSets,
StaticFileSegment::StorageChangeSets,
],
StageEnum::Senders => vec![StaticFileSegment::TransactionSenders],
_ => vec![],
};
// Calling `StaticFileProviderRW::prune_*` will instruct the writer to prune rows only
@@ -55,35 +62,33 @@ impl<C: ChainSpecParser> Command<C> {
// deleting the jar files, otherwise if the task were to be interrupted after we
// have deleted them, BUT before we have committed the checkpoints to the database, we'd
// lose essential data.
if let Some(static_file_segment) = static_file_segment {
let static_file_provider = tool.provider_factory.static_file_provider();
if let Some(highest_block) =
static_file_provider.get_highest_static_file_block(static_file_segment)
let static_file_provider = tool.provider_factory.static_file_provider();
for segment in static_file_segments {
if let Some(highest_block) = static_file_provider.get_highest_static_file_block(segment)
{
let mut writer = static_file_provider.latest_writer(static_file_segment)?;
let mut writer = static_file_provider.latest_writer(segment)?;
match static_file_segment {
match segment {
StaticFileSegment::Headers => {
// Prune all headers leaving genesis intact.
writer.prune_headers(highest_block)?;
}
StaticFileSegment::Transactions => {
let to_delete = static_file_provider
.get_highest_static_file_tx(static_file_segment)
.get_highest_static_file_tx(segment)
.map(|tx_num| tx_num + 1)
.unwrap_or_default();
writer.prune_transactions(to_delete, 0)?;
}
StaticFileSegment::Receipts => {
let to_delete = static_file_provider
.get_highest_static_file_tx(static_file_segment)
.get_highest_static_file_tx(segment)
.map(|tx_num| tx_num + 1)
.unwrap_or_default();
writer.prune_receipts(to_delete, 0)?;
}
StaticFileSegment::TransactionSenders => {
let to_delete = static_file_provider
.get_highest_static_file_tx(static_file_segment)
.get_highest_static_file_tx(segment)
.map(|tx_num| tx_num + 1)
.unwrap_or_default();
writer.prune_transaction_senders(to_delete, 0)?;
@@ -128,8 +133,15 @@ impl<C: ChainSpecParser> Command<C> {
reset_stage_checkpoint(tx, StageId::SenderRecovery)?;
}
StageEnum::Execution => {
tx.clear::<tables::PlainAccountState>()?;
tx.clear::<tables::PlainStorageState>()?;
if provider_rw.cached_storage_settings().use_hashed_state() {
tx.clear::<tables::HashedAccounts>()?;
tx.clear::<tables::HashedStorages>()?;
reset_stage_checkpoint(tx, StageId::AccountHashing)?;
reset_stage_checkpoint(tx, StageId::StorageHashing)?;
} else {
tx.clear::<tables::PlainAccountState>()?;
tx.clear::<tables::PlainStorageState>()?;
}
tx.clear::<tables::AccountChangeSets>()?;
tx.clear::<tables::StorageChangeSets>()?;
tx.clear::<tables::Bytecodes>()?;
@@ -171,29 +183,42 @@ impl<C: ChainSpecParser> Command<C> {
None,
)?;
}
StageEnum::AccountHistory | StageEnum::StorageHistory => {
StageEnum::AccountHistory => {
let settings = provider_rw.cached_storage_settings();
let rocksdb = tool.provider_factory.rocksdb_provider();
if settings.account_history_in_rocksdb {
if settings.storage_v2 {
rocksdb.clear::<tables::AccountsHistory>()?;
} else {
tx.clear::<tables::AccountsHistory>()?;
}
if settings.storages_history_in_rocksdb {
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
insert_genesis_account_history(
&provider_rw,
self.env.chain.genesis().alloc.iter(),
)?;
}
StageEnum::StorageHistory => {
let settings = provider_rw.cached_storage_settings();
let rocksdb = tool.provider_factory.rocksdb_provider();
if settings.storage_v2 {
rocksdb.clear::<tables::StoragesHistory>()?;
} else {
tx.clear::<tables::StoragesHistory>()?;
}
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
reset_stage_checkpoint(tx, StageId::IndexStorageHistory)?;
insert_genesis_history(&provider_rw, self.env.chain.genesis().alloc.iter())?;
insert_genesis_storage_history(
&provider_rw,
self.env.chain.genesis().alloc.iter(),
)?;
}
StageEnum::TxLookup => {
if provider_rw.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
if provider_rw.cached_storage_settings().storage_v2 {
tool.provider_factory
.rocksdb_provider()
.clear::<tables::TransactionHashNumbers>()?;

View File

@@ -37,12 +37,14 @@ where
unwind_and_copy(db_tool, from, tip_block_number, &output_db, evm_config.clone())?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
runtime,
)?,
to,
from,

View File

@@ -33,12 +33,14 @@ pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Databas
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
runtime,
)?,
to,
from,

View File

@@ -23,12 +23,14 @@ pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = Databas
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
runtime,
)?,
to,
from,

View File

@@ -57,12 +57,14 @@ where
unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db, evm_config, consensus)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
runtime,
)?,
to,
from,

View File

@@ -10,9 +10,10 @@
//! Entrypoint for running commands.
use reth_tasks::{TaskExecutor, TaskManager};
use reth_tasks::{PanickedTaskError, TaskExecutor};
use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
use tracing::{debug, error, trace};
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
/// Executes CLI commands.
///
@@ -20,21 +21,24 @@ use tracing::{debug, error, trace};
#[derive(Debug)]
pub struct CliRunner {
config: CliRunnerConfig,
tokio_runtime: tokio::runtime::Runtime,
runtime: reth_tasks::Runtime,
}
impl CliRunner {
/// Attempts to create a new [`CliRunner`] using the default tokio
/// [`Runtime`](tokio::runtime::Runtime).
/// Attempts to create a new [`CliRunner`] using the default
/// [`Runtime`](reth_tasks::Runtime).
///
/// The default tokio runtime is multi-threaded, with both I/O and time drivers enabled.
pub fn try_default_runtime() -> Result<Self, std::io::Error> {
Ok(Self { config: CliRunnerConfig::default(), tokio_runtime: tokio_runtime()? })
/// The default runtime is multi-threaded, with both I/O and time drivers enabled.
pub fn try_default_runtime() -> Result<Self, reth_tasks::RuntimeBuildError> {
Self::try_with_runtime_config(reth_tasks::RuntimeConfig::default())
}
/// Create a new [`CliRunner`] from a provided tokio [`Runtime`](tokio::runtime::Runtime).
pub const fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self {
Self { config: CliRunnerConfig::new(), tokio_runtime }
/// Creates a new [`CliRunner`] with the given [`RuntimeConfig`](reth_tasks::RuntimeConfig).
pub fn try_with_runtime_config(
config: reth_tasks::RuntimeConfig,
) -> Result<Self, reth_tasks::RuntimeBuildError> {
let runtime = reth_tasks::RuntimeBuilder::new(config).build()?;
Ok(Self { config: CliRunnerConfig::default(), runtime })
}
/// Sets the [`CliRunnerConfig`] for this runner.
@@ -48,7 +52,7 @@ impl CliRunner {
where
F: Future<Output = T>,
{
self.tokio_runtime.block_on(fut)
self.runtime.handle().block_on(fut)
}
/// Executes the given _async_ command on the tokio runtime until the command future resolves or
@@ -64,12 +68,11 @@ impl CliRunner {
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
{
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
AsyncCliRunner::new(self.tokio_runtime);
let (context, task_manager_handle) = cli_context(&self.runtime);
// Executes the command until it finished or ctrl-c was fired
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
&mut task_manager,
let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
task_manager_handle,
run_until_ctrl_c(command(context)),
));
@@ -77,13 +80,13 @@ impl CliRunner {
error!(target: "reth::cli", "shutting down due to error");
} else {
debug!(target: "reth::cli", "shutting down gracefully");
// after the command has finished or exit signal was received we shutdown the task
// manager which fires the shutdown signal to all tasks spawned via the task
// after the command has finished or exit signal was received we shutdown the
// runtime which fires the shutdown signal to all tasks spawned via the task
// executor and awaiting on tasks spawned with graceful shutdown
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
}
tokio_shutdown(tokio_runtime, true);
runtime_shutdown(self.runtime, true);
command_res
}
@@ -99,17 +102,16 @@ impl CliRunner {
F: Future<Output = Result<(), E>> + Send + 'static,
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
{
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
AsyncCliRunner::new(self.tokio_runtime);
let (context, task_manager_handle) = cli_context(&self.runtime);
// Spawn the command on the blocking thread pool
let handle = tokio_runtime.handle().clone();
let command_handle =
tokio_runtime.handle().spawn_blocking(move || handle.block_on(command(context)));
let handle = self.runtime.handle().clone();
let handle2 = handle.clone();
let command_handle = handle.spawn_blocking(move || handle2.block_on(command(context)));
// Wait for the command to complete or ctrl-c
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
&mut task_manager,
let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
task_manager_handle,
run_until_ctrl_c(
async move { command_handle.await.expect("Failed to join blocking task") },
),
@@ -119,10 +121,10 @@ impl CliRunner {
error!(target: "reth::cli", "shutting down due to error");
} else {
debug!(target: "reth::cli", "shutting down gracefully");
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
}
tokio_shutdown(tokio_runtime, true);
runtime_shutdown(self.runtime, true);
command_res
}
@@ -133,48 +135,40 @@ impl CliRunner {
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<std::io::Error> + 'static,
{
self.tokio_runtime.block_on(run_until_ctrl_c(fut))?;
self.runtime.handle().block_on(run_until_ctrl_c(fut))?;
Ok(())
}
/// Executes a regular future as a spawned blocking task until completion or until external
/// signal received.
///
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking) .
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking).
pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
where
F: Future<Output = Result<(), E>> + Send + 'static,
E: Send + Sync + From<std::io::Error> + 'static,
{
let tokio_runtime = self.tokio_runtime;
let handle = tokio_runtime.handle().clone();
let fut = tokio_runtime.handle().spawn_blocking(move || handle.block_on(fut));
tokio_runtime
let handle = self.runtime.handle().clone();
let handle2 = handle.clone();
let fut = handle.spawn_blocking(move || handle2.block_on(fut));
self.runtime
.handle()
.block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
tokio_shutdown(tokio_runtime, false);
runtime_shutdown(self.runtime, false);
Ok(())
}
}
/// [`CliRunner`] configuration when executing commands asynchronously
struct AsyncCliRunner {
context: CliContext,
task_manager: TaskManager,
tokio_runtime: tokio::runtime::Runtime,
}
// === impl AsyncCliRunner ===
impl AsyncCliRunner {
/// Given a tokio [`Runtime`](tokio::runtime::Runtime), creates additional context required to
/// execute commands asynchronously.
fn new(tokio_runtime: tokio::runtime::Runtime) -> Self {
let task_manager = TaskManager::new(tokio_runtime.handle().clone());
let task_executor = task_manager.executor();
Self { context: CliContext { task_executor }, task_manager, tokio_runtime }
}
/// Extracts the task manager handle from the runtime and creates the [`CliContext`].
fn cli_context(
runtime: &reth_tasks::Runtime,
) -> (CliContext, JoinHandle<Result<(), PanickedTaskError>>) {
let handle =
runtime.take_task_manager_handle().expect("Runtime must contain a TaskManager handle");
let context = CliContext { task_executor: runtime.clone() };
(context, handle)
}
/// Additional context provided by the [`CliRunner`] when executing commands
@@ -216,37 +210,25 @@ impl CliRunnerConfig {
}
}
/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
/// enabled
pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
// Keep the threads alive for at least the block time (12 seconds) plus buffer.
// This prevents the costly process of spawning new threads on every
// new block, and instead reuses the existing threads.
.thread_keep_alive(Duration::from_secs(15))
.thread_name("tokio-rt")
.build()
}
/// Runs the given future to completion or until a critical task panicked.
///
/// Returns the error if a task panicked, or the given future returned an error.
async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
async fn run_to_completion_or_panic<F, E>(
task_manager_handle: JoinHandle<Result<(), PanickedTaskError>>,
fut: F,
) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
{
{
let fut = pin!(fut);
tokio::select! {
task_manager_result = tasks => {
if let Err(panicked_error) = task_manager_result {
return Err(panicked_error.into());
}
},
res = fut => res?,
}
let fut = pin!(fut);
tokio::select! {
task_manager_result = task_manager_handle => {
if let Ok(Err(panicked_error)) = task_manager_result {
return Err(panicked_error.into());
}
},
res = fut => res?,
}
Ok(())
}
@@ -271,10 +253,10 @@ where
tokio::select! {
_ = ctrl_c => {
trace!(target: "reth::cli", "Received ctrl-c");
info!(target: "reth::cli", "Received ctrl-c");
},
_ = sigterm => {
trace!(target: "reth::cli", "Received SIGTERM");
info!(target: "reth::cli", "Received SIGTERM");
},
res = fut => res?,
}
@@ -287,7 +269,7 @@ where
tokio::select! {
_ = ctrl_c => {
trace!(target: "reth::cli", "Received ctrl-c");
info!(target: "reth::cli", "Received ctrl-c");
},
res = fut => res?,
}
@@ -296,17 +278,17 @@ where
Ok(())
}
/// Shut down the given Tokio runtime, and wait for it if `wait` is set.
/// Default timeout for waiting on the tokio runtime to shut down.
const DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Shut down the given [`Runtime`](reth_tasks::Runtime), and wait for it if `wait` is set.
///
/// `drop(tokio_runtime)` would block the current thread until its pools
/// (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
/// it on a separate thread and wait for up to 5 seconds for this operation to
/// complete.
fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
// Shutdown the runtime on a separate thread
/// Dropping the runtime on the current thread could block due to tokio pool teardown.
/// Instead, we drop it on a separate thread and optionally wait for completion.
fn runtime_shutdown(rt: reth_tasks::Runtime, wait: bool) {
let (tx, rx) = mpsc::channel();
std::thread::Builder::new()
.name("tokio-shutdown".to_string())
.name("rt-shutdown".to_string())
.spawn(move || {
drop(rt);
let _ = tx.send(());
@@ -314,8 +296,8 @@ fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
.unwrap();
if wait {
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
let _ = rx.recv_timeout(DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT).inspect_err(|err| {
tracing::warn!(target: "reth::cli", %err, "runtime shutdown timed out");
});
}
}

View File

@@ -11,7 +11,6 @@ use reth_node_builder::{
PayloadTypes,
};
use reth_provider::providers::{BlockchainProvider, NodeTypesForProvider};
use reth_tasks::TaskManager;
use std::sync::Arc;
use wallet::Wallet;
@@ -50,7 +49,7 @@ pub async fn setup<N>(
chain_spec: Arc<N::ChainSpec>,
is_dev: bool,
attributes_generator: impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Send + Sync + Copy + 'static,
) -> eyre::Result<(Vec<NodeHelperType<N>>, TaskManager, Wallet)>
) -> eyre::Result<(Vec<NodeHelperType<N>>, Wallet)>
where
N: NodeBuilderHelper,
{
@@ -69,7 +68,6 @@ pub async fn setup_engine<N>(
attributes_generator: impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Send + Sync + Copy + 'static,
) -> eyre::Result<(
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
TaskManager,
Wallet,
)>
where
@@ -96,7 +94,6 @@ pub async fn setup_engine_with_connection<N>(
connect_nodes: bool,
) -> eyre::Result<(
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
TaskManager,
Wallet,
)>
where

View File

@@ -14,7 +14,7 @@ use reth_node_core::args::{DiscoveryArgs, NetworkArgs, RpcServerArgs};
use reth_primitives_traits::AlloyBlockHeader;
use reth_provider::providers::BlockchainProvider;
use reth_rpc_server_types::RpcModuleSelection;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use std::sync::Arc;
use tracing::{span, Instrument, Level};
@@ -110,11 +110,9 @@ where
self,
) -> eyre::Result<(
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
TaskManager,
Wallet,
)> {
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
let network_config = NetworkArgs {
discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
@@ -153,7 +151,7 @@ where
let span = span!(Level::INFO, "node", idx);
let node = N::default();
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(exec.clone())
.testing_node(runtime.clone())
.with_types_and_provider::<N, BlockchainProvider<_>>()
.with_components(node.components_builder())
.with_add_ons(node.add_ons())
@@ -197,7 +195,7 @@ where
}
}
Ok((nodes, tasks, Wallet::default().with_chain_id(self.chain_spec.chain().into())))
Ok((nodes, Wallet::default().with_chain_id(self.chain_spec.chain().into())))
}
}

View File

@@ -15,7 +15,7 @@ use reth_provider::{
};
use reth_rpc_server_types::RpcModuleSelection;
use reth_stages_types::StageId;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use std::{path::Path, sync::Arc};
use tempfile::TempDir;
use tracing::{debug, info, span, Level};
@@ -24,8 +24,6 @@ use tracing::{debug, info, span, Level};
pub struct ChainImportResult {
/// The nodes that were created
pub nodes: Vec<NodeHelperType<EthereumNode>>,
/// The task manager
pub task_manager: TaskManager,
/// The wallet for testing
pub wallet: Wallet,
/// Temporary directories that must be kept alive for the duration of the test
@@ -68,8 +66,7 @@ pub async fn setup_engine_with_chain_import(
+ Copy
+ 'static,
) -> eyre::Result<ChainImportResult> {
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
let network_config = NetworkArgs {
discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
@@ -129,6 +126,7 @@ pub async fn setup_engine_with_chain_import(
.with_default_tables()
.build()
.unwrap(),
reth_tasks::Runtime::test(),
)?;
// Initialize genesis if needed
@@ -221,7 +219,7 @@ pub async fn setup_engine_with_chain_import(
let node = EthereumNode::default();
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node_with_datadir(exec.clone(), datadir.clone())
.testing_node_with_datadir(runtime.clone(), datadir.clone())
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
.with_components(node.components_builder())
.with_add_ons(node.add_ons())
@@ -243,7 +241,6 @@ pub async fn setup_engine_with_chain_import(
Ok(ChainImportResult {
nodes,
task_manager: tasks,
wallet: crate::Wallet::default().with_chain_id(chain_spec.chain.id()),
_temp_dirs: temp_dirs,
})
@@ -333,6 +330,7 @@ mod tests {
.with_default_tables()
.build()
.unwrap(),
reth_tasks::Runtime::test(),
)
.expect("failed to create provider factory");
@@ -397,6 +395,7 @@ mod tests {
.with_default_tables()
.build()
.unwrap(),
reth_tasks::Runtime::test(),
)
.expect("failed to create provider factory");
@@ -497,6 +496,7 @@ mod tests {
.with_default_tables()
.build()
.unwrap(),
reth_tasks::Runtime::test(),
)
.expect("failed to create provider factory");

View File

@@ -1,6 +1,6 @@
//! Test setup utilities for configuring the initial state.
use crate::{setup_engine_with_connection, testsuite::Environment, NodeBuilderHelper};
use crate::{testsuite::Environment, E2ETestSetupBuilder, NodeBuilderHelper};
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::B256;
use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
@@ -38,6 +38,8 @@ pub struct Setup<I> {
shutdown_tx: Option<mpsc::Sender<()>>,
/// Is this setup in dev mode
pub is_dev: bool,
/// Whether to use v2 storage mode (hashed keys, static file changesets, rocksdb history)
pub storage_v2: bool,
/// Tracks instance generic.
_phantom: PhantomData<I>,
/// Holds the import result to keep nodes alive when using imported chain
@@ -58,6 +60,7 @@ impl<I> Default for Setup<I> {
tree_config: TreeConfig::default(),
shutdown_tx: None,
is_dev: true,
storage_v2: false,
_phantom: Default::default(),
import_result_holder: None,
import_rlp_path: None,
@@ -126,6 +129,12 @@ where
self
}
/// Enable v2 storage mode (hashed keys, static file changesets, rocksdb history)
pub const fn with_storage_v2(mut self) -> Self {
self.storage_v2 = true;
self
}
/// Apply setup using pre-imported chain data from RLP file
pub async fn apply_with_import<N>(
&mut self,
@@ -194,23 +203,32 @@ where
self.shutdown_tx = Some(shutdown_tx);
let is_dev = self.is_dev;
let storage_v2 = self.storage_v2;
let node_count = self.network.node_count;
let tree_config = self.tree_config.clone();
let attributes_generator = Self::create_static_attributes_generator::<N>();
let result = setup_engine_with_connection::<N>(
let mut builder = E2ETestSetupBuilder::<N, _>::new(
node_count,
Arc::<N::ChainSpec>::new((*chain_spec).clone().into()),
is_dev,
self.tree_config.clone(),
attributes_generator,
self.network.connect_nodes,
)
.await;
.with_tree_config_modifier(move |base| {
tree_config.clone().with_cross_block_cache_size(base.cross_block_cache_size())
})
.with_node_config_modifier(move |config| config.set_dev(is_dev))
.with_connect_nodes(self.network.connect_nodes);
if storage_v2 {
builder = builder.with_storage_v2();
}
let result = builder.build().await;
let mut node_clients = Vec::new();
match result {
Ok((nodes, executor, _wallet)) => {
Ok((nodes, _wallet)) => {
// create HTTP clients for each node's RPC and Engine API endpoints
for node in &nodes {
node_clients.push(node.to_node_client()?);
@@ -218,12 +236,11 @@ where
// spawn a separate task just to handle the shutdown
tokio::spawn(async move {
// keep nodes and executor in scope to ensure they're not dropped
// keep nodes in scope to ensure they're not dropped
let _nodes = nodes;
let _executor = executor;
// Wait for shutdown signal
let _ = shutdown_rx.recv().await;
// nodes and executor will be dropped here when the test completes
// nodes will be dropped here when the test completes
});
}
Err(e) => {

View File

@@ -370,15 +370,14 @@ async fn test_setup_builder_with_custom_tree_config() -> Result<()> {
.build(),
);
let (nodes, _tasks, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, |_| {
EthPayloadBuilderAttributes::default()
})
.with_tree_config_modifier(|config| {
config.with_persistence_threshold(0).with_memory_block_buffer_target(5)
})
.build()
.await?;
let (nodes, _wallet) = E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, |_| {
EthPayloadBuilderAttributes::default()
})
.with_tree_config_modifier(|config| {
config.with_persistence_threshold(0).with_memory_block_buffer_target(5)
})
.build()
.await?;
assert_eq!(nodes.len(), 1);

View File

@@ -10,7 +10,6 @@ use jsonrpsee::core::client::ClientT;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_db::tables;
use reth_e2e_test_utils::{transaction::TransactionTestContext, wallet, E2ETestSetupBuilder};
use reth_node_core::args::RocksDbArgs;
use reth_node_ethereum::EthereumNode;
use reth_payload_builder::EthPayloadBuilderAttributes;
use reth_provider::RocksDBProviderFactory;
@@ -96,22 +95,6 @@ fn test_attributes_generator(timestamp: u64) -> EthPayloadBuilderAttributes {
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
}
/// Verifies that `RocksDB` CLI defaults are `None` (deferred to storage mode).
#[test]
fn test_rocksdb_defaults_are_none() {
let args = RocksDbArgs::default();
assert!(args.tx_hash.is_none(), "tx_hash default should be None (deferred to --storage.v2)");
assert!(
args.storages_history.is_none(),
"storages_history default should be None (deferred to --storage.v2)"
);
assert!(
args.account_history.is_none(),
"account_history default should be None (deferred to --storage.v2)"
);
}
/// Smoke test: node boots with `RocksDB` routing enabled.
#[tokio::test]
async fn test_rocksdb_node_startup() -> Result<()> {
@@ -119,7 +102,7 @@ async fn test_rocksdb_node_startup() -> Result<()> {
let chain_spec = test_chain_spec();
let (nodes, _tasks, _wallet) =
let (nodes, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.with_storage_v2()
.build()
@@ -147,7 +130,7 @@ async fn test_rocksdb_block_mining() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _wallet) =
let (mut nodes, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.with_storage_v2()
.build()
@@ -201,7 +184,7 @@ async fn test_rocksdb_transaction_queries() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
@@ -268,7 +251,7 @@ async fn test_rocksdb_multi_tx_same_block() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
@@ -336,7 +319,7 @@ async fn test_rocksdb_txs_across_blocks() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
@@ -421,7 +404,7 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
@@ -477,7 +460,7 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
///
/// This test exercises `unwind_trie_state_from` which previously failed with
/// `UnsortedInput` errors because it read changesets directly from MDBX tables
/// instead of using storage-aware methods that check `storage_changesets_in_static_files`.
/// instead of using storage-aware methods that check `is_v2()`.
#[tokio::test]
async fn test_rocksdb_reorg_unwind() -> Result<()> {
reth_tracing::init_test_tracing();
@@ -485,7 +468,7 @@ async fn test_rocksdb_reorg_unwind() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,

View File

@@ -1,6 +1,7 @@
//! Engine tree configuration.
use alloy_eips::merge::EPOCH_SLOTS;
use core::time::Duration;
/// Triggers persistence when the number of canonical blocks in memory exceeds this threshold.
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
@@ -64,6 +65,9 @@ pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
/// Storage tries beyond this limit are cleared (but allocations preserved).
pub const DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES: usize = 100;
/// Default timeout for the state root task before spawning a sequential fallback.
pub const DEFAULT_STATE_ROOT_TASK_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
@@ -175,6 +179,13 @@ pub struct TreeConfig {
sparse_trie_prune_depth: usize,
/// Maximum number of storage tries to retain after pruning.
sparse_trie_max_storage_tries: usize,
/// Whether to fully disable sparse trie cache pruning between blocks.
disable_sparse_trie_cache_pruning: bool,
/// Timeout for the state root task before spawning a sequential fallback computation.
/// If `Some`, after waiting this duration for the state root task, a sequential state root
/// computation is spawned in parallel and whichever finishes first is used.
/// If `None`, the timeout fallback is disabled.
state_root_task_timeout: Option<Duration>,
}
impl Default for TreeConfig {
@@ -207,6 +218,8 @@ impl Default for TreeConfig {
disable_trie_cache: false,
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
disable_sparse_trie_cache_pruning: false,
state_root_task_timeout: Some(DEFAULT_STATE_ROOT_TASK_TIMEOUT),
}
}
}
@@ -241,6 +254,7 @@ impl TreeConfig {
disable_cache_metrics: bool,
sparse_trie_prune_depth: usize,
sparse_trie_max_storage_tries: usize,
state_root_task_timeout: Option<Duration>,
) -> Self {
Self {
persistence_threshold,
@@ -270,6 +284,8 @@ impl TreeConfig {
disable_trie_cache: false,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
disable_sparse_trie_cache_pruning: false,
state_root_task_timeout,
}
}
@@ -618,4 +634,26 @@ impl TreeConfig {
self.sparse_trie_max_storage_tries = max_tries;
self
}
/// Returns whether sparse trie cache pruning is disabled.
pub const fn disable_sparse_trie_cache_pruning(&self) -> bool {
self.disable_sparse_trie_cache_pruning
}
/// Setter for whether to disable sparse trie cache pruning.
pub const fn with_disable_sparse_trie_cache_pruning(mut self, value: bool) -> Self {
self.disable_sparse_trie_cache_pruning = value;
self
}
/// Returns the state root task timeout.
pub const fn state_root_task_timeout(&self) -> Option<Duration> {
self.state_root_task_timeout
}
/// Setter for state root task timeout.
pub const fn with_state_root_task_timeout(mut self, timeout: Option<Duration>) -> Self {
self.state_root_task_timeout = timeout;
self
}
}

View File

@@ -23,7 +23,7 @@ use serde::{de::DeserializeOwned, Serialize};
// Re-export [`ExecutionPayload`] moved to `reth_payload_primitives`
#[cfg(feature = "std")]
pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
pub use reth_evm::{ConfigureEngineEvm, ConvertTx, ExecutableTxIterator, ExecutableTxTuple};
pub use reth_payload_primitives::ExecutionPayload;
mod error;

View File

@@ -20,7 +20,7 @@ use reth_node_types::{BlockTy, NodeTypes};
use reth_payload_builder::PayloadBuilderHandle;
use reth_provider::{
providers::{BlockchainProvider, ProviderNodeTypes},
ProviderFactory,
ProviderFactory, StorageSettingsCache,
};
use reth_prune::PrunerWithFactory;
use reth_stages_api::{MetricEventsSender, Pipeline};
@@ -94,6 +94,7 @@ where
if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
let downloader = BasicBlockDownloader::new(client, consensus.clone());
let use_hashed_state = provider.cached_storage_settings().use_hashed_state();
let persistence_handle =
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
@@ -111,6 +112,7 @@ where
engine_kind,
evm_config,
changeset_cache,
use_hashed_state,
);
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
@@ -201,6 +203,7 @@ mod tests {
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache.clone(),
reth_tasks::Runtime::test(),
);
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();

View File

@@ -32,7 +32,6 @@ reth-stages-api.workspace = true
reth-tasks.workspace = true
reth-trie-parallel.workspace = true
reth-trie-sparse = { workspace = true, features = ["std", "metrics"] }
reth-trie-sparse-parallel = { workspace = true, features = ["std"] }
reth-trie.workspace = true
reth-trie-common.workspace = true
reth-trie-db.workspace = true
@@ -142,7 +141,15 @@ test-utils = [
"reth-ethereum-primitives/test-utils",
"reth-node-ethereum/test-utils",
"reth-evm-ethereum/test-utils",
"reth-tasks/test-utils",
]
rocksdb = [
"reth-provider/rocksdb",
"reth-prune/rocksdb",
"reth-stages?/rocksdb",
"reth-e2e-test-utils/rocksdb",
]
edge = ["rocksdb"]
[[test]]
name = "e2e_testsuite"

View File

@@ -12,8 +12,7 @@ use rand::Rng;
use reth_chainspec::ChainSpec;
use reth_db_common::init::init_genesis;
use reth_engine_tree::tree::{
executor::WorkloadExecutor, precompile_cache::PrecompileCacheMap, PayloadProcessor,
StateProviderBuilder, TreeConfig,
precompile_cache::PrecompileCacheMap, PayloadProcessor, StateProviderBuilder, TreeConfig,
};
use reth_ethereum_primitives::TransactionSigned;
use reth_evm::OnStateHook;
@@ -219,7 +218,7 @@ fn bench_state_root(c: &mut Criterion) {
setup_provider(&factory, &state_updates).expect("failed to setup provider");
let payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(factory.chain_spec()),
&TreeConfig::default(),
PrecompileCacheMap::default(),

View File

@@ -138,7 +138,7 @@ impl<N: ProviderNodeTypes> PipelineSync<N> {
let (tx, rx) = oneshot::channel();
let pipeline = pipeline.take().expect("exists");
self.pipeline_task_spawner.spawn_critical_blocking(
self.pipeline_task_spawner.spawn_critical_blocking_task(
"pipeline task",
Box::pin(async move {
let result = pipeline.run_as_fut(Some(target)).await;

View File

@@ -76,8 +76,16 @@ impl CacheConfig for EpochCacheConfig {
type FixedCache<K, V, H = DefaultHashBuilder> = fixed_cache::Cache<K, V, H, EpochCacheConfig>;
/// A wrapper of a state provider and a shared cache.
///
/// The const generic `PREWARM` controls whether every cache miss is populated. This is only
/// relevant for pre-warm transaction execution with the intention to pre-populate the cache with
/// data for regular block execution. During regular block execution the cache doesn't need to be
/// populated because the actual EVM database [`State`](revm::database::State) also caches
/// internally during block execution and the cache is then updated after the block with the entire
/// [`BundleState`] output of that block which contains all accessed accounts, code, storage. See
/// also [`ExecutionCache::insert_state`].
#[derive(Debug)]
pub struct CachedStateProvider<S> {
pub struct CachedStateProvider<S, const PREWARM: bool = false> {
/// The state provider
state_provider: S,
@@ -86,15 +94,9 @@ pub struct CachedStateProvider<S> {
/// Metrics for the cached state provider
metrics: CachedStateMetrics,
/// If prewarm enabled we populate every cache miss
prewarm: bool,
}
impl<S> CachedStateProvider<S>
where
S: StateProvider,
{
impl<S> CachedStateProvider<S> {
/// Creates a new [`CachedStateProvider`] from an [`ExecutionCache`], state provider, and
/// [`CachedStateMetrics`].
pub const fn new(
@@ -102,27 +104,18 @@ where
caches: ExecutionCache,
metrics: CachedStateMetrics,
) -> Self {
Self { state_provider, caches, metrics, prewarm: false }
Self { state_provider, caches, metrics }
}
}
impl<S> CachedStateProvider<S> {
/// Enables pre-warm mode so that every cache miss is populated.
///
/// This is only relevant for pre-warm transaction execution with the intention to pre-populate
/// the cache with data for regular block execution. During regular block execution the
/// cache doesn't need to be populated because the actual EVM database
/// [`State`](revm::database::State) also caches internally during block execution and the cache
/// is then updated after the block with the entire [`BundleState`] output of that block which
/// contains all accessed accounts,code,storage. See also [`ExecutionCache::insert_state`].
pub const fn prewarm(mut self) -> Self {
self.prewarm = true;
self
}
/// Returns whether this provider should pre-warm cache misses.
const fn is_prewarm(&self) -> bool {
self.prewarm
impl<S> CachedStateProvider<S, true> {
/// Creates a new [`CachedStateProvider`] with prewarming enabled.
pub const fn new_prewarm(
state_provider: S,
caches: ExecutionCache,
metrics: CachedStateMetrics,
) -> Self {
Self { state_provider, caches, metrics }
}
}
@@ -307,9 +300,9 @@ impl<K: PartialEq, V> StatsHandler<K, V> for CacheStatsHandler {
}
}
impl<S: AccountReader> AccountReader for CachedStateProvider<S> {
impl<S: AccountReader, const PREWARM: bool> AccountReader for CachedStateProvider<S, PREWARM> {
fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
if self.is_prewarm() {
if PREWARM {
match self.caches.get_or_try_insert_account_with(*address, || {
self.state_provider.basic_account(address)
})? {
@@ -334,13 +327,13 @@ pub enum CachedStatus<T> {
Cached(T),
}
impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
impl<S: StateProvider, const PREWARM: bool> StateProvider for CachedStateProvider<S, PREWARM> {
fn storage(
&self,
account: Address,
storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
if self.is_prewarm() {
if PREWARM {
match self.caches.get_or_try_insert_storage_with(account, storage_key, || {
self.state_provider.storage(account, storage_key).map(Option::unwrap_or_default)
})? {
@@ -358,11 +351,19 @@ impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
self.state_provider.storage(account, storage_key)
}
}
fn storage_by_hashed_key(
&self,
address: Address,
hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
self.state_provider.storage_by_hashed_key(address, hashed_storage_key)
}
}
impl<S: BytecodeReader> BytecodeReader for CachedStateProvider<S> {
impl<S: BytecodeReader, const PREWARM: bool> BytecodeReader for CachedStateProvider<S, PREWARM> {
fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
if self.is_prewarm() {
if PREWARM {
match self.caches.get_or_try_insert_code_with(*code_hash, || {
self.state_provider.bytecode_by_hash(code_hash)
})? {
@@ -378,7 +379,9 @@ impl<S: BytecodeReader> BytecodeReader for CachedStateProvider<S> {
}
}
impl<S: StateRootProvider> StateRootProvider for CachedStateProvider<S> {
impl<S: StateRootProvider, const PREWARM: bool> StateRootProvider
for CachedStateProvider<S, PREWARM>
{
fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
self.state_provider.state_root(hashed_state)
}
@@ -402,7 +405,9 @@ impl<S: StateRootProvider> StateRootProvider for CachedStateProvider<S> {
}
}
impl<S: StateProofProvider> StateProofProvider for CachedStateProvider<S> {
impl<S: StateProofProvider, const PREWARM: bool> StateProofProvider
for CachedStateProvider<S, PREWARM>
{
fn proof(
&self,
input: TrieInput,
@@ -429,7 +434,9 @@ impl<S: StateProofProvider> StateProofProvider for CachedStateProvider<S> {
}
}
impl<S: StorageRootProvider> StorageRootProvider for CachedStateProvider<S> {
impl<S: StorageRootProvider, const PREWARM: bool> StorageRootProvider
for CachedStateProvider<S, PREWARM>
{
fn storage_root(
&self,
address: Address,
@@ -457,7 +464,7 @@ impl<S: StorageRootProvider> StorageRootProvider for CachedStateProvider<S> {
}
}
impl<S: BlockHashReader> BlockHashReader for CachedStateProvider<S> {
impl<S: BlockHashReader, const PREWARM: bool> BlockHashReader for CachedStateProvider<S, PREWARM> {
fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
self.state_provider.block_hash(number)
}
@@ -471,7 +478,9 @@ impl<S: BlockHashReader> BlockHashReader for CachedStateProvider<S> {
}
}
impl<S: HashedPostStateProvider> HashedPostStateProvider for CachedStateProvider<S> {
impl<S: HashedPostStateProvider, const PREWARM: bool> HashedPostStateProvider
for CachedStateProvider<S, PREWARM>
{
fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
self.state_provider.hashed_post_state(bundle_state)
}
@@ -836,8 +845,10 @@ impl SavedCache {
self.caches.update_metrics(&self.metrics);
}
/// Clears all caches, resetting them to empty state.
pub(crate) fn clear(&self) {
/// Clears all caches, resetting them to empty state,
/// and updates the hash of the block this cache belongs to.
pub(crate) fn clear_with_hash(&mut self, hash: B256) {
self.hash = hash;
self.caches.clear();
}
}

View File

@@ -199,6 +199,17 @@ impl<S: StateProvider> StateProvider for InstrumentedStateProvider<S> {
self.record_storage_fetch(start.elapsed());
res
}
fn storage_by_hashed_key(
&self,
address: Address,
hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
let start = Instant::now();
let res = self.state_provider.storage_by_hashed_key(address, hashed_storage_key);
self.record_storage_fetch(start.elapsed());
res
}
}
impl<S: BytecodeReader> BytecodeReader for InstrumentedStateProvider<S> {

View File

@@ -8,9 +8,18 @@ use reth_metrics::{
metrics::{Counter, Gauge, Histogram},
Metrics,
};
use reth_primitives_traits::constants::gas_units::MEGAGAS;
use reth_trie::updates::TrieUpdates;
use std::time::{Duration, Instant};
/// Upper bounds for each gas bucket. The last bucket is a catch-all for
/// everything above the final threshold: <5M, 5-10M, 10-20M, 20-30M, 30-40M, >40M.
const GAS_BUCKET_THRESHOLDS: [u64; 5] =
[5 * MEGAGAS, 10 * MEGAGAS, 20 * MEGAGAS, 30 * MEGAGAS, 40 * MEGAGAS];
/// Total number of gas buckets (thresholds + 1 catch-all).
const NUM_GAS_BUCKETS: usize = GAS_BUCKET_THRESHOLDS.len() + 1;
/// Metrics for the `EngineApi`.
#[derive(Debug, Default)]
pub struct EngineApiMetrics {
@@ -90,8 +99,9 @@ impl EngineApiMetrics {
pub struct TreeMetrics {
/// The highest block number in the canonical chain
pub canonical_chain_height: Gauge,
/// The number of reorgs
pub reorgs: Counter,
/// Metrics for reorgs.
#[metric(skip)]
pub reorgs: ReorgMetrics,
/// The latest reorg depth
pub latest_reorg_depth: Gauge,
/// The current safe block height (this is required by optimism)
@@ -100,6 +110,27 @@ pub struct TreeMetrics {
pub finalized_block_height: Gauge,
}
/// Metrics for reorgs.
#[derive(Debug)]
pub struct ReorgMetrics {
/// The number of head block reorgs
pub head: Counter,
/// The number of safe block reorgs
pub safe: Counter,
/// The number of finalized block reorgs
pub finalized: Counter,
}
impl Default for ReorgMetrics {
fn default() -> Self {
Self {
head: metrics::counter!("blockchain_tree_reorgs", "commitment" => "head"),
safe: metrics::counter!("blockchain_tree_reorgs", "commitment" => "safe"),
finalized: metrics::counter!("blockchain_tree_reorgs", "commitment" => "finalized"),
}
}
}
/// Metrics for the `EngineApi`.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
@@ -213,6 +244,65 @@ impl ForkchoiceUpdatedMetrics {
}
}
/// Per-gas-bucket newPayload metrics, initialized once via [`Self::new_with_labels`].
#[derive(Clone, Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct NewPayloadGasBucketMetrics {
/// Latency for new payload calls in this gas bucket.
pub(crate) new_payload_gas_bucket_latency: Histogram,
/// Gas per second for new payload calls in this gas bucket.
pub(crate) new_payload_gas_bucket_gas_per_second: Histogram,
}
/// Holds pre-initialized [`NewPayloadGasBucketMetrics`] instances, one per gas bucket.
#[derive(Debug)]
pub(crate) struct GasBucketMetrics {
buckets: [NewPayloadGasBucketMetrics; NUM_GAS_BUCKETS],
}
impl Default for GasBucketMetrics {
fn default() -> Self {
Self {
buckets: std::array::from_fn(|i| {
let label = Self::bucket_label(i);
NewPayloadGasBucketMetrics::new_with_labels(&[("gas_bucket", label)])
}),
}
}
}
impl GasBucketMetrics {
fn record(&self, gas_used: u64, elapsed: Duration) {
let idx = Self::bucket_index(gas_used);
self.buckets[idx].new_payload_gas_bucket_latency.record(elapsed);
self.buckets[idx]
.new_payload_gas_bucket_gas_per_second
.record(gas_used as f64 / elapsed.as_secs_f64());
}
fn bucket_index(gas_used: u64) -> usize {
GAS_BUCKET_THRESHOLDS
.iter()
.position(|&threshold| gas_used < threshold)
.unwrap_or(GAS_BUCKET_THRESHOLDS.len())
}
/// Returns a human-readable label like `<5M`, `5-10M`, … `>40M`.
fn bucket_label(index: usize) -> String {
if index == 0 {
let hi = GAS_BUCKET_THRESHOLDS[0] / MEGAGAS;
format!("<{hi}M")
} else if index < GAS_BUCKET_THRESHOLDS.len() {
let lo = GAS_BUCKET_THRESHOLDS[index - 1] / MEGAGAS;
let hi = GAS_BUCKET_THRESHOLDS[index] / MEGAGAS;
format!("{lo}-{hi}M")
} else {
let lo = GAS_BUCKET_THRESHOLDS[GAS_BUCKET_THRESHOLDS.len() - 1] / MEGAGAS;
format!(">{lo}M")
}
}
}
/// Metrics for engine newPayload responses.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
@@ -223,6 +313,9 @@ pub(crate) struct NewPayloadStatusMetrics {
/// Start time of the latest new payload call.
#[metric(skip)]
pub(crate) latest_start_at: Option<Instant>,
/// Gas-bucket-labeled latency and gas/s histograms.
#[metric(skip)]
pub(crate) gas_bucket: GasBucketMetrics,
/// The total count of new payload messages received.
pub(crate) new_payload_messages: Counter,
/// The total count of new payload messages that we responded to with
@@ -299,6 +392,7 @@ impl NewPayloadStatusMetrics {
self.new_payload_messages.increment(1);
self.new_payload_latency.record(elapsed);
self.new_payload_last.set(elapsed);
self.gas_bucket.record(gas_used, elapsed);
if let Some(latest_forkchoice_updated_at) = latest_forkchoice_updated_at.take() {
self.forkchoice_updated_new_payload_time_diff
.record(start - latest_forkchoice_updated_at);
@@ -343,6 +437,8 @@ pub struct BlockValidationMetrics {
pub state_root_parallel_fallback_total: Counter,
/// Total number of times the state root task failed but the fallback succeeded.
pub state_root_task_fallback_success_total: Counter,
/// Total number of times the state root task timed out and a sequential fallback was spawned.
pub state_root_task_timeout_total: Counter,
/// Latest state root duration, ie the time spent blocked waiting for the state root.
pub state_root_duration: Gauge,
/// Histogram for state root duration ie the time spent blocked waiting for the state root

View File

@@ -32,12 +32,13 @@ use reth_provider::{
BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
TransactionVariant,
StorageSettingsCache, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
use reth_tasks::spawn_os_thread;
use reth_trie_db::ChangesetCache;
use revm::interpreter::debug_unreachable;
use state::TreeState;
use std::{fmt::Debug, ops, sync::Arc, time::Instant};
@@ -270,6 +271,9 @@ where
evm_config: C,
/// Changeset cache for in-memory trie changesets
changeset_cache: ChangesetCache,
/// Whether the node uses hashed state as canonical storage (v2 mode).
/// Cached at construction to avoid threading `StorageSettingsCache` bounds everywhere.
use_hashed_state: bool,
}
impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
@@ -295,6 +299,7 @@ where
.field("engine_kind", &self.engine_kind)
.field("evm_config", &self.evm_config)
.field("changeset_cache", &self.changeset_cache)
.field("use_hashed_state", &self.use_hashed_state)
.finish()
}
}
@@ -312,7 +317,8 @@ where
P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
+ StageCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader,
+ StorageChangeSetReader
+ StorageSettingsCache,
C: ConfigureEvm<Primitives = N> + 'static,
T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
V: EngineValidator<T>,
@@ -333,6 +339,7 @@ where
engine_kind: EngineApiKind,
evm_config: C,
changeset_cache: ChangesetCache,
use_hashed_state: bool,
) -> Self {
let (incoming_tx, incoming) = crossbeam_channel::unbounded();
@@ -354,6 +361,7 @@ where
engine_kind,
evm_config,
changeset_cache,
use_hashed_state,
}
}
@@ -374,6 +382,7 @@ where
kind: EngineApiKind,
evm_config: C,
changeset_cache: ChangesetCache,
use_hashed_state: bool,
) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
{
let best_block_number = provider.best_block_number().unwrap_or(0);
@@ -406,6 +415,7 @@ where
kind,
evm_config,
changeset_cache,
use_hashed_state,
);
let incoming = task.incoming_tx.clone();
spawn_os_thread("engine", || task.run());
@@ -1401,7 +1411,7 @@ where
// Spawn a background task to trigger computation so it's ready when the next payload
// arrives.
if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
rayon::spawn(move || {
tokio::task::spawn_blocking(move || {
let _ = overlay.get();
});
}
@@ -1509,7 +1519,7 @@ where
.engine
.failed_forkchoice_updated_response_deliveries
.increment(1);
error!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
}
}
BeaconEngineMessage::NewPayload { payload, tx } => {
@@ -1533,7 +1543,7 @@ where
BeaconOnNewPayloadError::Internal(Box::new(e))
}))
{
error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
self.metrics
.engine
.failed_new_payload_response_deliveries
@@ -2376,9 +2386,14 @@ where
let old_first = old.first().map(|first| first.recovered_block().num_hash());
trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
self.update_reorg_metrics(old.len());
self.update_reorg_metrics(old.len(), old_first);
self.reinsert_reorged_blocks(new.clone());
self.reinsert_reorged_blocks(old.clone());
// When use_hashed_state is enabled, skip reinserting the old chain — the
// bundle state references plain state reverts which don't exist.
if !self.use_hashed_state {
self.reinsert_reorged_blocks(old.clone());
}
}
// update the tracked in-memory state with the new chain
@@ -2398,9 +2413,23 @@ where
));
}
/// This updates metrics based on the given reorg length.
fn update_reorg_metrics(&self, old_chain_length: usize) {
self.metrics.tree.reorgs.increment(1);
/// This updates metrics based on the given reorg length and first reorged block number.
fn update_reorg_metrics(&self, old_chain_length: usize, first_reorged_block: Option<NumHash>) {
if let Some(first_reorged_block) = first_reorged_block.map(|block| block.number) {
if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() &&
first_reorged_block <= finalized.number
{
self.metrics.tree.reorgs.finalized.increment(1);
} else if let Some(safe) = self.canonical_in_memory_state.get_safe_num_hash() &&
first_reorged_block <= safe.number
{
self.metrics.tree.reorgs.safe.increment(1);
} else {
self.metrics.tree.reorgs.head.increment(1);
}
} else {
debug_unreachable!("Reorged chain doesn't have any blocks");
}
self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
}

View File

@@ -1,47 +0,0 @@
//! Executor for mixed I/O and CPU workloads.
use reth_trie_parallel::root::get_tokio_runtime_handle;
use tokio::{runtime::Handle, task::JoinHandle};
/// An executor for mixed I/O and CPU workloads.
///
/// This type uses tokio to spawn blocking tasks and will reuse an existing tokio
/// runtime if available or create its own.
#[derive(Debug, Clone)]
pub struct WorkloadExecutor {
inner: WorkloadExecutorInner,
}
impl Default for WorkloadExecutor {
fn default() -> Self {
Self { inner: WorkloadExecutorInner::new() }
}
}
impl WorkloadExecutor {
/// Returns the handle to the tokio runtime
pub(super) const fn handle(&self) -> &Handle {
&self.inner.handle
}
/// Runs the provided function on an executor dedicated to blocking operations.
#[track_caller]
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.inner.handle.spawn_blocking(func)
}
}
#[derive(Debug, Clone)]
struct WorkloadExecutorInner {
handle: Handle,
}
impl WorkloadExecutorInner {
fn new() -> Self {
Self { handle: get_tokio_runtime_handle() }
}
}

View File

@@ -11,11 +11,10 @@ use crate::tree::{
StateProviderBuilder, TreeConfig,
};
use alloy_eip7928::BlockAccessList;
use alloy_eips::eip1898::BlockWithParent;
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
use alloy_evm::block::StateChangeSource;
use alloy_primitives::B256;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use executor::WorkloadExecutor;
use metrics::{Counter, Histogram};
use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
@@ -24,8 +23,8 @@ use rayon::prelude::*;
use reth_evm::{
block::ExecutableTxParts,
execute::{ExecutableTxFor, WithTxEnv},
ConfigureEvm, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook, SpecFor,
TxEnvFor,
ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
SpecFor, TxEnvFor,
};
use reth_metrics::Metrics;
use reth_primitives_traits::NodePrimitives;
@@ -34,13 +33,15 @@ use reth_provider::{
StateProviderFactory, StateReader,
};
use reth_revm::{db::BundleState, state::EvmState};
use reth_tasks::Runtime;
use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofWorkerHandle},
root::ParallelStateRootError,
};
use reth_trie_sparse::{RevealableSparseTrie, SparseStateTrie};
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
use reth_trie_sparse::{
ParallelSparseTrie, ParallelismThresholds, RevealableSparseTrie, SparseStateTrie,
};
use std::{
collections::BTreeMap,
ops::Not,
@@ -49,12 +50,11 @@ use std::{
mpsc::{self, channel},
Arc,
},
time::Instant,
time::{Duration, Instant},
};
use tracing::{debug, debug_span, instrument, warn, Span};
pub mod bal;
pub mod executor;
pub mod multiproof;
mod preserved_sparse_trie;
pub mod prewarm;
@@ -94,6 +94,9 @@ pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;
/// 144MB.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;
/// Blocks with fewer transactions than this skip prewarming, since the fixed overhead of spawning
/// prewarm workers exceeds the execution time saved.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>,
@@ -108,7 +111,7 @@ where
Evm: ConfigureEvm,
{
/// The executor used by to spawn tasks.
executor: WorkloadExecutor,
executor: Runtime,
/// The most recent cache used for execution.
execution_cache: PayloadExecutionCache,
/// Metrics for trie operations
@@ -135,6 +138,8 @@ where
sparse_trie_prune_depth: usize,
/// Maximum storage tries to retain after pruning.
sparse_trie_max_storage_tries: usize,
/// Whether sparse trie cache pruning is fully disabled.
disable_sparse_trie_cache_pruning: bool,
/// Whether to disable cache metrics recording.
disable_cache_metrics: bool,
}
@@ -145,13 +150,13 @@ where
Evm: ConfigureEvm<Primitives = N>,
{
/// Returns a reference to the workload executor driving payload tasks.
pub const fn executor(&self) -> &WorkloadExecutor {
pub const fn executor(&self) -> &Runtime {
&self.executor
}
/// Creates a new payload processor.
pub fn new(
executor: WorkloadExecutor,
executor: Runtime,
evm_config: Evm,
config: &TreeConfig,
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
@@ -170,6 +175,7 @@ where
prewarm_max_concurrency: config.prewarm_max_concurrency(),
sparse_trie_prune_depth: config.sparse_trie_prune_depth(),
sparse_trie_max_storage_tries: config.sparse_trie_max_storage_tries(),
disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
disable_cache_metrics: config.disable_cache_metrics(),
}
}
@@ -235,7 +241,8 @@ where
+ 'static,
{
// start preparing transactions immediately
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
let (prewarm_rx, execution_rx) =
self.spawn_tx_iterator(transactions, env.transaction_count);
let span = Span::current();
let (to_sparse_trie, sparse_trie_rx) = channel();
@@ -252,15 +259,13 @@ where
// When BAL is present, use BAL prewarming and send BAL to multiproof
debug!(target: "engine::tree::payload_processor", "BAL present, using BAL prewarming");
// Send BAL message immediately to MultiProofTask
let _ = to_multi_proof.send(MultiProofMessage::BlockAccessList(Arc::clone(&bal)));
// Spawn with BAL prewarming
// The prewarm task converts the BAL to HashedPostState and sends it on
// to_multi_proof after slot prefetching completes.
self.spawn_caching_with(
env,
prewarm_rx,
provider_builder.clone(),
None, // Don't send proof targets when BAL is present
Some(to_multi_proof.clone()),
Some(bal),
v2_proofs_enabled,
)
@@ -278,15 +283,7 @@ where
// Create and spawn the storage proof task
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let storage_worker_count = config.storage_worker_count();
let account_worker_count = config.account_worker_count();
let proof_handle = ProofWorkerHandle::new(
self.executor.handle().clone(),
task_ctx,
storage_worker_count,
account_worker_count,
v2_proofs_enabled,
);
let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, v2_proofs_enabled);
if config.disable_trie_cache() {
let multi_proof_task = MultiProofTask::new(
@@ -352,7 +349,8 @@ where
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
let (prewarm_rx, execution_rx) =
self.spawn_tx_iterator(transactions, env.transaction_count);
// This path doesn't use multiproof, so V2 proofs flag doesn't matter
let prewarm_handle =
self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
@@ -365,11 +363,23 @@ where
}
}
/// Transaction count threshold below which sequential signature recovery is used.
///
/// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
/// (work-stealing setup, channel-based reorder) exceeds the cost of sequential ECDSA
/// recovery. Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach`
/// for small blocks.
const SMALL_BLOCK_TX_THRESHOLD: usize = 30;
/// Spawns a task advancing transaction env iterator and streaming updates through a channel.
///
/// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
/// sequential iteration to avoid rayon overhead.
#[expect(clippy::type_complexity)]
fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
&self,
transactions: I,
transaction_count: usize,
) -> (
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
@@ -378,22 +388,51 @@ where
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
rayon::spawn(move || {
let (transactions, convert) = transactions.into();
transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
let tx = convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
// Only send Ok(_) variants to prewarming task.
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
if transaction_count == 0 {
// Empty block — nothing to do.
} else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
// Sequential path for small blocks — avoids rayon work-stealing setup and
// channel-based reorder overhead when it costs more than the ECDSA recovery itself.
debug!(
target: "engine::tree::payload_processor",
transaction_count,
"using sequential sig recovery for small block"
);
self.executor.spawn_blocking(move || {
let (transactions, convert) = transactions.into_parts();
for (idx, tx) in transactions.into_iter().enumerate() {
let tx = convert.convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = ooo_tx.send((idx, tx));
}
let _ = ooo_tx.send((idx, tx));
});
});
} else {
// Parallel path — spawn on rayon for parallel signature recovery.
rayon::spawn(move || {
let (transactions, convert) = transactions.into_parts();
transactions.into_par_iter().enumerate().for_each_with(
ooo_tx,
|ooo_tx, (idx, tx)| {
let tx = convert.convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
// Only send Ok(_) variants to prewarming task.
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = ooo_tx.send((idx, tx));
},
);
});
}
// Spawn a task that processes out-of-order transactions from the task above and sends them
// to the execution task in order.
@@ -405,8 +444,8 @@ where
let _ = execute_tx.send(tx);
next_for_execution += 1;
while let Some(entry) = queue.first_entry() &&
*entry.key() == next_for_execution
while let Some(entry) = queue.first_entry()
&& *entry.key() == next_for_execution
{
let _ = execute_tx.send(entry.remove());
next_for_execution += 1;
@@ -424,7 +463,7 @@ where
fn spawn_caching_with<P>(
&self,
env: ExecutionEnv<Evm>,
mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
provider_builder: StateProviderBuilder<N, P>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
bal: Option<Arc<BlockAccessList>>,
@@ -433,11 +472,8 @@ where
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
if self.disable_transaction_prewarming {
// if no transactions should be executed we clear them but still spawn the task for
// caching updates
transactions = mpsc::channel().1;
}
let skip_prewarm =
self.disable_transaction_prewarming || env.transaction_count < SMALL_BLOCK_TX_THRESHOLD;
let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
@@ -466,7 +502,9 @@ where
{
let to_prewarm_task = to_prewarm_task.clone();
self.executor.spawn_blocking(move || {
let mode = if let Some(bal) = bal {
let mode = if skip_prewarm {
PrewarmMode::Skipped
} else if let Some(bal) = bal {
PrewarmMode::BlockAccessList(bal)
} else {
PrewarmMode::Transactions(transactions)
@@ -515,6 +553,7 @@ where
let disable_trie_cache = config.disable_trie_cache();
let prune_depth = self.sparse_trie_prune_depth;
let max_storage_tries = self.sparse_trie_max_storage_tries;
let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
let chunk_size =
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size());
let executor = self.executor.clone();
@@ -611,6 +650,7 @@ where
max_storage_tries,
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
disable_cache_pruning,
);
trie_metrics
.into_trie_for_reuse_duration_histogram
@@ -724,6 +764,18 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
.map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
}
/// Takes the state root receiver out of the handle for use with custom waiting logic
/// (e.g., timeout-based waiting).
///
/// # Panics
///
/// If payload processing was started without background tasks.
pub const fn take_state_root_rx(
&mut self,
) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
self.state_root.take().expect("state_root is None")
}
/// Returns a state hook to be used to send state updates to this task.
///
/// If a multiproof task is spawned the hook will notify it about new states.
@@ -874,7 +926,7 @@ impl PayloadExecutionCache {
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
let start = Instant::now();
let cache = self.inner.read();
let mut cache = self.inner.write();
let elapsed = start.elapsed();
self.metrics.execution_cache_wait_duration.record(elapsed.as_secs_f64());
@@ -882,7 +934,7 @@ impl PayloadExecutionCache {
warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
}
if let Some(c) = cache.as_ref() {
if let Some(c) = cache.as_mut() {
let cached_hash = c.executed_block_hash();
// Check that the cache hash matches the parent hash of the current block. It won't
// match in case it's a fork block.
@@ -903,13 +955,13 @@ impl PayloadExecutionCache {
);
if available {
// If the has is available (no other threads are using it), but has a mismatching
// parent hash, we can just clear it and keep using without re-creating from
// scratch.
if !hash_matches {
c.clear();
// Fork block: clear and update the hash on the ORIGINAL before cloning.
// This prevents the canonical chain from matching on the stale hash
// and picking up polluted data if the fork block fails.
c.clear_with_hash(parent_hash);
}
return Some(c.clone())
return Some(c.clone());
} else if hash_matches {
self.metrics.execution_cache_in_use.increment(1);
}
@@ -920,10 +972,25 @@ impl PayloadExecutionCache {
None
}
/// Clears the tracked cache
#[expect(unused)]
pub(crate) fn clear(&self) {
self.inner.write().take();
/// Waits until the execution cache becomes available for use.
///
/// This acquires a write lock to ensure exclusive access, then immediately releases it.
/// This is useful for synchronization before starting payload processing.
///
/// Returns the time spent waiting for the lock.
pub fn wait_for_availability(&self) -> Duration {
let start = Instant::now();
// Acquire write lock to wait for any current holders to finish
let _guard = self.inner.write();
let elapsed = start.elapsed();
if elapsed.as_millis() > 5 {
debug!(
target: "engine::tree::payload_processor",
blocked_for=?elapsed,
"Waited for execution cache to become available"
);
}
elapsed
}
/// Updates the cache with a closure that has exclusive access to the guard.
@@ -976,6 +1043,9 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
/// Used to determine parallel worker count for prewarming.
/// A value of 0 indicates the count is unknown.
pub transaction_count: usize,
/// Withdrawals included in the block.
/// Used to generate prefetch targets for withdrawal addresses.
pub withdrawals: Option<Vec<Withdrawal>>,
}
impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
@@ -989,6 +1059,7 @@ where
parent_hash: Default::default(),
parent_state_root: Default::default(),
transaction_count: 0,
withdrawals: None,
}
}
}
@@ -998,9 +1069,7 @@ mod tests {
use super::PayloadExecutionCache;
use crate::tree::{
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
payload_processor::{
evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
},
payload_processor::{evm_state_to_hashed_post_state, PayloadProcessor},
precompile_cache::PrecompileCacheMap,
StateProviderBuilder, TreeConfig,
};
@@ -1074,10 +1143,18 @@ mod tests {
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
// When the parent hash doesn't match, the cache is cleared and returned for reuse
// When the parent hash doesn't match (fork block), the cache is cleared,
// hash updated on the original, and clone returned for reuse
let different_hash = B256::from([4u8; 32]);
let cache = execution_cache.get_cache_for(different_hash);
assert!(cache.is_some(), "cache should be returned for reuse after clearing")
assert!(cache.is_some(), "cache should be returned for reuse after clearing");
drop(cache);
// The stored cache now has the fork block's parent hash.
// Canonical chain looking for original hash sees a mismatch → clears and reuses.
let original = execution_cache.get_cache_for(hash);
assert!(original.is_some(), "canonical chain gets cache back via mismatch+clear");
}
#[test]
@@ -1102,7 +1179,7 @@ mod tests {
#[test]
fn on_inserted_executed_block_populates_cache() {
let payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(Arc::new(ChainSpec::default())),
&TreeConfig::default(),
PrecompileCacheMap::default(),
@@ -1131,7 +1208,7 @@ mod tests {
#[test]
fn on_inserted_executed_block_skips_on_parent_mismatch() {
let payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(Arc::new(ChainSpec::default())),
&TreeConfig::default(),
PrecompileCacheMap::default(),
@@ -1266,7 +1343,7 @@ mod tests {
}
let mut payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(factory.chain_spec()),
&TreeConfig::default(),
PrecompileCacheMap::default(),
@@ -1301,4 +1378,61 @@ mod tests {
"State root mismatch: task={root_from_task}, base={root_from_regular}"
);
}
/// Tests the full prewarm lifecycle for a fork block:
///
/// 1. Cache is at canonical block 4.
/// 2. Fork block (parent = block 2) checks out the cache via `get_cache_for`, simulating what
/// `PrewarmCacheTask` does when it receives a `SavedCache`.
/// 3. Prewarm populates the shared cache with fork-specific state.
/// 4. While the prewarm clone is alive, the cache is unavailable (`usage_guard` > 1).
/// 5. Prewarm drops without calling `save_cache` (fork block was invalid).
/// 6. Canonical block 5 (parent = block 4) must get a cache with correct hash and no stale fork
/// data.
#[test]
fn fork_prewarm_dropped_without_save_does_not_corrupt_cache() {
let execution_cache = PayloadExecutionCache::default();
// Canonical chain at block 4.
let block4_hash = B256::from([4u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(block4_hash)));
// Fork block arrives with parent = block 2. Prewarm task checks out the cache.
// This simulates PrewarmCacheTask receiving a SavedCache clone from get_cache_for.
let fork_parent = B256::from([2u8; 32]);
let prewarm_cache = execution_cache.get_cache_for(fork_parent);
assert!(prewarm_cache.is_some(), "prewarm should obtain cache for fork block");
let prewarm_cache = prewarm_cache.unwrap();
assert_eq!(prewarm_cache.executed_block_hash(), fork_parent);
// Prewarm populates cache with fork-specific state (ancestor data for block 2).
// Since ExecutionCache uses Arc<Inner>, this data is shared with the stored original.
let fork_addr = Address::from([0xBB; 20]);
let fork_key = B256::from([0xCC; 32]);
prewarm_cache.cache().insert_storage(fork_addr, fork_key, Some(U256::from(999)));
// While prewarm holds the clone, the usage_guard count > 1 → cache is in use.
let during_prewarm = execution_cache.get_cache_for(block4_hash);
assert!(
during_prewarm.is_none(),
"cache must be unavailable while prewarm holds a reference"
);
// Fork block fails — prewarm task drops without calling save_cache/update_with_guard.
drop(prewarm_cache);
// Canonical block 5 arrives (parent = block 4).
// Stored hash = fork_parent (our fix), so get_cache_for sees a mismatch,
// clears the stale fork data, and returns a cache with hash = block4_hash.
let block5_cache = execution_cache.get_cache_for(block4_hash);
assert!(
block5_cache.is_some(),
"canonical chain must get cache after fork prewarm is dropped"
);
assert_eq!(
block5_cache.as_ref().unwrap().executed_block_hash(),
block4_hash,
"cache must carry the canonical parent hash, not the fork parent"
);
}
}

View File

@@ -115,6 +115,8 @@ pub enum MultiProofMessage {
/// The state update that was used to calculate the proof
state: HashedPostState,
},
/// Pre-hashed state update from BAL conversion that can be applied directly without proofs.
HashedStateUpdate(HashedPostState),
/// Block Access List (EIP-7928; BAL) containing complete state changes for the block.
///
/// When received, the task generates a single state update from the BAL and processes it.
@@ -1189,6 +1191,11 @@ impl MultiProofTask {
}
false
}
MultiProofMessage::HashedStateUpdate(hashed_state) => {
batch_metrics.state_update_proofs_requested +=
self.on_hashed_state_update(Source::BlockAccessList, hashed_state);
false
}
}
}
@@ -1534,23 +1541,18 @@ mod tests {
providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
BlockNumReader, BlockReader, ChangeSetReader, DatabaseProviderFactory, LatestStateProvider,
PruneCheckpointReader, StageCheckpointReader, StateProviderBox, StorageChangeSetReader,
StorageSettingsCache,
};
use reth_trie::MultiProof;
use reth_trie_db::ChangesetCache;
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
use revm_primitives::{B256, U256};
use std::sync::{Arc, OnceLock};
use tokio::runtime::{Handle, Runtime};
/// Get a handle to the test runtime, creating it if necessary
fn get_test_runtime_handle() -> Handle {
static TEST_RT: OnceLock<Runtime> = OnceLock::new();
TEST_RT
.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()
})
.handle()
.clone()
/// Get a test runtime, creating it if necessary
fn get_test_runtime() -> &'static reth_tasks::Runtime {
static TEST_RT: OnceLock<reth_tasks::Runtime> = OnceLock::new();
TEST_RT.get_or_init(reth_tasks::Runtime::test)
}
fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
@@ -1561,16 +1563,17 @@ mod tests {
+ PruneCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ StorageSettingsCache
+ BlockNumReader,
> + Clone
+ Send
+ 'static,
{
let rt_handle = get_test_runtime_handle();
let runtime = get_test_runtime();
let changeset_cache = ChangesetCache::new();
let overlay_factory = OverlayStateProviderFactory::new(factory, changeset_cache);
let task_ctx = ProofTaskCtx::new(overlay_factory);
let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1, false);
let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, false);
let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
let (tx, rx) = crossbeam_channel::unbounded();
@@ -1580,7 +1583,10 @@ mod tests {
fn create_cached_provider<F>(factory: F) -> CachedStateProvider<StateProviderBox>
where
F: DatabaseProviderFactory<
Provider: BlockReader + StageCheckpointReader + PruneCheckpointReader,
Provider: BlockReader
+ StageCheckpointReader
+ PruneCheckpointReader
+ reth_provider::StorageSettingsCache,
> + Clone
+ Send
+ 'static,

View File

@@ -3,12 +3,11 @@
use alloy_primitives::B256;
use parking_lot::Mutex;
use reth_trie_sparse::SparseStateTrie;
use reth_trie_sparse_parallel::ParallelSparseTrie;
use std::sync::Arc;
use tracing::debug;
/// Type alias for the sparse trie type used in preservation.
pub(super) type SparseTrie = SparseStateTrie<ParallelSparseTrie, ParallelSparseTrie>;
pub(super) type SparseTrie = SparseStateTrie;
/// Shared handle to a preserved sparse trie that can be reused across payload validations.
///

View File

@@ -14,8 +14,7 @@
use crate::tree::{
cached_state::{CachedStateProvider, SavedCache},
payload_processor::{
bal::{total_slots, BALSlotIter},
executor::WorkloadExecutor,
bal::{self, total_slots, BALSlotIter},
multiproof::{MultiProofMessage, VersionedMultiProofTargets},
PayloadExecutionCache,
},
@@ -24,6 +23,7 @@ use crate::tree::{
};
use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_eips::eip4895::Withdrawal;
use alloy_evm::Database;
use alloy_primitives::{keccak256, map::B256Set, B256};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
@@ -36,6 +36,7 @@ use reth_provider::{
StateReader,
};
use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_tasks::Runtime;
use reth_trie::MultiProofTargets;
use std::{
ops::Range,
@@ -48,13 +49,16 @@ use std::{
};
use tracing::{debug, debug_span, instrument, trace, warn, Span};
/// Determines the prewarming mode: transaction-based or BAL-based.
/// Determines the prewarming mode: transaction-based, BAL-based, or skipped.
#[derive(Debug)]
pub enum PrewarmMode<Tx> {
/// Prewarm by executing transactions from a stream.
Transactions(Receiver<Tx>),
/// Prewarm by prefetching slots from a Block Access List.
BlockAccessList(Arc<BlockAccessList>),
/// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
/// benefit). No workers are spawned.
Skipped,
}
/// A wrapper for transactions that includes their index in the block.
@@ -77,7 +81,7 @@ where
Evm: ConfigureEvm<Primitives = N>,
{
/// The executor used to spawn execution tasks.
executor: WorkloadExecutor,
executor: Runtime,
/// Shared execution cache.
execution_cache: PayloadExecutionCache,
/// Context provided to execution tasks
@@ -100,7 +104,7 @@ where
{
/// Initializes the task with the given transactions pending execution
pub fn new(
executor: WorkloadExecutor,
executor: Runtime,
execution_cache: PayloadExecutionCache,
ctx: PrewarmContext<N, P, Evm>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
@@ -163,7 +167,7 @@ where
};
// Spawn workers
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, to_multi_proof, done_tx.clone());
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, to_multi_proof.clone(), done_tx.clone());
// Distribute transactions to workers
let mut tx_index = 0usize;
@@ -187,6 +191,16 @@ where
tx_index += 1;
}
// Send withdrawal prefetch targets after all transactions have been distributed
if let Some(to_multi_proof) = to_multi_proof
&& let Some(withdrawals) = &ctx.env.withdrawals
&& !withdrawals.is_empty()
{
let targets =
multiproof_targets_from_withdrawals(withdrawals, ctx.v2_proofs_enabled);
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
}
// drop sender and wait for all tasks to finish
drop(done_tx);
drop(tx_sender);
@@ -276,6 +290,7 @@ where
target: "engine::tree::payload_processor::prewarm",
"Skipping BAL prewarm - no cache available"
);
self.send_bal_hashed_state(&bal);
let _ =
actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
return;
@@ -291,7 +306,7 @@ where
);
if total_slots == 0 {
// No slots to prefetch, signal completion immediately
self.send_bal_hashed_state(&bal);
let _ =
actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
return;
@@ -336,10 +351,51 @@ where
"All BAL prewarm workers completed"
);
// Convert BAL to HashedPostState and send to multiproof task
self.send_bal_hashed_state(&bal);
// Signal that execution has finished
let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
}
/// Converts the BAL to [`HashedPostState`](reth_trie::HashedPostState) and sends it to the
/// multiproof task.
fn send_bal_hashed_state(&self, bal: &BlockAccessList) {
let Some(to_multi_proof) = &self.to_multi_proof else { return };
let provider = match self.ctx.provider.build() {
Ok(provider) => provider,
Err(err) => {
warn!(
target: "engine::tree::payload_processor::prewarm",
?err,
"Failed to build provider for BAL hashed state conversion"
);
return;
}
};
match bal::bal_to_hashed_post_state(bal, &provider) {
Ok(hashed_state) => {
debug!(
target: "engine::tree::payload_processor::prewarm",
accounts = hashed_state.accounts.len(),
storages = hashed_state.storages.len(),
"Converted BAL to hashed post state"
);
let _ = to_multi_proof.send(MultiProofMessage::HashedStateUpdate(hashed_state));
let _ = to_multi_proof.send(MultiProofMessage::FinishedStateUpdates);
}
Err(err) => {
warn!(
target: "engine::tree::payload_processor::prewarm",
?err,
"Failed to convert BAL to hashed state"
);
}
}
}
/// Executes the task.
///
/// This will execute the transactions until all transactions have been processed or the task
@@ -363,6 +419,10 @@ where
PrewarmMode::BlockAccessList(bal) => {
self.run_bal_prewarm(bal, actions_tx);
}
PrewarmMode::Skipped => {
let _ = actions_tx
.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
}
}
let mut final_execution_outcome = None;
@@ -475,11 +535,8 @@ where
if let Some(saved_cache) = saved_cache {
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
state_provider = Box::new(
CachedStateProvider::new(state_provider, caches, cache_metrics)
// ensure we pre-warm the cache
.prewarm(),
);
state_provider =
Box::new(CachedStateProvider::new_prewarm(state_provider, caches, cache_metrics));
}
let state_provider = StateProviderDatabase::new(state_provider);
@@ -537,18 +594,11 @@ where
return
};
while let Ok(IndexedTransaction { index, tx }) = {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
.entered();
txs.recv()
} {
let enter = debug_span!(
while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
let _enter = debug_span!(
target: "engine::tree::payload_processor::prewarm",
"prewarm tx",
index,
tx_hash = %tx.tx().tx_hash(),
is_success = tracing::field::Empty,
gas_used = tracing::field::Empty,
)
.entered();
@@ -579,12 +629,6 @@ where
};
metrics.execution_duration.record(start.elapsed());
// record some basic information about the transactions
enter.record("gas_used", res.result.gas_used());
enter.record("is_success", res.result.is_success());
drop(enter);
// If the task was cancelled, stop execution, and exit.
if terminate_execution.load(Ordering::Relaxed) {
break
@@ -593,16 +637,12 @@ where
// Only send outcome for transactions after the first txn
// as the main execution will be just as fast
if index > 0 {
let _enter =
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
.entered();
let (targets, storage_targets) =
multiproof_targets_from_state(res.state, v2_proofs_enabled);
metrics.prefetch_storage_targets.record(storage_targets as f64);
if let Some(to_multi_proof) = &to_multi_proof {
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
}
drop(_enter);
}
metrics.total_runtime.record(start.elapsed());
@@ -618,7 +658,7 @@ where
fn spawn_workers<Tx>(
self,
workers_needed: usize,
task_executor: &WorkloadExecutor,
task_executor: &Runtime,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
done_tx: Sender<()>,
) -> CrossbeamSender<IndexedTransaction<Tx>>
@@ -655,7 +695,7 @@ where
fn spawn_bal_worker(
&self,
idx: usize,
executor: &WorkloadExecutor,
executor: &Runtime,
bal: Arc<BlockAccessList>,
range: Range<usize>,
done_tx: Sender<()>,
@@ -831,6 +871,27 @@ fn multiproof_targets_v2_from_state(state: EvmState) -> (VersionedMultiProofTarg
(VersionedMultiProofTargets::V2(targets), storage_target_count)
}
/// Returns [`VersionedMultiProofTargets`] for withdrawal addresses.
///
/// Withdrawals only modify account balances (no storage), so the targets contain
/// only account-level entries with empty storage sets.
fn multiproof_targets_from_withdrawals(
withdrawals: &[Withdrawal],
v2_enabled: bool,
) -> VersionedMultiProofTargets {
use reth_trie_parallel::targets_v2::MultiProofTargetsV2;
if v2_enabled {
VersionedMultiProofTargets::V2(MultiProofTargetsV2 {
account_targets: withdrawals.iter().map(|w| keccak256(w.address).into()).collect(),
..Default::default()
})
} else {
VersionedMultiProofTargets::Legacy(
withdrawals.iter().map(|w| (keccak256(w.address), Default::default())).collect(),
)
}
}
/// The events the pre-warm task can handle.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the main

View File

@@ -1,6 +1,5 @@
//! Sparse Trie task related functionality.
use super::executor::WorkloadExecutor;
use crate::tree::{
multiproof::{
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
@@ -11,8 +10,9 @@ use crate::tree::{
use alloy_primitives::B256;
use alloy_rlp::{Decodable, Encodable};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
use rayon::iter::ParallelIterator;
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
use reth_tasks::Runtime;
use reth_trie::{
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
TrieAccount, EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
@@ -28,7 +28,7 @@ use reth_trie_parallel::{
use reth_trie_sparse::{
errors::{SparseStateTrieResult, SparseTrieErrorKind, SparseTrieResult},
provider::{TrieNodeProvider, TrieNodeProviderFactory},
DeferredDrops, LeafUpdate, SerialSparseTrie, SparseStateTrie, SparseTrie, SparseTrieExt,
DeferredDrops, LeafUpdate, ParallelSparseTrie, SparseStateTrie, SparseTrie,
};
use revm_primitives::{hash_map::Entry, B256Map};
use smallvec::SmallVec;
@@ -44,8 +44,8 @@ where
BPF: TrieNodeProviderFactory + Send + Sync,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
Cleared(SparseTrieTask<BPF, A, S>),
Cached(SparseTrieCacheTask<A, S>),
@@ -56,8 +56,8 @@ where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
match self {
@@ -72,6 +72,7 @@ where
max_storage_tries: usize,
max_nodes_capacity: usize,
max_values_capacity: usize,
disable_pruning: bool,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
match self {
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
@@ -80,6 +81,7 @@ where
max_storage_tries,
max_nodes_capacity,
max_values_capacity,
disable_pruning,
),
}
}
@@ -97,7 +99,7 @@ where
}
/// A task responsible for populating the sparse trie.
pub(super) struct SparseTrieTask<BPF, A = SerialSparseTrie, S = SerialSparseTrie>
pub(super) struct SparseTrieTask<BPF, A = ParallelSparseTrie, S = ParallelSparseTrie>
where
BPF: TrieNodeProviderFactory + Send + Sync,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
@@ -117,8 +119,8 @@ where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
/// Creates a new sparse trie task with the given trie.
pub(super) const fn new(
@@ -212,7 +214,7 @@ where
const MAX_PENDING_UPDATES: usize = 100;
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie> {
pub(super) struct SparseTrieCacheTask<A = ParallelSparseTrie, S = ParallelSparseTrie> {
/// Sender for proof results.
proof_result_tx: CrossbeamSender<ProofResultMessage>,
/// Receiver for proof results directly from workers.
@@ -277,12 +279,12 @@ pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie
impl<A, S> SparseTrieCacheTask<A, S>
where
A: SparseTrieExt + Default,
S: SparseTrieExt + Default + Clone,
A: SparseTrie + Default,
S: SparseTrie + Default + Clone,
{
/// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
pub(super) fn new_with_trie(
executor: &WorkloadExecutor,
executor: &Runtime,
updates: CrossbeamReceiver<MultiProofMessage>,
proof_worker_handle: ProofWorkerHandle,
metrics: MultiProofTaskMetrics,
@@ -343,6 +345,9 @@ where
MultiProofMessage::EmptyProof { .. } | MultiProofMessage::BlockAccessList(_) => {
continue
}
MultiProofMessage::HashedStateUpdate(state) => {
SparseTrieTaskMessage::HashedState(state)
}
};
if hashed_state_tx.send(msg).is_err() {
break;
@@ -353,16 +358,23 @@ where
/// Prunes and shrinks the trie for reuse in the next payload built on top of this one.
///
/// Should be called after the state root result has been sent.
///
/// When `disable_pruning` is true, the trie is preserved without any node pruning,
/// storage trie eviction, or capacity shrinking, keeping the full cache intact for
/// benchmarking purposes.
pub(super) fn into_trie_for_reuse(
self,
prune_depth: usize,
max_storage_tries: usize,
max_nodes_capacity: usize,
max_values_capacity: usize,
disable_pruning: bool,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
let Self { mut trie, .. } = self;
trie.prune(prune_depth, max_storage_tries);
trie.shrink_to(max_nodes_capacity, max_values_capacity);
if !disable_pruning {
trie.prune(prune_depth, max_storage_tries);
trie.shrink_to(max_nodes_capacity, max_values_capacity);
}
let deferred = trie.take_deferred_drops();
(trie, deferred)
}
@@ -404,7 +416,9 @@ where
let update = match message {
Ok(m) => m,
Err(_) => {
break
return Err(ParallelStateRootError::Other(
"updates channel disconnected before state root calculation".to_string(),
))
}
};
@@ -483,7 +497,7 @@ where
}
#[instrument(
level = "debug",
level = "trace",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
@@ -515,7 +529,7 @@ where
/// Processes a hashed state update and encodes all state changes as trie updates.
#[instrument(
level = "debug",
level = "trace",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
@@ -675,6 +689,11 @@ where
/// Invokes `update_leaves` for the accounts trie and collects any new targets.
///
/// Returns whether any updates were drained (applied to the trie).
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
let account_updates =
if new { &mut self.new_account_updates } else { &mut self.account_updates };
@@ -718,53 +737,50 @@ where
return Ok(());
}
let span = tracing::Span::current();
let roots = self
let span = debug_span!("compute_storage_roots").entered();
self
.trie
.storage_tries_mut()
.par_iter_mut()
.filter(|(address, _)| {
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty())
.iter_mut()
.filter(|(address, trie)| {
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty()) &&
!trie.is_root_cached()
})
.map(|(address, trie)| {
.par_bridge_buffered()
.for_each(|(address, trie)| {
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage root", ?address).entered();
let root =
trie.root().expect("updates are drained, trie should be revealed by now");
(address, root)
})
.collect::<Vec<_>>();
for (addr, storage_root) in roots {
// If the storage root is known and we have a pending update for this account, encode it
// into a proper update.
if let Entry::Occupied(entry) = self.pending_account_updates.entry(*addr) &&
entry.get().is_some()
{
let account = entry.remove().expect("just checked, should be Some");
let encoded = if account.is_none_or(|account| account.is_empty()) &&
storage_root == EMPTY_ROOT_HASH
{
Vec::new()
} else {
self.account_rlp_buf.clear();
account
.unwrap_or_default()
.into_trie_account(storage_root)
.encode(&mut self.account_rlp_buf);
self.account_rlp_buf.clone()
};
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
}
}
trie.root().expect("updates are drained, trie should be revealed by now");
});
drop(span);
loop {
let span = debug_span!("promote_updates", promoted = tracing::field::Empty).entered();
// Now handle pending account updates that can be upgraded to a proper update.
let account_rlp_buf = &mut self.account_rlp_buf;
let mut num_promoted = 0;
self.pending_account_updates.retain(|addr, account| {
// If account has pending storage updates, it is still pending.
if self.storage_updates.get(addr).is_some_and(|updates| !updates.is_empty()) {
return true;
if let Some(updates) = self.storage_updates.get(addr) {
if !updates.is_empty() {
// If account has pending storage updates, it is still pending.
return true;
} else if let Some(account) = account.take() {
let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
let encoded = if account.is_none_or(|account| account.is_empty()) &&
storage_root == EMPTY_ROOT_HASH
{
Vec::new()
} else {
account_rlp_buf.clear();
account
.unwrap_or_default()
.into_trie_account(storage_root)
.encode(account_rlp_buf);
account_rlp_buf.clone()
};
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
num_promoted += 1;
return false;
}
}
// Get the current account state either from the trie or from latest account update.
@@ -799,15 +815,18 @@ where
account_rlp_buf.clone()
};
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
num_promoted += 1;
false
});
span.record("promoted", num_promoted);
drop(span);
// Only exit when no new updates are processed.
//
// We need to keep iterating if any updates are being drained because that might
// indicate that more pending account updates can be promoted.
if !self.process_account_leaf_updates(false)? {
if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
break
}
}
@@ -1024,3 +1043,59 @@ where
Ok(elapsed)
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::{keccak256, Address, U256};
use reth_trie_sparse::ParallelSparseTrie;
#[test]
fn test_run_hashing_task_hashed_state_update_forwards() {
let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
let address = keccak256(Address::random());
let slot = keccak256(U256::from(42).to_be_bytes::<32>());
let value = U256::from(999);
let mut hashed_state = HashedPostState::default();
hashed_state.accounts.insert(
address,
Some(Account { balance: U256::from(100), nonce: 1, bytecode_hash: None }),
);
let mut storage = reth_trie::HashedStorage::new(false);
storage.storage.insert(slot, value);
hashed_state.storages.insert(address, storage);
let expected_state = hashed_state.clone();
let handle = std::thread::spawn(move || {
SparseTrieCacheTask::<ParallelSparseTrie, ParallelSparseTrie>::run_hashing_task(
updates_rx,
hashed_state_tx,
);
});
updates_tx.send(MultiProofMessage::HashedStateUpdate(hashed_state)).unwrap();
updates_tx.send(MultiProofMessage::FinishedStateUpdates).unwrap();
drop(updates_tx);
let SparseTrieTaskMessage::HashedState(received) = hashed_state_rx.recv().unwrap() else {
panic!("expected HashedState message");
};
let account = received.accounts.get(&address).unwrap().unwrap();
assert_eq!(account.balance, expected_state.accounts[&address].unwrap().balance);
assert_eq!(account.nonce, expected_state.accounts[&address].unwrap().nonce);
let storage = received.storages.get(&address).unwrap();
assert_eq!(*storage.storage.get(&slot).unwrap(), value);
let second = hashed_state_rx.recv().unwrap();
assert!(matches!(second, SparseTrieTaskMessage::FinishedStateUpdates));
assert!(hashed_state_rx.recv().is_err());
handle.join().unwrap();
}
}

Some files were not shown because too many files have changed in this diff Show More