Compare commits

..

8 Commits

Author SHA1 Message Date
yongkangc
7e80c3cac2 merrge 2025-12-15 05:05:50 +00:00
yongkangc
6e8d62617d fix(multiproof): sr mismatvch
- Updated the handling of state updates to ensure that removals are recorded before merging updates, preserving the integrity of intermediate deletions.
- Introduced a new method, `record_removals`, to track monotonic removals from EVM state updates, ensuring that once a key is marked as removed, it remains so for proof invalidation.
- Improved comments for clarity on the removal tracking process and its implications for state updates.
2025-12-15 02:52:33 +00:00
yongkangc
866b8ded5f refactor(multiproof): streamline state update message handling
- Consolidated the sending of MultiProofMessage::StateUpdate to improve readability.
- Updated comments for clarity regarding the batch source logic in tests, ensuring it reflects the expected behavior of the PreBlock source.
2025-12-10 09:41:23 +00:00
yongkangc
51985e249c refactor(multiproof): batch consecutive
- Simplified the batching logic for state updates by removing unnecessary checks and consolidating the handling of different source types.
- Updated tests to verify that state updates from various sources can be batched together correctly while respecting the target limits.
- Improved clarity and maintainability of the code by refining comments and restructuring the logic for merging updates.
2025-12-10 09:41:23 +00:00
yongkangc
5ac911b707 refactor(engine): extract multiproof batch context into structs
Extract &mut parameters from process_multiproof_message into:
- MultiproofBatchCtx: core processing state (pending_msg, timing, updates_finished)
- MultiproofBatchMetrics: counters for proofs processed/requested

This improves code organization and reduces function parameter count.
2025-12-10 09:41:23 +00:00
yongkangc
e9a5a11a9f fix(engine): rename outcome to num_chunks for clarity
Addresses reviewer nit: the variable returned from dispatch_with_chunking
represents number of chunks, so the name should reflect that.
2025-12-10 09:41:23 +00:00
yongkangc
7dd14651e4 revert comparison 2025-12-10 09:41:22 +00:00
yongkangc
51ef406b94 Add bench compare latency stats 2025-12-10 09:38:03 +00:00
125 changed files with 1341 additions and 4199 deletions

View File

@@ -7,7 +7,7 @@ sim="${1}"
limit="${2}"
run_hive() {
hive --sim "${sim}" --sim.limit "${limit}" --sim.parallelism 16 --client reth 2>&1 | tee /tmp/log || true
hive --sim "${sim}" --sim.limit "${limit}" --sim.parallelism 8 --client reth 2>&1 | tee /tmp/log || true
}
check_log() {

View File

@@ -11,7 +11,6 @@ env:
CARGO_TERM_COLOR: always
BASELINE: base
SEED: reth
RUSTC_WRAPPER: "sccache"
name: bench
jobs:
@@ -23,7 +22,6 @@ jobs:
submodules: true
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

View File

@@ -10,12 +10,9 @@ on:
types: [opened, reopened, synchronize, closed]
merge_group:
env:
RUSTC_WRAPPER: "sccache"
jobs:
build:
runs-on: depot-ubuntu-latest-8
runs-on: depot-ubuntu-latest-16
timeout-minutes: 90
steps:
- name: Checkout
@@ -36,8 +33,6 @@ jobs:
- name: Install Rust nightly
uses: dtolnay/rust-toolchain@nightly
- uses: mozilla-actions/sccache-action@v0.0.9
- name: Build docs
run: cd docs/vocs && bash scripts/build-cargo-docs.sh

View File

@@ -13,12 +13,11 @@ on:
env:
CARGO_TERM_COLOR: always
RUSTC_WRAPPER: "sccache"
name: compact-codec
jobs:
compact-codec:
runs-on: depot-ubuntu-latest
runs-on: depot-ubuntu-latest-16
strategy:
matrix:
bin:
@@ -27,7 +26,6 @@ jobs:
steps:
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

View File

@@ -11,7 +11,6 @@ on:
env:
CARGO_TERM_COLOR: always
SEED: rustethereumethereumrust
RUSTC_WRAPPER: "sccache"
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
@@ -20,14 +19,13 @@ concurrency:
jobs:
test:
name: e2e-testsuite
runs-on: depot-ubuntu-latest-4
runs-on: depot-ubuntu-latest-16
env:
RUST_BACKTRACE: 1
timeout-minutes: 90
steps:
- uses: actions/checkout@v6
- uses: dtolnay/rust-toolchain@stable
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: taiki-e/install-action@nextest
- uses: Swatinem/rust-cache@v2
with:

View File

@@ -24,7 +24,7 @@ jobs:
prepare-hive:
if: github.repository == 'paradigmxyz/reth'
timeout-minutes: 45
runs-on: depot-ubuntu-latest-16
runs-on: depot-ubuntu-latest
steps:
- uses: actions/checkout@v6
- name: Checkout hive tests
@@ -44,7 +44,7 @@ jobs:
- name: Restore hive assets cache
id: cache-hive
uses: actions/cache@v5
uses: actions/cache@v4
with:
path: ./hive_assets
key: hive-assets-${{ steps.hive-commit.outputs.hash }}-${{ hashFiles('.github/assets/hive/build_simulators.sh') }}
@@ -178,7 +178,7 @@ jobs:
- prepare-reth
- prepare-hive
name: run ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
runs-on: depot-ubuntu-latest-16
runs-on: depot-ubuntu-latest
permissions:
issues: write
steps:
@@ -245,7 +245,7 @@ jobs:
notify-on-error:
needs: test
if: failure()
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
steps:
- name: Slack Webhook Action
uses: rtCamp/action-slack-notify@v2

View File

@@ -14,7 +14,6 @@ on:
env:
CARGO_TERM_COLOR: always
SEED: rustethereumethereumrust
RUSTC_WRAPPER: "sccache"
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
@@ -24,7 +23,7 @@ jobs:
test:
name: test / ${{ matrix.network }}
if: github.event_name != 'schedule'
runs-on: depot-ubuntu-latest-4
runs-on: depot-ubuntu-latest-16
env:
RUST_BACKTRACE: 1
strategy:
@@ -38,7 +37,6 @@ jobs:
- name: Install Geth
run: .github/assets/install_geth.sh
- uses: taiki-e/install-action@nextest
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
@@ -76,7 +74,6 @@ jobs:
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: taiki-e/install-action@nextest
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

View File

@@ -85,7 +85,7 @@ jobs:
notify-on-error:
needs: test
if: failure()
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
steps:
- name: Slack Webhook Action
uses: rtCamp/action-slack-notify@v2

View File

@@ -58,7 +58,7 @@ jobs:
notify-on-error:
needs: test
if: failure()
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
steps:
- name: Slack Webhook Action
uses: rtCamp/action-slack-notify@v2

View File

@@ -8,7 +8,6 @@ on:
env:
CARGO_TERM_COLOR: always
RUSTC_WRAPPER: "sccache"
jobs:
clippy-binaries:
@@ -27,7 +26,6 @@ jobs:
- uses: dtolnay/rust-toolchain@clippy
with:
components: clippy
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
@@ -42,7 +40,7 @@ jobs:
clippy:
name: clippy
runs-on: depot-ubuntu-latest
runs-on: depot-ubuntu-latest-16
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -50,7 +48,6 @@ jobs:
- uses: dtolnay/rust-toolchain@nightly
with:
components: clippy
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
@@ -68,7 +65,6 @@ jobs:
with:
target: wasm32-wasip1
- uses: taiki-e/install-action@cargo-hack
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
@@ -88,7 +84,6 @@ jobs:
with:
target: riscv32imac-unknown-none-elf
- uses: taiki-e/install-action@cargo-hack
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
@@ -98,18 +93,17 @@ jobs:
crate-checks:
name: crate-checks (${{ matrix.partition }}/${{ matrix.total_partitions }})
runs-on: depot-ubuntu-latest-4
runs-on: depot-ubuntu-latest-16
strategy:
matrix:
partition: [1, 2, 3]
total_partitions: [3]
partition: [1, 2]
total_partitions: [2]
timeout-minutes: 60
steps:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: taiki-e/install-action@cargo-hack
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
@@ -130,7 +124,6 @@ jobs:
- uses: dtolnay/rust-toolchain@master
with:
toolchain: "1.88" # MSRV
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
@@ -140,13 +133,12 @@ jobs:
docs:
name: docs
runs-on: depot-ubuntu-latest-4
runs-on: depot-ubuntu-latest-16
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@nightly
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
@@ -166,19 +158,17 @@ jobs:
- uses: dtolnay/rust-toolchain@nightly
with:
components: rustfmt
- uses: mozilla-actions/sccache-action@v0.0.9
- name: Run fmt
run: cargo fmt --all --check
udeps:
name: udeps
runs-on: depot-ubuntu-latest
runs-on: depot-ubuntu-latest-16
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@nightly
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
@@ -187,13 +177,12 @@ jobs:
book:
name: book
runs-on: depot-ubuntu-latest
runs-on: depot-ubuntu-latest-16
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@nightly
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
@@ -246,7 +235,7 @@ jobs:
# Checks that selected crates can compile with power set of features
features:
name: features (${{ matrix.partition }}/${{ matrix.total_partitions }})
runs-on: depot-ubuntu-latest
runs-on: depot-ubuntu-latest-16
strategy:
matrix:
partition: [1, 2]
@@ -256,7 +245,6 @@ jobs:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@clippy
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
@@ -279,7 +267,6 @@ jobs:
steps:
- uses: actions/checkout@v6
- uses: dtolnay/rust-toolchain@stable
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: rui314/setup-mold@v1
- uses: taiki-e/cache-cargo-install-action@v2
with:

View File

@@ -22,7 +22,6 @@ env:
CARGO_TERM_COLOR: always
DOCKER_IMAGE_NAME_URL: https://ghcr.io/${{ github.repository_owner }}/reth
DOCKER_OP_IMAGE_NAME_URL: https://ghcr.io/${{ github.repository_owner }}/op-reth
RUSTC_WRAPPER: "sccache"
jobs:
dry-run:
@@ -52,7 +51,6 @@ jobs:
steps:
- uses: actions/checkout@v6
- uses: dtolnay/rust-toolchain@stable
- uses: mozilla-actions/sccache-action@v0.0.9
- name: Verify crate version matches tag
# Check that the Cargo version starts with the tag,
# so that Cargo version 1.4.8 can be matched against both v1.4.8 and v1.4.8-rc.1
@@ -106,7 +104,6 @@ jobs:
- uses: dtolnay/rust-toolchain@stable
with:
target: ${{ matrix.configs.target }}
- uses: mozilla-actions/sccache-action@v0.0.9
- name: Install cross main
id: cross_main
run: |

View File

@@ -12,7 +12,6 @@ env:
CARGO_TERM_COLOR: always
FROM_BLOCK: 0
TO_BLOCK: 50000
RUSTC_WRAPPER: "sccache"
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
@@ -32,7 +31,6 @@ jobs:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

View File

@@ -9,7 +9,6 @@ on:
env:
CARGO_TERM_COLOR: always
RUSTC_WRAPPER: "sccache"
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
@@ -42,7 +41,6 @@ jobs:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

View File

@@ -9,7 +9,6 @@ on:
env:
CARGO_TERM_COLOR: always
RUSTC_WRAPPER: "sccache"
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
@@ -42,7 +41,6 @@ jobs:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

View File

@@ -11,7 +11,6 @@ on:
env:
CARGO_TERM_COLOR: always
SEED: rustethereumethereumrust
RUSTC_WRAPPER: "sccache"
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
@@ -20,7 +19,7 @@ concurrency:
jobs:
test:
name: test / ${{ matrix.type }} (${{ matrix.partition }}/${{ matrix.total_partitions }})
runs-on: depot-ubuntu-latest-4
runs-on: depot-ubuntu-latest-16
env:
RUST_BACKTRACE: 1
strategy:
@@ -47,7 +46,6 @@ jobs:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
@@ -93,7 +91,6 @@ jobs:
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: taiki-e/install-action@nextest
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
@@ -101,7 +98,7 @@ jobs:
doc:
name: doc tests
runs-on: depot-ubuntu-latest
runs-on: depot-ubuntu-latest-16
env:
RUST_BACKTRACE: 1
timeout-minutes: 30
@@ -109,7 +106,6 @@ jobs:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

View File

@@ -9,12 +9,9 @@ on:
branches: [main]
merge_group:
env:
RUSTC_WRAPPER: "sccache"
jobs:
check-reth:
runs-on: depot-ubuntu-latest
runs-on: depot-ubuntu-latest-16
timeout-minutes: 60
steps:
@@ -24,7 +21,6 @@ jobs:
with:
target: x86_64-pc-windows-gnu
- uses: taiki-e/install-action@cross
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
@@ -34,7 +30,7 @@ jobs:
run: cargo check --target x86_64-pc-windows-gnu
check-op-reth:
runs-on: depot-ubuntu-latest
runs-on: depot-ubuntu-latest-16
timeout-minutes: 60
steps:
@@ -44,7 +40,6 @@ jobs:
with:
target: x86_64-pc-windows-gnu
- uses: taiki-e/install-action@cross
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

97
Cargo.lock generated
View File

@@ -97,9 +97,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
[[package]]
name = "alloy-chains"
version = "0.2.23"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35d744058a9daa51a8cf22a3009607498fcf82d3cf4c5444dd8056cdf651f471"
checksum = "1b9ebac8ff9c2f07667e1803dc777304337e160ce5153335beb45e8ec0751808"
dependencies = [
"alloy-primitives",
"alloy-rlp",
@@ -238,18 +238,6 @@ dependencies = [
"thiserror 2.0.17",
]
[[package]]
name = "alloy-eip7928"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "926b2c0d34e641cf8b17bf54ce50fda16715b9f68ad878fa6128bae410c6f890"
dependencies = [
"alloy-primitives",
"alloy-rlp",
"borsh",
"serde",
]
[[package]]
name = "alloy-eips"
version = "1.1.3"
@@ -278,9 +266,9 @@ dependencies = [
[[package]]
name = "alloy-evm"
version = "0.25.2"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6ccc4c702c840148af1ce784cc5c6ed9274a020ef32417c5b1dbeab8c317673"
checksum = "01be36ba6f5e6e62563b369e03ca529eac46aea50677f84655084b4750816574"
dependencies = [
"alloy-consensus",
"alloy-eips",
@@ -395,9 +383,9 @@ dependencies = [
[[package]]
name = "alloy-op-evm"
version = "0.25.2"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f640da852f93ddaa3b9a602b7ca41d80e0023f77a67b68aaaf511c32f1fe0ce"
checksum = "231262d7e06000f3fb642d32d38ca75e09e78e04977c10be0a07a5ee2c869cfd"
dependencies = [
"alloy-consensus",
"alloy-eips",
@@ -1381,9 +1369,9 @@ dependencies = [
[[package]]
name = "async-compression"
version = "0.4.36"
version = "0.4.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98ec5f6c2f8bc326c994cb9e241cc257ddaba9afa8555a43cffbb5dd86efaa37"
checksum = "0e86f6d3dc9dc4352edeea6b8e499e13e3f5dc3b964d7ca5fd411415a3498473"
dependencies = [
"compression-codecs",
"compression-core",
@@ -1541,9 +1529,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "base64ct"
version = "1.8.1"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e050f626429857a27ddccb31e0aca21356bfa709c04041aefddac081a8f068a"
checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba"
[[package]]
name = "bech32"
@@ -2389,9 +2377,9 @@ dependencies = [
[[package]]
name = "compression-codecs"
version = "0.4.35"
version = "0.4.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0f7ac3e5b97fdce45e8922fb05cae2c37f7bbd63d30dd94821dacfd8f3f2bf2"
checksum = "302266479cb963552d11bd042013a58ef1adc56768016c8b82b4199488f2d4ad"
dependencies = [
"brotli",
"compression-core",
@@ -4005,9 +3993,9 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
[[package]]
name = "flate2"
version = "1.1.5"
version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfe33edd8e85a12a67454e37f8c75e730830d83e313556ab9ebf9ee7fbeb3bfb"
checksum = "a2152dbcb980c05735e2a651d96011320a949eb31a0c8b38b72645ce97dec676"
dependencies = [
"crc32fast",
"miniz_oxide",
@@ -5474,9 +5462,9 @@ checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de"
[[package]]
name = "libp2p-identity"
version = "0.2.13"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0c7892c221730ba55f7196e98b0b8ba5e04b4155651736036628e9f73ed6fc3"
checksum = "3104e13b51e4711ff5738caa1fb54467c8604c2e94d607e27745bcf709068774"
dependencies = [
"asn1_der",
"bs58",
@@ -6211,9 +6199,9 @@ checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e"
[[package]]
name = "op-alloy"
version = "0.23.1"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9b8fee21003dd4f076563de9b9d26f8c97840157ef78593cd7f262c5ca99848"
checksum = "c3b13412d297c1f9341f678b763750b120a73ffe998fa54a94d6eda98449e7ca"
dependencies = [
"op-alloy-consensus",
"op-alloy-network",
@@ -6224,9 +6212,9 @@ dependencies = [
[[package]]
name = "op-alloy-consensus"
version = "0.23.1"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "736381a95471d23e267263cfcee9e1d96d30b9754a94a2819148f83379de8a86"
checksum = "726da827358a547be9f1e37c2a756b9e3729cb0350f43408164794b370cad8ae"
dependencies = [
"alloy-consensus",
"alloy-eips",
@@ -6250,9 +6238,9 @@ checksum = "a79f352fc3893dcd670172e615afef993a41798a1d3fc0db88a3e60ef2e70ecc"
[[package]]
name = "op-alloy-network"
version = "0.23.1"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4034183dca6bff6632e7c24c92e75ff5f0eabb58144edb4d8241814851334d47"
checksum = "f63f27e65be273ec8fcb0b6af0fd850b550979465ab93423705ceb3dfddbd2ab"
dependencies = [
"alloy-consensus",
"alloy-network",
@@ -6266,9 +6254,9 @@ dependencies = [
[[package]]
name = "op-alloy-provider"
version = "0.23.1"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6753d90efbaa8ea8bcb89c1737408ca85fa60d7adb875049d3f382c063666f86"
checksum = "a71456699aa256dc20119736422ad9a44da8b9585036117afb936778122093b9"
dependencies = [
"alloy-network",
"alloy-primitives",
@@ -6281,9 +6269,9 @@ dependencies = [
[[package]]
name = "op-alloy-rpc-jsonrpsee"
version = "0.23.1"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1c820ef9c802ebc732281a940bfb6ac2345af4d9fff041cbb64b4b546676686"
checksum = "8ef9114426b16172254555aad34a8ea96c01895e40da92f5d12ea680a1baeaa7"
dependencies = [
"alloy-primitives",
"jsonrpsee",
@@ -6291,9 +6279,9 @@ dependencies = [
[[package]]
name = "op-alloy-rpc-types"
version = "0.23.1"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddd87c6b9e5b6eee8d6b76f41b04368dca0e9f38d83338e5b00e730c282098a4"
checksum = "562dd4462562c41f9fdc4d860858c40e14a25df7f983ae82047f15f08fce4d19"
dependencies = [
"alloy-consensus",
"alloy-eips",
@@ -6311,9 +6299,9 @@ dependencies = [
[[package]]
name = "op-alloy-rpc-types-engine"
version = "0.23.1"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77727699310a18cdeed32da3928c709e2704043b6584ed416397d5da65694efc"
checksum = "d8f24b8cb66e4b33e6c9e508bf46b8ecafc92eadd0b93fedd306c0accb477657"
dependencies = [
"alloy-consensus",
"alloy-eips",
@@ -6327,7 +6315,6 @@ dependencies = [
"ethereum_ssz_derive",
"op-alloy-consensus",
"serde",
"sha2",
"snap",
"thiserror 2.0.17",
]
@@ -7363,9 +7350,9 @@ checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2"
[[package]]
name = "reqwest"
version = "0.12.25"
version = "0.12.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6eff9328d40131d43bd911d42d79eb6a47312002a4daefc9e37f17e74a7701a"
checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f"
dependencies = [
"base64 0.22.1",
"bytes",
@@ -8220,7 +8207,6 @@ name = "reth-engine-tree"
version = "1.9.3"
dependencies = [
"alloy-consensus",
"alloy-eip7928",
"alloy-eips",
"alloy-evm",
"alloy-primitives",
@@ -9245,7 +9231,6 @@ dependencies = [
"alloy-sol-types",
"eyre",
"futures",
"jsonrpsee-core",
"rand 0.9.2",
"reth-chainspec",
"reth-db",
@@ -9281,7 +9266,6 @@ dependencies = [
"serde",
"serde_json",
"similar-asserts",
"tempfile",
"tokio",
]
@@ -10172,7 +10156,6 @@ dependencies = [
"reth-db-api",
"reth-engine-primitives",
"reth-errors",
"reth-ethereum-engine-primitives",
"reth-ethereum-primitives",
"reth-evm",
"reth-evm-ethereum",
@@ -10235,9 +10218,6 @@ dependencies = [
"reth-network-peers",
"reth-rpc-eth-api",
"reth-trie-common",
"serde",
"serde_json",
"tokio",
]
[[package]]
@@ -10302,7 +10282,6 @@ dependencies = [
"reth-rpc-server-types",
"reth-storage-api",
"reth-tasks",
"reth-tokio-util",
"reth-tracing",
"reth-transaction-pool",
"serde",
@@ -11196,9 +11175,9 @@ dependencies = [
[[package]]
name = "revm-inspectors"
version = "0.33.2"
version = "0.33.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01def7351cd9af844150b8e88980bcd11304f33ce23c3d7c25f2a8dab87c1345"
checksum = "6c93974333e7acc4b2dc024b10def99707f7375a4d53db7a7f8351722d25673f"
dependencies = [
"alloy-primitives",
"alloy-rpc-types-eth",
@@ -12080,9 +12059,9 @@ dependencies = [
[[package]]
name = "simd-adler32"
version = "0.3.8"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2"
checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe"
[[package]]
name = "similar"
@@ -12904,9 +12883,9 @@ dependencies = [
[[package]]
name = "tower-http"
version = "0.6.8"
version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8"
checksum = "9cf146f99d442e8e68e585f5d798ccd3cad9a7835b917e09728880a862706456"
dependencies = [
"async-compression",
"base64 0.22.1",

View File

@@ -376,11 +376,11 @@ reth-era-utils = { path = "crates/era-utils" }
reth-errors = { path = "crates/errors" }
reth-eth-wire = { path = "crates/net/eth-wire" }
reth-eth-wire-types = { path = "crates/net/eth-wire-types" }
reth-ethereum-payload-builder = { path = "crates/ethereum/payload" }
reth-ethereum-cli = { path = "crates/ethereum/cli", default-features = false }
reth-ethereum-consensus = { path = "crates/ethereum/consensus", default-features = false }
reth-ethereum-engine-primitives = { path = "crates/ethereum/engine-primitives", default-features = false }
reth-ethereum-forks = { path = "crates/ethereum/hardforks", default-features = false }
reth-ethereum-payload-builder = { path = "crates/ethereum/payload" }
reth-ethereum-primitives = { path = "crates/ethereum/primitives", default-features = false }
reth-ethereum = { path = "crates/ethereum/reth" }
reth-etl = { path = "crates/etl" }
@@ -481,14 +481,13 @@ revm-primitives = { version = "21.0.2", default-features = false }
revm-interpreter = { version = "31.1.0", default-features = false }
revm-database-interface = { version = "8.0.5", default-features = false }
op-revm = { version = "14.1.0", default-features = false }
revm-inspectors = "0.33.2"
revm-inspectors = "0.33.1"
# eth
alloy-chains = { version = "0.2.5", default-features = false }
alloy-dyn-abi = "1.4.1"
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.1.0" }
alloy-evm = { version = "0.25.1", default-features = false }
alloy-evm = { version = "0.24.1", default-features = false }
alloy-primitives = { version = "1.4.1", default-features = false, features = ["map-foldhash"] }
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
alloy-sol-macro = "1.4.1"
@@ -526,13 +525,13 @@ alloy-transport-ipc = { version = "1.1.3", default-features = false }
alloy-transport-ws = { version = "1.1.3", default-features = false }
# op
alloy-op-evm = { version = "0.25.0", default-features = false }
alloy-op-evm = { version = "0.24.1", default-features = false }
alloy-op-hardforks = "0.4.4"
op-alloy-rpc-types = { version = "0.23.1", default-features = false }
op-alloy-rpc-types-engine = { version = "0.23.1", default-features = false }
op-alloy-network = { version = "0.23.1", default-features = false }
op-alloy-consensus = { version = "0.23.1", default-features = false }
op-alloy-rpc-jsonrpsee = { version = "0.23.1", default-features = false }
op-alloy-rpc-types = { version = "0.22.4", default-features = false }
op-alloy-rpc-types-engine = { version = "0.22.4", default-features = false }
op-alloy-network = { version = "0.22.4", default-features = false }
op-alloy-consensus = { version = "0.22.4", default-features = false }
op-alloy-rpc-jsonrpsee = { version = "0.22.4", default-features = false }
op-alloy-flz = { version = "0.13.1", default-features = false }
# misc

View File

@@ -80,7 +80,7 @@ RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --no-default-fe
### Run the Benchmark:
First, start the reth node. Here is an example that runs `reth` compiled with the `profiling` profile, runs `samply`, and configures `reth` to run with metrics enabled:
```bash
samply record -p 3001 target/profiling/reth node --metrics localhost:9001 --authrpc.jwtsecret <jwt_file_path>
samply record -p 3001 target/profiling/reth node --metrics localhost:9001 --authrpc.jwt-secret <jwt_file_path>
```
```bash

View File

@@ -92,8 +92,6 @@ impl Command {
receipts_in_static_files: _,
transaction_senders_in_static_files: _,
storages_history_in_rocksdb: _,
transaction_hash_numbers_in_rocksdb: _,
account_history_in_rocksdb: _,
} = settings.unwrap_or_else(StorageSettings::legacy);
// Update the setting based on the key

View File

@@ -1,5 +1,5 @@
use crate::BlockProvider;
use alloy_provider::{ConnectionConfig, Network, Provider, ProviderBuilder, WebSocketConfig};
use alloy_provider::{Network, Provider, ProviderBuilder};
use alloy_transport::TransportResult;
use futures::{Stream, StreamExt};
use reth_node_api::Block;
@@ -25,19 +25,7 @@ impl<N: Network, PrimitiveBlock> RpcBlockProvider<N, PrimitiveBlock> {
convert: impl Fn(N::BlockResponse) -> PrimitiveBlock + Send + Sync + 'static,
) -> eyre::Result<Self> {
Ok(Self {
provider: Arc::new(
ProviderBuilder::default()
.connect_with_config(
rpc_url,
ConnectionConfig::default().with_max_retries(u32::MAX).with_ws_config(
WebSocketConfig::default()
// allow larger messages/frames for big blocks
.max_frame_size(Some(128 * 1024 * 1024))
.max_message_size(Some(128 * 1024 * 1024)),
),
)
.await?,
),
provider: Arc::new(ProviderBuilder::default().connect(rpc_url).await?),
url: rpc_url.to_string(),
convert: Arc::new(convert),
})

View File

@@ -1,7 +1,5 @@
//! Engine tree configuration.
use alloy_eips::merge::EPOCH_SLOTS;
/// Triggers persistence when the number of canonical blocks in memory exceeds this threshold.
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
@@ -42,7 +40,7 @@ pub const DEFAULT_RESERVED_CPU_CORES: usize = 1;
/// Default maximum concurrency for prewarm task.
pub const DEFAULT_PREWARM_MAX_CONCURRENCY: usize = 16;
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = 256;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
const DEFAULT_CROSS_BLOCK_CACHE_SIZE: u64 = 4 * 1024 * 1024 * 1024;
@@ -91,8 +89,6 @@ pub struct TreeConfig {
/// Whether to always compare trie updates from the state root task to the trie updates from
/// the regular state root calculation.
always_compare_trie_updates: bool,
/// Whether to disable state cache.
disable_state_cache: bool,
/// Whether to disable parallel prewarming.
disable_prewarming: bool,
/// Whether to disable the parallel sparse trie state root algorithm.
@@ -147,7 +143,6 @@ impl Default for TreeConfig {
max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE,
legacy_state_root: false,
always_compare_trie_updates: false,
disable_state_cache: false,
disable_prewarming: false,
disable_parallel_sparse_trie: false,
state_provider_metrics: false,
@@ -178,7 +173,6 @@ impl TreeConfig {
max_execute_block_batch_size: usize,
legacy_state_root: bool,
always_compare_trie_updates: bool,
disable_state_cache: bool,
disable_prewarming: bool,
disable_parallel_sparse_trie: bool,
state_provider_metrics: bool,
@@ -203,7 +197,6 @@ impl TreeConfig {
max_execute_block_batch_size,
legacy_state_root,
always_compare_trie_updates,
disable_state_cache,
disable_prewarming,
disable_parallel_sparse_trie,
state_provider_metrics,
@@ -278,12 +271,7 @@ impl TreeConfig {
self.disable_parallel_sparse_trie
}
/// Returns whether or not state cache is disabled.
pub const fn disable_state_cache(&self) -> bool {
self.disable_state_cache
}
/// Returns whether or not parallel prewarming is disabled.
/// Returns whether or not parallel prewarming should be used.
pub const fn disable_prewarming(&self) -> bool {
self.disable_prewarming
}
@@ -375,12 +363,6 @@ impl TreeConfig {
self
}
/// Setter for whether to disable state cache.
pub const fn without_state_cache(mut self, disable_state_cache: bool) -> Self {
self.disable_state_cache = disable_state_cache;
self
}
/// Setter for whether to disable parallel prewarming.
pub const fn without_prewarming(mut self, disable_prewarming: bool) -> Self {
self.disable_prewarming = disable_prewarming;

View File

@@ -39,7 +39,6 @@ reth-trie.workspace = true
alloy-evm.workspace = true
alloy-consensus.workspace = true
alloy-eips.workspace = true
alloy-eip7928.workspace = true
alloy-primitives.workspace = true
alloy-rlp.workspace = true
alloy-rpc-types-engine.workspace = true

View File

@@ -22,7 +22,7 @@ const NANOS_PER_SEC: u32 = 1_000_000_000;
/// An atomic version of [`Duration`], using an [`AtomicU64`] to store the total nanoseconds in the
/// duration.
#[derive(Debug, Default)]
#[derive(Default)]
pub(crate) struct AtomicDuration {
/// The nanoseconds part of the duration
///
@@ -59,8 +59,7 @@ impl AtomicDuration {
}
/// A wrapper of a state provider and latency metrics.
#[derive(Debug)]
pub struct InstrumentedStateProvider<S> {
pub(crate) struct InstrumentedStateProvider<S> {
/// The state provider
state_provider: S,
@@ -81,12 +80,11 @@ impl<S> InstrumentedStateProvider<S>
where
S: StateProvider,
{
/// Creates a new [`InstrumentedStateProvider`] from a state provider with the provided label
/// for metrics.
pub fn from_state_provider(state_provider: S, source: &'static str) -> Self {
/// Creates a new [`InstrumentedStateProvider`] from a state provider
pub(crate) fn from_state_provider(state_provider: S) -> Self {
Self {
state_provider,
metrics: StateProviderMetrics::new_with_labels(&[("source", source)]),
metrics: StateProviderMetrics::default(),
total_storage_fetch_latency: AtomicDuration::zero(),
total_code_fetch_latency: AtomicDuration::zero(),
total_account_fetch_latency: AtomicDuration::zero(),
@@ -136,12 +134,6 @@ impl<S> InstrumentedStateProvider<S> {
}
}
impl<S> Drop for InstrumentedStateProvider<S> {
fn drop(&mut self) {
self.record_total_latency();
}
}
/// Metrics for the instrumented state provider
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.state_provider")]

View File

@@ -54,7 +54,7 @@ use tracing::*;
mod block_buffer;
mod cached_state;
pub mod error;
pub mod instrumented_state;
mod instrumented_state;
mod invalid_headers;
mod metrics;
mod payload_processor;

View File

@@ -106,8 +106,6 @@ where
cross_block_cache_size: u64,
/// Whether transactions should not be executed on prewarming task.
disable_transaction_prewarming: bool,
/// Whether state cache should be disable
disable_state_cache: bool,
/// Determines how to configure the evm for execution.
evm_config: Evm,
/// Whether precompile cache should be disabled.
@@ -151,7 +149,6 @@ where
cross_block_cache_size: config.cross_block_cache_size(),
disable_transaction_prewarming: config.disable_prewarming(),
evm_config,
disable_state_cache: config.disable_state_cache(),
precompile_cache_disabled: config.precompile_cache_disabled(),
precompile_cache_map,
sparse_state_trie: Arc::default(),
@@ -385,15 +382,9 @@ where
transactions = mpsc::channel().1;
}
let (saved_cache, cache, cache_metrics) = if self.disable_state_cache {
(None, None, None)
} else {
let saved_cache = self.cache_for(env.parent_hash);
let cache = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
(Some(saved_cache), Some(cache), Some(cache_metrics))
};
let saved_cache = self.cache_for(env.parent_hash);
let cache = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
// configure prewarming
let prewarm_ctx = PrewarmContext {
env,
@@ -605,12 +596,12 @@ impl<Tx, Err> PayloadHandle<Tx, Err> {
}
/// Returns a clone of the caches used by prewarming
pub(super) fn caches(&self) -> Option<StateExecutionCache> {
pub(super) fn caches(&self) -> StateExecutionCache {
self.prewarm_handle.cache.clone()
}
/// Returns a clone of the cache metrics used by prewarming
pub(super) fn cache_metrics(&self) -> Option<CachedStateMetrics> {
pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
self.prewarm_handle.cache_metrics.clone()
}
@@ -640,9 +631,9 @@ impl<Tx, Err> PayloadHandle<Tx, Err> {
#[derive(Debug)]
pub(crate) struct CacheTaskHandle {
/// The shared cache the task operates with.
cache: Option<StateExecutionCache>,
cache: StateExecutionCache,
/// Metrics for the caches
cache_metrics: Option<CachedStateMetrics>,
cache_metrics: CachedStateMetrics,
/// Channel to the spawned prewarm task if any
to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent>>,
}

View File

@@ -3,7 +3,7 @@
use alloy_evm::block::StateChangeSource;
use alloy_primitives::{
keccak256,
map::{B256Set, HashSet},
map::{B256Map, B256Set, HashSet},
B256,
};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
@@ -23,7 +23,7 @@ use reth_trie_parallel::{
StorageProofInput,
},
};
use std::{collections::BTreeMap, mem, ops::DerefMut, sync::Arc, time::Instant};
use std::{collections::BTreeMap, ops::DerefMut, sync::Arc, time::Instant};
use tracing::{debug, error, instrument, trace};
/// Maximum number of targets to batch together for prefetch batching.
@@ -354,7 +354,7 @@ impl MultiproofManager {
fn dispatch(&self, input: PendingMultiproofTask) {
// If there are no proof targets, we can just send an empty multiproof back immediately
if input.proof_targets_is_empty() {
trace!(
debug!(
sequence_number = input.proof_sequence_number(),
"No proof targets, sending empty multiproof back immediately"
);
@@ -883,16 +883,31 @@ impl MultiProofTask {
skip(self, update),
fields(accounts = update.len(), chunks = 0)
)]
fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
fn on_state_update(
&mut self,
source: StateChangeSource,
update: EvmState,
batch_removed_keys: &BatchedRemovedKeys,
) -> u64 {
let hashed_state_update = evm_state_to_hashed_post_state(update);
// Update removed keys based on the state update.
// Update the persistent removed keys state based on the MERGED state update.
// This tracks the "current" removal state for subsequent batches.
self.multi_added_removed_keys.update_with_state(&hashed_state_update);
// For proof selection within THIS batch, we need to include any keys that were
// deleted mid-batch but recreated (e.g., 100→0→100). The merged state shows
// the final value, but update_with_state clears the removal flag when it sees
// non-zero values. So we apply the batch_removed_keys to override those clears.
let mut proof_added_removed_keys = self.multi_added_removed_keys.clone();
if !batch_removed_keys.is_empty() {
batch_removed_keys.apply_to(&mut proof_added_removed_keys);
}
// Split the state update into already fetched and not fetched according to the proof
// targets.
let (fetched_state_update, not_fetched_state_update) = hashed_state_update
.partition_by_targets(&self.fetched_proof_targets, &self.multi_added_removed_keys);
.partition_by_targets(&self.fetched_proof_targets, &proof_added_removed_keys);
let mut state_updates = 0;
// If there are any accounts or storage slots that we already fetched the proofs for,
@@ -906,7 +921,7 @@ impl MultiProofTask {
}
// Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
let multi_added_removed_keys = Arc::new(proof_added_removed_keys);
let chunking_len = not_fetched_state_update.chunking_length();
let mut spawned_proof_targets = MultiProofTargets::default();
@@ -1045,7 +1060,7 @@ impl MultiProofTask {
let storage_targets =
merged_targets.values().map(|slots| slots.len()).sum::<usize>();
batch_metrics.prefetch_proofs_requested += self.on_prefetch_proof(merged_targets);
trace!(
debug!(
target: "engine::tree::payload_processor::multiproof",
account_targets,
storage_targets,
@@ -1056,7 +1071,7 @@ impl MultiProofTask {
false
}
// State update: batch consecutive updates from the same source
// State update: batch consecutive updates up to the target cap
MultiProofMessage::StateUpdate(source, update) => {
trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::StateUpdate");
@@ -1077,18 +1092,6 @@ impl MultiProofTask {
while accumulated_targets < STATE_UPDATE_MAX_BATCH_TARGETS {
match self.rx.try_recv() {
Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => {
let (batch_source, batch_update) = &ctx.accumulated_state_updates[0];
if !can_batch_state_update(
*batch_source,
batch_update,
next_source,
&next_update,
) {
ctx.pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
let next_estimate = estimate_evm_state_targets(&next_update);
// Would exceed batch cap; leave pending to dispatch on next iteration.
if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS
@@ -1112,30 +1115,16 @@ impl MultiProofTask {
let num_batched = ctx.accumulated_state_updates.len();
self.metrics.state_update_batch_size_histogram.record(num_batched as f64);
#[cfg(debug_assertions)]
{
let batch_source = ctx.accumulated_state_updates[0].0;
let batch_update = &ctx.accumulated_state_updates[0].1;
debug_assert!(ctx.accumulated_state_updates.iter().all(|(source, update)| {
can_batch_state_update(batch_source, batch_update, *source, update)
}));
}
// Merge all accumulated updates into a single EvmState payload.
// Use drain to preserve the buffer allocation.
let mut accumulated_iter = ctx.accumulated_state_updates.drain(..);
let (mut batch_source, mut merged_update) = accumulated_iter
.next()
.expect("state update batch always has at least one entry");
for (next_source, next_update) in accumulated_iter {
batch_source = next_source;
merged_update.extend(next_update);
}
// Merge all accumulated updates into a single EvmState payload while
// preserving deletion information. Use drain to preserve buffer allocation.
let (batch_source, merged_update, batch_removed_keys) =
batch_state_updates(ctx.accumulated_state_updates.drain(..))
.expect("state update batch always has at least one entry");
let batch_len = merged_update.len();
batch_metrics.state_update_proofs_requested +=
self.on_state_update(batch_source, merged_update);
trace!(
self.on_state_update(batch_source, merged_update, &batch_removed_keys);
debug!(
target: "engine::tree::payload_processor::multiproof",
?batch_source,
len = batch_len,
@@ -1271,7 +1260,7 @@ impl MultiProofTask {
// Convert ProofResultMessage to SparseTrieUpdate
match proof_result.result {
Ok(proof_result_data) => {
trace!(
debug!(
target: "engine::tree::payload_processor::multiproof",
sequence = proof_result.sequence_number,
total_proofs = batch_metrics.proofs_processed,
@@ -1419,7 +1408,9 @@ fn get_proof_targets(
// first collect all new accounts (not previously fetched)
for &hashed_address in state_update.accounts.keys() {
if !fetched_proof_targets.contains_key(&hashed_address) {
if !fetched_proof_targets.contains_key(&hashed_address) ||
multi_added_removed_keys.get_accounts().is_removed(&hashed_address)
{
targets.insert(hashed_address, HashSet::default());
}
}
@@ -1450,6 +1441,81 @@ fn get_proof_targets(
targets
}
/// Tracks removals across batched state updates so deletion signals are not lost when updates are
/// merged.
#[derive(Default, Debug)]
struct BatchedRemovedKeys {
accounts: HashSet<B256>,
storages: B256Map<HashSet<B256>>,
}
impl BatchedRemovedKeys {
/// Records any account or storage removals observed in the given state update.
fn record(&mut self, update: &EvmState) {
for (address, account) in update {
if !account.is_touched() {
continue;
}
let hashed_address = keccak256(*address);
// Selfdestruct clears storage, so force a refetch of the account proof.
if account.is_selfdestructed() {
self.accounts.insert(hashed_address);
continue;
}
for (slot, value) in &account.storage {
if value.is_changed() && value.present_value.is_zero() {
self.storages
.entry(hashed_address)
.or_default()
.insert(keccak256(B256::from(*slot)));
}
}
}
}
/// Applies recorded removals to the given [`MultiAddedRemovedKeys`].
fn apply_to(&self, added_removed_keys: &mut MultiAddedRemovedKeys) {
for account in &self.accounts {
added_removed_keys.mark_account_removed(*account);
}
for (hashed_address, slots) in &self.storages {
for slot in slots {
added_removed_keys.mark_storage_removed(*hashed_address, *slot);
}
}
}
/// Returns true if no removals were recorded.
fn is_empty(&self) -> bool {
self.accounts.is_empty() && self.storages.is_empty()
}
}
/// Merges multiple state updates while preserving deletion information.
///
/// When `extend()` merges updates, intermediate deletions (values set to zero) are overwritten
/// by later non-zero values. This function captures those deletions before they're lost,
/// ensuring proofs are correctly refetched for keys that were deleted mid-batch.
///
/// Returns `(source, merged_state, removed_keys)` or `None` if the iterator is empty.
fn batch_state_updates(
updates: impl Iterator<Item = (StateChangeSource, EvmState)>,
) -> Option<(StateChangeSource, EvmState, BatchedRemovedKeys)> {
let mut iter = updates;
let (source, mut merged) = iter.next()?;
let mut removed = BatchedRemovedKeys::default();
removed.record(&merged);
for (_, next) in iter {
removed.record(&next);
merged.extend(next);
}
Some((source, merged, removed))
}
/// Dispatches work items as a single unit or in chunks based on target size and worker
/// availability.
#[allow(clippy::too_many_arguments)]
@@ -1486,44 +1552,6 @@ where
1
}
/// Checks whether two state updates can be merged in a batch.
///
/// Transaction updates with the same transaction ID (`StateChangeSource::Transaction(id)`)
/// are safe to merge because they originate from the same logical execution and can be
/// coalesced to amortize proof work.
fn can_batch_state_update(
batch_source: StateChangeSource,
batch_update: &EvmState,
next_source: StateChangeSource,
next_update: &EvmState,
) -> bool {
if !same_state_change_source(batch_source, next_source) {
return false;
}
match (batch_source, next_source) {
(StateChangeSource::PreBlock(_), StateChangeSource::PreBlock(_)) |
(StateChangeSource::PostBlock(_), StateChangeSource::PostBlock(_)) => {
batch_update == next_update
}
_ => true,
}
}
/// Checks whether two state change sources refer to the same origin.
fn same_state_change_source(lhs: StateChangeSource, rhs: StateChangeSource) -> bool {
match (lhs, rhs) {
(StateChangeSource::Transaction(a), StateChangeSource::Transaction(b)) => a == b,
(StateChangeSource::PreBlock(a), StateChangeSource::PreBlock(b)) => {
mem::discriminant(&a) == mem::discriminant(&b)
}
(StateChangeSource::PostBlock(a), StateChangeSource::PostBlock(b)) => {
mem::discriminant(&a) == mem::discriminant(&b)
}
_ => false,
}
}
/// Estimates target count from `EvmState` for batching decisions.
fn estimate_evm_state_targets(state: &EvmState) -> usize {
state
@@ -2012,6 +2040,158 @@ mod tests {
assert!(!targets.contains_key(&addr));
}
#[test]
fn test_get_proof_targets_refetches_removed_account() {
let mut state = HashedPostState::default();
let mut fetched = MultiProofTargets::default();
let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
let addr = B256::random();
state.accounts.insert(addr, Some(Default::default()));
fetched.insert(addr, HashSet::default());
multi_added_removed_keys.mark_account_removed(addr);
let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
assert!(targets.contains_key(&addr));
}
#[test]
fn test_batched_removed_keys_preserve_hidden_deletions() {
use alloy_primitives::Address;
use revm_state::{Account, AccountInfo, AccountStatus, EvmStorageSlot};
let mut removed_keys = BatchedRemovedKeys::default();
let address = Address::random();
let slot = U256::from(1);
// Record a deletion in an intermediate update.
let mut delete_update = EvmState::default();
delete_update.insert(
address,
Account {
info: AccountInfo::default(),
transaction_id: 0,
storage: std::iter::once((
slot,
EvmStorageSlot::new_changed(U256::from(10), U256::ZERO, 0),
))
.collect(),
status: AccountStatus::Touched,
},
);
removed_keys.record(&delete_update);
// Final merged state re-adds the slot with a non-zero value.
let mut state = HashedPostState::default();
let hashed_address = keccak256(address);
let hashed_slot = keccak256(B256::from(slot));
let mut storage = HashedStorage::default();
storage.storage.insert(hashed_slot, U256::from(20));
state.accounts.insert(hashed_address, Some(Default::default()));
state.storages.insert(hashed_address, storage);
// Slot was already fetched previously.
let mut fetched = MultiProofTargets::default();
fetched.insert(hashed_address, HashSet::from_iter([hashed_slot]));
// Apply recorded deletions so proof selection still refetches.
let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
removed_keys.apply_to(&mut multi_added_removed_keys);
let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
assert!(targets.get(&hashed_address).is_some_and(|slots| slots.contains(&hashed_slot)));
}
#[test]
fn test_batch_state_updates_preserves_intermediate_deletions() {
use alloy_primitives::Address;
use revm_state::{Account, AccountInfo, AccountStatus, EvmStorageSlot};
let address = Address::random();
let slot = U256::from(42);
// Update 1: Create slot with value 100
let mut update1 = EvmState::default();
update1.insert(
address,
Account {
info: AccountInfo::default(),
transaction_id: 0,
storage: std::iter::once((
slot,
EvmStorageSlot::new_changed(U256::ZERO, U256::from(100), 0),
))
.collect(),
status: AccountStatus::Touched,
},
);
// Update 2: Delete slot (set to 0)
let mut update2 = EvmState::default();
update2.insert(
address,
Account {
info: AccountInfo::default(),
transaction_id: 1,
storage: std::iter::once((
slot,
EvmStorageSlot::new_changed(U256::from(100), U256::ZERO, 1),
))
.collect(),
status: AccountStatus::Touched,
},
);
// Update 3: Recreate slot with value 200
let mut update3 = EvmState::default();
update3.insert(
address,
Account {
info: AccountInfo::default(),
transaction_id: 2,
storage: std::iter::once((
slot,
EvmStorageSlot::new_changed(U256::ZERO, U256::from(200), 2),
))
.collect(),
status: AccountStatus::Touched,
},
);
let source = StateChangeSource::Transaction(0);
let updates = vec![
(source, update1),
(StateChangeSource::Transaction(1), update2),
(StateChangeSource::Transaction(2), update3),
];
let (result_source, merged, removed_keys) =
batch_state_updates(updates.into_iter()).expect("should have updates");
// Source should be from first update
assert!(matches!(result_source, StateChangeSource::Transaction(0)));
// Merged state should have final value 200
let account = merged.get(&address).expect("account should exist");
let slot_value = account.storage.get(&slot).expect("slot should exist");
assert_eq!(slot_value.present_value, U256::from(200));
// Crucially: removal should be recorded even though final value is non-zero
let hashed_address = keccak256(address);
let hashed_slot = keccak256(B256::from(slot));
let mut added_removed_keys = MultiAddedRemovedKeys::new();
removed_keys.apply_to(&mut added_removed_keys);
let storage_keys = added_removed_keys.get_storage(&hashed_address);
assert!(
storage_keys.is_some_and(|k| k.is_removed(&hashed_slot)),
"slot should be marked as removed despite final non-zero value"
);
}
/// Verifies that consecutive prefetch proof messages are batched together.
#[test]
fn test_prefetch_proofs_batching() {
@@ -2129,142 +2309,16 @@ mod tests {
assert!(merged_update.contains_key(&addr1));
assert!(merged_update.contains_key(&addr2));
task.on_state_update(source, merged_update)
task.on_state_update(source, merged_update, &BatchedRemovedKeys::default())
} else {
panic!("Expected StateUpdate message");
};
assert_eq!(proofs_requested, 1);
}
/// Verifies that state updates from different sources are not batched together.
/// Verifies that state updates from different sources batch together.
#[test]
fn test_state_update_batching_separates_sources() {
use alloy_evm::block::StateChangeSource;
use revm_state::Account;
let test_provider_factory = create_test_provider_factory();
let task = create_test_state_root_task(test_provider_factory);
let addr_a1 = alloy_primitives::Address::random();
let addr_b1 = alloy_primitives::Address::random();
let addr_a2 = alloy_primitives::Address::random();
let create_state_update = |addr: alloy_primitives::Address, balance: u64| {
let mut state = EvmState::default();
state.insert(
addr,
Account {
info: revm_state::AccountInfo {
balance: U256::from(balance),
nonce: 1,
code_hash: Default::default(),
code: Default::default(),
},
transaction_id: Default::default(),
storage: Default::default(),
status: revm_state::AccountStatus::Touched,
},
);
state
};
let source_a = StateChangeSource::Transaction(1);
let source_b = StateChangeSource::Transaction(2);
// Queue: A1 (immediate dispatch), B1 (batched), A2 (should become pending)
let tx = task.state_root_message_sender();
tx.send(MultiProofMessage::StateUpdate(source_a, create_state_update(addr_a1, 100)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source_b, create_state_update(addr_b1, 200)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source_a, create_state_update(addr_a2, 300)))
.unwrap();
let mut pending_msg: Option<MultiProofMessage> = None;
if let Ok(MultiProofMessage::StateUpdate(first_source, _)) = task.rx.recv() {
assert!(same_state_change_source(first_source, source_a));
// Simulate batching loop for remaining messages
let mut accumulated_updates: Vec<(StateChangeSource, EvmState)> = Vec::new();
let mut accumulated_targets = 0usize;
loop {
if accumulated_targets >= STATE_UPDATE_MAX_BATCH_TARGETS {
break;
}
match task.rx.try_recv() {
Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => {
if let Some((batch_source, batch_update)) = accumulated_updates.first() &&
!can_batch_state_update(
*batch_source,
batch_update,
next_source,
&next_update,
)
{
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
let next_estimate = estimate_evm_state_targets(&next_update);
if next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS {
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS &&
!accumulated_updates.is_empty()
{
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
accumulated_targets += next_estimate;
accumulated_updates.push((next_source, next_update));
}
Ok(other_msg) => {
pending_msg = Some(other_msg);
break;
}
Err(_) => break,
}
}
assert_eq!(accumulated_updates.len(), 1, "Should only batch matching sources");
let batch_source = accumulated_updates[0].0;
assert!(same_state_change_source(batch_source, source_b));
let batch_source = accumulated_updates[0].0;
let mut merged_update = accumulated_updates.remove(0).1;
for (_, next_update) in accumulated_updates {
merged_update.extend(next_update);
}
assert!(
same_state_change_source(batch_source, source_b),
"Batch should use matching source"
);
assert!(merged_update.contains_key(&addr_b1));
assert!(!merged_update.contains_key(&addr_a1));
assert!(!merged_update.contains_key(&addr_a2));
} else {
panic!("Expected first StateUpdate");
}
match pending_msg {
Some(MultiProofMessage::StateUpdate(pending_source, pending_update)) => {
assert!(same_state_change_source(pending_source, source_a));
assert!(pending_update.contains_key(&addr_a2));
}
other => panic!("Expected pending StateUpdate with source_a, got {:?}", other),
}
}
/// Verifies that pre-block updates only batch when their payloads are identical.
#[test]
fn test_pre_block_updates_require_payload_match_to_batch() {
fn test_different_sources_batch_together() {
use alloy_evm::block::{StateChangePreBlockSource, StateChangeSource};
use revm_state::Account;
@@ -2294,51 +2348,192 @@ mod tests {
state
};
let source = StateChangeSource::PreBlock(StateChangePreBlockSource::BeaconRootContract);
// Different source types
let source_tx1 = StateChangeSource::Transaction(1);
let source_tx2 = StateChangeSource::Transaction(2);
let source_preblock =
StateChangeSource::PreBlock(StateChangePreBlockSource::BeaconRootContract);
// Queue: first update dispatched immediately, next two should not merge
// Queue updates from different sources
let tx = task.state_root_message_sender();
tx.send(MultiProofMessage::StateUpdate(source, create_state_update(addr1, 100))).unwrap();
tx.send(MultiProofMessage::StateUpdate(source, create_state_update(addr2, 200))).unwrap();
tx.send(MultiProofMessage::StateUpdate(source, create_state_update(addr3, 300))).unwrap();
tx.send(MultiProofMessage::StateUpdate(source_tx1, create_state_update(addr1, 100)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source_tx2, create_state_update(addr2, 200)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source_preblock, create_state_update(addr3, 300)))
.unwrap();
// Receive and batch all updates
if let Ok(MultiProofMessage::StateUpdate(_, first_update)) = task.rx.recv() {
let mut merged_update = first_update;
let mut num_batched = 1;
while let Ok(MultiProofMessage::StateUpdate(_, next_update)) = task.rx.try_recv() {
merged_update.extend(next_update);
num_batched += 1;
}
// All three updates should batch together regardless of source type
assert_eq!(num_batched, 3, "All updates from different sources should batch together");
assert_eq!(merged_update.len(), 3);
assert!(merged_update.contains_key(&addr1));
assert!(merged_update.contains_key(&addr2));
assert!(merged_update.contains_key(&addr3));
} else {
panic!("Expected StateUpdate message");
}
}
/// Verifies that cross-source batching preserves all state data correctly.
#[test]
fn test_cross_source_batching_preserves_all_state() {
use alloy_evm::block::{
StateChangePostBlockSource, StateChangePreBlockSource, StateChangeSource,
};
use revm_state::Account;
let test_provider_factory = create_test_provider_factory();
let task = create_test_state_root_task(test_provider_factory);
let addr_tx = alloy_primitives::Address::random();
let addr_preblock = alloy_primitives::Address::random();
let addr_postblock = alloy_primitives::Address::random();
let create_state_update = |addr: alloy_primitives::Address, balance: u64, nonce: u64| {
let mut state = EvmState::default();
state.insert(
addr,
Account {
info: revm_state::AccountInfo {
balance: U256::from(balance),
nonce,
code_hash: Default::default(),
code: Default::default(),
},
transaction_id: Default::default(),
storage: Default::default(),
status: revm_state::AccountStatus::Touched,
},
);
state
};
let source_tx = StateChangeSource::Transaction(0);
let source_preblock =
StateChangeSource::PreBlock(StateChangePreBlockSource::BeaconRootContract);
let source_postblock =
StateChangeSource::PostBlock(StateChangePostBlockSource::BalanceIncrements);
// Queue updates from all three source types
let tx = task.state_root_message_sender();
tx.send(MultiProofMessage::StateUpdate(source_tx, create_state_update(addr_tx, 100, 1)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(
source_preblock,
create_state_update(addr_preblock, 200, 2),
))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(
source_postblock,
create_state_update(addr_postblock, 300, 3),
))
.unwrap();
// Receive and batch all updates
if let Ok(MultiProofMessage::StateUpdate(_, first_update)) = task.rx.recv() {
let mut merged_update = first_update;
while let Ok(MultiProofMessage::StateUpdate(_, next_update)) = task.rx.try_recv() {
merged_update.extend(next_update);
}
// Verify all accounts are present with correct data
assert_eq!(merged_update.len(), 3);
let tx_account = merged_update.get(&addr_tx).expect("Transaction account missing");
assert_eq!(tx_account.info.balance, U256::from(100));
assert_eq!(tx_account.info.nonce, 1);
let preblock_account =
merged_update.get(&addr_preblock).expect("PreBlock account missing");
assert_eq!(preblock_account.info.balance, U256::from(200));
assert_eq!(preblock_account.info.nonce, 2);
let postblock_account =
merged_update.get(&addr_postblock).expect("PostBlock account missing");
assert_eq!(postblock_account.info.balance, U256::from(300));
assert_eq!(postblock_account.info.nonce, 3);
} else {
panic!("Expected StateUpdate message");
}
}
/// Verifies that mixed-source batches keep the first source for attribution/logging.
#[test]
fn test_mixed_source_batch_preserves_first_source() {
use alloy_evm::block::{
StateChangePostBlockSource, StateChangePreBlockSource, StateChangeSource,
};
use revm_state::Account;
let test_provider_factory = create_test_provider_factory();
let task = create_test_state_root_task(test_provider_factory);
let addr_preblock = alloy_primitives::Address::random();
let addr_tx = alloy_primitives::Address::random();
let addr_postblock = alloy_primitives::Address::random();
let create_state_update = |addr: alloy_primitives::Address, balance: u64, nonce: u64| {
let mut state = EvmState::default();
state.insert(
addr,
Account {
info: revm_state::AccountInfo {
balance: U256::from(balance),
nonce,
code_hash: Default::default(),
code: Default::default(),
},
transaction_id: Default::default(),
storage: Default::default(),
status: revm_state::AccountStatus::Touched,
},
);
state
};
let source_preblock =
StateChangeSource::PreBlock(StateChangePreBlockSource::BeaconRootContract);
let source_tx = StateChangeSource::Transaction(123);
let source_postblock =
StateChangeSource::PostBlock(StateChangePostBlockSource::BalanceIncrements);
// Queue mixed sources; ensure we don't hit the batching cap.
let tx = task.state_root_message_sender();
tx.send(MultiProofMessage::StateUpdate(
source_preblock,
create_state_update(addr_preblock, 1, 1),
))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source_tx, create_state_update(addr_tx, 2, 2)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(
source_postblock,
create_state_update(addr_postblock, 3, 3),
))
.unwrap();
let mut pending_msg: Option<MultiProofMessage> = None;
if let Ok(MultiProofMessage::StateUpdate(first_source, first_update)) = task.rx.recv() {
assert!(same_state_change_source(first_source, source));
assert!(first_update.contains_key(&addr1));
let mut accumulated_updates = vec![(first_source, first_update)];
let mut accumulated_targets = estimate_evm_state_targets(&accumulated_updates[0].1);
let mut accumulated_updates: Vec<(StateChangeSource, EvmState)> = Vec::new();
let mut accumulated_targets = 0usize;
loop {
if accumulated_targets >= STATE_UPDATE_MAX_BATCH_TARGETS {
break;
}
while accumulated_targets < STATE_UPDATE_MAX_BATCH_TARGETS {
match task.rx.try_recv() {
Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => {
if let Some((batch_source, batch_update)) = accumulated_updates.first() &&
!can_batch_state_update(
*batch_source,
batch_update,
next_source,
&next_update,
)
{
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
let next_estimate = estimate_evm_state_targets(&next_update);
if next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS {
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS &&
!accumulated_updates.is_empty()
{
if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS {
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
@@ -2354,24 +2549,128 @@ mod tests {
}
}
assert_eq!(
accumulated_updates.len(),
1,
"Second pre-block update should not merge with a different payload"
assert!(
pending_msg.is_none(),
"Mixed sources within cap should batch without leaving pending messages"
);
let (batched_source, batched_update) = accumulated_updates.remove(0);
assert!(same_state_change_source(batched_source, source));
assert!(batched_update.contains_key(&addr2));
assert!(!batched_update.contains_key(&addr3));
assert_eq!(accumulated_updates.len(), 3);
match pending_msg {
Some(MultiProofMessage::StateUpdate(_, pending_update)) => {
assert!(pending_update.contains_key(&addr3));
}
other => panic!("Expected pending third pre-block update, got {:?}", other),
let mut accumulated_iter = accumulated_updates.into_iter();
let (batch_source, mut merged_update) =
accumulated_iter.next().expect("at least one update");
for (_, next_update) in accumulated_iter {
merged_update.extend(next_update);
}
// Batch source should stay the first source that arrived (PreBlock).
assert!(matches!(batch_source, StateChangeSource::PreBlock(_)));
assert_eq!(merged_update.len(), 3);
assert!(merged_update.contains_key(&addr_preblock));
assert!(merged_update.contains_key(&addr_tx));
assert!(merged_update.contains_key(&addr_postblock));
} else {
panic!("Expected first StateUpdate");
panic!("Expected initial StateUpdate");
}
}
/// Verifies that mixed source batching respects the target limit.
#[test]
fn test_mixed_sources_respect_target_limit() {
use alloy_evm::block::{StateChangePreBlockSource, StateChangeSource};
use revm_state::Account;
let test_provider_factory = create_test_provider_factory();
let task = create_test_state_root_task(test_provider_factory);
// Create a helper to generate state updates with many storage slots
let create_large_state_update = |addr: alloy_primitives::Address, num_slots: usize| {
let mut state = EvmState::default();
let mut storage = revm_state::EvmStorage::default();
for i in 0..num_slots {
storage.insert(
U256::from(i),
revm_state::EvmStorageSlot::new_changed(U256::ZERO, U256::from(i), 0),
);
}
state.insert(
addr,
Account {
info: revm_state::AccountInfo {
balance: U256::from(100),
nonce: 1,
code_hash: Default::default(),
code: Default::default(),
},
transaction_id: Default::default(),
storage,
status: revm_state::AccountStatus::Touched,
},
);
state
};
let addr1 = alloy_primitives::Address::random();
let addr2 = alloy_primitives::Address::random();
let addr3 = alloy_primitives::Address::random();
// Create updates that will exceed STATE_UPDATE_MAX_BATCH_TARGETS (64)
// Each update: 1 account + slots = targets
// First update: 1 + 30 = 31 targets
// Second update: 1 + 30 = 31 targets (total: 62, still under 64)
// Third update: 1 + 30 = 31 targets (total would be 93, exceeds 64)
let source_tx = StateChangeSource::Transaction(1);
let source_preblock =
StateChangeSource::PreBlock(StateChangePreBlockSource::BeaconRootContract);
let source_tx2 = StateChangeSource::Transaction(2);
let tx = task.state_root_message_sender();
tx.send(MultiProofMessage::StateUpdate(source_tx, create_large_state_update(addr1, 30)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(
source_preblock,
create_large_state_update(addr2, 30),
))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source_tx2, create_large_state_update(addr3, 30)))
.unwrap();
// First receive should get first update
if let Ok(MultiProofMessage::StateUpdate(_, first_update)) = task.rx.recv() {
let mut merged_update = first_update;
let mut num_batched = 1;
let mut accumulated_targets = estimate_evm_state_targets(&merged_update);
let mut pending_update: Option<EvmState> = None;
// Batch while under limit
while accumulated_targets < STATE_UPDATE_MAX_BATCH_TARGETS {
match task.rx.try_recv() {
Ok(MultiProofMessage::StateUpdate(_, next_update)) => {
let next_targets = estimate_evm_state_targets(&next_update);
if accumulated_targets + next_targets > STATE_UPDATE_MAX_BATCH_TARGETS {
// Would exceed limit - save as pending and stop batching
pending_update = Some(next_update);
break;
}
accumulated_targets += next_targets;
merged_update.extend(next_update);
num_batched += 1;
}
_ => break,
}
}
// Should have batched first two updates (62 targets) but not the third
assert_eq!(num_batched, 2, "Should batch updates up to target limit");
assert!(merged_update.contains_key(&addr1));
assert!(merged_update.contains_key(&addr2));
assert!(!merged_update.contains_key(&addr3), "Third update should not be batched");
// Third update should be pending (not batched due to target limit)
let pending = pending_update.expect("Third update should be pending");
assert!(pending.contains_key(&addr3), "Third update should contain addr3");
} else {
panic!("Expected StateUpdate message");
}
}

View File

@@ -29,7 +29,7 @@ use metrics::{Counter, Gauge, Histogram};
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
use reth_metrics::Metrics;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{BlockReader, StateProviderBox, StateProviderFactory, StateReader};
use reth_provider::{BlockReader, StateProviderFactory, StateReader};
use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
use reth_trie::MultiProofTargets;
use std::{
@@ -255,35 +255,31 @@ where
self;
let hash = env.hash;
if let Some(saved_cache) = saved_cache {
debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
// Perform all cache operations atomically under the lock
execution_cache.update_with_guard(|cached| {
// consumes the `SavedCache` held by the prewarming task, which releases its usage
// guard
let (caches, cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics);
debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
// Perform all cache operations atomically under the lock
execution_cache.update_with_guard(|cached| {
// consumes the `SavedCache` held by the prewarming task, which releases its usage guard
let (caches, cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics);
// Insert state into cache while holding the lock
if new_cache.cache().insert_state(&state).is_err() {
// Clear the cache on error to prevent having a polluted cache
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
return;
}
// Insert state into cache while holding the lock
if new_cache.cache().insert_state(&state).is_err() {
// Clear the cache on error to prevent having a polluted cache
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
return;
}
new_cache.update_metrics();
new_cache.update_metrics();
// Replace the shared cache with the new one; the previous cache (if any) is
// dropped.
*cached = Some(new_cache);
});
// Replace the shared cache with the new one; the previous cache (if any) is dropped.
*cached = Some(new_cache);
});
let elapsed = start.elapsed();
debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
let elapsed = start.elapsed();
debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
metrics.cache_saving_duration.set(elapsed.as_secs_f64());
}
metrics.cache_saving_duration.set(elapsed.as_secs_f64());
}
/// Executes the task.
@@ -360,7 +356,7 @@ where
{
pub(super) env: ExecutionEnv<Evm>,
pub(super) evm_config: Evm,
pub(super) saved_cache: Option<SavedCache>,
pub(super) saved_cache: SavedCache,
/// Provider to obtain the state
pub(super) provider: StateProviderBuilder<N, P>,
pub(super) metrics: PrewarmMetrics,
@@ -404,13 +400,10 @@ where
};
// Use the caches to create a new provider with caching
let state_provider: StateProviderBox = if let Some(saved_cache) = saved_cache {
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
Box::new(CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics))
} else {
state_provider
};
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
let state_provider =
CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
let state_provider = StateProviderDatabase::new(state_provider);

View File

@@ -11,7 +11,6 @@ use crate::tree::{
StateProviderDatabase, TreeConfig,
};
use alloy_consensus::transaction::Either;
use alloy_eip7928::BlockAccessList;
use alloy_eips::{eip1898::BlockWithParent, NumHash};
use alloy_evm::Evm;
use alloy_primitives::B256;
@@ -369,7 +368,7 @@ where
)
.into())
};
let mut state_provider = ensure_ok!(provider_builder.build());
let state_provider = ensure_ok!(provider_builder.build());
drop(_enter);
// fetch parent block
@@ -412,19 +411,18 @@ where
// Use cached state provider before executing, used in execution after prewarming threads
// complete
if let Some((caches, cache_metrics)) = handle.caches().zip(handle.cache_metrics()) {
state_provider = Box::new(CachedStateProvider::new_with_caches(
state_provider,
caches,
cache_metrics,
));
};
let state_provider = CachedStateProvider::new_with_caches(
state_provider,
handle.caches(),
handle.cache_metrics(),
);
// Execute the block and handle any execution errors
let (output, senders) = match if self.config.state_provider_metrics() {
let state_provider =
InstrumentedStateProvider::from_state_provider(&state_provider, "engine");
self.execute_block(&state_provider, env, &input, &mut handle)
let state_provider = InstrumentedStateProvider::from_state_provider(&state_provider);
let result = self.execute_block(&state_provider, env, &input, &mut handle);
state_provider.record_total_latency();
result
} else {
self.execute_block(&state_provider, env, &input, &mut handle)
} {
@@ -1245,10 +1243,4 @@ impl<T: PayloadTypes> BlockOrPayload<T> {
Self::Block(_) => "block",
}
}
/// Returns the block access list if available.
pub const fn block_access_list(&self) -> Option<Result<BlockAccessList, alloy_rlp::Error>> {
// TODO decode and return `BlockAccessList`
None
}
}

View File

@@ -5,6 +5,7 @@ use alloy_consensus::{
};
use alloy_eips::merge::BEACON_NONCE;
use alloy_evm::{block::BlockExecutorFactory, eth::EthBlockExecutionCtx};
use alloy_primitives::Bytes;
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_evm::execute::{BlockAssembler, BlockAssemblerInput, BlockExecutionError};
use reth_execution_types::BlockExecutionResult;
@@ -16,12 +17,14 @@ use revm::context::Block as _;
pub struct EthBlockAssembler<ChainSpec = reth_chainspec::ChainSpec> {
/// The chainspec.
pub chain_spec: Arc<ChainSpec>,
/// Extra data to use for the blocks.
pub extra_data: Bytes,
}
impl<ChainSpec> EthBlockAssembler<ChainSpec> {
/// Creates a new [`EthBlockAssembler`].
pub const fn new(chain_spec: Arc<ChainSpec>) -> Self {
Self { chain_spec }
pub fn new(chain_spec: Arc<ChainSpec>) -> Self {
Self { chain_spec, extra_data: Default::default() }
}
}
@@ -107,7 +110,7 @@ where
gas_limit: evm_env.block_env.gas_limit(),
difficulty: evm_env.block_env.difficulty(),
gas_used: *gas_used,
extra_data: ctx.extra_data,
extra_data: self.extra_data.clone(),
parent_beacon_block_root: ctx.parent_beacon_block_root,
blob_gas_used: block_blob_gas_used,
excess_blob_gas,

View File

@@ -116,6 +116,12 @@ impl<ChainSpec, EvmFactory> EthEvmConfig<ChainSpec, EvmFactory> {
pub const fn chain_spec(&self) -> &Arc<ChainSpec> {
self.executor_factory.spec()
}
/// Sets the extra data for the block assembler.
pub fn with_extra_data(mut self, extra_data: Bytes) -> Self {
self.block_assembler.extra_data = extra_data;
self
}
}
impl<ChainSpec, EvmF> ConfigureEvm for EthEvmConfig<ChainSpec, EvmF>
@@ -187,7 +193,6 @@ where
parent_beacon_block_root: block.header().parent_beacon_block_root,
ommers: &block.body().ommers,
withdrawals: block.body().withdrawals.as_ref().map(Cow::Borrowed),
extra_data: block.header().extra_data.clone(),
})
}
@@ -201,7 +206,6 @@ where
parent_beacon_block_root: attributes.parent_beacon_block_root,
ommers: &[],
withdrawals: attributes.withdrawals.map(Cow::Owned),
extra_data: attributes.extra_data,
})
}
}
@@ -278,7 +282,6 @@ where
parent_beacon_block_root: payload.sidecar.parent_beacon_block_root(),
ommers: &[],
withdrawals: payload.payload.withdrawals().map(|w| Cow::Owned(w.clone().into())),
extra_data: payload.payload.as_v1().extra_data.clone(),
})
}

View File

@@ -61,8 +61,6 @@ reth-node-core.workspace = true
reth-e2e-test-utils.workspace = true
reth-tasks.workspace = true
reth-testing-utils.workspace = true
tempfile.workspace = true
jsonrpsee-core.workspace = true
alloy-primitives.workspace = true
alloy-provider.workspace = true

View File

@@ -32,15 +32,15 @@ use reth_node_builder::{
EngineValidatorBuilder, EthApiBuilder, EthApiCtx, Identity, PayloadValidatorBuilder,
RethRpcAddOns, RpcAddOns, RpcHandle,
},
BuilderContext, DebugNode, Node, NodeAdapter,
BuilderContext, DebugNode, Node, NodeAdapter, PayloadBuilderConfig,
};
use reth_payload_primitives::PayloadTypes;
use reth_provider::{providers::ProviderFactoryBuilder, EthStorage};
use reth_rpc::{
eth::core::{EthApiFor, EthRpcConverterFor},
TestingApi, ValidationApi,
ValidationApi,
};
use reth_rpc_api::servers::{BlockSubmissionValidationApiServer, TestingApiServer};
use reth_rpc_api::servers::BlockSubmissionValidationApiServer;
use reth_rpc_builder::{config::RethRpcServerConfig, middleware::RethRpcMiddleware};
use reth_rpc_eth_api::{
helpers::{
@@ -313,17 +313,6 @@ where
.modules
.merge_if_module_configured(RethRpcModule::Eth, eth_config.into_rpc())?;
// testing_buildBlockV1: only wire when the hidden testing module is explicitly
// requested on any transport. Default stays disabled to honor security guidance.
let testing_api = TestingApi::new(
container.registry.eth_api().clone(),
container.registry.evm_config().clone(),
)
.into_rpc();
container
.modules
.merge_if_module_configured(RethRpcModule::Testing, testing_api)?;
Ok(())
})
.await
@@ -437,7 +426,9 @@ where
type EVM = EthEvmConfig<Types::ChainSpec>;
async fn build_evm(self, ctx: &BuilderContext<Node>) -> eyre::Result<Self::EVM> {
Ok(EthEvmConfig::new(ctx.chain_spec()))
let evm_config = EthEvmConfig::new(ctx.chain_spec())
.with_extra_data(ctx.payload_builder_config().extra_data_bytes());
Ok(evm_config)
}
}

View File

@@ -54,8 +54,7 @@ where
evm_config,
EthereumBuilderConfig::new()
.with_gas_limit(gas_limit)
.with_max_blobs_per_block(conf.max_blobs_per_block())
.with_extra_data(conf.extra_data_bytes()),
.with_max_blobs_per_block(conf.max_blobs_per_block()),
))
}
}

View File

@@ -2,6 +2,5 @@
mod builder;
mod exex;
mod testing;
const fn main() {}

View File

@@ -1,84 +0,0 @@
//! E2E tests for the testing RPC namespace.
use alloy_primitives::{Address, B256};
use alloy_rpc_types_engine::ExecutionPayloadEnvelopeV4;
use jsonrpsee_core::client::ClientT;
use reth_db::test_utils::create_test_rw_db;
use reth_ethereum_engine_primitives::EthPayloadAttributes;
use reth_node_builder::{NodeBuilder, NodeConfig};
use reth_node_core::{
args::DatadirArgs,
dirs::{DataDirPath, MaybePlatformPath},
};
use reth_node_ethereum::{node::EthereumAddOns, EthereumNode};
use reth_rpc_api::TestingBuildBlockRequestV1;
use reth_rpc_server_types::{RethRpcModule, RpcModuleSelection};
use reth_tasks::TaskManager;
use std::str::FromStr;
use tempfile::tempdir;
use tokio::sync::oneshot;
#[tokio::test(flavor = "multi_thread")]
async fn testing_rpc_build_block_works() -> eyre::Result<()> {
let tasks = TaskManager::current();
let mut rpc_args = reth_node_core::args::RpcServerArgs::default().with_http();
rpc_args.http_api = Some(RpcModuleSelection::from_iter([RethRpcModule::Testing]));
let tempdir = tempdir().expect("temp datadir");
let datadir_args = DatadirArgs {
datadir: MaybePlatformPath::<DataDirPath>::from_str(tempdir.path().to_str().unwrap())
.expect("valid datadir"),
static_files_path: Some(tempdir.path().join("static")),
};
let config = NodeConfig::test().with_datadir_args(datadir_args).with_rpc(rpc_args);
let db = create_test_rw_db();
let (tx, rx): (
oneshot::Sender<eyre::Result<ExecutionPayloadEnvelopeV4>>,
oneshot::Receiver<eyre::Result<ExecutionPayloadEnvelopeV4>>,
) = oneshot::channel();
let builder = NodeBuilder::new(config)
.with_database(db)
.with_launch_context(tasks.executor())
.with_types::<EthereumNode>()
.with_components(EthereumNode::components())
.with_add_ons(EthereumAddOns::default())
.on_rpc_started(move |ctx, handles| {
let Some(client) = handles.rpc.http_client() else { return Ok(()) };
let chain = ctx.config().chain.clone();
let parent_block_hash = chain.genesis_hash();
let payload_attributes = EthPayloadAttributes {
timestamp: chain.genesis().timestamp + 1,
prev_randao: B256::ZERO,
suggested_fee_recipient: Address::ZERO,
withdrawals: None,
parent_beacon_block_root: None,
};
let request = TestingBuildBlockRequestV1 {
parent_block_hash,
payload_attributes,
transactions: vec![],
extra_data: None,
};
tokio::spawn(async move {
let res: eyre::Result<ExecutionPayloadEnvelopeV4> =
client.request("testing_buildBlockV1", [request]).await.map_err(Into::into);
let _ = tx.send(res);
});
Ok(())
});
// Launch the node with the default engine launcher.
let launcher = builder.engine_api_launcher();
let _node = builder.launch_with(launcher).await?;
// Wait for the testing RPC call to return.
let res = rx.await.expect("testing_buildBlockV1 response");
assert!(res.is_ok(), "testing_buildBlockV1 failed: {:?}", res.err());
Ok(())
}

View File

@@ -1,5 +1,4 @@
use alloy_eips::eip1559::ETHEREUM_BLOCK_GAS_LIMIT_30M;
use alloy_primitives::Bytes;
use reth_primitives_traits::constants::GAS_LIMIT_BOUND_DIVISOR;
/// Settings for the Ethereum builder.
@@ -14,8 +13,6 @@ pub struct EthereumBuilderConfig {
///
/// If `None`, defaults to the protocol maximum.
pub max_blobs_per_block: Option<u64>,
/// Extra data for built blocks.
pub extra_data: Bytes,
}
impl Default for EthereumBuilderConfig {
@@ -31,7 +28,6 @@ impl EthereumBuilderConfig {
desired_gas_limit: ETHEREUM_BLOCK_GAS_LIMIT_30M,
await_payload_on_missing: true,
max_blobs_per_block: None,
extra_data: Bytes::new(),
}
}
@@ -53,12 +49,6 @@ impl EthereumBuilderConfig {
self.max_blobs_per_block = max_blobs_per_block;
self
}
/// Set the extra data for built blocks.
pub fn with_extra_data(mut self, extra_data: Bytes) -> Self {
self.extra_data = extra_data;
self
}
}
impl EthereumBuilderConfig {

View File

@@ -168,7 +168,6 @@ where
gas_limit: builder_config.gas_limit(parent_header.gas_limit),
parent_beacon_block_root: attributes.parent_beacon_block_root(),
withdrawals: Some(attributes.withdrawals().clone()),
extra_data: builder_config.extra_data,
},
)
.map_err(PayloadBuilderError::other)?;

View File

@@ -28,7 +28,7 @@ use alloy_evm::{
block::{BlockExecutorFactory, BlockExecutorFor},
precompiles::PrecompilesMap,
};
use alloy_primitives::{Address, Bytes, B256};
use alloy_primitives::{Address, B256};
use core::{error::Error, fmt::Debug};
use execute::{BasicBlockExecutor, BlockAssembler, BlockBuilder};
use reth_execution_errors::BlockExecutionError;
@@ -501,8 +501,6 @@ pub struct NextBlockEnvAttributes {
pub parent_beacon_block_root: Option<B256>,
/// Withdrawals
pub withdrawals: Option<Withdrawals>,
/// Optional extra data.
pub extra_data: Bytes,
}
/// Abstraction over transaction environment.

View File

@@ -1372,6 +1372,21 @@ where
// reallocations
let mut new_txs = Vec::with_capacity(transactions.len());
for tx in transactions {
// recover transaction
let tx = match tx.try_into_recovered() {
Ok(tx) => tx,
Err(badtx) => {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%badtx.tx_hash(),
client_version=%peer.client_version,
"failed ecrecovery for transaction"
);
has_bad_transactions = true;
continue
}
};
match self.transactions_by_peers.entry(*tx.tx_hash()) {
Entry::Occupied(mut entry) => {
// transaction was already inserted
@@ -1389,21 +1404,6 @@ where
} else {
// this is a new transaction that should be imported into the pool
// recover transaction
let tx = match tx.try_into_recovered() {
Ok(tx) => tx,
Err(badtx) => {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%badtx.tx_hash(),
client_version=%peer.client_version,
"failed ecrecovery for transaction"
);
has_bad_transactions = true;
continue
}
};
let pool_transaction = Pool::Transaction::from_pooled(tx);
new_txs.push(pool_transaction);

View File

@@ -533,27 +533,6 @@ where
}
/// Modifies the addons with the given closure.
///
/// This method provides access to methods on the addons type that don't have
/// direct builder methods. It's useful for advanced configuration scenarios
/// where you need to call addon-specific methods.
///
/// # Examples
///
/// ```rust,ignore
/// use tower::layer::util::Identity;
///
/// let builder = NodeBuilder::new(config)
/// .with_types::<EthereumNode>()
/// .with_components(EthereumNode::components())
/// .with_add_ons(EthereumAddOns::default())
/// .map_add_ons(|addons| addons.with_rpc_middleware(Identity::default()));
/// ```
///
/// # See also
///
/// - [`NodeAddOns`] trait for available addon types
/// - [`crate::NodeBuilderWithComponents::extend_rpc_modules`] for RPC module configuration
pub fn map_add_ons<F>(self, f: F) -> Self
where
F: FnOnce(AO) -> AO,
@@ -600,10 +579,10 @@ where
/// .extend_rpc_modules(|ctx| {
/// // Access node components, so they can used by the CustomApi
/// let pool = ctx.pool().clone();
///
///
/// // Add custom RPC namespace
/// ctx.modules.merge_configured(CustomApi { pool }.into_rpc())?;
///
///
/// Ok(())
/// })
/// .build()?;
@@ -859,8 +838,8 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
.request_handler(self.provider().clone())
.split_with_handle();
self.executor.spawn_critical_blocking("p2p txpool", Box::pin(txpool));
self.executor.spawn_critical_blocking("p2p eth request handler", Box::pin(eth));
self.executor.spawn_critical("p2p txpool", Box::pin(txpool));
self.executor.spawn_critical("p2p eth request handler", Box::pin(eth));
let default_peers_path = self.config().datadir().known_peers();
let known_peers_file = self.config().network.persistent_peers_file(default_peers_path);

View File

@@ -235,27 +235,6 @@ where
}
/// Modifies the addons with the given closure.
///
/// This method provides access to methods on the addons type that don't have
/// direct builder methods. It's useful for advanced configuration scenarios
/// where you need to call addon-specific methods.
///
/// # Examples
///
/// ```rust,ignore
/// use tower::layer::util::Identity;
///
/// let builder = NodeBuilder::new(config)
/// .with_types::<EthereumNode>()
/// .with_components(EthereumNode::components())
/// .with_add_ons(EthereumAddOns::default())
/// .map_add_ons(|addons| addons.with_rpc_middleware(Identity::default()));
/// ```
///
/// # See also
///
/// - [`NodeAddOns`] trait for available addon types
/// - [`crate::NodeBuilderWithComponents::extend_rpc_modules`] for RPC module configuration
pub fn map_add_ons<F>(mut self, f: F) -> Self
where
F: FnOnce(AO) -> AO,

View File

@@ -1010,7 +1010,7 @@ where
.with_executor(Box::new(node.task_executor().clone()))
.with_evm_config(node.evm_config().clone())
.with_consensus(node.consensus().clone())
.build_with_auth_server(module_config, engine_api, eth_api, engine_events.clone());
.build_with_auth_server(module_config, engine_api, eth_api);
// in dev mode we generate 20 random dev-signer accounts
if config.dev.dev {
@@ -1179,7 +1179,6 @@ impl<'a, N: FullNodeComponents<Types: NodeTypes<ChainSpec: Hardforks + EthereumH
.proof_permits(self.config.proof_permits)
.gas_oracle_config(self.config.gas_oracle)
.max_batch_size(self.config.max_batch_size)
.max_blocking_io_requests(self.config.max_blocking_io_requests)
.pending_block_kind(self.config.pending_block_kind)
.raw_tx_forwarder(self.config.raw_tx_forwarder)
.evm_memory_limit(self.config.rpc_evm_memory_limit)
@@ -1189,7 +1188,10 @@ impl<'a, N: FullNodeComponents<Types: NodeTypes<ChainSpec: Hardforks + EthereumH
/// A `EthApi` that knows how to build `eth` namespace API from [`FullNodeComponents`].
pub trait EthApiBuilder<N: FullNodeComponents>: Default + Send + 'static {
/// The Ethapi implementation this builder will build.
type EthApi: FullEthApiServer<Provider = N::Provider, Pool = N::Pool>;
type EthApi: EthApiTypes
+ FullEthApiServer<Provider = N::Provider, Pool = N::Pool>
+ Unpin
+ 'static;
/// Builds the [`EthApiServer`](reth_rpc_api::eth::EthApiServer) from the given context.
fn build_eth_api(

View File

@@ -1,198 +1,13 @@
//! clap [Args](clap::Args) for engine purposes
use clap::{builder::Resettable, Args};
use clap::Args;
use reth_engine_primitives::{TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE};
use std::sync::OnceLock;
use crate::node_config::{
DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB, DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
DEFAULT_PERSISTENCE_THRESHOLD, DEFAULT_RESERVED_CPU_CORES,
};
/// Global static engine defaults
static ENGINE_DEFAULTS: OnceLock<DefaultEngineValues> = OnceLock::new();
/// Default values for engine that can be customized
///
/// Global defaults can be set via [`DefaultEngineValues::try_init`].
#[derive(Debug, Clone)]
pub struct DefaultEngineValues {
persistence_threshold: u64,
memory_block_buffer_target: u64,
legacy_state_root_task_enabled: bool,
state_cache_disabled: bool,
prewarming_disabled: bool,
parallel_sparse_trie_disabled: bool,
state_provider_metrics: bool,
cross_block_cache_size: u64,
state_root_task_compare_updates: bool,
accept_execution_requests_hash: bool,
multiproof_chunking_enabled: bool,
multiproof_chunk_size: usize,
reserved_cpu_cores: usize,
precompile_cache_disabled: bool,
state_root_fallback: bool,
always_process_payload_attributes_on_canonical_head: bool,
allow_unwind_canonical_header: bool,
storage_worker_count: Option<usize>,
account_worker_count: Option<usize>,
}
impl DefaultEngineValues {
/// Initialize the global engine defaults with this configuration
pub fn try_init(self) -> Result<(), Self> {
ENGINE_DEFAULTS.set(self)
}
/// Get a reference to the global engine defaults
pub fn get_global() -> &'static Self {
ENGINE_DEFAULTS.get_or_init(Self::default)
}
/// Set the default persistence threshold
pub const fn with_persistence_threshold(mut self, v: u64) -> Self {
self.persistence_threshold = v;
self
}
/// Set the default memory block buffer target
pub const fn with_memory_block_buffer_target(mut self, v: u64) -> Self {
self.memory_block_buffer_target = v;
self
}
/// Set whether to enable legacy state root task by default
pub const fn with_legacy_state_root_task_enabled(mut self, v: bool) -> Self {
self.legacy_state_root_task_enabled = v;
self
}
/// Set whether to disable state cache by default
pub const fn with_state_cache_disabled(mut self, v: bool) -> Self {
self.state_cache_disabled = v;
self
}
/// Set whether to disable prewarming by default
pub const fn with_prewarming_disabled(mut self, v: bool) -> Self {
self.prewarming_disabled = v;
self
}
/// Set whether to disable parallel sparse trie by default
pub const fn with_parallel_sparse_trie_disabled(mut self, v: bool) -> Self {
self.parallel_sparse_trie_disabled = v;
self
}
/// Set whether to enable state provider metrics by default
pub const fn with_state_provider_metrics(mut self, v: bool) -> Self {
self.state_provider_metrics = v;
self
}
/// Set the default cross-block cache size in MB
pub const fn with_cross_block_cache_size(mut self, v: u64) -> Self {
self.cross_block_cache_size = v;
self
}
/// Set whether to compare state root task updates by default
pub const fn with_state_root_task_compare_updates(mut self, v: bool) -> Self {
self.state_root_task_compare_updates = v;
self
}
/// Set whether to accept execution requests hash by default
pub const fn with_accept_execution_requests_hash(mut self, v: bool) -> Self {
self.accept_execution_requests_hash = v;
self
}
/// Set whether to enable multiproof chunking by default
pub const fn with_multiproof_chunking_enabled(mut self, v: bool) -> Self {
self.multiproof_chunking_enabled = v;
self
}
/// Set the default multiproof chunk size
pub const fn with_multiproof_chunk_size(mut self, v: usize) -> Self {
self.multiproof_chunk_size = v;
self
}
/// Set the default number of reserved CPU cores
pub const fn with_reserved_cpu_cores(mut self, v: usize) -> Self {
self.reserved_cpu_cores = v;
self
}
/// Set whether to disable precompile cache by default
pub const fn with_precompile_cache_disabled(mut self, v: bool) -> Self {
self.precompile_cache_disabled = v;
self
}
/// Set whether to enable state root fallback by default
pub const fn with_state_root_fallback(mut self, v: bool) -> Self {
self.state_root_fallback = v;
self
}
/// Set whether to always process payload attributes on canonical head by default
pub const fn with_always_process_payload_attributes_on_canonical_head(
mut self,
v: bool,
) -> Self {
self.always_process_payload_attributes_on_canonical_head = v;
self
}
/// Set whether to allow unwinding canonical header by default
pub const fn with_allow_unwind_canonical_header(mut self, v: bool) -> Self {
self.allow_unwind_canonical_header = v;
self
}
/// Set the default storage worker count
pub const fn with_storage_worker_count(mut self, v: Option<usize>) -> Self {
self.storage_worker_count = v;
self
}
/// Set the default account worker count
pub const fn with_account_worker_count(mut self, v: Option<usize>) -> Self {
self.account_worker_count = v;
self
}
}
impl Default for DefaultEngineValues {
fn default() -> Self {
Self {
persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD,
memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
legacy_state_root_task_enabled: false,
state_cache_disabled: false,
prewarming_disabled: false,
parallel_sparse_trie_disabled: false,
state_provider_metrics: false,
cross_block_cache_size: DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB,
state_root_task_compare_updates: false,
accept_execution_requests_hash: false,
multiproof_chunking_enabled: true,
multiproof_chunk_size: DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE,
reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
precompile_cache_disabled: false,
state_root_fallback: false,
always_process_payload_attributes_on_canonical_head: false,
allow_unwind_canonical_header: false,
storage_worker_count: None,
account_worker_count: None,
}
}
}
/// Parameters for configuring the engine driver.
#[derive(Debug, Clone, Args, PartialEq, Eq)]
#[command(next_help_heading = "Engine")]
@@ -203,15 +18,15 @@ pub struct EngineArgs {
///
/// To persist blocks as fast as the node receives them, set this value to zero. This will
/// cause more frequent DB writes.
#[arg(long = "engine.persistence-threshold", default_value_t = DefaultEngineValues::get_global().persistence_threshold)]
#[arg(long = "engine.persistence-threshold", default_value_t = DEFAULT_PERSISTENCE_THRESHOLD)]
pub persistence_threshold: u64,
/// Configure the target number of blocks to keep in memory.
#[arg(long = "engine.memory-block-buffer-target", default_value_t = DefaultEngineValues::get_global().memory_block_buffer_target)]
#[arg(long = "engine.memory-block-buffer-target", default_value_t = DEFAULT_MEMORY_BLOCK_BUFFER_TARGET)]
pub memory_block_buffer_target: u64,
/// Enable legacy state root
#[arg(long = "engine.legacy-state-root", default_value_t = DefaultEngineValues::get_global().legacy_state_root_task_enabled)]
#[arg(long = "engine.legacy-state-root", default_value = "false")]
pub legacy_state_root_task_enabled: bool,
/// CAUTION: This CLI flag has no effect anymore, use --engine.disable-caching-and-prewarming
@@ -220,12 +35,8 @@ pub struct EngineArgs {
#[deprecated]
pub caching_and_prewarming_enabled: bool,
/// Disable state cache
#[arg(long = "engine.disable-state-cache", default_value_t = DefaultEngineValues::get_global().state_cache_disabled)]
pub state_cache_disabled: bool,
/// Disable parallel prewarming
#[arg(long = "engine.disable-prewarming", alias = "engine.disable-caching-and-prewarming", default_value_t = DefaultEngineValues::get_global().prewarming_disabled)]
#[arg(long = "engine.disable-prewarming", alias = "engine.disable-caching-and-prewarming")]
pub prewarming_disabled: bool,
/// CAUTION: This CLI flag has no effect anymore, use --engine.disable-parallel-sparse-trie
@@ -235,38 +46,38 @@ pub struct EngineArgs {
pub parallel_sparse_trie_enabled: bool,
/// Disable the parallel sparse trie in the engine.
#[arg(long = "engine.disable-parallel-sparse-trie", default_value_t = DefaultEngineValues::get_global().parallel_sparse_trie_disabled)]
#[arg(long = "engine.disable-parallel-sparse-trie", default_value = "false")]
pub parallel_sparse_trie_disabled: bool,
/// Enable state provider latency metrics. This allows the engine to collect and report stats
/// about how long state provider calls took during execution, but this does introduce slight
/// overhead to state provider calls.
#[arg(long = "engine.state-provider-metrics", default_value_t = DefaultEngineValues::get_global().state_provider_metrics)]
#[arg(long = "engine.state-provider-metrics", default_value = "false")]
pub state_provider_metrics: bool,
/// Configure the size of cross-block cache in megabytes
#[arg(long = "engine.cross-block-cache-size", default_value_t = DefaultEngineValues::get_global().cross_block_cache_size)]
#[arg(long = "engine.cross-block-cache-size", default_value_t = DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB)]
pub cross_block_cache_size: u64,
/// Enable comparing trie updates from the state root task to the trie updates from the regular
/// state root calculation.
#[arg(long = "engine.state-root-task-compare-updates", default_value_t = DefaultEngineValues::get_global().state_root_task_compare_updates)]
#[arg(long = "engine.state-root-task-compare-updates")]
pub state_root_task_compare_updates: bool,
/// Enables accepting requests hash instead of an array of requests in `engine_newPayloadV4`.
#[arg(long = "engine.accept-execution-requests-hash", default_value_t = DefaultEngineValues::get_global().accept_execution_requests_hash)]
#[arg(long = "engine.accept-execution-requests-hash")]
pub accept_execution_requests_hash: bool,
/// Whether multiproof task should chunk proof targets.
#[arg(long = "engine.multiproof-chunking", default_value_t = DefaultEngineValues::get_global().multiproof_chunking_enabled)]
#[arg(long = "engine.multiproof-chunking", default_value = "true")]
pub multiproof_chunking_enabled: bool,
/// Multiproof task chunk size for proof targets.
#[arg(long = "engine.multiproof-chunk-size", default_value_t = DefaultEngineValues::get_global().multiproof_chunk_size)]
#[arg(long = "engine.multiproof-chunk-size", default_value_t = DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE)]
pub multiproof_chunk_size: usize,
/// Configure the number of reserved CPU cores for non-reth processes
#[arg(long = "engine.reserved-cpu-cores", default_value_t = DefaultEngineValues::get_global().reserved_cpu_cores)]
#[arg(long = "engine.reserved-cpu-cores", default_value_t = DEFAULT_RESERVED_CPU_CORES)]
pub reserved_cpu_cores: usize,
/// CAUTION: This CLI flag has no effect anymore, use --engine.disable-precompile-cache
@@ -276,11 +87,11 @@ pub struct EngineArgs {
pub precompile_cache_enabled: bool,
/// Disable precompile cache
#[arg(long = "engine.disable-precompile-cache", default_value_t = DefaultEngineValues::get_global().precompile_cache_disabled)]
#[arg(long = "engine.disable-precompile-cache", default_value = "false")]
pub precompile_cache_disabled: bool,
/// Enable state root fallback, useful for testing
#[arg(long = "engine.state-root-fallback", default_value_t = DefaultEngineValues::get_global().state_root_fallback)]
#[arg(long = "engine.state-root-fallback", default_value = "false")]
pub state_root_fallback: bool,
/// Always process payload attributes and begin a payload build process even if
@@ -290,73 +101,51 @@ pub struct EngineArgs {
/// Note: This is a no-op on OP Stack.
#[arg(
long = "engine.always-process-payload-attributes-on-canonical-head",
default_value_t = DefaultEngineValues::get_global().always_process_payload_attributes_on_canonical_head
default_value = "false"
)]
pub always_process_payload_attributes_on_canonical_head: bool,
/// Allow unwinding canonical header to ancestor during forkchoice updates.
/// See `TreeConfig::unwind_canonical_header` for more details.
#[arg(long = "engine.allow-unwind-canonical-header", default_value_t = DefaultEngineValues::get_global().allow_unwind_canonical_header)]
#[arg(long = "engine.allow-unwind-canonical-header", default_value = "false")]
pub allow_unwind_canonical_header: bool,
/// Configure the number of storage proof workers in the Tokio blocking pool.
/// If not specified, defaults to 2x available parallelism, clamped between 2 and 64.
#[arg(long = "engine.storage-worker-count", default_value = Resettable::from(DefaultEngineValues::get_global().storage_worker_count.map(|v| v.to_string().into())))]
#[arg(long = "engine.storage-worker-count")]
pub storage_worker_count: Option<usize>,
/// Configure the number of account proof workers in the Tokio blocking pool.
/// If not specified, defaults to the same count as storage workers.
#[arg(long = "engine.account-worker-count", default_value = Resettable::from(DefaultEngineValues::get_global().account_worker_count.map(|v| v.to_string().into())))]
#[arg(long = "engine.account-worker-count")]
pub account_worker_count: Option<usize>,
}
#[allow(deprecated)]
impl Default for EngineArgs {
fn default() -> Self {
let DefaultEngineValues {
persistence_threshold,
memory_block_buffer_target,
legacy_state_root_task_enabled,
state_cache_disabled,
prewarming_disabled,
parallel_sparse_trie_disabled,
state_provider_metrics,
cross_block_cache_size,
state_root_task_compare_updates,
accept_execution_requests_hash,
multiproof_chunking_enabled,
multiproof_chunk_size,
reserved_cpu_cores,
precompile_cache_disabled,
state_root_fallback,
always_process_payload_attributes_on_canonical_head,
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
} = DefaultEngineValues::get_global().clone();
Self {
persistence_threshold,
memory_block_buffer_target,
legacy_state_root_task_enabled,
state_root_task_compare_updates,
persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD,
memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
legacy_state_root_task_enabled: false,
state_root_task_compare_updates: false,
caching_and_prewarming_enabled: true,
state_cache_disabled,
prewarming_disabled,
prewarming_disabled: false,
parallel_sparse_trie_enabled: true,
parallel_sparse_trie_disabled,
state_provider_metrics,
cross_block_cache_size,
accept_execution_requests_hash,
multiproof_chunking_enabled,
multiproof_chunk_size,
reserved_cpu_cores,
parallel_sparse_trie_disabled: false,
state_provider_metrics: false,
cross_block_cache_size: DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB,
accept_execution_requests_hash: false,
multiproof_chunking_enabled: true,
multiproof_chunk_size: DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE,
reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
precompile_cache_enabled: true,
precompile_cache_disabled,
state_root_fallback,
always_process_payload_attributes_on_canonical_head,
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
precompile_cache_disabled: false,
state_root_fallback: false,
always_process_payload_attributes_on_canonical_head: false,
allow_unwind_canonical_header: false,
storage_worker_count: None,
account_worker_count: None,
}
}
}
@@ -368,7 +157,6 @@ impl EngineArgs {
.with_persistence_threshold(self.persistence_threshold)
.with_memory_block_buffer_target(self.memory_block_buffer_target)
.with_legacy_state_root(self.legacy_state_root_task_enabled)
.without_state_cache(self.state_cache_disabled)
.without_prewarming(self.prewarming_disabled)
.with_disable_parallel_sparse_trie(self.parallel_sparse_trie_disabled)
.with_state_provider_metrics(self.state_provider_metrics)
@@ -414,66 +202,4 @@ mod tests {
let args = CommandParser::<EngineArgs>::parse_from(["reth"]).args;
assert_eq!(args, default_args);
}
#[test]
#[allow(deprecated)]
fn engine_args() {
let args = EngineArgs {
persistence_threshold: 100,
memory_block_buffer_target: 50,
legacy_state_root_task_enabled: true,
caching_and_prewarming_enabled: true,
state_cache_disabled: true,
prewarming_disabled: true,
parallel_sparse_trie_enabled: true,
parallel_sparse_trie_disabled: true,
state_provider_metrics: true,
cross_block_cache_size: 256,
state_root_task_compare_updates: true,
accept_execution_requests_hash: true,
multiproof_chunking_enabled: true,
multiproof_chunk_size: 512,
reserved_cpu_cores: 4,
precompile_cache_enabled: true,
precompile_cache_disabled: true,
state_root_fallback: true,
always_process_payload_attributes_on_canonical_head: true,
allow_unwind_canonical_header: true,
storage_worker_count: Some(16),
account_worker_count: Some(8),
};
let parsed_args = CommandParser::<EngineArgs>::parse_from([
"reth",
"--engine.persistence-threshold",
"100",
"--engine.memory-block-buffer-target",
"50",
"--engine.legacy-state-root",
"--engine.disable-state-cache",
"--engine.disable-prewarming",
"--engine.disable-parallel-sparse-trie",
"--engine.state-provider-metrics",
"--engine.cross-block-cache-size",
"256",
"--engine.state-root-task-compare-updates",
"--engine.accept-execution-requests-hash",
"--engine.multiproof-chunking",
"--engine.multiproof-chunk-size",
"512",
"--engine.reserved-cpu-cores",
"4",
"--engine.disable-precompile-cache",
"--engine.state-root-fallback",
"--engine.always-process-payload-attributes-on-canonical-head",
"--engine.allow-unwind-canonical-header",
"--engine.storage-worker-count",
"16",
"--engine.account-worker-count",
"8",
])
.args;
assert_eq!(parsed_args, args);
}
}

View File

@@ -6,7 +6,7 @@ pub use network::{DiscoveryArgs, NetworkArgs};
/// RpcServerArg struct for configuring the RPC
mod rpc_server;
pub use rpc_server::{DefaultRpcServerArgs, RpcServerArgs};
pub use rpc_server::RpcServerArgs;
/// `RpcStateCacheArgs` struct for configuring RPC state cache
mod rpc_state_cache;
@@ -66,7 +66,7 @@ pub use benchmark_args::BenchmarkArgs;
/// EngineArgs for configuring the engine
mod engine;
pub use engine::{DefaultEngineValues, EngineArgs};
pub use engine::EngineArgs;
/// `RessArgs` for configuring ress subprotocol.
mod ress_args;

View File

@@ -119,18 +119,6 @@ pub struct NetworkArgs {
#[arg(long)]
pub max_inbound_peers: Option<usize>,
/// Maximum number of total peers (inbound + outbound).
///
/// Splits peers using approximately 2:1 inbound:outbound ratio. Cannot be used together with
/// `--max-outbound-peers` or `--max-inbound-peers`.
#[arg(
long,
value_name = "COUNT",
conflicts_with = "max_outbound_peers",
conflicts_with = "max_inbound_peers"
)]
pub max_peers: Option<usize>,
/// Max concurrent `GetPooledTransactions` requests.
#[arg(long = "max-tx-reqs", value_name = "COUNT", default_value_t = DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS, verbatim_doc_comment)]
pub max_concurrent_tx_requests: u32,
@@ -257,34 +245,6 @@ impl NetworkArgs {
bootnodes.into_iter().filter_map(|node| node.resolve_blocking().ok()).collect()
})
}
/// Returns the max inbound peers (2:1 ratio).
pub fn resolved_max_inbound_peers(&self) -> Option<usize> {
if let Some(max_peers) = self.max_peers {
if max_peers == 0 {
Some(0)
} else {
let outbound = (max_peers / 3).max(1);
Some(max_peers.saturating_sub(outbound))
}
} else {
self.max_inbound_peers
}
}
/// Returns the max outbound peers (1:2 ratio).
pub fn resolved_max_outbound_peers(&self) -> Option<usize> {
if let Some(max_peers) = self.max_peers {
if max_peers == 0 {
Some(0)
} else {
Some((max_peers / 3).max(1))
}
} else {
self.max_outbound_peers
}
}
/// Configures and returns a `TransactionsManagerConfig` based on the current settings.
pub const fn transactions_manager_config(&self) -> TransactionsManagerConfig {
TransactionsManagerConfig {
@@ -331,8 +291,8 @@ impl NetworkArgs {
.peers_config_with_basic_nodes_from_file(
self.persistent_peers_file(peers_file).as_deref(),
)
.with_max_inbound_opt(self.resolved_max_inbound_peers())
.with_max_outbound_opt(self.resolved_max_outbound_peers())
.with_max_inbound_opt(self.max_inbound_peers)
.with_max_outbound_opt(self.max_outbound_peers)
.with_ip_filter(ip_filter);
// Configure basic network stack
@@ -474,7 +434,6 @@ impl Default for NetworkArgs {
port: DEFAULT_DISCOVERY_PORT,
max_outbound_peers: None,
max_inbound_peers: None,
max_peers: None,
max_concurrent_tx_requests: DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
max_concurrent_tx_requests_per_peer: DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
soft_limit_byte_size_pooled_transactions_response:
@@ -799,96 +758,6 @@ mod tests {
assert!(args.disable_tx_gossip);
}
#[test]
fn parse_max_peers_flag() {
let args = CommandParser::<NetworkArgs>::parse_from(["reth", "--max-peers", "90"]).args;
assert_eq!(args.max_peers, Some(90));
assert_eq!(args.max_outbound_peers, None);
assert_eq!(args.max_inbound_peers, None);
assert_eq!(args.resolved_max_outbound_peers(), Some(30));
assert_eq!(args.resolved_max_inbound_peers(), Some(60));
}
#[test]
fn max_peers_conflicts_with_outbound() {
let result = CommandParser::<NetworkArgs>::try_parse_from([
"reth",
"--max-peers",
"90",
"--max-outbound-peers",
"50",
]);
assert!(
result.is_err(),
"Should fail when both --max-peers and --max-outbound-peers are used"
);
}
#[test]
fn max_peers_conflicts_with_inbound() {
let result = CommandParser::<NetworkArgs>::try_parse_from([
"reth",
"--max-peers",
"90",
"--max-inbound-peers",
"30",
]);
assert!(
result.is_err(),
"Should fail when both --max-peers and --max-inbound-peers are used"
);
}
#[test]
fn max_peers_split_calculation() {
let args = CommandParser::<NetworkArgs>::parse_from(["reth", "--max-peers", "90"]).args;
assert_eq!(args.max_peers, Some(90));
assert_eq!(args.resolved_max_outbound_peers(), Some(30));
assert_eq!(args.resolved_max_inbound_peers(), Some(60));
}
#[test]
fn max_peers_small_values() {
let args1 = CommandParser::<NetworkArgs>::parse_from(["reth", "--max-peers", "1"]).args;
assert_eq!(args1.resolved_max_outbound_peers(), Some(1));
assert_eq!(args1.resolved_max_inbound_peers(), Some(0));
let args2 = CommandParser::<NetworkArgs>::parse_from(["reth", "--max-peers", "2"]).args;
assert_eq!(args2.resolved_max_outbound_peers(), Some(1));
assert_eq!(args2.resolved_max_inbound_peers(), Some(1));
let args3 = CommandParser::<NetworkArgs>::parse_from(["reth", "--max-peers", "3"]).args;
assert_eq!(args3.resolved_max_outbound_peers(), Some(1));
assert_eq!(args3.resolved_max_inbound_peers(), Some(2));
}
#[test]
fn resolved_peers_without_max_peers() {
let args = CommandParser::<NetworkArgs>::parse_from([
"reth",
"--max-outbound-peers",
"75",
"--max-inbound-peers",
"15",
])
.args;
assert_eq!(args.max_peers, None);
assert_eq!(args.resolved_max_outbound_peers(), Some(75));
assert_eq!(args.resolved_max_inbound_peers(), Some(15));
}
#[test]
fn resolved_peers_with_defaults() {
let args = CommandParser::<NetworkArgs>::parse_from(["reth"]).args;
assert_eq!(args.max_peers, None);
assert_eq!(args.resolved_max_outbound_peers(), None);
assert_eq!(args.resolved_max_inbound_peers(), None);
}
#[test]
fn network_args_default_sanity_test() {
let default_args = NetworkArgs::default();

View File

@@ -7,7 +7,7 @@ use crate::args::{
use alloy_primitives::Address;
use alloy_rpc_types_engine::JwtSecret;
use clap::{
builder::{PossibleValue, RangedU64ValueParser, Resettable, TypedValueParser},
builder::{PossibleValue, RangedU64ValueParser, TypedValueParser},
Arg, Args, Command,
};
use rand::Rng;
@@ -19,16 +19,12 @@ use std::{
ffi::OsStr,
net::{IpAddr, Ipv4Addr},
path::PathBuf,
sync::OnceLock,
time::Duration,
};
use url::Url;
use super::types::MaxOr;
/// Global static RPC server defaults
static RPC_SERVER_DEFAULTS: OnceLock<DefaultRpcServerArgs> = OnceLock::new();
/// Default max number of subscriptions per connection.
pub(crate) const RPC_DEFAULT_MAX_SUBS_PER_CONN: u32 = 1024;
@@ -41,442 +37,76 @@ pub(crate) const RPC_DEFAULT_MAX_REQUEST_SIZE_MB: u32 = 15;
pub(crate) const RPC_DEFAULT_MAX_RESPONSE_SIZE_MB: u32 = 160;
/// Default number of incoming connections.
///
/// This restricts how many active connections (http, ws) the server accepts.
/// Once exceeded, the server can reject new connections.
pub(crate) const RPC_DEFAULT_MAX_CONNECTIONS: u32 = 500;
/// Default values for RPC server that can be customized
///
/// Global defaults can be set via [`DefaultRpcServerArgs::try_init`].
#[derive(Debug, Clone)]
pub struct DefaultRpcServerArgs {
http: bool,
http_addr: IpAddr,
http_port: u16,
http_disable_compression: bool,
http_api: Option<RpcModuleSelection>,
http_corsdomain: Option<String>,
ws: bool,
ws_addr: IpAddr,
ws_port: u16,
ws_allowed_origins: Option<String>,
ws_api: Option<RpcModuleSelection>,
ipcdisable: bool,
ipcpath: String,
ipc_socket_permissions: Option<String>,
auth_addr: IpAddr,
auth_port: u16,
auth_jwtsecret: Option<PathBuf>,
auth_ipc: bool,
auth_ipc_path: String,
disable_auth_server: bool,
rpc_jwtsecret: Option<JwtSecret>,
rpc_max_request_size: MaxU32,
rpc_max_response_size: MaxU32,
rpc_max_subscriptions_per_connection: MaxU32,
rpc_max_connections: MaxU32,
rpc_max_tracing_requests: usize,
rpc_max_blocking_io_requests: usize,
rpc_max_trace_filter_blocks: u64,
rpc_max_blocks_per_filter: ZeroAsNoneU64,
rpc_max_logs_per_response: ZeroAsNoneU64,
rpc_gas_cap: u64,
rpc_evm_memory_limit: u64,
rpc_tx_fee_cap: u128,
rpc_max_simulate_blocks: u64,
rpc_eth_proof_window: u64,
rpc_proof_permits: usize,
rpc_pending_block: PendingBlockKind,
rpc_forwarder: Option<Url>,
builder_disallow: Option<HashSet<Address>>,
rpc_state_cache: RpcStateCacheArgs,
gas_price_oracle: GasPriceOracleArgs,
rpc_send_raw_transaction_sync_timeout: Duration,
}
impl DefaultRpcServerArgs {
/// Initialize the global RPC server defaults with this configuration
pub fn try_init(self) -> Result<(), Self> {
RPC_SERVER_DEFAULTS.set(self)
}
/// Get a reference to the global RPC server defaults
pub fn get_global() -> &'static Self {
RPC_SERVER_DEFAULTS.get_or_init(Self::default)
}
/// Set the default HTTP enabled state
pub const fn with_http(mut self, v: bool) -> Self {
self.http = v;
self
}
/// Set the default HTTP address
pub const fn with_http_addr(mut self, v: IpAddr) -> Self {
self.http_addr = v;
self
}
/// Set the default HTTP port
pub const fn with_http_port(mut self, v: u16) -> Self {
self.http_port = v;
self
}
/// Set whether to disable HTTP compression by default
pub const fn with_http_disable_compression(mut self, v: bool) -> Self {
self.http_disable_compression = v;
self
}
/// Set the default HTTP API modules
pub fn with_http_api(mut self, v: Option<RpcModuleSelection>) -> Self {
self.http_api = v;
self
}
/// Set the default HTTP CORS domain
pub fn with_http_corsdomain(mut self, v: Option<String>) -> Self {
self.http_corsdomain = v;
self
}
/// Set the default WS enabled state
pub const fn with_ws(mut self, v: bool) -> Self {
self.ws = v;
self
}
/// Set the default WS address
pub const fn with_ws_addr(mut self, v: IpAddr) -> Self {
self.ws_addr = v;
self
}
/// Set the default WS port
pub const fn with_ws_port(mut self, v: u16) -> Self {
self.ws_port = v;
self
}
/// Set the default WS allowed origins
pub fn with_ws_allowed_origins(mut self, v: Option<String>) -> Self {
self.ws_allowed_origins = v;
self
}
/// Set the default WS API modules
pub fn with_ws_api(mut self, v: Option<RpcModuleSelection>) -> Self {
self.ws_api = v;
self
}
/// Set whether to disable IPC by default
pub const fn with_ipcdisable(mut self, v: bool) -> Self {
self.ipcdisable = v;
self
}
/// Set the default IPC path
pub fn with_ipcpath(mut self, v: String) -> Self {
self.ipcpath = v;
self
}
/// Set the default IPC socket permissions
pub fn with_ipc_socket_permissions(mut self, v: Option<String>) -> Self {
self.ipc_socket_permissions = v;
self
}
/// Set the default auth server address
pub const fn with_auth_addr(mut self, v: IpAddr) -> Self {
self.auth_addr = v;
self
}
/// Set the default auth server port
pub const fn with_auth_port(mut self, v: u16) -> Self {
self.auth_port = v;
self
}
/// Set the default auth JWT secret path
pub fn with_auth_jwtsecret(mut self, v: Option<PathBuf>) -> Self {
self.auth_jwtsecret = v;
self
}
/// Set the default auth IPC enabled state
pub const fn with_auth_ipc(mut self, v: bool) -> Self {
self.auth_ipc = v;
self
}
/// Set the default auth IPC path
pub fn with_auth_ipc_path(mut self, v: String) -> Self {
self.auth_ipc_path = v;
self
}
/// Set whether to disable the auth server by default
pub const fn with_disable_auth_server(mut self, v: bool) -> Self {
self.disable_auth_server = v;
self
}
/// Set the default RPC JWT secret
pub const fn with_rpc_jwtsecret(mut self, v: Option<JwtSecret>) -> Self {
self.rpc_jwtsecret = v;
self
}
/// Set the default max request size
pub const fn with_rpc_max_request_size(mut self, v: MaxU32) -> Self {
self.rpc_max_request_size = v;
self
}
/// Set the default max response size
pub const fn with_rpc_max_response_size(mut self, v: MaxU32) -> Self {
self.rpc_max_response_size = v;
self
}
/// Set the default max subscriptions per connection
pub const fn with_rpc_max_subscriptions_per_connection(mut self, v: MaxU32) -> Self {
self.rpc_max_subscriptions_per_connection = v;
self
}
/// Set the default max connections
pub const fn with_rpc_max_connections(mut self, v: MaxU32) -> Self {
self.rpc_max_connections = v;
self
}
/// Set the default max tracing requests
pub const fn with_rpc_max_tracing_requests(mut self, v: usize) -> Self {
self.rpc_max_tracing_requests = v;
self
}
/// Set the default max blocking IO requests
pub const fn with_rpc_max_blocking_io_requests(mut self, v: usize) -> Self {
self.rpc_max_blocking_io_requests = v;
self
}
/// Set the default max trace filter blocks
pub const fn with_rpc_max_trace_filter_blocks(mut self, v: u64) -> Self {
self.rpc_max_trace_filter_blocks = v;
self
}
/// Set the default max blocks per filter
pub const fn with_rpc_max_blocks_per_filter(mut self, v: ZeroAsNoneU64) -> Self {
self.rpc_max_blocks_per_filter = v;
self
}
/// Set the default max logs per response
pub const fn with_rpc_max_logs_per_response(mut self, v: ZeroAsNoneU64) -> Self {
self.rpc_max_logs_per_response = v;
self
}
/// Set the default gas cap
pub const fn with_rpc_gas_cap(mut self, v: u64) -> Self {
self.rpc_gas_cap = v;
self
}
/// Set the default EVM memory limit
pub const fn with_rpc_evm_memory_limit(mut self, v: u64) -> Self {
self.rpc_evm_memory_limit = v;
self
}
/// Set the default tx fee cap
pub const fn with_rpc_tx_fee_cap(mut self, v: u128) -> Self {
self.rpc_tx_fee_cap = v;
self
}
/// Set the default max simulate blocks
pub const fn with_rpc_max_simulate_blocks(mut self, v: u64) -> Self {
self.rpc_max_simulate_blocks = v;
self
}
/// Set the default eth proof window
pub const fn with_rpc_eth_proof_window(mut self, v: u64) -> Self {
self.rpc_eth_proof_window = v;
self
}
/// Set the default proof permits
pub const fn with_rpc_proof_permits(mut self, v: usize) -> Self {
self.rpc_proof_permits = v;
self
}
/// Set the default pending block kind
pub const fn with_rpc_pending_block(mut self, v: PendingBlockKind) -> Self {
self.rpc_pending_block = v;
self
}
/// Set the default RPC forwarder
pub fn with_rpc_forwarder(mut self, v: Option<Url>) -> Self {
self.rpc_forwarder = v;
self
}
/// Set the default builder disallow addresses
pub fn with_builder_disallow(mut self, v: Option<HashSet<Address>>) -> Self {
self.builder_disallow = v;
self
}
/// Set the default RPC state cache args
pub const fn with_rpc_state_cache(mut self, v: RpcStateCacheArgs) -> Self {
self.rpc_state_cache = v;
self
}
/// Set the default gas price oracle args
pub const fn with_gas_price_oracle(mut self, v: GasPriceOracleArgs) -> Self {
self.gas_price_oracle = v;
self
}
/// Set the default send raw transaction sync timeout
pub const fn with_rpc_send_raw_transaction_sync_timeout(mut self, v: Duration) -> Self {
self.rpc_send_raw_transaction_sync_timeout = v;
self
}
}
impl Default for DefaultRpcServerArgs {
fn default() -> Self {
Self {
http: false,
http_addr: Ipv4Addr::LOCALHOST.into(),
http_port: constants::DEFAULT_HTTP_RPC_PORT,
http_disable_compression: false,
http_api: None,
http_corsdomain: None,
ws: false,
ws_addr: Ipv4Addr::LOCALHOST.into(),
ws_port: constants::DEFAULT_WS_RPC_PORT,
ws_allowed_origins: None,
ws_api: None,
ipcdisable: false,
ipcpath: constants::DEFAULT_IPC_ENDPOINT.to_string(),
ipc_socket_permissions: None,
auth_addr: Ipv4Addr::LOCALHOST.into(),
auth_port: constants::DEFAULT_AUTH_PORT,
auth_jwtsecret: None,
auth_ipc: false,
auth_ipc_path: constants::DEFAULT_ENGINE_API_IPC_ENDPOINT.to_string(),
disable_auth_server: false,
rpc_jwtsecret: None,
rpc_max_request_size: RPC_DEFAULT_MAX_REQUEST_SIZE_MB.into(),
rpc_max_response_size: RPC_DEFAULT_MAX_RESPONSE_SIZE_MB.into(),
rpc_max_subscriptions_per_connection: RPC_DEFAULT_MAX_SUBS_PER_CONN.into(),
rpc_max_connections: RPC_DEFAULT_MAX_CONNECTIONS.into(),
rpc_max_tracing_requests: constants::default_max_tracing_requests(),
rpc_max_blocking_io_requests: constants::DEFAULT_MAX_BLOCKING_IO_REQUEST,
rpc_max_trace_filter_blocks: constants::DEFAULT_MAX_TRACE_FILTER_BLOCKS,
rpc_max_blocks_per_filter: constants::DEFAULT_MAX_BLOCKS_PER_FILTER.into(),
rpc_max_logs_per_response: (constants::DEFAULT_MAX_LOGS_PER_RESPONSE as u64).into(),
rpc_gas_cap: constants::gas_oracle::RPC_DEFAULT_GAS_CAP,
rpc_evm_memory_limit: (1 << 32) - 1,
rpc_tx_fee_cap: constants::DEFAULT_TX_FEE_CAP_WEI,
rpc_max_simulate_blocks: constants::DEFAULT_MAX_SIMULATE_BLOCKS,
rpc_eth_proof_window: constants::DEFAULT_ETH_PROOF_WINDOW,
rpc_proof_permits: constants::DEFAULT_PROOF_PERMITS,
rpc_pending_block: PendingBlockKind::Full,
rpc_forwarder: None,
builder_disallow: None,
rpc_state_cache: RpcStateCacheArgs::default(),
gas_price_oracle: GasPriceOracleArgs::default(),
rpc_send_raw_transaction_sync_timeout:
constants::RPC_DEFAULT_SEND_RAW_TX_SYNC_TIMEOUT_SECS,
}
}
}
/// Parameters for configuring the rpc more granularity via CLI
#[derive(Debug, Clone, Args, PartialEq, Eq)]
#[command(next_help_heading = "RPC")]
pub struct RpcServerArgs {
/// Enable the HTTP-RPC server
#[arg(long, default_value_if("dev", "true", "true"), default_value_t = DefaultRpcServerArgs::get_global().http)]
#[arg(long, default_value_if("dev", "true", "true"))]
pub http: bool,
/// Http server address to listen on
#[arg(long = "http.addr", default_value_t = DefaultRpcServerArgs::get_global().http_addr)]
#[arg(long = "http.addr", default_value_t = IpAddr::V4(Ipv4Addr::LOCALHOST))]
pub http_addr: IpAddr,
/// Http server port to listen on
#[arg(long = "http.port", default_value_t = DefaultRpcServerArgs::get_global().http_port)]
#[arg(long = "http.port", default_value_t = constants::DEFAULT_HTTP_RPC_PORT)]
pub http_port: u16,
/// Disable compression for HTTP responses
#[arg(long = "http.disable-compression", default_value_t = DefaultRpcServerArgs::get_global().http_disable_compression)]
#[arg(long = "http.disable-compression", default_value_t = false)]
pub http_disable_compression: bool,
/// Rpc Modules to be configured for the HTTP server
#[arg(long = "http.api", value_parser = RpcModuleSelectionValueParser::default(), default_value = Resettable::from(DefaultRpcServerArgs::get_global().http_api.as_ref().map(|v| v.to_string().into())))]
#[arg(long = "http.api", value_parser = RpcModuleSelectionValueParser::default())]
pub http_api: Option<RpcModuleSelection>,
/// Http Corsdomain to allow request from
#[arg(long = "http.corsdomain", default_value = Resettable::from(DefaultRpcServerArgs::get_global().http_corsdomain.as_ref().map(|v| v.to_string().into())))]
#[arg(long = "http.corsdomain")]
pub http_corsdomain: Option<String>,
/// Enable the WS-RPC server
#[arg(long, default_value_t = DefaultRpcServerArgs::get_global().ws)]
#[arg(long)]
pub ws: bool,
/// Ws server address to listen on
#[arg(long = "ws.addr", default_value_t = DefaultRpcServerArgs::get_global().ws_addr)]
#[arg(long = "ws.addr", default_value_t = IpAddr::V4(Ipv4Addr::LOCALHOST))]
pub ws_addr: IpAddr,
/// Ws server port to listen on
#[arg(long = "ws.port", default_value_t = DefaultRpcServerArgs::get_global().ws_port)]
#[arg(long = "ws.port", default_value_t = constants::DEFAULT_WS_RPC_PORT)]
pub ws_port: u16,
/// Origins from which to accept `WebSocket` requests
#[arg(id = "ws.origins", long = "ws.origins", alias = "ws.corsdomain", default_value = Resettable::from(DefaultRpcServerArgs::get_global().ws_allowed_origins.as_ref().map(|v| v.to_string().into())))]
#[arg(id = "ws.origins", long = "ws.origins", alias = "ws.corsdomain")]
pub ws_allowed_origins: Option<String>,
/// Rpc Modules to be configured for the WS server
#[arg(long = "ws.api", value_parser = RpcModuleSelectionValueParser::default(), default_value = Resettable::from(DefaultRpcServerArgs::get_global().ws_api.as_ref().map(|v| v.to_string().into())))]
#[arg(long = "ws.api", value_parser = RpcModuleSelectionValueParser::default())]
pub ws_api: Option<RpcModuleSelection>,
/// Disable the IPC-RPC server
#[arg(long, default_value_t = DefaultRpcServerArgs::get_global().ipcdisable)]
#[arg(long)]
pub ipcdisable: bool,
/// Filename for IPC socket/pipe within the datadir
#[arg(long, default_value_t = DefaultRpcServerArgs::get_global().ipcpath.clone())]
#[arg(long, default_value_t = constants::DEFAULT_IPC_ENDPOINT.to_string())]
pub ipcpath: String,
/// Set the permissions for the IPC socket file, in octal format.
///
/// If not specified, the permissions will be set by the system's umask.
#[arg(long = "ipc.permissions", default_value = Resettable::from(DefaultRpcServerArgs::get_global().ipc_socket_permissions.as_ref().map(|v| v.to_string().into())))]
#[arg(long = "ipc.permissions")]
pub ipc_socket_permissions: Option<String>,
/// Auth server address to listen on
#[arg(long = "authrpc.addr", default_value_t = DefaultRpcServerArgs::get_global().auth_addr)]
#[arg(long = "authrpc.addr", default_value_t = IpAddr::V4(Ipv4Addr::LOCALHOST))]
pub auth_addr: IpAddr,
/// Auth server port to listen on
#[arg(long = "authrpc.port", default_value_t = DefaultRpcServerArgs::get_global().auth_port)]
#[arg(long = "authrpc.port", default_value_t = constants::DEFAULT_AUTH_PORT)]
pub auth_port: u16,
/// Path to a JWT secret to use for the authenticated engine-API RPC server.
@@ -485,22 +115,22 @@ pub struct RpcServerArgs {
///
/// If no path is provided, a secret will be generated and stored in the datadir under
/// `<DIR>/<CHAIN_ID>/jwt.hex`. For mainnet this would be `~/.reth/mainnet/jwt.hex` by default.
#[arg(long = "authrpc.jwtsecret", value_name = "PATH", global = true, required = false, default_value = Resettable::from(DefaultRpcServerArgs::get_global().auth_jwtsecret.as_ref().map(|v| v.to_string_lossy().into())))]
#[arg(long = "authrpc.jwtsecret", value_name = "PATH", global = true, required = false)]
pub auth_jwtsecret: Option<PathBuf>,
/// Enable auth engine API over IPC
#[arg(long, default_value_t = DefaultRpcServerArgs::get_global().auth_ipc)]
#[arg(long)]
pub auth_ipc: bool,
/// Filename for auth IPC socket/pipe within the datadir
#[arg(long = "auth-ipc.path", default_value_t = DefaultRpcServerArgs::get_global().auth_ipc_path.clone())]
#[arg(long = "auth-ipc.path", default_value_t = constants::DEFAULT_ENGINE_API_IPC_ENDPOINT.to_string())]
pub auth_ipc_path: String,
/// Disable the auth/engine API server.
///
/// This will prevent the authenticated engine-API server from starting. Use this if you're
/// running a node that doesn't need to serve engine API requests.
#[arg(long = "disable-auth-server", alias = "disable-engine-api", default_value_t = DefaultRpcServerArgs::get_global().disable_auth_server)]
#[arg(long = "disable-auth-server", alias = "disable-engine-api")]
pub disable_auth_server: bool,
/// Hex encoded JWT secret to authenticate the regular RPC server(s), see `--http.api` and
@@ -508,23 +138,23 @@ pub struct RpcServerArgs {
///
/// This is __not__ used for the authenticated engine-API RPC server, see
/// `--authrpc.jwtsecret`.
#[arg(long = "rpc.jwtsecret", value_name = "HEX", global = true, required = false, default_value = Resettable::from(DefaultRpcServerArgs::get_global().rpc_jwtsecret.as_ref().map(|v| format!("{:?}", v).into())))]
#[arg(long = "rpc.jwtsecret", value_name = "HEX", global = true, required = false)]
pub rpc_jwtsecret: Option<JwtSecret>,
/// Set the maximum RPC request payload size for both HTTP and WS in megabytes.
#[arg(long = "rpc.max-request-size", alias = "rpc-max-request-size", default_value_t = DefaultRpcServerArgs::get_global().rpc_max_request_size)]
#[arg(long = "rpc.max-request-size", alias = "rpc-max-request-size", default_value_t = RPC_DEFAULT_MAX_REQUEST_SIZE_MB.into())]
pub rpc_max_request_size: MaxU32,
/// Set the maximum RPC response payload size for both HTTP and WS in megabytes.
#[arg(long = "rpc.max-response-size", alias = "rpc-max-response-size", visible_alias = "rpc.returndata.limit", default_value_t = DefaultRpcServerArgs::get_global().rpc_max_response_size)]
#[arg(long = "rpc.max-response-size", alias = "rpc-max-response-size", visible_alias = "rpc.returndata.limit", default_value_t = RPC_DEFAULT_MAX_RESPONSE_SIZE_MB.into())]
pub rpc_max_response_size: MaxU32,
/// Set the maximum concurrent subscriptions per connection.
#[arg(long = "rpc.max-subscriptions-per-connection", alias = "rpc-max-subscriptions-per-connection", default_value_t = DefaultRpcServerArgs::get_global().rpc_max_subscriptions_per_connection)]
#[arg(long = "rpc.max-subscriptions-per-connection", alias = "rpc-max-subscriptions-per-connection", default_value_t = RPC_DEFAULT_MAX_SUBS_PER_CONN.into())]
pub rpc_max_subscriptions_per_connection: MaxU32,
/// Maximum number of RPC server connections.
#[arg(long = "rpc.max-connections", alias = "rpc-max-connections", value_name = "COUNT", default_value_t = DefaultRpcServerArgs::get_global().rpc_max_connections)]
#[arg(long = "rpc.max-connections", alias = "rpc-max-connections", value_name = "COUNT", default_value_t = RPC_DEFAULT_MAX_CONNECTIONS.into())]
pub rpc_max_connections: MaxU32,
/// Maximum number of concurrent tracing requests.
@@ -533,27 +163,19 @@ pub struct RpcServerArgs {
/// Tracing requests are generally CPU bound.
/// Choosing a value that is higher than the available CPU cores can have a negative impact on
/// the performance of the node and affect the node's ability to maintain sync.
#[arg(long = "rpc.max-tracing-requests", alias = "rpc-max-tracing-requests", value_name = "COUNT", default_value_t = DefaultRpcServerArgs::get_global().rpc_max_tracing_requests)]
#[arg(long = "rpc.max-tracing-requests", alias = "rpc-max-tracing-requests", value_name = "COUNT", default_value_t = constants::default_max_tracing_requests())]
pub rpc_max_tracing_requests: usize,
/// Maximum number of concurrent blocking IO requests.
///
/// Blocking IO requests include `eth_call`, `eth_estimateGas`, and similar methods that
/// require EVM execution. These are spawned as blocking tasks to avoid blocking the async
/// runtime.
#[arg(long = "rpc.max-blocking-io-requests", alias = "rpc-max-blocking-io-requests", value_name = "COUNT", default_value_t = DefaultRpcServerArgs::get_global().rpc_max_blocking_io_requests)]
pub rpc_max_blocking_io_requests: usize,
/// Maximum number of blocks for `trace_filter` requests.
#[arg(long = "rpc.max-trace-filter-blocks", alias = "rpc-max-trace-filter-blocks", value_name = "COUNT", default_value_t = DefaultRpcServerArgs::get_global().rpc_max_trace_filter_blocks)]
#[arg(long = "rpc.max-trace-filter-blocks", alias = "rpc-max-trace-filter-blocks", value_name = "COUNT", default_value_t = constants::DEFAULT_MAX_TRACE_FILTER_BLOCKS)]
pub rpc_max_trace_filter_blocks: u64,
/// Maximum number of blocks that could be scanned per filter request. (0 = entire chain)
#[arg(long = "rpc.max-blocks-per-filter", alias = "rpc-max-blocks-per-filter", value_name = "COUNT", default_value_t = DefaultRpcServerArgs::get_global().rpc_max_blocks_per_filter)]
#[arg(long = "rpc.max-blocks-per-filter", alias = "rpc-max-blocks-per-filter", value_name = "COUNT", default_value_t = ZeroAsNoneU64::new(constants::DEFAULT_MAX_BLOCKS_PER_FILTER))]
pub rpc_max_blocks_per_filter: ZeroAsNoneU64,
/// Maximum number of logs that can be returned in a single response. (0 = no limit)
#[arg(long = "rpc.max-logs-per-response", alias = "rpc-max-logs-per-response", value_name = "COUNT", default_value_t = DefaultRpcServerArgs::get_global().rpc_max_logs_per_response)]
#[arg(long = "rpc.max-logs-per-response", alias = "rpc-max-logs-per-response", value_name = "COUNT", default_value_t = ZeroAsNoneU64::new(constants::DEFAULT_MAX_LOGS_PER_RESPONSE as u64))]
pub rpc_max_logs_per_response: ZeroAsNoneU64,
/// Maximum gas limit for `eth_call` and call tracing RPC methods.
@@ -562,7 +184,7 @@ pub struct RpcServerArgs {
alias = "rpc-gascap",
value_name = "GAS_CAP",
value_parser = MaxOr::new(RangedU64ValueParser::<u64>::new().range(1..)),
default_value_t = DefaultRpcServerArgs::get_global().rpc_gas_cap
default_value_t = constants::gas_oracle::RPC_DEFAULT_GAS_CAP
)]
pub rpc_gas_cap: u64,
@@ -572,7 +194,7 @@ pub struct RpcServerArgs {
alias = "rpc-evm-memory-limit",
value_name = "MEMORY_LIMIT",
value_parser = MaxOr::new(RangedU64ValueParser::<u64>::new().range(1..)),
default_value_t = DefaultRpcServerArgs::get_global().rpc_evm_memory_limit
default_value_t = (1 << 32) - 1
)]
pub rpc_evm_memory_limit: u64,
@@ -590,7 +212,7 @@ pub struct RpcServerArgs {
#[arg(
long = "rpc.max-simulate-blocks",
value_name = "BLOCKS_COUNT",
default_value_t = DefaultRpcServerArgs::get_global().rpc_max_simulate_blocks
default_value_t = constants::DEFAULT_MAX_SIMULATE_BLOCKS
)]
pub rpc_max_simulate_blocks: u64,
@@ -599,7 +221,7 @@ pub struct RpcServerArgs {
/// configured number of blocks from current tip (up to `tip - window`).
#[arg(
long = "rpc.eth-proof-window",
default_value_t = DefaultRpcServerArgs::get_global().rpc_eth_proof_window,
default_value_t = constants::DEFAULT_ETH_PROOF_WINDOW,
value_parser = RangedU64ValueParser::<u64>::new().range(..=constants::MAX_ETH_PROOF_WINDOW)
)]
pub rpc_eth_proof_window: u64,
@@ -621,7 +243,7 @@ pub struct RpcServerArgs {
/// Path to file containing disallowed addresses, json-encoded list of strings. Block
/// validation API will reject blocks containing transactions from these addresses.
#[arg(long = "builder.disallow", value_name = "PATH", value_parser = reth_cli_util::parsers::read_json_from_file::<HashSet<Address>>, default_value = Resettable::from(DefaultRpcServerArgs::get_global().builder_disallow.as_ref().map(|v| format!("{:?}", v).into())))]
#[arg(long = "builder.disallow", value_name = "PATH", value_parser = reth_cli_util::parsers::read_json_from_file::<HashSet<Address>>)]
pub builder_disallow: Option<HashSet<Address>>,
/// State cache configuration.
@@ -765,93 +387,49 @@ impl RpcServerArgs {
impl Default for RpcServerArgs {
fn default() -> Self {
let DefaultRpcServerArgs {
http,
http_addr,
http_port,
http_disable_compression,
http_api,
http_corsdomain,
ws,
ws_addr,
ws_port,
ws_allowed_origins,
ws_api,
ipcdisable,
ipcpath,
ipc_socket_permissions,
auth_addr,
auth_port,
auth_jwtsecret,
auth_ipc,
auth_ipc_path,
disable_auth_server,
rpc_jwtsecret,
rpc_max_request_size,
rpc_max_response_size,
rpc_max_subscriptions_per_connection,
rpc_max_connections,
rpc_max_tracing_requests,
rpc_max_blocking_io_requests,
rpc_max_trace_filter_blocks,
rpc_max_blocks_per_filter,
rpc_max_logs_per_response,
rpc_gas_cap,
rpc_evm_memory_limit,
rpc_tx_fee_cap,
rpc_max_simulate_blocks,
rpc_eth_proof_window,
rpc_proof_permits,
rpc_pending_block,
rpc_forwarder,
builder_disallow,
rpc_state_cache,
gas_price_oracle,
rpc_send_raw_transaction_sync_timeout,
} = DefaultRpcServerArgs::get_global().clone();
Self {
http,
http_addr,
http_port,
http_disable_compression,
http_api,
http_corsdomain,
ws,
ws_addr,
ws_port,
ws_allowed_origins,
ws_api,
ipcdisable,
ipcpath,
ipc_socket_permissions,
auth_addr,
auth_port,
auth_jwtsecret,
auth_ipc,
auth_ipc_path,
disable_auth_server,
rpc_jwtsecret,
rpc_max_request_size,
rpc_max_response_size,
rpc_max_subscriptions_per_connection,
rpc_max_connections,
rpc_max_tracing_requests,
rpc_max_blocking_io_requests,
rpc_max_trace_filter_blocks,
rpc_max_blocks_per_filter,
rpc_max_logs_per_response,
rpc_gas_cap,
rpc_evm_memory_limit,
rpc_tx_fee_cap,
rpc_max_simulate_blocks,
rpc_eth_proof_window,
rpc_proof_permits,
rpc_pending_block,
rpc_forwarder,
builder_disallow,
rpc_state_cache,
gas_price_oracle,
rpc_send_raw_transaction_sync_timeout,
http: false,
http_addr: Ipv4Addr::LOCALHOST.into(),
http_port: constants::DEFAULT_HTTP_RPC_PORT,
http_disable_compression: false,
http_api: None,
http_corsdomain: None,
ws: false,
ws_addr: Ipv4Addr::LOCALHOST.into(),
ws_port: constants::DEFAULT_WS_RPC_PORT,
ws_allowed_origins: None,
ws_api: None,
ipcdisable: false,
ipcpath: constants::DEFAULT_IPC_ENDPOINT.to_string(),
ipc_socket_permissions: None,
auth_addr: Ipv4Addr::LOCALHOST.into(),
auth_port: constants::DEFAULT_AUTH_PORT,
auth_jwtsecret: None,
auth_ipc: false,
auth_ipc_path: constants::DEFAULT_ENGINE_API_IPC_ENDPOINT.to_string(),
disable_auth_server: false,
rpc_jwtsecret: None,
rpc_max_request_size: RPC_DEFAULT_MAX_REQUEST_SIZE_MB.into(),
rpc_max_response_size: RPC_DEFAULT_MAX_RESPONSE_SIZE_MB.into(),
rpc_max_subscriptions_per_connection: RPC_DEFAULT_MAX_SUBS_PER_CONN.into(),
rpc_max_connections: RPC_DEFAULT_MAX_CONNECTIONS.into(),
rpc_max_tracing_requests: constants::default_max_tracing_requests(),
rpc_max_trace_filter_blocks: constants::DEFAULT_MAX_TRACE_FILTER_BLOCKS,
rpc_max_blocks_per_filter: constants::DEFAULT_MAX_BLOCKS_PER_FILTER.into(),
rpc_max_logs_per_response: (constants::DEFAULT_MAX_LOGS_PER_RESPONSE as u64).into(),
rpc_gas_cap: constants::gas_oracle::RPC_DEFAULT_GAS_CAP,
rpc_evm_memory_limit: (1 << 32) - 1,
rpc_tx_fee_cap: constants::DEFAULT_TX_FEE_CAP_WEI,
rpc_max_simulate_blocks: constants::DEFAULT_MAX_SIMULATE_BLOCKS,
rpc_eth_proof_window: constants::DEFAULT_ETH_PROOF_WINDOW,
rpc_pending_block: PendingBlockKind::Full,
gas_price_oracle: GasPriceOracleArgs::default(),
rpc_state_cache: RpcStateCacheArgs::default(),
rpc_proof_permits: constants::DEFAULT_PROOF_PERMITS,
rpc_forwarder: None,
builder_disallow: Default::default(),
rpc_send_raw_transaction_sync_timeout:
constants::RPC_DEFAULT_SEND_RAW_TX_SYNC_TIMEOUT_SECS,
}
}
}
@@ -964,159 +542,4 @@ mod tests {
let expected = 1_000_000_000_000_000_000u128;
assert_eq!(args.rpc_tx_fee_cap, expected); // 1 ETH default cap
}
#[test]
fn test_rpc_server_args() {
let args = RpcServerArgs {
http: true,
http_addr: "127.0.0.1".parse().unwrap(),
http_port: 8545,
http_disable_compression: false,
http_api: Some(RpcModuleSelection::try_from_selection(["eth", "admin"]).unwrap()),
http_corsdomain: Some("*".to_string()),
ws: true,
ws_addr: "127.0.0.1".parse().unwrap(),
ws_port: 8546,
ws_allowed_origins: Some("*".to_string()),
ws_api: Some(RpcModuleSelection::try_from_selection(["eth", "admin"]).unwrap()),
ipcdisable: false,
ipcpath: "reth.ipc".to_string(),
ipc_socket_permissions: Some("0o666".to_string()),
auth_addr: "127.0.0.1".parse().unwrap(),
auth_port: 8551,
auth_jwtsecret: Some(std::path::PathBuf::from("/tmp/jwt.hex")),
auth_ipc: false,
auth_ipc_path: "engine.ipc".to_string(),
disable_auth_server: false,
rpc_jwtsecret: Some(
JwtSecret::from_hex(
"0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
)
.unwrap(),
),
rpc_max_request_size: 15u32.into(),
rpc_max_response_size: 160u32.into(),
rpc_max_subscriptions_per_connection: 1024u32.into(),
rpc_max_connections: 500u32.into(),
rpc_max_tracing_requests: 16,
rpc_max_blocking_io_requests: 256,
rpc_max_trace_filter_blocks: 4000,
rpc_max_blocks_per_filter: 1000u64.into(),
rpc_max_logs_per_response: 10000u64.into(),
rpc_gas_cap: 50_000_000,
rpc_evm_memory_limit: 256,
rpc_tx_fee_cap: 2_000_000_000_000_000_000u128,
rpc_max_simulate_blocks: 256,
rpc_eth_proof_window: 100_000,
rpc_proof_permits: 16,
rpc_pending_block: PendingBlockKind::Full,
rpc_forwarder: Some("http://localhost:8545".parse().unwrap()),
builder_disallow: None,
rpc_state_cache: RpcStateCacheArgs {
max_blocks: 5000,
max_receipts: 2000,
max_headers: 1000,
max_concurrent_db_requests: 512,
},
gas_price_oracle: GasPriceOracleArgs {
blocks: 20,
ignore_price: 2,
max_price: 500_000_000_000,
percentile: 60,
default_suggested_fee: None,
},
rpc_send_raw_transaction_sync_timeout: std::time::Duration::from_secs(30),
};
let parsed_args = CommandParser::<RpcServerArgs>::parse_from([
"reth",
"--http",
"--http.addr",
"127.0.0.1",
"--http.port",
"8545",
"--http.api",
"eth,admin",
"--http.corsdomain",
"*",
"--ws",
"--ws.addr",
"127.0.0.1",
"--ws.port",
"8546",
"--ws.origins",
"*",
"--ws.api",
"eth,admin",
"--ipcpath",
"reth.ipc",
"--ipc.permissions",
"0o666",
"--authrpc.addr",
"127.0.0.1",
"--authrpc.port",
"8551",
"--authrpc.jwtsecret",
"/tmp/jwt.hex",
"--auth-ipc.path",
"engine.ipc",
"--rpc.jwtsecret",
"0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
"--rpc.max-request-size",
"15",
"--rpc.max-response-size",
"160",
"--rpc.max-subscriptions-per-connection",
"1024",
"--rpc.max-connections",
"500",
"--rpc.max-tracing-requests",
"16",
"--rpc.max-blocking-io-requests",
"256",
"--rpc.max-trace-filter-blocks",
"4000",
"--rpc.max-blocks-per-filter",
"1000",
"--rpc.max-logs-per-response",
"10000",
"--rpc.gascap",
"50000000",
"--rpc.evm-memory-limit",
"256",
"--rpc.txfeecap",
"2.0",
"--rpc.max-simulate-blocks",
"256",
"--rpc.eth-proof-window",
"100000",
"--rpc.proof-permits",
"16",
"--rpc.pending-block",
"full",
"--rpc.forwarder",
"http://localhost:8545",
"--rpc-cache.max-blocks",
"5000",
"--rpc-cache.max-receipts",
"2000",
"--rpc-cache.max-headers",
"1000",
"--rpc-cache.max-concurrent-db-requests",
"512",
"--gpo.blocks",
"20",
"--gpo.ignoreprice",
"2",
"--gpo.maxprice",
"500000000000",
"--gpo.percentile",
"60",
"--rpc.send-raw-transaction-sync-timeout",
"30s",
])
.args;
assert_eq!(parsed_args, args);
}
}

View File

@@ -70,7 +70,7 @@ use reth_chainspec::{
};
use reth_ethereum_forks::{ChainHardforks, EthereumHardfork, ForkCondition};
use reth_network_peers::NodeRecord;
use reth_optimism_primitives::L2_TO_L1_MESSAGE_PASSER_ADDRESS;
use reth_optimism_primitives::ADDRESS_L2_TO_L1_MESSAGE_PASSER;
use reth_primitives_traits::{sync::LazyLock, SealedHeader};
/// Chain spec builder for a OP stack chain.
@@ -499,7 +499,7 @@ pub fn make_op_genesis_header(genesis: &Genesis, hardforks: &ChainHardforks) ->
// If Isthmus is active, overwrite the withdrawals root with the storage root of predeploy
// `L2ToL1MessagePasser.sol`
if hardforks.fork(OpHardfork::Isthmus).active_at_timestamp(header.timestamp) &&
let Some(predeploy) = genesis.alloc.get(&L2_TO_L1_MESSAGE_PASSER_ADDRESS) &&
let Some(predeploy) = genesis.alloc.get(&ADDRESS_L2_TO_L1_MESSAGE_PASSER) &&
let Some(storage) = &predeploy.storage
{
header.withdrawals_root =

View File

@@ -98,7 +98,7 @@ mod tests {
OP_SEPOLIA_CANYON_TIMESTAMP, OP_SEPOLIA_ECOTONE_TIMESTAMP, OP_SEPOLIA_ISTHMUS_TIMESTAMP,
OP_SEPOLIA_JOVIAN_TIMESTAMP,
};
use reth_optimism_primitives::L2_TO_L1_MESSAGE_PASSER_ADDRESS;
use reth_optimism_primitives::ADDRESS_L2_TO_L1_MESSAGE_PASSER;
use tar_no_std::TarArchiveRef;
#[test]
@@ -106,7 +106,7 @@ mod tests {
let genesis = read_superchain_genesis("unichain", "mainnet").unwrap();
assert_eq!(genesis.config.chain_id, 130);
assert_eq!(genesis.timestamp, 1730748359);
assert!(genesis.alloc.contains_key(&L2_TO_L1_MESSAGE_PASSER_ADDRESS));
assert!(genesis.alloc.contains_key(&ADDRESS_L2_TO_L1_MESSAGE_PASSER));
}
#[test]
@@ -114,7 +114,7 @@ mod tests {
let genesis = read_superchain_genesis("funki", "mainnet").unwrap();
assert_eq!(genesis.config.chain_id, 33979);
assert_eq!(genesis.timestamp, 1721211095);
assert!(genesis.alloc.contains_key(&L2_TO_L1_MESSAGE_PASSER_ADDRESS));
assert!(genesis.alloc.contains_key(&ADDRESS_L2_TO_L1_MESSAGE_PASSER));
}
#[test]

View File

@@ -241,7 +241,7 @@ mod tests {
use alloy_consensus::{BlockBody, Eip658Value, Header, Receipt, TxEip7702, TxReceipt};
use alloy_eips::{eip4895::Withdrawals, eip7685::Requests};
use alloy_primitives::{Address, Bytes, Log, Signature, U256};
use alloy_primitives::{Address, Bytes, Signature, U256};
use op_alloy_consensus::{
encode_holocene_extra_data, encode_jovian_extra_data, OpTypedTransaction,
};
@@ -367,7 +367,7 @@ mod tests {
let beacon_consensus = OpBeaconConsensus::new(Arc::new(chain_spec));
let receipt = OpReceipt::Eip7702(Receipt::<Log> {
let receipt = OpReceipt::Eip7702(Receipt {
status: Eip658Value::success(),
cumulative_gas_used: GAS_USED,
logs: vec![],
@@ -436,7 +436,7 @@ mod tests {
let beacon_consensus = OpBeaconConsensus::new(Arc::new(chain_spec));
let receipt = OpReceipt::Eip7702(Receipt::<Log> {
let receipt = OpReceipt::Eip7702(Receipt {
status: Eip658Value::success(),
cumulative_gas_used: GAS_USED,
logs: vec![],
@@ -451,9 +451,7 @@ mod tests {
)),
gas_used: GAS_USED,
timestamp: u64::MAX,
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(
&receipt.with_bloom_ref(),
)),
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(&receipt)),
logs_bloom: receipt.bloom(),
..Default::default()
};
@@ -511,7 +509,7 @@ mod tests {
let beacon_consensus = OpBeaconConsensus::new(Arc::new(chain_spec));
let receipt = OpReceipt::Eip7702(Receipt::<Log> {
let receipt = OpReceipt::Eip7702(Receipt {
status: Eip658Value::success(),
cumulative_gas_used: 0,
logs: vec![],
@@ -528,9 +526,7 @@ mod tests {
)),
gas_used: 0,
timestamp: u64::MAX - 1,
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(
&receipt.with_bloom_ref(),
)),
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(&receipt)),
logs_bloom: receipt.bloom(),
extra_data: encode_jovian_extra_data(
Default::default(),
@@ -553,9 +549,7 @@ mod tests {
)),
gas_used: 0,
timestamp: u64::MAX,
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(
&receipt.with_bloom_ref(),
)),
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(&receipt)),
logs_bloom: receipt.bloom(),
parent_hash: parent.hash(),
..Default::default()
@@ -582,7 +576,7 @@ mod tests {
let beacon_consensus = OpBeaconConsensus::new(Arc::new(chain_spec));
let receipt = OpReceipt::Eip7702(Receipt::<Log> {
let receipt = OpReceipt::Eip7702(Receipt {
status: Eip658Value::success(),
cumulative_gas_used: 0,
logs: vec![],
@@ -599,9 +593,7 @@ mod tests {
)),
gas_used: 0,
timestamp: u64::MAX - 1,
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(
&receipt.with_bloom_ref(),
)),
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(&receipt)),
logs_bloom: receipt.bloom(),
extra_data: encode_jovian_extra_data(
Default::default(),
@@ -624,9 +616,7 @@ mod tests {
)),
gas_used: 0,
timestamp: u64::MAX,
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(
&receipt.with_bloom_ref(),
)),
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(&receipt)),
logs_bloom: receipt.bloom(),
parent_hash: parent.hash(),
..Default::default()
@@ -662,7 +652,7 @@ mod tests {
let beacon_consensus = OpBeaconConsensus::new(Arc::new(chain_spec));
let receipt = OpReceipt::Eip7702(Receipt::<Log> {
let receipt = OpReceipt::Eip7702(Receipt {
status: Eip658Value::success(),
cumulative_gas_used: 0,
logs: vec![],
@@ -679,9 +669,7 @@ mod tests {
)),
gas_used: 0,
timestamp: u64::MAX - 1,
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(
&receipt.with_bloom_ref(),
)),
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(&receipt)),
logs_bloom: receipt.bloom(),
extra_data: encode_jovian_extra_data(
Default::default(),
@@ -705,9 +693,7 @@ mod tests {
)),
gas_used: 0,
timestamp: u64::MAX,
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(
&receipt.with_bloom_ref(),
)),
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(&receipt)),
logs_bloom: receipt.bloom(),
parent_hash: parent.hash(),
..Default::default()
@@ -736,7 +722,7 @@ mod tests {
let beacon_consensus = OpBeaconConsensus::new(Arc::new(chain_spec));
let receipt = OpReceipt::Eip7702(Receipt::<Log> {
let receipt = OpReceipt::Eip7702(Receipt {
status: Eip658Value::success(),
cumulative_gas_used: 0,
logs: vec![],
@@ -753,9 +739,7 @@ mod tests {
)),
gas_used: 0,
timestamp: u64::MAX - 1,
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(
&receipt.with_bloom_ref(),
)),
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(&receipt)),
logs_bloom: receipt.bloom(),
extra_data: encode_holocene_extra_data(Default::default(), BaseFeeParams::optimism())
.unwrap(),
@@ -775,9 +759,7 @@ mod tests {
)),
gas_used: 0,
timestamp: u64::MAX,
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(
&receipt.with_bloom_ref(),
)),
receipts_root: proofs::calculate_receipt_root(std::slice::from_ref(&receipt)),
logs_bloom: receipt.bloom(),
parent_hash: parent.hash(),
..Default::default()

View File

@@ -4,7 +4,7 @@ use crate::OpConsensusError;
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use alloy_trie::EMPTY_ROOT_HASH;
use reth_optimism_primitives::L2_TO_L1_MESSAGE_PASSER_ADDRESS;
use reth_optimism_primitives::ADDRESS_L2_TO_L1_MESSAGE_PASSER;
use reth_storage_api::{errors::ProviderResult, StorageRootProvider};
use reth_trie_common::HashedStorage;
use revm::database::BundleState;
@@ -32,7 +32,7 @@ pub fn withdrawals_root<DB: StorageRootProvider>(
withdrawals_root_prehashed(
state_updates
.state()
.get(&L2_TO_L1_MESSAGE_PASSER_ADDRESS)
.get(&ADDRESS_L2_TO_L1_MESSAGE_PASSER)
.map(|acc| {
HashedStorage::from_plain_storage(
acc.status,
@@ -52,7 +52,7 @@ pub fn withdrawals_root_prehashed<DB: StorageRootProvider>(
hashed_storage_updates: HashedStorage,
state: DB,
) -> ProviderResult<B256> {
state.storage_root(L2_TO_L1_MESSAGE_PASSER_ADDRESS, hashed_storage_updates)
state.storage_root(ADDRESS_L2_TO_L1_MESSAGE_PASSER, hashed_storage_updates)
}
/// Verifies block header field `withdrawals_root` against storage root of
@@ -146,7 +146,7 @@ mod test {
#[test]
fn l2tol1_message_passer_no_withdrawals() {
let hashed_address = keccak256(L2_TO_L1_MESSAGE_PASSER_ADDRESS);
let hashed_address = keccak256(ADDRESS_L2_TO_L1_MESSAGE_PASSER);
// create account storage
let init_storage = HashedStorage::from_iter(

View File

@@ -486,14 +486,14 @@ mod tests {
block2.set_hash(block2_hash);
// Create a random receipt object, receipt1
let receipt1 = OpReceipt::Legacy(Receipt::<Log> {
let receipt1 = OpReceipt::Legacy(Receipt {
cumulative_gas_used: 46913,
logs: vec![],
status: true.into(),
});
// Create another random receipt object, receipt2
let receipt2 = OpReceipt::Legacy(Receipt::<Log> {
let receipt2 = OpReceipt::Legacy(Receipt {
cumulative_gas_used: 1325345,
logs: vec![],
status: true.into(),
@@ -544,7 +544,7 @@ mod tests {
);
// Create a Receipts object with a vector of receipt vectors
let receipts = vec![vec![Some(OpReceipt::Legacy(Receipt::<Log> {
let receipts = vec![vec![Some(OpReceipt::Legacy(Receipt {
cumulative_gas_used: 46913,
logs: vec![],
status: true.into(),
@@ -602,7 +602,7 @@ mod tests {
#[test]
fn test_block_number_to_index() {
// Create a Receipts object with a vector of receipt vectors
let receipts = vec![vec![Some(OpReceipt::Legacy(Receipt::<Log> {
let receipts = vec![vec![Some(OpReceipt::Legacy(Receipt {
cumulative_gas_used: 46913,
logs: vec![],
status: true.into(),
@@ -633,7 +633,7 @@ mod tests {
#[test]
fn test_get_logs() {
// Create a Receipts object with a vector of receipt vectors
let receipts = vec![vec![OpReceipt::Legacy(Receipt::<Log> {
let receipts = vec![vec![OpReceipt::Legacy(Receipt {
cumulative_gas_used: 46913,
logs: vec![Log::<LogData>::default()],
status: true.into(),
@@ -661,7 +661,7 @@ mod tests {
#[test]
fn test_receipts_by_block() {
// Create a Receipts object with a vector of receipt vectors
let receipts = vec![vec![Some(OpReceipt::Legacy(Receipt::<Log> {
let receipts = vec![vec![Some(OpReceipt::Legacy(Receipt {
cumulative_gas_used: 46913,
logs: vec![Log::<LogData>::default()],
status: true.into(),
@@ -685,7 +685,7 @@ mod tests {
// Assert that the receipts for block number 123 match the expected receipts
assert_eq!(
receipts_by_block,
vec![&Some(OpReceipt::Legacy(Receipt::<Log> {
vec![&Some(OpReceipt::Legacy(Receipt {
cumulative_gas_used: 46913,
logs: vec![Log::<LogData>::default()],
status: true.into(),
@@ -696,7 +696,7 @@ mod tests {
#[test]
fn test_receipts_len() {
// Create a Receipts object with a vector of receipt vectors
let receipts = vec![vec![Some(OpReceipt::Legacy(Receipt::<Log> {
let receipts = vec![vec![Some(OpReceipt::Legacy(Receipt {
cumulative_gas_used: 46913,
logs: vec![Log::<LogData>::default()],
status: true.into(),
@@ -741,7 +741,7 @@ mod tests {
#[test]
fn test_revert_to() {
// Create a random receipt object
let receipt = OpReceipt::Legacy(Receipt::<Log> {
let receipt = OpReceipt::Legacy(Receipt {
cumulative_gas_used: 46913,
logs: vec![],
status: true.into(),
@@ -786,7 +786,7 @@ mod tests {
#[test]
fn test_extend_execution_outcome() {
// Create a Receipt object with specific attributes.
let receipt = OpReceipt::Legacy(Receipt::<Log> {
let receipt = OpReceipt::Legacy(Receipt {
cumulative_gas_used: 46913,
logs: vec![],
status: true.into(),
@@ -826,7 +826,7 @@ mod tests {
#[test]
fn test_split_at_execution_outcome() {
// Create a random receipt object
let receipt = OpReceipt::Legacy(Receipt::<Log> {
let receipt = OpReceipt::Legacy(Receipt {
cumulative_gas_used: 46913,
logs: vec![],
status: true.into(),

View File

@@ -18,7 +18,7 @@ use reth_node_api::{
use reth_optimism_consensus::isthmus;
use reth_optimism_forks::OpHardforks;
use reth_optimism_payload_builder::{OpExecutionPayloadValidator, OpPayloadTypes};
use reth_optimism_primitives::{OpBlock, L2_TO_L1_MESSAGE_PASSER_ADDRESS};
use reth_optimism_primitives::{OpBlock, ADDRESS_L2_TO_L1_MESSAGE_PASSER};
use reth_primitives_traits::{Block, RecoveredBlock, SealedBlock, SignedTransaction};
use reth_provider::StateProviderFactory;
use reth_trie_common::{HashedPostState, KeyHasher};
@@ -76,7 +76,7 @@ pub struct OpEngineValidator<P, Tx, ChainSpec> {
impl<P, Tx, ChainSpec> OpEngineValidator<P, Tx, ChainSpec> {
/// Instantiates a new validator.
pub fn new<KH: KeyHasher>(chain_spec: Arc<ChainSpec>, provider: P) -> Self {
let hashed_addr_l2tol1_msg_passer = KH::hash_key(L2_TO_L1_MESSAGE_PASSER_ADDRESS);
let hashed_addr_l2tol1_msg_passer = KH::hash_key(ADDRESS_L2_TO_L1_MESSAGE_PASSER);
Self {
inner: OpExecutionPayloadValidator::new(chain_spec),
provider,

View File

@@ -20,7 +20,7 @@ use reth_evm::{
};
use reth_execution_types::ExecutionOutcome;
use reth_optimism_forks::OpHardforks;
use reth_optimism_primitives::{transaction::OpTransaction, L2_TO_L1_MESSAGE_PASSER_ADDRESS};
use reth_optimism_primitives::{transaction::OpTransaction, ADDRESS_L2_TO_L1_MESSAGE_PASSER};
use reth_optimism_txpool::{
estimated_da_size::DataAvailabilitySized,
interop::{is_valid_interop, MaybeInteropTransaction},
@@ -435,7 +435,7 @@ impl<Txs> OpBuilder<'_, Txs> {
if ctx.chain_spec.is_isthmus_active_at_timestamp(ctx.attributes().timestamp()) {
// force load `L2ToL1MessagePasser.sol` so l2 withdrawals root can be computed even if
// no l2 withdrawals in block
_ = db.load_cache_account(L2_TO_L1_MESSAGE_PASSER_ADDRESS)?;
_ = db.load_cache_account(ADDRESS_L2_TO_L1_MESSAGE_PASSER)?;
}
let ExecutionWitnessRecord { hashed_state, codes, keys, lowest_block_number: _ } =

View File

@@ -13,8 +13,8 @@ extern crate alloc;
pub mod bedrock;
// Re-export predeploys from op-alloy-consensus
pub use op_alloy_consensus::L2_TO_L1_MESSAGE_PASSER_ADDRESS;
pub mod predeploys;
pub use predeploys::ADDRESS_L2_TO_L1_MESSAGE_PASSER;
pub mod transaction;
pub use transaction::*;

View File

@@ -0,0 +1,8 @@
//! Addresses of OP pre-deploys.
// todo: move to op-alloy
use alloy_primitives::{address, Address};
/// The L2 contract `L2ToL1MessagePasser`, stores commitments to withdrawal transactions.
pub const ADDRESS_L2_TO_L1_MESSAGE_PASSER: Address =
address!("0x4200000000000000000000000000000000000016");

View File

@@ -176,7 +176,7 @@ mod tests {
let mut data = Vec::with_capacity(expected.length());
let receipt = ReceiptWithBloom {
receipt: OpReceipt::Legacy(Receipt::<Log> {
receipt: OpReceipt::Legacy(Receipt {
status: Eip658Value::Eip658(false),
cumulative_gas_used: 0x1,
logs: vec![Log::new_unchecked(
@@ -207,7 +207,7 @@ mod tests {
// EIP658Receipt
let expected = ReceiptWithBloom {
receipt: OpReceipt::Legacy(Receipt::<Log> {
receipt: OpReceipt::Legacy(Receipt {
status: Eip658Value::Eip658(false),
cumulative_gas_used: 0x1,
logs: vec![Log::new_unchecked(
@@ -235,7 +235,7 @@ mod tests {
// Deposit Receipt (post-regolith)
let expected = ReceiptWithBloom {
receipt: OpReceipt::Deposit(OpDepositReceipt {
inner: Receipt::<Log> {
inner: Receipt {
status: Eip658Value::Eip658(true),
cumulative_gas_used: 46913,
logs: vec![],
@@ -260,10 +260,10 @@ mod tests {
"b901117ef9010d0182b741b9010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c0833d3bbf01"
);
// Deposit Receipt (post-canyon)
// Deposit Receipt (post-regolith)
let expected = ReceiptWithBloom {
receipt: OpReceipt::Deposit(OpDepositReceipt {
inner: Receipt::<Log> {
inner: Receipt {
status: Eip658Value::Eip658(true),
cumulative_gas_used: 46913,
logs: vec![],
@@ -284,7 +284,7 @@ mod tests {
#[test]
fn gigantic_receipt() {
let receipt = OpReceipt::Legacy(Receipt::<Log> {
let receipt = OpReceipt::Legacy(Receipt {
status: Eip658Value::Eip658(true),
cumulative_gas_used: 16747627,
logs: vec![
@@ -314,7 +314,7 @@ mod tests {
#[test]
fn test_encode_2718_length() {
let receipt = ReceiptWithBloom {
receipt: OpReceipt::Eip1559(Receipt::<Log> {
receipt: OpReceipt::Eip1559(Receipt {
status: Eip658Value::Eip658(true),
cumulative_gas_used: 21000,
logs: vec![],
@@ -331,7 +331,7 @@ mod tests {
// Test for legacy receipt as well
let legacy_receipt = ReceiptWithBloom {
receipt: OpReceipt::Legacy(Receipt::<Log> {
receipt: OpReceipt::Legacy(Receipt {
status: Eip658Value::Eip658(true),
cumulative_gas_used: 21000,
logs: vec![],

View File

@@ -272,11 +272,6 @@ where
fn tracing_task_guard(&self) -> &BlockingTaskGuard {
self.inner.eth_api.blocking_task_guard()
}
#[inline]
fn blocking_io_task_guard(&self) -> &Arc<tokio::sync::Semaphore> {
self.inner.eth_api.blocking_io_request_semaphore()
}
}
impl<N, Rpc> LoadFee for OpEthApi<N, Rpc>

View File

@@ -1,16 +1,17 @@
//! Loads and formats OP receipt RPC response.
use crate::{eth::RpcNodeCore, OpEthApi, OpEthApiError};
use alloy_consensus::{BlockHeader, Receipt, ReceiptWithBloom, TxReceipt};
use alloy_consensus::{BlockHeader, Receipt, TxReceipt};
use alloy_eips::eip2718::Encodable2718;
use alloy_rpc_types_eth::{Log, TransactionReceipt};
use op_alloy_consensus::{OpReceipt, OpTransaction};
use op_alloy_consensus::{OpReceiptEnvelope, OpTransaction};
use op_alloy_rpc_types::{L1BlockInfo, OpTransactionReceipt, OpTransactionReceiptFields};
use op_revm::estimate_tx_compressed_size;
use reth_chainspec::ChainSpecProvider;
use reth_node_api::NodePrimitives;
use reth_optimism_evm::RethL1BlockInfo;
use reth_optimism_forks::OpHardforks;
use reth_optimism_primitives::OpReceipt;
use reth_primitives_traits::SealedBlock;
use reth_rpc_eth_api::{
helpers::LoadReceipt,
@@ -269,7 +270,7 @@ impl OpReceiptFieldsBuilder {
#[derive(Debug)]
pub struct OpReceiptBuilder {
/// Core receipt, has all the fields of an L1 receipt and is the basis for the OP receipt.
pub core_receipt: TransactionReceipt<ReceiptWithBloom<OpReceipt<Log>>>,
pub core_receipt: TransactionReceipt<OpReceiptEnvelope<Log>>,
/// Additional OP receipt fields.
pub op_receipt_fields: OpTransactionReceiptFields,
}
@@ -293,14 +294,24 @@ impl OpReceiptBuilder {
let logs = Log::collect_for_receipt(next_log_index, meta, logs);
Receipt { status, cumulative_gas_used, logs }
};
let mapped_receipt: OpReceipt<Log> = match receipt {
OpReceipt::Legacy(receipt) => OpReceipt::Legacy(map_logs(receipt)),
OpReceipt::Eip2930(receipt) => OpReceipt::Eip2930(map_logs(receipt)),
OpReceipt::Eip1559(receipt) => OpReceipt::Eip1559(map_logs(receipt)),
OpReceipt::Eip7702(receipt) => OpReceipt::Eip7702(map_logs(receipt)),
OpReceipt::Deposit(receipt) => OpReceipt::Deposit(receipt.map_inner(map_logs)),
};
mapped_receipt.into_with_bloom()
match receipt {
OpReceipt::Legacy(receipt) => {
OpReceiptEnvelope::Legacy(map_logs(receipt).into_with_bloom())
}
OpReceipt::Eip2930(receipt) => {
OpReceiptEnvelope::Eip2930(map_logs(receipt).into_with_bloom())
}
OpReceipt::Eip1559(receipt) => {
OpReceiptEnvelope::Eip1559(map_logs(receipt).into_with_bloom())
}
OpReceipt::Eip7702(receipt) => {
OpReceiptEnvelope::Eip7702(map_logs(receipt).into_with_bloom())
}
OpReceipt::Deposit(receipt) => {
OpReceiptEnvelope::Deposit(receipt.map_inner(map_logs).into_with_bloom())
}
}
});
// In jovian, we're using the blob gas used field to store the current da

View File

@@ -3,7 +3,7 @@
use crate::{MessageValidationKind, PayloadAttributes};
use alloc::vec::Vec;
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, eip7685::Requests, BlockNumHash};
use alloy_primitives::{Bytes, B256};
use alloy_primitives::B256;
use alloy_rpc_types_engine::ExecutionData;
use core::fmt::Debug;
use serde::{de::DeserializeOwned, Serialize};
@@ -40,11 +40,6 @@ pub trait ExecutionPayload:
/// Returns `None` for pre-Shanghai blocks.
fn withdrawals(&self) -> Option<&Vec<Withdrawal>>;
/// Returns the access list included in this payload.
///
/// Returns `None` for pre-Amsterdam blocks.
fn block_access_list(&self) -> Option<&Bytes>;
/// Returns the beacon block root associated with this payload.
///
/// Returns `None` for pre-merge payloads.
@@ -74,10 +69,6 @@ impl ExecutionPayload for ExecutionData {
self.payload.withdrawals()
}
fn block_access_list(&self) -> Option<&Bytes> {
None
}
fn parent_beacon_block_root(&self) -> Option<B256> {
self.sidecar.parent_beacon_block_root()
}
@@ -181,10 +172,6 @@ impl ExecutionPayload for op_alloy_rpc_types_engine::OpExecutionData {
self.payload.as_v2().map(|p| &p.withdrawals)
}
fn block_access_list(&self) -> Option<&Bytes> {
None
}
fn parent_beacon_block_root(&self) -> Option<B256> {
self.sidecar.parent_beacon_block_root()
}

View File

@@ -218,27 +218,6 @@ pub trait BlockBody:
.zip(signers)
.map(|(tx, signer)| Recovered::new_unchecked(tx, signer)))
}
/// Returns an iterator over `Recovered<&Transaction>` for all transactions in the block body
/// _without ensuring that the signature has a low `s` value_.
///
/// This method recovers signers and returns an iterator without cloning transactions,
/// making it more efficient than recovering with owned transactions when owned values are not
/// required.
///
/// # Errors
///
/// Returns an error if any transaction's signature is invalid.
fn recover_transactions_unchecked_ref(
&self,
) -> Result<impl Iterator<Item = Recovered<&Self::Transaction>> + '_, RecoveryError> {
let signers = self.recover_signers_unchecked()?;
Ok(self
.transactions()
.iter()
.zip(signers)
.map(|(tx, signer)| Recovered::new_unchecked(tx, signer)))
}
}
impl<T, H> BlockBody for alloy_consensus::BlockBody<T, H>

View File

@@ -1,14 +1,14 @@
use crate::PruneLimiter;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW, RangeWalker},
table::{DupSort, Table, TableRow},
transaction::{DbTx, DbTxMut},
table::{Table, TableRow},
transaction::DbTxMut,
DatabaseError,
};
use std::{fmt::Debug, ops::RangeBounds};
use tracing::debug;
pub(crate) trait DbTxPruneExt: DbTxMut + DbTx {
pub(crate) trait DbTxPruneExt: DbTxMut {
/// Prune the table for the specified pre-sorted key iterator.
///
/// Returns number of rows pruned.
@@ -123,55 +123,9 @@ pub(crate) trait DbTxPruneExt: DbTxMut + DbTx {
Ok(false)
}
/// Prune a DUPSORT table for the specified key range.
///
/// Returns number of rows pruned.
fn prune_dupsort_table_with_range<T: DupSort>(
&self,
keys: impl RangeBounds<T::Key> + Clone + Debug,
limiter: &mut PruneLimiter,
mut delete_callback: impl FnMut(TableRow<T>),
) -> Result<(usize, bool), DatabaseError> {
let starting_entries = self.entries::<T>()?;
let mut cursor = self.cursor_dup_write::<T>()?;
let mut walker = cursor.walk_range(keys)?;
let done = loop {
if limiter.is_limit_reached() {
debug!(
target: "providers::db",
?limiter,
deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
time_limit = %limiter.is_time_limit_reached(),
table = %T::NAME,
"Pruning limit reached"
);
break false
}
let Some(res) = walker.next() else { break true };
let row = res?;
walker.delete_current_duplicates()?;
limiter.increment_deleted_entries_count();
delete_callback(row);
};
debug!(
target: "providers::db",
table=?T::NAME,
cursor_current=?cursor.current(),
"done walking",
);
let ending_entries = self.entries::<T>()?;
Ok((starting_entries - ending_entries, done))
}
}
impl<Tx> DbTxPruneExt for Tx where Tx: DbTxMut + DbTx {}
impl<Tx> DbTxPruneExt for Tx where Tx: DbTxMut {}
#[cfg(test)]
mod tests {

View File

@@ -71,9 +71,10 @@ where
let mut last_storages_pruned_block = None;
let (storages_pruned, done) =
provider.tx_ref().prune_dupsort_table_with_range::<tables::StoragesTrieChangeSets>(
provider.tx_ref().prune_table_with_range::<tables::StoragesTrieChangeSets>(
storage_range,
&mut limiter,
|_| false,
|(BlockNumberHashedAddress((block_number, _)), _)| {
last_storages_pruned_block = Some(block_number);
},
@@ -89,9 +90,10 @@ where
.unwrap_or(block_range_end);
let (accounts_pruned, done) =
provider.tx_ref().prune_dupsort_table_with_range::<tables::AccountsTrieChangeSets>(
provider.tx_ref().prune_table_with_range::<tables::AccountsTrieChangeSets>(
block_range,
&mut limiter,
|_| false,
|row| last_accounts_pruned_block = row.0,
)?;

View File

@@ -35,11 +35,9 @@ alloy-serde.workspace = true
alloy-rpc-types-beacon.workspace = true
alloy-rpc-types-engine.workspace = true
alloy-genesis.workspace = true
serde = { workspace = true, features = ["derive"] }
# misc
jsonrpsee = { workspace = true, features = ["server", "macros"] }
serde_json.workspace = true
[features]
client = [
@@ -47,8 +45,3 @@ client = [
"jsonrpsee/async-client",
"reth-rpc-eth-api/client",
]
[dev-dependencies]
serde_json = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
jsonrpsee = { workspace = true, features = ["client", "async-client", "http-client"] }

View File

@@ -3,7 +3,7 @@ use alloy_genesis::ChainConfig;
use alloy_json_rpc::RpcObject;
use alloy_primitives::{Address, Bytes, B256};
use alloy_rpc_types_debug::ExecutionWitness;
use alloy_rpc_types_eth::{Bundle, StateContext};
use alloy_rpc_types_eth::{Block, Bundle, StateContext};
use alloy_rpc_types_trace::geth::{
BlockTraceResult, GethDebugTracingCallOptions, GethDebugTracingOptions, GethTrace, TraceResult,
};
@@ -38,7 +38,7 @@ pub trait DebugApi<TxReq: RpcObject> {
/// Returns an array of recent bad blocks that the client has seen on the network.
#[method(name = "getBadBlocks")]
async fn bad_blocks(&self) -> RpcResult<Vec<serde_json::Value>>;
async fn bad_blocks(&self) -> RpcResult<Vec<Block>>;
/// Returns the structured logs created during the execution of EVM between two blocks
/// (excluding start) as a JSON object.

View File

@@ -25,14 +25,11 @@ mod net;
mod otterscan;
mod reth;
mod rpc;
mod testing;
mod trace;
mod txpool;
mod validation;
mod web3;
pub use testing::{TestingBuildBlockRequestV1, TESTING_BUILD_BLOCK_V1};
/// re-export of all server traits
pub use servers::*;
@@ -48,7 +45,6 @@ pub mod servers {
otterscan::OtterscanServer,
reth::RethApiServer,
rpc::RpcApiServer,
testing::TestingApiServer,
trace::TraceApiServer,
txpool::TxPoolApiServer,
validation::BlockSubmissionValidationApiServer,
@@ -79,7 +75,6 @@ pub mod clients {
otterscan::OtterscanClient,
reth::RethApiClient,
rpc::RpcApiServer,
testing::TestingApiClient,
trace::TraceApiClient,
txpool::TxPoolApiClient,
validation::BlockSubmissionValidationApiClient,

View File

@@ -1,45 +0,0 @@
//! Testing namespace for building a block in a single call.
//!
//! This follows the `testing_buildBlockV1` specification. **Highly sensitive:**
//! testing-only, powerful enough to include arbitrary transactions; must stay
//! disabled by default and never be exposed on public-facing RPC without an
//! explicit operator flag.
use alloy_primitives::{Bytes, B256};
use alloy_rpc_types_engine::{
ExecutionPayloadEnvelopeV5, PayloadAttributes as EthPayloadAttributes,
};
use jsonrpsee::proc_macros::rpc;
use serde::{Deserialize, Serialize};
/// Capability string for `testing_buildBlockV1`.
pub const TESTING_BUILD_BLOCK_V1: &str = "testing_buildBlockV1";
/// Request payload for `testing_buildBlockV1`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TestingBuildBlockRequestV1 {
/// Parent block hash of the block to build.
pub parent_block_hash: B256,
/// Payload attributes (Cancun version).
pub payload_attributes: EthPayloadAttributes,
/// Raw signed transactions to force-include in order.
pub transactions: Vec<Bytes>,
/// Optional extra data for the block header.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub extra_data: Option<Bytes>,
}
/// Testing RPC interface for building a block in a single call.
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "testing"))]
#[cfg_attr(feature = "client", rpc(server, client, namespace = "testing"))]
pub trait TestingApi {
/// Builds a block using the provided parent, payload attributes, and transactions.
///
/// See <https://github.com/marcindsobczak/execution-apis/blob/main/src/testing/testing_buildBlockV1.md>
#[method(name = "buildBlockV1")]
async fn build_block_v1(
&self,
request: TestingBuildBlockRequestV1,
) -> jsonrpsee::core::RpcResult<ExecutionPayloadEnvelopeV5>;
}

View File

@@ -17,7 +17,6 @@ reth-primitives-traits.workspace = true
reth-ipc.workspace = true
reth-chainspec.workspace = true
reth-consensus.workspace = true
reth-engine-primitives.workspace = true
reth-network-api.workspace = true
reth-node-core.workspace = true
reth-rpc.workspace = true
@@ -27,7 +26,6 @@ reth-rpc-layer.workspace = true
reth-rpc-eth-types.workspace = true
reth-rpc-server-types.workspace = true
reth-tasks = { workspace = true, features = ["rayon"] }
reth-tokio-util.workspace = true
reth-transaction-pool.workspace = true
reth-storage-api.workspace = true
reth-chain-state.workspace = true
@@ -65,6 +63,7 @@ reth-rpc-api = { workspace = true, features = ["client"] }
reth-rpc-engine-api.workspace = true
reth-tracing.workspace = true
reth-transaction-pool = { workspace = true, features = ["test-utils"] }
reth-engine-primitives.workspace = true
reth-node-ethereum.workspace = true
alloy-primitives.workspace = true

View File

@@ -94,7 +94,6 @@ impl RethRpcServerConfig for RpcServerArgs {
fn eth_config(&self) -> EthConfig {
EthConfig::default()
.max_tracing_requests(self.rpc_max_tracing_requests)
.max_blocking_io_requests(self.rpc_max_blocking_io_requests)
.max_trace_filter_blocks(self.rpc_max_trace_filter_blocks)
.max_blocks_per_filter(self.rpc_max_blocks_per_filter.unwrap_or_max())
.max_logs_per_response(self.rpc_max_logs_per_response.unwrap_or_max() as usize)
@@ -139,7 +138,7 @@ impl RethRpcServerConfig for RpcServerArgs {
fn transport_rpc_module_config(&self) -> TransportRpcModuleConfig {
let mut config = TransportRpcModuleConfig::default()
.with_config(RpcModuleConfig::new(self.eth_config()));
.with_config(RpcModuleConfig::new(self.eth_config(), self.flashbots_config()));
if self.http {
config = config.with_http(

View File

@@ -32,13 +32,12 @@ use jsonrpsee::{
};
use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
use reth_consensus::{ConsensusError, FullConsensus};
use reth_engine_primitives::ConsensusEngineEvent;
use reth_evm::ConfigureEvm;
use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers};
use reth_primitives_traits::{NodePrimitives, TxTy};
use reth_rpc::{
AdminApi, DebugApi, EngineEthApi, EthApi, EthApiBuilder, EthBundle, MinerApi, NetApi,
OtterscanApi, RPCApi, RethApi, TraceApi, TxPoolApi, Web3Api,
OtterscanApi, RPCApi, RethApi, TraceApi, TxPoolApi, ValidationApiConfig, Web3Api,
};
use reth_rpc_api::servers::*;
use reth_rpc_eth_api::{
@@ -47,18 +46,16 @@ use reth_rpc_eth_api::{
TraceExt,
},
node::RpcNodeCoreAdapter,
EthApiServer, EthApiTypes, FullEthApiServer, FullEthApiTypes, RpcBlock, RpcConvert,
RpcConverter, RpcHeader, RpcNodeCore, RpcReceipt, RpcTransaction, RpcTxReq,
EthApiServer, EthApiTypes, FullEthApiServer, RpcBlock, RpcConvert, RpcConverter, RpcHeader,
RpcNodeCore, RpcReceipt, RpcTransaction, RpcTxReq,
};
use reth_rpc_eth_types::{receipt::EthReceiptConverter, EthConfig, EthSubscriptionIdProvider};
use reth_rpc_layer::{AuthLayer, Claims, CompressionLayer, JwtAuthValidator, JwtSecret};
pub use reth_rpc_server_types::RethRpcModule;
use reth_storage_api::{
AccountReader, BlockReader, ChangeSetReader, FullRpcProvider, NodePrimitivesProvider,
AccountReader, BlockReader, ChangeSetReader, FullRpcProvider, ProviderBlock,
StateProviderFactory,
};
use reth_tasks::{pool::BlockingTaskGuard, TaskSpawner, TokioTaskExecutor};
use reth_tokio_util::EventSender;
use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool};
use serde::{Deserialize, Serialize};
use std::{
@@ -77,7 +74,7 @@ use jsonrpsee::server::ServerConfigBuilder;
pub use reth_ipc::server::{
Builder as IpcServerBuilder, RpcServiceBuilder as IpcRpcServiceBuilder,
};
pub use reth_rpc_server_types::{constants, RpcModuleSelection};
pub use reth_rpc_server_types::{constants, RethRpcModule, RpcModuleSelection};
pub use tower::layer::util::{Identity, Stack};
/// Auth server utilities.
@@ -329,7 +326,6 @@ where
module_config: TransportRpcModuleConfig,
engine: impl IntoEngineApiRpcModule,
eth: EthApi,
engine_events: EventSender<ConsensusEngineEvent<N>>,
) -> (
TransportRpcModules,
AuthRpcModule,
@@ -338,10 +334,16 @@ where
where
EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
{
let Self { provider, pool, network, executor, consensus, evm_config, .. } = self;
let config = module_config.config.clone().unwrap_or_default();
let mut registry = self.into_registry(config, eth, engine_events);
let mut registry = RpcRegistryInner::new(
provider, pool, network, executor, consensus, config, evm_config, eth,
);
let modules = registry.create_transport_rpc_modules(module_config);
let auth_module = registry.create_auth_module(engine);
(modules, auth_module, registry)
@@ -355,23 +357,12 @@ where
self,
config: RpcModuleConfig,
eth: EthApi,
engine_events: EventSender<ConsensusEngineEvent<N>>,
) -> RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
where
EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
EthApi: EthApiTypes + 'static,
{
let Self { provider, pool, network, executor, consensus, evm_config, .. } = self;
RpcRegistryInner::new(
provider,
pool,
network,
executor,
consensus,
config,
evm_config,
eth,
engine_events,
)
RpcRegistryInner::new(provider, pool, network, executor, consensus, config, evm_config, eth)
}
/// Configures all [`RpcModule`]s specific to the given [`TransportRpcModuleConfig`] which can
@@ -380,17 +371,27 @@ where
self,
module_config: TransportRpcModuleConfig,
eth: EthApi,
engine_events: EventSender<ConsensusEngineEvent<N>>,
) -> TransportRpcModules<()>
where
EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
{
let mut modules = TransportRpcModules::default();
let Self { provider, pool, network, executor, consensus, evm_config, .. } = self;
if !module_config.is_empty() {
let TransportRpcModuleConfig { http, ws, ipc, config } = module_config.clone();
let mut registry = self.into_registry(config.unwrap_or_default(), eth, engine_events);
let mut registry = RpcRegistryInner::new(
provider,
pool,
network,
executor,
consensus,
config.unwrap_or_default(),
evm_config,
eth,
);
modules.config = module_config;
modules.http = registry.maybe_module(http.as_ref());
@@ -413,6 +414,8 @@ impl<N: NodePrimitives> Default for RpcModuleBuilder<N, (), (), (), (), ()> {
pub struct RpcModuleConfig {
/// `eth` namespace settings
eth: EthConfig,
/// `flashbots` namespace settings
flashbots: ValidationApiConfig,
}
// === impl RpcModuleConfig ===
@@ -424,8 +427,8 @@ impl RpcModuleConfig {
}
/// Returns a new RPC module config given the eth namespace config
pub const fn new(eth: EthConfig) -> Self {
Self { eth }
pub const fn new(eth: EthConfig, flashbots: ValidationApiConfig) -> Self {
Self { eth, flashbots }
}
/// Get a reference to the eth namespace config
@@ -443,6 +446,7 @@ impl RpcModuleConfig {
#[derive(Clone, Debug, Default)]
pub struct RpcModuleConfigBuilder {
eth: Option<EthConfig>,
flashbots: Option<ValidationApiConfig>,
}
// === impl RpcModuleConfigBuilder ===
@@ -454,10 +458,16 @@ impl RpcModuleConfigBuilder {
self
}
/// Configures a custom flashbots namespace config
pub fn flashbots(mut self, flashbots: ValidationApiConfig) -> Self {
self.flashbots = Some(flashbots);
self
}
/// Consumes the type and creates the [`RpcModuleConfig`]
pub fn build(self) -> RpcModuleConfig {
let Self { eth } = self;
RpcModuleConfig { eth: eth.unwrap_or_default() }
let Self { eth, flashbots } = self;
RpcModuleConfig { eth: eth.unwrap_or_default(), flashbots: flashbots.unwrap_or_default() }
}
/// Get a reference to the eth namespace config, if any
@@ -477,8 +487,16 @@ impl RpcModuleConfigBuilder {
}
/// A Helper type the holds instances of the configured modules.
#[derive(Debug)]
pub struct RpcRegistryInner<Provider, Pool, Network, EthApi: EthApiTypes, EvmConfig, Consensus> {
#[derive(Debug, Clone)]
#[expect(dead_code)] // Consensus generic, might be useful in the future
pub struct RpcRegistryInner<
Provider: BlockReader,
Pool,
Network,
EthApi: EthApiTypes,
EvmConfig,
Consensus,
> {
provider: Provider,
pool: Pool,
network: Network,
@@ -493,9 +511,6 @@ pub struct RpcRegistryInner<Provider, Pool, Network, EthApi: EthApiTypes, EvmCon
modules: HashMap<RethRpcModule, Methods>,
/// eth config settings
eth_config: EthConfig,
/// Notification channel for engine API events
engine_events:
EventSender<ConsensusEngineEvent<<EthApi::RpcConvert as RpcConvert>::Primitives>>,
}
// === impl RpcRegistryInner ===
@@ -512,7 +527,7 @@ where
+ 'static,
Pool: Send + Sync + Clone + 'static,
Network: Clone + 'static,
EthApi: FullEthApiTypes + 'static,
EthApi: EthApiTypes + 'static,
EvmConfig: ConfigureEvm<Primitives = N>,
{
/// Creates a new, empty instance.
@@ -526,9 +541,6 @@ where
config: RpcModuleConfig,
evm_config: EvmConfig,
eth_api: EthApi,
engine_events: EventSender<
ConsensusEngineEvent<<EthApi::Provider as NodePrimitivesProvider>::Primitives>,
>,
) -> Self
where
EvmConfig: ConfigureEvm<Primitives = N>,
@@ -548,14 +560,14 @@ where
blocking_pool_guard,
eth_config: config.eth,
evm_config,
engine_events,
}
}
}
impl<Provider, Pool, Network, EthApi, Evm, Consensus>
RpcRegistryInner<Provider, Pool, Network, EthApi, Evm, Consensus>
impl<Provider, Pool, Network, EthApi, BlockExecutor, Consensus>
RpcRegistryInner<Provider, Pool, Network, EthApi, BlockExecutor, Consensus>
where
Provider: BlockReader,
EthApi: EthApiTypes,
{
/// Returns a reference to the installed [`EthApi`].
@@ -583,11 +595,6 @@ where
&self.provider
}
/// Returns a reference to the evm config
pub const fn evm_config(&self) -> &Evm {
&self.evm_config
}
/// Returns all installed methods
pub fn methods(&self) -> Vec<Methods> {
self.modules.values().cloned().collect()
@@ -699,7 +706,8 @@ where
/// If called outside of the tokio runtime. See also [`Self::eth_api`]
pub fn register_debug(&mut self) -> &mut Self
where
EthApi: EthTransactions + TraceExt,
EthApi: EthApiSpec + EthTransactions + TraceExt,
EvmConfig::Primitives: NodePrimitives<Block = ProviderBlock<EthApi::Provider>>,
{
let debug_api = self.debug_api();
self.modules.insert(RethRpcModule::Debug, debug_api.into_rpc().into());
@@ -806,16 +814,8 @@ where
/// # Panics
///
/// If called outside of the tokio runtime. See also [`Self::eth_api`]
pub fn debug_api(&self) -> DebugApi<EthApi>
where
EthApi: FullEthApiTypes,
{
DebugApi::new(
self.eth_api().clone(),
self.blocking_pool_guard.clone(),
self.tasks(),
self.engine_events.new_listener(),
)
pub fn debug_api(&self) -> DebugApi<EthApi> {
DebugApi::new(self.eth_api().clone(), self.blocking_pool_guard.clone())
}
/// Instantiates `NetApi`
@@ -933,14 +933,11 @@ where
)
.into_rpc()
.into(),
RethRpcModule::Debug => DebugApi::new(
eth_api.clone(),
self.blocking_pool_guard.clone(),
&*self.executor,
self.engine_events.new_listener(),
)
.into_rpc()
.into(),
RethRpcModule::Debug => {
DebugApi::new(eth_api.clone(), self.blocking_pool_guard.clone())
.into_rpc()
.into()
}
RethRpcModule::Eth => {
// merge all eth handlers
let mut module = eth_api.clone().into_rpc();
@@ -989,18 +986,18 @@ where
.into_rpc()
.into()
}
// only relevant for Ethereum and configured in `EthereumAddOns`
// implementation
// TODO: can we get rid of this here?
// Custom modules are not handled here - they should be registered via
// extend_rpc_modules
RethRpcModule::Flashbots | RethRpcModule::Other(_) => Default::default(),
RethRpcModule::Miner => MinerApi::default().into_rpc().into(),
RethRpcModule::Mev => {
EthSimBundle::new(eth_api.clone(), self.blocking_pool_guard.clone())
.into_rpc()
.into()
}
// these are implementation specific and need to be handled during
// initialization and should be registered via extend_rpc_modules in the
// nodebuilder rpc addon stack
RethRpcModule::Flashbots |
RethRpcModule::Testing |
RethRpcModule::Other(_) => Default::default(),
})
.clone()
})
@@ -1008,33 +1005,6 @@ where
}
}
impl<Provider, Pool, Network, EthApi, EvmConfig, Consensus> Clone
for RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
where
EthApi: EthApiTypes,
Provider: Clone,
Pool: Clone,
Network: Clone,
EvmConfig: Clone,
Consensus: Clone,
{
fn clone(&self) -> Self {
Self {
provider: self.provider.clone(),
pool: self.pool.clone(),
network: self.network.clone(),
executor: self.executor.clone(),
evm_config: self.evm_config.clone(),
consensus: self.consensus.clone(),
eth: self.eth.clone(),
blocking_pool_guard: self.blocking_pool_guard.clone(),
modules: self.modules.clone(),
eth_config: self.eth_config.clone(),
engine_events: self.engine_events.clone(),
}
}
}
/// A builder type for configuring and launching the servers that will handle RPC requests.
///
/// Supported server transports are:

View File

@@ -9,7 +9,6 @@ use reth_ethereum_primitives::TransactionSigned;
use reth_rpc_builder::{RpcServerConfig, TransportRpcModuleConfig};
use reth_rpc_eth_api::EthApiClient;
use reth_rpc_server_types::RpcModuleSelection;
use reth_tokio_util::EventSender;
use std::{
future::Future,
sync::{
@@ -74,11 +73,8 @@ where
async fn test_rpc_middleware() {
let builder = test_rpc_builder();
let eth_api = builder.bootstrap_eth_api();
let modules = builder.build(
TransportRpcModuleConfig::set_http(RpcModuleSelection::All),
eth_api,
EventSender::new(1),
);
let modules =
builder.build(TransportRpcModuleConfig::set_http(RpcModuleSelection::All), eth_api);
let mylayer = MyMiddlewareLayer::default();

View File

@@ -7,7 +7,6 @@ use reth_rpc_builder::{
RpcServerConfig, TransportRpcModuleConfig,
};
use reth_rpc_server_types::RethRpcModule;
use reth_tokio_util::EventSender;
use crate::utils::{
launch_http, launch_http_ws_same_port, launch_ws, test_address, test_rpc_builder,
@@ -28,11 +27,8 @@ async fn test_http_addr_in_use() {
let addr = handle.http_local_addr().unwrap();
let builder = test_rpc_builder();
let eth_api = builder.bootstrap_eth_api();
let server = builder.build(
TransportRpcModuleConfig::set_http(vec![RethRpcModule::Admin]),
eth_api,
EventSender::new(1),
);
let server =
builder.build(TransportRpcModuleConfig::set_http(vec![RethRpcModule::Admin]), eth_api);
let result =
RpcServerConfig::http(Default::default()).with_http_address(addr).start(&server).await;
let err = result.unwrap_err();
@@ -45,11 +41,8 @@ async fn test_ws_addr_in_use() {
let addr = handle.ws_local_addr().unwrap();
let builder = test_rpc_builder();
let eth_api = builder.bootstrap_eth_api();
let server = builder.build(
TransportRpcModuleConfig::set_ws(vec![RethRpcModule::Admin]),
eth_api,
EventSender::new(1),
);
let server =
builder.build(TransportRpcModuleConfig::set_ws(vec![RethRpcModule::Admin]), eth_api);
let result = RpcServerConfig::ws(Default::default()).with_ws_address(addr).start(&server).await;
let err = result.unwrap_err();
assert!(is_addr_in_use_kind(&err, ServerKind::WS(addr)), "{err}");
@@ -71,7 +64,6 @@ async fn test_launch_same_port_different_modules() {
TransportRpcModuleConfig::set_ws(vec![RethRpcModule::Admin])
.with_http(vec![RethRpcModule::Eth]),
eth_api,
EventSender::new(1),
);
let addr = test_address();
let res = RpcServerConfig::ws(Default::default())
@@ -95,7 +87,6 @@ async fn test_launch_same_port_same_cors() {
TransportRpcModuleConfig::set_ws(vec![RethRpcModule::Eth])
.with_http(vec![RethRpcModule::Eth]),
eth_api,
EventSender::new(1),
);
let addr = test_address();
let res = RpcServerConfig::ws(Default::default())
@@ -117,7 +108,6 @@ async fn test_launch_same_port_different_cors() {
TransportRpcModuleConfig::set_ws(vec![RethRpcModule::Eth])
.with_http(vec![RethRpcModule::Eth]),
eth_api,
EventSender::new(1),
);
let addr = test_address();
let res = RpcServerConfig::ws(Default::default())

View File

@@ -4,7 +4,6 @@ use reth_consensus::noop::NoopConsensus;
use reth_engine_primitives::ConsensusEngineHandle;
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_ethereum_primitives::EthPrimitives;
use reth_tokio_util::EventSender;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use reth_evm_ethereum::EthEvmConfig;
@@ -63,8 +62,7 @@ pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle {
pub async fn launch_http(modules: impl Into<RpcModuleSelection>) -> RpcServerHandle {
let builder = test_rpc_builder();
let eth_api = builder.bootstrap_eth_api();
let server =
builder.build(TransportRpcModuleConfig::set_http(modules), eth_api, EventSender::new(1));
let server = builder.build(TransportRpcModuleConfig::set_http(modules), eth_api);
RpcServerConfig::http(Default::default())
.with_http_address(test_address())
.start(&server)
@@ -76,8 +74,7 @@ pub async fn launch_http(modules: impl Into<RpcModuleSelection>) -> RpcServerHan
pub async fn launch_ws(modules: impl Into<RpcModuleSelection>) -> RpcServerHandle {
let builder = test_rpc_builder();
let eth_api = builder.bootstrap_eth_api();
let server =
builder.build(TransportRpcModuleConfig::set_ws(modules), eth_api, EventSender::new(1));
let server = builder.build(TransportRpcModuleConfig::set_ws(modules), eth_api);
RpcServerConfig::ws(Default::default())
.with_ws_address(test_address())
.start(&server)
@@ -90,11 +87,8 @@ pub async fn launch_http_ws(modules: impl Into<RpcModuleSelection>) -> RpcServer
let builder = test_rpc_builder();
let eth_api = builder.bootstrap_eth_api();
let modules = modules.into();
let server = builder.build(
TransportRpcModuleConfig::set_ws(modules.clone()).with_http(modules),
eth_api,
EventSender::new(1),
);
let server = builder
.build(TransportRpcModuleConfig::set_ws(modules.clone()).with_http(modules), eth_api);
RpcServerConfig::ws(Default::default())
.with_ws_address(test_address())
.with_ws_address(test_address())
@@ -110,11 +104,8 @@ pub async fn launch_http_ws_same_port(modules: impl Into<RpcModuleSelection>) ->
let builder = test_rpc_builder();
let modules = modules.into();
let eth_api = builder.bootstrap_eth_api();
let server = builder.build(
TransportRpcModuleConfig::set_ws(modules.clone()).with_http(modules),
eth_api,
EventSender::new(1),
);
let server = builder
.build(TransportRpcModuleConfig::set_ws(modules.clone()).with_http(modules), eth_api);
let addr = test_address();
RpcServerConfig::ws(Default::default())
.with_ws_address(addr)

View File

@@ -35,7 +35,7 @@ impl TryFromReceiptResponse<op_alloy_network::Optimism> for reth_optimism_primit
fn from_receipt_response(
receipt_response: op_alloy_rpc_types::OpTransactionReceipt,
) -> Result<Self, Self::Error> {
Ok(receipt_response.inner.inner.into_components().0.map_logs(Into::into))
Ok(receipt_response.inner.inner.map_logs(Into::into).into())
}
}
@@ -70,17 +70,14 @@ mod tests {
#[cfg(feature = "op")]
#[test]
fn test_try_from_receipt_response_optimism() {
use alloy_consensus::ReceiptWithBloom;
use op_alloy_consensus::OpReceipt;
use op_alloy_consensus::OpReceiptEnvelope;
use op_alloy_network::Optimism;
use op_alloy_rpc_types::OpTransactionReceipt;
use reth_optimism_primitives::OpReceipt;
let op_receipt = OpTransactionReceipt {
inner: alloy_rpc_types_eth::TransactionReceipt {
inner: ReceiptWithBloom {
receipt: OpReceipt::Eip1559(Default::default()),
logs_bloom: Default::default(),
},
inner: OpReceiptEnvelope::Eip1559(Default::default()),
transaction_hash: Default::default(),
transaction_index: None,
block_hash: None,

View File

@@ -90,14 +90,13 @@ impl SignableTxRequest<op_alloy_consensus::OpTxEnvelope>
) -> Result<op_alloy_consensus::OpTxEnvelope, SignTxRequestError> {
let mut tx =
self.build_typed_tx().map_err(|_| SignTxRequestError::InvalidTransactionRequest)?;
let signature = signer.sign_transaction(&mut tx).await?;
// sanity check: deposit transactions must not be signed by the user
// sanity check
if tx.is_deposit() {
return Err(SignTxRequestError::InvalidTransactionRequest);
}
let signature = signer.sign_transaction(&mut tx).await?;
Ok(tx.into_signed(signature).into())
}
}

View File

@@ -7,29 +7,18 @@ use reth_tasks::{
pool::{BlockingTaskGuard, BlockingTaskPool},
TaskSpawner,
};
use std::sync::Arc;
use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit, Semaphore};
use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit};
use crate::EthApiTypes;
/// Helpers for spawning blocking operations.
///
/// Operations can be blocking because they require lots of CPU work and/or IO.
///
/// This differentiates between workloads that are primarily CPU bound and heavier in general (such
/// as tracing tasks) and tasks that have a more balanced profile (io and cpu), such as `eth_call`
/// and alike.
///
/// This provides access to semaphores that permit how many of those are permitted concurrently.
/// It's expected that tracing related tasks are configured with a lower threshold, because not only
/// are they CPU heavy but they can also accumulate more memory for the traces.
/// Executes code on a blocking thread.
pub trait SpawnBlocking: EthApiTypes + Clone + Send + Sync + 'static {
/// Returns a handle for spawning IO heavy blocking tasks.
///
/// Runtime access in default trait method implementations.
fn io_task_spawner(&self) -> impl TaskSpawner;
/// Returns a handle for spawning __CPU heavy__ blocking tasks, such as tracing requests.
/// Returns a handle for spawning CPU heavy blocking tasks.
///
/// Thread pool access in default trait method implementations.
fn tracing_task_pool(&self) -> &BlockingTaskPool;
@@ -37,121 +26,21 @@ pub trait SpawnBlocking: EthApiTypes + Clone + Send + Sync + 'static {
/// Returns handle to semaphore for pool of CPU heavy blocking tasks.
fn tracing_task_guard(&self) -> &BlockingTaskGuard;
/// Returns handle to semaphore for blocking IO tasks.
///
/// This semaphore is used to limit concurrent blocking IO operations like `eth_call`,
/// `eth_estimateGas`, and similar methods that require EVM execution.
fn blocking_io_task_guard(&self) -> &Arc<Semaphore>;
/// Acquires a permit from the tracing task semaphore.
///
/// This should be used for __CPU heavy__ operations like `debug_traceTransaction`,
/// `debug_traceCall`, and similar tracing methods. These tasks are typically:
/// - Primarily CPU bound with intensive computation
/// - Can accumulate significant memory for trace results
/// - Expected to have lower concurrency limits than general blocking IO tasks
///
/// For blocking IO tasks like `eth_call` or `eth_estimateGas`, use
/// [`acquire_owned_blocking_io`](Self::acquire_owned_blocking_io) instead.
///
/// See also [`Semaphore::acquire_owned`](`tokio::sync::Semaphore::acquire_owned`).
fn acquire_owned_tracing(
fn acquire_owned(
&self,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.tracing_task_guard().clone().acquire_owned()
}
/// Acquires multiple permits from the tracing task semaphore.
///
/// This should be used for particularly heavy tracing operations that require more resources
/// than a standard trace. The permit count should reflect the expected resource consumption
/// relative to a standard tracing operation.
///
/// Like [`acquire_owned_tracing`](Self::acquire_owned_tracing), this is specifically for
/// CPU-intensive tracing tasks, not general blocking IO operations.
///
/// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
fn acquire_many_owned_tracing(
fn acquire_many_owned(
&self,
n: u32,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.tracing_task_guard().clone().acquire_many_owned(n)
}
/// Acquires a permit from the blocking IO request semaphore.
///
/// This should be used for operations like `eth_call`, `eth_estimateGas`, and similar methods
/// that require EVM execution and are spawned as blocking tasks.
///
/// See also [`Semaphore::acquire_owned`](`tokio::sync::Semaphore::acquire_owned`).
fn acquire_owned_blocking_io(
&self,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.blocking_io_task_guard().clone().acquire_owned()
}
/// Acquires multiple permits from the blocking IO request semaphore.
///
/// This should be used for operations that may require more resources than a single permit
/// allows.
///
/// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
fn acquire_many_owned_blocking_io(
&self,
n: u32,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.blocking_io_task_guard().clone().acquire_many_owned(n)
}
/// Acquires permits from the blocking IO request semaphore based on a calculated weight.
///
/// The weight determines the maximum number of concurrent requests of this type that can run.
/// For example, if the semaphore has 256 total permits and `weight=10`, then at most 10
/// concurrent requests of this type are allowed.
///
/// The permits acquired per request is calculated as `total_permits / weight`, with an
/// adjustment: if this result is even, we add 1 to ensure that `weight - 1` permits are
/// always available for other tasks, preventing complete semaphore exhaustion.
///
/// This should be used to explicitly limit concurrent requests based on their expected
/// resource consumption:
///
/// - **Block range queries**: Higher weight for larger ranges (fewer concurrent requests)
/// - **Complex calls**: Higher weight for expensive operations
/// - **Batch operations**: Higher weight for larger batches
/// - **Historical queries**: Higher weight for deeper history lookups
///
/// # Examples
///
/// ```ignore
/// // For a heavy request, use higher weight to limit concurrency
/// let weight = 20; // Allow at most 20 concurrent requests of this type
/// let _permit = self.acquire_weighted_blocking_io(weight).await?;
/// ```
///
/// This helps prevent resource exhaustion from concurrent expensive operations while allowing
/// many cheap operations to run in parallel.
///
/// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
fn acquire_weighted_blocking_io(
&self,
weight: u32,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
let guard = self.blocking_io_task_guard();
let total_permits = guard.available_permits().max(1) as u32;
let weight = weight.max(1);
let mut permits_to_acquire = (total_permits / weight).max(1);
// If total_permits divides evenly by weight, add 1 to ensure that when `weight`
// concurrent requests are running, at least `weight - 1` permits remain available
// for other tasks
if total_permits.is_multiple_of(weight) {
permits_to_acquire += 1;
}
guard.clone().acquire_many_owned(permits_to_acquire)
}
/// Executes the future on a new blocking task.
///
/// Note: This is expected for futures that are dominated by blocking IO operations, for tracing

View File

@@ -29,7 +29,7 @@ use reth_revm::{cancelled::CancelOnDrop, database::StateProviderDatabase, db::St
use reth_rpc_convert::{RpcConvert, RpcTxReq};
use reth_rpc_eth_types::{
cache::db::StateProviderTraitObjWrapper,
error::{AsEthApiError, FromEthApiError},
error::FromEthApiError,
simulate::{self, EthSimulateError},
EthApiError, StateCacheDb,
};
@@ -159,13 +159,6 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
.context_for_next_block(&parent, this.next_env_attributes(&parent)?)
.map_err(RethError::other)
.map_err(Self::Error::from_eth_err)?;
let map_err = |e: EthApiError| -> Self::Error {
match e.as_simulate_error() {
Some(sim_err) => Self::Error::from_eth_err(EthApiError::other(sim_err)),
None => Self::Error::from_eth_err(e),
}
};
let (result, results) = if trace_transfers {
// prepare inspector to capture transfer inside the evm so they are recorded
// and included in logs
@@ -180,8 +173,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
default_gas_limit,
chain_id,
this.converter(),
)
.map_err(map_err)?
)?
} else {
let evm = this.evm_config().evm_with_env(&mut db, evm_env);
let builder = this.evm_config().create_block_builder(evm, &parent, ctx);
@@ -191,8 +183,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
default_gas_limit,
chain_id,
this.converter(),
)
.map_err(map_err)?
)?
};
parent = result.block.clone_sealed_header();
@@ -221,7 +212,6 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
overrides: EvmOverrides,
) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
async move {
let _permit = self.acquire_owned_blocking_io().await;
let res =
self.transact_call_at(request, block_number.unwrap_or_default(), overrides).await?;

View File

@@ -420,7 +420,6 @@ impl<H: BlockHeader> BuildPendingEnv<H> for NextBlockEnvAttributes {
gas_limit: parent.gas_limit(),
parent_beacon_block_root: parent.parent_beacon_block_root(),
withdrawals: parent.withdrawals_root().map(|_| Default::default()),
extra_data: parent.extra_data().clone(),
}
}
}

View File

@@ -16,8 +16,7 @@ use reth_rpc_eth_types::{
error::FromEvmError, EthApiError, PendingBlockEnv, RpcInvalidTransactionError,
};
use reth_storage_api::{
BlockIdReader, BlockNumReader, BlockReaderIdExt, StateProvider, StateProviderBox,
StateProviderFactory,
BlockIdReader, BlockNumReader, StateProvider, StateProviderBox, StateProviderFactory,
};
use reth_transaction_pool::TransactionPool;
@@ -97,7 +96,7 @@ pub trait EthState: LoadState + SpawnBlocking {
{
Ok(async move {
let _permit = self
.acquire_owned_tracing()
.acquire_owned()
.await
.map_err(RethError::other)
.map_err(EthApiError::Internal)?;
@@ -274,20 +273,21 @@ pub trait LoadState:
let PendingBlockEnv { evm_env, origin } = self.pending_block_env_and_cfg()?;
Ok((evm_env, origin.state_block_id()))
} else {
// we can assume that the blockid will be predominantly `Latest` (e.g. for
// `eth_call`) and if requested by number or hash we can quickly fetch just the
// header
let header = RpcNodeCore::provider(self)
.sealed_header_by_id(at)
// Use cached values if there is no pending block
let block_hash = RpcNodeCore::provider(self)
.block_hash_for_id(at)
.map_err(Self::Error::from_eth_err)?
.ok_or_else(|| EthApiError::HeaderNotFound(at))?;
.ok_or(EthApiError::HeaderNotFound(at))?;
let header =
self.cache().get_header(block_hash).await.map_err(Self::Error::from_eth_err)?;
let evm_env = self
.evm_config()
.evm_env(&header)
.map_err(RethError::other)
.map_err(Self::Error::from_eth_err)?;
Ok((evm_env, header.hash().into()))
Ok((evm_env, block_hash.into()))
}
}
}

View File

@@ -8,10 +8,9 @@ use crate::{
};
use reqwest::Url;
use reth_rpc_server_types::constants::{
default_max_tracing_requests, DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_BLOCKING_IO_REQUEST,
DEFAULT_MAX_BLOCKS_PER_FILTER, DEFAULT_MAX_LOGS_PER_RESPONSE, DEFAULT_MAX_SIMULATE_BLOCKS,
DEFAULT_MAX_TRACE_FILTER_BLOCKS, DEFAULT_PROOF_PERMITS,
RPC_DEFAULT_SEND_RAW_TX_SYNC_TIMEOUT_SECS,
default_max_tracing_requests, DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_BLOCKS_PER_FILTER,
DEFAULT_MAX_LOGS_PER_RESPONSE, DEFAULT_MAX_SIMULATE_BLOCKS, DEFAULT_MAX_TRACE_FILTER_BLOCKS,
DEFAULT_PROOF_PERMITS, RPC_DEFAULT_SEND_RAW_TX_SYNC_TIMEOUT_SECS,
};
use serde::{Deserialize, Serialize};
@@ -69,15 +68,6 @@ pub struct EthConfig {
pub eth_proof_window: u64,
/// The maximum number of tracing calls that can be executed in concurrently.
pub max_tracing_requests: usize,
/// The maximum number of blocking IO calls that can be executed in concurrently.
///
/// Requests such as `eth_call`, `eth_estimateGas` and alike require evm execution, which is
/// considered blocking since it's usually more heavy on the IO side but also CPU constrained.
/// It is expected that these are spawned as short lived blocking tokio tasks. This config
/// determines how many can be spawned concurrently, to avoid a build up in the tokio's
/// blocking pool queue since there's only a limited number of threads available. This setting
/// restricts how many tasks are spawned concurrently.
pub max_blocking_io_requests: usize,
/// Maximum number of blocks for `trace_filter` requests.
pub max_trace_filter_blocks: u64,
/// Maximum number of blocks that could be scanned per filter request in `eth_getLogs` calls.
@@ -126,7 +116,6 @@ impl Default for EthConfig {
gas_oracle: GasPriceOracleConfig::default(),
eth_proof_window: DEFAULT_ETH_PROOF_WINDOW,
max_tracing_requests: default_max_tracing_requests(),
max_blocking_io_requests: DEFAULT_MAX_BLOCKING_IO_REQUEST,
max_trace_filter_blocks: DEFAULT_MAX_TRACE_FILTER_BLOCKS,
max_blocks_per_filter: DEFAULT_MAX_BLOCKS_PER_FILTER,
max_logs_per_response: DEFAULT_MAX_LOGS_PER_RESPONSE,
@@ -163,12 +152,6 @@ impl EthConfig {
self
}
/// Configures the maximum number of blocking IO requests
pub const fn max_blocking_io_requests(mut self, max_requests: usize) -> Self {
self.max_blocking_io_requests = max_requests;
self
}
/// Configures the maximum block length to scan per `eth_getLogs` request
pub const fn max_blocks_per_filter(mut self, max_blocks: u64) -> Self {
self.max_blocks_per_filter = max_blocks;

View File

@@ -1,7 +1,7 @@
//! Helper traits to wrap generic l1 errors, in network specific error type configured in
//! `reth_rpc_eth_api::EthApiTypes`.
use crate::{simulate::EthSimulateError, EthApiError, RevertError};
use crate::{EthApiError, RevertError};
use alloy_primitives::Bytes;
use reth_errors::ProviderError;
use reth_evm::{ConfigureEvm, EvmErrorFor, HaltReasonFor};
@@ -74,32 +74,6 @@ pub trait AsEthApiError {
false
}
/// Returns [`EthSimulateError`] if this error maps to a simulate-specific error code.
fn as_simulate_error(&self) -> Option<EthSimulateError> {
let err = self.as_err()?;
match err {
EthApiError::InvalidTransaction(tx_err) => match tx_err {
RpcInvalidTransactionError::NonceTooLow { tx, state } => {
Some(EthSimulateError::NonceTooLow { tx: *tx, state: *state })
}
RpcInvalidTransactionError::NonceTooHigh => Some(EthSimulateError::NonceTooHigh),
RpcInvalidTransactionError::FeeCapTooLow => {
Some(EthSimulateError::BaseFeePerGasTooLow)
}
RpcInvalidTransactionError::GasTooLow => Some(EthSimulateError::IntrinsicGasTooLow),
RpcInvalidTransactionError::InsufficientFunds { cost, balance } => {
Some(EthSimulateError::InsufficientFunds { cost: *cost, balance: *balance })
}
RpcInvalidTransactionError::SenderNoEOA => Some(EthSimulateError::SenderNotEOA),
RpcInvalidTransactionError::MaxInitCodeSizeExceeded => {
Some(EthSimulateError::MaxInitCodeSizeExceeded)
}
_ => None,
},
_ => None,
}
}
}
impl AsEthApiError for EthApiError {

View File

@@ -23,7 +23,7 @@ use reth_storage_api::noop::NoopProvider;
use revm::{
context::Block,
context_interface::result::ExecutionResult,
primitives::{Address, Bytes, TxKind, U256},
primitives::{Address, Bytes, TxKind},
Database,
};
@@ -36,67 +36,12 @@ pub enum EthSimulateError {
/// Max gas limit for entire operation exceeded.
#[error("Client adjustable limit reached")]
GasLimitReached,
/// Block number in sequence did not increase.
#[error("Block number in sequence did not increase")]
BlockNumberInvalid,
/// Block timestamp in sequence did not increase or stay the same.
#[error("Block timestamp in sequence did not increase")]
BlockTimestampInvalid,
/// Transaction nonce is too low.
#[error("nonce too low: next nonce {state}, tx nonce {tx}")]
NonceTooLow {
/// Transaction nonce.
tx: u64,
/// Current state nonce.
state: u64,
},
/// Transaction nonce is too high.
#[error("nonce too high")]
NonceTooHigh,
/// Transaction's baseFeePerGas is too low.
#[error("max fee per gas less than block base fee")]
BaseFeePerGasTooLow,
/// Not enough gas provided to pay for intrinsic gas.
#[error("intrinsic gas too low")]
IntrinsicGasTooLow,
/// Insufficient funds to pay for gas fees and value.
#[error("insufficient funds for gas * price + value: have {balance} want {cost}")]
InsufficientFunds {
/// Transaction cost.
cost: U256,
/// Sender balance.
balance: U256,
},
/// Sender is not an EOA.
#[error("sender is not an EOA")]
SenderNotEOA,
/// Max init code size exceeded.
#[error("max initcode size exceeded")]
MaxInitCodeSizeExceeded,
/// `MovePrecompileToAddress` referenced itself in replacement.
#[error("MovePrecompileToAddress referenced itself")]
PrecompileSelfReference,
/// Multiple `MovePrecompileToAddress` referencing the same address.
#[error("Multiple MovePrecompileToAddress referencing the same address")]
PrecompileDuplicateAddress,
}
impl EthSimulateError {
/// Returns the JSON-RPC error code for a `eth_simulateV1` error.
pub const fn error_code(&self) -> i32 {
const fn error_code(&self) -> i32 {
match self {
Self::NonceTooLow { .. } => -38010,
Self::NonceTooHigh => -38011,
Self::BaseFeePerGasTooLow => -38012,
Self::IntrinsicGasTooLow => -38013,
Self::InsufficientFunds { .. } => -38014,
Self::BlockGasLimitExceeded => -38015,
Self::BlockNumberInvalid => -38020,
Self::BlockTimestampInvalid => -38021,
Self::PrecompileSelfReference => -38022,
Self::PrecompileDuplicateAddress => -38023,
Self::SenderNotEOA => -38024,
Self::MaxInitCodeSizeExceeded => -38025,
Self::GasLimitReached => -38026,
}
}

View File

@@ -18,20 +18,6 @@ pub const DEFAULT_MAX_LOGS_PER_RESPONSE: usize = 20_000;
/// The default maximum number of blocks for `trace_filter` requests.
pub const DEFAULT_MAX_TRACE_FILTER_BLOCKS: u64 = 100;
/// Setting for how many concurrent (heavier) _blocking_ IO requests are allowed.
///
/// What is considered a blocking IO request can depend on the RPC method. In general anything that
/// requires IO is considered blocking and should be spawned as blocking. This setting is however,
/// primarily intended for heavier blocking requests that require evm execution for example,
/// `eth_call` and alike. This is intended to be used with a semaphore that must be acquired before
/// a new task is spawned to avoid unnecessary pooling if the number of inflight requests exceeds
/// the available threads in the pool.
///
/// tokio's blocking pool, has a default of 512 and could grow unbounded, since requests like
/// `eth_call` also require a lot of cpu which will occupy the thread, we can set this to a lower
/// value.
pub const DEFAULT_MAX_BLOCKING_IO_REQUEST: usize = 256;
/// The default maximum number tracing requests we're allowing concurrently.
/// Tracing is mostly CPU bound so we're limiting the number of concurrent requests to something
/// lower that the number of cores, in order to minimize the impact on the rest of the system.

View File

@@ -323,8 +323,6 @@ pub enum RethRpcModule {
Miner,
/// `mev_` module
Mev,
/// `testing_` module
Testing,
/// Custom RPC module not part of the standard set
#[strum(default)]
#[serde(untagged)]
@@ -349,7 +347,6 @@ impl RethRpcModule {
Self::Flashbots,
Self::Miner,
Self::Mev,
Self::Testing,
];
/// Returns the number of standard variants (excludes Other)
@@ -409,7 +406,6 @@ impl AsRef<str> for RethRpcModule {
Self::Flashbots => "flashbots",
Self::Miner => "miner",
Self::Mev => "mev",
Self::Testing => "testing",
}
}
}
@@ -432,7 +428,6 @@ impl FromStr for RethRpcModule {
"flashbots" => Self::Flashbots,
"miner" => Self::Miner,
"mev" => Self::Mev,
"testing" => Self::Testing,
// Any unknown module becomes Other
other => Self::Other(other.to_string()),
})

View File

@@ -38,8 +38,6 @@ reth-rpc-server-types.workspace = true
reth-network-types.workspace = true
reth-consensus.workspace = true
reth-consensus-common.workspace = true
reth-ethereum-primitives.workspace = true
reth-ethereum-engine-primitives.workspace = true
reth-node-api.workspace = true
reth-trie-common.workspace = true

View File

@@ -7,29 +7,25 @@ use alloy_evm::env::BlockEnvironment;
use alloy_genesis::ChainConfig;
use alloy_primitives::{hex::decode, uint, Address, Bytes, B256};
use alloy_rlp::{Decodable, Encodable};
use alloy_rpc_types::BlockTransactionsKind;
use alloy_rpc_types_debug::ExecutionWitness;
use alloy_rpc_types_eth::{state::EvmOverrides, BlockError, Bundle, StateContext};
use alloy_rpc_types_eth::{
state::EvmOverrides, Block as RpcBlock, BlockError, Bundle, StateContext,
};
use alloy_rpc_types_trace::geth::{
BlockTraceResult, GethDebugTracingCallOptions, GethDebugTracingOptions, GethTrace, TraceResult,
};
use async_trait::async_trait;
use futures::Stream;
use jsonrpsee::core::RpcResult;
use parking_lot::RwLock;
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
use reth_engine_primitives::ConsensusEngineEvent;
use reth_errors::RethError;
use reth_evm::{execute::Executor, ConfigureEvm, EvmEnvFor};
use reth_primitives_traits::{
Block as BlockTrait, BlockBody, BlockTy, ReceiptWithBloom, RecoveredBlock,
};
use reth_primitives_traits::{Block as _, BlockBody, ReceiptWithBloom, RecoveredBlock};
use reth_revm::{db::State, witness::ExecutionWitnessRecord};
use reth_rpc_api::DebugApiServer;
use reth_rpc_convert::RpcTxReq;
use reth_rpc_eth_api::{
helpers::{EthTransactions, TraceExt},
FromEthApiError, RpcConvert, RpcNodeCore,
EthApiTypes, FromEthApiError, RpcNodeCore,
};
use reth_rpc_eth_types::EthApiError;
use reth_rpc_server_types::{result::internal_rpc_err, ToRpcResult};
@@ -37,52 +33,26 @@ use reth_storage_api::{
BlockIdReader, BlockReaderIdExt, HeaderProvider, ProviderBlock, ReceiptProviderIdExt,
StateProofProvider, StateProviderFactory, StateRootProvider, TransactionVariant,
};
use reth_tasks::{pool::BlockingTaskGuard, TaskSpawner};
use reth_tasks::pool::BlockingTaskGuard;
use reth_trie_common::{updates::TrieUpdates, HashedPostState};
use revm::DatabaseCommit;
use revm_inspectors::tracing::{DebugInspector, TransactionContext};
use serde::{Deserialize, Serialize};
use std::{collections::VecDeque, sync::Arc};
use std::sync::Arc;
use tokio::sync::{AcquireError, OwnedSemaphorePermit};
use tokio_stream::StreamExt;
/// `debug` API implementation.
///
/// This type provides the functionality for handling `debug` related requests.
pub struct DebugApi<Eth: RpcNodeCore> {
pub struct DebugApi<Eth> {
inner: Arc<DebugApiInner<Eth>>,
}
impl<Eth> DebugApi<Eth>
where
Eth: RpcNodeCore,
{
// === impl DebugApi ===
impl<Eth> DebugApi<Eth> {
/// Create a new instance of the [`DebugApi`]
pub fn new(
eth_api: Eth,
blocking_task_guard: BlockingTaskGuard,
executor: impl TaskSpawner,
mut stream: impl Stream<Item = ConsensusEngineEvent<Eth::Primitives>> + Send + Unpin + 'static,
) -> Self {
let bad_block_store = BadBlockStore::default();
let inner = Arc::new(DebugApiInner {
eth_api,
blocking_task_guard,
bad_block_store: bad_block_store.clone(),
});
// Spawn a task caching bad blocks
executor.spawn(Box::pin(async move {
while let Some(event) = stream.next().await {
if let ConsensusEngineEvent::InvalidBlock(block) = event &&
let Ok(recovered) =
RecoveredBlock::try_recover_sealed(block.as_ref().clone())
{
bad_block_store.insert(recovered);
}
}
}));
pub fn new(eth_api: Eth, blocking_task_guard: BlockingTaskGuard) -> Self {
let inner = Arc::new(DebugApiInner { eth_api, blocking_task_guard });
Self { inner }
}
@@ -90,7 +60,9 @@ where
pub fn eth_api(&self) -> &Eth {
&self.inner.eth_api
}
}
impl<Eth: RpcNodeCore> DebugApi<Eth> {
/// Access the underlying provider.
pub fn provider(&self) -> &Eth::Provider {
self.inner.eth_api.provider()
@@ -101,7 +73,7 @@ where
impl<Eth> DebugApi<Eth>
where
Eth: TraceExt,
Eth: EthApiTypes + TraceExt + 'static,
{
/// Acquires a permit to execute a tracing call.
async fn acquire_trace_permit(&self) -> Result<OwnedSemaphorePermit, AcquireError> {
@@ -638,7 +610,7 @@ where
#[async_trait]
impl<Eth> DebugApiServer<RpcTxReq<Eth::NetworkTypes>> for DebugApi<Eth>
where
Eth: EthTransactions + TraceExt,
Eth: EthApiTypes + EthTransactions + TraceExt + 'static,
{
/// Handler for `debug_getRawHeader`
async fn raw_header(&self, block_id: BlockId) -> RpcResult<Bytes> {
@@ -688,7 +660,7 @@ where
/// Handler for `debug_getRawTransactions`
/// Returns the bytes of the transaction for the given hash.
async fn raw_transactions(&self, block_id: BlockId) -> RpcResult<Vec<Bytes>> {
let block: RecoveredBlock<BlockTy<Eth::Primitives>> = self
let block = self
.provider()
.block_with_senders_by_id(block_id, TransactionVariant::NoHash)
.to_rpc_result()?
@@ -709,36 +681,8 @@ where
}
/// Handler for `debug_getBadBlocks`
async fn bad_blocks(&self) -> RpcResult<Vec<serde_json::Value>> {
let blocks = self.inner.bad_block_store.all();
let mut bad_blocks = Vec::with_capacity(blocks.len());
#[derive(Serialize, Deserialize)]
struct BadBlockSerde<T> {
block: T,
hash: B256,
rlp: Bytes,
}
for block in blocks {
let rlp = alloy_rlp::encode(block.sealed_block()).into();
let hash = block.hash();
let block = block
.clone_into_rpc_block(
BlockTransactionsKind::Full,
|tx, tx_info| self.eth_api().converter().fill(tx, tx_info),
|header, size| self.eth_api().converter().convert_header(header, size),
)
.map_err(|err| Eth::Error::from(err).into())?;
let bad_block = serde_json::to_value(BadBlockSerde { block, hash, rlp })
.map_err(|err| EthApiError::other(internal_rpc_err(err.to_string())))?;
bad_blocks.push(bad_block);
}
Ok(bad_blocks)
async fn bad_blocks(&self) -> RpcResult<Vec<RpcBlock>> {
Ok(vec![])
}
/// Handler for `debug_traceChain`
@@ -1101,66 +1045,21 @@ where
}
}
impl<Eth: RpcNodeCore> std::fmt::Debug for DebugApi<Eth> {
impl<Eth> std::fmt::Debug for DebugApi<Eth> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DebugApi").finish_non_exhaustive()
}
}
impl<Eth: RpcNodeCore> Clone for DebugApi<Eth> {
impl<Eth> Clone for DebugApi<Eth> {
fn clone(&self) -> Self {
Self { inner: Arc::clone(&self.inner) }
}
}
struct DebugApiInner<Eth: RpcNodeCore> {
struct DebugApiInner<Eth> {
/// The implementation of `eth` API
eth_api: Eth,
// restrict the number of concurrent calls to blocking calls
blocking_task_guard: BlockingTaskGuard,
/// Cache for bad blocks.
bad_block_store: BadBlockStore<BlockTy<Eth::Primitives>>,
}
/// A bounded, deduplicating store of recently observed bad blocks.
#[derive(Clone, Debug)]
struct BadBlockStore<B: BlockTrait> {
inner: Arc<RwLock<VecDeque<Arc<RecoveredBlock<B>>>>>,
limit: usize,
}
impl<B: BlockTrait> BadBlockStore<B> {
/// Creates a new store with the given capacity.
fn new(limit: usize) -> Self {
Self { inner: Arc::new(RwLock::new(VecDeque::with_capacity(limit))), limit }
}
/// Inserts a recovered block, keeping only the most recent `limit` entries and deduplicating
/// by block hash.
fn insert(&self, block: RecoveredBlock<B>) {
let hash = block.hash();
let mut guard = self.inner.write();
// skip if we already recorded this bad block , and keep original ordering
if guard.iter().any(|b| b.hash() == hash) {
return;
}
guard.push_back(Arc::new(block));
while guard.len() > self.limit {
guard.pop_front();
}
}
/// Returns all cached bad blocks ordered from newest to oldest.
fn all(&self) -> Vec<Arc<RecoveredBlock<B>>> {
let guard = self.inner.read();
guard.iter().rev().cloned().collect()
}
}
impl<B: BlockTrait> Default for BadBlockStore<B> {
fn default() -> Self {
Self::new(64)
}
}

View File

@@ -15,8 +15,7 @@ use reth_rpc_eth_types::{
FeeHistoryCacheConfig, ForwardConfig, GasCap, GasPriceOracle, GasPriceOracleConfig,
};
use reth_rpc_server_types::constants::{
DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_BLOCKING_IO_REQUEST, DEFAULT_MAX_SIMULATE_BLOCKS,
DEFAULT_PROOF_PERMITS,
DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_SIMULATE_BLOCKS, DEFAULT_PROOF_PERMITS,
};
use reth_tasks::{pool::BlockingTaskPool, TaskSpawner, TokioTaskExecutor};
use std::{sync::Arc, time::Duration};
@@ -42,7 +41,6 @@ pub struct EthApiBuilder<N: RpcNodeCore, Rpc, NextEnv = ()> {
task_spawner: Box<dyn TaskSpawner + 'static>,
next_env: NextEnv,
max_batch_size: usize,
max_blocking_io_requests: usize,
pending_block_kind: PendingBlockKind,
raw_tx_forwarder: ForwardConfig,
send_raw_transaction_sync_timeout: Duration,
@@ -94,7 +92,6 @@ impl<N: RpcNodeCore, Rpc, NextEnv> EthApiBuilder<N, Rpc, NextEnv> {
task_spawner,
next_env,
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder,
send_raw_transaction_sync_timeout,
@@ -116,7 +113,6 @@ impl<N: RpcNodeCore, Rpc, NextEnv> EthApiBuilder<N, Rpc, NextEnv> {
task_spawner,
next_env,
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder,
send_raw_transaction_sync_timeout,
@@ -149,7 +145,6 @@ where
eth_state_cache_config: Default::default(),
next_env: Default::default(),
max_batch_size: 1,
max_blocking_io_requests: DEFAULT_MAX_BLOCKING_IO_REQUEST,
pending_block_kind: PendingBlockKind::Full,
raw_tx_forwarder: ForwardConfig::default(),
send_raw_transaction_sync_timeout: Duration::from_secs(30),
@@ -189,7 +184,6 @@ where
gas_oracle_config,
next_env,
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder,
send_raw_transaction_sync_timeout,
@@ -211,7 +205,6 @@ where
gas_oracle_config,
next_env,
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder,
send_raw_transaction_sync_timeout,
@@ -240,7 +233,6 @@ where
gas_oracle_config,
next_env: _,
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder,
send_raw_transaction_sync_timeout,
@@ -262,7 +254,6 @@ where
gas_oracle_config,
next_env,
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder,
send_raw_transaction_sync_timeout,
@@ -344,12 +335,6 @@ where
self
}
/// Sets the maximum number of concurrent blocking IO requests.
pub const fn max_blocking_io_requests(mut self, max_blocking_io_requests: usize) -> Self {
self.max_blocking_io_requests = max_blocking_io_requests;
self
}
/// Sets the pending block kind
pub const fn pending_block_kind(mut self, pending_block_kind: PendingBlockKind) -> Self {
self.pending_block_kind = pending_block_kind;
@@ -497,7 +482,6 @@ where
task_spawner,
next_env,
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder,
send_raw_transaction_sync_timeout,
@@ -539,7 +523,6 @@ where
rpc_converter,
next_env,
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder.forwarder_client(),
send_raw_transaction_sync_timeout,

View File

@@ -33,7 +33,7 @@ use reth_transaction_pool::{
blobstore::BlobSidecarConverter, noop::NoopTransactionPool, AddedTransactionOutcome,
BatchTxProcessor, BatchTxRequest, TransactionPool,
};
use tokio::sync::{broadcast, mpsc, Mutex, Semaphore};
use tokio::sync::{broadcast, mpsc, Mutex};
const DEFAULT_BROADCAST_CAPACITY: usize = 2000;
@@ -152,7 +152,6 @@ where
proof_permits: usize,
rpc_converter: Rpc,
max_batch_size: usize,
max_blocking_io_requests: usize,
pending_block_kind: PendingBlockKind,
raw_tx_forwarder: ForwardConfig,
send_raw_transaction_sync_timeout: Duration,
@@ -172,7 +171,6 @@ where
rpc_converter,
(),
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder.forwarder_client(),
send_raw_transaction_sync_timeout,
@@ -265,11 +263,6 @@ where
fn tracing_task_guard(&self) -> &BlockingTaskGuard {
self.inner.blocking_task_guard()
}
#[inline]
fn blocking_io_task_guard(&self) -> &std::sync::Arc<tokio::sync::Semaphore> {
self.inner.blocking_io_request_semaphore()
}
}
/// Container type `EthApi`
@@ -303,9 +296,6 @@ pub struct EthApiInner<N: RpcNodeCore, Rpc: RpcConvert> {
/// Guard for getproof calls
blocking_task_guard: BlockingTaskGuard,
/// Semaphore to limit concurrent blocking IO requests (`eth_call`, `eth_estimateGas`, etc.)
blocking_io_request_semaphore: Arc<Semaphore>,
/// Transaction broadcast channel
raw_tx_sender: broadcast::Sender<Bytes>,
@@ -356,7 +346,6 @@ where
converter: Rpc,
next_env: impl PendingEnvBuilder<N::Evm>,
max_batch_size: usize,
max_blocking_io_requests: usize,
pending_block_kind: PendingBlockKind,
raw_tx_forwarder: Option<RpcClient>,
send_raw_transaction_sync_timeout: Duration,
@@ -395,7 +384,6 @@ where
blocking_task_pool,
fee_history_cache,
blocking_task_guard: BlockingTaskGuard::new(proof_permits),
blocking_io_request_semaphore: Arc::new(Semaphore::new(max_blocking_io_requests)),
raw_tx_sender,
raw_tx_forwarder,
converter,
@@ -452,8 +440,6 @@ where
}
/// Returns a handle to the blocking thread pool.
///
/// This is intended for tasks that are CPU bound.
#[inline]
pub const fn blocking_task_pool(&self) -> &BlockingTaskPool {
&self.blocking_task_pool
@@ -539,7 +525,7 @@ where
/// Returns the transaction batch sender
#[inline]
pub const fn tx_batch_sender(
const fn tx_batch_sender(
&self,
) -> &mpsc::UnboundedSender<BatchTxRequest<<N::Pool as TransactionPool>::Transaction>> {
&self.tx_batch_sender
@@ -590,12 +576,6 @@ where
pub const fn evm_memory_limit(&self) -> u64 {
self.evm_memory_limit
}
/// Returns a reference to the blocking IO request semaphore.
#[inline]
pub const fn blocking_io_request_semaphore(&self) -> &Arc<Semaphore> {
&self.blocking_io_request_semaphore
}
}
#[cfg(test)]

View File

@@ -42,7 +42,6 @@ mod net;
mod otterscan;
mod reth;
mod rpc;
mod testing;
mod trace;
mod txpool;
mod validation;
@@ -59,7 +58,6 @@ pub use otterscan::OtterscanApi;
pub use reth::RethApi;
pub use reth_rpc_convert::RpcTypes;
pub use rpc::RPCApi;
pub use testing::TestingApi;
pub use trace::TraceApi;
pub use txpool::TxPoolApi;
pub use validation::{ValidationApi, ValidationApiConfig};

View File

@@ -1,127 +0,0 @@
//! Implementation of the `testing` namespace.
//!
//! This exposes `testing_buildBlockV1`, intended for non-production/debug use.
use alloy_consensus::{Header, Transaction};
use alloy_evm::Evm;
use alloy_primitives::U256;
use alloy_rpc_types_engine::ExecutionPayloadEnvelopeV5;
use async_trait::async_trait;
use jsonrpsee::core::RpcResult;
use reth_errors::RethError;
use reth_ethereum_engine_primitives::EthBuiltPayload;
use reth_ethereum_primitives::EthPrimitives;
use reth_evm::{execute::BlockBuilder, ConfigureEvm, NextBlockEnvAttributes};
use reth_primitives_traits::{AlloyBlockHeader as BlockTrait, Recovered, TxTy};
use reth_revm::{database::StateProviderDatabase, db::State};
use reth_rpc_api::{TestingApiServer, TestingBuildBlockRequestV1};
use reth_rpc_eth_api::{helpers::Call, FromEthApiError};
use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError};
use reth_storage_api::{BlockReader, HeaderProvider};
use revm::context::Block;
use std::sync::Arc;
/// Testing API handler.
#[derive(Debug, Clone)]
pub struct TestingApi<Eth, Evm> {
eth_api: Eth,
evm_config: Evm,
}
impl<Eth, Evm> TestingApi<Eth, Evm> {
/// Create a new testing API handler.
pub const fn new(eth_api: Eth, evm_config: Evm) -> Self {
Self { eth_api, evm_config }
}
}
impl<Eth, Evm> TestingApi<Eth, Evm>
where
Eth: Call<Provider: BlockReader<Header = Header>>,
Evm: ConfigureEvm<NextBlockEnvCtx = NextBlockEnvAttributes, Primitives = EthPrimitives>
+ 'static,
{
async fn build_block_v1(
&self,
request: TestingBuildBlockRequestV1,
) -> Result<ExecutionPayloadEnvelopeV5, Eth::Error> {
let evm_config = self.evm_config.clone();
self.eth_api
.spawn_with_state_at_block(request.parent_block_hash, move |eth_api, state| {
let state = state.database.0;
let mut db = State::builder()
.with_bundle_update()
.with_database(StateProviderDatabase::new(&state))
.build();
let parent = eth_api
.provider()
.sealed_header_by_hash(request.parent_block_hash)?
.ok_or_else(|| {
EthApiError::HeaderNotFound(request.parent_block_hash.into())
})?;
let env_attrs = NextBlockEnvAttributes {
timestamp: request.payload_attributes.timestamp,
suggested_fee_recipient: request.payload_attributes.suggested_fee_recipient,
prev_randao: request.payload_attributes.prev_randao,
gas_limit: parent.gas_limit(),
parent_beacon_block_root: request.payload_attributes.parent_beacon_block_root,
withdrawals: request.payload_attributes.withdrawals.map(Into::into),
extra_data: request.extra_data.unwrap_or_default(),
};
let mut builder = evm_config
.builder_for_next_block(&mut db, &parent, env_attrs)
.map_err(RethError::other)
.map_err(Eth::Error::from_eth_err)?;
builder.apply_pre_execution_changes().map_err(Eth::Error::from_eth_err)?;
let mut total_fees = U256::ZERO;
let base_fee = builder.evm_mut().block().basefee();
for tx in request.transactions {
let tx: Recovered<TxTy<Evm::Primitives>> = recover_raw_transaction(&tx)?;
let tip = tx.effective_tip_per_gas(base_fee).unwrap_or_default();
let gas_used =
builder.execute_transaction(tx).map_err(Eth::Error::from_eth_err)?;
total_fees += U256::from(tip) * U256::from(gas_used);
}
let outcome = builder.finish(&state).map_err(Eth::Error::from_eth_err)?;
let requests = outcome
.block
.requests_hash()
.is_some()
.then_some(outcome.execution_result.requests);
EthBuiltPayload::new(
alloy_rpc_types_engine::PayloadId::default(),
Arc::new(outcome.block.into_sealed_block()),
total_fees,
requests,
)
.try_into_v5()
.map_err(RethError::other)
.map_err(Eth::Error::from_eth_err)
})
.await
}
}
#[async_trait]
impl<Eth, Evm> TestingApiServer for TestingApi<Eth, Evm>
where
Eth: Call<Provider: BlockReader<Header = Header>>,
Evm: ConfigureEvm<NextBlockEnvCtx = NextBlockEnvAttributes, Primitives = EthPrimitives>
+ 'static,
{
/// Handles `testing_buildBlockV1` by gating concurrency via a semaphore and offloading heavy
/// work to the blocking pool to avoid stalling the async runtime.
async fn build_block_v1(
&self,
request: TestingBuildBlockRequestV1,
) -> RpcResult<ExecutionPayloadEnvelopeV5> {
self.build_block_v1(request).await.map_err(Into::into)
}
}

View File

@@ -318,14 +318,6 @@ impl<T: Table, CURSOR: DbCursorRW<T> + DbCursorRO<T>> RangeWalker<'_, T, CURSOR>
}
}
impl<T: DupSort, CURSOR: DbDupCursorRW<T> + DbCursorRO<T>> RangeWalker<'_, T, CURSOR> {
/// Delete all duplicate entries for current key that walker points to.
pub fn delete_current_duplicates(&mut self) -> Result<(), DatabaseError> {
self.start.take();
self.cursor.delete_current_duplicates()
}
}
/// Provides an iterator to `Cursor` when handling a `DupSort` table.
///
/// Reason why we have two lifetimes is to distinguish between `'cursor` lifetime

View File

@@ -22,12 +22,6 @@ pub struct StorageSettings {
/// Whether `StoragesHistory` is stored in `RocksDB`.
#[serde(default)]
pub storages_history_in_rocksdb: bool,
/// Whether `TransactionHashNumbers` is stored in `RocksDB`.
#[serde(default)]
pub transaction_hash_numbers_in_rocksdb: bool,
/// Whether `AccountsHistory` is stored in `RocksDB`.
#[serde(default)]
pub account_history_in_rocksdb: bool,
}
impl StorageSettings {
@@ -41,8 +35,6 @@ impl StorageSettings {
receipts_in_static_files: false,
transaction_senders_in_static_files: false,
storages_history_in_rocksdb: false,
transaction_hash_numbers_in_rocksdb: false,
account_history_in_rocksdb: false,
}
}
@@ -63,16 +55,4 @@ impl StorageSettings {
self.storages_history_in_rocksdb = value;
self
}
/// Sets the `transaction_hash_numbers_in_rocksdb` flag to the provided value.
pub const fn with_transaction_hash_numbers_in_rocksdb(mut self, value: bool) -> Self {
self.transaction_hash_numbers_in_rocksdb = value;
self
}
/// Sets the `account_history_in_rocksdb` flag to the provided value.
pub const fn with_account_history_in_rocksdb(mut self, value: bool) -> Self {
self.account_history_in_rocksdb = value;
self
}
}

View File

@@ -1,10 +1,8 @@
//! Generic reader and writer abstractions for interacting with either database tables or static
//! files.
use std::{marker::PhantomData, ops::Range};
use std::ops::Range;
#[cfg(all(unix, feature = "rocksdb"))]
use crate::providers::rocksdb::RocksDBWriteMode;
use crate::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut},
StaticFileProviderFactory,
@@ -26,8 +24,8 @@ use reth_storage_errors::provider::ProviderResult;
use strum::{Display, EnumIs};
/// Type alias for [`EitherReader`] constructors.
type EitherReaderTy<'a, P, T> =
EitherReader<'a, CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;
type EitherReaderTy<P, T> =
EitherReader<CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;
/// Type alias for [`EitherWriter`] constructors.
type EitherWriterTy<'a, P, T> = EitherWriter<
@@ -36,31 +34,13 @@ type EitherWriterTy<'a, P, T> = EitherWriter<
<P as NodePrimitivesProvider>::Primitives,
>;
// Helper types so constructors stay exported even when RocksDB feature is off.
// RocksDBWriteMode encapsulates the choice between Transaction and Batch writes.
#[cfg(all(unix, feature = "rocksdb"))]
type RocksWriteModeArg<'a> = crate::providers::rocksdb::RocksDBWriteMode<'a>;
#[cfg(not(all(unix, feature = "rocksdb")))]
type RocksWriteModeArg<'a> = ();
#[cfg(all(unix, feature = "rocksdb"))]
type RocksTxRefArg<'a> = &'a crate::providers::rocksdb::RocksTx<'a>;
#[cfg(not(all(unix, feature = "rocksdb")))]
type RocksTxRefArg<'a> = ();
/// Represents a destination for writing data, either to database, static files, or `RocksDB`.
/// Represents a destination for writing data, either to database or static files.
#[derive(Debug, Display)]
pub enum EitherWriter<'a, CURSOR, N> {
/// Write to database table via cursor
Database(CURSOR),
/// Write to static file
StaticFile(StaticFileProviderRWRefMut<'a, N>),
/// Write to `RocksDB` (transaction or batch - internal detail).
///
/// Uses [`RocksDBWriteMode`] to encapsulate the choice between full transaction
/// semantics and high-throughput batch writes.
#[cfg(all(unix, feature = "rocksdb"))]
RocksDB(RocksDBWriteMode<'a>),
}
impl<'a> EitherWriter<'a, (), ()> {
@@ -129,65 +109,6 @@ impl<'a> EitherWriter<'a, (), ()> {
))
}
}
/// Creates a new [`EitherWriter`] for storages history based on storage settings.
///
/// Accepts either Transaction or Batch mode via [`RocksDBWriteMode`].
pub fn new_storages_history<P>(
provider: &P,
_rocksdb_mode: RocksWriteModeArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTxMut,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storages_history_in_rocksdb {
return Ok(EitherWriter::RocksDB(_rocksdb_mode));
}
Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
}
/// Creates a new [`EitherWriter`] for accounts history based on storage settings.
///
/// Accepts either Transaction or Batch mode via [`RocksDBWriteMode`].
pub fn new_accounts_history<P>(
provider: &P,
_rocksdb_mode: RocksWriteModeArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTxMut,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().account_history_in_rocksdb {
return Ok(EitherWriter::RocksDB(_rocksdb_mode));
}
Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::AccountsHistory>()?))
}
/// Creates a new [`EitherWriter`] for transaction hash numbers based on storage settings.
///
/// Accepts either Transaction or Batch mode via [`RocksDBWriteMode`].
pub fn new_transaction_hash_numbers<P>(
provider: &P,
_rocksdb_mode: RocksWriteModeArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTxMut,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
return Ok(EitherWriter::RocksDB(_rocksdb_mode));
}
Ok(EitherWriter::Database(
provider.tx_ref().cursor_write::<tables::TransactionHashNumbers>()?,
))
}
}
impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
@@ -198,8 +119,6 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
match self {
Self::Database(_) => Ok(()),
Self::StaticFile(writer) => writer.increment_block(expected_block_number),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -213,26 +132,6 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
match self {
Self::Database(_) => Ok(()),
Self::StaticFile(writer) => writer.ensure_at_block(block_number),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
/// Commits `RocksDB` writes if this is a `RocksDB` writer.
///
/// For [`Self::Database`] and [`Self::StaticFile`], this is a no-op as they use
/// different commit patterns (MDBX transaction commit, static file sync).
///
/// # Commit Order
///
/// Call this AFTER the outer MDBX transaction commits successfully. This ensures
/// that if `RocksDB` commit fails, the primary data (MDBX) is still intact and
/// the `RocksDB` data (which is derived) can be rebuilt.
#[cfg(all(unix, feature = "rocksdb"))]
pub fn commit(self) -> ProviderResult<()> {
match self {
Self::Database(_) | Self::StaticFile(_) => Ok(()),
Self::RocksDB(mode) => mode.commit(),
}
}
}
@@ -247,8 +146,6 @@ where
match self {
Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
}
@@ -262,8 +159,6 @@ where
match self {
Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -280,8 +175,6 @@ where
Ok(())
}
Self::StaticFile(writer) => writer.append_transaction_senders(senders),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -313,87 +206,41 @@ where
writer.prune_transaction_senders(to_delete, block)?;
}
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
}
Ok(())
}
}
/// Represents a source for reading data, either from database, static files, or `RocksDB`.
/// Represents a source for reading data, either from database or static files.
#[derive(Debug, Display)]
pub enum EitherReader<'a, CURSOR, N> {
pub enum EitherReader<CURSOR, N> {
/// Read from database table via cursor
Database(CURSOR, PhantomData<&'a ()>),
Database(CURSOR),
/// Read from static file
StaticFile(StaticFileProvider<N>, PhantomData<&'a ()>),
/// Read from `RocksDB` transaction
#[cfg(all(unix, feature = "rocksdb"))]
RocksDB(&'a crate::providers::rocksdb::RocksTx<'a>),
StaticFile(StaticFileProvider<N>),
}
impl<'a> EitherReader<'a, (), ()> {
impl EitherReader<(), ()> {
/// Creates a new [`EitherReader`] for senders based on storage settings.
pub fn new_senders<P>(
provider: &P,
) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionSenders>>
) -> ProviderResult<EitherReaderTy<P, tables::TransactionSenders>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
P::Tx: DbTx,
{
if EitherWriterDestination::senders(provider).is_static_file() {
Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
Ok(EitherReader::StaticFile(provider.static_file_provider()))
} else {
Ok(EitherReader::Database(
provider.tx_ref().cursor_read::<tables::TransactionSenders>()?,
PhantomData,
))
}
}
/// Creates a new [`EitherReader`] for storages history based on storage settings.
pub fn new_storages_history<P>(
provider: &P,
_rocksdb_tx: RocksTxRefArg<'a>,
) -> ProviderResult<EitherReaderTy<'a, P, tables::StoragesHistory>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTx,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storages_history_in_rocksdb {
return Ok(EitherReader::RocksDB(_rocksdb_tx));
}
Ok(EitherReader::Database(
provider.tx_ref().cursor_read::<tables::StoragesHistory>()?,
PhantomData,
))
}
/// Creates a new [`EitherReader`] for transaction hash numbers based on storage settings.
pub fn new_transaction_hash_numbers<P>(
provider: &P,
_rocksdb_tx: RocksTxRefArg<'a>,
) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionHashNumbers>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTx,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
return Ok(EitherReader::RocksDB(_rocksdb_tx));
}
Ok(EitherReader::Database(
provider.tx_ref().cursor_read::<tables::TransactionHashNumbers>()?,
PhantomData,
))
}
}
impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
impl<CURSOR, N: NodePrimitives> EitherReader<CURSOR, N>
where
CURSOR: DbCursorRO<tables::TransactionSenders>,
{
@@ -403,11 +250,11 @@ where
range: Range<TxNumber>,
) -> ProviderResult<HashMap<TxNumber, Address>> {
match self {
Self::Database(cursor, _) => cursor
Self::Database(cursor) => cursor
.walk_range(range)?
.map(|result| result.map_err(ProviderError::from))
.collect::<ProviderResult<HashMap<_, _>>>(),
Self::StaticFile(provider, _) => range
Self::StaticFile(provider) => range
.clone()
.zip(provider.fetch_range_iter(
StaticFileSegment::TransactionSenders,
@@ -419,8 +266,6 @@ where
Some(result.map(|sender| (tx_num, sender)))
})
.collect::<ProviderResult<HashMap<_, _>>>(),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
}
@@ -432,8 +277,6 @@ pub enum EitherWriterDestination {
Database,
/// Write to static file
StaticFile,
/// Write to `RocksDB`
RocksDB,
}
impl EitherWriterDestination {
@@ -493,9 +336,9 @@ mod tests {
let provider = factory.database_provider_ro().unwrap();
let mut reader = EitherReader::new_senders(&provider).unwrap();
if transaction_senders_in_static_files {
assert!(matches!(reader, EitherReader::StaticFile(_, _)));
assert!(matches!(reader, EitherReader::StaticFile(_)));
} else {
assert!(matches!(reader, EitherReader::Database(_, _)));
assert!(matches!(reader, EitherReader::Database(_)));
}
assert_eq!(

View File

@@ -32,9 +32,9 @@ pub use consistent::ConsistentProvider;
// RocksDB currently only supported on Unix platforms
// Windows support is planned for future releases
#[cfg(all(unix, feature = "rocksdb"))]
pub(crate) mod rocksdb;
mod rocksdb;
#[cfg(all(unix, feature = "rocksdb"))]
pub use rocksdb::{RocksDBBuilder, RocksDBProvider, RocksDBWriteMode, RocksTx};
pub use rocksdb::{RocksDBBuilder, RocksDBProvider};
/// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy
/// [`ProviderNodeTypes`].

View File

@@ -2,7 +2,4 @@
mod metrics;
mod provider;
pub use provider::{RocksDBBuilder, RocksDBProvider, RocksDBWriteMode, RocksTx};
#[cfg(test)]
pub(crate) use provider::RocksDBBatch;
pub use provider::{RocksDBBuilder, RocksDBProvider};

View File

@@ -8,9 +8,8 @@ use reth_storage_errors::{
provider::{ProviderError, ProviderResult},
};
use rocksdb::{
BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
IteratorMode, Options, Transaction, TransactionDB, TransactionDBOptions, TransactionOptions,
WriteBatchWithTransaction, WriteOptions,
BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType, Options,
WriteBatch, DB,
};
use std::{
fmt,
@@ -18,7 +17,6 @@ use std::{
sync::Arc,
time::Instant,
};
use tracing::warn;
/// Default cache size for `RocksDB` block cache (128 MB).
const DEFAULT_CACHE_SIZE: usize = 128 << 20;
@@ -179,15 +177,7 @@ impl RocksDBBuilder {
})
.collect();
// Use TransactionDB for MDBX-like transaction semantics (read-your-writes, rollback)
let txn_db_options = TransactionDBOptions::default();
let db = TransactionDB::open_cf_descriptors(
&options,
&txn_db_options,
&self.path,
cf_descriptors,
)
.map_err(|e| {
let db = DB::open_cf_descriptors(&options, &self.path, cf_descriptors).map_err(|e| {
ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
@@ -219,22 +209,14 @@ macro_rules! compress_to_buf_or_ref {
pub struct RocksDBProvider(Arc<RocksDBProviderInner>);
/// Inner state for `RocksDB` provider.
#[derive(Debug)]
struct RocksDBProviderInner {
/// `RocksDB` database instance with transaction support.
db: TransactionDB,
/// `RocksDB` database instance.
db: DB,
/// Metrics latency & operations.
metrics: Option<RocksDBMetrics>,
}
impl fmt::Debug for RocksDBProviderInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RocksDBProviderInner")
.field("db", &"<TransactionDB>")
.field("metrics", &self.metrics)
.finish()
}
}
impl Clone for RocksDBProvider {
fn clone(&self) -> Self {
Self(self.0.clone())
@@ -252,34 +234,6 @@ impl RocksDBProvider {
RocksDBBuilder::new(path)
}
/// Creates a new transaction with MDBX-like semantics (read-your-writes, rollback).
pub fn tx(&self) -> RocksTx<'_> {
let write_options = WriteOptions::default();
let txn_options = TransactionOptions::default();
let inner = self.0.db.transaction_opt(&write_options, &txn_options);
RocksTx { inner, provider: self }
}
/// Creates a new batch for manual commit.
///
/// Use [`Self::write_batch`] for closure-based atomic writes.
/// Use this method when the batch needs to be held by [`EitherWriter`](crate::EitherWriter).
///
/// # Example
///
/// ```ignore
/// let batch = provider.batch();
/// batch.put::<SomeTable>(key, &value)?;
/// batch.commit()?;
/// ```
pub fn batch(&self) -> RocksDBBatch<'_> {
RocksDBBatch {
provider: self,
inner: WriteBatchWithTransaction::<true>::default(),
buf: Vec::new(),
}
}
/// Gets the column family handle for a table.
fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
self.0
@@ -377,17 +331,12 @@ impl RocksDBProvider {
{
// Note: Using "Batch" as table name for batch operations across multiple tables
self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
let mut batch_handle = RocksDBBatch {
provider: this,
inner: WriteBatchWithTransaction::<true>::default(),
buf: Vec::new(),
};
let mut batch_handle =
RocksDBBatch { provider: this, inner: WriteBatch::default(), buf: Vec::new() };
f(&mut batch_handle)?;
// Take ownership of inner to prevent Drop from logging a warning
let batch = std::mem::take(&mut batch_handle.inner);
this.0.db.write(batch).map_err(|e| {
this.0.db.write(batch_handle.inner).map_err(|e| {
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
@@ -398,19 +347,17 @@ impl RocksDBProvider {
}
/// Handle for building a batch of operations atomically.
///
/// Uses `WriteBatchWithTransaction<true>` for compatibility with `TransactionDB`.
pub struct RocksDBBatch<'a> {
provider: &'a RocksDBProvider,
inner: WriteBatchWithTransaction<true>,
inner: WriteBatch,
buf: Vec<u8>,
}
impl fmt::Debug for RocksDBBatch<'_> {
impl<'a> fmt::Debug for RocksDBBatch<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RocksDBBatch")
.field("provider", &self.provider)
.field("batch", &"<WriteBatchWithTransaction>")
.field("batch", &"<WriteBatch>")
// Number of operations in this batch
.field("length", &self.inner.len())
// Total serialized size (encoded key + compressed value + metadata) of this batch
@@ -443,250 +390,6 @@ impl<'a> RocksDBBatch<'a> {
self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
Ok(())
}
/// Commits the batch to the database.
///
/// This consumes the batch and writes all operations atomically to `RocksDB`.
///
/// # Errors
///
/// Returns an error if the write fails.
pub fn commit(mut self) -> ProviderResult<()> {
// Take ownership of inner to prevent Drop from logging a warning
let batch = std::mem::take(&mut self.inner);
self.provider.0.db.write(batch).map_err(|e| {
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})
}
/// Returns the number of operations in this batch.
pub fn len(&self) -> usize {
self.inner.len()
}
/// Returns `true` if the batch contains no operations.
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
impl Drop for RocksDBBatch<'_> {
fn drop(&mut self) {
if !self.inner.is_empty() {
warn!(
target: "reth::storage",
batch_len = %self.inner.len(),
"RocksDBBatch dropped without commit - data discarded"
);
}
}
}
/// `RocksDB` transaction wrapper providing MDBX-like semantics.
///
/// Supports:
/// - Read-your-writes: reads see uncommitted writes within the same transaction
/// - Atomic commit/rollback
/// - Iteration over uncommitted data
///
/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
pub struct RocksTx<'db> {
inner: Transaction<'db, TransactionDB>,
provider: &'db RocksDBProvider,
}
impl fmt::Debug for RocksTx<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RocksTx").field("provider", &self.provider).finish_non_exhaustive()
}
}
impl<'db> RocksTx<'db> {
/// Gets a value from the specified table. Sees uncommitted writes in this transaction.
pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
let encoded_key = key.encode();
self.get_encoded::<T>(&encoded_key)
}
/// Gets a value using pre-encoded key. Sees uncommitted writes in this transaction.
pub fn get_encoded<T: Table>(
&self,
key: &<T::Key as Encode>::Encoded,
) -> ProviderResult<Option<T::Value>> {
let cf = self.provider.get_cf_handle::<T>()?;
let result = self.inner.get_cf(cf, key.as_ref()).map_err(|e| {
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})?;
Ok(result.and_then(|value| T::Value::decompress(&value).ok()))
}
/// Puts a value into the specified table.
pub fn put<T: Table>(&self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
let encoded_key = key.encode();
self.put_encoded::<T>(&encoded_key, value)
}
/// Puts a value using pre-encoded key.
pub fn put_encoded<T: Table>(
&self,
key: &<T::Key as Encode>::Encoded,
value: &T::Value,
) -> ProviderResult<()> {
let cf = self.provider.get_cf_handle::<T>()?;
let mut buf = Vec::new();
let value_bytes = compress_to_buf_or_ref!(buf, value).unwrap_or(&buf);
self.inner.put_cf(cf, key.as_ref(), value_bytes).map_err(|e| {
ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
operation: DatabaseWriteOperation::PutUpsert,
table_name: T::NAME,
key: key.as_ref().to_vec(),
})))
})
}
/// Deletes a value from the specified table.
pub fn delete<T: Table>(&self, key: T::Key) -> ProviderResult<()> {
let cf = self.provider.get_cf_handle::<T>()?;
self.inner.delete_cf(cf, key.encode().as_ref()).map_err(|e| {
ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})
}
/// Creates an iterator for the specified table. Sees uncommitted writes in this transaction.
///
/// Returns an iterator that yields `(encoded_key, compressed_value)` pairs.
pub fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
let cf = self.provider.get_cf_handle::<T>()?;
let iter = self.inner.iterator_cf(cf, IteratorMode::Start);
Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
}
/// Creates an iterator starting from the given key (inclusive).
pub fn iter_from<T: Table>(&self, key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
let cf = self.provider.get_cf_handle::<T>()?;
let encoded_key = key.encode();
let iter = self
.inner
.iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward));
Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
}
/// Commits the transaction, persisting all changes.
pub fn commit(self) -> ProviderResult<()> {
self.inner.commit().map_err(|e| {
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})
}
/// Rolls back the transaction, discarding all changes.
pub fn rollback(self) -> ProviderResult<()> {
self.inner.rollback().map_err(|e| {
ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
})
}
}
/// Iterator over a `RocksDB` table within a transaction.
///
/// Yields decoded `(Key, Value)` pairs. Sees uncommitted writes.
pub struct RocksTxIter<'tx, T: Table> {
inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, TransactionDB>>,
_marker: std::marker::PhantomData<T>,
}
impl<T: Table> fmt::Debug for RocksTxIter<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RocksTxIter").field("table", &T::NAME).finish_non_exhaustive()
}
}
impl<T: Table> Iterator for RocksTxIter<'_, T> {
type Item = ProviderResult<(T::Key, T::Value)>;
fn next(&mut self) -> Option<Self::Item> {
let (key_bytes, value_bytes) = match self.inner.next()? {
Ok(kv) => kv,
Err(e) => {
return Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))))
}
};
// Decode key
let key = match <T::Key as reth_db_api::table::Decode>::decode(&key_bytes) {
Ok(k) => k,
Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
};
// Decompress value
let value = match T::Value::decompress(&value_bytes) {
Ok(v) => v,
Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
};
Some(Ok((key, value)))
}
}
/// `RocksDB` write strategy - internal implementation detail.
///
/// This enum encapsulates the choice between full transaction semantics
/// and high-throughput batch writes. Use [`RocksDBWriteMode::Transaction`] for
/// read-modify-write operations that need read-your-writes semantics.
/// Use [`RocksDBWriteMode::Batch`] for bulk sync operations where
/// read-your-writes is not needed.
pub enum RocksDBWriteMode<'a> {
/// Full transaction with read-your-writes, rollback support.
/// Use for read-modify-write operations.
Transaction(RocksTx<'a>),
/// Write-only batch for maximum throughput.
/// Use for bulk sync operations where read-your-writes is not needed.
Batch(RocksDBBatch<'a>),
}
impl fmt::Debug for RocksDBWriteMode<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Transaction(tx) => f.debug_tuple("Transaction").field(tx).finish(),
Self::Batch(batch) => f.debug_tuple("Batch").field(batch).finish(),
}
}
}
impl<'a> RocksDBWriteMode<'a> {
/// Puts a value into the specified table.
pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
match self {
Self::Transaction(tx) => tx.put::<T>(key, value),
Self::Batch(batch) => batch.put::<T>(key, value),
}
}
/// Commits the transaction or batch.
pub fn commit(self) -> ProviderResult<()> {
match self {
Self::Transaction(tx) => tx.commit(),
Self::Batch(batch) => batch.commit(),
}
}
}
/// Converts Reth's [`LogLevel`] to `RocksDB`'s [`rocksdb::LogLevel`].
@@ -821,321 +524,44 @@ mod tests {
#[test]
fn test_statistics_enabled() {
let temp_dir = TempDir::new().unwrap();
// Just verify that building with statistics doesn't panic
let provider = RocksDBBuilder::new(temp_dir.path())
.with_table::<TestTable>()
.with_statistics()
.build()
.unwrap();
// Do operations - data should be immediately readable with TransactionDB
// Do operations
for i in 0..10 {
let value = vec![i as u8];
provider.put::<TestTable>(i, &value).unwrap();
// Verify write is visible
assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
}
// Verify statistics enabled
let stats = provider.0.db.property_value("rocksdb.stats").unwrap();
assert!(stats.is_some(), "Statistics should be enabled");
let stats_str = stats.unwrap();
assert!(stats_str.contains("DB Stats"));
}
#[test]
fn test_data_persistence() {
fn test_compression_after_flush() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Insert data - TransactionDB writes are immediately visible
// Insert compressible data
let value = vec![42u8; 1000];
for i in 0..100 {
provider.put::<TestTable>(i, &value).unwrap();
}
// Verify data is readable
// Get CF handle and flush it
let cf = provider.0.db.cf_handle("TestTable").expect("CF should exist");
provider.0.db.flush_cf(cf).unwrap();
// Verify data is persisted by reading it back
for i in 0..100 {
assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be readable");
assert!(provider.get::<TestTable>(i).unwrap().is_some(), "Data should be persisted");
}
}
#[test]
fn test_transaction_read_your_writes() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Create a transaction
let tx = provider.tx();
// Write data within the transaction
let key = 42u64;
let value = b"test_value".to_vec();
tx.put::<TestTable>(key, &value).unwrap();
// Read-your-writes: should see uncommitted data in same transaction
let result = tx.get::<TestTable>(key).unwrap();
assert_eq!(
result,
Some(value.clone()),
"Transaction should see its own uncommitted writes"
);
// Data should NOT be visible via provider (outside transaction)
let provider_result = provider.get::<TestTable>(key).unwrap();
assert_eq!(provider_result, None, "Uncommitted data should not be visible outside tx");
// Commit the transaction
tx.commit().unwrap();
// Now data should be visible via provider
let committed_result = provider.get::<TestTable>(key).unwrap();
assert_eq!(committed_result, Some(value), "Committed data should be visible");
}
#[test]
fn test_transaction_rollback() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// First, put some initial data
let key = 100u64;
let initial_value = b"initial".to_vec();
provider.put::<TestTable>(key, &initial_value).unwrap();
// Create a transaction and modify data
let tx = provider.tx();
let new_value = b"modified".to_vec();
tx.put::<TestTable>(key, &new_value).unwrap();
// Verify modification is visible within transaction
assert_eq!(tx.get::<TestTable>(key).unwrap(), Some(new_value));
// Rollback instead of commit
tx.rollback().unwrap();
// Data should be unchanged (initial value)
let result = provider.get::<TestTable>(key).unwrap();
assert_eq!(result, Some(initial_value), "Rollback should preserve original data");
}
#[test]
fn test_transaction_iterator() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Create a transaction
let tx = provider.tx();
// Write multiple entries
for i in 0..5u64 {
let value = format!("value_{i}").into_bytes();
tx.put::<TestTable>(i, &value).unwrap();
}
// Iterate - should see uncommitted writes
let mut count = 0;
for result in tx.iter::<TestTable>().unwrap() {
let (key, value) = result.unwrap();
assert_eq!(value, format!("value_{key}").into_bytes());
count += 1;
}
assert_eq!(count, 5, "Iterator should see all uncommitted writes");
// Commit
tx.commit().unwrap();
}
#[test]
fn test_batch_manual_commit() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Create a batch via provider.batch()
let mut batch = provider.batch();
// Add entries
for i in 0..10u64 {
let value = format!("batch_value_{i}").into_bytes();
batch.put::<TestTable>(i, &value).unwrap();
}
// Verify len/is_empty
assert_eq!(batch.len(), 10);
assert!(!batch.is_empty());
// Data should NOT be visible before commit
assert_eq!(provider.get::<TestTable>(0).unwrap(), None);
// Commit the batch
batch.commit().unwrap();
// Now data should be visible
for i in 0..10u64 {
let value = format!("batch_value_{i}").into_bytes();
assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
}
}
#[test]
fn test_write_mode_batch() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Create write mode using Batch
let batch = provider.batch();
let mut mode = RocksDBWriteMode::Batch(batch);
// Write via RocksDBWriteMode
let key = 42u64;
let value = b"test_via_mode".to_vec();
mode.put::<TestTable>(key, &value).unwrap();
// Commit via RocksDBWriteMode
mode.commit().unwrap();
// Verify data is visible
assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(value));
}
#[test]
fn test_write_mode_transaction() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Create write mode using Transaction
let tx = provider.tx();
let mut mode = RocksDBWriteMode::Transaction(tx);
// Write via RocksDBWriteMode
let key = 100u64;
let value = b"test_via_tx_mode".to_vec();
mode.put::<TestTable>(key, &value).unwrap();
// Commit via RocksDBWriteMode
mode.commit().unwrap();
// Verify data is visible
assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(value));
}
#[test]
fn test_batch_empty_commit() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Create an empty batch
let batch = provider.batch();
assert!(batch.is_empty());
assert_eq!(batch.len(), 0);
// Commit should succeed (no-op)
batch.commit().unwrap();
}
#[test]
fn test_batch_drop_without_commit_does_not_persist() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Create a batch and add entries but DON'T commit
{
let mut batch = provider.batch();
for i in 0..5u64 {
let value = format!("dropped_value_{i}").into_bytes();
batch.put::<TestTable>(i, &value).unwrap();
}
// batch dropped here without commit - should log warning
}
// Data should NOT be persisted
for i in 0..5u64 {
assert_eq!(
provider.get::<TestTable>(i).unwrap(),
None,
"Dropped batch should not persist data"
);
}
}
#[test]
fn test_write_mode_debug_formatting() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Test Batch variant debug output
let batch = provider.batch();
let mode_batch = RocksDBWriteMode::Batch(batch);
let debug_str = format!("{:?}", mode_batch);
assert!(debug_str.contains("Batch"), "Debug should contain 'Batch': {debug_str}");
// Test Transaction variant debug output
let tx = provider.tx();
let mode_tx = RocksDBWriteMode::Transaction(tx);
let debug_str = format!("{:?}", mode_tx);
assert!(
debug_str.contains("Transaction"),
"Debug should contain 'Transaction': {debug_str}"
);
}
#[test]
fn test_batch_delete_operation() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// First, insert some data
let mut batch = provider.batch();
for i in 0..5u64 {
batch.put::<TestTable>(i, &vec![i as u8]).unwrap();
}
batch.commit().unwrap();
// Verify data exists
for i in 0..5u64 {
assert!(provider.get::<TestTable>(i).unwrap().is_some());
}
// Now delete via batch
let mut delete_batch = provider.batch();
for i in 0..5u64 {
delete_batch.delete::<TestTable>(i).unwrap();
}
delete_batch.commit().unwrap();
// Verify data is deleted
for i in 0..5u64 {
assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
}
}
#[test]
fn test_batch_overwrite_existing_key() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
let key = 42u64;
let initial_value = b"initial".to_vec();
let updated_value = b"updated".to_vec();
// Insert initial value
let mut batch = provider.batch();
batch.put::<TestTable>(key, &initial_value).unwrap();
batch.commit().unwrap();
assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(initial_value));
// Overwrite with new value
let mut batch = provider.batch();
batch.put::<TestTable>(key, &updated_value).unwrap();
batch.commit().unwrap();
assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(updated_value));
}
}

View File

@@ -213,17 +213,17 @@ where
}
/// Converts the changed accounts to a map of sender ids to sender info (internal identifier
/// used for __tracked__ accounts)
/// used for accounts)
fn changed_senders(
&self,
accs: impl Iterator<Item = ChangedAccount>,
) -> FxHashMap<SenderId, SenderInfo> {
let identifiers = self.identifiers.read();
let mut identifiers = self.identifiers.write();
accs.into_iter()
.filter_map(|acc| {
.map(|acc| {
let ChangedAccount { address, nonce, balance } = acc;
let sender_id = identifiers.sender_id(&address)?;
Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
let sender_id = identifiers.sender_id_or_create(address);
(sender_id, SenderInfo { state_nonce: nonce, balance })
})
.collect()
}

View File

@@ -3,8 +3,7 @@
#![allow(dead_code)]
use crate::{
error::PoolErrorKind,
pool::{state::SubPool, txpool::TxPool, AddedTransaction},
pool::{txpool::TxPool, AddedTransaction},
test_utils::{MockOrdering, MockTransactionDistribution, MockTransactionFactory},
TransactionOrdering,
};
@@ -75,8 +74,6 @@ pub(crate) struct MockTransactionSimulator<R: Rng> {
executed: HashMap<Address, ExecutedScenarios>,
/// "Validates" generated transactions.
validator: MockTransactionFactory,
/// Represents the gaps in nonces for each sender.
nonce_gaps: HashMap<Address, u64>,
/// The rng instance used to select senders and scenarios.
rng: R,
}
@@ -94,23 +91,10 @@ impl<R: Rng> MockTransactionSimulator<R> {
tx_generator: config.tx_generator,
executed: Default::default(),
validator: Default::default(),
nonce_gaps: Default::default(),
rng,
}
}
/// Creates a pool configured for this simulator
///
/// This is needed because `MockPool::default()` sets `pending_basefee` to 7, but we might want
/// to use different values
pub(crate) fn create_pool(&self) -> MockPool {
let mut pool = MockPool::default();
let mut info = pool.block_info();
info.pending_basefee = self.base_fee as u64;
pool.set_block_info(info);
pool
}
/// Returns a random address from the senders set
fn rng_address(&mut self) -> Address {
let idx = self.rng.random_range(0..self.senders.len());
@@ -132,20 +116,17 @@ impl<R: Rng> MockTransactionSimulator<R> {
match scenario {
ScenarioType::OnchainNonce => {
// uses fee from fee_ranges
let tx = self.tx_generator.tx(on_chain_nonce, &mut self.rng).with_sender(sender);
let tx = self
.tx_generator
.tx(on_chain_nonce, &mut self.rng)
.with_gas_price(self.base_fee);
let valid_tx = self.validator.validated(tx);
let res =
match pool.add_transaction(valid_tx, on_chain_balance, on_chain_nonce, None) {
Ok(res) => res,
Err(e) => match e.kind {
// skip pool capacity/replacement errors (not relevant)
PoolErrorKind::SpammerExceededCapacity(_) |
PoolErrorKind::ReplacementUnderpriced => return,
_ => panic!("unexpected error: {e:?}"),
},
};
pool.add_transaction(valid_tx, on_chain_balance, on_chain_nonce, None).unwrap();
// TODO(mattsse): need a way expect based on the current state of the pool and tx
// settings
match res {
AddedTransaction::Pending(_) => {}
@@ -154,173 +135,15 @@ impl<R: Rng> MockTransactionSimulator<R> {
}
}
self.executed
.entry(sender)
.or_insert_with(|| ExecutedScenarios { sender, scenarios: vec![] }) // in the case of a new sender
.scenarios
.push(ExecutedScenario {
balance: on_chain_balance,
nonce: on_chain_nonce,
scenario: Scenario::OnchainNonce { nonce: on_chain_nonce },
});
self.nonces.insert(sender, on_chain_nonce + 1);
// TODO(mattsse): check subpools
}
ScenarioType::HigherNonce { skip } => {
// if this sender already has a nonce gap, skip
if self.nonce_gaps.contains_key(&sender) {
return;
}
let higher_nonce = on_chain_nonce + skip;
// uses fee from fee_ranges
let tx = self.tx_generator.tx(higher_nonce, &mut self.rng).with_sender(sender);
let valid_tx = self.validator.validated(tx);
let res =
match pool.add_transaction(valid_tx, on_chain_balance, on_chain_nonce, None) {
Ok(res) => res,
Err(e) => match e.kind {
// skip pool capacity/replacement errors (not relevant)
PoolErrorKind::SpammerExceededCapacity(_) |
PoolErrorKind::ReplacementUnderpriced => return,
_ => panic!("unexpected error: {e:?}"),
},
};
match res {
AddedTransaction::Pending(_) => {
panic!("expected parked")
}
AddedTransaction::Parked { subpool, .. } => {
assert_eq!(
subpool,
SubPool::Queued,
"expected to be moved to queued subpool"
);
}
}
self.executed
.entry(sender)
.or_insert_with(|| ExecutedScenarios { sender, scenarios: vec![] }) // in the case of a new sender
.scenarios
.push(ExecutedScenario {
balance: on_chain_balance,
nonce: on_chain_nonce,
scenario: Scenario::HigherNonce {
onchain: on_chain_nonce,
nonce: higher_nonce,
},
});
self.nonce_gaps.insert(sender, higher_nonce);
}
ScenarioType::BelowBaseFee { fee } => {
// fee should be in [MIN_PROTOCOL_BASE_FEE, base_fee)
let tx = self
.tx_generator
.tx(on_chain_nonce, &mut self.rng)
.with_sender(sender)
.with_gas_price(fee);
let valid_tx = self.validator.validated(tx);
let res =
match pool.add_transaction(valid_tx, on_chain_balance, on_chain_nonce, None) {
Ok(res) => res,
Err(e) => match e.kind {
// skip pool capacity/replacement errors (not relevant)
PoolErrorKind::SpammerExceededCapacity(_) |
PoolErrorKind::ReplacementUnderpriced => return,
_ => panic!("unexpected error: {e:?}"),
},
};
match res {
AddedTransaction::Pending(_) => panic!("expected parked"),
AddedTransaction::Parked { subpool, .. } => {
assert_eq!(
subpool,
SubPool::BaseFee,
"expected to be moved to base fee subpool"
);
}
}
self.executed
.entry(sender)
.or_insert_with(|| ExecutedScenarios { sender, scenarios: vec![] }) // in the case of a new sender
.scenarios
.push(ExecutedScenario {
balance: on_chain_balance,
nonce: on_chain_nonce,
scenario: Scenario::BelowBaseFee { fee },
});
}
ScenarioType::FillNonceGap => {
if self.nonce_gaps.is_empty() {
return;
}
let gap_senders: Vec<Address> = self.nonce_gaps.keys().copied().collect();
let idx = self.rng.random_range(0..gap_senders.len());
let gap_sender = gap_senders[idx];
let queued_nonce = self.nonce_gaps[&gap_sender];
let sender_onchain_nonce = self.nonces[&gap_sender];
let sender_balance = self.balances[&gap_sender];
for fill_nonce in sender_onchain_nonce..queued_nonce {
let tx =
self.tx_generator.tx(fill_nonce, &mut self.rng).with_sender(gap_sender);
let valid_tx = self.validator.validated(tx);
let res = match pool.add_transaction(
valid_tx,
sender_balance,
sender_onchain_nonce,
None,
) {
Ok(res) => res,
Err(e) => match e.kind {
// skip pool capacity/replacement errors (not relevant)
PoolErrorKind::SpammerExceededCapacity(_) |
PoolErrorKind::ReplacementUnderpriced => return,
_ => panic!("unexpected error: {e:?}"),
},
};
match res {
AddedTransaction::Pending(_) => {}
AddedTransaction::Parked { .. } => {
panic!("expected pending when filling gap")
}
}
self.executed
.entry(gap_sender)
.or_insert_with(|| ExecutedScenarios {
sender: gap_sender,
scenarios: vec![],
})
.scenarios
.push(ExecutedScenario {
balance: sender_balance,
nonce: fill_nonce,
scenario: Scenario::FillNonceGap {
filled_nonce: fill_nonce,
promoted_nonce: queued_nonce,
},
});
}
self.nonces.insert(gap_sender, queued_nonce + 1);
self.nonce_gaps.remove(&gap_sender);
ScenarioType::HigherNonce { .. } => {
unimplemented!()
}
}
// make sure everything is set
pool.enforce_invariants();
pool.enforce_invariants()
}
}
@@ -349,8 +172,6 @@ impl MockSimulatorConfig {
pub(crate) enum ScenarioType {
OnchainNonce,
HigherNonce { skip: u64 },
BelowBaseFee { fee: u128 },
FillNonceGap,
}
/// The actual scenario, ready to be executed
@@ -365,12 +186,10 @@ pub(crate) enum Scenario {
OnchainNonce { nonce: u64 },
/// Send a tx with a higher nonce that what the sender has on chain
HigherNonce { onchain: u64, nonce: u64 },
/// Send a tx with a base fee below the base fee of the pool
BelowBaseFee { fee: u128 },
/// Fill a nonce gap to promote queued transactions
FillNonceGap { filled_nonce: u64, promoted_nonce: u64 },
/// Execute multiple test scenarios
Multi { scenario: Vec<Self> },
Multi {
// Execute multiple test scenarios
scenario: Vec<Self>,
},
}
/// Represents an executed scenario
@@ -407,18 +226,17 @@ mod tests {
blob_pct: 0,
};
let base_fee = 10u128;
let fee_ranges = MockFeeRange {
gas_price: (base_fee..100).try_into().unwrap(),
priority_fee: (1u128..10).try_into().unwrap(),
max_fee: (base_fee..110).try_into().unwrap(),
gas_price: (10u128..100).try_into().unwrap(),
priority_fee: (10u128..100).try_into().unwrap(),
max_fee: (100u128..110).try_into().unwrap(),
max_fee_blob: (1u128..100).try_into().unwrap(),
};
let config = MockSimulatorConfig {
num_senders: 10,
scenarios: vec![ScenarioType::OnchainNonce],
base_fee,
base_fee: 10,
tx_generator: MockTransactionDistribution::new(
transaction_ratio,
fee_ranges,
@@ -427,181 +245,8 @@ mod tests {
),
};
let mut simulator = MockTransactionSimulator::new(rand::rng(), config);
let mut pool = simulator.create_pool();
let mut pool = MockPool::default();
simulator.next(&mut pool);
assert_eq!(pool.pending().len(), 1);
assert_eq!(pool.queued().len(), 0);
assert_eq!(pool.base_fee().len(), 0);
}
#[test]
fn test_higher_nonce_scenario() {
let transaction_ratio = MockTransactionRatio {
legacy_pct: 30,
dynamic_fee_pct: 70,
access_list_pct: 0,
blob_pct: 0,
};
let base_fee = 10u128;
let fee_ranges = MockFeeRange {
gas_price: (base_fee..100).try_into().unwrap(),
priority_fee: (1u128..10).try_into().unwrap(),
max_fee: (base_fee..110).try_into().unwrap(),
max_fee_blob: (1u128..100).try_into().unwrap(),
};
let config = MockSimulatorConfig {
num_senders: 10,
scenarios: vec![ScenarioType::HigherNonce { skip: 1 }],
base_fee,
tx_generator: MockTransactionDistribution::new(
transaction_ratio,
fee_ranges,
10..100,
10..100,
),
};
let mut simulator = MockTransactionSimulator::new(rand::rng(), config);
let mut pool = simulator.create_pool();
simulator.next(&mut pool);
assert_eq!(pool.pending().len(), 0);
assert_eq!(pool.queued().len(), 1);
assert_eq!(pool.base_fee().len(), 0);
}
#[test]
fn test_below_base_fee_scenario() {
let transaction_ratio = MockTransactionRatio {
legacy_pct: 30,
dynamic_fee_pct: 70,
access_list_pct: 0,
blob_pct: 0,
};
let base_fee = 10u128;
let fee_ranges = MockFeeRange {
gas_price: (base_fee..100).try_into().unwrap(),
priority_fee: (1u128..10).try_into().unwrap(),
max_fee: (base_fee..110).try_into().unwrap(),
max_fee_blob: (1u128..100).try_into().unwrap(),
};
let config = MockSimulatorConfig {
num_senders: 10,
scenarios: vec![ScenarioType::BelowBaseFee { fee: 8 }], /* fee should be in
* [MIN_PROTOCOL_BASE_FEE,
* base_fee) */
base_fee,
tx_generator: MockTransactionDistribution::new(
transaction_ratio,
fee_ranges,
10..100,
10..100,
),
};
let mut simulator = MockTransactionSimulator::new(rand::rng(), config);
let mut pool = simulator.create_pool();
simulator.next(&mut pool);
assert_eq!(pool.pending().len(), 0);
assert_eq!(pool.queued().len(), 0);
assert_eq!(pool.base_fee().len(), 1);
}
#[test]
fn test_fill_nonce_gap_scenario() {
let transaction_ratio = MockTransactionRatio {
legacy_pct: 30,
dynamic_fee_pct: 70,
access_list_pct: 0,
blob_pct: 0,
};
let base_fee = 10u128;
let fee_ranges = MockFeeRange {
gas_price: (base_fee..100).try_into().unwrap(),
priority_fee: (1u128..10).try_into().unwrap(),
max_fee: (base_fee..110).try_into().unwrap(),
max_fee_blob: (1u128..100).try_into().unwrap(),
};
let config = MockSimulatorConfig {
num_senders: 5,
scenarios: vec![ScenarioType::HigherNonce { skip: 5 }],
base_fee,
tx_generator: MockTransactionDistribution::new(
transaction_ratio,
fee_ranges,
10..100,
10..100,
),
};
let mut simulator = MockTransactionSimulator::new(rand::rng(), config);
let mut pool = simulator.create_pool();
// create some nonce gaps
for _ in 0..10 {
simulator.next(&mut pool);
}
let num_gaps = simulator.nonce_gaps.len();
assert_eq!(pool.pending().len(), 0);
assert_eq!(pool.queued().len(), num_gaps);
assert_eq!(pool.base_fee().len(), 0);
simulator.scenarios = vec![ScenarioType::FillNonceGap];
for _ in 0..num_gaps {
simulator.next(&mut pool);
}
let expected_pending = num_gaps * 6;
assert_eq!(pool.pending().len(), expected_pending);
assert_eq!(pool.queued().len(), 0);
assert_eq!(pool.base_fee().len(), 0);
}
#[test]
fn test_random_scenarios() {
let transaction_ratio = MockTransactionRatio {
legacy_pct: 30,
dynamic_fee_pct: 70,
access_list_pct: 0,
blob_pct: 0,
};
let base_fee = 10u128;
let fee_ranges = MockFeeRange {
gas_price: (base_fee..100).try_into().unwrap(),
priority_fee: (1u128..10).try_into().unwrap(),
max_fee: (base_fee..110).try_into().unwrap(),
max_fee_blob: (1u128..100).try_into().unwrap(),
};
let config = MockSimulatorConfig {
num_senders: 10,
scenarios: vec![
ScenarioType::OnchainNonce,
ScenarioType::HigherNonce { skip: 2 },
ScenarioType::BelowBaseFee { fee: 8 },
ScenarioType::FillNonceGap,
],
base_fee,
tx_generator: MockTransactionDistribution::new(
transaction_ratio,
fee_ranges,
10..100,
10..100,
),
};
let mut simulator = MockTransactionSimulator::new(rand::rng(), config);
let mut pool = simulator.create_pool();
for _ in 0..1000 {
simulator.next(&mut pool);
}
}
}

View File

@@ -31,6 +31,7 @@ nybbles = { workspace = true, features = ["rlp"] }
# reth
revm-database.workspace = true
revm-state.workspace = true
# `serde` feature
serde = { workspace = true, optional = true }
@@ -64,7 +65,6 @@ bincode.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
revm-state.workspace = true
[features]
default = ["std"]

View File

@@ -83,6 +83,19 @@ impl MultiAddedRemovedKeys {
self.storages.entry(address).or_insert_with(default_added_removed_keys);
}
}
/// Marks an account as removed.
pub fn mark_account_removed(&mut self, account: B256) {
self.account.insert_removed(account);
}
/// Marks a storage slot as removed for the given account.
pub fn mark_storage_removed(&mut self, hashed_address: B256, slot: B256) {
self.storages
.entry(hashed_address)
.or_insert_with(default_added_removed_keys)
.insert_removed(slot);
}
}
#[cfg(test)]
@@ -184,6 +197,147 @@ mod tests {
assert!(!multi_keys.get_accounts().is_removed(&addr));
}
#[test]
fn test_record_removals_is_monotonic() {
use alloy_primitives::Address;
use revm_state::{Account, AccountInfo, AccountStatus, EvmStorageSlot};
let mut multi_keys = MultiAddedRemovedKeys::new();
let address = Address::random();
let slot = U256::from(42);
let hashed_addr = keccak256(address);
let hashed_slot = keccak256(B256::from(slot));
// Update 1: Create slot with value 100
let mut update1 = EvmState::default();
update1.insert(
address,
Account {
info: AccountInfo::default(),
transaction_id: 0,
storage: std::iter::once((
slot,
EvmStorageSlot::new_changed(U256::ZERO, U256::from(100), 0),
))
.collect(),
status: AccountStatus::Touched,
},
);
multi_keys.record_removals(&update1);
// Slot should NOT be marked as removed (value is 100, not 0)
assert!(
multi_keys.get_storage(&hashed_addr).is_none() ||
!multi_keys.get_storage(&hashed_addr).unwrap().is_removed(&hashed_slot)
);
// Update 2: Delete slot (set to 0)
let mut update2 = EvmState::default();
update2.insert(
address,
Account {
info: AccountInfo::default(),
transaction_id: 1,
storage: std::iter::once((
slot,
EvmStorageSlot::new_changed(U256::from(100), U256::ZERO, 1),
))
.collect(),
status: AccountStatus::Touched,
},
);
multi_keys.record_removals(&update2);
// Slot should be marked as removed
assert!(multi_keys.get_storage(&hashed_addr).unwrap().is_removed(&hashed_slot));
// Update 3: Recreate slot with value 200
let mut update3 = EvmState::default();
update3.insert(
address,
Account {
info: AccountInfo::default(),
transaction_id: 2,
storage: std::iter::once((
slot,
EvmStorageSlot::new_changed(U256::ZERO, U256::from(200), 2),
))
.collect(),
status: AccountStatus::Touched,
},
);
multi_keys.record_removals(&update3);
// KEY ASSERTION: Still removed after recreation!
// This is the critical difference from update_with_state.
// Removals are monotonic - once removed, stays removed for proof invalidation.
assert!(
multi_keys.get_storage(&hashed_addr).unwrap().is_removed(&hashed_slot),
"slot should remain marked as removed even after recreation"
);
}
#[test]
fn test_record_removals_selfdestruct() {
use alloy_primitives::Address;
use revm_state::{Account, AccountInfo, AccountStatus};
let mut multi_keys = MultiAddedRemovedKeys::new();
let address = Address::random();
let hashed_addr = keccak256(address);
// Selfdestruct the account (must also be Touched to be processed)
let mut update = EvmState::default();
update.insert(
address,
Account {
info: AccountInfo::default(),
transaction_id: 0,
storage: Default::default(),
status: AccountStatus::SelfDestructed | AccountStatus::Touched,
},
);
multi_keys.record_removals(&update);
// Account should be marked as removed
assert!(multi_keys.get_accounts().is_removed(&hashed_addr));
}
#[test]
fn test_record_removals_ignores_untouched() {
use alloy_primitives::Address;
use revm_state::{Account, AccountInfo, AccountStatus, EvmStorageSlot};
let mut multi_keys = MultiAddedRemovedKeys::new();
let address = Address::random();
let slot = U256::from(1);
let hashed_addr = keccak256(address);
let hashed_slot = keccak256(B256::from(slot));
// Create an untouched account with zero storage
let mut update = EvmState::default();
update.insert(
address,
Account {
info: AccountInfo::default(),
transaction_id: 0,
storage: std::iter::once((
slot,
EvmStorageSlot::new_changed(U256::from(100), U256::ZERO, 0),
))
.collect(),
status: AccountStatus::default(), // NOT touched
},
);
multi_keys.record_removals(&update);
// Should NOT be marked as removed because account wasn't touched
assert!(
multi_keys.get_storage(&hashed_addr).is_none() ||
!multi_keys.get_storage(&hashed_addr).unwrap().is_removed(&hashed_slot)
);
}
#[test]
fn test_update_with_state_account_with_balance() {
let mut multi_keys = MultiAddedRemovedKeys::new();

Some files were not shown because too many files have changed in this diff Show More