mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
134 Commits
matt/rm-lo
...
klkvr/spar
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1549e93eac | ||
|
|
48e1270dec | ||
|
|
821974a6a6 | ||
|
|
6183adb2db | ||
|
|
1b0c54a0a9 | ||
|
|
3aaa8daefc | ||
|
|
bb97b80116 | ||
|
|
81d1aa1eb4 | ||
|
|
d6d1a090e5 | ||
|
|
b52700dd95 | ||
|
|
65201a1abf | ||
|
|
b7cb06b88f | ||
|
|
5b86a17331 | ||
|
|
914ab7d5e6 | ||
|
|
7e38827df5 | ||
|
|
dd2c5b7c53 | ||
|
|
ed528fb975 | ||
|
|
25a97f0be4 | ||
|
|
844e531a3f | ||
|
|
ef860c20fb | ||
|
|
755d1e5f61 | ||
|
|
7d1bd7c9f8 | ||
|
|
93c2455cef | ||
|
|
6c661eb868 | ||
|
|
f90b5c8a7f | ||
|
|
c8b84404ae | ||
|
|
b722a7f360 | ||
|
|
d4fa6806b7 | ||
|
|
63742ab4ae | ||
|
|
f0eb2aad7c | ||
|
|
ea6da11cf4 | ||
|
|
20029d3c1a | ||
|
|
40eb3bb7b4 | ||
|
|
8875c8da25 | ||
|
|
d87b48c9fc | ||
|
|
08122bc1ea | ||
|
|
95778a0cc1 | ||
|
|
17359dadd9 | ||
|
|
83afaf1aa7 | ||
|
|
d72300c685 | ||
|
|
faf64c712e | ||
|
|
b3d532ce9d | ||
|
|
9d064be77e | ||
|
|
bab01a7bba | ||
|
|
82960045c9 | ||
|
|
513fca16e9 | ||
|
|
ad242a2002 | ||
|
|
e3c256340e | ||
|
|
840d6066a6 | ||
|
|
1f315b4f3d | ||
|
|
fa604aa3dc | ||
|
|
d0df549ddb | ||
|
|
4b851acfdb | ||
|
|
7ccb43ea13 | ||
|
|
785933a8ef | ||
|
|
20f48b1e50 | ||
|
|
eb5455de68 | ||
|
|
cb9bf1fe3a | ||
|
|
99973c7781 | ||
|
|
9d89d4cb5e | ||
|
|
0470c65e6c | ||
|
|
f07b37f029 | ||
|
|
10b87c8a2e | ||
|
|
9de1f0905e | ||
|
|
095d021969 | ||
|
|
0c192ef514 | ||
|
|
946c74a538 | ||
|
|
327a1a6681 | ||
|
|
2455fa4ff5 | ||
|
|
0585dc1180 | ||
|
|
8b985c102a | ||
|
|
6fefbfc65f | ||
|
|
2012602909 | ||
|
|
c84b3475a4 | ||
|
|
f3107565f3 | ||
|
|
ac6a0fa372 | ||
|
|
5899cd4188 | ||
|
|
a3b76591b7 | ||
|
|
b8f27b73ad | ||
|
|
7ec5ff6483 | ||
|
|
f98af4ad9f | ||
|
|
ab84d61f91 | ||
|
|
8c1724af13 | ||
|
|
d8e912f66b | ||
|
|
3f6354c2a4 | ||
|
|
ba64eb5fc7 | ||
|
|
67a7a1c2d1 | ||
|
|
0572c4e0ca | ||
|
|
1b2935763b | ||
|
|
2b1833576b | ||
|
|
5592c362d4 | ||
|
|
cf3eef1874 | ||
|
|
cc3f3d7062 | ||
|
|
6beec25f43 | ||
|
|
92ce4f3af0 | ||
|
|
19bf580f93 | ||
|
|
28e5911482 | ||
|
|
257ccd41e9 | ||
|
|
796ba6d5dc | ||
|
|
a53ef64a54 | ||
|
|
5307dfc22b | ||
|
|
f380ed1581 | ||
|
|
f7313c755c | ||
|
|
3bc2191590 | ||
|
|
320f2a6015 | ||
|
|
74f9e386ed | ||
|
|
70bfdafd26 | ||
|
|
45ff349b00 | ||
|
|
d057c4e9e6 | ||
|
|
405b40f02f | ||
|
|
868fffe0dd | ||
|
|
b288c4e259 | ||
|
|
e9fe0283a9 | ||
|
|
4e26a6edc9 | ||
|
|
92b8857625 | ||
|
|
2d71243cf6 | ||
|
|
732bf712aa | ||
|
|
0901c2ca8b | ||
|
|
2352158b3d | ||
|
|
26f788d5ca | ||
|
|
9433ac5666 | ||
|
|
1a98605ce6 | ||
|
|
8d37f76d23 | ||
|
|
2d9cf4c989 | ||
|
|
f5ca71d2fb | ||
|
|
8d58c98034 | ||
|
|
50e0591540 | ||
|
|
013dfdf8c8 | ||
|
|
0796a20f24 | ||
|
|
2633d3e513 | ||
|
|
1f8fb7e58f | ||
|
|
effa0ab4c7 | ||
|
|
543a85e9f3 | ||
|
|
88eb0beeb2 |
@@ -38,6 +38,6 @@ for pid in "${saving_pids[@]}"; do
|
||||
done
|
||||
|
||||
# Make sure we don't rebuild images on the CI jobs
|
||||
git apply ../.github/assets/hive/no_sim_build.diff
|
||||
git apply ../.github/scripts/hive/no_sim_build.diff
|
||||
go build .
|
||||
mv ./hive ../hive_assets/
|
||||
10
.github/workflows/hive.yml
vendored
10
.github/workflows/hive.yml
vendored
@@ -58,11 +58,11 @@ jobs:
|
||||
uses: actions/cache@v5
|
||||
with:
|
||||
path: ./hive_assets
|
||||
key: hive-assets-${{ steps.hive-commit.outputs.hash }}-${{ hashFiles('.github/assets/hive/build_simulators.sh') }}
|
||||
key: hive-assets-${{ steps.hive-commit.outputs.hash }}-${{ hashFiles('.github/scripts/hive/build_simulators.sh') }}
|
||||
|
||||
- name: Build hive assets
|
||||
if: steps.cache-hive.outputs.cache-hit != 'true'
|
||||
run: .github/assets/hive/build_simulators.sh
|
||||
run: .github/scripts/hive/build_simulators.sh
|
||||
|
||||
- name: Load cached Docker images
|
||||
if: steps.cache-hive.outputs.cache-hit == 'true'
|
||||
@@ -213,7 +213,7 @@ jobs:
|
||||
path: /tmp
|
||||
|
||||
- name: Load Docker images
|
||||
run: .github/assets/hive/load_images.sh
|
||||
run: .github/scripts/hive/load_images.sh
|
||||
|
||||
- name: Move hive binary
|
||||
run: |
|
||||
@@ -241,11 +241,11 @@ jobs:
|
||||
FILTER="/"
|
||||
fi
|
||||
echo "filter: $FILTER"
|
||||
.github/assets/hive/run_simulator.sh "${{ matrix.scenario.sim }}" "$FILTER"
|
||||
.github/scripts/hive/run_simulator.sh "${{ matrix.scenario.sim }}" "$FILTER"
|
||||
|
||||
- name: Parse hive output
|
||||
run: |
|
||||
find hivetests/workspace/logs -type f -name "*.json" ! -name "hive.json" | xargs -I {} python .github/assets/hive/parse.py {} --exclusion .github/assets/hive/expected_failures.yaml --ignored .github/assets/hive/ignored_tests.yaml
|
||||
find hivetests/workspace/logs -type f -name "*.json" ! -name "hive.json" | xargs -I {} python .github/scripts/hive/parse.py {} --exclusion .github/scripts/hive/expected_failures.yaml --ignored .github/scripts/hive/ignored_tests.yaml
|
||||
|
||||
- name: Print simulator output
|
||||
if: ${{ failure() }}
|
||||
|
||||
10
.github/workflows/integration.yml
vendored
10
.github/workflows/integration.yml
vendored
@@ -22,7 +22,7 @@ concurrency:
|
||||
|
||||
jobs:
|
||||
test:
|
||||
name: test / ${{ matrix.network }}
|
||||
name: test / ${{ matrix.network }} / ${{ matrix.storage }}
|
||||
if: github.event_name != 'schedule'
|
||||
runs-on: depot-ubuntu-latest-4
|
||||
env:
|
||||
@@ -30,13 +30,17 @@ jobs:
|
||||
strategy:
|
||||
matrix:
|
||||
network: ["ethereum", "optimism"]
|
||||
storage: ["stable", "edge"]
|
||||
exclude:
|
||||
- network: optimism
|
||||
storage: edge
|
||||
timeout-minutes: 60
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- uses: rui314/setup-mold@v1
|
||||
- uses: dtolnay/rust-toolchain@stable
|
||||
- name: Install Geth
|
||||
run: .github/assets/install_geth.sh
|
||||
run: .github/scripts/install_geth.sh
|
||||
- uses: taiki-e/install-action@nextest
|
||||
- uses: mozilla-actions/sccache-action@v0.0.9
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
@@ -46,7 +50,7 @@ jobs:
|
||||
name: Run tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--locked --features "asm-keccak ${{ matrix.network }}" \
|
||||
--locked --features "asm-keccak ${{ matrix.network }} ${{ matrix.storage == 'edge' && 'edge' || '' }}" \
|
||||
--workspace --exclude ef-tests \
|
||||
-E "kind(test) and not binary(e2e_testsuite)"
|
||||
- if: matrix.network == 'optimism'
|
||||
|
||||
2
.github/workflows/label-pr.yml
vendored
2
.github/workflows/label-pr.yml
vendored
@@ -19,5 +19,5 @@ jobs:
|
||||
uses: actions/github-script@v8
|
||||
with:
|
||||
script: |
|
||||
const label_pr = require('./.github/assets/label_pr.js')
|
||||
const label_pr = require('./.github/scripts/label_pr.js')
|
||||
await label_pr({github, context})
|
||||
|
||||
4
.github/workflows/lint.yml
vendored
4
.github/workflows/lint.yml
vendored
@@ -76,7 +76,7 @@ jobs:
|
||||
- name: Run Wasm checks
|
||||
run: |
|
||||
sudo apt update && sudo apt install gcc-multilib
|
||||
.github/assets/check_wasm.sh
|
||||
.github/scripts/check_wasm.sh
|
||||
|
||||
riscv:
|
||||
runs-on: depot-ubuntu-latest
|
||||
@@ -94,7 +94,7 @@ jobs:
|
||||
cache-on-failure: true
|
||||
- uses: dcarbone/install-jq-action@v3
|
||||
- name: Run RISC-V checks
|
||||
run: .github/assets/check_rv32imac.sh
|
||||
run: .github/scripts/check_rv32imac.sh
|
||||
|
||||
crate-checks:
|
||||
name: crate-checks (${{ matrix.partition }}/${{ matrix.total_partitions }})
|
||||
|
||||
2
.github/workflows/prepare-reth.yml
vendored
2
.github/workflows/prepare-reth.yml
vendored
@@ -43,7 +43,7 @@ jobs:
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: .
|
||||
file: .github/assets/hive/Dockerfile
|
||||
file: .github/scripts/hive/Dockerfile
|
||||
tags: ${{ inputs.image_tag }}
|
||||
outputs: type=docker,dest=./artifacts/reth_image.tar
|
||||
build-args: |
|
||||
|
||||
157
Cargo.lock
generated
157
Cargo.lock
generated
@@ -186,9 +186,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-dyn-abi"
|
||||
version = "1.5.2"
|
||||
version = "1.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "369f5707b958927176265e8a58627fc6195e5dfa5c55689396e68b241b3a72e6"
|
||||
checksum = "14ff5ee5f27aa305bda825c735f686ad71bb65508158f059f513895abe69b8c3"
|
||||
dependencies = [
|
||||
"alloy-json-abi",
|
||||
"alloy-primitives",
|
||||
@@ -340,9 +340,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-json-abi"
|
||||
version = "1.5.2"
|
||||
version = "1.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "84e3cf01219c966f95a460c95f1d4c30e12f6c18150c21a30b768af2a2a29142"
|
||||
checksum = "8708475665cc00e081c085886e68eada2f64cfa08fc668213a9231655093d4de"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"alloy-sol-type-parser",
|
||||
@@ -437,9 +437,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-primitives"
|
||||
version = "1.5.2"
|
||||
version = "1.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f6a0fb18dd5fb43ec5f0f6a20be1ce0287c79825827de5744afaa6c957737c33"
|
||||
checksum = "3b88cf92ed20685979ed1d8472422f0c6c2d010cec77caf63aaa7669cc1a7bc2"
|
||||
dependencies = [
|
||||
"alloy-rlp",
|
||||
"arbitrary",
|
||||
@@ -464,7 +464,6 @@ dependencies = [
|
||||
"rustc-hash",
|
||||
"serde",
|
||||
"sha3",
|
||||
"tiny-keccak",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -794,9 +793,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-sol-macro"
|
||||
version = "1.5.2"
|
||||
version = "1.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09eb18ce0df92b4277291bbaa0ed70545d78b02948df756bbd3d6214bf39a218"
|
||||
checksum = "f5fa1ca7e617c634d2bd9fa71f9ec8e47c07106e248b9fcbd3eaddc13cabd625"
|
||||
dependencies = [
|
||||
"alloy-sol-macro-expander",
|
||||
"alloy-sol-macro-input",
|
||||
@@ -808,9 +807,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-sol-macro-expander"
|
||||
version = "1.5.2"
|
||||
version = "1.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "95d9fa2daf21f59aa546d549943f10b5cce1ae59986774019fbedae834ffe01b"
|
||||
checksum = "27c00c0c3a75150a9dc7c8c679ca21853a137888b4e1c5569f92d7e2b15b5102"
|
||||
dependencies = [
|
||||
"alloy-sol-macro-input",
|
||||
"const-hex",
|
||||
@@ -819,16 +818,16 @@ dependencies = [
|
||||
"proc-macro-error2",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"sha3",
|
||||
"syn 2.0.114",
|
||||
"syn-solidity",
|
||||
"tiny-keccak",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "alloy-sol-macro-input"
|
||||
version = "1.5.2"
|
||||
version = "1.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9396007fe69c26ee118a19f4dee1f5d1d6be186ea75b3881adf16d87f8444686"
|
||||
checksum = "297db260eb4d67c105f68d6ba11b8874eec681caec5505eab8fbebee97f790bc"
|
||||
dependencies = [
|
||||
"const-hex",
|
||||
"dunce",
|
||||
@@ -842,9 +841,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-sol-type-parser"
|
||||
version = "1.5.2"
|
||||
version = "1.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "af67a0b0dcebe14244fc92002cd8d96ecbf65db4639d479f5fcd5805755a4c27"
|
||||
checksum = "94b91b13181d3bcd23680fd29d7bc861d1f33fbe90fdd0af67162434aeba902d"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"winnow",
|
||||
@@ -852,9 +851,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-sol-types"
|
||||
version = "1.5.2"
|
||||
version = "1.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09aeea64f09a7483bdcd4193634c7e5cf9fd7775ee767585270cd8ce2d69dc95"
|
||||
checksum = "fc442cc2a75207b708d481314098a0f8b6f7b58e3148dd8d8cc7407b0d6f9385"
|
||||
dependencies = [
|
||||
"alloy-json-abi",
|
||||
"alloy-primitives",
|
||||
@@ -1040,6 +1039,15 @@ version = "1.0.100"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61"
|
||||
|
||||
[[package]]
|
||||
name = "approx"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cab112f0a86d568ea0e627cc1d6be74a1e9cd55214684db5561995f6dad897c6"
|
||||
dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aquamarine"
|
||||
version = "0.6.0"
|
||||
@@ -2209,9 +2217,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "4.5.54"
|
||||
version = "4.5.55"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c6e6ff9dcd79cff5cd969a17a545d79e84ab086e444102a591e288a8aa3ce394"
|
||||
checksum = "3e34525d5bbbd55da2bb745d34b36121baac88d07619a9a09cfcf4a6c0832785"
|
||||
dependencies = [
|
||||
"clap_builder",
|
||||
"clap_derive",
|
||||
@@ -2219,9 +2227,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clap_builder"
|
||||
version = "4.5.54"
|
||||
version = "4.5.55"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fa42cf4d2b7a41bc8f663a7cab4031ebafa1bf3875705bfaf8466dc60ab52c00"
|
||||
checksum = "59a20016a20a3da95bef50ec7238dbd09baeef4311dcdd38ec15aba69812fb61"
|
||||
dependencies = [
|
||||
"anstream",
|
||||
"anstyle",
|
||||
@@ -2231,9 +2239,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clap_derive"
|
||||
version = "4.5.49"
|
||||
version = "4.5.55"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2a0b5487afeab2deb2ff4e03a807ad1a03ac532ff5a2cee5d86884440c7f7671"
|
||||
checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"proc-macro2",
|
||||
@@ -2258,35 +2266,42 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "codspeed"
|
||||
version = "2.10.1"
|
||||
version = "4.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "93f4cce9c27c49c4f101fffeebb1826f41a9df2e7498b7cd4d95c0658b796c6c"
|
||||
checksum = "38c2eb3388ebe26b5a0ab6bf4969d9c4840143d7f6df07caa3cc851b0606cef6"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"cc",
|
||||
"colored",
|
||||
"getrandom 0.2.17",
|
||||
"glob",
|
||||
"libc",
|
||||
"nix 0.30.1",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"uuid",
|
||||
"statrs",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codspeed-criterion-compat"
|
||||
version = "2.10.1"
|
||||
version = "4.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c3c23d880a28a2aab52d38ca8481dd7a3187157d0a952196b6db1db3c8499725"
|
||||
checksum = "e1e270597a1d1e183f86d1cc9f94f0133654ee3daf201c17903ee29363555dd7"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"codspeed",
|
||||
"codspeed-criterion-compat-walltime",
|
||||
"colored",
|
||||
"futures",
|
||||
"regex",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codspeed-criterion-compat-walltime"
|
||||
version = "2.10.1"
|
||||
version = "4.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7b0a2f7365e347f4f22a67e9ea689bf7bc89900a354e22e26cf8a531a42c8fbb"
|
||||
checksum = "e6c2613d2fac930fe34456be76f9124ee0800bb9db2e7fd2d6c65b9ebe98a292"
|
||||
dependencies = [
|
||||
"anes",
|
||||
"cast",
|
||||
@@ -3590,7 +3605,6 @@ version = "0.0.0"
|
||||
dependencies = [
|
||||
"alloy-eips",
|
||||
"alloy-evm",
|
||||
"alloy-sol-macro",
|
||||
"alloy-sol-types",
|
||||
"eyre",
|
||||
"reth-ethereum",
|
||||
@@ -4819,9 +4833,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "iana-time-zone"
|
||||
version = "0.1.64"
|
||||
version = "0.1.65"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "33e57f83510bb73707521ebaffa789ec8caf86f9657cad665b092b581d40e9fb"
|
||||
checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470"
|
||||
dependencies = [
|
||||
"android_system_properties",
|
||||
"core-foundation-sys",
|
||||
@@ -5489,9 +5503,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "keccak-asm"
|
||||
version = "0.1.4"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "505d1856a39b200489082f90d897c3f07c455563880bc5952e38eabf731c83b6"
|
||||
checksum = "b646a74e746cd25045aa0fd42f4f7f78aa6d119380182c7e63a5593c4ab8df6f"
|
||||
dependencies = [
|
||||
"digest 0.10.7",
|
||||
"sha3-asm",
|
||||
@@ -5979,9 +5993,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "moka"
|
||||
version = "0.12.12"
|
||||
version = "0.12.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a3dec6bd31b08944e08b58fd99373893a6c17054d6f3ea5006cc894f4f4eee2a"
|
||||
checksum = "b4ac832c50ced444ef6be0767a008b02c106a909ba79d1d830501e94b96f6b7e"
|
||||
dependencies = [
|
||||
"crossbeam-channel",
|
||||
"crossbeam-epoch",
|
||||
@@ -6095,9 +6109,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "notify-types"
|
||||
version = "2.0.0"
|
||||
version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d"
|
||||
checksum = "42b8cfee0e339a0337359f3c88165702ac6e600dc01c0cc9579a92d62b08477a"
|
||||
dependencies = [
|
||||
"bitflags 2.10.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ntapi"
|
||||
@@ -7797,6 +7814,7 @@ dependencies = [
|
||||
"itertools 0.14.0",
|
||||
"lz4",
|
||||
"metrics",
|
||||
"parking_lot",
|
||||
"proptest",
|
||||
"proptest-arbitrary-interop",
|
||||
"ratatui",
|
||||
@@ -8041,6 +8059,7 @@ dependencies = [
|
||||
"derive_more",
|
||||
"metrics",
|
||||
"modular-bitfield",
|
||||
"op-alloy-consensus",
|
||||
"parity-scale-codec",
|
||||
"proptest",
|
||||
"proptest-arbitrary-interop",
|
||||
@@ -8048,7 +8067,6 @@ dependencies = [
|
||||
"reth-codecs",
|
||||
"reth-db-models",
|
||||
"reth-ethereum-primitives",
|
||||
"reth-optimism-primitives",
|
||||
"reth-primitives-traits",
|
||||
"reth-prune-types",
|
||||
"reth-stages-types",
|
||||
@@ -8319,7 +8337,6 @@ dependencies = [
|
||||
"reth-chainspec",
|
||||
"reth-engine-primitives",
|
||||
"reth-ethereum-engine-primitives",
|
||||
"reth-optimism-chainspec",
|
||||
"reth-payload-builder",
|
||||
"reth-payload-primitives",
|
||||
"reth-primitives-traits",
|
||||
@@ -10526,9 +10543,7 @@ dependencies = [
|
||||
"op-alloy-rpc-types",
|
||||
"reth-ethereum-primitives",
|
||||
"reth-evm",
|
||||
"reth-optimism-primitives",
|
||||
"reth-primitives-traits",
|
||||
"reth-storage-api",
|
||||
"serde_json",
|
||||
"thiserror 2.0.18",
|
||||
]
|
||||
@@ -10718,6 +10733,7 @@ version = "1.10.2"
|
||||
dependencies = [
|
||||
"alloy-consensus",
|
||||
"alloy-eips",
|
||||
"alloy-genesis",
|
||||
"alloy-primitives",
|
||||
"alloy-rlp",
|
||||
"assert_matches",
|
||||
@@ -10737,6 +10753,7 @@ dependencies = [
|
||||
"reth-consensus",
|
||||
"reth-db",
|
||||
"reth-db-api",
|
||||
"reth-db-common",
|
||||
"reth-downloaders",
|
||||
"reth-era",
|
||||
"reth-era-downloader",
|
||||
@@ -10762,6 +10779,7 @@ dependencies = [
|
||||
"reth-storage-api",
|
||||
"reth-storage-errors",
|
||||
"reth-testing-utils",
|
||||
"reth-tracing",
|
||||
"reth-trie",
|
||||
"reth-trie-db",
|
||||
"tempfile",
|
||||
@@ -12204,9 +12222,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sha3-asm"
|
||||
version = "0.1.4"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c28efc5e327c837aa837c59eae585fc250715ef939ac32881bcc11677cd02d46"
|
||||
checksum = "b31139435f327c93c6038ed350ae4588e2c70a13d50599509fee6349967ba35a"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"cfg-if",
|
||||
@@ -12318,9 +12336,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "siphasher"
|
||||
version = "1.0.1"
|
||||
version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
|
||||
checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e"
|
||||
|
||||
[[package]]
|
||||
name = "sketches-ddsketch"
|
||||
@@ -12436,6 +12454,16 @@ version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
|
||||
|
||||
[[package]]
|
||||
name = "statrs"
|
||||
version = "0.18.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2a3fe7c28c6512e766b0874335db33c94ad7b8f9054228ae1c2abd47ce7d335e"
|
||||
dependencies = [
|
||||
"approx",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "strsim"
|
||||
version = "0.11.1"
|
||||
@@ -12515,9 +12543,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "syn-solidity"
|
||||
version = "1.5.2"
|
||||
version = "1.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5f92d01b5de07eaf324f7fca61cc6bd3d82bbc1de5b6c963e6fe79e86f36580d"
|
||||
checksum = "2379beea9476b89d0237078be761cf8e012d92d5ae4ae0c9a329f974838870fc"
|
||||
dependencies = [
|
||||
"paste",
|
||||
"proc-macro2",
|
||||
@@ -12825,15 +12853,6 @@ dependencies = [
|
||||
"time-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tiny-keccak"
|
||||
version = "2.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237"
|
||||
dependencies = [
|
||||
"crunchy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tinystr"
|
||||
version = "0.8.2"
|
||||
@@ -13025,9 +13044,9 @@ checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801"
|
||||
|
||||
[[package]]
|
||||
name = "tonic"
|
||||
version = "0.14.2"
|
||||
version = "0.14.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203"
|
||||
checksum = "a286e33f82f8a1ee2df63f4fa35c0becf4a85a0cb03091a15fd7bf0b402dc94a"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.22.1",
|
||||
@@ -13051,9 +13070,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tonic-prost"
|
||||
version = "0.14.2"
|
||||
version = "0.14.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67"
|
||||
checksum = "d6c55a2d6a14174563de34409c9f92ff981d006f56da9c6ecd40d9d4a31500b0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost 0.14.3",
|
||||
@@ -14440,18 +14459,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy"
|
||||
version = "0.8.33"
|
||||
version = "0.8.35"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "668f5168d10b9ee831de31933dc111a459c97ec93225beb307aed970d1372dfd"
|
||||
checksum = "fdea86ddd5568519879b8187e1cf04e24fce28f7fe046ceecbce472ff19a2572"
|
||||
dependencies = [
|
||||
"zerocopy-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy-derive"
|
||||
version = "0.8.33"
|
||||
version = "0.8.35"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2c7962b26b0a8685668b671ee4b54d007a67d4eaf05fda79ac0ecf41e32270f1"
|
||||
checksum = "0c15e1b46eff7c6c91195752e0eeed8ef040e391cdece7c25376957d5f15df22"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -14535,9 +14554,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "zmij"
|
||||
version = "1.0.16"
|
||||
version = "1.0.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dfcd145825aace48cff44a8844de64bf75feec3080e0aa5cdbde72961ae51a65"
|
||||
checksum = "02aae0f83f69aafc94776e879363e9771d7ecbffe2c7fbb6c14c5e00dfe88439"
|
||||
|
||||
[[package]]
|
||||
name = "zstd"
|
||||
|
||||
21
Cargo.toml
21
Cargo.toml
@@ -77,6 +77,10 @@ members = [
|
||||
"crates/optimism/cli",
|
||||
"crates/optimism/consensus",
|
||||
"crates/optimism/evm/",
|
||||
"crates/optimism/examples/custom-node",
|
||||
"crates/optimism/examples/engine-api-access",
|
||||
"crates/optimism/examples/exex-hello-world",
|
||||
"crates/optimism/examples/op-db-access",
|
||||
"crates/optimism/flashblocks/",
|
||||
"crates/optimism/hardforks/",
|
||||
"crates/optimism/node/",
|
||||
@@ -145,7 +149,6 @@ members = [
|
||||
"examples/beacon-api-sse/",
|
||||
"examples/bsc-p2p",
|
||||
"examples/custom-dev-node/",
|
||||
"examples/custom-node/",
|
||||
"examples/custom-engine-types/",
|
||||
"examples/custom-evm/",
|
||||
"examples/custom-hardforks/",
|
||||
@@ -154,10 +157,7 @@ members = [
|
||||
"examples/custom-payload-builder/",
|
||||
"examples/custom-rlpx-subprotocol",
|
||||
"examples/custom-rpc-middleware",
|
||||
"examples/custom-node",
|
||||
"examples/db-access",
|
||||
"examples/engine-api-access",
|
||||
"examples/exex-hello-world",
|
||||
"examples/exex-subscription",
|
||||
"examples/exex-test",
|
||||
"examples/full-contract-state",
|
||||
@@ -168,7 +168,6 @@ members = [
|
||||
"examples/node-builder-api/",
|
||||
"examples/node-custom-rpc/",
|
||||
"examples/node-event-hooks/",
|
||||
"examples/op-db-access/",
|
||||
"examples/polygon-p2p/",
|
||||
"examples/rpc-db/",
|
||||
"examples/precompile-cache/",
|
||||
@@ -484,15 +483,15 @@ op-revm = { version = "15.0.0", default-features = false }
|
||||
revm-inspectors = "0.34.1"
|
||||
|
||||
# eth
|
||||
alloy-dyn-abi = "1.5.4"
|
||||
alloy-primitives = { version = "1.5.4", default-features = false, features = ["map-foldhash"] }
|
||||
alloy-sol-types = { version = "1.5.4", default-features = false }
|
||||
|
||||
alloy-chains = { version = "0.2.5", default-features = false }
|
||||
alloy-dyn-abi = "1.5.2"
|
||||
alloy-eip2124 = { version = "0.2.0", default-features = false }
|
||||
alloy-eip7928 = { version = "0.3.0", default-features = false }
|
||||
alloy-evm = { version = "0.27.0", default-features = false }
|
||||
alloy-primitives = { version = "1.5.0", default-features = false, features = ["map-foldhash"] }
|
||||
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
|
||||
alloy-sol-macro = "1.5.0"
|
||||
alloy-sol-types = { version = "1.5.0", default-features = false }
|
||||
alloy-trie = { version = "0.9.1", default-features = false }
|
||||
|
||||
alloy-hardforks = "0.4.5"
|
||||
@@ -671,7 +670,7 @@ tracing-opentelemetry = "0.32"
|
||||
# misc-testing
|
||||
arbitrary = "1.3"
|
||||
assert_matches = "1.5.0"
|
||||
criterion = { package = "codspeed-criterion-compat", version = "2.7" }
|
||||
criterion = { package = "codspeed-criterion-compat", version = "4.3" }
|
||||
insta = "1.41"
|
||||
proptest = "1.7"
|
||||
proptest-derive = "0.5"
|
||||
@@ -733,7 +732,7 @@ snap = "1.1.1"
|
||||
socket2 = { version = "0.5", default-features = false }
|
||||
sysinfo = { version = "0.33", default-features = false }
|
||||
tracing-journald = "0.3"
|
||||
tracing-logfmt = "0.3.3"
|
||||
tracing-logfmt = "=0.3.5"
|
||||
tracing-samply = "0.1"
|
||||
tracing-subscriber = { version = "0.3", default-features = false }
|
||||
tracing-tracy = "0.11"
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
# reth: --build-arg BINARY=reth
|
||||
# op-reth: --build-arg BINARY=op-reth --build-arg MANIFEST_PATH=crates/optimism/bin
|
||||
|
||||
FROM lukemathwalker/cargo-chef:latest-rust-1 AS chef
|
||||
FROM rust:1 AS builder
|
||||
WORKDIR /app
|
||||
|
||||
LABEL org.opencontainers.image.source=https://github.com/paradigmxyz/reth
|
||||
@@ -19,14 +19,6 @@ ENV RUSTC_WRAPPER=sccache
|
||||
ENV SCCACHE_DIR=/sccache
|
||||
ENV SCCACHE_WEBDAV_ENDPOINT=https://cache.depot.dev
|
||||
|
||||
# Builds a cargo-chef plan
|
||||
FROM chef AS planner
|
||||
COPY --exclude=.git . .
|
||||
RUN cargo chef prepare --recipe-path recipe.json
|
||||
|
||||
FROM chef AS builder
|
||||
COPY --from=planner /app/recipe.json recipe.json
|
||||
|
||||
# Binary to build (reth or op-reth)
|
||||
ARG BINARY=reth
|
||||
|
||||
@@ -53,13 +45,6 @@ ENV VERGEN_GIT_SHA=$VERGEN_GIT_SHA
|
||||
ENV VERGEN_GIT_DESCRIBE=$VERGEN_GIT_DESCRIBE
|
||||
ENV VERGEN_GIT_DIRTY=$VERGEN_GIT_DIRTY
|
||||
|
||||
# Build dependencies
|
||||
RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
|
||||
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
|
||||
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
|
||||
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
|
||||
cargo chef cook --profile $BUILD_PROFILE --features "$FEATURES" --locked --recipe-path recipe.json --manifest-path $MANIFEST_PATH/Cargo.toml
|
||||
|
||||
# Build application
|
||||
COPY --exclude=.git . .
|
||||
RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
|
||||
|
||||
@@ -274,10 +274,10 @@ impl Args {
|
||||
/// Get the default RPC URL for a given chain
|
||||
const fn get_default_rpc_url(chain: &Chain) -> &'static str {
|
||||
match chain.id() {
|
||||
8453 => "https://base-mainnet.rpc.ithaca.xyz", // base
|
||||
8453 => "https://base.reth.rs/rpc", // base
|
||||
84532 => "https://base-sepolia.rpc.ithaca.xyz", // base-sepolia
|
||||
27082 => "https://rpc.hoodi.ethpandaops.io", // hoodi
|
||||
_ => "https://reth-ethereum.ithaca.xyz/rpc", // mainnet and fallback
|
||||
_ => "https://ethereum.reth.rs/rpc", // mainnet and fallback
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -132,13 +132,24 @@ impl<S: TransactionSource> TransactionCollector<S> {
|
||||
/// Collect transactions starting from the given block number.
|
||||
///
|
||||
/// Skips blob transactions (type 3) and collects until target gas is reached.
|
||||
/// Returns the collected raw transaction bytes, total gas used, and the next block number.
|
||||
pub async fn collect(&self, start_block: u64) -> eyre::Result<(Vec<Bytes>, u64, u64)> {
|
||||
let mut transactions: Vec<Bytes> = Vec::new();
|
||||
/// Returns a `CollectionResult` with transactions, gas info, and next block.
|
||||
pub async fn collect(&self, start_block: u64) -> eyre::Result<CollectionResult> {
|
||||
self.collect_gas(start_block, self.target_gas).await
|
||||
}
|
||||
|
||||
/// Collect transactions up to a specific gas target.
|
||||
///
|
||||
/// This is used both for initial collection and for retry top-ups.
|
||||
pub async fn collect_gas(
|
||||
&self,
|
||||
start_block: u64,
|
||||
gas_target: u64,
|
||||
) -> eyre::Result<CollectionResult> {
|
||||
let mut transactions: Vec<RawTransaction> = Vec::new();
|
||||
let mut total_gas: u64 = 0;
|
||||
let mut current_block = start_block;
|
||||
|
||||
while total_gas < self.target_gas {
|
||||
while total_gas < gas_target {
|
||||
let Some((block_txs, _)) = self.source.fetch_block_transactions(current_block).await?
|
||||
else {
|
||||
warn!(block = current_block, "Block not found, stopping");
|
||||
@@ -151,12 +162,12 @@ impl<S: TransactionSource> TransactionCollector<S> {
|
||||
continue;
|
||||
}
|
||||
|
||||
if total_gas + tx.gas_used <= self.target_gas {
|
||||
transactions.push(tx.raw);
|
||||
if total_gas + tx.gas_used <= gas_target {
|
||||
total_gas += tx.gas_used;
|
||||
transactions.push(tx);
|
||||
}
|
||||
|
||||
if total_gas >= self.target_gas {
|
||||
if total_gas >= gas_target {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -164,7 +175,7 @@ impl<S: TransactionSource> TransactionCollector<S> {
|
||||
current_block += 1;
|
||||
|
||||
// Stop early if remaining gas is under 1M (close enough to target)
|
||||
let remaining_gas = self.target_gas.saturating_sub(total_gas);
|
||||
let remaining_gas = gas_target.saturating_sub(total_gas);
|
||||
if remaining_gas < 1_000_000 {
|
||||
break;
|
||||
}
|
||||
@@ -172,12 +183,12 @@ impl<S: TransactionSource> TransactionCollector<S> {
|
||||
|
||||
info!(
|
||||
total_txs = transactions.len(),
|
||||
total_gas,
|
||||
gas_sent = total_gas,
|
||||
next_block = current_block,
|
||||
"Finished collecting transactions"
|
||||
);
|
||||
|
||||
Ok((transactions, total_gas, current_block))
|
||||
Ok(CollectionResult { transactions, gas_sent: total_gas, next_block: current_block })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -252,6 +263,80 @@ struct BuiltPayload {
|
||||
envelope: ExecutionPayloadEnvelopeV4,
|
||||
block_hash: B256,
|
||||
timestamp: u64,
|
||||
/// The actual gas used in the built block.
|
||||
gas_used: u64,
|
||||
}
|
||||
|
||||
/// Result of collecting transactions from blocks.
|
||||
#[derive(Debug)]
|
||||
pub struct CollectionResult {
|
||||
/// Collected transactions with their gas info.
|
||||
pub transactions: Vec<RawTransaction>,
|
||||
/// Total gas sent (sum of historical `gas_used` for all collected txs).
|
||||
pub gas_sent: u64,
|
||||
/// Next block number to continue collecting from.
|
||||
pub next_block: u64,
|
||||
}
|
||||
|
||||
/// Constants for retry logic.
|
||||
const MAX_BUILD_RETRIES: u32 = 5;
|
||||
/// Maximum retries for fetching a transaction batch.
|
||||
const MAX_FETCH_RETRIES: u32 = 5;
|
||||
/// Tolerance: if `gas_used` is within 1M of target, don't retry.
|
||||
const MIN_TARGET_SLACK: u64 = 1_000_000;
|
||||
/// Maximum gas to request in retries (10x target as safety cap).
|
||||
const MAX_ADDITIONAL_GAS_MULTIPLIER: u64 = 10;
|
||||
|
||||
/// Fetches a batch of transactions with retry logic.
|
||||
///
|
||||
/// Returns `None` if all retries are exhausted.
|
||||
async fn fetch_batch_with_retry<S: TransactionSource>(
|
||||
collector: &TransactionCollector<S>,
|
||||
block: u64,
|
||||
) -> Option<CollectionResult> {
|
||||
for attempt in 1..=MAX_FETCH_RETRIES {
|
||||
match collector.collect(block).await {
|
||||
Ok(result) => return Some(result),
|
||||
Err(e) => {
|
||||
if attempt == MAX_FETCH_RETRIES {
|
||||
warn!(attempt, error = %e, "Failed to fetch transactions after max retries");
|
||||
return None;
|
||||
}
|
||||
warn!(attempt, error = %e, "Failed to fetch transactions, retrying...");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Outcome of a build attempt check.
|
||||
enum RetryOutcome {
|
||||
/// Payload is close enough to target gas.
|
||||
Success,
|
||||
/// Max retries reached, accept what we have.
|
||||
MaxRetries,
|
||||
/// Need more transactions with the specified gas amount.
|
||||
NeedMore(u64),
|
||||
}
|
||||
|
||||
/// Buffer for receiving transaction batches from the fetcher.
|
||||
///
|
||||
/// This abstracts over the channel to allow the main loop to request
|
||||
/// batches on demand, including for retries.
|
||||
struct TxBuffer {
|
||||
receiver: mpsc::Receiver<CollectionResult>,
|
||||
}
|
||||
|
||||
impl TxBuffer {
|
||||
const fn new(receiver: mpsc::Receiver<CollectionResult>) -> Self {
|
||||
Self { receiver }
|
||||
}
|
||||
|
||||
/// Take the next available batch from the fetcher.
|
||||
async fn take_batch(&mut self) -> Option<CollectionResult> {
|
||||
self.receiver.recv().await
|
||||
}
|
||||
}
|
||||
|
||||
impl Command {
|
||||
@@ -312,19 +397,20 @@ impl Command {
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
// Single payload - collect transactions and build
|
||||
// Single payload - collect transactions and build with retry
|
||||
let tx_source = RpcTransactionSource::from_url(&self.rpc_url)?;
|
||||
let collector = TransactionCollector::new(tx_source, self.target_gas);
|
||||
let (transactions, _total_gas, _next_block) = collector.collect(start_block).await?;
|
||||
let result = collector.collect(start_block).await?;
|
||||
|
||||
if transactions.is_empty() {
|
||||
if result.transactions.is_empty() {
|
||||
return Err(eyre::eyre!("No transactions collected"));
|
||||
}
|
||||
|
||||
self.execute_sequential(
|
||||
self.execute_sequential_with_retry(
|
||||
&auth_provider,
|
||||
&testing_provider,
|
||||
transactions,
|
||||
&collector,
|
||||
result,
|
||||
parent_hash,
|
||||
parent_timestamp,
|
||||
)
|
||||
@@ -335,32 +421,34 @@ impl Command {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sequential execution path for single payload or no-execute mode.
|
||||
async fn execute_sequential(
|
||||
/// Sequential execution path with retry logic for underfilled payloads.
|
||||
async fn execute_sequential_with_retry<S: TransactionSource>(
|
||||
&self,
|
||||
auth_provider: &RootProvider<AnyNetwork>,
|
||||
testing_provider: &RootProvider<AnyNetwork>,
|
||||
transactions: Vec<Bytes>,
|
||||
collector: &TransactionCollector<S>,
|
||||
initial_result: CollectionResult,
|
||||
mut parent_hash: B256,
|
||||
mut parent_timestamp: u64,
|
||||
) -> eyre::Result<()> {
|
||||
for i in 0..self.count {
|
||||
info!(
|
||||
payload = i + 1,
|
||||
total = self.count,
|
||||
parent_hash = %parent_hash,
|
||||
parent_timestamp = parent_timestamp,
|
||||
"Building payload via testing_buildBlockV1"
|
||||
);
|
||||
let mut current_result = initial_result;
|
||||
|
||||
for i in 0..self.count {
|
||||
let built = self
|
||||
.build_payload(testing_provider, &transactions, i, parent_hash, parent_timestamp)
|
||||
.build_with_retry(
|
||||
testing_provider,
|
||||
collector,
|
||||
&mut current_result,
|
||||
i,
|
||||
parent_hash,
|
||||
parent_timestamp,
|
||||
)
|
||||
.await?;
|
||||
|
||||
self.save_payload(&built)?;
|
||||
|
||||
if self.execute || self.count > 1 {
|
||||
info!(payload = i + 1, block_hash = %built.block_hash, "Executing payload (newPayload + FCU)");
|
||||
info!(payload = i + 1, block_hash = %built.block_hash, gas_used = built.gas_used, "Executing payload (newPayload + FCU)");
|
||||
self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?;
|
||||
info!(payload = i + 1, "Payload executed successfully");
|
||||
}
|
||||
@@ -371,7 +459,62 @@ impl Command {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Pipelined execution - fetches transactions and builds payloads in background.
|
||||
/// Build a payload with retry logic when `gas_used` is below target.
|
||||
///
|
||||
/// Uses the ratio of `gas_used/gas_sent` to estimate how many more transactions
|
||||
/// are needed to hit the target gas.
|
||||
async fn build_with_retry<S: TransactionSource>(
|
||||
&self,
|
||||
testing_provider: &RootProvider<AnyNetwork>,
|
||||
collector: &TransactionCollector<S>,
|
||||
result: &mut CollectionResult,
|
||||
index: u64,
|
||||
parent_hash: B256,
|
||||
parent_timestamp: u64,
|
||||
) -> eyre::Result<BuiltPayload> {
|
||||
for attempt in 1..=MAX_BUILD_RETRIES {
|
||||
let tx_bytes: Vec<Bytes> = result.transactions.iter().map(|t| t.raw.clone()).collect();
|
||||
let gas_sent = result.gas_sent;
|
||||
|
||||
info!(
|
||||
payload = index + 1,
|
||||
attempt,
|
||||
tx_count = tx_bytes.len(),
|
||||
gas_sent,
|
||||
parent_hash = %parent_hash,
|
||||
"Building payload via testing_buildBlockV1"
|
||||
);
|
||||
|
||||
let built = Self::build_payload_static(
|
||||
testing_provider,
|
||||
&tx_bytes,
|
||||
index,
|
||||
parent_hash,
|
||||
parent_timestamp,
|
||||
)
|
||||
.await?;
|
||||
|
||||
match self.check_retry_outcome(&built, index, attempt, gas_sent) {
|
||||
RetryOutcome::Success | RetryOutcome::MaxRetries => return Ok(built),
|
||||
RetryOutcome::NeedMore(additional_gas) => {
|
||||
let additional =
|
||||
collector.collect_gas(result.next_block, additional_gas).await?;
|
||||
result.transactions.extend(additional.transactions);
|
||||
result.gas_sent = result.gas_sent.saturating_add(additional.gas_sent);
|
||||
result.next_block = additional.next_block;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
warn!(payload = index + 1, "Retry loop exited without returning a payload");
|
||||
Err(eyre::eyre!("build_with_retry exhausted retries without result"))
|
||||
}
|
||||
|
||||
/// Pipelined execution - fetches transactions in background, builds with retry.
|
||||
///
|
||||
/// The fetcher continuously produces transaction batches. The main loop consumes them,
|
||||
/// builds payloads with retry logic (requesting more transactions if underfilled),
|
||||
/// and executes each payload before moving to the next.
|
||||
async fn execute_pipelined(
|
||||
&self,
|
||||
auth_provider: &RootProvider<AnyNetwork>,
|
||||
@@ -380,180 +523,220 @@ impl Command {
|
||||
initial_parent_hash: B256,
|
||||
initial_parent_timestamp: u64,
|
||||
) -> eyre::Result<()> {
|
||||
// Create channel for transaction batches (one batch per payload)
|
||||
let (tx_sender, mut tx_receiver) = mpsc::channel::<Vec<Bytes>>(self.prefetch_buffer);
|
||||
// Create channel for transaction batches - fetcher sends CollectionResult
|
||||
let (tx_sender, tx_receiver) = mpsc::channel::<CollectionResult>(self.prefetch_buffer);
|
||||
|
||||
// Spawn background task to continuously fetch transaction batches
|
||||
let rpc_url = self.rpc_url.clone();
|
||||
let target_gas = self.target_gas;
|
||||
let count = self.count;
|
||||
|
||||
let fetcher_handle = tokio::spawn(async move {
|
||||
let tx_source = match RpcTransactionSource::from_url(&rpc_url) {
|
||||
Ok(source) => source,
|
||||
Err(e) => {
|
||||
warn!(error = %e, "Failed to create transaction source");
|
||||
return;
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let collector = TransactionCollector::new(tx_source, target_gas);
|
||||
let mut current_block = start_block;
|
||||
|
||||
for payload_idx in 0..count {
|
||||
const MAX_RETRIES: u32 = 5;
|
||||
let mut attempts = 0;
|
||||
let result = loop {
|
||||
attempts += 1;
|
||||
match collector.collect(current_block).await {
|
||||
Ok(res) => break Some(res),
|
||||
Err(e) => {
|
||||
if attempts >= MAX_RETRIES {
|
||||
warn!(payload = payload_idx + 1, attempts, error = %e, "Failed to fetch transactions after max retries");
|
||||
break None;
|
||||
}
|
||||
warn!(payload = payload_idx + 1, attempts, error = %e, "Failed to fetch transactions, retrying...");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let Some((transactions, total_gas, next_block)) = result else {
|
||||
while let Some(batch) = fetch_batch_with_retry(&collector, current_block).await {
|
||||
if batch.transactions.is_empty() {
|
||||
info!(block = current_block, "Reached chain tip, stopping fetcher");
|
||||
break;
|
||||
};
|
||||
}
|
||||
|
||||
info!(
|
||||
payload = payload_idx + 1,
|
||||
tx_count = transactions.len(),
|
||||
total_gas,
|
||||
blocks = format!("{}..{}", current_block, next_block),
|
||||
"Fetched transactions"
|
||||
tx_count = batch.transactions.len(),
|
||||
gas_sent = batch.gas_sent,
|
||||
blocks = format!("{}..{}", current_block, batch.next_block),
|
||||
"Fetched transaction batch"
|
||||
);
|
||||
current_block = next_block;
|
||||
current_block = batch.next_block;
|
||||
|
||||
if tx_sender.send(transactions).await.is_err() {
|
||||
if tx_sender.send(batch).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Some(current_block)
|
||||
});
|
||||
|
||||
// Transaction buffer: holds transactions from batches + any extras from retries
|
||||
let mut tx_buffer = TxBuffer::new(tx_receiver);
|
||||
|
||||
let mut parent_hash = initial_parent_hash;
|
||||
let mut parent_timestamp = initial_parent_timestamp;
|
||||
let mut pending_build: Option<tokio::task::JoinHandle<eyre::Result<BuiltPayload>>> = None;
|
||||
|
||||
for i in 0..self.count {
|
||||
let is_last = i == self.count - 1;
|
||||
// Get initial batch of transactions for this payload
|
||||
let mut result = tx_buffer
|
||||
.take_batch()
|
||||
.await
|
||||
.ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
|
||||
|
||||
// Get current payload (either from pending build or build now)
|
||||
let current_payload = if let Some(handle) = pending_build.take() {
|
||||
handle.await??
|
||||
} else {
|
||||
// First payload - wait for transactions and build synchronously
|
||||
let transactions = tx_receiver
|
||||
.recv()
|
||||
.await
|
||||
.ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
|
||||
if result.transactions.is_empty() {
|
||||
return Err(eyre::eyre!("No transactions collected for payload {}", i + 1));
|
||||
}
|
||||
|
||||
if transactions.is_empty() {
|
||||
return Err(eyre::eyre!("No transactions collected for payload {}", i + 1));
|
||||
}
|
||||
|
||||
info!(
|
||||
payload = i + 1,
|
||||
total = self.count,
|
||||
parent_hash = %parent_hash,
|
||||
parent_timestamp = parent_timestamp,
|
||||
tx_count = transactions.len(),
|
||||
"Building payload via testing_buildBlockV1"
|
||||
);
|
||||
self.build_payload(
|
||||
// Build with retry - may need to request more transactions
|
||||
let built = self
|
||||
.build_with_retry_buffered(
|
||||
testing_provider,
|
||||
&transactions,
|
||||
&mut tx_buffer,
|
||||
&mut result,
|
||||
i,
|
||||
parent_hash,
|
||||
parent_timestamp,
|
||||
)
|
||||
.await?
|
||||
};
|
||||
.await?;
|
||||
|
||||
self.save_payload(¤t_payload)?;
|
||||
self.save_payload(&built)?;
|
||||
|
||||
let current_block_hash = current_payload.block_hash;
|
||||
let current_timestamp = current_payload.timestamp;
|
||||
let current_block_hash = built.block_hash;
|
||||
let current_timestamp = built.timestamp;
|
||||
|
||||
// Execute current payload first
|
||||
info!(payload = i + 1, block_hash = %current_block_hash, "Executing payload (newPayload + FCU)");
|
||||
self.execute_payload_v4(auth_provider, current_payload.envelope, parent_hash).await?;
|
||||
// Execute payload
|
||||
info!(payload = i + 1, block_hash = %current_block_hash, gas_used = built.gas_used, "Executing payload (newPayload + FCU)");
|
||||
self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?;
|
||||
info!(payload = i + 1, "Payload executed successfully");
|
||||
|
||||
// Start building next payload in background (if not last) - AFTER execution
|
||||
if !is_last {
|
||||
// Get transactions for next payload (should already be fetched or fetching)
|
||||
let next_transactions = tx_receiver
|
||||
.recv()
|
||||
.await
|
||||
.ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
|
||||
|
||||
if next_transactions.is_empty() {
|
||||
return Err(eyre::eyre!("No transactions collected for payload {}", i + 2));
|
||||
}
|
||||
|
||||
let testing_provider = testing_provider.clone();
|
||||
let next_index = i + 1;
|
||||
let total = self.count;
|
||||
|
||||
pending_build = Some(tokio::spawn(async move {
|
||||
info!(
|
||||
payload = next_index + 1,
|
||||
total = total,
|
||||
parent_hash = %current_block_hash,
|
||||
parent_timestamp = current_timestamp,
|
||||
tx_count = next_transactions.len(),
|
||||
"Building payload via testing_buildBlockV1"
|
||||
);
|
||||
|
||||
Self::build_payload_static(
|
||||
&testing_provider,
|
||||
&next_transactions,
|
||||
next_index,
|
||||
current_block_hash,
|
||||
current_timestamp,
|
||||
)
|
||||
.await
|
||||
}));
|
||||
}
|
||||
|
||||
parent_hash = current_block_hash;
|
||||
parent_timestamp = current_timestamp;
|
||||
}
|
||||
|
||||
// Clean up the fetcher task
|
||||
drop(tx_receiver);
|
||||
drop(tx_buffer);
|
||||
let _ = fetcher_handle.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Build a single payload via `testing_buildBlockV1`.
|
||||
async fn build_payload(
|
||||
/// Build a payload with retry logic, using the buffered transaction source.
|
||||
async fn build_with_retry_buffered(
|
||||
&self,
|
||||
testing_provider: &RootProvider<AnyNetwork>,
|
||||
transactions: &[Bytes],
|
||||
tx_buffer: &mut TxBuffer,
|
||||
result: &mut CollectionResult,
|
||||
index: u64,
|
||||
parent_hash: B256,
|
||||
parent_timestamp: u64,
|
||||
) -> eyre::Result<BuiltPayload> {
|
||||
Self::build_payload_static(
|
||||
testing_provider,
|
||||
transactions,
|
||||
index,
|
||||
parent_hash,
|
||||
parent_timestamp,
|
||||
)
|
||||
.await
|
||||
for attempt in 1..=MAX_BUILD_RETRIES {
|
||||
let tx_bytes: Vec<Bytes> = result.transactions.iter().map(|t| t.raw.clone()).collect();
|
||||
let gas_sent = result.gas_sent;
|
||||
|
||||
info!(
|
||||
payload = index + 1,
|
||||
attempt,
|
||||
tx_count = tx_bytes.len(),
|
||||
gas_sent,
|
||||
parent_hash = %parent_hash,
|
||||
"Building payload via testing_buildBlockV1"
|
||||
);
|
||||
|
||||
let built = Self::build_payload_static(
|
||||
testing_provider,
|
||||
&tx_bytes,
|
||||
index,
|
||||
parent_hash,
|
||||
parent_timestamp,
|
||||
)
|
||||
.await?;
|
||||
|
||||
match self.check_retry_outcome(&built, index, attempt, gas_sent) {
|
||||
RetryOutcome::Success | RetryOutcome::MaxRetries => return Ok(built),
|
||||
RetryOutcome::NeedMore(additional_gas) => {
|
||||
let mut collected_gas = 0u64;
|
||||
while collected_gas < additional_gas {
|
||||
if let Some(batch) = tx_buffer.take_batch().await {
|
||||
collected_gas += batch.gas_sent;
|
||||
result.transactions.extend(batch.transactions);
|
||||
result.gas_sent = result.gas_sent.saturating_add(batch.gas_sent);
|
||||
result.next_block = batch.next_block;
|
||||
} else {
|
||||
warn!("Transaction fetcher exhausted, proceeding with available transactions");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
warn!(payload = index + 1, "Retry loop exited without returning a payload");
|
||||
Err(eyre::eyre!("build_with_retry_buffered exhausted retries without result"))
|
||||
}
|
||||
|
||||
/// Static version for use in spawned tasks.
|
||||
/// Determines the outcome of a build attempt.
|
||||
fn check_retry_outcome(
|
||||
&self,
|
||||
built: &BuiltPayload,
|
||||
index: u64,
|
||||
attempt: u32,
|
||||
gas_sent: u64,
|
||||
) -> RetryOutcome {
|
||||
let gas_used = built.gas_used;
|
||||
|
||||
if gas_used + MIN_TARGET_SLACK >= self.target_gas {
|
||||
info!(
|
||||
payload = index + 1,
|
||||
gas_used,
|
||||
target_gas = self.target_gas,
|
||||
attempts = attempt,
|
||||
"Payload built successfully"
|
||||
);
|
||||
return RetryOutcome::Success;
|
||||
}
|
||||
|
||||
if attempt == MAX_BUILD_RETRIES {
|
||||
warn!(
|
||||
payload = index + 1,
|
||||
gas_used,
|
||||
target_gas = self.target_gas,
|
||||
gas_sent,
|
||||
"Underfilled after max retries, accepting payload"
|
||||
);
|
||||
return RetryOutcome::MaxRetries;
|
||||
}
|
||||
|
||||
if gas_used == 0 {
|
||||
warn!(
|
||||
payload = index + 1,
|
||||
"Zero gas used in payload, requesting fixed chunk of additional transactions"
|
||||
);
|
||||
return RetryOutcome::NeedMore(self.target_gas);
|
||||
}
|
||||
|
||||
let gas_sent_needed_total =
|
||||
(self.target_gas as u128 * gas_sent as u128).div_ceil(gas_used as u128) as u64;
|
||||
let additional = gas_sent_needed_total.saturating_sub(gas_sent);
|
||||
let additional = additional.min(self.target_gas * MAX_ADDITIONAL_GAS_MULTIPLIER);
|
||||
|
||||
if additional == 0 {
|
||||
info!(
|
||||
payload = index + 1,
|
||||
gas_used,
|
||||
target_gas = self.target_gas,
|
||||
"No additional transactions needed based on ratio"
|
||||
);
|
||||
return RetryOutcome::Success;
|
||||
}
|
||||
|
||||
let ratio = gas_used as f64 / gas_sent as f64;
|
||||
info!(
|
||||
payload = index + 1,
|
||||
gas_used,
|
||||
gas_sent,
|
||||
ratio = format!("{:.4}", ratio),
|
||||
additional_gas = additional,
|
||||
"Underfilled, collecting more transactions for retry"
|
||||
);
|
||||
RetryOutcome::NeedMore(additional)
|
||||
}
|
||||
|
||||
/// Build a single payload via `testing_buildBlockV1`.
|
||||
async fn build_payload_static(
|
||||
testing_provider: &RootProvider<AnyNetwork>,
|
||||
transactions: &[Bytes],
|
||||
@@ -591,8 +774,9 @@ impl Command {
|
||||
let block_hash = inner.block_hash;
|
||||
let block_number = inner.block_number;
|
||||
let timestamp = inner.timestamp;
|
||||
let gas_used = inner.gas_used;
|
||||
|
||||
Ok(BuiltPayload { block_number, envelope: v4_envelope, block_hash, timestamp })
|
||||
Ok(BuiltPayload { block_number, envelope: v4_envelope, block_hash, timestamp, gas_used })
|
||||
}
|
||||
|
||||
/// Save a payload to disk.
|
||||
|
||||
@@ -1,6 +1,33 @@
|
||||
//! Common helpers for reth-bench commands.
|
||||
|
||||
use crate::valid_payload::call_forkchoice_updated;
|
||||
use eyre::Result;
|
||||
use std::io::{BufReader, Read};
|
||||
|
||||
/// Read input from either a file path or stdin.
|
||||
pub(crate) fn read_input(path: Option<&str>) -> Result<String> {
|
||||
Ok(match path {
|
||||
Some(path) => reth_fs_util::read_to_string(path)?,
|
||||
None => String::from_utf8(
|
||||
BufReader::new(std::io::stdin()).bytes().collect::<Result<Vec<_>, _>>()?,
|
||||
)?,
|
||||
})
|
||||
}
|
||||
|
||||
/// Load JWT secret from either a file or use the provided string directly.
|
||||
pub(crate) fn load_jwt_secret(jwt_secret: Option<&str>) -> Result<Option<String>> {
|
||||
match jwt_secret {
|
||||
Some(secret) => {
|
||||
// Try to read as file first
|
||||
match std::fs::read_to_string(secret) {
|
||||
Ok(contents) => Ok(Some(contents.trim().to_string())),
|
||||
// If file read fails, use the string directly
|
||||
Err(_) => Ok(Some(secret.to_string())),
|
||||
}
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses a gas limit value with optional suffix: K for thousand, M for million, G for billion.
|
||||
///
|
||||
|
||||
@@ -15,6 +15,7 @@ pub use generate_big_block::{
|
||||
mod new_payload_fcu;
|
||||
mod new_payload_only;
|
||||
mod output;
|
||||
mod persistence_waiter;
|
||||
mod replay_payloads;
|
||||
mod send_invalid_payload;
|
||||
mod send_payload;
|
||||
|
||||
@@ -15,28 +15,23 @@ use crate::{
|
||||
output::{
|
||||
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
|
||||
},
|
||||
persistence_waiter::{
|
||||
derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
|
||||
PERSISTENCE_CHECKPOINT_TIMEOUT,
|
||||
},
|
||||
},
|
||||
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
|
||||
};
|
||||
use alloy_eips::BlockNumHash;
|
||||
use alloy_network::Ethereum;
|
||||
use alloy_provider::{Provider, RootProvider};
|
||||
use alloy_pubsub::SubscriptionStream;
|
||||
use alloy_rpc_client::RpcClient;
|
||||
use alloy_provider::Provider;
|
||||
use alloy_rpc_types_engine::ForkchoiceState;
|
||||
use alloy_transport_ws::WsConnect;
|
||||
use clap::Parser;
|
||||
use eyre::{Context, OptionExt};
|
||||
use futures::StreamExt;
|
||||
use humantime::parse_duration;
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
|
||||
use reth_node_core::args::BenchmarkArgs;
|
||||
use std::time::{Duration, Instant};
|
||||
use tracing::{debug, info};
|
||||
use url::Url;
|
||||
|
||||
const PERSISTENCE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
/// `reth benchmark new-payload-fcu` command
|
||||
#[derive(Debug, Parser)]
|
||||
@@ -105,7 +100,11 @@ impl Command {
|
||||
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
|
||||
(Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)),
|
||||
(None, true) => {
|
||||
let sub = self.setup_persistence_subscription().await?;
|
||||
let ws_url = derive_ws_rpc_url(
|
||||
self.benchmark.ws_rpc_url.as_deref(),
|
||||
&self.benchmark.engine_rpc_url,
|
||||
)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
Some(PersistenceWaiter::with_subscription(
|
||||
sub,
|
||||
self.persistence_threshold,
|
||||
@@ -245,293 +244,22 @@ impl Command {
|
||||
results.into_iter().unzip();
|
||||
|
||||
if let Some(ref path) = self.benchmark.output {
|
||||
write_benchmark_results(path, &gas_output_results, combined_results)?;
|
||||
write_benchmark_results(path, &gas_output_results, &combined_results)?;
|
||||
}
|
||||
|
||||
let gas_output = TotalGasOutput::new(gas_output_results)?;
|
||||
let gas_output =
|
||||
TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
|
||||
|
||||
info!(
|
||||
total_duration=?gas_output.total_duration,
|
||||
total_gas_used=?gas_output.total_gas_used,
|
||||
blocks_processed=?gas_output.blocks_processed,
|
||||
"Total Ggas/s: {:.4}",
|
||||
gas_output.total_gigagas_per_second()
|
||||
total_gas_used = gas_output.total_gas_used,
|
||||
total_duration = ?gas_output.total_duration,
|
||||
execution_duration = ?gas_output.execution_duration,
|
||||
blocks_processed = gas_output.blocks_processed,
|
||||
wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
|
||||
execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
|
||||
"Benchmark complete"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the websocket RPC URL used for the persistence subscription.
|
||||
///
|
||||
/// Preference:
|
||||
/// - If `--ws-rpc-url` is provided, use it directly.
|
||||
/// - Otherwise, derive a WS RPC URL from `--engine-rpc-url`.
|
||||
///
|
||||
/// The persistence subscription endpoint (`reth_subscribePersistedBlock`) is exposed on
|
||||
/// the regular RPC server (WS port, usually 8546), not on the engine API port (usually 8551).
|
||||
/// Since `BenchmarkArgs` only has the engine URL by default, we convert the scheme
|
||||
/// (http→ws, https→wss) and force the port to 8546.
|
||||
fn derive_ws_rpc_url(&self) -> eyre::Result<Url> {
|
||||
if let Some(ref ws_url) = self.benchmark.ws_rpc_url {
|
||||
let parsed: Url = ws_url
|
||||
.parse()
|
||||
.wrap_err_with(|| format!("Failed to parse WebSocket RPC URL: {ws_url}"))?;
|
||||
info!(target: "reth-bench", ws_url = %parsed, "Using provided WebSocket RPC URL");
|
||||
Ok(parsed)
|
||||
} else {
|
||||
let derived = engine_url_to_ws_url(&self.benchmark.engine_rpc_url)?;
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
engine_url = %self.benchmark.engine_rpc_url,
|
||||
%derived,
|
||||
"Derived WebSocket RPC URL from engine RPC URL"
|
||||
);
|
||||
Ok(derived)
|
||||
}
|
||||
}
|
||||
|
||||
/// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`.
|
||||
async fn setup_persistence_subscription(&self) -> eyre::Result<PersistenceSubscription> {
|
||||
let ws_url = self.derive_ws_rpc_url()?;
|
||||
|
||||
info!("Connecting to WebSocket at {} for persistence subscription", ws_url);
|
||||
|
||||
let ws_connect = WsConnect::new(ws_url.to_string());
|
||||
let client = RpcClient::connect_pubsub(ws_connect)
|
||||
.await
|
||||
.wrap_err("Failed to connect to WebSocket RPC endpoint")?;
|
||||
let provider: RootProvider<Ethereum> = RootProvider::new(client);
|
||||
|
||||
let subscription = provider
|
||||
.subscribe_to::<BlockNumHash>("reth_subscribePersistedBlock")
|
||||
.await
|
||||
.wrap_err("Failed to subscribe to persistence notifications")?;
|
||||
|
||||
info!("Subscribed to persistence notifications");
|
||||
|
||||
Ok(PersistenceSubscription::new(provider, subscription.into_stream()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts an engine API URL to the default RPC websocket URL.
|
||||
///
|
||||
/// Transformations:
|
||||
/// - `http` → `ws`
|
||||
/// - `https` → `wss`
|
||||
/// - `ws` / `wss` keep their scheme
|
||||
/// - Port is always set to `8546`, reth's default RPC websocket port.
|
||||
///
|
||||
/// This is used when we only know the engine API URL (typically `:8551`) but
|
||||
/// need to connect to the node's WS RPC endpoint for persistence events.
|
||||
fn engine_url_to_ws_url(engine_url: &str) -> eyre::Result<Url> {
|
||||
let url: Url = engine_url
|
||||
.parse()
|
||||
.wrap_err_with(|| format!("Failed to parse engine RPC URL: {engine_url}"))?;
|
||||
|
||||
let mut ws_url = url.clone();
|
||||
|
||||
match ws_url.scheme() {
|
||||
"http" => ws_url
|
||||
.set_scheme("ws")
|
||||
.map_err(|_| eyre::eyre!("Failed to set WS scheme for URL: {url}"))?,
|
||||
"https" => ws_url
|
||||
.set_scheme("wss")
|
||||
.map_err(|_| eyre::eyre!("Failed to set WSS scheme for URL: {url}"))?,
|
||||
"ws" | "wss" => {}
|
||||
scheme => {
|
||||
return Err(eyre::eyre!(
|
||||
"Unsupported URL scheme '{scheme}' for URL: {url}. Expected http, https, ws, or wss."
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
ws_url.set_port(Some(8546)).map_err(|_| eyre::eyre!("Failed to set port for URL: {url}"))?;
|
||||
|
||||
Ok(ws_url)
|
||||
}
|
||||
|
||||
/// Waits until the persistence subscription reports that `target` has been persisted.
|
||||
///
|
||||
/// Consumes subscription events until `last_persisted >= target`, or returns an error if:
|
||||
/// - the subscription stream ends unexpectedly, or
|
||||
/// - `timeout` elapses before `target` is observed.
|
||||
async fn wait_for_persistence(
|
||||
stream: &mut SubscriptionStream<BlockNumHash>,
|
||||
target: u64,
|
||||
last_persisted: &mut u64,
|
||||
timeout: Duration,
|
||||
) -> eyre::Result<()> {
|
||||
tokio::time::timeout(timeout, async {
|
||||
while *last_persisted < target {
|
||||
match stream.next().await {
|
||||
Some(persisted) => {
|
||||
*last_persisted = persisted.number;
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
persisted_block = ?last_persisted,
|
||||
"Received persistence notification"
|
||||
);
|
||||
}
|
||||
None => {
|
||||
return Err(eyre::eyre!("Persistence subscription closed unexpectedly"));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
eyre::eyre!(
|
||||
"Persistence timeout: target block {} not persisted within {:?}. Last persisted: {}",
|
||||
target,
|
||||
timeout,
|
||||
last_persisted
|
||||
)
|
||||
})?
|
||||
}
|
||||
|
||||
/// Wrapper that keeps both the subscription stream and the underlying provider alive.
|
||||
/// The provider must be kept alive for the subscription to continue receiving events.
|
||||
struct PersistenceSubscription {
|
||||
_provider: RootProvider<Ethereum>,
|
||||
stream: SubscriptionStream<BlockNumHash>,
|
||||
}
|
||||
|
||||
impl PersistenceSubscription {
|
||||
const fn new(
|
||||
provider: RootProvider<Ethereum>,
|
||||
stream: SubscriptionStream<BlockNumHash>,
|
||||
) -> Self {
|
||||
Self { _provider: provider, stream }
|
||||
}
|
||||
|
||||
const fn stream_mut(&mut self) -> &mut SubscriptionStream<BlockNumHash> {
|
||||
&mut self.stream
|
||||
}
|
||||
}
|
||||
|
||||
/// Encapsulates the block waiting logic.
|
||||
///
|
||||
/// Provides a simple `on_block()` interface that handles both:
|
||||
/// - Fixed duration waits (when `wait_time` is set)
|
||||
/// - Persistence-based waits (when `subscription` is set)
|
||||
///
|
||||
/// For persistence mode, waits after every `(threshold + 1)` blocks.
|
||||
struct PersistenceWaiter {
|
||||
wait_time: Option<Duration>,
|
||||
subscription: Option<PersistenceSubscription>,
|
||||
blocks_sent: u64,
|
||||
last_persisted: u64,
|
||||
threshold: u64,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl PersistenceWaiter {
|
||||
const fn with_duration(wait_time: Duration) -> Self {
|
||||
Self {
|
||||
wait_time: Some(wait_time),
|
||||
subscription: None,
|
||||
blocks_sent: 0,
|
||||
last_persisted: 0,
|
||||
threshold: 0,
|
||||
timeout: Duration::ZERO,
|
||||
}
|
||||
}
|
||||
|
||||
const fn with_subscription(
|
||||
subscription: PersistenceSubscription,
|
||||
threshold: u64,
|
||||
timeout: Duration,
|
||||
) -> Self {
|
||||
Self {
|
||||
wait_time: None,
|
||||
subscription: Some(subscription),
|
||||
blocks_sent: 0,
|
||||
last_persisted: 0,
|
||||
threshold,
|
||||
timeout,
|
||||
}
|
||||
}
|
||||
|
||||
/// Called once per block. Waits based on the configured mode.
|
||||
#[allow(clippy::manual_is_multiple_of)]
|
||||
async fn on_block(&mut self, block_number: u64) -> eyre::Result<()> {
|
||||
if let Some(wait_time) = self.wait_time {
|
||||
tokio::time::sleep(wait_time).await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let Some(ref mut subscription) = self.subscription else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
self.blocks_sent += 1;
|
||||
|
||||
if self.blocks_sent % (self.threshold + 1) == 0 {
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
target_block = ?block_number,
|
||||
last_persisted = self.last_persisted,
|
||||
blocks_sent = self.blocks_sent,
|
||||
"Waiting for persistence"
|
||||
);
|
||||
|
||||
wait_for_persistence(
|
||||
subscription.stream_mut(),
|
||||
block_number,
|
||||
&mut self.last_persisted,
|
||||
self.timeout,
|
||||
)
|
||||
.await?;
|
||||
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
persisted = self.last_persisted,
|
||||
"Persistence caught up"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_engine_url_to_ws_url() {
|
||||
// http -> ws, always uses port 8546
|
||||
let result = engine_url_to_ws_url("http://localhost:8551").unwrap();
|
||||
assert_eq!(result.as_str(), "ws://localhost:8546/");
|
||||
|
||||
// https -> wss
|
||||
let result = engine_url_to_ws_url("https://localhost:8551").unwrap();
|
||||
assert_eq!(result.as_str(), "wss://localhost:8546/");
|
||||
|
||||
// Custom engine port still maps to 8546
|
||||
let result = engine_url_to_ws_url("http://localhost:9551").unwrap();
|
||||
assert_eq!(result.port(), Some(8546));
|
||||
|
||||
// Already ws passthrough
|
||||
let result = engine_url_to_ws_url("ws://localhost:8546").unwrap();
|
||||
assert_eq!(result.scheme(), "ws");
|
||||
|
||||
// Invalid inputs
|
||||
assert!(engine_url_to_ws_url("ftp://localhost:8551").is_err());
|
||||
assert!(engine_url_to_ws_url("not a valid url").is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_waiter_with_duration() {
|
||||
let mut waiter = PersistenceWaiter::with_duration(Duration::from_millis(1));
|
||||
|
||||
let start = Instant::now();
|
||||
waiter.on_block(1).await.unwrap();
|
||||
waiter.on_block(2).await.unwrap();
|
||||
waiter.on_block(3).await.unwrap();
|
||||
|
||||
// Should have waited ~3ms total
|
||||
assert!(start.elapsed() >= Duration::from_millis(3));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ use csv::Writer;
|
||||
use eyre::OptionExt;
|
||||
use reth_primitives_traits::constants::GIGAGAS;
|
||||
use serde::{ser::SerializeStruct, Deserialize, Serialize};
|
||||
use std::{path::Path, time::Duration};
|
||||
use std::{fs, path::Path, time::Duration};
|
||||
use tracing::info;
|
||||
|
||||
/// This is the suffix for gas output csv files.
|
||||
@@ -158,29 +158,58 @@ pub(crate) struct TotalGasRow {
|
||||
pub(crate) struct TotalGasOutput {
|
||||
/// The total gas used in the benchmark.
|
||||
pub(crate) total_gas_used: u64,
|
||||
/// The total duration of the benchmark.
|
||||
/// The total wall-clock duration of the benchmark (includes wait times).
|
||||
pub(crate) total_duration: Duration,
|
||||
/// The total gas used per second.
|
||||
pub(crate) total_gas_per_second: f64,
|
||||
/// The total execution-only duration (excludes wait times).
|
||||
pub(crate) execution_duration: Duration,
|
||||
/// The number of blocks processed.
|
||||
pub(crate) blocks_processed: u64,
|
||||
}
|
||||
|
||||
impl TotalGasOutput {
|
||||
/// Create a new [`TotalGasOutput`] from a list of [`TotalGasRow`].
|
||||
/// Create a new [`TotalGasOutput`] from gas rows only.
|
||||
///
|
||||
/// Use this when execution-only timing is not available (e.g., `new_payload_only`).
|
||||
/// `execution_duration` will equal `total_duration`.
|
||||
pub(crate) fn new(rows: Vec<TotalGasRow>) -> eyre::Result<Self> {
|
||||
// the duration is obtained from the last row
|
||||
let total_duration = rows.last().map(|row| row.time).ok_or_eyre("empty results")?;
|
||||
let blocks_processed = rows.len() as u64;
|
||||
let total_gas_used: u64 = rows.into_iter().map(|row| row.gas_used).sum();
|
||||
let total_gas_per_second = total_gas_used as f64 / total_duration.as_secs_f64();
|
||||
|
||||
Ok(Self { total_gas_used, total_duration, total_gas_per_second, blocks_processed })
|
||||
Ok(Self {
|
||||
total_gas_used,
|
||||
total_duration,
|
||||
execution_duration: total_duration,
|
||||
blocks_processed,
|
||||
})
|
||||
}
|
||||
|
||||
/// Return the total gigagas per second.
|
||||
/// Create a new [`TotalGasOutput`] from gas rows and combined results.
|
||||
///
|
||||
/// - `rows`: Used for total gas and wall-clock duration
|
||||
/// - `combined_results`: Used for execution-only duration (sum of `total_latency`)
|
||||
pub(crate) fn with_combined_results(
|
||||
rows: Vec<TotalGasRow>,
|
||||
combined_results: &[CombinedResult],
|
||||
) -> eyre::Result<Self> {
|
||||
let total_duration = rows.last().map(|row| row.time).ok_or_eyre("empty results")?;
|
||||
let blocks_processed = rows.len() as u64;
|
||||
let total_gas_used: u64 = rows.into_iter().map(|row| row.gas_used).sum();
|
||||
|
||||
// Sum execution-only time from combined results
|
||||
let execution_duration: Duration = combined_results.iter().map(|r| r.total_latency).sum();
|
||||
|
||||
Ok(Self { total_gas_used, total_duration, execution_duration, blocks_processed })
|
||||
}
|
||||
|
||||
/// Return the total gigagas per second based on wall-clock time.
|
||||
pub(crate) fn total_gigagas_per_second(&self) -> f64 {
|
||||
self.total_gas_per_second / GIGAGAS as f64
|
||||
self.total_gas_used as f64 / self.total_duration.as_secs_f64() / GIGAGAS as f64
|
||||
}
|
||||
|
||||
/// Return the execution-only gigagas per second (excludes wait times).
|
||||
pub(crate) fn execution_gigagas_per_second(&self) -> f64 {
|
||||
self.total_gas_used as f64 / self.execution_duration.as_secs_f64() / GIGAGAS as f64
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,8 +221,10 @@ impl TotalGasOutput {
|
||||
pub(crate) fn write_benchmark_results(
|
||||
output_dir: &Path,
|
||||
gas_results: &[TotalGasRow],
|
||||
combined_results: Vec<CombinedResult>,
|
||||
combined_results: &[CombinedResult],
|
||||
) -> eyre::Result<()> {
|
||||
fs::create_dir_all(output_dir)?;
|
||||
|
||||
let output_path = output_dir.join(COMBINED_OUTPUT_SUFFIX);
|
||||
info!("Writing engine api call latency output to file: {:?}", output_path);
|
||||
let mut writer = Writer::from_path(&output_path)?;
|
||||
|
||||
304
bin/reth-bench/src/bench/persistence_waiter.rs
Normal file
304
bin/reth-bench/src/bench/persistence_waiter.rs
Normal file
@@ -0,0 +1,304 @@
|
||||
//! Persistence waiting utilities for benchmarks.
|
||||
//!
|
||||
//! Provides waiting behavior to control benchmark pacing:
|
||||
//! - **Fixed duration waits**: Sleep for a fixed time between blocks
|
||||
//! - **Persistence-based waits**: Wait for blocks to be persisted using
|
||||
//! `reth_subscribePersistedBlock` subscription
|
||||
|
||||
use alloy_eips::BlockNumHash;
|
||||
use alloy_network::Ethereum;
|
||||
use alloy_provider::{Provider, RootProvider};
|
||||
use alloy_pubsub::SubscriptionStream;
|
||||
use alloy_rpc_client::RpcClient;
|
||||
use alloy_transport_ws::WsConnect;
|
||||
use eyre::Context;
|
||||
use futures::StreamExt;
|
||||
use std::time::Duration;
|
||||
use tracing::{debug, info};
|
||||
|
||||
/// Default `WebSocket` RPC port for reth.
|
||||
const DEFAULT_WS_RPC_PORT: u16 = 8546;
|
||||
use url::Url;
|
||||
|
||||
/// Default timeout for waiting on persistence.
|
||||
pub(crate) const PERSISTENCE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
/// Returns the websocket RPC URL used for the persistence subscription.
|
||||
///
|
||||
/// Preference:
|
||||
/// - If `ws_rpc_url` is provided, use it directly.
|
||||
/// - Otherwise, derive a WS RPC URL from `engine_rpc_url`.
|
||||
///
|
||||
/// The persistence subscription endpoint (`reth_subscribePersistedBlock`) is exposed on
|
||||
/// the regular RPC server (WS port, usually 8546), not on the engine API port (usually 8551).
|
||||
/// Since we may only have the engine URL by default, we convert the scheme
|
||||
/// (http→ws, https→wss) and force the port to 8546.
|
||||
pub(crate) fn derive_ws_rpc_url(
|
||||
ws_rpc_url: Option<&str>,
|
||||
engine_rpc_url: &str,
|
||||
) -> eyre::Result<Url> {
|
||||
if let Some(ws_url) = ws_rpc_url {
|
||||
let parsed: Url = ws_url
|
||||
.parse()
|
||||
.wrap_err_with(|| format!("Failed to parse WebSocket RPC URL: {ws_url}"))?;
|
||||
info!(target: "reth-bench", ws_url = %parsed, "Using provided WebSocket RPC URL");
|
||||
Ok(parsed)
|
||||
} else {
|
||||
let derived = engine_url_to_ws_url(engine_rpc_url)?;
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
engine_url = %engine_rpc_url,
|
||||
%derived,
|
||||
"Derived WebSocket RPC URL from engine RPC URL"
|
||||
);
|
||||
Ok(derived)
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts an engine API URL to the default RPC websocket URL.
|
||||
///
|
||||
/// Transformations:
|
||||
/// - `http` → `ws`
|
||||
/// - `https` → `wss`
|
||||
/// - `ws` / `wss` keep their scheme
|
||||
/// - Port is always set to `8546`, reth's default RPC websocket port.
|
||||
///
|
||||
/// This is used when we only know the engine API URL (typically `:8551`) but
|
||||
/// need to connect to the node's WS RPC endpoint for persistence events.
|
||||
fn engine_url_to_ws_url(engine_url: &str) -> eyre::Result<Url> {
|
||||
let url: Url = engine_url
|
||||
.parse()
|
||||
.wrap_err_with(|| format!("Failed to parse engine RPC URL: {engine_url}"))?;
|
||||
|
||||
let mut ws_url = url.clone();
|
||||
|
||||
match ws_url.scheme() {
|
||||
"http" => ws_url
|
||||
.set_scheme("ws")
|
||||
.map_err(|_| eyre::eyre!("Failed to set WS scheme for URL: {url}"))?,
|
||||
"https" => ws_url
|
||||
.set_scheme("wss")
|
||||
.map_err(|_| eyre::eyre!("Failed to set WSS scheme for URL: {url}"))?,
|
||||
"ws" | "wss" => {}
|
||||
scheme => {
|
||||
return Err(eyre::eyre!(
|
||||
"Unsupported URL scheme '{scheme}' for URL: {url}. Expected http, https, ws, or wss."
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
ws_url
|
||||
.set_port(Some(DEFAULT_WS_RPC_PORT))
|
||||
.map_err(|_| eyre::eyre!("Failed to set port for URL: {url}"))?;
|
||||
|
||||
Ok(ws_url)
|
||||
}
|
||||
|
||||
/// Waits until the persistence subscription reports that `target` has been persisted.
|
||||
///
|
||||
/// Consumes subscription events until `last_persisted >= target`, or returns an error if:
|
||||
/// - the subscription stream ends unexpectedly, or
|
||||
/// - `timeout` elapses before `target` is observed.
|
||||
async fn wait_for_persistence(
|
||||
stream: &mut SubscriptionStream<BlockNumHash>,
|
||||
target: u64,
|
||||
last_persisted: &mut u64,
|
||||
timeout: Duration,
|
||||
) -> eyre::Result<()> {
|
||||
tokio::time::timeout(timeout, async {
|
||||
while *last_persisted < target {
|
||||
match stream.next().await {
|
||||
Some(persisted) => {
|
||||
*last_persisted = persisted.number;
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
persisted_block = ?last_persisted,
|
||||
"Received persistence notification"
|
||||
);
|
||||
}
|
||||
None => {
|
||||
return Err(eyre::eyre!("Persistence subscription closed unexpectedly"));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
eyre::eyre!(
|
||||
"Persistence timeout: target block {} not persisted within {:?}. Last persisted: {}",
|
||||
target,
|
||||
timeout,
|
||||
last_persisted
|
||||
)
|
||||
})?
|
||||
}
|
||||
|
||||
/// Wrapper that keeps both the subscription stream and the underlying provider alive.
|
||||
/// The provider must be kept alive for the subscription to continue receiving events.
|
||||
pub(crate) struct PersistenceSubscription {
|
||||
_provider: RootProvider<Ethereum>,
|
||||
stream: SubscriptionStream<BlockNumHash>,
|
||||
}
|
||||
|
||||
impl PersistenceSubscription {
|
||||
const fn new(
|
||||
provider: RootProvider<Ethereum>,
|
||||
stream: SubscriptionStream<BlockNumHash>,
|
||||
) -> Self {
|
||||
Self { _provider: provider, stream }
|
||||
}
|
||||
|
||||
const fn stream_mut(&mut self) -> &mut SubscriptionStream<BlockNumHash> {
|
||||
&mut self.stream
|
||||
}
|
||||
}
|
||||
|
||||
/// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`.
|
||||
pub(crate) async fn setup_persistence_subscription(
|
||||
ws_url: Url,
|
||||
) -> eyre::Result<PersistenceSubscription> {
|
||||
info!("Connecting to WebSocket at {} for persistence subscription", ws_url);
|
||||
|
||||
let ws_connect = WsConnect::new(ws_url.to_string());
|
||||
let client = RpcClient::connect_pubsub(ws_connect)
|
||||
.await
|
||||
.wrap_err("Failed to connect to WebSocket RPC endpoint")?;
|
||||
let provider: RootProvider<Ethereum> = RootProvider::new(client);
|
||||
|
||||
let subscription = provider
|
||||
.subscribe_to::<BlockNumHash>("reth_subscribePersistedBlock")
|
||||
.await
|
||||
.wrap_err("Failed to subscribe to persistence notifications")?;
|
||||
|
||||
info!("Subscribed to persistence notifications");
|
||||
|
||||
Ok(PersistenceSubscription::new(provider, subscription.into_stream()))
|
||||
}
|
||||
|
||||
/// Encapsulates the block waiting logic.
|
||||
///
|
||||
/// Provides a simple `on_block()` interface that handles both:
|
||||
/// - Fixed duration waits (when `wait_time` is set)
|
||||
/// - Persistence-based waits (when `subscription` is set)
|
||||
///
|
||||
/// For persistence mode, waits after every `(threshold + 1)` blocks.
|
||||
pub(crate) struct PersistenceWaiter {
|
||||
wait_time: Option<Duration>,
|
||||
subscription: Option<PersistenceSubscription>,
|
||||
blocks_sent: u64,
|
||||
last_persisted: u64,
|
||||
threshold: u64,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl PersistenceWaiter {
|
||||
pub(crate) const fn with_duration(wait_time: Duration) -> Self {
|
||||
Self {
|
||||
wait_time: Some(wait_time),
|
||||
subscription: None,
|
||||
blocks_sent: 0,
|
||||
last_persisted: 0,
|
||||
threshold: 0,
|
||||
timeout: Duration::ZERO,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) const fn with_subscription(
|
||||
subscription: PersistenceSubscription,
|
||||
threshold: u64,
|
||||
timeout: Duration,
|
||||
) -> Self {
|
||||
Self {
|
||||
wait_time: None,
|
||||
subscription: Some(subscription),
|
||||
blocks_sent: 0,
|
||||
last_persisted: 0,
|
||||
threshold,
|
||||
timeout,
|
||||
}
|
||||
}
|
||||
|
||||
/// Called once per block. Waits based on the configured mode.
|
||||
#[allow(clippy::manual_is_multiple_of)]
|
||||
pub(crate) async fn on_block(&mut self, block_number: u64) -> eyre::Result<()> {
|
||||
if let Some(wait_time) = self.wait_time {
|
||||
tokio::time::sleep(wait_time).await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let Some(ref mut subscription) = self.subscription else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
self.blocks_sent += 1;
|
||||
|
||||
if self.blocks_sent % (self.threshold + 1) == 0 {
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
target_block = ?block_number,
|
||||
last_persisted = self.last_persisted,
|
||||
blocks_sent = self.blocks_sent,
|
||||
"Waiting for persistence"
|
||||
);
|
||||
|
||||
wait_for_persistence(
|
||||
subscription.stream_mut(),
|
||||
block_number,
|
||||
&mut self.last_persisted,
|
||||
self.timeout,
|
||||
)
|
||||
.await?;
|
||||
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
persisted = self.last_persisted,
|
||||
"Persistence caught up"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::time::Instant;
|
||||
|
||||
#[test]
|
||||
fn test_engine_url_to_ws_url() {
|
||||
// http -> ws, always uses port 8546
|
||||
let result = engine_url_to_ws_url("http://localhost:8551").unwrap();
|
||||
assert_eq!(result.as_str(), "ws://localhost:8546/");
|
||||
|
||||
// https -> wss
|
||||
let result = engine_url_to_ws_url("https://localhost:8551").unwrap();
|
||||
assert_eq!(result.as_str(), "wss://localhost:8546/");
|
||||
|
||||
// Custom engine port still maps to 8546
|
||||
let result = engine_url_to_ws_url("http://localhost:9551").unwrap();
|
||||
assert_eq!(result.port(), Some(8546));
|
||||
|
||||
// Already ws passthrough
|
||||
let result = engine_url_to_ws_url("ws://localhost:8546").unwrap();
|
||||
assert_eq!(result.scheme(), "ws");
|
||||
|
||||
// Invalid inputs
|
||||
assert!(engine_url_to_ws_url("ftp://localhost:8551").is_err());
|
||||
assert!(engine_url_to_ws_url("not a valid url").is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_waiter_with_duration() {
|
||||
let mut waiter = PersistenceWaiter::with_duration(Duration::from_millis(1));
|
||||
|
||||
let start = Instant::now();
|
||||
waiter.on_block(1).await.unwrap();
|
||||
waiter.on_block(2).await.unwrap();
|
||||
waiter.on_block(3).await.unwrap();
|
||||
|
||||
// Should have waited ~3ms total
|
||||
assert!(start.elapsed() >= Duration::from_millis(3));
|
||||
}
|
||||
}
|
||||
@@ -2,10 +2,27 @@
|
||||
//!
|
||||
//! This command reads `ExecutionPayloadEnvelopeV4` files from a directory and replays them
|
||||
//! in sequence using `newPayload` followed by `forkchoiceUpdated`.
|
||||
//!
|
||||
//! Supports configurable waiting behavior:
|
||||
//! - **`--wait-time`**: Fixed sleep interval between blocks.
|
||||
//! - **`--wait-for-persistence`**: Waits for every Nth block to be persisted using the
|
||||
//! `reth_subscribePersistedBlock` subscription, where N matches the engine's persistence
|
||||
//! threshold. This ensures the benchmark doesn't outpace persistence.
|
||||
//!
|
||||
//! Both options can be used together or independently.
|
||||
|
||||
use crate::{
|
||||
authenticated_transport::AuthenticatedTransportConnect,
|
||||
bench::output::GasRampPayloadFile,
|
||||
bench::{
|
||||
output::{
|
||||
write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
|
||||
TotalGasOutput, TotalGasRow,
|
||||
},
|
||||
persistence_waiter::{
|
||||
derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
|
||||
PERSISTENCE_CHECKPOINT_TIMEOUT,
|
||||
},
|
||||
},
|
||||
valid_payload::{call_forkchoice_updated, call_new_payload},
|
||||
};
|
||||
use alloy_primitives::B256;
|
||||
@@ -14,11 +31,16 @@ use alloy_rpc_client::ClientBuilder;
|
||||
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
|
||||
use clap::Parser;
|
||||
use eyre::Context;
|
||||
use reqwest::Url;
|
||||
use humantime::parse_duration;
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
|
||||
use reth_node_api::EngineApiMessageVersion;
|
||||
use std::path::PathBuf;
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::{debug, info};
|
||||
use url::Url;
|
||||
|
||||
/// `reth bench replay-payloads` command
|
||||
///
|
||||
@@ -51,6 +73,42 @@ pub struct Command {
|
||||
/// These are replayed before the main payloads to warm up the gas limit.
|
||||
#[arg(long, value_name = "GAS_RAMP_DIR")]
|
||||
gas_ramp_dir: Option<PathBuf>,
|
||||
|
||||
/// Optional output directory for benchmark results (CSV files).
|
||||
#[arg(long, value_name = "OUTPUT")]
|
||||
output: Option<PathBuf>,
|
||||
|
||||
/// How long to wait after a forkchoice update before sending the next payload.
|
||||
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
|
||||
wait_time: Option<Duration>,
|
||||
|
||||
/// Wait for blocks to be persisted before sending the next batch.
|
||||
///
|
||||
/// When enabled, waits for every Nth block to be persisted using the
|
||||
/// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
|
||||
/// doesn't outpace persistence.
|
||||
///
|
||||
/// The subscription uses the regular RPC websocket endpoint (no JWT required).
|
||||
#[arg(long, default_value = "false", verbatim_doc_comment)]
|
||||
wait_for_persistence: bool,
|
||||
|
||||
/// Engine persistence threshold used for deciding when to wait for persistence.
|
||||
///
|
||||
/// The benchmark waits after every `(threshold + 1)` blocks. By default this
|
||||
/// matches the engine's `DEFAULT_PERSISTENCE_THRESHOLD` (2), so waits occur
|
||||
/// at blocks 3, 6, 9, etc.
|
||||
#[arg(
|
||||
long = "persistence-threshold",
|
||||
value_name = "PERSISTENCE_THRESHOLD",
|
||||
default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
|
||||
verbatim_doc_comment
|
||||
)]
|
||||
persistence_threshold: u64,
|
||||
|
||||
/// Optional `WebSocket` RPC URL for persistence subscription.
|
||||
/// If not provided, derives from engine RPC URL by changing scheme to ws and port to 8546.
|
||||
#[arg(long, value_name = "WS_RPC_URL", verbatim_doc_comment)]
|
||||
ws_rpc_url: Option<String>,
|
||||
}
|
||||
|
||||
/// A loaded payload ready for execution.
|
||||
@@ -78,6 +136,33 @@ impl Command {
|
||||
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
|
||||
info!(payload_dir = %self.payload_dir.display(), "Replaying payloads");
|
||||
|
||||
// Log mode configuration
|
||||
if let Some(duration) = self.wait_time {
|
||||
info!("Using wait-time mode with {}ms delay between blocks", duration.as_millis());
|
||||
}
|
||||
if self.wait_for_persistence {
|
||||
info!(
|
||||
"Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
|
||||
self.persistence_threshold + 1,
|
||||
self.persistence_threshold
|
||||
);
|
||||
}
|
||||
|
||||
// Set up waiter based on configured options (duration takes precedence)
|
||||
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
|
||||
(Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)),
|
||||
(None, true) => {
|
||||
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
Some(PersistenceWaiter::with_subscription(
|
||||
sub,
|
||||
self.persistence_threshold,
|
||||
PERSISTENCE_CHECKPOINT_TIMEOUT,
|
||||
))
|
||||
}
|
||||
(None, false) => None,
|
||||
};
|
||||
|
||||
// Set up authenticated engine provider
|
||||
let jwt =
|
||||
std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
|
||||
@@ -144,6 +229,11 @@ impl Command {
|
||||
call_forkchoice_updated(&auth_provider, payload.version, fcu_state, None).await?;
|
||||
|
||||
info!(gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
|
||||
|
||||
if let Some(w) = &mut waiter {
|
||||
w.on_block(payload.block_number).await?;
|
||||
}
|
||||
|
||||
parent_hash = payload.file.block_hash;
|
||||
}
|
||||
|
||||
@@ -151,22 +241,112 @@ impl Command {
|
||||
info!(count = gas_ramp_payloads.len(), "All gas ramp payloads replayed");
|
||||
}
|
||||
|
||||
let mut results = Vec::new();
|
||||
let total_benchmark_duration = Instant::now();
|
||||
|
||||
for (i, payload) in payloads.iter().enumerate() {
|
||||
info!(
|
||||
let envelope = &payload.envelope;
|
||||
let block_hash = payload.block_hash;
|
||||
let execution_payload = &envelope.envelope_inner.execution_payload;
|
||||
let inner_payload = &execution_payload.payload_inner.payload_inner;
|
||||
|
||||
let gas_used = inner_payload.gas_used;
|
||||
let gas_limit = inner_payload.gas_limit;
|
||||
let block_number = inner_payload.block_number;
|
||||
let transaction_count =
|
||||
execution_payload.payload_inner.payload_inner.transactions.len() as u64;
|
||||
|
||||
debug!(
|
||||
payload = i + 1,
|
||||
total = payloads.len(),
|
||||
index = payload.index,
|
||||
block_hash = %payload.block_hash,
|
||||
block_hash = %block_hash,
|
||||
"Executing payload (newPayload + FCU)"
|
||||
);
|
||||
|
||||
self.execute_payload_v4(&auth_provider, &payload.envelope, parent_hash).await?;
|
||||
let start = Instant::now();
|
||||
|
||||
info!(payload = i + 1, "Payload executed successfully");
|
||||
parent_hash = payload.block_hash;
|
||||
debug!(
|
||||
method = "engine_newPayloadV4",
|
||||
block_hash = %block_hash,
|
||||
"Sending newPayload"
|
||||
);
|
||||
|
||||
let status = auth_provider
|
||||
.new_payload_v4(
|
||||
execution_payload.clone(),
|
||||
vec![],
|
||||
B256::ZERO,
|
||||
envelope.execution_requests.to_vec(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
|
||||
|
||||
if !status.is_valid() {
|
||||
return Err(eyre::eyre!("Payload rejected: {:?}", status));
|
||||
}
|
||||
|
||||
let fcu_state = ForkchoiceState {
|
||||
head_block_hash: block_hash,
|
||||
safe_block_hash: parent_hash,
|
||||
finalized_block_hash: parent_hash,
|
||||
};
|
||||
|
||||
debug!(method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
|
||||
|
||||
let fcu_result = auth_provider.fork_choice_updated_v3(fcu_state, None).await?;
|
||||
|
||||
let total_latency = start.elapsed();
|
||||
let fcu_latency = total_latency - new_payload_result.latency;
|
||||
|
||||
let combined_result = CombinedResult {
|
||||
block_number,
|
||||
gas_limit,
|
||||
transaction_count,
|
||||
new_payload_result,
|
||||
fcu_latency,
|
||||
total_latency,
|
||||
};
|
||||
|
||||
let current_duration = total_benchmark_duration.elapsed();
|
||||
info!(%combined_result);
|
||||
|
||||
if let Some(w) = &mut waiter {
|
||||
w.on_block(block_number).await?;
|
||||
}
|
||||
|
||||
let gas_row =
|
||||
TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
|
||||
results.push((gas_row, combined_result));
|
||||
|
||||
debug!(?status, ?fcu_result, "Payload executed successfully");
|
||||
parent_hash = block_hash;
|
||||
}
|
||||
|
||||
info!(count = payloads.len(), "All payloads replayed successfully");
|
||||
// Drop waiter - we don't need to wait for final blocks to persist
|
||||
// since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence.
|
||||
drop(waiter);
|
||||
|
||||
let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
|
||||
results.into_iter().unzip();
|
||||
|
||||
if let Some(ref path) = self.output {
|
||||
write_benchmark_results(path, &gas_output_results, &combined_results)?;
|
||||
}
|
||||
|
||||
let gas_output =
|
||||
TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
|
||||
info!(
|
||||
total_gas_used = gas_output.total_gas_used,
|
||||
total_duration = ?gas_output.total_duration,
|
||||
execution_duration = ?gas_output.execution_duration,
|
||||
blocks_processed = gas_output.blocks_processed,
|
||||
wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
|
||||
execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
|
||||
"Benchmark complete"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -284,49 +464,4 @@ impl Command {
|
||||
|
||||
Ok(payloads)
|
||||
}
|
||||
|
||||
async fn execute_payload_v4(
|
||||
&self,
|
||||
provider: &RootProvider<AnyNetwork>,
|
||||
envelope: &ExecutionPayloadEnvelopeV4,
|
||||
parent_hash: B256,
|
||||
) -> eyre::Result<()> {
|
||||
let block_hash =
|
||||
envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
|
||||
|
||||
debug!(
|
||||
method = "engine_newPayloadV4",
|
||||
block_hash = %block_hash,
|
||||
"Sending newPayload"
|
||||
);
|
||||
|
||||
let status = provider
|
||||
.new_payload_v4(
|
||||
envelope.envelope_inner.execution_payload.clone(),
|
||||
vec![],
|
||||
B256::ZERO,
|
||||
envelope.execution_requests.to_vec(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!(?status, "newPayloadV4 response");
|
||||
|
||||
if !status.is_valid() {
|
||||
return Err(eyre::eyre!("Payload rejected: {:?}", status));
|
||||
}
|
||||
|
||||
let fcu_state = ForkchoiceState {
|
||||
head_block_hash: block_hash,
|
||||
safe_block_hash: parent_hash,
|
||||
finalized_block_hash: parent_hash,
|
||||
};
|
||||
|
||||
debug!(method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
|
||||
|
||||
let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?;
|
||||
|
||||
info!(?fcu_result, "forkchoiceUpdatedV3 response");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
mod invalidation;
|
||||
use invalidation::InvalidationConfig;
|
||||
|
||||
use super::helpers::{load_jwt_secret, read_input};
|
||||
use alloy_primitives::{Address, B256};
|
||||
use alloy_provider::network::AnyRpcBlock;
|
||||
use alloy_rpc_types_engine::ExecutionPayload;
|
||||
@@ -10,7 +11,7 @@ use clap::Parser;
|
||||
use eyre::{OptionExt, Result};
|
||||
use op_alloy_consensus::OpTxEnvelope;
|
||||
use reth_cli_runner::CliContext;
|
||||
use std::io::{BufReader, Read, Write};
|
||||
use std::io::Write;
|
||||
|
||||
/// Command for generating and sending an invalid `engine_newPayload` request.
|
||||
///
|
||||
@@ -180,27 +181,6 @@ enum Mode {
|
||||
}
|
||||
|
||||
impl Command {
|
||||
/// Read input from either a file or stdin
|
||||
fn read_input(&self) -> Result<String> {
|
||||
Ok(match &self.path {
|
||||
Some(path) => reth_fs_util::read_to_string(path)?,
|
||||
None => String::from_utf8(
|
||||
BufReader::new(std::io::stdin()).bytes().collect::<Result<Vec<_>, _>>()?,
|
||||
)?,
|
||||
})
|
||||
}
|
||||
|
||||
/// Load JWT secret from either a file or use the provided string directly
|
||||
fn load_jwt_secret(&self) -> Result<Option<String>> {
|
||||
match &self.jwt_secret {
|
||||
Some(secret) => match std::fs::read_to_string(secret) {
|
||||
Ok(contents) => Ok(Some(contents.trim().to_string())),
|
||||
Err(_) => Ok(Some(secret.clone())),
|
||||
},
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Build `InvalidationConfig` from command flags
|
||||
const fn build_invalidation_config(&self) -> InvalidationConfig {
|
||||
InvalidationConfig {
|
||||
@@ -236,8 +216,8 @@ impl Command {
|
||||
|
||||
/// Execute the command
|
||||
pub async fn execute(self, _ctx: CliContext) -> Result<()> {
|
||||
let block_json = self.read_input()?;
|
||||
let jwt_secret = self.load_jwt_secret()?;
|
||||
let block_json = read_input(self.path.as_deref())?;
|
||||
let jwt_secret = load_jwt_secret(self.jwt_secret.as_deref())?;
|
||||
|
||||
let block = serde_json::from_str::<AnyRpcBlock>(&block_json)?
|
||||
.into_inner()
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
use super::helpers::{load_jwt_secret, read_input};
|
||||
use alloy_provider::network::AnyRpcBlock;
|
||||
use alloy_rpc_types_engine::ExecutionPayload;
|
||||
use clap::Parser;
|
||||
use eyre::{OptionExt, Result};
|
||||
use op_alloy_consensus::OpTxEnvelope;
|
||||
use reth_cli_runner::CliContext;
|
||||
use std::io::{BufReader, Read, Write};
|
||||
use std::io::Write;
|
||||
|
||||
/// Command for generating and sending an `engine_newPayload` request constructed from an RPC
|
||||
/// block.
|
||||
@@ -51,38 +52,13 @@ enum Mode {
|
||||
}
|
||||
|
||||
impl Command {
|
||||
/// Read input from either a file or stdin
|
||||
fn read_input(&self) -> Result<String> {
|
||||
Ok(match &self.path {
|
||||
Some(path) => reth_fs_util::read_to_string(path)?,
|
||||
None => String::from_utf8(
|
||||
BufReader::new(std::io::stdin()).bytes().collect::<Result<Vec<_>, _>>()?,
|
||||
)?,
|
||||
})
|
||||
}
|
||||
|
||||
/// Load JWT secret from either a file or use the provided string directly
|
||||
fn load_jwt_secret(&self) -> Result<Option<String>> {
|
||||
match &self.jwt_secret {
|
||||
Some(secret) => {
|
||||
// Try to read as file first
|
||||
match std::fs::read_to_string(secret) {
|
||||
Ok(contents) => Ok(Some(contents.trim().to_string())),
|
||||
// If file read fails, use the string directly
|
||||
Err(_) => Ok(Some(secret.clone())),
|
||||
}
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute the generate payload command
|
||||
pub async fn execute(self, _ctx: CliContext) -> Result<()> {
|
||||
// Load block
|
||||
let block_json = self.read_input()?;
|
||||
let block_json = read_input(self.path.as_deref())?;
|
||||
|
||||
// Load JWT secret
|
||||
let jwt_secret = self.load_jwt_secret()?;
|
||||
let jwt_secret = load_jwt_secret(self.jwt_secret.as_deref())?;
|
||||
|
||||
// Parse the block
|
||||
let block = serde_json::from_str::<AnyRpcBlock>(&block_json)?
|
||||
|
||||
@@ -260,7 +260,9 @@ pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
|
||||
while !status.is_valid() {
|
||||
if status.is_invalid() {
|
||||
error!(?status, ?params, "Invalid {method}",);
|
||||
panic!("Invalid {method}: {status:?}");
|
||||
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(std::io::Error::other(
|
||||
format!("Invalid {method}: {status:?}"),
|
||||
))))
|
||||
}
|
||||
if status.is_syncing() {
|
||||
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
|
||||
|
||||
@@ -78,6 +78,7 @@ lz4.workspace = true
|
||||
zstd.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
parking_lot.workspace = true
|
||||
tar.workspace = true
|
||||
tracing.workspace = true
|
||||
backon.workspace = true
|
||||
|
||||
@@ -17,6 +17,7 @@ mod get;
|
||||
mod list;
|
||||
mod repair_trie;
|
||||
mod settings;
|
||||
mod state;
|
||||
mod static_file_header;
|
||||
mod stats;
|
||||
/// DB List TUI
|
||||
@@ -65,6 +66,8 @@ pub enum Subcommands {
|
||||
Settings(settings::Command),
|
||||
/// Gets storage size information for an account
|
||||
AccountStorage(account_storage::Command),
|
||||
/// Gets account state and storage at a specific block
|
||||
State(state::Command),
|
||||
}
|
||||
|
||||
/// Initializes a provider factory with specified access rights, and then execute with the provided
|
||||
@@ -198,6 +201,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
|
||||
command.execute(&tool)?;
|
||||
});
|
||||
}
|
||||
Subcommands::State(command) => {
|
||||
db_exec!(self.env, tool, N, AccessRights::RO, {
|
||||
command.execute(&tool)?;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
412
crates/cli/commands/src/db/state.rs
Normal file
412
crates/cli/commands/src/db/state.rs
Normal file
@@ -0,0 +1,412 @@
|
||||
use alloy_primitives::{Address, BlockNumber, B256, U256};
|
||||
use clap::Parser;
|
||||
use parking_lot::Mutex;
|
||||
use reth_db_api::{
|
||||
cursor::{DbCursorRO, DbDupCursorRO},
|
||||
database::Database,
|
||||
tables,
|
||||
transaction::DbTx,
|
||||
};
|
||||
use reth_db_common::DbTool;
|
||||
use reth_node_builder::NodeTypesWithDB;
|
||||
use reth_provider::providers::ProviderNodeTypes;
|
||||
use reth_storage_api::{BlockNumReader, StateProvider, StorageSettingsCache};
|
||||
use std::{
|
||||
collections::BTreeSet,
|
||||
thread,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::{error, info};
|
||||
|
||||
/// Log progress every 5 seconds
|
||||
const LOG_INTERVAL: Duration = Duration::from_secs(30);
|
||||
|
||||
/// The arguments for the `reth db state` command
|
||||
#[derive(Parser, Debug)]
|
||||
pub struct Command {
|
||||
/// The account address to get state for
|
||||
address: Address,
|
||||
|
||||
/// Block number to query state at (uses current state if not provided)
|
||||
#[arg(long, short)]
|
||||
block: Option<BlockNumber>,
|
||||
|
||||
/// Maximum number of storage slots to display
|
||||
#[arg(long, short, default_value = "100")]
|
||||
limit: usize,
|
||||
|
||||
/// Output format (table, json, csv)
|
||||
#[arg(long, short, default_value = "table")]
|
||||
format: OutputFormat,
|
||||
}
|
||||
|
||||
impl Command {
|
||||
/// Execute `db state` command
|
||||
pub fn execute<N: NodeTypesWithDB + ProviderNodeTypes>(
|
||||
self,
|
||||
tool: &DbTool<N>,
|
||||
) -> eyre::Result<()> {
|
||||
let address = self.address;
|
||||
let limit = self.limit;
|
||||
|
||||
if let Some(block) = self.block {
|
||||
self.execute_historical(tool, address, block, limit)
|
||||
} else {
|
||||
self.execute_current(tool, address, limit)
|
||||
}
|
||||
}
|
||||
|
||||
fn execute_current<N: NodeTypesWithDB + ProviderNodeTypes>(
|
||||
&self,
|
||||
tool: &DbTool<N>,
|
||||
address: Address,
|
||||
limit: usize,
|
||||
) -> eyre::Result<()> {
|
||||
let entries = tool.provider_factory.db_ref().view(|tx| {
|
||||
// Get account info
|
||||
let account = tx.get::<tables::PlainAccountState>(address)?;
|
||||
|
||||
// Get storage entries
|
||||
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
|
||||
let mut entries = Vec::new();
|
||||
let mut last_log = Instant::now();
|
||||
|
||||
let walker = cursor.walk_dup(Some(address), None)?;
|
||||
for (idx, entry) in walker.enumerate() {
|
||||
let (_, storage_entry) = entry?;
|
||||
|
||||
if storage_entry.value != U256::ZERO {
|
||||
entries.push((storage_entry.key, storage_entry.value));
|
||||
}
|
||||
|
||||
if entries.len() >= limit {
|
||||
break;
|
||||
}
|
||||
|
||||
if last_log.elapsed() >= LOG_INTERVAL {
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
address = %address,
|
||||
slots_scanned = idx,
|
||||
"Scanning storage slots"
|
||||
);
|
||||
last_log = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
Ok::<_, eyre::Report>((account, entries))
|
||||
})??;
|
||||
|
||||
let (account, storage_entries) = entries;
|
||||
|
||||
self.print_results(address, None, account, &storage_entries);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn execute_historical<N: NodeTypesWithDB + ProviderNodeTypes>(
|
||||
&self,
|
||||
tool: &DbTool<N>,
|
||||
address: Address,
|
||||
block: BlockNumber,
|
||||
limit: usize,
|
||||
) -> eyre::Result<()> {
|
||||
let provider = tool.provider_factory.history_by_block_number(block)?;
|
||||
|
||||
// Get account info at that block
|
||||
let account = provider.basic_account(&address)?;
|
||||
|
||||
// Check storage settings to determine where history is stored
|
||||
let storage_settings = tool.provider_factory.cached_storage_settings();
|
||||
let history_in_rocksdb = storage_settings.storages_history_in_rocksdb;
|
||||
|
||||
// For historical queries, enumerate keys from history indices only
|
||||
// (not PlainStorageState, which reflects current state)
|
||||
let mut storage_keys = BTreeSet::new();
|
||||
|
||||
if history_in_rocksdb {
|
||||
error!(
|
||||
target: "reth::cli",
|
||||
"Historical storage queries with RocksDB backend are not yet supported. \
|
||||
Use MDBX for storage history or query current state without --block."
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Collect keys from MDBX StorageChangeSets using parallel scanning
|
||||
self.collect_mdbx_storage_keys_parallel(tool, address, &mut storage_keys)?;
|
||||
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
address = %address,
|
||||
block = block,
|
||||
total_keys = storage_keys.len(),
|
||||
"Found storage keys to query"
|
||||
);
|
||||
|
||||
// Now query each key at the historical block using the StateProvider
|
||||
// This handles both MDBX and RocksDB backends transparently
|
||||
let mut entries = Vec::new();
|
||||
let mut last_log = Instant::now();
|
||||
|
||||
for (idx, key) in storage_keys.iter().enumerate() {
|
||||
match provider.storage(address, *key) {
|
||||
Ok(Some(value)) if value != U256::ZERO => {
|
||||
entries.push((*key, value));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if entries.len() >= limit {
|
||||
break;
|
||||
}
|
||||
|
||||
if last_log.elapsed() >= LOG_INTERVAL {
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
address = %address,
|
||||
block = block,
|
||||
keys_total = storage_keys.len(),
|
||||
slots_scanned = idx,
|
||||
slots_found = entries.len(),
|
||||
"Scanning historical storage slots"
|
||||
);
|
||||
last_log = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
self.print_results(address, Some(block), account, &entries);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Collects storage keys from MDBX StorageChangeSets using parallel block range scanning.
|
||||
fn collect_mdbx_storage_keys_parallel<N: NodeTypesWithDB + ProviderNodeTypes>(
|
||||
&self,
|
||||
tool: &DbTool<N>,
|
||||
address: Address,
|
||||
keys: &mut BTreeSet<B256>,
|
||||
) -> eyre::Result<()> {
|
||||
const CHUNK_SIZE: u64 = 500_000; // 500k blocks per thread
|
||||
let num_threads = std::thread::available_parallelism()
|
||||
.map(|p| p.get().saturating_sub(1).max(1))
|
||||
.unwrap_or(4);
|
||||
|
||||
// Get the current tip block
|
||||
let tip = tool.provider_factory.provider()?.best_block_number()?;
|
||||
|
||||
if tip == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
address = %address,
|
||||
tip,
|
||||
chunk_size = CHUNK_SIZE,
|
||||
num_threads,
|
||||
"Starting parallel MDBX changeset scan"
|
||||
);
|
||||
|
||||
// Shared state for collecting keys
|
||||
let collected_keys: Mutex<BTreeSet<B256>> = Mutex::new(BTreeSet::new());
|
||||
let total_entries_scanned = Mutex::new(0usize);
|
||||
|
||||
// Create chunk ranges
|
||||
let mut chunks: Vec<(u64, u64)> = Vec::new();
|
||||
let mut start = 0u64;
|
||||
while start <= tip {
|
||||
let end = (start + CHUNK_SIZE - 1).min(tip);
|
||||
chunks.push((start, end));
|
||||
start = end + 1;
|
||||
}
|
||||
|
||||
let chunks_ref = &chunks;
|
||||
let next_chunk = Mutex::new(0usize);
|
||||
let next_chunk_ref = &next_chunk;
|
||||
let collected_keys_ref = &collected_keys;
|
||||
let total_entries_ref = &total_entries_scanned;
|
||||
|
||||
thread::scope(|s| {
|
||||
let handles: Vec<_> = (0..num_threads)
|
||||
.map(|thread_id| {
|
||||
s.spawn(move || {
|
||||
loop {
|
||||
// Get next chunk to process
|
||||
let chunk_idx = {
|
||||
let mut idx = next_chunk_ref.lock();
|
||||
if *idx >= chunks_ref.len() {
|
||||
return Ok::<_, eyre::Report>(());
|
||||
}
|
||||
let current = *idx;
|
||||
*idx += 1;
|
||||
current
|
||||
};
|
||||
|
||||
let (chunk_start, chunk_end) = chunks_ref[chunk_idx];
|
||||
|
||||
// Open a new read transaction for this chunk
|
||||
tool.provider_factory.db_ref().view(|tx| {
|
||||
tx.disable_long_read_transaction_safety();
|
||||
|
||||
let mut changeset_cursor =
|
||||
tx.cursor_read::<tables::StorageChangeSets>()?;
|
||||
let start_key =
|
||||
reth_db_api::models::BlockNumberAddress((chunk_start, address));
|
||||
let end_key =
|
||||
reth_db_api::models::BlockNumberAddress((chunk_end, address));
|
||||
|
||||
let mut local_keys = BTreeSet::new();
|
||||
let mut entries_in_chunk = 0usize;
|
||||
|
||||
if let Ok(walker) = changeset_cursor.walk_range(start_key..=end_key)
|
||||
{
|
||||
for (block_addr, storage_entry) in walker.flatten() {
|
||||
if block_addr.address() == address {
|
||||
local_keys.insert(storage_entry.key);
|
||||
}
|
||||
entries_in_chunk += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Merge into global state
|
||||
collected_keys_ref.lock().extend(local_keys);
|
||||
*total_entries_ref.lock() += entries_in_chunk;
|
||||
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
thread_id,
|
||||
chunk_start,
|
||||
chunk_end,
|
||||
entries_in_chunk,
|
||||
"Thread completed chunk"
|
||||
);
|
||||
|
||||
Ok::<_, eyre::Report>(())
|
||||
})??;
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
for handle in handles {
|
||||
handle.join().map_err(|_| eyre::eyre!("Thread panicked"))??;
|
||||
}
|
||||
|
||||
Ok::<_, eyre::Report>(())
|
||||
})?;
|
||||
|
||||
let final_keys = collected_keys.into_inner();
|
||||
let total = *total_entries_scanned.lock();
|
||||
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
address = %address,
|
||||
total_entries = total,
|
||||
unique_keys = final_keys.len(),
|
||||
"Finished parallel MDBX changeset scan"
|
||||
);
|
||||
|
||||
keys.extend(final_keys);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_results(
|
||||
&self,
|
||||
address: Address,
|
||||
block: Option<BlockNumber>,
|
||||
account: Option<reth_primitives_traits::Account>,
|
||||
storage: &[(alloy_primitives::B256, U256)],
|
||||
) {
|
||||
match self.format {
|
||||
OutputFormat::Table => {
|
||||
println!("Account: {address}");
|
||||
if let Some(b) = block {
|
||||
println!("Block: {b}");
|
||||
} else {
|
||||
println!("Block: latest");
|
||||
}
|
||||
println!();
|
||||
|
||||
if let Some(acc) = account {
|
||||
println!("Nonce: {}", acc.nonce);
|
||||
println!("Balance: {} wei", acc.balance);
|
||||
if let Some(code_hash) = acc.bytecode_hash {
|
||||
println!("Code hash: {code_hash}");
|
||||
}
|
||||
} else {
|
||||
println!("Account not found");
|
||||
}
|
||||
|
||||
println!();
|
||||
println!("Storage ({} slots):", storage.len());
|
||||
println!("{:-<130}", "");
|
||||
println!("{:<66} | {:<64}", "Slot", "Value");
|
||||
println!("{:-<130}", "");
|
||||
for (key, value) in storage {
|
||||
println!("{key} | {value:#066x}");
|
||||
}
|
||||
}
|
||||
OutputFormat::Json => {
|
||||
let output = serde_json::json!({
|
||||
"address": address.to_string(),
|
||||
"block": block,
|
||||
"account": account.map(|a| serde_json::json!({
|
||||
"nonce": a.nonce,
|
||||
"balance": a.balance.to_string(),
|
||||
"code_hash": a.bytecode_hash.map(|h| h.to_string()),
|
||||
})),
|
||||
"storage": storage.iter().map(|(k, v)| {
|
||||
serde_json::json!({
|
||||
"key": k.to_string(),
|
||||
"value": format!("{v:#066x}"),
|
||||
})
|
||||
}).collect::<Vec<_>>(),
|
||||
});
|
||||
println!("{}", serde_json::to_string_pretty(&output).unwrap());
|
||||
}
|
||||
OutputFormat::Csv => {
|
||||
println!("slot,value");
|
||||
for (key, value) in storage {
|
||||
println!("{key},{value:#066x}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, clap::ValueEnum)]
|
||||
pub enum OutputFormat {
|
||||
#[default]
|
||||
Table,
|
||||
Json,
|
||||
Csv,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parse_state_args() {
|
||||
let cmd = Command::try_parse_from([
|
||||
"state",
|
||||
"0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045",
|
||||
"--block",
|
||||
"1000000",
|
||||
])
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
cmd.address,
|
||||
"0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045".parse::<Address>().unwrap()
|
||||
);
|
||||
assert_eq!(cmd.block, Some(1000000));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_state_args_no_block() {
|
||||
let cmd = Command::try_parse_from(["state", "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"])
|
||||
.unwrap();
|
||||
assert_eq!(cmd.block, None);
|
||||
}
|
||||
}
|
||||
@@ -1,8 +1,16 @@
|
||||
//! Command that runs pruning without any limits.
|
||||
use crate::common::{AccessRights, CliNodeTypes, EnvironmentArgs};
|
||||
use clap::Parser;
|
||||
use reth_chainspec::{EthChainSpec, EthereumHardforks};
|
||||
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
|
||||
use reth_cli::chainspec::ChainSpecParser;
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_node_builder::common::metrics_hooks;
|
||||
use reth_node_core::{args::MetricArgs, version::version_metadata};
|
||||
use reth_node_metrics::{
|
||||
chain::ChainSpecInfo,
|
||||
server::{MetricServer, MetricServerConfig},
|
||||
version::VersionInfo,
|
||||
};
|
||||
use reth_prune::PrunerBuilder;
|
||||
use reth_static_file::StaticFileProducer;
|
||||
use std::sync::Arc;
|
||||
@@ -13,14 +21,42 @@ use tracing::info;
|
||||
pub struct PruneCommand<C: ChainSpecParser> {
|
||||
#[command(flatten)]
|
||||
env: EnvironmentArgs<C>,
|
||||
|
||||
/// Prometheus metrics configuration.
|
||||
#[command(flatten)]
|
||||
metrics: MetricArgs,
|
||||
}
|
||||
|
||||
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneCommand<C> {
|
||||
/// Execute the `prune` command
|
||||
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(self) -> eyre::Result<()> {
|
||||
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
|
||||
self,
|
||||
ctx: CliContext,
|
||||
) -> eyre::Result<()> {
|
||||
let env = self.env.init::<N>(AccessRights::RW)?;
|
||||
let provider_factory = env.provider_factory;
|
||||
let config = env.config.prune;
|
||||
let data_dir = env.data_dir;
|
||||
|
||||
if let Some(listen_addr) = self.metrics.prometheus {
|
||||
let config = MetricServerConfig::new(
|
||||
listen_addr,
|
||||
VersionInfo {
|
||||
version: version_metadata().cargo_pkg_version.as_ref(),
|
||||
build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
|
||||
cargo_features: version_metadata().vergen_cargo_features.as_ref(),
|
||||
git_sha: version_metadata().vergen_git_sha.as_ref(),
|
||||
target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
|
||||
build_profile: version_metadata().build_profile_name.as_ref(),
|
||||
},
|
||||
ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
|
||||
ctx.task_executor,
|
||||
metrics_hooks(&provider_factory),
|
||||
data_dir.pprof_dumps(),
|
||||
);
|
||||
|
||||
MetricServer::new(config).serve().await?;
|
||||
}
|
||||
|
||||
// Copy data from database to static files
|
||||
info!(target: "reth::cli", "Copying data from database to static files...");
|
||||
|
||||
@@ -38,12 +38,8 @@ pub trait BlockProvider: Send + Sync + 'static {
|
||||
offset: usize,
|
||||
) -> impl Future<Output = eyre::Result<B256>> + Send {
|
||||
async move {
|
||||
let stored_hash = previous_block_hashes
|
||||
.len()
|
||||
.checked_sub(offset)
|
||||
.and_then(|index| previous_block_hashes.get(index));
|
||||
if let Some(hash) = stored_hash {
|
||||
return Ok(*hash);
|
||||
if let Some(hash) = get_hash_at_offset(previous_block_hashes, offset) {
|
||||
return Ok(hash);
|
||||
}
|
||||
|
||||
// Return zero hash if the chain isn't long enough to have the block at the offset.
|
||||
@@ -83,7 +79,7 @@ where
|
||||
/// Spawn the client to start sending FCUs and new payloads by periodically fetching recent
|
||||
/// blocks.
|
||||
pub async fn run(self) {
|
||||
let mut previous_block_hashes = AllocRingBuffer::new(64);
|
||||
let mut previous_block_hashes = AllocRingBuffer::new(65);
|
||||
let mut block_stream = {
|
||||
let (tx, rx) = mpsc::channel::<P::Block>(64);
|
||||
let block_provider = self.block_provider.clone();
|
||||
@@ -142,3 +138,60 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Looks up a block hash from the ring buffer at the given offset from the most recent entry.
|
||||
///
|
||||
/// Returns `None` if the buffer doesn't have enough entries to satisfy the offset.
|
||||
fn get_hash_at_offset(buffer: &AllocRingBuffer<B256>, offset: usize) -> Option<B256> {
|
||||
buffer.len().checked_sub(offset + 1).and_then(|index| buffer.get(index).copied())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_get_hash_at_offset() {
|
||||
let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
|
||||
|
||||
// Empty buffer returns None for any offset
|
||||
assert_eq!(get_hash_at_offset(&buffer, 0), None);
|
||||
assert_eq!(get_hash_at_offset(&buffer, 1), None);
|
||||
|
||||
// Push hashes 0..65
|
||||
for i in 0..65u8 {
|
||||
buffer.push(B256::with_last_byte(i));
|
||||
}
|
||||
|
||||
// offset=0 should return the most recent (64)
|
||||
assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(64)));
|
||||
|
||||
// offset=32 (safe block) should return hash 32
|
||||
assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(32)));
|
||||
|
||||
// offset=64 (finalized block) should return hash 0 (the oldest)
|
||||
assert_eq!(get_hash_at_offset(&buffer, 64), Some(B256::with_last_byte(0)));
|
||||
|
||||
// offset=65 exceeds buffer, should return None
|
||||
assert_eq!(get_hash_at_offset(&buffer, 65), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_hash_at_offset_insufficient_entries() {
|
||||
let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
|
||||
|
||||
// With only 1 entry, only offset=0 works
|
||||
buffer.push(B256::with_last_byte(1));
|
||||
assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(1)));
|
||||
assert_eq!(get_hash_at_offset(&buffer, 1), None);
|
||||
assert_eq!(get_hash_at_offset(&buffer, 32), None);
|
||||
assert_eq!(get_hash_at_offset(&buffer, 64), None);
|
||||
|
||||
// With 33 entries, offset=32 works but offset=64 doesn't
|
||||
for i in 2..=33u8 {
|
||||
buffer.push(B256::with_last_byte(i));
|
||||
}
|
||||
assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(1)));
|
||||
assert_eq!(get_hash_at_offset(&buffer, 64), None);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -469,3 +469,123 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reorg with `RocksDB`: verifies that unwind correctly reads changesets from
|
||||
/// storage-aware locations (static files vs MDBX) rather than directly from MDBX.
|
||||
///
|
||||
/// This test exercises `unwind_trie_state_from` which previously failed with
|
||||
/// `UnsortedInput` errors because it read changesets directly from MDBX tables
|
||||
/// instead of using storage-aware methods that check `storage_changesets_in_static_files`.
|
||||
#[tokio::test]
|
||||
async fn test_rocksdb_reorg_unwind() -> Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
)
|
||||
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
assert_eq!(nodes.len(), 1);
|
||||
|
||||
// Use two separate wallets to avoid nonce conflicts during reorg
|
||||
let wallets = wallet::Wallet::new(2).with_chain_id(chain_id).wallet_gen();
|
||||
let signer1 = wallets[0].clone();
|
||||
let signer2 = wallets[1].clone();
|
||||
let client = nodes[0].rpc_client().expect("RPC client");
|
||||
|
||||
// Mine block 1 with a transaction from signer1
|
||||
let raw_tx1 =
|
||||
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer1.clone(), 0).await;
|
||||
let tx_hash1 = nodes[0].rpc.inject_tx(raw_tx1).await?;
|
||||
wait_for_pending_tx(&client, tx_hash1).await;
|
||||
|
||||
let payload1 = nodes[0].advance_block().await?;
|
||||
let block1_hash = payload1.block().hash();
|
||||
assert_eq!(payload1.block().number(), 1);
|
||||
|
||||
// Poll until tx1 appears in RocksDB (ensures persistence happened)
|
||||
let tx_number1 = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash1).await;
|
||||
assert_eq!(tx_number1, 0, "First tx should have tx_number 0");
|
||||
|
||||
// Mine block 2 with transaction from signer1 (nonce 1)
|
||||
let raw_tx2 =
|
||||
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer1.clone(), 1).await;
|
||||
let tx_hash2 = nodes[0].rpc.inject_tx(raw_tx2).await?;
|
||||
wait_for_pending_tx(&client, tx_hash2).await;
|
||||
|
||||
let payload2 = nodes[0].advance_block().await?;
|
||||
assert_eq!(payload2.block().number(), 2);
|
||||
|
||||
// Poll until tx2 appears in RocksDB
|
||||
let tx_number2 = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash2).await;
|
||||
assert_eq!(tx_number2, 1, "Second tx should have tx_number 1");
|
||||
|
||||
// Mine block 3 with transaction from signer1 (nonce 2)
|
||||
let raw_tx3 =
|
||||
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer1.clone(), 2).await;
|
||||
let tx_hash3 = nodes[0].rpc.inject_tx(raw_tx3).await?;
|
||||
wait_for_pending_tx(&client, tx_hash3).await;
|
||||
|
||||
let payload3 = nodes[0].advance_block().await?;
|
||||
assert_eq!(payload3.block().number(), 3);
|
||||
|
||||
// Poll until tx3 appears in RocksDB
|
||||
let tx_number3 = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash3).await;
|
||||
assert_eq!(tx_number3, 2, "Third tx should have tx_number 2");
|
||||
|
||||
// Now create an alternate block 2 using signer2 (different wallet, avoids nonce conflict)
|
||||
// Inject a tx from signer2 (nonce 0) before building the alternate block
|
||||
let raw_alt_tx =
|
||||
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer2.clone(), 0).await;
|
||||
let alt_tx_hash = nodes[0].rpc.inject_tx(raw_alt_tx).await?;
|
||||
wait_for_pending_tx(&client, alt_tx_hash).await;
|
||||
|
||||
// Build an alternate payload (this builds on top of the current head, i.e., block 3)
|
||||
// But we want to reorg back to block 1, so we'll use the payload and then FCU to it
|
||||
let alt_payload = nodes[0].new_payload().await?;
|
||||
let alt_block_hash = nodes[0].submit_payload(alt_payload.clone()).await?;
|
||||
|
||||
// Trigger reorg: make the alternate chain canonical by sending FCU pointing to block 1's hash
|
||||
// as finalized, which should trigger an unwind of blocks 2 and 3
|
||||
// The alt block becomes the new head
|
||||
nodes[0].update_forkchoice(block1_hash, alt_block_hash).await?;
|
||||
|
||||
// Give time for the reorg to complete
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
|
||||
// Verify we can still query transactions and the chain is consistent
|
||||
// If unwind_trie_state_from failed, this would have errored during reorg
|
||||
let latest: Option<alloy_rpc_types_eth::Block> =
|
||||
client.request("eth_getBlockByNumber", ("latest", false)).await?;
|
||||
let latest = latest.expect("Latest block should exist");
|
||||
// The alt block is at height 4 (on top of block 3)
|
||||
assert!(latest.header.number >= 3, "Should be at height >= 3 after operation");
|
||||
|
||||
// tx1 from block 1 should still be there
|
||||
let tx1: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash1]).await?;
|
||||
assert!(tx1.is_some(), "tx1 from block 1 should still be queryable");
|
||||
assert_eq!(tx1.unwrap().block_number, Some(1));
|
||||
|
||||
// Mine another block to verify the chain can continue
|
||||
let raw_tx_final =
|
||||
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer2.clone(), 1).await;
|
||||
let tx_hash_final = nodes[0].rpc.inject_tx(raw_tx_final).await?;
|
||||
wait_for_pending_tx(&client, tx_hash_final).await;
|
||||
|
||||
let final_payload = nodes[0].advance_block().await?;
|
||||
assert!(final_payload.block().number() > 3, "Should be able to mine block after reorg");
|
||||
|
||||
// Verify tx_final is included
|
||||
let tx_final: Option<Transaction> =
|
||||
client.request("eth_getTransactionByHash", [tx_hash_final]).await?;
|
||||
assert!(tx_final.is_some(), "final tx should be in latest block");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -32,9 +32,7 @@ futures-util.workspace = true
|
||||
# misc
|
||||
eyre.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
op-alloy-rpc-types-engine = { workspace = true, optional = true }
|
||||
reth-optimism-chainspec = { workspace = true, optional = true }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
@@ -42,7 +40,6 @@ workspace = true
|
||||
[features]
|
||||
op = [
|
||||
"dep:op-alloy-rpc-types-engine",
|
||||
"dep:reth-optimism-chainspec",
|
||||
"reth-payload-primitives/op",
|
||||
"reth-primitives-traits/op",
|
||||
]
|
||||
|
||||
@@ -72,13 +72,18 @@ where
|
||||
&self,
|
||||
parent: &SealedHeader<ChainSpec::Header>,
|
||||
) -> op_alloy_rpc_types_engine::OpPayloadAttributes {
|
||||
/// Dummy system transaction for dev mode.
|
||||
/// OP Mainnet transaction at index 0 in block 124665056.
|
||||
///
|
||||
/// <https://optimistic.etherscan.io/tx/0x312e290cf36df704a2217b015d6455396830b0ce678b860ebfcc30f41403d7b1>
|
||||
const TX_SET_L1_BLOCK_OP_MAINNET_BLOCK_124665056: [u8; 251] = alloy_primitives::hex!(
|
||||
"7ef8f8a0683079df94aa5b9cf86687d739a60a9b4f0835e520ec4d664e2e415dca17a6df94deaddeaddeaddeaddeaddeaddeaddeaddead00019442000000000000000000000000000000000000158080830f424080b8a4440a5e200000146b000f79c500000000000000040000000066d052e700000000013ad8a3000000000000000000000000000000000000000000000000000000003ef1278700000000000000000000000000000000000000000000000000000000000000012fdf87b89884a61e74b322bbcf60386f543bfae7827725efaaf0ab1de2294a590000000000000000000000006887246668a3b87f54deb3b94ba47a6f63f32985"
|
||||
);
|
||||
|
||||
op_alloy_rpc_types_engine::OpPayloadAttributes {
|
||||
payload_attributes: self.build(parent),
|
||||
// Add dummy system transaction
|
||||
transactions: Some(vec![
|
||||
reth_optimism_chainspec::constants::TX_SET_L1_BLOCK_OP_MAINNET_BLOCK_124665056
|
||||
.into(),
|
||||
]),
|
||||
transactions: Some(vec![TX_SET_L1_BLOCK_OP_MAINNET_BLOCK_124665056.into()]),
|
||||
no_tx_pool: None,
|
||||
gas_limit: None,
|
||||
eip_1559_params: None,
|
||||
|
||||
@@ -47,6 +47,17 @@ pub const DEFAULT_RESERVED_CPU_CORES: usize = 1;
|
||||
/// Default maximum concurrency for prewarm task.
|
||||
pub const DEFAULT_PREWARM_MAX_CONCURRENCY: usize = 16;
|
||||
|
||||
/// Default depth for sparse trie pruning.
|
||||
///
|
||||
/// Nodes at this depth and below are converted to hash stubs to reduce memory.
|
||||
/// Depth 4 means we keep roughly 16^4 = 65536 potential branch paths at most.
|
||||
pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
|
||||
|
||||
/// Default maximum number of storage tries to keep after pruning.
|
||||
///
|
||||
/// Storage tries beyond this limit are cleared (but allocations preserved).
|
||||
pub const DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES: usize = 100;
|
||||
|
||||
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
|
||||
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
|
||||
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
|
||||
@@ -152,6 +163,12 @@ pub struct TreeConfig {
|
||||
disable_proof_v2: bool,
|
||||
/// Whether to disable cache metrics recording (can be expensive with large cached state).
|
||||
disable_cache_metrics: bool,
|
||||
/// Whether to enable sparse trie as cache.
|
||||
enable_sparse_trie_as_cache: bool,
|
||||
/// Depth for sparse trie pruning after state root computation.
|
||||
sparse_trie_prune_depth: usize,
|
||||
/// Maximum number of storage tries to retain after pruning.
|
||||
sparse_trie_max_storage_tries: usize,
|
||||
}
|
||||
|
||||
impl Default for TreeConfig {
|
||||
@@ -181,6 +198,9 @@ impl Default for TreeConfig {
|
||||
account_worker_count: default_account_worker_count(),
|
||||
disable_proof_v2: false,
|
||||
disable_cache_metrics: false,
|
||||
enable_sparse_trie_as_cache: false,
|
||||
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
|
||||
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -213,6 +233,8 @@ impl TreeConfig {
|
||||
account_worker_count: usize,
|
||||
disable_proof_v2: bool,
|
||||
disable_cache_metrics: bool,
|
||||
sparse_trie_prune_depth: usize,
|
||||
sparse_trie_max_storage_tries: usize,
|
||||
) -> Self {
|
||||
Self {
|
||||
persistence_threshold,
|
||||
@@ -239,6 +261,9 @@ impl TreeConfig {
|
||||
account_worker_count,
|
||||
disable_proof_v2,
|
||||
disable_cache_metrics,
|
||||
enable_sparse_trie_as_cache: false,
|
||||
sparse_trie_prune_depth,
|
||||
sparse_trie_max_storage_tries,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -540,4 +565,37 @@ impl TreeConfig {
|
||||
self.disable_cache_metrics = disable_cache_metrics;
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns whether sparse trie as cache is enabled.
|
||||
pub const fn enable_sparse_trie_as_cache(&self) -> bool {
|
||||
self.enable_sparse_trie_as_cache
|
||||
}
|
||||
|
||||
/// Setter for whether to enable sparse trie as cache.
|
||||
pub const fn with_enable_sparse_trie_as_cache(mut self, value: bool) -> Self {
|
||||
self.enable_sparse_trie_as_cache = value;
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns the sparse trie prune depth.
|
||||
pub const fn sparse_trie_prune_depth(&self) -> usize {
|
||||
self.sparse_trie_prune_depth
|
||||
}
|
||||
|
||||
/// Setter for sparse trie prune depth.
|
||||
pub const fn with_sparse_trie_prune_depth(mut self, depth: usize) -> Self {
|
||||
self.sparse_trie_prune_depth = depth;
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns the maximum number of storage tries to retain after pruning.
|
||||
pub const fn sparse_trie_max_storage_tries(&self) -> usize {
|
||||
self.sparse_trie_max_storage_tries
|
||||
}
|
||||
|
||||
/// Setter for maximum storage tries to retain.
|
||||
pub const fn with_sparse_trie_max_storage_tries(mut self, max_tries: usize) -> Self {
|
||||
self.sparse_trie_max_storage_tries = max_tries;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ use reth_payload_primitives::{
|
||||
};
|
||||
use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
|
||||
use reth_provider::{
|
||||
BlockExecutionOutput, BlockExecutionResult, BlockNumReader, BlockReader, ChangeSetReader,
|
||||
BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
|
||||
DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
|
||||
StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
|
||||
TransactionVariant,
|
||||
@@ -321,11 +321,10 @@ where
|
||||
+ HashedPostStateProvider
|
||||
+ Clone
|
||||
+ 'static,
|
||||
<P as DatabaseProviderFactory>::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
|
||||
P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
|
||||
+ StageCheckpointReader
|
||||
+ ChangeSetReader
|
||||
+ StorageChangeSetReader
|
||||
+ BlockNumReader,
|
||||
+ StorageChangeSetReader,
|
||||
C: ConfigureEvm<Primitives = N> + 'static,
|
||||
T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
|
||||
V: EngineValidator<T>,
|
||||
@@ -1406,7 +1405,20 @@ where
|
||||
);
|
||||
self.changeset_cache.evict(eviction_threshold);
|
||||
|
||||
// Invalidate cached overlay since the anchor has changed
|
||||
self.state.tree_state.invalidate_cached_overlay();
|
||||
|
||||
self.on_new_persisted_block()?;
|
||||
|
||||
// Re-prepare overlay for the current canonical head with the new anchor.
|
||||
// Spawn a background task to trigger computation so it's ready when the next payload
|
||||
// arrives.
|
||||
if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
|
||||
rayon::spawn(move || {
|
||||
let _ = overlay.get();
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -7,14 +7,14 @@ use crate::tree::{
|
||||
prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
|
||||
sparse_trie::StateRootComputeOutcome,
|
||||
},
|
||||
sparse_trie::SparseTrieTask,
|
||||
sparse_trie::{SparseTrieCacheTask, SparseTrieTask, SpawnedSparseTrieTask},
|
||||
StateProviderBuilder, TreeConfig,
|
||||
};
|
||||
use alloy_eip7928::BlockAccessList;
|
||||
use alloy_eips::eip1898::BlockWithParent;
|
||||
use alloy_evm::block::StateChangeSource;
|
||||
use alloy_primitives::B256;
|
||||
use crossbeam_channel::Sender as CrossbeamSender;
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use executor::WorkloadExecutor;
|
||||
use metrics::Counter;
|
||||
use multiproof::{SparseTrieUpdate, *};
|
||||
@@ -39,10 +39,7 @@ use reth_trie_parallel::{
|
||||
proof_task::{ProofTaskCtx, ProofWorkerHandle},
|
||||
root::ParallelStateRootError,
|
||||
};
|
||||
use reth_trie_sparse::{
|
||||
provider::{TrieNodeProvider, TrieNodeProviderFactory},
|
||||
ClearedSparseStateTrie, RevealableSparseTrie, SparseStateTrie,
|
||||
};
|
||||
use reth_trie_sparse::{RevealableSparseTrie, SparseStateTrie};
|
||||
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
@@ -59,10 +56,13 @@ use tracing::{debug, debug_span, instrument, warn, Span};
|
||||
pub mod bal;
|
||||
pub mod executor;
|
||||
pub mod multiproof;
|
||||
mod preserved_sparse_trie;
|
||||
pub mod prewarm;
|
||||
pub mod receipt_root_task;
|
||||
pub mod sparse_trie;
|
||||
|
||||
use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
|
||||
|
||||
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
|
||||
///
|
||||
/// These values were determined by performing benchmarks using gradually increasing values to judge
|
||||
@@ -125,13 +125,16 @@ where
|
||||
precompile_cache_disabled: bool,
|
||||
/// Precompile cache map.
|
||||
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
|
||||
/// A cleared `SparseStateTrie`, kept around to be reused for the state root computation so
|
||||
/// that allocations can be minimized.
|
||||
sparse_state_trie: Arc<
|
||||
parking_lot::Mutex<Option<ClearedSparseStateTrie<ParallelSparseTrie, ParallelSparseTrie>>>,
|
||||
>,
|
||||
/// A pruned `SparseStateTrie`, kept around as a cache of already revealed trie nodes and to
|
||||
/// re-use allocated memory. Stored with the block hash it was computed for to enable trie
|
||||
/// preservation across sequential payload validations.
|
||||
sparse_state_trie: SharedPreservedSparseTrie,
|
||||
/// Maximum concurrency for prewarm task.
|
||||
prewarm_max_concurrency: usize,
|
||||
/// Sparse trie prune depth.
|
||||
sparse_trie_prune_depth: usize,
|
||||
/// Maximum storage tries to retain after pruning.
|
||||
sparse_trie_max_storage_tries: usize,
|
||||
/// Whether to disable cache metrics recording.
|
||||
disable_cache_metrics: bool,
|
||||
}
|
||||
@@ -163,8 +166,10 @@ where
|
||||
disable_state_cache: config.disable_state_cache(),
|
||||
precompile_cache_disabled: config.precompile_cache_disabled(),
|
||||
precompile_cache_map,
|
||||
sparse_state_trie: Arc::default(),
|
||||
sparse_state_trie: SharedPreservedSparseTrie::default(),
|
||||
prewarm_max_concurrency: config.prewarm_max_concurrency(),
|
||||
sparse_trie_prune_depth: config.sparse_trie_prune_depth(),
|
||||
sparse_trie_max_storage_tries: config.sparse_trie_max_storage_tries(),
|
||||
disable_cache_metrics: config.disable_cache_metrics(),
|
||||
}
|
||||
}
|
||||
@@ -240,6 +245,9 @@ where
|
||||
// Extract V2 proofs flag early so we can pass it to prewarm
|
||||
let v2_proofs_enabled = !config.disable_proof_v2();
|
||||
|
||||
// Capture parent_state_root before env is moved into spawn_caching_with
|
||||
let parent_state_root = env.parent_state_root;
|
||||
|
||||
// Handle BAL-based optimization if available
|
||||
let prewarm_handle = if let Some(bal) = bal {
|
||||
// When BAL is present, use BAL prewarming and send BAL to multiproof
|
||||
@@ -283,37 +291,46 @@ where
|
||||
v2_proofs_enabled,
|
||||
);
|
||||
|
||||
let multi_proof_task = MultiProofTask::new(
|
||||
proof_handle.clone(),
|
||||
to_sparse_trie,
|
||||
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
|
||||
to_multi_proof.clone(),
|
||||
from_multi_proof,
|
||||
)
|
||||
.with_v2_proofs_enabled(v2_proofs_enabled);
|
||||
if !config.enable_sparse_trie_as_cache() {
|
||||
let multi_proof_task = MultiProofTask::new(
|
||||
proof_handle.clone(),
|
||||
to_sparse_trie,
|
||||
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
|
||||
to_multi_proof.clone(),
|
||||
from_multi_proof.clone(),
|
||||
)
|
||||
.with_v2_proofs_enabled(v2_proofs_enabled);
|
||||
|
||||
// spawn multi-proof task
|
||||
let parent_span = span.clone();
|
||||
let saved_cache = prewarm_handle.saved_cache.clone();
|
||||
self.executor.spawn_blocking(move || {
|
||||
let _enter = parent_span.entered();
|
||||
// Build a state provider for the multiproof task
|
||||
let provider = provider_builder.build().expect("failed to build provider");
|
||||
let provider = if let Some(saved_cache) = saved_cache {
|
||||
let (cache, metrics, _disable_metrics) = saved_cache.split();
|
||||
Box::new(CachedStateProvider::new(provider, cache, metrics))
|
||||
as Box<dyn StateProvider>
|
||||
} else {
|
||||
Box::new(provider)
|
||||
};
|
||||
multi_proof_task.run(provider);
|
||||
});
|
||||
// spawn multi-proof task
|
||||
let parent_span = span.clone();
|
||||
let saved_cache = prewarm_handle.saved_cache.clone();
|
||||
self.executor.spawn_blocking(move || {
|
||||
let _enter = parent_span.entered();
|
||||
// Build a state provider for the multiproof task
|
||||
let provider = provider_builder.build().expect("failed to build provider");
|
||||
let provider = if let Some(saved_cache) = saved_cache {
|
||||
let (cache, metrics, _disable_metrics) = saved_cache.split();
|
||||
Box::new(CachedStateProvider::new(provider, cache, metrics))
|
||||
as Box<dyn StateProvider>
|
||||
} else {
|
||||
Box::new(provider)
|
||||
};
|
||||
multi_proof_task.run(provider);
|
||||
});
|
||||
}
|
||||
|
||||
// wire the sparse trie to the state root response receiver
|
||||
let (state_root_tx, state_root_rx) = channel();
|
||||
|
||||
// Spawn the sparse trie task using any stored trie and parallel trie configuration.
|
||||
self.spawn_sparse_trie_task(sparse_trie_rx, proof_handle, state_root_tx);
|
||||
self.spawn_sparse_trie_task(
|
||||
sparse_trie_rx,
|
||||
proof_handle,
|
||||
state_root_tx,
|
||||
from_multi_proof,
|
||||
config,
|
||||
parent_state_root,
|
||||
);
|
||||
|
||||
PayloadHandle {
|
||||
to_multi_proof: Some(to_multi_proof),
|
||||
@@ -492,64 +509,124 @@ where
|
||||
}
|
||||
|
||||
/// Spawns the [`SparseTrieTask`] for this payload processor.
|
||||
///
|
||||
/// The trie is preserved when the new payload is a child of the previous one.
|
||||
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
|
||||
fn spawn_sparse_trie_task<BPF>(
|
||||
fn spawn_sparse_trie_task(
|
||||
&self,
|
||||
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
|
||||
proof_worker_handle: BPF,
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
|
||||
) where
|
||||
BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
|
||||
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
{
|
||||
let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
|
||||
from_multi_proof: CrossbeamReceiver<MultiProofMessage>,
|
||||
config: &TreeConfig,
|
||||
parent_state_root: B256,
|
||||
) {
|
||||
let preserved_sparse_trie = self.sparse_state_trie.clone();
|
||||
let trie_metrics = self.trie_metrics.clone();
|
||||
let span = Span::current();
|
||||
let disable_sparse_trie_as_cache = !config.enable_sparse_trie_as_cache();
|
||||
let prune_depth = self.sparse_trie_prune_depth;
|
||||
let max_storage_tries = self.sparse_trie_max_storage_tries;
|
||||
|
||||
self.executor.spawn_blocking(move || {
|
||||
let _enter = span.entered();
|
||||
|
||||
// Reuse a stored SparseStateTrie, or create a new one using the desired configuration
|
||||
// if there's none to reuse.
|
||||
let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
|
||||
let default_trie = RevealableSparseTrie::blind_from(
|
||||
ParallelSparseTrie::default()
|
||||
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
|
||||
);
|
||||
ClearedSparseStateTrie::from_state_trie(
|
||||
// Reuse a stored SparseStateTrie if available, applying continuation logic.
|
||||
// If this payload's parent state root matches the preserved trie's anchor,
|
||||
// we can reuse the pruned trie structure. Otherwise, we clear the trie but
|
||||
// keep allocations.
|
||||
let sparse_state_trie = preserved_sparse_trie
|
||||
.take()
|
||||
.map(|preserved| preserved.into_trie_for(parent_state_root))
|
||||
.unwrap_or_else(|| {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
"Creating new sparse trie - no preserved trie available"
|
||||
);
|
||||
let default_trie = RevealableSparseTrie::blind_from(
|
||||
ParallelSparseTrie::default().with_parallelism_thresholds(
|
||||
PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS,
|
||||
),
|
||||
);
|
||||
SparseStateTrie::new()
|
||||
.with_accounts_trie(default_trie.clone())
|
||||
.with_default_storage_trie(default_trie)
|
||||
.with_updates(true),
|
||||
)
|
||||
});
|
||||
.with_updates(true)
|
||||
});
|
||||
|
||||
let task =
|
||||
SparseTrieTask::<_, ParallelSparseTrie, ParallelSparseTrie>::new_with_cleared_trie(
|
||||
let mut task = if disable_sparse_trie_as_cache {
|
||||
SpawnedSparseTrieTask::Cleared(SparseTrieTask::new(
|
||||
sparse_trie_rx,
|
||||
proof_worker_handle,
|
||||
trie_metrics,
|
||||
trie_metrics.clone(),
|
||||
sparse_state_trie,
|
||||
))
|
||||
} else {
|
||||
SpawnedSparseTrieTask::Cached(SparseTrieCacheTask::new(
|
||||
from_multi_proof,
|
||||
proof_worker_handle,
|
||||
trie_metrics.clone(),
|
||||
sparse_state_trie,
|
||||
))
|
||||
};
|
||||
|
||||
let result = task.run();
|
||||
if let Err(e) = &result {
|
||||
tracing::error!(target: "engine::tree::payload_processor", "State root computation failed: {e:?}");
|
||||
}
|
||||
// Capture the computed state_root before sending the result
|
||||
let computed_state_root = result.as_ref().ok().map(|outcome| outcome.state_root);
|
||||
|
||||
// Acquire the guard before sending the result to prevent a race condition:
|
||||
// Without this, the next block could start after send() but before store(),
|
||||
// causing take() to return None and forcing it to create a new empty trie
|
||||
// instead of reusing the preserved one. Holding the guard ensures the next
|
||||
// block's take() blocks until we've stored the trie for reuse.
|
||||
let mut guard = preserved_sparse_trie.lock();
|
||||
|
||||
// Send state root computation result - next block may start but will block on take()
|
||||
if state_root_tx.send(result).is_err() {
|
||||
// Receiver dropped - payload was likely invalid or cancelled.
|
||||
// Clear the trie instead of preserving potentially invalid state.
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
"State root receiver dropped, clearing trie"
|
||||
);
|
||||
let trie = task.into_cleared_trie(
|
||||
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
|
||||
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
|
||||
);
|
||||
guard.store(PreservedSparseTrie::cleared(trie));
|
||||
return;
|
||||
}
|
||||
|
||||
let (result, trie) = task.run();
|
||||
// Send state root computation result
|
||||
let _ = state_root_tx.send(result);
|
||||
|
||||
// Clear the SparseStateTrie, shrink, and replace it back into the mutex _after_ sending
|
||||
// results to the next step, so that time spent clearing doesn't block the step after
|
||||
// this one.
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor", "clear").entered();
|
||||
let mut cleared_trie = ClearedSparseStateTrie::from_state_trie(trie);
|
||||
|
||||
// Shrink the sparse trie so that we don't have ever increasing memory.
|
||||
cleared_trie.shrink_to(
|
||||
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
|
||||
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
|
||||
);
|
||||
|
||||
cleared_sparse_trie.lock().replace(cleared_trie);
|
||||
// Only preserve the trie as anchored if computation succeeded.
|
||||
// A failed computation may have left the trie in a partially updated state.
|
||||
let _enter =
|
||||
debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
|
||||
if let Some(state_root) = computed_state_root {
|
||||
let start = std::time::Instant::now();
|
||||
let trie = task.into_trie_for_reuse(
|
||||
prune_depth,
|
||||
max_storage_tries,
|
||||
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
|
||||
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
|
||||
);
|
||||
trie_metrics
|
||||
.into_trie_for_reuse_duration_histogram
|
||||
.record(start.elapsed().as_secs_f64());
|
||||
guard.store(PreservedSparseTrie::anchored(trie, state_root));
|
||||
} else {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
"State root computation failed, clearing trie"
|
||||
);
|
||||
let trie = task.into_cleared_trie(
|
||||
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
|
||||
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
|
||||
);
|
||||
guard.store(PreservedSparseTrie::cleared(trie));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -883,6 +960,10 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
|
||||
pub hash: B256,
|
||||
/// Hash of the parent block.
|
||||
pub parent_hash: B256,
|
||||
/// State root of the parent block.
|
||||
/// Used for sparse trie continuation: if the preserved trie's anchor matches this,
|
||||
/// the trie can be reused directly.
|
||||
pub parent_state_root: B256,
|
||||
}
|
||||
|
||||
impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
|
||||
@@ -894,6 +975,7 @@ where
|
||||
evm_env: Default::default(),
|
||||
hash: Default::default(),
|
||||
parent_hash: Default::default(),
|
||||
parent_state_root: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -311,11 +311,7 @@ impl VersionedMultiProofTargets {
|
||||
fn chunking_length(&self) -> usize {
|
||||
match self {
|
||||
Self::Legacy(targets) => targets.chunking_length(),
|
||||
Self::V2(targets) => {
|
||||
// For V2, count accounts + storage slots
|
||||
targets.account_targets.len() +
|
||||
targets.storage_targets.values().map(|slots| slots.len()).sum::<usize>()
|
||||
}
|
||||
Self::V2(targets) => targets.chunking_length(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -587,6 +583,8 @@ pub(crate) struct MultiProofTaskMetrics {
|
||||
pub first_update_wait_time_histogram: Histogram,
|
||||
/// Total time spent waiting for the last proof result.
|
||||
pub last_proof_wait_time_histogram: Histogram,
|
||||
/// Time spent preparing the sparse trie for reuse after state root computation.
|
||||
pub into_trie_for_reuse_duration_histogram: Histogram,
|
||||
}
|
||||
|
||||
/// Standalone task that receives a transaction state stream and updates relevant
|
||||
@@ -1492,7 +1490,7 @@ fn get_proof_targets(
|
||||
/// Dispatches work items as a single unit or in chunks based on target size and worker
|
||||
/// availability.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn dispatch_with_chunking<T, I>(
|
||||
pub(crate) fn dispatch_with_chunking<T, I>(
|
||||
items: T,
|
||||
chunking_len: usize,
|
||||
chunk_size: Option<usize>,
|
||||
|
||||
@@ -0,0 +1,117 @@
|
||||
//! Preserved sparse trie for reuse across payload validations.
|
||||
|
||||
use alloy_primitives::B256;
|
||||
use parking_lot::Mutex;
|
||||
use reth_trie_sparse::SparseStateTrie;
|
||||
use reth_trie_sparse_parallel::ParallelSparseTrie;
|
||||
use std::sync::Arc;
|
||||
use tracing::debug;
|
||||
|
||||
/// Type alias for the sparse trie type used in preservation.
|
||||
pub(super) type SparseTrie = SparseStateTrie<ParallelSparseTrie, ParallelSparseTrie>;
|
||||
|
||||
/// Shared handle to a preserved sparse trie that can be reused across payload validations.
|
||||
///
|
||||
/// This is stored in [`PayloadProcessor`](super::PayloadProcessor) and cloned to pass to
|
||||
/// [`SparseTrieTask`](super::sparse_trie::SparseTrieTask) for trie reuse.
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub(super) struct SharedPreservedSparseTrie(Arc<Mutex<Option<PreservedSparseTrie>>>);
|
||||
|
||||
impl SharedPreservedSparseTrie {
|
||||
/// Takes the preserved trie if present, leaving `None` in its place.
|
||||
pub(super) fn take(&self) -> Option<PreservedSparseTrie> {
|
||||
self.0.lock().take()
|
||||
}
|
||||
|
||||
/// Acquires a guard that blocks `take()` until dropped.
|
||||
/// Use this before sending the state root result to ensure the next block
|
||||
/// waits for the trie to be stored.
|
||||
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
|
||||
PreservedTrieGuard(self.0.lock())
|
||||
}
|
||||
}
|
||||
|
||||
/// Guard that holds the lock on the preserved trie.
|
||||
/// While held, `take()` will block. Call `store()` to save the trie before dropping.
|
||||
pub(super) struct PreservedTrieGuard<'a>(parking_lot::MutexGuard<'a, Option<PreservedSparseTrie>>);
|
||||
|
||||
impl PreservedTrieGuard<'_> {
|
||||
/// Stores a preserved trie for later reuse.
|
||||
pub(super) fn store(&mut self, trie: PreservedSparseTrie) {
|
||||
self.0.replace(trie);
|
||||
}
|
||||
}
|
||||
|
||||
/// A preserved sparse trie that can be reused across payload validations.
|
||||
///
|
||||
/// The trie exists in one of two states:
|
||||
/// - **Anchored**: Has a computed state root and can be reused for payloads whose parent state root
|
||||
/// matches the anchor.
|
||||
/// - **Cleared**: Trie data has been cleared but allocations are preserved for reuse.
|
||||
#[derive(Debug)]
|
||||
pub(super) enum PreservedSparseTrie {
|
||||
/// Trie with a computed state root that can be reused for continuation payloads.
|
||||
Anchored {
|
||||
/// The sparse state trie (pruned after root computation).
|
||||
trie: SparseTrie,
|
||||
/// The state root this trie represents (computed from the previous block).
|
||||
/// Used to verify continuity: new payload's `parent_state_root` must match this.
|
||||
state_root: B256,
|
||||
},
|
||||
/// Cleared trie with preserved allocations, ready for fresh use.
|
||||
Cleared {
|
||||
/// The sparse state trie with cleared data but preserved allocations.
|
||||
trie: SparseTrie,
|
||||
},
|
||||
}
|
||||
|
||||
impl PreservedSparseTrie {
|
||||
/// Creates a new anchored preserved trie.
|
||||
///
|
||||
/// The `state_root` is the computed state root from the trie, which becomes the
|
||||
/// anchor for determining if subsequent payloads can reuse this trie.
|
||||
pub(super) const fn anchored(trie: SparseTrie, state_root: B256) -> Self {
|
||||
Self::Anchored { trie, state_root }
|
||||
}
|
||||
|
||||
/// Creates a cleared preserved trie (allocations preserved, data cleared).
|
||||
pub(super) const fn cleared(trie: SparseTrie) -> Self {
|
||||
Self::Cleared { trie }
|
||||
}
|
||||
|
||||
/// Consumes self and returns the trie for reuse.
|
||||
///
|
||||
/// If the preserved trie is anchored and the parent state root matches, the pruned
|
||||
/// trie structure is reused directly. Otherwise, the trie is cleared but allocations
|
||||
/// are preserved to reduce memory overhead.
|
||||
pub(super) fn into_trie_for(self, parent_state_root: B256) -> SparseTrie {
|
||||
match self {
|
||||
Self::Anchored { trie, state_root } if state_root == parent_state_root => {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
%state_root,
|
||||
"Reusing anchored sparse trie for continuation payload"
|
||||
);
|
||||
trie
|
||||
}
|
||||
Self::Anchored { mut trie, state_root } => {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
anchor_root = %state_root,
|
||||
%parent_state_root,
|
||||
"Clearing anchored sparse trie - parent state root mismatch"
|
||||
);
|
||||
trie.clear();
|
||||
trie
|
||||
}
|
||||
Self::Cleared { trie } => {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
%parent_state_root,
|
||||
"Using cleared sparse trie with preserved allocations"
|
||||
);
|
||||
trie
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,21 +1,100 @@
|
||||
//! Sparse Trie task related functionality.
|
||||
|
||||
use crate::tree::payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate};
|
||||
use alloy_primitives::B256;
|
||||
use rayon::iter::{ParallelBridge, ParallelIterator};
|
||||
use reth_trie::{updates::TrieUpdates, Nibbles};
|
||||
use reth_trie_parallel::{proof_task::ProofResult, root::ParallelStateRootError};
|
||||
use reth_trie_sparse::{
|
||||
errors::{SparseStateTrieResult, SparseTrieErrorKind},
|
||||
provider::{TrieNodeProvider, TrieNodeProviderFactory},
|
||||
ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrie,
|
||||
use crate::tree::{
|
||||
multiproof::{
|
||||
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
|
||||
VersionedMultiProofTargets,
|
||||
},
|
||||
payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate},
|
||||
};
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rlp::Decodable;
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use rayon::iter::{IntoParallelRefMutIterator, ParallelBridge, ParallelIterator};
|
||||
use reth_primitives_traits::Account;
|
||||
use reth_revm::state::EvmState;
|
||||
use reth_trie::{
|
||||
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
|
||||
TrieAccount, EMPTY_ROOT_HASH,
|
||||
};
|
||||
use reth_trie_parallel::{
|
||||
proof_task::{
|
||||
AccountMultiproofInput, ProofResult, ProofResultContext, ProofResultMessage,
|
||||
ProofWorkerHandle,
|
||||
},
|
||||
root::ParallelStateRootError,
|
||||
targets_v2::MultiProofTargetsV2,
|
||||
};
|
||||
use reth_trie_sparse::{
|
||||
errors::{SparseStateTrieResult, SparseTrieErrorKind, SparseTrieResult},
|
||||
provider::{TrieNodeProvider, TrieNodeProviderFactory},
|
||||
LeafUpdate, SerialSparseTrie, SparseStateTrie, SparseTrie, SparseTrieExt,
|
||||
};
|
||||
use revm_primitives::{hash_map::Entry, B256Map};
|
||||
use smallvec::SmallVec;
|
||||
use std::{
|
||||
sync::mpsc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::{debug, debug_span, instrument, trace};
|
||||
use tracing::{debug, debug_span, error, instrument, trace};
|
||||
|
||||
#[expect(clippy::large_enum_variant)]
|
||||
pub(super) enum SpawnedSparseTrieTask<BPF, A, S>
|
||||
where
|
||||
BPF: TrieNodeProviderFactory + Send + Sync,
|
||||
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
|
||||
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
|
||||
{
|
||||
Cleared(SparseTrieTask<BPF, A, S>),
|
||||
Cached(SparseTrieCacheTask<A, S>),
|
||||
}
|
||||
|
||||
impl<BPF, A, S> SpawnedSparseTrieTask<BPF, A, S>
|
||||
where
|
||||
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
|
||||
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
|
||||
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
|
||||
{
|
||||
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
|
||||
match self {
|
||||
Self::Cleared(task) => task.run(),
|
||||
Self::Cached(task) => task.run(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn into_trie_for_reuse(
|
||||
self,
|
||||
prune_depth: usize,
|
||||
max_storage_tries: usize,
|
||||
max_nodes_capacity: usize,
|
||||
max_values_capacity: usize,
|
||||
) -> SparseStateTrie<A, S> {
|
||||
match self {
|
||||
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
|
||||
Self::Cached(task) => task.into_trie_for_reuse(
|
||||
prune_depth,
|
||||
max_storage_tries,
|
||||
max_nodes_capacity,
|
||||
max_values_capacity,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn into_cleared_trie(
|
||||
self,
|
||||
max_nodes_capacity: usize,
|
||||
max_values_capacity: usize,
|
||||
) -> SparseStateTrie<A, S> {
|
||||
match self {
|
||||
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
|
||||
Self::Cached(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A task responsible for populating the sparse trie.
|
||||
pub(super) struct SparseTrieTask<BPF, A = SerialSparseTrie, S = SerialSparseTrie>
|
||||
@@ -38,46 +117,29 @@ where
|
||||
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
|
||||
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
A: SparseTrie + Send + Sync + Default,
|
||||
S: SparseTrie + Send + Sync + Default + Clone,
|
||||
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
|
||||
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
|
||||
{
|
||||
/// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`].
|
||||
pub(super) fn new_with_cleared_trie(
|
||||
/// Creates a new sparse trie task with the given trie.
|
||||
pub(super) const fn new(
|
||||
updates: mpsc::Receiver<SparseTrieUpdate>,
|
||||
blinded_provider_factory: BPF,
|
||||
metrics: MultiProofTaskMetrics,
|
||||
sparse_state_trie: ClearedSparseStateTrie<A, S>,
|
||||
trie: SparseStateTrie<A, S>,
|
||||
) -> Self {
|
||||
Self { updates, metrics, trie: sparse_state_trie.into_inner(), blinded_provider_factory }
|
||||
Self { updates, metrics, trie, blinded_provider_factory }
|
||||
}
|
||||
|
||||
/// Runs the sparse trie task to completion.
|
||||
/// Runs the sparse trie task to completion, computing the state root.
|
||||
///
|
||||
/// This waits for new incoming [`SparseTrieUpdate`].
|
||||
///
|
||||
/// This concludes once the last trie update has been received.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// - State root computation outcome.
|
||||
/// - `SparseStateTrie` that needs to be cleared and reused to avoid reallocations.
|
||||
/// Receives [`SparseTrieUpdate`]s until the channel is closed, applying each update
|
||||
/// to the trie. Once all updates are processed, computes and returns the final state root.
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
pub(super) fn run(
|
||||
mut self,
|
||||
) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseStateTrie<A, S>) {
|
||||
// run the main loop to completion
|
||||
let result = self.run_inner();
|
||||
(result, self.trie)
|
||||
}
|
||||
|
||||
/// Inner function to run the sparse trie task to completion.
|
||||
///
|
||||
/// See [`Self::run`] for more information.
|
||||
fn run_inner(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
|
||||
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
|
||||
let now = Instant::now();
|
||||
|
||||
let mut num_iterations = 0;
|
||||
@@ -127,6 +189,528 @@ where
|
||||
|
||||
Ok(StateRootComputeOutcome { state_root, trie_updates })
|
||||
}
|
||||
|
||||
/// Clears and shrinks the trie, discarding all state.
|
||||
///
|
||||
/// Use this when the payload was invalid or cancelled - we don't want to preserve
|
||||
/// potentially invalid trie state, but we keep the allocations for reuse.
|
||||
pub(super) fn into_cleared_trie(
|
||||
mut self,
|
||||
max_nodes_capacity: usize,
|
||||
max_values_capacity: usize,
|
||||
) -> SparseStateTrie<A, S> {
|
||||
self.trie.clear();
|
||||
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
|
||||
self.trie
|
||||
}
|
||||
}
|
||||
|
||||
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
|
||||
pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie> {
|
||||
/// Sender for proof results.
|
||||
proof_result_tx: CrossbeamSender<ProofResultMessage>,
|
||||
/// Receiver for proof results directly from workers.
|
||||
proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
|
||||
/// Receives updates from execution and prewarming.
|
||||
updates: CrossbeamReceiver<MultiProofMessage>,
|
||||
/// `SparseStateTrie` used for computing the state root.
|
||||
trie: SparseStateTrie<A, S>,
|
||||
/// Handle to the proof worker pools (storage and account).
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
/// Account trie updates.
|
||||
account_updates: B256Map<LeafUpdate>,
|
||||
/// Storage trie updates. hashed address -> slot -> update.
|
||||
storage_updates: B256Map<B256Map<LeafUpdate>>,
|
||||
/// Account updates that are blocked by storage root calculation or account reveal.
|
||||
///
|
||||
/// Those are being moved into `account_updates` once storage roots
|
||||
/// are revealed and/or calculated.
|
||||
///
|
||||
/// Invariant: for each entry in `pending_account_updates` account must either be already
|
||||
/// revealed in the trie or have an entry in `account_updates`.
|
||||
///
|
||||
/// Values can be either of:
|
||||
/// - None: account had a storage update and is awaiting storage root calculation and/or
|
||||
/// account node reveal to complete.
|
||||
/// - Some(_): account was changed/destroyed and is awaiting storage root calculation/reveal
|
||||
/// to complete.
|
||||
pending_account_updates: B256Map<Option<Option<Account>>>,
|
||||
/// Cache of account proof targets that were already fetched/requested from the proof workers.
|
||||
/// account -> lowest `min_len` requested.
|
||||
fetched_account_targets: B256Map<u8>,
|
||||
/// Cache of storage proof targets that have already been fetched/requested from the proof
|
||||
/// workers. account -> slot -> lowest `min_len` requested.
|
||||
fetched_storage_targets: B256Map<B256Map<u8>>,
|
||||
/// Whether the last state update has been received.
|
||||
finished_state_updates: bool,
|
||||
/// Pending targets to be dispatched to the proof workers.
|
||||
pending_targets: MultiProofTargetsV2,
|
||||
/// Number of pending updates that were received but not yet processed.
|
||||
pending_updates: usize,
|
||||
/// Metrics for the sparse trie.
|
||||
metrics: MultiProofTaskMetrics,
|
||||
}
|
||||
|
||||
impl<A, S> SparseTrieCacheTask<A, S>
|
||||
where
|
||||
A: SparseTrieExt + Default,
|
||||
S: SparseTrieExt + Default + Clone,
|
||||
{
|
||||
/// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`].
|
||||
pub(super) fn new(
|
||||
updates: CrossbeamReceiver<MultiProofMessage>,
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
metrics: MultiProofTaskMetrics,
|
||||
trie: SparseStateTrie<A, S>,
|
||||
) -> Self {
|
||||
let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
|
||||
Self {
|
||||
proof_result_tx,
|
||||
proof_result_rx,
|
||||
updates,
|
||||
proof_worker_handle,
|
||||
trie,
|
||||
account_updates: Default::default(),
|
||||
storage_updates: Default::default(),
|
||||
pending_account_updates: Default::default(),
|
||||
fetched_account_targets: Default::default(),
|
||||
fetched_storage_targets: Default::default(),
|
||||
finished_state_updates: Default::default(),
|
||||
pending_targets: Default::default(),
|
||||
pending_updates: Default::default(),
|
||||
metrics,
|
||||
}
|
||||
}
|
||||
|
||||
/// Prunes and shrinks the trie for reuse in the next payload built on top of this one.
|
||||
///
|
||||
/// Should be called after the state root result has been sent.
|
||||
pub(super) fn into_trie_for_reuse(
|
||||
mut self,
|
||||
prune_depth: usize,
|
||||
max_storage_tries: usize,
|
||||
max_nodes_capacity: usize,
|
||||
max_values_capacity: usize,
|
||||
) -> SparseStateTrie<A, S> {
|
||||
self.trie.prune(prune_depth, max_storage_tries);
|
||||
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
|
||||
self.trie
|
||||
}
|
||||
|
||||
/// Clears and shrinks the trie, discarding all state.
|
||||
///
|
||||
/// Use this when the payload was invalid or cancelled - we don't want to preserve
|
||||
/// potentially invalid trie state, but we keep the allocations for reuse.
|
||||
pub(super) fn into_cleared_trie(
|
||||
mut self,
|
||||
max_nodes_capacity: usize,
|
||||
max_values_capacity: usize,
|
||||
) -> SparseStateTrie<A, S> {
|
||||
self.trie.clear();
|
||||
self.trie.shrink_to(max_nodes_capacity, max_values_capacity);
|
||||
self.trie
|
||||
}
|
||||
|
||||
/// Runs the sparse trie task to completion.
|
||||
///
|
||||
/// This waits for new incoming [`MultiProofMessage`]s, applies updates to the trie and
|
||||
/// schedules proof fetching when needed.
|
||||
///
|
||||
/// This concludes once the last state update has been received and processed.
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
|
||||
let now = Instant::now();
|
||||
|
||||
loop {
|
||||
crossbeam_channel::select_biased! {
|
||||
recv(self.updates) -> message => {
|
||||
let update = match message {
|
||||
Ok(m) => m,
|
||||
Err(_) => {
|
||||
break
|
||||
}
|
||||
};
|
||||
|
||||
self.on_multiproof_message(update);
|
||||
self.pending_updates += 1;
|
||||
}
|
||||
recv(self.proof_result_rx) -> message => {
|
||||
let Ok(result) = message else {
|
||||
unreachable!("we own the sender half")
|
||||
};
|
||||
let ProofResult::V2(mut result) = result.result? else {
|
||||
unreachable!("sparse trie as cache must only be used with multiproof v2");
|
||||
};
|
||||
|
||||
while let Ok(res) = self.proof_result_rx.try_recv() {
|
||||
let ProofResult::V2(res) = res.result? else {
|
||||
unreachable!("sparse trie as cache must only be used with multiproof v2");
|
||||
};
|
||||
result.extend(res);
|
||||
}
|
||||
self.on_proof_result(result)?;
|
||||
},
|
||||
}
|
||||
|
||||
if self.updates.is_empty() && self.proof_result_rx.is_empty() {
|
||||
self.dispatch_pending_targets();
|
||||
self.process_updates()?;
|
||||
self.dispatch_pending_targets();
|
||||
} else if self.updates.is_empty() || self.pending_updates > 100 {
|
||||
self.process_leaf_updates()?;
|
||||
self.dispatch_pending_targets();
|
||||
} else if self.updates.is_empty() || self.pending_targets.chunking_length() > 100 {
|
||||
self.dispatch_pending_targets();
|
||||
}
|
||||
|
||||
if self.finished_state_updates &&
|
||||
self.account_updates.is_empty() &&
|
||||
self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
debug!(target: "engine::root", "All proofs processed, ending calculation");
|
||||
|
||||
let start = Instant::now();
|
||||
let (state_root, trie_updates) =
|
||||
self.trie.root_with_updates(&self.proof_worker_handle).map_err(|e| {
|
||||
ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
|
||||
})?;
|
||||
|
||||
let end = Instant::now();
|
||||
self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
|
||||
self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
|
||||
|
||||
Ok(StateRootComputeOutcome { state_root, trie_updates })
|
||||
}
|
||||
|
||||
fn on_multiproof_message(&mut self, message: MultiProofMessage) {
|
||||
match message {
|
||||
MultiProofMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
|
||||
MultiProofMessage::StateUpdate(_, state) => self.on_state_update(state),
|
||||
MultiProofMessage::EmptyProof { .. } => unreachable!(),
|
||||
MultiProofMessage::BlockAccessList(_) => todo!(),
|
||||
MultiProofMessage::FinishedStateUpdates => self.finished_state_updates = true,
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
fn on_prewarm_targets(&mut self, targets: VersionedMultiProofTargets) {
|
||||
let VersionedMultiProofTargets::V2(targets) = targets else {
|
||||
unreachable!("sparse trie as cache must only be used with V2 multiproof targets");
|
||||
};
|
||||
|
||||
for target in targets.account_targets {
|
||||
// Only touch accounts that are not yet present in the updates set.
|
||||
self.account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
|
||||
}
|
||||
|
||||
for (address, slots) in targets.storage_targets {
|
||||
for slot in slots {
|
||||
// Only touch storages that are not yet present in the updates set.
|
||||
self.storage_updates
|
||||
.entry(address)
|
||||
.or_default()
|
||||
.entry(slot.key())
|
||||
.or_insert(LeafUpdate::Touched);
|
||||
}
|
||||
|
||||
// Touch corresponding account leaf to make sure its revealed in accounts trie for
|
||||
// storage root update.
|
||||
self.account_updates.entry(address).or_insert(LeafUpdate::Touched);
|
||||
}
|
||||
}
|
||||
|
||||
/// Processes a state update and encodes all state changes as trie updates.
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all,
|
||||
fields(accounts = update.len())
|
||||
)]
|
||||
fn on_state_update(&mut self, update: EvmState) {
|
||||
let hashed_state_update = evm_state_to_hashed_post_state(update);
|
||||
|
||||
for (address, storage) in hashed_state_update.storages {
|
||||
for (slot, value) in storage.storage {
|
||||
let encoded = if value.is_zero() {
|
||||
Vec::new()
|
||||
} else {
|
||||
alloy_rlp::encode_fixed_size(&value).to_vec()
|
||||
};
|
||||
self.storage_updates
|
||||
.entry(address)
|
||||
.or_default()
|
||||
.insert(slot, LeafUpdate::Changed(encoded));
|
||||
}
|
||||
|
||||
// Make sure account is tracked in `account_updates` so that it is revealed in accounts
|
||||
// trie for storage root update.
|
||||
self.account_updates.entry(address).or_insert(LeafUpdate::Touched);
|
||||
|
||||
// Make sure account is tracked in `pending_account_updates` so that once storage root
|
||||
// is computed, it will be updated in the accounts trie.
|
||||
self.pending_account_updates.entry(address).or_insert(None);
|
||||
}
|
||||
|
||||
for (address, account) in hashed_state_update.accounts {
|
||||
// Track account as touched.
|
||||
//
|
||||
// This might overwrite an existing update, which is fine, because storage root from it
|
||||
// is already tracked in the trie and can be easily fetched again.
|
||||
self.account_updates.insert(address, LeafUpdate::Touched);
|
||||
|
||||
// Track account in `pending_account_updates` so that once storage root is computed,
|
||||
// it will be updated in the accounts trie.
|
||||
self.pending_account_updates.insert(address, Some(account));
|
||||
}
|
||||
}
|
||||
|
||||
fn on_proof_result(
|
||||
&mut self,
|
||||
result: DecodedMultiProofV2,
|
||||
) -> Result<(), ParallelStateRootError> {
|
||||
self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
|
||||
ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
fn process_leaf_updates(&mut self) -> SparseTrieResult<()> {
|
||||
self.pending_updates = 0;
|
||||
|
||||
// Make sure that tries exist for all addresses that have updates.
|
||||
for address in self.storage_updates.keys() {
|
||||
self.trie.get_or_create_storage_trie_mut(*address);
|
||||
}
|
||||
|
||||
let storage_results: Vec<_> = self
|
||||
.trie
|
||||
.storage_tries()
|
||||
.iter_mut()
|
||||
.filter_map(|(address, trie)| {
|
||||
let updates = self.storage_updates.remove(address)?;
|
||||
let fetched = self.fetched_storage_targets.remove(address).unwrap_or_default();
|
||||
|
||||
Some((address, updates, fetched, trie))
|
||||
})
|
||||
.par_bridge()
|
||||
.map(|(address, mut updates, mut fetched, trie)| {
|
||||
let mut targets = Vec::new();
|
||||
|
||||
trie.update_leaves(&mut updates, |path, min_len| match fetched.entry(path) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
})?;
|
||||
|
||||
SparseTrieResult::Ok((address, targets, fetched, updates))
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
for (address, targets, fetched, updates) in storage_results {
|
||||
self.fetched_storage_targets.insert(*address, fetched);
|
||||
self.storage_updates.insert(*address, updates);
|
||||
|
||||
if !targets.is_empty() {
|
||||
self.pending_targets.storage_targets.entry(*address).or_default().extend(targets);
|
||||
}
|
||||
}
|
||||
|
||||
// Process account trie updates and fill the account targets.
|
||||
self.trie.trie_mut().update_leaves(
|
||||
&mut self.account_updates,
|
||||
|target, min_len| match self.fetched_account_targets.entry(target) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
self.pending_targets
|
||||
.account_targets
|
||||
.push(Target::new(target).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
self.pending_targets
|
||||
.account_targets
|
||||
.push(Target::new(target).with_min_len(min_len));
|
||||
}
|
||||
},
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Applies updates to the sparse trie and dispatches requested multiproof targets.
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
fn process_updates(&mut self) -> SparseTrieResult<()> {
|
||||
self.process_leaf_updates()?;
|
||||
|
||||
if self.pending_account_updates.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let roots = self
|
||||
.trie
|
||||
.storage_tries()
|
||||
.par_iter_mut()
|
||||
.filter(|(address, _)| {
|
||||
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty())
|
||||
})
|
||||
.map(|(address, trie)| {
|
||||
let root =
|
||||
trie.root().expect("updates are drained, trie should be revealed by now");
|
||||
|
||||
(address, root)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for (address, storage_root) in roots {
|
||||
// If the storage root is known and we have a pending update for this account, encode it
|
||||
// into a proper update.
|
||||
if let Entry::Occupied(entry) = self.pending_account_updates.entry(*address) &&
|
||||
entry.get().is_some()
|
||||
{
|
||||
let account = entry.remove().expect("just checked, should be Some");
|
||||
let encoded = if account.is_none_or(|account| account.is_empty()) &&
|
||||
storage_root == EMPTY_ROOT_HASH
|
||||
{
|
||||
Vec::new()
|
||||
} else {
|
||||
// TODO: optimize allocation
|
||||
alloy_rlp::encode(account.unwrap_or_default().into_trie_account(storage_root))
|
||||
};
|
||||
self.account_updates.insert(*address, LeafUpdate::Changed(encoded));
|
||||
}
|
||||
}
|
||||
|
||||
// Now promote pending account updates if possible.
|
||||
self.pending_account_updates.retain(|addr, account| {
|
||||
// If account has pending storage updates, it is still pending.
|
||||
if self.storage_updates.get(addr).is_some_and(|updates| !updates.is_empty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Get the current account state either from the trie or from latest account update.
|
||||
let trie_account = if let Some(LeafUpdate::Changed(encoded)) = self.account_updates.get(addr) {
|
||||
Some(encoded).filter(|encoded| !encoded.is_empty())
|
||||
} else if !self.account_updates.contains_key(addr) {
|
||||
self.trie.get_account_value(addr)
|
||||
} else {
|
||||
// Needs to be revealed first
|
||||
return true;
|
||||
};
|
||||
|
||||
let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
|
||||
|
||||
let (account, storage_root) = if let Some(account) = account.take() {
|
||||
// If account is Some(_) here it means it didn't have any storage updates
|
||||
// and we can fetch the storage root directly from the account trie.
|
||||
//
|
||||
// If it did have storage updates, we would've had processed it above when iterating over storage tries.
|
||||
let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
|
||||
|
||||
(account, storage_root)
|
||||
} else {
|
||||
(trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
|
||||
};
|
||||
|
||||
let encoded = if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
|
||||
Vec::new()
|
||||
} else {
|
||||
let account = account.unwrap_or_default().into_trie_account(storage_root);
|
||||
|
||||
// TODO: optimize allocation
|
||||
alloy_rlp::encode(account)
|
||||
};
|
||||
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
|
||||
|
||||
false
|
||||
});
|
||||
|
||||
// Process account trie updates and fill the account targets.
|
||||
self.trie.trie_mut().update_leaves(
|
||||
&mut self.account_updates,
|
||||
|target, min_len| match self.fetched_account_targets.entry(target) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
self.pending_targets
|
||||
.account_targets
|
||||
.push(Target::new(target).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
self.pending_targets
|
||||
.account_targets
|
||||
.push(Target::new(target).with_min_len(min_len));
|
||||
}
|
||||
},
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
fn dispatch_pending_targets(&mut self) {
|
||||
if !self.pending_targets.is_empty() {
|
||||
let chunking_length = self.pending_targets.chunking_length();
|
||||
dispatch_with_chunking(
|
||||
std::mem::take(&mut self.pending_targets),
|
||||
chunking_length,
|
||||
Some(60),
|
||||
300,
|
||||
self.proof_worker_handle.available_account_workers(),
|
||||
self.proof_worker_handle.available_storage_workers(),
|
||||
MultiProofTargetsV2::chunks,
|
||||
|proof_targets| {
|
||||
if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(
|
||||
AccountMultiproofInput::V2 {
|
||||
targets: proof_targets,
|
||||
proof_result_sender: ProofResultContext::new(
|
||||
self.proof_result_tx.clone(),
|
||||
0,
|
||||
HashedPostState::default(),
|
||||
Instant::now(),
|
||||
),
|
||||
},
|
||||
) {
|
||||
error!("failed to dispatch account multiproof: {e:?}");
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Outcome of the state root computation, including the state root itself with
|
||||
|
||||
@@ -402,7 +402,12 @@ where
|
||||
.in_scope(|| self.evm_env_for(&input))
|
||||
.map_err(NewPayloadError::other)?;
|
||||
|
||||
let env = ExecutionEnv { evm_env, hash: input.hash(), parent_hash: input.parent_hash() };
|
||||
let env = ExecutionEnv {
|
||||
evm_env,
|
||||
hash: input.hash(),
|
||||
parent_hash: input.parent_hash(),
|
||||
parent_state_root: parent_block.state_root(),
|
||||
};
|
||||
|
||||
// Plan the strategy used for state root computation.
|
||||
let strategy = self.plan_state_root_computation();
|
||||
@@ -1112,10 +1117,13 @@ where
|
||||
/// while the trie input computation is deferred until the overlay is actually needed.
|
||||
///
|
||||
/// If parent is on disk (no in-memory blocks), returns `None` for the lazy overlay.
|
||||
///
|
||||
/// Uses a cached overlay if available for the canonical head (the common case).
|
||||
fn get_parent_lazy_overlay(
|
||||
parent_hash: B256,
|
||||
state: &EngineApiTreeState<N>,
|
||||
) -> (Option<LazyOverlay>, B256) {
|
||||
// Get blocks leading to the parent to determine the anchor
|
||||
let (anchor_hash, blocks) =
|
||||
state.tree_state.blocks_by_hash(parent_hash).unwrap_or_else(|| (parent_hash, vec![]));
|
||||
|
||||
@@ -1124,6 +1132,17 @@ where
|
||||
return (None, anchor_hash);
|
||||
}
|
||||
|
||||
// Try to use the cached overlay if it matches both parent hash and anchor
|
||||
if let Some(cached) = state.tree_state.get_cached_overlay(parent_hash, anchor_hash) {
|
||||
debug!(
|
||||
target: "engine::tree::payload_validator",
|
||||
%parent_hash,
|
||||
%anchor_hash,
|
||||
"Using cached canonical overlay"
|
||||
);
|
||||
return (Some(cached.overlay.clone()), cached.anchor_hash);
|
||||
}
|
||||
|
||||
debug!(
|
||||
target: "engine::tree::payload_validator",
|
||||
%anchor_hash,
|
||||
|
||||
@@ -6,7 +6,7 @@ use alloy_primitives::{
|
||||
map::{HashMap, HashSet},
|
||||
BlockNumber, B256,
|
||||
};
|
||||
use reth_chain_state::{EthPrimitives, ExecutedBlock};
|
||||
use reth_chain_state::{DeferredTrieData, EthPrimitives, ExecutedBlock, LazyOverlay};
|
||||
use reth_primitives_traits::{AlloyBlockHeader, NodePrimitives, SealedHeader};
|
||||
use std::{
|
||||
collections::{btree_map, hash_map, BTreeMap, VecDeque},
|
||||
@@ -38,6 +38,12 @@ pub struct TreeState<N: NodePrimitives = EthPrimitives> {
|
||||
pub(crate) current_canonical_head: BlockNumHash,
|
||||
/// The engine API variant of this handler
|
||||
pub(crate) engine_kind: EngineApiKind,
|
||||
/// Pre-computed lazy overlay for the canonical head.
|
||||
///
|
||||
/// This is optimistically prepared after the canonical head changes, so that
|
||||
/// the next payload building on the canonical head can use it immediately
|
||||
/// without recomputing.
|
||||
pub(crate) cached_canonical_overlay: Option<PreparedCanonicalOverlay>,
|
||||
}
|
||||
|
||||
impl<N: NodePrimitives> TreeState<N> {
|
||||
@@ -49,6 +55,7 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
current_canonical_head,
|
||||
parent_to_child: HashMap::default(),
|
||||
engine_kind,
|
||||
cached_canonical_overlay: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,6 +99,66 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
Some((parent_hash, blocks))
|
||||
}
|
||||
|
||||
/// Prepares a cached lazy overlay for the current canonical head.
|
||||
///
|
||||
/// This should be called after the canonical head changes to optimistically
|
||||
/// prepare the overlay for the next payload that will likely build on it.
|
||||
///
|
||||
/// Returns a clone of the [`LazyOverlay`] so the caller can spawn a background
|
||||
/// task to trigger computation via [`LazyOverlay::get`]. This ensures the overlay
|
||||
/// is actually computed before the next payload arrives.
|
||||
pub(crate) fn prepare_canonical_overlay(&mut self) -> Option<LazyOverlay> {
|
||||
let canonical_hash = self.current_canonical_head.hash;
|
||||
|
||||
// Get blocks leading to the canonical head
|
||||
let Some((anchor_hash, blocks)) = self.blocks_by_hash(canonical_hash) else {
|
||||
// Canonical head not in memory (persisted), no overlay needed
|
||||
self.cached_canonical_overlay = None;
|
||||
return None;
|
||||
};
|
||||
|
||||
// Extract deferred trie data handles from blocks (newest to oldest)
|
||||
let handles: Vec<DeferredTrieData> = blocks.iter().map(|b| b.trie_data_handle()).collect();
|
||||
|
||||
let overlay = LazyOverlay::new(anchor_hash, handles);
|
||||
self.cached_canonical_overlay = Some(PreparedCanonicalOverlay {
|
||||
parent_hash: canonical_hash,
|
||||
overlay: overlay.clone(),
|
||||
anchor_hash,
|
||||
});
|
||||
|
||||
debug!(
|
||||
target: "engine::tree",
|
||||
%canonical_hash,
|
||||
%anchor_hash,
|
||||
num_blocks = blocks.len(),
|
||||
"Prepared cached canonical overlay"
|
||||
);
|
||||
|
||||
Some(overlay)
|
||||
}
|
||||
|
||||
/// Returns the cached overlay if it matches the requested parent hash and anchor.
|
||||
///
|
||||
/// Both parent hash and anchor hash must match to ensure the overlay is valid.
|
||||
/// This prevents using a stale overlay after persistence has advanced the anchor.
|
||||
pub(crate) fn get_cached_overlay(
|
||||
&self,
|
||||
parent_hash: B256,
|
||||
expected_anchor: B256,
|
||||
) -> Option<&PreparedCanonicalOverlay> {
|
||||
self.cached_canonical_overlay.as_ref().filter(|cached| {
|
||||
cached.parent_hash == parent_hash && cached.anchor_hash == expected_anchor
|
||||
})
|
||||
}
|
||||
|
||||
/// Invalidates the cached overlay.
|
||||
///
|
||||
/// Should be called when the anchor changes (e.g., after persistence).
|
||||
pub(crate) fn invalidate_cached_overlay(&mut self) {
|
||||
self.cached_canonical_overlay = None;
|
||||
}
|
||||
|
||||
/// Insert executed block into the state.
|
||||
pub(crate) fn insert_executed(&mut self, executed: ExecutedBlock<N>) {
|
||||
let hash = executed.recovered_block().hash();
|
||||
@@ -288,6 +355,9 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
if let Some(finalized_num_hash) = finalized_num_hash {
|
||||
self.prune_finalized_sidechains(finalized_num_hash);
|
||||
}
|
||||
|
||||
// Invalidate the cached overlay since blocks were removed and the anchor may have changed
|
||||
self.invalidate_cached_overlay();
|
||||
}
|
||||
|
||||
/// Updates the canonical head to the given block.
|
||||
@@ -355,6 +425,39 @@ impl<N: NodePrimitives> TreeState<N> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Pre-computed lazy overlay for the canonical head block.
|
||||
///
|
||||
/// This is prepared **optimistically** when the canonical head changes, allowing
|
||||
/// the next payload (which typically builds on the canonical head) to reuse
|
||||
/// the pre-computed overlay immediately without re-traversing in-memory blocks.
|
||||
///
|
||||
/// The overlay captures deferred trie data handles from all in-memory blocks
|
||||
/// between the canonical head and the persisted anchor. When a new payload
|
||||
/// arrives building on the canonical head, this cached overlay can be used
|
||||
/// directly instead of calling `blocks_by_hash` and collecting handles again.
|
||||
///
|
||||
/// # Invalidation
|
||||
///
|
||||
/// The cached overlay is invalidated when:
|
||||
/// - Persistence completes (anchor changes)
|
||||
/// - The canonical head changes to a different block
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PreparedCanonicalOverlay {
|
||||
/// The block hash for which this overlay is prepared as a parent.
|
||||
///
|
||||
/// When a payload arrives with this parent hash, the overlay can be reused.
|
||||
pub parent_hash: B256,
|
||||
/// The pre-computed lazy overlay containing deferred trie data handles.
|
||||
///
|
||||
/// This is computed optimistically after `set_canonical_head` so subsequent
|
||||
/// payloads don't need to re-collect the handles.
|
||||
pub overlay: LazyOverlay,
|
||||
/// The anchor hash (persisted ancestor) this overlay is based on.
|
||||
///
|
||||
/// Used to verify the overlay is still valid (anchor hasn't changed due to persistence).
|
||||
pub anchor_hash: B256,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -259,6 +259,7 @@ impl TestHarness {
|
||||
current_canonical_head: blocks.last().unwrap().recovered_block().num_hash(),
|
||||
parent_to_child,
|
||||
engine_kind: EngineApiKind::Ethereum,
|
||||
cached_canonical_overlay: None,
|
||||
};
|
||||
|
||||
let last_executed_block = blocks.last().unwrap().clone();
|
||||
|
||||
@@ -174,7 +174,7 @@ where
|
||||
}
|
||||
Commands::P2P(command) => runner.run_until_ctrl_c(command.execute::<N>()),
|
||||
Commands::Config(command) => runner.run_until_ctrl_c(command.execute()),
|
||||
Commands::Prune(command) => runner.run_until_ctrl_c(command.execute::<N>()),
|
||||
Commands::Prune(command) => runner.run_command_until_exit(|ctx| command.execute::<N>(ctx)),
|
||||
#[cfg(feature = "dev")]
|
||||
Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()),
|
||||
Commands::ReExecute(command) => runner.run_until_ctrl_c(command.execute::<N>(components)),
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::utils::eth_payload_attributes;
|
||||
use crate::utils::{advance_with_random_transactions, eth_payload_attributes};
|
||||
use alloy_eips::eip7685::RequestsOrHash;
|
||||
use alloy_genesis::Genesis;
|
||||
use alloy_primitives::{Address, B256};
|
||||
@@ -6,8 +6,9 @@ use alloy_rpc_types_engine::{PayloadAttributes, PayloadStatusEnum};
|
||||
use jsonrpsee_core::client::ClientT;
|
||||
use reth_chainspec::{ChainSpecBuilder, EthChainSpec, MAINNET};
|
||||
use reth_e2e_test_utils::{
|
||||
node::NodeTestContext, setup, transaction::TransactionTestContext, wallet::Wallet,
|
||||
node::NodeTestContext, setup, setup_engine, transaction::TransactionTestContext, wallet::Wallet,
|
||||
};
|
||||
use reth_node_api::TreeConfig;
|
||||
use reth_node_builder::{NodeBuilder, NodeHandle};
|
||||
use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
|
||||
use reth_node_ethereum::EthereumNode;
|
||||
@@ -256,3 +257,56 @@ async fn test_testing_build_block_v1_osaka() -> eyre::Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Tests that sparse trie allocation reuse works correctly across consecutive blocks.
|
||||
///
|
||||
/// This test exercises the sparse trie allocation reuse path by:
|
||||
/// 1. Starting a node with parallel state root computation enabled
|
||||
/// 2. Advancing multiple consecutive blocks with random transactions
|
||||
/// 3. Verifying that all blocks are successfully validated (state roots match)
|
||||
///
|
||||
/// Note: Trie structure reuse is currently disabled due to pruning creating blinded
|
||||
/// nodes. The preserved trie's allocations are still reused to reduce memory overhead,
|
||||
/// but the trie is cleared between blocks.
|
||||
#[tokio::test]
|
||||
async fn test_sparse_trie_reuse_across_blocks() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
// Use parallel state root (non-legacy) with pruning enabled
|
||||
let tree_config = TreeConfig::default()
|
||||
.with_legacy_state_root(false)
|
||||
.with_sparse_trie_prune_depth(2)
|
||||
.with_sparse_trie_max_storage_tries(100);
|
||||
|
||||
let (mut nodes, _tasks, _wallet) = setup_engine::<EthereumNode>(
|
||||
1,
|
||||
Arc::new(
|
||||
ChainSpecBuilder::default()
|
||||
.chain(MAINNET.chain)
|
||||
.genesis(serde_json::from_str(include_str!("../assets/genesis.json")).unwrap())
|
||||
.cancun_activated()
|
||||
.build(),
|
||||
),
|
||||
false,
|
||||
tree_config,
|
||||
eth_payload_attributes,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut node = nodes.pop().unwrap();
|
||||
|
||||
// Use a seeded RNG for reproducibility
|
||||
let mut rng = rand::rng();
|
||||
|
||||
// Advance multiple consecutive blocks with random transactions.
|
||||
// This exercises the sparse trie reuse path where each block's pruned trie
|
||||
// is reused for the next block's state root computation.
|
||||
let num_blocks = 5;
|
||||
advance_with_random_transactions(&mut node, num_blocks, &mut rng, true).await?;
|
||||
|
||||
// Verify the chain advanced correctly
|
||||
let best_block = node.inner.provider.best_block_number()?;
|
||||
assert_eq!(best_block, num_blocks as u64, "Expected {} blocks, got {}", num_blocks, best_block);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -171,7 +171,7 @@ pub enum SparseTrieErrorKind {
|
||||
/// Path to the node.
|
||||
path: Nibbles,
|
||||
/// Node that was at the path when revealing.
|
||||
node: Box<dyn core::fmt::Debug + Send>,
|
||||
node: Box<dyn core::fmt::Debug + Send + Sync>,
|
||||
},
|
||||
/// RLP error.
|
||||
#[error(transparent)]
|
||||
@@ -184,7 +184,7 @@ pub enum SparseTrieErrorKind {
|
||||
},
|
||||
/// Other.
|
||||
#[error(transparent)]
|
||||
Other(#[from] Box<dyn core::error::Error + Send>),
|
||||
Other(#[from] Box<dyn core::error::Error + Send + Sync>),
|
||||
}
|
||||
|
||||
/// Trie witness errors.
|
||||
|
||||
@@ -83,6 +83,28 @@ impl From<&'static str> for FileClientError {
|
||||
}
|
||||
|
||||
impl<B: FullBlock> FileClient<B> {
|
||||
/// Create a new file client from a slice of sealed blocks.
|
||||
pub fn from_blocks(blocks: impl IntoIterator<Item = SealedBlock<B>>) -> Self {
|
||||
let blocks: Vec<_> = blocks.into_iter().collect();
|
||||
let capacity = blocks.len();
|
||||
|
||||
let mut headers = HashMap::with_capacity(capacity);
|
||||
let mut hash_to_number = HashMap::with_capacity(capacity);
|
||||
let mut bodies = HashMap::with_capacity(capacity);
|
||||
|
||||
for block in blocks {
|
||||
let number = block.number();
|
||||
let hash = block.hash();
|
||||
let (header, body) = block.split_sealed_header_body();
|
||||
|
||||
headers.insert(number, header.into_header());
|
||||
hash_to_number.insert(hash, number);
|
||||
bodies.insert(hash, body);
|
||||
}
|
||||
|
||||
Self { headers, hash_to_number, bodies }
|
||||
}
|
||||
|
||||
/// Create a new file client from a file path.
|
||||
pub async fn new<P: AsRef<Path>>(
|
||||
path: P,
|
||||
|
||||
@@ -68,7 +68,7 @@ pub enum P2PStreamError {
|
||||
SendBufferFull,
|
||||
|
||||
/// Disconnected error.
|
||||
#[error("disconnected")]
|
||||
#[error("disconnected: {0}")]
|
||||
Disconnected(DisconnectReason),
|
||||
|
||||
/// Unknown disconnect reason error.
|
||||
|
||||
@@ -20,7 +20,7 @@ use std::{
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tracing::{debug, trace};
|
||||
use tracing::trace;
|
||||
|
||||
#[cfg_attr(doc, aquamarine::aquamarine)]
|
||||
/// Contains the connectivity related state of the network.
|
||||
@@ -259,7 +259,7 @@ impl<N: NetworkPrimitives> Swarm<N> {
|
||||
if self.sessions.is_valid_fork_id(fork_id) {
|
||||
self.state_mut().peers_mut().set_discovered_fork_id(peer_id, fork_id);
|
||||
} else {
|
||||
debug!(target: "net", ?peer_id, remote_fork_id=?fork_id, our_fork_id=?self.sessions.fork_id(), "fork id mismatch, removing peer");
|
||||
trace!(target: "net", ?peer_id, remote_fork_id=?fork_id, our_fork_id=?self.sessions.fork_id(), "fork id mismatch, removing peer");
|
||||
self.state_mut().peers_mut().remove_peer(peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -303,20 +303,8 @@ impl EngineNodeLauncher {
|
||||
// the CL
|
||||
loop {
|
||||
tokio::select! {
|
||||
shutdown_req = &mut shutdown_rx => {
|
||||
if let Ok(req) = shutdown_req {
|
||||
debug!(target: "reth::cli", "received engine shutdown request");
|
||||
engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(
|
||||
FromOrchestrator::Terminate { tx: req.done_tx }.into()
|
||||
);
|
||||
}
|
||||
}
|
||||
payload = built_payloads.select_next_some() => {
|
||||
if let Some(executed_block) = payload.executed_block() {
|
||||
debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
|
||||
engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());
|
||||
}
|
||||
}
|
||||
biased;
|
||||
|
||||
event = engine_service.next() => {
|
||||
let Some(event) = event else { break };
|
||||
debug!(target: "reth::cli", "Event: {event}");
|
||||
@@ -364,6 +352,20 @@ impl EngineNodeLauncher {
|
||||
}
|
||||
}
|
||||
}
|
||||
payload = built_payloads.select_next_some() => {
|
||||
if let Some(executed_block) = payload.executed_block() {
|
||||
debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
|
||||
engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());
|
||||
}
|
||||
}
|
||||
shutdown_req = &mut shutdown_rx => {
|
||||
if let Ok(req) = shutdown_req {
|
||||
debug!(target: "reth::cli", "received engine shutdown request");
|
||||
engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(
|
||||
FromOrchestrator::Terminate { tx: req.done_tx }.into()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@ pub struct DefaultEngineValues {
|
||||
account_worker_count: Option<usize>,
|
||||
disable_proof_v2: bool,
|
||||
cache_metrics_disabled: bool,
|
||||
enable_sparse_trie_as_cache: bool,
|
||||
}
|
||||
|
||||
impl DefaultEngineValues {
|
||||
@@ -172,6 +173,12 @@ impl DefaultEngineValues {
|
||||
self.cache_metrics_disabled = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set whether to enable sparse trie as cache by default
|
||||
pub const fn with_enable_sparse_trie_as_cache(mut self, v: bool) -> Self {
|
||||
self.enable_sparse_trie_as_cache = v;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for DefaultEngineValues {
|
||||
@@ -197,6 +204,7 @@ impl Default for DefaultEngineValues {
|
||||
account_worker_count: None,
|
||||
disable_proof_v2: false,
|
||||
cache_metrics_disabled: false,
|
||||
enable_sparse_trie_as_cache: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -324,6 +332,10 @@ pub struct EngineArgs {
|
||||
/// Disable cache metrics recording, which can take up to 50ms with large cached state.
|
||||
#[arg(long = "engine.disable-cache-metrics", default_value_t = DefaultEngineValues::get_global().cache_metrics_disabled)]
|
||||
pub cache_metrics_disabled: bool,
|
||||
|
||||
/// Enable sparse trie as cache.
|
||||
#[arg(long = "engine.enable-sparse-trie-as-cache", default_value_t = DefaultEngineValues::get_global().enable_sparse_trie_as_cache, conflicts_with = "disable_proof_v2")]
|
||||
pub enable_sparse_trie_as_cache: bool,
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
@@ -350,6 +362,7 @@ impl Default for EngineArgs {
|
||||
account_worker_count,
|
||||
disable_proof_v2,
|
||||
cache_metrics_disabled,
|
||||
enable_sparse_trie_as_cache,
|
||||
} = DefaultEngineValues::get_global().clone();
|
||||
Self {
|
||||
persistence_threshold,
|
||||
@@ -376,6 +389,7 @@ impl Default for EngineArgs {
|
||||
account_worker_count,
|
||||
disable_proof_v2,
|
||||
cache_metrics_disabled,
|
||||
enable_sparse_trie_as_cache,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -412,6 +426,7 @@ impl EngineArgs {
|
||||
|
||||
config = config.with_disable_proof_v2(self.disable_proof_v2);
|
||||
config = config.without_cache_metrics(self.cache_metrics_disabled);
|
||||
config = config.with_enable_sparse_trie_as_cache(self.enable_sparse_trie_as_cache);
|
||||
|
||||
config
|
||||
}
|
||||
@@ -464,6 +479,7 @@ mod tests {
|
||||
account_worker_count: Some(8),
|
||||
disable_proof_v2: false,
|
||||
cache_metrics_disabled: true,
|
||||
enable_sparse_trie_as_cache: false,
|
||||
};
|
||||
|
||||
let parsed_args = CommandParser::<EngineArgs>::parse_from([
|
||||
|
||||
@@ -5,7 +5,9 @@ use alloy_primitives::{Address, BlockNumber};
|
||||
use clap::{builder::RangedU64ValueParser, Args};
|
||||
use reth_chainspec::EthereumHardforks;
|
||||
use reth_config::config::PruneConfig;
|
||||
use reth_prune_types::{PruneMode, PruneModes, ReceiptsLogPruneConfig, MINIMUM_PRUNING_DISTANCE};
|
||||
use reth_prune_types::{
|
||||
PruneMode, PruneModes, ReceiptsLogPruneConfig, MINIMUM_UNWIND_SAFE_DISTANCE,
|
||||
};
|
||||
use std::{collections::BTreeMap, ops::Not, sync::OnceLock};
|
||||
|
||||
/// Global static pruning defaults
|
||||
@@ -68,9 +70,9 @@ impl Default for DefaultPruningValues {
|
||||
full_prune_modes: PruneModes {
|
||||
sender_recovery: Some(PruneMode::Full),
|
||||
transaction_lookup: None,
|
||||
receipts: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
|
||||
account_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
|
||||
storage_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
|
||||
receipts: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
|
||||
account_history: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
|
||||
storage_history: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
|
||||
// This field is ignored when full_bodies_history_use_pre_merge is true
|
||||
bodies_history: None,
|
||||
receipts_log_filter: Default::default(),
|
||||
@@ -80,9 +82,9 @@ impl Default for DefaultPruningValues {
|
||||
sender_recovery: Some(PruneMode::Full),
|
||||
transaction_lookup: Some(PruneMode::Full),
|
||||
receipts: Some(PruneMode::Full),
|
||||
account_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
|
||||
storage_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
|
||||
bodies_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
|
||||
account_history: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
|
||||
storage_history: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
|
||||
bodies_history: Some(PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)),
|
||||
receipts_log_filter: Default::default(),
|
||||
},
|
||||
}
|
||||
@@ -93,7 +95,8 @@ impl Default for DefaultPruningValues {
|
||||
#[derive(Debug, Clone, Args, PartialEq, Eq, Default)]
|
||||
#[command(next_help_heading = "Pruning")]
|
||||
pub struct PruningArgs {
|
||||
/// Run full node. Only the most recent [`MINIMUM_PRUNING_DISTANCE`] block states are stored.
|
||||
/// Run full node. Only the most recent [`MINIMUM_UNWIND_SAFE_DISTANCE`] block states are
|
||||
/// stored.
|
||||
#[arg(long, default_value_t = false, conflicts_with = "minimal")]
|
||||
pub full: bool,
|
||||
|
||||
|
||||
@@ -645,7 +645,7 @@ pub struct RpcServerArgs {
|
||||
///
|
||||
/// When enabled, transactions that fail execution will be skipped, and all subsequent
|
||||
/// transactions from the same sender will also be skipped.
|
||||
#[arg(long = "testing.skip-invalid-transactions", default_value_t = false)]
|
||||
#[arg(long = "testing.skip-invalid-transactions", default_value_t = true)]
|
||||
pub testing_skip_invalid_transactions: bool,
|
||||
}
|
||||
|
||||
@@ -859,7 +859,7 @@ impl Default for RpcServerArgs {
|
||||
rpc_state_cache,
|
||||
gas_price_oracle,
|
||||
rpc_send_raw_transaction_sync_timeout,
|
||||
testing_skip_invalid_transactions: false,
|
||||
testing_skip_invalid_transactions: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,15 +13,23 @@ pub(crate) struct EthstatsCredentials {
|
||||
pub secret: String,
|
||||
/// Host address of the `EthStats` server
|
||||
pub host: String,
|
||||
/// Whether to use secure `WebSocket` (`WSS`) connection
|
||||
pub use_tls: bool,
|
||||
}
|
||||
|
||||
impl FromStr for EthstatsCredentials {
|
||||
type Err = EthStatsError;
|
||||
|
||||
/// Parse credentials from a string in the format "`node_id:secret@host`"
|
||||
/// Parse credentials from a string in the format "`node_id:secret@host`" or
|
||||
/// "`node_id:secret@wss://host`"
|
||||
///
|
||||
/// Supports the following formats:
|
||||
/// - `node_id:secret@host` - Uses plain `WebSocket` (`ws://`)
|
||||
/// - `node_id:secret@ws://host` - Explicitly use plain `WebSocket`
|
||||
/// - `node_id:secret@wss://host` - Use secure `WebSocket` (`WSS`)
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `s` - String containing credentials in the format "`node_id:secret@host`"
|
||||
/// * `s` - String containing credentials
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Ok(EthstatsCredentials)` - Successfully parsed credentials
|
||||
@@ -32,7 +40,7 @@ impl FromStr for EthstatsCredentials {
|
||||
return Err(EthStatsError::InvalidUrl("Missing '@' separator".to_string()));
|
||||
}
|
||||
let creds = parts[0];
|
||||
let host = parts[1].to_string();
|
||||
let mut host = parts[1].to_string();
|
||||
let creds_parts: Vec<&str> = creds.split(':').collect();
|
||||
if creds_parts.len() != 2 {
|
||||
return Err(EthStatsError::InvalidUrl(
|
||||
@@ -42,6 +50,16 @@ impl FromStr for EthstatsCredentials {
|
||||
let node_id = creds_parts[0].to_string();
|
||||
let secret = creds_parts[1].to_string();
|
||||
|
||||
Ok(Self { node_id, secret, host })
|
||||
// Detect and strip protocol prefix if present
|
||||
let mut use_tls = false;
|
||||
if let Some(stripped) = host.strip_prefix("wss://") {
|
||||
use_tls = true;
|
||||
host = stripped.to_string();
|
||||
} else if let Some(stripped) = host.strip_prefix("ws://") {
|
||||
use_tls = false;
|
||||
host = stripped.to_string();
|
||||
}
|
||||
|
||||
Ok(Self { node_id, secret, host, use_tls })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,13 +102,15 @@ where
|
||||
/// Establish `WebSocket` connection to the `EthStats` server
|
||||
///
|
||||
/// Attempts to connect to the server using the credentials and handles
|
||||
/// connection timeouts and errors.
|
||||
/// connection timeouts and errors. Uses either `ws://` or `wss://` based
|
||||
/// on the credentials configuration.
|
||||
async fn connect(&self) -> Result<(), EthStatsError> {
|
||||
debug!(
|
||||
target: "ethstats",
|
||||
"Attempting to connect to EthStats server at {}", self.credentials.host
|
||||
);
|
||||
let full_url = format!("ws://{}/api", self.credentials.host);
|
||||
let protocol = if self.credentials.use_tls { "wss" } else { "ws" };
|
||||
let full_url = format!("{}://{}/api", protocol, self.credentials.host);
|
||||
let url = Url::parse(&full_url).map_err(EthStatsError::Url)?;
|
||||
|
||||
match timeout(CONNECT_TIMEOUT, connect_async(url.as_str())).await {
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
//! OP stack variation of chain spec constants.
|
||||
|
||||
use alloy_primitives::hex;
|
||||
|
||||
//------------------------------- BASE MAINNET -------------------------------//
|
||||
|
||||
/// Max gas limit on Base: <https://basescan.org/block/17208876>
|
||||
@@ -11,13 +9,3 @@ pub const BASE_MAINNET_MAX_GAS_LIMIT: u64 = 105_000_000;
|
||||
|
||||
/// Max gas limit on Base Sepolia: <https://sepolia.basescan.org/block/12506483>
|
||||
pub const BASE_SEPOLIA_MAX_GAS_LIMIT: u64 = 45_000_000;
|
||||
|
||||
//----------------------------------- DEV ------------------------------------//
|
||||
|
||||
/// Dummy system transaction for dev mode
|
||||
/// OP Mainnet transaction at index 0 in block 124665056.
|
||||
///
|
||||
/// <https://optimistic.etherscan.io/tx/0x312e290cf36df704a2217b015d6455396830b0ce678b860ebfcc30f41403d7b1>
|
||||
pub const TX_SET_L1_BLOCK_OP_MAINNET_BLOCK_124665056: [u8; 251] = hex!(
|
||||
"7ef8f8a0683079df94aa5b9cf86687d739a60a9b4f0835e520ec4d664e2e415dca17a6df94deaddeaddeaddeaddeaddeaddeaddeaddead00019442000000000000000000000000000000000000158080830f424080b8a4440a5e200000146b000f79c500000000000000040000000066d052e700000000013ad8a3000000000000000000000000000000000000000000000000000000003ef1278700000000000000000000000000000000000000000000000000000000000000012fdf87b89884a61e74b322bbcf60386f543bfae7827725efaaf0ab1de2294a590000000000000000000000006887246668a3b87f54deb3b94ba47a6f63f32985"
|
||||
);
|
||||
|
||||
@@ -106,7 +106,9 @@ where
|
||||
}
|
||||
Commands::P2P(command) => runner.run_until_ctrl_c(command.execute::<OpNode>()),
|
||||
Commands::Config(command) => runner.run_until_ctrl_c(command.execute()),
|
||||
Commands::Prune(command) => runner.run_until_ctrl_c(command.execute::<OpNode>()),
|
||||
Commands::Prune(command) => {
|
||||
runner.run_command_until_exit(|ctx| command.execute::<OpNode>(ctx))
|
||||
}
|
||||
#[cfg(feature = "dev")]
|
||||
Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()),
|
||||
Commands::ReExecute(command) => {
|
||||
|
||||
@@ -4,14 +4,17 @@ use crate::{OpEthApi, OpEthApiError, SequencerClient};
|
||||
use alloy_primitives::{Bytes, B256};
|
||||
use alloy_rpc_types_eth::TransactionInfo;
|
||||
use futures::StreamExt;
|
||||
use op_alloy_consensus::{transaction::OpTransactionInfo, OpTransaction};
|
||||
use op_alloy_consensus::{
|
||||
transaction::{OpDepositInfo, OpTransactionInfo},
|
||||
OpTransaction,
|
||||
};
|
||||
use reth_chain_state::CanonStateSubscriptions;
|
||||
use reth_optimism_primitives::DepositReceipt;
|
||||
use reth_primitives_traits::{Recovered, SignedTransaction, SignerRecoverable, WithEncoded};
|
||||
use reth_rpc_eth_api::{
|
||||
helpers::{spec::SignersForRpc, EthTransactions, LoadReceipt, LoadTransaction, SpawnBlocking},
|
||||
try_into_op_tx_info, EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore,
|
||||
RpcReceipt, TxInfoMapper,
|
||||
EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore, RpcReceipt,
|
||||
TxInfoMapper,
|
||||
};
|
||||
use reth_rpc_eth_types::{block::convert_transaction_receipt, EthApiError, TransactionSource};
|
||||
use reth_storage_api::{errors::ProviderError, ProviderTx, ReceiptProvider, TransactionsProvider};
|
||||
@@ -282,6 +285,18 @@ where
|
||||
type Err = ProviderError;
|
||||
|
||||
fn try_map(&self, tx: &T, tx_info: TransactionInfo) -> Result<Self::Out, ProviderError> {
|
||||
try_into_op_tx_info(&self.provider, tx, tx_info)
|
||||
let deposit_meta = if tx.is_deposit() {
|
||||
self.provider.receipt_by_hash(*tx.tx_hash())?.and_then(|receipt| {
|
||||
receipt.as_deposit_receipt().map(|receipt| OpDepositInfo {
|
||||
deposit_receipt_version: receipt.deposit_receipt_version,
|
||||
deposit_nonce: receipt.deposit_nonce,
|
||||
})
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
.unwrap_or_default();
|
||||
|
||||
Ok(OpTransactionInfo::new(tx_info, deposit_meta))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,6 +89,12 @@ impl From<revm_state::Account> for Account {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TrieAccount> for Account {
|
||||
fn from(value: TrieAccount) -> Self {
|
||||
Self { balance: value.balance, nonce: value.nonce, bytecode_hash: Some(value.code_hash) }
|
||||
}
|
||||
}
|
||||
|
||||
impl InMemorySize for Account {
|
||||
fn size(&self) -> usize {
|
||||
size_of::<Self>()
|
||||
|
||||
@@ -11,7 +11,7 @@ use reth_provider::{
|
||||
};
|
||||
use reth_prune_types::{
|
||||
PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig, SegmentOutput,
|
||||
MINIMUM_PRUNING_DISTANCE,
|
||||
MINIMUM_UNWIND_SAFE_DISTANCE,
|
||||
};
|
||||
use tracing::{instrument, trace};
|
||||
#[derive(Debug)]
|
||||
@@ -49,8 +49,8 @@ where
|
||||
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
|
||||
// Contract log filtering removes every receipt possible except the ones in the list. So,
|
||||
// for the other receipts it's as if they had a `PruneMode::Distance()` of
|
||||
// `MINIMUM_PRUNING_DISTANCE`.
|
||||
let to_block = PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)
|
||||
// `MINIMUM_UNWIND_SAFE_DISTANCE`.
|
||||
let to_block = PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE)
|
||||
.prune_target_block(input.to_block, PruneSegment::ContractLogs, PrunePurpose::User)?
|
||||
.map(|(bn, _)| bn)
|
||||
.unwrap_or_default();
|
||||
|
||||
@@ -1,14 +1,18 @@
|
||||
use crate::{
|
||||
db_ext::DbTxPruneExt,
|
||||
segments::{PruneInput, Segment},
|
||||
segments::{self, PruneInput, Segment},
|
||||
PrunerError,
|
||||
};
|
||||
use reth_db_api::{tables, transaction::DbTxMut};
|
||||
use reth_provider::{BlockReader, DBProvider, TransactionsProvider};
|
||||
use reth_provider::{
|
||||
BlockReader, DBProvider, EitherWriterDestination, StaticFileProviderFactory,
|
||||
StorageSettingsCache, TransactionsProvider,
|
||||
};
|
||||
use reth_prune_types::{
|
||||
PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
|
||||
};
|
||||
use tracing::{instrument, trace};
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
use tracing::{debug, instrument, trace};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SenderRecovery {
|
||||
@@ -23,7 +27,11 @@ impl SenderRecovery {
|
||||
|
||||
impl<Provider> Segment<Provider> for SenderRecovery
|
||||
where
|
||||
Provider: DBProvider<Tx: DbTxMut> + TransactionsProvider + BlockReader,
|
||||
Provider: DBProvider<Tx: DbTxMut>
|
||||
+ TransactionsProvider
|
||||
+ BlockReader
|
||||
+ StorageSettingsCache
|
||||
+ StaticFileProviderFactory,
|
||||
{
|
||||
fn segment(&self) -> PruneSegment {
|
||||
PruneSegment::SenderRecovery
|
||||
@@ -39,6 +47,16 @@ where
|
||||
|
||||
#[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
|
||||
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
|
||||
if EitherWriterDestination::senders(provider).is_static_file() {
|
||||
debug!(target: "pruner", "Pruning transaction senders from static files.");
|
||||
return segments::prune_static_files(
|
||||
provider,
|
||||
input,
|
||||
StaticFileSegment::TransactionSenders,
|
||||
)
|
||||
}
|
||||
debug!(target: "pruner", "Pruning transaction senders from database.");
|
||||
|
||||
let tx_range = match input.get_next_tx_num_range(provider)? {
|
||||
Some(range) => range,
|
||||
None => {
|
||||
|
||||
@@ -30,7 +30,9 @@ pub use pruner::{
|
||||
SegmentOutputCheckpoint,
|
||||
};
|
||||
pub use segment::{PrunePurpose, PruneSegment, PruneSegmentError};
|
||||
pub use target::{PruneModes, UnwindTargetPrunedError, MINIMUM_PRUNING_DISTANCE};
|
||||
pub use target::{
|
||||
PruneModes, UnwindTargetPrunedError, MINIMUM_DISTANCE, MINIMUM_UNWIND_SAFE_DISTANCE,
|
||||
};
|
||||
|
||||
/// Configuration for pruning receipts not associated with logs emitted by the specified contracts.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Default)]
|
||||
|
||||
@@ -41,8 +41,12 @@ impl PruneMode {
|
||||
segment: PruneSegment,
|
||||
purpose: PrunePurpose,
|
||||
) -> Result<Option<(BlockNumber, Self)>, PruneSegmentError> {
|
||||
let min_blocks = segment.min_blocks();
|
||||
let result = match self {
|
||||
Self::Full if segment.min_blocks() == 0 => Some((tip, *self)),
|
||||
Self::Full if min_blocks == 0 => Some((tip, *self)),
|
||||
// For segments with min_blocks > 0, Full mode behaves like Distance(min_blocks)
|
||||
Self::Full if min_blocks <= tip => Some((tip - min_blocks, *self)),
|
||||
Self::Full => None, // Nothing to prune yet
|
||||
Self::Distance(distance) if *distance > tip => None, // Nothing to prune yet
|
||||
Self::Distance(distance) if *distance >= segment.min_blocks() => {
|
||||
Some((tip - distance, *self))
|
||||
@@ -84,9 +88,7 @@ impl PruneMode {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{
|
||||
PruneMode, PrunePurpose, PruneSegment, PruneSegmentError, MINIMUM_PRUNING_DISTANCE,
|
||||
};
|
||||
use crate::{PruneMode, PrunePurpose, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE};
|
||||
use assert_matches::assert_matches;
|
||||
use serde::Deserialize;
|
||||
|
||||
@@ -96,8 +98,8 @@ mod tests {
|
||||
let segment = PruneSegment::AccountHistory;
|
||||
|
||||
let tests = vec![
|
||||
// MINIMUM_PRUNING_DISTANCE makes this impossible
|
||||
(PruneMode::Full, Err(PruneSegmentError::Configuration(segment))),
|
||||
// Full mode with min_blocks > 0 behaves like Distance(min_blocks)
|
||||
(PruneMode::Full, Ok(Some(tip - segment.min_blocks()))),
|
||||
// Nothing to prune
|
||||
(PruneMode::Distance(tip + 1), Ok(None)),
|
||||
(
|
||||
@@ -107,12 +109,12 @@ mod tests {
|
||||
// Nothing to prune
|
||||
(PruneMode::Before(tip + 1), Ok(None)),
|
||||
(
|
||||
PruneMode::Before(tip - MINIMUM_PRUNING_DISTANCE),
|
||||
Ok(Some(tip - MINIMUM_PRUNING_DISTANCE - 1)),
|
||||
PruneMode::Before(tip - MINIMUM_UNWIND_SAFE_DISTANCE),
|
||||
Ok(Some(tip - MINIMUM_UNWIND_SAFE_DISTANCE - 1)),
|
||||
),
|
||||
(
|
||||
PruneMode::Before(tip - MINIMUM_PRUNING_DISTANCE - 1),
|
||||
Ok(Some(tip - MINIMUM_PRUNING_DISTANCE - 2)),
|
||||
PruneMode::Before(tip - MINIMUM_UNWIND_SAFE_DISTANCE - 1),
|
||||
Ok(Some(tip - MINIMUM_UNWIND_SAFE_DISTANCE - 2)),
|
||||
),
|
||||
// Nothing to prune
|
||||
(PruneMode::Before(tip - 1), Ok(None)),
|
||||
@@ -146,13 +148,13 @@ mod tests {
|
||||
let tests = vec![
|
||||
(PruneMode::Distance(tip + 1), 1, !should_prune),
|
||||
(
|
||||
PruneMode::Distance(MINIMUM_PRUNING_DISTANCE + 1),
|
||||
tip - MINIMUM_PRUNING_DISTANCE - 1,
|
||||
PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE + 1),
|
||||
tip - MINIMUM_UNWIND_SAFE_DISTANCE - 1,
|
||||
!should_prune,
|
||||
),
|
||||
(
|
||||
PruneMode::Distance(MINIMUM_PRUNING_DISTANCE + 1),
|
||||
tip - MINIMUM_PRUNING_DISTANCE - 2,
|
||||
PruneMode::Distance(MINIMUM_UNWIND_SAFE_DISTANCE + 1),
|
||||
tip - MINIMUM_UNWIND_SAFE_DISTANCE - 2,
|
||||
should_prune,
|
||||
),
|
||||
(PruneMode::Before(tip + 1), 1, should_prune),
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#![allow(deprecated)] // necessary to all defining deprecated `PruneSegment` variants
|
||||
|
||||
use crate::MINIMUM_PRUNING_DISTANCE;
|
||||
use crate::{MINIMUM_DISTANCE, MINIMUM_UNWIND_SAFE_DISTANCE};
|
||||
use derive_more::Display;
|
||||
use strum::{EnumIter, IntoEnumIterator};
|
||||
use thiserror::Error;
|
||||
@@ -65,9 +65,10 @@ impl PruneSegment {
|
||||
/// Returns minimum number of blocks to keep in the database for this segment.
|
||||
pub const fn min_blocks(&self) -> u64 {
|
||||
match self {
|
||||
Self::SenderRecovery | Self::TransactionLookup | Self::Receipts | Self::Bodies => 0,
|
||||
Self::SenderRecovery | Self::TransactionLookup => 0,
|
||||
Self::Receipts | Self::Bodies => MINIMUM_DISTANCE,
|
||||
Self::ContractLogs | Self::AccountHistory | Self::StorageHistory => {
|
||||
MINIMUM_PRUNING_DISTANCE
|
||||
MINIMUM_UNWIND_SAFE_DISTANCE
|
||||
}
|
||||
#[expect(deprecated)]
|
||||
#[expect(clippy::match_same_arms)]
|
||||
|
||||
@@ -9,7 +9,12 @@ use crate::{PruneCheckpoint, PruneMode, PruneSegment, ReceiptsLogPruneConfig};
|
||||
/// consensus protocol.
|
||||
/// 2. Another 10k blocks to have a room for maneuver in case when things go wrong and a manual
|
||||
/// unwind is required.
|
||||
pub const MINIMUM_PRUNING_DISTANCE: u64 = 32 * 2 + 10_000;
|
||||
pub const MINIMUM_UNWIND_SAFE_DISTANCE: u64 = 32 * 2 + 10_000;
|
||||
|
||||
/// Minimum blocks to retain for receipts and bodies to ensure reorg safety.
|
||||
/// This prevents pruning data that may be needed when handling chain reorganizations,
|
||||
/// specifically when `canonical_block_by_hash` needs to reconstruct `ExecutedBlock` from disk.
|
||||
pub const MINIMUM_DISTANCE: u64 = 64;
|
||||
|
||||
/// Type of history that can be pruned
|
||||
#[derive(Debug, Error, PartialEq, Eq, Clone)]
|
||||
@@ -56,7 +61,7 @@ pub struct PruneModes {
|
||||
any(test, feature = "serde"),
|
||||
serde(
|
||||
skip_serializing_if = "Option::is_none",
|
||||
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<MINIMUM_PRUNING_DISTANCE, _>"
|
||||
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<MINIMUM_UNWIND_SAFE_DISTANCE, _>"
|
||||
)
|
||||
)]
|
||||
pub account_history: Option<PruneMode>,
|
||||
@@ -65,7 +70,7 @@ pub struct PruneModes {
|
||||
any(test, feature = "serde"),
|
||||
serde(
|
||||
skip_serializing_if = "Option::is_none",
|
||||
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<MINIMUM_PRUNING_DISTANCE, _>"
|
||||
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<MINIMUM_UNWIND_SAFE_DISTANCE, _>"
|
||||
)
|
||||
)]
|
||||
pub storage_history: Option<PruneMode>,
|
||||
|
||||
@@ -117,10 +117,7 @@ impl tokio_util::codec::Decoder for StreamCodec {
|
||||
buf.advance(start_idx);
|
||||
}
|
||||
let bts = buf.split_to(idx + 1 - start_idx);
|
||||
return match String::from_utf8(bts.into()) {
|
||||
Ok(val) => Ok(Some(val)),
|
||||
Err(_) => Ok(None),
|
||||
}
|
||||
return Ok(String::from_utf8(bts.into()).ok())
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
|
||||
@@ -14,7 +14,6 @@ workspace = true
|
||||
[dependencies]
|
||||
# reth
|
||||
reth-primitives-traits.workspace = true
|
||||
reth-storage-api = { workspace = true, optional = true }
|
||||
reth-evm.workspace = true
|
||||
reth-ethereum-primitives.workspace = true
|
||||
|
||||
@@ -31,7 +30,6 @@ alloy-evm = { workspace = true, features = ["rpc"] }
|
||||
op-alloy-consensus = { workspace = true, optional = true }
|
||||
op-alloy-rpc-types = { workspace = true, optional = true }
|
||||
op-alloy-network = { workspace = true, optional = true }
|
||||
reth-optimism-primitives = { workspace = true, optional = true }
|
||||
|
||||
# io
|
||||
jsonrpsee-types.workspace = true
|
||||
@@ -51,8 +49,6 @@ op = [
|
||||
"dep:op-alloy-consensus",
|
||||
"dep:op-alloy-rpc-types",
|
||||
"dep:op-alloy-network",
|
||||
"dep:reth-optimism-primitives",
|
||||
"dep:reth-storage-api",
|
||||
"reth-evm/op",
|
||||
"reth-primitives-traits/op",
|
||||
"alloy-evm/op",
|
||||
|
||||
@@ -24,6 +24,3 @@ pub use transaction::{
|
||||
};
|
||||
|
||||
pub use alloy_evm::rpc::{CallFees, CallFeesError, EthTxEnvError, TryIntoTxEnv};
|
||||
|
||||
#[cfg(feature = "op")]
|
||||
pub use transaction::op::*;
|
||||
|
||||
@@ -29,7 +29,7 @@ impl TryFromReceiptResponse<alloy_network::Ethereum> for reth_ethereum_primitive
|
||||
}
|
||||
|
||||
#[cfg(feature = "op")]
|
||||
impl TryFromReceiptResponse<op_alloy_network::Optimism> for reth_optimism_primitives::OpReceipt {
|
||||
impl TryFromReceiptResponse<op_alloy_network::Optimism> for op_alloy_consensus::OpReceipt {
|
||||
type Error = Infallible;
|
||||
|
||||
fn from_receipt_response(
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user