Compare commits

...

54 Commits

Author SHA1 Message Date
Brian Picciano
d6324d63e2 chore: release 1.11.3 2026-03-12 12:34:39 +01:00
Brian Picciano
5f3ade1bfe fix(trie): Reset proof v2 calculator on error (#22781)
Co-authored-by: Amp <amp@ampcode.com>
2026-03-12 10:09:18 +00:00
Derek Cofausper
b053f6fafe cherry-pick: fix don't produce both updates and removals for trie nodes (#22507)
Co-Authored-By: Arsenii Kulikov <62447812+klkvr@users.noreply.github.com>
2026-03-12 02:30:25 +00:00
Derek Cofausper
2a58e7a077 cherry-pick: install rayon panic handler (37f5b3a)
Co-Authored-By: Arsenii Kulikov <62447812+klkvr@users.noreply.github.com>
2026-03-12 02:30:17 +00:00
Emma Jamieson-Hoare
793a3d5fb3 fix missing import 2026-03-10 11:44:07 +00:00
Emma Jamieson-Hoare
89ae1af694 chore: upgrade to 1.11.2 2026-03-10 10:48:03 +00:00
Alexey Shekhirin
9c33fb5d45 fix(engine): reset execution cache hash on clear (#22895)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 10:46:09 +00:00
Alexey Shekhirin
bef3d7b4d1 fix lockfile 2026-02-23 18:36:44 +00:00
Emma Jamieson-Hoare
e918c17af9 chore: release 1.11.1
Amp-Thread-ID: https://ampcode.com/threads/T-019c8ba4-fd85-736b-9d2d-e878d350a91b
Co-authored-by: Amp <amp@ampcode.com>
2026-02-23 18:02:14 +00:00
Arsenii Kulikov
fcc170d53c fix: properly reveal trie nodes (#22415) 2026-02-23 17:58:13 +00:00
Arsenii Kulikov
c685842ba2 fix: overlay preparation on tokio (#22492) 2026-02-23 17:57:51 +00:00
Georgios Konstantopoulos
564ffa5868 fix(ci): pass docker tags as separate set entries in bake action (#22151)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 22:10:35 +00:00
Dan Cline
12891dd171 chore: allow invalid storage metadata (#22150) 2026-02-12 22:02:26 +00:00
Emma Jamieson-Hoare
c1015022f5 chore: release reth v1.11.0 (#22148) 2026-02-12 21:39:30 +00:00
Dan Cline
e3fe6326bc chore(storage): rm storage settings, use only one (#22042)
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-02-12 21:17:05 +00:00
Dan Cline
e3d520b24f feat(network): add inbound / outbound scopes for disconnect reasons (#22070) 2026-02-12 20:54:03 +00:00
Dan Cline
9f29939ea1 feat: bundle mdbx_copy as reth db copy subcommand (#22061)
Co-authored-by: Emma Jamieson-Hoare <emmajam@users.noreply.github.com>
2026-02-12 20:39:56 +00:00
Matthias Seitz
10881d1c73 chore: fix book (#22142) 2026-02-12 21:44:53 +01:00
John Letey
408593467b feat(download): optional chain-aware snapshot url (#22119) 2026-02-12 21:42:19 +01:00
Emma Jamieson-Hoare
8caf8cdf11 docs: improve reth.rs/overview page (#22131)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 20:10:34 +00:00
Georgios Konstantopoulos
1e8030ef28 fix(engine): return error on updates channel disconnect in sparse trie task (#22139)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 20:00:36 +00:00
YK
f72c503d6f feat(metrics): use 5M first gas bucket for finer-grained newPayload metrics (#22136)
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
2026-02-12 19:03:21 +00:00
Emma Jamieson-Hoare
42890e6e7f fix: improve nightly Docker build failure Slack notification (#22130)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 18:58:55 +00:00
Dan Cline
e30e441ada fix: stage drop prunes account/storage changeset static files (#22062) 2026-02-12 18:34:46 +00:00
Georgios Konstantopoulos
121160d248 refactor(db): use hashed state as canonical state representation (#21115)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-02-12 18:02:02 +00:00
Georgios Konstantopoulos
7ff78ca082 perf(engine): use transaction count threshold for prewarm skip (#22094)
Co-authored-by: yk <yongkang@tempo.xyz>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
Co-authored-by: YK <chiayongkang@hotmail.com>
2026-02-12 17:07:52 +00:00
Georgios Konstantopoulos
d7f56d509c chore: add DaniPopes as codeowner for tasks crate (#22128)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 12:08:02 -05:00
Georgios Konstantopoulos
3300e404cf feat(engine): add --engine.disable-sparse-trie-cache-pruning flag (#21967)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: mattsse <19890894+mattsse@users.noreply.github.com>
Co-authored-by: alexey <17802178+shekhirin@users.noreply.github.com>
Co-authored-by: Alexey Shekhirin <github@shekhirin.com>
2026-02-12 16:36:31 +00:00
Georgios Konstantopoulos
77cb99fc78 chore(node): update misleading consensus engine log message (#22124)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Emma Jamieson-Hoare <emmajam@users.noreply.github.com>
2026-02-12 16:14:03 +00:00
Georgios Konstantopoulos
66169c7e7c feat(reth-bench): add progress field to per-block benchmark logs (#22016)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 16:03:32 +00:00
Georgios Konstantopoulos
4f5fafc8f3 fix(net): correct EthMessageID::max for eth70 and later versions (#22076)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 15:53:11 +00:00
Georgios Konstantopoulos
0b8e6c6ed3 feat(net): enforce EIP-868 fork ID for discovered peers (#22013)
Co-authored-by: Emma <emma@tempo.xyz>
Co-authored-by: Matthias Seitz <mattsse@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Emma Jamieson-Hoare <ejamieson19@gmail.com>
Co-authored-by: Emma Jamieson-Hoare <emmajam@users.noreply.github.com>
2026-02-12 15:29:37 +00:00
Georgios Konstantopoulos
4a62d38af2 perf(engine): use sequential sig recovery for blocks with small blocks (#22077)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
Co-authored-by: YK <chiayongkang@hotmail.com>
2026-02-12 15:06:21 +00:00
Georgios Konstantopoulos
dc4f249f09 chore: zero-pad thread indices in thread names (#22113)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 12:45:49 +00:00
Brian Picciano
c915841a45 chore(stateless): Remove reth-stateless crate (#22115) 2026-02-12 11:20:49 +00:00
Georgios Konstantopoulos
217a337d8c chore(engine): remove biased select in engine service loop (#21961)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-02-12 05:45:45 +00:00
Georgios Konstantopoulos
74d57008b6 chore(engine): downgrade failed response delivery logs to warn (#22055)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 05:44:09 +00:00
Georgios Konstantopoulos
f8767bc678 fix(engine): add await_state_root span to timeout path (#22111)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 05:14:39 +00:00
Georgios Konstantopoulos
81c83bba68 refactor(engine): remove unnecessary turbofish on CachedStateProvider, add new_prewarm (#22107)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 02:48:57 +00:00
Georgios Konstantopoulos
cd8ec58703 refactor(engine): move CachedStateProvider prewarm to const generic (#22106)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 01:30:24 +00:00
DaniPopes
931b17c3fd chore: bump alloy-core deps (#22104) 2026-02-12 01:15:56 +00:00
Emma Jamieson-Hoare
807d328cf0 fix: move alloy-primitives to regular dependency in bin/reth (#22105)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 01:15:12 +00:00
Georgios Konstantopoulos
8a6bbd29fe fix(tracing): return error instead of panicking on log directory creation failure (#22100)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 00:40:39 +00:00
Georgios Konstantopoulos
8bedaaee71 feat(docker): include debug symbols in maxperf images (#22003)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-12 00:34:41 +00:00
Emma Jamieson-Hoare
09cd105671 fix(primitives): move feature-referenced deps from dev-dependencies to optional dependencies (#22103)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 23:50:56 +00:00
Georgios Konstantopoulos
a0b60b7e64 feat(evm): impl ExecutableTxTuple for Either via EitherTxIterator (#22102)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 23:48:17 +00:00
DaniPopes
90e15d096d perf: reduce tracing span noise in prewarm and proof workers (#22101) 2026-02-11 23:32:50 +00:00
Emma Jamieson-Hoare
a161ca294f feat(net): add reason label to backed_off_peers metric (#22009)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 23:00:20 +00:00
Emma Jamieson-Hoare
3a5c41e3da test: add WebSocket subscription integration tests for eth_subscribe (#22065)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 22:56:47 +00:00
Georgios Konstantopoulos
968d3c9534 revert: skip transaction prewarming for small blocks (#22059) (#22097) 2026-02-11 14:38:08 -08:00
DaniPopes
fc6666f6a7 perf: treat hashes as bytes in BranchNodeCompact (#22089) 2026-02-11 22:11:49 +00:00
DaniPopes
ff3a854326 perf: use dedicated trie rayon pool for proof workers (#22051) 2026-02-11 22:10:17 +00:00
DaniPopes
04543ed16b chore: add span and log to runtime build (#22064) 2026-02-11 22:06:14 +00:00
Emma Jamieson-Hoare
ae3f0d4d1a test: expand CLI integration tests (#22086)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-11 21:43:28 +00:00
176 changed files with 6936 additions and 4550 deletions

View File

@@ -0,0 +1,5 @@
---
reth-network: minor
---
Added reason label to backed_off_peers metric. The metric now tracks backed off peers by reason (too_many_peers, graceful_close, connection_error) to improve observability.

View File

@@ -0,0 +1,5 @@
---
ef-tests: patch
---
Removed reth-stateless crate and stateless validation from ef-tests.

View File

@@ -0,0 +1,4 @@
---
---
Added WebSocket subscription integration tests for eth_subscribe.

View File

@@ -0,0 +1,4 @@
---
---
Improved nightly Docker build failure Slack notification with more detailed formatting and context.

View File

@@ -0,0 +1,5 @@
---
reth-node-builder: patch
---
Removed biased select in engine service loop to allow fair scheduling of shutdown requests alongside event processing.

View File

@@ -0,0 +1,4 @@
---
---
Improved documentation overview page with better structure and clarity.

View File

@@ -0,0 +1,5 @@
---
reth-node-events: patch
---
Updated consensus engine log message to be more accurate about received updates.

View File

@@ -0,0 +1,9 @@
---
reth-network-api: minor
reth-network-types: minor
reth-network: minor
reth-node-core: minor
reth: minor
---
Added optional ENR fork ID enforcement to filter out peers from incompatible networks during peer discovery, controlled by the `--enforce-enr-fork-id` CLI flag.

View File

@@ -0,0 +1,5 @@
---
reth-primitives: patch
---
Moved feature-referenced dependencies from dev-dependencies to optional dependencies to ensure they are available when their corresponding features are enabled.

View File

@@ -0,0 +1,5 @@
---
reth-trie-sparse: patch
---
Fixed a bug where trie nodes could appear in both `updated_nodes` and `removed_nodes` simultaneously by removing entries from `removed_nodes` when a node is inserted as updated.

View File

@@ -0,0 +1,4 @@
---
---
Expanded CLI integration tests with subcommand help coverage, config TOML validation, genesis JSON validation, and send transaction round-trip test for dev mode.

View File

@@ -0,0 +1,5 @@
---
reth-trie: patch
---
Fixed a potential panic in `ProofCalculator` by clearing internal computation state (`branch_stack`, `child_stack`, `branch_path`, etc.) after errors, preventing stale state from causing `usize` underflow panics when the calculator is reused. Added a test verifying correct behavior after simulated mid-computation errors.

2
.github/CODEOWNERS vendored
View File

@@ -38,7 +38,7 @@ crates/storage/libmdbx-rs/ @shekhirin
crates/storage/nippy-jar/ @joshieDo @shekhirin
crates/storage/provider/ @joshieDo @shekhirin @yongkangc
crates/storage/storage-api/ @joshieDo
crates/tasks/ @mattsse
crates/tasks/ @mattsse @DaniPopes
crates/tokio-util/ @mattsse
crates/tracing/ @mattsse @shekhirin
crates/tracing-otlp/ @mattsse @Rjected

View File

@@ -27,7 +27,6 @@ crates_to_check=(
reth-ethereum-forks
reth-ethereum-primitives
reth-ethereum-consensus
reth-stateless
)
any_failed=0

View File

@@ -70,18 +70,27 @@ jobs:
# Add 'latest' tag for non-RC releases
if [[ ! "$VERSION" =~ -rc ]]; then
echo "ethereum_tags=${REGISTRY}/reth:${VERSION},${REGISTRY}/reth:latest" >> "$GITHUB_OUTPUT"
{
echo "ethereum_set<<EOF"
echo "ethereum.tags=${REGISTRY}/reth:${VERSION}"
echo "ethereum.tags=${REGISTRY}/reth:latest"
echo "EOF"
} >> "$GITHUB_OUTPUT"
else
echo "ethereum_tags=${REGISTRY}/reth:${VERSION}" >> "$GITHUB_OUTPUT"
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:${VERSION}" >> "$GITHUB_OUTPUT"
fi
elif [[ "${{ github.event_name }}" == "schedule" ]] || [[ "${{ inputs.build_type }}" == "nightly" ]]; then
echo "targets=nightly" >> "$GITHUB_OUTPUT"
echo "ethereum_tags=${REGISTRY}/reth:nightly" >> "$GITHUB_OUTPUT"
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:nightly" >> "$GITHUB_OUTPUT"
else
# git-sha build
echo "targets=ethereum" >> "$GITHUB_OUTPUT"
echo "ethereum_tags=${REGISTRY}/reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
fi
- name: Build and push images
@@ -97,7 +106,7 @@ jobs:
targets: ${{ steps.params.outputs.targets }}
push: ${{ !(github.event_name == 'workflow_dispatch' && inputs.dry_run) }}
set: |
ethereum.tags=${{ steps.params.outputs.ethereum_tags }}
${{ steps.params.outputs.ethereum_set }}
- name: Verify image architectures
env:
@@ -117,6 +126,18 @@ jobs:
- name: Slack Webhook Action
uses: rtCamp/action-slack-notify@v2
env:
SLACK_COLOR: ${{ job.status }}
SLACK_MESSAGE: "Failed run: https://github.com/paradigmxyz/reth/actions/runs/${{ github.run_id }}"
SLACK_COLOR: danger
SLACK_ICON_EMOJI: ":rotating_light:"
SLACK_USERNAME: "GitHub Actions"
SLACK_TITLE: ":rotating_light: Nightly Docker Build Failed"
SLACK_MESSAGE: |
The scheduled nightly Docker build failed.
*Commit:* `${{ github.sha }}`
*Branch:* `${{ github.ref_name }}`
*Run:* <https://github.com/paradigmxyz/reth/actions/runs/${{ github.run_id }}|View logs>
*Action required:* Re-run the workflow or investigate the build failure.
SLACK_FOOTER: "paradigmxyz/reth · docker.yml"
MSG_MINIMAL: true
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_URL }}

332
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
[workspace.package]
version = "1.10.2"
version = "1.11.3"
edition = "2024"
rust-version = "1.88"
license = "MIT OR Apache-2.0"
@@ -99,7 +99,6 @@ members = [
"crates/stages/api/",
"crates/stages/stages/",
"crates/stages/types/",
"crates/stateless",
"crates/static-file/static-file",
"crates/static-file/types/",
"crates/storage/codecs/",
@@ -307,6 +306,11 @@ inherits = "release"
lto = "fat"
codegen-units = 1
[profile.maxperf-symbols]
inherits = "maxperf"
debug = "full"
strip = "none"
[profile.reproducible]
inherits = "release"
panic = "abort"
@@ -415,7 +419,6 @@ reth-rpc-convert = { path = "crates/rpc/rpc-convert" }
reth-stages = { path = "crates/stages/stages" }
reth-stages-api = { path = "crates/stages/api" }
reth-stages-types = { path = "crates/stages/types", default-features = false }
reth-stateless = { path = "crates/stateless", default-features = false }
reth-static-file = { path = "crates/static-file/static-file" }
reth-static-file-types = { path = "crates/static-file/types", default-features = false }
reth-storage-api = { path = "crates/storage/storage-api", default-features = false }
@@ -445,15 +448,19 @@ op-revm = { version = "15.0.0", default-features = false }
revm-inspectors = "0.34.2"
# eth
alloy-dyn-abi = "1.5.4"
alloy-primitives = { version = "1.5.4", default-features = false, features = ["map-foldhash"] }
alloy-sol-types = { version = "1.5.4", default-features = false }
alloy-dyn-abi = "1.5.6"
alloy-primitives = { version = "1.5.6", default-features = false, features = [
"map-foldhash",
] }
alloy-sol-types = { version = "1.5.6", default-features = false }
alloy-chains = { version = "0.2.5", default-features = false }
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.3.0", default-features = false }
alloy-evm = { version = "0.27.2", default-features = false }
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
alloy-rlp = { version = "0.3.13", default-features = false, features = [
"core-net",
] }
alloy-trie = { version = "0.9.4", default-features = false }
alloy-hardforks = "0.4.5"
@@ -465,10 +472,15 @@ alloy-genesis = { version = "1.6.3", default-features = false }
alloy-json-rpc = { version = "1.6.3", default-features = false }
alloy-network = { version = "1.6.3", default-features = false }
alloy-network-primitives = { version = "1.6.3", default-features = false }
alloy-provider = { version = "1.6.3", features = ["reqwest", "debug-api"], default-features = false }
alloy-provider = { version = "1.6.3", features = [
"reqwest",
"debug-api",
], default-features = false }
alloy-pubsub = { version = "1.6.3", default-features = false }
alloy-rpc-client = { version = "1.6.3", default-features = false }
alloy-rpc-types = { version = "1.6.3", features = ["eth"], default-features = false }
alloy-rpc-types = { version = "1.6.3", features = [
"eth",
], default-features = false }
alloy-rpc-types-admin = { version = "1.6.3", default-features = false }
alloy-rpc-types-anvil = { version = "1.6.3", default-features = false }
alloy-rpc-types-beacon = { version = "1.6.3", default-features = false }
@@ -482,7 +494,9 @@ alloy-serde = { version = "1.6.3", default-features = false }
alloy-signer = { version = "1.6.3", default-features = false }
alloy-signer-local = { version = "1.6.3", default-features = false }
alloy-transport = { version = "1.6.3" }
alloy-transport-http = { version = "1.6.3", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-http = { version = "1.6.3", features = [
"reqwest-rustls-tls",
], default-features = false }
alloy-transport-ipc = { version = "1.6.3", default-features = false }
alloy-transport-ws = { version = "1.6.3", default-features = false }
@@ -501,7 +515,10 @@ either = { version = "1.15.0", default-features = false }
arrayvec = { version = "0.7.6", default-features = false }
aquamarine = "0.6"
auto_impl = "1"
backon = { version = "1.2", default-features = false, features = ["std-blocking-sleep", "tokio-sleep"] }
backon = { version = "1.2", default-features = false, features = [
"std-blocking-sleep",
"tokio-sleep",
] }
bincode = "1.3"
bitflags = "2.4"
boyer-moore-magiclen = "0.2.16"
@@ -523,9 +540,13 @@ itertools = { version = "0.14", default-features = false }
linked_hash_set = "0.1"
lz4 = "1.28.1"
modular-bitfield = "0.13.1"
notify = { version = "8.0.0", default-features = false, features = ["macos_fsevent"] }
notify = { version = "8.0.0", default-features = false, features = [
"macos_fsevent",
] }
nybbles = { version = "0.4.8", default-features = false }
once_cell = { version = "1.19", default-features = false, features = ["critical-section"] }
once_cell = { version = "1.19", default-features = false, features = [
"critical-section",
] }
parking_lot = "0.12"
paste = "1.0"
rand = "0.9"
@@ -544,7 +565,9 @@ strum_macros = "0.27"
syn = "2.0"
thiserror = { version = "2.0.0", default-features = false }
tar = "0.4.44"
tracing = { version = "0.1.0", default-features = false }
tracing = { version = "0.1.0", default-features = false, features = [
"attributes",
] }
tracing-appender = "0.2"
url = { version = "2.3", default-features = false }
zstd = "0.13"
@@ -582,7 +605,11 @@ futures-util = { version = "0.3", default-features = false }
hyper = "1.3"
hyper-util = "0.1.5"
pin-project = "1.0.12"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "rustls-tls-native-roots", "stream"] }
reqwest = { version = "0.12", default-features = false, features = [
"rustls-tls",
"rustls-tls-native-roots",
"stream",
] }
tracing-futures = "0.2"
tower = "0.5"
tower-http = "0.6"
@@ -607,7 +634,10 @@ proptest-arbitrary-interop = "0.1.0"
# crypto
enr = { version = "0.13", default-features = false }
k256 = { version = "0.13", default-features = false, features = ["ecdsa"] }
secp256k1 = { version = "0.30", default-features = false, features = ["global-context", "recovery"] }
secp256k1 = { version = "0.30", default-features = false, features = [
"global-context",
"recovery",
] }
# rand 8 for secp256k1
rand_08 = { package = "rand", version = "0.8" }

View File

@@ -192,6 +192,15 @@ impl Command {
parent_header = block.header;
parent_hash = block_hash;
blocks_processed += 1;
let progress = match mode {
RampMode::Blocks(total) => format!("{blocks_processed}/{total}"),
RampMode::TargetGasLimit(target) => {
let pct = (parent_header.gas_limit as f64 / target as f64 * 100.0).min(100.0);
format!("{pct:.1}%")
}
};
info!(target: "reth-bench", progress, block_number = parent_header.number, gas_limit = parent_header.gas_limit, "Block processed");
}
let final_gas_limit = parent_header.gas_limit;

View File

@@ -153,6 +153,7 @@ impl Command {
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
let buffer_size = self.rpc_block_buffer_size;
// Use a oneshot channel to propagate errors from the spawned task
@@ -206,6 +207,7 @@ impl Command {
});
let mut results = Vec::new();
let mut blocks_processed = 0u64;
let total_benchmark_duration = Instant::now();
let mut total_wait_time = Duration::ZERO;
@@ -249,8 +251,13 @@ impl Command {
// Exclude time spent waiting on the block prefetch channel from the benchmark duration.
// We want to measure engine throughput, not RPC fetch latency.
blocks_processed += 1;
let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
info!(target: "reth-bench", %combined_result);
let progress = match total_blocks {
Some(total) => format!("{blocks_processed}/{total}"),
None => format!("{blocks_processed}"),
};
info!(target: "reth-bench", progress, %combined_result);
if let Some(w) = &mut waiter {
w.on_block(block_number).await?;

View File

@@ -52,6 +52,7 @@ impl Command {
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
let buffer_size = self.rpc_block_buffer_size;
// Use a oneshot channel to propagate errors from the spawned task
@@ -82,8 +83,8 @@ impl Command {
}
});
// put results in a summary vec so they can be printed at the end
let mut results = Vec::new();
let mut blocks_processed = 0u64;
let total_benchmark_duration = Instant::now();
let mut total_wait_time = Duration::ZERO;
@@ -105,7 +106,12 @@ impl Command {
call_new_payload(&auth_provider, version, params).await?;
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
info!(target: "reth-bench", %new_payload_result);
blocks_processed += 1;
let progress = match total_blocks {
Some(total) => format!("{blocks_processed}/{total}"),
None => format!("{blocks_processed}"),
};
info!(target: "reth-bench", progress, %new_payload_result);
// current duration since the start of the benchmark minus the time
// waiting for blocks

View File

@@ -341,7 +341,8 @@ impl Command {
};
let current_duration = total_benchmark_duration.elapsed();
info!(target: "reth-bench", %combined_result);
let progress = format!("{}/{}", i + 1, payloads.len());
info!(target: "reth-bench", progress, %combined_result);
if let Some(w) = &mut waiter {
w.on_block(block_number).await?;

View File

@@ -20,6 +20,19 @@ impl BenchMode {
}
}
/// Returns the total number of blocks in the benchmark, if known.
///
/// For [`BenchMode::Range`] this is the length of the range.
/// For [`BenchMode::Continuous`] the total is unbounded, so `None` is returned.
pub const fn total_blocks(&self) -> Option<u64> {
match self {
Self::Continuous(_) => None,
Self::Range(range) => {
Some(range.end().saturating_sub(*range.start()).saturating_add(1))
}
}
}
/// Create a [`BenchMode`] from optional `from` and `to` fields.
pub fn new(from: Option<u64>, to: Option<u64>, latest_block: u64) -> Result<Self, eyre::Error> {
// If neither `--from` nor `--to` are provided, we will run the benchmark continuously,

View File

@@ -58,6 +58,7 @@ reth-node-metrics.workspace = true
reth-consensus.workspace = true
# alloy
alloy-primitives.workspace = true
alloy-rpc-types = { workspace = true, features = ["engine"] }
# tracing
@@ -70,9 +71,12 @@ clap = { workspace = true, features = ["derive", "env"] }
[dev-dependencies]
alloy-node-bindings = "1.6.3"
alloy-provider = { workspace = true, features = ["reqwest"] }
alloy-rpc-types-eth.workspace = true
backon.workspace = true
serde_json.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
toml.workspace = true
[features]
default = [
@@ -109,10 +113,12 @@ asm-keccak = [
"reth-primitives/asm-keccak",
"reth-ethereum-cli/asm-keccak",
"reth-node-ethereum/asm-keccak",
"alloy-primitives/asm-keccak",
]
keccak-cache-global = [
"reth-node-core/keccak-cache-global",
"reth-node-ethereum/keccak-cache-global",
"alloy-primitives/keccak-cache-global",
]
jemalloc = [
"reth-cli-util/jemalloc",

View File

@@ -51,6 +51,9 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]
// Used in feature flags only (`asm-keccak`, `keccak-cache-global`)
use alloy_primitives as _;
pub mod cli;
/// Re-exported utils.

View File

@@ -4,28 +4,60 @@ use std::process::Command;
const RETH: &str = env!("CARGO_BIN_EXE_reth");
// ── Helpers ──────────────────────────────────────────────────────────────────
/// Runs `reth <args>` and returns stdout, asserting exit code 0.
///
/// Tracing is suppressed via `RUST_LOG=off` so that log lines emitted during
/// binary startup don't pollute stdout-based assertions.
#[track_caller]
fn reth_ok(args: &[&str]) -> String {
let output = Command::new(RETH).env("RUST_LOG", "off").args(args).output().unwrap();
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(output.status.success(), "args {args:?} failed.\nstdout: {stdout}\nstderr: {stderr}");
stdout.into_owned()
}
/// Spawns an isolated dev-mode reth node.
///
/// Discovery is disabled and peer limits are zeroed so the node is fully
/// isolated. Each call gets a unique temporary data directory so that
/// concurrent test runs never collide on the default `reth/dev/` path.
fn spawn_dev() -> (alloy_node_bindings::RethInstance, tempfile::TempDir) {
use alloy_node_bindings::Reth;
let datadir = tempfile::tempdir().expect("failed to create temp dir");
let instance = Reth::at(RETH)
.dev()
.disable_discovery()
.data_dir(datadir.path())
.args(["--max-outbound-peers", "0", "--max-inbound-peers", "0"])
.spawn();
// Return the TempDir alongside the instance so it lives as long as the node.
(instance, datadir)
}
// ── Original tests (from PR #22069) ──────────────────────────────────────────
#[test]
fn help() {
let output = Command::new(RETH).arg("--help").output().unwrap();
let stdout = String::from_utf8_lossy(&output.stdout);
assert!(output.status.success());
let stdout = reth_ok(&["--help"]);
assert!(stdout.contains("Usage"), "stdout: {stdout}");
assert!(stdout.contains("node"), "stdout: {stdout}");
}
#[test]
fn version() {
let output = Command::new(RETH).arg("--version").output().unwrap();
let stdout = String::from_utf8_lossy(&output.stdout);
assert!(output.status.success());
let stdout = reth_ok(&["--version"]);
assert!(stdout.to_lowercase().contains("reth"), "stdout: {stdout}");
}
#[test]
fn node_help() {
let output = Command::new(RETH).args(["node", "--help"]).output().unwrap();
let stdout = String::from_utf8_lossy(&output.stdout);
assert!(output.status.success());
let stdout = reth_ok(&["node", "--help"]);
assert!(stdout.contains("--dev"), "stdout: {stdout}");
assert!(stdout.contains("--http"), "stdout: {stdout}");
}
@@ -46,22 +78,178 @@ fn unknown_flag() {
#[tokio::test]
async fn dev_node_eth_syncing() {
use alloy_node_bindings::Reth;
use alloy_provider::{Provider, ProviderBuilder};
let reth = Reth::at(RETH)
.dev()
.disable_discovery()
.args(["--max-outbound-peers", "0", "--max-inbound-peers", "0"])
.spawn();
let (reth, _datadir) = spawn_dev();
let provider = ProviderBuilder::new().connect_http(reth.endpoint().parse().unwrap());
// give the node a moment to fully initialize
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// eth_syncing should not fail on a dev node
let _syncing = provider.syncing().await.expect("eth_syncing failed");
}
// ── Subcommand --help coverage ───────────────────────────────────────────────
//
// Every registered subcommand must produce valid --help output. This catches
// clap wiring regressions (e.g. a missing field, a conflicting arg name, or a
// broken `help_message()` call) that would otherwise only surface when a user
// runs the command.
#[test]
fn init_help() {
let stdout = reth_ok(&["init", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn init_state_help() {
let stdout = reth_ok(&["init-state", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn import_help() {
let stdout = reth_ok(&["import", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn import_era_help() {
let stdout = reth_ok(&["import-era", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn export_era_help() {
let stdout = reth_ok(&["export-era", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn dump_genesis_help() {
let stdout = reth_ok(&["dump-genesis", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn db_help() {
let stdout = reth_ok(&["db", "--help"]);
assert!(stdout.contains("stats"), "stdout: {stdout}");
}
#[test]
fn stage_help() {
let stdout = reth_ok(&["stage", "--help"]);
assert!(stdout.contains("run"), "stdout: {stdout}");
}
#[test]
fn p2p_help() {
let stdout = reth_ok(&["p2p", "--help"]);
assert!(stdout.contains("header"), "stdout: {stdout}");
}
#[test]
fn config_help() {
let stdout = reth_ok(&["config", "--help"]);
assert!(stdout.contains("--default"), "stdout: {stdout}");
}
#[test]
fn prune_help() {
let stdout = reth_ok(&["prune", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn download_help() {
let stdout = reth_ok(&["download", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
#[test]
fn re_execute_help() {
let stdout = reth_ok(&["re-execute", "--help"]);
assert!(stdout.contains("--chain"), "stdout: {stdout}");
}
// ── `config --default` outputs valid TOML ────────────────────────────────────
#[test]
fn config_default_valid_toml() {
let stdout = reth_ok(&["config", "--default"]);
let parsed: toml::Value =
toml::from_str(&stdout).expect("config --default did not produce valid TOML");
// The default config must contain the [stages] table — this is the heart of
// the pipeline configuration and its absence would indicate a serialization
// regression.
assert!(parsed.get("stages").is_some(), "missing [stages] in config output");
}
// ── `dump-genesis` outputs valid JSON ────────────────────────────────────────
#[test]
fn dump_genesis_mainnet_valid_json() {
let stdout = reth_ok(&["dump-genesis"]);
let genesis: serde_json::Value =
serde_json::from_str(&stdout).expect("dump-genesis did not produce valid JSON");
assert!(genesis.get("nonce").is_some(), "missing nonce in genesis JSON");
assert!(genesis.get("alloc").is_some(), "missing alloc in genesis JSON");
}
#[test]
fn dump_genesis_sepolia_valid_json() {
let stdout = reth_ok(&["dump-genesis", "--chain", "sepolia"]);
let genesis: serde_json::Value = serde_json::from_str(&stdout)
.expect("dump-genesis --chain sepolia did not produce valid JSON");
assert!(genesis.get("alloc").is_some(), "missing alloc in sepolia genesis JSON");
}
// ── Dev node: send transaction round-trip ────────────────────────────────────
//
// Exercises the full pipeline: RPC submission → mempool → sealing → execution →
// receipt retrieval. Uses the pre-funded dev account so no genesis customization
// is required.
#[tokio::test]
async fn dev_node_send_tx_and_mine() {
use alloy_primitives::{Address, U256};
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_types_eth::TransactionRequest;
let (reth, _datadir) = spawn_dev();
let provider = ProviderBuilder::new().connect_http(reth.endpoint().parse().unwrap());
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// Dev mode pre-funds the first dev account.
let accounts = provider.get_accounts().await.expect("eth_accounts failed");
assert!(!accounts.is_empty(), "dev node should expose at least one account");
let sender = accounts[0];
let recipient = Address::with_last_byte(0x42);
let tx = TransactionRequest::default().from(sender).to(recipient).value(U256::from(1_000_000));
let tx_hash = provider.send_transaction(tx).await.expect("eth_sendTransaction failed");
// In dev/instant-mine mode the node seals a block for each transaction, so
// the receipt becomes available almost immediately.
let receipt = tx_hash.get_receipt().await.expect("failed to get receipt");
assert!(receipt.status(), "transaction should have succeeded");
assert_eq!(receipt.to, Some(recipient));
assert!(receipt.block_number.unwrap() > 0, "receipt should be in a mined block");
// Verify the transfer actually mutated state.
let balance = provider.get_balance(recipient).await.expect("eth_getBalance failed");
assert_eq!(balance, U256::from(1_000_000));
}
const fn main() {}

View File

@@ -312,6 +312,11 @@ impl DeferredTrieData {
/// Given that invariant, circular wait dependencies are impossible.
#[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
pub fn wait_cloned(&self) -> ComputedTrieData {
#[cfg(feature = "rayon")]
debug_assert!(
rayon::current_thread_index().is_none(),
"wait_cloned must not be called from a rayon worker thread"
);
let mut state = self.state.lock();
match &mut *state {
// If the deferred trie data is ready, return the cached result.

View File

@@ -1061,6 +1061,14 @@ mod tests {
) -> ProviderResult<Option<StorageValue>> {
Ok(None)
}
fn storage_by_hashed_key(
&self,
_address: Address,
_hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
Ok(None)
}
}
impl BytecodeReader for MockStateProvider {

View File

@@ -223,6 +223,26 @@ impl<N: NodePrimitives> StateProvider for MemoryOverlayStateProviderRef<'_, N> {
self.historical.storage(address, storage_key)
}
fn storage_by_hashed_key(
&self,
address: Address,
hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
let hashed_address = keccak256(address);
let state = &self.trie_input().state;
if let Some(hs) = state.storages.get(&hashed_address) {
if let Some(value) = hs.storage.get(&hashed_storage_key) {
return Ok(Some(*value));
}
if hs.wiped {
return Ok(Some(StorageValue::ZERO));
}
}
self.historical.storage_by_hashed_key(address, hashed_storage_key)
}
}
impl<N: NodePrimitives> BytecodeReader for MemoryOverlayStateProviderRef<'_, N> {

View File

@@ -19,7 +19,7 @@ use reth_node_builder::{
Node, NodeComponents, NodeComponentsBuilder, NodeTypes, NodeTypesWithDBAdapter,
};
use reth_node_core::{
args::{DatabaseArgs, DatadirArgs, RocksDbArgs, StaticFilesArgs, StorageArgs},
args::{DatabaseArgs, DatadirArgs, StaticFilesArgs, StorageArgs},
dirs::{ChainPath, DataDirPath},
};
use reth_provider::{
@@ -67,62 +67,23 @@ pub struct EnvironmentArgs<C: ChainSpecParser> {
#[command(flatten)]
pub static_files: StaticFilesArgs,
/// All `RocksDB` related arguments
#[command(flatten)]
pub rocksdb: RocksDbArgs,
/// Storage mode configuration (v2 vs v1/legacy)
#[command(flatten)]
pub storage: StorageArgs,
}
impl<C: ChainSpecParser> EnvironmentArgs<C> {
/// Returns the effective storage settings derived from `--storage.v2`, static-file, and
/// `RocksDB` CLI args.
/// Returns the effective storage settings derived from `--storage.v2`.
///
/// The base storage mode is determined by `--storage.v2`:
/// - When `--storage.v2` is set: uses [`StorageSettings::v2()`] defaults
/// - Otherwise: uses [`StorageSettings::v1()`] defaults
///
/// Individual `--static-files.*` and `--rocksdb.*` flags override the base when explicitly set.
/// - Otherwise: uses [`StorageSettings::base()`] defaults
pub fn storage_settings(&self) -> StorageSettings {
let mut s = if self.storage.v2 { StorageSettings::v2() } else { StorageSettings::base() };
// Apply static files overrides (only when explicitly set)
if let Some(v) = self.static_files.receipts {
s = s.with_receipts_in_static_files(v);
if self.storage.v2 {
StorageSettings::v2()
} else {
StorageSettings::base()
}
if let Some(v) = self.static_files.transaction_senders {
s = s.with_transaction_senders_in_static_files(v);
}
if let Some(v) = self.static_files.account_changesets {
s = s.with_account_changesets_in_static_files(v);
}
if let Some(v) = self.static_files.storage_changesets {
s = s.with_storage_changesets_in_static_files(v);
}
// Apply rocksdb overrides
// --rocksdb.all sets all rocksdb flags to true
if self.rocksdb.all {
s = s
.with_transaction_hash_numbers_in_rocksdb(true)
.with_storages_history_in_rocksdb(true)
.with_account_history_in_rocksdb(true);
}
// Individual rocksdb flags override --rocksdb.all when explicitly set
if let Some(v) = self.rocksdb.tx_hash {
s = s.with_transaction_hash_numbers_in_rocksdb(v);
}
if let Some(v) = self.rocksdb.storages_history {
s = s.with_storages_history_in_rocksdb(v);
}
if let Some(v) = self.rocksdb.account_history {
s = s.with_account_history_in_rocksdb(v);
}
s
}
/// Initializes environment according to [`AccessRights`] and returns an instance of

View File

@@ -5,6 +5,7 @@ use reth_codecs::Compact;
use reth_db_api::{cursor::DbDupCursorRO, database::Database, tables, transaction::DbTx};
use reth_db_common::DbTool;
use reth_node_builder::NodeTypesWithDB;
use reth_storage_api::StorageSettingsCache;
use std::time::{Duration, Instant};
use tracing::info;
@@ -22,52 +23,94 @@ impl Command {
/// Execute `db account-storage` command
pub fn execute<N: NodeTypesWithDB>(self, tool: &DbTool<N>) -> eyre::Result<()> {
let address = self.address;
let (slot_count, plain_size) = tool.provider_factory.db_ref().view(|tx| {
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let mut count = 0usize;
let mut total_value_bytes = 0usize;
let mut last_log = Instant::now();
let use_hashed_state = tool.provider_factory.cached_storage_settings().use_hashed_state();
// Walk all storage entries for this address
let walker = cursor.walk_dup(Some(address), None)?;
for entry in walker {
let (_, storage_entry) = entry?;
count += 1;
// StorageEntry encodes as: 32 bytes (key/subkey uncompressed) + compressed U256
let mut buf = Vec::new();
let entry_len = storage_entry.to_compact(&mut buf);
total_value_bytes += entry_len;
let (slot_count, storage_size) = if use_hashed_state {
let hashed_address = keccak256(address);
tool.provider_factory.db_ref().view(|tx| {
let mut cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
let mut count = 0usize;
let mut total_value_bytes = 0usize;
let mut last_log = Instant::now();
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots = count,
key = %storage_entry.key,
"Processing storage slots"
);
last_log = Instant::now();
let walker = cursor.walk_dup(Some(hashed_address), None)?;
for entry in walker {
let (_, storage_entry) = entry?;
count += 1;
let mut buf = Vec::new();
let entry_len = storage_entry.to_compact(&mut buf);
total_value_bytes += entry_len;
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots = count,
key = %storage_entry.key,
"Processing hashed storage slots"
);
last_log = Instant::now();
}
}
}
// Add 20 bytes for the Address key (stored once per account in dupsort)
let total_size = if count > 0 { 20 + total_value_bytes } else { 0 };
let total_size = if count > 0 { 32 + total_value_bytes } else { 0 };
Ok::<_, eyre::Report>((count, total_size))
})??;
Ok::<_, eyre::Report>((count, total_size))
})??
} else {
tool.provider_factory.db_ref().view(|tx| {
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let mut count = 0usize;
let mut total_value_bytes = 0usize;
let mut last_log = Instant::now();
// Estimate hashed storage size: 32-byte B256 key instead of 20-byte Address
let hashed_size_estimate = if slot_count > 0 { plain_size + 12 } else { 0 };
let total_estimate = plain_size + hashed_size_estimate;
// Walk all storage entries for this address
let walker = cursor.walk_dup(Some(address), None)?;
for entry in walker {
let (_, storage_entry) = entry?;
count += 1;
let mut buf = Vec::new();
// StorageEntry encodes as: 32 bytes (key/subkey uncompressed) + compressed U256
let entry_len = storage_entry.to_compact(&mut buf);
total_value_bytes += entry_len;
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots = count,
key = %storage_entry.key,
"Processing storage slots"
);
last_log = Instant::now();
}
}
// Add 20 bytes for the Address key (stored once per account in dupsort)
let total_size = if count > 0 { 20 + total_value_bytes } else { 0 };
Ok::<_, eyre::Report>((count, total_size))
})??
};
let hashed_address = keccak256(address);
println!("Account: {address}");
println!("Hashed address: {hashed_address}");
println!("Storage slots: {slot_count}");
println!("Plain storage size: {} (estimated)", human_bytes(plain_size as f64));
println!("Hashed storage size: {} (estimated)", human_bytes(hashed_size_estimate as f64));
println!("Total estimated size: {}", human_bytes(total_estimate as f64));
if use_hashed_state {
println!("Hashed storage size: {} (estimated)", human_bytes(storage_size as f64));
} else {
// Estimate hashed storage size: 32-byte B256 key instead of 20-byte Address
let hashed_size_estimate = if slot_count > 0 { storage_size + 12 } else { 0 };
let total_estimate = storage_size + hashed_size_estimate;
println!("Plain storage size: {} (estimated)", human_bytes(storage_size as f64));
println!(
"Hashed storage size: {} (estimated)",
human_bytes(hashed_size_estimate as f64)
);
println!("Total estimated size: {}", human_bytes(total_estimate as f64));
}
Ok(())
}

View File

@@ -0,0 +1,61 @@
use clap::Parser;
use reth_db::mdbx::{self, ffi};
use std::path::PathBuf;
/// Copies the MDBX database to a new location.
///
/// Equivalent to the standalone `mdbx_copy` tool but bundled into reth.
#[derive(Parser, Debug)]
pub struct Command {
/// Destination path for the database copy.
dest: PathBuf,
/// Compact the database while copying (reclaims free space).
#[arg(short, long)]
compact: bool,
/// Force dynamic size for the destination database.
#[arg(short = 'd', long)]
force_dynamic_size: bool,
/// Throttle to avoid MVCC pressure on writers.
#[arg(short = 'p', long)]
throttle_mvcc: bool,
}
impl Command {
/// Execute `db copy` command
pub fn execute(self, db: &mdbx::DatabaseEnv) -> eyre::Result<()> {
let mut flags: ffi::MDBX_copy_flags_t = ffi::MDBX_CP_DEFAULTS;
if self.compact {
flags |= ffi::MDBX_CP_COMPACT;
}
if self.force_dynamic_size {
flags |= ffi::MDBX_CP_FORCE_DYNAMIC_SIZE;
}
if self.throttle_mvcc {
flags |= ffi::MDBX_CP_THROTTLE_MVCC;
}
let dest = self
.dest
.to_str()
.ok_or_else(|| eyre::eyre!("destination path must be valid UTF-8"))?;
let dest_cstr = std::ffi::CString::new(dest)?;
println!("Copying database to {} ...", self.dest.display());
let rc = db.with_raw_env_ptr(|env_ptr| unsafe {
ffi::mdbx_env_copy(env_ptr, dest_cstr.as_ptr(), flags)
});
if rc != 0 {
eyre::bail!("mdbx_env_copy failed with error code {rc}: {}", unsafe {
std::ffi::CStr::from_ptr(ffi::mdbx_strerror(rc)).to_string_lossy()
});
}
println!("Done.");
Ok(())
}
}

View File

@@ -98,7 +98,8 @@ impl Command {
)?;
if let Some(entry) = entry {
println!("{}", serde_json::to_string_pretty(&entry)?);
let se: reth_primitives_traits::StorageEntry = entry.into();
println!("{}", serde_json::to_string_pretty(&se)?);
} else {
error!(target: "reth::cli", "No content for the given table key.");
}
@@ -106,7 +107,14 @@ impl Command {
}
let changesets = provider.storage_changeset(key.block_number())?;
println!("{}", serde_json::to_string_pretty(&changesets)?);
let serializable: Vec<_> = changesets
.into_iter()
.map(|(addr, entry)| {
let se: reth_primitives_traits::StorageEntry = entry.into();
(addr, se)
})
.collect();
println!("{}", serde_json::to_string_pretty(&serializable)?);
return Ok(());
}

View File

@@ -12,6 +12,7 @@ use std::{
mod account_storage;
mod checksum;
mod clear;
mod copy;
mod diff;
mod get;
mod list;
@@ -42,6 +43,8 @@ pub enum Subcommands {
List(list::Command),
/// Calculates the content checksum of a table or static file segment
Checksum(checksum::Command),
/// Copies the MDBX database to a new location (bundled mdbx_copy)
Copy(copy::Command),
/// Create a diff between two database tables or two entire databases.
Diff(diff::Command),
/// Gets the content of a table for the given key
@@ -124,6 +127,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
command.execute(&tool)?;
});
}
Subcommands::Copy(command) => {
db_exec!(self.env, tool, N, AccessRights::RO, {
command.execute(tool.provider_factory.db_ref())?;
});
}
Subcommands::Diff(command) => {
db_exec!(self.env, tool, N, AccessRights::RO, {
command.execute(&tool)?;

View File

@@ -39,50 +39,12 @@ enum Subcommands {
#[derive(Debug, Clone, Copy, Subcommand)]
#[clap(rename_all = "snake_case")]
pub enum SetCommand {
/// Store receipts in static files instead of the database
Receipts {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store transaction senders in static files instead of the database
TransactionSenders {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store account changesets in static files instead of the database
AccountChangesets {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store storage history in rocksdb instead of MDBX
StoragesHistory {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store transaction hash to number mapping in rocksdb instead of MDBX
TransactionHashNumbers {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store account history in rocksdb instead of MDBX
AccountHistory {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store storage changesets in static files instead of the database
StorageChangesets {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Use hashed state tables (HashedAccounts/HashedStorages) as canonical state
/// Enable or disable v2 storage layout
///
/// When enabled, execution writes directly to hashed tables, eliminating need for
/// separate hashing stages. State reads come from hashed tables.
///
/// WARNING: Changing this setting in either direction requires re-syncing the database.
/// Enabling on an existing plain-state database leaves hashed tables empty.
/// Disabling on an existing hashed-state database leaves plain tables empty.
UseHashedState {
/// When enabled, uses static files for receipts/senders/changesets and RocksDB for
/// history indices and transaction hashes. When disabled, uses v1/legacy layout (everything in
/// MDBX).
V2 {
#[clap(action(ArgAction::Set))]
value: bool,
},
@@ -125,87 +87,18 @@ impl Command {
println!("No storage settings found, creating new settings.");
}
let mut settings @ StorageSettings {
receipts_in_static_files: _,
transaction_senders_in_static_files: _,
storages_history_in_rocksdb: _,
transaction_hash_numbers_in_rocksdb: _,
account_history_in_rocksdb: _,
account_changesets_in_static_files: _,
storage_changesets_in_static_files: _,
use_hashed_state: _,
} = settings.unwrap_or_else(StorageSettings::v1);
let mut settings @ StorageSettings { storage_v2: _ } =
settings.unwrap_or_else(StorageSettings::v1);
// Update the setting based on the key
match cmd {
SetCommand::Receipts { value } => {
if settings.receipts_in_static_files == value {
println!("receipts_in_static_files is already set to {}", value);
SetCommand::V2 { value } => {
if settings.storage_v2 == value {
println!("storage_v2 is already set to {}", value);
return Ok(());
}
settings.receipts_in_static_files = value;
println!("Set receipts_in_static_files = {}", value);
}
SetCommand::TransactionSenders { value } => {
if settings.transaction_senders_in_static_files == value {
println!("transaction_senders_in_static_files is already set to {}", value);
return Ok(());
}
settings.transaction_senders_in_static_files = value;
println!("Set transaction_senders_in_static_files = {}", value);
}
SetCommand::AccountChangesets { value } => {
if settings.account_changesets_in_static_files == value {
println!("account_changesets_in_static_files is already set to {}", value);
return Ok(());
}
settings.account_changesets_in_static_files = value;
println!("Set account_changesets_in_static_files = {}", value);
}
SetCommand::StoragesHistory { value } => {
if settings.storages_history_in_rocksdb == value {
println!("storages_history_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.storages_history_in_rocksdb = value;
println!("Set storages_history_in_rocksdb = {}", value);
}
SetCommand::TransactionHashNumbers { value } => {
if settings.transaction_hash_numbers_in_rocksdb == value {
println!("transaction_hash_numbers_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.transaction_hash_numbers_in_rocksdb = value;
println!("Set transaction_hash_numbers_in_rocksdb = {}", value);
}
SetCommand::AccountHistory { value } => {
if settings.account_history_in_rocksdb == value {
println!("account_history_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.account_history_in_rocksdb = value;
println!("Set account_history_in_rocksdb = {}", value);
}
SetCommand::StorageChangesets { value } => {
if settings.storage_changesets_in_static_files == value {
println!("storage_changesets_in_static_files is already set to {}", value);
return Ok(());
}
settings.storage_changesets_in_static_files = value;
println!("Set storage_changesets_in_static_files = {}", value);
}
SetCommand::UseHashedState { value } => {
if settings.use_hashed_state == value {
println!("use_hashed_state is already set to {}", value);
return Ok(());
}
if settings.use_hashed_state && !value {
println!("WARNING: Disabling use_hashed_state on an existing hashed-state database requires a full resync.");
} else {
println!("WARNING: Enabling use_hashed_state on an existing plain-state database requires a full resync.");
}
settings.use_hashed_state = value;
println!("Set use_hashed_state = {}", value);
settings.storage_v2 = value;
println!("Set storage_v2 = {}", value);
}
}

View File

@@ -1,4 +1,4 @@
use alloy_primitives::{Address, BlockNumber, B256, U256};
use alloy_primitives::{keccak256, Address, BlockNumber, B256, U256};
use clap::Parser;
use parking_lot::Mutex;
use reth_db_api::{
@@ -63,39 +63,65 @@ impl Command {
address: Address,
limit: usize,
) -> eyre::Result<()> {
let use_hashed_state = tool.provider_factory.cached_storage_settings().use_hashed_state();
let entries = tool.provider_factory.db_ref().view(|tx| {
// Get account info
let account = tx.get::<tables::PlainAccountState>(address)?;
// Get storage entries
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let mut entries = Vec::new();
let mut last_log = Instant::now();
let walker = cursor.walk_dup(Some(address), None)?;
for (idx, entry) in walker.enumerate() {
let (_, storage_entry) = entry?;
if storage_entry.value != U256::ZERO {
entries.push((storage_entry.key, storage_entry.value));
let (account, walker_entries) = if use_hashed_state {
let hashed_address = keccak256(address);
let account = tx.get::<tables::HashedAccounts>(hashed_address)?;
let mut cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
let walker = cursor.walk_dup(Some(hashed_address), None)?;
let mut entries = Vec::new();
let mut last_log = Instant::now();
for (idx, entry) in walker.enumerate() {
let (_, storage_entry) = entry?;
if storage_entry.value != U256::ZERO {
entries.push((storage_entry.key, storage_entry.value));
}
if entries.len() >= limit {
break;
}
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots_scanned = idx,
"Scanning storage slots"
);
last_log = Instant::now();
}
}
if entries.len() >= limit {
break;
(account, entries)
} else {
// Get account info
let account = tx.get::<tables::PlainAccountState>(address)?;
// Get storage entries
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let walker = cursor.walk_dup(Some(address), None)?;
let mut entries = Vec::new();
let mut last_log = Instant::now();
for (idx, entry) in walker.enumerate() {
let (_, storage_entry) = entry?;
if storage_entry.value != U256::ZERO {
entries.push((storage_entry.key, storage_entry.value));
}
if entries.len() >= limit {
break;
}
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots_scanned = idx,
"Scanning storage slots"
);
last_log = Instant::now();
}
}
(account, entries)
};
if last_log.elapsed() >= LOG_INTERVAL {
info!(
target: "reth::cli",
address = %address,
slots_scanned = idx,
"Scanning storage slots"
);
last_log = Instant::now();
}
}
Ok::<_, eyre::Report>((account, entries))
Ok::<_, eyre::Report>((account, walker_entries))
})??;
let (account, storage_entries) = entries;
@@ -119,7 +145,7 @@ impl Command {
// Check storage settings to determine where history is stored
let storage_settings = tool.provider_factory.cached_storage_settings();
let history_in_rocksdb = storage_settings.storages_history_in_rocksdb;
let history_in_rocksdb = storage_settings.storage_v2;
// For historical queries, enumerate keys from history indices only
// (not PlainStorageState, which reflects current state)

View File

@@ -37,6 +37,14 @@ pub struct DownloadDefaults {
pub available_snapshots: Vec<Cow<'static, str>>,
/// Default base URL for snapshots
pub default_base_url: Cow<'static, str>,
/// Default base URL for chain-aware snapshots.
///
/// When set, the chain ID is appended to form the full URL: `{base_url}/{chain_id}`.
/// For example, given a base URL of `https://snapshots.example.com` and chain ID `1`,
/// the resulting URL would be `https://snapshots.example.com/1`.
///
/// Falls back to [`default_base_url`](Self::default_base_url) when `None`.
pub default_chain_aware_base_url: Option<Cow<'static, str>>,
/// Optional custom long help text that overrides the generated help
pub long_help: Option<String>,
}
@@ -60,6 +68,7 @@ impl DownloadDefaults {
Cow::Borrowed("https://publicnode.com/snapshots (full nodes & testnets)"),
],
default_base_url: Cow::Borrowed(MERKLE_BASE_URL),
default_chain_aware_base_url: None,
long_help: None,
}
}
@@ -84,9 +93,11 @@ impl DownloadDefaults {
}
help.push_str(
"\nIf no URL is provided, the latest mainnet archive snapshot\nwill be proposed for download from ",
"\nIf no URL is provided, the latest archive snapshot for the selected chain\nwill be proposed for download from ",
);
help.push_str(
self.default_chain_aware_base_url.as_deref().unwrap_or(&self.default_base_url),
);
help.push_str(self.default_base_url.as_ref());
help.push_str(
".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
);
@@ -111,6 +122,12 @@ impl DownloadDefaults {
self
}
/// Set the default chain-aware base URL.
pub fn with_chain_aware_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
self.default_chain_aware_base_url = Some(url.into());
self
}
/// Builder: Set custom long help text, overriding the generated help
pub fn with_long_help(mut self, help: impl Into<String>) -> Self {
self.long_help = Some(help.into());
@@ -142,7 +159,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
let url = match self.url {
Some(url) => url,
None => {
let url = get_latest_snapshot_url().await?;
let url = get_latest_snapshot_url(self.env.chain.chain().id()).await?;
info!(target: "reth::cli", "Using default snapshot URL: {}", url);
url
}
@@ -509,8 +526,12 @@ async fn stream_and_extract(url: &str, target_dir: &Path) -> Result<()> {
}
// Builds default URL for latest mainnet archive snapshot using configured defaults
async fn get_latest_snapshot_url() -> Result<String> {
let base_url = &DownloadDefaults::get_global().default_base_url;
async fn get_latest_snapshot_url(chain_id: u64) -> Result<String> {
let defaults = DownloadDefaults::get_global();
let base_url = match &defaults.default_chain_aware_base_url {
Some(url) => format!("{url}/{chain_id}"),
None => defaults.default_base_url.to_string(),
};
let latest_url = format!("{base_url}/latest.txt");
let filename = Client::new()
.get(latest_url)

View File

@@ -10,8 +10,8 @@ use reth_node_builder::NodeBuilder;
use reth_node_core::{
args::{
DatabaseArgs, DatadirArgs, DebugArgs, DevArgs, EngineArgs, EraArgs, MetricArgs,
NetworkArgs, PayloadBuilderArgs, PruningArgs, RocksDbArgs, RpcServerArgs, StaticFilesArgs,
StorageArgs, TxPoolArgs,
NetworkArgs, PayloadBuilderArgs, PruningArgs, RpcServerArgs, StaticFilesArgs, StorageArgs,
TxPoolArgs,
},
node_config::NodeConfig,
version,
@@ -103,10 +103,6 @@ pub struct NodeCommand<C: ChainSpecParser, Ext: clap::Args + fmt::Debug = NoArgs
#[command(flatten)]
pub pruning: PruningArgs,
/// All `RocksDB` table routing arguments
#[command(flatten)]
pub rocksdb: RocksDbArgs,
/// Engine cli arguments
#[command(flatten, next_help_heading = "Engine")]
pub engine: EngineArgs,
@@ -119,8 +115,8 @@ pub struct NodeCommand<C: ChainSpecParser, Ext: clap::Args + fmt::Debug = NoArgs
#[command(flatten, next_help_heading = "Static Files")]
pub static_files: StaticFilesArgs,
/// Storage mode configuration (v2 vs v1/legacy)
#[command(flatten)]
/// All storage related arguments with --storage prefix
#[command(flatten, next_help_heading = "Storage")]
pub storage: StorageArgs,
/// Additional cli arguments
@@ -175,7 +171,6 @@ where
db,
dev,
pruning,
rocksdb,
engine,
era,
static_files,
@@ -183,9 +178,6 @@ where
ext,
} = self;
// Validate RocksDB arguments
rocksdb.validate()?;
// set up node config
let mut node_config = NodeConfig {
datadir,
@@ -201,7 +193,6 @@ where
db,
dev,
pruning,
rocksdb,
engine,
era,
static_files,

View File

@@ -45,12 +45,16 @@ impl<C: ChainSpecParser> Command<C> {
let tool = DbTool::new(provider_factory)?;
let static_file_segment = match self.stage {
StageEnum::Headers => Some(StaticFileSegment::Headers),
StageEnum::Bodies => Some(StaticFileSegment::Transactions),
StageEnum::Execution => Some(StaticFileSegment::Receipts),
StageEnum::Senders => Some(StaticFileSegment::TransactionSenders),
_ => None,
let static_file_segments = match self.stage {
StageEnum::Headers => vec![StaticFileSegment::Headers],
StageEnum::Bodies => vec![StaticFileSegment::Transactions],
StageEnum::Execution => vec![
StaticFileSegment::Receipts,
StaticFileSegment::AccountChangeSets,
StaticFileSegment::StorageChangeSets,
],
StageEnum::Senders => vec![StaticFileSegment::TransactionSenders],
_ => vec![],
};
// Calling `StaticFileProviderRW::prune_*` will instruct the writer to prune rows only
@@ -58,35 +62,33 @@ impl<C: ChainSpecParser> Command<C> {
// deleting the jar files, otherwise if the task were to be interrupted after we
// have deleted them, BUT before we have committed the checkpoints to the database, we'd
// lose essential data.
if let Some(static_file_segment) = static_file_segment {
let static_file_provider = tool.provider_factory.static_file_provider();
if let Some(highest_block) =
static_file_provider.get_highest_static_file_block(static_file_segment)
let static_file_provider = tool.provider_factory.static_file_provider();
for segment in static_file_segments {
if let Some(highest_block) = static_file_provider.get_highest_static_file_block(segment)
{
let mut writer = static_file_provider.latest_writer(static_file_segment)?;
let mut writer = static_file_provider.latest_writer(segment)?;
match static_file_segment {
match segment {
StaticFileSegment::Headers => {
// Prune all headers leaving genesis intact.
writer.prune_headers(highest_block)?;
}
StaticFileSegment::Transactions => {
let to_delete = static_file_provider
.get_highest_static_file_tx(static_file_segment)
.get_highest_static_file_tx(segment)
.map(|tx_num| tx_num + 1)
.unwrap_or_default();
writer.prune_transactions(to_delete, 0)?;
}
StaticFileSegment::Receipts => {
let to_delete = static_file_provider
.get_highest_static_file_tx(static_file_segment)
.get_highest_static_file_tx(segment)
.map(|tx_num| tx_num + 1)
.unwrap_or_default();
writer.prune_receipts(to_delete, 0)?;
}
StaticFileSegment::TransactionSenders => {
let to_delete = static_file_provider
.get_highest_static_file_tx(static_file_segment)
.get_highest_static_file_tx(segment)
.map(|tx_num| tx_num + 1)
.unwrap_or_default();
writer.prune_transaction_senders(to_delete, 0)?;
@@ -131,8 +133,15 @@ impl<C: ChainSpecParser> Command<C> {
reset_stage_checkpoint(tx, StageId::SenderRecovery)?;
}
StageEnum::Execution => {
tx.clear::<tables::PlainAccountState>()?;
tx.clear::<tables::PlainStorageState>()?;
if provider_rw.cached_storage_settings().use_hashed_state() {
tx.clear::<tables::HashedAccounts>()?;
tx.clear::<tables::HashedStorages>()?;
reset_stage_checkpoint(tx, StageId::AccountHashing)?;
reset_stage_checkpoint(tx, StageId::StorageHashing)?;
} else {
tx.clear::<tables::PlainAccountState>()?;
tx.clear::<tables::PlainStorageState>()?;
}
tx.clear::<tables::AccountChangeSets>()?;
tx.clear::<tables::StorageChangeSets>()?;
tx.clear::<tables::Bytecodes>()?;
@@ -178,7 +187,7 @@ impl<C: ChainSpecParser> Command<C> {
let settings = provider_rw.cached_storage_settings();
let rocksdb = tool.provider_factory.rocksdb_provider();
if settings.account_history_in_rocksdb {
if settings.storage_v2 {
rocksdb.clear::<tables::AccountsHistory>()?;
} else {
tx.clear::<tables::AccountsHistory>()?;
@@ -195,7 +204,7 @@ impl<C: ChainSpecParser> Command<C> {
let settings = provider_rw.cached_storage_settings();
let rocksdb = tool.provider_factory.rocksdb_provider();
if settings.storages_history_in_rocksdb {
if settings.storage_v2 {
rocksdb.clear::<tables::StoragesHistory>()?;
} else {
tx.clear::<tables::StoragesHistory>()?;
@@ -209,7 +218,7 @@ impl<C: ChainSpecParser> Command<C> {
)?;
}
StageEnum::TxLookup => {
if provider_rw.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
if provider_rw.cached_storage_settings().storage_v2 {
tool.provider_factory
.rocksdb_provider()
.clear::<tables::TransactionHashNumbers>()?;

View File

@@ -30,8 +30,14 @@ impl CliRunner {
///
/// The default runtime is multi-threaded, with both I/O and time drivers enabled.
pub fn try_default_runtime() -> Result<Self, reth_tasks::RuntimeBuildError> {
let runtime =
reth_tasks::RuntimeBuilder::new(reth_tasks::RuntimeConfig::default()).build()?;
Self::try_with_runtime_config(reth_tasks::RuntimeConfig::default())
}
/// Creates a new [`CliRunner`] with the given [`RuntimeConfig`](reth_tasks::RuntimeConfig).
pub fn try_with_runtime_config(
config: reth_tasks::RuntimeConfig,
) -> Result<Self, reth_tasks::RuntimeBuildError> {
let runtime = reth_tasks::RuntimeBuilder::new(config).build()?;
Ok(Self { config: CliRunnerConfig::default(), runtime })
}

View File

@@ -1,6 +1,6 @@
//! Test setup utilities for configuring the initial state.
use crate::{setup_engine_with_connection, testsuite::Environment, NodeBuilderHelper};
use crate::{testsuite::Environment, E2ETestSetupBuilder, NodeBuilderHelper};
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::B256;
use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
@@ -38,6 +38,8 @@ pub struct Setup<I> {
shutdown_tx: Option<mpsc::Sender<()>>,
/// Is this setup in dev mode
pub is_dev: bool,
/// Whether to use v2 storage mode (hashed keys, static file changesets, rocksdb history)
pub storage_v2: bool,
/// Tracks instance generic.
_phantom: PhantomData<I>,
/// Holds the import result to keep nodes alive when using imported chain
@@ -58,6 +60,7 @@ impl<I> Default for Setup<I> {
tree_config: TreeConfig::default(),
shutdown_tx: None,
is_dev: true,
storage_v2: false,
_phantom: Default::default(),
import_result_holder: None,
import_rlp_path: None,
@@ -126,6 +129,12 @@ where
self
}
/// Enable v2 storage mode (hashed keys, static file changesets, rocksdb history)
pub const fn with_storage_v2(mut self) -> Self {
self.storage_v2 = true;
self
}
/// Apply setup using pre-imported chain data from RLP file
pub async fn apply_with_import<N>(
&mut self,
@@ -194,19 +203,28 @@ where
self.shutdown_tx = Some(shutdown_tx);
let is_dev = self.is_dev;
let storage_v2 = self.storage_v2;
let node_count = self.network.node_count;
let tree_config = self.tree_config.clone();
let attributes_generator = Self::create_static_attributes_generator::<N>();
let result = setup_engine_with_connection::<N>(
let mut builder = E2ETestSetupBuilder::<N, _>::new(
node_count,
Arc::<N::ChainSpec>::new((*chain_spec).clone().into()),
is_dev,
self.tree_config.clone(),
attributes_generator,
self.network.connect_nodes,
)
.await;
.with_tree_config_modifier(move |base| {
tree_config.clone().with_cross_block_cache_size(base.cross_block_cache_size())
})
.with_node_config_modifier(move |config| config.set_dev(is_dev))
.with_connect_nodes(self.network.connect_nodes);
if storage_v2 {
builder = builder.with_storage_v2();
}
let result = builder.build().await;
let mut node_clients = Vec::new();
match result {

View File

@@ -10,7 +10,6 @@ use jsonrpsee::core::client::ClientT;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_db::tables;
use reth_e2e_test_utils::{transaction::TransactionTestContext, wallet, E2ETestSetupBuilder};
use reth_node_core::args::RocksDbArgs;
use reth_node_ethereum::EthereumNode;
use reth_payload_builder::EthPayloadBuilderAttributes;
use reth_provider::RocksDBProviderFactory;
@@ -96,22 +95,6 @@ fn test_attributes_generator(timestamp: u64) -> EthPayloadBuilderAttributes {
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
}
/// Verifies that `RocksDB` CLI defaults are `None` (deferred to storage mode).
#[test]
fn test_rocksdb_defaults_are_none() {
let args = RocksDbArgs::default();
assert!(args.tx_hash.is_none(), "tx_hash default should be None (deferred to --storage.v2)");
assert!(
args.storages_history.is_none(),
"storages_history default should be None (deferred to --storage.v2)"
);
assert!(
args.account_history.is_none(),
"account_history default should be None (deferred to --storage.v2)"
);
}
/// Smoke test: node boots with `RocksDB` routing enabled.
#[tokio::test]
async fn test_rocksdb_node_startup() -> Result<()> {
@@ -477,7 +460,7 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
///
/// This test exercises `unwind_trie_state_from` which previously failed with
/// `UnsortedInput` errors because it read changesets directly from MDBX tables
/// instead of using storage-aware methods that check `storage_changesets_in_static_files`.
/// instead of using storage-aware methods that check `is_v2()`.
#[tokio::test]
async fn test_rocksdb_reorg_unwind() -> Result<()> {
reth_tracing::init_test_tracing();

View File

@@ -179,6 +179,8 @@ pub struct TreeConfig {
sparse_trie_prune_depth: usize,
/// Maximum number of storage tries to retain after pruning.
sparse_trie_max_storage_tries: usize,
/// Whether to fully disable sparse trie cache pruning between blocks.
disable_sparse_trie_cache_pruning: bool,
/// Timeout for the state root task before spawning a sequential fallback computation.
/// If `Some`, after waiting this duration for the state root task, a sequential state root
/// computation is spawned in parallel and whichever finishes first is used.
@@ -216,6 +218,7 @@ impl Default for TreeConfig {
disable_trie_cache: false,
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
disable_sparse_trie_cache_pruning: false,
state_root_task_timeout: Some(DEFAULT_STATE_ROOT_TASK_TIMEOUT),
}
}
@@ -281,6 +284,7 @@ impl TreeConfig {
disable_trie_cache: false,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
disable_sparse_trie_cache_pruning: false,
state_root_task_timeout,
}
}
@@ -631,6 +635,17 @@ impl TreeConfig {
self
}
/// Returns whether sparse trie cache pruning is disabled.
pub const fn disable_sparse_trie_cache_pruning(&self) -> bool {
self.disable_sparse_trie_cache_pruning
}
/// Setter for whether to disable sparse trie cache pruning.
pub const fn with_disable_sparse_trie_cache_pruning(mut self, value: bool) -> Self {
self.disable_sparse_trie_cache_pruning = value;
self
}
/// Returns the state root task timeout.
pub const fn state_root_task_timeout(&self) -> Option<Duration> {
self.state_root_task_timeout

View File

@@ -23,7 +23,7 @@ use serde::{de::DeserializeOwned, Serialize};
// Re-export [`ExecutionPayload`] moved to `reth_payload_primitives`
#[cfg(feature = "std")]
pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
pub use reth_evm::{ConfigureEngineEvm, ConvertTx, ExecutableTxIterator, ExecutableTxTuple};
pub use reth_payload_primitives::ExecutionPayload;
mod error;

View File

@@ -20,7 +20,7 @@ use reth_node_types::{BlockTy, NodeTypes};
use reth_payload_builder::PayloadBuilderHandle;
use reth_provider::{
providers::{BlockchainProvider, ProviderNodeTypes},
ProviderFactory,
ProviderFactory, StorageSettingsCache,
};
use reth_prune::PrunerWithFactory;
use reth_stages_api::{MetricEventsSender, Pipeline};
@@ -94,6 +94,7 @@ where
if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
let downloader = BasicBlockDownloader::new(client, consensus.clone());
let use_hashed_state = provider.cached_storage_settings().use_hashed_state();
let persistence_handle =
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
@@ -111,6 +112,7 @@ where
engine_kind,
evm_config,
changeset_cache,
use_hashed_state,
);
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);

View File

@@ -143,6 +143,13 @@ test-utils = [
"reth-evm-ethereum/test-utils",
"reth-tasks/test-utils",
]
rocksdb = [
"reth-provider/rocksdb",
"reth-prune/rocksdb",
"reth-stages?/rocksdb",
"reth-e2e-test-utils/rocksdb",
]
edge = ["rocksdb"]
[[test]]
name = "e2e_testsuite"

View File

@@ -76,8 +76,16 @@ impl CacheConfig for EpochCacheConfig {
type FixedCache<K, V, H = DefaultHashBuilder> = fixed_cache::Cache<K, V, H, EpochCacheConfig>;
/// A wrapper of a state provider and a shared cache.
///
/// The const generic `PREWARM` controls whether every cache miss is populated. This is only
/// relevant for pre-warm transaction execution with the intention to pre-populate the cache with
/// data for regular block execution. During regular block execution the cache doesn't need to be
/// populated because the actual EVM database [`State`](revm::database::State) also caches
/// internally during block execution and the cache is then updated after the block with the entire
/// [`BundleState`] output of that block which contains all accessed accounts, code, storage. See
/// also [`ExecutionCache::insert_state`].
#[derive(Debug)]
pub struct CachedStateProvider<S> {
pub struct CachedStateProvider<S, const PREWARM: bool = false> {
/// The state provider
state_provider: S,
@@ -86,15 +94,9 @@ pub struct CachedStateProvider<S> {
/// Metrics for the cached state provider
metrics: CachedStateMetrics,
/// If prewarm enabled we populate every cache miss
prewarm: bool,
}
impl<S> CachedStateProvider<S>
where
S: StateProvider,
{
impl<S> CachedStateProvider<S> {
/// Creates a new [`CachedStateProvider`] from an [`ExecutionCache`], state provider, and
/// [`CachedStateMetrics`].
pub const fn new(
@@ -102,27 +104,18 @@ where
caches: ExecutionCache,
metrics: CachedStateMetrics,
) -> Self {
Self { state_provider, caches, metrics, prewarm: false }
Self { state_provider, caches, metrics }
}
}
impl<S> CachedStateProvider<S> {
/// Enables pre-warm mode so that every cache miss is populated.
///
/// This is only relevant for pre-warm transaction execution with the intention to pre-populate
/// the cache with data for regular block execution. During regular block execution the
/// cache doesn't need to be populated because the actual EVM database
/// [`State`](revm::database::State) also caches internally during block execution and the cache
/// is then updated after the block with the entire [`BundleState`] output of that block which
/// contains all accessed accounts,code,storage. See also [`ExecutionCache::insert_state`].
pub const fn prewarm(mut self) -> Self {
self.prewarm = true;
self
}
/// Returns whether this provider should pre-warm cache misses.
const fn is_prewarm(&self) -> bool {
self.prewarm
impl<S> CachedStateProvider<S, true> {
/// Creates a new [`CachedStateProvider`] with prewarming enabled.
pub const fn new_prewarm(
state_provider: S,
caches: ExecutionCache,
metrics: CachedStateMetrics,
) -> Self {
Self { state_provider, caches, metrics }
}
}
@@ -307,9 +300,9 @@ impl<K: PartialEq, V> StatsHandler<K, V> for CacheStatsHandler {
}
}
impl<S: AccountReader> AccountReader for CachedStateProvider<S> {
impl<S: AccountReader, const PREWARM: bool> AccountReader for CachedStateProvider<S, PREWARM> {
fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
if self.is_prewarm() {
if PREWARM {
match self.caches.get_or_try_insert_account_with(*address, || {
self.state_provider.basic_account(address)
})? {
@@ -334,13 +327,13 @@ pub enum CachedStatus<T> {
Cached(T),
}
impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
impl<S: StateProvider, const PREWARM: bool> StateProvider for CachedStateProvider<S, PREWARM> {
fn storage(
&self,
account: Address,
storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
if self.is_prewarm() {
if PREWARM {
match self.caches.get_or_try_insert_storage_with(account, storage_key, || {
self.state_provider.storage(account, storage_key).map(Option::unwrap_or_default)
})? {
@@ -358,11 +351,19 @@ impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
self.state_provider.storage(account, storage_key)
}
}
fn storage_by_hashed_key(
&self,
address: Address,
hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
self.state_provider.storage_by_hashed_key(address, hashed_storage_key)
}
}
impl<S: BytecodeReader> BytecodeReader for CachedStateProvider<S> {
impl<S: BytecodeReader, const PREWARM: bool> BytecodeReader for CachedStateProvider<S, PREWARM> {
fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
if self.is_prewarm() {
if PREWARM {
match self.caches.get_or_try_insert_code_with(*code_hash, || {
self.state_provider.bytecode_by_hash(code_hash)
})? {
@@ -378,7 +379,9 @@ impl<S: BytecodeReader> BytecodeReader for CachedStateProvider<S> {
}
}
impl<S: StateRootProvider> StateRootProvider for CachedStateProvider<S> {
impl<S: StateRootProvider, const PREWARM: bool> StateRootProvider
for CachedStateProvider<S, PREWARM>
{
fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
self.state_provider.state_root(hashed_state)
}
@@ -402,7 +405,9 @@ impl<S: StateRootProvider> StateRootProvider for CachedStateProvider<S> {
}
}
impl<S: StateProofProvider> StateProofProvider for CachedStateProvider<S> {
impl<S: StateProofProvider, const PREWARM: bool> StateProofProvider
for CachedStateProvider<S, PREWARM>
{
fn proof(
&self,
input: TrieInput,
@@ -429,7 +434,9 @@ impl<S: StateProofProvider> StateProofProvider for CachedStateProvider<S> {
}
}
impl<S: StorageRootProvider> StorageRootProvider for CachedStateProvider<S> {
impl<S: StorageRootProvider, const PREWARM: bool> StorageRootProvider
for CachedStateProvider<S, PREWARM>
{
fn storage_root(
&self,
address: Address,
@@ -457,7 +464,7 @@ impl<S: StorageRootProvider> StorageRootProvider for CachedStateProvider<S> {
}
}
impl<S: BlockHashReader> BlockHashReader for CachedStateProvider<S> {
impl<S: BlockHashReader, const PREWARM: bool> BlockHashReader for CachedStateProvider<S, PREWARM> {
fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
self.state_provider.block_hash(number)
}
@@ -471,7 +478,9 @@ impl<S: BlockHashReader> BlockHashReader for CachedStateProvider<S> {
}
}
impl<S: HashedPostStateProvider> HashedPostStateProvider for CachedStateProvider<S> {
impl<S: HashedPostStateProvider, const PREWARM: bool> HashedPostStateProvider
for CachedStateProvider<S, PREWARM>
{
fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
self.state_provider.hashed_post_state(bundle_state)
}
@@ -836,8 +845,10 @@ impl SavedCache {
self.caches.update_metrics(&self.metrics);
}
/// Clears all caches, resetting them to empty state.
pub(crate) fn clear(&self) {
/// Clears all caches, resetting them to empty state,
/// and updates the hash of the block this cache belongs to.
pub(crate) fn clear_with_hash(&mut self, hash: B256) {
self.hash = hash;
self.caches.clear();
}
}

View File

@@ -199,6 +199,17 @@ impl<S: StateProvider> StateProvider for InstrumentedStateProvider<S> {
self.record_storage_fetch(start.elapsed());
res
}
fn storage_by_hashed_key(
&self,
address: Address,
hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
let start = Instant::now();
let res = self.state_provider.storage_by_hashed_key(address, hashed_storage_key);
self.record_storage_fetch(start.elapsed());
res
}
}
impl<S: BytecodeReader> BytecodeReader for InstrumentedStateProvider<S> {

View File

@@ -12,12 +12,13 @@ use reth_primitives_traits::constants::gas_units::MEGAGAS;
use reth_trie::updates::TrieUpdates;
use std::time::{Duration, Instant};
/// Width of each gas bucket in gas units (10 Mgas).
const GAS_BUCKET_SIZE: u64 = 10 * MEGAGAS;
/// Upper bounds for each gas bucket. The last bucket is a catch-all for
/// everything above the final threshold: <5M, 5-10M, 10-20M, 20-30M, 30-40M, >40M.
const GAS_BUCKET_THRESHOLDS: [u64; 5] =
[5 * MEGAGAS, 10 * MEGAGAS, 20 * MEGAGAS, 30 * MEGAGAS, 40 * MEGAGAS];
/// Number of gas buckets. The last bucket is a catch-all for everything above
/// `(NUM_GAS_BUCKETS - 1) * GAS_BUCKET_SIZE`.
const NUM_GAS_BUCKETS: usize = 5;
/// Total number of gas buckets (thresholds + 1 catch-all).
const NUM_GAS_BUCKETS: usize = GAS_BUCKET_THRESHOLDS.len() + 1;
/// Metrics for the `EngineApi`.
#[derive(Debug, Default)]
@@ -280,21 +281,23 @@ impl GasBucketMetrics {
}
fn bucket_index(gas_used: u64) -> usize {
let idx = gas_used / GAS_BUCKET_SIZE;
(idx as usize).min(NUM_GAS_BUCKETS - 1)
GAS_BUCKET_THRESHOLDS
.iter()
.position(|&threshold| gas_used < threshold)
.unwrap_or(GAS_BUCKET_THRESHOLDS.len())
}
/// Returns a human-readable label like `<10M`, `10-20M`, … `>40M`.
/// Returns a human-readable label like `<5M`, `5-10M`, … `>40M`.
fn bucket_label(index: usize) -> String {
let m = GAS_BUCKET_SIZE / 1_000_000;
if index == 0 {
format!("<{m}M")
} else if index < NUM_GAS_BUCKETS - 1 {
let lo = m * index as u64;
let hi = lo + m;
let hi = GAS_BUCKET_THRESHOLDS[0] / MEGAGAS;
format!("<{hi}M")
} else if index < GAS_BUCKET_THRESHOLDS.len() {
let lo = GAS_BUCKET_THRESHOLDS[index - 1] / MEGAGAS;
let hi = GAS_BUCKET_THRESHOLDS[index] / MEGAGAS;
format!("{lo}-{hi}M")
} else {
let lo = m * index as u64;
let lo = GAS_BUCKET_THRESHOLDS[GAS_BUCKET_THRESHOLDS.len() - 1] / MEGAGAS;
format!(">{lo}M")
}
}

View File

@@ -32,7 +32,7 @@ use reth_provider::{
BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
TransactionVariant,
StorageSettingsCache, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
@@ -271,6 +271,9 @@ where
evm_config: C,
/// Changeset cache for in-memory trie changesets
changeset_cache: ChangesetCache,
/// Whether the node uses hashed state as canonical storage (v2 mode).
/// Cached at construction to avoid threading `StorageSettingsCache` bounds everywhere.
use_hashed_state: bool,
}
impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
@@ -296,6 +299,7 @@ where
.field("engine_kind", &self.engine_kind)
.field("evm_config", &self.evm_config)
.field("changeset_cache", &self.changeset_cache)
.field("use_hashed_state", &self.use_hashed_state)
.finish()
}
}
@@ -313,7 +317,8 @@ where
P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
+ StageCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader,
+ StorageChangeSetReader
+ StorageSettingsCache,
C: ConfigureEvm<Primitives = N> + 'static,
T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
V: EngineValidator<T>,
@@ -334,6 +339,7 @@ where
engine_kind: EngineApiKind,
evm_config: C,
changeset_cache: ChangesetCache,
use_hashed_state: bool,
) -> Self {
let (incoming_tx, incoming) = crossbeam_channel::unbounded();
@@ -355,6 +361,7 @@ where
engine_kind,
evm_config,
changeset_cache,
use_hashed_state,
}
}
@@ -375,6 +382,7 @@ where
kind: EngineApiKind,
evm_config: C,
changeset_cache: ChangesetCache,
use_hashed_state: bool,
) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
{
let best_block_number = provider.best_block_number().unwrap_or(0);
@@ -407,6 +415,7 @@ where
kind,
evm_config,
changeset_cache,
use_hashed_state,
);
let incoming = task.incoming_tx.clone();
spawn_os_thread("engine", || task.run());
@@ -1402,7 +1411,7 @@ where
// Spawn a background task to trigger computation so it's ready when the next payload
// arrives.
if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
rayon::spawn(move || {
tokio::task::spawn_blocking(move || {
let _ = overlay.get();
});
}
@@ -1510,7 +1519,7 @@ where
.engine
.failed_forkchoice_updated_response_deliveries
.increment(1);
error!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
}
}
BeaconEngineMessage::NewPayload { payload, tx } => {
@@ -1534,7 +1543,7 @@ where
BeaconOnNewPayloadError::Internal(Box::new(e))
}))
{
error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
self.metrics
.engine
.failed_new_payload_response_deliveries
@@ -2379,7 +2388,12 @@ where
self.update_reorg_metrics(old.len(), old_first);
self.reinsert_reorged_blocks(new.clone());
self.reinsert_reorged_blocks(old.clone());
// When use_hashed_state is enabled, skip reinserting the old chain — the
// bundle state references plain state reverts which don't exist.
if !self.use_hashed_state {
self.reinsert_reorged_blocks(old.clone());
}
}
// update the tracked in-memory state with the new chain

View File

@@ -23,8 +23,8 @@ use rayon::prelude::*;
use reth_evm::{
block::ExecutableTxParts,
execute::{ExecutableTxFor, WithTxEnv},
ConfigureEvm, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook, SpecFor,
TxEnvFor,
ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
SpecFor, TxEnvFor,
};
use reth_metrics::Metrics;
use reth_primitives_traits::NodePrimitives;
@@ -50,7 +50,7 @@ use std::{
mpsc::{self, channel},
Arc,
},
time::Instant,
time::{Duration, Instant},
};
use tracing::{debug, debug_span, instrument, warn, Span};
@@ -94,17 +94,9 @@ pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;
/// 144MB.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;
/// Gas used threshold below which prewarming is skipped for small blocks.
///
/// Based on analysis of 100 mainnet blocks (starting at block 24,425,210), reth's `new_payload`
/// winrate drops from 88% on 40-50M gas blocks to 49% on 0-10M gas blocks. The root cause is the
/// fixed overhead of spawning prewarm workers (building state providers, creating EVM instances,
/// wrapping precompiles) which exceeds execution time on small blocks.
///
/// 20M gas is used as the threshold because ~23% of recent mainnet blocks fall under this level
/// where prewarming overhead dominates.
pub const SMALL_BLOCK_GAS_THRESHOLD: u64 = 20_000_000;
/// Blocks with fewer transactions than this skip prewarming, since the fixed overhead of spawning
/// prewarm workers exceeds the execution time saved.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>,
@@ -146,6 +138,8 @@ where
sparse_trie_prune_depth: usize,
/// Maximum storage tries to retain after pruning.
sparse_trie_max_storage_tries: usize,
/// Whether sparse trie cache pruning is fully disabled.
disable_sparse_trie_cache_pruning: bool,
/// Whether to disable cache metrics recording.
disable_cache_metrics: bool,
}
@@ -181,6 +175,7 @@ where
prewarm_max_concurrency: config.prewarm_max_concurrency(),
sparse_trie_prune_depth: config.sparse_trie_prune_depth(),
sparse_trie_max_storage_tries: config.sparse_trie_max_storage_tries(),
disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
disable_cache_metrics: config.disable_cache_metrics(),
}
}
@@ -246,7 +241,8 @@ where
+ 'static,
{
// start preparing transactions immediately
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
let (prewarm_rx, execution_rx) =
self.spawn_tx_iterator(transactions, env.transaction_count);
let span = Span::current();
let (to_sparse_trie, sparse_trie_rx) = channel();
@@ -287,15 +283,7 @@ where
// Create and spawn the storage proof task
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let storage_worker_count = config.storage_worker_count();
let account_worker_count = config.account_worker_count();
let proof_handle = ProofWorkerHandle::new(
&self.executor,
task_ctx,
storage_worker_count,
account_worker_count,
v2_proofs_enabled,
);
let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, v2_proofs_enabled);
if config.disable_trie_cache() {
let multi_proof_task = MultiProofTask::new(
@@ -361,7 +349,8 @@ where
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
let (prewarm_rx, execution_rx) =
self.spawn_tx_iterator(transactions, env.transaction_count);
// This path doesn't use multiproof, so V2 proofs flag doesn't matter
let prewarm_handle =
self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
@@ -374,11 +363,23 @@ where
}
}
/// Transaction count threshold below which sequential signature recovery is used.
///
/// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
/// (work-stealing setup, channel-based reorder) exceeds the cost of sequential ECDSA
/// recovery. Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach`
/// for small blocks.
const SMALL_BLOCK_TX_THRESHOLD: usize = 30;
/// Spawns a task advancing transaction env iterator and streaming updates through a channel.
///
/// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
/// sequential iteration to avoid rayon overhead.
#[expect(clippy::type_complexity)]
fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
&self,
transactions: I,
transaction_count: usize,
) -> (
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
@@ -387,22 +388,51 @@ where
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
rayon::spawn(move || {
let (transactions, convert) = transactions.into();
transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
let tx = convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
// Only send Ok(_) variants to prewarming task.
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
if transaction_count == 0 {
// Empty block — nothing to do.
} else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
// Sequential path for small blocks — avoids rayon work-stealing setup and
// channel-based reorder overhead when it costs more than the ECDSA recovery itself.
debug!(
target: "engine::tree::payload_processor",
transaction_count,
"using sequential sig recovery for small block"
);
self.executor.spawn_blocking(move || {
let (transactions, convert) = transactions.into_parts();
for (idx, tx) in transactions.into_iter().enumerate() {
let tx = convert.convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = ooo_tx.send((idx, tx));
}
let _ = ooo_tx.send((idx, tx));
});
});
} else {
// Parallel path — spawn on rayon for parallel signature recovery.
rayon::spawn(move || {
let (transactions, convert) = transactions.into_parts();
transactions.into_par_iter().enumerate().for_each_with(
ooo_tx,
|ooo_tx, (idx, tx)| {
let tx = convert.convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
// Only send Ok(_) variants to prewarming task.
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = ooo_tx.send((idx, tx));
},
);
});
}
// Spawn a task that processes out-of-order transactions from the task above and sends them
// to the execution task in order.
@@ -414,8 +444,8 @@ where
let _ = execute_tx.send(tx);
next_for_execution += 1;
while let Some(entry) = queue.first_entry() &&
*entry.key() == next_for_execution
while let Some(entry) = queue.first_entry()
&& *entry.key() == next_for_execution
{
let _ = execute_tx.send(entry.remove());
next_for_execution += 1;
@@ -442,8 +472,8 @@ where
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
let skip_prewarm = self.disable_transaction_prewarming ||
(env.gas_used > 0 && env.gas_used < SMALL_BLOCK_GAS_THRESHOLD);
let skip_prewarm =
self.disable_transaction_prewarming || env.transaction_count < SMALL_BLOCK_TX_THRESHOLD;
let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
@@ -523,6 +553,7 @@ where
let disable_trie_cache = config.disable_trie_cache();
let prune_depth = self.sparse_trie_prune_depth;
let max_storage_tries = self.sparse_trie_max_storage_tries;
let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
let chunk_size =
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size());
let executor = self.executor.clone();
@@ -619,6 +650,7 @@ where
max_storage_tries,
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
disable_cache_pruning,
);
trie_metrics
.into_trie_for_reuse_duration_histogram
@@ -894,7 +926,7 @@ impl PayloadExecutionCache {
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
let start = Instant::now();
let cache = self.inner.read();
let mut cache = self.inner.write();
let elapsed = start.elapsed();
self.metrics.execution_cache_wait_duration.record(elapsed.as_secs_f64());
@@ -902,7 +934,7 @@ impl PayloadExecutionCache {
warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
}
if let Some(c) = cache.as_ref() {
if let Some(c) = cache.as_mut() {
let cached_hash = c.executed_block_hash();
// Check that the cache hash matches the parent hash of the current block. It won't
// match in case it's a fork block.
@@ -923,13 +955,13 @@ impl PayloadExecutionCache {
);
if available {
// If the has is available (no other threads are using it), but has a mismatching
// parent hash, we can just clear it and keep using without re-creating from
// scratch.
if !hash_matches {
c.clear();
// Fork block: clear and update the hash on the ORIGINAL before cloning.
// This prevents the canonical chain from matching on the stale hash
// and picking up polluted data if the fork block fails.
c.clear_with_hash(parent_hash);
}
return Some(c.clone())
return Some(c.clone());
} else if hash_matches {
self.metrics.execution_cache_in_use.increment(1);
}
@@ -940,10 +972,25 @@ impl PayloadExecutionCache {
None
}
/// Clears the tracked cache
#[expect(unused)]
pub(crate) fn clear(&self) {
self.inner.write().take();
/// Waits until the execution cache becomes available for use.
///
/// This acquires a write lock to ensure exclusive access, then immediately releases it.
/// This is useful for synchronization before starting payload processing.
///
/// Returns the time spent waiting for the lock.
pub fn wait_for_availability(&self) -> Duration {
let start = Instant::now();
// Acquire write lock to wait for any current holders to finish
let _guard = self.inner.write();
let elapsed = start.elapsed();
if elapsed.as_millis() > 5 {
debug!(
target: "engine::tree::payload_processor",
blocked_for=?elapsed,
"Waited for execution cache to become available"
);
}
elapsed
}
/// Updates the cache with a closure that has exclusive access to the guard.
@@ -996,10 +1043,6 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
/// Used to determine parallel worker count for prewarming.
/// A value of 0 indicates the count is unknown.
pub transaction_count: usize,
/// Total gas used in the block.
/// Used to skip prewarming for small blocks (see [`SMALL_BLOCK_GAS_THRESHOLD`]).
/// A value of 0 indicates the gas used is unknown.
pub gas_used: u64,
/// Withdrawals included in the block.
/// Used to generate prefetch targets for withdrawal addresses.
pub withdrawals: Option<Vec<Withdrawal>>,
@@ -1016,7 +1059,6 @@ where
parent_hash: Default::default(),
parent_state_root: Default::default(),
transaction_count: 0,
gas_used: 0,
withdrawals: None,
}
}
@@ -1101,10 +1143,18 @@ mod tests {
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
// When the parent hash doesn't match, the cache is cleared and returned for reuse
// When the parent hash doesn't match (fork block), the cache is cleared,
// hash updated on the original, and clone returned for reuse
let different_hash = B256::from([4u8; 32]);
let cache = execution_cache.get_cache_for(different_hash);
assert!(cache.is_some(), "cache should be returned for reuse after clearing")
assert!(cache.is_some(), "cache should be returned for reuse after clearing");
drop(cache);
// The stored cache now has the fork block's parent hash.
// Canonical chain looking for original hash sees a mismatch → clears and reuses.
let original = execution_cache.get_cache_for(hash);
assert!(original.is_some(), "canonical chain gets cache back via mismatch+clear");
}
#[test]
@@ -1328,4 +1378,61 @@ mod tests {
"State root mismatch: task={root_from_task}, base={root_from_regular}"
);
}
/// Tests the full prewarm lifecycle for a fork block:
///
/// 1. Cache is at canonical block 4.
/// 2. Fork block (parent = block 2) checks out the cache via `get_cache_for`, simulating what
/// `PrewarmCacheTask` does when it receives a `SavedCache`.
/// 3. Prewarm populates the shared cache with fork-specific state.
/// 4. While the prewarm clone is alive, the cache is unavailable (`usage_guard` > 1).
/// 5. Prewarm drops without calling `save_cache` (fork block was invalid).
/// 6. Canonical block 5 (parent = block 4) must get a cache with correct hash and no stale fork
/// data.
#[test]
fn fork_prewarm_dropped_without_save_does_not_corrupt_cache() {
let execution_cache = PayloadExecutionCache::default();
// Canonical chain at block 4.
let block4_hash = B256::from([4u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(block4_hash)));
// Fork block arrives with parent = block 2. Prewarm task checks out the cache.
// This simulates PrewarmCacheTask receiving a SavedCache clone from get_cache_for.
let fork_parent = B256::from([2u8; 32]);
let prewarm_cache = execution_cache.get_cache_for(fork_parent);
assert!(prewarm_cache.is_some(), "prewarm should obtain cache for fork block");
let prewarm_cache = prewarm_cache.unwrap();
assert_eq!(prewarm_cache.executed_block_hash(), fork_parent);
// Prewarm populates cache with fork-specific state (ancestor data for block 2).
// Since ExecutionCache uses Arc<Inner>, this data is shared with the stored original.
let fork_addr = Address::from([0xBB; 20]);
let fork_key = B256::from([0xCC; 32]);
prewarm_cache.cache().insert_storage(fork_addr, fork_key, Some(U256::from(999)));
// While prewarm holds the clone, the usage_guard count > 1 → cache is in use.
let during_prewarm = execution_cache.get_cache_for(block4_hash);
assert!(
during_prewarm.is_none(),
"cache must be unavailable while prewarm holds a reference"
);
// Fork block fails — prewarm task drops without calling save_cache/update_with_guard.
drop(prewarm_cache);
// Canonical block 5 arrives (parent = block 4).
// Stored hash = fork_parent (our fix), so get_cache_for sees a mismatch,
// clears the stale fork data, and returns a cache with hash = block4_hash.
let block5_cache = execution_cache.get_cache_for(block4_hash);
assert!(
block5_cache.is_some(),
"canonical chain must get cache after fork prewarm is dropped"
);
assert_eq!(
block5_cache.as_ref().unwrap().executed_block_hash(),
block4_hash,
"cache must carry the canonical parent hash, not the fork parent"
);
}
}

View File

@@ -1541,6 +1541,7 @@ mod tests {
providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
BlockNumReader, BlockReader, ChangeSetReader, DatabaseProviderFactory, LatestStateProvider,
PruneCheckpointReader, StageCheckpointReader, StateProviderBox, StorageChangeSetReader,
StorageSettingsCache,
};
use reth_trie::MultiProof;
use reth_trie_db::ChangesetCache;
@@ -1562,6 +1563,7 @@ mod tests {
+ PruneCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ StorageSettingsCache
+ BlockNumReader,
> + Clone
+ Send
@@ -1571,7 +1573,7 @@ mod tests {
let changeset_cache = ChangesetCache::new();
let overlay_factory = OverlayStateProviderFactory::new(factory, changeset_cache);
let task_ctx = ProofTaskCtx::new(overlay_factory);
let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, 1, 1, false);
let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, false);
let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
let (tx, rx) = crossbeam_channel::unbounded();
@@ -1581,7 +1583,10 @@ mod tests {
fn create_cached_provider<F>(factory: F) -> CachedStateProvider<StateProviderBox>
where
F: DatabaseProviderFactory<
Provider: BlockReader + StageCheckpointReader + PruneCheckpointReader,
Provider: BlockReader
+ StageCheckpointReader
+ PruneCheckpointReader
+ reth_provider::StorageSettingsCache,
> + Clone
+ Send
+ 'static,

View File

@@ -535,11 +535,8 @@ where
if let Some(saved_cache) = saved_cache {
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
state_provider = Box::new(
CachedStateProvider::new(state_provider, caches, cache_metrics)
// ensure we pre-warm the cache
.prewarm(),
);
state_provider =
Box::new(CachedStateProvider::new_prewarm(state_provider, caches, cache_metrics));
}
let state_provider = StateProviderDatabase::new(state_provider);
@@ -598,13 +595,10 @@ where
};
while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
let enter = debug_span!(
let _enter = debug_span!(
target: "engine::tree::payload_processor::prewarm",
"prewarm tx",
index,
tx_hash = %tx.tx().tx_hash(),
is_success = tracing::field::Empty,
gas_used = tracing::field::Empty,
)
.entered();
@@ -635,12 +629,6 @@ where
};
metrics.execution_duration.record(start.elapsed());
// record some basic information about the transactions
enter.record("gas_used", res.result.gas_used());
enter.record("is_success", res.result.is_success());
drop(enter);
// If the task was cancelled, stop execution, and exit.
if terminate_execution.load(Ordering::Relaxed) {
break
@@ -649,16 +637,12 @@ where
// Only send outcome for transactions after the first txn
// as the main execution will be just as fast
if index > 0 {
let _enter =
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
.entered();
let (targets, storage_targets) =
multiproof_targets_from_state(res.state, v2_proofs_enabled);
metrics.prefetch_storage_targets.record(storage_targets as f64);
if let Some(to_multi_proof) = &to_multi_proof {
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
}
drop(_enter);
}
metrics.total_runtime.record(start.elapsed());

View File

@@ -72,6 +72,7 @@ where
max_storage_tries: usize,
max_nodes_capacity: usize,
max_values_capacity: usize,
disable_pruning: bool,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
match self {
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
@@ -80,6 +81,7 @@ where
max_storage_tries,
max_nodes_capacity,
max_values_capacity,
disable_pruning,
),
}
}
@@ -356,16 +358,23 @@ where
/// Prunes and shrinks the trie for reuse in the next payload built on top of this one.
///
/// Should be called after the state root result has been sent.
///
/// When `disable_pruning` is true, the trie is preserved without any node pruning,
/// storage trie eviction, or capacity shrinking, keeping the full cache intact for
/// benchmarking purposes.
pub(super) fn into_trie_for_reuse(
self,
prune_depth: usize,
max_storage_tries: usize,
max_nodes_capacity: usize,
max_values_capacity: usize,
disable_pruning: bool,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
let Self { mut trie, .. } = self;
trie.prune(prune_depth, max_storage_tries);
trie.shrink_to(max_nodes_capacity, max_values_capacity);
if !disable_pruning {
trie.prune(prune_depth, max_storage_tries);
trie.shrink_to(max_nodes_capacity, max_values_capacity);
}
let deferred = trie.take_deferred_drops();
(trie, deferred)
}
@@ -407,7 +416,9 @@ where
let update = match message {
Ok(m) => m,
Err(_) => {
break
return Err(ParallelStateRootError::Other(
"updates channel disconnected before state root calculation".to_string(),
))
}
};

View File

@@ -17,7 +17,6 @@ use alloy_evm::Evm;
use alloy_primitives::B256;
use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
use rayon::prelude::*;
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, LazyOverlay};
use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
use reth_engine_primitives::{
@@ -39,7 +38,7 @@ use reth_provider::{
providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockNumReader, BlockReader,
ChangeSetReader, DatabaseProviderFactory, DatabaseProviderROFactory, HashedPostStateProvider,
ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProvider,
StateProviderFactory, StateReader, StorageChangeSetReader,
StateProviderFactory, StateReader, StorageChangeSetReader, StorageSettingsCache,
};
use reth_revm::db::{states::bundle_state::BundleRetention, State};
use reth_trie::{updates::TrieUpdates, HashedPostState, StateRoot};
@@ -147,7 +146,8 @@ where
+ PruneCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ BlockNumReader,
+ BlockNumReader
+ StorageSettingsCache,
> + BlockReader<Header = N::BlockHeader>
+ ChangeSetReader
+ BlockNumReader
@@ -232,35 +232,20 @@ where
V: PayloadValidator<T, Block = N::Block>,
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
{
match input {
Ok(match input {
BlockOrPayload::Payload(payload) => {
let (iter, convert) = self
let iter = self
.evm_config
.tx_iterator_for_payload(payload)
.map_err(NewPayloadError::other)?
.into();
let iter = Either::Left(iter.into_par_iter().map(Either::Left));
let convert = move |tx| {
let Either::Left(tx) = tx else { unreachable!() };
convert(tx).map(Either::Left).map_err(Either::Left)
};
// Box the closure to satisfy the `Fn` bound both here and in the branch below
Ok((iter, Box::new(convert) as Box<dyn Fn(_) -> _ + Send + Sync + 'static>))
.map_err(NewPayloadError::other)?;
Either::Left(iter)
}
BlockOrPayload::Block(block) => {
let iter = Either::Right(
block.body().clone_transactions().into_par_iter().map(Either::Right),
);
let convert = move |tx: Either<_, N::SignedTx>| {
let Either::Right(tx) = tx else { unreachable!() };
tx.try_into_recovered().map(Either::Right).map_err(Either::Right)
};
Ok((iter, Box::new(convert)))
let txs = block.body().clone_transactions();
let convert = |tx: N::SignedTx| tx.try_into_recovered();
Either::Right((txs, convert))
}
}
})
}
/// Returns a [`ExecutionCtxFor`] for the given payload or block.
@@ -412,7 +397,6 @@ where
parent_hash: input.parent_hash(),
parent_state_root: parent_block.state_root(),
transaction_count: input.transaction_count(),
gas_used: input.gas_used(),
withdrawals: input.withdrawals().map(|w| w.to_vec()),
};
@@ -833,21 +817,18 @@ where
let tx = tx_result.map_err(BlockExecutionError::other)?;
let tx_signer = *<Tx as alloy_evm::RecoveredTx<InnerTx>>::signer(&tx);
let tx_hash = <Tx as alloy_evm::RecoveredTx<InnerTx>>::tx(&tx).tx_hash();
senders.push(tx_signer);
let span = debug_span!(
let _enter = debug_span!(
target: "engine::tree",
"execute tx",
?tx_hash,
gas_used = tracing::field::Empty,
);
let enter = span.entered();
)
.entered();
trace!(target: "engine::tree", "Executing transaction");
let tx_start = Instant::now();
let gas_used = executor.execute_transaction(tx)?;
executor.execute_transaction(tx)?;
self.metrics.record_transaction_execution(tx_start.elapsed());
let current_len = executor.receipts().len();
@@ -859,8 +840,6 @@ where
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
}
}
enter.record("gas_used", gas_used);
}
drop(exec_span);
@@ -929,6 +908,12 @@ where
/// Returns `ProviderResult<Result<...>>` where the outer `ProviderResult` captures
/// unrecoverable errors from the sequential fallback (e.g. DB errors), while the inner
/// `Result` captures parallel state root task errors that can still fall back to serial.
#[instrument(
level = "debug",
target = "engine::tree::payload_validator",
name = "await_state_root",
skip_all
)]
fn await_state_root_with_timeout<Tx, Err, R: Send + Sync + 'static>(
&self,
handle: &mut PayloadHandle<Tx, Err, R>,
@@ -1542,7 +1527,8 @@ where
+ PruneCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ BlockNumReader,
+ BlockNumReader
+ StorageSettingsCache,
> + BlockReader<Header = N::BlockHeader>
+ StateProviderFactory
+ StateReader
@@ -1663,17 +1649,6 @@ impl<T: PayloadTypes> BlockOrPayload<T> {
}
}
/// Returns the total gas used by all transactions in the payload or block.
pub fn gas_used(&self) -> u64
where
T::ExecutionData: ExecutionPayload,
{
match self {
Self::Payload(payload) => payload.gas_used(),
Self::Block(block) => block.header().gas_used(),
}
}
/// Returns the withdrawals from the payload or block.
pub fn withdrawals(&self) -> Option<&[Withdrawal]>
where

View File

@@ -221,6 +221,7 @@ impl TestHarness {
EngineApiKind::Ethereum,
evm_config,
changeset_cache,
provider.cached_storage_settings().use_hashed_state(),
);
let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());

View File

@@ -2,13 +2,15 @@
mod fcu_finalized_blocks;
use alloy_rpc_types_engine::PayloadStatusEnum;
use eyre::Result;
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_e2e_test_utils::testsuite::{
actions::{
CaptureBlock, CompareNodeChainTips, CreateFork, ExpectFcuStatus, MakeCanonical,
ProduceBlocks, ProduceBlocksLocally, ProduceInvalidBlocks, ReorgTo, SelectActiveNode,
SendNewPayloads, UpdateBlockInfo, ValidateCanonicalTag, WaitForSync,
BlockReference, CaptureBlock, CompareNodeChainTips, CreateFork, ExpectFcuStatus,
MakeCanonical, ProduceBlocks, ProduceBlocksLocally, ProduceInvalidBlocks, ReorgTo,
SelectActiveNode, SendForkchoiceUpdate, SendNewPayloads, SetForkBase, UpdateBlockInfo,
ValidateCanonicalTag, WaitForSync,
},
setup::{NetworkSetup, Setup},
TestBuilder,
@@ -39,6 +41,14 @@ fn default_engine_tree_setup() -> Setup<EthEngineTypes> {
)
}
/// Creates a v2 storage mode setup for engine tree e2e tests.
///
/// v2 mode uses keccak256-hashed slot keys in static file changesets and rocksdb history
/// instead of plain keys in MDBX.
fn v2_engine_tree_setup() -> Setup<EthEngineTypes> {
default_engine_tree_setup().with_storage_v2()
}
/// Test that verifies forkchoice update and canonical chain insertion functionality.
#[tokio::test]
async fn test_engine_tree_fcu_canon_chain_insertion_e2e() -> Result<()> {
@@ -334,3 +344,152 @@ async fn test_engine_tree_live_sync_transition_eventually_canonical_e2e() -> Res
Ok(())
}
// ==================== v2 storage mode variants ====================
/// v2 variant: Verifies forkchoice update and canonical chain insertion in v2 storage mode.
///
/// Exercises the full `save_blocks` → `write_state` → static file changeset path with hashed keys.
#[tokio::test]
async fn test_engine_tree_fcu_canon_chain_insertion_v2_e2e() -> Result<()> {
reth_tracing::init_test_tracing();
let test = TestBuilder::new()
.with_setup(v2_engine_tree_setup())
.with_action(ProduceBlocks::<EthEngineTypes>::new(1))
.with_action(MakeCanonical::new())
.with_action(ProduceBlocks::<EthEngineTypes>::new(3))
.with_action(MakeCanonical::new());
test.run::<EthereumNode>().await?;
Ok(())
}
/// v2 variant: Verifies forkchoice update with a reorg where all blocks are already available.
///
/// Exercises `write_state_reverts` path with hashed changeset keys during CL-driven reorgs.
#[tokio::test]
async fn test_engine_tree_fcu_reorg_with_all_blocks_v2_e2e() -> Result<()> {
reth_tracing::init_test_tracing();
let test = TestBuilder::new()
.with_setup(v2_engine_tree_setup())
.with_action(ProduceBlocks::<EthEngineTypes>::new(5))
.with_action(MakeCanonical::new())
.with_action(CreateFork::<EthEngineTypes>::new(2, 3))
.with_action(CaptureBlock::new("fork_tip"))
.with_action(ReorgTo::<EthEngineTypes>::new_from_tag("fork_tip"));
test.run::<EthereumNode>().await?;
Ok(())
}
/// v2 variant: Verifies progressive canonical chain extension in v2 storage mode.
#[tokio::test]
async fn test_engine_tree_fcu_extends_canon_chain_v2_e2e() -> Result<()> {
reth_tracing::init_test_tracing();
let test = TestBuilder::new()
.with_setup(v2_engine_tree_setup())
.with_action(ProduceBlocks::<EthEngineTypes>::new(1))
.with_action(MakeCanonical::new())
.with_action(ProduceBlocks::<EthEngineTypes>::new(10))
.with_action(CaptureBlock::new("target_block"))
.with_action(ReorgTo::<EthEngineTypes>::new_from_tag("target_block"))
.with_action(MakeCanonical::new());
test.run::<EthereumNode>().await?;
Ok(())
}
/// Creates a 2-node setup for disk-level reorg testing.
///
/// Uses unconnected nodes so fork blocks can be produced independently on Node 1 and then
/// sent to Node 0 via newPayload only (no FCU), keeping Node 0's persisted chain intact
/// until the final `ReorgTo` triggers `find_disk_reorg`.
fn disk_reorg_setup(storage_v2: bool) -> Setup<EthEngineTypes> {
let mut setup = Setup::default()
.with_chain_spec(Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(
serde_json::from_str(include_str!(
"../../../../e2e-test-utils/src/testsuite/assets/genesis.json"
))
.unwrap(),
)
.cancun_activated()
.build(),
))
.with_network(NetworkSetup::multi_node_unconnected(2))
.with_tree_config(
TreeConfig::default().with_legacy_state_root(false).with_has_enough_parallelism(true),
);
if storage_v2 {
setup = setup.with_storage_v2();
}
setup
}
/// Builds a disk-level reorg test scenario.
///
/// 1. Both nodes receive 3 shared blocks
/// 2. Node 0 extends to 10 blocks locally (persisted to disk)
/// 3. Node 1 builds an 8-block fork from block 3 (its canonical head)
/// 4. Fork blocks are sent to Node 0 via newPayload (no FCU, old chain stays on disk)
/// 5. FCU to fork tip on Node 0 triggers `find_disk_reorg` → `RemoveBlocksAbove(3)`
fn disk_reorg_test(storage_v2: bool) -> TestBuilder<EthEngineTypes> {
TestBuilder::new()
.with_setup(disk_reorg_setup(storage_v2))
.with_action(SelectActiveNode::new(0))
.with_action(ProduceBlocks::<EthEngineTypes>::new(3))
.with_action(MakeCanonical::new())
.with_action(ProduceBlocksLocally::<EthEngineTypes>::new(7))
.with_action(MakeCanonical::with_active_node())
.with_action(SelectActiveNode::new(1))
.with_action(SetForkBase::new(3))
.with_action(ProduceBlocksLocally::<EthEngineTypes>::new(8))
.with_action(MakeCanonical::with_active_node())
.with_action(CaptureBlock::new("fork_tip"))
.with_action(
SendNewPayloads::<EthEngineTypes>::new()
.with_source_node(1)
.with_target_node(0)
.with_start_block(4)
.with_total_blocks(8),
)
.with_action(
SendForkchoiceUpdate::<EthEngineTypes>::new(
BlockReference::Tag("fork_tip".into()),
BlockReference::Tag("fork_tip".into()),
BlockReference::Tag("fork_tip".into()),
)
.with_expected_status(PayloadStatusEnum::Valid)
.with_node_idx(0),
)
}
/// Verifies disk-level reorg in v1 (plain key) storage mode.
///
/// Confirms `find_disk_reorg()` detects persisted blocks on the wrong fork and calls
/// `RemoveBlocksAbove` to truncate, then re-persists the correct fork chain.
#[tokio::test]
async fn test_engine_tree_disk_reorg_v1_e2e() -> Result<()> {
reth_tracing::init_test_tracing();
disk_reorg_test(false).run::<EthereumNode>().await?;
Ok(())
}
/// v2 variant: Verifies disk-level reorg in v2 storage mode.
///
/// Same scenario as v1 but with hashed changeset keys in static files and rocksdb history.
/// Exercises `find_disk_reorg()` → `RemoveBlocksAbove` with v2 hashed key format.
#[tokio::test]
async fn test_engine_tree_disk_reorg_v2_e2e() -> Result<()> {
reth_tracing::init_test_tracing();
disk_reorg_test(true).run::<EthereumNode>().await?;
Ok(())
}

View File

@@ -22,6 +22,7 @@ reth-node-core.workspace = true
reth-node-ethereum.workspace = true
reth-node-metrics.workspace = true
reth-rpc-server-types.workspace = true
reth-tasks.workspace = true
reth-tracing.workspace = true
reth-node-api.workspace = true

View File

@@ -17,6 +17,7 @@ use reth_node_builder::{NodeBuilder, WithLaunchContext};
use reth_node_ethereum::{consensus::EthBeaconConsensus, EthEvmConfig, EthereumNode};
use reth_node_metrics::recorder::install_prometheus_recorder;
use reth_rpc_server_types::RpcModuleValidator;
use reth_tasks::RayonConfig;
use reth_tracing::{FileWorkerGuard, Layers};
use std::{fmt, sync::Arc};
@@ -153,6 +154,16 @@ where
Rpc::validate_selection(ws_api, "ws.api").map_err(|e| eyre!("{e}"))?;
}
let rayon_config = RayonConfig {
reserved_cpu_cores: command.engine.reserved_cpu_cores,
proof_storage_worker_threads: command.engine.storage_worker_count,
proof_account_worker_threads: command.engine.account_worker_count,
..Default::default()
};
let runner = CliRunner::try_with_runtime_config(
reth_tasks::RuntimeConfig::default().with_rayon(rayon_config),
)?;
runner.run_command_until_exit(|ctx| {
command.execute(ctx, FnLauncher::new::<C, Ext>(launcher))
})

View File

@@ -1,4 +1,5 @@
use crate::{execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor, TxEnvFor};
use alloy_consensus::transaction::Either;
use alloy_evm::{block::ExecutableTxParts, RecoveredTx};
use rayon::prelude::*;
use reth_primitives_traits::TxTy;
@@ -21,10 +22,55 @@ pub trait ConfigureEngineEvm<ExecutionData>: ConfigureEvm {
) -> Result<impl ExecutableTxIterator<Self>, Self::Error>;
}
/// Converts a raw transaction into an executable transaction.
///
/// This trait abstracts the conversion logic (e.g., decoding, signature recovery) that is
/// parallelized in the engine.
pub trait ConvertTx<RawTx>: Send + Sync + 'static {
/// The executable transaction type.
type Tx;
/// Errors that may occur during conversion.
type Error;
/// Converts a raw transaction.
fn convert(&self, raw: RawTx) -> Result<Self::Tx, Self::Error>;
}
// Blanket impl so closures still work.
impl<F, RawTx, Tx, Err> ConvertTx<RawTx> for F
where
F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
{
type Tx = Tx;
type Error = Err;
fn convert(&self, raw: RawTx) -> Result<Tx, Err> {
self(raw)
}
}
impl<A, B, RA, RB> ConvertTx<Either<RA, RB>> for Either<A, B>
where
A: ConvertTx<RA>,
B: ConvertTx<RB>,
{
type Tx = Either<A::Tx, B::Tx>;
type Error = Either<A::Error, B::Error>;
fn convert(&self, raw: Either<RA, RB>) -> Result<Self::Tx, Self::Error> {
match (self, raw) {
(Self::Left(a), Either::Left(raw)) => {
a.convert(raw).map(Either::Left).map_err(Either::Left)
}
(Self::Right(b), Either::Right(raw)) => {
b.convert(raw).map(Either::Right).map_err(Either::Right)
}
_ => unreachable!(),
}
}
}
/// A helper trait representing a pair of a "raw" transactions iterator and a closure that can be
/// used to convert them to an executable transaction. This tuple is used in the engine to
/// parallelize heavy work like decoding or recovery.
pub trait ExecutableTxTuple: Into<(Self::IntoIter, Self::Convert)> + Send + 'static {
pub trait ExecutableTxTuple: Send + 'static {
/// Raw transaction that can be converted to an [`ExecutableTxTuple::Tx`]
///
/// This can be any type that can be converted to an [`ExecutableTxTuple::Tx`]. For example,
@@ -37,12 +83,16 @@ pub trait ExecutableTxTuple: Into<(Self::IntoIter, Self::Convert)> + Send + 'sta
/// Iterator over [`ExecutableTxTuple::Tx`].
type IntoIter: IntoParallelIterator<Item = Self::RawTx, Iter: IndexedParallelIterator>
+ IntoIterator<Item = Self::RawTx>
+ Send
+ 'static;
/// Closure that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
/// Converter that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
/// [`ExecutableTxTuple::Tx`]. This might involve heavy work like decoding or recovery
/// and will be parallelized in the engine.
type Convert: Fn(Self::RawTx) -> Result<Self::Tx, Self::Error> + Send + Sync + 'static;
type Convert: ConvertTx<Self::RawTx, Tx = Self::Tx, Error = Self::Error>;
/// Decomposes into the raw transaction iterator and converter.
fn into_parts(self) -> (Self::IntoIter, Self::Convert);
}
impl<RawTx, Tx, Err, I, F> ExecutableTxTuple for (I, F)
@@ -50,7 +100,10 @@ where
RawTx: Send + Sync + 'static,
Tx: Clone + Send + Sync + 'static,
Err: core::error::Error + Send + Sync + 'static,
I: IntoParallelIterator<Item = RawTx, Iter: IndexedParallelIterator> + Send + 'static,
I: IntoParallelIterator<Item = RawTx, Iter: IndexedParallelIterator>
+ IntoIterator<Item = RawTx>
+ Send
+ 'static,
F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
{
type RawTx = RawTx;
@@ -59,6 +112,10 @@ where
type IntoIter = I;
type Convert = F;
fn into_parts(self) -> (I, F) {
self
}
}
/// Iterator over executable transactions.
@@ -76,3 +133,72 @@ where
{
type Recovered = <T::Tx as ExecutableTxParts<TxEnvFor<Evm>, TxTy<Evm::Primitives>>>::Recovered;
}
/// Wraps `Either<L, R>` to implement both [`IntoParallelIterator`] and [`IntoIterator`],
/// mapping items through [`Either::Left`] / [`Either::Right`] on demand without collecting.
#[derive(Debug)]
pub struct EitherIter<L, R>(Either<L, R>);
impl<L, R> IntoParallelIterator for EitherIter<L, R>
where
L: IntoParallelIterator,
R: IntoParallelIterator,
L::Iter: IndexedParallelIterator,
R::Iter: IndexedParallelIterator,
{
type Item = Either<L::Item, R::Item>;
type Iter = Either<
rayon::iter::Map<L::Iter, fn(L::Item) -> Either<L::Item, R::Item>>,
rayon::iter::Map<R::Iter, fn(R::Item) -> Either<L::Item, R::Item>>,
>;
fn into_par_iter(self) -> Self::Iter {
match self.0 {
Either::Left(l) => Either::Left(l.into_par_iter().map(Either::Left)),
Either::Right(r) => Either::Right(r.into_par_iter().map(Either::Right)),
}
}
}
impl<L, R> IntoIterator for EitherIter<L, R>
where
L: IntoIterator,
R: IntoIterator,
{
type Item = Either<L::Item, R::Item>;
type IntoIter = Either<
core::iter::Map<L::IntoIter, fn(L::Item) -> Either<L::Item, R::Item>>,
core::iter::Map<R::IntoIter, fn(R::Item) -> Either<L::Item, R::Item>>,
>;
fn into_iter(self) -> Self::IntoIter {
match self.0 {
Either::Left(l) => Either::Left(l.into_iter().map(Either::Left)),
Either::Right(r) => Either::Right(r.into_iter().map(Either::Right)),
}
}
}
// SAFETY: `EitherIter` is just a newtype over `Either<L, R>`.
unsafe impl<L: Send, R: Send> Send for EitherIter<L, R> {}
impl<A: ExecutableTxTuple, B: ExecutableTxTuple> ExecutableTxTuple for Either<A, B> {
type RawTx = Either<A::RawTx, B::RawTx>;
type Tx = Either<A::Tx, B::Tx>;
type Error = Either<A::Error, B::Error>;
type IntoIter = EitherIter<A::IntoIter, B::IntoIter>;
type Convert = Either<A::Convert, B::Convert>;
fn into_parts(self) -> (Self::IntoIter, Self::Convert) {
match self {
Self::Left(a) => {
let (iter, convert) = a.into_parts();
(EitherIter(Either::Left(iter)), Either::Left(convert))
}
Self::Right(b) => {
let (iter, convert) = b.into_parts();
(EitherIter(Either::Right(iter)), Either::Right(convert))
}
}
}
}

View File

@@ -47,7 +47,7 @@ pub use aliases::*;
#[cfg(feature = "std")]
mod engine;
#[cfg(feature = "std")]
pub use engine::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
pub use engine::{ConfigureEngineEvm, ConvertTx, ExecutableTxIterator, ExecutableTxTuple};
#[cfg(feature = "metrics")]
pub mod metrics;

View File

@@ -18,6 +18,7 @@ use reth_provider::{
};
use reth_revm::database::StateProviderDatabase;
use reth_testing_utils::generators::sign_tx_with_key_pair;
use reth_trie_common::KeccakKeyHasher;
use secp256k1::Keypair;
pub(crate) fn to_execution_outcome(
@@ -77,12 +78,9 @@ where
let execution_outcome = to_execution_outcome(block.number(), &block_execution_output);
// Commit the block's execution outcome to the database
let hashed_state = execution_outcome.hash_state_slow::<KeccakKeyHasher>().into_sorted();
let provider_rw = provider_factory.provider_rw()?;
provider_rw.append_blocks_with_state(
vec![block.clone()],
&execution_outcome,
Default::default(),
)?;
provider_rw.append_blocks_with_state(vec![block.clone()], &execution_outcome, hashed_state)?;
provider_rw.commit()?;
Ok(block_execution_output)
@@ -210,11 +208,12 @@ where
execution_outcome.state_mut().reverts.sort();
// Commit the block's execution outcome to the database
let hashed_state = execution_outcome.hash_state_slow::<KeccakKeyHasher>().into_sorted();
let provider_rw = provider_factory.provider_rw()?;
provider_rw.append_blocks_with_state(
vec![block1.clone(), block2.clone()],
&execution_outcome,
Default::default(),
hashed_state,
)?;
provider_rw.commit()?;

View File

@@ -589,7 +589,7 @@ impl EthMessageID {
/// Returns the max value for the given version.
pub const fn max(version: EthVersion) -> u8 {
if version.is_eth69() {
if version as u8 >= EthVersion::Eth69 as u8 {
Self::BlockRangeUpdate.to_u8()
} else {
Self::Receipts.to_u8()
@@ -937,6 +937,13 @@ mod tests {
assert!(matches!(decoded, StatusMessage::Legacy(s) if s == status));
}
#[test]
fn eth_message_id_max_includes_block_range_update() {
assert_eq!(EthMessageID::max(EthVersion::Eth69), EthMessageID::BlockRangeUpdate.to_u8(),);
assert_eq!(EthMessageID::max(EthVersion::Eth70), EthMessageID::BlockRangeUpdate.to_u8(),);
assert_eq!(EthMessageID::max(EthVersion::Eth68), EthMessageID::Receipts.to_u8());
}
#[test]
fn decode_status_rejects_non_status() {
let msg = EthMessage::<EthNetworkPrimitives>::GetBlockBodies(RequestPair {

View File

@@ -8,7 +8,7 @@ use reth_eth_wire_types::{
};
use reth_ethereum_forks::ForkId;
use reth_network_p2p::error::{RequestError, RequestResult};
use reth_network_peers::PeerId;
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::{PeerAddr, PeerKind};
use reth_tokio_util::EventStream;
use std::{
@@ -152,8 +152,13 @@ pub trait NetworkEventListenerProvider: NetworkPeersEvents {
pub enum DiscoveryEvent {
/// Discovered a node
NewNode(DiscoveredEvent),
/// Retrieved a [`ForkId`] from the peer via ENR request, See <https://eips.ethereum.org/EIPS/eip-868>
EnrForkId(PeerId, ForkId),
/// Retrieved a [`ForkId`] from the peer via ENR request.
///
/// Contains the full [`NodeRecord`] (peer ID + address) and the reported [`ForkId`].
/// Used to verify fork compatibility before admitting the peer.
///
/// See also <https://eips.ethereum.org/EIPS/eip-868>
EnrForkId(NodeRecord, ForkId),
}
/// Represents events related to peer discovery in the network.

View File

@@ -172,6 +172,11 @@ pub struct PeersConfig {
/// IPs within the specified CIDR ranges will be allowed.
#[cfg_attr(feature = "serde", serde(skip))]
pub ip_filter: IpFilter,
/// If true, discovered peers without a confirmed ENR [`ForkId`](alloy_eip2124::ForkId)
/// (EIP-868) will not be added to the peer set until their fork ID is verified.
///
/// This filters out peers from other networks that pollute the discovery table.
pub enforce_enr_fork_id: bool,
}
impl Default for PeersConfig {
@@ -191,6 +196,7 @@ impl Default for PeersConfig {
max_backoff_count: 5,
incoming_ip_throttle_duration: INBOUND_IP_THROTTLE_DURATION,
ip_filter: IpFilter::default(),
enforce_enr_fork_id: false,
}
}
}
@@ -314,6 +320,13 @@ impl PeersConfig {
self
}
/// If set, discovered peers without a confirmed ENR [`ForkId`](alloy_eip2124::ForkId) will not
/// be added to the peer set until their fork ID is verified via EIP-868.
pub const fn with_enforce_enr_fork_id(mut self, enforce: bool) -> Self {
self.enforce_enr_fork_id = enforce;
self
}
/// Returns settings for testing
#[cfg(any(test, feature = "test-utils"))]
pub fn test() -> Self {

View File

@@ -240,7 +240,7 @@ impl Discovery {
self.on_node_record_update(record, None);
}
DiscoveryUpdate::EnrForkId(node, fork_id) => {
self.queued_events.push_back(DiscoveryEvent::EnrForkId(node.id, fork_id))
self.queued_events.push_back(DiscoveryEvent::EnrForkId(node, fork_id))
}
DiscoveryUpdate::Removed(peer_id) => {
self.discovered_nodes.remove(&peer_id);

View File

@@ -25,11 +25,11 @@ use crate::{
listener::ConnectionListener,
message::{NewBlockMessage, PeerMessage},
metrics::{
ClosedSessionsMetrics, DisconnectMetrics, NetworkMetrics, PendingSessionFailureMetrics,
NETWORK_POOL_TRANSACTIONS_SCOPE,
BackedOffPeersMetrics, ClosedSessionsMetrics, DirectionalDisconnectMetrics, NetworkMetrics,
PendingSessionFailureMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
},
network::{NetworkHandle, NetworkHandleMessage},
peers::PeersManager,
peers::{BackoffReason, PeersManager},
poll_nested_stream_with_budget,
protocol::IntoRlpxSubProtocol,
required_block_filter::RequiredBlockFilter,
@@ -140,12 +140,14 @@ pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
num_active_peers: Arc<AtomicUsize>,
/// Metrics for the Network
metrics: NetworkMetrics,
/// Disconnect metrics for the Network
disconnect_metrics: DisconnectMetrics,
/// Disconnect metrics for the Network, split by connection direction.
disconnect_metrics: DirectionalDisconnectMetrics,
/// Closed sessions metrics, split by direction.
closed_sessions_metrics: ClosedSessionsMetrics,
/// Pending session failure metrics, split by direction.
pending_session_failure_metrics: PendingSessionFailureMetrics,
/// Backed off peers metrics, split by reason.
backed_off_peers_metrics: BackedOffPeersMetrics,
}
impl NetworkManager {
@@ -363,6 +365,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
disconnect_metrics: Default::default(),
closed_sessions_metrics: Default::default(),
pending_session_failure_metrics: Default::default(),
backed_off_peers_metrics: Default::default(),
})
}
@@ -861,6 +864,9 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
"Session disconnected"
);
// Capture direction before state is reset to Idle
let is_inbound = self.swarm.state().peers().is_inbound_peer(&peer_id);
let reason = if let Some(ref err) = error {
// If the connection was closed due to an error, we report
// the peer
@@ -869,17 +875,26 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
&peer_id,
err,
);
self.backed_off_peers_metrics.increment_for_reason(
BackoffReason::from_disconnect(err.as_disconnected()),
);
err.as_disconnected()
} else {
// Gracefully disconnected
self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
self.backed_off_peers_metrics
.increment_for_reason(BackoffReason::GracefulClose);
None
};
self.closed_sessions_metrics.active.increment(1);
self.update_active_connection_metrics();
if let Some(reason) = reason {
self.disconnect_metrics.increment(reason);
if is_inbound {
self.disconnect_metrics.increment_inbound(reason);
} else {
self.disconnect_metrics.increment_outbound(reason);
}
}
self.metrics
.backed_off_peers
@@ -902,7 +917,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
.on_incoming_pending_session_dropped(remote_addr, err);
self.pending_session_failure_metrics.inbound.increment(1);
if let Some(reason) = err.as_disconnected() {
self.disconnect_metrics.increment(reason);
self.disconnect_metrics.increment_inbound(reason);
}
} else {
self.swarm
@@ -914,9 +929,6 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
self.metrics
.incoming_connections
.set(self.swarm.state().peers().num_inbound_connections() as f64);
self.metrics
.backed_off_peers
.set(self.swarm.state().peers().num_backed_off_peers() as f64);
}
SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
trace!(
@@ -934,8 +946,11 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
err,
);
self.pending_session_failure_metrics.outbound.increment(1);
self.backed_off_peers_metrics.increment_for_reason(
BackoffReason::from_disconnect(err.as_disconnected()),
);
if let Some(reason) = err.as_disconnected() {
self.disconnect_metrics.increment(reason);
self.disconnect_metrics.increment_outbound(reason);
}
} else {
self.swarm
@@ -945,7 +960,6 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
}
self.closed_sessions_metrics.outgoing_pending.increment(1);
self.update_pending_connection_metrics();
self.metrics
.backed_off_peers
.set(self.swarm.state().peers().num_backed_off_peers() as f64);
@@ -965,6 +979,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
&error,
);
self.backed_off_peers_metrics.increment_for_reason(BackoffReason::ConnectionError);
self.metrics
.backed_off_peers
.set(self.swarm.state().peers().num_backed_off_peers() as f64);

View File

@@ -2,7 +2,7 @@ use metrics::Histogram;
use reth_eth_wire::DisconnectReason;
use reth_ethereum_primitives::TxType;
use reth_metrics::{
metrics::{self, Counter, Gauge},
metrics::{Counter, Gauge},
Metrics,
};
@@ -110,6 +110,29 @@ impl Default for PendingSessionFailureMetrics {
}
}
/// Metrics for backed off peers, split by reason.
#[derive(Metrics)]
#[metrics(scope = "network.backed_off_peers")]
pub struct BackedOffPeersMetrics {
/// Peers backed off because they reported too many peers.
pub too_many_peers: Counter,
/// Peers backed off after a graceful session close.
pub graceful_close: Counter,
/// Peers backed off due to connection or protocol errors.
pub connection_error: Counter,
}
impl BackedOffPeersMetrics {
/// Increments the counter for the given backoff reason.
pub fn increment_for_reason(&self, reason: crate::peers::BackoffReason) {
match reason {
crate::peers::BackoffReason::TooManyPeers => self.too_many_peers.increment(1),
crate::peers::BackoffReason::GracefulClose => self.graceful_close.increment(1),
crate::peers::BackoffReason::ConnectionError => self.connection_error.increment(1),
}
}
}
/// Metrics for `SessionManager`
#[derive(Metrics)]
#[metrics(scope = "network")]
@@ -278,6 +301,34 @@ macro_rules! duration_metered_exec {
}};
}
/// Direction-aware wrapper for disconnect metrics.
///
/// Tracks disconnect reasons for inbound and outbound connections separately, in addition to
/// the combined (legacy) counters.
#[derive(Debug, Default)]
pub(crate) struct DirectionalDisconnectMetrics {
/// Combined disconnect metrics (all directions).
pub(crate) total: DisconnectMetrics,
/// Disconnect metrics for inbound connections only.
pub(crate) inbound: InboundDisconnectMetrics,
/// Disconnect metrics for outbound connections only.
pub(crate) outbound: OutboundDisconnectMetrics,
}
impl DirectionalDisconnectMetrics {
/// Increments disconnect counters for an inbound connection.
pub(crate) fn increment_inbound(&self, reason: DisconnectReason) {
self.total.increment(reason);
self.inbound.increment(reason);
}
/// Increments disconnect counters for an outbound connection.
pub(crate) fn increment_outbound(&self, reason: DisconnectReason) {
self.total.increment(reason);
self.outbound.increment(reason);
}
}
/// Metrics for Disconnection types
///
/// These are just counters, and ideally we would implement these metrics on a peer-by-peer basis,
@@ -347,6 +398,144 @@ impl DisconnectMetrics {
}
}
/// Disconnect metrics scoped to inbound connections only.
///
/// These counters track disconnect reasons exclusively for sessions that were initiated by
/// remote peers connecting to this node. This helps operators distinguish between being rejected
/// by remote peers (outbound) vs rejecting incoming peers (inbound).
#[derive(Metrics)]
#[metrics(scope = "network.inbound")]
pub struct InboundDisconnectMetrics {
/// Number of inbound peer disconnects due to `DisconnectRequested` (0x00)
pub(crate) disconnect_requested: Counter,
/// Number of inbound peer disconnects due to `TcpSubsystemError` (0x01)
pub(crate) tcp_subsystem_error: Counter,
/// Number of inbound peer disconnects due to `ProtocolBreach` (0x02)
pub(crate) protocol_breach: Counter,
/// Number of inbound peer disconnects due to `UselessPeer` (0x03)
pub(crate) useless_peer: Counter,
/// Number of inbound peer disconnects due to `TooManyPeers` (0x04)
pub(crate) too_many_peers: Counter,
/// Number of inbound peer disconnects due to `AlreadyConnected` (0x05)
pub(crate) already_connected: Counter,
/// Number of inbound peer disconnects due to `IncompatibleP2PProtocolVersion` (0x06)
pub(crate) incompatible: Counter,
/// Number of inbound peer disconnects due to `NullNodeIdentity` (0x07)
pub(crate) null_node_identity: Counter,
/// Number of inbound peer disconnects due to `ClientQuitting` (0x08)
pub(crate) client_quitting: Counter,
/// Number of inbound peer disconnects due to `UnexpectedHandshakeIdentity` (0x09)
pub(crate) unexpected_identity: Counter,
/// Number of inbound peer disconnects due to `ConnectedToSelf` (0x0a)
pub(crate) connected_to_self: Counter,
/// Number of inbound peer disconnects due to `PingTimeout` (0x0b)
pub(crate) ping_timeout: Counter,
/// Number of inbound peer disconnects due to `SubprotocolSpecific` (0x10)
pub(crate) subprotocol_specific: Counter,
}
impl InboundDisconnectMetrics {
/// Increments the proper counter for the given disconnect reason
pub(crate) fn increment(&self, reason: DisconnectReason) {
match reason {
DisconnectReason::DisconnectRequested => self.disconnect_requested.increment(1),
DisconnectReason::TcpSubsystemError => self.tcp_subsystem_error.increment(1),
DisconnectReason::ProtocolBreach => self.protocol_breach.increment(1),
DisconnectReason::UselessPeer => self.useless_peer.increment(1),
DisconnectReason::TooManyPeers => self.too_many_peers.increment(1),
DisconnectReason::AlreadyConnected => self.already_connected.increment(1),
DisconnectReason::IncompatibleP2PProtocolVersion => self.incompatible.increment(1),
DisconnectReason::NullNodeIdentity => self.null_node_identity.increment(1),
DisconnectReason::ClientQuitting => self.client_quitting.increment(1),
DisconnectReason::UnexpectedHandshakeIdentity => self.unexpected_identity.increment(1),
DisconnectReason::ConnectedToSelf => self.connected_to_self.increment(1),
DisconnectReason::PingTimeout => self.ping_timeout.increment(1),
DisconnectReason::SubprotocolSpecific => self.subprotocol_specific.increment(1),
}
}
}
/// Disconnect metrics scoped to outbound connections only.
///
/// These counters track disconnect reasons exclusively for sessions that this node initiated
/// by dialing out to remote peers. A high `too_many_peers` count here indicates remote peers
/// are rejecting our connection attempts because they are full.
#[derive(Metrics)]
#[metrics(scope = "network.outbound")]
pub struct OutboundDisconnectMetrics {
/// Number of outbound peer disconnects due to `DisconnectRequested` (0x00)
pub(crate) disconnect_requested: Counter,
/// Number of outbound peer disconnects due to `TcpSubsystemError` (0x01)
pub(crate) tcp_subsystem_error: Counter,
/// Number of outbound peer disconnects due to `ProtocolBreach` (0x02)
pub(crate) protocol_breach: Counter,
/// Number of outbound peer disconnects due to `UselessPeer` (0x03)
pub(crate) useless_peer: Counter,
/// Number of outbound peer disconnects due to `TooManyPeers` (0x04)
pub(crate) too_many_peers: Counter,
/// Number of outbound peer disconnects due to `AlreadyConnected` (0x05)
pub(crate) already_connected: Counter,
/// Number of outbound peer disconnects due to `IncompatibleP2PProtocolVersion` (0x06)
pub(crate) incompatible: Counter,
/// Number of outbound peer disconnects due to `NullNodeIdentity` (0x07)
pub(crate) null_node_identity: Counter,
/// Number of outbound peer disconnects due to `ClientQuitting` (0x08)
pub(crate) client_quitting: Counter,
/// Number of outbound peer disconnects due to `UnexpectedHandshakeIdentity` (0x09)
pub(crate) unexpected_identity: Counter,
/// Number of outbound peer disconnects due to `ConnectedToSelf` (0x0a)
pub(crate) connected_to_self: Counter,
/// Number of outbound peer disconnects due to `PingTimeout` (0x0b)
pub(crate) ping_timeout: Counter,
/// Number of outbound peer disconnects due to `SubprotocolSpecific` (0x10)
pub(crate) subprotocol_specific: Counter,
}
impl OutboundDisconnectMetrics {
/// Increments the proper counter for the given disconnect reason
pub(crate) fn increment(&self, reason: DisconnectReason) {
match reason {
DisconnectReason::DisconnectRequested => self.disconnect_requested.increment(1),
DisconnectReason::TcpSubsystemError => self.tcp_subsystem_error.increment(1),
DisconnectReason::ProtocolBreach => self.protocol_breach.increment(1),
DisconnectReason::UselessPeer => self.useless_peer.increment(1),
DisconnectReason::TooManyPeers => self.too_many_peers.increment(1),
DisconnectReason::AlreadyConnected => self.already_connected.increment(1),
DisconnectReason::IncompatibleP2PProtocolVersion => self.incompatible.increment(1),
DisconnectReason::NullNodeIdentity => self.null_node_identity.increment(1),
DisconnectReason::ClientQuitting => self.client_quitting.increment(1),
DisconnectReason::UnexpectedHandshakeIdentity => self.unexpected_identity.increment(1),
DisconnectReason::ConnectedToSelf => self.connected_to_self.increment(1),
DisconnectReason::PingTimeout => self.ping_timeout.increment(1),
DisconnectReason::SubprotocolSpecific => self.subprotocol_specific.increment(1),
}
}
}
/// Metrics for the `EthRequestHandler`
#[derive(Metrics)]
#[metrics(scope = "network")]

View File

@@ -92,6 +92,9 @@ pub struct PeersManager {
incoming_ip_throttle_duration: Duration,
/// IP address filter for restricting network connections to specific IP ranges.
ip_filter: reth_net_banlist::IpFilter,
/// If true, discovered peers without a confirmed ENR fork ID will not be added until their
/// fork ID is verified via EIP-868.
enforce_enr_fork_id: bool,
}
impl PeersManager {
@@ -111,6 +114,7 @@ impl PeersManager {
max_backoff_count,
incoming_ip_throttle_duration,
ip_filter,
enforce_enr_fork_id,
} = config;
let (manager_tx, handle_rx) = mpsc::unbounded_channel();
let now = Instant::now();
@@ -167,6 +171,7 @@ impl PeersManager {
net_connection_state: NetworkConnectionState::default(),
incoming_ip_throttle_duration,
ip_filter,
enforce_enr_fork_id,
}
}
@@ -175,6 +180,11 @@ impl PeersManager {
PeersHandle::new(self.manager_tx.clone())
}
/// Returns `true` if discovered peers must have a confirmed ENR fork ID before being added.
pub(crate) const fn enforce_enr_fork_id(&self) -> bool {
self.enforce_enr_fork_id
}
/// Returns the number of peers in the peer set
#[inline]
pub(crate) fn num_known_peers(&self) -> usize {
@@ -208,6 +218,13 @@ impl PeersManager {
})
}
/// Returns `true` if the given peer is connected via an inbound session.
pub(crate) fn is_inbound_peer(&self, peer_id: &PeerId) -> bool {
self.peers.get(peer_id).is_some_and(|p| {
matches!(p.state, PeerConnectionState::In | PeerConnectionState::DisconnectingIn)
})
}
/// Returns an iterator over all peer ids for peers with the given kind
pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
@@ -738,17 +755,6 @@ impl PeersManager {
}
}
/// Called as follow-up for a discovered peer.
///
/// The [`ForkId`] is retrieved from an ENR record that the peer announces over the discovery
/// protocol
pub(crate) fn set_discovered_fork_id(&mut self, peer_id: PeerId, fork_id: ForkId) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
trace!(target: "net::peers", ?peer_id, ?fork_id, "set discovered fork id");
peer.fork_id = Some(Box::new(fork_id));
}
}
/// Called for a newly discovered peer.
///
/// If the peer already exists, then the address, kind and `fork_id` will be updated.
@@ -1260,6 +1266,27 @@ impl Display for InboundConnectionError {
}
}
/// The reason a peer was backed off.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackoffReason {
/// The remote peer responded with `TooManyPeers` (0x04).
TooManyPeers,
/// The session was gracefully closed and we're backing off briefly.
GracefulClose,
/// A connection or protocol-level error occurred.
ConnectionError,
}
impl BackoffReason {
/// Derives the backoff reason from an optional [`DisconnectReason`].
pub const fn from_disconnect(reason: Option<DisconnectReason>) -> Self {
match reason {
Some(DisconnectReason::TooManyPeers) => Self::TooManyPeers,
_ => Self::ConnectionError,
}
}
}
#[cfg(test)]
mod tests {
use alloy_primitives::B512;

View File

@@ -332,9 +332,19 @@ impl<N: NetworkPrimitives> NetworkState<N> {
fork_id,
});
}
DiscoveryEvent::EnrForkId(peer_id, fork_id) => {
self.queued_messages
.push_back(StateAction::DiscoveredEnrForkId { peer_id, fork_id });
DiscoveryEvent::EnrForkId(record, fork_id) => {
let peer_id = record.id;
let tcp_addr = record.tcp_addr();
if tcp_addr.port() == 0 {
return
}
let udp_addr = record.udp_addr();
let addr = PeerAddr::new(tcp_addr, Some(udp_addr));
self.queued_messages.push_back(StateAction::DiscoveredEnrForkId {
peer_id,
addr,
fork_id,
});
}
}
}
@@ -552,6 +562,8 @@ pub(crate) enum StateAction<N: NetworkPrimitives> {
/// Retrieved a [`ForkId`] from the peer via ENR request, See <https://eips.ethereum.org/EIPS/eip-868>
DiscoveredEnrForkId {
peer_id: PeerId,
/// The address of the peer.
addr: PeerAddr,
/// The reported [`ForkId`] by this peer.
fork_id: ForkId,
},

View File

@@ -246,18 +246,28 @@ impl<N: NetworkPrimitives> Swarm<N> {
StateAction::PeerAdded(peer_id) => return Some(SwarmEvent::PeerAdded(peer_id)),
StateAction::PeerRemoved(peer_id) => return Some(SwarmEvent::PeerRemoved(peer_id)),
StateAction::DiscoveredNode { peer_id, addr, fork_id } => {
// Don't try to connect to peer if node is shutting down
if self.is_shutting_down() {
return None
}
// Insert peer only if no fork id or a valid fork id
if fork_id.map_or_else(|| true, |f| self.sessions.is_valid_fork_id(f)) {
// When `enforce_enr_fork_id` is enabled, peers discovered without a confirmed
// fork ID (via EIP-868 ENR) are deferred — they'll only be added once a
// `DiscoveredEnrForkId` event arrives with a validated fork ID.
//
// When disabled (default), peers without a fork ID are admitted immediately.
// Peers that *do* carry a fork ID are always validated against ours.
let enforce = self.state().peers().enforce_enr_fork_id();
let allow = match fork_id {
Some(f) => self.sessions.is_valid_fork_id(f),
None => !enforce,
};
if allow {
self.state_mut().peers_mut().add_peer(peer_id, addr, fork_id);
}
}
StateAction::DiscoveredEnrForkId { peer_id, fork_id } => {
StateAction::DiscoveredEnrForkId { peer_id, addr, fork_id } => {
if self.sessions.is_valid_fork_id(fork_id) {
self.state_mut().peers_mut().set_discovered_fork_id(peer_id, fork_id);
self.state_mut().peers_mut().add_peer(peer_id, addr, Some(fork_id));
} else {
trace!(target: "net", ?peer_id, remote_fork_id=?fork_id, our_fork_id=?self.sessions.fork_id(), "fork id mismatch, removing peer");
self.state_mut().peers_mut().remove_peer(peer_id);

View File

@@ -232,7 +232,7 @@ impl LaunchContext {
.map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
if let Err(err) = ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(|i| format!("rayon-{i}"))
.thread_name(|i| format!("rayon-{i:02}"))
.build_global()
{
warn!(%err, "Failed to build global thread pool")

View File

@@ -303,8 +303,6 @@ impl EngineNodeLauncher {
// the CL
loop {
tokio::select! {
biased;
event = engine_service.next() => {
let Some(event) = event else { break };
debug!(target: "reth::cli", "Event: {event}");

View File

@@ -43,6 +43,7 @@ pub struct DefaultEngineValues {
disable_trie_cache: bool,
sparse_trie_prune_depth: usize,
sparse_trie_max_storage_tries: usize,
disable_sparse_trie_cache_pruning: bool,
state_root_task_timeout: Option<String>,
}
@@ -198,6 +199,12 @@ impl DefaultEngineValues {
self
}
/// Set whether to disable sparse trie cache pruning by default
pub const fn with_disable_sparse_trie_cache_pruning(mut self, v: bool) -> Self {
self.disable_sparse_trie_cache_pruning = v;
self
}
/// Set the default state root task timeout
pub fn with_state_root_task_timeout(mut self, v: Option<String>) -> Self {
self.state_root_task_timeout = v;
@@ -231,6 +238,7 @@ impl Default for DefaultEngineValues {
disable_trie_cache: false,
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
disable_sparse_trie_cache_pruning: false,
state_root_task_timeout: Some("1s".to_string()),
}
}
@@ -372,6 +380,12 @@ pub struct EngineArgs {
#[arg(long = "engine.sparse-trie-max-storage-tries", default_value_t = DefaultEngineValues::get_global().sparse_trie_max_storage_tries)]
pub sparse_trie_max_storage_tries: usize,
/// Fully disable sparse trie cache pruning. When set, the cached sparse trie is preserved
/// without any node pruning or storage trie eviction between blocks. Useful for benchmarking
/// the effects of retaining the full trie cache.
#[arg(long = "engine.disable-sparse-trie-cache-pruning", default_value_t = DefaultEngineValues::get_global().disable_sparse_trie_cache_pruning)]
pub disable_sparse_trie_cache_pruning: bool,
/// Configure the timeout for the state root task before spawning a sequential fallback.
/// If the state root task takes longer than this, a sequential computation starts in
/// parallel and whichever finishes first is used.
@@ -415,6 +429,7 @@ impl Default for EngineArgs {
disable_trie_cache,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
disable_sparse_trie_cache_pruning,
state_root_task_timeout,
} = DefaultEngineValues::get_global().clone();
Self {
@@ -445,6 +460,7 @@ impl Default for EngineArgs {
disable_trie_cache,
sparse_trie_prune_depth,
sparse_trie_max_storage_tries,
disable_sparse_trie_cache_pruning,
state_root_task_timeout: state_root_task_timeout
.as_deref()
.map(|s| humantime::parse_duration(s).expect("valid default duration")),
@@ -480,6 +496,7 @@ impl EngineArgs {
.with_disable_trie_cache(self.disable_trie_cache)
.with_sparse_trie_prune_depth(self.sparse_trie_prune_depth)
.with_sparse_trie_max_storage_tries(self.sparse_trie_max_storage_tries)
.with_disable_sparse_trie_cache_pruning(self.disable_sparse_trie_cache_pruning)
.with_state_root_task_timeout(self.state_root_task_timeout.filter(|d| !d.is_zero()))
}
}
@@ -534,6 +551,7 @@ mod tests {
disable_trie_cache: true,
sparse_trie_prune_depth: 10,
sparse_trie_max_storage_tries: 100,
disable_sparse_trie_cache_pruning: true,
state_root_task_timeout: Some(Duration::from_secs(2)),
};
@@ -570,6 +588,7 @@ mod tests {
"10",
"--engine.sparse-trie-max-storage-tries",
"100",
"--engine.disable-sparse-trie-cache-pruning",
"--engine.state-root-task-timeout",
"2s",
])

View File

@@ -76,11 +76,7 @@ pub use era::{DefaultEraHost, EraArgs, EraSourceArgs};
mod static_files;
pub use static_files::{StaticFilesArgs, MINIMAL_BLOCKS_PER_FILE};
/// `RocksDbArgs` for configuring RocksDB table routing.
mod rocksdb;
pub use rocksdb::{RocksDbArgs, RocksDbArgsError};
/// `StorageArgs` for configuring storage mode (v2 vs v1/legacy).
/// `StorageArgs` for configuring storage settings.
mod storage;
pub use storage::StorageArgs;

View File

@@ -227,6 +227,14 @@ pub struct NetworkArgs {
/// Example: --netrestrict "192.168.0.0/16,10.0.0.0/8"
#[arg(long, value_name = "NETRESTRICT")]
pub netrestrict: Option<String>,
/// Enforce EIP-868 ENR fork ID validation for discovered peers.
///
/// When enabled, peers discovered without a confirmed fork ID are not added to the peer set
/// until their fork ID is verified via EIP-868 ENR request. This filters out peers from other
/// networks that pollute the discovery table.
#[arg(long)]
pub enforce_enr_fork_id: bool,
}
impl NetworkArgs {
@@ -333,7 +341,8 @@ impl NetworkArgs {
)
.with_max_inbound_opt(self.resolved_max_inbound_peers())
.with_max_outbound_opt(self.resolved_max_outbound_peers())
.with_ip_filter(ip_filter);
.with_ip_filter(ip_filter)
.with_enforce_enr_fork_id(self.enforce_enr_fork_id);
// Configure basic network stack
NetworkConfigBuilder::<N>::new(secret_key)
@@ -491,6 +500,7 @@ impl Default for NetworkArgs {
required_block_hashes: vec![],
network_id: None,
netrestrict: None,
enforce_enr_fork_id: false,
}
}
}

View File

@@ -1,160 +0,0 @@
//! clap [Args](clap::Args) for `RocksDB` table routing configuration
use clap::{ArgAction, Args};
/// Parameters for `RocksDB` table routing configuration.
///
/// These flags control which database tables are stored in `RocksDB` instead of MDBX.
/// All flags are genesis-initialization-only: changing them after genesis requires a re-sync.
///
/// When `--storage.v2` is used, the defaults for these flags change to enable `RocksDB` routing.
/// Individual flags can still override those defaults when explicitly set.
#[derive(Debug, Args, PartialEq, Eq, Clone, Copy, Default)]
#[command(next_help_heading = "RocksDB")]
pub struct RocksDbArgs {
/// Route all supported tables to `RocksDB` instead of MDBX.
///
/// This enables `RocksDB` for `tx-hash`, `storages-history`, and `account-history` tables.
/// Cannot be combined with individual flags set to false.
#[arg(long = "rocksdb.all", action = ArgAction::SetTrue)]
pub all: bool,
/// Route tx hash -> number table to `RocksDB` instead of MDBX.
///
/// This is a genesis-initialization-only flag: changing it after genesis requires a re-sync.
/// Defaults to the base storage mode (v1: false, v2: true).
#[arg(long = "rocksdb.tx-hash", action = ArgAction::Set)]
pub tx_hash: Option<bool>,
/// Route storages history tables to `RocksDB` instead of MDBX.
///
/// This is a genesis-initialization-only flag: changing it after genesis requires a re-sync.
/// Defaults to the base storage mode (v1: false, v2: true).
#[arg(long = "rocksdb.storages-history", action = ArgAction::Set)]
pub storages_history: Option<bool>,
/// Route account history tables to `RocksDB` instead of MDBX.
///
/// This is a genesis-initialization-only flag: changing it after genesis requires a re-sync.
/// Defaults to the base storage mode (v1: false, v2: true).
#[arg(long = "rocksdb.account-history", action = ArgAction::Set)]
pub account_history: Option<bool>,
}
impl RocksDbArgs {
/// Validates the `RocksDB` arguments.
///
/// Returns an error if `--rocksdb.all` is used with any individual flag explicitly set to
/// `false`.
pub const fn validate(&self) -> Result<(), RocksDbArgsError> {
if self.all {
if matches!(self.tx_hash, Some(false)) {
return Err(RocksDbArgsError::ConflictingFlags("tx-hash"));
}
if matches!(self.storages_history, Some(false)) {
return Err(RocksDbArgsError::ConflictingFlags("storages-history"));
}
if matches!(self.account_history, Some(false)) {
return Err(RocksDbArgsError::ConflictingFlags("account-history"));
}
}
Ok(())
}
}
/// Error type for `RocksDB` argument validation.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum RocksDbArgsError {
/// `--rocksdb.all` cannot be combined with an individual flag set to false.
#[error("--rocksdb.all cannot be combined with --rocksdb.{0}=false")]
ConflictingFlags(&'static str),
}
#[cfg(test)]
mod tests {
use super::*;
use clap::Parser;
#[derive(Parser)]
struct CommandParser<T: Args> {
#[command(flatten)]
args: T,
}
#[test]
fn test_default_rocksdb_args() {
let args = CommandParser::<RocksDbArgs>::parse_from(["reth"]).args;
assert_eq!(args, RocksDbArgs::default());
assert!(!args.all);
assert!(args.tx_hash.is_none());
assert!(args.storages_history.is_none());
assert!(args.account_history.is_none());
}
#[test]
fn test_parse_all_flag() {
let args = CommandParser::<RocksDbArgs>::parse_from(["reth", "--rocksdb.all"]).args;
assert!(args.all);
assert!(args.tx_hash.is_none());
}
#[test]
fn test_parse_individual_flags() {
let args = CommandParser::<RocksDbArgs>::parse_from([
"reth",
"--rocksdb.tx-hash=true",
"--rocksdb.storages-history=false",
"--rocksdb.account-history=true",
])
.args;
assert!(!args.all);
assert_eq!(args.tx_hash, Some(true));
assert_eq!(args.storages_history, Some(false));
assert_eq!(args.account_history, Some(true));
}
#[test]
fn test_validate_all_with_none_ok() {
let args =
RocksDbArgs { all: true, tx_hash: None, storages_history: None, account_history: None };
assert!(args.validate().is_ok());
}
#[test]
fn test_validate_all_with_true_ok() {
let args = RocksDbArgs {
all: true,
tx_hash: Some(true),
storages_history: Some(true),
account_history: Some(true),
};
assert!(args.validate().is_ok());
}
#[test]
fn test_validate_all_with_false_errors() {
let args = RocksDbArgs {
all: true,
tx_hash: Some(false),
storages_history: None,
account_history: None,
};
assert_eq!(args.validate(), Err(RocksDbArgsError::ConflictingFlags("tx-hash")));
let args = RocksDbArgs {
all: true,
tx_hash: None,
storages_history: Some(false),
account_history: None,
};
assert_eq!(args.validate(), Err(RocksDbArgsError::ConflictingFlags("storages-history")));
let args = RocksDbArgs {
all: true,
tx_hash: None,
storages_history: None,
account_history: Some(false),
};
assert_eq!(args.validate(), Err(RocksDbArgsError::ConflictingFlags("account-history")));
}
}

View File

@@ -9,9 +9,6 @@ use reth_config::config::{BlocksPerFileConfig, StaticFilesConfig};
pub const MINIMAL_BLOCKS_PER_FILE: u64 = 10000;
/// Parameters for static files configuration
///
/// When `--storage.v2` is used, the defaults for the storage flags change to enable static file
/// storage. Individual flags can still override those defaults when explicitly set.
#[derive(Debug, Args, PartialEq, Eq, Clone, Copy, Default)]
#[command(next_help_heading = "Static Files")]
pub struct StaticFilesArgs {
@@ -38,53 +35,6 @@ pub struct StaticFilesArgs {
/// Number of blocks per file for the storage changesets segment.
#[arg(long = "static-files.blocks-per-file.storage-change-sets")]
pub blocks_per_file_storage_change_sets: Option<u64>,
/// Store receipts in static files instead of the database.
///
/// When enabled, receipts will be written to static files on disk instead of the database.
///
/// Note: This setting can only be configured at genesis initialization. Once
/// the node has been initialized, changing this flag requires re-syncing from scratch.
///
/// Defaults to the base storage mode (v1: false, v2: true).
#[arg(long = "static-files.receipts", action = clap::ArgAction::Set)]
pub receipts: Option<bool>,
/// Store transaction senders in static files instead of the database.
///
/// When enabled, transaction senders will be written to static files on disk instead of the
/// database.
///
/// Note: This setting can only be configured at genesis initialization. Once
/// the node has been initialized, changing this flag requires re-syncing from scratch.
///
/// Defaults to the base storage mode (v1: false, v2: true).
#[arg(long = "static-files.transaction-senders", action = clap::ArgAction::Set)]
pub transaction_senders: Option<bool>,
/// Store account changesets in static files.
///
/// When enabled, account changesets will be written to static files on disk instead of the
/// database.
///
/// Note: This setting can only be configured at genesis initialization. Once
/// the node has been initialized, changing this flag requires re-syncing from scratch.
///
/// Defaults to the base storage mode (v1: false, v2: true).
#[arg(long = "static-files.account-change-sets", action = clap::ArgAction::Set)]
pub account_changesets: Option<bool>,
/// Store storage changesets in static files.
///
/// When enabled, storage changesets will be written to static files on disk instead of the
/// database.
///
/// Note: This setting can only be configured at genesis initialization. Once
/// the node has been initialized, changing this flag requires re-syncing from scratch.
///
/// Defaults to the base storage mode (v1: false, v2: true).
#[arg(long = "static-files.storage-change-sets", action = clap::ArgAction::Set)]
pub storage_changesets: Option<bool>,
}
impl StaticFilesArgs {

View File

@@ -1,11 +1,13 @@
//! clap [Args](clap::Args) for storage mode configuration
//! clap [Args](clap::Args) for storage configuration
use clap::{ArgAction, Args};
/// Parameters for storage mode configuration.
/// Parameters for storage configuration.
///
/// This controls whether the node uses v2 storage defaults (with `RocksDB` and static file
/// optimizations) or v1/legacy storage defaults.
///
/// Individual storage settings can be overridden with `--static-files.*` and `--rocksdb.*` flags.
#[derive(Debug, Args, PartialEq, Eq, Clone, Copy, Default)]
#[command(next_help_heading = "Storage")]
pub struct StorageArgs {
@@ -23,16 +25,6 @@ pub struct StorageArgs {
/// flags.
#[arg(long = "storage.v2", action = ArgAction::SetTrue)]
pub v2: bool,
/// Use hashed state tables (`HashedAccounts`/`HashedStorages`) as canonical state
/// representation instead of plain state tables.
///
/// When enabled, execution writes directly to hashed tables, eliminating the need for
/// separate hashing stages. This should only be enabled for new databases.
///
/// WARNING: Changing this setting on an existing database requires a full resync.
#[arg(long = "storage.use-hashed-state", default_value_t = false)]
pub use_hashed_state: bool,
}
#[cfg(test)]
@@ -40,21 +32,24 @@ mod tests {
use super::*;
use clap::Parser;
/// A helper type to parse Args more easily
#[derive(Parser)]
struct CommandParser {
struct CommandParser<T: Args> {
#[command(flatten)]
args: StorageArgs,
args: T,
}
#[test]
fn test_default_storage_args() {
let args = CommandParser::parse_from(["reth"]).args;
let default_args = StorageArgs::default();
let args = CommandParser::<StorageArgs>::parse_from(["reth"]).args;
assert_eq!(args, default_args);
assert!(!args.v2);
}
#[test]
fn test_parse_v2_flag() {
let args = CommandParser::parse_from(["reth", "--storage.v2"]).args;
let args = CommandParser::<StorageArgs>::parse_from(["reth", "--storage.v2"]).args;
assert!(args.v2);
}
}

View File

@@ -3,7 +3,7 @@
use crate::{
args::{
DatabaseArgs, DatadirArgs, DebugArgs, DevArgs, EngineArgs, NetworkArgs, PayloadBuilderArgs,
PruningArgs, RocksDbArgs, RpcServerArgs, StaticFilesArgs, StorageArgs, TxPoolArgs,
PruningArgs, RpcServerArgs, StaticFilesArgs, StorageArgs, TxPoolArgs,
},
dirs::{ChainPath, DataDirPath},
utils::get_single_header,
@@ -152,10 +152,7 @@ pub struct NodeConfig<ChainSpec> {
/// All static files related arguments
pub static_files: StaticFilesArgs,
/// All `RocksDB` table routing arguments
pub rocksdb: RocksDbArgs,
/// Storage mode configuration (v2 vs v1/legacy)
/// All storage related arguments with --storage prefix
pub storage: StorageArgs,
}
@@ -188,7 +185,6 @@ impl<ChainSpec> NodeConfig<ChainSpec> {
engine: EngineArgs::default(),
era: EraArgs::default(),
static_files: StaticFilesArgs::default(),
rocksdb: RocksDbArgs::default(),
storage: StorageArgs::default(),
}
}
@@ -264,7 +260,6 @@ impl<ChainSpec> NodeConfig<ChainSpec> {
engine,
era,
static_files,
rocksdb,
storage,
..
} = self;
@@ -285,7 +280,6 @@ impl<ChainSpec> NodeConfig<ChainSpec> {
engine,
era,
static_files,
rocksdb,
storage,
}
}
@@ -355,6 +349,12 @@ impl<ChainSpec> NodeConfig<ChainSpec> {
self
}
/// Set the storage args for the node
pub const fn with_storage(mut self, storage: StorageArgs) -> Self {
self.storage = storage;
self
}
/// Returns pruning configuration.
pub fn prune_config(&self) -> Option<PruneConfig>
where
@@ -363,42 +363,17 @@ impl<ChainSpec> NodeConfig<ChainSpec> {
self.pruning.prune_config(&self.chain)
}
/// Returns the effective storage settings derived from `--storage.v2`, static-file, and
/// `RocksDB` CLI args.
/// Returns the effective storage settings derived from `--storage.v2`.
///
/// The base storage mode is determined by `--storage.v2`:
/// - When `--storage.v2` is set: uses [`StorageSettings::v2()`] defaults
/// - Otherwise: uses [`StorageSettings::v1()`] defaults
///
/// Individual `--static-files.*` and `--rocksdb.*` flags override the base when explicitly set.
/// - Otherwise: uses [`StorageSettings::base()`] defaults
pub const fn storage_settings(&self) -> StorageSettings {
let mut s = if self.storage.v2 { StorageSettings::v2() } else { StorageSettings::base() };
// Apply static files overrides (only when explicitly set)
s = s
.with_receipts_in_static_files_opt(self.static_files.receipts)
.with_transaction_senders_in_static_files_opt(self.static_files.transaction_senders)
.with_account_changesets_in_static_files_opt(self.static_files.account_changesets)
.with_storage_changesets_in_static_files_opt(self.static_files.storage_changesets);
// Apply rocksdb overrides
// --rocksdb.all sets all rocksdb flags to true
if self.rocksdb.all {
s = s
.with_transaction_hash_numbers_in_rocksdb(true)
.with_storages_history_in_rocksdb(true)
.with_account_history_in_rocksdb(true);
if self.storage.v2 {
StorageSettings::v2()
} else {
StorageSettings::base()
}
// Individual rocksdb flags override --rocksdb.all when explicitly set
s = s
.with_transaction_hash_numbers_in_rocksdb_opt(self.rocksdb.tx_hash)
.with_storages_history_in_rocksdb_opt(self.rocksdb.storages_history)
.with_account_history_in_rocksdb_opt(self.rocksdb.account_history);
s = s.with_use_hashed_state(self.storage.use_hashed_state);
s
}
/// Returns the max block that the node should run to, looking it up from the network if
@@ -595,7 +570,6 @@ impl<ChainSpec> NodeConfig<ChainSpec> {
engine: self.engine,
era: self.era,
static_files: self.static_files,
rocksdb: self.rocksdb,
storage: self.storage,
}
}
@@ -638,7 +612,6 @@ impl<ChainSpec> Clone for NodeConfig<ChainSpec> {
engine: self.engine.clone(),
era: self.era.clone(),
static_files: self.static_files,
rocksdb: self.rocksdb,
storage: self.storage,
}
}

View File

@@ -265,7 +265,7 @@ impl NodeState {
warn!(number=block.number(), hash=?block.hash(), "Encountered invalid block");
}
ConsensusEngineEvent::BlockReceived(num_hash) => {
info!(number=num_hash.number, hash=?num_hash.hash, "Received block from consensus engine");
info!(number=num_hash.number, hash=?num_hash.hash, "Received new payload from consensus engine");
}
}
}

View File

@@ -164,7 +164,7 @@ pub use alloy_primitives::{logs_bloom, Log, LogData};
pub mod proofs;
mod storage;
pub use storage::{StorageEntry, ValueWithSubKey};
pub use storage::{StorageEntry, StorageSlotKey, ValueWithSubKey};
pub mod sync;

View File

@@ -1,4 +1,4 @@
use alloy_primitives::{B256, U256};
use alloy_primitives::{keccak256, B256, U256};
/// Trait for `DupSort` table values that contain a subkey.
///
@@ -12,6 +12,117 @@ pub trait ValueWithSubKey {
fn get_subkey(&self) -> Self::SubKey;
}
/// A storage slot key that tracks whether it holds a plain (unhashed) EVM slot
/// or a keccak256-hashed slot.
///
/// This enum replaces the `use_hashed_state: bool` parameter pattern by carrying
/// provenance with the key itself. Once tagged at a read/write boundary, downstream
/// code can call [`Self::to_hashed`] without risk of double-hashing — hashing a
/// [`StorageSlotKey::Hashed`] is a no-op.
///
/// The on-disk encoding is unchanged (raw 32-byte [`B256`]). The variant is set
/// by the code that knows the context (which table, which storage mode).
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum StorageSlotKey {
/// An unhashed EVM storage slot, as produced by REVM execution.
Plain(B256),
/// A keccak256-hashed storage slot, as stored in `HashedStorages` and
/// in v2-mode `StorageChangeSets`.
Hashed(B256),
}
impl Default for StorageSlotKey {
fn default() -> Self {
Self::Plain(B256::ZERO)
}
}
impl StorageSlotKey {
/// Create a plain slot key from a REVM [`U256`] storage index.
pub const fn from_u256(slot: U256) -> Self {
Self::Plain(B256::new(slot.to_be_bytes()))
}
/// Create a plain slot key from a raw [`B256`].
pub const fn plain(key: B256) -> Self {
Self::Plain(key)
}
/// Create a hashed slot key from a raw [`B256`].
pub const fn hashed(key: B256) -> Self {
Self::Hashed(key)
}
/// Tag a raw [`B256`] based on the storage mode.
///
/// When `use_hashed_state` is true the key is assumed already hashed.
/// When false it is assumed to be a plain slot.
pub const fn from_raw(key: B256, use_hashed_state: bool) -> Self {
if use_hashed_state {
Self::Hashed(key)
} else {
Self::Plain(key)
}
}
/// Returns the raw [`B256`] regardless of variant.
pub const fn as_b256(&self) -> B256 {
match *self {
Self::Plain(b) | Self::Hashed(b) => b,
}
}
/// Returns `true` if this key is already hashed.
pub const fn is_hashed(&self) -> bool {
matches!(self, Self::Hashed(_))
}
/// Returns `true` if this key is plain (unhashed).
pub const fn is_plain(&self) -> bool {
matches!(self, Self::Plain(_))
}
/// Produce the keccak256-hashed form of this slot key.
///
/// - If already [`Hashed`](Self::Hashed), returns the inner value as-is (no double-hash).
/// - If [`Plain`](Self::Plain), applies keccak256 and returns the result.
pub fn to_hashed(&self) -> B256 {
match *self {
Self::Hashed(b) => b,
Self::Plain(b) => keccak256(b),
}
}
/// Convert a plain slot to its changeset representation.
///
/// In v2 mode (`use_hashed_state = true`), the changeset stores hashed keys,
/// so the plain key is hashed. In v1 mode, the plain key is stored as-is.
///
/// Panics (debug) if called on an already-hashed key.
pub fn to_changeset_key(self, use_hashed_state: bool) -> B256 {
debug_assert!(self.is_plain(), "to_changeset_key called on already-hashed key");
if use_hashed_state {
self.to_hashed()
} else {
self.as_b256()
}
}
/// Like [`to_changeset_key`](Self::to_changeset_key) but returns a tagged
/// [`StorageSlotKey`] instead of a raw [`B256`].
///
/// Panics (debug) if called on an already-hashed key.
pub fn to_changeset(self, use_hashed_state: bool) -> Self {
Self::from_raw(self.to_changeset_key(use_hashed_state), use_hashed_state)
}
}
impl From<StorageSlotKey> for B256 {
fn from(key: StorageSlotKey) -> Self {
key.as_b256()
}
}
/// Account storage entry.
///
/// `key` is the subkey when used as a value in the `StorageChangeSets` table.
@@ -31,6 +142,14 @@ impl StorageEntry {
pub const fn new(key: B256, value: U256) -> Self {
Self { key, value }
}
/// Tag this entry's key as a [`StorageSlotKey`] based on the storage mode.
///
/// When `use_hashed_state` is true, the key is tagged as already-hashed.
/// When false, it is tagged as plain.
pub const fn slot_key(&self, use_hashed_state: bool) -> StorageSlotKey {
StorageSlotKey::from_raw(self.key, use_hashed_state)
}
}
impl ValueWithSubKey for StorageEntry {

View File

@@ -20,27 +20,28 @@ reth-static-file-types.workspace = true
# ethereum
alloy-consensus.workspace = true
alloy-primitives = { workspace = true, optional = true }
alloy-rlp = { workspace = true, optional = true }
alloy-eips = { workspace = true, optional = true }
alloy-genesis = { workspace = true, optional = true }
# for eip-4844
c-kzg = { workspace = true, features = ["serde"], optional = true }
# misc
once_cell.workspace = true
reth-codecs = { workspace = true, optional = true }
[dev-dependencies]
# eth
reth-primitives-traits = { workspace = true, features = ["arbitrary", "test-utils"] }
alloy-primitives.workspace = true
alloy-rlp.workspace = true
alloy-eips = { workspace = true, features = ["arbitrary"] }
alloy-genesis.workspace = true
arbitrary = { workspace = true, features = ["derive"] }
proptest-arbitrary-interop.workspace = true
proptest.workspace = true
reth-codecs.workspace = true
criterion.workspace = true

View File

@@ -18,6 +18,18 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(not(feature = "std"), no_std)]
// These are used as optional dependencies solely for feature forwarding.
#[cfg(feature = "alloy-eips")]
use alloy_eips as _;
#[cfg(feature = "alloy-genesis")]
use alloy_genesis as _;
#[cfg(feature = "alloy-primitives")]
use alloy_primitives as _;
#[cfg(feature = "alloy-rlp")]
use alloy_rlp as _;
#[cfg(feature = "reth-codecs")]
use reth_codecs as _;
mod block;
mod receipt;
pub use reth_static_file_types as static_file;

View File

@@ -75,7 +75,7 @@ where
// Check where account history indices are stored
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().account_history_in_rocksdb {
if provider.cached_storage_settings().storage_v2 {
return self.prune_rocksdb(provider, input, range, range_end);
}
@@ -405,9 +405,7 @@ mod tests {
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_account_changesets_in_static_files(false),
);
provider.set_storage_settings_cache(StorageSettings::v1());
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
@@ -508,7 +506,11 @@ mod tests {
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
/// Tests the `prune_static_files` code path. On unix with rocksdb feature, v2 storage
/// routes to `prune_rocksdb` instead, so this test only runs without rocksdb (the
/// `prune_rocksdb_path` test covers that configuration).
#[test]
#[cfg(not(all(unix, feature = "rocksdb")))]
fn prune_static_file() {
let db = TestStageDB::default();
let mut rng = generators::rng();
@@ -564,9 +566,7 @@ mod tests {
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_account_changesets_in_static_files(true),
);
provider.set_storage_settings_cache(StorageSettings::v2());
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
@@ -714,11 +714,7 @@ mod tests {
PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
let segment = AccountHistory::new(prune_mode);
db.factory.set_storage_settings_cache(
StorageSettings::default()
.with_account_changesets_in_static_files(true)
.with_account_history_in_rocksdb(true),
);
db.factory.set_storage_settings_cache(StorageSettings::v2());
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
@@ -832,9 +828,7 @@ mod tests {
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_account_changesets_in_static_files(false),
);
provider.set_storage_settings_cache(StorageSettings::v1());
let result = segment.prune(&provider, input).unwrap();
// Should report that there's more data
@@ -892,9 +886,7 @@ mod tests {
};
let provider2 = db.factory.database_provider_rw().unwrap();
provider2.set_storage_settings_cache(
StorageSettings::default().with_account_changesets_in_static_files(false),
);
provider2.set_storage_settings_cache(StorageSettings::v1());
let result2 = segment.prune(&provider2, input2).unwrap();
assert!(result2.progress.is_finished(), "Second run should complete");

View File

@@ -76,7 +76,7 @@ where
// Check where storage history indices are stored
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storages_history_in_rocksdb {
if provider.cached_storage_settings().storage_v2 {
return self.prune_rocksdb(provider, input, range, range_end);
}
@@ -135,7 +135,7 @@ impl StorageHistory {
let (block_address, entry) = result?;
let block_number = block_address.block_number();
let address = block_address.address();
highest_deleted_storages.insert((address, entry.key), block_number);
highest_deleted_storages.insert((address, entry.key.as_b256()), block_number);
last_changeset_pruned_block = Some(block_number);
pruned_changesets += 1;
limiter.increment_deleted_entries_count();
@@ -273,7 +273,7 @@ impl StorageHistory {
let (block_address, entry) = result?;
let block_number = block_address.block_number();
let address = block_address.address();
highest_deleted_storages.insert((address, entry.key), block_number);
highest_deleted_storages.insert((address, entry.key.as_b256()), block_number);
last_changeset_pruned_block = Some(block_number);
changesets_processed += 1;
limiter.increment_deleted_entries_count();
@@ -413,9 +413,7 @@ mod tests {
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_storage_changesets_in_static_files(false),
);
provider.set_storage_settings_cache(StorageSettings::v1());
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
@@ -520,7 +518,11 @@ mod tests {
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
/// Tests the `prune_static_files` code path. On unix with rocksdb feature, v2 storage
/// routes to `prune_rocksdb` instead, so this test only runs without rocksdb (the
/// `prune_rocksdb_path` test covers that configuration).
#[test]
#[cfg(not(all(unix, feature = "rocksdb")))]
fn prune_static_file() {
let db = TestStageDB::default();
let mut rng = generators::rng();
@@ -577,9 +579,7 @@ mod tests {
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_storage_changesets_in_static_files(true),
);
provider.set_storage_settings_cache(StorageSettings::v2());
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
@@ -739,9 +739,7 @@ mod tests {
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_storage_changesets_in_static_files(false),
);
provider.set_storage_settings_cache(StorageSettings::v1());
let result = segment.prune(&provider, input).unwrap();
// Should report that there's more data
@@ -793,9 +791,7 @@ mod tests {
};
let provider2 = db.factory.database_provider_rw().unwrap();
provider2.set_storage_settings_cache(
StorageSettings::default().with_storage_changesets_in_static_files(false),
);
provider2.set_storage_settings_cache(StorageSettings::v1());
let result2 = segment.prune(&provider2, input2).unwrap();
assert!(result2.progress.is_finished(), "Second run should complete");
@@ -895,11 +891,7 @@ mod tests {
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default()
.with_storage_changesets_in_static_files(true)
.with_storages_history_in_rocksdb(true),
);
provider.set_storage_settings_cache(StorageSettings::v2());
let result = segment.prune(&provider, input).unwrap();
provider.commit().expect("commit");

View File

@@ -96,7 +96,7 @@ where
// Check where transaction hash numbers are stored
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
if provider.cached_storage_settings().storage_v2 {
return self.prune_rocksdb(provider, input, start, end);
}
@@ -491,9 +491,7 @@ mod tests {
let segment = TransactionLookup::new(prune_mode);
// Enable RocksDB storage for transaction hash numbers
db.factory.set_storage_settings_cache(
StorageSettings::v1().with_transaction_hash_numbers_in_rocksdb(true),
);
db.factory.set_storage_settings_cache(StorageSettings::v2());
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
@@ -578,9 +576,7 @@ mod tests {
}
// Enable RocksDB storage for transaction hash numbers
db.factory.set_storage_settings_cache(
StorageSettings::v1().with_transaction_hash_numbers_in_rocksdb(true),
);
db.factory.set_storage_settings_cache(StorageSettings::v2());
let to_block: BlockNumber = 6;
let prune_mode = PruneMode::Before(to_block);

View File

@@ -160,6 +160,14 @@ impl StateProvider for StateProviderTest {
) -> ProviderResult<Option<alloy_primitives::StorageValue>> {
Ok(self.accounts.get(&account).and_then(|(storage, _)| storage.get(&storage_key).copied()))
}
fn storage_by_hashed_key(
&self,
_address: Address,
_hashed_storage_key: StorageKey,
) -> ProviderResult<Option<alloy_primitives::StorageValue>> {
Ok(None)
}
}
impl BytecodeReader for StateProviderTest {

View File

@@ -6,5 +6,6 @@ mod middleware;
mod serde;
mod startup;
pub mod utils;
mod ws;
const fn main() {}

View File

@@ -0,0 +1,207 @@
#![allow(unreachable_pub)]
//! `WebSocket` subscription tests for `eth_subscribe` / `eth_unsubscribe`
use crate::utils::{launch_ws, test_rpc_builder};
use jsonrpsee::core::client::{Subscription, SubscriptionClientT};
use reth_rpc_server_types::RpcModuleSelection;
use reth_tokio_util::EventSender;
use serde_json::Value;
use std::time::Duration;
use reth_rpc_builder::{RpcServerConfig, TransportRpcModuleConfig};
/// Helper to launch a WS server with the Eth module.
async fn launch_ws_eth() -> reth_rpc_builder::RpcServerHandle {
launch_ws(vec![reth_rpc_server_types::RethRpcModule::Eth]).await
}
#[tokio::test(flavor = "multi_thread")]
async fn test_eth_subscribe_all_supported_kinds_accept() {
reth_tracing::init_test_tracing();
let handle = launch_ws_eth().await;
let client = handle.ws_client().await.unwrap();
let cases: Vec<(&str, Vec<Value>)> = vec![
("newHeads", vec![]),
("newPendingTransactions", vec![]),
("newPendingTransactions", vec![serde_json::json!(true)]),
("logs", vec![serde_json::json!({})]),
(
"logs",
vec![serde_json::json!({"address": "0x0000000000000000000000000000000000000001"})],
),
(
"logs",
vec![
serde_json::json!({"topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"]}),
],
),
];
for (kind, params) in cases {
let mut rpc_params = jsonrpsee::core::params::ArrayParams::new();
rpc_params.insert(kind).unwrap();
for p in params {
rpc_params.insert(p).unwrap();
}
let sub: Subscription<Value> = client
.subscribe("eth_subscribe", rpc_params, "eth_unsubscribe")
.await
.unwrap_or_else(|e| panic!("subscribe({kind}) should succeed: {e}"));
sub.unsubscribe()
.await
.unwrap_or_else(|e| panic!("unsubscribe({kind}) should succeed: {e}"));
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_eth_subscribe_syncing_delivers_initial_status() {
reth_tracing::init_test_tracing();
let handle = launch_ws_eth().await;
let client = handle.ws_client().await.unwrap();
let mut sub: Subscription<Value> = client
.subscribe("eth_subscribe", jsonrpsee::rpc_params!["syncing"], "eth_unsubscribe")
.await
.unwrap();
let initial = tokio::time::timeout(Duration::from_secs(5), sub.next())
.await
.expect("timed out waiting for initial sync status")
.expect("subscription ended unexpectedly")
.expect("failed to deserialize sync status");
// NoopNetwork reports is_syncing = false
assert_eq!(initial, serde_json::json!(false));
sub.unsubscribe().await.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn test_eth_subscribe_invalid_kind_rejected() {
reth_tracing::init_test_tracing();
let handle = launch_ws_eth().await;
let client = handle.ws_client().await.unwrap();
let result: Result<Subscription<Value>, _> = client
.subscribe("eth_subscribe", jsonrpsee::rpc_params!["invalidKind"], "eth_unsubscribe")
.await;
assert!(result.is_err(), "invalid subscription kind must be rejected");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_eth_subscribe_server_survives_client_disconnect() {
reth_tracing::init_test_tracing();
let handle = launch_ws_eth().await;
{
let client = handle.ws_client().await.unwrap();
let _sub: Subscription<Value> = client
.subscribe("eth_subscribe", jsonrpsee::rpc_params!["newHeads"], "eth_unsubscribe")
.await
.unwrap();
// client + subscription drop here
}
// Server must still accept new connections after a client disconnects
let client2 = handle.ws_client().await.unwrap();
let sub: Subscription<Value> = client2
.subscribe("eth_subscribe", jsonrpsee::rpc_params!["newHeads"], "eth_unsubscribe")
.await
.unwrap();
sub.unsubscribe().await.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn test_eth_subscribe_not_available_over_http() {
reth_tracing::init_test_tracing();
let builder = test_rpc_builder();
let eth_api = builder.bootstrap_eth_api();
let modules = RpcModuleSelection::Standard;
let server =
builder.build(TransportRpcModuleConfig::set_http(modules), eth_api, EventSender::new(1));
let handle = RpcServerConfig::http(Default::default())
.with_http_address(crate::utils::test_address())
.start(&server)
.await
.unwrap();
assert!(handle.ws_client().await.is_none(), "WS should not be available on HTTP-only server");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_eth_subscribe_pending_transactions_receives_tx() {
use reth_consensus::noop::NoopConsensus;
use reth_evm_ethereum::EthEvmConfig;
use reth_network_api::noop::NoopNetwork;
use reth_provider::test_utils::NoopProvider;
use reth_rpc_builder::RpcModuleBuilder;
use reth_tasks::TokioTaskExecutor;
use reth_transaction_pool::{
test_utils::{TestPool, TestPoolBuilder},
PoolTransaction, TransactionOrigin, TransactionPool,
};
reth_tracing::init_test_tracing();
let pool: TestPool = TestPoolBuilder::default().into();
let pool_clone = pool.clone();
let builder = RpcModuleBuilder::default()
.with_provider(NoopProvider::default())
.with_pool(pool)
.with_network(NoopNetwork::default())
.with_executor(Box::new(TokioTaskExecutor::default()))
.with_evm_config(EthEvmConfig::mainnet())
.with_consensus(NoopConsensus::default());
let eth_api = builder.bootstrap_eth_api();
let server = builder.build(
TransportRpcModuleConfig::set_ws(RpcModuleSelection::Standard),
eth_api,
EventSender::new(1),
);
let handle = RpcServerConfig::ws(Default::default())
.with_ws_address(crate::utils::test_address())
.start(&server)
.await
.unwrap();
let client = handle.ws_client().await.unwrap();
// Subscribe to pending transaction hashes
let mut sub: Subscription<Value> = client
.subscribe(
"eth_subscribe",
jsonrpsee::rpc_params!["newPendingTransactions"],
"eth_unsubscribe",
)
.await
.unwrap();
// Insert a transaction into the pool
let tx = reth_transaction_pool::test_utils::MockTransaction::eip1559();
let expected_hash = *tx.hash();
pool_clone.add_transaction(TransactionOrigin::External, tx).await.unwrap();
// We should receive the tx hash via the subscription
let received = tokio::time::timeout(Duration::from_secs(5), sub.next())
.await
.expect("timed out waiting for pending tx notification")
.expect("subscription ended unexpectedly")
.expect("failed to deserialize tx hash");
let received_hash: alloy_primitives::TxHash = serde_json::from_value(received).unwrap();
assert_eq!(received_hash, expected_hash);
sub.unsubscribe().await.unwrap();
}

View File

@@ -154,6 +154,14 @@ impl StateProvider for StateProviderTraitObjWrapper {
self.0.storage(account, storage_key)
}
fn storage_by_hashed_key(
&self,
address: Address,
hashed_storage_key: alloy_primitives::StorageKey,
) -> reth_errors::ProviderResult<Option<alloy_primitives::StorageValue>> {
self.0.storage_by_hashed_key(address, hashed_storage_key)
}
fn account_code(
&self,
addr: &Address,

View File

@@ -541,7 +541,7 @@ where
eth_proof_window,
blocking_task_pool.unwrap_or_else(|| {
BlockingTaskPool::builder()
.thread_name(|i| format!("blocking-{i}"))
.thread_name(|i| format!("blocking-{i:02}"))
.build()
.map(BlockingTaskPool::new)
.expect("failed to build blocking task pool")

View File

@@ -22,6 +22,7 @@ use reth_stages_api::{
UnwindInput, UnwindOutput,
};
use reth_static_file_types::StaticFileSegment;
use reth_trie::KeccakKeyHasher;
use std::{
cmp::{max, Ordering},
collections::BTreeMap,
@@ -461,9 +462,16 @@ where
}
}
// write output
// Write output. When `use_hashed_state` is enabled, `write_state` skips writing to
// plain account/storage tables and only writes bytecodes and changesets. The hashed
// state is then written separately below.
provider.write_state(&state, OriginalValuesKnown::Yes, StateWriteConfig::default())?;
if provider.cached_storage_settings().use_hashed_state() {
let hashed_state = state.hash_state_slow::<KeccakKeyHasher>();
provider.write_hashed_state(&hashed_state.into_sorted())?;
}
let db_write_duration = time.elapsed();
debug!(
target: "sync::stages::execution",
@@ -1261,8 +1269,7 @@ mod tests {
// but no receipt data is written.
let factory = create_test_provider_factory();
factory
.set_storage_settings_cache(StorageSettings::v1().with_receipts_in_static_files(true));
factory.set_storage_settings_cache(StorageSettings::v2());
// Setup with block 1
let provider_rw = factory.database_provider_rw().unwrap();

View File

@@ -9,7 +9,9 @@ use reth_db_api::{
};
use reth_etl::Collector;
use reth_primitives_traits::Account;
use reth_provider::{AccountExtReader, DBProvider, HashingWriter, StatsReader};
use reth_provider::{
AccountExtReader, DBProvider, HashingWriter, StatsReader, StorageSettingsCache,
};
use reth_stages_api::{
AccountHashingCheckpoint, EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint,
StageError, StageId, UnwindInput, UnwindOutput,
@@ -134,7 +136,11 @@ impl Default for AccountHashingStage {
impl<Provider> Stage<Provider> for AccountHashingStage
where
Provider: DBProvider<Tx: DbTxMut> + HashingWriter + AccountExtReader + StatsReader,
Provider: DBProvider<Tx: DbTxMut>
+ HashingWriter
+ AccountExtReader
+ StatsReader
+ StorageSettingsCache,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@@ -142,11 +148,21 @@ where
}
/// Execute the stage.
///
/// When `use_hashed_state` is enabled, this stage is a no-op because the execution stage
/// writes directly to `HashedAccounts`. Otherwise, it hashes plain state to populate hashed
/// tables.
fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
// If using hashed state as canonical, execution already writes to `HashedAccounts`,
// so this stage becomes a no-op.
if provider.cached_storage_settings().use_hashed_state() {
return Ok(ExecOutput::done(input.checkpoint().with_block_number(input.target())));
}
let (from_block, to_block) = input.next_block_range().into_inner();
// if there are more blocks then threshold it is faster to go over Plain state and hash all
@@ -234,10 +250,14 @@ where
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
// NOTE: this runs in both v1 and v2 mode. In v2 mode, execution writes
// directly to `HashedAccounts`, but the unwind must still revert those
// entries here because `MerkleUnwind` runs after this stage (in unwind
// order) and needs `HashedAccounts` to reflect the target block state
// before it can verify the state root.
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);
// Aggregate all transition changesets and make a list of accounts that have been changed.
provider.unwind_account_hashing_range(range)?;
let mut stage_checkpoint =

View File

@@ -15,6 +15,7 @@ use reth_stages_api::{
EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
StorageHashingCheckpoint, UnwindInput, UnwindOutput,
};
use reth_storage_api::StorageSettingsCache;
use reth_storage_errors::provider::ProviderResult;
use std::{
fmt::Debug,
@@ -68,7 +69,11 @@ impl Default for StorageHashingStage {
impl<Provider> Stage<Provider> for StorageHashingStage
where
Provider: DBProvider<Tx: DbTxMut> + StorageReader + HashingWriter + StatsReader,
Provider: DBProvider<Tx: DbTxMut>
+ StorageReader
+ HashingWriter
+ StatsReader
+ StorageSettingsCache,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@@ -82,6 +87,12 @@ where
return Ok(ExecOutput::done(input.checkpoint()))
}
// If use_hashed_state is enabled, execution writes directly to `HashedStorages`,
// so this stage becomes a no-op.
if provider.cached_storage_settings().use_hashed_state() {
return Ok(ExecOutput::done(input.checkpoint().with_block_number(input.target())));
}
let (from_block, to_block) = input.next_block_range().into_inner();
// if there are more blocks then threshold it is faster to go over Plain state and hash all
@@ -176,6 +187,11 @@ where
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
// NOTE: this runs in both v1 and v2 mode. In v2 mode, execution writes
// directly to `HashedStorages`, but the unwind must still revert those
// entries here because `MerkleUnwind` runs after this stage (in unwind
// order) and needs `HashedStorages` to reflect the target block state
// before it can verify the state root.
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);

View File

@@ -103,7 +103,7 @@ where
let mut range = input.next_block_range();
let first_sync = input.checkpoint().block_number == 0;
let use_rocksdb = provider.cached_storage_settings().account_history_in_rocksdb;
let use_rocksdb = provider.cached_storage_settings().storage_v2;
// On first sync we might have history coming from genesis. We clear the table since it's
// faster to rebuild from scratch.
@@ -122,7 +122,7 @@ where
info!(target: "sync::stages::index_account_history::exec", ?first_sync, ?use_rocksdb, "Collecting indices");
let collector = if provider.cached_storage_settings().account_changesets_in_static_files {
let collector = if provider.cached_storage_settings().storage_v2 {
// Use the provider-based collection that can read from static files.
collect_account_history_indices(provider, range.clone(), &self.etl_config)?
} else {
@@ -666,32 +666,43 @@ mod tests {
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::RocksDBProviderFactory;
use reth_provider::{
providers::StaticFileWriter, RocksDBProviderFactory, StaticFileProviderFactory,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::StorageSettings;
/// Sets up v2 account test data: writes block body indices to MDBX and
/// account changesets to static files (matching realistic v2 layout).
fn setup_v2_account_data(db: &TestStageDB, block_range: std::ops::RangeInclusive<u64>) {
db.factory.set_storage_settings_cache(StorageSettings::v2());
db.commit(|tx| {
for block in block_range.clone() {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
}
Ok(())
})
.unwrap();
let static_file_provider = db.factory.static_file_provider();
let mut writer =
static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
for block in block_range {
writer.append_account_changeset(vec![acc()], block).unwrap();
}
writer.commit().unwrap();
}
/// Test that when `account_history_in_rocksdb` is enabled, the stage
/// writes account history indices to `RocksDB` instead of MDBX.
#[tokio::test]
async fn execute_writes_to_rocksdb_when_enabled() {
// init
let db = TestStageDB::default();
// Enable RocksDB for account history
db.factory.set_storage_settings_cache(
StorageSettings::v1().with_account_history_in_rocksdb(true),
);
db.commit(|tx| {
for block in 0..=10 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();
setup_v2_account_data(&db, 0..=10);
let input = ExecInput { target: Some(10), ..Default::default() };
let mut stage = IndexAccountHistoryStage::default();
@@ -721,22 +732,7 @@ mod tests {
#[tokio::test]
async fn unwind_works_when_rocksdb_enabled() {
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::v1().with_account_history_in_rocksdb(true),
);
db.commit(|tx| {
for block in 0..=10 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();
setup_v2_account_data(&db, 0..=10);
let input = ExecInput { target: Some(10), ..Default::default() };
let mut stage = IndexAccountHistoryStage::default();
@@ -772,22 +768,7 @@ mod tests {
#[tokio::test]
async fn execute_incremental_sync() {
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::v1().with_account_history_in_rocksdb(true),
);
db.commit(|tx| {
for block in 0..=5 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();
setup_v2_account_data(&db, 0..=10);
let input = ExecInput { target: Some(5), ..Default::default() };
let mut stage = IndexAccountHistoryStage::default();
@@ -802,18 +783,6 @@ mod tests {
let blocks: Vec<u64> = result.unwrap().iter().collect();
assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
db.commit(|tx| {
for block in 6..=10 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();
let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(5)) };
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();

View File

@@ -107,7 +107,7 @@ where
let mut range = input.next_block_range();
let first_sync = input.checkpoint().block_number == 0;
let use_rocksdb = provider.cached_storage_settings().storages_history_in_rocksdb;
let use_rocksdb = provider.cached_storage_settings().storage_v2;
// On first sync we might have history coming from genesis. We clear the table since it's
// faster to rebuild from scratch.
@@ -125,7 +125,7 @@ where
}
info!(target: "sync::stages::index_storage_history::exec", ?first_sync, ?use_rocksdb, "Collecting indices");
let collector = if provider.cached_storage_settings().storage_changesets_in_static_files {
let collector = if provider.cached_storage_settings().storage_v2 {
collect_storage_history_indices(provider, range.clone(), &self.etl_config)?
} else {
collect_history_indices::<_, tables::StorageChangeSets, tables::StoragesHistory, _>(
@@ -694,33 +694,51 @@ mod tests {
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::RocksDBProviderFactory;
use reth_db_api::models::StorageBeforeTx;
use reth_provider::{providers::StaticFileWriter, RocksDBProviderFactory};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::StorageSettings;
/// Sets up v2 storage test data: writes block body indices to MDBX and
/// storage changesets to static files (matching realistic v2 layout).
fn setup_v2_storage_data(db: &TestStageDB, block_range: std::ops::RangeInclusive<u64>) {
db.factory.set_storage_settings_cache(StorageSettings::v2());
db.commit(|tx| {
for block in block_range.clone() {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
}
Ok(())
})
.unwrap();
let static_file_provider = db.factory.static_file_provider();
let mut writer =
static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
for block in block_range {
writer
.append_storage_changeset(
vec![StorageBeforeTx {
address: ADDRESS,
key: STORAGE_KEY,
value: U256::ZERO,
}],
block,
)
.unwrap();
}
writer.commit().unwrap();
}
/// Test that when `storages_history_in_rocksdb` is enabled, the stage
/// writes storage history indices to `RocksDB` instead of MDBX.
#[tokio::test]
async fn execute_writes_to_rocksdb_when_enabled() {
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::v1().with_storages_history_in_rocksdb(true),
);
db.commit(|tx| {
for block in 0..=10 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::StorageChangeSets>(
block_number_address(block),
storage(STORAGE_KEY),
)?;
}
Ok(())
})
.unwrap();
setup_v2_storage_data(&db, 0..=10);
let input = ExecInput { target: Some(10), ..Default::default() };
let mut stage = IndexStorageHistoryStage::default();
@@ -748,25 +766,7 @@ mod tests {
#[tokio::test]
async fn unwind_works_when_rocksdb_enabled() {
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::v1().with_storages_history_in_rocksdb(true),
);
db.commit(|tx| {
for block in 0..=10 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::StorageChangeSets>(
block_number_address(block),
storage(STORAGE_KEY),
)?;
}
Ok(())
})
.unwrap();
setup_v2_storage_data(&db, 0..=10);
let input = ExecInput { target: Some(10), ..Default::default() };
let mut stage = IndexStorageHistoryStage::default();
@@ -803,25 +803,7 @@ mod tests {
#[tokio::test]
async fn unwind_to_zero_keeps_block_zero() {
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::v1().with_storages_history_in_rocksdb(true),
);
db.commit(|tx| {
for block in 0..=5 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::StorageChangeSets>(
block_number_address(block),
storage(STORAGE_KEY),
)?;
}
Ok(())
})
.unwrap();
setup_v2_storage_data(&db, 0..=5);
let input = ExecInput { target: Some(5), ..Default::default() };
let mut stage = IndexStorageHistoryStage::default();
@@ -852,25 +834,7 @@ mod tests {
#[tokio::test]
async fn execute_incremental_sync() {
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::v1().with_storages_history_in_rocksdb(true),
);
db.commit(|tx| {
for block in 0..=5 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::StorageChangeSets>(
block_number_address(block),
storage(STORAGE_KEY),
)?;
}
Ok(())
})
.unwrap();
setup_v2_storage_data(&db, 0..=10);
let input = ExecInput { target: Some(5), ..Default::default() };
let mut stage = IndexStorageHistoryStage::default();
@@ -885,21 +849,6 @@ mod tests {
let blocks: Vec<u64> = result.unwrap().iter().collect();
assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
db.commit(|tx| {
for block in 6..=10 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::StorageChangeSets>(
block_number_address(block),
storage(STORAGE_KEY),
)?;
}
Ok(())
})
.unwrap();
let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(5)) };
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
@@ -919,27 +868,8 @@ mod tests {
use reth_db_api::models::sharded_key::NUM_OF_INDICES_IN_SHARD;
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::v1().with_storages_history_in_rocksdb(true),
);
let num_blocks = (NUM_OF_INDICES_IN_SHARD * 2 + 100) as u64;
db.commit(|tx| {
for block in 0..num_blocks {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::StorageChangeSets>(
block_number_address(block),
storage(STORAGE_KEY),
)?;
}
Ok(())
})
.unwrap();
setup_v2_storage_data(&db, 0..=num_blocks - 1);
let input = ExecInput { target: Some(num_blocks - 1), ..Default::default() };
let mut stage = IndexStorageHistoryStage::default();

View File

@@ -9,7 +9,7 @@ use reth_db_api::{
use reth_primitives_traits::{GotExpected, SealedHeader};
use reth_provider::{
ChangeSetReader, DBProvider, HeaderProvider, ProviderError, StageCheckpointReader,
StageCheckpointWriter, StatsReader, StorageChangeSetReader, TrieWriter,
StageCheckpointWriter, StatsReader, StorageChangeSetReader, StorageSettingsCache, TrieWriter,
};
use reth_stages_api::{
BlockErrorKind, EntitiesCheckpoint, ExecInput, ExecOutput, MerkleCheckpoint, Stage,
@@ -160,6 +160,7 @@ where
+ HeaderProvider
+ ChangeSetReader
+ StorageChangeSetReader
+ StorageSettingsCache
+ StageCheckpointReader
+ StageCheckpointWriter,
{

View File

@@ -540,9 +540,7 @@ mod tests {
let mut rng = generators::rng();
let runner = SenderRecoveryTestRunner::default();
runner.db.factory.set_storage_settings_cache(
StorageSettings::v1().with_transaction_senders_in_static_files(true),
);
runner.db.factory.set_storage_settings_cache(StorageSettings::v2());
let input = ExecInput {
target: Some(target),
checkpoint: Some(StageCheckpoint::new(stage_progress)),

View File

@@ -200,7 +200,7 @@ where
}
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
if provider.cached_storage_settings().storage_v2 {
provider.commit_pending_rocksdb_batches()?;
provider.rocksdb_provider().flush(&[Tables::TransactionHashNumbers.name()])?;
}
@@ -618,9 +618,7 @@ mod tests {
let runner = TransactionLookupTestRunner::default();
// Enable RocksDB for transaction hash numbers
runner.db.factory.set_storage_settings_cache(
StorageSettings::v1().with_transaction_hash_numbers_in_rocksdb(true),
);
runner.db.factory.set_storage_settings_cache(StorageSettings::v2());
let input = ExecInput {
target: Some(previous_stage),
@@ -686,9 +684,7 @@ mod tests {
let runner = TransactionLookupTestRunner::default();
// Enable RocksDB for transaction hash numbers
runner.db.factory.set_storage_settings_cache(
StorageSettings::v1().with_transaction_hash_numbers_in_rocksdb(true),
);
runner.db.factory.set_storage_settings_cache(StorageSettings::v2());
// Insert blocks with transactions
let blocks = random_block_range(

Some files were not shown because too many files have changed in this diff Show More