mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
16 Commits
yk/either-
...
yk/worker-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9bd5a3ecba | ||
|
|
1a5d9a3ad3 | ||
|
|
394b0d5989 | ||
|
|
bb4285c9ba | ||
|
|
88dfd15243 | ||
|
|
388507bcee | ||
|
|
a430fb6798 | ||
|
|
b6dca7b319 | ||
|
|
12fdca7e34 | ||
|
|
c0d73fa5fe | ||
|
|
0cbd2aecdf | ||
|
|
3094cd68ea | ||
|
|
9265700ff9 | ||
|
|
0b07657f33 | ||
|
|
e7bd168eca | ||
|
|
9a55cb9da5 |
3
.github/workflows/bench.yml
vendored
3
.github/workflows/bench.yml
vendored
@@ -15,8 +15,7 @@ env:
|
||||
name: bench
|
||||
jobs:
|
||||
codspeed:
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
with:
|
||||
|
||||
3
.github/workflows/compact.yml
vendored
3
.github/workflows/compact.yml
vendored
@@ -17,8 +17,7 @@ env:
|
||||
name: compact-codec
|
||||
jobs:
|
||||
compact-codec:
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
bin:
|
||||
|
||||
4
.github/workflows/e2e.yml
vendored
4
.github/workflows/e2e.yml
vendored
@@ -19,8 +19,7 @@ concurrency:
|
||||
jobs:
|
||||
test:
|
||||
name: e2e-testsuite
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
timeout-minutes: 90
|
||||
@@ -43,4 +42,3 @@ jobs:
|
||||
--exclude 'op-reth' \
|
||||
--exclude 'reth' \
|
||||
-E 'binary(e2e_testsuite)'
|
||||
|
||||
|
||||
9
.github/workflows/hive.yml
vendored
9
.github/workflows/hive.yml
vendored
@@ -24,8 +24,7 @@ jobs:
|
||||
prepare-hive:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
timeout-minutes: 45
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- name: Checkout hive tests
|
||||
@@ -179,8 +178,7 @@ jobs:
|
||||
- prepare-reth
|
||||
- prepare-hive
|
||||
name: run ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
issues: write
|
||||
steps:
|
||||
@@ -247,8 +245,7 @@ jobs:
|
||||
notify-on-error:
|
||||
needs: test
|
||||
if: failure()
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Slack Webhook Action
|
||||
uses: rtCamp/action-slack-notify@v2
|
||||
|
||||
3
.github/workflows/integration.yml
vendored
3
.github/workflows/integration.yml
vendored
@@ -23,8 +23,7 @@ jobs:
|
||||
test:
|
||||
name: test / ${{ matrix.network }}
|
||||
if: github.event_name != 'schedule'
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
strategy:
|
||||
|
||||
9
.github/workflows/kurtosis-op.yml
vendored
9
.github/workflows/kurtosis-op.yml
vendored
@@ -9,7 +9,7 @@ on:
|
||||
|
||||
push:
|
||||
tags:
|
||||
- '*'
|
||||
- "*"
|
||||
|
||||
env:
|
||||
CARGO_TERM_COLOR: always
|
||||
@@ -32,8 +32,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
name: run kurtosis
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
needs:
|
||||
- prepare-reth
|
||||
steps:
|
||||
@@ -83,12 +82,10 @@ jobs:
|
||||
kurtosis service logs -a op-devnet op-cl-2151908-2-op-node-op-reth-op-kurtosis
|
||||
exit 1
|
||||
|
||||
|
||||
notify-on-error:
|
||||
needs: test
|
||||
if: failure()
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Slack Webhook Action
|
||||
uses: rtCamp/action-slack-notify@v2
|
||||
|
||||
10
.github/workflows/kurtosis.yml
vendored
10
.github/workflows/kurtosis.yml
vendored
@@ -9,7 +9,7 @@ on:
|
||||
|
||||
push:
|
||||
tags:
|
||||
- '*'
|
||||
- "*"
|
||||
|
||||
env:
|
||||
CARGO_TERM_COLOR: always
|
||||
@@ -30,8 +30,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
name: run kurtosis
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
needs:
|
||||
- prepare-reth
|
||||
steps:
|
||||
@@ -54,13 +53,12 @@ jobs:
|
||||
- name: Run kurtosis
|
||||
uses: ethpandaops/kurtosis-assertoor-github-action@v1
|
||||
with:
|
||||
ethereum_package_args: '.github/assets/kurtosis_network_params.yaml'
|
||||
ethereum_package_args: ".github/assets/kurtosis_network_params.yaml"
|
||||
|
||||
notify-on-error:
|
||||
needs: test
|
||||
if: failure()
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Slack Webhook Action
|
||||
uses: rtCamp/action-slack-notify@v2
|
||||
|
||||
3
.github/workflows/prepare-reth.yml
vendored
3
.github/workflows/prepare-reth.yml
vendored
@@ -26,8 +26,7 @@ jobs:
|
||||
prepare-reth:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
timeout-minutes: 45
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- run: mkdir artifacts
|
||||
|
||||
3
.github/workflows/stage.yml
vendored
3
.github/workflows/stage.yml
vendored
@@ -22,8 +22,7 @@ jobs:
|
||||
name: stage-run-test
|
||||
# Only run stage commands test in merge groups
|
||||
if: github.event_name == 'merge_group'
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
RUST_LOG: info,sync=error
|
||||
RUST_BACKTRACE: 1
|
||||
|
||||
5
.github/workflows/sync-era.yml
vendored
5
.github/workflows/sync-era.yml
vendored
@@ -17,8 +17,7 @@ concurrency:
|
||||
jobs:
|
||||
sync:
|
||||
name: sync (${{ matrix.chain.bin }})
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
RUST_LOG: info,sync=error
|
||||
RUST_BACKTRACE: 1
|
||||
@@ -64,4 +63,4 @@ jobs:
|
||||
${{ matrix.chain.bin }} stage unwind num-blocks 100 --chain ${{ matrix.chain.chain }}
|
||||
- name: Run stage unwind to block hash
|
||||
run: |
|
||||
${{ matrix.chain.bin }} stage unwind to-block ${{ matrix.chain.unwind-target }} --chain ${{ matrix.chain.chain }}
|
||||
${{ matrix.chain.bin }} stage unwind to-block ${{ matrix.chain.unwind-target }} --chain ${{ matrix.chain.chain }}
|
||||
|
||||
5
.github/workflows/sync.yml
vendored
5
.github/workflows/sync.yml
vendored
@@ -17,8 +17,7 @@ concurrency:
|
||||
jobs:
|
||||
sync:
|
||||
name: sync (${{ matrix.chain.bin }})
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
RUST_LOG: info,sync=error
|
||||
RUST_BACKTRACE: 1
|
||||
@@ -63,4 +62,4 @@ jobs:
|
||||
${{ matrix.chain.bin }} stage unwind num-blocks 100 --chain ${{ matrix.chain.chain }}
|
||||
- name: Run stage unwind to block hash
|
||||
run: |
|
||||
${{ matrix.chain.bin }} stage unwind to-block ${{ matrix.chain.unwind-target }} --chain ${{ matrix.chain.chain }}
|
||||
${{ matrix.chain.bin }} stage unwind to-block ${{ matrix.chain.unwind-target }} --chain ${{ matrix.chain.chain }}
|
||||
|
||||
9
.github/workflows/unit.yml
vendored
9
.github/workflows/unit.yml
vendored
@@ -19,8 +19,7 @@ concurrency:
|
||||
jobs:
|
||||
test:
|
||||
name: test / ${{ matrix.type }} (${{ matrix.partition }}/${{ matrix.total_partitions }})
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
strategy:
|
||||
@@ -65,8 +64,7 @@ jobs:
|
||||
|
||||
state:
|
||||
name: Ethereum state tests
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
RUST_LOG: info,sync=error
|
||||
RUST_BACKTRACE: 1
|
||||
@@ -100,8 +98,7 @@ jobs:
|
||||
|
||||
doc:
|
||||
name: doc tests
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
timeout-minutes: 30
|
||||
|
||||
195
Cargo.lock
generated
195
Cargo.lock
generated
@@ -97,9 +97,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
|
||||
|
||||
[[package]]
|
||||
name = "alloy-chains"
|
||||
version = "0.2.20"
|
||||
version = "0.2.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4bc32535569185cbcb6ad5fa64d989a47bccb9a08e27284b1f2a3ccf16e6d010"
|
||||
checksum = "1b9ebac8ff9c2f07667e1803dc777304337e160ce5153335beb45e8ec0751808"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"alloy-rlp",
|
||||
@@ -112,9 +112,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-consensus"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b6440213a22df93a87ed512d2f668e7dc1d62a05642d107f82d61edc9e12370"
|
||||
checksum = "2e318e25fb719e747a7e8db1654170fc185024f3ed5b10f86c08d448a912f6e2"
|
||||
dependencies = [
|
||||
"alloy-eips",
|
||||
"alloy-primitives",
|
||||
@@ -140,9 +140,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-consensus-any"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "15d0bea09287942405c4f9d2a4f22d1e07611c2dbd9d5bf94b75366340f9e6e0"
|
||||
checksum = "364380a845193a317bcb7a5398fc86cdb66c47ebe010771dde05f6869bf9e64a"
|
||||
dependencies = [
|
||||
"alloy-consensus",
|
||||
"alloy-eips",
|
||||
@@ -155,9 +155,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-contract"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d69af404f1d00ddb42f2419788fa87746a4cd13bab271916d7726fda6c792d94"
|
||||
checksum = "08d39c80ffc806f27a76ed42f3351a455f3dc4f81d6ff92c8aad2cf36b7d3a34"
|
||||
dependencies = [
|
||||
"alloy-consensus",
|
||||
"alloy-dyn-abi",
|
||||
@@ -240,9 +240,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-eips"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4bd2c7ae05abcab4483ce821f12f285e01c0b33804e6883dd9ca1569a87ee2be"
|
||||
checksum = "a4c4d7c5839d9f3a467900c625416b24328450c65702eb3d8caff8813e4d1d33"
|
||||
dependencies = [
|
||||
"alloy-eip2124",
|
||||
"alloy-eip2930",
|
||||
@@ -288,9 +288,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-genesis"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fc47eaae86488b07ea8e20236184944072a78784a1f4993f8ec17b3aa5d08c21"
|
||||
checksum = "1ba4b1be0988c11f0095a2380aa596e35533276b8fa6c9e06961bbfe0aebcac5"
|
||||
dependencies = [
|
||||
"alloy-eips",
|
||||
"alloy-primitives",
|
||||
@@ -329,9 +329,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-json-rpc"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "003f46c54f22854a32b9cc7972660a476968008ad505427eabab49225309ec40"
|
||||
checksum = "f72cf87cda808e593381fb9f005ffa4d2475552b7a6c5ac33d087bf77d82abd0"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"alloy-sol-types",
|
||||
@@ -344,9 +344,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-network"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4f4029954d9406a40979f3a3b46950928a0fdcfe3ea8a9b0c17490d57e8aa0e3"
|
||||
checksum = "12aeb37b6f2e61b93b1c3d34d01ee720207c76fe447e2a2c217e433ac75b17f5"
|
||||
dependencies = [
|
||||
"alloy-consensus",
|
||||
"alloy-consensus-any",
|
||||
@@ -370,9 +370,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-network-primitives"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7805124ad69e57bbae7731c9c344571700b2a18d351bda9e0eba521c991d1bcb"
|
||||
checksum = "abd29ace62872083e30929cd9b282d82723196d196db589f3ceda67edcc05552"
|
||||
dependencies = [
|
||||
"alloy-consensus",
|
||||
"alloy-eips",
|
||||
@@ -401,9 +401,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-op-hardforks"
|
||||
version = "0.4.4"
|
||||
version = "0.4.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "95ac97adaba4c26e17192d81f49186ac20c1e844e35a00e169c8d3d58bc84e6b"
|
||||
checksum = "f96fb2fce4024ada5b2c11d4076acf778a0d3e4f011c6dfd2ffce6d0fcf84ee9"
|
||||
dependencies = [
|
||||
"alloy-chains",
|
||||
"alloy-hardforks",
|
||||
@@ -444,9 +444,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-provider"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d369e12c92870d069e0c9dc5350377067af8a056e29e3badf8446099d7e00889"
|
||||
checksum = "9b710636d7126e08003b8217e24c09f0cca0b46d62f650a841736891b1ed1fc1"
|
||||
dependencies = [
|
||||
"alloy-chains",
|
||||
"alloy-consensus",
|
||||
@@ -489,9 +489,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-pubsub"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f77d20cdbb68a614c7a86b3ffef607b37d087bb47a03c58f4c3f8f99bc3ace3b"
|
||||
checksum = "cdd4c64eb250a18101d22ae622357c6b505e158e9165d4c7974d59082a600c5e"
|
||||
dependencies = [
|
||||
"alloy-json-rpc",
|
||||
"alloy-primitives",
|
||||
@@ -533,9 +533,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-rpc-client"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "31c89883fe6b7381744cbe80fef638ac488ead4f1956a4278956a1362c71cd2e"
|
||||
checksum = "d0882e72d2c1c0c79dcf4ab60a67472d3f009a949f774d4c17d0bdb669cfde05"
|
||||
dependencies = [
|
||||
"alloy-json-rpc",
|
||||
"alloy-primitives",
|
||||
@@ -559,9 +559,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-rpc-types"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "64e279e6d40ee40fe8f76753b678d8d5d260cb276dc6c8a8026099b16d2b43f4"
|
||||
checksum = "39cf1398cb33aacb139a960fa3d8cf8b1202079f320e77e952a0b95967bf7a9f"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"alloy-rpc-types-engine",
|
||||
@@ -572,9 +572,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-rpc-types-admin"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2bcf50ccb65d29b8599f8f5e23dcac685f1d79459654c830cba381345760e901"
|
||||
checksum = "65a583d2029b171301f5dcf122aa2ef443a65a373778ec76540d999691ae867d"
|
||||
dependencies = [
|
||||
"alloy-genesis",
|
||||
"alloy-primitives",
|
||||
@@ -584,9 +584,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-rpc-types-anvil"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5e176c26fdd87893b6afeb5d92099d8f7e7a1fe11d6f4fe0883d6e33ac5f31ba"
|
||||
checksum = "c3ce4c24e416bd0f17fceeb2f26cd8668df08fe19e1dc02f9d41c3b8ed1e93e0"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"alloy-rpc-types-eth",
|
||||
@@ -596,9 +596,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-rpc-types-any"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b43c1622aac2508d528743fd4cfdac1dea92d5a8fa894038488ff7edd0af0b32"
|
||||
checksum = "6a63fb40ed24e4c92505f488f9dd256e2afaed17faa1b7a221086ebba74f4122"
|
||||
dependencies = [
|
||||
"alloy-consensus-any",
|
||||
"alloy-rpc-types-eth",
|
||||
@@ -607,9 +607,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-rpc-types-beacon"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1786681640d4c60f22b6b8376b0f3fa200360bf1c3c2cb913e6c97f51928eb1b"
|
||||
checksum = "16633087e23d8d75161c3a59aa183203637b817a5a8d2f662f612ccb6d129af0"
|
||||
dependencies = [
|
||||
"alloy-eips",
|
||||
"alloy-primitives",
|
||||
@@ -627,9 +627,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-rpc-types-debug"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1b2ca3a434a6d49910a7e8e51797eb25db42ef8a5578c52d877fcb26d0afe7bc"
|
||||
checksum = "4936f579d9d10eae01772b2ab3497f9d568684f05f26f8175e12f9a1a2babc33"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"derive_more",
|
||||
@@ -639,9 +639,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-rpc-types-engine"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d9c4c53a8b0905d931e7921774a1830609713bd3e8222347963172b03a3ecc68"
|
||||
checksum = "4c60bdce3be295924122732b7ecd0b2495ce4790bedc5370ca7019c08ad3f26e"
|
||||
dependencies = [
|
||||
"alloy-consensus",
|
||||
"alloy-eips",
|
||||
@@ -660,9 +660,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-rpc-types-eth"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ed5fafb741c19b3cca4cdd04fa215c89413491f9695a3e928dee2ae5657f607e"
|
||||
checksum = "9eae0c7c40da20684548cbc8577b6b7447f7bf4ddbac363df95e3da220e41e72"
|
||||
dependencies = [
|
||||
"alloy-consensus",
|
||||
"alloy-consensus-any",
|
||||
@@ -682,9 +682,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-rpc-types-mev"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "49a97bfc6d9b411c85bb08e1174ddd3e5d61b10d3bd13f529d6609f733cb2f6f"
|
||||
checksum = "81c0dd81c24944cfbf45b5df7cd149d9cd3e354db81ccf08aa47e0e05be8ab97"
|
||||
dependencies = [
|
||||
"alloy-consensus",
|
||||
"alloy-eips",
|
||||
@@ -697,9 +697,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-rpc-types-trace"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c55324323aa634b01bdecb2d47462a8dce05f5505b14a6e5db361eef16eda476"
|
||||
checksum = "ef206a4b8d436fbb7cf2e6a61c692d11df78f9382becc3c9a283bd58e64f0583"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"alloy-rpc-types-eth",
|
||||
@@ -711,9 +711,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-rpc-types-txpool"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "96b1aa28effb6854be356ce92ed64cea3b323acd04c3f8bfb5126e2839698043"
|
||||
checksum = "ecb5a795264a02222f9534435b8f40dcbd88de8e9d586647884aae24f389ebf2"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"alloy-rpc-types-eth",
|
||||
@@ -723,9 +723,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-serde"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a6f180c399ca7c1e2fe17ea58343910cad0090878a696ff5a50241aee12fc529"
|
||||
checksum = "c0df1987ed0ff2d0159d76b52e7ddfc4e4fbddacc54d2fbee765e0d14d7c01b5"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"arbitrary",
|
||||
@@ -735,9 +735,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-signer"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ecc39ad2c0a3d2da8891f4081565780703a593f090f768f884049aa3aa929cbc"
|
||||
checksum = "6ff69deedee7232d7ce5330259025b868c5e6a52fa8dffda2c861fb3a5889b24"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"async-trait",
|
||||
@@ -750,9 +750,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-signer-local"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "930e17cb1e46446a193a593a3bfff8d0ecee4e510b802575ebe300ae2e43ef75"
|
||||
checksum = "72cfe0be3ec5a8c1a46b2e5a7047ed41121d360d97f4405bb7c1c784880c86cb"
|
||||
dependencies = [
|
||||
"alloy-consensus",
|
||||
"alloy-network",
|
||||
@@ -839,9 +839,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-transport"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cae82426d98f8bc18f53c5223862907cac30ab8fc5e4cd2bb50808e6d3ab43d8"
|
||||
checksum = "be98b07210d24acf5b793c99b759e9a696e4a2e67593aec0487ae3b3e1a2478c"
|
||||
dependencies = [
|
||||
"alloy-json-rpc",
|
||||
"auto_impl",
|
||||
@@ -862,9 +862,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-transport-http"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "90aa6825760905898c106aba9c804b131816a15041523e80b6d4fe7af6380ada"
|
||||
checksum = "4198a1ee82e562cab85e7f3d5921aab725d9bd154b6ad5017f82df1695877c97"
|
||||
dependencies = [
|
||||
"alloy-json-rpc",
|
||||
"alloy-transport",
|
||||
@@ -877,9 +877,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-transport-ipc"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6ace83a4a6bb896e5894c3479042e6ba78aa5271dde599aa8c36a021d49cc8cc"
|
||||
checksum = "d8db249779ebc20dc265920c7e706ed0d31dbde8627818d1cbde60919b875bb0"
|
||||
dependencies = [
|
||||
"alloy-json-rpc",
|
||||
"alloy-pubsub",
|
||||
@@ -897,9 +897,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-transport-ws"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "86c9ab4c199e3a8f3520b60ba81aa67bb21fed9ed0d8304e0569094d0758a56f"
|
||||
checksum = "5ad2344a12398d7105e3722c9b7a7044ea837128e11d453604dec6e3731a86e2"
|
||||
dependencies = [
|
||||
"alloy-pubsub",
|
||||
"alloy-transport",
|
||||
@@ -935,9 +935,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-tx-macros"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ae109e33814b49fc0a62f2528993aa8a2dd346c26959b151f05441dc0b9da292"
|
||||
checksum = "333544408503f42d7d3792bfc0f7218b643d968a03d2c0ed383ae558fb4a76d0"
|
||||
dependencies = [
|
||||
"darling 0.21.3",
|
||||
"proc-macro2",
|
||||
@@ -1627,15 +1627,15 @@ checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7"
|
||||
|
||||
[[package]]
|
||||
name = "bitcoin-io"
|
||||
version = "0.1.3"
|
||||
version = "0.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b47c4ab7a93edb0c7198c5535ed9b52b63095f4e9b45279c6736cec4b856baf"
|
||||
checksum = "2dee39a0ee5b4095224a0cfc6bf4cc1baf0f9624b96b367e53b66d974e51d953"
|
||||
|
||||
[[package]]
|
||||
name = "bitcoin_hashes"
|
||||
version = "0.14.0"
|
||||
version = "0.14.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bb18c03d0db0247e147a21a6faafd5a7eb851c743db062de72018b6b7e8e4d16"
|
||||
checksum = "26ec84b80c482df901772e931a9a681e26a1b9ee2302edeff23cb30328745c8b"
|
||||
dependencies = [
|
||||
"bitcoin-io",
|
||||
"hex-conservative",
|
||||
@@ -2471,9 +2471,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "convert_case"
|
||||
version = "0.7.1"
|
||||
version = "0.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bb402b8d4c85569410425650ce3eddc7d698ed96d39a73f941b08fb63082f1e7"
|
||||
checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9"
|
||||
dependencies = [
|
||||
"unicode-segmentation",
|
||||
]
|
||||
@@ -2978,22 +2978,23 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "derive_more"
|
||||
version = "2.0.1"
|
||||
version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678"
|
||||
checksum = "10b768e943bed7bf2cab53df09f4bc34bfd217cdb57d971e769874c9a6710618"
|
||||
dependencies = [
|
||||
"derive_more-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "derive_more-impl"
|
||||
version = "2.0.1"
|
||||
version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3"
|
||||
checksum = "6d286bfdaf75e988b4a78e013ecd79c581e06399ab53fbacd2d916c2f904f30b"
|
||||
dependencies = [
|
||||
"convert_case",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"rustc_version 0.4.1",
|
||||
"syn 2.0.111",
|
||||
"unicode-xid",
|
||||
]
|
||||
@@ -3981,9 +3982,9 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
|
||||
|
||||
[[package]]
|
||||
name = "flate2"
|
||||
version = "1.1.5"
|
||||
version = "1.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bfe33edd8e85a12a67454e37f8c75e730830d83e313556ab9ebf9ee7fbeb3bfb"
|
||||
checksum = "a2152dbcb980c05735e2a651d96011320a949eb31a0c8b38b72645ce97dec676"
|
||||
dependencies = [
|
||||
"crc32fast",
|
||||
"miniz_oxide",
|
||||
@@ -4277,9 +4278,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "git2"
|
||||
version = "0.20.2"
|
||||
version = "0.20.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2deb07a133b1520dc1a5690e9bd08950108873d7ed5de38dcc74d3b5ebffa110"
|
||||
checksum = "3e2b37e2f62729cdada11f0e6b3b6fe383c69c29fc619e391223e12856af308c"
|
||||
dependencies = [
|
||||
"bitflags 2.10.0",
|
||||
"libc",
|
||||
@@ -4704,9 +4705,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "hyper-util"
|
||||
version = "0.1.18"
|
||||
version = "0.1.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "52e9a2a24dc5c6821e71a7030e1e14b7b632acac55c40e9d2e082c621261bb56"
|
||||
checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
@@ -5416,15 +5417,15 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.177"
|
||||
version = "0.2.178"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976"
|
||||
checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091"
|
||||
|
||||
[[package]]
|
||||
name = "libgit2-sys"
|
||||
version = "0.18.2+1.9.1"
|
||||
version = "0.18.3+1.9.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1c42fe03df2bd3c53a3a9c7317ad91d80c81cd1fb0caec8d7cc4cd2bfa10c222"
|
||||
checksum = "c9b3acc4b91781bb0b3386669d325163746af5f6e4f73e6d2d630e09a35f3487"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
@@ -5569,9 +5570,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "log"
|
||||
version = "0.4.28"
|
||||
version = "0.4.29"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"
|
||||
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
|
||||
|
||||
[[package]]
|
||||
name = "loom"
|
||||
@@ -5839,9 +5840,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "mio"
|
||||
version = "1.1.0"
|
||||
version = "1.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873"
|
||||
checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"log",
|
||||
@@ -6792,7 +6793,7 @@ version = "3.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983"
|
||||
dependencies = [
|
||||
"toml_edit 0.23.7",
|
||||
"toml_edit 0.23.9",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -12787,9 +12788,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "toml_edit"
|
||||
version = "0.23.7"
|
||||
version = "0.23.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d"
|
||||
checksum = "5d7cbc3b4b49633d57a0509303158ca50de80ae32c265093b24c414705807832"
|
||||
dependencies = [
|
||||
"indexmap 2.12.1",
|
||||
"toml_datetime 0.7.3",
|
||||
@@ -13303,9 +13304,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
|
||||
|
||||
[[package]]
|
||||
name = "uuid"
|
||||
version = "1.18.1"
|
||||
version = "1.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2"
|
||||
checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a"
|
||||
dependencies = [
|
||||
"getrandom 0.3.4",
|
||||
"js-sys",
|
||||
@@ -14282,18 +14283,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy"
|
||||
version = "0.8.30"
|
||||
version = "0.8.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4ea879c944afe8a2b25fef16bb4ba234f47c694565e97383b36f3a878219065c"
|
||||
checksum = "fd74ec98b9250adb3ca554bdde269adf631549f51d8a8f8f0a10b50f1cb298c3"
|
||||
dependencies = [
|
||||
"zerocopy-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy-derive"
|
||||
version = "0.8.30"
|
||||
version = "0.8.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cf955aa904d6040f70dc8e9384444cb1030aed272ba3cb09bbc4ab9e7c1f34f5"
|
||||
checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
||||
54
Cargo.toml
54
Cargo.toml
@@ -495,33 +495,33 @@ alloy-trie = { version = "0.9.1", default-features = false }
|
||||
|
||||
alloy-hardforks = "0.4.5"
|
||||
|
||||
alloy-consensus = { version = "1.1.2", default-features = false }
|
||||
alloy-contract = { version = "1.1.2", default-features = false }
|
||||
alloy-eips = { version = "1.1.2", default-features = false }
|
||||
alloy-genesis = { version = "1.1.2", default-features = false }
|
||||
alloy-json-rpc = { version = "1.1.2", default-features = false }
|
||||
alloy-network = { version = "1.1.2", default-features = false }
|
||||
alloy-network-primitives = { version = "1.1.2", default-features = false }
|
||||
alloy-provider = { version = "1.1.2", features = ["reqwest", "debug-api"], default-features = false }
|
||||
alloy-pubsub = { version = "1.1.2", default-features = false }
|
||||
alloy-rpc-client = { version = "1.1.2", default-features = false }
|
||||
alloy-rpc-types = { version = "1.1.2", features = ["eth"], default-features = false }
|
||||
alloy-rpc-types-admin = { version = "1.1.2", default-features = false }
|
||||
alloy-rpc-types-anvil = { version = "1.1.2", default-features = false }
|
||||
alloy-rpc-types-beacon = { version = "1.1.2", default-features = false }
|
||||
alloy-rpc-types-debug = { version = "1.1.2", default-features = false }
|
||||
alloy-rpc-types-engine = { version = "1.1.2", default-features = false }
|
||||
alloy-rpc-types-eth = { version = "1.1.2", default-features = false }
|
||||
alloy-rpc-types-mev = { version = "1.1.2", default-features = false }
|
||||
alloy-rpc-types-trace = { version = "1.1.2", default-features = false }
|
||||
alloy-rpc-types-txpool = { version = "1.1.2", default-features = false }
|
||||
alloy-serde = { version = "1.1.2", default-features = false }
|
||||
alloy-signer = { version = "1.1.2", default-features = false }
|
||||
alloy-signer-local = { version = "1.1.2", default-features = false }
|
||||
alloy-transport = { version = "1.1.2" }
|
||||
alloy-transport-http = { version = "1.1.2", features = ["reqwest-rustls-tls"], default-features = false }
|
||||
alloy-transport-ipc = { version = "1.1.2", default-features = false }
|
||||
alloy-transport-ws = { version = "1.1.2", default-features = false }
|
||||
alloy-consensus = { version = "1.1.3", default-features = false }
|
||||
alloy-contract = { version = "1.1.3", default-features = false }
|
||||
alloy-eips = { version = "1.1.3", default-features = false }
|
||||
alloy-genesis = { version = "1.1.3", default-features = false }
|
||||
alloy-json-rpc = { version = "1.1.3", default-features = false }
|
||||
alloy-network = { version = "1.1.3", default-features = false }
|
||||
alloy-network-primitives = { version = "1.1.3", default-features = false }
|
||||
alloy-provider = { version = "1.1.3", features = ["reqwest", "debug-api"], default-features = false }
|
||||
alloy-pubsub = { version = "1.1.3", default-features = false }
|
||||
alloy-rpc-client = { version = "1.1.3", default-features = false }
|
||||
alloy-rpc-types = { version = "1.1.3", features = ["eth"], default-features = false }
|
||||
alloy-rpc-types-admin = { version = "1.1.3", default-features = false }
|
||||
alloy-rpc-types-anvil = { version = "1.1.3", default-features = false }
|
||||
alloy-rpc-types-beacon = { version = "1.1.3", default-features = false }
|
||||
alloy-rpc-types-debug = { version = "1.1.3", default-features = false }
|
||||
alloy-rpc-types-engine = { version = "1.1.3", default-features = false }
|
||||
alloy-rpc-types-eth = { version = "1.1.3", default-features = false }
|
||||
alloy-rpc-types-mev = { version = "1.1.3", default-features = false }
|
||||
alloy-rpc-types-trace = { version = "1.1.3", default-features = false }
|
||||
alloy-rpc-types-txpool = { version = "1.1.3", default-features = false }
|
||||
alloy-serde = { version = "1.1.3", default-features = false }
|
||||
alloy-signer = { version = "1.1.3", default-features = false }
|
||||
alloy-signer-local = { version = "1.1.3", default-features = false }
|
||||
alloy-transport = { version = "1.1.3" }
|
||||
alloy-transport-http = { version = "1.1.3", features = ["reqwest-rustls-tls"], default-features = false }
|
||||
alloy-transport-ipc = { version = "1.1.3", default-features = false }
|
||||
alloy-transport-ws = { version = "1.1.3", default-features = false }
|
||||
|
||||
# op
|
||||
alloy-op-evm = { version = "0.24.1", default-features = false }
|
||||
|
||||
@@ -57,6 +57,7 @@ pub(crate) struct TotalGasRow {
|
||||
/// - `mean_new_payload_latency_ms`: arithmetic mean latency across blocks.
|
||||
/// - `median_new_payload_latency_ms`: p50 latency across blocks.
|
||||
/// - `p90_new_payload_latency_ms` / `p99_new_payload_latency_ms`: tail latencies across blocks.
|
||||
/// - `std_dev_new_payload_latency_ms`: standard deviation of latency across blocks.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub(crate) struct BenchmarkSummary {
|
||||
pub total_blocks: u64,
|
||||
@@ -66,6 +67,7 @@ pub(crate) struct BenchmarkSummary {
|
||||
pub median_new_payload_latency_ms: f64,
|
||||
pub p90_new_payload_latency_ms: f64,
|
||||
pub p99_new_payload_latency_ms: f64,
|
||||
pub std_dev_new_payload_latency_ms: f64,
|
||||
pub gas_per_second: f64,
|
||||
pub blocks_per_second: f64,
|
||||
pub min_block_number: u64,
|
||||
@@ -96,6 +98,7 @@ pub(crate) struct RefInfo {
|
||||
/// Percent deltas are `(feature - baseline) / baseline * 100`:
|
||||
/// - `new_payload_latency_p50_change_percent` / p90 / p99: percent changes of the respective
|
||||
/// per-block percentiles.
|
||||
/// - `std_dev_change_percent`: percent change in standard deviation of newPayload latency.
|
||||
/// - `per_block_latency_change_mean_percent` / `per_block_latency_change_median_percent` are the
|
||||
/// mean and median of per-block percent deltas (feature vs baseline), capturing block-level
|
||||
/// drift.
|
||||
@@ -114,6 +117,7 @@ pub(crate) struct ComparisonSummary {
|
||||
pub new_payload_latency_p50_change_percent: f64,
|
||||
pub new_payload_latency_p90_change_percent: f64,
|
||||
pub new_payload_latency_p99_change_percent: f64,
|
||||
pub std_dev_change_percent: f64,
|
||||
pub gas_per_second_change_percent: f64,
|
||||
pub blocks_per_second_change_percent: f64,
|
||||
}
|
||||
@@ -335,6 +339,9 @@ impl ComparisonGenerator {
|
||||
let mean_new_payload_latency_ms: f64 =
|
||||
latencies_ms.iter().sum::<f64>() / total_blocks as f64;
|
||||
|
||||
let std_dev_new_payload_latency_ms =
|
||||
calculate_std_dev(&latencies_ms, mean_new_payload_latency_ms);
|
||||
|
||||
let mut sorted_latencies_ms = latencies_ms;
|
||||
sorted_latencies_ms.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
|
||||
let median_new_payload_latency_ms = percentile(&sorted_latencies_ms, 0.5);
|
||||
@@ -365,6 +372,7 @@ impl ComparisonGenerator {
|
||||
median_new_payload_latency_ms,
|
||||
p90_new_payload_latency_ms,
|
||||
p99_new_payload_latency_ms,
|
||||
std_dev_new_payload_latency_ms,
|
||||
gas_per_second,
|
||||
blocks_per_second,
|
||||
min_block_number,
|
||||
@@ -432,6 +440,10 @@ impl ComparisonGenerator {
|
||||
baseline.p99_new_payload_latency_ms,
|
||||
feature.p99_new_payload_latency_ms,
|
||||
),
|
||||
std_dev_change_percent: calc_percent_change(
|
||||
baseline.std_dev_new_payload_latency_ms,
|
||||
feature.std_dev_new_payload_latency_ms,
|
||||
),
|
||||
gas_per_second_change_percent: calc_percent_change(
|
||||
baseline.gas_per_second,
|
||||
feature.gas_per_second,
|
||||
@@ -562,6 +574,7 @@ impl ComparisonGenerator {
|
||||
" NewPayload Latency p99: {:+.2}%",
|
||||
summary.new_payload_latency_p99_change_percent
|
||||
);
|
||||
println!(" NewPayload Latency std dev: {:+.2}%", summary.std_dev_change_percent);
|
||||
println!(
|
||||
" Gas/Second: {:+.2}%",
|
||||
summary.gas_per_second_change_percent
|
||||
@@ -584,11 +597,12 @@ impl ComparisonGenerator {
|
||||
);
|
||||
println!(" NewPayload latency (ms):");
|
||||
println!(
|
||||
" mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}",
|
||||
" mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}, std dev: {:.2}",
|
||||
baseline.mean_new_payload_latency_ms,
|
||||
baseline.median_new_payload_latency_ms,
|
||||
baseline.p90_new_payload_latency_ms,
|
||||
baseline.p99_new_payload_latency_ms
|
||||
baseline.p99_new_payload_latency_ms,
|
||||
baseline.std_dev_new_payload_latency_ms
|
||||
);
|
||||
if let (Some(start), Some(end)) =
|
||||
(&report.baseline.start_timestamp, &report.baseline.end_timestamp)
|
||||
@@ -613,11 +627,12 @@ impl ComparisonGenerator {
|
||||
);
|
||||
println!(" NewPayload latency (ms):");
|
||||
println!(
|
||||
" mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}",
|
||||
" mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}, std dev: {:.2}",
|
||||
feature.mean_new_payload_latency_ms,
|
||||
feature.median_new_payload_latency_ms,
|
||||
feature.p90_new_payload_latency_ms,
|
||||
feature.p99_new_payload_latency_ms
|
||||
feature.p99_new_payload_latency_ms,
|
||||
feature.std_dev_new_payload_latency_ms
|
||||
);
|
||||
if let (Some(start), Some(end)) =
|
||||
(&report.feature.start_timestamp, &report.feature.end_timestamp)
|
||||
|
||||
@@ -91,6 +91,7 @@ impl Command {
|
||||
let mut settings @ StorageSettings {
|
||||
receipts_in_static_files: _,
|
||||
transaction_senders_in_static_files: _,
|
||||
storages_history_in_rocksdb: _,
|
||||
} = settings.unwrap_or_else(StorageSettings::legacy);
|
||||
|
||||
// Update the setting based on the key
|
||||
|
||||
@@ -229,9 +229,15 @@ fn bench_state_root(c: &mut Criterion) {
|
||||
black_box({
|
||||
let mut handle = payload_processor.spawn(
|
||||
Default::default(),
|
||||
core::iter::empty::<
|
||||
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
|
||||
>(),
|
||||
(
|
||||
core::iter::empty::<
|
||||
Result<
|
||||
Recovered<TransactionSigned>,
|
||||
core::convert::Infallible,
|
||||
>,
|
||||
>(),
|
||||
std::convert::identity,
|
||||
),
|
||||
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
|
||||
OverlayStateProviderFactory::new(provider),
|
||||
&TreeConfig::default(),
|
||||
|
||||
@@ -146,17 +146,26 @@ impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
|
||||
storage_key: StorageKey,
|
||||
) -> ProviderResult<Option<StorageValue>> {
|
||||
match self.caches.get_storage(&account, &storage_key) {
|
||||
SlotStatus::NotCached => {
|
||||
self.metrics.storage_cache_misses.increment(1);
|
||||
(SlotStatus::NotCached, maybe_cache) => {
|
||||
let final_res = self.state_provider.storage(account, storage_key)?;
|
||||
self.caches.insert_storage(account, storage_key, final_res);
|
||||
let account_cache = maybe_cache.unwrap_or_default();
|
||||
account_cache.insert_storage(storage_key, final_res);
|
||||
// we always need to insert the value to update the weights.
|
||||
// Note: there exists a race when the storage cache did not exist yet and two
|
||||
// consumers looking up the a storage value for this account for the first time,
|
||||
// however we can assume that this will only happen for the very first (mostlikely
|
||||
// the same) value, and don't expect that this will accidentally
|
||||
// replace an account storage cache with additional values.
|
||||
self.caches.insert_storage_cache(account, account_cache);
|
||||
|
||||
self.metrics.storage_cache_misses.increment(1);
|
||||
Ok(final_res)
|
||||
}
|
||||
SlotStatus::Empty => {
|
||||
(SlotStatus::Empty, _) => {
|
||||
self.metrics.storage_cache_hits.increment(1);
|
||||
Ok(None)
|
||||
}
|
||||
SlotStatus::Value(value) => {
|
||||
(SlotStatus::Value(value), _) => {
|
||||
self.metrics.storage_cache_hits.increment(1);
|
||||
Ok(Some(value))
|
||||
}
|
||||
@@ -311,18 +320,28 @@ pub(crate) struct ExecutionCache {
|
||||
impl ExecutionCache {
|
||||
/// Get storage value from hierarchical cache.
|
||||
///
|
||||
/// Returns a `SlotStatus` indicating whether:
|
||||
/// - `NotCached`: The account's storage cache doesn't exist
|
||||
/// - `Empty`: The slot exists in the account's cache but is empty
|
||||
/// - `Value`: The slot exists and has a specific value
|
||||
pub(crate) fn get_storage(&self, address: &Address, key: &StorageKey) -> SlotStatus {
|
||||
/// Returns a tuple of:
|
||||
/// - `SlotStatus` indicating whether:
|
||||
/// - `NotCached`: The account's storage cache doesn't exist
|
||||
/// - `Empty`: The slot exists in the account's cache but is empty
|
||||
/// - `Value`: The slot exists and has a specific value
|
||||
/// - `Option<Arc<AccountStorageCache>>`: The account's storage cache if it exists
|
||||
pub(crate) fn get_storage(
|
||||
&self,
|
||||
address: &Address,
|
||||
key: &StorageKey,
|
||||
) -> (SlotStatus, Option<Arc<AccountStorageCache>>) {
|
||||
match self.storage_cache.get(address) {
|
||||
None => SlotStatus::NotCached,
|
||||
Some(account_cache) => account_cache.get_storage(key),
|
||||
None => (SlotStatus::NotCached, None),
|
||||
Some(account_cache) => {
|
||||
let status = account_cache.get_storage(key);
|
||||
(status, Some(account_cache))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Insert storage value into hierarchical cache
|
||||
#[cfg(test)]
|
||||
pub(crate) fn insert_storage(
|
||||
&self,
|
||||
address: Address,
|
||||
@@ -351,6 +370,15 @@ impl ExecutionCache {
|
||||
self.storage_cache.insert(address, account_cache);
|
||||
}
|
||||
|
||||
/// Inserts the [`AccountStorageCache`].
|
||||
pub(crate) fn insert_storage_cache(
|
||||
&self,
|
||||
address: Address,
|
||||
storage_cache: Arc<AccountStorageCache>,
|
||||
) {
|
||||
self.storage_cache.insert(address, storage_cache);
|
||||
}
|
||||
|
||||
/// Invalidate storage for specific account
|
||||
pub(crate) fn invalidate_account_storage(&self, address: &Address) {
|
||||
self.storage_cache.invalidate(address);
|
||||
@@ -800,7 +828,7 @@ mod tests {
|
||||
caches.insert_storage(address, storage_key, Some(storage_value));
|
||||
|
||||
// check that the storage returns the cached value
|
||||
let slot_status = caches.get_storage(&address, &storage_key);
|
||||
let (slot_status, _) = caches.get_storage(&address, &storage_key);
|
||||
assert_eq!(slot_status, SlotStatus::Value(storage_value));
|
||||
}
|
||||
|
||||
@@ -814,7 +842,7 @@ mod tests {
|
||||
let caches = ExecutionCacheBuilder::default().build_caches(1000);
|
||||
|
||||
// check that the storage is not cached
|
||||
let slot_status = caches.get_storage(&address, &storage_key);
|
||||
let (slot_status, _) = caches.get_storage(&address, &storage_key);
|
||||
assert_eq!(slot_status, SlotStatus::NotCached);
|
||||
}
|
||||
|
||||
@@ -830,7 +858,7 @@ mod tests {
|
||||
caches.insert_storage(address, storage_key, None);
|
||||
|
||||
// check that the storage is empty
|
||||
let slot_status = caches.get_storage(&address, &storage_key);
|
||||
let (slot_status, _) = caches.get_storage(&address, &storage_key);
|
||||
assert_eq!(slot_status, SlotStatus::Empty);
|
||||
}
|
||||
|
||||
|
||||
@@ -1901,6 +1901,16 @@ where
|
||||
false
|
||||
}
|
||||
|
||||
/// Returns true if the given hash is part of the last received sync target fork choice update.
|
||||
///
|
||||
/// See [`ForkchoiceStateTracker::sync_target_state`]
|
||||
fn is_any_sync_target(&self, block_hash: B256) -> bool {
|
||||
if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
|
||||
return target.contains(block_hash)
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Checks if the given `check` hash points to an invalid header, inserting the given `head`
|
||||
/// block into the invalid header cache if the `check` hash has a known invalid ancestor.
|
||||
///
|
||||
@@ -2040,9 +2050,12 @@ where
|
||||
match self.insert_block(child) {
|
||||
Ok(res) => {
|
||||
debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
|
||||
if self.is_sync_target_head(child_num_hash.hash) &&
|
||||
if self.is_any_sync_target(child_num_hash.hash) &&
|
||||
matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
|
||||
{
|
||||
debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block");
|
||||
// we just inserted a block that we know is part of the canonical chain, so
|
||||
// we can make it canonical
|
||||
self.make_canonical(child_num_hash.hash)?;
|
||||
}
|
||||
}
|
||||
@@ -2348,11 +2361,15 @@ where
|
||||
// try to append the block
|
||||
match self.insert_block(block) {
|
||||
Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
|
||||
if self.is_sync_target_head(block_num_hash.hash) {
|
||||
trace!(target: "engine::tree", "appended downloaded sync target block");
|
||||
// check if we just inserted a block that's part of sync targets,
|
||||
// i.e. head, safe, or finalized
|
||||
if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() &&
|
||||
sync_target.contains(block_num_hash.hash)
|
||||
{
|
||||
debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block");
|
||||
|
||||
// we just inserted the current sync target block, we can try to make it
|
||||
// canonical
|
||||
// we just inserted a block that we know is part of the canonical chain, so we
|
||||
// can make it canonical
|
||||
return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
|
||||
sync_target_head: block_num_hash.hash,
|
||||
})))
|
||||
|
||||
@@ -21,6 +21,7 @@ use executor::WorkloadExecutor;
|
||||
use multiproof::{SparseTrieUpdate, *};
|
||||
use parking_lot::RwLock;
|
||||
use prewarm::PrewarmMetrics;
|
||||
use rayon::iter::{ParallelBridge, ParallelIterator};
|
||||
use reth_engine_primitives::ExecutableTxIterator;
|
||||
use reth_evm::{
|
||||
execute::{ExecutableTxFor, WithTxEnv},
|
||||
@@ -40,6 +41,7 @@ use reth_trie_sparse::{
|
||||
};
|
||||
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
sync::{
|
||||
atomic::AtomicBool,
|
||||
mpsc::{self, channel},
|
||||
@@ -312,21 +314,50 @@ where
|
||||
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
|
||||
usize,
|
||||
) {
|
||||
let (transactions, convert) = transactions.into();
|
||||
let transactions = transactions.into_iter();
|
||||
// Get the transaction count for prewarming task
|
||||
// Use upper bound if available (more accurate), otherwise use lower bound
|
||||
let (lower, upper) = transactions.size_hint();
|
||||
let transaction_count_hint = upper.unwrap_or(lower);
|
||||
|
||||
// Spawn a task that iterates through all transactions in parallel and sends them to the
|
||||
// main task.
|
||||
let (tx, rx) = mpsc::channel();
|
||||
self.executor.spawn_blocking(move || {
|
||||
transactions.enumerate().par_bridge().for_each_with(tx, |sender, (idx, tx)| {
|
||||
let tx = convert(tx);
|
||||
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
|
||||
let _ = sender.send((idx, tx));
|
||||
});
|
||||
});
|
||||
|
||||
// Spawn a task that processes out-of-order transactions from the task above and sends them
|
||||
// to prewarming and execution tasks.
|
||||
let (prewarm_tx, prewarm_rx) = mpsc::channel();
|
||||
let (execute_tx, execute_rx) = mpsc::channel();
|
||||
self.executor.spawn_blocking(move || {
|
||||
for tx in transactions {
|
||||
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
|
||||
let mut next_for_execution = 0;
|
||||
let mut queue = BTreeMap::new();
|
||||
while let Ok((idx, tx)) = rx.recv() {
|
||||
// only send Ok(_) variants to prewarming task
|
||||
if let Ok(tx) = &tx {
|
||||
let _ = prewarm_tx.send(tx.clone());
|
||||
}
|
||||
let _ = execute_tx.send(tx);
|
||||
|
||||
if next_for_execution == idx {
|
||||
let _ = execute_tx.send(tx);
|
||||
next_for_execution += 1;
|
||||
|
||||
while let Some(entry) = queue.first_entry() &&
|
||||
*entry.key() == next_for_execution
|
||||
{
|
||||
let _ = execute_tx.send(entry.remove());
|
||||
next_for_execution += 1;
|
||||
}
|
||||
} else {
|
||||
queue.insert(idx, tx);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -1017,13 +1048,19 @@ mod tests {
|
||||
|
||||
let provider_factory = BlockchainProvider::new(factory).unwrap();
|
||||
|
||||
let mut handle = payload_processor.spawn(
|
||||
Default::default(),
|
||||
core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>(),
|
||||
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
|
||||
OverlayStateProviderFactory::new(provider_factory),
|
||||
&TreeConfig::default(),
|
||||
);
|
||||
let mut handle =
|
||||
payload_processor.spawn(
|
||||
Default::default(),
|
||||
(
|
||||
core::iter::empty::<
|
||||
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
|
||||
>(),
|
||||
std::convert::identity,
|
||||
),
|
||||
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
|
||||
OverlayStateProviderFactory::new(provider_factory),
|
||||
&TreeConfig::default(),
|
||||
);
|
||||
|
||||
let mut state_hook = handle.state_hook();
|
||||
|
||||
|
||||
@@ -213,16 +213,31 @@ where
|
||||
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
|
||||
{
|
||||
match input {
|
||||
BlockOrPayload::Payload(payload) => Ok(Either::Left(
|
||||
self.evm_config
|
||||
BlockOrPayload::Payload(payload) => {
|
||||
let (iter, convert) = self
|
||||
.evm_config
|
||||
.tx_iterator_for_payload(payload)
|
||||
.map_err(NewPayloadError::other)?
|
||||
.map(|res| res.map(Either::Left).map_err(NewPayloadError::other)),
|
||||
)),
|
||||
.into();
|
||||
|
||||
let iter = Either::Left(iter.into_iter().map(Either::Left));
|
||||
let convert = move |tx| {
|
||||
let Either::Left(tx) = tx else { unreachable!() };
|
||||
convert(tx).map(Either::Left).map_err(Either::Left)
|
||||
};
|
||||
|
||||
// Box the closure to satisfy the `Fn` bound both here and in the branch below
|
||||
Ok((iter, Box::new(convert) as Box<dyn Fn(_) -> _ + Send + Sync + 'static>))
|
||||
}
|
||||
BlockOrPayload::Block(block) => {
|
||||
Ok(Either::Right(block.body().clone_transactions().into_iter().map(|tx| {
|
||||
Ok(Either::Right(tx.try_into_recovered().map_err(NewPayloadError::other)?))
|
||||
})))
|
||||
let iter =
|
||||
Either::Right(block.body().clone_transactions().into_iter().map(Either::Right));
|
||||
let convert = move |tx: Either<_, N::SignedTx>| {
|
||||
let Either::Right(tx) = tx else { unreachable!() };
|
||||
tx.try_into_recovered().map(Either::Right).map_err(Either::Right)
|
||||
};
|
||||
|
||||
Ok((iter, Box::new(convert)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,190 +0,0 @@
|
||||
//! Simple decoding and decompressing tests
|
||||
//! for mainnet era files
|
||||
|
||||
use reth_era::{
|
||||
common::file_ops::{StreamReader, StreamWriter},
|
||||
era::file::{EraReader, EraWriter},
|
||||
};
|
||||
use std::io::Cursor;
|
||||
|
||||
use crate::{EraTestDownloader, HOODI};
|
||||
|
||||
// Helper function to test decompression and decoding for a specific era file
|
||||
async fn test_era_file_decompression_and_decoding(
|
||||
downloader: &EraTestDownloader,
|
||||
filename: &str,
|
||||
network: &str,
|
||||
) -> eyre::Result<()> {
|
||||
println!("\nTesting file: {filename}");
|
||||
let file = downloader.open_era_file(filename, network).await?;
|
||||
|
||||
// Handle genesis era separately
|
||||
if file.group.is_genesis() {
|
||||
// Genesis has no blocks
|
||||
assert_eq!(file.group.blocks.len(), 0, "Genesis should have no blocks");
|
||||
assert!(file.group.slot_index.is_none(), "Genesis should not have block slot index");
|
||||
|
||||
// Test genesis state decompression
|
||||
let state_data = file.group.era_state.decompress()?;
|
||||
assert!(!state_data.is_empty(), "Genesis state should decompress to non-empty data");
|
||||
|
||||
// Verify state slot index
|
||||
assert_eq!(
|
||||
file.group.state_slot_index.slot_count(),
|
||||
1,
|
||||
"Genesis state index should have count of 1"
|
||||
);
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
{
|
||||
let mut writer = EraWriter::new(&mut buffer);
|
||||
writer.write_file(&file)?;
|
||||
}
|
||||
|
||||
let reader = EraReader::new(Cursor::new(&buffer));
|
||||
let read_back_file = reader.read(file.id.network_name.clone())?;
|
||||
|
||||
assert_eq!(
|
||||
file.group.era_state.decompress()?,
|
||||
read_back_file.group.era_state.decompress()?,
|
||||
"Genesis state data should be identical"
|
||||
);
|
||||
|
||||
println!("Genesis era verified successfully");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Non-genesis era - test beacon blocks
|
||||
println!(
|
||||
" Non-genesis era with {} beacon blocks, starting at slot {}",
|
||||
file.group.blocks.len(),
|
||||
file.group.starting_slot()
|
||||
);
|
||||
|
||||
// Test beacon block decompression across different positions
|
||||
let test_block_indices = [
|
||||
0, // First block
|
||||
file.group.blocks.len() / 2, // Middle block
|
||||
file.group.blocks.len() - 1, // Last block
|
||||
];
|
||||
|
||||
for &block_idx in &test_block_indices {
|
||||
let block = &file.group.blocks[block_idx];
|
||||
let slot = file.group.starting_slot() + block_idx as u64;
|
||||
|
||||
println!(
|
||||
"\n Testing beacon block at slot {}, compressed size: {} bytes",
|
||||
slot,
|
||||
block.data.len()
|
||||
);
|
||||
|
||||
// Test beacon block decompression
|
||||
let block_data = block.decompress()?;
|
||||
assert!(
|
||||
!block_data.is_empty(),
|
||||
"Beacon block at slot {slot} decompression should produce non-empty data"
|
||||
);
|
||||
}
|
||||
|
||||
// Test era state decompression
|
||||
let state_data = file.group.era_state.decompress()?;
|
||||
assert!(!state_data.is_empty(), "Era state decompression should produce non-empty data");
|
||||
println!(" Era state decompressed: {} bytes", state_data.len());
|
||||
|
||||
// Verify slot indices
|
||||
if let Some(ref block_slot_index) = file.group.slot_index {
|
||||
println!(
|
||||
" Block slot index: starting_slot={}, count={}",
|
||||
block_slot_index.starting_slot,
|
||||
block_slot_index.slot_count()
|
||||
);
|
||||
|
||||
// Check for empty slots
|
||||
let empty_slots: Vec<usize> = (0..block_slot_index.slot_count())
|
||||
.filter(|&i| !block_slot_index.has_data_at_slot(i))
|
||||
.collect();
|
||||
|
||||
if !empty_slots.is_empty() {
|
||||
println!(
|
||||
" Found {} empty slots (first few): {:?}",
|
||||
empty_slots.len(),
|
||||
&empty_slots[..empty_slots.len().min(5)]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Test round-trip serialization
|
||||
let mut buffer = Vec::new();
|
||||
{
|
||||
let mut writer = EraWriter::new(&mut buffer);
|
||||
writer.write_file(&file)?;
|
||||
}
|
||||
|
||||
// Read back from buffer
|
||||
let reader = EraReader::new(Cursor::new(&buffer));
|
||||
let read_back_file = reader.read(file.id.network_name.clone())?;
|
||||
|
||||
// Verify basic properties are preserved
|
||||
assert_eq!(file.id.network_name, read_back_file.id.network_name);
|
||||
assert_eq!(file.id.start_slot, read_back_file.id.start_slot);
|
||||
assert_eq!(file.id.slot_count, read_back_file.id.slot_count);
|
||||
assert_eq!(file.group.blocks.len(), read_back_file.group.blocks.len());
|
||||
|
||||
// Test data preservation for beacon blocks
|
||||
for &idx in &test_block_indices {
|
||||
let original_block = &file.group.blocks[idx];
|
||||
let read_back_block = &read_back_file.group.blocks[idx];
|
||||
let slot = file.group.starting_slot() + idx as u64;
|
||||
|
||||
// Test that decompressed data is identical
|
||||
assert_eq!(
|
||||
original_block.decompress()?,
|
||||
read_back_block.decompress()?,
|
||||
"Beacon block data should be identical for slot {slot}"
|
||||
);
|
||||
}
|
||||
|
||||
// Test state data preservation
|
||||
assert_eq!(
|
||||
file.group.era_state.decompress()?,
|
||||
read_back_file.group.era_state.decompress()?,
|
||||
"Era state data should be identical"
|
||||
);
|
||||
|
||||
// Test slot indices preservation
|
||||
if let (Some(original_index), Some(read_index)) =
|
||||
(&file.group.slot_index, &read_back_file.group.slot_index)
|
||||
{
|
||||
assert_eq!(
|
||||
original_index.starting_slot, read_index.starting_slot,
|
||||
"Block slot index starting slot should match"
|
||||
);
|
||||
assert_eq!(
|
||||
original_index.offsets, read_index.offsets,
|
||||
"Block slot index offsets should match"
|
||||
);
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
file.group.state_slot_index.starting_slot,
|
||||
read_back_file.group.state_slot_index.starting_slot,
|
||||
"State slot index starting slot should match"
|
||||
);
|
||||
assert_eq!(
|
||||
file.group.state_slot_index.offsets, read_back_file.group.state_slot_index.offsets,
|
||||
"State slot index offsets should match"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_case::test_case("hoodi-00000-212f13fc.era"; "era_dd_hoodi_0")]
|
||||
#[test_case::test_case("hoodi-00021-857e418b.era"; "era_dd_hoodi_21")]
|
||||
#[test_case::test_case("hoodi-00175-202aaa6d.era"; "era_dd_hoodi_175")]
|
||||
#[test_case::test_case("hoodi-00201-0d521fc8.era"; "era_dd_hoodi_201")]
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
#[ignore = "download intensive"]
|
||||
async fn test_hoodi_era1_file_decompression_and_decoding(filename: &str) -> eyre::Result<()> {
|
||||
let downloader = EraTestDownloader::new().await?;
|
||||
test_era_file_decompression_and_decoding(&downloader, filename, HOODI).await
|
||||
}
|
||||
@@ -1,2 +1,2 @@
|
||||
mod dd;
|
||||
mod genesis;
|
||||
mod roundtrip;
|
||||
|
||||
228
crates/era/tests/it/era/roundtrip.rs
Normal file
228
crates/era/tests/it/era/roundtrip.rs
Normal file
@@ -0,0 +1,228 @@
|
||||
//! Roundtrip tests for `.era` files.
|
||||
//!
|
||||
//! These tests verify the full lifecycle of era files by:
|
||||
//! - Reading files from their original source
|
||||
//! - Decompressing their contents
|
||||
//! - Re-compressing the data
|
||||
//! - Writing the data back to a new file
|
||||
//! - Confirming that all original data is preserved throughout the process
|
||||
//!
|
||||
//!
|
||||
//! Only a couple of era files are downloaded from `https://mainnet.era.nimbus.team/` for mainnet
|
||||
//! and `https://hoodi.era.nimbus.team/` for hoodi to keep the tests efficient.
|
||||
|
||||
use reth_era::{
|
||||
common::file_ops::{EraFileFormat, StreamReader, StreamWriter},
|
||||
era::{
|
||||
file::{EraFile, EraReader, EraWriter},
|
||||
types::{
|
||||
consensus::{CompressedBeaconState, CompressedSignedBeaconBlock},
|
||||
group::{EraGroup, EraId},
|
||||
},
|
||||
},
|
||||
};
|
||||
use std::io::Cursor;
|
||||
|
||||
use crate::{EraTestDownloader, HOODI, MAINNET};
|
||||
|
||||
// Helper function to test roundtrip compression/encoding for a specific file
|
||||
async fn test_era_file_roundtrip(
|
||||
downloader: &EraTestDownloader,
|
||||
filename: &str,
|
||||
network: &str,
|
||||
) -> eyre::Result<()> {
|
||||
println!("\nTesting roundtrip for file: {filename}");
|
||||
|
||||
let original_file = downloader.open_era_file(filename, network).await?;
|
||||
|
||||
if original_file.group.is_genesis() {
|
||||
println!("Genesis era detected, using special handling");
|
||||
assert_eq!(original_file.group.blocks.len(), 0, "Genesis should have no blocks");
|
||||
assert!(
|
||||
original_file.group.slot_index.is_none(),
|
||||
"Genesis should not have block slot index"
|
||||
);
|
||||
|
||||
let state_data = original_file.group.era_state.decompress()?;
|
||||
println!(" Genesis state decompressed: {} bytes", state_data.len());
|
||||
|
||||
// File roundtrip test
|
||||
let mut buffer = Vec::new();
|
||||
{
|
||||
let mut writer = EraWriter::new(&mut buffer);
|
||||
writer.write_file(&original_file)?;
|
||||
}
|
||||
|
||||
let reader = EraReader::new(Cursor::new(&buffer));
|
||||
let roundtrip_file = reader.read(network.to_string())?;
|
||||
|
||||
assert_eq!(
|
||||
original_file.group.era_state.decompress()?,
|
||||
roundtrip_file.group.era_state.decompress()?,
|
||||
"Genesis state data should be identical after roundtrip"
|
||||
);
|
||||
|
||||
println!("Genesis era verified successfully");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// non genesis start
|
||||
let original_state_data = original_file.group.era_state.decompress()?;
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
{
|
||||
let mut writer = EraWriter::new(&mut buffer);
|
||||
writer.write_file(&original_file)?;
|
||||
}
|
||||
|
||||
// Read back from buffer
|
||||
let reader = EraReader::new(Cursor::new(&buffer));
|
||||
let roundtrip_file = reader.read(network.to_string())?;
|
||||
|
||||
assert_eq!(
|
||||
original_file.id.network_name, roundtrip_file.id.network_name,
|
||||
"Network name should match after roundtrip"
|
||||
);
|
||||
assert_eq!(
|
||||
original_file.id.start_slot, roundtrip_file.id.start_slot,
|
||||
"Start slot should match after roundtrip"
|
||||
);
|
||||
assert_eq!(
|
||||
original_file.group.blocks.len(),
|
||||
roundtrip_file.group.blocks.len(),
|
||||
"Block count should match after roundtrip"
|
||||
);
|
||||
|
||||
// Select a few blocks to test
|
||||
let test_block_indices = [
|
||||
0, // First block
|
||||
original_file.group.blocks.len() / 2, // Middle block
|
||||
original_file.group.blocks.len() - 1, // Last block
|
||||
];
|
||||
|
||||
// Test individual beacon blocks
|
||||
for &block_idx in &test_block_indices {
|
||||
let original_block = &original_file.group.blocks[block_idx];
|
||||
let roundtrip_block = &roundtrip_file.group.blocks[block_idx];
|
||||
|
||||
let original_block_data = original_block.decompress()?;
|
||||
let roundtrip_block_data = roundtrip_block.decompress()?;
|
||||
|
||||
// Verify file roundtrip preserves data
|
||||
assert_eq!(
|
||||
original_block_data, roundtrip_block_data,
|
||||
"Block {block_idx} data should be identical after file roundtrip"
|
||||
);
|
||||
|
||||
// Verify compression roundtrip
|
||||
let recompressed_block = CompressedSignedBeaconBlock::from_ssz(&original_block_data)?;
|
||||
let recompressed_block_data = recompressed_block.decompress()?;
|
||||
|
||||
assert_eq!(
|
||||
original_block_data, recompressed_block_data,
|
||||
"Block {block_idx} should be identical after re-compression cycle"
|
||||
);
|
||||
}
|
||||
|
||||
let roundtrip_state_data = roundtrip_file.group.era_state.decompress()?;
|
||||
|
||||
assert_eq!(
|
||||
original_state_data, roundtrip_state_data,
|
||||
"Era state data should be identical after roundtrip"
|
||||
);
|
||||
|
||||
let recompressed_state = CompressedBeaconState::from_ssz(&roundtrip_state_data)?;
|
||||
let recompressed_state_data = recompressed_state.decompress()?;
|
||||
|
||||
assert_eq!(
|
||||
original_state_data, recompressed_state_data,
|
||||
"Era state data should be identical after re-compression cycle"
|
||||
);
|
||||
|
||||
let recompressed_blocks: Vec<CompressedSignedBeaconBlock> = roundtrip_file
|
||||
.group
|
||||
.blocks
|
||||
.iter()
|
||||
.map(|block| {
|
||||
let data = block.decompress()?;
|
||||
CompressedSignedBeaconBlock::from_ssz(&data)
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
let new_group = if let Some(ref block_index) = roundtrip_file.group.slot_index {
|
||||
EraGroup::with_block_index(
|
||||
recompressed_blocks,
|
||||
recompressed_state,
|
||||
block_index.clone(),
|
||||
roundtrip_file.group.state_slot_index.clone(),
|
||||
)
|
||||
} else {
|
||||
EraGroup::new(
|
||||
recompressed_blocks,
|
||||
recompressed_state,
|
||||
roundtrip_file.group.state_slot_index,
|
||||
)
|
||||
};
|
||||
|
||||
let (start_slot, slot_count) = new_group.slot_range();
|
||||
let new_file = EraFile::new(new_group, EraId::new(network, start_slot, slot_count));
|
||||
|
||||
let mut reconstructed_buffer = Vec::new();
|
||||
{
|
||||
let mut writer = EraWriter::new(&mut reconstructed_buffer);
|
||||
writer.write_file(&new_file)?;
|
||||
}
|
||||
|
||||
let reader = EraReader::new(Cursor::new(&reconstructed_buffer));
|
||||
let reconstructed_file = reader.read(network.to_string())?;
|
||||
|
||||
assert_eq!(
|
||||
original_file.group.blocks.len(),
|
||||
reconstructed_file.group.blocks.len(),
|
||||
"Block count should match after full reconstruction"
|
||||
);
|
||||
|
||||
// Verify all reconstructed blocks match
|
||||
for (idx, (orig, recon)) in
|
||||
original_file.group.blocks.iter().zip(reconstructed_file.group.blocks.iter()).enumerate()
|
||||
{
|
||||
assert_eq!(
|
||||
orig.decompress()?,
|
||||
recon.decompress()?,
|
||||
"Block {idx} should match after full reconstruction"
|
||||
);
|
||||
}
|
||||
|
||||
// Verify reconstructed state matches
|
||||
assert_eq!(
|
||||
original_state_data,
|
||||
reconstructed_file.group.era_state.decompress()?,
|
||||
"State should match after full reconstruction"
|
||||
);
|
||||
|
||||
println!("File {filename} roundtrip successful");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_case::test_case("mainnet-00000-4b363db9.era"; "era_roundtrip_mainnet_0")]
|
||||
#[test_case::test_case("mainnet-00178-0d0a5290.era"; "era_roundtrip_mainnet_178")]
|
||||
#[test_case::test_case("mainnet-01070-7616e3e2.era"; "era_roundtrip_mainnet_1070")]
|
||||
#[test_case::test_case("mainnet-01267-e3ddc749.era"; "era_roundtrip_mainnet_1267")]
|
||||
#[test_case::test_case("mainnet-01592-d4dc8b98.era"; "era_roundtrip_mainnet_1592")]
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
#[ignore = "download intensive"]
|
||||
async fn test_roundtrip_compression_encoding_mainnet(filename: &str) -> eyre::Result<()> {
|
||||
let downloader = EraTestDownloader::new().await?;
|
||||
test_era_file_roundtrip(&downloader, filename, MAINNET).await
|
||||
}
|
||||
|
||||
#[test_case::test_case("hoodi-00000-212f13fc.era"; "era_roundtrip_hoodi_0")]
|
||||
#[test_case::test_case("hoodi-00021-857e418b.era"; "era_roundtrip_hoodi_21")]
|
||||
#[test_case::test_case("hoodi-00175-202aaa6d.era"; "era_roundtrip_hoodi_175")]
|
||||
#[test_case::test_case("hoodi-00201-0d521fc8.era"; "era_roundtrip_hoodi_201")]
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
#[ignore = "download intensive"]
|
||||
async fn test_roundtrip_compression_encoding_hoodi(filename: &str) -> eyre::Result<()> {
|
||||
let downloader = EraTestDownloader::new().await?;
|
||||
test_era_file_roundtrip(&downloader, filename, HOODI).await
|
||||
}
|
||||
@@ -1,159 +0,0 @@
|
||||
//! Simple decoding and decompressing tests
|
||||
//! for mainnet era1 files
|
||||
|
||||
use alloy_consensus::{BlockBody, Header};
|
||||
use alloy_primitives::U256;
|
||||
use reth_era::{
|
||||
common::file_ops::{StreamReader, StreamWriter},
|
||||
e2s::types::IndexEntry,
|
||||
era1::{
|
||||
file::{Era1Reader, Era1Writer},
|
||||
types::execution::CompressedBody,
|
||||
},
|
||||
};
|
||||
use reth_ethereum_primitives::TransactionSigned;
|
||||
use std::io::Cursor;
|
||||
|
||||
use crate::{EraTestDownloader, MAINNET};
|
||||
|
||||
// Helper function to test decompression and decoding for a specific era1 file
|
||||
async fn test_file_decompression(
|
||||
downloader: &EraTestDownloader,
|
||||
filename: &str,
|
||||
) -> eyre::Result<()> {
|
||||
println!("\nTesting file: {filename}");
|
||||
let file = downloader.open_era1_file(filename, MAINNET).await?;
|
||||
|
||||
// Test block decompression across different positions in the file
|
||||
let test_block_indices = [
|
||||
0, // First block
|
||||
file.group.blocks.len() / 2, // Middle block
|
||||
file.group.blocks.len() - 1, // Last block
|
||||
];
|
||||
|
||||
for &block_idx in &test_block_indices {
|
||||
let block = &file.group.blocks[block_idx];
|
||||
let block_number = file.group.block_index.starting_number() + block_idx as u64;
|
||||
|
||||
println!(
|
||||
"\n Testing block {}, compressed body size: {} bytes",
|
||||
block_number,
|
||||
block.body.data.len()
|
||||
);
|
||||
|
||||
// Test header decompression and decoding
|
||||
let header_data = block.header.decompress()?;
|
||||
assert!(
|
||||
!header_data.is_empty(),
|
||||
"Block {block_number} header decompression should produce non-empty data"
|
||||
);
|
||||
|
||||
let header = block.header.decode_header()?;
|
||||
assert_eq!(header.number, block_number, "Decoded header should have correct block number");
|
||||
println!("Header decompression and decoding successful");
|
||||
|
||||
// Test body decompression
|
||||
let body_data = block.body.decompress()?;
|
||||
assert!(
|
||||
!body_data.is_empty(),
|
||||
"Block {block_number} body decompression should produce non-empty data"
|
||||
);
|
||||
println!("Body decompression successful ({} bytes)", body_data.len());
|
||||
|
||||
let decoded_body: BlockBody<TransactionSigned> =
|
||||
CompressedBody::decode_body_from_decompressed::<TransactionSigned, Header>(&body_data)
|
||||
.expect("Failed to decode body");
|
||||
|
||||
println!(
|
||||
"Body decoding successful: {} transactions, {} ommers, withdrawals: {}",
|
||||
decoded_body.transactions.len(),
|
||||
decoded_body.ommers.len(),
|
||||
decoded_body.withdrawals.is_some()
|
||||
);
|
||||
|
||||
// Test receipts decompression
|
||||
let receipts_data = block.receipts.decompress()?;
|
||||
assert!(
|
||||
!receipts_data.is_empty(),
|
||||
"Block {block_number} receipts decompression should produce non-empty data"
|
||||
);
|
||||
println!("Receipts decompression successful ({} bytes)", receipts_data.len());
|
||||
|
||||
assert!(
|
||||
block.total_difficulty.value > U256::ZERO,
|
||||
"Block {block_number} should have non-zero difficulty"
|
||||
);
|
||||
println!("Total difficulty verified: {}", block.total_difficulty.value);
|
||||
}
|
||||
|
||||
// Test round-trip serialization
|
||||
println!("\n Testing data preservation roundtrip...");
|
||||
let mut buffer = Vec::new();
|
||||
{
|
||||
let mut writer = Era1Writer::new(&mut buffer);
|
||||
writer.write_file(&file)?;
|
||||
}
|
||||
|
||||
// Read back from buffer
|
||||
let reader = Era1Reader::new(Cursor::new(&buffer));
|
||||
let read_back_file = reader.read(file.id.network_name.clone())?;
|
||||
|
||||
// Verify basic properties are preserved
|
||||
assert_eq!(file.id.network_name, read_back_file.id.network_name);
|
||||
assert_eq!(file.id.start_block, read_back_file.id.start_block);
|
||||
assert_eq!(file.group.blocks.len(), read_back_file.group.blocks.len());
|
||||
assert_eq!(file.group.accumulator.root, read_back_file.group.accumulator.root);
|
||||
|
||||
// Test data preservation for some blocks
|
||||
for &idx in &test_block_indices {
|
||||
let original_block = &file.group.blocks[idx];
|
||||
let read_back_block = &read_back_file.group.blocks[idx];
|
||||
let block_number = file.group.block_index.starting_number() + idx as u64;
|
||||
|
||||
println!("Block {block_number} details:");
|
||||
println!(" Header size: {} bytes", original_block.header.data.len());
|
||||
println!(" Body size: {} bytes", original_block.body.data.len());
|
||||
println!(" Receipts size: {} bytes", original_block.receipts.data.len());
|
||||
|
||||
// Test that decompressed data is identical
|
||||
assert_eq!(
|
||||
original_block.header.decompress()?,
|
||||
read_back_block.header.decompress()?,
|
||||
"Header data should be identical for block {block_number}"
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
original_block.body.decompress()?,
|
||||
read_back_block.body.decompress()?,
|
||||
"Body data should be identical for block {block_number}"
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
original_block.receipts.decompress()?,
|
||||
read_back_block.receipts.decompress()?,
|
||||
"Receipts data should be identical for block {block_number}"
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
original_block.total_difficulty.value, read_back_block.total_difficulty.value,
|
||||
"Total difficulty should be identical for block {block_number}"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_case::test_case("mainnet-00000-5ec1ffb8.era1"; "era_dd_mainnet_0")]
|
||||
#[test_case::test_case("mainnet-00003-d8b8a40b.era1"; "era_dd_mainnet_3")]
|
||||
#[test_case::test_case("mainnet-00151-e322efe1.era1"; "era_dd_mainnet_151")]
|
||||
#[test_case::test_case("mainnet-00293-0d6c5812.era1"; "era_dd_mainnet_293")]
|
||||
#[test_case::test_case("mainnet-00443-ea71b6f9.era1"; "era_dd_mainnet_443")]
|
||||
#[test_case::test_case("mainnet-01367-d7efc68f.era1"; "era_dd_mainnet_1367")]
|
||||
#[test_case::test_case("mainnet-01610-99fdde4b.era1"; "era_dd_mainnet_1610")]
|
||||
#[test_case::test_case("mainnet-01895-3f81607c.era1"; "era_dd_mainnet_1895")]
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
#[ignore = "download intensive"]
|
||||
async fn test_mainnet_era1_file_decompression_and_decoding(filename: &str) -> eyre::Result<()> {
|
||||
let downloader = EraTestDownloader::new().await?;
|
||||
test_file_decompression(&downloader, filename).await
|
||||
}
|
||||
@@ -1,3 +1,2 @@
|
||||
mod dd;
|
||||
mod genesis;
|
||||
mod roundtrip;
|
||||
|
||||
@@ -6,6 +6,9 @@
|
||||
//! - Re-encoding and recompressing the data
|
||||
//! - Writing the data back to a new file
|
||||
//! - Confirming that all original data is preserved throughout the process
|
||||
//!
|
||||
//! Only a couple of era1 files are downloaded from <https://era.ithaca.xyz/era1/> for mainnet
|
||||
//! and <https://era.ithaca.xyz/sepolia-era1/> for sepolia to keep the tests efficient.
|
||||
|
||||
use alloy_consensus::{BlockBody, BlockHeader, Header, ReceiptEnvelope};
|
||||
use reth_era::{
|
||||
@@ -27,7 +30,7 @@ use std::io::Cursor;
|
||||
use crate::{EraTestDownloader, MAINNET, SEPOLIA};
|
||||
|
||||
// Helper function to test roundtrip compression/encoding for a specific file
|
||||
async fn test_file_roundtrip(
|
||||
async fn test_era1_file_roundtrip(
|
||||
downloader: &EraTestDownloader,
|
||||
filename: &str,
|
||||
network: &str,
|
||||
@@ -252,27 +255,27 @@ async fn test_file_roundtrip(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_case::test_case("mainnet-00000-5ec1ffb8.era1"; "era_mainnet_0")]
|
||||
#[test_case::test_case("mainnet-00151-e322efe1.era1"; "era_mainnet_151")]
|
||||
#[test_case::test_case("mainnet-01367-d7efc68f.era1"; "era_mainnet_1367")]
|
||||
#[test_case::test_case("mainnet-01895-3f81607c.era1"; "era_mainnet_1895")]
|
||||
#[test_case::test_case("mainnet-00000-5ec1ffb8.era1"; "era1_roundtrip_mainnet_0")]
|
||||
#[test_case::test_case("mainnet-00151-e322efe1.era1"; "era1_roundtrip_mainnet_151")]
|
||||
#[test_case::test_case("mainnet-01367-d7efc68f.era1"; "era1_roundtrip_mainnet_1367")]
|
||||
#[test_case::test_case("mainnet-01895-3f81607c.era1"; "era1_roundtrip_mainnet_1895")]
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
#[ignore = "download intensive"]
|
||||
async fn test_roundtrip_compression_encoding_mainnet(filename: &str) -> eyre::Result<()> {
|
||||
let downloader = EraTestDownloader::new().await?;
|
||||
test_file_roundtrip(&downloader, filename, MAINNET).await
|
||||
test_era1_file_roundtrip(&downloader, filename, MAINNET).await
|
||||
}
|
||||
|
||||
#[test_case::test_case("sepolia-00000-643a00f7.era1"; "era_sepolia_0")]
|
||||
#[test_case::test_case("sepolia-00074-0e81003c.era1"; "era_sepolia_74")]
|
||||
#[test_case::test_case("sepolia-00173-b6924da5.era1"; "era_sepolia_173")]
|
||||
#[test_case::test_case("sepolia-00182-a4f0a8a1.era1"; "era_sepolia_182")]
|
||||
#[test_case::test_case("sepolia-00000-643a00f7.era1"; "era1_roundtrip_sepolia_0")]
|
||||
#[test_case::test_case("sepolia-00074-0e81003c.era1"; "era1_roundtrip_sepolia_74")]
|
||||
#[test_case::test_case("sepolia-00173-b6924da5.era1"; "era1_roundtrip_sepolia_173")]
|
||||
#[test_case::test_case("sepolia-00182-a4f0a8a1.era1"; "era1_roundtrip_sepolia_182")]
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
#[ignore = "download intensive"]
|
||||
async fn test_roundtrip_compression_encoding_sepolia(filename: &str) -> eyre::Result<()> {
|
||||
let downloader = EraTestDownloader::new().await?;
|
||||
|
||||
test_file_roundtrip(&downloader, filename, SEPOLIA).await?;
|
||||
test_era1_file_roundtrip(&downloader, filename, SEPOLIA).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -91,16 +91,19 @@ const ERA_MAINNET_URL: &str = "https://mainnet.era.nimbus.team/";
|
||||
/// Succinct list of mainnet files we want to download
|
||||
/// from <https://mainnet.era.nimbus.team/> //TODO: to replace with internal era files hosting url
|
||||
/// for testing purposes
|
||||
const ERA_MAINNET_FILES_NAMES: [&str; 4] = [
|
||||
const ERA_MAINNET_FILES_NAMES: [&str; 8] = [
|
||||
"mainnet-00000-4b363db9.era",
|
||||
"mainnet-00178-0d0a5290.era",
|
||||
"mainnet-00518-4e267a3a.era",
|
||||
"mainnet-01140-f70d4869.era",
|
||||
"mainnet-00780-bb546fec.era",
|
||||
"mainnet-01070-7616e3e2.era",
|
||||
"mainnet-01267-e3ddc749.era",
|
||||
"mainnet-01581-82073d28.era",
|
||||
"mainnet-01592-d4dc8b98.era",
|
||||
];
|
||||
|
||||
/// Utility for downloading `.era1` files for tests
|
||||
/// in a temporary directory
|
||||
/// and caching them in memory
|
||||
/// Utility for downloading `.era` and `.era1` files for tests
|
||||
/// in a temporary directory and caching them in memory
|
||||
#[derive(Debug)]
|
||||
struct EraTestDownloader {
|
||||
/// Temporary directory for storing downloaded files
|
||||
@@ -180,7 +183,7 @@ impl EraTestDownloader {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get network configuration, URL and supported files, based on network and file type
|
||||
/// Get network configuration, URL and supported files, based on network and file type
|
||||
fn get_network_config(
|
||||
&self,
|
||||
filename: &str,
|
||||
@@ -202,14 +205,13 @@ impl EraTestDownloader {
|
||||
}
|
||||
}
|
||||
|
||||
/// open .era1 file, downloading it if necessary
|
||||
/// Open `.era1` file, downloading it if necessary
|
||||
async fn open_era1_file(&self, filename: &str, network: &str) -> Result<Era1File> {
|
||||
let path = self.download_file(filename, network).await?;
|
||||
Era1Reader::open(&path, network).map_err(|e| eyre!("Failed to open Era1 file: {e}"))
|
||||
}
|
||||
|
||||
/// open .era file, downloading it if necessary
|
||||
#[allow(dead_code)]
|
||||
/// Open `.era` file, downloading it if necessary
|
||||
async fn open_era_file(&self, filename: &str, network: &str) -> Result<EraFile> {
|
||||
let path = self.download_file(filename, network).await?;
|
||||
EraReader::open(&path, network).map_err(|e| eyre!("Failed to open Era1 file: {e}"))
|
||||
|
||||
@@ -289,12 +289,15 @@ where
|
||||
&self,
|
||||
payload: &ExecutionData,
|
||||
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
|
||||
Ok(payload.payload.transactions().clone().into_iter().map(|tx| {
|
||||
let txs = payload.payload.transactions().clone().into_iter();
|
||||
let convert = |tx: Bytes| {
|
||||
let tx =
|
||||
TxTy::<Self::Primitives>::decode_2718_exact(tx.as_ref()).map_err(AnyError::new)?;
|
||||
let signer = tx.try_recover().map_err(AnyError::new)?;
|
||||
Ok::<_, AnyError>(tx.with_signer(signer))
|
||||
}))
|
||||
};
|
||||
|
||||
Ok((txs, convert))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -18,22 +18,51 @@ pub trait ConfigureEngineEvm<ExecutionData>: ConfigureEvm {
|
||||
) -> Result<impl ExecutableTxIterator<Self>, Self::Error>;
|
||||
}
|
||||
|
||||
/// Iterator over executable transactions.
|
||||
pub trait ExecutableTxIterator<Evm: ConfigureEvm>:
|
||||
Iterator<Item = Result<Self::Tx, Self::Error>> + Send + 'static
|
||||
{
|
||||
/// A helper trait representing a pair of a "raw" transactions iterator and a closure that can be
|
||||
/// used to convert them to an executable transaction. This tuple is used in the engine to
|
||||
/// parallelize heavy work like decoding or recovery.
|
||||
pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static {
|
||||
/// Raw transaction that can be converted to an [`ExecutableTxTuple::Tx`]
|
||||
///
|
||||
/// This can be any type that can be converted to an [`ExecutableTxTuple::Tx`]. For example,
|
||||
/// an unrecovered transaction or just the transaction bytes.
|
||||
type RawTx: Send + Sync + 'static;
|
||||
/// The executable transaction type iterator yields.
|
||||
type Tx: ExecutableTxFor<Evm> + Clone + Send + Sync + 'static;
|
||||
type Tx: Clone + Send + Sync + 'static;
|
||||
/// Errors that may occur while recovering or decoding transactions.
|
||||
type Error: core::error::Error + Send + Sync + 'static;
|
||||
|
||||
/// Iterator over [`ExecutableTxTuple::Tx`]
|
||||
type Iter: Iterator<Item = Self::RawTx> + Send + 'static;
|
||||
/// Closure that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
|
||||
/// [`ExecutableTxTuple::Tx`]. This might involve heavy work like decoding or recovery
|
||||
/// and will be parallelized in the engine.
|
||||
type Convert: Fn(Self::RawTx) -> Result<Self::Tx, Self::Error> + Send + Sync + 'static;
|
||||
}
|
||||
|
||||
impl<Evm: ConfigureEvm, Tx, Err, T> ExecutableTxIterator<Evm> for T
|
||||
impl<RawTx, Tx, Err, I, F> ExecutableTxTuple for (I, F)
|
||||
where
|
||||
Tx: ExecutableTxFor<Evm> + Clone + Send + Sync + 'static,
|
||||
RawTx: Send + Sync + 'static,
|
||||
Tx: Clone + Send + Sync + 'static,
|
||||
Err: core::error::Error + Send + Sync + 'static,
|
||||
T: Iterator<Item = Result<Tx, Err>> + Send + 'static,
|
||||
I: Iterator<Item = RawTx> + Send + 'static,
|
||||
F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
|
||||
{
|
||||
type RawTx = RawTx;
|
||||
type Tx = Tx;
|
||||
type Error = Err;
|
||||
|
||||
type Iter = I;
|
||||
type Convert = F;
|
||||
}
|
||||
|
||||
/// Iterator over executable transactions.
|
||||
pub trait ExecutableTxIterator<Evm: ConfigureEvm>:
|
||||
ExecutableTxTuple<Tx: ExecutableTxFor<Evm>>
|
||||
{
|
||||
}
|
||||
|
||||
impl<T, Evm: ConfigureEvm> ExecutableTxIterator<Evm> for T where
|
||||
T: ExecutableTxTuple<Tx: ExecutableTxFor<Evm>>
|
||||
{
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@ use alloy_consensus::{BlockHeader, Header};
|
||||
use alloy_eips::Decodable2718;
|
||||
use alloy_evm::{EvmFactory, FromRecoveredTx, FromTxWithEncoded};
|
||||
use alloy_op_evm::block::{receipt_builder::OpReceiptBuilder, OpTxEnv};
|
||||
use alloy_primitives::U256;
|
||||
use alloy_primitives::{Bytes, U256};
|
||||
use core::fmt::Debug;
|
||||
use op_alloy_consensus::EIP1559ParamError;
|
||||
use op_alloy_rpc_types_engine::OpExecutionData;
|
||||
@@ -265,12 +265,15 @@ where
|
||||
&self,
|
||||
payload: &OpExecutionData,
|
||||
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
|
||||
Ok(payload.payload.transactions().clone().into_iter().map(|encoded| {
|
||||
let transactions = payload.payload.transactions().clone().into_iter();
|
||||
let convert = |encoded: Bytes| {
|
||||
let tx = TxTy::<Self::Primitives>::decode_2718_exact(encoded.as_ref())
|
||||
.map_err(AnyError::new)?;
|
||||
let signer = tx.try_recover().map_err(AnyError::new)?;
|
||||
Ok::<_, AnyError>(WithEncoded::new(encoded, tx.with_signer(signer)))
|
||||
}))
|
||||
};
|
||||
|
||||
Ok((transactions, convert))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -198,6 +198,26 @@ pub trait BlockBody:
|
||||
.collect()
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns an iterator over `Recovered<&Transaction>` for all transactions in the block body.
|
||||
///
|
||||
/// This method recovers signers and returns an iterator without cloning transactions,
|
||||
/// making it more efficient than [`BlockBody::recover_transactions`] when owned values are not
|
||||
/// required.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error if any transaction's signature is invalid.
|
||||
fn recover_transactions_ref(
|
||||
&self,
|
||||
) -> Result<impl Iterator<Item = Recovered<&Self::Transaction>> + '_, RecoveryError> {
|
||||
let signers = self.recover_signers()?;
|
||||
Ok(self
|
||||
.transactions()
|
||||
.iter()
|
||||
.zip(signers)
|
||||
.map(|(tx, signer)| Recovered::new_unchecked(tx, signer)))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, H> BlockBody for alloy_consensus::BlockBody<T, H>
|
||||
|
||||
@@ -38,7 +38,7 @@ pub enum HistoryType {
|
||||
|
||||
/// Default number of blocks to retain for merkle changesets.
|
||||
/// This is used by both the `MerkleChangeSets` stage and the pruner segment.
|
||||
pub const MERKLE_CHANGESETS_RETENTION_BLOCKS: u64 = 64;
|
||||
pub const MERKLE_CHANGESETS_RETENTION_BLOCKS: u64 = 128;
|
||||
|
||||
/// Default pruning mode for merkle changesets
|
||||
const fn default_merkle_changesets_mode() -> PruneMode {
|
||||
@@ -95,13 +95,7 @@ pub struct PruneModes {
|
||||
pub bodies_history: Option<PruneMode>,
|
||||
/// Merkle Changesets pruning configuration for `AccountsTrieChangeSets` and
|
||||
/// `StoragesTrieChangeSets`.
|
||||
#[cfg_attr(
|
||||
any(test, feature = "serde"),
|
||||
serde(
|
||||
default = "default_merkle_changesets_mode",
|
||||
deserialize_with = "deserialize_prune_mode_with_min_blocks::<MERKLE_CHANGESETS_RETENTION_BLOCKS, _>"
|
||||
)
|
||||
)]
|
||||
#[cfg_attr(any(test, feature = "serde"), serde(default = "default_merkle_changesets_mode"))]
|
||||
pub merkle_changesets: PruneMode,
|
||||
/// Receipts pruning configuration by retaining only those receipts that contain logs emitted
|
||||
/// by the specified addresses, discarding others. This setting is overridden by `receipts`.
|
||||
@@ -155,14 +149,15 @@ impl PruneModes {
|
||||
/// Returns `true` if any migration was performed.
|
||||
///
|
||||
/// Currently migrates:
|
||||
/// - `merkle_changesets`: `Distance(10064)` -> `Distance(64)`
|
||||
pub fn migrate(&mut self) -> bool {
|
||||
if self.merkle_changesets == PruneMode::Distance(MINIMUM_PRUNING_DISTANCE) {
|
||||
/// - `merkle_changesets`: `Distance(n)` where `n < 128` or `n == 10064` -> `Distance(128)`
|
||||
pub const fn migrate(&mut self) -> bool {
|
||||
if let PruneMode::Distance(d) = self.merkle_changesets &&
|
||||
(d < MERKLE_CHANGESETS_RETENTION_BLOCKS || d == MINIMUM_PRUNING_DISTANCE)
|
||||
{
|
||||
self.merkle_changesets = PruneMode::Distance(MERKLE_CHANGESETS_RETENTION_BLOCKS);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
return true;
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Returns an error if we can't unwind to the targeted block because the target block is
|
||||
@@ -214,28 +209,6 @@ impl PruneModes {
|
||||
}
|
||||
}
|
||||
|
||||
/// Deserializes [`PruneMode`] and validates that the value is not less than the const
|
||||
/// generic parameter `MIN_BLOCKS`. This parameter represents the number of blocks that needs to be
|
||||
/// left in database after the pruning.
|
||||
///
|
||||
/// 1. For [`PruneMode::Full`], it fails if `MIN_BLOCKS > 0`.
|
||||
/// 2. For [`PruneMode::Distance`], it fails if `distance < MIN_BLOCKS + 1`. `+ 1` is needed because
|
||||
/// `PruneMode::Distance(0)` means that we leave zero blocks from the latest, meaning we have one
|
||||
/// block in the database.
|
||||
#[cfg(any(test, feature = "serde"))]
|
||||
fn deserialize_prune_mode_with_min_blocks<
|
||||
'de,
|
||||
const MIN_BLOCKS: u64,
|
||||
D: serde::Deserializer<'de>,
|
||||
>(
|
||||
deserializer: D,
|
||||
) -> Result<PruneMode, D::Error> {
|
||||
use serde::Deserialize;
|
||||
let prune_mode = PruneMode::deserialize(deserializer)?;
|
||||
serde_deserialize_validate::<MIN_BLOCKS, D>(&prune_mode)?;
|
||||
Ok(prune_mode)
|
||||
}
|
||||
|
||||
/// Deserializes [`Option<PruneMode>`] and validates that the value is not less than the const
|
||||
/// generic parameter `MIN_BLOCKS`. This parameter represents the number of blocks that needs to be
|
||||
/// left in database after the pruning.
|
||||
|
||||
@@ -546,6 +546,13 @@ where
|
||||
.transpose()?
|
||||
.flatten();
|
||||
|
||||
// Return error if toBlock exceeds current head
|
||||
if let Some(t) = to &&
|
||||
t > info.best_number
|
||||
{
|
||||
return Err(EthFilterError::BlockRangeExceedsHead);
|
||||
}
|
||||
|
||||
if let Some(f) = from &&
|
||||
f > info.best_number
|
||||
{
|
||||
@@ -894,6 +901,9 @@ pub enum EthFilterError {
|
||||
/// Invalid block range.
|
||||
#[error("invalid block range params")]
|
||||
InvalidBlockRangeParams,
|
||||
/// Block range extends beyond current head.
|
||||
#[error("block range extends beyond current head block")]
|
||||
BlockRangeExceedsHead,
|
||||
/// Query scope is too broad.
|
||||
#[error("query exceeds max block range {0}")]
|
||||
QueryExceedsMaxBlocks(u64),
|
||||
@@ -928,7 +938,8 @@ impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
|
||||
EthFilterError::EthAPIError(err) => err.into(),
|
||||
err @ (EthFilterError::InvalidBlockRangeParams |
|
||||
EthFilterError::QueryExceedsMaxBlocks(_) |
|
||||
EthFilterError::QueryExceedsMaxResults { .. }) => {
|
||||
EthFilterError::QueryExceedsMaxResults { .. } |
|
||||
EthFilterError::BlockRangeExceedsHead) => {
|
||||
rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -382,7 +382,7 @@ where
|
||||
|
||||
let mut all_traces = Vec::new();
|
||||
let mut block_traces = Vec::with_capacity(self.inner.eth_config.max_tracing_requests);
|
||||
for chunk_start in (start..end).step_by(self.inner.eth_config.max_tracing_requests) {
|
||||
for chunk_start in (start..=end).step_by(self.inner.eth_config.max_tracing_requests) {
|
||||
let chunk_end =
|
||||
std::cmp::min(chunk_start + self.inner.eth_config.max_tracing_requests as u64, end);
|
||||
|
||||
|
||||
@@ -19,6 +19,9 @@ pub struct StorageSettings {
|
||||
/// Whether this node always writes transaction senders to static files.
|
||||
#[serde(default)]
|
||||
pub transaction_senders_in_static_files: bool,
|
||||
/// Whether `StoragesHistory` is stored in `RocksDB`.
|
||||
#[serde(default)]
|
||||
pub storages_history_in_rocksdb: bool,
|
||||
}
|
||||
|
||||
impl StorageSettings {
|
||||
@@ -28,7 +31,11 @@ impl StorageSettings {
|
||||
/// `false`, ensuring older nodes continue writing receipts and transaction senders to the
|
||||
/// database when receipt pruning is enabled.
|
||||
pub const fn legacy() -> Self {
|
||||
Self { receipts_in_static_files: false, transaction_senders_in_static_files: false }
|
||||
Self {
|
||||
receipts_in_static_files: false,
|
||||
transaction_senders_in_static_files: false,
|
||||
storages_history_in_rocksdb: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the `receipts_in_static_files` flag to the provided value.
|
||||
@@ -42,4 +49,10 @@ impl StorageSettings {
|
||||
self.transaction_senders_in_static_files = value;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the `storages_history_in_rocksdb` flag to the provided value.
|
||||
pub const fn with_storages_history_in_rocksdb(mut self, value: bool) -> Self {
|
||||
self.storages_history_in_rocksdb = value;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -247,6 +247,20 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Validates a single transaction with the provided state provider.
|
||||
pub fn validate_one_with_state_provider(
|
||||
&self,
|
||||
origin: TransactionOrigin,
|
||||
transaction: Tx,
|
||||
state: impl AccountInfoReader,
|
||||
) -> TransactionValidationOutcome<Tx> {
|
||||
let tx = match self.validate_one_no_state(origin, transaction) {
|
||||
Ok(tx) => tx,
|
||||
Err(invalid_outcome) => return invalid_outcome,
|
||||
};
|
||||
self.validate_one_against_state(origin, tx, state)
|
||||
}
|
||||
|
||||
/// Performs stateless validation on single transaction. Returns unaltered input transaction
|
||||
/// if all checks pass, so transaction can continue through to stateful validation as argument
|
||||
/// to [`validate_one_against_state`](Self::validate_one_against_state).
|
||||
|
||||
@@ -126,7 +126,8 @@ impl ParallelProof {
|
||||
)))
|
||||
})?;
|
||||
|
||||
// Extract storage proof directly from the result
|
||||
// Extract storage proof directly from the result.
|
||||
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
|
||||
let storage_proof = match proof_msg.result? {
|
||||
crate::proof_task::ProofResult::StorageProof { hashed_address: addr, proof } => {
|
||||
debug_assert_eq!(
|
||||
@@ -134,7 +135,8 @@ impl ParallelProof {
|
||||
hashed_address,
|
||||
"storage worker must return same address: expected {hashed_address}, got {addr}"
|
||||
);
|
||||
proof
|
||||
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
|
||||
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
|
||||
}
|
||||
crate::proof_task::ProofResult::AccountMultiproof { .. } => {
|
||||
unreachable!("storage worker only sends StorageProof variant")
|
||||
@@ -223,8 +225,12 @@ impl ParallelProof {
|
||||
)
|
||||
})?;
|
||||
|
||||
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
|
||||
let (multiproof, stats) = match proof_result_msg.result? {
|
||||
crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => (proof, stats),
|
||||
crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => {
|
||||
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
|
||||
(Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone()), stats)
|
||||
}
|
||||
crate::proof_task::ProofResult::StorageProof { .. } => {
|
||||
unreachable!("account worker only sends AccountMultiproof variant")
|
||||
}
|
||||
|
||||
@@ -41,6 +41,7 @@ use alloy_primitives::{
|
||||
use alloy_rlp::{BufMut, Encodable};
|
||||
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use dashmap::DashMap;
|
||||
use metrics::Histogram;
|
||||
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind};
|
||||
use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
|
||||
use reth_storage_errors::db::DatabaseError;
|
||||
@@ -79,6 +80,275 @@ use crate::proof_task_metrics::{
|
||||
type StorageProofResult = Result<DecodedStorageMultiProof, ParallelStateRootError>;
|
||||
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;
|
||||
|
||||
/// Maximum number of storage proof jobs to batch together per account.
|
||||
const STORAGE_PROOF_BATCH_LIMIT: usize = 32;
|
||||
|
||||
/// Maximum number of blinded node requests to defer during storage proof batching.
|
||||
/// When this limit is reached, batching stops early to process deferred nodes,
|
||||
/// preventing starvation of blinded node requests under high proof load.
|
||||
const MAX_DEFERRED_BLINDED_NODES: usize = 16;
|
||||
|
||||
/// Holds batched storage proof jobs for the same account.
|
||||
///
|
||||
/// When multiple storage proof requests arrive for the same account, they can be merged
|
||||
/// into a single proof computation with combined prefix sets and target slots.
|
||||
#[derive(Debug)]
|
||||
struct BatchedStorageProof {
|
||||
/// The merged prefix set from all batched jobs.
|
||||
prefix_set: PrefixSetMut,
|
||||
/// The merged target slots from all batched jobs.
|
||||
target_slots: B256Set,
|
||||
/// Whether any job requested branch node masks.
|
||||
with_branch_node_masks: bool,
|
||||
/// The `multi_added_removed_keys` from the first job (they should all share the same `Arc`).
|
||||
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
|
||||
/// All senders that need to receive the result.
|
||||
senders: Vec<ProofResultContext>,
|
||||
}
|
||||
|
||||
impl BatchedStorageProof {
|
||||
/// Creates a new batch from the first storage proof input.
|
||||
fn new(input: StorageProofInput, sender: ProofResultContext) -> Self {
|
||||
// Convert frozen PrefixSet to mutable PrefixSetMut by collecting its keys.
|
||||
let prefix_set = PrefixSetMut::from(input.prefix_set.iter().copied());
|
||||
Self {
|
||||
prefix_set,
|
||||
target_slots: input.target_slots,
|
||||
with_branch_node_masks: input.with_branch_node_masks,
|
||||
multi_added_removed_keys: input.multi_added_removed_keys,
|
||||
senders: vec![sender],
|
||||
}
|
||||
}
|
||||
|
||||
/// Merges another storage proof job into this batch.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if `input.multi_added_removed_keys` does not point to the same Arc as the batch's.
|
||||
/// This is a critical invariant for proof correctness.
|
||||
fn merge(&mut self, input: StorageProofInput, sender: ProofResultContext) {
|
||||
// Validate that all batched jobs share the same multi_added_removed_keys Arc.
|
||||
// This is a critical invariant: if jobs have different keys, the merged proof
|
||||
// would be computed with only the first job's keys, producing incorrect results.
|
||||
// Using assert! (not debug_assert!) because incorrect proofs could cause consensus
|
||||
// failures.
|
||||
assert!(
|
||||
match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
|
||||
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
|
||||
(None, None) => true,
|
||||
_ => false,
|
||||
},
|
||||
"All batched storage proof jobs must share the same multi_added_removed_keys Arc"
|
||||
);
|
||||
|
||||
self.prefix_set.extend_keys(input.prefix_set.iter().copied());
|
||||
self.target_slots.extend(input.target_slots);
|
||||
self.with_branch_node_masks |= input.with_branch_node_masks;
|
||||
self.senders.push(sender);
|
||||
}
|
||||
|
||||
/// Converts this batch into a single `StorageProofInput` for computation.
|
||||
fn into_input(self, hashed_address: B256) -> (StorageProofInput, Vec<ProofResultContext>) {
|
||||
let input = StorageProofInput {
|
||||
hashed_address,
|
||||
prefix_set: self.prefix_set.freeze(),
|
||||
target_slots: self.target_slots,
|
||||
with_branch_node_masks: self.with_branch_node_masks,
|
||||
multi_added_removed_keys: self.multi_added_removed_keys,
|
||||
};
|
||||
(input, self.senders)
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for storage worker batching.
|
||||
#[derive(Clone, Default)]
|
||||
struct StorageWorkerBatchMetrics {
|
||||
/// Histogram of batch sizes (number of jobs merged per computation).
|
||||
#[cfg(feature = "metrics")]
|
||||
batch_size_histogram: Option<Histogram>,
|
||||
}
|
||||
|
||||
impl StorageWorkerBatchMetrics {
|
||||
#[cfg(feature = "metrics")]
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
batch_size_histogram: Some(metrics::histogram!(
|
||||
"trie.proof_task.storage_worker_batch_size"
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "metrics"))]
|
||||
fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
|
||||
fn record_batch_size(&self, _size: usize) {
|
||||
#[cfg(feature = "metrics")]
|
||||
if let Some(h) = &self.batch_size_histogram {
|
||||
h.record(_size as f64);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Maximum number of account multiproof jobs to batch together.
|
||||
const ACCOUNT_PROOF_BATCH_LIMIT: usize = 32;
|
||||
|
||||
/// Holds batched account multiproof jobs.
|
||||
///
|
||||
/// When multiple account multiproof requests arrive, they can be merged
|
||||
/// into a single proof computation with combined targets and prefix sets.
|
||||
#[derive(Debug)]
|
||||
struct BatchedAccountProof {
|
||||
/// The merged targets from all batched jobs.
|
||||
targets: MultiProofTargets,
|
||||
/// The merged account prefix set from all batched jobs.
|
||||
account_prefix_set: PrefixSetMut,
|
||||
/// The merged storage prefix sets from all batched jobs.
|
||||
storage_prefix_sets: B256Map<PrefixSetMut>,
|
||||
/// The merged destroyed accounts from all batched jobs.
|
||||
destroyed_accounts: B256Set,
|
||||
/// Whether any job requested branch node masks.
|
||||
collect_branch_node_masks: bool,
|
||||
/// The `multi_added_removed_keys` from the first job (they should all share the same `Arc`).
|
||||
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
|
||||
/// The shared `missed_leaves_storage_roots` cache from the first job.
|
||||
missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
|
||||
/// All senders that need to receive the result.
|
||||
senders: Vec<ProofResultContext>,
|
||||
}
|
||||
|
||||
impl BatchedAccountProof {
|
||||
/// Creates a new batch from the first account multiproof input.
|
||||
fn new(input: AccountMultiproofInput) -> Self {
|
||||
// Convert frozen prefix sets to mutable versions.
|
||||
let account_prefix_set =
|
||||
PrefixSetMut::from(input.prefix_sets.account_prefix_set.iter().copied());
|
||||
let storage_prefix_sets = input
|
||||
.prefix_sets
|
||||
.storage_prefix_sets
|
||||
.into_iter()
|
||||
.map(|(addr, ps)| (addr, PrefixSetMut::from(ps.iter().copied())))
|
||||
.collect();
|
||||
let destroyed_accounts = input.prefix_sets.destroyed_accounts;
|
||||
|
||||
Self {
|
||||
targets: input.targets,
|
||||
account_prefix_set,
|
||||
storage_prefix_sets,
|
||||
destroyed_accounts,
|
||||
collect_branch_node_masks: input.collect_branch_node_masks,
|
||||
multi_added_removed_keys: input.multi_added_removed_keys,
|
||||
missed_leaves_storage_roots: input.missed_leaves_storage_roots,
|
||||
senders: vec![input.proof_result_sender],
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to merge another account multiproof job into this batch.
|
||||
///
|
||||
/// Returns the job back if caches are incompatible so the caller can process it separately.
|
||||
fn try_merge(&mut self, input: AccountMultiproofInput) -> Result<(), AccountMultiproofInput> {
|
||||
// Require all jobs to share the same caches; otherwise merging would produce
|
||||
// incorrect proofs by reusing the wrong retained keys or missed-leaf storage roots.
|
||||
let multi_added_removed_keys_mismatch =
|
||||
!match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
|
||||
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
|
||||
(None, None) => true,
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if multi_added_removed_keys_mismatch ||
|
||||
!Arc::ptr_eq(&self.missed_leaves_storage_roots, &input.missed_leaves_storage_roots)
|
||||
{
|
||||
return Err(input);
|
||||
}
|
||||
|
||||
// Merge targets.
|
||||
self.targets.extend(input.targets);
|
||||
|
||||
// Merge account prefix set.
|
||||
self.account_prefix_set.extend_keys(input.prefix_sets.account_prefix_set.iter().copied());
|
||||
|
||||
// Merge storage prefix sets.
|
||||
for (addr, ps) in input.prefix_sets.storage_prefix_sets {
|
||||
match self.storage_prefix_sets.entry(addr) {
|
||||
alloy_primitives::map::Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().extend_keys(ps.iter().copied());
|
||||
}
|
||||
alloy_primitives::map::Entry::Vacant(entry) => {
|
||||
entry.insert(PrefixSetMut::from(ps.iter().copied()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Merge destroyed accounts.
|
||||
self.destroyed_accounts.extend(input.prefix_sets.destroyed_accounts);
|
||||
|
||||
// OR the branch node masks flag.
|
||||
self.collect_branch_node_masks |= input.collect_branch_node_masks;
|
||||
|
||||
// Collect the sender.
|
||||
self.senders.push(input.proof_result_sender);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Converts this batch into a single `AccountMultiproofInput` for computation.
|
||||
fn into_input(self) -> (AccountMultiproofInput, Vec<ProofResultContext>) {
|
||||
// Freeze the mutable prefix sets.
|
||||
let storage_prefix_sets: B256Map<PrefixSet> =
|
||||
self.storage_prefix_sets.into_iter().map(|(addr, ps)| (addr, ps.freeze())).collect();
|
||||
|
||||
let prefix_sets = TriePrefixSets {
|
||||
account_prefix_set: self.account_prefix_set.freeze(),
|
||||
storage_prefix_sets,
|
||||
destroyed_accounts: self.destroyed_accounts,
|
||||
};
|
||||
|
||||
// Use a dummy sender for the input since we'll handle all senders separately.
|
||||
let dummy_sender = self.senders.first().expect("batch always has at least one sender");
|
||||
let input = AccountMultiproofInput {
|
||||
targets: self.targets,
|
||||
prefix_sets,
|
||||
collect_branch_node_masks: self.collect_branch_node_masks,
|
||||
multi_added_removed_keys: self.multi_added_removed_keys,
|
||||
missed_leaves_storage_roots: self.missed_leaves_storage_roots,
|
||||
proof_result_sender: dummy_sender.clone(),
|
||||
};
|
||||
(input, self.senders)
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for account worker batching.
|
||||
#[derive(Clone, Default)]
|
||||
struct AccountWorkerBatchMetrics {
|
||||
/// Histogram of batch sizes (number of jobs merged per computation).
|
||||
#[cfg(feature = "metrics")]
|
||||
batch_size_histogram: Option<Histogram>,
|
||||
}
|
||||
|
||||
impl AccountWorkerBatchMetrics {
|
||||
#[cfg(feature = "metrics")]
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
batch_size_histogram: Some(metrics::histogram!(
|
||||
"trie.proof_task.account_worker_batch_size"
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "metrics"))]
|
||||
fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
|
||||
fn record_batch_size(&self, _size: usize) {
|
||||
#[cfg(feature = "metrics")]
|
||||
if let Some(h) = &self.batch_size_histogram {
|
||||
h.record(_size as f64);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A handle that provides type-safe access to proof worker pools.
|
||||
///
|
||||
/// The handle stores direct senders to both storage and account worker pools,
|
||||
@@ -552,12 +822,16 @@ impl TrieNodeProvider for ProofTaskTrieNodeProvider {
|
||||
}
|
||||
}
|
||||
/// Result of a proof calculation, which can be either an account multiproof or a storage proof.
|
||||
#[derive(Debug)]
|
||||
///
|
||||
/// The proof data is wrapped in `Arc` to enable efficient sharing when batching multiple
|
||||
/// proof requests. This avoids expensive cloning of the underlying proof structures
|
||||
/// when sending results to multiple receivers.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ProofResult {
|
||||
/// Account multiproof with statistics
|
||||
AccountMultiproof {
|
||||
/// The account multiproof
|
||||
proof: DecodedMultiProof,
|
||||
/// The account multiproof (Arc-wrapped for efficient sharing in batches)
|
||||
proof: Arc<DecodedMultiProof>,
|
||||
/// Statistics collected during proof computation
|
||||
stats: ParallelTrieStats,
|
||||
},
|
||||
@@ -565,8 +839,8 @@ pub enum ProofResult {
|
||||
StorageProof {
|
||||
/// The hashed address this storage proof belongs to
|
||||
hashed_address: B256,
|
||||
/// The storage multiproof
|
||||
proof: DecodedStorageMultiProof,
|
||||
/// The storage multiproof (Arc-wrapped for efficient sharing in batches)
|
||||
proof: Arc<DecodedStorageMultiProof>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -575,11 +849,17 @@ impl ProofResult {
|
||||
///
|
||||
/// For account multiproofs, returns the multiproof directly (discarding stats).
|
||||
/// For storage proofs, wraps the storage proof into a minimal multiproof.
|
||||
///
|
||||
/// Note: This method clones the inner proof data. If you need to avoid the clone
|
||||
/// when you're the sole owner, consider using `Arc::try_unwrap` first.
|
||||
pub fn into_multiproof(self) -> DecodedMultiProof {
|
||||
match self {
|
||||
Self::AccountMultiproof { proof, stats: _ } => proof,
|
||||
Self::AccountMultiproof { proof, stats: _ } => {
|
||||
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
|
||||
}
|
||||
Self::StorageProof { hashed_address, proof } => {
|
||||
DecodedMultiProof::from_storage_proof(hashed_address, proof)
|
||||
let storage_proof = Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone());
|
||||
DecodedMultiProof::from_storage_proof(hashed_address, storage_proof)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -708,11 +988,18 @@ where
|
||||
/// 2. Advertises availability
|
||||
/// 3. Processes jobs in a loop:
|
||||
/// - Receives job from channel
|
||||
/// - Drains additional same-account storage proof jobs (batching)
|
||||
/// - Marks worker as busy
|
||||
/// - Processes the job
|
||||
/// - Processes the batched jobs as a single proof computation
|
||||
/// - Marks worker as available
|
||||
/// 4. Shuts down when channel closes
|
||||
///
|
||||
/// # Batching Strategy
|
||||
///
|
||||
/// When multiple storage proof requests arrive for the same account, they are merged
|
||||
/// into a single proof computation. This reduces redundant trie traversals when state
|
||||
/// updates arrive faster than proof computation can process them.
|
||||
///
|
||||
/// # Panic Safety
|
||||
///
|
||||
/// If this function panics, the worker thread terminates but other workers
|
||||
@@ -732,6 +1019,7 @@ where
|
||||
// Create provider from factory
|
||||
let provider = task_ctx.factory.database_provider_ro()?;
|
||||
let proof_tx = ProofTaskTx::new(provider, worker_id);
|
||||
let batch_metrics = StorageWorkerBatchMetrics::new();
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
@@ -746,20 +1034,104 @@ where
|
||||
// Initially mark this worker as available.
|
||||
available_workers.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Deferred blinded node jobs to process after batched storage proofs.
|
||||
// Pre-allocate with capacity to avoid reallocations during batching.
|
||||
let mut deferred_blinded_nodes: Vec<(B256, Nibbles, Sender<TrieNodeProviderResult>)> =
|
||||
Vec::with_capacity(MAX_DEFERRED_BLINDED_NODES);
|
||||
|
||||
while let Ok(job) = work_rx.recv() {
|
||||
// Mark worker as busy.
|
||||
available_workers.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match job {
|
||||
StorageWorkerJob::StorageProof { input, proof_result_sender } => {
|
||||
Self::process_storage_proof(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
input,
|
||||
proof_result_sender,
|
||||
&mut storage_proofs_processed,
|
||||
&mut cursor_metrics_cache,
|
||||
// Start batching: group storage proofs by account.
|
||||
let mut batches: B256Map<BatchedStorageProof> = B256Map::default();
|
||||
batches.insert(
|
||||
input.hashed_address,
|
||||
BatchedStorageProof::new(input, proof_result_sender),
|
||||
);
|
||||
let mut total_jobs = 1usize;
|
||||
|
||||
// Drain additional jobs from the queue.
|
||||
while total_jobs < STORAGE_PROOF_BATCH_LIMIT {
|
||||
match work_rx.try_recv() {
|
||||
Ok(StorageWorkerJob::StorageProof {
|
||||
input: next_input,
|
||||
proof_result_sender: next_sender,
|
||||
}) => {
|
||||
total_jobs += 1;
|
||||
let addr = next_input.hashed_address;
|
||||
match batches.entry(addr) {
|
||||
alloy_primitives::map::Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().merge(next_input, next_sender);
|
||||
}
|
||||
alloy_primitives::map::Entry::Vacant(entry) => {
|
||||
entry.insert(BatchedStorageProof::new(
|
||||
next_input,
|
||||
next_sender,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(StorageWorkerJob::BlindedStorageNode {
|
||||
account,
|
||||
path,
|
||||
result_sender,
|
||||
}) => {
|
||||
// Defer blinded node jobs to process after batched proofs.
|
||||
deferred_blinded_nodes.push((account, path, result_sender));
|
||||
// Stop batching if too many blinded nodes are deferred to prevent
|
||||
// starvation.
|
||||
if deferred_blinded_nodes.len() >= MAX_DEFERRED_BLINDED_NODES {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
// Process all batched storage proofs.
|
||||
for (hashed_address, batch) in batches {
|
||||
let batch_size = batch.senders.len();
|
||||
batch_metrics.record_batch_size(batch_size);
|
||||
|
||||
let (merged_input, senders) = batch.into_input(hashed_address);
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
?hashed_address,
|
||||
batch_size,
|
||||
prefix_set_len = merged_input.prefix_set.len(),
|
||||
target_slots_len = merged_input.target_slots.len(),
|
||||
"Processing batched storage proof"
|
||||
);
|
||||
|
||||
Self::process_batched_storage_proof(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
hashed_address,
|
||||
merged_input,
|
||||
senders,
|
||||
&mut storage_proofs_processed,
|
||||
&mut cursor_metrics_cache,
|
||||
);
|
||||
}
|
||||
|
||||
// Process any deferred blinded node jobs.
|
||||
for (account, path, result_sender) in
|
||||
std::mem::take(&mut deferred_blinded_nodes)
|
||||
{
|
||||
Self::process_blinded_node(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
account,
|
||||
path,
|
||||
result_sender,
|
||||
&mut storage_nodes_processed,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
|
||||
@@ -795,82 +1167,103 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Processes a storage proof request.
|
||||
fn process_storage_proof<Provider>(
|
||||
/// Processes a batched storage proof request and sends results to all waiting receivers.
|
||||
///
|
||||
/// This computes a single storage proof with merged targets and sends the same result
|
||||
/// to all original requestors, reducing redundant trie traversals.
|
||||
fn process_batched_storage_proof<Provider>(
|
||||
worker_id: usize,
|
||||
proof_tx: &ProofTaskTx<Provider>,
|
||||
hashed_address: B256,
|
||||
input: StorageProofInput,
|
||||
proof_result_sender: ProofResultContext,
|
||||
senders: Vec<ProofResultContext>,
|
||||
storage_proofs_processed: &mut u64,
|
||||
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
|
||||
) where
|
||||
Provider: TrieCursorFactory + HashedCursorFactory,
|
||||
{
|
||||
let hashed_address = input.hashed_address;
|
||||
let ProofResultContext { sender, sequence_number: seq, state, start_time } =
|
||||
proof_result_sender;
|
||||
|
||||
let mut trie_cursor_metrics = TrieCursorMetricsCache::default();
|
||||
let mut hashed_cursor_metrics = HashedCursorMetricsCache::default();
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
hashed_address = ?hashed_address,
|
||||
prefix_set_len = input.prefix_set.len(),
|
||||
target_slots_len = input.target_slots.len(),
|
||||
"Processing storage proof"
|
||||
);
|
||||
|
||||
let proof_start = Instant::now();
|
||||
let result = proof_tx.compute_storage_proof(
|
||||
input,
|
||||
&mut trie_cursor_metrics,
|
||||
&mut hashed_cursor_metrics,
|
||||
);
|
||||
|
||||
let proof_elapsed = proof_start.elapsed();
|
||||
*storage_proofs_processed += 1;
|
||||
|
||||
let result_msg = result.map(|storage_proof| ProofResult::StorageProof {
|
||||
hashed_address,
|
||||
proof: storage_proof,
|
||||
});
|
||||
// Send the result to all waiting receivers.
|
||||
let num_senders = senders.len();
|
||||
match result {
|
||||
Ok(storage_proof) => {
|
||||
// Success case: wrap proof in Arc for efficient sharing across all senders.
|
||||
let proof_result =
|
||||
ProofResult::StorageProof { hashed_address, proof: Arc::new(storage_proof) };
|
||||
|
||||
if sender
|
||||
.send(ProofResultMessage {
|
||||
sequence_number: seq,
|
||||
result: result_msg,
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
hashed_address = ?hashed_address,
|
||||
storage_proofs_processed,
|
||||
"Proof result receiver dropped, discarding result"
|
||||
);
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*storage_proofs_processed += 1;
|
||||
|
||||
if sender
|
||||
.send(ProofResultMessage {
|
||||
sequence_number,
|
||||
result: Ok(proof_result.clone()),
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
?hashed_address,
|
||||
sequence_number,
|
||||
"Proof result receiver dropped, discarding result"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
// Error case: convert to string for cloning, then send to all receivers.
|
||||
let error_msg = error.to_string();
|
||||
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*storage_proofs_processed += 1;
|
||||
|
||||
if sender
|
||||
.send(ProofResultMessage {
|
||||
sequence_number,
|
||||
result: Err(ParallelStateRootError::Other(error_msg.clone())),
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
?hashed_address,
|
||||
sequence_number,
|
||||
"Proof result receiver dropped, discarding result"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
hashed_address = ?hashed_address,
|
||||
?hashed_address,
|
||||
proof_time_us = proof_elapsed.as_micros(),
|
||||
total_processed = storage_proofs_processed,
|
||||
num_senders,
|
||||
trie_cursor_duration_us = trie_cursor_metrics.total_duration.as_micros(),
|
||||
hashed_cursor_duration_us = hashed_cursor_metrics.total_duration.as_micros(),
|
||||
?trie_cursor_metrics,
|
||||
?hashed_cursor_metrics,
|
||||
"Storage proof completed"
|
||||
"Batched storage proof completed"
|
||||
);
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
{
|
||||
// Accumulate per-proof metrics into the worker's cache
|
||||
let per_proof_cache = ProofTaskCursorMetricsCache {
|
||||
account_trie_cursor: TrieCursorMetricsCache::default(),
|
||||
account_hashed_cursor: HashedCursorMetricsCache::default(),
|
||||
@@ -987,11 +1380,18 @@ where
|
||||
/// 2. Advertises availability
|
||||
/// 3. Processes jobs in a loop:
|
||||
/// - Receives job from channel
|
||||
/// - Drains additional account multiproof jobs (batching)
|
||||
/// - Marks worker as busy
|
||||
/// - Processes the job
|
||||
/// - Processes the batched jobs as a single proof computation
|
||||
/// - Marks worker as available
|
||||
/// 4. Shuts down when channel closes
|
||||
///
|
||||
/// # Batching Strategy
|
||||
///
|
||||
/// When multiple account multiproof requests arrive, they are merged into
|
||||
/// a single proof computation. This reduces redundant trie traversals when
|
||||
/// state updates arrive faster than proof computation can process them.
|
||||
///
|
||||
/// # Panic Safety
|
||||
///
|
||||
/// If this function panics, the worker thread terminates but other workers
|
||||
@@ -1012,6 +1412,7 @@ where
|
||||
// Create provider from factory
|
||||
let provider = task_ctx.factory.database_provider_ro()?;
|
||||
let proof_tx = ProofTaskTx::new(provider, worker_id);
|
||||
let batch_metrics = AccountWorkerBatchMetrics::new();
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
@@ -1026,20 +1427,98 @@ where
|
||||
// Count this worker as available only after successful initialization.
|
||||
available_workers.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Deferred blinded node jobs to process after batched account proofs.
|
||||
// Pre-allocate with capacity to avoid reallocations during batching.
|
||||
let mut deferred_blinded_nodes: Vec<(Nibbles, Sender<TrieNodeProviderResult>)> =
|
||||
Vec::with_capacity(MAX_DEFERRED_BLINDED_NODES);
|
||||
|
||||
while let Ok(job) = work_rx.recv() {
|
||||
// Mark worker as busy.
|
||||
available_workers.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match job {
|
||||
AccountWorkerJob::AccountMultiproof { input } => {
|
||||
Self::process_account_multiproof(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
storage_work_tx.clone(),
|
||||
*input,
|
||||
&mut account_proofs_processed,
|
||||
&mut cursor_metrics_cache,
|
||||
);
|
||||
// Start batching: accumulate account multiproof jobs. If we encounter an
|
||||
// incompatible job (different caches), process it as a separate batch.
|
||||
let mut next_account_job: Option<Box<AccountMultiproofInput>> = Some(input);
|
||||
|
||||
while let Some(account_job) = next_account_job.take() {
|
||||
let mut batch = BatchedAccountProof::new(*account_job);
|
||||
let mut pending_incompatible: Option<Box<AccountMultiproofInput>> = None;
|
||||
|
||||
// Drain additional jobs from the queue.
|
||||
while batch.senders.len() < ACCOUNT_PROOF_BATCH_LIMIT {
|
||||
match work_rx.try_recv() {
|
||||
Ok(AccountWorkerJob::AccountMultiproof { input: next_input }) => {
|
||||
match batch.try_merge(*next_input) {
|
||||
Ok(()) => {}
|
||||
Err(incompatible) => {
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
"Account multiproof batch split due to incompatible caches"
|
||||
);
|
||||
pending_incompatible = Some(Box::new(incompatible));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(AccountWorkerJob::BlindedAccountNode {
|
||||
path,
|
||||
result_sender,
|
||||
}) => {
|
||||
// Defer blinded node jobs to process after batched proofs.
|
||||
deferred_blinded_nodes.push((path, result_sender));
|
||||
// Stop batching if too many blinded nodes are deferred to
|
||||
// prevent starvation.
|
||||
if deferred_blinded_nodes.len() >= MAX_DEFERRED_BLINDED_NODES {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
let batch_size = batch.senders.len();
|
||||
batch_metrics.record_batch_size(batch_size);
|
||||
|
||||
let (merged_input, senders) = batch.into_input();
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
batch_size,
|
||||
targets_len = merged_input.targets.len(),
|
||||
"Processing batched account multiproof"
|
||||
);
|
||||
|
||||
Self::process_batched_account_multiproof(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
&storage_work_tx,
|
||||
merged_input,
|
||||
senders,
|
||||
&mut account_proofs_processed,
|
||||
&mut cursor_metrics_cache,
|
||||
);
|
||||
|
||||
// If we encountered an incompatible job, process it as its own batch
|
||||
// before handling any deferred blinded node requests.
|
||||
if let Some(incompatible_job) = pending_incompatible {
|
||||
next_account_job = Some(incompatible_job);
|
||||
}
|
||||
}
|
||||
|
||||
// Process any deferred blinded node jobs.
|
||||
for (path, result_sender) in std::mem::take(&mut deferred_blinded_nodes) {
|
||||
Self::process_blinded_node(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
path,
|
||||
result_sender,
|
||||
&mut account_nodes_processed,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
|
||||
@@ -1074,12 +1553,16 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Processes an account multiproof request.
|
||||
fn process_account_multiproof<Provider>(
|
||||
/// Processes a batched account multiproof request and sends results to all waiting receivers.
|
||||
///
|
||||
/// This computes a single account multiproof with merged targets and sends the same result
|
||||
/// to all original requestors, reducing redundant trie traversals.
|
||||
fn process_batched_account_multiproof<Provider>(
|
||||
worker_id: usize,
|
||||
proof_tx: &ProofTaskTx<Provider>,
|
||||
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
|
||||
storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
|
||||
input: AccountMultiproofInput,
|
||||
senders: Vec<ProofResultContext>,
|
||||
account_proofs_processed: &mut u64,
|
||||
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
|
||||
) where
|
||||
@@ -1091,21 +1574,21 @@ where
|
||||
collect_branch_node_masks,
|
||||
multi_added_removed_keys,
|
||||
missed_leaves_storage_roots,
|
||||
proof_result_sender:
|
||||
ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start },
|
||||
proof_result_sender: _, // We use the senders vec instead
|
||||
} = input;
|
||||
|
||||
let span = debug_span!(
|
||||
target: "trie::proof_task",
|
||||
"Account multiproof calculation",
|
||||
"Batched account multiproof calculation",
|
||||
targets = targets.len(),
|
||||
batch_size = senders.len(),
|
||||
worker_id,
|
||||
);
|
||||
let _span_guard = span.enter();
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
"Processing account multiproof"
|
||||
"Processing batched account multiproof"
|
||||
);
|
||||
|
||||
let proof_start = Instant::now();
|
||||
@@ -1120,7 +1603,7 @@ where
|
||||
tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);
|
||||
|
||||
let storage_proof_receivers = match dispatch_storage_proofs(
|
||||
&storage_work_tx,
|
||||
storage_work_tx,
|
||||
&targets,
|
||||
&mut storage_prefix_sets,
|
||||
collect_branch_node_masks,
|
||||
@@ -1128,14 +1611,17 @@ where
|
||||
) {
|
||||
Ok(receivers) => receivers,
|
||||
Err(error) => {
|
||||
// Send error through result channel
|
||||
error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}");
|
||||
let _ = result_tx.send(ProofResultMessage {
|
||||
sequence_number: seq,
|
||||
result: Err(error),
|
||||
elapsed: start.elapsed(),
|
||||
state,
|
||||
});
|
||||
// Send error to all receivers
|
||||
let error_msg = error.to_string();
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*account_proofs_processed += 1;
|
||||
let _ = sender.send(ProofResultMessage {
|
||||
sequence_number,
|
||||
result: Err(ParallelStateRootError::Other(error_msg.clone())),
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -1156,46 +1642,75 @@ where
|
||||
build_account_multiproof_with_storage_roots(&proof_tx.provider, ctx, &mut tracker);
|
||||
|
||||
let proof_elapsed = proof_start.elapsed();
|
||||
let total_elapsed = start.elapsed();
|
||||
let proof_cursor_metrics = tracker.cursor_metrics;
|
||||
proof_cursor_metrics.record_spans();
|
||||
|
||||
let stats = tracker.finish();
|
||||
let result = result.map(|proof| ProofResult::AccountMultiproof { proof, stats });
|
||||
*account_proofs_processed += 1;
|
||||
|
||||
// Send result to MultiProofTask
|
||||
if result_tx
|
||||
.send(ProofResultMessage {
|
||||
sequence_number: seq,
|
||||
result,
|
||||
elapsed: total_elapsed,
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
account_proofs_processed,
|
||||
"Account multiproof receiver dropped, discarding result"
|
||||
);
|
||||
// Send the result to all waiting receivers.
|
||||
let num_senders = senders.len();
|
||||
match result {
|
||||
Ok(proof) => {
|
||||
// Success case: wrap proof in Arc for efficient sharing across all senders.
|
||||
let proof_result = ProofResult::AccountMultiproof { proof: Arc::new(proof), stats };
|
||||
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*account_proofs_processed += 1;
|
||||
|
||||
if sender
|
||||
.send(ProofResultMessage {
|
||||
sequence_number,
|
||||
result: Ok(proof_result.clone()),
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
sequence_number,
|
||||
"Account multiproof receiver dropped, discarding result"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
// Error case: convert to string for cloning, then send to all receivers.
|
||||
let error_msg = error.to_string();
|
||||
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*account_proofs_processed += 1;
|
||||
|
||||
if sender
|
||||
.send(ProofResultMessage {
|
||||
sequence_number,
|
||||
result: Err(ParallelStateRootError::Other(error_msg.clone())),
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
sequence_number,
|
||||
"Account multiproof receiver dropped, discarding result"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
proof_time_us = proof_elapsed.as_micros(),
|
||||
total_elapsed_us = total_elapsed.as_micros(),
|
||||
total_processed = account_proofs_processed,
|
||||
num_senders,
|
||||
account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
|
||||
account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
|
||||
storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
|
||||
storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
|
||||
account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
|
||||
account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
|
||||
storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
|
||||
storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
|
||||
"Account multiproof completed"
|
||||
"Batched account multiproof completed"
|
||||
);
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
@@ -1338,7 +1853,9 @@ where
|
||||
|
||||
drop(_guard);
|
||||
|
||||
// Extract storage proof from the result
|
||||
// Extract storage proof from the result.
|
||||
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it
|
||||
// here.
|
||||
let proof = match proof_msg.result? {
|
||||
ProofResult::StorageProof { hashed_address: addr, proof } => {
|
||||
debug_assert_eq!(
|
||||
@@ -1346,7 +1863,9 @@ where
|
||||
hashed_address,
|
||||
"storage worker must return same address: expected {hashed_address}, got {addr}"
|
||||
);
|
||||
proof
|
||||
// Efficiently unwrap Arc: returns inner value if sole owner, clones
|
||||
// otherwise.
|
||||
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
|
||||
}
|
||||
ProofResult::AccountMultiproof { .. } => {
|
||||
unreachable!("storage worker only sends StorageProof variant")
|
||||
@@ -1409,8 +1928,11 @@ where
|
||||
// Consume remaining storage proof receivers for accounts not encountered during trie walk.
|
||||
for (hashed_address, receiver) in storage_proof_receivers {
|
||||
if let Ok(proof_msg) = receiver.recv() {
|
||||
// Extract storage proof from the result
|
||||
// Extract storage proof from the result.
|
||||
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
|
||||
if let Ok(ProofResult::StorageProof { proof, .. }) = proof_msg.result {
|
||||
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
|
||||
let proof = Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone());
|
||||
collected_decoded_storages.insert(hashed_address, proof);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ use reth_op::{
|
||||
primitives::SignedTransaction,
|
||||
};
|
||||
use reth_rpc_api::eth::helpers::pending_block::BuildPendingEnv;
|
||||
use revm_primitives::Bytes;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -126,13 +127,15 @@ impl ConfigureEngineEvm<CustomExecutionData> for CustomEvmConfig {
|
||||
&self,
|
||||
payload: &CustomExecutionData,
|
||||
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
|
||||
Ok(payload.inner.payload.transactions().clone().into_iter().map(|encoded| {
|
||||
let transactions = payload.inner.payload.transactions().clone().into_iter();
|
||||
let convert = |encoded: Bytes| {
|
||||
let tx = CustomTransaction::decode_2718_exact(encoded.as_ref())
|
||||
.map_err(Into::into)
|
||||
.map_err(PayloadError::Decode)?;
|
||||
let signer = tx.try_recover().map_err(NewPayloadError::other)?;
|
||||
Ok::<_, NewPayloadError>(WithEncoded::new(encoded, tx.with_signer(signer)))
|
||||
}))
|
||||
};
|
||||
Ok((transactions, convert))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user