Compare commits

..

4 Commits

Author SHA1 Message Date
Georgios Konstantopoulos
a9eabeefb2 feat(stages): make prewarming runnable with new_with_prewarm constructor
Add ExecutionStage::new_with_prewarm() constructor that directly creates
an execution stage with prewarming enabled. This allows running prewarming
on a reth box by modifying the pipeline setup.

Changes:
- Add MaybePrewarmFactory trait and NoPrewarm/PrewarmFactory types for
  type-safe prewarm configuration
- Add new_with_prewarm() constructor that takes provider factory and cache size
- Actually spawn prewarm threads during execution loop
- Add example showing how to enable prewarming

Usage:
1. Modify crates/node/builder/src/setup.rs
2. Replace ExecutionStage::new() with ExecutionStage::new_with_prewarm()
3. Pass provider_factory.clone() and cache size (e.g., 8_000_000_000 for 8GB)
4. Build and run: cargo build --release -p reth

Metrics to monitor:
- sync.stages.execution.prewarm.prewarm_blocks_started
- sync.stages.execution.prewarm.prewarm_blocks_completed
- sync.stages.execution.caching.{account,storage,code}_cache_{hits,misses}

Amp-Thread-ID: https://ampcode.com/threads/T-019be803-c0e3-77f8-a172-7fe0163703a8
Co-authored-by: Amp <amp@ampcode.com>
2026-01-23 00:44:42 +00:00
Georgios Konstantopoulos
c4b2ed4faa fix(stages): remove 'static bound from ExecutionStage Stage impl
The 'static bound was added during the prewarm POC but is not needed
since PrewarmController is no longer stored as a struct field. This
bound was propagating to downstream crates and breaking compilation.

Also:
- Remove unused PrewarmController import
- Add #[allow(dead_code)] for intentionally unused POC fields
- Remove prewarm controller field (will be created on-demand later)
2026-01-23 00:22:43 +00:00
Georgios Konstantopoulos
a39a689c11 test(stages): add unit tests for prewarm controller 2026-01-23 00:02:21 +00:00
Georgios Konstantopoulos
831357225f feat(stages): add execution stage prewarming POC
Introduces prewarming support for the ExecutionStage during staged sync.
This POC adds the infrastructure to spawn background threads that execute
future blocks speculatively, warming the execution cache for faster main
execution.

Key changes:
- Add execution_cache.rs with ExecutionCache, CachedStateProvider, and
  CachedStateMetrics (adapted from engine-tree's cached_state)
- Add execution_prewarm.rs with PrewarmController for managing prewarm tasks
- Integrate PrewarmController into ExecutionStage with .with_prewarm() builder
- Add prewarm spawn point in execute loop (spawns for block N+1 while
  executing block N)
- Cancel prewarm on batch commit to prevent stale cache entries

The POC demonstrates the integration points. Full implementation requires:
1. Wrapping main executor's state provider in CachedStateProvider
2. Passing a cloneable provider factory to prewarm threads
3. Actual transaction execution in prewarm (currently simulated)

Co-authored-by: Amp <amp@sourcegraph.com>
2026-01-23 00:01:37 +00:00
216 changed files with 4473 additions and 7533 deletions

2
.github/CODEOWNERS vendored
View File

@@ -37,7 +37,7 @@ crates/storage/db/ @joshieDo
crates/storage/errors/ @joshieDo
crates/storage/libmdbx-rs/ @shekhirin
crates/storage/nippy-jar/ @joshieDo @shekhirin
crates/storage/provider/ @joshieDo @shekhirin @yongkangc
crates/storage/provider/ @joshieDo @shekhirin
crates/storage/storage-api/ @joshieDo
crates/tasks/ @mattsse
crates/tokio-util/ @mattsse

54
.github/workflows/docker-git.yml vendored Normal file
View File

@@ -0,0 +1,54 @@
# Publishes the Docker image, only to be used with `workflow_dispatch`. The
# images from this workflow will be tagged with the git sha of the branch used
# and will NOT tag it as `latest`.
name: docker-git
on:
workflow_dispatch: {}
env:
REPO_NAME: ${{ github.repository_owner }}/reth
IMAGE_NAME: ${{ github.repository_owner }}/reth
OP_IMAGE_NAME: ${{ github.repository_owner }}/op-reth
CARGO_TERM_COLOR: always
DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/reth
OP_DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/op-reth
DOCKER_USERNAME: ${{ github.actor }}
GIT_SHA: ${{ github.sha }}
jobs:
build:
name: build and push
runs-on: ubuntu-24.04
permissions:
packages: write
contents: read
strategy:
fail-fast: false
matrix:
build:
- name: 'Build and push the git-sha-tagged reth image'
command: 'make PROFILE=maxperf GIT_SHA=$GIT_SHA docker-build-push-git-sha'
- name: 'Build and push the git-sha-tagged op-reth image'
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME GIT_SHA=$GIT_SHA PROFILE=maxperf op-docker-build-push-git-sha'
steps:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- name: Install cross main
id: cross_main
run: |
cargo install cross --git https://github.com/cross-rs/cross
- name: Log in to Docker
run: |
echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io --username ${DOCKER_USERNAME} --password-stdin
- name: Set up Docker builder
run: |
docker run --privileged --rm tonistiigi/binfmt --install arm64,amd64
docker buildx create --use --name cross-builder
- name: Build and push ${{ matrix.build.name }}
run: ${{ matrix.build.command }}

65
.github/workflows/docker-nightly.yml vendored Normal file
View File

@@ -0,0 +1,65 @@
# Publishes the nightly Docker image.
name: docker-nightly
on:
workflow_dispatch:
schedule:
- cron: "0 1 * * *"
env:
REPO_NAME: ${{ github.repository_owner }}/reth
IMAGE_NAME: ${{ github.repository_owner }}/reth
OP_IMAGE_NAME: ${{ github.repository_owner }}/op-reth
CARGO_TERM_COLOR: always
DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/reth
OP_DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/op-reth
DOCKER_USERNAME: ${{ github.actor }}
jobs:
build:
name: build and push
runs-on: ubuntu-24.04
permissions:
packages: write
contents: read
strategy:
fail-fast: false
matrix:
build:
- name: 'Build and push the nightly reth image'
command: 'make PROFILE=maxperf docker-build-push-nightly'
- name: 'Build and push the nightly edge profiling reth image'
command: 'make PROFILE=profiling docker-build-push-nightly-edge-profiling'
- name: 'Build and push the nightly profiling reth image'
command: 'make PROFILE=profiling docker-build-push-nightly-profiling'
- name: 'Build and push the nightly op-reth image'
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=maxperf op-docker-build-push-nightly'
- name: 'Build and push the nightly edge profiling op-reth image'
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=profiling op-docker-build-push-nightly-edge-profiling'
- name: 'Build and push the nightly profiling op-reth image'
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=profiling op-docker-build-push-nightly-profiling'
steps:
- uses: actions/checkout@v6
- name: Remove bloatware
uses: laverdet/remove-bloatware@v1.0.0
with:
docker: true
lang: rust
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- name: Install cross main
id: cross_main
run: |
cargo install cross --git https://github.com/cross-rs/cross
- name: Log in to Docker
run: |
echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io --username ${DOCKER_USERNAME} --password-stdin
- name: Set up Docker builder
run: |
docker run --privileged --rm tonistiigi/binfmt --install arm64,amd64
docker buildx create --use --name cross-builder
- name: Build and push ${{ matrix.build.name }}
run: ${{ matrix.build.command }}

View File

@@ -1,9 +1,4 @@
# Publishes Docker images.
#
# Triggers:
# - Push tag v*: builds release (RC or latest)
# - Schedule: builds nightly + profiling
# - Manual: builds git-sha or nightly
# Publishes the Docker image.
name: docker
@@ -11,94 +6,84 @@ on:
push:
tags:
- v*
schedule:
- cron: "0 1 * * *"
workflow_dispatch:
inputs:
build_type:
description: "Build type"
required: true
type: choice
options:
- git-sha
- nightly
default: git-sha
dry_run:
description: "Skip pushing images (dry run)"
required: false
type: boolean
default: false
env:
IMAGE_NAME: ${{ github.repository_owner }}/reth
OP_IMAGE_NAME: ${{ github.repository_owner }}/op-reth
CARGO_TERM_COLOR: always
DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/reth
OP_DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/op-reth
DOCKER_USERNAME: ${{ github.actor }}
jobs:
build:
name: Build Docker images
build-rc:
if: contains(github.ref, '-rc')
name: build and push as release candidate
runs-on: ubuntu-24.04
permissions:
packages: write
contents: read
id-token: write
strategy:
fail-fast: false
matrix:
build:
- name: "Build and push reth image"
command: "make IMAGE_NAME=$IMAGE_NAME DOCKER_IMAGE_NAME=$DOCKER_IMAGE_NAME PROFILE=maxperf docker-build-push"
- name: "Build and push op-reth image"
command: "make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=maxperf op-docker-build-push"
steps:
- uses: actions/checkout@v6
- name: Set up Depot CLI
uses: depot/setup-action@v1
- name: Log in to GHCR
uses: docker/login-action@v3
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Get git info for vergen
id: git
cache-on-failure: true
- name: Install cross main
id: cross_main
run: |
echo "sha=${{ github.sha }}" >> "$GITHUB_OUTPUT"
echo "describe=$(git describe --always --tags)" >> "$GITHUB_OUTPUT"
echo "dirty=false" >> "$GITHUB_OUTPUT"
- name: Determine build parameters
id: params
cargo install cross --git https://github.com/cross-rs/cross
- name: Log in to Docker
run: |
REGISTRY="ghcr.io/${{ github.repository_owner }}"
echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io --username ${DOCKER_USERNAME} --password-stdin
- name: Set up Docker builder
run: |
docker run --privileged --rm tonistiigi/binfmt --install arm64,amd64
docker buildx create --use --name cross-builder
- name: Build and push ${{ matrix.build.name }}
run: ${{ matrix.build.command }}
if [[ "${{ github.event_name }}" == "push" ]]; then
VERSION="${GITHUB_REF#refs/tags/}"
echo "targets=ethereum optimism" >> "$GITHUB_OUTPUT"
# Add 'latest' tag for non-RC releases
if [[ ! "$VERSION" =~ -rc ]]; then
echo "ethereum_tags=${REGISTRY}/reth:${VERSION},${REGISTRY}/reth:latest" >> "$GITHUB_OUTPUT"
echo "optimism_tags=${REGISTRY}/op-reth:${VERSION},${REGISTRY}/op-reth:latest" >> "$GITHUB_OUTPUT"
else
echo "ethereum_tags=${REGISTRY}/reth:${VERSION}" >> "$GITHUB_OUTPUT"
echo "optimism_tags=${REGISTRY}/op-reth:${VERSION}" >> "$GITHUB_OUTPUT"
fi
elif [[ "${{ github.event_name }}" == "schedule" ]] || [[ "${{ inputs.build_type }}" == "nightly" ]]; then
echo "targets=nightly" >> "$GITHUB_OUTPUT"
echo "ethereum_tags=${REGISTRY}/reth:nightly" >> "$GITHUB_OUTPUT"
echo "optimism_tags=${REGISTRY}/op-reth:nightly" >> "$GITHUB_OUTPUT"
else
# git-sha build
echo "targets=ethereum optimism" >> "$GITHUB_OUTPUT"
echo "ethereum_tags=${REGISTRY}/reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
echo "optimism_tags=${REGISTRY}/op-reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
fi
- name: Build and push images
uses: depot/bake-action@v1
env:
VERGEN_GIT_SHA: ${{ steps.git.outputs.sha }}
VERGEN_GIT_DESCRIBE: ${{ steps.git.outputs.describe }}
VERGEN_GIT_DIRTY: ${{ steps.git.outputs.dirty }}
DEPOT_TOKEN: ${{ secrets.DEPOT_TOKEN }}
build:
if: ${{ !contains(github.ref, '-rc') }}
name: build and push as latest
runs-on: ubuntu-24.04
permissions:
packages: write
contents: read
strategy:
fail-fast: false
matrix:
build:
- name: "Build and push reth image"
command: "make IMAGE_NAME=$IMAGE_NAME DOCKER_IMAGE_NAME=$DOCKER_IMAGE_NAME PROFILE=maxperf docker-build-push-latest"
- name: "Build and push op-reth image"
command: "make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=maxperf op-docker-build-push-latest"
steps:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
project: ${{ vars.DEPOT_PROJECT_ID }}
files: docker-bake.hcl
targets: ${{ steps.params.outputs.targets }}
push: ${{ !(github.event_name == 'workflow_dispatch' && inputs.dry_run) }}
set: |
ethereum.tags=${{ steps.params.outputs.ethereum_tags }}
optimism.tags=${{ steps.params.outputs.optimism_tags }}
cache-on-failure: true
- name: Install cross main
id: cross_main
run: |
cargo install cross --git https://github.com/cross-rs/cross
- name: Log in to Docker
run: |
echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io --username ${DOCKER_USERNAME} --password-stdin
- name: Set up Docker builder
run: |
docker run --privileged --rm tonistiigi/binfmt --install arm64,amd64
docker buildx create --use --name cross-builder
- name: Build and push ${{ matrix.build.name }}
run: ${{ matrix.build.command }}

View File

@@ -44,24 +44,3 @@ jobs:
--exclude 'op-reth' \
--exclude 'reth' \
-E 'binary(e2e_testsuite)'
rocksdb:
name: e2e-rocksdb
runs-on: depot-ubuntu-latest-4
env:
RUST_BACKTRACE: 1
timeout-minutes: 60
steps:
- uses: actions/checkout@v6
- uses: dtolnay/rust-toolchain@stable
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: taiki-e/install-action@nextest
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- name: Run RocksDB e2e tests
run: |
cargo nextest run \
--locked --features "edge" \
-p reth-e2e-test-utils \
-E 'binary(rocksdb)'

187
Cargo.lock generated
View File

@@ -106,9 +106,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
[[package]]
name = "alloy-chains"
version = "0.2.30"
version = "0.2.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90f374d3c6d729268bbe2d0e0ff992bb97898b2df756691a62ee1d5f0506bc39"
checksum = "3842d8c52fcd3378039f4703dba392dca8b546b1c8ed6183048f8dab95b2be78"
dependencies = [
"alloy-primitives",
"alloy-rlp",
@@ -249,9 +249,9 @@ dependencies = [
[[package]]
name = "alloy-eip7928"
version = "0.3.2"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3231de68d5d6e75332b7489cfcc7f4dfabeba94d990a10e4b923af0e6623540"
checksum = "6adac476434bf024279164dcdca299309f0c7d1e3557024eb7a83f8d9d01c6b5"
dependencies = [
"alloy-primitives",
"alloy-rlp",
@@ -495,7 +495,7 @@ dependencies = [
"async-stream",
"async-trait",
"auto_impl",
"dashmap",
"dashmap 6.1.0",
"either",
"futures",
"futures-utils-wasm",
@@ -1774,7 +1774,7 @@ dependencies = [
"bytemuck",
"cfg-if",
"cow-utils",
"dashmap",
"dashmap 6.1.0",
"dynify",
"fast-float2",
"float16",
@@ -1968,6 +1968,12 @@ version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7575182f7272186991736b70173b0ea045398f984bf5ebbb3804736ce1330c9d"
[[package]]
name = "bytecount"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e"
[[package]]
name = "bytemuck"
version = "1.24.0"
@@ -2057,6 +2063,19 @@ dependencies = [
"serde_core",
]
[[package]]
name = "cargo_metadata"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa"
dependencies = [
"camino",
"cargo-platform 0.1.9",
"semver 1.0.27",
"serde",
"serde_json",
]
[[package]]
name = "cargo_metadata"
version = "0.19.2"
@@ -2108,9 +2127,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.2.54"
version = "1.2.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6354c81bbfd62d9cfa9cb3c773c2b7b2a3a482d569de977fd0e961f6e7c00583"
checksum = "755d2fce177175ffca841e9a06afdb2c4ab0f593d53b4dee48147dfaade85932"
dependencies = [
"find-msvc-tools",
"jobserver",
@@ -2911,6 +2930,19 @@ dependencies = [
"syn 2.0.114",
]
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "dashmap"
version = "6.1.0"
@@ -3463,6 +3495,15 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "error-chain"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
dependencies = [
"version_check",
]
[[package]]
name = "ethereum_hashing"
version = "0.7.0"
@@ -4041,12 +4082,11 @@ checksum = "8591b0bcc8a98a64310a2fae1bb3e9b8564dd10e381e6e28010fde8e8e8568db"
[[package]]
name = "fixed-cache"
version = "0.1.7"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0aaafa7294e9617eb29e5c684a3af33324ef512a1bf596af2d1938a03798da29"
checksum = "25d3af83468398d500e9bc19e001812dcb1a11e4d3d6a5956c789aa3c11a8cb5"
dependencies = [
"equivalent",
"typeid",
]
[[package]]
@@ -4811,7 +4851,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
"socket2 0.6.2",
"socket2 0.6.1",
"tokio",
"tower-service",
"tracing",
@@ -5553,9 +5593,9 @@ dependencies = [
[[package]]
name = "libm"
version = "0.2.16"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981"
checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de"
[[package]]
name = "libp2p-identity"
@@ -5927,6 +5967,21 @@ dependencies = [
"unicase",
]
[[package]]
name = "mini-moka"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c325dfab65f261f386debee8b0969da215b3fa0037e74c8a1234db7ba986d803"
dependencies = [
"crossbeam-channel",
"crossbeam-utils",
"dashmap 5.5.3",
"skeptic",
"smallvec",
"tagptr",
"triomphe",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@@ -6153,9 +6208,9 @@ dependencies = [
[[package]]
name = "num-conv"
version = "0.2.0"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-integer"
@@ -6460,9 +6515,9 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
[[package]]
name = "openssl-probe"
version = "0.2.1"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe"
checksum = "9f50d9b3dabb09ecd771ad0aa242ca6894994c130308ca3d7684634df8037391"
[[package]]
name = "opentelemetry"
@@ -6967,9 +7022,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.106"
version = "1.0.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934"
checksum = "535d180e0ecab6268a3e718bb9fd44db66bbbc256257165fc699dadf70d16fe7"
dependencies = [
"unicode-ident",
]
@@ -7117,6 +7172,17 @@ dependencies = [
"syn 2.0.114",
]
[[package]]
name = "pulldown-cmark"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b"
dependencies = [
"bitflags 2.10.0",
"memchr",
"unicase",
]
[[package]]
name = "quanta"
version = "0.12.6"
@@ -7160,7 +7226,7 @@ dependencies = [
"quinn-udp",
"rustc-hash",
"rustls",
"socket2 0.6.2",
"socket2 0.6.1",
"thiserror 2.0.18",
"tokio",
"tracing",
@@ -7197,16 +7263,16 @@ dependencies = [
"cfg_aliases",
"libc",
"once_cell",
"socket2 0.6.2",
"socket2 0.6.1",
"tracing",
"windows-sys 0.60.2",
]
[[package]]
name = "quote"
version = "1.0.44"
version = "1.0.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b2ebcf727b7760c461f091f9f0f539b77b8e87f2fd88131e7f1b433b3cece4"
checksum = "dc74d9a594b72ae6656596548f56f667211f8a97b3d4c3d467150794690dc40a"
dependencies = [
"proc-macro2",
]
@@ -8399,13 +8465,13 @@ dependencies = [
"assert_matches",
"codspeed-criterion-compat",
"crossbeam-channel",
"dashmap",
"dashmap 6.1.0",
"derive_more",
"eyre",
"fixed-cache",
"futures",
"metrics",
"metrics-util",
"mini-moka",
"moka",
"parking_lot",
"proptest",
@@ -9045,7 +9111,7 @@ dependencies = [
"bitflags 2.10.0",
"byteorder",
"codspeed-criterion-compat",
"dashmap",
"dashmap 6.1.0",
"derive_more",
"parking_lot",
"rand 0.9.2",
@@ -9130,7 +9196,6 @@ dependencies = [
"reth-eth-wire-types",
"reth-ethereum-forks",
"reth-ethereum-primitives",
"reth-evm-ethereum",
"reth-fs-util",
"reth-metrics",
"reth-net-banlist",
@@ -9998,7 +10063,6 @@ dependencies = [
"parking_lot",
"reth-chain-state",
"reth-chainspec",
"reth-evm",
"reth-metrics",
"reth-optimism-chainspec",
"reth-optimism-evm",
@@ -10154,7 +10218,7 @@ dependencies = [
"alloy-primitives",
"alloy-rpc-types-engine",
"assert_matches",
"dashmap",
"dashmap 6.1.0",
"eyre",
"itertools 0.14.0",
"metrics",
@@ -10217,7 +10281,6 @@ dependencies = [
"reth-stages",
"reth-stages-types",
"reth-static-file-types",
"reth-storage-api",
"reth-testing-utils",
"reth-tokio-util",
"reth-tracing",
@@ -10248,7 +10311,6 @@ dependencies = [
"strum 0.27.2",
"thiserror 2.0.18",
"toml",
"tracing",
]
[[package]]
@@ -10726,6 +10788,8 @@ dependencies = [
"eyre",
"futures-util",
"itertools 0.14.0",
"metrics",
"mini-moka",
"num-traits",
"paste",
"rand 0.9.2",
@@ -10749,6 +10813,7 @@ dependencies = [
"reth-execution-types",
"reth-exex",
"reth-fs-util",
"reth-metrics",
"reth-network-p2p",
"reth-network-peers",
"reth-primitives-traits",
@@ -11055,8 +11120,6 @@ dependencies = [
"reth-chainspec",
"reth-eth-wire-types",
"reth-ethereum-primitives",
"reth-evm",
"reth-evm-ethereum",
"reth-execution-types",
"reth-fs-util",
"reth-metrics",
@@ -11065,7 +11128,6 @@ dependencies = [
"reth-storage-api",
"reth-tasks",
"reth-tracing",
"revm",
"revm-interpreter",
"revm-primitives",
"rustc-hash",
@@ -11186,7 +11248,7 @@ dependencies = [
"alloy-rlp",
"codspeed-criterion-compat",
"crossbeam-channel",
"dashmap",
"dashmap 6.1.0",
"derive_more",
"itertools 0.14.0",
"metrics",
@@ -11409,9 +11471,9 @@ dependencies = [
[[package]]
name = "revm-inspectors"
version = "0.34.1"
version = "0.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a24ca988ae1f7a0bb5688630579c00e867cd9f1df0a2f040623887f63d3b414c"
checksum = "4a1ce3f52a052d78cc251714d57bf05dc8bc75e269677de11805d3153300a2cd"
dependencies = [
"alloy-primitives",
"alloy-rpc-types-eth",
@@ -12322,6 +12384,21 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
[[package]]
name = "skeptic"
version = "0.13.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8"
dependencies = [
"bytecount",
"cargo_metadata 0.14.2",
"error-chain",
"glob",
"pulldown-cmark",
"tempfile",
"walkdir",
]
[[package]]
name = "sketches-ddsketch"
version = "0.3.0"
@@ -12390,9 +12467,9 @@ dependencies = [
[[package]]
name = "socket2"
version = "0.6.2"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0"
checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881"
dependencies = [
"libc",
"windows-sys 0.60.2",
@@ -12793,9 +12870,9 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.46"
version = "0.3.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9da98b7d9b7dad93488a84b8248efc35352b0b2657397d4167e7ad67e5d535e5"
checksum = "f9e442fc33d7fdb45aa9bfeb312c095964abdf596f7567261062b2a7107aaabd"
dependencies = [
"deranged",
"itoa",
@@ -12811,15 +12888,15 @@ dependencies = [
[[package]]
name = "time-core"
version = "0.1.8"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca"
checksum = "8b36ee98fd31ec7426d599183e8fe26932a8dc1fb76ddb6214d05493377d34ca"
[[package]]
name = "time-macros"
version = "0.2.26"
version = "0.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78cc610bac2dcee56805c99642447d4c5dbde4d01f752ffea0199aee1f601dc4"
checksum = "71e552d1249bf61ac2a52db88179fd0673def1e1ad8243a00d9ec9ed71fee3dd"
dependencies = [
"num-conv",
"time-core",
@@ -12882,7 +12959,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.6.2",
"socket2 0.6.1",
"tokio-macros",
"windows-sys 0.61.2",
]
@@ -13362,6 +13439,12 @@ dependencies = [
"rlp",
]
[[package]]
name = "triomphe"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd69c5aa8f924c7519d6372789a74eac5b94fb0f8fcf0d4a97eb0bfc3e785f39"
[[package]]
name = "try-lock"
version = "0.2.5"
@@ -13387,12 +13470,6 @@ dependencies = [
"utf-8",
]
[[package]]
name = "typeid"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c"
[[package]]
name = "typenum"
version = "1.19.0"
@@ -13549,9 +13626,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.20.0"
version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee48d38b119b0cd71fe4141b30f5ba9c7c5d9f4e7a3a8b4a674e4b6ef789976f"
checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a"
dependencies = [
"getrandom 0.3.4",
"js-sys",

View File

@@ -481,7 +481,7 @@ revm-primitives = { version = "22.0.0", default-features = false }
revm-interpreter = { version = "32.0.0", default-features = false }
revm-database-interface = { version = "9.0.0", default-features = false }
op-revm = { version = "15.0.0", default-features = false }
revm-inspectors = "0.34.1"
revm-inspectors = "0.34.0"
# eth
alloy-chains = { version = "0.2.5", default-features = false }
@@ -588,7 +588,7 @@ tracing-appender = "0.2"
url = { version = "2.3", default-features = false }
zstd = "0.13"
byteorder = "1"
fixed-cache = { version = "0.1.7", features = ["stats"] }
mini-moka = "0.10"
moka = "0.12"
tar-no-std = { version = "0.3.2", default-features = false }
miniz_oxide = { version = "0.8.4", default-features = false }

15
Dockerfile.cross Normal file
View File

@@ -0,0 +1,15 @@
# This image is meant to enable cross-architecture builds.
# It assumes the reth binary has already been compiled for `$TARGETPLATFORM` and is
# locatable in `./dist/bin/$TARGETARCH`
FROM --platform=$TARGETPLATFORM ubuntu:22.04
LABEL org.opencontainers.image.source=https://github.com/paradigmxyz/reth
LABEL org.opencontainers.image.licenses="MIT OR Apache-2.0"
# Filled by docker buildx
ARG TARGETARCH
COPY ./dist/bin/$TARGETARCH/reth /usr/local/bin/reth
EXPOSE 30303 30303/udp 9001 8545 8546
ENTRYPOINT ["/usr/local/bin/reth"]

View File

@@ -1,99 +0,0 @@
# syntax=docker/dockerfile:1
# Unified Dockerfile for reth and op-reth, optimized for Depot builds
# Usage:
# reth: --build-arg BINARY=reth
# op-reth: --build-arg BINARY=op-reth --build-arg MANIFEST_PATH=crates/optimism/bin
FROM lukemathwalker/cargo-chef:latest-rust-1 AS chef
WORKDIR /app
LABEL org.opencontainers.image.source=https://github.com/paradigmxyz/reth
LABEL org.opencontainers.image.licenses="MIT OR Apache-2.0"
RUN apt-get update && apt-get install -y libclang-dev pkg-config
# Install sccache for compilation caching
RUN cargo install sccache --locked
ENV RUSTC_WRAPPER=sccache
ENV SCCACHE_DIR=/sccache
ENV SCCACHE_WEBDAV_ENDPOINT=https://cache.depot.dev
# Builds a cargo-chef plan
FROM chef AS planner
COPY --exclude=.git . .
RUN cargo chef prepare --recipe-path recipe.json
FROM chef AS builder
COPY --from=planner /app/recipe.json recipe.json
# Binary to build (reth or op-reth)
ARG BINARY=reth
# Manifest path for the binary
ARG MANIFEST_PATH=bin/reth
# Build profile, release by default
ARG BUILD_PROFILE=release
ENV BUILD_PROFILE=$BUILD_PROFILE
# Extra Cargo flags
ARG RUSTFLAGS=""
ENV RUSTFLAGS="$RUSTFLAGS"
# Extra Cargo features
ARG FEATURES=""
ENV FEATURES=$FEATURES
# Git info for vergen (since .git is excluded from Docker context)
ARG VERGEN_GIT_SHA=""
ARG VERGEN_GIT_DESCRIBE=""
ARG VERGEN_GIT_DIRTY="false"
ENV VERGEN_GIT_SHA=$VERGEN_GIT_SHA
ENV VERGEN_GIT_DESCRIBE=$VERGEN_GIT_DESCRIBE
ENV VERGEN_GIT_DIRTY=$VERGEN_GIT_DIRTY
# Build dependencies
RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
cargo chef cook --profile $BUILD_PROFILE --features "$FEATURES" --locked --recipe-path recipe.json --manifest-path $MANIFEST_PATH/Cargo.toml
# Build application
COPY --exclude=.git . .
RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml
RUN sccache --show-stats || true
# Copy binary to a known location (ARG not resolved in COPY)
# Note: Custom profiles like maxperf/profiling output to target/<profile>/, not target/release/
RUN cp /app/target/$BUILD_PROFILE/$BINARY /app/binary || \
cp /app/target/release/$BINARY /app/binary
FROM ubuntu:24.04 AS runtime
WORKDIR /app
# Binary name for entrypoint
ARG BINARY=reth
# Install runtime dependencies
RUN apt-get update && \
apt-get install -y --no-install-recommends ca-certificates && \
rm -rf /var/lib/apt/lists/*
# Copy binary from build stage and create canonical symlink for entrypoint
COPY --from=builder /app/binary /usr/local/bin/
RUN mv /usr/local/bin/binary /usr/local/bin/$BINARY && \
ln -s /usr/local/bin/$BINARY /usr/local/bin/reth-binary && \
chmod +x /usr/local/bin/$BINARY
# Copy licenses
COPY LICENSE-* ./
EXPOSE 30303 30303/udp 9001 8545 8546
ENTRYPOINT ["/usr/local/bin/reth-binary"]

15
DockerfileOp.cross Normal file
View File

@@ -0,0 +1,15 @@
# This image is meant to enable cross-architecture builds.
# It assumes the reth binary has already been compiled for `$TARGETPLATFORM` and is
# locatable in `./dist/bin/$TARGETARCH`
FROM --platform=$TARGETPLATFORM ubuntu:22.04
LABEL org.opencontainers.image.source=https://github.com/paradigmxyz/reth
LABEL org.opencontainers.image.licenses="MIT OR Apache-2.0"
# Filled by docker buildx
ARG TARGETARCH
COPY ./dist/bin/$TARGETARCH/op-reth /usr/local/bin/op-reth
EXPOSE 30303 30303/udp 9001 8545 8546
ENTRYPOINT ["/usr/local/bin/op-reth"]

134
Makefile
View File

@@ -35,6 +35,9 @@ EEST_TESTS_TAG := v4.5.0
EEST_TESTS_URL := https://github.com/ethereum/execution-spec-tests/releases/download/$(EEST_TESTS_TAG)/fixtures_stable.tar.gz
EEST_TESTS_DIR := ./testing/ef-tests/execution-spec-tests
# The docker image name
DOCKER_IMAGE_NAME ?= ghcr.io/paradigmxyz/reth
##@ Help
.PHONY: help
@@ -239,6 +242,137 @@ install-reth-bench: ## Build and install the reth binary under `$(CARGO_HOME)/bi
--features "$(FEATURES)" \
--profile "$(PROFILE)"
##@ Docker
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: docker-build-push
docker-build-push: ## Build and push a cross-arch Docker image tagged with the latest git tag.
$(call docker_build_push,$(GIT_TAG),$(GIT_TAG))
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: docker-build-push-git-sha
docker-build-push-git-sha: ## Build and push a cross-arch Docker image tagged with the latest git sha.
$(call docker_build_push,$(GIT_SHA),$(GIT_SHA))
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: docker-build-push-latest
docker-build-push-latest: ## Build and push a cross-arch Docker image tagged with the latest git tag and `latest`.
$(call docker_build_push,$(GIT_TAG),latest)
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --name cross-builder`
.PHONY: docker-build-push-nightly
docker-build-push-nightly: ## Build and push cross-arch Docker image tagged with the latest git tag with a `-nightly` suffix, and `latest-nightly`.
$(call docker_build_push,nightly,nightly)
.PHONY: docker-build-push-nightly-edge-profiling
docker-build-push-nightly-edge-profiling: FEATURES := $(FEATURES) edge
docker-build-push-nightly-edge-profiling: ## Build and push cross-arch Docker image with edge features tagged with `nightly-edge-profiling`.
$(call docker_build_push,nightly-edge-profiling,nightly-edge-profiling)
# Create a cross-arch Docker image with the given tags and push it
define docker_build_push
$(MAKE) FEATURES="$(FEATURES)" build-x86_64-unknown-linux-gnu
mkdir -p $(BIN_DIR)/amd64
cp $(CARGO_TARGET_DIR)/x86_64-unknown-linux-gnu/$(PROFILE)/reth $(BIN_DIR)/amd64/reth
$(MAKE) FEATURES="$(FEATURES)" build-aarch64-unknown-linux-gnu
mkdir -p $(BIN_DIR)/arm64
cp $(CARGO_TARGET_DIR)/aarch64-unknown-linux-gnu/$(PROFILE)/reth $(BIN_DIR)/arm64/reth
docker buildx build --file ./Dockerfile.cross . \
--platform linux/amd64,linux/arm64 \
--tag $(DOCKER_IMAGE_NAME):$(1) \
--tag $(DOCKER_IMAGE_NAME):$(2) \
--provenance=false \
--push
endef
##@ Optimism docker
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: op-docker-build-push
op-docker-build-push: ## Build and push a cross-arch Docker image tagged with the latest git tag.
$(call op_docker_build_push,$(GIT_TAG),$(GIT_TAG))
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: op-docker-build-push-git-sha
op-docker-build-push-git-sha: ## Build and push a cross-arch Docker image tagged with the latest git sha.
$(call op_docker_build_push,$(GIT_SHA),$(GIT_SHA))
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: op-docker-build-push-latest
op-docker-build-push-latest: ## Build and push a cross-arch Docker image tagged with the latest git tag and `latest`.
$(call op_docker_build_push,$(GIT_TAG),latest)
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --name cross-builder`
.PHONY: op-docker-build-push-nightly
op-docker-build-push-nightly: ## Build and push cross-arch Docker image tagged with the latest git tag with a `-nightly` suffix, and `latest-nightly`.
$(call op_docker_build_push,nightly,nightly)
.PHONY: op-docker-build-push-nightly-edge-profiling
op-docker-build-push-nightly-edge-profiling: FEATURES := $(FEATURES) edge
op-docker-build-push-nightly-edge-profiling: ## Build and push cross-arch Docker image with edge features tagged with `nightly-edge-profiling`.
$(call op_docker_build_push,nightly-edge-profiling,nightly-edge-profiling)
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --name cross-builder`
.PHONY: docker-build-push-nightly-profiling
docker-build-push-nightly-profiling: ## Build and push cross-arch Docker image with profiling profile tagged with nightly-profiling.
$(call docker_build_push,nightly-profiling,nightly-profiling)
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --name cross-builder`
.PHONY: op-docker-build-push-nightly-profiling
op-docker-build-push-nightly-profiling: ## Build and push cross-arch Docker image tagged with the latest git tag with a `-nightly` suffix, and `latest-nightly`.
$(call op_docker_build_push,nightly-profiling,nightly-profiling)
# Create a cross-arch Docker image with the given tags and push it
define op_docker_build_push
$(MAKE) FEATURES="$(FEATURES)" op-build-x86_64-unknown-linux-gnu
mkdir -p $(BIN_DIR)/amd64
cp $(CARGO_TARGET_DIR)/x86_64-unknown-linux-gnu/$(PROFILE)/op-reth $(BIN_DIR)/amd64/op-reth
$(MAKE) FEATURES="$(FEATURES)" op-build-aarch64-unknown-linux-gnu
mkdir -p $(BIN_DIR)/arm64
cp $(CARGO_TARGET_DIR)/aarch64-unknown-linux-gnu/$(PROFILE)/op-reth $(BIN_DIR)/arm64/op-reth
docker buildx build --file ./DockerfileOp.cross . \
--platform linux/amd64,linux/arm64 \
--tag $(DOCKER_IMAGE_NAME):$(1) \
--tag $(DOCKER_IMAGE_NAME):$(2) \
--provenance=false \
--push
endef
##@ Other
.PHONY: clean

View File

@@ -3,7 +3,7 @@
use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::{
helpers::{build_payload, parse_gas_limit, prepare_payload_request, rpc_block_to_header},
helpers::{build_payload, prepare_payload_request, rpc_block_to_header},
output::GasRampPayloadFile,
},
valid_payload::{call_forkchoice_updated, call_new_payload, payload_to_new_payload},
@@ -22,6 +22,29 @@ use reth_primitives_traits::constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIM
use std::{path::PathBuf, time::Instant};
use tracing::info;
/// Parses a gas limit value with optional suffix: K for thousand, M for million, G for billion.
///
/// Examples: "30000000", "30M", "1G", "2G"
fn parse_gas_limit(s: &str) -> eyre::Result<u64> {
let s = s.trim();
if s.is_empty() {
return Err(eyre::eyre!("empty value"));
}
let (num_str, multiplier) = if let Some(prefix) = s.strip_suffix(['G', 'g']) {
(prefix, 1_000_000_000u64)
} else if let Some(prefix) = s.strip_suffix(['M', 'm']) {
(prefix, 1_000_000u64)
} else if let Some(prefix) = s.strip_suffix(['K', 'k']) {
(prefix, 1_000u64)
} else {
(s, 1u64)
};
let base: u64 = num_str.trim().parse()?;
base.checked_mul(multiplier).ok_or_else(|| eyre::eyre!("value overflow"))
}
/// `reth benchmark gas-limit-ramp` command.
#[derive(Debug, Parser)]
pub struct Command {
@@ -214,3 +237,50 @@ const fn should_stop(mode: RampMode, blocks_processed: u64, current_gas_limit: u
RampMode::TargetGasLimit(target) => current_gas_limit >= target,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_gas_limit_plain_number() {
assert_eq!(parse_gas_limit("30000000").unwrap(), 30_000_000);
assert_eq!(parse_gas_limit("1").unwrap(), 1);
assert_eq!(parse_gas_limit("0").unwrap(), 0);
}
#[test]
fn test_parse_gas_limit_k_suffix() {
assert_eq!(parse_gas_limit("1K").unwrap(), 1_000);
assert_eq!(parse_gas_limit("30k").unwrap(), 30_000);
assert_eq!(parse_gas_limit("100K").unwrap(), 100_000);
}
#[test]
fn test_parse_gas_limit_m_suffix() {
assert_eq!(parse_gas_limit("1M").unwrap(), 1_000_000);
assert_eq!(parse_gas_limit("30m").unwrap(), 30_000_000);
assert_eq!(parse_gas_limit("100M").unwrap(), 100_000_000);
}
#[test]
fn test_parse_gas_limit_g_suffix() {
assert_eq!(parse_gas_limit("1G").unwrap(), 1_000_000_000);
assert_eq!(parse_gas_limit("2g").unwrap(), 2_000_000_000);
assert_eq!(parse_gas_limit("10G").unwrap(), 10_000_000_000);
}
#[test]
fn test_parse_gas_limit_with_whitespace() {
assert_eq!(parse_gas_limit(" 1G ").unwrap(), 1_000_000_000);
assert_eq!(parse_gas_limit("2 M").unwrap(), 2_000_000);
}
#[test]
fn test_parse_gas_limit_errors() {
assert!(parse_gas_limit("").is_err());
assert!(parse_gas_limit("abc").is_err());
assert!(parse_gas_limit("G").is_err());
assert!(parse_gas_limit("-1G").is_err());
}
}

View File

@@ -3,9 +3,7 @@
//! This command fetches transactions from existing blocks and packs them into a single
//! large block using the `testing_buildBlockV1` RPC endpoint.
use crate::{
authenticated_transport::AuthenticatedTransportConnect, bench::helpers::parse_gas_limit,
};
use crate::authenticated_transport::AuthenticatedTransportConnect;
use alloy_eips::{BlockNumberOrTag, Typed2718};
use alloy_primitives::{Bytes, B256};
use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
@@ -204,26 +202,13 @@ pub struct Command {
jwt_secret: std::path::PathBuf,
/// Target gas to pack into the block.
/// Accepts short notation: K for thousand, M for million, G for billion (e.g., 1G = 1
/// billion).
#[arg(long, value_name = "TARGET_GAS", default_value = "30000000", value_parser = parse_gas_limit)]
#[arg(long, value_name = "TARGET_GAS", default_value = "30000000")]
target_gas: u64,
/// Block number to start fetching transactions from (required).
///
/// This must be the last canonical block BEFORE any gas limit ramping was performed.
/// The command collects transactions from historical blocks starting at this number
/// to pack into large blocks.
///
/// How to determine this value:
/// - If starting from a fresh node (no gas limit ramp yet): use the current chain tip
/// - If gas limit ramping has already been performed: use the block number that was the chain
/// tip BEFORE ramping began (you must track this yourself)
///
/// Using a block after ramping started will cause transaction collection to fail
/// because those blocks contain synthetic transactions that cannot be replayed.
/// Starting block number to fetch transactions from.
/// If not specified, starts from the engine's latest block.
#[arg(long, value_name = "FROM_BLOCK")]
from_block: u64,
from_block: Option<u64>,
/// Execute the payload (call newPayload + forkchoiceUpdated).
/// If false, only builds the payload and prints it.
@@ -299,7 +284,7 @@ impl Command {
format!("Failed to create output directory: {:?}", self.output_dir)
})?;
let start_block = self.from_block;
let start_block = self.from_block.unwrap_or(parent_number);
// Use pipelined execution when generating multiple payloads
if self.count > 1 {
@@ -401,38 +386,25 @@ impl Command {
let mut current_block = start_block;
for payload_idx in 0..count {
const MAX_RETRIES: u32 = 5;
let mut attempts = 0;
let result = loop {
attempts += 1;
match collector.collect(current_block).await {
Ok(res) => break Some(res),
Err(e) => {
if attempts >= MAX_RETRIES {
warn!(payload = payload_idx + 1, attempts, error = %e, "Failed to fetch transactions after max retries");
break None;
}
warn!(payload = payload_idx + 1, attempts, error = %e, "Failed to fetch transactions, retrying...");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
match collector.collect(current_block).await {
Ok((transactions, total_gas, next_block)) => {
info!(
payload = payload_idx + 1,
tx_count = transactions.len(),
total_gas,
blocks = format!("{}..{}", current_block, next_block),
"Fetched transactions"
);
current_block = next_block;
if tx_sender.send(transactions).await.is_err() {
break;
}
}
};
let Some((transactions, total_gas, next_block)) = result else {
break;
};
info!(
payload = payload_idx + 1,
tx_count = transactions.len(),
total_gas,
blocks = format!("{}..{}", current_block, next_block),
"Fetched transactions"
);
current_block = next_block;
if tx_sender.send(transactions).await.is_err() {
break;
Err(e) => {
warn!(payload = payload_idx + 1, error = %e, "Failed to fetch transactions");
break;
}
}
}
});

View File

@@ -1,29 +1,6 @@
//! Common helpers for reth-bench commands.
use crate::valid_payload::call_forkchoice_updated;
/// Parses a gas limit value with optional suffix: K for thousand, M for million, G for billion.
///
/// Examples: "30000000", "30M", "1G", "2G"
pub(crate) fn parse_gas_limit(s: &str) -> eyre::Result<u64> {
let s = s.trim();
if s.is_empty() {
return Err(eyre::eyre!("empty value"));
}
let (num_str, multiplier) = if let Some(prefix) = s.strip_suffix(['G', 'g']) {
(prefix, 1_000_000_000u64)
} else if let Some(prefix) = s.strip_suffix(['M', 'm']) {
(prefix, 1_000_000u64)
} else if let Some(prefix) = s.strip_suffix(['K', 'k']) {
(prefix, 1_000u64)
} else {
(s, 1u64)
};
let base: u64 = num_str.trim().parse()?;
base.checked_mul(multiplier).ok_or_else(|| eyre::eyre!("value overflow"))
}
use alloy_consensus::Header;
use alloy_eips::eip4844::kzg_to_versioned_hash;
use alloy_primitives::{Address, B256};
@@ -217,50 +194,3 @@ pub(crate) async fn get_payload_with_sidecar(
_ => panic!("This tool does not support getPayload versions past v5"),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_gas_limit_plain_number() {
assert_eq!(parse_gas_limit("30000000").unwrap(), 30_000_000);
assert_eq!(parse_gas_limit("1").unwrap(), 1);
assert_eq!(parse_gas_limit("0").unwrap(), 0);
}
#[test]
fn test_parse_gas_limit_k_suffix() {
assert_eq!(parse_gas_limit("1K").unwrap(), 1_000);
assert_eq!(parse_gas_limit("30k").unwrap(), 30_000);
assert_eq!(parse_gas_limit("100K").unwrap(), 100_000);
}
#[test]
fn test_parse_gas_limit_m_suffix() {
assert_eq!(parse_gas_limit("1M").unwrap(), 1_000_000);
assert_eq!(parse_gas_limit("30m").unwrap(), 30_000_000);
assert_eq!(parse_gas_limit("100M").unwrap(), 100_000_000);
}
#[test]
fn test_parse_gas_limit_g_suffix() {
assert_eq!(parse_gas_limit("1G").unwrap(), 1_000_000_000);
assert_eq!(parse_gas_limit("2g").unwrap(), 2_000_000_000);
assert_eq!(parse_gas_limit("10G").unwrap(), 10_000_000_000);
}
#[test]
fn test_parse_gas_limit_with_whitespace() {
assert_eq!(parse_gas_limit(" 1G ").unwrap(), 1_000_000_000);
assert_eq!(parse_gas_limit("2 M").unwrap(), 2_000_000);
}
#[test]
fn test_parse_gas_limit_errors() {
assert!(parse_gas_limit("").is_err());
assert!(parse_gas_limit("abc").is_err());
assert!(parse_gas_limit("G").is_err());
assert!(parse_gas_limit("-1G").is_err());
}
}

View File

@@ -25,8 +25,8 @@
//! - `jemalloc-unprefixed`: Uses unprefixed jemalloc symbols.
//! - `tracy-allocator`: Enables [Tracy](https://github.com/wolfpld/tracy) profiler allocator
//! integration for memory profiling.
//! - `snmalloc`: Uses [snmalloc](https://github.com/microsoft/snmalloc) as the global allocator.
//! Use `--no-default-features` when enabling this, as jemalloc takes precedence.
//! - `snmalloc`: Uses [snmalloc](https://github.com/snmalloc/snmalloc) as the global allocator. Use
//! `--no-default-features` when enabling this, as jemalloc takes precedence.
//! - `snmalloc-native`: Uses snmalloc with native CPU optimizations. Use `--no-default-features`
//! when enabling this.
//!

View File

@@ -206,33 +206,11 @@ impl DeferredTrieData {
Default::default(), // prefix_sets are per-block, not cumulative
);
// Only trigger COW clone if there's actually data to add.
#[cfg(feature = "rayon")]
{
rayon::join(
|| {
if !sorted_hashed_state.is_empty() {
Arc::make_mut(&mut overlay.state)
.extend_ref_and_sort(&sorted_hashed_state);
}
},
|| {
if !sorted_trie_updates.is_empty() {
Arc::make_mut(&mut overlay.nodes)
.extend_ref_and_sort(&sorted_trie_updates);
}
},
);
if !sorted_hashed_state.is_empty() {
Arc::make_mut(&mut overlay.state).extend_ref_and_sort(&sorted_hashed_state);
}
#[cfg(not(feature = "rayon"))]
{
if !sorted_hashed_state.is_empty() {
Arc::make_mut(&mut overlay.state)
.extend_ref_and_sort(&sorted_hashed_state);
}
if !sorted_trie_updates.is_empty() {
Arc::make_mut(&mut overlay.nodes)
.extend_ref_and_sort(&sorted_trie_updates);
}
if !sorted_trie_updates.is_empty() {
Arc::make_mut(&mut overlay.nodes).extend_ref_and_sort(&sorted_trie_updates);
}
overlay
}
@@ -265,8 +243,53 @@ impl DeferredTrieData {
/// In normal operation, the parent always has a cached overlay and this
/// function is never called.
///
/// Iterates ancestors oldest -> newest, then extends with current block's data,
/// so later state takes precedence.
/// When the `rayon` feature is enabled, uses parallel collection and merge:
/// 1. Collects ancestor data in parallel (each `wait_cloned()` may compute)
/// 2. Merges hashed state and trie updates in parallel with each other
/// 3. Uses tree reduction within each merge for O(log n) depth
#[cfg(feature = "rayon")]
fn merge_ancestors_into_overlay(
ancestors: &[Self],
sorted_hashed_state: &HashedPostStateSorted,
sorted_trie_updates: &TrieUpdatesSorted,
) -> TrieInputSorted {
// Early exit: no ancestors means just wrap current block's data
if ancestors.is_empty() {
return TrieInputSorted::new(
Arc::new(sorted_trie_updates.clone()),
Arc::new(sorted_hashed_state.clone()),
Default::default(),
);
}
// Collect ancestor data, unzipping states and updates into Arc slices
let (states, updates): (Vec<_>, Vec<_>) = ancestors
.iter()
.map(|a| {
let data = a.wait_cloned();
(data.hashed_state, data.trie_updates)
})
.unzip();
// Merge state and nodes in parallel with each other using tree reduction
let (state, nodes) = rayon::join(
|| {
let mut merged = HashedPostStateSorted::merge_parallel(&states);
merged.extend_ref_and_sort(sorted_hashed_state);
merged
},
|| {
let mut merged = TrieUpdatesSorted::merge_parallel(&updates);
merged.extend_ref_and_sort(sorted_trie_updates);
merged
},
);
TrieInputSorted::new(Arc::new(nodes), Arc::new(state), Default::default())
}
/// Merge all ancestors and current block's data into a single overlay (sequential fallback).
#[cfg(not(feature = "rayon"))]
fn merge_ancestors_into_overlay(
ancestors: &[Self],
sorted_hashed_state: &HashedPostStateSorted,
@@ -284,17 +307,8 @@ impl DeferredTrieData {
}
// Extend with current block's sorted data last (takes precedence)
#[cfg(feature = "rayon")]
rayon::join(
|| state_mut.extend_ref_and_sort(sorted_hashed_state),
|| nodes_mut.extend_ref_and_sort(sorted_trie_updates),
);
#[cfg(not(feature = "rayon"))]
{
state_mut.extend_ref_and_sort(sorted_hashed_state);
nodes_mut.extend_ref_and_sort(sorted_trie_updates);
}
state_mut.extend_ref_and_sort(sorted_hashed_state);
nodes_mut.extend_ref_and_sort(sorted_trie_updates);
overlay
}

View File

@@ -17,10 +17,7 @@ use reth_primitives_traits::{
SignedTransaction,
};
use reth_storage_api::StateProviderBox;
use reth_trie::{
updates::TrieUpdatesSorted, HashedPostStateSorted, LazyTrieData, SortedTrieData,
TrieInputSorted,
};
use reth_trie::{updates::TrieUpdatesSorted, HashedPostStateSorted, LazyTrieData, TrieInputSorted};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
use tokio::sync::{broadcast, watch};
@@ -951,36 +948,22 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
match blocks {
[] => Chain::default(),
[first, rest @ ..] => {
let trie_data_handle = first.trie_data_handle();
let mut chain = Chain::from_block(
first.recovered_block().clone(),
ExecutionOutcome::from((
first.execution_outcome().clone(),
first.block_number(),
)),
LazyTrieData::deferred(move || {
let trie_data = trie_data_handle.wait_cloned();
SortedTrieData {
hashed_state: trie_data.hashed_state,
trie_updates: trie_data.trie_updates,
}
}),
LazyTrieData::ready(first.hashed_state(), first.trie_updates()),
);
for exec in rest {
let trie_data_handle = exec.trie_data_handle();
chain.append_block(
exec.recovered_block().clone(),
ExecutionOutcome::from((
exec.execution_outcome().clone(),
exec.block_number(),
)),
LazyTrieData::deferred(move || {
let trie_data = trie_data_handle.wait_cloned();
SortedTrieData {
hashed_state: trie_data.hashed_state,
trie_updates: trie_data.trie_updates,
}
}),
LazyTrieData::ready(exec.hashed_state(), exec.trie_updates()),
);
}
chain

View File

@@ -25,7 +25,6 @@ pub use alloy_chains::{Chain, ChainKind, NamedChain};
/// Re-export for convenience
pub use reth_ethereum_forks::*;
pub use alloy_evm::EvmLimitParams;
pub use api::EthChainSpec;
pub use info::ChainInfo;
#[cfg(any(test, feature = "test-utils"))]

View File

@@ -101,8 +101,8 @@ impl<N: NodeTypes> TableViewer<()> for ListTableViewer<'_, N> {
// We may be using the tui for a long time
tx.disable_long_read_transaction_safety();
let table_db = tx.inner().open_db(Some(self.args.table.name())).wrap_err("Could not open db.")?;
let stats = tx.inner().db_stat(table_db.dbi()).wrap_err(format!("Could not find table: {}", self.args.table.name()))?;
let table_db = tx.inner.open_db(Some(self.args.table.name())).wrap_err("Could not open db.")?;
let stats = tx.inner.db_stat(table_db.dbi()).wrap_err(format!("Could not find table: {}", self.args.table.name()))?;
let total_entries = stats.entries();
let final_entry_idx = total_entries.saturating_sub(1);
if self.args.skip > final_entry_idx {

View File

@@ -92,10 +92,10 @@ impl Command {
db_tables.sort();
let mut total_size = 0;
for db_table in db_tables {
let table_db = tx.inner().open_db(Some(db_table)).wrap_err("Could not open db.")?;
let table_db = tx.inner.open_db(Some(db_table)).wrap_err("Could not open db.")?;
let stats = tx
.inner()
.inner
.db_stat(table_db.dbi())
.wrap_err(format!("Could not find table: {db_table}"))?;
@@ -136,9 +136,9 @@ impl Command {
.add_cell(Cell::new(human_bytes(total_size as f64)));
table.add_row(row);
let freelist = tx.inner().env().freelist()?;
let freelist = tx.inner.env().freelist()?;
let pagesize =
tx.inner().db_stat(mdbx::Database::freelist_db().dbi())?.page_size() as usize;
tx.inner.db_stat(mdbx::Database::freelist_db().dbi())?.page_size() as usize;
let freelist_size = freelist * pagesize;
let mut row = Row::new();
@@ -205,16 +205,6 @@ impl Command {
.add_cell(Cell::new(human_bytes(total_size as f64)))
.add_cell(Cell::new(human_bytes(total_pending as f64)));
table.add_row(row);
let wal_size = tool.provider_factory.rocksdb_provider().wal_size_bytes();
let mut row = Row::new();
row.add_cell(Cell::new("WAL"))
.add_cell(Cell::new(""))
.add_cell(Cell::new(""))
.add_cell(Cell::new(""))
.add_cell(Cell::new(human_bytes(wal_size as f64)))
.add_cell(Cell::new(""));
table.add_row(row);
}
table

View File

@@ -26,14 +26,6 @@ pub struct ImportCommand<C: ChainSpecParser> {
#[arg(long, value_name = "CHUNK_LEN", verbatim_doc_comment)]
chunk_len: Option<u64>,
/// Fail immediately when an invalid block is encountered.
///
/// By default, the import will stop at the last valid block if an invalid block is
/// encountered during execution or validation, leaving the database at the last valid
/// block state. When this flag is set, the import will instead fail with an error.
#[arg(long, verbatim_doc_comment)]
fail_on_invalid_block: bool,
/// The path(s) to block file(s) for import.
///
/// The online stages (headers and bodies) are replaced by a file import, after which the
@@ -60,11 +52,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
info!(target: "reth::cli", "Starting import of {} file(s)", self.paths.len());
let import_config = ImportConfig {
no_state: self.no_state,
chunk_len: self.chunk_len,
fail_on_invalid_block: self.fail_on_invalid_block,
};
let import_config = ImportConfig { no_state: self.no_state, chunk_len: self.chunk_len };
let executor = components.evm_config().clone();
let consensus = Arc::new(components.consensus().clone());
@@ -93,20 +81,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
total_decoded_blocks += result.total_decoded_blocks;
total_decoded_txns += result.total_decoded_txns;
// Check if we stopped due to an invalid block
if result.stopped_on_invalid_block {
info!(target: "reth::cli",
"Stopped at last valid block {} due to invalid block {} in file: {}. Imported {} blocks, {} transactions",
result.last_valid_block.unwrap_or(0),
result.bad_block.unwrap_or(0),
path.display(),
result.total_imported_blocks,
result.total_imported_txns);
// Stop importing further files and exit successfully
break;
}
if !result.is_successful() {
if !result.is_complete() {
return Err(eyre::eyre!(
"Chain was partially imported from file: {}. Imported {}/{} blocks, {}/{} transactions",
path.display(),
@@ -123,7 +98,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
}
info!(target: "reth::cli",
"Import complete. Total: {}/{} blocks, {}/{} transactions",
"All files imported successfully. Total: {}/{} blocks, {}/{} transactions",
total_imported_blocks, total_decoded_blocks, total_imported_txns, total_decoded_txns);
Ok(())
@@ -164,20 +139,4 @@ mod tests {
assert_eq!(args.paths[1], PathBuf::from("file2.rlp"));
assert_eq!(args.paths[2], PathBuf::from("file3.rlp"));
}
#[test]
fn parse_import_command_with_fail_on_invalid_block() {
let args: ImportCommand<EthereumChainSpecParser> =
ImportCommand::parse_from(["reth", "--fail-on-invalid-block", "chain.rlp"]);
assert!(args.fail_on_invalid_block);
assert_eq!(args.paths.len(), 1);
assert_eq!(args.paths[0], PathBuf::from("chain.rlp"));
}
#[test]
fn parse_import_command_default_stops_on_invalid_block() {
let args: ImportCommand<EthereumChainSpecParser> =
ImportCommand::parse_from(["reth", "chain.rlp"]);
assert!(!args.fail_on_invalid_block);
}
}

View File

@@ -22,11 +22,11 @@ use reth_provider::{
StageCheckpointReader,
};
use reth_prune::PruneModes;
use reth_stages::{prelude::*, ControlFlow, Pipeline, StageId, StageSet};
use reth_stages::{prelude::*, Pipeline, StageId, StageSet};
use reth_static_file::StaticFileProducer;
use std::{path::Path, sync::Arc};
use tokio::sync::watch;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info};
/// Configuration for importing blocks from RLP files.
#[derive(Debug, Clone, Default)]
@@ -35,9 +35,6 @@ pub struct ImportConfig {
pub no_state: bool,
/// Chunk byte length to read from file.
pub chunk_len: Option<u64>,
/// If true, fail immediately when an invalid block is encountered.
/// By default (false), the import stops at the last valid block and exits successfully.
pub fail_on_invalid_block: bool,
}
/// Result of an import operation.
@@ -51,12 +48,6 @@ pub struct ImportResult {
pub total_imported_blocks: usize,
/// Total number of transactions imported into the database.
pub total_imported_txns: usize,
/// Whether the import was stopped due to an invalid block.
pub stopped_on_invalid_block: bool,
/// The block number that was invalid, if any.
pub bad_block: Option<u64>,
/// The last valid block number when stopped due to invalid block.
pub last_valid_block: Option<u64>,
}
impl ImportResult {
@@ -65,14 +56,6 @@ impl ImportResult {
self.total_decoded_blocks == self.total_imported_blocks &&
self.total_decoded_txns == self.total_imported_txns
}
/// Returns true if the import was successful, considering stop-on-invalid-block mode.
///
/// In stop-on-invalid-block mode, a partial import is considered successful if we
/// stopped due to an invalid block (leaving the DB at the last valid block).
pub fn is_successful(&self) -> bool {
self.is_complete() || self.stopped_on_invalid_block
}
}
/// Imports blocks from an RLP-encoded file into the database.
@@ -120,11 +103,6 @@ where
let static_file_producer =
StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
// Track if we stopped due to an invalid block
let mut stopped_on_invalid_block = false;
let mut bad_block_number: Option<u64> = None;
let mut last_valid_block_number: Option<u64> = None;
while let Some(file_client) =
reader.next_chunk::<BlockTy<N>>(consensus.clone(), Some(sealed_header)).await?
{
@@ -159,51 +137,12 @@ where
// Run pipeline
info!(target: "reth::import", "Starting sync pipeline");
if import_config.fail_on_invalid_block {
// Original behavior: fail on unwind
tokio::select! {
res = pipeline.run() => res?,
_ = tokio::signal::ctrl_c() => {
info!(target: "reth::import", "Import interrupted by user");
break;
},
}
} else {
// Default behavior: Use run_loop() to handle unwinds gracefully
let result = tokio::select! {
res = pipeline.run_loop() => res,
_ = tokio::signal::ctrl_c() => {
info!(target: "reth::import", "Import interrupted by user");
break;
},
};
match result {
Ok(ControlFlow::Unwind { target, bad_block }) => {
// An invalid block was encountered; stop at last valid block
let bad = bad_block.block.number;
warn!(
target: "reth::import",
bad_block = bad,
last_valid_block = target,
"Invalid block encountered during import; stopping at last valid block"
);
stopped_on_invalid_block = true;
bad_block_number = Some(bad);
last_valid_block_number = Some(target);
break;
}
Ok(ControlFlow::Continue { block_number }) => {
debug!(target: "reth::import", block_number, "Pipeline chunk completed");
}
Ok(ControlFlow::NoProgress { block_number }) => {
debug!(target: "reth::import", ?block_number, "Pipeline made no progress");
}
Err(e) => {
// Propagate other pipeline errors
return Err(e.into());
}
}
tokio::select! {
res = pipeline.run() => res?,
_ = tokio::signal::ctrl_c() => {
info!(target: "reth::import", "Import interrupted by user");
break;
},
}
sealed_header = provider_factory
@@ -221,20 +160,9 @@ where
total_decoded_txns,
total_imported_blocks,
total_imported_txns,
stopped_on_invalid_block,
bad_block: bad_block_number,
last_valid_block: last_valid_block_number,
};
if result.stopped_on_invalid_block {
info!(target: "reth::import",
total_imported_blocks,
total_imported_txns,
bad_block = ?result.bad_block,
last_valid_block = ?result.last_valid_block,
"Import stopped at last valid block due to invalid block"
);
} else if !result.is_complete() {
if !result.is_complete() {
error!(target: "reth::import",
total_decoded_blocks,
total_imported_blocks,

View File

@@ -1,34 +0,0 @@
//! Enode identifier command
use clap::Parser;
use reth_cli_util::get_secret_key;
use reth_network_peers::NodeRecord;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
};
/// Print the enode identifier for a given secret key.
#[derive(Parser, Debug)]
pub struct Command {
/// Path to the secret key file for discovery.
pub discovery_secret: PathBuf,
/// Optional IP address to include in the enode URL.
///
/// If not provided, defaults to 0.0.0.0.
#[arg(long)]
pub ip: Option<IpAddr>,
}
impl Command {
/// Execute the enode command.
pub fn execute(self) -> eyre::Result<()> {
let sk = get_secret_key(&self.discovery_secret)?;
let ip = self.ip.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED));
let addr = SocketAddr::new(ip, 30303);
let enr = NodeRecord::from_secret_key(addr, &sk);
println!("{enr}");
Ok(())
}
}

View File

@@ -18,7 +18,6 @@ use reth_node_core::{
};
pub mod bootnode;
pub mod enode;
pub mod rlpx;
/// `reth p2p` command
@@ -86,9 +85,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
Subcommands::Bootnode(command) => {
command.execute().await?;
}
Subcommands::Enode(command) => {
command.execute()?;
}
}
Ok(())
@@ -103,7 +99,6 @@ impl<C: ChainSpecParser> Command<C> {
Subcommands::Body { args, .. } => Some(&args.chain),
Subcommands::Rlpx(_) => None,
Subcommands::Bootnode(_) => None,
Subcommands::Enode(_) => None,
}
}
}
@@ -131,8 +126,6 @@ pub enum Subcommands<C: ChainSpecParser> {
Rlpx(rlpx::Command),
/// Bootnode command
Bootnode(bootnode::Command),
/// Print enode identifier
Enode(enode::Command),
}
#[derive(Debug, Clone, Parser)]
@@ -232,16 +225,4 @@ mod tests {
let _args: Command<EthereumChainSpecParser> =
Command::parse_from(["reth", "body", "--chain", "mainnet", "1000"]);
}
#[test]
fn parse_enode_cmd() {
let _args: Command<EthereumChainSpecParser> =
Command::parse_from(["reth", "enode", "/tmp/secret"]);
}
#[test]
fn parse_enode_cmd_with_ip() {
let _args: Command<EthereumChainSpecParser> =
Command::parse_from(["reth", "enode", "/tmp/secret", "--ip", "192.168.1.1"]);
}
}

View File

@@ -1,11 +1,14 @@
//! Collection of methods for block validation.
use alloy_consensus::{BlockHeader as _, EMPTY_OMMER_ROOT_HASH};
use alloy_consensus::{BlockHeader as _, Transaction, EMPTY_OMMER_ROOT_HASH};
use alloy_eips::{eip4844::DATA_GAS_PER_BLOB, eip7840::BlobParams};
use reth_chainspec::{EthChainSpec, EthereumHardfork, EthereumHardforks};
use reth_consensus::ConsensusError;
use reth_consensus::{ConsensusError, TxGasLimitTooHighErr};
use reth_primitives_traits::{
constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK, MINIMUM_GAS_LIMIT},
constants::{
GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK, MAX_TX_GAS_LIMIT_OSAKA, MINIMUM_GAS_LIMIT,
},
transaction::TxHashRef,
Block, BlockBody, BlockHeader, GotExpected, SealedBlock, SealedHeader,
};
@@ -143,7 +146,7 @@ pub fn validate_block_pre_execution<B, ChainSpec>(
) -> Result<(), ConsensusError>
where
B: Block,
ChainSpec: EthChainSpec + EthereumHardforks,
ChainSpec: EthereumHardforks,
{
post_merge_hardfork_fields(block, chain_spec)?;
@@ -151,6 +154,19 @@ where
if let Err(error) = block.ensure_transaction_root_valid() {
return Err(ConsensusError::BodyTransactionRootDiff(error.into()))
}
// EIP-7825 validation
if chain_spec.is_osaka_active_at_timestamp(block.timestamp()) {
for tx in block.body().transactions() {
if tx.gas_limit() > MAX_TX_GAS_LIMIT_OSAKA {
return Err(TxGasLimitTooHighErr {
tx_hash: *tx.tx_hash(),
gas_limit: tx.gas_limit(),
max_allowed: MAX_TX_GAS_LIMIT_OSAKA,
}
.into());
}
}
}
Ok(())
}

View File

@@ -72,11 +72,3 @@ derive_more.workspace = true
[[test]]
name = "e2e_testsuite"
path = "tests/e2e-testsuite/main.rs"
[[test]]
name = "rocksdb"
path = "tests/rocksdb/main.rs"
required-features = ["edge"]
[features]
edge = ["reth-node-core/edge", "reth-provider/rocksdb", "reth-cli-commands/edge"]

View File

@@ -103,10 +103,7 @@ where
N: NodeBuilderHelper,
{
E2ETestSetupBuilder::new(num_nodes, chain_spec, attributes_generator)
.with_tree_config_modifier(move |base| {
// Apply caller's tree_config but preserve the small cache size from base
tree_config.clone().with_cross_block_cache_size(base.cross_block_cache_size())
})
.with_tree_config_modifier(move |_| tree_config.clone())
.with_node_config_modifier(move |config| config.set_dev(is_dev))
.with_connect_nodes(connect_nodes)
.build()

View File

@@ -112,13 +112,11 @@ where
..NetworkArgs::default()
};
// Apply tree config modifier if present, with test-appropriate defaults
let base_tree_config =
reth_node_api::TreeConfig::default().with_cross_block_cache_size(1024 * 1024);
// Apply tree config modifier if present
let tree_config = if let Some(modifier) = self.tree_config_modifier {
modifier(base_tree_config)
modifier(reth_node_api::TreeConfig::default())
} else {
base_tree_config
reth_node_api::TreeConfig::default()
};
let mut nodes = (0..self.num_nodes)

View File

@@ -125,10 +125,7 @@ pub async fn setup_engine_with_chain_import(
db.clone(),
chain_spec.clone(),
reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())?,
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
.with_default_tables()
.build()
.unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
)?;
// Initialize genesis if needed
@@ -331,7 +328,6 @@ mod tests {
reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())
.unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path.clone())
.with_default_tables()
.build()
.unwrap(),
)
@@ -396,7 +392,6 @@ mod tests {
reth_provider::providers::StaticFileProvider::read_only(static_files_path, false)
.unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
.with_default_tables()
.build()
.unwrap(),
)
@@ -495,10 +490,7 @@ mod tests {
db.clone(),
chain_spec.clone(),
reth_provider::providers::StaticFileProvider::read_write(static_files_path).unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
.with_default_tables()
.build()
.unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
)
.expect("failed to create provider factory");

View File

@@ -38,18 +38,6 @@ impl TransactionTestContext {
signed.encoded_2718().into()
}
/// Creates a transfer with a specific nonce and signs it, returning bytes.
/// Uses high `max_fee_per_gas` (1000 gwei) to ensure tx acceptance regardless of basefee.
pub async fn transfer_tx_bytes_with_nonce(
chain_id: u64,
wallet: PrivateKeySigner,
nonce: u64,
) -> Bytes {
let tx = tx(chain_id, 21000, None, None, nonce, Some(1000e9 as u128));
let signed = Self::sign_tx(wallet, tx).await;
signed.encoded_2718().into()
}
/// Creates a deployment transaction and signs it, returning an envelope.
pub async fn deploy_tx(
chain_id: u64,

View File

@@ -1,471 +0,0 @@
//! E2E tests for `RocksDB` provider functionality.
#![cfg(all(feature = "edge", unix))]
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use alloy_rpc_types_eth::{Transaction, TransactionReceipt};
use eyre::Result;
use jsonrpsee::core::client::ClientT;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_db::tables;
use reth_e2e_test_utils::{transaction::TransactionTestContext, wallet, E2ETestSetupBuilder};
use reth_node_core::args::RocksDbArgs;
use reth_node_ethereum::EthereumNode;
use reth_payload_builder::EthPayloadBuilderAttributes;
use reth_provider::{RocksDBProviderFactory, StorageSettings};
use std::{sync::Arc, time::Duration};
const ROCKSDB_POLL_TIMEOUT: Duration = Duration::from_secs(60);
const ROCKSDB_POLL_INTERVAL: Duration = Duration::from_millis(50);
/// Polls RPC until the given `tx_hash` is visible as pending (not yet mined).
/// Prevents race conditions where `advance_block` is called before txs are in the pool.
/// Returns the pending transaction.
async fn wait_for_pending_tx<C: ClientT>(client: &C, tx_hash: B256) -> Transaction {
let start = std::time::Instant::now();
loop {
let tx: Option<Transaction> = client
.request("eth_getTransactionByHash", [tx_hash])
.await
.expect("RPC request failed");
if let Some(tx) = tx {
assert!(
tx.block_number.is_none(),
"Expected pending tx but tx_hash={tx_hash:?} is already mined in block {:?}",
tx.block_number
);
return tx;
}
assert!(
start.elapsed() < ROCKSDB_POLL_TIMEOUT,
"Timed out after {:?} waiting for tx_hash={tx_hash:?} to appear in pending pool",
start.elapsed()
);
tokio::time::sleep(ROCKSDB_POLL_INTERVAL).await;
}
}
/// Polls `RocksDB` until the given `tx_hash` appears in `TransactionHashNumbers`.
/// Returns the `tx_number` on success, or panics on timeout.
async fn poll_tx_in_rocksdb<P: RocksDBProviderFactory>(provider: &P, tx_hash: B256) -> u64 {
let start = std::time::Instant::now();
let mut interval = ROCKSDB_POLL_INTERVAL;
loop {
// Re-acquire handle each iteration to avoid stale snapshot reads
let rocksdb = provider.rocksdb_provider();
let tx_number: Option<u64> =
rocksdb.get::<tables::TransactionHashNumbers>(tx_hash).expect("RocksDB get failed");
if let Some(n) = tx_number {
return n;
}
assert!(
start.elapsed() < ROCKSDB_POLL_TIMEOUT,
"Timed out after {:?} waiting for tx_hash={tx_hash:?} in RocksDB",
start.elapsed()
);
tokio::time::sleep(interval).await;
// Simple backoff: 50ms -> 100ms -> 200ms (capped)
interval = std::cmp::min(interval * 2, Duration::from_millis(200));
}
}
/// Returns the test chain spec for `RocksDB` tests.
fn test_chain_spec() -> Arc<ChainSpec> {
Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(
serde_json::from_str(include_str!("../../src/testsuite/assets/genesis.json"))
.expect("failed to parse genesis.json"),
)
.cancun_activated()
.build(),
)
}
/// Returns test payload attributes for the given timestamp.
fn test_attributes_generator(timestamp: u64) -> EthPayloadBuilderAttributes {
let attributes = alloy_rpc_types_engine::PayloadAttributes {
timestamp,
prev_randao: B256::ZERO,
suggested_fee_recipient: alloy_primitives::Address::ZERO,
withdrawals: Some(vec![]),
parent_beacon_block_root: Some(B256::ZERO),
};
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
}
/// Verifies that `RocksDB` CLI defaults match `StorageSettings::base()`.
#[test]
fn test_rocksdb_defaults_match_storage_settings() {
let args = RocksDbArgs::default();
let settings = StorageSettings::base();
assert_eq!(
args.tx_hash, settings.transaction_hash_numbers_in_rocksdb,
"tx_hash default should match StorageSettings::base()"
);
assert_eq!(
args.storages_history, settings.storages_history_in_rocksdb,
"storages_history default should match StorageSettings::base()"
);
assert_eq!(
args.account_history, settings.account_history_in_rocksdb,
"account_history default should match StorageSettings::base()"
);
}
/// Smoke test: node boots with `RocksDB` routing enabled.
#[tokio::test]
async fn test_rocksdb_node_startup() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let (nodes, _tasks, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.build()
.await?;
assert_eq!(nodes.len(), 1);
// Verify RocksDB provider is functional (can query without error)
let rocksdb = nodes[0].inner.provider.rocksdb_provider();
let missing_hash = B256::from([0xab; 32]);
let result: Option<u64> = rocksdb.get::<tables::TransactionHashNumbers>(missing_hash)?;
assert!(result.is_none(), "Missing hash should return None");
let genesis_hash = nodes[0].block_hash(0);
assert_ne!(genesis_hash, B256::ZERO);
Ok(())
}
/// Block mining works with `RocksDB` storage.
#[tokio::test]
async fn test_rocksdb_block_mining() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.build()
.await?;
assert_eq!(nodes.len(), 1);
let genesis_hash = nodes[0].block_hash(0);
assert_ne!(genesis_hash, B256::ZERO);
// Mine 3 blocks with transactions
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
let client = nodes[0].rpc_client().expect("RPC client should be available");
for i in 1..=3u64 {
let raw_tx =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), i - 1)
.await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
// Wait for tx to enter pending pool before mining
wait_for_pending_tx(&client, tx_hash).await;
let payload = nodes[0].advance_block().await?;
let block = payload.block();
assert_eq!(block.number(), i);
assert_ne!(block.hash(), B256::ZERO);
// Verify tx was actually included in the block
let receipt: Option<TransactionReceipt> =
client.request("eth_getTransactionReceipt", [tx_hash]).await?;
let receipt = receipt.expect("Receipt should exist after mining");
assert_eq!(receipt.block_number, Some(i), "Tx should be in block {i}");
}
// Verify all blocks are stored
for i in 0..=3 {
let block_hash = nodes[0].block_hash(i);
assert_ne!(block_hash, B256::ZERO);
}
Ok(())
}
/// Tx hash lookup exercises `TransactionHashNumbers` table.
#[tokio::test]
async fn test_rocksdb_transaction_queries() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
assert_eq!(nodes.len(), 1);
// Inject and mine a transaction
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
let client = nodes[0].rpc_client().expect("RPC client should be available");
let raw_tx = TransactionTestContext::transfer_tx_bytes(chain_id, signer).await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
// Wait for tx to enter pending pool before mining
wait_for_pending_tx(&client, tx_hash).await;
let payload = nodes[0].advance_block().await?;
assert_eq!(payload.block().number(), 1);
// Query each transaction by hash
let tx: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash]).await?;
let tx = tx.expect("Transaction should be found");
assert_eq!(tx.block_number, Some(1));
let receipt: Option<TransactionReceipt> =
client.request("eth_getTransactionReceipt", [tx_hash]).await?;
let receipt = receipt.expect("Receipt should be found");
assert_eq!(receipt.block_number, Some(1));
assert!(receipt.status());
// Direct RocksDB assertion - poll with timeout since persistence is async
let tx_number = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash).await;
assert_eq!(tx_number, 0, "First tx should have TxNumber 0");
// Verify missing hash returns None
let missing_hash = B256::from([0xde; 32]);
let rocksdb = nodes[0].inner.provider.rocksdb_provider();
let missing_tx_number: Option<u64> =
rocksdb.get::<tables::TransactionHashNumbers>(missing_hash)?;
assert!(missing_tx_number.is_none());
let missing_tx: Option<Transaction> =
client.request("eth_getTransactionByHash", [missing_hash]).await?;
assert!(missing_tx.is_none(), "expected no transaction for missing hash");
let missing_receipt: Option<TransactionReceipt> =
client.request("eth_getTransactionReceipt", [missing_hash]).await?;
assert!(missing_receipt.is_none(), "expected no receipt for missing hash");
Ok(())
}
/// Multiple transactions in the same block are correctly persisted to `RocksDB`.
#[tokio::test]
async fn test_rocksdb_multi_tx_same_block() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
// Create 3 txs from the same wallet with sequential nonces
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
let client = nodes[0].rpc_client().expect("RPC client");
let mut tx_hashes = Vec::new();
for nonce in 0..3 {
let raw_tx =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), nonce)
.await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
tx_hashes.push(tx_hash);
}
// Wait for all txs to appear in pending pool before mining
for tx_hash in &tx_hashes {
wait_for_pending_tx(&client, *tx_hash).await;
}
// Mine one block containing all 3 txs
let payload = nodes[0].advance_block().await?;
assert_eq!(payload.block().number(), 1);
// Verify block contains all 3 txs
let block: Option<alloy_rpc_types_eth::Block> =
client.request("eth_getBlockByNumber", ("0x1", true)).await?;
let block = block.expect("Block 1 should exist");
assert_eq!(block.transactions.len(), 3, "Block should contain 3 txs");
// Verify each tx via RPC
for tx_hash in &tx_hashes {
let tx: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash]).await?;
let tx = tx.expect("Transaction should be found");
assert_eq!(tx.block_number, Some(1), "All txs should be in block 1");
}
// Poll RocksDB for all tx hashes and collect tx_numbers
let mut tx_numbers = Vec::new();
for tx_hash in &tx_hashes {
let n = poll_tx_in_rocksdb(&nodes[0].inner.provider, *tx_hash).await;
tx_numbers.push(n);
}
// Verify tx_numbers form the set {0, 1, 2}
tx_numbers.sort();
assert_eq!(tx_numbers, vec![0, 1, 2], "TxNumbers should be 0, 1, 2");
Ok(())
}
/// Transactions across multiple blocks have globally continuous `tx_numbers`.
#[tokio::test]
async fn test_rocksdb_txs_across_blocks() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
let client = nodes[0].rpc_client().expect("RPC client");
// Block 1: 2 transactions
let tx_hash_0 = nodes[0]
.rpc
.inject_tx(
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 0).await,
)
.await?;
let tx_hash_1 = nodes[0]
.rpc
.inject_tx(
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 1).await,
)
.await?;
// Wait for both txs to appear in pending pool
wait_for_pending_tx(&client, tx_hash_0).await;
wait_for_pending_tx(&client, tx_hash_1).await;
let payload1 = nodes[0].advance_block().await?;
assert_eq!(payload1.block().number(), 1);
// Block 2: 1 transaction
let tx_hash_2 = nodes[0]
.rpc
.inject_tx(
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 2).await,
)
.await?;
wait_for_pending_tx(&client, tx_hash_2).await;
let payload2 = nodes[0].advance_block().await?;
assert_eq!(payload2.block().number(), 2);
// Verify block contents via RPC
let tx0: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash_0]).await?;
let tx1: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash_1]).await?;
let tx2: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash_2]).await?;
assert_eq!(tx0.expect("tx0").block_number, Some(1));
assert_eq!(tx1.expect("tx1").block_number, Some(1));
assert_eq!(tx2.expect("tx2").block_number, Some(2));
// Poll RocksDB and verify global tx_number continuity
let all_tx_hashes = [tx_hash_0, tx_hash_1, tx_hash_2];
let mut tx_numbers = Vec::new();
for tx_hash in &all_tx_hashes {
let n = poll_tx_in_rocksdb(&nodes[0].inner.provider, *tx_hash).await;
tx_numbers.push(n);
}
// Verify they form a continuous sequence {0, 1, 2}
tx_numbers.sort();
assert_eq!(tx_numbers, vec![0, 1, 2], "TxNumbers should be globally continuous: 0, 1, 2");
// Re-query block 1 txs after block 2 is mined (regression guard)
let tx0_again: Option<Transaction> =
client.request("eth_getTransactionByHash", [tx_hash_0]).await?;
assert!(tx0_again.is_some(), "Block 1 tx should still be queryable after block 2");
Ok(())
}
/// Pending transactions should NOT appear in `RocksDB` until mined.
#[tokio::test]
async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
// Inject tx but do NOT mine
let raw_tx = TransactionTestContext::transfer_tx_bytes(chain_id, signer).await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
// Verify tx is in pending pool via RPC
let client = nodes[0].rpc_client().expect("RPC client");
wait_for_pending_tx(&client, tx_hash).await;
let pending_tx: Option<Transaction> =
client.request("eth_getTransactionByHash", [tx_hash]).await?;
assert!(pending_tx.is_some(), "Pending tx should be visible via RPC");
assert!(pending_tx.unwrap().block_number.is_none(), "Pending tx should have no block_number");
// Assert tx is NOT in RocksDB before mining (single check - tx is confirmed pending)
let rocksdb = nodes[0].inner.provider.rocksdb_provider();
let tx_number: Option<u64> = rocksdb.get::<tables::TransactionHashNumbers>(tx_hash)?;
assert!(
tx_number.is_none(),
"Pending tx should NOT be in RocksDB before mining, but found tx_number={:?}",
tx_number
);
// Now mine the block
let payload = nodes[0].advance_block().await?;
assert_eq!(payload.block().number(), 1);
// Poll until tx appears in RocksDB
let tx_number = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash).await;
assert_eq!(tx_number, 0, "First tx should have tx_number 0");
// Verify tx is now mined via RPC
let mined_tx: Option<Transaction> =
client.request("eth_getTransactionByHash", [tx_hash]).await?;
assert_eq!(mined_tx.expect("mined tx").block_number, Some(1));
Ok(())
}

View File

@@ -50,17 +50,7 @@ pub const DEFAULT_PREWARM_MAX_CONCURRENCY: usize = 16;
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
const DEFAULT_CROSS_BLOCK_CACHE_SIZE: usize = default_cross_block_cache_size();
const fn default_cross_block_cache_size() -> usize {
if cfg!(test) {
1024 * 1024 // 1 MB in tests
} else if cfg!(target_pointer_width = "32") {
usize::MAX // max possible on wasm32 / 32-bit
} else {
4 * 1024 * 1024 * 1024 // 4 GB on 64-bit
}
}
const DEFAULT_CROSS_BLOCK_CACHE_SIZE: u64 = 4 * 1024 * 1024 * 1024;
/// Determines if the host has enough parallelism to run the payload processor.
///
@@ -110,10 +100,12 @@ pub struct TreeConfig {
disable_state_cache: bool,
/// Whether to disable parallel prewarming.
disable_prewarming: bool,
/// Whether to disable the parallel sparse trie state root algorithm.
disable_parallel_sparse_trie: bool,
/// Whether to enable state provider metrics.
state_provider_metrics: bool,
/// Cross-block cache size in bytes.
cross_block_cache_size: usize,
cross_block_cache_size: u64,
/// Whether the host has enough parallelism to run state root task.
has_enough_parallelism: bool,
/// Whether multiproof task should chunk proof targets.
@@ -148,8 +140,8 @@ pub struct TreeConfig {
storage_worker_count: usize,
/// Number of account proof worker threads.
account_worker_count: usize,
/// Whether to disable V2 storage proofs.
disable_proof_v2: bool,
/// Whether to enable V2 storage proofs.
enable_proof_v2: bool,
/// Whether to disable cache metrics recording (can be expensive with large cached state).
disable_cache_metrics: bool,
}
@@ -166,6 +158,7 @@ impl Default for TreeConfig {
always_compare_trie_updates: false,
disable_state_cache: false,
disable_prewarming: false,
disable_parallel_sparse_trie: false,
state_provider_metrics: false,
cross_block_cache_size: DEFAULT_CROSS_BLOCK_CACHE_SIZE,
has_enough_parallelism: has_enough_parallelism(),
@@ -179,7 +172,7 @@ impl Default for TreeConfig {
allow_unwind_canonical_header: false,
storage_worker_count: default_storage_worker_count(),
account_worker_count: default_account_worker_count(),
disable_proof_v2: false,
enable_proof_v2: false,
disable_cache_metrics: false,
}
}
@@ -198,8 +191,9 @@ impl TreeConfig {
always_compare_trie_updates: bool,
disable_state_cache: bool,
disable_prewarming: bool,
disable_parallel_sparse_trie: bool,
state_provider_metrics: bool,
cross_block_cache_size: usize,
cross_block_cache_size: u64,
has_enough_parallelism: bool,
multiproof_chunking_enabled: bool,
multiproof_chunk_size: usize,
@@ -211,7 +205,7 @@ impl TreeConfig {
allow_unwind_canonical_header: bool,
storage_worker_count: usize,
account_worker_count: usize,
disable_proof_v2: bool,
enable_proof_v2: bool,
disable_cache_metrics: bool,
) -> Self {
Self {
@@ -224,6 +218,7 @@ impl TreeConfig {
always_compare_trie_updates,
disable_state_cache,
disable_prewarming,
disable_parallel_sparse_trie,
state_provider_metrics,
cross_block_cache_size,
has_enough_parallelism,
@@ -237,7 +232,7 @@ impl TreeConfig {
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
disable_proof_v2,
enable_proof_v2,
disable_cache_metrics,
}
}
@@ -280,8 +275,7 @@ impl TreeConfig {
/// Return the multiproof task chunk size, using the V2 default if V2 proofs are enabled
/// and the chunk size is at the default value.
pub const fn effective_multiproof_chunk_size(&self) -> usize {
if !self.disable_proof_v2 &&
self.multiproof_chunk_size == DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE
if self.enable_proof_v2 && self.multiproof_chunk_size == DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE
{
DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2
} else {
@@ -305,6 +299,11 @@ impl TreeConfig {
self.state_provider_metrics
}
/// Returns whether or not the parallel sparse trie is disabled.
pub const fn disable_parallel_sparse_trie(&self) -> bool {
self.disable_parallel_sparse_trie
}
/// Returns whether or not state cache is disabled.
pub const fn disable_state_cache(&self) -> bool {
self.disable_state_cache
@@ -322,7 +321,7 @@ impl TreeConfig {
}
/// Returns the cross-block cache size.
pub const fn cross_block_cache_size(&self) -> usize {
pub const fn cross_block_cache_size(&self) -> u64 {
self.cross_block_cache_size
}
@@ -425,7 +424,7 @@ impl TreeConfig {
}
/// Setter for cross block cache size.
pub const fn with_cross_block_cache_size(mut self, cross_block_cache_size: usize) -> Self {
pub const fn with_cross_block_cache_size(mut self, cross_block_cache_size: u64) -> Self {
self.cross_block_cache_size = cross_block_cache_size;
self
}
@@ -442,6 +441,15 @@ impl TreeConfig {
self
}
/// Setter for whether to disable the parallel sparse trie
pub const fn with_disable_parallel_sparse_trie(
mut self,
disable_parallel_sparse_trie: bool,
) -> Self {
self.disable_parallel_sparse_trie = disable_parallel_sparse_trie;
self
}
/// Setter for whether multiproof task should chunk proof targets.
pub const fn with_multiproof_chunking_enabled(
mut self,
@@ -519,14 +527,14 @@ impl TreeConfig {
self
}
/// Return whether V2 storage proofs are disabled.
pub const fn disable_proof_v2(&self) -> bool {
self.disable_proof_v2
/// Return whether V2 storage proofs are enabled.
pub const fn enable_proof_v2(&self) -> bool {
self.enable_proof_v2
}
/// Setter for whether to disable V2 storage proofs.
pub const fn with_disable_proof_v2(mut self, disable_proof_v2: bool) -> Self {
self.disable_proof_v2 = disable_proof_v2;
/// Setter for whether to enable V2 storage proofs.
pub const fn with_enable_proof_v2(mut self, enable_proof_v2: bool) -> Self {
self.enable_proof_v2 = enable_proof_v2;
self
}

View File

@@ -101,7 +101,7 @@ where
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(
let (to_tree_tx, from_tree) = EngineApiTreeHandler::<N::Primitives, _, _, _, _>::spawn_new(
blockchain_db,
consensus,
payload_validator,

View File

@@ -53,7 +53,7 @@ revm-primitives.workspace = true
futures.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "sync", "macros"] }
fixed-cache.workspace = true
mini-moka = { workspace = true, features = ["sync"] }
moka = { workspace = true, features = ["sync"] }
smallvec.workspace = true

File diff suppressed because it is too large Load Diff

View File

@@ -22,9 +22,6 @@ pub(crate) struct EngineApiMetrics {
pub(crate) block_validation: BlockValidationMetrics,
/// Canonical chain and reorg related metrics
pub tree: TreeMetrics,
/// Metrics for EIP-7928 Block-Level Access Lists (BAL).
#[allow(dead_code)]
pub(crate) bal: BalMetrics,
}
impl EngineApiMetrics {
@@ -242,8 +239,6 @@ pub(crate) struct NewPayloadStatusMetrics {
pub(crate) new_payload_error: Counter,
/// The total gas of valid new payload messages received.
pub(crate) new_payload_total_gas: Histogram,
/// The gas used for the last valid new payload.
pub(crate) new_payload_total_gas_last: Gauge,
/// The gas per second of valid new payload messages received.
pub(crate) new_payload_gas_per_second: Histogram,
/// The gas per second for the last new payload call.
@@ -256,8 +251,6 @@ pub(crate) struct NewPayloadStatusMetrics {
pub(crate) time_between_new_payloads: Histogram,
/// Time from previous payload start to current payload start (total interval).
pub(crate) new_payload_interval: Histogram,
/// Time diff between forkchoice updated call response and the next new payload call request.
pub(crate) forkchoice_updated_new_payload_time_diff: Histogram,
}
impl NewPayloadStatusMetrics {
@@ -265,7 +258,6 @@ impl NewPayloadStatusMetrics {
pub(crate) fn update_response_metrics(
&mut self,
start: Instant,
latest_forkchoice_updated_at: &mut Option<Instant>,
result: &Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError>,
gas_used: u64,
) {
@@ -285,7 +277,6 @@ impl NewPayloadStatusMetrics {
PayloadStatusEnum::Valid => {
self.new_payload_valid.increment(1);
self.new_payload_total_gas.record(gas_used as f64);
self.new_payload_total_gas_last.set(gas_used as f64);
let gas_per_second = gas_used as f64 / elapsed.as_secs_f64();
self.new_payload_gas_per_second.record(gas_per_second);
self.new_payload_gas_per_second_last.set(gas_per_second);
@@ -299,40 +290,9 @@ impl NewPayloadStatusMetrics {
self.new_payload_messages.increment(1);
self.new_payload_latency.record(elapsed);
self.new_payload_last.set(elapsed);
if let Some(latest_forkchoice_updated_at) = latest_forkchoice_updated_at.take() {
self.forkchoice_updated_new_payload_time_diff
.record(start - latest_forkchoice_updated_at);
}
}
}
/// Metrics for EIP-7928 Block-Level Access Lists (BAL).
///
/// See also <https://github.com/ethereum/execution-metrics/issues/5>
#[allow(dead_code)]
#[derive(Metrics, Clone)]
#[metrics(scope = "execution.block_access_list")]
pub(crate) struct BalMetrics {
/// Size of the BAL in bytes for the current block.
pub(crate) size_bytes: Gauge,
/// Total number of blocks with valid BALs.
pub(crate) valid_total: Counter,
/// Total number of blocks with invalid BALs.
pub(crate) invalid_total: Counter,
/// Time taken to validate the BAL against actual execution.
pub(crate) validation_time_seconds: Histogram,
/// Number of account changes in the BAL.
pub(crate) account_changes: Gauge,
/// Number of storage changes in the BAL.
pub(crate) storage_changes: Gauge,
/// Number of balance changes in the BAL.
pub(crate) balance_changes: Gauge,
/// Number of nonce changes in the BAL.
pub(crate) nonce_changes: Gauge,
/// Number of code changes in the BAL.
pub(crate) code_changes: Gauge,
}
/// Metrics for non-execution related block validation.
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.block_validation")]
@@ -341,8 +301,6 @@ pub(crate) struct BlockValidationMetrics {
pub(crate) state_root_storage_tries_updated_total: Counter,
/// Total number of times the parallel state root computation fell back to regular.
pub(crate) state_root_parallel_fallback_total: Counter,
/// Total number of times the state root task failed but the fallback succeeded.
pub(crate) state_root_task_fallback_success_total: Counter,
/// Latest state root duration, ie the time spent blocked waiting for the state root.
pub(crate) state_root_duration: Gauge,
/// Histogram for state root duration ie the time spent blocked waiting for the state root

View File

@@ -85,12 +85,6 @@ pub mod state;
/// backfill this gap.
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
/// The minimum number of blocks to retain in the changeset cache after eviction.
///
/// This ensures that recent trie changesets are kept in memory for potential reorgs,
/// even when the finalized block is not set (e.g., on L2s like Optimism).
const CHANGESET_CACHE_RETENTION_BLOCKS: u64 = 64;
/// A builder for creating state providers that can be used across threads.
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
@@ -1384,27 +1378,19 @@ where
debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);
// Evict trie changesets for blocks below the eviction threshold.
// Keep at least CHANGESET_CACHE_RETENTION_BLOCKS from the persisted tip, and also respect
// the finalized block if set.
let min_threshold =
last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
let eviction_threshold =
if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
// Use the minimum of finalized block and retention threshold to be conservative
finalized.number.min(min_threshold)
} else {
// When finalized is not set (e.g., on L2s), use the retention threshold
min_threshold
};
debug!(
target: "engine::tree",
last_persisted = last_persisted_block_number,
finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
eviction_threshold,
"Evicting changesets below threshold"
);
self.changeset_cache.evict(eviction_threshold);
// Evict trie changesets for blocks below the finalized block, but keep at least 64 blocks
if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
let min_threshold = last_persisted_block_number.saturating_sub(64);
let eviction_threshold = finalized.number.min(min_threshold);
debug!(
target: "engine::tree",
last_persisted = last_persisted_block_number,
finalized_number = finalized.number,
eviction_threshold,
"Evicting changesets below threshold"
);
self.changeset_cache.evict(eviction_threshold);
}
self.on_new_persisted_block()?;
Ok(())
@@ -1492,10 +1478,6 @@ where
self.on_maybe_tree_event(res.event.take())?;
}
if let Err(ref err) = output {
error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
}
self.metrics.engine.forkchoice_updated.update_response_metrics(
start,
&mut self.metrics.engine.new_payload.latest_finish_at,
@@ -1518,12 +1500,10 @@ where
let gas_used = payload.gas_used();
let num_hash = payload.num_hash();
let mut output = self.on_new_payload(payload);
self.metrics.engine.new_payload.update_response_metrics(
start,
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
&output,
gas_used,
);
self.metrics
.engine
.new_payload
.update_response_metrics(start, &output, gas_used);
let maybe_event =
output.as_mut().ok().and_then(|out| out.event.take());

View File

@@ -0,0 +1,188 @@
//! Configured sparse trie enum for switching between serial and parallel implementations.
use alloy_primitives::B256;
use reth_trie::{BranchNodeMasks, Nibbles, ProofTrieNode, TrieNode};
use reth_trie_sparse::{
errors::SparseTrieResult, provider::TrieNodeProvider, LeafLookup, LeafLookupError,
SerialSparseTrie, SparseTrieInterface, SparseTrieUpdates,
};
use reth_trie_sparse_parallel::ParallelSparseTrie;
use std::borrow::Cow;
/// Enum for switching between serial and parallel sparse trie implementations.
///
/// This type allows runtime selection between different sparse trie implementations,
/// providing flexibility in choosing the appropriate implementation based on workload
/// characteristics.
#[derive(Debug, Clone)]
pub(crate) enum ConfiguredSparseTrie {
/// Serial implementation of the sparse trie.
Serial(Box<SerialSparseTrie>),
/// Parallel implementation of the sparse trie.
Parallel(Box<ParallelSparseTrie>),
}
impl From<SerialSparseTrie> for ConfiguredSparseTrie {
fn from(trie: SerialSparseTrie) -> Self {
Self::Serial(Box::new(trie))
}
}
impl From<ParallelSparseTrie> for ConfiguredSparseTrie {
fn from(trie: ParallelSparseTrie) -> Self {
Self::Parallel(Box::new(trie))
}
}
impl Default for ConfiguredSparseTrie {
fn default() -> Self {
Self::Serial(Default::default())
}
}
impl SparseTrieInterface for ConfiguredSparseTrie {
fn with_root(
self,
root: TrieNode,
masks: Option<BranchNodeMasks>,
retain_updates: bool,
) -> SparseTrieResult<Self> {
match self {
Self::Serial(trie) => {
trie.with_root(root, masks, retain_updates).map(|t| Self::Serial(Box::new(t)))
}
Self::Parallel(trie) => {
trie.with_root(root, masks, retain_updates).map(|t| Self::Parallel(Box::new(t)))
}
}
}
fn with_updates(self, retain_updates: bool) -> Self {
match self {
Self::Serial(trie) => Self::Serial(Box::new(trie.with_updates(retain_updates))),
Self::Parallel(trie) => Self::Parallel(Box::new(trie.with_updates(retain_updates))),
}
}
fn reserve_nodes(&mut self, additional: usize) {
match self {
Self::Serial(trie) => trie.reserve_nodes(additional),
Self::Parallel(trie) => trie.reserve_nodes(additional),
}
}
fn reveal_node(
&mut self,
path: Nibbles,
node: TrieNode,
masks: Option<BranchNodeMasks>,
) -> SparseTrieResult<()> {
match self {
Self::Serial(trie) => trie.reveal_node(path, node, masks),
Self::Parallel(trie) => trie.reveal_node(path, node, masks),
}
}
fn reveal_nodes(&mut self, nodes: Vec<ProofTrieNode>) -> SparseTrieResult<()> {
match self {
Self::Serial(trie) => trie.reveal_nodes(nodes),
Self::Parallel(trie) => trie.reveal_nodes(nodes),
}
}
fn update_leaf<P: TrieNodeProvider>(
&mut self,
full_path: Nibbles,
value: Vec<u8>,
provider: P,
) -> SparseTrieResult<()> {
match self {
Self::Serial(trie) => trie.update_leaf(full_path, value, provider),
Self::Parallel(trie) => trie.update_leaf(full_path, value, provider),
}
}
fn remove_leaf<P: TrieNodeProvider>(
&mut self,
full_path: &Nibbles,
provider: P,
) -> SparseTrieResult<()> {
match self {
Self::Serial(trie) => trie.remove_leaf(full_path, provider),
Self::Parallel(trie) => trie.remove_leaf(full_path, provider),
}
}
fn root(&mut self) -> B256 {
match self {
Self::Serial(trie) => trie.root(),
Self::Parallel(trie) => trie.root(),
}
}
fn update_subtrie_hashes(&mut self) {
match self {
Self::Serial(trie) => trie.update_subtrie_hashes(),
Self::Parallel(trie) => trie.update_subtrie_hashes(),
}
}
fn get_leaf_value(&self, full_path: &Nibbles) -> Option<&Vec<u8>> {
match self {
Self::Serial(trie) => trie.get_leaf_value(full_path),
Self::Parallel(trie) => trie.get_leaf_value(full_path),
}
}
fn find_leaf(
&self,
full_path: &Nibbles,
expected_value: Option<&Vec<u8>>,
) -> Result<LeafLookup, LeafLookupError> {
match self {
Self::Serial(trie) => trie.find_leaf(full_path, expected_value),
Self::Parallel(trie) => trie.find_leaf(full_path, expected_value),
}
}
fn take_updates(&mut self) -> SparseTrieUpdates {
match self {
Self::Serial(trie) => trie.take_updates(),
Self::Parallel(trie) => trie.take_updates(),
}
}
fn wipe(&mut self) {
match self {
Self::Serial(trie) => trie.wipe(),
Self::Parallel(trie) => trie.wipe(),
}
}
fn clear(&mut self) {
match self {
Self::Serial(trie) => trie.clear(),
Self::Parallel(trie) => trie.clear(),
}
}
fn updates_ref(&self) -> Cow<'_, SparseTrieUpdates> {
match self {
Self::Serial(trie) => trie.updates_ref(),
Self::Parallel(trie) => trie.updates_ref(),
}
}
fn shrink_nodes_to(&mut self, size: usize) {
match self {
Self::Serial(trie) => trie.shrink_nodes_to(size),
Self::Parallel(trie) => trie.shrink_nodes_to(size),
}
}
fn shrink_values_to(&mut self, size: usize) {
match self {
Self::Serial(trie) => trie.shrink_values_to(size),
Self::Parallel(trie) => trie.shrink_values_to(size),
}
}
}

View File

@@ -2,7 +2,10 @@
use super::precompile_cache::PrecompileCacheMap;
use crate::tree::{
cached_state::{CachedStateMetrics, CachedStateProvider, ExecutionCache, SavedCache},
cached_state::{
CachedStateMetrics, CachedStateProvider, ExecutionCache as StateExecutionCache,
ExecutionCacheBuilder, SavedCache,
},
payload_processor::{
prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
sparse_trie::StateRootComputeOutcome,
@@ -41,7 +44,7 @@ use reth_trie_parallel::{
};
use reth_trie_sparse::{
provider::{TrieNodeProvider, TrieNodeProviderFactory},
ClearedSparseStateTrie, RevealableSparseTrie, SparseStateTrie,
ClearedSparseStateTrie, SparseStateTrie, SparseTrie,
};
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
use std::{
@@ -57,12 +60,15 @@ use std::{
use tracing::{debug, debug_span, instrument, warn, Span};
pub mod bal;
mod configured_sparse_trie;
pub mod executor;
pub mod multiproof;
pub mod prewarm;
pub mod receipt_root_task;
pub mod sparse_trie;
use configured_sparse_trie::ConfiguredSparseTrie;
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by performing benchmarks using gradually increasing values to judge
@@ -110,11 +116,11 @@ where
/// The executor used by to spawn tasks.
executor: WorkloadExecutor,
/// The most recent cache used for execution.
execution_cache: PayloadExecutionCache,
execution_cache: ExecutionCache,
/// Metrics for trie operations
trie_metrics: MultiProofTaskMetrics,
/// Cross-block cache size in bytes.
cross_block_cache_size: usize,
cross_block_cache_size: u64,
/// Whether transactions should not be executed on prewarming task.
disable_transaction_prewarming: bool,
/// Whether state cache should be disable
@@ -128,8 +134,12 @@ where
/// A cleared `SparseStateTrie`, kept around to be reused for the state root computation so
/// that allocations can be minimized.
sparse_state_trie: Arc<
parking_lot::Mutex<Option<ClearedSparseStateTrie<ParallelSparseTrie, ParallelSparseTrie>>>,
parking_lot::Mutex<
Option<ClearedSparseStateTrie<ConfiguredSparseTrie, ConfiguredSparseTrie>>,
>,
>,
/// Whether to disable the parallel sparse trie.
disable_parallel_sparse_trie: bool,
/// Maximum concurrency for prewarm task.
prewarm_max_concurrency: usize,
/// Whether to disable cache metrics recording.
@@ -164,6 +174,7 @@ where
precompile_cache_disabled: config.precompile_cache_disabled(),
precompile_cache_map,
sparse_state_trie: Arc::default(),
disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
prewarm_max_concurrency: config.prewarm_max_concurrency(),
disable_cache_metrics: config.disable_cache_metrics(),
}
@@ -238,7 +249,7 @@ where
let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
// Extract V2 proofs flag early so we can pass it to prewarm
let v2_proofs_enabled = !config.disable_proof_v2();
let v2_proofs_enabled = config.enable_proof_v2();
// Handle BAL-based optimization if available
let prewarm_handle = if let Some(bal) = bal {
@@ -286,7 +297,9 @@ where
let multi_proof_task = MultiProofTask::new(
proof_handle.clone(),
to_sparse_trie,
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
config
.multiproof_chunking_enabled()
.then_some(config.effective_multiproof_chunk_size()),
to_multi_proof.clone(),
from_multi_proof,
)
@@ -300,7 +313,7 @@ where
// Build a state provider for the multiproof task
let provider = provider_builder.build().expect("failed to build provider");
let provider = if let Some(saved_cache) = saved_cache {
let (cache, metrics, _disable_metrics) = saved_cache.split();
let (cache, metrics, _) = saved_cache.split();
Box::new(CachedStateProvider::new(provider, cache, metrics))
as Box<dyn StateProvider>
} else {
@@ -482,11 +495,8 @@ where
cache
} else {
debug!("creating new execution cache on cache miss");
let start = Instant::now();
let cache = ExecutionCache::new(self.cross_block_cache_size);
let metrics = CachedStateMetrics::zeroed();
metrics.record_cache_creation(start.elapsed());
SavedCache::new(parent_hash, cache, metrics)
let cache = ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size);
SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
.with_disable_cache_metrics(self.disable_cache_metrics)
}
}
@@ -504,6 +514,7 @@ where
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
{
let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
let disable_parallel_sparse_trie = self.disable_parallel_sparse_trie;
let trie_metrics = self.trie_metrics.clone();
let span = Span::current();
@@ -513,10 +524,14 @@ where
// Reuse a stored SparseStateTrie, or create a new one using the desired configuration
// if there's none to reuse.
let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
let default_trie = RevealableSparseTrie::blind_from(
ParallelSparseTrie::default()
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
);
let default_trie = SparseTrie::blind_from(if disable_parallel_sparse_trie {
ConfiguredSparseTrie::Serial(Default::default())
} else {
ConfiguredSparseTrie::Parallel(Box::new(
ParallelSparseTrie::default()
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
))
});
ClearedSparseStateTrie::from_state_trie(
SparseStateTrie::new()
.with_accounts_trie(default_trie.clone())
@@ -525,13 +540,12 @@ where
)
});
let task =
SparseTrieTask::<_, ParallelSparseTrie, ParallelSparseTrie>::new_with_cleared_trie(
sparse_trie_rx,
proof_worker_handle,
trie_metrics,
sparse_state_trie,
);
let task = SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
sparse_trie_rx,
proof_worker_handle,
trie_metrics,
sparse_state_trie,
);
let (result, trie) = task.run();
// Send state root computation result
@@ -573,27 +587,28 @@ where
parent_hash = %block_with_parent.parent,
"Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
);
return
return;
}
// Take existing cache (if any) or create fresh caches
let (caches, cache_metrics, _) = match cached.take() {
Some(existing) => existing.split(),
let (caches, cache_metrics) = match cached.take() {
Some(existing) => {
let (c, m, _) = existing.split();
(c, m)
}
None => (
ExecutionCache::new(self.cross_block_cache_size),
ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size),
CachedStateMetrics::zeroed(),
false,
),
};
// Insert the block's bundle state into cache
let new_cache =
SavedCache::new(block_with_parent.block.hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
let new_cache = SavedCache::new(block_with_parent.block.hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
if new_cache.cache().insert_state(bundle_state).is_err() {
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
return
return;
}
new_cache.update_metrics();
@@ -657,7 +672,7 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
}
/// Returns a clone of the caches used by prewarming
pub(super) fn caches(&self) -> Option<ExecutionCache> {
pub(super) fn caches(&self) -> Option<StateExecutionCache> {
self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
}
@@ -761,29 +776,29 @@ impl<R> Drop for CacheTaskHandle<R> {
/// ## Cache Safety
///
/// **CRITICAL**: Cache update operations require exclusive access. All concurrent cache users
/// (such as prewarming tasks) must be terminated before calling
/// [`PayloadExecutionCache::update_with_guard`], otherwise the cache may be corrupted or cleared.
/// (such as prewarming tasks) must be terminated before calling `update_with_guard`, otherwise
/// the cache may be corrupted or cleared.
///
/// ## Cache vs Prewarming Distinction
///
/// **[`PayloadExecutionCache`]**:
/// **`ExecutionCache`**:
/// - Stores parent block's execution state after completion
/// - Used to fetch parent data for next block's execution
/// - Must be exclusively accessed during save operations
///
/// **[`PrewarmCacheTask`]**:
/// **`PrewarmCacheTask`**:
/// - Speculatively loads accounts/storage that might be used in transaction execution
/// - Prepares data for state root proof computation
/// - Runs concurrently but must not interfere with cache saves
#[derive(Clone, Debug, Default)]
struct PayloadExecutionCache {
struct ExecutionCache {
/// Guarded cloneable cache identified by a block hash.
inner: Arc<RwLock<Option<SavedCache>>>,
/// Metrics for cache operations.
metrics: ExecutionCacheMetrics,
}
impl PayloadExecutionCache {
impl ExecutionCache {
/// Returns the cache for `parent_hash` if it's available for use.
///
/// A cache is considered available when:
@@ -819,15 +834,11 @@ impl PayloadExecutionCache {
"Existing cache found"
);
if available {
// If the has is available (no other threads are using it), but has a mismatching
// parent hash, we can just clear it and keep using without re-creating from
// scratch.
if !hash_matches {
c.clear();
}
return Some(c.clone())
} else if hash_matches {
if hash_matches && available {
return Some(c.clone());
}
if hash_matches && !available {
self.metrics.execution_cache_in_use.increment(1);
}
} else {
@@ -900,9 +911,9 @@ where
#[cfg(test)]
mod tests {
use super::PayloadExecutionCache;
use super::ExecutionCache;
use crate::tree::{
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
cached_state::{CachedStateMetrics, ExecutionCacheBuilder, SavedCache},
payload_processor::{
evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
},
@@ -932,13 +943,13 @@ mod tests {
use std::sync::Arc;
fn make_saved_cache(hash: B256) -> SavedCache {
let execution_cache = ExecutionCache::new(1_000);
let execution_cache = ExecutionCacheBuilder::default().build_caches(1_000);
SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
}
#[test]
fn execution_cache_allows_single_checkout() {
let execution_cache = PayloadExecutionCache::default();
let execution_cache = ExecutionCache::default();
let hash = B256::from([1u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
@@ -957,7 +968,7 @@ mod tests {
#[test]
fn execution_cache_checkout_releases_on_drop() {
let execution_cache = PayloadExecutionCache::default();
let execution_cache = ExecutionCache::default();
let hash = B256::from([2u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
@@ -973,21 +984,19 @@ mod tests {
}
#[test]
fn execution_cache_mismatch_parent_clears_and_returns() {
let execution_cache = PayloadExecutionCache::default();
fn execution_cache_mismatch_parent_returns_none() {
let execution_cache = ExecutionCache::default();
let hash = B256::from([3u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
// When the parent hash doesn't match, the cache is cleared and returned for reuse
let different_hash = B256::from([4u8; 32]);
let cache = execution_cache.get_cache_for(different_hash);
assert!(cache.is_some(), "cache should be returned for reuse after clearing")
let miss = execution_cache.get_cache_for(B256::from([4u8; 32]));
assert!(miss.is_none(), "checkout should fail for different parent hash");
}
#[test]
fn execution_cache_update_after_release_succeeds() {
let execution_cache = PayloadExecutionCache::default();
let execution_cache = ExecutionCache::default();
let initial = B256::from([5u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));

View File

@@ -777,21 +777,10 @@ impl MultiProofTask {
// [`MultiAddedRemovedKeys`]. Even if there are not any known removed keys for the account,
// we still want to optimistically fetch extension children for the leaf addition case.
// V2 multiproofs don't need this.
//
// Only clone the AddedRemovedKeys for accounts in the targets, not the entire accumulated
// set, to avoid O(n) cloning with many buffered blocks.
let multi_added_removed_keys =
if let VersionedMultiProofTargets::Legacy(legacy_targets) = &targets {
self.multi_added_removed_keys.touch_accounts(legacy_targets.keys().copied());
Some(Arc::new(MultiAddedRemovedKeys {
account: self.multi_added_removed_keys.account.clone(),
storages: legacy_targets
.keys()
.filter_map(|k| {
self.multi_added_removed_keys.storages.get(k).map(|v| (*k, v.clone()))
})
.collect(),
}))
Some(Arc::new(self.multi_added_removed_keys.clone()))
} else {
None
};
@@ -1527,9 +1516,8 @@ where
#[cfg(test)]
mod tests {
use crate::tree::cached_state::CachedStateProvider;
use super::*;
use crate::tree::cached_state::{CachedStateProvider, ExecutionCacheBuilder};
use alloy_eip7928::{AccountChanges, BalanceChange};
use alloy_primitives::Address;
use reth_provider::{
@@ -1589,7 +1577,7 @@ mod tests {
{
let db_provider = factory.database_provider_ro().unwrap();
let state_provider: StateProviderBox = Box::new(LatestStateProvider::new(db_provider));
let cache = crate::tree::cached_state::ExecutionCache::new(1000);
let cache = ExecutionCacheBuilder::default().build_caches(1000);
CachedStateProvider::new(state_provider, cache, Default::default())
}

View File

@@ -17,16 +17,17 @@ use crate::tree::{
bal::{total_slots, BALSlotIter},
executor::WorkloadExecutor,
multiproof::{MultiProofMessage, VersionedMultiProofTargets},
PayloadExecutionCache,
ExecutionCache as PayloadExecutionCache,
},
precompile_cache::{CachedPrecompile, PrecompileCacheMap},
ExecutionEnv, StateProviderBuilder,
};
use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_eips::Typed2718;
use alloy_evm::Database;
use alloy_primitives::{keccak256, map::B256Set, B256};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use crossbeam_channel::Sender as CrossbeamSender;
use metrics::{Counter, Gauge, Histogram};
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
use reth_metrics::Metrics;
@@ -65,6 +66,19 @@ struct IndexedTransaction<Tx> {
tx: Tx,
}
/// Maximum standard Ethereum transaction type value.
///
/// Standard transaction types are:
/// - Type 0: Legacy transactions (original Ethereum)
/// - Type 1: EIP-2930 (access list transactions)
/// - Type 2: EIP-1559 (dynamic fee transactions)
/// - Type 3: EIP-4844 (blob transactions)
/// - Type 4: EIP-7702 (set code authorization transactions)
///
/// Any transaction with a type > 4 is considered a non-standard/system transaction,
/// typically used by L2s for special purposes (e.g., Optimism deposit transactions use type 126).
const MAX_STANDARD_TX_TYPE: u8 = 4;
/// A task that is responsible for caching and prewarming the cache by executing transactions
/// individually in parallel.
///
@@ -163,8 +177,8 @@ where
transaction_count_hint.min(max_concurrency)
};
// Spawn workers
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone());
// Initialize worker handles container
let handles = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone());
// Distribute transactions to workers
let mut tx_index = 0usize;
@@ -179,18 +193,37 @@ where
}
let indexed_tx = IndexedTransaction { index: tx_index, tx };
let is_system_tx = indexed_tx.tx.tx().ty() > MAX_STANDARD_TX_TYPE;
// Send transaction to the workers
// Ignore send errors: workers listen to terminate_execution and may
// exit early when signaled.
let _ = tx_sender.send(indexed_tx);
// System transactions (type > 4) in the first position set critical metadata
// that affects all subsequent transactions (e.g., L1 block info on L2s).
// Broadcast the first system transaction to all workers to ensure they have
// the critical state. This is particularly important for L2s like Optimism
// where the first deposit transaction (type 126) contains essential block metadata.
if tx_index == 0 && is_system_tx {
for handle in &handles {
// Ignore send errors: workers listen to terminate_execution and may
// exit early when signaled. Sending to a disconnected worker is
// possible and harmless and should happen at most once due to
// the terminate_execution check above.
let _ = handle.send(indexed_tx.clone());
}
} else {
// Round-robin distribution for all other transactions
let worker_idx = tx_index % workers_needed;
// Ignore send errors: workers listen to terminate_execution and may
// exit early when signaled. Sending to a disconnected worker is
// possible and harmless and should happen at most once due to
// the terminate_execution check above.
let _ = handles[worker_idx].send(indexed_tx);
}
tx_index += 1;
}
// drop sender and wait for all tasks to finish
// drop handle and wait for all tasks to finish and drop theirs
drop(done_tx);
drop(tx_sender);
drop(handles);
while done_rx.recv().is_ok() {}
let _ = actions_tx
@@ -529,7 +562,7 @@ where
Some((evm, metrics, terminate_execution, v2_proofs_enabled))
}
/// Accepts a [`CrossbeamReceiver`] of transactions and a handle to prewarm task. Executes
/// Accepts an [`mpsc::Receiver`] of transactions and a handle to prewarm task. Executes
/// transactions and streams [`PrewarmTaskEvent::Outcome`] messages for each transaction.
///
/// This function processes transactions sequentially from the receiver and emits outcome events
@@ -541,7 +574,7 @@ where
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn transact_batch<Tx>(
self,
txs: CrossbeamReceiver<IndexedTransaction<Tx>>,
txs: mpsc::Receiver<IndexedTransaction<Tx>>,
sender: Sender<PrewarmTaskEvent<N::Receipt>>,
done_tx: Sender<()>,
) where
@@ -563,7 +596,6 @@ where
index,
tx_hash = %tx.tx().tx_hash(),
is_success = tracing::field::Empty,
gas_used = tracing::field::Empty,
)
.entered();
@@ -629,31 +661,35 @@ where
let _ = done_tx.send(());
}
/// Spawns worker tasks that pull transactions from a shared channel.
///
/// Returns the sender for distributing transactions to workers.
/// Spawns a worker task for transaction execution and returns its sender channel.
fn spawn_workers<Tx>(
self,
workers_needed: usize,
task_executor: &WorkloadExecutor,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
done_tx: Sender<()>,
) -> CrossbeamSender<IndexedTransaction<Tx>>
) -> Vec<mpsc::Sender<IndexedTransaction<Tx>>>
where
Tx: ExecutableTxFor<Evm> + Send + 'static,
{
let (tx_sender, tx_receiver) = crossbeam_channel::unbounded();
let mut handles = Vec::with_capacity(workers_needed);
let mut receivers = Vec::with_capacity(workers_needed);
// Spawn workers that all pull from the shared receiver
for _ in 0..workers_needed {
let (tx, rx) = mpsc::channel();
handles.push(tx);
receivers.push(rx);
}
// Spawn a separate task spawning workers in parallel.
let executor = task_executor.clone();
let span = Span::current();
task_executor.spawn_blocking(move || {
let _enter = span.entered();
for idx in 0..workers_needed {
for (idx, rx) in receivers.into_iter().enumerate() {
let ctx = self.clone();
let actions_tx = actions_tx.clone();
let done_tx = done_tx.clone();
let rx = tx_receiver.clone();
let span = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
executor.spawn_blocking(move || {
let _enter = span.entered();
@@ -662,7 +698,7 @@ where
}
});
tx_sender
handles
}
/// Spawns a worker task for BAL slot prefetching.

View File

@@ -77,22 +77,8 @@ impl<R: Receipt> ReceiptRootTaskHandle<R> {
receipt_with_bloom.encode_2718(&mut encode_buf);
aggregated_bloom |= *receipt_with_bloom.bloom_ref();
match builder.push(indexed_receipt.index, &encode_buf) {
Ok(()) => {
received_count += 1;
}
Err(err) => {
// If a duplicate or out-of-bounds index is streamed, skip it and
// fall back to computing the receipt root from the full receipts
// vector later.
tracing::error!(
target: "engine::tree::payload_processor",
index = indexed_receipt.index,
?err,
"Receipt root task received invalid receipt index, skipping"
);
}
}
builder.push_unchecked(indexed_receipt.index, &encode_buf);
received_count += 1;
}
let Ok(root) = builder.finalize() else {

View File

@@ -8,7 +8,7 @@ use reth_trie_parallel::{proof_task::ProofResult, root::ParallelStateRootError};
use reth_trie_sparse::{
errors::{SparseStateTrieResult, SparseTrieErrorKind},
provider::{TrieNodeProvider, TrieNodeProviderFactory},
ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrie,
ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrieInterface,
};
use smallvec::SmallVec;
use std::{
@@ -38,8 +38,8 @@ where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
A: SparseTrieInterface + Send + Sync + Default,
S: SparseTrieInterface + Send + Sync + Default + Clone,
{
/// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`].
pub(super) fn new_with_cleared_trie(
@@ -150,8 +150,8 @@ where
BPF: TrieNodeProviderFactory + Send + Sync,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
A: SparseTrieInterface + Send + Sync + Default,
S: SparseTrieInterface + Send + Sync + Default + Clone,
{
trace!(target: "engine::root::sparse", "Updating sparse trie");
let started_at = Instant::now();

View File

@@ -503,7 +503,6 @@ where
let root_time = Instant::now();
let mut maybe_state_root = None;
let mut state_root_task_failed = false;
match strategy {
StateRootStrategy::StateRootTask => {
@@ -522,12 +521,10 @@ where
block_state_root = ?block.header().state_root(),
"State root task returned incorrect state root"
);
state_root_task_failed = true;
}
}
Err(error) => {
debug!(target: "engine::tree::payload_validator", %error, "State root task failed");
state_root_task_failed = true;
}
}
}
@@ -572,11 +569,6 @@ where
self.compute_state_root_serial(overlay_factory.clone(), &hashed_state),
block
);
if state_root_task_failed {
self.metrics.block_validation.state_root_task_fallback_success_total.increment(1);
}
(root, updates, root_time.elapsed())
};
@@ -792,11 +784,6 @@ where
// Execute transactions
let exec_span = debug_span!(target: "engine::tree", "execution").entered();
let mut transactions = transactions.into_iter();
// Some executors may execute transactions that do not append receipts during the
// main loop (e.g., system transactions whose receipts are added during finalization).
// In that case, invoking the callback on every transaction would resend the previous
// receipt with the same index and can panic the ordered root builder.
let mut last_sent_len = 0usize;
loop {
// Measure time spent waiting for next transaction from iterator
// (e.g., parallel signature recovery)
@@ -823,14 +810,10 @@ where
let gas_used = executor.execute_transaction(tx)?;
self.metrics.record_transaction_execution(tx_start.elapsed());
let current_len = executor.receipts().len();
if current_len > last_sent_len {
last_sent_len = current_len;
// Send the latest receipt to the background task for incremental root computation.
if let Some(receipt) = executor.receipts().last() {
let tx_index = current_len - 1;
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
}
// Send the latest receipt to the background task for incremental root computation
if let Some(receipt) = executor.receipts().last() {
let tx_index = executor.receipts().len() - 1;
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
}
enter.record("gas_used", gas_used);

View File

@@ -52,7 +52,7 @@ pub fn read_dir(
checksums.next().transpose()?.ok_or_eyre("Got less checksums than ERA files")?;
}
entries.sort_by_key(|(left, _)| *left);
entries.sort_by(|(left, _), (right, _)| left.cmp(right));
Ok(stream::iter(entries.into_iter().skip_while(move |(n, _)| *n < start_index).map(
move |(_, path)| {

View File

@@ -20,7 +20,6 @@ use reth_era::{
},
};
use reth_fs_util as fs;
use reth_primitives_traits::Block;
use reth_storage_api::{BlockNumReader, BlockReader, HeaderProvider};
use std::{
path::PathBuf,
@@ -296,11 +295,9 @@ where
return Err(eyre!("Expected block {expected_block_number}, got {actual_block_number}"));
}
// CompressedBody must contain the block *body* (rlp(body)), not the full block (rlp(block)).
let body = provider
.block_by_number(actual_block_number)?
.ok_or_else(|| eyre!("Block not found for block {}", actual_block_number))?
.into_body();
.ok_or_else(|| eyre!("Block body not found for block {}", actual_block_number))?;
let receipts = provider
.receipts_by_block(actual_block_number.into())?

View File

@@ -59,7 +59,6 @@ std = [
"reth-storage-errors/std",
]
test-utils = [
"std",
"dep:parking_lot",
"dep:derive_more",
"reth-chainspec/test-utils",

View File

@@ -456,22 +456,17 @@ pub struct EthereumPoolBuilder {
// TODO add options for txpool args
}
impl<Types, Node, Evm> PoolBuilder<Node, Evm> for EthereumPoolBuilder
impl<Types, Node> PoolBuilder<Node> for EthereumPoolBuilder
where
Types: NodeTypes<
ChainSpec: EthereumHardforks,
Primitives: NodePrimitives<SignedTx = TransactionSigned>,
>,
Node: FullNodeTypes<Types = Types>,
Evm: ConfigureEvm<Primitives = PrimitivesTy<Types>> + Clone + 'static,
{
type Pool = EthTransactionPool<Node::Provider, DiskFileBlobStore, Evm>;
type Pool = EthTransactionPool<Node::Provider, DiskFileBlobStore>;
async fn build_pool(
self,
ctx: &BuilderContext<Node>,
evm_config: Evm,
) -> eyre::Result<Self::Pool> {
async fn build_pool(self, ctx: &BuilderContext<Node>) -> eyre::Result<Self::Pool> {
let pool_config = ctx.pool_config();
let blobs_disabled = ctx.config().txpool.disable_blobs_support ||
@@ -497,17 +492,17 @@ where
let blob_store =
reth_node_builder::components::create_blob_store_with_cache(ctx, blob_cache_size)?;
let validator =
TransactionValidationTaskExecutor::eth_builder(ctx.provider().clone(), evm_config)
.set_eip4844(!blobs_disabled)
.kzg_settings(ctx.kzg_settings()?)
.with_max_tx_input_bytes(ctx.config().txpool.max_tx_input_bytes)
.with_local_transactions_config(pool_config.local_transactions_config.clone())
.set_tx_fee_cap(ctx.config().rpc.rpc_tx_fee_cap)
.with_max_tx_gas_limit(ctx.config().txpool.max_tx_gas_limit)
.with_minimum_priority_fee(ctx.config().txpool.minimum_priority_fee)
.with_additional_tasks(ctx.config().txpool.additional_validation_tasks)
.build_with_tasks(ctx.task_executor().clone(), blob_store.clone());
let validator = TransactionValidationTaskExecutor::eth_builder(ctx.provider().clone())
.with_head_timestamp(ctx.head().timestamp)
.set_eip4844(!blobs_disabled)
.kzg_settings(ctx.kzg_settings()?)
.with_max_tx_input_bytes(ctx.config().txpool.max_tx_input_bytes)
.with_local_transactions_config(pool_config.local_transactions_config.clone())
.set_tx_fee_cap(ctx.config().rpc.rpc_tx_fee_cap)
.with_max_tx_gas_limit(ctx.config().txpool.max_tx_gas_limit)
.with_minimum_priority_fee(ctx.config().txpool.minimum_priority_fee)
.with_additional_tasks(ctx.config().txpool.additional_validation_tasks)
.build_with_tasks(ctx.task_executor().clone(), blob_store.clone());
if validator.validator().eip4844() {
// initializing the KZG settings can be expensive, this should be done upfront so that

View File

@@ -9,7 +9,6 @@ mod p2p;
mod pool;
mod prestate;
mod rpc;
mod selfdestruct;
mod utils;
const fn main() {}

View File

@@ -1,529 +0,0 @@
//! E2E tests for SELFDESTRUCT behavior and output state verification.
//!
//! These tests verify that:
//! - Pre-Dencun: SELFDESTRUCT clears storage and code, output state reflects this
//! - Post-Dencun (EIP-6780): SELFDESTRUCT only works in same-tx creation, state persists
//!
//! We disable prewarming to ensure deterministic cache behavior and verify the execution
//! output state contains the expected account status after SELFDESTRUCT.
use crate::utils::{eth_payload_attributes, eth_payload_attributes_shanghai};
use alloy_network::{EthereumWallet, TransactionBuilder};
use alloy_primitives::{bytes, Address, Bytes, TxKind, U256};
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_types_eth::TransactionRequest;
use futures::StreamExt;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_e2e_test_utils::setup_engine;
use reth_node_api::TreeConfig;
use reth_node_ethereum::EthereumNode;
use reth_revm::db::BundleAccount;
use std::sync::Arc;
const MAX_FEE_PER_GAS: u128 = 20_000_000_000;
const MAX_PRIORITY_FEE_PER_GAS: u128 = 1_000_000_000;
fn cancun_spec() -> Arc<ChainSpec> {
Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(serde_json::from_str(include_str!("../assets/genesis.json")).unwrap())
.cancun_activated()
.build(),
)
}
fn shanghai_spec() -> Arc<ChainSpec> {
Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(serde_json::from_str(include_str!("../assets/genesis.json")).unwrap())
.shanghai_activated()
.build(),
)
}
fn deploy_tx(from: Address, nonce: u64, init_code: Bytes) -> TransactionRequest {
TransactionRequest::default()
.with_from(from)
.with_nonce(nonce)
.with_gas_limit(500_000)
.with_max_fee_per_gas(MAX_FEE_PER_GAS)
.with_max_priority_fee_per_gas(MAX_PRIORITY_FEE_PER_GAS)
.with_input(init_code)
.with_kind(TxKind::Create)
}
fn call_tx(from: Address, to: Address, nonce: u64) -> TransactionRequest {
TransactionRequest::default()
.with_from(from)
.with_to(to)
.with_nonce(nonce)
.with_gas_limit(100_000)
.with_max_fee_per_gas(MAX_FEE_PER_GAS)
.with_max_priority_fee_per_gas(MAX_PRIORITY_FEE_PER_GAS)
}
fn transfer_tx(from: Address, to: Address, nonce: u64, value: U256) -> TransactionRequest {
TransactionRequest::default()
.with_from(from)
.with_to(to)
.with_nonce(nonce)
.with_value(value)
.with_gas_limit(21_000)
.with_max_fee_per_gas(MAX_FEE_PER_GAS)
.with_max_priority_fee_per_gas(MAX_PRIORITY_FEE_PER_GAS)
}
/// Creates init code for a contract that selfdestructs during deployment (same tx).
/// This tests the EIP-6780 exception where SELFDESTRUCT in same tx as creation still works.
///
/// The contract:
/// 1. Stores 0x42 at slot 0
/// 2. Immediately selfdestructs to beneficiary (during init, before returning runtime)
fn selfdestruct_in_constructor_init_code() -> Bytes {
// Init code that selfdestructs during deployment:
// PUSH1 0x42, PUSH1 0x00, SSTORE (store 0x42 at slot 0)
// PUSH20 <beneficiary>, SELFDESTRUCT
let mut init = Vec::new();
init.extend_from_slice(&[0x60, 0x42, 0x60, 0x00, 0x55]); // PUSH1 0x42, PUSH1 0x00, SSTORE
init.extend_from_slice(&[
0x73, // PUSH20
0xde, 0xad, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x01, // beneficiary address
]);
init.push(0xff); // SELFDESTRUCT
Bytes::from(init)
}
/// Creates init code for a simple contract that:
/// 1. Stores 0x42 at slot 0 during deployment
/// 2. On any call: selfdestructs to beneficiary
///
/// This simpler contract avoids complex branching logic.
fn selfdestruct_contract_init_code() -> Bytes {
// Runtime: just selfdestruct on any call
// PUSH20 <beneficiary>
// SELFDESTRUCT
let runtime = bytes!(
"73dead000000000000000000000000000000000001" // PUSH20 beneficiary
"ff" // SELFDESTRUCT
);
let runtime_len = runtime.len(); // 22 bytes
// Init code: SSTORE(0, 0x42), CODECOPY, RETURN
// Total init code before runtime = 17 bytes
let init_len: u8 = 17;
let mut init = Vec::new();
init.extend_from_slice(&[0x60, 0x42, 0x60, 0x00, 0x55]); // PUSH1 0x42, PUSH1 0x00, SSTORE
init.extend_from_slice(&[0x60, runtime_len as u8, 0x60, init_len, 0x60, 0x00, 0x39]); // CODECOPY
init.extend_from_slice(&[0x60, runtime_len as u8, 0x60, 0x00, 0xf3]); // RETURN
init.extend_from_slice(&runtime);
Bytes::from(init)
}
/// Tests SELFDESTRUCT behavior post-Dencun (Cancun+).
///
/// Post-Dencun (EIP-6780):
/// - SELFDESTRUCT only deletes contract if called in same tx as creation
/// - For existing contracts, SELFDESTRUCT only sends balance, code/storage persist
/// - The output state should NOT mark the account as destroyed
///
/// This test verifies:
/// 1. Contract deploys with storage
/// 2. SELFDESTRUCT in later tx does NOT delete code/storage
/// 3. Output state shows account is NOT destroyed
#[tokio::test]
async fn test_selfdestruct_post_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) =
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
.await?;
let mut node = nodes.pop().unwrap();
let signer = wallet.inner.clone();
let provider = ProviderBuilder::new()
.wallet(EthereumWallet::new(signer.clone()))
.connect_http(node.rpc_url());
// Deploy contract that stores 0x42 at slot 0 and selfdestructs on any call
let pending = provider
.send_transaction(deploy_tx(signer.address(), 0, selfdestruct_contract_init_code()))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Contract deployment should succeed");
let contract_address = receipt.contract_address.expect("Should have contract address");
// Consume the canonical notification for deployment block
let _ = node.canonical_stream.next().await;
// Trigger SELFDESTRUCT by calling the contract
let pending = provider.send_transaction(call_tx(signer.address(), contract_address, 1)).await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Selfdestruct tx should succeed");
// Get the canonical notification for the selfdestruct block
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state: post-Dencun, account should NOT be destroyed
let account_state: Option<&BundleAccount> = execution_outcome.bundle.account(&contract_address);
assert!(
account_state.is_none() || !account_state.unwrap().was_destroyed(),
"Post-Dencun (EIP-6780): Account should NOT be destroyed when SELFDESTRUCT called on existing contract"
);
// Verify via RPC that code and storage persist
let code_after = provider.get_code_at(contract_address).await?;
assert!(!code_after.is_empty(), "Post-Dencun: Contract code should persist");
let slot0_after = provider.get_storage_at(contract_address, U256::ZERO).await?;
assert_eq!(slot0_after, U256::from(0x42), "Post-Dencun: Storage should persist");
// Send another transaction to the contract address in a new block.
// This tests cache behavior - if cache has stale data, execution would be incorrect.
// Post-Dencun: calling the contract should trigger SELFDESTRUCT again (but only transfer
// balance)
let pending = provider.send_transaction(call_tx(signer.address(), contract_address, 2)).await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Second call to contract should succeed");
// Consume the canonical notification
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state still shows account NOT destroyed
let account_state: Option<&BundleAccount> = execution_outcome.bundle.account(&contract_address);
assert!(
account_state.is_none() || !account_state.unwrap().was_destroyed(),
"Post-Dencun: Account should still NOT be destroyed after second SELFDESTRUCT call"
);
// Verify code and storage still persist after the second call
let code_final = provider.get_code_at(contract_address).await?;
assert!(!code_final.is_empty(), "Post-Dencun: Contract code should still persist");
let slot0_final = provider.get_storage_at(contract_address, U256::ZERO).await?;
assert_eq!(slot0_final, U256::from(0x42), "Post-Dencun: Storage should still persist");
Ok(())
}
/// Tests SELFDESTRUCT in same transaction as creation (post-Dencun).
///
/// Post-Dencun (EIP-6780):
/// - SELFDESTRUCT during the same transaction as creation DOES delete the contract
/// - This is the exception to the rule that SELFDESTRUCT no longer deletes contracts
///
/// This test verifies:
/// 1. Contract selfdestructs during its constructor
/// 2. Contract is deleted (same-tx exception applies)
/// 3. No code or storage remains
/// 4. Since account never existed in DB before, bundle has no entry for it
#[tokio::test]
async fn test_selfdestruct_same_tx_post_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) =
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
.await?;
let mut node = nodes.pop().unwrap();
let signer = wallet.inner.clone();
let provider = ProviderBuilder::new()
.wallet(EthereumWallet::new(signer.clone()))
.connect_http(node.rpc_url());
// Deploy contract that selfdestructs during its constructor
let pending = provider
.send_transaction(deploy_tx(signer.address(), 0, selfdestruct_in_constructor_init_code()))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Contract deployment with selfdestruct should succeed");
// Calculate the contract address (CREATE uses sender + nonce)
let contract_address = signer.address().create(0);
// Get the canonical notification for the deployment block
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state: same-tx SELFDESTRUCT should destroy the account
let account_state: Option<&BundleAccount> = execution_outcome.bundle.account(&contract_address);
assert!(
account_state.is_none(),
"Post-Dencun same-tx: Account was created and selfdestructed in the same transaction, no trace in bundle state"
);
// Verify via RPC that code and storage are cleared
let code = provider.get_code_at(contract_address).await?;
assert!(code.is_empty(), "Post-Dencun same-tx: Contract code should be deleted");
let slot0 = provider.get_storage_at(contract_address, U256::ZERO).await?;
assert_eq!(slot0, U256::ZERO, "Post-Dencun same-tx: Storage should be cleared");
// Send ETH to the destroyed address in a new block to test cache behavior
let pending = provider
.send_transaction(transfer_tx(signer.address(), contract_address, 1, U256::from(1000)))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "ETH transfer to destroyed address should succeed");
// Consume the canonical notification
let _ = node.canonical_stream.next().await;
// Verify code is still empty and account received ETH
let code_final = provider.get_code_at(contract_address).await?;
assert!(code_final.is_empty(), "Post-Dencun same-tx: Contract code should remain deleted");
let balance = provider.get_balance(contract_address).await?;
assert_eq!(balance, U256::from(1000), "Post-Dencun same-tx: Account should have received ETH");
Ok(())
}
/// Tests SELFDESTRUCT behavior pre-Dencun (Shanghai).
///
/// Pre-Dencun:
/// - SELFDESTRUCT deletes contract code and storage regardless of when contract was created
/// - The output state MUST mark the account as destroyed
///
/// This test verifies:
/// 1. Contract deploys with storage
/// 2. SELFDESTRUCT deletes code and storage
/// 3. Output state shows account IS destroyed
#[tokio::test]
async fn test_selfdestruct_pre_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
1,
shanghai_spec(),
false,
tree_config,
eth_payload_attributes_shanghai,
)
.await?;
let mut node = nodes.pop().unwrap();
let signer = wallet.inner.clone();
let provider = ProviderBuilder::new()
.wallet(EthereumWallet::new(signer.clone()))
.connect_http(node.rpc_url());
// Deploy contract that stores 0x42 at slot 0 and selfdestructs on any call
let pending = provider
.send_transaction(deploy_tx(signer.address(), 0, selfdestruct_contract_init_code()))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Contract deployment should succeed");
let contract_address = receipt.contract_address.expect("Should have contract address");
// Consume the canonical notification for deployment block
let _ = node.canonical_stream.next().await;
// Trigger SELFDESTRUCT by calling the contract
let pending = provider.send_transaction(call_tx(signer.address(), contract_address, 1)).await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Selfdestruct tx should succeed");
// Get the canonical notification for the selfdestruct block
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state: pre-Dencun, account MUST be destroyed
let account_state: Option<&BundleAccount> = execution_outcome.bundle.account(&contract_address);
assert!(
account_state.is_some_and(|a: &BundleAccount| a.was_destroyed()),
"Pre-Dencun: Account MUST be marked as destroyed in output state"
);
// Verify via RPC that code and storage are cleared
let code_after = provider.get_code_at(contract_address).await?;
assert!(code_after.is_empty(), "Pre-Dencun: Contract code should be deleted");
let slot0_after = provider.get_storage_at(contract_address, U256::ZERO).await?;
assert_eq!(slot0_after, U256::ZERO, "Pre-Dencun: Storage should be cleared");
// Send ETH to the destroyed contract address in a new block.
// This tests cache behavior - the cache should correctly reflect the account was destroyed.
// Pre-Dencun: the contract no longer exists, so this is just a plain ETH transfer.
let pending = provider
.send_transaction(transfer_tx(signer.address(), contract_address, 2, U256::from(1000)))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "ETH transfer to destroyed contract address should succeed");
// Consume the canonical notification
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state shows the account exists (received ETH) but has no code
let account_state: Option<&BundleAccount> = execution_outcome.bundle.account(&contract_address);
// After receiving ETH, the account should exist with balance but no code
assert!(
account_state.is_some(),
"Pre-Dencun: Account should exist after receiving ETH (even though contract was destroyed)"
);
// Verify code is still empty (contract was destroyed, only ETH was received)
let code_final = provider.get_code_at(contract_address).await?;
assert!(code_final.is_empty(), "Pre-Dencun: Contract code should remain deleted");
// Verify storage is still cleared
let slot0_final = provider.get_storage_at(contract_address, U256::ZERO).await?;
assert_eq!(slot0_final, U256::ZERO, "Pre-Dencun: Storage should remain cleared");
// Verify the account now has the ETH balance we sent
let balance = provider.get_balance(contract_address).await?;
assert_eq!(balance, U256::from(1000), "Pre-Dencun: Account should have received ETH");
Ok(())
}
/// Tests SELFDESTRUCT in same transaction as creation, where account previously had ETH
/// (post-Dencun).
///
/// Post-Dencun (EIP-6780):
/// - The same-tx exception applies when the CONTRACT is created in that transaction
/// - Even if the address previously had ETH (as an EOA), deploying a contract there and
/// selfdestructing in the same tx DOES delete the contract
/// - The "created in same tx" refers to contract creation, not account existence
///
/// This test verifies:
/// 1. Send ETH to the future contract address (address has balance but no code)
/// 2. Deploy contract that selfdestructs during constructor to that address
/// 3. Contract is deleted (same-tx exception applies - contract was created this tx)
/// 4. Code and storage are cleared
/// 5. Since account existed in DB before (had ETH), bundle marks it as Destroyed
#[tokio::test]
async fn test_selfdestruct_same_tx_preexisting_account_post_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) =
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
.await?;
let mut node = nodes.pop().unwrap();
let signer = wallet.inner.clone();
let provider = ProviderBuilder::new()
.wallet(EthereumWallet::new(signer.clone()))
.connect_http(node.rpc_url());
// Calculate where the contract will be deployed (CREATE uses sender + nonce)
// We'll use nonce 1 for deployment, so first send ETH with nonce 0
let future_contract_address = signer.address().create(1);
// Send ETH to the future contract address first (makes it a pre-existing account)
let pending = provider
.send_transaction(transfer_tx(
signer.address(),
future_contract_address,
0,
U256::from(1000),
))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "ETH transfer should succeed");
// Consume the canonical notification
let _ = node.canonical_stream.next().await;
// Verify the account exists and has balance
let balance_before = provider.get_balance(future_contract_address).await?;
assert_eq!(balance_before, U256::from(1000), "Account should have ETH before deployment");
// Now deploy contract that selfdestructs during its constructor to the same address
let pending = provider
.send_transaction(deploy_tx(signer.address(), 1, selfdestruct_in_constructor_init_code()))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Contract deployment with selfdestruct should succeed");
// Verify deployment went to the expected address
assert_eq!(
receipt.contract_address,
Some(future_contract_address),
"Contract should be deployed to pre-computed address"
);
// Get the canonical notification for the deployment block
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state: same-tx exception DOES apply because contract was created this tx
// The account should be marked as destroyed. Since it had prior state (ETH balance),
// the bundle will contain it with status Destroyed and original_info set.
let account_state: Option<&BundleAccount> =
execution_outcome.bundle.account(&future_contract_address);
assert!(
account_state.is_some_and(|a| a.was_destroyed()),
"Post-Dencun same-tx with prior ETH: Account MUST be marked as destroyed"
);
// Verify via RPC that code and storage are cleared
let code = provider.get_code_at(future_contract_address).await?;
assert!(code.is_empty(), "Post-Dencun same-tx: Contract code should be deleted");
let slot0 = provider.get_storage_at(future_contract_address, U256::ZERO).await?;
assert_eq!(slot0, U256::ZERO, "Post-Dencun same-tx: Storage should be cleared");
// Balance should be zero (sent to beneficiary during SELFDESTRUCT)
let balance_after = provider.get_balance(future_contract_address).await?;
assert_eq!(
balance_after,
U256::ZERO,
"Post-Dencun same-tx: Balance should be zero (sent to beneficiary)"
);
// Send ETH to the destroyed address to verify cache behavior
let pending = provider
.send_transaction(transfer_tx(
signer.address(),
future_contract_address,
2,
U256::from(2000),
))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "ETH transfer should succeed");
// Consume notification
let _ = node.canonical_stream.next().await;
// Verify the account received ETH and has no code (it's now just an EOA)
let balance_final = provider.get_balance(future_contract_address).await?;
assert_eq!(balance_final, U256::from(2000), "Account should have received ETH");
let code_final = provider.get_code_at(future_contract_address).await?;
assert!(code_final.is_empty(), "Code should remain empty after ETH transfer");
let slot0_final = provider.get_storage_at(future_contract_address, U256::ZERO).await?;
assert_eq!(slot0_final, U256::ZERO, "Storage should remain cleared");
Ok(())
}

View File

@@ -29,19 +29,6 @@ pub(crate) fn eth_payload_attributes(timestamp: u64) -> EthPayloadBuilderAttribu
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
}
/// Helper function to create pre-Cancun (Shanghai) payload attributes.
/// No `parent_beacon_block_root` field.
pub(crate) fn eth_payload_attributes_shanghai(timestamp: u64) -> EthPayloadBuilderAttributes {
let attributes = PayloadAttributes {
timestamp,
prev_randao: B256::ZERO,
suggested_fee_recipient: Address::ZERO,
withdrawals: Some(vec![]),
parent_beacon_block_root: None,
};
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
}
/// Advances node by producing blocks with random transactions.
pub(crate) async fn advance_with_random_transactions<Provider>(
node: &mut NodeHelperType<EthereumNode, Provider>,

View File

@@ -75,11 +75,9 @@ pub trait Executor<DB: Database>: Sized {
where
I: IntoIterator<Item = &'a RecoveredBlock<<Self::Primitives as NodePrimitives>::Block>>,
{
let blocks_iter = blocks.into_iter();
let capacity = blocks_iter.size_hint().0;
let mut results = Vec::with_capacity(capacity);
let mut results = Vec::new();
let mut first_block = None;
for block in blocks_iter {
for block in blocks {
if first_block.is_none() {
first_block = Some(block.header().number());
}

View File

@@ -35,7 +35,7 @@ use reth_execution_errors::BlockExecutionError;
use reth_primitives_traits::{
BlockTy, HeaderTy, NodePrimitives, ReceiptTy, SealedBlock, SealedHeader, TxTy,
};
use revm::{context::TxEnv, database::State, primitives::hardfork::SpecId};
use revm::{context::TxEnv, database::State};
pub mod either;
/// EVM environment configuration.
@@ -203,7 +203,6 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
+ FromRecoveredTx<TxTy<Self::Primitives>>
+ FromTxWithEncoded<TxTy<Self::Primitives>>,
Precompiles = PrecompilesMap,
Spec: Into<SpecId>,
>,
>;

View File

@@ -66,17 +66,13 @@ use tokio::sync::mpsc::{Sender, UnboundedReceiver};
#[non_exhaustive]
pub struct TestPoolBuilder;
impl<Node, Evm: Send> PoolBuilder<Node, Evm> for TestPoolBuilder
impl<Node> PoolBuilder<Node> for TestPoolBuilder
where
Node: FullNodeTypes<Types: NodeTypes<Primitives: NodePrimitives<SignedTx = TransactionSigned>>>,
{
type Pool = TestPool;
async fn build_pool(
self,
_ctx: &BuilderContext<Node>,
_evm_config: Evm,
) -> eyre::Result<Self::Pool> {
async fn build_pool(self, _ctx: &BuilderContext<Node>) -> eyre::Result<Self::Pool> {
Ok(testing_pool())
}
}
@@ -251,7 +247,7 @@ pub async fn test_exex_context_with_chain_spec(
db,
chain_spec.clone(),
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
RocksDBProvider::builder(rocksdb_dir.keep()).with_default_tables().build().unwrap(),
RocksDBProvider::builder(rocksdb_dir.keep()).build().unwrap(),
)?;
let genesis_hash = init_genesis(&provider_factory)?;

View File

@@ -1631,7 +1631,7 @@ impl Discv4Service {
.filter(|entry| entry.node.value.is_expired())
.map(|n| n.node.value)
.collect::<Vec<_>>();
nodes.sort_by_key(|a| a.last_seen);
nodes.sort_by(|a, b| a.last_seen.cmp(&b.last_seen));
let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
for node in to_ping {
self.try_ping(node, PingReason::RePing)

View File

@@ -14,7 +14,6 @@ workspace = true
[dependencies]
# reth
reth-chainspec.workspace = true
reth-evm-ethereum = { workspace = true, optional = true }
reth-fs-util.workspace = true
reth-primitives-traits.workspace = true
reth-net-banlist.workspace = true
@@ -137,8 +136,6 @@ test-utils = [
"reth-primitives-traits/test-utils",
"reth-provider/test-utils",
"reth-ethereum-primitives/test-utils",
"dep:reth-evm-ethereum",
"reth-evm-ethereum?/test-utils",
]
[[bench]]

View File

@@ -19,7 +19,6 @@ use reth_eth_wire::{
protocol::Protocol, DisconnectReason, EthNetworkPrimitives, HelloMessageWithProtocols,
};
use reth_ethereum_primitives::{PooledTransactionVariant, TransactionSigned};
use reth_evm_ethereum::EthEvmConfig;
use reth_network_api::{
events::{PeerEvent, SessionInfo},
test_utils::{PeersHandle, PeersHandleProvider},
@@ -183,20 +182,17 @@ where
C: ChainSpecProvider<ChainSpec: EthereumHardforks>
+ StateProviderFactory
+ BlockReaderIdExt
+ HeaderProvider<Header = alloy_consensus::Header>
+ HeaderProvider
+ Clone
+ 'static,
Pool: TransactionPool,
{
/// Installs an eth pool on each peer
pub fn with_eth_pool(
self,
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
pub fn with_eth_pool(self) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
self.map_pool(|peer| {
let blob_store = InMemoryBlobStore::default();
let pool = TransactionValidationTaskExecutor::eth(
peer.client.clone(),
EthEvmConfig::mainnet(),
blob_store.clone(),
TokioTaskExecutor::default(),
);
@@ -212,7 +208,7 @@ where
pub fn with_eth_pool_config(
self,
tx_manager_config: TransactionsManagerConfig,
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
self.with_eth_pool_config_and_policy(tx_manager_config, Default::default())
}
@@ -221,12 +217,11 @@ where
self,
tx_manager_config: TransactionsManagerConfig,
policy: TransactionPropagationKind,
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
self.map_pool(|peer| {
let blob_store = InMemoryBlobStore::default();
let pool = TransactionValidationTaskExecutor::eth(
peer.client.clone(),
EthEvmConfig::mainnet(),
blob_store.clone(),
TokioTaskExecutor::default(),
);

View File

@@ -188,7 +188,13 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
let TxFetchMetadata { fallback_peers, .. } =
self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?;
fallback_peers.iter().find(|peer_id| self.is_idle(peer_id))
for peer_id in fallback_peers.iter() {
if self.is_idle(peer_id) {
return Some(peer_id)
}
}
None
}
/// Returns any idle peer for any hash pending fetch. If one is found, the corresponding

View File

@@ -20,7 +20,6 @@ use reth_network_p2p::{
};
use reth_network_peers::{mainnet_nodes, NodeRecord, TrustedPeer};
use reth_network_types::peers::config::PeerBackoffDurations;
use reth_provider::test_utils::MockEthProvider;
use reth_storage_api::noop::NoopProvider;
use reth_tracing::init_test_tracing;
use reth_transaction_pool::test_utils::testing_pool;
@@ -656,8 +655,7 @@ async fn new_random_peer(
async fn test_connect_many() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default().with_genesis_block();
let net = Testnet::create_with(5, provider).await;
let net = Testnet::create_with(5, NoopProvider::default()).await;
// install request handlers
let net = net.with_eth_pool();

View File

@@ -22,7 +22,7 @@ use tokio::join;
async fn test_tx_gossip() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default().with_genesis_block();
let provider = MockEthProvider::default();
let net = Testnet::create_with(2, provider.clone()).await;
// install request handlers
@@ -61,7 +61,7 @@ async fn test_tx_gossip() {
async fn test_tx_propagation_policy_trusted_only() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default().with_genesis_block();
let provider = MockEthProvider::default();
let policy = TransactionPropagationKind::Trusted;
let net = Testnet::create_with(2, provider.clone()).await;
@@ -129,7 +129,7 @@ async fn test_tx_propagation_policy_trusted_only() {
async fn test_tx_ingress_policy_trusted_only() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default().with_genesis_block();
let provider = MockEthProvider::default();
let tx_manager_config = TransactionsManagerConfig {
ingress_policy: TransactionIngressPolicy::Trusted,
@@ -195,7 +195,7 @@ async fn test_tx_ingress_policy_trusted_only() {
#[tokio::test(flavor = "multi_thread")]
async fn test_4844_tx_gossip_penalization() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default().with_genesis_block();
let provider = MockEthProvider::default();
let net = Testnet::create_with(2, provider.clone()).await;
// install request handlers
@@ -246,7 +246,7 @@ async fn test_4844_tx_gossip_penalization() {
#[tokio::test(flavor = "multi_thread")]
async fn test_sending_invalid_transactions() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default().with_genesis_block();
let provider = MockEthProvider::default();
let net = Testnet::create_with(2, provider.clone()).await;
// install request handlers
let net = net.with_eth_pool();

View File

@@ -571,8 +571,8 @@ where
debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
}
}
if this.request.bodies.is_none() && !this.is_bodies_complete() {
// no pending bodies request (e.g., request error), retry remaining bodies
if this.bodies.is_empty() {
// received bad response, re-request headers
// TODO: convert this into two futures, one which is a headers range
// future, and one which is a bodies range future.
//
@@ -751,12 +751,8 @@ mod tests {
use reth_ethereum_primitives::BlockBody;
use super::*;
use crate::{error::RequestError, test_utils::TestFullBlockClient};
use std::{
ops::Range,
sync::atomic::{AtomicUsize, Ordering},
};
use tokio::time::{timeout, Duration};
use crate::test_utils::TestFullBlockClient;
use std::ops::Range;
#[tokio::test]
async fn download_single_full_block() {
@@ -804,65 +800,6 @@ mod tests {
(sealed_header, body)
}
#[derive(Clone, Debug)]
struct FailingBodiesClient {
inner: TestFullBlockClient,
fail_on: usize,
body_requests: Arc<AtomicUsize>,
}
impl FailingBodiesClient {
fn new(inner: TestFullBlockClient, fail_on: usize) -> Self {
Self { inner, fail_on, body_requests: Arc::new(AtomicUsize::new(0)) }
}
}
impl DownloadClient for FailingBodiesClient {
fn report_bad_message(&self, peer_id: PeerId) {
self.inner.report_bad_message(peer_id);
}
fn num_connected_peers(&self) -> usize {
self.inner.num_connected_peers()
}
}
impl HeadersClient for FailingBodiesClient {
type Header = <TestFullBlockClient as HeadersClient>::Header;
type Output = <TestFullBlockClient as HeadersClient>::Output;
fn get_headers_with_priority(
&self,
request: HeadersRequest,
priority: Priority,
) -> Self::Output {
self.inner.get_headers_with_priority(request, priority)
}
}
impl BodiesClient for FailingBodiesClient {
type Body = <TestFullBlockClient as BodiesClient>::Body;
type Output = <TestFullBlockClient as BodiesClient>::Output;
fn get_block_bodies_with_priority_and_range_hint(
&self,
hashes: Vec<B256>,
priority: Priority,
range_hint: Option<RangeInclusive<u64>>,
) -> Self::Output {
let attempt = self.body_requests.fetch_add(1, Ordering::SeqCst);
if attempt == self.fail_on {
return futures::future::ready(Err(RequestError::Timeout))
}
self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
}
}
impl BlockClient for FailingBodiesClient {
type Block = reth_ethereum_primitives::Block;
}
#[tokio::test]
async fn download_full_block_range() {
let client = TestFullBlockClient::default();
@@ -900,25 +837,6 @@ mod tests {
}
}
#[tokio::test]
async fn download_full_block_range_retries_after_body_error() {
let mut client = TestFullBlockClient::default();
client.set_soft_limit(2);
let (header, _) = insert_headers_into_client(&client, 0..3);
let client = FailingBodiesClient::new(client, 1);
let body_requests = Arc::clone(&client.body_requests);
let client = FullBlockClient::test_client(client);
let received =
timeout(Duration::from_secs(1), client.get_full_block_range(header.hash(), 3))
.await
.expect("body request retry should complete");
assert_eq!(received.len(), 3);
assert_eq!(body_requests.load(Ordering::SeqCst), 3);
}
#[tokio::test]
async fn download_full_block_range_with_invalid_header() {
let client = TestFullBlockClient::default();

View File

@@ -62,12 +62,12 @@ impl<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
pool_builder,
payload_builder,
network_builder,
executor_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
} = self;
ComponentsBuilder {
executor_builder,
executor_builder: evm_builder,
pool_builder,
payload_builder,
network_builder,
@@ -149,12 +149,15 @@ where
pub fn pool<PB>(
self,
pool_builder: PB,
) -> ComponentsBuilder<Node, PB, PayloadB, NetworkB, ExecB, ConsB> {
) -> ComponentsBuilder<Node, PB, PayloadB, NetworkB, ExecB, ConsB>
where
PB: PoolBuilder<Node>,
{
let Self {
pool_builder: _,
payload_builder,
network_builder,
executor_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
} = self;
@@ -162,7 +165,7 @@ where
pool_builder,
payload_builder,
network_builder,
executor_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
}
@@ -182,6 +185,72 @@ where
_marker: self._marker,
}
}
}
impl<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
ComponentsBuilder<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
where
Node: FullNodeTypes,
PoolB: PoolBuilder<Node>,
{
/// Configures the network builder.
///
/// This accepts a [`NetworkBuilder`] instance that will be used to create the node's network
/// stack.
pub fn network<NB>(
self,
network_builder: NB,
) -> ComponentsBuilder<Node, PoolB, PayloadB, NB, ExecB, ConsB>
where
NB: NetworkBuilder<Node, PoolB::Pool>,
{
let Self {
pool_builder,
payload_builder,
network_builder: _,
executor_builder: evm_builder,
consensus_builder,
_marker,
} = self;
ComponentsBuilder {
pool_builder,
payload_builder,
network_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
}
}
/// Configures the payload builder.
///
/// This accepts a [`PayloadServiceBuilder`] instance that will be used to create the node's
/// payload builder service.
pub fn payload<PB>(
self,
payload_builder: PB,
) -> ComponentsBuilder<Node, PoolB, PB, NetworkB, ExecB, ConsB>
where
ExecB: ExecutorBuilder<Node>,
PB: PayloadServiceBuilder<Node, PoolB::Pool, ExecB::EVM>,
{
let Self {
pool_builder,
payload_builder: _,
network_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
} = self;
ComponentsBuilder {
pool_builder,
payload_builder,
network_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
}
}
/// Configures the executor builder.
///
@@ -229,72 +298,7 @@ where
network_builder,
executor_builder,
consensus_builder: _,
_marker,
} = self;
ComponentsBuilder {
pool_builder,
payload_builder,
network_builder,
executor_builder,
consensus_builder,
_marker,
}
}
}
impl<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
ComponentsBuilder<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
where
Node: FullNodeTypes,
ExecB: ExecutorBuilder<Node>,
PoolB: PoolBuilder<Node, ExecB::EVM>,
{
/// Configures the network builder.
///
/// This accepts a [`NetworkBuilder`] instance that will be used to create the node's network
/// stack.
pub fn network<NB>(
self,
network_builder: NB,
) -> ComponentsBuilder<Node, PoolB, PayloadB, NB, ExecB, ConsB>
where
NB: NetworkBuilder<Node, PoolB::Pool>,
{
let Self {
pool_builder,
payload_builder,
network_builder: _,
executor_builder,
consensus_builder,
_marker,
} = self;
ComponentsBuilder {
pool_builder,
payload_builder,
network_builder,
executor_builder,
consensus_builder,
_marker,
}
}
/// Configures the payload builder.
///
/// This accepts a [`PayloadServiceBuilder`] instance that will be used to create the node's
/// payload builder service.
pub fn payload<PB>(
self,
payload_builder: PB,
) -> ComponentsBuilder<Node, PoolB, PB, NetworkB, ExecB, ConsB>
where
PB: PayloadServiceBuilder<Node, PoolB::Pool, ExecB::EVM>,
{
let Self {
pool_builder,
payload_builder: _,
network_builder,
executor_builder,
consensus_builder,
_marker,
} = self;
ComponentsBuilder {
@@ -354,7 +358,7 @@ impl<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB> NodeComponentsBuilder<Node>
for ComponentsBuilder<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
where
Node: FullNodeTypes,
PoolB: PoolBuilder<Node, ExecB::EVM, Pool: TransactionPool>,
PoolB: PoolBuilder<Node, Pool: TransactionPool>,
NetworkB: NetworkBuilder<
Node,
PoolB::Pool,
@@ -380,13 +384,13 @@ where
pool_builder,
payload_builder,
network_builder,
executor_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
} = self;
let evm_config = executor_builder.build_evm(context).await?;
let pool = pool_builder.build_pool(context, evm_config.clone()).await?;
let evm_config = evm_builder.build_evm(context).await?;
let pool = pool_builder.build_pool(context).await?;
let network = network_builder.build_network(context, pool.clone()).await?;
let payload_builder_handle = payload_builder
.spawn_payload_builder_service(context, pool.clone(), evm_config.clone())
@@ -467,19 +471,14 @@ where
#[derive(Debug, Clone)]
pub struct NoopTransactionPoolBuilder<Tx = EthPooledTransaction>(PhantomData<Tx>);
impl<N, Tx, Evm> PoolBuilder<N, Evm> for NoopTransactionPoolBuilder<Tx>
impl<N, Tx> PoolBuilder<N> for NoopTransactionPoolBuilder<Tx>
where
N: FullNodeTypes,
Tx: EthPoolTransaction<Consensus = TxTy<N::Types>> + Unpin,
Evm: Send,
{
type Pool = NoopTransactionPool<Tx>;
async fn build_pool(
self,
_ctx: &BuilderContext<N>,
_evm_config: Evm,
) -> eyre::Result<Self::Pool> {
async fn build_pool(self, _ctx: &BuilderContext<N>) -> eyre::Result<Self::Pool> {
Ok(NoopTransactionPool::<Tx>::new())
}
}

View File

@@ -4,7 +4,7 @@ use crate::{BuilderContext, FullNodeTypes};
use alloy_primitives::Address;
use reth_chain_state::CanonStateSubscriptions;
use reth_chainspec::EthereumHardforks;
use reth_node_api::{BlockTy, NodeTypes, TxTy};
use reth_node_api::{NodeTypes, TxTy};
use reth_transaction_pool::{
blobstore::DiskFileBlobStore, BlobStore, CoinbaseTipOrdering, PoolConfig, PoolTransaction,
SubPoolLimit, TransactionPool, TransactionValidationTaskExecutor, TransactionValidator,
@@ -12,7 +12,7 @@ use reth_transaction_pool::{
use std::{collections::HashSet, future::Future};
/// A type that knows how to build the transaction pool.
pub trait PoolBuilder<Node: FullNodeTypes, Evm>: Send {
pub trait PoolBuilder<Node: FullNodeTypes>: Send {
/// The transaction pool to build.
type Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
+ Unpin
@@ -22,17 +22,16 @@ pub trait PoolBuilder<Node: FullNodeTypes, Evm>: Send {
fn build_pool(
self,
ctx: &BuilderContext<Node>,
evm_config: Evm,
) -> impl Future<Output = eyre::Result<Self::Pool>> + Send;
}
impl<Node, F, Fut, Pool, Evm> PoolBuilder<Node, Evm> for F
impl<Node, F, Fut, Pool> PoolBuilder<Node> for F
where
Node: FullNodeTypes,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
+ Unpin
+ 'static,
F: FnOnce(&BuilderContext<Node>, Evm) -> Fut + Send,
F: FnOnce(&BuilderContext<Node>) -> Fut + Send,
Fut: Future<Output = eyre::Result<Pool>> + Send,
{
type Pool = Pool;
@@ -40,9 +39,8 @@ where
fn build_pool(
self,
ctx: &BuilderContext<Node>,
evm_config: Evm,
) -> impl Future<Output = eyre::Result<Self::Pool>> {
self(ctx, evm_config)
self(ctx)
}
}
@@ -131,7 +129,7 @@ impl<'a, Node: FullNodeTypes, V> TxPoolBuilder<'a, Node, V> {
impl<'a, Node, V> TxPoolBuilder<'a, Node, TransactionValidationTaskExecutor<V>>
where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
V: TransactionValidator<Block = BlockTy<Node::Types>> + 'static,
V: TransactionValidator + 'static,
V::Transaction:
PoolTransaction<Consensus = TxTy<Node::Types>> + reth_transaction_pool::EthPoolTransaction,
{
@@ -250,7 +248,7 @@ fn spawn_pool_maintenance_task<Node, Pool>(
) -> eyre::Result<()>
where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
Pool: reth_transaction_pool::TransactionPoolExt<Block = BlockTy<Node::Types>> + Clone + 'static,
Pool: reth_transaction_pool::TransactionPoolExt + Clone + 'static,
Pool::Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>,
{
let chain_events = ctx.provider().canonical_state_stream();
@@ -282,7 +280,7 @@ pub fn spawn_maintenance_tasks<Node, Pool>(
) -> eyre::Result<()>
where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
Pool: reth_transaction_pool::TransactionPoolExt<Block = BlockTy<Node::Types>> + Clone + 'static,
Pool: reth_transaction_pool::TransactionPoolExt + Clone + 'static,
Pool::Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>,
{
spawn_local_backup_task(ctx, pool.clone())?;

View File

@@ -131,5 +131,21 @@ where
)
.build(provider_factory, static_file_producer);
// NOTE: To enable prewarming (POC), use ExecutionStage::new_with_prewarm() instead:
//
// ```
// ExecutionStage::new_with_prewarm(
// evm_config,
// consensus,
// stage_config.execution.into(),
// stage_config.execution_external_clean_threshold(),
// exex_manager_handle,
// provider_factory.clone(), // Clone the factory for prewarm threads
// 8_000_000_000, // 8GB cache
// )
// ```
//
// This requires modifying the pipeline builder to accept the different stage type.
Ok(pipeline)
}

View File

@@ -22,8 +22,9 @@ pub struct DefaultEngineValues {
legacy_state_root_task_enabled: bool,
state_cache_disabled: bool,
prewarming_disabled: bool,
parallel_sparse_trie_disabled: bool,
state_provider_metrics: bool,
cross_block_cache_size: usize,
cross_block_cache_size: u64,
state_root_task_compare_updates: bool,
accept_execution_requests_hash: bool,
multiproof_chunking_enabled: bool,
@@ -35,7 +36,7 @@ pub struct DefaultEngineValues {
allow_unwind_canonical_header: bool,
storage_worker_count: Option<usize>,
account_worker_count: Option<usize>,
disable_proof_v2: bool,
enable_proof_v2: bool,
cache_metrics_disabled: bool,
}
@@ -80,6 +81,12 @@ impl DefaultEngineValues {
self
}
/// Set whether to disable parallel sparse trie by default
pub const fn with_parallel_sparse_trie_disabled(mut self, v: bool) -> Self {
self.parallel_sparse_trie_disabled = v;
self
}
/// Set whether to enable state provider metrics by default
pub const fn with_state_provider_metrics(mut self, v: bool) -> Self {
self.state_provider_metrics = v;
@@ -87,7 +94,7 @@ impl DefaultEngineValues {
}
/// Set the default cross-block cache size in MB
pub const fn with_cross_block_cache_size(mut self, v: usize) -> Self {
pub const fn with_cross_block_cache_size(mut self, v: u64) -> Self {
self.cross_block_cache_size = v;
self
}
@@ -161,9 +168,9 @@ impl DefaultEngineValues {
self
}
/// Set whether to disable proof V2 by default
pub const fn with_disable_proof_v2(mut self, v: bool) -> Self {
self.disable_proof_v2 = v;
/// Set whether to enable proof V2 by default
pub const fn with_enable_proof_v2(mut self, v: bool) -> Self {
self.enable_proof_v2 = v;
self
}
@@ -182,6 +189,7 @@ impl Default for DefaultEngineValues {
legacy_state_root_task_enabled: false,
state_cache_disabled: false,
prewarming_disabled: false,
parallel_sparse_trie_disabled: false,
state_provider_metrics: false,
cross_block_cache_size: DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB,
state_root_task_compare_updates: false,
@@ -195,7 +203,7 @@ impl Default for DefaultEngineValues {
allow_unwind_canonical_header: false,
storage_worker_count: None,
account_worker_count: None,
disable_proof_v2: false,
enable_proof_v2: false,
cache_metrics_disabled: false,
}
}
@@ -236,14 +244,14 @@ pub struct EngineArgs {
#[arg(long = "engine.disable-prewarming", alias = "engine.disable-caching-and-prewarming", default_value_t = DefaultEngineValues::get_global().prewarming_disabled)]
pub prewarming_disabled: bool,
/// CAUTION: This CLI flag has no effect anymore. The parallel sparse trie is always enabled.
/// CAUTION: This CLI flag has no effect anymore, use --engine.disable-parallel-sparse-trie
/// if you want to disable usage of the `ParallelSparseTrie`.
#[deprecated]
#[arg(long = "engine.parallel-sparse-trie", default_value = "true", hide = true)]
pub parallel_sparse_trie_enabled: bool,
/// CAUTION: This CLI flag has no effect anymore. The parallel sparse trie is always enabled.
#[deprecated]
#[arg(long = "engine.disable-parallel-sparse-trie", default_value = "false", hide = true)]
/// Disable the parallel sparse trie in the engine.
#[arg(long = "engine.disable-parallel-sparse-trie", default_value_t = DefaultEngineValues::get_global().parallel_sparse_trie_disabled)]
pub parallel_sparse_trie_disabled: bool,
/// Enable state provider latency metrics. This allows the engine to collect and report stats
@@ -254,7 +262,7 @@ pub struct EngineArgs {
/// Configure the size of cross-block cache in megabytes
#[arg(long = "engine.cross-block-cache-size", default_value_t = DefaultEngineValues::get_global().cross_block_cache_size)]
pub cross_block_cache_size: usize,
pub cross_block_cache_size: u64,
/// Enable comparing trie updates from the state root task to the trie updates from the regular
/// state root calculation.
@@ -317,9 +325,9 @@ pub struct EngineArgs {
#[arg(long = "engine.account-worker-count", default_value = Resettable::from(DefaultEngineValues::get_global().account_worker_count.map(|v| v.to_string().into())))]
pub account_worker_count: Option<usize>,
/// Disable V2 storage proofs for state root calculations
#[arg(long = "engine.disable-proof-v2", default_value_t = DefaultEngineValues::get_global().disable_proof_v2)]
pub disable_proof_v2: bool,
/// Enable V2 storage proofs for state root calculations
#[arg(long = "engine.enable-proof-v2", default_value_t = DefaultEngineValues::get_global().enable_proof_v2)]
pub enable_proof_v2: bool,
/// Disable cache metrics recording, which can take up to 50ms with large cached state.
#[arg(long = "engine.disable-cache-metrics", default_value_t = DefaultEngineValues::get_global().cache_metrics_disabled)]
@@ -335,6 +343,7 @@ impl Default for EngineArgs {
legacy_state_root_task_enabled,
state_cache_disabled,
prewarming_disabled,
parallel_sparse_trie_disabled,
state_provider_metrics,
cross_block_cache_size,
state_root_task_compare_updates,
@@ -348,7 +357,7 @@ impl Default for EngineArgs {
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
disable_proof_v2,
enable_proof_v2,
cache_metrics_disabled,
} = DefaultEngineValues::get_global().clone();
Self {
@@ -360,7 +369,7 @@ impl Default for EngineArgs {
state_cache_disabled,
prewarming_disabled,
parallel_sparse_trie_enabled: true,
parallel_sparse_trie_disabled: false,
parallel_sparse_trie_disabled,
state_provider_metrics,
cross_block_cache_size,
accept_execution_requests_hash,
@@ -374,7 +383,7 @@ impl Default for EngineArgs {
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
disable_proof_v2,
enable_proof_v2,
cache_metrics_disabled,
}
}
@@ -389,6 +398,7 @@ impl EngineArgs {
.with_legacy_state_root(self.legacy_state_root_task_enabled)
.without_state_cache(self.state_cache_disabled)
.without_prewarming(self.prewarming_disabled)
.with_disable_parallel_sparse_trie(self.parallel_sparse_trie_disabled)
.with_state_provider_metrics(self.state_provider_metrics)
.with_always_compare_trie_updates(self.state_root_task_compare_updates)
.with_cross_block_cache_size(self.cross_block_cache_size * 1024 * 1024)
@@ -410,7 +420,7 @@ impl EngineArgs {
config = config.with_account_worker_count(count);
}
config = config.with_disable_proof_v2(self.disable_proof_v2);
config = config.with_enable_proof_v2(self.enable_proof_v2);
config = config.without_cache_metrics(self.cache_metrics_disabled);
config
@@ -447,7 +457,7 @@ mod tests {
state_cache_disabled: true,
prewarming_disabled: true,
parallel_sparse_trie_enabled: true,
parallel_sparse_trie_disabled: false,
parallel_sparse_trie_disabled: true,
state_provider_metrics: true,
cross_block_cache_size: 256,
state_root_task_compare_updates: true,
@@ -462,7 +472,7 @@ mod tests {
allow_unwind_canonical_header: true,
storage_worker_count: Some(16),
account_worker_count: Some(8),
disable_proof_v2: false,
enable_proof_v2: false,
cache_metrics_disabled: true,
};
@@ -475,6 +485,7 @@ mod tests {
"--engine.legacy-state-root",
"--engine.disable-state-cache",
"--engine.disable-prewarming",
"--engine.disable-parallel-sparse-trie",
"--engine.state-provider-metrics",
"--engine.cross-block-cache-size",
"256",

View File

@@ -1,27 +1,13 @@
//! clap [Args](clap::Args) for `RocksDB` table routing configuration
use clap::{ArgAction, Args};
use reth_storage_api::StorageSettings;
/// Default value for `tx_hash` routing flag.
/// Default value for `RocksDB` routing flags.
///
/// Derived from [`StorageSettings::base()`] to ensure CLI defaults match storage defaults.
const fn default_tx_hash_in_rocksdb() -> bool {
StorageSettings::base().transaction_hash_numbers_in_rocksdb
}
/// Default value for `storages_history` routing flag.
///
/// Derived from [`StorageSettings::base()`] to ensure CLI defaults match storage defaults.
const fn default_storages_history_in_rocksdb() -> bool {
StorageSettings::base().storages_history_in_rocksdb
}
/// Default value for `account_history` routing flag.
///
/// Derived from [`StorageSettings::base()`] to ensure CLI defaults match storage defaults.
const fn default_account_history_in_rocksdb() -> bool {
StorageSettings::base().account_history_in_rocksdb
/// When the `edge` feature is enabled, defaults to `true` to enable edge storage features.
/// Otherwise defaults to `false` for legacy behavior.
const fn default_rocksdb_flag() -> bool {
cfg!(feature = "edge")
}
/// Parameters for `RocksDB` table routing configuration.
@@ -42,21 +28,21 @@ pub struct RocksDbArgs {
///
/// This is a genesis-initialization-only flag: changing it after genesis requires a re-sync.
/// Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
#[arg(long = "rocksdb.tx-hash", default_value_t = default_tx_hash_in_rocksdb(), action = ArgAction::Set)]
#[arg(long = "rocksdb.tx-hash", default_value_t = default_rocksdb_flag(), action = ArgAction::Set)]
pub tx_hash: bool,
/// Route storages history tables to `RocksDB` instead of MDBX.
///
/// This is a genesis-initialization-only flag: changing it after genesis requires a re-sync.
/// Defaults to `false`.
#[arg(long = "rocksdb.storages-history", default_value_t = default_storages_history_in_rocksdb(), action = ArgAction::Set)]
/// Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
#[arg(long = "rocksdb.storages-history", default_value_t = default_rocksdb_flag(), action = ArgAction::Set)]
pub storages_history: bool,
/// Route account history tables to `RocksDB` instead of MDBX.
///
/// This is a genesis-initialization-only flag: changing it after genesis requires a re-sync.
/// Defaults to `false`.
#[arg(long = "rocksdb.account-history", default_value_t = default_account_history_in_rocksdb(), action = ArgAction::Set)]
/// Defaults to `true` when the `edge` feature is enabled, `false` otherwise.
#[arg(long = "rocksdb.account-history", default_value_t = default_rocksdb_flag(), action = ArgAction::Set)]
pub account_history: bool,
}
@@ -64,9 +50,9 @@ impl Default for RocksDbArgs {
fn default() -> Self {
Self {
all: false,
tx_hash: default_tx_hash_in_rocksdb(),
storages_history: default_storages_history_in_rocksdb(),
account_history: default_account_history_in_rocksdb(),
tx_hash: default_rocksdb_flag(),
storages_history: default_rocksdb_flag(),
account_history: default_rocksdb_flag(),
}
}
}
@@ -120,25 +106,7 @@ mod tests {
fn test_parse_all_flag() {
let args = CommandParser::<RocksDbArgs>::parse_from(["reth", "--rocksdb.all"]).args;
assert!(args.all);
assert_eq!(args.tx_hash, default_tx_hash_in_rocksdb());
}
#[test]
fn test_defaults_match_storage_settings() {
let args = RocksDbArgs::default();
let settings = StorageSettings::base();
assert_eq!(
args.tx_hash, settings.transaction_hash_numbers_in_rocksdb,
"tx_hash default should match StorageSettings::base()"
);
assert_eq!(
args.storages_history, settings.storages_history_in_rocksdb,
"storages_history default should match StorageSettings::base()"
);
assert_eq!(
args.account_history, settings.account_history_in_rocksdb,
"account_history default should match StorageSettings::base()"
);
assert_eq!(args.tx_hash, default_rocksdb_flag());
}
#[test]

View File

@@ -1025,7 +1025,6 @@ mod tests {
max_receipts: 2000,
max_headers: 1000,
max_concurrent_db_requests: 512,
max_cached_tx_hashes: 30_000,
},
gas_price_oracle: GasPriceOracleArgs {
blocks: 20,

View File

@@ -1,7 +1,7 @@
use clap::Args;
use reth_rpc_server_types::constants::cache::{
DEFAULT_BLOCK_CACHE_MAX_LEN, DEFAULT_CONCURRENT_DB_REQUESTS, DEFAULT_HEADER_CACHE_MAX_LEN,
DEFAULT_MAX_CACHED_TX_HASHES, DEFAULT_RECEIPT_CACHE_MAX_LEN,
DEFAULT_RECEIPT_CACHE_MAX_LEN,
};
/// Parameters to configure RPC state cache.
@@ -36,13 +36,6 @@ pub struct RpcStateCacheArgs {
default_value_t = DEFAULT_CONCURRENT_DB_REQUESTS,
)]
pub max_concurrent_db_requests: usize,
/// Maximum number of transaction hashes to cache for transaction lookups.
#[arg(
long = "rpc-cache.max-cached-tx-hashes",
default_value_t = DEFAULT_MAX_CACHED_TX_HASHES,
)]
pub max_cached_tx_hashes: u32,
}
impl RpcStateCacheArgs {
@@ -61,7 +54,6 @@ impl Default for RpcStateCacheArgs {
max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN,
max_headers: DEFAULT_HEADER_CACHE_MAX_LEN,
max_concurrent_db_requests: DEFAULT_CONCURRENT_DB_REQUESTS,
max_cached_tx_hashes: DEFAULT_MAX_CACHED_TX_HASHES,
}
}
}

View File

@@ -90,7 +90,7 @@ impl StaticFilesArgs {
/// args.
///
/// If `minimal` is true, uses [`MINIMAL_BLOCKS_PER_FILE`] blocks per file as the default for
/// all segments.
/// headers, transactions, and receipts segments.
pub fn merge_with_config(&self, config: StaticFilesConfig, minimal: bool) -> StaticFilesConfig {
let minimal_blocks_per_file = minimal.then_some(MINIMAL_BLOCKS_PER_FILE);
StaticFilesConfig {
@@ -109,15 +109,12 @@ impl StaticFilesArgs {
.or(config.blocks_per_file.receipts),
transaction_senders: self
.blocks_per_file_transaction_senders
.or(minimal_blocks_per_file)
.or(config.blocks_per_file.transaction_senders),
account_change_sets: self
.blocks_per_file_account_change_sets
.or(minimal_blocks_per_file)
.or(config.blocks_per_file.account_change_sets),
storage_change_sets: self
.blocks_per_file_storage_change_sets
.or(minimal_blocks_per_file)
.or(config.blocks_per_file.storage_change_sets),
},
}

View File

@@ -507,7 +507,7 @@ impl RethTransactionPoolConfig for TxPoolArgs {
PoolConfig {
local_transactions_config: LocalTransactionConfig {
no_exemptions: self.no_locals,
local_addresses: self.locals.iter().copied().collect(),
local_addresses: self.locals.clone().into_iter().collect(),
propagate_local_transactions: !self.no_local_transactions_propagation,
},
pending_limit: SubPoolLimit {

View File

@@ -39,7 +39,7 @@ pub use reth_engine_primitives::{
};
/// Default size of cross-block cache in megabytes.
pub const DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB: usize = 4 * 1024;
pub const DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB: u64 = 4 * 1024;
/// This includes all necessary configuration to launch the node.
/// The individual configuration options can be overwritten before launching the node.

View File

@@ -257,11 +257,6 @@ fn describe_rocksdb_metrics() {
Unit::Bytes,
"The size of memtables for a RocksDB table"
);
describe_gauge!(
"rocksdb.wal_size",
Unit::Bytes,
"The total size of WAL (Write-Ahead Log) files. Important: this is not included in table_size or sst_size metrics"
);
}
#[cfg(all(feature = "jemalloc", unix))]

View File

@@ -165,7 +165,6 @@ impl OpNode {
self.args;
ComponentsBuilder::default()
.node_types::<Node>()
.executor(OpExecutorBuilder::default())
.pool(
OpPoolBuilder::default()
.with_enable_tx_conditional(self.args.enable_tx_conditional)
@@ -174,6 +173,7 @@ impl OpNode {
self.args.supervisor_safety_level,
),
)
.executor(OpExecutorBuilder::default())
.payload(BasicPayloadServiceBuilder::new(
OpPayloadBuilder::new(compute_pending_block)
.with_da_config(self.da_config.clone())
@@ -957,19 +957,14 @@ impl<T> OpPoolBuilder<T> {
}
}
impl<Node, T, Evm> PoolBuilder<Node, Evm> for OpPoolBuilder<T>
impl<Node, T> PoolBuilder<Node> for OpPoolBuilder<T>
where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec: OpHardforks>>,
T: EthPoolTransaction<Consensus = TxTy<Node::Types>> + OpPooledTx,
Evm: ConfigureEvm<Primitives = PrimitivesTy<Node::Types>> + Clone + 'static,
{
type Pool = OpTransactionPool<Node::Provider, DiskFileBlobStore, Evm, T>;
type Pool = OpTransactionPool<Node::Provider, DiskFileBlobStore, T>;
async fn build_pool(
self,
ctx: &BuilderContext<Node>,
evm_config: Evm,
) -> eyre::Result<Self::Pool> {
async fn build_pool(self, ctx: &BuilderContext<Node>) -> eyre::Result<Self::Pool> {
let Self { pool_config_overrides, .. } = self;
// supervisor used for interop
@@ -987,27 +982,27 @@ where
.await;
let blob_store = reth_node_builder::components::create_blob_store(ctx)?;
let validator =
TransactionValidationTaskExecutor::eth_builder(ctx.provider().clone(), evm_config)
.no_eip4844()
.with_max_tx_input_bytes(ctx.config().txpool.max_tx_input_bytes)
.kzg_settings(ctx.kzg_settings()?)
.set_tx_fee_cap(ctx.config().rpc.rpc_tx_fee_cap)
.with_max_tx_gas_limit(ctx.config().txpool.max_tx_gas_limit)
.with_minimum_priority_fee(ctx.config().txpool.minimum_priority_fee)
.with_additional_tasks(
pool_config_overrides
.additional_validation_tasks
.unwrap_or_else(|| ctx.config().txpool.additional_validation_tasks),
)
.build_with_tasks(ctx.task_executor().clone(), blob_store.clone())
.map(|validator| {
OpTransactionValidator::new(validator)
// In --dev mode we can't require gas fees because we're unable to decode
// the L1 block info
.require_l1_data_gas_fee(!ctx.config().dev.dev)
.with_supervisor(supervisor_client.clone())
});
let validator = TransactionValidationTaskExecutor::eth_builder(ctx.provider().clone())
.no_eip4844()
.with_head_timestamp(ctx.head().timestamp)
.with_max_tx_input_bytes(ctx.config().txpool.max_tx_input_bytes)
.kzg_settings(ctx.kzg_settings()?)
.set_tx_fee_cap(ctx.config().rpc.rpc_tx_fee_cap)
.with_max_tx_gas_limit(ctx.config().txpool.max_tx_gas_limit)
.with_minimum_priority_fee(ctx.config().txpool.minimum_priority_fee)
.with_additional_tasks(
pool_config_overrides
.additional_validation_tasks
.unwrap_or_else(|| ctx.config().txpool.additional_validation_tasks),
)
.build_with_tasks(ctx.task_executor().clone(), blob_store.clone())
.map(|validator| {
OpTransactionValidator::new(validator)
// In --dev mode we can't require gas fees because we're unable to decode
// the L1 block info
.require_l1_data_gas_fee(!ctx.config().dev.dev)
.with_supervisor(supervisor_client.clone())
});
let final_pool_config = pool_config_overrides.apply(ctx.pool_config());

View File

@@ -52,9 +52,9 @@
//! ComponentsBuilder::default()
//! .node_types::<RethFullAdapter<_, OpNode>>()
//! .noop_pool::<OpPooledTransaction>()
//! .executor(OpExecutorBuilder::default())
//! .noop_consensus()
//! .noop_network::<OpNetworkPrimitives>()
//! .noop_consensus()
//! .executor(OpExecutorBuilder::default())
//! .noop_payload(),
//! Box::new(()) as Box<dyn OnComponentInitializedHook<_>>,
//! )

View File

@@ -155,10 +155,6 @@ impl IsTyped2718 for OpTransactionSigned {
}
impl SignedTransaction for OpTransactionSigned {
fn is_system_tx(&self) -> bool {
self.is_deposit()
}
fn recalculate_hash(&self) -> B256 {
keccak256(self.encoded_2718())
}

View File

@@ -23,7 +23,6 @@ alloy-serde.workspace = true
# reth
reth-chainspec.workspace = true
reth-evm.workspace = true
reth-primitives-traits.workspace = true
reth-chain-state.workspace = true
reth-storage-api.workspace = true

View File

@@ -24,8 +24,8 @@ pub mod estimated_da_size;
use reth_transaction_pool::{CoinbaseTipOrdering, Pool, TransactionValidationTaskExecutor};
/// Type alias for default optimism transaction pool
pub type OpTransactionPool<Client, S, Evm, T = OpPooledTransaction> = Pool<
TransactionValidationTaskExecutor<OpTransactionValidator<Client, T, Evm>>,
pub type OpTransactionPool<Client, S, T = OpPooledTransaction> = Pool<
TransactionValidationTaskExecutor<OpTransactionValidator<Client, T>>,
CoinbaseTipOrdering<T>,
S,
>;

View File

@@ -316,8 +316,7 @@ mod tests {
use alloy_primitives::{TxKind, U256};
use op_alloy_consensus::TxDeposit;
use reth_optimism_chainspec::OP_MAINNET;
use reth_optimism_evm::OpEvmConfig;
use reth_optimism_primitives::{OpPrimitives, OpTransactionSigned};
use reth_optimism_primitives::OpTransactionSigned;
use reth_provider::test_utils::MockEthProvider;
use reth_transaction_pool::{
blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder, TransactionOrigin,
@@ -325,11 +324,8 @@ mod tests {
};
#[tokio::test]
async fn validate_optimism_transaction() {
let client = MockEthProvider::<OpPrimitives>::new()
.with_chain_spec(OP_MAINNET.clone())
.with_genesis_block();
let evm_config = OpEvmConfig::optimism(OP_MAINNET.clone());
let validator = EthTransactionValidatorBuilder::new(client, evm_config)
let client = MockEthProvider::default().with_chain_spec(OP_MAINNET.clone());
let validator = EthTransactionValidatorBuilder::new(client)
.no_shanghai()
.no_cancun()
.build(InMemoryBlobStore::default());

View File

@@ -3,12 +3,10 @@ use alloy_consensus::{BlockHeader, Transaction};
use op_revm::L1BlockInfo;
use parking_lot::RwLock;
use reth_chainspec::ChainSpecProvider;
use reth_evm::ConfigureEvm;
use reth_optimism_evm::RethL1BlockInfo;
use reth_optimism_forks::OpHardforks;
use reth_primitives_traits::{
transaction::error::InvalidTransactionError, Block, BlockBody, BlockTy, GotExpected,
SealedBlock,
transaction::error::InvalidTransactionError, Block, BlockBody, GotExpected, SealedBlock,
};
use reth_storage_api::{AccountInfoReader, BlockReaderIdExt, StateProviderFactory};
use reth_transaction_pool::{
@@ -41,9 +39,9 @@ impl OpL1BlockInfo {
/// Validator for Optimism transactions.
#[derive(Debug, Clone)]
pub struct OpTransactionValidator<Client, Tx, Evm> {
pub struct OpTransactionValidator<Client, Tx> {
/// The type that performs the actual validation.
inner: Arc<EthTransactionValidator<Client, Tx, Evm>>,
inner: Arc<EthTransactionValidator<Client, Tx>>,
/// Additional block info required for validation.
block_info: Arc<OpL1BlockInfo>,
/// If true, ensure that the transaction's sender has enough balance to cover the L1 gas fee
@@ -56,7 +54,7 @@ pub struct OpTransactionValidator<Client, Tx, Evm> {
fork_tracker: Arc<OpForkTracker>,
}
impl<Client, Tx, Evm> OpTransactionValidator<Client, Tx, Evm> {
impl<Client, Tx> OpTransactionValidator<Client, Tx> {
/// Returns the configured chain spec
pub fn chain_spec(&self) -> Arc<Client::ChainSpec>
where
@@ -88,15 +86,14 @@ impl<Client, Tx, Evm> OpTransactionValidator<Client, Tx, Evm> {
}
}
impl<Client, Tx, Evm> OpTransactionValidator<Client, Tx, Evm>
impl<Client, Tx> OpTransactionValidator<Client, Tx>
where
Client:
ChainSpecProvider<ChainSpec: OpHardforks> + StateProviderFactory + BlockReaderIdExt + Sync,
Tx: EthPoolTransaction + OpPooledTx,
Evm: ConfigureEvm,
{
/// Create a new [`OpTransactionValidator`].
pub fn new(inner: EthTransactionValidator<Client, Tx, Evm>) -> Self {
pub fn new(inner: EthTransactionValidator<Client, Tx>) -> Self {
let this = Self::with_block_info(inner, OpL1BlockInfo::default());
if let Ok(Some(block)) =
this.inner.client().block_by_number_or_tag(alloy_eips::BlockNumberOrTag::Latest)
@@ -115,7 +112,7 @@ where
/// Create a new [`OpTransactionValidator`] with the given [`OpL1BlockInfo`].
pub fn with_block_info(
inner: EthTransactionValidator<Client, Tx, Evm>,
inner: EthTransactionValidator<Client, Tx>,
block_info: OpL1BlockInfo,
) -> Self {
Self {
@@ -291,15 +288,13 @@ where
}
}
impl<Client, Tx, Evm> TransactionValidator for OpTransactionValidator<Client, Tx, Evm>
impl<Client, Tx> TransactionValidator for OpTransactionValidator<Client, Tx>
where
Client:
ChainSpecProvider<ChainSpec: OpHardforks> + StateProviderFactory + BlockReaderIdExt + Sync,
Tx: EthPoolTransaction + OpPooledTx,
Evm: ConfigureEvm,
{
type Transaction = Tx;
type Block = BlockTy<Evm::Primitives>;
async fn validate_transaction(
&self,
@@ -330,7 +325,10 @@ where
.await
}
fn on_new_head_block(&self, new_tip_block: &SealedBlock<Self::Block>) {
fn on_new_head_block<B>(&self, new_tip_block: &SealedBlock<B>)
where
B: Block,
{
self.inner.on_new_head_block(new_tip_block);
self.update_l1_block_info(
new_tip_block.header(),

View File

@@ -798,14 +798,12 @@ mod rpc_compat {
.zip(senders)
.enumerate()
.map(|(idx, (tx, sender))| {
#[allow(clippy::needless_update)]
let tx_info = TransactionInfo {
hash: Some(*tx.tx_hash()),
block_hash,
block_number: Some(block_number),
base_fee,
index: Some(idx as u64),
..Default::default()
};
converter(Recovered::new_unchecked(tx, sender), tx_info)

View File

@@ -48,16 +48,6 @@ pub trait SignedTransaction:
+ TxHashRef
+ IsTyped2718
{
/// Returns whether this is a system transaction.
///
/// System transactions are created at the protocol level rather than by users. They are
/// typically used by L2s for special purposes (e.g., Optimism deposit transactions with type
/// 126) and may have different validation rules or fee handling compared to standard
/// user-initiated transactions.
fn is_system_tx(&self) -> bool {
false
}
/// Returns whether this transaction type can be __broadcasted__ as full transaction over the
/// network.
///

View File

@@ -17,7 +17,6 @@ reth-exex-types.workspace = true
reth-db-api.workspace = true
reth-errors.workspace = true
reth-provider.workspace = true
reth-storage-api.workspace = true
reth-tokio-util.workspace = true
reth-config.workspace = true
reth-prune-types.workspace = true

View File

@@ -10,7 +10,6 @@ use reth_provider::{
StageCheckpointReader, StaticFileProviderFactory, StorageSettingsCache,
};
use reth_prune_types::PruneModes;
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use std::time::Duration;
use tokio::sync::watch;
@@ -83,8 +82,6 @@ impl PrunerBuilder {
+ ChainStateBlockReader
+ StorageSettingsCache
+ StageCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ StaticFileProviderFactory<
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
>,
@@ -119,9 +116,7 @@ impl PrunerBuilder {
+ PruneCheckpointWriter
+ PruneCheckpointReader
+ StorageSettingsCache
+ StageCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader,
+ StageCheckpointReader,
{
let segments = SegmentSet::<Provider>::from_components(static_file_provider, self.segments);

View File

@@ -149,7 +149,21 @@ where
let elapsed = start.elapsed();
self.metrics.duration_seconds.record(elapsed);
output.debug_log(tip_block_number, deleted_entries, elapsed);
let message = match output.progress {
PruneProgress::HasMoreData(_) => "Pruner interrupted and has more data to prune",
PruneProgress::Finished => "Pruner finished",
};
debug!(
target: "pruner",
%tip_block_number,
?elapsed,
?deleted_entries,
?limiter,
?output,
?stats,
"{message}",
);
self.event_sender.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats });

View File

@@ -10,7 +10,6 @@ use reth_provider::{
PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory, StorageSettingsCache,
};
use reth_prune_types::PruneModes;
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
/// Collection of [`Segment`]. Thread-safe, allocated on the heap.
#[derive(Debug)]
@@ -53,9 +52,7 @@ where
+ PruneCheckpointReader
+ BlockReader<Transaction: Encodable2718>
+ ChainStateBlockReader
+ StorageSettingsCache
+ ChangeSetReader
+ StorageChangeSetReader,
+ StorageSettingsCache,
{
/// Creates a [`SegmentSet`] from an existing components, such as [`StaticFileProvider`] and
/// [`PruneModes`].

View File

@@ -1,22 +1,14 @@
use crate::{
db_ext::DbTxPruneExt,
segments::{
user::history::{finalize_history_prune, HistoryPruneResult},
PruneInput, Segment,
},
segments::{user::history::prune_history_indices, PruneInput, Segment},
PrunerError,
};
use alloy_primitives::BlockNumber;
use itertools::Itertools;
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
use reth_provider::{
changeset_walker::StaticFileAccountChangesetWalker, DBProvider, EitherWriter,
StaticFileProviderFactory, StorageSettingsCache,
};
use reth_provider::DBProvider;
use reth_prune_types::{
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::ChangeSetReader;
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
@@ -39,10 +31,7 @@ impl AccountHistory {
impl<Provider> Segment<Provider> for AccountHistory
where
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StorageSettingsCache
+ ChangeSetReader,
Provider: DBProvider<Tx: DbTxMut>,
{
fn segment(&self) -> PruneSegment {
PruneSegment::AccountHistory
@@ -67,33 +56,11 @@ where
};
let range_end = *range.end();
// Check where account changesets are stored
if EitherWriter::account_changesets_destination(provider).is_static_file() {
self.prune_static_files(provider, input, range, range_end)
} else {
self.prune_database(provider, input, range, range_end)
}
}
}
impl AccountHistory {
/// Prunes account history when changesets are stored in static files.
fn prune_static_files<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory + ChangeSetReader,
{
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
@@ -101,86 +68,15 @@ impl AccountHistory {
))
}
// The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
// 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
// ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
// `max_reorg_depth`, so no OOM is expected here.
let mut highest_deleted_accounts = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut pruned_changesets = 0;
let mut done = true;
let walker = StaticFileAccountChangesetWalker::new(provider, range);
for result in walker {
if limiter.is_limit_reached() {
done = false;
break;
}
let (block_number, changeset) = result?;
highest_deleted_accounts.insert(changeset.address, block_number);
last_changeset_pruned_block = Some(block_number);
pruned_changesets += 1;
limiter.increment_deleted_entries_count();
}
// Delete static file jars below the pruned block
if let Some(last_block) = last_changeset_pruned_block {
provider
.static_file_provider()
.delete_segment_below_block(StaticFileSegment::AccountChangeSets, last_block + 1)?;
}
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from static files)");
let result = HistoryPruneResult {
highest_deleted: highest_deleted_accounts,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::AccountsHistory, _, _>(
provider,
result,
range_end,
&limiter,
ShardedKey::new,
|a, b| a.key == b.key,
)
.map_err(Into::into)
}
fn prune_database<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut>,
{
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
// Deleted account changeset keys (account addresses) with the highest block number deleted
// for that key.
//
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
// 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
// ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
// `max_reorg_depth`, so no OOM is expected here.
let mut last_changeset_pruned_block = None;
// The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
// / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
// size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
let mut highest_deleted_accounts = FxHashMap::default();
let (pruned_changesets, done) =
provider.tx_ref().prune_table_with_range::<tables::AccountChangeSets>(
@@ -192,52 +88,69 @@ impl AccountHistory {
last_changeset_pruned_block = Some(block_number);
},
)?;
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from database)");
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets)");
let result = HistoryPruneResult {
highest_deleted: highest_deleted_accounts,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::AccountsHistory, _, _>(
let last_changeset_pruned_block = last_changeset_pruned_block
// If there's more account changesets to prune, set the checkpoint block number to
// previous, so we could finish pruning its account changesets on the next run.
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
// Sort highest deleted block numbers by account address and turn them into sharded keys.
// We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
let highest_sharded_keys = highest_deleted_accounts
.into_iter()
.sorted_unstable() // Unstable is fine because no equal keys exist in the map
.map(|(address, block_number)| {
ShardedKey::new(address, block_number.min(last_changeset_pruned_block))
});
let outcomes = prune_history_indices::<Provider, tables::AccountsHistory, _>(
provider,
result,
range_end,
&limiter,
ShardedKey::new,
highest_sharded_keys,
|a, b| a.key == b.key,
)
.map_err(Into::into)
)?;
trace!(target: "pruner", ?outcomes, %done, "Pruned account history (indices)");
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: pruned_changesets + outcomes.deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
}
#[cfg(test)]
mod tests {
use super::ACCOUNT_HISTORY_TABLES_TO_PRUNE;
use crate::segments::{AccountHistory, PruneInput, PruneLimiter, Segment, SegmentOutput};
use crate::segments::{
user::account_history::ACCOUNT_HISTORY_TABLES_TO_PRUNE, AccountHistory, PruneInput,
PruneLimiter, Segment, SegmentOutput,
};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
use reth_db_api::{tables, BlockNumberList};
use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_storage_api::StorageSettingsCache;
use reth_testing_utils::generators::{
self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
};
use std::{collections::BTreeMap, ops::AddAssign};
#[test]
fn prune_legacy() {
fn prune() {
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=5000,
1..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
@@ -289,9 +202,6 @@ mod tests {
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_account_changesets_in_static_files(false),
);
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
@@ -329,18 +239,20 @@ mod tests {
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let mut pruned_changesets = changesets
.iter()
// Skip what we've pruned so far, subtracting one to get last pruned block
// number further down
.skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _)| if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
} as BlockNumber)
.unwrap_or(to_block);
.next()
.map(|(block_number, _)| if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
} as BlockNumber)
.unwrap_or(to_block);
let pruned_changesets = pruned_changesets.fold(
BTreeMap::<_, Vec<_>>::new(),
@@ -391,152 +303,4 @@ mod tests {
test_prune(998, 2, (PruneProgress::Finished, 998));
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
#[test]
fn prune_static_file() {
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
0..0,
0..0,
);
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
db.insert_history(changesets.clone(), None).expect("insert history");
let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
BTreeMap::<_, usize>::new(),
|mut map, (key, _)| {
map.entry(key.key).or_default().add_assign(1);
map
},
);
assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
let original_shards = db.table::<tables::AccountsHistory>().unwrap();
let test_prune =
|to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let deleted_entries_limit = 2000;
let mut limiter =
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput {
previous_checkpoint: db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
to_block,
limiter: limiter.clone(),
};
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_account_changesets_in_static_files(true),
);
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().map(move |change| (block_number, change))
})
.collect::<Vec<_>>();
#[expect(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _))| {
*i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
*block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.unwrap_or(to_block);
let actual_shards = db.table::<tables::AccountsHistory>().unwrap();
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks =
blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
assert_eq!(
db.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
);
test_prune(998, 2, (PruneProgress::Finished, 1000));
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
}

View File

@@ -1,6 +1,4 @@
use crate::PruneLimiter;
use alloy_primitives::BlockNumber;
use itertools::Itertools;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
models::ShardedKey,
@@ -9,8 +7,6 @@ use reth_db_api::{
BlockNumberList, DatabaseError, RawKey, RawTable, RawValue,
};
use reth_provider::DBProvider;
use reth_prune_types::{SegmentOutput, SegmentOutputCheckpoint};
use rustc_hash::FxHashMap;
enum PruneShardOutcome {
Deleted,
@@ -25,65 +21,6 @@ pub(crate) struct PrunedIndices {
pub(crate) unchanged: usize,
}
/// Result of pruning history changesets, used to build the final output.
pub(crate) struct HistoryPruneResult<K> {
/// Map of the highest deleted changeset keys to their block numbers.
pub(crate) highest_deleted: FxHashMap<K, BlockNumber>,
/// The last block number that had changesets pruned.
pub(crate) last_pruned_block: Option<BlockNumber>,
/// Number of changesets pruned.
pub(crate) pruned_count: usize,
/// Whether pruning is complete.
pub(crate) done: bool,
}
/// Finalizes history pruning by sorting sharded keys, pruning history indices, and building output.
///
/// This is shared between static file and database pruning for both account and storage history.
pub(crate) fn finalize_history_prune<Provider, T, K, SK>(
provider: &Provider,
result: HistoryPruneResult<K>,
range_end: BlockNumber,
limiter: &PruneLimiter,
to_sharded_key: impl Fn(K, BlockNumber) -> T::Key,
key_matches: impl Fn(&T::Key, &T::Key) -> bool,
) -> Result<SegmentOutput, DatabaseError>
where
Provider: DBProvider<Tx: DbTxMut>,
T: Table<Value = BlockNumberList>,
T::Key: AsRef<ShardedKey<SK>>,
K: Ord,
{
let HistoryPruneResult { highest_deleted, last_pruned_block, pruned_count, done } = result;
// If there's more changesets to prune, set the checkpoint block number to previous,
// so we could finish pruning its changesets on the next run.
let last_changeset_pruned_block = last_pruned_block
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
// Sort highest deleted block numbers and turn them into sharded keys.
// We use `sorted_unstable` because no equal keys exist in the map.
let highest_sharded_keys =
highest_deleted.into_iter().sorted_unstable().map(|(key, block_number)| {
to_sharded_key(key, block_number.min(last_changeset_pruned_block))
});
let outcomes =
prune_history_indices::<Provider, T, _>(provider, highest_sharded_keys, key_matches)?;
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: pruned_count + outcomes.deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
/// Prune history indices according to the provided list of highest sharded keys.
///
/// Returns total number of deleted, updated and unchanged entities.

View File

@@ -1,27 +1,20 @@
use crate::{
db_ext::DbTxPruneExt,
segments::{
user::history::{finalize_history_prune, HistoryPruneResult},
PruneInput, Segment,
},
segments::{user::history::prune_history_indices, PruneInput, Segment, SegmentOutput},
PrunerError,
};
use alloy_primitives::{Address, BlockNumber, B256};
use itertools::Itertools;
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress},
tables,
transaction::DbTxMut,
};
use reth_provider::{DBProvider, EitherWriter, StaticFileProviderFactory};
use reth_prune_types::{
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
use reth_provider::DBProvider;
use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
/// Number of storage history tables to prune in one step.
/// Number of storage history tables to prune in one step
///
/// Storage History consists of two tables: [`tables::StorageChangeSets`] and
/// [`tables::StoragesHistory`]. We want to prune them to the same block number.
@@ -40,10 +33,7 @@ impl StorageHistory {
impl<Provider> Segment<Provider> for StorageHistory
where
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StorageChangeSetReader
+ StorageSettingsCache,
Provider: DBProvider<Tx: DbTxMut>,
{
fn segment(&self) -> PruneSegment {
PruneSegment::StorageHistory
@@ -68,32 +58,11 @@ where
};
let range_end = *range.end();
if EitherWriter::storage_changesets_destination(provider).is_static_file() {
self.prune_static_files(provider, input, range, range_end)
} else {
self.prune_database(provider, input, range, range_end)
}
}
}
impl StorageHistory {
/// Prunes storage history when changesets are stored in static files.
fn prune_static_files<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory,
{
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
@@ -101,90 +70,15 @@ impl StorageHistory {
))
}
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current defaults it's usually `3500 * 5
// / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
// size should be up to ~0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
let mut highest_deleted_storages = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut pruned_changesets = 0;
let mut done = true;
let walker = provider.static_file_provider().walk_storage_changeset_range(range);
for result in walker {
if limiter.is_limit_reached() {
done = false;
break;
}
let (block_address, entry) = result?;
let block_number = block_address.block_number();
let address = block_address.address();
highest_deleted_storages.insert((address, entry.key), block_number);
last_changeset_pruned_block = Some(block_number);
pruned_changesets += 1;
limiter.increment_deleted_entries_count();
}
// Delete static file jars below the pruned block
if let Some(last_block) = last_changeset_pruned_block {
provider
.static_file_provider()
.delete_segment_below_block(StaticFileSegment::StorageChangeSets, last_block + 1)?;
}
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned storage history (changesets from static files)");
let result = HistoryPruneResult {
highest_deleted: highest_deleted_storages,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
provider,
result,
range_end,
&limiter,
|(address, storage_key), block_number| {
StorageShardedKey::new(address, storage_key, block_number)
},
|a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
)
.map_err(Into::into)
}
fn prune_database<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut>,
{
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
// Deleted storage changeset keys (account addresses and storage slots) with the highest
// block number deleted for that key.
//
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current defaults it's usually `3500 * 5
// The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
// STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
// / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
// size should be up to ~0.5MB + some hashmap overhead. `blocks_since_last_run` is
// size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
let mut last_changeset_pruned_block = None;
let mut highest_deleted_storages = FxHashMap::default();
let (pruned_changesets, done) =
provider.tx_ref().prune_table_with_range::<tables::StorageChangeSets>(
@@ -198,46 +92,64 @@ impl StorageHistory {
)?;
trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)");
let result = HistoryPruneResult {
highest_deleted: highest_deleted_storages,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
let last_changeset_pruned_block = last_changeset_pruned_block
// If there's more storage changesets to prune, set the checkpoint block number to
// previous, so we could finish pruning its storage changesets on the next run.
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
// Sort highest deleted block numbers by account address and storage key and turn them into
// sharded keys.
// We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
let highest_sharded_keys = highest_deleted_storages
.into_iter()
.sorted_unstable() // Unstable is fine because no equal keys exist in the map
.map(|((address, storage_key), block_number)| {
StorageShardedKey::new(
address,
storage_key,
block_number.min(last_changeset_pruned_block),
)
});
let outcomes = prune_history_indices::<Provider, tables::StoragesHistory, _>(
provider,
result,
range_end,
&limiter,
|(address, storage_key), block_number| {
StorageShardedKey::new(address, storage_key, block_number)
},
highest_sharded_keys,
|a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
)
.map_err(Into::into)
)?;
trace!(target: "pruner", ?outcomes, %done, "Pruned storage history (indices)");
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: pruned_changesets + outcomes.deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
}
#[cfg(test)]
mod tests {
use super::STORAGE_HISTORY_TABLES_TO_PRUNE;
use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, StorageHistory};
use crate::segments::{
user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
SegmentOutput, StorageHistory,
};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
use reth_db_api::{tables, BlockNumberList};
use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_storage_api::StorageSettingsCache;
use reth_testing_utils::generators::{
self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
};
use std::{collections::BTreeMap, ops::AddAssign};
#[test]
fn prune_legacy() {
fn prune() {
let db = TestStageDB::default();
let mut rng = generators::rng();
@@ -296,9 +208,6 @@ mod tests {
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_storage_changesets_in_static_files(false),
);
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
@@ -338,19 +247,19 @@ mod tests {
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let mut pruned_changesets = changesets
.iter()
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
.skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.map(|(block_number, _, _)| if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
} as BlockNumber)
.unwrap_or(to_block);
let pruned_changesets = pruned_changesets.fold(
@@ -397,160 +306,14 @@ mod tests {
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
(
PruneProgress::HasMoreData(
reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
),
500,
),
);
test_prune(998, 2, (PruneProgress::Finished, 499));
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
#[test]
fn prune_static_file() {
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
1..2,
1..2,
);
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
db.insert_history(changesets.clone(), None).expect("insert history");
let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
BTreeMap::<_, usize>::new(),
|mut map, (key, _)| {
map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
map
},
);
assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
let original_shards = db.table::<tables::StoragesHistory>().unwrap();
let test_prune = |to_block: BlockNumber,
run: usize,
expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let deleted_entries_limit = 1000;
let mut limiter =
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput {
previous_checkpoint: db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap(),
to_block,
limiter: limiter.clone(),
};
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_storage_changesets_in_static_files(true),
);
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().flat_map(move |(address, _, entries)| {
entries.iter().map(move |entry| (block_number, address, entry))
})
})
.collect::<Vec<_>>();
#[expect(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _, _))| {
*i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
*block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.unwrap_or(to_block);
let actual_shards = db.table::<tables::StoragesHistory>().unwrap();
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks =
blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
assert_eq!(
db.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
);
test_prune(998, 2, (PruneProgress::Finished, 500));
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
}

View File

@@ -18,7 +18,6 @@ alloy-primitives.workspace = true
derive_more.workspace = true
strum = { workspace = true, features = ["derive"] }
thiserror.workspace = true
tracing.workspace = true
modular-bitfield = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive"], optional = true }
@@ -43,9 +42,8 @@ std = [
"derive_more/std",
"serde?/std",
"serde_json/std",
"strum/std",
"thiserror/std",
"tracing/std",
"strum/std",
]
test-utils = [
"std",

View File

@@ -1,9 +1,7 @@
use crate::{PruneCheckpoint, PruneMode, PruneSegment};
use alloc::{format, string::ToString, vec::Vec};
use alloc::vec::Vec;
use alloy_primitives::{BlockNumber, TxNumber};
use core::time::Duration;
use derive_more::Display;
use tracing::debug;
/// Pruner run output.
#[derive(Debug)]
@@ -20,49 +18,6 @@ impl From<PruneProgress> for PrunerOutput {
}
}
impl PrunerOutput {
/// Logs a human-readable summary of the pruner run at DEBUG level.
///
/// Format: `"Pruner finished tip=24328929 deleted=10886 elapsed=148ms
/// segments=AccountHistory[24318865, done] ..."`
#[inline]
pub fn debug_log(
&self,
tip_block_number: BlockNumber,
deleted_entries: usize,
elapsed: Duration,
) {
let message = match self.progress {
PruneProgress::HasMoreData(_) => "Pruner interrupted, has more data",
PruneProgress::Finished => "Pruner finished",
};
let segments: Vec<_> = self
.segments
.iter()
.filter(|(_, seg)| seg.pruned > 0)
.map(|(segment, seg)| {
let block = seg
.checkpoint
.and_then(|c| c.block_number)
.map(|b| b.to_string())
.unwrap_or_else(|| "?".to_string());
let status = if seg.progress.is_finished() { "done" } else { "in_progress" };
format!("{segment}[{block}, {status}]")
})
.collect();
debug!(
target: "pruner",
%tip_block_number,
deleted_entries,
?elapsed,
segments = %segments.join(" "),
"{message}",
);
}
}
/// Represents information of a pruner run for a segment.
#[derive(Debug, Clone, PartialEq, Eq, Display)]
#[display("(table={segment}, pruned={pruned}, status={progress})")]

View File

@@ -24,9 +24,9 @@ pub enum PruneSegment {
Receipts,
/// Prune segment responsible for some rows in `Receipts` table filtered by logs.
ContractLogs,
/// Prunes account changesets (static files/MDBX) and `AccountsHistory`.
/// Prune segment responsible for the `AccountChangeSets` and `AccountsHistory` tables.
AccountHistory,
/// Prunes storage changesets (static files/MDBX) and `StoragesHistory`.
/// Prune segment responsible for the `StorageChangeSets` and `StoragesHistory` tables.
StorageHistory,
#[deprecated = "Variant indexes cannot be changed"]
#[strum(disabled)]

View File

@@ -122,7 +122,6 @@ impl RethRpcServerConfig for RpcServerArgs {
max_receipts: self.rpc_state_cache.max_receipts,
max_headers: self.rpc_state_cache.max_headers,
max_concurrent_db_requests: self.rpc_state_cache.max_concurrent_db_requests,
max_cached_tx_hashes: self.rpc_state_cache.max_cached_tx_hashes,
}
}

View File

@@ -3,13 +3,6 @@
use std::collections::HashSet;
use tracing::warn;
/// Critical Engine API method prefixes that warrant warnings on capability mismatches.
///
/// These are essential for block production and chain synchronization. Missing support
/// for these methods indicates a significant version mismatch that operators should address.
const CRITICAL_METHOD_PREFIXES: &[&str] =
&["engine_forkchoiceUpdated", "engine_getPayload", "engine_newPayload"];
/// All Engine API capabilities supported by Reth (Ethereum mainnet).
///
/// See <https://github.com/ethereum/execution-apis/tree/main/src/engine> for updates.
@@ -79,52 +72,31 @@ impl EngineCapabilities {
CapabilityMismatches { missing_in_el, missing_in_cl }
}
/// Logs warnings if CL and EL capabilities don't match for critical methods.
/// Logs warnings if CL and EL capabilities don't match.
///
/// Called during `engine_exchangeCapabilities` to warn operators about
/// version mismatches between the consensus layer and execution layer.
///
/// Only warns about critical methods (`engine_forkchoiceUpdated`, `engine_getPayload`,
/// `engine_newPayload`) that are essential for block production and chain synchronization.
/// Non-critical methods like `engine_getBlobs` are not warned about since not all
/// clients support them.
pub fn log_capability_mismatches(&self, cl_capabilities: &[String]) {
let mismatches = self.get_capability_mismatches(cl_capabilities);
let critical_missing_in_el: Vec<_> =
mismatches.missing_in_el.iter().filter(|m| is_critical_method(m)).cloned().collect();
let critical_missing_in_cl: Vec<_> =
mismatches.missing_in_cl.iter().filter(|m| is_critical_method(m)).cloned().collect();
if !critical_missing_in_el.is_empty() {
if !mismatches.missing_in_el.is_empty() {
warn!(
target: "rpc::engine",
missing = ?critical_missing_in_el,
missing = ?mismatches.missing_in_el,
"CL supports Engine API methods that Reth doesn't. Consider upgrading Reth."
);
}
if !critical_missing_in_cl.is_empty() {
if !mismatches.missing_in_cl.is_empty() {
warn!(
target: "rpc::engine",
missing = ?critical_missing_in_cl,
missing = ?mismatches.missing_in_cl,
"Reth supports Engine API methods that CL doesn't. Consider upgrading your consensus client."
);
}
}
}
/// Returns `true` if the method is critical for block production and chain synchronization.
fn is_critical_method(method: &str) -> bool {
CRITICAL_METHOD_PREFIXES.iter().any(|prefix| {
method.starts_with(prefix) &&
method[prefix.len()..]
.strip_prefix('V')
.is_some_and(|s| s.chars().next().is_some_and(|c| c.is_ascii_digit()))
})
}
impl Default for EngineCapabilities {
fn default() -> Self {
Self::new(CAPABILITIES.iter().copied())
@@ -201,20 +173,4 @@ mod tests {
assert_eq!(result.missing_in_el, vec!["a_other", "z_other"]);
assert_eq!(result.missing_in_cl, vec!["a_method", "z_method"]);
}
#[test]
fn test_is_critical_method() {
assert!(is_critical_method("engine_forkchoiceUpdatedV1"));
assert!(is_critical_method("engine_forkchoiceUpdatedV3"));
assert!(is_critical_method("engine_getPayloadV1"));
assert!(is_critical_method("engine_getPayloadV4"));
assert!(is_critical_method("engine_newPayloadV1"));
assert!(is_critical_method("engine_newPayloadV4"));
assert!(!is_critical_method("engine_getBlobsV1"));
assert!(!is_critical_method("engine_getBlobsV3"));
assert!(!is_critical_method("engine_getPayloadBodiesByHashV1"));
assert!(!is_critical_method("engine_getPayloadBodiesByRangeV1"));
assert!(!is_critical_method("engine_getClientVersionV1"));
}
}

View File

@@ -20,8 +20,8 @@ use alloy_rpc_types_eth::{
use futures::Future;
use reth_errors::{ProviderError, RethError};
use reth_evm::{
env::BlockEnvironment, execute::BlockBuilder, ConfigureEvm, Evm, EvmEnvFor, HaltReasonFor,
InspectorFor, TransactionEnv, TxEnvFor,
env::BlockEnvironment, ConfigureEvm, Evm, EvmEnvFor, HaltReasonFor, InspectorFor,
TransactionEnv, TxEnvFor,
};
use reth_node_api::BlockBody;
use reth_primitives_traits::Recovered;
@@ -96,37 +96,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
self.spawn_with_state_at_block(block, move |this, mut db| {
let mut blocks: Vec<SimulatedBlock<RpcBlock<Self::NetworkTypes>>> =
Vec::with_capacity(block_state_calls.len());
// Track previous block number and timestamp for validation
let mut prev_block_number = parent.number();
let mut prev_timestamp = parent.timestamp();
for block in block_state_calls {
// Validate block number ordering if overridden
if let Some(number) = block.block_overrides.as_ref().and_then(|o| o.number) {
let number: u64 = number.try_into().unwrap_or(u64::MAX);
if number <= prev_block_number {
return Err(EthApiError::other(EthSimulateError::BlockNumberInvalid {
got: number,
parent: prev_block_number,
})
.into());
}
}
// Validate timestamp ordering if overridden
if let Some(time) = block
.block_overrides
.as_ref()
.and_then(|o| o.time)
.filter(|&t| t <= prev_timestamp)
{
return Err(EthApiError::other(EthSimulateError::BlockTimestampInvalid {
got: time,
parent: prev_timestamp,
})
.into());
}
let mut evm_env = this
.evm_config()
.next_evm_env(&parent, &this.next_env_attributes(&parent)?)
@@ -146,11 +116,6 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
let SimBlock { block_overrides, state_overrides, calls } = block;
// Set prevrandao to zero for simulated blocks by default,
// matching spec behavior where MixDigest is zero-initialized.
// If user provides an override, it will be applied by apply_block_overrides.
evm_env.block_env.inner_mut().prevrandao = Some(B256::ZERO);
if let Some(block_overrides) = block_overrides {
// ensure we don't allow uncapped gas limit per block
if let Some(gas_limit_override) = block_overrides.gas_limit &&
@@ -165,8 +130,8 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
evm_env.block_env.inner_mut(),
);
}
if let Some(ref state_overrides) = state_overrides {
apply_state_overrides(state_overrides.clone(), &mut db)
if let Some(state_overrides) = state_overrides {
apply_state_overrides(state_overrides, &mut db)
.map_err(Self::Error::from_eth_err)?;
}
@@ -187,17 +152,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
}
if txs_without_gas_limit > 0 {
// Per spec: "gasLimit: blockGasLimit - soFarUsedGasInBlock"
// Divide remaining gas equally among transactions without gas
let gas_per_tx = (block_gas_limit - total_specified_gas) /
txs_without_gas_limit as u64;
// Cap to RPC gas limit, matching spec behavior
let call_gas_limit = this.call_gas_limit();
if call_gas_limit > 0 {
gas_per_tx.min(call_gas_limit)
} else {
gas_per_tx
}
(block_gas_limit - total_specified_gas) / txs_without_gas_limit as u64
} else {
0
}
@@ -222,16 +177,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
let evm = this
.evm_config()
.evm_with_env_and_inspector(&mut db, evm_env, inspector);
let mut builder = this.evm_config().create_block_builder(evm, &parent, ctx);
if let Some(ref state_overrides) = state_overrides {
simulate::apply_precompile_overrides(
state_overrides,
builder.evm_mut().precompiles_mut(),
)
.map_err(|e| Self::Error::from_eth_err(EthApiError::other(e)))?;
}
let builder = this.evm_config().create_block_builder(evm, &parent, ctx);
simulate::execute_transactions(
builder,
calls,
@@ -242,16 +188,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
.map_err(map_err)?
} else {
let evm = this.evm_config().evm_with_env(&mut db, evm_env);
let mut builder = this.evm_config().create_block_builder(evm, &parent, ctx);
if let Some(ref state_overrides) = state_overrides {
simulate::apply_precompile_overrides(
state_overrides,
builder.evm_mut().precompiles_mut(),
)
.map_err(|e| Self::Error::from_eth_err(EthApiError::other(e)))?;
}
let builder = this.evm_config().create_block_builder(evm, &parent, ctx);
simulate::execute_transactions(
builder,
calls,
@@ -264,10 +201,6 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
parent = result.block.clone_sealed_header();
// Update tracking for next iteration's validation
prev_block_number = parent.number();
prev_timestamp = parent.timestamp();
let block = simulate::build_simulated_block::<Self::Error, _>(
result.block,
results,

View File

@@ -79,7 +79,7 @@ where
blob_schedule: chain_spec
.blob_params_at_timestamp(timestamp)
// no blob support, so we set this to original cancun values as defined in eip-4844
.unwrap_or_else(BlobParams::cancun),
.unwrap_or(BlobParams::cancun()),
chain_id: chain_spec.chain().id(),
fork_id,
precompiles,

View File

@@ -310,14 +310,12 @@ pub trait Trace: LoadState<Error: FromEvmError<Self::Evm>> + Call {
.evm_factory()
.create_tracer(&mut db, evm_env, inspector_setup())
.try_trace_many(block.transactions_recovered().take(max_transactions), |ctx| {
#[allow(clippy::needless_update)]
let tx_info = TransactionInfo {
hash: Some(*ctx.tx.tx_hash()),
index: Some(idx),
block_hash: Some(block_hash),
block_number: Some(block_number),
base_fee: Some(base_fee),
..Default::default()
};
idx += 1;

View File

@@ -290,14 +290,12 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
let block_number = block.number();
let base_fee_per_gas = block.base_fee_per_gas();
if let Some((signer, tx)) = block.transactions_with_sender().nth(index) {
#[allow(clippy::needless_update)]
let tx_info = TransactionInfo {
hash: Some(*tx.tx_hash()),
block_hash: Some(block_hash),
block_number: Some(block_number),
base_fee: base_fee_per_gas,
index: Some(index as u64),
..Default::default()
};
return Ok(Some(
@@ -368,14 +366,12 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
.enumerate()
.find(|(_, (signer, tx))| **signer == sender && (*tx).nonce() == nonce)
.map(|(index, (signer, tx))| {
#[allow(clippy::needless_update)]
let tx_info = TransactionInfo {
hash: Some(*tx.tx_hash()),
block_hash: Some(block_hash),
block_number: Some(block_number),
base_fee: base_fee_per_gas,
index: Some(index as u64),
..Default::default()
};
Ok(self.converter().fill(tx.clone().with_signer(*signer), tx_info)?)
})
@@ -623,20 +619,7 @@ pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes + RpcNodeCoreExt {
Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
> + Send {
async move {
// First, try the RPC cache
if let Some(cached) = self.cache().get_transaction_by_hash(hash).await &&
let Some(tx) = cached.recovered_transaction()
{
return Ok(Some(TransactionSource::Block {
transaction: tx.cloned(),
index: cached.tx_index as u64,
block_hash: cached.block.hash(),
block_number: cached.block.number(),
base_fee: cached.block.base_fee_per_gas(),
}));
}
// Cache miss - try to find the transaction on disk
// Try to find the transaction on disk
if let Some((tx, meta)) = self
.spawn_blocking_io(move |this| {
this.provider()

View File

@@ -2,68 +2,15 @@
use std::sync::Arc;
use alloy_consensus::{transaction::TxHashRef, TxReceipt};
use alloy_consensus::TxReceipt;
use alloy_primitives::TxHash;
use reth_primitives_traits::{
Block, BlockBody, BlockTy, IndexedTx, NodePrimitives, ReceiptTy, Recovered, RecoveredBlock,
SealedBlock,
BlockTy, IndexedTx, NodePrimitives, ReceiptTy, RecoveredBlock, SealedBlock,
};
use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcConvert, RpcTypes};
use crate::utils::calculate_gas_used_and_next_log_index;
/// Cached data for a transaction lookup.
#[derive(Debug, Clone)]
pub struct CachedTransaction<B: Block, R> {
/// The block containing this transaction.
pub block: Arc<RecoveredBlock<B>>,
/// Index of the transaction within the block.
pub tx_index: usize,
/// Receipts for the block, if available.
pub receipts: Option<Arc<Vec<R>>>,
}
impl<B: Block, R> CachedTransaction<B, R> {
/// Creates a new cached transaction entry.
pub const fn new(
block: Arc<RecoveredBlock<B>>,
tx_index: usize,
receipts: Option<Arc<Vec<R>>>,
) -> Self {
Self { block, tx_index, receipts }
}
/// Returns the `Recovered<&T>` transaction at the cached index.
pub fn recovered_transaction(&self) -> Option<Recovered<&<B::Body as BlockBody>::Transaction>> {
self.block.recovered_transaction(self.tx_index)
}
/// Converts this cached transaction into an RPC receipt using the given converter.
///
/// Returns `None` if receipts are not available or the transaction index is out of bounds.
pub fn into_receipt<N, C>(
self,
converter: &C,
) -> Option<Result<<C::Network as RpcTypes>::Receipt, C::Error>>
where
N: NodePrimitives<Block = B, Receipt = R>,
R: TxReceipt + Clone,
C: RpcConvert<Primitives = N>,
{
let receipts = self.receipts?;
let receipt = receipts.get(self.tx_index)?;
let tx_hash = *self.block.body().transactions().get(self.tx_index)?.tx_hash();
let tx = self.block.find_indexed(tx_hash)?;
convert_transaction_receipt::<N, C>(
self.block.as_ref(),
receipts.as_ref(),
tx,
receipt,
converter,
)
}
}
/// A pair of an [`Arc`] wrapped [`RecoveredBlock`] and its corresponding receipts.
///
/// This type is used throughout the RPC layer to efficiently pass around

View File

@@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
use reth_rpc_server_types::constants::cache::{
DEFAULT_BLOCK_CACHE_MAX_LEN, DEFAULT_CONCURRENT_DB_REQUESTS, DEFAULT_HEADER_CACHE_MAX_LEN,
DEFAULT_MAX_CACHED_TX_HASHES, DEFAULT_RECEIPT_CACHE_MAX_LEN,
DEFAULT_RECEIPT_CACHE_MAX_LEN,
};
/// Settings for the [`EthStateCache`](super::EthStateCache).
@@ -27,8 +27,6 @@ pub struct EthStateCacheConfig {
///
/// Default is 512.
pub max_concurrent_db_requests: usize,
/// Maximum number of transaction hashes to cache for transaction lookups.
pub max_cached_tx_hashes: u32,
}
impl Default for EthStateCacheConfig {
@@ -38,7 +36,6 @@ impl Default for EthStateCacheConfig {
max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN,
max_headers: DEFAULT_HEADER_CACHE_MAX_LEN,
max_concurrent_db_requests: DEFAULT_CONCURRENT_DB_REQUESTS,
max_cached_tx_hashes: DEFAULT_MAX_CACHED_TX_HASHES,
}
}
}

View File

@@ -1,18 +1,17 @@
//! Async caching support for eth RPC
use super::{EthStateCacheConfig, MultiConsumerLruCache};
use crate::block::CachedTransaction;
use alloy_consensus::{transaction::TxHashRef, BlockHeader};
use alloy_consensus::BlockHeader;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{TxHash, B256};
use alloy_primitives::B256;
use futures::{stream::FuturesOrdered, Stream, StreamExt};
use reth_chain_state::CanonStateNotification;
use reth_errors::{ProviderError, ProviderResult};
use reth_execution_types::Chain;
use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock};
use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock};
use reth_storage_api::{BlockReader, TransactionVariant};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use schnellru::{ByLength, Limiter, LruMap};
use schnellru::{ByLength, Limiter};
use std::{
future::Future,
pin::Pin,
@@ -48,9 +47,6 @@ type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;
/// The type that can send the response with a chain of cached blocks
type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;
/// The type that can send the response for a transaction hash lookup
type TransactionHashResponseSender<B, R> = oneshot::Sender<Option<CachedTransaction<B, R>>>;
type BlockLruCache<B, L> =
MultiConsumerLruCache<B256, Arc<RecoveredBlock<B>>, L, BlockWithSendersResponseSender<B>>;
@@ -83,13 +79,11 @@ impl<N: NodePrimitives> EthStateCache<N> {
max_receipts: u32,
max_headers: u32,
max_concurrent_db_operations: usize,
max_cached_tx_hashes: u32,
) -> (Self, EthStateCacheService<Provider, Tasks>)
where
Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>,
{
let (to_service, rx) = unbounded_channel();
let service = EthStateCacheService {
provider,
full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
@@ -99,7 +93,6 @@ impl<N: NodePrimitives> EthStateCache<N> {
action_rx: UnboundedReceiverStream::new(rx),
action_task_spawner,
rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
tx_hash_index: LruMap::new(ByLength::new(max_cached_tx_hashes)),
};
let cache = Self { to_service };
(cache, service)
@@ -134,7 +127,6 @@ impl<N: NodePrimitives> EthStateCache<N> {
max_receipts,
max_headers,
max_concurrent_db_requests,
max_cached_tx_hashes,
} = config;
let (this, service) = Self::create(
provider,
@@ -143,7 +135,6 @@ impl<N: NodePrimitives> EthStateCache<N> {
max_receipts,
max_headers,
max_concurrent_db_requests,
max_cached_tx_hashes,
);
executor.spawn_critical("eth state cache", Box::pin(service));
this
@@ -264,19 +255,6 @@ impl<N: NodePrimitives> EthStateCache<N> {
Some(blocks)
}
}
/// Looks up a transaction by its hash in the cache index.
///
/// Returns the cached block, transaction index, and optionally receipts if the transaction
/// is in a cached block.
pub async fn get_transaction_by_hash(
&self,
tx_hash: TxHash,
) -> Option<CachedTransaction<N::Block, N::Receipt>> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetTransactionByHash { tx_hash, response_tx });
rx.await.ok()?
}
}
/// Thrown when the cache service task dropped.
#[derive(Debug, thiserror::Error)]
@@ -339,8 +317,6 @@ pub(crate) struct EthStateCacheService<
///
/// This restricts the max concurrent fetch tasks at the same time.
rate_limiter: Arc<Semaphore>,
/// LRU index mapping transaction hashes to their block hash and index within the block.
tx_hash_index: LruMap<TxHash, (B256, usize), ByLength>,
}
impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
@@ -348,29 +324,6 @@ where
Provider: BlockReader + Clone + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
/// Indexes all transactions in a block by transaction hash.
fn index_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
let block_hash = block.hash();
for (tx_idx, tx) in block.body().transactions().iter().enumerate() {
self.tx_hash_index.insert(*tx.tx_hash(), (block_hash, tx_idx));
}
}
/// Removes transaction index entries for a reorged block.
///
/// Only removes entries that still point to this block, preserving mappings for transactions
/// that were re-mined in a new canonical block.
fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
let block_hash = block.hash();
for tx in block.body().transactions() {
if let Some((mapped_hash, _)) = self.tx_hash_index.get(tx.tx_hash()) &&
*mapped_hash == block_hash
{
self.tx_hash_index.remove(tx.tx_hash());
}
}
}
fn on_new_block(
&mut self,
block_hash: B256,
@@ -597,8 +550,6 @@ where
}
CacheAction::CacheNewCanonicalChain { chain_change } => {
for block in chain_change.blocks {
// Index transactions before caching the block
this.index_block_transactions(&block);
this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
}
@@ -611,8 +562,6 @@ where
}
CacheAction::RemoveReorgedChain { chain_change } => {
for block in chain_change.blocks {
// Remove transaction index entries for reorged blocks
this.remove_block_transactions(&block);
this.on_reorg_block(block.hash(), Ok(Some(block)));
}
@@ -647,15 +596,6 @@ where
let _ = response_tx.send(blocks);
}
CacheAction::GetTransactionByHash { tx_hash, response_tx } => {
let result =
this.tx_hash_index.get(&tx_hash).and_then(|(block_hash, idx)| {
let block = this.full_block_cache.get(block_hash).cloned()?;
let receipts = this.receipts_cache.get(block_hash).cloned();
Some(CachedTransaction::new(block, *idx, receipts))
});
let _ = response_tx.send(result);
}
};
this.update_cached_metrics();
}
@@ -709,11 +649,6 @@ enum CacheAction<B: Block, R> {
max_blocks: usize,
response_tx: CachedParentBlocksResponseSender<B>,
},
/// Look up a transaction's cached data by its hash
GetTransactionByHash {
tx_hash: TxHash,
response_tx: TransactionHashResponseSender<B, R>,
},
}
struct BlockReceipts<R> {

Some files were not shown because too many files have changed in this diff Show More