Compare commits

...

77 Commits

Author SHA1 Message Date
rakita
0f09a92acc fix: update gas_used/gas_refunded to ResultGas API 2026-02-12 16:47:57 +01:00
rakita
df8ba50cb7 chore: update revm/inspectors/alloy-evm patches and fix Bytecode flatten 2026-02-12 16:37:56 +01:00
rakita
61341a1342 Merge remote-tracking branch 'origin/main' into staging
# Conflicts:
#	Cargo.lock
#	crates/optimism/evm/src/lib.rs
2026-02-12 16:32:00 +01:00
Georgios Konstantopoulos
4a62d38af2 perf(engine): use sequential sig recovery for blocks with small blocks (#22077)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
Co-authored-by: YK <chiayongkang@hotmail.com>
2026-02-12 15:06:21 +00:00
Georgios Konstantopoulos
dc4f249f09 chore: zero-pad thread indices in thread names (#22113)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 12:45:49 +00:00
Brian Picciano
c915841a45 chore(stateless): Remove reth-stateless crate (#22115) 2026-02-12 11:20:49 +00:00
Georgios Konstantopoulos
217a337d8c chore(engine): remove biased select in engine service loop (#21961)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-02-12 05:45:45 +00:00
Georgios Konstantopoulos
74d57008b6 chore(engine): downgrade failed response delivery logs to warn (#22055)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 05:44:09 +00:00
Georgios Konstantopoulos
f8767bc678 fix(engine): add await_state_root span to timeout path (#22111)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 05:14:39 +00:00
Georgios Konstantopoulos
81c83bba68 refactor(engine): remove unnecessary turbofish on CachedStateProvider, add new_prewarm (#22107)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 02:48:57 +00:00
Georgios Konstantopoulos
cd8ec58703 refactor(engine): move CachedStateProvider prewarm to const generic (#22106)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 01:30:24 +00:00
DaniPopes
931b17c3fd chore: bump alloy-core deps (#22104) 2026-02-12 01:15:56 +00:00
Emma Jamieson-Hoare
807d328cf0 fix: move alloy-primitives to regular dependency in bin/reth (#22105)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 01:15:12 +00:00
Georgios Konstantopoulos
8a6bbd29fe fix(tracing): return error instead of panicking on log directory creation failure (#22100)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 00:40:39 +00:00
Georgios Konstantopoulos
8bedaaee71 feat(docker): include debug symbols in maxperf images (#22003)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 00:34:41 +00:00
Emma Jamieson-Hoare
09cd105671 fix(primitives): move feature-referenced deps from dev-dependencies to optional dependencies (#22103)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 23:50:56 +00:00
Georgios Konstantopoulos
a0b60b7e64 feat(evm): impl ExecutableTxTuple for Either via EitherTxIterator (#22102)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 23:48:17 +00:00
DaniPopes
90e15d096d perf: reduce tracing span noise in prewarm and proof workers (#22101) 2026-02-11 23:32:50 +00:00
Emma Jamieson-Hoare
a161ca294f feat(net): add reason label to backed_off_peers metric (#22009)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 23:00:20 +00:00
Emma Jamieson-Hoare
3a5c41e3da test: add WebSocket subscription integration tests for eth_subscribe (#22065)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 22:56:47 +00:00
Georgios Konstantopoulos
968d3c9534 revert: skip transaction prewarming for small blocks (#22059) (#22097) 2026-02-11 14:38:08 -08:00
DaniPopes
fc6666f6a7 perf: treat hashes as bytes in BranchNodeCompact (#22089) 2026-02-11 22:11:49 +00:00
DaniPopes
ff3a854326 perf: use dedicated trie rayon pool for proof workers (#22051) 2026-02-11 22:10:17 +00:00
DaniPopes
04543ed16b chore: add span and log to runtime build (#22064) 2026-02-11 22:06:14 +00:00
Emma Jamieson-Hoare
ae3f0d4d1a test: expand CLI integration tests (#22086)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 21:43:28 +00:00
Georgios Konstantopoulos
5bccdc4a5d feat(engine): add state root task timeout with sequential fallback (#22004)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 20:45:45 +00:00
Georgios Konstantopoulos
0b7cd60668 perf(engine): skip transaction prewarming for small blocks (#22059)
Co-authored-by: yk <yongkang@tempo.xyz>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 20:37:04 +00:00
YK
aa983b49af perf(engine): add PrewarmMode::Skipped to avoid spawning idle workers (#22066)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
2026-02-11 19:48:48 +00:00
Georgios Konstantopoulos
2aff617767 feat(cli): split account-history and storage-history stage drops (#22083)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 19:21:55 +00:00
Georgios Konstantopoulos
2c5d00ffb5 feat(engine): add gas bucket label to newPayload metrics (#22067)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 19:00:07 +00:00
Georgios Konstantopoulos
e2a3527414 test: add CLI integration tests for reth binary (#22069)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 17:56:16 +00:00
Georgios Konstantopoulos
e4cb3d3aed chore(cli): log received signals at info level (#22071)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 17:55:37 +00:00
DaniPopes
079b7b9d57 fix: don't drop node (#22063) 2026-02-11 16:43:55 +00:00
Georgios Konstantopoulos
8a25d7d3cf chore: remove ress crates from workspace (#22057)
Co-authored-by: mattsse <matt@paradigm.xyz>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-11 13:39:56 +00:00
Minhyuk Kim
a5ced84098 feat(node/builder): add build_with_ordering_and_spawn_maintenance_task to TxPoolBuilder (#21979) 2026-02-11 12:58:29 +00:00
Emma Jamieson-Hoare
59760a2fe3 feat(net): add direction labels to closed_sessions and pending_session_failures metrics (#22014)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:59:06 +00:00
Matthias Seitz
b9d21f293e refactor: remove TypesAnd1-5 staging types from ProviderFactoryBuilder (#22049)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:57:05 +00:00
Georgios Konstantopoulos
dec1cad318 refactor(trie): merge SparseTrieExt into SparseTrie trait (#22035)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:39:56 +00:00
Georgios Konstantopoulos
165b94c3fa chore(docker): pass RUSTC_WRAPPER to cargo build in Dockerfile.depot (#22048)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:37:43 +00:00
Georgios Konstantopoulos
69e4c06ae7 chore(log): simplify default profiler tracing filter (#22050)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:33:20 +00:00
Georgios Konstantopoulos
1406a984a7 ci: pass --no-fail-fast to all cargo nextest runs (#22046)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:21:38 +00:00
Andrey Kolishchak
93d6b9782c fix(node): ethstats conn/last_ping deadlock (#21463) 2026-02-11 03:48:54 +00:00
DaniPopes
68e4ff1f7d feat: global runtime (#21934) 2026-02-11 03:45:09 +00:00
Georgios Konstantopoulos
33467ea6dd fix(reth-bench): increase WS keepalive interval to match persistence timeout (#22039) 2026-02-11 02:45:54 +00:00
Georgios Konstantopoulos
3bf9280b3c refactor(storage): add with_*_opt builder methods to StorageSettings (#21998)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 02:19:33 +00:00
Georgios Konstantopoulos
5c93986e6d feat(reth-bench): accept bare integers as milliseconds for --wait-time (#22038)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 01:57:42 +00:00
Georgios Konstantopoulos
779e0eb8bb perf: downgrade on_hashed_state_update and on_prewarm_targets spans to trace (#22044)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 22:45:05 +00:00
rakita
3d02124e50 Merge origin/main into staging 2026-02-03 13:24:01 +01:00
rakita
953a1b3399 chore: update revm, revm-inspectors, alloy-evm to staging
- revm: 6aa06829d2caa2aa38606ed22b83354a7a7ff98e
- revm-inspectors: cc4f62d8c107b3d2dc42a220690a079cdfc8dfb1
- alloy-evm: 80e6a436ecade2b9bd950fd70bd0d6a45ef0a43e
2026-02-03 13:22:25 +01:00
rakita
a364989c61 Merge origin/main into staging 2026-02-03 13:14:17 +01:00
rakita
f4075f5926 Merge remote staging 2026-01-28 19:12:11 +01:00
rakita
211d3d2924 fix: adapt to revm hasher type changes 2026-01-28 19:11:48 +01:00
rakita
dbc5313a2c Merge origin/main into staging - update patches 2026-01-28 19:09:24 +01:00
rakita
345fc9cfd2 Merge branch 'main' into staging 2026-01-26 22:35:00 +01:00
rakita
33d61c30cb chore: update revm, revm-inspectors, alloy-evm to staging
Updates dependencies:
- revm to 300efbf3e391e1796f5210cd4506508e385a55d2
- revm-inspectors to 9464524e7d983c8601ba7af3048cd60575dad6bd
- alloy-evm to 1c8ff5a179e760517016ab4e4d5af1c4b5923594

Fixes:
- Add slot_num field to BlockEnv for EIP-7843 SLOTNUM opcode
- Update BlockHashCache API: use lowest() instead of keys()
2026-01-26 21:32:08 +01:00
DaniPopes
beb0c5e527 chore: reduce number of nightly builds (#21446) 2026-01-26 21:30:39 +01:00
DaniPopes
73c39279b1 chore: remove unused docker from makefile (#21445) 2026-01-26 21:30:39 +01:00
ethfanWilliam
2fd0a703e2 fix(stages): retain RocksDB TempDir in TestStageDB to prevent premature deletion (#21444) 2026-01-26 21:30:39 +01:00
Dan Cline
7dcd77de95 fix(pruner): prune account and storage changeset static files (#21346) 2026-01-26 21:30:39 +01:00
Dan Cline
da6e6afe78 chore(metrics): add a gas_last metric similar to new_payload_last (#21437) 2026-01-26 21:30:39 +01:00
Brian Picciano
04d9a33c68 refactor(trie): always use ParallelSparseTrie, deprecate config flags (#21435) 2026-01-26 21:30:39 +01:00
Arsenii Kulikov
be1f657b3c perf: use shared channel for prewarm workers (#21429) 2026-01-26 21:30:39 +01:00
Rez
cab4cbf0ea feat: configurable EVM execution limits (#21088)
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
2026-01-26 21:30:39 +01:00
figtracer
29f51abb22 feat(rpc): add transaction hash caching to EthStateCache (#21180)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 21:30:39 +01:00
Matthias Seitz
dd0ee0709c fix(rpc): add block timestamp validation in eth_simulateV1 (#21397)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 21:30:39 +01:00
Matthias Seitz
dcd9e50663 fix(rpc): use correct error codes for eth_simulateV1 reverts and halts (#21412)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 21:30:39 +01:00
Matthias Seitz
59d11378b9 fix(rpc): add block number validation in eth_simulateV1 (#21396)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 21:30:39 +01:00
Matthias Seitz
70ceb755ad fix(rpc): populate block_hash in eth_simulateV1 logs (#21413)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 21:30:39 +01:00
Matthias Seitz
a49991c766 feat(cli): make stopping on invalid block the default for reth import (#21403)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 21:30:39 +01:00
Matthias Seitz
0a77c2aae1 feat(rpc): implement movePrecompileToAddress for eth_simulateV1 (#21414)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 21:30:39 +01:00
Matthias Seitz
93adbf82a0 fix(rpc): set prevrandao to zero for eth_simulateV1 simulated blocks (#21399)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 21:30:39 +01:00
Matthias Seitz
ce08b6f265 fix(rpc): cap simulate_v1 default gas limit to RPC gas cap (#21402)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 21:30:39 +01:00
Matthias Seitz
b03a704a1e fix(engine): only warn for critical capability mismatches (#21398)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 21:30:39 +01:00
Andrey Kolishchak
a51e03fce6 fix(net): FetchFullBlockRangeFuture can get stuck forever after partial body fetch + error (#21411)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 21:30:39 +01:00
rakita
57d7c98f66 chore: merge main and update alloy-evm staging patch 2026-01-26 12:39:58 +01:00
rakita
5d9a43f2d4 Merge remote-tracking branch 'origin/main' into staging 2026-01-26 12:36:44 +01:00
rakita
defd0e8e5c Bump revm to staging and fix breaking changes
- Patch revm and all sub-crates to staging commit 0dc217a9
- Patch revm-inspectors to staging commit fccc4ac5
- Patch alloy-evm to staging commit 625ccc0f
- Add slot_num field to BlockEnv initializers
- Update BlockHashCache usage (no longer has keys method)
2026-01-26 02:41:26 +01:00
207 changed files with 3711 additions and 5389 deletions

View File

@@ -0,0 +1,5 @@
---
reth-network: minor
---
Added reason label to backed_off_peers metric. The metric now tracks backed off peers by reason (too_many_peers, graceful_close, connection_error) to improve observability.

View File

@@ -0,0 +1,5 @@
---
ef-tests: patch
---
Removed reth-stateless crate and stateless validation from ef-tests.

View File

@@ -0,0 +1,4 @@
---
---
Added WebSocket subscription integration tests for eth_subscribe.

View File

@@ -0,0 +1,7 @@
---
reth: patch
reth-cli-commands: patch
reth-node-core: patch
---
Removed experimental ress protocol support for stateless Ethereum nodes.

View File

@@ -0,0 +1,5 @@
---
reth-node-builder: patch
---
Removed biased select in engine service loop to allow fair scheduling of shutdown requests alongside event processing.

View File

@@ -0,0 +1,5 @@
---
reth-primitives: patch
---
Moved feature-referenced dependencies from dev-dependencies to optional dependencies to ensure they are available when their corresponding features are enabled.

View File

@@ -0,0 +1,5 @@
---
reth-provider: patch
---
Removed unused staging types from ProviderFactoryBuilder.

View File

@@ -0,0 +1,4 @@
---
---
Expanded CLI integration tests with subcommand help coverage, config TOML validation, genesis JSON validation, and send transaction round-trip test for dev mode.

View File

@@ -0,0 +1,5 @@
---
reth-network: minor
---
Added direction labels to `closed_sessions` and `pending_session_failures` metrics. Operators can now distinguish session closures and failures by direction (`active`, `incoming_pending`, `outgoing_pending` for closed sessions; `inbound`, `outbound` for pending session failures).

View File

@@ -27,7 +27,6 @@ crates_to_check=(
reth-ethereum-forks
reth-ethereum-primitives
reth-ethereum-consensus
reth-stateless
)
any_failed=0

View File

@@ -63,6 +63,7 @@ exclude_crates=(
reth-provider # tokio
reth-prune # tokio
reth-prune-static-files # reth-provider
reth-tasks # tokio rt-multi-thread
reth-stages-api # reth-provider, reth-prune
reth-static-file # tokio
reth-transaction-pool # c-kzg

View File

@@ -35,6 +35,7 @@ jobs:
- name: Run e2e tests
run: |
cargo nextest run \
--no-fail-fast \
--locked --features "asm-keccak" \
--workspace \
--exclude 'example-*' \
@@ -61,6 +62,7 @@ jobs:
- name: Run RocksDB e2e tests
run: |
cargo nextest run \
--no-fail-fast \
--locked --features "edge" \
-p reth-e2e-test-utils \
-E 'binary(rocksdb)'

View File

@@ -46,6 +46,7 @@ jobs:
- name: Run tests
run: |
cargo nextest run \
--no-fail-fast \
--locked --features "asm-keccak ${{ matrix.network }} ${{ matrix.storage == 'edge' && 'edge' || '' }}" \
--workspace --exclude ef-tests \
-E "kind(test) and not binary(e2e_testsuite)"
@@ -76,4 +77,4 @@ jobs:
with:
cache-on-failure: true
- name: run era1 files integration tests
run: cargo nextest run --release --package reth-era --test it -- --ignored
run: cargo nextest run --no-fail-fast --release --package reth-era --test it -- --ignored

View File

@@ -49,6 +49,7 @@ jobs:
- name: Run tests
run: |
cargo nextest run \
--no-fail-fast \
--features "${{ matrix.features }} $EDGE_FEATURES" --locked \
${{ matrix.exclude_args }} --workspace \
--exclude ef-tests --no-tests=warn \
@@ -87,7 +88,7 @@ jobs:
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- run: cargo nextest run --cargo-profile hivetests -p ef-tests --features "asm-keccak ef-tests"
- run: cargo nextest run --no-fail-fast --cargo-profile hivetests -p ef-tests --features "asm-keccak ef-tests"
doc:
name: doc tests

672
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -83,8 +83,6 @@ members = [
"crates/prune/db",
"crates/prune/prune",
"crates/prune/types",
"crates/ress/protocol",
"crates/ress/provider",
"crates/revm/",
"crates/rpc/ipc/",
"crates/rpc/rpc-api/",
@@ -101,7 +99,6 @@ members = [
"crates/stages/api/",
"crates/stages/stages/",
"crates/stages/types/",
"crates/stateless",
"crates/static-file/static-file",
"crates/static-file/types/",
"crates/storage/codecs/",
@@ -309,6 +306,11 @@ inherits = "release"
lto = "fat"
codegen-units = 1
[profile.maxperf-symbols]
inherits = "maxperf"
debug = "full"
strip = "none"
[profile.reproducible]
inherits = "release"
panic = "abort"
@@ -417,7 +419,6 @@ reth-rpc-convert = { path = "crates/rpc/rpc-convert" }
reth-stages = { path = "crates/stages/stages" }
reth-stages-api = { path = "crates/stages/api" }
reth-stages-types = { path = "crates/stages/types", default-features = false }
reth-stateless = { path = "crates/stateless", default-features = false }
reth-static-file = { path = "crates/static-file/static-file" }
reth-static-file-types = { path = "crates/static-file/types", default-features = false }
reth-storage-api = { path = "crates/storage/storage-api", default-features = false }
@@ -434,8 +435,6 @@ reth-trie-db = { path = "crates/trie/db" }
reth-trie-parallel = { path = "crates/trie/parallel" }
reth-trie-sparse = { path = "crates/trie/sparse", default-features = false }
reth-zstd-compressors = { path = "crates/storage/zstd-compressors", default-features = false }
reth-ress-protocol = { path = "crates/ress/protocol" }
reth-ress-provider = { path = "crates/ress/provider" }
# revm
revm = { version = "34.0.0", default-features = false }
@@ -449,15 +448,15 @@ op-revm = { version = "15.0.0", default-features = false }
revm-inspectors = "0.34.2"
# eth
alloy-dyn-abi = "1.5.4"
alloy-primitives = { version = "1.5.4", default-features = false, features = ["map-foldhash"] }
alloy-sol-types = { version = "1.5.4", default-features = false }
alloy-dyn-abi = "1.5.6"
alloy-primitives = { version = "1.5.6", default-features = false, features = ["map-foldhash"] }
alloy-sol-types = { version = "1.5.6", default-features = false }
alloy-chains = { version = "0.2.5", default-features = false }
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.3.0", default-features = false }
alloy-evm = { version = "0.27.2", default-features = false }
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
alloy-rlp = { version = "0.3.13", default-features = false, features = ["core-net"] }
alloy-trie = { version = "0.9.4", default-features = false }
alloy-hardforks = "0.4.5"
@@ -548,7 +547,7 @@ strum_macros = "0.27"
syn = "2.0"
thiserror = { version = "2.0.0", default-features = false }
tar = "0.4.44"
tracing = { version = "0.1.0", default-features = false }
tracing = { version = "0.1.0", default-features = false, features = ["attributes"] }
tracing-appender = "0.2"
url = { version = "2.3", default-features = false }
zstd = "0.13"
@@ -712,6 +711,28 @@ vergen-git2 = "9.1.0"
ipnet = "2.11"
[patch.crates-io]
# revm staging patches
revm = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
op-revm = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
revm-bytecode = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
revm-context = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
revm-context-interface = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
revm-database = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
revm-database-interface = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
revm-handler = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
revm-inspector = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
revm-interpreter = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
revm-precompile = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
revm-primitives = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
revm-state = { git = "https://github.com/bluealloy/revm", rev = "33330a285e621b9170c30a21cfea9ab32e2a2169" }
# revm-inspectors staging patch
revm-inspectors = { git = "https://github.com/paradigmxyz/revm-inspectors", rev = "e80e2eab72dfa18011e6a99abd37027290a46e83" }
# alloy-evm staging patches
alloy-evm = { git = "https://github.com/alloy-rs/evm", rev = "742dc14749ea0279c03ca27b1c26f26ac19fbefb" }
alloy-op-evm = { git = "https://github.com/alloy-rs/evm", rev = "742dc14749ea0279c03ca27b1c26f26ac19fbefb" }
# alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-contract = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }

View File

@@ -51,7 +51,8 @@ RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
SCCACHE_WEBDAV_ENDPOINT=https://cache.depot.dev SCCACHE_DIR=/sccache sccache --start-server && \
export RUSTC_WRAPPER=sccache SCCACHE_WEBDAV_ENDPOINT=https://cache.depot.dev SCCACHE_DIR=/sccache && \
sccache --start-server && \
if [ -n "$RUSTFLAGS" ]; then \
export RUSTFLAGS="$RUSTFLAGS"; \
elif [ "$TARGETPLATFORM" = "linux/amd64" ]; then \

View File

@@ -153,7 +153,7 @@ COV_FILE := lcov.info
.PHONY: test-unit
test-unit: ## Run unit tests.
cargo install cargo-nextest --locked
cargo nextest run $(UNIT_TEST_ARGS)
cargo nextest run --no-fail-fast $(UNIT_TEST_ARGS)
.PHONY: cov-unit
@@ -186,7 +186,7 @@ $(EEST_TESTS_DIR):
.PHONY: ef-tests
ef-tests: $(EF_TESTS_DIR) $(EEST_TESTS_DIR) ## Runs Legacy and EEST tests.
cargo nextest run -p ef-tests --release --features ef-tests
cargo nextest run --no-fail-fast -p ef-tests --release --features ef-tests
##@ reth-bench

View File

@@ -32,7 +32,7 @@ Otherwise, running `make maxperf` at the root of the repo should be sufficient f
The `reth-bench new-payload-fcu` command is the most representative of ethereum mainnet live sync, alternating between sending `engine_newPayload` calls and `engine_forkchoiceUpdated` calls.
The `new-payload-fcu` command supports two optional waiting modes that can be used together or independently:
- `--wait-time <duration>`: Fixed sleep interval between blocks (e.g., `--wait-time 100ms`)
- `--wait-time <duration>`: Fixed sleep interval between blocks (e.g., `--wait-time 100ms` or `--wait-time 400` for 400ms)
- `--wait-for-persistence`: Waits for blocks to be persisted using the `reth_subscribePersistedBlock` subscription
When using `--wait-for-persistence`, the benchmark waits after every `(threshold + 1)` blocks, where the threshold defaults to the engine's persistence threshold (2). This can be customized with `--persistence-threshold <N>`.

View File

@@ -2,7 +2,10 @@
use crate::valid_payload::call_forkchoice_updated;
use eyre::Result;
use std::io::{BufReader, Read};
use std::{
io::{BufReader, Read},
time::Duration,
};
/// Read input from either a file path or stdin.
pub(crate) fn read_input(path: Option<&str>) -> Result<String> {
@@ -51,6 +54,22 @@ pub(crate) fn parse_gas_limit(s: &str) -> eyre::Result<u64> {
let base: u64 = num_str.trim().parse()?;
base.checked_mul(multiplier).ok_or_else(|| eyre::eyre!("value overflow"))
}
/// Parses a duration string, treating bare integers as milliseconds.
///
/// Accepts either a `humantime` duration string (e.g. `"100ms"`, `"2s"`) or a plain
/// integer which is interpreted as milliseconds (e.g. `"400"` → 400ms).
pub(crate) fn parse_duration(s: &str) -> eyre::Result<Duration> {
match humantime::parse_duration(s) {
Ok(d) => Ok(d),
Err(_) => {
let millis: u64 =
s.trim().parse().map_err(|_| eyre::eyre!("invalid duration: {s:?}"))?;
Ok(Duration::from_millis(millis))
}
}
}
use alloy_consensus::Header;
use alloy_eips::eip4844::kzg_to_versioned_hash;
use alloy_primitives::{Address, B256};
@@ -270,4 +289,24 @@ mod tests {
assert!(parse_gas_limit("G").is_err());
assert!(parse_gas_limit("-1G").is_err());
}
#[test]
fn test_parse_duration_with_unit() {
assert_eq!(parse_duration("100ms").unwrap(), Duration::from_millis(100));
assert_eq!(parse_duration("2s").unwrap(), Duration::from_secs(2));
assert_eq!(parse_duration("1m").unwrap(), Duration::from_secs(60));
}
#[test]
fn test_parse_duration_bare_millis() {
assert_eq!(parse_duration("400").unwrap(), Duration::from_millis(400));
assert_eq!(parse_duration("0").unwrap(), Duration::from_millis(0));
assert_eq!(parse_duration("1000").unwrap(), Duration::from_millis(1000));
}
#[test]
fn test_parse_duration_errors() {
assert!(parse_duration("abc").is_err());
assert!(parse_duration("").is_err());
}
}

View File

@@ -12,6 +12,7 @@
use crate::{
bench::{
context::BenchContext,
helpers::parse_duration,
output::{
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
},
@@ -25,7 +26,6 @@ use alloy_provider::Provider;
use alloy_rpc_types_engine::ForkchoiceState;
use clap::Parser;
use eyre::{Context, OptionExt};
use humantime::parse_duration;
use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_core::args::BenchmarkArgs;
@@ -40,6 +40,9 @@ pub struct Command {
rpc_url: String,
/// How long to wait after a forkchoice update before sending the next payload.
///
/// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
/// milliseconds (e.g. `400`).
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
wait_time: Option<Duration>,
@@ -117,7 +120,7 @@ impl Command {
self.benchmark.ws_rpc_url.as_deref(),
&self.benchmark.engine_rpc_url,
)?;
let sub = setup_persistence_subscription(ws_url).await?;
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
Some(PersistenceWaiter::with_duration_and_subscription(
duration,
sub,
@@ -131,7 +134,7 @@ impl Command {
self.benchmark.ws_rpc_url.as_deref(),
&self.benchmark.engine_rpc_url,
)?;
let sub = setup_persistence_subscription(ws_url).await?;
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
Some(PersistenceWaiter::with_subscription(
sub,
self.persistence_threshold,

View File

@@ -154,12 +154,18 @@ impl PersistenceSubscription {
}
/// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`.
///
/// The `keepalive_interval` is set to match `persistence_timeout` so that the `WebSocket`
/// connection is not dropped during long MDBX commits that block the server from responding
/// to pings.
pub(crate) async fn setup_persistence_subscription(
ws_url: Url,
persistence_timeout: Duration,
) -> eyre::Result<PersistenceSubscription> {
info!(target: "reth-bench", "Connecting to WebSocket at {} for persistence subscription", ws_url);
let ws_connect = WsConnect::new(ws_url.to_string());
let ws_connect =
WsConnect::new(ws_url.to_string()).with_keepalive_interval(persistence_timeout);
let client = RpcClient::connect_pubsub(ws_connect)
.await
.wrap_err("Failed to connect to WebSocket RPC endpoint")?;

View File

@@ -14,6 +14,7 @@
use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::{
helpers::parse_duration,
output::{
write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
TotalGasOutput, TotalGasRow,
@@ -30,7 +31,6 @@ use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
use clap::Parser;
use eyre::Context;
use humantime::parse_duration;
use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_api::EngineApiMessageVersion;
@@ -78,6 +78,9 @@ pub struct Command {
output: Option<PathBuf>,
/// How long to wait after a forkchoice update before sending the next payload.
///
/// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
/// milliseconds (e.g. `400`).
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
wait_time: Option<Duration>,
@@ -166,7 +169,7 @@ impl Command {
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
(Some(duration), true) => {
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
let sub = setup_persistence_subscription(ws_url).await?;
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
Some(PersistenceWaiter::with_duration_and_subscription(
duration,
sub,
@@ -177,7 +180,7 @@ impl Command {
(Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
(None, true) => {
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
let sub = setup_persistence_subscription(ws_url).await?;
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
Some(PersistenceWaiter::with_subscription(
sub,
self.persistence_threshold,

View File

@@ -33,7 +33,6 @@ reth-chainspec.workspace = true
reth-primitives.workspace = true
reth-db = { workspace = true, features = ["mdbx"] }
reth-provider.workspace = true
reth-evm.workspace = true
reth-revm.workspace = true
reth-transaction-pool.workspace = true
reth-cli-runner.workspace = true
@@ -53,32 +52,31 @@ reth-payload-primitives.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
reth-ethereum-payload-builder.workspace = true
reth-ethereum-primitives.workspace = true
reth-node-ethereum.workspace = true
reth-node-builder.workspace = true
reth-node-metrics.workspace = true
reth-consensus.workspace = true
reth-tokio-util.workspace = true
reth-ress-protocol.workspace = true
reth-ress-provider.workspace = true
# alloy
alloy-primitives.workspace = true
alloy-rpc-types = { workspace = true, features = ["engine"] }
# tracing
tracing.workspace = true
# async
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
# misc
aquamarine.workspace = true
clap = { workspace = true, features = ["derive", "env"] }
eyre.workspace = true
[dev-dependencies]
alloy-node-bindings = "1.6.3"
alloy-provider = { workspace = true, features = ["reqwest"] }
alloy-rpc-types-eth.workspace = true
backon.workspace = true
serde_json.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
toml.workspace = true
[features]
default = [
@@ -115,10 +113,12 @@ asm-keccak = [
"reth-primitives/asm-keccak",
"reth-ethereum-cli/asm-keccak",
"reth-node-ethereum/asm-keccak",
"alloy-primitives/asm-keccak",
]
keccak-cache-global = [
"reth-node-core/keccak-cache-global",
"reth-node-ethereum/keccak-cache-global",
"alloy-primitives/keccak-cache-global",
]
jemalloc = [
"reth-cli-util/jemalloc",

View File

@@ -15,7 +15,6 @@
//! - `asm-keccak`: Replaces the default, pure-Rust implementation of Keccak256 with one implemented
//! in assembly; see [the `keccak-asm` crate](https://github.com/DaniPopes/keccak-asm) for more
//! details and supported targets.
//! - `min-debug-logs`: Disables all logs below `debug` level.
//!
//! ### Allocator Features
//!
@@ -52,6 +51,9 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]
// Used in feature flags only (`asm-keccak`, `keccak-cache-global`)
use alloy_primitives as _;
pub mod cli;
/// Re-exported utils.
@@ -206,12 +208,9 @@ pub mod rpc {
}
}
/// Ress subprotocol installation.
pub mod ress;
// re-export for convenience
#[doc(inline)]
pub use reth_cli_runner::{tokio_runtime, CliContext, CliRunner};
pub use reth_cli_runner::{CliContext, CliRunner};
// for rendering diagrams
use aquamarine as _;
@@ -219,3 +218,4 @@ use aquamarine as _;
// used in main
use clap as _;
use reth_cli_util as _;
use tracing as _;

View File

@@ -8,9 +8,8 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne
static MALLOC_CONF: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
use clap::Parser;
use reth::{args::RessArgs, cli::Cli, ress::install_ress_subprotocol};
use reth::cli::Cli;
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_node_builder::NodeHandle;
use reth_node_ethereum::EthereumNode;
use tracing::info;
@@ -22,27 +21,12 @@ fn main() {
unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
}
if let Err(err) =
Cli::<EthereumChainSpecParser, RessArgs>::parse().run(async move |builder, ress_args| {
info!(target: "reth::cli", "Launching node");
let NodeHandle { node, node_exit_future } =
builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
if let Err(err) = Cli::<EthereumChainSpecParser>::parse().run(async move |builder, _| {
info!(target: "reth::cli", "Launching node");
let handle = builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
// Install ress subprotocol.
if ress_args.enabled {
install_ress_subprotocol(
ress_args,
node.provider,
node.evm_config,
node.network,
node.task_executor,
node.add_ons_handle.engine_events.new_listener(),
)?;
}
node_exit_future.await
})
{
handle.wait_for_node_exit().await
}) {
eprintln!("Error: {err:?}");
std::process::exit(1);
}

View File

@@ -1,67 +0,0 @@
use reth_ethereum_primitives::EthPrimitives;
use reth_evm::ConfigureEvm;
use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
use reth_network_api::FullNetwork;
use reth_node_api::ConsensusEngineEvent;
use reth_node_core::args::RessArgs;
use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes};
use reth_ress_protocol::{NodeType, ProtocolState, RessProtocolHandler};
use reth_ress_provider::{maintain_pending_state, PendingState, RethRessProtocolProvider};
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventStream;
use tokio::sync::mpsc;
use tracing::*;
/// Install `ress` subprotocol if it's enabled.
pub fn install_ress_subprotocol<P, E, N>(
args: RessArgs,
provider: BlockchainProvider<P>,
evm_config: E,
network: N,
task_executor: TaskExecutor,
engine_events: EventStream<ConsensusEngineEvent<EthPrimitives>>,
) -> eyre::Result<()>
where
P: ProviderNodeTypes<Primitives = EthPrimitives>,
E: ConfigureEvm<Primitives = EthPrimitives> + Clone + 'static,
N: FullNetwork + NetworkProtocols,
{
info!(target: "reth::cli", "Installing ress subprotocol");
let pending_state = PendingState::default();
// Spawn maintenance task for pending state.
task_executor.spawn(maintain_pending_state(
engine_events,
provider.clone(),
pending_state.clone(),
));
let (tx, mut rx) = mpsc::unbounded_channel();
let provider = RethRessProtocolProvider::new(
provider,
evm_config,
Box::new(task_executor.clone()),
args.max_witness_window,
args.witness_max_parallel,
args.witness_cache_size,
pending_state,
)?;
network.add_rlpx_sub_protocol(
RessProtocolHandler {
provider,
node_type: NodeType::Stateful,
peers_handle: network.peers_handle().clone(),
max_active_connections: args.max_active_connections,
state: ProtocolState::new(tx),
}
.into_rlpx_sub_protocol(),
);
info!(target: "reth::cli", "Ress subprotocol support enabled");
task_executor.spawn(async move {
while let Some(event) = rx.recv().await {
trace!(target: "reth::ress", ?event, "Received ress event");
}
});
Ok(())
}

255
bin/reth/tests/it/main.rs Normal file
View File

@@ -0,0 +1,255 @@
#![allow(missing_docs)]
use std::process::Command;
const RETH: &str = env!("CARGO_BIN_EXE_reth");
// ── Helpers ──────────────────────────────────────────────────────────────────
/// Runs `reth <args>` and returns stdout, asserting exit code 0.
///
/// Tracing is suppressed via `RUST_LOG=off` so that log lines emitted during
/// binary startup don't pollute stdout-based assertions.
#[track_caller]
fn reth_ok(args: &[&str]) -> String {
let output = Command::new(RETH).env("RUST_LOG", "off").args(args).output().unwrap();
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(output.status.success(), "args {args:?} failed.\nstdout: {stdout}\nstderr: {stderr}");
stdout.into_owned()
}
/// Spawns an isolated dev-mode reth node.
///
/// Discovery is disabled and peer limits are zeroed so the node is fully
/// isolated. Each call gets a unique temporary data directory so that
/// concurrent test runs never collide on the default `reth/dev/` path.
fn spawn_dev() -> (alloy_node_bindings::RethInstance, tempfile::TempDir) {
use alloy_node_bindings::Reth;
let datadir = tempfile::tempdir().expect("failed to create temp dir");
let instance = Reth::at(RETH)
.dev()
.disable_discovery()
.data_dir(datadir.path())
.args(["--max-outbound-peers", "0", "--max-inbound-peers", "0"])
.spawn();
// Return the TempDir alongside the instance so it lives as long as the node.
(instance, datadir)
}
// ── Original tests (from PR #22069) ──────────────────────────────────────────
#[test]
fn help() {
let stdout = reth_ok(&["--help"]);
assert!(stdout.contains("Usage"), "stdout: {stdout}");
assert!(stdout.contains("node"), "stdout: {stdout}");
}
#[test]
fn version() {
let stdout = reth_ok(&["--version"]);
assert!(stdout.to_lowercase().contains("reth"), "stdout: {stdout}");
}
#[test]
fn node_help() {
let stdout = reth_ok(&["node", "--help"]);
assert!(stdout.contains("--dev"), "stdout: {stdout}");
assert!(stdout.contains("--http"), "stdout: {stdout}");
}
#[test]
fn unknown_subcommand() {
let output = Command::new(RETH).arg("definitely-not-a-cmd").output().unwrap();
assert!(!output.status.success());
}
#[test]
fn unknown_flag() {
let output = Command::new(RETH).args(["node", "--no-such-flag"]).output().unwrap();
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(!output.status.success());
assert!(stderr.contains("--no-such-flag"), "stderr: {stderr}");
}
#[tokio::test]
async fn dev_node_eth_syncing() {
use alloy_provider::{Provider, ProviderBuilder};
let (reth, _datadir) = spawn_dev();
let provider = ProviderBuilder::new().connect_http(reth.endpoint().parse().unwrap());
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let _syncing = provider.syncing().await.expect("eth_syncing failed");
}
// ── Subcommand --help coverage ───────────────────────────────────────────────
//
// Every registered subcommand must produce valid --help output. This catches
// clap wiring regressions (e.g. a missing field, a conflicting arg name, or a
// broken `help_message()` call) that would otherwise only surface when a user
// runs the command.
#[test]
fn init_help() {
let stdout = reth_ok(&["init", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn init_state_help() {
let stdout = reth_ok(&["init-state", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn import_help() {
let stdout = reth_ok(&["import", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn import_era_help() {
let stdout = reth_ok(&["import-era", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn export_era_help() {
let stdout = reth_ok(&["export-era", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn dump_genesis_help() {
let stdout = reth_ok(&["dump-genesis", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn db_help() {
let stdout = reth_ok(&["db", "--help"]);
assert!(stdout.contains("stats"), "stdout: {stdout}");
}
#[test]
fn stage_help() {
let stdout = reth_ok(&["stage", "--help"]);
assert!(stdout.contains("run"), "stdout: {stdout}");
}
#[test]
fn p2p_help() {
let stdout = reth_ok(&["p2p", "--help"]);
assert!(stdout.contains("header"), "stdout: {stdout}");
}
#[test]
fn config_help() {
let stdout = reth_ok(&["config", "--help"]);
assert!(stdout.contains("--default"), "stdout: {stdout}");
}
#[test]
fn prune_help() {
let stdout = reth_ok(&["prune", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn download_help() {
let stdout = reth_ok(&["download", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn re_execute_help() {
let stdout = reth_ok(&["re-execute", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
// ── `config --default` outputs valid TOML ────────────────────────────────────
#[test]
fn config_default_valid_toml() {
let stdout = reth_ok(&["config", "--default"]);
let parsed: toml::Value =
toml::from_str(&stdout).expect("config --default did not produce valid TOML");
// The default config must contain the [stages] table — this is the heart of
// the pipeline configuration and its absence would indicate a serialization
// regression.
assert!(parsed.get("stages").is_some(), "missing [stages] in config output");
}
// ── `dump-genesis` outputs valid JSON ────────────────────────────────────────
#[test]
fn dump_genesis_mainnet_valid_json() {
let stdout = reth_ok(&["dump-genesis"]);
let genesis: serde_json::Value =
serde_json::from_str(&stdout).expect("dump-genesis did not produce valid JSON");
assert!(genesis.get("nonce").is_some(), "missing nonce in genesis JSON");
assert!(genesis.get("alloc").is_some(), "missing alloc in genesis JSON");
}
#[test]
fn dump_genesis_sepolia_valid_json() {
let stdout = reth_ok(&["dump-genesis", "--chain", "sepolia"]);
let genesis: serde_json::Value = serde_json::from_str(&stdout)
.expect("dump-genesis --chain sepolia did not produce valid JSON");
assert!(genesis.get("alloc").is_some(), "missing alloc in sepolia genesis JSON");
}
// ── Dev node: send transaction round-trip ────────────────────────────────────
//
// Exercises the full pipeline: RPC submission → mempool → sealing → execution →
// receipt retrieval. Uses the pre-funded dev account so no genesis customization
// is required.
#[tokio::test]
async fn dev_node_send_tx_and_mine() {
use alloy_primitives::{Address, U256};
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_types_eth::TransactionRequest;
let (reth, _datadir) = spawn_dev();
let provider = ProviderBuilder::new().connect_http(reth.endpoint().parse().unwrap());
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// Dev mode pre-funds the first dev account.
let accounts = provider.get_accounts().await.expect("eth_accounts failed");
assert!(!accounts.is_empty(), "dev node should expose at least one account");
let sender = accounts[0];
let recipient = Address::with_last_byte(0x42);
let tx = TransactionRequest::default().from(sender).to(recipient).value(U256::from(1_000_000));
let tx_hash = provider.send_transaction(tx).await.expect("eth_sendTransaction failed");
// In dev/instant-mine mode the node seals a block for each transaction, so
// the receipt becomes available almost immediately.
let receipt = tx_hash.get_receipt().await.expect("failed to get receipt");
assert!(receipt.status(), "transaction should have succeeded");
assert_eq!(receipt.to, Some(recipient));
assert!(receipt.block_number.unwrap() > 0, "receipt should be in a mined block");
// Verify the transfer actually mutated state.
let balance = provider.get_balance(recipient).await.expect("eth_getBalance failed");
assert_eq!(balance, U256::from(1_000_000));
}
const fn main() {}

View File

@@ -66,7 +66,8 @@ pub trait RethCli: Sized {
F: FnOnce(Self, CliRunner) -> R,
{
let cli = Self::parse_args()?;
let runner = CliRunner::try_default_runtime()?;
let runner = CliRunner::try_default_runtime()
.map_err(|e| Error::raw(clap::error::ErrorKind::Io, e))?;
Ok(cli.with_runner(f, runner))
}

View File

@@ -127,10 +127,14 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
/// Initializes environment according to [`AccessRights`] and returns an instance of
/// [`Environment`].
///
/// Internally builds a [`reth_tasks::Runtime`] attached to the current tokio handle for
/// parallel storage I/O.
pub fn init<N: CliNodeTypes>(&self, access: AccessRights) -> eyre::Result<Environment<N>>
where
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
{
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
let data_dir = self.datadir.clone().resolve_datadir(self.chain.chain());
let db_path = data_dir.db();
let sf_path = data_dir.static_files();
@@ -186,7 +190,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
.build()?;
let provider_factory =
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access)?;
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access, runtime)?;
if access.is_read_write() {
debug!(target: "reth::cli", chain=%self.chain.chain(), genesis=?self.chain.genesis_hash(), "Initializing genesis");
init_genesis_with_settings(&provider_factory, self.storage_settings())?;
@@ -207,6 +211,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
static_file_provider: StaticFileProvider<N::Primitives>,
rocksdb_provider: RocksDBProvider,
access: AccessRights,
runtime: reth_tasks::Runtime,
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>>>
where
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
@@ -217,6 +222,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
self.chain.clone(),
static_file_provider,
rocksdb_provider,
runtime,
)?
.with_prune_modes(prune_modes.clone());

View File

@@ -70,23 +70,23 @@ pub enum Subcommands {
State(state::Command),
}
/// Initializes a provider factory with specified access rights, and then execute with the provided
/// command
macro_rules! db_exec {
($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => {
let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?;
let $tool = DbTool::new(provider_factory)?;
$command;
};
}
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
/// Execute `db` command
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
self,
ctx: CliContext,
) -> eyre::Result<()> {
/// Initializes a provider factory with specified access rights, and then executes the
/// provided command.
macro_rules! db_exec {
($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => {
let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?;
let $tool = DbTool::new(provider_factory)?;
$command;
};
}
let data_dir = self.env.datadir.clone().resolve_datadir(self.env.chain.chain());
let db_path = data_dir.db();
let static_files_path = data_dir.static_files();

View File

@@ -64,7 +64,7 @@ impl Command {
let executor = task_executor.clone();
let pprof_dump_dir = data_dir.pprof_dumps();
let handle = task_executor.spawn_critical("metrics server", async move {
let handle = task_executor.spawn_critical_task("metrics server", async move {
let config = MetricServerConfig::new(
listen_addr,
VersionInfo {

View File

@@ -76,7 +76,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
// Set up cancellation token for graceful shutdown on Ctrl+C
let cancellation = CancellationToken::new();
let cancellation_clone = cancellation.clone();
ctx.task_executor.spawn_critical("prune-ctrl-c", async move {
ctx.task_executor.spawn_critical_task("prune-ctrl-c", async move {
tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c");
cancellation_clone.cancel();
});

View File

@@ -9,7 +9,10 @@ use reth_db_api::{
transaction::{DbTx, DbTxMut},
};
use reth_db_common::{
init::{insert_genesis_header, insert_genesis_history, insert_genesis_state},
init::{
insert_genesis_account_history, insert_genesis_header, insert_genesis_state,
insert_genesis_storage_history,
},
DbTool,
};
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
@@ -171,7 +174,7 @@ impl<C: ChainSpecParser> Command<C> {
None,
)?;
}
StageEnum::AccountHistory | StageEnum::StorageHistory => {
StageEnum::AccountHistory => {
let settings = provider_rw.cached_storage_settings();
let rocksdb = tool.provider_factory.rocksdb_provider();
@@ -181,16 +184,29 @@ impl<C: ChainSpecParser> Command<C> {
tx.clear::<tables::AccountsHistory>()?;
}
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
insert_genesis_account_history(
&provider_rw,
self.env.chain.genesis().alloc.iter(),
)?;
}
StageEnum::StorageHistory => {
let settings = provider_rw.cached_storage_settings();
let rocksdb = tool.provider_factory.rocksdb_provider();
if settings.storages_history_in_rocksdb {
rocksdb.clear::<tables::StoragesHistory>()?;
} else {
tx.clear::<tables::StoragesHistory>()?;
}
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
reset_stage_checkpoint(tx, StageId::IndexStorageHistory)?;
insert_genesis_history(&provider_rw, self.env.chain.genesis().alloc.iter())?;
insert_genesis_storage_history(
&provider_rw,
self.env.chain.genesis().alloc.iter(),
)?;
}
StageEnum::TxLookup => {
if provider_rw.cached_storage_settings().transaction_hash_numbers_in_rocksdb {

View File

@@ -37,12 +37,14 @@ where
unwind_and_copy(db_tool, from, tip_block_number, &output_db, evm_config.clone())?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
runtime,
)?,
to,
from,

View File

@@ -33,12 +33,14 @@ pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Databas
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
runtime,
)?,
to,
from,

View File

@@ -23,12 +23,14 @@ pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = Databas
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
runtime,
)?,
to,
from,

View File

@@ -57,12 +57,14 @@ where
unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db, evm_config, consensus)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
runtime,
)?,
to,
from,

View File

@@ -10,9 +10,10 @@
//! Entrypoint for running commands.
use reth_tasks::{TaskExecutor, TaskManager};
use reth_tasks::{PanickedTaskError, TaskExecutor};
use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
use tracing::{debug, error, trace};
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
/// Executes CLI commands.
///
@@ -20,21 +21,24 @@ use tracing::{debug, error, trace};
#[derive(Debug)]
pub struct CliRunner {
config: CliRunnerConfig,
tokio_runtime: tokio::runtime::Runtime,
runtime: reth_tasks::Runtime,
}
impl CliRunner {
/// Attempts to create a new [`CliRunner`] using the default tokio
/// [`Runtime`](tokio::runtime::Runtime).
/// Attempts to create a new [`CliRunner`] using the default
/// [`Runtime`](reth_tasks::Runtime).
///
/// The default tokio runtime is multi-threaded, with both I/O and time drivers enabled.
pub fn try_default_runtime() -> Result<Self, std::io::Error> {
Ok(Self { config: CliRunnerConfig::default(), tokio_runtime: tokio_runtime()? })
/// The default runtime is multi-threaded, with both I/O and time drivers enabled.
pub fn try_default_runtime() -> Result<Self, reth_tasks::RuntimeBuildError> {
Self::try_with_runtime_config(reth_tasks::RuntimeConfig::default())
}
/// Create a new [`CliRunner`] from a provided tokio [`Runtime`](tokio::runtime::Runtime).
pub const fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self {
Self { config: CliRunnerConfig::new(), tokio_runtime }
/// Creates a new [`CliRunner`] with the given [`RuntimeConfig`](reth_tasks::RuntimeConfig).
pub fn try_with_runtime_config(
config: reth_tasks::RuntimeConfig,
) -> Result<Self, reth_tasks::RuntimeBuildError> {
let runtime = reth_tasks::RuntimeBuilder::new(config).build()?;
Ok(Self { config: CliRunnerConfig::default(), runtime })
}
/// Sets the [`CliRunnerConfig`] for this runner.
@@ -48,7 +52,7 @@ impl CliRunner {
where
F: Future<Output = T>,
{
self.tokio_runtime.block_on(fut)
self.runtime.handle().block_on(fut)
}
/// Executes the given _async_ command on the tokio runtime until the command future resolves or
@@ -64,12 +68,11 @@ impl CliRunner {
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
{
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
AsyncCliRunner::new(self.tokio_runtime);
let (context, task_manager_handle) = cli_context(&self.runtime);
// Executes the command until it finished or ctrl-c was fired
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
&mut task_manager,
let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
task_manager_handle,
run_until_ctrl_c(command(context)),
));
@@ -77,13 +80,13 @@ impl CliRunner {
error!(target: "reth::cli", "shutting down due to error");
} else {
debug!(target: "reth::cli", "shutting down gracefully");
// after the command has finished or exit signal was received we shutdown the task
// manager which fires the shutdown signal to all tasks spawned via the task
// after the command has finished or exit signal was received we shutdown the
// runtime which fires the shutdown signal to all tasks spawned via the task
// executor and awaiting on tasks spawned with graceful shutdown
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
}
tokio_shutdown(tokio_runtime, true);
runtime_shutdown(self.runtime, true);
command_res
}
@@ -99,17 +102,16 @@ impl CliRunner {
F: Future<Output = Result<(), E>> + Send + 'static,
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
{
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
AsyncCliRunner::new(self.tokio_runtime);
let (context, task_manager_handle) = cli_context(&self.runtime);
// Spawn the command on the blocking thread pool
let handle = tokio_runtime.handle().clone();
let command_handle =
tokio_runtime.handle().spawn_blocking(move || handle.block_on(command(context)));
let handle = self.runtime.handle().clone();
let handle2 = handle.clone();
let command_handle = handle.spawn_blocking(move || handle2.block_on(command(context)));
// Wait for the command to complete or ctrl-c
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
&mut task_manager,
let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
task_manager_handle,
run_until_ctrl_c(
async move { command_handle.await.expect("Failed to join blocking task") },
),
@@ -119,10 +121,10 @@ impl CliRunner {
error!(target: "reth::cli", "shutting down due to error");
} else {
debug!(target: "reth::cli", "shutting down gracefully");
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
}
tokio_shutdown(tokio_runtime, true);
runtime_shutdown(self.runtime, true);
command_res
}
@@ -133,48 +135,40 @@ impl CliRunner {
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<std::io::Error> + 'static,
{
self.tokio_runtime.block_on(run_until_ctrl_c(fut))?;
self.runtime.handle().block_on(run_until_ctrl_c(fut))?;
Ok(())
}
/// Executes a regular future as a spawned blocking task until completion or until external
/// signal received.
///
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking) .
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking).
pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
where
F: Future<Output = Result<(), E>> + Send + 'static,
E: Send + Sync + From<std::io::Error> + 'static,
{
let tokio_runtime = self.tokio_runtime;
let handle = tokio_runtime.handle().clone();
let fut = tokio_runtime.handle().spawn_blocking(move || handle.block_on(fut));
tokio_runtime
let handle = self.runtime.handle().clone();
let handle2 = handle.clone();
let fut = handle.spawn_blocking(move || handle2.block_on(fut));
self.runtime
.handle()
.block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
tokio_shutdown(tokio_runtime, false);
runtime_shutdown(self.runtime, false);
Ok(())
}
}
/// [`CliRunner`] configuration when executing commands asynchronously
struct AsyncCliRunner {
context: CliContext,
task_manager: TaskManager,
tokio_runtime: tokio::runtime::Runtime,
}
// === impl AsyncCliRunner ===
impl AsyncCliRunner {
/// Given a tokio [`Runtime`](tokio::runtime::Runtime), creates additional context required to
/// execute commands asynchronously.
fn new(tokio_runtime: tokio::runtime::Runtime) -> Self {
let task_manager = TaskManager::new(tokio_runtime.handle().clone());
let task_executor = task_manager.executor();
Self { context: CliContext { task_executor }, task_manager, tokio_runtime }
}
/// Extracts the task manager handle from the runtime and creates the [`CliContext`].
fn cli_context(
runtime: &reth_tasks::Runtime,
) -> (CliContext, JoinHandle<Result<(), PanickedTaskError>>) {
let handle =
runtime.take_task_manager_handle().expect("Runtime must contain a TaskManager handle");
let context = CliContext { task_executor: runtime.clone() };
(context, handle)
}
/// Additional context provided by the [`CliRunner`] when executing commands
@@ -216,37 +210,25 @@ impl CliRunnerConfig {
}
}
/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
/// enabled
pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
// Keep the threads alive for at least the block time (12 seconds) plus buffer.
// This prevents the costly process of spawning new threads on every
// new block, and instead reuses the existing threads.
.thread_keep_alive(Duration::from_secs(15))
.thread_name("tokio-rt")
.build()
}
/// Runs the given future to completion or until a critical task panicked.
///
/// Returns the error if a task panicked, or the given future returned an error.
async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
async fn run_to_completion_or_panic<F, E>(
task_manager_handle: JoinHandle<Result<(), PanickedTaskError>>,
fut: F,
) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
{
{
let fut = pin!(fut);
tokio::select! {
task_manager_result = tasks => {
if let Err(panicked_error) = task_manager_result {
return Err(panicked_error.into());
}
},
res = fut => res?,
}
let fut = pin!(fut);
tokio::select! {
task_manager_result = task_manager_handle => {
if let Ok(Err(panicked_error)) = task_manager_result {
return Err(panicked_error.into());
}
},
res = fut => res?,
}
Ok(())
}
@@ -271,10 +253,10 @@ where
tokio::select! {
_ = ctrl_c => {
trace!(target: "reth::cli", "Received ctrl-c");
info!(target: "reth::cli", "Received ctrl-c");
},
_ = sigterm => {
trace!(target: "reth::cli", "Received SIGTERM");
info!(target: "reth::cli", "Received SIGTERM");
},
res = fut => res?,
}
@@ -287,7 +269,7 @@ where
tokio::select! {
_ = ctrl_c => {
trace!(target: "reth::cli", "Received ctrl-c");
info!(target: "reth::cli", "Received ctrl-c");
},
res = fut => res?,
}
@@ -296,17 +278,17 @@ where
Ok(())
}
/// Shut down the given Tokio runtime, and wait for it if `wait` is set.
/// Default timeout for waiting on the tokio runtime to shut down.
const DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Shut down the given [`Runtime`](reth_tasks::Runtime), and wait for it if `wait` is set.
///
/// `drop(tokio_runtime)` would block the current thread until its pools
/// (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
/// it on a separate thread and wait for up to 5 seconds for this operation to
/// complete.
fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
// Shutdown the runtime on a separate thread
/// Dropping the runtime on the current thread could block due to tokio pool teardown.
/// Instead, we drop it on a separate thread and optionally wait for completion.
fn runtime_shutdown(rt: reth_tasks::Runtime, wait: bool) {
let (tx, rx) = mpsc::channel();
std::thread::Builder::new()
.name("tokio-shutdown".to_string())
.name("rt-shutdown".to_string())
.spawn(move || {
drop(rt);
let _ = tx.send(());
@@ -314,8 +296,8 @@ fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
.unwrap();
if wait {
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
let _ = rx.recv_timeout(DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT).inspect_err(|err| {
tracing::warn!(target: "reth::cli", %err, "runtime shutdown timed out");
});
}
}

View File

@@ -11,7 +11,6 @@ use reth_node_builder::{
PayloadTypes,
};
use reth_provider::providers::{BlockchainProvider, NodeTypesForProvider};
use reth_tasks::TaskManager;
use std::sync::Arc;
use wallet::Wallet;
@@ -50,7 +49,7 @@ pub async fn setup<N>(
chain_spec: Arc<N::ChainSpec>,
is_dev: bool,
attributes_generator: impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Send + Sync + Copy + 'static,
) -> eyre::Result<(Vec<NodeHelperType<N>>, TaskManager, Wallet)>
) -> eyre::Result<(Vec<NodeHelperType<N>>, Wallet)>
where
N: NodeBuilderHelper,
{
@@ -69,7 +68,6 @@ pub async fn setup_engine<N>(
attributes_generator: impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Send + Sync + Copy + 'static,
) -> eyre::Result<(
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
TaskManager,
Wallet,
)>
where
@@ -96,7 +94,6 @@ pub async fn setup_engine_with_connection<N>(
connect_nodes: bool,
) -> eyre::Result<(
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
TaskManager,
Wallet,
)>
where

View File

@@ -14,7 +14,7 @@ use reth_node_core::args::{DiscoveryArgs, NetworkArgs, RpcServerArgs};
use reth_primitives_traits::AlloyBlockHeader;
use reth_provider::providers::BlockchainProvider;
use reth_rpc_server_types::RpcModuleSelection;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use std::sync::Arc;
use tracing::{span, Instrument, Level};
@@ -110,11 +110,9 @@ where
self,
) -> eyre::Result<(
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
TaskManager,
Wallet,
)> {
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
let network_config = NetworkArgs {
discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
@@ -153,7 +151,7 @@ where
let span = span!(Level::INFO, "node", idx);
let node = N::default();
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(exec.clone())
.testing_node(runtime.clone())
.with_types_and_provider::<N, BlockchainProvider<_>>()
.with_components(node.components_builder())
.with_add_ons(node.add_ons())
@@ -197,7 +195,7 @@ where
}
}
Ok((nodes, tasks, Wallet::default().with_chain_id(self.chain_spec.chain().into())))
Ok((nodes, Wallet::default().with_chain_id(self.chain_spec.chain().into())))
}
}

View File

@@ -15,7 +15,7 @@ use reth_provider::{
};
use reth_rpc_server_types::RpcModuleSelection;
use reth_stages_types::StageId;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use std::{path::Path, sync::Arc};
use tempfile::TempDir;
use tracing::{debug, info, span, Level};
@@ -24,8 +24,6 @@ use tracing::{debug, info, span, Level};
pub struct ChainImportResult {
/// The nodes that were created
pub nodes: Vec<NodeHelperType<EthereumNode>>,
/// The task manager
pub task_manager: TaskManager,
/// The wallet for testing
pub wallet: Wallet,
/// Temporary directories that must be kept alive for the duration of the test
@@ -68,8 +66,7 @@ pub async fn setup_engine_with_chain_import(
+ Copy
+ 'static,
) -> eyre::Result<ChainImportResult> {
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
let network_config = NetworkArgs {
discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
@@ -129,6 +126,7 @@ pub async fn setup_engine_with_chain_import(
.with_default_tables()
.build()
.unwrap(),
reth_tasks::Runtime::test(),
)?;
// Initialize genesis if needed
@@ -221,7 +219,7 @@ pub async fn setup_engine_with_chain_import(
let node = EthereumNode::default();
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node_with_datadir(exec.clone(), datadir.clone())
.testing_node_with_datadir(runtime.clone(), datadir.clone())
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
.with_components(node.components_builder())
.with_add_ons(node.add_ons())
@@ -243,7 +241,6 @@ pub async fn setup_engine_with_chain_import(
Ok(ChainImportResult {
nodes,
task_manager: tasks,
wallet: crate::Wallet::default().with_chain_id(chain_spec.chain.id()),
_temp_dirs: temp_dirs,
})
@@ -333,6 +330,7 @@ mod tests {
.with_default_tables()
.build()
.unwrap(),
reth_tasks::Runtime::test(),
)
.expect("failed to create provider factory");
@@ -397,6 +395,7 @@ mod tests {
.with_default_tables()
.build()
.unwrap(),
reth_tasks::Runtime::test(),
)
.expect("failed to create provider factory");
@@ -497,6 +496,7 @@ mod tests {
.with_default_tables()
.build()
.unwrap(),
reth_tasks::Runtime::test(),
)
.expect("failed to create provider factory");

View File

@@ -210,7 +210,7 @@ where
let mut node_clients = Vec::new();
match result {
Ok((nodes, executor, _wallet)) => {
Ok((nodes, _wallet)) => {
// create HTTP clients for each node's RPC and Engine API endpoints
for node in &nodes {
node_clients.push(node.to_node_client()?);
@@ -218,12 +218,11 @@ where
// spawn a separate task just to handle the shutdown
tokio::spawn(async move {
// keep nodes and executor in scope to ensure they're not dropped
// keep nodes in scope to ensure they're not dropped
let _nodes = nodes;
let _executor = executor;
// Wait for shutdown signal
let _ = shutdown_rx.recv().await;
// nodes and executor will be dropped here when the test completes
// nodes will be dropped here when the test completes
});
}
Err(e) => {

View File

@@ -370,15 +370,14 @@ async fn test_setup_builder_with_custom_tree_config() -> Result<()> {
.build(),
);
let (nodes, _tasks, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, |_| {
EthPayloadBuilderAttributes::default()
})
.with_tree_config_modifier(|config| {
config.with_persistence_threshold(0).with_memory_block_buffer_target(5)
})
.build()
.await?;
let (nodes, _wallet) = E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, |_| {
EthPayloadBuilderAttributes::default()
})
.with_tree_config_modifier(|config| {
config.with_persistence_threshold(0).with_memory_block_buffer_target(5)
})
.build()
.await?;
assert_eq!(nodes.len(), 1);

View File

@@ -119,7 +119,7 @@ async fn test_rocksdb_node_startup() -> Result<()> {
let chain_spec = test_chain_spec();
let (nodes, _tasks, _wallet) =
let (nodes, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.with_storage_v2()
.build()
@@ -147,7 +147,7 @@ async fn test_rocksdb_block_mining() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _wallet) =
let (mut nodes, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.with_storage_v2()
.build()
@@ -201,7 +201,7 @@ async fn test_rocksdb_transaction_queries() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
@@ -268,7 +268,7 @@ async fn test_rocksdb_multi_tx_same_block() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
@@ -336,7 +336,7 @@ async fn test_rocksdb_txs_across_blocks() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
@@ -421,7 +421,7 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
@@ -485,7 +485,7 @@ async fn test_rocksdb_reorg_unwind() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,

View File

@@ -1,6 +1,7 @@
//! Engine tree configuration.
use alloy_eips::merge::EPOCH_SLOTS;
use core::time::Duration;
/// Triggers persistence when the number of canonical blocks in memory exceeds this threshold.
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
@@ -64,6 +65,9 @@ pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
/// Storage tries beyond this limit are cleared (but allocations preserved).
pub const DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES: usize = 100;
/// Default timeout for the state root task before spawning a sequential fallback.
pub const DEFAULT_STATE_ROOT_TASK_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
@@ -175,6 +179,11 @@ pub struct TreeConfig {
sparse_trie_prune_depth: usize,
/// Maximum number of storage tries to retain after pruning.
sparse_trie_max_storage_tries: usize,
/// Timeout for the state root task before spawning a sequential fallback computation.
/// If `Some`, after waiting this duration for the state root task, a sequential state root
/// computation is spawned in parallel and whichever finishes first is used.
/// If `None`, the timeout fallback is disabled.
state_root_task_timeout: Option<Duration>,
}
impl Default for TreeConfig {
@@ -207,6 +216,7 @@ impl Default for TreeConfig {
disable_trie_cache: false,
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
state_root_task_timeout: Some(DEFAULT_STATE_ROOT_TASK_TIMEOUT),
}
}
}
@@ -241,6 +251,7 @@ impl TreeConfig {
disable_cache_metrics: bool,
sparse_trie_prune_depth: usize,
sparse_trie_max_storage_tries: usize,
state_root_task_timeout: Option<Duration>,
) -> Self {
Self {
persistence_threshold,
@@ -270,6 +281,7 @@ impl TreeConfig {
disable_trie_cache: false,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
state_root_task_timeout,
}
}
@@ -618,4 +630,15 @@ impl TreeConfig {
self.sparse_trie_max_storage_tries = max_tries;
self
}
/// Returns the state root task timeout.
pub const fn state_root_task_timeout(&self) -> Option<Duration> {
self.state_root_task_timeout
}
/// Setter for state root task timeout.
pub const fn with_state_root_task_timeout(mut self, timeout: Option<Duration>) -> Self {
self.state_root_task_timeout = timeout;
self
}
}

View File

@@ -23,7 +23,7 @@ use serde::{de::DeserializeOwned, Serialize};
// Re-export [`ExecutionPayload`] moved to `reth_payload_primitives`
#[cfg(feature = "std")]
pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
pub use reth_evm::{ConfigureEngineEvm, ConvertTx, ExecutableTxIterator, ExecutableTxTuple};
pub use reth_payload_primitives::ExecutionPayload;
mod error;

View File

@@ -201,6 +201,7 @@ mod tests {
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache.clone(),
reth_tasks::Runtime::test(),
);
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();

View File

@@ -141,6 +141,7 @@ test-utils = [
"reth-ethereum-primitives/test-utils",
"reth-node-ethereum/test-utils",
"reth-evm-ethereum/test-utils",
"reth-tasks/test-utils",
]
[[test]]

View File

@@ -12,8 +12,7 @@ use rand::Rng;
use reth_chainspec::ChainSpec;
use reth_db_common::init::init_genesis;
use reth_engine_tree::tree::{
executor::WorkloadExecutor, precompile_cache::PrecompileCacheMap, PayloadProcessor,
StateProviderBuilder, TreeConfig,
precompile_cache::PrecompileCacheMap, PayloadProcessor, StateProviderBuilder, TreeConfig,
};
use reth_ethereum_primitives::TransactionSigned;
use reth_evm::OnStateHook;
@@ -219,7 +218,7 @@ fn bench_state_root(c: &mut Criterion) {
setup_provider(&factory, &state_updates).expect("failed to setup provider");
let payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(factory.chain_spec()),
&TreeConfig::default(),
PrecompileCacheMap::default(),

View File

@@ -138,7 +138,7 @@ impl<N: ProviderNodeTypes> PipelineSync<N> {
let (tx, rx) = oneshot::channel();
let pipeline = pipeline.take().expect("exists");
self.pipeline_task_spawner.spawn_critical_blocking(
self.pipeline_task_spawner.spawn_critical_blocking_task(
"pipeline task",
Box::pin(async move {
let result = pipeline.run_as_fut(Some(target)).await;

View File

@@ -76,8 +76,16 @@ impl CacheConfig for EpochCacheConfig {
type FixedCache<K, V, H = DefaultHashBuilder> = fixed_cache::Cache<K, V, H, EpochCacheConfig>;
/// A wrapper of a state provider and a shared cache.
///
/// The const generic `PREWARM` controls whether every cache miss is populated. This is only
/// relevant for pre-warm transaction execution with the intention to pre-populate the cache with
/// data for regular block execution. During regular block execution the cache doesn't need to be
/// populated because the actual EVM database [`State`](revm::database::State) also caches
/// internally during block execution and the cache is then updated after the block with the entire
/// [`BundleState`] output of that block which contains all accessed accounts, code, storage. See
/// also [`ExecutionCache::insert_state`].
#[derive(Debug)]
pub struct CachedStateProvider<S> {
pub struct CachedStateProvider<S, const PREWARM: bool = false> {
/// The state provider
state_provider: S,
@@ -86,15 +94,9 @@ pub struct CachedStateProvider<S> {
/// Metrics for the cached state provider
metrics: CachedStateMetrics,
/// If prewarm enabled we populate every cache miss
prewarm: bool,
}
impl<S> CachedStateProvider<S>
where
S: StateProvider,
{
impl<S> CachedStateProvider<S> {
/// Creates a new [`CachedStateProvider`] from an [`ExecutionCache`], state provider, and
/// [`CachedStateMetrics`].
pub const fn new(
@@ -102,27 +104,18 @@ where
caches: ExecutionCache,
metrics: CachedStateMetrics,
) -> Self {
Self { state_provider, caches, metrics, prewarm: false }
Self { state_provider, caches, metrics }
}
}
impl<S> CachedStateProvider<S> {
/// Enables pre-warm mode so that every cache miss is populated.
///
/// This is only relevant for pre-warm transaction execution with the intention to pre-populate
/// the cache with data for regular block execution. During regular block execution the
/// cache doesn't need to be populated because the actual EVM database
/// [`State`](revm::database::State) also caches internally during block execution and the cache
/// is then updated after the block with the entire [`BundleState`] output of that block which
/// contains all accessed accounts,code,storage. See also [`ExecutionCache::insert_state`].
pub const fn prewarm(mut self) -> Self {
self.prewarm = true;
self
}
/// Returns whether this provider should pre-warm cache misses.
const fn is_prewarm(&self) -> bool {
self.prewarm
impl<S> CachedStateProvider<S, true> {
/// Creates a new [`CachedStateProvider`] with prewarming enabled.
pub const fn new_prewarm(
state_provider: S,
caches: ExecutionCache,
metrics: CachedStateMetrics,
) -> Self {
Self { state_provider, caches, metrics }
}
}
@@ -307,9 +300,9 @@ impl<K: PartialEq, V> StatsHandler<K, V> for CacheStatsHandler {
}
}
impl<S: AccountReader> AccountReader for CachedStateProvider<S> {
impl<S: AccountReader, const PREWARM: bool> AccountReader for CachedStateProvider<S, PREWARM> {
fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
if self.is_prewarm() {
if PREWARM {
match self.caches.get_or_try_insert_account_with(*address, || {
self.state_provider.basic_account(address)
})? {
@@ -334,13 +327,13 @@ pub enum CachedStatus<T> {
Cached(T),
}
impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
impl<S: StateProvider, const PREWARM: bool> StateProvider for CachedStateProvider<S, PREWARM> {
fn storage(
&self,
account: Address,
storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
if self.is_prewarm() {
if PREWARM {
match self.caches.get_or_try_insert_storage_with(account, storage_key, || {
self.state_provider.storage(account, storage_key).map(Option::unwrap_or_default)
})? {
@@ -360,9 +353,9 @@ impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
}
}
impl<S: BytecodeReader> BytecodeReader for CachedStateProvider<S> {
impl<S: BytecodeReader, const PREWARM: bool> BytecodeReader for CachedStateProvider<S, PREWARM> {
fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
if self.is_prewarm() {
if PREWARM {
match self.caches.get_or_try_insert_code_with(*code_hash, || {
self.state_provider.bytecode_by_hash(code_hash)
})? {
@@ -378,7 +371,9 @@ impl<S: BytecodeReader> BytecodeReader for CachedStateProvider<S> {
}
}
impl<S: StateRootProvider> StateRootProvider for CachedStateProvider<S> {
impl<S: StateRootProvider, const PREWARM: bool> StateRootProvider
for CachedStateProvider<S, PREWARM>
{
fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
self.state_provider.state_root(hashed_state)
}
@@ -402,7 +397,9 @@ impl<S: StateRootProvider> StateRootProvider for CachedStateProvider<S> {
}
}
impl<S: StateProofProvider> StateProofProvider for CachedStateProvider<S> {
impl<S: StateProofProvider, const PREWARM: bool> StateProofProvider
for CachedStateProvider<S, PREWARM>
{
fn proof(
&self,
input: TrieInput,
@@ -429,7 +426,9 @@ impl<S: StateProofProvider> StateProofProvider for CachedStateProvider<S> {
}
}
impl<S: StorageRootProvider> StorageRootProvider for CachedStateProvider<S> {
impl<S: StorageRootProvider, const PREWARM: bool> StorageRootProvider
for CachedStateProvider<S, PREWARM>
{
fn storage_root(
&self,
address: Address,
@@ -457,7 +456,7 @@ impl<S: StorageRootProvider> StorageRootProvider for CachedStateProvider<S> {
}
}
impl<S: BlockHashReader> BlockHashReader for CachedStateProvider<S> {
impl<S: BlockHashReader, const PREWARM: bool> BlockHashReader for CachedStateProvider<S, PREWARM> {
fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
self.state_provider.block_hash(number)
}
@@ -471,7 +470,9 @@ impl<S: BlockHashReader> BlockHashReader for CachedStateProvider<S> {
}
}
impl<S: HashedPostStateProvider> HashedPostStateProvider for CachedStateProvider<S> {
impl<S: HashedPostStateProvider, const PREWARM: bool> HashedPostStateProvider
for CachedStateProvider<S, PREWARM>
{
fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
self.state_provider.hashed_post_state(bundle_state)
}

View File

@@ -8,9 +8,17 @@ use reth_metrics::{
metrics::{Counter, Gauge, Histogram},
Metrics,
};
use reth_primitives_traits::constants::gas_units::MEGAGAS;
use reth_trie::updates::TrieUpdates;
use std::time::{Duration, Instant};
/// Width of each gas bucket in gas units (10 Mgas).
const GAS_BUCKET_SIZE: u64 = 10 * MEGAGAS;
/// Number of gas buckets. The last bucket is a catch-all for everything above
/// `(NUM_GAS_BUCKETS - 1) * GAS_BUCKET_SIZE`.
const NUM_GAS_BUCKETS: usize = 5;
/// Metrics for the `EngineApi`.
#[derive(Debug, Default)]
pub struct EngineApiMetrics {
@@ -235,6 +243,63 @@ impl ForkchoiceUpdatedMetrics {
}
}
/// Per-gas-bucket newPayload metrics, initialized once via [`Self::new_with_labels`].
#[derive(Clone, Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct NewPayloadGasBucketMetrics {
/// Latency for new payload calls in this gas bucket.
pub(crate) new_payload_gas_bucket_latency: Histogram,
/// Gas per second for new payload calls in this gas bucket.
pub(crate) new_payload_gas_bucket_gas_per_second: Histogram,
}
/// Holds pre-initialized [`NewPayloadGasBucketMetrics`] instances, one per gas bucket.
#[derive(Debug)]
pub(crate) struct GasBucketMetrics {
buckets: [NewPayloadGasBucketMetrics; NUM_GAS_BUCKETS],
}
impl Default for GasBucketMetrics {
fn default() -> Self {
Self {
buckets: std::array::from_fn(|i| {
let label = Self::bucket_label(i);
NewPayloadGasBucketMetrics::new_with_labels(&[("gas_bucket", label)])
}),
}
}
}
impl GasBucketMetrics {
fn record(&self, gas_used: u64, elapsed: Duration) {
let idx = Self::bucket_index(gas_used);
self.buckets[idx].new_payload_gas_bucket_latency.record(elapsed);
self.buckets[idx]
.new_payload_gas_bucket_gas_per_second
.record(gas_used as f64 / elapsed.as_secs_f64());
}
fn bucket_index(gas_used: u64) -> usize {
let idx = gas_used / GAS_BUCKET_SIZE;
(idx as usize).min(NUM_GAS_BUCKETS - 1)
}
/// Returns a human-readable label like `<10M`, `10-20M`, … `>40M`.
fn bucket_label(index: usize) -> String {
let m = GAS_BUCKET_SIZE / 1_000_000;
if index == 0 {
format!("<{m}M")
} else if index < NUM_GAS_BUCKETS - 1 {
let lo = m * index as u64;
let hi = lo + m;
format!("{lo}-{hi}M")
} else {
let lo = m * index as u64;
format!(">{lo}M")
}
}
}
/// Metrics for engine newPayload responses.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
@@ -245,6 +310,9 @@ pub(crate) struct NewPayloadStatusMetrics {
/// Start time of the latest new payload call.
#[metric(skip)]
pub(crate) latest_start_at: Option<Instant>,
/// Gas-bucket-labeled latency and gas/s histograms.
#[metric(skip)]
pub(crate) gas_bucket: GasBucketMetrics,
/// The total count of new payload messages received.
pub(crate) new_payload_messages: Counter,
/// The total count of new payload messages that we responded to with
@@ -321,6 +389,7 @@ impl NewPayloadStatusMetrics {
self.new_payload_messages.increment(1);
self.new_payload_latency.record(elapsed);
self.new_payload_last.set(elapsed);
self.gas_bucket.record(gas_used, elapsed);
if let Some(latest_forkchoice_updated_at) = latest_forkchoice_updated_at.take() {
self.forkchoice_updated_new_payload_time_diff
.record(start - latest_forkchoice_updated_at);
@@ -365,6 +434,8 @@ pub struct BlockValidationMetrics {
pub state_root_parallel_fallback_total: Counter,
/// Total number of times the state root task failed but the fallback succeeded.
pub state_root_task_fallback_success_total: Counter,
/// Total number of times the state root task timed out and a sequential fallback was spawned.
pub state_root_task_timeout_total: Counter,
/// Latest state root duration, ie the time spent blocked waiting for the state root.
pub state_root_duration: Gauge,
/// Histogram for state root duration ie the time spent blocked waiting for the state root

View File

@@ -1510,7 +1510,7 @@ where
.engine
.failed_forkchoice_updated_response_deliveries
.increment(1);
error!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
}
}
BeaconEngineMessage::NewPayload { payload, tx } => {
@@ -1534,7 +1534,7 @@ where
BeaconOnNewPayloadError::Internal(Box::new(e))
}))
{
error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
self.metrics
.engine
.failed_new_payload_response_deliveries

View File

@@ -1,47 +0,0 @@
//! Executor for mixed I/O and CPU workloads.
use reth_trie_parallel::root::get_tokio_runtime_handle;
use tokio::{runtime::Handle, task::JoinHandle};
/// An executor for mixed I/O and CPU workloads.
///
/// This type uses tokio to spawn blocking tasks and will reuse an existing tokio
/// runtime if available or create its own.
#[derive(Debug, Clone)]
pub struct WorkloadExecutor {
inner: WorkloadExecutorInner,
}
impl Default for WorkloadExecutor {
fn default() -> Self {
Self { inner: WorkloadExecutorInner::new() }
}
}
impl WorkloadExecutor {
/// Returns the handle to the tokio runtime
pub(super) const fn handle(&self) -> &Handle {
&self.inner.handle
}
/// Runs the provided function on an executor dedicated to blocking operations.
#[track_caller]
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.inner.handle.spawn_blocking(func)
}
}
#[derive(Debug, Clone)]
struct WorkloadExecutorInner {
handle: Handle,
}
impl WorkloadExecutorInner {
fn new() -> Self {
Self { handle: get_tokio_runtime_handle() }
}
}

View File

@@ -15,7 +15,6 @@ use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
use alloy_evm::block::StateChangeSource;
use alloy_primitives::B256;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use executor::WorkloadExecutor;
use metrics::{Counter, Histogram};
use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
@@ -24,8 +23,8 @@ use rayon::prelude::*;
use reth_evm::{
block::ExecutableTxParts,
execute::{ExecutableTxFor, WithTxEnv},
ConfigureEvm, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook, SpecFor,
TxEnvFor,
ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
SpecFor, TxEnvFor,
};
use reth_metrics::Metrics;
use reth_primitives_traits::NodePrimitives;
@@ -34,6 +33,7 @@ use reth_provider::{
StateProviderFactory, StateReader,
};
use reth_revm::{db::BundleState, state::EvmState};
use reth_tasks::Runtime;
use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofWorkerHandle},
@@ -55,7 +55,6 @@ use std::{
use tracing::{debug, debug_span, instrument, warn, Span};
pub mod bal;
pub mod executor;
pub mod multiproof;
mod preserved_sparse_trie;
pub mod prewarm;
@@ -109,7 +108,7 @@ where
Evm: ConfigureEvm,
{
/// The executor used by to spawn tasks.
executor: WorkloadExecutor,
executor: Runtime,
/// The most recent cache used for execution.
execution_cache: PayloadExecutionCache,
/// Metrics for trie operations
@@ -146,13 +145,13 @@ where
Evm: ConfigureEvm<Primitives = N>,
{
/// Returns a reference to the workload executor driving payload tasks.
pub const fn executor(&self) -> &WorkloadExecutor {
pub const fn executor(&self) -> &Runtime {
&self.executor
}
/// Creates a new payload processor.
pub fn new(
executor: WorkloadExecutor,
executor: Runtime,
evm_config: Evm,
config: &TreeConfig,
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
@@ -236,7 +235,8 @@ where
+ 'static,
{
// start preparing transactions immediately
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
let (prewarm_rx, execution_rx) =
self.spawn_tx_iterator(transactions, env.transaction_count);
let span = Span::current();
let (to_sparse_trie, sparse_trie_rx) = channel();
@@ -277,15 +277,7 @@ where
// Create and spawn the storage proof task
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let storage_worker_count = config.storage_worker_count();
let account_worker_count = config.account_worker_count();
let proof_handle = ProofWorkerHandle::new(
self.executor.handle().clone(),
task_ctx,
storage_worker_count,
account_worker_count,
v2_proofs_enabled,
);
let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, v2_proofs_enabled);
if config.disable_trie_cache() {
let multi_proof_task = MultiProofTask::new(
@@ -351,7 +343,8 @@ where
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
let (prewarm_rx, execution_rx) =
self.spawn_tx_iterator(transactions, env.transaction_count);
// This path doesn't use multiproof, so V2 proofs flag doesn't matter
let prewarm_handle =
self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
@@ -364,11 +357,23 @@ where
}
}
/// Transaction count threshold below which sequential signature recovery is used.
///
/// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
/// (work-stealing setup, channel-based reorder) exceeds the cost of sequential ECDSA
/// recovery. Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach`
/// for small blocks.
const SMALL_BLOCK_TX_THRESHOLD: usize = 30;
/// Spawns a task advancing transaction env iterator and streaming updates through a channel.
///
/// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
/// sequential iteration to avoid rayon overhead.
#[expect(clippy::type_complexity)]
fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
&self,
transactions: I,
transaction_count: usize,
) -> (
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
@@ -377,22 +382,51 @@ where
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
rayon::spawn(move || {
let (transactions, convert) = transactions.into();
transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
let tx = convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
// Only send Ok(_) variants to prewarming task.
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
if transaction_count == 0 {
// Empty block — nothing to do.
} else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
// Sequential path for small blocks — avoids rayon work-stealing setup and
// channel-based reorder overhead when it costs more than the ECDSA recovery itself.
debug!(
target: "engine::tree::payload_processor",
transaction_count,
"using sequential sig recovery for small block"
);
self.executor.spawn_blocking(move || {
let (transactions, convert) = transactions.into_parts();
for (idx, tx) in transactions.into_iter().enumerate() {
let tx = convert.convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = ooo_tx.send((idx, tx));
}
let _ = ooo_tx.send((idx, tx));
});
});
} else {
// Parallel path — spawn on rayon for parallel signature recovery.
rayon::spawn(move || {
let (transactions, convert) = transactions.into_parts();
transactions.into_par_iter().enumerate().for_each_with(
ooo_tx,
|ooo_tx, (idx, tx)| {
let tx = convert.convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
// Only send Ok(_) variants to prewarming task.
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = ooo_tx.send((idx, tx));
},
);
});
}
// Spawn a task that processes out-of-order transactions from the task above and sends them
// to the execution task in order.
@@ -423,7 +457,7 @@ where
fn spawn_caching_with<P>(
&self,
env: ExecutionEnv<Evm>,
mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
provider_builder: StateProviderBuilder<N, P>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
bal: Option<Arc<BlockAccessList>>,
@@ -432,11 +466,7 @@ where
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
if self.disable_transaction_prewarming {
// if no transactions should be executed we clear them but still spawn the task for
// caching updates
transactions = mpsc::channel().1;
}
let skip_prewarm = self.disable_transaction_prewarming;
let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
@@ -465,7 +495,9 @@ where
{
let to_prewarm_task = to_prewarm_task.clone();
self.executor.spawn_blocking(move || {
let mode = if let Some(bal) = bal {
let mode = if skip_prewarm {
PrewarmMode::Skipped
} else if let Some(bal) = bal {
PrewarmMode::BlockAccessList(bal)
} else {
PrewarmMode::Transactions(transactions)
@@ -723,6 +755,18 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
.map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
}
/// Takes the state root receiver out of the handle for use with custom waiting logic
/// (e.g., timeout-based waiting).
///
/// # Panics
///
/// If payload processing was started without background tasks.
pub const fn take_state_root_rx(
&mut self,
) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
self.state_root.take().expect("state_root is None")
}
/// Returns a state hook to be used to send state updates to this task.
///
/// If a multiproof task is spawned the hook will notify it about new states.
@@ -1001,9 +1045,7 @@ mod tests {
use super::PayloadExecutionCache;
use crate::tree::{
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
payload_processor::{
evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
},
payload_processor::{evm_state_to_hashed_post_state, PayloadProcessor},
precompile_cache::PrecompileCacheMap,
StateProviderBuilder, TreeConfig,
};
@@ -1105,7 +1147,7 @@ mod tests {
#[test]
fn on_inserted_executed_block_populates_cache() {
let payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(Arc::new(ChainSpec::default())),
&TreeConfig::default(),
PrecompileCacheMap::default(),
@@ -1134,7 +1176,7 @@ mod tests {
#[test]
fn on_inserted_executed_block_skips_on_parent_mismatch() {
let payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(Arc::new(ChainSpec::default())),
&TreeConfig::default(),
PrecompileCacheMap::default(),
@@ -1269,7 +1311,7 @@ mod tests {
}
let mut payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(factory.chain_spec()),
&TreeConfig::default(),
PrecompileCacheMap::default(),

View File

@@ -1547,17 +1547,11 @@ mod tests {
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
use revm_primitives::{B256, U256};
use std::sync::{Arc, OnceLock};
use tokio::runtime::{Handle, Runtime};
/// Get a handle to the test runtime, creating it if necessary
fn get_test_runtime_handle() -> Handle {
static TEST_RT: OnceLock<Runtime> = OnceLock::new();
TEST_RT
.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()
})
.handle()
.clone()
/// Get a test runtime, creating it if necessary
fn get_test_runtime() -> &'static reth_tasks::Runtime {
static TEST_RT: OnceLock<reth_tasks::Runtime> = OnceLock::new();
TEST_RT.get_or_init(reth_tasks::Runtime::test)
}
fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
@@ -1573,11 +1567,11 @@ mod tests {
+ Send
+ 'static,
{
let rt_handle = get_test_runtime_handle();
let runtime = get_test_runtime();
let changeset_cache = ChangesetCache::new();
let overlay_factory = OverlayStateProviderFactory::new(factory, changeset_cache);
let task_ctx = ProofTaskCtx::new(overlay_factory);
let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1, false);
let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, false);
let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
let (tx, rx) = crossbeam_channel::unbounded();

View File

@@ -15,7 +15,6 @@ use crate::tree::{
cached_state::{CachedStateProvider, SavedCache},
payload_processor::{
bal::{self, total_slots, BALSlotIter},
executor::WorkloadExecutor,
multiproof::{MultiProofMessage, VersionedMultiProofTargets},
PayloadExecutionCache,
},
@@ -37,6 +36,7 @@ use reth_provider::{
StateReader,
};
use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_tasks::Runtime;
use reth_trie::MultiProofTargets;
use std::{
ops::Range,
@@ -49,13 +49,16 @@ use std::{
};
use tracing::{debug, debug_span, instrument, trace, warn, Span};
/// Determines the prewarming mode: transaction-based or BAL-based.
/// Determines the prewarming mode: transaction-based, BAL-based, or skipped.
#[derive(Debug)]
pub enum PrewarmMode<Tx> {
/// Prewarm by executing transactions from a stream.
Transactions(Receiver<Tx>),
/// Prewarm by prefetching slots from a Block Access List.
BlockAccessList(Arc<BlockAccessList>),
/// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
/// benefit). No workers are spawned.
Skipped,
}
/// A wrapper for transactions that includes their index in the block.
@@ -78,7 +81,7 @@ where
Evm: ConfigureEvm<Primitives = N>,
{
/// The executor used to spawn execution tasks.
executor: WorkloadExecutor,
executor: Runtime,
/// Shared execution cache.
execution_cache: PayloadExecutionCache,
/// Context provided to execution tasks
@@ -101,7 +104,7 @@ where
{
/// Initializes the task with the given transactions pending execution
pub fn new(
executor: WorkloadExecutor,
executor: Runtime,
execution_cache: PayloadExecutionCache,
ctx: PrewarmContext<N, P, Evm>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
@@ -416,6 +419,10 @@ where
PrewarmMode::BlockAccessList(bal) => {
self.run_bal_prewarm(bal, actions_tx);
}
PrewarmMode::Skipped => {
let _ = actions_tx
.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
}
}
let mut final_execution_outcome = None;
@@ -528,11 +535,8 @@ where
if let Some(saved_cache) = saved_cache {
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
state_provider = Box::new(
CachedStateProvider::new(state_provider, caches, cache_metrics)
// ensure we pre-warm the cache
.prewarm(),
);
state_provider =
Box::new(CachedStateProvider::new_prewarm(state_provider, caches, cache_metrics));
}
let state_provider = StateProviderDatabase::new(state_provider);
@@ -590,18 +594,11 @@ where
return
};
while let Ok(IndexedTransaction { index, tx }) = {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
.entered();
txs.recv()
} {
let enter = debug_span!(
while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
let _enter = debug_span!(
target: "engine::tree::payload_processor::prewarm",
"prewarm tx",
index,
tx_hash = %tx.tx().tx_hash(),
is_success = tracing::field::Empty,
gas_used = tracing::field::Empty,
)
.entered();
@@ -632,12 +629,6 @@ where
};
metrics.execution_duration.record(start.elapsed());
// record some basic information about the transactions
enter.record("gas_used", res.result.gas_used());
enter.record("is_success", res.result.is_success());
drop(enter);
// If the task was cancelled, stop execution, and exit.
if terminate_execution.load(Ordering::Relaxed) {
break
@@ -646,16 +637,12 @@ where
// Only send outcome for transactions after the first txn
// as the main execution will be just as fast
if index > 0 {
let _enter =
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
.entered();
let (targets, storage_targets) =
multiproof_targets_from_state(res.state, v2_proofs_enabled);
metrics.prefetch_storage_targets.record(storage_targets as f64);
if let Some(to_multi_proof) = &to_multi_proof {
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
}
drop(_enter);
}
metrics.total_runtime.record(start.elapsed());
@@ -671,7 +658,7 @@ where
fn spawn_workers<Tx>(
self,
workers_needed: usize,
task_executor: &WorkloadExecutor,
task_executor: &Runtime,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
done_tx: Sender<()>,
) -> CrossbeamSender<IndexedTransaction<Tx>>
@@ -708,7 +695,7 @@ where
fn spawn_bal_worker(
&self,
idx: usize,
executor: &WorkloadExecutor,
executor: &Runtime,
bal: Arc<BlockAccessList>,
range: Range<usize>,
done_tx: Sender<()>,

View File

@@ -1,6 +1,5 @@
//! Sparse Trie task related functionality.
use super::executor::WorkloadExecutor;
use crate::tree::{
multiproof::{
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
@@ -13,6 +12,7 @@ use alloy_rlp::{Decodable, Encodable};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use rayon::iter::ParallelIterator;
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
use reth_tasks::Runtime;
use reth_trie::{
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
TrieAccount, EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
@@ -28,7 +28,7 @@ use reth_trie_parallel::{
use reth_trie_sparse::{
errors::{SparseStateTrieResult, SparseTrieErrorKind, SparseTrieResult},
provider::{TrieNodeProvider, TrieNodeProviderFactory},
DeferredDrops, LeafUpdate, ParallelSparseTrie, SparseStateTrie, SparseTrie, SparseTrieExt,
DeferredDrops, LeafUpdate, ParallelSparseTrie, SparseStateTrie, SparseTrie,
};
use revm_primitives::{hash_map::Entry, B256Map};
use smallvec::SmallVec;
@@ -44,8 +44,8 @@ where
BPF: TrieNodeProviderFactory + Send + Sync,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
Cleared(SparseTrieTask<BPF, A, S>),
Cached(SparseTrieCacheTask<A, S>),
@@ -56,8 +56,8 @@ where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
match self {
@@ -117,8 +117,8 @@ where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
/// Creates a new sparse trie task with the given trie.
pub(super) const fn new(
@@ -277,12 +277,12 @@ pub(super) struct SparseTrieCacheTask<A = ParallelSparseTrie, S = ParallelSparse
impl<A, S> SparseTrieCacheTask<A, S>
where
A: SparseTrieExt + Default,
S: SparseTrieExt + Default + Clone,
A: SparseTrie + Default,
S: SparseTrie + Default + Clone,
{
/// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
pub(super) fn new_with_trie(
executor: &WorkloadExecutor,
executor: &Runtime,
updates: CrossbeamReceiver<MultiProofMessage>,
proof_worker_handle: ProofWorkerHandle,
metrics: MultiProofTaskMetrics,
@@ -486,7 +486,7 @@ where
}
#[instrument(
level = "debug",
level = "trace",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
@@ -518,7 +518,7 @@ where
/// Processes a hashed state update and encodes all state changes as trie updates.
#[instrument(
level = "debug",
level = "trace",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]

View File

@@ -4,7 +4,7 @@ use crate::tree::{
cached_state::CachedStateProvider,
error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
instrumented_state::InstrumentedStateProvider,
payload_processor::{executor::WorkloadExecutor, PayloadProcessor},
payload_processor::PayloadProcessor,
precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
sparse_trie::StateRootComputeOutcome,
EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, StateProviderBuilder,
@@ -17,7 +17,6 @@ use alloy_evm::Evm;
use alloy_primitives::B256;
use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
use rayon::prelude::*;
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, LazyOverlay};
use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
use reth_engine_primitives::{
@@ -49,7 +48,7 @@ use revm_primitives::Address;
use std::{
collections::HashMap,
panic::{self, AssertUnwindSafe},
sync::Arc,
sync::{mpsc::RecvTimeoutError, Arc},
time::Instant,
};
use tracing::{debug, debug_span, error, info, instrument, trace, warn};
@@ -134,6 +133,8 @@ where
validator: V,
/// Changeset cache for in-memory trie changesets
changeset_cache: ChangesetCache,
/// Task runtime for spawning parallel work.
runtime: reth_tasks::Runtime,
}
impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
@@ -166,10 +167,11 @@ where
config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
changeset_cache: ChangesetCache,
runtime: reth_tasks::Runtime,
) -> Self {
let precompile_cache_map = PrecompileCacheMap::default();
let payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
runtime.clone(),
evm_config.clone(),
&config,
precompile_cache_map.clone(),
@@ -186,6 +188,7 @@ where
metrics: EngineApiMetrics::default(),
validator,
changeset_cache,
runtime,
}
}
@@ -228,35 +231,20 @@ where
V: PayloadValidator<T, Block = N::Block>,
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
{
match input {
Ok(match input {
BlockOrPayload::Payload(payload) => {
let (iter, convert) = self
let iter = self
.evm_config
.tx_iterator_for_payload(payload)
.map_err(NewPayloadError::other)?
.into();
let iter = Either::Left(iter.into_par_iter().map(Either::Left));
let convert = move |tx| {
let Either::Left(tx) = tx else { unreachable!() };
convert(tx).map(Either::Left).map_err(Either::Left)
};
// Box the closure to satisfy the `Fn` bound both here and in the branch below
Ok((iter, Box::new(convert) as Box<dyn Fn(_) -> _ + Send + Sync + 'static>))
.map_err(NewPayloadError::other)?;
Either::Left(iter)
}
BlockOrPayload::Block(block) => {
let iter = Either::Right(
block.body().clone_transactions().into_par_iter().map(Either::Right),
);
let convert = move |tx: Either<_, N::SignedTx>| {
let Either::Right(tx) = tx else { unreachable!() };
tx.try_into_recovered().map(Either::Right).map_err(Either::Right)
};
Ok((iter, Box::new(convert)))
let txs = block.body().clone_transactions();
let convert = |tx: N::SignedTx| tx.try_into_recovered();
Either::Right((txs, convert))
}
}
})
}
/// Returns a [`ExecutionCtxFor`] for the given payload or block.
@@ -515,7 +503,17 @@ where
match strategy {
StateRootStrategy::StateRootTask => {
debug!(target: "engine::tree::payload_validator", "Using sparse trie state root algorithm");
match handle.state_root() {
let task_result = ensure_ok_post_block!(
self.await_state_root_with_timeout(
&mut handle,
overlay_factory.clone(),
&hashed_state,
),
block
);
match task_result {
Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
let elapsed = root_time.elapsed();
info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished");
@@ -586,7 +584,7 @@ where
}
let (root, updates) = ensure_ok_post_block!(
self.compute_state_root_serial(overlay_factory.clone(), &hashed_state),
Self::compute_state_root_serial(overlay_factory.clone(), &hashed_state),
block
);
@@ -818,21 +816,18 @@ where
let tx = tx_result.map_err(BlockExecutionError::other)?;
let tx_signer = *<Tx as alloy_evm::RecoveredTx<InnerTx>>::signer(&tx);
let tx_hash = <Tx as alloy_evm::RecoveredTx<InnerTx>>::tx(&tx).tx_hash();
senders.push(tx_signer);
let span = debug_span!(
let _enter = debug_span!(
target: "engine::tree",
"execute tx",
?tx_hash,
gas_used = tracing::field::Empty,
);
let enter = span.entered();
)
.entered();
trace!(target: "engine::tree", "Executing transaction");
let tx_start = Instant::now();
let gas_used = executor.execute_transaction(tx)?;
executor.execute_transaction(tx)?;
self.metrics.record_transaction_execution(tx_start.elapsed());
let current_len = executor.receipts().len();
@@ -844,8 +839,6 @@ where
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
}
}
enter.record("gas_used", gas_used);
}
drop(exec_span);
@@ -874,7 +867,8 @@ where
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
let overlay_factory =
overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
ParallelStateRoot::new(overlay_factory, prefix_sets).incremental_root_with_updates()
ParallelStateRoot::new(overlay_factory, prefix_sets, self.runtime.clone())
.incremental_root_with_updates()
}
/// Compute state root for the given hashed post state in serial.
@@ -883,7 +877,6 @@ where
/// [`HashedPostState`] containing the changes of this block, to compute the state root and
/// trie updates for this block.
fn compute_state_root_serial(
&self,
overlay_factory: OverlayStateProviderFactory<P>,
hashed_state: &HashedPostState,
) -> ProviderResult<(B256, TrieUpdates)> {
@@ -901,6 +894,102 @@ where
.root_with_updates()?)
}
/// Awaits the state root from the background task, with an optional timeout fallback.
///
/// If a timeout is configured (`state_root_task_timeout`), this method first waits for the
/// state root task up to the timeout duration. If the task doesn't complete in time, a
/// sequential state root computation is spawned via `spawn_blocking`. Both computations
/// then race: the main thread polls the task receiver and the sequential result channel
/// in a loop, returning whichever finishes first.
///
/// If no timeout is configured, this simply awaits the state root task without any fallback.
///
/// Returns `ProviderResult<Result<...>>` where the outer `ProviderResult` captures
/// unrecoverable errors from the sequential fallback (e.g. DB errors), while the inner
/// `Result` captures parallel state root task errors that can still fall back to serial.
#[instrument(
level = "debug",
target = "engine::tree::payload_validator",
name = "await_state_root",
skip_all
)]
fn await_state_root_with_timeout<Tx, Err, R: Send + Sync + 'static>(
&self,
handle: &mut PayloadHandle<Tx, Err, R>,
overlay_factory: OverlayStateProviderFactory<P>,
hashed_state: &HashedPostState,
) -> ProviderResult<Result<StateRootComputeOutcome, ParallelStateRootError>> {
let Some(timeout) = self.config.state_root_task_timeout() else {
return Ok(handle.state_root());
};
let task_rx = handle.take_state_root_rx();
match task_rx.recv_timeout(timeout) {
Ok(result) => Ok(result),
Err(RecvTimeoutError::Disconnected) => {
Ok(Err(ParallelStateRootError::Other("sparse trie task dropped".to_string())))
}
Err(RecvTimeoutError::Timeout) => {
warn!(
target: "engine::tree::payload_validator",
?timeout,
"State root task timed out, spawning sequential fallback"
);
self.metrics.block_validation.state_root_task_timeout_total.increment(1);
let (seq_tx, seq_rx) =
std::sync::mpsc::channel::<ProviderResult<(B256, TrieUpdates)>>();
let seq_overlay = overlay_factory;
let seq_hashed_state = hashed_state.clone();
self.payload_processor.executor().spawn_blocking(move || {
let result = Self::compute_state_root_serial(seq_overlay, &seq_hashed_state);
let _ = seq_tx.send(result);
});
const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
loop {
match task_rx.recv_timeout(POLL_INTERVAL) {
Ok(result) => {
debug!(
target: "engine::tree::payload_validator",
source = "task",
"State root timeout race won"
);
return Ok(result);
}
Err(RecvTimeoutError::Disconnected) => {
debug!(
target: "engine::tree::payload_validator",
"State root task dropped, waiting for sequential fallback"
);
let result = seq_rx.recv().map_err(|_| {
ProviderError::other(std::io::Error::other(
"both state root computations failed",
))
})?;
let (state_root, trie_updates) = result?;
return Ok(Ok(StateRootComputeOutcome { state_root, trie_updates }));
}
Err(RecvTimeoutError::Timeout) => {}
}
if let Ok(result) = seq_rx.try_recv() {
debug!(
target: "engine::tree::payload_validator",
source = "sequential",
"State root timeout race won"
);
let (state_root, trie_updates) = result?;
return Ok(Ok(StateRootComputeOutcome { state_root, trie_updates }));
}
}
}
}
}
/// Compares trie updates from the state root task with serial state root computation.
///
/// This is used for debugging and validating the correctness of the parallel state root
@@ -915,7 +1004,7 @@ where
) {
debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");
match self.compute_state_root_serial(overlay_factory.clone(), hashed_state) {
match Self::compute_state_root_serial(overlay_factory.clone(), hashed_state) {
Ok((serial_root, serial_trie_updates)) => {
debug!(
target: "engine::tree::payload_validator",

View File

@@ -203,6 +203,7 @@ impl TestHarness {
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache.clone(),
reth_tasks::Runtime::test(),
);
let tree = EngineApiTreeHandler::new(
@@ -404,6 +405,7 @@ impl ValidatorTestHarness {
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache,
reth_tasks::Runtime::test(),
);
Self { harness, validator, metrics: TestMetrics::default() }

View File

@@ -22,6 +22,7 @@ reth-node-core.workspace = true
reth-node-ethereum.workspace = true
reth-node-metrics.workspace = true
reth-rpc-server-types.workspace = true
reth-tasks.workspace = true
reth-tracing.workspace = true
reth-node-api.workspace = true

View File

@@ -17,6 +17,7 @@ use reth_node_builder::{NodeBuilder, WithLaunchContext};
use reth_node_ethereum::{consensus::EthBeaconConsensus, EthEvmConfig, EthereumNode};
use reth_node_metrics::recorder::install_prometheus_recorder;
use reth_rpc_server_types::RpcModuleValidator;
use reth_tasks::RayonConfig;
use reth_tracing::{FileWorkerGuard, Layers};
use std::{fmt, sync::Arc};
@@ -153,6 +154,16 @@ where
Rpc::validate_selection(ws_api, "ws.api").map_err(|e| eyre!("{e}"))?;
}
let rayon_config = RayonConfig {
reserved_cpu_cores: command.engine.reserved_cpu_cores,
proof_storage_worker_threads: command.engine.storage_worker_count,
proof_account_worker_threads: command.engine.account_worker_count,
..Default::default()
};
let runner = CliRunner::try_with_runtime_config(
reth_tasks::RuntimeConfig::default().with_rayon(rayon_config),
)?;
runner.run_command_until_exit(|ctx| {
command.execute(ctx, FnLauncher::new::<C, Ext>(launcher))
})

View File

@@ -92,7 +92,7 @@ impl<
/// This accepts a closure that is used to launch the node via the
/// [`NodeCommand`](node::NodeCommand).
///
/// This command will be run on the [default tokio runtime](reth_cli_runner::tokio_runtime).
/// This command will be run on the default tokio runtime.
///
///
/// # Example
@@ -143,7 +143,7 @@ impl<
/// This accepts a closure that is used to launch the node via the
/// [`NodeCommand`](node::NodeCommand).
///
/// This command will be run on the [default tokio runtime](reth_cli_runner::tokio_runtime).
/// This command will be run on the default tokio runtime.
pub fn run_with_components<N>(
self,
components: impl CliComponentsBuilder<N>,

View File

@@ -273,6 +273,7 @@ where
gas_limit: payload.payload.gas_limit(),
basefee: payload.payload.saturated_base_fee_per_gas(),
blob_excess_gas_and_price,
slot_num: 0,
};
Ok(EvmEnv { cfg_env, block_env })

View File

@@ -108,8 +108,7 @@ impl<'a, DB: Database, I: Inspector<EthEvmContext<&'a mut State<DB>>>> BlockExec
result: ResultAndState::new(
ExecutionResult::Success {
reason: SuccessReason::Return,
gas_used: 0,
gas_refunded: 0,
gas: Default::default(),
logs: vec![],
output: Output::Call(Bytes::from(vec![])),
},

View File

@@ -112,4 +112,5 @@ test-utils = [
"reth-primitives-traits/test-utils",
"reth-evm-ethereum/test-utils",
"reth-stages-types/test-utils",
"reth-tasks/test-utils",
]

View File

@@ -107,26 +107,16 @@ impl EthereumNode {
/// use reth_chainspec::MAINNET;
/// use reth_node_ethereum::EthereumNode;
///
/// let factory = EthereumNode::provider_factory_builder()
/// .open_read_only(MAINNET.clone(), "datadir")
/// .unwrap();
/// fn demo(runtime: reth_tasks::Runtime) {
/// let factory = EthereumNode::provider_factory_builder()
/// .open_read_only(MAINNET.clone(), "datadir", runtime)
/// .unwrap();
/// }
/// ```
///
/// # Open a Providerfactory manually with all required components
///
/// ```no_run
/// use reth_chainspec::ChainSpecBuilder;
/// use reth_db::open_db_read_only;
/// use reth_node_ethereum::EthereumNode;
/// use reth_provider::providers::{RocksDBProvider, StaticFileProvider};
///
/// let factory = EthereumNode::provider_factory_builder()
/// .db(open_db_read_only("db", Default::default()).unwrap())
/// .chainspec(ChainSpecBuilder::mainnet().build().into())
/// .static_file(StaticFileProvider::read_only("db/static_files", false).unwrap())
/// .rocksdb_provider(RocksDBProvider::builder("db/rocksdb").build().unwrap())
/// .build_provider_factory();
/// ```
/// See also [`ProviderFactory::new`](reth_provider::ProviderFactory::new) for constructing
/// a [`ProviderFactory`](reth_provider::ProviderFactory) manually with all required
/// components.
pub fn provider_factory_builder() -> ProviderFactoryBuilder<Self> {
ProviderFactoryBuilder::default()
}
@@ -513,7 +503,7 @@ where
// it doesn't impact the first block or the first gossiped blob transaction, so we
// initialize this in the background
let kzg_settings = validator.validator().kzg_settings().clone();
ctx.task_executor().spawn_blocking(async move {
ctx.task_executor().spawn_blocking_task(async move {
let _ = kzg_settings.get();
debug!(target: "reth::cli", "Initialized KZG settings");
});

View File

@@ -10,7 +10,7 @@ use reth_ethereum_primitives::PooledTransactionVariant;
use reth_node_builder::{NodeBuilder, NodeHandle};
use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
use reth_node_ethereum::EthereumNode;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use reth_transaction_pool::TransactionPool;
use std::{
sync::Arc,
@@ -20,8 +20,7 @@ use std::{
#[tokio::test]
async fn can_handle_blobs() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
let chain_spec = Arc::new(
@@ -37,7 +36,7 @@ async fn can_handle_blobs() -> eyre::Result<()> {
.with_unused_ports()
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node(exec.clone())
.testing_node(runtime.clone())
.node(EthereumNode::default())
.launch()
.await?;
@@ -92,8 +91,7 @@ async fn can_handle_blobs() -> eyre::Result<()> {
#[tokio::test]
async fn can_send_legacy_sidecar_post_activation() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
let chain_spec = Arc::new(
@@ -107,7 +105,7 @@ async fn can_send_legacy_sidecar_post_activation() -> eyre::Result<()> {
.with_force_blob_sidecar_upcasting(),
);
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node(exec.clone())
.testing_node(runtime.clone())
.node(EthereumNode::default())
.launch()
.await?;
@@ -146,8 +144,7 @@ async fn can_send_legacy_sidecar_post_activation() -> eyre::Result<()> {
#[tokio::test]
async fn blob_conversion_at_osaka() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let current_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
// Osaka activates in 2 slots
@@ -170,7 +167,7 @@ async fn blob_conversion_at_osaka() -> eyre::Result<()> {
.with_force_blob_sidecar_upcasting(),
);
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node(exec.clone())
.testing_node(runtime.clone())
.node(EthereumNode::default())
.launch()
.await?;

View File

@@ -27,7 +27,7 @@ async fn can_run_eth_node_with_custom_genesis_number() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) =
let (mut nodes, wallet) =
setup::<EthereumNode>(1, chain_spec, false, eth_payload_attributes).await?;
let mut node = nodes.pop().unwrap();
@@ -81,7 +81,7 @@ async fn custom_genesis_block_query_boundaries() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, _wallet) =
let (mut nodes, _wallet) =
setup::<EthereumNode>(1, chain_spec, false, eth_payload_attributes).await?;
let node = nodes.pop().unwrap();

View File

@@ -9,20 +9,19 @@ use reth_node_core::args::DevArgs;
use reth_node_ethereum::{node::EthereumAddOns, EthereumNode};
use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions};
use reth_rpc_eth_api::{helpers::EthTransactions, EthApiServer};
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use std::sync::Arc;
#[tokio::test]
async fn can_run_dev_node() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let node_config = NodeConfig::test()
.with_chain(custom_chain())
.with_dev(DevArgs { dev: true, ..Default::default() });
let NodeHandle { node, .. } = NodeBuilder::new(node_config.clone())
.testing_node(exec.clone())
.testing_node(runtime.clone())
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
.with_components(EthereumNode::components())
.with_add_ons(EthereumAddOns::default())
@@ -37,15 +36,14 @@ async fn can_run_dev_node() -> eyre::Result<()> {
#[tokio::test]
async fn can_run_dev_node_custom_attributes() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let node_config = NodeConfig::test()
.with_chain(custom_chain())
.with_dev(DevArgs { dev: true, ..Default::default() });
let fee_recipient = Address::random();
let NodeHandle { node, .. } = NodeBuilder::new(node_config.clone())
.testing_node(exec.clone())
.testing_node(runtime.clone())
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
.with_components(EthereumNode::components())
.with_add_ons(EthereumAddOns::default())

View File

@@ -14,14 +14,14 @@ use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
use reth_node_ethereum::EthereumNode;
use reth_provider::BlockNumReader;
use reth_rpc_api::TestingBuildBlockRequestV1;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use std::sync::Arc;
#[tokio::test]
async fn can_run_eth_node() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let (mut nodes, _tasks, wallet) = setup::<EthereumNode>(
let (mut nodes, wallet) = setup::<EthereumNode>(
1,
Arc::new(
ChainSpecBuilder::default()
@@ -57,8 +57,7 @@ async fn can_run_eth_node() -> eyre::Result<()> {
#[cfg(unix)]
async fn can_run_eth_node_with_auth_engine_api_over_ipc() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let exec = TaskManager::current();
let exec = exec.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
// Chain spec with test allocs
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
@@ -76,7 +75,7 @@ async fn can_run_eth_node_with_auth_engine_api_over_ipc() -> eyre::Result<()> {
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http().with_auth_ipc());
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(exec)
.testing_node(runtime)
.node(EthereumNode::default())
.launch()
.await?;
@@ -105,8 +104,7 @@ async fn can_run_eth_node_with_auth_engine_api_over_ipc() -> eyre::Result<()> {
#[cfg(unix)]
async fn test_failed_run_eth_node_with_no_auth_engine_api_over_ipc_opts() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let exec = TaskManager::current();
let exec = exec.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
// Chain spec with test allocs
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
@@ -121,7 +119,7 @@ async fn test_failed_run_eth_node_with_no_auth_engine_api_over_ipc_opts() -> eyr
// Node setup
let node_config = NodeConfig::test().with_chain(chain_spec);
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(exec)
.testing_node(runtime)
.node(EthereumNode::default())
.launch()
.await?;
@@ -139,7 +137,7 @@ async fn test_failed_run_eth_node_with_no_auth_engine_api_over_ipc_opts() -> eyr
async fn test_engine_graceful_shutdown() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let (mut nodes, _tasks, wallet) = setup::<EthereumNode>(
let (mut nodes, wallet) = setup::<EthereumNode>(
1,
Arc::new(
ChainSpecBuilder::default()
@@ -190,8 +188,7 @@ async fn test_engine_graceful_shutdown() -> eyre::Result<()> {
#[tokio::test]
async fn test_testing_build_block_v1_osaka() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
let chain_spec = Arc::new(
@@ -208,7 +205,7 @@ async fn test_testing_build_block_v1_osaka() -> eyre::Result<()> {
);
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(exec)
.testing_node(runtime)
.node(EthereumNode::default())
.launch()
.await?;
@@ -278,7 +275,7 @@ async fn test_sparse_trie_reuse_across_blocks() -> eyre::Result<()> {
.with_sparse_trie_prune_depth(2)
.with_sparse_trie_max_storage_tries(100);
let (mut nodes, _tasks, _wallet) = setup_engine::<EthereumNode>(
let (mut nodes, _wallet) = setup_engine::<EthereumNode>(
1,
Arc::new(
ChainSpecBuilder::default()

View File

@@ -37,7 +37,7 @@ async fn can_handle_invalid_payload_then_valid() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
2,
chain_spec.clone(),
false,
@@ -154,7 +154,7 @@ async fn can_handle_multiple_invalid_payloads() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
2,
chain_spec.clone(),
false,
@@ -255,7 +255,7 @@ async fn can_handle_invalid_payload_with_transactions() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
2,
chain_spec.clone(),
false,

View File

@@ -18,7 +18,7 @@ use std::{sync::Arc, time::Duration};
async fn can_sync() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let (mut nodes, _tasks, wallet) = setup::<EthereumNode>(
let (mut nodes, wallet) = setup::<EthereumNode>(
2,
Arc::new(
ChainSpecBuilder::default()
@@ -74,7 +74,7 @@ async fn e2e_test_send_transactions() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, _) = setup_engine::<EthereumNode>(
let (mut nodes, _) = setup_engine::<EthereumNode>(
2,
chain_spec.clone(),
false,
@@ -116,7 +116,7 @@ async fn test_long_reorg() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, _) = setup_engine::<EthereumNode>(
let (mut nodes, _) = setup_engine::<EthereumNode>(
2,
chain_spec.clone(),
false,
@@ -172,7 +172,7 @@ async fn test_reorg_through_backfill() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, _) = setup_engine::<EthereumNode>(
let (mut nodes, _) = setup_engine::<EthereumNode>(
2,
chain_spec.clone(),
false,
@@ -236,7 +236,7 @@ async fn test_tx_propagation() -> eyre::Result<()> {
};
// Setup 10 nodes
let (mut nodes, _tasks, _) = setup_engine_with_connection::<EthereumNode>(
let (mut nodes, _) = setup_engine_with_connection::<EthereumNode>(
10,
chain_spec.clone(),
false,

View File

@@ -12,7 +12,7 @@ use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
use reth_node_ethereum::EthereumNode;
use reth_primitives_traits::Recovered;
use reth_provider::CanonStateSubscriptions;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use reth_transaction_pool::{
blobstore::InMemoryBlobStore, test_utils::OkValidator, BlockInfo, CoinbaseTipOrdering,
EthPooledTransaction, Pool, PoolTransaction, TransactionOrigin, TransactionPool,
@@ -24,8 +24,7 @@ use std::{sync::Arc, time::Duration};
#[tokio::test]
async fn maintain_txpool_stale_eviction() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let executor = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let txpool = Pool::new(
OkValidator::default(),
@@ -49,7 +48,7 @@ async fn maintain_txpool_stale_eviction() -> eyre::Result<()> {
.with_unused_ports()
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node(executor.clone())
.testing_node(runtime.clone())
.node(EthereumNode::default())
.launch()
.await?;
@@ -63,13 +62,13 @@ async fn maintain_txpool_stale_eviction() -> eyre::Result<()> {
..Default::default()
};
executor.spawn_critical(
runtime.spawn_critical_task(
"txpool maintenance task",
reth_transaction_pool::maintain::maintain_transaction_pool_future(
node.inner.provider.clone(),
txpool.clone(),
node.inner.provider.clone().canonical_state_stream(),
executor.clone(),
runtime.clone(),
config,
),
);
@@ -98,8 +97,7 @@ async fn maintain_txpool_stale_eviction() -> eyre::Result<()> {
#[tokio::test]
async fn maintain_txpool_reorg() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let executor = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let txpool = Pool::new(
OkValidator::default(),
@@ -124,7 +122,7 @@ async fn maintain_txpool_reorg() -> eyre::Result<()> {
.with_unused_ports()
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node(executor.clone())
.testing_node(runtime.clone())
.node(EthereumNode::default())
.launch()
.await?;
@@ -135,13 +133,13 @@ async fn maintain_txpool_reorg() -> eyre::Result<()> {
let w1 = wallets.first().unwrap();
let w2 = wallets.last().unwrap();
executor.spawn_critical(
runtime.spawn_critical_task(
"txpool maintenance task",
reth_transaction_pool::maintain::maintain_transaction_pool_future(
node.inner.provider.clone(),
txpool.clone(),
node.inner.provider.clone().canonical_state_stream(),
executor.clone(),
runtime.clone(),
reth_transaction_pool::maintain::MaintainPoolConfig::default(),
),
);
@@ -231,8 +229,7 @@ async fn maintain_txpool_reorg() -> eyre::Result<()> {
#[tokio::test]
async fn maintain_txpool_commit() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let executor = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let txpool = Pool::new(
OkValidator::default(),
@@ -256,7 +253,7 @@ async fn maintain_txpool_commit() -> eyre::Result<()> {
.with_unused_ports()
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node(executor.clone())
.testing_node(runtime.clone())
.node(EthereumNode::default())
.launch()
.await?;
@@ -265,13 +262,13 @@ async fn maintain_txpool_commit() -> eyre::Result<()> {
let wallet = Wallet::default();
executor.spawn_critical(
runtime.spawn_critical_task(
"txpool maintenance task",
reth_transaction_pool::maintain::maintain_transaction_pool_future(
node.inner.provider.clone(),
txpool.clone(),
node.inner.provider.clone().canonical_state_stream(),
executor.clone(),
runtime.clone(),
reth_transaction_pool::maintain::MaintainPoolConfig::default(),
),
);

View File

@@ -12,7 +12,7 @@ use reth_node_builder::{NodeBuilder, NodeHandle};
use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
use reth_node_ethereum::EthereumNode;
use reth_rpc_server_types::RpcModuleSelection;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use serde::Deserialize;
use std::sync::Arc;
@@ -29,8 +29,7 @@ async fn debug_trace_call_matches_geth_prestate_snapshot() -> Result<()> {
let mut genesis: Genesis = MAINNET.genesis().clone();
genesis.coinbase = address!("0x95222290dd7278aa3ddd389cc1e1d165cc4bafe5");
let exec = TaskManager::current();
let exec = exec.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let expected_frame = expected_snapshot_frame()?;
let prestate_mode = match &expected_frame {
@@ -63,7 +62,7 @@ async fn debug_trace_call_matches_geth_prestate_snapshot() -> Result<()> {
);
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(exec)
.testing_node(runtime)
.node(EthereumNode::default())
.launch()
.await?;

View File

@@ -21,7 +21,7 @@ use reth_node_core::{
use reth_node_ethereum::EthereumNode;
use reth_payload_primitives::BuiltPayload;
use reth_rpc_api::servers::AdminApiServer;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use std::{
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
@@ -57,7 +57,7 @@ async fn test_fee_history() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
1,
chain_spec.clone(),
false,
@@ -142,7 +142,7 @@ async fn test_flashbots_validate_v3() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
1,
chain_spec.clone(),
false,
@@ -224,7 +224,7 @@ async fn test_flashbots_validate_v4() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
1,
chain_spec.clone(),
false,
@@ -314,7 +314,7 @@ async fn test_eth_config() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
1,
chain_spec.clone(),
false,
@@ -344,8 +344,7 @@ async fn test_eth_config() -> eyre::Result<()> {
async fn test_admin_external_ip() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let exec = TaskManager::current();
let exec = exec.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
// Chain spec with test allocs
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
@@ -363,7 +362,7 @@ async fn test_admin_external_ip() -> eyre::Result<()> {
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(exec)
.testing_node(runtime)
.node(EthereumNode::default())
.launch()
.await?;

View File

@@ -142,7 +142,7 @@ async fn test_selfdestruct_post_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) =
let (mut nodes, wallet) =
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
.await?;
let mut node = nodes.pop().unwrap();
@@ -236,7 +236,7 @@ async fn test_selfdestruct_same_tx_post_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) =
let (mut nodes, wallet) =
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
.await?;
let mut node = nodes.pop().unwrap();
@@ -311,7 +311,7 @@ async fn test_selfdestruct_pre_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
1,
shanghai_spec(),
false,
@@ -421,7 +421,7 @@ async fn test_selfdestruct_same_tx_preexisting_account_post_dencun() -> eyre::Re
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) =
let (mut nodes, wallet) =
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
.await?;
let mut node = nodes.pop().unwrap();

View File

@@ -29,7 +29,7 @@ async fn test_simulate_v1_with_max_fee_per_blob_gas_only() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
1,
chain_spec.clone(),
false,

View File

@@ -11,7 +11,7 @@ use reth_node_builder::{EngineNodeLauncher, FullNodeComponents, NodeBuilder, Nod
use reth_node_ethereum::node::{EthereumAddOns, EthereumNode};
use reth_provider::providers::BlockchainProvider;
use reth_rpc_builder::Identity;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
#[test]
fn test_basic_setup() {
@@ -46,13 +46,13 @@ fn test_basic_setup() {
#[tokio::test]
async fn test_eth_launcher() {
let tasks = TaskManager::current();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let config = NodeConfig::test();
let db = create_test_rw_db();
let _builder =
NodeBuilder::new(config)
.with_database(db)
.with_launch_context(tasks.executor())
.with_launch_context(runtime.clone())
.with_types_and_provider::<EthereumNode, BlockchainProvider<
NodeTypesWithDBAdapter<EthereumNode, Arc<TempDatabase<DatabaseEnv>>>,
>>()
@@ -64,7 +64,7 @@ async fn test_eth_launcher() {
})
.launch_with_fn(|builder| {
let launcher = EngineNodeLauncher::new(
tasks.executor(),
runtime.clone(),
builder.config().datadir(),
Default::default(),
);
@@ -81,13 +81,13 @@ fn test_eth_launcher_with_tokio_runtime() {
let custom_rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
main_rt.block_on(async {
let tasks = TaskManager::current();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let config = NodeConfig::test();
let db = create_test_rw_db();
let _builder =
NodeBuilder::new(config)
.with_database(db)
.with_launch_context(tasks.executor())
.with_launch_context(runtime.clone())
.with_types_and_provider::<EthereumNode, BlockchainProvider<
NodeTypesWithDBAdapter<EthereumNode, Arc<TempDatabase<DatabaseEnv>>>,
>>()
@@ -101,7 +101,7 @@ fn test_eth_launcher_with_tokio_runtime() {
})
.launch_with_fn(|builder| {
let launcher = EngineNodeLauncher::new(
tasks.executor(),
runtime.clone(),
builder.config().datadir(),
Default::default(),
);

View File

@@ -13,14 +13,14 @@ use reth_node_core::{
use reth_node_ethereum::{node::EthereumAddOns, EthereumNode};
use reth_rpc_api::TestingBuildBlockRequestV1;
use reth_rpc_server_types::{RethRpcModule, RpcModuleSelection};
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use std::str::FromStr;
use tempfile::tempdir;
use tokio::sync::oneshot;
#[tokio::test(flavor = "multi_thread")]
async fn testing_rpc_build_block_works() -> eyre::Result<()> {
let tasks = TaskManager::current();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let mut rpc_args = reth_node_core::args::RpcServerArgs::default().with_http();
rpc_args.http_api = Some(RpcModuleSelection::from_iter([RethRpcModule::Testing]));
let tempdir = tempdir().expect("temp datadir");
@@ -41,7 +41,7 @@ async fn testing_rpc_build_block_works() -> eyre::Result<()> {
let builder = NodeBuilder::new(config)
.with_database(db)
.with_launch_context(tasks.executor())
.with_launch_context(runtime)
.with_types::<EthereumNode>()
.with_components(EthereumNode::components())
.with_add_ons(EthereumAddOns::default())

View File

@@ -100,6 +100,7 @@ test-utils = [
"reth-node-builder?/test-utils",
"reth-trie-db?/test-utils",
"reth-codecs?/test-utils",
"reth-tasks?/test-utils",
]
full = [

View File

@@ -1,4 +1,5 @@
use crate::{execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor, TxEnvFor};
use alloy_consensus::transaction::Either;
use alloy_evm::{block::ExecutableTxParts, RecoveredTx};
use rayon::prelude::*;
use reth_primitives_traits::TxTy;
@@ -21,10 +22,55 @@ pub trait ConfigureEngineEvm<ExecutionData>: ConfigureEvm {
) -> Result<impl ExecutableTxIterator<Self>, Self::Error>;
}
/// Converts a raw transaction into an executable transaction.
///
/// This trait abstracts the conversion logic (e.g., decoding, signature recovery) that is
/// parallelized in the engine.
pub trait ConvertTx<RawTx>: Send + Sync + 'static {
/// The executable transaction type.
type Tx;
/// Errors that may occur during conversion.
type Error;
/// Converts a raw transaction.
fn convert(&self, raw: RawTx) -> Result<Self::Tx, Self::Error>;
}
// Blanket impl so closures still work.
impl<F, RawTx, Tx, Err> ConvertTx<RawTx> for F
where
F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
{
type Tx = Tx;
type Error = Err;
fn convert(&self, raw: RawTx) -> Result<Tx, Err> {
self(raw)
}
}
impl<A, B, RA, RB> ConvertTx<Either<RA, RB>> for Either<A, B>
where
A: ConvertTx<RA>,
B: ConvertTx<RB>,
{
type Tx = Either<A::Tx, B::Tx>;
type Error = Either<A::Error, B::Error>;
fn convert(&self, raw: Either<RA, RB>) -> Result<Self::Tx, Self::Error> {
match (self, raw) {
(Self::Left(a), Either::Left(raw)) => {
a.convert(raw).map(Either::Left).map_err(Either::Left)
}
(Self::Right(b), Either::Right(raw)) => {
b.convert(raw).map(Either::Right).map_err(Either::Right)
}
_ => unreachable!(),
}
}
}
/// A helper trait representing a pair of a "raw" transactions iterator and a closure that can be
/// used to convert them to an executable transaction. This tuple is used in the engine to
/// parallelize heavy work like decoding or recovery.
pub trait ExecutableTxTuple: Into<(Self::IntoIter, Self::Convert)> + Send + 'static {
pub trait ExecutableTxTuple: Send + 'static {
/// Raw transaction that can be converted to an [`ExecutableTxTuple::Tx`]
///
/// This can be any type that can be converted to an [`ExecutableTxTuple::Tx`]. For example,
@@ -37,12 +83,16 @@ pub trait ExecutableTxTuple: Into<(Self::IntoIter, Self::Convert)> + Send + 'sta
/// Iterator over [`ExecutableTxTuple::Tx`].
type IntoIter: IntoParallelIterator<Item = Self::RawTx, Iter: IndexedParallelIterator>
+ IntoIterator<Item = Self::RawTx>
+ Send
+ 'static;
/// Closure that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
/// Converter that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
/// [`ExecutableTxTuple::Tx`]. This might involve heavy work like decoding or recovery
/// and will be parallelized in the engine.
type Convert: Fn(Self::RawTx) -> Result<Self::Tx, Self::Error> + Send + Sync + 'static;
type Convert: ConvertTx<Self::RawTx, Tx = Self::Tx, Error = Self::Error>;
/// Decomposes into the raw transaction iterator and converter.
fn into_parts(self) -> (Self::IntoIter, Self::Convert);
}
impl<RawTx, Tx, Err, I, F> ExecutableTxTuple for (I, F)
@@ -50,7 +100,10 @@ where
RawTx: Send + Sync + 'static,
Tx: Clone + Send + Sync + 'static,
Err: core::error::Error + Send + Sync + 'static,
I: IntoParallelIterator<Item = RawTx, Iter: IndexedParallelIterator> + Send + 'static,
I: IntoParallelIterator<Item = RawTx, Iter: IndexedParallelIterator>
+ IntoIterator<Item = RawTx>
+ Send
+ 'static,
F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
{
type RawTx = RawTx;
@@ -59,6 +112,10 @@ where
type IntoIter = I;
type Convert = F;
fn into_parts(self) -> (I, F) {
self
}
}
/// Iterator over executable transactions.
@@ -76,3 +133,72 @@ where
{
type Recovered = <T::Tx as ExecutableTxParts<TxEnvFor<Evm>, TxTy<Evm::Primitives>>>::Recovered;
}
/// Wraps `Either<L, R>` to implement both [`IntoParallelIterator`] and [`IntoIterator`],
/// mapping items through [`Either::Left`] / [`Either::Right`] on demand without collecting.
#[derive(Debug)]
pub struct EitherIter<L, R>(Either<L, R>);
impl<L, R> IntoParallelIterator for EitherIter<L, R>
where
L: IntoParallelIterator,
R: IntoParallelIterator,
L::Iter: IndexedParallelIterator,
R::Iter: IndexedParallelIterator,
{
type Item = Either<L::Item, R::Item>;
type Iter = Either<
rayon::iter::Map<L::Iter, fn(L::Item) -> Either<L::Item, R::Item>>,
rayon::iter::Map<R::Iter, fn(R::Item) -> Either<L::Item, R::Item>>,
>;
fn into_par_iter(self) -> Self::Iter {
match self.0 {
Either::Left(l) => Either::Left(l.into_par_iter().map(Either::Left)),
Either::Right(r) => Either::Right(r.into_par_iter().map(Either::Right)),
}
}
}
impl<L, R> IntoIterator for EitherIter<L, R>
where
L: IntoIterator,
R: IntoIterator,
{
type Item = Either<L::Item, R::Item>;
type IntoIter = Either<
core::iter::Map<L::IntoIter, fn(L::Item) -> Either<L::Item, R::Item>>,
core::iter::Map<R::IntoIter, fn(R::Item) -> Either<L::Item, R::Item>>,
>;
fn into_iter(self) -> Self::IntoIter {
match self.0 {
Either::Left(l) => Either::Left(l.into_iter().map(Either::Left)),
Either::Right(r) => Either::Right(r.into_iter().map(Either::Right)),
}
}
}
// SAFETY: `EitherIter` is just a newtype over `Either<L, R>`.
unsafe impl<L: Send, R: Send> Send for EitherIter<L, R> {}
impl<A: ExecutableTxTuple, B: ExecutableTxTuple> ExecutableTxTuple for Either<A, B> {
type RawTx = Either<A::RawTx, B::RawTx>;
type Tx = Either<A::Tx, B::Tx>;
type Error = Either<A::Error, B::Error>;
type IntoIter = EitherIter<A::IntoIter, B::IntoIter>;
type Convert = Either<A::Convert, B::Convert>;
fn into_parts(self) -> (Self::IntoIter, Self::Convert) {
match self {
Self::Left(a) => {
let (iter, convert) = a.into_parts();
(EitherIter(Either::Left(iter)), Either::Left(convert))
}
Self::Right(b) => {
let (iter, convert) = b.into_parts();
(EitherIter(Either::Right(iter)), Either::Right(convert))
}
}
}
}

View File

@@ -47,7 +47,7 @@ pub use aliases::*;
#[cfg(feature = "std")]
mod engine;
#[cfg(feature = "std")]
pub use engine::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
pub use engine::{ConfigureEngineEvm, ConvertTx, ExecutableTxIterator, ExecutableTxTuple};
#[cfg(feature = "metrics")]
pub mod metrics;

View File

@@ -55,7 +55,7 @@ use reth_provider::{
providers::{BlockchainProvider, RocksDBProvider, StaticFileProvider},
BlockReader, EthStorage, ProviderFactory,
};
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use reth_transaction_pool::test_utils::{testing_pool, TestPool};
use tempfile::TempDir;
use thiserror::Error;
@@ -175,8 +175,8 @@ pub struct TestExExHandle {
pub events_rx: UnboundedReceiver<ExExEvent>,
/// Channel for sending notifications to the Execution Extension
pub notifications_tx: Sender<ExExNotification>,
/// Node task manager
pub tasks: TaskManager,
/// Node task runtime
pub runtime: Runtime,
/// WAL temp directory handle
_wal_directory: TempDir,
}
@@ -252,6 +252,7 @@ pub async fn test_exex_context_with_chain_spec(
chain_spec.clone(),
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
RocksDBProvider::builder(rocksdb_dir.keep()).with_default_tables().build().unwrap(),
reth_tasks::Runtime::test(),
)?;
let genesis_hash = init_genesis(&provider_factory)?;
@@ -265,9 +266,9 @@ pub async fn test_exex_context_with_chain_spec(
)
.await?;
let network = network_manager.handle().clone();
let tasks = TaskManager::current();
let task_executor = tasks.executor();
tasks.executor().spawn(network_manager);
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let task_executor = runtime.clone();
runtime.spawn_task(network_manager);
let (_, payload_builder_handle) = NoopPayloadBuilderService::<EthEngineTypes>::new();
@@ -320,7 +321,7 @@ pub async fn test_exex_context_with_chain_spec(
provider_factory,
events_rx,
notifications_tx,
tasks,
runtime,
_wal_directory: wal_directory,
},
))

View File

@@ -83,4 +83,5 @@ test-utils = [
"reth-primitives-traits/test-utils",
"dep:reth-ethereum-primitives",
"reth-ethereum-primitives?/test-utils",
"reth-tasks/test-utils",
]

View File

@@ -86,7 +86,7 @@ impl<B: Block + 'static> TaskDownloader<B> {
downloader,
};
spawner.spawn(downloader.boxed());
spawner.spawn_task(downloader.boxed());
Self { from_downloader: ReceiverStream::new(bodies_rx), to_downloader }
}

View File

@@ -78,7 +78,7 @@ impl<H: Sealable + Send + Sync + Unpin + 'static> TaskDownloader<H> {
updates: UnboundedReceiverStream::new(updates_rx),
downloader,
};
spawner.spawn(downloader.boxed());
spawner.spawn_task(downloader.boxed());
Self { from_downloader: ReceiverStream::new(headers_rx), to_downloader }
}

View File

@@ -139,6 +139,7 @@ test-utils = [
"reth-ethereum-primitives/test-utils",
"dep:reth-evm-ethereum",
"reth-evm-ethereum?/test-utils",
"reth-tasks/test-utils",
]
[[bench]]

View File

@@ -24,9 +24,12 @@ use crate::{
import::{BlockImport, BlockImportEvent, BlockImportOutcome, BlockValidation, NewBlockEvent},
listener::ConnectionListener,
message::{NewBlockMessage, PeerMessage},
metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
metrics::{
BackedOffPeersMetrics, ClosedSessionsMetrics, DisconnectMetrics, NetworkMetrics,
PendingSessionFailureMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
},
network::{NetworkHandle, NetworkHandleMessage},
peers::PeersManager,
peers::{BackoffReason, PeersManager},
poll_nested_stream_with_budget,
protocol::IntoRlpxSubProtocol,
required_block_filter::RequiredBlockFilter,
@@ -139,6 +142,12 @@ pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
metrics: NetworkMetrics,
/// Disconnect metrics for the Network
disconnect_metrics: DisconnectMetrics,
/// Closed sessions metrics, split by direction.
closed_sessions_metrics: ClosedSessionsMetrics,
/// Pending session failure metrics, split by direction.
pending_session_failure_metrics: PendingSessionFailureMetrics,
/// Backed off peers metrics, split by reason.
backed_off_peers_metrics: BackedOffPeersMetrics,
}
impl NetworkManager {
@@ -354,6 +363,9 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
num_active_peers,
metrics: Default::default(),
disconnect_metrics: Default::default(),
closed_sessions_metrics: Default::default(),
pending_session_failure_metrics: Default::default(),
backed_off_peers_metrics: Default::default(),
})
}
@@ -860,13 +872,18 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
&peer_id,
err,
);
self.backed_off_peers_metrics.increment_for_reason(
BackoffReason::from_disconnect(err.as_disconnected()),
);
err.as_disconnected()
} else {
// Gracefully disconnected
self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
self.backed_off_peers_metrics
.increment_for_reason(BackoffReason::GracefulClose);
None
};
self.metrics.closed_sessions.increment(1);
self.closed_sessions_metrics.active.increment(1);
self.update_active_connection_metrics();
if let Some(reason) = reason {
@@ -891,7 +908,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
.state_mut()
.peers_mut()
.on_incoming_pending_session_dropped(remote_addr, err);
self.metrics.pending_session_failures.increment(1);
self.pending_session_failure_metrics.inbound.increment(1);
if let Some(reason) = err.as_disconnected() {
self.disconnect_metrics.increment(reason);
}
@@ -901,13 +918,10 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
.peers_mut()
.on_incoming_pending_session_gracefully_closed();
}
self.metrics.closed_sessions.increment(1);
self.closed_sessions_metrics.incoming_pending.increment(1);
self.metrics
.incoming_connections
.set(self.swarm.state().peers().num_inbound_connections() as f64);
self.metrics
.backed_off_peers
.set(self.swarm.state().peers().num_backed_off_peers() as f64);
}
SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
trace!(
@@ -924,7 +938,10 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
&peer_id,
err,
);
self.metrics.pending_session_failures.increment(1);
self.pending_session_failure_metrics.outbound.increment(1);
self.backed_off_peers_metrics.increment_for_reason(
BackoffReason::from_disconnect(err.as_disconnected()),
);
if let Some(reason) = err.as_disconnected() {
self.disconnect_metrics.increment(reason);
}
@@ -934,9 +951,8 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
.peers_mut()
.on_outgoing_pending_session_gracefully_closed(&peer_id);
}
self.metrics.closed_sessions.increment(1);
self.closed_sessions_metrics.outgoing_pending.increment(1);
self.update_pending_connection_metrics();
self.metrics
.backed_off_peers
.set(self.swarm.state().peers().num_backed_off_peers() as f64);
@@ -956,6 +972,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
&error,
);
self.backed_off_peers_metrics.increment_for_reason(BackoffReason::ConnectionError);
self.metrics
.backed_off_peers
.set(self.swarm.state().peers().num_backed_off_peers() as f64);

View File

@@ -22,12 +22,6 @@ pub struct NetworkMetrics {
/// Number of peers known to the node
pub(crate) tracked_peers: Gauge,
/// Cumulative number of failures of pending sessions
pub(crate) pending_session_failures: Counter,
/// Total number of sessions closed
pub(crate) closed_sessions: Counter,
/// Number of active incoming connections
pub(crate) incoming_connections: Gauge,
@@ -77,6 +71,68 @@ pub struct NetworkMetrics {
pub(crate) acc_duration_poll_swarm: Gauge,
}
/// Metrics for closed sessions, split by direction.
#[derive(Debug)]
pub struct ClosedSessionsMetrics {
/// Sessions closed from active (established) connections.
pub active: Counter,
/// Sessions closed from incoming pending connections.
pub incoming_pending: Counter,
/// Sessions closed from outgoing pending connections.
pub outgoing_pending: Counter,
}
impl Default for ClosedSessionsMetrics {
fn default() -> Self {
Self {
active: metrics::counter!("network_closed_sessions", "direction" => "active"),
incoming_pending: metrics::counter!("network_closed_sessions", "direction" => "incoming_pending"),
outgoing_pending: metrics::counter!("network_closed_sessions", "direction" => "outgoing_pending"),
}
}
}
/// Metrics for pending session failures, split by direction.
#[derive(Debug)]
pub struct PendingSessionFailureMetrics {
/// Failures on incoming pending sessions.
pub inbound: Counter,
/// Failures on outgoing pending sessions.
pub outbound: Counter,
}
impl Default for PendingSessionFailureMetrics {
fn default() -> Self {
Self {
inbound: metrics::counter!("network_pending_session_failures", "direction" => "inbound"),
outbound: metrics::counter!("network_pending_session_failures", "direction" => "outbound"),
}
}
}
/// Metrics for backed off peers, split by reason.
#[derive(Metrics)]
#[metrics(scope = "network.backed_off_peers")]
pub struct BackedOffPeersMetrics {
/// Peers backed off because they reported too many peers.
pub too_many_peers: Counter,
/// Peers backed off after a graceful session close.
pub graceful_close: Counter,
/// Peers backed off due to connection or protocol errors.
pub connection_error: Counter,
}
impl BackedOffPeersMetrics {
/// Increments the counter for the given backoff reason.
pub fn increment_for_reason(&self, reason: crate::peers::BackoffReason) {
match reason {
crate::peers::BackoffReason::TooManyPeers => self.too_many_peers.increment(1),
crate::peers::BackoffReason::GracefulClose => self.graceful_close.increment(1),
crate::peers::BackoffReason::ConnectionError => self.connection_error.increment(1),
}
}
}
/// Metrics for `SessionManager`
#[derive(Metrics)]
#[metrics(scope = "network")]

View File

@@ -1260,6 +1260,27 @@ impl Display for InboundConnectionError {
}
}
/// The reason a peer was backed off.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackoffReason {
/// The remote peer responded with `TooManyPeers` (0x04).
TooManyPeers,
/// The session was gracefully closed and we're backing off briefly.
GracefulClose,
/// A connection or protocol-level error occurred.
ConnectionError,
}
impl BackoffReason {
/// Derives the backoff reason from an optional [`DisconnectReason`].
pub const fn from_disconnect(reason: Option<DisconnectReason>) -> Self {
match reason {
Some(DisconnectReason::TooManyPeers) => Self::TooManyPeers,
_ => Self::ConnectionError,
}
}
}
#[cfg(test)]
mod tests {
use alloy_primitives::B512;

View File

@@ -229,7 +229,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
where
F: Future<Output = ()> + Send + 'static,
{
self.executor.spawn(f.boxed());
self.executor.spawn_task(f.boxed());
}
/// Invoked on a received status update.

View File

@@ -50,7 +50,7 @@ reth-rpc-eth-types.workspace = true
reth-rpc-layer.workspace = true
reth-stages.workspace = true
reth-static-file.workspace = true
reth-tasks.workspace = true
reth-tasks = { workspace = true, features = ["rayon"] }
reth-tokio-util.workspace = true
reth-tracing.workspace = true
reth-transaction-pool.workspace = true
@@ -120,6 +120,7 @@ test-utils = [
"reth-evm-ethereum/test-utils",
"reth-node-ethereum/test-utils",
"reth-primitives-traits/test-utils",
"reth-tasks/test-utils",
]
op = [
"reth-db/op",

View File

@@ -903,8 +903,8 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
.request_handler(self.provider().clone())
.split_with_handle();
self.executor.spawn_critical_blocking("p2p txpool", Box::pin(txpool));
self.executor.spawn_critical_blocking("p2p eth request handler", Box::pin(eth));
self.executor.spawn_critical_blocking_task("p2p txpool", Box::pin(txpool));
self.executor.spawn_critical_blocking_task("p2p eth request handler", Box::pin(eth));
let default_peers_path = self.config().datadir().known_peers();
let known_peers_file = self.config().network.persistent_peers_file(default_peers_path);

View File

@@ -324,7 +324,7 @@ mod test {
use reth_node_ethereum::EthereumNode;
use reth_payload_builder::PayloadBuilderHandle;
use reth_provider::noop::NoopProvider;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use reth_transaction_pool::noop::NoopTransactionPool;
#[test]
@@ -345,9 +345,7 @@ mod test {
let task_executor = {
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let manager = TaskManager::new(handle);
manager.executor()
Runtime::with_existing_handle(runtime.handle().clone()).unwrap()
};
let node = NodeAdapter { components, task_executor, provider: NoopProvider::default() };

View File

@@ -107,7 +107,8 @@ where
let (payload_service, payload_service_handle) =
PayloadBuilderService::new(payload_generator, ctx.provider().canonical_state_stream());
ctx.task_executor().spawn_critical("payload builder service", Box::pin(payload_service));
ctx.task_executor()
.spawn_critical_task("payload builder service", Box::pin(payload_service));
Ok(payload_service_handle)
}
@@ -133,7 +134,7 @@ where
) -> eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>> {
let (tx, mut rx) = mpsc::unbounded_channel();
ctx.task_executor().spawn_critical("payload builder", async move {
ctx.task_executor().spawn_critical_task("payload builder", async move {
#[allow(clippy::collection_is_never_read)]
let mut subscriptions = Vec::new();

View File

@@ -7,7 +7,8 @@ use reth_chainspec::EthereumHardforks;
use reth_node_api::{BlockTy, NodeTypes, TxTy};
use reth_transaction_pool::{
blobstore::DiskFileBlobStore, BlobStore, CoinbaseTipOrdering, PoolConfig, PoolTransaction,
SubPoolLimit, TransactionPool, TransactionValidationTaskExecutor, TransactionValidator,
SubPoolLimit, TransactionOrdering, TransactionPool, TransactionValidationTaskExecutor,
TransactionValidator,
};
use std::future::Future;
@@ -174,10 +175,31 @@ where
where
BS: BlobStore,
{
let ctx = self.ctx;
let transaction_pool = self.build(blob_store, pool_config);
// Spawn maintenance tasks using standalone functions
spawn_maintenance_tasks(ctx, transaction_pool.clone(), transaction_pool.config())?;
self.build_with_ordering_and_spawn_maintenance_task(
CoinbaseTipOrdering::default(),
blob_store,
pool_config,
)
}
/// Build the transaction pool with a custom [`TransactionOrdering`] and spawn its maintenance
/// tasks.
pub fn build_with_ordering_and_spawn_maintenance_task<BS, O>(
self,
ordering: O,
blob_store: BS,
pool_config: PoolConfig,
) -> eyre::Result<reth_transaction_pool::Pool<TransactionValidationTaskExecutor<V>, O, BS>>
where
BS: BlobStore,
O: TransactionOrdering<Transaction = V::Transaction>,
{
let TxPoolBuilder { ctx, validator, .. } = self;
let transaction_pool =
reth_transaction_pool::Pool::new(validator, ordering, blob_store, pool_config.clone());
spawn_maintenance_tasks(ctx, transaction_pool.clone(), &pool_config)?;
Ok(transaction_pool)
}
@@ -256,7 +278,7 @@ where
let chain_events = ctx.provider().canonical_state_stream();
let client = ctx.provider().clone();
ctx.task_executor().spawn_critical(
ctx.task_executor().spawn_critical_task(
"txpool maintenance task",
reth_transaction_pool::maintain::maintain_transaction_pool_future(
client,

View File

@@ -215,9 +215,7 @@ impl LaunchContext {
/// Configure global settings this includes:
///
/// - Raising the file descriptor limit
/// - Configuring the global rayon thread pool with available parallelism. Honoring
/// engine.reserved-cpu-cores to reserve given number of cores for O while using at least 1
/// core for the rayon thread pool
/// - Configuring the global rayon thread pool for implicit `par_iter` usage
pub fn configure_globals(&self, reserved_cpu_cores: usize) {
// Raise the fd limit of the process.
// Does not do anything on windows.
@@ -229,14 +227,12 @@ impl LaunchContext {
Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
}
// Reserving the given number of CPU cores for the rest of OS.
// Users can reserve more cores by setting engine.reserved-cpu-cores
// Note: The global rayon thread pool will use at least one core.
// Configure the implicit global rayon pool for `par_iter` usage.
let num_threads = available_parallelism()
.map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
if let Err(err) = ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(|i| format!("rayon-{i}"))
.thread_name(|i| format!("rayon-{i:02}"))
.build_global()
{
warn!(%err, "Failed to build global thread pool")
@@ -503,6 +499,7 @@ where
self.chain_spec(),
static_file_provider,
rocksdb_provider,
self.task_executor().clone(),
)?
.with_prune_modes(self.prune_modes())
.with_changeset_cache(changeset_cache);
@@ -558,7 +555,7 @@ where
let (tx, rx) = oneshot::channel();
// Pipeline should be run as blocking and panic if it fails.
self.task_executor().spawn_critical_blocking(
self.task_executor().spawn_critical_blocking_task(
"pipeline task",
Box::pin(async move {
let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
@@ -678,7 +675,8 @@ where
debug!(target: "reth::cli", "Spawning stages metrics listener task");
let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
self.task_executor()
.spawn_critical_task("stages metrics listener task", sync_metrics_listener);
LaunchContextWith {
inner: self.inner,
@@ -1105,7 +1103,7 @@ where
// If engine events are provided, spawn listener for new payload reporting
let ethstats_for_events = ethstats.clone();
let task_executor = self.task_executor().clone();
task_executor.spawn(Box::pin(async move {
task_executor.spawn_task(Box::pin(async move {
while let Some(event) = engine_events.next().await {
use reth_engine_primitives::ConsensusEngineEvent;
match event {
@@ -1131,7 +1129,7 @@ where
}));
// Spawn main ethstats service
task_executor.spawn(Box::pin(async move { ethstats.run().await }));
task_executor.spawn_task(Box::pin(async move { ethstats.run().await }));
Ok(())
}

View File

@@ -213,7 +213,7 @@ where
handle
.node
.task_executor
.spawn_critical("custom debug block provider consensus client", async move {
.spawn_critical_task("custom debug block provider consensus client", async move {
rpc_consensus_client.run().await
});
} else if let Some(url) = config.debug.rpc_consensus_url.clone() {
@@ -234,7 +234,7 @@ where
Arc::new(block_provider),
);
handle.node.task_executor.spawn_critical("rpc-ws consensus client", async move {
handle.node.task_executor.spawn_critical_task("rpc-ws consensus client", async move {
rpc_consensus_client.run().await
});
} else if let Some(maybe_custom_etherscan_url) = config.debug.etherscan.clone() {
@@ -262,9 +262,12 @@ where
handle.node.add_ons_handle.beacon_engine_handle.clone(),
Arc::new(block_provider),
);
handle.node.task_executor.spawn_critical("etherscan consensus client", async move {
rpc_consensus_client.run().await
});
handle
.node
.task_executor
.spawn_critical_task("etherscan consensus client", async move {
rpc_consensus_client.run().await
});
}
if config.dev.dev {
@@ -289,7 +292,7 @@ where
};
let dev_mining_mode = handle.node.config.dev_mining_mode(pool);
handle.node.task_executor.spawn_critical("local engine", async move {
handle.node.task_executor.spawn_critical_task("local engine", async move {
LocalMiner::new(
blockchain_db,
builder,

Some files were not shown because too many files have changed in this diff Show More