Compare commits

...

66 Commits

Author SHA1 Message Date
yongkangc
9a7d3be1fc docs: clarify RocksDB flags require explicit boolean values 2026-01-19 15:11:30 +00:00
yongkangc
961e232dea fix: use correct --rocksdb.* CLI flag names 2026-01-19 15:06:18 +00:00
yongkangc
0c0d12ac9a fix: correct RocksDB storage settings documentation
- Replace non-existent --storage.*-in-rocksdb CLI flags with correct
  reth db settings command usage
- Document actual commands: transaction_hash_numbers, storages_history,
  account_history via 'reth db settings set'
- Add section for viewing current settings with 'reth db settings get'
- Use consistent terminology (transaction hash table vs tx-hash)
2026-01-19 15:05:20 +00:00
yongkangc
62a97edbf1 docs: add documentation for --storage.*-in-rocksdb CLI flags
Documents the new storage CLI flags for RocksDB table routing:
- --storage.tx-hash-in-rocksdb
- --storage.storages-history-in-rocksdb
- --storage.account-history-in-rocksdb

These flags allow routing specific large tables to RocksDB for improved
performance on archive nodes and high-throughput workloads.
2026-01-19 14:56:12 +00:00
Matthias Seitz
c9dad4765d chore: bump version to 1.10.1 (#21188) 2026-01-19 14:04:08 +00:00
Dan Cline
1d55abeef3 chore: rename extend_ref methods on sorted data structures (#21043) 2026-01-19 13:04:57 +00:00
Niven
f7460e219c fix(flashblocks): Add flashblock ws connection retry period (#20510) 2026-01-19 12:01:33 +00:00
Georgios Konstantopoulos
0c66315f20 chore(bench): add --disable-tx-gossip to benchmark node args (#21171) 2026-01-19 11:45:56 +00:00
MozirDmitriy
6a2010e595 refactor(stages): reuse history index cache buffers in collect_history_indices (#21017) 2026-01-19 11:39:52 +00:00
Georgios Konstantopoulos
c2435ff6f8 feat(download): resumable snapshot downloads with auto-retry (#21161) 2026-01-19 10:26:24 +00:00
DaniPopes
52ec8e9491 ci: update to tempoxyz (#21176) 2026-01-19 10:21:37 +00:00
Georgios Konstantopoulos
a901d80ee6 chore: apply spelling and typo fixes (#21182) 2026-01-19 10:21:25 +00:00
MoNyAvA
915164078f docs: document minimal storage mode in pruning FAQ (#21025)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-19 10:27:45 +01:00
github-actions[bot]
be3234d848 chore(deps): weekly cargo update (#21167)
Co-authored-by: github-merge-queue <118344674+github-merge-queue@users.noreply.github.com>
2026-01-18 14:57:20 +00:00
Matthias Seitz
f624372334 feat(execution-types): add receipts_iter helper (#21162)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-17 19:20:28 +01:00
Matthias Seitz
40bc9d3860 revert: undo Chain crate, add LazyTrieData to trie-common (#21155) 2026-01-17 15:57:09 +00:00
Georgios Konstantopoulos
1ea574417f feat(engine): add new_payload_interval metric (start-to-start) (#21159) 2026-01-17 12:15:45 +00:00
Georgios Konstantopoulos
27e055f790 feat(engine): add time_between_new_payloads metric (#21158) 2026-01-17 10:20:22 +00:00
Georgios Konstantopoulos
d5dc0b27eb fix(storage-api): gate reth-chain dependency behind std feature
The reth-chain crate is inherently std-only (uses BTreeMap, Arc, etc.)
and was breaking the riscv32imac no_std builds by pulling in serde_core
which doesn't support no_std properly.

This makes reth-chain optional and only enables it when std feature is
active, gating the block_writer module that uses Chain behind std.
2026-01-17 08:32:10 +00:00
Georgios Konstantopoulos
c11c13000f perf(storage): batch trie updates across blocks in save_blocks (#21142)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: YK <chiayongkang@hotmail.com>
2026-01-17 07:15:40 +00:00
Matthias Seitz
6bf43ab24a refactor: use ExecutionOutcome::single instead of tuple From (#21152) 2026-01-17 01:51:26 +00:00
Matthias Seitz
574bde0d6f chore(chain-state): reorganize deferred_trie.rs impl blocks (#21151) 2026-01-17 01:39:29 +00:00
Matthias Seitz
79b8ffb828 feat(primitives-traits): add try_recover_signers for parallel batch recovery (#21103) 2026-01-17 01:24:53 +00:00
Dan Cline
c617d25c36 perf: make Chain use DeferredTrieData (#21137)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-17 01:05:35 +00:00
Georgios Konstantopoulos
b96a30821f fix(engine): request head block download when not buffered after backfill (#21150) 2026-01-17 00:33:27 +00:00
Mablr
012fbf5110 fix(docs/cli): update help.rs to use nightly toolchain (#21149) 2026-01-16 23:35:26 +00:00
Arsenii Kulikov
d7a5d1f872 fix: properly record span fields (#21148) 2026-01-16 23:25:54 +00:00
Matthias Seitz
3a39251f79 fix: release mutex before dropping ancestors in wait_cloned (#21146) 2026-01-16 22:32:23 +00:00
Julian Meyer
f6dbf2d82d feat(db): implement extra dup methods (#20964)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-16 21:31:52 +00:00
Brian Picciano
13707faf1a feat(consensus): incremental receipt root computation in background task (#21131) 2026-01-16 19:53:59 +00:00
Arsenii Kulikov
6e6415690c perf: start saving cache sooner (#21130) 2026-01-16 18:55:18 +00:00
Matthias Seitz
b81e373d78 chore(deps): bump vergen and vergen-git2 to 9.1.0 (#21141) 2026-01-16 20:00:43 +01:00
Arun Dhyani
a164654145 fix(exex): prevent ExExManager deadlock when buffer clears after being full (#21135) 2026-01-16 18:42:23 +00:00
Matthias Seitz
905bb95f8b perf(engine): defer trie overlay computation with LazyOverlay (#21133) 2026-01-16 18:25:04 +00:00
YK
13c32625bc feat(storage): add EitherReader for routing history queries to MDBX or RocksDB (#21063)
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-01-16 17:44:43 +00:00
YK
1be9fab5bf perf: Optimize multiproof sequencer add_proof (#21129) 2026-01-16 17:33:48 +00:00
Arsenii Kulikov
80eb0d0fb6 refactor: use BlockExecutionOutcome in ExecutedBlock (#21123) 2026-01-16 17:07:19 +00:00
Matthias Seitz
5e178f6ac6 chore(deps): update alloy-evm and alloy-op-evm to 0.26.3 (#21126) 2026-01-16 17:24:45 +01:00
Matthias Seitz
b4b64096c8 perf(cli): use available_parallelism as default for re-execute (#21010) 2026-01-16 16:08:30 +00:00
figtracer
e313de818b chore(provider): pre alloc tx hashes (#21114) 2026-01-16 15:40:47 +00:00
rakita
86c414081a feat: stagging revm v34.0.0 (#20627)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-16 14:56:27 +00:00
Brian Picciano
a74cb9cbc3 feat(trie): in-memory trie changesets (#20997) 2026-01-16 01:06:31 +00:00
YK
e25411c32b perf(trie): fix extend_sorted_vec O(n log n) → O(n+m) merge (#21098) 2026-01-16 00:17:22 +00:00
Matthias Seitz
ec3323bba0 refactor(chain-state): extract blocks_to_chain helper (#21110) 2026-01-15 23:27:11 +00:00
Dan Cline
26cd132631 fix(reth-bench): use requests hash (#21111) 2026-01-15 19:19:16 +00:00
DaniPopes
079f59c2be perf: reserve in extend_sorted_vec (#21109) 2026-01-15 19:10:20 +00:00
joshieDo
e9b079ad62 feat: add rocksdb to save_blocks (#21003)
Co-authored-by: Sergei Shulepov <s.pepyakin@gmail.com>
Co-authored-by: Sergei Shulepov <pep@tempo.xyz>
Co-authored-by: yongkangc <chiayongkang@hotmail.com>
2026-01-15 18:33:19 +00:00
Dan Cline
b1f107b171 feat(reth-bench): add generate-big-block command (#21082) 2026-01-15 15:30:04 +00:00
YK
7d0e7e72de perf(trie): add k-way merge batch optimization for merge_overlay_trie_input (#21080) 2026-01-15 15:22:15 +00:00
joshieDo
f012b3391e feat: parallelize save_blocks (#20993)
Co-authored-by: Sergei Shulepov <s.pepyakin@gmail.com>
Co-authored-by: Sergei Shulepov <pep@tempo.xyz>
Co-authored-by: Brian Picciano <me@mediocregopher.com>
2026-01-15 14:58:06 +00:00
joshieDo
d225fc1d7f feat: add get/set db settings for rocksdb (#21095) 2026-01-15 14:48:05 +00:00
Dan Cline
d469b7f1d0 feat(rpc): add flag to skip invalid transactions in testing_buildBlockV1 (#21094)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-15 12:05:30 +00:00
YK
9bcd3712c8 test(storage): add parametrized MDBX/RocksDB history lookup equivalence tests (#20871) 2026-01-15 11:16:40 +00:00
Emma Jamieson-Hoare
b25f32a977 chore(release): set version v1.10.0 (#21091)
Co-authored-by: Emma Jamieson-Hoare <ejamieson19@gmai.com>
2026-01-15 10:50:35 +00:00
Emma Jamieson-Hoare
905de96944 chore: release 1.9.4 (#21048)
Co-authored-by: Emma Jamieson-Hoare <ejamieson19@gmai.com>
2026-01-15 09:41:54 +00:00
Sergei Shulepov
27fbd9a7de fix(db): change commit return type from Result<bool> to Result<()> (#21077)
Co-authored-by: Sergei Shulepov <pep@tempo.xyz>
2026-01-14 23:56:27 +00:00
DaniPopes
26a99ac5a3 perf: small improvement to extend_sorted_vec (#21032) 2026-01-14 23:46:58 +00:00
James Prestwich
1265a89c21 refactor: make use of dbi consistent across mdbx interface (#21079) 2026-01-14 23:42:42 +00:00
Matthias Seitz
b9ff5941eb feat(primitives): add SealedBlock::decode_sealed for efficient RLP decoding (#21030) 2026-01-14 22:49:55 +00:00
Sergei Shulepov
a75a0a5db7 feat(cli): support file:// URLs in reth download (#21026)
Co-authored-by: Sergei Shulepov <pep@tempo.xyz>
2026-01-14 22:30:42 +00:00
Matthias Seitz
0a4bac77d0 feat(primitives): add From<Sealed<B>> for SealedBlock<B> (#21078) 2026-01-14 22:19:09 +00:00
Kamil Szczygieł
1fbd5a95f8 feat: Support for sending logs through OTLP (#21039)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-14 21:29:00 +00:00
Arsenii Kulikov
1bc07fad8e perf: use binary search in ForwardInMemoryCursor (#21049) 2026-01-14 19:31:11 +00:00
Arsenii Kulikov
8cb506c4d3 perf: don't clone entire keys set (#21042) 2026-01-14 19:26:23 +00:00
ethfanWilliam
15f16a5a2e fix: propagate keccak-cache-global feature to reth-optimism-cli (#21051)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-14 19:22:22 +00:00
Brian Picciano
5cf1d2a0b0 fix(trie): Update branch masks when revealing blinded nodes (#20937) 2026-01-14 19:12:15 +00:00
335 changed files with 13170 additions and 3724 deletions

View File

@@ -12,7 +12,7 @@ workflows:
# Check that `A` activates the features of `B`.
"propagate-feature",
# These are the features to check:
"--features=std,op,dev,asm-keccak,jemalloc,jemalloc-prof,tracy-allocator,tracy,serde-bincode-compat,serde,test-utils,arbitrary,bench,alloy-compat,min-error-logs,min-warn-logs,min-info-logs,min-debug-logs,min-trace-logs,otlp,js-tracer,portable,keccak-cache-global",
"--features=std,op,dev,asm-keccak,jemalloc,jemalloc-prof,tracy-allocator,tracy,serde-bincode-compat,serde,test-utils,arbitrary,bench,alloy-compat,min-error-logs,min-warn-logs,min-info-logs,min-debug-logs,min-trace-logs,otlp,otlp-logs,js-tracer,portable,keccak-cache-global",
# Do not try to add a new section to `[features]` of `A` only because `B` exposes that feature. There are edge-cases where this is still needed, but we can add them manually.
"--left-side-feature-missing=ignore",
# Ignore the case that `A` it outside of the workspace. Otherwise it will report errors in external dependencies that we have no influence on.

View File

@@ -15,6 +15,6 @@ permissions:
jobs:
update:
uses: ithacaxyz/ci/.github/workflows/cargo-update-pr.yml@main
uses: tempoxyz/ci/.github/workflows/cargo-update-pr.yml@main
secrets:
token: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -285,7 +285,7 @@ jobs:
- run: zepter run check
deny:
uses: ithacaxyz/ci/.github/workflows/deny.yml@main
uses: tempoxyz/ci/.github/workflows/deny.yml@main
lint-success:
name: lint success

View File

@@ -249,7 +249,7 @@ Write comments that remain valuable after the PR is merged. Future readers won't
unsafe impl GlobalAlloc for LimitedAllocator { ... }
// Binary search requires sorted input. Panics on unsorted slices.
fn find_index(items: &[Item], target: &Item) -> Option
fn find_index(items: &[Item], target: &Item) -> Option<usize>
// Timeout set to 5s to match EVM block processing limits
const TRACER_TIMEOUT: Duration = Duration::from_secs(5);

1287
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
[workspace.package]
version = "1.9.3"
version = "1.10.1"
edition = "2024"
rust-version = "1.88"
license = "MIT OR Apache-2.0"
@@ -473,22 +473,22 @@ reth-ress-protocol = { path = "crates/ress/protocol" }
reth-ress-provider = { path = "crates/ress/provider" }
# revm
revm = { version = "33.1.0", default-features = false }
revm-bytecode = { version = "7.1.1", default-features = false }
revm-database = { version = "9.0.5", default-features = false }
revm-state = { version = "8.1.1", default-features = false }
revm-primitives = { version = "21.0.2", default-features = false }
revm-interpreter = { version = "31.1.0", default-features = false }
revm-database-interface = { version = "8.0.5", default-features = false }
op-revm = { version = "14.1.0", default-features = false }
revm-inspectors = "0.33.2"
revm = { version = "34.0.0", default-features = false }
revm-bytecode = { version = "8.0.0", default-features = false }
revm-database = { version = "10.0.0", default-features = false }
revm-state = { version = "9.0.0", default-features = false }
revm-primitives = { version = "22.0.0", default-features = false }
revm-interpreter = { version = "32.0.0", default-features = false }
revm-database-interface = { version = "9.0.0", default-features = false }
op-revm = { version = "15.0.0", default-features = false }
revm-inspectors = "0.34.0"
# eth
alloy-chains = { version = "0.2.5", default-features = false }
alloy-dyn-abi = "1.4.1"
alloy-dyn-abi = "1.4.3"
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.1.0", default-features = false }
alloy-evm = { version = "0.25.1", default-features = false }
alloy-eip7928 = { version = "0.3.0", default-features = false }
alloy-evm = { version = "0.26.3", default-features = false }
alloy-primitives = { version = "1.5.0", default-features = false, features = ["map-foldhash"] }
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
alloy-sol-macro = "1.5.0"
@@ -497,36 +497,36 @@ alloy-trie = { version = "0.9.1", default-features = false }
alloy-hardforks = "0.4.5"
alloy-consensus = { version = "1.4.1", default-features = false }
alloy-contract = { version = "1.4.1", default-features = false }
alloy-eips = { version = "1.4.1", default-features = false }
alloy-genesis = { version = "1.4.1", default-features = false }
alloy-json-rpc = { version = "1.4.1", default-features = false }
alloy-network = { version = "1.4.1", default-features = false }
alloy-network-primitives = { version = "1.4.1", default-features = false }
alloy-provider = { version = "1.4.1", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.4.1", default-features = false }
alloy-rpc-client = { version = "1.4.1", default-features = false }
alloy-rpc-types = { version = "1.4.1", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.4.1", default-features = false }
alloy-rpc-types-anvil = { version = "1.4.1", default-features = false }
alloy-rpc-types-beacon = { version = "1.4.1", default-features = false }
alloy-rpc-types-debug = { version = "1.4.1", default-features = false }
alloy-rpc-types-engine = { version = "1.4.1", default-features = false }
alloy-rpc-types-eth = { version = "1.4.1", default-features = false }
alloy-rpc-types-mev = { version = "1.4.1", default-features = false }
alloy-rpc-types-trace = { version = "1.4.1", default-features = false }
alloy-rpc-types-txpool = { version = "1.4.1", default-features = false }
alloy-serde = { version = "1.4.1", default-features = false }
alloy-signer = { version = "1.4.1", default-features = false }
alloy-signer-local = { version = "1.4.1", default-features = false }
alloy-transport = { version = "1.4.1" }
alloy-transport-http = { version = "1.4.1", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.4.1", default-features = false }
alloy-transport-ws = { version = "1.4.1", default-features = false }
alloy-consensus = { version = "1.4.3", default-features = false }
alloy-contract = { version = "1.4.3", default-features = false }
alloy-eips = { version = "1.4.3", default-features = false }
alloy-genesis = { version = "1.4.3", default-features = false }
alloy-json-rpc = { version = "1.4.3", default-features = false }
alloy-network = { version = "1.4.3", default-features = false }
alloy-network-primitives = { version = "1.4.3", default-features = false }
alloy-provider = { version = "1.4.3", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.4.3", default-features = false }
alloy-rpc-client = { version = "1.4.3", default-features = false }
alloy-rpc-types = { version = "1.4.3", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.4.3", default-features = false }
alloy-rpc-types-anvil = { version = "1.4.3", default-features = false }
alloy-rpc-types-beacon = { version = "1.4.3", default-features = false }
alloy-rpc-types-debug = { version = "1.4.3", default-features = false }
alloy-rpc-types-engine = { version = "1.4.3", default-features = false }
alloy-rpc-types-eth = { version = "1.4.3", default-features = false }
alloy-rpc-types-mev = { version = "1.4.3", default-features = false }
alloy-rpc-types-trace = { version = "1.4.3", default-features = false }
alloy-rpc-types-txpool = { version = "1.4.3", default-features = false }
alloy-serde = { version = "1.4.3", default-features = false }
alloy-signer = { version = "1.4.3", default-features = false }
alloy-signer-local = { version = "1.4.3", default-features = false }
alloy-transport = { version = "1.4.3" }
alloy-transport-http = { version = "1.4.3", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.4.3", default-features = false }
alloy-transport-ws = { version = "1.4.3", default-features = false }
# op
alloy-op-evm = { version = "0.25.0", default-features = false }
alloy-op-evm = { version = "0.26.3", default-features = false }
alloy-op-hardforks = "0.4.4"
op-alloy-rpc-types = { version = "0.23.1", default-features = false }
op-alloy-rpc-types-engine = { version = "0.23.1", default-features = false }
@@ -665,6 +665,7 @@ opentelemetry_sdk = "0.31"
opentelemetry = "0.31"
opentelemetry-otlp = "0.31"
opentelemetry-semantic-conventions = "0.31"
opentelemetry-appender-tracing = "0.31"
tracing-opentelemetry = "0.32"
# misc-testing
@@ -738,15 +739,15 @@ tracing-subscriber = { version = "0.3", default-features = false }
tracing-tracy = "0.11"
triehash = "0.8"
typenum = "1.15.0"
vergen = "9.0.4"
vergen = "9.1.0"
visibility = "0.1.1"
walkdir = "2.3.3"
vergen-git2 = "1.0.5"
vergen-git2 = "9.1.0"
# networking
ipnet = "2.11"
# [patch.crates-io]
[patch.crates-io]
# alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-contract = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
# alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "3049f232fbb44d1909883e154eb38ec5962f53a3" }
@@ -791,3 +792,8 @@ ipnet = "2.11"
# alloy-evm = { git = "https://github.com/alloy-rs/evm", rev = "a69f0b45a6b0286e16072cb8399e02ce6ceca353" }
# alloy-op-evm = { git = "https://github.com/alloy-rs/evm", rev = "a69f0b45a6b0286e16072cb8399e02ce6ceca353" }
# revm-inspectors = { git = "https://github.com/paradigmxyz/revm-inspectors", rev = "3020ea8" }
# alloy-evm = { git = "https://github.com/alloy-rs/evm", rev = "072c248" }
# alloy-op-evm = { git = "https://github.com/alloy-rs/evm", rev = "072c248" }

View File

@@ -163,6 +163,7 @@ impl NodeManager {
"eth,reth".to_string(),
"--disable-discovery".to_string(),
"--trusted-only".to_string(),
"--disable-tx-gossip".to_string(),
]);
// Add tracing arguments if OTLP endpoint is configured

View File

@@ -17,21 +17,26 @@ workspace = true
reth-cli-runner.workspace = true
reth-cli-util.workspace = true
reth-engine-primitives.workspace = true
reth-ethereum-primitives.workspace = true
reth-fs-util.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
reth-primitives-traits.workspace = true
reth-rpc-api.workspace = true
reth-tracing.workspace = true
reth-chainspec.workspace = true
# alloy
alloy-eips.workspace = true
alloy-json-rpc.workspace = true
alloy-consensus.workspace = true
alloy-network.workspace = true
alloy-primitives.workspace = true
alloy-provider = { workspace = true, features = ["engine-api", "pubsub", "reqwest-rustls-tls"], default-features = false }
alloy-pubsub.workspace = true
alloy-rpc-client = { workspace = true, features = ["pubsub"] }
alloy-rpc-types-engine.workspace = true
alloy-rpc-types-engine = { workspace = true, features = ["kzg"] }
alloy-transport-http.workspace = true
alloy-transport-ipc.workspace = true
alloy-transport-ws.workspace = true

View File

@@ -0,0 +1,160 @@
//! Benchmarks empty block processing by ramping the block gas limit.
use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::{
helpers::{build_payload, prepare_payload_request, rpc_block_to_header},
output::GasRampPayloadFile,
},
valid_payload::{call_forkchoice_updated, call_new_payload, payload_to_new_payload},
};
use alloy_eips::BlockNumberOrTag;
use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{ExecutionPayload, ForkchoiceState, JwtSecret};
use clap::Parser;
use reqwest::Url;
use reth_chainspec::ChainSpec;
use reth_cli_runner::CliContext;
use reth_ethereum_primitives::TransactionSigned;
use reth_primitives_traits::constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK};
use std::{path::PathBuf, time::Instant};
use tracing::info;
/// `reth benchmark gas-limit-ramp` command.
#[derive(Debug, Parser)]
pub struct Command {
/// Number of blocks to generate.
#[arg(long, value_name = "BLOCKS")]
blocks: u64,
/// The Engine API RPC URL.
#[arg(long = "engine-rpc-url", value_name = "ENGINE_RPC_URL")]
engine_rpc_url: String,
/// Path to the JWT secret for Engine API authentication.
#[arg(long = "jwt-secret", value_name = "JWT_SECRET")]
jwt_secret: PathBuf,
/// Output directory for benchmark results and generated payloads.
#[arg(long, value_name = "OUTPUT")]
output: PathBuf,
}
impl Command {
/// Execute `benchmark gas-limit-ramp` command.
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
if self.blocks == 0 {
return Err(eyre::eyre!("--blocks must be greater than 0"));
}
// Ensure output directory exists
if self.output.is_file() {
return Err(eyre::eyre!("Output path must be a directory"));
}
if !self.output.exists() {
std::fs::create_dir_all(&self.output)?;
info!("Created output directory: {:?}", self.output);
}
// Set up authenticated provider (used for both Engine API and eth_ methods)
let jwt = std::fs::read_to_string(&self.jwt_secret)?;
let jwt = JwtSecret::from_hex(jwt)?;
let auth_url = Url::parse(&self.engine_rpc_url)?;
info!("Connecting to Engine RPC at {}", auth_url);
let auth_transport = AuthenticatedTransportConnect::new(auth_url, jwt);
let client = ClientBuilder::default().connect_with(auth_transport).await?;
let provider = RootProvider::<AnyNetwork>::new(client);
// Get chain spec - required for fork detection
let chain_id = provider.get_chain_id().await?;
let chain_spec = ChainSpec::from_chain_id(chain_id)
.ok_or_else(|| eyre::eyre!("Unsupported chain id: {chain_id}"))?;
// Fetch the current head block as parent
let parent_block = provider
.get_block_by_number(BlockNumberOrTag::Latest)
.full()
.await?
.ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
let (mut parent_header, mut parent_hash) = rpc_block_to_header(parent_block);
let canonical_parent = parent_header.number;
let start_block = canonical_parent + 1;
let end_block = start_block + self.blocks - 1;
info!(canonical_parent, start_block, end_block, "Starting gas limit ramp benchmark");
let mut next_block_number = start_block;
let total_benchmark_duration = Instant::now();
while next_block_number <= end_block {
let timestamp = parent_header.timestamp.saturating_add(1);
let request = prepare_payload_request(&chain_spec, timestamp, parent_hash);
let new_payload_version = request.new_payload_version;
let (payload, sidecar) = build_payload(&provider, request).await?;
let mut block =
payload.clone().try_into_block_with_sidecar::<TransactionSigned>(&sidecar)?;
let max_increase = max_gas_limit_increase(parent_header.gas_limit);
let gas_limit =
parent_header.gas_limit.saturating_add(max_increase).min(MAXIMUM_GAS_LIMIT_BLOCK);
block.header.gas_limit = gas_limit;
let block_hash = block.header.hash_slow();
// Regenerate the payload from the modified block, but keep the original sidecar
// which contains the actual execution requests data (not just the hash)
let (payload, _) = ExecutionPayload::from_block_unchecked(block_hash, &block);
let (version, params) = payload_to_new_payload(
payload,
sidecar,
false,
block.header.withdrawals_root,
Some(new_payload_version),
)?;
// Save payload to file with version info for replay
let payload_path =
self.output.join(format!("payload_block_{}.json", block.header.number));
let file =
GasRampPayloadFile { version: version as u8, block_hash, params: params.clone() };
let payload_json = serde_json::to_string_pretty(&file)?;
std::fs::write(&payload_path, &payload_json)?;
info!(block_number = block.header.number, path = %payload_path.display(), "Saved payload");
call_new_payload(&provider, version, params).await?;
let forkchoice_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: block_hash,
finalized_block_hash: block_hash,
};
call_forkchoice_updated(&provider, version, forkchoice_state, None).await?;
parent_header = block.header;
parent_hash = block_hash;
next_block_number += 1;
}
let final_gas_limit = parent_header.gas_limit;
info!(
total_duration=?total_benchmark_duration.elapsed(),
blocks_processed = self.blocks,
final_gas_limit,
"Benchmark complete"
);
Ok(())
}
}
const fn max_gas_limit_increase(parent_gas_limit: u64) -> u64 {
(parent_gas_limit / GAS_LIMIT_BOUND_DIVISOR).saturating_sub(1)
}

View File

@@ -0,0 +1,617 @@
//! Command for generating large blocks by packing transactions from real blocks.
//!
//! This command fetches transactions from existing blocks and packs them into a single
//! large block using the `testing_buildBlockV1` RPC endpoint.
use crate::authenticated_transport::AuthenticatedTransportConnect;
use alloy_eips::{BlockNumberOrTag, Typed2718};
use alloy_primitives::{Bytes, B256};
use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{
ExecutionPayloadEnvelopeV4, ExecutionPayloadEnvelopeV5, ForkchoiceState, JwtSecret,
PayloadAttributes,
};
use alloy_transport::layers::RetryBackoffLayer;
use clap::Parser;
use eyre::Context;
use reqwest::Url;
use reth_cli_runner::CliContext;
use reth_rpc_api::TestingBuildBlockRequestV1;
use std::future::Future;
use tokio::sync::mpsc;
use tracing::{info, warn};
/// A single transaction with its gas used and raw encoded bytes.
#[derive(Debug, Clone)]
pub struct RawTransaction {
/// The actual gas used by the transaction (from receipt).
pub gas_used: u64,
/// The transaction type (e.g., 3 for EIP-4844 blob txs).
pub tx_type: u8,
/// The raw RLP-encoded transaction bytes.
pub raw: Bytes,
}
/// Abstraction over sources of transactions for big block generation.
///
/// Implementors provide transactions from different sources (RPC, database, files, etc.)
pub trait TransactionSource {
/// Fetch transactions from a specific block number.
///
/// Returns `Ok(None)` if the block doesn't exist.
/// Returns `Ok(Some((transactions, gas_used)))` with the block's transactions and total gas.
fn fetch_block_transactions(
&self,
block_number: u64,
) -> impl Future<Output = eyre::Result<Option<(Vec<RawTransaction>, u64)>>> + Send;
}
/// RPC-based transaction source that fetches from a remote node.
#[derive(Debug)]
pub struct RpcTransactionSource {
provider: RootProvider<AnyNetwork>,
}
impl RpcTransactionSource {
/// Create a new RPC transaction source.
pub const fn new(provider: RootProvider<AnyNetwork>) -> Self {
Self { provider }
}
/// Create from an RPC URL with retry backoff.
pub fn from_url(rpc_url: &str) -> eyre::Result<Self> {
let client = ClientBuilder::default()
.layer(RetryBackoffLayer::new(10, 800, u64::MAX))
.http(rpc_url.parse()?);
let provider = RootProvider::<AnyNetwork>::new(client);
Ok(Self { provider })
}
}
impl TransactionSource for RpcTransactionSource {
async fn fetch_block_transactions(
&self,
block_number: u64,
) -> eyre::Result<Option<(Vec<RawTransaction>, u64)>> {
// Fetch block and receipts in parallel
let (block, receipts) = tokio::try_join!(
self.provider.get_block_by_number(block_number.into()).full(),
self.provider.get_block_receipts(block_number.into())
)?;
let Some(block) = block else {
return Ok(None);
};
let Some(receipts) = receipts else {
return Err(eyre::eyre!("Receipts not found for block {}", block_number));
};
let block_gas_used = block.header.gas_used;
// Convert cumulative gas from receipts to per-tx gas_used
let mut prev_cumulative = 0u64;
let transactions: Vec<RawTransaction> = block
.transactions
.txns()
.zip(receipts.iter())
.map(|(tx, receipt)| {
let cumulative = receipt.inner.inner.inner.receipt.cumulative_gas_used;
let gas_used = cumulative - prev_cumulative;
prev_cumulative = cumulative;
let with_encoded = tx.inner.inner.clone().into_encoded();
RawTransaction {
gas_used,
tx_type: tx.inner.ty(),
raw: with_encoded.encoded_bytes().clone(),
}
})
.collect();
Ok(Some((transactions, block_gas_used)))
}
}
/// Collects transactions from a source up to a target gas usage.
#[derive(Debug)]
pub struct TransactionCollector<S> {
source: S,
target_gas: u64,
}
impl<S: TransactionSource> TransactionCollector<S> {
/// Create a new transaction collector.
pub const fn new(source: S, target_gas: u64) -> Self {
Self { source, target_gas }
}
/// Collect transactions starting from the given block number.
///
/// Skips blob transactions (type 3) and collects until target gas is reached.
/// Returns the collected raw transaction bytes, total gas used, and the next block number.
pub async fn collect(&self, start_block: u64) -> eyre::Result<(Vec<Bytes>, u64, u64)> {
let mut transactions: Vec<Bytes> = Vec::new();
let mut total_gas: u64 = 0;
let mut current_block = start_block;
while total_gas < self.target_gas {
let Some((block_txs, _)) = self.source.fetch_block_transactions(current_block).await?
else {
warn!(block = current_block, "Block not found, stopping");
break;
};
for tx in block_txs {
// Skip blob transactions (EIP-4844, type 3)
if tx.tx_type == 3 {
continue;
}
if total_gas + tx.gas_used <= self.target_gas {
transactions.push(tx.raw);
total_gas += tx.gas_used;
}
if total_gas >= self.target_gas {
break;
}
}
current_block += 1;
// Stop early if remaining gas is under 1M (close enough to target)
let remaining_gas = self.target_gas.saturating_sub(total_gas);
if remaining_gas < 1_000_000 {
break;
}
}
info!(
total_txs = transactions.len(),
total_gas,
next_block = current_block,
"Finished collecting transactions"
);
Ok((transactions, total_gas, current_block))
}
}
/// `reth bench generate-big-block` command
///
/// Generates a large block by fetching transactions from existing blocks and packing them
/// into a single block using the `testing_buildBlockV1` RPC endpoint.
#[derive(Debug, Parser)]
pub struct Command {
/// The RPC URL to use for fetching blocks (can be an external archive node).
#[arg(long, value_name = "RPC_URL")]
rpc_url: String,
/// The engine RPC URL (with JWT authentication).
#[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
engine_rpc_url: String,
/// The RPC URL for `testing_buildBlockV1` calls (same node as engine, regular RPC port).
#[arg(long, value_name = "TESTING_RPC_URL", default_value = "http://localhost:8545")]
testing_rpc_url: String,
/// Path to the JWT secret file for engine API authentication.
#[arg(long, value_name = "JWT_SECRET")]
jwt_secret: std::path::PathBuf,
/// Target gas to pack into the block.
#[arg(long, value_name = "TARGET_GAS", default_value = "30000000")]
target_gas: u64,
/// Starting block number to fetch transactions from.
/// If not specified, starts from the engine's latest block.
#[arg(long, value_name = "FROM_BLOCK")]
from_block: Option<u64>,
/// Execute the payload (call newPayload + forkchoiceUpdated).
/// If false, only builds the payload and prints it.
#[arg(long, default_value = "false")]
execute: bool,
/// Number of payloads to generate. Each payload uses the previous as parent.
/// When count == 1, the payload is only generated and saved, not executed.
/// When count > 1, each payload is executed before building the next.
#[arg(long, default_value = "1")]
count: u64,
/// Number of transaction batches to prefetch in background when count > 1.
/// Higher values reduce latency but use more memory.
#[arg(long, default_value = "4")]
prefetch_buffer: usize,
/// Output directory for generated payloads. Each payload is saved as `payload_block_N.json`.
#[arg(long, value_name = "OUTPUT_DIR")]
output_dir: std::path::PathBuf,
}
/// A built payload ready for execution.
struct BuiltPayload {
block_number: u64,
envelope: ExecutionPayloadEnvelopeV4,
block_hash: B256,
timestamp: u64,
}
impl Command {
/// Execute the `generate-big-block` command
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
info!(target_gas = self.target_gas, count = self.count, "Generating big block(s)");
// Set up authenticated engine provider
let jwt =
std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
let jwt = JwtSecret::from_hex(jwt.trim())?;
let auth_url = Url::parse(&self.engine_rpc_url)?;
info!("Connecting to Engine RPC at {}", auth_url);
let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
// Set up testing RPC provider (for testing_buildBlockV1)
info!("Connecting to Testing RPC at {}", self.testing_rpc_url);
let testing_client = ClientBuilder::default()
.layer(RetryBackoffLayer::new(10, 800, u64::MAX))
.http(self.testing_rpc_url.parse()?);
let testing_provider = RootProvider::<AnyNetwork>::new(testing_client);
// Get the parent block (latest canonical block)
info!(endpoint = "engine", method = "eth_getBlockByNumber", block = "latest", "RPC call");
let parent_block = auth_provider
.get_block_by_number(BlockNumberOrTag::Latest)
.await?
.ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
let parent_hash = parent_block.header.hash;
let parent_number = parent_block.header.number;
let parent_timestamp = parent_block.header.timestamp;
info!(
parent_hash = %parent_hash,
parent_number = parent_number,
"Using initial parent block"
);
// Create output directory
std::fs::create_dir_all(&self.output_dir).wrap_err_with(|| {
format!("Failed to create output directory: {:?}", self.output_dir)
})?;
let start_block = self.from_block.unwrap_or(parent_number);
// Use pipelined execution when generating multiple payloads
if self.count > 1 {
self.execute_pipelined(
&auth_provider,
&testing_provider,
start_block,
parent_hash,
parent_timestamp,
)
.await?;
} else {
// Single payload - collect transactions and build
let tx_source = RpcTransactionSource::from_url(&self.rpc_url)?;
let collector = TransactionCollector::new(tx_source, self.target_gas);
let (transactions, _total_gas, _next_block) = collector.collect(start_block).await?;
if transactions.is_empty() {
return Err(eyre::eyre!("No transactions collected"));
}
self.execute_sequential(
&auth_provider,
&testing_provider,
transactions,
parent_hash,
parent_timestamp,
)
.await?;
}
info!(count = self.count, output_dir = %self.output_dir.display(), "All payloads generated");
Ok(())
}
/// Sequential execution path for single payload or no-execute mode.
async fn execute_sequential(
&self,
auth_provider: &RootProvider<AnyNetwork>,
testing_provider: &RootProvider<AnyNetwork>,
transactions: Vec<Bytes>,
mut parent_hash: B256,
mut parent_timestamp: u64,
) -> eyre::Result<()> {
for i in 0..self.count {
info!(
payload = i + 1,
total = self.count,
parent_hash = %parent_hash,
parent_timestamp = parent_timestamp,
"Building payload via testing_buildBlockV1"
);
let built = self
.build_payload(testing_provider, &transactions, i, parent_hash, parent_timestamp)
.await?;
self.save_payload(&built)?;
if self.execute || self.count > 1 {
info!(payload = i + 1, block_hash = %built.block_hash, "Executing payload (newPayload + FCU)");
self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?;
info!(payload = i + 1, "Payload executed successfully");
}
parent_hash = built.block_hash;
parent_timestamp = built.timestamp;
}
Ok(())
}
/// Pipelined execution - fetches transactions and builds payloads in background.
async fn execute_pipelined(
&self,
auth_provider: &RootProvider<AnyNetwork>,
testing_provider: &RootProvider<AnyNetwork>,
start_block: u64,
initial_parent_hash: B256,
initial_parent_timestamp: u64,
) -> eyre::Result<()> {
// Create channel for transaction batches (one batch per payload)
let (tx_sender, mut tx_receiver) = mpsc::channel::<Vec<Bytes>>(self.prefetch_buffer);
// Spawn background task to continuously fetch transaction batches
let rpc_url = self.rpc_url.clone();
let target_gas = self.target_gas;
let count = self.count;
let fetcher_handle = tokio::spawn(async move {
let tx_source = match RpcTransactionSource::from_url(&rpc_url) {
Ok(source) => source,
Err(e) => {
warn!(error = %e, "Failed to create transaction source");
return;
}
};
let collector = TransactionCollector::new(tx_source, target_gas);
let mut current_block = start_block;
for payload_idx in 0..count {
match collector.collect(current_block).await {
Ok((transactions, total_gas, next_block)) => {
info!(
payload = payload_idx + 1,
tx_count = transactions.len(),
total_gas,
blocks = format!("{}..{}", current_block, next_block),
"Fetched transactions"
);
current_block = next_block;
if tx_sender.send(transactions).await.is_err() {
break;
}
}
Err(e) => {
warn!(payload = payload_idx + 1, error = %e, "Failed to fetch transactions");
break;
}
}
}
});
let mut parent_hash = initial_parent_hash;
let mut parent_timestamp = initial_parent_timestamp;
let mut pending_build: Option<tokio::task::JoinHandle<eyre::Result<BuiltPayload>>> = None;
for i in 0..self.count {
let is_last = i == self.count - 1;
// Get current payload (either from pending build or build now)
let current_payload = if let Some(handle) = pending_build.take() {
handle.await??
} else {
// First payload - wait for transactions and build synchronously
let transactions = tx_receiver
.recv()
.await
.ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
if transactions.is_empty() {
return Err(eyre::eyre!("No transactions collected for payload {}", i + 1));
}
info!(
payload = i + 1,
total = self.count,
parent_hash = %parent_hash,
parent_timestamp = parent_timestamp,
tx_count = transactions.len(),
"Building payload via testing_buildBlockV1"
);
self.build_payload(
testing_provider,
&transactions,
i,
parent_hash,
parent_timestamp,
)
.await?
};
self.save_payload(&current_payload)?;
let current_block_hash = current_payload.block_hash;
let current_timestamp = current_payload.timestamp;
// Execute current payload first
info!(payload = i + 1, block_hash = %current_block_hash, "Executing payload (newPayload + FCU)");
self.execute_payload_v4(auth_provider, current_payload.envelope, parent_hash).await?;
info!(payload = i + 1, "Payload executed successfully");
// Start building next payload in background (if not last) - AFTER execution
if !is_last {
// Get transactions for next payload (should already be fetched or fetching)
let next_transactions = tx_receiver
.recv()
.await
.ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
if next_transactions.is_empty() {
return Err(eyre::eyre!("No transactions collected for payload {}", i + 2));
}
let testing_provider = testing_provider.clone();
let next_index = i + 1;
let total = self.count;
pending_build = Some(tokio::spawn(async move {
info!(
payload = next_index + 1,
total = total,
parent_hash = %current_block_hash,
parent_timestamp = current_timestamp,
tx_count = next_transactions.len(),
"Building payload via testing_buildBlockV1"
);
Self::build_payload_static(
&testing_provider,
&next_transactions,
next_index,
current_block_hash,
current_timestamp,
)
.await
}));
}
parent_hash = current_block_hash;
parent_timestamp = current_timestamp;
}
// Clean up the fetcher task
drop(tx_receiver);
let _ = fetcher_handle.await;
Ok(())
}
/// Build a single payload via `testing_buildBlockV1`.
async fn build_payload(
&self,
testing_provider: &RootProvider<AnyNetwork>,
transactions: &[Bytes],
index: u64,
parent_hash: B256,
parent_timestamp: u64,
) -> eyre::Result<BuiltPayload> {
Self::build_payload_static(
testing_provider,
transactions,
index,
parent_hash,
parent_timestamp,
)
.await
}
/// Static version for use in spawned tasks.
async fn build_payload_static(
testing_provider: &RootProvider<AnyNetwork>,
transactions: &[Bytes],
index: u64,
parent_hash: B256,
parent_timestamp: u64,
) -> eyre::Result<BuiltPayload> {
let request = TestingBuildBlockRequestV1 {
parent_block_hash: parent_hash,
payload_attributes: PayloadAttributes {
timestamp: parent_timestamp + 12,
prev_randao: B256::ZERO,
suggested_fee_recipient: alloy_primitives::Address::ZERO,
withdrawals: Some(vec![]),
parent_beacon_block_root: Some(B256::ZERO),
},
transactions: transactions.to_vec(),
extra_data: None,
};
let total_tx_bytes: usize = transactions.iter().map(|tx| tx.len()).sum();
info!(
payload = index + 1,
tx_count = transactions.len(),
total_tx_bytes = total_tx_bytes,
parent_hash = %parent_hash,
"Sending to testing_buildBlockV1"
);
let envelope: ExecutionPayloadEnvelopeV5 =
testing_provider.client().request("testing_buildBlockV1", [request]).await?;
let v4_envelope = envelope.try_into_v4()?;
let inner = &v4_envelope.envelope_inner.execution_payload.payload_inner.payload_inner;
let block_hash = inner.block_hash;
let block_number = inner.block_number;
let timestamp = inner.timestamp;
Ok(BuiltPayload { block_number, envelope: v4_envelope, block_hash, timestamp })
}
/// Save a payload to disk.
fn save_payload(&self, payload: &BuiltPayload) -> eyre::Result<()> {
let filename = format!("payload_block_{}.json", payload.block_number);
let filepath = self.output_dir.join(&filename);
let json = serde_json::to_string_pretty(&payload.envelope)?;
std::fs::write(&filepath, &json)
.wrap_err_with(|| format!("Failed to write payload to {:?}", filepath))?;
info!(block_number = payload.block_number, block_hash = %payload.block_hash, path = %filepath.display(), "Payload saved");
Ok(())
}
async fn execute_payload_v4(
&self,
provider: &RootProvider<AnyNetwork>,
envelope: ExecutionPayloadEnvelopeV4,
parent_hash: B256,
) -> eyre::Result<()> {
let block_hash =
envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
let status = provider
.new_payload_v4(
envelope.envelope_inner.execution_payload,
vec![],
B256::ZERO,
envelope.execution_requests.to_vec(),
)
.await?;
if !status.is_valid() {
return Err(eyre::eyre!("Payload rejected: {:?}", status));
}
let fcu_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?;
if !fcu_result.is_valid() {
return Err(eyre::eyre!("FCU rejected: {:?}", fcu_result));
}
Ok(())
}
}

View File

@@ -0,0 +1,196 @@
//! Common helpers for reth-bench commands.
use crate::valid_payload::call_forkchoice_updated;
use alloy_consensus::Header;
use alloy_eips::eip4844::kzg_to_versioned_hash;
use alloy_primitives::{Address, B256};
use alloy_provider::{ext::EngineApi, network::AnyNetwork, RootProvider};
use alloy_rpc_types_engine::{
CancunPayloadFields, ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState,
PayloadAttributes, PayloadId, PraguePayloadFields,
};
use eyre::OptionExt;
use reth_chainspec::{ChainSpec, EthereumHardforks};
use reth_node_api::EngineApiMessageVersion;
use tracing::debug;
/// Prepared payload request data for triggering block building.
pub(crate) struct PayloadRequest {
/// The payload attributes for the new block.
pub(crate) attributes: PayloadAttributes,
/// The forkchoice state pointing to the parent block.
pub(crate) forkchoice_state: ForkchoiceState,
/// The engine API version for FCU calls.
pub(crate) fcu_version: EngineApiMessageVersion,
/// The getPayload version to use (1-5).
pub(crate) get_payload_version: u8,
/// The newPayload version to use.
pub(crate) new_payload_version: EngineApiMessageVersion,
}
/// Prepare payload attributes and forkchoice state for a new block.
pub(crate) fn prepare_payload_request(
chain_spec: &ChainSpec,
timestamp: u64,
parent_hash: B256,
) -> PayloadRequest {
let shanghai_active = chain_spec.is_shanghai_active_at_timestamp(timestamp);
let cancun_active = chain_spec.is_cancun_active_at_timestamp(timestamp);
let prague_active = chain_spec.is_prague_active_at_timestamp(timestamp);
let osaka_active = chain_spec.is_osaka_active_at_timestamp(timestamp);
// FCU version: V3 for Cancun+Prague+Osaka, V2 for Shanghai, V1 otherwise
let fcu_version = if cancun_active {
EngineApiMessageVersion::V3
} else if shanghai_active {
EngineApiMessageVersion::V2
} else {
EngineApiMessageVersion::V1
};
// getPayload version: 5 for Osaka, 4 for Prague, 3 for Cancun, 2 for Shanghai, 1 otherwise
// newPayload version: 4 for Prague+Osaka (no V5), 3 for Cancun, 2 for Shanghai, 1 otherwise
let (get_payload_version, new_payload_version) = if osaka_active {
(5, EngineApiMessageVersion::V4) // Osaka uses getPayloadV5 but newPayloadV4
} else if prague_active {
(4, EngineApiMessageVersion::V4)
} else if cancun_active {
(3, EngineApiMessageVersion::V3)
} else if shanghai_active {
(2, EngineApiMessageVersion::V2)
} else {
(1, EngineApiMessageVersion::V1)
};
PayloadRequest {
attributes: PayloadAttributes {
timestamp,
prev_randao: B256::ZERO,
suggested_fee_recipient: Address::ZERO,
withdrawals: shanghai_active.then(Vec::new),
parent_beacon_block_root: cancun_active.then_some(B256::ZERO),
},
forkchoice_state: ForkchoiceState {
head_block_hash: parent_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
},
fcu_version,
get_payload_version,
new_payload_version,
}
}
/// Trigger payload building via FCU and retrieve the built payload.
///
/// This sends a forkchoiceUpdated with payload attributes to start building,
/// then calls getPayload to retrieve the result.
pub(crate) async fn build_payload(
provider: &RootProvider<AnyNetwork>,
request: PayloadRequest,
) -> eyre::Result<(ExecutionPayload, ExecutionPayloadSidecar)> {
let fcu_result = call_forkchoice_updated(
provider,
request.fcu_version,
request.forkchoice_state,
Some(request.attributes.clone()),
)
.await?;
let payload_id =
fcu_result.payload_id.ok_or_eyre("Payload builder did not return a payload id")?;
get_payload_with_sidecar(
provider,
request.get_payload_version,
payload_id,
request.attributes.parent_beacon_block_root,
)
.await
}
/// Convert an RPC block to a consensus header and block hash.
pub(crate) fn rpc_block_to_header(block: alloy_provider::network::AnyRpcBlock) -> (Header, B256) {
let block_hash = block.header.hash;
let header = block.header.inner.clone().into_header_with_defaults();
(header, block_hash)
}
/// Compute versioned hashes from KZG commitments.
fn versioned_hashes_from_commitments(
commitments: &[alloy_primitives::FixedBytes<48>],
) -> Vec<B256> {
commitments.iter().map(|c| kzg_to_versioned_hash(c.as_ref())).collect()
}
/// Fetch an execution payload using the appropriate engine API version.
pub(crate) async fn get_payload_with_sidecar(
provider: &RootProvider<AnyNetwork>,
version: u8,
payload_id: PayloadId,
parent_beacon_block_root: Option<B256>,
) -> eyre::Result<(ExecutionPayload, ExecutionPayloadSidecar)> {
debug!(get_payload_version = ?version, ?payload_id, "Sending getPayload");
match version {
1 => {
let payload = provider.get_payload_v1(payload_id).await?;
Ok((ExecutionPayload::V1(payload), ExecutionPayloadSidecar::none()))
}
2 => {
let envelope = provider.get_payload_v2(payload_id).await?;
let payload = match envelope.execution_payload {
alloy_rpc_types_engine::ExecutionPayloadFieldV2::V1(p) => ExecutionPayload::V1(p),
alloy_rpc_types_engine::ExecutionPayloadFieldV2::V2(p) => ExecutionPayload::V2(p),
};
Ok((payload, ExecutionPayloadSidecar::none()))
}
3 => {
let envelope = provider.get_payload_v3(payload_id).await?;
let versioned_hashes =
versioned_hashes_from_commitments(&envelope.blobs_bundle.commitments);
let cancun_fields = CancunPayloadFields {
parent_beacon_block_root: parent_beacon_block_root
.ok_or_eyre("parent_beacon_block_root required for V3")?,
versioned_hashes,
};
Ok((
ExecutionPayload::V3(envelope.execution_payload),
ExecutionPayloadSidecar::v3(cancun_fields),
))
}
4 => {
let envelope = provider.get_payload_v4(payload_id).await?;
let versioned_hashes = versioned_hashes_from_commitments(
&envelope.envelope_inner.blobs_bundle.commitments,
);
let cancun_fields = CancunPayloadFields {
parent_beacon_block_root: parent_beacon_block_root
.ok_or_eyre("parent_beacon_block_root required for V4")?,
versioned_hashes,
};
let prague_fields = PraguePayloadFields::new(envelope.execution_requests);
Ok((
ExecutionPayload::V3(envelope.envelope_inner.execution_payload),
ExecutionPayloadSidecar::v4(cancun_fields, prague_fields),
))
}
5 => {
// V5 (Osaka) - use raw request since alloy doesn't have get_payload_v5 yet
let envelope = provider.get_payload_v5(payload_id).await?;
let versioned_hashes =
versioned_hashes_from_commitments(&envelope.blobs_bundle.commitments);
let cancun_fields = CancunPayloadFields {
parent_beacon_block_root: parent_beacon_block_root
.ok_or_eyre("parent_beacon_block_root required for V5")?,
versioned_hashes,
};
let prague_fields = PraguePayloadFields::new(envelope.execution_requests);
Ok((
ExecutionPayload::V3(envelope.execution_payload),
ExecutionPayloadSidecar::v4(cancun_fields, prague_fields),
))
}
_ => panic!("This tool does not support getPayload versions past v5"),
}
}

View File

@@ -6,9 +6,16 @@ use reth_node_core::args::LogArgs;
use reth_tracing::FileWorkerGuard;
mod context;
mod gas_limit_ramp;
mod generate_big_block;
pub(crate) mod helpers;
pub use generate_big_block::{
RawTransaction, RpcTransactionSource, TransactionCollector, TransactionSource,
};
mod new_payload_fcu;
mod new_payload_only;
mod output;
mod replay_payloads;
mod send_payload;
/// `reth bench` command
@@ -27,6 +34,9 @@ pub enum Subcommands {
/// Benchmark which calls `newPayload`, then `forkchoiceUpdated`.
NewPayloadFcu(new_payload_fcu::Command),
/// Benchmark which builds empty blocks with a ramped gas limit.
GasLimitRamp(gas_limit_ramp::Command),
/// Benchmark which only calls subsequent `newPayload` calls.
NewPayloadOnly(new_payload_only::Command),
@@ -41,6 +51,29 @@ pub enum Subcommands {
/// `cast block latest --full --json | reth-bench send-payload --rpc-url localhost:5000
/// --jwt-secret $(cat ~/.local/share/reth/mainnet/jwt.hex)`
SendPayload(send_payload::Command),
/// Generate a large block by packing transactions from existing blocks.
///
/// This command fetches transactions from real blocks and packs them into a single
/// block using the `testing_buildBlockV1` RPC endpoint.
///
/// Example:
///
/// `reth-bench generate-big-block --rpc-url http://localhost:8545 --engine-rpc-url
/// http://localhost:8551 --jwt-secret ~/.local/share/reth/mainnet/jwt.hex --target-gas
/// 30000000`
GenerateBigBlock(generate_big_block::Command),
/// Replay pre-generated payloads from a directory.
///
/// This command reads payload files from a previous `generate-big-block` run and replays
/// them in sequence using `newPayload` followed by `forkchoiceUpdated`.
///
/// Example:
///
/// `reth-bench replay-payloads --payload-dir ./payloads --engine-rpc-url
/// http://localhost:8551 --jwt-secret ~/.local/share/reth/mainnet/jwt.hex`
ReplayPayloads(replay_payloads::Command),
}
impl BenchmarkCommand {
@@ -51,8 +84,11 @@ impl BenchmarkCommand {
match self.command {
Subcommands::NewPayloadFcu(command) => command.execute(ctx).await,
Subcommands::GasLimitRamp(command) => command.execute(ctx).await,
Subcommands::NewPayloadOnly(command) => command.execute(ctx).await,
Subcommands::SendPayload(command) => command.execute(ctx).await,
Subcommands::GenerateBigBlock(command) => command.execute(ctx).await,
Subcommands::ReplayPayloads(command) => command.execute(ctx).await,
}
}

View File

@@ -13,8 +13,7 @@ use crate::{
bench::{
context::BenchContext,
output::{
CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX,
GAS_OUTPUT_SUFFIX,
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
},
},
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
@@ -27,7 +26,6 @@ use alloy_rpc_client::RpcClient;
use alloy_rpc_types_engine::ForkchoiceState;
use alloy_transport_ws::WsConnect;
use clap::Parser;
use csv::Writer;
use eyre::{Context, OptionExt};
use futures::StreamExt;
use humantime::parse_duration;
@@ -123,6 +121,7 @@ impl Command {
auth_provider,
mut next_block,
is_optimism,
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let buffer_size = self.rpc_block_buffer_size;
@@ -188,6 +187,7 @@ impl Command {
result
} {
let gas_used = block.header.gas_used;
let gas_limit = block.header.gas_limit;
let block_number = block.header.number;
let transaction_count = block.transactions.len() as u64;
@@ -211,6 +211,7 @@ impl Command {
let fcu_latency = total_latency - new_payload_result.latency;
let combined_result = CombinedResult {
block_number,
gas_limit,
transaction_count,
new_payload_result,
fcu_latency,
@@ -240,28 +241,11 @@ impl Command {
// since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence.
drop(waiter);
let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
results.into_iter().unzip();
// Write CSV output files
if let Some(ref path) = self.benchmark.output {
let output_path = path.join(COMBINED_OUTPUT_SUFFIX);
info!("Writing engine api call latency output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;
for result in combined_results {
writer.serialize(result)?;
}
writer.flush()?;
let output_path = path.join(GAS_OUTPUT_SUFFIX);
info!("Writing total gas output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;
for row in &gas_output_results {
writer.serialize(row)?;
}
writer.flush()?;
info!("Finished writing benchmark output files to {:?}.", path);
write_benchmark_results(path, &gas_output_results, combined_results)?;
}
let gas_output = TotalGasOutput::new(gas_output_results)?;

View File

@@ -49,6 +49,7 @@ impl Command {
auth_provider,
mut next_block,
is_optimism,
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let buffer_size = self.rpc_block_buffer_size;
@@ -96,11 +97,7 @@ impl Command {
let transaction_count = block.transactions.len() as u64;
let gas_used = block.header.gas_used;
debug!(
target: "reth-bench",
number=?block.header.number,
"Sending payload to engine",
);
debug!(number=?block.header.number, "Sending payload to engine");
let (version, params) = block_to_new_payload(block, is_optimism)?;

View File

@@ -1,10 +1,13 @@
//! Contains various benchmark output formats, either for logging or for
//! serialization to / from files.
use alloy_primitives::B256;
use csv::Writer;
use eyre::OptionExt;
use reth_primitives_traits::constants::GIGAGAS;
use serde::{ser::SerializeStruct, Serialize};
use std::time::Duration;
use serde::{ser::SerializeStruct, Deserialize, Serialize};
use std::{path::Path, time::Duration};
use tracing::info;
/// This is the suffix for gas output csv files.
pub(crate) const GAS_OUTPUT_SUFFIX: &str = "total_gas.csv";
@@ -15,6 +18,17 @@ pub(crate) const COMBINED_OUTPUT_SUFFIX: &str = "combined_latency.csv";
/// This is the suffix for new payload output csv files.
pub(crate) const NEW_PAYLOAD_OUTPUT_SUFFIX: &str = "new_payload_latency.csv";
/// Serialized format for gas ramp payloads on disk.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct GasRampPayloadFile {
/// Engine API version (1-5).
pub(crate) version: u8,
/// The block hash for FCU.
pub(crate) block_hash: B256,
/// The params to pass to newPayload.
pub(crate) params: serde_json::Value,
}
/// This represents the results of a single `newPayload` call in the benchmark, containing the gas
/// used and the `newPayload` latency.
#[derive(Debug)]
@@ -67,6 +81,8 @@ impl Serialize for NewPayloadResult {
pub(crate) struct CombinedResult {
/// The block number of the block being processed.
pub(crate) block_number: u64,
/// The gas limit of the block.
pub(crate) gas_limit: u64,
/// The number of transactions in the block.
pub(crate) transaction_count: u64,
/// The `newPayload` result.
@@ -88,7 +104,7 @@ impl std::fmt::Display for CombinedResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Payload {} processed at {:.4} Ggas/s, used {} total gas. Combined gas per second: {:.4} Ggas/s. fcu latency: {:?}, newPayload latency: {:?}",
"Block {} processed at {:.4} Ggas/s, used {} total gas. Combined: {:.4} Ggas/s. fcu: {:?}, newPayload: {:?}",
self.block_number,
self.new_payload_result.gas_per_second() / GIGAGAS as f64,
self.new_payload_result.gas_used,
@@ -110,10 +126,11 @@ impl Serialize for CombinedResult {
let fcu_latency = self.fcu_latency.as_micros();
let new_payload_latency = self.new_payload_result.latency.as_micros();
let total_latency = self.total_latency.as_micros();
let mut state = serializer.serialize_struct("CombinedResult", 6)?;
let mut state = serializer.serialize_struct("CombinedResult", 7)?;
// flatten the new payload result because this is meant for CSV writing
state.serialize_field("block_number", &self.block_number)?;
state.serialize_field("gas_limit", &self.gas_limit)?;
state.serialize_field("transaction_count", &self.transaction_count)?;
state.serialize_field("gas_used", &self.new_payload_result.gas_used)?;
state.serialize_field("new_payload_latency", &new_payload_latency)?;
@@ -167,6 +184,36 @@ impl TotalGasOutput {
}
}
/// Write benchmark results to CSV files.
///
/// Writes two files to the output directory:
/// - `combined_latency.csv`: Per-block latency results
/// - `total_gas.csv`: Per-block gas usage over time
pub(crate) fn write_benchmark_results(
output_dir: &Path,
gas_results: &[TotalGasRow],
combined_results: Vec<CombinedResult>,
) -> eyre::Result<()> {
let output_path = output_dir.join(COMBINED_OUTPUT_SUFFIX);
info!("Writing engine api call latency output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;
for result in combined_results {
writer.serialize(result)?;
}
writer.flush()?;
let output_path = output_dir.join(GAS_OUTPUT_SUFFIX);
info!("Writing total gas output to file: {:?}", output_path);
let mut writer = Writer::from_path(&output_path)?;
for row in gas_results {
writer.serialize(row)?;
}
writer.flush()?;
info!("Finished writing benchmark output files to {:?}.", output_dir);
Ok(())
}
/// This serializes the `time` field of the [`TotalGasRow`] to microseconds.
///
/// This is essentially just for the csv writer, which would have headers

View File

@@ -0,0 +1,332 @@
//! Command for replaying pre-generated payloads from disk.
//!
//! This command reads `ExecutionPayloadEnvelopeV4` files from a directory and replays them
//! in sequence using `newPayload` followed by `forkchoiceUpdated`.
use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::output::GasRampPayloadFile,
valid_payload::{call_forkchoice_updated, call_new_payload},
};
use alloy_primitives::B256;
use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
use clap::Parser;
use eyre::Context;
use reqwest::Url;
use reth_cli_runner::CliContext;
use reth_node_api::EngineApiMessageVersion;
use std::path::PathBuf;
use tracing::{debug, info};
/// `reth bench replay-payloads` command
///
/// Replays pre-generated payloads from a directory by calling `newPayload` followed by
/// `forkchoiceUpdated` for each payload in sequence.
#[derive(Debug, Parser)]
pub struct Command {
/// The engine RPC URL (with JWT authentication).
#[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
engine_rpc_url: String,
/// Path to the JWT secret file for engine API authentication.
#[arg(long, value_name = "JWT_SECRET")]
jwt_secret: PathBuf,
/// Directory containing payload files (`payload_block_N.json`).
#[arg(long, value_name = "PAYLOAD_DIR")]
payload_dir: PathBuf,
/// Optional limit on the number of payloads to replay.
/// If not specified, replays all payloads in the directory.
#[arg(long, value_name = "COUNT")]
count: Option<usize>,
/// Skip the first N payloads.
#[arg(long, value_name = "SKIP", default_value = "0")]
skip: usize,
/// Optional directory containing gas ramp payloads to replay first.
/// These are replayed before the main payloads to warm up the gas limit.
#[arg(long, value_name = "GAS_RAMP_DIR")]
gas_ramp_dir: Option<PathBuf>,
}
/// A loaded payload ready for execution.
struct LoadedPayload {
/// The index (from filename).
index: u64,
/// The payload envelope.
envelope: ExecutionPayloadEnvelopeV4,
/// The block hash.
block_hash: B256,
}
/// A gas ramp payload loaded from disk.
struct GasRampPayload {
/// Block number from filename.
block_number: u64,
/// Engine API version for newPayload.
version: EngineApiMessageVersion,
/// The file contents.
file: GasRampPayloadFile,
}
impl Command {
/// Execute the `replay-payloads` command.
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
info!(payload_dir = %self.payload_dir.display(), "Replaying payloads");
// Set up authenticated engine provider
let jwt =
std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
let jwt = JwtSecret::from_hex(jwt.trim())?;
let auth_url = Url::parse(&self.engine_rpc_url)?;
info!("Connecting to Engine RPC at {}", auth_url);
let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
// Get parent block (latest canonical block) - we need this for the first FCU
let parent_block = auth_provider
.get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
.await?
.ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
let initial_parent_hash = parent_block.header.hash;
let initial_parent_number = parent_block.header.number;
info!(
parent_hash = %initial_parent_hash,
parent_number = initial_parent_number,
"Using initial parent block"
);
// Load all payloads upfront to avoid I/O delays between phases
let gas_ramp_payloads = if let Some(ref gas_ramp_dir) = self.gas_ramp_dir {
let payloads = self.load_gas_ramp_payloads(gas_ramp_dir)?;
if payloads.is_empty() {
return Err(eyre::eyre!("No gas ramp payload files found in {:?}", gas_ramp_dir));
}
info!(count = payloads.len(), "Loaded gas ramp payloads from disk");
payloads
} else {
Vec::new()
};
let payloads = self.load_payloads()?;
if payloads.is_empty() {
return Err(eyre::eyre!("No payload files found in {:?}", self.payload_dir));
}
info!(count = payloads.len(), "Loaded main payloads from disk");
let mut parent_hash = initial_parent_hash;
// Replay gas ramp payloads first
for (i, payload) in gas_ramp_payloads.iter().enumerate() {
info!(
gas_ramp_payload = i + 1,
total = gas_ramp_payloads.len(),
block_number = payload.block_number,
block_hash = %payload.file.block_hash,
"Executing gas ramp payload (newPayload + FCU)"
);
call_new_payload(&auth_provider, payload.version, payload.file.params.clone()).await?;
let fcu_state = ForkchoiceState {
head_block_hash: payload.file.block_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
call_forkchoice_updated(&auth_provider, payload.version, fcu_state, None).await?;
info!(gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
parent_hash = payload.file.block_hash;
}
if !gas_ramp_payloads.is_empty() {
info!(count = gas_ramp_payloads.len(), "All gas ramp payloads replayed");
}
for (i, payload) in payloads.iter().enumerate() {
info!(
payload = i + 1,
total = payloads.len(),
index = payload.index,
block_hash = %payload.block_hash,
"Executing payload (newPayload + FCU)"
);
self.execute_payload_v4(&auth_provider, &payload.envelope, parent_hash).await?;
info!(payload = i + 1, "Payload executed successfully");
parent_hash = payload.block_hash;
}
info!(count = payloads.len(), "All payloads replayed successfully");
Ok(())
}
/// Load and parse all payload files from the directory.
fn load_payloads(&self) -> eyre::Result<Vec<LoadedPayload>> {
let mut payloads = Vec::new();
// Read directory entries
let entries: Vec<_> = std::fs::read_dir(&self.payload_dir)
.wrap_err_with(|| format!("Failed to read directory {:?}", self.payload_dir))?
.filter_map(|e| e.ok())
.filter(|e| {
e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
e.file_name().to_string_lossy().starts_with("payload_")
})
.collect();
// Parse filenames to get indices and sort
let mut indexed_paths: Vec<(u64, PathBuf)> = entries
.into_iter()
.filter_map(|e| {
let name = e.file_name();
let name_str = name.to_string_lossy();
// Extract index from "payload_NNN.json"
let index_str = name_str.strip_prefix("payload_")?.strip_suffix(".json")?;
let index: u64 = index_str.parse().ok()?;
Some((index, e.path()))
})
.collect();
indexed_paths.sort_by_key(|(idx, _)| *idx);
// Apply skip and count
let indexed_paths: Vec<_> = indexed_paths.into_iter().skip(self.skip).collect();
let indexed_paths: Vec<_> = match self.count {
Some(count) => indexed_paths.into_iter().take(count).collect(),
None => indexed_paths,
};
// Load each payload
for (index, path) in indexed_paths {
let content = std::fs::read_to_string(&path)
.wrap_err_with(|| format!("Failed to read {:?}", path))?;
let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content)
.wrap_err_with(|| format!("Failed to parse {:?}", path))?;
let block_hash =
envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
info!(
index = index,
block_hash = %block_hash,
path = %path.display(),
"Loaded payload"
);
payloads.push(LoadedPayload { index, envelope, block_hash });
}
Ok(payloads)
}
/// Load and parse gas ramp payload files from a directory.
fn load_gas_ramp_payloads(&self, dir: &PathBuf) -> eyre::Result<Vec<GasRampPayload>> {
let mut payloads = Vec::new();
let entries: Vec<_> = std::fs::read_dir(dir)
.wrap_err_with(|| format!("Failed to read directory {:?}", dir))?
.filter_map(|e| e.ok())
.filter(|e| {
e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
e.file_name().to_string_lossy().starts_with("payload_block_")
})
.collect();
// Parse filenames to get block numbers and sort
let mut indexed_paths: Vec<(u64, PathBuf)> = entries
.into_iter()
.filter_map(|e| {
let name = e.file_name();
let name_str = name.to_string_lossy();
// Extract block number from "payload_block_NNN.json"
let block_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
let block_number: u64 = block_str.parse().ok()?;
Some((block_number, e.path()))
})
.collect();
indexed_paths.sort_by_key(|(num, _)| *num);
for (block_number, path) in indexed_paths {
let content = std::fs::read_to_string(&path)
.wrap_err_with(|| format!("Failed to read {:?}", path))?;
let file: GasRampPayloadFile = serde_json::from_str(&content)
.wrap_err_with(|| format!("Failed to parse {:?}", path))?;
let version = match file.version {
1 => EngineApiMessageVersion::V1,
2 => EngineApiMessageVersion::V2,
3 => EngineApiMessageVersion::V3,
4 => EngineApiMessageVersion::V4,
5 => EngineApiMessageVersion::V5,
v => return Err(eyre::eyre!("Invalid version {} in {:?}", v, path)),
};
info!(
block_number,
block_hash = %file.block_hash,
path = %path.display(),
"Loaded gas ramp payload"
);
payloads.push(GasRampPayload { block_number, version, file });
}
Ok(payloads)
}
async fn execute_payload_v4(
&self,
provider: &RootProvider<AnyNetwork>,
envelope: &ExecutionPayloadEnvelopeV4,
parent_hash: B256,
) -> eyre::Result<()> {
let block_hash =
envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
debug!(
method = "engine_newPayloadV4",
block_hash = %block_hash,
"Sending newPayload"
);
let status = provider
.new_payload_v4(
envelope.envelope_inner.execution_payload.clone(),
vec![],
B256::ZERO,
envelope.execution_requests.to_vec(),
)
.await?;
info!(?status, "newPayloadV4 response");
if !status.is_valid() {
return Err(eyre::eyre!("Payload rejected: {:?}", status));
}
let fcu_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
debug!(method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?;
info!(?fcu_result, "forkchoiceUpdatedV3 response");
Ok(())
}
}

View File

@@ -3,15 +3,16 @@
//! before sending additional calls.
use alloy_eips::eip7685::Requests;
use alloy_primitives::B256;
use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadInputV2, ForkchoiceState, ForkchoiceUpdated,
PayloadAttributes, PayloadStatus,
ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ForkchoiceState,
ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
};
use alloy_transport::TransportResult;
use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
use reth_node_api::EngineApiMessageVersion;
use tracing::error;
use tracing::{debug, error};
/// An extension trait for providers that implement the engine API, to wait for a VALID response.
#[async_trait::async_trait]
@@ -52,6 +53,13 @@ where
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
debug!(
method = "engine_forkchoiceUpdatedV1",
?fork_choice_state,
?payload_attributes,
"Sending forkchoiceUpdated"
);
let mut status =
self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
@@ -82,6 +90,13 @@ where
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
debug!(
method = "engine_forkchoiceUpdatedV2",
?fork_choice_state,
?payload_attributes,
"Sending forkchoiceUpdated"
);
let mut status =
self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
@@ -112,6 +127,13 @@ where
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
debug!(
method = "engine_forkchoiceUpdatedV3",
?fork_choice_state,
?payload_attributes,
"Sending forkchoiceUpdated"
);
let mut status =
self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
@@ -148,33 +170,47 @@ pub(crate) fn block_to_new_payload(
// Convert to execution payload
let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)
}
pub(crate) fn payload_to_new_payload(
payload: ExecutionPayload,
sidecar: ExecutionPayloadSidecar,
is_optimism: bool,
withdrawals_root: Option<B256>,
target_version: Option<EngineApiMessageVersion>,
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
let (version, params) = match payload {
ExecutionPayload::V3(payload) => {
let cancun = sidecar.cancun().unwrap();
if let Some(prague) = sidecar.prague() {
// Use target version if provided (for Osaka), otherwise default to V4
let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
if is_optimism {
let withdrawals_root = withdrawals_root.ok_or_else(|| {
eyre::eyre!("Missing withdrawals root for Optimism payload")
})?;
(
EngineApiMessageVersion::V4,
version,
serde_json::to_value((
OpExecutionPayloadV4 {
payload_inner: payload,
withdrawals_root: block.withdrawals_root.unwrap(),
},
OpExecutionPayloadV4 { payload_inner: payload, withdrawals_root },
cancun.versioned_hashes.clone(),
cancun.parent_beacon_block_root,
Requests::default(),
))?,
)
} else {
// Extract actual Requests from RequestsOrHash
let requests = prague.requests.requests_hash();
(
EngineApiMessageVersion::V4,
version,
serde_json::to_value((
payload,
cancun.versioned_hashes.clone(),
cancun.parent_beacon_block_root,
prague.requests.requests_hash(),
requests,
))?,
)
}
@@ -217,6 +253,8 @@ pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
) -> TransportResult<()> {
let method = version.method_name();
debug!(method, "Sending newPayload");
let mut status: PayloadStatus = provider.client().request(method, &params).await?;
while !status.is_valid() {
@@ -237,12 +275,15 @@ pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
/// Calls the correct `engine_forkchoiceUpdated` method depending on the given
/// `EngineApiMessageVersion`, using the provided forkchoice state and payload attributes for the
/// actual engine api message call.
///
/// Note: For Prague (V4), we still use forkchoiceUpdatedV3 as there is no V4.
pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
provider: P,
message_version: EngineApiMessageVersion,
forkchoice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
// FCU V3 is used for both Cancun and Prague (there is no FCU V4)
match message_version {
EngineApiMessageVersion::V3 | EngineApiMessageVersion::V4 | EngineApiMessageVersion::V5 => {
provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await

View File

@@ -81,12 +81,16 @@ backon.workspace = true
tempfile.workspace = true
[features]
default = ["jemalloc", "otlp", "reth-revm/portable", "js-tracer", "keccak-cache-global", "asm-keccak"]
default = ["jemalloc", "otlp", "otlp-logs", "reth-revm/portable", "js-tracer", "keccak-cache-global", "asm-keccak"]
otlp = [
"reth-ethereum-cli/otlp",
"reth-node-core/otlp",
]
otlp-logs = [
"reth-ethereum-cli/otlp-logs",
"reth-node-core/otlp-logs",
]
js-tracer = [
"reth-node-builder/js-tracer",
"reth-node-ethereum/js-tracer",

View File

@@ -192,10 +192,10 @@ impl DeferredTrieData {
);
// Only trigger COW clone if there's actually data to add.
if !sorted_hashed_state.is_empty() {
Arc::make_mut(&mut overlay.state).extend_ref(&sorted_hashed_state);
Arc::make_mut(&mut overlay.state).extend_ref_and_sort(&sorted_hashed_state);
}
if !sorted_trie_updates.is_empty() {
Arc::make_mut(&mut overlay.nodes).extend_ref(&sorted_trie_updates);
Arc::make_mut(&mut overlay.nodes).extend_ref_and_sort(&sorted_trie_updates);
}
overlay
}
@@ -242,13 +242,13 @@ impl DeferredTrieData {
for ancestor in ancestors {
let ancestor_data = ancestor.wait_cloned();
state_mut.extend_ref(ancestor_data.hashed_state.as_ref());
nodes_mut.extend_ref(ancestor_data.trie_updates.as_ref());
state_mut.extend_ref_and_sort(ancestor_data.hashed_state.as_ref());
nodes_mut.extend_ref_and_sort(ancestor_data.trie_updates.as_ref());
}
// Extend with current block's sorted data last (takes precedence)
state_mut.extend_ref(sorted_hashed_state);
nodes_mut.extend_ref(sorted_trie_updates);
state_mut.extend_ref_and_sort(sorted_hashed_state);
nodes_mut.extend_ref_and_sort(sorted_trie_updates);
overlay
}
@@ -287,6 +287,11 @@ impl DeferredTrieData {
&inputs.ancestors,
);
*state = DeferredState::Ready(computed.clone());
// Release lock before inputs (and its ancestors) drop to avoid holding it
// while their potential last Arc refs drop (which could trigger recursive locking)
drop(state);
computed
}
}
@@ -516,7 +521,7 @@ mod tests {
let hashed_state = Arc::new(HashedPostStateSorted::new(accounts, B256Map::default()));
let trie_updates = Arc::default();
let mut overlay = TrieInputSorted::default();
Arc::make_mut(&mut overlay.state).extend_ref(hashed_state.as_ref());
Arc::make_mut(&mut overlay.state).extend_ref_and_sort(hashed_state.as_ref());
DeferredTrieData::ready(ComputedTrieData {
hashed_state,

View File

@@ -10,15 +10,15 @@ use alloy_primitives::{map::HashMap, BlockNumber, TxHash, B256};
use parking_lot::RwLock;
use reth_chainspec::ChainInfo;
use reth_ethereum_primitives::EthPrimitives;
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
use reth_metrics::{metrics::Gauge, Metrics};
use reth_primitives_traits::{
BlockBody as _, IndexedTx, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
SignedTransaction,
};
use reth_storage_api::StateProviderBox;
use reth_trie::{updates::TrieUpdatesSorted, HashedPostStateSorted, TrieInputSorted};
use std::{collections::BTreeMap, ops::Deref, sync::Arc, time::Instant};
use reth_trie::{updates::TrieUpdatesSorted, HashedPostStateSorted, LazyTrieData, TrieInputSorted};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
use tokio::sync::{broadcast, watch};
/// Size of the broadcast channel used to notify canonical state events.
@@ -648,7 +648,7 @@ impl<N: NodePrimitives> BlockState<N> {
}
/// Returns the `Receipts` of executed block that determines the state.
pub fn receipts(&self) -> &Vec<Vec<N::Receipt>> {
pub fn receipts(&self) -> &Vec<N::Receipt> {
&self.block.execution_outcome().receipts
}
@@ -659,15 +659,7 @@ impl<N: NodePrimitives> BlockState<N> {
///
/// This clones the vector of receipts. To avoid it, use [`Self::executed_block_receipts_ref`].
pub fn executed_block_receipts(&self) -> Vec<N::Receipt> {
let receipts = self.receipts();
debug_assert!(
receipts.len() <= 1,
"Expected at most one block's worth of receipts, found {}",
receipts.len()
);
receipts.first().cloned().unwrap_or_default()
self.receipts().clone()
}
/// Returns a slice of `Receipt` of executed block that determines the state.
@@ -675,15 +667,7 @@ impl<N: NodePrimitives> BlockState<N> {
/// has only one element corresponding to the executed block associated to
/// the state.
pub fn executed_block_receipts_ref(&self) -> &[N::Receipt] {
let receipts = self.receipts();
debug_assert!(
receipts.len() <= 1,
"Expected at most one block's worth of receipts, found {}",
receipts.len()
);
receipts.first().map(|receipts| receipts.deref()).unwrap_or_default()
self.receipts()
}
/// Returns an iterator over __parent__ `BlockStates`.
@@ -767,7 +751,7 @@ pub struct ExecutedBlock<N: NodePrimitives = EthPrimitives> {
/// Recovered Block
pub recovered_block: Arc<RecoveredBlock<N::Block>>,
/// Block's execution outcome.
pub execution_output: Arc<ExecutionOutcome<N::Receipt>>,
pub execution_output: Arc<BlockExecutionOutput<N::Receipt>>,
/// Deferred trie data produced by execution.
///
/// This allows deferring the computation of the trie data which can be expensive.
@@ -779,7 +763,15 @@ impl<N: NodePrimitives> Default for ExecutedBlock<N> {
fn default() -> Self {
Self {
recovered_block: Default::default(),
execution_output: Default::default(),
execution_output: Arc::new(BlockExecutionOutput {
result: BlockExecutionResult {
receipts: Default::default(),
requests: Default::default(),
gas_used: 0,
blob_gas_used: 0,
},
state: Default::default(),
}),
trie_data: DeferredTrieData::ready(ComputedTrieData::default()),
}
}
@@ -800,7 +792,7 @@ impl<N: NodePrimitives> ExecutedBlock<N> {
/// payload builders). This is the safe default path.
pub fn new(
recovered_block: Arc<RecoveredBlock<N::Block>>,
execution_output: Arc<ExecutionOutcome<N::Receipt>>,
execution_output: Arc<BlockExecutionOutput<N::Receipt>>,
trie_data: ComputedTrieData,
) -> Self {
Self { recovered_block, execution_output, trie_data: DeferredTrieData::ready(trie_data) }
@@ -822,7 +814,7 @@ impl<N: NodePrimitives> ExecutedBlock<N> {
/// Use [`Self::new()`] instead when trie data is already computed and available immediately.
pub const fn with_deferred_trie_data(
recovered_block: Arc<RecoveredBlock<N::Block>>,
execution_output: Arc<ExecutionOutcome<N::Receipt>>,
execution_output: Arc<BlockExecutionOutput<N::Receipt>>,
trie_data: DeferredTrieData,
) -> Self {
Self { recovered_block, execution_output, trie_data }
@@ -842,7 +834,7 @@ impl<N: NodePrimitives> ExecutedBlock<N> {
/// Returns a reference to the block's execution outcome
#[inline]
pub fn execution_outcome(&self) -> &ExecutionOutcome<N::Receipt> {
pub fn execution_outcome(&self) -> &BlockExecutionOutput<N::Receipt> {
&self.execution_output
}
@@ -942,37 +934,39 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
pub fn to_chain_notification(&self) -> CanonStateNotification<N> {
match self {
Self::Commit { new } => {
let new = Arc::new(new.iter().fold(Chain::default(), |mut chain, exec| {
chain.append_block(
exec.recovered_block().clone(),
exec.execution_outcome().clone(),
exec.trie_updates(),
exec.hashed_state(),
);
chain
}));
CanonStateNotification::Commit { new }
CanonStateNotification::Commit { new: Arc::new(Self::blocks_to_chain(new)) }
}
Self::Reorg { new, old } => {
let new = Arc::new(new.iter().fold(Chain::default(), |mut chain, exec| {
Self::Reorg { new, old } => CanonStateNotification::Reorg {
new: Arc::new(Self::blocks_to_chain(new)),
old: Arc::new(Self::blocks_to_chain(old)),
},
}
}
/// Converts a slice of executed blocks into a [`Chain`].
fn blocks_to_chain(blocks: &[ExecutedBlock<N>]) -> Chain<N> {
match blocks {
[] => Chain::default(),
[first, rest @ ..] => {
let mut chain = Chain::from_block(
first.recovered_block().clone(),
ExecutionOutcome::from((
first.execution_outcome().clone(),
first.block_number(),
)),
LazyTrieData::ready(first.hashed_state(), first.trie_updates()),
);
for exec in rest {
chain.append_block(
exec.recovered_block().clone(),
exec.execution_outcome().clone(),
exec.trie_updates(),
exec.hashed_state(),
ExecutionOutcome::from((
exec.execution_outcome().clone(),
exec.block_number(),
)),
LazyTrieData::ready(exec.hashed_state(), exec.trie_updates()),
);
chain
}));
let old = Arc::new(old.iter().fold(Chain::default(), |mut chain, exec| {
chain.append_block(
exec.recovered_block().clone(),
exec.execution_outcome().clone(),
exec.trie_updates(),
exec.hashed_state(),
);
chain
}));
CanonStateNotification::Reorg { new, old }
}
chain
}
}
}
@@ -1266,7 +1260,7 @@ mod tests {
let state = BlockState::new(block);
assert_eq!(state.receipts(), &receipts);
assert_eq!(state.receipts(), receipts.first().unwrap());
}
#[test]
@@ -1543,33 +1537,31 @@ mod tests {
let block2a =
test_block_builder.get_executed_block_with_number(2, block1.recovered_block.hash());
let sample_execution_outcome = ExecutionOutcome {
receipts: vec![vec![], vec![]],
requests: vec![Requests::default(), Requests::default()],
..Default::default()
};
// Test commit notification
let chain_commit = NewCanonicalChain::Commit { new: vec![block0.clone(), block1.clone()] };
// Build expected trie updates map
let mut expected_trie_updates = BTreeMap::new();
expected_trie_updates.insert(0, block0.trie_updates());
expected_trie_updates.insert(1, block1.trie_updates());
// Build expected trie data map
let mut expected_trie_data = BTreeMap::new();
expected_trie_data
.insert(0, LazyTrieData::ready(block0.hashed_state(), block0.trie_updates()));
expected_trie_data
.insert(1, LazyTrieData::ready(block1.hashed_state(), block1.trie_updates()));
// Build expected hashed state map
let mut expected_hashed_state = BTreeMap::new();
expected_hashed_state.insert(0, block0.hashed_state());
expected_hashed_state.insert(1, block1.hashed_state());
// Build expected execution outcome (first_block matches first block number)
let commit_execution_outcome = ExecutionOutcome {
receipts: vec![vec![], vec![]],
requests: vec![Requests::default(), Requests::default()],
first_block: 0,
..Default::default()
};
assert_eq!(
chain_commit.to_chain_notification(),
CanonStateNotification::Commit {
new: Arc::new(Chain::new(
vec![block0.recovered_block().clone(), block1.recovered_block().clone()],
sample_execution_outcome.clone(),
expected_trie_updates,
expected_hashed_state
commit_execution_outcome,
expected_trie_data,
))
}
);
@@ -1580,40 +1572,39 @@ mod tests {
old: vec![block1.clone(), block2.clone()],
};
// Build expected trie updates for old chain
let mut old_trie_updates = BTreeMap::new();
old_trie_updates.insert(1, block1.trie_updates());
old_trie_updates.insert(2, block2.trie_updates());
// Build expected trie data for old chain
let mut old_trie_data = BTreeMap::new();
old_trie_data.insert(1, LazyTrieData::ready(block1.hashed_state(), block1.trie_updates()));
old_trie_data.insert(2, LazyTrieData::ready(block2.hashed_state(), block2.trie_updates()));
// Build expected trie updates for new chain
let mut new_trie_updates = BTreeMap::new();
new_trie_updates.insert(1, block1a.trie_updates());
new_trie_updates.insert(2, block2a.trie_updates());
// Build expected trie data for new chain
let mut new_trie_data = BTreeMap::new();
new_trie_data
.insert(1, LazyTrieData::ready(block1a.hashed_state(), block1a.trie_updates()));
new_trie_data
.insert(2, LazyTrieData::ready(block2a.hashed_state(), block2a.trie_updates()));
// Build expected hashed state for old chain
let mut old_hashed_state = BTreeMap::new();
old_hashed_state.insert(1, block1.hashed_state());
old_hashed_state.insert(2, block2.hashed_state());
// Build expected hashed state for new chain
let mut new_hashed_state = BTreeMap::new();
new_hashed_state.insert(1, block1a.hashed_state());
new_hashed_state.insert(2, block2a.hashed_state());
// Build expected execution outcome for reorg chains (first_block matches first block
// number)
let reorg_execution_outcome = ExecutionOutcome {
receipts: vec![vec![], vec![]],
requests: vec![Requests::default(), Requests::default()],
first_block: 1,
..Default::default()
};
assert_eq!(
chain_reorg.to_chain_notification(),
CanonStateNotification::Reorg {
old: Arc::new(Chain::new(
vec![block1.recovered_block().clone(), block2.recovered_block().clone()],
sample_execution_outcome.clone(),
old_trie_updates,
old_hashed_state
reorg_execution_outcome.clone(),
old_trie_data,
)),
new: Arc::new(Chain::new(
vec![block1a.recovered_block().clone(), block2a.recovered_block().clone()],
sample_execution_outcome,
new_trie_updates,
new_hashed_state
reorg_execution_outcome,
new_trie_data,
))
}
);

View File

@@ -0,0 +1,228 @@
//! Lazy overlay computation for trie input.
//!
//! This module provides [`LazyOverlay`], a type that computes the [`TrieInputSorted`]
//! lazily on first access. This allows execution to start before the trie overlay
//! is fully computed.
use crate::DeferredTrieData;
use alloy_primitives::B256;
use reth_trie::{updates::TrieUpdatesSorted, HashedPostStateSorted, TrieInputSorted};
use std::sync::{Arc, OnceLock};
use tracing::{debug, trace};
/// Inputs captured for lazy overlay computation.
#[derive(Clone)]
struct LazyOverlayInputs {
/// The persisted ancestor hash (anchor) this overlay should be built on.
anchor_hash: B256,
/// Deferred trie data handles for all in-memory blocks (newest to oldest).
blocks: Vec<DeferredTrieData>,
}
/// Lazily computed trie overlay.
///
/// Captures the inputs needed to compute a [`TrieInputSorted`] and defers the actual
/// computation until first access. This is conceptually similar to [`DeferredTrieData`]
/// but for overlay computation.
///
/// # Fast Path vs Slow Path
///
/// - **Fast path**: If the tip block's cached `anchored_trie_input` is ready and its `anchor_hash`
/// matches our expected anchor, we can reuse it directly (O(1)).
/// - **Slow path**: Otherwise, we merge all ancestor blocks' trie data into a new overlay.
#[derive(Clone)]
pub struct LazyOverlay {
/// Computed result, cached after first access.
inner: Arc<OnceLock<TrieInputSorted>>,
/// Inputs for lazy computation.
inputs: LazyOverlayInputs,
}
impl std::fmt::Debug for LazyOverlay {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LazyOverlay")
.field("anchor_hash", &self.inputs.anchor_hash)
.field("num_blocks", &self.inputs.blocks.len())
.field("computed", &self.inner.get().is_some())
.finish()
}
}
impl LazyOverlay {
/// Create a new lazy overlay with the given anchor hash and block handles.
///
/// # Arguments
///
/// * `anchor_hash` - The persisted ancestor hash this overlay is built on top of
/// * `blocks` - Deferred trie data handles for in-memory blocks (newest to oldest)
pub fn new(anchor_hash: B256, blocks: Vec<DeferredTrieData>) -> Self {
Self { inner: Arc::new(OnceLock::new()), inputs: LazyOverlayInputs { anchor_hash, blocks } }
}
/// Returns the anchor hash this overlay is built on.
pub const fn anchor_hash(&self) -> B256 {
self.inputs.anchor_hash
}
/// Returns the number of in-memory blocks this overlay covers.
pub const fn num_blocks(&self) -> usize {
self.inputs.blocks.len()
}
/// Returns true if the overlay has already been computed.
pub fn is_computed(&self) -> bool {
self.inner.get().is_some()
}
/// Returns the computed trie input, computing it if necessary.
///
/// The first call triggers computation (which may block waiting for deferred data).
/// Subsequent calls return the cached result immediately.
pub fn get(&self) -> &TrieInputSorted {
self.inner.get_or_init(|| self.compute())
}
/// Returns the overlay as (nodes, state) tuple for use with `OverlayStateProviderFactory`.
pub fn as_overlay(&self) -> (Arc<TrieUpdatesSorted>, Arc<HashedPostStateSorted>) {
let input = self.get();
(Arc::clone(&input.nodes), Arc::clone(&input.state))
}
/// Compute the trie input overlay.
fn compute(&self) -> TrieInputSorted {
let anchor_hash = self.inputs.anchor_hash;
let blocks = &self.inputs.blocks;
if blocks.is_empty() {
debug!(target: "chain_state::lazy_overlay", "No in-memory blocks, returning empty overlay");
return TrieInputSorted::default();
}
// Fast path: Check if tip block's overlay is ready and anchor matches.
// The tip block (first in list) has the cumulative overlay from all ancestors.
if let Some(tip) = blocks.first() {
let data = tip.wait_cloned();
if let Some(anchored) = &data.anchored_trie_input {
if anchored.anchor_hash == anchor_hash {
trace!(target: "chain_state::lazy_overlay", %anchor_hash, "Reusing tip block's cached overlay (fast path)");
return (*anchored.trie_input).clone();
}
debug!(
target: "chain_state::lazy_overlay",
computed_anchor = %anchored.anchor_hash,
%anchor_hash,
"Anchor mismatch, falling back to merge"
);
}
}
// Slow path: Merge all blocks' trie data into a new overlay.
debug!(target: "chain_state::lazy_overlay", num_blocks = blocks.len(), "Merging blocks (slow path)");
Self::merge_blocks(blocks)
}
/// Merge all blocks' trie data into a single [`TrieInputSorted`].
///
/// Blocks are ordered newest to oldest. Uses hybrid merge algorithm that
/// switches between `extend_ref` (small batches) and k-way merge (large batches).
fn merge_blocks(blocks: &[DeferredTrieData]) -> TrieInputSorted {
const MERGE_BATCH_THRESHOLD: usize = 64;
if blocks.is_empty() {
return TrieInputSorted::default();
}
// Single block: use its data directly (no allocation)
if blocks.len() == 1 {
let data = blocks[0].wait_cloned();
return TrieInputSorted {
state: data.hashed_state,
nodes: data.trie_updates,
prefix_sets: Default::default(),
};
}
if blocks.len() < MERGE_BATCH_THRESHOLD {
// Small k: extend_ref loop with Arc::make_mut is faster.
// Uses copy-on-write - only clones inner data if Arc has multiple refs.
// Iterate oldest->newest so newer values override older ones.
let mut blocks_iter = blocks.iter().rev();
let first = blocks_iter.next().expect("blocks is non-empty");
let data = first.wait_cloned();
let mut state = data.hashed_state;
let mut nodes = data.trie_updates;
for block in blocks_iter {
let block_data = block.wait_cloned();
Arc::make_mut(&mut state).extend_ref_and_sort(block_data.hashed_state.as_ref());
Arc::make_mut(&mut nodes).extend_ref_and_sort(block_data.trie_updates.as_ref());
}
TrieInputSorted { state, nodes, prefix_sets: Default::default() }
} else {
// Large k: k-way merge is faster (O(n log k)).
// Collect is unavoidable here - we need all data materialized for k-way merge.
let trie_data: Vec<_> = blocks.iter().map(|b| b.wait_cloned()).collect();
let merged_state = HashedPostStateSorted::merge_batch(
trie_data.iter().map(|d| d.hashed_state.as_ref()),
);
let merged_nodes =
TrieUpdatesSorted::merge_batch(trie_data.iter().map(|d| d.trie_updates.as_ref()));
TrieInputSorted {
state: Arc::new(merged_state),
nodes: Arc::new(merged_nodes),
prefix_sets: Default::default(),
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use reth_trie::{updates::TrieUpdates, HashedPostState};
fn empty_deferred(anchor: B256) -> DeferredTrieData {
DeferredTrieData::pending(
Arc::new(HashedPostState::default()),
Arc::new(TrieUpdates::default()),
anchor,
Vec::new(),
)
}
#[test]
fn empty_blocks_returns_default() {
let overlay = LazyOverlay::new(B256::ZERO, vec![]);
let result = overlay.get();
assert!(result.state.is_empty());
assert!(result.nodes.is_empty());
}
#[test]
fn single_block_uses_data_directly() {
let anchor = B256::random();
let deferred = empty_deferred(anchor);
let overlay = LazyOverlay::new(anchor, vec![deferred]);
assert!(!overlay.is_computed());
let _ = overlay.get();
assert!(overlay.is_computed());
}
#[test]
fn cached_after_first_access() {
let overlay = LazyOverlay::new(B256::ZERO, vec![]);
// First access computes
let _ = overlay.get();
assert!(overlay.is_computed());
// Second access uses cache
let _ = overlay.get();
assert!(overlay.is_computed());
}
}

View File

@@ -14,6 +14,9 @@ pub use in_memory::*;
mod deferred_trie;
pub use deferred_trie::*;
mod lazy_overlay;
pub use lazy_overlay::*;
mod noop;
mod chain_info;

View File

@@ -280,7 +280,6 @@ mod tests {
vec![block1.clone(), block2.clone()],
ExecutionOutcome::default(),
BTreeMap::new(),
BTreeMap::new(),
));
// Create a commit notification
@@ -319,13 +318,11 @@ mod tests {
vec![block1.clone()],
ExecutionOutcome::default(),
BTreeMap::new(),
BTreeMap::new(),
));
let new_chain = Arc::new(Chain::new(
vec![block2.clone(), block3.clone()],
ExecutionOutcome::default(),
BTreeMap::new(),
BTreeMap::new(),
));
// Create a reorg notification
@@ -391,7 +388,6 @@ mod tests {
vec![block1.clone(), block2.clone()],
execution_outcome,
BTreeMap::new(),
BTreeMap::new(),
));
// Create a commit notification containing the new chain segment.
@@ -449,12 +445,8 @@ mod tests {
ExecutionOutcome { receipts: old_receipts, ..Default::default() };
// Create an old chain segment to be reverted, containing `old_block1`.
let old_chain: Arc<Chain> = Arc::new(Chain::new(
vec![old_block1.clone()],
old_execution_outcome,
BTreeMap::new(),
BTreeMap::new(),
));
let old_chain: Arc<Chain> =
Arc::new(Chain::new(vec![old_block1.clone()], old_execution_outcome, BTreeMap::new()));
// Define block2 for the new chain segment, which will be committed.
let mut body = BlockBody::<TransactionSigned>::default();
@@ -482,12 +474,8 @@ mod tests {
ExecutionOutcome { receipts: new_receipts, ..Default::default() };
// Create a new chain segment to be committed, containing `new_block1`.
let new_chain = Arc::new(Chain::new(
vec![new_block1.clone()],
new_execution_outcome,
BTreeMap::new(),
BTreeMap::new(),
));
let new_chain =
Arc::new(Chain::new(vec![new_block1.clone()], new_execution_outcome, BTreeMap::new()));
// Create a reorg notification with both reverted (old) and committed (new) chain segments.
let notification = CanonStateNotification::Reorg { old: old_chain, new: new_chain };

View File

@@ -3,10 +3,7 @@ use crate::{
CanonStateSubscriptions, ComputedTrieData,
};
use alloy_consensus::{Header, SignableTransaction, TxEip1559, TxReceipt, EMPTY_ROOT_HASH};
use alloy_eips::{
eip1559::{ETHEREUM_BLOCK_GAS_LIMIT_30M, INITIAL_BASE_FEE},
eip7685::Requests,
};
use alloy_eips::eip1559::{ETHEREUM_BLOCK_GAS_LIMIT_30M, INITIAL_BASE_FEE};
use alloy_primitives::{Address, BlockNumber, B256, U256};
use alloy_signer::SignerSync;
use alloy_signer_local::PrivateKeySigner;
@@ -16,7 +13,7 @@ use reth_chainspec::{ChainSpec, EthereumHardfork, MIN_TRANSACTION_GAS};
use reth_ethereum_primitives::{
Block, BlockBody, EthPrimitives, Receipt, Transaction, TransactionSigned,
};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
use reth_primitives_traits::{
proofs::{calculate_receipt_root, calculate_transaction_root, calculate_withdrawals_root},
Account, NodePrimitives, Recovered, RecoveredBlock, SealedBlock, SealedHeader,
@@ -201,7 +198,7 @@ impl<N: NodePrimitives> TestBlockBuilder<N> {
fn get_executed_block(
&mut self,
block_number: BlockNumber,
receipts: Vec<Vec<Receipt>>,
mut receipts: Vec<Vec<Receipt>>,
parent_hash: B256,
) -> ExecutedBlock {
let block = self.generate_random_block(block_number, parent_hash);
@@ -209,12 +206,15 @@ impl<N: NodePrimitives> TestBlockBuilder<N> {
let trie_data = ComputedTrieData::default();
ExecutedBlock::new(
Arc::new(RecoveredBlock::new_sealed(block, senders)),
Arc::new(ExecutionOutcome::new(
BundleState::default(),
receipts,
block_number,
vec![Requests::default()],
)),
Arc::new(BlockExecutionOutput {
result: BlockExecutionResult {
receipts: receipts.pop().unwrap_or_default(),
requests: Default::default(),
gas_used: 0,
blob_gas_used: 0,
},
state: BundleState::default(),
}),
trie_data,
)
}

View File

@@ -460,6 +460,18 @@ impl ChainSpec {
pub fn builder() -> ChainSpecBuilder {
ChainSpecBuilder::default()
}
/// Map a chain ID to a known chain spec, if available.
pub fn from_chain_id(chain_id: u64) -> Option<Arc<Self>> {
match NamedChain::try_from(chain_id).ok()? {
NamedChain::Mainnet => Some(MAINNET.clone()),
NamedChain::Sepolia => Some(SEPOLIA.clone()),
NamedChain::Holesky => Some(HOLESKY.clone()),
NamedChain::Hoodi => Some(HOODI.clone()),
NamedChain::Dev => Some(DEV.clone()),
_ => None,
}
}
}
impl<H: BlockHeader> ChainSpec<H> {

View File

@@ -100,7 +100,7 @@ impl<N: NodeTypes> TableViewer<()> for ListTableViewer<'_, N> {
tx.disable_long_read_transaction_safety();
let table_db = tx.inner.open_db(Some(self.args.table.name())).wrap_err("Could not open db.")?;
let stats = tx.inner.db_stat(&table_db).wrap_err(format!("Could not find table: {}", self.args.table.name()))?;
let stats = tx.inner.db_stat(table_db.dbi()).wrap_err(format!("Could not find table: {}", self.args.table.name()))?;
let total_entries = stats.entries();
let final_entry_idx = total_entries.saturating_sub(1);
if self.args.skip > final_entry_idx {

View File

@@ -54,6 +54,21 @@ pub enum SetCommand {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store storage history in rocksdb instead of MDBX
StoragesHistory {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store transaction hash to number mapping in rocksdb instead of MDBX
TransactionHashNumbers {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store account history in rocksdb instead of MDBX
AccountHistory {
#[clap(action(ArgAction::Set))]
value: bool,
},
}
impl Command {
@@ -128,6 +143,30 @@ impl Command {
settings.account_changesets_in_static_files = value;
println!("Set account_changesets_in_static_files = {}", value);
}
SetCommand::StoragesHistory { value } => {
if settings.storages_history_in_rocksdb == value {
println!("storages_history_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.storages_history_in_rocksdb = value;
println!("Set storages_history_in_rocksdb = {}", value);
}
SetCommand::TransactionHashNumbers { value } => {
if settings.transaction_hash_numbers_in_rocksdb == value {
println!("transaction_hash_numbers_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.transaction_hash_numbers_in_rocksdb = value;
println!("Set transaction_hash_numbers_in_rocksdb = {}", value);
}
SetCommand::AccountHistory { value } => {
if settings.account_history_in_rocksdb == value {
println!("account_history_in_rocksdb is already set to {}", value);
return Ok(());
}
settings.account_history_in_rocksdb = value;
println!("Set account_history_in_rocksdb = {}", value);
}
}
// Write updated settings

View File

@@ -88,7 +88,7 @@ impl Command {
let stats = tx
.inner
.db_stat(&table_db)
.db_stat(table_db.dbi())
.wrap_err(format!("Could not find table: {db_table}"))?;
// Defaults to 16KB right now but we should
@@ -129,7 +129,8 @@ impl Command {
table.add_row(row);
let freelist = tx.inner.env().freelist()?;
let pagesize = tx.inner.db_stat(&mdbx::Database::freelist_db())?.page_size() as usize;
let pagesize =
tx.inner.db_stat(mdbx::Database::freelist_db().dbi())?.page_size() as usize;
let freelist_size = freelist * pagesize;
let mut row = Row::new();

View File

@@ -2,14 +2,15 @@ use crate::common::EnvironmentArgs;
use clap::Parser;
use eyre::Result;
use lz4::Decoder;
use reqwest::Client;
use reqwest::{blocking::Client as BlockingClient, header::RANGE, Client, StatusCode};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_cli::chainspec::ChainSpecParser;
use reth_fs_util as fs;
use std::{
borrow::Cow,
io::{self, Read, Write},
path::Path,
fs::OpenOptions,
io::{self, BufWriter, Read, Write},
path::{Path, PathBuf},
sync::{Arc, OnceLock},
time::{Duration, Instant},
};
@@ -86,6 +87,9 @@ impl DownloadDefaults {
"\nIf no URL is provided, the latest mainnet archive snapshot\nwill be proposed for download from ",
);
help.push_str(self.default_base_url.as_ref());
help.push_str(
".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
);
help
}
@@ -293,19 +297,14 @@ impl CompressionFormat {
}
}
/// Downloads and extracts a snapshot, blocking until finished.
fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
let client = reqwest::blocking::Client::builder().build()?;
let response = client.get(url).send()?.error_for_status()?;
let total_size = response.content_length().ok_or_else(|| {
eyre::eyre!(
"Server did not provide Content-Length header. This is required for snapshot downloads"
)
})?;
let progress_reader = ProgressReader::new(response, total_size);
let format = CompressionFormat::from_url(url)?;
/// Extracts a compressed tar archive to the target directory with progress tracking.
fn extract_archive<R: Read>(
reader: R,
total_size: u64,
format: CompressionFormat,
target_dir: &Path,
) -> Result<()> {
let progress_reader = ProgressReader::new(reader, total_size);
match format {
CompressionFormat::Lz4 => {
@@ -322,6 +321,185 @@ fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
Ok(())
}
/// Extracts a snapshot from a local file.
fn extract_from_file(path: &Path, format: CompressionFormat, target_dir: &Path) -> Result<()> {
let file = std::fs::File::open(path)?;
let total_size = file.metadata()?.len();
extract_archive(file, total_size, format, target_dir)
}
const MAX_DOWNLOAD_RETRIES: u32 = 10;
const RETRY_BACKOFF_SECS: u64 = 5;
/// Wrapper that tracks download progress while writing data.
/// Used with [`io::copy`] to display progress during downloads.
struct ProgressWriter<W> {
inner: W,
progress: DownloadProgress,
}
impl<W: Write> Write for ProgressWriter<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let n = self.inner.write(buf)?;
let _ = self.progress.update(n as u64);
Ok(n)
}
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}
/// Downloads a file with resume support using HTTP Range requests.
/// Automatically retries on failure, resuming from where it left off.
/// Returns the path to the downloaded file and its total size.
fn resumable_download(url: &str, target_dir: &Path) -> Result<(PathBuf, u64)> {
let file_name = Url::parse(url)
.ok()
.and_then(|u| u.path_segments()?.next_back().map(|s| s.to_string()))
.unwrap_or_else(|| "snapshot.tar".to_string());
let final_path = target_dir.join(&file_name);
let part_path = target_dir.join(format!("{file_name}.part"));
let client = BlockingClient::builder().timeout(Duration::from_secs(30)).build()?;
let mut total_size: Option<u64> = None;
let mut last_error: Option<eyre::Error> = None;
for attempt in 1..=MAX_DOWNLOAD_RETRIES {
let existing_size = fs::metadata(&part_path).map(|m| m.len()).unwrap_or(0);
if let Some(total) = total_size &&
existing_size >= total
{
fs::rename(&part_path, &final_path)?;
info!(target: "reth::cli", "Download complete: {}", final_path.display());
return Ok((final_path, total));
}
if attempt > 1 {
info!(target: "reth::cli",
"Retry attempt {}/{} - resuming from {} bytes",
attempt, MAX_DOWNLOAD_RETRIES, existing_size
);
}
let mut request = client.get(url);
if existing_size > 0 {
request = request.header(RANGE, format!("bytes={existing_size}-"));
if attempt == 1 {
info!(target: "reth::cli", "Resuming download from {} bytes", existing_size);
}
}
let response = match request.send().and_then(|r| r.error_for_status()) {
Ok(r) => r,
Err(e) => {
last_error = Some(e.into());
if attempt < MAX_DOWNLOAD_RETRIES {
info!(target: "reth::cli",
"Download failed, retrying in {} seconds...", RETRY_BACKOFF_SECS
);
std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
}
continue;
}
};
let is_partial = response.status() == StatusCode::PARTIAL_CONTENT;
let size = if is_partial {
response
.headers()
.get("Content-Range")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.split('/').next_back())
.and_then(|v| v.parse().ok())
} else {
response.content_length()
};
if total_size.is_none() {
total_size = size;
}
let current_total = total_size.ok_or_else(|| {
eyre::eyre!("Server did not provide Content-Length or Content-Range header")
})?;
let file = if is_partial && existing_size > 0 {
OpenOptions::new()
.append(true)
.open(&part_path)
.map_err(|e| fs::FsPathError::open(e, &part_path))?
} else {
fs::create_file(&part_path)?
};
let start_offset = if is_partial { existing_size } else { 0 };
let mut progress = DownloadProgress::new(current_total);
progress.downloaded = start_offset;
let mut writer = ProgressWriter { inner: BufWriter::new(file), progress };
let mut reader = response;
let copy_result = io::copy(&mut reader, &mut writer);
let flush_result = writer.inner.flush();
println!();
if let Err(e) = copy_result.and(flush_result) {
last_error = Some(e.into());
if attempt < MAX_DOWNLOAD_RETRIES {
info!(target: "reth::cli",
"Download interrupted, retrying in {} seconds...", RETRY_BACKOFF_SECS
);
std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
}
continue;
}
fs::rename(&part_path, &final_path)?;
info!(target: "reth::cli", "Download complete: {}", final_path.display());
return Ok((final_path, current_total));
}
Err(last_error
.unwrap_or_else(|| eyre::eyre!("Download failed after {} attempts", MAX_DOWNLOAD_RETRIES)))
}
/// Fetches the snapshot from a remote URL with resume support, then extracts it.
fn download_and_extract(url: &str, format: CompressionFormat, target_dir: &Path) -> Result<()> {
let (downloaded_path, total_size) = resumable_download(url, target_dir)?;
info!(target: "reth::cli", "Extracting snapshot...");
let file = fs::open(&downloaded_path)?;
extract_archive(file, total_size, format, target_dir)?;
fs::remove_file(&downloaded_path)?;
info!(target: "reth::cli", "Removed downloaded archive");
Ok(())
}
/// Downloads and extracts a snapshot, blocking until finished.
///
/// Supports both `file://` URLs for local files and HTTP(S) URLs for remote downloads.
fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
let format = CompressionFormat::from_url(url)?;
if let Ok(parsed_url) = Url::parse(url) &&
parsed_url.scheme() == "file"
{
let file_path = parsed_url
.to_file_path()
.map_err(|_| eyre::eyre!("Invalid file:// URL path: {}", url))?;
extract_from_file(&file_path, format, target_dir)
} else {
download_and_extract(url, format, target_dir)
}
}
async fn stream_and_extract(url: &str, target_dir: &Path) -> Result<()> {
let target_dir = target_dir.to_path_buf();
let url = url.to_string();
@@ -380,6 +558,7 @@ mod tests {
assert!(help.contains("Available snapshot sources:"));
assert!(help.contains("merkle.io"));
assert!(help.contains("publicnode.com"));
assert!(help.contains("file://"));
}
#[test]
@@ -404,4 +583,25 @@ mod tests {
assert_eq!(defaults.available_snapshots.len(), 4); // 2 defaults + 2 added
assert_eq!(defaults.long_help, Some("Custom help for snapshots".to_string()));
}
#[test]
fn test_compression_format_detection() {
assert!(matches!(
CompressionFormat::from_url("https://example.com/snapshot.tar.lz4"),
Ok(CompressionFormat::Lz4)
));
assert!(matches!(
CompressionFormat::from_url("https://example.com/snapshot.tar.zst"),
Ok(CompressionFormat::Zstd)
));
assert!(matches!(
CompressionFormat::from_url("file:///path/to/snapshot.tar.lz4"),
Ok(CompressionFormat::Lz4)
));
assert!(matches!(
CompressionFormat::from_url("file:///path/to/snapshot.tar.zst"),
Ok(CompressionFormat::Zstd)
));
assert!(CompressionFormat::from_url("https://example.com/snapshot.tar.gz").is_err());
}
}

View File

@@ -42,9 +42,9 @@ pub struct Command<C: ChainSpecParser> {
#[arg(long)]
to: Option<u64>,
/// Number of tasks to run in parallel
#[arg(long, default_value = "10")]
num_tasks: u64,
/// Number of tasks to run in parallel. Defaults to the number of available CPUs.
#[arg(long)]
num_tasks: Option<u64>,
/// Continues with execution when an invalid block is encountered and collects these blocks.
#[arg(long)]
@@ -84,12 +84,16 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
}
};
let num_tasks = self.num_tasks.unwrap_or_else(|| {
std::thread::available_parallelism().map(|n| n.get() as u64).unwrap_or(10)
});
let total_blocks = max_block - min_block;
let total_gas = calculate_gas_used_from_headers(
&provider_factory.static_file_provider(),
min_block..=max_block,
)?;
let blocks_per_task = total_blocks / self.num_tasks;
let blocks_per_task = total_blocks / num_tasks;
let db_at = {
let provider_factory = provider_factory.clone();
@@ -107,10 +111,10 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
let _guard = cancellation.drop_guard();
let mut tasks = JoinSet::new();
for i in 0..self.num_tasks {
for i in 0..num_tasks {
let start_block = min_block + i * blocks_per_task;
let end_block =
if i == self.num_tasks - 1 { max_block } else { start_block + blocks_per_task };
if i == num_tasks - 1 { max_block } else { start_block + blocks_per_task };
// Spawn thread executing blocks
let provider_factory = provider_factory.clone();
@@ -148,7 +152,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
};
if let Err(err) = consensus
.validate_block_post_execution(&block, &result)
.validate_block_post_execution(&block, &result, None)
.wrap_err_with(|| {
format!("Failed to validate block {} {}", block.number(), block.hash())
})

View File

@@ -15,7 +15,7 @@ use reth_db_common::{
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
use reth_node_core::args::StageEnum;
use reth_provider::{
DBProvider, DatabaseProviderFactory, StaticFileProviderFactory, StaticFileWriter, TrieWriter,
DBProvider, DatabaseProviderFactory, StaticFileProviderFactory, StaticFileWriter,
};
use reth_prune::PruneSegment;
use reth_stages::StageId;
@@ -167,10 +167,6 @@ impl<C: ChainSpecParser> Command<C> {
None,
)?;
}
StageEnum::MerkleChangeSets => {
provider_rw.clear_trie_changesets()?;
reset_stage_checkpoint(tx, StageId::MerkleChangeSets)?;
}
StageEnum::AccountHistory | StageEnum::StorageHistory => {
tx.clear::<tables::AccountsHistory>()?;
tx.clear::<tables::StoragesHistory>()?;

View File

@@ -550,7 +550,6 @@ impl PruneConfig {
/// - `Option<PruneMode>` fields: set from `other` only if `self` is `None`.
/// - `block_interval`: set from `other` only if `self.block_interval ==
/// DEFAULT_BLOCK_INTERVAL`.
/// - `merkle_changesets`: always set from `other`.
/// - `receipts_log_filter`: set from `other` only if `self` is empty and `other` is non-empty.
pub fn merge(&mut self, other: Self) {
let Self {
@@ -563,7 +562,6 @@ impl PruneConfig {
account_history,
storage_history,
bodies_history,
merkle_changesets,
receipts_log_filter,
},
} = other;
@@ -580,8 +578,6 @@ impl PruneConfig {
self.segments.account_history = self.segments.account_history.or(account_history);
self.segments.storage_history = self.segments.storage_history.or(storage_history);
self.segments.bodies_history = self.segments.bodies_history.or(bodies_history);
// Merkle changesets is not optional; always take the value from `other`
self.segments.merkle_changesets = merkle_changesets;
if self.segments.receipts_log_filter.0.is_empty() && !receipts_log_filter.0.is_empty() {
self.segments.receipts_log_filter = receipts_log_filter;
@@ -1091,7 +1087,6 @@ receipts = { distance = 16384 }
account_history: None,
storage_history: Some(PruneMode::Before(5000)),
bodies_history: None,
merkle_changesets: PruneMode::Before(0),
receipts_log_filter: ReceiptsLogPruneConfig(BTreeMap::from([(
Address::random(),
PruneMode::Full,
@@ -1108,7 +1103,6 @@ receipts = { distance = 16384 }
account_history: Some(PruneMode::Distance(2000)),
storage_history: Some(PruneMode::Distance(3000)),
bodies_history: None,
merkle_changesets: PruneMode::Distance(10000),
receipts_log_filter: ReceiptsLogPruneConfig(BTreeMap::from([
(Address::random(), PruneMode::Distance(1000)),
(Address::random(), PruneMode::Before(2000)),
@@ -1127,7 +1121,6 @@ receipts = { distance = 16384 }
assert_eq!(config1.segments.receipts, Some(PruneMode::Distance(1000)));
assert_eq!(config1.segments.account_history, Some(PruneMode::Distance(2000)));
assert_eq!(config1.segments.storage_history, Some(PruneMode::Before(5000)));
assert_eq!(config1.segments.merkle_changesets, PruneMode::Distance(10000));
assert_eq!(config1.segments.receipts_log_filter, original_filter);
}

View File

@@ -15,6 +15,12 @@ use alloc::{boxed::Box, fmt::Debug, string::String, sync::Arc, vec::Vec};
use alloy_consensus::Header;
use alloy_primitives::{BlockHash, BlockNumber, Bloom, B256};
use core::error::Error;
/// Pre-computed receipt root and logs bloom.
///
/// When provided to [`FullConsensus::validate_block_post_execution`], this allows skipping
/// the receipt root computation and using the pre-computed values instead.
pub type ReceiptRootBloom = (B256, Bloom);
use reth_execution_types::BlockExecutionResult;
use reth_primitives_traits::{
constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK, MINIMUM_GAS_LIMIT},
@@ -39,11 +45,15 @@ pub trait FullConsensus<N: NodePrimitives>: Consensus<N::Block> {
///
/// See the Yellow Paper sections 4.3.2 "Holistic Validity".
///
/// If `receipt_root_bloom` is provided, the implementation should use the pre-computed
/// receipt root and logs bloom instead of computing them from the receipts.
///
/// Note: validating blocks does not include other validations of the Consensus
fn validate_block_post_execution(
&self,
block: &RecoveredBlock<N::Block>,
result: &BlockExecutionResult<N::Receipt>,
receipt_root_bloom: Option<ReceiptRootBloom>,
) -> Result<(), ConsensusError>;
}

View File

@@ -18,7 +18,7 @@
//!
//! **Not for production use** - provides no security guarantees or consensus validation.
use crate::{Consensus, ConsensusError, FullConsensus, HeaderValidator};
use crate::{Consensus, ConsensusError, FullConsensus, HeaderValidator, ReceiptRootBloom};
use alloc::sync::Arc;
use reth_execution_types::BlockExecutionResult;
use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
@@ -76,6 +76,7 @@ impl<N: NodePrimitives> FullConsensus<N> for NoopConsensus {
&self,
_block: &RecoveredBlock<N::Block>,
_result: &BlockExecutionResult<N::Receipt>,
_receipt_root_bloom: Option<ReceiptRootBloom>,
) -> Result<(), ConsensusError> {
Ok(())
}

View File

@@ -1,4 +1,4 @@
use crate::{Consensus, ConsensusError, FullConsensus, HeaderValidator};
use crate::{Consensus, ConsensusError, FullConsensus, HeaderValidator, ReceiptRootBloom};
use core::sync::atomic::{AtomicBool, Ordering};
use reth_execution_types::BlockExecutionResult;
use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
@@ -51,6 +51,7 @@ impl<N: NodePrimitives> FullConsensus<N> for TestConsensus {
&self,
_block: &RecoveredBlock<N::Block>,
_result: &BlockExecutionResult<N::Receipt>,
_receipt_root_bloom: Option<ReceiptRootBloom>,
) -> Result<(), ConsensusError> {
if self.fail_validation() {
Err(ConsensusError::BaseFeeMissing)

View File

@@ -448,12 +448,14 @@ mod tests {
nonce: account.nonce,
code_hash: account.bytecode_hash.unwrap_or_default(),
code: None,
account_id: None,
}),
original_info: (i == 0).then(|| AccountInfo {
balance: account.balance.checked_div(U256::from(2)).unwrap_or(U256::ZERO),
nonce: 0,
code_hash: account.bytecode_hash.unwrap_or_default(),
code: None,
account_id: None,
}),
storage,
status: AccountStatus::default(),

View File

@@ -25,6 +25,7 @@ reth-tasks.workspace = true
reth-node-types.workspace = true
reth-chainspec.workspace = true
reth-engine-primitives.workspace = true
reth-trie-db.workspace = true
# async
futures.workspace = true
@@ -40,6 +41,8 @@ reth-evm-ethereum.workspace = true
reth-exex-types.workspace = true
reth-primitives-traits.workspace = true
reth-node-ethereum.workspace = true
reth-trie-db.workspace = true
alloy-eips.workspace = true
tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true

View File

@@ -26,6 +26,7 @@ use reth_provider::{
use reth_prune::PrunerWithFactory;
use reth_stages_api::{MetricEventsSender, Pipeline};
use reth_tasks::TaskSpawner;
use reth_trie_db::ChangesetCache;
use std::{
pin::Pin,
sync::Arc,
@@ -84,6 +85,7 @@ where
tree_config: TreeConfig,
sync_metrics_tx: MetricEventsSender,
evm_config: C,
changeset_cache: ChangesetCache,
) -> Self
where
V: EngineValidator<N::Payload>,
@@ -109,6 +111,7 @@ where
tree_config,
engine_kind,
evm_config,
changeset_cache,
);
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
@@ -156,6 +159,7 @@ mod tests {
};
use reth_prune::Pruner;
use reth_tasks::TokioTaskExecutor;
use reth_trie_db::ChangesetCache;
use std::sync::Arc;
use tokio::sync::{mpsc::unbounded_channel, watch};
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -188,6 +192,8 @@ mod tests {
let pruner = Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, rx);
let evm_config = EthEvmConfig::new(chain_spec.clone());
let changeset_cache = ChangesetCache::new();
let engine_validator = BasicEngineValidator::new(
blockchain_db.clone(),
consensus.clone(),
@@ -195,6 +201,7 @@ mod tests {
engine_payload_validator,
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache.clone(),
);
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
@@ -214,6 +221,7 @@ mod tests {
TreeConfig::default(),
sync_metrics_tx,
evm_config,
changeset_cache,
);
}
}

View File

@@ -34,6 +34,8 @@ reth-trie-parallel.workspace = true
reth-trie-sparse = { workspace = true, features = ["std", "metrics"] }
reth-trie-sparse-parallel = { workspace = true, features = ["std"] }
reth-trie.workspace = true
reth-trie-common.workspace = true
reth-trie-db.workspace = true
# alloy
alloy-evm.workspace = true
@@ -94,7 +96,7 @@ reth-tracing.workspace = true
reth-node-ethereum.workspace = true
reth-e2e-test-utils.workspace = true
# alloy
# revm
revm-state.workspace = true
assert_matches.workspace = true
@@ -133,6 +135,8 @@ test-utils = [
"reth-static-file",
"reth-tracing",
"reth-trie/test-utils",
"reth-trie-common/test-utils",
"reth-trie-db/test-utils",
"reth-trie-sparse/test-utils",
"reth-prune-types?/test-utils",
"reth-trie-parallel/test-utils",

View File

@@ -26,7 +26,9 @@ fn create_bench_state(num_accounts: usize) -> EvmState {
nonce: 10,
code_hash: B256::from_slice(&rng.random::<[u8; 32]>()),
code: Default::default(),
account_id: None,
},
original_info: Box::new(AccountInfo::default()),
storage,
status: AccountStatus::empty(),
transaction_id: 0,

View File

@@ -62,6 +62,7 @@ fn create_bench_state_updates(params: &BenchParams) -> Vec<EvmState> {
storage: HashMap::default(),
status: AccountStatus::SelfDestructed,
transaction_id: 0,
original_info: Box::new(AccountInfo::default()),
}
} else {
RevmAccount {
@@ -70,6 +71,7 @@ fn create_bench_state_updates(params: &BenchParams) -> Vec<EvmState> {
nonce: rng.random::<u64>(),
code_hash: KECCAK_EMPTY,
code: Some(Default::default()),
account_id: None,
},
storage: (0..rng.random_range(0..=params.storage_slots_per_account))
.map(|_| {
@@ -84,6 +86,7 @@ fn create_bench_state_updates(params: &BenchParams) -> Vec<EvmState> {
})
.collect(),
status: AccountStatus::Touched,
original_info: Box::new(AccountInfo::default()),
transaction_id: 0,
}
};
@@ -239,7 +242,10 @@ fn bench_state_root(c: &mut Criterion) {
std::convert::identity,
),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider),
OverlayStateProviderFactory::new(
provider,
reth_trie_db::ChangesetCache::new(),
),
&TreeConfig::default(),
None,
);

View File

@@ -7,7 +7,7 @@ use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
DBProvider, DatabaseProviderFactory, ProviderFactory,
DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
@@ -151,7 +151,7 @@ where
if last_block.is_some() {
let provider_rw = self.provider.database_provider_rw()?;
provider_rw.save_blocks(blocks)?;
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
provider_rw.commit()?;
}
@@ -159,6 +159,7 @@ where
self.metrics.save_blocks_block_count.record(block_count as f64);
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
Ok(last_block)
}
}

View File

@@ -60,16 +60,22 @@ impl EngineApiMetrics {
///
/// This method updates metrics for execution time, gas usage, and the number
/// of accounts, storage slots and bytecodes loaded and updated.
pub(crate) fn execute_metered<E, DB>(
///
/// The optional `on_receipt` callback is invoked after each transaction with the receipt
/// index and a reference to all receipts collected so far. This allows callers to stream
/// receipts to a background task for incremental receipt root computation.
pub(crate) fn execute_metered<E, DB, F>(
&self,
executor: E,
mut transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
transaction_count: usize,
state_hook: Box<dyn OnStateHook>,
mut on_receipt: F,
) -> Result<(BlockExecutionOutput<E::Receipt>, Vec<Address>), BlockExecutionError>
where
DB: alloy_evm::Database,
E: BlockExecutor<Evm: Evm<DB: BorrowMut<State<DB>>>, Transaction: SignedTransaction>,
F: FnMut(&[E::Receipt]),
{
// clone here is cheap, all the metrics are Option<Arc<_>>. additionally
// they are globally registered so that the data recorded in the hook will
@@ -95,14 +101,21 @@ impl EngineApiMetrics {
let tx = tx?;
senders.push(*tx.signer());
let span =
debug_span!(target: "engine::tree", "execute tx", tx_hash=?tx.tx().tx_hash());
let span = debug_span!(
target: "engine::tree",
"execute tx",
tx_hash = ?tx.tx().tx_hash(),
gas_used = tracing::field::Empty,
);
let enter = span.entered();
trace!(target: "engine::tree", "Executing transaction");
let start = Instant::now();
let gas_used = executor.execute_transaction(tx)?;
self.executor.transaction_execution_histogram.record(start.elapsed());
// Invoke callback with the latest receipt
on_receipt(executor.receipts());
// record the tx gas used
enter.record("gas_used", gas_used);
}
@@ -257,7 +270,10 @@ impl ForkchoiceUpdatedMetrics {
pub(crate) struct NewPayloadStatusMetrics {
/// Finish time of the latest new payload call.
#[metric(skip)]
pub(crate) latest_at: Option<Instant>,
pub(crate) latest_finish_at: Option<Instant>,
/// Start time of the latest new payload call.
#[metric(skip)]
pub(crate) latest_start_at: Option<Instant>,
/// The total count of new payload messages received.
pub(crate) new_payload_messages: Counter,
/// The total count of new payload messages that we responded to with
@@ -285,6 +301,10 @@ pub(crate) struct NewPayloadStatusMetrics {
pub(crate) new_payload_latency: Histogram,
/// Latency for the last new payload call.
pub(crate) new_payload_last: Gauge,
/// Time from previous payload finish to current payload start (idle time).
pub(crate) time_between_new_payloads: Histogram,
/// Time from previous payload start to current payload start (total interval).
pub(crate) new_payload_interval: Histogram,
}
impl NewPayloadStatusMetrics {
@@ -298,7 +318,14 @@ impl NewPayloadStatusMetrics {
let finish = Instant::now();
let elapsed = finish - start;
self.latest_at = Some(finish);
if let Some(prev_finish) = self.latest_finish_at {
self.time_between_new_payloads.record(start - prev_finish);
}
if let Some(prev_start) = self.latest_start_at {
self.new_payload_interval.record(start - prev_start);
}
self.latest_finish_at = Some(finish);
self.latest_start_at = Some(start);
match result {
Ok(outcome) => match outcome.outcome.status {
PayloadStatusEnum::Valid => {
@@ -334,10 +361,6 @@ pub(crate) struct BlockValidationMetrics {
pub(crate) state_root_histogram: Histogram,
/// Histogram of deferred trie computation duration.
pub(crate) deferred_trie_compute_duration: Histogram,
/// Histogram of time spent waiting for deferred trie data to become available.
pub(crate) deferred_trie_wait_duration: Histogram,
/// Trie input computation duration
pub(crate) trie_input_duration: Histogram,
/// Payload conversion and validation latency
pub(crate) payload_validation_duration: Gauge,
/// Histogram of payload validation latency
@@ -408,12 +431,13 @@ mod tests {
/// A simple mock executor for testing that doesn't require complex EVM setup
struct MockExecutor {
state: EvmState,
receipts: Vec<Receipt>,
hook: Option<Box<dyn OnStateHook>>,
}
impl MockExecutor {
fn new(state: EvmState) -> Self {
Self { state, hook: None }
Self { state, receipts: vec![], hook: None }
}
}
@@ -495,12 +519,16 @@ mod tests {
self.hook = hook;
}
fn evm_mut(&mut self) -> &mut Self::Evm {
panic!("Mock executor evm_mut() not implemented")
}
fn evm(&self) -> &Self::Evm {
panic!("Mock executor evm() not implemented")
}
fn evm_mut(&mut self) -> &mut Self::Evm {
panic!("Mock executor evm_mut() not implemented")
fn receipts(&self) -> &[Self::Receipt] {
&self.receipts
}
}
@@ -535,11 +563,12 @@ mod tests {
let executor = MockExecutor::new(state);
// This will fail to create the EVM but should still call the hook
let _result = metrics.execute_metered::<_, EmptyDB>(
let _result = metrics.execute_metered::<_, EmptyDB, _>(
executor,
input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
input.transaction_count(),
state_hook,
|_| {},
);
// Check if hook was called (it might not be if finish() fails early)
@@ -580,7 +609,9 @@ mod tests {
nonce: 10,
code_hash: B256::random(),
code: Default::default(),
account_id: None,
},
original_info: Box::new(AccountInfo::default()),
storage,
status: AccountStatus::default(),
transaction_id: 0,
@@ -592,11 +623,12 @@ mod tests {
let executor = MockExecutor::new(state);
// Execute (will fail but should still update some metrics)
let _result = metrics.execute_metered::<_, EmptyDB>(
let _result = metrics.execute_metered::<_, EmptyDB, _>(
executor,
input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
input.transaction_count(),
state_hook,
|_| {},
);
let snapshot = snapshotter.snapshot().into_vec();

View File

@@ -30,11 +30,13 @@ use reth_payload_primitives::{
};
use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
use reth_provider::{
BlockReader, DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StateProviderBox,
StateProviderFactory, StateReader, TransactionVariant, TrieReader,
BlockExecutionOutput, BlockExecutionResult, BlockNumReader, BlockReader, ChangeSetReader,
DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
StateProviderBox, StateProviderFactory, StateReader, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
use reth_trie_db::ChangesetCache;
use revm::state::EvmState;
use state::TreeState;
use std::{fmt::Debug, ops, sync::Arc, time::Instant};
@@ -271,6 +273,8 @@ where
engine_kind: EngineApiKind,
/// The EVM configuration.
evm_config: C,
/// Changeset cache for in-memory trie changesets
changeset_cache: ChangesetCache,
}
impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
@@ -295,6 +299,7 @@ where
.field("metrics", &self.metrics)
.field("engine_kind", &self.engine_kind)
.field("evm_config", &self.evm_config)
.field("changeset_cache", &self.changeset_cache)
.finish()
}
}
@@ -307,11 +312,12 @@ where
+ StateProviderFactory
+ StateReader<Receipt = N::Receipt>
+ HashedPostStateProvider
+ TrieReader
+ Clone
+ 'static,
<P as DatabaseProviderFactory>::Provider:
BlockReader<Block = N::Block, Header = N::BlockHeader>,
<P as DatabaseProviderFactory>::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
+ StageCheckpointReader
+ ChangeSetReader
+ BlockNumReader,
C: ConfigureEvm<Primitives = N> + 'static,
T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
V: EngineValidator<T>,
@@ -331,6 +337,7 @@ where
config: TreeConfig,
engine_kind: EngineApiKind,
evm_config: C,
changeset_cache: ChangesetCache,
) -> Self {
let (incoming_tx, incoming) = crossbeam_channel::unbounded();
@@ -351,6 +358,7 @@ where
incoming_tx,
engine_kind,
evm_config,
changeset_cache,
}
}
@@ -370,6 +378,7 @@ where
config: TreeConfig,
kind: EngineApiKind,
evm_config: C,
changeset_cache: ChangesetCache,
) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
{
let best_block_number = provider.best_block_number().unwrap_or(0);
@@ -401,6 +410,7 @@ where
config,
kind,
evm_config,
changeset_cache,
);
let incoming = task.incoming_tx.clone();
std::thread::Builder::new().name("Engine Task".to_string()).spawn(|| task.run()).unwrap();
@@ -1365,6 +1375,21 @@ where
debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);
// Evict trie changesets for blocks below the finalized block, but keep at least 64 blocks
if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
let min_threshold = last_persisted_block_number.saturating_sub(64);
let eviction_threshold = finalized.number.min(min_threshold);
debug!(
target: "engine::tree",
last_persisted = last_persisted_block_number,
finalized_number = finalized.number,
eviction_threshold,
"Evicting changesets below threshold"
);
self.changeset_cache.evict(eviction_threshold);
}
self.on_new_persisted_block()?;
Ok(())
}
@@ -1453,7 +1478,7 @@ where
self.metrics.engine.forkchoice_updated.update_response_metrics(
start,
&mut self.metrics.engine.new_payload.latest_at,
&mut self.metrics.engine.new_payload.latest_finish_at,
has_attrs,
&output,
);
@@ -1651,6 +1676,18 @@ where
)));
return Ok(());
}
} else {
// We don't have the head block or any of its ancestors buffered. Request
// a download for the head block which will then trigger further sync.
debug!(
target: "engine::tree",
head_hash = %sync_target_state.head_block_hash,
"Backfill complete but head block not buffered, requesting download"
);
self.emit_event(EngineApiEvent::Download(DownloadRequest::single_block(
sync_target_state.head_block_hash,
)));
return Ok(());
}
// try to close the gap by executing buffered blocks that are child blocks of the new head
@@ -1818,6 +1855,7 @@ where
/// or the database. If the required historical data (such as trie change sets) has been
/// pruned for a given block, this operation will return an error. On archive nodes, it
/// can retrieve any block.
#[instrument(level = "debug", target = "engine::tree", skip(self))]
fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
// check memory first
@@ -1830,12 +1868,23 @@ where
.sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
.ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
.split_sealed();
let execution_output = self
let mut execution_output = self
.provider
.get_state(block.header().number())?
.ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
let hashed_state = self.provider.hashed_post_state(execution_output.state());
let trie_updates = self.provider.get_block_trie_updates(block.number())?;
debug!(
target: "engine::tree",
number = ?block.number(),
"computing block trie updates",
);
let db_provider = self.provider.database_provider_ro()?;
let trie_updates = reth_trie_db::compute_block_trie_updates(
&self.changeset_cache,
&db_provider,
block.number(),
)?;
let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
let sorted_trie_updates = Arc::new(trie_updates);
@@ -1843,9 +1892,19 @@ where
let trie_data =
ComputedTrieData::without_trie_input(sorted_hashed_state, sorted_trie_updates);
let execution_output = Arc::new(BlockExecutionOutput {
state: execution_output.bundle,
result: BlockExecutionResult {
receipts: execution_output.receipts.pop().unwrap_or_default(),
requests: execution_output.requests.pop().unwrap_or_default(),
gas_used: block.gas_used(),
blob_gas_used: block.blob_gas_used().unwrap_or_default(),
},
});
Ok(Some(ExecutedBlock::new(
Arc::new(RecoveredBlock::new_sealed(block, senders)),
Arc::new(execution_output),
execution_output,
trie_data,
)))
}

View File

@@ -101,7 +101,7 @@ impl<'a> Iterator for BALSlotIter<'a> {
return None;
}
return Some((address, slot));
return Some((address, StorageKey::from(slot)));
}
// Move to next account
@@ -177,13 +177,11 @@ where
let mut storage_map = HashedStorage::new(false);
for slot_changes in &account_changes.storage_changes {
let hashed_slot = keccak256(slot_changes.slot);
let hashed_slot = keccak256(slot_changes.slot.to_be_bytes::<32>());
// Get the last change for this slot
if let Some(last_change) = slot_changes.changes.last() {
storage_map
.storage
.insert(hashed_slot, U256::from_be_bytes(last_change.new_value.0));
storage_map.storage.insert(hashed_slot, last_change.new_value);
}
}
@@ -237,8 +235,8 @@ mod tests {
let provider = StateProviderTest::default();
let address = Address::random();
let slot = StorageKey::random();
let value = B256::random();
let slot = U256::random();
let value = U256::random();
let slot_changes = SlotChanges { slot, changes: vec![StorageChange::new(0, value)] };
@@ -258,10 +256,10 @@ mod tests {
assert!(result.storages.contains_key(&hashed_address));
let storage = result.storages.get(&hashed_address).unwrap();
let hashed_slot = keccak256(slot);
let hashed_slot = keccak256(slot.to_be_bytes::<32>());
let stored_value = storage.storage.get(&hashed_slot).unwrap();
assert_eq!(*stored_value, U256::from_be_bytes(value.0));
assert_eq!(*stored_value, value);
}
#[test]
@@ -392,15 +390,15 @@ mod tests {
let provider = StateProviderTest::default();
let address = Address::random();
let slot = StorageKey::random();
let slot = U256::random();
// Multiple changes to the same slot - should take the last one
let slot_changes = SlotChanges {
slot,
changes: vec![
StorageChange::new(0, B256::from(U256::from(100).to_be_bytes::<32>())),
StorageChange::new(1, B256::from(U256::from(200).to_be_bytes::<32>())),
StorageChange::new(2, B256::from(U256::from(300).to_be_bytes::<32>())),
StorageChange::new(0, U256::from(100)),
StorageChange::new(1, U256::from(200)),
StorageChange::new(2, U256::from(300)),
],
};
@@ -418,7 +416,7 @@ mod tests {
let hashed_address = keccak256(address);
let storage = result.storages.get(&hashed_address).unwrap();
let hashed_slot = keccak256(slot);
let hashed_slot = keccak256(slot.to_be_bytes::<32>());
let stored_value = storage.storage.get(&hashed_slot).unwrap();
@@ -438,15 +436,15 @@ mod tests {
address: addr1,
storage_changes: vec![
SlotChanges {
slot: StorageKey::from(U256::from(100)),
changes: vec![StorageChange::new(0, B256::ZERO)],
slot: U256::from(100),
changes: vec![StorageChange::new(0, U256::ZERO)],
},
SlotChanges {
slot: StorageKey::from(U256::from(101)),
changes: vec![StorageChange::new(0, B256::ZERO)],
slot: U256::from(101),
changes: vec![StorageChange::new(0, U256::ZERO)],
},
],
storage_reads: vec![StorageKey::from(U256::from(102))],
storage_reads: vec![U256::from(102)],
balance_changes: vec![],
nonce_changes: vec![],
code_changes: vec![],
@@ -456,10 +454,10 @@ mod tests {
let account2 = AccountChanges {
address: addr2,
storage_changes: vec![SlotChanges {
slot: StorageKey::from(U256::from(200)),
changes: vec![StorageChange::new(0, B256::ZERO)],
slot: U256::from(200),
changes: vec![StorageChange::new(0, U256::ZERO)],
}],
storage_reads: vec![StorageKey::from(U256::from(201))],
storage_reads: vec![U256::from(201)],
balance_changes: vec![],
nonce_changes: vec![],
code_changes: vec![],
@@ -470,15 +468,15 @@ mod tests {
address: addr3,
storage_changes: vec![
SlotChanges {
slot: StorageKey::from(U256::from(300)),
changes: vec![StorageChange::new(0, B256::ZERO)],
slot: U256::from(300),
changes: vec![StorageChange::new(0, U256::ZERO)],
},
SlotChanges {
slot: StorageKey::from(U256::from(301)),
changes: vec![StorageChange::new(0, B256::ZERO)],
slot: U256::from(301),
changes: vec![StorageChange::new(0, U256::ZERO)],
},
],
storage_reads: vec![StorageKey::from(U256::from(302))],
storage_reads: vec![U256::from(302)],
balance_changes: vec![],
nonce_changes: vec![],
code_changes: vec![],

View File

@@ -28,10 +28,10 @@ use reth_evm::{
ConfigureEvm, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook, SpecFor,
TxEnvFor,
};
use reth_execution_types::ExecutionOutcome;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
BlockReader, DatabaseProviderROFactory, StateProvider, StateProviderFactory, StateReader,
BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProvider,
StateProviderFactory, StateReader,
};
use reth_revm::{db::BundleState, state::EvmState};
use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
@@ -61,6 +61,7 @@ mod configured_sparse_trie;
pub mod executor;
pub mod multiproof;
pub mod prewarm;
pub mod receipt_root_task;
pub mod sparse_trie;
use configured_sparse_trie::ConfiguredSparseTrie;
@@ -665,13 +666,15 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
/// Terminates the entire caching task.
///
/// If the [`ExecutionOutcome`] is provided it will update the shared cache using its
/// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
/// bundle state. Using `Arc<ExecutionOutcome>` allows sharing with the main execution
/// path without cloning the expensive `BundleState`.
///
/// Returns a sender for the channel that should be notified on block validation success.
pub(super) fn terminate_caching(
&mut self,
execution_outcome: Option<Arc<ExecutionOutcome<R>>>,
) {
execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
) -> Option<mpsc::Sender<()>> {
self.prewarm_handle.terminate_caching(execution_outcome)
}
@@ -707,15 +710,21 @@ impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
/// Terminates the entire pre-warming task.
///
/// If the [`ExecutionOutcome`] is provided it will update the shared cache using its
/// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
/// bundle state. Using `Arc<ExecutionOutcome>` avoids cloning the expensive `BundleState`.
#[must_use = "sender must be used and notified on block validation success"]
pub(super) fn terminate_caching(
&mut self,
execution_outcome: Option<Arc<ExecutionOutcome<R>>>,
) {
execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
) -> Option<mpsc::Sender<()>> {
if let Some(tx) = self.to_prewarm_task.take() {
let event = PrewarmTaskEvent::Terminate { execution_outcome };
let (valid_block_tx, valid_block_rx) = mpsc::channel();
let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
let _ = tx.send(event);
Some(valid_block_tx)
} else {
None
}
}
}
@@ -724,7 +733,10 @@ impl<R> Drop for CacheTaskHandle<R> {
fn drop(&mut self) {
// Ensure we always terminate on drop - send None without needing Send + Sync bounds
if let Some(tx) = self.to_prewarm_task.take() {
let _ = tx.send(PrewarmTaskEvent::Terminate { execution_outcome: None });
let _ = tx.send(PrewarmTaskEvent::Terminate {
execution_outcome: None,
valid_block_rx: mpsc::channel().1,
});
}
}
}
@@ -886,6 +898,7 @@ mod tests {
use reth_revm::db::BundleState;
use reth_testing_utils::generators;
use reth_trie::{test_utils::state_root, HashedPostState};
use reth_trie_db::ChangesetCache;
use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
use std::sync::Arc;
@@ -1058,7 +1071,9 @@ mod tests {
nonce: rng.random::<u64>(),
code_hash: KECCAK_EMPTY,
code: Some(Default::default()),
account_id: None,
},
original_info: Box::new(AccountInfo::default()),
storage,
status: AccountStatus::Touched,
transaction_id: 0,
@@ -1141,7 +1156,7 @@ mod tests {
std::convert::identity,
),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory),
OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new()),
&TreeConfig::default(),
None, // No BAL for test
);

View File

@@ -20,6 +20,7 @@ use reth_trie_parallel::{
AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
},
};
use revm_primitives::map::{hash_map, B256Map};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
use tracing::{debug, error, instrument, trace};
@@ -140,22 +141,27 @@ impl ProofSequencer {
/// Adds a proof with the corresponding state update and returns all sequential proofs and state
/// updates if we have a continuous sequence
fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
if sequence >= self.next_to_deliver {
// Optimization: fast path for in-order delivery to avoid BTreeMap overhead.
// If this is the expected sequence, return it immediately without buffering.
if sequence == self.next_to_deliver {
let mut consecutive_proofs = Vec::with_capacity(1);
consecutive_proofs.push(update);
self.next_to_deliver += 1;
// Check if we have subsequent proofs in the pending buffer
while let Some(pending) = self.pending_proofs.remove(&self.next_to_deliver) {
consecutive_proofs.push(pending);
self.next_to_deliver += 1;
}
return consecutive_proofs;
}
if sequence > self.next_to_deliver {
self.pending_proofs.insert(sequence, update);
}
let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len());
let mut current_sequence = self.next_to_deliver;
// keep collecting proofs and state updates as long as we have consecutive sequence numbers
while let Some(pending) = self.pending_proofs.remove(&current_sequence) {
consecutive_proofs.push(pending);
current_sequence += 1;
}
self.next_to_deliver += consecutive_proofs.len() as u64;
consecutive_proofs
Vec::new()
}
/// Returns true if we still have pending proofs
@@ -609,7 +615,19 @@ impl MultiProofTask {
self.multi_added_removed_keys.touch_accounts(targets.keys().copied());
// Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
let multi_added_removed_keys = Arc::new(MultiAddedRemovedKeys {
account: self.multi_added_removed_keys.account.clone(),
storages: targets
.keys()
.filter_map(|account| {
self.multi_added_removed_keys
.storages
.get(account)
.cloned()
.map(|keys| (*account, keys))
})
.collect(),
});
self.metrics.prefetch_proof_targets_accounts_histogram.record(targets.len() as f64);
self.metrics
@@ -705,7 +723,33 @@ impl MultiProofTask {
}
// Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
let multi_added_removed_keys = Arc::new(MultiAddedRemovedKeys {
account: self.multi_added_removed_keys.account.clone(),
storages: {
let mut storages = B256Map::with_capacity_and_hasher(
not_fetched_state_update.storages.len(),
Default::default(),
);
for account in not_fetched_state_update
.storages
.keys()
.chain(not_fetched_state_update.accounts.keys())
{
if let hash_map::Entry::Vacant(entry) = storages.entry(*account) {
entry.insert(
self.multi_added_removed_keys
.storages
.get(account)
.cloned()
.unwrap_or_default(),
);
}
}
storages
},
});
let chunking_len = not_fetched_state_update.chunking_length();
let mut spawned_proof_targets = MultiProofTargets::default();
@@ -1279,9 +1323,10 @@ mod tests {
use reth_provider::{
providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
BlockNumReader, BlockReader, ChangeSetReader, DatabaseProviderFactory, LatestStateProvider,
PruneCheckpointReader, StageCheckpointReader, StateProviderBox, TrieReader,
PruneCheckpointReader, StageCheckpointReader, StateProviderBox,
};
use reth_trie::MultiProof;
use reth_trie_db::ChangesetCache;
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
use revm_primitives::{B256, U256};
use std::sync::{Arc, OnceLock};
@@ -1302,7 +1347,6 @@ mod tests {
where
F: DatabaseProviderFactory<
Provider: BlockReader
+ TrieReader
+ StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader
@@ -1312,7 +1356,8 @@ mod tests {
+ 'static,
{
let rt_handle = get_test_runtime_handle();
let overlay_factory = OverlayStateProviderFactory::new(factory);
let changeset_cache = ChangesetCache::new();
let overlay_factory = OverlayStateProviderFactory::new(factory, changeset_cache);
let task_ctx = ProofTaskCtx::new(overlay_factory);
let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1, false);
let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
@@ -1324,7 +1369,7 @@ mod tests {
fn create_cached_provider<F>(factory: F) -> CachedStateProvider<StateProviderBox>
where
F: DatabaseProviderFactory<
Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
Provider: BlockReader + StageCheckpointReader + PruneCheckpointReader,
> + Clone
+ Send
+ 'static,
@@ -1771,7 +1816,9 @@ mod tests {
nonce: 1,
code_hash: Default::default(),
code: Default::default(),
account_id: None,
},
original_info: Box::new(revm_state::AccountInfo::default()),
transaction_id: Default::default(),
storage: Default::default(),
status: revm_state::AccountStatus::Touched,
@@ -1788,7 +1835,9 @@ mod tests {
nonce: 2,
code_hash: Default::default(),
code: Default::default(),
account_id: None,
},
original_info: Box::new(revm_state::AccountInfo::default()),
transaction_id: Default::default(),
storage: Default::default(),
status: revm_state::AccountStatus::Touched,
@@ -1890,7 +1939,9 @@ mod tests {
nonce: 1,
code_hash: Default::default(),
code: Default::default(),
account_id: None,
},
original_info: Box::new(revm_state::AccountInfo::default()),
transaction_id: Default::default(),
storage: Default::default(),
status: revm_state::AccountStatus::Touched,

View File

@@ -30,10 +30,12 @@ use alloy_primitives::{keccak256, map::B256Set, B256};
use crossbeam_channel::Sender as CrossbeamSender;
use metrics::{Counter, Gauge, Histogram};
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
use reth_execution_types::ExecutionOutcome;
use reth_metrics::Metrics;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{AccountReader, BlockReader, StateProvider, StateProviderFactory, StateReader};
use reth_provider::{
AccountReader, BlockExecutionOutput, BlockReader, StateProvider, StateProviderFactory,
StateReader,
};
use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_trie::MultiProofTargets;
use std::{
@@ -259,7 +261,11 @@ where
///
/// This method is called from `run()` only after all execution tasks are complete.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn save_cache(self, execution_outcome: Arc<ExecutionOutcome<N::Receipt>>) {
fn save_cache(
self,
execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
valid_block_rx: mpsc::Receiver<()>,
) {
let start = Instant::now();
let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
@@ -277,7 +283,7 @@ where
// Insert state into cache while holding the lock
// Access the BundleState through the shared ExecutionOutcome
if new_cache.cache().insert_state(execution_outcome.state()).is_err() {
if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
// Clear the cache on error to prevent having a polluted cache
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
@@ -286,9 +292,11 @@ where
new_cache.update_metrics();
// Replace the shared cache with the new one; the previous cache (if any) is
// dropped.
*cached = Some(new_cache);
if valid_block_rx.recv().is_ok() {
// Replace the shared cache with the new one; the previous cache (if any) is
// dropped.
*cached = Some(new_cache);
}
});
let elapsed = start.elapsed();
@@ -419,9 +427,10 @@ where
// completed executing a set of transactions
self.send_multi_proof_targets(proof_targets);
}
PrewarmTaskEvent::Terminate { execution_outcome } => {
PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx } => {
trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
final_execution_outcome = Some(execution_outcome);
final_execution_outcome =
Some(execution_outcome.map(|outcome| (outcome, valid_block_rx)));
if finished_execution {
// all tasks are done, we can exit, which will save caches and exit
@@ -446,8 +455,8 @@ where
debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
// save caches and finish using the shared ExecutionOutcome
if let Some(Some(execution_outcome)) = final_execution_outcome {
self.save_cache(execution_outcome);
if let Some(Some((execution_outcome, valid_block_rx))) = final_execution_outcome {
self.save_cache(execution_outcome, valid_block_rx);
}
}
}
@@ -567,9 +576,14 @@ where
.entered();
txs.recv()
} {
let enter =
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm tx", index, tx_hash=%tx.tx().tx_hash())
.entered();
let enter = debug_span!(
target: "engine::tree::payload_processor::prewarm",
"prewarm tx",
index,
tx_hash = %tx.tx().tx_hash(),
is_success = tracing::field::Empty,
)
.entered();
// create the tx env
let start = Instant::now();
@@ -810,7 +824,12 @@ pub(super) enum PrewarmTaskEvent<R> {
Terminate {
/// The final execution outcome. Using `Arc` allows sharing with the main execution
/// path without cloning the expensive `BundleState`.
execution_outcome: Option<Arc<ExecutionOutcome<R>>>,
execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
/// Receiver for the block validation result.
///
/// Cache saving is racing the state root validation. We optimistically construct the
/// updated cache but only save it once we know the block is valid.
valid_block_rx: mpsc::Receiver<()>,
},
/// The outcome of a pre-warm task
Outcome {

View File

@@ -0,0 +1,250 @@
//! Receipt root computation in a background task.
//!
//! This module provides a streaming receipt root builder that computes the receipt trie root
//! in a background thread. Receipts are sent via a channel with their index, and for each
//! receipt received, the builder incrementally flushes leaves to the underlying
//! [`OrderedTrieRootEncodedBuilder`] when possible. When the channel closes, the task returns the
//! computed root.
use alloy_eips::Encodable2718;
use alloy_primitives::{Bloom, B256};
use crossbeam_channel::Receiver;
use reth_primitives_traits::Receipt;
use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
use tokio::sync::oneshot;
/// Receipt with index, ready to be sent to the background task for encoding and trie building.
#[derive(Debug, Clone)]
pub struct IndexedReceipt<R> {
/// The transaction index within the block.
pub index: usize,
/// The receipt.
pub receipt: R,
}
impl<R> IndexedReceipt<R> {
/// Creates a new indexed receipt.
#[inline]
pub const fn new(index: usize, receipt: R) -> Self {
Self { index, receipt }
}
}
/// Handle for running the receipt root computation in a background task.
///
/// This struct holds the channels needed to receive receipts and send the result.
/// Use [`Self::run`] to execute the computation (typically in a spawned blocking task).
#[derive(Debug)]
pub struct ReceiptRootTaskHandle<R> {
/// Receiver for indexed receipts.
receipt_rx: Receiver<IndexedReceipt<R>>,
/// Sender for the computed result.
result_tx: oneshot::Sender<(B256, Bloom)>,
}
impl<R: Receipt> ReceiptRootTaskHandle<R> {
/// Creates a new handle from the receipt receiver and result sender channels.
pub const fn new(
receipt_rx: Receiver<IndexedReceipt<R>>,
result_tx: oneshot::Sender<(B256, Bloom)>,
) -> Self {
Self { receipt_rx, result_tx }
}
/// Runs the receipt root computation, consuming the handle.
///
/// This method receives indexed receipts from the channel, encodes them,
/// and builds the trie incrementally. When all receipts have been received
/// (channel closed), it sends the result through the oneshot channel.
///
/// This is designed to be called inside a blocking task (e.g., via
/// `executor.spawn_blocking(move || handle.run(receipts_len))`).
///
/// # Arguments
///
/// * `receipts_len` - The total number of receipts expected. This is needed to correctly order
/// the trie keys according to RLP encoding rules.
///
/// # Panics
///
/// Panics if the number of receipts received doesn't match `receipts_len`.
pub fn run(self, receipts_len: usize) {
let mut builder = OrderedTrieRootEncodedBuilder::new(receipts_len);
let mut aggregated_bloom = Bloom::ZERO;
let mut encode_buf = Vec::new();
for indexed_receipt in self.receipt_rx {
let receipt_with_bloom = indexed_receipt.receipt.with_bloom_ref();
encode_buf.clear();
receipt_with_bloom.encode_2718(&mut encode_buf);
aggregated_bloom |= *receipt_with_bloom.bloom_ref();
builder.push_unchecked(indexed_receipt.index, &encode_buf);
}
let root = builder.finalize().expect("receipt root builder incomplete");
let _ = self.result_tx.send((root, aggregated_bloom));
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt};
use alloy_primitives::{b256, hex, Address, Bytes, Log};
use crossbeam_channel::bounded;
use reth_ethereum_primitives::{Receipt, TxType};
#[tokio::test]
async fn test_receipt_root_task_empty() {
let (_tx, rx) = bounded::<IndexedReceipt<Receipt>>(1);
let (result_tx, result_rx) = oneshot::channel();
drop(_tx);
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
tokio::task::spawn_blocking(move || handle.run(0)).await.unwrap();
let (root, bloom) = result_rx.await.unwrap();
// Empty trie root
assert_eq!(root, reth_trie_common::EMPTY_ROOT_HASH);
assert_eq!(bloom, Bloom::ZERO);
}
#[tokio::test]
async fn test_receipt_root_task_single_receipt() {
let receipts: Vec<Receipt> = vec![Receipt::default()];
let (tx, rx) = bounded(1);
let (result_tx, result_rx) = oneshot::channel();
let receipts_len = receipts.len();
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
for (i, receipt) in receipts.clone().into_iter().enumerate() {
tx.send(IndexedReceipt::new(i, receipt)).unwrap();
}
drop(tx);
join_handle.await.unwrap();
let (root, _bloom) = result_rx.await.unwrap();
// Verify against the standard calculation
let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
let expected_root = calculate_receipt_root(&receipts_with_bloom);
assert_eq!(root, expected_root);
}
#[tokio::test]
async fn test_receipt_root_task_multiple_receipts() {
let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
let (tx, rx) = bounded(4);
let (result_tx, result_rx) = oneshot::channel();
let receipts_len = receipts.len();
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
for (i, receipt) in receipts.into_iter().enumerate() {
tx.send(IndexedReceipt::new(i, receipt)).unwrap();
}
drop(tx);
join_handle.await.unwrap();
let (root, bloom) = result_rx.await.unwrap();
// Verify against expected values from existing test
assert_eq!(
root,
b256!("0x61353b4fb714dc1fccacbf7eafc4273e62f3d1eed716fe41b2a0cd2e12c63ebc")
);
assert_eq!(
bloom,
Bloom::from(hex!("00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"))
);
}
#[tokio::test]
async fn test_receipt_root_matches_standard_calculation() {
// Create some receipts with actual data
let receipts = vec![
Receipt {
tx_type: TxType::Legacy,
cumulative_gas_used: 21000,
success: true,
logs: vec![],
},
Receipt {
tx_type: TxType::Eip1559,
cumulative_gas_used: 42000,
success: true,
logs: vec![Log {
address: Address::ZERO,
data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()),
}],
},
Receipt {
tx_type: TxType::Eip2930,
cumulative_gas_used: 63000,
success: false,
logs: vec![],
},
];
// Calculate expected values first (before we move receipts)
let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
let expected_root = calculate_receipt_root(&receipts_with_bloom);
let expected_bloom =
receipts_with_bloom.iter().fold(Bloom::ZERO, |bloom, r| bloom | r.bloom_ref());
// Calculate using the task
let (tx, rx) = bounded(4);
let (result_tx, result_rx) = oneshot::channel();
let receipts_len = receipts.len();
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
for (i, receipt) in receipts.into_iter().enumerate() {
tx.send(IndexedReceipt::new(i, receipt)).unwrap();
}
drop(tx);
join_handle.await.unwrap();
let (task_root, task_bloom) = result_rx.await.unwrap();
assert_eq!(task_root, expected_root);
assert_eq!(task_bloom, expected_bloom);
}
#[tokio::test]
async fn test_receipt_root_task_out_of_order() {
let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
// Calculate expected values first (before we move receipts)
let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
let expected_root = calculate_receipt_root(&receipts_with_bloom);
let (tx, rx) = bounded(4);
let (result_tx, result_rx) = oneshot::channel();
let receipts_len = receipts.len();
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
// Send in reverse order to test out-of-order handling
for (i, receipt) in receipts.into_iter().enumerate().rev() {
tx.send(IndexedReceipt::new(i, receipt)).unwrap();
}
drop(tx);
join_handle.await.unwrap();
let (root, _bloom) = result_rx.await.unwrap();
assert_eq!(root, expected_root);
}
}

View File

@@ -15,9 +15,11 @@ use alloy_eip7928::BlockAccessList;
use alloy_eips::{eip1898::BlockWithParent, NumHash};
use alloy_evm::Evm;
use alloy_primitives::B256;
use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
use rayon::prelude::*;
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock};
use reth_consensus::{ConsensusError, FullConsensus};
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, LazyOverlay};
use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
use reth_engine_primitives::{
ConfigureEngineEvm, ExecutableTxIterator, ExecutionPayload, InvalidBlockHook, PayloadValidator,
};
@@ -35,12 +37,13 @@ use reth_primitives_traits::{
};
use reth_provider::{
providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockNumReader, BlockReader,
ChangeSetReader, DatabaseProviderFactory, DatabaseProviderROFactory, ExecutionOutcome,
HashedPostStateProvider, ProviderError, PruneCheckpointReader, StageCheckpointReader,
StateProvider, StateProviderFactory, StateReader, TrieReader,
ChangeSetReader, DatabaseProviderFactory, DatabaseProviderROFactory, HashedPostStateProvider,
ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProvider,
StateProviderFactory, StateReader,
};
use reth_revm::db::State;
use reth_trie::{updates::TrieUpdates, HashedPostState, StateRoot, TrieInputSorted};
use reth_trie::{updates::TrieUpdates, HashedPostState, StateRoot};
use reth_trie_db::ChangesetCache;
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use revm_primitives::Address;
use std::{
@@ -129,6 +132,8 @@ where
metrics: EngineApiMetrics,
/// Validator for the payload.
validator: V,
/// Changeset cache for in-memory trie changesets
changeset_cache: ChangesetCache,
}
impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
@@ -136,7 +141,6 @@ where
N: NodePrimitives,
P: DatabaseProviderFactory<
Provider: BlockReader
+ TrieReader
+ StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader
@@ -160,6 +164,7 @@ where
validator: V,
config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
changeset_cache: ChangesetCache,
) -> Self {
let precompile_cache_map = PrecompileCacheMap::default();
let payload_processor = PayloadProcessor::new(
@@ -179,6 +184,7 @@ where
invalid_block_hook,
metrics: EngineApiMetrics::default(),
validator,
changeset_cache,
}
}
@@ -363,7 +369,6 @@ where
}
let parent_hash = input.parent_hash();
let block_num_hash = input.num_hash();
trace!(target: "engine::tree::payload_validator", "Fetching block state provider");
let _enter =
@@ -418,13 +423,23 @@ where
.map_err(Box::<dyn std::error::Error + Send + Sync>::from))
.map(Arc::new);
// Create lazy overlay from ancestors - this doesn't block, allowing execution to start
// before the trie data is ready. The overlay will be computed on first access.
let (lazy_overlay, anchor_hash) = Self::get_parent_lazy_overlay(parent_hash, ctx.state());
// Create overlay factory for payload processor (StateRootTask path needs it for
// multiproofs)
let overlay_factory =
OverlayStateProviderFactory::new(self.provider.clone(), self.changeset_cache.clone())
.with_block_hash(Some(anchor_hash))
.with_lazy_overlay(lazy_overlay);
// Spawn the appropriate processor based on strategy
let mut handle = ensure_ok!(self.spawn_payload_processor(
env.clone(),
txs,
provider_builder,
parent_hash,
ctx.state(),
overlay_factory.clone(),
strategy,
block_access_list,
));
@@ -440,19 +455,44 @@ where
state_provider = Box::new(InstrumentedStateProvider::new(state_provider, "engine"));
}
// Execute the block and handle any execution errors
let (output, senders) = match self.execute_block(state_provider, env, &input, &mut handle) {
Ok(output) => output,
Err(err) => return self.handle_execution_error(input, err, &parent_block),
};
// Execute the block and handle any execution errors.
// The receipt root task is spawned before execution and receives receipts incrementally
// as transactions complete, allowing parallel computation during execution.
let (output, senders, receipt_root_rx) =
match self.execute_block(state_provider, env, &input, &mut handle) {
Ok(output) => output,
Err(err) => return self.handle_execution_error(input, err, &parent_block),
};
// After executing the block we can stop prewarming transactions
handle.stop_prewarming_execution();
// Create ExecutionOutcome early so we can terminate caching before validation and state
// root computation. Using Arc allows sharing with both the caching task and the deferred
// trie task without cloning the expensive BundleState.
let output = Arc::new(output);
// Terminate caching task early since execution is complete and caching is no longer
// needed. This frees up resources while state root computation continues.
let valid_block_tx = handle.terminate_caching(Some(output.clone()));
let block = self.convert_to_block(input)?.with_senders(senders);
// Wait for the receipt root computation to complete.
let receipt_root_bloom = Some(
receipt_root_rx
.blocking_recv()
.expect("receipt root task dropped sender without result"),
);
let hashed_state = ensure_ok_post_block!(
self.validate_post_execution(&block, &parent_block, &output, &mut ctx),
self.validate_post_execution(
&block,
&parent_block,
&output,
&mut ctx,
receipt_root_bloom
),
block
);
@@ -485,11 +525,7 @@ where
}
StateRootStrategy::Parallel => {
debug!(target: "engine::tree::payload_validator", "Using parallel state root algorithm");
match self.compute_state_root_parallel(
block.parent_hash(),
&hashed_state,
ctx.state(),
) {
match self.compute_state_root_parallel(overlay_factory.clone(), &hashed_state) {
Ok(result) => {
let elapsed = root_time.elapsed();
info!(
@@ -525,7 +561,7 @@ where
}
let (root, updates) = ensure_ok_post_block!(
self.compute_state_root_serial(block.parent_hash(), &hashed_state, ctx.state()),
self.compute_state_root_serial(overlay_factory.clone(), &hashed_state),
block
);
(root, updates, root_time.elapsed())
@@ -555,14 +591,18 @@ where
.into())
}
// Create ExecutionOutcome and wrap in Arc for sharing with both the caching task
// and the deferred trie task. This avoids cloning the expensive BundleState.
let execution_outcome = Arc::new(ExecutionOutcome::from((output, block_num_hash.number)));
if let Some(valid_block_tx) = valid_block_tx {
let _ = valid_block_tx.send(());
}
// Terminate prewarming task with the shared execution outcome
handle.terminate_caching(Some(Arc::clone(&execution_outcome)));
Ok(self.spawn_deferred_trie_task(block, execution_outcome, &ctx, hashed_state, trie_output))
Ok(self.spawn_deferred_trie_task(
block,
output,
&ctx,
hashed_state,
trie_output,
overlay_factory,
))
}
/// Return sealed block header from database or in-memory state by hash.
@@ -600,13 +640,21 @@ where
/// Executes a block with the given state provider
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
#[expect(clippy::type_complexity)]
fn execute_block<S, Err, T>(
&mut self,
state_provider: S,
env: ExecutionEnv<Evm>,
input: &BlockOrPayload<T>,
handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
) -> Result<(BlockExecutionOutput<N::Receipt>, Vec<Address>), InsertBlockErrorKind>
) -> Result<
(
BlockExecutionOutput<N::Receipt>,
Vec<Address>,
tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>,
),
InsertBlockErrorKind,
>
where
S: StateProvider + Send,
Err: core::error::Error + Send + Sync + 'static,
@@ -645,6 +693,14 @@ where
});
}
// Spawn background task to compute receipt root and logs bloom incrementally.
// Unbounded channel is used since tx count bounds capacity anyway (max ~30k txs per block).
let receipts_len = input.transaction_count();
let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
self.payload_processor.executor().spawn_blocking(move || task_handle.run(receipts_len));
let execution_start = Instant::now();
let state_hook = Box::new(handle.state_hook());
let (output, senders) = self.metrics.execute_metered(
@@ -652,15 +708,30 @@ where
handle.iter_transactions().map(|res| res.map_err(BlockExecutionError::other)),
input.transaction_count(),
state_hook,
|receipts| {
// Send the latest receipt to the background task for incremental root computation.
// The receipt is cloned here; encoding happens in the background thread.
if let Some(receipt) = receipts.last() {
// Infer tx_index from the number of receipts collected so far
let tx_index = receipts.len() - 1;
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
}
},
)?;
drop(receipt_tx);
let execution_finish = Instant::now();
let execution_time = execution_finish.duration_since(execution_start);
debug!(target: "engine::tree::payload_validator", elapsed = ?execution_time, "Executed block");
Ok((output, senders))
Ok((output, senders, result_rx))
}
/// Compute state root for the given hashed post state in parallel.
///
/// Uses an overlay factory which provides the state of the parent block, along with the
/// [`HashedPostState`] containing the changes of this block, to compute the state root and
/// trie updates for this block.
///
/// # Returns
///
/// Returns `Ok(_)` if computed successfully.
@@ -668,58 +739,39 @@ where
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
fn compute_state_root_parallel(
&self,
parent_hash: B256,
overlay_factory: OverlayStateProviderFactory<P>,
hashed_state: &HashedPostState,
state: &EngineApiTreeState<N>,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
let (mut input, block_hash) = self.compute_trie_input(parent_hash, state)?;
// Extend state overlay with current block's sorted state.
input.prefix_sets.extend(hashed_state.construct_prefix_sets());
let sorted_hashed_state = hashed_state.clone_into_sorted();
Arc::make_mut(&mut input.state).extend_ref(&sorted_hashed_state);
let TrieInputSorted { nodes, state, prefix_sets: prefix_sets_mut } = input;
let factory = OverlayStateProviderFactory::new(self.provider.clone())
.with_block_hash(Some(block_hash))
.with_trie_overlay(Some(nodes))
.with_hashed_state_overlay(Some(state));
// The `hashed_state` argument is already taken into account as part of the overlay, but we
// The `hashed_state` argument will be taken into account as part of the overlay, but we
// need to use the prefix sets which were generated from it to indicate to the
// ParallelStateRoot which parts of the trie need to be recomputed.
let prefix_sets = prefix_sets_mut.freeze();
ParallelStateRoot::new(factory, prefix_sets).incremental_root_with_updates()
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
let overlay_factory =
overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
ParallelStateRoot::new(overlay_factory, prefix_sets).incremental_root_with_updates()
}
/// Compute state root for the given hashed post state in serial.
///
/// Uses an overlay factory which provides the state of the parent block, along with the
/// [`HashedPostState`] containing the changes of this block, to compute the state root and
/// trie updates for this block.
fn compute_state_root_serial(
&self,
parent_hash: B256,
overlay_factory: OverlayStateProviderFactory<P>,
hashed_state: &HashedPostState,
state: &EngineApiTreeState<N>,
) -> ProviderResult<(B256, TrieUpdates)> {
let (mut input, block_hash) = self.compute_trie_input(parent_hash, state)?;
// The `hashed_state` argument will be taken into account as part of the overlay, but we
// need to use the prefix sets which were generated from it to indicate to the
// StateRoot which parts of the trie need to be recomputed.
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
let overlay_factory =
overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
// Extend state overlay with current block's sorted state.
input.prefix_sets.extend(hashed_state.construct_prefix_sets());
let sorted_hashed_state = hashed_state.clone_into_sorted();
Arc::make_mut(&mut input.state).extend_ref(&sorted_hashed_state);
let TrieInputSorted { nodes, state, .. } = input;
let prefix_sets = hashed_state.construct_prefix_sets();
let factory = OverlayStateProviderFactory::new(self.provider.clone())
.with_block_hash(Some(block_hash))
.with_trie_overlay(Some(nodes))
.with_hashed_state_overlay(Some(state));
let provider = factory.database_provider_ro()?;
let provider = overlay_factory.database_provider_ro()?;
Ok(StateRoot::new(&provider, &provider)
.with_prefix_sets(prefix_sets.freeze())
.with_prefix_sets(prefix_sets)
.root_with_updates()?)
}
@@ -729,6 +781,9 @@ where
/// - parent header validation
/// - post-execution consensus validation
/// - state-root based post-execution validation
///
/// If `receipt_root_bloom` is provided, it will be used instead of computing the receipt root
/// and logs bloom from the receipts.
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
fn validate_post_execution<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
&self,
@@ -736,6 +791,7 @@ where
parent_block: &SealedHeader<N::BlockHeader>,
output: &BlockExecutionOutput<N::Receipt>,
ctx: &mut TreeCtx<'_, N>,
receipt_root_bloom: Option<ReceiptRootBloom>,
) -> Result<HashedPostState, InsertBlockErrorKind>
where
V: PayloadValidator<T, Block = N::Block>,
@@ -762,7 +818,9 @@ where
let _enter =
debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution")
.entered();
if let Err(err) = self.consensus.validate_block_post_execution(block, output) {
if let Err(err) =
self.consensus.validate_block_post_execution(block, output, receipt_root_bloom)
{
// call post-block hook
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
return Err(err.into())
@@ -802,6 +860,11 @@ where
///
/// The method handles strategy fallbacks if the preferred approach fails, ensuring
/// block execution always completes with a valid state root.
///
/// # Arguments
///
/// * `overlay_factory` - Pre-computed overlay factory for multiproof generation
/// (`StateRootTask`)
#[allow(clippy::too_many_arguments)]
#[instrument(
level = "debug",
@@ -814,8 +877,7 @@ where
env: ExecutionEnv<Evm>,
txs: T,
provider_builder: StateProviderBuilder<N, P>,
parent_hash: B256,
state: &EngineApiTreeState<N>,
overlay_factory: OverlayStateProviderFactory<P>,
strategy: StateRootStrategy,
block_access_list: Option<Arc<BlockAccessList>>,
) -> Result<
@@ -828,32 +890,14 @@ where
> {
match strategy {
StateRootStrategy::StateRootTask => {
// Compute trie input
let trie_input_start = Instant::now();
let (trie_input, block_hash) = self.compute_trie_input(parent_hash, state)?;
// Create OverlayStateProviderFactory with sorted trie data for multiproofs
let TrieInputSorted { nodes, state, .. } = trie_input;
let multiproof_provider_factory =
OverlayStateProviderFactory::new(self.provider.clone())
.with_block_hash(Some(block_hash))
.with_trie_overlay(Some(nodes))
.with_hashed_state_overlay(Some(state));
// Record trie input duration including OverlayStateProviderFactory setup
self.metrics
.block_validation
.trie_input_duration
.record(trie_input_start.elapsed().as_secs_f64());
let spawn_start = Instant::now();
// Use the pre-computed overlay factory for multiproofs
let handle = self.payload_processor.spawn(
env,
txs,
provider_builder,
multiproof_provider_factory,
overlay_factory,
&self.config,
block_access_list,
);
@@ -947,99 +991,36 @@ where
self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
}
/// Computes [`TrieInputSorted`] for the provided parent hash by combining database state
/// with in-memory overlays.
/// Creates a [`LazyOverlay`] for the parent block without blocking.
///
/// The goal of this function is to take in-memory blocks and generate a [`TrieInputSorted`]
/// that extends from the highest persisted ancestor up through the parent. This enables state
/// root computation and proof generation without requiring all blocks to be persisted
/// first.
/// Returns a lazy overlay that will compute the trie input on first access, and the anchor
/// block hash (the highest persisted ancestor). This allows execution to start immediately
/// while the trie input computation is deferred until the overlay is actually needed.
///
/// It works as follows:
/// 1. Collect in-memory overlay blocks using [`crate::tree::TreeState::blocks_by_hash`]. This
/// returns the highest persisted ancestor hash (`block_hash`) and the list of in-memory
/// blocks building on top of it.
/// 2. Fast path: If the tip in-memory block's trie input is already anchored to `block_hash`
/// (its `anchor_hash` matches `block_hash`), reuse it directly.
/// 3. Slow path: Build a new [`TrieInputSorted`] by aggregating the overlay blocks (from oldest
/// to newest) on top of the database state at `block_hash`.
#[instrument(
level = "debug",
target = "engine::tree::payload_validator",
skip_all,
fields(parent_hash)
)]
fn compute_trie_input(
&self,
/// If parent is on disk (no in-memory blocks), returns `None` for the lazy overlay.
fn get_parent_lazy_overlay(
parent_hash: B256,
state: &EngineApiTreeState<N>,
) -> ProviderResult<(TrieInputSorted, B256)> {
let wait_start = Instant::now();
let (block_hash, blocks) =
) -> (Option<LazyOverlay>, B256) {
let (anchor_hash, blocks) =
state.tree_state.blocks_by_hash(parent_hash).unwrap_or_else(|| (parent_hash, vec![]));
// Fast path: if the tip block's anchor matches the persisted ancestor hash, reuse its
// TrieInput. This means the TrieInputSorted already aggregates all in-memory overlays
// from that ancestor, so we can avoid re-aggregation.
if let Some(tip_block) = blocks.first() {
let data = tip_block.trie_data();
if let (Some(anchor_hash), Some(trie_input)) =
(data.anchor_hash(), data.trie_input().cloned()) &&
anchor_hash == block_hash
{
trace!(target: "engine::tree::payload_validator", %block_hash,"Reusing trie input with matching anchor hash");
self.metrics
.block_validation
.deferred_trie_wait_duration
.record(wait_start.elapsed().as_secs_f64());
return Ok(((*trie_input).clone(), block_hash));
}
}
if blocks.is_empty() {
debug!(target: "engine::tree::payload_validator", "Parent found on disk");
} else {
debug!(target: "engine::tree::payload_validator", historical = ?block_hash, blocks = blocks.len(), "Parent found in memory");
debug!(target: "engine::tree::payload_validator", "Parent found on disk, no lazy overlay needed");
return (None, anchor_hash);
}
// Extend with contents of parent in-memory blocks directly in sorted form.
let input = Self::merge_overlay_trie_input(&blocks);
debug!(
target: "engine::tree::payload_validator",
%anchor_hash,
num_blocks = blocks.len(),
"Creating lazy overlay for in-memory blocks"
);
self.metrics
.block_validation
.deferred_trie_wait_duration
.record(wait_start.elapsed().as_secs_f64());
Ok((input, block_hash))
}
// Extract deferred trie data handles (non-blocking)
let handles: Vec<DeferredTrieData> = blocks.iter().map(|b| b.trie_data_handle()).collect();
/// Aggregates multiple in-memory blocks into a single [`TrieInputSorted`] by combining their
/// state changes.
///
/// The input `blocks` vector is ordered newest -> oldest (see `TreeState::blocks_by_hash`).
/// We iterate it in reverse so we start with the oldest block's trie data and extend forward
/// toward the newest, ensuring newer state takes precedence.
fn merge_overlay_trie_input(blocks: &[ExecutedBlock<N>]) -> TrieInputSorted {
let mut input = TrieInputSorted::default();
let mut blocks_iter = blocks.iter().rev().peekable();
if let Some(first) = blocks_iter.next() {
let data = first.trie_data();
input.state = data.hashed_state;
input.nodes = data.trie_updates;
// Only clone and mutate if there are more in-memory blocks.
if blocks_iter.peek().is_some() {
let state_mut = Arc::make_mut(&mut input.state);
let nodes_mut = Arc::make_mut(&mut input.nodes);
for block in blocks_iter {
let data = block.trie_data();
state_mut.extend_ref(data.hashed_state.as_ref());
nodes_mut.extend_ref(data.trie_updates.as_ref());
}
}
}
input
(Some(LazyOverlay::new(anchor_hash, handles)), anchor_hash)
}
/// Spawns a background task to compute and sort trie data for the executed block.
@@ -1061,10 +1042,11 @@ where
fn spawn_deferred_trie_task(
&self,
block: RecoveredBlock<N::Block>,
execution_outcome: Arc<ExecutionOutcome<N::Receipt>>,
execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
ctx: &TreeCtx<'_, N>,
hashed_state: HashedPostState,
trie_output: TrieUpdates,
overlay_factory: OverlayStateProviderFactory<P>,
) -> ExecutedBlock<N> {
// Capture parent hash and ancestor overlays for deferred trie input construction.
let (anchor_hash, overlay_blocks) = ctx
@@ -1088,9 +1070,21 @@ where
let deferred_handle_task = deferred_trie_data.clone();
let block_validation_metrics = self.metrics.block_validation.clone();
// Capture block info and cache handle for changeset computation
let block_hash = block.hash();
let block_number = block.number();
let changeset_cache = self.changeset_cache.clone();
// Spawn background task to compute trie data. Calling `wait_cloned` will compute from
// the stored inputs and cache the result, so subsequent calls return immediately.
let compute_trie_input_task = move || {
let _span = debug_span!(
target: "engine::tree::payload_validator",
"compute_trie_input_task",
block_number
)
.entered();
let result = panic::catch_unwind(AssertUnwindSafe(|| {
let compute_start = Instant::now();
let computed = deferred_handle_task.wait_cloned();
@@ -1113,6 +1107,40 @@ where
.anchored_overlay_hashed_state_size
.record(anchored.trie_input.state.total_len() as f64);
}
// Compute and cache changesets using the computed trie_updates
let changeset_start = Instant::now();
// Get a provider from the overlay factory for trie cursor access
let changeset_result =
overlay_factory.database_provider_ro().and_then(|provider| {
reth_trie::changesets::compute_trie_changesets(
&provider,
&computed.trie_updates,
)
.map_err(ProviderError::Database)
});
match changeset_result {
Ok(changesets) => {
debug!(
target: "engine::tree::changeset",
?block_number,
elapsed = ?changeset_start.elapsed(),
"Computed and caching changesets"
);
changeset_cache.insert(block_hash, block_number, Arc::new(changesets));
}
Err(e) => {
warn!(
target: "engine::tree::changeset",
?block_number,
?e,
"Failed to compute changesets in deferred trie task"
);
}
}
}));
if result.is_err() {
@@ -1209,7 +1237,6 @@ impl<N, Types, P, Evm, V> EngineValidator<Types> for BasicEngineValidator<P, Evm
where
P: DatabaseProviderFactory<
Provider: BlockReader
+ TrieReader
+ StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader
@@ -1262,7 +1289,7 @@ where
fn on_inserted_executed_block(&self, block: ExecutedBlock<N>) {
self.payload_processor.on_inserted_executed_block(
block.recovered_block.block_with_parent(),
block.execution_output.state(),
&block.execution_output.state,
);
}
}

View File

@@ -7,6 +7,7 @@ use crate::{
PersistTarget, TreeConfig,
},
};
use reth_trie_db::ChangesetCache;
use alloy_eips::eip1898::BlockWithParent;
use alloy_primitives::{
@@ -26,7 +27,7 @@ use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_ethereum_primitives::{Block, EthPrimitives};
use reth_evm_ethereum::MockEvmConfig;
use reth_primitives_traits::Block as _;
use reth_provider::{test_utils::MockEthProvider, ExecutionOutcome};
use reth_provider::test_utils::MockEthProvider;
use std::{
collections::BTreeMap,
str::FromStr,
@@ -192,6 +193,7 @@ impl TestHarness {
let payload_builder = PayloadBuilderHandle::new(to_payload_service);
let evm_config = MockEvmConfig::default();
let changeset_cache = ChangesetCache::new();
let engine_validator = BasicEngineValidator::new(
provider.clone(),
consensus.clone(),
@@ -199,6 +201,7 @@ impl TestHarness {
payload_validator,
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache.clone(),
);
let tree = EngineApiTreeHandler::new(
@@ -215,6 +218,7 @@ impl TestHarness {
TreeConfig::default().with_legacy_state_root(false).with_has_enough_parallelism(true),
EngineApiKind::Ethereum,
evm_config,
changeset_cache,
);
let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
@@ -388,6 +392,7 @@ impl ValidatorTestHarness {
let provider = harness.provider.clone();
let payload_validator = MockEngineValidator;
let evm_config = MockEvmConfig::default();
let changeset_cache = ChangesetCache::new();
let validator = BasicEngineValidator::new(
provider,
@@ -396,6 +401,7 @@ impl ValidatorTestHarness {
payload_validator,
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache,
);
Self { harness, validator, metrics: TestMetrics::default() }
@@ -832,7 +838,7 @@ fn test_tree_state_on_new_head_deep_fork() {
for block in &chain_a {
test_harness.tree.state.tree_state.insert_executed(ExecutedBlock::new(
Arc::new(block.clone()),
Arc::new(ExecutionOutcome::default()),
Arc::new(BlockExecutionOutput::default()),
empty_trie_data(),
));
}
@@ -841,7 +847,7 @@ fn test_tree_state_on_new_head_deep_fork() {
for block in &chain_b {
test_harness.tree.state.tree_state.insert_executed(ExecutedBlock::new(
Arc::new(block.clone()),
Arc::new(ExecutionOutcome::default()),
Arc::new(BlockExecutionOutput::default()),
empty_trie_data(),
));
}
@@ -1002,6 +1008,15 @@ async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
_ => panic!("Unexpected event: {event:#?}"),
}
// After backfill completes with head not buffered, we also request head download
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
assert_eq!(hash_set, HashSet::from_iter([main_chain_last_hash]));
}
_ => panic!("Unexpected event: {event:#?}"),
}
let _ = test_harness
.tree
.on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain

View File

@@ -38,6 +38,7 @@ tempfile.workspace = true
default = []
otlp = ["reth-tracing/otlp", "reth-node-core/otlp"]
otlp-logs = ["reth-tracing/otlp-logs", "reth-node-core/otlp-logs"]
dev = ["reth-cli-commands/arbitrary"]

View File

@@ -19,7 +19,7 @@ use reth_db::DatabaseEnv;
use reth_node_api::NodePrimitives;
use reth_node_builder::{NodeBuilder, WithLaunchContext};
use reth_node_core::{
args::{LogArgs, OtlpInitStatus, TraceArgs},
args::{LogArgs, OtlpInitStatus, OtlpLogsStatus, TraceArgs},
version::version_metadata,
};
use reth_node_metrics::recorder::install_prometheus_recorder;
@@ -223,16 +223,19 @@ impl<
/// If file logging is enabled, this function returns a guard that must be kept alive to ensure
/// that all logs are flushed to disk.
///
/// If an OTLP endpoint is specified, it will export metrics to the configured collector.
/// If an OTLP endpoint is specified, it will export traces and logs to the configured
/// collector.
pub fn init_tracing(
&mut self,
runner: &CliRunner,
mut layers: Layers,
) -> eyre::Result<Option<FileWorkerGuard>> {
let otlp_status = runner.block_on(self.traces.init_otlp_tracing(&mut layers))?;
let otlp_logs_status = runner.block_on(self.traces.init_otlp_logs(&mut layers))?;
let guard = self.logs.init_tracing_with_layers(layers)?;
info!(target: "reth::cli", "Initialized tracing, debug log directory: {}", self.logs.log_file_directory);
match otlp_status {
OtlpInitStatus::Started(endpoint) => {
info!(target: "reth::cli", "Started OTLP {:?} tracing export to {endpoint}", self.traces.protocol);
@@ -243,6 +246,16 @@ impl<
OtlpInitStatus::Disabled => {}
}
match otlp_logs_status {
OtlpLogsStatus::Started(endpoint) => {
info!(target: "reth::cli", "Started OTLP {:?} logs export to {endpoint}", self.traces.protocol);
}
OtlpLogsStatus::NoFeature => {
warn!(target: "reth::cli", "Provided OTLP logs arguments do not have effect, compile with the `otlp-logs` feature")
}
OtlpLogsStatus::Disabled => {}
}
Ok(guard)
}
}

View File

@@ -15,7 +15,7 @@ use alloc::{fmt::Debug, sync::Arc};
use alloy_consensus::{constants::MAXIMUM_EXTRA_DATA_SIZE, EMPTY_OMMER_ROOT_HASH};
use alloy_eips::eip7840::BlobParams;
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_consensus::{Consensus, ConsensusError, FullConsensus, HeaderValidator};
use reth_consensus::{Consensus, ConsensusError, FullConsensus, HeaderValidator, ReceiptRootBloom};
use reth_consensus_common::validation::{
validate_4844_header_standalone, validate_against_parent_4844,
validate_against_parent_eip1559_base_fee, validate_against_parent_gas_limit,
@@ -74,8 +74,15 @@ where
&self,
block: &RecoveredBlock<N::Block>,
result: &BlockExecutionResult<N::Receipt>,
receipt_root_bloom: Option<ReceiptRootBloom>,
) -> Result<(), ConsensusError> {
validate_block_post_execution(block, &self.chain_spec, &result.receipts, &result.requests)
validate_block_post_execution(
block,
&self.chain_spec,
&result.receipts,
&result.requests,
receipt_root_bloom,
)
}
}

View File

@@ -12,11 +12,15 @@ use reth_primitives_traits::{
///
/// - Compares the receipts root in the block header to the block body
/// - Compares the gas used in the block header to the actual gas usage after execution
///
/// If `receipt_root_bloom` is provided, the pre-computed receipt root and logs bloom are used
/// instead of computing them from the receipts.
pub fn validate_block_post_execution<B, R, ChainSpec>(
block: &RecoveredBlock<B>,
chain_spec: &ChainSpec,
receipts: &[R],
requests: &Requests,
receipt_root_bloom: Option<(B256, Bloom)>,
) -> Result<(), ConsensusError>
where
B: Block,
@@ -37,19 +41,26 @@ where
// operation as hashing that is required for state root got calculated in every
// transaction This was replaced with is_success flag.
// See more about EIP here: https://eips.ethereum.org/EIPS/eip-658
if chain_spec.is_byzantium_active_at_block(block.header().number()) &&
let Err(error) = verify_receipts(
block.header().receipts_root(),
block.header().logs_bloom(),
receipts,
)
{
let receipts = receipts
.iter()
.map(|r| Bytes::from(r.with_bloom_ref().encoded_2718()))
.collect::<Vec<_>>();
tracing::debug!(%error, ?receipts, "receipts verification failed");
return Err(error)
if chain_spec.is_byzantium_active_at_block(block.header().number()) {
let result = if let Some((receipts_root, logs_bloom)) = receipt_root_bloom {
compare_receipts_root_and_logs_bloom(
receipts_root,
logs_bloom,
block.header().receipts_root(),
block.header().logs_bloom(),
)
} else {
verify_receipts(block.header().receipts_root(), block.header().logs_bloom(), receipts)
};
if let Err(error) = result {
let receipts = receipts
.iter()
.map(|r| Bytes::from(r.with_bloom_ref().encoded_2718()))
.collect::<Vec<_>>();
tracing::debug!(%error, ?receipts, "receipts verification failed");
return Err(error)
}
}
// Validate that the header requests hash matches the calculated requests hash

View File

@@ -188,6 +188,7 @@ where
block: &'a SealedBlock<Block>,
) -> Result<EthBlockExecutionCtx<'a>, Self::Error> {
Ok(EthBlockExecutionCtx {
tx_count_hint: Some(block.transaction_count()),
parent_hash: block.header().parent_hash,
parent_beacon_block_root: block.header().parent_beacon_block_root,
ommers: &block.body().ommers,
@@ -202,6 +203,7 @@ where
attributes: Self::NextBlockEnvCtx,
) -> Result<EthBlockExecutionCtx<'_>, Self::Error> {
Ok(EthBlockExecutionCtx {
tx_count_hint: None,
parent_hash: parent.hash(),
parent_beacon_block_root: attributes.parent_beacon_block_root,
ommers: &[],
@@ -238,8 +240,9 @@ where
revm_spec_by_timestamp_and_block_number(self.chain_spec(), timestamp, block_number);
// configure evm env based on parent block
let mut cfg_env =
CfgEnv::new().with_chain_id(self.chain_spec().chain().id()).with_spec(spec);
let mut cfg_env = CfgEnv::new()
.with_chain_id(self.chain_spec().chain().id())
.with_spec_and_mainnet_gas_params(spec);
if let Some(blob_params) = &blob_params {
cfg_env.set_max_blobs_per_tx(blob_params.max_blobs_per_tx);
@@ -280,6 +283,7 @@ where
payload: &'a ExecutionData,
) -> Result<ExecutionCtxFor<'a, Self>, Self::Error> {
Ok(EthBlockExecutionCtx {
tx_count_hint: Some(payload.payload.transactions().len()),
parent_hash: payload.parent_hash(),
parent_beacon_block_root: payload.sidecar.parent_beacon_block_root(),
ommers: &[],
@@ -407,7 +411,7 @@ mod tests {
let db = CacheDB::<EmptyDBTyped<ProviderError>>::default();
let evm_env = EvmEnv {
cfg_env: CfgEnv::new().with_spec(SpecId::CONSTANTINOPLE),
cfg_env: CfgEnv::new().with_spec_and_mainnet_gas_params(SpecId::CONSTANTINOPLE),
..Default::default()
};
@@ -474,7 +478,7 @@ mod tests {
let db = CacheDB::<EmptyDBTyped<ProviderError>>::default();
let evm_env = EvmEnv {
cfg_env: CfgEnv::new().with_spec(SpecId::CONSTANTINOPLE),
cfg_env: CfgEnv::new().with_spec_and_mainnet_gas_params(SpecId::CONSTANTINOPLE),
..Default::default()
};

View File

@@ -65,7 +65,12 @@ impl BlockExecutorFactory for MockEvmConfig {
DB: Database + 'a,
I: Inspector<<Self::EvmFactory as EvmFactory>::Context<&'a mut State<DB>>> + 'a,
{
MockExecutor { result: self.exec_results.lock().pop().unwrap(), evm, hook: None }
MockExecutor {
result: self.exec_results.lock().pop().unwrap(),
evm,
hook: None,
receipts: Vec::new(),
}
}
}
@@ -76,6 +81,7 @@ pub struct MockExecutor<'a, DB: Database, I> {
evm: EthEvm<&'a mut State<DB>, I, PrecompilesMap>,
#[debug(skip)]
hook: Option<Box<dyn reth_evm::OnStateHook>>,
receipts: Vec<Receipt>,
}
impl<'a, DB: Database, I: Inspector<EthEvmContext<&'a mut State<DB>>>> BlockExecutor
@@ -89,6 +95,10 @@ impl<'a, DB: Database, I: Inspector<EthEvmContext<&'a mut State<DB>>>> BlockExec
Ok(())
}
fn receipts(&self) -> &[Self::Receipt] {
&self.receipts
}
fn execute_transaction_without_commit(
&mut self,
_tx: impl ExecutableTx<Self>,

View File

@@ -38,6 +38,7 @@ fn create_database_with_beacon_root_contract() -> CacheDB<EmptyDB> {
code_hash: keccak256(BEACON_ROOTS_CODE.clone()),
nonce: 1,
code: Some(Bytecode::new_raw(BEACON_ROOTS_CODE.clone())),
account_id: None,
};
db.insert_account_info(BEACON_ROOTS_ADDRESS, beacon_root_contract_account);
@@ -53,6 +54,7 @@ fn create_database_with_withdrawal_requests_contract() -> CacheDB<EmptyDB> {
balance: U256::ZERO,
code_hash: keccak256(WITHDRAWAL_REQUEST_PREDEPLOY_CODE.clone()),
code: Some(Bytecode::new_raw(WITHDRAWAL_REQUEST_PREDEPLOY_CODE.clone())),
account_id: None,
};
db.insert_account_info(
@@ -339,6 +341,7 @@ fn create_database_with_block_hashes(latest_block: u64) -> CacheDB<EmptyDB> {
code_hash: keccak256(HISTORY_STORAGE_CODE.clone()),
code: Some(Bytecode::new_raw(HISTORY_STORAGE_CODE.clone())),
nonce: 1,
account_id: None,
};
db.insert_account_info(HISTORY_STORAGE_ADDRESS, blockhashes_contract_account);

View File

@@ -303,6 +303,8 @@ where
let eth_config =
EthConfigHandler::new(ctx.node.provider().clone(), ctx.node.evm_config().clone());
let testing_skip_invalid_transactions = ctx.config.rpc.testing_skip_invalid_transactions;
self.inner
.launch_add_ons_with(ctx, move |container| {
container.modules.merge_if_module_configured(
@@ -316,14 +318,16 @@ where
// testing_buildBlockV1: only wire when the hidden testing module is explicitly
// requested on any transport. Default stays disabled to honor security guidance.
let testing_api = TestingApi::new(
let mut testing_api = TestingApi::new(
container.registry.eth_api().clone(),
container.registry.evm_config().clone(),
)
.into_rpc();
);
if testing_skip_invalid_transactions {
testing_api = testing_api.with_skip_invalid_transactions();
}
container
.modules
.merge_if_module_configured(RethRpcModule::Testing, testing_api)?;
.merge_if_module_configured(RethRpcModule::Testing, testing_api.into_rpc())?;
Ok(())
})

View File

@@ -153,7 +153,7 @@ async fn maintain_txpool_reorg() -> eyre::Result<()> {
w1.address(),
);
let pooled_tx1 = EthPooledTransaction::new(tx1.clone(), 200);
let tx_hash1 = *pooled_tx1.clone().hash();
let tx_hash1 = *pooled_tx1.hash();
// build tx2 from wallet2
let envelop2 = TransactionTestContext::transfer_tx(1, w2.clone()).await;
@@ -162,7 +162,7 @@ async fn maintain_txpool_reorg() -> eyre::Result<()> {
w2.address(),
);
let pooled_tx2 = EthPooledTransaction::new(tx2.clone(), 200);
let tx_hash2 = *pooled_tx2.clone().hash();
let tx_hash2 = *pooled_tx2.hash();
let block_info = BlockInfo {
block_gas_limit: ETHEREUM_BLOCK_GAS_LIMIT_30M,

View File

@@ -31,8 +31,8 @@ reth-payload-validator.workspace = true
# ethereum
alloy-rlp.workspace = true
revm.workspace = true
alloy-rpc-types-engine.workspace = true
revm.workspace = true
# alloy
alloy-eips.workspace = true

View File

@@ -155,7 +155,7 @@ where
let state_provider = client.state_by_block_hash(parent_header.hash())?;
let state = StateProviderDatabase::new(state_provider.as_ref());
let mut db =
State::builder().with_database_ref(cached_reads.as_db(state)).with_bundle_update().build();
State::builder().with_database(cached_reads.as_db_mut(state)).with_bundle_update().build();
let mut builder = evm_config
.builder_for_next_block(
@@ -247,7 +247,7 @@ where
limit: MAX_RLP_BLOCK_SIZE,
},
);
continue;
continue
}
// There's only limited amount of blob space available per block, so we need to check if

View File

@@ -236,7 +236,7 @@ impl reth_codecs::Compact for Transaction {
// # Panics
//
// A panic will be triggered if an identifier larger than 3 is passed from the database. For
// optimism a identifier with value [`DEPOSIT_TX_TYPE_ID`] is allowed.
// optimism an identifier with value [`DEPOSIT_TX_TYPE_ID`] is allowed.
fn from_compact(buf: &[u8], identifier: usize) -> (Self, &[u8]) {
let (tx_type, buf) = TxType::from_compact(buf, identifier);

View File

@@ -741,6 +741,7 @@ mod tests {
nonce,
code_hash: KECCAK_EMPTY,
code: None,
account_id: None,
};
state.insert_account(addr, account_info);
state
@@ -777,8 +778,13 @@ mod tests {
let mut state = setup_state_with_account(addr1, 100, 1);
let account2 =
AccountInfo { balance: U256::from(200), nonce: 1, code_hash: KECCAK_EMPTY, code: None };
let account2 = AccountInfo {
balance: U256::from(200),
nonce: 1,
code_hash: KECCAK_EMPTY,
code: None,
account_id: None,
};
state.insert_account(addr2, account2);
let mut increments = HashMap::default();
@@ -799,8 +805,13 @@ mod tests {
let mut state = setup_state_with_account(addr1, 100, 1);
let account2 =
AccountInfo { balance: U256::from(200), nonce: 1, code_hash: KECCAK_EMPTY, code: None };
let account2 = AccountInfo {
balance: U256::from(200),
nonce: 1,
code_hash: KECCAK_EMPTY,
code: None,
account_id: None,
};
state.insert_account(addr2, account2);
let mut increments = HashMap::default();

View File

@@ -399,7 +399,7 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
/// // Complete block building
/// let outcome = builder.finish(state_provider)?;
/// ```
fn builder_for_next_block<'a, DB: Database>(
fn builder_for_next_block<'a, DB: Database + 'a>(
&'a self,
db: &'a mut State<DB>,
parent: &'a SealedHeader<<Self::Primitives as NodePrimitives>::BlockHeader>,

View File

@@ -1,7 +1,7 @@
//! Contains [Chain], a chain of blocks and their final state.
use crate::ExecutionOutcome;
use alloc::{borrow::Cow, collections::BTreeMap, sync::Arc, vec::Vec};
use alloc::{borrow::Cow, collections::BTreeMap, vec::Vec};
use alloy_consensus::{transaction::Recovered, BlockHeader};
use alloy_eips::{eip1898::ForkBlock, eip2718::Encodable2718, BlockNumHash};
use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash};
@@ -10,7 +10,7 @@ use reth_primitives_traits::{
transaction::signed::SignedTransaction, Block, BlockBody, IndexedTx, NodePrimitives,
RecoveredBlock, SealedHeader,
};
use reth_trie_common::{updates::TrieUpdatesSorted, HashedPostStateSorted};
use reth_trie_common::LazyTrieData;
/// A chain of blocks and their final state.
///
@@ -34,10 +34,10 @@ pub struct Chain<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
///
/// Additionally, it includes the individual state changes that led to the current state.
execution_outcome: ExecutionOutcome<N::Receipt>,
/// State trie updates for each block in the chain, keyed by block number.
trie_updates: BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>>,
/// Hashed post state for each block in the chain, keyed by block number.
hashed_state: BTreeMap<BlockNumber, Arc<HashedPostStateSorted>>,
/// Lazy trie data for each block in the chain, keyed by block number.
///
/// Contains handles to lazily-initialized sorted trie updates and hashed state.
trie_data: BTreeMap<BlockNumber, LazyTrieData>,
}
type ChainTxReceiptMeta<'a, N> = (
@@ -52,8 +52,7 @@ impl<N: NodePrimitives> Default for Chain<N> {
Self {
blocks: Default::default(),
execution_outcome: Default::default(),
trie_updates: Default::default(),
hashed_state: Default::default(),
trie_data: Default::default(),
}
}
}
@@ -67,27 +66,23 @@ impl<N: NodePrimitives> Chain<N> {
pub fn new(
blocks: impl IntoIterator<Item = RecoveredBlock<N::Block>>,
execution_outcome: ExecutionOutcome<N::Receipt>,
trie_updates: BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>>,
hashed_state: BTreeMap<BlockNumber, Arc<HashedPostStateSorted>>,
trie_data: BTreeMap<BlockNumber, LazyTrieData>,
) -> Self {
let blocks =
blocks.into_iter().map(|b| (b.header().number(), b)).collect::<BTreeMap<_, _>>();
debug_assert!(!blocks.is_empty(), "Chain should have at least one block");
Self { blocks, execution_outcome, trie_updates, hashed_state }
Self { blocks, execution_outcome, trie_data }
}
/// Create new Chain from a single block and its state.
pub fn from_block(
block: RecoveredBlock<N::Block>,
execution_outcome: ExecutionOutcome<N::Receipt>,
trie_updates: Arc<TrieUpdatesSorted>,
hashed_state: Arc<HashedPostStateSorted>,
trie_data: LazyTrieData,
) -> Self {
let block_number = block.header().number();
let trie_updates_map = BTreeMap::from([(block_number, trie_updates)]);
let hashed_state_map = BTreeMap::from([(block_number, hashed_state)]);
Self::new([block], execution_outcome, trie_updates_map, hashed_state_map)
Self::new([block], execution_outcome, BTreeMap::from([(block_number, trie_data)]))
}
/// Get the blocks in this chain.
@@ -105,37 +100,19 @@ impl<N: NodePrimitives> Chain<N> {
self.blocks.values().map(|block| block.clone_sealed_header())
}
/// Get all trie updates for this chain.
pub const fn trie_updates(&self) -> &BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>> {
&self.trie_updates
/// Get all trie data for this chain.
pub const fn trie_data(&self) -> &BTreeMap<BlockNumber, LazyTrieData> {
&self.trie_data
}
/// Get trie updates for a specific block number.
pub fn trie_updates_at(&self, block_number: BlockNumber) -> Option<&Arc<TrieUpdatesSorted>> {
self.trie_updates.get(&block_number)
/// Get trie data for a specific block number.
pub fn trie_data_at(&self, block_number: BlockNumber) -> Option<&LazyTrieData> {
self.trie_data.get(&block_number)
}
/// Remove all trie updates for this chain.
pub fn clear_trie_updates(&mut self) {
self.trie_updates.clear();
}
/// Get all hashed states for this chain.
pub const fn hashed_state(&self) -> &BTreeMap<BlockNumber, Arc<HashedPostStateSorted>> {
&self.hashed_state
}
/// Get hashed state for a specific block number.
pub fn hashed_state_at(
&self,
block_number: BlockNumber,
) -> Option<&Arc<HashedPostStateSorted>> {
self.hashed_state.get(&block_number)
}
/// Remove all hashed states for this chain.
pub fn clear_hashed_state(&mut self) {
self.hashed_state.clear();
/// Remove all trie data for this chain.
pub fn clear_trie_data(&mut self) {
self.trie_data.clear();
}
/// Get execution outcome of this chain
@@ -183,23 +160,16 @@ impl<N: NodePrimitives> Chain<N> {
/// Destructure the chain into its inner components:
/// 1. The blocks contained in the chain.
/// 2. The execution outcome representing the final state.
/// 3. The trie updates map.
/// 4. The hashed state map.
/// 3. The trie data map.
#[allow(clippy::type_complexity)]
pub fn into_inner(
self,
) -> (
ChainBlocks<'static, N::Block>,
ExecutionOutcome<N::Receipt>,
BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>>,
BTreeMap<BlockNumber, Arc<HashedPostStateSorted>>,
BTreeMap<BlockNumber, LazyTrieData>,
) {
(
ChainBlocks { blocks: Cow::Owned(self.blocks) },
self.execution_outcome,
self.trie_updates,
self.hashed_state,
)
(ChainBlocks { blocks: Cow::Owned(self.blocks) }, self.execution_outcome, self.trie_data)
}
/// Destructure the chain into its inner components:
@@ -329,14 +299,12 @@ impl<N: NodePrimitives> Chain<N> {
&mut self,
block: RecoveredBlock<N::Block>,
execution_outcome: ExecutionOutcome<N::Receipt>,
trie_updates: Arc<TrieUpdatesSorted>,
hashed_state: Arc<HashedPostStateSorted>,
trie_data: LazyTrieData,
) {
let block_number = block.header().number();
self.blocks.insert(block_number, block);
self.execution_outcome.extend(execution_outcome);
self.trie_updates.insert(block_number, trie_updates);
self.hashed_state.insert(block_number, hashed_state);
self.trie_data.insert(block_number, trie_data);
}
/// Merge two chains by appending the given chain into the current one.
@@ -355,8 +323,7 @@ impl<N: NodePrimitives> Chain<N> {
// Insert blocks from other chain
self.blocks.extend(other.blocks);
self.execution_outcome.extend(other.execution_outcome);
self.trie_updates.extend(other.trie_updates);
self.hashed_state.extend(other.hashed_state);
self.trie_data.extend(other.trie_data);
Ok(())
}
@@ -583,14 +550,14 @@ pub(super) mod serde_bincode_compat {
execution_outcome: value.execution_outcome.as_repr(),
_trie_updates_legacy: None,
trie_updates: value
.trie_updates
.trie_data
.iter()
.map(|(k, v)| (*k, v.as_ref().into()))
.map(|(k, v)| (*k, v.get().trie_updates.as_ref().into()))
.collect(),
hashed_state: value
.hashed_state
.trie_data
.iter()
.map(|(k, v)| (*k, v.as_ref().into()))
.map(|(k, v)| (*k, v.get().hashed_state.as_ref().into()))
.collect(),
}
}
@@ -603,19 +570,24 @@ pub(super) mod serde_bincode_compat {
>,
{
fn from(value: Chain<'a, N>) -> Self {
use reth_trie_common::LazyTrieData;
let hashed_state_map: BTreeMap<_, _> =
value.hashed_state.into_iter().map(|(k, v)| (k, Arc::new(v.into()))).collect();
let trie_data: BTreeMap<BlockNumber, LazyTrieData> = value
.trie_updates
.into_iter()
.map(|(k, v)| {
let hashed_state = hashed_state_map.get(&k).cloned().unwrap_or_default();
(k, LazyTrieData::ready(hashed_state, Arc::new(v.into())))
})
.collect();
Self {
blocks: value.blocks.0.into_owned(),
execution_outcome: ExecutionOutcome::from_repr(value.execution_outcome),
trie_updates: value
.trie_updates
.into_iter()
.map(|(k, v)| (k, Arc::new(v.into())))
.collect(),
hashed_state: value
.hashed_state
.into_iter()
.map(|(k, v)| (k, Arc::new(v.into())))
.collect(),
trie_data,
}
}
}
@@ -676,7 +648,6 @@ pub(super) mod serde_bincode_compat {
.unwrap()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
),
};
@@ -776,12 +747,8 @@ mod tests {
let mut block_state_extended = execution_outcome1;
block_state_extended.extend(execution_outcome2);
let chain: Chain = Chain::new(
vec![block1.clone(), block2.clone()],
block_state_extended,
BTreeMap::new(),
BTreeMap::new(),
);
let chain: Chain =
Chain::new(vec![block1.clone(), block2.clone()], block_state_extended, BTreeMap::new());
// return tip state
assert_eq!(

View File

@@ -1,3 +1,5 @@
use alloy_primitives::{Address, B256, U256};
use reth_primitives_traits::{Account, Bytecode};
use revm::database::BundleState;
pub use alloy_evm::block::BlockExecutionResult;
@@ -23,3 +25,36 @@ pub struct BlockExecutionOutput<T> {
/// The changed state of the block after execution.
pub state: BundleState,
}
impl<T> BlockExecutionOutput<T> {
/// Return bytecode if known.
pub fn bytecode(&self, code_hash: &B256) -> Option<Bytecode> {
self.state.bytecode(code_hash).map(Bytecode)
}
/// Get account if account is known.
pub fn account(&self, address: &Address) -> Option<Option<Account>> {
self.state.account(address).map(|a| a.info.as_ref().map(Into::into))
}
/// Get storage if value is known.
///
/// This means that depending on status we can potentially return `U256::ZERO`.
pub fn storage(&self, address: &Address, storage_key: U256) -> Option<U256> {
self.state.account(address).and_then(|a| a.storage_slot(storage_key))
}
}
impl<T> Default for BlockExecutionOutput<T> {
fn default() -> Self {
Self {
result: BlockExecutionResult {
receipts: Default::default(),
requests: Default::default(),
gas_used: 0,
blob_gas_used: 0,
},
state: Default::default(),
}
}
}

View File

@@ -249,6 +249,14 @@ impl<T> ExecutionOutcome<T> {
&self.receipts[index]
}
/// Returns an iterator over receipt slices, one per block.
///
/// This is a more ergonomic alternative to `receipts()` that yields slices
/// instead of requiring indexing into a nested `Vec<Vec<T>>`.
pub fn receipts_iter(&self) -> impl Iterator<Item = &[T]> + '_ {
self.receipts.iter().map(|v| v.as_slice())
}
/// Is execution outcome empty.
pub const fn is_empty(&self) -> bool {
self.len() == 0
@@ -934,10 +942,20 @@ mod tests {
let address3 = Address::random();
// Set up account info with some changes
let account_info1 =
AccountInfo { nonce: 1, balance: U256::from(100), code_hash: B256::ZERO, code: None };
let account_info2 =
AccountInfo { nonce: 2, balance: U256::from(200), code_hash: B256::ZERO, code: None };
let account_info1 = AccountInfo {
nonce: 1,
balance: U256::from(100),
code_hash: B256::ZERO,
code: None,
account_id: None,
};
let account_info2 = AccountInfo {
nonce: 2,
balance: U256::from(200),
code_hash: B256::ZERO,
code: None,
account_id: None,
};
// Set up the bundle state with these accounts
let mut bundle_state = BundleState::default();

View File

@@ -149,7 +149,7 @@ where
executor.into_state().take_bundle(),
results,
);
let chain = Chain::new(blocks, outcome, BTreeMap::new(), BTreeMap::new());
let chain = Chain::new(blocks, outcome, BTreeMap::new());
Ok(chain)
}
}

View File

@@ -503,6 +503,7 @@ where
}
break
}
let buffer_full = this.buffer.len() >= this.max_capacity;
// Update capacity
this.update_capacity();
@@ -536,6 +537,12 @@ where
// Update capacity
this.update_capacity();
// If the buffer was full and we made space, we need to wake up to accept new notifications
if buffer_full && this.buffer.len() < this.max_capacity {
debug!(target: "exex::manager", "Buffer has space again, waking up senders");
cx.waker().wake_by_ref();
}
// Update watch channel block number
let finished_height = this.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
exex.finished_height.map_or(Err(()), |height| Ok(height.number.min(curr)))
@@ -687,7 +694,6 @@ mod tests {
BlockWriter, Chain, DBProvider, DatabaseProviderFactory, TransactionVariant,
};
use reth_testing_utils::generators::{self, random_block, BlockParams};
use std::collections::BTreeMap;
fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
let (tx, rx) = watch::channel(None);
@@ -789,12 +795,7 @@ mod tests {
block1.set_block_number(10);
let notification1 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block1.clone()],
Default::default(),
Default::default(),
Default::default(),
)),
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
};
// Push the first notification
@@ -812,12 +813,7 @@ mod tests {
block2.set_block_number(20);
let notification2 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block2.clone()],
Default::default(),
Default::default(),
Default::default(),
)),
new: Arc::new(Chain::new(vec![block2.clone()], Default::default(), Default::default())),
};
exex_manager.push_notification(notification2.clone());
@@ -860,12 +856,7 @@ mod tests {
block1.set_block_number(10);
let notification1 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block1.clone()],
Default::default(),
Default::default(),
Default::default(),
)),
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
};
exex_manager.push_notification(notification1.clone());
@@ -1093,7 +1084,6 @@ mod tests {
vec![Default::default()],
Default::default(),
Default::default(),
Default::default(),
)),
};
@@ -1164,7 +1154,6 @@ mod tests {
vec![Default::default()],
Default::default(),
Default::default(),
Default::default(),
)),
};
@@ -1209,12 +1198,7 @@ mod tests {
block1.set_block_number(10);
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block1.clone()],
Default::default(),
Default::default(),
Default::default(),
)),
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
};
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
@@ -1363,17 +1347,11 @@ mod tests {
new: Arc::new(Chain::new(
vec![genesis_block.clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
Default::default(),
)),
};
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block.clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), Default::default())),
};
let (finalized_headers_tx, rx) = watch::channel(None);
@@ -1443,4 +1421,78 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_deadlock_manager_wakes_after_buffer_clears() {
// This test simulates the scenario where the buffer fills up, ingestion pauses,
// and then space clears. We verify the manager wakes up to process pending items.
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory();
init_genesis(&provider_factory).unwrap();
let provider = BlockchainProvider::new(provider_factory.clone()).unwrap();
// 1. Setup Manager with Capacity = 1
let (exex_handle, _, mut notifications) = ExExHandle::new(
"test_exex".to_string(),
Default::default(),
provider,
EthEvmConfig::mainnet(),
wal.handle(),
);
let max_capacity = 2;
let exex_manager = ExExManager::new(
provider_factory,
vec![exex_handle],
max_capacity,
wal,
empty_finalized_header_stream(),
);
let manager_handle = exex_manager.handle();
// Spawn manager in background so it runs continuously
tokio::spawn(async move {
exex_manager.await.ok();
});
// Helper to create notifications
let mut rng = generators::rng();
let mut make_notif = |id: u64| {
let block = random_block(&mut rng, id, BlockParams::default()).try_recover().unwrap();
ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block], Default::default(), Default::default())),
}
};
manager_handle.send(ExExNotificationSource::Pipeline, make_notif(1)).unwrap();
// Send the "Stuck" Item (Notification #100).
// At this point, the Manager loop has skipped the ingestion logic because buffer is full
// (buffer_full=true). This item sits in the unbounded 'handle_rx' channel waiting.
manager_handle.send(ExExNotificationSource::Pipeline, make_notif(100)).unwrap();
// 3. Relieve Pressure
// We consume items from the ExEx.
// As we pull items out, the ExEx frees space -> Manager sends buffered item -> Manager
// frees space. Once Manager frees space, the FIX (wake_by_ref) should trigger,
// causing it to read Notif #100.
// Consume the jam
let _ = notifications.next().await.unwrap();
// 4. Assert No Deadlock
// We expect Notification #100 next.
// If the wake_by_ref fix is missing, this will Time Out because the manager is sleeping
// despite having empty buffer.
let result =
tokio::time::timeout(std::time::Duration::from_secs(1), notifications.next()).await;
assert!(
result.is_ok(),
"Deadlock detected! Manager failed to wake up and process Pending Item #100."
);
}
}

View File

@@ -501,7 +501,6 @@ mod tests {
.try_recover()?],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
@@ -570,7 +569,6 @@ mod tests {
.try_recover()?],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
@@ -638,7 +636,6 @@ mod tests {
vec![exex_head_block.clone().try_recover()?],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
wal.commit(&exex_head_notification)?;
@@ -653,7 +650,6 @@ mod tests {
.try_recover()?],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
@@ -711,7 +707,6 @@ mod tests {
vec![exex_head_block.clone().try_recover()?],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
wal.commit(&exex_head_notification)?;
@@ -731,7 +726,6 @@ mod tests {
.try_recover()?],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};

View File

@@ -304,37 +304,24 @@ mod tests {
vec![blocks[0].clone(), blocks[1].clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
let reverted_notification = ExExNotification::ChainReverted {
old: Arc::new(Chain::new(
vec![blocks[1].clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
old: Arc::new(Chain::new(vec![blocks[1].clone()], Default::default(), BTreeMap::new())),
};
let committed_notification_2 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block_1_reorged.clone(), blocks[2].clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
let reorged_notification = ExExNotification::ChainReorged {
old: Arc::new(Chain::new(
vec![blocks[2].clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
old: Arc::new(Chain::new(vec![blocks[2].clone()], Default::default(), BTreeMap::new())),
new: Arc::new(Chain::new(
vec![block_2_reorged.clone(), blocks[3].clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};

View File

@@ -189,7 +189,7 @@ mod tests {
use reth_testing_utils::generators::{self, random_block};
use reth_trie_common::{
updates::{StorageTrieUpdates, TrieUpdates},
BranchNodeCompact, HashedPostState, HashedStorage, Nibbles,
BranchNodeCompact, HashedPostState, HashedStorage, LazyTrieData, Nibbles,
};
use std::{collections::BTreeMap, fs::File, sync::Arc};
@@ -241,18 +241,8 @@ mod tests {
let new_block = random_block(&mut rng, 0, Default::default()).try_recover()?;
let notification = ExExNotification::ChainReorged {
new: Arc::new(Chain::new(
vec![new_block],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
old: Arc::new(Chain::new(
vec![old_block],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
new: Arc::new(Chain::new(vec![new_block], Default::default(), BTreeMap::new())),
old: Arc::new(Chain::new(vec![old_block], Default::default(), BTreeMap::new())),
};
// Do a round trip serialization and deserialization
@@ -346,13 +336,17 @@ mod tests {
)]),
};
let trie_data = LazyTrieData::ready(
Arc::new(hashed_state.into_sorted()),
Arc::new(trie_updates.into_sorted()),
);
let notification: ExExNotification<reth_ethereum_primitives::EthPrimitives> =
ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block],
Default::default(),
BTreeMap::from([(block_number, Arc::new(trie_updates.into_sorted()))]),
BTreeMap::from([(block_number, Arc::new(hashed_state.into_sorted()))]),
BTreeMap::from([(block_number, trie_data)]),
)),
};
Ok(notification)

View File

@@ -223,14 +223,12 @@ pub(super) mod serde_bincode_compat {
.unwrap()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
new: Arc::new(Chain::new(
vec![RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes))
.unwrap()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
},
};

View File

@@ -106,7 +106,7 @@ impl BanList {
self.banned_ips.contains_key(ip)
}
/// checks the ban list to see if it contains the given ip
/// checks the ban list to see if it contains the given peer
#[inline]
pub fn is_banned_peer(&self, peer_id: &PeerId) -> bool {
self.banned_peers.contains_key(peer_id)
@@ -117,7 +117,7 @@ impl BanList {
self.banned_ips.remove(ip);
}
/// Unbans the ip address
/// Unbans the peer
pub fn unban_peer(&mut self, peer_id: &PeerId) {
self.banned_peers.remove(peer_id);
}

View File

@@ -54,6 +54,7 @@ reth-tasks.workspace = true
reth-tokio-util.workspace = true
reth-tracing.workspace = true
reth-transaction-pool.workspace = true
reth-trie-db = { workspace = true, features = ["metrics"] }
reth-basic-payload-builder.workspace = true
reth-node-ethstats.workspace = true
@@ -115,6 +116,7 @@ test-utils = [
"reth-db-api/test-utils",
"reth-provider/test-utils",
"reth-transaction-pool/test-utils",
"reth-trie-db/test-utils",
"reth-evm-ethereum/test-utils",
"reth-node-ethereum/test-utils",
"reth-primitives-traits/test-utils",

View File

@@ -84,6 +84,7 @@ use reth_tracing::{
tracing::{debug, error, info, warn},
};
use reth_transaction_pool::TransactionPool;
use reth_trie_db::ChangesetCache;
use std::{sync::Arc, thread::available_parallelism, time::Duration};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedSender},
@@ -470,7 +471,10 @@ where
/// Returns the [`ProviderFactory`] for the attached storage after executing a consistent check
/// between the database and static files. **It may execute a pipeline unwind if it fails this
/// check.**
pub async fn create_provider_factory<N, Evm>(&self) -> eyre::Result<ProviderFactory<N>>
pub async fn create_provider_factory<N, Evm>(
&self,
changeset_cache: ChangesetCache,
) -> eyre::Result<ProviderFactory<N>>
where
N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
@@ -500,7 +504,8 @@ where
static_file_provider,
rocksdb_provider,
)?
.with_prune_modes(self.prune_modes());
.with_prune_modes(self.prune_modes())
.with_changeset_cache(changeset_cache);
// Keep MDBX, static files, and RocksDB aligned. If any check fails, unwind to the
// earliest consistent block.
@@ -593,12 +598,13 @@ where
/// Creates a new [`ProviderFactory`] and attaches it to the launch context.
pub async fn with_provider_factory<N, Evm>(
self,
changeset_cache: ChangesetCache,
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
where
N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
{
let factory = self.create_provider_factory::<N, Evm>().await?;
let factory = self.create_provider_factory::<N, Evm>(changeset_cache).await?;
let ctx = LaunchContextWith {
inner: self.inner,
attachment: self.attachment.map_right(|_| factory),

View File

@@ -37,6 +37,7 @@ use reth_provider::{
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventSender;
use reth_tracing::tracing::{debug, error, info};
use reth_trie_db::ChangesetCache;
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::sync::{mpsc::unbounded_channel, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -87,6 +88,9 @@ impl EngineNodeLauncher {
} = target;
let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
// Create changeset cache that will be shared across the engine
let changeset_cache = ChangesetCache::new();
// setup the launch context
let ctx = ctx
.with_configured_globals(engine_tree_config.reserved_cpu_cores())
@@ -98,8 +102,8 @@ impl EngineNodeLauncher {
.attach(database.clone())
// ensure certain settings take effect
.with_adjusted_configs()
// Create the provider factory
.with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>().await?
// Create the provider factory with changeset cache
.with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(changeset_cache.clone()).await?
.inspect(|ctx| {
info!(target: "reth::cli", "Database opened");
match ctx.provider_factory().storage_settings() {
@@ -204,7 +208,7 @@ impl EngineNodeLauncher {
// Build the engine validator with all required components
let engine_validator = validator_builder
.clone()
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone())
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), changeset_cache.clone())
.await?;
// Create the consensus engine stream with optional reorg
@@ -214,7 +218,13 @@ impl EngineNodeLauncher {
.maybe_reorg(
ctx.blockchain_db().clone(),
ctx.components().evm_config().clone(),
|| validator_builder.build_tree_validator(&add_ons_ctx, engine_tree_config.clone()),
|| async {
// Create a separate cache for reorg validator (not shared with main engine)
let reorg_cache = ChangesetCache::new();
validator_builder
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), reorg_cache)
.await
},
node_config.debug.reorg_frequency,
node_config.debug.reorg_depth,
)
@@ -239,6 +249,7 @@ impl EngineNodeLauncher {
engine_tree_config,
ctx.sync_metrics_tx(),
ctx.components().evm_config().clone(),
changeset_cache,
);
info!(target: "reth::cli", "Consensus engine initialized");

View File

@@ -3,6 +3,7 @@
pub use jsonrpsee::server::middleware::rpc::{RpcService, RpcServiceBuilder};
pub use reth_engine_tree::tree::{BasicEngineValidator, EngineValidator};
pub use reth_rpc_builder::{middleware::RethRpcMiddleware, Identity, Stack};
pub use reth_trie_db::ChangesetCache;
use crate::{
invalid_block_hook::InvalidBlockHookExt, ConfigureEngineEvm, ConsensusEngineEvent,
@@ -1288,6 +1289,7 @@ pub trait EngineValidatorBuilder<Node: FullNodeComponents>: Send + Sync + Clone
self,
ctx: &AddOnsContext<'_, Node>,
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
) -> impl Future<Output = eyre::Result<Self::EngineValidator>> + Send;
}
@@ -1335,10 +1337,12 @@ where
self,
ctx: &AddOnsContext<'_, Node>,
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
) -> eyre::Result<Self::EngineValidator> {
let validator = self.payload_validator_builder.build(ctx).await?;
let data_dir = ctx.config.datadir.clone().resolve_datadir(ctx.config.chain.chain());
let invalid_block_hook = ctx.create_invalid_block_hook(&data_dir).await?;
Ok(BasicEngineValidator::new(
ctx.node.provider().clone(),
std::sync::Arc::new(ctx.node.consensus().clone()),
@@ -1346,6 +1350,7 @@ where
validator,
tree_config,
invalid_block_hook,
changeset_cache,
))
}
}

View File

@@ -81,7 +81,8 @@ tokio.workspace = true
jemalloc = ["reth-cli-util/jemalloc"]
asm-keccak = ["alloy-primitives/asm-keccak"]
keccak-cache-global = ["alloy-primitives/keccak-cache-global"]
otlp = ["reth-tracing/otlp"]
otlp = ["reth-tracing/otlp", "reth-tracing-otlp/otlp"]
otlp-logs = ["reth-tracing/otlp-logs", "reth-tracing-otlp/otlp-logs"]
tracy = ["reth-tracing/tracy"]
min-error-logs = ["tracing/release_max_level_error"]

View File

@@ -26,7 +26,7 @@ pub use log::{ColorMode, LogArgs, Verbosity};
/// `TraceArgs` for tracing and spans support
mod trace;
pub use trace::{OtlpInitStatus, TraceArgs};
pub use trace::{OtlpInitStatus, OtlpLogsStatus, TraceArgs};
/// `MetricArgs` to configure metrics.
mod metric;

View File

@@ -5,10 +5,7 @@ use alloy_primitives::{Address, BlockNumber};
use clap::{builder::RangedU64ValueParser, Args};
use reth_chainspec::EthereumHardforks;
use reth_config::config::PruneConfig;
use reth_prune_types::{
PruneMode, PruneModes, ReceiptsLogPruneConfig, MERKLE_CHANGESETS_RETENTION_BLOCKS,
MINIMUM_PRUNING_DISTANCE,
};
use reth_prune_types::{PruneMode, PruneModes, ReceiptsLogPruneConfig, MINIMUM_PRUNING_DISTANCE};
use std::{collections::BTreeMap, ops::Not};
/// Parameters for pruning and full node
@@ -143,7 +140,6 @@ impl PruningArgs {
.ethereum_fork_activation(EthereumHardfork::Paris)
.block_number()
.map(PruneMode::Before),
merkle_changesets: PruneMode::Distance(MERKLE_CHANGESETS_RETENTION_BLOCKS),
receipts_log_filter: Default::default(),
},
}
@@ -160,7 +156,6 @@ impl PruningArgs {
account_history: Some(PruneMode::Distance(10064)),
storage_history: Some(PruneMode::Distance(10064)),
bodies_history: Some(PruneMode::Distance(10064)),
merkle_changesets: PruneMode::Distance(MERKLE_CHANGESETS_RETENTION_BLOCKS),
receipts_log_filter: Default::default(),
},
}

View File

@@ -640,6 +640,13 @@ pub struct RpcServerArgs {
value_parser = parse_duration_from_secs_or_ms,
)]
pub rpc_send_raw_transaction_sync_timeout: Duration,
/// Skip invalid transactions in `testing_buildBlockV1` instead of failing.
///
/// When enabled, transactions that fail execution will be skipped, and all subsequent
/// transactions from the same sender will also be skipped.
#[arg(long = "testing.skip-invalid-transactions", default_value_t = false)]
pub testing_skip_invalid_transactions: bool,
}
impl RpcServerArgs {
@@ -852,6 +859,7 @@ impl Default for RpcServerArgs {
rpc_state_cache,
gas_price_oracle,
rpc_send_raw_transaction_sync_timeout,
testing_skip_invalid_transactions: false,
}
}
}
@@ -1026,6 +1034,7 @@ mod tests {
default_suggested_fee: None,
},
rpc_send_raw_transaction_sync_timeout: std::time::Duration::from_secs(30),
testing_skip_invalid_transactions: true,
};
let parsed_args = CommandParser::<RpcServerArgs>::parse_from([
@@ -1114,6 +1123,7 @@ mod tests {
"60",
"--rpc.send-raw-transaction-sync-timeout",
"30s",
"--testing.skip-invalid-transactions",
])
.args;

View File

@@ -38,11 +38,6 @@ pub enum StageEnum {
///
/// Handles Merkle tree-related computations and data processing.
Merkle,
/// The merkle changesets stage within the pipeline.
///
/// Handles Merkle trie changesets for storage and accounts.
#[value(name = "merkle-changesets")]
MerkleChangeSets,
/// The transaction lookup stage within the pipeline.
///
/// Deals with the retrieval and processing of transactions.

View File

@@ -1,4 +1,4 @@
//! Opentelemetry tracing configuration through CLI args.
//! Opentelemetry tracing and logging configuration through CLI args.
use clap::Parser;
use eyre::WrapErr;
@@ -6,7 +6,7 @@ use reth_tracing::{tracing_subscriber::EnvFilter, Layers};
use reth_tracing_otlp::OtlpProtocol;
use url::Url;
/// CLI arguments for configuring `Opentelemetry` trace and span export.
/// CLI arguments for configuring `Opentelemetry` trace and logs export.
#[derive(Debug, Clone, Parser)]
pub struct TraceArgs {
/// Enable `Opentelemetry` tracing export to an OTLP endpoint.
@@ -30,9 +30,29 @@ pub struct TraceArgs {
)]
pub otlp: Option<Url>,
/// OTLP transport protocol to use for exporting traces.
/// Enable `Opentelemetry` logs export to an OTLP endpoint.
///
/// - `http`: expects endpoint path to end with `/v1/traces`
/// If no value provided, defaults based on protocol:
/// - HTTP: `http://localhost:4318/v1/logs`
/// - gRPC: `http://localhost:4317`
///
/// Example: --logs-otlp=http://collector:4318/v1/logs
#[arg(
long = "logs-otlp",
env = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT",
global = true,
value_name = "URL",
num_args = 0..=1,
default_missing_value = "http://localhost:4318/v1/logs",
require_equals = true,
value_parser = parse_otlp_endpoint,
help_heading = "Logging"
)]
pub logs_otlp: Option<Url>,
/// OTLP transport protocol to use for exporting traces and logs.
///
/// - `http`: expects endpoint path to end with `/v1/traces` or `/v1/logs`
/// - `grpc`: expects endpoint without a path
///
/// Defaults to HTTP if not specified.
@@ -62,6 +82,22 @@ pub struct TraceArgs {
)]
pub otlp_filter: EnvFilter,
/// Set a filter directive for the OTLP logs exporter. This controls the verbosity
/// of logs sent to the OTLP endpoint. It follows the same syntax as the
/// `RUST_LOG` environment variable.
///
/// Example: --logs-otlp.filter=info,reth=debug
///
/// Defaults to INFO if not specified.
#[arg(
long = "logs-otlp.filter",
global = true,
value_name = "FILTER",
default_value = "info",
help_heading = "Logging"
)]
pub logs_otlp_filter: EnvFilter,
/// Service name to use for OTLP tracing export.
///
/// This name will be used to identify the service in distributed tracing systems
@@ -101,8 +137,10 @@ impl Default for TraceArgs {
fn default() -> Self {
Self {
otlp: None,
logs_otlp: None,
protocol: OtlpProtocol::Http,
otlp_filter: EnvFilter::from_default_env(),
logs_otlp_filter: EnvFilter::try_new("info").expect("valid filter"),
sample_ratio: None,
service_name: "reth".to_string(),
}
@@ -150,6 +188,37 @@ impl TraceArgs {
Ok(OtlpInitStatus::Disabled)
}
}
/// Initialize OTLP logs export with the given layers.
///
/// This method handles OTLP logs initialization based on the configured options,
/// including validation and protocol selection.
///
/// Returns the initialization status to allow callers to log appropriate messages.
pub async fn init_otlp_logs(&mut self, _layers: &mut Layers) -> eyre::Result<OtlpLogsStatus> {
if let Some(endpoint) = self.logs_otlp.as_mut() {
self.protocol.validate_logs_endpoint(endpoint)?;
#[cfg(feature = "otlp-logs")]
{
let config = reth_tracing_otlp::OtlpLogsConfig::new(
self.service_name.clone(),
endpoint.clone(),
self.protocol,
)?;
_layers.with_log_layer(config.clone(), self.logs_otlp_filter.clone())?;
Ok(OtlpLogsStatus::Started(config.endpoint().clone()))
}
#[cfg(not(feature = "otlp-logs"))]
{
Ok(OtlpLogsStatus::NoFeature)
}
} else {
Ok(OtlpLogsStatus::Disabled)
}
}
}
/// Status of OTLP tracing initialization.
@@ -163,6 +232,17 @@ pub enum OtlpInitStatus {
NoFeature,
}
/// Status of OTLP logs initialization.
#[derive(Debug)]
pub enum OtlpLogsStatus {
/// OTLP logs export was successfully started with the given endpoint.
Started(Url),
/// OTLP logs export is disabled (no endpoint configured).
Disabled,
/// OTLP logs arguments provided but feature is not compiled.
NoFeature,
}
// Parses an OTLP endpoint url.
fn parse_otlp_endpoint(arg: &str) -> eyre::Result<Url> {
Url::parse(arg).wrap_err("Invalid URL for OTLP trace output")

View File

@@ -43,6 +43,7 @@ tracy = ["reth-optimism-cli/tracy"]
asm-keccak = ["reth-optimism-cli/asm-keccak", "reth-optimism-node/asm-keccak"]
keccak-cache-global = [
"reth-optimism-cli/keccak-cache-global",
"reth-optimism-node/keccak-cache-global",
]
dev = [

View File

@@ -76,8 +76,9 @@ reth-optimism-chainspec = { workspace = true, features = ["std", "superchain-con
[features]
default = []
# Opentelemtry feature to activate metrics export
# Opentelemetry feature to activate tracing and logs export
otlp = ["reth-tracing/otlp", "reth-node-core/otlp"]
otlp-logs = ["reth-tracing/otlp-logs", "reth-node-core/otlp-logs"]
asm-keccak = [
"alloy-primitives/asm-keccak",
@@ -85,6 +86,12 @@ asm-keccak = [
"reth-optimism-node/asm-keccak",
]
keccak-cache-global = [
"alloy-primitives/keccak-cache-global",
"reth-node-core/keccak-cache-global",
"reth-optimism-node/keccak-cache-global",
]
# Jemalloc feature for vergen to generate correct env vars
jemalloc = [
"reth-node-core/jemalloc",

View File

@@ -3,7 +3,7 @@ use eyre::{eyre, Result};
use reth_cli::chainspec::ChainSpecParser;
use reth_cli_commands::launcher::Launcher;
use reth_cli_runner::CliRunner;
use reth_node_core::args::OtlpInitStatus;
use reth_node_core::args::{OtlpInitStatus, OtlpLogsStatus};
use reth_node_metrics::recorder::install_prometheus_recorder;
use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_consensus::OpBeaconConsensus;
@@ -124,9 +124,11 @@ where
let mut layers = self.layers.take().unwrap_or_default();
let otlp_status = runner.block_on(self.cli.traces.init_otlp_tracing(&mut layers))?;
let otlp_logs_status = runner.block_on(self.cli.traces.init_otlp_logs(&mut layers))?;
self.guard = self.cli.logs.init_tracing_with_layers(layers)?;
info!(target: "reth::cli", "Initialized tracing, debug log directory: {}", self.cli.logs.log_file_directory);
match otlp_status {
OtlpInitStatus::Started(endpoint) => {
info!(target: "reth::cli", "Started OTLP {:?} tracing export to {endpoint}", self.cli.traces.protocol);
@@ -136,6 +138,16 @@ where
}
OtlpInitStatus::Disabled => {}
}
match otlp_logs_status {
OtlpLogsStatus::Started(endpoint) => {
info!(target: "reth::cli", "Started OTLP {:?} logs export to {endpoint}", self.cli.traces.protocol);
}
OtlpLogsStatus::NoFeature => {
warn!(target: "reth::cli", "Provided OTLP logs arguments do not have effect, compile with the `otlp-logs` feature")
}
OtlpLogsStatus::Disabled => {}
}
}
Ok(())
}

View File

@@ -18,7 +18,7 @@ use reth_optimism_primitives::{bedrock::is_dup_tx, OpPrimitives, OpReceipt};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::ProviderNodeTypes, DBProvider, DatabaseProviderFactory, OriginalValuesKnown,
ProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriter,
ProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriteConfig, StateWriter,
StaticFileProviderFactory, StatsReader,
};
use reth_stages::{StageCheckpoint, StageId};
@@ -228,7 +228,11 @@ where
ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());
// finally, write the receipts
provider.write_state(&execution_outcome, OriginalValuesKnown::Yes)?;
provider.write_state(
&execution_outcome,
OriginalValuesKnown::Yes,
StateWriteConfig::default(),
)?;
}
// Only commit if we have imported as many receipts as the number of transactions.

View File

@@ -18,7 +18,7 @@ use alloy_consensus::{
use alloy_primitives::B64;
use core::fmt::Debug;
use reth_chainspec::EthChainSpec;
use reth_consensus::{Consensus, ConsensusError, FullConsensus, HeaderValidator};
use reth_consensus::{Consensus, ConsensusError, FullConsensus, HeaderValidator, ReceiptRootBloom};
use reth_consensus_common::validation::{
validate_against_parent_eip1559_base_fee, validate_against_parent_hash_number,
validate_against_parent_timestamp, validate_cancun_gas, validate_header_base_fee,
@@ -79,8 +79,9 @@ where
&self,
block: &RecoveredBlock<N::Block>,
result: &BlockExecutionResult<N::Receipt>,
receipt_root_bloom: Option<ReceiptRootBloom>,
) -> Result<(), ConsensusError> {
validate_block_post_execution(block.header(), &self.chain_spec, result)
validate_block_post_execution(block.header(), &self.chain_spec, result, receipt_root_bloom)
}
}
@@ -410,7 +411,8 @@ mod tests {
let post_execution = <OpBeaconConsensus<OpChainSpec> as FullConsensus<OpPrimitives>>::validate_block_post_execution(
&beacon_consensus,
&block,
&result
&result,
None,
);
// validate blob, it should pass blob gas used validation
@@ -479,7 +481,8 @@ mod tests {
let post_execution = <OpBeaconConsensus<OpChainSpec> as FullConsensus<OpPrimitives>>::validate_block_post_execution(
&beacon_consensus,
&block,
&result
&result,
None,
);
// validate blob, it should fail blob gas used validation post execution.

View File

@@ -85,10 +85,14 @@ where
///
/// - Compares the receipts root in the block header to the block body
/// - Compares the gas used in the block header to the actual gas usage after execution
///
/// If `receipt_root_bloom` is provided, the pre-computed receipt root and logs bloom are used
/// instead of computing them from the receipts.
pub fn validate_block_post_execution<R: DepositReceipt>(
header: impl BlockHeader,
chain_spec: impl OpHardforks,
result: &BlockExecutionResult<R>,
receipt_root_bloom: Option<(B256, Bloom)>,
) -> Result<(), ConsensusError> {
// Validate that the blob gas used is present and correctly computed if Jovian is active.
if chain_spec.is_jovian_active_at_timestamp(header.timestamp()) {
@@ -110,21 +114,32 @@ pub fn validate_block_post_execution<R: DepositReceipt>(
// operation as hashing that is required for state root got calculated in every
// transaction This was replaced with is_success flag.
// See more about EIP here: https://eips.ethereum.org/EIPS/eip-658
if chain_spec.is_byzantium_active_at_block(header.number()) &&
let Err(error) = verify_receipts_optimism(
header.receipts_root(),
header.logs_bloom(),
receipts,
chain_spec,
header.timestamp(),
)
{
let receipts = receipts
.iter()
.map(|r| Bytes::from(r.with_bloom_ref().encoded_2718()))
.collect::<Vec<_>>();
tracing::debug!(%error, ?receipts, "receipts verification failed");
return Err(error)
if chain_spec.is_byzantium_active_at_block(header.number()) {
let result = if let Some((receipts_root, logs_bloom)) = receipt_root_bloom {
compare_receipts_root_and_logs_bloom(
receipts_root,
logs_bloom,
header.receipts_root(),
header.logs_bloom(),
)
} else {
verify_receipts_optimism(
header.receipts_root(),
header.logs_bloom(),
receipts,
chain_spec,
header.timestamp(),
)
};
if let Err(error) = result {
let receipts = receipts
.iter()
.map(|r| Bytes::from(r.with_bloom_ref().encoded_2718()))
.collect::<Vec<_>>();
tracing::debug!(%error, ?receipts, "receipts verification failed");
return Err(error)
}
}
// Check if gas used matches the value set in header.
@@ -543,7 +558,7 @@ mod tests {
requests: Requests::default(),
gas_used: GAS_USED,
};
validate_block_post_execution(&header, &chainspec, &result).unwrap();
validate_block_post_execution(&header, &chainspec, &result, None).unwrap();
}
#[test]
@@ -565,7 +580,7 @@ mod tests {
gas_used: GAS_USED,
};
assert!(matches!(
validate_block_post_execution(&header, &chainspec, &result).unwrap_err(),
validate_block_post_execution(&header, &chainspec, &result, None).unwrap_err(),
ConsensusError::BlobGasUsedDiff(diff)
if diff.got == BLOB_GAS_USED && diff.expected == BLOB_GAS_USED + 1
));

View File

@@ -230,7 +230,9 @@ where
let spec = revm_spec_by_timestamp_after_bedrock(self.chain_spec(), timestamp);
let cfg_env = CfgEnv::new().with_chain_id(self.chain_spec().chain().id()).with_spec(spec);
let cfg_env = CfgEnv::new()
.with_chain_id(self.chain_spec().chain().id())
.with_spec_and_mainnet_gas_params(spec);
let blob_excess_gas_and_price = spec
.into_eth_spec()
@@ -362,7 +364,8 @@ mod tests {
let db = CacheDB::<EmptyDBTyped<ProviderError>>::default();
// Create a custom configuration environment with a chain ID of 111
let cfg = CfgEnv::new().with_chain_id(111).with_spec(OpSpecId::default());
let cfg =
CfgEnv::new().with_chain_id(111).with_spec_and_mainnet_gas_params(OpSpecId::default());
let evm_env = EvmEnv { cfg_env: cfg.clone(), ..Default::default() };
@@ -400,8 +403,10 @@ mod tests {
let db = CacheDB::<EmptyDBTyped<ProviderError>>::default();
let evm_env =
EvmEnv { cfg_env: CfgEnv::new().with_spec(OpSpecId::ECOTONE), ..Default::default() };
let evm_env = EvmEnv {
cfg_env: CfgEnv::new().with_spec_and_mainnet_gas_params(OpSpecId::ECOTONE),
..Default::default()
};
let evm = evm_config.evm_with_env(db, evm_env.clone());
@@ -427,7 +432,8 @@ mod tests {
let evm_config = test_evm_config();
let db = CacheDB::<EmptyDBTyped<ProviderError>>::default();
let cfg = CfgEnv::new().with_chain_id(111).with_spec(OpSpecId::default());
let cfg =
CfgEnv::new().with_chain_id(111).with_spec_and_mainnet_gas_params(OpSpecId::default());
let block = BlockEnv::default();
let evm_env = EvmEnv { block_env: block, cfg_env: cfg.clone() };
@@ -463,8 +469,10 @@ mod tests {
let evm_config = test_evm_config();
let db = CacheDB::<EmptyDBTyped<ProviderError>>::default();
let evm_env =
EvmEnv { cfg_env: CfgEnv::new().with_spec(OpSpecId::ECOTONE), ..Default::default() };
let evm_env = EvmEnv {
cfg_env: CfgEnv::new().with_spec_and_mainnet_gas_params(OpSpecId::ECOTONE),
..Default::default()
};
let evm = evm_config.evm_with_env_and_inspector(db, evm_env.clone(), NoOpInspector {});
@@ -521,12 +529,8 @@ mod tests {
// Create a Chain object with a BTreeMap of blocks mapped to their block numbers,
// including block1_hash and block2_hash, and the execution_outcome
let chain: Chain<OpPrimitives> = Chain::new(
[block1, block2],
execution_outcome.clone(),
BTreeMap::new(),
BTreeMap::new(),
);
let chain: Chain<OpPrimitives> =
Chain::new([block1, block2], execution_outcome.clone(), BTreeMap::new());
// Assert that the proper receipt vector is returned for block1_hash
assert_eq!(chain.receipts_by_block_hash(block1_hash), Some(vec![&receipt1]));

View File

@@ -12,10 +12,18 @@ use reth_primitives_traits::{AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives
use reth_revm::cached::CachedReads;
use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
use reth_tasks::TaskExecutor;
use std::{sync::Arc, time::Instant};
use tokio::sync::{oneshot, watch};
use std::{
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
sync::{oneshot, watch},
time::sleep,
};
use tracing::*;
const CONNECTION_BACKOUT_PERIOD: Duration = Duration::from_secs(5);
/// The `FlashBlockService` maintains an in-memory [`PendingFlashBlock`] built out of a sequence of
/// [`FlashBlock`]s.
#[derive(Debug)]
@@ -167,7 +175,13 @@ where
self.try_start_build_job();
}
Some(Err(err)) => {
warn!(target: "flashblocks", %err, "Error receiving flashblock");
warn!(
target: "flashblocks",
%err,
retry_period = CONNECTION_BACKOUT_PERIOD.as_secs(),
"Error receiving flashblock"
);
sleep(CONNECTION_BACKOUT_PERIOD).await;
}
None => {
warn!(target: "flashblocks", "Flashblock stream ended");

View File

@@ -8,10 +8,8 @@ use reth_evm::{
execute::{BlockBuilder, BlockBuilderOutcome},
ConfigureEvm,
};
use reth_execution_types::ExecutionOutcome;
use reth_primitives_traits::{
AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, Recovered,
};
use reth_execution_types::BlockExecutionOutput;
use reth_primitives_traits::{BlockTy, HeaderTy, NodePrimitives, ReceiptTy, Recovered};
use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State};
use reth_rpc_eth_types::{EthApiError, PendingBlock};
use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory};
@@ -112,12 +110,8 @@ where
builder.finish(NoopProvider::default())?
};
let execution_outcome = ExecutionOutcome::new(
state.take_bundle(),
vec![execution_result.receipts],
block.number(),
vec![execution_result.requests],
);
let execution_outcome =
BlockExecutionOutput { state: state.take_bundle(), result: execution_result };
let pending_block = PendingBlock::with_executed_block(
Instant::now() + Duration::from_secs(1),

View File

@@ -81,6 +81,7 @@ reth-revm = { workspace = true, features = ["std"] }
reth-rpc.workspace = true
reth-rpc-eth-types.workspace = true
reth-stages-types.workspace = true
reth-trie-db.workspace = true
alloy-network.workspace = true
alloy-op-hardforks.workspace = true
@@ -125,6 +126,7 @@ test-utils = [
"reth-optimism-primitives/arbitrary",
"reth-primitives-traits/test-utils",
"reth-trie-common/test-utils",
"reth-trie-db/test-utils",
"reth-stages-types/test-utils",
]
reth-codec = ["reth-optimism-primitives/reth-codec"]

View File

@@ -24,6 +24,7 @@
//! use reth_rpc::TraceApi;
//! use reth_rpc_eth_types::{EthConfig, EthStateCache};
//! use reth_tasks::{pool::BlockingTaskGuard, TaskManager};
//! use reth_trie_db::ChangesetCache;
//! use std::sync::Arc;
//!
//! #[tokio::main]
@@ -37,7 +38,7 @@
//! .with_loaded_toml_config(sepolia)
//! .unwrap()
//! .attach(Arc::new(db))
//! .with_provider_factory::<_, OpEvmConfig>()
//! .with_provider_factory::<_, OpEvmConfig>(ChangesetCache::new())
//! .await
//! .unwrap()
//! .with_genesis()

View File

@@ -19,7 +19,7 @@ use reth_optimism_node::{args::RollupArgs, OpEvmConfig, OpExecutorBuilder, OpNod
use reth_optimism_primitives::OpPrimitives;
use reth_provider::providers::BlockchainProvider;
use revm::{
context::{BlockEnv, Cfg, ContextTr, TxEnv},
context::{BlockEnv, ContextTr, TxEnv},
context_interface::result::EVMError,
inspector::NoOpInspector,
interpreter::interpreter::EthInterpreter,
@@ -103,7 +103,7 @@ fn test_setup_custom_precompiles() {
input: EvmEnv<OpSpecId>,
) -> Self::Evm<DB, NoOpInspector> {
let mut op_evm = OpEvmFactory::default().create_evm(db, input);
*op_evm.components_mut().2 = UniPrecompiles::precompiles(op_evm.ctx().cfg().spec());
*op_evm.components_mut().2 = UniPrecompiles::precompiles(*op_evm.ctx().cfg().spec());
op_evm
}
@@ -119,7 +119,7 @@ fn test_setup_custom_precompiles() {
) -> Self::Evm<DB, I> {
let mut op_evm =
OpEvmFactory::default().create_evm_with_inspector(db, input, inspector);
*op_evm.components_mut().2 = UniPrecompiles::precompiles(op_evm.ctx().cfg().spec());
*op_evm.components_mut().2 = UniPrecompiles::precompiles(*op_evm.ctx().cfg().spec());
op_evm
}

View File

@@ -18,7 +18,7 @@ use reth_evm::{
op_revm::{constants::L1_BLOCK_CONTRACT, L1BlockInfo},
ConfigureEvm, Database,
};
use reth_execution_types::ExecutionOutcome;
use reth_execution_types::BlockExecutionOutput;
use reth_optimism_forks::OpHardforks;
use reth_optimism_primitives::{transaction::OpTransaction, L2_TO_L1_MESSAGE_PASSER_ADDRESS};
use reth_optimism_txpool::{
@@ -375,12 +375,8 @@ impl<Txs> OpBuilder<'_, Txs> {
let sealed_block = Arc::new(block.sealed_block().clone());
debug!(target: "payload_builder", id=%ctx.attributes().payload_id(), sealed_block_header = ?sealed_block.header(), "sealed built block");
let execution_outcome = ExecutionOutcome::new(
db.take_bundle(),
vec![execution_result.receipts],
block.number(),
Vec::new(),
);
let execution_outcome =
BlockExecutionOutput { state: db.take_bundle(), result: execution_result };
// create the executed block data
let executed: BuiltPayloadExecutedBlock<N> = BuiltPayloadExecutedBlock {
@@ -634,7 +630,7 @@ where
if sequencer_tx.value().is_eip4844() {
return Err(PayloadBuilderError::other(
OpPayloadBuilderError::BlobTransactionRejected,
))
));
}
// Convert the transaction to a [RecoveredTx]. This is

View File

@@ -77,6 +77,7 @@ arbitrary = [
keccak-cache-global = [
"reth-optimism-node?/keccak-cache-global",
"reth-node-core?/keccak-cache-global",
"reth-optimism-cli?/keccak-cache-global",
]
test-utils = [
"reth-chainspec/test-utils",

View File

@@ -11,7 +11,7 @@ use alloy_rpc_types_engine::{PayloadAttributes as EthPayloadAttributes, PayloadI
use core::fmt;
use either::Either;
use reth_chain_state::ComputedTrieData;
use reth_execution_types::ExecutionOutcome;
use reth_execution_types::BlockExecutionOutput;
use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
use reth_trie_common::{
updates::{TrieUpdates, TrieUpdatesSorted},
@@ -27,7 +27,7 @@ pub struct BuiltPayloadExecutedBlock<N: NodePrimitives> {
/// Recovered Block
pub recovered_block: Arc<RecoveredBlock<N::Block>>,
/// Block's execution outcome.
pub execution_output: Arc<ExecutionOutcome<N::Receipt>>,
pub execution_output: Arc<BlockExecutionOutput<N::Receipt>>,
/// Block's hashed state.
///
/// Supports both unsorted and sorted variants so payload builders can avoid cloning in order

Some files were not shown because too many files have changed in this diff Show More