Compare commits

...

76 Commits

Author SHA1 Message Date
Brian Picciano
d6324d63e2 chore: release 1.11.3 2026-03-12 12:34:39 +01:00
Brian Picciano
5f3ade1bfe fix(trie): Reset proof v2 calculator on error (#22781)
Co-authored-by: Amp <amp@ampcode.com>
2026-03-12 10:09:18 +00:00
Derek Cofausper
b053f6fafe cherry-pick: fix don't produce both updates and removals for trie nodes (#22507)
Co-Authored-By: Arsenii Kulikov <62447812+klkvr@users.noreply.github.com>
2026-03-12 02:30:25 +00:00
Derek Cofausper
2a58e7a077 cherry-pick: install rayon panic handler (37f5b3a)
Co-Authored-By: Arsenii Kulikov <62447812+klkvr@users.noreply.github.com>
2026-03-12 02:30:17 +00:00
Emma Jamieson-Hoare
793a3d5fb3 fix missing import 2026-03-10 11:44:07 +00:00
Emma Jamieson-Hoare
89ae1af694 chore: upgrade to 1.11.2 2026-03-10 10:48:03 +00:00
Alexey Shekhirin
9c33fb5d45 fix(engine): reset execution cache hash on clear (#22895)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 10:46:09 +00:00
Alexey Shekhirin
bef3d7b4d1 fix lockfile 2026-02-23 18:36:44 +00:00
Emma Jamieson-Hoare
e918c17af9 chore: release 1.11.1
Amp-Thread-ID: https://ampcode.com/threads/T-019c8ba4-fd85-736b-9d2d-e878d350a91b
Co-authored-by: Amp <amp@ampcode.com>
2026-02-23 18:02:14 +00:00
Arsenii Kulikov
fcc170d53c fix: properly reveal trie nodes (#22415) 2026-02-23 17:58:13 +00:00
Arsenii Kulikov
c685842ba2 fix: overlay preparation on tokio (#22492) 2026-02-23 17:57:51 +00:00
Georgios Konstantopoulos
564ffa5868 fix(ci): pass docker tags as separate set entries in bake action (#22151)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 22:10:35 +00:00
Dan Cline
12891dd171 chore: allow invalid storage metadata (#22150) 2026-02-12 22:02:26 +00:00
Emma Jamieson-Hoare
c1015022f5 chore: release reth v1.11.0 (#22148) 2026-02-12 21:39:30 +00:00
Dan Cline
e3fe6326bc chore(storage): rm storage settings, use only one (#22042)
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-02-12 21:17:05 +00:00
Dan Cline
e3d520b24f feat(network): add inbound / outbound scopes for disconnect reasons (#22070) 2026-02-12 20:54:03 +00:00
Dan Cline
9f29939ea1 feat: bundle mdbx_copy as reth db copy subcommand (#22061)
Co-authored-by: Emma Jamieson-Hoare <emmajam@users.noreply.github.com>
2026-02-12 20:39:56 +00:00
Matthias Seitz
10881d1c73 chore: fix book (#22142) 2026-02-12 21:44:53 +01:00
John Letey
408593467b feat(download): optional chain-aware snapshot url (#22119) 2026-02-12 21:42:19 +01:00
Emma Jamieson-Hoare
8caf8cdf11 docs: improve reth.rs/overview page (#22131)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 20:10:34 +00:00
Georgios Konstantopoulos
1e8030ef28 fix(engine): return error on updates channel disconnect in sparse trie task (#22139)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 20:00:36 +00:00
YK
f72c503d6f feat(metrics): use 5M first gas bucket for finer-grained newPayload metrics (#22136)
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
2026-02-12 19:03:21 +00:00
Emma Jamieson-Hoare
42890e6e7f fix: improve nightly Docker build failure Slack notification (#22130)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 18:58:55 +00:00
Dan Cline
e30e441ada fix: stage drop prunes account/storage changeset static files (#22062) 2026-02-12 18:34:46 +00:00
Georgios Konstantopoulos
121160d248 refactor(db): use hashed state as canonical state representation (#21115)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-02-12 18:02:02 +00:00
Georgios Konstantopoulos
7ff78ca082 perf(engine): use transaction count threshold for prewarm skip (#22094)
Co-authored-by: yk <yongkang@tempo.xyz>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
Co-authored-by: YK <chiayongkang@hotmail.com>
2026-02-12 17:07:52 +00:00
Georgios Konstantopoulos
d7f56d509c chore: add DaniPopes as codeowner for tasks crate (#22128)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 12:08:02 -05:00
Georgios Konstantopoulos
3300e404cf feat(engine): add --engine.disable-sparse-trie-cache-pruning flag (#21967)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: mattsse <19890894+mattsse@users.noreply.github.com>
Co-authored-by: alexey <17802178+shekhirin@users.noreply.github.com>
Co-authored-by: Alexey Shekhirin <github@shekhirin.com>
2026-02-12 16:36:31 +00:00
Georgios Konstantopoulos
77cb99fc78 chore(node): update misleading consensus engine log message (#22124)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Emma Jamieson-Hoare <emmajam@users.noreply.github.com>
2026-02-12 16:14:03 +00:00
Georgios Konstantopoulos
66169c7e7c feat(reth-bench): add progress field to per-block benchmark logs (#22016)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 16:03:32 +00:00
Georgios Konstantopoulos
4f5fafc8f3 fix(net): correct EthMessageID::max for eth70 and later versions (#22076)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 15:53:11 +00:00
Georgios Konstantopoulos
0b8e6c6ed3 feat(net): enforce EIP-868 fork ID for discovered peers (#22013)
Co-authored-by: Emma <emma@tempo.xyz>
Co-authored-by: Matthias Seitz <mattsse@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Emma Jamieson-Hoare <ejamieson19@gmail.com>
Co-authored-by: Emma Jamieson-Hoare <emmajam@users.noreply.github.com>
2026-02-12 15:29:37 +00:00
Georgios Konstantopoulos
4a62d38af2 perf(engine): use sequential sig recovery for blocks with small blocks (#22077)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
Co-authored-by: YK <chiayongkang@hotmail.com>
2026-02-12 15:06:21 +00:00
Georgios Konstantopoulos
dc4f249f09 chore: zero-pad thread indices in thread names (#22113)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 12:45:49 +00:00
Brian Picciano
c915841a45 chore(stateless): Remove reth-stateless crate (#22115) 2026-02-12 11:20:49 +00:00
Georgios Konstantopoulos
217a337d8c chore(engine): remove biased select in engine service loop (#21961)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-02-12 05:45:45 +00:00
Georgios Konstantopoulos
74d57008b6 chore(engine): downgrade failed response delivery logs to warn (#22055)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 05:44:09 +00:00
Georgios Konstantopoulos
f8767bc678 fix(engine): add await_state_root span to timeout path (#22111)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 05:14:39 +00:00
Georgios Konstantopoulos
81c83bba68 refactor(engine): remove unnecessary turbofish on CachedStateProvider, add new_prewarm (#22107)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 02:48:57 +00:00
Georgios Konstantopoulos
cd8ec58703 refactor(engine): move CachedStateProvider prewarm to const generic (#22106)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 01:30:24 +00:00
DaniPopes
931b17c3fd chore: bump alloy-core deps (#22104) 2026-02-12 01:15:56 +00:00
Emma Jamieson-Hoare
807d328cf0 fix: move alloy-primitives to regular dependency in bin/reth (#22105)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 01:15:12 +00:00
Georgios Konstantopoulos
8a6bbd29fe fix(tracing): return error instead of panicking on log directory creation failure (#22100)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 00:40:39 +00:00
Georgios Konstantopoulos
8bedaaee71 feat(docker): include debug symbols in maxperf images (#22003)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 00:34:41 +00:00
Emma Jamieson-Hoare
09cd105671 fix(primitives): move feature-referenced deps from dev-dependencies to optional dependencies (#22103)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 23:50:56 +00:00
Georgios Konstantopoulos
a0b60b7e64 feat(evm): impl ExecutableTxTuple for Either via EitherTxIterator (#22102)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 23:48:17 +00:00
DaniPopes
90e15d096d perf: reduce tracing span noise in prewarm and proof workers (#22101) 2026-02-11 23:32:50 +00:00
Emma Jamieson-Hoare
a161ca294f feat(net): add reason label to backed_off_peers metric (#22009)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 23:00:20 +00:00
Emma Jamieson-Hoare
3a5c41e3da test: add WebSocket subscription integration tests for eth_subscribe (#22065)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 22:56:47 +00:00
Georgios Konstantopoulos
968d3c9534 revert: skip transaction prewarming for small blocks (#22059) (#22097) 2026-02-11 14:38:08 -08:00
DaniPopes
fc6666f6a7 perf: treat hashes as bytes in BranchNodeCompact (#22089) 2026-02-11 22:11:49 +00:00
DaniPopes
ff3a854326 perf: use dedicated trie rayon pool for proof workers (#22051) 2026-02-11 22:10:17 +00:00
DaniPopes
04543ed16b chore: add span and log to runtime build (#22064) 2026-02-11 22:06:14 +00:00
Emma Jamieson-Hoare
ae3f0d4d1a test: expand CLI integration tests (#22086)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 21:43:28 +00:00
Georgios Konstantopoulos
5bccdc4a5d feat(engine): add state root task timeout with sequential fallback (#22004)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 20:45:45 +00:00
Georgios Konstantopoulos
0b7cd60668 perf(engine): skip transaction prewarming for small blocks (#22059)
Co-authored-by: yk <yongkang@tempo.xyz>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 20:37:04 +00:00
YK
aa983b49af perf(engine): add PrewarmMode::Skipped to avoid spawning idle workers (#22066)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
2026-02-11 19:48:48 +00:00
Georgios Konstantopoulos
2aff617767 feat(cli): split account-history and storage-history stage drops (#22083)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 19:21:55 +00:00
Georgios Konstantopoulos
2c5d00ffb5 feat(engine): add gas bucket label to newPayload metrics (#22067)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 19:00:07 +00:00
Georgios Konstantopoulos
e2a3527414 test: add CLI integration tests for reth binary (#22069)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 17:56:16 +00:00
Georgios Konstantopoulos
e4cb3d3aed chore(cli): log received signals at info level (#22071)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 17:55:37 +00:00
DaniPopes
079b7b9d57 fix: don't drop node (#22063) 2026-02-11 16:43:55 +00:00
Georgios Konstantopoulos
8a25d7d3cf chore: remove ress crates from workspace (#22057)
Co-authored-by: mattsse <matt@paradigm.xyz>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-11 13:39:56 +00:00
Minhyuk Kim
a5ced84098 feat(node/builder): add build_with_ordering_and_spawn_maintenance_task to TxPoolBuilder (#21979) 2026-02-11 12:58:29 +00:00
Emma Jamieson-Hoare
59760a2fe3 feat(net): add direction labels to closed_sessions and pending_session_failures metrics (#22014)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:59:06 +00:00
Matthias Seitz
b9d21f293e refactor: remove TypesAnd1-5 staging types from ProviderFactoryBuilder (#22049)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:57:05 +00:00
Georgios Konstantopoulos
dec1cad318 refactor(trie): merge SparseTrieExt into SparseTrie trait (#22035)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:39:56 +00:00
Georgios Konstantopoulos
165b94c3fa chore(docker): pass RUSTC_WRAPPER to cargo build in Dockerfile.depot (#22048)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:37:43 +00:00
Georgios Konstantopoulos
69e4c06ae7 chore(log): simplify default profiler tracing filter (#22050)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:33:20 +00:00
Georgios Konstantopoulos
1406a984a7 ci: pass --no-fail-fast to all cargo nextest runs (#22046)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 04:21:38 +00:00
Andrey Kolishchak
93d6b9782c fix(node): ethstats conn/last_ping deadlock (#21463) 2026-02-11 03:48:54 +00:00
DaniPopes
68e4ff1f7d feat: global runtime (#21934) 2026-02-11 03:45:09 +00:00
Georgios Konstantopoulos
33467ea6dd fix(reth-bench): increase WS keepalive interval to match persistence timeout (#22039) 2026-02-11 02:45:54 +00:00
Georgios Konstantopoulos
3bf9280b3c refactor(storage): add with_*_opt builder methods to StorageSettings (#21998)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 02:19:33 +00:00
Georgios Konstantopoulos
5c93986e6d feat(reth-bench): accept bare integers as milliseconds for --wait-time (#22038)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 01:57:42 +00:00
Georgios Konstantopoulos
779e0eb8bb perf: downgrade on_hashed_state_update and on_prewarm_targets spans to trace (#22044)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-10 22:45:05 +00:00
302 changed files with 9305 additions and 8133 deletions

View File

@@ -0,0 +1,5 @@
---
reth-network: minor
---
Added reason label to backed_off_peers metric. The metric now tracks backed off peers by reason (too_many_peers, graceful_close, connection_error) to improve observability.

View File

@@ -0,0 +1,5 @@
---
ef-tests: patch
---
Removed reth-stateless crate and stateless validation from ef-tests.

View File

@@ -0,0 +1,4 @@
---
---
Added WebSocket subscription integration tests for eth_subscribe.

View File

@@ -0,0 +1,4 @@
---
---
Improved nightly Docker build failure Slack notification with more detailed formatting and context.

View File

@@ -0,0 +1,7 @@
---
reth: patch
reth-cli-commands: patch
reth-node-core: patch
---
Removed experimental ress protocol support for stateless Ethereum nodes.

View File

@@ -0,0 +1,5 @@
---
reth-node-builder: patch
---
Removed biased select in engine service loop to allow fair scheduling of shutdown requests alongside event processing.

View File

@@ -0,0 +1,4 @@
---
---
Improved documentation overview page with better structure and clarity.

View File

@@ -0,0 +1,5 @@
---
reth-node-events: patch
---
Updated consensus engine log message to be more accurate about received updates.

View File

@@ -0,0 +1,9 @@
---
reth-network-api: minor
reth-network-types: minor
reth-network: minor
reth-node-core: minor
reth: minor
---
Added optional ENR fork ID enforcement to filter out peers from incompatible networks during peer discovery, controlled by the `--enforce-enr-fork-id` CLI flag.

View File

@@ -0,0 +1,5 @@
---
reth-primitives: patch
---
Moved feature-referenced dependencies from dev-dependencies to optional dependencies to ensure they are available when their corresponding features are enabled.

View File

@@ -0,0 +1,5 @@
---
reth-provider: patch
---
Removed unused staging types from ProviderFactoryBuilder.

View File

@@ -0,0 +1,5 @@
---
reth-trie-sparse: patch
---
Fixed a bug where trie nodes could appear in both `updated_nodes` and `removed_nodes` simultaneously by removing entries from `removed_nodes` when a node is inserted as updated.

View File

@@ -0,0 +1,4 @@
---
---
Expanded CLI integration tests with subcommand help coverage, config TOML validation, genesis JSON validation, and send transaction round-trip test for dev mode.

View File

@@ -0,0 +1,5 @@
---
reth-network: minor
---
Added direction labels to `closed_sessions` and `pending_session_failures` metrics. Operators can now distinguish session closures and failures by direction (`active`, `incoming_pending`, `outgoing_pending` for closed sessions; `inbound`, `outbound` for pending session failures).

View File

@@ -0,0 +1,5 @@
---
reth-trie: patch
---
Fixed a potential panic in `ProofCalculator` by clearing internal computation state (`branch_stack`, `child_stack`, `branch_path`, etc.) after errors, preventing stale state from causing `usize` underflow panics when the calculator is reused. Added a test verifying correct behavior after simulated mid-computation errors.

2
.github/CODEOWNERS vendored
View File

@@ -38,7 +38,7 @@ crates/storage/libmdbx-rs/ @shekhirin
crates/storage/nippy-jar/ @joshieDo @shekhirin
crates/storage/provider/ @joshieDo @shekhirin @yongkangc
crates/storage/storage-api/ @joshieDo
crates/tasks/ @mattsse
crates/tasks/ @mattsse @DaniPopes
crates/tokio-util/ @mattsse
crates/tracing/ @mattsse @shekhirin
crates/tracing-otlp/ @mattsse @Rjected

View File

@@ -27,7 +27,6 @@ crates_to_check=(
reth-ethereum-forks
reth-ethereum-primitives
reth-ethereum-consensus
reth-stateless
)
any_failed=0

View File

@@ -63,6 +63,7 @@ exclude_crates=(
reth-provider # tokio
reth-prune # tokio
reth-prune-static-files # reth-provider
reth-tasks # tokio rt-multi-thread
reth-stages-api # reth-provider, reth-prune
reth-static-file # tokio
reth-transaction-pool # c-kzg

View File

@@ -70,18 +70,27 @@ jobs:
# Add 'latest' tag for non-RC releases
if [[ ! "$VERSION" =~ -rc ]]; then
echo "ethereum_tags=${REGISTRY}/reth:${VERSION},${REGISTRY}/reth:latest" >> "$GITHUB_OUTPUT"
{
echo "ethereum_set<<EOF"
echo "ethereum.tags=${REGISTRY}/reth:${VERSION}"
echo "ethereum.tags=${REGISTRY}/reth:latest"
echo "EOF"
} >> "$GITHUB_OUTPUT"
else
echo "ethereum_tags=${REGISTRY}/reth:${VERSION}" >> "$GITHUB_OUTPUT"
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:${VERSION}" >> "$GITHUB_OUTPUT"
fi
elif [[ "${{ github.event_name }}" == "schedule" ]] || [[ "${{ inputs.build_type }}" == "nightly" ]]; then
echo "targets=nightly" >> "$GITHUB_OUTPUT"
echo "ethereum_tags=${REGISTRY}/reth:nightly" >> "$GITHUB_OUTPUT"
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:nightly" >> "$GITHUB_OUTPUT"
else
# git-sha build
echo "targets=ethereum" >> "$GITHUB_OUTPUT"
echo "ethereum_tags=${REGISTRY}/reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
fi
- name: Build and push images
@@ -97,7 +106,7 @@ jobs:
targets: ${{ steps.params.outputs.targets }}
push: ${{ !(github.event_name == 'workflow_dispatch' && inputs.dry_run) }}
set: |
ethereum.tags=${{ steps.params.outputs.ethereum_tags }}
${{ steps.params.outputs.ethereum_set }}
- name: Verify image architectures
env:
@@ -117,6 +126,18 @@ jobs:
- name: Slack Webhook Action
uses: rtCamp/action-slack-notify@v2
env:
SLACK_COLOR: ${{ job.status }}
SLACK_MESSAGE: "Failed run: https://github.com/paradigmxyz/reth/actions/runs/${{ github.run_id }}"
SLACK_COLOR: danger
SLACK_ICON_EMOJI: ":rotating_light:"
SLACK_USERNAME: "GitHub Actions"
SLACK_TITLE: ":rotating_light: Nightly Docker Build Failed"
SLACK_MESSAGE: |
The scheduled nightly Docker build failed.
*Commit:* `${{ github.sha }}`
*Branch:* `${{ github.ref_name }}`
*Run:* <https://github.com/paradigmxyz/reth/actions/runs/${{ github.run_id }}|View logs>
*Action required:* Re-run the workflow or investigate the build failure.
SLACK_FOOTER: "paradigmxyz/reth · docker.yml"
MSG_MINIMAL: true
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_URL }}

View File

@@ -35,6 +35,7 @@ jobs:
- name: Run e2e tests
run: |
cargo nextest run \
--no-fail-fast \
--locked --features "asm-keccak" \
--workspace \
--exclude 'example-*' \
@@ -61,6 +62,7 @@ jobs:
- name: Run RocksDB e2e tests
run: |
cargo nextest run \
--no-fail-fast \
--locked --features "edge" \
-p reth-e2e-test-utils \
-E 'binary(rocksdb)'

View File

@@ -46,6 +46,7 @@ jobs:
- name: Run tests
run: |
cargo nextest run \
--no-fail-fast \
--locked --features "asm-keccak ${{ matrix.network }} ${{ matrix.storage == 'edge' && 'edge' || '' }}" \
--workspace --exclude ef-tests \
-E "kind(test) and not binary(e2e_testsuite)"
@@ -76,4 +77,4 @@ jobs:
with:
cache-on-failure: true
- name: run era1 files integration tests
run: cargo nextest run --release --package reth-era --test it -- --ignored
run: cargo nextest run --no-fail-fast --release --package reth-era --test it -- --ignored

View File

@@ -49,6 +49,7 @@ jobs:
- name: Run tests
run: |
cargo nextest run \
--no-fail-fast \
--features "${{ matrix.features }} $EDGE_FEATURES" --locked \
${{ matrix.exclude_args }} --workspace \
--exclude ef-tests --no-tests=warn \
@@ -87,7 +88,7 @@ jobs:
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- run: cargo nextest run --cargo-profile hivetests -p ef-tests --features "asm-keccak ef-tests"
- run: cargo nextest run --no-fail-fast --cargo-profile hivetests -p ef-tests --features "asm-keccak ef-tests"
doc:
name: doc tests

677
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
[workspace.package]
version = "1.10.2"
version = "1.11.3"
edition = "2024"
rust-version = "1.88"
license = "MIT OR Apache-2.0"
@@ -83,8 +83,6 @@ members = [
"crates/prune/db",
"crates/prune/prune",
"crates/prune/types",
"crates/ress/protocol",
"crates/ress/provider",
"crates/revm/",
"crates/rpc/ipc/",
"crates/rpc/rpc-api/",
@@ -101,7 +99,6 @@ members = [
"crates/stages/api/",
"crates/stages/stages/",
"crates/stages/types/",
"crates/stateless",
"crates/static-file/static-file",
"crates/static-file/types/",
"crates/storage/codecs/",
@@ -309,6 +306,11 @@ inherits = "release"
lto = "fat"
codegen-units = 1
[profile.maxperf-symbols]
inherits = "maxperf"
debug = "full"
strip = "none"
[profile.reproducible]
inherits = "release"
panic = "abort"
@@ -417,7 +419,6 @@ reth-rpc-convert = { path = "crates/rpc/rpc-convert" }
reth-stages = { path = "crates/stages/stages" }
reth-stages-api = { path = "crates/stages/api" }
reth-stages-types = { path = "crates/stages/types", default-features = false }
reth-stateless = { path = "crates/stateless", default-features = false }
reth-static-file = { path = "crates/static-file/static-file" }
reth-static-file-types = { path = "crates/static-file/types", default-features = false }
reth-storage-api = { path = "crates/storage/storage-api", default-features = false }
@@ -434,8 +435,6 @@ reth-trie-db = { path = "crates/trie/db" }
reth-trie-parallel = { path = "crates/trie/parallel" }
reth-trie-sparse = { path = "crates/trie/sparse", default-features = false }
reth-zstd-compressors = { path = "crates/storage/zstd-compressors", default-features = false }
reth-ress-protocol = { path = "crates/ress/protocol" }
reth-ress-provider = { path = "crates/ress/provider" }
# revm
revm = { version = "34.0.0", default-features = false }
@@ -449,15 +448,19 @@ op-revm = { version = "15.0.0", default-features = false }
revm-inspectors = "0.34.2"
# eth
alloy-dyn-abi = "1.5.4"
alloy-primitives = { version = "1.5.4", default-features = false, features = ["map-foldhash"] }
alloy-sol-types = { version = "1.5.4", default-features = false }
alloy-dyn-abi = "1.5.6"
alloy-primitives = { version = "1.5.6", default-features = false, features = [
"map-foldhash",
] }
alloy-sol-types = { version = "1.5.6", default-features = false }
alloy-chains = { version = "0.2.5", default-features = false }
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.3.0", default-features = false }
alloy-evm = { version = "0.27.2", default-features = false }
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
alloy-rlp = { version = "0.3.13", default-features = false, features = [
"core-net",
] }
alloy-trie = { version = "0.9.4", default-features = false }
alloy-hardforks = "0.4.5"
@@ -469,10 +472,15 @@ alloy-genesis = { version = "1.6.3", default-features = false }
alloy-json-rpc = { version = "1.6.3", default-features = false }
alloy-network = { version = "1.6.3", default-features = false }
alloy-network-primitives = { version = "1.6.3", default-features = false }
alloy-provider = { version = "1.6.3", features = ["reqwest", "debug-api"], default-features = false }
alloy-provider = { version = "1.6.3", features = [
"reqwest",
"debug-api",
], default-features = false }
alloy-pubsub = { version = "1.6.3", default-features = false }
alloy-rpc-client = { version = "1.6.3", default-features = false }
alloy-rpc-types = { version = "1.6.3", features = ["eth"], default-features = false }
alloy-rpc-types = { version = "1.6.3", features = [
"eth",
], default-features = false }
alloy-rpc-types-admin = { version = "1.6.3", default-features = false }
alloy-rpc-types-anvil = { version = "1.6.3", default-features = false }
alloy-rpc-types-beacon = { version = "1.6.3", default-features = false }
@@ -486,7 +494,9 @@ alloy-serde = { version = "1.6.3", default-features = false }
alloy-signer = { version = "1.6.3", default-features = false }
alloy-signer-local = { version = "1.6.3", default-features = false }
alloy-transport = { version = "1.6.3" }
alloy-transport-http = { version = "1.6.3", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-http = { version = "1.6.3", features = [
"reqwest-rustls-tls",
], default-features = false }
alloy-transport-ipc = { version = "1.6.3", default-features = false }
alloy-transport-ws = { version = "1.6.3", default-features = false }
@@ -505,7 +515,10 @@ either = { version = "1.15.0", default-features = false }
arrayvec = { version = "0.7.6", default-features = false }
aquamarine = "0.6"
auto_impl = "1"
backon = { version = "1.2", default-features = false, features = ["std-blocking-sleep", "tokio-sleep"] }
backon = { version = "1.2", default-features = false, features = [
"std-blocking-sleep",
"tokio-sleep",
] }
bincode = "1.3"
bitflags = "2.4"
boyer-moore-magiclen = "0.2.16"
@@ -527,9 +540,13 @@ itertools = { version = "0.14", default-features = false }
linked_hash_set = "0.1"
lz4 = "1.28.1"
modular-bitfield = "0.13.1"
notify = { version = "8.0.0", default-features = false, features = ["macos_fsevent"] }
notify = { version = "8.0.0", default-features = false, features = [
"macos_fsevent",
] }
nybbles = { version = "0.4.8", default-features = false }
once_cell = { version = "1.19", default-features = false, features = ["critical-section"] }
once_cell = { version = "1.19", default-features = false, features = [
"critical-section",
] }
parking_lot = "0.12"
paste = "1.0"
rand = "0.9"
@@ -548,7 +565,9 @@ strum_macros = "0.27"
syn = "2.0"
thiserror = { version = "2.0.0", default-features = false }
tar = "0.4.44"
tracing = { version = "0.1.0", default-features = false }
tracing = { version = "0.1.0", default-features = false, features = [
"attributes",
] }
tracing-appender = "0.2"
url = { version = "2.3", default-features = false }
zstd = "0.13"
@@ -586,7 +605,11 @@ futures-util = { version = "0.3", default-features = false }
hyper = "1.3"
hyper-util = "0.1.5"
pin-project = "1.0.12"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "rustls-tls-native-roots", "stream"] }
reqwest = { version = "0.12", default-features = false, features = [
"rustls-tls",
"rustls-tls-native-roots",
"stream",
] }
tracing-futures = "0.2"
tower = "0.5"
tower-http = "0.6"
@@ -611,7 +634,10 @@ proptest-arbitrary-interop = "0.1.0"
# crypto
enr = { version = "0.13", default-features = false }
k256 = { version = "0.13", default-features = false, features = ["ecdsa"] }
secp256k1 = { version = "0.30", default-features = false, features = ["global-context", "recovery"] }
secp256k1 = { version = "0.30", default-features = false, features = [
"global-context",
"recovery",
] }
# rand 8 for secp256k1
rand_08 = { package = "rand", version = "0.8" }

View File

@@ -51,7 +51,8 @@ RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
SCCACHE_WEBDAV_ENDPOINT=https://cache.depot.dev SCCACHE_DIR=/sccache sccache --start-server && \
export RUSTC_WRAPPER=sccache SCCACHE_WEBDAV_ENDPOINT=https://cache.depot.dev SCCACHE_DIR=/sccache && \
sccache --start-server && \
if [ -n "$RUSTFLAGS" ]; then \
export RUSTFLAGS="$RUSTFLAGS"; \
elif [ "$TARGETPLATFORM" = "linux/amd64" ]; then \

View File

@@ -153,7 +153,7 @@ COV_FILE := lcov.info
.PHONY: test-unit
test-unit: ## Run unit tests.
cargo install cargo-nextest --locked
cargo nextest run $(UNIT_TEST_ARGS)
cargo nextest run --no-fail-fast $(UNIT_TEST_ARGS)
.PHONY: cov-unit
@@ -186,7 +186,7 @@ $(EEST_TESTS_DIR):
.PHONY: ef-tests
ef-tests: $(EF_TESTS_DIR) $(EEST_TESTS_DIR) ## Runs Legacy and EEST tests.
cargo nextest run -p ef-tests --release --features ef-tests
cargo nextest run --no-fail-fast -p ef-tests --release --features ef-tests
##@ reth-bench

View File

@@ -32,7 +32,7 @@ Otherwise, running `make maxperf` at the root of the repo should be sufficient f
The `reth-bench new-payload-fcu` command is the most representative of ethereum mainnet live sync, alternating between sending `engine_newPayload` calls and `engine_forkchoiceUpdated` calls.
The `new-payload-fcu` command supports two optional waiting modes that can be used together or independently:
- `--wait-time <duration>`: Fixed sleep interval between blocks (e.g., `--wait-time 100ms`)
- `--wait-time <duration>`: Fixed sleep interval between blocks (e.g., `--wait-time 100ms` or `--wait-time 400` for 400ms)
- `--wait-for-persistence`: Waits for blocks to be persisted using the `reth_subscribePersistedBlock` subscription
When using `--wait-for-persistence`, the benchmark waits after every `(threshold + 1)` blocks, where the threshold defaults to the engine's persistence threshold (2). This can be customized with `--persistence-threshold <N>`.

View File

@@ -192,6 +192,15 @@ impl Command {
parent_header = block.header;
parent_hash = block_hash;
blocks_processed += 1;
let progress = match mode {
RampMode::Blocks(total) => format!("{blocks_processed}/{total}"),
RampMode::TargetGasLimit(target) => {
let pct = (parent_header.gas_limit as f64 / target as f64 * 100.0).min(100.0);
format!("{pct:.1}%")
}
};
info!(target: "reth-bench", progress, block_number = parent_header.number, gas_limit = parent_header.gas_limit, "Block processed");
}
let final_gas_limit = parent_header.gas_limit;

View File

@@ -2,7 +2,10 @@
use crate::valid_payload::call_forkchoice_updated;
use eyre::Result;
use std::io::{BufReader, Read};
use std::{
io::{BufReader, Read},
time::Duration,
};
/// Read input from either a file path or stdin.
pub(crate) fn read_input(path: Option<&str>) -> Result<String> {
@@ -51,6 +54,22 @@ pub(crate) fn parse_gas_limit(s: &str) -> eyre::Result<u64> {
let base: u64 = num_str.trim().parse()?;
base.checked_mul(multiplier).ok_or_else(|| eyre::eyre!("value overflow"))
}
/// Parses a duration string, treating bare integers as milliseconds.
///
/// Accepts either a `humantime` duration string (e.g. `"100ms"`, `"2s"`) or a plain
/// integer which is interpreted as milliseconds (e.g. `"400"` → 400ms).
pub(crate) fn parse_duration(s: &str) -> eyre::Result<Duration> {
match humantime::parse_duration(s) {
Ok(d) => Ok(d),
Err(_) => {
let millis: u64 =
s.trim().parse().map_err(|_| eyre::eyre!("invalid duration: {s:?}"))?;
Ok(Duration::from_millis(millis))
}
}
}
use alloy_consensus::Header;
use alloy_eips::eip4844::kzg_to_versioned_hash;
use alloy_primitives::{Address, B256};
@@ -270,4 +289,24 @@ mod tests {
assert!(parse_gas_limit("G").is_err());
assert!(parse_gas_limit("-1G").is_err());
}
#[test]
fn test_parse_duration_with_unit() {
assert_eq!(parse_duration("100ms").unwrap(), Duration::from_millis(100));
assert_eq!(parse_duration("2s").unwrap(), Duration::from_secs(2));
assert_eq!(parse_duration("1m").unwrap(), Duration::from_secs(60));
}
#[test]
fn test_parse_duration_bare_millis() {
assert_eq!(parse_duration("400").unwrap(), Duration::from_millis(400));
assert_eq!(parse_duration("0").unwrap(), Duration::from_millis(0));
assert_eq!(parse_duration("1000").unwrap(), Duration::from_millis(1000));
}
#[test]
fn test_parse_duration_errors() {
assert!(parse_duration("abc").is_err());
assert!(parse_duration("").is_err());
}
}

View File

@@ -12,6 +12,7 @@
use crate::{
bench::{
context::BenchContext,
helpers::parse_duration,
output::{
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
},
@@ -25,7 +26,6 @@ use alloy_provider::Provider;
use alloy_rpc_types_engine::ForkchoiceState;
use clap::Parser;
use eyre::{Context, OptionExt};
use humantime::parse_duration;
use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_core::args::BenchmarkArgs;
@@ -40,6 +40,9 @@ pub struct Command {
rpc_url: String,
/// How long to wait after a forkchoice update before sending the next payload.
///
/// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
/// milliseconds (e.g. `400`).
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
wait_time: Option<Duration>,
@@ -117,7 +120,7 @@ impl Command {
self.benchmark.ws_rpc_url.as_deref(),
&self.benchmark.engine_rpc_url,
)?;
let sub = setup_persistence_subscription(ws_url).await?;
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
Some(PersistenceWaiter::with_duration_and_subscription(
duration,
sub,
@@ -131,7 +134,7 @@ impl Command {
self.benchmark.ws_rpc_url.as_deref(),
&self.benchmark.engine_rpc_url,
)?;
let sub = setup_persistence_subscription(ws_url).await?;
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
Some(PersistenceWaiter::with_subscription(
sub,
self.persistence_threshold,
@@ -150,6 +153,7 @@ impl Command {
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
let buffer_size = self.rpc_block_buffer_size;
// Use a oneshot channel to propagate errors from the spawned task
@@ -203,6 +207,7 @@ impl Command {
});
let mut results = Vec::new();
let mut blocks_processed = 0u64;
let total_benchmark_duration = Instant::now();
let mut total_wait_time = Duration::ZERO;
@@ -246,8 +251,13 @@ impl Command {
// Exclude time spent waiting on the block prefetch channel from the benchmark duration.
// We want to measure engine throughput, not RPC fetch latency.
blocks_processed += 1;
let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
info!(target: "reth-bench", %combined_result);
let progress = match total_blocks {
Some(total) => format!("{blocks_processed}/{total}"),
None => format!("{blocks_processed}"),
};
info!(target: "reth-bench", progress, %combined_result);
if let Some(w) = &mut waiter {
w.on_block(block_number).await?;

View File

@@ -52,6 +52,7 @@ impl Command {
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
let buffer_size = self.rpc_block_buffer_size;
// Use a oneshot channel to propagate errors from the spawned task
@@ -82,8 +83,8 @@ impl Command {
}
});
// put results in a summary vec so they can be printed at the end
let mut results = Vec::new();
let mut blocks_processed = 0u64;
let total_benchmark_duration = Instant::now();
let mut total_wait_time = Duration::ZERO;
@@ -105,7 +106,12 @@ impl Command {
call_new_payload(&auth_provider, version, params).await?;
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
info!(target: "reth-bench", %new_payload_result);
blocks_processed += 1;
let progress = match total_blocks {
Some(total) => format!("{blocks_processed}/{total}"),
None => format!("{blocks_processed}"),
};
info!(target: "reth-bench", progress, %new_payload_result);
// current duration since the start of the benchmark minus the time
// waiting for blocks

View File

@@ -154,12 +154,18 @@ impl PersistenceSubscription {
}
/// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`.
///
/// The `keepalive_interval` is set to match `persistence_timeout` so that the `WebSocket`
/// connection is not dropped during long MDBX commits that block the server from responding
/// to pings.
pub(crate) async fn setup_persistence_subscription(
ws_url: Url,
persistence_timeout: Duration,
) -> eyre::Result<PersistenceSubscription> {
info!(target: "reth-bench", "Connecting to WebSocket at {} for persistence subscription", ws_url);
let ws_connect = WsConnect::new(ws_url.to_string());
let ws_connect =
WsConnect::new(ws_url.to_string()).with_keepalive_interval(persistence_timeout);
let client = RpcClient::connect_pubsub(ws_connect)
.await
.wrap_err("Failed to connect to WebSocket RPC endpoint")?;

View File

@@ -14,6 +14,7 @@
use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::{
helpers::parse_duration,
output::{
write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
TotalGasOutput, TotalGasRow,
@@ -30,7 +31,6 @@ use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
use clap::Parser;
use eyre::Context;
use humantime::parse_duration;
use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_api::EngineApiMessageVersion;
@@ -78,6 +78,9 @@ pub struct Command {
output: Option<PathBuf>,
/// How long to wait after a forkchoice update before sending the next payload.
///
/// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
/// milliseconds (e.g. `400`).
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
wait_time: Option<Duration>,
@@ -166,7 +169,7 @@ impl Command {
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
(Some(duration), true) => {
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
let sub = setup_persistence_subscription(ws_url).await?;
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
Some(PersistenceWaiter::with_duration_and_subscription(
duration,
sub,
@@ -177,7 +180,7 @@ impl Command {
(Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
(None, true) => {
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
let sub = setup_persistence_subscription(ws_url).await?;
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
Some(PersistenceWaiter::with_subscription(
sub,
self.persistence_threshold,
@@ -338,7 +341,8 @@ impl Command {
};
let current_duration = total_benchmark_duration.elapsed();
info!(target: "reth-bench", %combined_result);
let progress = format!("{}/{}", i + 1, payloads.len());
info!(target: "reth-bench", progress, %combined_result);
if let Some(w) = &mut waiter {
w.on_block(block_number).await?;

View File

@@ -20,6 +20,19 @@ impl BenchMode {
}
}
/// Returns the total number of blocks in the benchmark, if known.
///
/// For [`BenchMode::Range`] this is the length of the range.
/// For [`BenchMode::Continuous`] the total is unbounded, so `None` is returned.
pub const fn total_blocks(&self) -> Option<u64> {
match self {
Self::Continuous(_) => None,
Self::Range(range) => {
Some(range.end().saturating_sub(*range.start()).saturating_add(1))
}
}
}
/// Create a [`BenchMode`] from optional `from` and `to` fields.
pub fn new(from: Option<u64>, to: Option<u64>, latest_block: u64) -> Result<Self, eyre::Error> {
// If neither `--from` nor `--to` are provided, we will run the benchmark continuously,

View File

@@ -33,7 +33,6 @@ reth-chainspec.workspace = true
reth-primitives.workspace = true
reth-db = { workspace = true, features = ["mdbx"] }
reth-provider.workspace = true
reth-evm.workspace = true
reth-revm.workspace = true
reth-transaction-pool.workspace = true
reth-cli-runner.workspace = true
@@ -53,32 +52,31 @@ reth-payload-primitives.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
reth-ethereum-payload-builder.workspace = true
reth-ethereum-primitives.workspace = true
reth-node-ethereum.workspace = true
reth-node-builder.workspace = true
reth-node-metrics.workspace = true
reth-consensus.workspace = true
reth-tokio-util.workspace = true
reth-ress-protocol.workspace = true
reth-ress-provider.workspace = true
# alloy
alloy-primitives.workspace = true
alloy-rpc-types = { workspace = true, features = ["engine"] }
# tracing
tracing.workspace = true
# async
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
# misc
aquamarine.workspace = true
clap = { workspace = true, features = ["derive", "env"] }
eyre.workspace = true
[dev-dependencies]
alloy-node-bindings = "1.6.3"
alloy-provider = { workspace = true, features = ["reqwest"] }
alloy-rpc-types-eth.workspace = true
backon.workspace = true
serde_json.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
toml.workspace = true
[features]
default = [
@@ -115,10 +113,12 @@ asm-keccak = [
"reth-primitives/asm-keccak",
"reth-ethereum-cli/asm-keccak",
"reth-node-ethereum/asm-keccak",
"alloy-primitives/asm-keccak",
]
keccak-cache-global = [
"reth-node-core/keccak-cache-global",
"reth-node-ethereum/keccak-cache-global",
"alloy-primitives/keccak-cache-global",
]
jemalloc = [
"reth-cli-util/jemalloc",

View File

@@ -15,7 +15,6 @@
//! - `asm-keccak`: Replaces the default, pure-Rust implementation of Keccak256 with one implemented
//! in assembly; see [the `keccak-asm` crate](https://github.com/DaniPopes/keccak-asm) for more
//! details and supported targets.
//! - `min-debug-logs`: Disables all logs below `debug` level.
//!
//! ### Allocator Features
//!
@@ -52,6 +51,9 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]
// Used in feature flags only (`asm-keccak`, `keccak-cache-global`)
use alloy_primitives as _;
pub mod cli;
/// Re-exported utils.
@@ -206,12 +208,9 @@ pub mod rpc {
}
}
/// Ress subprotocol installation.
pub mod ress;
// re-export for convenience
#[doc(inline)]
pub use reth_cli_runner::{tokio_runtime, CliContext, CliRunner};
pub use reth_cli_runner::{CliContext, CliRunner};
// for rendering diagrams
use aquamarine as _;
@@ -219,3 +218,4 @@ use aquamarine as _;
// used in main
use clap as _;
use reth_cli_util as _;
use tracing as _;

View File

@@ -8,9 +8,8 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne
static MALLOC_CONF: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
use clap::Parser;
use reth::{args::RessArgs, cli::Cli, ress::install_ress_subprotocol};
use reth::cli::Cli;
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_node_builder::NodeHandle;
use reth_node_ethereum::EthereumNode;
use tracing::info;
@@ -22,27 +21,12 @@ fn main() {
unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
}
if let Err(err) =
Cli::<EthereumChainSpecParser, RessArgs>::parse().run(async move |builder, ress_args| {
info!(target: "reth::cli", "Launching node");
let NodeHandle { node, node_exit_future } =
builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
if let Err(err) = Cli::<EthereumChainSpecParser>::parse().run(async move |builder, _| {
info!(target: "reth::cli", "Launching node");
let handle = builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
// Install ress subprotocol.
if ress_args.enabled {
install_ress_subprotocol(
ress_args,
node.provider,
node.evm_config,
node.network,
node.task_executor,
node.add_ons_handle.engine_events.new_listener(),
)?;
}
node_exit_future.await
})
{
handle.wait_for_node_exit().await
}) {
eprintln!("Error: {err:?}");
std::process::exit(1);
}

View File

@@ -1,67 +0,0 @@
use reth_ethereum_primitives::EthPrimitives;
use reth_evm::ConfigureEvm;
use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
use reth_network_api::FullNetwork;
use reth_node_api::ConsensusEngineEvent;
use reth_node_core::args::RessArgs;
use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes};
use reth_ress_protocol::{NodeType, ProtocolState, RessProtocolHandler};
use reth_ress_provider::{maintain_pending_state, PendingState, RethRessProtocolProvider};
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventStream;
use tokio::sync::mpsc;
use tracing::*;
/// Install `ress` subprotocol if it's enabled.
pub fn install_ress_subprotocol<P, E, N>(
args: RessArgs,
provider: BlockchainProvider<P>,
evm_config: E,
network: N,
task_executor: TaskExecutor,
engine_events: EventStream<ConsensusEngineEvent<EthPrimitives>>,
) -> eyre::Result<()>
where
P: ProviderNodeTypes<Primitives = EthPrimitives>,
E: ConfigureEvm<Primitives = EthPrimitives> + Clone + 'static,
N: FullNetwork + NetworkProtocols,
{
info!(target: "reth::cli", "Installing ress subprotocol");
let pending_state = PendingState::default();
// Spawn maintenance task for pending state.
task_executor.spawn(maintain_pending_state(
engine_events,
provider.clone(),
pending_state.clone(),
));
let (tx, mut rx) = mpsc::unbounded_channel();
let provider = RethRessProtocolProvider::new(
provider,
evm_config,
Box::new(task_executor.clone()),
args.max_witness_window,
args.witness_max_parallel,
args.witness_cache_size,
pending_state,
)?;
network.add_rlpx_sub_protocol(
RessProtocolHandler {
provider,
node_type: NodeType::Stateful,
peers_handle: network.peers_handle().clone(),
max_active_connections: args.max_active_connections,
state: ProtocolState::new(tx),
}
.into_rlpx_sub_protocol(),
);
info!(target: "reth::cli", "Ress subprotocol support enabled");
task_executor.spawn(async move {
while let Some(event) = rx.recv().await {
trace!(target: "reth::ress", ?event, "Received ress event");
}
});
Ok(())
}

255
bin/reth/tests/it/main.rs Normal file
View File

@@ -0,0 +1,255 @@
#![allow(missing_docs)]
use std::process::Command;
const RETH: &str = env!("CARGO_BIN_EXE_reth");
// ── Helpers ──────────────────────────────────────────────────────────────────
/// Runs `reth <args>` and returns stdout, asserting exit code 0.
///
/// Tracing is suppressed via `RUST_LOG=off` so that log lines emitted during
/// binary startup don't pollute stdout-based assertions.
#[track_caller]
fn reth_ok(args: &[&str]) -> String {
let output = Command::new(RETH).env("RUST_LOG", "off").args(args).output().unwrap();
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(output.status.success(), "args {args:?} failed.\nstdout: {stdout}\nstderr: {stderr}");
stdout.into_owned()
}
/// Spawns an isolated dev-mode reth node.
///
/// Discovery is disabled and peer limits are zeroed so the node is fully
/// isolated. Each call gets a unique temporary data directory so that
/// concurrent test runs never collide on the default `reth/dev/` path.
fn spawn_dev() -> (alloy_node_bindings::RethInstance, tempfile::TempDir) {
use alloy_node_bindings::Reth;
let datadir = tempfile::tempdir().expect("failed to create temp dir");
let instance = Reth::at(RETH)
.dev()
.disable_discovery()
.data_dir(datadir.path())
.args(["--max-outbound-peers", "0", "--max-inbound-peers", "0"])
.spawn();
// Return the TempDir alongside the instance so it lives as long as the node.
(instance, datadir)
}
// ── Original tests (from PR #22069) ──────────────────────────────────────────
#[test]
fn help() {
let stdout = reth_ok(&["--help"]);
assert!(stdout.contains("Usage"), "stdout: {stdout}");
assert!(stdout.contains("node"), "stdout: {stdout}");
}
#[test]
fn version() {
let stdout = reth_ok(&["--version"]);
assert!(stdout.to_lowercase().contains("reth"), "stdout: {stdout}");
}
#[test]
fn node_help() {
let stdout = reth_ok(&["node", "--help"]);
assert!(stdout.contains("--dev"), "stdout: {stdout}");
assert!(stdout.contains("--http"), "stdout: {stdout}");
}
#[test]
fn unknown_subcommand() {
let output = Command::new(RETH).arg("definitely-not-a-cmd").output().unwrap();
assert!(!output.status.success());
}
#[test]
fn unknown_flag() {
let output = Command::new(RETH).args(["node", "--no-such-flag"]).output().unwrap();
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(!output.status.success());
assert!(stderr.contains("--no-such-flag"), "stderr: {stderr}");
}
#[tokio::test]
async fn dev_node_eth_syncing() {
use alloy_provider::{Provider, ProviderBuilder};
let (reth, _datadir) = spawn_dev();
let provider = ProviderBuilder::new().connect_http(reth.endpoint().parse().unwrap());
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let _syncing = provider.syncing().await.expect("eth_syncing failed");
}
// ── Subcommand --help coverage ───────────────────────────────────────────────
//
// Every registered subcommand must produce valid --help output. This catches
// clap wiring regressions (e.g. a missing field, a conflicting arg name, or a
// broken `help_message()` call) that would otherwise only surface when a user
// runs the command.
#[test]
fn init_help() {
let stdout = reth_ok(&["init", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn init_state_help() {
let stdout = reth_ok(&["init-state", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn import_help() {
let stdout = reth_ok(&["import", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn import_era_help() {
let stdout = reth_ok(&["import-era", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn export_era_help() {
let stdout = reth_ok(&["export-era", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn dump_genesis_help() {
let stdout = reth_ok(&["dump-genesis", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn db_help() {
let stdout = reth_ok(&["db", "--help"]);
assert!(stdout.contains("stats"), "stdout: {stdout}");
}
#[test]
fn stage_help() {
let stdout = reth_ok(&["stage", "--help"]);
assert!(stdout.contains("run"), "stdout: {stdout}");
}
#[test]
fn p2p_help() {
let stdout = reth_ok(&["p2p", "--help"]);
assert!(stdout.contains("header"), "stdout: {stdout}");
}
#[test]
fn config_help() {
let stdout = reth_ok(&["config", "--help"]);
assert!(stdout.contains("--default"), "stdout: {stdout}");
}
#[test]
fn prune_help() {
let stdout = reth_ok(&["prune", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn download_help() {
let stdout = reth_ok(&["download", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn re_execute_help() {
let stdout = reth_ok(&["re-execute", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
// ── `config --default` outputs valid TOML ────────────────────────────────────
#[test]
fn config_default_valid_toml() {
let stdout = reth_ok(&["config", "--default"]);
let parsed: toml::Value =
toml::from_str(&stdout).expect("config --default did not produce valid TOML");
// The default config must contain the [stages] table — this is the heart of
// the pipeline configuration and its absence would indicate a serialization
// regression.
assert!(parsed.get("stages").is_some(), "missing [stages] in config output");
}
// ── `dump-genesis` outputs valid JSON ────────────────────────────────────────
#[test]
fn dump_genesis_mainnet_valid_json() {
let stdout = reth_ok(&["dump-genesis"]);
let genesis: serde_json::Value =
serde_json::from_str(&stdout).expect("dump-genesis did not produce valid JSON");
assert!(genesis.get("nonce").is_some(), "missing nonce in genesis JSON");
assert!(genesis.get("alloc").is_some(), "missing alloc in genesis JSON");
}
#[test]
fn dump_genesis_sepolia_valid_json() {
let stdout = reth_ok(&["dump-genesis", "--chain", "sepolia"]);
let genesis: serde_json::Value = serde_json::from_str(&stdout)
.expect("dump-genesis --chain sepolia did not produce valid JSON");
assert!(genesis.get("alloc").is_some(), "missing alloc in sepolia genesis JSON");
}
// ── Dev node: send transaction round-trip ────────────────────────────────────
//
// Exercises the full pipeline: RPC submission → mempool → sealing → execution →
// receipt retrieval. Uses the pre-funded dev account so no genesis customization
// is required.
#[tokio::test]
async fn dev_node_send_tx_and_mine() {
use alloy_primitives::{Address, U256};
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_types_eth::TransactionRequest;
let (reth, _datadir) = spawn_dev();
let provider = ProviderBuilder::new().connect_http(reth.endpoint().parse().unwrap());
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// Dev mode pre-funds the first dev account.
let accounts = provider.get_accounts().await.expect("eth_accounts failed");
assert!(!accounts.is_empty(), "dev node should expose at least one account");
let sender = accounts[0];
let recipient = Address::with_last_byte(0x42);
let tx = TransactionRequest::default().from(sender).to(recipient).value(U256::from(1_000_000));
let tx_hash = provider.send_transaction(tx).await.expect("eth_sendTransaction failed");
// In dev/instant-mine mode the node seals a block for each transaction, so
// the receipt becomes available almost immediately.
let receipt = tx_hash.get_receipt().await.expect("failed to get receipt");
assert!(receipt.status(), "transaction should have succeeded");
assert_eq!(receipt.to, Some(recipient));
assert!(receipt.block_number.unwrap() > 0, "receipt should be in a mined block");
// Verify the transfer actually mutated state.
let balance = provider.get_balance(recipient).await.expect("eth_getBalance failed");
assert_eq!(balance, U256::from(1_000_000));
}
const fn main() {}

View File

@@ -312,6 +312,11 @@ impl DeferredTrieData {
/// Given that invariant, circular wait dependencies are impossible.
#[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
pub fn wait_cloned(&self) -> ComputedTrieData {
#[cfg(feature = "rayon")]
debug_assert!(
rayon::current_thread_index().is_none(),
"wait_cloned must not be called from a rayon worker thread"
);
let mut state = self.state.lock();
match &mut *state {
// If the deferred trie data is ready, return the cached result.

View File

@@ -1061,6 +1061,14 @@ mod tests {
) -> ProviderResult<Option<StorageValue>> {
Ok(None)
}
fn storage_by_hashed_key(
&self,
_address: Address,
_hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
Ok(None)
}
}
impl BytecodeReader for MockStateProvider {

View File

@@ -223,6 +223,26 @@ impl<N: NodePrimitives> StateProvider for MemoryOverlayStateProviderRef<'_, N> {
self.historical.storage(address, storage_key)
}
fn storage_by_hashed_key(
&self,
address: Address,
hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
let hashed_address = keccak256(address);
let state = &self.trie_input().state;
if let Some(hs) = state.storages.get(&hashed_address) {
if let Some(value) = hs.storage.get(&hashed_storage_key) {
return Ok(Some(*value));
}
if hs.wiped {
return Ok(Some(StorageValue::ZERO));
}
}
self.historical.storage_by_hashed_key(address, hashed_storage_key)
}
}
impl<N: NodePrimitives> BytecodeReader for MemoryOverlayStateProviderRef<'_, N> {

View File

@@ -66,7 +66,8 @@ pub trait RethCli: Sized {
F: FnOnce(Self, CliRunner) -> R,
{
let cli = Self::parse_args()?;
let runner = CliRunner::try_default_runtime()?;
let runner = CliRunner::try_default_runtime()
.map_err(|e| Error::raw(clap::error::ErrorKind::Io, e))?;
Ok(cli.with_runner(f, runner))
}

View File

@@ -19,7 +19,7 @@ use reth_node_builder::{
Node, NodeComponents, NodeComponentsBuilder, NodeTypes, NodeTypesWithDBAdapter,
};
use reth_node_core::{
args::{DatabaseArgs, DatadirArgs, RocksDbArgs, StaticFilesArgs, StorageArgs},
args::{DatabaseArgs, DatadirArgs, StaticFilesArgs, StorageArgs},
dirs::{ChainPath, DataDirPath},
};
use reth_provider::{
@@ -67,70 +67,35 @@ pub struct EnvironmentArgs<C: ChainSpecParser> {
#[command(flatten)]
pub static_files: StaticFilesArgs,
/// All `RocksDB` related arguments
#[command(flatten)]
pub rocksdb: RocksDbArgs,
/// Storage mode configuration (v2 vs v1/legacy)
#[command(flatten)]
pub storage: StorageArgs,
}
impl<C: ChainSpecParser> EnvironmentArgs<C> {
/// Returns the effective storage settings derived from `--storage.v2`, static-file, and
/// `RocksDB` CLI args.
/// Returns the effective storage settings derived from `--storage.v2`.
///
/// The base storage mode is determined by `--storage.v2`:
/// - When `--storage.v2` is set: uses [`StorageSettings::v2()`] defaults
/// - Otherwise: uses [`StorageSettings::v1()`] defaults
///
/// Individual `--static-files.*` and `--rocksdb.*` flags override the base when explicitly set.
/// - Otherwise: uses [`StorageSettings::base()`] defaults
pub fn storage_settings(&self) -> StorageSettings {
let mut s = if self.storage.v2 { StorageSettings::v2() } else { StorageSettings::base() };
// Apply static files overrides (only when explicitly set)
if let Some(v) = self.static_files.receipts {
s = s.with_receipts_in_static_files(v);
if self.storage.v2 {
StorageSettings::v2()
} else {
StorageSettings::base()
}
if let Some(v) = self.static_files.transaction_senders {
s = s.with_transaction_senders_in_static_files(v);
}
if let Some(v) = self.static_files.account_changesets {
s = s.with_account_changesets_in_static_files(v);
}
if let Some(v) = self.static_files.storage_changesets {
s = s.with_storage_changesets_in_static_files(v);
}
// Apply rocksdb overrides
// --rocksdb.all sets all rocksdb flags to true
if self.rocksdb.all {
s = s
.with_transaction_hash_numbers_in_rocksdb(true)
.with_storages_history_in_rocksdb(true)
.with_account_history_in_rocksdb(true);
}
// Individual rocksdb flags override --rocksdb.all when explicitly set
if let Some(v) = self.rocksdb.tx_hash {
s = s.with_transaction_hash_numbers_in_rocksdb(v);
}
if let Some(v) = self.rocksdb.storages_history {
s = s.with_storages_history_in_rocksdb(v);
}
if let Some(v) = self.rocksdb.account_history {
s = s.with_account_history_in_rocksdb(v);
}
s
}
/// Initializes environment according to [`AccessRights`] and returns an instance of
/// [`Environment`].
///
/// Internally builds a [`reth_tasks::Runtime`] attached to the current tokio handle for
/// parallel storage I/O.
pub fn init<N: CliNodeTypes>(&self, access: AccessRights) -> eyre::Result<Environment<N>>
where
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
{
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
let data_dir = self.datadir.clone().resolve_datadir(self.chain.chain());
let db_path = data_dir.db();
let sf_path = data_dir.static_files();
@@ -186,7 +151,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
.build()?;
let provider_factory =
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access)?;
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access, runtime)?;
if access.is_read_write() {
debug!(target: "reth::cli", chain=%self.chain.chain(), genesis=?self.chain.genesis_hash(), "Initializing genesis");
init_genesis_with_settings(&provider_factory, self.storage_settings())?;
@@ -207,6 +172,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
static_file_provider: StaticFileProvider<N::Primitives>,
rocksdb_provider: RocksDBProvider,
access: AccessRights,
runtime: reth_tasks::Runtime,
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>>>
where
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
@@ -217,6 +183,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
self.chain.clone(),
static_file_provider,
rocksdb_provider,
runtime,
)?
.with_prune_modes(prune_modes.clone());

View File

@@ -5,6 +5,7 @@ use reth_codecs::Compact;
use reth_db_api::{cursor::DbDupCursorRO, database::Database, tables, transaction::DbTx};
use reth_db_common::DbTool;
use reth_node_builder::NodeTypesWithDB;
use reth_storage_api::StorageSettingsCache;
use std::time::{Duration, Instant};
use tracing::info;
@@ -22,52 +23,94 @@ impl Command {
/// Execute `db account-storage` command
pub fn execute<N: NodeTypesWithDB>(self, tool: &DbTool<N>) -> eyre::Result<()> {
let address = self.address;
let (slot_count, plain_size) = tool.provider_factory.db_ref().view(|tx| {
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let mut count = 0usize;
let mut total_value_bytes = 0usize;
let mut last_log = Instant::now();
let use_hashed_state = tool.provider_factory.cached_storage_settings().use_hashed_state();
// Walk all storage entries for this address
let walker = cursor.walk_dup(Some(address), None)?;
for entry in walker {
let (_, storage_entry) = entry?;
count += 1;
// StorageEntry encodes as: 32 bytes (key/subkey uncompressed) + compressed U256
let mut buf = Vec::new();
let entry_len = storage_entry.to_compact(&mut buf);
total_value_bytes += entry_len;
let (slot_count, storage_size) = if use_hashed_state {
let hashed_address = keccak256(address);
tool.provider_factory.db_ref().view(|tx| {
let mut cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
let mut count = 0usize;
let mut total_value_bytes = 0usize;
let mut last_log = Instant::now();
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots = count,
key = %storage_entry.key,
"Processing storage slots"
);
last_log = Instant::now();
let walker = cursor.walk_dup(Some(hashed_address), None)?;
for entry in walker {
let (_, storage_entry) = entry?;
count += 1;
let mut buf = Vec::new();
let entry_len = storage_entry.to_compact(&mut buf);
total_value_bytes += entry_len;
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots = count,
key = %storage_entry.key,
"Processing hashed storage slots"
);
last_log = Instant::now();
}
}
}
// Add 20 bytes for the Address key (stored once per account in dupsort)
let total_size = if count > 0 { 20 + total_value_bytes } else { 0 };
let total_size = if count > 0 { 32 + total_value_bytes } else { 0 };
Ok::<_, eyre::Report>((count, total_size))
})??;
Ok::<_, eyre::Report>((count, total_size))
})??
} else {
tool.provider_factory.db_ref().view(|tx| {
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let mut count = 0usize;
let mut total_value_bytes = 0usize;
let mut last_log = Instant::now();
// Estimate hashed storage size: 32-byte B256 key instead of 20-byte Address
let hashed_size_estimate = if slot_count > 0 { plain_size + 12 } else { 0 };
let total_estimate = plain_size + hashed_size_estimate;
// Walk all storage entries for this address
let walker = cursor.walk_dup(Some(address), None)?;
for entry in walker {
let (_, storage_entry) = entry?;
count += 1;
let mut buf = Vec::new();
// StorageEntry encodes as: 32 bytes (key/subkey uncompressed) + compressed U256
let entry_len = storage_entry.to_compact(&mut buf);
total_value_bytes += entry_len;
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots = count,
key = %storage_entry.key,
"Processing storage slots"
);
last_log = Instant::now();
}
}
// Add 20 bytes for the Address key (stored once per account in dupsort)
let total_size = if count > 0 { 20 + total_value_bytes } else { 0 };
Ok::<_, eyre::Report>((count, total_size))
})??
};
let hashed_address = keccak256(address);
println!("Account: {address}");
println!("Hashed address: {hashed_address}");
println!("Storage slots: {slot_count}");
println!("Plain storage size: {} (estimated)", human_bytes(plain_size as f64));
println!("Hashed storage size: {} (estimated)", human_bytes(hashed_size_estimate as f64));
println!("Total estimated size: {}", human_bytes(total_estimate as f64));
if use_hashed_state {
println!("Hashed storage size: {} (estimated)", human_bytes(storage_size as f64));
} else {
// Estimate hashed storage size: 32-byte B256 key instead of 20-byte Address
let hashed_size_estimate = if slot_count > 0 { storage_size + 12 } else { 0 };
let total_estimate = storage_size + hashed_size_estimate;
println!("Plain storage size: {} (estimated)", human_bytes(storage_size as f64));
println!(
"Hashed storage size: {} (estimated)",
human_bytes(hashed_size_estimate as f64)
);
println!("Total estimated size: {}", human_bytes(total_estimate as f64));
}
Ok(())
}

View File

@@ -0,0 +1,61 @@
use clap::Parser;
use reth_db::mdbx::{self, ffi};
use std::path::PathBuf;
/// Copies the MDBX database to a new location.
///
/// Equivalent to the standalone `mdbx_copy` tool but bundled into reth.
#[derive(Parser, Debug)]
pub struct Command {
/// Destination path for the database copy.
dest: PathBuf,
/// Compact the database while copying (reclaims free space).
#[arg(short, long)]
compact: bool,
/// Force dynamic size for the destination database.
#[arg(short = 'd', long)]
force_dynamic_size: bool,
/// Throttle to avoid MVCC pressure on writers.
#[arg(short = 'p', long)]
throttle_mvcc: bool,
}
impl Command {
/// Execute `db copy` command
pub fn execute(self, db: &mdbx::DatabaseEnv) -> eyre::Result<()> {
let mut flags: ffi::MDBX_copy_flags_t = ffi::MDBX_CP_DEFAULTS;
if self.compact {
flags |= ffi::MDBX_CP_COMPACT;
}
if self.force_dynamic_size {
flags |= ffi::MDBX_CP_FORCE_DYNAMIC_SIZE;
}
if self.throttle_mvcc {
flags |= ffi::MDBX_CP_THROTTLE_MVCC;
}
let dest = self
.dest
.to_str()
.ok_or_else(|| eyre::eyre!("destination path must be valid UTF-8"))?;
let dest_cstr = std::ffi::CString::new(dest)?;
println!("Copying database to {} ...", self.dest.display());
let rc = db.with_raw_env_ptr(|env_ptr| unsafe {
ffi::mdbx_env_copy(env_ptr, dest_cstr.as_ptr(), flags)
});
if rc != 0 {
eyre::bail!("mdbx_env_copy failed with error code {rc}: {}", unsafe {
std::ffi::CStr::from_ptr(ffi::mdbx_strerror(rc)).to_string_lossy()
});
}
println!("Done.");
Ok(())
}
}

View File

@@ -98,7 +98,8 @@ impl Command {
)?;
if let Some(entry) = entry {
println!("{}", serde_json::to_string_pretty(&entry)?);
let se: reth_primitives_traits::StorageEntry = entry.into();
println!("{}", serde_json::to_string_pretty(&se)?);
} else {
error!(target: "reth::cli", "No content for the given table key.");
}
@@ -106,7 +107,14 @@ impl Command {
}
let changesets = provider.storage_changeset(key.block_number())?;
println!("{}", serde_json::to_string_pretty(&changesets)?);
let serializable: Vec<_> = changesets
.into_iter()
.map(|(addr, entry)| {
let se: reth_primitives_traits::StorageEntry = entry.into();
(addr, se)
})
.collect();
println!("{}", serde_json::to_string_pretty(&serializable)?);
return Ok(());
}

View File

@@ -12,6 +12,7 @@ use std::{
mod account_storage;
mod checksum;
mod clear;
mod copy;
mod diff;
mod get;
mod list;
@@ -42,6 +43,8 @@ pub enum Subcommands {
List(list::Command),
/// Calculates the content checksum of a table or static file segment
Checksum(checksum::Command),
/// Copies the MDBX database to a new location (bundled mdbx_copy)
Copy(copy::Command),
/// Create a diff between two database tables or two entire databases.
Diff(diff::Command),
/// Gets the content of a table for the given key
@@ -70,23 +73,23 @@ pub enum Subcommands {
State(state::Command),
}
/// Initializes a provider factory with specified access rights, and then execute with the provided
/// command
macro_rules! db_exec {
($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => {
let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?;
let $tool = DbTool::new(provider_factory)?;
$command;
};
}
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
/// Execute `db` command
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
self,
ctx: CliContext,
) -> eyre::Result<()> {
/// Initializes a provider factory with specified access rights, and then executes the
/// provided command.
macro_rules! db_exec {
($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => {
let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?;
let $tool = DbTool::new(provider_factory)?;
$command;
};
}
let data_dir = self.env.datadir.clone().resolve_datadir(self.env.chain.chain());
let db_path = data_dir.db();
let static_files_path = data_dir.static_files();
@@ -124,6 +127,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
command.execute(&tool)?;
});
}
Subcommands::Copy(command) => {
db_exec!(self.env, tool, N, AccessRights::RO, {
command.execute(tool.provider_factory.db_ref())?;
});
}
Subcommands::Diff(command) => {
db_exec!(self.env, tool, N, AccessRights::RO, {
command.execute(&tool)?;

View File

@@ -64,7 +64,7 @@ impl Command {
let executor = task_executor.clone();
let pprof_dump_dir = data_dir.pprof_dumps();
let handle = task_executor.spawn_critical("metrics server", async move {
let handle = task_executor.spawn_critical_task("metrics server", async move {
let config = MetricServerConfig::new(
listen_addr,
VersionInfo {

View File

@@ -39,50 +39,12 @@ enum Subcommands {
#[derive(Debug, Clone, Copy, Subcommand)]
#[clap(rename_all = "snake_case")]
pub enum SetCommand {
/// Store receipts in static files instead of the database
Receipts {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store transaction senders in static files instead of the database
TransactionSenders {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store account changesets in static files instead of the database
AccountChangesets {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store storage history in rocksdb instead of MDBX
StoragesHistory {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store transaction hash to number mapping in rocksdb instead of MDBX
TransactionHashNumbers {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store account history in rocksdb instead of MDBX
AccountHistory {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store storage changesets in static files instead of the database
StorageChangesets {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Use hashed state tables (HashedAccounts/HashedStorages) as canonical state
/// Enable or disable v2 storage layout
///
/// When enabled, execution writes directly to hashed tables, eliminating need for
/// separate hashing stages. State reads come from hashed tables.
///
/// WARNING: Changing this setting in either direction requires re-syncing the database.
/// Enabling on an existing plain-state database leaves hashed tables empty.
/// Disabling on an existing hashed-state database leaves plain tables empty.
UseHashedState {
/// When enabled, uses static files for receipts/senders/changesets and RocksDB for
/// history indices and transaction hashes. When disabled, uses v1/legacy layout (everything in
/// MDBX).
V2 {
#[clap(action(ArgAction::Set))]
value: bool,
},
@@ -125,87 +87,18 @@ impl Command {
println!("No storage settings found, creating new settings.");
}
let mut settings @ StorageSettings {
receipts_in_static_files: _,
transaction_senders_in_static_files: _,
storages_history_in_rocksdb: _,
transaction_hash_numbers_in_rocksdb: _,
account_history_in_rocksdb: _,
account_changesets_in_static_files: _,
storage_changesets_in_static_files: _,
use_hashed_state: _,
} = settings.unwrap_or_else(StorageSettings::v1);
let mut settings @ StorageSettings { storage_v2: _ } =
settings.unwrap_or_else(StorageSettings::v1);
// Update the setting based on the key
match cmd {
SetCommand::Receipts { value } => {
if settings.receipts_in_static_files == value {
println!("receipts_in_static_files is already set to {}", value);
SetCommand::V2 { value } => {
if settings.storage_v2 == value {
println!("storage_v2 is already set to {}", value);
return Ok(());
}
settings.receipts_in_static_files = value;
println!("Set receipts_in_static_files = {}", value);
}
SetCommand::TransactionSenders { value } => {
if settings.transaction_senders_in_static_files == value {
println!("transaction_senders_in_static_files is already set to {}", value);
return Ok(());
}
settings.transaction_senders_in_static_files = value;
println!("Set transaction_senders_in_static_files = {}", value);
}
SetCommand::AccountChangesets { value } => {
if settings.account_changesets_in_static_files == value {
println!("account_changesets_in_static_files is already set to {}", value);
return Ok(());
}
settings.account_changesets_in_static_files = value;
println!("Set account_changesets_in_static_files = {}", value);
}
SetCommand::StoragesHistory { value } => {
if settings.storages_history_in_rocksdb == value {
println!("storages_history_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.storages_history_in_rocksdb = value;
println!("Set storages_history_in_rocksdb = {}", value);
}
SetCommand::TransactionHashNumbers { value } => {
if settings.transaction_hash_numbers_in_rocksdb == value {
println!("transaction_hash_numbers_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.transaction_hash_numbers_in_rocksdb = value;
println!("Set transaction_hash_numbers_in_rocksdb = {}", value);
}
SetCommand::AccountHistory { value } => {
if settings.account_history_in_rocksdb == value {
println!("account_history_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.account_history_in_rocksdb = value;
println!("Set account_history_in_rocksdb = {}", value);
}
SetCommand::StorageChangesets { value } => {
if settings.storage_changesets_in_static_files == value {
println!("storage_changesets_in_static_files is already set to {}", value);
return Ok(());
}
settings.storage_changesets_in_static_files = value;
println!("Set storage_changesets_in_static_files = {}", value);
}
SetCommand::UseHashedState { value } => {
if settings.use_hashed_state == value {
println!("use_hashed_state is already set to {}", value);
return Ok(());
}
if settings.use_hashed_state && !value {
println!("WARNING: Disabling use_hashed_state on an existing hashed-state database requires a full resync.");
} else {
println!("WARNING: Enabling use_hashed_state on an existing plain-state database requires a full resync.");
}
settings.use_hashed_state = value;
println!("Set use_hashed_state = {}", value);
settings.storage_v2 = value;
println!("Set storage_v2 = {}", value);
}
}

View File

@@ -1,4 +1,4 @@
use alloy_primitives::{Address, BlockNumber, B256, U256};
use alloy_primitives::{keccak256, Address, BlockNumber, B256, U256};
use clap::Parser;
use parking_lot::Mutex;
use reth_db_api::{
@@ -63,39 +63,65 @@ impl Command {
address: Address,
limit: usize,
) -> eyre::Result<()> {
let use_hashed_state = tool.provider_factory.cached_storage_settings().use_hashed_state();
let entries = tool.provider_factory.db_ref().view(|tx| {
// Get account info
let account = tx.get::<tables::PlainAccountState>(address)?;
// Get storage entries
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let mut entries = Vec::new();
let mut last_log = Instant::now();
let walker = cursor.walk_dup(Some(address), None)?;
for (idx, entry) in walker.enumerate() {
let (_, storage_entry) = entry?;
if storage_entry.value != U256::ZERO {
entries.push((storage_entry.key, storage_entry.value));
let (account, walker_entries) = if use_hashed_state {
let hashed_address = keccak256(address);
let account = tx.get::<tables::HashedAccounts>(hashed_address)?;
let mut cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
let walker = cursor.walk_dup(Some(hashed_address), None)?;
let mut entries = Vec::new();
let mut last_log = Instant::now();
for (idx, entry) in walker.enumerate() {
let (_, storage_entry) = entry?;
if storage_entry.value != U256::ZERO {
entries.push((storage_entry.key, storage_entry.value));
}
if entries.len() >= limit {
break;
}
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots_scanned = idx,
"Scanning storage slots"
);
last_log = Instant::now();
}
}
if entries.len() >= limit {
break;
(account, entries)
} else {
// Get account info
let account = tx.get::<tables::PlainAccountState>(address)?;
// Get storage entries
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let walker = cursor.walk_dup(Some(address), None)?;
let mut entries = Vec::new();
let mut last_log = Instant::now();
for (idx, entry) in walker.enumerate() {
let (_, storage_entry) = entry?;
if storage_entry.value != U256::ZERO {
entries.push((storage_entry.key, storage_entry.value));
}
if entries.len() >= limit {
break;
}
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots_scanned = idx,
"Scanning storage slots"
);
last_log = Instant::now();
}
}
(account, entries)
};
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots_scanned = idx,
"Scanning storage slots"
);
last_log = Instant::now();
}
}
Ok::<_, eyre::Report>((account, entries))
Ok::<_, eyre::Report>((account, walker_entries))
})??;
let (account, storage_entries) = entries;
@@ -119,7 +145,7 @@ impl Command {
// Check storage settings to determine where history is stored
let storage_settings = tool.provider_factory.cached_storage_settings();
let history_in_rocksdb = storage_settings.storages_history_in_rocksdb;
let history_in_rocksdb = storage_settings.storage_v2;
// For historical queries, enumerate keys from history indices only
// (not PlainStorageState, which reflects current state)

View File

@@ -37,6 +37,14 @@ pub struct DownloadDefaults {
pub available_snapshots: Vec<Cow<'static, str>>,
/// Default base URL for snapshots
pub default_base_url: Cow<'static, str>,
/// Default base URL for chain-aware snapshots.
///
/// When set, the chain ID is appended to form the full URL: `{base_url}/{chain_id}`.
/// For example, given a base URL of `https://snapshots.example.com` and chain ID `1`,
/// the resulting URL would be `https://snapshots.example.com/1`.
///
/// Falls back to [`default_base_url`](Self::default_base_url) when `None`.
pub default_chain_aware_base_url: Option<Cow<'static, str>>,
/// Optional custom long help text that overrides the generated help
pub long_help: Option<String>,
}
@@ -60,6 +68,7 @@ impl DownloadDefaults {
Cow::Borrowed("https://publicnode.com/snapshots (full nodes & testnets)"),
],
default_base_url: Cow::Borrowed(MERKLE_BASE_URL),
default_chain_aware_base_url: None,
long_help: None,
}
}
@@ -84,9 +93,11 @@ impl DownloadDefaults {
}
help.push_str(
"\nIf no URL is provided, the latest mainnet archive snapshot\nwill be proposed for download from ",
"\nIf no URL is provided, the latest archive snapshot for the selected chain\nwill be proposed for download from ",
);
help.push_str(
self.default_chain_aware_base_url.as_deref().unwrap_or(&self.default_base_url),
);
help.push_str(self.default_base_url.as_ref());
help.push_str(
".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
);
@@ -111,6 +122,12 @@ impl DownloadDefaults {
self
}
/// Set the default chain-aware base URL.
pub fn with_chain_aware_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
self.default_chain_aware_base_url = Some(url.into());
self
}
/// Builder: Set custom long help text, overriding the generated help
pub fn with_long_help(mut self, help: impl Into<String>) -> Self {
self.long_help = Some(help.into());
@@ -142,7 +159,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
let url = match self.url {
Some(url) => url,
None => {
let url = get_latest_snapshot_url().await?;
let url = get_latest_snapshot_url(self.env.chain.chain().id()).await?;
info!(target: "reth::cli", "Using default snapshot URL: {}", url);
url
}
@@ -509,8 +526,12 @@ async fn stream_and_extract(url: &str, target_dir: &Path) -> Result<()> {
}
// Builds default URL for latest mainnet archive snapshot using configured defaults
async fn get_latest_snapshot_url() -> Result<String> {
let base_url = &DownloadDefaults::get_global().default_base_url;
async fn get_latest_snapshot_url(chain_id: u64) -> Result<String> {
let defaults = DownloadDefaults::get_global();
let base_url = match &defaults.default_chain_aware_base_url {
Some(url) => format!("{url}/{chain_id}"),
None => defaults.default_base_url.to_string(),
};
let latest_url = format!("{base_url}/latest.txt");
let filename = Client::new()
.get(latest_url)

View File

@@ -10,8 +10,8 @@ use reth_node_builder::NodeBuilder;
use reth_node_core::{
args::{
DatabaseArgs, DatadirArgs, DebugArgs, DevArgs, EngineArgs, EraArgs, MetricArgs,
NetworkArgs, PayloadBuilderArgs, PruningArgs, RocksDbArgs, RpcServerArgs, StaticFilesArgs,
StorageArgs, TxPoolArgs,
NetworkArgs, PayloadBuilderArgs, PruningArgs, RpcServerArgs, StaticFilesArgs, StorageArgs,
TxPoolArgs,
},
node_config::NodeConfig,
version,
@@ -103,10 +103,6 @@ pub struct NodeCommand<C: ChainSpecParser, Ext: clap::Args + fmt::Debug = NoArgs
#[command(flatten)]
pub pruning: PruningArgs,
/// All `RocksDB` table routing arguments
#[command(flatten)]
pub rocksdb: RocksDbArgs,
/// Engine cli arguments
#[command(flatten, next_help_heading = "Engine")]
pub engine: EngineArgs,
@@ -119,8 +115,8 @@ pub struct NodeCommand<C: ChainSpecParser, Ext: clap::Args + fmt::Debug = NoArgs
#[command(flatten, next_help_heading = "Static Files")]
pub static_files: StaticFilesArgs,
/// Storage mode configuration (v2 vs v1/legacy)
#[command(flatten)]
/// All storage related arguments with --storage prefix
#[command(flatten, next_help_heading = "Storage")]
pub storage: StorageArgs,
/// Additional cli arguments
@@ -175,7 +171,6 @@ where
db,
dev,
pruning,
rocksdb,
engine,
era,
static_files,
@@ -183,9 +178,6 @@ where
ext,
} = self;
// Validate RocksDB arguments
rocksdb.validate()?;
// set up node config
let mut node_config = NodeConfig {
datadir,
@@ -201,7 +193,6 @@ where
db,
dev,
pruning,
rocksdb,
engine,
era,
static_files,

View File

@@ -76,7 +76,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
// Set up cancellation token for graceful shutdown on Ctrl+C
let cancellation = CancellationToken::new();
let cancellation_clone = cancellation.clone();
ctx.task_executor.spawn_critical("prune-ctrl-c", async move {
ctx.task_executor.spawn_critical_task("prune-ctrl-c", async move {
tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c");
cancellation_clone.cancel();
});

View File

@@ -9,7 +9,10 @@ use reth_db_api::{
transaction::{DbTx, DbTxMut},
};
use reth_db_common::{
init::{insert_genesis_header, insert_genesis_history, insert_genesis_state},
init::{
insert_genesis_account_history, insert_genesis_header, insert_genesis_state,
insert_genesis_storage_history,
},
DbTool,
};
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
@@ -42,12 +45,16 @@ impl<C: ChainSpecParser> Command<C> {
let tool = DbTool::new(provider_factory)?;
let static_file_segment = match self.stage {
StageEnum::Headers => Some(StaticFileSegment::Headers),
StageEnum::Bodies => Some(StaticFileSegment::Transactions),
StageEnum::Execution => Some(StaticFileSegment::Receipts),
StageEnum::Senders => Some(StaticFileSegment::TransactionSenders),
_ => None,
let static_file_segments = match self.stage {
StageEnum::Headers => vec![StaticFileSegment::Headers],
StageEnum::Bodies => vec![StaticFileSegment::Transactions],
StageEnum::Execution => vec![
StaticFileSegment::Receipts,
StaticFileSegment::AccountChangeSets,
StaticFileSegment::StorageChangeSets,
],
StageEnum::Senders => vec![StaticFileSegment::TransactionSenders],
_ => vec![],
};
// Calling `StaticFileProviderRW::prune_*` will instruct the writer to prune rows only
@@ -55,35 +62,33 @@ impl<C: ChainSpecParser> Command<C> {
// deleting the jar files, otherwise if the task were to be interrupted after we
// have deleted them, BUT before we have committed the checkpoints to the database, we'd
// lose essential data.
if let Some(static_file_segment) = static_file_segment {
let static_file_provider = tool.provider_factory.static_file_provider();
if let Some(highest_block) =
static_file_provider.get_highest_static_file_block(static_file_segment)
let static_file_provider = tool.provider_factory.static_file_provider();
for segment in static_file_segments {
if let Some(highest_block) = static_file_provider.get_highest_static_file_block(segment)
{
let mut writer = static_file_provider.latest_writer(static_file_segment)?;
let mut writer = static_file_provider.latest_writer(segment)?;
match static_file_segment {
match segment {
StaticFileSegment::Headers => {
// Prune all headers leaving genesis intact.
writer.prune_headers(highest_block)?;
}
StaticFileSegment::Transactions => {
let to_delete = static_file_provider
.get_highest_static_file_tx(static_file_segment)
.get_highest_static_file_tx(segment)
.map(|tx_num| tx_num + 1)
.unwrap_or_default();
writer.prune_transactions(to_delete, 0)?;
}
StaticFileSegment::Receipts => {
let to_delete = static_file_provider
.get_highest_static_file_tx(static_file_segment)
.get_highest_static_file_tx(segment)
.map(|tx_num| tx_num + 1)
.unwrap_or_default();
writer.prune_receipts(to_delete, 0)?;
}
StaticFileSegment::TransactionSenders => {
let to_delete = static_file_provider
.get_highest_static_file_tx(static_file_segment)
.get_highest_static_file_tx(segment)
.map(|tx_num| tx_num + 1)
.unwrap_or_default();
writer.prune_transaction_senders(to_delete, 0)?;
@@ -128,8 +133,15 @@ impl<C: ChainSpecParser> Command<C> {
reset_stage_checkpoint(tx, StageId::SenderRecovery)?;
}
StageEnum::Execution => {
tx.clear::<tables::PlainAccountState>()?;
tx.clear::<tables::PlainStorageState>()?;
if provider_rw.cached_storage_settings().use_hashed_state() {
tx.clear::<tables::HashedAccounts>()?;
tx.clear::<tables::HashedStorages>()?;
reset_stage_checkpoint(tx, StageId::AccountHashing)?;
reset_stage_checkpoint(tx, StageId::StorageHashing)?;
} else {
tx.clear::<tables::PlainAccountState>()?;
tx.clear::<tables::PlainStorageState>()?;
}
tx.clear::<tables::AccountChangeSets>()?;
tx.clear::<tables::StorageChangeSets>()?;
tx.clear::<tables::Bytecodes>()?;
@@ -171,29 +183,42 @@ impl<C: ChainSpecParser> Command<C> {
None,
)?;
}
StageEnum::AccountHistory | StageEnum::StorageHistory => {
StageEnum::AccountHistory => {
let settings = provider_rw.cached_storage_settings();
let rocksdb = tool.provider_factory.rocksdb_provider();
if settings.account_history_in_rocksdb {
if settings.storage_v2 {
rocksdb.clear::<tables::AccountsHistory>()?;
} else {
tx.clear::<tables::AccountsHistory>()?;
}
if settings.storages_history_in_rocksdb {
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
insert_genesis_account_history(
&provider_rw,
self.env.chain.genesis().alloc.iter(),
)?;
}
StageEnum::StorageHistory => {
let settings = provider_rw.cached_storage_settings();
let rocksdb = tool.provider_factory.rocksdb_provider();
if settings.storage_v2 {
rocksdb.clear::<tables::StoragesHistory>()?;
} else {
tx.clear::<tables::StoragesHistory>()?;
}
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
reset_stage_checkpoint(tx, StageId::IndexStorageHistory)?;
insert_genesis_history(&provider_rw, self.env.chain.genesis().alloc.iter())?;
insert_genesis_storage_history(
&provider_rw,
self.env.chain.genesis().alloc.iter(),
)?;
}
StageEnum::TxLookup => {
if provider_rw.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
if provider_rw.cached_storage_settings().storage_v2 {
tool.provider_factory
.rocksdb_provider()
.clear::<tables::TransactionHashNumbers>()?;

View File

@@ -37,12 +37,14 @@ where
unwind_and_copy(db_tool, from, tip_block_number, &output_db, evm_config.clone())?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
runtime,
)?,
to,
from,

View File

@@ -33,12 +33,14 @@ pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Databas
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
runtime,
)?,
to,
from,

View File

@@ -23,12 +23,14 @@ pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = Databas
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
runtime,
)?,
to,
from,

View File

@@ -57,12 +57,14 @@ where
unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db, evm_config, consensus)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
runtime,
)?,
to,
from,

View File

@@ -10,9 +10,10 @@
//! Entrypoint for running commands.
use reth_tasks::{TaskExecutor, TaskManager};
use reth_tasks::{PanickedTaskError, TaskExecutor};
use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
use tracing::{debug, error, trace};
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
/// Executes CLI commands.
///
@@ -20,21 +21,24 @@ use tracing::{debug, error, trace};
#[derive(Debug)]
pub struct CliRunner {
config: CliRunnerConfig,
tokio_runtime: tokio::runtime::Runtime,
runtime: reth_tasks::Runtime,
}
impl CliRunner {
/// Attempts to create a new [`CliRunner`] using the default tokio
/// [`Runtime`](tokio::runtime::Runtime).
/// Attempts to create a new [`CliRunner`] using the default
/// [`Runtime`](reth_tasks::Runtime).
///
/// The default tokio runtime is multi-threaded, with both I/O and time drivers enabled.
pub fn try_default_runtime() -> Result<Self, std::io::Error> {
Ok(Self { config: CliRunnerConfig::default(), tokio_runtime: tokio_runtime()? })
/// The default runtime is multi-threaded, with both I/O and time drivers enabled.
pub fn try_default_runtime() -> Result<Self, reth_tasks::RuntimeBuildError> {
Self::try_with_runtime_config(reth_tasks::RuntimeConfig::default())
}
/// Create a new [`CliRunner`] from a provided tokio [`Runtime`](tokio::runtime::Runtime).
pub const fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self {
Self { config: CliRunnerConfig::new(), tokio_runtime }
/// Creates a new [`CliRunner`] with the given [`RuntimeConfig`](reth_tasks::RuntimeConfig).
pub fn try_with_runtime_config(
config: reth_tasks::RuntimeConfig,
) -> Result<Self, reth_tasks::RuntimeBuildError> {
let runtime = reth_tasks::RuntimeBuilder::new(config).build()?;
Ok(Self { config: CliRunnerConfig::default(), runtime })
}
/// Sets the [`CliRunnerConfig`] for this runner.
@@ -48,7 +52,7 @@ impl CliRunner {
where
F: Future<Output = T>,
{
self.tokio_runtime.block_on(fut)
self.runtime.handle().block_on(fut)
}
/// Executes the given _async_ command on the tokio runtime until the command future resolves or
@@ -64,12 +68,11 @@ impl CliRunner {
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
{
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
AsyncCliRunner::new(self.tokio_runtime);
let (context, task_manager_handle) = cli_context(&self.runtime);
// Executes the command until it finished or ctrl-c was fired
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
&mut task_manager,
let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
task_manager_handle,
run_until_ctrl_c(command(context)),
));
@@ -77,13 +80,13 @@ impl CliRunner {
error!(target: "reth::cli", "shutting down due to error");
} else {
debug!(target: "reth::cli", "shutting down gracefully");
// after the command has finished or exit signal was received we shutdown the task
// manager which fires the shutdown signal to all tasks spawned via the task
// after the command has finished or exit signal was received we shutdown the
// runtime which fires the shutdown signal to all tasks spawned via the task
// executor and awaiting on tasks spawned with graceful shutdown
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
}
tokio_shutdown(tokio_runtime, true);
runtime_shutdown(self.runtime, true);
command_res
}
@@ -99,17 +102,16 @@ impl CliRunner {
F: Future<Output = Result<(), E>> + Send + 'static,
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
{
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
AsyncCliRunner::new(self.tokio_runtime);
let (context, task_manager_handle) = cli_context(&self.runtime);
// Spawn the command on the blocking thread pool
let handle = tokio_runtime.handle().clone();
let command_handle =
tokio_runtime.handle().spawn_blocking(move || handle.block_on(command(context)));
let handle = self.runtime.handle().clone();
let handle2 = handle.clone();
let command_handle = handle.spawn_blocking(move || handle2.block_on(command(context)));
// Wait for the command to complete or ctrl-c
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
&mut task_manager,
let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
task_manager_handle,
run_until_ctrl_c(
async move { command_handle.await.expect("Failed to join blocking task") },
),
@@ -119,10 +121,10 @@ impl CliRunner {
error!(target: "reth::cli", "shutting down due to error");
} else {
debug!(target: "reth::cli", "shutting down gracefully");
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
}
tokio_shutdown(tokio_runtime, true);
runtime_shutdown(self.runtime, true);
command_res
}
@@ -133,48 +135,40 @@ impl CliRunner {
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<std::io::Error> + 'static,
{
self.tokio_runtime.block_on(run_until_ctrl_c(fut))?;
self.runtime.handle().block_on(run_until_ctrl_c(fut))?;
Ok(())
}
/// Executes a regular future as a spawned blocking task until completion or until external
/// signal received.
///
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking) .
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking).
pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
where
F: Future<Output = Result<(), E>> + Send + 'static,
E: Send + Sync + From<std::io::Error> + 'static,
{
let tokio_runtime = self.tokio_runtime;
let handle = tokio_runtime.handle().clone();
let fut = tokio_runtime.handle().spawn_blocking(move || handle.block_on(fut));
tokio_runtime
let handle = self.runtime.handle().clone();
let handle2 = handle.clone();
let fut = handle.spawn_blocking(move || handle2.block_on(fut));
self.runtime
.handle()
.block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
tokio_shutdown(tokio_runtime, false);
runtime_shutdown(self.runtime, false);
Ok(())
}
}
/// [`CliRunner`] configuration when executing commands asynchronously
struct AsyncCliRunner {
context: CliContext,
task_manager: TaskManager,
tokio_runtime: tokio::runtime::Runtime,
}
// === impl AsyncCliRunner ===
impl AsyncCliRunner {
/// Given a tokio [`Runtime`](tokio::runtime::Runtime), creates additional context required to
/// execute commands asynchronously.
fn new(tokio_runtime: tokio::runtime::Runtime) -> Self {
let task_manager = TaskManager::new(tokio_runtime.handle().clone());
let task_executor = task_manager.executor();
Self { context: CliContext { task_executor }, task_manager, tokio_runtime }
}
/// Extracts the task manager handle from the runtime and creates the [`CliContext`].
fn cli_context(
runtime: &reth_tasks::Runtime,
) -> (CliContext, JoinHandle<Result<(), PanickedTaskError>>) {
let handle =
runtime.take_task_manager_handle().expect("Runtime must contain a TaskManager handle");
let context = CliContext { task_executor: runtime.clone() };
(context, handle)
}
/// Additional context provided by the [`CliRunner`] when executing commands
@@ -216,37 +210,25 @@ impl CliRunnerConfig {
}
}
/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
/// enabled
pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
// Keep the threads alive for at least the block time (12 seconds) plus buffer.
// This prevents the costly process of spawning new threads on every
// new block, and instead reuses the existing threads.
.thread_keep_alive(Duration::from_secs(15))
.thread_name("tokio-rt")
.build()
}
/// Runs the given future to completion or until a critical task panicked.
///
/// Returns the error if a task panicked, or the given future returned an error.
async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
async fn run_to_completion_or_panic<F, E>(
task_manager_handle: JoinHandle<Result<(), PanickedTaskError>>,
fut: F,
) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
{
{
let fut = pin!(fut);
tokio::select! {
task_manager_result = tasks => {
if let Err(panicked_error) = task_manager_result {
return Err(panicked_error.into());
}
},
res = fut => res?,
}
let fut = pin!(fut);
tokio::select! {
task_manager_result = task_manager_handle => {
if let Ok(Err(panicked_error)) = task_manager_result {
return Err(panicked_error.into());
}
},
res = fut => res?,
}
Ok(())
}
@@ -271,10 +253,10 @@ where
tokio::select! {
_ = ctrl_c => {
trace!(target: "reth::cli", "Received ctrl-c");
info!(target: "reth::cli", "Received ctrl-c");
},
_ = sigterm => {
trace!(target: "reth::cli", "Received SIGTERM");
info!(target: "reth::cli", "Received SIGTERM");
},
res = fut => res?,
}
@@ -287,7 +269,7 @@ where
tokio::select! {
_ = ctrl_c => {
trace!(target: "reth::cli", "Received ctrl-c");
info!(target: "reth::cli", "Received ctrl-c");
},
res = fut => res?,
}
@@ -296,17 +278,17 @@ where
Ok(())
}
/// Shut down the given Tokio runtime, and wait for it if `wait` is set.
/// Default timeout for waiting on the tokio runtime to shut down.
const DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Shut down the given [`Runtime`](reth_tasks::Runtime), and wait for it if `wait` is set.
///
/// `drop(tokio_runtime)` would block the current thread until its pools
/// (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
/// it on a separate thread and wait for up to 5 seconds for this operation to
/// complete.
fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
// Shutdown the runtime on a separate thread
/// Dropping the runtime on the current thread could block due to tokio pool teardown.
/// Instead, we drop it on a separate thread and optionally wait for completion.
fn runtime_shutdown(rt: reth_tasks::Runtime, wait: bool) {
let (tx, rx) = mpsc::channel();
std::thread::Builder::new()
.name("tokio-shutdown".to_string())
.name("rt-shutdown".to_string())
.spawn(move || {
drop(rt);
let _ = tx.send(());
@@ -314,8 +296,8 @@ fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
.unwrap();
if wait {
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
let _ = rx.recv_timeout(DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT).inspect_err(|err| {
tracing::warn!(target: "reth::cli", %err, "runtime shutdown timed out");
});
}
}

View File

@@ -11,7 +11,6 @@ use reth_node_builder::{
PayloadTypes,
};
use reth_provider::providers::{BlockchainProvider, NodeTypesForProvider};
use reth_tasks::TaskManager;
use std::sync::Arc;
use wallet::Wallet;
@@ -50,7 +49,7 @@ pub async fn setup<N>(
chain_spec: Arc<N::ChainSpec>,
is_dev: bool,
attributes_generator: impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Send + Sync + Copy + 'static,
) -> eyre::Result<(Vec<NodeHelperType<N>>, TaskManager, Wallet)>
) -> eyre::Result<(Vec<NodeHelperType<N>>, Wallet)>
where
N: NodeBuilderHelper,
{
@@ -69,7 +68,6 @@ pub async fn setup_engine<N>(
attributes_generator: impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Send + Sync + Copy + 'static,
) -> eyre::Result<(
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
TaskManager,
Wallet,
)>
where
@@ -96,7 +94,6 @@ pub async fn setup_engine_with_connection<N>(
connect_nodes: bool,
) -> eyre::Result<(
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
TaskManager,
Wallet,
)>
where

View File

@@ -14,7 +14,7 @@ use reth_node_core::args::{DiscoveryArgs, NetworkArgs, RpcServerArgs};
use reth_primitives_traits::AlloyBlockHeader;
use reth_provider::providers::BlockchainProvider;
use reth_rpc_server_types::RpcModuleSelection;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use std::sync::Arc;
use tracing::{span, Instrument, Level};
@@ -110,11 +110,9 @@ where
self,
) -> eyre::Result<(
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
TaskManager,
Wallet,
)> {
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
let network_config = NetworkArgs {
discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
@@ -153,7 +151,7 @@ where
let span = span!(Level::INFO, "node", idx);
let node = N::default();
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(exec.clone())
.testing_node(runtime.clone())
.with_types_and_provider::<N, BlockchainProvider<_>>()
.with_components(node.components_builder())
.with_add_ons(node.add_ons())
@@ -197,7 +195,7 @@ where
}
}
Ok((nodes, tasks, Wallet::default().with_chain_id(self.chain_spec.chain().into())))
Ok((nodes, Wallet::default().with_chain_id(self.chain_spec.chain().into())))
}
}

View File

@@ -15,7 +15,7 @@ use reth_provider::{
};
use reth_rpc_server_types::RpcModuleSelection;
use reth_stages_types::StageId;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use std::{path::Path, sync::Arc};
use tempfile::TempDir;
use tracing::{debug, info, span, Level};
@@ -24,8 +24,6 @@ use tracing::{debug, info, span, Level};
pub struct ChainImportResult {
/// The nodes that were created
pub nodes: Vec<NodeHelperType<EthereumNode>>,
/// The task manager
pub task_manager: TaskManager,
/// The wallet for testing
pub wallet: Wallet,
/// Temporary directories that must be kept alive for the duration of the test
@@ -68,8 +66,7 @@ pub async fn setup_engine_with_chain_import(
+ Copy
+ 'static,
) -> eyre::Result<ChainImportResult> {
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
let network_config = NetworkArgs {
discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
@@ -129,6 +126,7 @@ pub async fn setup_engine_with_chain_import(
.with_default_tables()
.build()
.unwrap(),
reth_tasks::Runtime::test(),
)?;
// Initialize genesis if needed
@@ -221,7 +219,7 @@ pub async fn setup_engine_with_chain_import(
let node = EthereumNode::default();
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node_with_datadir(exec.clone(), datadir.clone())
.testing_node_with_datadir(runtime.clone(), datadir.clone())
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
.with_components(node.components_builder())
.with_add_ons(node.add_ons())
@@ -243,7 +241,6 @@ pub async fn setup_engine_with_chain_import(
Ok(ChainImportResult {
nodes,
task_manager: tasks,
wallet: crate::Wallet::default().with_chain_id(chain_spec.chain.id()),
_temp_dirs: temp_dirs,
})
@@ -333,6 +330,7 @@ mod tests {
.with_default_tables()
.build()
.unwrap(),
reth_tasks::Runtime::test(),
)
.expect("failed to create provider factory");
@@ -397,6 +395,7 @@ mod tests {
.with_default_tables()
.build()
.unwrap(),
reth_tasks::Runtime::test(),
)
.expect("failed to create provider factory");
@@ -497,6 +496,7 @@ mod tests {
.with_default_tables()
.build()
.unwrap(),
reth_tasks::Runtime::test(),
)
.expect("failed to create provider factory");

View File

@@ -1,6 +1,6 @@
//! Test setup utilities for configuring the initial state.
use crate::{setup_engine_with_connection, testsuite::Environment, NodeBuilderHelper};
use crate::{testsuite::Environment, E2ETestSetupBuilder, NodeBuilderHelper};
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::B256;
use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
@@ -38,6 +38,8 @@ pub struct Setup<I> {
shutdown_tx: Option<mpsc::Sender<()>>,
/// Is this setup in dev mode
pub is_dev: bool,
/// Whether to use v2 storage mode (hashed keys, static file changesets, rocksdb history)
pub storage_v2: bool,
/// Tracks instance generic.
_phantom: PhantomData<I>,
/// Holds the import result to keep nodes alive when using imported chain
@@ -58,6 +60,7 @@ impl<I> Default for Setup<I> {
tree_config: TreeConfig::default(),
shutdown_tx: None,
is_dev: true,
storage_v2: false,
_phantom: Default::default(),
import_result_holder: None,
import_rlp_path: None,
@@ -126,6 +129,12 @@ where
self
}
/// Enable v2 storage mode (hashed keys, static file changesets, rocksdb history)
pub const fn with_storage_v2(mut self) -> Self {
self.storage_v2 = true;
self
}
/// Apply setup using pre-imported chain data from RLP file
pub async fn apply_with_import<N>(
&mut self,
@@ -194,23 +203,32 @@ where
self.shutdown_tx = Some(shutdown_tx);
let is_dev = self.is_dev;
let storage_v2 = self.storage_v2;
let node_count = self.network.node_count;
let tree_config = self.tree_config.clone();
let attributes_generator = Self::create_static_attributes_generator::<N>();
let result = setup_engine_with_connection::<N>(
let mut builder = E2ETestSetupBuilder::<N, _>::new(
node_count,
Arc::<N::ChainSpec>::new((*chain_spec).clone().into()),
is_dev,
self.tree_config.clone(),
attributes_generator,
self.network.connect_nodes,
)
.await;
.with_tree_config_modifier(move |base| {
tree_config.clone().with_cross_block_cache_size(base.cross_block_cache_size())
})
.with_node_config_modifier(move |config| config.set_dev(is_dev))
.with_connect_nodes(self.network.connect_nodes);
if storage_v2 {
builder = builder.with_storage_v2();
}
let result = builder.build().await;
let mut node_clients = Vec::new();
match result {
Ok((nodes, executor, _wallet)) => {
Ok((nodes, _wallet)) => {
// create HTTP clients for each node's RPC and Engine API endpoints
for node in &nodes {
node_clients.push(node.to_node_client()?);
@@ -218,12 +236,11 @@ where
// spawn a separate task just to handle the shutdown
tokio::spawn(async move {
// keep nodes and executor in scope to ensure they're not dropped
// keep nodes in scope to ensure they're not dropped
let _nodes = nodes;
let _executor = executor;
// Wait for shutdown signal
let _ = shutdown_rx.recv().await;
// nodes and executor will be dropped here when the test completes
// nodes will be dropped here when the test completes
});
}
Err(e) => {

View File

@@ -370,15 +370,14 @@ async fn test_setup_builder_with_custom_tree_config() -> Result<()> {
.build(),
);
let (nodes, _tasks, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, |_| {
EthPayloadBuilderAttributes::default()
})
.with_tree_config_modifier(|config| {
config.with_persistence_threshold(0).with_memory_block_buffer_target(5)
})
.build()
.await?;
let (nodes, _wallet) = E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, |_| {
EthPayloadBuilderAttributes::default()
})
.with_tree_config_modifier(|config| {
config.with_persistence_threshold(0).with_memory_block_buffer_target(5)
})
.build()
.await?;
assert_eq!(nodes.len(), 1);

View File

@@ -10,7 +10,6 @@ use jsonrpsee::core::client::ClientT;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_db::tables;
use reth_e2e_test_utils::{transaction::TransactionTestContext, wallet, E2ETestSetupBuilder};
use reth_node_core::args::RocksDbArgs;
use reth_node_ethereum::EthereumNode;
use reth_payload_builder::EthPayloadBuilderAttributes;
use reth_provider::RocksDBProviderFactory;
@@ -96,22 +95,6 @@ fn test_attributes_generator(timestamp: u64) -> EthPayloadBuilderAttributes {
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
}
/// Verifies that `RocksDB` CLI defaults are `None` (deferred to storage mode).
#[test]
fn test_rocksdb_defaults_are_none() {
let args = RocksDbArgs::default();
assert!(args.tx_hash.is_none(), "tx_hash default should be None (deferred to --storage.v2)");
assert!(
args.storages_history.is_none(),
"storages_history default should be None (deferred to --storage.v2)"
);
assert!(
args.account_history.is_none(),
"account_history default should be None (deferred to --storage.v2)"
);
}
/// Smoke test: node boots with `RocksDB` routing enabled.
#[tokio::test]
async fn test_rocksdb_node_startup() -> Result<()> {
@@ -119,7 +102,7 @@ async fn test_rocksdb_node_startup() -> Result<()> {
let chain_spec = test_chain_spec();
let (nodes, _tasks, _wallet) =
let (nodes, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.with_storage_v2()
.build()
@@ -147,7 +130,7 @@ async fn test_rocksdb_block_mining() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _wallet) =
let (mut nodes, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.with_storage_v2()
.build()
@@ -201,7 +184,7 @@ async fn test_rocksdb_transaction_queries() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
@@ -268,7 +251,7 @@ async fn test_rocksdb_multi_tx_same_block() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
@@ -336,7 +319,7 @@ async fn test_rocksdb_txs_across_blocks() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
@@ -421,7 +404,7 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
@@ -477,7 +460,7 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
///
/// This test exercises `unwind_trie_state_from` which previously failed with
/// `UnsortedInput` errors because it read changesets directly from MDBX tables
/// instead of using storage-aware methods that check `storage_changesets_in_static_files`.
/// instead of using storage-aware methods that check `is_v2()`.
#[tokio::test]
async fn test_rocksdb_reorg_unwind() -> Result<()> {
reth_tracing::init_test_tracing();
@@ -485,7 +468,7 @@ async fn test_rocksdb_reorg_unwind() -> Result<()> {
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,

View File

@@ -1,6 +1,7 @@
//! Engine tree configuration.
use alloy_eips::merge::EPOCH_SLOTS;
use core::time::Duration;
/// Triggers persistence when the number of canonical blocks in memory exceeds this threshold.
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
@@ -64,6 +65,9 @@ pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
/// Storage tries beyond this limit are cleared (but allocations preserved).
pub const DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES: usize = 100;
/// Default timeout for the state root task before spawning a sequential fallback.
pub const DEFAULT_STATE_ROOT_TASK_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
@@ -175,6 +179,13 @@ pub struct TreeConfig {
sparse_trie_prune_depth: usize,
/// Maximum number of storage tries to retain after pruning.
sparse_trie_max_storage_tries: usize,
/// Whether to fully disable sparse trie cache pruning between blocks.
disable_sparse_trie_cache_pruning: bool,
/// Timeout for the state root task before spawning a sequential fallback computation.
/// If `Some`, after waiting this duration for the state root task, a sequential state root
/// computation is spawned in parallel and whichever finishes first is used.
/// If `None`, the timeout fallback is disabled.
state_root_task_timeout: Option<Duration>,
}
impl Default for TreeConfig {
@@ -207,6 +218,8 @@ impl Default for TreeConfig {
disable_trie_cache: false,
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
disable_sparse_trie_cache_pruning: false,
state_root_task_timeout: Some(DEFAULT_STATE_ROOT_TASK_TIMEOUT),
}
}
}
@@ -241,6 +254,7 @@ impl TreeConfig {
disable_cache_metrics: bool,
sparse_trie_prune_depth: usize,
sparse_trie_max_storage_tries: usize,
state_root_task_timeout: Option<Duration>,
) -> Self {
Self {
persistence_threshold,
@@ -270,6 +284,8 @@ impl TreeConfig {
disable_trie_cache: false,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
disable_sparse_trie_cache_pruning: false,
state_root_task_timeout,
}
}
@@ -618,4 +634,26 @@ impl TreeConfig {
self.sparse_trie_max_storage_tries = max_tries;
self
}
/// Returns whether sparse trie cache pruning is disabled.
pub const fn disable_sparse_trie_cache_pruning(&self) -> bool {
self.disable_sparse_trie_cache_pruning
}
/// Setter for whether to disable sparse trie cache pruning.
pub const fn with_disable_sparse_trie_cache_pruning(mut self, value: bool) -> Self {
self.disable_sparse_trie_cache_pruning = value;
self
}
/// Returns the state root task timeout.
pub const fn state_root_task_timeout(&self) -> Option<Duration> {
self.state_root_task_timeout
}
/// Setter for state root task timeout.
pub const fn with_state_root_task_timeout(mut self, timeout: Option<Duration>) -> Self {
self.state_root_task_timeout = timeout;
self
}
}

View File

@@ -23,7 +23,7 @@ use serde::{de::DeserializeOwned, Serialize};
// Re-export [`ExecutionPayload`] moved to `reth_payload_primitives`
#[cfg(feature = "std")]
pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
pub use reth_evm::{ConfigureEngineEvm, ConvertTx, ExecutableTxIterator, ExecutableTxTuple};
pub use reth_payload_primitives::ExecutionPayload;
mod error;

View File

@@ -20,7 +20,7 @@ use reth_node_types::{BlockTy, NodeTypes};
use reth_payload_builder::PayloadBuilderHandle;
use reth_provider::{
providers::{BlockchainProvider, ProviderNodeTypes},
ProviderFactory,
ProviderFactory, StorageSettingsCache,
};
use reth_prune::PrunerWithFactory;
use reth_stages_api::{MetricEventsSender, Pipeline};
@@ -94,6 +94,7 @@ where
if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
let downloader = BasicBlockDownloader::new(client, consensus.clone());
let use_hashed_state = provider.cached_storage_settings().use_hashed_state();
let persistence_handle =
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
@@ -111,6 +112,7 @@ where
engine_kind,
evm_config,
changeset_cache,
use_hashed_state,
);
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
@@ -201,6 +203,7 @@ mod tests {
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache.clone(),
reth_tasks::Runtime::test(),
);
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();

View File

@@ -141,7 +141,15 @@ test-utils = [
"reth-ethereum-primitives/test-utils",
"reth-node-ethereum/test-utils",
"reth-evm-ethereum/test-utils",
"reth-tasks/test-utils",
]
rocksdb = [
"reth-provider/rocksdb",
"reth-prune/rocksdb",
"reth-stages?/rocksdb",
"reth-e2e-test-utils/rocksdb",
]
edge = ["rocksdb"]
[[test]]
name = "e2e_testsuite"

View File

@@ -12,8 +12,7 @@ use rand::Rng;
use reth_chainspec::ChainSpec;
use reth_db_common::init::init_genesis;
use reth_engine_tree::tree::{
executor::WorkloadExecutor, precompile_cache::PrecompileCacheMap, PayloadProcessor,
StateProviderBuilder, TreeConfig,
precompile_cache::PrecompileCacheMap, PayloadProcessor, StateProviderBuilder, TreeConfig,
};
use reth_ethereum_primitives::TransactionSigned;
use reth_evm::OnStateHook;
@@ -219,7 +218,7 @@ fn bench_state_root(c: &mut Criterion) {
setup_provider(&factory, &state_updates).expect("failed to setup provider");
let payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(factory.chain_spec()),
&TreeConfig::default(),
PrecompileCacheMap::default(),

View File

@@ -138,7 +138,7 @@ impl<N: ProviderNodeTypes> PipelineSync<N> {
let (tx, rx) = oneshot::channel();
let pipeline = pipeline.take().expect("exists");
self.pipeline_task_spawner.spawn_critical_blocking(
self.pipeline_task_spawner.spawn_critical_blocking_task(
"pipeline task",
Box::pin(async move {
let result = pipeline.run_as_fut(Some(target)).await;

View File

@@ -76,8 +76,16 @@ impl CacheConfig for EpochCacheConfig {
type FixedCache<K, V, H = DefaultHashBuilder> = fixed_cache::Cache<K, V, H, EpochCacheConfig>;
/// A wrapper of a state provider and a shared cache.
///
/// The const generic `PREWARM` controls whether every cache miss is populated. This is only
/// relevant for pre-warm transaction execution with the intention to pre-populate the cache with
/// data for regular block execution. During regular block execution the cache doesn't need to be
/// populated because the actual EVM database [`State`](revm::database::State) also caches
/// internally during block execution and the cache is then updated after the block with the entire
/// [`BundleState`] output of that block which contains all accessed accounts, code, storage. See
/// also [`ExecutionCache::insert_state`].
#[derive(Debug)]
pub struct CachedStateProvider<S> {
pub struct CachedStateProvider<S, const PREWARM: bool = false> {
/// The state provider
state_provider: S,
@@ -86,15 +94,9 @@ pub struct CachedStateProvider<S> {
/// Metrics for the cached state provider
metrics: CachedStateMetrics,
/// If prewarm enabled we populate every cache miss
prewarm: bool,
}
impl<S> CachedStateProvider<S>
where
S: StateProvider,
{
impl<S> CachedStateProvider<S> {
/// Creates a new [`CachedStateProvider`] from an [`ExecutionCache`], state provider, and
/// [`CachedStateMetrics`].
pub const fn new(
@@ -102,27 +104,18 @@ where
caches: ExecutionCache,
metrics: CachedStateMetrics,
) -> Self {
Self { state_provider, caches, metrics, prewarm: false }
Self { state_provider, caches, metrics }
}
}
impl<S> CachedStateProvider<S> {
/// Enables pre-warm mode so that every cache miss is populated.
///
/// This is only relevant for pre-warm transaction execution with the intention to pre-populate
/// the cache with data for regular block execution. During regular block execution the
/// cache doesn't need to be populated because the actual EVM database
/// [`State`](revm::database::State) also caches internally during block execution and the cache
/// is then updated after the block with the entire [`BundleState`] output of that block which
/// contains all accessed accounts,code,storage. See also [`ExecutionCache::insert_state`].
pub const fn prewarm(mut self) -> Self {
self.prewarm = true;
self
}
/// Returns whether this provider should pre-warm cache misses.
const fn is_prewarm(&self) -> bool {
self.prewarm
impl<S> CachedStateProvider<S, true> {
/// Creates a new [`CachedStateProvider`] with prewarming enabled.
pub const fn new_prewarm(
state_provider: S,
caches: ExecutionCache,
metrics: CachedStateMetrics,
) -> Self {
Self { state_provider, caches, metrics }
}
}
@@ -307,9 +300,9 @@ impl<K: PartialEq, V> StatsHandler<K, V> for CacheStatsHandler {
}
}
impl<S: AccountReader> AccountReader for CachedStateProvider<S> {
impl<S: AccountReader, const PREWARM: bool> AccountReader for CachedStateProvider<S, PREWARM> {
fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
if self.is_prewarm() {
if PREWARM {
match self.caches.get_or_try_insert_account_with(*address, || {
self.state_provider.basic_account(address)
})? {
@@ -334,13 +327,13 @@ pub enum CachedStatus<T> {
Cached(T),
}
impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
impl<S: StateProvider, const PREWARM: bool> StateProvider for CachedStateProvider<S, PREWARM> {
fn storage(
&self,
account: Address,
storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
if self.is_prewarm() {
if PREWARM {
match self.caches.get_or_try_insert_storage_with(account, storage_key, || {
self.state_provider.storage(account, storage_key).map(Option::unwrap_or_default)
})? {
@@ -358,11 +351,19 @@ impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
self.state_provider.storage(account, storage_key)
}
}
fn storage_by_hashed_key(
&self,
address: Address,
hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
self.state_provider.storage_by_hashed_key(address, hashed_storage_key)
}
}
impl<S: BytecodeReader> BytecodeReader for CachedStateProvider<S> {
impl<S: BytecodeReader, const PREWARM: bool> BytecodeReader for CachedStateProvider<S, PREWARM> {
fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
if self.is_prewarm() {
if PREWARM {
match self.caches.get_or_try_insert_code_with(*code_hash, || {
self.state_provider.bytecode_by_hash(code_hash)
})? {
@@ -378,7 +379,9 @@ impl<S: BytecodeReader> BytecodeReader for CachedStateProvider<S> {
}
}
impl<S: StateRootProvider> StateRootProvider for CachedStateProvider<S> {
impl<S: StateRootProvider, const PREWARM: bool> StateRootProvider
for CachedStateProvider<S, PREWARM>
{
fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
self.state_provider.state_root(hashed_state)
}
@@ -402,7 +405,9 @@ impl<S: StateRootProvider> StateRootProvider for CachedStateProvider<S> {
}
}
impl<S: StateProofProvider> StateProofProvider for CachedStateProvider<S> {
impl<S: StateProofProvider, const PREWARM: bool> StateProofProvider
for CachedStateProvider<S, PREWARM>
{
fn proof(
&self,
input: TrieInput,
@@ -429,7 +434,9 @@ impl<S: StateProofProvider> StateProofProvider for CachedStateProvider<S> {
}
}
impl<S: StorageRootProvider> StorageRootProvider for CachedStateProvider<S> {
impl<S: StorageRootProvider, const PREWARM: bool> StorageRootProvider
for CachedStateProvider<S, PREWARM>
{
fn storage_root(
&self,
address: Address,
@@ -457,7 +464,7 @@ impl<S: StorageRootProvider> StorageRootProvider for CachedStateProvider<S> {
}
}
impl<S: BlockHashReader> BlockHashReader for CachedStateProvider<S> {
impl<S: BlockHashReader, const PREWARM: bool> BlockHashReader for CachedStateProvider<S, PREWARM> {
fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
self.state_provider.block_hash(number)
}
@@ -471,7 +478,9 @@ impl<S: BlockHashReader> BlockHashReader for CachedStateProvider<S> {
}
}
impl<S: HashedPostStateProvider> HashedPostStateProvider for CachedStateProvider<S> {
impl<S: HashedPostStateProvider, const PREWARM: bool> HashedPostStateProvider
for CachedStateProvider<S, PREWARM>
{
fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
self.state_provider.hashed_post_state(bundle_state)
}
@@ -836,8 +845,10 @@ impl SavedCache {
self.caches.update_metrics(&self.metrics);
}
/// Clears all caches, resetting them to empty state.
pub(crate) fn clear(&self) {
/// Clears all caches, resetting them to empty state,
/// and updates the hash of the block this cache belongs to.
pub(crate) fn clear_with_hash(&mut self, hash: B256) {
self.hash = hash;
self.caches.clear();
}
}

View File

@@ -199,6 +199,17 @@ impl<S: StateProvider> StateProvider for InstrumentedStateProvider<S> {
self.record_storage_fetch(start.elapsed());
res
}
fn storage_by_hashed_key(
&self,
address: Address,
hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
let start = Instant::now();
let res = self.state_provider.storage_by_hashed_key(address, hashed_storage_key);
self.record_storage_fetch(start.elapsed());
res
}
}
impl<S: BytecodeReader> BytecodeReader for InstrumentedStateProvider<S> {

View File

@@ -8,9 +8,18 @@ use reth_metrics::{
metrics::{Counter, Gauge, Histogram},
Metrics,
};
use reth_primitives_traits::constants::gas_units::MEGAGAS;
use reth_trie::updates::TrieUpdates;
use std::time::{Duration, Instant};
/// Upper bounds for each gas bucket. The last bucket is a catch-all for
/// everything above the final threshold: <5M, 5-10M, 10-20M, 20-30M, 30-40M, >40M.
const GAS_BUCKET_THRESHOLDS: [u64; 5] =
[5 * MEGAGAS, 10 * MEGAGAS, 20 * MEGAGAS, 30 * MEGAGAS, 40 * MEGAGAS];
/// Total number of gas buckets (thresholds + 1 catch-all).
const NUM_GAS_BUCKETS: usize = GAS_BUCKET_THRESHOLDS.len() + 1;
/// Metrics for the `EngineApi`.
#[derive(Debug, Default)]
pub struct EngineApiMetrics {
@@ -235,6 +244,65 @@ impl ForkchoiceUpdatedMetrics {
}
}
/// Per-gas-bucket newPayload metrics, initialized once via [`Self::new_with_labels`].
#[derive(Clone, Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct NewPayloadGasBucketMetrics {
/// Latency for new payload calls in this gas bucket.
pub(crate) new_payload_gas_bucket_latency: Histogram,
/// Gas per second for new payload calls in this gas bucket.
pub(crate) new_payload_gas_bucket_gas_per_second: Histogram,
}
/// Holds pre-initialized [`NewPayloadGasBucketMetrics`] instances, one per gas bucket.
#[derive(Debug)]
pub(crate) struct GasBucketMetrics {
buckets: [NewPayloadGasBucketMetrics; NUM_GAS_BUCKETS],
}
impl Default for GasBucketMetrics {
fn default() -> Self {
Self {
buckets: std::array::from_fn(|i| {
let label = Self::bucket_label(i);
NewPayloadGasBucketMetrics::new_with_labels(&[("gas_bucket", label)])
}),
}
}
}
impl GasBucketMetrics {
fn record(&self, gas_used: u64, elapsed: Duration) {
let idx = Self::bucket_index(gas_used);
self.buckets[idx].new_payload_gas_bucket_latency.record(elapsed);
self.buckets[idx]
.new_payload_gas_bucket_gas_per_second
.record(gas_used as f64 / elapsed.as_secs_f64());
}
fn bucket_index(gas_used: u64) -> usize {
GAS_BUCKET_THRESHOLDS
.iter()
.position(|&threshold| gas_used < threshold)
.unwrap_or(GAS_BUCKET_THRESHOLDS.len())
}
/// Returns a human-readable label like `<5M`, `5-10M`, … `>40M`.
fn bucket_label(index: usize) -> String {
if index == 0 {
let hi = GAS_BUCKET_THRESHOLDS[0] / MEGAGAS;
format!("<{hi}M")
} else if index < GAS_BUCKET_THRESHOLDS.len() {
let lo = GAS_BUCKET_THRESHOLDS[index - 1] / MEGAGAS;
let hi = GAS_BUCKET_THRESHOLDS[index] / MEGAGAS;
format!("{lo}-{hi}M")
} else {
let lo = GAS_BUCKET_THRESHOLDS[GAS_BUCKET_THRESHOLDS.len() - 1] / MEGAGAS;
format!(">{lo}M")
}
}
}
/// Metrics for engine newPayload responses.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
@@ -245,6 +313,9 @@ pub(crate) struct NewPayloadStatusMetrics {
/// Start time of the latest new payload call.
#[metric(skip)]
pub(crate) latest_start_at: Option<Instant>,
/// Gas-bucket-labeled latency and gas/s histograms.
#[metric(skip)]
pub(crate) gas_bucket: GasBucketMetrics,
/// The total count of new payload messages received.
pub(crate) new_payload_messages: Counter,
/// The total count of new payload messages that we responded to with
@@ -321,6 +392,7 @@ impl NewPayloadStatusMetrics {
self.new_payload_messages.increment(1);
self.new_payload_latency.record(elapsed);
self.new_payload_last.set(elapsed);
self.gas_bucket.record(gas_used, elapsed);
if let Some(latest_forkchoice_updated_at) = latest_forkchoice_updated_at.take() {
self.forkchoice_updated_new_payload_time_diff
.record(start - latest_forkchoice_updated_at);
@@ -365,6 +437,8 @@ pub struct BlockValidationMetrics {
pub state_root_parallel_fallback_total: Counter,
/// Total number of times the state root task failed but the fallback succeeded.
pub state_root_task_fallback_success_total: Counter,
/// Total number of times the state root task timed out and a sequential fallback was spawned.
pub state_root_task_timeout_total: Counter,
/// Latest state root duration, ie the time spent blocked waiting for the state root.
pub state_root_duration: Gauge,
/// Histogram for state root duration ie the time spent blocked waiting for the state root

View File

@@ -32,7 +32,7 @@ use reth_provider::{
BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
TransactionVariant,
StorageSettingsCache, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
@@ -271,6 +271,9 @@ where
evm_config: C,
/// Changeset cache for in-memory trie changesets
changeset_cache: ChangesetCache,
/// Whether the node uses hashed state as canonical storage (v2 mode).
/// Cached at construction to avoid threading `StorageSettingsCache` bounds everywhere.
use_hashed_state: bool,
}
impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
@@ -296,6 +299,7 @@ where
.field("engine_kind", &self.engine_kind)
.field("evm_config", &self.evm_config)
.field("changeset_cache", &self.changeset_cache)
.field("use_hashed_state", &self.use_hashed_state)
.finish()
}
}
@@ -313,7 +317,8 @@ where
P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
+ StageCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader,
+ StorageChangeSetReader
+ StorageSettingsCache,
C: ConfigureEvm<Primitives = N> + 'static,
T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
V: EngineValidator<T>,
@@ -334,6 +339,7 @@ where
engine_kind: EngineApiKind,
evm_config: C,
changeset_cache: ChangesetCache,
use_hashed_state: bool,
) -> Self {
let (incoming_tx, incoming) = crossbeam_channel::unbounded();
@@ -355,6 +361,7 @@ where
engine_kind,
evm_config,
changeset_cache,
use_hashed_state,
}
}
@@ -375,6 +382,7 @@ where
kind: EngineApiKind,
evm_config: C,
changeset_cache: ChangesetCache,
use_hashed_state: bool,
) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
{
let best_block_number = provider.best_block_number().unwrap_or(0);
@@ -407,6 +415,7 @@ where
kind,
evm_config,
changeset_cache,
use_hashed_state,
);
let incoming = task.incoming_tx.clone();
spawn_os_thread("engine", || task.run());
@@ -1402,7 +1411,7 @@ where
// Spawn a background task to trigger computation so it's ready when the next payload
// arrives.
if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
rayon::spawn(move || {
tokio::task::spawn_blocking(move || {
let _ = overlay.get();
});
}
@@ -1510,7 +1519,7 @@ where
.engine
.failed_forkchoice_updated_response_deliveries
.increment(1);
error!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
}
}
BeaconEngineMessage::NewPayload { payload, tx } => {
@@ -1534,7 +1543,7 @@ where
BeaconOnNewPayloadError::Internal(Box::new(e))
}))
{
error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
self.metrics
.engine
.failed_new_payload_response_deliveries
@@ -2379,7 +2388,12 @@ where
self.update_reorg_metrics(old.len(), old_first);
self.reinsert_reorged_blocks(new.clone());
self.reinsert_reorged_blocks(old.clone());
// When use_hashed_state is enabled, skip reinserting the old chain — the
// bundle state references plain state reverts which don't exist.
if !self.use_hashed_state {
self.reinsert_reorged_blocks(old.clone());
}
}
// update the tracked in-memory state with the new chain

View File

@@ -1,47 +0,0 @@
//! Executor for mixed I/O and CPU workloads.
use reth_trie_parallel::root::get_tokio_runtime_handle;
use tokio::{runtime::Handle, task::JoinHandle};
/// An executor for mixed I/O and CPU workloads.
///
/// This type uses tokio to spawn blocking tasks and will reuse an existing tokio
/// runtime if available or create its own.
#[derive(Debug, Clone)]
pub struct WorkloadExecutor {
inner: WorkloadExecutorInner,
}
impl Default for WorkloadExecutor {
fn default() -> Self {
Self { inner: WorkloadExecutorInner::new() }
}
}
impl WorkloadExecutor {
/// Returns the handle to the tokio runtime
pub(super) const fn handle(&self) -> &Handle {
&self.inner.handle
}
/// Runs the provided function on an executor dedicated to blocking operations.
#[track_caller]
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.inner.handle.spawn_blocking(func)
}
}
#[derive(Debug, Clone)]
struct WorkloadExecutorInner {
handle: Handle,
}
impl WorkloadExecutorInner {
fn new() -> Self {
Self { handle: get_tokio_runtime_handle() }
}
}

View File

@@ -15,7 +15,6 @@ use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
use alloy_evm::block::StateChangeSource;
use alloy_primitives::B256;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use executor::WorkloadExecutor;
use metrics::{Counter, Histogram};
use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
@@ -24,8 +23,8 @@ use rayon::prelude::*;
use reth_evm::{
block::ExecutableTxParts,
execute::{ExecutableTxFor, WithTxEnv},
ConfigureEvm, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook, SpecFor,
TxEnvFor,
ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
SpecFor, TxEnvFor,
};
use reth_metrics::Metrics;
use reth_primitives_traits::NodePrimitives;
@@ -34,6 +33,7 @@ use reth_provider::{
StateProviderFactory, StateReader,
};
use reth_revm::{db::BundleState, state::EvmState};
use reth_tasks::Runtime;
use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofWorkerHandle},
@@ -50,12 +50,11 @@ use std::{
mpsc::{self, channel},
Arc,
},
time::Instant,
time::{Duration, Instant},
};
use tracing::{debug, debug_span, instrument, warn, Span};
pub mod bal;
pub mod executor;
pub mod multiproof;
mod preserved_sparse_trie;
pub mod prewarm;
@@ -95,6 +94,9 @@ pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;
/// 144MB.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;
/// Blocks with fewer transactions than this skip prewarming, since the fixed overhead of spawning
/// prewarm workers exceeds the execution time saved.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>,
@@ -109,7 +111,7 @@ where
Evm: ConfigureEvm,
{
/// The executor used by to spawn tasks.
executor: WorkloadExecutor,
executor: Runtime,
/// The most recent cache used for execution.
execution_cache: PayloadExecutionCache,
/// Metrics for trie operations
@@ -136,6 +138,8 @@ where
sparse_trie_prune_depth: usize,
/// Maximum storage tries to retain after pruning.
sparse_trie_max_storage_tries: usize,
/// Whether sparse trie cache pruning is fully disabled.
disable_sparse_trie_cache_pruning: bool,
/// Whether to disable cache metrics recording.
disable_cache_metrics: bool,
}
@@ -146,13 +150,13 @@ where
Evm: ConfigureEvm<Primitives = N>,
{
/// Returns a reference to the workload executor driving payload tasks.
pub const fn executor(&self) -> &WorkloadExecutor {
pub const fn executor(&self) -> &Runtime {
&self.executor
}
/// Creates a new payload processor.
pub fn new(
executor: WorkloadExecutor,
executor: Runtime,
evm_config: Evm,
config: &TreeConfig,
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
@@ -171,6 +175,7 @@ where
prewarm_max_concurrency: config.prewarm_max_concurrency(),
sparse_trie_prune_depth: config.sparse_trie_prune_depth(),
sparse_trie_max_storage_tries: config.sparse_trie_max_storage_tries(),
disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
disable_cache_metrics: config.disable_cache_metrics(),
}
}
@@ -236,7 +241,8 @@ where
+ 'static,
{
// start preparing transactions immediately
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
let (prewarm_rx, execution_rx) =
self.spawn_tx_iterator(transactions, env.transaction_count);
let span = Span::current();
let (to_sparse_trie, sparse_trie_rx) = channel();
@@ -277,15 +283,7 @@ where
// Create and spawn the storage proof task
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let storage_worker_count = config.storage_worker_count();
let account_worker_count = config.account_worker_count();
let proof_handle = ProofWorkerHandle::new(
self.executor.handle().clone(),
task_ctx,
storage_worker_count,
account_worker_count,
v2_proofs_enabled,
);
let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, v2_proofs_enabled);
if config.disable_trie_cache() {
let multi_proof_task = MultiProofTask::new(
@@ -351,7 +349,8 @@ where
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
let (prewarm_rx, execution_rx) =
self.spawn_tx_iterator(transactions, env.transaction_count);
// This path doesn't use multiproof, so V2 proofs flag doesn't matter
let prewarm_handle =
self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
@@ -364,11 +363,23 @@ where
}
}
/// Transaction count threshold below which sequential signature recovery is used.
///
/// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
/// (work-stealing setup, channel-based reorder) exceeds the cost of sequential ECDSA
/// recovery. Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach`
/// for small blocks.
const SMALL_BLOCK_TX_THRESHOLD: usize = 30;
/// Spawns a task advancing transaction env iterator and streaming updates through a channel.
///
/// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
/// sequential iteration to avoid rayon overhead.
#[expect(clippy::type_complexity)]
fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
&self,
transactions: I,
transaction_count: usize,
) -> (
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
@@ -377,22 +388,51 @@ where
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
rayon::spawn(move || {
let (transactions, convert) = transactions.into();
transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
let tx = convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
// Only send Ok(_) variants to prewarming task.
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
if transaction_count == 0 {
// Empty block — nothing to do.
} else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
// Sequential path for small blocks — avoids rayon work-stealing setup and
// channel-based reorder overhead when it costs more than the ECDSA recovery itself.
debug!(
target: "engine::tree::payload_processor",
transaction_count,
"using sequential sig recovery for small block"
);
self.executor.spawn_blocking(move || {
let (transactions, convert) = transactions.into_parts();
for (idx, tx) in transactions.into_iter().enumerate() {
let tx = convert.convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = ooo_tx.send((idx, tx));
}
let _ = ooo_tx.send((idx, tx));
});
});
} else {
// Parallel path — spawn on rayon for parallel signature recovery.
rayon::spawn(move || {
let (transactions, convert) = transactions.into_parts();
transactions.into_par_iter().enumerate().for_each_with(
ooo_tx,
|ooo_tx, (idx, tx)| {
let tx = convert.convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
// Only send Ok(_) variants to prewarming task.
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = ooo_tx.send((idx, tx));
},
);
});
}
// Spawn a task that processes out-of-order transactions from the task above and sends them
// to the execution task in order.
@@ -404,8 +444,8 @@ where
let _ = execute_tx.send(tx);
next_for_execution += 1;
while let Some(entry) = queue.first_entry() &&
*entry.key() == next_for_execution
while let Some(entry) = queue.first_entry()
&& *entry.key() == next_for_execution
{
let _ = execute_tx.send(entry.remove());
next_for_execution += 1;
@@ -423,7 +463,7 @@ where
fn spawn_caching_with<P>(
&self,
env: ExecutionEnv<Evm>,
mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
provider_builder: StateProviderBuilder<N, P>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
bal: Option<Arc<BlockAccessList>>,
@@ -432,11 +472,8 @@ where
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
if self.disable_transaction_prewarming {
// if no transactions should be executed we clear them but still spawn the task for
// caching updates
transactions = mpsc::channel().1;
}
let skip_prewarm =
self.disable_transaction_prewarming || env.transaction_count < SMALL_BLOCK_TX_THRESHOLD;
let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
@@ -465,7 +502,9 @@ where
{
let to_prewarm_task = to_prewarm_task.clone();
self.executor.spawn_blocking(move || {
let mode = if let Some(bal) = bal {
let mode = if skip_prewarm {
PrewarmMode::Skipped
} else if let Some(bal) = bal {
PrewarmMode::BlockAccessList(bal)
} else {
PrewarmMode::Transactions(transactions)
@@ -514,6 +553,7 @@ where
let disable_trie_cache = config.disable_trie_cache();
let prune_depth = self.sparse_trie_prune_depth;
let max_storage_tries = self.sparse_trie_max_storage_tries;
let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
let chunk_size =
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size());
let executor = self.executor.clone();
@@ -610,6 +650,7 @@ where
max_storage_tries,
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
disable_cache_pruning,
);
trie_metrics
.into_trie_for_reuse_duration_histogram
@@ -723,6 +764,18 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
.map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
}
/// Takes the state root receiver out of the handle for use with custom waiting logic
/// (e.g., timeout-based waiting).
///
/// # Panics
///
/// If payload processing was started without background tasks.
pub const fn take_state_root_rx(
&mut self,
) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
self.state_root.take().expect("state_root is None")
}
/// Returns a state hook to be used to send state updates to this task.
///
/// If a multiproof task is spawned the hook will notify it about new states.
@@ -873,7 +926,7 @@ impl PayloadExecutionCache {
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
let start = Instant::now();
let cache = self.inner.read();
let mut cache = self.inner.write();
let elapsed = start.elapsed();
self.metrics.execution_cache_wait_duration.record(elapsed.as_secs_f64());
@@ -881,7 +934,7 @@ impl PayloadExecutionCache {
warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
}
if let Some(c) = cache.as_ref() {
if let Some(c) = cache.as_mut() {
let cached_hash = c.executed_block_hash();
// Check that the cache hash matches the parent hash of the current block. It won't
// match in case it's a fork block.
@@ -902,13 +955,13 @@ impl PayloadExecutionCache {
);
if available {
// If the has is available (no other threads are using it), but has a mismatching
// parent hash, we can just clear it and keep using without re-creating from
// scratch.
if !hash_matches {
c.clear();
// Fork block: clear and update the hash on the ORIGINAL before cloning.
// This prevents the canonical chain from matching on the stale hash
// and picking up polluted data if the fork block fails.
c.clear_with_hash(parent_hash);
}
return Some(c.clone())
return Some(c.clone());
} else if hash_matches {
self.metrics.execution_cache_in_use.increment(1);
}
@@ -919,10 +972,25 @@ impl PayloadExecutionCache {
None
}
/// Clears the tracked cache
#[expect(unused)]
pub(crate) fn clear(&self) {
self.inner.write().take();
/// Waits until the execution cache becomes available for use.
///
/// This acquires a write lock to ensure exclusive access, then immediately releases it.
/// This is useful for synchronization before starting payload processing.
///
/// Returns the time spent waiting for the lock.
pub fn wait_for_availability(&self) -> Duration {
let start = Instant::now();
// Acquire write lock to wait for any current holders to finish
let _guard = self.inner.write();
let elapsed = start.elapsed();
if elapsed.as_millis() > 5 {
debug!(
target: "engine::tree::payload_processor",
blocked_for=?elapsed,
"Waited for execution cache to become available"
);
}
elapsed
}
/// Updates the cache with a closure that has exclusive access to the guard.
@@ -1001,9 +1069,7 @@ mod tests {
use super::PayloadExecutionCache;
use crate::tree::{
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
payload_processor::{
evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
},
payload_processor::{evm_state_to_hashed_post_state, PayloadProcessor},
precompile_cache::PrecompileCacheMap,
StateProviderBuilder, TreeConfig,
};
@@ -1077,10 +1143,18 @@ mod tests {
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
// When the parent hash doesn't match, the cache is cleared and returned for reuse
// When the parent hash doesn't match (fork block), the cache is cleared,
// hash updated on the original, and clone returned for reuse
let different_hash = B256::from([4u8; 32]);
let cache = execution_cache.get_cache_for(different_hash);
assert!(cache.is_some(), "cache should be returned for reuse after clearing")
assert!(cache.is_some(), "cache should be returned for reuse after clearing");
drop(cache);
// The stored cache now has the fork block's parent hash.
// Canonical chain looking for original hash sees a mismatch → clears and reuses.
let original = execution_cache.get_cache_for(hash);
assert!(original.is_some(), "canonical chain gets cache back via mismatch+clear");
}
#[test]
@@ -1105,7 +1179,7 @@ mod tests {
#[test]
fn on_inserted_executed_block_populates_cache() {
let payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(Arc::new(ChainSpec::default())),
&TreeConfig::default(),
PrecompileCacheMap::default(),
@@ -1134,7 +1208,7 @@ mod tests {
#[test]
fn on_inserted_executed_block_skips_on_parent_mismatch() {
let payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(Arc::new(ChainSpec::default())),
&TreeConfig::default(),
PrecompileCacheMap::default(),
@@ -1269,7 +1343,7 @@ mod tests {
}
let mut payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(factory.chain_spec()),
&TreeConfig::default(),
PrecompileCacheMap::default(),
@@ -1304,4 +1378,61 @@ mod tests {
"State root mismatch: task={root_from_task}, base={root_from_regular}"
);
}
/// Tests the full prewarm lifecycle for a fork block:
///
/// 1. Cache is at canonical block 4.
/// 2. Fork block (parent = block 2) checks out the cache via `get_cache_for`, simulating what
/// `PrewarmCacheTask` does when it receives a `SavedCache`.
/// 3. Prewarm populates the shared cache with fork-specific state.
/// 4. While the prewarm clone is alive, the cache is unavailable (`usage_guard` > 1).
/// 5. Prewarm drops without calling `save_cache` (fork block was invalid).
/// 6. Canonical block 5 (parent = block 4) must get a cache with correct hash and no stale fork
/// data.
#[test]
fn fork_prewarm_dropped_without_save_does_not_corrupt_cache() {
let execution_cache = PayloadExecutionCache::default();
// Canonical chain at block 4.
let block4_hash = B256::from([4u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(block4_hash)));
// Fork block arrives with parent = block 2. Prewarm task checks out the cache.
// This simulates PrewarmCacheTask receiving a SavedCache clone from get_cache_for.
let fork_parent = B256::from([2u8; 32]);
let prewarm_cache = execution_cache.get_cache_for(fork_parent);
assert!(prewarm_cache.is_some(), "prewarm should obtain cache for fork block");
let prewarm_cache = prewarm_cache.unwrap();
assert_eq!(prewarm_cache.executed_block_hash(), fork_parent);
// Prewarm populates cache with fork-specific state (ancestor data for block 2).
// Since ExecutionCache uses Arc<Inner>, this data is shared with the stored original.
let fork_addr = Address::from([0xBB; 20]);
let fork_key = B256::from([0xCC; 32]);
prewarm_cache.cache().insert_storage(fork_addr, fork_key, Some(U256::from(999)));
// While prewarm holds the clone, the usage_guard count > 1 → cache is in use.
let during_prewarm = execution_cache.get_cache_for(block4_hash);
assert!(
during_prewarm.is_none(),
"cache must be unavailable while prewarm holds a reference"
);
// Fork block fails — prewarm task drops without calling save_cache/update_with_guard.
drop(prewarm_cache);
// Canonical block 5 arrives (parent = block 4).
// Stored hash = fork_parent (our fix), so get_cache_for sees a mismatch,
// clears the stale fork data, and returns a cache with hash = block4_hash.
let block5_cache = execution_cache.get_cache_for(block4_hash);
assert!(
block5_cache.is_some(),
"canonical chain must get cache after fork prewarm is dropped"
);
assert_eq!(
block5_cache.as_ref().unwrap().executed_block_hash(),
block4_hash,
"cache must carry the canonical parent hash, not the fork parent"
);
}
}

View File

@@ -1541,23 +1541,18 @@ mod tests {
providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
BlockNumReader, BlockReader, ChangeSetReader, DatabaseProviderFactory, LatestStateProvider,
PruneCheckpointReader, StageCheckpointReader, StateProviderBox, StorageChangeSetReader,
StorageSettingsCache,
};
use reth_trie::MultiProof;
use reth_trie_db::ChangesetCache;
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
use revm_primitives::{B256, U256};
use std::sync::{Arc, OnceLock};
use tokio::runtime::{Handle, Runtime};
/// Get a handle to the test runtime, creating it if necessary
fn get_test_runtime_handle() -> Handle {
static TEST_RT: OnceLock<Runtime> = OnceLock::new();
TEST_RT
.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()
})
.handle()
.clone()
/// Get a test runtime, creating it if necessary
fn get_test_runtime() -> &'static reth_tasks::Runtime {
static TEST_RT: OnceLock<reth_tasks::Runtime> = OnceLock::new();
TEST_RT.get_or_init(reth_tasks::Runtime::test)
}
fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
@@ -1568,16 +1563,17 @@ mod tests {
+ PruneCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ StorageSettingsCache
+ BlockNumReader,
> + Clone
+ Send
+ 'static,
{
let rt_handle = get_test_runtime_handle();
let runtime = get_test_runtime();
let changeset_cache = ChangesetCache::new();
let overlay_factory = OverlayStateProviderFactory::new(factory, changeset_cache);
let task_ctx = ProofTaskCtx::new(overlay_factory);
let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1, false);
let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, false);
let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
let (tx, rx) = crossbeam_channel::unbounded();
@@ -1587,7 +1583,10 @@ mod tests {
fn create_cached_provider<F>(factory: F) -> CachedStateProvider<StateProviderBox>
where
F: DatabaseProviderFactory<
Provider: BlockReader + StageCheckpointReader + PruneCheckpointReader,
Provider: BlockReader
+ StageCheckpointReader
+ PruneCheckpointReader
+ reth_provider::StorageSettingsCache,
> + Clone
+ Send
+ 'static,

View File

@@ -15,7 +15,6 @@ use crate::tree::{
cached_state::{CachedStateProvider, SavedCache},
payload_processor::{
bal::{self, total_slots, BALSlotIter},
executor::WorkloadExecutor,
multiproof::{MultiProofMessage, VersionedMultiProofTargets},
PayloadExecutionCache,
},
@@ -37,6 +36,7 @@ use reth_provider::{
StateReader,
};
use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_tasks::Runtime;
use reth_trie::MultiProofTargets;
use std::{
ops::Range,
@@ -49,13 +49,16 @@ use std::{
};
use tracing::{debug, debug_span, instrument, trace, warn, Span};
/// Determines the prewarming mode: transaction-based or BAL-based.
/// Determines the prewarming mode: transaction-based, BAL-based, or skipped.
#[derive(Debug)]
pub enum PrewarmMode<Tx> {
/// Prewarm by executing transactions from a stream.
Transactions(Receiver<Tx>),
/// Prewarm by prefetching slots from a Block Access List.
BlockAccessList(Arc<BlockAccessList>),
/// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
/// benefit). No workers are spawned.
Skipped,
}
/// A wrapper for transactions that includes their index in the block.
@@ -78,7 +81,7 @@ where
Evm: ConfigureEvm<Primitives = N>,
{
/// The executor used to spawn execution tasks.
executor: WorkloadExecutor,
executor: Runtime,
/// Shared execution cache.
execution_cache: PayloadExecutionCache,
/// Context provided to execution tasks
@@ -101,7 +104,7 @@ where
{
/// Initializes the task with the given transactions pending execution
pub fn new(
executor: WorkloadExecutor,
executor: Runtime,
execution_cache: PayloadExecutionCache,
ctx: PrewarmContext<N, P, Evm>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
@@ -416,6 +419,10 @@ where
PrewarmMode::BlockAccessList(bal) => {
self.run_bal_prewarm(bal, actions_tx);
}
PrewarmMode::Skipped => {
let _ = actions_tx
.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
}
}
let mut final_execution_outcome = None;
@@ -528,11 +535,8 @@ where
if let Some(saved_cache) = saved_cache {
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
state_provider = Box::new(
CachedStateProvider::new(state_provider, caches, cache_metrics)
// ensure we pre-warm the cache
.prewarm(),
);
state_provider =
Box::new(CachedStateProvider::new_prewarm(state_provider, caches, cache_metrics));
}
let state_provider = StateProviderDatabase::new(state_provider);
@@ -590,18 +594,11 @@ where
return
};
while let Ok(IndexedTransaction { index, tx }) = {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
.entered();
txs.recv()
} {
let enter = debug_span!(
while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
let _enter = debug_span!(
target: "engine::tree::payload_processor::prewarm",
"prewarm tx",
index,
tx_hash = %tx.tx().tx_hash(),
is_success = tracing::field::Empty,
gas_used = tracing::field::Empty,
)
.entered();
@@ -632,12 +629,6 @@ where
};
metrics.execution_duration.record(start.elapsed());
// record some basic information about the transactions
enter.record("gas_used", res.result.gas_used());
enter.record("is_success", res.result.is_success());
drop(enter);
// If the task was cancelled, stop execution, and exit.
if terminate_execution.load(Ordering::Relaxed) {
break
@@ -646,16 +637,12 @@ where
// Only send outcome for transactions after the first txn
// as the main execution will be just as fast
if index > 0 {
let _enter =
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
.entered();
let (targets, storage_targets) =
multiproof_targets_from_state(res.state, v2_proofs_enabled);
metrics.prefetch_storage_targets.record(storage_targets as f64);
if let Some(to_multi_proof) = &to_multi_proof {
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
}
drop(_enter);
}
metrics.total_runtime.record(start.elapsed());
@@ -671,7 +658,7 @@ where
fn spawn_workers<Tx>(
self,
workers_needed: usize,
task_executor: &WorkloadExecutor,
task_executor: &Runtime,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
done_tx: Sender<()>,
) -> CrossbeamSender<IndexedTransaction<Tx>>
@@ -708,7 +695,7 @@ where
fn spawn_bal_worker(
&self,
idx: usize,
executor: &WorkloadExecutor,
executor: &Runtime,
bal: Arc<BlockAccessList>,
range: Range<usize>,
done_tx: Sender<()>,

View File

@@ -1,6 +1,5 @@
//! Sparse Trie task related functionality.
use super::executor::WorkloadExecutor;
use crate::tree::{
multiproof::{
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
@@ -13,6 +12,7 @@ use alloy_rlp::{Decodable, Encodable};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use rayon::iter::ParallelIterator;
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
use reth_tasks::Runtime;
use reth_trie::{
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
TrieAccount, EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
@@ -28,7 +28,7 @@ use reth_trie_parallel::{
use reth_trie_sparse::{
errors::{SparseStateTrieResult, SparseTrieErrorKind, SparseTrieResult},
provider::{TrieNodeProvider, TrieNodeProviderFactory},
DeferredDrops, LeafUpdate, ParallelSparseTrie, SparseStateTrie, SparseTrie, SparseTrieExt,
DeferredDrops, LeafUpdate, ParallelSparseTrie, SparseStateTrie, SparseTrie,
};
use revm_primitives::{hash_map::Entry, B256Map};
use smallvec::SmallVec;
@@ -44,8 +44,8 @@ where
BPF: TrieNodeProviderFactory + Send + Sync,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
Cleared(SparseTrieTask<BPF, A, S>),
Cached(SparseTrieCacheTask<A, S>),
@@ -56,8 +56,8 @@ where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
match self {
@@ -72,6 +72,7 @@ where
max_storage_tries: usize,
max_nodes_capacity: usize,
max_values_capacity: usize,
disable_pruning: bool,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
match self {
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
@@ -80,6 +81,7 @@ where
max_storage_tries,
max_nodes_capacity,
max_values_capacity,
disable_pruning,
),
}
}
@@ -117,8 +119,8 @@ where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
/// Creates a new sparse trie task with the given trie.
pub(super) const fn new(
@@ -277,12 +279,12 @@ pub(super) struct SparseTrieCacheTask<A = ParallelSparseTrie, S = ParallelSparse
impl<A, S> SparseTrieCacheTask<A, S>
where
A: SparseTrieExt + Default,
S: SparseTrieExt + Default + Clone,
A: SparseTrie + Default,
S: SparseTrie + Default + Clone,
{
/// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
pub(super) fn new_with_trie(
executor: &WorkloadExecutor,
executor: &Runtime,
updates: CrossbeamReceiver<MultiProofMessage>,
proof_worker_handle: ProofWorkerHandle,
metrics: MultiProofTaskMetrics,
@@ -356,16 +358,23 @@ where
/// Prunes and shrinks the trie for reuse in the next payload built on top of this one.
///
/// Should be called after the state root result has been sent.
///
/// When `disable_pruning` is true, the trie is preserved without any node pruning,
/// storage trie eviction, or capacity shrinking, keeping the full cache intact for
/// benchmarking purposes.
pub(super) fn into_trie_for_reuse(
self,
prune_depth: usize,
max_storage_tries: usize,
max_nodes_capacity: usize,
max_values_capacity: usize,
disable_pruning: bool,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
let Self { mut trie, .. } = self;
trie.prune(prune_depth, max_storage_tries);
trie.shrink_to(max_nodes_capacity, max_values_capacity);
if !disable_pruning {
trie.prune(prune_depth, max_storage_tries);
trie.shrink_to(max_nodes_capacity, max_values_capacity);
}
let deferred = trie.take_deferred_drops();
(trie, deferred)
}
@@ -407,7 +416,9 @@ where
let update = match message {
Ok(m) => m,
Err(_) => {
break
return Err(ParallelStateRootError::Other(
"updates channel disconnected before state root calculation".to_string(),
))
}
};
@@ -486,7 +497,7 @@ where
}
#[instrument(
level = "debug",
level = "trace",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
@@ -518,7 +529,7 @@ where
/// Processes a hashed state update and encodes all state changes as trie updates.
#[instrument(
level = "debug",
level = "trace",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]

View File

@@ -4,7 +4,7 @@ use crate::tree::{
cached_state::CachedStateProvider,
error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
instrumented_state::InstrumentedStateProvider,
payload_processor::{executor::WorkloadExecutor, PayloadProcessor},
payload_processor::PayloadProcessor,
precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
sparse_trie::StateRootComputeOutcome,
EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, StateProviderBuilder,
@@ -17,7 +17,6 @@ use alloy_evm::Evm;
use alloy_primitives::B256;
use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
use rayon::prelude::*;
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, LazyOverlay};
use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
use reth_engine_primitives::{
@@ -39,7 +38,7 @@ use reth_provider::{
providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockNumReader, BlockReader,
ChangeSetReader, DatabaseProviderFactory, DatabaseProviderROFactory, HashedPostStateProvider,
ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProvider,
StateProviderFactory, StateReader, StorageChangeSetReader,
StateProviderFactory, StateReader, StorageChangeSetReader, StorageSettingsCache,
};
use reth_revm::db::{states::bundle_state::BundleRetention, State};
use reth_trie::{updates::TrieUpdates, HashedPostState, StateRoot};
@@ -49,7 +48,7 @@ use revm_primitives::Address;
use std::{
collections::HashMap,
panic::{self, AssertUnwindSafe},
sync::Arc,
sync::{mpsc::RecvTimeoutError, Arc},
time::Instant,
};
use tracing::{debug, debug_span, error, info, instrument, trace, warn};
@@ -134,6 +133,8 @@ where
validator: V,
/// Changeset cache for in-memory trie changesets
changeset_cache: ChangesetCache,
/// Task runtime for spawning parallel work.
runtime: reth_tasks::Runtime,
}
impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
@@ -145,7 +146,8 @@ where
+ PruneCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ BlockNumReader,
+ BlockNumReader
+ StorageSettingsCache,
> + BlockReader<Header = N::BlockHeader>
+ ChangeSetReader
+ BlockNumReader
@@ -166,10 +168,11 @@ where
config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
changeset_cache: ChangesetCache,
runtime: reth_tasks::Runtime,
) -> Self {
let precompile_cache_map = PrecompileCacheMap::default();
let payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
runtime.clone(),
evm_config.clone(),
&config,
precompile_cache_map.clone(),
@@ -186,6 +189,7 @@ where
metrics: EngineApiMetrics::default(),
validator,
changeset_cache,
runtime,
}
}
@@ -228,35 +232,20 @@ where
V: PayloadValidator<T, Block = N::Block>,
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
{
match input {
Ok(match input {
BlockOrPayload::Payload(payload) => {
let (iter, convert) = self
let iter = self
.evm_config
.tx_iterator_for_payload(payload)
.map_err(NewPayloadError::other)?
.into();
let iter = Either::Left(iter.into_par_iter().map(Either::Left));
let convert = move |tx| {
let Either::Left(tx) = tx else { unreachable!() };
convert(tx).map(Either::Left).map_err(Either::Left)
};
// Box the closure to satisfy the `Fn` bound both here and in the branch below
Ok((iter, Box::new(convert) as Box<dyn Fn(_) -> _ + Send + Sync + 'static>))
.map_err(NewPayloadError::other)?;
Either::Left(iter)
}
BlockOrPayload::Block(block) => {
let iter = Either::Right(
block.body().clone_transactions().into_par_iter().map(Either::Right),
);
let convert = move |tx: Either<_, N::SignedTx>| {
let Either::Right(tx) = tx else { unreachable!() };
tx.try_into_recovered().map(Either::Right).map_err(Either::Right)
};
Ok((iter, Box::new(convert)))
let txs = block.body().clone_transactions();
let convert = |tx: N::SignedTx| tx.try_into_recovered();
Either::Right((txs, convert))
}
}
})
}
/// Returns a [`ExecutionCtxFor`] for the given payload or block.
@@ -515,7 +504,17 @@ where
match strategy {
StateRootStrategy::StateRootTask => {
debug!(target: "engine::tree::payload_validator", "Using sparse trie state root algorithm");
match handle.state_root() {
let task_result = ensure_ok_post_block!(
self.await_state_root_with_timeout(
&mut handle,
overlay_factory.clone(),
&hashed_state,
),
block
);
match task_result {
Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
let elapsed = root_time.elapsed();
info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished");
@@ -586,7 +585,7 @@ where
}
let (root, updates) = ensure_ok_post_block!(
self.compute_state_root_serial(overlay_factory.clone(), &hashed_state),
Self::compute_state_root_serial(overlay_factory.clone(), &hashed_state),
block
);
@@ -818,21 +817,18 @@ where
let tx = tx_result.map_err(BlockExecutionError::other)?;
let tx_signer = *<Tx as alloy_evm::RecoveredTx<InnerTx>>::signer(&tx);
let tx_hash = <Tx as alloy_evm::RecoveredTx<InnerTx>>::tx(&tx).tx_hash();
senders.push(tx_signer);
let span = debug_span!(
let _enter = debug_span!(
target: "engine::tree",
"execute tx",
?tx_hash,
gas_used = tracing::field::Empty,
);
let enter = span.entered();
)
.entered();
trace!(target: "engine::tree", "Executing transaction");
let tx_start = Instant::now();
let gas_used = executor.execute_transaction(tx)?;
executor.execute_transaction(tx)?;
self.metrics.record_transaction_execution(tx_start.elapsed());
let current_len = executor.receipts().len();
@@ -844,8 +840,6 @@ where
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
}
}
enter.record("gas_used", gas_used);
}
drop(exec_span);
@@ -874,7 +868,8 @@ where
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
let overlay_factory =
overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
ParallelStateRoot::new(overlay_factory, prefix_sets).incremental_root_with_updates()
ParallelStateRoot::new(overlay_factory, prefix_sets, self.runtime.clone())
.incremental_root_with_updates()
}
/// Compute state root for the given hashed post state in serial.
@@ -883,7 +878,6 @@ where
/// [`HashedPostState`] containing the changes of this block, to compute the state root and
/// trie updates for this block.
fn compute_state_root_serial(
&self,
overlay_factory: OverlayStateProviderFactory<P>,
hashed_state: &HashedPostState,
) -> ProviderResult<(B256, TrieUpdates)> {
@@ -901,6 +895,102 @@ where
.root_with_updates()?)
}
/// Awaits the state root from the background task, with an optional timeout fallback.
///
/// If a timeout is configured (`state_root_task_timeout`), this method first waits for the
/// state root task up to the timeout duration. If the task doesn't complete in time, a
/// sequential state root computation is spawned via `spawn_blocking`. Both computations
/// then race: the main thread polls the task receiver and the sequential result channel
/// in a loop, returning whichever finishes first.
///
/// If no timeout is configured, this simply awaits the state root task without any fallback.
///
/// Returns `ProviderResult<Result<...>>` where the outer `ProviderResult` captures
/// unrecoverable errors from the sequential fallback (e.g. DB errors), while the inner
/// `Result` captures parallel state root task errors that can still fall back to serial.
#[instrument(
level = "debug",
target = "engine::tree::payload_validator",
name = "await_state_root",
skip_all
)]
fn await_state_root_with_timeout<Tx, Err, R: Send + Sync + 'static>(
&self,
handle: &mut PayloadHandle<Tx, Err, R>,
overlay_factory: OverlayStateProviderFactory<P>,
hashed_state: &HashedPostState,
) -> ProviderResult<Result<StateRootComputeOutcome, ParallelStateRootError>> {
let Some(timeout) = self.config.state_root_task_timeout() else {
return Ok(handle.state_root());
};
let task_rx = handle.take_state_root_rx();
match task_rx.recv_timeout(timeout) {
Ok(result) => Ok(result),
Err(RecvTimeoutError::Disconnected) => {
Ok(Err(ParallelStateRootError::Other("sparse trie task dropped".to_string())))
}
Err(RecvTimeoutError::Timeout) => {
warn!(
target: "engine::tree::payload_validator",
?timeout,
"State root task timed out, spawning sequential fallback"
);
self.metrics.block_validation.state_root_task_timeout_total.increment(1);
let (seq_tx, seq_rx) =
std::sync::mpsc::channel::<ProviderResult<(B256, TrieUpdates)>>();
let seq_overlay = overlay_factory;
let seq_hashed_state = hashed_state.clone();
self.payload_processor.executor().spawn_blocking(move || {
let result = Self::compute_state_root_serial(seq_overlay, &seq_hashed_state);
let _ = seq_tx.send(result);
});
const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
loop {
match task_rx.recv_timeout(POLL_INTERVAL) {
Ok(result) => {
debug!(
target: "engine::tree::payload_validator",
source = "task",
"State root timeout race won"
);
return Ok(result);
}
Err(RecvTimeoutError::Disconnected) => {
debug!(
target: "engine::tree::payload_validator",
"State root task dropped, waiting for sequential fallback"
);
let result = seq_rx.recv().map_err(|_| {
ProviderError::other(std::io::Error::other(
"both state root computations failed",
))
})?;
let (state_root, trie_updates) = result?;
return Ok(Ok(StateRootComputeOutcome { state_root, trie_updates }));
}
Err(RecvTimeoutError::Timeout) => {}
}
if let Ok(result) = seq_rx.try_recv() {
debug!(
target: "engine::tree::payload_validator",
source = "sequential",
"State root timeout race won"
);
let (state_root, trie_updates) = result?;
return Ok(Ok(StateRootComputeOutcome { state_root, trie_updates }));
}
}
}
}
}
/// Compares trie updates from the state root task with serial state root computation.
///
/// This is used for debugging and validating the correctness of the parallel state root
@@ -915,7 +1005,7 @@ where
) {
debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");
match self.compute_state_root_serial(overlay_factory.clone(), hashed_state) {
match Self::compute_state_root_serial(overlay_factory.clone(), hashed_state) {
Ok((serial_root, serial_trie_updates)) => {
debug!(
target: "engine::tree::payload_validator",
@@ -1437,7 +1527,8 @@ where
+ PruneCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ BlockNumReader,
+ BlockNumReader
+ StorageSettingsCache,
> + BlockReader<Header = N::BlockHeader>
+ StateProviderFactory
+ StateReader

View File

@@ -203,6 +203,7 @@ impl TestHarness {
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache.clone(),
reth_tasks::Runtime::test(),
);
let tree = EngineApiTreeHandler::new(
@@ -220,6 +221,7 @@ impl TestHarness {
EngineApiKind::Ethereum,
evm_config,
changeset_cache,
provider.cached_storage_settings().use_hashed_state(),
);
let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
@@ -404,6 +406,7 @@ impl ValidatorTestHarness {
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache,
reth_tasks::Runtime::test(),
);
Self { harness, validator, metrics: TestMetrics::default() }

View File

@@ -2,13 +2,15 @@
mod fcu_finalized_blocks;
use alloy_rpc_types_engine::PayloadStatusEnum;
use eyre::Result;
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_e2e_test_utils::testsuite::{
actions::{
CaptureBlock, CompareNodeChainTips, CreateFork, ExpectFcuStatus, MakeCanonical,
ProduceBlocks, ProduceBlocksLocally, ProduceInvalidBlocks, ReorgTo, SelectActiveNode,
SendNewPayloads, UpdateBlockInfo, ValidateCanonicalTag, WaitForSync,
BlockReference, CaptureBlock, CompareNodeChainTips, CreateFork, ExpectFcuStatus,
MakeCanonical, ProduceBlocks, ProduceBlocksLocally, ProduceInvalidBlocks, ReorgTo,
SelectActiveNode, SendForkchoiceUpdate, SendNewPayloads, SetForkBase, UpdateBlockInfo,
ValidateCanonicalTag, WaitForSync,
},
setup::{NetworkSetup, Setup},
TestBuilder,
@@ -39,6 +41,14 @@ fn default_engine_tree_setup() -> Setup<EthEngineTypes> {
)
}
/// Creates a v2 storage mode setup for engine tree e2e tests.
///
/// v2 mode uses keccak256-hashed slot keys in static file changesets and rocksdb history
/// instead of plain keys in MDBX.
fn v2_engine_tree_setup() -> Setup<EthEngineTypes> {
default_engine_tree_setup().with_storage_v2()
}
/// Test that verifies forkchoice update and canonical chain insertion functionality.
#[tokio::test]
async fn test_engine_tree_fcu_canon_chain_insertion_e2e() -> Result<()> {
@@ -334,3 +344,152 @@ async fn test_engine_tree_live_sync_transition_eventually_canonical_e2e() -> Res
Ok(())
}
// ==================== v2 storage mode variants ====================
/// v2 variant: Verifies forkchoice update and canonical chain insertion in v2 storage mode.
///
/// Exercises the full `save_blocks` → `write_state` → static file changeset path with hashed keys.
#[tokio::test]
async fn test_engine_tree_fcu_canon_chain_insertion_v2_e2e() -> Result<()> {
reth_tracing::init_test_tracing();
let test = TestBuilder::new()
.with_setup(v2_engine_tree_setup())
.with_action(ProduceBlocks::<EthEngineTypes>::new(1))
.with_action(MakeCanonical::new())
.with_action(ProduceBlocks::<EthEngineTypes>::new(3))
.with_action(MakeCanonical::new());
test.run::<EthereumNode>().await?;
Ok(())
}
/// v2 variant: Verifies forkchoice update with a reorg where all blocks are already available.
///
/// Exercises `write_state_reverts` path with hashed changeset keys during CL-driven reorgs.
#[tokio::test]
async fn test_engine_tree_fcu_reorg_with_all_blocks_v2_e2e() -> Result<()> {
reth_tracing::init_test_tracing();
let test = TestBuilder::new()
.with_setup(v2_engine_tree_setup())
.with_action(ProduceBlocks::<EthEngineTypes>::new(5))
.with_action(MakeCanonical::new())
.with_action(CreateFork::<EthEngineTypes>::new(2, 3))
.with_action(CaptureBlock::new("fork_tip"))
.with_action(ReorgTo::<EthEngineTypes>::new_from_tag("fork_tip"));
test.run::<EthereumNode>().await?;
Ok(())
}
/// v2 variant: Verifies progressive canonical chain extension in v2 storage mode.
#[tokio::test]
async fn test_engine_tree_fcu_extends_canon_chain_v2_e2e() -> Result<()> {
reth_tracing::init_test_tracing();
let test = TestBuilder::new()
.with_setup(v2_engine_tree_setup())
.with_action(ProduceBlocks::<EthEngineTypes>::new(1))
.with_action(MakeCanonical::new())
.with_action(ProduceBlocks::<EthEngineTypes>::new(10))
.with_action(CaptureBlock::new("target_block"))
.with_action(ReorgTo::<EthEngineTypes>::new_from_tag("target_block"))
.with_action(MakeCanonical::new());
test.run::<EthereumNode>().await?;
Ok(())
}
/// Creates a 2-node setup for disk-level reorg testing.
///
/// Uses unconnected nodes so fork blocks can be produced independently on Node 1 and then
/// sent to Node 0 via newPayload only (no FCU), keeping Node 0's persisted chain intact
/// until the final `ReorgTo` triggers `find_disk_reorg`.
fn disk_reorg_setup(storage_v2: bool) -> Setup<EthEngineTypes> {
let mut setup = Setup::default()
.with_chain_spec(Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(
serde_json::from_str(include_str!(
"../../../../e2e-test-utils/src/testsuite/assets/genesis.json"
))
.unwrap(),
)
.cancun_activated()
.build(),
))
.with_network(NetworkSetup::multi_node_unconnected(2))
.with_tree_config(
TreeConfig::default().with_legacy_state_root(false).with_has_enough_parallelism(true),
);
if storage_v2 {
setup = setup.with_storage_v2();
}
setup
}
/// Builds a disk-level reorg test scenario.
///
/// 1. Both nodes receive 3 shared blocks
/// 2. Node 0 extends to 10 blocks locally (persisted to disk)
/// 3. Node 1 builds an 8-block fork from block 3 (its canonical head)
/// 4. Fork blocks are sent to Node 0 via newPayload (no FCU, old chain stays on disk)
/// 5. FCU to fork tip on Node 0 triggers `find_disk_reorg` → `RemoveBlocksAbove(3)`
fn disk_reorg_test(storage_v2: bool) -> TestBuilder<EthEngineTypes> {
TestBuilder::new()
.with_setup(disk_reorg_setup(storage_v2))
.with_action(SelectActiveNode::new(0))
.with_action(ProduceBlocks::<EthEngineTypes>::new(3))
.with_action(MakeCanonical::new())
.with_action(ProduceBlocksLocally::<EthEngineTypes>::new(7))
.with_action(MakeCanonical::with_active_node())
.with_action(SelectActiveNode::new(1))
.with_action(SetForkBase::new(3))
.with_action(ProduceBlocksLocally::<EthEngineTypes>::new(8))
.with_action(MakeCanonical::with_active_node())
.with_action(CaptureBlock::new("fork_tip"))
.with_action(
SendNewPayloads::<EthEngineTypes>::new()
.with_source_node(1)
.with_target_node(0)
.with_start_block(4)
.with_total_blocks(8),
)
.with_action(
SendForkchoiceUpdate::<EthEngineTypes>::new(
BlockReference::Tag("fork_tip".into()),
BlockReference::Tag("fork_tip".into()),
BlockReference::Tag("fork_tip".into()),
)
.with_expected_status(PayloadStatusEnum::Valid)
.with_node_idx(0),
)
}
/// Verifies disk-level reorg in v1 (plain key) storage mode.
///
/// Confirms `find_disk_reorg()` detects persisted blocks on the wrong fork and calls
/// `RemoveBlocksAbove` to truncate, then re-persists the correct fork chain.
#[tokio::test]
async fn test_engine_tree_disk_reorg_v1_e2e() -> Result<()> {
reth_tracing::init_test_tracing();
disk_reorg_test(false).run::<EthereumNode>().await?;
Ok(())
}
/// v2 variant: Verifies disk-level reorg in v2 storage mode.
///
/// Same scenario as v1 but with hashed changeset keys in static files and rocksdb history.
/// Exercises `find_disk_reorg()` → `RemoveBlocksAbove` with v2 hashed key format.
#[tokio::test]
async fn test_engine_tree_disk_reorg_v2_e2e() -> Result<()> {
reth_tracing::init_test_tracing();
disk_reorg_test(true).run::<EthereumNode>().await?;
Ok(())
}

View File

@@ -22,6 +22,7 @@ reth-node-core.workspace = true
reth-node-ethereum.workspace = true
reth-node-metrics.workspace = true
reth-rpc-server-types.workspace = true
reth-tasks.workspace = true
reth-tracing.workspace = true
reth-node-api.workspace = true

View File

@@ -17,6 +17,7 @@ use reth_node_builder::{NodeBuilder, WithLaunchContext};
use reth_node_ethereum::{consensus::EthBeaconConsensus, EthEvmConfig, EthereumNode};
use reth_node_metrics::recorder::install_prometheus_recorder;
use reth_rpc_server_types::RpcModuleValidator;
use reth_tasks::RayonConfig;
use reth_tracing::{FileWorkerGuard, Layers};
use std::{fmt, sync::Arc};
@@ -153,6 +154,16 @@ where
Rpc::validate_selection(ws_api, "ws.api").map_err(|e| eyre!("{e}"))?;
}
let rayon_config = RayonConfig {
reserved_cpu_cores: command.engine.reserved_cpu_cores,
proof_storage_worker_threads: command.engine.storage_worker_count,
proof_account_worker_threads: command.engine.account_worker_count,
..Default::default()
};
let runner = CliRunner::try_with_runtime_config(
reth_tasks::RuntimeConfig::default().with_rayon(rayon_config),
)?;
runner.run_command_until_exit(|ctx| {
command.execute(ctx, FnLauncher::new::<C, Ext>(launcher))
})

View File

@@ -92,7 +92,7 @@ impl<
/// This accepts a closure that is used to launch the node via the
/// [`NodeCommand`](node::NodeCommand).
///
/// This command will be run on the [default tokio runtime](reth_cli_runner::tokio_runtime).
/// This command will be run on the default tokio runtime.
///
///
/// # Example
@@ -143,7 +143,7 @@ impl<
/// This accepts a closure that is used to launch the node via the
/// [`NodeCommand`](node::NodeCommand).
///
/// This command will be run on the [default tokio runtime](reth_cli_runner::tokio_runtime).
/// This command will be run on the default tokio runtime.
pub fn run_with_components<N>(
self,
components: impl CliComponentsBuilder<N>,

View File

@@ -112,4 +112,5 @@ test-utils = [
"reth-primitives-traits/test-utils",
"reth-evm-ethereum/test-utils",
"reth-stages-types/test-utils",
"reth-tasks/test-utils",
]

View File

@@ -107,26 +107,16 @@ impl EthereumNode {
/// use reth_chainspec::MAINNET;
/// use reth_node_ethereum::EthereumNode;
///
/// let factory = EthereumNode::provider_factory_builder()
/// .open_read_only(MAINNET.clone(), "datadir")
/// .unwrap();
/// fn demo(runtime: reth_tasks::Runtime) {
/// let factory = EthereumNode::provider_factory_builder()
/// .open_read_only(MAINNET.clone(), "datadir", runtime)
/// .unwrap();
/// }
/// ```
///
/// # Open a Providerfactory manually with all required components
///
/// ```no_run
/// use reth_chainspec::ChainSpecBuilder;
/// use reth_db::open_db_read_only;
/// use reth_node_ethereum::EthereumNode;
/// use reth_provider::providers::{RocksDBProvider, StaticFileProvider};
///
/// let factory = EthereumNode::provider_factory_builder()
/// .db(open_db_read_only("db", Default::default()).unwrap())
/// .chainspec(ChainSpecBuilder::mainnet().build().into())
/// .static_file(StaticFileProvider::read_only("db/static_files", false).unwrap())
/// .rocksdb_provider(RocksDBProvider::builder("db/rocksdb").build().unwrap())
/// .build_provider_factory();
/// ```
/// See also [`ProviderFactory::new`](reth_provider::ProviderFactory::new) for constructing
/// a [`ProviderFactory`](reth_provider::ProviderFactory) manually with all required
/// components.
pub fn provider_factory_builder() -> ProviderFactoryBuilder<Self> {
ProviderFactoryBuilder::default()
}
@@ -513,7 +503,7 @@ where
// it doesn't impact the first block or the first gossiped blob transaction, so we
// initialize this in the background
let kzg_settings = validator.validator().kzg_settings().clone();
ctx.task_executor().spawn_blocking(async move {
ctx.task_executor().spawn_blocking_task(async move {
let _ = kzg_settings.get();
debug!(target: "reth::cli", "Initialized KZG settings");
});

View File

@@ -10,7 +10,7 @@ use reth_ethereum_primitives::PooledTransactionVariant;
use reth_node_builder::{NodeBuilder, NodeHandle};
use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
use reth_node_ethereum::EthereumNode;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use reth_transaction_pool::TransactionPool;
use std::{
sync::Arc,
@@ -20,8 +20,7 @@ use std::{
#[tokio::test]
async fn can_handle_blobs() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
let chain_spec = Arc::new(
@@ -37,7 +36,7 @@ async fn can_handle_blobs() -> eyre::Result<()> {
.with_unused_ports()
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node(exec.clone())
.testing_node(runtime.clone())
.node(EthereumNode::default())
.launch()
.await?;
@@ -92,8 +91,7 @@ async fn can_handle_blobs() -> eyre::Result<()> {
#[tokio::test]
async fn can_send_legacy_sidecar_post_activation() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
let chain_spec = Arc::new(
@@ -107,7 +105,7 @@ async fn can_send_legacy_sidecar_post_activation() -> eyre::Result<()> {
.with_force_blob_sidecar_upcasting(),
);
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node(exec.clone())
.testing_node(runtime.clone())
.node(EthereumNode::default())
.launch()
.await?;
@@ -146,8 +144,7 @@ async fn can_send_legacy_sidecar_post_activation() -> eyre::Result<()> {
#[tokio::test]
async fn blob_conversion_at_osaka() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let current_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
// Osaka activates in 2 slots
@@ -170,7 +167,7 @@ async fn blob_conversion_at_osaka() -> eyre::Result<()> {
.with_force_blob_sidecar_upcasting(),
);
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node(exec.clone())
.testing_node(runtime.clone())
.node(EthereumNode::default())
.launch()
.await?;

View File

@@ -27,7 +27,7 @@ async fn can_run_eth_node_with_custom_genesis_number() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) =
let (mut nodes, wallet) =
setup::<EthereumNode>(1, chain_spec, false, eth_payload_attributes).await?;
let mut node = nodes.pop().unwrap();
@@ -81,7 +81,7 @@ async fn custom_genesis_block_query_boundaries() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, _wallet) =
let (mut nodes, _wallet) =
setup::<EthereumNode>(1, chain_spec, false, eth_payload_attributes).await?;
let node = nodes.pop().unwrap();

View File

@@ -9,20 +9,19 @@ use reth_node_core::args::DevArgs;
use reth_node_ethereum::{node::EthereumAddOns, EthereumNode};
use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions};
use reth_rpc_eth_api::{helpers::EthTransactions, EthApiServer};
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use std::sync::Arc;
#[tokio::test]
async fn can_run_dev_node() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let node_config = NodeConfig::test()
.with_chain(custom_chain())
.with_dev(DevArgs { dev: true, ..Default::default() });
let NodeHandle { node, .. } = NodeBuilder::new(node_config.clone())
.testing_node(exec.clone())
.testing_node(runtime.clone())
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
.with_components(EthereumNode::components())
.with_add_ons(EthereumAddOns::default())
@@ -37,15 +36,14 @@ async fn can_run_dev_node() -> eyre::Result<()> {
#[tokio::test]
async fn can_run_dev_node_custom_attributes() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let node_config = NodeConfig::test()
.with_chain(custom_chain())
.with_dev(DevArgs { dev: true, ..Default::default() });
let fee_recipient = Address::random();
let NodeHandle { node, .. } = NodeBuilder::new(node_config.clone())
.testing_node(exec.clone())
.testing_node(runtime.clone())
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
.with_components(EthereumNode::components())
.with_add_ons(EthereumAddOns::default())

View File

@@ -14,14 +14,14 @@ use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
use reth_node_ethereum::EthereumNode;
use reth_provider::BlockNumReader;
use reth_rpc_api::TestingBuildBlockRequestV1;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use std::sync::Arc;
#[tokio::test]
async fn can_run_eth_node() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let (mut nodes, _tasks, wallet) = setup::<EthereumNode>(
let (mut nodes, wallet) = setup::<EthereumNode>(
1,
Arc::new(
ChainSpecBuilder::default()
@@ -57,8 +57,7 @@ async fn can_run_eth_node() -> eyre::Result<()> {
#[cfg(unix)]
async fn can_run_eth_node_with_auth_engine_api_over_ipc() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let exec = TaskManager::current();
let exec = exec.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
// Chain spec with test allocs
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
@@ -76,7 +75,7 @@ async fn can_run_eth_node_with_auth_engine_api_over_ipc() -> eyre::Result<()> {
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http().with_auth_ipc());
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(exec)
.testing_node(runtime)
.node(EthereumNode::default())
.launch()
.await?;
@@ -105,8 +104,7 @@ async fn can_run_eth_node_with_auth_engine_api_over_ipc() -> eyre::Result<()> {
#[cfg(unix)]
async fn test_failed_run_eth_node_with_no_auth_engine_api_over_ipc_opts() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let exec = TaskManager::current();
let exec = exec.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
// Chain spec with test allocs
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
@@ -121,7 +119,7 @@ async fn test_failed_run_eth_node_with_no_auth_engine_api_over_ipc_opts() -> eyr
// Node setup
let node_config = NodeConfig::test().with_chain(chain_spec);
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(exec)
.testing_node(runtime)
.node(EthereumNode::default())
.launch()
.await?;
@@ -139,7 +137,7 @@ async fn test_failed_run_eth_node_with_no_auth_engine_api_over_ipc_opts() -> eyr
async fn test_engine_graceful_shutdown() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let (mut nodes, _tasks, wallet) = setup::<EthereumNode>(
let (mut nodes, wallet) = setup::<EthereumNode>(
1,
Arc::new(
ChainSpecBuilder::default()
@@ -190,8 +188,7 @@ async fn test_engine_graceful_shutdown() -> eyre::Result<()> {
#[tokio::test]
async fn test_testing_build_block_v1_osaka() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let exec = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
let chain_spec = Arc::new(
@@ -208,7 +205,7 @@ async fn test_testing_build_block_v1_osaka() -> eyre::Result<()> {
);
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(exec)
.testing_node(runtime)
.node(EthereumNode::default())
.launch()
.await?;
@@ -278,7 +275,7 @@ async fn test_sparse_trie_reuse_across_blocks() -> eyre::Result<()> {
.with_sparse_trie_prune_depth(2)
.with_sparse_trie_max_storage_tries(100);
let (mut nodes, _tasks, _wallet) = setup_engine::<EthereumNode>(
let (mut nodes, _wallet) = setup_engine::<EthereumNode>(
1,
Arc::new(
ChainSpecBuilder::default()

View File

@@ -37,7 +37,7 @@ async fn can_handle_invalid_payload_then_valid() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
2,
chain_spec.clone(),
false,
@@ -154,7 +154,7 @@ async fn can_handle_multiple_invalid_payloads() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
2,
chain_spec.clone(),
false,
@@ -255,7 +255,7 @@ async fn can_handle_invalid_payload_with_transactions() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
2,
chain_spec.clone(),
false,

View File

@@ -18,7 +18,7 @@ use std::{sync::Arc, time::Duration};
async fn can_sync() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let (mut nodes, _tasks, wallet) = setup::<EthereumNode>(
let (mut nodes, wallet) = setup::<EthereumNode>(
2,
Arc::new(
ChainSpecBuilder::default()
@@ -74,7 +74,7 @@ async fn e2e_test_send_transactions() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, _) = setup_engine::<EthereumNode>(
let (mut nodes, _) = setup_engine::<EthereumNode>(
2,
chain_spec.clone(),
false,
@@ -116,7 +116,7 @@ async fn test_long_reorg() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, _) = setup_engine::<EthereumNode>(
let (mut nodes, _) = setup_engine::<EthereumNode>(
2,
chain_spec.clone(),
false,
@@ -172,7 +172,7 @@ async fn test_reorg_through_backfill() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, _) = setup_engine::<EthereumNode>(
let (mut nodes, _) = setup_engine::<EthereumNode>(
2,
chain_spec.clone(),
false,
@@ -236,7 +236,7 @@ async fn test_tx_propagation() -> eyre::Result<()> {
};
// Setup 10 nodes
let (mut nodes, _tasks, _) = setup_engine_with_connection::<EthereumNode>(
let (mut nodes, _) = setup_engine_with_connection::<EthereumNode>(
10,
chain_spec.clone(),
false,

View File

@@ -12,7 +12,7 @@ use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
use reth_node_ethereum::EthereumNode;
use reth_primitives_traits::Recovered;
use reth_provider::CanonStateSubscriptions;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use reth_transaction_pool::{
blobstore::InMemoryBlobStore, test_utils::OkValidator, BlockInfo, CoinbaseTipOrdering,
EthPooledTransaction, Pool, PoolTransaction, TransactionOrigin, TransactionPool,
@@ -24,8 +24,7 @@ use std::{sync::Arc, time::Duration};
#[tokio::test]
async fn maintain_txpool_stale_eviction() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let executor = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let txpool = Pool::new(
OkValidator::default(),
@@ -49,7 +48,7 @@ async fn maintain_txpool_stale_eviction() -> eyre::Result<()> {
.with_unused_ports()
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node(executor.clone())
.testing_node(runtime.clone())
.node(EthereumNode::default())
.launch()
.await?;
@@ -63,13 +62,13 @@ async fn maintain_txpool_stale_eviction() -> eyre::Result<()> {
..Default::default()
};
executor.spawn_critical(
runtime.spawn_critical_task(
"txpool maintenance task",
reth_transaction_pool::maintain::maintain_transaction_pool_future(
node.inner.provider.clone(),
txpool.clone(),
node.inner.provider.clone().canonical_state_stream(),
executor.clone(),
runtime.clone(),
config,
),
);
@@ -98,8 +97,7 @@ async fn maintain_txpool_stale_eviction() -> eyre::Result<()> {
#[tokio::test]
async fn maintain_txpool_reorg() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let executor = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let txpool = Pool::new(
OkValidator::default(),
@@ -124,7 +122,7 @@ async fn maintain_txpool_reorg() -> eyre::Result<()> {
.with_unused_ports()
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node(executor.clone())
.testing_node(runtime.clone())
.node(EthereumNode::default())
.launch()
.await?;
@@ -135,13 +133,13 @@ async fn maintain_txpool_reorg() -> eyre::Result<()> {
let w1 = wallets.first().unwrap();
let w2 = wallets.last().unwrap();
executor.spawn_critical(
runtime.spawn_critical_task(
"txpool maintenance task",
reth_transaction_pool::maintain::maintain_transaction_pool_future(
node.inner.provider.clone(),
txpool.clone(),
node.inner.provider.clone().canonical_state_stream(),
executor.clone(),
runtime.clone(),
reth_transaction_pool::maintain::MaintainPoolConfig::default(),
),
);
@@ -231,8 +229,7 @@ async fn maintain_txpool_reorg() -> eyre::Result<()> {
#[tokio::test]
async fn maintain_txpool_commit() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let executor = tasks.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let txpool = Pool::new(
OkValidator::default(),
@@ -256,7 +253,7 @@ async fn maintain_txpool_commit() -> eyre::Result<()> {
.with_unused_ports()
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
.testing_node(executor.clone())
.testing_node(runtime.clone())
.node(EthereumNode::default())
.launch()
.await?;
@@ -265,13 +262,13 @@ async fn maintain_txpool_commit() -> eyre::Result<()> {
let wallet = Wallet::default();
executor.spawn_critical(
runtime.spawn_critical_task(
"txpool maintenance task",
reth_transaction_pool::maintain::maintain_transaction_pool_future(
node.inner.provider.clone(),
txpool.clone(),
node.inner.provider.clone().canonical_state_stream(),
executor.clone(),
runtime.clone(),
reth_transaction_pool::maintain::MaintainPoolConfig::default(),
),
);

View File

@@ -12,7 +12,7 @@ use reth_node_builder::{NodeBuilder, NodeHandle};
use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
use reth_node_ethereum::EthereumNode;
use reth_rpc_server_types::RpcModuleSelection;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use serde::Deserialize;
use std::sync::Arc;
@@ -29,8 +29,7 @@ async fn debug_trace_call_matches_geth_prestate_snapshot() -> Result<()> {
let mut genesis: Genesis = MAINNET.genesis().clone();
genesis.coinbase = address!("0x95222290dd7278aa3ddd389cc1e1d165cc4bafe5");
let exec = TaskManager::current();
let exec = exec.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
let expected_frame = expected_snapshot_frame()?;
let prestate_mode = match &expected_frame {
@@ -63,7 +62,7 @@ async fn debug_trace_call_matches_geth_prestate_snapshot() -> Result<()> {
);
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(exec)
.testing_node(runtime)
.node(EthereumNode::default())
.launch()
.await?;

View File

@@ -21,7 +21,7 @@ use reth_node_core::{
use reth_node_ethereum::EthereumNode;
use reth_payload_primitives::BuiltPayload;
use reth_rpc_api::servers::AdminApiServer;
use reth_tasks::TaskManager;
use reth_tasks::Runtime;
use std::{
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
@@ -57,7 +57,7 @@ async fn test_fee_history() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
1,
chain_spec.clone(),
false,
@@ -142,7 +142,7 @@ async fn test_flashbots_validate_v3() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
1,
chain_spec.clone(),
false,
@@ -224,7 +224,7 @@ async fn test_flashbots_validate_v4() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
1,
chain_spec.clone(),
false,
@@ -314,7 +314,7 @@ async fn test_eth_config() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
1,
chain_spec.clone(),
false,
@@ -344,8 +344,7 @@ async fn test_eth_config() -> eyre::Result<()> {
async fn test_admin_external_ip() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let exec = TaskManager::current();
let exec = exec.executor();
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
// Chain spec with test allocs
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
@@ -363,7 +362,7 @@ async fn test_admin_external_ip() -> eyre::Result<()> {
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(exec)
.testing_node(runtime)
.node(EthereumNode::default())
.launch()
.await?;

View File

@@ -142,7 +142,7 @@ async fn test_selfdestruct_post_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) =
let (mut nodes, wallet) =
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
.await?;
let mut node = nodes.pop().unwrap();
@@ -236,7 +236,7 @@ async fn test_selfdestruct_same_tx_post_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) =
let (mut nodes, wallet) =
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
.await?;
let mut node = nodes.pop().unwrap();
@@ -311,7 +311,7 @@ async fn test_selfdestruct_pre_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
1,
shanghai_spec(),
false,
@@ -421,7 +421,7 @@ async fn test_selfdestruct_same_tx_preexisting_account_post_dencun() -> eyre::Re
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) =
let (mut nodes, wallet) =
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
.await?;
let mut node = nodes.pop().unwrap();

View File

@@ -29,7 +29,7 @@ async fn test_simulate_v1_with_max_fee_per_blob_gas_only() -> eyre::Result<()> {
.build(),
);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
let (mut nodes, wallet) = setup_engine::<EthereumNode>(
1,
chain_spec.clone(),
false,

Some files were not shown because too many files have changed in this diff Show More