Compare commits

..

118 Commits

Author SHA1 Message Date
Brian Picciano
d6324d63e2 chore: release 1.11.3 2026-03-12 12:34:39 +01:00
Brian Picciano
5f3ade1bfe fix(trie): Reset proof v2 calculator on error (#22781)
Co-authored-by: Amp <amp@ampcode.com>
2026-03-12 10:09:18 +00:00
Derek Cofausper
b053f6fafe cherry-pick: fix don't produce both updates and removals for trie nodes (#22507)
Co-Authored-By: Arsenii Kulikov <62447812+klkvr@users.noreply.github.com>
2026-03-12 02:30:25 +00:00
Derek Cofausper
2a58e7a077 cherry-pick: install rayon panic handler (37f5b3a)
Co-Authored-By: Arsenii Kulikov <62447812+klkvr@users.noreply.github.com>
2026-03-12 02:30:17 +00:00
Emma Jamieson-Hoare
793a3d5fb3 fix missing import 2026-03-10 11:44:07 +00:00
Emma Jamieson-Hoare
89ae1af694 chore: upgrade to 1.11.2 2026-03-10 10:48:03 +00:00
Alexey Shekhirin
9c33fb5d45 fix(engine): reset execution cache hash on clear (#22895)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 10:46:09 +00:00
Alexey Shekhirin
bef3d7b4d1 fix lockfile 2026-02-23 18:36:44 +00:00
Emma Jamieson-Hoare
e918c17af9 chore: release 1.11.1
Amp-Thread-ID: https://ampcode.com/threads/T-019c8ba4-fd85-736b-9d2d-e878d350a91b
Co-authored-by: Amp <amp@ampcode.com>
2026-02-23 18:02:14 +00:00
Arsenii Kulikov
fcc170d53c fix: properly reveal trie nodes (#22415) 2026-02-23 17:58:13 +00:00
Arsenii Kulikov
c685842ba2 fix: overlay preparation on tokio (#22492) 2026-02-23 17:57:51 +00:00
Georgios Konstantopoulos
564ffa5868 fix(ci): pass docker tags as separate set entries in bake action (#22151)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 22:10:35 +00:00
Dan Cline
12891dd171 chore: allow invalid storage metadata (#22150) 2026-02-12 22:02:26 +00:00
Emma Jamieson-Hoare
c1015022f5 chore: release reth v1.11.0 (#22148) 2026-02-12 21:39:30 +00:00
Dan Cline
e3fe6326bc chore(storage): rm storage settings, use only one (#22042)
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-02-12 21:17:05 +00:00
Dan Cline
e3d520b24f feat(network): add inbound / outbound scopes for disconnect reasons (#22070) 2026-02-12 20:54:03 +00:00
Dan Cline
9f29939ea1 feat: bundle mdbx_copy as reth db copy subcommand (#22061)
Co-authored-by: Emma Jamieson-Hoare <emmajam@users.noreply.github.com>
2026-02-12 20:39:56 +00:00
Matthias Seitz
10881d1c73 chore: fix book (#22142) 2026-02-12 21:44:53 +01:00
John Letey
408593467b feat(download): optional chain-aware snapshot url (#22119) 2026-02-12 21:42:19 +01:00
Emma Jamieson-Hoare
8caf8cdf11 docs: improve reth.rs/overview page (#22131)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 20:10:34 +00:00
Georgios Konstantopoulos
1e8030ef28 fix(engine): return error on updates channel disconnect in sparse trie task (#22139)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 20:00:36 +00:00
YK
f72c503d6f feat(metrics): use 5M first gas bucket for finer-grained newPayload metrics (#22136)
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
2026-02-12 19:03:21 +00:00
Emma Jamieson-Hoare
42890e6e7f fix: improve nightly Docker build failure Slack notification (#22130)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 18:58:55 +00:00
Dan Cline
e30e441ada fix: stage drop prunes account/storage changeset static files (#22062) 2026-02-12 18:34:46 +00:00
Georgios Konstantopoulos
121160d248 refactor(db): use hashed state as canonical state representation (#21115)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-02-12 18:02:02 +00:00
Georgios Konstantopoulos
7ff78ca082 perf(engine): use transaction count threshold for prewarm skip (#22094)
Co-authored-by: yk <yongkang@tempo.xyz>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
Co-authored-by: YK <chiayongkang@hotmail.com>
2026-02-12 17:07:52 +00:00
Georgios Konstantopoulos
d7f56d509c chore: add DaniPopes as codeowner for tasks crate (#22128)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 12:08:02 -05:00
Georgios Konstantopoulos
3300e404cf feat(engine): add --engine.disable-sparse-trie-cache-pruning flag (#21967)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: mattsse <19890894+mattsse@users.noreply.github.com>
Co-authored-by: alexey <17802178+shekhirin@users.noreply.github.com>
Co-authored-by: Alexey Shekhirin <github@shekhirin.com>
2026-02-12 16:36:31 +00:00
Georgios Konstantopoulos
77cb99fc78 chore(node): update misleading consensus engine log message (#22124)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Emma Jamieson-Hoare <emmajam@users.noreply.github.com>
2026-02-12 16:14:03 +00:00
Georgios Konstantopoulos
66169c7e7c feat(reth-bench): add progress field to per-block benchmark logs (#22016)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 16:03:32 +00:00
Georgios Konstantopoulos
4f5fafc8f3 fix(net): correct EthMessageID::max for eth70 and later versions (#22076)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 15:53:11 +00:00
Georgios Konstantopoulos
0b8e6c6ed3 feat(net): enforce EIP-868 fork ID for discovered peers (#22013)
Co-authored-by: Emma <emma@tempo.xyz>
Co-authored-by: Matthias Seitz <mattsse@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Emma Jamieson-Hoare <ejamieson19@gmail.com>
Co-authored-by: Emma Jamieson-Hoare <emmajam@users.noreply.github.com>
2026-02-12 15:29:37 +00:00
Georgios Konstantopoulos
4a62d38af2 perf(engine): use sequential sig recovery for blocks with small blocks (#22077)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
Co-authored-by: YK <chiayongkang@hotmail.com>
2026-02-12 15:06:21 +00:00
Georgios Konstantopoulos
dc4f249f09 chore: zero-pad thread indices in thread names (#22113)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 12:45:49 +00:00
Brian Picciano
c915841a45 chore(stateless): Remove reth-stateless crate (#22115) 2026-02-12 11:20:49 +00:00
Georgios Konstantopoulos
217a337d8c chore(engine): remove biased select in engine service loop (#21961)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-02-12 05:45:45 +00:00
Georgios Konstantopoulos
74d57008b6 chore(engine): downgrade failed response delivery logs to warn (#22055)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 05:44:09 +00:00
Georgios Konstantopoulos
f8767bc678 fix(engine): add await_state_root span to timeout path (#22111)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 05:14:39 +00:00
Georgios Konstantopoulos
81c83bba68 refactor(engine): remove unnecessary turbofish on CachedStateProvider, add new_prewarm (#22107)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 02:48:57 +00:00
Georgios Konstantopoulos
cd8ec58703 refactor(engine): move CachedStateProvider prewarm to const generic (#22106)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 01:30:24 +00:00
DaniPopes
931b17c3fd chore: bump alloy-core deps (#22104) 2026-02-12 01:15:56 +00:00
Emma Jamieson-Hoare
807d328cf0 fix: move alloy-primitives to regular dependency in bin/reth (#22105)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 01:15:12 +00:00
Georgios Konstantopoulos
8a6bbd29fe fix(tracing): return error instead of panicking on log directory creation failure (#22100)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 00:40:39 +00:00
Georgios Konstantopoulos
8bedaaee71 feat(docker): include debug symbols in maxperf images (#22003)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 00:34:41 +00:00
Emma Jamieson-Hoare
09cd105671 fix(primitives): move feature-referenced deps from dev-dependencies to optional dependencies (#22103)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 23:50:56 +00:00
Georgios Konstantopoulos
a0b60b7e64 feat(evm): impl ExecutableTxTuple for Either via EitherTxIterator (#22102)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 23:48:17 +00:00
DaniPopes
90e15d096d perf: reduce tracing span noise in prewarm and proof workers (#22101) 2026-02-11 23:32:50 +00:00
Emma Jamieson-Hoare
a161ca294f feat(net): add reason label to backed_off_peers metric (#22009)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 23:00:20 +00:00
Emma Jamieson-Hoare
3a5c41e3da test: add WebSocket subscription integration tests for eth_subscribe (#22065)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 22:56:47 +00:00
Georgios Konstantopoulos
968d3c9534 revert: skip transaction prewarming for small blocks (#22059) (#22097) 2026-02-11 14:38:08 -08:00
DaniPopes
fc6666f6a7 perf: treat hashes as bytes in BranchNodeCompact (#22089) 2026-02-11 22:11:49 +00:00
DaniPopes
ff3a854326 perf: use dedicated trie rayon pool for proof workers (#22051) 2026-02-11 22:10:17 +00:00
DaniPopes
04543ed16b chore: add span and log to runtime build (#22064) 2026-02-11 22:06:14 +00:00
Emma Jamieson-Hoare
ae3f0d4d1a test: expand CLI integration tests (#22086)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 21:43:28 +00:00
Georgios Konstantopoulos
5bccdc4a5d feat(engine): add state root task timeout with sequential fallback (#22004)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 20:45:45 +00:00
Georgios Konstantopoulos
0b7cd60668 perf(engine): skip transaction prewarming for small blocks (#22059)
Co-authored-by: yk <yongkang@tempo.xyz>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 20:37:04 +00:00
YK
aa983b49af perf(engine): add PrewarmMode::Skipped to avoid spawning idle workers (#22066)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
2026-02-11 19:48:48 +00:00
Georgios Konstantopoulos
2aff617767 feat(cli): split account-history and storage-history stage drops (#22083)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 19:21:55 +00:00
Georgios Konstantopoulos
2c5d00ffb5 feat(engine): add gas bucket label to newPayload metrics (#22067)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 19:00:07 +00:00
Georgios Konstantopoulos
e2a3527414 test: add CLI integration tests for reth binary (#22069)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 17:56:16 +00:00
Georgios Konstantopoulos
e4cb3d3aed chore(cli): log received signals at info level (#22071)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 17:55:37 +00:00
DaniPopes
079b7b9d57 fix: don't drop node (#22063) 2026-02-11 16:43:55 +00:00
Georgios Konstantopoulos
8a25d7d3cf chore: remove ress crates from workspace (#22057)
Co-authored-by: mattsse <matt@paradigm.xyz>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-11 13:39:56 +00:00
Minhyuk Kim
a5ced84098 feat(node/builder): add build_with_ordering_and_spawn_maintenance_task to TxPoolBuilder (#21979) 2026-02-11 12:58:29 +00:00
Emma Jamieson-Hoare
59760a2fe3 feat(net): add direction labels to closed_sessions and pending_session_failures metrics (#22014)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:59:06 +00:00
Matthias Seitz
b9d21f293e refactor: remove TypesAnd1-5 staging types from ProviderFactoryBuilder (#22049)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:57:05 +00:00
Georgios Konstantopoulos
dec1cad318 refactor(trie): merge SparseTrieExt into SparseTrie trait (#22035)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:39:56 +00:00
Georgios Konstantopoulos
165b94c3fa chore(docker): pass RUSTC_WRAPPER to cargo build in Dockerfile.depot (#22048)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:37:43 +00:00
Georgios Konstantopoulos
69e4c06ae7 chore(log): simplify default profiler tracing filter (#22050)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:33:20 +00:00
Georgios Konstantopoulos
1406a984a7 ci: pass --no-fail-fast to all cargo nextest runs (#22046)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:21:38 +00:00
Andrey Kolishchak
93d6b9782c fix(node): ethstats conn/last_ping deadlock (#21463) 2026-02-11 03:48:54 +00:00
DaniPopes
68e4ff1f7d feat: global runtime (#21934) 2026-02-11 03:45:09 +00:00
Georgios Konstantopoulos
33467ea6dd fix(reth-bench): increase WS keepalive interval to match persistence timeout (#22039) 2026-02-11 02:45:54 +00:00
Georgios Konstantopoulos
3bf9280b3c refactor(storage): add with_*_opt builder methods to StorageSettings (#21998)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 02:19:33 +00:00
Georgios Konstantopoulos
5c93986e6d feat(reth-bench): accept bare integers as milliseconds for --wait-time (#22038)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 01:57:42 +00:00
Georgios Konstantopoulos
779e0eb8bb perf: downgrade on_hashed_state_update and on_prewarm_targets spans to trace (#22044)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 22:45:05 +00:00
Emma Jamieson-Hoare
5c4163c177 feat(exex): make backfill thresholds configurable (#22037)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
2026-02-10 21:30:18 +00:00
Emma Jamieson-Hoare
c5d1f70dd3 fix(txpool): correct swapped args in blob_tx_priority calls (#22030)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 21:17:34 +00:00
YK
a8ec78fc87 perf(engine): implement BAL handler for SparseTrieCacheTask (#21990)
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 20:50:16 +00:00
Georgios Konstantopoulos
1ecbb0b9d6 chore: move jemalloc, asm-keccak, min-debug-logs to default features (#22034)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 20:46:59 +00:00
Georgios Konstantopoulos
a40647e651 fix(docker): fix sccache stats in Dockerfile.depot (#22033)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 19:50:30 +00:00
Georgios Konstantopoulos
b25b8c00ee feat(engine): add getPayloadBodiesV2 endpoints for EIP-7928 BAL support (#21774)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 15:10:05 +00:00
John Chase
b20a99e1c9 ci: skip scheduled workflows on forks (#22022) 2026-02-10 14:36:20 +00:00
DaniPopes
9ec0e3cd51 chore: rm random log file (#22023) 2026-02-10 15:43:56 +01:00
Georgios Konstantopoulos
09837bbdb4 chore: remove base.reth.rs public endpoint references (#22019)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 14:18:46 +00:00
Matthias Seitz
198e457a12 feat(rpc): add subscribeFinalizedChainNotifications endpoint (#22011)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 15:07:31 +01:00
DaniPopes
c727c61101 feat(trie): remove SerialSparseTrie (#21808)
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Brian Picciano <me@mediocregopher.com>
2026-02-10 13:50:54 +00:00
Georgios Konstantopoulos
366857559b fix(rocksdb): set max_open_files to prevent fd exhaustion (#22005)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-09 22:32:48 +00:00
Matthias Seitz
ccd15e8a25 refactor(txpool): rename and document validation methods (#22008)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-09 22:17:19 +00:00
Georgios Konstantopoulos
67f89fa4b2 feat(engine): prefetch withdrawal addresses in pre-warming (#21966)
Co-authored-by: mattsse <matt@paradigm.xyz>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-09 22:14:41 +00:00
Georgios Konstantopoulos
a87510069d refactor(pool): add IntoIter: Send bounds to avoid unnecessary Vec collect (#22001)
Co-authored-by: klkvr <klkvr@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Emma Jamieson-Hoare <ejamieson19@gmail.com>
Co-authored-by: Emma Jamieson-Hoare <emmajam@users.noreply.github.com>
2026-02-09 21:45:56 +00:00
Emma Jamieson-Hoare
b3fe168548 fix(rpc): enforce blockHash constraint in append_matching_block_logs (#22007)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-09 21:45:53 +00:00
Emma Jamieson-Hoare
8d7583b6fb chore: move Kurtosis failures to the hive slack channel (#21983)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-09 21:39:04 +00:00
Georgios Konstantopoulos
32466fe223 feat(rpc): propagate TransactionOrigin through send_transaction and batcher (#21969)
Co-authored-by: klkvr <klkvr@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-09 20:34:23 +00:00
Alexey Shekhirin
f2061991c5 feat(engine): reorg depth commitment metric (#21992) 2026-02-09 20:25:54 +00:00
Dan Cline
a549b4d66d feat(storage): add use_hashed_state storage setting (#21997) 2026-02-09 20:15:13 +00:00
Arsenii Kulikov
cdcea2bd33 perf: better scheduling for storage roots computation (#21987)
Co-authored-by: Brian Picciano <me@mediocregopher.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-09 18:10:45 +00:00
Matthias Seitz
3898cc5c3d chore(deps): bump alloy 1.6.2 -> 1.6.3 (#21986)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-09 19:00:13 +01:00
Dan Cline
c558c1d10f fix(stages): skip sender unwind when fully pruned (#21988) 2026-02-09 17:36:20 +00:00
Georgios Konstantopoulos
5f7ecc6187 chore(net): remove OP stack bootnodes (#21984)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-09 15:00:58 +00:00
DaniPopes
15b6e7f6fc ci: use depot for hive and kurtosis image builds, run daily (#21976)
Co-authored-by: Jennifer <jenpaff0@gmail.com>
2026-02-09 14:55:12 +00:00
Georgios Konstantopoulos
503b9b87a6 feat(tracing): add jsonrpsee targets to profiling filter (#21981) 2026-02-09 13:42:37 +00:00
Matthias Seitz
600eab20a5 feat(cli): rename enable-sparse-trie-as-cache to legacy-trie (#21851)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-09 13:02:14 +00:00
Matthias Seitz
a7eef9c6dc chore(deps): bump alloy from 1.6.1 to 1.6.2 (#21974)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-09 13:52:39 +01:00
Brian Picciano
6aebf8c064 chore(trie): Spans and traces for sparse trie (#21973) 2026-02-09 11:53:40 +00:00
Brian Picciano
655a463c18 fix(trie): Do not reveal disconnected leaves (#21924) 2026-02-09 11:39:40 +00:00
github-actions[bot]
a8b9c9a9dc chore(deps): weekly cargo update (#21955)
Co-authored-by: github-merge-queue <118344674+github-merge-queue@users.noreply.github.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-08 10:38:24 +00:00
Georgios Konstantopoulos
7679625fd3 chore(ci): improve wasm and riscv check output (#21956)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-08 02:51:54 +00:00
Georgios Konstantopoulos
7ac0d542b6 refactor(engine): wrap ExecutionCache internals in single Arc (#21950)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-07 19:13:16 +00:00
Huber
e4b2b1edf3 feat(txpool): add missing no_eip7702/set_eip7702 builder methods (#21926) 2026-02-07 19:12:23 +00:00
Matthias Seitz
95ed377135 perf(prewarm): disable balance check for prewarming transactions (#21941)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-07 18:40:41 +00:00
DaniPopes
db01c10a1d chore: add libmdbx to default tracing filter (#21944) 2026-02-07 16:32:15 +00:00
Haardik
b9d7744389 feat: add a public prune_transactions method to the TransactionPool (#21765)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-07 14:14:08 +00:00
Georgios Konstantopoulos
e72e85632b perf(persistence): combine save_blocks and prune into single MDBX commit (#21927)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
2026-02-07 04:33:31 +00:00
Dan Cline
8033b77ad3 chore(persistence): delete ambiguous TODO (#21923) 2026-02-07 00:19:46 +00:00
DaniPopes
1fe5623f78 chore: bump persistence timeout (#21919) 2026-02-06 22:24:26 +00:00
DaniPopes
887421cef2 chore: log mdbx tx manager msg (#21916) 2026-02-06 21:28:14 +00:00
Dan Cline
352430cd84 fix: skip sender recovery stage when senders fully pruned (#21918) 2026-02-06 21:22:40 +00:00
373 changed files with 11341 additions and 12925 deletions

View File

@@ -0,0 +1,5 @@
---
reth-transaction-pool: patch
---
Renamed and documented validation methods for clarity. `validate_one_no_state` and `validate_one_against_state` are now public methods `validate_stateless` and `validate_stateful` with improved documentation explaining their respective validation phases.

View File

@@ -0,0 +1,5 @@
---
reth-network: minor
---
Added reason label to backed_off_peers metric. The metric now tracks backed off peers by reason (too_many_peers, graceful_close, connection_error) to improve observability.

View File

@@ -0,0 +1,5 @@
---
reth-trie-sparse-parallel: patch
---
Fixed parallel sparse trie to skip revealing disconnected leaves by checking parent branch reachability before inserting leaf nodes.

View File

@@ -0,0 +1,5 @@
---
ef-tests: patch
---
Removed reth-stateless crate and stateless validation from ef-tests.

View File

@@ -0,0 +1,6 @@
---
reth-engine-tree: patch
reth-trie-sparse-parallel: patch
---
Added tracing spans and debug logs to sparse trie operations for better observability during parallel state root computation.

View File

@@ -0,0 +1,6 @@
---
reth-exex: patch
reth-exex-types: patch
---
Added configurable backfill thresholds to ExEx notifications stream and added regression tests for state provider parity between pipeline and backfill execution paths.

View File

@@ -0,0 +1,4 @@
---
---
Added WebSocket subscription integration tests for eth_subscribe.

View File

@@ -0,0 +1,4 @@
---
---
Improved nightly Docker build failure Slack notification with more detailed formatting and context.

View File

@@ -0,0 +1,7 @@
---
reth: patch
reth-cli-commands: patch
reth-node-core: patch
---
Removed experimental ress protocol support for stateless Ethereum nodes.

View File

@@ -0,0 +1,5 @@
---
reth-node-builder: patch
---
Removed biased select in engine service loop to allow fair scheduling of shutdown requests alongside event processing.

View File

@@ -0,0 +1,5 @@
---
reth-transaction-pool: patch
---
Fixed swapped arguments in `blob_tx_priority` function calls, correcting the parameter order to match the function signature.

View File

@@ -0,0 +1,4 @@
---
---
Improved documentation overview page with better structure and clarity.

View File

@@ -0,0 +1,5 @@
---
reth-node-events: patch
---
Updated consensus engine log message to be more accurate about received updates.

View File

@@ -0,0 +1,6 @@
---
reth-chainspec: minor
reth-network-peers: minor
---
Removed OP stack bootnodes from default chain configurations and network peers module.

View File

@@ -0,0 +1,9 @@
---
reth-network-api: minor
reth-network-types: minor
reth-network: minor
reth-node-core: minor
reth: minor
---
Added optional ENR fork ID enforcement to filter out peers from incompatible networks during peer discovery, controlled by the `--enforce-enr-fork-id` CLI flag.

View File

@@ -0,0 +1,5 @@
---
reth-primitives: patch
---
Moved feature-referenced dependencies from dev-dependencies to optional dependencies to ensure they are available when their corresponding features are enabled.

View File

@@ -0,0 +1,5 @@
---
reth-transaction-pool: minor
---
Added `IntoIter: Send` bounds to `validate_transactions` and `validate_transactions_with_origin` in the `TransactionValidator` trait, avoiding unnecessary `Vec` collects. Simplified default `validate_transactions_with_origin` to delegate to `validate_transactions`.

View File

@@ -0,0 +1,5 @@
---
reth-provider: patch
---
Removed unused staging types from ProviderFactoryBuilder.

View File

@@ -0,0 +1,5 @@
---
reth-trie-sparse: minor
---
Removed `SerialSparseTrie` from the workspace, consolidating on `ParallelSparseTrie` as the single sparse trie implementation in `reth-trie-sparse`.

View File

@@ -0,0 +1,5 @@
---
reth-trie-sparse: patch
---
Fixed a bug where trie nodes could appear in both `updated_nodes` and `removed_nodes` simultaneously by removing entries from `removed_nodes` when a node is inserted as updated.

View File

@@ -0,0 +1,4 @@
---
---
Expanded CLI integration tests with subcommand help coverage, config TOML validation, genesis JSON validation, and send transaction round-trip test for dev mode.

View File

@@ -0,0 +1,5 @@
---
reth-network: minor
---
Added direction labels to `closed_sessions` and `pending_session_failures` metrics. Operators can now distinguish session closures and failures by direction (`active`, `incoming_pending`, `outgoing_pending` for closed sessions; `inbound`, `outbound` for pending session failures).

View File

@@ -0,0 +1,4 @@
---
---
Moved Kurtosis CI failure notifications to the hive Slack channel.

View File

@@ -0,0 +1,7 @@
---
reth-rpc-api: minor
reth-rpc-builder: patch
reth-rpc: minor
---
Added `subscribeFinalizedChainNotifications` RPC endpoint that buffers committed chain notifications and emits them once a new finalized block is received.

View File

@@ -0,0 +1,5 @@
---
reth-trie: patch
---
Fixed a potential panic in `ProofCalculator` by clearing internal computation state (`branch_stack`, `child_stack`, `branch_path`, etc.) after errors, preventing stale state from causing `usize` underflow panics when the calculator is reused. Added a test verifying correct behavior after simulated mid-computation errors.

2
.github/CODEOWNERS vendored
View File

@@ -38,7 +38,7 @@ crates/storage/libmdbx-rs/ @shekhirin
crates/storage/nippy-jar/ @joshieDo @shekhirin
crates/storage/provider/ @joshieDo @shekhirin @yongkangc
crates/storage/storage-api/ @joshieDo
crates/tasks/ @mattsse
crates/tasks/ @mattsse @DaniPopes
crates/tokio-util/ @mattsse
crates/tracing/ @mattsse @shekhirin
crates/tracing-otlp/ @mattsse @Rjected

View File

@@ -1,7 +1,6 @@
#!/usr/bin/env bash
set +e # Disable immediate exit on error
set -uo pipefail
# Array of crates to check
crates_to_check=(
reth-codecs-derive
reth-primitives
@@ -28,54 +27,22 @@ crates_to_check=(
reth-ethereum-forks
reth-ethereum-primitives
reth-ethereum-consensus
reth-stateless
)
# Array to hold the results
results=()
# Flag to track if any command fails
any_failed=0
tmpdir=$(mktemp -d 2>/dev/null || mktemp -d -t reth-check)
trap 'rm -rf -- "$tmpdir"' EXIT INT TERM
for crate in "${crates_to_check[@]}"; do
cmd="cargo +stable build -p $crate --target riscv32imac-unknown-none-elf --no-default-features"
if [ -n "$CI" ]; then
echo "::group::$cmd"
outfile="$tmpdir/$crate.log"
if cargo +stable build -p "$crate" --target riscv32imac-unknown-none-elf --no-default-features --color never >"$outfile" 2>&1; then
echo "$crate"
else
printf "\n%s:\n %s\n" "$crate" "$cmd"
fi
set +e # Disable immediate exit on error
# Run the command and capture the return code
$cmd
ret_code=$?
set -e # Re-enable immediate exit on error
# Store the result in the dictionary
if [ $ret_code -eq 0 ]; then
results+=("1:✅:$crate")
else
results+=("2:❌:$crate")
echo "$crate"
sed 's/^/ /' "$outfile"
echo ""
any_failed=1
fi
if [ -n "$CI" ]; then
echo "::endgroup::"
fi
done
# Sort the results by status and then by crate name
IFS=$'\n' sorted_results=($(sort <<<"${results[*]}"))
unset IFS
# Print summary
echo -e "\nSummary of build results:"
for result in "${sorted_results[@]}"; do
status="${result#*:}"
status="${status%%:*}"
crate="${result##*:}"
echo "$status $crate"
done
# Exit with a non-zero status if any command fails
exit $any_failed

View File

@@ -1,11 +1,10 @@
#!/usr/bin/env bash
set +e # Disable immediate exit on error
set -uo pipefail
# Array of crates to compile
crates=($(cargo metadata --format-version=1 --no-deps | jq -r '.packages[].name' | grep '^reth' | sort))
readarray -t crates < <(
cargo metadata --format-version=1 --no-deps | jq -r '.packages[].name' | grep '^reth' | sort
)
# Array of crates to exclude
# Used with the `contains` function.
# shellcheck disable=SC2034
exclude_crates=(
# The following require investigation if they can be fixed
@@ -64,6 +63,7 @@ exclude_crates=(
reth-provider # tokio
reth-prune # tokio
reth-prune-static-files # reth-provider
reth-tasks # tokio rt-multi-thread
reth-stages-api # reth-provider, reth-prune
reth-static-file # tokio
reth-transaction-pool # c-kzg
@@ -77,70 +77,35 @@ exclude_crates=(
reth-node-ethstats
)
# Array to hold the results
results=()
# Flag to track if any command fails
any_failed=0
tmpdir=$(mktemp -d 2>/dev/null || mktemp -d -t reth-check)
trap 'rm -rf -- "$tmpdir"' EXIT INT TERM
# Function to check if a value exists in an array
contains() {
local array="$1[@]"
local seeking=$2
local in=1
local seeking="$2"
local element
for element in "${!array}"; do
if [[ "$element" == "$seeking" ]]; then
in=0
break
fi
[[ "$element" == "$seeking" ]] && return 0
done
return $in
return 1
}
for crate in "${crates[@]}"; do
if contains exclude_crates "$crate"; then
results+=("3:⏭️:$crate")
echo "⏭️ $crate"
continue
fi
cmd="cargo +stable build -p $crate --target wasm32-wasip1 --no-default-features"
if [ -n "$CI" ]; then
echo "::group::$cmd"
outfile="$tmpdir/$crate.log"
if cargo +stable build -p "$crate" --target wasm32-wasip1 --no-default-features --color never >"$outfile" 2>&1; then
echo "$crate"
else
printf "\n%s:\n %s\n" "$crate" "$cmd"
fi
set +e # Disable immediate exit on error
# Run the command and capture the return code
$cmd
ret_code=$?
set -e # Re-enable immediate exit on error
# Store the result in the dictionary
if [ $ret_code -eq 0 ]; then
results+=("1:✅:$crate")
else
results+=("2:❌:$crate")
echo "$crate"
sed 's/^/ /' "$outfile"
echo ""
any_failed=1
fi
if [ -n "$CI" ]; then
echo "::endgroup::"
fi
done
# Sort the results by status and then by crate name
IFS=$'\n' sorted_results=($(sort <<<"${results[*]}"))
unset IFS
# Print summary
echo -e "\nSummary of build results:"
for result in "${sorted_results[@]}"; do
status="${result#*:}"
status="${status%%:*}"
crate="${result##*:}"
echo "$status $crate"
done
# Exit with a non-zero status if any command fails
exit $any_failed

View File

@@ -15,6 +15,7 @@ permissions:
jobs:
update:
if: github.repository == 'paradigmxyz/reth'
uses: tempoxyz/ci/.github/workflows/cargo-update-pr.yml@main
secrets:
token: ${{ secrets.GITHUB_TOKEN }}

53
.github/workflows/docker-test.yml vendored Normal file
View File

@@ -0,0 +1,53 @@
name: Build test Docker image
on:
workflow_call:
inputs:
hive_target:
required: true
type: string
description: "Docker bake target to build (e.g. hive-stable, hive-edge)"
artifact_name:
required: false
type: string
default: "artifacts"
description: "Name for the uploaded artifact"
jobs:
build:
if: github.repository == 'paradigmxyz/reth'
timeout-minutes: 45
runs-on: ubuntu-latest
permissions:
id-token: write
contents: read
steps:
- uses: actions/checkout@v6
- run: mkdir -p artifacts
- name: Get git info
id: git
run: |
echo "sha=${{ github.sha }}" >> "$GITHUB_OUTPUT"
echo "describe=$(git describe --always --tags)" >> "$GITHUB_OUTPUT"
- name: Set up Depot CLI
uses: depot/setup-action@v1
- name: Build reth image
uses: depot/bake-action@v1
env:
DEPOT_TOKEN: ${{ secrets.DEPOT_TOKEN }}
VERGEN_GIT_SHA: ${{ steps.git.outputs.sha }}
VERGEN_GIT_DESCRIBE: ${{ steps.git.outputs.describe }}
with:
project: ${{ vars.DEPOT_PROJECT_ID }}
files: docker-bake.hcl
targets: ${{ inputs.hive_target }}
push: false
- name: Upload reth image
uses: actions/upload-artifact@v6
with:
name: ${{ inputs.artifact_name }}
path: ./artifacts

View File

@@ -31,6 +31,7 @@ on:
jobs:
build:
if: github.repository == 'paradigmxyz/reth'
name: Build Docker images
runs-on: ubuntu-24.04
permissions:
@@ -69,18 +70,27 @@ jobs:
# Add 'latest' tag for non-RC releases
if [[ ! "$VERSION" =~ -rc ]]; then
echo "ethereum_tags=${REGISTRY}/reth:${VERSION},${REGISTRY}/reth:latest" >> "$GITHUB_OUTPUT"
{
echo "ethereum_set<<EOF"
echo "ethereum.tags=${REGISTRY}/reth:${VERSION}"
echo "ethereum.tags=${REGISTRY}/reth:latest"
echo "EOF"
} >> "$GITHUB_OUTPUT"
else
echo "ethereum_tags=${REGISTRY}/reth:${VERSION}" >> "$GITHUB_OUTPUT"
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:${VERSION}" >> "$GITHUB_OUTPUT"
fi
elif [[ "${{ github.event_name }}" == "schedule" ]] || [[ "${{ inputs.build_type }}" == "nightly" ]]; then
echo "targets=nightly" >> "$GITHUB_OUTPUT"
echo "ethereum_tags=${REGISTRY}/reth:nightly" >> "$GITHUB_OUTPUT"
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:nightly" >> "$GITHUB_OUTPUT"
else
# git-sha build
echo "targets=ethereum" >> "$GITHUB_OUTPUT"
echo "ethereum_tags=${REGISTRY}/reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
fi
- name: Build and push images
@@ -96,7 +106,7 @@ jobs:
targets: ${{ steps.params.outputs.targets }}
push: ${{ !(github.event_name == 'workflow_dispatch' && inputs.dry_run) }}
set: |
ethereum.tags=${{ steps.params.outputs.ethereum_tags }}
${{ steps.params.outputs.ethereum_set }}
- name: Verify image architectures
env:
@@ -116,6 +126,18 @@ jobs:
- name: Slack Webhook Action
uses: rtCamp/action-slack-notify@v2
env:
SLACK_COLOR: ${{ job.status }}
SLACK_MESSAGE: "Failed run: https://github.com/paradigmxyz/reth/actions/runs/${{ github.run_id }}"
SLACK_COLOR: danger
SLACK_ICON_EMOJI: ":rotating_light:"
SLACK_USERNAME: "GitHub Actions"
SLACK_TITLE: ":rotating_light: Nightly Docker Build Failed"
SLACK_MESSAGE: |
The scheduled nightly Docker build failed.
*Commit:* `${{ github.sha }}`
*Branch:* `${{ github.ref_name }}`
*Run:* <https://github.com/paradigmxyz/reth/actions/runs/${{ github.run_id }}|View logs>
*Action required:* Re-run the workflow or investigate the build failure.
SLACK_FOOTER: "paradigmxyz/reth · docker.yml"
MSG_MINIMAL: true
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_URL }}

View File

@@ -35,6 +35,7 @@ jobs:
- name: Run e2e tests
run: |
cargo nextest run \
--no-fail-fast \
--locked --features "asm-keccak" \
--workspace \
--exclude 'example-*' \
@@ -61,6 +62,7 @@ jobs:
- name: Run RocksDB e2e tests
run: |
cargo nextest run \
--no-fail-fast \
--locked --features "edge" \
-p reth-e2e-test-utils \
-E 'binary(rocksdb)'

View File

@@ -5,7 +5,7 @@ name: hive
on:
workflow_dispatch:
schedule:
- cron: "0 */6 * * *"
- cron: "0 0 * * *"
env:
CARGO_TERM_COLOR: always
@@ -15,27 +15,24 @@ concurrency:
cancel-in-progress: true
jobs:
prepare-reth-stable:
uses: ./.github/workflows/prepare-reth.yml
build-reth-stable:
uses: ./.github/workflows/docker-test.yml
with:
image_tag: ghcr.io/paradigmxyz/reth:latest
binary_name: reth
cargo_features: "asm-keccak"
hive_target: hive-stable
artifact_name: "reth-stable"
secrets: inherit
prepare-reth-edge:
uses: ./.github/workflows/prepare-reth.yml
build-reth-edge:
uses: ./.github/workflows/docker-test.yml
with:
image_tag: ghcr.io/paradigmxyz/reth:latest
binary_name: reth
cargo_features: "asm-keccak edge"
hive_target: hive-edge
artifact_name: "reth-edge"
secrets: inherit
prepare-hive:
if: github.repository == 'paradigmxyz/reth'
timeout-minutes: 45
runs-on:
group: Reth
runs-on: depot-ubuntu-latest-4
steps:
- uses: actions/checkout@v6
- name: Checkout hive tests
@@ -187,12 +184,11 @@ jobs:
- sim: ethereum/eels/consume-rlp
limit: .*tests/paris.*
needs:
- prepare-reth-stable
- prepare-reth-edge
- build-reth-stable
- build-reth-edge
- prepare-hive
name: ${{ matrix.storage }} / ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
runs-on:
group: Reth
runs-on: depot-ubuntu-latest-4
permissions:
issues: write
steps:

View File

@@ -46,6 +46,7 @@ jobs:
- name: Run tests
run: |
cargo nextest run \
--no-fail-fast \
--locked --features "asm-keccak ${{ matrix.network }} ${{ matrix.storage == 'edge' && 'edge' || '' }}" \
--workspace --exclude ef-tests \
-E "kind(test) and not binary(e2e_testsuite)"
@@ -64,7 +65,7 @@ jobs:
era-files:
name: era1 file integration tests once a day
if: github.event_name == 'schedule'
if: github.event_name == 'schedule' && github.repository == 'paradigmxyz/reth'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
@@ -76,4 +77,4 @@ jobs:
with:
cache-on-failure: true
- name: run era1 files integration tests
run: cargo nextest run --release --package reth-era --test it -- --ignored
run: cargo nextest run --no-fail-fast --release --package reth-era --test it -- --ignored

View File

@@ -5,7 +5,7 @@ name: kurtosis
on:
workflow_dispatch:
schedule:
- cron: "0 */6 * * *"
- cron: "0 0 * * *"
push:
tags:
@@ -19,11 +19,12 @@ concurrency:
cancel-in-progress: true
jobs:
prepare-reth:
uses: ./.github/workflows/prepare-reth.yml
build-reth:
if: github.repository == 'paradigmxyz/reth'
uses: ./.github/workflows/docker-test.yml
with:
image_tag: ghcr.io/paradigmxyz/reth:kurtosis-ci
binary_name: reth
hive_target: kurtosis
secrets: inherit
test:
timeout-minutes: 60
@@ -32,7 +33,7 @@ jobs:
name: run kurtosis
runs-on: depot-ubuntu-latest
needs:
- prepare-reth
- build-reth
steps:
- uses: actions/checkout@v6
with:
@@ -65,4 +66,4 @@ jobs:
env:
SLACK_COLOR: ${{ job.status }}
SLACK_MESSAGE: "Failed run: https://github.com/paradigmxyz/reth/actions/runs/${{ github.run_id }}"
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_URL }}
SLACK_WEBHOOK: ${{ secrets.SLACK_HIVE_WEBHOOK_URL }}

View File

@@ -1,61 +0,0 @@
name: Prepare Reth Image
on:
workflow_call:
inputs:
image_tag:
required: true
type: string
description: "Docker image tag to use"
binary_name:
required: false
type: string
default: "reth"
description: "Binary name to build"
cargo_features:
required: false
type: string
default: "asm-keccak"
description: "Cargo features to enable"
cargo_package:
required: false
type: string
description: "Optional cargo package path"
artifact_name:
required: false
type: string
default: "artifacts"
description: "Name for the uploaded artifact"
jobs:
prepare-reth:
if: github.repository == 'paradigmxyz/reth'
timeout-minutes: 45
runs-on: depot-ubuntu-latest
steps:
- uses: actions/checkout@v6
- run: mkdir artifacts
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build and export reth image
uses: docker/build-push-action@v6
with:
context: .
file: .github/scripts/hive/Dockerfile
tags: ${{ inputs.image_tag }}
outputs: type=docker,dest=./artifacts/reth_image.tar
build-args: |
CARGO_BIN=${{ inputs.binary_name }}
MANIFEST_PATH=${{ inputs.cargo_package }}
FEATURES=${{ inputs.cargo_features }}
cache-from: type=gha
cache-to: type=gha,mode=max
- name: Upload reth image
id: upload
uses: actions/upload-artifact@v6
with:
name: ${{ inputs.artifact_name }}
path: ./artifacts

View File

@@ -7,6 +7,7 @@ on:
jobs:
build:
if: github.repository == 'paradigmxyz/reth'
name: build reproducible binaries
runs-on: ${{ matrix.runner }}
strategy:

View File

@@ -38,7 +38,7 @@ jobs:
cache-on-failure: true
- name: Build reth
run: |
cargo install --features asm-keccak,jemalloc --path bin/reth
cargo install --path bin/reth
- name: Run headers stage
run: |
reth stage run headers --from ${{ env.FROM_BLOCK }} --to ${{ env.TO_BLOCK }} --commit --checkpoints

View File

@@ -9,6 +9,7 @@ on:
jobs:
close-issues:
if: github.repository == 'paradigmxyz/reth'
runs-on: ubuntu-latest
permissions:
issues: write

View File

@@ -17,6 +17,7 @@ concurrency:
jobs:
sync:
if: github.repository == 'paradigmxyz/reth'
name: sync (${{ matrix.chain.bin }})
runs-on: depot-ubuntu-latest
env:

View File

@@ -17,6 +17,7 @@ concurrency:
jobs:
sync:
if: github.repository == 'paradigmxyz/reth'
name: sync (${{ matrix.chain.bin }})
runs-on: depot-ubuntu-latest
env:

View File

@@ -49,6 +49,7 @@ jobs:
- name: Run tests
run: |
cargo nextest run \
--no-fail-fast \
--features "${{ matrix.features }} $EDGE_FEATURES" --locked \
${{ matrix.exclude_args }} --workspace \
--exclude ef-tests --no-tests=warn \
@@ -87,7 +88,7 @@ jobs:
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- run: cargo nextest run --cargo-profile hivetests -p ef-tests --features "asm-keccak ef-tests"
- run: cargo nextest run --no-fail-fast --cargo-profile hivetests -p ef-tests --features "asm-keccak ef-tests"
doc:
name: doc tests

View File

@@ -381,7 +381,7 @@ cargo nextest run --workspace
cargo bench --bench bench_name
# Build optimized binary
cargo build --release --features "jemalloc asm-keccak"
cargo build --release
# Check compilation for all features
cargo check --workspace --all-features

879
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
[workspace.package]
version = "1.10.2"
version = "1.11.3"
edition = "2024"
rust-version = "1.88"
license = "MIT OR Apache-2.0"
@@ -83,8 +83,6 @@ members = [
"crates/prune/db",
"crates/prune/prune",
"crates/prune/types",
"crates/ress/protocol",
"crates/ress/provider",
"crates/revm/",
"crates/rpc/ipc/",
"crates/rpc/rpc-api/",
@@ -101,7 +99,6 @@ members = [
"crates/stages/api/",
"crates/stages/stages/",
"crates/stages/types/",
"crates/stateless",
"crates/static-file/static-file",
"crates/static-file/types/",
"crates/storage/codecs/",
@@ -125,7 +122,6 @@ members = [
"crates/trie/db",
"crates/trie/parallel/",
"crates/trie/sparse",
"crates/trie/sparse-parallel/",
"crates/trie/trie",
"examples/beacon-api-sidecar-fetcher/",
"examples/beacon-api-sse/",
@@ -310,6 +306,11 @@ inherits = "release"
lto = "fat"
codegen-units = 1
[profile.maxperf-symbols]
inherits = "maxperf"
debug = "full"
strip = "none"
[profile.reproducible]
inherits = "release"
panic = "abort"
@@ -418,7 +419,6 @@ reth-rpc-convert = { path = "crates/rpc/rpc-convert" }
reth-stages = { path = "crates/stages/stages" }
reth-stages-api = { path = "crates/stages/api" }
reth-stages-types = { path = "crates/stages/types", default-features = false }
reth-stateless = { path = "crates/stateless", default-features = false }
reth-static-file = { path = "crates/static-file/static-file" }
reth-static-file-types = { path = "crates/static-file/types", default-features = false }
reth-storage-api = { path = "crates/storage/storage-api", default-features = false }
@@ -434,10 +434,7 @@ reth-trie-common = { path = "crates/trie/common", default-features = false }
reth-trie-db = { path = "crates/trie/db" }
reth-trie-parallel = { path = "crates/trie/parallel" }
reth-trie-sparse = { path = "crates/trie/sparse", default-features = false }
reth-trie-sparse-parallel = { path = "crates/trie/sparse-parallel" }
reth-zstd-compressors = { path = "crates/storage/zstd-compressors", default-features = false }
reth-ress-protocol = { path = "crates/ress/protocol" }
reth-ress-provider = { path = "crates/ress/provider" }
# revm
revm = { version = "34.0.0", default-features = false }
@@ -451,46 +448,57 @@ op-revm = { version = "15.0.0", default-features = false }
revm-inspectors = "0.34.2"
# eth
alloy-dyn-abi = "1.5.4"
alloy-primitives = { version = "1.5.4", default-features = false, features = ["map-foldhash"] }
alloy-sol-types = { version = "1.5.4", default-features = false }
alloy-dyn-abi = "1.5.6"
alloy-primitives = { version = "1.5.6", default-features = false, features = [
"map-foldhash",
] }
alloy-sol-types = { version = "1.5.6", default-features = false }
alloy-chains = { version = "0.2.5", default-features = false }
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.3.0", default-features = false }
alloy-evm = { version = "0.27.2", default-features = false }
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
alloy-rlp = { version = "0.3.13", default-features = false, features = [
"core-net",
] }
alloy-trie = { version = "0.9.4", default-features = false }
alloy-hardforks = "0.4.5"
alloy-consensus = { version = "1.6.1", default-features = false }
alloy-contract = { version = "1.6.1", default-features = false }
alloy-eips = { version = "1.6.1", default-features = false }
alloy-genesis = { version = "1.6.1", default-features = false }
alloy-json-rpc = { version = "1.6.1", default-features = false }
alloy-network = { version = "1.6.1", default-features = false }
alloy-network-primitives = { version = "1.6.1", default-features = false }
alloy-provider = { version = "1.6.1", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.6.1", default-features = false }
alloy-rpc-client = { version = "1.6.1", default-features = false }
alloy-rpc-types = { version = "1.6.1", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.6.1", default-features = false }
alloy-rpc-types-anvil = { version = "1.6.1", default-features = false }
alloy-rpc-types-beacon = { version = "1.6.1", default-features = false }
alloy-rpc-types-debug = { version = "1.6.1", default-features = false }
alloy-rpc-types-engine = { version = "1.6.1", default-features = false }
alloy-rpc-types-eth = { version = "1.6.1", default-features = false }
alloy-rpc-types-mev = { version = "1.6.1", default-features = false }
alloy-rpc-types-trace = { version = "1.6.1", default-features = false }
alloy-rpc-types-txpool = { version = "1.6.1", default-features = false }
alloy-serde = { version = "1.6.1", default-features = false }
alloy-signer = { version = "1.6.1", default-features = false }
alloy-signer-local = { version = "1.6.1", default-features = false }
alloy-transport = { version = "1.6.1" }
alloy-transport-http = { version = "1.6.1", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.6.1", default-features = false }
alloy-transport-ws = { version = "1.6.1", default-features = false }
alloy-consensus = { version = "1.6.3", default-features = false }
alloy-contract = { version = "1.6.3", default-features = false }
alloy-eips = { version = "1.6.3", default-features = false }
alloy-genesis = { version = "1.6.3", default-features = false }
alloy-json-rpc = { version = "1.6.3", default-features = false }
alloy-network = { version = "1.6.3", default-features = false }
alloy-network-primitives = { version = "1.6.3", default-features = false }
alloy-provider = { version = "1.6.3", features = [
"reqwest",
"debug-api",
], default-features = false }
alloy-pubsub = { version = "1.6.3", default-features = false }
alloy-rpc-client = { version = "1.6.3", default-features = false }
alloy-rpc-types = { version = "1.6.3", features = [
"eth",
], default-features = false }
alloy-rpc-types-admin = { version = "1.6.3", default-features = false }
alloy-rpc-types-anvil = { version = "1.6.3", default-features = false }
alloy-rpc-types-beacon = { version = "1.6.3", default-features = false }
alloy-rpc-types-debug = { version = "1.6.3", default-features = false }
alloy-rpc-types-engine = { version = "1.6.3", default-features = false }
alloy-rpc-types-eth = { version = "1.6.3", default-features = false }
alloy-rpc-types-mev = { version = "1.6.3", default-features = false }
alloy-rpc-types-trace = { version = "1.6.3", default-features = false }
alloy-rpc-types-txpool = { version = "1.6.3", default-features = false }
alloy-serde = { version = "1.6.3", default-features = false }
alloy-signer = { version = "1.6.3", default-features = false }
alloy-signer-local = { version = "1.6.3", default-features = false }
alloy-transport = { version = "1.6.3" }
alloy-transport-http = { version = "1.6.3", features = [
"reqwest-rustls-tls",
], default-features = false }
alloy-transport-ipc = { version = "1.6.3", default-features = false }
alloy-transport-ws = { version = "1.6.3", default-features = false }
# op
alloy-op-evm = { version = "0.27.2", default-features = false }
@@ -507,7 +515,10 @@ either = { version = "1.15.0", default-features = false }
arrayvec = { version = "0.7.6", default-features = false }
aquamarine = "0.6"
auto_impl = "1"
backon = { version = "1.2", default-features = false, features = ["std-blocking-sleep", "tokio-sleep"] }
backon = { version = "1.2", default-features = false, features = [
"std-blocking-sleep",
"tokio-sleep",
] }
bincode = "1.3"
bitflags = "2.4"
boyer-moore-magiclen = "0.2.16"
@@ -529,9 +540,13 @@ itertools = { version = "0.14", default-features = false }
linked_hash_set = "0.1"
lz4 = "1.28.1"
modular-bitfield = "0.13.1"
notify = { version = "8.0.0", default-features = false, features = ["macos_fsevent"] }
notify = { version = "8.0.0", default-features = false, features = [
"macos_fsevent",
] }
nybbles = { version = "0.4.8", default-features = false }
once_cell = { version = "1.19", default-features = false, features = ["critical-section"] }
once_cell = { version = "1.19", default-features = false, features = [
"critical-section",
] }
parking_lot = "0.12"
paste = "1.0"
rand = "0.9"
@@ -550,7 +565,9 @@ strum_macros = "0.27"
syn = "2.0"
thiserror = { version = "2.0.0", default-features = false }
tar = "0.4.44"
tracing = { version = "0.1.0", default-features = false }
tracing = { version = "0.1.0", default-features = false, features = [
"attributes",
] }
tracing-appender = "0.2"
url = { version = "2.3", default-features = false }
zstd = "0.13"
@@ -588,7 +605,11 @@ futures-util = { version = "0.3", default-features = false }
hyper = "1.3"
hyper-util = "0.1.5"
pin-project = "1.0.12"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "rustls-tls-native-roots", "stream"] }
reqwest = { version = "0.12", default-features = false, features = [
"rustls-tls",
"rustls-tls-native-roots",
"stream",
] }
tracing-futures = "0.2"
tower = "0.5"
tower-http = "0.6"
@@ -613,7 +634,10 @@ proptest-arbitrary-interop = "0.1.0"
# crypto
enr = { version = "0.13", default-features = false }
k256 = { version = "0.13", default-features = false, features = ["ecdsa"] }
secp256k1 = { version = "0.30", default-features = false, features = ["global-context", "recovery"] }
secp256k1 = { version = "0.30", default-features = false, features = [
"global-context",
"recovery",
] }
# rand 8 for secp256k1
rand_08 = { package = "rand", version = "0.8" }

View File

@@ -51,14 +51,15 @@ RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
export RUSTC_WRAPPER=sccache SCCACHE_WEBDAV_ENDPOINT=https://cache.depot.dev SCCACHE_DIR=/sccache && \
sccache --start-server && \
if [ -n "$RUSTFLAGS" ]; then \
export RUSTFLAGS="$RUSTFLAGS"; \
elif [ "$TARGETPLATFORM" = "linux/amd64" ]; then \
export RUSTFLAGS="-C target-cpu=x86-64-v3 -C target-feature=+pclmulqdq"; \
fi && \
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml
RUN sccache --show-stats || true
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml && \
sccache --show-stats
# Copy binary to a known location (ARG not resolved in COPY)
# Note: Custom profiles like maxperf/profiling output to target/<profile>/, not target/release/

View File

@@ -12,12 +12,7 @@ FULL_DB_TOOLS_DIR := $(shell pwd)/$(DB_TOOLS_DIR)/
CARGO_TARGET_DIR ?= target
# List of features to use when building. Can be overridden via the environment.
# No jemalloc on Windows
ifeq ($(OS),Windows_NT)
FEATURES ?= asm-keccak min-debug-logs
else
FEATURES ?= jemalloc asm-keccak min-debug-logs
endif
FEATURES ?=
# Cargo profile for builds. Default is for local builds, CI uses an override.
PROFILE ?= release
@@ -158,7 +153,7 @@ COV_FILE := lcov.info
.PHONY: test-unit
test-unit: ## Run unit tests.
cargo install cargo-nextest --locked
cargo nextest run $(UNIT_TEST_ARGS)
cargo nextest run --no-fail-fast $(UNIT_TEST_ARGS)
.PHONY: cov-unit
@@ -191,7 +186,7 @@ $(EEST_TESTS_DIR):
.PHONY: ef-tests
ef-tests: $(EF_TESTS_DIR) $(EEST_TESTS_DIR) ## Runs Legacy and EEST tests.
cargo nextest run -p ef-tests --release --features ef-tests
cargo nextest run --no-fail-fast -p ef-tests --release --features ef-tests
##@ reth-bench
@@ -238,16 +233,15 @@ update-book-cli: build-debug ## Update book cli documentation.
.PHONY: profiling
profiling: ## Builds `reth` with optimisations, but also symbols.
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --features jemalloc,asm-keccak
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling
.PHONY: maxperf
maxperf: ## Builds `reth` with the most aggressive optimisations.
RUSTFLAGS="-C target-cpu=native" cargo build --profile maxperf --features jemalloc,asm-keccak
RUSTFLAGS="-C target-cpu=native" cargo build --profile maxperf
.PHONY: maxperf-no-asm
maxperf-no-asm: ## Builds `reth` with the most aggressive optimisations, minus the "asm-keccak" feature.
RUSTFLAGS="-C target-cpu=native" cargo build --profile maxperf --features jemalloc
RUSTFLAGS="-C target-cpu=native" cargo build --profile maxperf --no-default-features --features jemalloc,min-debug-logs,otlp,otlp-logs,reth-revm/portable,js-tracer,keccak-cache-global,rocksdb
fmt:
cargo +nightly fmt

View File

@@ -30,7 +30,7 @@ reth-bench-compare \
| `--draw` | Generate charts (needs Python/uv) | `false` | No |
| `--profile` | Enable CPU profiling (needs samply) | `false` | No |
| `-vvvv` | Debug logging | Info | No |
| `--features <FEATURES>` | Rust features for both builds | `jemalloc,asm-keccak` | No |
| `--features <FEATURES>` | Extra Rust features for both builds | - | No |
| `--rustflags <FLAGS>` | RUSTFLAGS for both builds | `-C target-cpu=native` | No |
| `--baseline-features <FEATURES>` | Features for baseline only | Inherits `--features` | No |
| `--feature-features <FEATURES>` | Features for feature only | Inherits `--features` | No |

View File

@@ -191,10 +191,9 @@ pub(crate) struct Args {
#[arg(trailing_var_arg = true, allow_hyphen_values = true)]
pub reth_args: Vec<String>,
/// Comma-separated list of features to enable during reth compilation (applied to both builds)
///
/// Example: `jemalloc,asm-keccak`
#[arg(long, value_name = "FEATURES", default_value = "jemalloc,asm-keccak")]
/// Comma-separated list of extra features to enable during reth compilation (applied to both
/// builds)
#[arg(long, value_name = "FEATURES", default_value = "")]
pub features: String,
/// Comma-separated list of features to enable only for baseline build (overrides --features)
@@ -205,7 +204,7 @@ pub(crate) struct Args {
/// Comma-separated list of features to enable only for feature build (overrides --features)
///
/// Example: `--feature-features jemalloc,asm-keccak`
/// Example: `--feature-features jemalloc-prof`
#[arg(long, value_name = "FEATURES")]
pub feature_features: Option<String>,
@@ -277,10 +276,8 @@ impl Args {
/// Get the default RPC URL for a given chain
const fn get_default_rpc_url(chain: &Chain) -> &'static str {
match chain.id() {
8453 => "https://base.reth.rs/rpc", // base
84532 => "https://base-sepolia.rpc.ithaca.xyz", // base-sepolia
27082 => "https://rpc.hoodi.ethpandaops.io", // hoodi
_ => "https://ethereum.reth.rs/rpc", // mainnet and fallback
27082 => "https://rpc.hoodi.ethpandaops.io", // hoodi
_ => "https://ethereum.reth.rs/rpc", // mainnet and fallback
}
}

View File

@@ -32,7 +32,7 @@ Otherwise, running `make maxperf` at the root of the repo should be sufficient f
The `reth-bench new-payload-fcu` command is the most representative of ethereum mainnet live sync, alternating between sending `engine_newPayload` calls and `engine_forkchoiceUpdated` calls.
The `new-payload-fcu` command supports two optional waiting modes that can be used together or independently:
- `--wait-time <duration>`: Fixed sleep interval between blocks (e.g., `--wait-time 100ms`)
- `--wait-time <duration>`: Fixed sleep interval between blocks (e.g., `--wait-time 100ms` or `--wait-time 400` for 400ms)
- `--wait-for-persistence`: Waits for blocks to be persisted using the `reth_subscribePersistedBlock` subscription
When using `--wait-for-persistence`, the benchmark waits after every `(threshold + 1)` blocks, where the threshold defaults to the engine's persistence threshold (2). This can be customized with `--persistence-threshold <N>`.
@@ -73,7 +73,7 @@ make profiling
If the purpose of the benchmark is to obtain `jemalloc` memory profiles that can then be analyzed by `jeprof`, it should be compiled with the `profiling` profile and the `jemalloc-prof` feature:
```bash
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --features "jemalloc-prof,asm-keccak"
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --features "jemalloc-prof"
```
> [!NOTE]
@@ -82,7 +82,7 @@ RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --features "jem
Finally, if the purpose of the benchmark is to profile the node when `snmalloc` is configured as the default allocator, it would be built with the following
command:
```bash
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --no-default-features --features "snmalloc-native,asm-keccak"
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --no-default-features --features "snmalloc-native,asm-keccak,min-debug-logs"
```
### Run the Benchmark:

View File

@@ -192,6 +192,15 @@ impl Command {
parent_header = block.header;
parent_hash = block_hash;
blocks_processed += 1;
let progress = match mode {
RampMode::Blocks(total) => format!("{blocks_processed}/{total}"),
RampMode::TargetGasLimit(target) => {
let pct = (parent_header.gas_limit as f64 / target as f64 * 100.0).min(100.0);
format!("{pct:.1}%")
}
};
info!(target: "reth-bench", progress, block_number = parent_header.number, gas_limit = parent_header.gas_limit, "Block processed");
}
let final_gas_limit = parent_header.gas_limit;

View File

@@ -2,7 +2,10 @@
use crate::valid_payload::call_forkchoice_updated;
use eyre::Result;
use std::io::{BufReader, Read};
use std::{
io::{BufReader, Read},
time::Duration,
};
/// Read input from either a file path or stdin.
pub(crate) fn read_input(path: Option<&str>) -> Result<String> {
@@ -51,6 +54,22 @@ pub(crate) fn parse_gas_limit(s: &str) -> eyre::Result<u64> {
let base: u64 = num_str.trim().parse()?;
base.checked_mul(multiplier).ok_or_else(|| eyre::eyre!("value overflow"))
}
/// Parses a duration string, treating bare integers as milliseconds.
///
/// Accepts either a `humantime` duration string (e.g. `"100ms"`, `"2s"`) or a plain
/// integer which is interpreted as milliseconds (e.g. `"400"` → 400ms).
pub(crate) fn parse_duration(s: &str) -> eyre::Result<Duration> {
match humantime::parse_duration(s) {
Ok(d) => Ok(d),
Err(_) => {
let millis: u64 =
s.trim().parse().map_err(|_| eyre::eyre!("invalid duration: {s:?}"))?;
Ok(Duration::from_millis(millis))
}
}
}
use alloy_consensus::Header;
use alloy_eips::eip4844::kzg_to_versioned_hash;
use alloy_primitives::{Address, B256};
@@ -270,4 +289,24 @@ mod tests {
assert!(parse_gas_limit("G").is_err());
assert!(parse_gas_limit("-1G").is_err());
}
#[test]
fn test_parse_duration_with_unit() {
assert_eq!(parse_duration("100ms").unwrap(), Duration::from_millis(100));
assert_eq!(parse_duration("2s").unwrap(), Duration::from_secs(2));
assert_eq!(parse_duration("1m").unwrap(), Duration::from_secs(60));
}
#[test]
fn test_parse_duration_bare_millis() {
assert_eq!(parse_duration("400").unwrap(), Duration::from_millis(400));
assert_eq!(parse_duration("0").unwrap(), Duration::from_millis(0));
assert_eq!(parse_duration("1000").unwrap(), Duration::from_millis(1000));
}
#[test]
fn test_parse_duration_errors() {
assert!(parse_duration("abc").is_err());
assert!(parse_duration("").is_err());
}
}

View File

@@ -12,12 +12,12 @@
use crate::{
bench::{
context::BenchContext,
helpers::parse_duration,
output::{
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
},
persistence_waiter::{
derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
PERSISTENCE_CHECKPOINT_TIMEOUT,
},
},
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
@@ -26,7 +26,6 @@ use alloy_provider::Provider;
use alloy_rpc_types_engine::ForkchoiceState;
use clap::Parser;
use eyre::{Context, OptionExt};
use humantime::parse_duration;
use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_core::args::BenchmarkArgs;
@@ -41,6 +40,9 @@ pub struct Command {
rpc_url: String,
/// How long to wait after a forkchoice update before sending the next payload.
///
/// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
/// milliseconds (e.g. `400`).
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
wait_time: Option<Duration>,
@@ -67,6 +69,19 @@ pub struct Command {
)]
persistence_threshold: u64,
/// Timeout for waiting on persistence at each checkpoint.
///
/// Must be long enough to account for the persistence thread being blocked
/// by pruning after the previous save.
#[arg(
long = "persistence-timeout",
value_name = "PERSISTENCE_TIMEOUT",
value_parser = parse_duration,
default_value = "120s",
verbatim_doc_comment
)]
persistence_timeout: Duration,
/// The size of the block buffer (channel capacity) for prefetching blocks from the RPC
/// endpoint.
#[arg(
@@ -105,12 +120,12 @@ impl Command {
self.benchmark.ws_rpc_url.as_deref(),
&self.benchmark.engine_rpc_url,
)?;
let sub = setup_persistence_subscription(ws_url).await?;
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
Some(PersistenceWaiter::with_duration_and_subscription(
duration,
sub,
self.persistence_threshold,
PERSISTENCE_CHECKPOINT_TIMEOUT,
self.persistence_timeout,
))
}
(Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
@@ -119,11 +134,11 @@ impl Command {
self.benchmark.ws_rpc_url.as_deref(),
&self.benchmark.engine_rpc_url,
)?;
let sub = setup_persistence_subscription(ws_url).await?;
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
Some(PersistenceWaiter::with_subscription(
sub,
self.persistence_threshold,
PERSISTENCE_CHECKPOINT_TIMEOUT,
self.persistence_timeout,
))
}
(None, false) => None,
@@ -138,6 +153,7 @@ impl Command {
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
let buffer_size = self.rpc_block_buffer_size;
// Use a oneshot channel to propagate errors from the spawned task
@@ -191,6 +207,7 @@ impl Command {
});
let mut results = Vec::new();
let mut blocks_processed = 0u64;
let total_benchmark_duration = Instant::now();
let mut total_wait_time = Duration::ZERO;
@@ -234,8 +251,13 @@ impl Command {
// Exclude time spent waiting on the block prefetch channel from the benchmark duration.
// We want to measure engine throughput, not RPC fetch latency.
blocks_processed += 1;
let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
info!(target: "reth-bench", %combined_result);
let progress = match total_blocks {
Some(total) => format!("{blocks_processed}/{total}"),
None => format!("{blocks_processed}"),
};
info!(target: "reth-bench", progress, %combined_result);
if let Some(w) = &mut waiter {
w.on_block(block_number).await?;

View File

@@ -52,6 +52,7 @@ impl Command {
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
let buffer_size = self.rpc_block_buffer_size;
// Use a oneshot channel to propagate errors from the spawned task
@@ -82,8 +83,8 @@ impl Command {
}
});
// put results in a summary vec so they can be printed at the end
let mut results = Vec::new();
let mut blocks_processed = 0u64;
let total_benchmark_duration = Instant::now();
let mut total_wait_time = Duration::ZERO;
@@ -105,7 +106,12 @@ impl Command {
call_new_payload(&auth_provider, version, params).await?;
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
info!(target: "reth-bench", %new_payload_result);
blocks_processed += 1;
let progress = match total_blocks {
Some(total) => format!("{blocks_processed}/{total}"),
None => format!("{blocks_processed}"),
};
info!(target: "reth-bench", progress, %new_payload_result);
// current duration since the start of the benchmark minus the time
// waiting for blocks

View File

@@ -22,9 +22,6 @@ use tracing::{debug, info};
const DEFAULT_WS_RPC_PORT: u16 = 8546;
use url::Url;
/// Default timeout for waiting on persistence.
pub(crate) const PERSISTENCE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(60);
/// Returns the websocket RPC URL used for the persistence subscription.
///
/// Preference:
@@ -157,12 +154,18 @@ impl PersistenceSubscription {
}
/// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`.
///
/// The `keepalive_interval` is set to match `persistence_timeout` so that the `WebSocket`
/// connection is not dropped during long MDBX commits that block the server from responding
/// to pings.
pub(crate) async fn setup_persistence_subscription(
ws_url: Url,
persistence_timeout: Duration,
) -> eyre::Result<PersistenceSubscription> {
info!(target: "reth-bench", "Connecting to WebSocket at {} for persistence subscription", ws_url);
let ws_connect = WsConnect::new(ws_url.to_string());
let ws_connect =
WsConnect::new(ws_url.to_string()).with_keepalive_interval(persistence_timeout);
let client = RpcClient::connect_pubsub(ws_connect)
.await
.wrap_err("Failed to connect to WebSocket RPC endpoint")?;

View File

@@ -14,13 +14,13 @@
use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::{
helpers::parse_duration,
output::{
write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
TotalGasOutput, TotalGasRow,
},
persistence_waiter::{
derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
PERSISTENCE_CHECKPOINT_TIMEOUT,
},
},
valid_payload::{call_forkchoice_updated, call_new_payload},
@@ -31,7 +31,6 @@ use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
use clap::Parser;
use eyre::Context;
use humantime::parse_duration;
use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_api::EngineApiMessageVersion;
@@ -79,6 +78,9 @@ pub struct Command {
output: Option<PathBuf>,
/// How long to wait after a forkchoice update before sending the next payload.
///
/// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
/// milliseconds (e.g. `400`).
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
wait_time: Option<Duration>,
@@ -105,6 +107,19 @@ pub struct Command {
)]
persistence_threshold: u64,
/// Timeout for waiting on persistence at each checkpoint.
///
/// Must be long enough to account for the persistence thread being blocked
/// by pruning after the previous save.
#[arg(
long = "persistence-timeout",
value_name = "PERSISTENCE_TIMEOUT",
value_parser = parse_duration,
default_value = "120s",
verbatim_doc_comment
)]
persistence_timeout: Duration,
/// Optional `WebSocket` RPC URL for persistence subscription.
/// If not provided, derives from engine RPC URL by changing scheme to ws and port to 8546.
#[arg(long, value_name = "WS_RPC_URL", verbatim_doc_comment)]
@@ -154,22 +169,22 @@ impl Command {
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
(Some(duration), true) => {
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
let sub = setup_persistence_subscription(ws_url).await?;
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
Some(PersistenceWaiter::with_duration_and_subscription(
duration,
sub,
self.persistence_threshold,
PERSISTENCE_CHECKPOINT_TIMEOUT,
self.persistence_timeout,
))
}
(Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
(None, true) => {
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
let sub = setup_persistence_subscription(ws_url).await?;
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
Some(PersistenceWaiter::with_subscription(
sub,
self.persistence_threshold,
PERSISTENCE_CHECKPOINT_TIMEOUT,
self.persistence_timeout,
))
}
(None, false) => None,
@@ -326,7 +341,8 @@ impl Command {
};
let current_duration = total_benchmark_duration.elapsed();
info!(target: "reth-bench", %combined_result);
let progress = format!("{}/{}", i + 1, payloads.len());
info!(target: "reth-bench", progress, %combined_result);
if let Some(w) = &mut waiter {
w.on_block(block_number).await?;

View File

@@ -20,6 +20,19 @@ impl BenchMode {
}
}
/// Returns the total number of blocks in the benchmark, if known.
///
/// For [`BenchMode::Range`] this is the length of the range.
/// For [`BenchMode::Continuous`] the total is unbounded, so `None` is returned.
pub const fn total_blocks(&self) -> Option<u64> {
match self {
Self::Continuous(_) => None,
Self::Range(range) => {
Some(range.end().saturating_sub(*range.start()).saturating_add(1))
}
}
}
/// Create a [`BenchMode`] from optional `from` and `to` fields.
pub fn new(from: Option<u64>, to: Option<u64>, latest_block: u64) -> Result<Self, eyre::Error> {
// If neither `--from` nor `--to` are provided, we will run the benchmark continuously,

View File

@@ -33,7 +33,6 @@ reth-chainspec.workspace = true
reth-primitives.workspace = true
reth-db = { workspace = true, features = ["mdbx"] }
reth-provider.workspace = true
reth-evm.workspace = true
reth-revm.workspace = true
reth-transaction-pool.workspace = true
reth-cli-runner.workspace = true
@@ -53,32 +52,31 @@ reth-payload-primitives.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
reth-ethereum-payload-builder.workspace = true
reth-ethereum-primitives.workspace = true
reth-node-ethereum.workspace = true
reth-node-builder.workspace = true
reth-node-metrics.workspace = true
reth-consensus.workspace = true
reth-tokio-util.workspace = true
reth-ress-protocol.workspace = true
reth-ress-provider.workspace = true
# alloy
alloy-primitives.workspace = true
alloy-rpc-types = { workspace = true, features = ["engine"] }
# tracing
tracing.workspace = true
# async
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
# misc
aquamarine.workspace = true
clap = { workspace = true, features = ["derive", "env"] }
eyre.workspace = true
[dev-dependencies]
alloy-node-bindings = "1.6.3"
alloy-provider = { workspace = true, features = ["reqwest"] }
alloy-rpc-types-eth.workspace = true
backon.workspace = true
serde_json.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
toml.workspace = true
[features]
default = [
@@ -89,6 +87,7 @@ default = [
"js-tracer",
"keccak-cache-global",
"asm-keccak",
"min-debug-logs",
"rocksdb",
]
@@ -107,7 +106,6 @@ js-tracer = [
"reth-rpc-eth-types/js-tracer",
]
debug-jitter = ["reth-node-builder/debug-jitter"]
dev = ["reth-ethereum-cli/dev"]
asm-keccak = [
@@ -115,10 +113,12 @@ asm-keccak = [
"reth-primitives/asm-keccak",
"reth-ethereum-cli/asm-keccak",
"reth-node-ethereum/asm-keccak",
"alloy-primitives/asm-keccak",
]
keccak-cache-global = [
"reth-node-core/keccak-cache-global",
"reth-node-ethereum/keccak-cache-global",
"alloy-primitives/keccak-cache-global",
]
jemalloc = [
"reth-cli-util/jemalloc",

View File

@@ -51,6 +51,9 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]
// Used in feature flags only (`asm-keccak`, `keccak-cache-global`)
use alloy_primitives as _;
pub mod cli;
/// Re-exported utils.
@@ -205,12 +208,9 @@ pub mod rpc {
}
}
/// Ress subprotocol installation.
pub mod ress;
// re-export for convenience
#[doc(inline)]
pub use reth_cli_runner::{tokio_runtime, CliContext, CliRunner};
pub use reth_cli_runner::{CliContext, CliRunner};
// for rendering diagrams
use aquamarine as _;
@@ -218,3 +218,4 @@ use aquamarine as _;
// used in main
use clap as _;
use reth_cli_util as _;
use tracing as _;

View File

@@ -8,9 +8,8 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne
static MALLOC_CONF: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
use clap::Parser;
use reth::{args::RessArgs, cli::Cli, ress::install_ress_subprotocol};
use reth::cli::Cli;
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_node_builder::NodeHandle;
use reth_node_ethereum::EthereumNode;
use tracing::info;
@@ -22,27 +21,12 @@ fn main() {
unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
}
if let Err(err) =
Cli::<EthereumChainSpecParser, RessArgs>::parse().run(async move |builder, ress_args| {
info!(target: "reth::cli", "Launching node");
let NodeHandle { node, node_exit_future } =
builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
if let Err(err) = Cli::<EthereumChainSpecParser>::parse().run(async move |builder, _| {
info!(target: "reth::cli", "Launching node");
let handle = builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
// Install ress subprotocol.
if ress_args.enabled {
install_ress_subprotocol(
ress_args,
node.provider,
node.evm_config,
node.network,
node.task_executor,
node.add_ons_handle.engine_events.new_listener(),
)?;
}
node_exit_future.await
})
{
handle.wait_for_node_exit().await
}) {
eprintln!("Error: {err:?}");
std::process::exit(1);
}

View File

@@ -1,67 +0,0 @@
use reth_ethereum_primitives::EthPrimitives;
use reth_evm::ConfigureEvm;
use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
use reth_network_api::FullNetwork;
use reth_node_api::ConsensusEngineEvent;
use reth_node_core::args::RessArgs;
use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes};
use reth_ress_protocol::{NodeType, ProtocolState, RessProtocolHandler};
use reth_ress_provider::{maintain_pending_state, PendingState, RethRessProtocolProvider};
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventStream;
use tokio::sync::mpsc;
use tracing::*;
/// Install `ress` subprotocol if it's enabled.
pub fn install_ress_subprotocol<P, E, N>(
args: RessArgs,
provider: BlockchainProvider<P>,
evm_config: E,
network: N,
task_executor: TaskExecutor,
engine_events: EventStream<ConsensusEngineEvent<EthPrimitives>>,
) -> eyre::Result<()>
where
P: ProviderNodeTypes<Primitives = EthPrimitives>,
E: ConfigureEvm<Primitives = EthPrimitives> + Clone + 'static,
N: FullNetwork + NetworkProtocols,
{
info!(target: "reth::cli", "Installing ress subprotocol");
let pending_state = PendingState::default();
// Spawn maintenance task for pending state.
task_executor.spawn(maintain_pending_state(
engine_events,
provider.clone(),
pending_state.clone(),
));
let (tx, mut rx) = mpsc::unbounded_channel();
let provider = RethRessProtocolProvider::new(
provider,
evm_config,
Box::new(task_executor.clone()),
args.max_witness_window,
args.witness_max_parallel,
args.witness_cache_size,
pending_state,
)?;
network.add_rlpx_sub_protocol(
RessProtocolHandler {
provider,
node_type: NodeType::Stateful,
peers_handle: network.peers_handle().clone(),
max_active_connections: args.max_active_connections,
state: ProtocolState::new(tx),
}
.into_rlpx_sub_protocol(),
);
info!(target: "reth::cli", "Ress subprotocol support enabled");
task_executor.spawn(async move {
while let Some(event) = rx.recv().await {
trace!(target: "reth::ress", ?event, "Received ress event");
}
});
Ok(())
}

255
bin/reth/tests/it/main.rs Normal file
View File

@@ -0,0 +1,255 @@
#![allow(missing_docs)]
use std::process::Command;
const RETH: &str = env!("CARGO_BIN_EXE_reth");
// ── Helpers ──────────────────────────────────────────────────────────────────
/// Runs `reth <args>` and returns stdout, asserting exit code 0.
///
/// Tracing is suppressed via `RUST_LOG=off` so that log lines emitted during
/// binary startup don't pollute stdout-based assertions.
#[track_caller]
fn reth_ok(args: &[&str]) -> String {
let output = Command::new(RETH).env("RUST_LOG", "off").args(args).output().unwrap();
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(output.status.success(), "args {args:?} failed.\nstdout: {stdout}\nstderr: {stderr}");
stdout.into_owned()
}
/// Spawns an isolated dev-mode reth node.
///
/// Discovery is disabled and peer limits are zeroed so the node is fully
/// isolated. Each call gets a unique temporary data directory so that
/// concurrent test runs never collide on the default `reth/dev/` path.
fn spawn_dev() -> (alloy_node_bindings::RethInstance, tempfile::TempDir) {
use alloy_node_bindings::Reth;
let datadir = tempfile::tempdir().expect("failed to create temp dir");
let instance = Reth::at(RETH)
.dev()
.disable_discovery()
.data_dir(datadir.path())
.args(["--max-outbound-peers", "0", "--max-inbound-peers", "0"])
.spawn();
// Return the TempDir alongside the instance so it lives as long as the node.
(instance, datadir)
}
// ── Original tests (from PR #22069) ──────────────────────────────────────────
#[test]
fn help() {
let stdout = reth_ok(&["--help"]);
assert!(stdout.contains("Usage"), "stdout: {stdout}");
assert!(stdout.contains("node"), "stdout: {stdout}");
}
#[test]
fn version() {
let stdout = reth_ok(&["--version"]);
assert!(stdout.to_lowercase().contains("reth"), "stdout: {stdout}");
}
#[test]
fn node_help() {
let stdout = reth_ok(&["node", "--help"]);
assert!(stdout.contains("--dev"), "stdout: {stdout}");
assert!(stdout.contains("--http"), "stdout: {stdout}");
}
#[test]
fn unknown_subcommand() {
let output = Command::new(RETH).arg("definitely-not-a-cmd").output().unwrap();
assert!(!output.status.success());
}
#[test]
fn unknown_flag() {
let output = Command::new(RETH).args(["node", "--no-such-flag"]).output().unwrap();
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(!output.status.success());
assert!(stderr.contains("--no-such-flag"), "stderr: {stderr}");
}
#[tokio::test]
async fn dev_node_eth_syncing() {
use alloy_provider::{Provider, ProviderBuilder};
let (reth, _datadir) = spawn_dev();
let provider = ProviderBuilder::new().connect_http(reth.endpoint().parse().unwrap());
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let _syncing = provider.syncing().await.expect("eth_syncing failed");
}
// ── Subcommand --help coverage ───────────────────────────────────────────────
//
// Every registered subcommand must produce valid --help output. This catches
// clap wiring regressions (e.g. a missing field, a conflicting arg name, or a
// broken `help_message()` call) that would otherwise only surface when a user
// runs the command.
#[test]
fn init_help() {
let stdout = reth_ok(&["init", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn init_state_help() {
let stdout = reth_ok(&["init-state", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn import_help() {
let stdout = reth_ok(&["import", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn import_era_help() {
let stdout = reth_ok(&["import-era", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn export_era_help() {
let stdout = reth_ok(&["export-era", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn dump_genesis_help() {
let stdout = reth_ok(&["dump-genesis", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn db_help() {
let stdout = reth_ok(&["db", "--help"]);
assert!(stdout.contains("stats"), "stdout: {stdout}");
}
#[test]
fn stage_help() {
let stdout = reth_ok(&["stage", "--help"]);
assert!(stdout.contains("run"), "stdout: {stdout}");
}
#[test]
fn p2p_help() {
let stdout = reth_ok(&["p2p", "--help"]);
assert!(stdout.contains("header"), "stdout: {stdout}");
}
#[test]
fn config_help() {
let stdout = reth_ok(&["config", "--help"]);
assert!(stdout.contains("--default"), "stdout: {stdout}");
}
#[test]
fn prune_help() {
let stdout = reth_ok(&["prune", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn download_help() {
let stdout = reth_ok(&["download", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn re_execute_help() {
let stdout = reth_ok(&["re-execute", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
// ── `config --default` outputs valid TOML ────────────────────────────────────
#[test]
fn config_default_valid_toml() {
let stdout = reth_ok(&["config", "--default"]);
let parsed: toml::Value =
toml::from_str(&stdout).expect("config --default did not produce valid TOML");
// The default config must contain the [stages] table — this is the heart of
// the pipeline configuration and its absence would indicate a serialization
// regression.
assert!(parsed.get("stages").is_some(), "missing [stages] in config output");
}
// ── `dump-genesis` outputs valid JSON ────────────────────────────────────────
#[test]
fn dump_genesis_mainnet_valid_json() {
let stdout = reth_ok(&["dump-genesis"]);
let genesis: serde_json::Value =
serde_json::from_str(&stdout).expect("dump-genesis did not produce valid JSON");
assert!(genesis.get("nonce").is_some(), "missing nonce in genesis JSON");
assert!(genesis.get("alloc").is_some(), "missing alloc in genesis JSON");
}
#[test]
fn dump_genesis_sepolia_valid_json() {
let stdout = reth_ok(&["dump-genesis", "--chain", "sepolia"]);
let genesis: serde_json::Value = serde_json::from_str(&stdout)
.expect("dump-genesis --chain sepolia did not produce valid JSON");
assert!(genesis.get("alloc").is_some(), "missing alloc in sepolia genesis JSON");
}
// ── Dev node: send transaction round-trip ────────────────────────────────────
//
// Exercises the full pipeline: RPC submission → mempool → sealing → execution →
// receipt retrieval. Uses the pre-funded dev account so no genesis customization
// is required.
#[tokio::test]
async fn dev_node_send_tx_and_mine() {
use alloy_primitives::{Address, U256};
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_types_eth::TransactionRequest;
let (reth, _datadir) = spawn_dev();
let provider = ProviderBuilder::new().connect_http(reth.endpoint().parse().unwrap());
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// Dev mode pre-funds the first dev account.
let accounts = provider.get_accounts().await.expect("eth_accounts failed");
assert!(!accounts.is_empty(), "dev node should expose at least one account");
let sender = accounts[0];
let recipient = Address::with_last_byte(0x42);
let tx = TransactionRequest::default().from(sender).to(recipient).value(U256::from(1_000_000));
let tx_hash = provider.send_transaction(tx).await.expect("eth_sendTransaction failed");
// In dev/instant-mine mode the node seals a block for each transaction, so
// the receipt becomes available almost immediately.
let receipt = tx_hash.get_receipt().await.expect("failed to get receipt");
assert!(receipt.status(), "transaction should have succeeded");
assert_eq!(receipt.to, Some(recipient));
assert!(receipt.block_number.unwrap() > 0, "receipt should be in a mined block");
// Verify the transfer actually mutated state.
let balance = provider.get_balance(recipient).await.expect("eth_getBalance failed");
assert_eq!(balance, U256::from(1_000_000));
}
const fn main() {}

View File

@@ -312,6 +312,11 @@ impl DeferredTrieData {
/// Given that invariant, circular wait dependencies are impossible.
#[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
pub fn wait_cloned(&self) -> ComputedTrieData {
#[cfg(feature = "rayon")]
debug_assert!(
rayon::current_thread_index().is_none(),
"wait_cloned must not be called from a rayon worker thread"
);
let mut state = self.state.lock();
match &mut *state {
// If the deferred trie data is ready, return the cached result.

View File

@@ -1061,6 +1061,14 @@ mod tests {
) -> ProviderResult<Option<StorageValue>> {
Ok(None)
}
fn storage_by_hashed_key(
&self,
_address: Address,
_hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
Ok(None)
}
}
impl BytecodeReader for MockStateProvider {

View File

@@ -223,6 +223,26 @@ impl<N: NodePrimitives> StateProvider for MemoryOverlayStateProviderRef<'_, N> {
self.historical.storage(address, storage_key)
}
fn storage_by_hashed_key(
&self,
address: Address,
hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
let hashed_address = keccak256(address);
let state = &self.trie_input().state;
if let Some(hs) = state.storages.get(&hashed_address) {
if let Some(value) = hs.storage.get(&hashed_storage_key) {
return Ok(Some(*value));
}
if hs.wiped {
return Ok(Some(StorageValue::ZERO));
}
}
self.historical.storage_by_hashed_key(address, hashed_storage_key)
}
}
impl<N: NodePrimitives> BytecodeReader for MemoryOverlayStateProviderRef<'_, N> {

View File

@@ -39,10 +39,7 @@ use reth_ethereum_forks::{
ChainHardforks, DisplayHardforks, EthereumHardfork, EthereumHardforks, ForkCondition,
ForkFilter, ForkFilterKey, ForkHash, ForkId, Hardfork, Hardforks, Head, DEV_HARDFORKS,
};
use reth_network_peers::{
holesky_nodes, hoodi_nodes, mainnet_nodes, op_nodes, op_testnet_nodes, sepolia_nodes,
NodeRecord,
};
use reth_network_peers::{holesky_nodes, hoodi_nodes, mainnet_nodes, sepolia_nodes, NodeRecord};
use reth_primitives_traits::{sync::LazyLock, BlockHeader, SealedHeader};
/// Helper method building a [`Header`] given [`Genesis`] and [`ChainHardforks`].
@@ -780,15 +777,6 @@ impl<H: BlockHeader> ChainSpec<H> {
C::Sepolia => Some(sepolia_nodes()),
C::Holesky => Some(holesky_nodes()),
C::Hoodi => Some(hoodi_nodes()),
// opstack uses the same bootnodes for all chains: <https://github.com/paradigmxyz/reth/issues/14603>
C::Base | C::Optimism | C::Unichain | C::World => Some(op_nodes()),
C::OptimismSepolia | C::BaseSepolia | C::UnichainSepolia | C::WorldSepolia => {
Some(op_testnet_nodes())
}
// fallback for optimism chains
chain if chain.is_optimism() && chain.is_testnet() => Some(op_testnet_nodes()),
chain if chain.is_optimism() => Some(op_nodes()),
_ => None,
}
}

View File

@@ -66,7 +66,8 @@ pub trait RethCli: Sized {
F: FnOnce(Self, CliRunner) -> R,
{
let cli = Self::parse_args()?;
let runner = CliRunner::try_default_runtime()?;
let runner = CliRunner::try_default_runtime()
.map_err(|e| Error::raw(clap::error::ErrorKind::Io, e))?;
Ok(cli.with_runner(f, runner))
}

View File

@@ -19,7 +19,7 @@ use reth_node_builder::{
Node, NodeComponents, NodeComponentsBuilder, NodeTypes, NodeTypesWithDBAdapter,
};
use reth_node_core::{
args::{DatabaseArgs, DatadirArgs, RocksDbArgs, StaticFilesArgs, StorageArgs},
args::{DatabaseArgs, DatadirArgs, StaticFilesArgs, StorageArgs},
dirs::{ChainPath, DataDirPath},
};
use reth_provider::{
@@ -67,70 +67,35 @@ pub struct EnvironmentArgs<C: ChainSpecParser> {
#[command(flatten)]
pub static_files: StaticFilesArgs,
/// All `RocksDB` related arguments
#[command(flatten)]
pub rocksdb: RocksDbArgs,
/// Storage mode configuration (v2 vs v1/legacy)
#[command(flatten)]
pub storage: StorageArgs,
}
impl<C: ChainSpecParser> EnvironmentArgs<C> {
/// Returns the effective storage settings derived from `--storage.v2`, static-file, and
/// `RocksDB` CLI args.
/// Returns the effective storage settings derived from `--storage.v2`.
///
/// The base storage mode is determined by `--storage.v2`:
/// - When `--storage.v2` is set: uses [`StorageSettings::v2()`] defaults
/// - Otherwise: uses [`StorageSettings::v1()`] defaults
///
/// Individual `--static-files.*` and `--rocksdb.*` flags override the base when explicitly set.
/// - Otherwise: uses [`StorageSettings::base()`] defaults
pub fn storage_settings(&self) -> StorageSettings {
let mut s = if self.storage.v2 { StorageSettings::v2() } else { StorageSettings::base() };
// Apply static files overrides (only when explicitly set)
if let Some(v) = self.static_files.receipts {
s = s.with_receipts_in_static_files(v);
if self.storage.v2 {
StorageSettings::v2()
} else {
StorageSettings::base()
}
if let Some(v) = self.static_files.transaction_senders {
s = s.with_transaction_senders_in_static_files(v);
}
if let Some(v) = self.static_files.account_changesets {
s = s.with_account_changesets_in_static_files(v);
}
if let Some(v) = self.static_files.storage_changesets {
s = s.with_storage_changesets_in_static_files(v);
}
// Apply rocksdb overrides
// --rocksdb.all sets all rocksdb flags to true
if self.rocksdb.all {
s = s
.with_transaction_hash_numbers_in_rocksdb(true)
.with_storages_history_in_rocksdb(true)
.with_account_history_in_rocksdb(true);
}
// Individual rocksdb flags override --rocksdb.all when explicitly set
if let Some(v) = self.rocksdb.tx_hash {
s = s.with_transaction_hash_numbers_in_rocksdb(v);
}
if let Some(v) = self.rocksdb.storages_history {
s = s.with_storages_history_in_rocksdb(v);
}
if let Some(v) = self.rocksdb.account_history {
s = s.with_account_history_in_rocksdb(v);
}
s
}
/// Initializes environment according to [`AccessRights`] and returns an instance of
/// [`Environment`].
///
/// Internally builds a [`reth_tasks::Runtime`] attached to the current tokio handle for
/// parallel storage I/O.
pub fn init<N: CliNodeTypes>(&self, access: AccessRights) -> eyre::Result<Environment<N>>
where
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
{
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
let data_dir = self.datadir.clone().resolve_datadir(self.chain.chain());
let db_path = data_dir.db();
let sf_path = data_dir.static_files();
@@ -186,7 +151,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
.build()?;
let provider_factory =
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access)?;
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access, runtime)?;
if access.is_read_write() {
debug!(target: "reth::cli", chain=%self.chain.chain(), genesis=?self.chain.genesis_hash(), "Initializing genesis");
init_genesis_with_settings(&provider_factory, self.storage_settings())?;
@@ -207,6 +172,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
static_file_provider: StaticFileProvider<N::Primitives>,
rocksdb_provider: RocksDBProvider,
access: AccessRights,
runtime: reth_tasks::Runtime,
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>>>
where
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
@@ -217,6 +183,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
self.chain.clone(),
static_file_provider,
rocksdb_provider,
runtime,
)?
.with_prune_modes(prune_modes.clone());

View File

@@ -5,6 +5,7 @@ use reth_codecs::Compact;
use reth_db_api::{cursor::DbDupCursorRO, database::Database, tables, transaction::DbTx};
use reth_db_common::DbTool;
use reth_node_builder::NodeTypesWithDB;
use reth_storage_api::StorageSettingsCache;
use std::time::{Duration, Instant};
use tracing::info;
@@ -22,52 +23,94 @@ impl Command {
/// Execute `db account-storage` command
pub fn execute<N: NodeTypesWithDB>(self, tool: &DbTool<N>) -> eyre::Result<()> {
let address = self.address;
let (slot_count, plain_size) = tool.provider_factory.db_ref().view(|tx| {
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let mut count = 0usize;
let mut total_value_bytes = 0usize;
let mut last_log = Instant::now();
let use_hashed_state = tool.provider_factory.cached_storage_settings().use_hashed_state();
// Walk all storage entries for this address
let walker = cursor.walk_dup(Some(address), None)?;
for entry in walker {
let (_, storage_entry) = entry?;
count += 1;
// StorageEntry encodes as: 32 bytes (key/subkey uncompressed) + compressed U256
let mut buf = Vec::new();
let entry_len = storage_entry.to_compact(&mut buf);
total_value_bytes += entry_len;
let (slot_count, storage_size) = if use_hashed_state {
let hashed_address = keccak256(address);
tool.provider_factory.db_ref().view(|tx| {
let mut cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
let mut count = 0usize;
let mut total_value_bytes = 0usize;
let mut last_log = Instant::now();
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots = count,
key = %storage_entry.key,
"Processing storage slots"
);
last_log = Instant::now();
let walker = cursor.walk_dup(Some(hashed_address), None)?;
for entry in walker {
let (_, storage_entry) = entry?;
count += 1;
let mut buf = Vec::new();
let entry_len = storage_entry.to_compact(&mut buf);
total_value_bytes += entry_len;
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots = count,
key = %storage_entry.key,
"Processing hashed storage slots"
);
last_log = Instant::now();
}
}
}
// Add 20 bytes for the Address key (stored once per account in dupsort)
let total_size = if count > 0 { 20 + total_value_bytes } else { 0 };
let total_size = if count > 0 { 32 + total_value_bytes } else { 0 };
Ok::<_, eyre::Report>((count, total_size))
})??;
Ok::<_, eyre::Report>((count, total_size))
})??
} else {
tool.provider_factory.db_ref().view(|tx| {
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let mut count = 0usize;
let mut total_value_bytes = 0usize;
let mut last_log = Instant::now();
// Estimate hashed storage size: 32-byte B256 key instead of 20-byte Address
let hashed_size_estimate = if slot_count > 0 { plain_size + 12 } else { 0 };
let total_estimate = plain_size + hashed_size_estimate;
// Walk all storage entries for this address
let walker = cursor.walk_dup(Some(address), None)?;
for entry in walker {
let (_, storage_entry) = entry?;
count += 1;
let mut buf = Vec::new();
// StorageEntry encodes as: 32 bytes (key/subkey uncompressed) + compressed U256
let entry_len = storage_entry.to_compact(&mut buf);
total_value_bytes += entry_len;
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots = count,
key = %storage_entry.key,
"Processing storage slots"
);
last_log = Instant::now();
}
}
// Add 20 bytes for the Address key (stored once per account in dupsort)
let total_size = if count > 0 { 20 + total_value_bytes } else { 0 };
Ok::<_, eyre::Report>((count, total_size))
})??
};
let hashed_address = keccak256(address);
println!("Account: {address}");
println!("Hashed address: {hashed_address}");
println!("Storage slots: {slot_count}");
println!("Plain storage size: {} (estimated)", human_bytes(plain_size as f64));
println!("Hashed storage size: {} (estimated)", human_bytes(hashed_size_estimate as f64));
println!("Total estimated size: {}", human_bytes(total_estimate as f64));
if use_hashed_state {
println!("Hashed storage size: {} (estimated)", human_bytes(storage_size as f64));
} else {
// Estimate hashed storage size: 32-byte B256 key instead of 20-byte Address
let hashed_size_estimate = if slot_count > 0 { storage_size + 12 } else { 0 };
let total_estimate = storage_size + hashed_size_estimate;
println!("Plain storage size: {} (estimated)", human_bytes(storage_size as f64));
println!(
"Hashed storage size: {} (estimated)",
human_bytes(hashed_size_estimate as f64)
);
println!("Total estimated size: {}", human_bytes(total_estimate as f64));
}
Ok(())
}

View File

@@ -0,0 +1,61 @@
use clap::Parser;
use reth_db::mdbx::{self, ffi};
use std::path::PathBuf;
/// Copies the MDBX database to a new location.
///
/// Equivalent to the standalone `mdbx_copy` tool but bundled into reth.
#[derive(Parser, Debug)]
pub struct Command {
/// Destination path for the database copy.
dest: PathBuf,
/// Compact the database while copying (reclaims free space).
#[arg(short, long)]
compact: bool,
/// Force dynamic size for the destination database.
#[arg(short = 'd', long)]
force_dynamic_size: bool,
/// Throttle to avoid MVCC pressure on writers.
#[arg(short = 'p', long)]
throttle_mvcc: bool,
}
impl Command {
/// Execute `db copy` command
pub fn execute(self, db: &mdbx::DatabaseEnv) -> eyre::Result<()> {
let mut flags: ffi::MDBX_copy_flags_t = ffi::MDBX_CP_DEFAULTS;
if self.compact {
flags |= ffi::MDBX_CP_COMPACT;
}
if self.force_dynamic_size {
flags |= ffi::MDBX_CP_FORCE_DYNAMIC_SIZE;
}
if self.throttle_mvcc {
flags |= ffi::MDBX_CP_THROTTLE_MVCC;
}
let dest = self
.dest
.to_str()
.ok_or_else(|| eyre::eyre!("destination path must be valid UTF-8"))?;
let dest_cstr = std::ffi::CString::new(dest)?;
println!("Copying database to {} ...", self.dest.display());
let rc = db.with_raw_env_ptr(|env_ptr| unsafe {
ffi::mdbx_env_copy(env_ptr, dest_cstr.as_ptr(), flags)
});
if rc != 0 {
eyre::bail!("mdbx_env_copy failed with error code {rc}: {}", unsafe {
std::ffi::CStr::from_ptr(ffi::mdbx_strerror(rc)).to_string_lossy()
});
}
println!("Done.");
Ok(())
}
}

View File

@@ -98,7 +98,8 @@ impl Command {
)?;
if let Some(entry) = entry {
println!("{}", serde_json::to_string_pretty(&entry)?);
let se: reth_primitives_traits::StorageEntry = entry.into();
println!("{}", serde_json::to_string_pretty(&se)?);
} else {
error!(target: "reth::cli", "No content for the given table key.");
}
@@ -106,7 +107,14 @@ impl Command {
}
let changesets = provider.storage_changeset(key.block_number())?;
println!("{}", serde_json::to_string_pretty(&changesets)?);
let serializable: Vec<_> = changesets
.into_iter()
.map(|(addr, entry)| {
let se: reth_primitives_traits::StorageEntry = entry.into();
(addr, se)
})
.collect();
println!("{}", serde_json::to_string_pretty(&serializable)?);
return Ok(());
}

View File

@@ -12,6 +12,7 @@ use std::{
mod account_storage;
mod checksum;
mod clear;
mod copy;
mod diff;
mod get;
mod list;
@@ -42,6 +43,8 @@ pub enum Subcommands {
List(list::Command),
/// Calculates the content checksum of a table or static file segment
Checksum(checksum::Command),
/// Copies the MDBX database to a new location (bundled mdbx_copy)
Copy(copy::Command),
/// Create a diff between two database tables or two entire databases.
Diff(diff::Command),
/// Gets the content of a table for the given key
@@ -70,23 +73,23 @@ pub enum Subcommands {
State(state::Command),
}
/// Initializes a provider factory with specified access rights, and then execute with the provided
/// command
macro_rules! db_exec {
($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => {
let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?;
let $tool = DbTool::new(provider_factory)?;
$command;
};
}
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
/// Execute `db` command
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
self,
ctx: CliContext,
) -> eyre::Result<()> {
/// Initializes a provider factory with specified access rights, and then executes the
/// provided command.
macro_rules! db_exec {
($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => {
let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?;
let $tool = DbTool::new(provider_factory)?;
$command;
};
}
let data_dir = self.env.datadir.clone().resolve_datadir(self.env.chain.chain());
let db_path = data_dir.db();
let static_files_path = data_dir.static_files();
@@ -124,6 +127,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
command.execute(&tool)?;
});
}
Subcommands::Copy(command) => {
db_exec!(self.env, tool, N, AccessRights::RO, {
command.execute(tool.provider_factory.db_ref())?;
});
}
Subcommands::Diff(command) => {
db_exec!(self.env, tool, N, AccessRights::RO, {
command.execute(&tool)?;

View File

@@ -64,7 +64,7 @@ impl Command {
let executor = task_executor.clone();
let pprof_dump_dir = data_dir.pprof_dumps();
let handle = task_executor.spawn_critical("metrics server", async move {
let handle = task_executor.spawn_critical_task("metrics server", async move {
let config = MetricServerConfig::new(
listen_addr,
VersionInfo {

View File

@@ -39,38 +39,12 @@ enum Subcommands {
#[derive(Debug, Clone, Copy, Subcommand)]
#[clap(rename_all = "snake_case")]
pub enum SetCommand {
/// Store receipts in static files instead of the database
Receipts {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store transaction senders in static files instead of the database
TransactionSenders {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store account changesets in static files instead of the database
AccountChangesets {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store storage history in rocksdb instead of MDBX
StoragesHistory {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store transaction hash to number mapping in rocksdb instead of MDBX
TransactionHashNumbers {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store account history in rocksdb instead of MDBX
AccountHistory {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store storage changesets in static files instead of the database
StorageChangesets {
/// Enable or disable v2 storage layout
///
/// When enabled, uses static files for receipts/senders/changesets and RocksDB for
/// history indices and transaction hashes. When disabled, uses v1/legacy layout (everything in
/// MDBX).
V2 {
#[clap(action(ArgAction::Set))]
value: bool,
},
@@ -113,73 +87,18 @@ impl Command {
println!("No storage settings found, creating new settings.");
}
let mut settings @ StorageSettings {
receipts_in_static_files: _,
transaction_senders_in_static_files: _,
storages_history_in_rocksdb: _,
transaction_hash_numbers_in_rocksdb: _,
account_history_in_rocksdb: _,
account_changesets_in_static_files: _,
storage_changesets_in_static_files: _,
} = settings.unwrap_or_else(StorageSettings::v1);
let mut settings @ StorageSettings { storage_v2: _ } =
settings.unwrap_or_else(StorageSettings::v1);
// Update the setting based on the key
match cmd {
SetCommand::Receipts { value } => {
if settings.receipts_in_static_files == value {
println!("receipts_in_static_files is already set to {}", value);
SetCommand::V2 { value } => {
if settings.storage_v2 == value {
println!("storage_v2 is already set to {}", value);
return Ok(());
}
settings.receipts_in_static_files = value;
println!("Set receipts_in_static_files = {}", value);
}
SetCommand::TransactionSenders { value } => {
if settings.transaction_senders_in_static_files == value {
println!("transaction_senders_in_static_files is already set to {}", value);
return Ok(());
}
settings.transaction_senders_in_static_files = value;
println!("Set transaction_senders_in_static_files = {}", value);
}
SetCommand::AccountChangesets { value } => {
if settings.account_changesets_in_static_files == value {
println!("account_changesets_in_static_files is already set to {}", value);
return Ok(());
}
settings.account_changesets_in_static_files = value;
println!("Set account_changesets_in_static_files = {}", value);
}
SetCommand::StoragesHistory { value } => {
if settings.storages_history_in_rocksdb == value {
println!("storages_history_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.storages_history_in_rocksdb = value;
println!("Set storages_history_in_rocksdb = {}", value);
}
SetCommand::TransactionHashNumbers { value } => {
if settings.transaction_hash_numbers_in_rocksdb == value {
println!("transaction_hash_numbers_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.transaction_hash_numbers_in_rocksdb = value;
println!("Set transaction_hash_numbers_in_rocksdb = {}", value);
}
SetCommand::AccountHistory { value } => {
if settings.account_history_in_rocksdb == value {
println!("account_history_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.account_history_in_rocksdb = value;
println!("Set account_history_in_rocksdb = {}", value);
}
SetCommand::StorageChangesets { value } => {
if settings.storage_changesets_in_static_files == value {
println!("storage_changesets_in_static_files is already set to {}", value);
return Ok(());
}
settings.storage_changesets_in_static_files = value;
println!("Set storage_changesets_in_static_files = {}", value);
settings.storage_v2 = value;
println!("Set storage_v2 = {}", value);
}
}

View File

@@ -1,4 +1,4 @@
use alloy_primitives::{Address, BlockNumber, B256, U256};
use alloy_primitives::{keccak256, Address, BlockNumber, B256, U256};
use clap::Parser;
use parking_lot::Mutex;
use reth_db_api::{
@@ -63,39 +63,65 @@ impl Command {
address: Address,
limit: usize,
) -> eyre::Result<()> {
let use_hashed_state = tool.provider_factory.cached_storage_settings().use_hashed_state();
let entries = tool.provider_factory.db_ref().view(|tx| {
// Get account info
let account = tx.get::<tables::PlainAccountState>(address)?;
// Get storage entries
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let mut entries = Vec::new();
let mut last_log = Instant::now();
let walker = cursor.walk_dup(Some(address), None)?;
for (idx, entry) in walker.enumerate() {
let (_, storage_entry) = entry?;
if storage_entry.value != U256::ZERO {
entries.push((storage_entry.key, storage_entry.value));
let (account, walker_entries) = if use_hashed_state {
let hashed_address = keccak256(address);
let account = tx.get::<tables::HashedAccounts>(hashed_address)?;
let mut cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
let walker = cursor.walk_dup(Some(hashed_address), None)?;
let mut entries = Vec::new();
let mut last_log = Instant::now();
for (idx, entry) in walker.enumerate() {
let (_, storage_entry) = entry?;
if storage_entry.value != U256::ZERO {
entries.push((storage_entry.key, storage_entry.value));
}
if entries.len() >= limit {
break;
}
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots_scanned = idx,
"Scanning storage slots"
);
last_log = Instant::now();
}
}
if entries.len() >= limit {
break;
(account, entries)
} else {
// Get account info
let account = tx.get::<tables::PlainAccountState>(address)?;
// Get storage entries
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let walker = cursor.walk_dup(Some(address), None)?;
let mut entries = Vec::new();
let mut last_log = Instant::now();
for (idx, entry) in walker.enumerate() {
let (_, storage_entry) = entry?;
if storage_entry.value != U256::ZERO {
entries.push((storage_entry.key, storage_entry.value));
}
if entries.len() >= limit {
break;
}
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots_scanned = idx,
"Scanning storage slots"
);
last_log = Instant::now();
}
}
(account, entries)
};
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots_scanned = idx,
"Scanning storage slots"
);
last_log = Instant::now();
}
}
Ok::<_, eyre::Report>((account, entries))
Ok::<_, eyre::Report>((account, walker_entries))
})??;
let (account, storage_entries) = entries;
@@ -119,7 +145,7 @@ impl Command {
// Check storage settings to determine where history is stored
let storage_settings = tool.provider_factory.cached_storage_settings();
let history_in_rocksdb = storage_settings.storages_history_in_rocksdb;
let history_in_rocksdb = storage_settings.storage_v2;
// For historical queries, enumerate keys from history indices only
// (not PlainStorageState, which reflects current state)

View File

@@ -37,6 +37,14 @@ pub struct DownloadDefaults {
pub available_snapshots: Vec<Cow<'static, str>>,
/// Default base URL for snapshots
pub default_base_url: Cow<'static, str>,
/// Default base URL for chain-aware snapshots.
///
/// When set, the chain ID is appended to form the full URL: `{base_url}/{chain_id}`.
/// For example, given a base URL of `https://snapshots.example.com` and chain ID `1`,
/// the resulting URL would be `https://snapshots.example.com/1`.
///
/// Falls back to [`default_base_url`](Self::default_base_url) when `None`.
pub default_chain_aware_base_url: Option<Cow<'static, str>>,
/// Optional custom long help text that overrides the generated help
pub long_help: Option<String>,
}
@@ -60,6 +68,7 @@ impl DownloadDefaults {
Cow::Borrowed("https://publicnode.com/snapshots (full nodes & testnets)"),
],
default_base_url: Cow::Borrowed(MERKLE_BASE_URL),
default_chain_aware_base_url: None,
long_help: None,
}
}
@@ -84,9 +93,11 @@ impl DownloadDefaults {
}
help.push_str(
"\nIf no URL is provided, the latest mainnet archive snapshot\nwill be proposed for download from ",
"\nIf no URL is provided, the latest archive snapshot for the selected chain\nwill be proposed for download from ",
);
help.push_str(
self.default_chain_aware_base_url.as_deref().unwrap_or(&self.default_base_url),
);
help.push_str(self.default_base_url.as_ref());
help.push_str(
".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
);
@@ -111,6 +122,12 @@ impl DownloadDefaults {
self
}
/// Set the default chain-aware base URL.
pub fn with_chain_aware_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
self.default_chain_aware_base_url = Some(url.into());
self
}
/// Builder: Set custom long help text, overriding the generated help
pub fn with_long_help(mut self, help: impl Into<String>) -> Self {
self.long_help = Some(help.into());
@@ -142,7 +159,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
let url = match self.url {
Some(url) => url,
None => {
let url = get_latest_snapshot_url().await?;
let url = get_latest_snapshot_url(self.env.chain.chain().id()).await?;
info!(target: "reth::cli", "Using default snapshot URL: {}", url);
url
}
@@ -509,8 +526,12 @@ async fn stream_and_extract(url: &str, target_dir: &Path) -> Result<()> {
}
// Builds default URL for latest mainnet archive snapshot using configured defaults
async fn get_latest_snapshot_url() -> Result<String> {
let base_url = &DownloadDefaults::get_global().default_base_url;
async fn get_latest_snapshot_url(chain_id: u64) -> Result<String> {
let defaults = DownloadDefaults::get_global();
let base_url = match &defaults.default_chain_aware_base_url {
Some(url) => format!("{url}/{chain_id}"),
None => defaults.default_base_url.to_string(),
};
let latest_url = format!("{base_url}/latest.txt");
let filename = Client::new()
.get(latest_url)

View File

@@ -10,8 +10,8 @@ use reth_node_builder::NodeBuilder;
use reth_node_core::{
args::{
DatabaseArgs, DatadirArgs, DebugArgs, DevArgs, EngineArgs, EraArgs, MetricArgs,
NetworkArgs, PayloadBuilderArgs, PruningArgs, RocksDbArgs, RpcServerArgs, StaticFilesArgs,
StorageArgs, TxPoolArgs,
NetworkArgs, PayloadBuilderArgs, PruningArgs, RpcServerArgs, StaticFilesArgs, StorageArgs,
TxPoolArgs,
},
node_config::NodeConfig,
version,
@@ -103,10 +103,6 @@ pub struct NodeCommand<C: ChainSpecParser, Ext: clap::Args + fmt::Debug = NoArgs
#[command(flatten)]
pub pruning: PruningArgs,
/// All `RocksDB` table routing arguments
#[command(flatten)]
pub rocksdb: RocksDbArgs,
/// Engine cli arguments
#[command(flatten, next_help_heading = "Engine")]
pub engine: EngineArgs,
@@ -119,8 +115,8 @@ pub struct NodeCommand<C: ChainSpecParser, Ext: clap::Args + fmt::Debug = NoArgs
#[command(flatten, next_help_heading = "Static Files")]
pub static_files: StaticFilesArgs,
/// Storage mode configuration (v2 vs v1/legacy)
#[command(flatten)]
/// All storage related arguments with --storage prefix
#[command(flatten, next_help_heading = "Storage")]
pub storage: StorageArgs,
/// Additional cli arguments
@@ -175,7 +171,6 @@ where
db,
dev,
pruning,
rocksdb,
engine,
era,
static_files,
@@ -183,9 +178,6 @@ where
ext,
} = self;
// Validate RocksDB arguments
rocksdb.validate()?;
// set up node config
let mut node_config = NodeConfig {
datadir,
@@ -201,7 +193,6 @@ where
db,
dev,
pruning,
rocksdb,
engine,
era,
static_files,

View File

@@ -76,7 +76,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
// Set up cancellation token for graceful shutdown on Ctrl+C
let cancellation = CancellationToken::new();
let cancellation_clone = cancellation.clone();
ctx.task_executor.spawn_critical("prune-ctrl-c", async move {
ctx.task_executor.spawn_critical_task("prune-ctrl-c", async move {
tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c");
cancellation_clone.cancel();
});

View File

@@ -9,7 +9,10 @@ use reth_db_api::{
transaction::{DbTx, DbTxMut},
};
use reth_db_common::{
init::{insert_genesis_header, insert_genesis_history, insert_genesis_state},
init::{
insert_genesis_account_history, insert_genesis_header, insert_genesis_state,
insert_genesis_storage_history,
},
DbTool,
};
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
@@ -42,12 +45,16 @@ impl<C: ChainSpecParser> Command<C> {
let tool = DbTool::new(provider_factory)?;
let static_file_segment = match self.stage {
StageEnum::Headers => Some(StaticFileSegment::Headers),
StageEnum::Bodies => Some(StaticFileSegment::Transactions),
StageEnum::Execution => Some(StaticFileSegment::Receipts),
StageEnum::Senders => Some(StaticFileSegment::TransactionSenders),
_ => None,
let static_file_segments = match self.stage {
StageEnum::Headers => vec![StaticFileSegment::Headers],
StageEnum::Bodies => vec![StaticFileSegment::Transactions],
StageEnum::Execution => vec![
StaticFileSegment::Receipts,
StaticFileSegment::AccountChangeSets,
StaticFileSegment::StorageChangeSets,
],
StageEnum::Senders => vec![StaticFileSegment::TransactionSenders],
_ => vec![],
};
// Calling `StaticFileProviderRW::prune_*` will instruct the writer to prune rows only
@@ -55,35 +62,33 @@ impl<C: ChainSpecParser> Command<C> {
// deleting the jar files, otherwise if the task were to be interrupted after we
// have deleted them, BUT before we have committed the checkpoints to the database, we'd
// lose essential data.
if let Some(static_file_segment) = static_file_segment {
let static_file_provider = tool.provider_factory.static_file_provider();
if let Some(highest_block) =
static_file_provider.get_highest_static_file_block(static_file_segment)
let static_file_provider = tool.provider_factory.static_file_provider();
for segment in static_file_segments {
if let Some(highest_block) = static_file_provider.get_highest_static_file_block(segment)
{
let mut writer = static_file_provider.latest_writer(static_file_segment)?;
let mut writer = static_file_provider.latest_writer(segment)?;
match static_file_segment {
match segment {
StaticFileSegment::Headers => {
// Prune all headers leaving genesis intact.
writer.prune_headers(highest_block)?;
}
StaticFileSegment::Transactions => {
let to_delete = static_file_provider
.get_highest_static_file_tx(static_file_segment)
.get_highest_static_file_tx(segment)
.map(|tx_num| tx_num + 1)
.unwrap_or_default();
writer.prune_transactions(to_delete, 0)?;
}
StaticFileSegment::Receipts => {
let to_delete = static_file_provider
.get_highest_static_file_tx(static_file_segment)
.get_highest_static_file_tx(segment)
.map(|tx_num| tx_num + 1)
.unwrap_or_default();
writer.prune_receipts(to_delete, 0)?;
}
StaticFileSegment::TransactionSenders => {
let to_delete = static_file_provider
.get_highest_static_file_tx(static_file_segment)
.get_highest_static_file_tx(segment)
.map(|tx_num| tx_num + 1)
.unwrap_or_default();
writer.prune_transaction_senders(to_delete, 0)?;
@@ -128,8 +133,15 @@ impl<C: ChainSpecParser> Command<C> {
reset_stage_checkpoint(tx, StageId::SenderRecovery)?;
}
StageEnum::Execution => {
tx.clear::<tables::PlainAccountState>()?;
tx.clear::<tables::PlainStorageState>()?;
if provider_rw.cached_storage_settings().use_hashed_state() {
tx.clear::<tables::HashedAccounts>()?;
tx.clear::<tables::HashedStorages>()?;
reset_stage_checkpoint(tx, StageId::AccountHashing)?;
reset_stage_checkpoint(tx, StageId::StorageHashing)?;
} else {
tx.clear::<tables::PlainAccountState>()?;
tx.clear::<tables::PlainStorageState>()?;
}
tx.clear::<tables::AccountChangeSets>()?;
tx.clear::<tables::StorageChangeSets>()?;
tx.clear::<tables::Bytecodes>()?;
@@ -171,29 +183,42 @@ impl<C: ChainSpecParser> Command<C> {
None,
)?;
}
StageEnum::AccountHistory | StageEnum::StorageHistory => {
StageEnum::AccountHistory => {
let settings = provider_rw.cached_storage_settings();
let rocksdb = tool.provider_factory.rocksdb_provider();
if settings.account_history_in_rocksdb {
if settings.storage_v2 {
rocksdb.clear::<tables::AccountsHistory>()?;
} else {
tx.clear::<tables::AccountsHistory>()?;
}
if settings.storages_history_in_rocksdb {
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
insert_genesis_account_history(
&provider_rw,
self.env.chain.genesis().alloc.iter(),
)?;
}
StageEnum::StorageHistory => {
let settings = provider_rw.cached_storage_settings();
let rocksdb = tool.provider_factory.rocksdb_provider();
if settings.storage_v2 {
rocksdb.clear::<tables::StoragesHistory>()?;
} else {
tx.clear::<tables::StoragesHistory>()?;
}
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
reset_stage_checkpoint(tx, StageId::IndexStorageHistory)?;
insert_genesis_history(&provider_rw, self.env.chain.genesis().alloc.iter())?;
insert_genesis_storage_history(
&provider_rw,
self.env.chain.genesis().alloc.iter(),
)?;
}
StageEnum::TxLookup => {
if provider_rw.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
if provider_rw.cached_storage_settings().storage_v2 {
tool.provider_factory
.rocksdb_provider()
.clear::<tables::TransactionHashNumbers>()?;

View File

@@ -37,12 +37,14 @@ where
unwind_and_copy(db_tool, from, tip_block_number, &output_db, evm_config.clone())?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
runtime,
)?,
to,
from,

View File

@@ -33,12 +33,14 @@ pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Databas
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
runtime,
)?,
to,
from,

View File

@@ -23,12 +23,14 @@ pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = Databas
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
runtime,
)?,
to,
from,

View File

@@ -57,12 +57,14 @@ where
unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db, evm_config, consensus)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
runtime,
)?,
to,
from,

View File

@@ -248,9 +248,10 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
(Box::new(stage), None)
}
StageEnum::Senders => (
Box::new(SenderRecoveryStage::new(SenderRecoveryConfig {
commit_threshold: batch_size,
})),
Box::new(SenderRecoveryStage::new(
SenderRecoveryConfig { commit_threshold: batch_size },
None,
)),
None,
),
StageEnum::Execution => (

View File

@@ -10,9 +10,10 @@
//! Entrypoint for running commands.
use reth_tasks::{TaskExecutor, TaskManager};
use reth_tasks::{PanickedTaskError, TaskExecutor};
use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
use tracing::{debug, error, trace};
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
/// Executes CLI commands.
///
@@ -20,21 +21,24 @@ use tracing::{debug, error, trace};
#[derive(Debug)]
pub struct CliRunner {
config: CliRunnerConfig,
tokio_runtime: tokio::runtime::Runtime,
runtime: reth_tasks::Runtime,
}
impl CliRunner {
/// Attempts to create a new [`CliRunner`] using the default tokio
/// [`Runtime`](tokio::runtime::Runtime).
/// Attempts to create a new [`CliRunner`] using the default
/// [`Runtime`](reth_tasks::Runtime).
///
/// The default tokio runtime is multi-threaded, with both I/O and time drivers enabled.
pub fn try_default_runtime() -> Result<Self, std::io::Error> {
Ok(Self { config: CliRunnerConfig::default(), tokio_runtime: tokio_runtime()? })
/// The default runtime is multi-threaded, with both I/O and time drivers enabled.
pub fn try_default_runtime() -> Result<Self, reth_tasks::RuntimeBuildError> {
Self::try_with_runtime_config(reth_tasks::RuntimeConfig::default())
}
/// Create a new [`CliRunner`] from a provided tokio [`Runtime`](tokio::runtime::Runtime).
pub const fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self {
Self { config: CliRunnerConfig::new(), tokio_runtime }
/// Creates a new [`CliRunner`] with the given [`RuntimeConfig`](reth_tasks::RuntimeConfig).
pub fn try_with_runtime_config(
config: reth_tasks::RuntimeConfig,
) -> Result<Self, reth_tasks::RuntimeBuildError> {
let runtime = reth_tasks::RuntimeBuilder::new(config).build()?;
Ok(Self { config: CliRunnerConfig::default(), runtime })
}
/// Sets the [`CliRunnerConfig`] for this runner.
@@ -48,7 +52,7 @@ impl CliRunner {
where
F: Future<Output = T>,
{
self.tokio_runtime.block_on(fut)
self.runtime.handle().block_on(fut)
}
/// Executes the given _async_ command on the tokio runtime until the command future resolves or
@@ -64,12 +68,11 @@ impl CliRunner {
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
{
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
AsyncCliRunner::new(self.tokio_runtime);
let (context, task_manager_handle) = cli_context(&self.runtime);
// Executes the command until it finished or ctrl-c was fired
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
&mut task_manager,
let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
task_manager_handle,
run_until_ctrl_c(command(context)),
));
@@ -77,13 +80,13 @@ impl CliRunner {
error!(target: "reth::cli", "shutting down due to error");
} else {
debug!(target: "reth::cli", "shutting down gracefully");
// after the command has finished or exit signal was received we shutdown the task
// manager which fires the shutdown signal to all tasks spawned via the task
// after the command has finished or exit signal was received we shutdown the
// runtime which fires the shutdown signal to all tasks spawned via the task
// executor and awaiting on tasks spawned with graceful shutdown
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
}
tokio_shutdown(tokio_runtime, true);
runtime_shutdown(self.runtime, true);
command_res
}
@@ -99,17 +102,16 @@ impl CliRunner {
F: Future<Output = Result<(), E>> + Send + 'static,
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
{
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
AsyncCliRunner::new(self.tokio_runtime);
let (context, task_manager_handle) = cli_context(&self.runtime);
// Spawn the command on the blocking thread pool
let handle = tokio_runtime.handle().clone();
let command_handle =
tokio_runtime.handle().spawn_blocking(move || handle.block_on(command(context)));
let handle = self.runtime.handle().clone();
let handle2 = handle.clone();
let command_handle = handle.spawn_blocking(move || handle2.block_on(command(context)));
// Wait for the command to complete or ctrl-c
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
&mut task_manager,
let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
task_manager_handle,
run_until_ctrl_c(
async move { command_handle.await.expect("Failed to join blocking task") },
),
@@ -119,10 +121,10 @@ impl CliRunner {
error!(target: "reth::cli", "shutting down due to error");
} else {
debug!(target: "reth::cli", "shutting down gracefully");
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
}
tokio_shutdown(tokio_runtime, true);
runtime_shutdown(self.runtime, true);
command_res
}
@@ -133,48 +135,40 @@ impl CliRunner {
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<std::io::Error> + 'static,
{
self.tokio_runtime.block_on(run_until_ctrl_c(fut))?;
self.runtime.handle().block_on(run_until_ctrl_c(fut))?;
Ok(())
}
/// Executes a regular future as a spawned blocking task until completion or until external
/// signal received.
///
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking) .
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking).
pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
where
F: Future<Output = Result<(), E>> + Send + 'static,
E: Send + Sync + From<std::io::Error> + 'static,
{
let tokio_runtime = self.tokio_runtime;
let handle = tokio_runtime.handle().clone();
let fut = tokio_runtime.handle().spawn_blocking(move || handle.block_on(fut));
tokio_runtime
let handle = self.runtime.handle().clone();
let handle2 = handle.clone();
let fut = handle.spawn_blocking(move || handle2.block_on(fut));
self.runtime
.handle()
.block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
tokio_shutdown(tokio_runtime, false);
runtime_shutdown(self.runtime, false);
Ok(())
}
}
/// [`CliRunner`] configuration when executing commands asynchronously
struct AsyncCliRunner {
context: CliContext,
task_manager: TaskManager,
tokio_runtime: tokio::runtime::Runtime,
}
// === impl AsyncCliRunner ===
impl AsyncCliRunner {
/// Given a tokio [`Runtime`](tokio::runtime::Runtime), creates additional context required to
/// execute commands asynchronously.
fn new(tokio_runtime: tokio::runtime::Runtime) -> Self {
let task_manager = TaskManager::new(tokio_runtime.handle().clone());
let task_executor = task_manager.executor();
Self { context: CliContext { task_executor }, task_manager, tokio_runtime }
}
/// Extracts the task manager handle from the runtime and creates the [`CliContext`].
fn cli_context(
runtime: &reth_tasks::Runtime,
) -> (CliContext, JoinHandle<Result<(), PanickedTaskError>>) {
let handle =
runtime.take_task_manager_handle().expect("Runtime must contain a TaskManager handle");
let context = CliContext { task_executor: runtime.clone() };
(context, handle)
}
/// Additional context provided by the [`CliRunner`] when executing commands
@@ -216,37 +210,25 @@ impl CliRunnerConfig {
}
}
/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
/// enabled
pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
// Keep the threads alive for at least the block time (12 seconds) plus buffer.
// This prevents the costly process of spawning new threads on every
// new block, and instead reuses the existing threads.
.thread_keep_alive(Duration::from_secs(15))
.thread_name("tokio-rt")
.build()
}
/// Runs the given future to completion or until a critical task panicked.
///
/// Returns the error if a task panicked, or the given future returned an error.
async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
async fn run_to_completion_or_panic<F, E>(
task_manager_handle: JoinHandle<Result<(), PanickedTaskError>>,
fut: F,
) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
{
{
let fut = pin!(fut);
tokio::select! {
task_manager_result = tasks => {
if let Err(panicked_error) = task_manager_result {
return Err(panicked_error.into());
}
},
res = fut => res?,
}
let fut = pin!(fut);
tokio::select! {
task_manager_result = task_manager_handle => {
if let Ok(Err(panicked_error)) = task_manager_result {
return Err(panicked_error.into());
}
},
res = fut => res?,
}
Ok(())
}
@@ -271,10 +253,10 @@ where
tokio::select! {
_ = ctrl_c => {
trace!(target: "reth::cli", "Received ctrl-c");
info!(target: "reth::cli", "Received ctrl-c");
},
_ = sigterm => {
trace!(target: "reth::cli", "Received SIGTERM");
info!(target: "reth::cli", "Received SIGTERM");
},
res = fut => res?,
}
@@ -287,7 +269,7 @@ where
tokio::select! {
_ = ctrl_c => {
trace!(target: "reth::cli", "Received ctrl-c");
info!(target: "reth::cli", "Received ctrl-c");
},
res = fut => res?,
}
@@ -296,17 +278,17 @@ where
Ok(())
}
/// Shut down the given Tokio runtime, and wait for it if `wait` is set.
/// Default timeout for waiting on the tokio runtime to shut down.
const DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Shut down the given [`Runtime`](reth_tasks::Runtime), and wait for it if `wait` is set.
///
/// `drop(tokio_runtime)` would block the current thread until its pools
/// (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
/// it on a separate thread and wait for up to 5 seconds for this operation to
/// complete.
fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
// Shutdown the runtime on a separate thread
/// Dropping the runtime on the current thread could block due to tokio pool teardown.
/// Instead, we drop it on a separate thread and optionally wait for completion.
fn runtime_shutdown(rt: reth_tasks::Runtime, wait: bool) {
let (tx, rx) = mpsc::channel();
std::thread::Builder::new()
.name("tokio-shutdown".to_string())
.name("rt-shutdown".to_string())
.spawn(move || {
drop(rt);
let _ = tx.send(());
@@ -314,8 +296,8 @@ fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
.unwrap();
if wait {
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
let _ = rx.recv_timeout(DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT).inspect_err(|err| {
tracing::warn!(target: "reth::cli", %err, "runtime shutdown timed out");
});
}
}

View File

@@ -11,7 +11,6 @@ use reth_node_builder::{
PayloadTypes,
};
use reth_provider::providers::{BlockchainProvider, NodeTypesForProvider};
use reth_tasks::TaskManager;
use std::sync::Arc;
use wallet::Wallet;
@@ -50,7 +49,7 @@ pub async fn setup<N>(
chain_spec: Arc<N::ChainSpec>,
is_dev: bool,
attributes_generator: impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Send + Sync + Copy + 'static,
) -> eyre::Result<(Vec<NodeHelperType<N>>, TaskManager, Wallet)>
) -> eyre::Result<(Vec<NodeHelperType<N>>, Wallet)>
where
N: NodeBuilderHelper,
{
@@ -69,7 +68,6 @@ pub async fn setup_engine<N>(
attributes_generator: impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Send + Sync + Copy + 'static,
) -> eyre::Result<(
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
TaskManager,
Wallet,
)>
where
@@ -96,7 +94,6 @@ pub async fn setup_engine_with_connection<N>(
connect_nodes: bool,
) -> eyre::Result<(
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
TaskManager,
Wallet,
)>
where

View File

@@ -14,7 +14,7 @@ use reth_node_core::args::{DiscoveryArgs, NetworkArgs, RpcServerArgs};
use reth_primitives_traits::AlloyBlockHeader;
use reth_provider::providers::BlockchainProvider;
use reth_rpc_server_types::RpcModuleSelection;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use std::sync::Arc;
use tracing::{span, Instrument, Level};
@@ -110,11 +110,9 @@ where
self,
) -> eyre::Result<(
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
TaskManager,
Wallet,
)> {
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
let network_config = NetworkArgs {
discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
@@ -153,7 +151,7 @@ where
let span = span!(Level::INFO, "node", idx);
let node = N::default();
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(exec.clone())
.testing_node(runtime.clone())
.with_types_and_provider::<N, BlockchainProvider<_>>()
.with_components(node.components_builder())
.with_add_ons(node.add_ons())
@@ -197,7 +195,7 @@ where
}
}
Ok((nodes, tasks, Wallet::default().with_chain_id(self.chain_spec.chain().into())))
Ok((nodes, Wallet::default().with_chain_id(self.chain_spec.chain().into())))
}
}

View File

@@ -15,7 +15,7 @@ use reth_provider::{
};
use reth_rpc_server_types::RpcModuleSelection;
use reth_stages_types::StageId;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use std::{path::Path, sync::Arc};
use tempfile::TempDir;
use tracing::{debug, info, span, Level};
@@ -24,8 +24,6 @@ use tracing::{debug, info, span, Level};
pub struct ChainImportResult {
/// The nodes that were created
pub nodes: Vec<NodeHelperType<EthereumNode>>,
/// The task manager
pub task_manager: TaskManager,
/// The wallet for testing
pub wallet: Wallet,
/// Temporary directories that must be kept alive for the duration of the test
@@ -68,8 +66,7 @@ pub async fn setup_engine_with_chain_import(
+ Copy
+ 'static,
) -> eyre::Result<ChainImportResult> {
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
let network_config = NetworkArgs {
discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
@@ -129,6 +126,7 @@ pub async fn setup_engine_with_chain_import(
.with_default_tables()
.build()
.unwrap(),
reth_tasks::Runtime::test(),
)?;
// Initialize genesis if needed
@@ -221,7 +219,7 @@ pub async fn setup_engine_with_chain_import(
let node = EthereumNode::default();
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node_with_datadir(exec.clone(), datadir.clone())
.testing_node_with_datadir(runtime.clone(), datadir.clone())
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
.with_components(node.components_builder())
.with_add_ons(node.add_ons())
@@ -243,7 +241,6 @@ pub async fn setup_engine_with_chain_import(
Ok(ChainImportResult {
nodes,
task_manager: tasks,
wallet: crate::Wallet::default().with_chain_id(chain_spec.chain.id()),
_temp_dirs: temp_dirs,
})
@@ -333,6 +330,7 @@ mod tests {
.with_default_tables()
.build()
.unwrap(),
reth_tasks::Runtime::test(),
)
.expect("failed to create provider factory");
@@ -397,6 +395,7 @@ mod tests {
.with_default_tables()
.build()
.unwrap(),
reth_tasks::Runtime::test(),
)
.expect("failed to create provider factory");
@@ -497,6 +496,7 @@ mod tests {
.with_default_tables()
.build()
.unwrap(),
reth_tasks::Runtime::test(),
)
.expect("failed to create provider factory");

View File

@@ -1,6 +1,6 @@
//! Test setup utilities for configuring the initial state.
use crate::{setup_engine_with_connection, testsuite::Environment, NodeBuilderHelper};
use crate::{testsuite::Environment, E2ETestSetupBuilder, NodeBuilderHelper};
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::B256;
use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
@@ -38,6 +38,8 @@ pub struct Setup<I> {
shutdown_tx: Option<mpsc::Sender<()>>,
/// Is this setup in dev mode
pub is_dev: bool,
/// Whether to use v2 storage mode (hashed keys, static file changesets, rocksdb history)
pub storage_v2: bool,
/// Tracks instance generic.
_phantom: PhantomData<I>,
/// Holds the import result to keep nodes alive when using imported chain
@@ -58,6 +60,7 @@ impl<I> Default for Setup<I> {
tree_config: TreeConfig::default(),
shutdown_tx: None,
is_dev: true,
storage_v2: false,
_phantom: Default::default(),
import_result_holder: None,
import_rlp_path: None,
@@ -126,6 +129,12 @@ where
self
}
/// Enable v2 storage mode (hashed keys, static file changesets, rocksdb history)
pub const fn with_storage_v2(mut self) -> Self {
self.storage_v2 = true;
self
}
/// Apply setup using pre-imported chain data from RLP file
pub async fn apply_with_import<N>(
&mut self,
@@ -194,23 +203,32 @@ where
self.shutdown_tx = Some(shutdown_tx);
let is_dev = self.is_dev;
let storage_v2 = self.storage_v2;
let node_count = self.network.node_count;
let tree_config = self.tree_config.clone();
let attributes_generator = Self::create_static_attributes_generator::<N>();
let result = setup_engine_with_connection::<N>(
let mut builder = E2ETestSetupBuilder::<N, _>::new(
node_count,
Arc::<N::ChainSpec>::new((*chain_spec).clone().into()),
is_dev,
self.tree_config.clone(),
attributes_generator,
self.network.connect_nodes,
)
.await;
.with_tree_config_modifier(move |base| {
tree_config.clone().with_cross_block_cache_size(base.cross_block_cache_size())
})
.with_node_config_modifier(move |config| config.set_dev(is_dev))
.with_connect_nodes(self.network.connect_nodes);
if storage_v2 {
builder = builder.with_storage_v2();
}
let result = builder.build().await;
let mut node_clients = Vec::new();
match result {
Ok((nodes, executor, _wallet)) => {
Ok((nodes, _wallet)) => {
// create HTTP clients for each node's RPC and Engine API endpoints
for node in &nodes {
node_clients.push(node.to_node_client()?);
@@ -218,12 +236,11 @@ where
// spawn a separate task just to handle the shutdown
tokio::spawn(async move {
// keep nodes and executor in scope to ensure they're not dropped
// keep nodes in scope to ensure they're not dropped
let _nodes = nodes;
let _executor = executor;
// Wait for shutdown signal
let _ = shutdown_rx.recv().await;
// nodes and executor will be dropped here when the test completes
// nodes will be dropped here when the test completes
});
}
Err(e) => {

View File

@@ -370,15 +370,14 @@ async fn test_setup_builder_with_custom_tree_config() -> Result<()> {
.build(),
);
let (nodes, _tasks, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, |_| {
EthPayloadBuilderAttributes::default()
})
.with_tree_config_modifier(|config| {
config.with_persistence_threshold(0).with_memory_block_buffer_target(5)
})
.build()
.await?;
let (nodes, _wallet) = E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, |_| {
EthPayloadBuilderAttributes::default()
})
.with_tree_config_modifier(|config| {
config.with_persistence_threshold(0).with_memory_block_buffer_target(5)
})
.build()
.await?;
assert_eq!(nodes.len(), 1);

View File

@@ -10,7 +10,6 @@ use jsonrpsee::core::client::ClientT;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_db::tables;
use reth_e2e_test_utils::{transaction::TransactionTestContext, wallet, E2ETestSetupBuilder};
use reth_node_core::args::RocksDbArgs;
use reth_node_ethereum::EthereumNode;
use reth_payload_builder::EthPayloadBuilderAttributes;
use reth_provider::RocksDBProviderFactory;
@@ -96,22 +95,6 @@ fn test_attributes_generator(timestamp: u64) -> EthPayloadBuilderAttributes {
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
}
/// Verifies that `RocksDB` CLI defaults are `None` (deferred to storage mode).
#[test]
fn test_rocksdb_defaults_are_none() {
let args = RocksDbArgs::default();
assert!(args.tx_hash.is_none(), "tx_hash default should be None (deferred to --storage.v2)");
assert!(
args.storages_history.is_none(),
"storages_history default should be None (deferred to --storage.v2)"
);
assert!(
args.account_history.is_none(),
"account_history default should be None (deferred to --storage.v2)"
);
}
/// Smoke test: node boots with `RocksDB` routing enabled.
#[tokio::test]
async fn test_rocksdb_node_startup() -> Result<()> {
@@ -119,7 +102,7 @@ async fn test_rocksdb_node_startup() -> Result<()> {
let chain_spec = test_chain_spec();
let (nodes, _tasks, _wallet) =
let (nodes, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.with_storage_v2()
.build()
@@ -147,7 +130,7 @@ async fn test_rocksdb_block_mining() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _wallet) =
let (mut nodes, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.with_storage_v2()
.build()
@@ -201,7 +184,7 @@ async fn test_rocksdb_transaction_queries() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
@@ -268,7 +251,7 @@ async fn test_rocksdb_multi_tx_same_block() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
@@ -336,7 +319,7 @@ async fn test_rocksdb_txs_across_blocks() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
@@ -421,7 +404,7 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
@@ -477,7 +460,7 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
///
/// This test exercises `unwind_trie_state_from` which previously failed with
/// `UnsortedInput` errors because it read changesets directly from MDBX tables
/// instead of using storage-aware methods that check `storage_changesets_in_static_files`.
/// instead of using storage-aware methods that check `is_v2()`.
#[tokio::test]
async fn test_rocksdb_reorg_unwind() -> Result<()> {
reth_tracing::init_test_tracing();
@@ -485,7 +468,7 @@ async fn test_rocksdb_reorg_unwind() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,

View File

@@ -36,8 +36,6 @@ futures = { workspace = true, optional = true }
auto_impl.workspace = true
serde.workspace = true
thiserror.workspace = true
rand = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
[features]
default = ["std"]
@@ -56,4 +54,3 @@ std = [
"thiserror/std",
"reth-evm/std",
]
debug-jitter = ["dep:rand", "dep:tracing"]

View File

@@ -1,6 +1,7 @@
//! Engine tree configuration.
use alloy_eips::merge::EPOCH_SLOTS;
use core::time::Duration;
/// Triggers persistence when the number of canonical blocks in memory exceeds this threshold.
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
@@ -64,6 +65,9 @@ pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
/// Storage tries beyond this limit are cleared (but allocations preserved).
pub const DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES: usize = 100;
/// Default timeout for the state root task before spawning a sequential fallback.
pub const DEFAULT_STATE_ROOT_TASK_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
@@ -169,12 +173,19 @@ pub struct TreeConfig {
disable_proof_v2: bool,
/// Whether to disable cache metrics recording (can be expensive with large cached state).
disable_cache_metrics: bool,
/// Whether to enable sparse trie as cache.
enable_sparse_trie_as_cache: bool,
/// Whether to disable sparse trie cache.
disable_trie_cache: bool,
/// Depth for sparse trie pruning after state root computation.
sparse_trie_prune_depth: usize,
/// Maximum number of storage tries to retain after pruning.
sparse_trie_max_storage_tries: usize,
/// Whether to fully disable sparse trie cache pruning between blocks.
disable_sparse_trie_cache_pruning: bool,
/// Timeout for the state root task before spawning a sequential fallback computation.
/// If `Some`, after waiting this duration for the state root task, a sequential state root
/// computation is spawned in parallel and whichever finishes first is used.
/// If `None`, the timeout fallback is disabled.
state_root_task_timeout: Option<Duration>,
}
impl Default for TreeConfig {
@@ -204,9 +215,11 @@ impl Default for TreeConfig {
account_worker_count: default_account_worker_count(),
disable_proof_v2: false,
disable_cache_metrics: false,
enable_sparse_trie_as_cache: false,
disable_trie_cache: false,
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
disable_sparse_trie_cache_pruning: false,
state_root_task_timeout: Some(DEFAULT_STATE_ROOT_TASK_TIMEOUT),
}
}
}
@@ -241,6 +254,7 @@ impl TreeConfig {
disable_cache_metrics: bool,
sparse_trie_prune_depth: usize,
sparse_trie_max_storage_tries: usize,
state_root_task_timeout: Option<Duration>,
) -> Self {
Self {
persistence_threshold,
@@ -267,9 +281,11 @@ impl TreeConfig {
account_worker_count,
disable_proof_v2,
disable_cache_metrics,
enable_sparse_trie_as_cache: false,
disable_trie_cache: false,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
disable_sparse_trie_cache_pruning: false,
state_root_task_timeout,
}
}
@@ -586,14 +602,14 @@ impl TreeConfig {
self
}
/// Returns whether sparse trie as cache is enabled.
pub const fn enable_sparse_trie_as_cache(&self) -> bool {
self.enable_sparse_trie_as_cache
/// Returns whether sparse trie cache is disabled.
pub const fn disable_trie_cache(&self) -> bool {
self.disable_trie_cache
}
/// Setter for whether to enable sparse trie as cache.
pub const fn with_enable_sparse_trie_as_cache(mut self, value: bool) -> Self {
self.enable_sparse_trie_as_cache = value;
/// Setter for whether to disable sparse trie cache.
pub const fn with_disable_trie_cache(mut self, value: bool) -> Self {
self.disable_trie_cache = value;
self
}
@@ -618,4 +634,26 @@ impl TreeConfig {
self.sparse_trie_max_storage_tries = max_tries;
self
}
/// Returns whether sparse trie cache pruning is disabled.
pub const fn disable_sparse_trie_cache_pruning(&self) -> bool {
self.disable_sparse_trie_cache_pruning
}
/// Setter for whether to disable sparse trie cache pruning.
pub const fn with_disable_sparse_trie_cache_pruning(mut self, value: bool) -> Self {
self.disable_sparse_trie_cache_pruning = value;
self
}
/// Returns the state root task timeout.
pub const fn state_root_task_timeout(&self) -> Option<Duration> {
self.state_root_task_timeout
}
/// Setter for state root task timeout.
pub const fn with_state_root_task_timeout(mut self, timeout: Option<Duration>) -> Self {
self.state_root_task_timeout = timeout;
self
}
}

View File

@@ -1,54 +0,0 @@
//! Debug jitter utilities for testing timing-related bugs.
//!
//! When the `debug-jitter` feature is enabled, various components can add
//! random delays to help trigger out-of-order timing bugs that may only
//! manifest in real-world conditions.
//!
//! Control via environment variable:
//! - `RETH_DEBUG_JITTER_MS`: Maximum jitter in milliseconds (0-N random delay)
//!
//! Example: `RETH_DEBUG_JITTER_MS=5` adds 0-5ms random delays.
use std::{sync::OnceLock, thread, time::Duration};
use rand::Rng;
use tracing::trace;
/// Cached jitter configuration from environment.
static JITTER_CONFIG: OnceLock<Option<u64>> = OnceLock::new();
/// Reads the jitter configuration from environment variables.
///
/// Returns `Some(max_ms)` if jitter is enabled, `None` otherwise.
fn get_jitter_config() -> Option<u64> {
*JITTER_CONFIG.get_or_init(|| {
std::env::var("RETH_DEBUG_JITTER_MS")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.filter(|&ms| ms > 0)
})
}
/// Applies a random jitter delay if configured.
///
/// When `RETH_DEBUG_JITTER_MS` is set to a positive value N,
/// this function sleeps for a random duration between 0 and N milliseconds.
///
/// This is useful for testing timing-sensitive code paths that may have
/// race conditions or ordering bugs that only manifest with variable latencies.
///
/// The `context` parameter is used for logging to identify where jitter was applied.
pub fn maybe_apply_jitter(context: &str) {
if let Some(max_ms) = get_jitter_config() {
let delay_ms = rand::rng().random_range(0..=max_ms);
if delay_ms > 0 {
trace!(
target: "reth::jitter",
context,
delay_ms,
"Applying debug jitter delay"
);
thread::sleep(Duration::from_millis(delay_ms));
}
}
}

View File

@@ -23,7 +23,7 @@ use serde::{de::DeserializeOwned, Serialize};
// Re-export [`ExecutionPayload`] moved to `reth_payload_primitives`
#[cfg(feature = "std")]
pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
pub use reth_evm::{ConfigureEngineEvm, ConvertTx, ExecutableTxIterator, ExecutableTxTuple};
pub use reth_payload_primitives::ExecutionPayload;
mod error;
@@ -46,10 +46,6 @@ pub use invalid_block_hook::{InvalidBlockHook, InvalidBlockHooks, NoopInvalidBlo
pub mod config;
pub use config::*;
/// Debug jitter utilities for testing timing-related bugs.
#[cfg(feature = "debug-jitter")]
pub mod jitter;
/// This type defines the versioned types of the engine API based on the [ethereum engine API](https://github.com/ethereum/execution-apis/tree/main/src/engine).
///
/// This includes the execution payload types and payload attributes that are used to trigger a

View File

@@ -20,7 +20,7 @@ use reth_node_types::{BlockTy, NodeTypes};
use reth_payload_builder::PayloadBuilderHandle;
use reth_provider::{
providers::{BlockchainProvider, ProviderNodeTypes},
ProviderFactory,
ProviderFactory, StorageSettingsCache,
};
use reth_prune::PrunerWithFactory;
use reth_stages_api::{MetricEventsSender, Pipeline};
@@ -94,6 +94,7 @@ where
if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
let downloader = BasicBlockDownloader::new(client, consensus.clone());
let use_hashed_state = provider.cached_storage_settings().use_hashed_state();
let persistence_handle =
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
@@ -111,6 +112,7 @@ where
engine_kind,
evm_config,
changeset_cache,
use_hashed_state,
);
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
@@ -201,6 +203,7 @@ mod tests {
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache.clone(),
reth_tasks::Runtime::test(),
);
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();

View File

@@ -27,12 +27,11 @@ reth-primitives-traits = { workspace = true, features = ["rayon", "dashmap"] }
reth-ethereum-primitives.workspace = true
reth-provider.workspace = true
reth-prune.workspace = true
reth-revm.workspace = true
reth-revm = { workspace = true, features = ["optional-balance-check"] }
reth-stages-api.workspace = true
reth-tasks.workspace = true
reth-trie-parallel.workspace = true
reth-trie-sparse = { workspace = true, features = ["std", "metrics"] }
reth-trie-sparse-parallel = { workspace = true, features = ["std"] }
reth-trie.workspace = true
reth-trie-common.workspace = true
reth-trie-db.workspace = true
@@ -116,7 +115,6 @@ name = "state_root_task"
harness = false
[features]
debug-jitter = ["reth-engine-primitives/debug-jitter", "reth-trie-parallel/debug-jitter"]
test-utils = [
"reth-chain-state/test-utils",
"reth-chainspec/test-utils",
@@ -143,7 +141,15 @@ test-utils = [
"reth-ethereum-primitives/test-utils",
"reth-node-ethereum/test-utils",
"reth-evm-ethereum/test-utils",
"reth-tasks/test-utils",
]
rocksdb = [
"reth-provider/rocksdb",
"reth-prune/rocksdb",
"reth-stages?/rocksdb",
"reth-e2e-test-utils/rocksdb",
]
edge = ["rocksdb"]
[[test]]
name = "e2e_testsuite"

View File

@@ -12,8 +12,7 @@ use rand::Rng;
use reth_chainspec::ChainSpec;
use reth_db_common::init::init_genesis;
use reth_engine_tree::tree::{
executor::WorkloadExecutor, precompile_cache::PrecompileCacheMap, PayloadProcessor,
StateProviderBuilder, TreeConfig,
precompile_cache::PrecompileCacheMap, PayloadProcessor, StateProviderBuilder, TreeConfig,
};
use reth_ethereum_primitives::TransactionSigned;
use reth_evm::OnStateHook;
@@ -219,7 +218,7 @@ fn bench_state_root(c: &mut Criterion) {
setup_provider(&factory, &state_updates).expect("failed to setup provider");
let payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(factory.chain_spec()),
&TreeConfig::default(),
PrecompileCacheMap::default(),

View File

@@ -138,7 +138,7 @@ impl<N: ProviderNodeTypes> PipelineSync<N> {
let (tx, rx) = oneshot::channel();
let pipeline = pipeline.take().expect("exists");
self.pipeline_task_spawner.spawn_critical_blocking(
self.pipeline_task_spawner.spawn_critical_blocking_task(
"pipeline task",
Box::pin(async move {
let result = pipeline.run_as_fut(Some(target)).await;

View File

@@ -9,7 +9,7 @@ use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_prune::{PrunerError, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use reth_tasks::spawn_os_thread;
use std::{
@@ -74,18 +74,6 @@ where
pending_safe_block: None,
}
}
/// Prunes block data before the given block number according to the configured prune
/// configuration.
#[instrument(level = "debug", target = "engine::persistence", skip_all, fields(block_num))]
fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
debug!(target: "engine::persistence", ?block_num, "Running pruner");
let start_time = Instant::now();
// TODO: doing this properly depends on pruner segment changes
let result = self.pruner.run(block_num);
self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
result
}
}
impl<N> PersistenceService<N>
@@ -118,11 +106,6 @@ where
let _ = self
.sync_metrics_tx
.send(MetricEvent::SyncHeight { height: block_number });
if self.pruner.is_pruning_needed(block_number) {
// We log `PrunerOutput` inside the `Pruner`
let _ = self.prune_before(block_number)?;
}
}
}
PersistenceAction::SaveFinalizedBlock(finalized_block) => {
@@ -163,7 +146,6 @@ where
let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
let block_count = blocks.len();
// Take any pending finalized/safe block updates to commit together
let pending_finalized = self.pending_finalized_block.take();
let pending_safe = self.pending_safe_block.take();
@@ -171,11 +153,10 @@ where
let start_time = Instant::now();
if last_block.is_some() {
if let Some(last) = last_block {
let provider_rw = self.provider.database_provider_rw()?;
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
// Commit pending finalized/safe block updates in the same transaction
if let Some(finalized) = pending_finalized {
provider_rw.save_finalized_block_number(finalized)?;
}
@@ -183,6 +164,13 @@ where
provider_rw.save_safe_block_number(safe)?;
}
if self.pruner.is_pruning_needed(last.number) {
debug!(target: "engine::persistence", block_num=?last.number, "Running pruner");
let prune_start = Instant::now();
let _ = self.pruner.run_with_provider(&provider_rw, last.number)?;
self.metrics.prune_before_duration_seconds.record(prune_start.elapsed());
}
provider_rw.commit()?;
}

Some files were not shown because too many files have changed in this diff Show More