Compare commits

...

16 Commits

Author SHA1 Message Date
yongkangc
9bd5a3ecba fix: strengthen batch invariants and prevent blinded node starvation
- Change debug_assert to assert for multi_added_removed_keys Arc equality
  check in BatchedStorageProof::merge, ensuring incorrect proofs are
  caught in release builds, not just debug

- Change BatchedAccountProof::merge to try_merge returning Result, properly
  handling incompatible caches by processing as separate batches instead
  of panicking

- Add MAX_DEFERRED_BLINDED_NODES (16) limit to prevent starvation of
  blinded node requests under high proof load - stops batching early when
  limit reached

- Pre-allocate deferred_blinded_nodes vectors with capacity

- Remove unnecessary clone of storage_work_tx by taking reference
2025-12-09 11:32:31 +00:00
yongkangc
1a5d9a3ad3 perf(trie): use Arc for proof sharing and add batch validation
- Wrap DecodedMultiProof and DecodedStorageMultiProof in Arc within ProofResult
  for O(1) cloning when sending batched results to multiple receivers
- Add debug assertions in BatchedStorageProof::merge and BatchedAccountProof::merge
  to validate that all batched jobs share the same Arc for multi_added_removed_keys
  and missed_leaves_storage_roots (critical invariants for correctness)
- Unwrap Arc at extraction sites using try_unwrap for zero-cost when sole owner
2025-12-09 11:32:31 +00:00
yongkangc
394b0d5989 perf(trie): batch proof jobs at worker level
Implement worker-level batching for both storage and account proofs to
reduce redundant trie traversals when multiple proof requests queue up.

When proof requests arrive faster than workers can process them, jobs
for the same account (storage proofs) or consecutive jobs (account proofs)
are now merged into single proof computations.
2025-12-09 11:32:31 +00:00
Arsenii Kulikov
bb4285c9ba feat: add helper method to eth validator (#20206) 2025-12-09 11:32:31 +00:00
Karl Yu
88dfd15243 feat: add StorageSettings for StoragesHistory in RocksDB (#20154) 2025-12-09 11:32:31 +00:00
futreall
388507bcee fix(rpc): return error if toBlock exceeds current head (#20202) 2025-12-09 11:32:31 +00:00
Léa Narzis
a430fb6798 test(era): complete int tests with roundtrip mainnet era files (#20064) 2025-12-09 11:32:31 +00:00
joshieDo
b6dca7b319 fix: set merkle changesets distance minimum to 128 (#20200) 2025-12-09 11:32:31 +00:00
Matthias Seitz
12fdca7e34 perf: avoid duplicate storage get call (#20180) 2025-12-09 11:32:31 +00:00
Matthias Seitz
c0d73fa5fe fix: trace filter range off by one (#20199) 2025-12-09 11:32:31 +00:00
forkfury
0cbd2aecdf feat(primitives-traits): add recover_transactions_ref to avoid cloning (#20187)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2025-12-09 11:32:31 +00:00
Matthias Seitz
3094cd68ea fix: make inserted blocks part of fcu canonical (#20164)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
2025-12-09 11:32:31 +00:00
Arsenii Kulikov
9265700ff9 feat: parallelize recovery (#20169)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2025-12-09 11:32:31 +00:00
Alexey Shekhirin
0b07657f33 ci: run on ubuntu instead of reth runner (#20196) 2025-12-09 11:32:31 +00:00
github-actions[bot]
e7bd168eca chore(deps): weekly cargo update (#20174)
Co-authored-by: github-merge-queue <118344674+github-merge-queue@users.noreply.github.com>
2025-12-09 11:32:31 +00:00
yongkangc
9a55cb9da5 Add bench compare latency stats 2025-12-07 02:17:06 +00:00
40 changed files with 1332 additions and 752 deletions

View File

@@ -15,8 +15,7 @@ env:
name: bench
jobs:
codspeed:
runs-on:
group: Reth
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
with:

View File

@@ -17,8 +17,7 @@ env:
name: compact-codec
jobs:
compact-codec:
runs-on:
group: Reth
runs-on: ubuntu-latest
strategy:
matrix:
bin:

View File

@@ -19,8 +19,7 @@ concurrency:
jobs:
test:
name: e2e-testsuite
runs-on:
group: Reth
runs-on: ubuntu-latest
env:
RUST_BACKTRACE: 1
timeout-minutes: 90
@@ -43,4 +42,3 @@ jobs:
--exclude 'op-reth' \
--exclude 'reth' \
-E 'binary(e2e_testsuite)'

View File

@@ -24,8 +24,7 @@ jobs:
prepare-hive:
if: github.repository == 'paradigmxyz/reth'
timeout-minutes: 45
runs-on:
group: Reth
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- name: Checkout hive tests
@@ -179,8 +178,7 @@ jobs:
- prepare-reth
- prepare-hive
name: run ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
runs-on:
group: Reth
runs-on: ubuntu-latest
permissions:
issues: write
steps:
@@ -247,8 +245,7 @@ jobs:
notify-on-error:
needs: test
if: failure()
runs-on:
group: Reth
runs-on: ubuntu-latest
steps:
- name: Slack Webhook Action
uses: rtCamp/action-slack-notify@v2

View File

@@ -23,8 +23,7 @@ jobs:
test:
name: test / ${{ matrix.network }}
if: github.event_name != 'schedule'
runs-on:
group: Reth
runs-on: ubuntu-latest
env:
RUST_BACKTRACE: 1
strategy:

View File

@@ -9,7 +9,7 @@ on:
push:
tags:
- '*'
- "*"
env:
CARGO_TERM_COLOR: always
@@ -32,8 +32,7 @@ jobs:
strategy:
fail-fast: false
name: run kurtosis
runs-on:
group: Reth
runs-on: ubuntu-latest
needs:
- prepare-reth
steps:
@@ -83,12 +82,10 @@ jobs:
kurtosis service logs -a op-devnet op-cl-2151908-2-op-node-op-reth-op-kurtosis
exit 1
notify-on-error:
needs: test
if: failure()
runs-on:
group: Reth
runs-on: ubuntu-latest
steps:
- name: Slack Webhook Action
uses: rtCamp/action-slack-notify@v2

View File

@@ -9,7 +9,7 @@ on:
push:
tags:
- '*'
- "*"
env:
CARGO_TERM_COLOR: always
@@ -30,8 +30,7 @@ jobs:
strategy:
fail-fast: false
name: run kurtosis
runs-on:
group: Reth
runs-on: ubuntu-latest
needs:
- prepare-reth
steps:
@@ -54,13 +53,12 @@ jobs:
- name: Run kurtosis
uses: ethpandaops/kurtosis-assertoor-github-action@v1
with:
ethereum_package_args: '.github/assets/kurtosis_network_params.yaml'
ethereum_package_args: ".github/assets/kurtosis_network_params.yaml"
notify-on-error:
needs: test
if: failure()
runs-on:
group: Reth
runs-on: ubuntu-latest
steps:
- name: Slack Webhook Action
uses: rtCamp/action-slack-notify@v2

View File

@@ -26,8 +26,7 @@ jobs:
prepare-reth:
if: github.repository == 'paradigmxyz/reth'
timeout-minutes: 45
runs-on:
group: Reth
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- run: mkdir artifacts

View File

@@ -22,8 +22,7 @@ jobs:
name: stage-run-test
# Only run stage commands test in merge groups
if: github.event_name == 'merge_group'
runs-on:
group: Reth
runs-on: ubuntu-latest
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1

View File

@@ -17,8 +17,7 @@ concurrency:
jobs:
sync:
name: sync (${{ matrix.chain.bin }})
runs-on:
group: Reth
runs-on: ubuntu-latest
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1
@@ -64,4 +63,4 @@ jobs:
${{ matrix.chain.bin }} stage unwind num-blocks 100 --chain ${{ matrix.chain.chain }}
- name: Run stage unwind to block hash
run: |
${{ matrix.chain.bin }} stage unwind to-block ${{ matrix.chain.unwind-target }} --chain ${{ matrix.chain.chain }}
${{ matrix.chain.bin }} stage unwind to-block ${{ matrix.chain.unwind-target }} --chain ${{ matrix.chain.chain }}

View File

@@ -17,8 +17,7 @@ concurrency:
jobs:
sync:
name: sync (${{ matrix.chain.bin }})
runs-on:
group: Reth
runs-on: ubuntu-latest
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1
@@ -63,4 +62,4 @@ jobs:
${{ matrix.chain.bin }} stage unwind num-blocks 100 --chain ${{ matrix.chain.chain }}
- name: Run stage unwind to block hash
run: |
${{ matrix.chain.bin }} stage unwind to-block ${{ matrix.chain.unwind-target }} --chain ${{ matrix.chain.chain }}
${{ matrix.chain.bin }} stage unwind to-block ${{ matrix.chain.unwind-target }} --chain ${{ matrix.chain.chain }}

View File

@@ -19,8 +19,7 @@ concurrency:
jobs:
test:
name: test / ${{ matrix.type }} (${{ matrix.partition }}/${{ matrix.total_partitions }})
runs-on:
group: Reth
runs-on: ubuntu-latest
env:
RUST_BACKTRACE: 1
strategy:
@@ -65,8 +64,7 @@ jobs:
state:
name: Ethereum state tests
runs-on:
group: Reth
runs-on: ubuntu-latest
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1
@@ -100,8 +98,7 @@ jobs:
doc:
name: doc tests
runs-on:
group: Reth
runs-on: ubuntu-latest
env:
RUST_BACKTRACE: 1
timeout-minutes: 30

195
Cargo.lock generated
View File

@@ -97,9 +97,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
[[package]]
name = "alloy-chains"
version = "0.2.20"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bc32535569185cbcb6ad5fa64d989a47bccb9a08e27284b1f2a3ccf16e6d010"
checksum = "1b9ebac8ff9c2f07667e1803dc777304337e160ce5153335beb45e8ec0751808"
dependencies = [
"alloy-primitives",
"alloy-rlp",
@@ -112,9 +112,9 @@ dependencies = [
[[package]]
name = "alloy-consensus"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b6440213a22df93a87ed512d2f668e7dc1d62a05642d107f82d61edc9e12370"
checksum = "2e318e25fb719e747a7e8db1654170fc185024f3ed5b10f86c08d448a912f6e2"
dependencies = [
"alloy-eips",
"alloy-primitives",
@@ -140,9 +140,9 @@ dependencies = [
[[package]]
name = "alloy-consensus-any"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15d0bea09287942405c4f9d2a4f22d1e07611c2dbd9d5bf94b75366340f9e6e0"
checksum = "364380a845193a317bcb7a5398fc86cdb66c47ebe010771dde05f6869bf9e64a"
dependencies = [
"alloy-consensus",
"alloy-eips",
@@ -155,9 +155,9 @@ dependencies = [
[[package]]
name = "alloy-contract"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d69af404f1d00ddb42f2419788fa87746a4cd13bab271916d7726fda6c792d94"
checksum = "08d39c80ffc806f27a76ed42f3351a455f3dc4f81d6ff92c8aad2cf36b7d3a34"
dependencies = [
"alloy-consensus",
"alloy-dyn-abi",
@@ -240,9 +240,9 @@ dependencies = [
[[package]]
name = "alloy-eips"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bd2c7ae05abcab4483ce821f12f285e01c0b33804e6883dd9ca1569a87ee2be"
checksum = "a4c4d7c5839d9f3a467900c625416b24328450c65702eb3d8caff8813e4d1d33"
dependencies = [
"alloy-eip2124",
"alloy-eip2930",
@@ -288,9 +288,9 @@ dependencies = [
[[package]]
name = "alloy-genesis"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc47eaae86488b07ea8e20236184944072a78784a1f4993f8ec17b3aa5d08c21"
checksum = "1ba4b1be0988c11f0095a2380aa596e35533276b8fa6c9e06961bbfe0aebcac5"
dependencies = [
"alloy-eips",
"alloy-primitives",
@@ -329,9 +329,9 @@ dependencies = [
[[package]]
name = "alloy-json-rpc"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "003f46c54f22854a32b9cc7972660a476968008ad505427eabab49225309ec40"
checksum = "f72cf87cda808e593381fb9f005ffa4d2475552b7a6c5ac33d087bf77d82abd0"
dependencies = [
"alloy-primitives",
"alloy-sol-types",
@@ -344,9 +344,9 @@ dependencies = [
[[package]]
name = "alloy-network"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f4029954d9406a40979f3a3b46950928a0fdcfe3ea8a9b0c17490d57e8aa0e3"
checksum = "12aeb37b6f2e61b93b1c3d34d01ee720207c76fe447e2a2c217e433ac75b17f5"
dependencies = [
"alloy-consensus",
"alloy-consensus-any",
@@ -370,9 +370,9 @@ dependencies = [
[[package]]
name = "alloy-network-primitives"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7805124ad69e57bbae7731c9c344571700b2a18d351bda9e0eba521c991d1bcb"
checksum = "abd29ace62872083e30929cd9b282d82723196d196db589f3ceda67edcc05552"
dependencies = [
"alloy-consensus",
"alloy-eips",
@@ -401,9 +401,9 @@ dependencies = [
[[package]]
name = "alloy-op-hardforks"
version = "0.4.4"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95ac97adaba4c26e17192d81f49186ac20c1e844e35a00e169c8d3d58bc84e6b"
checksum = "f96fb2fce4024ada5b2c11d4076acf778a0d3e4f011c6dfd2ffce6d0fcf84ee9"
dependencies = [
"alloy-chains",
"alloy-hardforks",
@@ -444,9 +444,9 @@ dependencies = [
[[package]]
name = "alloy-provider"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d369e12c92870d069e0c9dc5350377067af8a056e29e3badf8446099d7e00889"
checksum = "9b710636d7126e08003b8217e24c09f0cca0b46d62f650a841736891b1ed1fc1"
dependencies = [
"alloy-chains",
"alloy-consensus",
@@ -489,9 +489,9 @@ dependencies = [
[[package]]
name = "alloy-pubsub"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f77d20cdbb68a614c7a86b3ffef607b37d087bb47a03c58f4c3f8f99bc3ace3b"
checksum = "cdd4c64eb250a18101d22ae622357c6b505e158e9165d4c7974d59082a600c5e"
dependencies = [
"alloy-json-rpc",
"alloy-primitives",
@@ -533,9 +533,9 @@ dependencies = [
[[package]]
name = "alloy-rpc-client"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31c89883fe6b7381744cbe80fef638ac488ead4f1956a4278956a1362c71cd2e"
checksum = "d0882e72d2c1c0c79dcf4ab60a67472d3f009a949f774d4c17d0bdb669cfde05"
dependencies = [
"alloy-json-rpc",
"alloy-primitives",
@@ -559,9 +559,9 @@ dependencies = [
[[package]]
name = "alloy-rpc-types"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64e279e6d40ee40fe8f76753b678d8d5d260cb276dc6c8a8026099b16d2b43f4"
checksum = "39cf1398cb33aacb139a960fa3d8cf8b1202079f320e77e952a0b95967bf7a9f"
dependencies = [
"alloy-primitives",
"alloy-rpc-types-engine",
@@ -572,9 +572,9 @@ dependencies = [
[[package]]
name = "alloy-rpc-types-admin"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bcf50ccb65d29b8599f8f5e23dcac685f1d79459654c830cba381345760e901"
checksum = "65a583d2029b171301f5dcf122aa2ef443a65a373778ec76540d999691ae867d"
dependencies = [
"alloy-genesis",
"alloy-primitives",
@@ -584,9 +584,9 @@ dependencies = [
[[package]]
name = "alloy-rpc-types-anvil"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e176c26fdd87893b6afeb5d92099d8f7e7a1fe11d6f4fe0883d6e33ac5f31ba"
checksum = "c3ce4c24e416bd0f17fceeb2f26cd8668df08fe19e1dc02f9d41c3b8ed1e93e0"
dependencies = [
"alloy-primitives",
"alloy-rpc-types-eth",
@@ -596,9 +596,9 @@ dependencies = [
[[package]]
name = "alloy-rpc-types-any"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b43c1622aac2508d528743fd4cfdac1dea92d5a8fa894038488ff7edd0af0b32"
checksum = "6a63fb40ed24e4c92505f488f9dd256e2afaed17faa1b7a221086ebba74f4122"
dependencies = [
"alloy-consensus-any",
"alloy-rpc-types-eth",
@@ -607,9 +607,9 @@ dependencies = [
[[package]]
name = "alloy-rpc-types-beacon"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1786681640d4c60f22b6b8376b0f3fa200360bf1c3c2cb913e6c97f51928eb1b"
checksum = "16633087e23d8d75161c3a59aa183203637b817a5a8d2f662f612ccb6d129af0"
dependencies = [
"alloy-eips",
"alloy-primitives",
@@ -627,9 +627,9 @@ dependencies = [
[[package]]
name = "alloy-rpc-types-debug"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b2ca3a434a6d49910a7e8e51797eb25db42ef8a5578c52d877fcb26d0afe7bc"
checksum = "4936f579d9d10eae01772b2ab3497f9d568684f05f26f8175e12f9a1a2babc33"
dependencies = [
"alloy-primitives",
"derive_more",
@@ -639,9 +639,9 @@ dependencies = [
[[package]]
name = "alloy-rpc-types-engine"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4c53a8b0905d931e7921774a1830609713bd3e8222347963172b03a3ecc68"
checksum = "4c60bdce3be295924122732b7ecd0b2495ce4790bedc5370ca7019c08ad3f26e"
dependencies = [
"alloy-consensus",
"alloy-eips",
@@ -660,9 +660,9 @@ dependencies = [
[[package]]
name = "alloy-rpc-types-eth"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed5fafb741c19b3cca4cdd04fa215c89413491f9695a3e928dee2ae5657f607e"
checksum = "9eae0c7c40da20684548cbc8577b6b7447f7bf4ddbac363df95e3da220e41e72"
dependencies = [
"alloy-consensus",
"alloy-consensus-any",
@@ -682,9 +682,9 @@ dependencies = [
[[package]]
name = "alloy-rpc-types-mev"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49a97bfc6d9b411c85bb08e1174ddd3e5d61b10d3bd13f529d6609f733cb2f6f"
checksum = "81c0dd81c24944cfbf45b5df7cd149d9cd3e354db81ccf08aa47e0e05be8ab97"
dependencies = [
"alloy-consensus",
"alloy-eips",
@@ -697,9 +697,9 @@ dependencies = [
[[package]]
name = "alloy-rpc-types-trace"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c55324323aa634b01bdecb2d47462a8dce05f5505b14a6e5db361eef16eda476"
checksum = "ef206a4b8d436fbb7cf2e6a61c692d11df78f9382becc3c9a283bd58e64f0583"
dependencies = [
"alloy-primitives",
"alloy-rpc-types-eth",
@@ -711,9 +711,9 @@ dependencies = [
[[package]]
name = "alloy-rpc-types-txpool"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96b1aa28effb6854be356ce92ed64cea3b323acd04c3f8bfb5126e2839698043"
checksum = "ecb5a795264a02222f9534435b8f40dcbd88de8e9d586647884aae24f389ebf2"
dependencies = [
"alloy-primitives",
"alloy-rpc-types-eth",
@@ -723,9 +723,9 @@ dependencies = [
[[package]]
name = "alloy-serde"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6f180c399ca7c1e2fe17ea58343910cad0090878a696ff5a50241aee12fc529"
checksum = "c0df1987ed0ff2d0159d76b52e7ddfc4e4fbddacc54d2fbee765e0d14d7c01b5"
dependencies = [
"alloy-primitives",
"arbitrary",
@@ -735,9 +735,9 @@ dependencies = [
[[package]]
name = "alloy-signer"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecc39ad2c0a3d2da8891f4081565780703a593f090f768f884049aa3aa929cbc"
checksum = "6ff69deedee7232d7ce5330259025b868c5e6a52fa8dffda2c861fb3a5889b24"
dependencies = [
"alloy-primitives",
"async-trait",
@@ -750,9 +750,9 @@ dependencies = [
[[package]]
name = "alloy-signer-local"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "930e17cb1e46446a193a593a3bfff8d0ecee4e510b802575ebe300ae2e43ef75"
checksum = "72cfe0be3ec5a8c1a46b2e5a7047ed41121d360d97f4405bb7c1c784880c86cb"
dependencies = [
"alloy-consensus",
"alloy-network",
@@ -839,9 +839,9 @@ dependencies = [
[[package]]
name = "alloy-transport"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cae82426d98f8bc18f53c5223862907cac30ab8fc5e4cd2bb50808e6d3ab43d8"
checksum = "be98b07210d24acf5b793c99b759e9a696e4a2e67593aec0487ae3b3e1a2478c"
dependencies = [
"alloy-json-rpc",
"auto_impl",
@@ -862,9 +862,9 @@ dependencies = [
[[package]]
name = "alloy-transport-http"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90aa6825760905898c106aba9c804b131816a15041523e80b6d4fe7af6380ada"
checksum = "4198a1ee82e562cab85e7f3d5921aab725d9bd154b6ad5017f82df1695877c97"
dependencies = [
"alloy-json-rpc",
"alloy-transport",
@@ -877,9 +877,9 @@ dependencies = [
[[package]]
name = "alloy-transport-ipc"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ace83a4a6bb896e5894c3479042e6ba78aa5271dde599aa8c36a021d49cc8cc"
checksum = "d8db249779ebc20dc265920c7e706ed0d31dbde8627818d1cbde60919b875bb0"
dependencies = [
"alloy-json-rpc",
"alloy-pubsub",
@@ -897,9 +897,9 @@ dependencies = [
[[package]]
name = "alloy-transport-ws"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86c9ab4c199e3a8f3520b60ba81aa67bb21fed9ed0d8304e0569094d0758a56f"
checksum = "5ad2344a12398d7105e3722c9b7a7044ea837128e11d453604dec6e3731a86e2"
dependencies = [
"alloy-pubsub",
"alloy-transport",
@@ -935,9 +935,9 @@ dependencies = [
[[package]]
name = "alloy-tx-macros"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae109e33814b49fc0a62f2528993aa8a2dd346c26959b151f05441dc0b9da292"
checksum = "333544408503f42d7d3792bfc0f7218b643d968a03d2c0ed383ae558fb4a76d0"
dependencies = [
"darling 0.21.3",
"proc-macro2",
@@ -1627,15 +1627,15 @@ checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7"
[[package]]
name = "bitcoin-io"
version = "0.1.3"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b47c4ab7a93edb0c7198c5535ed9b52b63095f4e9b45279c6736cec4b856baf"
checksum = "2dee39a0ee5b4095224a0cfc6bf4cc1baf0f9624b96b367e53b66d974e51d953"
[[package]]
name = "bitcoin_hashes"
version = "0.14.0"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb18c03d0db0247e147a21a6faafd5a7eb851c743db062de72018b6b7e8e4d16"
checksum = "26ec84b80c482df901772e931a9a681e26a1b9ee2302edeff23cb30328745c8b"
dependencies = [
"bitcoin-io",
"hex-conservative",
@@ -2471,9 +2471,9 @@ dependencies = [
[[package]]
name = "convert_case"
version = "0.7.1"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb402b8d4c85569410425650ce3eddc7d698ed96d39a73f941b08fb63082f1e7"
checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9"
dependencies = [
"unicode-segmentation",
]
@@ -2978,22 +2978,23 @@ dependencies = [
[[package]]
name = "derive_more"
version = "2.0.1"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678"
checksum = "10b768e943bed7bf2cab53df09f4bc34bfd217cdb57d971e769874c9a6710618"
dependencies = [
"derive_more-impl",
]
[[package]]
name = "derive_more-impl"
version = "2.0.1"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3"
checksum = "6d286bfdaf75e988b4a78e013ecd79c581e06399ab53fbacd2d916c2f904f30b"
dependencies = [
"convert_case",
"proc-macro2",
"quote",
"rustc_version 0.4.1",
"syn 2.0.111",
"unicode-xid",
]
@@ -3981,9 +3982,9 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
[[package]]
name = "flate2"
version = "1.1.5"
version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfe33edd8e85a12a67454e37f8c75e730830d83e313556ab9ebf9ee7fbeb3bfb"
checksum = "a2152dbcb980c05735e2a651d96011320a949eb31a0c8b38b72645ce97dec676"
dependencies = [
"crc32fast",
"miniz_oxide",
@@ -4277,9 +4278,9 @@ dependencies = [
[[package]]
name = "git2"
version = "0.20.2"
version = "0.20.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2deb07a133b1520dc1a5690e9bd08950108873d7ed5de38dcc74d3b5ebffa110"
checksum = "3e2b37e2f62729cdada11f0e6b3b6fe383c69c29fc619e391223e12856af308c"
dependencies = [
"bitflags 2.10.0",
"libc",
@@ -4704,9 +4705,9 @@ dependencies = [
[[package]]
name = "hyper-util"
version = "0.1.18"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52e9a2a24dc5c6821e71a7030e1e14b7b632acac55c40e9d2e082c621261bb56"
checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f"
dependencies = [
"base64 0.22.1",
"bytes",
@@ -5416,15 +5417,15 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "libc"
version = "0.2.177"
version = "0.2.178"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976"
checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091"
[[package]]
name = "libgit2-sys"
version = "0.18.2+1.9.1"
version = "0.18.3+1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c42fe03df2bd3c53a3a9c7317ad91d80c81cd1fb0caec8d7cc4cd2bfa10c222"
checksum = "c9b3acc4b91781bb0b3386669d325163746af5f6e4f73e6d2d630e09a35f3487"
dependencies = [
"cc",
"libc",
@@ -5569,9 +5570,9 @@ dependencies = [
[[package]]
name = "log"
version = "0.4.28"
version = "0.4.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
[[package]]
name = "loom"
@@ -5839,9 +5840,9 @@ dependencies = [
[[package]]
name = "mio"
version = "1.1.0"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873"
checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc"
dependencies = [
"libc",
"log",
@@ -6792,7 +6793,7 @@ version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983"
dependencies = [
"toml_edit 0.23.7",
"toml_edit 0.23.9",
]
[[package]]
@@ -12787,9 +12788,9 @@ dependencies = [
[[package]]
name = "toml_edit"
version = "0.23.7"
version = "0.23.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d"
checksum = "5d7cbc3b4b49633d57a0509303158ca50de80ae32c265093b24c414705807832"
dependencies = [
"indexmap 2.12.1",
"toml_datetime 0.7.3",
@@ -13303,9 +13304,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.18.1"
version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2"
checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a"
dependencies = [
"getrandom 0.3.4",
"js-sys",
@@ -14282,18 +14283,18 @@ dependencies = [
[[package]]
name = "zerocopy"
version = "0.8.30"
version = "0.8.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ea879c944afe8a2b25fef16bb4ba234f47c694565e97383b36f3a878219065c"
checksum = "fd74ec98b9250adb3ca554bdde269adf631549f51d8a8f8f0a10b50f1cb298c3"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.30"
version = "0.8.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf955aa904d6040f70dc8e9384444cb1030aed272ba3cb09bbc4ab9e7c1f34f5"
checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a"
dependencies = [
"proc-macro2",
"quote",

View File

@@ -495,33 +495,33 @@ alloy-trie = { version = "0.9.1", default-features = false }
alloy-hardforks = "0.4.5"
alloy-consensus = { version = "1.1.2", default-features = false }
alloy-contract = { version = "1.1.2", default-features = false }
alloy-eips = { version = "1.1.2", default-features = false }
alloy-genesis = { version = "1.1.2", default-features = false }
alloy-json-rpc = { version = "1.1.2", default-features = false }
alloy-network = { version = "1.1.2", default-features = false }
alloy-network-primitives = { version = "1.1.2", default-features = false }
alloy-provider = { version = "1.1.2", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.1.2", default-features = false }
alloy-rpc-client = { version = "1.1.2", default-features = false }
alloy-rpc-types = { version = "1.1.2", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.1.2", default-features = false }
alloy-rpc-types-anvil = { version = "1.1.2", default-features = false }
alloy-rpc-types-beacon = { version = "1.1.2", default-features = false }
alloy-rpc-types-debug = { version = "1.1.2", default-features = false }
alloy-rpc-types-engine = { version = "1.1.2", default-features = false }
alloy-rpc-types-eth = { version = "1.1.2", default-features = false }
alloy-rpc-types-mev = { version = "1.1.2", default-features = false }
alloy-rpc-types-trace = { version = "1.1.2", default-features = false }
alloy-rpc-types-txpool = { version = "1.1.2", default-features = false }
alloy-serde = { version = "1.1.2", default-features = false }
alloy-signer = { version = "1.1.2", default-features = false }
alloy-signer-local = { version = "1.1.2", default-features = false }
alloy-transport = { version = "1.1.2" }
alloy-transport-http = { version = "1.1.2", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.1.2", default-features = false }
alloy-transport-ws = { version = "1.1.2", default-features = false }
alloy-consensus = { version = "1.1.3", default-features = false }
alloy-contract = { version = "1.1.3", default-features = false }
alloy-eips = { version = "1.1.3", default-features = false }
alloy-genesis = { version = "1.1.3", default-features = false }
alloy-json-rpc = { version = "1.1.3", default-features = false }
alloy-network = { version = "1.1.3", default-features = false }
alloy-network-primitives = { version = "1.1.3", default-features = false }
alloy-provider = { version = "1.1.3", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.1.3", default-features = false }
alloy-rpc-client = { version = "1.1.3", default-features = false }
alloy-rpc-types = { version = "1.1.3", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.1.3", default-features = false }
alloy-rpc-types-anvil = { version = "1.1.3", default-features = false }
alloy-rpc-types-beacon = { version = "1.1.3", default-features = false }
alloy-rpc-types-debug = { version = "1.1.3", default-features = false }
alloy-rpc-types-engine = { version = "1.1.3", default-features = false }
alloy-rpc-types-eth = { version = "1.1.3", default-features = false }
alloy-rpc-types-mev = { version = "1.1.3", default-features = false }
alloy-rpc-types-trace = { version = "1.1.3", default-features = false }
alloy-rpc-types-txpool = { version = "1.1.3", default-features = false }
alloy-serde = { version = "1.1.3", default-features = false }
alloy-signer = { version = "1.1.3", default-features = false }
alloy-signer-local = { version = "1.1.3", default-features = false }
alloy-transport = { version = "1.1.3" }
alloy-transport-http = { version = "1.1.3", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.1.3", default-features = false }
alloy-transport-ws = { version = "1.1.3", default-features = false }
# op
alloy-op-evm = { version = "0.24.1", default-features = false }

View File

@@ -57,6 +57,7 @@ pub(crate) struct TotalGasRow {
/// - `mean_new_payload_latency_ms`: arithmetic mean latency across blocks.
/// - `median_new_payload_latency_ms`: p50 latency across blocks.
/// - `p90_new_payload_latency_ms` / `p99_new_payload_latency_ms`: tail latencies across blocks.
/// - `std_dev_new_payload_latency_ms`: standard deviation of latency across blocks.
#[derive(Debug, Clone, Serialize)]
pub(crate) struct BenchmarkSummary {
pub total_blocks: u64,
@@ -66,6 +67,7 @@ pub(crate) struct BenchmarkSummary {
pub median_new_payload_latency_ms: f64,
pub p90_new_payload_latency_ms: f64,
pub p99_new_payload_latency_ms: f64,
pub std_dev_new_payload_latency_ms: f64,
pub gas_per_second: f64,
pub blocks_per_second: f64,
pub min_block_number: u64,
@@ -96,6 +98,7 @@ pub(crate) struct RefInfo {
/// Percent deltas are `(feature - baseline) / baseline * 100`:
/// - `new_payload_latency_p50_change_percent` / p90 / p99: percent changes of the respective
/// per-block percentiles.
/// - `std_dev_change_percent`: percent change in standard deviation of newPayload latency.
/// - `per_block_latency_change_mean_percent` / `per_block_latency_change_median_percent` are the
/// mean and median of per-block percent deltas (feature vs baseline), capturing block-level
/// drift.
@@ -114,6 +117,7 @@ pub(crate) struct ComparisonSummary {
pub new_payload_latency_p50_change_percent: f64,
pub new_payload_latency_p90_change_percent: f64,
pub new_payload_latency_p99_change_percent: f64,
pub std_dev_change_percent: f64,
pub gas_per_second_change_percent: f64,
pub blocks_per_second_change_percent: f64,
}
@@ -335,6 +339,9 @@ impl ComparisonGenerator {
let mean_new_payload_latency_ms: f64 =
latencies_ms.iter().sum::<f64>() / total_blocks as f64;
let std_dev_new_payload_latency_ms =
calculate_std_dev(&latencies_ms, mean_new_payload_latency_ms);
let mut sorted_latencies_ms = latencies_ms;
sorted_latencies_ms.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
let median_new_payload_latency_ms = percentile(&sorted_latencies_ms, 0.5);
@@ -365,6 +372,7 @@ impl ComparisonGenerator {
median_new_payload_latency_ms,
p90_new_payload_latency_ms,
p99_new_payload_latency_ms,
std_dev_new_payload_latency_ms,
gas_per_second,
blocks_per_second,
min_block_number,
@@ -432,6 +440,10 @@ impl ComparisonGenerator {
baseline.p99_new_payload_latency_ms,
feature.p99_new_payload_latency_ms,
),
std_dev_change_percent: calc_percent_change(
baseline.std_dev_new_payload_latency_ms,
feature.std_dev_new_payload_latency_ms,
),
gas_per_second_change_percent: calc_percent_change(
baseline.gas_per_second,
feature.gas_per_second,
@@ -562,6 +574,7 @@ impl ComparisonGenerator {
" NewPayload Latency p99: {:+.2}%",
summary.new_payload_latency_p99_change_percent
);
println!(" NewPayload Latency std dev: {:+.2}%", summary.std_dev_change_percent);
println!(
" Gas/Second: {:+.2}%",
summary.gas_per_second_change_percent
@@ -584,11 +597,12 @@ impl ComparisonGenerator {
);
println!(" NewPayload latency (ms):");
println!(
" mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}",
" mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}, std dev: {:.2}",
baseline.mean_new_payload_latency_ms,
baseline.median_new_payload_latency_ms,
baseline.p90_new_payload_latency_ms,
baseline.p99_new_payload_latency_ms
baseline.p99_new_payload_latency_ms,
baseline.std_dev_new_payload_latency_ms
);
if let (Some(start), Some(end)) =
(&report.baseline.start_timestamp, &report.baseline.end_timestamp)
@@ -613,11 +627,12 @@ impl ComparisonGenerator {
);
println!(" NewPayload latency (ms):");
println!(
" mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}",
" mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}, std dev: {:.2}",
feature.mean_new_payload_latency_ms,
feature.median_new_payload_latency_ms,
feature.p90_new_payload_latency_ms,
feature.p99_new_payload_latency_ms
feature.p99_new_payload_latency_ms,
feature.std_dev_new_payload_latency_ms
);
if let (Some(start), Some(end)) =
(&report.feature.start_timestamp, &report.feature.end_timestamp)

View File

@@ -91,6 +91,7 @@ impl Command {
let mut settings @ StorageSettings {
receipts_in_static_files: _,
transaction_senders_in_static_files: _,
storages_history_in_rocksdb: _,
} = settings.unwrap_or_else(StorageSettings::legacy);
// Update the setting based on the key

View File

@@ -229,9 +229,15 @@ fn bench_state_root(c: &mut Criterion) {
black_box({
let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
(
core::iter::empty::<
Result<
Recovered<TransactionSigned>,
core::convert::Infallible,
>,
>(),
std::convert::identity,
),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider),
&TreeConfig::default(),

View File

@@ -146,17 +146,26 @@ impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
match self.caches.get_storage(&account, &storage_key) {
SlotStatus::NotCached => {
self.metrics.storage_cache_misses.increment(1);
(SlotStatus::NotCached, maybe_cache) => {
let final_res = self.state_provider.storage(account, storage_key)?;
self.caches.insert_storage(account, storage_key, final_res);
let account_cache = maybe_cache.unwrap_or_default();
account_cache.insert_storage(storage_key, final_res);
// we always need to insert the value to update the weights.
// Note: there exists a race when the storage cache did not exist yet and two
// consumers looking up the a storage value for this account for the first time,
// however we can assume that this will only happen for the very first (mostlikely
// the same) value, and don't expect that this will accidentally
// replace an account storage cache with additional values.
self.caches.insert_storage_cache(account, account_cache);
self.metrics.storage_cache_misses.increment(1);
Ok(final_res)
}
SlotStatus::Empty => {
(SlotStatus::Empty, _) => {
self.metrics.storage_cache_hits.increment(1);
Ok(None)
}
SlotStatus::Value(value) => {
(SlotStatus::Value(value), _) => {
self.metrics.storage_cache_hits.increment(1);
Ok(Some(value))
}
@@ -311,18 +320,28 @@ pub(crate) struct ExecutionCache {
impl ExecutionCache {
/// Get storage value from hierarchical cache.
///
/// Returns a `SlotStatus` indicating whether:
/// - `NotCached`: The account's storage cache doesn't exist
/// - `Empty`: The slot exists in the account's cache but is empty
/// - `Value`: The slot exists and has a specific value
pub(crate) fn get_storage(&self, address: &Address, key: &StorageKey) -> SlotStatus {
/// Returns a tuple of:
/// - `SlotStatus` indicating whether:
/// - `NotCached`: The account's storage cache doesn't exist
/// - `Empty`: The slot exists in the account's cache but is empty
/// - `Value`: The slot exists and has a specific value
/// - `Option<Arc<AccountStorageCache>>`: The account's storage cache if it exists
pub(crate) fn get_storage(
&self,
address: &Address,
key: &StorageKey,
) -> (SlotStatus, Option<Arc<AccountStorageCache>>) {
match self.storage_cache.get(address) {
None => SlotStatus::NotCached,
Some(account_cache) => account_cache.get_storage(key),
None => (SlotStatus::NotCached, None),
Some(account_cache) => {
let status = account_cache.get_storage(key);
(status, Some(account_cache))
}
}
}
/// Insert storage value into hierarchical cache
#[cfg(test)]
pub(crate) fn insert_storage(
&self,
address: Address,
@@ -351,6 +370,15 @@ impl ExecutionCache {
self.storage_cache.insert(address, account_cache);
}
/// Inserts the [`AccountStorageCache`].
pub(crate) fn insert_storage_cache(
&self,
address: Address,
storage_cache: Arc<AccountStorageCache>,
) {
self.storage_cache.insert(address, storage_cache);
}
/// Invalidate storage for specific account
pub(crate) fn invalidate_account_storage(&self, address: &Address) {
self.storage_cache.invalidate(address);
@@ -800,7 +828,7 @@ mod tests {
caches.insert_storage(address, storage_key, Some(storage_value));
// check that the storage returns the cached value
let slot_status = caches.get_storage(&address, &storage_key);
let (slot_status, _) = caches.get_storage(&address, &storage_key);
assert_eq!(slot_status, SlotStatus::Value(storage_value));
}
@@ -814,7 +842,7 @@ mod tests {
let caches = ExecutionCacheBuilder::default().build_caches(1000);
// check that the storage is not cached
let slot_status = caches.get_storage(&address, &storage_key);
let (slot_status, _) = caches.get_storage(&address, &storage_key);
assert_eq!(slot_status, SlotStatus::NotCached);
}
@@ -830,7 +858,7 @@ mod tests {
caches.insert_storage(address, storage_key, None);
// check that the storage is empty
let slot_status = caches.get_storage(&address, &storage_key);
let (slot_status, _) = caches.get_storage(&address, &storage_key);
assert_eq!(slot_status, SlotStatus::Empty);
}

View File

@@ -1901,6 +1901,16 @@ where
false
}
/// Returns true if the given hash is part of the last received sync target fork choice update.
///
/// See [`ForkchoiceStateTracker::sync_target_state`]
fn is_any_sync_target(&self, block_hash: B256) -> bool {
if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
return target.contains(block_hash)
}
false
}
/// Checks if the given `check` hash points to an invalid header, inserting the given `head`
/// block into the invalid header cache if the `check` hash has a known invalid ancestor.
///
@@ -2040,9 +2050,12 @@ where
match self.insert_block(child) {
Ok(res) => {
debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
if self.is_sync_target_head(child_num_hash.hash) &&
if self.is_any_sync_target(child_num_hash.hash) &&
matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
{
debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
// we just inserted a block that we know is part of the canonical chain, so
// we can make it canonical
self.make_canonical(child_num_hash.hash)?;
}
}
@@ -2348,11 +2361,15 @@ where
// try to append the block
match self.insert_block(block) {
Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
if self.is_sync_target_head(block_num_hash.hash) {
trace!(target: "engine::tree", "appended downloaded sync target block");
// check if we just inserted a block that's part of sync targets,
// i.e. head, safe, or finalized
if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
sync_target.contains(block_num_hash.hash)
{
debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
// we just inserted the current sync target block, we can try to make it
// canonical
// we just inserted a block that we know is part of the canonical chain, so we
// can make it canonical
return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
sync_target_head: block_num_hash.hash,
})))

View File

@@ -21,6 +21,7 @@ use executor::WorkloadExecutor;
use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
use rayon::iter::{ParallelBridge, ParallelIterator};
use reth_engine_primitives::ExecutableTxIterator;
use reth_evm::{
execute::{ExecutableTxFor, WithTxEnv},
@@ -40,6 +41,7 @@ use reth_trie_sparse::{
};
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
use std::{
collections::BTreeMap,
sync::{
atomic::AtomicBool,
mpsc::{self, channel},
@@ -312,21 +314,50 @@ where
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
usize,
) {
let (transactions, convert) = transactions.into();
let transactions = transactions.into_iter();
// Get the transaction count for prewarming task
// Use upper bound if available (more accurate), otherwise use lower bound
let (lower, upper) = transactions.size_hint();
let transaction_count_hint = upper.unwrap_or(lower);
// Spawn a task that iterates through all transactions in parallel and sends them to the
// main task.
let (tx, rx) = mpsc::channel();
self.executor.spawn_blocking(move || {
transactions.enumerate().par_bridge().for_each_with(tx, |sender, (idx, tx)| {
let tx = convert(tx);
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
let _ = sender.send((idx, tx));
});
});
// Spawn a task that processes out-of-order transactions from the task above and sends them
// to prewarming and execution tasks.
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();
self.executor.spawn_blocking(move || {
for tx in transactions {
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
let mut next_for_execution = 0;
let mut queue = BTreeMap::new();
while let Ok((idx, tx)) = rx.recv() {
// only send Ok(_) variants to prewarming task
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = execute_tx.send(tx);
if next_for_execution == idx {
let _ = execute_tx.send(tx);
next_for_execution += 1;
while let Some(entry) = queue.first_entry() &&
*entry.key() == next_for_execution
{
let _ = execute_tx.send(entry.remove());
next_for_execution += 1;
}
} else {
queue.insert(idx, tx);
}
}
});
@@ -1017,13 +1048,19 @@ mod tests {
let provider_factory = BlockchainProvider::new(factory).unwrap();
let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>(),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory),
&TreeConfig::default(),
);
let mut handle =
payload_processor.spawn(
Default::default(),
(
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
std::convert::identity,
),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory),
&TreeConfig::default(),
);
let mut state_hook = handle.state_hook();

View File

@@ -213,16 +213,31 @@ where
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
{
match input {
BlockOrPayload::Payload(payload) => Ok(Either::Left(
self.evm_config
BlockOrPayload::Payload(payload) => {
let (iter, convert) = self
.evm_config
.tx_iterator_for_payload(payload)
.map_err(NewPayloadError::other)?
.map(|res| res.map(Either::Left).map_err(NewPayloadError::other)),
)),
.into();
let iter = Either::Left(iter.into_iter().map(Either::Left));
let convert = move |tx| {
let Either::Left(tx) = tx else { unreachable!() };
convert(tx).map(Either::Left).map_err(Either::Left)
};
// Box the closure to satisfy the `Fn` bound both here and in the branch below
Ok((iter, Box::new(convert) as Box<dyn Fn(_) -> _ + Send + Sync + 'static>))
}
BlockOrPayload::Block(block) => {
Ok(Either::Right(block.body().clone_transactions().into_iter().map(|tx| {
Ok(Either::Right(tx.try_into_recovered().map_err(NewPayloadError::other)?))
})))
let iter =
Either::Right(block.body().clone_transactions().into_iter().map(Either::Right));
let convert = move |tx: Either<_, N::SignedTx>| {
let Either::Right(tx) = tx else { unreachable!() };
tx.try_into_recovered().map(Either::Right).map_err(Either::Right)
};
Ok((iter, Box::new(convert)))
}
}
}

View File

@@ -1,190 +0,0 @@
//! Simple decoding and decompressing tests
//! for mainnet era files
use reth_era::{
common::file_ops::{StreamReader, StreamWriter},
era::file::{EraReader, EraWriter},
};
use std::io::Cursor;
use crate::{EraTestDownloader, HOODI};
// Helper function to test decompression and decoding for a specific era file
async fn test_era_file_decompression_and_decoding(
downloader: &EraTestDownloader,
filename: &str,
network: &str,
) -> eyre::Result<()> {
println!("\nTesting file: {filename}");
let file = downloader.open_era_file(filename, network).await?;
// Handle genesis era separately
if file.group.is_genesis() {
// Genesis has no blocks
assert_eq!(file.group.blocks.len(), 0, "Genesis should have no blocks");
assert!(file.group.slot_index.is_none(), "Genesis should not have block slot index");
// Test genesis state decompression
let state_data = file.group.era_state.decompress()?;
assert!(!state_data.is_empty(), "Genesis state should decompress to non-empty data");
// Verify state slot index
assert_eq!(
file.group.state_slot_index.slot_count(),
1,
"Genesis state index should have count of 1"
);
let mut buffer = Vec::new();
{
let mut writer = EraWriter::new(&mut buffer);
writer.write_file(&file)?;
}
let reader = EraReader::new(Cursor::new(&buffer));
let read_back_file = reader.read(file.id.network_name.clone())?;
assert_eq!(
file.group.era_state.decompress()?,
read_back_file.group.era_state.decompress()?,
"Genesis state data should be identical"
);
println!("Genesis era verified successfully");
return Ok(());
}
// Non-genesis era - test beacon blocks
println!(
" Non-genesis era with {} beacon blocks, starting at slot {}",
file.group.blocks.len(),
file.group.starting_slot()
);
// Test beacon block decompression across different positions
let test_block_indices = [
0, // First block
file.group.blocks.len() / 2, // Middle block
file.group.blocks.len() - 1, // Last block
];
for &block_idx in &test_block_indices {
let block = &file.group.blocks[block_idx];
let slot = file.group.starting_slot() + block_idx as u64;
println!(
"\n Testing beacon block at slot {}, compressed size: {} bytes",
slot,
block.data.len()
);
// Test beacon block decompression
let block_data = block.decompress()?;
assert!(
!block_data.is_empty(),
"Beacon block at slot {slot} decompression should produce non-empty data"
);
}
// Test era state decompression
let state_data = file.group.era_state.decompress()?;
assert!(!state_data.is_empty(), "Era state decompression should produce non-empty data");
println!(" Era state decompressed: {} bytes", state_data.len());
// Verify slot indices
if let Some(ref block_slot_index) = file.group.slot_index {
println!(
" Block slot index: starting_slot={}, count={}",
block_slot_index.starting_slot,
block_slot_index.slot_count()
);
// Check for empty slots
let empty_slots: Vec<usize> = (0..block_slot_index.slot_count())
.filter(|&i| !block_slot_index.has_data_at_slot(i))
.collect();
if !empty_slots.is_empty() {
println!(
" Found {} empty slots (first few): {:?}",
empty_slots.len(),
&empty_slots[..empty_slots.len().min(5)]
);
}
}
// Test round-trip serialization
let mut buffer = Vec::new();
{
let mut writer = EraWriter::new(&mut buffer);
writer.write_file(&file)?;
}
// Read back from buffer
let reader = EraReader::new(Cursor::new(&buffer));
let read_back_file = reader.read(file.id.network_name.clone())?;
// Verify basic properties are preserved
assert_eq!(file.id.network_name, read_back_file.id.network_name);
assert_eq!(file.id.start_slot, read_back_file.id.start_slot);
assert_eq!(file.id.slot_count, read_back_file.id.slot_count);
assert_eq!(file.group.blocks.len(), read_back_file.group.blocks.len());
// Test data preservation for beacon blocks
for &idx in &test_block_indices {
let original_block = &file.group.blocks[idx];
let read_back_block = &read_back_file.group.blocks[idx];
let slot = file.group.starting_slot() + idx as u64;
// Test that decompressed data is identical
assert_eq!(
original_block.decompress()?,
read_back_block.decompress()?,
"Beacon block data should be identical for slot {slot}"
);
}
// Test state data preservation
assert_eq!(
file.group.era_state.decompress()?,
read_back_file.group.era_state.decompress()?,
"Era state data should be identical"
);
// Test slot indices preservation
if let (Some(original_index), Some(read_index)) =
(&file.group.slot_index, &read_back_file.group.slot_index)
{
assert_eq!(
original_index.starting_slot, read_index.starting_slot,
"Block slot index starting slot should match"
);
assert_eq!(
original_index.offsets, read_index.offsets,
"Block slot index offsets should match"
);
}
assert_eq!(
file.group.state_slot_index.starting_slot,
read_back_file.group.state_slot_index.starting_slot,
"State slot index starting slot should match"
);
assert_eq!(
file.group.state_slot_index.offsets, read_back_file.group.state_slot_index.offsets,
"State slot index offsets should match"
);
Ok(())
}
#[test_case::test_case("hoodi-00000-212f13fc.era"; "era_dd_hoodi_0")]
#[test_case::test_case("hoodi-00021-857e418b.era"; "era_dd_hoodi_21")]
#[test_case::test_case("hoodi-00175-202aaa6d.era"; "era_dd_hoodi_175")]
#[test_case::test_case("hoodi-00201-0d521fc8.era"; "era_dd_hoodi_201")]
#[tokio::test(flavor = "multi_thread")]
#[ignore = "download intensive"]
async fn test_hoodi_era1_file_decompression_and_decoding(filename: &str) -> eyre::Result<()> {
let downloader = EraTestDownloader::new().await?;
test_era_file_decompression_and_decoding(&downloader, filename, HOODI).await
}

View File

@@ -1,2 +1,2 @@
mod dd;
mod genesis;
mod roundtrip;

View File

@@ -0,0 +1,228 @@
//! Roundtrip tests for `.era` files.
//!
//! These tests verify the full lifecycle of era files by:
//! - Reading files from their original source
//! - Decompressing their contents
//! - Re-compressing the data
//! - Writing the data back to a new file
//! - Confirming that all original data is preserved throughout the process
//!
//!
//! Only a couple of era files are downloaded from `https://mainnet.era.nimbus.team/` for mainnet
//! and `https://hoodi.era.nimbus.team/` for hoodi to keep the tests efficient.
use reth_era::{
common::file_ops::{EraFileFormat, StreamReader, StreamWriter},
era::{
file::{EraFile, EraReader, EraWriter},
types::{
consensus::{CompressedBeaconState, CompressedSignedBeaconBlock},
group::{EraGroup, EraId},
},
},
};
use std::io::Cursor;
use crate::{EraTestDownloader, HOODI, MAINNET};
// Helper function to test roundtrip compression/encoding for a specific file
async fn test_era_file_roundtrip(
downloader: &EraTestDownloader,
filename: &str,
network: &str,
) -> eyre::Result<()> {
println!("\nTesting roundtrip for file: {filename}");
let original_file = downloader.open_era_file(filename, network).await?;
if original_file.group.is_genesis() {
println!("Genesis era detected, using special handling");
assert_eq!(original_file.group.blocks.len(), 0, "Genesis should have no blocks");
assert!(
original_file.group.slot_index.is_none(),
"Genesis should not have block slot index"
);
let state_data = original_file.group.era_state.decompress()?;
println!(" Genesis state decompressed: {} bytes", state_data.len());
// File roundtrip test
let mut buffer = Vec::new();
{
let mut writer = EraWriter::new(&mut buffer);
writer.write_file(&original_file)?;
}
let reader = EraReader::new(Cursor::new(&buffer));
let roundtrip_file = reader.read(network.to_string())?;
assert_eq!(
original_file.group.era_state.decompress()?,
roundtrip_file.group.era_state.decompress()?,
"Genesis state data should be identical after roundtrip"
);
println!("Genesis era verified successfully");
return Ok(());
}
// non genesis start
let original_state_data = original_file.group.era_state.decompress()?;
let mut buffer = Vec::new();
{
let mut writer = EraWriter::new(&mut buffer);
writer.write_file(&original_file)?;
}
// Read back from buffer
let reader = EraReader::new(Cursor::new(&buffer));
let roundtrip_file = reader.read(network.to_string())?;
assert_eq!(
original_file.id.network_name, roundtrip_file.id.network_name,
"Network name should match after roundtrip"
);
assert_eq!(
original_file.id.start_slot, roundtrip_file.id.start_slot,
"Start slot should match after roundtrip"
);
assert_eq!(
original_file.group.blocks.len(),
roundtrip_file.group.blocks.len(),
"Block count should match after roundtrip"
);
// Select a few blocks to test
let test_block_indices = [
0, // First block
original_file.group.blocks.len() / 2, // Middle block
original_file.group.blocks.len() - 1, // Last block
];
// Test individual beacon blocks
for &block_idx in &test_block_indices {
let original_block = &original_file.group.blocks[block_idx];
let roundtrip_block = &roundtrip_file.group.blocks[block_idx];
let original_block_data = original_block.decompress()?;
let roundtrip_block_data = roundtrip_block.decompress()?;
// Verify file roundtrip preserves data
assert_eq!(
original_block_data, roundtrip_block_data,
"Block {block_idx} data should be identical after file roundtrip"
);
// Verify compression roundtrip
let recompressed_block = CompressedSignedBeaconBlock::from_ssz(&original_block_data)?;
let recompressed_block_data = recompressed_block.decompress()?;
assert_eq!(
original_block_data, recompressed_block_data,
"Block {block_idx} should be identical after re-compression cycle"
);
}
let roundtrip_state_data = roundtrip_file.group.era_state.decompress()?;
assert_eq!(
original_state_data, roundtrip_state_data,
"Era state data should be identical after roundtrip"
);
let recompressed_state = CompressedBeaconState::from_ssz(&roundtrip_state_data)?;
let recompressed_state_data = recompressed_state.decompress()?;
assert_eq!(
original_state_data, recompressed_state_data,
"Era state data should be identical after re-compression cycle"
);
let recompressed_blocks: Vec<CompressedSignedBeaconBlock> = roundtrip_file
.group
.blocks
.iter()
.map(|block| {
let data = block.decompress()?;
CompressedSignedBeaconBlock::from_ssz(&data)
})
.collect::<Result<Vec<_>, _>>()?;
let new_group = if let Some(ref block_index) = roundtrip_file.group.slot_index {
EraGroup::with_block_index(
recompressed_blocks,
recompressed_state,
block_index.clone(),
roundtrip_file.group.state_slot_index.clone(),
)
} else {
EraGroup::new(
recompressed_blocks,
recompressed_state,
roundtrip_file.group.state_slot_index,
)
};
let (start_slot, slot_count) = new_group.slot_range();
let new_file = EraFile::new(new_group, EraId::new(network, start_slot, slot_count));
let mut reconstructed_buffer = Vec::new();
{
let mut writer = EraWriter::new(&mut reconstructed_buffer);
writer.write_file(&new_file)?;
}
let reader = EraReader::new(Cursor::new(&reconstructed_buffer));
let reconstructed_file = reader.read(network.to_string())?;
assert_eq!(
original_file.group.blocks.len(),
reconstructed_file.group.blocks.len(),
"Block count should match after full reconstruction"
);
// Verify all reconstructed blocks match
for (idx, (orig, recon)) in
original_file.group.blocks.iter().zip(reconstructed_file.group.blocks.iter()).enumerate()
{
assert_eq!(
orig.decompress()?,
recon.decompress()?,
"Block {idx} should match after full reconstruction"
);
}
// Verify reconstructed state matches
assert_eq!(
original_state_data,
reconstructed_file.group.era_state.decompress()?,
"State should match after full reconstruction"
);
println!("File {filename} roundtrip successful");
Ok(())
}
#[test_case::test_case("mainnet-00000-4b363db9.era"; "era_roundtrip_mainnet_0")]
#[test_case::test_case("mainnet-00178-0d0a5290.era"; "era_roundtrip_mainnet_178")]
#[test_case::test_case("mainnet-01070-7616e3e2.era"; "era_roundtrip_mainnet_1070")]
#[test_case::test_case("mainnet-01267-e3ddc749.era"; "era_roundtrip_mainnet_1267")]
#[test_case::test_case("mainnet-01592-d4dc8b98.era"; "era_roundtrip_mainnet_1592")]
#[tokio::test(flavor = "multi_thread")]
#[ignore = "download intensive"]
async fn test_roundtrip_compression_encoding_mainnet(filename: &str) -> eyre::Result<()> {
let downloader = EraTestDownloader::new().await?;
test_era_file_roundtrip(&downloader, filename, MAINNET).await
}
#[test_case::test_case("hoodi-00000-212f13fc.era"; "era_roundtrip_hoodi_0")]
#[test_case::test_case("hoodi-00021-857e418b.era"; "era_roundtrip_hoodi_21")]
#[test_case::test_case("hoodi-00175-202aaa6d.era"; "era_roundtrip_hoodi_175")]
#[test_case::test_case("hoodi-00201-0d521fc8.era"; "era_roundtrip_hoodi_201")]
#[tokio::test(flavor = "multi_thread")]
#[ignore = "download intensive"]
async fn test_roundtrip_compression_encoding_hoodi(filename: &str) -> eyre::Result<()> {
let downloader = EraTestDownloader::new().await?;
test_era_file_roundtrip(&downloader, filename, HOODI).await
}

View File

@@ -1,159 +0,0 @@
//! Simple decoding and decompressing tests
//! for mainnet era1 files
use alloy_consensus::{BlockBody, Header};
use alloy_primitives::U256;
use reth_era::{
common::file_ops::{StreamReader, StreamWriter},
e2s::types::IndexEntry,
era1::{
file::{Era1Reader, Era1Writer},
types::execution::CompressedBody,
},
};
use reth_ethereum_primitives::TransactionSigned;
use std::io::Cursor;
use crate::{EraTestDownloader, MAINNET};
// Helper function to test decompression and decoding for a specific era1 file
async fn test_file_decompression(
downloader: &EraTestDownloader,
filename: &str,
) -> eyre::Result<()> {
println!("\nTesting file: {filename}");
let file = downloader.open_era1_file(filename, MAINNET).await?;
// Test block decompression across different positions in the file
let test_block_indices = [
0, // First block
file.group.blocks.len() / 2, // Middle block
file.group.blocks.len() - 1, // Last block
];
for &block_idx in &test_block_indices {
let block = &file.group.blocks[block_idx];
let block_number = file.group.block_index.starting_number() + block_idx as u64;
println!(
"\n Testing block {}, compressed body size: {} bytes",
block_number,
block.body.data.len()
);
// Test header decompression and decoding
let header_data = block.header.decompress()?;
assert!(
!header_data.is_empty(),
"Block {block_number} header decompression should produce non-empty data"
);
let header = block.header.decode_header()?;
assert_eq!(header.number, block_number, "Decoded header should have correct block number");
println!("Header decompression and decoding successful");
// Test body decompression
let body_data = block.body.decompress()?;
assert!(
!body_data.is_empty(),
"Block {block_number} body decompression should produce non-empty data"
);
println!("Body decompression successful ({} bytes)", body_data.len());
let decoded_body: BlockBody<TransactionSigned> =
CompressedBody::decode_body_from_decompressed::<TransactionSigned, Header>(&body_data)
.expect("Failed to decode body");
println!(
"Body decoding successful: {} transactions, {} ommers, withdrawals: {}",
decoded_body.transactions.len(),
decoded_body.ommers.len(),
decoded_body.withdrawals.is_some()
);
// Test receipts decompression
let receipts_data = block.receipts.decompress()?;
assert!(
!receipts_data.is_empty(),
"Block {block_number} receipts decompression should produce non-empty data"
);
println!("Receipts decompression successful ({} bytes)", receipts_data.len());
assert!(
block.total_difficulty.value > U256::ZERO,
"Block {block_number} should have non-zero difficulty"
);
println!("Total difficulty verified: {}", block.total_difficulty.value);
}
// Test round-trip serialization
println!("\n Testing data preservation roundtrip...");
let mut buffer = Vec::new();
{
let mut writer = Era1Writer::new(&mut buffer);
writer.write_file(&file)?;
}
// Read back from buffer
let reader = Era1Reader::new(Cursor::new(&buffer));
let read_back_file = reader.read(file.id.network_name.clone())?;
// Verify basic properties are preserved
assert_eq!(file.id.network_name, read_back_file.id.network_name);
assert_eq!(file.id.start_block, read_back_file.id.start_block);
assert_eq!(file.group.blocks.len(), read_back_file.group.blocks.len());
assert_eq!(file.group.accumulator.root, read_back_file.group.accumulator.root);
// Test data preservation for some blocks
for &idx in &test_block_indices {
let original_block = &file.group.blocks[idx];
let read_back_block = &read_back_file.group.blocks[idx];
let block_number = file.group.block_index.starting_number() + idx as u64;
println!("Block {block_number} details:");
println!(" Header size: {} bytes", original_block.header.data.len());
println!(" Body size: {} bytes", original_block.body.data.len());
println!(" Receipts size: {} bytes", original_block.receipts.data.len());
// Test that decompressed data is identical
assert_eq!(
original_block.header.decompress()?,
read_back_block.header.decompress()?,
"Header data should be identical for block {block_number}"
);
assert_eq!(
original_block.body.decompress()?,
read_back_block.body.decompress()?,
"Body data should be identical for block {block_number}"
);
assert_eq!(
original_block.receipts.decompress()?,
read_back_block.receipts.decompress()?,
"Receipts data should be identical for block {block_number}"
);
assert_eq!(
original_block.total_difficulty.value, read_back_block.total_difficulty.value,
"Total difficulty should be identical for block {block_number}"
);
}
Ok(())
}
#[test_case::test_case("mainnet-00000-5ec1ffb8.era1"; "era_dd_mainnet_0")]
#[test_case::test_case("mainnet-00003-d8b8a40b.era1"; "era_dd_mainnet_3")]
#[test_case::test_case("mainnet-00151-e322efe1.era1"; "era_dd_mainnet_151")]
#[test_case::test_case("mainnet-00293-0d6c5812.era1"; "era_dd_mainnet_293")]
#[test_case::test_case("mainnet-00443-ea71b6f9.era1"; "era_dd_mainnet_443")]
#[test_case::test_case("mainnet-01367-d7efc68f.era1"; "era_dd_mainnet_1367")]
#[test_case::test_case("mainnet-01610-99fdde4b.era1"; "era_dd_mainnet_1610")]
#[test_case::test_case("mainnet-01895-3f81607c.era1"; "era_dd_mainnet_1895")]
#[tokio::test(flavor = "multi_thread")]
#[ignore = "download intensive"]
async fn test_mainnet_era1_file_decompression_and_decoding(filename: &str) -> eyre::Result<()> {
let downloader = EraTestDownloader::new().await?;
test_file_decompression(&downloader, filename).await
}

View File

@@ -1,3 +1,2 @@
mod dd;
mod genesis;
mod roundtrip;

View File

@@ -6,6 +6,9 @@
//! - Re-encoding and recompressing the data
//! - Writing the data back to a new file
//! - Confirming that all original data is preserved throughout the process
//!
//! Only a couple of era1 files are downloaded from <https://era.ithaca.xyz/era1/> for mainnet
//! and <https://era.ithaca.xyz/sepolia-era1/> for sepolia to keep the tests efficient.
use alloy_consensus::{BlockBody, BlockHeader, Header, ReceiptEnvelope};
use reth_era::{
@@ -27,7 +30,7 @@ use std::io::Cursor;
use crate::{EraTestDownloader, MAINNET, SEPOLIA};
// Helper function to test roundtrip compression/encoding for a specific file
async fn test_file_roundtrip(
async fn test_era1_file_roundtrip(
downloader: &EraTestDownloader,
filename: &str,
network: &str,
@@ -252,27 +255,27 @@ async fn test_file_roundtrip(
Ok(())
}
#[test_case::test_case("mainnet-00000-5ec1ffb8.era1"; "era_mainnet_0")]
#[test_case::test_case("mainnet-00151-e322efe1.era1"; "era_mainnet_151")]
#[test_case::test_case("mainnet-01367-d7efc68f.era1"; "era_mainnet_1367")]
#[test_case::test_case("mainnet-01895-3f81607c.era1"; "era_mainnet_1895")]
#[test_case::test_case("mainnet-00000-5ec1ffb8.era1"; "era1_roundtrip_mainnet_0")]
#[test_case::test_case("mainnet-00151-e322efe1.era1"; "era1_roundtrip_mainnet_151")]
#[test_case::test_case("mainnet-01367-d7efc68f.era1"; "era1_roundtrip_mainnet_1367")]
#[test_case::test_case("mainnet-01895-3f81607c.era1"; "era1_roundtrip_mainnet_1895")]
#[tokio::test(flavor = "multi_thread")]
#[ignore = "download intensive"]
async fn test_roundtrip_compression_encoding_mainnet(filename: &str) -> eyre::Result<()> {
let downloader = EraTestDownloader::new().await?;
test_file_roundtrip(&downloader, filename, MAINNET).await
test_era1_file_roundtrip(&downloader, filename, MAINNET).await
}
#[test_case::test_case("sepolia-00000-643a00f7.era1"; "era_sepolia_0")]
#[test_case::test_case("sepolia-00074-0e81003c.era1"; "era_sepolia_74")]
#[test_case::test_case("sepolia-00173-b6924da5.era1"; "era_sepolia_173")]
#[test_case::test_case("sepolia-00182-a4f0a8a1.era1"; "era_sepolia_182")]
#[test_case::test_case("sepolia-00000-643a00f7.era1"; "era1_roundtrip_sepolia_0")]
#[test_case::test_case("sepolia-00074-0e81003c.era1"; "era1_roundtrip_sepolia_74")]
#[test_case::test_case("sepolia-00173-b6924da5.era1"; "era1_roundtrip_sepolia_173")]
#[test_case::test_case("sepolia-00182-a4f0a8a1.era1"; "era1_roundtrip_sepolia_182")]
#[tokio::test(flavor = "multi_thread")]
#[ignore = "download intensive"]
async fn test_roundtrip_compression_encoding_sepolia(filename: &str) -> eyre::Result<()> {
let downloader = EraTestDownloader::new().await?;
test_file_roundtrip(&downloader, filename, SEPOLIA).await?;
test_era1_file_roundtrip(&downloader, filename, SEPOLIA).await?;
Ok(())
}

View File

@@ -91,16 +91,19 @@ const ERA_MAINNET_URL: &str = "https://mainnet.era.nimbus.team/";
/// Succinct list of mainnet files we want to download
/// from <https://mainnet.era.nimbus.team/> //TODO: to replace with internal era files hosting url
/// for testing purposes
const ERA_MAINNET_FILES_NAMES: [&str; 4] = [
const ERA_MAINNET_FILES_NAMES: [&str; 8] = [
"mainnet-00000-4b363db9.era",
"mainnet-00178-0d0a5290.era",
"mainnet-00518-4e267a3a.era",
"mainnet-01140-f70d4869.era",
"mainnet-00780-bb546fec.era",
"mainnet-01070-7616e3e2.era",
"mainnet-01267-e3ddc749.era",
"mainnet-01581-82073d28.era",
"mainnet-01592-d4dc8b98.era",
];
/// Utility for downloading `.era1` files for tests
/// in a temporary directory
/// and caching them in memory
/// Utility for downloading `.era` and `.era1` files for tests
/// in a temporary directory and caching them in memory
#[derive(Debug)]
struct EraTestDownloader {
/// Temporary directory for storing downloaded files
@@ -180,7 +183,7 @@ impl EraTestDownloader {
Ok(())
}
/// Get network configuration, URL and supported files, based on network and file type
/// Get network configuration, URL and supported files, based on network and file type
fn get_network_config(
&self,
filename: &str,
@@ -202,14 +205,13 @@ impl EraTestDownloader {
}
}
/// open .era1 file, downloading it if necessary
/// Open `.era1` file, downloading it if necessary
async fn open_era1_file(&self, filename: &str, network: &str) -> Result<Era1File> {
let path = self.download_file(filename, network).await?;
Era1Reader::open(&path, network).map_err(|e| eyre!("Failed to open Era1 file: {e}"))
}
/// open .era file, downloading it if necessary
#[allow(dead_code)]
/// Open `.era` file, downloading it if necessary
async fn open_era_file(&self, filename: &str, network: &str) -> Result<EraFile> {
let path = self.download_file(filename, network).await?;
EraReader::open(&path, network).map_err(|e| eyre!("Failed to open Era1 file: {e}"))

View File

@@ -289,12 +289,15 @@ where
&self,
payload: &ExecutionData,
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
Ok(payload.payload.transactions().clone().into_iter().map(|tx| {
let txs = payload.payload.transactions().clone().into_iter();
let convert = |tx: Bytes| {
let tx =
TxTy::<Self::Primitives>::decode_2718_exact(tx.as_ref()).map_err(AnyError::new)?;
let signer = tx.try_recover().map_err(AnyError::new)?;
Ok::<_, AnyError>(tx.with_signer(signer))
}))
};
Ok((txs, convert))
}
}

View File

@@ -18,22 +18,51 @@ pub trait ConfigureEngineEvm<ExecutionData>: ConfigureEvm {
) -> Result<impl ExecutableTxIterator<Self>, Self::Error>;
}
/// Iterator over executable transactions.
pub trait ExecutableTxIterator<Evm: ConfigureEvm>:
Iterator<Item = Result<Self::Tx, Self::Error>> + Send + 'static
{
/// A helper trait representing a pair of a "raw" transactions iterator and a closure that can be
/// used to convert them to an executable transaction. This tuple is used in the engine to
/// parallelize heavy work like decoding or recovery.
pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static {
/// Raw transaction that can be converted to an [`ExecutableTxTuple::Tx`]
///
/// This can be any type that can be converted to an [`ExecutableTxTuple::Tx`]. For example,
/// an unrecovered transaction or just the transaction bytes.
type RawTx: Send + Sync + 'static;
/// The executable transaction type iterator yields.
type Tx: ExecutableTxFor<Evm> + Clone + Send + Sync + 'static;
type Tx: Clone + Send + Sync + 'static;
/// Errors that may occur while recovering or decoding transactions.
type Error: core::error::Error + Send + Sync + 'static;
/// Iterator over [`ExecutableTxTuple::Tx`]
type Iter: Iterator<Item = Self::RawTx> + Send + 'static;
/// Closure that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
/// [`ExecutableTxTuple::Tx`]. This might involve heavy work like decoding or recovery
/// and will be parallelized in the engine.
type Convert: Fn(Self::RawTx) -> Result<Self::Tx, Self::Error> + Send + Sync + 'static;
}
impl<Evm: ConfigureEvm, Tx, Err, T> ExecutableTxIterator<Evm> for T
impl<RawTx, Tx, Err, I, F> ExecutableTxTuple for (I, F)
where
Tx: ExecutableTxFor<Evm> + Clone + Send + Sync + 'static,
RawTx: Send + Sync + 'static,
Tx: Clone + Send + Sync + 'static,
Err: core::error::Error + Send + Sync + 'static,
T: Iterator<Item = Result<Tx, Err>> + Send + 'static,
I: Iterator<Item = RawTx> + Send + 'static,
F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
{
type RawTx = RawTx;
type Tx = Tx;
type Error = Err;
type Iter = I;
type Convert = F;
}
/// Iterator over executable transactions.
pub trait ExecutableTxIterator<Evm: ConfigureEvm>:
ExecutableTxTuple<Tx: ExecutableTxFor<Evm>>
{
}
impl<T, Evm: ConfigureEvm> ExecutableTxIterator<Evm> for T where
T: ExecutableTxTuple<Tx: ExecutableTxFor<Evm>>
{
}

View File

@@ -16,7 +16,7 @@ use alloy_consensus::{BlockHeader, Header};
use alloy_eips::Decodable2718;
use alloy_evm::{EvmFactory, FromRecoveredTx, FromTxWithEncoded};
use alloy_op_evm::block::{receipt_builder::OpReceiptBuilder, OpTxEnv};
use alloy_primitives::U256;
use alloy_primitives::{Bytes, U256};
use core::fmt::Debug;
use op_alloy_consensus::EIP1559ParamError;
use op_alloy_rpc_types_engine::OpExecutionData;
@@ -265,12 +265,15 @@ where
&self,
payload: &OpExecutionData,
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
Ok(payload.payload.transactions().clone().into_iter().map(|encoded| {
let transactions = payload.payload.transactions().clone().into_iter();
let convert = |encoded: Bytes| {
let tx = TxTy::<Self::Primitives>::decode_2718_exact(encoded.as_ref())
.map_err(AnyError::new)?;
let signer = tx.try_recover().map_err(AnyError::new)?;
Ok::<_, AnyError>(WithEncoded::new(encoded, tx.with_signer(signer)))
}))
};
Ok((transactions, convert))
}
}

View File

@@ -198,6 +198,26 @@ pub trait BlockBody:
.collect()
})
}
/// Returns an iterator over `Recovered<&Transaction>` for all transactions in the block body.
///
/// This method recovers signers and returns an iterator without cloning transactions,
/// making it more efficient than [`BlockBody::recover_transactions`] when owned values are not
/// required.
///
/// # Errors
///
/// Returns an error if any transaction's signature is invalid.
fn recover_transactions_ref(
&self,
) -> Result<impl Iterator<Item = Recovered<&Self::Transaction>> + '_, RecoveryError> {
let signers = self.recover_signers()?;
Ok(self
.transactions()
.iter()
.zip(signers)
.map(|(tx, signer)| Recovered::new_unchecked(tx, signer)))
}
}
impl<T, H> BlockBody for alloy_consensus::BlockBody<T, H>

View File

@@ -38,7 +38,7 @@ pub enum HistoryType {
/// Default number of blocks to retain for merkle changesets.
/// This is used by both the `MerkleChangeSets` stage and the pruner segment.
pub const MERKLE_CHANGESETS_RETENTION_BLOCKS: u64 = 64;
pub const MERKLE_CHANGESETS_RETENTION_BLOCKS: u64 = 128;
/// Default pruning mode for merkle changesets
const fn default_merkle_changesets_mode() -> PruneMode {
@@ -95,13 +95,7 @@ pub struct PruneModes {
pub bodies_history: Option<PruneMode>,
/// Merkle Changesets pruning configuration for `AccountsTrieChangeSets` and
/// `StoragesTrieChangeSets`.
#[cfg_attr(
any(test, feature = "serde"),
serde(
default = "default_merkle_changesets_mode",
deserialize_with = "deserialize_prune_mode_with_min_blocks::<MERKLE_CHANGESETS_RETENTION_BLOCKS, _>"
)
)]
#[cfg_attr(any(test, feature = "serde"), serde(default = "default_merkle_changesets_mode"))]
pub merkle_changesets: PruneMode,
/// Receipts pruning configuration by retaining only those receipts that contain logs emitted
/// by the specified addresses, discarding others. This setting is overridden by `receipts`.
@@ -155,14 +149,15 @@ impl PruneModes {
/// Returns `true` if any migration was performed.
///
/// Currently migrates:
/// - `merkle_changesets`: `Distance(10064)` -> `Distance(64)`
pub fn migrate(&mut self) -> bool {
if self.merkle_changesets == PruneMode::Distance(MINIMUM_PRUNING_DISTANCE) {
/// - `merkle_changesets`: `Distance(n)` where `n < 128` or `n == 10064` -> `Distance(128)`
pub const fn migrate(&mut self) -> bool {
if let PruneMode::Distance(d) = self.merkle_changesets &&
(d < MERKLE_CHANGESETS_RETENTION_BLOCKS || d == MINIMUM_PRUNING_DISTANCE)
{
self.merkle_changesets = PruneMode::Distance(MERKLE_CHANGESETS_RETENTION_BLOCKS);
true
} else {
false
return true;
}
false
}
/// Returns an error if we can't unwind to the targeted block because the target block is
@@ -214,28 +209,6 @@ impl PruneModes {
}
}
/// Deserializes [`PruneMode`] and validates that the value is not less than the const
/// generic parameter `MIN_BLOCKS`. This parameter represents the number of blocks that needs to be
/// left in database after the pruning.
///
/// 1. For [`PruneMode::Full`], it fails if `MIN_BLOCKS > 0`.
/// 2. For [`PruneMode::Distance`], it fails if `distance < MIN_BLOCKS + 1`. `+ 1` is needed because
/// `PruneMode::Distance(0)` means that we leave zero blocks from the latest, meaning we have one
/// block in the database.
#[cfg(any(test, feature = "serde"))]
fn deserialize_prune_mode_with_min_blocks<
'de,
const MIN_BLOCKS: u64,
D: serde::Deserializer<'de>,
>(
deserializer: D,
) -> Result<PruneMode, D::Error> {
use serde::Deserialize;
let prune_mode = PruneMode::deserialize(deserializer)?;
serde_deserialize_validate::<MIN_BLOCKS, D>(&prune_mode)?;
Ok(prune_mode)
}
/// Deserializes [`Option<PruneMode>`] and validates that the value is not less than the const
/// generic parameter `MIN_BLOCKS`. This parameter represents the number of blocks that needs to be
/// left in database after the pruning.

View File

@@ -546,6 +546,13 @@ where
.transpose()?
.flatten();
// Return error if toBlock exceeds current head
if let Some(t) = to &&
t > info.best_number
{
return Err(EthFilterError::BlockRangeExceedsHead);
}
if let Some(f) = from &&
f > info.best_number
{
@@ -894,6 +901,9 @@ pub enum EthFilterError {
/// Invalid block range.
#[error("invalid block range params")]
InvalidBlockRangeParams,
/// Block range extends beyond current head.
#[error("block range extends beyond current head block")]
BlockRangeExceedsHead,
/// Query scope is too broad.
#[error("query exceeds max block range {0}")]
QueryExceedsMaxBlocks(u64),
@@ -928,7 +938,8 @@ impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
EthFilterError::EthAPIError(err) => err.into(),
err @ (EthFilterError::InvalidBlockRangeParams |
EthFilterError::QueryExceedsMaxBlocks(_) |
EthFilterError::QueryExceedsMaxResults { .. }) => {
EthFilterError::QueryExceedsMaxResults { .. } |
EthFilterError::BlockRangeExceedsHead) => {
rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
}
}

View File

@@ -382,7 +382,7 @@ where
let mut all_traces = Vec::new();
let mut block_traces = Vec::with_capacity(self.inner.eth_config.max_tracing_requests);
for chunk_start in (start..end).step_by(self.inner.eth_config.max_tracing_requests) {
for chunk_start in (start..=end).step_by(self.inner.eth_config.max_tracing_requests) {
let chunk_end =
std::cmp::min(chunk_start + self.inner.eth_config.max_tracing_requests as u64, end);

View File

@@ -19,6 +19,9 @@ pub struct StorageSettings {
/// Whether this node always writes transaction senders to static files.
#[serde(default)]
pub transaction_senders_in_static_files: bool,
/// Whether `StoragesHistory` is stored in `RocksDB`.
#[serde(default)]
pub storages_history_in_rocksdb: bool,
}
impl StorageSettings {
@@ -28,7 +31,11 @@ impl StorageSettings {
/// `false`, ensuring older nodes continue writing receipts and transaction senders to the
/// database when receipt pruning is enabled.
pub const fn legacy() -> Self {
Self { receipts_in_static_files: false, transaction_senders_in_static_files: false }
Self {
receipts_in_static_files: false,
transaction_senders_in_static_files: false,
storages_history_in_rocksdb: false,
}
}
/// Sets the `receipts_in_static_files` flag to the provided value.
@@ -42,4 +49,10 @@ impl StorageSettings {
self.transaction_senders_in_static_files = value;
self
}
/// Sets the `storages_history_in_rocksdb` flag to the provided value.
pub const fn with_storages_history_in_rocksdb(mut self, value: bool) -> Self {
self.storages_history_in_rocksdb = value;
self
}
}

View File

@@ -247,6 +247,20 @@ where
}
}
/// Validates a single transaction with the provided state provider.
pub fn validate_one_with_state_provider(
&self,
origin: TransactionOrigin,
transaction: Tx,
state: impl AccountInfoReader,
) -> TransactionValidationOutcome<Tx> {
let tx = match self.validate_one_no_state(origin, transaction) {
Ok(tx) => tx,
Err(invalid_outcome) => return invalid_outcome,
};
self.validate_one_against_state(origin, tx, state)
}
/// Performs stateless validation on single transaction. Returns unaltered input transaction
/// if all checks pass, so transaction can continue through to stateful validation as argument
/// to [`validate_one_against_state`](Self::validate_one_against_state).

View File

@@ -126,7 +126,8 @@ impl ParallelProof {
)))
})?;
// Extract storage proof directly from the result
// Extract storage proof directly from the result.
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
let storage_proof = match proof_msg.result? {
crate::proof_task::ProofResult::StorageProof { hashed_address: addr, proof } => {
debug_assert_eq!(
@@ -134,7 +135,8 @@ impl ParallelProof {
hashed_address,
"storage worker must return same address: expected {hashed_address}, got {addr}"
);
proof
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
}
crate::proof_task::ProofResult::AccountMultiproof { .. } => {
unreachable!("storage worker only sends StorageProof variant")
@@ -223,8 +225,12 @@ impl ParallelProof {
)
})?;
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
let (multiproof, stats) = match proof_result_msg.result? {
crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => (proof, stats),
crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => {
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
(Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone()), stats)
}
crate::proof_task::ProofResult::StorageProof { .. } => {
unreachable!("account worker only sends AccountMultiproof variant")
}

View File

@@ -41,6 +41,7 @@ use alloy_primitives::{
use alloy_rlp::{BufMut, Encodable};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use dashmap::DashMap;
use metrics::Histogram;
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind};
use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
use reth_storage_errors::db::DatabaseError;
@@ -79,6 +80,275 @@ use crate::proof_task_metrics::{
type StorageProofResult = Result<DecodedStorageMultiProof, ParallelStateRootError>;
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;
/// Maximum number of storage proof jobs to batch together per account.
const STORAGE_PROOF_BATCH_LIMIT: usize = 32;
/// Maximum number of blinded node requests to defer during storage proof batching.
/// When this limit is reached, batching stops early to process deferred nodes,
/// preventing starvation of blinded node requests under high proof load.
const MAX_DEFERRED_BLINDED_NODES: usize = 16;
/// Holds batched storage proof jobs for the same account.
///
/// When multiple storage proof requests arrive for the same account, they can be merged
/// into a single proof computation with combined prefix sets and target slots.
#[derive(Debug)]
struct BatchedStorageProof {
/// The merged prefix set from all batched jobs.
prefix_set: PrefixSetMut,
/// The merged target slots from all batched jobs.
target_slots: B256Set,
/// Whether any job requested branch node masks.
with_branch_node_masks: bool,
/// The `multi_added_removed_keys` from the first job (they should all share the same `Arc`).
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
/// All senders that need to receive the result.
senders: Vec<ProofResultContext>,
}
impl BatchedStorageProof {
/// Creates a new batch from the first storage proof input.
fn new(input: StorageProofInput, sender: ProofResultContext) -> Self {
// Convert frozen PrefixSet to mutable PrefixSetMut by collecting its keys.
let prefix_set = PrefixSetMut::from(input.prefix_set.iter().copied());
Self {
prefix_set,
target_slots: input.target_slots,
with_branch_node_masks: input.with_branch_node_masks,
multi_added_removed_keys: input.multi_added_removed_keys,
senders: vec![sender],
}
}
/// Merges another storage proof job into this batch.
///
/// # Panics
/// Panics if `input.multi_added_removed_keys` does not point to the same Arc as the batch's.
/// This is a critical invariant for proof correctness.
fn merge(&mut self, input: StorageProofInput, sender: ProofResultContext) {
// Validate that all batched jobs share the same multi_added_removed_keys Arc.
// This is a critical invariant: if jobs have different keys, the merged proof
// would be computed with only the first job's keys, producing incorrect results.
// Using assert! (not debug_assert!) because incorrect proofs could cause consensus
// failures.
assert!(
match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
(None, None) => true,
_ => false,
},
"All batched storage proof jobs must share the same multi_added_removed_keys Arc"
);
self.prefix_set.extend_keys(input.prefix_set.iter().copied());
self.target_slots.extend(input.target_slots);
self.with_branch_node_masks |= input.with_branch_node_masks;
self.senders.push(sender);
}
/// Converts this batch into a single `StorageProofInput` for computation.
fn into_input(self, hashed_address: B256) -> (StorageProofInput, Vec<ProofResultContext>) {
let input = StorageProofInput {
hashed_address,
prefix_set: self.prefix_set.freeze(),
target_slots: self.target_slots,
with_branch_node_masks: self.with_branch_node_masks,
multi_added_removed_keys: self.multi_added_removed_keys,
};
(input, self.senders)
}
}
/// Metrics for storage worker batching.
#[derive(Clone, Default)]
struct StorageWorkerBatchMetrics {
/// Histogram of batch sizes (number of jobs merged per computation).
#[cfg(feature = "metrics")]
batch_size_histogram: Option<Histogram>,
}
impl StorageWorkerBatchMetrics {
#[cfg(feature = "metrics")]
fn new() -> Self {
Self {
batch_size_histogram: Some(metrics::histogram!(
"trie.proof_task.storage_worker_batch_size"
)),
}
}
#[cfg(not(feature = "metrics"))]
fn new() -> Self {
Self {}
}
fn record_batch_size(&self, _size: usize) {
#[cfg(feature = "metrics")]
if let Some(h) = &self.batch_size_histogram {
h.record(_size as f64);
}
}
}
/// Maximum number of account multiproof jobs to batch together.
const ACCOUNT_PROOF_BATCH_LIMIT: usize = 32;
/// Holds batched account multiproof jobs.
///
/// When multiple account multiproof requests arrive, they can be merged
/// into a single proof computation with combined targets and prefix sets.
#[derive(Debug)]
struct BatchedAccountProof {
/// The merged targets from all batched jobs.
targets: MultiProofTargets,
/// The merged account prefix set from all batched jobs.
account_prefix_set: PrefixSetMut,
/// The merged storage prefix sets from all batched jobs.
storage_prefix_sets: B256Map<PrefixSetMut>,
/// The merged destroyed accounts from all batched jobs.
destroyed_accounts: B256Set,
/// Whether any job requested branch node masks.
collect_branch_node_masks: bool,
/// The `multi_added_removed_keys` from the first job (they should all share the same `Arc`).
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
/// The shared `missed_leaves_storage_roots` cache from the first job.
missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
/// All senders that need to receive the result.
senders: Vec<ProofResultContext>,
}
impl BatchedAccountProof {
/// Creates a new batch from the first account multiproof input.
fn new(input: AccountMultiproofInput) -> Self {
// Convert frozen prefix sets to mutable versions.
let account_prefix_set =
PrefixSetMut::from(input.prefix_sets.account_prefix_set.iter().copied());
let storage_prefix_sets = input
.prefix_sets
.storage_prefix_sets
.into_iter()
.map(|(addr, ps)| (addr, PrefixSetMut::from(ps.iter().copied())))
.collect();
let destroyed_accounts = input.prefix_sets.destroyed_accounts;
Self {
targets: input.targets,
account_prefix_set,
storage_prefix_sets,
destroyed_accounts,
collect_branch_node_masks: input.collect_branch_node_masks,
multi_added_removed_keys: input.multi_added_removed_keys,
missed_leaves_storage_roots: input.missed_leaves_storage_roots,
senders: vec![input.proof_result_sender],
}
}
/// Attempts to merge another account multiproof job into this batch.
///
/// Returns the job back if caches are incompatible so the caller can process it separately.
fn try_merge(&mut self, input: AccountMultiproofInput) -> Result<(), AccountMultiproofInput> {
// Require all jobs to share the same caches; otherwise merging would produce
// incorrect proofs by reusing the wrong retained keys or missed-leaf storage roots.
let multi_added_removed_keys_mismatch =
!match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
(None, None) => true,
_ => false,
};
if multi_added_removed_keys_mismatch ||
!Arc::ptr_eq(&self.missed_leaves_storage_roots, &input.missed_leaves_storage_roots)
{
return Err(input);
}
// Merge targets.
self.targets.extend(input.targets);
// Merge account prefix set.
self.account_prefix_set.extend_keys(input.prefix_sets.account_prefix_set.iter().copied());
// Merge storage prefix sets.
for (addr, ps) in input.prefix_sets.storage_prefix_sets {
match self.storage_prefix_sets.entry(addr) {
alloy_primitives::map::Entry::Occupied(mut entry) => {
entry.get_mut().extend_keys(ps.iter().copied());
}
alloy_primitives::map::Entry::Vacant(entry) => {
entry.insert(PrefixSetMut::from(ps.iter().copied()));
}
}
}
// Merge destroyed accounts.
self.destroyed_accounts.extend(input.prefix_sets.destroyed_accounts);
// OR the branch node masks flag.
self.collect_branch_node_masks |= input.collect_branch_node_masks;
// Collect the sender.
self.senders.push(input.proof_result_sender);
Ok(())
}
/// Converts this batch into a single `AccountMultiproofInput` for computation.
fn into_input(self) -> (AccountMultiproofInput, Vec<ProofResultContext>) {
// Freeze the mutable prefix sets.
let storage_prefix_sets: B256Map<PrefixSet> =
self.storage_prefix_sets.into_iter().map(|(addr, ps)| (addr, ps.freeze())).collect();
let prefix_sets = TriePrefixSets {
account_prefix_set: self.account_prefix_set.freeze(),
storage_prefix_sets,
destroyed_accounts: self.destroyed_accounts,
};
// Use a dummy sender for the input since we'll handle all senders separately.
let dummy_sender = self.senders.first().expect("batch always has at least one sender");
let input = AccountMultiproofInput {
targets: self.targets,
prefix_sets,
collect_branch_node_masks: self.collect_branch_node_masks,
multi_added_removed_keys: self.multi_added_removed_keys,
missed_leaves_storage_roots: self.missed_leaves_storage_roots,
proof_result_sender: dummy_sender.clone(),
};
(input, self.senders)
}
}
/// Metrics for account worker batching.
#[derive(Clone, Default)]
struct AccountWorkerBatchMetrics {
/// Histogram of batch sizes (number of jobs merged per computation).
#[cfg(feature = "metrics")]
batch_size_histogram: Option<Histogram>,
}
impl AccountWorkerBatchMetrics {
#[cfg(feature = "metrics")]
fn new() -> Self {
Self {
batch_size_histogram: Some(metrics::histogram!(
"trie.proof_task.account_worker_batch_size"
)),
}
}
#[cfg(not(feature = "metrics"))]
fn new() -> Self {
Self {}
}
fn record_batch_size(&self, _size: usize) {
#[cfg(feature = "metrics")]
if let Some(h) = &self.batch_size_histogram {
h.record(_size as f64);
}
}
}
/// A handle that provides type-safe access to proof worker pools.
///
/// The handle stores direct senders to both storage and account worker pools,
@@ -552,12 +822,16 @@ impl TrieNodeProvider for ProofTaskTrieNodeProvider {
}
}
/// Result of a proof calculation, which can be either an account multiproof or a storage proof.
#[derive(Debug)]
///
/// The proof data is wrapped in `Arc` to enable efficient sharing when batching multiple
/// proof requests. This avoids expensive cloning of the underlying proof structures
/// when sending results to multiple receivers.
#[derive(Debug, Clone)]
pub enum ProofResult {
/// Account multiproof with statistics
AccountMultiproof {
/// The account multiproof
proof: DecodedMultiProof,
/// The account multiproof (Arc-wrapped for efficient sharing in batches)
proof: Arc<DecodedMultiProof>,
/// Statistics collected during proof computation
stats: ParallelTrieStats,
},
@@ -565,8 +839,8 @@ pub enum ProofResult {
StorageProof {
/// The hashed address this storage proof belongs to
hashed_address: B256,
/// The storage multiproof
proof: DecodedStorageMultiProof,
/// The storage multiproof (Arc-wrapped for efficient sharing in batches)
proof: Arc<DecodedStorageMultiProof>,
},
}
@@ -575,11 +849,17 @@ impl ProofResult {
///
/// For account multiproofs, returns the multiproof directly (discarding stats).
/// For storage proofs, wraps the storage proof into a minimal multiproof.
///
/// Note: This method clones the inner proof data. If you need to avoid the clone
/// when you're the sole owner, consider using `Arc::try_unwrap` first.
pub fn into_multiproof(self) -> DecodedMultiProof {
match self {
Self::AccountMultiproof { proof, stats: _ } => proof,
Self::AccountMultiproof { proof, stats: _ } => {
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
}
Self::StorageProof { hashed_address, proof } => {
DecodedMultiProof::from_storage_proof(hashed_address, proof)
let storage_proof = Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone());
DecodedMultiProof::from_storage_proof(hashed_address, storage_proof)
}
}
}
@@ -708,11 +988,18 @@ where
/// 2. Advertises availability
/// 3. Processes jobs in a loop:
/// - Receives job from channel
/// - Drains additional same-account storage proof jobs (batching)
/// - Marks worker as busy
/// - Processes the job
/// - Processes the batched jobs as a single proof computation
/// - Marks worker as available
/// 4. Shuts down when channel closes
///
/// # Batching Strategy
///
/// When multiple storage proof requests arrive for the same account, they are merged
/// into a single proof computation. This reduces redundant trie traversals when state
/// updates arrive faster than proof computation can process them.
///
/// # Panic Safety
///
/// If this function panics, the worker thread terminates but other workers
@@ -732,6 +1019,7 @@ where
// Create provider from factory
let provider = task_ctx.factory.database_provider_ro()?;
let proof_tx = ProofTaskTx::new(provider, worker_id);
let batch_metrics = StorageWorkerBatchMetrics::new();
trace!(
target: "trie::proof_task",
@@ -746,20 +1034,104 @@ where
// Initially mark this worker as available.
available_workers.fetch_add(1, Ordering::Relaxed);
// Deferred blinded node jobs to process after batched storage proofs.
// Pre-allocate with capacity to avoid reallocations during batching.
let mut deferred_blinded_nodes: Vec<(B256, Nibbles, Sender<TrieNodeProviderResult>)> =
Vec::with_capacity(MAX_DEFERRED_BLINDED_NODES);
while let Ok(job) = work_rx.recv() {
// Mark worker as busy.
available_workers.fetch_sub(1, Ordering::Relaxed);
match job {
StorageWorkerJob::StorageProof { input, proof_result_sender } => {
Self::process_storage_proof(
worker_id,
&proof_tx,
input,
proof_result_sender,
&mut storage_proofs_processed,
&mut cursor_metrics_cache,
// Start batching: group storage proofs by account.
let mut batches: B256Map<BatchedStorageProof> = B256Map::default();
batches.insert(
input.hashed_address,
BatchedStorageProof::new(input, proof_result_sender),
);
let mut total_jobs = 1usize;
// Drain additional jobs from the queue.
while total_jobs < STORAGE_PROOF_BATCH_LIMIT {
match work_rx.try_recv() {
Ok(StorageWorkerJob::StorageProof {
input: next_input,
proof_result_sender: next_sender,
}) => {
total_jobs += 1;
let addr = next_input.hashed_address;
match batches.entry(addr) {
alloy_primitives::map::Entry::Occupied(mut entry) => {
entry.get_mut().merge(next_input, next_sender);
}
alloy_primitives::map::Entry::Vacant(entry) => {
entry.insert(BatchedStorageProof::new(
next_input,
next_sender,
));
}
}
}
Ok(StorageWorkerJob::BlindedStorageNode {
account,
path,
result_sender,
}) => {
// Defer blinded node jobs to process after batched proofs.
deferred_blinded_nodes.push((account, path, result_sender));
// Stop batching if too many blinded nodes are deferred to prevent
// starvation.
if deferred_blinded_nodes.len() >= MAX_DEFERRED_BLINDED_NODES {
break;
}
}
Err(_) => break,
}
}
// Process all batched storage proofs.
for (hashed_address, batch) in batches {
let batch_size = batch.senders.len();
batch_metrics.record_batch_size(batch_size);
let (merged_input, senders) = batch.into_input(hashed_address);
trace!(
target: "trie::proof_task",
worker_id,
?hashed_address,
batch_size,
prefix_set_len = merged_input.prefix_set.len(),
target_slots_len = merged_input.target_slots.len(),
"Processing batched storage proof"
);
Self::process_batched_storage_proof(
worker_id,
&proof_tx,
hashed_address,
merged_input,
senders,
&mut storage_proofs_processed,
&mut cursor_metrics_cache,
);
}
// Process any deferred blinded node jobs.
for (account, path, result_sender) in
std::mem::take(&mut deferred_blinded_nodes)
{
Self::process_blinded_node(
worker_id,
&proof_tx,
account,
path,
result_sender,
&mut storage_nodes_processed,
);
}
}
StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
@@ -795,82 +1167,103 @@ where
Ok(())
}
/// Processes a storage proof request.
fn process_storage_proof<Provider>(
/// Processes a batched storage proof request and sends results to all waiting receivers.
///
/// This computes a single storage proof with merged targets and sends the same result
/// to all original requestors, reducing redundant trie traversals.
fn process_batched_storage_proof<Provider>(
worker_id: usize,
proof_tx: &ProofTaskTx<Provider>,
hashed_address: B256,
input: StorageProofInput,
proof_result_sender: ProofResultContext,
senders: Vec<ProofResultContext>,
storage_proofs_processed: &mut u64,
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
) where
Provider: TrieCursorFactory + HashedCursorFactory,
{
let hashed_address = input.hashed_address;
let ProofResultContext { sender, sequence_number: seq, state, start_time } =
proof_result_sender;
let mut trie_cursor_metrics = TrieCursorMetricsCache::default();
let mut hashed_cursor_metrics = HashedCursorMetricsCache::default();
trace!(
target: "trie::proof_task",
worker_id,
hashed_address = ?hashed_address,
prefix_set_len = input.prefix_set.len(),
target_slots_len = input.target_slots.len(),
"Processing storage proof"
);
let proof_start = Instant::now();
let result = proof_tx.compute_storage_proof(
input,
&mut trie_cursor_metrics,
&mut hashed_cursor_metrics,
);
let proof_elapsed = proof_start.elapsed();
*storage_proofs_processed += 1;
let result_msg = result.map(|storage_proof| ProofResult::StorageProof {
hashed_address,
proof: storage_proof,
});
// Send the result to all waiting receivers.
let num_senders = senders.len();
match result {
Ok(storage_proof) => {
// Success case: wrap proof in Arc for efficient sharing across all senders.
let proof_result =
ProofResult::StorageProof { hashed_address, proof: Arc::new(storage_proof) };
if sender
.send(ProofResultMessage {
sequence_number: seq,
result: result_msg,
elapsed: start_time.elapsed(),
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
hashed_address = ?hashed_address,
storage_proofs_processed,
"Proof result receiver dropped, discarding result"
);
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
*storage_proofs_processed += 1;
if sender
.send(ProofResultMessage {
sequence_number,
result: Ok(proof_result.clone()),
elapsed: start_time.elapsed(),
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
?hashed_address,
sequence_number,
"Proof result receiver dropped, discarding result"
);
}
}
}
Err(error) => {
// Error case: convert to string for cloning, then send to all receivers.
let error_msg = error.to_string();
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
*storage_proofs_processed += 1;
if sender
.send(ProofResultMessage {
sequence_number,
result: Err(ParallelStateRootError::Other(error_msg.clone())),
elapsed: start_time.elapsed(),
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
?hashed_address,
sequence_number,
"Proof result receiver dropped, discarding result"
);
}
}
}
}
trace!(
target: "trie::proof_task",
worker_id,
hashed_address = ?hashed_address,
?hashed_address,
proof_time_us = proof_elapsed.as_micros(),
total_processed = storage_proofs_processed,
num_senders,
trie_cursor_duration_us = trie_cursor_metrics.total_duration.as_micros(),
hashed_cursor_duration_us = hashed_cursor_metrics.total_duration.as_micros(),
?trie_cursor_metrics,
?hashed_cursor_metrics,
"Storage proof completed"
"Batched storage proof completed"
);
#[cfg(feature = "metrics")]
{
// Accumulate per-proof metrics into the worker's cache
let per_proof_cache = ProofTaskCursorMetricsCache {
account_trie_cursor: TrieCursorMetricsCache::default(),
account_hashed_cursor: HashedCursorMetricsCache::default(),
@@ -987,11 +1380,18 @@ where
/// 2. Advertises availability
/// 3. Processes jobs in a loop:
/// - Receives job from channel
/// - Drains additional account multiproof jobs (batching)
/// - Marks worker as busy
/// - Processes the job
/// - Processes the batched jobs as a single proof computation
/// - Marks worker as available
/// 4. Shuts down when channel closes
///
/// # Batching Strategy
///
/// When multiple account multiproof requests arrive, they are merged into
/// a single proof computation. This reduces redundant trie traversals when
/// state updates arrive faster than proof computation can process them.
///
/// # Panic Safety
///
/// If this function panics, the worker thread terminates but other workers
@@ -1012,6 +1412,7 @@ where
// Create provider from factory
let provider = task_ctx.factory.database_provider_ro()?;
let proof_tx = ProofTaskTx::new(provider, worker_id);
let batch_metrics = AccountWorkerBatchMetrics::new();
trace!(
target: "trie::proof_task",
@@ -1026,20 +1427,98 @@ where
// Count this worker as available only after successful initialization.
available_workers.fetch_add(1, Ordering::Relaxed);
// Deferred blinded node jobs to process after batched account proofs.
// Pre-allocate with capacity to avoid reallocations during batching.
let mut deferred_blinded_nodes: Vec<(Nibbles, Sender<TrieNodeProviderResult>)> =
Vec::with_capacity(MAX_DEFERRED_BLINDED_NODES);
while let Ok(job) = work_rx.recv() {
// Mark worker as busy.
available_workers.fetch_sub(1, Ordering::Relaxed);
match job {
AccountWorkerJob::AccountMultiproof { input } => {
Self::process_account_multiproof(
worker_id,
&proof_tx,
storage_work_tx.clone(),
*input,
&mut account_proofs_processed,
&mut cursor_metrics_cache,
);
// Start batching: accumulate account multiproof jobs. If we encounter an
// incompatible job (different caches), process it as a separate batch.
let mut next_account_job: Option<Box<AccountMultiproofInput>> = Some(input);
while let Some(account_job) = next_account_job.take() {
let mut batch = BatchedAccountProof::new(*account_job);
let mut pending_incompatible: Option<Box<AccountMultiproofInput>> = None;
// Drain additional jobs from the queue.
while batch.senders.len() < ACCOUNT_PROOF_BATCH_LIMIT {
match work_rx.try_recv() {
Ok(AccountWorkerJob::AccountMultiproof { input: next_input }) => {
match batch.try_merge(*next_input) {
Ok(()) => {}
Err(incompatible) => {
trace!(
target: "trie::proof_task",
worker_id,
"Account multiproof batch split due to incompatible caches"
);
pending_incompatible = Some(Box::new(incompatible));
break;
}
}
}
Ok(AccountWorkerJob::BlindedAccountNode {
path,
result_sender,
}) => {
// Defer blinded node jobs to process after batched proofs.
deferred_blinded_nodes.push((path, result_sender));
// Stop batching if too many blinded nodes are deferred to
// prevent starvation.
if deferred_blinded_nodes.len() >= MAX_DEFERRED_BLINDED_NODES {
break;
}
}
Err(_) => break,
}
}
let batch_size = batch.senders.len();
batch_metrics.record_batch_size(batch_size);
let (merged_input, senders) = batch.into_input();
trace!(
target: "trie::proof_task",
worker_id,
batch_size,
targets_len = merged_input.targets.len(),
"Processing batched account multiproof"
);
Self::process_batched_account_multiproof(
worker_id,
&proof_tx,
&storage_work_tx,
merged_input,
senders,
&mut account_proofs_processed,
&mut cursor_metrics_cache,
);
// If we encountered an incompatible job, process it as its own batch
// before handling any deferred blinded node requests.
if let Some(incompatible_job) = pending_incompatible {
next_account_job = Some(incompatible_job);
}
}
// Process any deferred blinded node jobs.
for (path, result_sender) in std::mem::take(&mut deferred_blinded_nodes) {
Self::process_blinded_node(
worker_id,
&proof_tx,
path,
result_sender,
&mut account_nodes_processed,
);
}
}
AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
@@ -1074,12 +1553,16 @@ where
Ok(())
}
/// Processes an account multiproof request.
fn process_account_multiproof<Provider>(
/// Processes a batched account multiproof request and sends results to all waiting receivers.
///
/// This computes a single account multiproof with merged targets and sends the same result
/// to all original requestors, reducing redundant trie traversals.
fn process_batched_account_multiproof<Provider>(
worker_id: usize,
proof_tx: &ProofTaskTx<Provider>,
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
input: AccountMultiproofInput,
senders: Vec<ProofResultContext>,
account_proofs_processed: &mut u64,
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
) where
@@ -1091,21 +1574,21 @@ where
collect_branch_node_masks,
multi_added_removed_keys,
missed_leaves_storage_roots,
proof_result_sender:
ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start },
proof_result_sender: _, // We use the senders vec instead
} = input;
let span = debug_span!(
target: "trie::proof_task",
"Account multiproof calculation",
"Batched account multiproof calculation",
targets = targets.len(),
batch_size = senders.len(),
worker_id,
);
let _span_guard = span.enter();
trace!(
target: "trie::proof_task",
"Processing account multiproof"
"Processing batched account multiproof"
);
let proof_start = Instant::now();
@@ -1120,7 +1603,7 @@ where
tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);
let storage_proof_receivers = match dispatch_storage_proofs(
&storage_work_tx,
storage_work_tx,
&targets,
&mut storage_prefix_sets,
collect_branch_node_masks,
@@ -1128,14 +1611,17 @@ where
) {
Ok(receivers) => receivers,
Err(error) => {
// Send error through result channel
error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}");
let _ = result_tx.send(ProofResultMessage {
sequence_number: seq,
result: Err(error),
elapsed: start.elapsed(),
state,
});
// Send error to all receivers
let error_msg = error.to_string();
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
*account_proofs_processed += 1;
let _ = sender.send(ProofResultMessage {
sequence_number,
result: Err(ParallelStateRootError::Other(error_msg.clone())),
elapsed: start_time.elapsed(),
state,
});
}
return;
}
};
@@ -1156,46 +1642,75 @@ where
build_account_multiproof_with_storage_roots(&proof_tx.provider, ctx, &mut tracker);
let proof_elapsed = proof_start.elapsed();
let total_elapsed = start.elapsed();
let proof_cursor_metrics = tracker.cursor_metrics;
proof_cursor_metrics.record_spans();
let stats = tracker.finish();
let result = result.map(|proof| ProofResult::AccountMultiproof { proof, stats });
*account_proofs_processed += 1;
// Send result to MultiProofTask
if result_tx
.send(ProofResultMessage {
sequence_number: seq,
result,
elapsed: total_elapsed,
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
account_proofs_processed,
"Account multiproof receiver dropped, discarding result"
);
// Send the result to all waiting receivers.
let num_senders = senders.len();
match result {
Ok(proof) => {
// Success case: wrap proof in Arc for efficient sharing across all senders.
let proof_result = ProofResult::AccountMultiproof { proof: Arc::new(proof), stats };
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
*account_proofs_processed += 1;
if sender
.send(ProofResultMessage {
sequence_number,
result: Ok(proof_result.clone()),
elapsed: start_time.elapsed(),
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
sequence_number,
"Account multiproof receiver dropped, discarding result"
);
}
}
}
Err(error) => {
// Error case: convert to string for cloning, then send to all receivers.
let error_msg = error.to_string();
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
*account_proofs_processed += 1;
if sender
.send(ProofResultMessage {
sequence_number,
result: Err(ParallelStateRootError::Other(error_msg.clone())),
elapsed: start_time.elapsed(),
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
sequence_number,
"Account multiproof receiver dropped, discarding result"
);
}
}
}
}
trace!(
target: "trie::proof_task",
proof_time_us = proof_elapsed.as_micros(),
total_elapsed_us = total_elapsed.as_micros(),
total_processed = account_proofs_processed,
num_senders,
account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
"Account multiproof completed"
"Batched account multiproof completed"
);
#[cfg(feature = "metrics")]
@@ -1338,7 +1853,9 @@ where
drop(_guard);
// Extract storage proof from the result
// Extract storage proof from the result.
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it
// here.
let proof = match proof_msg.result? {
ProofResult::StorageProof { hashed_address: addr, proof } => {
debug_assert_eq!(
@@ -1346,7 +1863,9 @@ where
hashed_address,
"storage worker must return same address: expected {hashed_address}, got {addr}"
);
proof
// Efficiently unwrap Arc: returns inner value if sole owner, clones
// otherwise.
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
}
ProofResult::AccountMultiproof { .. } => {
unreachable!("storage worker only sends StorageProof variant")
@@ -1409,8 +1928,11 @@ where
// Consume remaining storage proof receivers for accounts not encountered during trie walk.
for (hashed_address, receiver) in storage_proof_receivers {
if let Ok(proof_msg) = receiver.recv() {
// Extract storage proof from the result
// Extract storage proof from the result.
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
if let Ok(ProofResult::StorageProof { proof, .. }) = proof_msg.result {
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
let proof = Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone());
collected_decoded_storages.insert(hashed_address, proof);
}
}

View File

@@ -25,6 +25,7 @@ use reth_op::{
primitives::SignedTransaction,
};
use reth_rpc_api::eth::helpers::pending_block::BuildPendingEnv;
use revm_primitives::Bytes;
use std::sync::Arc;
#[derive(Debug, Clone)]
@@ -126,13 +127,15 @@ impl ConfigureEngineEvm<CustomExecutionData> for CustomEvmConfig {
&self,
payload: &CustomExecutionData,
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
Ok(payload.inner.payload.transactions().clone().into_iter().map(|encoded| {
let transactions = payload.inner.payload.transactions().clone().into_iter();
let convert = |encoded: Bytes| {
let tx = CustomTransaction::decode_2718_exact(encoded.as_ref())
.map_err(Into::into)
.map_err(PayloadError::Decode)?;
let signer = tx.try_recover().map_err(NewPayloadError::other)?;
Ok::<_, NewPayloadError>(WithEncoded::new(encoded, tx.with_signer(signer)))
}))
};
Ok((transactions, convert))
}
}